SUNRPC: Add a server side per-connection limit

Allow the user to limit the number of requests serviced through a single
connection, to help prevent faster clients from starving slower clients.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index e7082a4..2adc8db 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -21,6 +21,10 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static unsigned int svc_rpc_per_connection_limit __read_mostly;
+module_param(svc_rpc_per_connection_limit, uint, 0644);
+
+
 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
 static int svc_deferred_recv(struct svc_rqst *rqstp);
 static struct cache_deferred_req *svc_defer(struct cache_req *req);
@@ -329,12 +333,41 @@
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
+static bool svc_xprt_slots_in_range(struct svc_xprt *xprt)
+{
+	unsigned int limit = svc_rpc_per_connection_limit;
+	int nrqsts = atomic_read(&xprt->xpt_nr_rqsts);
+
+	return limit == 0 || (nrqsts >= 0 && nrqsts < limit);
+}
+
+static bool svc_xprt_reserve_slot(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+	if (!test_bit(RQ_DATA, &rqstp->rq_flags)) {
+		if (!svc_xprt_slots_in_range(xprt))
+			return false;
+		atomic_inc(&xprt->xpt_nr_rqsts);
+		set_bit(RQ_DATA, &rqstp->rq_flags);
+	}
+	return true;
+}
+
+static void svc_xprt_release_slot(struct svc_rqst *rqstp)
+{
+	struct svc_xprt	*xprt = rqstp->rq_xprt;
+	if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) {
+		atomic_dec(&xprt->xpt_nr_rqsts);
+		svc_xprt_enqueue(xprt);
+	}
+}
+
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
 	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
 		return true;
 	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) {
-		if (xprt->xpt_ops->xpo_has_wspace(xprt))
+		if (xprt->xpt_ops->xpo_has_wspace(xprt) &&
+		    svc_xprt_slots_in_range(xprt))
 			return true;
 		trace_svc_xprt_no_write_space(xprt);
 		return false;
@@ -516,8 +549,8 @@
 
 	rqstp->rq_res.head[0].iov_len = 0;
 	svc_reserve(rqstp, 0);
+	svc_xprt_release_slot(rqstp);
 	rqstp->rq_xprt = NULL;
-
 	svc_xprt_put(xprt);
 }
 
@@ -785,7 +818,7 @@
 			svc_add_new_temp_xprt(serv, newxpt);
 		else
 			module_put(xprt->xpt_class->xcl_owner);
-	} else {
+	} else if (svc_xprt_reserve_slot(rqstp, xprt)) {
 		/* XPT_DATA|XPT_DEFERRED case: */
 		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
 			rqstp, rqstp->rq_pool->sp_id, xprt,