summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKent Overstreet <koverstreet@google.com>2013-02-20 13:16:51 +1100
committerStephen Rothwell <sfr@canb.auug.org.au>2013-03-01 15:17:25 +1100
commit34efc3d80b0a9f956a9e2e27e9642ba18ad5b5a2 (patch)
treec26b54ad7c8ce3ee50127c5a360f8b6eb1e1bbd8
parentf5a580ef9b2f5ff5fbfcfa29582a8b0874d063ac (diff)
aio: use xchg() instead of completion_lock
So, for sticking kiocb completions on the kioctx ringbuffer, we need a lock - it unfortunately can't be lockless. When the kioctx is shared between threads on different cpus and the rate of completions is high, this lock sees quite a bit of contention - in terms of cacheline contention it's the hottest thing in the aio subsystem. That means, with a regular spinlock, we're going to take a cache miss to grab the lock, then another cache miss when we touch the data the lock protects - if it's on the same cacheline as the lock, other cpus spinning on the lock are going to be pulling it out from under us as we're using it. So, we use an old trick to get rid of this second forced cache miss - make the data the lock protects be the lock itself, so we grab them both at once. Signed-off-by: Kent Overstreet <koverstreet@google.com> Cc: Zach Brown <zab@redhat.com> Cc: Felipe Balbi <balbi@ti.com> Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org> Cc: Mark Fasheh <mfasheh@suse.com> Cc: Joel Becker <jlbec@evilplan.org> Cc: Rusty Russell <rusty@rustcorp.com.au> Cc: Jens Axboe <axboe@kernel.dk> Cc: Asai Thambi S P <asamymuthupa@micron.com> Cc: Selvan Mani <smani@micron.com> Cc: Sam Bradshaw <sbradshaw@micron.com> Cc: Jeff Moyer <jmoyer@redhat.com> Cc: Al Viro <viro@zeniv.linux.org.uk> Cc: Benjamin LaHaise <bcrl@kvack.org> Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
-rw-r--r--fs/aio.c44
1 files changed, 20 insertions, 24 deletions
diff --git a/fs/aio.c b/fs/aio.c
index 3fc5b5122d99..2ff05e6ac8d6 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -102,11 +102,11 @@ struct kioctx {
struct {
struct mutex ring_lock;
wait_queue_head_t wait;
+ unsigned shadow_tail;
} ____cacheline_aligned_in_smp;
struct {
unsigned tail;
- spinlock_t completion_lock;
} ____cacheline_aligned_in_smp;
struct page *internal_pages[AIO_RING_PAGES];
@@ -321,9 +321,9 @@ static void free_ioctx(struct kioctx *ctx)
kunmap_atomic(ring);
while (atomic_read(&ctx->reqs_available) < ctx->nr) {
- wait_event(ctx->wait, head != ctx->tail);
+ wait_event(ctx->wait, head != ctx->shadow_tail);
- avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
+ avail = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
atomic_add(avail, &ctx->reqs_available);
head += avail;
@@ -388,7 +388,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
rcu_read_unlock();
spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
@@ -685,18 +684,19 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* free_ioctx()
*/
atomic_inc(&ctx->reqs_available);
+ smp_mb__after_atomic_inc();
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
}
/*
- * Add a completion event to the ring buffer. Must be done holding
- * ctx->ctx_lock to prevent other code from messing with the tail
- * pointer since we might be called from irq context.
+ * Add a completion event to the ring buffer; ctx->tail is both our lock
+ * and the canonical version of the tail pointer.
*/
- spin_lock_irqsave(&ctx->completion_lock, flags);
+ local_irq_save(flags);
+ while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
+ cpu_relax();
- tail = ctx->tail;
pos = tail + AIO_EVENTS_OFFSET;
if (++tail >= ctx->nr)
@@ -722,14 +722,18 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
smp_wmb(); /* make event visible before updating tail */
- ctx->tail = tail;
+ ctx->shadow_tail = tail;
ring = kmap_atomic(ctx->ring_pages[0]);
ring->tail = tail;
kunmap_atomic(ring);
flush_dcache_page(ctx->ring_pages[0]);
- spin_unlock_irqrestore(&ctx->completion_lock, flags);
+ /* unlock, make new tail visible before checking waitlist */
+ smp_mb();
+
+ ctx->tail = tail;
+ local_irq_restore(flags);
pr_debug("added to ring %p at [%u]\n", iocb, tail);
@@ -745,14 +749,6 @@ put_rq:
/* everything turned out well, dispose of the aiocb. */
aio_put_req(iocb);
- /*
- * We have to order our ring_info tail store above and test
- * of the wait list below outside the wait lock. This is
- * like in wake_up_bit() where clearing a bit has to be
- * ordered with the unlocked test.
- */
- smp_mb();
-
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
@@ -780,19 +776,19 @@ static int aio_read_events_ring(struct kioctx *ctx,
head = ring->head;
kunmap_atomic(ring);
- pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr);
+ pr_debug("h%u t%u m%u\n", head, ctx->shadow_tail, ctx->nr);
- if (head == ctx->tail)
+ if (head == ctx->shadow_tail)
goto out;
__set_current_state(TASK_RUNNING);
while (ret < nr) {
- unsigned i = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
+ unsigned i = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
struct io_event *ev;
struct page *page;
- if (head == ctx->tail)
+ if (head == ctx->shadow_tail)
break;
i = min_t(int, i, nr - ret);
@@ -822,7 +818,7 @@ static int aio_read_events_ring(struct kioctx *ctx,
kunmap_atomic(ring);
flush_dcache_page(ctx->ring_pages[0]);
- pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
+ pr_debug("%d h%u t%u\n", ret, head, ctx->shadow_tail);
put_reqs_available(ctx, ret);
out: