Commit e442c723 authored by David Johnson's avatar David Johnson

Add UDP to the synthetic app (including application message

fragmentation).  Add block numbers and timestamps collected
along the way to a message header.
parent 60589bdb
......@@ -34,4 +34,43 @@ typedef uint32_t slen_t;
#define MAX_NAME_LEN 255
/* currently, just setting same as freebsd 6.x to get "same" behavior... */
#ifndef UDPCTL_MAXDGRAM
#define UDPCTL_MAXDGRAM 9216
#endif
/**
* This struct doubles as a tx block hdr and a stats collector.
*/
typedef struct block_hdr {
uint32_t msg_id;
uint32_t block_id;
uint32_t frag_id;
/* NOTE: times are when the first bytes were sent with write/sendto! */
/** sender time */
uint32_t tv_send_send_s;
uint32_t tv_send_send_us;
/** middleman times */
uint32_t tv_mid_recv_s;
uint32_t tv_mid_recv_us;
uint32_t tv_mid_send_s;
uint32_t tv_mid_send_us;
/**
* receiver time (only used in stats; just here cause it doesn't make
* any difference to have in the packet, cause the packet is filled with
* deadbeef anyway).
*/
uint32_t tv_recv_recv_s;
uint32_t tv_recv_recv_us;
} block_hdr_t;
int marshall_block_hdr_send(char *buf,
uint32_t msg_id,uint32_t block_id,uint32_t frag_id,
struct timeval *tv_send_send);
int marshall_block_hdr_mid( char *buf,
struct timeval *tv_mid_recv,
struct timeval *tv_mid_send);
block_hdr_t *unmarshall_block_hdr(char *buf,block_hdr_t *fill_hdr);
#endif /* __DEFS_H__ */
......@@ -28,6 +28,8 @@ char *deadbeef = "deadbeef";
int pushback = 0;
int udp = 0;
/**
* defs
*/
......@@ -36,11 +38,12 @@ void usage(char *bin) {
"USAGE: %s -scohSRudb (option defaults in parens)\n"
" -s <block_size> tx block size (%d)\n"
" -c <block_count> tx block size (%d)\n"
" -h <hostname> Listen on this host/addr (INADDR_ANY)\n"
" -m <hostname> Listen on this host/addr (INADDR_ANY)\n"
" -S <port> Listen on this port for one sender at a time (%d)\n"
" -R <port> Listen on this port for receivers (%d)\n"
" -o Only encrypt (instead of dec/enc)\n"
" -b Ack all received blocks from sender AFTER crypto\n"
" -U Use udp instead of tcp\n"
" -d[d..d] Enable various levels of debug output\n"
" -u Print this msg\n",
bin,block_size,block_count,srv_send_port,srv_recv_port
......@@ -51,7 +54,7 @@ void parse_args(int argc,char **argv) {
int c;
char *ep = NULL;
while ((c = getopt(argc,argv,"s:c:S:R:h:oudb")) != -1) {
while ((c = getopt(argc,argv,"s:c:S:R:m:oudbU")) != -1) {
switch(c) {
case 's':
block_size = (int)strtol(optarg,&ep,10);
......@@ -67,7 +70,7 @@ void parse_args(int argc,char **argv) {
exit(-1);
}
break;
case 'h':
case 'm':
srv_host = optarg;
break;
case 'S':
......@@ -96,6 +99,9 @@ void parse_args(int argc,char **argv) {
case 'b':
pushback = 1;
break;
case 'U':
udp = 1;
break;
default:
break;
}
......@@ -128,6 +134,10 @@ int main(int argc,char **argv) {
fd_set static_rfds;
int current_send_rc;
int block_count = 0;
struct sockaddr_in udp_recv_client_sins[MIDDLEMAN_MAX_CLIENTS];
int udp_msg_size = 0;
struct timeval t0;
int bytesRead = 0;
/* grab some quick args */
parse_args(argc,argv);
......@@ -135,6 +145,12 @@ int main(int argc,char **argv) {
/* initialize ourself... */
srand(time(NULL));
if (udp) {
memset(udp_recv_client_sins,
0,
MIDDLEMAN_MAX_CLIENTS * sizeof(struct sockaddr_in));
}
/* grab the bufs... */
if ((buf = (char *)malloc(sizeof(char)*block_size)) == NULL) {
efatal("no memory for data buf");
......@@ -167,10 +183,12 @@ int main(int argc,char **argv) {
}
/* startup server... */
if ((srv_send_sock = socket(PF_INET,SOCK_STREAM,0)) == -1) {
if ((srv_send_sock = socket(PF_INET,(udp)?SOCK_DGRAM:SOCK_STREAM,0))
== -1) {
efatal("could not get send listen socket");
}
if ((srv_recv_sock = socket(PF_INET,SOCK_STREAM,0)) == -1) {
if ((srv_recv_sock = socket(PF_INET,(udp)?SOCK_DGRAM:SOCK_STREAM,0))
== -1) {
efatal("could not get send listen socket");
}
......@@ -187,6 +205,7 @@ int main(int argc,char **argv) {
efatal("could not bind");
}
if (!udp) {
/* only listen for len 1 cause we won't accept on this guy if we
* already have a sender
*/
......@@ -197,6 +216,7 @@ int main(int argc,char **argv) {
if (listen(srv_recv_sock,8) < 0) {
efatal("could not listen");
}
}
/* daemonize... */
if (!debug) {
......@@ -213,15 +233,29 @@ int main(int argc,char **argv) {
FD_SET(srv_recv_sock,&static_rfds);
max_fd = (srv_send_sock > srv_recv_sock) ? srv_send_sock : srv_recv_sock;
if (debug > 1) {
fprintf(stdout,"DEBUG: server running...\n");
}
/* listen and read forever */
while (1) {
/* reset fdsets */
memcpy(&rfds,&static_rfds,sizeof(static_rfds));
if (debug > 3) {
printf("DEBUG: calling select\n");
}
retval = select(max_fd+1,&rfds,NULL,NULL,NULL);
printf("AAA\n");
if (retval > 0) {
if (FD_ISSET(srv_send_sock,&rfds)
/*
* do we have input on port listening for senders, and not
* currently have a sender?
*/
if (!udp && FD_ISSET(srv_send_sock,&rfds)
&& send_client_fd < 0) {
struct sockaddr_in client_sin;
socklen_t slen;
......@@ -255,13 +289,76 @@ int main(int argc,char **argv) {
current_send_rc = 0;
block_count = 0;
}
/* add a sender udp client */
//else if (FD_ISSET(srv_send_sock,&rfds) && udp) {
//}
/* add a new sfreceiver client */
else if (FD_ISSET(srv_recv_sock,&rfds)) {
struct sockaddr_in client_sin;
socklen_t slen;
int client_fd;
char tmpbuf[255];
slen = sizeof(client_sin);
if (udp) {
if (recvfrom(srv_recv_sock,tmpbuf,sizeof(tmpbuf),0,
(struct sockaddr *)&client_sin,&slen) < 0) {
ewarn("in recvfrom while getting a new receiver "
"client");
}
else {
/*
* if this is a client we haven't heard from,
* add it; else remove it. Basic "connection."
*/
int first_zero_idx = -1;
for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) {
if (udp_recv_client_sins[i].sin_addr.s_addr == 0
&& first_zero_idx < 0) {
first_zero_idx = i;
}
else if ((udp_recv_client_sins[i].sin_addr.s_addr
== client_sin.sin_addr.s_addr)
&& (udp_recv_client_sins[i].sin_port
== client_sin.sin_port)) {
/* old client, wants to "disconnect" */
if (debug > 1) {
fprintf(stdout,
"removed udp client %s\n",
inet_ntoa(client_sin.sin_addr));
}
memset(&udp_recv_client_sins[i],
0,
sizeof(struct sockaddr_in));
break;
}
}
if (i == MIDDLEMAN_MAX_CLIENTS) {
/* new client */
if (first_zero_idx < 0) {
ewarn("could not add new client, max reached");
}
else {
memcpy(&udp_recv_client_sins[first_zero_idx],
&client_sin,
sizeof(struct sockaddr_in));
if (debug > 1) {
fprintf(stdout,
"added new udp receiver client %s\n",
inet_ntoa(client_sin.sin_addr));
}
}
}
}
}
else {
if ((client_fd = accept(srv_recv_sock,
(struct sockaddr *)&client_sin,
&slen)) < 0) {
......@@ -307,8 +404,10 @@ int main(int argc,char **argv) {
++recv_client_cnt;
}
}
else if (send_client_fd > 0
&& FD_ISSET(send_client_fd,&rfds)) {
}
else if ((send_client_fd > 0
&& FD_ISSET(send_client_fd,&rfds))
|| (udp && FD_ISSET(srv_send_sock,&rfds))) {
/**
* ok, here we need to read a block from this guy for awhile,
* then [decrypt/munge]/encrypt (according to command line
......@@ -330,6 +429,8 @@ int main(int argc,char **argv) {
//printf("going for another read\n");
if (!udp) {
retval = read(send_client_fd,
&buf[current_send_rc],
block_size - current_send_rc);
......@@ -375,20 +476,60 @@ int main(int argc,char **argv) {
else {
current_send_rc += retval;
}
}
else {
/*
* NOTE: we do not establish that there's only one
* sender! We just forward msgs that come to us.
*/
struct sockaddr_in client_sin;
socklen_t slen;
slen = sizeof(client_sin);
if (debug > 3) {
fprintf(stdout,"DEBUG: calling recvfrom on sender\n");
}
retval = recvfrom(srv_send_sock,buf,block_size,0,
(struct sockaddr *)&client_sin,&slen);
if (retval < 0) {
ewarn("error in recvfrom on sending client");
udp_msg_size = 0;
}
else {
udp_msg_size = retval;
if (debug > 1) {
fprintf(stderr,
"DEBUG: udp dgram\n",
udp_msg_size);
}
}
}
bytesRead = (udp)?udp_msg_size:current_send_rc;
/* grab recv timestamp */
gettimeofday(&t0,NULL);
if ((!udp && current_send_rc == block_size)
|| (udp && udp_msg_size > 0)) {
block_hdr_t hdr;
if (current_send_rc == block_size) {
current_send_rc = 0;
++block_count;
/* unmarshall header */
unmarshall_block_hdr(buf,&hdr);
/* wait to send ack until after crypto */
/* end of block, do the encryption op and note times... */
if (debug > 1) {
fprintf(stderr,
"DEBUG: read %d bytes (a block)\n",
block_size);
"DEBUG: read %d bytes\n",
bytesRead);
}
DES_cblock *iv = sgeniv();
......@@ -398,17 +539,17 @@ int main(int argc,char **argv) {
if (only_encrypt) {
gettimeofday(&t1,NULL);
sencrypt(buf,outbuf,block_size,
sencrypt(buf,outbuf,bytesRead,
k1,k2,iv);
gettimeofday(&t2,NULL);
}
else {
/* do a decrypt, then choose new iv, then encrypt. */
gettimeofday(&t1,NULL);
sdecrypt(buf,outbuf,block_size,
sdecrypt(buf,outbuf,bytesRead,
k1,k2,iv);
iv = sgeniv();
sencrypt(buf,outbuf,block_size,
sencrypt(buf,outbuf,bytesRead,
k1,k2,iv);
gettimeofday(&t2,NULL);
}
......@@ -420,18 +561,30 @@ int main(int argc,char **argv) {
t3.tv_usec += 1000000;
}
fprintf(stdout,
"BLOCKTIME(%s): %d %d at %.6f\n",
"BLOCKTIME(%s): m%d b%d f%d %d at %.6f\n",
(only_encrypt)?"e":"de",
block_count,
hdr.msg_id,hdr.block_id,hdr.frag_id,
t3.tv_sec * 1000 + t3.tv_usec,
t2.tv_sec + t2.tv_usec / 1000000.0f);
fflush(stdout);
if (!udp) {
/* no app fragmentation with tcp */
++block_count;
}
else {
/* have to actually unmarshall the header to see
* which block and frag numbers we have.
*/
}
/**
* send the ack after crypto ops...
*/
if (pushback) {
if ((retval = write(send_client_fd,&buf[0],sizeof(char))) < 0) {
if (!udp && pushback) {
if ((retval = write(send_client_fd,
&buf[0],sizeof(char))) < 0) {
ewarn("failed to send ack");
}
}
......@@ -441,34 +594,77 @@ int main(int argc,char **argv) {
"DEBUG: sending to clients\n");
}
/* before sending on, copy the original header and
* note our timestamps...
*/
memcpy(outbuf,buf,sizeof(block_hdr_t));
/* and marshall bits of the hdr that matter to the
* middleman
*/
marshall_block_hdr_mid(outbuf,&t0,&t2);
/* send to clients */
if (!udp) {
for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) {
if (recv_clientfds[i] > -1) {
retval = write(recv_clientfds[i],
outbuf,
block_size);
bytesRead);
if (retval < 0) {
ewarn("client write failed");
}
else if (retval != block_size) {
else if (retval != bytesRead) {
fprintf(stderr,
"ERROR: wrote only %d bytes to client %d!\n",
"ERROR: wrote only %d bytes to "
"client %d!\n",
retval,i);
}
else {
if (debug > 1) {
fprintf(stderr,
"DEBUG: wrote all block_size %d bytes to client %d!\n",
block_size,i);
"DEBUG: wrote all "
"%d bytes to client %d!\n",
bytesRead,i);
}
}
}
}
}
else {
for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) {
if (udp_recv_client_sins[i].sin_addr.s_addr != 0) {
if (debug > 1) {
fprintf(stdout,"sending %d-byte udp msg to"
" client %s",
bytesRead,
inet_ntoa(udp_recv_client_sins[i].sin_addr));
}
retval = sendto(srv_recv_sock,
outbuf,bytesRead,0,
(struct sockaddr *)&udp_recv_client_sins[i],
sizeof(struct sockaddr_in));
if (retval < 0) {
ewarn("while sending udp to client");
}
else if (retval != udp_msg_size) {
ewarn("only sent part of udp msg");
}
}
else {
if (debug > 3) {
fprintf(stdout,"no client in slot %d\n",i);
}
}
}
}
}
}
else {
if (!udp) {
/**
* If we get here, assume that one of the recv clients
* has died (cause they are not supposed to write anything
......@@ -505,6 +701,7 @@ int main(int argc,char **argv) {
}
}
}
}
else if (retval < 0) {
/* error... */
ewarn("error in select");
......
......@@ -22,6 +22,7 @@ char *middleman_host = "localhost";
/** -R <middleman hostport> */
short middleman_port = MIDDLEMAN_RECV_CLIENT_PORT;
int debug = 0;
int udp = 0;
char *deadbeef = "deadbeef";
......@@ -30,10 +31,11 @@ char *deadbeef = "deadbeef";
*/
void usage(char *bin) {
fprintf(stdout,
"USAGE: %s -cudR (option defaults in parens)\n"
"USAGE: %s -smMUdu (option defaults in parens)\n"
" -s <block_size> rx block size (%d)\n"
" -m <hostname> Middleman host to connect to (%s)\n"
" -R <port> Middle port to connect to (%d)\n"
" -M <port> Middle port to connect to (%d)\n"
" -U Use udp instead of tcp\n"
" -d[d..d] Enable various levels of debug output\n"
" -u Print this msg\n",
bin,block_size,middleman_host,middleman_port
......@@ -44,7 +46,7 @@ void parse_args(int argc,char **argv) {
int c;
char *ep = NULL;
while ((c = getopt(argc,argv,"s:m:R:h:ud")) != -1) {
while ((c = getopt(argc,argv,"s:m:M:udU")) != -1) {
switch(c) {
case 's':
block_size = (int)strtol(optarg,&ep,10);
......@@ -53,10 +55,10 @@ void parse_args(int argc,char **argv) {
exit(-1);
}
break;
case 'h':
case 'm':
middleman_host = optarg;
break;
case 'R':
case 'M':
middleman_port = (short)strtol(optarg,&ep,10);
if (ep == optarg) {
usage(argv[0]);
......@@ -69,6 +71,9 @@ void parse_args(int argc,char **argv) {
case 'd':
++debug;
break;
case 'U':
udp = 1;
break;
default:
break;
}
......@@ -95,6 +100,10 @@ int main(int argc,char **argv) {
int blocksRead = 0;
struct timeval t1;
int block_count = 0;
char *crbuf = "foo";
struct sockaddr_in client_sin;
int slen;
block_hdr_t hdr;
parse_args(argc,argv);
......@@ -127,23 +136,41 @@ int main(int argc,char **argv) {
}
/* startup recv client... */
if ((recv_sock = socket(PF_INET,SOCK_STREAM,0)) == -1) {
if ((recv_sock = socket(PF_INET,(udp)?SOCK_DGRAM:SOCK_STREAM,0)) == -1) {
efatal("could not get recv socket");
}
if (!udp) {
/* connect to the middleman if we can... */
if (connect(recv_sock,
(struct sockaddr *)&recv_sa,
sizeof(struct sockaddr_in)) < 0) {
efatal("could not connect to middleman");
}
}
else {
/* just send the middleman a "register" msg */
sendto(recv_sock,crbuf,sizeof(crbuf),0,
(struct sockaddr *)&recv_sa,sizeof(struct sockaddr_in));
}
if (debug > 1) {
fprintf(stdout,"DEBUG: connected to %s:%d\n",
inet_ntoa(recv_sa.sin_addr),ntohs(recv_sa.sin_port));
}
/**
* read blocks forever, noting times at which a block was completely read.
*/
if (debug > 1) {
fprintf(stdout,"DEBUG: Receiving blocks...\n");
}
while (1) {
bytesRead = 0;
if (!udp) {
while (bytesRead < block_size) {
retval = read(recv_sock,
&buf[(block_size - bytesRead)],
......@@ -169,18 +196,33 @@ int main(int argc,char **argv) {
bytesRead += retval;
}
}
}
else {
slen = sizeof(struct sockaddr_in);
retval = recvfrom(recv_sock,buf,block_size,0,
(struct sockaddr *)&client_sin,
(socklen_t *)&slen);
if (retval < 0) {
ewarn("error while recvfrom middleman");
}
else {
bytesRead = retval;
}
}
gettimeofday(&t1,NULL);
fprintf(stdout,"TIME %d %.4f\n",
block_count,
/* unmarshall to get id numbers */
unmarshall_block_hdr(buf,&hdr);
fprintf(stdout,"TIME m%d b%d f%d %.4f\n",
hdr.msg_id,hdr.block_id,hdr.frag_id,
t1.tv_sec + t1.tv_usec / 1000000.0f);
++block_count;
fprintf(stdout,
"INFO: read %d bytes (a block) at %.6f\n",
block_size,
"INFO: read %d bytes at %.6f\n",
bytesRead,
t1.tv_sec + t1.tv_usec / 1000000.0f);
}
......
......@@ -38,6 +38,9 @@ int ack_count = 0;
int unack_threshold = -1;
int udp = 0;
int udp_frag_size = 0;
/**
* functions
*/
......@@ -51,7 +54,14 @@ void usage(char *bin) {
" -P <time> Microseconds to pause between msg sends (%d)\n"
" -m <hostname> Middleman host to connect to (%s)\n"
" -M <port> Middle port to connect to (%d)\n"
" -a <threshold> Stop sending if gte threshold unack'd blocks remain (if less than 0, never wait) (%d)\n"
" -a <threshold> Stop sending if gte threshold unack'd blocks "
"remain (if less than 0, \n"
" never wait) (%d)\n"
" (NOTE: cannot use with udp)\n"
" -U Use udp instead of tcp\n"
" -F <frag_size> Application fragment size for udp (i.e. if \n"
" -s 1024 and -F 512, fragment each block \n"
" into two chunks)\n"
" -d[d..d] Enable various levels of debug output\n"
" -u Print this msg\n",
bin,block_size,block_count,msg_count,block_pause_us,msg_pause_us,
......@@ -63,7 +73,7 @@ void parse_args(int argc,char **argv) {
int c;
char *ep = NULL;