GFS2: Merge lock_dlm module into GFS2
This is the big patch that I've been working on for some time
now. There are many reasons for wanting to make this change
such as:
o Reducing overhead by eliminating duplicated fields between structures
o Simplification of the code (reduces the code size by a fair bit)
o The locking interface is now the DLM interface itself as proposed
some time ago.
o Fewer lookups of glocks when processing replies from the DLM
o Fewer memory allocations/deallocations for each glock
o Scope to do further optimisations in the future (but this patch is
more than big enough for now!)
Please note that (a) this patch relates to the lock_dlm module and
not the DLM itself, that is still a separate module; and (b) that
we retain the ability to build GFS2 as a standalone single node
filesystem without requiring the DLM.
This patch needs a lot of testing, which is why I restarted
my -git tree after the last merge window. That way, this has the maximum
exposure before it's merged. This is (modulo a few minor bug fixes) the
same patch that I've been posting on and off for the last three months
and it's passed a number of different tests so far.
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 6b983ae..cd200a5 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -10,7 +10,6 @@
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
-#include <linux/completion.h>
#include <linux/buffer_head.h>
#include <linux/delay.h>
#include <linux/sort.h>
@@ -18,7 +17,6 @@
#include <linux/kallsyms.h>
#include <linux/gfs2_ondisk.h>
#include <linux/list.h>
-#include <linux/lm_interface.h>
#include <linux/wait.h>
#include <linux/module.h>
#include <linux/rwsem.h>
@@ -155,13 +153,10 @@
struct gfs2_sbd *sdp = gl->gl_sbd;
struct inode *aspace = gl->gl_aspace;
- if (sdp->sd_lockstruct.ls_ops->lm_put_lock)
- sdp->sd_lockstruct.ls_ops->lm_put_lock(gl->gl_lock);
-
if (aspace)
gfs2_aspace_put(aspace);
- kmem_cache_free(gfs2_glock_cachep, gl);
+ sdp->sd_lockstruct.ls_ops->lm_put_lock(gfs2_glock_cachep, gl);
}
/**
@@ -211,7 +206,6 @@
atomic_dec(&lru_count);
}
spin_unlock(&lru_lock);
- GLOCK_BUG_ON(gl, gl->gl_state != LM_ST_UNLOCKED);
GLOCK_BUG_ON(gl, !list_empty(&gl->gl_lru));
GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
glock_free(gl);
@@ -256,27 +250,6 @@
}
/**
- * gfs2_glock_find() - Find glock by lock number
- * @sdp: The GFS2 superblock
- * @name: The lock name
- *
- * Returns: NULL, or the struct gfs2_glock with the requested number
- */
-
-static struct gfs2_glock *gfs2_glock_find(const struct gfs2_sbd *sdp,
- const struct lm_lockname *name)
-{
- unsigned int hash = gl_hash(sdp, name);
- struct gfs2_glock *gl;
-
- read_lock(gl_lock_addr(hash));
- gl = search_bucket(hash, sdp, name);
- read_unlock(gl_lock_addr(hash));
-
- return gl;
-}
-
-/**
* may_grant - check if its ok to grant a new lock
* @gl: The glock
* @gh: The lock request which we wish to grant
@@ -523,7 +496,7 @@
}
static unsigned int gfs2_lm_lock(struct gfs2_sbd *sdp, void *lock,
- unsigned int cur_state, unsigned int req_state,
+ unsigned int req_state,
unsigned int flags)
{
int ret = LM_OUT_ERROR;
@@ -532,7 +505,7 @@
return req_state == LM_ST_UNLOCKED ? 0 : req_state;
if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
- ret = sdp->sd_lockstruct.ls_ops->lm_lock(lock, cur_state,
+ ret = sdp->sd_lockstruct.ls_ops->lm_lock(lock,
req_state, flags);
return ret;
}
@@ -575,7 +548,7 @@
gl->gl_state == LM_ST_DEFERRED) &&
!(lck_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)))
lck_flags |= LM_FLAG_TRY_1CB;
- ret = gfs2_lm_lock(sdp, gl->gl_lock, gl->gl_state, target, lck_flags);
+ ret = gfs2_lm_lock(sdp, gl, target, lck_flags);
if (!(ret & LM_OUT_ASYNC)) {
finish_xmote(gl, ret);
@@ -681,18 +654,6 @@
gfs2_glock_put(gl);
}
-static int gfs2_lm_get_lock(struct gfs2_sbd *sdp, struct lm_lockname *name,
- void **lockp)
-{
- int error = -EIO;
- if (!sdp->sd_lockstruct.ls_ops->lm_get_lock)
- return 0;
- if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
- error = sdp->sd_lockstruct.ls_ops->lm_get_lock(
- sdp->sd_lockstruct.ls_lockspace, name, lockp);
- return error;
-}
-
/**
* gfs2_glock_get() - Get a glock, or create one if one doesn't exist
* @sdp: The GFS2 superblock
@@ -736,6 +697,9 @@
gl->gl_demote_state = LM_ST_EXCLUSIVE;
gl->gl_hash = hash;
gl->gl_ops = glops;
+ snprintf(gl->gl_strname, GDLM_STRNAME_BYTES, "%8x%16llx", name.ln_type, (unsigned long long)number);
+ memset(&gl->gl_lksb, 0, sizeof(struct dlm_lksb));
+ gl->gl_lksb.sb_lvbptr = gl->gl_lvb;
gl->gl_stamp = jiffies;
gl->gl_tchange = jiffies;
gl->gl_object = NULL;
@@ -753,10 +717,6 @@
}
}
- error = gfs2_lm_get_lock(sdp, &name, &gl->gl_lock);
- if (error)
- goto fail_aspace;
-
write_lock(gl_lock_addr(hash));
tmp = search_bucket(hash, sdp, &name);
if (tmp) {
@@ -772,9 +732,6 @@
return 0;
-fail_aspace:
- if (gl->gl_aspace)
- gfs2_aspace_put(gl->gl_aspace);
fail:
kmem_cache_free(gfs2_glock_cachep, gl);
return error;
@@ -966,7 +923,7 @@
if (!(gh->gh_flags & LM_FLAG_PRIORITY)) {
spin_unlock(&gl->gl_spin);
if (sdp->sd_lockstruct.ls_ops->lm_cancel)
- sdp->sd_lockstruct.ls_ops->lm_cancel(gl->gl_lock);
+ sdp->sd_lockstruct.ls_ops->lm_cancel(gl);
spin_lock(&gl->gl_spin);
}
return;
@@ -1240,70 +1197,13 @@
gfs2_glock_dq_uninit(&ghs[x]);
}
-static int gfs2_lm_hold_lvb(struct gfs2_sbd *sdp, void *lock, char **lvbp)
+void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state)
{
- int error = -EIO;
- if (!sdp->sd_lockstruct.ls_ops->lm_hold_lvb)
- return 0;
- if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
- error = sdp->sd_lockstruct.ls_ops->lm_hold_lvb(lock, lvbp);
- return error;
-}
-
-/**
- * gfs2_lvb_hold - attach a LVB from a glock
- * @gl: The glock in question
- *
- */
-
-int gfs2_lvb_hold(struct gfs2_glock *gl)
-{
- int error;
-
- if (!atomic_read(&gl->gl_lvb_count)) {
- error = gfs2_lm_hold_lvb(gl->gl_sbd, gl->gl_lock, &gl->gl_lvb);
- if (error)
- return error;
- gfs2_glock_hold(gl);
- }
- atomic_inc(&gl->gl_lvb_count);
-
- return 0;
-}
-
-/**
- * gfs2_lvb_unhold - detach a LVB from a glock
- * @gl: The glock in question
- *
- */
-
-void gfs2_lvb_unhold(struct gfs2_glock *gl)
-{
- struct gfs2_sbd *sdp = gl->gl_sbd;
-
- gfs2_glock_hold(gl);
- gfs2_assert(gl->gl_sbd, atomic_read(&gl->gl_lvb_count) > 0);
- if (atomic_dec_and_test(&gl->gl_lvb_count)) {
- if (sdp->sd_lockstruct.ls_ops->lm_unhold_lvb)
- sdp->sd_lockstruct.ls_ops->lm_unhold_lvb(gl->gl_lock, gl->gl_lvb);
- gl->gl_lvb = NULL;
- gfs2_glock_put(gl);
- }
- gfs2_glock_put(gl);
-}
-
-static void blocking_cb(struct gfs2_sbd *sdp, struct lm_lockname *name,
- unsigned int state)
-{
- struct gfs2_glock *gl;
unsigned long delay = 0;
unsigned long holdtime;
unsigned long now = jiffies;
- gl = gfs2_glock_find(sdp, name);
- if (!gl)
- return;
-
+ gfs2_glock_hold(gl);
holdtime = gl->gl_tchange + gl->gl_ops->go_min_hold_time;
if (time_before(now, holdtime))
delay = holdtime - now;
@@ -1317,74 +1217,37 @@
gfs2_glock_put(gl);
}
-static void gfs2_jdesc_make_dirty(struct gfs2_sbd *sdp, unsigned int jid)
-{
- struct gfs2_jdesc *jd;
-
- spin_lock(&sdp->sd_jindex_spin);
- list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
- if (jd->jd_jid != jid)
- continue;
- jd->jd_dirty = 1;
- break;
- }
- spin_unlock(&sdp->sd_jindex_spin);
-}
-
/**
- * gfs2_glock_cb - Callback used by locking module
- * @sdp: Pointer to the superblock
- * @type: Type of callback
- * @data: Type dependent data pointer
+ * gfs2_glock_complete - Callback used by locking
+ * @gl: Pointer to the glock
+ * @ret: The return value from the dlm
*
- * Called by the locking module when it wants to tell us something.
- * Either we need to drop a lock, one of our ASYNC requests completed, or
- * a journal from another client needs to be recovered.
*/
-void gfs2_glock_cb(void *cb_data, unsigned int type, void *data)
+void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
{
- struct gfs2_sbd *sdp = cb_data;
-
- switch (type) {
- case LM_CB_NEED_E:
- blocking_cb(sdp, data, LM_ST_UNLOCKED);
- return;
-
- case LM_CB_NEED_D:
- blocking_cb(sdp, data, LM_ST_DEFERRED);
- return;
-
- case LM_CB_NEED_S:
- blocking_cb(sdp, data, LM_ST_SHARED);
- return;
-
- case LM_CB_ASYNC: {
- struct lm_async_cb *async = data;
- struct gfs2_glock *gl;
-
- down_read(&gfs2_umount_flush_sem);
- gl = gfs2_glock_find(sdp, &async->lc_name);
- if (gfs2_assert_warn(sdp, gl))
+ struct lm_lockstruct *ls = &gl->gl_sbd->sd_lockstruct;
+ down_read(&gfs2_umount_flush_sem);
+ gl->gl_reply = ret;
+ if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_flags))) {
+ struct gfs2_holder *gh;
+ spin_lock(&gl->gl_spin);
+ gh = find_first_waiter(gl);
+ if ((!(gh && (gh->gh_flags & LM_FLAG_NOEXP)) &&
+ (gl->gl_target != LM_ST_UNLOCKED)) ||
+ ((ret & ~LM_OUT_ST_MASK) != 0))
+ set_bit(GLF_FROZEN, &gl->gl_flags);
+ spin_unlock(&gl->gl_spin);
+ if (test_bit(GLF_FROZEN, &gl->gl_flags)) {
+ up_read(&gfs2_umount_flush_sem);
return;
- gl->gl_reply = async->lc_ret;
- set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gfs2_glock_put(gl);
- up_read(&gfs2_umount_flush_sem);
- return;
+ }
}
-
- case LM_CB_NEED_RECOVERY:
- gfs2_jdesc_make_dirty(sdp, *(unsigned int *)data);
- if (sdp->sd_recoverd_process)
- wake_up_process(sdp->sd_recoverd_process);
- return;
-
- default:
- gfs2_assert_warn(sdp, 0);
- return;
- }
+ set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ gfs2_glock_hold(gl);
+ if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
+ gfs2_glock_put(gl);
+ up_read(&gfs2_umount_flush_sem);
}
/**
@@ -1515,6 +1378,27 @@
return has_entries;
}
+
+/**
+ * thaw_glock - thaw out a glock which has an unprocessed reply waiting
+ * @gl: The glock to thaw
+ *
+ * N.B. When we freeze a glock, we leave a ref to the glock outstanding,
+ * so this has to result in the ref count being dropped by one.
+ */
+
+static void thaw_glock(struct gfs2_glock *gl)
+{
+ if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
+ return;
+ down_read(&gfs2_umount_flush_sem);
+ set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ gfs2_glock_hold(gl);
+ if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
+ gfs2_glock_put(gl);
+ up_read(&gfs2_umount_flush_sem);
+}
+
/**
* clear_glock - look at a glock and see if we can free it from glock cache
* @gl: the glock to look at
@@ -1540,6 +1424,20 @@
}
/**
+ * gfs2_glock_thaw - Thaw any frozen glocks
+ * @sdp: The super block
+ *
+ */
+
+void gfs2_glock_thaw(struct gfs2_sbd *sdp)
+{
+ unsigned x;
+
+ for (x = 0; x < GFS2_GL_HASH_SIZE; x++)
+ examine_bucket(thaw_glock, sdp, x);
+}
+
+/**
* gfs2_gl_hash_clear - Empty out the glock hash table
* @sdp: the filesystem
* @wait: wait until it's all gone
@@ -1619,7 +1517,7 @@
if (flags & LM_FLAG_NOEXP)
*p++ = 'e';
if (flags & LM_FLAG_ANY)
- *p++ = 'a';
+ *p++ = 'A';
if (flags & LM_FLAG_PRIORITY)
*p++ = 'p';
if (flags & GL_ASYNC)
@@ -1683,6 +1581,10 @@
*p++ = 'i';
if (test_bit(GLF_REPLY_PENDING, gflags))
*p++ = 'r';
+ if (test_bit(GLF_INITIAL, gflags))
+ *p++ = 'i';
+ if (test_bit(GLF_FROZEN, gflags))
+ *p++ = 'F';
*p = 0;
return buf;
}
@@ -1717,14 +1619,13 @@
dtime *= 1000000/HZ; /* demote time in uSec */
if (!test_bit(GLF_DEMOTE, &gl->gl_flags))
dtime = 0;
- gfs2_print_dbg(seq, "G: s:%s n:%u/%llu f:%s t:%s d:%s/%llu l:%d a:%d r:%d\n",
+ gfs2_print_dbg(seq, "G: s:%s n:%u/%llu f:%s t:%s d:%s/%llu a:%d r:%d\n",
state2str(gl->gl_state),
gl->gl_name.ln_type,
(unsigned long long)gl->gl_name.ln_number,
gflags2str(gflags_buf, &gl->gl_flags),
state2str(gl->gl_target),
state2str(gl->gl_demote_state), dtime,
- atomic_read(&gl->gl_lvb_count),
atomic_read(&gl->gl_ail_count),
atomic_read(&gl->gl_ref));