xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue

To address a connection-close ordering problem, we need the ability
to drain the RPC completions running on rpcrdma_receive_wq for just
one transport. Give each transport its own RPC completion workqueue,
and drain that workqueue when disconnecting the transport.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index dc23977..5738c9f 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1356,7 +1356,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	queue_work(buf->rb_completion_wq, &rep->rr_work);
 	return;
 
 out_badversion:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ae2a838..91c476a 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -444,10 +444,14 @@ xprt_rdma_close(struct rpc_xprt *xprt)
 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
+	might_sleep();
+
 	dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
 
+	/* Prevent marshaling and sending of new requests */
+	xprt_clear_connected(xprt);
+
 	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
-		xprt_clear_connected(xprt);
 		rpcrdma_ia_remove(ia);
 		return;
 	}
@@ -858,8 +862,6 @@ void xprt_rdma_cleanup(void)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
 
-	rpcrdma_destroy_wq();
-
 	rc = xprt_unregister_transport(&xprt_rdma_bc);
 	if (rc)
 		dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
@@ -870,20 +872,13 @@ int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = rpcrdma_alloc_wq();
+	rc = xprt_register_transport(&xprt_rdma);
 	if (rc)
 		return rc;
 
-	rc = xprt_register_transport(&xprt_rdma);
-	if (rc) {
-		rpcrdma_destroy_wq();
-		return rc;
-	}
-
 	rc = xprt_register_transport(&xprt_rdma_bc);
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
-		rpcrdma_destroy_wq();
 		return rc;
 	}
 
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e4461e7..cff3a5d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-	struct workqueue_struct *recv_wq;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
-				  0);
-	if (!recv_wq)
-		return -ENOMEM;
+	/* Flush Receives, then wait for deferred Reply work
+	 * to complete.
+	 */
+	ib_drain_qp(ia->ri_id->qp);
+	drain_workqueue(buf->rb_completion_wq);
 
-	rpcrdma_receive_wq = recv_wq;
-	return 0;
-}
-
-void
-rpcrdma_destroy_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (rpcrdma_receive_wq) {
-		wq = rpcrdma_receive_wq;
-		rpcrdma_receive_wq = NULL;
-		destroy_workqueue(wq);
-	}
+	/* Deferred Reply processing might have scheduled
+	 * local invalidations.
+	 */
+	ib_drain_sq(ia->ri_id->qp);
 }
 
 /**
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 	 *   connection is already gone.
 	 */
 	if (ia->ri_id->qp) {
-		ib_drain_qp(ia->ri_id->qp);
+		rpcrdma_xprt_drain(r_xprt);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -825,8 +815,10 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+						   rx_ep);
 	int rc;
 
+	/* Returns without waiting if the ID is not connected */
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc)
-		/* returns without wait if not connected */
 		wait_event_interruptible(ep->rep_connect_wait,
 							ep->rep_connected != 1);
 	else
 		ep->rep_connected = rc;
-	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-					       rx_ep), rc);
+	trace_xprtrdma_disconnect(r_xprt, rc);
 
-	ib_drain_qp(ia->ri_id->qp);
+	rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	if (rc)
 		goto out;
 
+	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+						WQ_MEM_RECLAIM | WQ_HIGHPRI,
+						0,
+			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+	if (!buf->rb_completion_wq)
+		goto out;
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	if (buf->rb_completion_wq) {
+		destroy_workqueue(buf->rb_completion_wq);
+		buf->rb_completion_wq = NULL;
+	}
+
 	rpcrdma_sendctxs_destroy(buf);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 788124c..3f198cd 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -412,6 +412,7 @@ struct rpcrdma_buffer {
 
 	u32			rb_bc_max_requests;
 
+	struct workqueue_struct *rb_completion_wq;
 	struct delayed_work	rb_refresh_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -547,8 +548,6 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
-extern struct workqueue_struct *rpcrdma_receive_wq;
-
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
@@ -603,9 +602,6 @@ rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 	return __rpcrdma_dma_map_regbuf(ia, rb);
 }
 
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */