[DLM] The core of the DLM for GFS2/CLVM
This is the core of the distributed lock manager which is required
to use GFS2 as a cluster filesystem. It is also used by CLVM and
can be used as a standalone lock manager independantly of either
of these two projects.
It implements VAX-style locking modes.
Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steve Whitehouse <swhiteho@redhat.com>
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
new file mode 100644
index 0000000..439249b
--- /dev/null
+++ b/fs/dlm/member.c
@@ -0,0 +1,314 @@
+/******************************************************************************
+*******************************************************************************
+**
+** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
+**
+** This copyrighted material is made available to anyone wishing to use,
+** modify, copy, or redistribute it subject to the terms and conditions
+** of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_internal.h"
+#include "lockspace.h"
+#include "member.h"
+#include "recoverd.h"
+#include "recover.h"
+#include "lowcomms.h"
+#include "rcom.h"
+#include "config.h"
+
+/*
+ * Following called by dlm_recoverd thread
+ */
+
+static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
+{
+ struct dlm_member *memb = NULL;
+ struct list_head *tmp;
+ struct list_head *newlist = &new->list;
+ struct list_head *head = &ls->ls_nodes;
+
+ list_for_each(tmp, head) {
+ memb = list_entry(tmp, struct dlm_member, list);
+ if (new->nodeid < memb->nodeid)
+ break;
+ }
+
+ if (!memb)
+ list_add_tail(newlist, head);
+ else {
+ /* FIXME: can use list macro here */
+ newlist->prev = tmp->prev;
+ newlist->next = tmp;
+ tmp->prev->next = newlist;
+ tmp->prev = newlist;
+ }
+}
+
+static int dlm_add_member(struct dlm_ls *ls, int nodeid)
+{
+ struct dlm_member *memb;
+ int w;
+
+ memb = kmalloc(sizeof(struct dlm_member), GFP_KERNEL);
+ if (!memb)
+ return -ENOMEM;
+
+ w = dlm_node_weight(ls->ls_name, nodeid);
+ if (w < 0)
+ return w;
+
+ memb->nodeid = nodeid;
+ memb->weight = w;
+ add_ordered_member(ls, memb);
+ ls->ls_num_nodes++;
+ return 0;
+}
+
+static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
+{
+ list_move(&memb->list, &ls->ls_nodes_gone);
+ ls->ls_num_nodes--;
+}
+
+static int dlm_is_member(struct dlm_ls *ls, int nodeid)
+{
+ struct dlm_member *memb;
+
+ list_for_each_entry(memb, &ls->ls_nodes, list) {
+ if (memb->nodeid == nodeid)
+ return TRUE;
+ }
+ return FALSE;
+}
+
+int dlm_is_removed(struct dlm_ls *ls, int nodeid)
+{
+ struct dlm_member *memb;
+
+ list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+ if (memb->nodeid == nodeid)
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static void clear_memb_list(struct list_head *head)
+{
+ struct dlm_member *memb;
+
+ while (!list_empty(head)) {
+ memb = list_entry(head->next, struct dlm_member, list);
+ list_del(&memb->list);
+ kfree(memb);
+ }
+}
+
+void dlm_clear_members(struct dlm_ls *ls)
+{
+ clear_memb_list(&ls->ls_nodes);
+ ls->ls_num_nodes = 0;
+}
+
+void dlm_clear_members_gone(struct dlm_ls *ls)
+{
+ clear_memb_list(&ls->ls_nodes_gone);
+}
+
+static void make_member_array(struct dlm_ls *ls)
+{
+ struct dlm_member *memb;
+ int i, w, x = 0, total = 0, all_zero = 0, *array;
+
+ kfree(ls->ls_node_array);
+ ls->ls_node_array = NULL;
+
+ list_for_each_entry(memb, &ls->ls_nodes, list) {
+ if (memb->weight)
+ total += memb->weight;
+ }
+
+ /* all nodes revert to weight of 1 if all have weight 0 */
+
+ if (!total) {
+ total = ls->ls_num_nodes;
+ all_zero = 1;
+ }
+
+ ls->ls_total_weight = total;
+
+ array = kmalloc(sizeof(int) * total, GFP_KERNEL);
+ if (!array)
+ return;
+
+ list_for_each_entry(memb, &ls->ls_nodes, list) {
+ if (!all_zero && !memb->weight)
+ continue;
+
+ if (all_zero)
+ w = 1;
+ else
+ w = memb->weight;
+
+ DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
+
+ for (i = 0; i < w; i++)
+ array[x++] = memb->nodeid;
+ }
+
+ ls->ls_node_array = array;
+}
+
+/* send a status request to all members just to establish comms connections */
+
+static void ping_members(struct dlm_ls *ls)
+{
+ struct dlm_member *memb;
+ list_for_each_entry(memb, &ls->ls_nodes, list)
+ dlm_rcom_status(ls, memb->nodeid);
+}
+
+int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
+{
+ struct dlm_member *memb, *safe;
+ int i, error, found, pos = 0, neg = 0, low = -1;
+
+ /* move departed members from ls_nodes to ls_nodes_gone */
+
+ list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
+ found = FALSE;
+ for (i = 0; i < rv->node_count; i++) {
+ if (memb->nodeid == rv->nodeids[i]) {
+ found = TRUE;
+ break;
+ }
+ }
+
+ if (!found) {
+ neg++;
+ dlm_remove_member(ls, memb);
+ log_debug(ls, "remove member %d", memb->nodeid);
+ }
+ }
+
+ /* add new members to ls_nodes */
+
+ for (i = 0; i < rv->node_count; i++) {
+ if (dlm_is_member(ls, rv->nodeids[i]))
+ continue;
+ dlm_add_member(ls, rv->nodeids[i]);
+ pos++;
+ log_debug(ls, "add member %d", rv->nodeids[i]);
+ }
+
+ list_for_each_entry(memb, &ls->ls_nodes, list) {
+ if (low == -1 || memb->nodeid < low)
+ low = memb->nodeid;
+ }
+ ls->ls_low_nodeid = low;
+
+ make_member_array(ls);
+ dlm_set_recover_status(ls, DLM_RS_NODES);
+ *neg_out = neg;
+
+ ping_members(ls);
+
+ error = dlm_recover_members_wait(ls);
+ log_debug(ls, "total members %d", ls->ls_num_nodes);
+ return error;
+}
+
+/*
+ * Following called from lockspace.c
+ */
+
+int dlm_ls_stop(struct dlm_ls *ls)
+{
+ int new;
+
+ /*
+ * A stop cancels any recovery that's in progress (see RECOVERY_STOP,
+ * dlm_recovery_stopped()) and prevents any new locks from being
+ * processed (see RUNNING, dlm_locking_stopped()).
+ */
+
+ spin_lock(&ls->ls_recover_lock);
+ set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+ new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
+ ls->ls_recover_seq++;
+ spin_unlock(&ls->ls_recover_lock);
+
+ /*
+ * This in_recovery lock does two things:
+ *
+ * 1) Keeps this function from returning until all threads are out
+ * of locking routines and locking is truely stopped.
+ * 2) Keeps any new requests from being processed until it's unlocked
+ * when recovery is complete.
+ */
+
+ if (new)
+ down_write(&ls->ls_in_recovery);
+
+ /*
+ * The recoverd suspend/resume makes sure that dlm_recoverd (if
+ * running) has noticed the clearing of RUNNING above and quit
+ * processing the previous recovery. This will be true for all nodes
+ * before any nodes start the new recovery.
+ */
+
+ dlm_recoverd_suspend(ls);
+ ls->ls_recover_status = 0;
+ dlm_recoverd_resume(ls);
+ return 0;
+}
+
+int dlm_ls_start(struct dlm_ls *ls)
+{
+ struct dlm_recover *rv = NULL, *rv_old;
+ int *ids = NULL;
+ int error, count;
+
+ rv = kmalloc(sizeof(struct dlm_recover), GFP_KERNEL);
+ if (!rv)
+ return -ENOMEM;
+ memset(rv, 0, sizeof(struct dlm_recover));
+
+ error = count = dlm_nodeid_list(ls->ls_name, &ids);
+ if (error <= 0)
+ goto fail;
+
+ spin_lock(&ls->ls_recover_lock);
+
+ /* the lockspace needs to be stopped before it can be started */
+
+ if (!dlm_locking_stopped(ls)) {
+ spin_unlock(&ls->ls_recover_lock);
+ log_error(ls, "start ignored: lockspace running");
+ error = -EINVAL;
+ goto fail;
+ }
+
+ rv->nodeids = ids;
+ rv->node_count = count;
+ rv->seq = ++ls->ls_recover_seq;
+ rv_old = ls->ls_recover_args;
+ ls->ls_recover_args = rv;
+ spin_unlock(&ls->ls_recover_lock);
+
+ if (rv_old) {
+ kfree(rv_old->nodeids);
+ kfree(rv_old);
+ }
+
+ dlm_recoverd_kick(ls);
+ return 0;
+
+ fail:
+ kfree(rv);
+ kfree(ids);
+ return error;
+}
+