Commit 68d41f20 authored by Junxing Zhang's avatar Junxing Zhang

add Makefile for stub

parent 9d5e39ad
all: stubd stub-monitor
stubd: stubd.o stub-pcap.o stub.h
gcc -g -Wall -lm -lpcap stubd.o stub-pcap.o -o stubd
stub-monitor: stub-monitor.c stub.h
gcc -g -Wall stub-monitor.c -o stub-monitor
stubd.o: stubd.c stub.h
gcc -g -Wall -c stubd.c
stub-pcap.o: stub-pcap.c stub.h
gcc -g -Wall -c stub-pcap.c
clean:
rm *.o stubd stub-monitor
#include <pcap.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/if_ether.h>
#include <net/ethernet.h>
#include <netinet/ether.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/tcp.h>
#include "stub.h"
/* tcpdump header (ether.h) defines ETHER_HDRLEN) */
......@@ -56,12 +43,14 @@ struct sniff_record {
};
typedef struct sniff_record sniff_record;
struct sniff_path {
sniff_record records[SNIFFWIN_SIZE];
sniff_record records[SNIFF_WINSIZE];
short start; //circular buffer pointers
short end;
};
typedef struct sniff_path sniff_path;
sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
pcap_t* descr;
int pcapfd;
void init_sniff_rcvdb(void) {
int i;
......@@ -78,8 +67,9 @@ int push_sniff_rcvdb(int path_id, u_long start_seq, u_long end_seq, const struct
int next;
path = &(sniff_rcvdb[path_id]);
next = (((path->end)+1)%SNIFFWIN_SIZE);
if ((next-(path->start))%SNIFFWIN_SIZE == 0){
next = path->end;
//The circular buffer is full when the start and end pointers are back to back
if ((path->start-next)%SNIFF_WINSIZE == (SNIFF_WINSIZE-1)){
addr.s_addr =rcvdb[path_id].ip;
printf("Error: circular buffer is full for the path to %s", inet_ntoa(addr));
return -1;
......@@ -88,19 +78,24 @@ int push_sniff_rcvdb(int path_id, u_long start_seq, u_long end_seq, const struct
path->records[next].seq_end = end_seq;
path->records[next].captime.tv_sec = ts->tv_sec;
path->records[next].captime.tv_usec = ts->tv_usec;
path->end=next;
path->end=(next+1)%SNIFF_WINSIZE;
return 0;
}
int search_sniff_rcvdb(int path_id, u_long seqnum) {
sniff_path *path = &(sniff_rcvdb[path_id]);
int next = path->start;
while (next != (path->end)){
if ((path->records[next].seq_start)<=seqnum && (path->records[next].seq_end)>=seqnum) {
if ((path->records[next].seq_start)==(path->records[next].seq_end)){//no payload
if ((path->records[next].seq_end)==seqnum){
return next;
}
} else if ((path->records[next].seq_start)<=seqnum && (path->records[next].seq_end)>seqnum) {
return next;
}
next = ((next+1) % SNIFFWIN_SIZE);
next = (next+1) % SNIFF_WINSIZE;
}
return -1;
}
......@@ -108,8 +103,9 @@ int search_sniff_rcvdb(int path_id, u_long seqnum) {
void pop_sniff_rcvdb(int path_id, u_long to_seqnum){
int to_index = search_sniff_rcvdb(path_id, to_seqnum);
if (to_index != -1) {
if (sniff_rcvdb[path_id].records[to_index].seq_end == to_seqnum) {
sniff_rcvdb[path_id].start = to_index+1; //complete pop-up
if ((sniff_rcvdb[path_id].records[to_index].seq_end==sniff_rcvdb[path_id].records[to_index].seq_start)
|| (sniff_rcvdb[path_id].records[to_index].seq_end-1 == to_seqnum)) {
sniff_rcvdb[path_id].start = (to_index+1)%SNIFF_WINSIZE; //complete pop-up
} else {
sniff_rcvdb[path_id].start = to_index; //partial pop-up
sniff_rcvdb[path_id].records[to_index].seq_start = to_seqnum+1;
......@@ -137,7 +133,6 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
const struct my_ip* ip;
const struct tcphdr *tp;
const struct udphdr *up;
struct in_addr addr_src, addr_dst;
u_char *cp;
u_int length = pkthdr->len;
u_int caplen = pkthdr->caplen;
......@@ -185,11 +180,10 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
ip_src = ip->ip_src.s_addr;
ip_dst = ip->ip_dst.s_addr;
if (flag_debug){
addr_src.s_addr = ip_src;
addr_dst.s_addr = ip_dst;
fprintf(stdout,"src:%s dst:%s hlen:%d version:%d len:%d\n",
inet_ntoa(addr_src),inet_ntoa(addr_dst),hlen,version,len);
if (flag_debug){
//For an unknown reason, inet_ntoa returns the same string if called twice in one fprintf
fprintf(stdout,"IP src:%s ", inet_ntoa(ip->ip_src));
fprintf(stdout,"dst:%s hlen:%d version:%d len:%d\n",inet_ntoa(ip->ip_dst),hlen,version,len);
}
/*jump pass the ip header */
cp = (u_char *)ip + (hlen * 4);
......@@ -207,36 +201,55 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
tcp_hlen = ((tp)->doff & 0x000f);
length -= (tcp_hlen * 4); //jump pass the tcp header
seq_start = ntohl(tp->seq);
seq_end = seq_start+length-1;
seq_end = seq_start+length;
ack_bit= ((tp)->ack & 0x0001);
path_id = search_rcvdb(ip_dst);
if (path_id != -1) { //a monitored outgoing packet
path = &(sniff_rcvdb[path_id]);
end = path->end;
if (seq_start > path->records[end].seq_end) { //new packet
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts));
//ignore the pure outgoing ack
if ((ack_bit==1) && (seq_end==seq_start)) {
return 0;
}
//find the real received end index
if (path->end == path->start){ //no previous packet
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet
} else {
end = (path->end-1)%SNIFF_WINSIZE;
/* Note: we discard resent-packet records for the delay estimation because
* TCP don't use them to calculate the sample RTT in the RTT estimation */
if (seq_end > path->records[end].seq_end){ //partial resend
pop_sniff_rcvdb(path_id, path->records[end].seq_end);
return push_sniff_rcvdb(path_id, path->records[end].seq_end+1, seq_end, &(pkthdr->ts));
} else { //pure resend
pop_sniff_rcvdb(path_id, seq_end);
}
}
* TCP don't use them to calculate the sample RTT in the RTT estimation */
//if the packet has no payload
if (seq_end == seq_start) {
if ((path->records[end].seq_end==path->records[end].seq_start) && (path->records[end].seq_end==seq_end)) {
//the last packet also has no payload and has the same seqnum
pop_sniff_rcvdb(path_id, seq_end); //pure resent
} else {
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet
}
} else if (seq_start >= path->records[end].seq_end) { //new packet
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts));
} else {
if (seq_end > path->records[end].seq_end){ //partial resend
pop_sniff_rcvdb(path_id, path->records[end].seq_end-1);
return push_sniff_rcvdb(path_id, path->records[end].seq_end+1, seq_end, &(pkthdr->ts));
} else { //pure resend
pop_sniff_rcvdb(path_id, seq_end-1);
}
} // if has payload and resent
}
} else {
path_id = search_rcvdb(ip_src);
if (path_id != -1) { //a monitored incoming packet
ack_bit= ((tp)->ack & 0x0001);
if (ack_bit == 1) { //has an acknowledgement
ack_seq = ntohl(tp->ack_seq);
record_id = search_sniff_rcvdb(path_id, ack_seq);
record_id = search_sniff_rcvdb(path_id, ack_seq-1);
if (record_id != -1) {
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000+0.5);
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5);
delays[path_id] = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
pop_sniff_rcvdb(path_id, ack_seq);
pop_sniff_rcvdb(path_id, ack_seq-1);
} //ack in rcvdb
} //has ack
} //if incoming
......@@ -305,23 +318,22 @@ u_int16_t handle_ethernet (u_char *args,const struct pcap_pkthdr* pkthdr,const u
return ether_type;
}
void sniff(int to_ms) {
void init_pcap(int to_ms) {
char *dev;
char errbuf[PCAP_ERRBUF_SIZE];
pcap_t* descr;
struct bpf_program fp; /* hold compiled program */
bpf_u_int32 maskp; /* subnet mask */
bpf_u_int32 netp; /* ip */
u_char* args = NULL;
char string_filter[25];
struct in_addr addr;
char string_filter[128];
//struct in_addr addr;
dev = "eth0"; //"vnet";
dev = "vnet";
/* ask pcap for the network address and mask of the device */
pcap_lookupnet(dev,&netp,&maskp,errbuf);
addr.s_addr = netp;
sprintf(string_filter, "host %s", inet_ntoa(addr));
//For an unknown reason, netp has the wrong 4th number
//addr.s_addr = netp;
sprintf(string_filter, "port %d and tcp", SENDER_PORT);
/* open device for reading.
* NOTE: We use non-promiscuous */
......@@ -331,17 +343,32 @@ void sniff(int to_ms) {
exit(1);
}
/* Lets try and compile the program.. non-optimized */
if(pcap_compile(descr, &fp, string_filter, 0, netp) == -1) {
// Lets try and compile the program, optimized
if(pcap_compile(descr, &fp, string_filter, 1, maskp) == -1) {
fprintf(stderr,"Error: calling pcap_compile\n");
exit(1);
}
/* set the compiled program as the filter */
// set the compiled program as the filter
if(pcap_setfilter(descr,&fp) == -1) {
fprintf(stderr,"Error: setting filter\n");
exit(1);
}
/*
if (pcap_setnonblock(descr, 1, errbuf) == -1){
printf("Error: pcap_setnonblock(): %s\n",errbuf);
exit(1);
}
*/
pcapfd = pcap_fileno(descr);
init_sniff_rcvdb();
}
void sniff(void) {
u_char* args = NULL;
/* use dispatch here instead of loop
* Note: no max pkt num is specified */
pcap_dispatch(descr,-1,my_callback,args);
......
......@@ -17,9 +17,16 @@
#include <fcntl.h>
#include <netdb.h>
#include <math.h>
#include <pcap.h>
#include <netinet/if_ether.h>
#include <net/ethernet.h>
#include <netinet/ether.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/tcp.h>
#define STDIN 0 // file descriptor for standard input
#define QUANTA 5000000 //feed-loop interval in usec
#define QUANTA 5000 //feed-loop interval in msec
#define MONITOR_PORT 3490 //the port the monitor connects to
#define SENDER_PORT 3491 //the port the stub senders connect to
#define PENDING_CONNECTIONS 10 //the pending connections the queue will hold
......@@ -29,7 +36,8 @@
#define MAX_TCPDUMP_LINE 256 //the max line size of the tcpdump output
#define SIZEOF_LONG sizeof(long) //message bulding block
#define BANDWIDTH_OVER_THROUGHPUT 0 //the safty margin for estimating the available bandwidth
#define SNIFFWIN_SIZE 131071 //from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux
#define SNIFF_WINSIZE 131071 //from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux
#define SNIFF_TIMEOUT QUANTA/10 //in msec
//magic numbers
#define CODE_BANDWIDTH 0x00000001
......@@ -46,10 +54,12 @@ struct connection {
typedef struct connection connection;
extern short flag_debug;
extern int pcapfd;
extern connection rcvdb[CONCURRENT_RECEIVERS];
extern unsigned long delays[CONCURRENT_SENDERS];
extern int search_rcvdb(unsigned long indexip);
extern void sniff(int to_ms);
extern void sniff(void);
extern void init_pcap(int to_ms);
#endif
......
......@@ -169,13 +169,14 @@ int send_all(int sockfd, char *buf, int size) {
void receive_sender(int i) {
char inbuf[MAX_PAYLOAD_SIZE];
unsigned long tmpulong, sndsec, sndusec;
struct timeval rcvtime;
//unsigned long tmpulong, sndsec, sndusec;
//struct timeval rcvtime;
if (recv_all(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE)== 0) { //connection closed
snddb[i].valid = 0;
FD_CLR(snddb[i].sockfd, &read_fds);
} else {
/* outdated since we use sniff for delay measurement now
gettimeofday(&rcvtime, NULL);
memcpy(&tmpulong, inbuf, SIZEOF_LONG);
sndsec = ntohl(tmpulong);
......@@ -183,25 +184,30 @@ void receive_sender(int i) {
sndusec = ntohl(tmpulong);
delays[i] = (rcvtime.tv_sec-sndsec)*1000+floor((rcvtime.tv_usec-sndusec)/1000+0.5);
if (flag_debug) printf("One Way Delay (msec): %ld \n",delays[i]);
*/
}
}
void send_receiver(unsigned long destaddr, char *buf){
int i, index;
struct timeval sendtime;
unsigned long tmpulong;
int sockfd;
//struct timeval sendtime;
//unsigned long tmpulong;
index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd;
srandom(getpid());
for (i=0; i<MAX_PAYLOAD_SIZE; i++) buf[i]=(char)(random()&0x000000ff);
//put the send time at the first eight bytes
/* outdated since we use sniff for delay measurement now
* put the send time at the first eight bytes
gettimeofday(&sendtime, NULL);
tmpulong = htonl(sendtime.tv_sec);
memcpy(buf, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(sendtime.tv_usec);
memcpy(buf+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
*/
//send packets
while (send_all(sockfd, buf, MAX_PAYLOAD_SIZE) == 0){ //rcv conn closed
......@@ -244,7 +250,7 @@ int send_monitor(int sockfd) {
char outbuf_delay[3*SIZEOF_LONG], outbuf_bandwidth[3*SIZEOF_LONG];
unsigned long tmpulong;
int i;
float tmpf;
//float tmpf;
tmpulong = htonl(CODE_DELAY);
memcpy(outbuf_delay+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
......@@ -260,10 +266,12 @@ int send_monitor(int sockfd) {
return 0;
}
last_delays[i] = delays[i];
} //if
} //if measurement changed since last send
/* Throughput is not measured at the moment
if (throughputs[i] != last_throughputs[i]) {
memcpy(outbuf_bandwidth, &(snddb[i].ip), SIZEOF_LONG); //the sender or src ip
tmpf=throughputs[i]*1000.0f/QUANTA+0.5f;
tmpf=throughputs[i]/QUANTA+0.5f;
tmpulong = htonl(floor(tmpf)+BANDWIDTH_OVER_THROUGHPUT);
memcpy(outbuf_bandwidth+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
if (send_all(sockfd, outbuf_bandwidth, 3*SIZEOF_LONG) == 0){
......@@ -271,6 +279,8 @@ int send_monitor(int sockfd) {
}
last_throughputs[i] = throughputs[i];
} //if measurement changed since last send
*/
} //if connection is valid
} //for
return 1;
......@@ -283,7 +293,7 @@ int have_time(struct timeval *start_tvp, struct timeval *left_tvp){
gettimeofday(&current_tv, NULL);
past_usec = (current_tv.tv_sec-start_tvp->tv_sec)*1000000+
(current_tv.tv_usec-start_tvp->tv_usec);
left_usec = QUANTA-past_usec;
left_usec = QUANTA*1000-past_usec; //QUANTA is in msec
if (left_usec > 0) {
left_tvp->tv_sec = left_usec/1000000;
left_tvp->tv_usec= left_usec%1000000;
......@@ -292,52 +302,6 @@ int have_time(struct timeval *start_tvp, struct timeval *left_tvp){
return 0;
}
void readline(void) {
struct in_addr address;
char line[MAX_TCPDUMP_LINE];
char *trash, *src, *size, *id;
char ip[16], size_string[5];
int i=0, j=0, numbytes=0, index;
if(fgets(line, MAX_TCPDUMP_LINE, STDIN) != NULL) {
strtok(line, " ");
while ( (trash=strtok(NULL, " ")) != NULL){
if (i == 2 ) {
src = trash;
} else if (i == 6){
id = trash;
} else if (i == 7){
size = trash;
break;
}
} //while
if (src != NULL){
i = 0;
j = 0;
while (j < 4 ) {
ip[i] = src[i];
if (src[i] == '.'){ j++; }
i++;
}
if (j==4){
ip[15]='\0';
inet_aton(ip, &address);
if ((index=search_rcvdb(address.s_addr)) != -1) {
if (id != NULL) {
if (strcmp(id,"length")!=0) {
numbytes = atoi(size); //error on size==null?
} else {
sscanf(id, "%*s(%s)",size_string);
numbytes = atoi(size_string);
} //if id=="length"
throughputs[index] += numbytes;
} //if id exists
} //if ip in rcvdb
} //if 4 dots
} //if src exists
} //if fgets
}
int main(void) {
int sockfd_snd, sockfd_rcv_sender, sockfd_rcv_monitor, sockfd_monitor=-1;
struct sockaddr_in my_addr; // my address information
......@@ -345,7 +309,7 @@ int main(void) {
fd_set read_fds_copy, write_fds_copy;
socklen_t sin_size;
struct timeval start_tv, left_tv;
int yes=1, maxfd, i, to_ms, flag_send_monitor=0;
int yes=1, maxfd, i, flag_send_monitor=0;
//set up debug flag
if (getenv("Debug")!=NULL)
......@@ -400,16 +364,17 @@ int main(void) {
}
//initialization
init_db();
init_pcap(SNIFF_TIMEOUT);
FD_ZERO(&read_fds);
FD_ZERO(&read_fds_copy);
FD_ZERO(&write_fds);
FD_ZERO(&write_fds_copy);
FD_SET(pcapfd, &read_fds);
FD_SET(sockfd_rcv_sender, &read_fds);
FD_SET(sockfd_rcv_monitor, &read_fds);
FD_SET(STDIN, &read_fds);
maxfd = sockfd_rcv_monitor; //socket order
maxfd = pcapfd; //socket order
sin_size = sizeof(struct sockaddr_in);
init_db();
//main loop - the stubd runs forever
while (1) {
......@@ -493,9 +458,8 @@ int main(void) {
}
//sniff packets
if (have_time(&start_tv, &left_tv)) {
to_ms = left_tv.tv_sec*1000+floor(left_tv.tv_usec/1000+0.5);
sniff(to_ms);
if (FD_ISSET(pcapfd, &read_fds_copy)) {
sniff();
}
} //while in quanta
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment