libceph, rbd: new bio handling code (aka don't clone bios)

The reason we clone bios is to be able to give each object request
(and consequently each ceph_osd_data/ceph_msg_data item) its own
pointer to a (list of) bio(s).  The messenger then initializes its
cursor with the cloned bio's ->bi_iter, so it knows where to start
reading from/writing to.  That's all the cloned bios are used for: to
determine each object request's starting position in the provided data
buffer.

Introduce ceph_bio_iter to do exactly that -- store the position within
the bio list (i.e. a pointer to the bio) plus the position within that
bio (i.e. a bvec_iter).

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8a4d375..b9fa8b8 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -839,90 +839,57 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
 					size_t length)
 {
 	struct ceph_msg_data *data = cursor->data;
-	struct bio *bio;
+	struct ceph_bio_iter *it = &cursor->bio_iter;
 
-	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+	cursor->resid = min_t(size_t, length, data->bio_length);
+	*it = data->bio_pos;
+	if (cursor->resid < it->iter.bi_size)
+		it->iter.bi_size = cursor->resid;
 
-	bio = data->bio;
-	BUG_ON(!bio);
-
-	cursor->resid = min(length, data->bio_length);
-	cursor->bio = bio;
-	cursor->bvec_iter = bio->bi_iter;
-	cursor->last_piece =
-		cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
+	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 }
 
 static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
 						size_t *page_offset,
 						size_t *length)
 {
-	struct ceph_msg_data *data = cursor->data;
-	struct bio *bio;
-	struct bio_vec bio_vec;
+	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
+					   cursor->bio_iter.iter);
 
-	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
-
-	bio = cursor->bio;
-	BUG_ON(!bio);
-
-	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
-
-	*page_offset = (size_t) bio_vec.bv_offset;
-	BUG_ON(*page_offset >= PAGE_SIZE);
-	if (cursor->last_piece) /* pagelist offset is always 0 */
-		*length = cursor->resid;
-	else
-		*length = (size_t) bio_vec.bv_len;
-	BUG_ON(*length > cursor->resid);
-	BUG_ON(*page_offset + *length > PAGE_SIZE);
-
-	return bio_vec.bv_page;
+	*page_offset = bv.bv_offset;
+	*length = bv.bv_len;
+	return bv.bv_page;
 }
 
 static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
 					size_t bytes)
 {
-	struct bio *bio;
-	struct bio_vec bio_vec;
+	struct ceph_bio_iter *it = &cursor->bio_iter;
 
-	BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
-
-	bio = cursor->bio;
-	BUG_ON(!bio);
-
-	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
-
-	/* Advance the cursor offset */
-
-	BUG_ON(cursor->resid < bytes);
+	BUG_ON(bytes > cursor->resid);
+	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
 	cursor->resid -= bytes;
+	bio_advance_iter(it->bio, &it->iter, bytes);
 
-	bio_advance_iter(bio, &cursor->bvec_iter, bytes);
+	if (!cursor->resid) {
+		BUG_ON(!cursor->last_piece);
+		return false;   /* no more data */
+	}
 
-	if (bytes < bio_vec.bv_len)
+	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done))
 		return false;	/* more bytes to process in this segment */
 
-	/* Move on to the next segment, and possibly the next bio */
-
-	if (!cursor->bvec_iter.bi_size) {
-		bio = bio->bi_next;
-		cursor->bio = bio;
-		if (bio)
-			cursor->bvec_iter = bio->bi_iter;
-		else
-			memset(&cursor->bvec_iter, 0,
-			       sizeof(cursor->bvec_iter));
+	if (!it->iter.bi_size) {
+		it->bio = it->bio->bi_next;
+		it->iter = it->bio->bi_iter;
+		if (cursor->resid < it->iter.bi_size)
+			it->iter.bi_size = cursor->resid;
 	}
 
-	if (!cursor->last_piece) {
-		BUG_ON(!cursor->resid);
-		BUG_ON(!bio);
-		/* A short read is OK, so use <= rather than == */
-		if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
-			cursor->last_piece = true;
-	}
-
+	BUG_ON(cursor->last_piece);
+	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 	return true;
 }
 #endif /* CONFIG_BLOCK */
@@ -1163,9 +1130,11 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
 		page = NULL;
 		break;
 	}
+
 	BUG_ON(!page);
 	BUG_ON(*page_offset + *length > PAGE_SIZE);
 	BUG_ON(!*length);
+	BUG_ON(*length > cursor->resid);
 	if (last_piece)
 		*last_piece = cursor->last_piece;
 
@@ -3262,16 +3231,14 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
 
 #ifdef	CONFIG_BLOCK
-void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-		size_t length)
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+			   u32 length)
 {
 	struct ceph_msg_data *data;
 
-	BUG_ON(!bio);
-
 	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
 	BUG_ON(!data);
-	data->bio = bio;
+	data->bio_pos = *bio_pos;
 	data->bio_length = length;
 
 	list_add_tail(&data->links, &msg->data);