GFS2: Merge lock_dlm module into GFS2

This is the big patch that I've been working on for some time
now. There are many reasons for wanting to make this change
such as:
 o Reducing overhead by eliminating duplicated fields between structures
 o Simplifcation of the code (reduces the code size by a fair bit)
 o The locking interface is now the DLM interface itself as proposed
   some time ago.
 o Fewer lookups of glocks when processing replies from the DLM
 o Fewer memory allocations/deallocations for each glock
 o Scope to do further optimisations in the future (but this patch is
   more than big enough for now!)

Please note that (a) this patch relates to the lock_dlm module and
not the DLM itself, that is still a separate module; and (b) that
we retain the ability to build GFS2 as a standalone single node
filesystem with out requiring the DLM.

This patch needs a lot of testing, hence my keeping it I restarted
my -git tree after the last merge window. That way, this has the maximum
exposure before its merged. This is (modulo a few minor bug fixes) the
same patch that I've been posting on and off the the last three months
and its passed a number of different tests so far.

Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 6b983ae..cd200a5 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -10,7 +10,6 @@
 #include <linux/sched.h>
 #include <linux/slab.h>
 #include <linux/spinlock.h>
-#include <linux/completion.h>
 #include <linux/buffer_head.h>
 #include <linux/delay.h>
 #include <linux/sort.h>
@@ -18,7 +17,6 @@
 #include <linux/kallsyms.h>
 #include <linux/gfs2_ondisk.h>
 #include <linux/list.h>
-#include <linux/lm_interface.h>
 #include <linux/wait.h>
 #include <linux/module.h>
 #include <linux/rwsem.h>
@@ -155,13 +153,10 @@
 	struct gfs2_sbd *sdp = gl->gl_sbd;
 	struct inode *aspace = gl->gl_aspace;
 
-	if (sdp->sd_lockstruct.ls_ops->lm_put_lock)
-		sdp->sd_lockstruct.ls_ops->lm_put_lock(gl->gl_lock);
-
 	if (aspace)
 		gfs2_aspace_put(aspace);
 
