Merge tag 'nfs-rdma-4.12-1' of git://git.linux-nfs.org/projects/anna/nfs-rdma

NFS: NFS over RDMA Client Side Changes

New Features:
- Break RDMA connections after a connection timeout
- Support for unloading the underlying device driver

Bugfixes and cleanups:
- Mark the receive workqueue as "read-mostly"
- Silence warnings caused by ENOBUFS
- Update a comment in xdr_init_decode_pages()
- Remove rpcrdma_buffer->rb_pool
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 1f70821..e34f4ee 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -807,7 +807,7 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
 EXPORT_SYMBOL_GPL(xdr_init_decode);
 
 /**
- * xdr_init_decode - Initialize an xdr_stream for decoding data.
+ * xdr_init_decode_pages - Initialize an xdr_stream for decoding into pages
  * @xdr: pointer to xdr_stream struct
  * @buf: pointer to XDR buffer from which to decode data
  * @pages: list of pages to decode into
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index b530a28..3e63c5e 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -651,6 +651,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
+EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 
 /**
  * xprt_conditional_disconnect - force a transport to disconnect
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index a044be2..694e9b1 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -494,7 +494,7 @@ rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 	}
 	sge->length = len;
 
-	ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
+	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
 				      sge->length, DMA_TO_DEVICE);
 	req->rl_send_wr.num_sge++;
 	return true;
@@ -523,7 +523,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 	sge[sge_no].addr = rdmab_addr(rb);
 	sge[sge_no].length = xdr->head[0].iov_len;
 	sge[sge_no].lkey = rdmab_lkey(rb);
-	ib_dma_sync_single_for_device(device, sge[sge_no].addr,
+	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
 				      sge[sge_no].length, DMA_TO_DEVICE);
 
 	/* If there is a Read chunk, the page list is being handled
@@ -781,9 +781,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	return 0;
 
 out_err:
-	pr_err("rpcrdma: rpcrdma_marshal_req failed, status %ld\n",
-	       PTR_ERR(iptr));
-	r_xprt->rx_stats.failed_marshal_count++;
+	if (PTR_ERR(iptr) != -ENOBUFS) {
+		pr_err("rpcrdma: rpcrdma_marshal_req failed, status %ld\n",
+		       PTR_ERR(iptr));
+		r_xprt->rx_stats.failed_marshal_count++;
+	}
 	return PTR_ERR(iptr);
 }
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index c717f54..62ecbcc 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -66,8 +66,8 @@ static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
 unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
 static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
 static unsigned int xprt_rdma_inline_write_padding;
-static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
-		int xprt_rdma_pad_optimize = 0;
+unsigned int xprt_rdma_memreg_strategy		= RPCRDMA_FRMR;
+int xprt_rdma_pad_optimize;
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 
@@ -396,7 +396,7 @@ xprt_setup_rdma(struct xprt_create *args)
 
 	new_xprt = rpcx_to_rdmax(xprt);
 
-	rc = rpcrdma_ia_open(new_xprt, sap, xprt_rdma_memreg_strategy);
+	rc = rpcrdma_ia_open(new_xprt, sap);
 	if (rc)
 		goto out1;
 
@@ -457,19 +457,33 @@ xprt_setup_rdma(struct xprt_create *args)
 	return ERR_PTR(rc);
 }
 
-/*
- * Close a connection, during shutdown or timeout/reconnect
+/**
+ * xprt_rdma_close - Close down RDMA connection
+ * @xprt: generic transport to be closed
+ *
+ * Called during transport shutdown, reconnect, or device
+ * removal. Caller holds the transport's write lock.
  */
 static void
 xprt_rdma_close(struct rpc_xprt *xprt)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	dprintk("RPC:       %s: closing\n", __func__);
-	if (r_xprt->rx_ep.rep_connected > 0)
+	dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
+
+	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
+		xprt_clear_connected(xprt);
+		rpcrdma_ia_remove(ia);
+		return;
+	}
+	if (ep->rep_connected == -ENODEV)
+		return;
+	if (ep->rep_connected > 0)
 		xprt->reestablish_timeout = 0;
 	xprt_disconnect_done(xprt);
-	rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+	rpcrdma_ep_disconnect(ep, ia);
 }
 
 static void
@@ -484,6 +498,27 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
 	dprintk("RPC:       %s: %u\n", __func__, port);
 }
 
+/**
+ * xprt_rdma_timer - invoked when an RPC times out
+ * @xprt: controlling RPC transport
+ * @task: RPC task that timed out
+ *
+ * Invoked when the transport is still connected, but an RPC
+ * retransmit timeout occurs.
+ *
+ * Since RDMA connections don't have a keep-alive, forcibly
+ * disconnect and try to reconnect. This drives full
+ * detection of the network path, and retransmissions of
+ * all pending RPCs.
+ */
+static void
+xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	dprintk("RPC: %5u %s: xprt = %p\n", task->tk_pid, __func__, xprt);
+
+	xprt_force_disconnect(xprt);
+}
+
 static void
 xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 {
@@ -659,6 +694,8 @@ xprt_rdma_free(struct rpc_task *task)
  * xprt_rdma_send_request - marshal and send an RPC request
  * @task: RPC task with an RPC message in rq_snd_buf
  *
+ * Caller holds the transport's write lock.
+ *
  * Return values:
  *        0:	The request has been sent
  * ENOTCONN:	Caller needs to invoke connect logic then call again
@@ -685,6 +722,9 @@ xprt_rdma_send_request(struct rpc_task *task)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	int rc = 0;
 
+	if (!xprt_connected(xprt))
+		goto drop_connection;
+
 	/* On retransmit, remove any previously registered chunks */
 	if (unlikely(!list_empty(&req->rl_registered)))
 		r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
@@ -776,6 +816,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
 	.alloc_slot		= xprt_alloc_slot,
 	.release_request	= xprt_release_rqst_cong,       /* ditto */
 	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */
+	.timer			= xprt_rdma_timer,
 	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
 	.set_port		= xprt_rdma_set_port,
 	.connect		= xprt_rdma_connect,
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3b332b3..3dbce9a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -53,7 +53,7 @@
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/svc_rdma.h>
 #include <asm/bitops.h>
-#include <linux/module.h> /* try_module_get()/module_put() */
+
 #include <rdma/ib_cm.h>
 
 #include "xprt_rdma.h"
@@ -69,8 +69,11 @@
 /*
  * internal functions
  */
+static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
+static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 
-static struct workqueue_struct *rpcrdma_receive_wq;
+static struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
 
 int
 rpcrdma_alloc_wq(void)
@@ -180,7 +183,7 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 	rep->rr_wc_flags = wc->wc_flags;
 	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 
-	ib_dma_sync_single_for_cpu(rep->rr_device,
+	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 				   rdmab_addr(rep->rr_rdmabuf),
 				   rep->rr_len, DMA_FROM_DEVICE);
 
@@ -262,6 +265,21 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 			__func__, ep);
 		complete(&ia->ri_done);
 		break;
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+		pr_info("rpcrdma: removing device for %pIS:%u\n",
+			sap, rpc_get_port(sap));
+#endif
+		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
+		ep->rep_connected = -ENODEV;
+		xprt_force_disconnect(&xprt->rx_xprt);
+		wait_for_completion(&ia->ri_remove_done);
+
+		ia->ri_id = NULL;
+		ia->ri_pd = NULL;
+		ia->ri_device = NULL;
+		/* Return 1 to ensure the core destroys the id. */
+		return 1;
 	case RDMA_CM_EVENT_ESTABLISHED:
 		connstate = 1;
 		ib_query_qp(ia->ri_id->qp, attr,
@@ -291,9 +309,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		goto connected;
 	case RDMA_CM_EVENT_DISCONNECTED:
 		connstate = -ECONNABORTED;
-		goto connected;
-	case RDMA_CM_EVENT_DEVICE_REMOVAL:
-		connstate = -ENODEV;
 connected:
 		dprintk("RPC:       %s: %sconnected\n",
 					__func__, connstate > 0 ? "" : "dis");
@@ -329,14 +344,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 	return 0;
 }
 
-static void rpcrdma_destroy_id(struct rdma_cm_id *id)
-{
-	if (id) {
-		module_put(id->device->owner);
-		rdma_destroy_id(id);
-	}
-}
-
 static struct rdma_cm_id *
 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 			struct rpcrdma_ia *ia, struct sockaddr *addr)
@@ -346,6 +353,7 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 	int rc;
 
 	init_completion(&ia->ri_done);
+	init_completion(&ia->ri_remove_done);
 
 	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
 			    IB_QPT_RC);
@@ -370,16 +378,6 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 		goto out;
 	}
 
-	/* FIXME:
-	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
-	 * be pinned while there are active NFS/RDMA mounts to prevent
-	 * hangs and crashes at umount time.
-	 */
-	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
-		dprintk("RPC:       %s: Failed to get device module\n",
-			__func__);
-		ia->ri_async_rc = -ENODEV;
-	}
 	rc = ia->ri_async_rc;
 	if (rc)
 		goto out;
@@ -389,21 +387,20 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 	if (rc) {
 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 			__func__, rc);
-		goto put;
+		goto out;
 	}
 	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 	if (rc < 0) {
 		dprintk("RPC:       %s: wait() exited: %i\n",
 			__func__, rc);
-		goto put;
+		goto out;
 	}
 	rc = ia->ri_async_rc;
 	if (rc)
-		goto put;
+		goto out;
 
 	return id;
-put:
-	module_put(id->device->owner);
+
 out:
 	rdma_destroy_id(id);
 	return ERR_PTR(rc);
@@ -413,13 +410,16 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
  * Exported functions.
  */
 
-/*
- * Open and initialize an Interface Adapter.
- *  o initializes fields of struct rpcrdma_ia, including
- *    interface and provider attributes and protection zone.
+/**
+ * rpcrdma_ia_open - Open and initialize an Interface Adapter.
+ * @xprt: controlling transport
+ * @addr: IP address of remote peer
+ *
+ * Returns 0 on success, negative errno if an appropriate
+ * Interface Adapter could not be found and opened.
  */
 int
-rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
+rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr)
 {
 	struct rpcrdma_ia *ia = &xprt->rx_ia;
 	int rc;
@@ -427,7 +427,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 	if (IS_ERR(ia->ri_id)) {
 		rc = PTR_ERR(ia->ri_id);
-		goto out1;
+		goto out_err;
 	}
 	ia->ri_device = ia->ri_id->device;
 
@@ -435,10 +435,10 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	if (IS_ERR(ia->ri_pd)) {
 		rc = PTR_ERR(ia->ri_pd);
 		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
-		goto out2;
+		goto out_err;
 	}
 
-	switch (memreg) {
+	switch (xprt_rdma_memreg_strategy) {
 	case RPCRDMA_FRMR:
 		if (frwr_is_supported(ia)) {
 			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
@@ -452,28 +452,73 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 		}
 		/*FALLTHROUGH*/
 	default:
-		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
-		       memreg);
+		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
+		       ia->ri_device->name, xprt_rdma_memreg_strategy);
 		rc = -EINVAL;
-		goto out3;
+		goto out_err;
 	}
 
 	return 0;
 
-out3:
-	ib_dealloc_pd(ia->ri_pd);
-	ia->ri_pd = NULL;
-out2:
-	rpcrdma_destroy_id(ia->ri_id);
-	ia->ri_id = NULL;
-out1:
+out_err:
+	rpcrdma_ia_close(ia);
 	return rc;
 }
 
-/*
- * Clean up/close an IA.
- *   o if event handles and PD have been initialized, free them.
- *   o close the IA
+/**
+ * rpcrdma_ia_remove - Handle device driver unload
+ * @ia: interface adapter being removed
+ *
+ * Divest transport H/W resources associated with this adapter,
+ * but allow it to be restored later.
+ */
+void
+rpcrdma_ia_remove(struct rpcrdma_ia *ia)
+{
+	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
+						   rx_ia);
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_req *req;
+	struct rpcrdma_rep *rep;
+
+	cancel_delayed_work_sync(&buf->rb_refresh_worker);
+
+	/* This is similar to rpcrdma_ep_destroy, but:
+	 * - Don't cancel the connect worker.
+	 * - Don't call rpcrdma_ep_disconnect, which waits
+	 *   for another conn upcall, which will deadlock.
+	 * - rdma_disconnect is unneeded, the underlying
+	 *   connection is already gone.
+	 */
+	if (ia->ri_id->qp) {
+		ib_drain_qp(ia->ri_id->qp);
+		rdma_destroy_qp(ia->ri_id);
+		ia->ri_id->qp = NULL;
+	}
+	ib_free_cq(ep->rep_attr.recv_cq);
+	ib_free_cq(ep->rep_attr.send_cq);
+
+	/* The ULP is responsible for ensuring all DMA
+	 * mappings and MRs are gone.
+	 */
+	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
+		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
+	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
+		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
+		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
+		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
+	}
+	rpcrdma_destroy_mrs(buf);
+
+	/* Allow waiters to continue */
+	complete(&ia->ri_remove_done);
+}
+
+/**
+ * rpcrdma_ia_close - Clean up/close an IA.
+ * @ia: interface adapter to close
+ *
  */
 void
 rpcrdma_ia_close(struct rpcrdma_ia *ia)
@@ -482,13 +527,15 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia)
 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 		if (ia->ri_id->qp)
 			rdma_destroy_qp(ia->ri_id);
-		rpcrdma_destroy_id(ia->ri_id);
-		ia->ri_id = NULL;
+		rdma_destroy_id(ia->ri_id);
 	}
