Commit 40689ac4 authored by Linus Torvalds
Browse files

Merge tag 'dlm-3.7' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "There are two main patches in this set, both related to the userland
  dlm_controld daemon.

  The first fixes a deadlock between dlm_controld and the dlm_send
  workqueue when both access configfs data simultaneously.

  The second reworks some code to get around a long-standing, but
  intentional, unlock balance warning.  The userland daemon no longer
  takes a lock that is later released from the kernel.

  The other commits are minor fixes and changes."

* tag 'dlm-3.7' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: check the maximum size of a request from user
  dlm: cleanup send_to_sock routine
  dlm: convert add_sock routine return value type to void
  dlm: remove redundant variable assignments
  dlm: fix unlock balance warnings
  dlm: fix uninitialized spinlock
  dlm: fix deadlock between dlm_send and dlm_controld
parents cc150a28 2b75bc91
......@@ -15,8 +15,8 @@
#include "lock.h"
#include "user.h"
static uint64_t dlm_cb_seq;
static spinlock_t dlm_cb_seq_spin;
static uint64_t dlm_cb_seq;
static DEFINE_SPINLOCK(dlm_cb_seq_spin);
static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
{
......
......@@ -750,6 +750,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf,
static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
{
struct sockaddr_storage *addr;
int rv;
if (len != sizeof(struct sockaddr_storage))
return -EINVAL;
......@@ -762,6 +763,13 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
return -ENOMEM;
memcpy(addr, buf, len);
rv = dlm_lowcomms_addr(cm->nodeid, addr, len);
if (rv) {
kfree(addr);
return rv;
}
cm->addr[cm->addr_count++] = addr;
return len;
}
......@@ -878,34 +886,7 @@ static void put_space(struct dlm_space *sp)
config_item_put(&sp->group.cg_item);
}
/*
 * addr_compare - test whether two socket addresses name the same endpoint.
 * @x: first address; its ss_family selects how both are interpreted
 * @y: second address
 *
 * Returns 1 when both addresses have the same family (AF_INET or AF_INET6),
 * the same network address and the same port.  Returns 0 otherwise,
 * including for any other address family.
 */
static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	/*
	 * The casts below reinterpret y using x's family, so a family
	 * mismatch has to be rejected first; otherwise unrelated bytes of y
	 * would be compared as if they were an address of x's family.
	 */
	if (x->ss_family != y->ss_family)
		return 0;

	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;

		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;

		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		/* Unknown families never compare equal. */
		return 0;
	}
	return 1;
}
static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr)
static struct dlm_comm *get_comm(int nodeid)
{
struct config_item *i;
struct dlm_comm *cm = NULL;
......@@ -919,19 +900,11 @@ static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr)
list_for_each_entry(i, &comm_list->cg_children, ci_entry) {
cm = config_item_to_comm(i);
if (nodeid) {
if (cm->nodeid != nodeid)
continue;
found = 1;
config_item_get(i);
break;
} else {
if (!cm->addr_count || !addr_compare(cm->addr[0], addr))
continue;
found = 1;
config_item_get(i);
break;
}
if (cm->nodeid != nodeid)
continue;
found = 1;
config_item_get(i);
break;
}
mutex_unlock(&clusters_root.subsys.su_mutex);
......@@ -995,7 +968,7 @@ int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int dlm_comm_seq(int nodeid, uint32_t *seq)
{
struct dlm_comm *cm = get_comm(nodeid, NULL);
struct dlm_comm *cm = get_comm(nodeid);
if (!cm)
return -EEXIST;
*seq = cm->seq;
......@@ -1003,28 +976,6 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
return 0;
}
/*
 * dlm_nodeid_to_addr - copy the first configured address of a node.
 * @nodeid: node to look up through get_comm()
 * @addr: output buffer that receives a copy of cm->addr[0]
 *
 * Returns 0 on success, -EEXIST when get_comm() finds no comm for @nodeid,
 * or -ENOENT when the comm exists but has no addresses.
 */
int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr)
{
	struct dlm_comm *cm = get_comm(nodeid, NULL);
	int rv = 0;

	if (!cm)
		return -EEXIST;

	if (cm->addr_count)
		memcpy(addr, cm->addr[0], sizeof(*addr));
	else
		rv = -ENOENT;

	/*
	 * get_comm() takes a config item reference on the comm it returns.
	 * Release it on every path; the -ENOENT path used to return without
	 * the put_comm() that the success path performs.
	 */
	put_comm(cm);
	return rv;
}
/*
 * dlm_addr_to_nodeid - find the node id of the comm that owns an address.
 * @addr: address passed to get_comm() as the lookup key
 * @nodeid: set to the owning comm's nodeid on success
 *
 * Returns 0 when a comm was found, -EEXIST otherwise.
 */
int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_comm *owner;

	/* A nodeid of 0 makes get_comm() match on the address instead. */
	owner = get_comm(0, addr);
	if (owner) {
		*nodeid = owner->nodeid;
		put_comm(owner);
		return 0;
	}

	return -EEXIST;
}
int dlm_our_nodeid(void)
{
return local_comm ? local_comm->nodeid : 0;
......
......@@ -46,8 +46,6 @@ void dlm_config_exit(void);
int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int *count_out);
int dlm_comm_seq(int nodeid, uint32_t *seq);
int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
int dlm_our_nodeid(void);
int dlm_our_addr(struct sockaddr_storage *addr, int num);
......
......@@ -604,6 +604,7 @@ struct dlm_ls {
struct idr ls_recover_idr;
spinlock_t ls_recover_idr_lock;
wait_queue_head_t ls_wait_general;
wait_queue_head_t ls_recover_lock_wait;
struct mutex ls_clear_proc_locks;
struct list_head ls_root_list; /* root resources */
......@@ -616,15 +617,40 @@ struct dlm_ls {
char ls_name[1];
};
#define LSFL_WORK 0
#define LSFL_RUNNING 1
#define LSFL_RECOVERY_STOP 2
#define LSFL_RCOM_READY 3
#define LSFL_RCOM_WAIT 4
#define LSFL_UEVENT_WAIT 5
#define LSFL_TIMEWARN 6
#define LSFL_CB_DELAY 7
#define LSFL_NODIR 8
/*
* LSFL_RECOVER_STOP - dlm_ls_stop() sets this to tell dlm recovery routines
* that they should abort what they're doing so new recovery can be started.
*
* LSFL_RECOVER_DOWN - dlm_ls_stop() sets this to tell dlm_recoverd that it
* should do down_write() on the in_recovery rw_semaphore. (doing down_write
* within dlm_ls_stop causes complaints about the lock acquired/released
* in different contexts.)
*
* LSFL_RECOVER_LOCK - dlm_recoverd holds the in_recovery rw_semaphore.
* It sets this after it is done with down_write() on the in_recovery
* rw_semaphore and clears it after it has released the rw_semaphore.
*
* LSFL_RECOVER_WORK - dlm_ls_start() sets this to tell dlm_recoverd that it
* should begin recovery of the lockspace.
*
* LSFL_RUNNING - set when normal locking activity is enabled.
* dlm_ls_stop() clears this to tell dlm locking routines that they should
* quit what they are doing so recovery can run. dlm_recoverd sets
* this after recovery is finished.
*/
#define LSFL_RECOVER_STOP 0
#define LSFL_RECOVER_DOWN 1
#define LSFL_RECOVER_LOCK 2
#define LSFL_RECOVER_WORK 3
#define LSFL_RUNNING 4
#define LSFL_RCOM_READY 5
#define LSFL_RCOM_WAIT 6
#define LSFL_UEVENT_WAIT 7
#define LSFL_TIMEWARN 8
#define LSFL_CB_DELAY 9
#define LSFL_NODIR 10
/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
......@@ -667,7 +693,7 @@ static inline int dlm_locking_stopped(struct dlm_ls *ls)
static inline int dlm_recovery_stopped(struct dlm_ls *ls)
{
return test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
return test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
}
static inline int dlm_no_directory(struct dlm_ls *ls)
......
......@@ -582,8 +582,6 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_root_list);
init_rwsem(&ls->ls_root_sem);
down_write(&ls->ls_in_recovery);
spin_lock(&lslist_lock);
ls->ls_create_count = 1;
list_add(&ls->ls_list, &lslist);
......@@ -597,13 +595,24 @@ static int new_lockspace(const char *name, const char *cluster,
}
}
/* needs to find ls in lslist */
init_waitqueue_head(&ls->ls_recover_lock_wait);
/*
* Once started, dlm_recoverd first looks for ls in lslist, then
* initializes ls_in_recovery as locked in "down" mode. We need
* to wait for the wakeup from dlm_recoverd because in_recovery
* has to start out in down mode.
*/
error = dlm_recoverd_start(ls);
if (error) {
log_error(ls, "can't start dlm_recoverd %d", error);
goto out_callback;
}
wait_event(ls->ls_recover_lock_wait,
test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
ls->ls_kobj.kset = dlm_kset;
error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
"%s", ls->ls_name);
......
......@@ -140,6 +140,16 @@ struct writequeue_entry {
struct connection *con;
};
/*
 * Addresses known for one remote node.  Entries are linked on the
 * dlm_node_addrs list and looked up by nodeid.
 */
struct dlm_node_addr {
/* linkage on the dlm_node_addrs list */
struct list_head list;
/* node id this entry belongs to; the key matched by find_node_addr() */
int nodeid;
/* number of addr[] slots currently filled */
int addr_count;
/* separately allocated addresses; kfree()d when the entry is torn down */
struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
/* List of dlm_node_addr entries; dlm_lowcomms_addr() adds one per new nodeid. */
static LIST_HEAD(dlm_node_addrs);
/* Held around the walks and modifications of dlm_node_addrs visible in this file. */
static DEFINE_SPINLOCK(dlm_node_addrs_spin);
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;
......@@ -264,31 +274,146 @@ static struct connection *assoc2con(int assoc_id)
return NULL;
}
static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
/*
 * find_node_addr - look up the dlm_node_addr entry for a node.
 * @nodeid: node id to search for on the dlm_node_addrs list
 *
 * Returns the first entry whose nodeid matches, or NULL when there is none.
 * No locking is done here; the callers visible in this file take
 * dlm_node_addrs_spin around the call.
 */
static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *cur;
	struct dlm_node_addr *match = NULL;

	list_for_each_entry(cur, &dlm_node_addrs, list) {
		if (cur->nodeid != nodeid)
			continue;
		match = cur;
		break;
	}

	return match;
}
/*
 * addr_compare - test whether two socket addresses name the same endpoint.
 * @x: first address; its ss_family selects how both are interpreted
 * @y: second address
 *
 * Returns 1 when both addresses have the same family (AF_INET or AF_INET6),
 * the same network address and the same port.  Returns 0 otherwise,
 * including for any other address family.
 */
