blk-mq: improve DM's blk-mq IO merging via blk_insert_cloned_request feedback

blk_insert_cloned_request() is called in the fast path of a dm-rq driver
(e.g. blk-mq request-based DM mpath).  blk_insert_cloned_request() uses
blk_mq_request_bypass_insert() to directly append the request to the
blk-mq hctx->dispatch_list of the underlying queue.

1) This way isn't efficient enough because the hctx spinlock is always
used.

2) With blk_insert_cloned_request(), we completely bypass underlying
queue's elevator and depend on the upper-level dm-rq driver's elevator
to schedule IO.  But dm-rq currently can't get the underlying queue's
dispatch feedback at all.  Without knowing whether a request was issued
or not (e.g. due to underlying queue being busy) the dm-rq elevator will
not be able to provide effective IO merging (as a side-effect of dm-rq
currently blindly destaging a request from its elevator only to requeue
it after a delay, which kills any opportunity for merging).  This
obviously causes very bad sequential IO performance.

Fix this by updating blk_insert_cloned_request() to use
blk_mq_request_direct_issue().  blk_mq_request_direct_issue() allows a
request to be issued directly to the underlying queue and returns the
dispatch feedback (blk_status_t).  If blk_mq_request_direct_issue()
returns BLK_SYS_RESOURCE the dm-rq driver will now use DM_MAPIO_REQUEUE
to _not_ destage the request.  Whereby preserving the opportunity to
merge IO.

With this, request-based DM's blk-mq sequential IO performance is vastly
improved (as much as 3X in mpath/virtio-scsi testing).

Signed-off-by: Ming Lei <ming.lei@redhat.com>
[blk-mq.c changes heavily influenced by Ming Lei's initial solution, but
they were refactored to make them less fragile and easier to read/review]
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/block/blk-mq.c b/block/blk-mq.c
index ddc46f2..e383a20 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -1775,15 +1775,19 @@ static blk_status_t __blk_mq_issue_directly(struct blk_mq_hw_ctx *hctx,
 
 static void __blk_mq_fallback_to_insert(struct blk_mq_hw_ctx *hctx,
 					struct request *rq,
-					bool run_queue)
+					bool run_queue, bool bypass_insert)
 {
-	blk_mq_sched_insert_request(rq, false, run_queue, false,
-					hctx->flags & BLK_MQ_F_BLOCKING);
+	if (!bypass_insert)
+		blk_mq_sched_insert_request(rq, false, run_queue, false,
+					    hctx->flags & BLK_MQ_F_BLOCKING);
+	else
+		blk_mq_request_bypass_insert(rq, run_queue);
 }
 
 static blk_status_t __blk_mq_try_issue_directly(struct blk_mq_hw_ctx *hctx,
 						struct request *rq,
-						blk_qc_t *cookie)
+						blk_qc_t *cookie,
+						bool bypass_insert)
 {
 	struct request_queue *q = rq->q;
 	bool run_queue = true;
@@ -1794,7 +1798,7 @@ static blk_status_t __blk_mq_try_issue_directly(struct blk_mq_hw_ctx *hctx,
 		goto insert;
 	}
 
-	if (q->elevator)
+	if (q->elevator && !bypass_insert)
 		goto insert;
 
 	if (!blk_mq_get_driver_tag(rq, NULL, false))
@@ -1807,7 +1811,9 @@ static blk_status_t __blk_mq_try_issue_directly(struct blk_mq_hw_ctx *hctx,
 
 	return __blk_mq_issue_directly(hctx, rq, cookie);
 insert:
-	__blk_mq_fallback_to_insert(hctx, rq, run_queue);
+	__blk_mq_fallback_to_insert(hctx, rq, run_queue, bypass_insert);
+	if (bypass_insert)
+		return BLK_STS_RESOURCE;
 
 	return BLK_STS_OK;
 }
@@ -1822,15 +1828,30 @@ static void blk_mq_try_issue_directly(struct blk_mq_hw_ctx *hctx,
 
 	hctx_lock(hctx, &srcu_idx);
 
-	ret = __blk_mq_try_issue_directly(hctx, rq, cookie);
+	ret = __blk_mq_try_issue_directly(hctx, rq, cookie, false);
 	if (ret == BLK_STS_RESOURCE)
-		__blk_mq_fallback_to_insert(hctx, rq, true);
+		__blk_mq_fallback_to_insert(hctx, rq, true, false);
 	else if (ret != BLK_STS_OK)
 		blk_mq_end_request(rq, ret);
 
 	hctx_unlock(hctx, srcu_idx);
 }
 
+blk_status_t blk_mq_request_direct_issue(struct request *rq)
+{
+	blk_status_t ret;
+	int srcu_idx;
+	blk_qc_t unused_cookie;
+	struct blk_mq_ctx *ctx = rq->mq_ctx;
+	struct blk_mq_hw_ctx *hctx = blk_mq_map_queue(rq->q, ctx->cpu);
+
+	hctx_lock(hctx, &srcu_idx);
+	ret = __blk_mq_try_issue_directly(hctx, rq, &unused_cookie, true);
+	hctx_unlock(hctx, srcu_idx);
+
+	return ret;
+}
+
 static blk_qc_t blk_mq_make_request(struct request_queue *q, struct bio *bio)
 {
 	const int is_sync = op_is_sync(bio->bi_opf);