-	kmem_cache_free(gfs2_glock_cachep, gl);
+	sdp->sd_lockstruct.ls_ops->lm_put_lock(gfs2_glock_cachep, gl);
 }
 
 /**
@@ -211,7 +206,6 @@
 			atomic_dec(&lru_count);
 		}
 		spin_unlock(&lru_lock);
-		GLOCK_BUG_ON(gl, gl->gl_state != LM_ST_UNLOCKED);
 		GLOCK_BUG_ON(gl, !list_empty(&gl->gl_lru));
 		GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
 		glock_free(gl);
@@ -256,27 +250,6 @@
 }
 
 /**
- * gfs2_glock_find() - Find glock by lock number
- * @sdp: The GFS2 superblock
- * @name: The lock name
- *
- * Returns: NULL, or the struct gfs2_glock with the requested number
- */
-
-static struct gfs2_glock *gfs2_glock_find(const struct gfs2_sbd *sdp,
-					  const struct lm_lockname *name)
-{
-	unsigned int hash = gl_hash(sdp, name);
-	struct gfs2_glock *gl;
-
-	read_lock(gl_lock_addr(hash));
-	gl = search_bucket(hash, sdp, name);
-	read_unlock(gl_lock_addr(hash));
-
-	return gl;
-}
-
-/**
  * may_grant - check if its ok to grant a new lock
  * @gl: The glock
  * @gh: The lock request which we wish to grant
@@ -523,7 +496,7 @@
 }
 
 static unsigned int gfs2_lm_lock(struct gfs2_sbd *sdp, void *lock,
-				 unsigned int cur_state, unsigned int req_state,
+				 unsigned int req_state,
 				 unsigned int flags)
 {
 	int ret = LM_OUT_ERROR;
@@ -532,7 +505,7 @@
 		return req_state == LM_ST_UNLOCKED ? 0 : req_state;
 
 	if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
-		ret = sdp->sd_lockstruct.ls_ops->lm_lock(lock, cur_state,
+		ret = sdp->sd_lockstruct.ls_ops->lm_lock(lock,
 							 req_state, flags);
 	return ret;
 }
@@ -575,7 +548,7 @@
 	    gl->gl_state == LM_ST_DEFERRED) &&
 	    !(lck_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)))
 		lck_flags |= LM_FLAG_TRY_1CB;
-	ret = gfs2_lm_lock(sdp, gl->gl_lock, gl->gl_state, target, lck_flags);
+	ret = gfs2_lm_lock(sdp, gl, target, lck_flags);
 
 	if (!(ret & LM_OUT_ASYNC)) {
 		finish_xmote(gl, ret);
@@ -681,18 +654,6 @@
 		gfs2_glock_put(gl);
 }
 
-static int gfs2_lm_get_lock(struct gfs2_sbd *sdp, struct lm_lockname *name,
-		     void **lockp)
-{
-	int error = -EIO;
-	if (!sdp->sd_lockstruct.ls_ops->lm_get_lock)
-		return 0;
-	if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
-		error = sdp->sd_lockstruct.ls_ops->lm_get_lock(
-				sdp->sd_lockstruct.ls_lockspace, name, lockp);
-	return error;
-}
-
 /**
  * gfs2_glock_get() - Get a glock, or create one if one doesn't exist
  * @sdp: The GFS2 superblock
@@ -736,6 +697,9 @@
 	gl->gl_demote_state = LM_ST_EXCLUSIVE;
 	gl->gl_hash = hash;
 	gl->gl_ops = glops;
+	snprintf(gl->gl_strname, GDLM_STRNAME_BYTES, "%8x%16llx", name.ln_type, (unsigned long long)number);
+	memset(&gl->gl_lksb, 0, sizeof(struct dlm_lksb));
+	gl->gl_lksb.sb_lvbptr = gl->gl_lvb;
 	gl->gl_stamp = jiffies;
 	gl->gl_tchange = jiffies;
 	gl->gl_object = NULL;
@@ -753,10 +717,6 @@
 		}
 	}
 
-	error = gfs2_lm_get_lock(sdp, &name, &gl->gl_lock);
-	if (error)
-		goto fail_aspace;
-
 	write_lock(gl_lock_addr(hash));
 	tmp = search_bucket(hash, sdp, &name);
 	if (tmp) {
@@ -772,9 +732,6 @@
 
 	return 0;
 
-fail_aspace:
-	if (gl->gl_aspace)
-		gfs2_aspace_put(gl->gl_aspace);
 fail:
 	kmem_cache_free(gfs2_glock_cachep, gl);
 	return error;
@@ -966,7 +923,7 @@
 	if (!(gh->gh_flags & LM_FLAG_PRIORITY)) {
 		spin_unlock(&gl->gl_spin);
 		if (sdp->sd_lockstruct.ls_ops->lm_cancel)
-			sdp->sd_lockstruct.ls_ops->lm_cancel(gl->gl_lock);
+			sdp->sd_lockstruct.ls_ops->lm_cancel(gl);
 		spin_lock(&gl->gl_spin);
 	}
 	return;
@@ -1240,70 +1197,13 @@
 		gfs2_glock_dq_uninit(&ghs[x]);
 }
 
-static int gfs2_lm_hold_lvb(struct gfs2_sbd *sdp, void *lock, char **lvbp)
+void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state)
 {
-	int error = -EIO;
-	if (!sdp->sd_lockstruct.ls_ops->lm_hold_lvb)
-		return 0;
-	if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
-		error = sdp->sd_lockstruct.ls_ops->lm_hold_lvb(lock, lvbp);
-	return error;
-}
-
-/**
- * gfs2_lvb_hold - attach a LVB from a glock
- * @gl: The glock in question
- *
- */
-
-int gfs2_lvb_hold(struct gfs2_glock *gl)
-{
-	int error;
-
-	if (!atomic_read(&gl->gl_lvb_count)) {
-		error = gfs2_lm_hold_lvb(gl->gl_sbd, gl->gl_lock, &gl->gl_lvb);
-		if (error) 
-			return error;
-		gfs2_glock_hold(gl);
-	}
-	atomic_inc(&gl->gl_lvb_count);
-
-	return 0;
-}
-
-/**
- * gfs2_lvb_unhold - detach a LVB from a glock
- * @gl: The glock in question
- *
- */
-
-void gfs2_lvb_unhold(struct gfs2_glock *gl)
-{
-	struct gfs2_sbd *sdp = gl->gl_sbd;
-
-	gfs2_glock_hold(gl);
-	gfs2_assert(gl->gl_sbd, atomic_read(&gl->gl_lvb_count) > 0);
-	if (atomic_dec_and_test(&gl->gl_lvb_count)) {
-		if (sdp->sd_lockstruct.ls_ops->lm_unhold_lvb)
-			sdp->sd_lockstruct.ls_ops->lm_unhold_lvb(gl->gl_lock, gl->gl_lvb);
-		gl->gl_lvb = NULL;
-		gfs2_glock_put(gl);
-	}
-	gfs2_glock_put(gl);
-}
-
-static void blocking_cb(struct gfs2_sbd *sdp, struct lm_lockname *name,
-			unsigned int state)
-{
-	struct gfs2_glock *gl;
 	unsigned long delay = 0;
 	unsigned long holdtime;
 	unsigned long now = jiffies;
 
-	gl = gfs2_glock_find(sdp, name);
-	if (!gl)
-		return;
-
+	gfs2_glock_hold(gl);
 	holdtime = gl->gl_tchange + gl->gl_ops->go_min_hold_time;
 	if (time_before(now, holdtime))
 		delay = holdtime - now;
@@ -1317,74 +1217,37 @@
 		gfs2_glock_put(gl);
 }
 
