Commit 2ff95cee authored by Mike Hibler's avatar Mike Hibler

Commit of USENIX driven improvements:

1. Client: add "NAK avoidance."  We track our (and others, via snooping) block
   requests and avoid making re-requests unless it has been "long enough."

2. Server: more aggressive merging of requests in the work queue.  For every
   new request, look for any overlap with an existing entry.

3. Server: from Leigh: first cut at dynamic rate adjustment.  Can be enabled
   with -D option.

4. Both: change a lot of the magic constants into runtime variables so that
   they can be adjusted on the command line or via the event interface (see
   below).

5. Add code to do basic validation of incoming packets.

6. Client: randomization of block request order is now optional.

7. Client: startup delay is optional and specified via a parameter N which
   says "randomly delay between 0 and N seconds before attempting to join."

8. Both: add a new LEAVE message which reports back all the client stats to
   the server (which logs them).

9. Both: attempt to comment some of the magic values in decls.h.

10. Both: add cheezy hack to fake packet loss.  Disabled by default, see
   the GNUmakefile.  This code is coming out right after I archive it with
   this commit.

11. Add tracing code.  Frisbee server/client will record a number of
   interesting events in a memory buffer and dump them at the end.  Not
   compiled in by default, see the GNUmakefile (NEVENTS) for turning this on.

12. Not to be confused with the events above, also added testbed event system
   code so that frisbee clients can be remotely controlled.  This is a hack
   for measurement purposes (it requires a special rc.frisbee in the frisbee
   MFS).  Allows changing of all sorts of parameters as well as implementing
   a crude form of identification allowing you to start only a subset of
   clients.  Interface is via tevc with commands like:
	tevc -e testbed,frisbee now frisbee start maxclients=5 readahead=5
	tevc -e testbed,frisbee now frisbee stop exitstatus=42
   Again, this is not compiled in by default as it makes the client about
   4x bigger.  See the GNUmakefile for turning it on.
