From e442c7235f6a48080789e44750850209969514da Mon Sep 17 00:00:00 2001 From: David Johnson Date: Wed, 14 Feb 2007 01:54:52 +0000 Subject: [PATCH] Add UDP to the synthetic app (including application message fragmentation). Add block numbers and timestamps collected along the way to a message header. --- pelab/supafly/defs.h | 39 ++++ pelab/supafly/sfmiddleman.c | 451 ++++++++++++++++++++++++++---------- pelab/supafly/sfreceiver.c | 104 ++++++--- pelab/supafly/sfsender.c | 181 +++++++++++---- pelab/supafly/util.c | 98 ++++++++ 5 files changed, 674 insertions(+), 199 deletions(-) diff --git a/pelab/supafly/defs.h b/pelab/supafly/defs.h index 67106d4ba..be81fe0fb 100644 --- a/pelab/supafly/defs.h +++ b/pelab/supafly/defs.h @@ -34,4 +34,43 @@ typedef uint32_t slen_t; #define MAX_NAME_LEN 255 +/* currently, just setting same as freebsd 6.x to get "same" behavior... */ +#ifndef UDPCTL_MAXDGRAM +#define UDPCTL_MAXDGRAM 9216 +#endif + +/** + * This struct doubles as a tx block hdr and a stats collector. + */ +typedef struct block_hdr { + uint32_t msg_id; + uint32_t block_id; + uint32_t frag_id; + + /* NOTE: times are when the first bytes were sent with write/sendto! */ + /** sender time */ + uint32_t tv_send_send_s; + uint32_t tv_send_send_us; + /** middleman times */ + uint32_t tv_mid_recv_s; + uint32_t tv_mid_recv_us; + uint32_t tv_mid_send_s; + uint32_t tv_mid_send_us; + /** + * receiver time (only used in stats; just here cause it doesn't make + * any difference to have in the packet, cause the packet is filled with + * deadbeef anyway). + */ + uint32_t tv_recv_recv_s; + uint32_t tv_recv_recv_us; +} block_hdr_t; + +int marshall_block_hdr_send(char *buf, + uint32_t msg_id,uint32_t block_id,uint32_t frag_id, + struct timeval *tv_send_send); +int marshall_block_hdr_mid( char *buf, + struct timeval *tv_mid_recv, + struct timeval *tv_mid_send); +block_hdr_t *unmarshall_block_hdr(char *buf,block_hdr_t *fill_hdr); + #endif /* __DEFS_H__ */ diff --git a/pelab/supafly/sfmiddleman.c b/pelab/supafly/sfmiddleman.c index 9393c06fb..ff089e990 100644 --- a/pelab/supafly/sfmiddleman.c +++ b/pelab/supafly/sfmiddleman.c @@ -28,6 +28,8 @@ char *deadbeef = "deadbeef"; int pushback = 0; +int udp = 0; + /** * defs */ @@ -36,11 +38,12 @@ void usage(char *bin) { "USAGE: %s -scohSRudb (option defaults in parens)\n" " -s tx block size (%d)\n" " -c tx block size (%d)\n" - " -h Listen on this host/addr (INADDR_ANY)\n" + " -m Listen on this host/addr (INADDR_ANY)\n" " -S Listen on this port for one sender at a time (%d)\n" " -R Listen on this port for receivers (%d)\n" " -o Only encrypt (instead of dec/enc)\n" " -b Ack all received blocks from sender AFTER crypto\n" + " -U Use udp instead of tcp\n" " -d[d..d] Enable various levels of debug output\n" " -u Print this msg\n", bin,block_size,block_count,srv_send_port,srv_recv_port @@ -51,7 +54,7 @@ void parse_args(int argc,char **argv) { int c; char *ep = NULL; - while ((c = getopt(argc,argv,"s:c:S:R:h:oudb")) != -1) { + while ((c = getopt(argc,argv,"s:c:S:R:m:oudbU")) != -1) { switch(c) { case 's': block_size = (int)strtol(optarg,&ep,10); @@ -67,7 +70,7 @@ void parse_args(int argc,char **argv) { exit(-1); } break; - case 'h': + case 'm': srv_host = optarg; break; case 'S': @@ -96,6 +99,9 @@ void parse_args(int argc,char **argv) { case 'b': pushback = 1; break; + case 'U': + udp = 1; + break; default: break; } @@ -128,6 +134,10 @@ int main(int argc,char **argv) { fd_set static_rfds; int current_send_rc; int block_count = 0; + struct sockaddr_in udp_recv_client_sins[MIDDLEMAN_MAX_CLIENTS]; + int udp_msg_size = 0; + struct timeval t0; + int bytesRead = 0; /* grab some quick args */ parse_args(argc,argv); @@ -135,6 +145,12 @@ int main(int argc,char **argv) { /* initialize ourself... */ srand(time(NULL)); + if (udp) { + memset(udp_recv_client_sins, + 0, + MIDDLEMAN_MAX_CLIENTS * sizeof(struct sockaddr_in)); + } + /* grab the bufs... */ if ((buf = (char *)malloc(sizeof(char)*block_size)) == NULL) { efatal("no memory for data buf"); @@ -167,10 +183,12 @@ int main(int argc,char **argv) { } /* startup server... */ - if ((srv_send_sock = socket(PF_INET,SOCK_STREAM,0)) == -1) { + if ((srv_send_sock = socket(PF_INET,(udp)?SOCK_DGRAM:SOCK_STREAM,0)) + == -1) { efatal("could not get send listen socket"); } - if ((srv_recv_sock = socket(PF_INET,SOCK_STREAM,0)) == -1) { + if ((srv_recv_sock = socket(PF_INET,(udp)?SOCK_DGRAM:SOCK_STREAM,0)) + == -1) { efatal("could not get send listen socket"); } @@ -187,15 +205,17 @@ int main(int argc,char **argv) { efatal("could not bind"); } - /* only listen for len 1 cause we won't accept on this guy if we - * already have a sender - */ - if (listen(srv_send_sock,1) < 0) { - efatal("could not listen"); - } - /* more sane listen queue for this one */ - if (listen(srv_recv_sock,8) < 0) { - efatal("could not listen"); + if (!udp) { + /* only listen for len 1 cause we won't accept on this guy if we + * already have a sender + */ + if (listen(srv_send_sock,1) < 0) { + efatal("could not listen"); + } + /* more sane listen queue for this one */ + if (listen(srv_recv_sock,8) < 0) { + efatal("could not listen"); + } } /* daemonize... */ @@ -213,15 +233,29 @@ int main(int argc,char **argv) { FD_SET(srv_recv_sock,&static_rfds); max_fd = (srv_send_sock > srv_recv_sock) ? srv_send_sock : srv_recv_sock; + if (debug > 1) { + fprintf(stdout,"DEBUG: server running...\n"); + } + /* listen and read forever */ while (1) { /* reset fdsets */ memcpy(&rfds,&static_rfds,sizeof(static_rfds)); + if (debug > 3) { + printf("DEBUG: calling select\n"); + } + retval = select(max_fd+1,&rfds,NULL,NULL,NULL); + + printf("AAA\n"); if (retval > 0) { - if (FD_ISSET(srv_send_sock,&rfds) + /* + * do we have input on port listening for senders, and not + * currently have a sender? + */ + if (!udp && FD_ISSET(srv_send_sock,&rfds) && send_client_fd < 0) { struct sockaddr_in client_sin; socklen_t slen; @@ -255,60 +289,125 @@ int main(int argc,char **argv) { current_send_rc = 0; block_count = 0; } + /* add a sender udp client */ + //else if (FD_ISSET(srv_send_sock,&rfds) && udp) { + + //} + /* add a new sfreceiver client */ else if (FD_ISSET(srv_recv_sock,&rfds)) { struct sockaddr_in client_sin; socklen_t slen; int client_fd; + char tmpbuf[255]; slen = sizeof(client_sin); - if ((client_fd = accept(srv_recv_sock, - (struct sockaddr *)&client_sin, - &slen)) < 0) { - warn("accept failed"); - } - else if (recv_client_cnt >= MIDDLEMAN_MAX_CLIENTS) { - warn("already at max clients"); - close(client_fd); + if (udp) { + if (recvfrom(srv_recv_sock,tmpbuf,sizeof(tmpbuf),0, + (struct sockaddr *)&client_sin,&slen) < 0) { + ewarn("in recvfrom while getting a new receiver " + "client"); + } + else { + /* + * if this is a client we haven't heard from, + * add it; else remove it. Basic "connection." + */ + + int first_zero_idx = -1; + for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { + if (udp_recv_client_sins[i].sin_addr.s_addr == 0 + && first_zero_idx < 0) { + first_zero_idx = i; + } + else if ((udp_recv_client_sins[i].sin_addr.s_addr + == client_sin.sin_addr.s_addr) + && (udp_recv_client_sins[i].sin_port + == client_sin.sin_port)) { + /* old client, wants to "disconnect" */ + if (debug > 1) { + fprintf(stdout, + "removed udp client %s\n", + inet_ntoa(client_sin.sin_addr)); + } + memset(&udp_recv_client_sins[i], + 0, + sizeof(struct sockaddr_in)); + + break; + } + } + + if (i == MIDDLEMAN_MAX_CLIENTS) { + /* new client */ + if (first_zero_idx < 0) { + ewarn("could not add new client, max reached"); + } + else { + memcpy(&udp_recv_client_sins[first_zero_idx], + &client_sin, + sizeof(struct sockaddr_in)); + + if (debug > 1) { + fprintf(stdout, + "added new udp receiver client %s\n", + inet_ntoa(client_sin.sin_addr)); + } + + } + } + } } else { - /* add new recv client... */ - for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { - if (recv_clientfds[i] == -1) { - break; - } + if ((client_fd = accept(srv_recv_sock, + (struct sockaddr *)&client_sin, + &slen)) < 0) { + warn("accept failed"); } - - recv_clientfds[i] = client_fd; - if (client_fd > max_fd) { - max_fd = client_fd; + else if (recv_client_cnt >= MIDDLEMAN_MAX_CLIENTS) { + warn("already at max clients"); + close(client_fd); } - - FD_SET(client_fd,&static_rfds); - - char *addr = inet_ntoa(client_sin.sin_addr); - int addrlen = strlen(addr); - - if (debug > 1) { - fprintf(stdout, - "DEBUG: connect from %s:%d\n", + else { + /* add new recv client... */ + for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { + if (recv_clientfds[i] == -1) { + break; + } + } + + recv_clientfds[i] = client_fd; + if (client_fd > max_fd) { + max_fd = client_fd; + } + + FD_SET(client_fd,&static_rfds); + + char *addr = inet_ntoa(client_sin.sin_addr); + int addrlen = strlen(addr); + + if (debug > 1) { + fprintf(stdout, + "DEBUG: connect from %s:%d\n", + addr, + ntohs(client_sin.sin_port)); + } + + strncpy(recv_clientsas[i], addr, - ntohs(client_sin.sin_port)); - } - - strncpy(recv_clientsas[i], - addr, - (addrlen > MAX_NAME_LEN)?MAX_NAME_LEN:addrlen); - /* null term if strncpy couldn't */ - if (addrlen > MAX_NAME_LEN) { - recv_clientsas[i][MAX_NAME_LEN] = '\0'; + (addrlen > MAX_NAME_LEN)?MAX_NAME_LEN:addrlen); + /* null term if strncpy couldn't */ + if (addrlen > MAX_NAME_LEN) { + recv_clientsas[i][MAX_NAME_LEN] = '\0'; + } + + ++recv_client_cnt; } - - ++recv_client_cnt; } } - else if (send_client_fd > 0 - && FD_ISSET(send_client_fd,&rfds)) { + else if ((send_client_fd > 0 + && FD_ISSET(send_client_fd,&rfds)) + || (udp && FD_ISSET(srv_send_sock,&rfds))) { /** * ok, here we need to read a block from this guy for awhile, * then [decrypt/munge]/encrypt (according to command line @@ -330,65 +429,107 @@ int main(int argc,char **argv) { //printf("going for another read\n"); - retval = read(send_client_fd, - &buf[current_send_rc], - block_size - current_send_rc); - if (retval < 0) { - if (errno == ECONNRESET) { + if (!udp) { + retval = read(send_client_fd, + &buf[current_send_rc], + block_size - current_send_rc); + + if (retval < 0) { + if (errno == ECONNRESET) { + /* dead sender */ + close(send_client_fd); + FD_CLR(send_client_fd,&static_rfds); + send_client_fd = -1; + + current_send_rc = 0; + block_count = 0; + + if (debug > 1) { + fprintf(stderr, + "DEBUG: (send client!) disconnect from %s\n", + recv_clientsas[i]); + } + } + else if (errno == EAGAIN) { + // ignore, doh + //ewarn("while reading"); + } + else { + //ewarn("unexpected while reading"); + } + } + else if (retval == 0) { /* dead sender */ close(send_client_fd); FD_CLR(send_client_fd,&static_rfds); send_client_fd = -1; - + current_send_rc = 0; block_count = 0; - + if (debug > 1) { fprintf(stderr, - "DEBUG: (send client!) disconnect from %s\n", - recv_clientsas[i]); + "DEBUG: (send client!) disconnect!\n"); } } - else if (errno == EAGAIN) { - // ignore, doh - //ewarn("while reading"); - } else { - //ewarn("unexpected while reading"); + current_send_rc += retval; } } - else if (retval == 0) { - /* dead sender */ - close(send_client_fd); - FD_CLR(send_client_fd,&static_rfds); - send_client_fd = -1; - - current_send_rc = 0; - block_count = 0; + else { + /* + * NOTE: we do not establish that there's only one + * sender! We just forward msgs that come to us. + */ + struct sockaddr_in client_sin; + socklen_t slen; + slen = sizeof(client_sin); + + if (debug > 3) { + fprintf(stdout,"DEBUG: calling recvfrom on sender\n"); + } + + retval = recvfrom(srv_send_sock,buf,block_size,0, + (struct sockaddr *)&client_sin,&slen); + + if (retval < 0) { + ewarn("error in recvfrom on sending client"); + udp_msg_size = 0; + } + else { + udp_msg_size = retval; + + if (debug > 1) { + fprintf(stderr, + "DEBUG: udp dgram\n", + udp_msg_size); + } - if (debug > 1) { - fprintf(stderr, - "DEBUG: (send client!) disconnect!\n"); } } - else { - current_send_rc += retval; - } + bytesRead = (udp)?udp_msg_size:current_send_rc; + + /* grab recv timestamp */ + gettimeofday(&t0,NULL); + + if ((!udp && current_send_rc == block_size) + || (udp && udp_msg_size > 0)) { + block_hdr_t hdr; - if (current_send_rc == block_size) { current_send_rc = 0; - ++block_count; + /* unmarshall header */ + unmarshall_block_hdr(buf,&hdr); /* wait to send ack until after crypto */ /* end of block, do the encryption op and note times... */ if (debug > 1) { fprintf(stderr, - "DEBUG: read %d bytes (a block)\n", - block_size); + "DEBUG: read %d bytes\n", + bytesRead); } DES_cblock *iv = sgeniv(); @@ -398,17 +539,17 @@ int main(int argc,char **argv) { if (only_encrypt) { gettimeofday(&t1,NULL); - sencrypt(buf,outbuf,block_size, + sencrypt(buf,outbuf,bytesRead, k1,k2,iv); gettimeofday(&t2,NULL); } else { /* do a decrypt, then choose new iv, then encrypt. */ gettimeofday(&t1,NULL); - sdecrypt(buf,outbuf,block_size, + sdecrypt(buf,outbuf,bytesRead, k1,k2,iv); iv = sgeniv(); - sencrypt(buf,outbuf,block_size, + sencrypt(buf,outbuf,bytesRead, k1,k2,iv); gettimeofday(&t2,NULL); } @@ -420,18 +561,30 @@ int main(int argc,char **argv) { t3.tv_usec += 1000000; } fprintf(stdout, - "BLOCKTIME(%s): %d %d at %.6f\n", + "BLOCKTIME(%s): m%d b%d f%d %d at %.6f\n", (only_encrypt)?"e":"de", - block_count, + hdr.msg_id,hdr.block_id,hdr.frag_id, t3.tv_sec * 1000 + t3.tv_usec, t2.tv_sec + t2.tv_usec / 1000000.0f); fflush(stdout); + if (!udp) { + /* no app fragmentation with tcp */ + ++block_count; + } + else { + /* have to actually unmarshall the header to see + * which block and frag numbers we have. + */ + + } + /** * send the ack after crypto ops... */ - if (pushback) { - if ((retval = write(send_client_fd,&buf[0],sizeof(char))) < 0) { + if (!udp && pushback) { + if ((retval = write(send_client_fd, + &buf[0],sizeof(char))) < 0) { ewarn("failed to send ack"); } } @@ -441,65 +594,109 @@ int main(int argc,char **argv) { "DEBUG: sending to clients\n"); } + /* before sending on, copy the original header and + * note our timestamps... + */ + memcpy(outbuf,buf,sizeof(block_hdr_t)); + /* and marshall bits of the hdr that matter to the + * middleman + */ + marshall_block_hdr_mid(outbuf,&t0,&t2); + + /* send to clients */ - for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { - if (recv_clientfds[i] > -1) { - retval = write(recv_clientfds[i], - outbuf, - block_size); - if (retval < 0) { - ewarn("client write failed"); + if (!udp) { + for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { + if (recv_clientfds[i] > -1) { + retval = write(recv_clientfds[i], + outbuf, + bytesRead); + if (retval < 0) { + ewarn("client write failed"); + } + else if (retval != bytesRead) { + fprintf(stderr, + "ERROR: wrote only %d bytes to " + "client %d!\n", + retval,i); + } + else { + if (debug > 1) { + fprintf(stderr, + "DEBUG: wrote all " + "%d bytes to client %d!\n", + bytesRead,i); + } + } } - else if (retval != block_size) { - fprintf(stderr, - "ERROR: wrote only %d bytes to client %d!\n", - retval,i); + } + } + else { + for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { + if (udp_recv_client_sins[i].sin_addr.s_addr != 0) { + if (debug > 1) { + fprintf(stdout,"sending %d-byte udp msg to" + " client %s", + bytesRead, + inet_ntoa(udp_recv_client_sins[i].sin_addr)); + } + + retval = sendto(srv_recv_sock, + outbuf,bytesRead,0, + (struct sockaddr *)&udp_recv_client_sins[i], + sizeof(struct sockaddr_in)); + + if (retval < 0) { + ewarn("while sending udp to client"); + } + else if (retval != udp_msg_size) { + ewarn("only sent part of udp msg"); + } } else { - if (debug > 1) { - fprintf(stderr, - "DEBUG: wrote all block_size %d bytes to client %d!\n", - block_size,i); + if (debug > 3) { + fprintf(stdout,"no client in slot %d\n",i); } } } } - } } else { - /** - * If we get here, assume that one of the recv clients - * has died (cause they are not supposed to write anything - * to us)... so nuke the doofus! - */ - for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { - if (recv_clientfds[i] > -1) { - if (FD_ISSET(recv_clientfds[i],&rfds)) { - /* read a block, or as much as possible */ - //retval = read(recv_clientfds[i],buf,block_size); - - /* dead client, pretty much */ - //if (retval <= 0) { + if (!udp) { + /** + * If we get here, assume that one of the recv clients + * has died (cause they are not supposed to write anything + * to us)... so nuke the doofus! + */ + for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) { + if (recv_clientfds[i] > -1) { + if (FD_ISSET(recv_clientfds[i],&rfds)) { + /* read a block, or as much as possible */ + //retval = read(recv_clientfds[i],buf,block_size); + + /* dead client, pretty much */ + //if (retval <= 0) { if (debug > 1) { fprintf(stderr, "DEBUG: disconnect from %s\n", recv_clientsas[i]); } - + close(recv_clientfds[i]); FD_CLR(recv_clientfds[i],&static_rfds); recv_clientfds[i] = -1; - + --recv_client_cnt; - //} - //else if (debug > 2 ) { + //} + //else if (debug > 2 ) { //fprintf(stdout, //"DEBUG: read %d bytes from %s\n", //retval, //recv_clientsas[i]); - //} + //} + } } } } diff --git a/pelab/supafly/sfreceiver.c b/pelab/supafly/sfreceiver.c index 33bf584ec..3ffb39bbd 100644 --- a/pelab/supafly/sfreceiver.c +++ b/pelab/supafly/sfreceiver.c @@ -22,6 +22,7 @@ char *middleman_host = "localhost"; /** -R */ short middleman_port = MIDDLEMAN_RECV_CLIENT_PORT; int debug = 0; +int udp = 0; char *deadbeef = "deadbeef"; @@ -30,10 +31,11 @@ char *deadbeef = "deadbeef"; */ void usage(char *bin) { fprintf(stdout, - "USAGE: %s -cudR (option defaults in parens)\n" + "USAGE: %s -smMUdu (option defaults in parens)\n" " -s rx block size (%d)\n" " -m Middleman host to connect to (%s)\n" - " -R Middle port to connect to (%d)\n" + " -M Middle port to connect to (%d)\n" + " -U Use udp instead of tcp\n" " -d[d..d] Enable various levels of debug output\n" " -u Print this msg\n", bin,block_size,middleman_host,middleman_port @@ -44,7 +46,7 @@ void parse_args(int argc,char **argv) { int c; char *ep = NULL; - while ((c = getopt(argc,argv,"s:m:R:h:ud")) != -1) { + while ((c = getopt(argc,argv,"s:m:M:udU")) != -1) { switch(c) { case 's': block_size = (int)strtol(optarg,&ep,10); @@ -53,10 +55,10 @@ void parse_args(int argc,char **argv) { exit(-1); } break; - case 'h': + case 'm': middleman_host = optarg; break; - case 'R': + case 'M': middleman_port = (short)strtol(optarg,&ep,10); if (ep == optarg) { usage(argv[0]); @@ -69,6 +71,9 @@ void parse_args(int argc,char **argv) { case 'd': ++debug; break; + case 'U': + udp = 1; + break; default: break; } @@ -95,6 +100,10 @@ int main(int argc,char **argv) { int blocksRead = 0; struct timeval t1; int block_count = 0; + char *crbuf = "foo"; + struct sockaddr_in client_sin; + int slen; + block_hdr_t hdr; parse_args(argc,argv); @@ -127,60 +136,93 @@ int main(int argc,char **argv) { } /* startup recv client... */ - if ((recv_sock = socket(PF_INET,SOCK_STREAM,0)) == -1) { + if ((recv_sock = socket(PF_INET,(udp)?SOCK_DGRAM:SOCK_STREAM,0)) == -1) { efatal("could not get recv socket"); } - /* connect to the middleman if we can... */ - if (connect(recv_sock, - (struct sockaddr *)&recv_sa, - sizeof(struct sockaddr_in)) < 0) { - efatal("could not connect to middleman"); + if (!udp) { + /* connect to the middleman if we can... */ + if (connect(recv_sock, + (struct sockaddr *)&recv_sa, + sizeof(struct sockaddr_in)) < 0) { + efatal("could not connect to middleman"); + } + } + else { + /* just send the middleman a "register" msg */ + sendto(recv_sock,crbuf,sizeof(crbuf),0, + (struct sockaddr *)&recv_sa,sizeof(struct sockaddr_in)); + } + + if (debug > 1) { + fprintf(stdout,"DEBUG: connected to %s:%d\n", + inet_ntoa(recv_sa.sin_addr),ntohs(recv_sa.sin_port)); } /** * read blocks forever, noting times at which a block was completely read. */ + if (debug > 1) { + fprintf(stdout,"DEBUG: Receiving blocks...\n"); + } + while (1) { bytesRead = 0; - while (bytesRead < block_size) { - retval = read(recv_sock, - &buf[(block_size - bytesRead)], - (block_size - bytesRead)); - if (retval < 0) { - if (errno == ECONNRESET) { - /* middleman must've dumped out. */ - efatal("middleman appears to have disappeared"); + if (!udp) { + while (bytesRead < block_size) { + retval = read(recv_sock, + &buf[(block_size - bytesRead)], + (block_size - bytesRead)); + + if (retval < 0) { + if (errno == ECONNRESET) { + /* middleman must've dumped out. */ + efatal("middleman appears to have disappeared"); + } + else if (errno == EAGAIN) { + ; + } + else { + ewarn("weird"); + } } - else if (errno == EAGAIN) { - ; + else if (retval == 0) { + /* middleman dumped out */ + efatal("middleman appears to have disappeared"); } else { - ewarn("weird"); + bytesRead += retval; } } - else if (retval == 0) { - /* middleman dumped out */ - efatal("middleman appears to have disappeared"); + } + else { + slen = sizeof(struct sockaddr_in); + retval = recvfrom(recv_sock,buf,block_size,0, + (struct sockaddr *)&client_sin, + (socklen_t *)&slen); + if (retval < 0) { + ewarn("error while recvfrom middleman"); } else { - bytesRead += retval; + bytesRead = retval; } } - gettimeofday(&t1,NULL); - fprintf(stdout,"TIME %d %.4f\n", - block_count, + /* unmarshall to get id numbers */ + unmarshall_block_hdr(buf,&hdr); + + fprintf(stdout,"TIME m%d b%d f%d %.4f\n", + hdr.msg_id,hdr.block_id,hdr.frag_id, t1.tv_sec + t1.tv_usec / 1000000.0f); ++block_count; fprintf(stdout, - "INFO: read %d bytes (a block) at %.6f\n", - block_size, + "INFO: read %d bytes at %.6f\n", + bytesRead, t1.tv_sec + t1.tv_usec / 1000000.0f); } diff --git a/pelab/supafly/sfsender.c b/pelab/supafly/sfsender.c index 4e7764733..00eacc373 100644 --- a/pelab/supafly/sfsender.c +++ b/pelab/supafly/sfsender.c @@ -38,6 +38,9 @@ int ack_count = 0; int unack_threshold = -1; +int udp = 0; +int udp_frag_size = 0; + /** * functions */ @@ -51,7 +54,14 @@ void usage(char *bin) { " -P