xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue

To address a connection-close ordering problem, we need the ability
to drain the RPC completions running on rpcrdma_receive_wq for just
one transport. Give each transport its own RPC completion workqueue,
and drain that workqueue when disconnecting the transport.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e4461e7..cff3a5d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-	struct workqueue_struct *recv_wq;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
-				  0);
-	if (!recv_wq)
-		return -ENOMEM;
+	/* Flush Receives, then wait for deferred Reply work
+	 * to complete.
+	 */
+	ib_drain_qp(ia->ri_id->qp);
+	drain_workqueue(buf->rb_completion_wq);
 
-	rpcrdma_receive_wq = recv_wq;
-	return 0;
-}
-
-void
-rpcrdma_destroy_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (rpcrdma_receive_wq) {
-		wq = rpcrdma_receive_wq;
-		rpcrdma_receive_wq = NULL;
-		destroy_workqueue(wq);
-	}
+	/* Deferred Reply processing might have scheduled
+	 * local invalidations.
+	 */
+	ib_drain_sq(ia->ri_id->qp);
 }
 
 /**
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 	 *   connection is already gone.
 	 */
 	if (ia->ri_id->qp) {
-		ib_drain_qp(ia->ri_id->qp);
+		rpcrdma_xprt_drain(r_xprt);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -825,8 +815,10 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+						   rx_ep);
 	int rc;
 
+	/* returns without wait if ID is not connected */
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc)
-		/* returns without wait if not connected */
 		wait_event_interruptible(ep->rep_connect_wait,
 							ep->rep_connected != 1);
 	else
 		ep->rep_connected = rc;
-	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-					       rx_ep), rc);
+	trace_xprtrdma_disconnect(r_xprt, rc);
 
-	ib_drain_qp(ia->ri_id->qp);
+	rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	if (rc)
 		goto out;
 
+	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+						WQ_MEM_RECLAIM | WQ_HIGHPRI,
+						0,
+			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+	if (!buf->rb_completion_wq)
+		goto out;
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	if (buf->rb_completion_wq) {
+		destroy_workqueue(buf->rb_completion_wq);
+		buf->rb_completion_wq = NULL;
+	}
+
 	rpcrdma_sendctxs_destroy(buf);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {