diff --git a/configure b/configure index fda1d7e11515f2eb582e8761207c6c83278a2140..8a64af5a20bf9ece5df557208c3875b7ead1738f 100755 --- a/configure +++ b/configure @@ -1049,6 +1049,7 @@ outfiles="$outfiles Makeconf GNUmakefile \ ipod/GNUmakefile \ lib/GNUmakefile \ os/GNUmakefile os/split-image.sh os/imagezip/GNUmakefile \ + os/frisbee.redux/GNUmakefile \ pxe/GNUmakefile pxe/proxydhcp.restart pxe/bootinfo.restart \ security/GNUmakefile security/paperbag security/lastlog_daemon \ tbsetup/GNUmakefile tbsetup/console_setup \ diff --git a/configure.in b/configure.in index e2746d64b2b2d331377d3508ab08931a1cb959ec..5b5bdb8dc3cbe47c93117479fef7cc0e6b7c579e 100755 --- a/configure.in +++ b/configure.in @@ -164,6 +164,7 @@ outfiles="$outfiles Makeconf GNUmakefile \ ipod/GNUmakefile \ lib/GNUmakefile \ os/GNUmakefile os/split-image.sh os/imagezip/GNUmakefile \ + os/frisbee.redux/GNUmakefile \ pxe/GNUmakefile pxe/proxydhcp.restart pxe/bootinfo.restart \ security/GNUmakefile security/paperbag security/lastlog_daemon \ tbsetup/GNUmakefile tbsetup/console_setup \ diff --git a/os/frisbee.redux/GNUmakefile.in b/os/frisbee.redux/GNUmakefile.in new file mode 100644 index 0000000000000000000000000000000000000000..d6e25a74d755cfaf81b21ffa322115de43c5ca90 --- /dev/null +++ b/os/frisbee.redux/GNUmakefile.in @@ -0,0 +1,43 @@ +SRCDIR = @srcdir@ +TESTBED_SRCDIR = @top_srcdir@ +OBJDIR = ../.. +SUBDIR = os/frisbee.new + +include $(OBJDIR)/Makeconf + +all: frisbee frisbeed + +include $(TESTBED_SRCDIR)/GNUmakerules + +SHAREDOBJS = log.o network.o utils.o +PTHREADCFLAGS = -D_THREAD_SAFE \ + -I/usr/local/include/pthread/linuxthreads +PTHREADLIBS = -L/usr/local/lib -llthread -llgcc_r + +CLIENTFLAGS = $(CFLAGS) +CLIENTLIBS = ../imagezip/frisbee.o -lz $(PTHREADLIBS) +CLIENTOBJS = client.o $(SHAREDOBJS) + +SERVERFLAGS = $(CFLAGS) +SERVERLIBS = $(PTHREADLIBS) +SERVEROBJS = server.o $(SHAREDOBJS) + +CFLAGS = -O2 -g -Wall -static $(PTHREADCFLAGS) +LDFLAGS = -static + +frisbee: $(CLIENTOBJS) ../imagezip/frisbee.o + $(CC) $(LDFLAGS) $(CLIENTFLAGS) $(CLIENTOBJS) $(CLIENTLIBS) -o frisbee + cp frisbee frisbee.debug + strip frisbee + +frisbeed: $(SERVEROBJS) + $(CC) $(LDFLAGS) $(SERVERFLAGS) $(SERVEROBJS) $(SERVERLIBS) -o frisbeed + cp frisbeed frisbeed.debug + strip frisbeed + +client.o: decls.h log.h +server.o: decls.h log.h +log.o: decls.h log.h + +clean: + /bin/rm -f *.o *.a frisbee frisbeed frisbee.debug frisbeed.debug diff --git a/os/frisbee.redux/client.c b/os/frisbee.redux/client.c new file mode 100644 index 0000000000000000000000000000000000000000..d5eb8a1a0382ae6a71e09e822100b04fed1fda1c --- /dev/null +++ b/os/frisbee.redux/client.c @@ -0,0 +1,610 @@ +/* + * Frisbee client. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "decls.h" + +int debug = 0; +int portnum; +struct in_addr mcastaddr; +struct in_addr mcastif; + +/* Forward Decls */ +static void PlayFrisbee(void); +static void GotBlock(Packet_t *p); +static void RequestChunk(int timedout); +static int ImageUnzip(int chunk); +extern int ImageUnzipInit(char *filename, int slice, int debug); + +/* + * The chunker data structure. For each chunk in progress, we maintain this + * array of blocks (plus meta info). This serves as a cache to receive + * blocks from the server while we write completed chunks to disk. The child + * thread reads packets and updates this cache, while the parent thread + * simply looks for completed blocks and writes them. The "inprogress" slot + * serves a free/allocated flag, while the ready bit indicates that a chunk + * is complete and ready to write to disk. + */ +typedef struct { + int thischunk; /* Which chunk in progress */ + int inprogress; /* Chunk in progress? Free/Allocated */ + int ready; /* Ready to write to disk? */ + int blockcount; /* Number of blocks not received yet */ + char bitmap[CHUNKSIZE]; /* Which blocks have been received */ + struct { + char data[BLOCKSIZE]; + } blocks[CHUNKSIZE]; /* Actual block data */ +} ChunkBuffer_t; +ChunkBuffer_t *ChunkBuffer; /* The cache */ +char *ChunkBitmap; /* Bitmap of which chunks finished */ +int *ChunkRequestList; /* Randomized chunk request order */ +int LastReceiveChunk; /* Chunk of last data packet received*/ +int TotalChunkCount; /* Total number of chunks in file */ +int IdleCounter; /* Countdown to request more data */ + +char *usagestr = + "usage: frisbee [-d] [-s #] <-p #> <-m mcastaddr> \n" + " -d Turn on debugging. Multiple -d options increase output.\n" + " -p portnum Specify a port number.\n" + " -m mcastaddr Specify a multicast address in dotted notation.\n" + " -i mcastif Specify a multicast interface in dotted notation.\n" + " -s slice Output to DOS slice (DOS numbering 1-4)\n" + " NOTE: Must specify a raw disk device for output filename.\n" + "\n"; + +void +usage() +{ + fprintf(stderr, usagestr); + exit(1); +} + +int +main(int argc, char **argv) +{ + int ch, slice = 0; + char *filename; + + while ((ch = getopt(argc, argv, "dhp:m:s:i:")) != -1) + switch(ch) { + case 'd': + debug++; + break; + + case 'p': + portnum = atoi(optarg); + break; + + case 'm': + inet_aton(optarg, &mcastaddr); + break; + + case 'i': + inet_aton(optarg, &mcastif); + break; + + case 's': + slice = atoi(optarg); + break; + + case 'h': + case '?': + default: + usage(); + } + argc -= optind; + argv += optind; + + if (argc != 1) + usage(); + filename = argv[0]; + + if (!portnum || ! mcastaddr.s_addr) + usage(); + + ClientLogInit(); + ImageUnzipInit(filename, slice, debug); + ClientNetInit(); + PlayFrisbee(); + exit(0); +} + +/* + * The client receive thread. This thread takes in packets from the server. + */ +void * +ClientRecvThread(void *arg) +{ + Packet_t packet, *p = &packet; + + if (debug) + log("Receive pthread starting up ..."); + + IdleCounter = CLIENT_IDLETIMER_COUNT; + + while (1) { + /* + * If we go too long without getting a block, we want + * to make another chunk request. + */ + if (PacketReceive(p) < 0) { + pthread_testcancel(); + IdleCounter--; + if (! IdleCounter) { + RequestChunk(1); + IdleCounter = CLIENT_IDLETIMER_COUNT; + } + continue; + } + pthread_testcancel(); + + switch (p->hdr.subtype) { + case PKTSUBTYPE_BLOCK: + GotBlock(p); + break; + + case PKTSUBTYPE_JOIN: + case PKTSUBTYPE_LEAVE: + case PKTSUBTYPE_REQUEST: + /* Ignore these. They are from other clients. */ + break; + + default: + fatal("ClientRecvThread: Bad packet type!"); + break; + } + } +} + +/* + * The heart of the game. + */ +static void +ChunkerStartup(void) +{ + pthread_t child_pid; + void *ignored; + int chunkcount = TotalChunkCount; + int i; + + /* + * Allocate the Chunk Buffer and Chunk Bitmap. + */ + if ((ChunkBitmap = + (char *) calloc(chunkcount, sizeof(*ChunkBitmap))) == NULL) + fatal("ChunkBitmap: No more memory"); + + if ((ChunkRequestList = + (int *) calloc(chunkcount, sizeof(*ChunkRequestList))) == NULL) + fatal("ChunkRequestList: No more memory"); + + if ((ChunkBuffer = + (ChunkBuffer_t *) malloc(MAXCHUNKBUFS * sizeof(ChunkBuffer_t))) + == NULL) + fatal("ChunkBuffer: No more memory"); + + /* + * Set all the buffers to "free" + */ + for (i = 0; i < MAXCHUNKBUFS; i++) { + ChunkBuffer[i].inprogress = 0; + ChunkBuffer[i].ready = 0; + } + + /* + * We randomize the block selection so that multiple clients + * do not end up getting stalled by each other. That is, if + * all the clients were requesting blocks in order, then all + * the clients would end up waiting until the last client was + * done (since the server processes client requests in FIFO + * order). + */ + for (i = 0; i < TotalChunkCount; i++) + ChunkRequestList[i] = i; + + for (i = 0; i < 50 * TotalChunkCount; i++) { + int c1 = random() % TotalChunkCount; + int c2 = random() % TotalChunkCount; + int t1 = ChunkRequestList[c1]; + int t2 = ChunkRequestList[c2]; + + ChunkRequestList[c2] = t1; + ChunkRequestList[c1] = t2; + } + + if (pthread_create(&child_pid, NULL, + ClientRecvThread, (void *)0)) { + fatal("Failed to create pthread!"); + } + + /* + * Loop until all chunks have been received and written to disk. + */ + while (chunkcount) { + /* + * Search the chunk cache for a chunk that is ready to write. + */ + for (i = 0; i < MAXCHUNKBUFS; i++) { + if (ChunkBuffer[i].ready) + break; + } + + /* + * If nothing to do, then get out of the way for a while. + */ + if (i == MAXCHUNKBUFS) { + log("No chunks ready to write!"); + fsleep(100000); + continue; + } + + /* + * We have a completed chunk. Write it to disk. + */ + log("Writing chunk %d (buffer %d)", + ChunkBuffer[i].thischunk, i); + + ImageUnzip(i); + + /* + * Okay, free the slot up for another chunk. + */ + ChunkBuffer[i].ready = 0; + ChunkBuffer[i].inprogress = 0; + chunkcount--; + } + /* + * Kill the child and wait for it before returning. We do not + * want the child absorbing any more packets, cause that would + * mess up the termination handshake with the server. + */ + pthread_cancel(child_pid); + pthread_join(child_pid, &ignored); +} + +/* + * Receive a single data block. If the block is for a chunk in progress, then + * insert the data and check for a completed chunk. It will be up to the main + * thread to process that chunk. + * + * If the block is the first of some chunk, then try to allocate a new chunk. + * If the chunk buffer is full, then drop the block. If this happens, it + * indicates the chunk buffer is not big enough, and should be increased. + */ +static void +GotBlock(Packet_t *p) +{ + int chunk = p->msg.block.chunk; + int block = p->msg.block.block; + int i, free = -1; + + /* + * Search the chunk buffer for a match (or a free one). + */ + for (i = 0; i < MAXCHUNKBUFS; i++) { + if (!ChunkBuffer[i].inprogress) { + free = i; + continue; + } + + if (!ChunkBuffer[i].ready && + ChunkBuffer[i].thischunk == chunk) + break; + } + if (i == MAXCHUNKBUFS) { + /* + * Did not find it. Allocate the free one, or drop the + * packet if there is no free chunk. + */ + if (free == -1) { + if (debug) + log("No more free buffer slots for chunk %d!", + chunk); + return; + } + + /* + * Was this chunk already processed? + */ + if (ChunkBitmap[chunk]) { + if (0) + log("Duplicate chunk %d ignored!", chunk); + return; + } + ChunkBitmap[chunk] = 1; + + if (debug) + log("Allocating chunk buffer %d to chunk %d", + free, chunk); + + i = free; + ChunkBuffer[i].ready = 0; + ChunkBuffer[i].inprogress = 1; + ChunkBuffer[i].thischunk = chunk; + ChunkBuffer[i].blockcount = CHUNKSIZE; + bzero(ChunkBuffer[i].bitmap, sizeof(ChunkBuffer[i].bitmap)); + } + + /* + * Insert the block and update the metainfo. We have to watch for + * duplicate blocks in the same chunk since another client may + * issue a request for a lost block, and we will see that even if + * we do not need it (cause of broadcast/multicast). + */ + if (ChunkBuffer[i].bitmap[block]) { + if (0) + log("Duplicate block %d in chunk %d", block, chunk); + return; + } + ChunkBuffer[i].bitmap[block] = 1; + ChunkBuffer[i].blockcount--; + memcpy(ChunkBuffer[i].blocks[block].data, p->msg.block.buf, BLOCKSIZE); + LastReceiveChunk = chunk; + + /* + * Anytime we receive a packet thats needed, reset the idle counter. + * This will prevent us from sending too many requests. + */ + IdleCounter = CLIENT_IDLETIMER_COUNT; + + /* + * Is the chunk complete? If so, then release it to the main thread. + */ + if (ChunkBuffer[i].blockcount == 0) { + if (debug) + log("Releasing chunk %d to main thread", chunk); + ChunkBuffer[i].ready = 1; + + /* + * Send off a request for a chunk we do not have yet. This + * should be enough to ensure that there is more work to do + * by the time the main thread finishes the chunk we just + * released. + */ + RequestChunk(0); + } +} + +/* + * Request a chunk/block/range we do not have. + */ +static void +RequestRange(int chunk, int block, int count) +{ + Packet_t packet, *p = &packet; + + if (debug) + log("Requesting chunk:%d block:%d count:%d", + chunk, block, count); + + p->hdr.type = PKTTYPE_REQUEST; + p->hdr.subtype = PKTSUBTYPE_REQUEST; + p->hdr.datalen = sizeof(p->msg.request); + p->msg.request.chunk = chunk; + p->msg.request.block = block; + p->msg.request.count = count; + PacketSend(p); +} + +static void +RequestChunk(int timedout) +{ + int i, j, k; + + /* + * Look for unfinished chunks. + */ + for (i = 0; i < MAXCHUNKBUFS; i++) { + if (! ChunkBuffer[i].inprogress || ChunkBuffer[i].ready) + continue; + + /* + * If we timed out, then its okay to send in another request + * for the last chunk in progress. The point of this test + * is to prevent requesting a chunk we are currently getting + * packets for. But once we time out, we obviously want to + * get more blocks for it. + */ + if (ChunkBuffer[i].thischunk == LastReceiveChunk && !timedout) + continue; + + /* + * Scan the bitmap. We do not want to request single blocks, + * but rather small groups of them. + */ + j = 0; + while (j < CHUNKSIZE) { + /* + * When we find a missing block, scan forward from + * that point till we get to a block that is present. + * Send a request for the intervening range. + */ + if (! ChunkBuffer[i].bitmap[j]) { + for (k = j; k < CHUNKSIZE; k++) { + if (ChunkBuffer[i].bitmap[k]) + break; + } + RequestRange(ChunkBuffer[i].thischunk, + j, k - j); + j = k; + } + j++; + } + } + + /* + * Make sure we have a place to put new data. + */ + for (i = 0, k = 0; i < MAXCHUNKBUFS; i++) { + if (ChunkBuffer[i].inprogress) + k++; + } + if (MAXCHUNKBUFS - k < MAXREADAHEAD || k >= MAXINPROGRESS) + return; + + /* + * Look for a chunk we are not working on yet. + */ + for (i = 0, j = 0; i < TotalChunkCount && j < MAXREADAHEAD; i++) { + int chunk = ChunkRequestList[i]; + + if (! ChunkBitmap[chunk]) { + /* + * Not working on this one yet, so send off a + * request for it. + */ + RequestRange(chunk, 0, CHUNKSIZE); + j++; + } + } +} + +/* + * Join the Frisbee team, and then go into the main loop above. + */ +static void +PlayFrisbee(void) +{ + Packet_t packet, *p = &packet; + struct timeval stamp, estamp; + unsigned int myid; + + gettimeofday(&stamp, 0); + + /* + * Init the random number generator. We randomize the block request + * sequence above, and its important that each client have a different + * sequence! + */ + srandomdev(); + + /* + * A random number ID. I do not think this is really necessary, + * but perhaps might be useful for determining when a client has + * crashed and returned. + */ + myid = random(); + + /* + * Send a join the team message. We block waiting for a reply + * since we need to know the total block size. We resend the + * message (dups are harmless) if we do not get a reply back. + */ + while (1) { + int countdown = 0; + + if (countdown <= 0) { + p->hdr.type = PKTTYPE_REQUEST; + p->hdr.subtype = PKTSUBTYPE_JOIN; + p->hdr.datalen = sizeof(p->msg.join); + p->msg.join.clientid = myid; + PacketSend(p); + countdown = 3; + } + + /* + * Throw away any data packets. We cannot start until + * we get a reply back. Wait several receive delays + * before resending the join message. + */ + if (PacketReceive(p) < 0) { + countdown--; + continue; + } + + if (p->hdr.subtype == PKTSUBTYPE_JOIN && + p->hdr.type == PKTTYPE_REPLY) { + break; + } + } + TotalChunkCount = p->msg.join.blockcount / BLOCKSIZE; + + log("Joined the team. ID is %u. File is %d chunks (%d blocks)", + myid, TotalChunkCount, p->msg.join.blockcount); + + ChunkerStartup(); + + /* + * Done! Handshake our exit with the server. + */ + while (1) { + int countdown = 0; + + if (countdown <= 0) { + p->hdr.type = PKTTYPE_REQUEST; + p->hdr.subtype = PKTSUBTYPE_LEAVE; + p->hdr.datalen = sizeof(p->msg.join); + p->msg.join.clientid = myid; + PacketSend(p); + countdown = 3; + } + + /* + * Throw away any data packets until we get a reply back. + * Wait several receive delays before resending the message. + */ + if (PacketReceive(p) < 0) { + countdown--; + continue; + } + + if (p->hdr.subtype == PKTSUBTYPE_LEAVE && + p->hdr.type == PKTTYPE_REPLY) { + break; + } + } + gettimeofday(&estamp, 0); + estamp.tv_sec -= stamp.tv_sec; + log("Left the team after %ld seconds on the field!", estamp.tv_sec); +} + +/* + * Supply a read function to the imageunzip library. Kinda hokey right + * now. The original imageunzip code depends on read to keep track of + * the seek offset within the input file. Well, in our case we know what + * chunk the inflator is working on, and we keep track of the block offset + * as well. + * + * XXX We copy out the data. Need to change this interface to avoid that. + */ +static int ImageUnzipOffset; +static int ImageUnzipBuffer; +extern int inflate_subblock(void); + +static int +ImageUnzip(int slot) +{ + ImageUnzipBuffer = slot; + ImageUnzipOffset = 0; + + if (inflate_subblock()) + pfatal("ImageUnzip: inflate_subblock failed"); + + return 0; +} + +int +FrisbeeRead(void **buf, size_t count) +{ + char *data = (char *) &ChunkBuffer[ImageUnzipBuffer].blocks; + + data += ImageUnzipOffset; +#if 1 + *buf = data; +#else + memcpy(buf, data, count); +#endif + ImageUnzipOffset += count; + return count; +} diff --git a/os/frisbee.redux/decls.h b/os/frisbee.redux/decls.h new file mode 100644 index 0000000000000000000000000000000000000000..dfb36fcd5c8b334617e6970357e82d0d4642f14a --- /dev/null +++ b/os/frisbee.redux/decls.h @@ -0,0 +1,125 @@ +/* + * Shared for defintions for frisbee client/server code. + */ + +#include "log.h" + +/* + * Max number of clients we can support at once. Not likely to be an issue + * since the amount of per client state is very little. + */ +#define MAXCLIENTS 1000 + +/* + * We operate in terms of this blocksize (in bytes). + */ +#define BLOCKSIZE 1024 + +/* + * Each chunk is this many blocks. + */ +#define CHUNKSIZE 1024 + +/* + * The number of chunk buffers in the client. + */ +#define MAXCHUNKBUFS 16 + +/* + * The number of read-ahead chunks that the client will request + * at a time. No point in requesting to far ahead either, since they + * are uncompressed/written at a fraction of the network transfer speed. + * Also, with multiple clients at different stages, each requesting blocks + * it is likely that there will be plenty more chunks ready or in progress. + */ +#define MAXREADAHEAD 2 +#define MAXINPROGRESS 4 + +/* + * Timeout (in usecs) for packet receive. The idletimer number is how + * many PKT timeouts we allow before requesting more data from the server. + * That is, if we go TIMEOUT usecs without getting a packet, then ask for + * more (or on the server side, poll the clients). On the server, side + * use a timeout to check for dead clients. We want that to be longish. + */ +#define PKTRCV_TIMEOUT 30000 +#define CLIENT_IDLETIMER_COUNT 1 +#define SERVER_IDLETIMER_COUNT ((300 * 1000000) / PKTRCV_TIMEOUT) + +/* + * Timeout (in seconds!) server will hang around with no active clients. + */ +#define SERVER_INACTIVE_SECONDS 30 + +/* + * The number of disk read blocks in a single read on the server. + * Must be an even divisor of CHUNKSIZE. + */ +#define MAXREADBLOCKS 32 + +/* + * Packet defs. + */ +typedef struct { + struct { + int type; + int subtype; + int datalen; /* Useful amount of data in packet */ + unsigned int srcip; /* Filled in by network level. */ + } hdr; + union { + /* + * Join/leave the Team. Send a randomized ID, and receive + * the number of blocks in the file. + */ + union { + unsigned int clientid; + int blockcount; + } join; + + /* + * A data block, indexed by chunk,block. + */ + struct { + int chunk; + int block; + char buf[BLOCKSIZE]; + } block; + + /* + * A request for a data block, indexed by chunk,block. + */ + struct { + int chunk; + int block; + int count; /* Number of blocks */ + } request; + } msg; +} Packet_t; +#define PKTTYPE_REQUEST 1 +#define PKTTYPE_REPLY 2 + +#define PKTSUBTYPE_JOIN 1 +#define PKTSUBTYPE_LEAVE 2 +#define PKTSUBTYPE_BLOCK 3 +#define PKTSUBTYPE_REQUEST 4 + +/* + * Protos. + */ +int ClientNetInit(void); +int ServerNetInit(void); +int PacketReceive(Packet_t *p); +int PacketSend(Packet_t *p); +int PacketReply(Packet_t *p); +char *CurrentTimeString(void); +int fsleep(unsigned int usecs); + +/* + * Globals + */ +extern int debug; +extern int portnum; +extern struct in_addr mcastaddr; +extern struct in_addr mcastif; +extern char *filename; diff --git a/os/frisbee.redux/log.c b/os/frisbee.redux/log.c new file mode 100644 index 0000000000000000000000000000000000000000..d1678c7c3956e324dfa2aa93e9b6a41954234583 --- /dev/null +++ b/os/frisbee.redux/log.c @@ -0,0 +1,116 @@ +/* + * Logging and debug routines. + */ + +#include +#include +#include +#include +#include +#include +#include +#include "decls.h" + +static int usesyslog = 1; + +/* + * There is really no point in the client using syslog, but its nice + * to use the same log functions either way. + */ +int +ClientLogInit(void) +{ + usesyslog = 0; + return 0; +} + +int +ServerLogInit(void) +{ + if (debug) { + usesyslog = 0; + return 1; + } + + openlog("frisbee", LOG_PID, LOG_USER); + + return 0; +} + +void +log(const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + if (!usesyslog) { + vfprintf(stderr, fmt, args); + fputc('\n', stderr); + fflush(stderr); + } + else + vsyslog(LOG_INFO, fmt, args); + + va_end(args); +} + +void +warning(const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + if (!usesyslog) { + vfprintf(stderr, fmt, args); + fputc('\n', stderr); + fflush(stderr); + } + else + vsyslog(LOG_WARNING, fmt, args); + + va_end(args); +} + +void +fatal(const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + if (!usesyslog) { + vfprintf(stderr, fmt, args); + fputc('\n', stderr); + fflush(stderr); + } + else + vsyslog(LOG_ERR, fmt, args); + + va_end(args); + exit(-1); +} + +void +pwarning(const char *fmt, ...) +{ + va_list args; + char buf[BUFSIZ]; + + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + va_end(args); + + warning("%s : %s", buf, strerror(errno)); +} + +void +pfatal(const char *fmt, ...) +{ + va_list args; + char buf[BUFSIZ]; + + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + va_end(args); + + fatal("%s : %s", buf, strerror(errno)); +} diff --git a/os/frisbee.redux/log.h b/os/frisbee.redux/log.h new file mode 100644 index 0000000000000000000000000000000000000000..74974ff2412344ea77e4579b3ff01d1c78daf2a0 --- /dev/null +++ b/os/frisbee.redux/log.h @@ -0,0 +1,12 @@ +/* + * Log defs. + */ +#include + +int ClientLogInit(void); +int ServerLogInit(void); +void log(const char *fmt, ...); +void warning(const char *fmt, ...); +void fatal(const char *fmt, ...); +void pwarning(const char *fmt, ...); +void pfatal(const char *fmt, ...); diff --git a/os/frisbee.redux/network.c b/os/frisbee.redux/network.c new file mode 100644 index 0000000000000000000000000000000000000000..398794c902e189f8594f50c10f50f035461610ba --- /dev/null +++ b/os/frisbee.redux/network.c @@ -0,0 +1,243 @@ +/* + * Network routines. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "decls.h" + +/* Max number of times to attempt bind to port before failing. */ +#define MAXBINDATTEMPTS 10 + +/* Max number of hops multicast hops. */ +#define MCAST_TTL 5 + +static int sock; +static struct in_addr ipaddr; + +static void +CommonInit(void) +{ + struct sockaddr_in name; + struct timeval timeout; + int i; + char buf[BUFSIZ]; + struct hostent *he; + + if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) + pfatal("Could not allocate a socket"); + + i = (128 * 1024); + setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &i, sizeof(i)); + + i = (128 * 1024); + setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &i, sizeof(i)); + + name.sin_family = AF_INET; + name.sin_port = htons(portnum); + name.sin_addr.s_addr = htonl(INADDR_ANY); + + i = MAXBINDATTEMPTS; + while (i) { + if (bind(sock, (struct sockaddr *)&name, sizeof(name)) == 0) + break; + + pwarning("Bind to port %d failed. Will try %d more times!", + portnum, i); + i--; + sleep(5); + } + log("Bound to port %d", portnum); + + /* + * At present, we use a multicast address in both directions. + */ + if ((ntohl(mcastaddr.s_addr) >> 28) == 14) { + unsigned int loop = 0, ttl = MCAST_TTL; + struct ip_mreq mreq; + + log("Using Multicast"); + + mreq.imr_multiaddr.s_addr = mcastaddr.s_addr; + + if (mcastif.s_addr) + mreq.imr_interface.s_addr = mcastif.s_addr; + else + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + + if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof(mreq)) < 0) + pfatal("setsockopt(IPPROTO_IP, IP_ADD_MEMBERSHIP)"); + + if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, + &ttl, sizeof(ttl)) < 0) + pfatal("setsockopt(IPPROTO_IP, IP_MULTICAST_TTL)"); + + /* Disable local echo */ + if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_LOOP, + &loop, sizeof(loop)) < 0) + pfatal("setsockopt(IPPROTO_IP, IP_MULTICAST_LOOP)"); + + if (mcastif.s_addr && + setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF, + &mcastif, sizeof(mcastif)) < 0) { + pfatal("setsockopt(IPPROTO_IP, IP_MULTICAST_IF)"); + } + } + else { + /* + * Otherwise, we use a broadcast addr. + */ + i = 1; + + if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, + &i, sizeof(i)) < 0) + pfatal("setsockopt(SOL_SOCKET, SO_BROADCAST)"); + } + + /* + * We use a socket level timeout instead of polling for data. + */ + timeout.tv_sec = 0; + timeout.tv_usec = PKTRCV_TIMEOUT; + + if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, + &timeout, sizeof(timeout)) < 0) + pfatal("setsockopt(SOL_SOCKET, SO_RCVTIMEO)"); + + /* + * We add our (unicast) IP addr to every outgoing message. + * This is going to be used to return replies to the sender, + * where appropriate. + */ + if (gethostname(buf, sizeof(buf)) < 0) + pfatal("gethostname failed"); + + if ((he = gethostbyname(buf)) == 0) + fatal("gethostbyname: %s", hstrerror(h_errno)); + + memcpy((char *)&ipaddr, he->h_addr, sizeof(ipaddr)); +} + +int +ClientNetInit(void) +{ + CommonInit(); + + return 1; +} + +int +ServerNetInit(void) +{ + CommonInit(); + + return 1; +} + +/* + * Look for a packet on the socket. Propogate the errors back to the caller + * exactly as the system call does. Remember that we set up a socket timeout + * above, so we will get EWOULDBLOCK errors when no data is available. + * + * The amount of data received is determined from the datalen of the hdr. + * All packets are actually the same size/structure. + */ +int +PacketReceive(Packet_t *p) +{ + struct sockaddr_in from; + int mlen, alen = sizeof(from); + + bzero(&from, sizeof(from)); + + if ((mlen = recvfrom(sock, p, sizeof(*p), 0, + (struct sockaddr *)&from, &alen)) < 0) { + if (errno == EWOULDBLOCK) + return -1; + pfatal("PacketReceive(recvfrom)"); + } + if (mlen != sizeof(p->hdr) + p->hdr.datalen) + fatal("PacketReceive: Bad message length %d!=%d", + mlen, p->hdr.datalen); + + return p->hdr.datalen; +} + +/* + * We use blocking sends since there is no point in giving up. All packets + * go to the same place, whether client or server. + * + * The amount of data sent is determined from the datalen of the packet hdr. + * All packets are actually the same size/structure. + */ +int +PacketSend(Packet_t *p) +{ + struct sockaddr_in to; + int len; + + len = sizeof(p->hdr) + p->hdr.datalen; + p->hdr.srcip = ipaddr.s_addr; + + to.sin_family = AF_INET; + to.sin_port = htons(portnum); + to.sin_addr.s_addr = mcastaddr.s_addr; + + while (sendto(sock, (void *)p, len, 0, + (struct sockaddr *)&to, sizeof(to)) < 0) { + if (errno != ENOBUFS) + pfatal("PacketSend(sendto)"); + + /* + * ENOBUFS means we ran out of mbufs. Okay to sleep a bit + * to let things drain. + */ + fsleep(10000); + } + + return p->hdr.datalen; +} + +/* + * Basically the same as above, but instead of sending to the multicast + * group, send to the (unicast) IP in the packet header. This simplifies + * the logic in a number of places, by avoiding having to deal with + * multicast packets that are not destined for us, but for someone else. + */ +int +PacketReply(Packet_t *p) +{ + struct sockaddr_in to; + int len; + + len = sizeof(p->hdr) + p->hdr.datalen; + + to.sin_family = AF_INET; + to.sin_port = htons(portnum); + to.sin_addr.s_addr = p->hdr.srcip; + p->hdr.srcip = ipaddr.s_addr; + + while (sendto(sock, (void *)p, len, 0, + (struct sockaddr *)&to, sizeof(to)) < 0) { + if (errno != ENOBUFS) + pfatal("PacketSend(sendto)"); + + /* + * ENOBUFS means we ran out of mbufs. Okay to sleep a bit + * to let things drain. + */ + fsleep(10000); + } + + return p->hdr.datalen; +} diff --git a/os/frisbee.redux/queue.h b/os/frisbee.redux/queue.h new file mode 100644 index 0000000000000000000000000000000000000000..a1e961113ca4c0f7ff80fc120d3da0299062f1ce --- /dev/null +++ b/os/frisbee.redux/queue.h @@ -0,0 +1,377 @@ +/* + * Mach Operating System + * Copyright (c) 1991,1990,1989,1988,1987 Carnegie Mellon University + * All Rights Reserved. + * + * Permission to use, copy, modify and distribute this software and its + * documentation is hereby granted, provided that both the copyright + * notice and this permission notice appear in all copies of the + * software, derivative works or modified versions, and any portions + * thereof, and that both notices appear in supporting documentation. + * + * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" + * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR + * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. + * + * Carnegie Mellon requests users of this software to return to + * + * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU + * School of Computer Science + * Carnegie Mellon University + * Pittsburgh PA 15213-3890 + * + * any improvements or extensions that they make and grant Carnegie Mellon rights + * to redistribute these changes. + */ +/* + * File: queue.h + * Author: Avadis Tevanian, Jr. + * Date: 1985 + * + * Type definitions for generic queues. + * + */ + +#ifndef _KERN_QUEUE_H_ +#define _KERN_QUEUE_H_ + +/* + * Queue of abstract objects. Queue is maintained + * within that object. + * + * Supports fast removal from within the queue. + * + * How to declare a queue of elements of type "foo_t": + * In the "*foo_t" type, you must have a field of + * type "queue_chain_t" to hold together this queue. + * There may be more than one chain through a + * "foo_t", for use by different queues. + * + * Declare the queue as a "queue_t" type. + * + * Elements of the queue (of type "foo_t", that is) + * are referred to by reference, and cast to type + * "queue_entry_t" within this module. + */ + +/* + * A generic doubly-linked list (queue). + */ + +struct queue_entry { + struct queue_entry *next; /* next element */ + struct queue_entry *prev; /* previous element */ +}; + +typedef struct queue_entry *queue_t; +typedef struct queue_entry queue_head_t; +typedef struct queue_entry queue_chain_t; +typedef struct queue_entry *queue_entry_t; + +/* + * Macro: queue_init + * Function: + * Initialize the given queue. + * Header: + * void queue_init(q) + * queue_t q; *MODIFIED* + */ +#define queue_init(q) ((q)->next = (q)->prev = q) + +/* + * Macro: queue_first + * Function: + * Returns the first entry in the queue, + * Header: + * queue_entry_t queue_first(q) + * queue_t q; *IN* + */ +#define queue_first(q) ((q)->next) + +/* + * Macro: queue_next + * Function: + * Returns the entry after an item in the queue. + * Header: + * queue_entry_t queue_next(qc) + * queue_t qc; + */ +#define queue_next(qc) ((qc)->next) + +/* + * Macro: queue_last + * Function: + * Returns the last entry in the queue. + * Header: + * queue_entry_t queue_last(q) + * queue_t q; *IN* + */ +#define queue_last(q) ((q)->prev) + +/* + * Macro: queue_prev + * Function: + * Returns the entry before an item in the queue. + * Header: + * queue_entry_t queue_prev(qc) + * queue_t qc; + */ +#define queue_prev(qc) ((qc)->prev) + +/* + * Macro: queue_end + * Function: + * Tests whether a new entry is really the end of + * the queue. + * Header: + * boolean_t queue_end(q, qe) + * queue_t q; + * queue_entry_t qe; + */ +#define queue_end(q, qe) ((q) == (qe)) + +/* + * Macro: queue_empty + * Function: + * Tests whether a queue is empty. + * Header: + * boolean_t queue_empty(q) + * queue_t q; + */ +#define queue_empty(q) queue_end((q), queue_first(q)) + + +/*----------------------------------------------------------------*/ +/* + * Macros that operate on generic structures. The queue + * chain may be at any location within the structure, and there + * may be more than one chain. + */ + +/* + * Macro: queue_enter + * Function: + * Insert a new element at the tail of the queue. + * Header: + * void queue_enter(q, elt, type, field) + * queue_t q; + * elt; + * is what's in our queue + * is the chain field in (*) + */ +#define queue_enter(head, elt, type, field) \ +{ \ + register queue_entry_t prev; \ + \ + prev = (head)->prev; \ + if ((head) == prev) { \ + (head)->next = (queue_entry_t) (elt); \ + } \ + else { \ + ((type)prev)->field.next = (queue_entry_t)(elt);\ + } \ + (elt)->field.prev = prev; \ + (elt)->field.next = head; \ + (head)->prev = (queue_entry_t) elt; \ +} + +/* + * Macro: queue_enter_first + * Function: + * Insert a new element at the head of the queue. + * Header: + * void queue_enter_first(q, elt, type, field) + * queue_t q; + * elt; + * is what's in our queue + * is the chain field in (*) + */ +#define queue_enter_first(head, elt, type, field) \ +{ \ + register queue_entry_t next; \ + \ + next = (head)->next; \ + if ((head) == next) { \ + (head)->prev = (queue_entry_t) (elt); \ + } \ + else { \ + ((type)next)->field.prev = (queue_entry_t)(elt);\ + } \ + (elt)->field.next = next; \ + (elt)->field.prev = head; \ + (head)->next = (queue_entry_t) elt; \ +} + +/* + * Macro: queue_enter_before + * Function: + * Insert a new element before the indicated element. + * Header: + * void queue_enter_before(q, nelt, elt, type, field) + * queue_t q; + * nelt, elt; + * is what's in our queue + * is the chain field in (*) + */ +#define queue_enter_before(head, nelt, elt, type, field) \ +{ \ + register queue_entry_t prev; \ + \ + (elt)->field.next = (queue_entry_t)(nelt); \ + if ((head) == (queue_entry_t)(nelt)) { \ + (elt)->field.prev = (head)->prev; \ + prev = (head)->prev; \ + (head)->prev = (queue_entry_t)(elt); \ + } else { \ + (elt)->field.prev = (nelt)->field.prev; \ + prev = (nelt)->field.prev; \ + (nelt)->field.prev = (queue_entry_t)(elt); \ + } \ + if ((head) == prev) \ + (head)->next = (queue_entry_t)(elt); \ + else \ + ((type)prev)->field.next = (queue_entry_t)(elt);\ +} + +/* + * Macro: queue_enter_after + * Function: + * Insert a new element after the indicated element. + * Header: + * void queue_enter_after(q, pelt, elt, type, field) + * queue_t q; + * pelt, elt; + * is what's in our queue + * is the chain field in (*) + */ +#define queue_enter_after(head, pelt, elt, type, field) \ +{ \ + register queue_entry_t next; \ + \ + (elt)->field.prev = (queue_entry_t)(pelt); \ + if ((head) == (queue_entry_t)(pelt)) { \ + (elt)->field.next = (head)->next; \ + next = (head)->next; \ + (head)->next = (queue_entry_t)(elt); \ + } else { \ + (elt)->field.next = (pelt)->field.next; \ + next = (pelt)->field.next; \ + (pelt)->field.next = (queue_entry_t)(elt); \ + } \ + if ((head) == next) \ + (head)->prev = (queue_entry_t)(elt); \ + else \ + ((type)next)->field.prev = (queue_entry_t)(elt);\ +} + +/* + * Macro: queue_field [internal use only] + * Function: + * Find the queue_chain_t (or queue_t) for the + * given element (thing) in the given queue (head) + */ +#define queue_field(head, thing, type, field) \ + (((head) == (thing)) ? (head) : &((type)(thing))->field) + +/* + * Macro: queue_remove + * Function: + * Remove an arbitrary item from the queue. + * Header: + * void queue_remove(q, qe, type, field) + * arguments as in queue_enter + */ +#define queue_remove(head, elt, type, field) \ +{ \ + register queue_entry_t next, prev; \ + \ + next = (elt)->field.next; \ + prev = (elt)->field.prev; \ + \ + if ((head) == next) \ + (head)->prev = prev; \ + else \ + ((type)next)->field.prev = prev; \ + \ + if ((head) == prev) \ + (head)->next = next; \ + else \ + ((type)prev)->field.next = next; \ +} + +/* + * Macro: queue_remove_first + * Function: + * Remove and return the entry at the head of + * the queue. + * Header: + * queue_remove_first(head, entry, type, field) + * entry is returned by reference + */ +#define queue_remove_first(head, entry, type, field) \ +{ \ + register queue_entry_t next; \ + \ + (entry) = (type) ((head)->next); \ + next = (entry)->field.next; \ + \ + if ((head) == next) \ + (head)->prev = (head); \ + else \ + ((type)(next))->field.prev = (head); \ + (head)->next = next; \ +} + +/* + * Macro: queue_remove_last + * Function: + * Remove and return the entry at the tail of + * the queue. + * Header: + * queue_remove_last(head, entry, type, field) + * entry is returned by reference + */ +#define queue_remove_last(head, entry, type, field) \ +{ \ + register queue_entry_t prev; \ + \ + (entry) = (type) ((head)->prev); \ + prev = (entry)->field.prev; \ + \ + if ((head) == prev) \ + (head)->next = (head); \ + else \ + ((type)(prev))->field.next = (head); \ + (head)->prev = prev; \ +} + +/* + * Macro: queue_assign + */ +#define queue_assign(to, from, type, field) \ +{ \ + ((type)((from)->prev))->field.next = (to); \ + ((type)((from)->next))->field.prev = (to); \ + *to = *from; \ +} + +/* + * Macro: queue_iterate + * Function: + * iterate over each item in the queue. + * Generates a 'for' loop, setting elt to + * each item in turn (by reference). + * Header: + * queue_iterate(q, elt, type, field) + * queue_t q; + * elt; + * is what's in our queue + * is the chain field in (*) + */ +#define queue_iterate(head, elt, type, field) \ + for ((elt) = (type) queue_first(head); \ + !queue_end((head), (queue_entry_t)(elt)); \ + (elt) = (type) queue_next(&(elt)->field)) + + +#endif /* _KERN_QUEUE_H_ */ diff --git a/os/frisbee.redux/server.c b/os/frisbee.redux/server.c new file mode 100644 index 0000000000000000000000000000000000000000..f86545fb7ba7f36b099af1adb11ef9e3a043ae32 --- /dev/null +++ b/os/frisbee.redux/server.c @@ -0,0 +1,561 @@ +/* + * Frisbee server + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "decls.h" +#include "queue.h" + +/* Globals */ +int debug = 0; +int portnum; +struct in_addr mcastaddr; +struct in_addr mcastif; +char *filename; +struct timeval InactivityTimeStamp; +long long TimeCounter; + +/* + * This structure defines the file we are spitting back. + */ +struct FileInfo { + int fd; /* Open file descriptor */ + int blocks; /* Number of BLOCKSIZE blocks */ + int chunks; /* Number of CHUNKSIZE chunks */ + struct { + struct in_addr ipaddr; /* Client Network Address */ + unsigned int id; /* Random ID. */ + long long lastcontact; /* Last time heard from */ + } clients[MAXCLIENTS]; + int numclients; /* Current count of clients */ +}; +static struct FileInfo FileInfo; + +/* + * The work queue of regions a client has requested. + */ +typedef struct { + queue_chain_t chain; + int chunk; /* Which chunk */ + int block; /* Which starting block */ + int count; /* How many blocks */ +} WQelem_t; +static queue_head_t WorkQ; +static pthread_mutex_t WorkQLock; +static pthread_cond_t WorkQCond; + +/* + * Work queue routines. The work queue is a time ordered list of chunk/blocks + * pieces that a client is missing. When a request comes in, lock the list + * and scan it for an existing work item that covers the new request. The new + * request can be dropped if there already exists a Q item, since the client + * is going to see that piece eventually. + * + * We use a spinlock to guard the work queue, which incidentally will protect + * malloc/free. + * + * XXX - Clients make requests for chunk/block pieces they are + * missing. For now, map that into an entire chunk and add it to the + * work queue. This is going to result in a lot more data being sent + * than is needed by the client, but lets wait and see if that + * matters. + */ +static void +WorkQueueInit(void) +{ + pthread_mutex_init(&WorkQLock, NULL); + pthread_cond_init(&WorkQCond, NULL); + queue_init(&WorkQ); +} + +static int +WorkQueueEnqueue(int chunk, int block, int blockcount) +{ + WQelem_t *wqel; + + pthread_mutex_lock(&WorkQLock); + + queue_iterate(&WorkQ, wqel, WQelem_t *, chain) { + if (wqel->chunk == chunk && + wqel->block == block && + wqel->count == blockcount) { + pthread_mutex_unlock(&WorkQLock); + return 0; + } + } + + if ((wqel = (WQelem_t *) calloc(1, sizeof(WQelem_t))) == NULL) + fatal("WorkQueueEnqueue: No more memory"); + + wqel->chunk = chunk; + wqel->block = block; + wqel->count = blockcount; + queue_enter(&WorkQ, wqel, WQelem_t *, chain); + +/* pthread_cond_signal(&WorkQCond); */ + pthread_mutex_unlock(&WorkQLock); + return 1; +} + +static int +WorkQueueDequeue(int *chunk, int *block, int *blockcount) +{ + WQelem_t *wqel; + + pthread_mutex_lock(&WorkQLock); +#if 0 + while (queue_empty(&WorkQ)) { + struct timeval tval; + struct timespec tspec; + int foo; + + if (gettimeofday(&tval, NULL) < 0) { + pwarning("gettimeofday"); + pthread_mutex_unlock(&WorkQLock); + return 0; + } + TIMEVAL_TO_TIMESPEC(&tval, &tspec); + tspec.tv_sec += 1; + + foo = pthread_cond_timedwait(&WorkQCond, &WorkQLock, &tspec); + + if (foo == ETIMEDOUT) { + pthread_mutex_unlock(&WorkQLock); + return 0; + } + } +#else + if (queue_empty(&WorkQ)) { + pthread_mutex_unlock(&WorkQLock); + fsleep(5000); + return 0; + } +#endif + queue_remove_first(&WorkQ, wqel, WQelem_t *, chain); + *chunk = wqel->chunk; + *block = wqel->block; + *blockcount = wqel->count; + free(wqel); + + pthread_mutex_unlock(&WorkQLock); + return 1; +} + +/* + * A client joins. This message is information in the sense that we use it + * to determine if any clients are listening. As long as there are, we + * continue to throw out chunks. We allow for duplicates from a host, which + * might occur if the reply was lost or if the host crashed and restarted. + * If a client crashes and never returns, we will find out about it later, + * when we poll it to see if its alive. + */ +static void +ClientJoin(Packet_t *p) +{ + struct in_addr ipaddr = { p->hdr.srcip }; + int i; + + /* + * Check to see if this is a duplicate. I do not think this matters + * but lets record the info anyway. + */ + for (i = 0; i < MAXCLIENTS; i++) { + if (FileInfo.clients[i].ipaddr.s_addr == ipaddr.s_addr) { + log("Duplicate join request from %s at %s!", + inet_ntoa(ipaddr), CurrentTimeString()); + break; + } + } + + if (i == MAXCLIENTS) { + /* + * Okay, not a current subscriber. Record the info in a free + * client slot. + */ + for (i = 0; i < FileInfo.numclients; i++) { + if (! FileInfo.clients[i].ipaddr.s_addr) + break; + } + if (i == MAXCLIENTS) + fatal("Too many subscribers!"); + + FileInfo.clients[i].lastcontact = TimeCounter; + FileInfo.clients[i].id = p->msg.join.clientid; + FileInfo.clients[i].ipaddr.s_addr = ipaddr.s_addr; + FileInfo.numclients++; + } + log("Client %s (id %u) joins at %s!", + inet_ntoa(ipaddr), p->msg.join.clientid, CurrentTimeString()); + + /* + * Return fileinfo even for duplicates since that could mean that + * the last reply got lost. Duplicate replies are harmless. + */ + p->hdr.type = PKTTYPE_REPLY; + p->hdr.datalen = sizeof(p->msg.join); + p->msg.join.blockcount = FileInfo.blocks; + PacketReply(p); + + /* + * Whenever a client joins, add the first chunk to the work queue. + */ + WorkQueueEnqueue(0, 0, CHUNKSIZE); +} + +/* + * A client leaves. Not much to it. We return a reply. + */ +static void +ClientLeave(Packet_t *p) +{ + struct in_addr ipaddr = { p->hdr.srcip }; + int i; + + /* + * Check to see if this is a valid leave message. + */ + for (i = 0; i < MAXCLIENTS; i++) { + if (FileInfo.clients[i].ipaddr.s_addr == ipaddr.s_addr) { + if (FileInfo.clients[i].id != p->msg.join.clientid) { + log("ClientLeave: Bad ID for %s. ID %u!=%u!", + inet_ntoa(ipaddr), p->msg.join.clientid, + FileInfo.clients[i].id); + } + break; + } + } + if (i == MAXCLIENTS) { + log("ClientLeave: No such client %s (id %u) connected!", + inet_ntoa(ipaddr), p->msg.join.clientid); + return; + } + + FileInfo.clients[i].ipaddr.s_addr = 0; + FileInfo.clients[i].id = 0; + FileInfo.numclients--; + + p->hdr.type = PKTTYPE_REPLY; + p->hdr.datalen = sizeof(p->msg.join); + PacketReply(p); + + /* + * Whenever the count goes to zero, set the idletimer. We use this + * to dump out of the program. + */ + if (! FileInfo.numclients) + gettimeofday(&InactivityTimeStamp, 0); +} + +/* + * Check for dead clients. + */ +static void +CheckClients(void) +{ + int i, oldcount = FileInfo.numclients; + + for (i = 0; i < MAXCLIENTS; i++) { + if (FileInfo.clients[i].ipaddr.s_addr && + (TimeCounter - FileInfo.clients[i].lastcontact) > + SERVER_IDLETIMER_COUNT) { + log("Client %s (id %u) idle for too long.", + inet_ntoa(FileInfo.clients[i].ipaddr), + FileInfo.clients[i].id); + + FileInfo.clients[i].ipaddr.s_addr = 0; + FileInfo.clients[i].id = 0; + FileInfo.numclients--; + } + } + + /* + * Whenever the count goes to zero, set the idletimer. We use this + * to dump out of the program. + */ + if (oldcount && ! FileInfo.numclients) + gettimeofday(&InactivityTimeStamp, 0); +} + +/* + * A client requests a chunk/block. Add to the workqueue, but do not + * send a reply. The client will make a new request later if the packet + * got lost. + */ +static void +ClientRequest(Packet_t *p) +{ + struct in_addr ipaddr = { p->hdr.srcip }; + int chunk = p->msg.request.chunk; + int block = p->msg.request.block; + int count = p->msg.request.count; + int enqueued, i; + + if (block + count > CHUNKSIZE) + fatal("Bad request from %s - chunk:%d block:%d size:%d", + inet_ntoa(ipaddr), chunk, block, count); + + enqueued = WorkQueueEnqueue(chunk, block, count); + + if (debug) { + log("Client %s requests chunk:%d block:%d size:%d new:%d", + inet_ntoa(ipaddr), chunk, block, count, enqueued); + } + + /* + * Set the lastcontact for this client. This serves as a crude + * mechansim for determining when a client crashes. If an existing + * client goes for too long without being heard from, we drop it + * from the client list so that we can idle (and then exit if + * we go inactive for too long). + */ + for (i = 0; i < MAXCLIENTS; i++) { + if (FileInfo.clients[i].ipaddr.s_addr == ipaddr.s_addr) { + FileInfo.clients[i].lastcontact = TimeCounter; + } + } +} + +/* + * The server receive thread. This thread does nothing more than receive + * request packets from the clients, and add to the work queue. + */ +void * +ServerRecvThread(void *arg) +{ + Packet_t packet, *p = &packet; + + if (debug) + log("Server pthread starting up ..."); + + while (1) { + if (PacketReceive(p) < 0) { + TimeCounter++; + continue; + } + + switch (p->hdr.subtype) { + case PKTSUBTYPE_JOIN: + ClientJoin(p); + break; + case PKTSUBTYPE_LEAVE: + ClientLeave(p); + break; + case PKTSUBTYPE_REQUEST: + ClientRequest(p); + break; + default: + fatal("ServerRecvThread: Bad packet type!"); + break; + } + } +} + +/* + * The main thread spits out blocks. + * + * NOTES: Perhaps use readv into a vector of packet buffers? + */ +static void +PlayFrisbee(void) +{ + int chunk, block, blockcount, cc, j; + int startblock, lastblock, throttle; + Packet_t packet, *p = &packet; + static char databuf[MAXREADBLOCKS * BLOCKSIZE]; + off_t offset; + + while (1) { + /* + * We want to exit the program if no clients connect for a + * long time. + */ + if (! FileInfo.numclients) { + struct timeval estamp; + + gettimeofday(&estamp, 0); + + if ((estamp.tv_sec - InactivityTimeStamp.tv_sec) > + SERVER_INACTIVE_SECONDS) { + log("No clients for too long!"); + return; + } + fsleep(10000); + } + + /* + * Look for a WorkQ item to process. When there is nothing + * to process, check to see if any of the clients went away + * mid stream. We want to dump those clients so that we can + * inactive after a while (see above test). + */ + if (! WorkQueueDequeue(&chunk, &startblock, &blockcount)) { + CheckClients(); + continue; + } + lastblock = startblock + blockcount; + throttle = 0; + + /* Offset within the file */ + offset = (((off_t) BLOCKSIZE * chunk * CHUNKSIZE) + + ((off_t) BLOCKSIZE * startblock)); + + for (block = startblock; block < lastblock; ) { + int readcount; + int readsize; + + /* + * Read blocks of data from disk. + */ + if (lastblock - block > MAXREADBLOCKS) + readcount = MAXREADBLOCKS; + else + readcount = lastblock - block; + readsize = readcount * BLOCKSIZE; + + if ((cc = pread(FileInfo.fd, databuf, + readsize, offset)) <= 0) { + if (cc < 0) + pfatal("Reading File"); + fatal("EOF on file"); + } + if (cc != readsize) + fatal("Short read: %d!=%d", cc, readsize); + + for (j = 0; j < readcount; j++) { + p->hdr.type = PKTTYPE_REQUEST; + p->hdr.subtype = PKTSUBTYPE_BLOCK; + p->hdr.datalen = sizeof(p->msg.block); + p->msg.block.chunk = chunk; + p->msg.block.block = block + j; + memcpy(p->msg.block.buf, + &databuf[j * BLOCKSIZE], + BLOCKSIZE); + + PacketSend(p); + } + offset += readsize; + block += readcount; + throttle += readcount; + + if (throttle > 64) { + fsleep(10000); + throttle = 0; + } + } + + /* + * Delay a bit to keep from overloading the network. + * + * XXX - I think this timeout value needs to be tuned on + * the fly. Not sure yet how that would be done. + if (blockcount > (2 * MAXREADBLOCKS)) + fsleep(50000); + */ + } +} + +char *usagestr = + "usage: frisbeed [-d] <-p #> <-m mcastaddr> \n" + " -d Turn on debugging. Multiple -d options increase output.\n" + " -p portnum Specify a port number to listen on.\n" + " -m mcastaddr Specify a multicast address in dotted notation.\n" + " -i mcastif Specify a multicast interface in dotted notation.\n" + "\n"; + +void +usage() +{ + fprintf(stderr, usagestr); + exit(1); +} + +int +main(int argc, char **argv) +{ + int ch, fd; + pthread_t child_pid; + off_t fsize; + + while ((ch = getopt(argc, argv, "dhp:m:i:")) != -1) + switch(ch) { + case 'd': + debug++; + break; + + case 'p': + portnum = atoi(optarg); + break; + + case 'm': + inet_aton(optarg, &mcastaddr); + break; + + case 'i': + inet_aton(optarg, &mcastif); + break; + case 'h': + case '?': + default: + usage(); + } + argc -= optind; + argv += optind; + + if (argc != 1) + usage(); + + if (!portnum || ! mcastaddr.s_addr) + usage(); + + ServerLogInit(); + WorkQueueInit(); + ServerNetInit(); + + filename = argv[0]; + if (access(filename, R_OK) < 0) + pfatal("Cannot read %s", filename); + + /* + * Open the file and get its size so that we can tell clients how + * much to expect/require. + */ + if ((fd = open(filename, O_RDONLY)) < 0) + pfatal("Cannot open %s", filename); + + if ((fsize = lseek(fd, (off_t)0, SEEK_END)) < 0) + pfatal("Cannot lseek to end of file"); + + FileInfo.fd = fd; + FileInfo.blocks = (int) roundup(fsize, BLOCKSIZE) / BLOCKSIZE; + FileInfo.chunks = FileInfo.blocks / CHUNKSIZE; + log("Opened %s: %d blocks", filename, FileInfo.blocks); + + /* + * Create the subthread to listen for packets. + */ + if (pthread_create(&child_pid, NULL, ServerRecvThread, (void *)0)) { + fatal("Failed to create pthread!"); + } + gettimeofday(&InactivityTimeStamp, 0); + + PlayFrisbee(); + + /* + * Exit from main thread will kill all the children. + */ + log("Exiting!"); + exit(0); +} + diff --git a/os/frisbee.redux/utils.c b/os/frisbee.redux/utils.c new file mode 100644 index 0000000000000000000000000000000000000000..6d1568973fbb69d57707c75727e04a1598e20ef4 --- /dev/null +++ b/os/frisbee.redux/utils.c @@ -0,0 +1,57 @@ +/* + * Some simple common utility functions. + */ + +#include +#include +#include +#include +#include +#include +#include +#include "decls.h" + +/* + * Return current time in a string printable format. Caller must absorb + * the string. + */ +char * +CurrentTimeString(void) +{ + static char buf[32]; + time_t curtime; + static struct tm tm; + + time(&curtime); + strftime(buf, sizeof(buf), "%T", localtime_r(&curtime, &tm)); + + return buf; +} + +/* + * Sleep. Basically wraps nanosleep, but since the threads package uses + * signals, it keeps ending early! + */ +int +fsleep(unsigned int usecs) +{ + struct timespec time_to_sleep, time_not_slept; + int foo; + + time_to_sleep.tv_nsec = (usecs % 1000000) * 1000; + time_to_sleep.tv_sec = usecs / 1000000; + time_not_slept.tv_nsec = 0; + time_not_slept.tv_sec = 0; + + while ((foo = nanosleep(&time_to_sleep, &time_not_slept)) != 0) { + if (foo < 0 && errno != EINTR) { + return -1; + } + + time_to_sleep.tv_nsec = time_not_slept.tv_nsec; + time_to_sleep.tv_sec = time_not_slept.tv_sec; + time_not_slept.tv_nsec = 0; + time_not_slept.tv_sec = 0; + } + return 0; +}