Merge tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs

Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Features:
   - Add support for multiple NFSv4.1 callbacks in flight
   - Initial patchset for RPC multipath support
   - Adapt RPC/RDMA to use the new completion queue API

  Bugfixes and cleanups:
   - nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
   - Cleanups to remove nfs_inode_dio_wait and nfs4_file_fsync
   - Fix RPC/RDMA credit accounting
   - Properly handle RDMA_ERROR replies
   - xprtrdma: Do not wait if ib_post_send() fails
   - xprtrdma: Segment head and tail XDR buffers on page boundaries
   - xprtrdma cleanups for dprintk, physical_op_map and unused macros"

* tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (35 commits)
  nfs/blocklayout: make sure to make an aligned read request
  nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
  nfs: remove nfs_inode_dio_wait
  nfs: remove nfs4_file_fsync
  xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs
  xprtrdma: Use an anonymous union in struct rpcrdma_mw
  xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
  xprtrdma: Serialize credit accounting again
  xprtrdma: Properly handle RDMA_ERROR replies
  rpcrdma: Add RPCRDMA_HDRLEN_ERR
  xprtrdma: Do not wait if ib_post_send() fails
  xprtrdma: Segment head and tail XDR buffers on page boundaries
  xprtrdma: Clean up dprintk format string containing a newline
  xprtrdma: Clean up physical_op_map()
  xprtrdma: Clean up unused RPCRDMA_INLINE_PAD_THRESH macro
  NFS: add callback_ops to nfs4_proc_bind_conn_to_session_callback
  pnfs/NFSv4.1: Add multipath capabilities to pNFS flexfiles servers over NFSv3
  SUNRPC: Allow addition of new transports to a struct rpc_clnt
  NFSv4.1: nfs4_proc_bind_conn_to_session must iterate over all connections
  SUNRPC: Make NFS swap work with multipath
  ...
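
A quick orientation before the diff: the multipath work gives each struct
rpc_clnt a transport switch (struct rpc_xprt_switch) plus an iterator
(struct rpc_xprt_iter), and callers fan work out across every attached
transport with rpc_clnt_iterate_for_each_xprt(). A minimal sketch of that
calling convention follows; the demo_* helpers are hypothetical and exist
only to illustrate the callback contract:

	#include <linux/sunrpc/clnt.h>

	/* Hypothetical callback: tally each transport on the client. */
	static int demo_count_xprt(struct rpc_clnt *clnt,
			struct rpc_xprt *xprt, void *data)
	{
		unsigned int *count = data;

		(*count)++;
		return 0;	/* a negative return stops the iteration */
	}

	static unsigned int demo_count_xprts(struct rpc_clnt *clnt)
	{
		unsigned int count = 0;

		/* Visits each attached transport exactly once. */
		rpc_clnt_iterate_for_each_xprt(clnt, demo_count_xprt, &count);
		return count;
	}
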
diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c
index ddd0138..8bc870e 100644
--- a/fs/nfs/blocklayout/blocklayout.c
+++ b/fs/nfs/blocklayout/blocklayout.c
@@ -743,7 +743,7 @@
 
 static bool
 is_aligned_req(struct nfs_pageio_descriptor *pgio,
-		struct nfs_page *req, unsigned int alignment)
+		struct nfs_page *req, unsigned int alignment, bool is_write)
 {
 	/*
 	 * Always accept buffered writes, higher layers take care of the
@@ -758,7 +758,8 @@
 	if (IS_ALIGNED(req->wb_bytes, alignment))
 		return true;
 
-	if (req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode)) {
+	if (is_write &&
+	    (req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode))) {
 		/*
 		 * If the write goes up to the inode size, just write
 		 * the full page.  Data past the inode size is
@@ -775,7 +776,7 @@
 static void
 bl_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 {
-	if (!is_aligned_req(pgio, req, SECTOR_SIZE)) {
+	if (!is_aligned_req(pgio, req, SECTOR_SIZE, false)) {
 		nfs_pageio_reset_read_mds(pgio);
 		return;
 	}
@@ -791,7 +792,7 @@
 bl_pg_test_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
 		struct nfs_page *req)
 {
-	if (!is_aligned_req(pgio, req, SECTOR_SIZE))
+	if (!is_aligned_req(pgio, req, SECTOR_SIZE, false))
 		return 0;
 	return pnfs_generic_pg_test(pgio, prev, req);
 }
@@ -824,7 +825,7 @@
 {
 	u64 wb_size;
 
-	if (!is_aligned_req(pgio, req, PAGE_SIZE)) {
+	if (!is_aligned_req(pgio, req, PAGE_SIZE, true)) {
 		nfs_pageio_reset_write_mds(pgio);
 		return;
 	}
@@ -846,7 +847,7 @@
 bl_pg_test_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
 		 struct nfs_page *req)
 {
-	if (!is_aligned_req(pgio, req, PAGE_SIZE))
+	if (!is_aligned_req(pgio, req, PAGE_SIZE, true))
 		return 0;
 	return pnfs_generic_pg_test(pgio, prev, req);
 }
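
The net effect of the is_aligned_req() change above: the "request ends
exactly at i_size" exemption now applies only to writes, so an unaligned
read can no longer slip through to the block layout driver and is instead
redirected to the MDS. A stand-alone restatement of the visible checks,
for illustration only (the offset-alignment test lives in context this
hunk does not show):

	#include <stdbool.h>
	#include <stdint.h>

	static bool demo_is_aligned_req(uint64_t offset, uint32_t bytes,
			uint64_t i_size, uint32_t alignment, bool is_write)
	{
		if (bytes % alignment == 0)
			return true;
		/* Write-only exemption: a short tail ending exactly at EOF
		 * may be expanded to a full page, since data past i_size is
		 * undefined anyway. */
		return is_write && offset + bytes == i_size;
	}
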
diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
index ff8195b..5fe1cec 100644
--- a/fs/nfs/callback.h
+++ b/fs/nfs/callback.h
@@ -37,10 +37,11 @@
 	OP_CB_ILLEGAL = 10044,
 };
 
+struct nfs4_slot;
 struct cb_process_state {
 	__be32			drc_status;
 	struct nfs_client	*clp;
-	u32			slotid;
+	struct nfs4_slot	*slot;
 	u32			minorversion;
 	struct net		*net;
 };
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index f0939d0..618ced3 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -354,47 +354,38 @@
  * a single outstanding callback request at a time.
  */
 static __be32
-validate_seqid(struct nfs4_slot_table *tbl, struct cb_sequenceargs * args)
+validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot,
+		const struct cb_sequenceargs * args)
 {
-	struct nfs4_slot *slot;
+	dprintk("%s enter. slotid %u seqid %u, slot table seqid: %u\n",
+		__func__, args->csa_slotid, args->csa_sequenceid, slot->seq_nr);
 
-	dprintk("%s enter. slotid %u seqid %u\n",
-		__func__, args->csa_slotid, args->csa_sequenceid);
-
-	if (args->csa_slotid >= NFS41_BC_MAX_CALLBACKS)
+	if (args->csa_slotid > tbl->server_highest_slotid)
 		return htonl(NFS4ERR_BADSLOT);
 
-	slot = tbl->slots + args->csa_slotid;
-	dprintk("%s slot table seqid: %u\n", __func__, slot->seq_nr);
-
-	/* Normal */
-	if (likely(args->csa_sequenceid == slot->seq_nr + 1))
-		goto out_ok;
-
 	/* Replay */
 	if (args->csa_sequenceid == slot->seq_nr) {
 		dprintk("%s seqid %u is a replay\n",
 			__func__, args->csa_sequenceid);
+		if (nfs4_test_locked_slot(tbl, slot->slot_nr))
+			return htonl(NFS4ERR_DELAY);
 		/* Signal process_op to set this error on next op */
 		if (args->csa_cachethis == 0)
 			return htonl(NFS4ERR_RETRY_UNCACHED_REP);
 
-		/* The ca_maxresponsesize_cached is 0 with no DRC */
-		else if (args->csa_cachethis == 1)
-			return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
+		/* Liar! We never allowed you to set csa_cachethis != 0 */
+		return htonl(NFS4ERR_SEQ_FALSE_RETRY);
 	}
 
 	/* Wraparound */
-	if (args->csa_sequenceid == 1 && (slot->seq_nr + 1) == 0) {
-		slot->seq_nr = 1;
-		goto out_ok;
-	}
+	if (unlikely(slot->seq_nr == 0xFFFFFFFFU)) {
+		if (args->csa_sequenceid == 1)
+			return htonl(NFS4_OK);
+	} else if (likely(args->csa_sequenceid == slot->seq_nr + 1))
+		return htonl(NFS4_OK);
 
 	/* Misordered request */
 	return htonl(NFS4ERR_SEQ_MISORDERED);
-out_ok:
-	tbl->highest_used_slotid = args->csa_slotid;
-	return htonl(NFS4_OK);
 }
 
 /*
@@ -473,6 +464,12 @@
 	tbl = &clp->cl_session->bc_slot_table;
 	slot = tbl->slots + args->csa_slotid;
 
+	/* Set up res before grabbing the spinlock */
+	memcpy(&res->csr_sessionid, &args->csa_sessionid,
+	       sizeof(res->csr_sessionid));
+	res->csr_sequenceid = args->csa_sequenceid;
+	res->csr_slotid = args->csa_slotid;
+
 	spin_lock(&tbl->slot_tbl_lock);
 	/* state manager is resetting the session */
 	if (test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state)) {
@@ -485,18 +482,28 @@
 		goto out_unlock;
 	}
 
-	memcpy(&res->csr_sessionid, &args->csa_sessionid,
-	       sizeof(res->csr_sessionid));
-	res->csr_sequenceid = args->csa_sequenceid;
-	res->csr_slotid = args->csa_slotid;
-	res->csr_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
-	res->csr_target_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
-
-	status = validate_seqid(tbl, args);
-	if (status)
+	status = htonl(NFS4ERR_BADSLOT);
+	slot = nfs4_lookup_slot(tbl, args->csa_slotid);
+	if (IS_ERR(slot))
 		goto out_unlock;
 
-	cps->slotid = args->csa_slotid;
+	res->csr_highestslotid = tbl->server_highest_slotid;
+	res->csr_target_highestslotid = tbl->target_highest_slotid;
+
+	status = validate_seqid(tbl, slot, args);
+	if (status)
+		goto out_unlock;
+	if (!nfs4_try_to_lock_slot(tbl, slot)) {
+		status = htonl(NFS4ERR_DELAY);
+		goto out_unlock;
+	}
+	cps->slot = slot;
+
+	/* The ca_maxresponsesize_cached is 0 with no DRC */
+	if (args->csa_cachethis != 0) {
+		status = htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
+		goto out_unlock;
+	}
 
 	/*
 	 * Check for pending referring calls.  If a match is found, a
@@ -513,7 +518,7 @@
 	 * If CB_SEQUENCE returns an error, then the state of the slot
 	 * (sequence ID, cached reply) MUST NOT change.
 	 */
-	slot->seq_nr++;
+	slot->seq_nr = args->csa_sequenceid;
 out_unlock:
 	spin_unlock(&tbl->slot_tbl_lock);
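
Summarising the rewritten validate_seqid(): a sequence id equal to the
slot's seq_nr is a replay (NFS4ERR_DELAY if the slot is busy,
NFS4ERR_RETRY_UNCACHED_REP if the client did not ask for caching, and
NFS4ERR_SEQ_FALSE_RETRY otherwise, since caching was never granted);
seq_nr + 1 is the normal advance; 0xFFFFFFFF wraps to 1; anything else is
misordered. A stand-alone restatement of just the sequencing rules
(hypothetical helper; the kernel version also consults the slot table
lock bits and csa_cachethis):

	#include <stdint.h>

	enum demo_seq_result { DEMO_OK, DEMO_REPLAY, DEMO_MISORDERED };

	static enum demo_seq_result demo_classify_seqid(uint32_t slot_seq,
			uint32_t csa_seq)
	{
		if (csa_seq == slot_seq)
			return DEMO_REPLAY;	/* retransmitted request */
		if (slot_seq == UINT32_MAX)	/* wraparound: next id is 1 */
			return csa_seq == 1 ? DEMO_OK : DEMO_MISORDERED;
		if (csa_seq == slot_seq + 1)
			return DEMO_OK;		/* normal advance */
		return DEMO_MISORDERED;
	}
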
 
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index 646cdac..976c906 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -752,7 +752,8 @@
 	return htonl(NFS_OK);
 }
 
-static void nfs4_callback_free_slot(struct nfs4_session *session)
+static void nfs4_callback_free_slot(struct nfs4_session *session,
+		struct nfs4_slot *slot)
 {
 	struct nfs4_slot_table *tbl = &session->bc_slot_table;
 
@@ -761,15 +762,17 @@
 	 * Let the state manager know callback processing done.
 	 * A single slot, so highest used slotid is either 0 or -1
 	 */
-	tbl->highest_used_slotid = NFS4_NO_SLOT;
+	nfs4_free_slot(tbl, slot);
 	nfs4_slot_tbl_drain_complete(tbl);
 	spin_unlock(&tbl->slot_tbl_lock);
 }
 
 static void nfs4_cb_free_slot(struct cb_process_state *cps)
 {
-	if (cps->slotid != NFS4_NO_SLOT)
-		nfs4_callback_free_slot(cps->clp->cl_session);
+	if (cps->slot) {
+		nfs4_callback_free_slot(cps->clp->cl_session, cps->slot);
+		cps->slot = NULL;
+	}
 }
 
 #else /* CONFIG_NFS_V4_1 */
@@ -893,7 +896,6 @@
 	struct cb_process_state cps = {
 		.drc_status = 0,
 		.clp = NULL,
-		.slotid = NFS4_NO_SLOT,
 		.net = SVC_NET(rqstp),
 	};
 	unsigned int nops = 0;
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 748bb81..89bf093 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -233,7 +233,7 @@
  * nfs_file_write() that a write error occurred, and hence cause it to
  * fall back to doing a synchronous write.
  */
-int
+static int
 nfs_file_fsync_commit(struct file *file, loff_t start, loff_t end, int datasync)
 {
 	struct nfs_open_context *ctx = nfs_file_open_context(file);
@@ -263,9 +263,8 @@
 out:
 	return ret;
 }
-EXPORT_SYMBOL_GPL(nfs_file_fsync_commit);
 
-static int
+int
 nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
 	int ret;
@@ -273,13 +272,15 @@
 
 	trace_nfs_fsync_enter(inode);
 
-	nfs_inode_dio_wait(inode);
+	inode_dio_wait(inode);
 	do {
 		ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
 		if (ret != 0)
 			break;
 		inode_lock(inode);
 		ret = nfs_file_fsync_commit(file, start, end, datasync);
+		if (!ret)
+			ret = pnfs_sync_inode(inode, !!datasync);
 		inode_unlock(inode);
 		/*
 		 * If nfs_file_fsync_commit detected a server reboot, then
@@ -293,6 +294,7 @@
 	trace_nfs_fsync_exit(inode, ret);
 	return ret;
 }
+EXPORT_SYMBOL_GPL(nfs_file_fsync);
 
 /*
  * Decide whether a read/modify/write cycle may be more efficient
@@ -368,7 +370,7 @@
 	/*
 	 * Wait for O_DIRECT to complete
 	 */
-	nfs_inode_dio_wait(mapping->host);
+	inode_dio_wait(mapping->host);
 
 	page = grab_cache_page_write_begin(mapping, index, flags);
 	if (!page)
diff --git a/fs/nfs/flexfilelayout/flexfilelayoutdev.c b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
index eb37046..add0e5a7 100644
--- a/fs/nfs/flexfilelayout/flexfilelayoutdev.c
+++ b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
@@ -418,6 +418,8 @@
 				pnfs_error_mark_layout_for_return(ino, lseg);
 		} else
 			pnfs_error_mark_layout_for_return(ino, lseg);
+		ds = NULL;
+		goto out;
 	}
 out_update_creds:
 	if (ff_layout_update_mirror_cred(mirror, ds))
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 86faecf..33d18c4 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -141,7 +141,7 @@
 
 int nfs_sync_inode(struct inode *inode)
 {
-	nfs_inode_dio_wait(inode);
+	inode_dio_wait(inode);
 	return nfs_wb_all(inode);
 }
 EXPORT_SYMBOL_GPL(nfs_sync_inode);
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 9a547aa..565f813 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -358,7 +358,7 @@
 int nfs_rename(struct inode *, struct dentry *, struct inode *, struct dentry *);
 
 /* file.c */
-int nfs_file_fsync_commit(struct file *, loff_t, loff_t, int);
+int nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync);
 loff_t nfs_file_llseek(struct file *, loff_t, int);
 ssize_t nfs_file_read(struct kiocb *, struct iov_iter *);
 ssize_t nfs_file_splice_read(struct file *, loff_t *, struct pipe_inode_info *,
@@ -515,10 +515,6 @@
 /* direct.c */
 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
 			      struct nfs_direct_req *dreq);
-static inline void nfs_inode_dio_wait(struct inode *inode)
-{
-	inode_dio_wait(inode);
-}
 extern ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq);
 
 /* nfs4proc.c */
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index 57ca1c8..22c35ab 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -128,37 +128,6 @@
 	return vfs_fsync(file, 0);
 }
 
-static int
-nfs4_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
-{
-	int ret;
-	struct inode *inode = file_inode(file);
-
-	trace_nfs_fsync_enter(inode);
-
-	nfs_inode_dio_wait(inode);
-	do {
-		ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
-		if (ret != 0)
-			break;
-		inode_lock(inode);
-		ret = nfs_file_fsync_commit(file, start, end, datasync);
-		if (!ret)
-			ret = pnfs_sync_inode(inode, !!datasync);
-		inode_unlock(inode);
-		/*
-		 * If nfs_file_fsync_commit detected a server reboot, then
-		 * resend all dirty pages that might have been covered by
-		 * the NFS_CONTEXT_RESEND_WRITES flag
-		 */
-		start = 0;
-		end = LLONG_MAX;
-	} while (ret == -EAGAIN);
-
-	trace_nfs_fsync_exit(inode, ret);
-	return ret;
-}
-
 #ifdef CONFIG_NFS_V4_2
 static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence)
 {
@@ -266,7 +235,7 @@
 	.open		= nfs4_file_open,
 	.flush		= nfs4_file_flush,
 	.release	= nfs_file_release,
-	.fsync		= nfs4_file_fsync,
+	.fsync		= nfs_file_fsync,
 	.lock		= nfs_lock,
 	.flock		= nfs_flock,
 	.splice_read	= nfs_file_splice_read,
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 400a70b..327b8c3 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -6783,13 +6783,26 @@
 	return false;
 }
 
+static void
+nfs4_bind_one_conn_to_session_done(struct rpc_task *task, void *calldata)
+{
+}
+
+static const struct rpc_call_ops nfs4_bind_one_conn_to_session_ops = {
+	.rpc_call_done = &nfs4_bind_one_conn_to_session_done,
+};
+
 /*
- * nfs4_proc_bind_conn_to_session()
+ * nfs4_proc_bind_one_conn_to_session()
  *
  * The 4.1 client currently uses the same TCP connection for the
  * fore and backchannel.
  */
-int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
+static
+int nfs4_proc_bind_one_conn_to_session(struct rpc_clnt *clnt,
+		struct rpc_xprt *xprt,
+		struct nfs_client *clp,
+		struct rpc_cred *cred)
 {
 	int status;
 	struct nfs41_bind_conn_to_session_args args = {
@@ -6804,6 +6817,14 @@
 		.rpc_resp = &res,
 		.rpc_cred = cred,
 	};
+	struct rpc_task_setup task_setup_data = {
+		.rpc_client = clnt,
+		.rpc_xprt = xprt,
+		.callback_ops = &nfs4_bind_one_conn_to_session_ops,
+		.rpc_message = &msg,
+		.flags = RPC_TASK_TIMEOUT,
+	};
+	struct rpc_task *task;
 
 	dprintk("--> %s\n", __func__);
 
@@ -6811,7 +6832,16 @@
 	if (!(clp->cl_session->flags & SESSION4_BACK_CHAN))
 		args.dir = NFS4_CDFC4_FORE;
 
-	status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
+	/* Do not set the backchannel flag unless this is clnt->cl_xprt */
+	if (xprt != rcu_access_pointer(clnt->cl_xprt))
+		args.dir = NFS4_CDFC4_FORE;
+
+	task = rpc_run_task(&task_setup_data);
+	if (!IS_ERR(task)) {
+		status = task->tk_status;
+		rpc_put_task(task);
+	} else
+		status = PTR_ERR(task);
 	trace_nfs4_bind_conn_to_session(clp, status);
 	if (status == 0) {
 		if (memcmp(res.sessionid.data,
@@ -6838,6 +6868,31 @@
 	return status;
 }
 
+struct rpc_bind_conn_calldata {
+	struct nfs_client *clp;
+	struct rpc_cred *cred;
+};
+
+static int
+nfs4_proc_bind_conn_to_session_callback(struct rpc_clnt *clnt,
+		struct rpc_xprt *xprt,
+		void *calldata)
+{
+	struct rpc_bind_conn_calldata *p = calldata;
+
+	return nfs4_proc_bind_one_conn_to_session(clnt, xprt, p->clp, p->cred);
+}
+
+int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
+{
+	struct rpc_bind_conn_calldata data = {
+		.clp = clp,
+		.cred = cred,
+	};
+	return rpc_clnt_iterate_for_each_xprt(clp->cl_rpcclient,
+			nfs4_proc_bind_conn_to_session_callback, &data);
+}
+
 /*
  * Minimum set of SP4_MACH_CRED operations from RFC 5661 in the enforce map
  * and operations we'd like to see to enable certain features in the allow map
@@ -7320,7 +7375,7 @@
 	args->bc_attrs.max_resp_sz = PAGE_SIZE;
 	args->bc_attrs.max_resp_sz_cached = 0;
 	args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
-	args->bc_attrs.max_reqs = 1;
+	args->bc_attrs.max_reqs = NFS41_BC_MAX_CALLBACKS;
 
 	dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
 		"max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
diff --git a/fs/nfs/nfs4session.c b/fs/nfs/nfs4session.c
index e23366e..332d06e 100644
--- a/fs/nfs/nfs4session.c
+++ b/fs/nfs/nfs4session.c
@@ -135,6 +135,43 @@
 	return ERR_PTR(-ENOMEM);
 }
 
+static void nfs4_lock_slot(struct nfs4_slot_table *tbl,
+		struct nfs4_slot *slot)
+{
+	u32 slotid = slot->slot_nr;
+
+	__set_bit(slotid, tbl->used_slots);
+	if (slotid > tbl->highest_used_slotid ||
+	    tbl->highest_used_slotid == NFS4_NO_SLOT)
+		tbl->highest_used_slotid = slotid;
+	slot->generation = tbl->generation;
+}
+
+/*
+ * nfs4_try_to_lock_slot - Given a slot, try to allocate it
+ *
+ * Note: must be called with the slot_tbl_lock held.
+ */
+bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot)
+{
+	if (nfs4_test_locked_slot(tbl, slot->slot_nr))
+		return false;
+	nfs4_lock_slot(tbl, slot);
+	return true;
+}
+
+/*
+ * nfs4_lookup_slot - Find a slot but don't allocate it
+ *
+ * Note: must be called with the slot_tbl_lock held.
+ */
+struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid)
+{
+	if (slotid <= tbl->max_slotid)
+		return nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
+	return ERR_PTR(-E2BIG);
+}
+
 /*
  * nfs4_alloc_slot - efficiently look for a free slot
  *
@@ -153,18 +190,11 @@
 		__func__, tbl->used_slots[0], tbl->highest_used_slotid,
 		tbl->max_slotid + 1);
 	slotid = find_first_zero_bit(tbl->used_slots, tbl->max_slotid + 1);
-	if (slotid > tbl->max_slotid)
-		goto out;
-	ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
-	if (IS_ERR(ret))
-		goto out;
-	__set_bit(slotid, tbl->used_slots);
-	if (slotid > tbl->highest_used_slotid ||
-			tbl->highest_used_slotid == NFS4_NO_SLOT)
-		tbl->highest_used_slotid = slotid;
-	ret->generation = tbl->generation;
-
-out:
+	if (slotid <= tbl->max_slotid) {
+		ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
+		if (!IS_ERR(ret))
+			nfs4_lock_slot(tbl, ret);
+	}
 	dprintk("<-- %s used_slots=%04lx highest_used=%u slotid=%u\n",
 		__func__, tbl->used_slots[0], tbl->highest_used_slotid,
 		!IS_ERR(ret) ? ret->slot_nr : NFS4_NO_SLOT);
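
The split into nfs4_lookup_slot() (find without allocating) and
nfs4_try_to_lock_slot() lets the backchannel pin the exact slot named in
CB_SEQUENCE rather than taking whatever nfs4_alloc_slot() hands out.
Condensed caller pattern, as used by nfs4_callback_sequence() above
(sketch only; error handling elided):

	spin_lock(&tbl->slot_tbl_lock);
	slot = nfs4_lookup_slot(tbl, slotid);	/* find, do not allocate */
	if (!IS_ERR(slot) && nfs4_try_to_lock_slot(tbl, slot)) {
		/* slot is pinned: safe to validate and bump seq_nr */
	}
	spin_unlock(&tbl->slot_tbl_lock);

	/* ... later, once callback processing completes: */
	spin_lock(&tbl->slot_tbl_lock);
	nfs4_free_slot(tbl, slot);
	spin_unlock(&tbl->slot_tbl_lock);
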
diff --git a/fs/nfs/nfs4session.h b/fs/nfs/nfs4session.h
index e3ea2c5..5b51298d 100644
--- a/fs/nfs/nfs4session.h
+++ b/fs/nfs/nfs4session.h
@@ -77,6 +77,8 @@
 		unsigned int max_reqs, const char *queue);
 extern void nfs4_shutdown_slot_table(struct nfs4_slot_table *tbl);
 extern struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl);
+extern struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid);
+extern bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
 extern void nfs4_free_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
 extern void nfs4_slot_tbl_drain_complete(struct nfs4_slot_table *tbl);
 bool nfs41_wake_and_assign_slot(struct nfs4_slot_table *tbl,
@@ -88,6 +90,12 @@
 	return !!test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state);
 }
 
+static inline bool nfs4_test_locked_slot(const struct nfs4_slot_table *tbl,
+		u32 slotid)
+{
+	return !!test_bit(slotid, tbl->used_slots);
+}
+
 #if defined(CONFIG_NFS_V4_1)
 extern void nfs41_set_target_slotid(struct nfs4_slot_table *tbl,
 		u32 target_highest_slotid);
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index 81ac648..4aaed89 100644
--- a/fs/nfs/pnfs_nfs.c
+++ b/fs/nfs/pnfs_nfs.c
@@ -606,12 +606,22 @@
 		dprintk("%s: DS %s: trying address %s\n",
 			__func__, ds->ds_remotestr, da->da_remotestr);
 
-		clp = get_v3_ds_connect(mds_srv->nfs_client,
+		if (!IS_ERR(clp)) {
+			struct xprt_create xprt_args = {
+				.ident = XPRT_TRANSPORT_TCP,
+				.net = clp->cl_net,
+				.dstaddr = (struct sockaddr *)&da->da_addr,
+				.addrlen = da->da_addrlen,
+				.servername = clp->cl_hostname,
+			};
+			/* Add this address as an alias */
+			rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args,
+					rpc_clnt_test_and_add_xprt, NULL);
+		} else
+			clp = get_v3_ds_connect(mds_srv->nfs_client,
 					(struct sockaddr *)&da->da_addr,
 					da->da_addrlen, IPPROTO_TCP,
 					timeo, retrans, au_flavor);
-		if (!IS_ERR(clp))
-			break;
 	}
 
 	if (IS_ERR(clp)) {
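
Note the inverted control flow here: the first address that yields a
client is kept, and each further address in the multipath list is grafted
onto that same client as an extra transport. The connectivity test is
asynchronous: rpc_clnt_test_and_add_xprt() fires a NULL ping over the
candidate transport, returns 1 so that rpc_clnt_add_xprt() skips the
synchronous add, and the transport joins the switch from the ping's
completion callback, so an unreachable alias is simply dropped. The call
shape, restated outside the diff (names as in the hunk above):

	struct xprt_create xprt_args = {
		.ident		= XPRT_TRANSPORT_TCP,
		.net		= clp->cl_net,
		.dstaddr	= (struct sockaddr *)&da->da_addr,
		.addrlen	= da->da_addrlen,
		.servername	= clp->cl_hostname,
	};

	/* NULL-pings the new address and, on success, adds it to the
	 * client's rpc_xprt_switch as a round-robin alias. */
	rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args,
			rpc_clnt_test_and_add_xprt, NULL);
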
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 131032f..9a7ddba 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -25,6 +25,7 @@
 #include <asm/signal.h>
 #include <linux/path.h>
 #include <net/ipv6.h>
+#include <linux/sunrpc/xprtmultipath.h>
 
 struct rpc_inode;
 
@@ -67,6 +68,7 @@
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	struct dentry		*cl_debugfs;	/* debugfs directory */
 #endif
+	struct rpc_xprt_iter	cl_xpi;
 };
 
 /*
@@ -139,7 +141,6 @@
 					struct rpc_xprt *xprt);
 struct rpc_clnt	*rpc_bind_new_program(struct rpc_clnt *,
 				const struct rpc_program *, u32);
-void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt);
 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *);
 struct rpc_clnt *rpc_clone_client_set_auth(struct rpc_clnt *,
 				rpc_authflavor_t);
@@ -181,6 +182,21 @@
 const char	*rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
 int		rpc_localaddr(struct rpc_clnt *, struct sockaddr *, size_t);
 
+int 		rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
+			int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
+			void *data);
+
+int 		rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
+			struct rpc_xprt_switch *xps,
+			struct rpc_xprt *xprt,
+			void *dummy);
+int		rpc_clnt_add_xprt(struct rpc_clnt *, struct xprt_create *,
+			int (*setup)(struct rpc_clnt *,
+				struct rpc_xprt_switch *,
+				struct rpc_xprt *,
+				void *),
+			void *data);
+
 const char *rpc_proc_name(const struct rpc_task *task);
 #endif /* __KERNEL__ */
 #endif /* _LINUX_SUNRPC_CLNT_H */
diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index f33c5a4..3b1ff38 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -93,6 +93,12 @@
 			__be32 rm_pempty[3];	/* 3 empty chunk lists */
 		} rm_padded;
 
+		struct {
+			__be32 rm_err;
+			__be32 rm_vers_low;
+			__be32 rm_vers_high;
+		} rm_error;
+
 		__be32 rm_chunks[0];	/* read, write and reply chunks */
 
 	} rm_body;
@@ -102,17 +108,13 @@
  * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
  */
 #define RPCRDMA_HDRLEN_MIN	(sizeof(__be32) * 7)
+#define RPCRDMA_HDRLEN_ERR	(sizeof(__be32) * 5)
 
 enum rpcrdma_errcode {
 	ERR_VERS = 1,
 	ERR_CHUNK = 2
 };
 
-struct rpcrdma_err_vers {
-	uint32_t rdma_vers_low;	/* Version range supported by peer */
-	uint32_t rdma_vers_high;
-};
-
 enum rpcrdma_proc {
 	RDMA_MSG = 0,		/* An RPC call or reply msg */
 	RDMA_NOMSG = 1,		/* An RPC call or reply msg - separate body */
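
The new rm_error arm and RPCRDMA_HDRLEN_ERR describe the shortest valid
RDMA_ERROR reply: the four fixed header words plus rm_err, with ERR_VERS
carrying two further words for the peer's supported version range. An
illustrative (non-kernel) view of that layout, all fields big-endian on
the wire:

	#include <stdint.h>

	struct demo_rpcrdma_err_hdr {
		uint32_t rm_xid;
		uint32_t rm_vers;
		uint32_t rm_credit;
		uint32_t rm_type;	/* == RDMA_ERROR */
		uint32_t rm_err;	/* ERR_VERS or ERR_CHUNK */
		/* present only when rm_err == ERR_VERS: */
		uint32_t rm_vers_low;	/* version range supported by peer */
		uint32_t rm_vers_high;
	};
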
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index d703f0e..05a1809 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -42,40 +42,43 @@
  */
 struct rpc_task {
 	atomic_t		tk_count;	/* Reference count */
+	int			tk_status;	/* result of last operation */
 	struct list_head	tk_task;	/* global list of tasks */
-	struct rpc_clnt *	tk_client;	/* RPC client */
-	struct rpc_rqst *	tk_rqstp;	/* RPC request */
-
-	/*
-	 * RPC call state
-	 */
-	struct rpc_message	tk_msg;		/* RPC call info */
 
 	/*
 	 * callback	to be executed after waking up
 	 * action	next procedure for async tasks
-	 * tk_ops	caller callbacks
 	 */
 	void			(*tk_callback)(struct rpc_task *);
 	void			(*tk_action)(struct rpc_task *);
-	const struct rpc_call_ops *tk_ops;
-	void *			tk_calldata;
 
 	unsigned long		tk_timeout;	/* timeout for rpc_sleep() */
 	unsigned long		tk_runstate;	/* Task run status */
-	struct workqueue_struct	*tk_workqueue;	/* Normally rpciod, but could
-						 * be any workqueue
-						 */
+
 	struct rpc_wait_queue 	*tk_waitqueue;	/* RPC wait queue we're on */
 	union {
 		struct work_struct	tk_work;	/* Async task work queue */
 		struct rpc_wait		tk_wait;	/* RPC wait */
 	} u;
 
+	/*
+	 * RPC call state
+	 */
+	struct rpc_message	tk_msg;		/* RPC call info */
+	void *			tk_calldata;	/* Caller private data */
+	const struct rpc_call_ops *tk_ops;	/* Caller callbacks */
+
+	struct rpc_clnt *	tk_client;	/* RPC client */
+	struct rpc_xprt *	tk_xprt;	/* Transport */
+
+	struct rpc_rqst *	tk_rqstp;	/* RPC request */
+
+	struct workqueue_struct	*tk_workqueue;	/* Normally rpciod, but could
+						 * be any workqueue
+						 */
 	ktime_t			tk_start;	/* RPC task init timestamp */
 
 	pid_t			tk_owner;	/* Process id for batching tasks */
-	int			tk_status;	/* result of last operation */
 	unsigned short		tk_flags;	/* misc flags */
 	unsigned short		tk_timeouts;	/* maj timeouts */
 
@@ -100,6 +103,7 @@
 struct rpc_task_setup {
 	struct rpc_task *task;
 	struct rpc_clnt *rpc_client;
+	struct rpc_xprt *rpc_xprt;
 	const struct rpc_message *rpc_message;
 	const struct rpc_call_ops *callback_ops;
 	void *callback_data;
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 69ef5b3..fb0d212 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -13,6 +13,7 @@
 #include <linux/socket.h>
 #include <linux/in.h>
 #include <linux/ktime.h>
+#include <linux/kref.h>
 #include <linux/sunrpc/sched.h>
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/msg_prot.h>
@@ -166,7 +167,7 @@
 };
 
 struct rpc_xprt {
-	atomic_t		count;		/* Reference count */
+	struct kref		kref;		/* Reference count */
 	struct rpc_xprt_ops *	ops;		/* transport methods */
 
 	const struct rpc_timeout *timeout;	/* timeout parms */
@@ -197,6 +198,11 @@
 	unsigned int		bind_index;	/* bind function index */
 
 	/*
+	 * Multipath
+	 */
+	struct list_head	xprt_switch;
+
+	/*
 	 * Connection of transports
 	 */
 	unsigned long		bind_timeout,
@@ -256,6 +262,7 @@
 	struct dentry		*debugfs;		/* debugfs directory */
 	atomic_t		inject_disconnect;
 #endif
+	struct rcu_head		rcu;
 };
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
@@ -318,24 +325,13 @@
 void			xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
 void			xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
 void			xprt_release(struct rpc_task *task);
+struct rpc_xprt *	xprt_get(struct rpc_xprt *xprt);
 void			xprt_put(struct rpc_xprt *xprt);
 struct rpc_xprt *	xprt_alloc(struct net *net, size_t size,
 				unsigned int num_prealloc,
 				unsigned int max_req);
 void			xprt_free(struct rpc_xprt *);
 
-/**
- * xprt_get - return a reference to an RPC transport.
- * @xprt: pointer to the transport
- *
- */
-static inline struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
-{
-	if (atomic_inc_not_zero(&xprt->count))
-		return xprt;
-	return NULL;
-}
-
 static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
 {
 	return p + xprt->tsh_size;
diff --git a/include/linux/sunrpc/xprtmultipath.h b/include/linux/sunrpc/xprtmultipath.h
new file mode 100644
index 0000000..5a9acff
--- /dev/null
+++ b/include/linux/sunrpc/xprtmultipath.h
@@ -0,0 +1,69 @@
+/*
+ * RPC client multipathing definitions
+ *
+ * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
+ *
+ * Trond Myklebust <trond.myklebust@primarydata.com>
+ */
+#ifndef _NET_SUNRPC_XPRTMULTIPATH_H
+#define _NET_SUNRPC_XPRTMULTIPATH_H
+
+struct rpc_xprt_iter_ops;
+struct rpc_xprt_switch {
+	spinlock_t		xps_lock;
+	struct kref		xps_kref;
+
+	unsigned int		xps_nxprts;
+	struct list_head	xps_xprt_list;
+
+	struct net *		xps_net;
+
+	const struct rpc_xprt_iter_ops *xps_iter_ops;
+
+	struct rcu_head		xps_rcu;
+};
+
+struct rpc_xprt_iter {
+	struct rpc_xprt_switch __rcu *xpi_xpswitch;
+	struct rpc_xprt *	xpi_cursor;
+
+	const struct rpc_xprt_iter_ops *xpi_ops;
+};
+
+
+struct rpc_xprt_iter_ops {
+	void (*xpi_rewind)(struct rpc_xprt_iter *);
+	struct rpc_xprt *(*xpi_xprt)(struct rpc_xprt_iter *);
+	struct rpc_xprt *(*xpi_next)(struct rpc_xprt_iter *);
+};
+
+extern struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
+		gfp_t gfp_flags);
+
+extern struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps);
+extern void xprt_switch_put(struct rpc_xprt_switch *xps);
+
+extern void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps);
+
+extern void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
+		struct rpc_xprt *xprt);
+extern void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
+		struct rpc_xprt *xprt);
+
+extern void xprt_iter_init(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt_switch *xps);
+
+extern void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt_switch *xps);
+
+extern void xprt_iter_destroy(struct rpc_xprt_iter *xpi);
+
+extern struct rpc_xprt_switch *xprt_iter_xchg_switch(
+		struct rpc_xprt_iter *xpi,
+		struct rpc_xprt_switch *newswitch);
+
+extern struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi);
+extern struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi);
+extern struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi);
+
+#endif
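
The iterator comes in three policies: singular (stick with the first
entry; the default set by xprt_switch_alloc()), round-robin, and listall
(visit each entry exactly once). A typical consumer, mirroring
rpc_clnt_iterate_for_each_xprt() in net/sunrpc/clnt.c below (sketch):

	struct rpc_xprt_iter xpi;
	struct rpc_xprt *xprt;

	xprt_iter_init_listall(&xpi, xps);	/* one pass over all entries */
	while ((xprt = xprt_iter_get_next(&xpi)) != NULL) {
		/* ... use xprt; xprt_iter_get_next() took a reference ... */
		xprt_put(xprt);
	}
	xprt_iter_destroy(&xpi);
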
diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index b7b279b..767190b 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -54,8 +54,6 @@
 
 #define RPCRDMA_DEF_INLINE  (1024)	/* default inline max */
 
-#define RPCRDMA_INLINE_PAD_THRESH  (512)/* payload threshold to pad (bytes) */
-
 /* Memory registration strategies, by number.
  * This is part of a kernel / user space API. Do not remove. */
 enum rpcrdma_memreg {
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index b512fbd..ea7ffa1 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -12,7 +12,8 @@
 	    svc.o svcsock.o svcauth.o svcauth_unix.o \
 	    addr.o rpcb_clnt.o timer.o xdr.o \
 	    sunrpc_syms.o cache.o rpc_pipe.o \
-	    svc_xprt.o
+	    svc_xprt.o \
+	    xprtmultipath.o
 sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o
 sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index cabf586..8c6bc79 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1181,12 +1181,12 @@
 gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
 	struct gss_auth *gss_auth;
-	struct rpc_xprt *xprt = rcu_access_pointer(clnt->cl_xprt);
+	struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
 
 	while (clnt != clnt->cl_parent) {
 		struct rpc_clnt *parent = clnt->cl_parent;
 		/* Find the original parent for this transport */
-		if (rcu_access_pointer(parent->cl_xprt) != xprt)
+		if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
 			break;
 		clnt = parent;
 	}
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b7f2104..7e0c9bf 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -354,6 +354,7 @@
 }
 
 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
+		struct rpc_xprt_switch *xps,
 		struct rpc_xprt *xprt,
 		struct rpc_clnt *parent)
 {
@@ -411,6 +412,8 @@
 	}
 
 	rpc_clnt_set_transport(clnt, xprt, timeout);
+	xprt_iter_init(&clnt->cl_xpi, xps);
+	xprt_switch_put(xps);
 
 	clnt->cl_rtt = &clnt->cl_rtt_default;
 	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
@@ -438,6 +441,7 @@
 out_err:
 	rpciod_down();
 out_no_rpciod:
+	xprt_switch_put(xps);
 	xprt_put(xprt);
 	return ERR_PTR(err);
 }
@@ -446,8 +450,13 @@
 					struct rpc_xprt *xprt)
 {
 	struct rpc_clnt *clnt = NULL;
+	struct rpc_xprt_switch *xps;
 
-	clnt = rpc_new_client(args, xprt, NULL);
+	xps = xprt_switch_alloc(xprt, GFP_KERNEL);
+	if (xps == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	clnt = rpc_new_client(args, xps, xprt, NULL);
 	if (IS_ERR(clnt))
 		return clnt;
 
@@ -564,6 +573,7 @@
 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 					   struct rpc_clnt *clnt)
 {
+	struct rpc_xprt_switch *xps;
 	struct rpc_xprt *xprt;
 	struct rpc_clnt *new;
 	int err;
@@ -571,13 +581,17 @@
 	err = -ENOMEM;
 	rcu_read_lock();
 	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 	rcu_read_unlock();
-	if (xprt == NULL)
+	if (xprt == NULL || xps == NULL) {
+		xprt_put(xprt);
+		xprt_switch_put(xps);
 		goto out_err;
+	}
 	args->servername = xprt->servername;
 	args->nodename = clnt->cl_nodename;
 
-	new = rpc_new_client(args, xprt, clnt);
+	new = rpc_new_client(args, xps, xprt, clnt);
 	if (IS_ERR(new)) {
 		err = PTR_ERR(new);
 		goto out_err;
@@ -657,6 +671,7 @@
 {
 	const struct rpc_timeout *old_timeo;
 	rpc_authflavor_t pseudoflavor;
+	struct rpc_xprt_switch *xps, *oldxps;
 	struct rpc_xprt *xprt, *old;
 	struct rpc_clnt *parent;
 	int err;
@@ -668,10 +683,17 @@
 		return PTR_ERR(xprt);
 	}
 
+	xps = xprt_switch_alloc(xprt, GFP_KERNEL);
+	if (xps == NULL) {
+		xprt_put(xprt);
+		return -ENOMEM;
+	}
+
 	pseudoflavor = clnt->cl_auth->au_flavor;
 
 	old_timeo = clnt->cl_timeout;
 	old = rpc_clnt_set_transport(clnt, xprt, timeout);
+	oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
 
 	rpc_unregister_client(clnt);
 	__rpc_clnt_remove_pipedir(clnt);
@@ -697,20 +719,74 @@
 	synchronize_rcu();
 	if (parent != clnt)
 		rpc_release_client(parent);
+	xprt_switch_put(oldxps);
 	xprt_put(old);
 	dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
 	return 0;
 
 out_revert:
+	xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
 	rpc_clnt_set_transport(clnt, old, old_timeo);
 	clnt->cl_parent = parent;
 	rpc_client_register(clnt, pseudoflavor, NULL);
+	xprt_switch_put(xps);
 	xprt_put(xprt);
 	dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
 	return err;
 }
 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
 
+static
+int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
+{
+	struct rpc_xprt_switch *xps;
+
+	rcu_read_lock();
+	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+	rcu_read_unlock();
+	if (xps == NULL)
+		return -EAGAIN;
+	xprt_iter_init_listall(xpi, xps);
+	xprt_switch_put(xps);
+	return 0;
+}
+
+/**
+ * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
+ * @clnt: pointer to client
+ * @fn: function to apply
+ * @data: void pointer to function data
+ *
+ * Iterates through the list of RPC transports currently attached to the
+ * client and applies the function fn(clnt, xprt, data).
+ *
+ * On error, the iteration stops, and the function returns the error value.
+ */
+int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
+		int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
+		void *data)
+{
+	struct rpc_xprt_iter xpi;
+	int ret;
+
+	ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
+	if (ret)
+		return ret;
+	for (;;) {
+		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
+
+		if (!xprt)
+			break;
+		ret = fn(clnt, xprt, data);
+		xprt_put(xprt);
+		if (ret < 0)
+			break;
+	}
+	xprt_iter_destroy(&xpi);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
+
 /*
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
@@ -783,6 +859,7 @@
 	rpc_free_iostats(clnt->cl_metrics);
 	clnt->cl_metrics = NULL;
 	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
+	xprt_iter_destroy(&clnt->cl_xpi);
 	rpciod_down();
 	rpc_free_clid(clnt);
 	kfree(clnt);
@@ -868,6 +945,7 @@
 void rpc_task_release_client(struct rpc_task *task)
 {
 	struct rpc_clnt *clnt = task->tk_client;
+	struct rpc_xprt *xprt = task->tk_xprt;
 
 	if (clnt != NULL) {
 		/* Remove from client task list */
@@ -878,13 +956,22 @@
 
 		rpc_release_client(clnt);
 	}
+
+	if (xprt != NULL) {
+		task->tk_xprt = NULL;
+
+		xprt_put(xprt);
+	}
 }
 
 static
 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 {
+
 	if (clnt != NULL) {
 		rpc_task_release_client(task);
+		if (task->tk_xprt == NULL)
+			task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
 		task->tk_client = clnt;
 		atomic_inc(&clnt->cl_count);
 		if (clnt->cl_softrtry)
@@ -900,14 +987,6 @@
 	}
 }
 
-void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
-{
-	rpc_task_release_client(task);
-	rpc_task_set_client(task, clnt);
-}
-EXPORT_SYMBOL_GPL(rpc_task_reset_client);
-
-
 static void
 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
 {
@@ -2104,11 +2183,9 @@
 	}
 	if (RPC_IS_SOFT(task)) {
 		if (clnt->cl_chatty) {
-			rcu_read_lock();
 			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
 				clnt->cl_program->name,
-				rcu_dereference(clnt->cl_xprt)->servername);
-			rcu_read_unlock();
+				task->tk_xprt->servername);
 		}
 		if (task->tk_flags & RPC_TASK_TIMEOUT)
 			rpc_exit(task, -ETIMEDOUT);
@@ -2120,11 +2197,9 @@
 	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
 		task->tk_flags |= RPC_CALL_MAJORSEEN;
 		if (clnt->cl_chatty) {
-			rcu_read_lock();
 			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
 			clnt->cl_program->name,
-			rcu_dereference(clnt->cl_xprt)->servername);
-			rcu_read_unlock();
+			task->tk_xprt->servername);
 		}
 	}
 	rpc_force_rebind(clnt);
@@ -2154,11 +2229,9 @@
 
 	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
 		if (clnt->cl_chatty) {
-			rcu_read_lock();
 			printk(KERN_NOTICE "%s: server %s OK\n",
 				clnt->cl_program->name,
-				rcu_dereference(clnt->cl_xprt)->servername);
-			rcu_read_unlock();
+				task->tk_xprt->servername);
 		}
 		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
 	}
@@ -2312,11 +2385,9 @@
 			task->tk_action = call_bind;
 			goto out_retry;
 		case RPC_AUTH_TOOWEAK:
-			rcu_read_lock();
 			printk(KERN_NOTICE "RPC: server %s requires stronger "
 			       "authentication.\n",
-			       rcu_dereference(clnt->cl_xprt)->servername);
-			rcu_read_unlock();
+			       task->tk_xprt->servername);
 			break;
 		default:
 			dprintk("RPC: %5u %s: unknown auth error: %x\n",
@@ -2341,27 +2412,27 @@
 	case RPC_SUCCESS:
 		return p;
 	case RPC_PROG_UNAVAIL:
-		dprintk_rcu("RPC: %5u %s: program %u is unsupported "
+		dprintk("RPC: %5u %s: program %u is unsupported "
 				"by server %s\n", task->tk_pid, __func__,
 				(unsigned int)clnt->cl_prog,
-				rcu_dereference(clnt->cl_xprt)->servername);
+				task->tk_xprt->servername);
 		error = -EPFNOSUPPORT;
 		goto out_err;
 	case RPC_PROG_MISMATCH:
-		dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
+		dprintk("RPC: %5u %s: program %u, version %u unsupported "
 				"by server %s\n", task->tk_pid, __func__,
 				(unsigned int)clnt->cl_prog,
 				(unsigned int)clnt->cl_vers,
-				rcu_dereference(clnt->cl_xprt)->servername);
+				task->tk_xprt->servername);
 		error = -EPROTONOSUPPORT;
 		goto out_err;
 	case RPC_PROC_UNAVAIL:
-		dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
+		dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
 				"version %u on server %s\n",
 				task->tk_pid, __func__,
 				rpc_proc_name(task),
 				clnt->cl_prog, clnt->cl_vers,
-				rcu_dereference(clnt->cl_xprt)->servername);
+				task->tk_xprt->servername);
 		error = -EOPNOTSUPP;
 		goto out_err;
 	case RPC_GARBAGE_ARGS:
@@ -2421,7 +2492,10 @@
 	return err;
 }
 
-struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
+static
+struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
+		struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
+		const struct rpc_call_ops *ops, void *data)
 {
 	struct rpc_message msg = {
 		.rpc_proc = &rpcproc_null,
@@ -2429,14 +2503,143 @@
 	};
 	struct rpc_task_setup task_setup_data = {
 		.rpc_client = clnt,
+		.rpc_xprt = xprt,
 		.rpc_message = &msg,
-		.callback_ops = &rpc_default_ops,
+		.callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
+		.callback_data = data,
 		.flags = flags,
 	};
+
 	return rpc_run_task(&task_setup_data);
 }
+
+struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
+{
+	return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_call_null);
 
+struct rpc_cb_add_xprt_calldata {
+	struct rpc_xprt_switch *xps;
+	struct rpc_xprt *xprt;
+};
+
+static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
+{
+	struct rpc_cb_add_xprt_calldata *data = calldata;
+
+	if (task->tk_status == 0)
+		rpc_xprt_switch_add_xprt(data->xps, data->xprt);
+}
+
+static void rpc_cb_add_xprt_release(void *calldata)
+{
+	struct rpc_cb_add_xprt_calldata *data = calldata;
+
+	xprt_put(data->xprt);
+	xprt_switch_put(data->xps);
+	kfree(data);
+}
+
+static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
+	.rpc_call_done = rpc_cb_add_xprt_done,
+	.rpc_release = rpc_cb_add_xprt_release,
+};
+
+/**
+ * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
+ * @clnt: pointer to struct rpc_clnt
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ * @dummy: unused
+ */
+int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
+		struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
+		void *dummy)
+{
+	struct rpc_cb_add_xprt_calldata *data;
+	struct rpc_cred *cred;
+	struct rpc_task *task;
+
+	data = kmalloc(sizeof(*data), GFP_NOFS);
+	if (!data)
+		return -ENOMEM;
+	data->xps = xprt_switch_get(xps);
+	data->xprt = xprt_get(xprt);
+
+	cred = authnull_ops.lookup_cred(NULL, NULL, 0);
+	task = rpc_call_null_helper(clnt, xprt, cred,
+			RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
+			&rpc_cb_add_xprt_call_ops, data);
+	put_rpccred(cred);
+	if (IS_ERR(task)) {
+		rpc_cb_add_xprt_release(data);
+		return PTR_ERR(task);
+	}
+	rpc_put_task(task);
+	return 1;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
+
+/**
+ * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
+ * @clnt: pointer to struct rpc_clnt
+ * @xprtargs: pointer to struct xprt_create
+ * @setup: callback to test and/or set up the connection
+ * @data: pointer to setup function data
+ *
+ * Creates a new transport using the parameters set in xprtargs and
+ * adds it to clnt.
+ * If a setup callback is supplied, it is invoked to test and/or
+ * configure the new transport before it is added.
+ *
+ */
+int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
+		struct xprt_create *xprtargs,
+		int (*setup)(struct rpc_clnt *,
+			struct rpc_xprt_switch *,
+			struct rpc_xprt *,
+			void *),
+		void *data)
+{
+	struct rpc_xprt_switch *xps;
+	struct rpc_xprt *xprt;
+	unsigned char resvport;
+	int ret = 0;
+
+	rcu_read_lock();
+	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+	xprt = xprt_iter_xprt(&clnt->cl_xpi);
+	if (xps == NULL || xprt == NULL) {
+		rcu_read_unlock();
+		xprt_switch_put(xps);
+		return -EAGAIN;
+	}
+	resvport = xprt->resvport;
+	rcu_read_unlock();
+
+	xprt = xprt_create_transport(xprtargs);
+	if (IS_ERR(xprt)) {
+		ret = PTR_ERR(xprt);
+		goto out_put_switch;
+	}
+	xprt->resvport = resvport;
+
+	rpc_xprt_switch_set_roundrobin(xps);
+	if (setup) {
+		ret = setup(clnt, xps, xprt, data);
+		if (ret != 0)
+			goto out_put_xprt;
+	}
+	rpc_xprt_switch_add_xprt(xps, xprt);
+out_put_xprt:
+	xprt_put(xprt);
+out_put_switch:
+	xprt_switch_put(xps);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
+
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 static void rpc_show_header(void)
 {
@@ -2483,57 +2683,39 @@
 #endif
 
 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+static int
+rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
+		struct rpc_xprt *xprt,
+		void *dummy)
+{
+	return xprt_enable_swap(xprt);
+}
+
 int
 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
 {
-	int ret = 0;
-	struct rpc_xprt	*xprt;
-
-	if (atomic_inc_return(&clnt->cl_swapper) == 1) {
-retry:
-		rcu_read_lock();
-		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
-		rcu_read_unlock();
-		if (!xprt) {
-			/*
-			 * If we didn't get a reference, then we likely are
-			 * racing with a migration event. Wait for a grace
-			 * period and try again.
-			 */
-			synchronize_rcu();
-			goto retry;
-		}
-
-		ret = xprt_enable_swap(xprt);
-		xprt_put(xprt);
-	}
-	return ret;
+	if (atomic_inc_return(&clnt->cl_swapper) == 1)
+		return rpc_clnt_iterate_for_each_xprt(clnt,
+				rpc_clnt_swap_activate_callback, NULL);
+	return 0;
 }
 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
 
+static int
+rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
+		struct rpc_xprt *xprt,
+		void *dummy)
+{
+	xprt_disable_swap(xprt);
+	return 0;
+}
+
 void
 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
 {
-	struct rpc_xprt	*xprt;
-
-	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) {
-retry:
-		rcu_read_lock();
-		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
-		rcu_read_unlock();
-		if (!xprt) {
-			/*
-			 * If we didn't get a reference, then we likely are
-			 * racing with a migration event. Wait for a grace
-			 * period and try again.
-			 */
-			synchronize_rcu();
-			goto retry;
-		}
-
-		xprt_disable_swap(xprt);
-		xprt_put(xprt);
-	}
+	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
+		rpc_clnt_iterate_for_each_xprt(clnt,
+				rpc_clnt_swap_deactivate_callback, NULL);
 }
 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
 #endif /* CONFIG_SUNRPC_SWAP */
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index cf5770d..5b30603 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -648,10 +648,10 @@
 static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
 {
 	struct rpc_clnt *parent = clnt->cl_parent;
-	struct rpc_xprt *xprt = rcu_dereference(clnt->cl_xprt);
+	struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
 
 	while (parent != clnt) {
-		if (rcu_dereference(parent->cl_xprt) != xprt)
+		if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
 			break;
 		if (clnt->cl_autobind)
 			break;
@@ -683,11 +683,9 @@
 	int status;
 
 	rcu_read_lock();
-	do {
-		clnt = rpcb_find_transport_owner(task->tk_client);
-		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
-	} while (xprt == NULL);
+	clnt = rpcb_find_transport_owner(task->tk_client);
 	rcu_read_unlock();
+	xprt = xprt_get(task->tk_xprt);
 
 	dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
 		task->tk_pid, __func__,
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 73ad57a..fcfd48d 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -909,6 +909,8 @@
 	/* Initialize workqueue for async tasks */
 	task->tk_workqueue = task_setup_data->workqueue;
 
+	task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);
+
 	if (task->tk_ops->rpc_call_prepare != NULL)
 		task->tk_action = rpc_prepare_task;
 
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 37edea6..216a138 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -48,6 +48,7 @@
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/metrics.h>
 #include <linux/sunrpc/bc_xprt.h>
+#include <linux/rcupdate.h>
 
 #include <trace/events/sunrpc.h>
 
@@ -1166,7 +1167,7 @@
 {
 	put_net(xprt->xprt_net);
 	xprt_free_all_slots(xprt);
-	kfree(xprt);
+	kfree_rcu(xprt, rcu);
 }
 EXPORT_SYMBOL_GPL(xprt_free);
 
@@ -1180,7 +1181,7 @@
  */
 void xprt_reserve(struct rpc_task *task)
 {
-	struct rpc_xprt	*xprt;
+	struct rpc_xprt *xprt = task->tk_xprt;
 
 	task->tk_status = 0;
 	if (task->tk_rqstp != NULL)
@@ -1188,11 +1189,8 @@
 
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	rcu_read_lock();
-	xprt = rcu_dereference(task->tk_client->cl_xprt);
 	if (!xprt_throttle_congested(xprt, task))
 		xprt->ops->alloc_slot(xprt, task);
-	rcu_read_unlock();
 }
 
 /**
@@ -1206,7 +1204,7 @@
  */
 void xprt_retry_reserve(struct rpc_task *task)
 {
-	struct rpc_xprt	*xprt;
+	struct rpc_xprt *xprt = task->tk_xprt;
 
 	task->tk_status = 0;
 	if (task->tk_rqstp != NULL)
@@ -1214,10 +1212,7 @@
 
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	rcu_read_lock();
-	xprt = rcu_dereference(task->tk_client->cl_xprt);
 	xprt->ops->alloc_slot(xprt, task);
-	rcu_read_unlock();
 }
 
 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -1264,11 +1259,9 @@
 
 	if (req == NULL) {
 		if (task->tk_client) {
-			rcu_read_lock();
-			xprt = rcu_dereference(task->tk_client->cl_xprt);
+			xprt = task->tk_xprt;
 			if (xprt->snd_task == task)
 				xprt_release_write(xprt, task);
-			rcu_read_unlock();
 		}
 		return;
 	}
@@ -1307,7 +1300,7 @@
 
 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 {
-	atomic_set(&xprt->count, 1);
+	kref_init(&xprt->kref);
 
 	spin_lock_init(&xprt->transport_lock);
 	spin_lock_init(&xprt->reserve_lock);
@@ -1318,6 +1311,7 @@
 	spin_lock_init(&xprt->bc_pa_lock);
 	INIT_LIST_HEAD(&xprt->bc_pa_list);
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
+	INIT_LIST_HEAD(&xprt->xprt_switch);
 
 	xprt->last_used = jiffies;
 	xprt->cwnd = RPC_INITCWND;
@@ -1415,6 +1409,24 @@
 	xprt->ops->destroy(xprt);
 }
 
+static void xprt_destroy_kref(struct kref *kref)
+{
+	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
+}
+
+/**
+ * xprt_get - return a reference to an RPC transport.
+ * @xprt: pointer to the transport
+ *
+ */
+struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
+{
+	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
+		return xprt;
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(xprt_get);
+
 /**
  * xprt_put - release a reference to an RPC transport.
  * @xprt: pointer to the transport
@@ -1422,7 +1434,7 @@
  */
 void xprt_put(struct rpc_xprt *xprt)
 {
-	if (atomic_dec_and_test(&xprt->count))
-		xprt_destroy(xprt);
+	if (xprt != NULL)
+		kref_put(&xprt->kref, xprt_destroy_kref);
 }
 EXPORT_SYMBOL_GPL(xprt_put);
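
With the switch to a kref and kfree_rcu(), the lookup-then-pin idiom
becomes safe against concurrent teardown: a reader that finds a transport
under rcu_read_lock() either obtains a counted reference via xprt_get()
(kref_get_unless_zero) or gets NULL, and the memory cannot be recycled
until a grace period after the final xprt_put(). The resulting pattern,
as used by __rpc_clone_client() above (sketch):

	rcu_read_lock();
	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
	rcu_read_unlock();
	if (xprt != NULL) {
		/* ... xprt is pinned and safe to use ... */
		xprt_put(xprt);
	}
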
diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c
new file mode 100644
index 0000000..e7fd769
--- /dev/null
+++ b/net/sunrpc/xprtmultipath.c
@@ -0,0 +1,475 @@
+/*
+ * Multipath support for RPC
+ *
+ * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
+ *
+ * Trond Myklebust <trond.myklebust@primarydata.com>
+ *
+ */
+#include <linux/types.h>
+#include <linux/kref.h>
+#include <linux/list.h>
+#include <linux/rcupdate.h>
+#include <linux/rculist.h>
+#include <linux/slab.h>
+#include <asm/cmpxchg.h>
+#include <linux/spinlock.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/xprtmultipath.h>
+
+typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
+		const struct rpc_xprt *cur);
+
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
+
+static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
+		struct rpc_xprt *xprt)
+{
+	if (unlikely(xprt_get(xprt) == NULL))
+		return;
+	list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
+	smp_wmb();
+	if (xps->xps_nxprts == 0)
+		xps->xps_net = xprt->xprt_net;
+	xps->xps_nxprts++;
+}
+
+/**
+ * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ *
+ * Adds xprt to the end of the list of struct rpc_xprt in xps.
+ */
+void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
+		struct rpc_xprt *xprt)
+{
+	if (xprt == NULL)
+		return;
+	spin_lock(&xps->xps_lock);
+	if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
+		xprt_switch_add_xprt_locked(xps, xprt);
+	spin_unlock(&xps->xps_lock);
+}
+
+static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
+		struct rpc_xprt *xprt)
+{
+	if (unlikely(xprt == NULL))
+		return;
+	xps->xps_nxprts--;
+	if (xps->xps_nxprts == 0)
+		xps->xps_net = NULL;
+	smp_wmb();
+	list_del_rcu(&xprt->xprt_switch);
+}
+
+/**
+ * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ *
+ * Removes xprt from the list of struct rpc_xprt in xps.
+ */
+void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
+		struct rpc_xprt *xprt)
+{
+	spin_lock(&xps->xps_lock);
+	xprt_switch_remove_xprt_locked(xps, xprt);
+	spin_unlock(&xps->xps_lock);
+	xprt_put(xprt);
+}
+
+/**
+ * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ * @gfp_flags: allocation flags
+ *
+ * On success, returns an initialised struct rpc_xprt_switch, containing
+ * the entry xprt. Returns NULL on failure.
+ */
+struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
+		gfp_t gfp_flags)
+{
+	struct rpc_xprt_switch *xps;
+
+	xps = kmalloc(sizeof(*xps), gfp_flags);
+	if (xps != NULL) {
+		spin_lock_init(&xps->xps_lock);
+		kref_init(&xps->xps_kref);
+		xps->xps_nxprts = 0;
+		INIT_LIST_HEAD(&xps->xps_xprt_list);
+		xps->xps_iter_ops = &rpc_xprt_iter_singular;
+		xprt_switch_add_xprt_locked(xps, xprt);
+	}
+
+	return xps;
+}
+
+static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
+{
+	spin_lock(&xps->xps_lock);
+	while (!list_empty(&xps->xps_xprt_list)) {
+		struct rpc_xprt *xprt;
+
+		xprt = list_first_entry(&xps->xps_xprt_list,
+				struct rpc_xprt, xprt_switch);
+		xprt_switch_remove_xprt_locked(xps, xprt);
+		spin_unlock(&xps->xps_lock);
+		xprt_put(xprt);
+		spin_lock(&xps->xps_lock);
+	}
+	spin_unlock(&xps->xps_lock);
+}
+
+static void xprt_switch_free(struct kref *kref)
+{
+	struct rpc_xprt_switch *xps = container_of(kref,
+			struct rpc_xprt_switch, xps_kref);
+
+	xprt_switch_free_entries(xps);
+	kfree_rcu(xps, xps_rcu);
+}
+
+/**
+ * xprt_switch_get - Return a reference to a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ *
+ * Returns a reference to xps unless the refcount is already zero.
+ */
+struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
+{
+	if (xps != NULL && kref_get_unless_zero(&xps->xps_kref))
+		return xps;
+	return NULL;
+}
+
+/**
+ * xprt_switch_put - Release a reference to a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ *
+ * Release the reference to xps, and free it once the refcount is zero.
+ */
+void xprt_switch_put(struct rpc_xprt_switch *xps)
+{
+	if (xps != NULL)
+		kref_put(&xps->xps_kref, xprt_switch_free);
+}
+
+/**
+ * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ *
+ * Sets a round-robin default policy for iterators acting on xps.
+ */
+void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
+{
+	if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin)
+		WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
+}
+
+static
+const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
+{
+	if (xpi->xpi_ops != NULL)
+		return xpi->xpi_ops;
+	return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
+}
+
+static
+void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
+{
+}
+
+static
+void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
+{
+	WRITE_ONCE(xpi->xpi_cursor, NULL);
+}
+
+static
+struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
+{
+	return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch);
+}
+
+static
+struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
+{
+	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+
+	if (xps == NULL)
+		return NULL;
+	return xprt_switch_find_first_entry(&xps->xps_xprt_list);
+}
+
+static
+struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
+		const struct rpc_xprt *cur)
+{
+	struct rpc_xprt *pos;
+
+	list_for_each_entry_rcu(pos, head, xprt_switch) {
+		if (cur == pos)
+			return pos;
+	}
+	return NULL;
+}
+
+static
+struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
+{
+	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+	struct list_head *head;
+
+	if (xps == NULL)
+		return NULL;
+	head = &xps->xps_xprt_list;
+	if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2)
+		return xprt_switch_find_first_entry(head);
+	return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
+}
+
+static
+struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
+		const struct rpc_xprt *cur)
+{
+	struct rpc_xprt *pos, *prev = NULL;
+
+	list_for_each_entry_rcu(pos, head, xprt_switch) {
+		if (cur == prev)
+			return pos;
+		prev = pos;
+	}
+	return NULL;
+}
+
+static
+struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head,
+		struct rpc_xprt **cursor,
+		xprt_switch_find_xprt_t find_next)
+{
+	struct rpc_xprt *cur, *pos, *old;
+
+	cur = READ_ONCE(*cursor);
+	for (;;) {
+		old = cur;
+		pos = find_next(head, old);
+		if (pos == NULL)
+			break;
+		cur = cmpxchg_relaxed(cursor, old, pos);
+		if (cur == old)
+			break;
+	}
+	return pos;
+}
+
+static
+struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
+		xprt_switch_find_xprt_t find_next)
+{
+	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+	struct list_head *head;
+
+	if (xps == NULL)
+		return NULL;
+	head = &xps->xps_xprt_list;
+	if (xps->xps_nxprts < 2)
+		return xprt_switch_find_first_entry(head);
+	return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
+}
+
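+/* Round-robin policy: wrap back to the head of the list once the
+ * final entry has been visited.
+ */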
+static
+struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
+		const struct rpc_xprt *cur)
+{
+	struct rpc_xprt *ret;
+
+	ret = xprt_switch_find_next_entry(head, cur);
+	if (ret != NULL)
+		return ret;
+	return xprt_switch_find_first_entry(head);
+}
+
+static
+struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
+{
+	return xprt_iter_next_entry_multiple(xpi,
+			xprt_switch_find_next_entry_roundrobin);
+}
+
+static
+struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
+{
+	return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry);
+}
+
+/**
+ * xprt_iter_rewind - Resets the xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Resets xpi to ensure that it points to the first entry in the list
+ * of transports.
+ */
+static
+void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
+{
+	rcu_read_lock();
+	xprt_iter_ops(xpi)->xpi_rewind(xpi);
+	rcu_read_unlock();
+}
+
+static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt_switch *xps,
+		const struct rpc_xprt_iter_ops *ops)
+{
+	rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
+	xpi->xpi_cursor = NULL;
+	xpi->xpi_ops = ops;
+}
+
+/**
+ * xprt_iter_init - Initialise an xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ * @xps: pointer to rpc_xprt_switch
+ *
+ * Initialises the iterator to use the default iterator ops
+ * as set in xps. This function is mainly intended for internal
+ * use in the rpc_client.
+ */
+void xprt_iter_init(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt_switch *xps)
+{
+	__xprt_iter_init(xpi, xps, NULL);
+}
+
+/**
+ * xprt_iter_init_listall - Initialise an xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ * @xps: pointer to rpc_xprt_switch
+ *
+ * Initialises the iterator to iterate once through the entire list
+ * of entries in xps.
+ */
+void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt_switch *xps)
+{
+	__xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
+}
+
+/**
+ * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
+ * @xpi: pointer to rpc_xprt_iter
+ * @newswitch: pointer to a new rpc_xprt_switch or NULL
+ *
+ * Swaps out the existing xpi->xpi_xpswitch with a new value.
+ */
+struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt_switch *newswitch)
+{
+	struct rpc_xprt_switch __rcu *oldswitch;
+
+	/* Atomically swap out the old xpswitch */
+	oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
+	if (newswitch != NULL)
+		xprt_iter_rewind(xpi);
+	return rcu_dereference_protected(oldswitch, true);
+}
+
+/**
+ * xprt_iter_destroy - Destroys the xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ */
+void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
+{
+	xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
+}
+
+/**
+ * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a pointer to the struct rpc_xprt that is currently
+ * pointed to by the cursor.
+ * Caller must be holding rcu_read_lock().
+ */
+struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
+{
+	WARN_ON_ONCE(!rcu_read_lock_held());
+	return xprt_iter_ops(xpi)->xpi_xprt(xpi);
+}
+
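+/* xprt_get() returns NULL once a transport's refcount has fallen to
+ * zero, so loop and let fn() produce the next candidate until either
+ * a reference is obtained or the list is exhausted.
+ */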
+static
+struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
+{
+	struct rpc_xprt *ret;
+
+	do {
+		ret = fn(xpi);
+		if (ret == NULL)
+			break;
+		ret = xprt_get(ret);
+	} while (ret == NULL);
+	return ret;
+}
+
+/**
+ * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a reference to the struct rpc_xprt that is currently
+ * pointed to by the cursor.
+ */
+struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
+{
+	struct rpc_xprt *xprt;
+
+	rcu_read_lock();
+	xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
+	rcu_read_unlock();
+	return xprt;
+}
+
+/**
+ * xprt_iter_get_next - Returns the next rpc_xprt following the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a reference to the struct rpc_xprt that immediately follows the
+ * entry pointed to by the cursor.
+ */
+struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
+{
+	struct rpc_xprt *xprt;
+
+	rcu_read_lock();
+	xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
+	rcu_read_unlock();
+	return xprt;
+}
+
+/* Policy for always returning the first entry in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
+	.xpi_rewind = xprt_iter_no_rewind,
+	.xpi_xprt = xprt_iter_first_entry,
+	.xpi_next = xprt_iter_first_entry,
+};
+
+/* Policy for round-robin iteration of entries in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
+	.xpi_rewind = xprt_iter_default_rewind,
+	.xpi_xprt = xprt_iter_current_entry,
+	.xpi_next = xprt_iter_next_entry_roundrobin,
+};
+
+/* Policy for once-through iteration of entries in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
+	.xpi_rewind = xprt_iter_default_rewind,
+	.xpi_xprt = xprt_iter_current_entry,
+	.xpi_next = xprt_iter_next_entry_all,
+};
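
Taken together, these entry points give callers a simple pattern for
walking a transport switch.  A minimal sketch (demo_walk_transports is
a hypothetical caller, not part of this series) that visits every
transport once, taking a reference to each entry in turn:

    static void demo_walk_transports(struct rpc_xprt_switch *xps)
    {
            struct rpc_xprt_iter xpi;
            struct rpc_xprt *xprt;

            xprt_iter_init_listall(&xpi, xps);
            while ((xprt = xprt_iter_get_next(&xpi)) != NULL) {
                    /* inspect or use xprt here */
                    xprt_put(xprt);
            }
            xprt_iter_destroy(&xpi);
    }
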
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index c14f3a4..b289e10 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -80,13 +80,13 @@
 		if (!r)
 			goto out;
 
-		r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
-					     sizeof(u64), GFP_KERNEL);
-		if (!r->r.fmr.physaddrs)
+		r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+					   sizeof(u64), GFP_KERNEL);
+		if (!r->fmr.physaddrs)
 			goto out_free;
 
-		r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
-		if (IS_ERR(r->r.fmr.fmr))
+		r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+		if (IS_ERR(r->fmr.fmr))
 			goto out_fmr_err;
 
 		list_add(&r->mw_list, &buf->rb_mws);
@@ -95,9 +95,9 @@
 	return 0;
 
 out_fmr_err:
-	rc = PTR_ERR(r->r.fmr.fmr);
+	rc = PTR_ERR(r->fmr.fmr);
 	dprintk("RPC:       %s: ib_alloc_fmr status %i\n", __func__, rc);
-	kfree(r->r.fmr.physaddrs);
+	kfree(r->fmr.physaddrs);
 out_free:
 	kfree(r);
 out:
@@ -109,7 +109,7 @@
 {
 	LIST_HEAD(l);
 
-	list_add(&r->r.fmr.fmr->list, &l);
+	list_add(&r->fmr.fmr->list, &l);
 	return ib_unmap_fmr(&l);
 }
 
@@ -148,7 +148,7 @@
 		nsegs = RPCRDMA_MAX_FMR_SGES;
 	for (i = 0; i < nsegs;) {
 		rpcrdma_map_one(device, seg, direction);
-		mw->r.fmr.physaddrs[i] = seg->mr_dma;
+		mw->fmr.physaddrs[i] = seg->mr_dma;
 		len += seg->mr_len;
 		++seg;
 		++i;
@@ -158,13 +158,13 @@
 			break;
 	}
 
-	rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+	rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
 			     i, seg1->mr_dma);
 	if (rc)
 		goto out_maperr;
 
 	seg1->rl_mw = mw;
-	seg1->mr_rkey = mw->r.fmr.fmr->rkey;
+	seg1->mr_rkey = mw->fmr.fmr->rkey;
 	seg1->mr_base = seg1->mr_dma + pageoff;
 	seg1->mr_nsegs = i;
 	seg1->mr_len = len;
@@ -219,7 +219,7 @@
 		seg = &req->rl_segments[i];
 		mw = seg->rl_mw;
 
-		list_add(&mw->r.fmr.fmr->list, &unmap_list);
+		list_add(&mw->fmr.fmr->list, &unmap_list);
 
 		i += seg->mr_nsegs;
 	}
@@ -281,9 +281,9 @@
 	while (!list_empty(&buf->rb_all)) {
 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
 		list_del(&r->mw_all);
-		kfree(r->r.fmr.physaddrs);
+		kfree(r->fmr.physaddrs);
 
-		rc = ib_dealloc_fmr(r->r.fmr.fmr);
+		rc = ib_dealloc_fmr(r->fmr.fmr);
 		if (rc)
 			dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
 				__func__, rc);
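
The mechanical s/r.fmr./fmr./ substitutions above (and the
s/r.frmr./frmr./ ones in frwr_ops.c below) fall out of making the
union inside struct rpcrdma_mw anonymous; the xprt_rdma.h hunk at the
end of this series shows the declaration change.  A toy illustration
of the feature (supported by C11 and long-standing GCC extensions):

    struct demo_mw {
            union {                 /* unnamed, so its members are  */
                    int fmr;        /* promoted into demo_mw's own  */
                    long frmr;      /* namespace                    */
            };
    };
    /* a struct demo_mw *m is now accessed as m->fmr, not m->r.fmr */
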
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index e165673..c250924 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -109,20 +109,20 @@
 __frwr_recovery_worker(struct work_struct *work)
 {
 	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-					    r.frmr.fr_work);
-	struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+					    frmr.fr_work);
+	struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
 	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
 	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
 
-	if (ib_dereg_mr(r->r.frmr.fr_mr))
+	if (ib_dereg_mr(r->frmr.fr_mr))
 		goto out_fail;
 
-	r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
-	if (IS_ERR(r->r.frmr.fr_mr))
+	r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
+	if (IS_ERR(r->frmr.fr_mr))
 		goto out_fail;
 
 	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
-	r->r.frmr.fr_state = FRMR_IS_INVALID;
+	r->frmr.fr_state = FRMR_IS_INVALID;
 	rpcrdma_put_mw(r_xprt, r);
 	return;
 
@@ -137,15 +137,15 @@
 static void
 __frwr_queue_recovery(struct rpcrdma_mw *r)
 {
-	INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
-	queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+	INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
+	queue_work(frwr_recovery_wq, &r->frmr.fr_work);
 }
 
 static int
 __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
 	    unsigned int depth)
 {
-	struct rpcrdma_frmr *f = &r->r.frmr;
+	struct rpcrdma_frmr *f = &r->frmr;
 	int rc;
 
 	f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
@@ -158,6 +158,8 @@
 
 	sg_init_table(f->sg, depth);
 
+	init_completion(&f->fr_linv_done);
+
 	return 0;
 
 out_mr_err:
@@ -179,11 +181,11 @@
 {
 	int rc;
 
-	rc = ib_dereg_mr(r->r.frmr.fr_mr);
+	rc = ib_dereg_mr(r->frmr.fr_mr);
 	if (rc)
 		dprintk("RPC:       %s: ib_dereg_mr status %i\n",
 			__func__, rc);
-	kfree(r->r.frmr.sg);
+	kfree(r->frmr.sg);
 }
 
 static int
@@ -244,39 +246,76 @@
 		     rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
 }
 
-/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs
- * to be reset.
- *
- * WARNING: Only wr_id and status are reliable at this point
- */
 static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
+__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
+			    const char *wr)
 {
-	if (likely(wc->status == IB_WC_SUCCESS))
-		return;
-
-	/* WARNING: Only wr_id and status are reliable at this point */
-	r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-	if (wc->status == IB_WC_WR_FLUSH_ERR)
-		dprintk("RPC:       %s: frmr %p flushed\n", __func__, r);
-	else
-		pr_warn("RPC:       %s: frmr %p error, status %s (%d)\n",
-			__func__, r, ib_wc_status_msg(wc->status), wc->status);
-
-	r->r.frmr.fr_state = FRMR_IS_STALE;
+	frmr->fr_state = FRMR_IS_STALE;
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
+		       wr, ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
 }
 
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ */
 static void
-frwr_sendcompletion(struct ib_wc *wc)
+frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-	struct rpcrdma_frmr *f = &r->r.frmr;
+	struct rpcrdma_frmr *frmr;
+	struct ib_cqe *cqe;
 
-	if (unlikely(wc->status != IB_WC_SUCCESS))
-		__frwr_sendcompletion_flush(wc, r);
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	if (wc->status != IB_WC_SUCCESS) {
+		cqe = wc->wr_cqe;
+		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+		__frwr_sendcompletion_flush(wc, frmr, "fastreg");
+	}
+}
 
-	if (f->fr_waiter)
-		complete(&f->fr_linv_done);
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ */
+static void
+frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct rpcrdma_frmr *frmr;
+	struct ib_cqe *cqe;
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	if (wc->status != IB_WC_SUCCESS) {
+		cqe = wc->wr_cqe;
+		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+		__frwr_sendcompletion_flush(wc, frmr, "localinv");
+	}
+}
+
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void
+frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct rpcrdma_frmr *frmr;
+	struct ib_cqe *cqe;
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	cqe = wc->wr_cqe;
+	frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+	if (wc->status != IB_WC_SUCCESS)
+		__frwr_sendcompletion_flush(wc, frmr, "localinv");
+	complete_all(&frmr->fr_linv_done);
 }
 
 static int
@@ -313,8 +352,7 @@
 
 		list_add(&r->mw_list, &buf->rb_mws);
 		list_add(&r->mw_all, &buf->rb_all);
-		r->mw_sendcompletion = frwr_sendcompletion;
-		r->r.frmr.fr_xprt = r_xprt;
+		r->frmr.fr_xprt = r_xprt;
 	}
 
 	return 0;
@@ -347,10 +385,9 @@
 		mw = rpcrdma_get_mw(r_xprt);
 		if (!mw)
 			return -ENOMEM;
-	} while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
-	frmr = &mw->r.frmr;
+	} while (mw->frmr.fr_state != FRMR_IS_INVALID);
+	frmr = &mw->frmr;
 	frmr->fr_state = FRMR_IS_VALID;
-	frmr->fr_waiter = false;
 	mr = frmr->fr_mr;
 	reg_wr = &frmr->fr_regwr;
 
@@ -400,7 +437,8 @@
 
 	reg_wr->wr.next = NULL;
 	reg_wr->wr.opcode = IB_WR_REG_MR;
-	reg_wr->wr.wr_id = (uintptr_t)mw;
+	frmr->fr_cqe.done = frwr_wc_fastreg;
+	reg_wr->wr.wr_cqe = &frmr->fr_cqe;
 	reg_wr->wr.num_sge = 0;
 	reg_wr->wr.send_flags = 0;
 	reg_wr->mr = mr;
@@ -434,15 +472,15 @@
 __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
 {
 	struct rpcrdma_mw *mw = seg->rl_mw;
-	struct rpcrdma_frmr *f = &mw->r.frmr;
+	struct rpcrdma_frmr *f = &mw->frmr;
 	struct ib_send_wr *invalidate_wr;
 
-	f->fr_waiter = false;
 	f->fr_state = FRMR_IS_INVALID;
 	invalidate_wr = &f->fr_invwr;
 
 	memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-	invalidate_wr->wr_id = (unsigned long)(void *)mw;
+	f->fr_cqe.done = frwr_wc_localinv;
+	invalidate_wr->wr_cqe = &f->fr_cqe;
 	invalidate_wr->opcode = IB_WR_LOCAL_INV;
 	invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;
 
@@ -455,7 +493,7 @@
 {
 	struct ib_device *device = r_xprt->rx_ia.ri_device;
 	struct rpcrdma_mw *mw = seg->rl_mw;
-	struct rpcrdma_frmr *f = &mw->r.frmr;
+	struct rpcrdma_frmr *f = &mw->frmr;
 
 	seg->rl_mw = NULL;
 
@@ -504,15 +542,15 @@
 
 		i += seg->mr_nsegs;
 	}
-	f = &seg->rl_mw->r.frmr;
+	f = &seg->rl_mw->frmr;
 
 	/* Strong send queue ordering guarantees that when the
 	 * last WR in the chain completes, all WRs in the chain
 	 * are complete.
 	 */
 	f->fr_invwr.send_flags = IB_SEND_SIGNALED;
-	f->fr_waiter = true;
-	init_completion(&f->fr_linv_done);
+	f->fr_cqe.done = frwr_wc_localinv_wake;
+	reinit_completion(&f->fr_linv_done);
 	INIT_CQCOUNT(&r_xprt->rx_ep);
 
 	/* Transport disconnect drains the receive CQ before it
@@ -520,14 +558,18 @@
 	 * unless ri_id->qp is a valid pointer.
 	 */
 	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
-	if (rc)
+	if (rc) {
 		pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
+		rdma_disconnect(ia->ri_id);
+		goto unmap;
+	}
 
 	wait_for_completion(&f->fr_linv_done);
 
 	/* ORDER: Now DMA unmap all of the req's MRs, and return
 	 * them to the free MW list.
 	 */
+unmap:
 	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
 		seg = &req->rl_segments[i];
 
@@ -549,7 +591,7 @@
 	struct rpcrdma_mr_seg *seg1 = seg;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_mw *mw = seg1->rl_mw;
-	struct rpcrdma_frmr *frmr = &mw->r.frmr;
+	struct rpcrdma_frmr *frmr = &mw->frmr;
 	struct ib_send_wr *invalidate_wr, *bad_wr;
 	int rc, nsegs = seg->mr_nsegs;
 
@@ -557,10 +599,11 @@
 
 	seg1->rl_mw = NULL;
 	frmr->fr_state = FRMR_IS_INVALID;
-	invalidate_wr = &mw->r.frmr.fr_invwr;
+	invalidate_wr = &mw->frmr.fr_invwr;
 
 	memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-	invalidate_wr->wr_id = (uintptr_t)mw;
+	frmr->fr_cqe.done = frwr_wc_localinv;
+	invalidate_wr->wr_cqe = &frmr->fr_cqe;
 	invalidate_wr->opcode = IB_WR_LOCAL_INV;
 	invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
 	DECR_CQCOUNT(&r_xprt->rx_ep);
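
A note on the synchronous invalidate path above: strong send queue
ordering means only the final LOCAL_INV WR in the chain needs
IB_SEND_SIGNALED.  Its completion handler, frwr_wc_localinv_wake(),
completes fr_linv_done whether the WR succeeded or was flushed, and
ro_unmap_sync() sleeps on that completion before unmapping.  If
ib_post_send() fails, no completion will ever arrive, so the new error
path disconnects the QP and jumps straight to the unmap loop rather
than waiting forever.
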
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index dbb302e..481b9b6 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -68,7 +68,6 @@
 	rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
 	seg->mr_rkey = ia->ri_dma_mr->rkey;
 	seg->mr_base = seg->mr_dma;
-	seg->mr_nsegs = 1;
 	return 1;
 }
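
(The store removed here was write-only: the physical unmap paths never
consult mr_nsegs, since a physical registration always covers exactly
one segment.)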
 
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 0f28f2d..888823b 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -132,6 +132,33 @@
 	return tlen;
 }
 
+/* Split "vec" on page boundaries into segments. FMR registers pages,
+ * not a byte range. Other modes coalesce these segments into a single
+ * MR when they can.
+ */
+static int
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+		     int n, int nsegs)
+{
+	size_t page_offset;
+	u32 remaining;
+	char *base;
+
+	base = vec->iov_base;
+	page_offset = offset_in_page(base);
+	remaining = vec->iov_len;
+	while (remaining && n < nsegs) {
+		seg[n].mr_page = NULL;
+		seg[n].mr_offset = base;
+		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+		remaining -= seg[n].mr_len;
+		base += seg[n].mr_len;
+		++n;
+		page_offset = 0;
+	}
+	return n;
+}
+
 /*
  * Chunk assembly from upper layer xdr_buf.
  *
@@ -150,11 +177,10 @@
 	int page_base;
 	struct page **ppages;
 
-	if (pos == 0 && xdrbuf->head[0].iov_len) {
-		seg[n].mr_page = NULL;
-		seg[n].mr_offset = xdrbuf->head[0].iov_base;
-		seg[n].mr_len = xdrbuf->head[0].iov_len;
-		++n;
+	if (pos == 0) {
+		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
+		if (n == nsegs)
+			return -EIO;
 	}
 
 	len = xdrbuf->page_len;
@@ -192,13 +218,9 @@
 		 * xdr pad bytes, saving the server an RDMA operation. */
 		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
 			return n;
+		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
 		if (n == nsegs)
-			/* Tail remains, but we're out of segments */
 			return -EIO;
-		seg[n].mr_page = NULL;
-		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
-		seg[n].mr_len = xdrbuf->tail[0].iov_len;
-		++n;
 	}
 
 	return n;
@@ -773,20 +795,17 @@
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	__be32 *iptr;
-	int rdmalen, status;
+	int rdmalen, status, rmerr;
 	unsigned long cwnd;
-	u32 credits;
 
 	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
 	if (rep->rr_len == RPCRDMA_BAD_LEN)
 		goto out_badstatus;
-	if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
 		goto out_shortreply;
 
 	headerp = rdmab_to_msg(rep->rr_rdmabuf);
-	if (headerp->rm_vers != rpcrdma_version)
-		goto out_badversion;
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	if (rpcrdma_is_bcall(headerp))
 		goto out_bcall;
@@ -809,15 +828,16 @@
 	 */
 	list_del_init(&rqst->rq_list);
 	spin_unlock_bh(&xprt->transport_lock);
-	dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
-		"                   RPC request 0x%p xid 0x%08x\n",
-			__func__, rep, req, rqst,
-			be32_to_cpu(headerp->rm_xid));
+	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
+		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
 	/* from here on, the reply is no longer an orphan */
 	req->rl_reply = rep;
 	xprt->reestablish_timeout = 0;
 
+	if (headerp->rm_vers != rpcrdma_version)
+		goto out_badversion;
+
 	/* check for expected message types */
 	/* The order of some of these tests is important. */
 	switch (headerp->rm_type) {
@@ -878,6 +898,9 @@
 		status = rdmalen;
 		break;
 
+	case rdma_error:
+		goto out_rdmaerr;
+
 badheader:
 	default:
 		dprintk("%s: invalid rpcrdma reply header (type %d):"
@@ -893,6 +916,7 @@
 		break;
 	}
 
+out:
 	/* Invalidate and flush the data payloads before waking the
 	 * waiting application. This guarantees the memory region is
 	 * properly fenced from the server before the application
@@ -903,15 +927,9 @@
 	if (req->rl_nchunks)
 		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
 
-	credits = be32_to_cpu(headerp->rm_credit);
-	if (credits == 0)
-		credits = 1;	/* don't deadlock */
-	else if (credits > r_xprt->rx_buf.rb_max_requests)
-		credits = r_xprt->rx_buf.rb_max_requests;
-
 	spin_lock_bh(&xprt->transport_lock);
 	cwnd = xprt->cwnd;
-	xprt->cwnd = credits << RPC_CWNDSHIFT;
+	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
 		xprt_release_rqst_cong(rqst->rq_task);
 
@@ -935,13 +953,43 @@
 	return;
 #endif
 
-out_shortreply:
-	dprintk("RPC:       %s: short/invalid reply\n", __func__);
-	goto repost;
-
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
 out_badversion:
 	dprintk("RPC:       %s: invalid version %d\n",
 		__func__, be32_to_cpu(headerp->rm_vers));
+	status = -EIO;
+	r_xprt->rx_stats.bad_reply_count++;
+	goto out;
+
+out_rdmaerr:
+	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
+	switch (rmerr) {
+	case ERR_VERS:
+		pr_err("%s: server reports header version error (%u-%u)\n",
+		       __func__,
+		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
+		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
+		break;
+	case ERR_CHUNK:
+		pr_err("%s: server reports header decoding error\n",
+		       __func__);
+		break;
+	default:
+		pr_err("%s: server reports unknown error %d\n",
+		       __func__, rmerr);
+	}
+	status = -EREMOTEIO;
+	r_xprt->rx_stats.bad_reply_count++;
+	goto out;
+
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
+out_shortreply:
+	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 	goto repost;
 
 out_nomatch:
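
To make the new kvec splitting concrete: with 4KB pages, a head buffer
of 9000 bytes that begins 100 bytes into its first page is carved by
rpcrdma_convert_kvec() into three page-bounded segments,

    seg[n+0].mr_len = 3996    /* up to the first page boundary */
    seg[n+1].mr_len = 4096    /* one full page */
    seg[n+2].mr_len =  908    /* the remainder */

so FMR can register whole pages, while the other registration modes
coalesce the segments back into a single MR when they can.
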
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 878f1bf..f5ed9f9 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -112,73 +112,20 @@
 	}
 }
 
-static void
-rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
-{
-	struct rpcrdma_ep *ep = context;
-
-	pr_err("RPC:       %s: %s on device %s ep %p\n",
-	       __func__, ib_event_msg(event->event),
-		event->device->name, context);
-	if (ep->rep_connected == 1) {
-		ep->rep_connected = -EIO;
-		rpcrdma_conn_func(ep);
-		wake_up_all(&ep->rep_connect_wait);
-	}
-}
-
-static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
-{
-	/* WARNING: Only wr_id and status are reliable at this point */
-	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
-		if (wc->status != IB_WC_SUCCESS &&
-		    wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("RPC:       %s: SEND: %s\n",
-			       __func__, ib_wc_status_msg(wc->status));
-	} else {
-		struct rpcrdma_mw *r;
-
-		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-		r->mw_sendcompletion(wc);
-	}
-}
-
-/* The common case is a single send completion is waiting. By
- * passing two WC entries to ib_poll_cq, a return code of 1
- * means there is exactly one WC waiting and no more. We don't
- * have to invoke ib_poll_cq again to know that the CQ has been
- * properly drained.
+/**
+ * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
  */
 static void
-rpcrdma_sendcq_poll(struct ib_cq *cq)
+rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct ib_wc *pos, wcs[2];
-	int count, rc;
-
-	do {
-		pos = wcs;
-
-		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
-		if (rc < 0)
-			break;
-
-		count = rc;
-		while (count-- > 0)
-			rpcrdma_sendcq_process_wc(pos++);
-	} while (rc == ARRAY_SIZE(wcs));
-	return;
-}
-
-/* Handle provider send completion upcalls.
- */
-static void
-rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
-{
-	do {
-		rpcrdma_sendcq_poll(cq);
-	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
-				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
+		       ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
 }
 
 static void
@@ -190,11 +137,40 @@
 	rpcrdma_reply_handler(rep);
 }
 
+/* Perform basic sanity checking to avoid using garbage
+ * to update the credit grant value.
+ */
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc)
+rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 {
-	struct rpcrdma_rep *rep =
-			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
+	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
+	u32 credits;
+
+	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+		return;
+
+	credits = be32_to_cpu(rmsgp->rm_credit);
+	if (credits == 0)
+		credits = 1;	/* don't deadlock */
+	else if (credits > buffer->rb_max_requests)
+		credits = buffer->rb_max_requests;
+
+	atomic_set(&buffer->rb_credits, credits);
+}
+
+/**
+ * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ */
+static void
+rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
+					       rr_cqe);
 
 	/* WARNING: Only wr_cqe and status are reliable at this point */
 	if (wc->status != IB_WC_SUCCESS)
@@ -211,7 +187,8 @@
 	ib_dma_sync_single_for_cpu(rep->rr_device,
 				   rdmab_addr(rep->rr_rdmabuf),
 				   rep->rr_len, DMA_FROM_DEVICE);
-	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
+
+	rpcrdma_update_granted_credits(rep);
 
 out_schedule:
 	queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -219,57 +196,20 @@
 
 out_fail:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
-		pr_err("RPC:       %s: rep %p: %s\n",
-		       __func__, rep, ib_wc_status_msg(wc->status));
+		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
+		       ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
 	rep->rr_len = RPCRDMA_BAD_LEN;
 	goto out_schedule;
 }
 
-/* The wc array is on stack: automatic memory is always CPU-local.
- *
- * struct ib_wc is 64 bytes, making the poll array potentially
- * large. But this is at the bottom of the call chain. Further
- * substantial work is done in another thread.
- */
-static void
-rpcrdma_recvcq_poll(struct ib_cq *cq)
-{
-	struct ib_wc *pos, wcs[4];
-	int count, rc;
-
-	do {
-		pos = wcs;
-
-		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
-		if (rc < 0)
-			break;
-
-		count = rc;
-		while (count-- > 0)
-			rpcrdma_recvcq_process_wc(pos++);
-	} while (rc == ARRAY_SIZE(wcs));
-}
-
-/* Handle provider receive completion upcalls.
- */
-static void
-rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
-{
-	do {
-		rpcrdma_recvcq_poll(cq);
-	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
-				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
-}
-
 static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
 	struct ib_wc wc;
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_recvcq_process_wc(&wc);
-	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
-		rpcrdma_sendcq_process_wc(&wc);
+		rpcrdma_receive_wc(NULL, &wc);
 }
 
 static int
@@ -330,6 +270,7 @@
 connected:
 		dprintk("RPC:       %s: %sconnected\n",
 					__func__, connstate > 0 ? "" : "dis");
+		atomic_set(&xprt->rx_buf.rb_credits, 1);
 		ep->rep_connected = connstate;
 		rpcrdma_conn_func(ep);
 		wake_up_all(&ep->rep_connect_wait);
@@ -560,9 +501,8 @@
 				struct rpcrdma_create_data_internal *cdata)
 {
 	struct ib_cq *sendcq, *recvcq;
-	struct ib_cq_init_attr cq_attr = {};
 	unsigned int max_qp_wr;
-	int rc, err;
+	int rc;
 
 	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
 		dprintk("RPC:       %s: insufficient sge's available\n",
@@ -614,9 +554,9 @@
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
-	cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
-	sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
-			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+	sendcq = ib_alloc_cq(ia->ri_device, NULL,
+			     ep->rep_attr.cap.max_send_wr + 1,
+			     0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(sendcq)) {
 		rc = PTR_ERR(sendcq);
 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -624,16 +564,9 @@
 		goto out1;
 	}
 
-	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
-	if (rc) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-			__func__, rc);
-		goto out2;
-	}
-
-	cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
-	recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
-			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+	recvcq = ib_alloc_cq(ia->ri_device, NULL,
+			     ep->rep_attr.cap.max_recv_wr + 1,
+			     0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(recvcq)) {
 		rc = PTR_ERR(recvcq);
 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -641,14 +574,6 @@
 		goto out2;
 	}
 
-	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
-	if (rc) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-			__func__, rc);
-		ib_destroy_cq(recvcq);
-		goto out2;
-	}
-
 	ep->rep_attr.send_cq = sendcq;
 	ep->rep_attr.recv_cq = recvcq;
 
@@ -673,10 +598,7 @@
 	return 0;
 
 out2:
-	err = ib_destroy_cq(sendcq);
-	if (err)
-		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-			__func__, err);
+	ib_free_cq(sendcq);
 out1:
 	if (ia->ri_dma_mr)
 		ib_dereg_mr(ia->ri_dma_mr);
@@ -711,15 +633,8 @@
 		ia->ri_id->qp = NULL;
 	}
 
-	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
-	if (rc)
-		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-			__func__, rc);
-
-	rc = ib_destroy_cq(ep->rep_attr.send_cq);
-	if (rc)
-		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-			__func__, rc);
+	ib_free_cq(ep->rep_attr.recv_cq);
+	ib_free_cq(ep->rep_attr.send_cq);
 
 	if (ia->ri_dma_mr) {
 		rc = ib_dereg_mr(ia->ri_dma_mr);
@@ -898,6 +813,7 @@
 	spin_lock(&buffer->rb_reqslock);
 	list_add(&req->rl_all, &buffer->rb_allreqs);
 	spin_unlock(&buffer->rb_reqslock);
+	req->rl_cqe.done = rpcrdma_wc_send;
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 }
@@ -923,6 +839,7 @@
 	}
 
 	rep->rr_device = ia->ri_device;
+	rep->rr_cqe.done = rpcrdma_receive_wc;
 	rep->rr_rxprt = r_xprt;
 	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 	return rep;
@@ -943,6 +860,7 @@
 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
 	buf->rb_bc_srv_max_requests = 0;
 	spin_lock_init(&buf->rb_lock);
+	atomic_set(&buf->rb_credits, 1);
 
 	rc = ia->ri_ops->ro_init(r_xprt);
 	if (rc)
@@ -1259,7 +1177,7 @@
 	}
 
 	send_wr.next = NULL;
-	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
+	send_wr.wr_cqe = &req->rl_cqe;
 	send_wr.sg_list = iov;
 	send_wr.num_sge = req->rl_niovs;
 	send_wr.opcode = IB_WR_SEND;
@@ -1297,7 +1215,7 @@
 	int rc;
 
 	recv_wr.next = NULL;
-	recv_wr.wr_id = (u64) (unsigned long) rep;
+	recv_wr.wr_cqe = &rep->rr_cqe;
 	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	recv_wr.num_sge = 1;
 
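
The conversions in this file follow the dispatch pattern of the new
completion queue API: instead of casting wc->wr_id back to a context
pointer, a consumer embeds a struct ib_cqe in its context object,
points the WR's wr_cqe field at it, and the IB core invokes the .done
method directly.  A minimal sketch, with hypothetical demo_* names:

    struct demo_ctx {
            struct ib_cqe cqe;              /* embedded completion hook */
            /* ... per-request state ... */
    };

    static void demo_done(struct ib_cq *cq, struct ib_wc *wc)
    {
            struct demo_ctx *ctx =
                    container_of(wc->wr_cqe, struct demo_ctx, cqe);

            /* on error, only wr_cqe and wc->status are reliable */
            if (wc->status != IB_WC_SUCCESS)
                    pr_err("demo: ctx %p: %s\n",
                           ctx, ib_wc_status_msg(wc->status));
    }

    /* posting side:
     *      ctx->cqe.done = demo_done;
     *      send_wr.wr_cqe = &ctx->cqe;
     */
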
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 38fe11b..2ebc743 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -95,10 +95,6 @@
 #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
 #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
 
-/* Force completion handler to ignore the signal
- */
-#define RPCRDMA_IGNORE_COMPLETION	(0ULL)
-
 /* Pre-allocate extra Work Requests for handling backward receives
  * and sends. This is a fixed value because the Work Queues are
  * allocated when the forward channel is set up.
@@ -171,6 +167,7 @@
 struct rpcrdma_buffer;
 
 struct rpcrdma_rep {
+	struct ib_cqe		rr_cqe;
 	unsigned int		rr_len;
 	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
@@ -204,11 +201,11 @@
 	struct scatterlist		*sg;
 	int				sg_nents;
 	struct ib_mr			*fr_mr;
+	struct ib_cqe			fr_cqe;
 	enum rpcrdma_frmr_state		fr_state;
+	struct completion		fr_linv_done;
 	struct work_struct		fr_work;
 	struct rpcrdma_xprt		*fr_xprt;
-	bool				fr_waiter;
-	struct completion		fr_linv_done;;
 	union {
 		struct ib_reg_wr	fr_regwr;
 		struct ib_send_wr	fr_invwr;
@@ -224,8 +221,7 @@
 	union {
 		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
-	} r;
-	void			(*mw_sendcompletion)(struct ib_wc *);
+	};
 	struct list_head	mw_list;
 	struct list_head	mw_all;
 };
@@ -281,6 +277,7 @@
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
 
+	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;
 	bool			rl_backchannel;
 };
@@ -311,6 +308,7 @@
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
 	u32			rb_max_requests;
+	atomic_t		rb_credits;	/* most recent credit grant */
 
 	u32			rb_bc_srv_max_requests;
 	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
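
A quick worked example of the new credit accounting: a reply carrying
rm_credit = 0 is clamped to 1 by rpcrdma_update_granted_credits() so
the transport cannot deadlock, while a grant of 1024 against
rb_max_requests = 128 is clamped down to 128, after which the reply
handler sets xprt->cwnd to 128 << RPC_CWNDSHIFT.  Storing the
sanitized grant in an atomic_t lets the softirq-context receive
handler record it and the reply handler apply it under the transport
lock without any extra serialization.
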
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index fde2138..65e7595 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1844,9 +1844,7 @@
  */
 static void xs_local_rpcbind(struct rpc_task *task)
 {
-	rcu_read_lock();
-	xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
-	rcu_read_unlock();
+	xprt_set_bound(task->tk_xprt);
 }
 
 static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)