RDS: purge atomic resources too in rds_message_purge()

Add atomic_free_op function, analogous to rdma_free_op,
and call it in rds_message_purge().

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/message.c b/net/rds/message.c
index a27e493..b53306c 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -73,6 +73,11 @@
 		rds_rdma_free_op(&rm->rdma.m_rdma_op);
 	if (rm->rdma.m_rdma_mr)
 		rds_mr_put(rm->rdma.m_rdma_mr);
+
+	if (rm->atomic.op_active)
+		rds_atomic_free_op(&rm->atomic);
+	if (rm->atomic.op_rdma_mr)
+		rds_mr_put(rm->atomic.op_rdma_mr);
 }
 
 void rds_message_put(struct rds_message *rm)
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 91967c8..0df86a3 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -462,6 +462,22 @@
 	ro->r_active = 0;
 }
 
+void rds_atomic_free_op(struct rm_atomic_op *ao)
+{
+	struct page *page = sg_page(ao->op_sg);
+
+	/* Mark page dirty if it was possibly modified, which
+	 * is the case for a RDMA_READ which copies from remote
+	 * to local memory */
+	set_page_dirty(page);
+	put_page(page);
+
+	kfree(ao->op_notifier);
+	ao->op_notifier = NULL;
+	ao->op_active = 0;
+}
+
+
 /*
  * Count the number of pages needed to describe an incoming iovec.
  */
diff --git a/net/rds/rds.h b/net/rds/rds.h
index bf2349da..32b3d46 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -745,6 +745,7 @@
 int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 			  struct cmsghdr *cmsg);
 void rds_rdma_free_op(struct rds_rdma_op *ro);
+void rds_atomic_free_op(struct rm_atomic_op *ao);
 void rds_rdma_send_complete(struct rds_message *rm, int wc_status);
 void rds_atomic_send_complete(struct rds_message *rm, int wc_status);
 int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,