diff options
-rw-r--r-- | fs/fuse/dev.c | 9 | ||||
-rw-r--r-- | fs/fuse/dev_uring.c | 207 | ||||
-rw-r--r-- | fs/fuse/dev_uring_i.h | 51 |
3 files changed, 267 insertions, 0 deletions
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index aa33eba51c51..1c21e491e891 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -6,6 +6,7 @@ See the file COPYING. */ +#include "dev_uring_i.h" #include "fuse_i.h" #include "fuse_dev_i.h" @@ -2291,6 +2292,12 @@ void fuse_abort_conn(struct fuse_conn *fc) spin_unlock(&fc->lock); fuse_dev_end_requests(&to_end); + + /* + * fc->lock must not be taken to avoid conflicts with io-uring + * locks + */ + fuse_uring_abort(fc); } else { spin_unlock(&fc->lock); } @@ -2302,6 +2309,8 @@ void fuse_wait_aborted(struct fuse_conn *fc) /* matches implicit memory barrier in fuse_drop_waiting() */ smp_mb(); wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0); + + fuse_uring_wait_stopped_queues(fc); } int fuse_dev_release(struct inode *inode, struct file *file) diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index 1030c1720990..1161b9aa5e11 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -35,6 +35,37 @@ static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req, fuse_request_end(req); } +/* Abort all list queued request on the given ring queue */ +static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue) +{ + struct fuse_req *req; + LIST_HEAD(req_list); + + spin_lock(&queue->lock); + list_for_each_entry(req, &queue->fuse_req_queue, list) + clear_bit(FR_PENDING, &req->flags); + list_splice_init(&queue->fuse_req_queue, &req_list); + spin_unlock(&queue->lock); + + /* must not hold queue lock to avoid order issues with fi->lock */ + fuse_dev_end_requests(&req_list); +} + +void fuse_uring_abort_end_requests(struct fuse_ring *ring) +{ + int qid; + struct fuse_ring_queue *queue; + + for (qid = 0; qid < ring->nr_queues; qid++) { + queue = READ_ONCE(ring->queues[qid]); + if (!queue) + continue; + + queue->stopped = true; + fuse_uring_abort_end_queue_requests(queue); + } +} + void fuse_uring_destruct(struct fuse_conn *fc) { struct fuse_ring *ring = fc->ring; @@ -94,10 +125,13 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc) goto out_err; } + init_waitqueue_head(&ring->stop_waitq); + fc->ring = ring; ring->nr_queues = nr_queues; ring->fc = fc; ring->max_payload_sz = max_payload_size; + atomic_set(&ring->queue_refs, 0); spin_unlock(&fc->lock); return ring; @@ -154,6 +188,175 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, return queue; } +static void fuse_uring_stop_fuse_req_end(struct fuse_req *req) +{ + clear_bit(FR_SENT, &req->flags); + req->out.h.error = -ECONNABORTED; + fuse_request_end(req); +} + +/* + * Release a request/entry on connection tear down + */ +static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent) +{ + struct fuse_req *req; + struct io_uring_cmd *cmd; + + struct fuse_ring_queue *queue = ent->queue; + + spin_lock(&queue->lock); + cmd = ent->cmd; + ent->cmd = NULL; + req = ent->fuse_req; + ent->fuse_req = NULL; + if (req) { + /* remove entry from queue->fpq->processing */ + list_del_init(&req->list); + } + spin_unlock(&queue->lock); + + if (cmd) + io_uring_cmd_done(cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED); + + if (req) + fuse_uring_stop_fuse_req_end(req); + + list_del_init(&ent->list); + kfree(ent); +} + +static void fuse_uring_stop_list_entries(struct list_head *head, + struct fuse_ring_queue *queue, + enum fuse_ring_req_state exp_state) +{ + struct fuse_ring *ring = queue->ring; + struct fuse_ring_ent *ent, *next; + ssize_t queue_refs = SSIZE_MAX; + LIST_HEAD(to_teardown); + + spin_lock(&queue->lock); + list_for_each_entry_safe(ent, next, head, list) { + if (ent->state != exp_state) { + pr_warn("entry teardown qid=%d state=%d expected=%d", + queue->qid, ent->state, exp_state); + continue; + } + + list_move(&ent->list, &to_teardown); + } + spin_unlock(&queue->lock); + + /* no queue lock to avoid lock order issues */ + list_for_each_entry_safe(ent, next, &to_teardown, list) { + fuse_uring_entry_teardown(ent); + queue_refs = atomic_dec_return(&ring->queue_refs); + WARN_ON_ONCE(queue_refs < 0); + } +} + +static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue) +{ + fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue, + FRRS_USERSPACE); + fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue, + FRRS_AVAILABLE); +} + +/* + * Log state debug info + */ +static void fuse_uring_log_ent_state(struct fuse_ring *ring) +{ + int qid; + struct fuse_ring_ent *ent; + + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = ring->queues[qid]; + + if (!queue) + continue; + + spin_lock(&queue->lock); + /* + * Log entries from the intermediate queue, the other queues + * should be empty + */ + list_for_each_entry(ent, &queue->ent_w_req_queue, list) { + pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n", + ring, qid, ent, ent->state); + } + list_for_each_entry(ent, &queue->ent_commit_queue, list) { + pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n", + ring, qid, ent, ent->state); + } + spin_unlock(&queue->lock); + } + ring->stop_debug_log = 1; +} + +static void fuse_uring_async_stop_queues(struct work_struct *work) +{ + int qid; + struct fuse_ring *ring = + container_of(work, struct fuse_ring, async_teardown_work.work); + + /* XXX code dup */ + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]); + + if (!queue) + continue; + + fuse_uring_teardown_entries(queue); + } + + /* + * Some ring entries might be in the middle of IO operations, + * i.e. in process to get handled by file_operations::uring_cmd + * or on the way to userspace - we could handle that with conditions in + * run time code, but easier/cleaner to have an async tear down handler + * If there are still queue references left + */ + if (atomic_read(&ring->queue_refs) > 0) { + if (time_after(jiffies, + ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT)) + fuse_uring_log_ent_state(ring); + + schedule_delayed_work(&ring->async_teardown_work, + FUSE_URING_TEARDOWN_INTERVAL); + } else { + wake_up_all(&ring->stop_waitq); + } +} + +/* + * Stop the ring queues + */ +void fuse_uring_stop_queues(struct fuse_ring *ring) +{ + int qid; + + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]); + + if (!queue) + continue; + + fuse_uring_teardown_entries(queue); + } + + if (atomic_read(&ring->queue_refs) > 0) { + ring->teardown_time = jiffies; + INIT_DELAYED_WORK(&ring->async_teardown_work, + fuse_uring_async_stop_queues); + schedule_delayed_work(&ring->async_teardown_work, + FUSE_URING_TEARDOWN_INTERVAL); + } else { + wake_up_all(&ring->stop_waitq); + } +} + /* * Checks for errors and stores it into the request */ @@ -525,6 +728,9 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, return err; fpq = &queue->fpq; + if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped)) + return err; + spin_lock(&queue->lock); /* Find a request based on the unique ID of the fuse request * This should get revised, as it needs a hash calculation and list @@ -652,6 +858,7 @@ fuse_uring_create_ring_ent(struct io_uring_cmd *cmd, ent->headers = iov[0].iov_base; ent->payload = iov[1].iov_base; + atomic_inc(&ring->queue_refs); return ent; } diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h index 44bf237f0d5a..a4316e118cbd 100644 --- a/fs/fuse/dev_uring_i.h +++ b/fs/fuse/dev_uring_i.h @@ -11,6 +11,9 @@ #ifdef CONFIG_FUSE_IO_URING +#define FUSE_URING_TEARDOWN_TIMEOUT (5 * HZ) +#define FUSE_URING_TEARDOWN_INTERVAL (HZ/20) + enum fuse_ring_req_state { FRRS_INVALID = 0, @@ -80,6 +83,8 @@ struct fuse_ring_queue { struct list_head fuse_req_queue; struct fuse_pqueue fpq; + + bool stopped; }; /** @@ -97,12 +102,51 @@ struct fuse_ring { size_t max_payload_sz; struct fuse_ring_queue **queues; + + /* + * Log ring entry states on stop when entries cannot be released + */ + unsigned int stop_debug_log : 1; + + wait_queue_head_t stop_waitq; + + /* async tear down */ + struct delayed_work async_teardown_work; + + /* log */ + unsigned long teardown_time; + + atomic_t queue_refs; }; bool fuse_uring_enabled(void); void fuse_uring_destruct(struct fuse_conn *fc); +void fuse_uring_stop_queues(struct fuse_ring *ring); +void fuse_uring_abort_end_requests(struct fuse_ring *ring); int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags); +static inline void fuse_uring_abort(struct fuse_conn *fc) +{ + struct fuse_ring *ring = fc->ring; + + if (ring == NULL) + return; + + if (atomic_read(&ring->queue_refs) > 0) { + fuse_uring_abort_end_requests(ring); + fuse_uring_stop_queues(ring); + } +} + +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc) +{ + struct fuse_ring *ring = fc->ring; + + if (ring) + wait_event(ring->stop_waitq, + atomic_read(&ring->queue_refs) == 0); +} + #else /* CONFIG_FUSE_IO_URING */ struct fuse_ring; @@ -120,6 +164,13 @@ static inline bool fuse_uring_enabled(void) return false; } +static inline void fuse_uring_abort(struct fuse_conn *fc) +{ +} + +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc) +{ +} #endif /* CONFIG_FUSE_IO_URING */ #endif /* _FS_FUSE_DEV_URING_I_H */ |