parent e416a8cc
......@@ -31,6 +31,19 @@ SERVEROBJS = server.o $(SHAREDOBJS)
CFLAGS = -O2 -g -Wall -static $(PTHREADCFLAGS) -DSTATS
LDFLAGS = -static
# Define this to a non-zero value to enable recording of trace data
#CFLAGS += -DNEVENTS=20000
# Turn on server packet loss rate ability
#CFLAGS += -DDOLOSSRATE
# Turn on client event handling
#CFLAGS += -DDOEVENTS
#CLIENTOBJS += event.o $(OBJDIR)/event/lib/event.o $(OBJDIR)/event/lib/util.o
#CLIENTLIBS += `elvin-config --libs vin4c`
#EVENTFLAGS = $(CFLAGS) `elvin-config --cflags vin4c` \
-I$(TESTBED_SRCDIR)
frisbee: $(CLIENTOBJS) ../imagezip/frisbee.o
$(CC) $(LDFLAGS) $(CLIENTFLAGS) $(CLIENTOBJS) $(CLIENTLIBS) -o frisbee
cp frisbee frisbee.debug
......@@ -41,6 +54,10 @@ frisbeed: $(SERVEROBJS)
cp frisbeed frisbeed.debug
strip frisbeed
event.o: $(SRCDIR)/event.c decls.h log.h event.h
$(CC) $(EVENTFLAGS) -c $(SRCDIR)/event.c
client.o: decls.h log.h trace.h
server.o: decls.h log.h trace.h
log.o: decls.h log.h
......
This diff is collapsed.
......@@ -25,6 +25,12 @@
*/
#define MAXCHUNKBUFS 64
/*
* Socket buffer size, used for both send and receive in client and
* server right now.
*/
#define SOCKBUFSIZE (128 * 1024)
/*
* The number of read-ahead chunks that the client will request
* at a time. No point in requesting too far ahead either, since they
......@@ -54,9 +60,102 @@
/*
* The number of disk read blocks in a single read on the server.
* Must be an even divisor of CHUNKSIZE.
* Must be an even divisor of CHUNKSIZE. Should also be an even
* divisor of SERVER_BURST_SIZE below.
*/
#define MAXREADBLOCKS 32
#define SERVER_READ_SIZE 32
/*
* Parameters for server network usage:
*
* SERVER_BURST_SIZE Max BLOCKSIZE packets sent in a burst.
* Should be a multiple of SERVER_READ_SIZE
* Should be less than SOCKBUFSIZE/BLOCKSIZE,
* bursts of greater than the send socket
* buffer size are almost certain to cause
* lost packets.
* SERVER_BURST_GAP Delay in usec between output bursts.
* Given the typical scheduling granularity
* of 10ms for most unix systems, this
* will likely be set to either 0 or 10000.
*
* Together with the BLOCKSIZE, these two params form a theoretical upper
* bound on bandwidth consumption for the server. That upper bound (for
* ethernet) is:
*
* (1000000 / SERVER_BURST_GAP) # bursts per second
* * (BLOCKSIZE + 42) * SERVER_BURST_SIZE # * wire size of a burst
*
* which for the default 1k packets, gap of 10ms and burst of 64 packets
* is about 6.8MB/sec. In practice, the server is ultimately throttled by
* clients' ability to generate requests which is limited by their ability
* to decompress and write to disk.
*/
#define SERVER_BURST_SIZE 64
#define SERVER_BURST_GAP 10000
/*
* Max burst size when doing dynamic bandwidth adjustment.
* Needs to be large enough to induce loss.
*/
#define SERVER_DYNBURST_SIZE (SOCKBUFSIZE/BLOCKSIZE)
/*
* How long (in usecs) to wait before re-requesting a chunk.
* It will take the server more than:
*
* (CHUNKSIZE/SERVER_BURST_SIZE) * SERVER_BURST_GAP
*
* usec (0.16 sec with defaults) for each chunk it pumps out,
* and we conservatively assume that there are a fair number of other
* chunks that must be processed before it gets to our chunk.
*/
#define CLIENT_REQUEST_REDO_DELAY \
(10 * ((CHUNKSIZE/SERVER_BURST_SIZE)*SERVER_BURST_GAP))
/*
* How long for the writer to sleep if there are no blocks currently
* ready to write. Allow a full server burst period, assuming that
* something in the next burst will complete a block.
*/
#define CLIENT_WRITER_IDLE_DELAY SERVER_BURST_GAP
/*
* Client parameters and statistics.
*/
#define CLIENT_STATS_VERSION 1
/*
 * Client parameter/statistics record.
 *
 * This structure is embedded in the "leave2" message of Packet_t, so its
 * layout is part of the packet format and must not be reordered.
 *
 * NOTE(review): the record uses int / unsigned long / unsigned long long,
 * whose sizes differ between platforms -- confirm that client and server
 * are always built for the same ABI before relying on this format.
 */
typedef struct {
/* Selects which member of the union is valid; presumably set to
 * CLIENT_STATS_VERSION by the sender -- confirm against the client code. */
int version;
union {
struct {
/* Run time; presumably whole seconds plus a millisecond part -- confirm. */
int runsec;
int runmsec;
int delayms;
/* Byte counters; the r/e prefixes are not explained in this header
 * -- TODO document once confirmed against the client. */
unsigned long long rbyteswritten;
unsigned long long ebyteswritten;
/* Parameter settings; the names parallel the tunables in the event
 * interface's START data (chunkbufs, readahead, inprogress, ...). */
int chunkbufs;
int maxreadahead;
int maxinprogress;
int pkttimeout;
int startdelay;
int idletimer;
int idledelay;
int redodelay;
int randomize;
/* Event counters accumulated by the client. */
unsigned long nochunksready;
unsigned long nofreechunks;
unsigned long dupchunk;
unsigned long dupblock;
unsigned long lostblocks;
unsigned long recvidles;
unsigned long joinattempts;
unsigned long requests;
unsigned long decompidles;
unsigned long writeridles;
} v1;
/* Larger than v1, so this member determines the size of the union.
 * NOTE(review): presumably a fixed upper bound so that later versions
 * do not change the message size -- confirm. */
unsigned long limit[256];
} u;
} ClientStats_t;
/*
* Packet defs.
......@@ -102,6 +201,15 @@ typedef struct {
int block;
int count; /* Number of blocks */
} request;
/*
* Leave reporting client params/stats
*/
struct {
unsigned int clientid;
int elapsed;
ClientStats_t stats;
} leave2;
} msg;
} Packet_t;
#define PKTTYPE_REQUEST 1
......@@ -111,6 +219,7 @@ typedef struct {
#define PKTSUBTYPE_LEAVE 2
#define PKTSUBTYPE_BLOCK 3
#define PKTSUBTYPE_REQUEST 4
#define PKTSUBTYPE_LEAVE2 5
/*
* Protos.
......@@ -118,10 +227,13 @@ typedef struct {
int ClientNetInit(void);
int ServerNetInit(void);
int PacketReceive(Packet_t *p);
int PacketSend(Packet_t *p);
int PacketReply(Packet_t *p);
void PacketSend(Packet_t *p, int *resends);
void PacketReply(Packet_t *p);
int PacketValid(Packet_t *p, int nchunks);
char *CurrentTimeString(void);
int sleeptime(unsigned int usecs, char *str);
int fsleep(unsigned int usecs);
void ClientStatsDump(unsigned int id, ClientStats_t *stats);
/*
* Globals
......@@ -132,3 +244,4 @@ extern int broadcast;
extern struct in_addr mcastaddr;
extern struct in_addr mcastif;
extern char *filename;
extern int clockres;
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2002 University of Utah and the Flux Group.
* All rights reserved.
*/
/*
* Testbed event system interface
* Allow starting multiple clients in a synchronized way.
*/
#include <stdio.h>
#include <stdlib.h>
#include <netdb.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include "lib/libtb/tbdefs.h"
#include "event/lib/event.h"
#include "decls.h"
#include "log.h"
#include "event.h"
//#define EVENTDEBUG
static Event_t lastevent;
static event_handle_t ehandle;
static int gotevent;
static int clientnum;
/*
 * Translate an event type string and its argument string into *event.
 *
 * etype must be TBDB_EVENTTYPE_START or TBDB_EVENTTYPE_STOP; any other
 * value is rejected with a return of 1.
 *
 * buf is a space-separated list of KEY=<int> tokens and is split in place
 * by strsep().  Recognized keys:
 *
 *	STARTDELAY PKTTIMEOUT IDLETIMER CHUNKBUFS READAHEAD INPROGRESS
 *	REDODELAY IDLEDELAY SLICE ZEROFILL RANDOMIZE NOTHREADS DEBUG TRACE
 *		stored in the matching event->data.start field
 *	EXITSTATUS
 *		stored in event->data.stop.exitstatus
 *	MAXCLIENTS
 *		not stored; if our client number (clientnum) is >= the value
 *		the event is not for us and parsing stops
 *
 * All of *event is filled with 0xff bytes first, so every field that is
 * not mentioned reads back as -1.  Tokens that match no key are skipped.
 * Note that data.start and data.stop are members of the same union.
 *
 * Returns 0 when the type was recognized; the file-scope flag gotevent is
 * then 1 if the event applies to this client and 0 if MAXCLIENTS excluded
 * it.
 */
static int
parse_event(Event_t *event, char *etype, char *buf)
{
	struct {
		const char *fmt;
		int *dst;
	} params[] = {
		{ "STARTDELAY=%d", &event->data.start.startdelay },
		{ "PKTTIMEOUT=%d", &event->data.start.pkttimeout },
		{ "IDLETIMER=%d", &event->data.start.idletimer },
		{ "CHUNKBUFS=%d", &event->data.start.chunkbufs },
		{ "READAHEAD=%d", &event->data.start.readahead },
		{ "INPROGRESS=%d", &event->data.start.inprogress },
		{ "REDODELAY=%d", &event->data.start.redodelay },
		{ "IDLEDELAY=%d", &event->data.start.idledelay },
		{ "SLICE=%d", &event->data.start.slice },
		{ "ZEROFILL=%d", &event->data.start.zerofill },
		{ "RANDOMIZE=%d", &event->data.start.randomize },
		{ "NOTHREADS=%d", &event->data.start.nothreads },
		{ "DEBUG=%d", &event->data.start.debug },
		{ "TRACE=%d", &event->data.start.trace },
		{ "EXITSTATUS=%d", &event->data.stop.exitstatus },
	};
	const int nparams = sizeof(params) / sizeof(params[0]);
	char *tok;
	int val, i;

	memset(event, -1, sizeof *event);

	if (strcmp(etype, TBDB_EVENTTYPE_START) == 0)
		event->type = EV_START;
	else if (strcmp(etype, TBDB_EVENTTYPE_STOP) == 0)
		event->type = EV_STOP;
	else
		return 1;

	/* strsep() returns NULL straight away when buf is NULL */
	while ((tok = strsep(&buf, " ")) != NULL) {
		/* First matching key wins; order is that of the table. */
		for (i = 0; i < nparams; i++) {
			if (sscanf(tok, params[i].fmt, &val) == 1) {
				*params[i].dst = val;
				break;
			}
		}
		if (i < nparams)
			continue;

		/*
		 * Hideous Hack Alert!
		 *
		 * Our hostname is assumed to be of the form
		 * 'c-<num>.<domain>' and <num> is our client number.
		 * An event carrying MAXCLIENTS=N is only processed by
		 * clients numbered below N; everyone else ignores it.
		 */
		if (sscanf(tok, "MAXCLIENTS=%d", &val) == 1 &&
		    clientnum >= val) {
			gotevent = 0;
			return 0;
		}
	}

	gotevent = 1;
	return 0;
}
/*
 * Event system callback, registered by EventInit() via event_subscribe().
 *
 * Pulls the addressing fields and the argument string out of the
 * notification and hands the event type and arguments to parse_event(),
 * which records the result in the file-scope lastevent/gotevent.  Events
 * with an unrecognized type are logged and otherwise ignored.
 *
 * The return values of the event_notification_get_*() calls are not
 * checked, so every output buffer is set to the empty string beforehand;
 * a field that could not be fetched is then simply treated as empty.
 * The data argument is unused.
 */
static void
callback(event_handle_t handle, event_notification_t notification, void *data)
{
	char buf[7][64];
	char args[256];
	int len = 64;

	buf[0][0] = buf[1][0] = buf[2][0] = buf[3][0] = 0;
	buf[4][0] = buf[5][0] = buf[6][0] = 0;
	/* args is parsed below, it must never be left uninitialized */
	args[0] = 0;

	event_notification_get_site(handle, notification, buf[0], len);
	event_notification_get_expt(handle, notification, buf[1], len);
	event_notification_get_group(handle, notification, buf[2], len);
	event_notification_get_host(handle, notification, buf[3], len);
	event_notification_get_objtype(handle, notification, buf[4], len);
	event_notification_get_objname(handle, notification, buf[5], len);
	event_notification_get_eventtype(handle, notification, buf[6], len);
	event_notification_get_arguments(handle, notification,
					 args, sizeof(args));

#ifdef EVENTDEBUG
	{
		struct timeval now;
		static int ecount;

		gettimeofday(&now, NULL);
		/* casts: tv_sec/tv_usec are not necessarily unsigned long */
		fprintf(stderr, "Event %d: %lu.%03lu %s %s %s %s %s %s %s %s\n",
			++ecount, (unsigned long)now.tv_sec,
			(unsigned long)(now.tv_usec / 1000),
			buf[0], buf[1], buf[2],
			buf[3], buf[4], buf[5], buf[6], args);
	}
#endif

	if (parse_event(&lastevent, buf[6], args))
		log("bogus event '%s %s' ignored", buf[6], args);
}
/*
 * Connect to the event system and subscribe to FRISBEE events.
 *
 * server is the event server name (optionally with port) and is turned
 * into an "elvin://<server>" URL.  The local hostname is looked up, and
 * the client number used for MAXCLIENTS filtering is taken from a
 * hostname of the form 'c-<num>.<domain>'.
 *
 * Returns 0 on success.  Returns 1, after issuing a warning, if no server
 * was given, the hostname could not be obtained, resolved or parsed, or
 * the event system registration/subscription failed.
 */
int
EventInit(char *server)
{
	char buf[BUFSIZ], ipbuf[BUFSIZ];
	struct hostent *he;
	struct in_addr myip;
	char *ipaddr;
	address_tuple_t tuple;
	int alen;

	if (server == NULL) {
		warning("no event server specified");
		return 1;
	}

	if (gethostname(buf, sizeof(buf)) < 0) {
		pwarning("could not get hostname");
		return 1;
	}
	/* a truncated name is not guaranteed to be terminated */
	buf[sizeof(buf) - 1] = '\0';

	if ((he = gethostbyname(buf)) == NULL) {
		warning("could not get IP address from hostname");
		return 1;
	}

	/* never copy more than the destination can hold */
	alen = he->h_length;
	if (alen > (int)sizeof(myip))
		alen = (int)sizeof(myip);
	memset(&myip, 0, sizeof(myip));
	memcpy((char *)&myip, he->h_addr, alen);
	strcpy(ipbuf, inet_ntoa(myip));
	/* only referenced by the commented-out tuple->host setting below */
	ipaddr = ipbuf;

#if 1
	/*
	 * Hideous Hack Alert!
	 *
	 * Assume our hostname is of the form 'c-<num>.<domain>'
	 * and use <num> to determine our client number.
	 * We use that number later to compare to the MAXCLIENTS
	 * field to determine if we should process an event.
	 */
	if (sscanf(buf, "c-%d.", &clientnum) != 1) {
		warning("could not determine client number from hostname %s",
			buf);
		return 1;
	} else if (debug)
		log("client number %d for event handling", clientnum);
#endif

	/*
	 * Convert server/port to elvin thing.
	 * The hostname in buf is no longer needed, so buf is reused.
	 */
	snprintf(buf, sizeof(buf), "elvin://%s", server);
	server = buf;

	/*
	 * Construct an address tuple for subscribing to events for
	 * this node.
	 */
	tuple = address_tuple_alloc();
	if (tuple == NULL) {
		warning("could not allocate an address tuple");
		return 1;
	}
	tuple->host	 = ADDRESSTUPLE_ANY; /* ipaddr; */
	tuple->site      = ADDRESSTUPLE_ANY;
	tuple->group     = ADDRESSTUPLE_ANY;
	tuple->expt      = ADDRESSTUPLE_ANY;
	tuple->objtype   = TBDB_OBJECTTYPE_FRISBEE;
	tuple->objname   = ADDRESSTUPLE_ANY;
	tuple->eventtype = ADDRESSTUPLE_ANY;

	/*
	 * Register with the event system.
	 */
	ehandle = event_register(server, 0);
	if (ehandle == NULL) {
		warning("could not register with event system");
		address_tuple_free(tuple);
		return 1;
	}

	/*
	 * Subscribe to the event we specified above.
	 * NOTE(review): on failure the registration made above is left in
	 * place -- confirm whether the handle should be released here.
	 */
	if (!event_subscribe(ehandle, callback, tuple, NULL)) {
		warning("could not subscribe to FRISBEE events");
		address_tuple_free(tuple);
		return 1;
	}

	address_tuple_free(tuple);
	return 0;
}
/*
 * Poll the event system once.
 *
 * Any event delivered during the poll is parsed by the subscription
 * callback into the file-scope lastevent, which also sets gotevent.
 * If an applicable event arrived it is copied to *event.
 *
 * Returns non-zero if *event was filled in, zero otherwise.  A failing
 * event_poll() is reported through fatal().
 */
int
EventCheck(Event_t *event)
{
	int err;

	gotevent = 0;
	if ((err = event_poll(ehandle)) != 0)
		fatal("event_poll failed, err=%d\n", err);

	if (!gotevent)
		return 0;

	*event = lastevent;
	return gotevent;
}
/*
 * Keep calling EventCheck() until it delivers an event of the requested
 * type into *event.  Passing EV_ANY accepts the first event of any type.
 * There is no sleep between polls.
 */
void
EventWait(int eventtype, Event_t *event)
{
	for (;;) {
		if (EventCheck(event) == 0)
			continue;	/* nothing (applicable) arrived */
		if (eventtype == EV_ANY || event->type == eventtype)
			break;
	}
}
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2002 University of Utah and the Flux Group.
* All rights reserved.
*/
/*
* Event defs
*/
/*
 * A parsed event as handed back by EventCheck()/EventWait().
 *
 * type is EV_START or EV_STOP and tells which member of data is
 * meaningful.  The parser fills the whole structure with 0xff bytes
 * before storing anything, so a field that was not given in the event
 * has the value -1.  start and stop share storage: stop.exitstatus
 * occupies the same bytes as start.startdelay.
 */
typedef struct {
int type;
union {
struct {
int startdelay; /* range in sec of start interval */
int pkttimeout; /* packet timeout in usec */
int idletimer; /* idle timer in pkt timeouts */
int chunkbufs; /* max receive buffers */
int readahead; /* max readahead in packets */
int inprogress; /* max packets in progress */
int redodelay; /* redo delay in usec */
int idledelay; /* writer idle delay in usec */
int slice; /* disk slice to write */
int zerofill; /* non-0 to zero fill free space */
int randomize; /* non-0 to randomize request list */
int nothreads; /* non-0 to single thread unzip */
int debug; /* debug level */
int trace; /* tracing level */
} start;
struct {
int exitstatus; /* value of the EXITSTATUS argument */
} stop;
} data;
} Event_t;
/*
 * Event types.  EV_START and EV_STOP appear in Event_t.type; EV_ANY is
 * only used as the eventtype argument of EventWait() to accept any type.
 */
#define EV_ANY 0
#define EV_START 1
#define EV_STOP 2
/* Connect and subscribe to the event server; 0 on success, 1 on failure. */
extern int EventInit(char *server);
/* Poll once; non-zero if an applicable event was stored in *event. */
extern int EventCheck(Event_t *event);
/* Poll repeatedly until an event of eventtype (or any, for EV_ANY) arrives. */
extern void EventWait(int eventtype, Event_t *event);
......@@ -78,6 +78,22 @@ warning(const char *fmt, ...)
va_end(args);
}
/*
 * Report an error message (printf-style format and arguments).
 * Goes to syslog at LOG_ERR priority when usesyslog is set, otherwise
 * to stderr, which is flushed immediately.
 */
void
error(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	if (usesyslog) {
		vsyslog(LOG_ERR, fmt, ap);
	} else {
		vfprintf(stderr, fmt, ap);
		fflush(stderr);
	}
	va_end(ap);
}
void
fatal(const char *fmt, ...)
{
......
......@@ -21,6 +21,24 @@
#include <errno.h>
#include "decls.h"
#ifdef DOLOSSRATE
extern int lossrate;
#endif
#ifdef STATS
#ifdef DOLOSSRATE
unsigned long spackets, spacketslost;
unsigned long rpackets, rpacketslost;
#endif
unsigned long nonetbufs;
#define DOSTAT(x) (x)
#else
#define DOSTAT(x)
#endif
/* Time in usec to delay waiting for more buffer space on sends */
#define NOBUF_DELAY 1000
/* Max number of times to attempt bind to port before failing. */
#define MAXBINDATTEMPTS 10
......@@ -29,6 +47,7 @@
static int sock;
struct in_addr myipaddr;
static int nobufdelay = -1;
int broadcast = 0;
static void
......@@ -43,10 +62,10 @@ CommonInit(void)
if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
pfatal("Could not allocate a socket");
i = (128 * 1024);
i = SOCKBUFSIZE;
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &i, sizeof(i));
i = (128 * 1024);
i = SOCKBUFSIZE;
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &i, sizeof(i));
name.sin_family = AF_INET;
......@@ -144,6 +163,13 @@ CommonInit(void)
memcpy((char *)&myipaddr, he->h_addr, sizeof(myipaddr));
}
/*
* Compute the out of buffer space delay
*/
if (nobufdelay < 0)
nobufdelay = sleeptime(NOBUF_DELAY,
"out of socket buffer space delay");
}
int
......@@ -174,16 +200,47 @@ int
PacketReceive(Packet_t *p)
{
struct sockaddr_in from;
int mlen, alen = sizeof(from);
int mlen, alen;
bzero(&from, sizeof(from));
#ifdef DOLOSSRATE
struct timeval now, then;
if (lossrate) {
/*
* XXX cannot rely on socket timeout value since we need to
* treat received and dropped packets as though they never
* arrived. This is still not correct as a receive timeout
* could still be up to twice as long as it should be, but
* I don't want to mess with the socket timeout on every
* recv call.
*/
gettimeofday(&then, 0);
if ((then.tv_usec += PKTRCV_TIMEOUT) >= 1000000) {
then.tv_sec++;
then.tv_usec -= 1000000;
}
again:
gettimeofday(&now, 0);
if (timercmp(&now, &then, >=))
return -1;
}
#endif
alen = sizeof(from);
bzero(&from, alen);
if ((mlen = recvfrom(sock, p, sizeof(*p), 0,
(struct sockaddr *)&from, &alen)) < 0) {
if (errno == EWOULDBLOCK)
return -1;
pfatal("PacketReceive(recvfrom)");
}
#ifdef DOLOSSRATE
DOSTAT(rpackets++);
if (lossrate && random() < lossrate) {
DOSTAT(rpacketslost++);
goto again;
}