xprtrdma: Refactor MR recovery work queues

I found that commit ead3f26e359e ("xprtrdma: Add ro_unmap_safe
memreg method"), which introduces ro_unmap_safe, never wired up the
FMR recovery worker.

The FMR and FRWR recovery work queues both do the same thing.
Instead of setting up a separate work queue for each of them,
schedule a delayed worker to recover stale MRs, since recovering
MRs is not performance-critical.

Fixes: ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b044d98a..77a371d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -777,6 +777,41 @@
 	ib_drain_qp(ia->ri_id->qp);
 }
 
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+						  rb_recovery_worker.work);
+	struct rpcrdma_mw *mw;
+
+	/* Drain rb_stale_mrs; each MR is recovered with the lock dropped. */
+	spin_lock(&buf->rb_recovery_lock);
+	while (!list_empty(&buf->rb_stale_mrs)) {
+		mw = list_first_entry(&buf->rb_stale_mrs,
+				      struct rpcrdma_mw, mw_list);
+		list_del_init(&mw->mw_list);
+		spin_unlock(&buf->rb_recovery_lock);
+
+		dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
+		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+		spin_lock(&buf->rb_recovery_lock);
+	}
+	spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+	struct rpcrdma_buffer *rb = &mw->mw_xprt->rx_buf;
+
+	/* Queue @mw on rb_stale_mrs, then kick the recovery worker. */
+	spin_lock(&rb->rb_recovery_lock);
+	list_add(&mw->mw_list, &rb->rb_stale_mrs);
+	spin_unlock(&rb->rb_recovery_lock);
+
+	schedule_delayed_work(&rb->rb_recovery_worker, 0);
+}
+
 struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
@@ -837,8 +872,12 @@
 
 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
 	buf->rb_bc_srv_max_requests = 0;
-	spin_lock_init(&buf->rb_lock);
 	atomic_set(&buf->rb_credits, 1);
+	spin_lock_init(&buf->rb_lock);
+	spin_lock_init(&buf->rb_recovery_lock);
+	INIT_LIST_HEAD(&buf->rb_stale_mrs);
+	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+			  rpcrdma_mr_recovery_worker);
 
 	rc = ia->ri_ops->ro_init(r_xprt);
 	if (rc)
@@ -923,6 +962,8 @@
 {
 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 
+	cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;