Commit 2e77122f authored by Mike Hibler's avatar Mike Hibler

Server: back to using a condvar since they seem to be fixed.

Server: make file readsize independent of burstsize (previously
readsize had to be a divisor of burstsize).  A subtle side-effect
is that the dynamic burst rate is recalcluated at the conslusion
of every burst instead of after every readsize count of blocks has
been sent (less than a burst)  This just seems to be more logical.

Client: add "-T DOS-type" option to tell frisbee, when in slice
mode, to set the type of the slice in the DOS partition table.
This is useful if you are dropping say a BSD filesystem into
an unused slice, you don't have to go back later and set this
with fdisk.  Considered making this info part of the image
itself (recorded by imagezip when creating a slice image),
but decided against it.
parent a2ccf66a
......@@ -38,8 +38,7 @@ LDFLAGS = -static
#CFLAGS += -DDOEVENTS
#CLIENTOBJS += event.o $(OBJDIR)/event/lib/event.o $(OBJDIR)/event/lib/util.o
#CLIENTLIBS += `elvin-config --libs vin4c`
#EVENTFLAGS = $(CFLAGS) `elvin-config --cflags vin4c` \
-I$(TESTBED_SRCDIR)
#EVENTFLAGS = $(CFLAGS) `elvin-config --cflags vin4c` -I$(TESTBED_SRCDIR)
frisbee: $(CLIENTOBJS) ../imagezip/frisbee.o
$(CC) $(LDFLAGS) $(CLIENTFLAGS) $(CLIENTOBJS) $(CLIENTLIBS) -o frisbee
......
......@@ -47,10 +47,7 @@ int startdelay = 0;
int debug = 0;
int tracing = 0;
char traceprefix[64];
int zero = 0;
int randomize = 1;
int slice = 0;
int nothreads = 0;
int portnum;
struct in_addr mcastaddr;
struct in_addr mcastif;
......@@ -61,11 +58,11 @@ static int dotcol;
static void PlayFrisbee(void);
static void GotBlock(Packet_t *p);
static void RequestChunk(int timedout);
static int ImageUnzip(int chunk);
static void RequestStamp(int chunk, int block, int count);
static int RequestRedoTime(int chunk, unsigned long long curtime);
extern int ImageUnzipInit(char *filename, int slice,
int debug, int zero, int nothreads);
int debug, int zero, int nothreads, int dostype);
extern int ImageUnzipChunk(char *chunkdata);
extern void ImageUnzipFlush(void);
extern int ImageUnzipQuit(void);
......@@ -138,8 +135,12 @@ main(int argc, char **argv)
{
int ch;
char *filename;
int zero = 0;
int nothreads = 0;
int dostype = -1;
int slice = 0;
while ((ch = getopt(argc, argv, "dhp:m:s:i:tbznT:r:E:")) != -1)
while ((ch = getopt(argc, argv, "dhp:m:s:i:tbznT:r:E:D:")) != -1)
switch(ch) {
case 'd':
debug++;
......@@ -191,6 +192,10 @@ main(int argc, char **argv)
zero++;
break;
case 'D':
dostype = atoi(optarg);
break;
case 'h':
case '?':
default:
......@@ -273,6 +278,10 @@ main(int argc, char **argv)
nothreads = event.data.start.nothreads;
else
nothreads = 0;
if (event.data.start.dostype >= 0)
dostype = event.data.start.dostype;
else
dostype = -1;
if (event.data.start.debug >= 0)
debug = event.data.start.debug;
else
......@@ -285,10 +294,10 @@ main(int argc, char **argv)
log("Starting: slice=%d, startdelay=%d, zero=%d, "
"randomize=%d, nothreads=%d, debug=%d, tracing=%d, "
"pkttimeout=%d, idletimer=%d, redodelay=%d, "
"pkttimeout=%d, idletimer=%d, ideldelay=%d, redodelay=%d, "
"chunkbufs=%d maxreadahead=%d, maxinprogress=%d",
slice, startdelay, zero, randomize, nothreads,
debug, tracing, pkttimeout, idletimer, redodelay,
debug, tracing, pkttimeout, idletimer, idledelay, redodelay,
maxchunkbufs, maxreadahead, maxinprogress);
}
#endif
......@@ -296,7 +305,7 @@ main(int argc, char **argv)
redodelay = sleeptime(redodelay, "request retry delay");
idledelay = sleeptime(idledelay, "writer idle delay");
ImageUnzipInit(filename, slice, debug, zero, nothreads);
ImageUnzipInit(filename, slice, debug, zero, nothreads, dostype);
if (tracing) {
ClientTraceInit(traceprefix);
......@@ -560,7 +569,8 @@ ChunkerStartup(void)
}
}
ImageUnzip(i);
if (ImageUnzipChunk(ChunkBuffer[i].blocks[0].data))
pfatal("ImageUnzipChunk failed");
/*
* Okay, free the slot up for another chunk.
......@@ -1062,25 +1072,3 @@ PlayFrisbee(void)
#endif
log("\nLeft the team after %ld seconds on the field!", estamp.tv_sec);
}
/*
* Supply a read function to the imageunzip library. Kinda hokey right
* now. The original imageunzip code depends on read to keep track of
* the seek offset within the input file. Well, in our case we know what
* chunk the inflator is working on, and we keep track of the block offset
* as well.
*
* XXX We copy out the data. Need to change this interface to avoid that.
*/
extern int inflate_subblock(char *);
static int
ImageUnzip(int slot)
{
char *data = (char *) &ChunkBuffer[slot].blocks;
if (inflate_subblock(data))
pfatal("ImageUnzip: inflate_subblock failed");
return 0;
}
......@@ -60,8 +60,7 @@
/*
* The number of disk read blocks in a single read on the server.
* Must be an even divisor of CHUNKSIZE. Should also be an even
* divisor of SERVER_BURST_SIZE below.
* Must be an even divisor of CHUNKSIZE.
*/
#define SERVER_READ_SIZE 32
......
......@@ -106,6 +106,10 @@ parse_event(Event_t *event, char *etype, char *buf)
event->data.start.nothreads = val;
continue;
}
if (sscanf(cp, "DOSTYPE=%d", &val) == 1) {
event->data.start.dostype = val;
continue;
}
if (sscanf(cp, "DEBUG=%d", &val) == 1) {
event->data.start.debug = val;
continue;
......
......@@ -23,6 +23,7 @@ typedef struct {
int zerofill; /* non-0 to zero fill free space */
int randomize; /* non-0 to randomize request list */
int nothreads; /* non-0 to single thread unzip */
int dostype; /* DOS partition type to set */
int debug; /* debug level */
int trace; /* tracing level */
} start;
......
......@@ -73,6 +73,7 @@ struct {
unsigned long badpackets;
unsigned long blockslost;
unsigned long goesidle;
unsigned long wakeups;
} Stats;
#define DOSTAT(x) (Stats.x)
#else
......@@ -100,6 +101,7 @@ typedef struct {
} WQelem_t;
static queue_head_t WorkQ;
static pthread_mutex_t WorkQLock;
static pthread_cond_t WorkQCond;
static int WorkQDelay = -1;
static int WorkQSize = 0;
#ifdef STATS
......@@ -126,6 +128,7 @@ static void
WorkQueueInit(void)
{
pthread_mutex_init(&WorkQLock, NULL);
pthread_cond_init(&WorkQCond, NULL);
queue_init(&WorkQ);
if (WorkQDelay < 0)
......@@ -184,6 +187,8 @@ WorkQueueEnqueue(int chunk, int block, int blockcount)
WorkQMax = WorkQSize;
#endif
if (WorkQSize == 1)
pthread_cond_signal(&WorkQCond);
pthread_mutex_unlock(&WorkQLock);
EVENT(1, EV_WORKENQ, mcastaddr, chunk, block, blockcount, WorkQSize);
......@@ -195,16 +200,31 @@ WorkQueueDequeue(int *chunk, int *block, int *blockcount)
{
WQelem_t *wqel;
pthread_mutex_lock(&WorkQLock);
/*
* Condvars broken in linux threads impl, so use this rather bogus
* sleep to keep from churning cycles.
* Wait for up to WorkQDelay usec for work
*/
if (queue_empty(&WorkQ)) {
pthread_mutex_unlock(&WorkQLock);
fsleep(WorkQDelay);
return 0;
pthread_mutex_lock(&WorkQLock);
if (WorkQSize == 0) {
struct timeval tv;
struct timespec ts;
gettimeofday(&tv, 0);
ts.tv_nsec = tv.tv_usec * 1000 + WorkQDelay;
if (ts.tv_nsec >= 1000000000) {
ts.tv_sec = tv.tv_sec + 1;
ts.tv_nsec -= 1000000000;
} else
ts.tv_sec = tv.tv_sec;
do {
if (pthread_cond_timedwait(&WorkQCond,
&WorkQLock, &ts) != 0) {
pthread_mutex_unlock(&WorkQLock);
return 0;
}
if (WorkQSize == 0)
DOSTAT(wakeups++);
} while (WorkQSize == 0);
}
queue_remove_first(&WorkQ, wqel, WQelem_t *, chain);
......@@ -490,22 +510,21 @@ PlayFrisbee(void)
DOSTAT(blockssent++);
EVENT(3, EV_BLOCKMSG, mcastaddr,
chunk, block+j, 0, 0);
/*
* Completed a burst. Adjust the busrtsize
* if necessary and delay as required.
*/
if (++throttle >= burstsize) {
if (dynburst)
calcburst();
if (gapsize > 0)
fsleep(gapsize);
throttle = 0;
}
}
offset += readbytes;
block += readcount;
throttle += readcount;
/*
* Dynamically adjust the burst size if necessary
*/
if (dynburst)
calcburst();
if (throttle >= burstsize) {
if (gapsize > 0)
fsleep(gapsize);
throttle = 0;
}
}
}
free(databuf);
......@@ -605,11 +624,6 @@ main(int argc, char **argv)
}
}
if (readsize > burstsize ||
((burstsize / readsize) * readsize != burstsize))
fatal("readsize (%d) not at even divisor of burstsize (%d)",
readsize, burstsize);
if (!portnum || ! mcastaddr.s_addr)
usage();
......@@ -691,6 +705,7 @@ main(int argc, char **argv)
Stats.filereads, Stats.filebytes,
Stats.filebytes - FileInfo.blocks * BLOCKSIZE);
log(" net idle/blocked: %d/%d", Stats.goesidle, nonetbufs);
log(" spurious wakeups: %d", Stats.wakeups);
log(" max workq size: %d", WorkQMax);
}
#endif
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment