ocfs2: Change the recovery map to an array of node numbers.

The old recovery map was a bitmap of node numbers.  This was sufficient
for the maximum node number of 254.  Going forward, we want node numbers
to be UINT32.  Thus, we need a new recovery map.

Note that we can't keep track of slots here.  We must write down the
node number to recovery *before* we get the locks needed to convert a
node number into a slot number.

The recovery map is now an array of unsigned ints, max_slots in size.
It moves to journal.c with the rest of recovery.

Because it needs to be initialized, we move all of recovery initialization
into a new function, ocfs2_recovery_init().  This actually cleans up
ocfs2_initialize_super() a little as well.  Following on, recovery cleaup
becomes part of ocfs2_recovery_exit().

A number of node map functions are rendered obsolete and are removed.

Finally, waiting on recovery is wrapped in a function rather than naked
checks on the recovery_event.  This is a cleanup from Mark.

Signed-off-by: Joel Becker <joel.becker@oracle.com>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index ed0c6d0..ca4c0ea 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -64,6 +64,137 @@
 				 int slot);
 static int ocfs2_commit_thread(void *arg);
 
+
+/*
+ * The recovery_list is a simple linked list of node numbers to recover.
+ * It is protected by the recovery_lock.
+ */
+
+struct ocfs2_recovery_map {
+	int rm_used;
+	unsigned int *rm_entries;
+};
+
+int ocfs2_recovery_init(struct ocfs2_super *osb)
+{
+	struct ocfs2_recovery_map *rm;
+
+	mutex_init(&osb->recovery_lock);
+	osb->disable_recovery = 0;
+	osb->recovery_thread_task = NULL;
+	init_waitqueue_head(&osb->recovery_event);
+
+	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
+		     osb->max_slots * sizeof(unsigned int),
+		     GFP_KERNEL);
+	if (!rm) {
+		mlog_errno(-ENOMEM);
+		return -ENOMEM;
+	}
+
+	rm->rm_entries = (unsigned int *)((char *)rm +
+					  sizeof(struct ocfs2_recovery_map));
+	osb->recovery_map = rm;
+
+	return 0;
+}
+
+/* we can't grab the goofy sem lock from inside wait_event, so we use
+ * memory barriers to make sure that we'll see the null task before
+ * being woken up */
+static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
+{
+	mb();
+	return osb->recovery_thread_task != NULL;
+}
+
+void ocfs2_recovery_exit(struct ocfs2_super *osb)
+{
+	struct ocfs2_recovery_map *rm;
+
+	/* disable any new recovery threads and wait for any currently
+	 * running ones to exit. Do this before setting the vol_state. */
+	mutex_lock(&osb->recovery_lock);
+	osb->disable_recovery = 1;
+	mutex_unlock(&osb->recovery_lock);
+	wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
+
+	/* At this point, we know that no more recovery threads can be
+	 * launched, so wait for any recovery completion work to
+	 * complete. */
+	flush_workqueue(ocfs2_wq);
+
+	/*
+	 * Now that recovery is shut down, and the osb is about to be
+	 * freed,  the osb_lock is not taken here.
+	 */
+	rm = osb->recovery_map;
+	/* XXX: Should we bug if there are dirty entries? */
+
+	kfree(rm);
+}
+
+static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+				     unsigned int node_num)
+{
+	int i;
+	struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+	assert_spin_locked(&osb->osb_lock);
+
+	for (i = 0; i < rm->rm_used; i++) {
+		if (rm->rm_entries[i] == node_num)
+			return 1;
+	}
+
+	return 0;
+}
+
+/* Behaves like test-and-set.  Returns the previous value */
+static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+				  unsigned int node_num)
+{
+	struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+	spin_lock(&osb->osb_lock);
+	if (__ocfs2_recovery_map_test(osb, node_num)) {
+		spin_unlock(&osb->osb_lock);
+		return 1;
+	}
+
+	/* XXX: Can this be exploited? Not from o2dlm... */
+	BUG_ON(rm->rm_used >= osb->max_slots);
+
+	rm->rm_entries[rm->rm_used] = node_num;
+	rm->rm_used++;
+	spin_unlock(&osb->osb_lock);
+
+	return 0;
+}
+
+static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
+				     unsigned int node_num)
+{
+	int i;
+	struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+	spin_lock(&osb->osb_lock);
+
+	for (i = 0; i < rm->rm_used; i++) {
+		if (rm->rm_entries[i] == node_num)
+			break;
+	}
+
+	if (i < rm->rm_used) {
+		/* XXX: be careful with the pointer math */
+		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
+			(rm->rm_used - i - 1) * sizeof(unsigned int));
+		rm->rm_used--;
+	}
+
+	spin_unlock(&osb->osb_lock);
+}
+
 static int ocfs2_commit_cache(struct ocfs2_super *osb)
 {
 	int status = 0;
@@ -650,6 +781,23 @@
 	return status;
 }
 
+static int ocfs2_recovery_completed(struct ocfs2_super *osb)
+{
+	int empty;
+	struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+	spin_lock(&osb->osb_lock);
+	empty = (rm->rm_used == 0);
+	spin_unlock(&osb->osb_lock);
+
+	return empty;
+}
+
+void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
+{
+	wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
+}
+
 /*
  * JBD Might read a cached version of another nodes journal file. We
  * don't want this as this file changes often and we get no
@@ -848,6 +996,7 @@
 {
 	int status, node_num;
 	struct ocfs2_super *osb = arg;
+	struct ocfs2_recovery_map *rm = osb->recovery_map;
 
 	mlog_entry_void();
 
@@ -863,26 +1012,29 @@
 		goto bail;
 	}
 
-	while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
-		node_num = ocfs2_node_map_first_set_bit(osb,
-							&osb->recovery_map);
-		if (node_num == O2NM_INVALID_NODE_NUM) {
-			mlog(0, "Out of nodes to recover.\n");
-			break;
-		}
+	spin_lock(&osb->osb_lock);
+	while (rm->rm_used) {
+		/* It's always safe to remove entry zero, as we won't
+		 * clear it until ocfs2_recover_node() has succeeded. */
+		node_num = rm->rm_entries[0];
+		spin_unlock(&osb->osb_lock);
 
 		status = ocfs2_recover_node(osb, node_num);
-		if (status < 0) {
+		if (!status) {
+			ocfs2_recovery_map_clear(osb, node_num);
+		} else {
 			mlog(ML_ERROR,
 			     "Error %d recovering node %d on device (%u,%u)!\n",
 			     status, node_num,
 			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 			mlog(ML_ERROR, "Volume requires unmount.\n");
-			continue;
 		}
 
-		ocfs2_recovery_map_clear(osb, node_num);
+		spin_lock(&osb->osb_lock);
 	}
+	spin_unlock(&osb->osb_lock);
+	mlog(0, "All nodes recovered\n");
+
 	ocfs2_super_unlock(osb, 1);
 
 	/* We always run recovery on our own orphan dir - the dead
@@ -893,8 +1045,7 @@
 
 bail:
 	mutex_lock(&osb->recovery_lock);
-	if (!status &&
-	    !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
+	if (!status && !ocfs2_recovery_completed(osb)) {
 		mutex_unlock(&osb->recovery_lock);
 		goto restart;
 	}
@@ -924,8 +1075,8 @@
 
 	/* People waiting on recovery will wait on
 	 * the recovery map to empty. */
-	if (!ocfs2_recovery_map_set(osb, node_num))
-		mlog(0, "node %d already be in recovery.\n", node_num);
+	if (ocfs2_recovery_map_set(osb, node_num))
+		mlog(0, "node %d already in recovery map.\n", node_num);
 
 	mlog(0, "starting recovery thread...\n");
 
@@ -1197,7 +1348,7 @@
 		if (status == -ENOENT)
 			continue;
 
-		if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num))
+		if (__ocfs2_recovery_map_test(osb, node_num))
 			continue;
 		spin_unlock(&osb->osb_lock);