Commit b389ba3d authored by Jonathon Duerig's avatar Jonathon Duerig

Added a user-level buffer. This will allow the stub to catch up after the kernel buffer is replete.

parent 8496d41f
......@@ -123,12 +123,10 @@ unsigned int throughputTick(ThroughputAckState * state)
divisor = now.tv_sec - state->lastTime.tv_sec;
divisor += (now.tv_usec - state->lastTime.tv_usec)/1000000.0;
result = (state->ackSize * 8.0) / (divisor * 1000.0);
/*
printf("ByteCount: %u\n", state->ackSize);
printf("UnAck ByteCount: %i (%i - %i)\n",
state->nextSequence - state->firstUnknown,
state->nextSequence, state->firstUnknown);
*/
// printf("ByteCount: %u\n", state->ackSize);
// printf("UnAck ByteCount: %i (%i - %i)\n",
// state->nextSequence - state->firstUnknown,
// state->nextSequence, state->firstUnknown);
state->ackSize = 0;
state->repeatSize = 0;
state->lastTime = now;
......
......@@ -32,7 +32,13 @@
#define PENDING_CONNECTIONS 10 //the pending connections the queue will hold
#define CONCURRENT_SENDERS 50 //concurrent senders the stub maintains
#define CONCURRENT_RECEIVERS 50 //concurrent receivers the stub maintains
#define MAX_PAYLOAD_SIZE 2000 //size of the traffic payload
#define MAX_PAYLOAD_SIZE 110000 //size of the traffic payload
// This is the low water mark of the send buffer. That is, if select
// says that a write buffer is writable, this is the minimum amount of
// buffer space available.
#define LOW_WATER_MARK 110000
#define MAX_TCPDUMP_LINE 256 //the max line size of the tcpdump output
#define SIZEOF_LONG sizeof(long) //message bulding block
#define BANDWIDTH_OVER_THROUGHPUT 0 //the safty margin for estimating the available bandwidth
......@@ -50,6 +56,7 @@ struct connection {
int sockfd;
unsigned long ip;
time_t last_usetime; //last monitor access time
int pending; // How many bytes are pending to this peer?
};
typedef struct connection connection;
struct sniff_record {
......
......@@ -40,6 +40,8 @@ int maxfd;
char random_buffer[MAX_PAYLOAD_SIZE];
int total_size = 0;
typedef struct packet_buffer_node_tag
{
struct packet_buffer_node_tag * next;
......@@ -225,6 +227,7 @@ int insert_db(unsigned long ip, int sockfd, int dbtype) {
db[next].ip = ip;
db[next].sockfd= sockfd;
db[next].last_usetime = now;
db[next].pending = 0;
return next;
}
......@@ -278,11 +281,59 @@ int get_rcvdb_index(unsigned long destaddr){
clean_exit(1);
}
if (sockfd > maxfd) {
maxfd = sockfd;
}
dbindex=insert_db(destaddr, sockfd, 0); //insert rcvdb
}
return dbindex;
}
void remove_pending(int index)
{
if (rcvdb[index].pending == 0)
{
FD_CLR(rcvdb[index].sockfd, &write_fds);
}
}
void add_pending(int index, int size)
{
if (rcvdb[index].pending == 0 && size > 0)
{
FD_SET(rcvdb[index].sockfd, &write_fds);
}
rcvdb[index].pending += size;
}
void try_pending(int index, fd_set * write_fds_copy)
{
if (rcvdb[index].pending > 0 && FD_ISSET(rcvdb[index].sockfd,
write_fds_copy))
{
int size = 0;
int error = 0;
if (rcvdb[index].pending > LOW_WATER_MARK)
{
size = LOW_WATER_MARK;
}
else
{
size = rcvdb[index].pending;
}
error = send(rcvdb[index].sockfd, random_buffer, size, 0);
if (error == -1)
{
perror("try_pending");
clean_exit(1);
}
rcvdb[index].pending -= error;
total_size += error;
// printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending);
remove_pending(index);
}
}
void print_header(char *buf){
int i, j, len=14;
......@@ -348,54 +399,58 @@ void send_receiver(unsigned long destaddr, long size, fd_set * write_fds_copy){
int error = 1, retry=0;
struct in_addr addr;
index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd;
if (size <= 0) {
size = 1;
}
if (rcvdb[index].pending > 0) {
add_pending(index, size);
return;
}
if (size > MAX_PAYLOAD_SIZE){
printf("size exceeded MAX_PAYLOAD_SIZE\n");
add_pending(index, size - MAX_PAYLOAD_SIZE);
size = MAX_PAYLOAD_SIZE;
}
if (size <= 0){
size = 1;
}
index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd;
error = send(sockfd, random_buffer, size, MSG_DONTWAIT);
// Handle failed connection
while (error == -1 && errno == ECONNRESET && retry < 3) {
// TODO: Think hard about what resetting a connection means for sniffing
// traffic.
//if (select says its ok to write) || (we just created this connection)
if (FD_ISSET(sockfd, write_fds_copy) || !FD_ISSET(sockfd, &write_fds)){
error = send_all(sockfd, random_buffer, size);
}
while (error==0 && retry<3){ //rcv conn closed
//clear up the failed connection
if (!FD_ISSET(sockfd, &write_fds)) { //new connection
rcvdb[index].valid = 0;
} else { //existent connection
//reset the related state variables
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
FD_CLR(rcvdb[index].sockfd, &write_fds);
reset_rcv_entry(index);
}
//reset the related state variables
int pending = rcvdb[index].pending;
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
FD_CLR(rcvdb[index].sockfd, &write_fds);
reset_rcv_entry(index);
//try again
index = get_rcvdb_index(destaddr);
rcvdb[index].pending = pending;
sockfd= rcvdb[index].sockfd;
error = send_all(sockfd, random_buffer, size);
error = send(sockfd, random_buffer, size, MSG_DONTWAIT);
retry++;
} //while
//if no success for 3 tries, clean up and report the error
if (error == 0) {
}
//if still disconnected, reset
if (error == -1 && errno == ECONNRESET) {
rcvdb[index].valid = 0;
addr.s_addr = destaddr;
printf("Error: send_receiver() - failed send to %s three times. \n", inet_ntoa(addr));
} else {
//if a new connection succeeds, set the fds
if (error!=0 && !FD_ISSET(sockfd, &write_fds)){
FD_SET(sockfd, &write_fds);
if (sockfd > maxfd) {
maxfd = sockfd;
}
}
}
else if (error == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
add_pending(index, size);
}
else if (error == -1) {
perror("send_receiver: send");
clean_exit(1);
}
else {
total_size += error;
// printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending);
add_pending(index, size - error);
}
}
......@@ -451,9 +506,9 @@ int send_monitor(int sockfd) {
memcpy(outbuf_loss+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (rcvdb[i].valid == 1) {
printf("delays: %ld last: %ld\n", delays[i], last_delays[i]);
unsigned int through = throughputTick(&throughput[i]);
printf("throughput(kbps) = %u\n", through);
// printf("delays: %ld last: %ld\n", delays[i], last_delays[i]);
// unsigned int through = throughputTick(&throughput[i]);
// printf("throughput(kbps) = %u\n", through);
//send delay
if (delays[i] != last_delays[i]) {
memcpy(outbuf_delay, &(rcvdb[i].ip), SIZEOF_LONG); //the receiver ip
......@@ -463,7 +518,7 @@ int send_monitor(int sockfd) {
return 0;
}
last_delays[i] = delays[i];
printf("Sent delay: %ld\n", delays[i]);
// printf("Sent delay: %ld\n", delays[i]);
} //if measurement changed since last send
//send loss
......@@ -476,11 +531,11 @@ int send_monitor(int sockfd) {
memcpy(outbuf_loss, &(rcvdb[i].ip), SIZEOF_LONG); //the receiver ip
tmpulong = htonl(loss_rate);
memcpy(outbuf_loss+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
if (send_all(sockfd, outbuf_loss, 3*SIZEOF_LONG) == 0){
return 0;
}
// if (send_all(sockfd, outbuf_loss, 3*SIZEOF_LONG) == 0){
// return 0;
// }
last_loss_rates[i] = loss_rate;
printf("Sent loss: %d/%d=%ld \n", loss_records[i].loss_counter, loss_records[i].total_counter, loss_rate);
// printf("Sent loss: %d/%d=%ld \n", loss_records[i].loss_counter, loss_records[i].total_counter, loss_rate);
} //if measurement changed since last send
loss_records[i].loss_counter=0;
loss_records[i].total_counter=0;
......@@ -749,6 +804,7 @@ int main(int argc, char *argv[]) {
flag_send_monitor=0; //reset flag for each quanta
gettimeofday(&start_tv, NULL); //reset start time for each quanta
printf("Total: %d\n", total_size);
if (flag_debug) printf("quanta\n");
//while in a quanta
......@@ -766,6 +822,8 @@ int main(int argc, char *argv[]) {
//receive from existent senders
for (i=0; i<CONCURRENT_SENDERS; i++){
// Send pending data if it exists.
try_pending(i, &write_fds_copy);
if (snddb[i].valid==1 && FD_ISSET(snddb[i].sockfd, &read_fds_copy)) {
receive_sender(i);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment