Commit 29ab19aa authored by Junxing Zhang's avatar Junxing Zhang
Browse files

add loss measurement to the stub

parent e459ec0d
......@@ -4,11 +4,11 @@
#include "stub.h"
char public_hostname0[] = "planetlab1.cs.dartmouth.edu"; //"planet0.measure.tbres.emulab.net";
char public_hostname1[] = "pl1.cs.utk.edu"; //"planet1.measure.tbres.emulab.net";
char public_hostname0[128]; //= "planetlab1.cs.dartmouth.edu"; // "planet0.measure.tbres.emulab.net";
char public_hostname1[128]; //= "pl1.cs.utk.edu"; // "planet1.measure.tbres.emulab.net";
char public_addr0[16];
char public_addr1[16];
char private_addr0[]="10.1.0.1";
char private_addr0[]="10.1.0.1"; //for testing on emulab only
char private_addr1[]="10.1.0.2";
short flag_debug;
fd_set read_fds,write_fds;
......@@ -34,8 +34,8 @@ void send_stub(int sockfd, char *addr, char *buf) {
inet_aton(addr, &address);
tmpulong = address.s_addr;
if (flag_debug) {
printf("tmpulong: %lu \n", tmpulong);
printf("store address: %s \n", inet_ntoa(address));
//printf("tmpulong: %lu \n", tmpulong);
printf("send the stub a probing address: %s \n", inet_ntoa(address));
}
memcpy(buf+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
//should use send_all()!
......@@ -94,6 +94,16 @@ int main(int argc, char *argv[])
else
flag_debug=0;
if (argc != 3) {
fprintf(stderr,"Usage: stub-monitor <hostname1> <hostname2>\n");
exit(1);
}
strcpy(public_hostname0, argv[1]);
strcpy(public_hostname1, argv[2]);
if (flag_debug) {
printf("hostname1: %s, hostname2: %s\n", public_hostname0, public_hostname1);
}
hp = gethostbyname(public_hostname0);
bcopy(hp->h_addr, &addr, hp->h_length);
ip = inet_ntoa(addr);
......@@ -169,11 +179,17 @@ int main(int argc, char *argv[])
}
//check write
if (flag_send_stub0==0 && FD_ISSET(sockfd0, &write_fds_copy)) {
send_stub(sockfd0, public_addr1, buf); //feed the stub with the private or public address
if (flag_debug) {
printf("send to: %s \n", public_addr0);
}
send_stub(sockfd0, public_addr1, buf); //feed the stub with the private or public address
flag_send_stub0=1;
}
if (flag_send_stub1==0 && FD_ISSET(sockfd1, &write_fds_copy)) {
if (flag_debug) {
printf("send to: %s \n", public_addr1);
}
send_stub(sockfd1, public_addr0, buf); //feed the stub with the private or public address
flag_send_stub1=1;
}
......
......@@ -36,18 +36,7 @@ struct my_ip {
struct in_addr ip_src,ip_dst; /* source and dest address */
};
struct sniff_record {
struct timeval captime;
unsigned long seq_start;
unsigned long seq_end;
};
typedef struct sniff_record sniff_record;
struct sniff_path {
sniff_record records[SNIFF_WINSIZE];
short start; //circular buffer pointers
short end;
};
typedef struct sniff_path sniff_path;
sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
pcap_t* descr;
int pcapfd;
......@@ -139,7 +128,7 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
u_int caplen = pkthdr->caplen;
u_short len, hlen, version, tcp_hlen, ack_bit;
u_long seq_start, seq_end, ack_seq, ip_src, ip_dst;
int path_id, record_id, msecs, end;
int path_id, record_id, msecs, end, flag_resend=0;
sniff_path *path;
/* jump pass the ethernet header */
......@@ -208,24 +197,28 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
path_id = search_rcvdb(ip_dst);
if (path_id != -1) { //a monitored outgoing packet
path = &(sniff_rcvdb[path_id]);
loss_records[path_id].total_counter++;
//ignore the pure outgoing ack
if ((ack_bit==1) && (seq_end==seq_start)) {
return 0;
}
//find the real received end index
if (path->end == path->start){ //no previous packet
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet
} else {
//find the real received end index
end = (path->end-1)%SNIFF_WINSIZE;
/* Note: we discard resent-packet records for the delay estimation because
* TCP don't use them to calculate the sample RTT in the RTT estimation */
/* Note: we use flag_resend to igore resend-affected-packets in the delay estimation
* because TCP don't use them to calculate the sample RTT in the RTT estimation */
//if the packet has no payload
if (seq_end == seq_start) {
if ((path->records[end].seq_end==path->records[end].seq_start) && (path->records[end].seq_end==seq_end)) {
//the last packet also has no payload and has the same seqnum
pop_sniff_rcvdb(path_id, seq_end); //pure resent
flag_resend = 1; //pure resent
loss_records[path_id].loss_counter++;
} else {
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet
}
......@@ -233,10 +226,12 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts));
} else {
if (seq_end > path->records[end].seq_end){ //partial resend
pop_sniff_rcvdb(path_id, path->records[end].seq_end-1);
flag_resend = 1;
loss_records[path_id].loss_counter++;
return push_sniff_rcvdb(path_id, path->records[end].seq_end+1, seq_end, &(pkthdr->ts));
} else { //pure resend
pop_sniff_rcvdb(path_id, seq_end-1);
flag_resend = 1;
loss_records[path_id].loss_counter++;
}
} // if has payload and resent
}
......@@ -247,10 +242,14 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
if (ack_bit == 1) { //has an acknowledgement
ack_seq = ntohl(tp->ack_seq);
record_id = search_sniff_rcvdb(path_id, ack_seq-1);
if (record_id != -1) {
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5);
delays[path_id] = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
pop_sniff_rcvdb(path_id, ack_seq-1);
if (record_id != -1) { //new ack received
if (flag_resend) { //if the ack is triggered by a resend, skip the delay calculation.
flag_resend = 0;
} else { //calculate the delay
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5);
delays[path_id] = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
}
pop_sniff_rcvdb(path_id, ack_seq-1); //advance the sniff window base
} //ack in rcvdb
} //has ack
} //if incoming
......@@ -328,7 +327,7 @@ void init_pcap(int to_ms) {
char string_filter[128];
//struct in_addr addr;
dev = "vnet"; //"eth0";
dev = "vnet"; //"eth0"; //
/* ask pcap for the network address and mask of the device */
pcap_lookupnet(dev,&netp,&maskp,errbuf);
......
......@@ -52,11 +52,32 @@ struct connection {
time_t last_usetime; //last monitor access time
};
typedef struct connection connection;
struct sniff_record {
struct timeval captime;
unsigned long seq_start;
unsigned long seq_end;
};
typedef struct sniff_record sniff_record;
struct sniff_path {
sniff_record records[SNIFF_WINSIZE];
short start; //circular buffer pointers
short end;
};
typedef struct sniff_path sniff_path;
struct loss_record {
unsigned int loss_counter; //in terms of packet
unsigned int total_counter;
};
typedef struct loss_record loss_record;
extern short flag_debug;
extern int pcapfd;
extern connection rcvdb[CONCURRENT_RECEIVERS];
extern unsigned long delays[CONCURRENT_SENDERS];
extern sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
extern unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
extern loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
extern int search_rcvdb(unsigned long indexip);
extern void sniff(void);
extern void init_pcap(int to_ms);
......
......@@ -12,17 +12,25 @@
//Global
short flag_debug;
connection rcvdb[CONCURRENT_RECEIVERS];
unsigned long delays[CONCURRENT_SENDERS];
unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
unsigned long last_delays[CONCURRENT_RECEIVERS];
loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
connection snddb[CONCURRENT_SENDERS];
unsigned long last_delays[CONCURRENT_SENDERS];
unsigned long throughputs[CONCURRENT_SENDERS], last_throughputs[CONCURRENT_SENDERS];
fd_set read_fds,write_fds;
void init_db(void) {
void init(void) {
int i;
for (i=0; i<CONCURRENT_RECEIVERS; i++){
rcvdb[i].valid = 0;
loss_records[i].loss_counter=0;
loss_records[i].total_counter=0;
last_loss_rates[i]=0;
delays[i]=0;
last_delays[i]=0;
}
for (i=0; i<CONCURRENT_SENDERS; i++){
snddb[i].valid = 0;
......@@ -30,19 +38,21 @@ void init_db(void) {
}
int insert_db(unsigned long ip, int sockfd, int dbtype) {
int i, next = -1;
int i, record_number, next = -1;
time_t now = time(NULL);
double thisdiff, maxdiff = 0;
connection *db;
if (dbtype == 0 ) {
db = rcvdb;
record_number = CONCURRENT_RECEIVERS;
} else {
db = snddb;
record_number = CONCURRENT_SENDERS;
}
//find an unused entry or LRU entry
for (i=0; i<CONCURRENT_RECEIVERS; i++){
for (i=0; i<record_number; i++){
if (db[i].valid == 0) {
next = i;
break;
......@@ -55,6 +65,10 @@ int insert_db(unsigned long ip, int sockfd, int dbtype) {
}
}
if (db[next].valid == 1) {
if (dbtype == 0 ) {
//if it is a rcvdb record, reset the corresponding sniff_rcvdb record
sniff_rcvdb[next].start= sniff_rcvdb[next].end;
}
FD_CLR(db[next].sockfd, &read_fds);
close(db[next].sockfd);
}
......@@ -251,39 +265,47 @@ int receive_monitor(int sockfd) {
}
int send_monitor(int sockfd) {
char outbuf_delay[3*SIZEOF_LONG], outbuf_bandwidth[3*SIZEOF_LONG];
unsigned long tmpulong;
char outbuf_delay[3*SIZEOF_LONG], outbuf_loss[3*SIZEOF_LONG];
unsigned long tmpulong, loss_rate;
int i;
//float tmpf;
tmpulong = htonl(CODE_DELAY);
memcpy(outbuf_delay+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(CODE_BANDWIDTH);
memcpy(outbuf_bandwidth+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(CODE_LOSS);
memcpy(outbuf_loss+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (snddb[i].valid == 1) {
if (rcvdb[i].valid == 1) {
//send delay
if (delays[i] != last_delays[i]) {
memcpy(outbuf_delay, &(snddb[i].ip), SIZEOF_LONG); //the sender or src ip
memcpy(outbuf_delay, &(rcvdb[i].ip), SIZEOF_LONG); //the receiver ip
tmpulong = htonl(delays[i]);
memcpy(outbuf_delay+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
if (send_all(sockfd, outbuf_delay, 3*SIZEOF_LONG) == 0){
return 0;
}
last_delays[i] = delays[i];
last_delays[i] = delays[i];
printf("Sent delay: %ld\n", delays[i]);
} //if measurement changed since last send
/* Throughput is not measured at the moment
if (throughputs[i] != last_throughputs[i]) {
memcpy(outbuf_bandwidth, &(snddb[i].ip), SIZEOF_LONG); //the sender or src ip
tmpf=throughputs[i]/QUANTA+0.5f;
tmpulong = htonl(floor(tmpf)+BANDWIDTH_OVER_THROUGHPUT);
memcpy(outbuf_bandwidth+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
if (send_all(sockfd, outbuf_bandwidth, 3*SIZEOF_LONG) == 0){
//send loss
if (loss_records[i].total_counter == 0){
loss_rate = 0;
} else {
loss_rate = floor(loss_records[i].loss_counter*1000000000.0f/loss_records[i].total_counter+0.5f); //loss per billion
}
if (loss_rate != last_loss_rates[i]) {
memcpy(outbuf_loss, &(rcvdb[i].ip), SIZEOF_LONG); //the receiver ip
tmpulong = htonl(loss_rate);
memcpy(outbuf_loss+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
if (send_all(sockfd, outbuf_loss, 3*SIZEOF_LONG) == 0){
return 0;
}
last_throughputs[i] = throughputs[i];
last_loss_rates[i] = loss_rate;
printf("Sent loss: %d/%d=%ld \n", loss_records[i].loss_counter, loss_records[i].total_counter, loss_rate);
} //if measurement changed since last send
*/
loss_records[i].loss_counter=0;
loss_records[i].total_counter=0;
} //if connection is valid
} //for
......@@ -368,7 +390,7 @@ int main(void) {
}
//initialization
init_db();
init();
init_pcap(SNIFF_TIMEOUT);
FD_ZERO(&read_fds);
FD_ZERO(&read_fds_copy);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment