dm thin: defer whole cells rather than individual bios

Deferring the whole cell avoids dropping it, which increases the
probability that other bios will collect within the cell rather than
being passed individually to the worker.

Also add the required process_cell and process_discard_cell error-handling
wrappers, and set the associated pool-mode function pointers accordingly.

Signed-off-by: Joe Thornber <ejt@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
diff --git a/drivers/md/dm-thin.c b/drivers/md/dm-thin.c
index 5256271..912d7f4 100644
--- a/drivers/md/dm-thin.c
+++ b/drivers/md/dm-thin.c
@@ -202,6 +202,7 @@
 
 struct thin_c;
 typedef void (*process_bio_fn)(struct thin_c *tc, struct bio *bio);
+typedef void (*process_cell_fn)(struct thin_c *tc, struct dm_bio_prison_cell *cell);
 typedef void (*process_mapping_fn)(struct dm_thin_new_mapping *m);
 
 struct pool {
@@ -246,6 +247,9 @@
 	process_bio_fn process_bio;
 	process_bio_fn process_discard;
 
+	process_cell_fn process_cell;
+	process_cell_fn process_discard_cell;
+
 	process_mapping_fn process_prepared_mapping;
 	process_mapping_fn process_prepared_discard;
 };
@@ -282,6 +286,7 @@
 	struct dm_thin_device *td;
 	bool requeue_mode:1;
 	spinlock_t lock;
+	struct list_head deferred_cells;
 	struct bio_list deferred_bio_list;
 	struct bio_list retry_on_resume_list;
 	struct rb_root sort_bio_list; /* sorted list of deferred bios */
@@ -346,19 +351,6 @@
 	dm_bio_prison_free_cell(pool->prison, cell);
 }
 
-static void cell_defer_no_holder_no_free(struct thin_c *tc,
-					 struct dm_bio_prison_cell *cell)
-{
-	struct pool *pool = tc->pool;
-	unsigned long flags;
-
-	spin_lock_irqsave(&tc->lock, flags);
-	dm_cell_release_no_holder(pool->prison, cell, &tc->deferred_bio_list);
-	spin_unlock_irqrestore(&tc->lock, flags);
-
-	wake_worker(pool);
-}
-
 static void cell_error_with_code(struct pool *pool,
 				 struct dm_bio_prison_cell *cell, int error_code)
 {
@@ -371,6 +363,16 @@
 	cell_error_with_code(pool, cell, -EIO);
 }
 
+static void cell_success(struct pool *pool, struct dm_bio_prison_cell *cell)
+{
+	cell_error_with_code(pool, cell, 0);
+}
+
+static void cell_requeue(struct pool *pool, struct dm_bio_prison_cell *cell)
+{
+	cell_error_with_code(pool, cell, DM_ENDIO_REQUEUE);
+}
+
 /*----------------------------------------------------------------*/
 
 /*
@@ -458,10 +460,28 @@
 		bio_endio(bio, DM_ENDIO_REQUEUE);
 }
 
+static void requeue_deferred_cells(struct thin_c *tc)
+{
+	struct pool *pool = tc->pool;
+	unsigned long flags;
+	struct list_head cells;
+	struct dm_bio_prison_cell *cell, *tmp;
+
+	INIT_LIST_HEAD(&cells);
+
+	spin_lock_irqsave(&tc->lock, flags);
+	list_splice_init(&tc->deferred_cells, &cells);
+	spin_unlock_irqrestore(&tc->lock, flags);
+
+	list_for_each_entry_safe(cell, tmp, &cells, user_list)
+		cell_requeue(pool, cell);
+}
+
 static void requeue_io(struct thin_c *tc)
 {
 	requeue_bio_list(tc, &tc->deferred_bio_list);
 	requeue_bio_list(tc, &tc->retry_on_resume_list);
+	requeue_deferred_cells(tc);
 }
 
 static void error_thin_retry_list(struct thin_c *tc)
@@ -706,6 +726,28 @@
 	wake_worker(pool);
 }
 
+static void thin_defer_bio(struct thin_c *tc, struct bio *bio);
+
+static void inc_remap_and_issue_cell(struct thin_c *tc,
+				     struct dm_bio_prison_cell *cell,
+				     dm_block_t block)
+{
+	struct bio *bio;
+	struct bio_list bios;
+
+	bio_list_init(&bios);
+	cell_release_no_holder(tc->pool, cell, &bios);
+
+	while ((bio = bio_list_pop(&bios))) {
+		if (bio->bi_rw & (REQ_DISCARD | REQ_FLUSH | REQ_FUA))
+			thin_defer_bio(tc, bio);
+		else {
+			inc_all_io_entry(tc->pool, bio);
+			remap_and_issue(tc, bio, block);
+		}
+	}
+}
+
 static void process_prepared_mapping_fail(struct dm_thin_new_mapping *m)
 {
 	if (m->bio) {
@@ -1193,19 +1235,21 @@
 			retry_on_resume(bio);
 }
 
-static void process_discard(struct thin_c *tc, struct bio *bio)
+static void process_discard_cell(struct thin_c *tc, struct dm_bio_prison_cell *cell)
 {
 	int r;
+	struct bio *bio = cell->holder;
 	struct pool *pool = tc->pool;
-	struct dm_bio_prison_cell *cell, *cell2;
-	struct dm_cell_key key, key2;
+	struct dm_bio_prison_cell *cell2;
+	struct dm_cell_key key2;
 	dm_block_t block = get_bio_block(tc, bio);
 	struct dm_thin_lookup_result lookup_result;
 	struct dm_thin_new_mapping *m;
 
-	build_virtual_key(tc->td, block, &key);
-	if (bio_detain(tc->pool, &key, bio, &cell))
+	if (tc->requeue_mode) {
+		cell_requeue(pool, cell);
 		return;
+	}
 
 	r = dm_thin_find_block(tc->td, block, 1, &lookup_result);
 	switch (r) {
@@ -1273,6 +1317,19 @@
 	}
 }
 
+static void process_discard_bio(struct thin_c *tc, struct bio *bio)
+{
+	struct dm_bio_prison_cell *cell;
+	struct dm_cell_key key;
+	dm_block_t block = get_bio_block(tc, bio);
+
+	build_virtual_key(tc->td, block, &key);
+	if (bio_detain(tc->pool, &key, bio, &cell))
+		return;
+
+	process_discard_cell(tc, cell);
+}
+
 static void break_sharing(struct thin_c *tc, struct bio *bio, dm_block_t block,
 			  struct dm_cell_key *key,
 			  struct dm_thin_lookup_result *lookup_result,
@@ -1379,34 +1436,30 @@
 	}
 }
 
-static void process_bio(struct thin_c *tc, struct bio *bio)
+static void process_cell(struct thin_c *tc, struct dm_bio_prison_cell *cell)
 {
 	int r;
 	struct pool *pool = tc->pool;
+	struct bio *bio = cell->holder;
 	dm_block_t block = get_bio_block(tc, bio);
-	struct dm_bio_prison_cell *cell;
-	struct dm_cell_key key;
 	struct dm_thin_lookup_result lookup_result;
 
-	/*
-	 * If cell is already occupied, then the block is already
-	 * being provisioned so we have nothing further to do here.
-	 */
-	build_virtual_key(tc->td, block, &key);
-	if (bio_detain(pool, &key, bio, &cell))
+	if (tc->requeue_mode) {
+		cell_requeue(pool, cell);
 		return;
+	}
 
 	r = dm_thin_find_block(tc->td, block, 1, &lookup_result);
 	switch (r) {
 	case 0:
 		if (lookup_result.shared) {
 			process_shared_bio(tc, bio, block, &lookup_result);
+			/* FIXME: we can't remap because we're waiting on a commit */
 			cell_defer_no_holder(tc, cell); /* FIXME: pass this cell into process_shared? */
 		} else {
 			inc_all_io_entry(pool, bio);
-			cell_defer_no_holder(tc, cell);
-
 			remap_and_issue(tc, bio, lookup_result.block);
+			inc_remap_and_issue_cell(tc, cell, lookup_result.block);
 		}
 		break;
 
@@ -1440,7 +1493,26 @@
 	}
 }
 
