Commit df78b7ef authored by Mike Hibler's avatar Mike Hibler

More tweaks to frisbee heartbeat code.

Make sure server doesn't exit as long as it is getting heartbeats
from known clients. We used to exit when we stopped getting requests,
however clients often finish their network activity long before they
are actually done.

Emulab event now reports Mebibytes rather than bytes. It is accurate
enough and avoids perl bigints in the receiver(s).
parent 42af1a08
...@@ -167,7 +167,7 @@ MSERVEROBJS += config_emulab.o ...@@ -167,7 +167,7 @@ MSERVEROBJS += config_emulab.o
MSERVERLIBS += $(TESTBED_LIBOBJDIR)/libtb/libtb.a $(MYSQLLIBS) MSERVERLIBS += $(TESTBED_LIBOBJDIR)/libtb/libtb.a $(MYSQLLIBS)
endif endif
CFLAGS = -O2 -g -Wall -fno-builtin-log $(LDSTATIC) $(PTHREADCFLAGS) -DSTATS -DMASTER_SERVER CFLAGS = -O2 -g -Wall -fno-builtin-log $(LDSTATIC) $(PTHREADCFLAGS) -DMASTER_SERVER -DSTATS
LDFLAGS = $(LDSTATIC) LDFLAGS = $(LDSTATIC)
IUZFLAGS = -DFRISBEE -I$(IZSRCDIR) IUZFLAGS = -DFRISBEE -I$(IZSRCDIR)
......
...@@ -124,15 +124,6 @@ putint32(event_handle_t hand, event_notification_t note, char *key, uint32_t val ...@@ -124,15 +124,6 @@ putint32(event_handle_t hand, event_notification_t note, char *key, uint32_t val
(void) event_notification_put_string(hand, note, key, valbuf); (void) event_notification_put_string(hand, note, key, valbuf);
} }
static inline void
putint64(event_handle_t hand, event_notification_t note, char *key, uint64_t val)
{
char valbuf[24];
snprintf(valbuf, sizeof valbuf, "%"PRIu64, val);
(void) event_notification_put_string(hand, note, key, valbuf);
}
int int
EventSendClientReport(char *node, char *image, uint32_t tstamp, uint32_t seq, EventSendClientReport(char *node, char *image, uint32_t tstamp, uint32_t seq,
ClientSummary_t *summary, ClientStats_t *stats) ClientSummary_t *summary, ClientStats_t *stats)
...@@ -164,9 +155,9 @@ EventSendClientReport(char *node, char *image, uint32_t tstamp, uint32_t seq, ...@@ -164,9 +155,9 @@ EventSendClientReport(char *node, char *image, uint32_t tstamp, uint32_t seq,
* SEQUENCE: int32, sequence number of report * SEQUENCE: int32, sequence number of report
* *
* From summary (if present): * From summary (if present):
* CHUNKS_RECV: int32, chunks successfully received by client * CHUNKS_RECV: int32, chunks successfully received by client
* CHUNKS_DECOMP: int32, chunks successfully decompressed * CHUNKS_DECOMP: int32, chunks successfully decompressed
* BYTES_WRITTEN: int64, bytes written to disk * MBYTES_WRITTEN: int32, mebibytes written to disk
* *
* From stats (if present): * From stats (if present):
* nothing right now as client does not pass this. * nothing right now as client does not pass this.
...@@ -178,8 +169,8 @@ EventSendClientReport(char *node, char *image, uint32_t tstamp, uint32_t seq, ...@@ -178,8 +169,8 @@ EventSendClientReport(char *node, char *image, uint32_t tstamp, uint32_t seq,
summary->chunks_in); summary->chunks_in);
putint32(ehandle, notification, "CHUNKS_DECOMP", putint32(ehandle, notification, "CHUNKS_DECOMP",
summary->chunks_out); summary->chunks_out);
putint64(ehandle, notification, "BYTES_WRITTEN", putint32(ehandle, notification, "MBYTES_WRITTEN",
summary->bytes_out); (uint32_t)(summary->bytes_out / (1024*1024)));
} }
if (event_notify(ehandle, notification) == 0) { if (event_notify(ehandle, notification) == 0) {
......
...@@ -74,7 +74,7 @@ char *filename; ...@@ -74,7 +74,7 @@ char *filename;
char *hserver; char *hserver;
unsigned int hinterval; unsigned int hinterval;
struct timeval IdleTimeStamp, FirstReq, LastReq; struct timeval IdleTimeStamp, FirstReq, LastReq;
volatile int activeclients; volatile int activeclients, totalclients;
/* Forward decls */ /* Forward decls */
void quit(int); void quit(int);
...@@ -85,18 +85,25 @@ static void compute_sendrate(void); ...@@ -85,18 +85,25 @@ static void compute_sendrate(void);
static void dumpstats(void); static void dumpstats(void);
static int findclient(uint32_t id); static int findclient(uint32_t id);
#ifdef STATS
/* /*
* Track duplicate chunks/joins for stats gathering * Progress tracking.
*
* Since JOIN/LEAVE messages are UDP like everything else and can be lost,
* this is not definitive.
*/ */
char *chunkmap; #define MAXCLIENTS 1024 /* not a real limit, just for stats */
static struct {
#define MAXCLIENTS 256 /* not a real limit, just for stats */
struct {
unsigned int id; unsigned int id;
unsigned int ip; unsigned int ip;
unsigned int lastseq; unsigned int lastseq;
} clients[MAXCLIENTS]; } clients[MAXCLIENTS];
static int nextclientix = 0, maxclientnum = 0;
#ifdef STATS
/*
* Track duplicate chunks/joins for stats gathering
*/
char *chunkmap;
/* /*
* Stats gathering. * Stats gathering.
...@@ -469,44 +476,66 @@ ClientJoin(Packet_t *p, int version) ...@@ -469,44 +476,66 @@ ClientJoin(Packet_t *p, int version)
{ {
struct in_addr ipaddr = { p->hdr.srcip }; struct in_addr ipaddr = { p->hdr.srcip };
unsigned int clientid = p->msg.join.clientid; unsigned int clientid = p->msg.join.clientid;
int i, j;
EVENT(1, EV_JOINREQ, ipaddr, clientid, version, 0, 0); EVENT(1, EV_JOINREQ, ipaddr, clientid, version, 0, 0);
WorkQueueEnqueueJoin(version, clientid); WorkQueueEnqueueJoin(version, clientid);
#ifdef STATS
{ /*
int i, j = -1; * Sanity check the new client.
*/
for (i = 0; i < MAXCLIENTS; i++) { j = -1;
if (clients[i].id == clientid) { for (i = 0; i < nextclientix; i++) {
if (clients[i].ip != ipaddr.s_addr) { if (clients[i].id == clientid) {
FrisLog("%s reuses active client id", if (clients[i].ip != ipaddr.s_addr) {
inet_ntoa(ipaddr)); FrisLog("%s reuses active client id",
clients[i].ip = ipaddr.s_addr; inet_ntoa(ipaddr));
} clients[i].ip = ipaddr.s_addr;
break;
}
if (clients[i].ip == ipaddr.s_addr) {
FrisLog("%s rejoins with different cid, ocid=%u",
inet_ntoa(ipaddr), clients[i].id);
clients[i].id = clientid;
break;
} }
if (j == -1 && clients[i].id == 0) break;
j = i; }
if (clients[i].ip == ipaddr.s_addr) {
int k;
FrisLog("%s rejoins with different cid, ocid=%u",
inet_ntoa(ipaddr), clients[i].id);
/*
* Index is assumed to be unique, so look for other
* users. We just report it now.
*/
for (k = i + 1; k < nextclientix; k++)
if (clients[k].id == clients[i].id) {
FrisLog("%s also using cid; "
"pooch screwed",
clients[k].ip);
}
clients[i].id = clientid;
break;
}
if (j == -1 && clients[i].id == 0)
j = i;
}
if (i == nextclientix) {
if (j == -1 && nextclientix < MAXCLIENTS) {
j = nextclientix++;
if (nextclientix > maxclientnum)
maxclientnum = nextclientix;
} }
if (i == MAXCLIENTS) { if (j != -1) {
activeclients++; activeclients++;
if (j != -1) { totalclients++;
clients[j].id = clientid; clients[j].id = clientid;
clients[j].ip = ipaddr.s_addr; clients[j].ip = ipaddr.s_addr;
clients[j].lastseq = 0; clients[j].lastseq = 0;
} } else {
FrisLog("more than %d clients, not tracking %s",
MAXCLIENTS, inet_ntoa(ipaddr));
} }
i = j;
} }
DOSTAT(joinrep++); DOSTAT(joinrep++);
#else
activeclients++;
#endif
EVENT(1, EV_JOINREP, ipaddr, CHUNKSIZE, BLOCKSIZE, EVENT(1, EV_JOINREP, ipaddr, CHUNKSIZE, BLOCKSIZE,
(FileInfo.filesize >> 32), FileInfo.filesize); (FileInfo.filesize >> 32), FileInfo.filesize);
...@@ -529,33 +558,25 @@ ClientLeave(Packet_t *p) ...@@ -529,33 +558,25 @@ ClientLeave(Packet_t *p)
{ {
struct in_addr ipaddr = { p->hdr.srcip }; struct in_addr ipaddr = { p->hdr.srcip };
unsigned int clientid = p->msg.leave.clientid; unsigned int clientid = p->msg.leave.clientid;
int i;
EVENT(1, EV_LEAVEMSG, ipaddr, clientid, p->msg.leave.elapsed, 0, 0); EVENT(1, EV_LEAVEMSG, ipaddr, clientid, p->msg.leave.elapsed, 0, 0);
#ifdef STATS i = findclient(clientid);
{ if (i >= 0) {
int i = findclient(clientid); clients[i].id = clients[i].ip = 0;
clients[i].lastseq = 0;
if (i >= 0) { if (nextclientix == i + 1)
activeclients--; nextclientix = i;
clients[i].id = clients[i].ip = 0; activeclients--;
clients[i].lastseq = 0; FrisLog("%s (id %u, image %s): leaves at %s, "
FrisLog("%s (id %u, image %s): leaves at %s, " "ran for %d seconds. %d active clients",
"ran for %d seconds. %d active clients", inet_ntoa(ipaddr), clientid, filename,
inet_ntoa(ipaddr), clientid, filename, CurrentTimeString(), p->msg.leave.elapsed,
CurrentTimeString(), activeclients);
p->msg.leave.elapsed, activeclients); } else
} else FrisLog("%s (id %u): spurious leave ignored",
FrisLog("%s (id %u): spurious leave ignored", inet_ntoa(ipaddr), clientid);
inet_ntoa(ipaddr), clientid);
}
#else
activeclients--;
FrisLog("%s (id %u, image %s): leaves at %s, ran for %d seconds. "
"%d active clients",
inet_ntoa(ipaddr), clientid, filename, CurrentTimeString(),
p->msg.leave.elapsed, activeclients);
#endif
} }
/* /*
...@@ -567,34 +588,28 @@ ClientLeave2(Packet_t *p) ...@@ -567,34 +588,28 @@ ClientLeave2(Packet_t *p)
{ {
struct in_addr ipaddr = { p->hdr.srcip }; struct in_addr ipaddr = { p->hdr.srcip };
unsigned int clientid = p->msg.leave2.clientid; unsigned int clientid = p->msg.leave2.clientid;
int i;
EVENT(1, EV_LEAVEMSG, ipaddr, clientid, p->msg.leave2.elapsed, 0, 0); EVENT(1, EV_LEAVEMSG, ipaddr, clientid, p->msg.leave2.elapsed, 0, 0);
i = findclient(clientid);
if (i >= 0) {
clients[i].id = clients[i].ip = 0;
clients[i].lastseq = 0;
if (nextclientix == i + 1)
nextclientix = i;
activeclients--;
FrisLog("%s (id %u, image %s): leaves at %s, "
"ran for %d seconds. %d active clients",
inet_ntoa(ipaddr), clientid, filename,
CurrentTimeString(), p->msg.leave2.elapsed,
activeclients);
#ifdef STATS #ifdef STATS
{ ClientStatsDump(clientid, &p->msg.leave2.stats);
int i = findclient(clientid);
if (i >= 0) {
clients[i].id = clients[i].ip = 0;
clients[i].lastseq = 0;
activeclients--;
FrisLog("%s (id %u, image %s): leaves at %s, "
"ran for %d seconds. %d active clients",
inet_ntoa(ipaddr), clientid, filename,
CurrentTimeString(),
p->msg.leave2.elapsed, activeclients);
ClientStatsDump(clientid, &p->msg.leave2.stats);
} else
FrisLog("%s (id %u): spurious leave ignored",
inet_ntoa(ipaddr), clientid);
}
#else
activeclients--;
FrisLog("%s (id %u, image %s): leaves at %s, ran for %d seconds. "
"%d active clients",
inet_ntoa(ipaddr), clientid, filename, CurrentTimeString(),
p->msg.leave2.elapsed, activeclients);
#endif #endif
} else
FrisLog("%s (id %u): spurious leave ignored",
inet_ntoa(ipaddr), clientid);
} }
/* /*
...@@ -667,10 +682,8 @@ ClientReport(Packet_t *p) ...@@ -667,10 +682,8 @@ ClientReport(Packet_t *p)
uint32_t seq = 0, tstamp = 0; uint32_t seq = 0, tstamp = 0;
ClientSummary_t *sump = NULL; ClientSummary_t *sump = NULL;
ClientStats_t *statp = NULL; ClientStats_t *statp = NULL;
#ifdef STATS
uint32_t lastseq; uint32_t lastseq;
int i; int i;
#endif
if (p->hdr.type != PKTTYPE_REPLY || if (p->hdr.type != PKTTYPE_REPLY ||
p->msg.progress.hdr.clientid == 0 || p->msg.progress.hdr.clientid == 0 ||
...@@ -678,13 +691,16 @@ ClientReport(Packet_t *p) ...@@ -678,13 +691,16 @@ ClientReport(Packet_t *p)
(p->msg.progress.hdr.what != 0 && (p->msg.progress.hdr.what != 0 &&
p->hdr.datalen < sizeof(p->msg.progress))) p->hdr.datalen < sizeof(p->msg.progress)))
return; return;
#ifdef STATS
if ((i = findclient(p->msg.progress.hdr.clientid)) < 0) if ((i = findclient(p->msg.progress.hdr.clientid)) < 0)
return; return;
lastseq = clients[i].lastseq; lastseq = clients[i].lastseq;
#endif
DOSTAT(reportslogged++); DOSTAT(reportslogged++);
/* XXX keep the server alive */
gettimeofday(&IdleTimeStamp, 0);
tstamp = p->msg.progress.hdr.when; tstamp = p->msg.progress.hdr.when;
seq = p->msg.progress.hdr.seq; seq = p->msg.progress.hdr.seq;
...@@ -723,13 +739,11 @@ ClientReport(Packet_t *p) ...@@ -723,13 +739,11 @@ ClientReport(Packet_t *p)
#endif #endif
} }
#ifdef STATS
if (seq != lastseq + 1) if (seq != lastseq + 1)
FrisLog("%s (id %u): lost reports: last=%u, this=%u", FrisLog("%s (id %u): lost reports: last=%u, this=%u",
inet_ntoa(ipaddr), inet_ntoa(ipaddr),
p->msg.progress.hdr.clientid, lastseq, seq); p->msg.progress.hdr.clientid, lastseq, seq);
clients[i].lastseq = seq; clients[i].lastseq = seq;
#endif
#ifdef EMULAB_EVENTS #ifdef EMULAB_EVENTS
/* /*
...@@ -914,27 +928,27 @@ PlayFrisbee(void) ...@@ -914,27 +928,27 @@ PlayFrisbee(void)
continue; continue;
} }
#ifdef STATS
/* If less than zero, exit when last client leaves */ /* If less than zero, exit when last client leaves */
if (timeout < 0 && if (timeout < 0 &&
Stats.joins > 0 && activeclients == 0) { totalclients > 0 && activeclients == 0) {
fsleep(2000000); fsleep(2000000);
FrisLog("Last client left!"); FrisLog("Last client left!");
break; break;
} }
#endif
if (idlelastloop) { if (idlelastloop) {
if (timeout > 0 && if (timeout > 0 &&
stamp.tv_sec - IdleTimeStamp.tv_sec > stamp.tv_sec - IdleTimeStamp.tv_sec >
timeout) { timeout) {
FrisLog("No requests for %d seconds!", FrisLog("No requests or reports "
timeout); "for %d seconds "
"(%d remaining clients)",
timeout, activeclients);
break; break;
} }
} else { } else {
DOSTAT(goesidle++); DOSTAT(goesidle++);
IdleTimeStamp = stamp; IdleTimeStamp.tv_sec = stamp.tv_sec;
idlelastloop = 1; idlelastloop = 1;
} }
continue; continue;
...@@ -1499,31 +1513,20 @@ mypread(int fd, void *buf, size_t nbytes, off_t offset) ...@@ -1499,31 +1513,20 @@ mypread(int fd, void *buf, size_t nbytes, off_t offset)
return count; return count;
} }
#ifdef STATS
static int static int
findclient(uint32_t id) findclient(uint32_t id)
{ {
int i; int i;
for (i = 0; i < MAXCLIENTS; i++) for (i = 0; i < nextclientix; i++)
if (clients[i].id == id) if (clients[i].id == id)
return i; return i;
return -1; return -1;
} }
#endif
#define LINK_OVERHEAD (14+4+8+12) /* ethernet (hdr+CRC+preamble+gap) */ #define LINK_OVERHEAD (14+4+8+12) /* ethernet (hdr+CRC+preamble+gap) */
#define IP_OVERHEAD (20+8) /* IP + UDP hdrs */ #define IP_OVERHEAD (20+8) /* IP + UDP hdrs */
static unsigned long
compute_bandwidth(int bsize, int binterval)
{
int wireblocksize = (sizeof(Packet_t)+IP_OVERHEAD+LINK_OVERHEAD) * 8;
double burstspersec = 1000000.0 / binterval;
return (unsigned long)(burstspersec*bsize*wireblocksize);
}
#define LOSS_INTERVAL 500 /* interval in which we collect data (ms) */ #define LOSS_INTERVAL 500 /* interval in which we collect data (ms) */
#define MULT_DECREASE 0.90 /* mult factor to decrease burst rate */ #define MULT_DECREASE 0.90 /* mult factor to decrease burst rate */
#define ADD_INCREASE 1 /* add factor to increase burst rate */ #define ADD_INCREASE 1 /* add factor to increase burst rate */
...@@ -1743,6 +1746,17 @@ compute_sendrate(void) ...@@ -1743,6 +1746,17 @@ compute_sendrate(void)
burstsize, burstinterval); burstsize, burstinterval);
} }
#ifdef STATS
static unsigned long
compute_bandwidth(int bsize, int binterval)
{
int wireblocksize = (sizeof(Packet_t)+IP_OVERHEAD+LINK_OVERHEAD) * 8;
double burstspersec = 1000000.0 / binterval;
return (unsigned long)(burstspersec*bsize*wireblocksize);
}
#endif
static void static void
dumpstats(void) dumpstats(void)
{ {
...@@ -1773,6 +1787,8 @@ dumpstats(void) ...@@ -1773,6 +1787,8 @@ dumpstats(void)
FrisLog(" user/sys CPU time: %d.%03d/%d.%03d", FrisLog(" user/sys CPU time: %d.%03d/%d.%03d",
ru.ru_utime.tv_sec, ru.ru_utime.tv_usec/1000, ru.ru_utime.tv_sec, ru.ru_utime.tv_usec/1000,
ru.ru_stime.tv_sec, ru.ru_stime.tv_usec/1000); ru.ru_stime.tv_sec, ru.ru_stime.tv_usec/1000);
FrisLog(" max/total clients: %d/%d",
maxclientnum, totalclients);
FrisLog(" msgs in/out: %d/%d", FrisLog(" msgs in/out: %d/%d",
Stats.msgin, Stats.joinrep + Stats.blockssent); Stats.msgin, Stats.joinrep + Stats.blockssent);
FrisLog(" joins/leaves: %d/%d", Stats.joins, Stats.leaves); FrisLog(" joins/leaves: %d/%d", Stats.joins, Stats.leaves);
......
...@@ -99,7 +99,7 @@ sub callbackFunc($$$) { ...@@ -99,7 +99,7 @@ sub callbackFunc($$$) {
my $seq = event_notification_get_string($handle, $note, "SEQUENCE"); my $seq = event_notification_get_string($handle, $note, "SEQUENCE");
my $rchunks = event_notification_get_string($handle, $note, "CHUNKS_RECV"); my $rchunks = event_notification_get_string($handle, $note, "CHUNKS_RECV");
my $dchunks = event_notification_get_string($handle, $note, "CHUNKS_DECOMP"); my $dchunks = event_notification_get_string($handle, $note, "CHUNKS_DECOMP");
my $wbytes = event_notification_get_string($handle, $note, "BYTES_WRITTEN"); my $wmbytes = event_notification_get_string($handle, $note, "MBYTES_WRITTEN");
if (!exists($images{$image})) { if (!exists($images{$image})) {
my $isize = `imageinfo -s $image`; my $isize = `imageinfo -s $image`;
...@@ -111,7 +111,7 @@ sub callbackFunc($$$) { ...@@ -111,7 +111,7 @@ sub callbackFunc($$$) {
$usize = 0; $usize = 0;
} }
$images{$image}{'chunks'} = int($isize / 1048576); $images{$image}{'chunks'} = int($isize / 1048576);
$images{$image}{'bytes'} = int($usize); $images{$image}{'mbytes'} = int($usize / 1048576);
} }
my ($rpct,$dpct,$wpct); my ($rpct,$dpct,$wpct);
$rpct = $dpct = $wpct = "??"; $rpct = $dpct = $wpct = "??";
...@@ -119,9 +119,9 @@ sub callbackFunc($$$) { ...@@ -119,9 +119,9 @@ sub callbackFunc($$$) {
$rpct = sprintf "%.1f", $rchunks / $images{$image}{'chunks'} * 100; $rpct = sprintf "%.1f", $rchunks / $images{$image}{'chunks'} * 100;
$dpct = sprintf "%.1f", $dchunks / $images{$image}{'chunks'} * 100; $dpct = sprintf "%.1f", $dchunks / $images{$image}{'chunks'} * 100;
} }
if ($images{$image}{'bytes'} > 0) { if ($images{$image}{'mbytes'} > 0) {
$wpct = sprintf "%.1f", $wbytes / $images{$image}{'bytes'} * 100; $wpct = sprintf "%.1f", $wmbytes / $images{$image}{'mbytes'} * 100;
} }
print "$node\@$tstamp: image=$image seq=$seq recv=$rchunks ($rpct\%)". print "$node\@$tstamp: image=$image seq=$seq recv=$rchunks ($rpct\%)".
" decomp=$dchunks ($dpct\%) bwritten=$wbytes ($wpct\%)\n"; " decomp=$dchunks ($dpct\%) bwritten=$wmbytes ($wpct\%)\n";
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment