Merge branch 'rds-assorted-bug-fixes'

Santosh Shilimkar says:

====================
RDS: Assorted bug fixes

We would like to improve RDS upstream support and in that context, I
started playing with it.  But run into number of issues including as
basic is RDS IB RDMA doesn't work. As part of the debug, I ended up
creating the $subject series which has bunch of assorted fixes. At
least with this series I can run RDS IB RDMA and other tests
successfully.

Some of these fixes have been done by Chris Meson, Andy Grover and
Zach Brown while at Oracle. There are still more kinks with FMR and
error handling and I plan to address them in a follow up series.

Series generated against Linus's master(v4.2-rc-7) but also applies
against next-next cleanly. Its tested on Oracle hardware with IB
fabric for both bcopy as well as RDMA mode. I don't have access
to iWARP hardware so any testing help on iWARP hardware appreciated.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index 896834c..a2f28a6 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -438,6 +438,14 @@
 	.sendpage =	sock_no_sendpage,
 };
 
+static void rds_sock_destruct(struct sock *sk)
+{
+	struct rds_sock *rs = rds_sk_to_rs(sk);
+
+	WARN_ON((&rs->rs_item != rs->rs_item.next ||
+		 &rs->rs_item != rs->rs_item.prev));
+}
+
 static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 {
 	struct rds_sock *rs;
@@ -445,6 +453,7 @@
 	sock_init_data(sock, sk);
 	sock->ops		= &rds_proto_ops;
 	sk->sk_protocol		= protocol;
+	sk->sk_destruct		= rds_sock_destruct;
 
 	rs = rds_sk_to_rs(sk);
 	spin_lock_init(&rs->rs_lock);
diff --git a/net/rds/connection.c b/net/rds/connection.c
index d4fecb2..a50e652 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -301,6 +301,8 @@
 
 		wait_event(conn->c_waitq,
 			   !test_bit(RDS_IN_XMIT, &conn->c_flags));
+		wait_event(conn->c_waitq,
+			   !test_bit(RDS_RECV_REFILL, &conn->c_flags));
 
 		conn->c_trans->conn_shutdown(conn);
 		rds_conn_reset(conn);
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 86d88ec..6422c52 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -320,7 +320,7 @@
 int rds_ib_recv(struct rds_connection *conn);
 int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
-void rds_ib_recv_refill(struct rds_connection *conn, int prefill);
+void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
 void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index f40d8f5..d150bb4 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -135,7 +135,7 @@
 	rds_ib_recv_init_ring(ic);
 	/* Post receive buffers - as a side effect, this will update
 	 * the posted credit count. */
-	rds_ib_recv_refill(conn, 1);
+	rds_ib_recv_refill(conn, 1, GFP_KERNEL);
 
 	/* Tune RNR behavior */
 	rds_ib_tune_rnr(ic, &qp_attr);
@@ -640,6 +640,15 @@
 			   (atomic_read(&ic->i_signaled_sends) == 0));
 		tasklet_kill(&ic->i_recv_tasklet);
 
+		/* first destroy the ib state that generates callbacks */
+		if (ic->i_cm_id->qp)
+			rdma_destroy_qp(ic->i_cm_id);
+		if (ic->i_send_cq)
+			ib_destroy_cq(ic->i_send_cq);
+		if (ic->i_recv_cq)
+			ib_destroy_cq(ic->i_recv_cq);
+
+		/* then free the resources that ib callbacks use */
 		if (ic->i_send_hdrs)
 			ib_dma_free_coherent(dev,
 					   ic->i_send_ring.w_nr *
@@ -663,12 +672,6 @@
 		if (ic->i_recvs)
 			rds_ib_recv_clear_ring(ic);
 
-		if (ic->i_cm_id->qp)
-			rdma_destroy_qp(ic->i_cm_id);
-		if (ic->i_send_cq)
-			ib_destroy_cq(ic->i_send_cq);
-		if (ic->i_recv_cq)
-			ib_destroy_cq(ic->i_recv_cq);
 		rdma_destroy_id(ic->i_cm_id);
 
 		/*
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 657ba9f..7b7aac8 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -151,12 +151,17 @@
 	struct rds_ib_device *rds_ibdev_old;
 
 	rds_ibdev_old = rds_ib_get_device(ipaddr);
-	if (rds_ibdev_old) {
+	if (!rds_ibdev_old)
+		return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
+
+	if (rds_ibdev_old != rds_ibdev) {
 		rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
 		rds_ib_dev_put(rds_ibdev_old);
+		return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
 	}
+	rds_ib_dev_put(rds_ibdev_old);
 
-	return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
+	return 0;
 }
 
 void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
@@ -485,7 +490,7 @@
 
 			/* FIXME we need a way to tell a r/w MR
 			 * from a r/o MR */
-			BUG_ON(irqs_disabled());
+			WARN_ON(!page->mapping && irqs_disabled());
 			set_page_dirty(page);
 			put_page(page);
 		}
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index cac5b45..3afdcbd 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -297,7 +297,7 @@
 }
 
 static int rds_ib_recv_refill_one(struct rds_connection *conn,
-				  struct rds_ib_recv_work *recv, int prefill)
+				  struct rds_ib_recv_work *recv, gfp_t gfp)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct ib_sge *sge;
@@ -305,7 +305,7 @@
 	gfp_t slab_mask = GFP_NOWAIT;
 	gfp_t page_mask = GFP_NOWAIT;
 
