RDS: split out connection specific state from rds_connection to rds_conn_path

In preparation for multipath RDS, split the rds_connection
structure into a base structure and a per-path struct rds_conn_path.
The base structure tracks information and locks common to all
paths. The work items for send/recv/shutdown etc. are tracked per
rds_conn_path, so the work callbacks now operate on rds_conn_path.

This commit allows for one rds_conn_path per rds_connection; this will
be extended to multiple conn_paths in subsequent commits.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 387df5f..ca31a07 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -84,56 +84,69 @@
 #define RDS_IN_XMIT		2
 #define RDS_RECV_REFILL		3
 
+/* Max number of multipaths per RDS connection. Must be a power of 2 */
+#define	RDS_MPATH_WORKERS	1
+
+/* Per mpath connection state */
+struct rds_conn_path {
+	struct rds_connection	*cp_conn;
+	struct rds_message	*cp_xmit_rm;
+	unsigned long		cp_xmit_sg;
+	unsigned int		cp_xmit_hdr_off;
+	unsigned int		cp_xmit_data_off;
+	unsigned int		cp_xmit_atomic_sent;
+	unsigned int		cp_xmit_rdma_sent;
+	unsigned int		cp_xmit_data_sent;
+
+	spinlock_t		cp_lock;		/* protect msg queues */
+	u64			cp_next_tx_seq;
+	struct list_head	cp_send_queue;
+	struct list_head	cp_retrans;
+
+	u64			cp_next_rx_seq;
+
+	void			*cp_transport_data;
+
+	atomic_t		cp_state;
+	unsigned long		cp_send_gen;
+	unsigned long		cp_flags;
+	unsigned long		cp_reconnect_jiffies;
+	struct delayed_work	cp_send_w;
+	struct delayed_work	cp_recv_w;
+	struct delayed_work	cp_conn_w;
+	struct work_struct	cp_down_w;
+	struct mutex		cp_cm_lock;	/* protect cp_state & cm */
+	wait_queue_head_t	cp_waitq;
+
+	unsigned int		cp_unacked_packets;
+	unsigned int		cp_unacked_bytes;
+	unsigned int		cp_outgoing:1,
+				cp_pad_to_32:31;
+	unsigned int		cp_index;
+};
+
+/* One rds_connection per RDS address pair */
 struct rds_connection {
 	struct hlist_node	c_hash_node;
 	__be32			c_laddr;
 	__be32			c_faddr;
 	unsigned int		c_loopback:1,
-				c_outgoing:1,
-				c_pad_to_32:30;
+				c_pad_to_32:31;
+	int			c_npaths;
 	struct rds_connection	*c_passive;
+	struct rds_transport	*c_trans;
 
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;
 
-	struct rds_message	*c_xmit_rm;
-	unsigned long		c_xmit_sg;
-	unsigned int		c_xmit_hdr_off;
-	unsigned int		c_xmit_data_off;
-	unsigned int		c_xmit_atomic_sent;
-	unsigned int		c_xmit_rdma_sent;
-	unsigned int		c_xmit_data_sent;
-
-	spinlock_t		c_lock;		/* protect msg queues */
-	u64			c_next_tx_seq;
-	struct list_head	c_send_queue;
-	struct list_head	c_retrans;
-
-	u64			c_next_rx_seq;
-
-	struct rds_transport	*c_trans;
-	void			*c_transport_data;
-
-	atomic_t		c_state;
-	unsigned long		c_send_gen;
-	unsigned long		c_flags;
-	unsigned long		c_reconnect_jiffies;
-	struct delayed_work	c_send_w;
-	struct delayed_work	c_recv_w;
-	struct delayed_work	c_conn_w;
-	struct work_struct	c_down_w;
-	struct mutex		c_cm_lock;	/* protect conn state & cm */
-	wait_queue_head_t	c_waitq;
+	/* Protocol version */
+	unsigned int		c_version;
+	possible_net_t		c_net;
 
 	struct list_head	c_map_item;
 	unsigned long		c_map_queued;
 
-	unsigned int		c_unacked_packets;
-	unsigned int		c_unacked_bytes;
-
-	/* Protocol version */
-	unsigned int		c_version;
-	possible_net_t		c_net;
+	struct rds_conn_path	c_path[RDS_MPATH_WORKERS];
 };
 
 static inline
@@ -639,6 +652,7 @@
 void rds_conn_shutdown(struct rds_connection *conn);
 void rds_conn_destroy(struct rds_connection *conn);
 void rds_conn_drop(struct rds_connection *conn);
+void rds_conn_path_drop(struct rds_conn_path *cpath);
 void rds_conn_connect_if_down(struct rds_connection *conn);
 void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 			  struct rds_info_iterator *iter,
@@ -651,27 +665,51 @@
 	__rds_conn_error(conn, KERN_WARNING "RDS: " fmt)
 
 static inline int
+rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
+{
+	return atomic_cmpxchg(&cp->cp_state, old, new) == old;
+}
+
+static inline int
 rds_conn_transition(struct rds_connection *conn, int old, int new)
 {
-	return atomic_cmpxchg(&conn->c_state, old, new) == old;
+	return rds_conn_path_transition(&conn->c_path[0], old, new);
+}
+
+static inline int
+rds_conn_path_state(struct rds_conn_path *cp)
+{
+	return atomic_read(&cp->cp_state);
 }
 
 static inline int
 rds_conn_state(struct rds_connection *conn)
 {
-	return atomic_read(&conn->c_state);
+	return rds_conn_path_state(&conn->c_path[0]);
+}
+
+static inline int
+rds_conn_path_up(struct rds_conn_path *cp)
+{
+	return atomic_read(&cp->cp_state) == RDS_CONN_UP;
 }
 
 static inline int
 rds_conn_up(struct rds_connection *conn)
 {
-	return atomic_read(&conn->c_state) == RDS_CONN_UP;
+	return rds_conn_path_up(&conn->c_path[0]);
+}
+
+static inline int
+rds_conn_path_connecting(struct rds_conn_path *cp)
+{
+	return atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING;
 }
 
 static inline int
 rds_conn_connecting(struct rds_connection *conn)
 {
-	return atomic_read(&conn->c_state) == RDS_CONN_CONNECTING;
+	return rds_conn_path_connecting(&conn->c_path[0]);
 }
 
 /* message.c */
@@ -809,12 +847,12 @@
 int rds_threads_init(void);
 void rds_threads_exit(void);
 extern struct workqueue_struct *rds_wq;
-void rds_queue_reconnect(struct rds_connection *conn);
+void rds_queue_reconnect(struct rds_conn_path *cp);
 void rds_connect_worker(struct work_struct *);
 void rds_shutdown_worker(struct work_struct *);
 void rds_send_worker(struct work_struct *);
 void rds_recv_worker(struct work_struct *);
-void rds_connect_path_complete(struct rds_connection *conn, int curr);
+void rds_connect_path_complete(struct rds_conn_path *conn, int curr);
 void rds_connect_complete(struct rds_connection *conn);
 
 /* transport.c */