Commit f057f6cd, authored and committed by Steven Whitehouse

GFS2: Merge lock_dlm module into GFS2

This is the big patch that I've been working on for some time
now. There are many reasons for wanting to make this change
such as:
 o Reducing overhead by eliminating duplicated fields between structures
 o Simplification of the code (reduces the code size by a fair bit)
 o The locking interface is now the DLM interface itself as proposed
   some time ago.
 o Fewer lookups of glocks when processing replies from the DLM
 o Fewer memory allocations/deallocations for each glock
 o Scope to do further optimisations in the future (but this patch is
   more than big enough for now!)

Please note that (a) this patch relates to the lock_dlm module and
not the DLM itself, that is still a separate module; and (b) that
we retain the ability to build GFS2 as a standalone single node
filesystem without requiring the DLM.

This patch needs a lot of testing, which is why I restarted my -git
tree after the last merge window and have kept the patch in it. That way,
this has the maximum exposure before it's merged. This is (modulo a few
minor bug fixes) the same patch that I've been posting on and off for the
last three months, and it has passed a number of different tests so far.
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
parent 22077f57
config GFS2_FS
tristate "GFS2 file system support"
depends on EXPERIMENTAL && (64BIT || LBD)
select DLM if GFS2_FS_LOCKING_DLM
select CONFIGFS_FS if GFS2_FS_LOCKING_DLM
select SYSFS if GFS2_FS_LOCKING_DLM
select IP_SCTP if DLM_SCTP
select FS_POSIX_ACL
select CRC32
help
......@@ -18,17 +22,16 @@ config GFS2_FS
the locking module below. Documentation and utilities for GFS2 can
be found here: http://sources.redhat.com/cluster
The "nolock" lock module is now built in to GFS2 by default.
The "nolock" lock module is now built in to GFS2 by default. If
you want to use the DLM, be sure to enable HOTPLUG and IPv4/6
networking.
config GFS2_FS_LOCKING_DLM
tristate "GFS2 DLM locking module"
depends on GFS2_FS && SYSFS && NET && INET && (IPV6 || IPV6=n)
select IP_SCTP if DLM_SCTP
select CONFIGFS_FS
select DLM
bool "GFS2 DLM locking"
depends on (GFS2_FS!=n) && NET && INET && (IPV6 || IPV6=n) && HOTPLUG
help
Multiple node locking module for GFS2
Most users of GFS2 will require this module. It provides the locking
Most users of GFS2 will require this. It provides the locking
interface between GFS2 and the DLM, which is required to use GFS2
in a cluster environment.
obj-$(CONFIG_GFS2_FS) += gfs2.o
gfs2-y := acl.o bmap.o dir.o eaops.o eattr.o glock.o \
glops.o inode.o log.o lops.o locking.o main.o meta_io.o \
glops.o inode.o log.o lops.o main.o meta_io.o \
mount.o ops_address.o ops_dentry.o ops_export.o ops_file.o \
ops_fstype.o ops_inode.o ops_super.o quota.o \
recovery.o rgrp.o super.o sys.o trans.o util.o
obj-$(CONFIG_GFS2_FS_LOCKING_DLM) += locking/dlm/
gfs2-$(CONFIG_GFS2_FS_LOCKING_DLM) += lock_dlm.o
......@@ -15,7 +15,6 @@
#include <linux/posix_acl.h>
#include <linux/posix_acl_xattr.h>
#include <linux/gfs2_ondisk.h>
#include <linux/lm_interface.h>
#include "gfs2.h"
#include "incore.h"
......
......@@ -13,7 +13,6 @@
#include <linux/buffer_head.h>
#include <linux/gfs2_ondisk.h>
#include <linux/crc32.h>
#include <linux/lm_interface.h>
#include "gfs2.h"
#include "incore.h"
......
......@@ -60,7 +60,6 @@
#include <linux/gfs2_ondisk.h>
#include <linux/crc32.h>
#include <linux/vmalloc.h>
#include <linux/lm_interface.h>
#include "gfs2.h"
#include "incore.h"
......
......@@ -14,7 +14,6 @@
#include <linux/capability.h>
#include <linux/xattr.h>
#include <linux/gfs2_ondisk.h>
#include <linux/lm_interface.h>
#include <asm/uaccess.h>
#include "gfs2.h"
......
......@@ -13,7 +13,6 @@
#include <linux/buffer_head.h>
#include <linux/xattr.h>
#include <linux/gfs2_ondisk.h>
#include <linux/lm_interface.h>
#include <asm/uaccess.h>
#include "gfs2.h"
......
......@@ -10,7 +10,6 @@
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/completion.h>
#include <linux/buffer_head.h>
#include <linux/delay.h>
#include <linux/sort.h>
......@@ -18,7 +17,6 @@
#include <linux/kallsyms.h>
#include <linux/gfs2_ondisk.h>
#include <linux/list.h>
#include <linux/lm_interface.h>
#include <linux/wait.h>
#include <linux/module.h>
#include <linux/rwsem.h>
......@@ -155,13 +153,10 @@ static void glock_free(struct gfs2_glock *gl)
struct gfs2_sbd *sdp = gl->gl_sbd;
struct inode *aspace = gl->gl_aspace;
if (sdp->sd_lockstruct.ls_ops->lm_put_lock)
sdp->sd_lockstruct.ls_ops->lm_put_lock(gl->gl_lock);
if (aspace)
gfs2_aspace_put(aspace);
kmem_cache_free(gfs2_glock_cachep, gl);
sdp->sd_lockstruct.ls_ops->lm_put_lock(gfs2_glock_cachep, gl);
}
/**
......@@ -211,7 +206,6 @@ int gfs2_glock_put(struct gfs2_glock *gl)
atomic_dec(&lru_count);
}
spin_unlock(&lru_lock);
GLOCK_BUG_ON(gl, gl->gl_state != LM_ST_UNLOCKED);
GLOCK_BUG_ON(gl, !list_empty(&gl->gl_lru));
GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
glock_free(gl);
......@@ -255,27 +249,6 @@ static struct gfs2_glock *search_bucket(unsigned int hash,
return NULL;
}
/**
 * gfs2_glock_find() - Look up a glock by its lock name
 * @sdp: The GFS2 superblock
 * @name: The lock name
 *
 * The name is hashed with gl_hash() and the matching bucket is searched
 * while that bucket's lock is held for reading.
 *
 * Returns: NULL, or the struct gfs2_glock with the requested number
 */

