Btrfs: attach delayed ref updates to delayed ref heads

Currently we have two rb-trees: one for the delayed ref heads and one for all
of the delayed refs, including the delayed ref heads.  When we process the
delayed refs we have to hold the delayed ref lock across all of the selecting
and merging, which results in quite a bit of lock contention.  This was
previously dealt with by having a waitqueue and allowing only one flusher at a
time; however, that hurts when a lot of delayed refs are queued up.

So instead just have an rb tree for the delayed ref heads, and attach each
head's delayed ref updates to an rb tree that is per delayed ref head.  Then
we only need to take the delayed ref lock when adding new delayed refs and
when selecting a delayed ref head to process; the rest of the time we deal
with a per delayed ref head lock, which is much less contentious.
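
As a rough sketch of the new shape (abridged, and the field comments are
mine; the real structures carry more members than shown here):

	struct btrfs_delayed_ref_root {
		struct rb_root href_root;  /* delayed ref heads, by bytenr */
		spinlock_t lock;           /* guards href_root + insertion */
		atomic_t num_entries;      /* ref count across all heads */
	};

	struct btrfs_delayed_ref_head {
		struct rb_root ref_root;   /* this head's ref updates */
		spinlock_t lock;           /* the new per-head lock */
		unsigned int processing;   /* set while a flusher owns it */
	};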

The locking rules for this get a little more complicated since we have to
take up to three locks (the delayed ref root's spinlock, the head's mutex,
and the head's own spinlock) to properly process delayed refs, but I will
address that problem later.  For now this passes all of xfstests and my
overnight stress tests.
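
To make the ordering concrete, the per-head path in __btrfs_run_delayed_refs
now goes roughly like this (a simplified sketch, not the verbatim code):

	spin_lock(&delayed_refs->lock);
	locked_ref = btrfs_select_ref_head(trans);
	btrfs_delayed_ref_lock(trans, locked_ref);  /* takes head->mutex */
	spin_unlock(&delayed_refs->lock);

	spin_lock(&locked_ref->lock);
	/* merge and select a ref out of locked_ref->ref_root */
	spin_unlock(&locked_ref->lock);

	/* run_one_delayed_ref() then runs with only head->mutex held */
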
Thanks,

Signed-off-by: Josef Bacik <jbacik@fb.com>
Signed-off-by: Chris Mason <clm@fb.com>
diff --git a/fs/btrfs/extent-tree.c b/fs/btrfs/extent-tree.c
index 77acc08..c77156c 100644
--- a/fs/btrfs/extent-tree.c
+++ b/fs/btrfs/extent-tree.c
@@ -857,12 +857,14 @@
 			btrfs_put_delayed_ref(&head->node);
 			goto search_again;
 		}
+		spin_lock(&head->lock);
 		if (head->extent_op && head->extent_op->update_flags)
 			extent_flags |= head->extent_op->flags_to_set;
 		else
 			BUG_ON(num_refs == 0);
 
 		num_refs += head->node.ref_mod;
+		spin_unlock(&head->lock);
 		mutex_unlock(&head->mutex);
 	}
 	spin_unlock(&delayed_refs->lock);
@@ -2287,40 +2289,33 @@
 select_delayed_ref(struct btrfs_delayed_ref_head *head)
 {
 	struct rb_node *node;
-	struct btrfs_delayed_ref_node *ref;
-	int action = BTRFS_ADD_DELAYED_REF;
-again:
+	struct btrfs_delayed_ref_node *ref, *last = NULL;
+
 	/*
 	 * Select delayed ref of type BTRFS_ADD_DELAYED_REF first.
 	 * This prevents the ref count from going down to zero when
 	 * there are still pending delayed refs.
 	 */
-	node = rb_prev(&head->node.rb_node);
-	while (1) {
-		if (!node)
-			break;
+	node = rb_first(&head->ref_root);
+	while (node) {
 		ref = rb_entry(node, struct btrfs_delayed_ref_node,
 				rb_node);
-		if (ref->bytenr != head->node.bytenr)
-			break;
-		if (ref->action == action)
+		if (ref->action == BTRFS_ADD_DELAYED_REF)
 			return ref;
-		node = rb_prev(node);
+		else if (last == NULL)
+			last = ref;
+		node = rb_next(node);
 	}
-	if (action == BTRFS_ADD_DELAYED_REF) {
-		action = BTRFS_DROP_DELAYED_REF;
-		goto again;
-	}
-	return NULL;
+	return last;
 }
 
 /*
  * Returns 0 on success or if called with an already aborted transaction.
  * Returns -ENOMEM or -EIO on failure and will abort the transaction.
  */
-static noinline int run_clustered_refs(struct btrfs_trans_handle *trans,
-				       struct btrfs_root *root,
-				       struct list_head *cluster)
+static noinline int __btrfs_run_delayed_refs(struct btrfs_trans_handle *trans,
+					     struct btrfs_root *root,
+					     unsigned long nr)
 {
 	struct btrfs_delayed_ref_root *delayed_refs;
 	struct btrfs_delayed_ref_node *ref;
@@ -2328,23 +2323,26 @@
 	struct btrfs_delayed_extent_op *extent_op;
 	struct btrfs_fs_info *fs_info = root->fs_info;
 	int ret;
-	int count = 0;
+	unsigned long count = 0;
 	int must_insert_reserved = 0;
 
 	delayed_refs = &trans->transaction->delayed_refs;
 	while (1) {
 		if (!locked_ref) {
-			/* pick a new head ref from the cluster list */
-			if (list_empty(cluster))
+			if (count >= nr)
 				break;
 
-			locked_ref = list_entry(cluster->next,
-				     struct btrfs_delayed_ref_head, cluster);
+			spin_lock(&delayed_refs->lock);
+			locked_ref = btrfs_select_ref_head(trans);
+			if (!locked_ref) {
+				spin_unlock(&delayed_refs->lock);
+				break;
+			}
 
 			/* grab the lock that says we are going to process
 			 * all the refs for this head */
 			ret = btrfs_delayed_ref_lock(trans, locked_ref);
-
+			spin_unlock(&delayed_refs->lock);
 			/*
 			 * we may have dropped the spin lock to get the head
 			 * mutex lock, and that might have given someone else
@@ -2365,6 +2363,7 @@
 		 * finish.  If we merged anything we need to re-loop so we can
 		 * get a good ref.
 		 */
+		spin_lock(&locked_ref->lock);
 		btrfs_merge_delayed_refs(trans, fs_info, delayed_refs,
 					 locked_ref);
 
@@ -2376,17 +2375,14 @@
 
 		if (ref && ref->seq &&
 		    btrfs_check_delayed_seq(fs_info, delayed_refs, ref->seq)) {
-			/*
-			 * there are still refs with lower seq numbers in the
-			 * process of being added. Don't run this ref yet.
-			 */
-			list_del_init(&locked_ref->cluster);
+			spin_unlock(&locked_ref->lock);
 			btrfs_delayed_ref_unlock(locked_ref);
-			locked_ref = NULL;
+			spin_lock(&delayed_refs->lock);
+			locked_ref->processing = 0;
 			delayed_refs->num_heads_ready++;
 			spin_unlock(&delayed_refs->lock);
+			locked_ref = NULL;
 			cond_resched();
-			spin_lock(&delayed_refs->lock);
 			continue;
 		}
 
@@ -2401,6 +2397,8 @@
 		locked_ref->extent_op = NULL;
 
 		if (!ref) {
 			/* All delayed refs have been processed.  Go ahead
 			 * and send the head node to run_one_delayed_ref,
 			 * so that any accounting fixes can happen
@@ -2413,8 +2411,7 @@
 			}
 
 			if (extent_op) {
-				spin_unlock(&delayed_refs->lock);
-
+				spin_unlock(&locked_ref->lock);
 				ret = run_delayed_extent_op(trans, root,
 							    ref, extent_op);
 				btrfs_free_delayed_extent_op(extent_op);
@@ -2428,23 +2425,38 @@
 					 */
 					if (must_insert_reserved)
 						locked_ref->must_insert_reserved = 1;
+					locked_ref->processing = 0;
 					btrfs_debug(fs_info, "run_delayed_extent_op returned %d", ret);
-					spin_lock(&delayed_refs->lock);
 					btrfs_delayed_ref_unlock(locked_ref);
 					return ret;
 				}
-
-				goto next;
+				continue;
 			}
-		}
 
-		ref->in_tree = 0;
-		rb_erase(&ref->rb_node, &delayed_refs->root);
-		if (btrfs_delayed_ref_is_head(ref)) {
+			/*
+			 * Need to drop our head ref lock and re-acquire the
+			 * delayed ref lock and then re-check to make sure
+			 * nobody got added.
+			 */
+			spin_unlock(&locked_ref->lock);
+			spin_lock(&delayed_refs->lock);
+			spin_lock(&locked_ref->lock);
+			if (rb_first(&locked_ref->ref_root)) {
+				spin_unlock(&locked_ref->lock);
+				spin_unlock(&delayed_refs->lock);
+				continue;
+			}
+			ref->in_tree = 0;
+			delayed_refs->num_heads--;
 			rb_erase(&locked_ref->href_node,
 				 &delayed_refs->href_root);
+			spin_unlock(&delayed_refs->lock);
+		} else {
+			ref->in_tree = 0;
+			rb_erase(&ref->rb_node, &locked_ref->ref_root);
 		}
-		delayed_refs->num_entries--;
+		atomic_dec(&delayed_refs->num_entries);
+
 		if (!btrfs_delayed_ref_is_head(ref)) {
 			/*
 			 * when we play the delayed ref, also correct the
@@ -2461,20 +2473,18 @@
 			default:
 				WARN_ON(1);
 			}
-		} else {
-			list_del_init(&locked_ref->cluster);
 		}
-		spin_unlock(&delayed_refs->lock);
+		spin_unlock(&locked_ref->lock);
 
 		ret = run_one_delayed_ref(trans, root, ref, extent_op,
 					  must_insert_reserved);
 
 		btrfs_free_delayed_extent_op(extent_op);
 		if (ret) {
+			locked_ref->processing = 0;
 			btrfs_delayed_ref_unlock(locked_ref);
 			btrfs_put_delayed_ref(ref);
 			btrfs_debug(fs_info, "run_one_delayed_ref returned %d", ret);
-			spin_lock(&delayed_refs->lock);
 			return ret;
 		}
 
@@ -2490,11 +2500,9 @@
 		}
 		btrfs_put_delayed_ref(ref);
 		count++;
-next:
 		cond_resched();
-		spin_lock(&delayed_refs->lock);
 	}
-	return count;
+	return 0;
 }
 
 #ifdef SCRAMBLE_DELAYED_REFS
@@ -2576,16 +2584,6 @@
 	return ret;
 }
 
-static int refs_newer(struct btrfs_delayed_ref_root *delayed_refs, int seq,
-		      int count)
-{
-	int val = atomic_read(&delayed_refs->ref_seq);
-
-	if (val < seq || val >= seq + count)
-		return 1;
-	return 0;
-}
-
 static inline u64 heads_to_leaves(struct btrfs_root *root, u64 heads)
 {
 	u64 num_bytes;
@@ -2647,12 +2645,9 @@
 	struct rb_node *node;
 	struct btrfs_delayed_ref_root *delayed_refs;
 	struct btrfs_delayed_ref_head *head;
-	struct list_head cluster;
 	int ret;
-	u64 delayed_start;
 	int run_all = count == (unsigned long)-1;
 	int run_most = 0;
-	int loops;
 
 	/* We'll clean this up in btrfs_cleanup_transaction */
 	if (trans->aborted)
@@ -2664,121 +2659,31 @@
 	btrfs_delayed_refs_qgroup_accounting(trans, root->fs_info);
 
 	delayed_refs = &trans->transaction->delayed_refs;
-	INIT_LIST_HEAD(&cluster);
 	if (count == 0) {
-		count = delayed_refs->num_entries * 2;
+		count = atomic_read(&delayed_refs->num_entries) * 2;
 		run_most = 1;
 	}
 
-	if (!run_all && !run_most) {
-		int old;
-		int seq = atomic_read(&delayed_refs->ref_seq);
-
-progress:
-		old = atomic_cmpxchg(&delayed_refs->procs_running_refs, 0, 1);
-		if (old) {
-			DEFINE_WAIT(__wait);
-			if (delayed_refs->flushing ||
-			    !btrfs_should_throttle_delayed_refs(trans, root))
-				return 0;
-
-			prepare_to_wait(&delayed_refs->wait, &__wait,
-					TASK_UNINTERRUPTIBLE);
-
-			old = atomic_cmpxchg(&delayed_refs->procs_running_refs, 0, 1);
-			if (old) {
-				schedule();
-				finish_wait(&delayed_refs->wait, &__wait);
-
-				if (!refs_newer(delayed_refs, seq, 256))
-					goto progress;
-				else
-					return 0;
-			} else {
-				finish_wait(&delayed_refs->wait, &__wait);
-				goto again;
-			}
-		}
-
-	} else {
-		atomic_inc(&delayed_refs->procs_running_refs);
-	}
-
 again:
-	loops = 0;
-	spin_lock(&delayed_refs->lock);
-
 #ifdef SCRAMBLE_DELAYED_REFS
 	delayed_refs->run_delayed_start = find_middle(&delayed_refs->root);
 #endif
-
-	while (1) {
-		if (!(run_all || run_most) &&
-		    !btrfs_should_throttle_delayed_refs(trans, root))
-			break;
-
-		/*
-		 * go find something we can process in the rbtree.  We start at
-		 * the beginning of the tree, and then build a cluster
-		 * of refs to process starting at the first one we are able to
-		 * lock
-		 */
-		delayed_start = delayed_refs->run_delayed_start;
-		ret = btrfs_find_ref_cluster(trans, &cluster,
-					     delayed_refs->run_delayed_start);
-		if (ret)
-			break;
-
-		ret = run_clustered_refs(trans, root, &cluster);
-		if (ret < 0) {
-			btrfs_release_ref_cluster(&cluster);
-			spin_unlock(&delayed_refs->lock);
-			btrfs_abort_transaction(trans, root, ret);
-			atomic_dec(&delayed_refs->procs_running_refs);
-			wake_up(&delayed_refs->wait);
-			return ret;
-		}
-
-		atomic_add(ret, &delayed_refs->ref_seq);
-
-		count -= min_t(unsigned long, ret, count);
-
-		if (count == 0)
-			break;
-
-		if (delayed_start >= delayed_refs->run_delayed_start) {
-			if (loops == 0) {
-				/*
-				 * btrfs_find_ref_cluster looped. let's do one
-				 * more cycle. if we don't run any delayed ref
-				 * during that cycle (because we can't because
-				 * all of them are blocked), bail out.
-				 */
-				loops = 1;
-			} else {
-				/*
-				 * no runnable refs left, stop trying
-				 */
-				BUG_ON(run_all);
-				break;
-			}
-		}
-		if (ret) {
-			/* refs were run, let's reset staleness detection */
-			loops = 0;
-		}
+	ret = __btrfs_run_delayed_refs(trans, root, count);
+	if (ret < 0) {
+		btrfs_abort_transaction(trans, root, ret);
+		return ret;
 	}
 
 	if (run_all) {
-		if (!list_empty(&trans->new_bgs)) {
-			spin_unlock(&delayed_refs->lock);
+		if (!list_empty(&trans->new_bgs))
 			btrfs_create_pending_block_groups(trans, root);
-			spin_lock(&delayed_refs->lock);
-		}
 
+		spin_lock(&delayed_refs->lock);
 		node = rb_first(&delayed_refs->href_root);
-		if (!node)
+		if (!node) {
+			spin_unlock(&delayed_refs->lock);
 			goto out;
+		}
 		count = (unsigned long)-1;
 
 		while (node) {
@@ -2807,16 +2712,10 @@
 			node = rb_next(node);
 		}
 		spin_unlock(&delayed_refs->lock);
-		schedule_timeout(1);
+		cond_resched();
 		goto again;
 	}
 out:
-	atomic_dec(&delayed_refs->procs_running_refs);
-	smp_mb();
-	if (waitqueue_active(&delayed_refs->wait))
-		wake_up(&delayed_refs->wait);
-
-	spin_unlock(&delayed_refs->lock);
 	assert_qgroups_uptodate(trans);
 	return 0;
 }
@@ -2858,12 +2757,13 @@
 	struct rb_node *node;
 	int ret = 0;
 
-	ret = -ENOENT;
 	delayed_refs = &trans->transaction->delayed_refs;
 	spin_lock(&delayed_refs->lock);
 	head = btrfs_find_delayed_ref_head(trans, bytenr);
-	if (!head)
-		goto out;
+	if (!head) {
+		spin_unlock(&delayed_refs->lock);
+		return 0;
+	}
 
 	if (!mutex_trylock(&head->mutex)) {
 		atomic_inc(&head->node.refs);
@@ -2880,40 +2780,35 @@
 		btrfs_put_delayed_ref(&head->node);
 		return -EAGAIN;
 	}
-
-	node = rb_prev(&head->node.rb_node);
-	if (!node)
-		goto out_unlock;
-
-	ref = rb_entry(node, struct btrfs_delayed_ref_node, rb_node);
-
-	if (ref->bytenr != bytenr)
-		goto out_unlock;
-
-	ret = 1;
-	if (ref->type != BTRFS_EXTENT_DATA_REF_KEY)
-		goto out_unlock;
-
-	data_ref = btrfs_delayed_node_to_data_ref(ref);
-
-	node = rb_prev(node);
-	if (node) {
-		int seq = ref->seq;
-
-		ref = rb_entry(node, struct btrfs_delayed_ref_node, rb_node);
-		if (ref->bytenr == bytenr && ref->seq == seq)
-			goto out_unlock;
-	}
-
-	if (data_ref->root != root->root_key.objectid ||
-	    data_ref->objectid != objectid || data_ref->offset != offset)
-		goto out_unlock;
-
-	ret = 0;
-out_unlock:
-	mutex_unlock(&head->mutex);
-out:
 	spin_unlock(&delayed_refs->lock);
+
+	spin_lock(&head->lock);
+	node = rb_first(&head->ref_root);
+	while (node) {
+		ref = rb_entry(node, struct btrfs_delayed_ref_node, rb_node);
+		node = rb_next(node);
+
+		/* If it's a shared ref we know a cross reference exists */
+		if (ref->type != BTRFS_EXTENT_DATA_REF_KEY) {
+			ret = 1;
+			break;
+		}
+
+		data_ref = btrfs_delayed_node_to_data_ref(ref);
+
+		/*
+		 * If our ref doesn't match the one we're currently looking at
+		 * then we have a cross reference.
+		 */
+		if (data_ref->root != root->root_key.objectid ||
+		    data_ref->objectid != objectid ||
+		    data_ref->offset != offset) {
+			ret = 1;
+			break;
+		}
+	}
+	spin_unlock(&head->lock);
+	mutex_unlock(&head->mutex);
 	return ret;
 }
 
@@ -5953,8 +5848,6 @@
 {
 	struct btrfs_delayed_ref_head *head;
 	struct btrfs_delayed_ref_root *delayed_refs;
-	struct btrfs_delayed_ref_node *ref;
-	struct rb_node *node;
 	int ret = 0;
 
 	delayed_refs = &trans->transaction->delayed_refs;
@@ -5963,14 +5856,8 @@
 	if (!head)
 		goto out;
 
-	node = rb_prev(&head->node.rb_node);
-	if (!node)
-		goto out;
-
-	ref = rb_entry(node, struct btrfs_delayed_ref_node, rb_node);
-
-	/* there are still entries for this ref, we can't drop it */
-	if (ref->bytenr == bytenr)
+	spin_lock(&head->lock);
+	if (rb_first(&head->ref_root))
 		goto out;
 
 	if (head->extent_op) {
@@ -5992,20 +5879,19 @@
 	 * ahead and process it.
 	 */
 	head->node.in_tree = 0;
-	rb_erase(&head->node.rb_node, &delayed_refs->root);
 	rb_erase(&head->href_node, &delayed_refs->href_root);
 
-	delayed_refs->num_entries--;
+	atomic_dec(&delayed_refs->num_entries);
 
 	/*
 	 * we don't take a ref on the node because we're removing it from the
 	 * tree, so we just steal the ref the tree was holding.
 	 */
 	delayed_refs->num_heads--;
-	if (list_empty(&head->cluster))
+	if (head->processing == 0)
 		delayed_refs->num_heads_ready--;
-
-	list_del_init(&head->cluster);
+	head->processing = 0;
+	spin_unlock(&head->lock);
 	spin_unlock(&delayed_refs->lock);
 
 	BUG_ON(head->extent_op);
@@ -6016,6 +5902,7 @@
 	btrfs_put_delayed_ref(&head->node);
 	return ret;
 out:
+	spin_unlock(&head->lock);
 	spin_unlock(&delayed_refs->lock);
 	return 0;
 }