summaryrefslogtreecommitdiff
path: root/fs
diff options
context:
space:
mode:
authorJens Axboe <jens.axboe@oracle.com>2009-06-18 14:32:19 +0200
committerJens Axboe <jens.axboe@oracle.com>2009-06-18 14:32:19 +0200
commit6f44133e0505f8f1f8b4b2d376f150518ba168cd (patch)
tree1ee440d68753b767db98d0ec374a95da20b039bf /fs
parentbbe340e2424be08500c88ed2575b4f3f1e0ce715 (diff)
writeback: support > 1 flusher thread per bdi
Build on the bdi_writeback support by allowing registration of more than 1 flusher thread. File systems can call bdi_add_flusher_task(bdi) to add more flusher threads to the device. If they do so, they must also provide a super_operations function to return the suitable bdi_writeback struct from any given inode. Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/fs-writeback.c441
1 files changed, 364 insertions, 77 deletions
diff --git a/fs/fs-writeback.c b/fs/fs-writeback.c
index c6be5d2ca56e..38a1cdf2d27f 100644
--- a/fs/fs-writeback.c
+++ b/fs/fs-writeback.c
@@ -34,77 +34,230 @@
*/
int nr_pdflush_threads;
-/**
- * writeback_acquire - attempt to get exclusive writeback access to a device
- * @bdi: the device's backing_dev_info structure
- *
- * It is a waste of resources to have more than one pdflush thread blocked on
- * a single request queue. Exclusion at the request_queue level is obtained
- * via a flag in the request_queue's backing_dev_info.state.
- *
- * Non-request_queue-backed address_spaces will share default_backing_dev_info,
- * unless they implement their own. Which is somewhat inefficient, as this
- * may prevent concurrent writeback against multiple devices.
+static void generic_sync_wb_inodes(struct bdi_writeback *wb,
+ struct super_block *sb,
+ struct writeback_control *wbc);
+
+/*
+ * Work items for the bdi_writeback threads
*/
-static int writeback_acquire(struct bdi_writeback *wb)
+struct bdi_work {
+ struct list_head list;
+ struct list_head wait_list;
+ struct rcu_head rcu_head;
+
+ unsigned long seen;
+ atomic_t pending;
+
+ unsigned long sb_data;
+ unsigned long nr_pages;
+ enum writeback_sync_modes sync_mode;
+
+ unsigned long state;
+};
+
+static struct super_block *bdi_work_sb(struct bdi_work *work)
{
- struct backing_dev_info *bdi = wb->bdi;
+ return (struct super_block *) (work->sb_data & ~1UL);
+}
+
+static inline bool bdi_work_on_stack(struct bdi_work *work)
+{
+ return work->sb_data & 1UL;
+}
+
+static inline void bdi_work_init(struct bdi_work *work, struct super_block *sb,
+ unsigned long nr_pages,
+ enum writeback_sync_modes sync_mode)
+{
+ INIT_RCU_HEAD(&work->rcu_head);
+ work->sb_data = (unsigned long) sb;
+ work->nr_pages = nr_pages;
+ work->sync_mode = sync_mode;
+ work->state = 1;
+}
- return !test_and_set_bit(wb->nr, &bdi->wb_active);
+static inline void bdi_work_init_on_stack(struct bdi_work *work,
+ struct super_block *sb,
+ unsigned long nr_pages,
+ enum writeback_sync_modes sync_mode)
+{
+ bdi_work_init(work, sb, nr_pages, sync_mode);
+ work->sb_data |= 1UL;
}
/**
* writeback_in_progress - determine whether there is writeback in progress
* @bdi: the device's backing_dev_info structure.
*
- * Determine whether there is writeback in progress against a backing device.
+ * Determine whether there is writeback waiting to be handled against a
+ * backing device.
*/
int writeback_in_progress(struct backing_dev_info *bdi)
{
- return bdi->wb_active != 0;
+ return !list_empty(&bdi->work_list);
}
-/**
- * writeback_release - relinquish exclusive writeback access against a device.
- * @bdi: the device's backing_dev_info structure
- */
-static void writeback_release(struct bdi_writeback *wb)
+static void bdi_work_clear(struct bdi_work *work)
{
- struct backing_dev_info *bdi = wb->bdi;
+ clear_bit(0, &work->state);
+ smp_mb__after_clear_bit();
+ wake_up_bit(&work->state, 0);
+}
+
+static void bdi_work_free(struct rcu_head *head)
+{
+ struct bdi_work *work = container_of(head, struct bdi_work, rcu_head);
- wb->nr_pages = 0;
- wb->sb = NULL;
- clear_bit(wb->nr, &bdi->wb_active);
+ if (!bdi_work_on_stack(work))
+ kfree(work);
+ else
+ bdi_work_clear(work);
}
-static void wb_start_writeback(struct bdi_writeback *wb, struct super_block *sb,
- long nr_pages,
- enum writeback_sync_modes sync_mode)
+static void wb_work_complete(struct bdi_work *work)
{
- if (!wb_has_dirty_io(wb))
- return;
+ const enum writeback_sync_modes sync_mode = work->sync_mode;
- if (writeback_acquire(wb)) {
- wb->nr_pages = nr_pages;
- wb->sb = sb;
- wb->sync_mode = sync_mode;
+ /*
+ * For allocated work, we can clear the done/seen bit right here.
+ * For on-stack work, we need to postpone both the clear and free
+ * to after the RCU grace period, since the stack could be invalidated
+ * as soon as bdi_work_clear() has done the wakeup.
+ */
+ if (!bdi_work_on_stack(work))
+ bdi_work_clear(work);
+ if (sync_mode == WB_SYNC_NONE || bdi_work_on_stack(work))
+ call_rcu(&work->rcu_head, bdi_work_free);
+}
- if (wb->task)
- wake_up_process(wb->task);
+static void wb_clear_pending(struct bdi_writeback *wb, struct bdi_work *work)
+{
+ /*
+ * The caller has retrieved the work arguments from this work,
+ * drop our reference. If this is the last ref, delete and free it
+ */
+ if (atomic_dec_and_test(&work->pending)) {
+ struct backing_dev_info *bdi = wb->bdi;
+
+ spin_lock(&bdi->wb_lock);
+ list_del_rcu(&work->list);
+ spin_unlock(&bdi->wb_lock);
+
+ wb_work_complete(work);
}
}
-void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
- long nr_pages, enum writeback_sync_modes sync_mode)
+static void wb_start_writeback(struct bdi_writeback *wb, struct bdi_work *work)
+{
+ /*
+ * If we failed allocating the bdi work item, wake up the wb thread
+ * always. As a safety precaution, it'll flush out everything
+ */
+ if (!wb_has_dirty_io(wb) && work)
+ wb_clear_pending(wb, work);
+ else if (wb->task)
+ wake_up_process(wb->task);
+}
+
+static void bdi_sched_work(struct backing_dev_info *bdi, struct bdi_work *work)
{
+ if (!bdi_wblist_needs_lock(bdi))
+ wb_start_writeback(&bdi->wb, work);
+ else {
+ struct bdi_writeback *wb;
+ int idx;
+
+ idx = srcu_read_lock(&bdi->srcu);
+
+ list_for_each_entry_rcu(wb, &bdi->wb_list, list)
+ wb_start_writeback(wb, work);
+
+ srcu_read_unlock(&bdi->srcu, idx);
+ }
+}
+
+static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work)
+{
+ if (work) {
+ work->seen = bdi->wb_mask;
+ BUG_ON(!work->seen);
+ atomic_set(&work->pending, bdi->wb_cnt);
+ BUG_ON(!bdi->wb_cnt);
+
+ /*
+ * Make sure stores are seen before it appears on the list
+ */
+ smp_mb();
+
+ spin_lock(&bdi->wb_lock);
+ list_add_tail_rcu(&work->list, &bdi->work_list);
+ spin_unlock(&bdi->wb_lock);
+ }
+
/*
- * This only happens the first time someone kicks this bdi, so put
- * it out-of-line.
+ * If the default thread isn't there, make sure we add it. When
+ * it gets created and wakes up, we'll run this work.
*/
- if (unlikely(!bdi->wb.task))
+ if (unlikely(list_empty_careful(&bdi->wb_list)))
wake_up_process(default_backing_dev_info.wb.task);
+ else
+ bdi_sched_work(bdi, work);
+}
+
+/*
+ * Used for on-stack allocated work items. The caller needs to wait until
+ * the wb threads have acked the work before it's safe to continue.
+ */
+static void bdi_wait_on_work_clear(struct bdi_work *work)
+{
+ wait_on_bit(&work->state, 0, bdi_sched_wait, TASK_UNINTERRUPTIBLE);
+}
+
+static struct bdi_work *bdi_alloc_work(struct super_block *sb, long nr_pages,
+ enum writeback_sync_modes sync_mode)
+{
+ struct bdi_work *work;
+
+ work = kmalloc(sizeof(*work), GFP_ATOMIC);
+ if (work)
+ bdi_work_init(work, sb, nr_pages, sync_mode);
+
+ return work;
+}
+
+void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
+ long nr_pages, enum writeback_sync_modes sync_mode)
+{
+ const bool must_wait = sync_mode == WB_SYNC_ALL;
+ struct bdi_work work_stack, *work = NULL;
+
+ if (!must_wait)
+ work = bdi_alloc_work(sb, nr_pages, sync_mode);
+
+ if (!work) {
+ work = &work_stack;
+ bdi_work_init_on_stack(work, sb, nr_pages, sync_mode);
+ }
- wb_start_writeback(&bdi->wb, sb, nr_pages, sync_mode);
+ bdi_queue_work(bdi, work);
+
+ /*
+ * If the sync mode is WB_SYNC_ALL, block waiting for the work to
+ * complete. If not, we only need to wait for the work to be started,
+ * if we allocated it on-stack. We use the same mechanism, if the
+ * wait bit is set in the bdi_work struct, then threads will not
+ * clear pending until after they are done.
+ *
+ * Note that work == &work_stack if must_wait is true, so we don't
+ * need to do call_rcu() here ever, since the completion path will
+ * have done that for us.
+ */
+ if (must_wait || work == &work_stack) {
+ bdi_wait_on_work_clear(work);
+ if (work != &work_stack)
+ call_rcu(&work->rcu_head, bdi_work_free);
+ }
}
/*
@@ -154,7 +307,7 @@ static void wb_kupdated(struct bdi_writeback *wb)
wbc.more_io = 0;
wbc.encountered_congestion = 0;
wbc.nr_to_write = MAX_WRITEBACK_PAGES;
- generic_sync_bdi_inodes(NULL, &wbc);
+ generic_sync_wb_inodes(wb, NULL, &wbc);
if (wbc.nr_to_write > 0)
break; /* All the old data is written */
nr_to_write -= MAX_WRITEBACK_PAGES;
@@ -171,22 +324,19 @@ static inline bool over_bground_thresh(void)
global_page_state(NR_UNSTABLE_NFS) >= background_thresh);
}
-static void generic_sync_wb_inodes(struct bdi_writeback *wb,
- struct super_block *sb,
- struct writeback_control *wbc);
-
-static void wb_writeback(struct bdi_writeback *wb)
+static void __wb_writeback(struct bdi_writeback *wb, long nr_pages,
+ struct super_block *sb,
+ enum writeback_sync_modes sync_mode)
{
struct writeback_control wbc = {
.bdi = wb->bdi,
- .sync_mode = wb->sync_mode,
+ .sync_mode = sync_mode,
.older_than_this = NULL,
.range_cyclic = 1,
};
- long nr_pages = wb->nr_pages;
for (;;) {
- if (wbc.sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
+ if (sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
!over_bground_thresh())
break;
@@ -194,7 +344,7 @@ static void wb_writeback(struct bdi_writeback *wb)
wbc.encountered_congestion = 0;
wbc.nr_to_write = MAX_WRITEBACK_PAGES;
wbc.pages_skipped = 0;
- generic_sync_wb_inodes(wb, wb->sb, &wbc);
+ generic_sync_wb_inodes(wb, sb, &wbc);
nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
/*
* If we ran out of stuff to write, bail unless more_io got set
@@ -208,6 +358,90 @@ static void wb_writeback(struct bdi_writeback *wb)
}
/*
+ * Return the next bdi_work struct that hasn't been processed by this
+ * wb thread yet
+ */
+static struct bdi_work *get_next_work_item(struct backing_dev_info *bdi,
+ struct bdi_writeback *wb)
+{
+ struct bdi_work *work, *ret = NULL;
+
+ rcu_read_lock();
+
+ list_for_each_entry_rcu(work, &bdi->work_list, list) {
+ if (!test_and_clear_bit(wb->nr, &work->seen))
+ continue;
+
+ ret = work;
+ break;
+ }
+
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Retrieve work items and do the writeback they describe
+ */
+static void wb_writeback(struct bdi_writeback *wb, int force_wait)
+{
+ struct backing_dev_info *bdi = wb->bdi;
+ struct bdi_work *work;
+
+ while ((work = get_next_work_item(bdi, wb)) != NULL) {
+ struct super_block *sb = bdi_work_sb(work);
+ long nr_pages = work->nr_pages;
+ enum writeback_sync_modes sync_mode;
+
+ /*
+ * Override sync mode, in case we must wait for completion
+ */
+ if (force_wait)
+ work->sync_mode = sync_mode = WB_SYNC_ALL;
+ else
+ sync_mode = work->sync_mode;
+
+ /*
+ * If this isn't a data integrity operation, just notify
+ * that we have seen this work and we are now starting it.
+ */
+ if (sync_mode == WB_SYNC_NONE)
+ wb_clear_pending(wb, work);
+
+ __wb_writeback(wb, nr_pages, sb, sync_mode);
+
+ /*
+ * This is a data integrity writeback, so only do the
+ * notification when we have completed the work.
+ */
+ if (sync_mode == WB_SYNC_ALL)
+ wb_clear_pending(wb, work);
+ }
+}
+
+/*
+ * This will be inlined in bdi_writeback_task() once we get rid of any
+ * dirty inodes on the default_backing_dev_info
+ */
+void wb_do_writeback(struct bdi_writeback *wb, int force_wait)
+{
+ /*
+ * We get here in two cases:
+ *
+ * schedule_timeout() returned because the dirty writeback
+ * interval has elapsed. If that happens, the work item list
+ * will be empty and we will proceed to do kupdated style writeout.
+ *
+ * Someone called bdi_start_writeback(), which put one/more work
+ * items on the work_list. Process those.
+ */
+ if (list_empty(&wb->bdi->work_list))
+ wb_kupdated(wb);
+ else
+ wb_writeback(wb, force_wait);
+}
+
+/*
* Handle writeback of dirty data for the device backed by this bdi. Also
* wakes up periodically and does kupdated style flushing.
*/
@@ -216,26 +450,7 @@ int bdi_writeback_task(struct bdi_writeback *wb)
while (!kthread_should_stop()) {
unsigned long wait_jiffies;
- /*
- * We get here in two cases:
- *
- * schedule_timeout() returned because the dirty writeback
- * interval has elapsed. If that happens, we will be able
- * to acquire the writeback lock and will proceed to do
- * kupdated style writeout.
- *
- * Someone called bdi_start_writeback(), which will acquire
- * the writeback lock. This means our writeback_acquire()
- * below will fail and we call into bdi_pdflush() for
- * pdflush style writeout.
- *
- */
- if (writeback_acquire(wb))
- wb_kupdated(wb);
- else
- wb_writeback(wb);
-
- writeback_release(wb);
+ wb_do_writeback(wb, 0);
wait_jiffies = msecs_to_jiffies(dirty_writeback_interval * 10);
set_current_state(TASK_INTERRUPTIBLE);
@@ -246,19 +461,68 @@ int bdi_writeback_task(struct bdi_writeback *wb)
return 0;
}
+/*
+ * Schedule writeback for all backing devices. Expensive! If this is a data
+ * integrity operation, writeback will be complete when this returns. If
+ * we are simply called for WB_SYNC_NONE, then writeback will merely be
+ * scheduled to run.
+ */
void bdi_writeback_all(struct super_block *sb, struct writeback_control *wbc)
{
+ const bool must_wait = wbc->sync_mode == WB_SYNC_ALL;
struct backing_dev_info *bdi;
+ struct bdi_work *work;
+ LIST_HEAD(list);
+restart:
spin_lock(&bdi_lock);
list_for_each_entry(bdi, &bdi_list, bdi_list) {
+ struct bdi_work *work;
+
if (!bdi_has_dirty_io(bdi))
continue;
- bdi_start_writeback(bdi, sb, wbc->nr_to_write, wbc->sync_mode);
+
+ /*
+ * If work allocation fails, do the writes inline. We drop
+ * the lock and restart the list writeout. This should be OK,
+ * since this happens rarely and because the writeout should
+ * eventually make more free memory available.
+ */
+ work = bdi_alloc_work(sb, wbc->nr_to_write, wbc->sync_mode);
+ if (!work) {
+ struct writeback_control __wbc = *wbc;
+
+ /*
+ * Not a data integrity writeout, just continue
+ */
+ if (!must_wait)
+ continue;
+
+ spin_unlock(&bdi_lock);
+ __wbc = *wbc;
+ __wbc.bdi = bdi;
+ generic_sync_bdi_inodes(sb, &__wbc);
+ goto restart;
+ }
+ if (must_wait)
+ list_add_tail(&work->wait_list, &list);
+
+ bdi_queue_work(bdi, work);
}
spin_unlock(&bdi_lock);
+
+ /*
+ * If this is for WB_SYNC_ALL, wait for pending work to complete
+ * before returning.
+ */
+ while (!list_empty(&list)) {
+ work = list_entry(list.next, struct bdi_work, wait_list);
+ list_del(&work->wait_list);
+ bdi_wait_on_work_clear(work);
+ call_rcu(&work->rcu_head, bdi_work_free);
+ }
}
static noinline void block_dump___mark_inode_dirty(struct inode *inode)
@@ -284,11 +548,18 @@ static noinline void block_dump___mark_inode_dirty(struct inode *inode)
}
/*
- * We have only a single wb per bdi, so just return that.
+ * If the filesystem didn't provide a way to map an inode to a dedicated
+ * flusher thread, it doesn't support more than 1 thread. So we know it's
+ * the default thread, return that.
*/
static inline struct bdi_writeback *inode_get_wb(struct inode *inode)
{
- return &inode_to_bdi(inode)->wb;
+ const struct super_operations *sop = inode->i_sb->s_op;
+
+ if (!sop->inode_get_wb)
+ return &inode_to_bdi(inode)->wb;
+
+ return sop->inode_get_wb(inode);
}
/**
@@ -725,8 +996,24 @@ void generic_sync_bdi_inodes(struct super_block *sb,
struct writeback_control *wbc)
{
struct backing_dev_info *bdi = wbc->bdi;
+ struct bdi_writeback *wb;
- generic_sync_wb_inodes(&bdi->wb, sb, wbc);
+ /*
+ * Common case is just a single wb thread and that is embedded in
+ * the bdi, so it doesn't need locking
+ */
+ if (!bdi_wblist_needs_lock(bdi))
+ generic_sync_wb_inodes(&bdi->wb, sb, wbc);
+ else {
+ int idx;
+
+ idx = srcu_read_lock(&bdi->srcu);
+
+ list_for_each_entry_rcu(wb, &bdi->wb_list, list)
+ generic_sync_wb_inodes(wb, sb, wbc);
+
+ srcu_read_unlock(&bdi->srcu, idx);
+ }
}
/*
@@ -753,7 +1040,7 @@ void generic_sync_sb_inodes(struct super_block *sb,
struct writeback_control *wbc)
{
if (wbc->bdi)
- generic_sync_bdi_inodes(sb, wbc);
+ bdi_start_writeback(wbc->bdi, sb, wbc->nr_to_write, wbc->sync_mode);
else
bdi_writeback_all(sb, wbc);