author | Jens Axboe <jens.axboe@oracle.com> | 2009-06-18 14:32:19 +0200
---|---|---
committer | Jens Axboe <jens.axboe@oracle.com> | 2009-06-18 14:32:19 +0200
commit | 6f44133e0505f8f1f8b4b2d376f150518ba168cd |
tree | 1ee440d68753b767db98d0ec374a95da20b039bf /fs |
parent | bbe340e2424be08500c88ed2575b4f3f1e0ce715 |
writeback: support > 1 flusher thread per bdi
Build on the bdi_writeback support by allowing registration of more
than one flusher thread per bdi. File systems can call
bdi_add_flusher_task(bdi) to add more flusher threads to the device.
If they do so, they must also provide a super_operations function
that returns the suitable bdi_writeback struct for any given inode.
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
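
To make the new interface concrete, here is a rough sketch of how a filesystem might opt in. The `->inode_get_wb` super_operations hook is the one this patch consults in `inode_get_wb()` below; `bdi_add_flusher_task()` is only named in the description, so its exact signature, the `myfs_*` names, `MYFS_NR_FLUSHERS`, `myfs_pick_wb()`, and the inode-hashing policy are illustrative assumptions, not part of the patch:

```c
/*
 * Hypothetical filesystem glue, not part of this patch: the myfs_*
 * names, MYFS_NR_FLUSHERS and myfs_pick_wb() are made up. Only the
 * ->inode_get_wb hook and bdi_add_flusher_task() come from the patch.
 */
#define MYFS_NR_FLUSHERS	4	/* made-up thread count */

static struct bdi_writeback *myfs_inode_get_wb(struct inode *inode)
{
	struct backing_dev_info *bdi = inode->i_mapping->backing_dev_info;

	/*
	 * myfs_pick_wb() is an assumed helper that maps an inode to one
	 * of the flusher threads added below, e.g. by hashing i_ino.
	 */
	return myfs_pick_wb(bdi, inode->i_ino);
}

static const struct super_operations myfs_sops = {
	.inode_get_wb	= myfs_inode_get_wb,
	/* ... the usual ops ... */
};

static void myfs_add_flushers(struct backing_dev_info *bdi)
{
	int i;

	/* One call per extra flusher thread (assumed usage). */
	for (i = 1; i < MYFS_NR_FLUSHERS; i++)
		bdi_add_flusher_task(bdi);
}
```

The apparent design point is that the kernel never picks a writeback thread for a multi-thread bdi itself: the filesystem owns the inode-to-flusher mapping, and any inode without a hook falls back to the default embedded thread.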
Diffstat (limited to 'fs')
-rw-r--r-- | fs/fs-writeback.c | 441
1 file changed, 364 insertions, 77 deletions
```diff
diff --git a/fs/fs-writeback.c b/fs/fs-writeback.c
index c6be5d2ca56e..38a1cdf2d27f 100644
--- a/fs/fs-writeback.c
+++ b/fs/fs-writeback.c
@@ -34,77 +34,230 @@
  */
 int nr_pdflush_threads;
 
-/**
- * writeback_acquire - attempt to get exclusive writeback access to a device
- * @bdi: the device's backing_dev_info structure
- *
- * It is a waste of resources to have more than one pdflush thread blocked on
- * a single request queue. Exclusion at the request_queue level is obtained
- * via a flag in the request_queue's backing_dev_info.state.
- *
- * Non-request_queue-backed address_spaces will share default_backing_dev_info,
- * unless they implement their own. Which is somewhat inefficient, as this
- * may prevent concurrent writeback against multiple devices.
+static void generic_sync_wb_inodes(struct bdi_writeback *wb,
+				   struct super_block *sb,
+				   struct writeback_control *wbc);
+
+/*
+ * Work items for the bdi_writeback threads
  */
-static int writeback_acquire(struct bdi_writeback *wb)
+struct bdi_work {
+	struct list_head list;
+	struct list_head wait_list;
+	struct rcu_head rcu_head;
+
+	unsigned long seen;
+	atomic_t pending;
+
+	unsigned long sb_data;
+	unsigned long nr_pages;
+	enum writeback_sync_modes sync_mode;
+
+	unsigned long state;
+};
+
+static struct super_block *bdi_work_sb(struct bdi_work *work)
 {
-	struct backing_dev_info *bdi = wb->bdi;
+	return (struct super_block *) (work->sb_data & ~1UL);
+}
+
+static inline bool bdi_work_on_stack(struct bdi_work *work)
+{
+	return work->sb_data & 1UL;
+}
+
+static inline void bdi_work_init(struct bdi_work *work, struct super_block *sb,
+				 unsigned long nr_pages,
+				 enum writeback_sync_modes sync_mode)
+{
+	INIT_RCU_HEAD(&work->rcu_head);
+	work->sb_data = (unsigned long) sb;
+	work->nr_pages = nr_pages;
+	work->sync_mode = sync_mode;
+	work->state = 1;
+}
 
-	return !test_and_set_bit(wb->nr, &bdi->wb_active);
+static inline void bdi_work_init_on_stack(struct bdi_work *work,
+					  struct super_block *sb,
+					  unsigned long nr_pages,
+					  enum writeback_sync_modes sync_mode)
+{
+	bdi_work_init(work, sb, nr_pages, sync_mode);
+	work->sb_data |= 1UL;
 }
 
 /**
  * writeback_in_progress - determine whether there is writeback in progress
  * @bdi: the device's backing_dev_info structure.
  *
- * Determine whether there is writeback in progress against a backing device.
+ * Determine whether there is writeback waiting to be handled against a
+ * backing device.
  */
 int writeback_in_progress(struct backing_dev_info *bdi)
 {
-	return bdi->wb_active != 0;
+	return !list_empty(&bdi->work_list);
 }
 
-/**
- * writeback_release - relinquish exclusive writeback access against a device.
- * @bdi: the device's backing_dev_info structure
- */
-static void writeback_release(struct bdi_writeback *wb)
+static void bdi_work_clear(struct bdi_work *work)
 {
-	struct backing_dev_info *bdi = wb->bdi;
+	clear_bit(0, &work->state);
+	smp_mb__after_clear_bit();
+	wake_up_bit(&work->state, 0);
+}
+
+static void bdi_work_free(struct rcu_head *head)
+{
+	struct bdi_work *work = container_of(head, struct bdi_work, rcu_head);
 
-	wb->nr_pages = 0;
-	wb->sb = NULL;
-	clear_bit(wb->nr, &bdi->wb_active);
+	if (!bdi_work_on_stack(work))
+		kfree(work);
+	else
+		bdi_work_clear(work);
 }
 
-static void wb_start_writeback(struct bdi_writeback *wb, struct super_block *sb,
-			       long nr_pages,
-			       enum writeback_sync_modes sync_mode)
+static void wb_work_complete(struct bdi_work *work)
 {
-	if (!wb_has_dirty_io(wb))
-		return;
+	const enum writeback_sync_modes sync_mode = work->sync_mode;
 
-	if (writeback_acquire(wb)) {
-		wb->nr_pages = nr_pages;
-		wb->sb = sb;
-		wb->sync_mode = sync_mode;
+	/*
+	 * For allocated work, we can clear the done/seen bit right here.
+	 * For on-stack work, we need to postpone both the clear and free
+	 * to after the RCU grace period, since the stack could be invalidated
+	 * as soon as bdi_work_clear() has done the wakeup.
+	 */
+	if (!bdi_work_on_stack(work))
+		bdi_work_clear(work);
+	if (sync_mode == WB_SYNC_NONE || bdi_work_on_stack(work))
+		call_rcu(&work->rcu_head, bdi_work_free);
+}
 
-		if (wb->task)
-			wake_up_process(wb->task);
+static void wb_clear_pending(struct bdi_writeback *wb, struct bdi_work *work)
+{
+	/*
+	 * The caller has retrieved the work arguments from this work,
+	 * drop our reference. If this is the last ref, delete and free it
+	 */
+	if (atomic_dec_and_test(&work->pending)) {
+		struct backing_dev_info *bdi = wb->bdi;
+
+		spin_lock(&bdi->wb_lock);
+		list_del_rcu(&work->list);
+		spin_unlock(&bdi->wb_lock);
+
+		wb_work_complete(work);
 	}
 }
 
-void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
-			 long nr_pages, enum writeback_sync_modes sync_mode)
+static void wb_start_writeback(struct bdi_writeback *wb, struct bdi_work *work)
+{
+	/*
+	 * If we failed allocating the bdi work item, wake up the wb thread
+	 * always. As a safety precaution, it'll flush out everything
+	 */
+	if (!wb_has_dirty_io(wb) && work)
+		wb_clear_pending(wb, work);
+	else if (wb->task)
+		wake_up_process(wb->task);
+}
+
+static void bdi_sched_work(struct backing_dev_info *bdi, struct bdi_work *work)
 {
+	if (!bdi_wblist_needs_lock(bdi))
+		wb_start_writeback(&bdi->wb, work);
+	else {
+		struct bdi_writeback *wb;
+		int idx;
+
+		idx = srcu_read_lock(&bdi->srcu);
+
+		list_for_each_entry_rcu(wb, &bdi->wb_list, list)
+			wb_start_writeback(wb, work);
+
+		srcu_read_unlock(&bdi->srcu, idx);
+	}
+}
+
+static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work)
+{
+	if (work) {
+		work->seen = bdi->wb_mask;
+		BUG_ON(!work->seen);
+		atomic_set(&work->pending, bdi->wb_cnt);
+		BUG_ON(!bdi->wb_cnt);
+
+		/*
+		 * Make sure stores are seen before it appears on the list
+		 */
+		smp_mb();
+
+		spin_lock(&bdi->wb_lock);
+		list_add_tail_rcu(&work->list, &bdi->work_list);
+		spin_unlock(&bdi->wb_lock);
+	}
+
 	/*
-	 * This only happens the first time someone kicks this bdi, so put
-	 * it out-of-line.
+	 * If the default thread isn't there, make sure we add it. When
+	 * it gets created and wakes up, we'll run this work.
 	 */
-	if (unlikely(!bdi->wb.task))
+	if (unlikely(list_empty_careful(&bdi->wb_list)))
 		wake_up_process(default_backing_dev_info.wb.task);
+	else
+		bdi_sched_work(bdi, work);
+}
+
+/*
+ * Used for on-stack allocated work items. The caller needs to wait until
+ * the wb threads have acked the work before it's safe to continue.
+ */
+static void bdi_wait_on_work_clear(struct bdi_work *work)
+{
+	wait_on_bit(&work->state, 0, bdi_sched_wait, TASK_UNINTERRUPTIBLE);
+}
+
+static struct bdi_work *bdi_alloc_work(struct super_block *sb, long nr_pages,
+				       enum writeback_sync_modes sync_mode)
+{
+	struct bdi_work *work;
+
+	work = kmalloc(sizeof(*work), GFP_ATOMIC);
+	if (work)
+		bdi_work_init(work, sb, nr_pages, sync_mode);
+
+	return work;
+}
+
+void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
+			 long nr_pages, enum writeback_sync_modes sync_mode)
+{
+	const bool must_wait = sync_mode == WB_SYNC_ALL;
+	struct bdi_work work_stack, *work = NULL;
+
+	if (!must_wait)
+		work = bdi_alloc_work(sb, nr_pages, sync_mode);
+
+	if (!work) {
+		work = &work_stack;
+		bdi_work_init_on_stack(work, sb, nr_pages, sync_mode);
+	}
 
-	wb_start_writeback(&bdi->wb, sb, nr_pages, sync_mode);
+	bdi_queue_work(bdi, work);
+
+	/*
+	 * If the sync mode is WB_SYNC_ALL, block waiting for the work to
+	 * complete. If not, we only need to wait for the work to be started,
+	 * if we allocated it on-stack. We use the same mechanism, if the
+	 * wait bit is set in the bdi_work struct, then threads will not
+	 * clear pending until after they are done.
+	 *
+	 * Note that work == &work_stack if must_wait is true, so we don't
+	 * need to do call_rcu() here ever, since the completion path will
+	 * have done that for us.
+	 */
+	if (must_wait || work == &work_stack) {
+		bdi_wait_on_work_clear(work);
+		if (work != &work_stack)
+			call_rcu(&work->rcu_head, bdi_work_free);
+	}
 }
 
 /*
@@ -154,7 +307,7 @@ static void wb_kupdated(struct bdi_writeback *wb)
 		wbc.more_io = 0;
 		wbc.encountered_congestion = 0;
 		wbc.nr_to_write = MAX_WRITEBACK_PAGES;
-		generic_sync_bdi_inodes(NULL, &wbc);
+		generic_sync_wb_inodes(wb, NULL, &wbc);
 		if (wbc.nr_to_write > 0)
 			break;	/* All the old data is written */
 		nr_to_write -= MAX_WRITEBACK_PAGES;
@@ -171,22 +324,19 @@ static inline bool over_bground_thresh(void)
 		global_page_state(NR_UNSTABLE_NFS) >= background_thresh);
 }
 
-static void generic_sync_wb_inodes(struct bdi_writeback *wb,
-				   struct super_block *sb,
-				   struct writeback_control *wbc);
-
-static void wb_writeback(struct bdi_writeback *wb)
+static void __wb_writeback(struct bdi_writeback *wb, long nr_pages,
+			   struct super_block *sb,
+			   enum writeback_sync_modes sync_mode)
 {
 	struct writeback_control wbc = {
 		.bdi			= wb->bdi,
-		.sync_mode		= wb->sync_mode,
+		.sync_mode		= sync_mode,
 		.older_than_this	= NULL,
 		.range_cyclic		= 1,
 	};
-	long nr_pages = wb->nr_pages;
 
 	for (;;) {
-		if (wbc.sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
+		if (sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
 		    !over_bground_thresh())
 			break;
 
@@ -194,7 +344,7 @@ static void wb_writeback(struct bdi_writeback *wb)
 		wbc.encountered_congestion = 0;
 		wbc.nr_to_write = MAX_WRITEBACK_PAGES;
 		wbc.pages_skipped = 0;
-		generic_sync_wb_inodes(wb, wb->sb, &wbc);
+		generic_sync_wb_inodes(wb, sb, &wbc);
 		nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
 		/*
 		 * If we ran out of stuff to write, bail unless more_io got set
@@ -208,6 +358,90 @@ static void wb_writeback(struct bdi_writeback *wb)
 }
 
 /*
+ * Return the next bdi_work struct that hasn't been processed by this
+ * wb thread yet
+ */
+static struct bdi_work *get_next_work_item(struct backing_dev_info *bdi,
+					   struct bdi_writeback *wb)
+{
+	struct bdi_work *work, *ret = NULL;
+
+	rcu_read_lock();
+
+	list_for_each_entry_rcu(work, &bdi->work_list, list) {
+		if (!test_and_clear_bit(wb->nr, &work->seen))
+			continue;
+
+		ret = work;
+		break;
+	}
+
+	rcu_read_unlock();
+	return ret;
+}
+
+/*
+ * Retrieve work items and do the writeback they describe
+ */
+static void wb_writeback(struct bdi_writeback *wb, int force_wait)
+{
+	struct backing_dev_info *bdi = wb->bdi;
+	struct bdi_work *work;
+
+	while ((work = get_next_work_item(bdi, wb)) != NULL) {
+		struct super_block *sb = bdi_work_sb(work);
+		long nr_pages = work->nr_pages;
+		enum writeback_sync_modes sync_mode;
+
+		/*
+		 * Override sync mode, in case we must wait for completion
+		 */
+		if (force_wait)
+			work->sync_mode = sync_mode = WB_SYNC_ALL;
+		else
+			sync_mode = work->sync_mode;
+
+		/*
+		 * If this isn't a data integrity operation, just notify
+		 * that we have seen this work and we are now starting it.
+		 */
+		if (sync_mode == WB_SYNC_NONE)
+			wb_clear_pending(wb, work);
+
+		__wb_writeback(wb, nr_pages, sb, sync_mode);
+
+		/*
+		 * This is a data integrity writeback, so only do the
+		 * notification when we have completed the work.
+		 */
+		if (sync_mode == WB_SYNC_ALL)
+			wb_clear_pending(wb, work);
+	}
+}
+
+/*
+ * This will be inlined in bdi_writeback_task() once we get rid of any
+ * dirty inodes on the default_backing_dev_info
+ */
+void wb_do_writeback(struct bdi_writeback *wb, int force_wait)
+{
+	/*
+	 * We get here in two cases:
+	 *
+	 *  schedule_timeout() returned because the dirty writeback
+	 *  interval has elapsed. If that happens, the work item list
+	 *  will be empty and we will proceed to do kupdated style writeout.
+	 *
+	 *  Someone called bdi_start_writeback(), which put one/more work
+	 *  items on the work_list. Process those.
+	 */
+	if (list_empty(&wb->bdi->work_list))
+		wb_kupdated(wb);
+	else
+		wb_writeback(wb, force_wait);
+}
+
+/*
  * Handle writeback of dirty data for the device backed by this bdi. Also
  * wakes up periodically and does kupdated style flushing.
  */
@@ -216,26 +450,7 @@ int bdi_writeback_task(struct bdi_writeback *wb)
 	while (!kthread_should_stop()) {
 		unsigned long wait_jiffies;
 
-		/*
-		 * We get here in two cases:
-		 *
-		 *  schedule_timeout() returned because the dirty writeback
-		 *  interval has elapsed. If that happens, we will be able
-		 *  to acquire the writeback lock and will proceed to do
-		 *  kupdated style writeout.
-		 *
-		 *  Someone called bdi_start_writeback(), which will acquire
-		 *  the writeback lock. This means our writeback_acquire()
-		 *  below will fail and we call into bdi_pdflush() for
-		 *  pdflush style writeout.
-		 *
-		 */
-		if (writeback_acquire(wb))
-			wb_kupdated(wb);
-		else
-			wb_writeback(wb);
-
-		writeback_release(wb);
+		wb_do_writeback(wb, 0);
 
 		wait_jiffies = msecs_to_jiffies(dirty_writeback_interval * 10);
 		set_current_state(TASK_INTERRUPTIBLE);
@@ -246,19 +461,68 @@ int bdi_writeback_task(struct bdi_writeback *wb)
 	return 0;
 }
 
+/*
+ * Schedule writeback for all backing devices. Expensive! If this is a data
+ * integrity operation, writeback will be complete when this returns. If
+ * we are simply called for WB_SYNC_NONE, then writeback will merely be
+ * scheduled to run.
+ */
 void bdi_writeback_all(struct super_block *sb, struct writeback_control *wbc)
 {
+	const bool must_wait = wbc->sync_mode == WB_SYNC_ALL;
 	struct backing_dev_info *bdi;
+	struct bdi_work *work;
+	LIST_HEAD(list);
 
+restart:
 	spin_lock(&bdi_lock);
 
 	list_for_each_entry(bdi, &bdi_list, bdi_list) {
+		struct bdi_work *work;
+
 		if (!bdi_has_dirty_io(bdi))
 			continue;
-		bdi_start_writeback(bdi, sb, wbc->nr_to_write, wbc->sync_mode);
+
+		/*
+		 * If work allocation fails, do the writes inline. We drop
+		 * the lock and restart the list writeout. This should be OK,
+		 * since this happens rarely and because the writeout should
+		 * eventually make more free memory available.
+		 */
+		work = bdi_alloc_work(sb, wbc->nr_to_write, wbc->sync_mode);
+		if (!work) {
+			struct writeback_control __wbc = *wbc;
+
+			/*
+			 * Not a data integrity writeout, just continue
+			 */
+			if (!must_wait)
+				continue;
+
+			spin_unlock(&bdi_lock);
+			__wbc = *wbc;
+			__wbc.bdi = bdi;
+			generic_sync_bdi_inodes(sb, &__wbc);
+			goto restart;
+		}
+		if (must_wait)
+			list_add_tail(&work->wait_list, &list);
+
+		bdi_queue_work(bdi, work);
 	}
 
 	spin_unlock(&bdi_lock);
+
+	/*
+	 * If this is for WB_SYNC_ALL, wait for pending work to complete
+	 * before returning.
+	 */
+	while (!list_empty(&list)) {
+		work = list_entry(list.next, struct bdi_work, wait_list);
+		list_del(&work->wait_list);
+		bdi_wait_on_work_clear(work);
+		call_rcu(&work->rcu_head, bdi_work_free);
+	}
 }
 
 static noinline void block_dump___mark_inode_dirty(struct inode *inode)
@@ -284,11 +548,18 @@ static noinline void block_dump___mark_inode_dirty(struct inode *inode)
 }
 
 /*
- * We have only a single wb per bdi, so just return that.
+ * If the filesystem didn't provide a way to map an inode to a dedicated
+ * flusher thread, it doesn't support more than 1 thread. So we know it's
+ * the default thread, return that.
  */
 static inline struct bdi_writeback *inode_get_wb(struct inode *inode)
 {
-	return &inode_to_bdi(inode)->wb;
+	const struct super_operations *sop = inode->i_sb->s_op;
+
+	if (!sop->inode_get_wb)
+		return &inode_to_bdi(inode)->wb;
+
+	return sop->inode_get_wb(inode);
 }
 
 /**
@@ -725,8 +996,24 @@ void generic_sync_bdi_inodes(struct super_block *sb,
 			     struct writeback_control *wbc)
 {
 	struct backing_dev_info *bdi = wbc->bdi;
+	struct bdi_writeback *wb;
 
-	generic_sync_wb_inodes(&bdi->wb, sb, wbc);
+	/*
+	 * Common case is just a single wb thread and that is embedded in
+	 * the bdi, so it doesn't need locking
+	 */
+	if (!bdi_wblist_needs_lock(bdi))
+		generic_sync_wb_inodes(&bdi->wb, sb, wbc);
+	else {
+		int idx;
+
+		idx = srcu_read_lock(&bdi->srcu);
+
+		list_for_each_entry_rcu(wb, &bdi->wb_list, list)
+			generic_sync_wb_inodes(wb, sb, wbc);
+
+		srcu_read_unlock(&bdi->srcu, idx);
+	}
 }
 
 /*
@@ -753,7 +1040,7 @@ void generic_sync_sb_inodes(struct super_block *sb,
 				struct writeback_control *wbc)
 {
 	if (wbc->bdi)
-		generic_sync_bdi_inodes(sb, wbc);
+		bdi_start_writeback(wbc->bdi, sb, wbc->nr_to_write, wbc->sync_mode);
 	else
 		bdi_writeback_all(sb, wbc);
```
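
One implementation detail worth noting from the patch: bdi_work stores the super_block pointer and an "allocated on stack" flag in a single unsigned long (sb_data), using bit 0 as the tag (see bdi_work_init_on_stack(), bdi_work_sb() and bdi_work_on_stack() above). That only works because a struct super_block is aligned to more than one byte, so bit 0 of its address is always zero. A minimal, self-contained userspace sketch of the same pointer-tagging trick, with made-up names rather than the kernel code:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

struct sb { int dummy; };	/* stand-in for struct super_block */

/* Pack a pointer plus an on-stack flag into one word, like bdi_work_init_on_stack(). */
static uintptr_t pack(struct sb *sb, bool on_stack)
{
	uintptr_t v = (uintptr_t) sb;

	assert((v & 1UL) == 0);		/* relies on >= 2-byte alignment */
	return v | (on_stack ? 1UL : 0UL);
}

static struct sb *unpack_sb(uintptr_t v)
{
	return (struct sb *) (v & ~1UL);	/* mirrors bdi_work_sb() */
}

static bool unpack_on_stack(uintptr_t v)
{
	return v & 1UL;				/* mirrors bdi_work_on_stack() */
}

int main(void)
{
	struct sb sb;
	uintptr_t v = pack(&sb, true);

	printf("sb=%p on_stack=%d\n", (void *) unpack_sb(v), unpack_on_stack(v));
	return 0;
}
```

The tag is what lets bdi_work_free() distinguish kfree()-able work items from on-stack ones, which must only be signalled via bdi_work_clear() and never freed.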