libceph: replace message data pointer with list

Replace the single message data pointer in struct ceph_msg with a
list head that links together message data items.  For now, only a
single entry on that list is supported.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3aa0f30..8bfe7d3 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -985,8 +985,10 @@
 static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
 {
 	struct ceph_msg_data_cursor *cursor = &msg->cursor;
+	struct ceph_msg_data *data;
 
-	cursor->data = msg->data;
+	data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+	cursor->data = data;
 	switch (cursor->data->type) {
 	case CEPH_MSG_DATA_PAGELIST:
 		ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1410,7 +1412,7 @@
 
 	dout("%s %p msg %p\n", __func__, con, msg);
 
-	if (WARN_ON(!msg->data))
+	if (list_empty(&msg->data))
 		return -EINVAL;
 
 	/*
@@ -2111,7 +2113,7 @@
 	int ret;
 
 	BUG_ON(!msg);
-	if (!msg->data)
+	if (list_empty(&msg->data))
 		return -EIO;
 
 	if (do_datacrc)
@@ -2963,6 +2965,7 @@
 	data = kzalloc(sizeof (*data), GFP_NOFS);
 	if (data)
 		data->type = type;
+	INIT_LIST_HEAD(&data->links);
 
 	return data;
 }
@@ -2972,6 +2975,7 @@
 	if (!data)
 		return;
 
+	WARN_ON(!list_empty(&data->links));
 	if (data->type == CEPH_MSG_DATA_PAGELIST) {
 		ceph_pagelist_release(data->pagelist);
 		kfree(data->pagelist);
@@ -2987,7 +2991,7 @@
 	BUG_ON(!pages);
 	BUG_ON(!length);
 	BUG_ON(msg->data_length);
-	BUG_ON(msg->data != NULL);
+	BUG_ON(!list_empty(&msg->data));
 
 	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
 	BUG_ON(!data);
@@ -2995,8 +2999,9 @@
 	data->length = length;
 	data->alignment = alignment & ~PAGE_MASK;
 
-	msg->data = data;
-	msg->data_length = length;
+	BUG_ON(!list_empty(&msg->data));
+	list_add_tail(&data->links, &msg->data);
+	msg->data_length += length;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pages);
 
@@ -3008,14 +3013,14 @@
 	BUG_ON(!pagelist);
 	BUG_ON(!pagelist->length);
 	BUG_ON(msg->data_length);
-	BUG_ON(msg->data != NULL);
+	BUG_ON(!list_empty(&msg->data));
 
 	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
 	BUG_ON(!data);
 	data->pagelist = pagelist;
 
-	msg->data = data;
-	msg->data_length = pagelist->length;
+	list_add_tail(&data->links, &msg->data);
+	msg->data_length += pagelist->length;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
 
@@ -3027,15 +3032,15 @@
 
 	BUG_ON(!bio);
 	BUG_ON(msg->data_length);
-	BUG_ON(msg->data != NULL);
+	BUG_ON(!list_empty(&msg->data));
 
 	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
 	BUG_ON(!data);
 	data->bio = bio;
 	data->bio_length = length;
 
-	msg->data = data;
-	msg->data_length = length;
+	list_add_tail(&data->links, &msg->data);
+	msg->data_length += length;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_bio);
 #endif	/* CONFIG_BLOCK */
@@ -3059,6 +3064,7 @@
 
 	INIT_LIST_HEAD(&m->list_head);
 	kref_init(&m->kref);
+	INIT_LIST_HEAD(&m->data);
 
 	/* front */
 	m->front_max = front_len;
@@ -3204,6 +3210,9 @@
 void ceph_msg_last_put(struct kref *kref)
 {
 	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+	LIST_HEAD(data);
+	struct list_head *links;
+	struct list_head *next;
 
 	dout("ceph_msg_put last one on %p\n", m);
 	WARN_ON(!list_empty(&m->list_head));
@@ -3213,8 +3222,15 @@
 		ceph_buffer_put(m->middle);
 		m->middle = NULL;
 	}
-	ceph_msg_data_destroy(m->data);
-	m->data = NULL;
+
+	list_splice_init(&m->data, &data);
+	list_for_each_safe(links, next, &data) {
+		struct ceph_msg_data *data;
+
+		data = list_entry(links, struct ceph_msg_data, links);
+		list_del_init(links);
+		ceph_msg_data_destroy(data);
+	}
 	m->data_length = 0;
 
 	if (m->pool)
@@ -3227,7 +3243,7 @@
 void ceph_msg_dump(struct ceph_msg *msg)
 {
 	pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
-		 msg->front_max, msg->data->length);
+		 msg->front_max, msg->data_length);
 	print_hex_dump(KERN_DEBUG, "header: ",
 		       DUMP_PREFIX_OFFSET, 16, 1,
 		       &msg->hdr, sizeof(msg->hdr), true);