Commit ec8e8cef authored by Junxing Zhang's avatar Junxing Zhang
Browse files

Add the delay list in the stubd and stub-pcap for more accurate delay emulation.

parent 059b4abf
......@@ -36,10 +36,10 @@ struct my_ip {
struct in_addr ip_src,ip_dst; /* source and dest address */
};
sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
pcap_t* descr;
int pcapfd;
FILE *loss_log;
ThroughputAckState throughput[CONCURRENT_RECEIVERS];
......@@ -108,6 +108,12 @@ void throughputProcessAck(ThroughputAckState * state, unsigned int sequence)
// How many bytes have been acknowledged since the last call to
// throughputTick()?
unsigned int bytesThisTick(ThroughputAckState * state) {
return state->ackSize;
}
// What is the bandwidth of the acknowledged bytes since the last call to
// throughputTick()?
unsigned int throughputTick(ThroughputAckState * state)
{
double result = 0.0;
......@@ -256,8 +262,6 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
u_int caplen = pkthdr->caplen;
u_short len, hlen, version, tcp_hlen, ack_bit;
u_long seq_start, seq_end, ack_seq, ip_src, ip_dst;
unsigned short source_port = 0;
unsigned short dest_port = 0;
int path_id, record_id, msecs, end, flag_resend=0;
sniff_path *path;
......@@ -318,19 +322,14 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
}
tp = (struct tcphdr *)cp;
tcp_hlen = ((tp)->doff & 0x000f);
tcp_hlen = ((tp)->doff & 0x000f);
length -= (tcp_hlen * 4); //jump pass the tcp header
caplen -= (tcp_hlen * 4); //jump pass the tcp header
seq_start = ntohl(tp->seq);
seq_end = ((unsigned long)(seq_start+length));
ack_bit= ((tp)->ack & 0x0001);
source_port = htons(tp->source);
dest_port = htons(tp->dest);
// path_id = search_rcvdb(ip_dst);
// I contacted the receiver. Therefore, my port is unique and
// the receiver's port is fixed. The destination is the
// receiver, therefore my port is the one that is of interest.
path_id = find_by_stub_port(ip_dst, source_port);
path_id = search_rcvdb(ip_dst);
if (path_id != -1) { //a monitored outgoing packet
//ignore the pure outgoing ack
if ((ack_bit==1) && (seq_end==seq_start)) {
......@@ -358,6 +357,8 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
//the last packet also has no payload and has the same seqnum
flag_resend = 1; //pure resent
loss_records[path_id].loss_counter++;
fprintf(loss_log, "Resent: %lu\n",seq_end); //loss log
fflush(loss_log); //loss log
} else {
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet
}
......@@ -367,21 +368,20 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
flag_resend = 1;
loss_records[path_id].loss_counter++;
if (seq_end > path->records[end].seq_end){ //partial resend
fprintf(loss_log, "Resent: %lu to %lu\n",seq_start, path->records[end].seq_end-1); //loss log
fflush(loss_log); //loss log
return push_sniff_rcvdb(path_id, path->records[end].seq_end+1, seq_end, &(pkthdr->ts));
}
fprintf(loss_log, "Resent: %lu to %lu\n",seq_start, seq_end-1); //loss log
fflush(loss_log); //loss log
} // if has payload and resent
}
} else {
// path_id = search_rcvdb(ip_src);
// I contacted the receiver, so my port is unique and their
// port is the same every time. This means that if a packet is
// coming from them, the destination port is the one of
// interest.
path_id = find_by_stub_port(ip_src, dest_port);
path_id = search_rcvdb(ip_src);
if (path_id != -1) { //a monitored incoming packet
if (ack_bit == 1) { //has an acknowledgement
ack_seq = ntohl(tp->ack_seq);
ack_seq = ntohl(tp->ack_seq);
throughputProcessAck(&throughput[path_id], ack_seq);
......@@ -392,6 +392,7 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
} else { //calculate the delay
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5);
delays[path_id] = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
append_delay_sample(path_id, delays[path_id]);
}
pop_sniff_rcvdb(path_id, (unsigned long)(ack_seq-1)); //advance the sniff window base
} //ack in rcvdb
......@@ -513,6 +514,9 @@ void init_pcap(int to_ms) {
pcapfd = pcap_fileno(descr);
init_sniff_rcvdb();
loss_log = fopen("loss.log", "w"); //loss log
}
void sniff(void) {
......
......@@ -6,6 +6,7 @@
*
****************************************************************************/
#include "stub.h"
/*
......@@ -18,18 +19,27 @@ extern int optopt;
extern int opterr;
extern int optreset;
void clean_exit(int);
void append_delay_sample(int path_id, long sample_value);
//Global
short flag_debug, flag_standalone;
char sniff_interface[128];
connection rcvdb[CONCURRENT_RECEIVERS];
unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
unsigned long last_delays[CONCURRENT_RECEIVERS];
delay_record delay_records[CONCURRENT_RECEIVERS]; //delay list is calculated at the sender side
loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
int last_through[CONCURRENT_RECEIVERS];
int flag_testmode=0;
enum {TEST_NOTTESTING, TEST_NOTSTARTED, TEST_RUNNING, TEST_DONE } test_state;
unsigned long long total_bytes = 0;
connection snddb[CONCURRENT_SENDERS];
fd_set read_fds,write_fds;
int maxfd;
char random_buffer[MAX_PAYLOAD_SIZE];
......@@ -48,8 +58,6 @@ typedef struct
unsigned long ip;
long delta;
long size;
unsigned short source_port;
unsigned short dest_port;
} packet_info;
packet_buffer_node * packet_buffer_head;
......@@ -114,30 +122,11 @@ packet_info packet_buffer_front(void)
else
{
char * base = packet_buffer_head->buffer + packet_buffer_index;
// Get ip
memcpy(&result.ip, base, SIZEOF_LONG);
base += SIZEOF_LONG;
// Get source port
memcpy(&result.source_port, base, sizeof(result.source_port));
result.source_port = ntohs(result.source_port);
base += sizeof(result.source_port);
// Get dest port
memcpy(&result.dest_port, base, sizeof(result.dest_port));
result.dest_port = ntohs(result.dest_port);
base += sizeof(result.dest_port);
// Get delta time
memcpy(&result.delta, base, SIZEOF_LONG);
memcpy(&result.delta, base + SIZEOF_LONG, SIZEOF_LONG);
result.delta = ntohl(result.delta);
base += SIZEOF_LONG;
// Get size
memcpy(&result.size, base, SIZEOF_LONG);
memcpy(&result.size, base + SIZEOF_LONG + SIZEOF_LONG, SIZEOF_LONG);
result.size = ntohl(result.size);
base += SIZEOF_LONG;
}
return result;
}
......@@ -151,7 +140,7 @@ int packet_buffer_more(void)
// Move to the next packet, cleaning up as we go.
void packet_buffer_advance(void)
{
packet_buffer_index += 3*SIZEOF_LONG + 2*sizeof(unsigned short);
packet_buffer_index += 3*SIZEOF_LONG;
if (packet_buffer_index >= packet_buffer_head->size)
{
packet_buffer_node * old_head = packet_buffer_head;
......@@ -175,7 +164,95 @@ void init_random_buffer(void)
random_buffer[i]=(char)(random()&0x000000ff);
}
}
/*
void copy_ulong(char **buf_ptr, ulong value, int flag_hton) {
ulong tmpulong;
if (flag_hton) {
tmpulong = htonl(value);
} else {
tmpulong = value;
}
memcpy(*buf_ptr, &tmpulong, SIZEOF_LONG);
(*buf_ptr) += SIZEOF_LONG;
}
//Append a delay sample to the tail of the ith delay-record queue.
void append_delay_sample(int path_id, long sample_value) {
delay_sample *sample = malloc(sizeof(delay_sample));
if (sample == NULL) {
perror("allocate");
clean_exit(1);
}
sample->next = NULL;
sample->value = sample_value;
gettimeofday(&(sample->time), NULL);
if (delay_records[path_id].tail == NULL) {
delay_records[path_id].head = sample;
delay_records[path_id].tail = sample;
} else {
delay_records[path_id].tail->next = sample;
delay_records[path_id].tail = sample;
}
delay_records[path_id].sample_number++;
}
//Remove and return the delay samples.
//Input: path_id.
//Output: size.
//Return: sample_buffer
char *remove_delay_samples(int path_id, int *size) {
int i, number, msecs;
unsigned long interval;
char *sample_buffer, *buf_ptr;
struct timeval *last_tvp, *this_tvp;
delay_sample *samp_ptr;
number = delay_records[path_id].sample_number;
if (number == 0) return NULL;
*size = 3 * SIZEOF_LONG * (number+1);
sample_buffer = (char *) malloc(*size);
if (sample_buffer == NULL) {
perror("allocate");
clean_exit(1);
}
buf_ptr = sample_buffer;
//copy list size into the buffer
//format: ip, type, value
copy_ulong(&buf_ptr, rcvdb[path_id].ip, 0); //the receiver ip
copy_ulong(&buf_ptr, CODE_LIST_SIZE, 1);
copy_ulong(&buf_ptr, number, 1);
//copy the first delay into the buffer
//format: interval, type, value
samp_ptr = delay_records[path_id].head;
copy_ulong(&buf_ptr, 0L, 1); //zero interval
copy_ulong(&buf_ptr, CODE_LIST_DELAY, 1);
copy_ulong(&buf_ptr, samp_ptr->value, 1);
last_tvp = &(samp_ptr->time);
//copy the following delays
for (i=1; i<number; i++) {
samp_ptr = samp_ptr->next;
this_tvp = &(samp_ptr->time);
msecs = floor((this_tvp->tv_usec-last_tvp->tv_usec)/1000.0+0.5);
interval = (this_tvp->tv_sec-last_tvp->tv_sec)*1000 + msecs;
copy_ulong(&buf_ptr, interval, 1); //interval in msecs
copy_ulong(&buf_ptr, CODE_LIST_DELAY, 1);
copy_ulong(&buf_ptr, samp_ptr->value, 1);
last_tvp = &(samp_ptr->time);
free(delay_records[path_id].head); //release the i-1 delay_sample
delay_records[path_id].head = samp_ptr; //use head as the pre_ptr
}
free(delay_records[path_id].head); //release the last delay_sample
delay_records[path_id].head = NULL;
delay_records[path_id].tail = NULL;
delay_records[path_id].sample_number = 0;
return sample_buffer;
}
//Initialize or reset state varialbes related to a receiver connection
void reset_rcv_entry(int i) {
rcvdb[i].valid = 0;
......@@ -185,31 +262,19 @@ void reset_rcv_entry(int i) {
delays[i]=0;
last_delays[i]=0;
}
*/
void init(void) {
int i;
/* for (i=0; i<CONCURRENT_RECEIVERS; i++){
for (i=0; i<CONCURRENT_RECEIVERS; i++){
reset_rcv_entry(i);
}
for (i=0; i<CONCURRENT_SENDERS; i++){
snddb[i].valid = 0;
}*/
for (i = 0; i < CONCURRENT_RECEIVERS; i++)
{
add_empty_receiver(i);
init_connection(& rcvdb[i]);
}
for (i = 0; i < CONCURRENT_SENDERS; i++)
{
add_empty_sender(i);
init_connection(& snddb[i]);
}
}
/*
int insert_db(unsigned long ip, unsigned short source_port,
unsigned short dest_port, int sockfd, int dbtype) {
int insert_db(unsigned long ip, int sockfd, int dbtype) {
int i, record_number, next = -1;
time_t now = time(NULL);
double thisdiff, maxdiff = 0;
......@@ -252,31 +317,23 @@ int insert_db(unsigned long ip, unsigned short source_port,
}
db[next].valid = 1;
db[next].ip = ip;
db[next].source_port = source_port;
db[next].dest_port = dest_port;
db[next].sockfd= sockfd;
db[next].last_usetime = now;
db[next].pending = 0;
return next;
}
*/
/*
int search_rcvdb(unsigned long indexip, unsigned short source_port,
unsigned short dest_port){
int search_rcvdb(unsigned long indexip){
int i;
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (rcvdb[i].valid==1 && rcvdb[i].ip == indexip
&& rcvdb[i].source_port == source_port
&& rcvdb[i].dest_port == dest_port) {
if (rcvdb[i].valid==1 && rcvdb[i].ip == indexip) {
rcvdb[i].last_usetime = time(NULL);
return i;
}
}
return -1; //no sockfd is -1
}
*/
void clean_exit(int code){
int i;
......@@ -295,13 +352,11 @@ void clean_exit(int code){
exit(code);
}
/*
int get_rcvdb_index(unsigned long destaddr, unsigned short source_port,
unsigned short dest_port){
int get_rcvdb_index(unsigned long destaddr){
int dbindex, sockfd;
struct sockaddr_in their_addr; // connector's address information
if ((dbindex=search_rcvdb(destaddr, source_port, dest_port)) == -1) {
if ((dbindex=search_rcvdb(destaddr)) == -1) {
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
clean_exit(1);
......@@ -321,18 +376,16 @@ int get_rcvdb_index(unsigned long destaddr, unsigned short source_port,
if (sockfd > maxfd) {
maxfd = sockfd;
}
//insert rcvdb
dbindex=insert_db(destaddr, source_port, dest_port, sockfd, 0);
dbindex=insert_db(destaddr, sockfd, 0); //insert rcvdb
}
return dbindex;
}
*/
void remove_pending(int index)
{
if (rcvdb[index].pending == 0)
{
clear_pending(index, &write_fds);
FD_CLR(rcvdb[index].sockfd, &write_fds);
}
}
......@@ -340,33 +393,37 @@ void add_pending(int index, int size)
{
if (rcvdb[index].pending == 0 && size > 0)
{
set_pending(index, &write_fds);
FD_SET(rcvdb[index].sockfd, &write_fds);
}
rcvdb[index].pending += size;
}
void try_pending(int index)
void try_pending(int index, fd_set * write_fds_copy)
{
int size = 0;
int error = 0;
if (rcvdb[index].pending > LOW_WATER_MARK)
if (rcvdb[index].pending > 0 && FD_ISSET(rcvdb[index].sockfd,
write_fds_copy))
{
size = LOW_WATER_MARK;
}
else
{
size = rcvdb[index].pending;
}
error = send(rcvdb[index].sockfd, random_buffer, size, 0);
if (error == -1)
{
perror("try_pending");
clean_exit(1);
int size = 0;
int error = 0;
if (rcvdb[index].pending > LOW_WATER_MARK)
{
size = LOW_WATER_MARK;
}
else
{
size = rcvdb[index].pending;
}
error = send(rcvdb[index].sockfd, random_buffer, size, 0);
if (error == -1)
{
perror("try_pending");
clean_exit(1);
}
rcvdb[index].pending -= error;
total_size += error;
// printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending);
remove_pending(index);
}
rcvdb[index].pending -= error;
total_size += error;
printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending);
remove_pending(index);
}
void print_header(char *buf){
......@@ -421,69 +478,62 @@ int send_all(int sockfd, char *buf, int size) {
void receive_sender(int i) {
char inbuf[MAX_PAYLOAD_SIZE];
if (recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0)== 0) {
//connection closed
remove_sender_index(i, &read_fds);
if (recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0)== 0) { //connection closed
snddb[i].valid = 0; //no additional clean-up because no other state varialbe is related
FD_CLR(snddb[i].sockfd, &read_fds);
}
}
void send_receiver(packet_info packet, fd_set * write_fds_copy){
void send_receiver(unsigned long destaddr, long size, fd_set * write_fds_copy){
int index;
int sockfd;
int error = 1, retry=0;
struct in_addr addr;
// index = get_rcvdb_index(packet.ip, packet.source_port, packet.dest_port);
index = insert_by_address(packet.ip, packet.source_port, packet.dest_port);
if (index == -1)
{
printf("No more connections.\n");
clean_exit(1);
}
index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd;
if (packet.size <= 0) {
packet.size = 1;
if (size <= 0) {
size = 1;
}
if (rcvdb[index].pending > 0) {
add_pending(index, packet.size);
add_pending(index, size);
return;
}
if (packet.size > MAX_PAYLOAD_SIZE){
add_pending(index, packet.size - MAX_PAYLOAD_SIZE);
packet.size = MAX_PAYLOAD_SIZE;
if (size > MAX_PAYLOAD_SIZE){
add_pending(index, size - MAX_PAYLOAD_SIZE);
size = MAX_PAYLOAD_SIZE;
}
error = send(sockfd, random_buffer, packet.size, MSG_DONTWAIT);
error = send(sockfd, random_buffer, size, MSG_DONTWAIT);
// Handle failed connection
while (error == -1 && errno == ECONNRESET && retry < 3) {
/*
// TODO: Think hard about what resetting a connection means for sniffing
// traffic.
//reset the related state variables
int pending = rcvdb[index].pending;
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
FD_CLR(rcvdb[index].sockfd, &write_fds);
// TODO: Fix redo
reset_rcv_entry(index);
//try again
index = get_rcvdb_index(packet.ip, packet.source_port, packet.dest_port);
index = get_rcvdb_index(destaddr);
rcvdb[index].pending = pending;
*/
reconnect_receiver(index);
sockfd= rcvdb[index].sockfd;
error = send(sockfd, random_buffer, packet.size, MSG_DONTWAIT);
error = send(sockfd, random_buffer, size, MSG_DONTWAIT);
retry++;
}
//if still disconnected, reset
if (error == -1 && errno == ECONNRESET) {
remove_index(index, &write_fds);
addr.s_addr = packet.ip;
rcvdb[index].valid = 0;
addr.s_addr = destaddr;
printf("Error: send_receiver() - failed send to %s three times. \n", inet_ntoa(addr));
}
else if (error == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
add_pending(index, packet.size);
add_pending(index, size);
}
else if (error == -1) {
perror("send_receiver: send");
......@@ -491,34 +541,32 @@ void send_receiver(packet_info packet, fd_set * write_fds_copy){
}
else {
total_size += error;
printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending);
add_pending(index, packet.size - error);
// printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending);
add_pending(index, size - error);
}
}
int receive_monitor(int sockfd, struct timeval * deadline) {
char buf[2*SIZEOF_LONG];
int buffer_size = 0;
unsigned long destnum = 0;
char buf[MAX_PAYLOAD_SIZE];
char *nextptr;
unsigned long tmpulong, destnum;
char * packet_buffer = NULL;
//receive first two longs
if (recv_all(sockfd, buf, 2*SIZEOF_LONG)==0) {
return 0;
}
memcpy(&destnum, buf + SIZEOF_LONG, SIZEOF_LONG);
destnum = ntohl(destnum);
nextptr = buf+SIZEOF_LONG;
memcpy(&tmpulong, nextptr, SIZEOF_LONG);
destnum = ntohl(tmpulong);
packet_buffer = malloc(destnum*3*SIZEOF_LONG);
//return success if no dest addr is given
if (destnum == 0){
return 1;
}
buffer_size = (int)(destnum * (3 * SIZEOF_LONG
+ 2 * sizeof(unsigned short)));
packet_buffer = malloc(buffer_size);
//otherwise, receive dest addrs
if (recv_all(sockfd, packet_buffer, buffer_size)==0) {
if (recv_all(sockfd, packet_buffer, destnum*3*SIZEOF_LONG)==0) {
free(packet_buffer);
return 0;
}
......@@ -526,120 +574,93 @@ int receive_monitor(int sockfd, struct timeval * deadline) {
{
gettimeofday(deadline, NULL);
}
packet_buffer_add(packet_buffer, buffer_size);
return 1;
}
packet_buffer_add(packet_buffer, destnum*3*SIZEOF_LONG);
char * save_receiver_address(char * buf, int index)
{
unsigned short port;
// Insert IP address
memcpy(buf, &(rcvdb[index].ip), SIZEOF_LONG); //the receiver ip
buf += SIZEOF_LONG;
// Insert source port
port = htons(rcvdb[index].source_port);
memcpy(buf, &port, sizeof(port));
buf += sizeof(port);
// Insert destination port
port = htons(rcvdb[index].dest_port);
memcpy(buf, &port, sizeof(port));
buf += sizeof(port);
return buf;
}
// nextptr=buf;
// for (i=0; i<destnum; i++){
// memcpy(&tmpulong, nextptr, SIZEOF_LONG);