xprtrdma: Basic support for Remote Invalidation

Have frwr's ro_unmap_sync recognize an invalidated rkey that appears
as part of a Receive completion. Local invalidation can be skipped
for that rkey.

Use an out-of-band signaling mechanism to indicate to the server
that the client is prepared to receive RDMA Send With Invalidate.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 16690a1..1ebb09e 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -273,6 +273,7 @@
 	 */
 	list_for_each_entry(mw, &req->rl_registered, mw_list)
 		list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
+	r_xprt->rx_stats.local_inv_needed++;
 	rc = ib_unmap_fmr(&unmap_list);
 	if (rc)
 		goto out_reset;
@@ -330,4 +331,5 @@
 	.ro_init_mr			= fmr_op_init_mr,
 	.ro_release_mr			= fmr_op_release_mr,
 	.ro_displayname			= "fmr",
+	.ro_send_w_inv_ok		= 0,
 };
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index fcfcf3a..e82d5cf 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -67,6 +67,8 @@
  * pending send queue WRs before the transport is reconnected.
  */
 
+#include <linux/sunrpc/rpc_rdma.h>
+
 #include "xprt_rdma.h"
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -471,6 +473,7 @@
 frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
+	struct rpcrdma_rep *rep = req->rl_reply;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_mw *mw, *tmp;
 	struct rpcrdma_frmr *f;
@@ -486,6 +489,12 @@
 	f = NULL;
 	invalidate_wrs = pos = prev = NULL;
 	list_for_each_entry(mw, &req->rl_registered, mw_list) {
+		if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
+		    (mw->mw_handle == rep->rr_inv_rkey)) {
+			mw->frmr.fr_state = FRMR_IS_INVALID;
+			continue;
+		}
+
 		pos = __frwr_prepare_linv_wr(mw);
 
 		if (!invalidate_wrs)
@@ -495,6 +504,8 @@
 		prev = pos;
 		f = &mw->frmr;
 	}
+	if (!f)
+		goto unmap;
 
 	/* Strong send queue ordering guarantees that when the
 	 * last WR in the chain completes, all WRs in the chain
@@ -509,6 +520,7 @@
 	 * replaces the QP. The RPC reply handler won't call us
 	 * unless ri_id->qp is a valid pointer.
 	 */
+	r_xprt->rx_stats.local_inv_needed++;
 	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
 	if (rc)
 		goto reset_mrs;
@@ -575,4 +587,5 @@
 	.ro_init_mr			= frwr_op_init_mr,
 	.ro_release_mr			= frwr_op_release_mr,
 	.ro_displayname			= "frwr",
+	.ro_send_w_inv_ok		= RPCRDMA_CMP_F_SND_W_INV_OK,
 };
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ea734c2..31a434d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -231,7 +231,8 @@
 
 static int
 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
-	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
+	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
+	bool reminv_expected)
 {
 	int len, n, p, page_base;
 	struct page **ppages;
@@ -273,6 +274,13 @@
 	if (type == rpcrdma_readch)
 		return n;
 
+	/* When encoding the Write list, some servers need to see an extra
+	 * segment for odd-length Write chunks. The upper layer provides
+	 * space in the tail iovec for this purpose.
+	 */
+	if (type == rpcrdma_writech && reminv_expected)
+		return n;
+
 	if (xdrbuf->tail[0].iov_len) {
 		/* the rpcrdma protocol allows us to omit any trailing
 		 * xdr pad bytes, saving the server an RDMA operation. */
@@ -329,7 +337,7 @@
 	if (rtype == rpcrdma_areadch)
 		pos = 0;
 	seg = req->rl_segments;
-	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
@@ -393,7 +401,8 @@
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
-				     wtype, seg);
+				     wtype, seg,
+				     r_xprt->rx_ia.ri_reminv_expected);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
@@ -458,7 +467,8 @@
 	}
 
 	seg = req->rl_segments;
-	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+				     r_xprt->rx_ia.ri_reminv_expected);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 5adaa1d..7e11d71 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -730,10 +730,11 @@
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
-	seq_printf(seq, "%lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %lu\n",
 		   r_xprt->rx_stats.mrs_recovered,
 		   r_xprt->rx_stats.mrs_orphaned,
-		   r_xprt->rx_stats.mrs_allocated);
+		   r_xprt->rx_stats.mrs_allocated,
+		   r_xprt->rx_stats.local_inv_needed);
 }
 
 static int
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 6bab841..e2d6390 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -185,6 +185,9 @@
 		__func__, rep, wc->byte_len);
 
 	rep->rr_len = wc->byte_len;
+	rep->rr_wc_flags = wc->wc_flags;
+	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
+
 	ib_dma_sync_single_for_cpu(rep->rr_device,
 				   rdmab_addr(rep->rr_rdmabuf),
 				   rep->rr_len, DMA_FROM_DEVICE);
@@ -212,12 +215,15 @@
 	const struct rpcrdma_connect_private *pmsg = param->private_data;
 	unsigned int rsize, wsize;
 
+	/* Default settings for RPC-over-RDMA Version One */
+	r_xprt->rx_ia.ri_reminv_expected = false;
 	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 
 	if (pmsg &&
 	    pmsg->cp_magic == rpcrdma_cmp_magic &&
 	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+		r_xprt->rx_ia.ri_reminv_expected = true;
 		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
 		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 	}
@@ -568,7 +574,7 @@
 	/* Prepare RDMA-CM private message */
 	pmsg->cp_magic = rpcrdma_cmp_magic;
 	pmsg->cp_version = RPCRDMA_CMP_VERSION;
-	pmsg->cp_flags = 0;
+	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
 	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
 	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
 	ep->rep_remote_cma.private_data = pmsg;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 89df168..64b4e22 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -74,6 +74,7 @@
 	unsigned int		ri_max_frmr_depth;
 	unsigned int		ri_max_inline_write;
 	unsigned int		ri_max_inline_read;
+	bool			ri_reminv_expected;
 	struct ib_qp_attr	ri_qp_attr;
 	struct ib_qp_init_attr	ri_qp_init_attr;
 };
@@ -187,6 +188,8 @@
 struct rpcrdma_rep {
 	struct ib_cqe		rr_cqe;
 	unsigned int		rr_len;
+	int			rr_wc_flags;
+	u32			rr_inv_rkey;
 	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
 	struct work_struct	rr_work;
@@ -385,6 +388,7 @@
 	unsigned long		mrs_recovered;
 	unsigned long		mrs_orphaned;
 	unsigned long		mrs_allocated;
+	unsigned long		local_inv_needed;
 };
 
 /*
@@ -408,6 +412,7 @@
 				      struct rpcrdma_mw *);
 	void		(*ro_release_mr)(struct rpcrdma_mw *);
 	const char	*ro_displayname;
+	const int	ro_send_w_inv_ok;
 };
 
 extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;