From 68d41f20a24f34a8cb7142319c3a16f41022137a Mon Sep 17 00:00:00 2001 From: Junxing Zhang Date: Tue, 10 Jan 2006 14:14:48 +0000 Subject: [PATCH] add Makefile for stub --- pelab/stub/Makefile | 13 ++++ pelab/stub/stub-pcap.c | 137 ++++++++++++++++++++++++----------------- pelab/stub/stub.h | 16 ++++- pelab/stub/stubd.c | 88 ++++++++------------------ 4 files changed, 134 insertions(+), 120 deletions(-) create mode 100644 pelab/stub/Makefile diff --git a/pelab/stub/Makefile b/pelab/stub/Makefile new file mode 100644 index 000000000..b5f746e65 --- /dev/null +++ b/pelab/stub/Makefile @@ -0,0 +1,13 @@ +all: stubd stub-monitor + +stubd: stubd.o stub-pcap.o stub.h + gcc -g -Wall -lm -lpcap stubd.o stub-pcap.o -o stubd +stub-monitor: stub-monitor.c stub.h + gcc -g -Wall stub-monitor.c -o stub-monitor +stubd.o: stubd.c stub.h + gcc -g -Wall -c stubd.c +stub-pcap.o: stub-pcap.c stub.h + gcc -g -Wall -c stub-pcap.c + +clean: + rm *.o stubd stub-monitor diff --git a/pelab/stub/stub-pcap.c b/pelab/stub/stub-pcap.c index fef28ef51..4386d5988 100644 --- a/pelab/stub/stub-pcap.c +++ b/pelab/stub/stub-pcap.c @@ -1,16 +1,3 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include "stub.h" /* tcpdump header (ether.h) defines ETHER_HDRLEN) */ @@ -56,12 +43,14 @@ struct sniff_record { }; typedef struct sniff_record sniff_record; struct sniff_path { - sniff_record records[SNIFFWIN_SIZE]; + sniff_record records[SNIFF_WINSIZE]; short start; //circular buffer pointers short end; }; typedef struct sniff_path sniff_path; sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS]; +pcap_t* descr; +int pcapfd; void init_sniff_rcvdb(void) { int i; @@ -78,8 +67,9 @@ int push_sniff_rcvdb(int path_id, u_long start_seq, u_long end_seq, const struct int next; path = &(sniff_rcvdb[path_id]); - next = (((path->end)+1)%SNIFFWIN_SIZE); - if ((next-(path->start))%SNIFFWIN_SIZE == 0){ + next = path->end; + //The circular buffer is full when the start and end pointers are back to back + if ((path->start-next)%SNIFF_WINSIZE == (SNIFF_WINSIZE-1)){ addr.s_addr =rcvdb[path_id].ip; printf("Error: circular buffer is full for the path to %s", inet_ntoa(addr)); return -1; @@ -88,19 +78,24 @@ int push_sniff_rcvdb(int path_id, u_long start_seq, u_long end_seq, const struct path->records[next].seq_end = end_seq; path->records[next].captime.tv_sec = ts->tv_sec; path->records[next].captime.tv_usec = ts->tv_usec; - path->end=next; + path->end=(next+1)%SNIFF_WINSIZE; return 0; } + int search_sniff_rcvdb(int path_id, u_long seqnum) { sniff_path *path = &(sniff_rcvdb[path_id]); int next = path->start; while (next != (path->end)){ - if ((path->records[next].seq_start)<=seqnum && (path->records[next].seq_end)>=seqnum) { + if ((path->records[next].seq_start)==(path->records[next].seq_end)){//no payload + if ((path->records[next].seq_end)==seqnum){ + return next; + } + } else if ((path->records[next].seq_start)<=seqnum && (path->records[next].seq_end)>seqnum) { return next; } - next = ((next+1) % SNIFFWIN_SIZE); + next = (next+1) % SNIFF_WINSIZE; } return -1; } @@ -108,8 +103,9 @@ int search_sniff_rcvdb(int path_id, u_long seqnum) { void pop_sniff_rcvdb(int path_id, u_long to_seqnum){ int to_index = search_sniff_rcvdb(path_id, to_seqnum); if (to_index != -1) { - if (sniff_rcvdb[path_id].records[to_index].seq_end == to_seqnum) { - sniff_rcvdb[path_id].start = to_index+1; //complete pop-up + if ((sniff_rcvdb[path_id].records[to_index].seq_end==sniff_rcvdb[path_id].records[to_index].seq_start) + || (sniff_rcvdb[path_id].records[to_index].seq_end-1 == to_seqnum)) { + sniff_rcvdb[path_id].start = (to_index+1)%SNIFF_WINSIZE; //complete pop-up } else { sniff_rcvdb[path_id].start = to_index; //partial pop-up sniff_rcvdb[path_id].records[to_index].seq_start = to_seqnum+1; @@ -137,7 +133,6 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char const struct my_ip* ip; const struct tcphdr *tp; const struct udphdr *up; - struct in_addr addr_src, addr_dst; u_char *cp; u_int length = pkthdr->len; u_int caplen = pkthdr->caplen; @@ -185,11 +180,10 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char ip_src = ip->ip_src.s_addr; ip_dst = ip->ip_dst.s_addr; - if (flag_debug){ - addr_src.s_addr = ip_src; - addr_dst.s_addr = ip_dst; - fprintf(stdout,"src:%s dst:%s hlen:%d version:%d len:%d\n", - inet_ntoa(addr_src),inet_ntoa(addr_dst),hlen,version,len); + if (flag_debug){ + //For an unknown reason, inet_ntoa returns the same string if called twice in one fprintf + fprintf(stdout,"IP src:%s ", inet_ntoa(ip->ip_src)); + fprintf(stdout,"dst:%s hlen:%d version:%d len:%d\n",inet_ntoa(ip->ip_dst),hlen,version,len); } /*jump pass the ip header */ cp = (u_char *)ip + (hlen * 4); @@ -207,36 +201,55 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char tcp_hlen = ((tp)->doff & 0x000f); length -= (tcp_hlen * 4); //jump pass the tcp header seq_start = ntohl(tp->seq); - seq_end = seq_start+length-1; + seq_end = seq_start+length; + ack_bit= ((tp)->ack & 0x0001); path_id = search_rcvdb(ip_dst); if (path_id != -1) { //a monitored outgoing packet path = &(sniff_rcvdb[path_id]); - end = path->end; - if (seq_start > path->records[end].seq_end) { //new packet - return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); + //ignore the pure outgoing ack + if ((ack_bit==1) && (seq_end==seq_start)) { + return 0; + } + + //find the real received end index + if (path->end == path->start){ //no previous packet + return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet } else { + end = (path->end-1)%SNIFF_WINSIZE; /* Note: we discard resent-packet records for the delay estimation because - * TCP don't use them to calculate the sample RTT in the RTT estimation */ - if (seq_end > path->records[end].seq_end){ //partial resend - pop_sniff_rcvdb(path_id, path->records[end].seq_end); - return push_sniff_rcvdb(path_id, path->records[end].seq_end+1, seq_end, &(pkthdr->ts)); - } else { //pure resend - pop_sniff_rcvdb(path_id, seq_end); - } - } + * TCP don't use them to calculate the sample RTT in the RTT estimation */ + //if the packet has no payload + if (seq_end == seq_start) { + if ((path->records[end].seq_end==path->records[end].seq_start) && (path->records[end].seq_end==seq_end)) { + //the last packet also has no payload and has the same seqnum + pop_sniff_rcvdb(path_id, seq_end); //pure resent + } else { + return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet + } + } else if (seq_start >= path->records[end].seq_end) { //new packet + return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); + } else { + if (seq_end > path->records[end].seq_end){ //partial resend + pop_sniff_rcvdb(path_id, path->records[end].seq_end-1); + return push_sniff_rcvdb(path_id, path->records[end].seq_end+1, seq_end, &(pkthdr->ts)); + } else { //pure resend + pop_sniff_rcvdb(path_id, seq_end-1); + } + } // if has payload and resent + } + } else { path_id = search_rcvdb(ip_src); if (path_id != -1) { //a monitored incoming packet - ack_bit= ((tp)->ack & 0x0001); if (ack_bit == 1) { //has an acknowledgement ack_seq = ntohl(tp->ack_seq); - record_id = search_sniff_rcvdb(path_id, ack_seq); + record_id = search_sniff_rcvdb(path_id, ack_seq-1); if (record_id != -1) { - msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000+0.5); + msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5); delays[path_id] = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs; - pop_sniff_rcvdb(path_id, ack_seq); + pop_sniff_rcvdb(path_id, ack_seq-1); } //ack in rcvdb } //has ack } //if incoming @@ -305,23 +318,22 @@ u_int16_t handle_ethernet (u_char *args,const struct pcap_pkthdr* pkthdr,const u return ether_type; } - -void sniff(int to_ms) { +void init_pcap(int to_ms) { char *dev; char errbuf[PCAP_ERRBUF_SIZE]; - pcap_t* descr; struct bpf_program fp; /* hold compiled program */ bpf_u_int32 maskp; /* subnet mask */ bpf_u_int32 netp; /* ip */ - u_char* args = NULL; - char string_filter[25]; - struct in_addr addr; + char string_filter[128]; + //struct in_addr addr; + + dev = "eth0"; //"vnet"; - dev = "vnet"; /* ask pcap for the network address and mask of the device */ pcap_lookupnet(dev,&netp,&maskp,errbuf); - addr.s_addr = netp; - sprintf(string_filter, "host %s", inet_ntoa(addr)); + //For an unknown reason, netp has the wrong 4th number + //addr.s_addr = netp; + sprintf(string_filter, "port %d and tcp", SENDER_PORT); /* open device for reading. * NOTE: We use non-promiscuous */ @@ -331,17 +343,32 @@ void sniff(int to_ms) { exit(1); } - /* Lets try and compile the program.. non-optimized */ - if(pcap_compile(descr, &fp, string_filter, 0, netp) == -1) { + + // Lets try and compile the program, optimized + if(pcap_compile(descr, &fp, string_filter, 1, maskp) == -1) { fprintf(stderr,"Error: calling pcap_compile\n"); exit(1); } - /* set the compiled program as the filter */ + // set the compiled program as the filter if(pcap_setfilter(descr,&fp) == -1) { fprintf(stderr,"Error: setting filter\n"); exit(1); } + /* + if (pcap_setnonblock(descr, 1, errbuf) == -1){ + printf("Error: pcap_setnonblock(): %s\n",errbuf); + exit(1); + } + */ + + pcapfd = pcap_fileno(descr); + init_sniff_rcvdb(); +} + +void sniff(void) { + u_char* args = NULL; + /* use dispatch here instead of loop * Note: no max pkt num is specified */ pcap_dispatch(descr,-1,my_callback,args); diff --git a/pelab/stub/stub.h b/pelab/stub/stub.h index 80d2438ef..61eb2ec3e 100644 --- a/pelab/stub/stub.h +++ b/pelab/stub/stub.h @@ -17,9 +17,16 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include #define STDIN 0 // file descriptor for standard input -#define QUANTA 5000000 //feed-loop interval in usec +#define QUANTA 5000 //feed-loop interval in msec #define MONITOR_PORT 3490 //the port the monitor connects to #define SENDER_PORT 3491 //the port the stub senders connect to #define PENDING_CONNECTIONS 10 //the pending connections the queue will hold @@ -29,7 +36,8 @@ #define MAX_TCPDUMP_LINE 256 //the max line size of the tcpdump output #define SIZEOF_LONG sizeof(long) //message bulding block #define BANDWIDTH_OVER_THROUGHPUT 0 //the safty margin for estimating the available bandwidth -#define SNIFFWIN_SIZE 131071 //from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux +#define SNIFF_WINSIZE 131071 //from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux +#define SNIFF_TIMEOUT QUANTA/10 //in msec //magic numbers #define CODE_BANDWIDTH 0x00000001 @@ -46,10 +54,12 @@ struct connection { typedef struct connection connection; extern short flag_debug; +extern int pcapfd; extern connection rcvdb[CONCURRENT_RECEIVERS]; extern unsigned long delays[CONCURRENT_SENDERS]; extern int search_rcvdb(unsigned long indexip); -extern void sniff(int to_ms); +extern void sniff(void); +extern void init_pcap(int to_ms); #endif diff --git a/pelab/stub/stubd.c b/pelab/stub/stubd.c index 1d2c9e099..319472339 100644 --- a/pelab/stub/stubd.c +++ b/pelab/stub/stubd.c @@ -169,13 +169,14 @@ int send_all(int sockfd, char *buf, int size) { void receive_sender(int i) { char inbuf[MAX_PAYLOAD_SIZE]; - unsigned long tmpulong, sndsec, sndusec; - struct timeval rcvtime; + //unsigned long tmpulong, sndsec, sndusec; + //struct timeval rcvtime; if (recv_all(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE)== 0) { //connection closed snddb[i].valid = 0; FD_CLR(snddb[i].sockfd, &read_fds); } else { + /* outdated since we use sniff for delay measurement now gettimeofday(&rcvtime, NULL); memcpy(&tmpulong, inbuf, SIZEOF_LONG); sndsec = ntohl(tmpulong); @@ -183,25 +184,30 @@ void receive_sender(int i) { sndusec = ntohl(tmpulong); delays[i] = (rcvtime.tv_sec-sndsec)*1000+floor((rcvtime.tv_usec-sndusec)/1000+0.5); if (flag_debug) printf("One Way Delay (msec): %ld \n",delays[i]); + */ } } void send_receiver(unsigned long destaddr, char *buf){ int i, index; - struct timeval sendtime; - unsigned long tmpulong; int sockfd; + //struct timeval sendtime; + //unsigned long tmpulong; + index = get_rcvdb_index(destaddr); sockfd= rcvdb[index].sockfd; srandom(getpid()); for (i=0; itv_sec)*1000000+ (current_tv.tv_usec-start_tvp->tv_usec); - left_usec = QUANTA-past_usec; + left_usec = QUANTA*1000-past_usec; //QUANTA is in msec if (left_usec > 0) { left_tvp->tv_sec = left_usec/1000000; left_tvp->tv_usec= left_usec%1000000; @@ -292,52 +302,6 @@ int have_time(struct timeval *start_tvp, struct timeval *left_tvp){ return 0; } -void readline(void) { - struct in_addr address; - char line[MAX_TCPDUMP_LINE]; - char *trash, *src, *size, *id; - char ip[16], size_string[5]; - int i=0, j=0, numbytes=0, index; - - if(fgets(line, MAX_TCPDUMP_LINE, STDIN) != NULL) { - strtok(line, " "); - while ( (trash=strtok(NULL, " ")) != NULL){ - if (i == 2 ) { - src = trash; - } else if (i == 6){ - id = trash; - } else if (i == 7){ - size = trash; - break; - } - } //while - if (src != NULL){ - i = 0; - j = 0; - while (j < 4 ) { - ip[i] = src[i]; - if (src[i] == '.'){ j++; } - i++; - } - if (j==4){ - ip[15]='\0'; - inet_aton(ip, &address); - if ((index=search_rcvdb(address.s_addr)) != -1) { - if (id != NULL) { - if (strcmp(id,"length")!=0) { - numbytes = atoi(size); //error on size==null? - } else { - sscanf(id, "%*s(%s)",size_string); - numbytes = atoi(size_string); - } //if id=="length" - throughputs[index] += numbytes; - } //if id exists - } //if ip in rcvdb - } //if 4 dots - } //if src exists - } //if fgets -} - int main(void) { int sockfd_snd, sockfd_rcv_sender, sockfd_rcv_monitor, sockfd_monitor=-1; struct sockaddr_in my_addr; // my address information @@ -345,7 +309,7 @@ int main(void) { fd_set read_fds_copy, write_fds_copy; socklen_t sin_size; struct timeval start_tv, left_tv; - int yes=1, maxfd, i, to_ms, flag_send_monitor=0; + int yes=1, maxfd, i, flag_send_monitor=0; //set up debug flag if (getenv("Debug")!=NULL) @@ -400,16 +364,17 @@ int main(void) { } //initialization + init_db(); + init_pcap(SNIFF_TIMEOUT); FD_ZERO(&read_fds); FD_ZERO(&read_fds_copy); FD_ZERO(&write_fds); FD_ZERO(&write_fds_copy); + FD_SET(pcapfd, &read_fds); FD_SET(sockfd_rcv_sender, &read_fds); FD_SET(sockfd_rcv_monitor, &read_fds); - FD_SET(STDIN, &read_fds); - maxfd = sockfd_rcv_monitor; //socket order + maxfd = pcapfd; //socket order sin_size = sizeof(struct sockaddr_in); - init_db(); //main loop - the stubd runs forever while (1) { @@ -493,9 +458,8 @@ int main(void) { } //sniff packets - if (have_time(&start_tv, &left_tv)) { - to_ms = left_tv.tv_sec*1000+floor(left_tv.tv_usec/1000+0.5); - sniff(to_ms); + if (FD_ISSET(pcapfd, &read_fds_copy)) { + sniff(); } } //while in quanta -- GitLab