-	if (prefill) {
+	if (gfp & __GFP_WAIT) {
 		slab_mask = GFP_KERNEL;
 		page_mask = GFP_HIGHUSER;
 	}
@@ -347,6 +347,24 @@
 	return ret;
 }
 
+static int acquire_refill(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0;
+}
+
+static void release_refill(struct rds_connection *conn)
+{
+	clear_bit(RDS_RECV_REFILL, &conn->c_flags);
+
+	/* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare.  We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * This tries to allocate and post unused work requests after making sure that
  * they have all the allocations they need to queue received fragments into
@@ -354,15 +372,23 @@
  *
  * -1 is returned if posting fails due to temporary resource exhaustion.
  */
-void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
+void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_ib_recv_work *recv;
 	struct ib_recv_wr *failed_wr;
 	unsigned int posted = 0;
 	int ret = 0;
+	int can_wait = gfp & __GFP_WAIT;
 	u32 pos;
 
+	/* the goal here is to just make sure that someone, somewhere
+	 * is posting buffers.  If we can't get the refill lock,
+	 * let them do their thing
+	 */
+	if (!acquire_refill(conn))
+		return;
+
 	while ((prefill || rds_conn_up(conn)) &&
 	       rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
 		if (pos >= ic->i_recv_ring.w_nr) {
@@ -372,7 +398,7 @@
 		}
 
 		recv = &ic->i_recvs[pos];
-		ret = rds_ib_recv_refill_one(conn, recv, prefill);
+		ret = rds_ib_recv_refill_one(conn, recv, gfp);
 		if (ret) {
 			break;
 		}
@@ -402,6 +428,24 @@
 
 	if (ret)
 		rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
+
+	release_refill(conn);
+
+	/* if we're called from the softirq handler, we'll be GFP_NOWAIT.
+	 * in this case the ring being low is going to lead to more interrupts
+	 * and we can safely let the softirq code take care of it unless the
+	 * ring is completely empty.
+	 *
+	 * if we're called from krdsd, we'll be GFP_KERNEL.  In this case
+	 * we might have raced with the softirq code while we had the refill
+	 * lock held.  Use rds_ib_ring_low() instead of ring_empty to decide
+	 * if we should requeue.
+	 */
+	if (rds_conn_up(conn) &&
+	    ((can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
+	    rds_ib_ring_empty(&ic->i_recv_ring))) {
+		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+	}
 }
 
 /*
@@ -982,10 +1026,17 @@
 		}
 
 		/*
-		 * It's very important that we only free this ring entry if we've truly
-		 * freed the resources allocated to the entry.  The refilling path can
-		 * leak if we don't.
+		 * rds_ib_process_recv() doesn't always consume the frag, and
+		 * we might not have called it at all if the wc didn't indicate
+		 * success. We already unmapped the frag's pages, though, and
+		 * the following rds_ib_ring_free() call tells the refill path
+		 * that it will not find an allocated frag here. Make sure we
+		 * keep that promise by freeing a frag that's still on the ring.
 		 */
+		if (recv->r_frag) {
+			rds_ib_frag_free(ic, recv->r_frag);
+			recv->r_frag = NULL;
+		}
 		rds_ib_ring_free(&ic->i_recv_ring, 1);
 	}
 }
@@ -1016,7 +1067,7 @@
 		rds_ib_stats_inc(s_ib_rx_ring_empty);
 
 	if (rds_ib_ring_low(&ic->i_recv_ring))
-		rds_ib_recv_refill(conn, 0);
+		rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
 }
 
 int rds_ib_recv(struct rds_connection *conn)
@@ -1025,8 +1076,10 @@
 	int ret = 0;
 
 	rdsdebug("conn %p\n", conn);
-	if (rds_conn_up(conn))
+	if (rds_conn_up(conn)) {
 		rds_ib_attempt_ack(ic);
+		rds_ib_recv_refill(conn, 0, GFP_KERNEL);
+	}
 
 	return ret;
 }
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 5d0a704..c576ebe 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -709,6 +709,11 @@
 	if (scat == &rm->data.op_sg[rm->data.op_count]) {
 		prev->s_op = ic->i_data_op;
 		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
+		if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED)) {
+			ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
+			prev->s_wr.send_flags |= IB_SEND_SIGNALED;
+			nr_sig++;
+		}
 		ic->i_data_op = NULL;
 	}
 
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 40084d8..c1df9b1 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -451,7 +451,7 @@
 		 * is the case for a RDMA_READ which copies from remote
 		 * to local memory */
 		if (!ro->op_write) {
-			BUG_ON(irqs_disabled());
+			WARN_ON(!page->mapping && irqs_disabled());
 			set_page_dirty(page);
 		}
 		put_page(page);
@@ -658,6 +658,8 @@
 		ret = rds_pin_pages(iov->addr, nr, pages, !op->op_write);
 		if (ret < 0)
 			goto out;
+		else
+			ret = 0;
 
 		rdsdebug("RDS: nr_bytes %u nr %u iov->bytes %llu iov->addr %llx\n",
 			 nr_bytes, nr, iov->bytes, iov->addr);
diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c
index 2082408..b9b40af 100644
--- a/net/rds/rdma_transport.c
+++ b/net/rds/rdma_transport.c
@@ -34,6 +34,7 @@
 #include <rdma/rdma_cm.h>
 
 #include "rdma_transport.h"
+#include "ib.h"
 
 static struct rdma_cm_id *rds_rdma_listen_id;
 
@@ -82,8 +83,18 @@
 		break;
 
 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
-		/* XXX worry about racing with listen acceptance */
-		ret = trans->cm_initiate_connect(cm_id);
+		/* Connection could have been dropped so make sure the
+		 * cm_id is valid before proceeding
+		 */
+		if (conn) {
+			struct rds_ib_connection *ibic;
+
+			ibic = conn->c_transport_data;
+			if (ibic && ibic->i_cm_id == cm_id)
+				ret = trans->cm_initiate_connect(cm_id);
+			else
+				rds_conn_drop(conn);
+		}
 		break;
 
 	case RDMA_CM_EVENT_ESTABLISHED:
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 9005fb0..afb4048 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -80,6 +80,7 @@
 #define RDS_LL_SEND_FULL	0
 #define RDS_RECONNECT_PENDING	1
 #define RDS_IN_XMIT		2
+#define RDS_RECV_REFILL		3
 
 struct rds_connection {
 	struct hlist_node	c_hash_node;
diff --git a/net/rds/send.c b/net/rds/send.c
index 2581b8e..4df61a5 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -282,26 +282,34 @@
 		/* The transport either sends the whole rdma or none of it */
 		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
 			rm->m_final_op = &rm->rdma;
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue
+			 */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
-			if (ret)
+			if (ret) {
+				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+				wake_up_interruptible(&rm->m_flush_wait);
 				break;
+			}
 			conn->c_xmit_rdma_sent = 1;
 
-			/* The transport owns the mapped memory for now.
-			 * You can't unmap it while it's on the send queue */
-			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 		}
 
 		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
 			rm->m_final_op = &rm->atomic;
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue
+			 */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
-			if (ret)
+			if (ret) {
+				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+				wake_up_interruptible(&rm->m_flush_wait);
 				break;
+			}
 			conn->c_xmit_atomic_sent = 1;
 
-			/* The transport owns the mapped memory for now.
-			 * You can't unmap it while it's on the send queue */
-			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 		}
 
 		/*
@@ -411,7 +419,8 @@
 	 */
 	if (ret == 0) {
 		smp_mb();
-		if (!list_empty(&conn->c_send_queue) &&
+		if ((test_bit(0, &conn->c_map_queued) ||
+		     !list_empty(&conn->c_send_queue)) &&
 		    send_gen == conn->c_send_gen) {
 			rds_stats_inc(s_send_lock_queue_raced);
 			goto restart;
@@ -769,8 +778,22 @@
 	while (!list_empty(&list)) {
 		rm = list_entry(list.next, struct rds_message, m_sock_item);
 		list_del_init(&rm->m_sock_item);
-
 		rds_message_wait(rm);
+
+		/* just in case the code above skipped this message
+		 * because RDS_MSG_ON_CONN wasn't set, run it again here
+		 * taking m_rs_lock is the only thing that keeps us
+		 * from racing with ack processing.
+		 */
+		spin_lock_irqsave(&rm->m_rs_lock, flags);
+
+		spin_lock(&rs->rs_lock);
+		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
+		spin_unlock(&rs->rs_lock);
+
+		rm->m_rs = NULL;
+		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
 		rds_message_put(rm);
 	}
 }
@@ -992,6 +1015,11 @@
 		goto out;
 	}
 
+	if (payload_len > rds_sk_sndbuf(rs)) {
+		ret = -EMSGSIZE;
+		goto out;
+	}
+
 	/* size of rm including all sgs */
 	ret = rds_rm_size(msg, payload_len);
 	if (ret < 0)
@@ -1064,11 +1092,7 @@
 	while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
 				  dport, &queued)) {
 		rds_stats_inc(s_send_queue_full);
-		/* XXX make sure this is reasonable */
-		if (payload_len > rds_sk_sndbuf(rs)) {
-			ret = -EMSGSIZE;
-			goto out;
-		}
+
 		if (nonblock) {
 			ret = -EAGAIN;
 			goto out;