+	ia->ri_id = NULL;
+	ia->ri_device = NULL;
 
 	/* If the pd is still busy, xprtrdma missed freeing a resource */
 	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 		ib_dealloc_pd(ia->ri_pd);
+	ia->ri_pd = NULL;
 }
 
 /*
@@ -646,6 +693,99 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	ib_free_cq(ep->rep_attr.send_cq);
 }
 
+/* Re-establish a connection after a device removal event.
+ * Unlike a normal reconnection, a fresh PD and a new set
+ * of MRs and buffers are needed.
+ */
+static int
+rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
+			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
+	int rc, err;
+
+	pr_info("%s: r_xprt = %p\n", __func__, r_xprt);
+
+	rc = -EHOSTUNREACH;
+	if (rpcrdma_ia_open(r_xprt, sap))
+		goto out1;
+
+	rc = -ENOMEM;
+	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
+	if (err) {
+		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
+		goto out2;
+	}
+
+	rc = -ENETUNREACH;
+	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+	if (err) {
+		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
+		goto out3;
+	}
+
+	rpcrdma_create_mrs(r_xprt);
+	return 0;
+
+out3:
+	rpcrdma_ep_destroy(ep, ia);
+out2:
+	rpcrdma_ia_close(ia);
+out1:
+	return rc;
+}
+
+static int
+rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
+		     struct rpcrdma_ia *ia)
+{
+	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
+	struct rdma_cm_id *id, *old;
+	int err, rc;
+
+	dprintk("RPC:       %s: reconnecting...\n", __func__);
+
+	rpcrdma_ep_disconnect(ep, ia);
+
+	rc = -EHOSTUNREACH;
+	id = rpcrdma_create_id(r_xprt, ia, sap);
+	if (IS_ERR(id))
+		goto out;
+
+	/* As long as the new ID points to the same device as the
+	 * old ID, we can reuse the transport's existing PD and all
+	 * previously allocated MRs. Also, the same device means
+	 * the transport's previous DMA mappings are still valid.
+	 *
+	 * This is a sanity check only. There should be no way these
+	 * point to two different devices here.
+	 */
+	old = id;
+	rc = -ENETUNREACH;
+	if (ia->ri_device != id->device) {
+		pr_err("rpcrdma: can't reconnect on different device!\n");
+		goto out_destroy;
+	}
+
+	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+	if (err) {
+		dprintk("RPC:       %s: rdma_create_qp returned %d\n",
+			__func__, err);
+		goto out_destroy;
+	}
+
+	/* Atomically replace the transport's ID and QP. */
+	rc = 0;
+	old = ia->ri_id;
+	ia->ri_id = id;
+	rdma_destroy_qp(old);
+
+out_destroy:
+	rdma_destroy_id(old);
+out:
+	return rc;
+}
+
 /*
  * Connect unconnected endpoint.
  */
@@ -654,61 +794,30 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
 	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 						   rx_ia);
-	struct rdma_cm_id *id, *old;
-	struct sockaddr *sap;
 	unsigned int extras;
-	int rc = 0;
+	int rc;
 
-	if (ep->rep_connected != 0) {
 retry:
-		dprintk("RPC:       %s: reconnecting...\n", __func__);
-
-		rpcrdma_ep_disconnect(ep, ia);
-
-		sap = (struct sockaddr *)&r_xprt->rx_data.addr;
-		id = rpcrdma_create_id(r_xprt, ia, sap);
-		if (IS_ERR(id)) {
-			rc = -EHOSTUNREACH;
-			goto out;
-		}
-		/* TEMP TEMP TEMP - fail if new device:
-		 * Deregister/remarshal *all* requests!
-		 * Close and recreate adapter, pd, etc!
-		 * Re-determine all attributes still sane!
-		 * More stuff I haven't thought of!
-		 * Rrrgh!
-		 */
-		if (ia->ri_device != id->device) {
-			printk("RPC:       %s: can't reconnect on "
-				"different device!\n", __func__);
-			rpcrdma_destroy_id(id);
-			rc = -ENETUNREACH;
-			goto out;
-		}
-		/* END TEMP */
-		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
-		if (rc) {
-			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
-				__func__, rc);
-			rpcrdma_destroy_id(id);
-			rc = -ENETUNREACH;
-			goto out;
-		}
-
-		old = ia->ri_id;
-		ia->ri_id = id;
-
-		rdma_destroy_qp(old);
-		rpcrdma_destroy_id(old);
-	} else {
+	switch (ep->rep_connected) {
+	case 0:
 		dprintk("RPC:       %s: connecting...\n", __func__);
 		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 		if (rc) {
 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 				__func__, rc);
-			/* do not update ep->rep_connected */
-			return -ENETUNREACH;
+			rc = -ENETUNREACH;
+			goto out_noupdate;
 		}
+		break;
+	case -ENODEV:
+		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
+		if (rc)
+			goto out_noupdate;
+		break;
+	default:
+		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
+		if (rc)
+			goto out;
 	}
 
 	ep->rep_connected = 0;
@@ -736,6 +845,8 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 out:
 	if (rc)
 		ep->rep_connected = rc;
+
+out_noupdate:
 	return rc;
 }
 
@@ -878,7 +989,6 @@ struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_rep *rep;
 	int rc;
 
@@ -894,7 +1004,6 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 		goto out_free;
 	}
 
-	rep->rr_device = ia->ri_device;
 	rep->rr_cqe.done = rpcrdma_wc_receive;
 	rep->rr_rxprt = r_xprt;
 	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
@@ -1037,6 +1146,7 @@ void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	cancel_delayed_work_sync(&buf->rb_recovery_worker);
+	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;
@@ -1081,7 +1191,8 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 
 out_nomws:
 	dprintk("RPC:       %s: no MWs available\n", __func__);
-	schedule_delayed_work(&buf->rb_refresh_worker, 0);
+	if (r_xprt->rx_ep.rep_connected != -ENODEV)
+		schedule_delayed_work(&buf->rb_refresh_worker, 0);
 
 	/* Allow the reply handler and refresh worker to run */
 	cond_resched();
@@ -1231,17 +1342,19 @@ rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
 bool
 __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 {
+	struct ib_device *device = ia->ri_device;
+
 	if (rb->rg_direction == DMA_NONE)
 		return false;
 
-	rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
+	rb->rg_iov.addr = ib_dma_map_single(device,
 					    (void *)rb->rg_base,
 					    rdmab_length(rb),
 					    rb->rg_direction);
-	if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
+	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
 		return false;
 
-	rb->rg_device = ia->ri_device;
+	rb->rg_device = device;
 	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
 	return true;
 }
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 171a351..1d66acf 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -69,6 +69,7 @@ struct rpcrdma_ia {
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
 	struct completion	ri_done;
+	struct completion	ri_remove_done;
 	int			ri_async_rc;
 	unsigned int		ri_max_segs;
 	unsigned int		ri_max_frmr_depth;
@@ -78,10 +79,15 @@ struct rpcrdma_ia {
 	bool			ri_reminv_expected;
 	bool			ri_implicit_roundup;
 	enum ib_mr_type		ri_mrtype;
+	unsigned long		ri_flags;
 	struct ib_qp_attr	ri_qp_attr;
 	struct ib_qp_init_attr	ri_qp_init_attr;
 };
 
+enum {
+	RPCRDMA_IAF_REMOVING = 0,
+};
+
 /*
  * RDMA Endpoint -- one per transport instance
  */
@@ -164,6 +170,12 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
 	return (struct rpcrdma_msg *)rb->rg_base;
 }
 
+static inline struct ib_device *
+rdmab_device(struct rpcrdma_regbuf *rb)
+{
+	return rb->rg_device;
+}
+
 #define RPCRDMA_DEF_GFP		(GFP_NOIO | __GFP_NOWARN)
 
 /* To ensure a transport can always make forward progress,
@@ -209,7 +221,6 @@ struct rpcrdma_rep {
 	unsigned int		rr_len;
 	int			rr_wc_flags;
 	u32			rr_inv_rkey;
-	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
 	struct work_struct	rr_work;
 	struct list_head	rr_list;
@@ -380,7 +391,6 @@ struct rpcrdma_buffer {
 	spinlock_t		rb_mwlock;	/* protect rb_mws list */
 	struct list_head	rb_mws;
 	struct list_head	rb_all;
-	char			*rb_pool;
 
 	spinlock_t		rb_lock;	/* protect buf lists */
 	int			rb_send_count, rb_recv_count;
@@ -497,10 +507,16 @@ struct rpcrdma_xprt {
  * Default is 0, see sysctl entry and rpc_rdma.c rpcrdma_convert_iovs() */
 extern int xprt_rdma_pad_optimize;
 
+/* This setting controls the hunt for a supported memory
+ * registration strategy.
+ */
+extern unsigned int xprt_rdma_memreg_strategy;
+
 /*
  * Interface Adapter calls - xprtrdma/verbs.c
  */
-int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
+int rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr);
+void rpcrdma_ia_remove(struct rpcrdma_ia *ia);
 void rpcrdma_ia_close(struct rpcrdma_ia *);
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);