diff options
Diffstat (limited to 'fs/aio.c')
-rw-r--r-- | fs/aio.c | 430 |
1 files changed, 308 insertions, 122 deletions
@@ -25,7 +25,9 @@ #include <linux/file.h> #include <linux/mm.h> #include <linux/mman.h> +#include <linux/bio.h> #include <linux/mmu_context.h> +#include <linux/percpu.h> #include <linux/slab.h> #include <linux/timer.h> #include <linux/aio.h> @@ -35,6 +37,7 @@ #include <linux/eventfd.h> #include <linux/blkdev.h> #include <linux/compat.h> +#include <linux/percpu-refcount.h> #include <asm/kmap_types.h> #include <asm/uaccess.h> @@ -59,14 +62,24 @@ struct aio_ring { #define AIO_RING_PAGES 8 +struct kioctx_cpu { + unsigned reqs_available; +}; + struct kioctx { - atomic_t users; - atomic_t dead; + struct percpu_ref users; /* This needs improving */ unsigned long user_id; struct hlist_node list; + struct __percpu kioctx_cpu *cpu; + + /* + * For percpu reqs_available, number of slots we move to/from global + * counter at a time: + */ + unsigned req_batch; /* * This is what userspace passed to io_setup(), it's not used for * anything but counting against the global max_reqs quota. @@ -89,7 +102,15 @@ struct kioctx { struct work_struct rcu_work; struct { - atomic_t reqs_active; + /* + * This counts the number of available slots in the ringbuffer, + * so we avoid overflowing it: it's decremented (if positive) + * when allocating a kiocb and incremented when the resulting + * io_event is pulled off the ringbuffer. + * + * We batch accesses to it with a percpu version. + */ + atomic_t reqs_available; } ____cacheline_aligned_in_smp; struct { @@ -100,11 +121,23 @@ struct kioctx { struct { struct mutex ring_lock; wait_queue_head_t wait; + + /* + * Copy of the real tail - to reduce cacheline bouncing. Updated + * by aio_complete() whenever it updates the real tail. + */ + unsigned shadow_tail; } ____cacheline_aligned_in_smp; struct { + /* + * This is the canonical copy of the tail pointer, updated by + * aio_complete(). But aio_complete() also uses it as a lock, so + * other code can't use it; aio_complete() keeps shadow_tail in + * sync with the real value of the tail pointer for other code + * to use. + */ unsigned tail; - spinlock_t completion_lock; } ____cacheline_aligned_in_smp; struct page *internal_pages[AIO_RING_PAGES]; @@ -275,6 +308,8 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb, static void free_ioctx_rcu(struct rcu_head *head) { struct kioctx *ctx = container_of(head, struct kioctx, rcu_head); + + free_percpu(ctx->cpu); kmem_cache_free(kioctx_cachep, ctx); } @@ -288,7 +323,7 @@ static void free_ioctx(struct kioctx *ctx) struct aio_ring *ring; struct io_event res; struct kiocb *req; - unsigned head, avail; + unsigned cpu, head, avail; spin_lock_irq(&ctx->ctx_lock); @@ -302,21 +337,31 @@ static void free_ioctx(struct kioctx *ctx) spin_unlock_irq(&ctx->ctx_lock); + for_each_possible_cpu(cpu) { + struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu); + + atomic_add(kcpu->reqs_available, &ctx->reqs_available); + kcpu->reqs_available = 0; + } + ring = kmap_atomic(ctx->ring_pages[0]); head = ring->head; kunmap_atomic(ring); - while (atomic_read(&ctx->reqs_active) > 0) { - wait_event(ctx->wait, head != ctx->tail); + while (atomic_read(&ctx->reqs_available) < ctx->nr_events - 1) { + wait_event(ctx->wait, + (head != ctx->shadow_tail) || + (atomic_read(&ctx->reqs_available) >= ctx->nr_events - 1)); - avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head; + avail = (head <= ctx->shadow_tail + ? ctx->shadow_tail : ctx->nr_events) - head; - atomic_sub(avail, &ctx->reqs_active); + atomic_add(avail, &ctx->reqs_available); head += avail; head %= ctx->nr_events; } - WARN_ON(atomic_read(&ctx->reqs_active) < 0); + WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr_events - 1); aio_free_ring(ctx); @@ -339,7 +384,7 @@ static void free_ioctx(struct kioctx *ctx) static void put_ioctx(struct kioctx *ctx) { - if (unlikely(atomic_dec_and_test(&ctx->users))) + if (percpu_ref_put(&ctx->users)) free_ioctx(ctx); } @@ -352,6 +397,18 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) struct kioctx *ctx; int err = -ENOMEM; + /* + * We keep track of the number of available ringbuffer slots, to prevent + * overflow (reqs_available), and we also use percpu counters for this. + * + * So since up to half the slots might be on other cpu's percpu counters + * and unavailable, double nr_events so userspace sees what they + * expected: additionally, we move req_batch slots to/from percpu + * counters at a time, so make sure that isn't 0: + */ + nr_events = max(nr_events, num_possible_cpus() * 4); + nr_events *= 2; + /* Prevent overflows */ if ((nr_events > (0x10000000U / sizeof(struct io_event))) || (nr_events > (0x10000000U / sizeof(struct kiocb)))) { @@ -368,18 +425,28 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) ctx->max_reqs = nr_events; - atomic_set(&ctx->users, 2); - atomic_set(&ctx->dead, 0); + percpu_ref_init(&ctx->users); + rcu_read_lock(); + percpu_ref_get(&ctx->users); + rcu_read_unlock(); + spin_lock_init(&ctx->ctx_lock); - spin_lock_init(&ctx->completion_lock); mutex_init(&ctx->ring_lock); init_waitqueue_head(&ctx->wait); INIT_LIST_HEAD(&ctx->active_reqs); - if (aio_setup_ring(ctx) < 0) + ctx->cpu = alloc_percpu(struct kioctx_cpu); + if (!ctx->cpu) goto out_freectx; + if (aio_setup_ring(ctx) < 0) + goto out_freepcpu; + + atomic_set(&ctx->reqs_available, ctx->nr_events - 1); + ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4); + BUG_ON(!ctx->req_batch); + /* limit the number of system wide aios */ spin_lock(&aio_nr_lock); if (aio_nr + nr_events > aio_max_nr || @@ -402,6 +469,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) out_cleanup: err = -EAGAIN; aio_free_ring(ctx); +out_freepcpu: + free_percpu(ctx->cpu); out_freectx: kmem_cache_free(kioctx_cachep, ctx); pr_debug("error allocating ioctx %d\n", err); @@ -431,7 +500,7 @@ static void kill_ioctx_rcu(struct rcu_head *head) */ static void kill_ioctx(struct kioctx *ctx) { - if (!atomic_xchg(&ctx->dead, 1)) { + if (percpu_ref_kill(&ctx->users)) { hlist_del_rcu(&ctx->list); /* Between hlist_del_rcu() and dropping the initial ref */ synchronize_rcu(); @@ -477,12 +546,6 @@ void exit_aio(struct mm_struct *mm) struct hlist_node *n; hlist_for_each_entry_safe(ctx, n, &mm->ioctx_list, list) { - if (1 != atomic_read(&ctx->users)) - printk(KERN_DEBUG - "exit_aio:ioctx still alive: %d %d %d\n", - atomic_read(&ctx->users), - atomic_read(&ctx->dead), - atomic_read(&ctx->reqs_active)); /* * We don't need to bother with munmap() here - * exit_mmap(mm) is coming and it'll unmap everything. @@ -493,13 +556,59 @@ void exit_aio(struct mm_struct *mm) */ ctx->mmap_size = 0; - if (!atomic_xchg(&ctx->dead, 1)) { + if (percpu_ref_kill(&ctx->users)) { hlist_del_rcu(&ctx->list); call_rcu(&ctx->rcu_head, kill_ioctx_rcu); } } } +static void put_reqs_available(struct kioctx *ctx, unsigned nr) +{ + struct kioctx_cpu *kcpu; + + preempt_disable(); + kcpu = this_cpu_ptr(ctx->cpu); + + kcpu->reqs_available += nr; + while (kcpu->reqs_available >= ctx->req_batch * 2) { + kcpu->reqs_available -= ctx->req_batch; + atomic_add(ctx->req_batch, &ctx->reqs_available); + } + + preempt_enable(); +} + +static bool get_reqs_available(struct kioctx *ctx) +{ + struct kioctx_cpu *kcpu; + bool ret = false; + + preempt_disable(); + kcpu = this_cpu_ptr(ctx->cpu); + + if (!kcpu->reqs_available) { + int old, avail = atomic_read(&ctx->reqs_available); + + do { + if (avail < ctx->req_batch) + goto out; + + old = avail; + avail = atomic_cmpxchg(&ctx->reqs_available, + avail, avail - ctx->req_batch); + } while (avail != old); + + kcpu->reqs_available += ctx->req_batch; + } + + ret = true; + kcpu->reqs_available--; +out: + preempt_enable(); + return ret; +} + /* aio_get_req * Allocate a slot for an aio request. Increments the ki_users count * of the kioctx so that the kioctx stays around until all requests are @@ -514,22 +623,18 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) { struct kiocb *req; - if (atomic_read(&ctx->reqs_active) >= ctx->nr_events) + if (!get_reqs_available(ctx)) return NULL; - if (atomic_inc_return(&ctx->reqs_active) > ctx->nr_events - 1) - goto out_put; - req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO); if (unlikely(!req)) goto out_put; atomic_set(&req->ki_users, 2); req->ki_ctx = ctx; - return req; out_put: - atomic_dec(&ctx->reqs_active); + put_reqs_available(ctx, 1); return NULL; } @@ -562,7 +667,7 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id) hlist_for_each_entry_rcu(ctx, &mm->ioctx_list, list) { if (ctx->user_id == ctx_id) { - atomic_inc(&ctx->users); + percpu_ref_get(&ctx->users); ret = ctx; break; } @@ -572,66 +677,11 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id) return ret; } -/* aio_complete - * Called when the io request on the given iocb is complete. - */ -void aio_complete(struct kiocb *iocb, long res, long res2) +static inline unsigned kioctx_ring_put(struct kioctx *ctx, struct kiocb *req, + unsigned tail) { - struct kioctx *ctx = iocb->ki_ctx; - struct aio_ring *ring; struct io_event *ev_page, *event; - unsigned long flags; - unsigned tail, pos; - - /* - * Special case handling for sync iocbs: - * - events go directly into the iocb for fast handling - * - the sync task with the iocb in its stack holds the single iocb - * ref, no other paths have a way to get another ref - * - the sync task helpfully left a reference to itself in the iocb - */ - if (is_sync_kiocb(iocb)) { - BUG_ON(atomic_read(&iocb->ki_users) != 1); - iocb->ki_user_data = res; - atomic_set(&iocb->ki_users, 0); - wake_up_process(iocb->ki_obj.tsk); - return; - } - - /* - * Take rcu_read_lock() in case the kioctx is being destroyed, as we - * need to issue a wakeup after decrementing reqs_active. - */ - rcu_read_lock(); - - if (iocb->ki_list.next) { - unsigned long flags; - - spin_lock_irqsave(&ctx->ctx_lock, flags); - list_del(&iocb->ki_list); - spin_unlock_irqrestore(&ctx->ctx_lock, flags); - } - - /* - * cancelled requests don't get events, userland was given one - * when the event got cancelled. - */ - if (unlikely(xchg(&iocb->ki_cancel, - KIOCB_CANCELLED) == KIOCB_CANCELLED)) { - atomic_dec(&ctx->reqs_active); - /* Still need the wake_up in case free_ioctx is waiting */ - goto put_rq; - } - - /* - * Add a completion event to the ring buffer. Must be done holding - * ctx->ctx_lock to prevent other code from messing with the tail - * pointer since we might be called from irq context. - */ - spin_lock_irqsave(&ctx->completion_lock, flags); - - tail = ctx->tail; - pos = tail + AIO_EVENTS_OFFSET; + unsigned pos = tail + AIO_EVENTS_OFFSET; if (++tail >= ctx->nr_events) tail = 0; @@ -639,60 +689,195 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]); event = ev_page + pos % AIO_EVENTS_PER_PAGE; - event->obj = (u64)(unsigned long)iocb->ki_obj.user; - event->data = iocb->ki_user_data; - event->res = res; - event->res2 = res2; + event->obj = (u64)(unsigned long)req->ki_obj.user; + event->data = req->ki_user_data; + event->res = req->ki_res; + event->res2 = req->ki_res2; kunmap_atomic(ev_page); flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]); pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n", - ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data, - res, res2); + ctx, tail, req, req->ki_obj.user, req->ki_user_data, + req->ki_res, req->ki_res2); - /* after flagging the request as done, we - * must never even look at it again + return tail; +} + +static inline unsigned kioctx_ring_lock(struct kioctx *ctx) +{ + unsigned tail; + + /* + * ctx->tail is both our lock and the canonical version of the tail + * pointer. */ - smp_wmb(); /* make event visible before updating tail */ + while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX) + cpu_relax(); - ctx->tail = tail; + return tail; +} + +static inline void kioctx_ring_unlock(struct kioctx *ctx, unsigned tail) +{ + struct aio_ring *ring; + + if (!ctx) + return; + + smp_wmb(); + /* make event visible before updating tail */ + + ctx->shadow_tail = tail; ring = kmap_atomic(ctx->ring_pages[0]); ring->tail = tail; kunmap_atomic(ring); flush_dcache_page(ctx->ring_pages[0]); - spin_unlock_irqrestore(&ctx->completion_lock, flags); + /* unlock, make new tail visible before checking waitlist */ + smp_mb(); + + ctx->tail = tail; + + if (waitqueue_active(&ctx->wait)) + wake_up(&ctx->wait); +} + +void batch_complete_aio(struct batch_complete *batch) +{ + struct kioctx *ctx = NULL; + struct eventfd_ctx *eventfd = NULL; + struct rb_node *n; + unsigned long flags; + unsigned tail = 0; + + if (RB_EMPTY_ROOT(&batch->kiocb)) + return; + + /* + * Take rcu_read_lock() in case the kioctx is being destroyed, as we + * need to issue a wakeup after incrementing reqs_available. + */ + rcu_read_lock(); + local_irq_save(flags); + + n = rb_first(&batch->kiocb); + while (n) { + struct kiocb *req = container_of(n, struct kiocb, ki_node); + + if (n->rb_right) { + n->rb_right->__rb_parent_color = n->__rb_parent_color; + n = n->rb_right; + + while (n->rb_left) + n = n->rb_left; + } else { + n = rb_parent(n); + } + + if (unlikely(req->ki_eventfd != eventfd)) { + if (eventfd) { + /* Make event visible */ + kioctx_ring_unlock(ctx, tail); + ctx = NULL; - pr_debug("added to ring %p at [%u]\n", iocb, tail); + eventfd_signal(eventfd, 1); + eventfd_ctx_put(eventfd); + } + + eventfd = req->ki_eventfd; + req->ki_eventfd = NULL; + } + + if (unlikely(req->ki_ctx != ctx)) { + kioctx_ring_unlock(ctx, tail); + + ctx = req->ki_ctx; + tail = kioctx_ring_lock(ctx); + } + + tail = kioctx_ring_put(ctx, req, tail); + aio_put_req(req); + } + + kioctx_ring_unlock(ctx, tail); + local_irq_restore(flags); + rcu_read_unlock(); /* * Check if the user asked us to deliver the result through an * eventfd. The eventfd_signal() function is safe to be called * from IRQ context. */ - if (iocb->ki_eventfd != NULL) - eventfd_signal(iocb->ki_eventfd, 1); + if (eventfd) { + eventfd_signal(eventfd, 1); + eventfd_ctx_put(eventfd); + } +} +EXPORT_SYMBOL(batch_complete_aio); -put_rq: - /* everything turned out well, dispose of the aiocb. */ - aio_put_req(iocb); +/* aio_complete_batch + * Called when the io request on the given iocb is complete; @batch may be + * NULL. + */ +void aio_complete_batch(struct kiocb *req, long res, long res2, + struct batch_complete *batch) +{ + req->ki_res = res; + req->ki_res2 = res2; + + if (req->ki_list.next) { + struct kioctx *ctx = req->ki_ctx; + unsigned long flags; + + spin_lock_irqsave(&ctx->ctx_lock, flags); + list_del(&req->ki_list); + spin_unlock_irqrestore(&ctx->ctx_lock, flags); + } /* - * We have to order our ring_info tail store above and test - * of the wait list below outside the wait lock. This is - * like in wake_up_bit() where clearing a bit has to be - * ordered with the unlocked test. + * Special case handling for sync iocbs: + * - events go directly into the iocb for fast handling + * - the sync task with the iocb in its stack holds the single iocb + * ref, no other paths have a way to get another ref + * - the sync task helpfully left a reference to itself in the iocb */ - smp_mb(); + if (is_sync_kiocb(req)) { + BUG_ON(atomic_read(&req->ki_users) != 1); + req->ki_user_data = req->ki_res; + atomic_set(&req->ki_users, 0); + wake_up_process(req->ki_obj.tsk); + } else if (batch) { + int res; + struct kiocb *t; + struct rb_node **n = &batch->kiocb.rb_node, *parent = NULL; + + while (*n) { + parent = *n; + t = container_of(*n, struct kiocb, ki_node); + + res = req->ki_ctx != t->ki_ctx + ? req->ki_ctx < t->ki_ctx + : req->ki_eventfd != t->ki_eventfd + ? req->ki_eventfd < t->ki_eventfd + : req < t; + + n = res ? &(*n)->rb_left : &(*n)->rb_right; + } - if (waitqueue_active(&ctx->wait)) - wake_up(&ctx->wait); + rb_link_node(&req->ki_node, parent, n); + rb_insert_color(&req->ki_node, &batch->kiocb); + } else { + struct batch_complete batch_stack; - rcu_read_unlock(); + memset(&req->ki_node, 0, sizeof(req->ki_node)); + batch_stack.kiocb.rb_node = &req->ki_node; + + batch_complete_aio(&batch_stack); + } } -EXPORT_SYMBOL(aio_complete); +EXPORT_SYMBOL(aio_complete_batch); /* aio_read_events * Pull an event off of the ioctx's event ring. Returns the number of @@ -712,9 +897,9 @@ static long aio_read_events_ring(struct kioctx *ctx, head = ring->head; kunmap_atomic(ring); - pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr_events); + pr_debug("h%u t%u m%u\n", head, ctx->shadow_tail, ctx->nr_events); - if (head == ctx->tail) + if (head == ctx->shadow_tail) goto out; while (ret < nr) { @@ -722,8 +907,9 @@ static long aio_read_events_ring(struct kioctx *ctx, struct io_event *ev; struct page *page; - avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head; - if (head == ctx->tail) + avail = (head <= ctx->shadow_tail ? + ctx->shadow_tail : ctx->nr_events) - head; + if (head == ctx->shadow_tail) break; avail = min(avail, nr - ret); @@ -754,9 +940,9 @@ static long aio_read_events_ring(struct kioctx *ctx, kunmap_atomic(ring); flush_dcache_page(ctx->ring_pages[0]); - pr_debug("%li h%u t%u\n", ret, head, ctx->tail); + pr_debug("%li h%u t%u\n", ret, head, ctx->shadow_tail); - atomic_sub(ret, &ctx->reqs_active); + put_reqs_available(ctx, ret); out: mutex_unlock(&ctx->ring_lock); @@ -771,7 +957,7 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr, if (ret > 0) *i += ret; - if (unlikely(atomic_read(&ctx->dead))) + if (unlikely(percpu_ref_dead(&ctx->users))) ret = -EINVAL; if (!*i) @@ -1140,7 +1326,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, aio_put_req(req); /* drop extra ref to req */ return 0; out_put_req: - atomic_dec(&ctx->reqs_active); + put_reqs_available(ctx, 1); aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop i/o ref to req */ return ret; |