Commit 3b01fa98 authored by Jonathon Duerig's avatar Jonathon Duerig

Fixed a time-calculation bug in the monitor. Added bandwidth and base-rtt...

Fixed a time-calculation bug in the monitor. Added bandwidth and base-rtt calculations based on TCP-vegas. Miscellaneous cleanups.
parent de2f2638
......@@ -241,6 +241,8 @@ def send_destinations(conn, packet_list):
prev_time = packet_list[0][3]
for packet in packet_list:
ip = ip_to_int(emulated_to_real[packet[0]])
if prev_time == 0.0:
prev_time = packet[3]
delta = int((packet[3] - prev_time) * 1000)
if packet[3] == 0:
delta = 0
......
sh instrument.sh ../iperf -i 0.5 -c elab1 -t 30
#sh instrument.sh ../iperf -c elab1 -t 30
......@@ -261,9 +261,11 @@ int insert_fake(unsigned long ip, unsigned short port)
delays[index]=0;
last_delays[index]=0;
delay_count[index]=0;
delay_records[index].head = NULL;
delay_records[index].tail = NULL;
delay_records[index].sample_number = 0;
max_throughput[index] = 0;
base_rtt[index] = LONG_MAX;
// delay_records[index].head = NULL;
// delay_records[index].tail = NULL;
// delay_records[index].sample_number = 0;
result = index;
}
return result;
......@@ -336,7 +338,9 @@ void reconnect_receiver(int index)
last_loss_rates[index]=0;
delays[index]=0;
last_delays[index]=0;
remove_delay_samples(index);
max_throughput[index] = 0;
base_rtt[index] = LONG_MAX;
// remove_delay_samples(index);
}
void reset_receive_records(int index, unsigned long ip,
......
......@@ -46,6 +46,9 @@ FILE *loss_log;
ThroughputAckState throughput[CONCURRENT_RECEIVERS];
unsigned long max_throughput[CONCURRENT_RECEIVERS];
unsigned long base_rtt[CONCURRENT_RECEIVERS];
// Returns true if sequence is between the firstUnknown and the
// nextSequence. Takes account of wraparound.
int throughputInWindow(ThroughputAckState * state, unsigned int sequence)
......@@ -349,25 +352,18 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
source_port = htons(tp->source);
dest_port = htons(tp->dest);
// path_id = search_rcvdb(ip_dst);
// If there is a fake entry, the stub_port entry will be the
// destination port requested by the command line.
// debug_addr.s_addr = ip_src;
// printf("ip_src: %s ", inet_ntoa(debug_addr));
// debug_addr.s_addr = ip_dst;
// printf("ip_dst: %s, dest_port: %d, source_port: %d\n",
// inet_ntoa(debug_addr), dest_port, source_port);
path_id = find_by_stub_port(ip_dst, dest_port);
// printf("outgoing path_id: %d\n", path_id);
if (path_id == -1 || rcvdb[path_id].source_port != 0
|| rcvdb[path_id].dest_port != 0)
if (flag_standalone)
{
// If this is standalone mode, the stub_port entry will be the
// destination port requested by the command line.
path_id = find_by_stub_port(ip_dst, dest_port);
}
else
{
// I contacted the receiver. Therefore, my port is unique and
// the receiver's port is fixed. The destination is the
// receiver, therefore my port is the one that is of interest.
path_id = find_by_stub_port(ip_dst, source_port);
// printf("stub path_id (outgoing): %d\n", path_id);
}
if (path_id != -1) { //a monitored outgoing packet
//ignore the pure outgoing ack
......@@ -417,22 +413,20 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
}
} else {
// path_id = search_rcvdb(ip_src);
// If there is a fake entry, and the packet is incoming, then
// the source_port will be the remote port requested on the
// command line.
path_id = find_by_stub_port(ip_src, source_port);
// printf("incoming path_id: %d\n", path_id);
if (path_id == -1 || rcvdb[path_id].source_port != 0
|| rcvdb[path_id].dest_port != 0)
if (flag_standalone)
{
// If this is standalone mode, and the packet is incoming,
// then the source_port will be the remote port requested on
// the command line.
path_id = find_by_stub_port(ip_src, source_port);
}
else
{
// I contacted the receiver, so my port is unique and their
// port is the same every time. This means that if a packet is
// coming from them, the destination port is the one of
// interest.
path_id = find_by_stub_port(ip_src, dest_port);
// printf("stub path_id (incoming): %d\n", path_id);
}
if (path_id != -1) { //a monitored incoming packet
if (ack_bit == 1) { //has an acknowledgement
......@@ -445,16 +439,53 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
if (flag_resend) { //if the ack is triggered by a resend, skip the delay calculation.
flag_resend = 0;
} else { //calculate the delay
int delay = 0;
int delay = 0;
int bandwidth = 0;
int goodput = 0;
struct tcp_info info;
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5);
delay = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
if (delay != 0)
{
delays[path_id] += delay;
(delay_count[path_id])++;
append_delay_sample(path_id, delay, &(pkthdr->ts));
if (delay < base_rtt[path_id])
{
base_rtt[path_id] = delay;
}
if (is_live && delay_count[path_id] > 0)
{
int info_size = sizeof(info);
int error = getsockopt(rcvdb[path_id].sockfd,
SOL_TCP, TCP_INFO, &info,
&info_size);
if (error == -1)
{
perror("getsockopt() TCP_INFO");
clean_exit(1);
}
bandwidth = (info.tcpi_snd_cwnd * info.tcpi_snd_mss)
/ base_rtt[path_id];
if (bandwidth > max_throughput[path_id])
{
max_throughput[path_id] = bandwidth;
}
goodput = throughputTick(&throughput[path_id]);
logWrite(DELAY_DETAIL, NULL, "Goodput: %d", goodput);
logWrite(DELAY_DETAIL, NULL, "Throughput: %lu", bandwidth);
logWrite(DELAY_DETAIL, NULL, "Congestion Window Size: %lu",
info.tcpi_snd_cwnd);
logWrite(DELAY_DETAIL, NULL, "Sending MSS: %lu",
info.tcpi_snd_mss);
logWrite(DELAY_DETAIL, NULL, "Base RTT: %lu",
base_rtt[path_id]);
}
// append_delay_sample(path_id, delay, &(pkthdr->ts));
logWrite(DELAY_DETAIL, &(pkthdr->ts),
"Delay: %lu, Sum: %lu, Count: %lu", delay,
delays[path_id], delay_count[path_id]);
}
}
pop_sniff_rcvdb(path_id, (unsigned long)(ack_seq-1)); //advance the sniff window base
} //ack in rcvdb
} //has ack
......@@ -526,7 +557,7 @@ u_int16_t handle_ethernet (u_char *args,const struct pcap_pkthdr* pkthdr,const u
return ether_type;
}
void init_pcap(int to_ms, unsigned short port, char * device, int is_live) {
void init_pcap(int to_ms, unsigned short port, char * device) {
char errbuf[PCAP_ERRBUF_SIZE];
if (flag_debug) {
......
......@@ -108,6 +108,7 @@ typedef struct {
} delay_record;
extern short flag_debug;
extern short flag_standalone;
extern int pcapfd;
extern int maxfd;
extern connection snddb[CONCURRENT_SENDERS];
......@@ -119,10 +120,13 @@ extern unsigned long delay_count[CONCURRENT_RECEIVERS];
extern loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
extern unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
extern delay_record delay_records[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
extern int is_live;
extern unsigned long max_throughput[CONCURRENT_RECEIVERS];
extern unsigned long base_rtt[CONCURRENT_RECEIVERS];
extern void sniff(void);
extern void init_pcap(int to_ms, unsigned short port, char * device,
int is_live);
extern void init_pcap(int to_ms, unsigned short port, char * device);
extern void append_delay_sample(int path_id, long sample_value,
struct timeval const * timestamp);
extern void remove_delay_samples(int path_id);
......
......@@ -20,6 +20,8 @@ extern int opterr;
extern int optreset;
//Global
int is_live = 1;
short flag_debug, flag_standalone;
connection rcvdb[CONCURRENT_RECEIVERS];
unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
......@@ -267,26 +269,9 @@ void remove_delay_samples(int path_id)
delay_records[path_id].sample_number = 0;
}
/*
//Initialize or reset state varialbes related to a receiver connection
void reset_rcv_entry(int i) {
rcvdb[i].valid = 0;
loss_records[i].loss_counter=0;
loss_records[i].total_counter=0;
last_loss_rates[i]=0;
delays[i]=0;
last_delays[i]=0;
}
*/
void init(void) {
int i;
/* for (i=0; i<CONCURRENT_RECEIVERS; i++){
reset_rcv_entry(i);
}
for (i=0; i<CONCURRENT_SENDERS; i++){
snddb[i].valid = 0;
}*/
for (i = 0; i < CONCURRENT_RECEIVERS; i++)
{
add_empty_receiver(i);
......@@ -299,77 +284,6 @@ void init(void) {
}
}
/*
int insert_db(unsigned long ip, unsigned short source_port,
unsigned short dest_port, int sockfd, int dbtype) {
int i, record_number, next = -1;
time_t now = time(NULL);
double thisdiff, maxdiff = 0;
connection *db;
if (dbtype == 0 ) {
db = rcvdb;
record_number = CONCURRENT_RECEIVERS;
} else {
db = snddb;
record_number = CONCURRENT_SENDERS;
}
//find an unused entry or LRU entry
for (i=0; i<record_number; i++){
if (db[i].valid == 0) {
next = i;
break;
} else {
thisdiff = difftime(now, db[i].last_usetime);
if (thisdiff > maxdiff) {
maxdiff = thisdiff;
next = i;
}
}
}
if (db[next].valid == 1) {
close(db[next].sockfd);
if (dbtype == 0 ) { //if rcvdb
//reset related state variables
sniff_rcvdb[next].start = 0;
sniff_rcvdb[next].end = 0;
throughput[next].isValid = 0;
FD_CLR(db[next].sockfd, &write_fds);
reset_rcv_entry(next);
} else { //if snddb
FD_CLR(db[next].sockfd, &read_fds);
}
}
db[next].valid = 1;
db[next].ip = ip;
db[next].source_port = source_port;
db[next].dest_port = dest_port;
db[next].sockfd= sockfd;
db[next].last_usetime = now;
db[next].pending = 0;
return next;
}
*/
/*
int search_rcvdb(unsigned long indexip, unsigned short source_port,
unsigned short dest_port){
int i;
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (rcvdb[i].valid==1 && rcvdb[i].ip == indexip
&& rcvdb[i].source_port == source_port
&& rcvdb[i].dest_port == dest_port) {
rcvdb[i].last_usetime = time(NULL);
return i;
}
}
return -1; //no sockfd is -1
}
*/
void clean_exit(int code){
int i;
......@@ -388,39 +302,6 @@ void clean_exit(int code){
exit(code);
}
/*
int get_rcvdb_index(unsigned long destaddr, unsigned short source_port,
unsigned short dest_port){
int dbindex, sockfd;
struct sockaddr_in their_addr; // connector's address information
if ((dbindex=search_rcvdb(destaddr, source_port, dest_port)) == -1) {
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
clean_exit(1);
}
their_addr.sin_family = AF_INET; // host byte order
their_addr.sin_port = htons(SENDER_PORT); // short, network byte order
their_addr.sin_addr.s_addr = destaddr;
memset(&(their_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (flag_debug) printf("Try to connect to %s \n", inet_ntoa(their_addr.sin_addr));
if (connect(sockfd, (struct sockaddr *)&their_addr, sizeof(struct sockaddr)) == -1) {
perror("connect");
clean_exit(1);
}
if (sockfd > maxfd) {
maxfd = sockfd;
}
//insert rcvdb
dbindex=insert_db(destaddr, source_port, dest_port, sockfd, 0);
}
return dbindex;
}
*/
void remove_pending(int index)
{
if (rcvdb[index].pending == 0)
......@@ -513,11 +394,14 @@ int send_all(int sockfd, char *buf, int size) {
void receive_sender(int i) {
static char inbuf[MAX_PAYLOAD_SIZE];
int receive_count = 0;
if (recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0)== 0) {
receive_count = recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0);
if (receive_count == 0) {
//connection closed
remove_sender_index(i, &read_fds);
}
logWrite(PEER_READ, NULL, "Read %d bytes", receive_count);
}
......@@ -544,19 +428,6 @@ void send_receiver(int index, int packet_size, fd_set * write_fds_copy){
logWrite(PEER_WRITE, NULL, "Wrote %d bytes", error);
// Handle failed connection
while (error == -1 && errno == ECONNRESET && retry < 3) {
/*
//reset the related state variables
int pending = rcvdb[index].pending;
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
FD_CLR(rcvdb[index].sockfd, &write_fds);
// TODO: Fix redo
reset_rcv_entry(index);
//try again
index = get_rcvdb_index(packet.ip, packet.source_port, packet.dest_port);
rcvdb[index].pending = pending;
*/
reconnect_receiver(index);
sockfd= rcvdb[index].sockfd;
error = send(sockfd, random_buffer, packet_size, MSG_DONTWAIT);
......@@ -658,7 +529,6 @@ int receive_monitor(int sockfd, struct timeval * deadline) {
gettimeofday(deadline, NULL);
}
packet_buffer_add(packet_buffer, buffer_size);
return 1;
}
......@@ -686,27 +556,35 @@ int send_delay_to_monitor(int monitor, int index)
{
int buffer_size = 3*SIZEOF_LONG + 2*sizeof(unsigned short);
char outbuf_delay[buffer_size];
unsigned long delay = 0;
unsigned long tmpulong;
if (delay_count[index] > 0)
{
delays[index] /= delay_count[index];
}
else
// delay = delays[index];
// if (delay_count[index] > 0)
// {
// delay /= delay_count[index];
// }
// else
// {
// delay = last_delays[index];
// }
delay = base_rtt[index];
if (delay_count[index] == 0)
{
delays[index] = last_delays[index];
delay = 0;
}
// If measurement changed since last send
// if (abs((long)delays[index] - (long)last_delays[index])
// > (long)(last_delays[index]/5)) {
if (delays[index] != last_delays[index])
if (delay != last_delays[index])
{
// Insert the address info
char * buf = save_receiver_address(outbuf_delay, index);
logWrite(CONTROL_SEND, NULL, "Sending delay(%d) about stream(%hu:%s:%hu)",
delays[index], rcvdb[index].source_port,
delay, rcvdb[index].source_port,
ipToString(rcvdb[index].ip), rcvdb[index].dest_port);
// Insert the code number for delay
......@@ -715,7 +593,7 @@ int send_delay_to_monitor(int monitor, int index)
buf += SIZEOF_LONG;
// Insert the delay value
tmpulong = htonl(delays[index]);
tmpulong = htonl(delay);
memcpy(buf, &tmpulong, SIZEOF_LONG);
buf += SIZEOF_LONG;
......@@ -743,14 +621,15 @@ int send_delay_to_monitor(int monitor, int index)
now.tv_sec, now.tv_usec/1000000000.0,
last_delays[index],
now.tv_sec, now.tv_usec/1000000000.0,
delays[index]);
delay);
earlier = now;
}
// printf("Sent delay: %ld\n", delays[i]);
}
last_delays[index] = delays[index];
last_delays[index] = delay;
delays[index] = 0;
delay_count[index] = 0;
// base_rtt[index] = LONG_MAX;
return 1;
}
......@@ -760,7 +639,8 @@ int send_bandwidth_to_monitor(int monitor, int index)
int buffer_size = 3*SIZEOF_LONG + 2*sizeof(unsigned short);
char outbuf[buffer_size];
unsigned long code = htonl(CODE_BANDWIDTH);
unsigned long bandwidth = throughputTick(&throughput[index]);
// unsigned long bandwidth = throughputTick(&throughput[index]);
unsigned long bandwidth = max_throughput[index];
if (bandwidth != 0) {
// Insert the address info
......@@ -796,6 +676,7 @@ int send_bandwidth_to_monitor(int monitor, int index)
hostBand);
}
}
max_throughput[index] = 0;
return 1;
}
......@@ -1009,19 +890,19 @@ int have_time(struct timeval *start_tvp, struct timeval *left_tvp){
void usage() {
fprintf(stderr,"Usage: stubd [-t] [-d] [-s] <sniff-interface> [remote_IPaddr]\n");
fprintf(stderr," -d: Enable debugging mode\n");
fprintf(stderr," -f <filename>: Save logs into filename. By default, stderr is used.");
fprintf(stderr," -f <filename>: Save logs into filename. By default, stderr is used.\n");
fprintf(stderr," -l <option>: Enable logging for a particular part of the stub. 'everything' eneables all logging 'nothing' disables all logging.\n");
fprintf(stderr," control-send -- Control messages sent from the stub to the monitor");
fprintf(stderr," control-receive -- Control messages received from the monitor");
fprintf(stderr," tcptrace-send -- Control messages sent in xplot format (for comparison with tcptrace)");
fprintf(stderr," tcptrace-receive -- Control messages received in xplot format (for comparison with tcptrace)");
fprintf(stderr," sniff-send -- Outgoing packets detected by stub-pcap");
fprintf(stderr," sniff-receive -- Incoming packets detected by stub-pcap");
fprintf(stderr," peer-write -- Writes made to other stubs");
fprintf(stderr," peer-read -- Reads made from other stubs");
fprintf(stderr," main-loop -- Print out quanta information and the stages of the main loop");
fprintf(stderr," lookup-db -- Manipulations of the connection db");
fprintf(stderr," delay-detail -- The finest grain delay measurements");
fprintf(stderr," control-send -- Control messages sent from the stub to the monitor\n");
fprintf(stderr," control-receive -- Control messages received from the monitor\n");
fprintf(stderr," tcptrace-send -- Control messages sent in xplot format (for comparison with tcptrace)\n");
fprintf(stderr," tcptrace-receive -- Control messages received in xplot format (for comparison with tcptrace)\n");
fprintf(stderr," sniff-send -- Outgoing packets detected by stub-pcap\n");
fprintf(stderr," sniff-receive -- Incoming packets detected by stub-pcap\n");
fprintf(stderr," peer-write -- Writes made to other stubs\n");
fprintf(stderr," peer-read -- Reads made from other stubs\n");
fprintf(stderr," main-loop -- Print out quanta information and the stages of the main loop\n");
fprintf(stderr," lookup-db -- Manipulations of the connection db\n");
fprintf(stderr," delay-detail -- The finest grain delay measurements\n");
fprintf(stderr," -r: Enable replay mode. This also turns on standalone mode. The device is now used as a filename.\n");
fprintf(stderr," -s: Enable standalone mode\n");
fprintf(stderr," -t: Enable testing mode\n");
......@@ -1042,7 +923,6 @@ int main(int argc, char *argv[]) {
char ch;
unsigned short standalone_port = SENDER_PORT;
// Do we use live data? Or previously recorded data?
int is_live = 1;
FILE * logfile = NULL;
unsigned long quantum_no=0;
int logflags = LOG_NOTHING;
......@@ -1246,7 +1126,7 @@ int main(int argc, char *argv[]) {
//initialization
packet_buffer_init();
init_random_buffer();
init_pcap(SNIFF_TIMEOUT, standalone_port, argv[0], is_live);
init_pcap(SNIFF_TIMEOUT, standalone_port, argv[0]);
FD_ZERO(&read_fds);
FD_ZERO(&read_fds_copy);
FD_ZERO(&write_fds);
......@@ -1362,7 +1242,7 @@ int main(int argc, char *argv[]) {
//sniff packets
if (FD_ISSET(pcapfd, &read_fds_copy)) {
// logWrite(MAIN_LOOP, NULL, "Sniff packet stream");
logWrite(MAIN_LOOP, NULL, "Sniff packet stream");
sniff();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment