Btrfs: Optimize compressed writeback and reads

When reading compressed extents, try to put pages into the page cache
for any pages covered by the compressed extent that readpages didn't already
preload.

Add an async work queue to handle transformations at delayed allocation processing
time.  Right now this is just compression.  The workflow is:

1) Find offsets in the file marked for delayed allocation
2) Lock the pages
3) Lock the state bits
4) Call the async delalloc code

The async delalloc code clears the state lock bits and delalloc bits.  It is
important this happens before the range goes into the work queue because
otherwise it might deadlock with other work queue items that try to lock
those extent bits.

The file pages are compressed, and if the compression doesn't work the
pages are written back directly.

An ordered work queue is used to make sure the inodes are written in the same
order that pdflush or writepages sent them down.

This changes extent_write_cache_pages to let the writepage function
update the wbc nr_written count.

Signed-off-by: Chris Mason <chris.mason@oracle.com>

diff --git a/fs/btrfs/extent_io.c b/fs/btrfs/extent_io.c
index 9b37ce6..bbe3bcf 100644
--- a/fs/btrfs/extent_io.c
+++ b/fs/btrfs/extent_io.c
@@ -47,6 +47,11 @@
 	struct bio *bio;
 	struct extent_io_tree *tree;
 	get_extent_t *get_extent;
+
+	/* tells writepage not to lock the state bits for this range
+	 * it still does the unlocking
+	 */
+	int extent_locked;
 };
 
 int __init extent_io_init(void)
@@ -1198,11 +1203,18 @@
 			 * the caller is taking responsibility for
 			 * locked_page
 			 */
-			if (pages[i] != locked_page)
+			if (pages[i] != locked_page) {
 				lock_page(pages[i]);
+				if (pages[i]->mapping != inode->i_mapping) {
+					ret = -EAGAIN;
+					unlock_page(pages[i]);
+					page_cache_release(pages[i]);
+					goto done;
+				}
+			}
 			page_cache_release(pages[i]);
+			pages_locked++;
 		}
-		pages_locked += ret;
 		nrpages -= ret;
 		index += ret;
 		cond_resched();
@@ -1262,8 +1274,7 @@
 	 * if we're looping.
 	 */
 	if (delalloc_end + 1 - delalloc_start > max_bytes && loops) {
-		delalloc_end = (delalloc_start + PAGE_CACHE_SIZE - 1) &
-			~((u64)PAGE_CACHE_SIZE - 1);
+		delalloc_end = delalloc_start + PAGE_CACHE_SIZE - 1;
 	}
 	/* step two, lock all the pages after the page that has start */
 	ret = lock_delalloc_pages(inode, locked_page,
@@ -1306,7 +1317,10 @@
 int extent_clear_unlock_delalloc(struct inode *inode,
 				struct extent_io_tree *tree,
 				u64 start, u64 end, struct page *locked_page,
-				int clear_dirty, int set_writeback,
+				int unlock_pages,
+				int clear_unlock,
+				int clear_delalloc, int clear_dirty,
+				int set_writeback,
 				int end_writeback)
 {
 	int ret;
@@ -1315,12 +1329,19 @@
 	unsigned long end_index = end >> PAGE_CACHE_SHIFT;
 	unsigned long nr_pages = end_index - index + 1;
 	int i;
-	int clear_bits = EXTENT_LOCKED | EXTENT_DELALLOC;
+	int clear_bits = 0;
 
+	if (clear_unlock)
+		clear_bits |= EXTENT_LOCKED;
 	if (clear_dirty)
 		clear_bits |= EXTENT_DIRTY;
 
+	if (clear_delalloc)
+		clear_bits |= EXTENT_DELALLOC;
+
 	clear_extent_bit(tree, start, end, clear_bits, 1, 0, GFP_NOFS);
+	if (!(unlock_pages || clear_dirty || set_writeback || end_writeback))
+		return 0;
 
 	while(nr_pages > 0) {
 		ret = find_get_pages_contig(inode->i_mapping, index,
@@ -1336,7 +1357,8 @@
 				set_page_writeback(pages[i]);
 			if (end_writeback)
 				end_page_writeback(pages[i]);
-			unlock_page(pages[i]);
+			if (unlock_pages)
+				unlock_page(pages[i]);
 			page_cache_release(pages[i]);
 		}
 		nr_pages -= ret;
@@ -1741,9 +1763,10 @@
 			}
 		}
 
-		if (uptodate)
+		if (uptodate) {
 			set_extent_uptodate(tree, start, end,
 					    GFP_ATOMIC);
+		}
 		unlock_extent(tree, start, end, GFP_ATOMIC);
 
 		if (whole_page) {
@@ -1925,6 +1948,7 @@
 		set_page_private(page, EXTENT_PAGE_PRIVATE);
 	}
 }
+EXPORT_SYMBOL(set_page_extent_mapped);
 
 void set_page_extent_head(struct page *page, unsigned long len)
 {
@@ -2143,12 +2167,17 @@
 	u64 delalloc_end;
 	int page_started;
 	int compressed;
+	unsigned long nr_written = 0;
 
 	WARN_ON(!PageLocked(page));
 	pg_offset = i_size & (PAGE_CACHE_SIZE - 1);
 	if (page->index > end_index ||
 	   (page->index == end_index && !pg_offset)) {
-		page->mapping->a_ops->invalidatepage(page, 0);
+		if (epd->extent_locked) {
+			if (tree->ops && tree->ops->writepage_end_io_hook)
+				tree->ops->writepage_end_io_hook(page, start,
+							 page_end, NULL, 1);
+		}
 		unlock_page(page);
 		return 0;
 	}
@@ -2169,27 +2198,33 @@
 	delalloc_start = start;
 	delalloc_end = 0;
 	page_started = 0;
-	while(delalloc_end < page_end) {
-		nr_delalloc = find_lock_delalloc_range(inode, tree,
+	if (!epd->extent_locked) {
+		while(delalloc_end < page_end) {
+			nr_delalloc = find_lock_delalloc_range(inode, tree,
 						       page,
 						       &delalloc_start,
 						       &delalloc_end,
 						       128 * 1024 * 1024);
-		if (nr_delalloc == 0) {
+			if (nr_delalloc == 0) {
+				delalloc_start = delalloc_end + 1;
+				continue;
+			}
+			tree->ops->fill_delalloc(inode, page, delalloc_start,
+						 delalloc_end, &page_started,
+						 &nr_written);
 			delalloc_start = delalloc_end + 1;
-			continue;
 		}
-		tree->ops->fill_delalloc(inode, page, delalloc_start,
-					 delalloc_end, &page_started);
-		delalloc_start = delalloc_end + 1;
-	}
 
-	/* did the fill delalloc function already unlock and start the IO? */
-	if (page_started) {
-		return 0;
+		/* did the fill delalloc function already unlock and start
+		 * the IO?
+		 */
+		if (page_started) {
+			ret = 0;
+			goto update_nr_written;
+		}
 	}
-
 	lock_extent(tree, start, page_end, GFP_NOFS);
+
 	unlock_start = start;
 
 	if (tree->ops && tree->ops->writepage_start_hook) {
@@ -2199,10 +2234,13 @@
 			unlock_extent(tree, start, page_end, GFP_NOFS);
 			redirty_page_for_writepage(wbc, page);
 			unlock_page(page);
-			return 0;
+			ret = 0;
+			goto update_nr_written;
 		}
 	}
 
+	nr_written++;
+
 	end = page_end;
 	if (test_range_bit(tree, start, page_end, EXTENT_DELALLOC, 0)) {
 		printk("found delalloc bits after lock_extent\n");
@@ -2333,6 +2371,12 @@
 	if (unlock_start <= page_end)
 		unlock_extent(tree, unlock_start, page_end, GFP_NOFS);
 	unlock_page(page);
+
+update_nr_written:
+	wbc->nr_to_write -= nr_written;
+	if (wbc->range_cyclic || (wbc->nr_to_write > 0 &&
+	    wbc->range_start == 0 && wbc->range_end == LLONG_MAX))
+		page->mapping->writeback_index = page->index + nr_written;
 	return 0;
 }
 
@@ -2431,7 +2475,7 @@
 				unlock_page(page);
 				ret = 0;
 			}
-			if (ret || (--(wbc->nr_to_write) <= 0))
+			if (ret || wbc->nr_to_write <= 0)
 				done = 1;
 			if (wbc->nonblocking && bdi_write_congested(bdi)) {
 				wbc->encountered_congestion = 1;
@@ -2452,6 +2496,8 @@
 	}
 	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
 		mapping->writeback_index = index;
+		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
+			range_whole = 1;
 
 	if (wbc->range_cont)
 		wbc->range_start = index << PAGE_CACHE_SHIFT;
@@ -2469,6 +2515,7 @@
 		.bio = NULL,
 		.tree = tree,
 		.get_extent = get_extent,
+		.extent_locked = 0,
 	};
 	struct writeback_control wbc_writepages = {
 		.bdi		= wbc->bdi,
@@ -2491,6 +2538,52 @@
 }
 EXPORT_SYMBOL(extent_write_full_page);
 
+int extent_write_locked_range(struct extent_io_tree *tree, struct inode *inode,
+			      u64 start, u64 end, get_extent_t *get_extent,
+			      int mode)
+{
+	int ret = 0;
+	struct address_space *mapping = inode->i_mapping;
+	struct page *page;
+	unsigned long nr_pages = (end - start + PAGE_CACHE_SIZE) >>
+		PAGE_CACHE_SHIFT;
+
+	struct extent_page_data epd = {
+		.bio = NULL,
+		.tree = tree,
+		.get_extent = get_extent,
+		.extent_locked = 1,
+	};
+	struct writeback_control wbc_writepages = {
+		.bdi		= inode->i_mapping->backing_dev_info,
+		.sync_mode	= mode,
+		.older_than_this = NULL,
+		.nr_to_write	= nr_pages * 2,
+		.range_start	= start,
+		.range_end	= end + 1,
+	};
+
+	while(start <= end) {
+		page = find_get_page(mapping, start >> PAGE_CACHE_SHIFT);
+		if (clear_page_dirty_for_io(page))
+			ret = __extent_writepage(page, &wbc_writepages, &epd);
+		else {
+			if (tree->ops && tree->ops->writepage_end_io_hook)
+				tree->ops->writepage_end_io_hook(page, start,
+						 start + PAGE_CACHE_SIZE - 1,
+						 NULL, 1);
+			unlock_page(page);
+		}
+		page_cache_release(page);
+		start += PAGE_CACHE_SIZE;
+	}
+
+	if (epd.bio)
+		submit_one_bio(WRITE, epd.bio, 0, 0);
+	return ret;
+}
+EXPORT_SYMBOL(extent_write_locked_range);
+
 
 int extent_writepages(struct extent_io_tree *tree,
 		      struct address_space *mapping,
@@ -2502,6 +2595,7 @@
 		.bio = NULL,
 		.tree = tree,
 		.get_extent = get_extent,
+		.extent_locked = 0,
 	};
 
 	ret = extent_write_cache_pages(tree, mapping, wbc,