summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/linux/io_uring_types.h2
-rw-r--r--include/uapi/linux/io_uring.h7
-rw-r--r--io_uring/cancel.c2
-rw-r--r--io_uring/io_uring.c147
-rw-r--r--io_uring/io_uring.h29
-rw-r--r--io_uring/rsrc.c2
6 files changed, 168 insertions, 21 deletions
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 677a25d44d7f..d56ff2185168 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -301,6 +301,8 @@ struct io_ring_ctx {
struct io_hash_table cancel_table;
bool poll_multi_queue;
+ struct llist_head work_llist;
+
struct list_head io_buffers_comp;
} ____cacheline_aligned_in_smp;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 6b83177fd41d..972b179bc07a 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -157,6 +157,13 @@ enum {
*/
#define IORING_SETUP_SINGLE_ISSUER (1U << 12)
+/*
+ * Defer running task work to get events.
+ * Rather than running bits of task work whenever the task transitions,
+ * try to do it just before it is needed.
+ */
+#define IORING_SETUP_DEFER_TASKRUN (1U << 13)
+
enum io_uring_op {
IORING_OP_NOP,
IORING_OP_READV,
diff --git a/io_uring/cancel.c b/io_uring/cancel.c
index 5fc5d3e80fcb..2291a53cdabd 100644
--- a/io_uring/cancel.c
+++ b/io_uring/cancel.c
@@ -292,7 +292,7 @@ int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg)
break;
mutex_unlock(&ctx->uring_lock);
- ret = io_run_task_work_sig();
+ ret = io_run_task_work_sig(ctx);
if (ret < 0) {
mutex_lock(&ctx->uring_lock);
break;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index edf7381b0215..1f0df14c3062 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -142,7 +142,7 @@ static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
static void io_dismantle_req(struct io_kiocb *req);
static void io_clean_op(struct io_kiocb *req);
static void io_queue_sqe(struct io_kiocb *req);
-
+static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
static struct kmem_cache *req_cachep;
@@ -316,6 +316,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
init_llist_head(&ctx->rsrc_put_llist);
+ init_llist_head(&ctx->work_llist);
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
INIT_WQ_LIST(&ctx->locked_free_list);
@@ -1047,12 +1048,36 @@ void tctx_task_work(struct callback_head *cb)
trace_io_uring_task_work_run(tctx, count, loops);
}
-void io_req_task_work_add(struct io_kiocb *req)
+static void io_req_local_work_add(struct io_kiocb *req) /* queue req's task work on its ring's ctx->work_llist */
+{
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
+ return; /* presumably the list was already non-empty, i.e. work was already pending */
+
+ if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
+ io_move_task_work_from_local(ctx); /* re-queue everything via __io_req_task_work_add(req, false) */
+ return;
+ }
+
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); /* cleared again by io_run_local_work() */
+
+ io_cqring_wake(ctx); /* NOTE(review): presumably so a CQ waiter notices the new local work -- confirm */
+
+}
+
+static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
{
struct io_uring_task *tctx = req->task->io_uring;
struct io_ring_ctx *ctx = req->ctx;
struct llist_node *node;
+ if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ io_req_local_work_add(req);
+ return;
+ }
+
/* task_work already pending, we're done */
if (!llist_add(&req->io_task_work.node, &tctx->task_list))
return;
@@ -1074,6 +1099,73 @@ void io_req_task_work_add(struct io_kiocb *req)
}
}
+void io_req_task_work_add(struct io_kiocb *req)
+{
+ __io_req_task_work_add(req, true); /* true: may go to ctx->work_llist when IORING_SETUP_DEFER_TASKRUN is set */
+}
+
+static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) /* drain ctx->work_llist, re-adding each request with allow_local == false */
+{
+ struct llist_node *node;
+
+ node = llist_del_all(&ctx->work_llist);
+ while (node) {
+ struct io_kiocb *req = container_of(node, struct io_kiocb,
+ io_task_work.node);
+
+ node = node->next; /* advance first: the re-add below reuses req->io_task_work.node */
+ __io_req_task_work_add(req, false);
+ }
+}
+
+int io_run_local_work(struct io_ring_ctx *ctx) /* run the entries on ctx->work_llist; returns how many ran, or -EEXIST off the submitter task */
+{
+ bool locked;
+ struct llist_node *node;
+ struct llist_node fake; /* on-stack placeholder installed as list head while a batch runs */
+ struct llist_node *current_final = NULL; /* node at which the current walk stops */
+ int ret;
+
+ if (unlikely(ctx->submitter_task != current)) {
+ /* maybe this is before any submissions */
+ if (!ctx->submitter_task)
+ return 0;
+
+ return -EEXIST;
+ }
+
+ locked = mutex_trylock(&ctx->uring_lock); /* best effort; handlers learn the result via &locked */
+
+ node = io_llist_xchg(&ctx->work_llist, &fake); /* presumably: take the current entries, leave &fake as head */
+ ret = 0;
+again:
+ while (node != current_final) {
+ struct llist_node *next = node->next;
+ struct io_kiocb *req = container_of(node, struct io_kiocb,
+ io_task_work.node);
+ prefetch(container_of(next, struct io_kiocb, io_task_work.node));
+ req->io_task_work.func(req, &locked);
+ ret++; /* counts every handler invoked, across all passes */
+ node = next;
+ }
+
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); /* undo the bit set in io_req_local_work_add() */
+
+ node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL); /* head still &fake: nothing new arrived, reset the list */
+ if (node != &fake) { /* otherwise entries were added meanwhile: run those too */
+ current_final = &fake; /* presumably new entries chain down to &fake, so the walk stops there */
+ node = io_llist_xchg(&ctx->work_llist, &fake);
+ goto again;
+ }
+
+ if (locked) {
+ io_submit_flush_completions(ctx);
+ mutex_unlock(&ctx->uring_lock);
+ }
+ return ret;
+}
+
static void io_req_tw_post(struct io_kiocb *req, bool *locked)
{
io_req_complete_post(req);
@@ -1285,8 +1377,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
u32 tail = ctx->cached_cq_tail;
mutex_unlock(&ctx->uring_lock);
- io_run_task_work();
+ ret = io_run_task_work_ctx(ctx);
mutex_lock(&ctx->uring_lock);
+ if (ret < 0)
+ break;
/* some requests don't go through iopoll_list */
if (tail != ctx->cached_cq_tail ||
@@ -2148,7 +2242,9 @@ struct io_wait_queue {
static inline bool io_has_work(struct io_ring_ctx *ctx)
{
- return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
+ return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
+ ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+ !llist_empty(&ctx->work_llist)); /* queued local work also counts as work on DEFER_TASKRUN rings */
}
static inline bool io_should_wake(struct io_wait_queue *iowq)
@@ -2180,9 +2276,9 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
return -1;
}
-int io_run_task_work_sig(void)
+int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
- if (io_run_task_work())
+ if (io_run_task_work_ctx(ctx) > 0)
return 1;
if (task_sigpending(current))
return -EINTR;
@@ -2198,7 +2294,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
unsigned long check_cq;
/* make sure we run task_work before checking for signals */
- ret = io_run_task_work_sig();
+ ret = io_run_task_work_sig(ctx);
if (ret || io_should_wake(iowq))
return ret;
@@ -2229,12 +2325,14 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
int ret;
do {
+ /* always run at least 1 task work to process local work */
+ ret = io_run_task_work_ctx(ctx);
+ if (ret < 0)
+ return ret;
io_cqring_overflow_flush(ctx);
if (io_cqring_events(ctx) >= min_events)
return 0;
- if (!io_run_task_work())
- break;
- } while (1);
+ } while (ret > 0);
if (sig) {
#ifdef CONFIG_COMPAT
@@ -2575,6 +2673,9 @@ static __cold void io_ring_exit_work(struct work_struct *work)
* as nobody else will be looking for them.
*/
do {
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ io_move_task_work_from_local(ctx);
+
while (io_uring_try_cancel_requests(ctx, NULL, true))
cond_resched();
@@ -2769,13 +2870,15 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
}
}
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ ret |= io_run_local_work(ctx) > 0;
ret |= io_cancel_defer_files(ctx, task, cancel_all);
mutex_lock(&ctx->uring_lock);
ret |= io_poll_remove_all(ctx, task, cancel_all);
mutex_unlock(&ctx->uring_lock);
ret |= io_kill_timeouts(ctx, task, cancel_all);
if (task)
- ret |= io_run_task_work();
+ ret |= io_run_task_work() > 0;
return ret;
}
@@ -3060,8 +3163,10 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
goto iopoll_locked;
mutex_unlock(&ctx->uring_lock);
}
+
if (flags & IORING_ENTER_GETEVENTS) {
int ret2;
+
if (ctx->syscall_iopoll) {
/*
* We disallow the app entering submit/complete with
@@ -3290,18 +3395,30 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
if (ctx->flags & IORING_SETUP_SQPOLL) {
/* IPI related flags don't make sense with SQPOLL */
if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
- IORING_SETUP_TASKRUN_FLAG))
+ IORING_SETUP_TASKRUN_FLAG |
+ IORING_SETUP_DEFER_TASKRUN))
goto err;
ctx->notify_method = TWA_SIGNAL_NO_IPI;
} else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
ctx->notify_method = TWA_SIGNAL_NO_IPI;
} else {
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
+ !(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
goto err;
ctx->notify_method = TWA_SIGNAL;
}
/*
+ * For DEFER_TASKRUN we require the completion task to be the same as the
+ * submission task. This implies that there is only one submitter, so enforce
+ * that.
+ */
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
+ !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
+ goto err;
+ }
+
+ /*
* This is just grabbed for accounting purposes. When a process exits,
* the mm is exited and dropped before the files, hence we need to hang
* on to this mm purely for the purposes of being able to unaccount
@@ -3401,7 +3518,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
- IORING_SETUP_SINGLE_ISSUER))
+ IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN))
return -EINVAL;
return io_uring_create(entries, &p, params);
@@ -3864,7 +3981,7 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
ctx = f.file->private_data;
- io_run_task_work();
+ io_run_task_work_ctx(ctx);
mutex_lock(&ctx->uring_lock);
ret = __io_uring_register(ctx, opcode, arg, nr_args);
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 2f73f83af960..f417d75d7bc1 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -26,7 +26,8 @@ enum {
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx);
bool io_req_cqe_overflow(struct io_kiocb *req);
-int io_run_task_work_sig(void);
+int io_run_task_work_sig(struct io_ring_ctx *ctx);
+int io_run_local_work(struct io_ring_ctx *ctx);
void io_req_complete_failed(struct io_kiocb *req, s32 res);
void __io_req_complete(struct io_kiocb *req, unsigned issue_flags);
void io_req_complete_post(struct io_kiocb *req);
@@ -221,17 +222,37 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
}
-static inline bool io_run_task_work(void)
+static inline int io_run_task_work(void) /* returns 1 if TIF_NOTIFY_SIGNAL was set and handled, else 0 */
{
if (test_thread_flag(TIF_NOTIFY_SIGNAL)) {
__set_current_state(TASK_RUNNING);
clear_notify_signal();
if (task_work_pending(current))
task_work_run();
- return true;
+ return 1;
}
- return false;
+ return 0;
+}
+
+static inline int io_run_task_work_ctx(struct io_ring_ctx *ctx) /* run local (DEFER_TASKRUN) work, then regular task_work */
+{
+ int ret = 0;
+ int ret2;
+
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ ret = io_run_local_work(ctx);
+
+ /* run regular task_work afterwards, in case the local work queued more */
+ ret2 = io_run_task_work();
+
+ /* A negative ret from io_run_local_work() takes precedence over the
+ * count of work run; io_run_task_work() above has run regardless.
+ */
+ if (ret >= 0)
+ ret += ret2;
+
+ return ret;
}
static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index cf3272113214..6f88ded0e7e5 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -341,7 +341,7 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
flush_delayed_work(&ctx->rsrc_put_work);
reinit_completion(&data->done);
- ret = io_run_task_work_sig();
+ ret = io_run_task_work_sig(ctx);
mutex_lock(&ctx->uring_lock);
} while (ret >= 0);
data->quiesce = false;