Commit 78f3f8d3 authored by Mike Hibler's avatar Mike Hibler

Improve regular (non-image) file transfer via frisbee.

Basically, make it possible to transfer a non imagezip image.  Previously
you had to wrap a regular file as an image in order to transfer it.  The
big hang up was that the frisbee protocol could only transfer files that
were a multiple of 1MB (the chunk size).

This commit changes the frisbee protocol slightly to allow transfer of
non-1MB-multiple files.  The protocol change was to add a new JOIN message
that returns the size of the file in bytes rather than in blocks.  This
allows the client to know that the file in question is not a multiple of 1MB
and allows it to request the correct partial number of blocks for the
final chunk and to extract the correct amount of data from the final 1K block
(that block is still padded to 1K by the server).  For the server side, the
request mostly allows it to do some sanity checking.  The fact that the
server is started with a file that is not a multiple of 1MB is what triggers
it to know about partial chunks.  The sanity checking is that the server will
not acknowledge clients that attempt to join with a version 1 JOIN message,
since nothing good would come of that pairing.

On the client side, frisbee must be invoked with the -N (nodecompress) option
in order to issue a v2 JOIN.  See the comment in the code for the rationale,
but it is largely a backward compat feature.

While I was changing the JOIN message, I added a couple of other future
features.  One is that by passing back a 64-bit value for the size of the
image in bytes, we can feed bigger images.  However there is still much to
be done to realize this.  The other was to add blocksize/chunksize fields
in the message so that the server/client can negotiate the transfer parameters,
e.g., 1024 blocks of 1024 bytes vs. 256 blocks of 8192 bytes, the latter being
for "jumbo" packets on a Gb ethernet.  But there is still more to be done to
get this working too.
parent 0569e748
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2009 University of Utah and the Flux Group.
* Copyright (c) 2000-2010 University of Utah and the Flux Group.
* All rights reserved.
*/
......@@ -74,13 +74,13 @@ extern int ImageUnzipInit(char *filename, int slice, int debug, int zero,
unsigned long writebufmem);
extern void ImageUnzipSetChunkCount(unsigned long chunkcount);
extern void ImageUnzipSetMemory(unsigned long writebufmem);
extern int ImageWriteChunk(int chunkno, char *chunkdata);
extern int ImageUnzipChunk(char *chunkdata);
extern int ImageWriteChunk(int chunkno, char *chunkdata, int chunksize);
extern int ImageUnzipChunk(char *chunkdata, int chunksize);
extern void ImageUnzipFlush(void);
extern int ImageUnzipQuit(void);
/*
* Chunk descriptor, one for each CHUNKSIZE*BLOCKSIZE bytes of an image file.
* Chunk descriptor, one per MAXCHUNKSIZE*MAXBLOCKSIZE bytes of an image file.
* For each chunk, record its state and the time at which it was last
* requested by someone. The time stamp is "only" 61 bits. This could be a
* problem if packets arrive more than 73,000 years apart. But we'll take
......@@ -108,8 +108,8 @@ typedef struct {
int blockcount; /* Number of blocks not received yet */
BlockMap_t blockmap; /* Which blocks have been received */
struct {
char data[BLOCKSIZE];
} blocks[CHUNKSIZE]; /* Actual block data */
char data[MAXBLOCKSIZE];
} blocks[MAXCHUNKSIZE]; /* Actual block data */
} ChunkBuffer_t;
#define CHUNK_EMPTY 0
#define CHUNK_FILLING 1
......@@ -143,6 +143,7 @@ char *usagestr =
" -z Zero fill unused block ranges (default is to seek past).\n"
" -b Use broadcast instead of multicast\n"
" -n Do not use extra threads in diskwriter\n"
" -N Do not decompress the received data, just write to output.\n"
" -p portnum Specify a port number.\n"
" -m mcastaddr Specify a multicast address in dotted notation.\n"
" -i mcastif Specify a multicast interface in dotted notation.\n"
......@@ -651,6 +652,7 @@ ClientRecvThread(void *arg)
break;
case PKTSUBTYPE_JOIN:
case PKTSUBTYPE_JOIN2:
case PKTSUBTYPE_LEAVE:
/* Ignore these. They are from other clients. */
CLEVENT(3, EV_OCLIMSG,
......@@ -761,6 +763,8 @@ ChunkerStartup(void)
* Loop until all chunks have been received and written to disk.
*/
while (chunkcount) {
int chunkbytes;
/*
* Search the chunk cache for a chunk that is ready to write.
*/
......@@ -815,9 +819,10 @@ ChunkerStartup(void)
/*
* We have a completed chunk. Write it to disk.
*/
chunkbytes = ChunkBytes(ChunkBuffer[i].thischunk);
if (debug)
log("Writing chunk %d (buffer %d) after idle=%d.%03d",
ChunkBuffer[i].thischunk, i,
log("Writing chunk %d (buffer %d), size %d, after idle=%d.%03d",
ChunkBuffer[i].thischunk, i, chunkbytes,
(wasidle*idledelay) / 1000000,
((wasidle*idledelay) % 1000000) / 1000);
......@@ -828,17 +833,19 @@ ChunkerStartup(void)
if (nodecompress) {
if (ImageWriteChunk(ChunkBuffer[i].thischunk,
ChunkBuffer[i].blocks[0].data))
ChunkBuffer[i].blocks[0].data,
chunkbytes))
pfatal("ImageWriteChunk failed");
} else {
if (ImageUnzipChunk(ChunkBuffer[i].blocks[0].data))
if (ImageUnzipChunk(ChunkBuffer[i].blocks[0].data,
chunkbytes))
pfatal("ImageUnzipChunk failed");
}
CLEVENT(1, EV_CLIDCDONE,
ChunkBuffer[i].thischunk, chunkcount,
CLEVENT(1, EV_CLIDCDONE, ChunkBuffer[i].thischunk,
chunkbytes, chunkcount, 0);
CLEVENT(2, EV_CLIDCSTAT, (totalddata >> 32), totalddata,
decompblocks, writeridles);
CLEVENT(2, EV_CLIDCSTAT, (totalddata >> 32), totalddata, 0, 0);
/*
* Okay, free the slot up for another chunk.
......@@ -908,7 +915,7 @@ RequestStamp(int chunk, int block, int count, void *arg)
* Common case of a complete chunk request, always stamp as there will
* be some data in it we need.
*/
if (block == 0 && count == CHUNKSIZE)
if (block == 0 && count == ChunkSize(chunk))
stampme = 1;
/*
* Else, request is for a partial chunk. If we are not currently
......@@ -971,8 +978,12 @@ static int
RequestRedoTime(int chunk, unsigned long long curtime)
{
if (Chunks[chunk].lastreq == 0 || redodelay == 0 ||
(int)(curtime - Chunks[chunk].lastreq) >= redodelay)
(int)(curtime - Chunks[chunk].lastreq) >= redodelay) {
CLEVENT(5, EV_CLIREDO, chunk,
Chunks[chunk].lastreq/1000000,
Chunks[chunk].lastreq%1000000, 0);
return 1;
}
return 0;
}
......@@ -1134,7 +1145,7 @@ GotBlock(Packet_t *p)
i = free;
ChunkBuffer[i].state = state;
ChunkBuffer[i].thischunk = chunk;
ChunkBuffer[i].blockcount = CHUNKSIZE;
ChunkBuffer[i].blockcount = ChunkSize(chunk);
bzero(&ChunkBuffer[i].blockmap,
sizeof(ChunkBuffer[i].blockmap));
inprogress++;
......@@ -1157,7 +1168,8 @@ GotBlock(Packet_t *p)
return 0;
}
ChunkBuffer[i].blockcount--;
memcpy(ChunkBuffer[i].blocks[block].data, p->msg.block.buf, BLOCKSIZE);
memcpy(ChunkBuffer[i].blocks[block].data,
p->msg.block.buf, BlockSize(chunk, block));
#ifdef NEVENTS
goodblocksrecv++;
......@@ -1223,19 +1235,30 @@ static void
RequestMissing(int chunk, BlockMap_t *map, int count)
{
Packet_t packet, *p = &packet;
int csize = ChunkSize(chunk);
if (debug)
log("Requesting missing blocks of chunk:%d", chunk);
log("Requesting %d missing blocks of chunk:%d", count, chunk);
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_PREQUEST;
p->hdr.datalen = sizeof(p->msg.prequest);
p->msg.prequest.chunk = chunk;
p->msg.prequest.retries = Chunks[chunk].ours;
/*
* Invert the map of what we have so we request everything we
* don't have, but be careful not to request anything beyond the
* end of a partial chunk. Note that we use MAXCHUNKSIZE as the
* upper bound size size CHUNKSIZE may be less than that even for
* "full-sized" image chunks.
*/
BlockMapInvert(map, &p->msg.prequest.blockmap);
if (csize < MAXCHUNKSIZE)
BlockMapClear(&p->msg.prequest.blockmap,
csize, MAXCHUNKSIZE - csize);
PacketSend(p, 0);
#ifdef STATS
assert(count == BlockMapIsAlloc(&p->msg.prequest.blockmap,0,CHUNKSIZE));
assert(count == BlockMapIsAlloc(&p->msg.prequest.blockmap, 0, CHUNKSIZE));
if (count == 0)
log("Request 0 blocks from chunk %d", chunk);
Stats.u.v1.lostblocks += count;
......@@ -1250,7 +1273,7 @@ RequestMissing(int chunk, BlockMap_t *map, int count)
* unless we were requesting something we are missing
* we can just unconditionally stamp the chunk.
*/
RequestStamp(chunk, 0, CHUNKSIZE, (void *)1);
RequestStamp(chunk, 0, csize, (void *)1);
Chunks[chunk].ours = 1;
}
......@@ -1368,7 +1391,7 @@ RequestChunk(int timedout)
* is considered a read-ahead to us.
*/
if (timedout || RequestRedoTime(chunk, stamp))
RequestRange(chunk, 0, CHUNKSIZE);
RequestRange(chunk, 0, ChunkSize(chunk));
/*
* Even if we did not just request the block, we still
......@@ -1440,6 +1463,7 @@ PlayFrisbee(void)
gettimeofday(&timeo, 0);
while (1) {
struct timeval now;
int32_t subtype = PKTSUBTYPE_JOIN;
gettimeofday(&now, 0);
if (timercmp(&timeo, &now, <=)) {
......@@ -1454,9 +1478,29 @@ PlayFrisbee(void)
CLEVENT(1, EV_CLIJOINREQ, myid, 0, 0, 0);
DOSTAT(joinattempts++);
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_JOIN;
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join.clientid = myid;
/*
* Unless they have specified the -N option, continue
* to use the V1 JOIN which tells the server only to
* let us join if the image being requested is a
* traditional, 1MB padded image.
*
* Two reasons for this: 1) right now the client
* code for decompressing images has not been modified
* to handle non-padded images, and 2) this gives us
* some degree of backward compatibility for a new
* client talking to an old (pre-JOINv2) server.
*/
if (!nodecompress) {
subtype = p->hdr.subtype = PKTSUBTYPE_JOIN;
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join2.clientid = myid;
} else {
subtype = p->hdr.subtype = PKTSUBTYPE_JOIN2;
p->hdr.datalen = sizeof(p->msg.join2);
p->msg.join2.clientid = myid;
p->msg.join2.chunksize = MAXCHUNKSIZE;
p->msg.join2.blocksize = MAXBLOCKSIZE;
}
PacketSend(p, 0);
timeo.tv_sec = 0;
timeo.tv_usec = 500000;
......@@ -1468,15 +1512,19 @@ PlayFrisbee(void)
* we get a reply back.
*/
if (PacketReceive(p) == 0 &&
p->hdr.subtype == PKTSUBTYPE_JOIN &&
p->hdr.subtype == subtype &&
p->hdr.type == PKTTYPE_REPLY) {
CLEVENT(1, EV_CLIJOINREP,
p->msg.join.blockcount, BLOCKSIZE, 0, 0);
CHUNKSIZE, BLOCKSIZE,
(p->msg.join2.bytecount >> 32),
p->msg.join2.bytecount);
break;
}
}
gettimeofday(&timeo, 0);
TotalChunkCount = p->msg.join.blockcount / CHUNKSIZE;
InitSizes(p->msg.join2.chunksize, p->msg.join2.blocksize,
p->msg.join2.bytecount);
TotalChunkCount = TotalChunks();
ImageUnzipSetChunkCount(TotalChunkCount);
/*
......@@ -1498,9 +1546,9 @@ PlayFrisbee(void)
}
log("Joined the team after %d sec. ID is %u. "
"File is %d chunks (%d blocks)",
"File is %d chunks (%lld bytes)",
timeo.tv_sec - stamp.tv_sec,
myid, TotalChunkCount, p->msg.join.blockcount);
myid, TotalChunkCount, p->msg.join2.bytecount);
ChunkerStartup();
......
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2009 University of Utah and the Flux Group.
* Copyright (c) 2000-2010 University of Utah and the Flux Group.
* All rights reserved.
*/
......@@ -17,9 +17,9 @@
* header (24).
*/
#ifdef JUMBO
#define MAXBLOCKSIZE 8934
#define MAXPACKETDATA 8934
#else
#define MAXBLOCKSIZE 1448
#define MAXPACKETDATA 1448
#endif
/*
......@@ -27,17 +27,17 @@
* Chunks are broken into blocks which are the unit of transmission
*/
#ifdef JUMBO
#define CHUNKSIZE 128
#define BLOCKSIZE 8192
#define MAXCHUNKSIZE 128
#define MAXBLOCKSIZE 8192
#else
#define CHUNKSIZE 1024
#define BLOCKSIZE 1024
#define MAXCHUNKSIZE 1024
#define MAXBLOCKSIZE 1024
#endif
/*
* Make sure we can fit a block in a single ethernet MTU.
*/
#if BLOCKSIZE > MAXBLOCKSIZE
#if MAXBLOCKSIZE > MAXPACKETDATA
#error "Invalid block size"
#endif
......@@ -47,7 +47,7 @@
* With the maximum block size of 1448, this limits a chunk to no more
* than 16,773,632 bytes (just under 16MB).
*/
#if (CHUNKSIZE%CHAR_BIT) != 0 || (CHUNKSIZE/CHAR_BIT) > MAXBLOCKSIZE
#if (MAXCHUNKSIZE%CHAR_BIT) != 0 || (MAXCHUNKSIZE/CHAR_BIT) > MAXPACKETDATA
#error "Invalid chunk size"
#endif
......@@ -57,8 +57,8 @@
* of the client (forcing pieces of frisbee to be paged out to disk, even
* if there is a swap disk to use, is not a very efficient way to load disks!)
*
* MAXCHUNKBUFS is the number of BLOCKSIZE*CHUNKSIZE chunk buffers used to
* receive data from the network. With the default values, these are 1MB
* MAXCHUNKBUFS is the number of MAXBLOCKSIZE*MAXCHUNKSIZE chunk buffers used
* to receive data from the network. With the default values, these are 1MB
* each.
*
* MAXWRITEBUFMEM is the amount, in MB, of write buffer memory in the client.
......@@ -108,16 +108,16 @@
/*
* The number of disk read blocks in a single read on the server.
* Must be an integer divisor of CHUNKSIZE.
* Must be an integer divisor of MAXCHUNKSIZE.
*/
#define SERVER_READ_SIZE 32
/*
* Parameters for server network usage:
*
* SERVER_BURST_SIZE Max BLOCKSIZE packets sent in a burst.
* SERVER_BURST_SIZE Max MAXBLOCKSIZE packets sent in a burst.
* Should be a multiple of SERVER_READ_SIZE
* Should be less than SOCKBUFSIZE/BLOCKSIZE,
* Should be less than SOCKBUFSIZE/MAXBLOCKSIZE,
* bursts of greater than the send socket
* buffer size are almost certain to cause
* lost packets.
......@@ -128,12 +128,12 @@
* On FreeBSD we set the clock to 1ms
* granularity.
*
* Together with the BLOCKSIZE, these two params form a theoretical upper
* Together with the MAXBLOCKSIZE, these two params form a theoretical upper
* bound on bandwidth consumption for the server. That upper bound (for
* ethernet) is:
*
* (1000000 / SERVER_BURST_GAP) # bursts per second
* * (BLOCKSIZE+24+42) * SERVER_BURST_SIZE # * wire size of a burst
* (1000000 / SERVER_BURST_GAP) # bursts per second
* * (MAXBLOCKSIZE+24+42) * SERVER_BURST_SIZE # * wire size of a burst
*
* which for the default 1k packets, gap of 1ms and burst of 16 packets
* is about 17.4MB/sec. That is beyond the capacity of a 100Mb ethernet
......@@ -155,7 +155,7 @@
* How long (in usecs) to wait before re-reqesting a chunk.
* It will take the server more than:
*
* (CHUNKSIZE/SERVER_BURST_SIZE) * SERVER_BURST_GAP
* (MAXCHUNKSIZE/SERVER_BURST_SIZE) * SERVER_BURST_GAP
*
* usec (0.13 sec with defaults) for each each chunk it pumps out,
* and we conservatively assume that there are a fair number of other
......@@ -214,7 +214,7 @@ typedef struct {
} __attribute__((__packed__)) ClientStats_t;
typedef struct {
char map[CHUNKSIZE/CHAR_BIT];
char map[MAXCHUNKSIZE/CHAR_BIT];
} BlockMap_t;
/*
......@@ -250,7 +250,7 @@ typedef struct {
struct {
int32_t chunk;
int32_t block;
int8_t buf[BLOCKSIZE];
int8_t buf[MAXBLOCKSIZE];
} block;
/*
......@@ -275,6 +275,24 @@ typedef struct {
BlockMap_t blockmap;
} prequest;
/*
* Join V2 allows:
* - client to request a specific chunk/block size
* server will return what it will provide
* - server to return the size in bytes
* so that we can transfer files that are not a
* multiple of the block/chunk size
* Note the blockcount field remains for vague
* compatibility-ish reasons.
*/
struct {
uint32_t clientid;
int32_t blockcount;
int32_t chunksize;
int32_t blocksize;
int64_t bytecount;
} join2;
/*
* Leave reporting client params/stats
*/
......@@ -294,6 +312,7 @@ typedef struct {
#define PKTSUBTYPE_REQUEST 4
#define PKTSUBTYPE_LEAVE2 5
#define PKTSUBTYPE_PREQUEST 6
#define PKTSUBTYPE_JOIN2 7
/*
* Protos.
......
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2009 University of Utah and the Flux Group.
* Copyright (c) 2000-2010 University of Utah and the Flux Group.
* All rights reserved.
*/
......@@ -427,7 +427,7 @@ PacketValid(Packet_t *p, int nchunks)
if (p->msg.block.chunk < 0 ||
p->msg.block.chunk >= nchunks ||
p->msg.block.block < 0 ||
p->msg.block.block >= CHUNKSIZE)
p->msg.block.block >= MAXCHUNKSIZE)
return 0;
break;
case PKTSUBTYPE_REQUEST:
......@@ -436,9 +436,9 @@ PacketValid(Packet_t *p, int nchunks)
if (p->msg.request.chunk < 0 ||
p->msg.request.chunk >= nchunks ||
p->msg.request.block < 0 ||
p->msg.request.block >= CHUNKSIZE ||
p->msg.request.block >= MAXCHUNKSIZE ||
p->msg.request.count < 0 ||
p->msg.request.block+p->msg.request.count > CHUNKSIZE)
p->msg.request.block+p->msg.request.count > MAXCHUNKSIZE)
return 0;
break;
case PKTSUBTYPE_PREQUEST:
......@@ -452,6 +452,10 @@ PacketValid(Packet_t *p, int nchunks)
if (p->hdr.datalen < sizeof(p->msg.join))
return 0;
break;
case PKTSUBTYPE_JOIN2:
if (p->hdr.datalen < sizeof(p->msg.join2))
return 0;
break;
case PKTSUBTYPE_LEAVE:
if (p->hdr.datalen < sizeof(p->msg.leave))
return 0;
......
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2009 University of Utah and the Flux Group.
* Copyright (c) 2000-2010 University of Utah and the Flux Group.
* All rights reserved.
*/
......@@ -108,6 +108,8 @@ struct FileInfo {
int fd; /* Open file descriptor */
int blocks; /* Number of BLOCKSIZE blocks */
int chunks; /* Number of CHUNKSIZE chunks */
int isimage; /* non-zero if this is a 1MB-rounded file */
off_t filesize; /* Real size of file */
};
static struct FileInfo FileInfo;
......@@ -181,7 +183,7 @@ WorkQueueEnqueue(int chunk, BlockMap_t *map, int count)
* Common case: a full chunk request for the full block we are
* currently sending. Don't queue.
*/
if (count == CHUNKSIZE && chunk == WorkChunk && count == WorkCount) {
if (count == MAXCHUNKSIZE && chunk == WorkChunk && count == WorkCount) {
EVENT(1, EV_WORKMERGE, mcastaddr, chunk, count, count, ~0);
pthread_mutex_unlock(&WorkQLock);
return 0;
......@@ -206,15 +208,15 @@ WorkQueueEnqueue(int chunk, BlockMap_t *map, int count)
* We have a queued request for the entire chunk
* already, nothing to do.
*/
if (wqel->nblocks == CHUNKSIZE)
if (wqel->nblocks == MAXCHUNKSIZE)
blocks = 0;
/*
* Or if incoming request is an entire chunk
* just copy that map.
*/
else if (count == CHUNKSIZE) {
else if (count == MAXCHUNKSIZE) {
wqel->blockmap = *map;
blocks = CHUNKSIZE - wqel->nblocks;
blocks = MAXCHUNKSIZE - wqel->nblocks;
}
/*
* Otherwise do the full merge
......@@ -224,7 +226,7 @@ WorkQueueEnqueue(int chunk, BlockMap_t *map, int count)
EVENT(1, EV_WORKMERGE, mcastaddr,
chunk, wqel->nblocks, blocks, elt);
wqel->nblocks += blocks;
assert(wqel->nblocks <= CHUNKSIZE);
assert(wqel->nblocks <= MAXCHUNKSIZE);
pthread_mutex_unlock(&WorkQLock);
return 0;
}
......@@ -277,9 +279,9 @@ WorkQueueDequeue(int *chunkp, int *blockp, int *countp)
wqel = (WQelem_t *) queue_first(&WorkQ);
chunk = wqel->chunk;
if (wqel->nblocks == CHUNKSIZE) {
if (wqel->nblocks == MAXCHUNKSIZE) {
block = 0;
count = CHUNKSIZE;
count = MAXCHUNKSIZE;
} else
count = BlockMapExtract(&wqel->blockmap, &block);
assert(count <= wqel->nblocks);
......@@ -308,7 +310,7 @@ ClientEnqueueMap(int chunk, BlockMap_t *map, int count, int isretry)
{
int enqueued;
if (count != CHUNKSIZE) {
if (count != MAXCHUNKSIZE) {
DOSTAT(blockslost+=count);
blockslost += count;
DOSTAT(partialreq++);
......@@ -318,7 +320,7 @@ ClientEnqueueMap(int chunk, BlockMap_t *map, int count, int isretry)
if (!enqueued)
DOSTAT(qmerges++);
#ifdef STATS
else if (chunkmap != 0 && count == CHUNKSIZE) {
else if (chunkmap != 0 && count == MAXCHUNKSIZE) {
if (chunkmap[chunk]) {
if (debug > 1)
log("Duplicate chunk request: %d", chunk);
......@@ -356,7 +358,7 @@ ClientEnqueueMap(int chunk, BlockMap_t *map, int count, int isretry)
* reply are harmless.
*/
static void
ClientJoin(Packet_t *p)
ClientJoin(Packet_t *p, int version)
{
struct in_addr ipaddr = { p->hdr.srcip };
unsigned int clientid = p->msg.join.clientid;
......@@ -364,10 +366,31 @@ ClientJoin(Packet_t *p)
/*
* Return fileinfo. Duplicates are harmless.
*/
EVENT(1, EV_JOINREQ, ipaddr, clientid, 0, 0, 0);
p->hdr.type = PKTTYPE_REPLY;
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join.blockcount = FileInfo.blocks;
EVENT(1, EV_JOINREQ, ipaddr, clientid, version, 0, 0);
p->hdr.type = PKTTYPE_REPLY;
if (version == 1) {
/*
* XXX we cannot accept JOINv1 requests if the image we
* are serving is not a "traditional" 1MB padded image.
* Otherwise, they would ultimately make requests for
* parts of the non-rouded file that don't exist and we
* would not respond. We could fake up some data to return
* but I'm not sure that is any better.
*/
if (!FileInfo.isimage) {
log("%s requested JOINv1 for non-image file, "
"ignoring...", inet_ntoa(ipaddr));
return;
}
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join.blockcount = FileInfo.blocks;
} else {
p->hdr.datalen = sizeof(p->msg.join2);
p->msg.join2.blockcount = 0;
p->msg.join2.chunksize = CHUNKSIZE;
p->msg.join2.blocksize = BLOCKSIZE;
p->msg.join2.bytecount = FileInfo.filesize;
}
PacketReply(p);
#ifdef STATS
{
......@@ -410,8 +433,8 @@ ClientJoin(Packet_t *p)
* Log after we send reply so that we get the packet off as
* quickly as possible!
*/
log("%s (id %u, image %s) joins at %s! %d active clients.",
inet_ntoa(ipaddr), clientid, filename,
log("%s (id %u, image %s) joins (v%d) at %s! %d active clients.",
inet_ntoa(ipaddr), clientid, filename, version,
CurrentTimeString(), activeclients);
}
......@@ -516,7 +539,7 @@ ClientRequest(Packet_t *p)
log("WARNING: ClientRequest with zero count");
EVENT(1, EV_REQMSG, ipaddr, chunk, block, count, 0);
if (block + count > CHUNKSIZE)
if (block + count > MAXCHUNKSIZE)
fatal("Bad request from %s - chunk:%d block:%d size:%d",
inet_ntoa(ipaddr), chunk, block, count);
......@@ -541,7 +564,7 @@ ClientPartialRequest(Packet_t *p)
int chunk = p->msg.prequest.chunk;
int count;
count = BlockMapIsAlloc(&p->msg.prequest.blockmap, 0, CHUNKSIZE);
count = BlockMapIsAlloc(&p->msg.prequest.blockmap, 0, MAXCHUNKSIZE);
if (count == 0)
log("WARNING: ClientPartialRequest with zero count");
......@@ -621,7 +644,11 @@ ServerRecvThread(void *arg)
switch (p->hdr.subtype) {
case PKTSUBTYPE_JOIN:
DOSTAT(joins++);
ClientJoin(p);
ClientJoin(p, 1);
break;
case PKTSUBTYPE_JOIN2:
DOSTAT(joins++);
ClientJoin(p, 2);
break;
case PKTSUBTYPE_LEAVE:
DOSTAT(leaves++);
......@@ -659,7 +686,7 @@ PlayFrisbee(void)
off_t offset;
struct timeval startnext;
if ((databuf = malloc(readsize * BLOCKSIZE)) == NULL)
if ((databuf = malloc(readsize * MAXBLOCKSIZE)) == NULL)
fatal("could not allocate read buffer");
while (1) {
......@@ -722,14 +749,15 @@ PlayFrisbee(void)
lastblock = startblock + blockcount;
/* Offset within the file */
offset = (((off_t) BLOCKSIZE * chunk * CHUNKSIZE) +
((off_t) BLOCKSIZE * startblock));
offset = (((off_t) MAXBLOCKSIZE * chunk * MAXCHUNKSIZE) +
((off_t) MAXBLOCKSIZE * startblock));
for (block = startblock; block < lastblock; ) {
int readcount;
int readbytes;
int resends;
int thisburst = 0;
int resid = 0;
#if defined(NEVENTS) || defined(STATS)
struct timeval rstamp;
gettimeofday(&rstamp, 0);
......@@ -742,7 +770,30 @@ PlayFrisbee(void)
readcount = readsize;
else
readcount = lastblock - block;
readbytes = readcount * BLOCKSIZE;
readbytes = readcount * MAXBLOCKSIZE;
/*
* Check for final partial block and truncate
* read if necessary. This should only happen
* for the last block of a file, a request beyond
* that is an error (and do what?)