static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	/*
	 * The casts below reinterpret y using x's family, so a family
	 * mismatch has to be rejected first; otherwise unrelated bytes of y
	 * would be compared as if they were an address of x's family.
	 */
	if (x->ss_family != y->ss_family)
		return 0;

	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;

		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;

		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		/* Unknown families never compare equal. */
		return 0;
	}
	return 1;
}
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
struct sockaddr *sa_out)
{
struct sockaddr_storage addr;
int error;
struct sockaddr_storage sas;
struct dlm_node_addr *na;
if (!dlm_local_count)
return -1;
error = dlm_nodeid_to_addr(nodeid, &addr);
if (error)
return error;
spin_lock(&dlm_node_addrs_spin);
na = find_node_addr(nodeid);
if (na && na->addr_count)
memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage));
spin_unlock(&dlm_node_addrs_spin);
if (!na)
return -EEXIST;
if (!na->addr_count)
return -ENOENT;
if (sas_out)
memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
if (!sa_out)
return 0;
if (dlm_local_addr[0]->ss_family == AF_INET) {
struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
} else {
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
ret6->sin6_addr = in6->sin6_addr;
}
return 0;
}
/*
 * addr_to_nodeid - map a socket address back to the node that owns it.
 * @addr: address to look for
 * @nodeid: set to the matching node's id on success; untouched otherwise
 *
 * Walks dlm_node_addrs under dlm_node_addrs_spin and compares @addr with
 * the first address (addr[0]) of every entry that has at least one address.
 *
 * Returns 0 when a match was found, -EEXIST otherwise.
 */
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *entry;
	int found = 0;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(entry, &dlm_node_addrs, list) {
		if (entry->addr_count && addr_compare(entry->addr[0], addr)) {
			*nodeid = entry->nodeid;
			found = 1;
			break;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	return found ? 0 : -EEXIST;
}
/*
 * dlm_lowcomms_addr - record an address for a node.
 * @nodeid: node the address belongs to
 * @addr: address to copy; this function stores its own copy
 * @len: number of bytes of @addr to copy
 *
 * A private copy of @addr is appended to the node's dlm_node_addr entry,
 * and the entry is created when @nodeid has none yet.
 *
 * Returns 0 on success, -EINVAL when @len does not fit in a
 * struct sockaddr_storage, -ENOMEM on allocation failure, or -ENOSPC when
 * the node already has DLM_MAX_ADDR_COUNT addresses.
 */
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	/*
	 * The copy below targets a sockaddr_storage-sized buffer, so a
	 * negative or oversized length must not reach memcpy().
	 */
	if (len < 0 || (size_t)len > sizeof(struct sockaddr_storage))
		return -EINVAL;

	/*
	 * Both objects are allocated up front so that nothing is allocated
	 * while dlm_node_addrs_spin is held; whichever turns out to be
	 * unneeded is freed after the lock is dropped.
	 */
	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		/* First address for this node: publish the new entry. */
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	/* Existing entry with room left: only the address copy is kept. */
	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
......@@ -348,7 +473,7 @@ int dlm_lowcomms_connect_node(int nodeid)
}
/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
static void add_sock(struct socket *sock, struct connection *con)
{
con->sock = sock;
......@@ -358,7 +483,6 @@ static int add_sock(struct socket *sock, struct connection *con)
con->sock->sk->sk_state_change = lowcomms_state_change;
con->sock->sk->sk_user_data = con;
con->sock->sk->sk_allocation = GFP_NOFS;
return 0;
}
/* Add the port number to an IPv6 or 4 sockaddr and return the address
......@@ -510,7 +634,7 @@ static void process_sctp_notification(struct connection *con,
return;
}
make_sockaddr(&prim.ssp_addr, 0, &addr_len);
if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
unsigned char *b=(unsigned char *)&prim.ssp_addr;
log_print("reject connect from unknown addr");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
......@@ -747,7 +871,7 @@ static int tcp_accept_from_sock(struct connection *con)
/* Get the new node's NODEID */
make_sockaddr(&peeraddr, 0, &len);
if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
if (addr_to_nodeid(&peeraddr, &nodeid)) {
unsigned char *b=(unsigned char *)&peeraddr;
log_print("connect from non cluster node");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
......@@ -862,7 +986,7 @@ static void sctp_init_assoc(struct connection *con)
if (con->retries++ > MAX_CONNECT_RETRIES)
return;
if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) {
log_print("no address for nodeid %d", con->nodeid);
return;
}
......@@ -928,11 +1052,11 @@ static void sctp_init_assoc(struct connection *con)
/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
int result = -EHOSTUNREACH;
struct sockaddr_storage saddr, src_addr;
int addr_len;
struct socket *sock = NULL;
int one = 1;
int result;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
......@@ -944,10 +1068,8 @@ static void tcp_connect_to_sock(struct connection *con)
goto out;
/* Some odd races can cause double-connects, ignore them */
if (con->sock) {
result = 0;
if (con->sock)
goto out;
}
/* Create a socket to communicate with */
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
......@@ -956,8 +1078,11 @@ static void tcp_connect_to_sock(struct connection *con)
goto out_err;
memset(&saddr, 0, sizeof(saddr));
if (dlm_nodeid_to_addr(con->nodeid, &saddr))
result = nodeid_to_addr(con->nodeid, &saddr, NULL);
if (result < 0) {
log_print("no address for nodeid %d", con->nodeid);
goto out_err;
}
sock->sk->sk_user_data = con;
con->rx_action = receive_from_sock;
......@@ -983,8 +1108,7 @@ static void tcp_connect_to_sock(struct connection *con)
kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
sizeof(one));
result =
sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
O_NONBLOCK);
if (result == -EINPROGRESS)
result = 0;
......@@ -1002,11 +1126,17 @@ out_err:
* Some errors are fatal and this list might need adjusting. For other
* errors we try again until the max number of retries is reached.
*/
if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
result != -ENETDOWN && result != -EINVAL
&& result != -EPROTONOSUPPORT) {
if (result != -EHOSTUNREACH &&
result != -ENETUNREACH &&
result != -ENETDOWN &&
result != -EINVAL &&
result != -EPROTONOSUPPORT) {
log_print("connect %d try %d error %d", con->nodeid,
con->retries, result);
mutex_unlock(&con->sock_mutex);
msleep(1000);
lowcomms_connect_sock(con);
result = 0;
return;
}
out:
mutex_unlock(&con->sock_mutex);
......@@ -1044,10 +1174,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
if (result < 0) {
log_print("Failed to set SO_REUSEADDR on socket: %d", result);
}
sock->sk->sk_user_data = con;
con->rx_action = tcp_accept_from_sock;
con->connect_action = tcp_connect_to_sock;
con->sock = sock;
/* Bind to our port */
make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
......@@ -1358,8 +1486,7 @@ static void send_to_sock(struct connection *con)
}
cond_resched();
goto out;
}
if (ret <= 0)
} else if (ret < 0)
goto send_error;
}
......@@ -1376,7 +1503,6 @@ static void send_to_sock(struct connection *con)
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
continue;
}
}
spin_unlock(&con->writequeue_lock);
......@@ -1394,7 +1520,6 @@ out_connect:
mutex_unlock(&con->sock_mutex);
if (!test_bit(CF_INIT_PENDING, &con->flags))
lowcomms_connect_sock(con);
return;
}
static void clean_one_writequeue(struct connection *con)
......@@ -1414,6 +1539,7 @@ static void clean_one_writequeue(struct connection *con)
int dlm_lowcomms_close(int nodeid)
{
struct connection *con;
struct dlm_node_addr *na;
log_print("closing connection to node %d", nodeid);
con = nodeid2con(nodeid, 0);
......@@ -1428,6 +1554,17 @@ int dlm_lowcomms_close(int nodeid)
clean_one_writequeue(con);
close_connection(con, true);
}
spin_lock(&dlm_node_addrs_spin);
na = find_node_addr(nodeid);
if (na) {
list_del(&na->list);
while (na->addr_count--)
kfree(na->addr[na->addr_count]);
kfree(na);
}
spin_unlock(&dlm_node_addrs_spin);
return 0;
}
......@@ -1577,3 +1714,17 @@ fail_destroy:
fail:
return error;
}
/*
 * dlm_lowcomms_exit - release every remembered node address.
 *
 * Under dlm_node_addrs_spin, unlinks each entry from dlm_node_addrs and
 * frees its stored addresses (last slot first) followed by the entry itself.
 */
void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *entry, *next;
	int i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(entry, next, &dlm_node_addrs, list) {
		list_del(&entry->list);
		for (i = entry->addr_count - 1; i >= 0; i--)
			kfree(entry->addr[i]);
		kfree(entry);
	}
	spin_unlock(&dlm_node_addrs_spin);
}
......@@ -16,10 +16,12 @@
int dlm_lowcomms_start(void);
void dlm_lowcomms_stop(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc);
void dlm_lowcomms_commit_buffer(void *mh);
int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
#endif /* __LOWCOMMS_DOT_H__ */
......@@ -17,6 +17,7 @@
#include "user.h"
#include "memory.h"
#include "config.h"
#include "lowcomms.h"
static int __init init_dlm(void)
{
......@@ -78,6 +79,7 @@ static void __exit exit_dlm(void)
dlm_config_exit();