Merge commit 'origin' into devel
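
Bring in the SUNRPC scheduler rework together with its NFS users:

 - Replace the per-task kernel timer (tk_timer/tk_timeout_fn) with a
   single timer per rpc_wait_queue, driven by __rpc_queue_timer_fn().
   rpc_sleep_on() loses its timer-callback argument, and wait queues
   now need to be torn down via the new rpc_destroy_wait_queue().
 - Replace rpc_wake_up_task() with rpc_wake_up_queued_task(), which
   takes the wait queue explicitly; the queue a task sleeps on is
   tracked in the new tk_waitqueue field instead of
   u.tk_wait.rpc_waitq.
 - Stop freeing rpc_task and the NFS read/write/commit data via RCU;
   rpc_release() work can instead be deferred to the workqueue named
   in the new rpc_task_setup.workqueue field.
 - Add the nfsiod workqueue and point the async NFS callbacks at it.
 - Take a reference to the nfs_open_context in the read, write,
   commit and direct I/O argument structures, and drop it in the
   matching release callbacks.
 - Preload the radix tree in nfs_update_request() so that
   nfs_inode_add_request() can no longer fail.

As a minimal sketch of the resulting API (illustrative only;
example_read_setup() and its arguments are not part of this merge),
an async caller now picks the workqueue that runs rpc_release():

	static int example_read_setup(struct inode *inode,
				      struct rpc_message *msg,
				      const struct rpc_call_ops *ops,
				      void *data)
	{
		struct rpc_task_setup task_setup_data = {
			.rpc_client = NFS_CLIENT(inode),
			.rpc_message = msg,
			.callback_ops = ops,
			.callback_data = data,
			/* rpc_release() now runs on this workqueue */
			.workqueue = nfsiod_workqueue,
			.flags = RPC_TASK_ASYNC,
		};
		struct rpc_task *task;

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			return PTR_ERR(task);
		rpc_put_task(task);
		return 0;
	}
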
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index c5c0175..06f064d 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -170,6 +170,8 @@
 	BUG_ON(!RB_EMPTY_ROOT(&clp->cl_state_owners));
 	if (__test_and_clear_bit(NFS_CS_IDMAP, &clp->cl_res_state))
 		nfs_idmap_delete(clp);
+
+	rpc_destroy_wait_queue(&clp->cl_rpcwaitq);
 #endif
 }
 
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 16844f9..e442005 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -280,6 +280,7 @@
 		.rpc_client = NFS_CLIENT(inode),
 		.rpc_message = &msg,
 		.callback_ops = &nfs_read_direct_ops,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 	unsigned int pgbase;
@@ -323,7 +324,7 @@
 		data->inode = inode;
 		data->cred = msg.rpc_cred;
 		data->args.fh = NFS_FH(inode);
-		data->args.context = ctx;
+		data->args.context = get_nfs_open_context(ctx);
 		data->args.offset = pos;
 		data->args.pgbase = pgbase;
 		data->args.pages = data->pagevec;
@@ -446,6 +447,7 @@
 	struct rpc_task_setup task_setup_data = {
 		.rpc_client = NFS_CLIENT(inode),
 		.callback_ops = &nfs_write_direct_ops,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 
@@ -537,6 +539,7 @@
 		.rpc_message = &msg,
 		.callback_ops = &nfs_commit_direct_ops,
 		.callback_data = data,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 
@@ -546,6 +549,7 @@
 	data->args.fh = NFS_FH(data->inode);
 	data->args.offset = 0;
 	data->args.count = 0;
+	data->args.context = get_nfs_open_context(dreq->ctx);
 	data->res.count = 0;
 	data->res.fattr = &data->fattr;
 	data->res.verf = &data->verf;
@@ -682,6 +686,7 @@
 		.rpc_client = NFS_CLIENT(inode),
 		.rpc_message = &msg,
 		.callback_ops = &nfs_write_direct_ops,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 	size_t wsize = NFS_SERVER(inode)->wsize;
@@ -728,7 +733,7 @@
 		data->inode = inode;
 		data->cred = msg.rpc_cred;
 		data->args.fh = NFS_FH(inode);
-		data->args.context = ctx;
+		data->args.context = get_nfs_open_context(ctx);
 		data->args.offset = pos;
 		data->args.pgbase = pgbase;
 		data->args.pages = data->pagevec;
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index a4c7cf2..c49f6d8 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -522,8 +522,12 @@
 
 static void __put_nfs_open_context(struct nfs_open_context *ctx, int wait)
 {
-	struct inode *inode = ctx->path.dentry->d_inode;
+	struct inode *inode;
 
+	if (ctx == NULL)
+		return;
+
+	inode = ctx->path.dentry->d_inode;
 	if (!atomic_dec_and_lock(&ctx->count, &inode->i_lock))
 		return;
 	list_del(&ctx->list);
@@ -1217,6 +1221,36 @@
 	kmem_cache_destroy(nfs_inode_cachep);
 }
 
+struct workqueue_struct *nfsiod_workqueue;
+
+/*
+ * Start up the nfsiod workqueue
+ */
+static int nfsiod_start(void)
+{
+	struct workqueue_struct *wq;
+	dprintk("RPC:       creating workqueue nfsiod\n");
+	wq = create_singlethread_workqueue("nfsiod");
+	if (wq == NULL)
+		return -ENOMEM;
+	nfsiod_workqueue = wq;
+	return 0;
+}
+
+/*
+ * Destroy the nfsiod workqueue
+ */
+static void nfsiod_stop(void)
+{
+	struct workqueue_struct *wq;
+
+	wq = nfsiod_workqueue;
+	if (wq == NULL)
+		return;
+	nfsiod_workqueue = NULL;
+	destroy_workqueue(wq);
+}
+
 /*
  * Initialize NFS
  */
@@ -1224,6 +1258,10 @@
 {
 	int err;
 
+	err = nfsiod_start();
+	if (err)
+		goto out6;
+
 	err = nfs_fs_proc_init();
 	if (err)
 		goto out5;
@@ -1270,6 +1308,8 @@
 out4:
 	nfs_fs_proc_exit();
 out5:
+	nfsiod_stop();
+out6:
 	return err;
 }
 
@@ -1285,6 +1325,7 @@
 #endif
 	unregister_nfs_fs();
 	nfs_fs_proc_exit();
+	nfsiod_stop();
 }
 
 /* Not quite true; I just maintain it */
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 9319927..4c1122a 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -146,6 +146,7 @@
 extern int nfs_access_cache_shrinker(int nr_to_scan, gfp_t gfp_mask);
 
 /* inode.c */
+extern struct workqueue_struct *nfsiod_workqueue;
 extern struct inode *nfs_alloc_inode(struct super_block *sb);
 extern void nfs_destroy_inode(struct inode *);
 extern int nfs_write_inode(struct inode *,int);
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 7ce0786..bbb0d58 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -51,6 +51,7 @@
 
 #include "nfs4_fs.h"
 #include "delegation.h"
+#include "internal.h"
 #include "iostat.h"
 
 #define NFSDBG_FACILITY		NFSDBG_PROC
@@ -773,6 +774,7 @@
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_open_confirm_ops,
 		.callback_data = data,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 	int status;
@@ -910,6 +912,7 @@
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_open_ops,
 		.callback_data = data,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 	int status;
@@ -1315,6 +1318,7 @@
 		.rpc_client = server->client,
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_close_ops,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 	int status = -ENOMEM;
@@ -2761,10 +2765,10 @@
 		case -NFS4ERR_STALE_CLIENTID:
 		case -NFS4ERR_STALE_STATEID:
 		case -NFS4ERR_EXPIRED:
-			rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL, NULL);
+			rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
 			nfs4_schedule_state_recovery(clp);
 			if (test_bit(NFS4CLNT_STATE_RECOVER, &clp->cl_state) == 0)
-				rpc_wake_up_task(task);
+				rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
 			task->tk_status = 0;
 			return -EAGAIN;
 		case -NFS4ERR_DELAY:
@@ -3235,6 +3239,7 @@
 		.rpc_client = NFS_CLIENT(lsp->ls_state->inode),
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_locku_ops,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 
@@ -3419,6 +3424,7 @@
 		.rpc_client = NFS_CLIENT(state->inode),
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_lock_ops,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 	};
 	int ret;
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index b962397..7775435 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -292,8 +292,10 @@
 	spin_unlock(&clp->cl_lock);
 	if (sp == new)
 		get_rpccred(cred);
-	else
+	else {
+		rpc_destroy_wait_queue(&new->so_sequence.wait);
 		kfree(new);
+	}
 	return sp;
 }
 
@@ -310,6 +312,7 @@
 		return;
 	nfs4_remove_state_owner(clp, sp);
 	spin_unlock(&clp->cl_lock);
+	rpc_destroy_wait_queue(&sp->so_sequence.wait);
 	put_rpccred(cred);
 	kfree(sp);
 }
@@ -529,6 +532,7 @@
 	spin_lock(&clp->cl_lock);
 	nfs_free_unique_id(&clp->cl_lockowner_id, &lsp->ls_id);
 	spin_unlock(&clp->cl_lock);
+	rpc_destroy_wait_queue(&lsp->ls_sequence.wait);
 	kfree(lsp);
 }
 
@@ -731,7 +735,7 @@
 		list_add_tail(&seqid->list, &sequence->list);
 	if (list_first_entry(&sequence->list, struct nfs_seqid, list) == seqid)
 		goto unlock;
-	rpc_sleep_on(&sequence->wait, task, NULL, NULL);
+	rpc_sleep_on(&sequence->wait, task, NULL);
 	status = -EAGAIN;
 unlock:
 	spin_unlock(&sequence->lock);
diff --git a/fs/nfs/read.c b/fs/nfs/read.c
index 3d7d963..be9e827 100644
--- a/fs/nfs/read.c
+++ b/fs/nfs/read.c
@@ -58,22 +58,19 @@
 	return p;
 }
 
-static void nfs_readdata_rcu_free(struct rcu_head *head)
+static void nfs_readdata_free(struct nfs_read_data *p)
 {
-	struct nfs_read_data *p = container_of(head, struct nfs_read_data, task.u.tk_rcu);
 	if (p && (p->pagevec != &p->page_array[0]))
 		kfree(p->pagevec);
 	mempool_free(p, nfs_rdata_mempool);
 }
 
-static void nfs_readdata_free(struct nfs_read_data *rdata)
-{
-	call_rcu_bh(&rdata->task.u.tk_rcu, nfs_readdata_rcu_free);
-}
-
 void nfs_readdata_release(void *data)
 {
-        nfs_readdata_free(data);
+	struct nfs_read_data *rdata = data;
+
+	put_nfs_open_context(rdata->args.context);
+	nfs_readdata_free(rdata);
 }
 
 static
@@ -174,6 +171,7 @@
 		.rpc_message = &msg,
 		.callback_ops = call_ops,
 		.callback_data = data,
+		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC | swap_flags,
 	};
 
@@ -186,7 +184,7 @@
 	data->args.pgbase = req->wb_pgbase + offset;
 	data->args.pages  = data->pagevec;
 	data->args.count  = count;
-	data->args.context = req->wb_context;
+	data->args.context = get_nfs_open_context(req->wb_context);
 
 	data->res.fattr   = &data->fattr;
 	data->res.count   = count;
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 80c61fdb..1667e398 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -58,19 +58,13 @@
 	return p;
 }
 
-static void nfs_commit_rcu_free(struct rcu_head *head)
+void nfs_commit_free(struct nfs_write_data *p)
 {
-	struct nfs_write_data *p = container_of(head, struct nfs_write_data, task.u.tk_rcu);
 	if (p && (p->pagevec != &p->page_array[0]))
 		kfree(p->pagevec);
 	mempool_free(p, nfs_commit_mempool);
 }
 
-void nfs_commit_free(struct nfs_write_data *wdata)
-{
-	call_rcu_bh(&wdata->task.u.tk_rcu, nfs_commit_rcu_free);
-}
-
 struct nfs_write_data *nfs_writedata_alloc(unsigned int pagecount)
 {
 	struct nfs_write_data *p = mempool_alloc(nfs_wdata_mempool, GFP_NOFS);
@@ -92,21 +86,18 @@
 	return p;
 }
 
-static void nfs_writedata_rcu_free(struct rcu_head *head)
+static void nfs_writedata_free(struct nfs_write_data *p)
 {
-	struct nfs_write_data *p = container_of(head, struct nfs_write_data, task.u.tk_rcu);
 	if (p && (p->pagevec != &p->page_array[0]))
 		kfree(p->pagevec);
 	mempool_free(p, nfs_wdata_mempool);
 }
 
-static void nfs_writedata_free(struct nfs_write_data *wdata)
+void nfs_writedata_release(void *data)
 {
-	call_rcu_bh(&wdata->task.u.tk_rcu, nfs_writedata_rcu_free);
-}
+	struct nfs_write_data *wdata = data;
 
-void nfs_writedata_release(void *wdata)
-{
+	put_nfs_open_context(wdata->args.context);
 	nfs_writedata_free(wdata);
 }
 
@@ -360,15 +351,13 @@
 /*
  * Insert a write request into an inode
  */
-static int nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
+static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 {
 	struct nfs_inode *nfsi = NFS_I(inode);
 	int error;
 
 	error = radix_tree_insert(&nfsi->nfs_page_tree, req->wb_index, req);
-	BUG_ON(error == -EEXIST);
-	if (error)
-		return error;
+	BUG_ON(error);
 	if (!nfsi->npages) {
 		igrab(inode);
 		if (nfs_have_delegation(inode, FMODE_WRITE))
@@ -378,8 +367,8 @@
 	set_page_private(req->wb_page, (unsigned long)req);
 	nfsi->npages++;
 	kref_get(&req->wb_kref);
-	radix_tree_tag_set(&nfsi->nfs_page_tree, req->wb_index, NFS_PAGE_TAG_LOCKED);
-	return 0;
+	radix_tree_tag_set(&nfsi->nfs_page_tree, req->wb_index,
+				NFS_PAGE_TAG_LOCKED);
 }
 
 /*
@@ -591,6 +580,13 @@
 		/* Loop over all inode entries and see if we find
 		 * A request for the page we wish to update
 		 */
+		if (new) {
+			if (radix_tree_preload(GFP_NOFS)) {
+				nfs_release_request(new);
+				return ERR_PTR(-ENOMEM);
+			}
+		}
+
 		spin_lock(&inode->i_lock);
 		req = nfs_page_find_request_locked(page);
 		if (req) {
@@ -601,28 +597,27 @@
 				error = nfs_wait_on_request(req);
 				nfs_release_request(req);
 				if (error < 0) {
-					if (new)
+					if (new) {
+						radix_tree_preload_end();
 						nfs_release_request(new);
+					}
 					return ERR_PTR(error);
 				}
 				continue;
 			}
 			spin_unlock(&inode->i_lock);
-			if (new)
+			if (new) {
+				radix_tree_preload_end();
 				nfs_release_request(new);
+			}
 			break;
 		}
 
 		if (new) {
-			int error;
 			nfs_lock_request_dontget(new);
-			error = nfs_inode_add_request(inode, new);
-			if (error) {
-				spin_unlock(&inode->i_lock);
-				nfs_unlock_request(new);
-				return ERR_PTR(error);
-			}
+			nfs_inode_add_request(inode, new);
 			spin_unlock(&inode->i_lock);
+			radix_tree_preload_end();
 			req = new;
 			goto zero_page;
 		}
@@ -800,6 +795,7 @@
 		.rpc_message = &msg,
 		.callback_ops = call_ops,
 		.callback_data = data,
+		.workqueue = nfsiod_workqueue,
 		.flags = flags,
 		.priority = priority,
 	};
@@ -816,7 +812,7 @@
 	data->args.pgbase = req->wb_pgbase + offset;
 	data->args.pages  = data->pagevec;
 	data->args.count  = count;
-	data->args.context = req->wb_context;
+	data->args.context = get_nfs_open_context(req->wb_context);
 	data->args.stable  = NFS_UNSTABLE;
 	if (how & FLUSH_STABLE) {
 		data->args.stable = NFS_DATA_SYNC;
@@ -1153,8 +1149,11 @@
 
 
 #if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
-void nfs_commit_release(void *wdata)
+void nfs_commit_release(void *data)
 {
+	struct nfs_write_data *wdata = data;
+
+	put_nfs_open_context(wdata->args.context);
 	nfs_commit_free(wdata);
 }
 
@@ -1181,6 +1180,7 @@
 		.rpc_message = &msg,
 		.callback_ops = &nfs_commit_ops,
 		.callback_data = data,
+		.workqueue = nfsiod_workqueue,
 		.flags = flags,
 		.priority = priority,
 	};
@@ -1197,6 +1197,7 @@
 	/* Note: we always request a commit of the entire inode */
 	data->args.offset = 0;
 	data->args.count  = 0;
+	data->args.context = get_nfs_open_context(first->wb_context);
 	data->res.count   = 0;
 	data->res.fattr   = &data->fattr;
 	data->res.verf    = &data->verf;
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 129a86e..6fff7f8 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -127,11 +127,12 @@
 void		rpcb_getport_async(struct rpc_task *);
 
 void		rpc_call_start(struct rpc_task *);
-int		rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg,
-			       int flags, const struct rpc_call_ops *tk_ops,
+int		rpc_call_async(struct rpc_clnt *clnt,
+			       const struct rpc_message *msg, int flags,
+			       const struct rpc_call_ops *tk_ops,
 			       void *calldata);
-int		rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg,
-			      int flags);
+int		rpc_call_sync(struct rpc_clnt *clnt,
+			      const struct rpc_message *msg, int flags);
 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred,
 			       int flags);
 void		rpc_restart_call(struct rpc_task *);
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index f689f02..d1a5c8c 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -11,7 +11,6 @@
 
 #include <linux/timer.h>
 #include <linux/sunrpc/types.h>
-#include <linux/rcupdate.h>
 #include <linux/spinlock.h>
 #include <linux/wait.h>
 #include <linux/workqueue.h>
@@ -33,7 +32,8 @@
 struct rpc_wait {
 	struct list_head	list;		/* wait queue links */
 	struct list_head	links;		/* Links to related tasks */
-	struct rpc_wait_queue *	rpc_waitq;	/* RPC wait queue we're on */
+	struct list_head	timer_list;	/* Timer list */
+	unsigned long		expires;
 };
 
 /*
@@ -57,33 +57,25 @@
 	__u8			tk_cred_retry;
 
 	/*
-	 * timeout_fn   to be executed by timer bottom half
 	 * callback	to be executed after waking up
 	 * action	next procedure for async tasks
 	 * tk_ops	caller callbacks
 	 */
-	void			(*tk_timeout_fn)(struct rpc_task *);
 	void			(*tk_callback)(struct rpc_task *);
 	void			(*tk_action)(struct rpc_task *);
 	const struct rpc_call_ops *tk_ops;
 	void *			tk_calldata;
 
-	/*
-	 * tk_timer is used for async processing by the RPC scheduling
-	 * primitives. You should not access this directly unless
-	 * you have a pathological interest in kernel oopses.
-	 */
-	struct timer_list	tk_timer;	/* kernel timer */
 	unsigned long		tk_timeout;	/* timeout for rpc_sleep() */
 	unsigned short		tk_flags;	/* misc flags */
 	unsigned long		tk_runstate;	/* Task run status */
 	struct workqueue_struct	*tk_workqueue;	/* Normally rpciod, but could
 						 * be any workqueue
 						 */
+	struct rpc_wait_queue 	*tk_waitqueue;	/* RPC wait queue we're on */
 	union {
 		struct work_struct	tk_work;	/* Async task work queue */
 		struct rpc_wait		tk_wait;	/* RPC wait */
-		struct rcu_head		tk_rcu;		/* for task deletion */
 	} u;
 
 	unsigned short		tk_timeouts;	/* maj timeouts */
@@ -123,6 +115,7 @@
 	const struct rpc_message *rpc_message;
 	const struct rpc_call_ops *callback_ops;
 	void *callback_data;
+	struct workqueue_struct *workqueue;
 	unsigned short flags;
 	signed char priority;
 };
@@ -147,9 +140,7 @@
 
 #define RPC_TASK_RUNNING	0
 #define RPC_TASK_QUEUED		1
-#define RPC_TASK_WAKEUP		2
-#define RPC_TASK_HAS_TIMER	3
-#define RPC_TASK_ACTIVE		4
+#define RPC_TASK_ACTIVE		2
 
 #define RPC_IS_RUNNING(t)	test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
 #define rpc_set_running(t)	set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
@@ -171,15 +162,6 @@
 		smp_mb__after_clear_bit(); \
 	} while (0)
 
-#define rpc_start_wakeup(t) \
-	(test_and_set_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate) == 0)
-#define rpc_finish_wakeup(t) \
-	do { \
-		smp_mb__before_clear_bit(); \
-		clear_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate); \
-		smp_mb__after_clear_bit(); \
-	} while (0)
-
 #define RPC_IS_ACTIVATED(t)	test_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate)
 
 /*
@@ -192,6 +174,12 @@
 #define RPC_PRIORITY_HIGH	(1)
 #define RPC_NR_PRIORITY		(1 + RPC_PRIORITY_HIGH - RPC_PRIORITY_LOW)
 
+struct rpc_timer {
+	struct timer_list timer;
+	struct list_head list;
+	unsigned long expires;
+};
+
 /*
  * RPC synchronization objects
  */
@@ -204,6 +192,7 @@
 	unsigned char		count;			/* # task groups remaining serviced so far */
 	unsigned char		nr;			/* # tasks remaining for cookie */
 	unsigned short		qlen;			/* total # tasks waiting in queue */
+	struct rpc_timer	timer_list;
 #ifdef RPC_DEBUG
 	const char *		name;
 #endif
@@ -229,9 +218,11 @@
 void		rpc_execute(struct rpc_task *);
 void		rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *);
 void		rpc_init_wait_queue(struct rpc_wait_queue *, const char *);
+void		rpc_destroy_wait_queue(struct rpc_wait_queue *);
 void		rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *,
-					rpc_action action, rpc_action timer);
-void		rpc_wake_up_task(struct rpc_task *);
+					rpc_action action);
+void		rpc_wake_up_queued_task(struct rpc_wait_queue *,
+					struct rpc_task *);
 void		rpc_wake_up(struct rpc_wait_queue *);
 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
 void		rpc_wake_up_status(struct rpc_wait_queue *, int);
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 6dac387..ef63849 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -266,6 +266,7 @@
 	BUG_ON(!list_empty(&gss_msg->list));
 	if (gss_msg->ctx != NULL)
 		gss_put_ctx(gss_msg->ctx);
+	rpc_destroy_wait_queue(&gss_msg->rpc_waitqueue);
 	kfree(gss_msg);
 }
 
@@ -408,13 +409,13 @@
 	}
 	spin_lock(&inode->i_lock);
 	if (gss_cred->gc_upcall != NULL)
-		rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL, NULL);
+		rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL);
 	else if (gss_msg->ctx == NULL && gss_msg->msg.errno >= 0) {
 		task->tk_timeout = 0;
 		gss_cred->gc_upcall = gss_msg;
 		/* gss_upcall_callback will release the reference to gss_upcall_msg */
 		atomic_inc(&gss_msg->count);
-		rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback, NULL);
+		rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback);
 	} else
 		err = gss_msg->msg.errno;
 	spin_unlock(&inode->i_lock);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 8c6a7f1..ea14314 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -548,7 +548,7 @@
  * @msg: RPC call parameters
  * @flags: RPC call flags
  */
-int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
+int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
 {
 	struct rpc_task	*task;
 	struct rpc_task_setup task_setup_data = {
@@ -579,7 +579,7 @@
  * @data: user call data
  */
 int
-rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
+rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 	       const struct rpc_call_ops *tk_ops, void *data)
 {
 	struct rpc_task	*task;
@@ -1066,7 +1066,7 @@
 	if (task->tk_msg.rpc_proc->p_decode != NULL)
 		return;
 	task->tk_action = rpc_exit_task;
-	rpc_wake_up_task(task);
+	rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
 }
 
 /*
@@ -1535,7 +1535,7 @@
 				proc = -1;
 
 			if (RPC_IS_QUEUED(t))
-				rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+				rpc_waitq = rpc_qname(t->tk_waitqueue);
 
 			printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
 				t->tk_pid, proc,
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 3164a08..f480c71 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -298,7 +298,7 @@
 
 	/* Put self on queue before sending rpcbind request, in case
 	 * rpcb_getport_done completes before we return from rpc_run_task */
-	rpc_sleep_on(&xprt->binding, task, NULL, NULL);
+	rpc_sleep_on(&xprt->binding, task, NULL);
 
 	/* Someone else may have bound if we slept */
 	if (xprt_bound(xprt)) {
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 4c66912..cae219c 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -38,9 +38,9 @@
 static mempool_t	*rpc_task_mempool __read_mostly;
 static mempool_t	*rpc_buffer_mempool __read_mostly;
 
-static void			__rpc_default_timer(struct rpc_task *task);
 static void			rpc_async_schedule(struct work_struct *);
 static void			 rpc_release_task(struct rpc_task *task);
+static void __rpc_queue_timer_fn(unsigned long ptr);
 
 /*
  * RPC tasks sit here while waiting for conditions to improve.
@@ -57,41 +57,30 @@
  * queue->lock and bh_disabled in order to avoid races within
  * rpc_run_timer().
  */
-static inline void
-__rpc_disable_timer(struct rpc_task *task)
+static void
+__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
+	if (task->tk_timeout == 0)
+		return;
 	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
-	task->tk_timeout_fn = NULL;
 	task->tk_timeout = 0;
+	list_del(&task->u.tk_wait.timer_list);
+	if (list_empty(&queue->timer_list.list))
+		del_timer(&queue->timer_list.timer);
 }
 
-/*
- * Run a timeout function.
- * We use the callback in order to allow __rpc_wake_up_task()
- * and friends to disable the timer synchronously on SMP systems
- * without calling del_timer_sync(). The latter could cause a
- * deadlock if called while we're holding spinlocks...
- */
-static void rpc_run_timer(struct rpc_task *task)
+static void
+rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
 {
-	void (*callback)(struct rpc_task *);
-
-	callback = task->tk_timeout_fn;
-	task->tk_timeout_fn = NULL;
-	if (callback && RPC_IS_QUEUED(task)) {
-		dprintk("RPC: %5u running timer\n", task->tk_pid);
-		callback(task);
-	}
-	smp_mb__before_clear_bit();
-	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
-	smp_mb__after_clear_bit();
+	queue->timer_list.expires = expires;
+	mod_timer(&queue->timer_list.timer, expires);
 }
 
 /*
  * Set up a timer for the current task.
  */
-static inline void
-__rpc_add_timer(struct rpc_task *task, rpc_action timer)
+static void
+__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	if (!task->tk_timeout)
 		return;
@@ -99,27 +88,10 @@
 	dprintk("RPC: %5u setting alarm for %lu ms\n",
 			task->tk_pid, task->tk_timeout * 1000 / HZ);
 
-	if (timer)
-		task->tk_timeout_fn = timer;
-	else
-		task->tk_timeout_fn = __rpc_default_timer;
-	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
-	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
-}
-
-/*
- * Delete any timer for the current task. Because we use del_timer_sync(),
- * this function should never be called while holding queue->lock.
- */
-static void
-rpc_delete_timer(struct rpc_task *task)
-{
-	if (RPC_IS_QUEUED(task))
-		return;
-	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
-		del_singleshot_timer_sync(&task->tk_timer);
-		dprintk("RPC: %5u deleting timer\n", task->tk_pid);
-	}
+	task->u.tk_wait.expires = jiffies + task->tk_timeout;
+	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
+		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
+	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
 }
 
 /*
@@ -161,7 +133,7 @@
 		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
 	else
 		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
-	task->u.tk_wait.rpc_waitq = queue;
+	task->tk_waitqueue = queue;
 	queue->qlen++;
 	rpc_set_queued(task);
 
@@ -181,22 +153,18 @@
 		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
 		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
 	}
-	list_del(&task->u.tk_wait.list);
 }
 
 /*
  * Remove request from queue.
  * Note: must be called with spin lock held.
  */
-static void __rpc_remove_wait_queue(struct rpc_task *task)
+static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	struct rpc_wait_queue *queue;
-	queue = task->u.tk_wait.rpc_waitq;
-
+	__rpc_disable_timer(queue, task);
 	if (RPC_IS_PRIORITY(queue))
 		__rpc_remove_wait_queue_priority(task);
-	else
-		list_del(&task->u.tk_wait.list);
+	list_del(&task->u.tk_wait.list);
 	queue->qlen--;
 	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
 			task->tk_pid, queue, rpc_qname(queue));
@@ -229,6 +197,9 @@
 		INIT_LIST_HEAD(&queue->tasks[i]);
 	queue->maxpriority = nr_queues - 1;
 	rpc_reset_waitqueue_priority(queue);
+	queue->qlen = 0;
+	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
+	INIT_LIST_HEAD(&queue->timer_list.list);
 #ifdef RPC_DEBUG
 	queue->name = qname;
 #endif
@@ -245,6 +216,12 @@
 }
 EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
 
+void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
+{
+	del_timer_sync(&queue->timer_list.timer);
+}
+EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
+
 static int rpc_wait_bit_killable(void *word)
 {
 	if (fatal_signal_pending(current))
@@ -313,7 +290,6 @@
  */
 static void rpc_make_runnable(struct rpc_task *task)
 {
-	BUG_ON(task->tk_timeout_fn);
 	rpc_clear_queued(task);
 	if (rpc_test_and_set_running(task))
 		return;
@@ -326,7 +302,7 @@
 		int status;
 
 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-		status = queue_work(task->tk_workqueue, &task->u.tk_work);
+		status = queue_work(rpciod_workqueue, &task->u.tk_work);
 		if (status < 0) {
 			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
 			task->tk_status = status;
@@ -343,7 +319,7 @@
  * as it's on a wait queue.
  */
 static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
-			rpc_action action, rpc_action timer)
+			rpc_action action)
 {
 	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
 			task->tk_pid, rpc_qname(q), jiffies);
@@ -357,11 +333,11 @@
 
 	BUG_ON(task->tk_callback != NULL);
 	task->tk_callback = action;
-	__rpc_add_timer(task, timer);
+	__rpc_add_timer(q, task);
 }
 
 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
-				rpc_action action, rpc_action timer)
+				rpc_action action)
 {
 	/* Mark the task as being activated if so needed */
 	rpc_set_active(task);
@@ -370,18 +346,19 @@
 	 * Protect the queue operations.
 	 */
 	spin_lock_bh(&q->lock);
-	__rpc_sleep_on(q, task, action, timer);
+	__rpc_sleep_on(q, task, action);
 	spin_unlock_bh(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on);
 
 /**
  * __rpc_do_wake_up_task - wake up a single rpc_task
+ * @queue: wait queue
  * @task: task to be woken up
  *
  * Caller must hold queue->lock, and have cleared the task queued flag.
  */
-static void __rpc_do_wake_up_task(struct rpc_task *task)
+static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
 			task->tk_pid, jiffies);
@@ -395,8 +372,7 @@
 		return;
 	}
 
-	__rpc_disable_timer(task);
-	__rpc_remove_wait_queue(task);
+	__rpc_remove_wait_queue(queue, task);
 
 	rpc_make_runnable(task);
 
@@ -404,48 +380,32 @@
 }
 
 /*
- * Wake up the specified task
+ * Wake up a queued task while the queue lock is being held
  */
-static void __rpc_wake_up_task(struct rpc_task *task)
+static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	if (rpc_start_wakeup(task)) {
-		if (RPC_IS_QUEUED(task))
-			__rpc_do_wake_up_task(task);
-		rpc_finish_wakeup(task);
-	}
+	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
+		__rpc_do_wake_up_task(queue, task);
 }
 
 /*
- * Default timeout handler if none specified by user
+ * Wake up a task on a specific queue
  */
-static void
-__rpc_default_timer(struct rpc_task *task)
+void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid);
-	task->tk_status = -ETIMEDOUT;
-	rpc_wake_up_task(task);
+	spin_lock_bh(&queue->lock);
+	rpc_wake_up_task_queue_locked(queue, task);
+	spin_unlock_bh(&queue->lock);
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
 
 /*
  * Wake up the specified task
  */
-void rpc_wake_up_task(struct rpc_task *task)
+static void rpc_wake_up_task(struct rpc_task *task)
 {
-	rcu_read_lock_bh();
-	if (rpc_start_wakeup(task)) {
-		if (RPC_IS_QUEUED(task)) {
-			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
-
-			/* Note: we're already in a bh-safe context */
-			spin_lock(&queue->lock);
-			__rpc_do_wake_up_task(task);
-			spin_unlock(&queue->lock);
-		}
-		rpc_finish_wakeup(task);
-	}
-	rcu_read_unlock_bh();
+	rpc_wake_up_queued_task(task->tk_waitqueue, task);
 }
-EXPORT_SYMBOL_GPL(rpc_wake_up_task);
 
 /*
  * Wake up the next task on a priority queue.
@@ -495,7 +455,7 @@
 new_owner:
 	rpc_set_waitqueue_owner(queue, task->tk_owner);
 out:
-	__rpc_wake_up_task(task);
+	rpc_wake_up_task_queue_locked(queue, task);
 	return task;
 }
 
@@ -508,16 +468,14 @@
 
 	dprintk("RPC:       wake_up_next(%p \"%s\")\n",
 			queue, rpc_qname(queue));
-	rcu_read_lock_bh();
-	spin_lock(&queue->lock);
+	spin_lock_bh(&queue->lock);
 	if (RPC_IS_PRIORITY(queue))
 		task = __rpc_wake_up_next_priority(queue);
 	else {
 		task_for_first(task, &queue->tasks[0])
-			__rpc_wake_up_task(task);
+			rpc_wake_up_task_queue_locked(queue, task);
 	}
-	spin_unlock(&queue->lock);
-	rcu_read_unlock_bh();
+	spin_unlock_bh(&queue->lock);
 
 	return task;
 }
@@ -534,18 +492,16 @@
 	struct rpc_task *task, *next;
 	struct list_head *head;
 
-	rcu_read_lock_bh();
-	spin_lock(&queue->lock);
+	spin_lock_bh(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
-			__rpc_wake_up_task(task);
+			rpc_wake_up_task_queue_locked(queue, task);
 		if (head == &queue->tasks[0])
 			break;
 		head--;
 	}
-	spin_unlock(&queue->lock);
-	rcu_read_unlock_bh();
+	spin_unlock_bh(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up);
 
@@ -561,26 +517,48 @@
 	struct rpc_task *task, *next;
 	struct list_head *head;
 
-	rcu_read_lock_bh();
-	spin_lock(&queue->lock);
+	spin_lock_bh(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
 			task->tk_status = status;
-			__rpc_wake_up_task(task);
+			rpc_wake_up_task_queue_locked(queue, task);
 		}
 		if (head == &queue->tasks[0])
 			break;
 		head--;
 	}
-	spin_unlock(&queue->lock);
-	rcu_read_unlock_bh();
+	spin_unlock_bh(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
 
+static void __rpc_queue_timer_fn(unsigned long ptr)
+{
+	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
+	struct rpc_task *task, *n;
+	unsigned long expires, now, timeo;
+
+	spin_lock(&queue->lock);
+	expires = now = jiffies;
+	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
+		timeo = task->u.tk_wait.expires;
+		if (time_after_eq(now, timeo)) {
+			dprintk("RPC: %5u timeout\n", task->tk_pid);
+			task->tk_status = -ETIMEDOUT;
+			rpc_wake_up_task_queue_locked(queue, task);
+			continue;
+		}
+		if (expires == now || time_after(expires, timeo))
+			expires = timeo;
+	}
+	if (!list_empty(&queue->timer_list.list))
+		rpc_set_queue_timer(queue, expires);
+	spin_unlock(&queue->lock);
+}
+
 static void __rpc_atrun(struct rpc_task *task)
 {
-	rpc_wake_up_task(task);
+	task->tk_status = 0;
 }
 
 /*
@@ -589,7 +567,7 @@
 void rpc_delay(struct rpc_task *task, unsigned long delay)
 {
 	task->tk_timeout = delay;
-	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
+	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
 }
 EXPORT_SYMBOL_GPL(rpc_delay);
 
@@ -644,10 +622,6 @@
 	BUG_ON(RPC_IS_QUEUED(task));
 
 	for (;;) {
-		/*
-		 * Garbage collection of pending timers...
-		 */
-		rpc_delete_timer(task);
 
 		/*
 		 * Execute any pending callback.
@@ -816,8 +790,6 @@
 static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
 {
 	memset(task, 0, sizeof(*task));
-	setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
-			(unsigned long)task);
 	atomic_set(&task->tk_count, 1);
 	task->tk_flags  = task_setup_data->flags;
 	task->tk_ops = task_setup_data->callback_ops;
@@ -832,7 +804,7 @@
 	task->tk_owner = current->tgid;
 
 	/* Initialize workqueue for async tasks */
-	task->tk_workqueue = rpciod_workqueue;
+	task->tk_workqueue = task_setup_data->workqueue;
 
 	task->tk_client = task_setup_data->rpc_client;
 	if (task->tk_client != NULL) {
@@ -868,13 +840,6 @@
 	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
-static void rpc_free_task(struct rcu_head *rcu)
-{
-	struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
-	dprintk("RPC: %5u freeing task\n", task->tk_pid);
-	mempool_free(task, rpc_task_mempool);
-}
-
 /*
  * Create a new task for the specified client.
  */
@@ -898,12 +863,25 @@
 	return task;
 }
 
-
-void rpc_put_task(struct rpc_task *task)
+static void rpc_free_task(struct rpc_task *task)
 {
 	const struct rpc_call_ops *tk_ops = task->tk_ops;
 	void *calldata = task->tk_calldata;
 
+	if (task->tk_flags & RPC_TASK_DYNAMIC) {
+		dprintk("RPC: %5u freeing task\n", task->tk_pid);
+		mempool_free(task, rpc_task_mempool);
+	}
+	rpc_release_calldata(tk_ops, calldata);
+}
+
+static void rpc_async_release(struct work_struct *work)
+{
+	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
 	if (!atomic_dec_and_test(&task->tk_count))
 		return;
 	/* Release resources */
@@ -915,9 +893,11 @@
 		rpc_release_client(task->tk_client);
 		task->tk_client = NULL;
 	}
-	if (task->tk_flags & RPC_TASK_DYNAMIC)
-		call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
-	rpc_release_calldata(tk_ops, calldata);
+	if (task->tk_workqueue != NULL) {
+		INIT_WORK(&task->u.tk_work, rpc_async_release);
+		queue_work(task->tk_workqueue, &task->u.tk_work);
+	} else
+		rpc_free_task(task);
 }
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
@@ -937,9 +917,6 @@
 	}
 	BUG_ON (RPC_IS_QUEUED(task));
 
-	/* Synchronously delete any running timer */
-	rpc_delete_timer(task);
-
 #ifdef RPC_DEBUG
 	task->tk_magic = 0;
 #endif
@@ -1029,11 +1006,20 @@
 		kmem_cache_destroy(rpc_task_slabp);
 	if (rpc_buffer_slabp)
 		kmem_cache_destroy(rpc_buffer_slabp);
+	rpc_destroy_wait_queue(&delay_queue);
 }
 
 int
 rpc_init_mempool(void)
 {
+	/*
+	 * The following is not strictly a mempool initialisation,
+	 * but there is no harm in doing it here
+	 */
+	rpc_init_wait_queue(&delay_queue, "delayq");
+	if (!rpciod_start())
+		goto err_nomem;
+
 	rpc_task_slabp = kmem_cache_create("rpc_tasks",
 					     sizeof(struct rpc_task),
 					     0, SLAB_HWCACHE_ALIGN,
@@ -1054,13 +1040,6 @@
 						      rpc_buffer_slabp);
 	if (!rpc_buffer_mempool)
 		goto err_nomem;
-	if (!rpciod_start())
-		goto err_nomem;
-	/*
-	 * The following is not strictly a mempool initialisation,
-	 * but there is no harm in doing it here
-	 */
-	rpc_init_wait_queue(&delay_queue, "delayq");
 	return 0;
 err_nomem:
 	rpc_destroy_mempool();
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index d5553b8..85199c6 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -188,9 +188,9 @@
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
 	if (req && req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+		rpc_sleep_on(&xprt->resend, task, NULL);
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		rpc_sleep_on(&xprt->sending, task, NULL);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -238,9 +238,9 @@
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
 	if (req && req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+		rpc_sleep_on(&xprt->resend, task, NULL);
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		rpc_sleep_on(&xprt->sending, task, NULL);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -453,7 +453,7 @@
 	struct rpc_xprt *xprt = req->rq_xprt;
 
 	task->tk_timeout = req->rq_timeout;
-	rpc_sleep_on(&xprt->pending, task, NULL, NULL);
+	rpc_sleep_on(&xprt->pending, task, NULL);
 }
 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
 
@@ -472,7 +472,7 @@
 	if (xprt->snd_task) {
 		dprintk("RPC:       write space: waking waiting task on "
 				"xprt %p\n", xprt);
-		rpc_wake_up_task(xprt->snd_task);
+		rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
 	}
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -602,8 +602,7 @@
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
 		queue_work(rpciod_workqueue, &xprt->task_cleanup);
-	else if (xprt->snd_task != NULL)
-		rpc_wake_up_task(xprt->snd_task);
+	xprt_wake_pending_tasks(xprt, -ENOTCONN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
@@ -653,7 +652,7 @@
 			task->tk_rqstp->rq_bytes_sent = 0;
 
 		task->tk_timeout = xprt->connect_timeout;
-		rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
+		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
 		xprt->stat.connect_start = jiffies;
 		xprt->ops->connect(task);
 	}
@@ -749,18 +748,19 @@
 void xprt_complete_rqst(struct rpc_task *task, int copied)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
 
 	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
 			task->tk_pid, ntohl(req->rq_xid), copied);
 
-	task->tk_xprt->stat.recvs++;
+	xprt->stat.recvs++;
 	task->tk_rtt = (long)jiffies - req->rq_xtime;
 
 	list_del_init(&req->rq_list);
 	/* Ensure all writes are done before we update req->rq_received */
 	smp_wmb();
 	req->rq_received = req->rq_private_buf.len = copied;
-	rpc_wake_up_task(task);
+	rpc_wake_up_queued_task(&xprt->pending, task);
 }
 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
 
@@ -769,17 +769,17 @@
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 
+	if (task->tk_status != -ETIMEDOUT)
+		return;
 	dprintk("RPC: %5u xprt_timer\n", task->tk_pid);
 
-	spin_lock(&xprt->transport_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	if (!req->rq_received) {
 		if (xprt->ops->timer)
 			xprt->ops->timer(task);
-		task->tk_status = -ETIMEDOUT;
-	}
-	task->tk_timeout = 0;
-	rpc_wake_up_task(task);
-	spin_unlock(&xprt->transport_lock);
+	} else
+		task->tk_status = 0;
+	spin_unlock_bh(&xprt->transport_lock);
 }
 
 /**
@@ -864,7 +864,7 @@
 		if (!xprt_connected(xprt))
 			task->tk_status = -ENOTCONN;
 		else if (!req->rq_received)
-			rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
+			rpc_sleep_on(&xprt->pending, task, xprt_timer);
 		spin_unlock_bh(&xprt->transport_lock);
 		return;
 	}
@@ -875,7 +875,7 @@
 	 */
 	task->tk_status = status;
 	if (status == -ECONNREFUSED)
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		rpc_sleep_on(&xprt->sending, task, NULL);
 }
 
 static inline void do_xprt_reserve(struct rpc_task *task)
@@ -895,7 +895,7 @@
 	dprintk("RPC:       waiting for request slot\n");
 	task->tk_status = -EAGAIN;
 	task->tk_timeout = 0;
-	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
+	rpc_sleep_on(&xprt->backlog, task, NULL);
 }
 
 /**
@@ -1052,6 +1052,11 @@
 	xprt->shutdown = 1;
 	del_timer_sync(&xprt->timer);
 
+	rpc_destroy_wait_queue(&xprt->binding);
+	rpc_destroy_wait_queue(&xprt->pending);
+	rpc_destroy_wait_queue(&xprt->sending);
+	rpc_destroy_wait_queue(&xprt->resend);
+	rpc_destroy_wait_queue(&xprt->backlog);
 	/*
 	 * Tear down transport state and free the rpc_xprt
 	 */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 30e7ac2..8bd3b0f 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1073,6 +1073,7 @@
 {
 	struct rpc_xprt *xprt;
 	read_descriptor_t rd_desc;
+	int read;
 
 	dprintk("RPC:       xs_tcp_data_ready...\n");
 
@@ -1084,8 +1085,10 @@
 
 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
 	rd_desc.arg.data = xprt;
-	rd_desc.count = 65536;
-	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+	do {
+		rd_desc.count = 65536;
+		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+	} while (read > 0);
 out:
 	read_unlock(&sk->sk_callback_lock);
 }