Commit 498f7f50 authored by Linus Torvalds's avatar Linus Torvalds
Browse files

Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2

* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2: (22 commits)
  MAINTAINERS: Update Joel Becker's email address
  ocfs2: Remove unused truncate function from alloc.c
  ocfs2/cluster: dereferencing before checking in nst_seq_show()
  ocfs2: fix build for OCFS2_FS_STATS not enabled
  ocfs2/cluster: Show o2net timing statistics
  ocfs2/cluster: Track process message timing stats for each socket
  ocfs2/cluster: Track send message timing stats for each socket
  ocfs2/cluster: Use ktime instead of timeval in struct o2net_sock_container
  ocfs2/cluster: Replace timeval with ktime in struct o2net_send_tracking
  ocfs2: Add DEBUG_FS dependency
  ocfs2/dlm: Hard code the values for enums
  ocfs2/dlm: Minor cleanup
  ocfs2/dlm: Cleanup dlmdebug.c
  ocfs2: Release buffer_head in case of error in ocfs2_double_lock.
  ocfs2/cluster: Pin the local node when o2hb thread starts
  ocfs2/cluster: Show pin state for each o2hb region
  ocfs2/cluster: Pin/unpin o2hb regions
  ocfs2/cluster: Remove dropped region from o2hb quorum region bitmap
  ocfs2/cluster: Pin the remote node item in configfs
  ocfs2/dlm: make existing convertion precedent over new lock
  ...
parents 0969d11e d6351db2
......@@ -1785,7 +1785,8 @@ S: Maintained
F: drivers/usb/atm/cxacru.c
CONFIGFS
M: Joel Becker <joel.becker@oracle.com>
M: Joel Becker <jlbec@evilplan.org>
T: git git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/configfs.git
S: Supported
F: fs/configfs/
F: include/linux/configfs.h
......@@ -4549,7 +4550,7 @@ F: include/linux/oprofile.h
ORACLE CLUSTER FILESYSTEM 2 (OCFS2)
M: Mark Fasheh <mfasheh@suse.com>
M: Joel Becker <joel.becker@oracle.com>
M: Joel Becker <jlbec@evilplan.org>
L: ocfs2-devel@oss.oracle.com (moderated for non-subscribers)
W: http://oss.oracle.com/projects/ocfs2/
T: git git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2.git
......
......@@ -51,7 +51,7 @@ config OCFS2_FS_USERSPACE_CLUSTER
config OCFS2_FS_STATS
bool "OCFS2 statistics"
depends on OCFS2_FS
depends on OCFS2_FS && DEBUG_FS
default y
help
This option allows some fs statistics to be captured. Enabling
......
......@@ -565,7 +565,6 @@ static inline int ocfs2_et_sanity_check(struct ocfs2_extent_tree *et)
return ret;
}
static void ocfs2_free_truncate_context(struct ocfs2_truncate_context *tc);
static int ocfs2_cache_extent_block_free(struct ocfs2_cached_dealloc_ctxt *ctxt,
struct ocfs2_extent_block *eb);
static void ocfs2_adjust_rightmost_records(handle_t *handle,
......@@ -5858,6 +5857,7 @@ int ocfs2_truncate_log_append(struct ocfs2_super *osb,
ocfs2_journal_dirty(handle, tl_bh);
osb->truncated_clusters += num_clusters;
bail:
mlog_exit(status);
return status;
......@@ -5929,6 +5929,8 @@ static int ocfs2_replay_truncate_records(struct ocfs2_super *osb,
i--;
}
osb->truncated_clusters = 0;
bail:
mlog_exit(status);
return status;
......@@ -7138,64 +7140,6 @@ bail:
return status;
}
/*
* Expects the inode to already be locked.
*/
int ocfs2_prepare_truncate(struct ocfs2_super *osb,
struct inode *inode,
struct buffer_head *fe_bh,
struct ocfs2_truncate_context **tc)
{
int status;
unsigned int new_i_clusters;
struct ocfs2_dinode *fe;
struct ocfs2_extent_block *eb;
struct buffer_head *last_eb_bh = NULL;
mlog_entry_void();
*tc = NULL;
new_i_clusters = ocfs2_clusters_for_bytes(osb->sb,
i_size_read(inode));
fe = (struct ocfs2_dinode *) fe_bh->b_data;
mlog(0, "fe->i_clusters = %u, new_i_clusters = %u, fe->i_size ="
"%llu\n", le32_to_cpu(fe->i_clusters), new_i_clusters,
(unsigned long long)le64_to_cpu(fe->i_size));
*tc = kzalloc(sizeof(struct ocfs2_truncate_context), GFP_KERNEL);
if (!(*tc)) {
status = -ENOMEM;
mlog_errno(status);
goto bail;
}
ocfs2_init_dealloc_ctxt(&(*tc)->tc_dealloc);
if (fe->id2.i_list.l_tree_depth) {
status = ocfs2_read_extent_block(INODE_CACHE(inode),
le64_to_cpu(fe->i_last_eb_blk),
&last_eb_bh);
if (status < 0) {
mlog_errno(status);
goto bail;
}
eb = (struct ocfs2_extent_block *) last_eb_bh->b_data;
}
(*tc)->tc_last_eb_bh = last_eb_bh;
status = 0;
bail:
if (status < 0) {
if (*tc)
ocfs2_free_truncate_context(*tc);
*tc = NULL;
}
mlog_exit_void();
return status;
}
/*
* 'start' is inclusive, 'end' is not.
*/
......@@ -7270,18 +7214,3 @@ out_commit:
out:
return ret;
}
static void ocfs2_free_truncate_context(struct ocfs2_truncate_context *tc)
{
/*
* The caller is responsible for completing deallocation
* before freeing the context.
*/
if (tc->tc_dealloc.c_first_suballocator != NULL)
mlog(ML_NOTICE,
"Truncate completion has non-empty dealloc context\n");
brelse(tc->tc_last_eb_bh);
kfree(tc);
}
......@@ -228,10 +228,6 @@ struct ocfs2_truncate_context {
int ocfs2_zero_range_for_truncate(struct inode *inode, handle_t *handle,
u64 range_start, u64 range_end);
int ocfs2_prepare_truncate(struct ocfs2_super *osb,
struct inode *inode,
struct buffer_head *fe_bh,
struct ocfs2_truncate_context **tc);
int ocfs2_commit_truncate(struct ocfs2_super *osb,
struct inode *inode,
struct buffer_head *di_bh);
......
......@@ -1630,6 +1630,43 @@ static int ocfs2_zero_tail(struct inode *inode, struct buffer_head *di_bh,
return ret;
}
/*
* Try to flush truncate logs if we can free enough clusters from it.
* As for return value, "< 0" means error, "0" no space and "1" means
* we have freed enough spaces and let the caller try to allocate again.
*/
static int ocfs2_try_to_free_truncate_log(struct ocfs2_super *osb,
unsigned int needed)
{
tid_t target;
int ret = 0;
unsigned int truncated_clusters;
mutex_lock(&osb->osb_tl_inode->i_mutex);
truncated_clusters = osb->truncated_clusters;
mutex_unlock(&osb->osb_tl_inode->i_mutex);
/*
* Check whether we can succeed in allocating if we free
* the truncate log.
*/
if (truncated_clusters < needed)
goto out;
ret = ocfs2_flush_truncate_log(osb);
if (ret) {
mlog_errno(ret);
goto out;
}
if (jbd2_journal_start_commit(osb->journal->j_journal, &target)) {
jbd2_log_wait_commit(osb->journal->j_journal, target);
ret = 1;
}
out:
return ret;
}
int ocfs2_write_begin_nolock(struct file *filp,
struct address_space *mapping,
loff_t pos, unsigned len, unsigned flags,
......@@ -1637,7 +1674,7 @@ int ocfs2_write_begin_nolock(struct file *filp,
struct buffer_head *di_bh, struct page *mmap_page)
{
int ret, cluster_of_pages, credits = OCFS2_INODE_UPDATE_CREDITS;
unsigned int clusters_to_alloc, extents_to_split;
unsigned int clusters_to_alloc, extents_to_split, clusters_need = 0;
struct ocfs2_write_ctxt *wc;
struct inode *inode = mapping->host;
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
......@@ -1646,7 +1683,9 @@ int ocfs2_write_begin_nolock(struct file *filp,
struct ocfs2_alloc_context *meta_ac = NULL;
handle_t *handle;
struct ocfs2_extent_tree et;
int try_free = 1, ret1;
try_again:
ret = ocfs2_alloc_write_ctxt(&wc, osb, pos, len, di_bh);
if (ret) {
mlog_errno(ret);
......@@ -1681,6 +1720,7 @@ int ocfs2_write_begin_nolock(struct file *filp,
mlog_errno(ret);
goto out;
} else if (ret == 1) {
clusters_need = wc->w_clen;
ret = ocfs2_refcount_cow(inode, filp, di_bh,
wc->w_cpos, wc->w_clen, UINT_MAX);
if (ret) {
......@@ -1695,6 +1735,7 @@ int ocfs2_write_begin_nolock(struct file *filp,
mlog_errno(ret);
goto out;
}
clusters_need += clusters_to_alloc;
di = (struct ocfs2_dinode *)wc->w_di_bh->b_data;
......@@ -1817,6 +1858,22 @@ out:
ocfs2_free_alloc_context(data_ac);
if (meta_ac)
ocfs2_free_alloc_context(meta_ac);
if (ret == -ENOSPC && try_free) {
/*
* Try to free some truncate log so that we can have enough
* clusters to allocate.
*/
try_free = 0;
ret1 = ocfs2_try_to_free_truncate_log(osb, clusters_need);
if (ret1 == 1)
goto try_again;
if (ret1 < 0)
mlog_errno(ret1);
}
return ret;
}
......
......@@ -82,6 +82,7 @@ static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
#define O2HB_DB_TYPE_REGION_LIVENODES 4
#define O2HB_DB_TYPE_REGION_NUMBER 5
#define O2HB_DB_TYPE_REGION_ELAPSED_TIME 6
#define O2HB_DB_TYPE_REGION_PINNED 7
struct o2hb_debug_buf {
int db_type;
int db_size;
......@@ -101,6 +102,7 @@ static struct o2hb_debug_buf *o2hb_db_failedregions;
#define O2HB_DEBUG_FAILEDREGIONS "failed_regions"
#define O2HB_DEBUG_REGION_NUMBER "num"
#define O2HB_DEBUG_REGION_ELAPSED_TIME "elapsed_time_in_ms"
#define O2HB_DEBUG_REGION_PINNED "pinned"
static struct dentry *o2hb_debug_dir;
static struct dentry *o2hb_debug_livenodes;
......@@ -132,6 +134,33 @@ char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
/*
* o2hb_dependent_users tracks the number of registered callbacks that depend
* on heartbeat. o2net and o2dlm are two entities that register this callback.
* However only o2dlm depends on the heartbeat. It does not want the heartbeat
* to stop while a dlm domain is still active.
*/
unsigned int o2hb_dependent_users;
/*
* In global heartbeat mode, all regions are pinned if there are one or more
* dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
* regions are unpinned if the region count exceeds the cut off or the number
* of dependent users falls to zero.
*/
#define O2HB_PIN_CUT_OFF 3
/*
* In local heartbeat mode, we assume the dlm domain name to be the same as
* region uuid. This is true for domains created for the file system but not
* necessarily true for userdlm domains. This is a known limitation.
*
* In global heartbeat mode, we pin/unpin all o2hb regions. This solution
* works for both file system and userdlm domains.
*/
static int o2hb_region_pin(const char *region_uuid);
static void o2hb_region_unpin(const char *region_uuid);
/* Only sets a new threshold if there are no active regions.
*
* No locking or otherwise interesting code is required for reading
......@@ -186,7 +215,9 @@ struct o2hb_region {
struct config_item hr_item;
struct list_head hr_all_item;
unsigned hr_unclean_stop:1;
unsigned hr_unclean_stop:1,
hr_item_pinned:1,
hr_item_dropped:1;
/* protected by the hr_callback_sem */
struct task_struct *hr_task;
......@@ -212,9 +243,11 @@ struct o2hb_region {
struct dentry *hr_debug_livenodes;
struct dentry *hr_debug_regnum;
struct dentry *hr_debug_elapsed_time;
struct dentry *hr_debug_pinned;
struct o2hb_debug_buf *hr_db_livenodes;
struct o2hb_debug_buf *hr_db_regnum;
struct o2hb_debug_buf *hr_db_elapsed_time;
struct o2hb_debug_buf *hr_db_pinned;
/* let the person setting up hb wait for it to return until it
* has reached a 'steady' state. This will be fixed when we have
......@@ -701,6 +734,14 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg,
config_item_name(&reg->hr_item));
set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
/*
* If global heartbeat active, unpin all regions if the
* region count > CUT_OFF
*/
if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
o2hb_region_unpin(NULL);
}
static int o2hb_check_slot(struct o2hb_region *reg,
......@@ -1041,6 +1082,9 @@ static int o2hb_thread(void *data)
set_user_nice(current, -20);
/* Pin node */
o2nm_depend_this_node();
while (!kthread_should_stop() && !reg->hr_unclean_stop) {
/* We track the time spent inside
* o2hb_do_disk_heartbeat so that we avoid more than
......@@ -1090,6 +1134,9 @@ static int o2hb_thread(void *data)
mlog_errno(ret);
}
/* Unpin node */
o2nm_undepend_this_node();
mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
return 0;
......@@ -1142,6 +1189,12 @@ static int o2hb_debug_open(struct inode *inode, struct file *file)
reg->hr_last_timeout_start));
goto done;
case O2HB_DB_TYPE_REGION_PINNED:
reg = (struct o2hb_region *)db->db_data;
out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
!!reg->hr_item_pinned);
goto done;
default:
goto done;
}
......@@ -1315,6 +1368,8 @@ int o2hb_init(void)
memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
o2hb_dependent_users = 0;
return o2hb_debug_init();
}
......@@ -1384,6 +1439,7 @@ static void o2hb_region_release(struct config_item *item)
debugfs_remove(reg->hr_debug_livenodes);
debugfs_remove(reg->hr_debug_regnum);
debugfs_remove(reg->hr_debug_elapsed_time);
debugfs_remove(reg->hr_debug_pinned);
debugfs_remove(reg->hr_debug_dir);
spin_lock(&o2hb_live_lock);
......@@ -1948,6 +2004,18 @@ static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
goto bail;
}
reg->hr_debug_pinned =
o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
reg->hr_debug_dir,
&(reg->hr_db_pinned),
sizeof(*(reg->hr_db_pinned)),
O2HB_DB_TYPE_REGION_PINNED,
0, 0, reg);
if (!reg->hr_debug_pinned) {
mlog_errno(ret);
goto bail;
}
ret = 0;
bail:
return ret;
......@@ -2002,15 +2070,20 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group,
{
struct task_struct *hb_task;
struct o2hb_region *reg = to_o2hb_region(item);
int quorum_region = 0;
/* stop the thread when the user removes the region dir */
spin_lock(&o2hb_live_lock);
if (o2hb_global_heartbeat_active()) {
clear_bit(reg->hr_region_num, o2hb_region_bitmap);
clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
quorum_region = 1;
clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
}
hb_task = reg->hr_task;
reg->hr_task = NULL;
reg->hr_item_dropped = 1;
spin_unlock(&o2hb_live_lock);
if (hb_task)
......@@ -2028,7 +2101,27 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group,
if (o2hb_global_heartbeat_active())
printk(KERN_NOTICE "o2hb: Heartbeat stopped on region %s\n",
config_item_name(&reg->hr_item));
config_item_put(item);
if (!o2hb_global_heartbeat_active() || !quorum_region)
return;
/*
* If global heartbeat active and there are dependent users,
* pin all regions if quorum region count <= CUT_OFF
*/
spin_lock(&o2hb_live_lock);
if (!o2hb_dependent_users)
goto unlock;
if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
o2hb_region_pin(NULL);
unlock:
spin_unlock(&o2hb_live_lock);
}
struct o2hb_heartbeat_group_attribute {
......@@ -2214,63 +2307,138 @@ void o2hb_setup_callback(struct o2hb_callback_func *hc,
}
EXPORT_SYMBOL_GPL(o2hb_setup_callback);
static struct o2hb_region *o2hb_find_region(const char *region_uuid)
/*
* In local heartbeat mode, region_uuid passed matches the dlm domain name.
* In global heartbeat mode, region_uuid passed is NULL.
*
* In local, we only pin the matching region. In global we pin all the active
* regions.
*/
static int o2hb_region_pin(const char *region_uuid)
{
struct o2hb_region *p, *reg = NULL;
int ret = 0, found = 0;
struct o2hb_region *reg;
char *uuid;
assert_spin_locked(&o2hb_live_lock);
list_for_each_entry(p, &o2hb_all_regions, hr_all_item) {
if (!strcmp(region_uuid, config_item_name(&p->hr_item))) {
reg = p;
break;
list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
uuid = config_item_name(&reg->hr_item);
/* local heartbeat */
if (region_uuid) {
if (strcmp(region_uuid, uuid))
continue;
found = 1;
}
if (reg->hr_item_pinned || reg->hr_item_dropped)
goto skip_pin;
/* Ignore ENOENT only for local hb (userdlm domain) */
ret = o2nm_depend_item(&reg->hr_item);
if (!ret) {
mlog(ML_CLUSTER, "Pin region %s\n", uuid);
reg->hr_item_pinned = 1;
} else {
if (ret == -ENOENT && found)
ret = 0;
else {
mlog(ML_ERROR, "Pin region %s fails with %d\n",
uuid, ret);
break;
}
}
skip_pin:
if (found)
break;
}
return reg;
return ret;
}
static int o2hb_region_get(const char *region_uuid)
/*
* In local heartbeat mode, region_uuid passed matches the dlm domain name.
* In global heartbeat mode, region_uuid passed is NULL.
*
* In local, we only unpin the matching region. In global we unpin all the
* active regions.
*/
static void o2hb_region_unpin(const char *region_uuid)
{
int ret = 0;
struct o2hb_region *reg;
char *uuid;
int found = 0;
spin_lock(&o2hb_live_lock);
assert_spin_locked(&o2hb_live_lock);
reg = o2hb_find_region(region_uuid);
if (!reg)
ret = -ENOENT;
spin_unlock(&o2hb_live_lock);
list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
uuid = config_item_name(&reg->hr_item);
if (region_uuid) {
if (strcmp(region_uuid, uuid))
continue;
found = 1;
}
if (ret)
goto out;
if (reg->hr_item_pinned) {
mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
o2nm_undepend_item(&reg->hr_item);
reg->hr_item_pinned = 0;
}
if (found)
break;
}
}
ret = o2nm_depend_this_node();
if (ret)
goto out;
static int o2hb_region_inc_user(const char *region_uuid)
{
int ret = 0;
ret = o2nm_depend_item(&reg->hr_item);
if (ret)
o2nm_undepend_this_node();
spin_lock(&o2hb_live_lock);
out:
/* local heartbeat */
if (!o2hb_global_heartbeat_active()) {
ret = o2hb_region_pin(region_uuid);
goto unlock;
}
/*
* if global heartbeat active and this is the first dependent user,
* pin all regions if quorum region count <= CUT_OFF
*/
o2hb_dependent_users++;
if (o2hb_dependent_users > 1)
goto unlock;
if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
ret = o2hb_region_pin(NULL);
unlock:
spin_unlock(&o2hb_live_lock);
return ret;
}
static void o2hb_region_put(const char *region_uuid)
void o2hb_region_dec_user(const char *region_uuid)
{
struct o2hb_region *reg;
spin_lock(&o2hb_live_lock);
reg = o2hb_find_region(region_uuid);
/* local heartbeat */
if (!o2hb_global_heartbeat_active()) {
o2hb_region_unpin(region_uuid);
goto unlock;
}
spin_unlock(&o2hb_live_lock);
/*
* if global heartbeat active and there are no dependent users,
* unpin all quorum regions
*/
o2hb_dependent_users--;
if (!o2hb_dependent_users)
o2hb_region_unpin(NULL);
if (reg) {
o2nm_undepend_item(&reg->hr_item);
o2nm_undepend_this_node();
}
unlock:
spin_unlock(&o2hb_live_lock);
}
int o2hb_register_callback(const char *region_uuid,
......@@ -2291,9 +2459,11 @@ int o2hb_register_callback(const char *region_uuid,
}
if (region_uuid) {
ret = o2hb_region_get(region_uuid);
if (ret)
ret = o2hb_region_inc_user(region_uuid);