Commit 2be46ba4 authored by Mike Hibler's avatar Mike Hibler

Implement heartbeat/status reports in Frisbee.

There are three pieces here, a change to the frisbee protocol itself, an
Emulab event component to get status back to the portal, and the surrounding
infrastructure to make it all work.

Frisbee heartbeat messages:

Added a new message type to the frisbee protocol, "Progress". In theory it
operates by having the server send a multicast progress request to its clients
which includes an interval at which to report (or "just once") and an
indication of what to report (nothing, progress summary, or full stats). The
client then sends unicast "fire and forget" UDP replies according to that
schedule. However, I took a shortcut for the moment and just added a command
line option to the client to tell it to report a summary at the indicated
interval (-H <interval>).  So the server never sends requests.

This is implemented in the client by a fourth thread since I wanted it to
operate independent of packet reception (which would cause clients to report
in a highly synchronized fashion due to multicast). The server instance just
logs progress reports into its log.

This protocol addition should be fully backward compatible as both client and
server ignore (but log) unknown messages.

Emulab progress report events:

When this is compiled in (-DEMULAB_EVENTS) and turned on (-E <server>), the
frisbee server instances will send a FRISBEEPROGRESS event to the indicated
event server for every progress report it receives (in addition to logging the
events to its own log). Right now it will create an event with key/value pairs
for the information in a client summary reply:

TSTAMP is the client's time at which it sends the event. Could be used by the
received to determine latency of the report if it cared (and if it assumed
that the clocks are in sync). We don't care about this.

SEQUENCE is the report number. Again, could be used by the receiver, in this
case to detect loss, if it cared. We don't.

CHUNKS_RECV is complete chunks that the client has received from the network.
CHUNKS_DECOMP is chunks decompressed by the client.  BYTES_WRITTEN is bytes
written to disk by the client.

Any of the three can be used by the event receiver as an indication of life
and/or progress. However, only the last would be a reasonable indicator of
time remaining since it is the last (and slowest) phase of imaging. To
estimate time remaining we could compare that value to the amount of
uncompressed data that is in the image. This makes the sketchy assumptions
that time for writes to the disk are uniform and that the number and distance
of seeks is uniform, but it is better than a sharp stick in the eye.

Emulab infrastructure:

There is a new sitevar "images/frisbee/heartbeat" which can be set to a
non-zero value to tell the frisbee MFS to fire off frisbee with -H <value>
and thus make reports. The default value of zero means to not make reports.
The tmcd "loadinfo" command sends this through via the HEARTBEAT=<value>
param.

REQUIRED A TMCD VERSION BUMP TO 41.
parent 746fc74a
/*
* Copyright (c) 2000-2014 University of Utah and the Flux Group.
* Copyright (c) 2000-2017 University of Utah and the Flux Group.
*
* {{{EMULAB-LICENSE
*
......@@ -46,4 +46,4 @@
* it in clientside/tmcc/common/libsetup.pm!
*/
#define DEFAULT_VERSION 2
#define CURRENT_VERSION 40
#define CURRENT_VERSION 41
#
# Copyright (c) 2000-2015 University of Utah and the Flux Group.
# Copyright (c) 2000-2017 University of Utah and the Flux Group.
#
# {{{EMULAB-LICENSE
#
......@@ -111,7 +111,7 @@ distclean: subdir-distclean
frisbee-mfs:
$(MAKE) -C growdisk client
$(MAKE) -C zapdisk mfs
$(MAKE) -C frisbee.redux client
$(MAKE) -C frisbee.redux frisbee-mfs
frisbee-mfs-install:
ifeq ($(SYSTEM),FreeBSD)
......
#
# Copyright (c) 2000-2015 University of Utah and the Flux Group.
# Copyright (c) 2000-2017 University of Utah and the Flux Group.
#
# {{{EMULAB-LICENSE
#
......@@ -47,6 +47,18 @@ WITH_SIGNING = 1
#
WITH_IGMP = 1
#
# XXX note that this option has totally changed from what it was originally.
#
# We now use this to compile Emulab event support into the server so that it
# can forward frisbee progress info to interested subscribers on boss.
#
# Originally it was to support an event receiver on clients for benchmarking;
# receiving events to start up syncronized with other instances and with a
# specific set of parameters.
#
WITH_EVENTS = 1
SYSTEM := $(shell uname -s)
# FreeBSD specific goop
......@@ -236,34 +248,27 @@ endif
#CFLAGS += -DCONDVARS_WORK
# Define this to a non-zero value to enable recording of trace data
#CFLAGS += -DNEVENTS=500000
#CFLAGS += -DTRACE_EVENTS=500000
# Turn on client event handling
# XXX renaming of PacketSend is to avoid a namespace collision with pubsub
#CFLAGS += -DDOEVENTS -DPacketSend=_frisPacketSend
#CLIENTOBJS += event.o $(OBJDIR)/lib/event/event.o $(OBJDIR)/lib/event/util.o
#CLIENTLIBS += -lpubsub
#EVENTFLAGS = $(CFLAGS) -I/usr/local/include/pubsub -I$(TESTBED_SRCDIR)
ifeq ($(WITH_EVENTS),1)
CFLAGS += -DEMULAB_EVENTS -DBOSSNODE='"$(BOSSNODE)"'
SERVEROBJS += event.o $(TESTBED_LIBOBJDIR)/event/event.o $(TESTBED_LIBOBJDIR)/event/util.o
SERVERLIBS += -L/usr/local/lib -lpubsub -lcrypto
EVENTFLAGS = $(CFLAGS) -I/usr/local/include -I$(TESTBED_LIBSRCDIR)
endif
frisbee-debug: $(CLIENTOBJS) $(IUZDEPS)
$(CC) $(LDFLAGS) $(CLIENTFLAGS) $(CLIENTOBJS) $(CLIENTLIBS) $(IUZLIBS) -o $@
# cp frisbee frisbee.debug
# strip frisbee
frisbeed-debug: $(SERVEROBJS)
$(CC) $(LDFLAGS) $(SERVERFLAGS) $(SERVEROBJS) $(SERVERLIBS) -o $@
# cp frisbeed frisbeed.debug
# strip frisbeed
frisupload-debug: $(UPLOADOBJS)
$(CC) $(LDFLAGS) $(UPLOADFLAGS) $(UPLOADOBJS) $(UPLOADLIBS) -o $@
# cp frisupload frisupload.debug
# strip frisupload
frisuploadd-debug: $(UPLOADDOBJS)
$(CC) $(LDFLAGS) $(UPLOADDFLAGS) $(UPLOADDOBJS) $(UPLOADDLIBS) -o $@
# cp frisuploadd frisuploadd.debug
# strip frisuploadd
mfrisbeed-debug: $(MSERVEROBJS)
$(CC) $(LDFLAGS) $(MSERVERFLAGS) $(MSERVEROBJS) $(MSERVERLIBS) -o $@
......@@ -324,12 +329,15 @@ client-install: client
$(INSTALL_PROGRAM) frisbee $(DESTDIR)/usr/local/bin/frisbee
$(INSTALL_PROGRAM) frisupload $(DESTDIR)/usr/local/bin/frisupload
frisbee-mfs-install: client
frisbee-mfs:
$(MAKE) WITH_EVENTS=0 client
frisbee-mfs-install: frisbee-mfs
$(INSTALL_PROGRAM) frisbee $(DESTDIR)$(CLIENT_BINDIR)/frisbee
$(INSTALL_PROGRAM) frisupload $(DESTDIR)$(CLIENT_BINDIR)/frisupload
clean:
/bin/rm -f *.o *.a *.debug
/bin/rm -f *.o *.a *-debug
/bin/rm -f frisbee frisbeed mfrisbeed frisupload frisuploadd
/bin/rm -f frisbee.tar frisbee.tar.gz
/bin/rm -rf frisbee-dist
......
This diff is collapsed.
/*
* Copyright (c) 2000-2015 University of Utah and the Flux Group.
* Copyright (c) 2000-2017 University of Utah and the Flux Group.
*
* {{{EMULAB-LICENSE
*
......@@ -242,6 +242,12 @@ typedef struct {
char map[MAXCHUNKSIZE/CHAR_BIT];
} BlockMap_t;
typedef struct {
uint32_t chunks_in; /* Chunk successfully received */
uint32_t chunks_out; /* Chunk successfully written */
uint64_t bytes_out; /* Bytes written to disk */
} __attribute__((__packed__)) ClientSummary_t;
/*
* Packet defs.
*/
......@@ -326,6 +332,45 @@ typedef struct {
int32_t elapsed;
ClientStats_t stats;
} leave2;
/*
* Report progress. The request from the server tells
* the client how often and what to report. The reply
* from the client contains the requested info.
*
* On request, "when" is measured in seconds, with zero
* meaning "report one time right now". On reply, "when"
* contains the local timestamp for the info reported.
*
* On request, "what" is a flag word currently what info
* to report. On reply, it is the data that is included
* (which should be the same). Currently this can be one
* or more of:
* - a summary (chunks received, bytes written),
* - stats (same as reported by leave)
*
* On request, "seq" is an initial sequence number to
* use in reports. On reply it is the current sequence
* number, which is incremented for each report. This
* can be used on the server side to see if reports are
* being lost.
*
* Note that each client will skew the initial report
* by some random amount to prevent all clients reporting
* in sync.
*
* Requests can be multicast, replies are unicast.
*/
struct {
struct {
uint32_t clientid;
uint32_t when;
uint32_t what;
uint32_t seq;
} hdr;
ClientSummary_t summary;
ClientStats_t stats;
} progress;
} msg;
} Packet_t;
#define PKTTYPE_REQUEST 1
......@@ -338,6 +383,11 @@ typedef struct {
#define PKTSUBTYPE_LEAVE2 5
#define PKTSUBTYPE_PREQUEST 6
#define PKTSUBTYPE_JOIN2 7
#define PKTSUBTYPE_PROGRESS 8
/* types of progress reports */
#define PKTPROGRESS_SUMMARY 1
#define PKTPROGRESS_STATS 2
#ifdef MASTER_SERVER
#include <netinet/in.h>
......@@ -454,7 +504,7 @@ unsigned long ClientNetID(void);
int PacketReceive(Packet_t *p);
int PacketRequest(Packet_t *p);
void PacketSend(Packet_t *p, int *resends);
void PacketReply(Packet_t *p);
void PacketReply(Packet_t *p, int firenforget);
int PacketValid(Packet_t *p, int nchunks);
void dump_network(void);
#ifdef MASTER_SERVER
......
/*
* Copyright (c) 2002-2013 University of Utah and the Flux Group.
* Copyright (c) 2002-2017 University of Utah and the Flux Group.
*
* {{{EMULAB-LICENSE
*
......@@ -23,9 +23,11 @@
/*
* Testbed event system interface
* Allow starting multiple clients in a synchronized way.
* Supports sending of periodic events.
*/
#ifdef EMULAB_EVENTS
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -37,344 +39,141 @@
#include <arpa/inet.h>
#include <sys/time.h>
#include "lib/libtb/tbdefs.h"
#include "lib/event/event.h"
//#include "libtb/tbdefs.h"
#include "event/event.h"
#include "decls.h"
#include "log.h"
#include "event.h"
//#define EVENTDEBUG
static Event_t lastevent;
static event_handle_t ehandle;
static int gotevent;
static int clientnum;
static address_tuple_t tuple;
static char *eserver;
static int
useclient(int clinum, char *buf)
EventReinit(void)
{
char *cp;
int low, high;
if (ehandle == NULL) {
if (eserver == NULL)
return 1;
ehandle = event_register(eserver, 0);
if (ehandle == NULL) {
FrisWarning("could not register with event server %s",
eserver);
return 1;
}
cp = buf;
if (cp != NULL) {
while ((cp = strsep(&buf, ",")) != NULL) {
if (sscanf(cp, "%d-%d", &low, &high) == 2) {
if (clinum >= low && clinum <= high)
return 1;
continue;
}
if (sscanf(cp, "%d", &low) == 1) {
if (clinum == low)
return 1;
continue;
}
tuple = address_tuple_alloc();
if (tuple == NULL) {
FrisWarning("could not allocate an address tuple");
EventDeinit();
return 1;
}
}
return 0;
}
/*
* type==START
* STAGGER=N PKTTIMEOUT=N IDLETIMER=N READAHEAD=N INPROGRESS=N REDODELAY=N
* type==STOP
* STAGGER=N
*/
static int
parse_event(Event_t *event, char *etype, char *buf)
int
EventInit(char *server)
{
char *cp;
int val;
char str[STRSIZE+1];
int skipping = 0;
char buf[BUFSIZ];
memset(event, -1, sizeof *event);
if (strcmp(etype, TBDB_EVENTTYPE_START) == 0)
event->type = EV_START;
else if (strcmp(etype, TBDB_EVENTTYPE_STOP) == 0)
event->type = EV_STOP;
else
if (server == NULL) {
FrisWarning("no event server specified");
return 1;
}
cp = buf;
if (cp != NULL) {
while ((cp = strsep(&buf, " ")) != NULL) {
/*
* Hideous Hack Alert!
*
* Assume hostname is of the form 'c-<num>.<domain>'
* and use <num> to determine our client number.
* We use that number and compare to the useclients
* string to determine if we should process this event.
* If not, we ignore this event.
*/
if (sscanf(cp, "USECLIENTS=%s", str) == 1) {
if (!useclient(clientnum, str))
skipping = 1;
else
skipping = 0;
continue;
}
if (sscanf(cp, "SKIPCLIENTS=%s", str) == 1) {
if (useclient(clientnum, str)) {
gotevent = 0;
return 0;
}
continue;
}
if (skipping)
continue;
/*
* Convert server/port to elvin thing.
*/
snprintf(buf, sizeof(buf), "elvin://%s", server);
eserver = strdup(buf);
if (sscanf(cp, "STARTDELAY=%d", &val) == 1) {
event->data.start.startdelay = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "STARTAT=%d", &val) == 1) {
event->data.start.startat = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "PKTTIMEOUT=%d", &val) == 1) {
event->data.start.pkttimeout = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "IDLETIMER=%d", &val) == 1) {
event->data.start.idletimer = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "CHUNKBUFS=%d", &val) == 1) {
event->data.start.chunkbufs = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "WRITEBUFMEM=%d", &val) == 1) {
event->data.start.writebufmem = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "MAXMEM=%d", &val) == 1) {
event->data.start.maxmem = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "READAHEAD=%d", &val) == 1) {
event->data.start.readahead = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "INPROGRESS=%d", &val) == 1) {
event->data.start.inprogress = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "REDODELAY=%d", &val) == 1) {
event->data.start.redodelay = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "IDLEDELAY=%d", &val) == 1) {
event->data.start.idledelay = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "SLICE=%d", &val) == 1) {
event->data.start.slice = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "ZEROFILL=%d", &val) == 1) {
event->data.start.zerofill = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "RANDOMIZE=%d", &val) == 1) {
event->data.start.randomize = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "NOTHREADS=%d", &val) == 1) {
event->data.start.nothreads = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "DOSTYPE=%d", &val) == 1) {
event->data.start.dostype = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "DEBUG=%d", &val) == 1) {
event->data.start.debug = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "TRACE=%d", &val) == 1) {
event->data.start.trace = val;
gotevent = 1;
continue;
}
if (sscanf(cp, "TRACEPREFIX=%s", str) == 1) {
strncpy(event->data.start.traceprefix,
str, STRSIZE-1);
gotevent = 1;
continue;
}
if (sscanf(cp, "EXITSTATUS=%d", &val) == 1) {
event->data.stop.exitstatus = val;
gotevent = 1;
continue;
}
}
}
return 0;
return EventReinit();
}
static void
callback(event_handle_t handle, event_notification_t notification, void *data)
void
EventDeinit(void)
{
char buf[7][64];
char args[256];
int len = 64;
buf[0][0] = buf[1][0] = buf[2][0] = buf[3][0] = 0;
buf[4][0] = buf[5][0] = buf[6][0] = 0;
event_notification_get_site(handle, notification, buf[0], len);
event_notification_get_expt(handle, notification, buf[1], len);
event_notification_get_group(handle, notification, buf[2], len);
event_notification_get_host(handle, notification, buf[3], len);
event_notification_get_objtype(handle, notification, buf[4], len);
event_notification_get_objname(handle, notification, buf[5], len);
event_notification_get_eventtype(handle, notification, buf[6], len);
event_notification_get_arguments(handle, notification,
args, sizeof(args));
#ifdef EVENTDEBUG
{
struct timeval now;
static int ecount;
gettimeofday(&now, NULL);
fprintf(stderr, "Event %d: %lu.%03lu %s %s %s %s %s %s %s %s\n",
++ecount, now.tv_sec, now.tv_usec / 1000,
buf[0], buf[1], buf[2],
buf[3], buf[4], buf[5], buf[6], args);
if (ehandle != NULL) {
if (tuple != NULL) {
address_tuple_free(tuple);
tuple = NULL;
}
event_unregister(ehandle);
ehandle = NULL;
}
#endif
if (parse_event(&lastevent, buf[6], args))
FrisLog("bogus event '%s %s' ignored", buf[6], args);
}
int
EventInit(char *server)
EventSendClientReport(char *node, char *image, uint32_t tstamp, uint32_t seq,
ClientSummary_t *summary, ClientStats_t *stats)
{
char buf[BUFSIZ], ipbuf[BUFSIZ];
struct hostent *he;
struct in_addr myip;
char *ipaddr;
address_tuple_t tuple;
if (server == NULL) {
FrisWarning("no event server specified");
return 1;
}
if (gethostname(buf, sizeof(buf)) < 0) {
FrisPwarning("could not get hostname");
return 1;
}
if ((he = gethostbyname(buf)) == NULL) {
FrisWarning("could not get IP address from hostname");
return 1;
}
memcpy((char *)&myip, he->h_addr, he->h_length);
strcpy(ipbuf, inet_ntoa(myip));
ipaddr = ipbuf;
event_notification_t notification;
/*
* Hideous Hack Alert!
*
* Assume our hostname is of the form 'c-<num>.<domain>'
* and use <num> to determine our client number.
* We use that number later to compare to the MAXCLIENTS
* field to determine if we should process an event.
* In case we got disconnected
*/
if (sscanf(buf, "c-%d.", &clientnum) != 1) {
FrisWarning("could not determine client number from hostname %s",
buf);
if (ehandle == NULL && EventReinit())
return 1;
} else if (debug)
FrisLog("client number %d for event handling", clientnum);
/*
* Convert server/port to elvin thing.
*/
snprintf(buf, sizeof(buf), "elvin://%s", server);
server = buf;
tuple->host = BOSSNODE;
tuple->objtype = "FRISBEESTATUS";
tuple->objname = node;
tuple->eventtype = image;
/*
* Construct an address tuple for subscribing to events for
* this node.
*/
tuple = address_tuple_alloc();
if (tuple == NULL) {
FrisWarning("could not allocate an address tuple");
notification = event_notification_alloc(ehandle, tuple);
if (notification == NULL) {
FrisWarning("EventSend: unable to allocate notification!");
return 1;
}
tuple->host = ADDRESSTUPLE_ANY; /* ipaddr; */
tuple->site = ADDRESSTUPLE_ANY;
tuple->group = ADDRESSTUPLE_ANY;
tuple->expt = ADDRESSTUPLE_ANY;
tuple->objtype = TBDB_OBJECTTYPE_CUSTOM;
tuple->objname = ADDRESSTUPLE_ANY;
tuple->eventtype = ADDRESSTUPLE_ANY;
/*
* Register with the event system.
* Insert interesting key/value pairs:
*
* Always:
* TSTAMP: int32, unix timestamp of report from client
* SEQUENCE: int32, sequence number of report
*
* From summary (if present):
* CHUNKS_RECV: int32, chunks successfully received by client
* CHUNKS_DECOMP: int32, chunks successfully decompressed
* BYTES_WRITTEN: int64, bytes written to disk
*
* From stats (if present):
* nothing right now as client does not pass this.
*/
ehandle = event_register(server, 0);
if (ehandle == NULL) {
FrisWarning("could not register with event system");
address_tuple_free(tuple);
return 1;
(void) event_notification_put_int32(ehandle, notification,
"TSTAMP", tstamp);
(void) event_notification_put_int32(ehandle, notification,
"SEQUENCE", seq);
if (summary != NULL) {
(void) event_notification_put_int32(ehandle, notification,
"CHUNKS_RECV",
summary->chunks_in);
(void) event_notification_put_int32(ehandle, notification,
"CHUNKS_DECOMP",
summary->chunks_out);
(void) event_notification_put_int64(ehandle, notification,
"BYTES_WRITTEN",
summary->bytes_out);
}