xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs

Calling ib_poll_cq() to sort through WCs during a completion upcall
is a common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

Converting to this new API makes xprtrdma a better neighbor to
other RDMA consumers: it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the
core can now post its own operations on a consumer's QP, and handle
the completions itself, without changes to the consumer.
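
For illustration, a minimal sketch of that pattern (struct ib_cqe
and the wr_cqe field come from include/rdma/ib_verbs.h; the frob_*
names are hypothetical, not part of this patch):

	#include <linux/slab.h>
	#include <rdma/ib_verbs.h>

	/* Hypothetical per-WR context embedding an ib_cqe. */
	struct frob_ctx {
		struct ib_cqe	cqe;
	};

	/* Runs when the core polls a WC whose wr_cqe points at ctx->cqe. */
	static void frob_done(struct ib_cq *cq, struct ib_wc *wc)
	{
		struct frob_ctx *ctx =
			container_of(wc->wr_cqe, struct frob_ctx, cqe);

		/* On error, only wr_cqe and status are reliable here. */
		if (wc->status != IB_WC_SUCCESS)
			pr_err("frob: Send: %s\n",
			       ib_wc_status_msg(wc->status));
		kfree(ctx);	/* e.g., release the per-WR context */
	}

	/* On the posting side: */
	ctx->cqe.done = frob_done;
	send_wr.wr_cqe = &ctx->cqe;	/* replaces the old wr_id cookie */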

Send completions were previously handled entirely in the completion
upcall handler (i.e., deferring to a process context was never
needed). Thus IB_POLL_SOFTIRQ is a direct replacement for the
current xprtrdma send code path.
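
For reference, the shape of the replacement API (a sketch with
illustrative variable names; the calls are ib_alloc_cq() and
ib_free_cq() from the IB core):

	struct ib_cq *cq;

	cq = ib_alloc_cq(device, NULL,		/* no private context */
			 max_send_wr + 1,	/* minimum number of CQEs */
			 0,			/* completion vector */
			 IB_POLL_SOFTIRQ);	/* core polls in softirq */
	if (IS_ERR(cq))
		return PTR_ERR(cq);

	/* ... post WRs whose wr_cqe->done handlers fire as WCs arrive ... */

	ib_free_cq(cq);		/* replaces ib_destroy_cq() */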

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Devesh Sharma <devesh.sharma@broadcom.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 05779f4..f5ed9f9 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -112,73 +112,20 @@
 	}
 }
 
-static void
-rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
-{
-	struct rpcrdma_ep *ep = context;
-
-	pr_err("RPC:       %s: %s on device %s ep %p\n",
-	       __func__, ib_event_msg(event->event),
-		event->device->name, context);
-	if (ep->rep_connected == 1) {
-		ep->rep_connected = -EIO;
-		rpcrdma_conn_func(ep);
-		wake_up_all(&ep->rep_connect_wait);
-	}
-}
-
-static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
-{
-	/* WARNING: Only wr_id and status are reliable at this point */
-	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
-		if (wc->status != IB_WC_SUCCESS &&
-		    wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("RPC:       %s: SEND: %s\n",
-			       __func__, ib_wc_status_msg(wc->status));
-	} else {
-		struct rpcrdma_mw *r;
-
-		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-		r->mw_sendcompletion(wc);
-	}
-}
-
-/* The common case is a single send completion is waiting. By
- * passing two WC entries to ib_poll_cq, a return code of 1
- * means there is exactly one WC waiting and no more. We don't
- * have to invoke ib_poll_cq again to know that the CQ has been
- * properly drained.
+/**
+ * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
  */
 static void
-rpcrdma_sendcq_poll(struct ib_cq *cq)
+rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct ib_wc *pos, wcs[2];
-	int count, rc;
-
-	do {
-		pos = wcs;
-
-		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
-		if (rc < 0)
-			break;
-
-		count = rc;
-		while (count-- > 0)
-			rpcrdma_sendcq_process_wc(pos++);
-	} while (rc == ARRAY_SIZE(wcs));
-	return;
-}
-
-/* Handle provider send completion upcalls.
- */
-static void
-rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
-{
-	do {
-		rpcrdma_sendcq_poll(cq);
-	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
-				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
+		       ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
 }
 
 static void
@@ -263,8 +210,6 @@
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
 		rpcrdma_receive_wc(NULL, &wc);
-	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
-		rpcrdma_sendcq_process_wc(&wc);
 }
 
 static int
@@ -556,9 +501,8 @@
 				struct rpcrdma_create_data_internal *cdata)
 {
 	struct ib_cq *sendcq, *recvcq;
-	struct ib_cq_init_attr cq_attr = {};
 	unsigned int max_qp_wr;
-	int rc, err;
+	int rc;
 
 	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
 		dprintk("RPC:       %s: insufficient sge's available\n",
@@ -610,9 +554,9 @@
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
-	cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
-	sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
-			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+	sendcq = ib_alloc_cq(ia->ri_device, NULL,
+			     ep->rep_attr.cap.max_send_wr + 1,
+			     0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(sendcq)) {
 		rc = PTR_ERR(sendcq);
 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -620,13 +564,6 @@
 		goto out1;
 	}
 
-	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
-	if (rc) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-			__func__, rc);
-		goto out2;
-	}
-
 	recvcq = ib_alloc_cq(ia->ri_device, NULL,
 			     ep->rep_attr.cap.max_recv_wr + 1,
 			     0, IB_POLL_SOFTIRQ);
@@ -661,10 +598,7 @@
 	return 0;
 
 out2:
-	err = ib_destroy_cq(sendcq);
-	if (err)
-		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-			__func__, err);
+	ib_free_cq(sendcq);
 out1:
 	if (ia->ri_dma_mr)
 		ib_dereg_mr(ia->ri_dma_mr);
@@ -700,11 +634,7 @@
 	}
 
 	ib_free_cq(ep->rep_attr.recv_cq);
-
-	rc = ib_destroy_cq(ep->rep_attr.send_cq);
-	if (rc)
-		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-			__func__, rc);
+	ib_free_cq(ep->rep_attr.send_cq);
 
 	if (ia->ri_dma_mr) {
 		rc = ib_dereg_mr(ia->ri_dma_mr);
@@ -883,6 +813,7 @@
 	spin_lock(&buffer->rb_reqslock);
 	list_add(&req->rl_all, &buffer->rb_allreqs);
 	spin_unlock(&buffer->rb_reqslock);
+	req->rl_cqe.done = rpcrdma_wc_send;
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 }
@@ -1246,7 +1177,7 @@
 	}
 
 	send_wr.next = NULL;
-	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
+	send_wr.wr_cqe = &req->rl_cqe;
 	send_wr.sg_list = iov;
 	send_wr.num_sge = req->rl_niovs;
 	send_wr.opcode = IB_WR_SEND;