Commit 8fd4b67e authored by Mike Hibler's avatar Mike Hibler

Changes:

Support for jumbo packets.  Setting WITH_JUMBO on the make command line
will change the image block size to 8192 bytes and reduces the number of
block per chunk to 256 (to maintain the 1MB chunk size for compat with old
images).  The default is still 1024.

Added the notion of a "dubious" chunk buffer in the client.  If an incoming
chunk buffer is marked as CHUNK_DUBIOUS, then its contents can be evicted and
the buffer reused for a more promising chunk.  This is a crude replacement
mechanism that is currently only used in one place: if we miss part of a
chunk and the server switches to sending a new chunk for which we have no
free buffer, we switch to collecting the new chunk.  The reasoning is that
it will take a while for the server to switch back to completing the former
chunk, during which time it may send one or more complete chunks that we
could more fruitfully use (decompress and write out).

Changed the meaning of the "done" field for a chunk.  It used to mean either
that we have completely processed the chunk or that we are currently collecting
it.  It took additional work (scanning all chunk buffers) to differentiate
these cases, so I make it explicit.

Allow the client and server to dynamically determine the maximum socket
buffer size.

Fix a couple more on-the-wire data structure size/alignment issues that
showed up on a 64-bit OS.

A few minor speedups to the bitmap handling code.  Think: "rearranging deck
chairs on the Titanic" here.  We need more serious algorithmic changes
to scale all this code going forward.

Add some more TRACE events and refine what is already there.

Added some hacks to allow frisbee client/server to run on the same machine.
We had made it remarkably hard to do this.  But then again, why would you
want to!  Look for SAME_HOST_HACK in the makefile.
parent f0885af0
......@@ -66,6 +66,24 @@ SERVEROBJS = server.o $(SHAREDOBJS)
CFLAGS = -O2 -g -Wall -fno-builtin-log $(LDSTATIC) $(PTHREADCFLAGS) -DSTATS
LDFLAGS = $(LDSTATIC)
#
# Define this to break chunks into 8192 byte rather than 1024 byte blocks
#
ifeq ($(WITH_JUMBO),1)
CFLAGS += -DJUMBO
endif
#
# Define this to run the client and server on the same physical machine
# over the loopback interface. You will also have to setup an alias on
# the loopback interface so that client and server can use different IPs:
#
# ifconfig lo0 alias 127.0.0.2 netmask 255.255.255.255
#
# and just use unicast and not multi/broadcast.
#
#CFLAGS += -DSAME_HOST_HACK
#
# Define this if your implementation of cond_vars works well
# ("works well" in this case means doesn't hang forever or burn up
......@@ -74,7 +92,7 @@ LDFLAGS = $(LDSTATIC)
#CFLAGS += -DCONDVARS_WORK
# Define this to a non-zero value to enable recording of trace data
#CFLAGS += -DNEVENTS=50000
#CFLAGS += -DNEVENTS=500000
# Turn on client event handling
#CFLAGS += -DDOEVENTS
......
......@@ -4,6 +4,8 @@
* All rights reserved.
*/
#undef OLD_SCHOOL
/*
* Frisbee client.
*
......@@ -54,6 +56,7 @@ int debug = 0;
int tracing = 0;
char traceprefix[64];
int randomize = 1;
int zero = 0;
int portnum;
struct in_addr mcastaddr;
struct in_addr mcastif;
......@@ -62,9 +65,9 @@ static struct in_addr serverip;
/* Forward Decls */
static void PlayFrisbee(void);
static void GotBlock(Packet_t *p);
static int GotBlock(Packet_t *p);
static void RequestChunk(int timedout);
static void RequestStamp(int chunk, int block, int count, void *arg);
static int RequestStamp(int chunk, int block, int count, void *arg);
static int RequestRedoTime(int chunk, unsigned long long curtime);
extern int ImageUnzipInit(char *filename, int slice, int debug, int zero,
int nothreads, int dostype, int dodots,
......@@ -79,12 +82,15 @@ extern int ImageUnzipQuit(void);
/*
* Chunk descriptor, one for each CHUNKSIZE*BLOCKSIZE bytes of an image file.
* For each chunk, record its state and the time at which it was last
* requested by someone. Ours indicates a previous request was made by us.
* requested by someone. The time stamp is "only" 61 bits. This could be a
* problem if packets arrive more than 73,000 years apart. But we'll take
* our chances...
*/
typedef struct {
unsigned long long lastreq:62;
unsigned long long ours:1;
unsigned long long done:1;
uint64_t lastreq:61;
uint64_t ours:1; /* last request was from us */
uint64_t seen:1; /* chunk is either filling or been processed */
uint64_t done:1; /* chunk has been fully processed */
} Chunk_t;
/*
......@@ -108,15 +114,18 @@ typedef struct {
#define CHUNK_EMPTY 0
#define CHUNK_FILLING 1
#define CHUNK_FULL 2
#define CHUNK_DUBIOUS 3
Chunk_t *Chunks; /* Chunk descriptors */
ChunkBuffer_t *ChunkBuffer; /* The cache */
int *ChunkRequestList; /* Randomized chunk request order */
int TotalChunkCount; /* Total number of chunks in file */
int IdleCounter; /* Countdown to request more data */
/* XXX imageunzip.c */
extern long long totaledata, totalrdata, totalddata;
extern unsigned long decompblocks, writeridles;
#ifdef STATS
extern unsigned long decompblocks, writeridles; /* XXX imageunzip.c */
ClientStats_t Stats;
#define DOSTAT(x) (Stats.u.v1.x)
#else
......@@ -157,7 +166,16 @@ void (*DiskIdleCallback)();
static void
WriterIdleCallback(int isidle)
{
CLEVENT(1, EV_CLIWRSTATUS, isidle, 0, 0, 0);
uint32_t hi, lo;
if (zero) {
hi = (totaledata >> 32);
lo = totaledata;
} else {
hi = (totalrdata >> 32);
lo = totalrdata;
}
CLEVENT(1, EV_CLIWRSTATUS, isidle, hi, lo, 0);
}
int
......@@ -165,7 +183,6 @@ main(int argc, char **argv)
{
int ch, mem;
char *filename;
int zero = 0;
int dostype = -1;
int slice = 0;
......@@ -237,8 +254,8 @@ main(int argc, char **argv)
mem = atoi(optarg);
if (mem < 1)
mem = 1;
else if (mem > 1024)
mem = 1024;
else if (mem > 32768)
mem = 32768;
maxchunkbufs = (mem * 1024 * 1024) /
sizeof(ChunkBuffer_t);
break;
......@@ -247,8 +264,8 @@ main(int argc, char **argv)
mem = atoi(optarg);
if (mem < 1)
mem = 1;
else if (mem > 1024)
mem = 1024;
else if (mem > 32768)
mem = 32768;
maxwritebufmem = mem;
break;
......@@ -256,8 +273,8 @@ main(int argc, char **argv)
mem = atoi(optarg);
if (mem < 2)
mem = 2;
else if (mem > 2048)
mem = 2048;
else if (mem > 65536)
mem = 65536;
maxmem = mem;
break;
......@@ -422,6 +439,11 @@ main(int argc, char **argv)
maxwritebufmem = maxmem/2;
}
/*
* Prepare the unzipper.
* This call fires off the disk writer thread as required.
* The writer thread synchronizes only with us (the decompresser).
*/
ImageUnzipInit(filename, slice, debug, zero, nothreads, dostype, 3,
maxwritebufmem*1024*1024);
......@@ -436,7 +458,7 @@ main(int argc, char **argv)
if (tracing) {
TraceStop();
TraceDump();
TraceDump(0);
}
ImageUnzipQuit();
......@@ -462,13 +484,17 @@ main(int argc, char **argv)
}
/*
* The client receive thread. This thread takes in packets from the server.
* The network receive (and send) thread. This thread takes in packets from the
* server. It is responsible for driving the protocol, making block requests
* as needed.
*
* XXX record time for entire download so that we can figure average download.
*/
void *
ClientRecvThread(void *arg)
{
Packet_t packet, *p = &packet;
int BackOff;
int IdleCounter, BackOff;
static int gotone;
if (debug)
......@@ -479,9 +505,10 @@ ClientRecvThread(void *arg)
* The IdleCounter is how many ticks we let pass without a
* useful block, before we make another request. We want that to
* be short, but not too short; we do not want to pummel the
* server.
* server. We initialize this to one so that we will issue an
* immediate first request to get the ball rolling.
*/
IdleCounter = idletimer;
IdleCounter = 1;
/*
* This is another throttling mechanism; avoid making repeated
......@@ -559,41 +586,70 @@ ClientRecvThread(void *arg)
continue;
}
CLEVENT(BackOff ? 1 : 3, EV_CLIGOTPKT,
pstamp.tv_sec, pstamp.tv_usec, 0, 0);
CLEVENT(BackOff ? 1 : (p->msg.block.block==0 ? 3 : 4),
EV_CLIGOTPKT, pstamp.tv_sec, pstamp.tv_usec,
0, 0);
#ifdef NEVENTS
needstamp = 1;
#endif
BackOff = 0;
GotBlock(p);
if (GotBlock(p)) {
/*
* Anytime we receive a packet thats needed,
* reset the idle counter. This will prevent
* us from sending too many requests.
*/
IdleCounter = idletimer;
}
/*
* We may have missed the request for this chunk/block
* so treat the arrival of a block as an indication
* that someone requested it.
*/
RequestStamp(p->msg.block.chunk, p->msg.block.block,
1, 0);
(void) RequestStamp(p->msg.block.chunk,
p->msg.block.block, 1, 0);
break;
case PKTSUBTYPE_REQUEST:
CLEVENT(4, EV_CLIREQMSG,
CLEVENT(3, EV_CLIREQMSG,
p->hdr.srcip, p->msg.request.chunk,
p->msg.request.block, p->msg.request.count);
RequestStamp(p->msg.request.chunk, p->msg.request.block,
p->msg.request.count, 0);
if (RequestStamp(p->msg.request.chunk,
p->msg.request.block,
p->msg.request.count, 0))
#ifndef OLD_SCHOOL
/*
* XXX experimental: Also reset timer if
* someone else requests a block we need.
* This is indicated by the Chunk stamp
* getting updated.
*/
IdleCounter = idletimer;
#else
;
#endif
break;
case PKTSUBTYPE_PREQUEST:
CLEVENT(4, EV_CLIPREQMSG,
CLEVENT(3, EV_CLIPREQMSG,
p->hdr.srcip, p->msg.request.chunk, 0, 0);
BlockMapApply(&p->msg.prequest.blockmap,
p->msg.prequest.chunk, RequestStamp, 0);
/*
* XXX could/should update the idlecounter but
* BlockMapApply doesn't return what we need
* to easily determine this.
*/
(void) BlockMapApply(&p->msg.prequest.blockmap,
p->msg.prequest.chunk,
RequestStamp, 0);
break;
case PKTSUBTYPE_JOIN:
case PKTSUBTYPE_LEAVE:
/* Ignore these. They are from other clients. */
CLEVENT(4, EV_OCLIMSG,
CLEVENT(3, EV_OCLIMSG,
p->hdr.srcip, p->hdr.subtype, 0, 0);
break;
}
......@@ -634,6 +690,8 @@ myexit(void)
/*
* The heart of the game.
* Fire off the network thread and wait for chunks to start appears.
* Synchronizes with the network thread via the chunk cache.
*/
static void
ChunkerStartup(void)
......@@ -773,14 +831,16 @@ ChunkerStartup(void)
pfatal("ImageUnzipChunk failed");
}
CLEVENT(1, EV_CLIDCDONE,
ChunkBuffer[i].thischunk, chunkcount,
decompblocks, writeridles);
CLEVENT(2, EV_CLIDCSTAT, (totalddata >> 32), totalddata, 0, 0);
/*
* Okay, free the slot up for another chunk.
*/
ChunkBuffer[i].state = CHUNK_EMPTY;
chunkcount--;
CLEVENT(1, EV_CLIDCDONE,
ChunkBuffer[i].thischunk, chunkcount,
decompblocks, writeridles);
}
/*
* Kill the child and wait for it before returning. We do not
......@@ -797,8 +857,6 @@ ChunkerStartup(void)
ImageUnzipFlush();
#ifdef STATS
{
extern long long totaledata, totalrdata;
Stats.u.v1.decompblocks = decompblocks;
Stats.u.v1.writeridles = writeridles;
Stats.u.v1.ebyteswritten = totaledata;
......@@ -820,8 +878,10 @@ ChunkerStartup(void)
* a re-request. The general strategy is: if a chunk request contains
* any blocks that we will be able to use, we update the stamp to delay
* what would otherwise be a redundant request.
*
* Returns one if the chunk was stamped.
*/
static void
static int
RequestStamp(int chunk, int block, int count, void *arg)
{
int stampme = 0;
......@@ -830,22 +890,28 @@ RequestStamp(int chunk, int block, int count, void *arg)
* If not doing delays, don't bother with the stamp
*/
if (redodelay == 0)
return;
return 0;
/*
* Chunk has been fully processed, no need to stamp.
*/
if (Chunks[chunk].done)
return 0;
/*
* Common case of a complete chunk request, always stamp.
* This will include chunks we have already written and wouldn't
* be re-requesting, but updating the stamp doesn't hurt anything.
* Either we have not seen this chunk or we are currently processing it.
*
* Common case of a complete chunk request, always stamp as there will
* be some data in it we need.
*/
if (block == 0 && count == CHUNKSIZE)
stampme = 1;
/*
* Else, request is for a partial chunk. If we are not currently
* processing this chunk, then the chunk data will be of use to
* us so we update the stamp. Again, this includes chunks we
* are already finished with, but no harm.
* us so we update the stamp.
*/
else if (! Chunks[chunk].done)
else if (! Chunks[chunk].seen)
stampme = 1;
/*
* Otherwise, this is a partial chunk request for which we have
......@@ -863,12 +929,22 @@ RequestStamp(int chunk, int block, int count, void *arg)
for (i = 0; i < maxchunkbufs; i++)
if (ChunkBuffer[i].thischunk == chunk &&
ChunkBuffer[i].state == CHUNK_FILLING)
(ChunkBuffer[i].state == CHUNK_FILLING ||
ChunkBuffer[i].state == CHUNK_DUBIOUS))
break;
if (i < maxchunkbufs &&
BlockMapIsAlloc(&ChunkBuffer[i].blockmap, block, count)
!= count)
stampme = 1;
!= count) {
stampme = 1;
/*
* Any block that was formerly of dubious value now
* has real value since someone has requested more.
*/
if (ChunkBuffer[i].state == CHUNK_DUBIOUS) {
CLEVENT(1, EV_CLIDUBPROMO, chunk, block, 0, 0);
ChunkBuffer[i].state = CHUNK_FILLING;
}
}
}
if (stampme) {
......@@ -879,6 +955,8 @@ RequestStamp(int chunk, int block, int count, void *arg)
(unsigned long long)tv.tv_sec * 1000000 + tv.tv_usec;
CLEVENT(5, EV_CLISTAMP, chunk, tv.tv_sec, tv.tv_usec, 0);
}
return stampme;
}
/*
......@@ -903,67 +981,145 @@ RequestRedoTime(int chunk, unsigned long long curtime)
* If the chunk buffer is full, then drop the block. If this happens, it
* indicates the chunk buffer is not big enough, and should be increased.
*/
static void
static int
GotBlock(Packet_t *p)
{
int chunk = p->msg.block.chunk;
int block = p->msg.block.block;
int i, free = -1;
int i, state, free = -1, dubious = -1;
static int lastnoroomchunk = -1, lastnoroomblocks, inprogress;
int nfull = 0, nfill = 0;
#ifndef OLD_SCHOOL
/*
* If we have already processed this chunk, bail now.
*/
if (Chunks[chunk].done) {
assert(Chunks[chunk].seen);
CLEVENT(3, EV_CLIDUPCHUNK, chunk, block, 0, 0);
DOSTAT(dupchunk++);
if (debug > 2)
log("Duplicate chunk %d data ignored!", chunk);
return 0;
}
#endif
/*
* Search the chunk buffer for a match (or a free one).
* Otherwise, search the chunk buffer for a match (or a free one).
*/
for (i = 0; i < maxchunkbufs; i++) {
if (ChunkBuffer[i].state == CHUNK_EMPTY) {
switch (ChunkBuffer[i].state) {
case CHUNK_FULL:
nfull++;
continue;
case CHUNK_EMPTY:
if (free == -1)
free = i;
continue;
case CHUNK_FILLING:
nfill++;
if (ChunkBuffer[i].thischunk == chunk)
break;
continue;
case CHUNK_DUBIOUS:
nfill++;
if (ChunkBuffer[i].thischunk == chunk)
break;
if (dubious == -1)
dubious = i;
continue;
default:
fatal("Unknown state %d for chunk %d",
ChunkBuffer[i].state, chunk);
}
if (ChunkBuffer[i].state == CHUNK_FILLING &&
ChunkBuffer[i].thischunk == chunk)
break;
break;
}
if (i == maxchunkbufs) {
#ifndef OLD_SCHOOL
assert(Chunks[chunk].seen == 0);
#endif
/*
* Did not find it. Allocate the free one, or drop the
* packet if there is no free chunk.
*/
if (free == -1) {
if (chunk != lastnoroomchunk) {
CLEVENT(1, EV_CLINOROOM, chunk, block,
lastnoroomblocks, 0);
lastnoroomchunk = chunk;
lastnoroomblocks = 0;
if (debug)
log("No free buffer for chunk %d!",
chunk);
/*
* No free blocks, but we do have a "dubious" block.
* If it looks like the start of a new chunk, toss
* the dubious block and reuse its buffer.
*/
if (dubious != -1 && block == 0) {
int dchunk = ChunkBuffer[dubious].thischunk;
assert(Chunks[dchunk].done == 0);
assert(Chunks[dchunk].seen == 1);
CLEVENT(1, EV_CLIREUSE, chunk, block,
dchunk, 0);
Chunks[dchunk].seen = 0;
Chunks[dchunk].lastreq = 0;
lastnoroomchunk = -1;
ChunkBuffer[dubious].state = CHUNK_EMPTY;
free = dubious;
inprogress--;
} else {
if (chunk != lastnoroomchunk) {
CLEVENT(1, EV_CLINOROOM, chunk, block,
nfull, nfill);
lastnoroomchunk = chunk;
lastnoroomblocks = 0;
if (debug)
log("No free buffer for chunk %d!",
chunk);
}
lastnoroomblocks++;
DOSTAT(nofreechunks++);
return 0;
}
lastnoroomblocks++;
DOSTAT(nofreechunks++);
return;
}
state = CHUNK_FILLING;
if (chunk == lastnoroomchunk
#ifdef OLD_SCHOOL
&& !Chunks[chunk].seen
#endif
) {
#ifndef OLD_SCHOOL
/*
* Here we have missed part of a chunk because of
* a lack of buffers. The part we missed is not
* likely to be resent for awhile. We go ahead and
* start collecting the chunk, but mark it as dubious
* in case a better offer comes along (i.e., the start
* of a new chunk we have not seen).
*/
state = CHUNK_DUBIOUS;
#endif
CLEVENT(1, EV_CLIFOUNDROOM, chunk, block,
lastnoroomblocks, 0);
}
lastnoroomchunk = -1;
lastnoroomblocks = 0;
#ifdef OLD_SCHOOL
/*
* Was this chunk already processed?
* If we have already processed this chunk, bail now.
*/
if (Chunks[chunk].done) {
assert(Chunks[chunk].seen);
CLEVENT(3, EV_CLIDUPCHUNK, chunk, block, 0, 0);
DOSTAT(dupchunk++);
if (debug > 2)
log("Duplicate chunk %d ignored!", chunk);
return;
log("Duplicate chunk %d data ignored!", chunk);
return 0;
}
Chunks[chunk].done = 1;
#endif
Chunks[chunk].seen = 1;
if (debug)
log("Starting chunk %d (buffer %d)", chunk, free);
i = free;
ChunkBuffer[i].state = CHUNK_FILLING;
ChunkBuffer[i].state = state;
ChunkBuffer[i].thischunk = chunk;
ChunkBuffer[i].blockcount = CHUNKSIZE;
bzero(&ChunkBuffer[i].blockmap,
......@@ -971,19 +1127,20 @@ GotBlock(Packet_t *p)
inprogress++;
CLEVENT(1, EV_CLISCHUNK, chunk, block, inprogress, 0);
}
assert(Chunks[chunk].seen);
/*
* Insert the block and update the metainfo. We have to watch for
* duplicate blocks in the same chunk since another client may
* issue a request for a lost block, and we will see that even if
* we do not need it (cause of broadcast/multicast).
* we do not need it (in the case of broadcast/multicast).
*/
if (BlockMapAlloc(&ChunkBuffer[i].blockmap, block)) {
CLEVENT(3, EV_CLIDUPBLOCK, chunk, block, 0, 0);
DOSTAT(dupblock++);
if (debug > 2)
log("Duplicate block %d in chunk %d", block, chunk);
return;
return 0;
}
ChunkBuffer[i].blockcount--;
memcpy(ChunkBuffer[i].blocks[block].data, p->msg.block.buf, BLOCKSIZE);
......@@ -997,33 +1154,40 @@ GotBlock(Packet_t *p)
if (lastchunk != -1 && chunk != lastchunk &&
lastchunk == ChunkBuffer[lastchunkbuf].thischunk &&
ChunkBuffer[lastchunkbuf].state == CHUNK_FILLING)
(ChunkBuffer[lastchunkbuf].state == CHUNK_FILLING ||
ChunkBuffer[lastchunkbuf].state == CHUNK_DUBIOUS))
CLEVENT(1, EV_CLILCHUNK, lastchunk, lastblock,
ChunkBuffer[lastchunkbuf].blockcount, 0);
lastchunkbuf = i;
lastchunk = chunk;
lastblock = block;
CLEVENT(3, EV_CLIBLOCK, chunk, block,
CLEVENT(4, EV_CLIBLOCK, chunk, block,
ChunkBuffer[i].blockcount, 0);
}
#endif
/*
* Anytime we receive a packet thats needed, reset the idle counter.
* This will prevent us from sending too many requests.
*/
IdleCounter = idletimer;
/*
* Is the chunk complete? If so, then release it to the main thread.
*/