| author | Kent Overstreet <koverstreet@google.com> | 2012-10-05 16:46:36 -0700 |
|---|---|---|
| committer | Kent Overstreet <koverstreet@google.com> | 2012-10-05 16:46:36 -0700 |
| commit | 30e7aab200bc75798fe16d80a59de40a0a597d81 (patch) | |
| tree | f9383b297d22f270ce8ed1a26f7bbe45b0b40671 | |
| parent | c78172f7efa5f84ca8c94960b153d38d28e5965d (diff) | |
| -rw-r--r-- | fs/aio.c | 239 |
| -rw-r--r-- | include/linux/aio.h | 7 |

2 files changed, 71 insertions, 175 deletions
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -98,69 +98,35 @@ static void aio_free_ring(struct kioctx *ctx)
 
 static int aio_setup_ring(struct kioctx *ctx)
 {
-        struct aio_ring *ring;
-        struct aio_ring_info *info = &ctx->ring_info;
         unsigned nr_events = ctx->max_reqs;
         unsigned long size;
-        int nr_pages;
 
         /* Compensate for the ring buffer's head/tail overlap entry */
         nr_events += 2; /* 1 is required, 2 for good luck */
 
         size = sizeof(struct aio_ring);
         size += sizeof(struct io_event) * nr_events;
-        nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT;
 
-        if (nr_pages < 0)
-                return -EINVAL;
+        ctx->ring_order = get_order(size);
 
-        nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
-
-        info->nr = 0;
-        info->ring_pages = info->internal_pages;
-        if (nr_pages > AIO_RING_PAGES) {
-                info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
-                if (!info->ring_pages)
-                        return -ENOMEM;
-        }
+        nr_events = ((PAGE_SIZE << ctx->ring_order) -
+                     sizeof(struct aio_ring)) / sizeof(struct io_event);
 
-        info->mmap_size = nr_pages * PAGE_SIZE;
-        dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
-        down_write(&ctx->mm->mmap_sem);
-        info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
-                                        PROT_READ|PROT_WRITE,
-                                        MAP_ANONYMOUS|MAP_PRIVATE, 0);
-        if (IS_ERR((void *)info->mmap_base)) {
-                up_write(&ctx->mm->mmap_sem);
-                info->mmap_size = 0;
-                aio_free_ring(ctx);
-                return -EAGAIN;
-        }
+        ctx->ring = (void *) __get_free_pages(GFP_KERNEL, ctx->ring_order);
+        if (!ctx->ring)
+                return -ENOMEM;
 
-        dprintk("mmap address: 0x%08lx\n", info->mmap_base);
-        info->nr_pages = get_user_pages(current, ctx->mm,
-                                        info->mmap_base, nr_pages,
-                                        1, 0, info->ring_pages, NULL);
-        up_write(&ctx->mm->mmap_sem);
+        ctx->user_id = (unsigned long) &ctx;
 
-        if (unlikely(info->nr_pages != nr_pages)) {
-                aio_free_ring(ctx);
-                return -EAGAIN;
-        }
+        ctx->nr_events = nr_events;     /* trusted copy */
 
-        ctx->user_id = info->mmap_base;
-
-        info->nr = nr_events;           /* trusted copy */
-
-        ring = kmap_atomic(info->ring_pages[0]);
-        ring->nr = nr_events;   /* user copy */
-        ring->id = ctx->user_id;
-        ring->head = ring->tail = 0;
-        ring->magic = AIO_RING_MAGIC;
-        ring->compat_features = AIO_RING_COMPAT_FEATURES;
-        ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
-        ring->header_length = sizeof(struct aio_ring);
-        kunmap_atomic(ring);
+        ctx->ring->nr = nr_events;      /* user copy */
+        ctx->ring->id = ctx->user_id;
+        ctx->ring->head = ctx->ring->tail = 0;
+        ctx->ring->magic = AIO_RING_MAGIC;
+        ctx->ring->compat_features = AIO_RING_COMPAT_FEATURES;
+        ctx->ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
+        ctx->ring->header_length = sizeof(struct aio_ring);
 
         return 0;
 }
@@ -188,20 +154,14 @@ static int aio_setup_ring(struct kioctx *ctx)
                 kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
 } while(0)
 
-static void ctx_rcu_free(struct rcu_head *head)
-{
-        struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
-        kmem_cache_free(kioctx_cachep, ctx);
-}
-
 /* __put_ioctx
  *      Called when the last user of an aio context has gone away,
  *      and the struct needs to be freed.
  */
-static void __put_ioctx(struct kioctx *ctx)
+static void free_ioctx(struct work_struct *work)
 {
+        struct kioctx *ctx = container_of(work, struct kioctx, free_work);
         unsigned nr_events = ctx->max_reqs;
-        BUG_ON(ctx->reqs_active);
 
         cancel_delayed_work_sync(&ctx->wq);
         aio_free_ring(ctx);
@@ -214,19 +174,21 @@ static void __put_ioctx(struct kioctx *ctx)
                 spin_unlock(&aio_nr_lock);
         }
         pr_debug("__put_ioctx: freeing %p\n", ctx);
-        call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
-
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
-        return atomic_inc_not_zero(&kioctx->users);
+        kmem_cache_free(kioctx_cachep, ctx);
 }
 
 static inline void put_ioctx(struct kioctx *kioctx)
 {
         BUG_ON(atomic_read(&kioctx->users) <= 0);
         if (unlikely(atomic_dec_and_test(&kioctx->users)))
-                __put_ioctx(kioctx);
+                schedule_work(&kioctx->free_work);
+}
+
+static inline void req_put_ioctx(struct kioctx *kioctx)
+{
+        BUG_ON(atomic_read(&kioctx->reqs_active) <= 0);
+        if (unlikely(atomic_dec_and_test(&kioctx->reqs_active)))
+                put_ioctx(kioctx);
 }
 
 static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
@@ -279,13 +241,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
         mm = ctx->mm = current->mm;
         atomic_inc(&mm->mm_count);
 
-        atomic_set(&ctx->users, 2);
+        atomic_set(&ctx->users, 1);
+        atomic_set(&ctx->reqs_active, 1);
+
         spin_lock_init(&ctx->ctx_lock);
         spin_lock_init(&ctx->ring_info.ring_lock);
         init_waitqueue_head(&ctx->wait);
 
         INIT_LIST_HEAD(&ctx->active_reqs);
         INIT_LIST_HEAD(&ctx->run_list);
+        INIT_WORK(&ctx->free_work, free_ioctx);
         INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
 
         if (aio_setup_ring(ctx) < 0)
@@ -327,36 +292,29 @@ out_freectx:
  */
 static void kill_ctx(struct kioctx *ctx)
 {
-        struct task_struct *tsk = current;
-        DECLARE_WAITQUEUE(wait, tsk);
+        struct kiocb *iocb, *t;
         struct io_event res;
+        int put = 0;
 
         spin_lock_irq(&ctx->ctx_lock);
-        ctx->dead = 1;
-        while (!list_empty(&ctx->active_reqs)) {
-                struct list_head *pos = ctx->active_reqs.next;
-                struct kiocb *iocb = list_kiocb(pos);
-                list_del_init(&iocb->ki_list);
-
-                kiocb_cancel(ctx, iocb, &res);
-        }
 
-        if (!ctx->reqs_active)
-                goto out;
+        if (!ctx->dead) {
+                put = 1;
+                ctx->dead = 1;
+                hlist_del_rcu(&ctx->list);
 
-        add_wait_queue(&ctx->wait, &wait);
-        set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-        while (ctx->reqs_active) {
                 spin_unlock_irq(&ctx->ctx_lock);
-                io_schedule();
-                set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+                synchronize_rcu();
                 spin_lock_irq(&ctx->ctx_lock);
         }
-        __set_task_state(tsk, TASK_RUNNING);
-        remove_wait_queue(&ctx->wait, &wait);
 
-out:
+        list_for_each_entry_safe(iocb, t, &ctx->active_reqs, ki_list)
+                kiocb_cancel(ctx, iocb, &res);
+
         spin_unlock_irq(&ctx->ctx_lock);
+
+        if (put)
+                req_put_ioctx(ctx);
 }
 
 /* wait_on_sync_kiocb:
@@ -388,15 +346,15 @@ void exit_aio(struct mm_struct *mm)
         while (!hlist_empty(&mm->ioctx_list)) {
                 ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
 
-                hlist_del_rcu(&ctx->list);
-
-                kill_ctx(ctx);
 
+#if 0
                 if (1 != atomic_read(&ctx->users))
                         printk(KERN_DEBUG
                                 "exit_aio:ioctx still alive: %d %d %d\n",
                                 atomic_read(&ctx->users), ctx->dead,
                                 ctx->reqs_active);
+#endif
+
                 /*
                  * We don't need to bother with munmap() here -
                  *   exit_mmap(mm) is coming and it'll unmap everything.
@@ -408,41 +366,33 @@ void exit_aio(struct mm_struct *mm)
                  * all other callers have ctx->mm == current->mm.
                  */
                 ctx->ring_info.mmap_size = 0;
-                put_ioctx(ctx);
+
+                kill_ctx(ctx);
         }
 }
 
 /* aio_get_req
- *      Allocate a slot for an aio request.  Increments the users count
+ *      Allocate a slot for an aio request.  Increments the ki_users count
  * of the kioctx so that the kioctx stays around until all requests are
 * complete.  Returns NULL if no requests are free.
 *
- * Returns with kiocb->users set to 2.  The io submit code path holds
+ * Returns with kiocb->ki_users set to 2.  The io submit code path holds
 * an extra reference while submitting the i/o.
 * This prevents races between the aio code path referencing the
 * req (after submitting it) and aio_complete() freeing the req.
 */
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
-        struct kiocb *req = NULL;
+        struct kiocb *req;
 
         req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
         if (unlikely(!req))
                 return NULL;
 
-        req->ki_flags = 0;
+        memset(req, 0, sizeof(*req));
         req->ki_users = 2;
-        req->ki_key = 0;
         req->ki_ctx = ctx;
-        req->ki_cancel = NULL;
-        req->ki_retry = NULL;
-        req->ki_dtor = NULL;
-        req->private = NULL;
-        req->ki_iovec = NULL;
         INIT_LIST_HEAD(&req->ki_run_list);
-        req->ki_attrs = NULL;
-        req->ki_attr_rets = NULL;
-        req->ki_eventfd = NULL;
 
         return req;
 }
@@ -475,10 +425,8 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
                 list_del(&req->ki_batch);
                 list_del(&req->ki_list);
                 kmem_cache_free(kiocb_cachep, req);
-                ctx->reqs_active--;
+                req_put_ioctx(ctx);
         }
-        if (unlikely(!ctx->reqs_active && ctx->dead))
-                wake_up_all(&ctx->wait);
         spin_unlock_irq(&ctx->ctx_lock);
 }
 
@@ -508,7 +456,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
         spin_lock_irq(&ctx->ctx_lock);
         ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
 
-        avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+        avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
         BUG_ON(avail < 0);
         if (avail < allocated) {
                 /* Trim back the number of requests. */
@@ -523,7 +471,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
         batch->count -= allocated;
         list_for_each_entry(req, &batch->head, ki_batch) {
                 list_add(&req->ki_list, &ctx->active_reqs);
-                ctx->reqs_active++;
+                atomic_inc(&ctx->users);
         }
 
         kunmap_atomic(ring);
@@ -548,8 +496,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx,
 
 static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
 {
-        assert_spin_locked(&ctx->ctx_lock);
-
+        fput(req->ki_filp);
         if (req->ki_eventfd != NULL)
                 eventfd_ctx_put(req->ki_eventfd);
         if (req->ki_dtor)
@@ -558,10 +505,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
                 kfree(req->ki_iovec);
         kfree(req->ki_attrs);
         kmem_cache_free(kiocb_cachep, req);
-        ctx->reqs_active--;
 
-        if (unlikely(!ctx->reqs_active && ctx->dead))
-                wake_up_all(&ctx->wait);
+        req_put_ioctx(ctx);
 }
 
 /* __aio_put_req
@@ -579,11 +524,7 @@ static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
         if (likely(req->ki_users))
                 return;
         list_del(&req->ki_list);                /* remove from active_reqs */
-        req->ki_cancel = NULL;
-        req->ki_retry = NULL;
 
-        fput(req->ki_filp);
-        req->ki_filp = NULL;
         really_put_req(ctx, req);
 }
 
@@ -608,18 +549,13 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 
         rcu_read_lock();
 
-        hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
-                /*
-                 * RCU protects us against accessing freed memory but
-                 * we have to be careful not to get a reference when the
-                 * reference count already dropped to 0 (ctx->dead test
-                 * is unreliable because of races).
-                 */
-                if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+        hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
+                if (ctx->user_id == ctx_id){
+                        BUG_ON(ctx->dead);
+                        atomic_inc(&ctx->users);
                         ret = ctx;
                         break;
                 }
-        }
 
         rcu_read_unlock();
         return ret;
@@ -1165,7 +1101,7 @@ retry:
                                 break;
                         /* Try to only show up in io wait if there are ops
                          *  in flight */
-                        if (ctx->reqs_active)
+                        if (atomic_read(&ctx->reqs_active) > 1)
                                 io_schedule();
                         else
                                 schedule();
@@ -1200,35 +1136,6 @@ out:
         return i ? i : ret;
 }
 
-/* Take an ioctx and remove it from the list of ioctx's.  Protects
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
-        struct mm_struct *mm = current->mm;
-        int was_dead;
-
-        /* delete the entry from the list is someone else hasn't already */
-        spin_lock(&mm->ioctx_lock);
-        was_dead = ioctx->dead;
-        ioctx->dead = 1;
-        hlist_del_rcu(&ioctx->list);
-        spin_unlock(&mm->ioctx_lock);
-
-        dprintk("aio_release(%p)\n", ioctx);
-        if (likely(!was_dead))
-                put_ioctx(ioctx);       /* twice for the list */
-
-        kill_ctx(ioctx);
-
-        /*
-         * Wake up any waiters.  The setting of ctx->dead must be seen
-         * by other CPUs at this point.  Right now, we rely on the
-         * locking done by the above calls to ensure this consistency.
-         */
-        wake_up_all(&ioctx->wait);
-}
-
 /* sys_io_setup:
 *      Create an aio_context capable of receiving at least nr_events.
 *      ctxp must not point to an aio_context that already exists, and
@@ -1264,7 +1171,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
         if (!IS_ERR(ioctx)) {
                 ret = put_user(ioctx->user_id, ctxp);
                 if (ret)
-                        io_destroy(ioctx);
+                        kill_ctx(ioctx);
                 put_ioctx(ioctx);
         }
 
@@ -1282,7 +1189,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 {
         struct kioctx *ioctx = lookup_ioctx(ctx);
         if (likely(NULL != ioctx)) {
-                io_destroy(ioctx);
+                kill_ctx(ioctx);
                 put_ioctx(ioctx);
                 return 0;
         }
@@ -1697,23 +1604,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
                 goto out_put_req;
 
         spin_lock_irq(&ctx->ctx_lock);
-        /*
-         * We could have raced with io_destroy() and are currently holding a
-         * reference to ctx which should be destroyed. We cannot submit IO
-         * since ctx gets freed as soon as io_submit() puts its reference.  The
-         * check here is reliable: io_destroy() sets ctx->dead before waiting
-         * for outstanding IO and the barrier between these two is realized by
-         * unlock of mm->ioctx_lock and lock of ctx->ctx_lock.  Analogously we
-         * increment ctx->reqs_active before checking for ctx->dead and the
-         * barrier is realized by unlock and lock of ctx->ctx_lock.  Thus if we
-         * don't see ctx->dead set here, io_destroy() waits for our IO to
-         * finish.
-         */
-        if (ctx->dead) {
-                spin_unlock_irq(&ctx->ctx_lock);
-                ret = -EINVAL;
-                goto out_put_req;
-        }
+
         aio_run_iocb(req);
         if (!list_empty(&ctx->run_list)) {
                 /* drain the run list */
diff --git a/include/linux/aio.h b/include/linux/aio.h
index fd628f9d2165..7ec611ae69f4 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -191,19 +191,24 @@ struct kioctx {
         unsigned long           user_id;
         struct hlist_node       list;
 
+        /* for io_getevents()/read_events() */
         wait_queue_head_t       wait;
 
         spinlock_t              ctx_lock;
 
-        int                     reqs_active;
+        atomic_t                reqs_active;
         struct list_head        active_reqs;    /* used for cancellation */
         struct list_head        run_list;       /* used for kicked reqs */
 
         /* sys_io_setup currently limits this to an unsigned int */
         unsigned                max_reqs;
 
+        struct aio_ring         *ring;
+        unsigned                ring_order;
+        unsigned                nr_events;
         struct aio_ring_info    ring_info;
 
+        struct work_struct      free_work;
         struct delayed_work     wq;
 
         struct rcu_head         rcu_head;
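At a high level, the patch replaces the old teardown scheme (count reqs_active under ctx_lock and sleep on ctx->wait until it hits zero) with plain reference counts: reqs_active becomes an atomic_t that starts at 1 and, when it drops to zero, releases a users reference; the final put defers the actual free to a workqueue item (free_ioctx) instead of calling call_rcu(). Below is a minimal userspace sketch of that two-level refcount with a deferred free, using C11 atomics and a plain thread in place of schedule_work(). Every name in it (my_ctx, req_put_ctx, free_ctx) is made up for illustration; it is not kernel code.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct kioctx and its two counters. */
struct my_ctx {
	atomic_int users;	/* lookup/list references */
	atomic_int reqs_active;	/* in-flight requests, plus 1 while the ctx is live */
};

/* Plays the role of the free_ioctx() work item: the free itself is deferred. */
static void *free_ctx(void *p)
{
	printf("freeing ctx %p\n", p);
	free(p);
	return NULL;
}

static void put_ctx(struct my_ctx *ctx)
{
	/* Last users reference: hand the free off to another thread,
	 * the way put_ioctx() now hands it to schedule_work(). */
	if (atomic_fetch_sub(&ctx->users, 1) == 1) {
		pthread_t t;
		pthread_create(&t, NULL, free_ctx, ctx);
		pthread_join(t, NULL);
	}
}

static void req_put_ctx(struct my_ctx *ctx)
{
	/* Dropping the last request reference drops the users ref it pinned. */
	if (atomic_fetch_sub(&ctx->reqs_active, 1) == 1)
		put_ctx(ctx);
}

int main(void)
{
	struct my_ctx *ctx = malloc(sizeof(*ctx));

	atomic_init(&ctx->users, 1);		/* as ioctx_alloc() now does */
	atomic_init(&ctx->reqs_active, 1);

	atomic_fetch_add(&ctx->reqs_active, 1);	/* a request is submitted */
	req_put_ctx(ctx);			/* ...and completes */

	req_put_ctx(ctx);	/* teardown drops the initial ref, as kill_ctx() does */
	return 0;
}
```

The point of the deferral is the same as in the patch: the last reference may be dropped from a context where freeing inline would be awkward, so the final put only schedules the cleanup.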