ocfs2: teach dlm_restart_lock_mastery() to wait on recovery

Change the behavior of dlm_restart_lock_mastery() when a node goes down:
discard all responses that have been collected so far, reset the mle maps,
and redo the master request from the start, waiting on recovery first.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index e5d7271..915283f 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -867,6 +867,7 @@
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
+redo_request:
 	while (wait_on_recovery) {
 		/* any cluster changes that occurred after dropping the
 		 * dlm spinlock would be detectable be a change on the mle,
@@ -904,7 +905,6 @@
 	if (blocked)
 		goto wait;
 
-redo_request:
 	ret = -EINVAL;
 	dlm_node_iter_init(mle->vote_map, &iter);
 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -929,6 +929,7 @@
 	/* keep going until the response map includes all nodes */
 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
 	if (ret < 0) {
+		wait_on_recovery = 1;
 		mlog(0, "%s:%.*s: node map changed, redo the "
 		     "master request now, blocked=%d\n",
 		     dlm->name, res->lockname.len,
@@ -1210,18 +1211,6 @@
 			set_bit(node, mle->vote_map);
 		} else {
 			mlog(ML_ERROR, "node down! %d\n", node);
-
-			/* if the node wasn't involved in mastery skip it,
-			 * but clear it out from the maps so that it will
-			 * not affect mastery of this lockres */
-			clear_bit(node, mle->response_map);
-			clear_bit(node, mle->vote_map);
-			if (!test_bit(node, mle->maybe_map))
-				goto next;
-
-			/* if we're already blocked on lock mastery, and the
-			 * dead node wasn't the expected master, or there is
-			 * another node in the maybe_map, keep waiting */
 			if (blocked) {
 				int lowest = find_next_bit(mle->maybe_map,
 						       O2NM_MAX_NODES, 0);
@@ -1229,54 +1218,53 @@
 				/* act like it was never there */
 				clear_bit(node, mle->maybe_map);
 
-			       	if (node != lowest)
-					goto next;
-
-				mlog(ML_ERROR, "expected master %u died while "
-				     "this node was blocked waiting on it!\n",
-				     node);
-				lowest = find_next_bit(mle->maybe_map,
-						       O2NM_MAX_NODES,
-						       lowest+1);
-				if (lowest < O2NM_MAX_NODES) {
-					mlog(0, "still blocked. waiting "
-					     "on %u now\n", lowest);
-					goto next;
+			       	if (node == lowest) {
+					mlog(0, "expected master %u died"
+					    " while this node was blocked "
+					    "waiting on it!\n", node);
+					lowest = find_next_bit(mle->maybe_map,
+						       	O2NM_MAX_NODES,
+						       	lowest+1);
+					if (lowest < O2NM_MAX_NODES) {
+						mlog(0, "%s:%.*s:still "
+						     "blocked. waiting on %u "
+						     "now\n", dlm->name,
+						     res->lockname.len,
+						     res->lockname.name,
+						     lowest);
+					} else {
+						/* mle is an MLE_BLOCK, but
+						 * there is now nothing left to
+						 * block on.  we need to return
+						 * all the way back out and try
+						 * again with an MLE_MASTER.
+						 * dlm_do_local_recovery_cleanup
+						 * has already run, so the mle
+						 * refcount is ok */
+						mlog(0, "%s:%.*s: no "
+						     "longer blocking. try to "
+						     "master this here\n",
+						     dlm->name,
+						     res->lockname.len,
+						     res->lockname.name);
+						mle->type = DLM_MLE_MASTER;
+						mle->u.res = res;
+					}
 				}
-
-				/* mle is an MLE_BLOCK, but there is now
-				 * nothing left to block on.  we need to return
-				 * all the way back out and try again with
-				 * an MLE_MASTER. dlm_do_local_recovery_cleanup
-				 * has already run, so the mle refcount is ok */
-				mlog(0, "no longer blocking. we can "
-				     "try to master this here\n");
-				mle->type = DLM_MLE_MASTER;
-				memset(mle->maybe_map, 0,
-				       sizeof(mle->maybe_map));
-				memset(mle->response_map, 0,
-				       sizeof(mle->maybe_map));
-				memcpy(mle->vote_map, mle->node_map,
-				       sizeof(mle->node_map));
-				mle->u.res = res;
-				set_bit(dlm->node_num, mle->maybe_map);
-
-				ret = -EAGAIN;
-				goto next;
 			}
 
-			clear_bit(node, mle->maybe_map);
-			if (node > dlm->node_num)
-				goto next;
-
-			mlog(0, "dead node in map!\n");
-			/* yuck. go back and re-contact all nodes
-			 * in the vote_map, removing this node. */
-			memset(mle->response_map, 0,
-			       sizeof(mle->response_map));
+			/* now blank out everything, as if we had never
+			 * contacted anyone */
+			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
+			memset(mle->response_map, 0, sizeof(mle->response_map));
+			/* reset the vote_map to the current node_map */
+			memcpy(mle->vote_map, mle->node_map,
+			       sizeof(mle->node_map));
+			/* put myself into the maybe map */
+			if (mle->type != DLM_MLE_BLOCK)
+				set_bit(dlm->node_num, mle->maybe_map);
 		}
 		ret = -EAGAIN;
-next:
 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 	}
 	return ret;