author     Kent Overstreet <koverstreet@google.com>  2012-10-05 16:46:36 -0700
committer  Kent Overstreet <koverstreet@google.com>  2012-10-05 16:46:36 -0700
commit     30e7aab200bc75798fe16d80a59de40a0a597d81 (patch)
tree       f9383b297d22f270ce8ed1a26f7bbe45b0b40671
parent     c78172f7efa5f84ca8c94960b153d38d28e5965d (diff)
-rw-r--r--  fs/aio.c             | 239
-rw-r--r--  include/linux/aio.h  |   7
2 files changed, 71 insertions, 175 deletions
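
The diff below makes two related changes: fs/aio.c switches the completion ring from an mmap'd, get_user_pages()-pinned buffer to a direct __get_free_pages() allocation, and ->reqs_active becomes an atomic_t that acts as a second level of reference counting, with each in-flight request pinning the kioctx until the last one releases it. A minimal userspace sketch of that layered refcount; the names mirror the patch, but the initial counts and the syscall-path gets and puts are simplified, and the deferred worker free is modelled as a direct call:

/*
 * Userspace sketch (not kernel code) of the layered refcount this patch
 * introduces: every in-flight request pins ->reqs_active, and the last
 * req_put_ioctx() drops the ->users reference held on behalf of all I/O,
 * which in turn triggers the free.
 */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct kioctx_model {
	atomic_int users;        /* lookup/list references on the context */
	atomic_int reqs_active;  /* in-flight requests + one "alive" bias */
};

static void free_ioctx_model(struct kioctx_model *ctx)
{
	/* in the patch this runs from schedule_work(&ctx->free_work) */
	printf("context freed\n");
	free(ctx);
}

static void put_ioctx_model(struct kioctx_model *ctx)
{
	if (atomic_fetch_sub(&ctx->users, 1) == 1)
		free_ioctx_model(ctx);
}

static void req_put_ioctx_model(struct kioctx_model *ctx)
{
	/* last completed request drops the reference held for in-flight I/O */
	if (atomic_fetch_sub(&ctx->reqs_active, 1) == 1)
		put_ioctx_model(ctx);
}

int main(void)
{
	struct kioctx_model *ctx = malloc(sizeof(*ctx));

	atomic_init(&ctx->users, 1);        /* cf. ioctx_alloc() after the patch */
	atomic_init(&ctx->reqs_active, 1);  /* bias dropped by kill_ctx() */

	atomic_fetch_add(&ctx->reqs_active, 1);  /* submit one request */
	req_put_ioctx_model(ctx);                /* ... it completes */

	req_put_ioctx_model(ctx);                /* kill_ctx() drops the bias */
	return 0;
}
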
diff --git a/fs/aio.c b/fs/aio.c
index a8d22e12a1ac..184ed1c183c8 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -98,69 +98,35 @@ static void aio_free_ring(struct kioctx *ctx)
static int aio_setup_ring(struct kioctx *ctx)
{
- struct aio_ring *ring;
- struct aio_ring_info *info = &ctx->ring_info;
unsigned nr_events = ctx->max_reqs;
unsigned long size;
- int nr_pages;
/* Compensate for the ring buffer's head/tail overlap entry */
nr_events += 2; /* 1 is required, 2 for good luck */
size = sizeof(struct aio_ring);
size += sizeof(struct io_event) * nr_events;
- nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT;
- if (nr_pages < 0)
- return -EINVAL;
+ ctx->ring_order = get_order(size);
- nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
-
- info->nr = 0;
- info->ring_pages = info->internal_pages;
- if (nr_pages > AIO_RING_PAGES) {
- info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
- if (!info->ring_pages)
- return -ENOMEM;
- }
+ nr_events = ((PAGE_SIZE << ctx->ring_order) -
+ sizeof(struct aio_ring)) / sizeof(struct io_event);
- info->mmap_size = nr_pages * PAGE_SIZE;
- dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
- down_write(&ctx->mm->mmap_sem);
- info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
- PROT_READ|PROT_WRITE,
- MAP_ANONYMOUS|MAP_PRIVATE, 0);
- if (IS_ERR((void *)info->mmap_base)) {
- up_write(&ctx->mm->mmap_sem);
- info->mmap_size = 0;
- aio_free_ring(ctx);
- return -EAGAIN;
- }
+ ctx->ring = (void *) __get_free_pages(GFP_KERNEL, ctx->ring_order);
+ if (!ctx->ring)
+ return -ENOMEM;
- dprintk("mmap address: 0x%08lx\n", info->mmap_base);
- info->nr_pages = get_user_pages(current, ctx->mm,
- info->mmap_base, nr_pages,
- 1, 0, info->ring_pages, NULL);
- up_write(&ctx->mm->mmap_sem);
+ ctx->user_id = (unsigned long) &ctx;
- if (unlikely(info->nr_pages != nr_pages)) {
- aio_free_ring(ctx);
- return -EAGAIN;
- }
+ ctx->nr_events = nr_events; /* trusted copy */
- ctx->user_id = info->mmap_base;
-
- info->nr = nr_events; /* trusted copy */
-
- ring = kmap_atomic(info->ring_pages[0]);
- ring->nr = nr_events; /* user copy */
- ring->id = ctx->user_id;
- ring->head = ring->tail = 0;
- ring->magic = AIO_RING_MAGIC;
- ring->compat_features = AIO_RING_COMPAT_FEATURES;
- ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
- ring->header_length = sizeof(struct aio_ring);
- kunmap_atomic(ring);
+ ctx->ring->nr = nr_events; /* user copy */
+ ctx->ring->id = ctx->user_id;
+ ctx->ring->head = ctx->ring->tail = 0;
+ ctx->ring->magic = AIO_RING_MAGIC;
+ ctx->ring->compat_features = AIO_RING_COMPAT_FEATURES;
+ ctx->ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
+ ctx->ring->header_length = sizeof(struct aio_ring);
return 0;
}
@@ -188,20 +154,14 @@ static int aio_setup_ring(struct kioctx *ctx)
kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
} while(0)
-static void ctx_rcu_free(struct rcu_head *head)
-{
- struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
- kmem_cache_free(kioctx_cachep, ctx);
-}
-
/* __put_ioctx
* Called when the last user of an aio context has gone away,
* and the struct needs to be freed.
*/
-static void __put_ioctx(struct kioctx *ctx)
+static void free_ioctx(struct work_struct *work)
{
+ struct kioctx *ctx = container_of(work, struct kioctx, free_work);
unsigned nr_events = ctx->max_reqs;
- BUG_ON(ctx->reqs_active);
cancel_delayed_work_sync(&ctx->wq);
aio_free_ring(ctx);
@@ -214,19 +174,21 @@ static void __put_ioctx(struct kioctx *ctx)
spin_unlock(&aio_nr_lock);
}
pr_debug("__put_ioctx: freeing %p\n", ctx);
- call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
-
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
- return atomic_inc_not_zero(&kioctx->users);
+ kmem_cache_free(kioctx_cachep, ctx);
}
static inline void put_ioctx(struct kioctx *kioctx)
{
BUG_ON(atomic_read(&kioctx->users) <= 0);
if (unlikely(atomic_dec_and_test(&kioctx->users)))
- __put_ioctx(kioctx);
+ schedule_work(&kioctx->free_work);
+}
+
+static inline void req_put_ioctx(struct kioctx *kioctx)
+{
+ BUG_ON(atomic_read(&kioctx->reqs_active) <= 0);
+ if (unlikely(atomic_dec_and_test(&kioctx->reqs_active)))
+ put_ioctx(kioctx);
}
static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
@@ -279,13 +241,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
mm = ctx->mm = current->mm;
atomic_inc(&mm->mm_count);
- atomic_set(&ctx->users, 2);
+ atomic_set(&ctx->users, 1);
+ atomic_set(&ctx->reqs_active, 1);
+
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
INIT_LIST_HEAD(&ctx->run_list);
+ INIT_WORK(&ctx->free_work, free_ioctx);
INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
if (aio_setup_ring(ctx) < 0)
@@ -327,36 +292,29 @@ out_freectx:
*/
static void kill_ctx(struct kioctx *ctx)
{
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
+ struct kiocb *iocb, *t;
struct io_event res;
+ int put = 0;
spin_lock_irq(&ctx->ctx_lock);
- ctx->dead = 1;
- while (!list_empty(&ctx->active_reqs)) {
- struct list_head *pos = ctx->active_reqs.next;
- struct kiocb *iocb = list_kiocb(pos);
- list_del_init(&iocb->ki_list);
-
- kiocb_cancel(ctx, iocb, &res);
- }
- if (!ctx->reqs_active)
- goto out;
+ if (!ctx->dead) {
+ put = 1;
+ ctx->dead = 1;
+ hlist_del_rcu(&ctx->list);
- add_wait_queue(&ctx->wait, &wait);
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- while (ctx->reqs_active) {
spin_unlock_irq(&ctx->ctx_lock);
- io_schedule();
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+ synchronize_rcu();
spin_lock_irq(&ctx->ctx_lock);
}
- __set_task_state(tsk, TASK_RUNNING);
- remove_wait_queue(&ctx->wait, &wait);
-out:
+ list_for_each_entry_safe(iocb, t, &ctx->active_reqs, ki_list)
+ kiocb_cancel(ctx, iocb, &res);
+
spin_unlock_irq(&ctx->ctx_lock);
+
+ if (put)
+ req_put_ioctx(ctx);
}
/* wait_on_sync_kiocb:
@@ -388,15 +346,15 @@ void exit_aio(struct mm_struct *mm)
while (!hlist_empty(&mm->ioctx_list)) {
ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
- hlist_del_rcu(&ctx->list);
-
- kill_ctx(ctx);
+#if 0
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
"exit_aio:ioctx still alive: %d %d %d\n",
atomic_read(&ctx->users), ctx->dead,
ctx->reqs_active);
+#endif
+
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -408,41 +366,33 @@ void exit_aio(struct mm_struct *mm)
* all other callers have ctx->mm == current->mm.
*/
ctx->ring_info.mmap_size = 0;
- put_ioctx(ctx);
+
+ kill_ctx(ctx);
}
}
/* aio_get_req
- * Allocate a slot for an aio request. Increments the users count
+ * Allocate a slot for an aio request. Increments the ki_users count
* of the kioctx so that the kioctx stays around until all requests are
* complete. Returns NULL if no requests are free.
*
- * Returns with kiocb->users set to 2. The io submit code path holds
+ * Returns with kiocb->ki_users set to 2. The io submit code path holds
* an extra reference while submitting the i/o.
* This prevents races between the aio code path referencing the
* req (after submitting it) and aio_complete() freeing the req.
*/
static struct kiocb *__aio_get_req(struct kioctx *ctx)
{
- struct kiocb *req = NULL;
+ struct kiocb *req;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
if (unlikely(!req))
return NULL;
- req->ki_flags = 0;
+ memset(req, 0, sizeof(*req));
req->ki_users = 2;
- req->ki_key = 0;
req->ki_ctx = ctx;
- req->ki_cancel = NULL;
- req->ki_retry = NULL;
- req->ki_dtor = NULL;
- req->private = NULL;
- req->ki_iovec = NULL;
INIT_LIST_HEAD(&req->ki_run_list);
- req->ki_attrs = NULL;
- req->ki_attr_rets = NULL;
- req->ki_eventfd = NULL;
return req;
}
@@ -475,10 +425,8 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
list_del(&req->ki_batch);
list_del(&req->ki_list);
kmem_cache_free(kiocb_cachep, req);
- ctx->reqs_active--;
+ req_put_ioctx(ctx);
}
- if (unlikely(!ctx->reqs_active && ctx->dead))
- wake_up_all(&ctx->wait);
spin_unlock_irq(&ctx->ctx_lock);
}
@@ -508,7 +456,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
spin_lock_irq(&ctx->ctx_lock);
ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
- avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+ avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
BUG_ON(avail < 0);
if (avail < allocated) {
/* Trim back the number of requests. */
@@ -523,7 +471,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
batch->count -= allocated;
list_for_each_entry(req, &batch->head, ki_batch) {
list_add(&req->ki_list, &ctx->active_reqs);
- ctx->reqs_active++;
+ atomic_inc(&ctx->users);
}
kunmap_atomic(ring);
@@ -548,8 +496,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx,
static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
{
- assert_spin_locked(&ctx->ctx_lock);
-
+ fput(req->ki_filp);
if (req->ki_eventfd != NULL)
eventfd_ctx_put(req->ki_eventfd);
if (req->ki_dtor)
@@ -558,10 +505,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
kfree(req->ki_iovec);
kfree(req->ki_attrs);
kmem_cache_free(kiocb_cachep, req);
- ctx->reqs_active--;
- if (unlikely(!ctx->reqs_active && ctx->dead))
- wake_up_all(&ctx->wait);
+ req_put_ioctx(ctx);
}
/* __aio_put_req
@@ -579,11 +524,7 @@ static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
if (likely(req->ki_users))
return;
list_del(&req->ki_list); /* remove from active_reqs */
- req->ki_cancel = NULL;
- req->ki_retry = NULL;
- fput(req->ki_filp);
- req->ki_filp = NULL;
really_put_req(ctx, req);
}
@@ -608,18 +549,13 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
rcu_read_lock();
- hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
- /*
- * RCU protects us against accessing freed memory but
- * we have to be careful not to get a reference when the
- * reference count already dropped to 0 (ctx->dead test
- * is unreliable because of races).
- */
- if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+ hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
+ if (ctx->user_id == ctx_id){
+ BUG_ON(ctx->dead);
+ atomic_inc(&ctx->users);
ret = ctx;
break;
}
- }
rcu_read_unlock();
return ret;
@@ -1165,7 +1101,7 @@ retry:
break;
/* Try to only show up in io wait if there are ops
* in flight */
- if (ctx->reqs_active)
+ if (atomic_read(&ctx->reqs_active) > 1)
io_schedule();
else
schedule();
@@ -1200,35 +1136,6 @@ out:
return i ? i : ret;
}
-/* Take an ioctx and remove it from the list of ioctx's. Protects
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
- struct mm_struct *mm = current->mm;
- int was_dead;
-
- /* delete the entry from the list is someone else hasn't already */
- spin_lock(&mm->ioctx_lock);
- was_dead = ioctx->dead;
- ioctx->dead = 1;
- hlist_del_rcu(&ioctx->list);
- spin_unlock(&mm->ioctx_lock);
-
- dprintk("aio_release(%p)\n", ioctx);
- if (likely(!was_dead))
- put_ioctx(ioctx); /* twice for the list */
-
- kill_ctx(ioctx);
-
- /*
- * Wake up any waiters. The setting of ctx->dead must be seen
- * by other CPUs at this point. Right now, we rely on the
- * locking done by the above calls to ensure this consistency.
- */
- wake_up_all(&ioctx->wait);
-}
-
/* sys_io_setup:
* Create an aio_context capable of receiving at least nr_events.
* ctxp must not point to an aio_context that already exists, and
@@ -1264,7 +1171,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
if (!IS_ERR(ioctx)) {
ret = put_user(ioctx->user_id, ctxp);
if (ret)
- io_destroy(ioctx);
+ kill_ctx(ioctx);
put_ioctx(ioctx);
}
@@ -1282,7 +1189,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
{
struct kioctx *ioctx = lookup_ioctx(ctx);
if (likely(NULL != ioctx)) {
- io_destroy(ioctx);
+ kill_ctx(ioctx);
put_ioctx(ioctx);
return 0;
}
@@ -1697,23 +1604,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
goto out_put_req;
spin_lock_irq(&ctx->ctx_lock);
- /*
- * We could have raced with io_destroy() and are currently holding a
- * reference to ctx which should be destroyed. We cannot submit IO
- * since ctx gets freed as soon as io_submit() puts its reference. The
- * check here is reliable: io_destroy() sets ctx->dead before waiting
- * for outstanding IO and the barrier between these two is realized by
- * unlock of mm->ioctx_lock and lock of ctx->ctx_lock. Analogously we
- * increment ctx->reqs_active before checking for ctx->dead and the
- * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
- * don't see ctx->dead set here, io_destroy() waits for our IO to
- * finish.
- */
- if (ctx->dead) {
- spin_unlock_irq(&ctx->ctx_lock);
- ret = -EINVAL;
- goto out_put_req;
- }
+
aio_run_iocb(req);
if (!list_empty(&ctx->run_list)) {
/* drain the run list */
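
The aio_setup_ring() hunk at the top of this file's diff drops the mmap/get_user_pages() ring in favour of a single __get_free_pages() allocation sized with get_order(), and nr_events is then recomputed from whatever fits in the rounded-up allocation. A small userspace sketch of that sizing arithmetic, using stand-in structure sizes rather than the kernel's:

/*
 * Sketch of the new ring sizing: round the requested size up to a
 * power-of-two number of pages, then recompute how many io_events fit.
 * The structure sizes below are stand-ins, not the kernel's.
 */
#include <stdio.h>

#define PAGE_SHIFT 12
#define PAGE_SIZE  (1UL << PAGE_SHIFT)

#define AIO_RING_HEADER_SIZE 64   /* stand-in for sizeof(struct aio_ring) */
#define IO_EVENT_SIZE        32   /* stand-in for sizeof(struct io_event) */

/* roughly the kernel get_order() contract for size > 0:
 * smallest order such that PAGE_SIZE << order >= size */
static unsigned get_order_model(unsigned long size)
{
	unsigned order = 0;

	while ((PAGE_SIZE << order) < size)
		order++;
	return order;
}

int main(void)
{
	unsigned nr_events = 128 + 2;   /* requested events + head/tail slack */
	unsigned long size = AIO_RING_HEADER_SIZE +
			     (unsigned long)IO_EVENT_SIZE * nr_events;
	unsigned order = get_order_model(size);

	/* after rounding up to whole pages, more events usually fit */
	nr_events = ((PAGE_SIZE << order) - AIO_RING_HEADER_SIZE) / IO_EVENT_SIZE;

	printf("order %u (%lu bytes), %u events\n",
	       order, PAGE_SIZE << order, nr_events);
	return 0;
}
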
diff --git a/include/linux/aio.h b/include/linux/aio.h
index fd628f9d2165..7ec611ae69f4 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -191,19 +191,24 @@ struct kioctx {
unsigned long user_id;
struct hlist_node list;
+ /* for io_getevents()/read_events() */
wait_queue_head_t wait;
spinlock_t ctx_lock;
- int reqs_active;
+ atomic_t reqs_active;
struct list_head active_reqs; /* used for cancellation */
struct list_head run_list; /* used for kicked reqs */
/* sys_io_setup currently limits this to an unsigned int */
unsigned max_reqs;
+ struct aio_ring *ring;
+ unsigned ring_order;
+ unsigned nr_events;
struct aio_ring_info ring_info;
+ struct work_struct free_work;
struct delayed_work wq;
struct rcu_head rcu_head;
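
The include/linux/aio.h hunk adds the fields the fs/aio.c changes rely on: the directly allocated ring (->ring, ->ring_order, ->nr_events), the atomic ->reqs_active, and a free_work item. With free_work, the last put_ioctx() queues free_ioctx() on a workqueue instead of freeing through call_rcu(); the grace period moves into kill_ctx()'s synchronize_rcu(), and blocking teardown such as cancel_delayed_work_sync() runs in process context, presumably so the final put can safely come from I/O completion. A rough userspace sketch of that pattern, with a joined pthread standing in for schedule_work():

/*
 * Userspace sketch of the deferred free: the last put no longer frees the
 * context inline (or via call_rcu()); it queues work so that blocking
 * teardown can run in worker context.  None of this is kernel API.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct ctx_model {
	atomic_int users;
};

static void *free_ctx_work(void *arg)
{
	struct ctx_model *ctx = arg;

	/* stands in for free_ioctx(): blocking cleanup is safe in this context */
	printf("tearing down and freeing ctx\n");
	free(ctx);
	return NULL;
}

/* returns 1 if the free worker was started, i.e. this was the last reference */
static int put_ctx(struct ctx_model *ctx, pthread_t *worker)
{
	if (atomic_fetch_sub(&ctx->users, 1) == 1) {
		/* stands in for schedule_work(&ctx->free_work) */
		pthread_create(worker, NULL, free_ctx_work, ctx);
		return 1;
	}
	return 0;
}

int main(void)
{
	struct ctx_model *ctx = malloc(sizeof(*ctx));
	pthread_t worker;

	atomic_init(&ctx->users, 1);
	if (put_ctx(ctx, &worker))           /* last reference dropped */
		pthread_join(worker, NULL);  /* the kernel's schedule_work() is fire-and-forget */
	return 0;
}
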