-static void process_bio_read_only(struct thin_c *tc, struct bio *bio)
+static void process_bio(struct thin_c *tc, struct bio *bio)
+{
+	struct pool *pool = tc->pool;
+	dm_block_t block = get_bio_block(tc, bio);
+	struct dm_bio_prison_cell *cell;
+	struct dm_cell_key key;
+
+	/*
+	 * If cell is already occupied, then the block is already
+	 * being provisioned so we have nothing further to do here.
+	 */
+	build_virtual_key(tc->td, block, &key);
+	if (bio_detain(pool, &key, bio, &cell))
+		return;
+
+	process_cell(tc, cell);
+}
+
+static void __process_bio_read_only(struct thin_c *tc, struct bio *bio,
+				    struct dm_bio_prison_cell *cell)
 {
 	int r;
 	int rw = bio_data_dir(bio);
@@ -1450,15 +1522,21 @@
 	r = dm_thin_find_block(tc->td, block, 1, &lookup_result);
 	switch (r) {
 	case 0:
-		if (lookup_result.shared && (rw == WRITE) && bio->bi_iter.bi_size)
+		if (lookup_result.shared && (rw == WRITE) && bio->bi_iter.bi_size) {
 			handle_unserviceable_bio(tc->pool, bio);
-		else {
+			if (cell)
+				cell_defer_no_holder(tc, cell);
+		} else {
 			inc_all_io_entry(tc->pool, bio);
 			remap_and_issue(tc, bio, lookup_result.block);
+			if (cell)
+				inc_remap_and_issue_cell(tc, cell, lookup_result.block);
 		}
 		break;
 
 	case -ENODATA:
+		if (cell)
+			cell_defer_no_holder(tc, cell);
 		if (rw != READ) {
 			handle_unserviceable_bio(tc->pool, bio);
 			break;
@@ -1477,11 +1555,23 @@
 	default:
 		DMERR_LIMIT("%s: dm_thin_find_block() failed: error = %d",
 			    __func__, r);
+		if (cell)
+			cell_defer_no_holder(tc, cell);
 		bio_io_error(bio);
 		break;
 	}
 }
 
+static void process_bio_read_only(struct thin_c *tc, struct bio *bio)
+{
+	__process_bio_read_only(tc, bio, NULL);
+}
+
+static void process_cell_read_only(struct thin_c *tc, struct dm_bio_prison_cell *cell)
+{
+	__process_bio_read_only(tc, cell->holder, cell);
+}
+
 static void process_bio_success(struct thin_c *tc, struct bio *bio)
 {
 	bio_endio(bio, 0);
@@ -1492,6 +1582,16 @@
 	bio_io_error(bio);
 }
 
+static void process_cell_success(struct thin_c *tc, struct dm_bio_prison_cell *cell)
+{
+	cell_success(tc->pool, cell);
+}
+
+static void process_cell_fail(struct thin_c *tc, struct dm_bio_prison_cell *cell)
+{
+	cell_error(tc->pool, cell);
+}
+
 /*
  * FIXME: should we also commit due to size of transaction, measured in
  * metadata blocks?
@@ -1624,6 +1724,45 @@
 	blk_finish_plug(&plug);
 }
 
+static void process_thin_deferred_cells(struct thin_c *tc)
+{
+	struct pool *pool = tc->pool;
+	unsigned long flags;
+	struct list_head cells;
+	struct dm_bio_prison_cell *cell, *tmp;
+
+	INIT_LIST_HEAD(&cells);
+
+	spin_lock_irqsave(&tc->lock, flags);
+	list_splice_init(&tc->deferred_cells, &cells);
+	spin_unlock_irqrestore(&tc->lock, flags);
+
+	if (list_empty(&cells))
+		return;
+
+	list_for_each_entry_safe(cell, tmp, &cells, user_list) {
+		BUG_ON(!cell->holder);
+
+		/*
+		 * If we've got no free new_mapping structs, and processing
+		 * this cell might require one, we pause until there are some
+		 * prepared mappings to process.
+		 */
+		if (ensure_next_mapping(pool)) {
+			spin_lock_irqsave(&tc->lock, flags);
+			list_add(&cell->user_list, &tc->deferred_cells);
+			list_splice(&cells, &tc->deferred_cells);
+			spin_unlock_irqrestore(&tc->lock, flags);
+			break;
+		}
+
+		if (cell->holder->bi_rw & REQ_DISCARD)
+			pool->process_discard_cell(tc, cell);
+		else
+			pool->process_cell(tc, cell);
+	}
+}
+
 static void thin_get(struct thin_c *tc);
 static void thin_put(struct thin_c *tc);
 
@@ -1672,6 +1811,7 @@
 
 	tc = get_first_thin(pool);
 	while (tc) {
+		process_thin_deferred_cells(tc);
 		process_thin_deferred_bios(tc);
 		tc = get_next_thin(pool, tc);
 	}
@@ -1850,6 +1990,8 @@
 		dm_pool_metadata_read_only(pool->pmd);
 		pool->process_bio = process_bio_fail;
 		pool->process_discard = process_bio_fail;
+		pool->process_cell = process_cell_fail;
+		pool->process_discard_cell = process_cell_fail;
 		pool->process_prepared_mapping = process_prepared_mapping_fail;
 		pool->process_prepared_discard = process_prepared_discard_fail;
 
@@ -1862,6 +2004,8 @@
 		dm_pool_metadata_read_only(pool->pmd);
 		pool->process_bio = process_bio_read_only;
 		pool->process_discard = process_bio_success;
+		pool->process_cell = process_cell_read_only;
+		pool->process_discard_cell = process_cell_success;
 		pool->process_prepared_mapping = process_prepared_mapping_fail;
 		pool->process_prepared_discard = process_prepared_discard_passdown;
 
@@ -1880,7 +2024,9 @@
 		if (old_mode != new_mode)
 			notify_of_pool_mode_change(pool, "out-of-data-space");
 		pool->process_bio = process_bio_read_only;
-		pool->process_discard = process_discard;
+		pool->process_discard = process_discard_bio;
+		pool->process_cell = process_cell_read_only;
+		pool->process_discard_cell = process_discard_cell;
 		pool->process_prepared_mapping = process_prepared_mapping;
 		pool->process_prepared_discard = process_prepared_discard_passdown;
 
@@ -1893,7 +2039,9 @@
 			notify_of_pool_mode_change(pool, "write");
 		dm_pool_metadata_read_write(pool->pmd);
 		pool->process_bio = process_bio;
-		pool->process_discard = process_discard;
+		pool->process_discard = process_discard_bio;
+		pool->process_cell = process_cell;
+		pool->process_discard_cell = process_discard_cell;
 		pool->process_prepared_mapping = process_prepared_mapping;
 		pool->process_prepared_discard = process_prepared_discard;
 		break;
@@ -1962,6 +2110,20 @@
 	throttle_unlock(&pool->throttle);
 }
 
