io_uring: improve poll completion performance

For busy IORING_OP_POLL_ADD workloads, we can have enough contention
on the completion lock that the inline completion path fails quite
often, because the trylock on that lock fails. Add a list for deferred
completions that we can use in that case. This helps reduce the number
of async offloads we have to do: if we get multiple completions in a
row, they piggyback onto the poll_llist instead of each having to queue
its own offload.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/fs/io_uring.c b/fs/io_uring.c
index dfa99da..c54a8bd 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -286,7 +286,8 @@ struct io_ring_ctx {
 
 	struct {
 		spinlock_t		completion_lock;
-		bool			poll_multi_file;
+		struct llist_head	poll_llist;
+
 		/*
 		 * ->poll_list is protected by the ctx->uring_lock for
 		 * io_uring instances that don't use IORING_SETUP_SQPOLL.
@@ -296,6 +297,7 @@ struct io_ring_ctx {
 		struct list_head	poll_list;
 		struct hlist_head	*cancel_hash;
 		unsigned		cancel_hash_bits;
+		bool			poll_multi_file;
 
 		spinlock_t		inflight_lock;
 		struct list_head	inflight_list;
@@ -453,7 +455,14 @@ struct io_kiocb {
 	};
 
 	struct io_async_ctx		*io;
-	struct file			*ring_file;
+	union {
+		/*
+		 * ring_file is only used in the submission path, and
+		 * llist_node is only used for poll deferred completions
+		 */
+		struct file		*ring_file;
+		struct llist_node	llist_node;
+	};
 	int				ring_fd;
 	bool				has_user;
 	bool				in_async;
@@ -725,6 +734,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->wait);
 	spin_lock_init(&ctx->completion_lock);
+	init_llist_head(&ctx->poll_llist);
 	INIT_LIST_HEAD(&ctx->poll_list);
 	INIT_LIST_HEAD(&ctx->defer_list);
 	INIT_LIST_HEAD(&ctx->timeout_list);
@@ -1320,6 +1330,20 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
 	return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
 }
 
+static inline bool io_req_multi_free(struct io_kiocb *req)
+{
+	/*
+	 * Non-fixed files must pair completion with the file put, so those
+	 * use regular completions. Batch free only fixed-file, non-linked,
+	 * non-fallback requests that have no async context (req->io) set.
+	 */
+	if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == REQ_F_FIXED_FILE)
+	    && !io_is_fallback_req(req) && !req->io)
+		return true;
+
+	return false;
+}
+
 /*
  * Find and free completed poll iocbs
  */
@@ -1339,14 +1363,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 		(*nr_events)++;
 
 		if (refcount_dec_and_test(&req->refs)) {
-			/* If we're not using fixed files, we have to pair the
-			 * completion part with the file put. Use regular
-			 * completions for those, only batch free for fixed
-			 * file and non-linked commands.
-			 */
-			if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
-			    REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
-			    !req->io) {
+			if (io_req_multi_free(req)) {
 				reqs[to_free++] = req;
 				if (to_free == ARRAY_SIZE(reqs))
 					io_free_req_many(ctx, reqs, &to_free);
@@ -3081,6 +3098,44 @@ static void io_poll_complete_work(struct io_wq_work **workptr)
 		io_wq_assign_next(workptr, nxt);
 }
 
+static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
+{
+	void *reqs[IO_IOPOLL_BATCH];	/* requests held for batched free */
+	struct io_kiocb *req, *tmp;
+	int to_free = 0;		/* count of entries in reqs[] */
+
+	spin_lock_irq(&ctx->completion_lock);
+	llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
+		hash_del(&req->hash_node);
+		io_poll_complete(req, req->result, 0);
+
+		if (refcount_dec_and_test(&req->refs)) {
+			if (io_req_multi_free(req)) {
+				reqs[to_free++] = req;
+				if (to_free == ARRAY_SIZE(reqs))
+					io_free_req_many(ctx, reqs, &to_free);
+			} else { /* not batchable: free now, lock held */
+				req->flags |= REQ_F_COMP_LOCKED;
+				io_free_req(req);
+			}
+		}
+	}
+	spin_unlock_irq(&ctx->completion_lock);
+
+	io_cqring_ev_posted(ctx);
+	io_free_req_many(ctx, reqs, &to_free);	/* free leftover batch */
+}
+
+static void io_poll_flush(struct io_wq_work **workptr)
+{
+	struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
+	struct llist_node *nodes;
+
+	nodes = llist_del_all(&req->ctx->poll_llist);	/* take whole list */
+	if (nodes)	/* NULL when the list was empty */
+		__io_poll_flush(req->ctx, nodes);
+}
+
 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 			void *key)
 {
@@ -3088,7 +3143,6 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 	struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
 	struct io_ring_ctx *ctx = req->ctx;
 	__poll_t mask = key_to_poll(key);
-	unsigned long flags;
 
 	/* for instances that support it check for an event match first: */
 	if (mask && !(mask & poll->events))
@@ -3102,17 +3156,31 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 	 * If we have a link timeout we're going to need the completion_lock
 	 * for finalizing the request, mark us as having grabbed that already.
 	 */
-	if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
-		hash_del(&req->hash_node);
-		io_poll_complete(req, mask, 0);
-		req->flags |= REQ_F_COMP_LOCKED;
-		io_put_req(req);
-		spin_unlock_irqrestore(&ctx->completion_lock, flags);
+	if (mask) {
+		unsigned long flags;
 
-		io_cqring_ev_posted(ctx);
-	} else {
-		io_queue_async_work(req);
+		if (llist_empty(&ctx->poll_llist) &&
+		    spin_trylock_irqsave(&ctx->completion_lock, flags)) {
+			hash_del(&req->hash_node);
+			io_poll_complete(req, mask, 0);
+			req->flags |= REQ_F_COMP_LOCKED;
+			io_put_req(req);
+			spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+			io_cqring_ev_posted(ctx);
+			req = NULL;
+		} else {
+			req->result = mask;
+			req->llist_node.next = NULL;
+			/* if the list wasn't empty, we're done */
+			if (!llist_add(&req->llist_node, &ctx->poll_llist))
+				req = NULL;
+			else
+				req->work.func = io_poll_flush;
+		}
 	}
+	if (req)
+		io_queue_async_work(req);
 
 	return 1;
 }