RDS: Implement atomic operations

Implement a CMSG-based interface for the fetch-and-add (FADD) and
compare-and-swap (CSWP) atomic operations.

Alter send routines to handle atomic ops.

Add per-type atomic operation counters (s_atomic_cswp, s_atomic_fadd) to stats.

Add an xmit_atomic() callback to struct rds_transport.

Inline rds_ib_send_unmap_rdma into unmap_rm

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 0bb4957..830e2bb 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -97,6 +97,7 @@
 	unsigned int		c_xmit_hdr_off;
 	unsigned int		c_xmit_data_off;
 	unsigned int		c_xmit_rdma_sent;
+	unsigned int		c_xmit_atomic_sent;
 
 	spinlock_t		c_lock;		/* protect msg queues */
 	u64			c_next_tx_seq;
@@ -260,6 +261,10 @@
 	return cookie >> 32;
 }
 
+/* atomic operation types */
+#define RDS_ATOMIC_TYPE_CSWP		0
+#define RDS_ATOMIC_TYPE_FADD		1
+
 /*
  * m_sock_item and m_conn_item are on lists that are serialized under
  * conn->c_lock.  m_sock_item has additional meaning in that once it is empty
@@ -315,11 +320,27 @@
 	struct rds_sock		*m_rs;
 	rds_rdma_cookie_t	m_rdma_cookie;
 	struct {
-		struct {
+		struct rm_atomic_op {
+			int			op_type;
+			uint64_t		op_swap_add;
+			uint64_t		op_compare;
+
+			u32			op_rkey;
+			u64			op_remote_addr;
+			unsigned int		op_notify:1;
+			unsigned int		op_recverr:1;
+			unsigned int		op_mapped:1;
+			unsigned int		op_active:1;
+			struct rds_notifier	*op_notifier;
+			struct scatterlist	*op_sg;
+
+			struct rds_mr		*op_rdma_mr;
+		} atomic;
+		struct rm_rdma_op {
 			struct rds_rdma_op	m_rdma_op;
 			struct rds_mr		*m_rdma_mr;
 		} rdma;
-		struct {
+		struct rm_data_op {
 			unsigned int		m_nents;
 			unsigned int		m_count;
 			struct scatterlist	*m_sg;
@@ -397,6 +418,7 @@
 	int (*xmit_cong_map)(struct rds_connection *conn,
 			     struct rds_cong_map *map, unsigned long offset);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op);
+	int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
 	int (*recv)(struct rds_connection *conn);
 	int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
 				size_t size);
@@ -546,6 +568,8 @@
 	uint64_t	s_cong_update_received;
 	uint64_t	s_cong_send_error;
 	uint64_t	s_cong_send_blocked;
+	uint64_t	s_atomic_cswp;
+	uint64_t	s_atomic_fadd;
 };
 
 /* af_rds.c */
@@ -722,7 +746,10 @@
 int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 			  struct cmsghdr *cmsg);
 void rds_rdma_free_op(struct rds_rdma_op *ro);
-void rds_rdma_send_complete(struct rds_message *rm, int);
+void rds_rdma_send_complete(struct rds_message *rm, int wc_status);
+void rds_atomic_send_complete(struct rds_message *rm, int wc_status);
+int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
+		    struct cmsghdr *cmsg);
 
 extern void __rds_put_mr_final(struct rds_mr *mr);
 static inline void rds_mr_put(struct rds_mr *mr)