static struct gfs2_glock *gfs2_glock_find(const struct gfs2_sbd *sdp,
					  const struct lm_lockname *name)
{
	unsigned int bucket = gl_hash(sdp, name);
	struct gfs2_glock *found;

	/* The bucket lock only needs to cover the search itself. */
	read_lock(gl_lock_addr(bucket));
	found = search_bucket(bucket, sdp, name);
	read_unlock(gl_lock_addr(bucket));

	return found;
}
/**
* may_grant - check if its ok to grant a new lock
* @gl: The glock
......@@ -523,7 +496,7 @@ out_locked:
}
static unsigned int gfs2_lm_lock(struct gfs2_sbd *sdp, void *lock,
unsigned int cur_state, unsigned int req_state,
unsigned int req_state,
unsigned int flags)
{
int ret = LM_OUT_ERROR;
......@@ -532,7 +505,7 @@ static unsigned int gfs2_lm_lock(struct gfs2_sbd *sdp, void *lock,
return req_state == LM_ST_UNLOCKED ? 0 : req_state;
if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
ret = sdp->sd_lockstruct.ls_ops->lm_lock(lock, cur_state,
ret = sdp->sd_lockstruct.ls_ops->lm_lock(lock,
req_state, flags);
return ret;
}
......@@ -575,7 +548,7 @@ __acquires(&gl->gl_spin)
gl->gl_state == LM_ST_DEFERRED) &&
!(lck_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)))
lck_flags |= LM_FLAG_TRY_1CB;
ret = gfs2_lm_lock(sdp, gl->gl_lock, gl->gl_state, target, lck_flags);
ret = gfs2_lm_lock(sdp, gl, target, lck_flags);
if (!(ret & LM_OUT_ASYNC)) {
finish_xmote(gl, ret);
......@@ -681,18 +654,6 @@ static void glock_work_func(struct work_struct *work)
gfs2_glock_put(gl);
}
/*
 * Ask the lock module for its per-lock object for @name, storing it
 * through @lockp.
 *
 * Returns 0 when the lock module provides no lm_get_lock operation,
 * -EIO when SDF_SHUTDOWN is set on the superblock, and otherwise whatever
 * the lock module's lm_get_lock returns.
 */
static int gfs2_lm_get_lock(struct gfs2_sbd *sdp, struct lm_lockname *name,
			    void **lockp)
{
	/* The operation is optional; its absence is not an error. */
	if (!sdp->sd_lockstruct.ls_ops->lm_get_lock)
		return 0;

	/* Never call into the lock module once shutdown has been flagged. */
	if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
		return -EIO;

	return sdp->sd_lockstruct.ls_ops->lm_get_lock(
			sdp->sd_lockstruct.ls_lockspace, name, lockp);
}
/**
* gfs2_glock_get() - Get a glock, or create one if one doesn't exist
* @sdp: The GFS2 superblock
......@@ -736,6 +697,9 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
gl->gl_demote_state = LM_ST_EXCLUSIVE;
gl->gl_hash = hash;
gl->gl_ops = glops;
snprintf(gl->gl_strname, GDLM_STRNAME_BYTES, "%8x%16llx", name.ln_type, (unsigned long long)number);
memset(&gl->gl_lksb, 0, sizeof(struct dlm_lksb));
gl->gl_lksb.sb_lvbptr = gl->gl_lvb;
gl->gl_stamp = jiffies;
gl->gl_tchange = jiffies;
gl->gl_object = NULL;
......@@ -753,10 +717,6 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
}
}
error = gfs2_lm_get_lock(sdp, &name, &gl->gl_lock);
if (error)
goto fail_aspace;
write_lock(gl_lock_addr(hash));
tmp = search_bucket(hash, sdp, &name);
if (tmp) {
......@@ -772,9 +732,6 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
return 0;
fail_aspace:
if (gl->gl_aspace)
gfs2_aspace_put(gl->gl_aspace);
fail:
kmem_cache_free(gfs2_glock_cachep, gl);
return error;
......@@ -966,7 +923,7 @@ do_cancel:
if (!(gh->gh_flags & LM_FLAG_PRIORITY)) {
spin_unlock(&gl->gl_spin);
if (sdp->sd_lockstruct.ls_ops->lm_cancel)
sdp->sd_lockstruct.ls_ops->lm_cancel(gl->gl_lock);
sdp->sd_lockstruct.ls_ops->lm_cancel(gl);
spin_lock(&gl->gl_spin);
}
return;
......@@ -1240,70 +1197,13 @@ void gfs2_glock_dq_uninit_m(unsigned int num_gh, struct gfs2_holder *ghs)
gfs2_glock_dq_uninit(&ghs[x]);
}
static int gfs2_lm_hold_lvb(struct gfs2_sbd *sdp, void *lock, char **lvbp)
void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state)
{
int error = -EIO;
if (!sdp->sd_lockstruct.ls_ops->lm_hold_lvb)
return 0;
if (likely(!test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
error = sdp->sd_lockstruct.ls_ops->lm_hold_lvb(lock, lvbp);
return error;
}
/**
 * gfs2_lvb_hold - attach a LVB from a glock
 * @gl: The glock in question
 *
 * When gl_lvb_count is zero the LVB is requested through
 * gfs2_lm_hold_lvb() (filling in gl->gl_lvb) and an extra glock reference
 * is taken; in every successful case gl_lvb_count is then incremented.
 *
 * Returns: 0 on success, or the error reported by gfs2_lm_hold_lvb()
 */