+static void thin_defer_cell(struct thin_c *tc, struct dm_bio_prison_cell *cell)
+{
+	unsigned long flags;
+	struct pool *pool = tc->pool;
+
+	throttle_lock(&pool->throttle);
+	spin_lock_irqsave(&tc->lock, flags);
+	list_add_tail(&cell->user_list, &tc->deferred_cells);
+	spin_unlock_irqrestore(&tc->lock, flags);
+	throttle_unlock(&pool->throttle);
+
+	wake_worker(pool);
+}
+
 static void thin_hook_bio(struct thin_c *tc, struct bio *bio)
 {
 	struct dm_thin_endio_hook *h = dm_per_bio_data(bio, sizeof(struct dm_thin_endio_hook));
@@ -1982,8 +2144,7 @@
 	dm_block_t block = get_bio_block(tc, bio);
 	struct dm_thin_device *td = tc->td;
 	struct dm_thin_lookup_result result;
-	struct dm_bio_prison_cell cell1, cell2;
-	struct dm_bio_prison_cell *cell_result;
+	struct dm_bio_prison_cell *virt_cell, *data_cell;
 	struct dm_cell_key key;
 
 	thin_hook_bio(tc, bio);
@@ -2008,7 +2169,7 @@
 	 * there's a race with discard.
 	 */
 	build_virtual_key(tc->td, block, &key);
-	if (dm_bio_detain(tc->pool->prison, &key, bio, &cell1, &cell_result))
+	if (bio_detain(tc->pool, &key, bio, &virt_cell))
 		return DM_MAPIO_SUBMITTED;
 
 	r = dm_thin_find_block(td, block, 0, &result);
@@ -2033,20 +2194,19 @@
 			 * More distant ancestors are irrelevant. The
 			 * shared flag will be set in their case.
 			 */
-			thin_defer_bio(tc, bio);
-			cell_defer_no_holder_no_free(tc, &cell1);
+			thin_defer_cell(tc, virt_cell);
 			return DM_MAPIO_SUBMITTED;
 		}
 
 		build_data_key(tc->td, result.block, &key);
-		if (dm_bio_detain(tc->pool->prison, &key, bio, &cell2, &cell_result)) {
-			cell_defer_no_holder_no_free(tc, &cell1);
+		if (bio_detain(tc->pool, &key, bio, &data_cell)) {
+			cell_defer_no_holder(tc, virt_cell);
 			return DM_MAPIO_SUBMITTED;
 		}
 
 		inc_all_io_entry(tc->pool, bio);
-		cell_defer_no_holder_no_free(tc, &cell2);
-		cell_defer_no_holder_no_free(tc, &cell1);
+		cell_defer_no_holder(tc, data_cell);
+		cell_defer_no_holder(tc, virt_cell);
 
 		remap(tc, bio, result.block);
 		return DM_MAPIO_REMAPPED;
@@ -2058,14 +2218,13 @@
 			 * of doing so.
 			 */
 			handle_unserviceable_bio(tc->pool, bio);
-			cell_defer_no_holder_no_free(tc, &cell1);
+			cell_defer_no_holder(tc, virt_cell);
 			return DM_MAPIO_SUBMITTED;
 		}
 		/* fall through */
 
 	case -EWOULDBLOCK:
-		thin_defer_bio(tc, bio);
-		cell_defer_no_holder_no_free(tc, &cell1);
+		thin_defer_cell(tc, virt_cell);
 		return DM_MAPIO_SUBMITTED;
 
 	default:
@@ -2075,7 +2234,7 @@
 		 * pool is switched to fail-io mode.
 		 */
 		bio_io_error(bio);
-		cell_defer_no_holder_no_free(tc, &cell1);
+		cell_defer_no_holder(tc, virt_cell);
 		return DM_MAPIO_SUBMITTED;
 	}
 }
@@ -3394,6 +3553,7 @@
 		goto out_unlock;
 	}
 	spin_lock_init(&tc->lock);
+	INIT_LIST_HEAD(&tc->deferred_cells);
 	bio_list_init(&tc->deferred_bio_list);
 	bio_list_init(&tc->retry_on_resume_list);
 	tc->sort_bio_list = RB_ROOT;