Commit 212cc781 authored by Leigh Stoller's avatar Leigh Stoller

The rest of the sync server additions:

* Parser: Added new tb command to set the name of the sync server:

	tb-set-sync-server <node>

  This initializes the sync_server slot of the experiment entry to the
  *vname* of the node that should run the sync server for that
  experiment. In other words, the sync server is per-experiment, runs
  on a node in the experiment, and the user gets to chose which node
  it runs on.

* tmcd and client side setup. Added new syncserver command which
  returns the name of the syncserver and whether the requesting node
  is the lucky one to run the daemon:

    SYNCSERVER SERVER='nodeG.syncserver.testbed.emulab.net' ISSERVER=1

  The name of the syncserver is written to /var/emulab/boot/syncserver
  on the nodes so that clients can easily figure out where the server
  is.

  Aside: The ready bits are now ignored (no DB accesses are made) for
  virtual nodes; they are forced to use the new sync server.

* New os/syncd directory containing the daemon and the client. The
  daemon is pretty simple. It waits for TCP (and UDP, although that
  path is not complete yet) connections, and reads in a little
  structure that gives the name of the "barrier" to wait for, and an
  optional count of clients in the group (this would be used by the
  "master" who initializes barriers for clients). The socket is saved
  (no reply is made, so the client is blocked) until the count reaches
  zero. Then all clients are released by writting back to the
  sockets, and the sockets are closed. Obviously, the number of
  clients is limited by the numbed of FDs (open sockets), hence the
  need for a UDP variant, but that will take more work.

  The client has a simple command line interface:

    usage: emulab-sync [options]
    -n <name>         Optional barrier name; must be less than 64 bytes long
    -d                Turn on debugging
    -s server         Specify a sync server to connect to
    -p portnum        Specify a port number to connect to
    -i count          Initialize named barrier to count waiters
    -u                Use UDP instead of TCP

    The client figures out the server by looking for the file created
    above by libsetup (/var/emulab/boot/syncserver). If you do not
    specify a barrier "name", it uses an internal default. Yes, the
    server can handle multiple barriers (differently named of course)
    at once (non-overlapping clients obviously).

    Clients can wait before a barrier in "initialized." The count on
    the barrier just goes negative until someone initializes the
    barrier using the -i option, which increments the count by the
    count. Therefore, the master does not have to arrange to get there
    "first." As an example, consider a master and one client:

	nodeA> /usr/local/etc/emulab/emulab-sync -n mybarrier
	nodeB> /usr/local/etc/emulab/emulab-sync -n mybarrier -i 1

    Node A waits until Node B initializes the barrier (gives it a
    count).  The count is the number of *waiters*, not including the
    master. The master is also blocked until all of the waiters have
    checked in.

    I have not made an provision for timeouts or crashed clients. Lets
    see how it goes.
parent c7157a2f
......@@ -1349,6 +1349,7 @@ outfiles="$outfiles Makeconf GNUmakefile \
lib/GNUmakefile lib/libtb/GNUmakefile \
os/GNUmakefile os/split-image.sh os/imagezip/GNUmakefile \
os/frisbee.redux/GNUmakefile os/growdisk/GNUmakefile \
os/syncd/GNUmakefile \
pxe/GNUmakefile pxe/proxydhcp.restart pxe/bootinfo.restart \
security/GNUmakefile security/paperbag security/lastlog_daemon \
security/plasticwrap \
......
......@@ -395,6 +395,7 @@ outfiles="$outfiles Makeconf GNUmakefile \
lib/GNUmakefile lib/libtb/GNUmakefile \
os/GNUmakefile os/split-image.sh os/imagezip/GNUmakefile \
os/frisbee.redux/GNUmakefile os/growdisk/GNUmakefile \
os/syncd/GNUmakefile \
pxe/GNUmakefile pxe/proxydhcp.restart pxe/bootinfo.restart \
security/GNUmakefile security/paperbag security/lastlog_daemon \
security/plasticwrap \
......
......@@ -63,7 +63,8 @@ my %experiment_fields = ("multiplex_factor" => 1,
"uselatestwadata" => 1,
"wa_delay_solverweight" => 1,
"wa_bw_solverweight" => 1,
"wa_plr_solverweight" => 1);
"wa_plr_solverweight" => 1,
"sync_server" => 1);
#
# Turn off line buffering on output
......
stoller 2003/08/05 11:41:09 MDT
Modified files:
. configure configure.in
db xmlconvert.in
os GNUmakefile.in
tbsetup/ns2ir parse.tcl.in sim.tcl.in tb_compat.tcl.in
tmcd libsetup.pm
tmcd/common rc.setup watchdog
www showstuff.php3
Added files:
os/syncd GNUmakefile.in decls.h emulab-sync.c
emulab-syncd.c
Log:
The rest of the sync server additions:
* Parser: Added new tb command to set the name of the sync server:
tb-set-sync-server <node>
This initializes the sync_server slot of the experiment entry to the
*vname* of the node that should run the sync server for that
experiment. In other words, the sync server is per-experiment, runs
on a node in the experiment, and the user gets to chose which node
it runs on.
* tmcd and client side setup. Added new syncserver command which
returns the name of the syncserver and whether the requesting node
is the lucky one to run the daemon:
SYNCSERVER SERVER='nodeG.syncserver.testbed.emulab.net' ISSERVER=1
The name of the syncserver is written to /var/emulab/boot/syncserver
on the nodes so that clients can easily figure out where the server
is.
Aside: The ready bits are now ignored (no DB accesses are made) for
virtual nodes; they are forced to use the new sync server.
* New os/syncd directory containing the daemon and the client. The
daemon is pretty simple. It waits for TCP (and UDP, although that
path is not complete yet) connections, and reads in a little
structure that gives the name of the "barrier" to wait for, and an
optional count of clients in the group (this would be used by the
"master" who initializes barriers for clients). The socket is saved
(no reply is made, so the client is blocked) until the count reaches
zero. Then all clients are released by writting back to the
sockets, and the sockets are closed. Obviously, the number of
clients is limited by the numbed of FDs (open sockets), hence the
need for a UDP variant, but that will take more work.
The client has a simple command line interface:
usage: emulab-sync [options]
-n <name> Optional barrier name; must be less than 64 bytes long
-d Turn on debugging
-s server Specify a sync server to connect to
-p portnum Specify a port number to connect to
-i count Initialize named barrier to count waiters
-u Use UDP instead of TCP
The client figures out the server by looking for the file created
above by libsetup (/var/emulab/boot/syncserver). If you do not
specify a barrier "name", it uses an internal default. Yes, the
server can handle multiple barriers (differently named of course)
at once (non-overlapping clients obviously).
Clients can wait before a barrier in "initialized." The count on
the barrier just goes negative until someone initializes the
barrier using the -i option, which increments the count by the
count. Therefore, the master does not have to arrange to get there
"first." As an example, consider a master and one client:
nodeA> /usr/local/etc/emulab/emulab-sync -n mybarrier
nodeB> /usr/local/etc/emulab/emulab-sync -n mybarrier -i 1
Node A waits until Node B initializes the barrier (gives it a
count). The count is the number of *waiters*, not including the
master. The master is also blocked until all of the waiters have
checked in.
I have not made an provision for timeouts or crashed clients. Lets
see how it goes.
Revision Changes Path
1.166 +1 -0 testbed/configure
1.173 +1 -0 testbed/configure.in
1.5 +2 -1 testbed/db/xmlconvert.in
1.14 +10 -2 testbed/os/GNUmakefile.in
1.42 +3 -0 testbed/tbsetup/ns2ir/parse.tcl.in
1.49 +6 -0 testbed/tbsetup/ns2ir/sim.tcl.in
1.47 +14 -0 testbed/tbsetup/ns2ir/tb_compat.tcl.in
1.60 +72 -2 testbed/tmcd/libsetup.pm
1.9 +5 -0 testbed/tmcd/common/rc.setup
1.5 +2 -1 testbed/tmcd/common/watchdog
1.103 +8 -0 testbed/www/showstuff.php3
kwebb 2003/07/22 18:48:44 MDT
Modified files:
......
#
# EMULAB-COPYRIGHT
# Copyright (c) 2000-2002 University of Utah and the Flux Group.
# Copyright (c) 2000-2003 University of Utah and the Flux Group.
# All rights reserved.
#
......@@ -13,7 +13,7 @@ LBINDIR = $(DESTDIR)/usr/local/bin
include $(OBJDIR)/Makeconf
SUBDIRS = imagezip frisbee.redux growdisk
SUBDIRS = imagezip frisbee.redux growdisk syncd
all: $(SUBDIRS) split-image.sh
......@@ -25,6 +25,9 @@ imagezip:
frisbee.redux:
@$(MAKE) -C frisbee.redux all
syncd:
@$(MAKE) -C syncd all
install: $(INSTALL_SBINDIR)/split-image.sh
@$(MAKE) -C imagezip install
@$(MAKE) -C frisbee.redux install
......@@ -39,6 +42,11 @@ client-install:
$(INSTALL_PROGRAM) $(SRCDIR)/install-tarfile $(LBINDIR)/install-tarfile
$(INSTALL_PROGRAM) $(SRCDIR)/create-image $(LBINDIR)/create-image
$(MAKE) -C imagezip client-install
$(MAKE) -C syncd client-install
remote-install:
$(INSTALL) -m 755 -o root -g wheel -d $(LBINDIR)
$(INSTALL_PROGRAM) $(SRCDIR)/install-tarfile $(LBINDIR)/install-tarfile
clean: subdir-clean
......
#
# EMULAB-COPYRIGHT
# Copyright (c) 2000-2003 University of Utah and the Flux Group.
# All rights reserved.
#
SRCDIR = @srcdir@
TESTBED_SRCDIR = @top_srcdir@
OBJDIR = ../..
SUBDIR = os/syncd
include $(OBJDIR)/Makeconf
CFLAGS = -Wall -O2 -g -static \
-I${OBJDIR} -I${TESTBED_SRCDIR}/lib/libtb
LIBS = ${OBJDIR}/lib/libtb/libtb.a
all: emulab-syncd emulab-sync
include $(TESTBED_SRCDIR)/GNUmakerules
emulab-syncd: emulab-syncd.o version.o
$(CC) $(CFLAGS) emulab-syncd.o version.o $(LIBS) -o emulab-syncd
emulab-sync: emulab-sync.o version.o
$(CC) $(CFLAGS) emulab-sync.o version.o $(LIBS) -o emulab-sync
version.c: emulab-syncd.c
echo >$@ "char build_info[] = \"Built `date +%d-%b-%Y` by `id -nu`@`hostname | sed 's/\..*//'`:`pwd`\";"
client-install:
$(INSTALL_PROGRAM) emulab-syncd $(DESTDIR)$(CLIENT_BINDIR)/emulab-syncd
$(INSTALL_PROGRAM) emulab-sync $(DESTDIR)$(CLIENT_BINDIR)/emulab-sync
clean:
/bin/rm -f *.o emulab-syncd version.c
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2003 University of Utah and the Flux Group.
* All rights reserved.
*/
#define SERVER_PORTNUM 16534
#define SOCKBUFSIZE (1024 * 128)
/*
* The barrier request structure sent to the daemon. There is no return
* value; returning means go!
*/
typedef struct {
char name[64]; /* An arbitrary string */
int request; /* Either init or wait */
int count; /* Number of waiters */
} barrier_req_t;
#define BARRIER_INIT 1
#define BARRIER_WAIT 2
#define DEFAULT_BARRIER "barrier"
/* Info */
#define CURRENT_VERSION 1
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2003 University of Utah and the Flux Group.
* All rights reserved.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <sys/fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <syslog.h>
#include <unistd.h>
#include <signal.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <assert.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include "decls.h"
#include "config.h"
#ifndef SYNCSERVER
#define SYNCSERVER "/var/emulab/boot/syncserver"
#endif
static int debug = 0;
/* Forward decls */
static int doudp(barrier_req_t *, struct in_addr, int);
static int dotcp(barrier_req_t *, struct in_addr, int);
char *usagestr =
"usage: emulab-sync [options]\n"
" -n <name> Optional barrier name; must be less than 64 bytes long\n"
" -d Turn on debugging\n"
" -s server Specify a sync server to connect to\n"
" -p portnum Specify a port number to connect to\n"
" -i count Initialize named barrier to count waiters\n"
" -u Use UDP instead of TCP\n"
"\n";
void
usage()
{
fprintf(stderr, usagestr);
exit(1);
}
int
main(int argc, char **argv)
{
int n, ch;
struct hostent *he;
FILE *fp;
struct in_addr serverip;
int portnum = SERVER_PORTNUM;
int useudp = 0;
char *server = NULL;
int initme = 0;
char *barrier= DEFAULT_BARRIER;
barrier_req_t barrier_req;
while ((ch = getopt(argc, argv, "ds:p:ui:n:")) != -1)
switch(ch) {
case 'd':
debug++;
break;
case 'p':
portnum = atoi(optarg);
break;
case 'i':
initme = atoi(optarg);
break;
case 'n':
barrier = optarg;
break;
case 's':
server = optarg;
break;
case 'u':
useudp = 1;
break;
default:
usage();
}
argv += optind;
argc -= optind;
if (argc)
usage();
if (strlen(barrier) >= sizeof(barrier_req.name))
usage();
/*
* Look for the syncserver access file.
*/
if (!server &&
(access(SYNCSERVER, R_OK) == 0) &&
((fp = fopen(SYNCSERVER, "r")) != NULL)) {
char buf[BUFSIZ], *bp;
if (fgets(buf, sizeof(buf), fp)) {
if ((bp = strchr(buf, '\n')))
*bp = (char) NULL;
/*
* Look for port spec
*/
if ((bp = strchr(buf, ':'))) {
*bp++ = (char) NULL;
portnum = atoi(bp);
}
server = strdup(buf);
}
fclose(fp);
}
if (!server)
usage();
/*
* Map server to IP.
*/
he = gethostbyname(server);
if (he)
memcpy((char *)&serverip, he->h_addr, he->h_length);
else {
fprintf(stderr, "gethostbyname(%s) failed\n", server);
exit(1);
}
/*
* Build up the request structure to send to the server.
*/
strcpy(barrier_req.name, barrier);
if (initme) {
barrier_req.request = BARRIER_INIT;
barrier_req.count = initme;
}
else
barrier_req.request = BARRIER_WAIT;
if (useudp)
n = doudp(&barrier_req, serverip, portnum);
else
n = dotcp(&barrier_req, serverip, portnum);
exit(n);
}
/*
* TCP version, which uses ssl if compiled in.
*/
static int
dotcp(barrier_req_t *barrier_reqp, struct in_addr serverip, int portnum)
{
int sock, n, cc;
struct sockaddr_in name;
char *bp, buf[BUFSIZ];
while (1) {
/* Create socket from which to read. */
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("creating stream socket:");
return -1;
}
/* Create name. */
name.sin_family = AF_INET;
name.sin_addr = serverip;
name.sin_port = htons(portnum);
if (connect(sock, (struct sockaddr *) &name,
sizeof(name)) == 0) {
break;
}
if (errno != ECONNREFUSED) {
perror("connecting stream socket");
close(sock);
return -1;
}
close(sock);
if (debug)
fprintf(stderr, "Connection refused. Waiting ...\n");
sleep(5);
}
n = 1;
if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &n, sizeof(n)) < 0) {
perror("setsockopt SO_KEEPALIVE");
goto bad;
}
/*
* Write the command to the socket and wait for the response.
*/
bp = (char *) barrier_reqp;
n = sizeof(*barrier_reqp);
while (n) {
if ((cc = write(sock, bp, n)) <= 0) {
if (cc < 0) {
perror("Writing to socket:");
goto bad;
}
fprintf(stderr, "write aborted");
goto bad;
}
bp += cc;
n -= cc;
}
while (1) {
if ((cc = read(sock, buf, sizeof(buf) - 1)) <= 0) {
if (cc < 0) {
perror("Reading from socket:");
goto bad;
}
break;
}
}
close(sock);
return 0;
bad:
close(sock);
return -1;
}
/*
* Not very robust ...
*/
static int
doudp(barrier_req_t *barrier_reqp, struct in_addr serverip, int portnum)
{
int sock, length, n, cc;
struct sockaddr_in name, client;
char buf[BUFSIZ];
/* Create socket from which to read. */
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
perror("creating dgram socket:");
return -1;
}
/* Create name. */
name.sin_family = AF_INET;
name.sin_addr = serverip;
name.sin_port = htons(portnum);
/*
* Write the command to the socket and wait for the response
*/
n = sizeof(*barrier_reqp);
cc = sendto(sock, barrier_reqp,
n, 0, (struct sockaddr *)&name, sizeof(name));
if (cc != n) {
if (cc < 0) {
perror("Writing to socket:");
return -1;
}
fprintf(stderr, "short write (%d != %d)\n", cc, n);
return -1;
}
cc = recvfrom(sock, buf, sizeof(buf) - 1, 0,
(struct sockaddr *)&client, &length);
if (cc < 0) {
perror("Reading from socket:");
return -1;
}
close(sock);
return 0;
}
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2003 University of Utah and the Flux Group.
* All rights reserved.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <syslog.h>
#include <signal.h>
#include <stdarg.h>
#include <assert.h>
#include <sys/fcntl.h>
#include <paths.h>
#include "decls.h"
#include "log.h"
int debug = 0;
static int handle_request(int, struct sockaddr_in *, barrier_req_t *,int);
static int makesockets(int portnum, int *udpsockp, int *tcpsockp);
int client_writeback(int sock, void *buf, int len, int tcp);
void client_writeback_done(int sock, struct sockaddr_in *client);
/*
* This is the barrier structure, one per barrier. For each barrier, a list
* of waiters, with the socket they are waiting on. Fixed for now, make
* dynamic later.
*/
#define MAXWAITERS 4096
struct barrier_ctrl {
char *name;
int count;
int index;
struct in_addr ipaddr; /* Debugging */
struct barrier_ctrl *next; /* Linked list */
struct {
int sock;
int istcp;
struct in_addr ipaddr; /* Debugging */
} waiters[MAXWAITERS];
};
typedef struct barrier_ctrl barrier_ctrl_t;
barrier_ctrl_t *allbarriers;
char *usagestr =
"usage: tmcd [-d] [-p #]\n"
" -d Turn on debugging.\n"
" -p portnum Specify a port number to listen on\n"
"\n";
void
usage()
{
fprintf(stderr, usagestr);
exit(1);
}
static void
setverbose(int sig)
{
signal(sig, SIG_IGN);
if (sig == SIGUSR1)
debug = 1;
else
debug = 0;
}
int
main(int argc, char **argv)
{
int tcpsock, udpsock, i, fdcount, ch;
int portnum = SERVER_PORTNUM;
FILE *fp;
char buf[BUFSIZ];
extern char build_info[];
fd_set fds, sfds;
while ((ch = getopt(argc, argv, "dp:")) != -1)
switch(ch) {
case 'p':
portnum = atoi(optarg);
break;
case 'd':
debug++;
break;
case 'h':
case '?':
default:
usage();
}
argc -= optind;
argv += optind;
if (argc)
usage();
if (debug)
loginit(0, 0);
else {
/* Become a daemon */
daemon(0, 0);
loginit(1, "tmcd");
}
info("daemon starting (version %d)\n", CURRENT_VERSION);
info("%s\n", build_info);
/*
* Create TCP/UDP server.
*/
if (makesockets(portnum, &udpsock, &tcpsock) < 0) {
error("Could not make sockets!");
exit(1);
}
signal(SIGUSR1, setverbose);
signal(SIGUSR2, setverbose);
/*
* Stash the pid away.
*/
i = getpid();
sprintf(buf, "%s/syncd.pid", _PATH_VARRUN);
fp = fopen(buf, "w");
if (fp != NULL) {
fprintf(fp, "%d\n", i);
(void) fclose(fp);
}
/*
* Now sit and listen for connections.
*/
FD_ZERO(&sfds);
FD_SET(tcpsock, &sfds);
FD_SET(udpsock, &sfds);
fdcount = tcpsock;
if (udpsock > tcpsock)
fdcount = udpsock;
fdcount++;
while (1) {
struct sockaddr_in client;
int length, cc, newsock;
barrier_req_t barrier_req;
fds = sfds;
errno = 0;
i = select(fdcount, &fds, NULL, NULL, NULL);
if (i < 0) {
if (errno == EINTR) {
warning("select interrupted, continuing");
continue;
}
fatal("select: %s", strerror(errno));
}
if (i == 0)
continue;
if (FD_ISSET(tcpsock, &fds)) {
length = sizeof(client);
newsock = accept(tcpsock,
(struct sockaddr *)&client, &length);
if (newsock < 0) {
errorc("accepting TCP connection");
continue;
}
if ((cc = read(newsock, &barrier_req,
sizeof(barrier_req))) <= 0) {
if (cc < 0)
errorc("Reading TCP request");
error("TCP connection aborted\n");
close(newsock);
continue;
}
handle_request(newsock, &client, &barrier_req, 1);
}
if (FD_ISSET(udpsock, &fds)) {
length = sizeof(client);
cc = recvfrom(udpsock, &barrier_req,
sizeof(barrier_req),
0, (struct sockaddr *)&client, &length);
if (cc <= 0) {
if (cc < 0)
errorc("Reading UDP request");
error("UDP Connection aborted\n");
continue;
}
handle_request(udpsock, &client, &barrier_req, 0);
}
}
close(tcpsock);
close(udpsock);
info("daemon terminating\n");
exit(0);
}
/*
* Create sockets on specified port.
*/
static int
makesockets(int portnum, int *udpsockp, int *tcpsockp)
{
struct sockaddr_in name;
int length, i, udpsock, tcpsock;
/*
* Setup TCP socket for incoming connections.
*/
/* Create socket from which to read. */
tcpsock = socket(AF_INET, SOCK_STREAM, 0);
if (tcpsock < 0) {
pfatal("opening stream socket");
}
i = 1;
if (setsockopt(tcpsock, SOL_SOCKET, SO_REUSEADDR,
(char *)&i, sizeof(i)) < 0)
pwarning("setsockopt(SO_REUSEADDR)");;
/* Create name. */
name.sin_family = AF_INET;
name.sin_addr.s_addr = INADDR_ANY;
name.sin_port = htons((u_short) portnum);
if (bind(tcpsock, (struct sockaddr *) &name, sizeof(name))) {
pfatal("binding stream socket");