-static void gfs2_jdesc_make_dirty(struct gfs2_sbd *sdp, unsigned int jid)
-{
-	struct gfs2_jdesc *jd;
-
-	spin_lock(&sdp->sd_jindex_spin);
-	list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
-		if (jd->jd_jid != jid)
-			continue;
-		jd->jd_dirty = 1;
-		break;
-	}
-	spin_unlock(&sdp->sd_jindex_spin);
-}
-
 /**
- * gfs2_glock_cb - Callback used by locking module
- * @sdp: Pointer to the superblock
- * @type: Type of callback
- * @data: Type dependent data pointer
+ * gfs2_glock_complete - Callback used by locking
+ * @gl: Pointer to the glock
+ * @ret: The return value from the dlm
  *
- * Called by the locking module when it wants to tell us something.
- * Either we need to drop a lock, one of our ASYNC requests completed, or
- * a journal from another client needs to be recovered.
  */
 
-void gfs2_glock_cb(void *cb_data, unsigned int type, void *data)
+void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
 {
-	struct gfs2_sbd *sdp = cb_data;
-
-	switch (type) {
-	case LM_CB_NEED_E:
-		blocking_cb(sdp, data, LM_ST_UNLOCKED);
-		return;
-
-	case LM_CB_NEED_D:
-		blocking_cb(sdp, data, LM_ST_DEFERRED);
-		return;
-
-	case LM_CB_NEED_S:
-		blocking_cb(sdp, data, LM_ST_SHARED);
-		return;
-
-	case LM_CB_ASYNC: {
-		struct lm_async_cb *async = data;
-		struct gfs2_glock *gl;
-
-		down_read(&gfs2_umount_flush_sem);
-		gl = gfs2_glock_find(sdp, &async->lc_name);
-		if (gfs2_assert_warn(sdp, gl))
+	struct lm_lockstruct *ls = &gl->gl_sbd->sd_lockstruct;
+	down_read(&gfs2_umount_flush_sem);
+	gl->gl_reply = ret;
+	if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_flags))) {
+		struct gfs2_holder *gh;
+		spin_lock(&gl->gl_spin);
+		gh = find_first_waiter(gl);
+		if ((!(gh && (gh->gh_flags & LM_FLAG_NOEXP)) &&
+		     (gl->gl_target != LM_ST_UNLOCKED)) ||
+		    ((ret & ~LM_OUT_ST_MASK) != 0))
+			set_bit(GLF_FROZEN, &gl->gl_flags);
+		spin_unlock(&gl->gl_spin);
+		if (test_bit(GLF_FROZEN, &gl->gl_flags)) {
+			up_read(&gfs2_umount_flush_sem);
 			return;
-		gl->gl_reply = async->lc_ret;
-		set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
-		if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-			gfs2_glock_put(gl);
-		up_read(&gfs2_umount_flush_sem);
-		return;
+		}
 	}
