RDS/IB: track signaled sends

We're seeing bugs today where IB connection shutdown clears the send
ring while the tasklet is processing completed sends.  Implementation
details cause this to dereference a null pointer.  Shutdown needs to
wait for send completion to stop before tearing down the connection.  We
can't simply wait for the ring to empty because it may contain
unsignaled sends that will never be processed.

This patch tracks the number of signaled sends that we've posted and
waits for them to complete.  It also makes sure that the tasklet has
finished executing.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
diff --git a/net/rds/ib.h b/net/rds/ib.h
index acda2db..a13ced5 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -108,6 +108,7 @@
 	struct rds_header	*i_send_hdrs;
 	u64			i_send_hdrs_dma;
 	struct rds_ib_send_work *i_sends;
+	atomic_t		i_signaled_sends;
 
 	/* rx */
 	struct tasklet_struct	i_recv_tasklet;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 10f6a88..123c7d3 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -615,11 +615,18 @@
 		}
 
 		/*
-		 * Don't wait for the send ring to be empty -- there may be completed
-		 * non-signaled entries sitting on there. We unmap these below.
+		 * We want to wait for tx and rx completion to finish
+		 * before we tear down the connection, but we have to be
+		 * careful not to get stuck waiting on a send ring that
+		 * only has unsignaled sends in it.  We've shutdown new
+		 * sends before getting here so by waiting for signaled
+		 * sends to complete we're ensured that there will be no
+		 * more tx processing.
 		 */
 		wait_event(rds_ib_ring_empty_wait,
-			rds_ib_ring_empty(&ic->i_recv_ring));
+			   rds_ib_ring_empty(&ic->i_recv_ring) &&
+			   (atomic_read(&ic->i_signaled_sends) == 0));
+		tasklet_kill(&ic->i_recv_tasklet);
 
 		if (ic->i_send_hdrs)
 			ib_dma_free_coherent(dev,
@@ -729,6 +736,7 @@
 #ifndef KERNEL_HAS_ATOMIC64
 	spin_lock_init(&ic->i_ack_lock);
 #endif
+	atomic_set(&ic->i_signaled_sends, 0);
 
 	/*
 	 * rds_ib_conn_shutdown() waits for these to be emptied so they
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e88cb4a..15f7569 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -220,6 +220,18 @@
 }
 
 /*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+	if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+	    waitqueue_active(&rds_ib_ring_empty_wait))
+		wake_up(&rds_ib_ring_empty_wait);
+	BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
+/*
  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
  * operations performed in the send path.  As the sender allocs and potentially
  * unallocs the next free entry in the ring it doesn't alter which is
@@ -236,6 +248,7 @@
 	u32 oldest;
 	u32 i = 0;
 	int ret;
+	int nr_sig = 0;
 
 	rdsdebug("cq %p conn %p\n", cq, conn);
 	rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@
 
 		for (i = 0; i < completed; i++) {
 			send = &ic->i_sends[oldest];
+			if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+				nr_sig++;
 
 			rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
@@ -282,6 +297,8 @@
 		}
 
 		rds_ib_ring_free(&ic->i_send_ring, completed);
+		rds_ib_sub_signaled(ic, nr_sig);
+		nr_sig = 0;
 
 		if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
 		    test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@
 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
-					      struct rds_ib_send_work *send,
-					      bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+					     struct rds_ib_send_work *send,
+					     bool notify)
 {
 	/*
 	 * We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@
 	if (ic->i_unsignaled_wrs-- == 0 || notify) {
 		ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
 		send->s_wr.send_flags |= IB_SEND_SIGNALED;
+		return 1;
 	}
+	return 0;
 }
 
 /*
@@ -488,6 +507,7 @@
 	int bytes_sent = 0;
 	int ret;
 	int flow_controlled = 0;
+	int nr_sig = 0;
 
 	BUG_ON(off % RDS_FRAG_SIZE);
 	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@
 		if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
 			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 
+		if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+			nr_sig++;
+
 		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 			 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
@@ -689,6 +712,9 @@
 	if (ic->i_flowctl && i < credit_alloc)
 		rds_ib_send_add_credits(conn, credit_alloc - i);
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	/* XXX need to worry about failed_wr and partial sends. */
 	failed_wr = &first->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		if (prev->s_op) {
 			ic->i_data_op = prev->s_op;
 			prev->s_op = NULL;
@@ -728,6 +755,7 @@
 	u32 pos;
 	u32 work_alloc;
 	int ret;
+	int nr_sig = 0;
 
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
@@ -752,7 +780,7 @@
 		send->s_wr.wr.atomic.compare_add = op->op_swap_add;
 		send->s_wr.wr.atomic.swap = 0;
 	}
-	rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+	nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 	send->s_wr.num_sge = 1;
 	send->s_wr.next = NULL;
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@
 	rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
 		 send->s_sge[0].addr, send->s_sge[0].length);
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	failed_wr = &send->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
 	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@
 		printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		goto out;
 	}
 
@@ -817,6 +849,7 @@
 	int sent;
 	int ret;
 	int num_sge;
+	int nr_sig = 0;
 
 	/* map the op the first time we see it */
 	if (!op->op_mapped) {
@@ -859,7 +892,7 @@
 		send->s_queued = jiffies;
 		send->s_op = NULL;
 
-		rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+		nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
 		send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
 		send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@
 		work_alloc = i;
 	}
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	failed_wr = &first->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
 	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@
 		printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		goto out;
 	}