rds: use RCU to synchronize work-enqueue with connection teardown

rds_sendmsg() can enqueue work on cp_send_w from process context, but
it should not enqueue this work if connection teardown has commenced
(else we risk enqueuing work after rds_conn_path_destroy() has assumed
that all work has been cancelled/flushed).

Similarly, some other functions like rds_cong_queue_updates() and
rds_tcp_data_ready() are called in softirq context, and may end up
enqueuing work on rds_wq after rds_conn_path_destroy() has assumed
that all workqueues are quiesced.

Check the RDS_DESTROY_PENDING bit and use RCU synchronization to avoid
all these races.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 1eed197e..b10c0ef 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
 	 * to the conn hash, so we never trigger a reconnect on this
 	 * conn - the reconnect is always triggered by the active peer. */
 	cancel_delayed_work_sync(&cp->cp_conn_w);
-	if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
-		return;
 	rcu_read_lock();
 	if (!hlist_unhashed(&conn->c_hash_node)) {
 		rcu_read_unlock();
@@ -390,6 +388,7 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
 		return;
 
 	/* make sure lingering queued work won't try to ref the conn */
+	synchronize_rcu();
 	cancel_delayed_work_sync(&cp->cp_send_w);
 	cancel_delayed_work_sync(&cp->cp_recv_w);
 
@@ -407,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
 	if (cp->cp_xmit_rm)
 		rds_message_put(cp->cp_xmit_rm);
 
+	WARN_ON(delayed_work_pending(&cp->cp_send_w));
+	WARN_ON(delayed_work_pending(&cp->cp_recv_w));
+	WARN_ON(delayed_work_pending(&cp->cp_conn_w));
+	WARN_ON(work_pending(&cp->cp_down_w));
+
 	cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
 }
 
@@ -686,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 {
 	atomic_set(&cp->cp_state, RDS_CONN_ERROR);
 
-	if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+	rcu_read_lock();
+	if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+		rcu_read_unlock();
 		return;
-
+	}
 	queue_work(rds_wq, &cp->cp_down_w);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_drop);
 
@@ -706,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
  */
 void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 {
+	rcu_read_lock();
+	if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+		rcu_read_unlock();
+		return;
+	}
 	if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
 	    !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
 		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);