xprtrdma: Refactor Receive accounting

Clean up: Divide the work cleanly:

- rpcrdma_wc_receive is responsible only for RDMA Receives
- rpcrdma_reply_handler is responsible only for RPC Replies
- the posted send and receive counts both belong in rpcrdma_ep

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index b093058..2efe2d7 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -570,7 +570,7 @@ TRACE_EVENT(xprtrdma_post_recvs,
 		__entry->r_xprt = r_xprt;
 		__entry->count = count;
 		__entry->status = status;
-		__entry->posted = r_xprt->rx_buf.rb_posted_receives;
+		__entry->posted = r_xprt->rx_ep.rep_receive_count;
 		__assign_str(addr, rpcrdma_addrstr(r_xprt));
 		__assign_str(port, rpcrdma_portstr(r_xprt));
 	),
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index e5b367a..2cb07a3 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -207,7 +207,6 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
 	if (rc < 0)
 		goto failed_marshal;
 
-	rpcrdma_post_recvs(r_xprt, true);
 	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 		goto drop_connection;
 	return 0;
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 9f53e02..dc23977 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1312,11 +1312,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	u32 credits;
 	__be32 *p;
 
-	--buf->rb_posted_receives;
-
-	if (rep->rr_hdrbuf.head[0].iov_len == 0)
-		goto out_badstatus;
-
 	/* Fixed transport header fields */
 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
 			rep->rr_hdrbuf.head[0].iov_base);
@@ -1361,31 +1356,21 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-
-	rpcrdma_post_recvs(r_xprt, false);
 	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
 
 out_badversion:
 	trace_xprtrdma_reply_vers(rep);
-	goto repost;
+	goto out;
 
-/* The RPC transaction has already been terminated, or the header
- * is corrupt.
- */
 out_norqst:
 	spin_unlock(&xprt->queue_lock);
 	trace_xprtrdma_reply_rqst(rep);
-	goto repost;
+	goto out;
 
 out_shortreply:
 	trace_xprtrdma_reply_short(rep);
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
- */
-repost:
-	rpcrdma_post_recvs(r_xprt, false);
-out_badstatus:
+out:
 	rpcrdma_recv_buffer_put(rep);
 }
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b9bc7f9..e4461e7 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -78,6 +78,7 @@ static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
 static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
+static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
 struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
 
@@ -189,11 +190,13 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 	struct ib_cqe *cqe = wc->wr_cqe;
 	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 					       rr_cqe);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 
-	/* WARNING: Only wr_id and status are reliable at this point */
+	/* WARNING: Only wr_cqe and status are reliable at this point */
 	trace_xprtrdma_wc_receive(wc);
+	--r_xprt->rx_ep.rep_receive_count;
 	if (wc->status != IB_WC_SUCCESS)
-		goto out_fail;
+		goto out_flushed;
 
 	/* status == SUCCESS means all fields in wc are trustworthy */
 	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
@@ -204,17 +207,16 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 				   rdmab_addr(rep->rr_rdmabuf),
 				   wc->byte_len, DMA_FROM_DEVICE);
 
-out_schedule:
+	rpcrdma_post_recvs(r_xprt, false);
 	rpcrdma_reply_handler(rep);
 	return;
 
-out_fail:
+out_flushed:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 		       ib_wc_status_msg(wc->status),
 		       wc->status, wc->vendor_err);
-	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
-	goto out_schedule;
+	rpcrdma_recv_buffer_put(rep);
 }
 
 static void
@@ -581,6 +583,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_disconnect_worker,
 			  rpcrdma_disconnect_worker);
+	ep->rep_receive_count = 0;
 
 	sendcq = ib_alloc_cq(ia->ri_device, NULL,
 			     ep->rep_attr.cap.max_send_wr + 1,
@@ -1174,7 +1177,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	}
 
 	buf->rb_credits = 1;
-	buf->rb_posted_receives = 0;
 	INIT_LIST_HEAD(&buf->rb_recv_bufs);
 
 	rc = rpcrdma_sendctxs_create(r_xprt);
@@ -1511,25 +1513,20 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	return 0;
 }
 
-/**
- * rpcrdma_post_recvs - Maybe post some Receive buffers
- * @r_xprt: controlling transport
- * @temp: when true, allocate temp rpcrdma_rep objects
- *
- */
-void
+static void
 rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct ib_recv_wr *wr, *bad_wr;
 	int needed, count, rc;
 
 	rc = 0;
 	count = 0;
 	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
-	if (buf->rb_posted_receives > needed)
+	if (ep->rep_receive_count > needed)
 		goto out;
-	needed -= buf->rb_posted_receives;
+	needed -= ep->rep_receive_count;
 
 	count = 0;
 	wr = NULL;
@@ -1577,7 +1574,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 			--count;
 		}
 	}
-	buf->rb_posted_receives += count;
+	ep->rep_receive_count += count;
 out:
 	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 }
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a13ccb6..788124c 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -102,6 +102,7 @@ struct rpcrdma_ep {
 	struct rpcrdma_connect_private	rep_cm_private;
 	struct rdma_conn_param	rep_remote_cma;
 	struct delayed_work	rep_disconnect_worker;
+	int			rep_receive_count;
 };
 
 /* Pre-allocate extra Work Requests for handling backward receives
@@ -404,7 +405,6 @@ struct rpcrdma_buffer {
 	unsigned long		rb_flags;
 	u32			rb_max_requests;
 	u32			rb_credits;	/* most recent credit grant */
-	int			rb_posted_receives;
 
 	u32			rb_bc_srv_max_requests;
 	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
@@ -560,7 +560,6 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 
 int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 				struct rpcrdma_req *);
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
 /*
  * Buffer calls - xprtrdma/verbs.c