Commit 9e55b0b1 authored by Mike Hibler's avatar Mike Hibler

Frisbee general:

1. Implement PREQUEST message which passes a bit map of desired blocks.
   We still use the REQUEST message (start block + number of blocks) for
   full chunk requests as that is more efficient.  This message also
   includes a flag indicating whether it is a retry of a request we
   originally made or not.  This gives the server more accurate loss info.

2. More stats and tracing goo.


Frisbee client:

1. Add 'C' and 'W' command line options to specify amount of memory
   for chunk buffers (network buffering) and for write buffers (disk
   buffering).  The Emulab frisbee startup script uses these to partition
   up all the available memory on a machine.  Previously we were just
   using a fixed ~128MB even though our machines have 256 or 512MB of
   memory.  Also add the 'M' option which specifies the overall memory,
   internally dividing it up between chunk buffers and write buffers.

2. Add 'S' command line option to explicitly specify the server.  This
   allows us to make a feeb...um, "lightweight" authentication check
   on incoming messages.

3. Use the common BlockMap data struct to track which pieces of a chunk
   we have received.  This is easily inverted to make PREQUESTS and it is
   also smaller than the older byte-per-block technique.

4. Allow partial request-ahead.  Previously, we only issued request-ahead
   if there were enough empty chunk buffers for a maximum (2) request-ahead.

Frisbee server:

1. Use BlockMap for workQ elements.  An easy way to allow a complete merge
   of incoming requests with existing ones.

2. Check for overlap of incoming requests with the request currently
   being serviced.  This happens surprisingly often.

3. Dubious: burst gap becomes burst interval.  The latter takes into
   account the time required to read data, etc., in other words, we now
   have variable-sized gaps and put out bursts at specific times rather
   than having fixed gaps and putting out bursts at variable times.
   This gives us more accurate pacing over shorter time periods.  I
   thought this might be important for dynamic pacing.

4. Add 'W' command line option to specify a target bandwidth.  Frisbeed
   will use this to calculate a burst size/interval.

5. Rewrote the dynamic pacing code.  It is now easily as bad as before
   if not worse.  But it does have fewer magic constants!  Needs to be
   redone by someone who understands the TCP-friendly rate equation.

Imagezip:

1. add 'R' option to specify one or more partitions for which to force
   raw (naive) compression even if the FS format is understood.  Useful
   for benchmarking.

2. add 'D' option to allow "dangerous" writes.  In this mode, we don't
   do the fsync's or retries of failed writes.  Overrides the hack we put
   in for NFS.  Use this if writing to a local filesystem (or /dev/null).

3. Eliminate an extra copy of every chunk header.

Imageunzip:

1. Eliminate extra copy of decompressed data that we were doing between
   the single decompression buffer and the disk buffers.  Helps on slow
   machines (like gatech's 300Mhz machines with 66MHz memory bus).

2. Allow dynamic number of variable-sized write buffers.  Total memory
   not to exceed the writebufmem limit.  Previously we had a small number
   of fixed-size (256K) buffers.

3. Add debugging 'C' option to just compute a single CRC of the decompressed
   image.  Back-ported to older imageunzip and used to make sure my write
   buffer changes were correct.  Maybe handy for similar massive changes
   in the future.
parent 42709260
......@@ -33,8 +33,14 @@ SERVEROBJS = server.o $(SHAREDOBJS)
CFLAGS = -O2 -g -Wall -static $(PTHREADCFLAGS) -DSTATS
LDFLAGS = -static
# Hacky loss rate flag
#CFLAGS += -DDOLOSSRATE
# Define this if you implementation of cond_vars works well
#CFLAGS += -DCONDVARS_WORK
# Define this to a non-zero value to enable recording of trace data
#CFLAGS += -DNEVENTS=20000
#CFLAGS += -DNEVENTS=25000
# Turn on client event handling
#CFLAGS += -DDOEVENTS
......@@ -61,9 +67,10 @@ $(FRISBEEDIR)/imageunzip.c: $(FRISBEEDIR)/imagehdr.h $(FRISBEEDIR)/queue.h
frisbee.o: $(FRISBEEDIR)/imageunzip.c
$(CC) -c $(CFLAGS) -DFRISBEE -I$(FRISBEEDIR) -o frisbee.o $<
client.o: decls.h log.h trace.h
server.o: decls.h log.h trace.h
client.o: decls.h log.h utils.h trace.h
server.o: decls.h log.h utils.h trace.h
log.o: decls.h log.h
network.o: decls.h utils.h
trace.o: decls.h trace.h log.h
install: $(INSTALL_SBINDIR)/frisbeed
......
......@@ -33,3 +33,65 @@
when the rate is too high. Good news: it is symmetric with what the server
currently does. Bad news: harder to map this rate to an adjustment than
it is with the queue-size-estimate method.
2. Auto-adjust readahead on the client.
Similar to #1 the client should track the level of activity on the
server and increase its readahead accordingly. For example, if we are
the only client, we could increase our readahead.
3. Eliminate client-side copy of compressed data.
Right now we read packets into a local packet buffer and then, for
BLOCK messages, copy the data out to the chunk buffers. This results
in a complete copy of the compressed data. If we make a chunk buffer
into an array of pointers to data buffers, we can read packets into
these data buffers and link them straight into the chunk buffers.
The downside is that we must modify the already gruesome decompression
loop to deal with input buffer boundaries in addition to region
and writer buffer boundaries.
4. Multi-thread the frisbee server.
We can make our network output intervals more consistant if we
separate the disk reader from the network writer. This would have a
performance benefit for the imageunzip program which currently
combines the reader and decompresser having only a separate writer
thread.
5. Investigate large block/chunk sizes.
Most importantly would be to increase block size from 1024 to something
approaching the 1448 max (given current packet format). Constraint:
number of blocks in a chunk should be a multiple of 8 since we use a
bitmap to track blocks. This is not strictly necessary, it would just
be nice and the BlockMap routines might require a little tweaking ow.
Maybe should be a multiple of 32 to ensure bitmap is a multiple of 4
in size. Large chunk issues: 1) more potential wasted space per chunk,
though mostly only in the last chunk, 2) It takes longer to accumulate
chunks at the client, potentially idling the decompesser and writer,
3) takes more space to accumulate chunks, allowing for fewer in progress
chunks. So maybe 1448B/blk * 768 blks/chunk == 1.06MB/chunk. PREQUEST
BlockMaps come down from 128 bytes to 96.
6. Dynamic rate pacing in the server.
Our attempts to date have been pretty feeble. I think we have a
reasonable loss metric now, just need a smooth weighted decay formula
we can use. Look at the proposed standard TCP-friendly rate equation.
PROBLEMS:
1. Have seen the clients run out of socket buffer space causing them
to lose packets when still well short of the network bandwidth (at
~70Mb/sec). Not sure why. One thing we know is that the decompress
thread will almost certainly run for a full scheduling interval (1ms)
everytime. Thus we have to have enough buffering in the card and socket
buffers to handle 1ms of data. With the default params, we are only
putting out 8 packets every 1ms, so that shouldn't be an issue.
Assuming that we are getting it off the card in time, that means the
network thread is either not running frequently enough, or it is spending
too much time doing other things (like copying packet data, see #3 above).
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2002 University of Utah and the Flux Group.
* Copyright (c) 2000-2003 University of Utah and the Flux Group.
* All rights reserved.
*/
......@@ -23,7 +23,9 @@
#include <signal.h>
#include <stdarg.h>
#include <pthread.h>
#include <assert.h>
#include "decls.h"
#include "utils.h"
#include "trace.h"
#ifdef DOEVENTS
......@@ -36,13 +38,18 @@ static int exitstatus;
/* Tunable constants */
int maxchunkbufs = MAXCHUNKBUFS;
int maxwritebufmem = MAXWRITEBUFMEM;
int maxmem = 0;
int pkttimeout = PKTRCV_TIMEOUT;
int idletimer = CLIENT_IDLETIMER_COUNT;
int maxreadahead = MAXREADAHEAD;
int maxinprogress = MAXINPROGRESS;
int redodelay = CLIENT_REQUEST_REDO_DELAY;
int idledelay = CLIENT_WRITER_IDLE_DELAY;
int startdelay = 0;
int startdelay = 0, startat = 0;
#ifdef DOLOSSRATE
int lossrate = 0;
#endif
int debug = 0;
int tracing = 0;
......@@ -51,17 +58,20 @@ int randomize = 1;
int portnum;
struct in_addr mcastaddr;
struct in_addr mcastif;
static struct timeval stamp;
static int dotcol;
static struct timeval stamp;
static struct in_addr serverip;
/* Forward Decls */
static void PlayFrisbee(void);
static void GotBlock(Packet_t *p);
static void RequestChunk(int timedout);
static void RequestStamp(int chunk, int block, int count);
static void RequestStamp(int chunk, int block, int count, void *arg);
static int RequestRedoTime(int chunk, unsigned long long curtime);
extern int ImageUnzipInit(char *filename, int slice,
int debug, int zero, int nothreads, int dostype);
extern int ImageUnzipInit(char *filename, int slice, int debug, int zero,
int nothreads, int dostype,
unsigned long writebufmem);
extern void ImageUnzipSetMemory(unsigned long writebufmem);
extern int ImageUnzipChunk(char *chunkdata);
extern void ImageUnzipFlush(void);
extern int ImageUnzipQuit(void);
......@@ -69,11 +79,12 @@ extern int ImageUnzipQuit(void);
/*
* Chunk descriptor, one for each CHUNKSIZE*BLOCKSIZE bytes of an image file.
* For each chunk, record its state and the time at which it was last
* requested by someone.
* requested by someone. Ours indicates a previous request was made by us.
*/
typedef struct {
unsigned long long lastreq:62;
unsigned long long ours:1;
unsigned long long done:1;
unsigned long long lastreq:63;
} Chunk_t;
/*
......@@ -86,11 +97,11 @@ typedef struct {
* is complete and ready to write to disk.
*/
typedef struct {
int thischunk; /* Which chunk in progress */
int inprogress; /* Chunk in progress? Free/Allocated */
int ready; /* Ready to write to disk? */
int blockcount; /* Number of blocks not received yet */
char bitmap[CHUNKSIZE]; /* Which blocks have been received */
int thischunk; /* Which chunk in progress */
int inprogress; /* Chunk in progress? Free/Allocated */
int ready; /* Ready to write to disk? */
int blockcount; /* Number of blocks not received yet */
BlockMap_t blockmap; /* Which blocks have been received */
struct {
char data[BLOCKSIZE];
} blocks[CHUNKSIZE]; /* Actual block data */
......@@ -103,6 +114,7 @@ int TotalChunkCount; /* Total number of chunks in file */
int IdleCounter; /* Countdown to request more data */
#ifdef STATS
extern unsigned long decompblocks, writeridles; /* XXX imageunzip.c */
ClientStats_t Stats;
#define DOSTAT(x) (Stats.u.v1.x)
#else
......@@ -133,14 +145,14 @@ usage()
int
main(int argc, char **argv)
{
int ch;
int ch, mem;
char *filename;
int zero = 0;
int nothreads = 0;
int dostype = -1;
int slice = 0;
while ((ch = getopt(argc, argv, "dhp:m:s:i:tbznT:r:E:D:")) != -1)
while ((ch = getopt(argc, argv, "dhp:m:s:i:tbznT:r:E:D:C:W:S:M:")) != -1)
switch(ch) {
case 'd':
debug++;
......@@ -180,6 +192,14 @@ main(int argc, char **argv)
slice = atoi(optarg);
break;
case 'S':
if (!inet_aton(optarg, &serverip)) {
fprintf(stderr, "Invalid server IP `%s'\n",
optarg);
exit(1);
}
break;
case 't':
tracing++;
break;
......@@ -196,6 +216,34 @@ main(int argc, char **argv)
dostype = atoi(optarg);
break;
case 'C':
mem = atoi(optarg);
if (mem < 1)
mem = 1;
else if (mem > 1024)
mem = 1024;
maxchunkbufs = (mem * 1024 * 1024) /
sizeof(ChunkBuffer_t);
break;
case 'W':
mem = atoi(optarg);
if (mem < 1)
mem = 1;
else if (mem > 1024)
mem = 1024;
maxwritebufmem = mem;
break;
case 'M':
mem = atoi(optarg);
if (mem < 2)
mem = 2;
else if (mem > 2048)
mem = 2048;
maxmem = mem;
break;
case 'h':
case '?':
default:
......@@ -230,6 +278,10 @@ main(int argc, char **argv)
startdelay = event.data.start.startdelay;
else
startdelay = 0;
if (event.data.start.startat > 0)
startat = event.data.start.startat;
else
startat = 0;
if (event.data.start.pkttimeout >= 0)
pkttimeout = event.data.start.pkttimeout;
else
......@@ -243,6 +295,16 @@ main(int argc, char **argv)
maxchunkbufs = event.data.start.chunkbufs;
else
maxchunkbufs = MAXCHUNKBUFS;
if (event.data.start.writebufmem >= 0 &&
event.data.start.writebufmem < 4096)
maxwritebufmem = event.data.start.writebufmem;
else
maxwritebufmem = MAXWRITEBUFMEM;
if (event.data.start.maxmem >= 0 &&
event.data.start.maxmem < 4096)
maxmem = event.data.start.maxmem;
else
maxmem = 0;
if (event.data.start.readahead >= 0 &&
event.data.start.readahead <= maxchunkbufs)
maxreadahead = event.data.start.readahead;
......@@ -286,26 +348,54 @@ main(int argc, char **argv)
debug = event.data.start.debug;
else
debug = 0;
if (event.data.start.trace >= 0) {
if (event.data.start.trace >= 0)
tracing = event.data.start.trace;
traceprefix[0] = 0;
} else
else
tracing = 0;
if (event.data.start.traceprefix[0] > 0)
strncpy(traceprefix, event.data.start.traceprefix, 64);
else
traceprefix[0] = 0;
#ifdef DOLOSSRATE
if (event.data.start.plr >= 0.0 && event.data.start.plr <= 1.0)
lossrate = (int)(event.data.start.plr * 0x7fffffff);
else
lossrate = 0;
#endif
log("Starting: slice=%d, startdelay=%d, zero=%d, "
log("Starting: slice=%d, startat=%d, startdelay=%d, zero=%d, "
"randomize=%d, nothreads=%d, debug=%d, tracing=%d, "
"pkttimeout=%d, idletimer=%d, ideldelay=%d, redodelay=%d, "
"chunkbufs=%d maxreadahead=%d, maxinprogress=%d",
slice, startdelay, zero, randomize, nothreads,
"pkttimeout=%d, idletimer=%d, idledelay=%d, redodelay=%d, "
#ifdef DOLOSSRATE
"plr=%.2f, "
#endif
"maxmem=%d, chunkbufs=%d, maxwritebumfem=%d, "
"maxreadahead=%d, maxinprogress=%d",
slice, startat, startdelay, zero, randomize, nothreads,
debug, tracing, pkttimeout, idletimer, idledelay, redodelay,
maxchunkbufs, maxreadahead, maxinprogress);
#ifdef DOLOSSRATE
lossrate ? event.data.start.plr : 0,
#endif
maxmem, maxchunkbufs, maxwritebufmem,
maxreadahead, maxinprogress);
}
#endif
redodelay = sleeptime(redodelay, "request retry delay");
idledelay = sleeptime(idledelay, "writer idle delay");
redodelay = sleeptime(redodelay, "request retry delay", 0);
idledelay = sleeptime(idledelay, "writer idle delay", 0);
ImageUnzipInit(filename, slice, debug, zero, nothreads, dostype);
/*
* Set initial memory limits. These may be adjusted when we
* find out how big the image is.
*/
if (maxmem != 0) {
/* XXX divide it up 50/50 */
maxchunkbufs = (maxmem/2 * 1024*1024) / sizeof(ChunkBuffer_t);
maxwritebufmem = maxmem/2;
}
ImageUnzipInit(filename, slice, debug, zero, nothreads, dostype,
maxwritebufmem*1024*1024);
if (tracing) {
ClientTraceInit(traceprefix);
......@@ -349,6 +439,7 @@ ClientRecvThread(void *arg)
{
Packet_t packet, *p = &packet;
int BackOff;
static int gotone;
if (debug)
log("Receive pthread starting up ...");
......@@ -394,10 +485,11 @@ ClientRecvThread(void *arg)
* see that block for longer than our timeout period,
* leading us to issue another request, etc.
*/
if (PacketReceive(p) < 0) {
if (PacketReceive(p) != 0) {
pthread_testcancel();
if (--IdleCounter <= 0) {
DOSTAT(recvidles++);
if (gotone)
DOSTAT(recvidles++);
CLEVENT(2, EV_CLIRTIMO,
pstamp.tv_sec, pstamp.tv_usec, 0, 0);
#ifdef NEVENTS
......@@ -415,6 +507,7 @@ ClientRecvThread(void *arg)
continue;
}
pthread_testcancel();
gotone = 1;
if (! PacketValid(p, TotalChunkCount)) {
log("received bad packet %d/%d, ignored",
......@@ -424,22 +517,47 @@ ClientRecvThread(void *arg)
switch (p->hdr.subtype) {
case PKTSUBTYPE_BLOCK:
CLEVENT(2, EV_CLIGOTPKT,
/*
* Ensure blocks comes from where we expect.
* The validity of hdr.srcip has already been checked.
*/
if (serverip.s_addr != 0 &&
serverip.s_addr != p->hdr.srcip) {
struct in_addr tmp = { p->hdr.srcip };
log("received BLOCK from non-server %s",
inet_ntoa(tmp));
continue;
}
CLEVENT(3, EV_CLIGOTPKT,
pstamp.tv_sec, pstamp.tv_usec, 0, 0);
#ifdef NEVENTS
needstamp = 1;
#endif
BackOff = 0;
GotBlock(p);
/*
* We may have missed the request for this chunk/block
* so treat the arrival of a block as an indication
* that someone requested it.
*/
RequestStamp(p->msg.block.chunk, p->msg.block.block,
1, 0);
break;
case PKTSUBTYPE_REQUEST:
CLEVENT(4, EV_CLIREQMSG,
p->hdr.srcip, p->msg.request.chunk,
p->msg.request.block, p->msg.request.count);
RequestStamp(p->msg.request.chunk,
p->msg.request.block,
p->msg.request.count);
RequestStamp(p->msg.request.chunk, p->msg.request.block,
p->msg.request.count, 0);
break;
case PKTSUBTYPE_PREQUEST:
CLEVENT(4, EV_CLIPREQMSG,
p->hdr.srcip, p->msg.request.chunk, 0, 0);
BlockMapApply(&p->msg.prequest.blockmap,
p->msg.prequest.chunk, RequestStamp, 0);
break;
case PKTSUBTYPE_JOIN:
......@@ -458,25 +576,25 @@ ClientRecvThread(void *arg)
static void
ChunkerStartup(void)
{
pthread_t child_pid;
void *ignored;
int chunkcount = TotalChunkCount;
int i;
pthread_t child_pid;
void *ignored;
int chunkcount = TotalChunkCount;
int i, wasidle = 0;
static int gotone;
/*
* Allocate the chunk descriptors, request list and cache buffers.
*/
if ((Chunks =
(Chunk_t *) calloc(chunkcount, sizeof(*Chunks))) == NULL)
Chunks = calloc(chunkcount, sizeof(*Chunks));
if (Chunks == NULL)
fatal("Chunks: No more memory");
if ((ChunkRequestList =
(int *) calloc(chunkcount, sizeof(*ChunkRequestList))) == NULL)
ChunkRequestList = calloc(chunkcount, sizeof(*ChunkRequestList));
if (ChunkRequestList == NULL)
fatal("ChunkRequestList: No more memory");
if ((ChunkBuffer =
(ChunkBuffer_t *) malloc(maxchunkbufs * sizeof(ChunkBuffer_t)))
== NULL)
ChunkBuffer = malloc(maxchunkbufs * sizeof(ChunkBuffer_t));
if (ChunkBuffer == NULL)
fatal("ChunkBuffer: No more memory");
/*
......@@ -519,15 +637,6 @@ ChunkerStartup(void)
* Loop until all chunks have been received and written to disk.
*/
while (chunkcount) {
#ifdef DOEVENTS
Event_t event;
if (eventserver != NULL &&
EventCheck(&event) && event.type == EV_STOP) {
log("Aborted after %d chunks",
TotalChunkCount-chunkcount);
break;
}
#endif
/*
* Search the chunk cache for a chunk that is ready to write.
*/
......@@ -540,20 +649,36 @@ ChunkerStartup(void)
* XXX should be a condition variable.
*/
if (i == maxchunkbufs) {
CLEVENT(1, EV_CLIWRIDLE, 0, 0, 0, 0);
if (debug)
log("No chunks ready to write!");
DOSTAT(nochunksready++);
#ifdef DOEVENTS
Event_t event;
if (eventserver != NULL &&
EventCheck(&event) && event.type == EV_STOP) {
log("Aborted after %d chunks",
TotalChunkCount-chunkcount);
break;
}
#endif
if (!wasidle) {
CLEVENT(1, EV_CLIWRIDLE, 0, 0, 0, 0);
if (debug)
log("No chunks ready to write!");
}
if (gotone)
DOSTAT(nochunksready++);
fsleep(idledelay);
wasidle++;
continue;
}
gotone = 1;
/*
* We have a completed chunk. Write it to disk.
*/
if (debug)
log("Writing chunk %d (buffer %d)",
ChunkBuffer[i].thischunk, i);
log("Writing chunk %d (buffer %d) after idle=%d.%03d",
ChunkBuffer[i].thischunk, i,
(wasidle*idledelay) / 1000000,
((wasidle*idledelay) % 1000000) / 1000);
else {
struct timeval estamp;
......@@ -569,6 +694,11 @@ ChunkerStartup(void)
}
}
CLEVENT(1, EV_CLIWRSTART,
ChunkBuffer[i].thischunk, wasidle,
decompblocks, writeridles);
wasidle = 0;
if (ImageUnzipChunk(ChunkBuffer[i].blocks[0].data))
pfatal("ImageUnzipChunk failed");
......@@ -579,7 +709,8 @@ ChunkerStartup(void)
ChunkBuffer[i].inprogress = 0;
chunkcount--;
CLEVENT(1, EV_CLIWRDONE,
ChunkBuffer[i].thischunk, chunkcount, 0, 0);
ChunkBuffer[i].thischunk, chunkcount,
decompblocks, writeridles);
}
/*
* Kill the child and wait for it before returning. We do not
......@@ -596,10 +727,9 @@ ChunkerStartup(void)
ImageUnzipFlush();
#ifdef STATS
{
extern unsigned long decompidles, writeridles;
extern long long totaledata, totalrdata;
Stats.u.v1.decompidles = decompidles;
Stats.u.v1.decompblocks = decompblocks;
Stats.u.v1.writeridles = writeridles;
Stats.u.v1.ebyteswritten = totaledata;
Stats.u.v1.rbyteswritten = totalrdata;
......@@ -622,7 +752,7 @@ ChunkerStartup(void)
* what would otherwise be a redundant request.
*/
static void
RequestStamp(int chunk, int block, int count)
RequestStamp(int chunk, int block, int count, void *arg)
{
int stampme = 0;
......@@ -659,21 +789,17 @@ RequestStamp(int chunk, int block, int count)
* further request (i.e., we don't stamp).
*/
else {
int i, j;
int i;
for (i = 0; i < maxchunkbufs; i++)
if (ChunkBuffer[i].thischunk == chunk &&
ChunkBuffer[i].inprogress &&
!ChunkBuffer[i].ready)
break;
if (i < maxchunkbufs) {
for (j = 0; j < count; j++)
if (! ChunkBuffer[i].bitmap[block+j]) {
stampme = 1;
break;
}
}
if (i < maxchunkbufs &&
BlockMapIsAlloc(&ChunkBuffer[i].blockmap, block, count)
!= count)
stampme = 1;
}
if (stampme) {
......@@ -714,6 +840,7 @@ GotBlock(Packet_t *p)
int chunk = p->msg.block.chunk;
int block = p->msg.block.block;
int i, free = -1;
static int lastnoroomchunk = -1, lastnoroomblocks, inprogress;
/*
* Search the chunk buffer for a match (or a free one).
......@@ -735,13 +862,21 @@ GotBlock(Packet_t *p)
* packet if there is no free chunk.
*/
if (free == -1) {
CLEVENT(1, EV_CLINOROOM, chunk, block, 0, 0);
if (chunk != lastnoroomchunk) {
CLEVENT(1, EV_CLINOROOM, chunk, block,
lastnoroomblocks, 0);
lastnoroomchunk = chunk;
lastnoroomblocks = 0;
if (debug)
log("No free buffer for chunk %d!",
chunk);
}
lastnoroomblocks++;
DOSTAT(nofreechunks++);
if (debug)
log("No more free buffer slots for chunk %d!",
chunk);
return;
}
lastnoroomchunk = -1;
lastnoroomblocks = 0;
/*
* Was this chunk already processed?
......@@ -749,7 +884,7 @@ GotBlock(Packet_t *p)
if (Chunks[chunk].done) {
CLEVENT(3, EV_CLIDUPCHUNK, chunk, block, 0, 0);
DOSTAT(dupchunk++);
if (0)
if (debug > 2)
log("Duplicate chunk %d ignored!", chunk);
return;
}
......@@ -763,8 +898,10 @@ GotBlock(Packet_t *p)
ChunkBuffer[i].inprogress = 1;
ChunkBuffer[i].thischunk = chunk;
ChunkBuffer[i].blockcount = CHUNKSIZE;
bzero(ChunkBuffer[i].bitmap, sizeof(ChunkBuffer[i].bitmap));
CLEVENT(1, EV_CLISCHUNK, chunk, block, 0, 0);
bzero(&ChunkBuffer[i].blockmap,
sizeof(ChunkBuffer[i].blockmap));
inprogress++;
CLEVENT(1, EV_CLISCHUNK, chunk, block, inprogress, 0);
}
/*
......@@ -773,17 +910,34 @@ GotBlock(Packet_t *p)
* issue a request for a lost block, and we will see that even if
* we do not need it (cause of broadcast/multicast).
*/
if (ChunkBuffer[i].bitmap[block]) {
if (BlockMapAlloc(&ChunkBuffer[i].blockmap, block)) {
CLEVENT(3, EV_CLIDUPBLOCK, chunk, block, 0, 0);
DOSTAT(dupblock++);
if (0)
if (debug > 2)
log("Duplicate block %d in chunk %d", block, chunk);
return;
}
ChunkBuffer[i].bitmap[block] = 1;
ChunkBuffer[i].blockcount--;
memcpy(ChunkBuffer[i].blocks[block].data, p->msg.block.buf, BLOCKSIZE);
CLEVENT(3, EV_CLIBLOCK, chunk, block, ChunkBuffer[i].blockcount, 0);
#ifdef NEVENTS
/*
* If we switched chunks before completing the previous, make a note.
*/
{
static int lastchunk = -1, lastblock, lastchunkbuf;
if (lastchunk != -1 && chunk != lastchunk &&
lastchunk == ChunkBuffer[lastchunkbuf].thischunk &&
ChunkBuffer[lastchunkbuf].ready == 0)
CLEVENT(1, EV_CLILCHUNK, lastchunk, lastblock, 0, 0);
lastchunkbuf = i;
lastchunk = chunk;
lastblock = block;
CLEVENT(3, EV_CLIBLOCK, chunk, block,
ChunkBuffer[i].blockcount, 0);
}
#endif
/*
* Anytime we receive a packet thats needed, reset the idle counter.
......@@ -795,7 +949,8 @@ GotBlock(Packet_t *p)
* Is the chunk complete? If so, then release it to the main thread.
*/
if (ChunkBuffer[i].blockcount == 0) {
CLEVENT(1, EV_CLIECHUNK, chunk, block, 0, 0);
inprogress--;
CLEVENT(1, EV_CLIECHUNK, chunk, block, inprogress, 0);
if (debug)
log("Releasing chunk %d to main thread", chunk);
ChunkBuffer[i].ready = 1;
......@@ -810,6 +965,44 @@ GotBlock(Packet_t *p)
}
}
/*
* Request a chunk/block/range we do not have.
*/
static void
RequestMissing(int chunk, BlockMap_t *map, int count)
{
Packet_t packet, *p = &packet;
if (debug)
log("Requesting missing blocks of chunk:%d", chunk);
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_PREQUEST;
p->hdr.datalen = sizeof(p->msg.prequest);
p->msg.prequest.chunk = chunk;
p->msg.prequest.retries = Chunks[chunk].ours;
BlockMapInvert(map, &p->msg.prequest.blockmap);
PacketSend(p, 0);
#ifdef STATS
assert(count==BlockMapIsAlloc(&p->msg.prequest.blockmap,0,CHUNKSIZE));
if (count == 0)
log("Request 0 blocks from chunk %d", chunk);
Stats.u.v1.lostblocks += count;
Stats.u.v1.requests++;
if (Chunks[chunk].ours)
Stats.u.v1.rerequests++;
#endif
CLEVENT(1, EV_CLIPREQ, chunk, count, 0, 0);
/*
* Since stamps are per-chunk and we wouldn't be here
* unless we were requesting something we are missing
* we can just unconditionally stamp the chunk.
*/