summaryrefslogtreecommitdiff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-05-22 19:31:38 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2012-05-22 19:31:38 -0700
commit6101167727932a929e37fb8a6eeb68bdbf54d58e (patch)
treeda3e9c8244f86082c6ea4d150f7fa653a7843192 /fs/dlm/lock.c
parent6133308ad1a386e7e7f776003a1c44e8b54e2166 (diff)
parent75af271ed5f51b1f3506c7c1d567b1f32e5c9206 (diff)
Merge tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updates from David Teigland: "This set includes some minor fixes and improvements. The one large patch addresses the special "nodir" mode, which has been a long neglected proof of concept, but with these fixes seems to be quite usable. It allows the resource master to be assigned statically instead of dynamically, which can improve performance if there is little locality and most resources are shared." * tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: dlm: NULL dereference on failure in kmem_cache_create() gfs2: fix recovery during unmount dlm: fixes for nodir mode dlm: improve error and debug messages dlm: avoid unnecessary search in search_rsb dlm: limit rcom debug messages dlm: fix waiter recovery dlm: prevent connections during shutdown
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c541
1 files changed, 371 insertions, 170 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 4c58d4a3adc4..bdafb65a5234 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,11 +160,12 @@ static const int __quecvt_compat_matrix[8][8] = {
void dlm_print_lkb(struct dlm_lkb *lkb)
{
- printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
- " status %d rqmode %d grmode %d wait_type %d\n",
+ printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
+ "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
- lkb->lkb_grmode, lkb->lkb_wait_type);
+ lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
+ (unsigned long long)lkb->lkb_recover_seq);
}
static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
static inline int is_master_copy(struct dlm_lkb *lkb)
{
- if (lkb->lkb_flags & DLM_IFL_MSTCPY)
- DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}
@@ -479,6 +478,9 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
kref_get(&r->res_ref);
goto out;
}
+ if (error == -ENOTBLK)
+ goto out;
+
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
if (error)
goto out;
@@ -586,6 +588,23 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
return error;
}
+static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
+{
+ struct rb_node *n;
+ struct dlm_rsb *r;
+ int i;
+
+ for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+ spin_lock(&ls->ls_rsbtbl[i].lock);
+ for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+ r = rb_entry(n, struct dlm_rsb, res_hashnode);
+ if (r->res_hash == hash)
+ dlm_dump_rsb(r);
+ }
+ spin_unlock(&ls->ls_rsbtbl[i].lock);
+ }
+}
+
/* This is only called to add a reference when the code already holds
a valid reference to the rsb, so there's no need for locking. */
@@ -1064,8 +1083,9 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
goto out_del;
}
- log_error(ls, "remwait error %x reply %d flags %x no wait_type",
- lkb->lkb_id, mstype, lkb->lkb_flags);
+ log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
+ lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
+ mstype, lkb->lkb_flags);
return -1;
out_del:
@@ -1498,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
}
lkb->lkb_rqmode = DLM_LOCK_IV;
+ lkb->lkb_highbast = 0;
}
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
set_lvb_lock(r, lkb);
_grant_lock(r, lkb);
- lkb->lkb_highbast = 0;
}
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1866,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
/* Returns the highest requested mode of all blocked conversions; sets
cw if there's a blocked conversion to DLM_LOCK_CW. */
-static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
+ unsigned int *count)
{
struct dlm_lkb *lkb, *s;
int hi, demoted, quit, grant_restart, demote_restart;
@@ -1885,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
if (can_be_granted(r, lkb, 0, &deadlk)) {
grant_lock_pending(r, lkb);
grant_restart = 1;
+ if (count)
+ (*count)++;
continue;
}
@@ -1918,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
return max_t(int, high, hi);
}
-static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
+ unsigned int *count)
{
struct dlm_lkb *lkb, *s;
list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
- if (can_be_granted(r, lkb, 0, NULL))
+ if (can_be_granted(r, lkb, 0, NULL)) {
grant_lock_pending(r, lkb);
- else {
+ if (count)
+ (*count)++;
+ } else {
high = max_t(int, lkb->lkb_rqmode, high);
if (lkb->lkb_rqmode == DLM_LOCK_CW)
*cw = 1;
@@ -1954,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
return 0;
}
-static void grant_pending_locks(struct dlm_rsb *r)
+static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
{
struct dlm_lkb *lkb, *s;
int high = DLM_LOCK_IV;
int cw = 0;
- DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
+ if (!is_master(r)) {
+ log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+ dlm_dump_rsb(r);
+ return;
+ }
- high = grant_pending_convert(r, high, &cw);
- high = grant_pending_wait(r, high, &cw);
+ high = grant_pending_convert(r, high, &cw, count);
+ high = grant_pending_wait(r, high, &cw, count);
if (high == DLM_LOCK_IV)
return;
@@ -2499,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
before we try again to grant this one. */
if (is_demoted(lkb)) {
- grant_pending_convert(r, DLM_LOCK_IV, NULL);
+ grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
if (_can_be_granted(r, lkb, 1)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
@@ -2527,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
{
switch (error) {
case 0:
- grant_pending_locks(r);
+ grant_pending_locks(r, NULL);
/* grant_pending_locks also sends basts */
break;
case -EAGAIN:
@@ -2550,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
int error)
{
- grant_pending_locks(r);
+ grant_pending_locks(r, NULL);
}
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2571,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
int error)
{
if (error)
- grant_pending_locks(r);
+ grant_pending_locks(r, NULL);
}
/*
@@ -3372,7 +3402,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
-static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3412,14 +3442,15 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
error = 0;
if (error)
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3429,6 +3460,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
goto fail;
+ if (lkb->lkb_remid != ms->m_lkid) {
+ log_error(ls, "receive_convert %x remid %x recover_seq %llu "
+ "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
+ (unsigned long long)lkb->lkb_recover_seq,
+ ms->m_header.h_nodeid, ms->m_lkid);
+ error = -ENOENT;
+ goto fail;
+ }
+
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3456,14 +3496,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3473,6 +3514,14 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
goto fail;
+ if (lkb->lkb_remid != ms->m_lkid) {
+ log_error(ls, "receive_unlock %x remid %x remote %d %x",
+ lkb->lkb_id, lkb->lkb_remid,
+ ms->m_header.h_nodeid, ms->m_lkid);
+ error = -ENOENT;
+ goto fail;
+ }
+
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3497,14 +3546,15 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3532,25 +3582,23 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_grant from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
r = lkb->lkb_resource;
@@ -3570,20 +3618,18 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
+ return 0;
}
-static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_bast from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
r = lkb->lkb_resource;
@@ -3595,10 +3641,12 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
goto out;
queue_bast(r, lkb, ms->m_bastmode);
+ lkb->lkb_highbast = ms->m_bastmode;
out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
+ return 0;
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3653,18 +3701,15 @@ static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
do_purge(ls, ms->m_nodeid, ms->m_pid);
}
-static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error, mstype, result;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_request_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3676,8 +3721,13 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
mstype = lkb->lkb_wait_type;
error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
- if (error)
+ if (error) {
+ log_error(ls, "receive_request_reply %x remote %d %x result %d",
+ lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+ ms->m_result);
+ dlm_dump_rsb(r);
goto out;
+ }
/* Optimization: the dir node was also the master, so it took our
lookup as a request and sent request reply instead of lookup reply */
@@ -3755,6 +3805,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
+ return 0;
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3793,8 +3844,11 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
break;
default:
- log_error(r->res_ls, "receive_convert_reply %x error %d",
- lkb->lkb_id, ms->m_result);
+ log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
+ lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+ ms->m_result);
+ dlm_print_rsb(r);
+ dlm_print_lkb(lkb);
}
}
@@ -3821,20 +3875,18 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
put_rsb(r);
}
-static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_convert_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
_receive_convert_reply(lkb, ms);
dlm_put_lkb(lkb);
+ return 0;
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3873,20 +3925,18 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
put_rsb(r);
}
-static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_unlock_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
_receive_unlock_reply(lkb, ms);
dlm_put_lkb(lkb);
+ return 0;
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3925,20 +3975,18 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
put_rsb(r);
}
-static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_cancel_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
_receive_cancel_reply(lkb, ms);
dlm_put_lkb(lkb);
+ return 0;
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3949,7 +3997,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
error = find_lkb(ls, ms->m_lkid, &lkb);
if (error) {
- log_error(ls, "receive_lookup_reply no lkb");
+ log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
return;
}
@@ -3993,8 +4041,11 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb);
}
-static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+ uint32_t saved_seq)
{
+ int error = 0, noent = 0;
+
if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
log_debug(ls, "ignore non-member message %d from %d %x %x %d",
ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
@@ -4007,47 +4058,50 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
/* messages sent to a master node */
case DLM_MSG_REQUEST:
- receive_request(ls, ms);
+ error = receive_request(ls, ms);
break;
case DLM_MSG_CONVERT:
- receive_convert(ls, ms);
+ error = receive_convert(ls, ms);
break;
case DLM_MSG_UNLOCK:
- receive_unlock(ls, ms);
+ error = receive_unlock(ls, ms);
break;
case DLM_MSG_CANCEL:
- receive_cancel(ls, ms);
+ noent = 1;
+ error = receive_cancel(ls, ms);
break;
/* messages sent from a master node (replies to above) */
case DLM_MSG_REQUEST_REPLY:
- receive_request_reply(ls, ms);
+ error = receive_request_reply(ls, ms);
break;
case DLM_MSG_CONVERT_REPLY:
- receive_convert_reply(ls, ms);
+ error = receive_convert_reply(ls, ms);
break;
case DLM_MSG_UNLOCK_REPLY:
- receive_unlock_reply(ls, ms);
+ error = receive_unlock_reply(ls, ms);
break;
case DLM_MSG_CANCEL_REPLY:
- receive_cancel_reply(ls, ms);
+ error = receive_cancel_reply(ls, ms);
break;
/* messages sent from a master node (only two types of async msg) */
case DLM_MSG_GRANT:
- receive_grant(ls, ms);
+ noent = 1;
+ error = receive_grant(ls, ms);
break;
case DLM_MSG_BAST:
- receive_bast(ls, ms);
+ noent = 1;
+ error = receive_bast(ls, ms);
break;
/* messages sent to a dir node */
@@ -4075,6 +4129,37 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
default:
log_error(ls, "unknown message type %d", ms->m_type);
}
+
+ /*
+ * When checking for ENOENT, we're checking the result of
+ * find_lkb(m_remid):
+ *
+ * The lock id referenced in the message wasn't found. This may
+ * happen in normal usage for the async messages and cancel, so
+ * only use log_debug for them.
+ *
+ * Some errors are expected and normal.
+ */
+
+ if (error == -ENOENT && noent) {
+ log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
+ ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+ ms->m_lkid, saved_seq);
+ } else if (error == -ENOENT) {
+ log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
+ ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+ ms->m_lkid, saved_seq);
+
+ if (ms->m_type == DLM_MSG_CONVERT)
+ dlm_dump_rsb_hash(ls, ms->m_hash);
+ }
+
+ if (error == -EINVAL) {
+ log_error(ls, "receive %d inval from %d lkid %x remid %x "
+ "saved_seq %u",
+ ms->m_type, ms->m_header.h_nodeid,
+ ms->m_lkid, ms->m_remid, saved_seq);
+ }
}
/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4092,16 +4177,17 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
dlm_add_requestqueue(ls, nodeid, ms);
} else {
dlm_wait_requestqueue(ls);
- _receive_message(ls, ms);
+ _receive_message(ls, ms, 0);
}
}
/* This is called by dlm_recoverd to process messages that were saved on
the requestqueue. */
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+ uint32_t saved_seq)
{
- _receive_message(ls, ms);
+ _receive_message(ls, ms, saved_seq);
}
/* This is called by the midcomms layer when something is received for
@@ -4137,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
- if (dlm_config.ci_log_debug)
- log_print("invalid lockspace %x from %d cmd %d type %d",
- hd->h_lockspace, nodeid, hd->h_cmd, type);
+ if (dlm_config.ci_log_debug) {
+ printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
+ "%u from %d cmd %d type %d\n",
+ hd->h_lockspace, nodeid, hd->h_cmd, type);
+ }
if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4187,15 +4275,13 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
/* A waiting lkb needs recovery if the master node has failed, or
the master node is changing (only when no directory is used) */
-static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
+static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
+ int dir_nodeid)
{
- if (dlm_is_removed(ls, lkb->lkb_nodeid))
+ if (dlm_no_directory(ls))
return 1;
- if (!dlm_no_directory(ls))
- return 0;
-
- if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
+ if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
return 1;
return 0;
@@ -4212,6 +4298,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
struct dlm_lkb *lkb, *safe;
struct dlm_message *ms_stub;
int wait_type, stub_unlock_result, stub_cancel_result;
+ int dir_nodeid;
ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
if (!ms_stub) {
@@ -4223,13 +4310,21 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
+ dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
+
/* exclude debug messages about unlocks because there can be so
many and they aren't very interesting */
if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
- log_debug(ls, "recover_waiter %x nodeid %d "
- "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
- lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+ log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+ "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+ lkb->lkb_id,
+ lkb->lkb_remid,
+ lkb->lkb_wait_type,
+ lkb->lkb_resource->res_nodeid,
+ lkb->lkb_nodeid,
+ lkb->lkb_wait_nodeid,
+ dir_nodeid);
}
/* all outstanding lookups, regardless of destination will be
@@ -4240,7 +4335,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
continue;
}
- if (!waiter_needs_recovery(ls, lkb))
+ if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
continue;
wait_type = lkb->lkb_wait_type;
@@ -4373,8 +4468,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
ou = is_overlap_unlock(lkb);
err = 0;
- log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
- lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
+ log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+ "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
+ "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
+ r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
+ dlm_dir_nodeid(r), oc, ou);
/* At this point we assume that we won't get a reply to any
previous op or overlap op on this lock. First, do a big
@@ -4426,9 +4524,12 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
}
}
- if (err)
- log_error(ls, "recover_waiters_post %x %d %x %d %d",
- lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+ if (err) {
+ log_error(ls, "waiter %x msg %d r_nodeid %d "
+ "dir_nodeid %d overlap %d %d",
+ lkb->lkb_id, mstype, r->res_nodeid,
+ dlm_dir_nodeid(r), oc, ou);
+ }
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
@@ -4437,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
return error;
}
-static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
- int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
+static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
+ struct list_head *list)
{
- struct dlm_ls *ls = r->res_ls;
struct dlm_lkb *lkb, *safe;
- list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
- if (test(ls, lkb)) {
- rsb_set_flag(r, RSB_LOCKS_PURGED);
- del_lkb(r, lkb);
- /* this put should free the lkb */
- if (!dlm_put_lkb(lkb))
- log_error(ls, "purged lkb not released");
- }
+ list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+ if (!is_master_copy(lkb))
+ continue;
+
+ /* don't purge lkbs we've added in recover_master_copy for
+ the current recovery seq */
+
+ if (lkb->lkb_recover_seq == ls->ls_recover_seq)
+ continue;
+
+ del_lkb(r, lkb);
+
+ /* this put should free the lkb */
+ if (!dlm_put_lkb(lkb))
+ log_error(ls, "purged mstcpy lkb not released");
}
}
-static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
+void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
- return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
-}
+ struct dlm_ls *ls = r->res_ls;
-static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
- return is_master_copy(lkb);
+ purge_mstcpy_list(ls, r, &r->res_grantqueue);
+ purge_mstcpy_list(ls, r, &r->res_convertqueue);
+ purge_mstcpy_list(ls, r, &r->res_waitqueue);
}
-static void purge_dead_locks(struct dlm_rsb *r)
+static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
+ struct list_head *list,
+ int nodeid_gone, unsigned int *count)
{
- purge_queue(r, &r->res_grantqueue, &purge_dead_test);
- purge_queue(r, &r->res_convertqueue, &purge_dead_test);
- purge_queue(r, &r->res_waitqueue, &purge_dead_test);
-}
+ struct dlm_lkb *lkb, *safe;
-void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
-{
- purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
- purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
- purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
+ list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+ if (!is_master_copy(lkb))
+ continue;
+
+ if ((lkb->lkb_nodeid == nodeid_gone) ||
+ dlm_is_removed(ls, lkb->lkb_nodeid)) {
+
+ del_lkb(r, lkb);
+
+ /* this put should free the lkb */
+ if (!dlm_put_lkb(lkb))
+ log_error(ls, "purged dead lkb not released");
+
+ rsb_set_flag(r, RSB_RECOVER_GRANT);
+
+ (*count)++;
+ }
+ }
}
/* Get rid of locks held by nodes that are gone. */
-int dlm_purge_locks(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls)
{
struct dlm_rsb *r;
+ struct dlm_member *memb;
+ int nodes_count = 0;
+ int nodeid_gone = 0;
+ unsigned int lkb_count = 0;
- log_debug(ls, "dlm_purge_locks");
+ /* cache one removed nodeid to optimize the common
+ case of a single node removed */
+
+ list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+ nodes_count++;
+ nodeid_gone = memb->nodeid;
+ }
+
+ if (!nodes_count)
+ return;
down_write(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
hold_rsb(r);
lock_rsb(r);
- if (is_master(r))
- purge_dead_locks(r);
+ if (is_master(r)) {
+ purge_dead_list(ls, r, &r->res_grantqueue,
+ nodeid_gone, &lkb_count);
+ purge_dead_list(ls, r, &r->res_convertqueue,
+ nodeid_gone, &lkb_count);
+ purge_dead_list(ls, r, &r->res_waitqueue,
+ nodeid_gone, &lkb_count);
+ }
unlock_rsb(r);
unhold_rsb(r);
-
- schedule();
+ cond_resched();
}
up_write(&ls->ls_root_sem);
- return 0;
+ if (lkb_count)
+ log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
+ lkb_count, nodes_count);
}
-static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
{
struct rb_node *n;
- struct dlm_rsb *r, *r_ret = NULL;
+ struct dlm_rsb *r;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
- if (!rsb_flag(r, RSB_LOCKS_PURGED))
+
+ if (!rsb_flag(r, RSB_RECOVER_GRANT))
+ continue;
+ rsb_clear_flag(r, RSB_RECOVER_GRANT);
+ if (!is_master(r))
continue;
hold_rsb(r);
- rsb_clear_flag(r, RSB_LOCKS_PURGED);
- r_ret = r;
- break;
+ spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ return r;
}
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
- return r_ret;
+ return NULL;
}
-void dlm_grant_after_purge(struct dlm_ls *ls)
+/*
+ * Attempt to grant locks on resources that we are the master of.
+ * Locks may have become grantable during recovery because locks
+ * from departed nodes have been purged (or not rebuilt), allowing
+ * previously blocked locks to now be granted. The subset of rsb's
+ * we are interested in are those with lkb's on either the convert or
+ * waiting queues.
+ *
+ * Simplest would be to go through each master rsb and check for non-empty
+ * convert or waiting queues, and attempt to grant on those rsbs.
+ * Checking the queues requires lock_rsb, though, for which we'd need
+ * to release the rsbtbl lock. This would make iterating through all
+ * rsb's very inefficient. So, we rely on earlier recovery routines
+ * to set RECOVER_GRANT on any rsb's that we should attempt to grant
+ * locks for.
+ */
+
+void dlm_recover_grant(struct dlm_ls *ls)
{
struct dlm_rsb *r;
int bucket = 0;
+ unsigned int count = 0;
+ unsigned int rsb_count = 0;
+ unsigned int lkb_count = 0;
while (1) {
- r = find_purged_rsb(ls, bucket);
+ r = find_grant_rsb(ls, bucket);
if (!r) {
if (bucket == ls->ls_rsbtbl_size - 1)
break;
bucket++;
continue;
}
+ rsb_count++;
+ count = 0;
lock_rsb(r);
- if (is_master(r)) {
- grant_pending_locks(r);
- confirm_master(r, 0);
- }
+ grant_pending_locks(r, &count);
+ lkb_count += count;
+ confirm_master(r, 0);
unlock_rsb(r);
put_rsb(r);
- schedule();
+ cond_resched();
}
+
+ if (lkb_count)
+ log_debug(ls, "dlm_recover_grant %u locks on %u resources",
+ lkb_count, rsb_count);
}
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4631,6 +4797,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r;
struct dlm_lkb *lkb;
+ uint32_t remid = 0;
int error;
if (rl->rl_parent_lkid) {
@@ -4638,14 +4805,31 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
goto out;
}
- error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
- R_MASTER, &r);
+ remid = le32_to_cpu(rl->rl_lkid);
+
+ /* In general we expect the rsb returned to be R_MASTER, but we don't
+ have to require it. Recovery of masters on one node can overlap
+ recovery of locks on another node, so one node can send us MSTCPY
+ locks before we've made ourselves master of this rsb. We can still
+ add new MSTCPY locks that we receive here without any harm; when
+ we make ourselves master, dlm_recover_masters() won't touch the
+ MSTCPY locks we've received early. */
+
+ error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
if (error)
goto out;
+ if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
+ log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
+ rc->rc_header.h_nodeid, remid);
+ error = -EBADR;
+ put_rsb(r);
+ goto out;
+ }
+
lock_rsb(r);
- lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
+ lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
if (lkb) {
error = -EEXIST;
goto out_remid;
@@ -4664,19 +4848,25 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
attach_lkb(r, lkb);
add_lkb(r, lkb, rl->rl_status);
error = 0;
+ ls->ls_recover_locks_in++;
+
+ if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+ rsb_set_flag(r, RSB_RECOVER_GRANT);
out_remid:
/* this is the new value returned to the lock holder for
saving in its process-copy lkb */
rl->rl_remid = cpu_to_le32(lkb->lkb_id);
+ lkb->lkb_recover_seq = ls->ls_recover_seq;
+
out_unlock:
unlock_rsb(r);
put_rsb(r);
out:
- if (error)
- log_debug(ls, "recover_master_copy %d %x", error,
- le32_to_cpu(rl->rl_lkid));
+ if (error && error != -EEXIST)
+ log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
+ rc->rc_header.h_nodeid, remid, error);
rl->rl_result = cpu_to_le32(error);
return error;
}
@@ -4687,41 +4877,52 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r;
struct dlm_lkb *lkb;
- int error;
+ uint32_t lkid, remid;
+ int error, result;
+
+ lkid = le32_to_cpu(rl->rl_lkid);
+ remid = le32_to_cpu(rl->rl_remid);
+ result = le32_to_cpu(rl->rl_result);
- error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
+ error = find_lkb(ls, lkid, &lkb);
if (error) {
- log_error(ls, "recover_process_copy no lkid %x",
- le32_to_cpu(rl->rl_lkid));
+ log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
+ lkid, rc->rc_header.h_nodeid, remid, result);
return error;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
-
- error = le32_to_cpu(rl->rl_result);
-
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
- switch (error) {
+ if (!is_process_copy(lkb)) {
+ log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
+ lkid, rc->rc_header.h_nodeid, remid, result);
+ dlm_dump_rsb(r);
+ unlock_rsb(r);
+ put_rsb(r);
+ dlm_put_lkb(lkb);
+ return -EINVAL;
+ }
+
+ switch (result) {
case -EBADR:
/* There's a chance the new master received our lock before
dlm_recover_master_reply(), this wouldn't happen if we did
a barrier between recover_masters and recover_locks. */
- log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
- (unsigned long)r, r->res_name);
+
+ log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
+ lkid, rc->rc_header.h_nodeid, remid, result);
+
dlm_send_rcom_lock(r, lkb);
goto out;
case -EEXIST:
- log_debug(ls, "master copy exists %x", lkb->lkb_id);
- /* fall through */
case 0:
- lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
+ lkb->lkb_remid = remid;
break;
default:
- log_error(ls, "dlm_recover_process_copy unknown error %d %x",
- error, lkb->lkb_id);
+ log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
+ lkid, rc->rc_header.h_nodeid, remid, result);
}
/* an ack for dlm_recover_locks() which waits for replies from