-
-	case LM_CB_NEED_RECOVERY:
-		gfs2_jdesc_make_dirty(sdp, *(unsigned int *)data);
-		if (sdp->sd_recoverd_process)
-			wake_up_process(sdp->sd_recoverd_process);
-		return;
-
-	default:
-		gfs2_assert_warn(sdp, 0);
-		return;
-	}
+	set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+	gfs2_glock_hold(gl);
+	if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
+		gfs2_glock_put(gl);
+	up_read(&gfs2_umount_flush_sem);
 }
 
 /**
@@ -1515,6 +1378,27 @@
 	return has_entries;
 }
 
+
+/**
+ * thaw_glock - thaw out a glock which has an unprocessed reply waiting
+ * @gl: The glock to thaw
+ *
+ * N.B. When we freeze a glock, we leave a ref to the glock outstanding,
+ * so this has to result in the ref count being dropped by one.
+ */
+
+static void thaw_glock(struct gfs2_glock *gl)
+{
+	if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
+		return;
+	down_read(&gfs2_umount_flush_sem);
+	set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+	gfs2_glock_hold(gl);
+	if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
+		gfs2_glock_put(gl);
+	up_read(&gfs2_umount_flush_sem);
+}
+
 /**
  * clear_glock - look at a glock and see if we can free it from glock cache
  * @gl: the glock to look at
@@ -1540,6 +1424,20 @@
 }
 
 /**
+ * gfs2_glock_thaw - Thaw any frozen glocks
+ * @sdp: The super block
+ *
+ */
+
+void gfs2_glock_thaw(struct gfs2_sbd *sdp)
+{
+	unsigned x;
+
+	for (x = 0; x < GFS2_GL_HASH_SIZE; x++)
+		examine_bucket(thaw_glock, sdp, x);
+}
+
+/**
  * gfs2_gl_hash_clear - Empty out the glock hash table
  * @sdp: the filesystem
  * @wait: wait until it's all gone
@@ -1619,7 +1517,7 @@
 	if (flags & LM_FLAG_NOEXP)
 		*p++ = 'e';
 	if (flags & LM_FLAG_ANY)
-		*p++ = 'a';
+		*p++ = 'A';
 	if (flags & LM_FLAG_PRIORITY)
 		*p++ = 'p';
 	if (flags & GL_ASYNC)
@@ -1683,6 +1581,10 @@
 		*p++ = 'i';
 	if (test_bit(GLF_REPLY_PENDING, gflags))
 		*p++ = 'r';
+	if (test_bit(GLF_INITIAL, gflags))
+		*p++ = 'i';
+	if (test_bit(GLF_FROZEN, gflags))
+		*p++ = 'F';
 	*p = 0;
 	return buf;
 }
@@ -1717,14 +1619,13 @@
 	dtime *= 1000000/HZ; /* demote time in uSec */
 	if (!test_bit(GLF_DEMOTE, &gl->gl_flags))
 		dtime = 0;
-	gfs2_print_dbg(seq, "G:  s:%s n:%u/%llu f:%s t:%s d:%s/%llu l:%d a:%d r:%d\n",
+	gfs2_print_dbg(seq, "G:  s:%s n:%u/%llu f:%s t:%s d:%s/%llu a:%d r:%d\n",
 		  state2str(gl->gl_state),
 		  gl->gl_name.ln_type,
 		  (unsigned long long)gl->gl_name.ln_number,
 		  gflags2str(gflags_buf, &gl->gl_flags),
 		  state2str(gl->gl_target),
 		  state2str(gl->gl_demote_state), dtime,
-		  atomic_read(&gl->gl_lvb_count),
 		  atomic_read(&gl->gl_ail_count),
 		  atomic_read(&gl->gl_ref));