Commit 48a38d19 authored by David Johnson's avatar David Johnson
Browse files

Another knob for Jon: sender can now delay sending until it receives

another ack IF at least N acks are outstanding.
parent dbb62c05
......@@ -20,13 +20,24 @@ short srv_recv_port = MIDDLEMAN_RECV_CLIENT_PORT;
char *deadbeef = "deadbeef";
int pushback = 0;
/**
* defs
*/
void usage(char *bin) {
fprintf(stdout,
"USAGE: %s -scohSRud (option defaults in parens)\n",
bin
"USAGE: %s -scohSRudb (option defaults in parens)\n"
" -s <block_size> tx block size (%d)\n"
" -c <block_count> tx block size (%d)\n"
" -h <hostname> Listen on this host/addr (INADDR_ANY)\n"
" -S <port> Listen on this port for one sender at a time (%d)\n"
" -R <port> Listen on this port for receivers (%d)\n"
" -o Only encrypt (instead of dec/enc)\n"
" -b Ack all received blocks from sender AFTER crypto\n"
" -d[d..d] Enable various levels of debug output\n"
" -u Print this msg\n",
bin,block_size,block_count,srv_send_port,srv_recv_port
);
}
......@@ -34,7 +45,7 @@ void parse_args(int argc,char **argv) {
int c;
char *ep = NULL;
while ((c = getopt(argc,argv,"s:c:S:R:h:oud")) != -1) {
while ((c = getopt(argc,argv,"s:c:S:R:h:oudb")) != -1) {
switch(c) {
case 's':
block_size = (int)strtol(optarg,&ep,10);
......@@ -76,6 +87,9 @@ void parse_args(int argc,char **argv) {
case 'd':
++debug;
break;
case 'b':
pushback = 1;
break;
default:
break;
}
......@@ -233,6 +247,7 @@ int main(int argc,char **argv) {
/* zero the counter */
current_send_rc = 0;
block_count = 0;
}
else if (FD_ISSET(srv_recv_sock,&rfds)) {
struct sockaddr_in client_sin;
......@@ -321,6 +336,7 @@ int main(int argc,char **argv) {
send_client_fd = -1;
current_send_rc = 0;
block_count = 0;
if (debug > 1) {
fprintf(stderr,
......@@ -343,6 +359,7 @@ int main(int argc,char **argv) {
send_client_fd = -1;
current_send_rc = 0;
block_count = 0;
if (debug > 1) {
fprintf(stderr,
......@@ -359,6 +376,8 @@ int main(int argc,char **argv) {
++block_count;
/* wait to send ack until after crypto */
/* end of block, do the encryption op and note times... */
if (debug > 1) {
fprintf(stderr,
......@@ -401,6 +420,15 @@ int main(int argc,char **argv) {
t3.tv_sec * 1000 + t3.tv_usec,
t2.tv_sec + t2.tv_usec / 1000000.0f);
fflush(stdout);
/**
* send the ack after crypto ops...
*/
if (pushback) {
if ((retval = write(send_client_fd,&buf[0],sizeof(char))) < 0) {
ewarn("failed to send ack");
}
}
if (debug > 1) {
fprintf(stderr,
......
......@@ -13,7 +13,7 @@ int optind,opterr,optopt;
int block_size = 4096;
/** -m <middleman hostname> */
char *middleman_host = "localhost";
/** -M <middleman hostport> */
/** -R <middleman hostport> */
short middleman_port = MIDDLEMAN_RECV_CLIENT_PORT;
int debug = 0;
......@@ -24,8 +24,13 @@ char *deadbeef = "deadbeef";
*/
void usage(char *bin) {
fprintf(stdout,
"USAGE: %s -cudR (option defaults in parens)\n",
bin
"USAGE: %s -cudR (option defaults in parens)\n"
" -s <block_size> rx block size (%d)\n"
" -m <hostname> Middleman host to connect to (%s)\n"
" -R <port> Middle port to connect to (%d)\n"
" -d[d..d] Enable various levels of debug output\n"
" -u Print this msg\n",
bin,block_size,middleman_host,middleman_port
);
}
......@@ -134,7 +139,9 @@ int main(int argc,char **argv) {
while (1) {
bytesRead = 0;
while (bytesRead < block_size) {
retval = read(recv_sock,buf,block_size);
retval = read(recv_sock,
&buf[(block_size - bytesRead)],
(block_size - bytesRead));
if (retval < 0) {
if (errno == ECONNRESET) {
......
#include <stdio.h>
#include <fcntl.h>
#include "defs.h"
#include "crypto.h"
......@@ -27,13 +28,28 @@ int debug = 0;
char *deadbeef = "deadbeef";
int ack_count = 0;
int unack_threshold = -1;
/**
* functions
*/
void usage(char *bin) {
fprintf(stdout,
"USAGE: %s -scCpPmMud (option defaults in parens)\n",
bin
"USAGE: %s -scCpPmMudba (option defaults in parens)\n"
" -s <block_size> tx block size (%d)\n"
" -c <block_count> tx block size (%d)\n"
" -C <msg_count> number of times to send block_count msgs (%d)\n"
" -p <time> Microseconds to pause between block sends (%d)\n"
" -P <time> Microseconds to pause between msg sends (%d)\n"
" -m <hostname> Middleman host to connect to (%s)\n"
" -M <port> Middle port to connect to (%d)\n"
" -a <threshold> Stop sending if gte threshold unack'd blocks remain (if less than 0, never wait) (%d)\n"
" -d[d..d] Enable various levels of debug output\n"
" -u Print this msg\n",
bin,block_size,block_count,msg_count,block_pause_us,msg_pause_us,
middleman_host,middleman_port,unack_threshold
);
}
......@@ -41,7 +57,7 @@ void parse_args(int argc,char **argv) {
int c;
char *ep = NULL;
while ((c = getopt(argc,argv,"s:c:C:p:P:m:M:R:h:ud")) != -1) {
while ((c = getopt(argc,argv,"s:c:C:p:P:m:M:R:h:uda:b")) != -1) {
switch(c) {
case 's':
block_size = (int)strtol(optarg,&ep,10);
......@@ -94,6 +110,13 @@ void parse_args(int argc,char **argv) {
case 'd':
++debug;
break;
case 'a':
unack_threshold = (int)strtol(optarg,&ep,10);
if (ep == optarg) {
usage(argv[0]);
exit(-1);
}
break;
default:
break;
}
......@@ -120,11 +143,13 @@ int main(int argc,char **argv) {
int retval;
int bytesWritten;
struct timeval tv;
int ack_count;
parse_args(argc,argv);
remaining_block_count = block_count;
remaining_msg_count = msg_count;
ack_count = 0;
/* initialize ourself... */
srand(time(NULL));
......@@ -165,6 +190,9 @@ int main(int argc,char **argv) {
sizeof(struct sockaddr_in)) < 0) {
efatal("could not connect to middleman");
}
/* set nonblocking so we can read acks at low priority */
fcntl(send_sock,F_SETFL,O_NONBLOCK);
/* heh, we don't do encryption! that way, if the middleman
* does a decrypt/encrypt with the same keys/iv, they should forward
......@@ -186,6 +214,9 @@ int main(int argc,char **argv) {
if (errno == EPIPE) {
efatal("while writing to middleman");
}
else if (errno == EAGAIN) {
;
}
else {
ewarn("while writing to middleman");
}
......@@ -205,9 +236,10 @@ int main(int argc,char **argv) {
if (remaining_block_count % 8 == 0) {
fprintf(stdout,
"INFO: %d blocks remaining in msg %d\n",
"INFO: %d blocks remaining in msg %d; %d acks\n",
remaining_block_count,
msg_count - remaining_msg_count);
msg_count - remaining_msg_count,
ack_count);
}
/* interblock sleep time */
......@@ -217,9 +249,32 @@ int main(int argc,char **argv) {
}
}
/* check for acks... */
/* just read one right away to stay ahead of the game... */
while (read(send_sock,&buf[0],sizeof(char)) == sizeof(char)) {
++ack_count;
}
if (unack_threshold > -1) {
while (((block_count - remaining_block_count) - ack_count) > unack_threshold) {
warn("hit the unack threshold!");
/* sleep until we get ack'd */
if (usleep(100*1000) < 0) {
ewarn("sleep at ack edge failed");
}
if (read(send_sock,&buf[0],sizeof(char)) == sizeof(char)) {
++ack_count;
}
}
}
}
remaining_block_count = block_size;
--remaining_msg_count;
ack_count = 0;
fprintf(stdout,
"INFO: only %d msgs remaining\n",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment