Commit df5d662e authored by David Johnson's avatar David Johnson
Browse files

more features, could kill a man.

parent fbc4695d
......@@ -138,6 +138,8 @@ int main(int argc,char **argv) {
int udp_msg_size = 0;
struct timeval t0;
int bytesRead = 0;
struct sockaddr_in udp_send_client_sin;
struct timeval tvs,tv;
/* grab some quick args */
parse_args(argc,argv);
......@@ -247,8 +249,6 @@ int main(int argc,char **argv) {
}
retval = select(max_fd+1,&rfds,NULL,NULL,NULL);
printf("AAA\n");
if (retval > 0) {
/*
......@@ -294,17 +294,21 @@ int main(int argc,char **argv) {
//}
/* add a new sfreceiver client */
/* OR pass through acks. */
else if (FD_ISSET(srv_recv_sock,&rfds)) {
struct sockaddr_in client_sin;
socklen_t slen;
int client_fd;
char tmpbuf[255];
block_hdr_t ack;
slen = sizeof(client_sin);
if (udp) {
if (recvfrom(srv_recv_sock,tmpbuf,sizeof(tmpbuf),0,
(struct sockaddr *)&client_sin,&slen) < 0) {
if ((bytesRead = recvfrom(srv_recv_sock,
tmpbuf,sizeof(tmpbuf),0,
(struct sockaddr *)&client_sin,
&slen)) < 0) {
ewarn("in recvfrom while getting a new receiver "
"client");
}
......@@ -313,8 +317,11 @@ int main(int argc,char **argv) {
* if this is a client we haven't heard from,
* add it; else remove it. Basic "connection."
*/
int first_zero_idx = -1;
/* record ack time */
gettimeofday(&tv,NULL);
for (i = 0; i < MIDDLEMAN_MAX_CLIENTS; ++i) {
if (udp_recv_client_sins[i].sin_addr.s_addr == 0
&& first_zero_idx < 0) {
......@@ -325,14 +332,53 @@ int main(int argc,char **argv) {
&& (udp_recv_client_sins[i].sin_port
== client_sin.sin_port)) {
/* old client, wants to "disconnect" */
if (debug > 1) {
/* OR it's sending an ack... if the msg size
* is sizeof(hdr) instead of 1 byte, pass
* the ack on to the sender.
*/
if (debug > 2) {
fprintf(stdout,"read %d-byte msg from "
"recv client %s\n",
bytesRead,inet_ntoa(client_sin.sin_addr));
}
if (bytesRead == sizeof(block_hdr_t)) {
/* unmarshall for debug */
unmarshall_block_hdr(tmpbuf,&ack);
retval = sendto(srv_send_sock,
tmpbuf,bytesRead,0,
(struct sockaddr *)&udp_send_client_sin,
sizeof(struct sockaddr_in));
if (retval < 0) {
ewarn("while sending udp ack to client");
}
else if (retval != bytesRead) {
ewarn("only sent part of udp ack msg");
}
fprintf(stdout,
"removed udp client %s\n",
inet_ntoa(client_sin.sin_addr));
"ACKTIME: recv m%d b%d f%d at %.6f\n",
ack.msg_id,
ack.block_id,
ack.frag_id,
tv.tv_sec+tv.tv_usec/1000000.0f);
fflush(stdout);
}
else if (slen == 1) {
if (debug > 1) {
fprintf(stdout,
"removed udp client %s\n",
inet_ntoa(client_sin.sin_addr));
}
memset(&udp_recv_client_sins[i],
0,
sizeof(struct sockaddr_in));
}
memset(&udp_recv_client_sins[i],
0,
sizeof(struct sockaddr_in));
break;
}
......@@ -493,6 +539,9 @@ int main(int argc,char **argv) {
retval = recvfrom(srv_send_sock,buf,block_size,0,
(struct sockaddr *)&client_sin,&slen);
/* keep the sender's sin around */
udp_send_client_sin = client_sin;
if (retval < 0) {
ewarn("error in recvfrom on sending client");
udp_msg_size = 0;
......@@ -560,13 +609,6 @@ int main(int argc,char **argv) {
t3.tv_sec--;
t3.tv_usec += 1000000;
}
fprintf(stdout,
"BLOCKTIME(%s): m%d b%d f%d %d at %.6f\n",
(only_encrypt)?"e":"de",
hdr.msg_id,hdr.block_id,hdr.frag_id,
t3.tv_sec * 1000 + t3.tv_usec,
t2.tv_sec + t2.tv_usec / 1000000.0f);
fflush(stdout);
if (!udp) {
/* no app fragmentation with tcp */
......@@ -636,7 +678,7 @@ int main(int argc,char **argv) {
if (udp_recv_client_sins[i].sin_addr.s_addr != 0) {
if (debug > 1) {
fprintf(stdout,"sending %d-byte udp msg to"
" client %s",
" client %s\n",
bytesRead,
inet_ntoa(udp_recv_client_sins[i].sin_addr));
}
......@@ -660,6 +702,19 @@ int main(int argc,char **argv) {
}
}
}
gettimeofday(&tvs,NULL);
/* dump times */
fprintf(stdout,
"BLOCKTIME(%s): m%d b%d f%d %d; read=%.6f; compute=%.6f; send=%.6f\n",
(only_encrypt)?"e":"de",
hdr.msg_id,hdr.block_id,hdr.frag_id,
t3.tv_sec * 1000 + t3.tv_usec,
t0.tv_sec + t0.tv_usec / 1000000.0f,
t2.tv_sec + t2.tv_usec / 1000000.0f,
tvs.tv_sec + tvs.tv_usec / 1000000.0f);
fflush(stdout);
}
}
......
......@@ -23,6 +23,7 @@ char *middleman_host = "localhost";
short middleman_port = MIDDLEMAN_RECV_CLIENT_PORT;
int debug = 0;
int udp = 0;
int udp_ack = 0;
char *deadbeef = "deadbeef";
......@@ -37,6 +38,7 @@ void usage(char *bin) {
" -M <port> Middle port to connect to (%d)\n"
" -U Use udp instead of tcp\n"
" -d[d..d] Enable various levels of debug output\n"
" -a Enable udp acks back to the sender via middleman\n"
" -u Print this msg\n",
bin,block_size,middleman_host,middleman_port
);
......@@ -46,7 +48,7 @@ void parse_args(int argc,char **argv) {
int c;
char *ep = NULL;
while ((c = getopt(argc,argv,"s:m:M:udU")) != -1) {
while ((c = getopt(argc,argv,"s:m:M:udUa")) != -1) {
switch(c) {
case 's':
block_size = (int)strtol(optarg,&ep,10);
......@@ -74,6 +76,9 @@ void parse_args(int argc,char **argv) {
case 'U':
udp = 1;
break;
case 'a':
udp_ack = 1;
break;
default:
break;
}
......@@ -104,6 +109,7 @@ int main(int argc,char **argv) {
struct sockaddr_in client_sin;
int slen;
block_hdr_t hdr;
struct timeval tv;
parse_args(argc,argv);
......@@ -202,6 +208,7 @@ int main(int argc,char **argv) {
retval = recvfrom(recv_sock,buf,block_size,0,
(struct sockaddr *)&client_sin,
(socklen_t *)&slen);
if (retval < 0) {
ewarn("error while recvfrom middleman");
}
......@@ -224,6 +231,32 @@ int main(int argc,char **argv) {
"INFO: read %d bytes at %.6f\n",
bytesRead,
t1.tv_sec + t1.tv_usec / 1000000.0f);
fflush(stdout);
/* if udp, send a best-effort ack that the middleman will forward. */
if (udp && udp_ack) {
retval = sendto(recv_sock,buf,sizeof(block_hdr_t),0,
(struct sockaddr *)&client_sin,
sizeof(struct sockaddr_in));
gettimeofday(&tv,NULL);
if (retval < 0) {
ewarn("while sending udp ack to client");
}
else if (retval != sizeof(block_hdr_t)) {
ewarn("only sent part of udp ack msg");
}
else if (retval == sizeof(block_hdr_t)) {
fprintf(stdout,
"ACKTIME: sent m%d b%d f%d at %.6f\n",
hdr.msg_id,hdr.block_id,hdr.frag_id,
tv.tv_sec+tv.tv_usec/1000000.0f);
fflush(stdout);
}
}
}
/* never get here... */
......
......@@ -40,13 +40,14 @@ int unack_threshold = -1;
int udp = 0;
int udp_frag_size = 0;
int udp_max_ack_wait_us = 0;
/**
* functions
*/
void usage(char *bin) {
fprintf(stdout,
"USAGE: %s -scCpPmMudba (option defaults in parens)\n"
"USAGE: %s -scCpPmMudbat (option defaults in parens)\n"
" -s <block_size> tx block size (%d)\n"
" -c <block_count> tx block size (%d)\n"
" -C <msg_count> number of times to send block_count msgs (%d)\n"
......@@ -62,6 +63,7 @@ void usage(char *bin) {
" -F <frag_size> Application fragment size for udp (i.e. if \n"
" -s 1024 and -F 512, fragment each block \n"
" into two chunks)\n"
" -t <udp_max_ack_timeout> Max udp timeout when ack threshold reached (us)\n"
" -d[d..d] Enable various levels of debug output\n"
" -u Print this msg\n",
bin,block_size,block_count,msg_count,block_pause_us,msg_pause_us,
......@@ -73,7 +75,7 @@ void parse_args(int argc,char **argv) {
int c;
char *ep = NULL;
while ((c = getopt(argc,argv,"s:c:C:p:P:m:M:R:h:uda:bUF:")) != -1) {
while ((c = getopt(argc,argv,"s:c:C:p:P:m:M:R:h:uda:bUF:t:")) != -1) {
switch(c) {
case 's':
block_size = (int)strtol(optarg,&ep,10);
......@@ -150,6 +152,13 @@ void parse_args(int argc,char **argv) {
exit(-1);
}
break;
case 't':
udp_max_ack_wait_us = (int)strtol(optarg,&ep,10);
if (ep == optarg) {
usage(argv[0]);
exit(-1);
}
break;
default:
break;
}
......@@ -161,12 +170,12 @@ void parse_args(int argc,char **argv) {
exit(-1);
}
if (unack_threshold > -1 && udp) {
fprintf(stderr,
"ERROR: cannot use an ack window with udp since this program\n"
"does not retransmit unack'd msgs!\n");
exit(-1);
}
//if (unack_threshold > -1 && udp) {
//fprintf(stderr,
// "ERROR: cannot use an ack window with udp since this program\n"
// "does not retransmit unack'd msgs!\n");
//exit(-1);
//}
if ((udp_frag_size > 0 && udp_frag_size < sizeof(block_hdr_t))
|| block_size < sizeof(block_hdr_t)) {
......@@ -197,6 +206,11 @@ int main(int argc,char **argv) {
/** might be block_size or udp_frag_size */
int udp_msg_size;
int bsize;
int udp_last_block_id = 0;
/** for acks */
char tmpbuf[255];
block_hdr_t ack;
int slen;
parse_args(argc,argv);
......@@ -271,7 +285,6 @@ int main(int argc,char **argv) {
if (udp) {
while (remaining_frag_count) {
gettimeofday(&tv,NULL);
marshall_block_hdr_send(buf,
msg_count-remaining_msg_count,
block_count-remaining_block_count,
......@@ -280,9 +293,13 @@ int main(int argc,char **argv) {
retval = sendto(send_sock,buf,udp_msg_size,0,
(struct sockaddr *)&send_sa,
sizeof(send_sa));
gettimeofday(&tv,NULL);
if (retval < 0) {
if (errno == EAGAIN) {
ewarn("EAGAIN while sending udp");
continue;
}
else if (errno == EMSGSIZE) {
efatal("msg size too big for atomic send");
......@@ -302,6 +319,15 @@ int main(int argc,char **argv) {
fprintf(stderr,"WARNING: only sent %d of %d bytes\n",
retval,udp_msg_size);
}
gettimeofday(&tv,NULL);
fprintf(stdout,"TIME m%d b%d f%d %.4f\n",
msg_count - remaining_msg_count,
block_count - remaining_block_count,
frag_count - (remaining_frag_count + 1),
tv.tv_sec + tv.tv_usec / 1000000.0f);
}
}
else {
......@@ -328,12 +354,15 @@ int main(int argc,char **argv) {
bytesWritten += retval;
}
}
gettimeofday(&tv,NULL);
fprintf(stdout,"TIME m%d b%d f%0 %.4f\n",
msg_count - remaining_msg_count,
block_count - remaining_block_count,
frag_count - remaining_frag_count,
tv.tv_sec + tv.tv_usec / 1000000.0f);
}
gettimeofday(&tv,NULL);
fprintf(stdout,"TIME %d %.4f\n",
block_count - remaining_block_count,
tv.tv_sec + tv.tv_usec / 1000000.0f);
--remaining_block_count;
......@@ -359,18 +388,124 @@ int main(int argc,char **argv) {
++ack_count;
}
}
else {
retval = recvfrom(send_sock,
tmpbuf,sizeof(tmpbuf),0,
(struct sockaddr *)&send_sa,&slen);
gettimeofday(&tv,NULL);
if (retval < 0) {
if (errno == EAGAIN) {
;
}
else {
ewarn("while reading udp ack");
}
}
else if (retval == sizeof(block_hdr_t)) {
if (debug > 2) {
fprintf(stdout,"read %d-byte ack\n",retval);
}
++ack_count;
unmarshall_block_hdr(tmpbuf,&ack);
if (ack.block_id > udp_last_block_id) {
udp_last_block_id = ack.block_id;
}
fprintf(stdout,"ACKTIME m%d b%d f%d %.4f\n",
ack.msg_id,
ack.block_id,
ack.frag_id,
tv.tv_sec + tv.tv_usec / 1000000.0f);
}
else {
fprintf(stdout,
"WARNING: read %d-byte unknown block while "
"expecting ack\n",
retval);
}
}
if (unack_threshold > -1) {
struct timeval tvtmp;
struct timeval tvlast;
int sleeptime = 0;
gettimeofday(&tvtmp,NULL);
if (!udp && unack_threshold > -1) {
while (((block_count - remaining_block_count) - ack_count) > unack_threshold) {
warn("hit the unack threshold!");
/* sleep until we get ack'd */
if (usleep(100*1000) < 0) {
ewarn("sleep at ack edge failed");
if (!udp) {
/* sleep until we get ack'd */
if (usleep(100*1000) < 0) {
ewarn("sleep at ack edge failed");
}
if (read(send_sock,&buf[0],sizeof(char))
== sizeof(char)) {
++ack_count;
}
}
if (read(send_sock,&buf[0],sizeof(char)) == sizeof(char)) {
++ack_count;
else {
/* if udp, only sleep until we hit the max wait time */
gettimeofday(&tvlast,NULL);
usleep(500);
sleeptime += 500;
if (sleeptime > udp_max_ack_wait_us) {
/* reset ack_count */
ack_count = block_count - remaining_block_count;
fprintf(stdout,
"WARNING: waited max time for udp ack "
"without receiving; continuing!\n");
fflush(stdout);
continue;
}
retval = recvfrom(send_sock,
tmpbuf,sizeof(tmpbuf),0,
(struct sockaddr *)&send_sa,&slen);
gettimeofday(&tv,NULL);
if (retval < 0) {
if (errno == EAGAIN) {
;
}
else {
ewarn("while reading udp ack");
}
}
else if (retval == sizeof(block_hdr_t)) {
if (debug > 2) {
fprintf(stdout,"read %d-byte ack\n",retval);
}
++ack_count;
unmarshall_block_hdr(tmpbuf,&ack);
if (ack.block_id > udp_last_block_id) {
udp_last_block_id = ack.block_id;
}
fprintf(stdout,"ACKTIME m%d b%d f%d %.4f\n",
ack.msg_id,
ack.block_id,
ack.frag_id,
tv.tv_sec + tv.tv_usec / 1000000.0f);
}
else {
fprintf(stdout,
"WARNING: read %d-byte unknown block while"
" expecting ack\n",
retval);
}
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment