Commit 7523a98c authored by Leigh B. Stoller's avatar Leigh B. Stoller

Remove all of the connection handling stuff. Clients come and go, and

idleness is defined as an empty work queue. We still use join/leave
messages, but the join message is so that the client can be informed
of the number of blocks in the file. The leave message is strictly
informational, and includes the elapsed time on the client, so that it
can be written to the log file. If that message is lost, no big deal.
I ran a 6 node test on this new code, and all the clients ran in 174
to 176 seconds, with frisbeed using 1% CPU on average (typically
starts out at about 3%, and quickly drops off to steady state).
parent 29552163
......@@ -535,37 +535,21 @@ PlayFrisbee(void)
ChunkerStartup();
gettimeofday(&estamp, 0);
estamp.tv_sec -= stamp.tv_sec;
/*
* Done! Handshake our exit with the server.
* Done! Send off a leave message, but do not worry about whether
* the server gets it. All the server does with it is print a
* timestamp, and that is not critical to operation.
*/
while (1) {
int countdown = 0;
if (countdown <= 0) {
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_LEAVE;
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join.clientid = myid;
PacketSend(p);
countdown = 3;
}
/*
* Throw away any data packets until we get a reply back.
* Wait several receive delays before resending the message.
*/
if (PacketReceive(p) < 0) {
countdown--;
continue;
}
p->hdr.type = PKTTYPE_REQUEST;
p->hdr.subtype = PKTSUBTYPE_LEAVE;
p->hdr.datalen = sizeof(p->msg.leave);
p->msg.leave.clientid = myid;
p->msg.leave.elapsed = estamp.tv_sec;
PacketSend(p);
if (p->hdr.subtype == PKTSUBTYPE_LEAVE &&
p->hdr.type == PKTTYPE_REPLY) {
break;
}
}
gettimeofday(&estamp, 0);
estamp.tv_sec -= stamp.tv_sec;
log("Left the team after %ld seconds on the field!", estamp.tv_sec);
}
......@@ -600,11 +584,7 @@ FrisbeeRead(void **buf, size_t count)
char *data = (char *) &ChunkBuffer[ImageUnzipBuffer].blocks;
data += ImageUnzipOffset;
#if 1
*buf = data;
#else
memcpy(buf, data, count);
#endif
ImageUnzipOffset += count;
return count;
}
......@@ -4,12 +4,6 @@
#include "log.h"
/*
* Max number of clients we can support at once. Not likely to be an issue
* since the amount of per client state is very little.
*/
#define MAXCLIENTS 1000
/*
* We operate in terms of this blocksize (in bytes).
*/
......@@ -39,12 +33,10 @@
* Timeout (in usecs) for packet receive. The idletimer number is how
* many PKT timeouts we allow before requesting more data from the server.
* That is, if we go TIMEOUT usecs without getting a packet, then ask for
* more (or on the server side, poll the clients). On the server, side
* use a timeout to check for dead clients. We want that to be longish.
* more.
*/
#define PKTRCV_TIMEOUT 30000
#define CLIENT_IDLETIMER_COUNT 1
#define SERVER_IDLETIMER_COUNT ((300 * 1000000) / PKTRCV_TIMEOUT)
/*
* Timeout (in seconds!) server will hang around with no active clients.
......@@ -70,12 +62,19 @@ typedef struct {
union {
/*
* Join/leave the Team. Send a randomized ID, and receive
* the number of blocks in the file.
* the number of blocks in the file. This is strictly
* informational; the info is reported in the log file.
* We must return the number of chunks in the file though.
*/
union {
unsigned int clientid;
int blockcount;
} join;
struct {
unsigned int clientid;
int elapsed; /* Stats only */
} leave;
/*
* A data block, indexed by chunk,block.
......
......@@ -24,8 +24,7 @@ int portnum;
struct in_addr mcastaddr;
struct in_addr mcastif;
char *filename;
struct timeval InactivityTimeStamp;
long long TimeCounter;
struct timeval IdleTimeStamp;
/*
* This structure defines the file we are spitting back.
......@@ -34,12 +33,6 @@ struct FileInfo {
int fd; /* Open file descriptor */
int blocks; /* Number of BLOCKSIZE blocks */
int chunks; /* Number of CHUNKSIZE chunks */
struct {
struct in_addr ipaddr; /* Client Network Address */
unsigned int id; /* Random ID. */
long long lastcontact; /* Last time heard from */
} clients[MAXCLIENTS];
int numclients; /* Current count of clients */
};
static struct FileInfo FileInfo;
......@@ -54,7 +47,6 @@ typedef struct {
} WQelem_t;
static queue_head_t WorkQ;
static pthread_mutex_t WorkQLock;
static pthread_cond_t WorkQCond;
/*
* Work queue routines. The work queue is a time ordered list of chunk/blocks
......@@ -76,7 +68,6 @@ static void
WorkQueueInit(void)
{
pthread_mutex_init(&WorkQLock, NULL);
pthread_cond_init(&WorkQCond, NULL);
queue_init(&WorkQ);
}
......@@ -104,7 +95,6 @@ WorkQueueEnqueue(int chunk, int block, int blockcount)
wqel->count = blockcount;
queue_enter(&WorkQ, wqel, WQelem_t *, chain);
/* pthread_cond_signal(&WorkQCond); */
pthread_mutex_unlock(&WorkQLock);
return 1;
}
......@@ -115,34 +105,17 @@ WorkQueueDequeue(int *chunk, int *block, int *blockcount)
WQelem_t *wqel;
pthread_mutex_lock(&WorkQLock);
#if 0
while (queue_empty(&WorkQ)) {
struct timeval tval;
struct timespec tspec;
int foo;
if (gettimeofday(&tval, NULL) < 0) {
pwarning("gettimeofday");
pthread_mutex_unlock(&WorkQLock);
return 0;
}
TIMEVAL_TO_TIMESPEC(&tval, &tspec);
tspec.tv_sec += 1;
foo = pthread_cond_timedwait(&WorkQCond, &WorkQLock, &tspec);
if (foo == ETIMEDOUT) {
pthread_mutex_unlock(&WorkQLock);
return 0;
}
}
#else
/*
* Condvars broken in linux threads impl, so use this rather bogus
* sleep to keep from churning cycles.
*/
if (queue_empty(&WorkQ)) {
pthread_mutex_unlock(&WorkQLock);
fsleep(5000);
return 0;
}
#endif
queue_remove_first(&WorkQ, wqel, WQelem_t *, chain);
*chunk = wqel->chunk;
*block = wqel->block;
......@@ -154,138 +127,44 @@ WorkQueueDequeue(int *chunk, int *block, int *blockcount)
}
/*
* A client joins. This message is information in the sense that we use it
* to determine if any clients are listening. As long as there are, we
* continue to throw out chunks. We allow for duplicates from a host, which
* might occur if the reply was lost or if the host crashed and restarted.
* If a client crashes and never returns, we will find out about it later,
* when we poll it to see if its alive.
* A client joins. We print out the time at which the client joins, and
* return a reply packet with the number of chunks in the file so that
* the client knows how much to ask for. We do not do anything else with
* this info; clients can crash and go away and it does not matter. If they
* crash they will start up again later. Inactivity is defined as a period
* with no data block requests. The client will resend its join message
* until it gets a reply back; duplicates of either the request or the
* reply are harmless.
*/
static void
ClientJoin(Packet_t *p)
{
struct in_addr ipaddr = { p->hdr.srcip };
int i;
/*
* Check to see if this is a duplicate. I do not think this matters
* but lets record the info anyway.
*/
for (i = 0; i < MAXCLIENTS; i++) {
if (FileInfo.clients[i].ipaddr.s_addr == ipaddr.s_addr) {
log("Duplicate join request from %s at %s!",
inet_ntoa(ipaddr), CurrentTimeString());
break;
}
}
if (i == MAXCLIENTS) {
/*
* Okay, not a current subscriber. Record the info in a free
* client slot.
*/
for (i = 0; i < FileInfo.numclients; i++) {
if (! FileInfo.clients[i].ipaddr.s_addr)
break;
}
if (i == MAXCLIENTS)
fatal("Too many subscribers!");
FileInfo.clients[i].lastcontact = TimeCounter;
FileInfo.clients[i].id = p->msg.join.clientid;
FileInfo.clients[i].ipaddr.s_addr = ipaddr.s_addr;
FileInfo.numclients++;
}
log("Client %s (id %u) joins at %s!",
log("%s (id %u) joins at %s!",
inet_ntoa(ipaddr), p->msg.join.clientid, CurrentTimeString());
/*
* Return fileinfo even for duplicates since that could mean that
* the last reply got lost. Duplicate replies are harmless.
* Return fileinfo. Duplicates are harmless.
*/
p->hdr.type = PKTTYPE_REPLY;
p->hdr.datalen = sizeof(p->msg.join);
p->hdr.type = PKTTYPE_REPLY;
p->hdr.datalen = sizeof(p->msg.join);
p->msg.join.blockcount = FileInfo.blocks;
PacketReply(p);
/*
* Whenever a client joins, add the first chunk to the work queue.
*/
WorkQueueEnqueue(0, 0, CHUNKSIZE);
}
/*
* A client leaves. Not much to it. We return a reply.
* A client leaves. Not much to it. All we do is print out a log statement
* about it so that we can see the time. If the packet is lost, no big deal.
*/
static void
ClientLeave(Packet_t *p)
{
struct in_addr ipaddr = { p->hdr.srcip };
int i;
/*
* Check to see if this is a valid leave message.
*/
for (i = 0; i < MAXCLIENTS; i++) {
if (FileInfo.clients[i].ipaddr.s_addr == ipaddr.s_addr) {
if (FileInfo.clients[i].id != p->msg.join.clientid) {
log("ClientLeave: Bad ID for %s. ID %u!=%u!",
inet_ntoa(ipaddr), p->msg.join.clientid,
FileInfo.clients[i].id);
}
break;
}
}
if (i == MAXCLIENTS) {
log("ClientLeave: No such client %s (id %u) connected!",
inet_ntoa(ipaddr), p->msg.join.clientid);
return;
}
FileInfo.clients[i].ipaddr.s_addr = 0;
FileInfo.clients[i].id = 0;
FileInfo.numclients--;
p->hdr.type = PKTTYPE_REPLY;
p->hdr.datalen = sizeof(p->msg.join);
PacketReply(p);
/*
* Whenever the count goes to zero, set the idletimer. We use this
* to dump out of the program.
*/
if (! FileInfo.numclients)
gettimeofday(&InactivityTimeStamp, 0);
}
/*
* Check for dead clients.
*/
static void
CheckClients(void)
{
int i, oldcount = FileInfo.numclients;
for (i = 0; i < MAXCLIENTS; i++) {
if (FileInfo.clients[i].ipaddr.s_addr &&
(TimeCounter - FileInfo.clients[i].lastcontact) >
SERVER_IDLETIMER_COUNT) {
log("Client %s (id %u) idle for too long.",
inet_ntoa(FileInfo.clients[i].ipaddr),
FileInfo.clients[i].id);
FileInfo.clients[i].ipaddr.s_addr = 0;
FileInfo.clients[i].id = 0;
FileInfo.numclients--;
}
}
/*
* Whenever the count goes to zero, set the idletimer. We use this
* to dump out of the program.
*/
if (oldcount && ! FileInfo.numclients)
gettimeofday(&InactivityTimeStamp, 0);
log("%s (id %u) leaves at %s! Reports %d elapsed seconds.",
inet_ntoa(ipaddr), p->msg.leave.clientid, CurrentTimeString(),
p->msg.leave.elapsed);
}
/*
......@@ -300,7 +179,7 @@ ClientRequest(Packet_t *p)
int chunk = p->msg.request.chunk;
int block = p->msg.request.block;
int count = p->msg.request.count;
int enqueued, i;
int enqueued;
if (block + count > CHUNKSIZE)
fatal("Bad request from %s - chunk:%d block:%d size:%d",
......@@ -312,19 +191,6 @@ ClientRequest(Packet_t *p)
log("Client %s requests chunk:%d block:%d size:%d new:%d",
inet_ntoa(ipaddr), chunk, block, count, enqueued);
}
/*
* Set the lastcontact for this client. This serves as a crude
* mechansim for determining when a client crashes. If an existing
* client goes for too long without being heard from, we drop it
* from the client list so that we can idle (and then exit if
* we go inactive for too long).
*/
for (i = 0; i < MAXCLIENTS; i++) {
if (FileInfo.clients[i].ipaddr.s_addr == ipaddr.s_addr) {
FileInfo.clients[i].lastcontact = TimeCounter;
}
}
}
/*
......@@ -341,7 +207,6 @@ ServerRecvThread(void *arg)
while (1) {
if (PacketReceive(p) < 0) {
TimeCounter++;
continue;
}
......@@ -370,7 +235,7 @@ ServerRecvThread(void *arg)
static void
PlayFrisbee(void)
{
int chunk, block, blockcount, cc, j;
int chunk, block, blockcount, cc, j, idlelastloop = 0;
int startblock, lastblock, throttle;
Packet_t packet, *p = &packet;
static char databuf[MAXREADBLOCKS * BLOCKSIZE];
......@@ -378,32 +243,32 @@ PlayFrisbee(void)
while (1) {
/*
* We want to exit the program if no clients connect for a
* long time.
* Look for a WorkQ item to process. When there is nothing
* to process, check for being idle too long, and exit if
* no one asks for anything for a long time. Note that
* WorkQueueDequeue will delay for a while, so this will not
* spin.
*/
if (! FileInfo.numclients) {
struct timeval estamp;
if (! WorkQueueDequeue(&chunk, &startblock, &blockcount)) {
if (idlelastloop) {
struct timeval estamp;
gettimeofday(&estamp, 0);
gettimeofday(&estamp, 0);
if ((estamp.tv_sec - InactivityTimeStamp.tv_sec) >
SERVER_INACTIVE_SECONDS) {
log("No clients for too long!");
return;
if ((estamp.tv_sec - IdleTimeStamp.tv_sec) >
SERVER_INACTIVE_SECONDS) {
log("No requests for too long!");
return;
}
}
else {
gettimeofday(&IdleTimeStamp, 0);
idlelastloop = 1;
}
fsleep(10000);
}
/*
* Look for a WorkQ item to process. When there is nothing
* to process, check to see if any of the clients went away
* mid stream. We want to dump those clients so that we can
* inactive after a while (see above test).
*/
if (! WorkQueueDequeue(&chunk, &startblock, &blockcount)) {
CheckClients();
continue;
}
idlelastloop = 0;
lastblock = startblock + blockcount;
throttle = 0;
......@@ -454,15 +319,6 @@ PlayFrisbee(void)
throttle = 0;
}
}
/*
* Delay a bit to keep from overloading the network.
*
* XXX - I think this timeout value needs to be tuned on
* the fly. Not sure yet how that would be done.
if (blockcount > (2 * MAXREADBLOCKS))
fsleep(50000);
*/
}
}
......@@ -548,7 +404,7 @@ main(int argc, char **argv)
if (pthread_create(&child_pid, NULL, ServerRecvThread, (void *)0)) {
fatal("Failed to create pthread!");
}
gettimeofday(&InactivityTimeStamp, 0);
gettimeofday(&IdleTimeStamp, 0);
PlayFrisbee();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment