Commit 86efdd9e authored by Leigh B. Stoller

Checkpoint first working version of Frisbee Redux. This version
requires the linux threads package to give us kernel level pthreads.

From: Leigh Stoller <stoller@fast.cs.utah.edu>
To: Testbed Operations <testbed-ops@fast.cs.utah.edu>
Cc: Jay Lepreau <lepreau@cs.utah.edu>
Subject: Frisbee Redux
Date: Mon, 7 Jan 2002 12:03:56 -0800

Server:
The server is multithreaded. One thread takes in requests from the
clients and adds each request to a work queue. The other thread processes
the work queue in FIFO order, spitting out the desired block ranges. A
request is a chunk/block/blockcount tuple, and most of the time the clients
are requesting complete 1MB chunks. The exception, of course, is when
individual blocks are lost, in which case the clients request just those
subranges. The server is totally asynchronous; it maintains a list of who
is "connected", but that's just to make sure we can time the server out
after a suitable period of inactivity. The server really only cares about
the work queue; as long as the queue is nonempty, it spits out data.
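
A minimal sketch of the kind of FIFO work queue described above, assuming
a mutex and condition variable shared between the request thread and the
sending thread. The names WorkItem, WorkQueue, wq_init, wq_enqueue and
wq_dequeue are hypothetical; server.c is not shown in this excerpt and may
be organized differently.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical request tuple: chunk, first block, number of blocks. */
typedef struct WorkItem {
    int chunk;
    int block;
    int count;
    struct WorkItem *next;
} WorkItem;

typedef struct {
    WorkItem *head, *tail;
    pthread_mutex_t lock;
    pthread_cond_t nonempty;
} WorkQueue;

static void wq_init(WorkQueue *q)
{
    q->head = q->tail = NULL;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->nonempty, NULL);
}

/* Request thread: append at the tail. Returns 0, or -1 if out of memory. */
static int wq_enqueue(WorkQueue *q, int chunk, int block, int count)
{
    WorkItem *w = malloc(sizeof(*w));

    if (w == NULL)
        return -1;
    w->chunk = chunk;
    w->block = block;
    w->count = count;
    w->next = NULL;

    pthread_mutex_lock(&q->lock);
    if (q->tail != NULL)
        q->tail->next = w;
    else
        q->head = w;
    q->tail = w;
    pthread_cond_signal(&q->nonempty);
    pthread_mutex_unlock(&q->lock);
    return 0;
}

/* Sending thread: block until a request exists, then take the oldest. */
static WorkItem wq_dequeue(WorkQueue *q)
{
    WorkItem *w, out;

    pthread_mutex_lock(&q->lock);
    while (q->head == NULL)
        pthread_cond_wait(&q->nonempty, &q->lock);
    w = q->head;
    q->head = w->next;
    if (q->head == NULL)
        q->tail = NULL;
    pthread_mutex_unlock(&q->lock);

    out = *w;
    out.next = NULL;
    free(w);
    return out;
}
```

In this sketch the sending thread would loop on wq_dequeue() and transmit
the requested block range for each item, which matches the "as long as the
queue is nonempty, it spits out data" behavior.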

Client:
The client is also multithreaded. One thread receives data packets and
stuffs them in a chunkbuffer data structure. This thread also requests more
data, either to complete chunks with missing blocks or to request new
chunks. Each client can read ahead up to 2 chunks, although with multiple
clients it might actually be much further ahead, since it also receives
chunks that other clients requested. I set the number of chunk buffers to
16, although this is probably unnecessary, as I will explain below. The
other (main) thread waits for chunkbuffers to be marked complete, and then
invokes the imageunzip code on that chunk. Meanwhile, the receive thread is
busily getting more data and requesting/reading ahead, so that by the time
the unzip is done, there is another chunk to unzip. In practice, the main
thread never goes idle after the first chunk is received; there is always a
ready chunk for it. Perfect overlap of I/O! In order to prevent the clients
from getting overly synchronized (and causing all the clients to wait until
the last client is done!), each client randomizes its block request order.
This is why we can retain the original frisbee name; each client ends up
catching random blocks flung out from the server until it has all the
blocks.
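
A minimal sketch of randomizing a request order, using a Fisher-Yates
shuffle. This is a swapped-in technique for illustration: the client code
in this commit instead performs a fixed number of random pairwise swaps
using random(). The function name shuffle_request_order is hypothetical,
and rand() is used here only for portability.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Fill order[0..n-1] with 0..n-1, then shuffle in place (Fisher-Yates).
 * Every index still appears exactly once afterwards.
 */
static void shuffle_request_order(int *order, int n)
{
    int i;

    for (i = 0; i < n; i++)
        order[i] = i;

    for (i = n - 1; i > 0; i--) {
        int j = rand() % (i + 1);   /* pick from the unshuffled prefix */
        int tmp = order[i];

        order[i] = order[j];
        order[j] = tmp;
    }
}
```

Each client would need to seed its generator differently (the client in
this commit calls srandomdev()), otherwise all clients would compute the
same order and the point of randomizing would be lost.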

Performance:
The single node speed is about 180 seconds for our current full image.
Frisbee V1 compares at about 210 seconds. The two node speed was 181 and
174 seconds. The amount of CPU used for the two node run ranged from 1% to
4%, typically averaging about 2% while I watched it with "top."

The main problem on the server side is how to keep boss (1GHz with Gbit
ethernet) from spitting out packets so fast that half of them get dropped.
I eventually settled on a static 1ms delay after every 64K of packets
sent. Nothing to be proud of, but it works.
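
A minimal sketch of this kind of static pacing, assuming "64K" means 64KB
of payload and that the sender accounts for each packet as it goes out.
The Pacer type and pacer_account function are hypothetical names; the
actual server code is not shown in this excerpt.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

#define PACE_BYTES (64 * 1024)  /* assumed: pause after 64KB sent */
#define PACE_NSEC  1000000L     /* 1 ms */

typedef struct {
    size_t sent_since_pause;    /* bytes sent since the last pause */
    unsigned long pauses;       /* how many times we have slept */
} Pacer;

/*
 * Account for one packet of nbytes. Once 64KB has gone out since the
 * last pause, sleep for 1 ms and reset. Returns 1 if it paused, else 0.
 */
static int pacer_account(Pacer *p, size_t nbytes)
{
    struct timespec ts = { 0, PACE_NSEC };

    p->sent_since_pause += nbytes;
    if (p->sent_since_pause < PACE_BYTES)
        return 0;

    nanosleep(&ts, NULL);
    p->sent_since_pause = 0;
    p->pauses++;
    return 1;
}
```

A fixed delay like this does not adapt to the receivers or the network, so
it would need retuning for different hardware.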

As mentioned above, the number of chunk buffers is 16, although only a few
of them are used in practice. The reason is that the network transfer speed
is perhaps 10 times faster than the decompression and raw device write
speed. To know for sure, I would have to compare the per-byte transfer
rate for 350MB over the network against the time to decompress and write
the 1.2GB of data to the raw disk. With such a big difference, it's only
necessary to ensure that you stay 1 or 2 chunks ahead, since you can
request 10 chunks in the time it takes to write one of them.
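
The read-ahead argument can be illustrated with a toy model. This is only
a sketch under stated assumptions: one tick is the time to unzip and write
one chunk, the network can deliver up to "ratio" chunks per tick (10 is
the rough estimate above, not a measurement), and the client never holds
more than "readahead" complete chunks. SimResult and simulate_readahead
are hypothetical names.

```c
#include <assert.h>

typedef struct {
    int peak_buffered;  /* most complete chunks ever waiting at once */
    int idle_ticks;     /* ticks where the writer had nothing to do */
} SimResult;

static SimResult simulate_readahead(int total_chunks, int ratio,
                                    int readahead)
{
    SimResult r = { 0, 0 };
    int remaining = total_chunks;   /* chunks not yet received */
    int buffered = 0;               /* complete chunks awaiting the writer */
    int written = 0;

    if (ratio < 1 || readahead < 1) {
        r.idle_ticks = -1;          /* nothing can ever arrive */
        return r;
    }

    while (written < total_chunks) {
        /* The network fills whatever room the read-ahead limit allows. */
        int room = readahead - buffered;
        int arrivals = ratio < room ? ratio : room;

        if (arrivals > remaining)
            arrivals = remaining;
        remaining -= arrivals;
        buffered += arrivals;
        if (buffered > r.peak_buffered)
            r.peak_buffered = buffered;

        /* The writer consumes one chunk per tick if one is ready. */
        if (buffered > 0) {
            buffered--;
            written++;
        } else {
            r.idle_ticks++;
        }
    }
    return r;
}
```

In this model, simulate_readahead(350, 10, 2) and
simulate_readahead(350, 10, 16) both report zero idle ticks, which is
consistent with the claim that staying 1 or 2 chunks ahead is enough and
the remaining buffers go mostly unused.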
parent d0b9f55f
@@ -1049,6 +1049,7 @@ outfiles="$outfiles Makeconf GNUmakefile \
ipod/GNUmakefile \
lib/GNUmakefile \
os/GNUmakefile os/split-image.sh os/imagezip/GNUmakefile \
os/frisbee.redux/GNUmakefile \
pxe/GNUmakefile pxe/proxydhcp.restart pxe/bootinfo.restart \
security/GNUmakefile security/paperbag security/lastlog_daemon \
tbsetup/GNUmakefile tbsetup/console_setup \
@@ -164,6 +164,7 @@ outfiles="$outfiles Makeconf GNUmakefile \
ipod/GNUmakefile \
lib/GNUmakefile \
os/GNUmakefile os/split-image.sh os/imagezip/GNUmakefile \
os/frisbee.redux/GNUmakefile \
pxe/GNUmakefile pxe/proxydhcp.restart pxe/bootinfo.restart \
security/GNUmakefile security/paperbag security/lastlog_daemon \
tbsetup/GNUmakefile tbsetup/console_setup \
SRCDIR = @srcdir@
TESTBED_SRCDIR = @top_srcdir@
OBJDIR = ../..
SUBDIR = os/frisbee.redux
include $(OBJDIR)/Makeconf
all: frisbee frisbeed
include $(TESTBED_SRCDIR)/GNUmakerules
SHAREDOBJS = log.o network.o utils.o
PTHREADCFLAGS = -D_THREAD_SAFE \
-I/usr/local/include/pthread/linuxthreads
PTHREADLIBS = -L/usr/local/lib -llthread -llgcc_r
CLIENTFLAGS = $(CFLAGS)
CLIENTLIBS = ../imagezip/frisbee.o -lz $(PTHREADLIBS)
CLIENTOBJS = client.o $(SHAREDOBJS)
SERVERFLAGS = $(CFLAGS)
SERVERLIBS = $(PTHREADLIBS)
SERVEROBJS = server.o $(SHAREDOBJS)
CFLAGS = -O2 -g -Wall -static $(PTHREADCFLAGS)
LDFLAGS = -static
frisbee: $(CLIENTOBJS) ../imagezip/frisbee.o
	$(CC) $(LDFLAGS) $(CLIENTFLAGS) $(CLIENTOBJS) $(CLIENTLIBS) -o frisbee
	cp frisbee frisbee.debug
	strip frisbee
frisbeed: $(SERVEROBJS)
	$(CC) $(LDFLAGS) $(SERVERFLAGS) $(SERVEROBJS) $(SERVERLIBS) -o frisbeed
	cp frisbeed frisbeed.debug
	strip frisbeed
client.o: decls.h log.h
server.o: decls.h log.h
log.o: decls.h log.h
clean:
	/bin/rm -f *.o *.a frisbee frisbeed frisbee.debug frisbeed.debug
/*
* Frisbee client.
*/
#include <sys/types.h>
#include <sys/time.h>	/* gettimeofday() */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <syslog.h>
#include <signal.h>
#include <stdarg.h>
#include <pthread.h>
#include "decls.h"
int debug = 0;
int portnum;
struct in_addr mcastaddr;
struct in_addr mcastif;
/* Forward Decls */
static void PlayFrisbee(void);
static void GotBlock(Packet_t *p);
static void RequestChunk(int timedout);
static int ImageUnzip(int chunk);
extern int ImageUnzipInit(char *filename, int slice, int debug);
/*
* The chunker data structure. For each chunk in progress, we maintain this
* array of blocks (plus meta info). This serves as a cache to receive
* blocks from the server while we write completed chunks to disk. The child
* thread reads packets and updates this cache, while the parent thread
* simply looks for completed chunks and writes them. The "inprogress" slot
* serves as a free/allocated flag, while the ready bit indicates that a chunk
* is complete and ready to write to disk.
*/
typedef struct {
int thischunk; /* Which chunk in progress */
int inprogress; /* Chunk in progress? Free/Allocated */
int ready; /* Ready to write to disk? */
int blockcount; /* Number of blocks not received yet */
char bitmap[CHUNKSIZE]; /* Which blocks have been received */
struct {
char data[BLOCKSIZE];
} blocks[CHUNKSIZE]; /* Actual block data */
} ChunkBuffer_t;
ChunkBuffer_t *ChunkBuffer; /* The cache */
char *ChunkBitmap; /* Bitmap of which chunks finished */
int *ChunkRequestList; /* Randomized chunk request order */
int LastReceiveChunk; /* Chunk of last data packet received*/
int TotalChunkCount; /* Total number of chunks in file */
int IdleCounter; /* Countdown to request more data */
char *usagestr =
"usage: frisbee [-d] [-s #] [-i mcastif] <-p #> <-m mcastaddr> <output filename>\n"
" -d Turn on debugging. Multiple -d options increase output.\n"
" -p portnum Specify a port number.\n"
" -m mcastaddr Specify a multicast address in dotted notation.\n"
" -i mcastif Specify a multicast interface in dotted notation.\n"
" -s slice Output to DOS slice (DOS numbering 1-4)\n"
" NOTE: Must specify a raw disk device for output filename.\n"
"\n";
void
usage()
{
fputs(usagestr, stderr);	/* not a format string; avoid fprintf */
exit(1);
}
int
main(int argc, char **argv)
{
int ch, slice = 0;
char *filename;
while ((ch = getopt(argc, argv, "dhp:m:s:i:")) != -1)
switch(ch) {
case 'd':
debug++;
break;
case 'p':
portnum = atoi(optarg);
break;
case 'm':
inet_aton(optarg, &mcastaddr);
break;
case 'i':
inet_aton(optarg, &mcastif);
break;
case 's':
slice = atoi(optarg);
break;
case 'h':
case '?':
default:
usage();
}
argc -= optind;
argv += optind;
if (argc != 1)
usage();
filename = argv[0];
if (!portnum || ! mcastaddr.s_addr)
usage();
ClientLogInit();
ImageUnzipInit(filename, slice, debug);
ClientNetInit();
PlayFrisbee();
exit(0);
}
/*
* The client receive thread. This thread takes in packets from the server.
*/
void *
ClientRecvThread(void *arg)
{
Packet_t packet, *p = &packet;
if (debug)
log("Receive pthread starting up ...");
IdleCounter = CLIENT_IDLETIMER_COUNT;
while (1) {
/*
* If we go too long without getting a block, we want
* to make another chunk request.
*/
if (PacketReceive(p) < 0) {
pthread_testcancel();
IdleCounter--;
if (! IdleCounter) {
RequestChunk(1);
IdleCounter = CLIENT_IDLETIMER_COUNT;
}
continue;
}
pthread_testcancel();
switch (p->hdr.subtype) {
case PKTSUBTYPE_BLOCK:
GotBlock(p);
break;
case PKTSUBTYPE_JOIN:
case PKTSUBTYPE_LEAVE:
case PKTSUBTYPE_REQUEST:
/* Ignore these. They are from other clients. */
break;
default:
fatal("ClientRecvThread: Bad packet type!");
break;
}
}
}
/*
* The heart of the game.
*/
static void
ChunkerStartup(void)
{
pthread_t child_pid;
void *ignored;
int chunkcount = TotalChunkCount;
int i;
/*
* Allocate the Chunk Buffer and Chunk Bitmap.
*/
if ((ChunkBitmap =
(char *) calloc(chunkcount, sizeof(*ChunkBitmap))) == NULL)
fatal("ChunkBitmap: No more memory");
if ((ChunkRequestList =
(int *) calloc(chunkcount, sizeof(*ChunkRequestList))) == NULL)
fatal("ChunkRequestList: No more memory");
if ((ChunkBuffer =
(ChunkBuffer_t *) malloc(MAXCHUNKBUFS * sizeof(ChunkBuffer_t)))
== NULL)
fatal("ChunkBuffer: No more memory");
/*
* Set all the buffers to "free"
*/
for (i = 0; i < MAXCHUNKBUFS; i++) {
ChunkBuffer[i].inprogress = 0;
ChunkBuffer[i].ready = 0;
}
/*
* We randomize the block selection so that multiple clients
* do not end up getting stalled by each other. That is, if
* all the clients were requesting blocks in order, then all
* the clients would end up waiting until the last client was
* done (since the server processes client requests in FIFO
* order).
*/
for (i = 0; i < TotalChunkCount; i++)
ChunkRequestList[i] = i;
for (i = 0; i < 50 * TotalChunkCount; i++) {
int c1 = random() % TotalChunkCount;
int c2 = random() % TotalChunkCount;
int t1 = ChunkRequestList[c1];
int t2 = ChunkRequestList[c2];
ChunkRequestList[c2] = t1;
ChunkRequestList[c1] = t2;
}
if (pthread_create(&child_pid, NULL,
ClientRecvThread, (void *)0)) {
fatal("Failed to create pthread!");
}
/*
* Loop until all chunks have been received and written to disk.
*/
while (chunkcount) {
/*
* Search the chunk cache for a chunk that is ready to write.
*/
for (i = 0; i < MAXCHUNKBUFS; i++) {
if (ChunkBuffer[i].ready)
break;
}
/*
* If nothing to do, then get out of the way for a while.
*/
if (i == MAXCHUNKBUFS) {
log("No chunks ready to write!");
fsleep(100000);
continue;
}
/*
* We have a completed chunk. Write it to disk.
*/
log("Writing chunk %d (buffer %d)",
ChunkBuffer[i].thischunk, i);
ImageUnzip(i);
/*
* Okay, free the slot up for another chunk.
*/
ChunkBuffer[i].ready = 0;
ChunkBuffer[i].inprogress = 0;
chunkcount--;
}
/*
* Kill the child and wait for it before returning. We do not
* want the child absorbing any more packets, because that would
* mess up the termination handshake with the server.
*/
pthread_cancel(child_pid);
pthread_join(child_pid, &ignored);
}
/*
* Receive a single data block. If the block is for a chunk in progress, then
* insert the data and check for a completed chunk. It will be up to the main
* thread to process that chunk.
*
* If the block is the first of some chunk, then try to allocate a new chunk.
* If the chunk buffer is full, then drop the block. If this happens, it
* indicates the chunk buffer is not big enough, and should be increased.
*/
static void
GotBlock(Packet_t *p)
{
int chunk = p->msg.block.chunk;
int block = p->msg.block.block;
int i, free = -1;
/*
* Search the chunk buffer for a match (or a free one).
*/
for (i = 0; i < MAXCHUNKBUFS; i++) {
if (!ChunkBuffer[i].inprogress) {
free = i;
continue;
}
if (!ChunkBuffer[i].ready &&
ChunkBuffer[i].thischunk == chunk)
break;
}
if (i == MAXCHUNKBUFS) {
/*
* Did not find it. Allocate the free one, or drop the
* packet if there is no free chunk.
*/
if (free == -1) {
if (debug)
log("No more free buffer slots for chunk %d!",
chunk);
return;
}
/*
* Was this chunk already processed?
*/
if (ChunkBitmap[chunk]) {
if (0)
log("Duplicate chunk %d ignored!", chunk);
return;
}
ChunkBitmap[chunk] = 1;
if (debug)
log("Allocating chunk buffer %d to chunk %d",
free, chunk);
i = free;
ChunkBuffer[i].ready = 0;
ChunkBuffer[i].inprogress = 1;
ChunkBuffer[i].thischunk = chunk;
ChunkBuffer[i].blockcount = CHUNKSIZE;
bzero(ChunkBuffer[i].bitmap, sizeof(ChunkBuffer[i].bitmap));
}
/*
* Insert the block and update the metainfo. We have to watch for
* duplicate blocks in the same chunk since another client may
* issue a request for a lost block, and we will see that even if
* we do not need it (because of broadcast/multicast).
*/
if (ChunkBuffer[i].bitmap[block]) {
if (0)
log("Duplicate block %d in chunk %d", block, chunk);
return;
}
ChunkBuffer[i].bitmap[block] = 1;
ChunkBuffer[i].blockcount--;
memcpy(ChunkBuffer[i].blocks[block].data, p->msg.block.buf, BLOCKSIZE);
LastReceiveChunk = chunk;
/*
* Anytime we receive a packet that's needed, reset the idle counter.
* This will prevent us from sending too many requests.
*/
IdleCounter = CLIENT_IDLETIMER_COUNT;
/*
* Is the chunk complete? If so, then release it to the main thread.
*/
if (ChunkBuffer[i].blockcount == 0) {
if (debug)
log("Releasing chunk %d to main thread", chunk);
ChunkBuffer[i].ready = 1;
/*
* Send off a request for a chunk we do not have yet. This
* should be enough to ensure that there is more work to do
* by the time the main thread finishes the chunk we just
* released.
*/
RequestChunk(0);
}
}
/*
* Request a chunk/block/range we do not have.
*/
static void
RequestRange(int chunk, int block, int count)
{
Packet_t packet, *p = &packet;
if (debug)
log("Requesting chunk:%d block:%d count:%d",
chunk, block, count);
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_REQUEST;
p->hdr.datalen = sizeof(p->msg.request);
p->msg.request.chunk = chunk;
p->msg.request.block = block;
p->msg.request.count = count;
PacketSend(p);
}
static void
RequestChunk(int timedout)
{
int i, j, k;
/*
* Look for unfinished chunks.
*/
for (i = 0; i < MAXCHUNKBUFS; i++) {
if (! ChunkBuffer[i].inprogress || ChunkBuffer[i].ready)
continue;
/*
* If we timed out, then it's okay to send in another request
* for the last chunk in progress. The point of this test
* is to prevent requesting a chunk we are currently getting
* packets for. But once we time out, we obviously want to
* get more blocks for it.
*/
if (ChunkBuffer[i].thischunk == LastReceiveChunk && !timedout)
continue;
/*
* Scan the bitmap. We do not want to request single blocks,
* but rather small groups of them.
*/
j = 0;
while (j < CHUNKSIZE) {
/*
* When we find a missing block, scan forward from
* that point till we get to a block that is present.
* Send a request for the intervening range.
*/
if (! ChunkBuffer[i].bitmap[j]) {
for (k = j; k < CHUNKSIZE; k++) {
if (ChunkBuffer[i].bitmap[k])
break;
}
RequestRange(ChunkBuffer[i].thischunk,
j, k - j);
j = k;
}
j++;
}
}
/*
* Make sure we have a place to put new data.
*/
for (i = 0, k = 0; i < MAXCHUNKBUFS; i++) {
if (ChunkBuffer[i].inprogress)
k++;
}
if (MAXCHUNKBUFS - k < MAXREADAHEAD || k >= MAXINPROGRESS)
return;
/*
* Look for a chunk we are not working on yet.
*/
for (i = 0, j = 0; i < TotalChunkCount && j < MAXREADAHEAD; i++) {
int chunk = ChunkRequestList[i];
if (! ChunkBitmap[chunk]) {
/*
* Not working on this one yet, so send off a
* request for it.
*/
RequestRange(chunk, 0, CHUNKSIZE);
j++;
}
}
}
/*
* Join the Frisbee team, and then go into the main loop above.
*/
static void
PlayFrisbee(void)
{
Packet_t packet, *p = &packet;
struct timeval stamp, estamp;
unsigned int myid;
gettimeofday(&stamp, 0);
/*
* Init the random number generator. We randomize the block request
* sequence above, and it's important that each client have a different
* sequence!
*/
srandomdev();
/*
* A random number ID. I do not think this is really necessary,
* but perhaps might be useful for determining when a client has
* crashed and returned.
*/
myid = random();
/*
* Send a join the team message. We block waiting for a reply
* since we need to know the total block count. We resend the
* message (dups are harmless) if we do not get a reply back.
*/
while (1) {
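/* static so the value persists across iterations; otherwise it is
 * reset to 0 on every pass and the join is resent each time. */
static int countdown = 0;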
if (countdown <= 0) {
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_JOIN;
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join.clientid = myid;
PacketSend(p);
countdown = 3;
}
/*
* Throw away any data packets. We cannot start until
* we get a reply back. Wait several receive delays
* before resending the join message.
*/
if (PacketReceive(p) < 0) {
countdown--;
continue;
}
if (p->hdr.subtype == PKTSUBTYPE_JOIN &&
p->hdr.type == PKTTYPE_REPLY) {
break;
}
}
/* blockcount is in blocks; CHUNKSIZE is the number of blocks per chunk. */
TotalChunkCount = p->msg.join.blockcount / CHUNKSIZE;
log("Joined the team. ID is %u. File is %d chunks (%d blocks)",
myid, TotalChunkCount, p->msg.join.blockcount);
ChunkerStartup();
/*
* Done! Handshake our exit with the server.
*/
while (1) {
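/* static so the value persists across iterations; otherwise it is
 * reset to 0 on every pass and the leave is resent each time. */
static int countdown = 0;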
if (countdown <= 0) {
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_LEAVE;
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join.clientid = myid;
PacketSend(p);
countdown = 3;
}
/*
* Throw away any data packets until we get a reply back.
* Wait several receive delays before resending the message.
*/
if (PacketReceive(p) < 0) {
countdown--;
continue;
}
if (p->hdr.subtype == PKTSUBTYPE_LEAVE &&
p->hdr.type == PKTTYPE_REPLY) {
break;
}
}
gettimeofday(&estamp, 0);
estamp.tv_sec -= stamp.tv_sec;
log("Left the team after %ld seconds on the field!", estamp.tv_sec);
}
/*
* Supply a read function to the imageunzip library. Kinda hokey right
* now. The original imageunzip code depends on read to keep track of
* the seek offset within the input file. Well, in our case we know what
* chunk the inflator is working on, and we keep track of the block offset
* as well.
*
* XXX We copy out the data. Need to change this interface to avoid that.
*/
static int ImageUnzipOffset;
static int ImageUnzipBuffer;
extern int inflate_subblock(void);
static int
ImageUnzip(int slot)
{
ImageUnzipBuffer = slot;
ImageUnzipOffset = 0;
if (inflate_subblock())
pfatal(