ceph: support ceph_pagelist for message payload
The ceph_pagelist is a simple list of whole pages, strung together via
their lru list_head. It makes it easy to encode into a "buffer" whose
final size is not known in advance. Allow its use in place of the
ceph_msg page vector when sending a message payload.
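This will be used to fix the huge buffer preallocation woes of MDS
reconnection.
While here, tone down the "wrong peer" report: log it with pr_warning
instead of pr_err and reword the connection's error_msg.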
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 68052f6..c1106e8 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -13,6 +13,7 @@
#include "super.h"
#include "messenger.h"
#include "decode.h"
+#include "pagelist.h"
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
@@ -728,6 +729,11 @@
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
+ } else if (msg->pagelist) {
+ page = list_first_entry(&msg->pagelist->head,
+ struct page, lru);
+ if (crc)
+ kaddr = kmap(page);
} else {
page = con->msgr->zero_page;
if (crc)
@@ -750,7 +756,7 @@
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && msg->pages)
+ if (crc && (msg->pages || msg->pagelist))
kunmap(page);
if (ret <= 0)
@@ -762,6 +768,9 @@
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.page++;
con->out_msg_pos.did_page_crc = 0;
+ if (msg->pagelist)
+ list_move_tail(&page->lru,
+ &msg->pagelist->head);
}
}
@@ -1051,13 +1060,13 @@
&con->actual_peer_addr) &&
!(addr_is_blank(&con->actual_peer_addr.in_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
- pr_err("wrong peer, want %s/%d, "
- "got %s/%d, wtf\n",
+ pr_warning("wrong peer, want %s/%d, "
+ "got %s/%d\n",
pr_addr(&con->peer_addr.in_addr),
con->peer_addr.nonce,
pr_addr(&con->actual_peer_addr.in_addr),
con->actual_peer_addr.nonce);
- con->error_msg = "protocol error, wrong peer";
+ con->error_msg = "wrong peer at address";
return -1;
}
@@ -2096,6 +2105,7 @@
/* data */
m->nr_pages = calc_pages_for(page_off, page_len);
m->pages = pages;
+ m->pagelist = NULL;
dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
m->nr_pages);
@@ -2181,6 +2191,12 @@
m->nr_pages = 0;
m->pages = NULL;
+ if (m->pagelist) {
+ ceph_pagelist_release(m->pagelist);
+ kfree(m->pagelist);
+ m->pagelist = NULL;
+ }
+
if (m->pool)
ceph_msgpool_put(m->pool, m);
else