io_uring: improve poll completion performance
For busy IORING_OP_POLL_ADD workloads, we can have enough contention
on the completion lock that we fail the inline completion path quite
often as we fail the trylock on that lock. Add a list for deferred
completions that we can use in that case. This helps reduce the number
of async offloads we have to do, as if we get multiple completions in
a row, we'll piggy back on to the poll_llist instead of having to queue
our own offload.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/fs/io_uring.c b/fs/io_uring.c
index dfa99da..c54a8bd 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -286,7 +286,8 @@ struct io_ring_ctx {
struct {
spinlock_t completion_lock;
- bool poll_multi_file;
+ struct llist_head poll_llist;
+
/*
* ->poll_list is protected by the ctx->uring_lock for
* io_uring instances that don't use IORING_SETUP_SQPOLL.
@@ -296,6 +297,7 @@ struct io_ring_ctx {
struct list_head poll_list;
struct hlist_head *cancel_hash;
unsigned cancel_hash_bits;
+ bool poll_multi_file;
spinlock_t inflight_lock;
struct list_head inflight_list;
@@ -453,7 +455,14 @@ struct io_kiocb {
};
struct io_async_ctx *io;
- struct file *ring_file;
+ union {
+ /*
+ * ring_file is only used in the submission path, and
+ * llist_node is only used for poll deferred completions
+ */
+ struct file *ring_file;
+ struct llist_node llist_node;
+ };
int ring_fd;
bool has_user;
bool in_async;
@@ -725,6 +734,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->wait);
spin_lock_init(&ctx->completion_lock);
+ init_llist_head(&ctx->poll_llist);
INIT_LIST_HEAD(&ctx->poll_list);
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
@@ -1320,6 +1330,20 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
}
+static inline bool io_req_multi_free(struct io_kiocb *req)
+{
+ /*
+ * If we're not using fixed files, we have to pair the completion part
+ * with the file put. Use regular completions for those, only batch
+ * free for fixed file and non-linked commands.
+ */
+ if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == REQ_F_FIXED_FILE)
+ && !io_is_fallback_req(req) && !req->io)
+ return true;
+
+ return false;
+}
+
/*
* Find and free completed poll iocbs
*/
@@ -1339,14 +1363,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
(*nr_events)++;
if (refcount_dec_and_test(&req->refs)) {
- /* If we're not using fixed files, we have to pair the
- * completion part with the file put. Use regular
- * completions for those, only batch free for fixed
- * file and non-linked commands.
- */
- if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
- REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
- !req->io) {
+ if (io_req_multi_free(req)) {
reqs[to_free++] = req;
if (to_free == ARRAY_SIZE(reqs))
io_free_req_many(ctx, reqs, &to_free);
@@ -3081,6 +3098,44 @@ static void io_poll_complete_work(struct io_wq_work **workptr)
io_wq_assign_next(workptr, nxt);
}
+static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
+{
+ void *reqs[IO_IOPOLL_BATCH];
+ struct io_kiocb *req, *tmp;
+ int to_free = 0;
+
+ spin_lock_irq(&ctx->completion_lock);
+ llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
+ hash_del(&req->hash_node);
+ io_poll_complete(req, req->result, 0);
+
+ if (refcount_dec_and_test(&req->refs)) {
+ if (io_req_multi_free(req)) {
+ reqs[to_free++] = req;
+ if (to_free == ARRAY_SIZE(reqs))
+ io_free_req_many(ctx, reqs, &to_free);
+ } else {
+ req->flags |= REQ_F_COMP_LOCKED;
+ io_free_req(req);
+ }
+ }
+ }
+ spin_unlock_irq(&ctx->completion_lock);
+
+ io_cqring_ev_posted(ctx);
+ io_free_req_many(ctx, reqs, &to_free);
+}
+
+static void io_poll_flush(struct io_wq_work **workptr)
+{
+ struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
+ struct llist_node *nodes;
+
+ nodes = llist_del_all(&req->ctx->poll_llist);
+ if (nodes)
+ __io_poll_flush(req->ctx, nodes);
+}
+
static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
void *key)
{
@@ -3088,7 +3143,6 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
struct io_ring_ctx *ctx = req->ctx;
__poll_t mask = key_to_poll(key);
- unsigned long flags;
/* for instances that support it check for an event match first: */
if (mask && !(mask & poll->events))
@@ -3102,17 +3156,31 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
* If we have a link timeout we're going to need the completion_lock
* for finalizing the request, mark us as having grabbed that already.
*/
- if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
- hash_del(&req->hash_node);
- io_poll_complete(req, mask, 0);
- req->flags |= REQ_F_COMP_LOCKED;
- io_put_req(req);
- spin_unlock_irqrestore(&ctx->completion_lock, flags);
+ if (mask) {
+ unsigned long flags;
- io_cqring_ev_posted(ctx);
- } else {
- io_queue_async_work(req);
+ if (llist_empty(&ctx->poll_llist) &&
+ spin_trylock_irqsave(&ctx->completion_lock, flags)) {
+ hash_del(&req->hash_node);
+ io_poll_complete(req, mask, 0);
+ req->flags |= REQ_F_COMP_LOCKED;
+ io_put_req(req);
+ spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+ io_cqring_ev_posted(ctx);
+ req = NULL;
+ } else {
+ req->result = mask;
+ req->llist_node.next = NULL;
+ /* if the list wasn't empty, we're done */
+ if (!llist_add(&req->llist_node, &ctx->poll_llist))
+ req = NULL;
+ else
+ req->work.func = io_poll_flush;
+ }
}
+ if (req)
+ io_queue_async_work(req);
return 1;
}