sunrpc: add a rcu_head to svc_rqst and use kfree_rcu to free it

...also make the manipulation of sp_all_threads list use RCU-friendly
functions.

Signed-off-by: Jeff Layton <jlayton@primarydata.com>
Tested-by: Chris Worley <chris.worley@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 5f0ab39..7f80a99 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -223,6 +223,7 @@
 struct svc_rqst {
 	struct list_head	rq_list;	/* idle list */
 	struct list_head	rq_all;		/* all threads list */
+	struct rcu_head		rq_rcu_head;	/* for RCU deferred kfree */
 	struct svc_xprt *	rq_xprt;	/* transport ptr */
 
 	struct sockaddr_storage	rq_addr;	/* peer address */
@@ -262,6 +263,7 @@
 #define	RQ_SPLICE_OK	(4)			/* turned off in gss privacy
 						 * to prevent encrypting page
 						 * cache pages */
+#define	RQ_VICTIM	(5)			/* about to be shut down */
 	unsigned long		rq_flags;	/* flags field */
 
 	void *			rq_argp;	/* decoded arguments */
diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
index 5848fc2..08a5fed 100644
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -418,7 +418,8 @@
 		{ (1UL << RQ_LOCAL),		"RQ_LOCAL"},		\
 		{ (1UL << RQ_USEDEFERRAL),	"RQ_USEDEFERRAL"},	\
 		{ (1UL << RQ_DROPME),		"RQ_DROPME"},		\
-		{ (1UL << RQ_SPLICE_OK),	"RQ_SPLICE_OK"})
+		{ (1UL << RQ_SPLICE_OK),	"RQ_SPLICE_OK"},	\
+		{ (1UL << RQ_VICTIM),		"RQ_VICTIM"})
 
 TRACE_EVENT(svc_recv,
 	TP_PROTO(struct svc_rqst *rqst, int status),
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a06a891..b90d1bc 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -616,7 +616,7 @@
 	serv->sv_nrthreads++;
 	spin_lock_bh(&pool->sp_lock);
 	pool->sp_nrthreads++;
-	list_add(&rqstp->rq_all, &pool->sp_all_threads);
+	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
 	spin_unlock_bh(&pool->sp_lock);
 	rqstp->rq_server = serv;
 	rqstp->rq_pool = pool;
@@ -684,7 +684,8 @@
 		 * so we don't try to kill it again.
 		 */
 		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
-		list_del_init(&rqstp->rq_all);
+		set_bit(RQ_VICTIM, &rqstp->rq_flags);
+		list_del_rcu(&rqstp->rq_all);
 		task = rqstp->rq_task;
 	}
 	spin_unlock_bh(&pool->sp_lock);
@@ -782,10 +783,11 @@
 
 	spin_lock_bh(&pool->sp_lock);
 	pool->sp_nrthreads--;
-	list_del(&rqstp->rq_all);
+	if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
+		list_del_rcu(&rqstp->rq_all);
 	spin_unlock_bh(&pool->sp_lock);
 
-	kfree(rqstp);
+	kfree_rcu(rqstp, rq_rcu_head);
 
 	/* Release the server */
 	if (serv)