diff options
Diffstat (limited to 'fs/xfs/xfs_log_cil.c')
-rw-r--r-- | fs/xfs/xfs_log_cil.c | 804 |
1 files changed, 592 insertions, 212 deletions
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c index b0ef071b3cb5..705619e9dab4 100644 --- a/fs/xfs/xfs_log_cil.c +++ b/fs/xfs/xfs_log_cil.c @@ -37,16 +37,94 @@ xlog_cil_ticket_alloc( { struct xlog_ticket *tic; - tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0); + tic = xlog_ticket_alloc(log, 0, 1, 0); /* * set the current reservation to zero so we know to steal the basic * transaction overhead reservation from the first transaction commit. */ tic->t_curr_res = 0; + tic->t_iclog_hdrs = 0; return tic; } +static inline void +xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil) +{ + struct xlog *log = cil->xc_log; + + atomic_set(&cil->xc_iclog_hdrs, + (XLOG_CIL_BLOCKING_SPACE_LIMIT(log) / + (log->l_iclog_size - log->l_iclog_hsize))); +} + +/* + * Unavoidable forward declaration - xlog_cil_push_work() calls + * xlog_cil_ctx_alloc() itself. + */ +static void xlog_cil_push_work(struct work_struct *work); + +static struct xfs_cil_ctx * +xlog_cil_ctx_alloc(void) +{ + struct xfs_cil_ctx *ctx; + + ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS); + INIT_LIST_HEAD(&ctx->committing); + INIT_LIST_HEAD(&ctx->busy_extents); + INIT_LIST_HEAD(&ctx->log_items); + INIT_LIST_HEAD(&ctx->lv_chain); + INIT_WORK(&ctx->push_work, xlog_cil_push_work); + return ctx; +} + +/* + * Aggregate the CIL per cpu structures into global counts, lists, etc and + * clear the percpu state ready for the next context to use. + */ +static void +xlog_cil_pcp_aggregate( + struct xfs_cil *cil, + struct xfs_cil_ctx *ctx) +{ + struct xlog_cil_pcp *cilpcp; + int cpu; + + for_each_online_cpu(cpu) { + cilpcp = per_cpu_ptr(cil->xc_pcp, cpu); + + ctx->ticket->t_curr_res += cilpcp->space_reserved; + ctx->ticket->t_unit_res += cilpcp->space_reserved; + cilpcp->space_reserved = 0; + + if (!list_empty(&cilpcp->busy_extents)) { + list_splice_init(&cilpcp->busy_extents, + &ctx->busy_extents); + } + if (!list_empty(&cilpcp->log_items)) + list_splice_init(&cilpcp->log_items, &ctx->log_items); + + /* + * We're in the middle of switching cil contexts. Reset the + * counter we use to detect when the current context is nearing + * full. + */ + cilpcp->space_used = 0; + } +} + +static void +xlog_cil_ctx_switch( + struct xfs_cil *cil, + struct xfs_cil_ctx *ctx) +{ + xlog_cil_set_iclog_hdr_count(cil); + set_bit(XLOG_CIL_EMPTY, &cil->xc_flags); + ctx->sequence = ++cil->xc_current_sequence; + ctx->cil = cil; + cil->xc_ctx = ctx; +} + /* * After the first stage of log recovery is done, we know where the head and * tail of the log are. We need this log initialisation done before we can @@ -63,6 +141,7 @@ xlog_cil_init_post_recovery( { log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log); log->l_cilp->xc_ctx->sequence = 1; + xlog_cil_set_iclog_hdr_count(log->l_cilp); } static inline int @@ -153,13 +232,20 @@ xlog_cil_alloc_shadow_bufs( } /* - * We 64-bit align the length of each iovec so that the start - * of the next one is naturally aligned. We'll need to - * account for that slack space here. Then round nbytes up - * to 64-bit alignment so that the initial buffer alignment is - * easy to calculate and verify. + * We 64-bit align the length of each iovec so that the start of + * the next one is naturally aligned. We'll need to account for + * that slack space here. + * + * We also add the xlog_op_header to each region when + * formatting, but that's not accounted to the size of the item + * at this point. Hence we'll need an addition number of bytes + * for each vector to hold an opheader. + * + * Then round nbytes up to 64-bit alignment so that the initial + * buffer alignment is easy to calculate and verify. */ - nbytes += niovecs * sizeof(uint64_t); + nbytes += niovecs * + (sizeof(uint64_t) + sizeof(struct xlog_op_header)); nbytes = round_up(nbytes, sizeof(uint64_t)); /* @@ -188,6 +274,7 @@ xlog_cil_alloc_shadow_bufs( lv = kmem_alloc_large(buf_size, KM_NOFS); memset(lv, 0, xlog_cil_iovec_space(niovecs)); + INIT_LIST_HEAD(&lv->lv_list); lv->lv_item = lip; lv->lv_size = buf_size; if (ordered) @@ -203,7 +290,6 @@ xlog_cil_alloc_shadow_bufs( else lv->lv_buf_len = 0; lv->lv_bytes = 0; - lv->lv_next = NULL; } /* Ensure the lv is set up according to ->iop_size */ @@ -217,22 +303,18 @@ xlog_cil_alloc_shadow_bufs( /* * Prepare the log item for insertion into the CIL. Calculate the difference in - * log space and vectors it will consume, and if it is a new item pin it as - * well. + * log space it will consume, and if it is a new item pin it as well. */ STATIC void xfs_cil_prepare_item( struct xlog *log, struct xfs_log_vec *lv, struct xfs_log_vec *old_lv, - int *diff_len, - int *diff_iovecs) + int *diff_len) { /* Account for the new LV being passed in */ - if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) { + if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) *diff_len += lv->lv_bytes; - *diff_iovecs += lv->lv_niovecs; - } /* * If there is no old LV, this is the first time we've seen the item in @@ -249,7 +331,6 @@ xfs_cil_prepare_item( ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED); *diff_len -= old_lv->lv_bytes; - *diff_iovecs -= old_lv->lv_niovecs; lv->lv_item->li_lv_shadow = old_lv; } @@ -298,12 +379,10 @@ static void xlog_cil_insert_format_items( struct xlog *log, struct xfs_trans *tp, - int *diff_len, - int *diff_iovecs) + int *diff_len) { struct xfs_log_item *lip; - /* Bail out if we didn't find a log item. */ if (list_empty(&tp->t_items)) { ASSERT(0); @@ -337,7 +416,6 @@ xlog_cil_insert_format_items( if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) { /* same or smaller, optimise common overwrite case */ lv = lip->li_lv; - lv->lv_next = NULL; if (ordered) goto insert; @@ -346,7 +424,6 @@ xlog_cil_insert_format_items( * set the item up as though it is a new insertion so * that the space reservation accounting is correct. */ - *diff_iovecs -= lv->lv_niovecs; *diff_len -= lv->lv_bytes; /* Ensure the lv is set up according to ->iop_size */ @@ -371,7 +448,7 @@ xlog_cil_insert_format_items( ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t))); lip->li_ops->iop_format(lip, lv); insert: - xfs_cil_prepare_item(log, lv, old_lv, diff_len, diff_iovecs); + xfs_cil_prepare_item(log, lv, old_lv, diff_len); } } @@ -391,9 +468,10 @@ xlog_cil_insert_items( struct xfs_cil_ctx *ctx = cil->xc_ctx; struct xfs_log_item *lip; int len = 0; - int diff_iovecs = 0; - int iclog_space; int iovhdr_res = 0, split_res = 0, ctx_res = 0; + int space_used; + int order; + struct xlog_cil_pcp *cilpcp; ASSERT(tp); @@ -401,51 +479,92 @@ xlog_cil_insert_items( * We can do this safely because the context can't checkpoint until we * are done so it doesn't matter exactly how we update the CIL. */ - xlog_cil_insert_format_items(log, tp, &len, &diff_iovecs); + xlog_cil_insert_format_items(log, tp, &len); - spin_lock(&cil->xc_cil_lock); + /* + * We need to take the CIL checkpoint unit reservation on the first + * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't + * unnecessarily do an atomic op in the fast path here. We can clear the + * XLOG_CIL_EMPTY bit as we are under the xc_ctx_lock here and that + * needs to be held exclusively to reset the XLOG_CIL_EMPTY bit. + */ + if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) && + test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) + ctx_res = ctx->ticket->t_unit_res; - /* account for space used by new iovec headers */ - iovhdr_res = diff_iovecs * sizeof(xlog_op_header_t); - len += iovhdr_res; - ctx->nvecs += diff_iovecs; + /* + * Check if we need to steal iclog headers. atomic_read() is not a + * locked atomic operation, so we can check the value before we do any + * real atomic ops in the fast path. If we've already taken the CIL unit + * reservation from this commit, we've already got one iclog header + * space reserved so we have to account for that otherwise we risk + * overrunning the reservation on this ticket. + * + * If the CIL is already at the hard limit, we might need more header + * space that originally reserved. So steal more header space from every + * commit that occurs once we are over the hard limit to ensure the CIL + * push won't run out of reservation space. + * + * This can steal more than we need, but that's OK. + */ + space_used = atomic_read(&ctx->space_used); + if (atomic_read(&cil->xc_iclog_hdrs) > 0 || + space_used + len >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log)) { + int split_res = log->l_iclog_hsize + + sizeof(struct xlog_op_header); + if (ctx_res) + ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1); + else + ctx_res = split_res * tp->t_ticket->t_iclog_hdrs; + atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs); + } + /* + * Update the CIL percpu pointer. This updates the global counter when + * over the percpu batch size or when the CIL is over the space limit. + * This means low lock overhead for normal updates, and when over the + * limit the space used is immediately accounted. This makes enforcing + * the hard limit much more accurate. The per cpu fold threshold is + * based on how close we are to the hard limit. + */ + cilpcp = get_cpu_ptr(cil->xc_pcp); + cilpcp->space_reserved += ctx_res; + cilpcp->space_used += len; + if (space_used >= XLOG_CIL_SPACE_LIMIT(log) || + cilpcp->space_used > + ((XLOG_CIL_BLOCKING_SPACE_LIMIT(log) - space_used) / + num_online_cpus())) { + atomic_add(cilpcp->space_used, &ctx->space_used); + cilpcp->space_used = 0; + } /* attach the transaction to the CIL if it has any busy extents */ if (!list_empty(&tp->t_busy)) - list_splice_init(&tp->t_busy, &ctx->busy_extents); - + list_splice_init(&tp->t_busy, &cilpcp->busy_extents); /* - * Now transfer enough transaction reservation to the context ticket - * for the checkpoint. The context ticket is special - the unit - * reservation has to grow as well as the current reservation as we - * steal from tickets so we can correctly determine the space used - * during the transaction commit. + * Now update the order of everything modified in the transaction + * and insert items into the CIL if they aren't already there. + * We do this here so we only need to take the CIL lock once during + * the transaction commit. */ - if (ctx->ticket->t_curr_res == 0) { - ctx_res = ctx->ticket->t_unit_res; - ctx->ticket->t_curr_res = ctx_res; - tp->t_ticket->t_curr_res -= ctx_res; - } + order = atomic_inc_return(&ctx->order_id); + list_for_each_entry(lip, &tp->t_items, li_trans) { - /* do we need space for more log record headers? */ - iclog_space = log->l_iclog_size - log->l_iclog_hsize; - if (len > 0 && (ctx->space_used / iclog_space != - (ctx->space_used + len) / iclog_space)) { - split_res = (len + iclog_space - 1) / iclog_space; - /* need to take into account split region headers, too */ - split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header); - ctx->ticket->t_unit_res += split_res; - ctx->ticket->t_curr_res += split_res; - tp->t_ticket->t_curr_res -= split_res; - ASSERT(tp->t_ticket->t_curr_res >= len); + /* Skip items which aren't dirty in this transaction. */ + if (!test_bit(XFS_LI_DIRTY, &lip->li_flags)) + continue; + + lip->li_order_id = order; + if (!list_empty(&lip->li_cil)) + continue; + list_add_tail(&lip->li_cil, &cilpcp->log_items); } - tp->t_ticket->t_curr_res -= len; - ctx->space_used += len; + put_cpu_ptr(cilpcp); /* * If we've overrun the reservation, dump the tx details before we move * the log items. Shutdown is imminent... */ + tp->t_ticket->t_curr_res -= ctx_res + len; if (WARN_ON(tp->t_ticket->t_curr_res < 0)) { xfs_warn(log->l_mp, "Transaction log reservation overrun:"); xfs_warn(log->l_mp, @@ -457,42 +576,20 @@ xlog_cil_insert_items( xlog_print_trans(tp); } - /* - * Now (re-)position everything modified at the tail of the CIL. - * We do this here so we only need to take the CIL lock once during - * the transaction commit. - */ - list_for_each_entry(lip, &tp->t_items, li_trans) { - - /* Skip items which aren't dirty in this transaction. */ - if (!test_bit(XFS_LI_DIRTY, &lip->li_flags)) - continue; - - /* - * Only move the item if it isn't already at the tail. This is - * to prevent a transient list_empty() state when reinserting - * an item that is already the only item in the CIL. - */ - if (!list_is_last(&lip->li_cil, &cil->xc_cil)) - list_move_tail(&lip->li_cil, &cil->xc_cil); - } - - spin_unlock(&cil->xc_cil_lock); - if (tp->t_ticket->t_curr_res < 0) xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR); } static void xlog_cil_free_logvec( - struct xfs_log_vec *log_vector) + struct list_head *lv_chain) { struct xfs_log_vec *lv; - for (lv = log_vector; lv; ) { - struct xfs_log_vec *next = lv->lv_next; + while (!list_empty(lv_chain)) { + lv = list_first_entry(lv_chain, struct xfs_log_vec, lv_list); + list_del_init(&lv->lv_list); kmem_free(lv); - lv = next; } } @@ -591,7 +688,7 @@ xlog_cil_committed( spin_unlock(&ctx->cil->xc_push_lock); } - xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain, + xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, &ctx->lv_chain, ctx->start_lsn, abort); xfs_extent_busy_sort(&ctx->busy_extents); @@ -602,7 +699,7 @@ xlog_cil_committed( list_del(&ctx->committing); spin_unlock(&ctx->cil->xc_push_lock); - xlog_cil_free_logvec(ctx->lv_chain); + xlog_cil_free_logvec(&ctx->lv_chain); if (!list_empty(&ctx->busy_extents)) xlog_discard_busy_extents(mp, ctx); @@ -623,6 +720,89 @@ xlog_cil_process_committed( } } +struct xlog_cil_trans_hdr { + struct xlog_op_header oph[2]; + struct xfs_trans_header thdr; + struct xfs_log_iovec lhdr[2]; +}; + +/* + * Build a checkpoint transaction header to begin the journal transaction. We + * need to account for the space used by the transaction header here as it is + * not accounted for in xlog_write(). + * + * This is the only place we write a transaction header, so we also build the + * log opheaders that indicate the start of a log transaction and wrap the + * transaction header. We keep the start record in it's own log vector rather + * than compacting them into a single region as this ends up making the logic + * in xlog_write() for handling empty opheaders for start, commit and unmount + * records much simpler. + */ +static void +xlog_cil_build_trans_hdr( + struct xfs_cil_ctx *ctx, + struct xlog_cil_trans_hdr *hdr, + struct xfs_log_vec *lvhdr, + int num_iovecs) +{ + struct xlog_ticket *tic = ctx->ticket; + uint32_t tid = cpu_to_be32(tic->t_tid); + + memset(hdr, 0, sizeof(*hdr)); + + /* Log start record */ + hdr->oph[0].oh_tid = tid; + hdr->oph[0].oh_clientid = XFS_TRANSACTION; + hdr->oph[0].oh_flags = XLOG_START_TRANS; + + /* log iovec region pointer */ + hdr->lhdr[0].i_addr = &hdr->oph[0]; + hdr->lhdr[0].i_len = sizeof(struct xlog_op_header); + hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER; + + /* log opheader */ + hdr->oph[1].oh_tid = tid; + hdr->oph[1].oh_clientid = XFS_TRANSACTION; + + /* transaction header */ + hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC; + hdr->thdr.th_type = XFS_TRANS_CHECKPOINT; + hdr->thdr.th_tid = tid; + hdr->thdr.th_num_items = num_iovecs; + + /* log iovec region pointer */ + hdr->lhdr[1].i_addr = &hdr->oph[1]; + hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) + + sizeof(struct xfs_trans_header); + hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR; + + lvhdr->lv_niovecs = 2; + lvhdr->lv_iovecp = &hdr->lhdr[0]; + lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len; + + tic->t_curr_res -= lvhdr->lv_bytes; +} + +/* + * CIL item reordering compare function. We want to order in ascending ID order, + * but we want to leave items with the same ID in the order they were added to + * the list. This is important for operations like reflink where we log 4 order + * dependent intents in a single transaction when we overwrite an existing + * shared extent with a new shared extent. i.e. BUI(unmap), CUI(drop), + * CUI (inc), BUI(remap)... + */ +static int +xlog_cil_order_cmp( + void *priv, + const struct list_head *a, + const struct list_head *b) +{ + struct xfs_log_vec *l1 = container_of(a, struct xfs_log_vec, lv_list); + struct xfs_log_vec *l2 = container_of(b, struct xfs_log_vec, lv_list); + + return l1->lv_order_id > l2->lv_order_id; +} + /* * Push the Committed Item List to the log. * @@ -641,36 +821,45 @@ static void xlog_cil_push_work( struct work_struct *work) { - struct xfs_cil *cil = - container_of(work, struct xfs_cil, xc_push_work); + struct xfs_cil_ctx *ctx = + container_of(work, struct xfs_cil_ctx, push_work); + struct xfs_cil *cil = ctx->cil; struct xlog *log = cil->xc_log; struct xfs_log_vec *lv; - struct xfs_cil_ctx *ctx; struct xfs_cil_ctx *new_ctx; struct xlog_in_core *commit_iclog; - struct xlog_ticket *tic; - int num_iovecs; + int num_iovecs = 0; + int num_bytes = 0; int error = 0; - struct xfs_trans_header thdr; - struct xfs_log_iovec lhdr; - struct xfs_log_vec lvhdr = { NULL }; + struct xlog_cil_trans_hdr thdr; + struct xfs_log_vec lvhdr = {}; xfs_lsn_t commit_lsn; xfs_lsn_t push_seq; + struct bio bio; + DECLARE_COMPLETION_ONSTACK(bdev_flush); + bool push_commit_stable; + struct xlog_ticket *ticket; - new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_NOFS); + new_ctx = xlog_cil_ctx_alloc(); new_ctx->ticket = xlog_cil_ticket_alloc(log); down_write(&cil->xc_ctx_lock); - ctx = cil->xc_ctx; spin_lock(&cil->xc_push_lock); push_seq = cil->xc_push_seq; ASSERT(push_seq <= ctx->sequence); + push_commit_stable = cil->xc_push_commit_stable; + cil->xc_push_commit_stable = false; /* - * Wake up any background push waiters now this context is being pushed. + * As we are about to switch to a new, empty CIL context, we no longer + * need to throttle tasks on CIL space overruns. Wake any waiters that + * the hard push throttle may have caught so they can start committing + * to the new context. The ctx->xc_push_lock provides the serialisation + * necessary for safely using the lockless waitqueue_active() check in + * this context. */ - if (ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log)) + if (waitqueue_active(&cil->xc_push_wait)) wake_up_all(&cil->xc_push_wait); /* @@ -678,7 +867,7 @@ xlog_cil_push_work( * move on to a new sequence number and so we have to be able to push * this sequence again later. */ - if (list_empty(&cil->xc_cil)) { + if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) { cil->xc_push_seq = 0; spin_unlock(&cil->xc_push_lock); goto out_skip; @@ -686,7 +875,7 @@ xlog_cil_push_work( /* check for a previously pushed sequence */ - if (push_seq < cil->xc_ctx->sequence) { + if (push_seq < ctx->sequence) { spin_unlock(&cil->xc_push_lock); goto out_skip; } @@ -719,42 +908,36 @@ xlog_cil_push_work( spin_unlock(&cil->xc_push_lock); /* - * pull all the log vectors off the items in the CIL, and - * remove the items from the CIL. We don't need the CIL lock - * here because it's only needed on the transaction commit - * side which is currently locked out by the flush lock. + * The CIL is stable at this point - nothing new will be added to it + * because we hold the flush lock exclusively. Hence we can now issue + * a cache flush to ensure all the completed metadata in the journal we + * are about to overwrite is on stable storage. */ - lv = NULL; - num_iovecs = 0; - while (!list_empty(&cil->xc_cil)) { + xfs_flush_bdev_async(&bio, log->l_mp->m_ddev_targp->bt_bdev, + &bdev_flush); + + xlog_cil_pcp_aggregate(cil, ctx); + + while (!list_empty(&ctx->log_items)) { struct xfs_log_item *item; - item = list_first_entry(&cil->xc_cil, + item = list_first_entry(&ctx->log_items, struct xfs_log_item, li_cil); - list_del_init(&item->li_cil); - if (!ctx->lv_chain) - ctx->lv_chain = item->li_lv; - else - lv->lv_next = item->li_lv; lv = item->li_lv; - item->li_lv = NULL; + lv->lv_order_id = item->li_order_id; num_iovecs += lv->lv_niovecs; - } + /* we don't write ordered log vectors */ + if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) + num_bytes += lv->lv_bytes; - /* - * initialise the new context and attach it to the CIL. Then attach - * the current context to the CIL committing list so it can be found - * during log forces to extract the commit lsn of the sequence that - * needs to be forced. - */ - INIT_LIST_HEAD(&new_ctx->committing); - INIT_LIST_HEAD(&new_ctx->busy_extents); - new_ctx->sequence = ctx->sequence + 1; - new_ctx->cil = cil; - cil->xc_ctx = new_ctx; + list_add_tail(&lv->lv_list, &ctx->lv_chain); + list_del_init(&item->li_cil); + item->li_order_id = 0; + item->li_lv = NULL; + } /* - * The switch is now done, so we can drop the context lock and move out + * Switch the contexts so we can drop the context lock and move out * of a shared context. We can't just go straight to the commit record, * though - we need to synchronise with previous and future commits so * that the commit records are correctly ordered in the log to ensure @@ -772,41 +955,55 @@ xlog_cil_push_work( * that higher sequences will wait for us to write out a commit record * before they do. * - * xfs_log_force_lsn requires us to mirror the new sequence into the cil + * xfs_log_force_seq requires us to mirror the new sequence into the cil * structure atomically with the addition of this sequence to the * committing list. This also ensures that we can do unlocked checks * against the current sequence in log forces without risking * deferencing a freed context pointer. */ spin_lock(&cil->xc_push_lock); - cil->xc_current_sequence = new_ctx->sequence; + xlog_cil_ctx_switch(cil, new_ctx); spin_unlock(&cil->xc_push_lock); up_write(&cil->xc_ctx_lock); /* + * Sort the log vector chain before we add the transaction headers. + * This ensures we always have the transaction headers at the start + * of the chain. + */ + list_sort(NULL, &ctx->lv_chain, xlog_cil_order_cmp); + + /* * Build a checkpoint transaction header and write it to the log to * begin the transaction. We need to account for the space used by the * transaction header here as it is not accounted for in xlog_write(). - * - * The LSN we need to pass to the log items on transaction commit is - * the LSN reported by the first log vector write. If we use the commit - * record lsn then we can move the tail beyond the grant write head. + * Add the lvhdr to the head of the lv chain we pass to xlog_write() so + * it gets written into the iclog first. + */ + xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs); + num_bytes += lvhdr.lv_bytes; + list_add(&lvhdr.lv_list, &ctx->lv_chain); + + /* + * Before we format and submit the first iclog, we have to ensure that + * the metadata writeback ordering cache flush is complete. */ - tic = ctx->ticket; - thdr.th_magic = XFS_TRANS_HEADER_MAGIC; - thdr.th_type = XFS_TRANS_CHECKPOINT; - thdr.th_tid = tic->t_tid; - thdr.th_num_items = num_iovecs; - lhdr.i_addr = &thdr; - lhdr.i_len = sizeof(xfs_trans_header_t); - lhdr.i_type = XLOG_REG_TYPE_TRANSHDR; - tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t); - - lvhdr.lv_niovecs = 1; - lvhdr.lv_iovecp = &lhdr; - lvhdr.lv_next = ctx->lv_chain; - - error = xlog_write(log, &lvhdr, tic, &ctx->start_lsn, NULL, 0, true); + wait_for_completion(&bdev_flush); + + /* + * The LSN we need to pass to the log items on transaction commit is the + * LSN reported by the first log vector write, not the commit lsn. If we + * use the commit record lsn then we can move the tail beyond the grant + * write head. + */ + error = xlog_write(log, &ctx->lv_chain, ctx->ticket, &ctx->start_lsn, + NULL, num_bytes); + + /* + * Take the lvhdr back off the lv_chain as it should not be passed + * to log IO completion. + */ + list_del(&lvhdr.lv_list); if (error) goto out_abort_free_ticket; @@ -844,16 +1041,14 @@ restart: } spin_unlock(&cil->xc_push_lock); - error = xlog_commit_record(log, tic, &commit_iclog, &commit_lsn); + error = xlog_commit_record(log, ctx->ticket, &commit_iclog, &commit_lsn); if (error) goto out_abort_free_ticket; - xfs_log_ticket_ungrant(log, tic); - spin_lock(&commit_iclog->ic_callback_lock); if (commit_iclog->ic_state == XLOG_STATE_IOERROR) { spin_unlock(&commit_iclog->ic_callback_lock); - goto out_abort; + goto out_abort_free_ticket; } ASSERT_ALWAYS(commit_iclog->ic_state == XLOG_STATE_ACTIVE || commit_iclog->ic_state == XLOG_STATE_WANT_SYNC); @@ -870,8 +1065,45 @@ restart: wake_up_all(&cil->xc_commit_wait); spin_unlock(&cil->xc_push_lock); - /* release the hounds! */ - xfs_log_release_iclog(commit_iclog); + /* + * Pull the ticket off the ctx so we can ungrant it after releasing the + * commit_iclog. The ctx may be freed by the time we return from + * releasing the commit_iclog (i.e. checkpoint has been completed and + * callback run) so we can't reference the ctx after the call to + * xlog_state_release_iclog(). + */ + ticket = ctx->ticket; + + /* + * If the checkpoint spans multiple iclogs, wait for all previous + * iclogs to complete before we submit the commit_iclog. In this case, + * the commit_iclog write needs to issue a pre-flush so that the + * ordering is correctly preserved down to stable storage. + */ + spin_lock(&log->l_icloglock); + if (ctx->start_lsn != commit_lsn) { + xlog_wait_on_iclog(commit_iclog->ic_prev); + spin_lock(&log->l_icloglock); + commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH; + } + + /* + * The commit iclog must be written to stable storage to guarantee + * journal IO vs metadata writeback IO is correctly ordered on stable + * storage. + * + * If the push caller needs the commit to be immediately stable and the + * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it + * will be written when released, switch it's state to WANT_SYNC right + * now. + */ + commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA; + if (push_commit_stable && commit_iclog->ic_state == XLOG_STATE_ACTIVE) + xlog_state_switch_iclogs(log, commit_iclog, 0); + xlog_state_release_iclog(log, commit_iclog, ticket); + spin_unlock(&log->l_icloglock); + + xfs_log_ticket_ungrant(log, ticket); return; out_skip: @@ -881,8 +1113,7 @@ out_skip: return; out_abort_free_ticket: - xfs_log_ticket_ungrant(log, tic); -out_abort: + xfs_log_ticket_ungrant(log, ctx->ticket); ASSERT(XLOG_FORCED_SHUTDOWN(log)); xlog_cil_committed(ctx); } @@ -899,18 +1130,27 @@ xlog_cil_push_background( struct xlog *log) __releases(cil->xc_ctx_lock) { struct xfs_cil *cil = log->l_cilp; + int space_used = atomic_read(&cil->xc_ctx->space_used); /* * The cil won't be empty because we are called while holding the - * context lock so whatever we added to the CIL will still be there + * context lock so whatever we added to the CIL will still be there. */ - ASSERT(!list_empty(&cil->xc_cil)); + ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)); /* - * don't do a background push if we haven't used up all the - * space available yet. + * We are done if: + * - we haven't used up all the space available yet; or + * - we've already queued up a push; and + * - we're not over the hard limit; and + * - nothing has been over the hard limit. + * + * If so, we don't need to take the push lock as there's nothing to do. */ - if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) { + if (space_used < XLOG_CIL_SPACE_LIMIT(log) || + (cil->xc_push_seq == cil->xc_current_sequence && + space_used < XLOG_CIL_BLOCKING_SPACE_LIMIT(log) && + !waitqueue_active(&cil->xc_push_wait))) { up_read(&cil->xc_ctx_lock); return; } @@ -918,7 +1158,7 @@ xlog_cil_push_background( spin_lock(&cil->xc_push_lock); if (cil->xc_push_seq < cil->xc_current_sequence) { cil->xc_push_seq = cil->xc_current_sequence; - queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work); + queue_work(log->l_mp->m_cil_workqueue, &cil->xc_ctx->push_work); } /* @@ -931,11 +1171,18 @@ xlog_cil_push_background( /* * If we are well over the space limit, throttle the work that is being - * done until the push work on this context has begun. + * done until the push work on this context has begun. Enforce the hard + * throttle on all transaction commits once it has been activated, even + * if the committing transactions have resulted in the space usage + * dipping back down under the hard limit. + * + * The ctx->xc_push_lock provides the serialisation necessary for safely + * using the lockless waitqueue_active() check in this context. */ - if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log)) { + if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) || + waitqueue_active(&cil->xc_push_wait)) { trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket); - ASSERT(cil->xc_ctx->space_used < log->l_logsize); + ASSERT(space_used < log->l_logsize); xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock); return; } @@ -947,13 +1194,26 @@ xlog_cil_push_background( /* * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence * number that is passed. When it returns, the work will be queued for - * @push_seq, but it won't be completed. The caller is expected to do any - * waiting for push_seq to complete if it is required. + * @push_seq, but it won't be completed. + * + * If the caller is performing a synchronous force, we will flush the workqueue + * to get previously queued work moving to minimise the wait time they will + * undergo waiting for all outstanding pushes to complete. The caller is + * expected to do the required waiting for push_seq to complete. + * + * If the caller is performing an async push, we need to ensure that the + * checkpoint is fully flushed out of the iclogs when we finish the push. If we + * don't do this, then the commit record may remain sitting in memory in an + * ACTIVE iclog. This then requires another full log force to push to disk, + * which defeats the purpose of having an async, non-blocking CIL force + * mechanism. Hence in this case we need to pass a flag to the push work to + * indicate it needs to flush the commit record itself. */ static void xlog_cil_push_now( struct xlog *log, - xfs_lsn_t push_seq) + xfs_lsn_t push_seq, + bool async) { struct xfs_cil *cil = log->l_cilp; @@ -963,20 +1223,23 @@ xlog_cil_push_now( ASSERT(push_seq && push_seq <= cil->xc_current_sequence); /* start on any pending background push to minimise wait time on it */ - flush_work(&cil->xc_push_work); + if (!async) + flush_workqueue(log->l_mp->m_cil_workqueue); /* * If the CIL is empty or we've already pushed the sequence then * there's no work we need to do. */ spin_lock(&cil->xc_push_lock); - if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) { + if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) || + push_seq <= cil->xc_push_seq) { spin_unlock(&cil->xc_push_lock); return; } cil->xc_push_seq = push_seq; - queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work); + cil->xc_push_commit_stable = async; + queue_work(log->l_mp->m_cil_workqueue, &cil->xc_ctx->push_work); spin_unlock(&cil->xc_push_lock); } @@ -988,7 +1251,7 @@ xlog_cil_empty( bool empty = false; spin_lock(&cil->xc_push_lock); - if (list_empty(&cil->xc_cil)) + if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) empty = true; spin_unlock(&cil->xc_push_lock); return empty; @@ -1008,16 +1271,14 @@ xlog_cil_empty( * allowed again. */ void -xfs_log_commit_cil( - struct xfs_mount *mp, +xlog_cil_commit( + struct xlog *log, struct xfs_trans *tp, - xfs_lsn_t *commit_lsn, + xfs_csn_t *commit_seq, bool regrant) { - struct xlog *log = mp->m_log; struct xfs_cil *cil = log->l_cilp; struct xfs_log_item *lip, *next; - xfs_lsn_t xc_commit_lsn; /* * Do all necessary memory allocation before we lock the CIL. @@ -1031,10 +1292,6 @@ xfs_log_commit_cil( xlog_cil_insert_items(log, tp); - xc_commit_lsn = cil->xc_ctx->sequence; - if (commit_lsn) - *commit_lsn = xc_commit_lsn; - if (regrant && !XLOG_FORCED_SHUTDOWN(log)) xfs_log_ticket_regrant(log, tp->t_ticket); else @@ -1057,27 +1314,44 @@ xfs_log_commit_cil( list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) { xfs_trans_del_item(lip); if (lip->li_ops->iop_committing) - lip->li_ops->iop_committing(lip, xc_commit_lsn); + lip->li_ops->iop_committing(lip, cil->xc_ctx->sequence); } + if (commit_seq) + *commit_seq = cil->xc_ctx->sequence; /* xlog_cil_push_background() releases cil->xc_ctx_lock */ xlog_cil_push_background(log); } /* + * Flush the CIL to stable storage but don't wait for it to complete. This + * requires the CIL push to ensure the commit record for the push hits the disk, + * but otherwise is no different to a push done from a log force. + */ +void +xlog_cil_flush( + struct xlog *log) +{ + xfs_csn_t seq = log->l_cilp->xc_current_sequence; + + trace_xfs_log_force(log->l_mp, seq, _RET_IP_); + xlog_cil_push_now(log, seq, true); +} + +/* * Conditionally push the CIL based on the sequence passed in. * - * We only need to push if we haven't already pushed the sequence - * number given. Hence the only time we will trigger a push here is - * if the push sequence is the same as the current context. + * We only need to push if we haven't already pushed the sequence number given. + * Hence the only time we will trigger a push here is if the push sequence is + * the same as the current context. * * We return the current commit lsn to allow the callers to determine if a * iclog flush is necessary following this call. */ xfs_lsn_t -xlog_cil_force_lsn( +xlog_cil_force_seq( struct xlog *log, - xfs_lsn_t sequence) + xfs_csn_t sequence) { struct xfs_cil *cil = log->l_cilp; struct xfs_cil_ctx *ctx; @@ -1085,13 +1359,17 @@ xlog_cil_force_lsn( ASSERT(sequence <= cil->xc_current_sequence); + if (!sequence) + sequence = cil->xc_current_sequence; + trace_xfs_log_force(log->l_mp, sequence, _RET_IP_); + /* * check to see if we need to force out the current context. * xlog_cil_push() handles racing pushes for the same sequence, * so no need to deal with it here. */ restart: - xlog_cil_push_now(log, sequence); + xlog_cil_push_now(log, sequence, false); /* * See if we can find a previous sequence still committing. @@ -1115,6 +1393,7 @@ restart: * It is still being pushed! Wait for the push to * complete, then start again from the beginning. */ + XFS_STATS_INC(log->l_mp, xs_log_force_sleep); xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock); goto restart; } @@ -1140,7 +1419,7 @@ restart: * we would have found the context on the committing list. */ if (sequence == cil->xc_current_sequence && - !list_empty(&cil->xc_cil)) { + !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) { spin_unlock(&cil->xc_push_lock); goto restart; } @@ -1173,21 +1452,126 @@ bool xfs_log_item_in_current_chkpt( struct xfs_log_item *lip) { - struct xfs_cil_ctx *ctx; + struct xfs_cil *cil = lip->li_mountp->m_log->l_cilp; - if (list_empty(&lip->li_cil)) + if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) return false; - ctx = lip->li_mountp->m_log->l_cilp->xc_ctx; - /* * li_seq is written on the first commit of a log item to record the * first checkpoint it is written to. Hence if it is different to the * current sequence, we're in a new checkpoint. */ - if (XFS_LSN_CMP(lip->li_seq, ctx->sequence) != 0) - return false; - return true; + return lip->li_seq == cil->xc_ctx->sequence; +} + +#ifdef CONFIG_HOTPLUG_CPU +static LIST_HEAD(xlog_cil_pcp_list); +static DEFINE_SPINLOCK(xlog_cil_pcp_lock); + +/* + * Move dead percpu state to the relevant CIL context structures. + * + * We have to lock the CIL context here to ensure that nothing is modifying + * the percpu state, either addition or removal. Both of these are done under + * the CIL context lock, so grabbing that exclusively here will ensure we can + * safely drain the cilpcp for the CPU that is dying. + */ +void +xlog_cil_pcp_dead( + unsigned int cpu) +{ + struct xfs_cil *cil, *n; + + spin_lock(&xlog_cil_pcp_lock); + list_for_each_entry_safe(cil, n, &xlog_cil_pcp_list, xc_pcp_list) { + struct xlog_cil_pcp *cilpcp = per_cpu_ptr(cil->xc_pcp, cpu); + struct xfs_cil_ctx *ctx; + + spin_unlock(&xlog_cil_pcp_lock); + down_write(&cil->xc_ctx_lock); + ctx = cil->xc_ctx; + + atomic_add(cilpcp->space_used, &ctx->space_used); + if (ctx->ticket) { + ctx->ticket->t_curr_res += cilpcp->space_reserved; + ctx->ticket->t_unit_res += cilpcp->space_reserved; + } + if (!list_empty(&cilpcp->busy_extents)) { + list_splice_init(&cilpcp->busy_extents, + &ctx->busy_extents); + } + if (!list_empty(&cilpcp->log_items)) + list_splice_init(&cilpcp->log_items, &ctx->log_items); + + cilpcp->space_used = 0; + cilpcp->space_reserved = 0; + + up_write(&cil->xc_ctx_lock); + spin_lock(&xlog_cil_pcp_lock); + } + spin_unlock(&xlog_cil_pcp_lock); +} + +static int +xlog_cil_pcp_hpadd( + struct xfs_cil *cil) +{ + INIT_LIST_HEAD(&cil->xc_pcp_list); + spin_lock(&xlog_cil_pcp_lock); + list_add(&cil->xc_pcp_list, &xlog_cil_pcp_list); + spin_unlock(&xlog_cil_pcp_lock); + return 0; +} + +static void +xlog_cil_pcp_hpremove( + struct xfs_cil *cil) +{ + spin_lock(&xlog_cil_pcp_lock); + list_del(&cil->xc_pcp_list); + spin_unlock(&xlog_cil_pcp_lock); +} + +#else /* !CONFIG_HOTPLUG_CPU */ +static inline int xlog_cil_pcp_hpadd(struct xfs_cil *cil) { return 0; } +static inline void xlog_cil_pcp_hpremove(struct xfs_cil *cil) {} +#endif + +static void __percpu * +xlog_cil_pcp_alloc( + struct xfs_cil *cil) +{ + struct xlog_cil_pcp *cilpcp; + void __percpu *pcp; + int cpu; + + pcp = alloc_percpu(struct xlog_cil_pcp); + if (!pcp) + return NULL; + + if (xlog_cil_pcp_hpadd(cil) < 0) { + free_percpu(pcp); + return NULL; + } + + for_each_possible_cpu(cpu) { + cilpcp = per_cpu_ptr(pcp, cpu); + INIT_LIST_HEAD(&cilpcp->busy_extents); + INIT_LIST_HEAD(&cilpcp->log_items); + } + return pcp; +} + +static void +xlog_cil_pcp_free( + struct xfs_cil *cil, + void __percpu *pcp) +{ + if (!pcp) + return; + xlog_cil_pcp_hpremove(cil); + free_percpu(pcp); } /* @@ -1204,30 +1588,23 @@ xlog_cil_init( if (!cil) return -ENOMEM; - ctx = kmem_zalloc(sizeof(*ctx), KM_MAYFAIL); - if (!ctx) { + cil->xc_log = log; + cil->xc_pcp = xlog_cil_pcp_alloc(cil); + if (!cil->xc_pcp) { kmem_free(cil); return -ENOMEM; } - INIT_WORK(&cil->xc_push_work, xlog_cil_push_work); - INIT_LIST_HEAD(&cil->xc_cil); INIT_LIST_HEAD(&cil->xc_committing); - spin_lock_init(&cil->xc_cil_lock); spin_lock_init(&cil->xc_push_lock); init_waitqueue_head(&cil->xc_push_wait); init_rwsem(&cil->xc_ctx_lock); init_waitqueue_head(&cil->xc_commit_wait); + log->l_cilp = cil; - INIT_LIST_HEAD(&ctx->committing); - INIT_LIST_HEAD(&ctx->busy_extents); - ctx->sequence = 1; - ctx->cil = cil; - cil->xc_ctx = ctx; - cil->xc_current_sequence = ctx->sequence; + ctx = xlog_cil_ctx_alloc(); + xlog_cil_ctx_switch(cil, ctx); - cil->xc_log = log; - log->l_cilp = cil; return 0; } @@ -1235,13 +1612,16 @@ void xlog_cil_destroy( struct xlog *log) { - if (log->l_cilp->xc_ctx) { - if (log->l_cilp->xc_ctx->ticket) - xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket); - kmem_free(log->l_cilp->xc_ctx); + struct xfs_cil *cil = log->l_cilp; + + if (cil->xc_ctx) { + if (cil->xc_ctx->ticket) + xfs_log_ticket_put(cil->xc_ctx->ticket); + kmem_free(cil->xc_ctx); } - ASSERT(list_empty(&log->l_cilp->xc_cil)); - kmem_free(log->l_cilp); + ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)); + xlog_cil_pcp_free(cil, cil->xc_pcp); + kmem_free(cil); } |