int gfs2_lvb_hold(struct gfs2_glock *gl)
{
int error;
/*
 * NOTE(review): the zero check and the increment below are two separate
 * atomic operations - confirm that callers serialise hold/unhold.
 */
if (!atomic_read(&gl->gl_lvb_count)) {
error = gfs2_lm_hold_lvb(gl->gl_sbd, gl->gl_lock, &gl->gl_lvb);
if (error)
return error;
/* This reference is dropped again in gfs2_lvb_unhold() on the last unhold. */
gfs2_glock_hold(gl);
}
atomic_inc(&gl->gl_lvb_count);
return 0;
}
/**
 * gfs2_lvb_unhold - detach a LVB from a glock
 * @gl: The glock in question
 *
 * Decrements gl_lvb_count. When it reaches zero, the lock module's
 * lm_unhold_lvb operation is called (if it provides one), gl->gl_lvb is
 * cleared and the glock reference taken by gfs2_lvb_hold() is released.
 */

void gfs2_lvb_unhold(struct gfs2_glock *gl)
{
struct gfs2_sbd *sdp = gl->gl_sbd;
/* Temporary reference, paired with the final gfs2_glock_put() below. */
gfs2_glock_hold(gl);
/* Unholding without a matching hold is a bug. */
gfs2_assert(gl->gl_sbd, atomic_read(&gl->gl_lvb_count) > 0);
if (atomic_dec_and_test(&gl->gl_lvb_count)) {
/* lm_unhold_lvb is optional, so only call it when it is set. */
if (sdp->sd_lockstruct.ls_ops->lm_unhold_lvb)
sdp->sd_lockstruct.ls_ops->lm_unhold_lvb(gl->gl_lock, gl->gl_lvb);
gl->gl_lvb = NULL;
/* Drop the reference that gfs2_lvb_hold() took for the first holder. */
gfs2_glock_put(gl);
}
gfs2_glock_put(gl);
}
static void blocking_cb(struct gfs2_sbd *sdp, struct lm_lockname *name,
unsigned int state)
{
struct gfs2_glock *gl;
unsigned long delay = 0;
unsigned long holdtime;
unsigned long now = jiffies;
gl = gfs2_glock_find(sdp, name);
if (!gl)
return;
gfs2_glock_hold(gl);
holdtime = gl->gl_tchange + gl->gl_ops->go_min_hold_time;
if (time_before(now, holdtime))
delay = holdtime - now;
......@@ -1317,74 +1217,37 @@ static void blocking_cb(struct gfs2_sbd *sdp, struct lm_lockname *name,
gfs2_glock_put(gl);
}
/*
 * Mark the journal descriptor whose jd_jid equals @jid as dirty.
 *
 * sd_jindex_list is walked with sd_jindex_spin held and the walk stops at
 * the first match; if no descriptor carries that id nothing is changed.
 */
static void gfs2_jdesc_make_dirty(struct gfs2_sbd *sdp, unsigned int jid)
{
	struct gfs2_jdesc *desc;

	spin_lock(&sdp->sd_jindex_spin);
	list_for_each_entry(desc, &sdp->sd_jindex_list, jd_list) {
		if (desc->jd_jid == jid) {
			desc->jd_dirty = 1;
			break;
		}
	}
	spin_unlock(&sdp->sd_jindex_spin);
}
/**
* gfs2_glock_cb - Callback used by locking module
* @sdp: Pointer to the superblock
* @type: Type of callback
* @data: Type dependent data pointer
* gfs2_glock_complete - Callback used by locking
* @gl: Pointer to the glock
* @ret: The return value from the dlm
*
* Called by the locking module when it wants to tell us something.
* Either we need to drop a lock, one of our ASYNC requests completed, or
* a journal from another client needs to be recovered.
*/
void gfs2_glock_cb(void *cb_data, unsigned int type, void *data)
void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
{
struct gfs2_sbd *sdp = cb_data;
switch (type) {
case LM_CB_NEED_E:
blocking_cb(sdp, data, LM_ST_UNLOCKED);
return;
case LM_CB_NEED_D:
blocking_cb(sdp, data, LM_ST_DEFERRED);
return;
case LM_CB_NEED_S:
blocking_cb(sdp, data, LM_ST_SHARED);
return;
case LM_CB_ASYNC: {
struct lm_async_cb *async = data;
struct gfs2_glock *gl;
down_read(&gfs2_umount_flush_sem);
gl = gfs2_glock_find(sdp, &async->lc_name);
if (gfs2_assert_warn(sdp, gl))
struct lm_lockstruct *ls = &gl->gl_sbd->sd_lockstruct;
down_read(&gfs2_umount_flush_sem);
gl->gl_reply = ret;
if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_flags))) {
struct gfs2_holder *gh;
spin_lock(&gl->gl_spin);
gh = find_first_waiter(gl);
if ((!(gh && (gh->gh_flags & LM_FLAG_NOEXP)) &&
(gl->gl_target != LM_ST_UNLOCKED)) ||
((ret & ~LM_OUT_ST_MASK) != 0))
set_bit(GLF_FROZEN, &gl->gl_flags);
spin_unlock(&gl->gl_spin);
if (test_bit(GLF_FROZEN, &gl->gl_flags)) {
up_read(&gfs2_umount_flush_sem);
return;
gl->gl_reply = async->lc_ret;
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
gfs2_glock_put(gl);
up_read(&gfs2_umount_flush_sem);
return;
}
case LM_CB_NEED_RECOVERY:
gfs2_jdesc_make_dirty(sdp, *(unsigned int *)data);
if (sdp->sd_recoverd_process)
wake_up_process(sdp->sd_recoverd_process);
return;
default:
gfs2_assert_warn(sdp, 0);
return;
}
}
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
gfs2_glock_hold(gl);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
gfs2_glock_put(gl);
up_read(&gfs2_umount_flush_sem);
}
/**
......@@ -1515,6 +1378,27 @@ out:
return has_entries;
}
/**
 * thaw_glock - thaw out a glock which has an unprocessed reply waiting
 * @gl: The glock to thaw
 *
 * Does nothing unless GLF_FROZEN was set; the flag is tested and cleared in
 * one atomic step. For a frozen glock, GLF_REPLY_PENDING is set and the
 * glock's work item is queued on glock_workqueue with no delay, all with
 * gfs2_umount_flush_sem held for reading.
 *
 * N.B. When we freeze a glock, we leave a ref to the glock outstanding,
 * so this has to result in the ref count being dropped by one.
 */
static void thaw_glock(struct gfs2_glock *gl)
{
/* Glocks that were never frozen are left untouched. */
if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
return;
down_read(&gfs2_umount_flush_sem);
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
/* Reference taken on behalf of the queued work item. */
gfs2_glock_hold(gl);
/*
 * A zero return is treated as "nothing was queued" (presumably the work
 * was already pending - confirm), so the reference taken just above is
 * released again.
 */
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
gfs2_glock_put(gl);
up_read(&gfs2_umount_flush_sem);
}
/**
* clear_glock - look at a glock and see if we can free it from glock cache
* @gl: the glock to look at
......@@ -1539,6 +1423,20 @@ static void clear_glock(struct gfs2_glock *gl)
gfs2_glock_put(gl);
}
/**
 * gfs2_glock_thaw - Thaw any frozen glocks
 * @sdp: The super block
 *
 * Applies thaw_glock() to each of the GFS2_GL_HASH_SIZE buckets of the
 * glock hash table by way of examine_bucket().
 */
void gfs2_glock_thaw(struct gfs2_sbd *sdp)
{
	unsigned bucket = 0;

	while (bucket < GFS2_GL_HASH_SIZE) {
		examine_bucket(thaw_glock, sdp, bucket);
		bucket++;
	}
}
/**
* gfs2_gl_hash_clear - Empty out the glock hash table
* @sdp: the filesystem
......@@ -1619,7 +1517,7 @@ static const char *hflags2str(char *buf, unsigned flags, unsigned long iflags)
if (flags & LM_FLAG_NOEXP)
*p++ = 'e';
if (flags & LM_FLAG_ANY)
*p++ = 'a';
*p++ = 'A';
if (flags & LM_FLAG_PRIORITY)
*p++ = 'p';
if (flags & GL_ASYNC)
......@@ -1683,6 +1581,10 @@ static const char *gflags2str(char *buf, const unsigned long *gflags)
*p++ = 'i';
if (test_bit(GLF_REPLY_PENDING, gflags))
*p++ = 'r';
if (test_bit(GLF_INITIAL, gflags))
*p++ = 'i';
if (test_bit(GLF_FROZEN, gflags))
*p++ = 'F';
*p = 0;
return buf;
}
......@@ -1717,14 +1619,13 @@ static int __dump_glock(struct seq_file *seq, const struct gfs2_glock *gl)
dtime *= 1000000/HZ; /* demote time in uSec */
if (!test_bit(GLF_DEMOTE, &gl->gl_flags))
dtime = 0;
gfs2_print_dbg(seq, "G: s:%s n:%u/%llu f:%s t:%s d:%s/%llu l:%d a:%d r:%d\n",
gfs2_print_dbg(seq, "G: s:%s n:%u/%llu f:%s t:%s d:%s/%llu a:%d r:%d\n",
state2str(gl->gl_state),
gl->gl_name.ln_type,
(unsigned long long)gl->gl_name.ln_number,
gflags2str(gflags_buf, &gl->gl_flags),
state2str(gl->gl_target),
state2str(gl->gl_demote_state), dtime,
atomic_read(&gl->gl_lvb_count),
atomic_read(&gl->gl_ail_count),
atomic_read(&gl->gl_ref));
......
......@@ -11,15 +11,130 @@
#define __GLOCK_DOT_H__
#include <linux/sched.h>
#include <linux/parser.h>
#include "incore.h"
/* Flags for lock requests; used in gfs2_holder gh_flag field.
From lm_interface.h:
/*
 * Options for hostdata parser
 *
 * NOTE(review): these look like the token ids for the match_table_t that
 * struct lm_lockops exposes as lm_tokens (from <linux/parser.h>), with
 * Opt_err presumably being the "no match" entry - confirm against the
 * table definition, which is not visible here.
 */
enum {
Opt_jid,
Opt_id,
Opt_first,
Opt_nodir,
Opt_err,
};
/*
* lm_lockname types
*/
#define LM_TYPE_RESERVED 0x00
#define LM_TYPE_NONDISK 0x01
#define LM_TYPE_INODE 0x02
#define LM_TYPE_RGRP 0x03
#define LM_TYPE_META 0x04
#define LM_TYPE_IOPEN 0x05
#define LM_TYPE_FLOCK 0x06
#define LM_TYPE_PLOCK 0x07
#define LM_TYPE_QUOTA 0x08
#define LM_TYPE_JOURNAL 0x09
/*
* lm_lock() states
*
* SHARED is compatible with SHARED, not with DEFERRED or EX.
* DEFERRED is compatible with DEFERRED, not with SHARED or EX.
*/
#define LM_ST_UNLOCKED 0
#define LM_ST_EXCLUSIVE 1
#define LM_ST_DEFERRED 2
#define LM_ST_SHARED 3
/*
* lm_lock() flags
*
* LM_FLAG_TRY
* Don't wait to acquire the lock if it can't be granted immediately.
*
* LM_FLAG_TRY_1CB
* Send one blocking callback if TRY is set and the lock is not granted.
*
* LM_FLAG_NOEXP
* GFS sets this flag on lock requests it makes while doing journal recovery.
* These special requests should not be blocked due to the recovery like
* ordinary locks would be.
*
* LM_FLAG_ANY
* A SHARED request may also be granted in DEFERRED, or a DEFERRED request may
* also be granted in SHARED. The preferred state is whichever is compatible
* with other granted locks, or the specified state if no other locks exist.
*
* LM_FLAG_PRIORITY
* Override fairness considerations. Suppose a lock is held in a shared state
* and there is a pending request for the deferred state. A shared lock
* request with the priority flag would be allowed to bypass the deferred
* request and directly join the other shared lock. A shared lock request
* without the priority flag might be forced to wait until the deferred
* requested had acquired and released the lock.
*/
#define LM_FLAG_TRY 0x00000001
#define LM_FLAG_TRY_1CB 0x00000002
#define LM_FLAG_NOEXP 0x00000004
#define LM_FLAG_ANY 0x00000008
#define LM_FLAG_PRIORITY 0x00000010 */
#define LM_FLAG_PRIORITY 0x00000010
#define GL_ASYNC 0x00000040
#define GL_EXACT 0x00000080
#define GL_SKIP 0x00000100
#define GL_ATIME 0x00000200
#define GL_NOCACHE 0x00000400
/*
* lm_lock() and lm_async_cb return flags
*
* LM_OUT_ST_MASK
* Masks the lower two bits of lock state in the returned value.
*
* LM_OUT_CANCELED
* The lock request was canceled.
*
* LM_OUT_ASYNC
* The result of the request will be returned in an LM_CB_ASYNC callback.
*
*/
#define LM_OUT_ST_MASK 0x00000003
#define LM_OUT_CANCELED 0x00000008
#define LM_OUT_ASYNC 0x00000080
#define LM_OUT_ERROR 0x00000100
/*
* lm_recovery_done() messages
*/
#define LM_RD_GAVEUP 308
#define LM_RD_SUCCESS 309
#define GLR_TRYFAILED 13
/*
 * Operations table through which the glock code calls into a locking
 * backend; it is reached via sdp->sd_lockstruct.ls_ops.
 */
struct lm_lockops {
/* NOTE(review): presumably the protocol name matched at mount time - confirm. */
const char *lm_proto_name;
/* NOTE(review): mount/unmount/withdraw hooks; their callers are not visible here. */
int (*lm_mount) (struct gfs2_sbd *sdp, const char *fsname);
void (*lm_unmount) (struct gfs2_sbd *sdp);
void (*lm_withdraw) (struct gfs2_sbd *sdp);
/*
 * Called by glock_free() as lm_put_lock(gfs2_glock_cachep, gl) in place of
 * a direct kmem_cache_free(); glock_free() calls it without a NULL check.
 */
void (*lm_put_lock) (struct kmem_cache *cachep, void *gl);
/*
 * Issues a lock request for @gl. gfs2_lm_lock() passes the result back to
 * its caller, which checks it for LM_OUT_ASYNC.
 */
unsigned int (*lm_lock) (struct gfs2_glock *gl,
unsigned int req_state, unsigned int flags);
/* Optional: the do_cancel path checks this for NULL before calling it. */
void (*lm_cancel) (struct gfs2_glock *gl);
/* Parser token table (match_table_t comes from <linux/parser.h>). */
const match_table_t *lm_tokens;
};
#define LM_FLAG_TRY 0x00000001
#define LM_FLAG_TRY_1CB 0x00000002
#define LM_FLAG_NOEXP 0x00000004
#define LM_FLAG_ANY 0x00000008
#define LM_FLAG_PRIORITY 0x00000010