Commit 4f1ffb87 authored by Jonathon Duerig's avatar Jonathon Duerig
Browse files

Expanded monitor to be more generic (not bound to a particular IP address...

Expanded monitor to be more generic (not bound to a particular IP address ,etc.). Added throughput measurement to the stub (note that the throughput measurements are not sent to the monitor, only printed to the screen). Added the first primitive traffic modeling to the stub. Changed the stub defines to agree with the monitor (monitor port == 4200).
parent e2aa22bc
#Usage: tcpdump | python monitor.py <mapping-file> <experiment-name> <ip-address>
import sys
import os
......@@ -5,16 +6,25 @@ import time
import socket
import select
emulated_to_real = {'10.0.0.1' : '10.1.0.1',
'10.0.0.2' : '10.1.0.2'}
#emulated_to_real = {'10.0.0.1' : '10.1.0.1',
# '10.0.0.2' : '10.1.0.2'}
real_to_emulated = {'10.1.0.1' : '10.0.0.1',
'10.1.0.2' : '10.0.0.2'}
#real_to_emulated = {'10.1.0.1' : '10.0.0.1',
# '10.1.0.2' : '10.0.0.2'}
emulated_to_real = {}
real_to_emulated = {}
emulated_to_interface = {}
ip_mapping_filename = ''
this_experiment = ''
this_ip = ''
def main_loop():
# Initialize
read_args()
populate_ip_tables()
quanta = 5 # in seconds
stub_address = 'planet0.pelab.tbres.emulab.net'
stub_address = 'planet0.pelab.tbres.emulab.net' # emulated_to_real[this_ip]
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((stub_address, 4200))
poll = select.poll()
......@@ -24,7 +34,7 @@ def main_loop():
while not done:
# Reset
dest_set = set([])
packet_list = []
max_time = time.time() + quanta
# Collect data until the next quanta boundary
......@@ -33,84 +43,137 @@ def main_loop():
for pos in fdlist:
if pos[0] == sys.stdin.fileno() and not done:
# A line of data from tcpdump is available.
ip = get_next_destination()
dest_set.add(ip)
packet = get_next_packet()
packet_list = packet_list + [packet]
elif pos[0] == conn.fileno() and not done:
# A record for change in link characteristics is available.
done = not receive_characteristic(conn)
elif not done:
sys.stdout.write('fd: ' + str(pos[0]) + ' conn-fd: ' + str(conn.fileno()) + '\n')
# Update the stub
send_destinations(conn, dest_set)
send_destinations(conn, packet_list)
sys.stdout.write('Loop-end\n')
def get_next_destination():
def read_args():
global ip_mapping_filename, this_experiment, this_ip
if len(sys.argv) == 4:
ip_mapping_filename = sys.argv[1]
this_experiment = sys.argv[2]
this_ip = sys.argv[3]
def populate_ip_tables():
input = open(ip_mapping_filename, 'r')
line = input.readline()
while line != '':
fields = line.strip().split(' ', 2)
if len(fields) == 3:
emulated_to_real[fields[0]] = fields[1]
real_to_emulated[fields[1]] = fields[0]
emulated_to_interface[fields[0]] = fields[2]
line = input.readline()
def get_next_packet():
line = sys.stdin.readline()
ip_list = line.split('>', 1)[1].strip().split('.', 4)
result = ip_list[0] + '.' + ip_list[1] + '.' + ip_list[2] + '.' + ip_list[3]
sys.stdout.write('dest: ' + result + '\n')
return result
ip = ip_list[0] + '.' + ip_list[1] + '.' + ip_list[2] + '.' + ip_list[3]
# sys.stdout.write('dest: ' + result + '\n')
time = float(line.split(' ', 1)[0])
size_list = line.split('(', 1)
if len(size_list) > 1:
size = int(size_list[1].split(')', 1)[0])
else:
size = 0
return (ip, time, size)
def receive_characteristic(conn):
buffer = conn.recv(12)
if len(buffer) == 12:
dest = load_int(buffer[0:4])
# TODO: Map real dest back into emulated dest
dest = real_to_emulated[int_to_ip(load_int(buffer[0:4]))]
command = load_int(buffer[4:8])
value = load_int(buffer[8:12])
sys.stdout.write('received: ' + str(dest) + ' '
+ str(command) + ' ' + str(value) + '\n')
if command == 1:
# value is bandwidth in kbps
set_bandwidth(value)
set_bandwidth(value, dest)
elif command == 2:
# value is delay in milliseconds
set_delay(value)
set_delay(value, dest)
elif command == 3:
# value is packet loss in packets per billion
set_loss(value/1000000000.0)
set_loss(value/1000000000.0, dest)
return True
elif len(buffer) == 0:
return False
def load_int(str):
result = 0
for i in range(4):
result = result | ((ord(str[i]) & 0xff) << (8*i))
return result
def set_bandwidth(kbps):
def set_bandwidth(kbps, dest):
sys.stdout.write('<event> bandwidth=' + str(kbps) + '\n')
return set_link('bandwidth=' + str(kbps))
return set_link(this_ip, dest, 'bandwidth=' + str(kbps))
def set_delay(milliseconds):
return set_link('delay=' + str(milliseconds) + 'ms')
def set_delay(milliseconds, dest):
error = set_link(this_ip, dest, 'delay=' + str(milliseconds/2))
if error == 0:
return set_link(dest, this_ip, 'delay=' + str(milliseconds/2))
else:
return error
def set_loss(probability):
return set_link('plr=' + str(probability))
def set_link(ending):
command_base = '/usr/testbed/bin/tevc -e tbres/pelab now link0 modify '
return os.system(command_base + ending)
def send_destinations(conn, dest_set):
sys.stdout.write('<send> ' + str(0) + ' ' + str(len(dest_set)) + ' -- '
+ str(dest_set) + '\n')
output = save_int(0) + save_int(len(dest_set))
for dest in dest_set:
real_dest = emulated_to_real[dest]
ip_list = real_dest.split('.', 3)
ip = 0
for ip_byte in ip_list:
ip = ip << 8
ip = ip | (int(ip_byte, 10) & 0xff)
output = output + save_int(ip)
return set_link(this_ip, dest, 'plr=' + str(probability))
def set_link(source, dest, ending):
command = ('/usr/testbed/bin/tevc -e ' + this_experiment + ' now '
+ emulated_to_interface[source] + ' modify dest=' + dest + ' '
+ ending)
sys.stdout.write('event: ' + command + '\n')
return os.system(command)
def send_destinations(conn, packet_list):
# sys.stdout.write('<send> ' + str(0) + ' ' + str(len(packet_list)) + ' -- '
# + str(packet_list) + '\n')
output = save_int(0) + save_int(len(packet_list))
prev_time = 0.0
if len(packet_list) > 0:
prev_time = packet_list[0][1]
for packet in packet_list:
ip = ip_to_int(emulated_to_real[packet[0]])
output = output + save_int(ip) + save_int(int((packet[1] - prev_time)
* 1000)) + save_int(packet[2])
prev_time = packet[1]
conn.sendall(output)
def load_int(str):
result = 0
for i in range(4):
result = result | ((ord(str[i]) & 0xff) << (8*(3-i)))
return result
def save_int(number):
result = ''
for i in range(4):
result = result + chr(number & 0xff)
number = number >> 8
result = result + chr((number >> ((3-i)*8)) & 0xff)
return result
def ip_to_int(ip):
ip_list = ip.split('.', 3)
result = 0
for ip_byte in ip_list:
result = result << 8
result = result | (int(ip_byte, 10) & 0xff)
return result
#def int_to_ip(num):
# ip_list = ['0', '0', '0', '0']
# for ip_byte in ip_list:
# ip_byte = str(num & 0xff)
# num = num >> 8
# ip_list.reverse()
# return '.'.join(ip_list)
def int_to_ip(num):
ip_list = ['0', '0', '0', '0']
for index in range(4):
ip_list[3-index] = str(num & 0xff)
num = num >> 8
return '.'.join(ip_list)
main_loop()
sudo /usr/sbin/tcpdump -n -i eth1 "!(dst host 10.0.0.1) && dst net 10" | python monitor.py
# Usage: sh run-monitor.sh <experiment-name> <ip-address>
sudo /usr/sbin/tcpdump -tt -n -i eth1 "!(dst host $2) && dst net 10 && tcp" | tee tcpdump.txt | python monitor.py ip-mapping.txt $1 $2
......@@ -41,6 +41,71 @@ sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
pcap_t* descr;
int pcapfd;
ThroughputAckState throughput[CONCURRENT_RECEIVERS];
// Returns true if sequence is between the firstUnknown and the
// nextSequence. Takes account of wraparound.
int throughputInWindow(ThroughputAckState * state, unsigned int sequence)
{
return sequence >= state->firstUnknown
|| (state->nextSequence < state->firstUnknown
&& sequence < state->nextSequence);
}
// Reset the state of a connection completely.
void throughputInit(ThroughputAckState * state, unsigned int sequence)
{
state->firstUnknown = sequence;
state->nextSequence = sequence;
state->ackSize = 0;
state->repeatSize = 0;
}
// Notify the throughput monitor that a new packet has been sent
// out. This updates the expected nextSequence number.
void throughputProcessSend(ThroughputAckState * state, unsigned int sequence,
unsigned int size)
{
if (sequence == state->nextSequence)
{
state->nextSequence += size;
}
else if (throughputInWindow(state, sequence))
{
unsigned int maxRepeat = state->nextSequence - sequence;
if (size < maxRepeat)
{
state->repeatSize += size;
}
else
{
state->repeatSize += maxRepeat;
state->nextSequence += size - maxRepeat;
}
}
}
// Notify the throughput monitor that some bytes have been acknowledged.
void throughputProcessAck(ThroughputAckState * state, unsigned int sequence)
{
if (throughputInWindow(state, sequence))
{
state->ackSize += sequence - state->firstUnknown + 1;
state->firstUnknown = sequence + 1;
}
}
// How many bytes have been acknowledged since the last call to
// throughputTick()?
unsigned int throughputTick(ThroughputAckState * state)
{
int result = state->ackSize;
state->ackSize = 0;
state->repeatSize = 0;
return result;
}
void init_sniff_rcvdb(void) {
int i;
......@@ -205,8 +270,11 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
}
if (path->end == path->start){ //no previous packet
throughputInit(&throughput[path_id], seq_start);
throughputProcessSend(&throughput[path_id], seq_start, length);
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet
} else {
throughputProcessSend(&throughput[path_id], seq_start, length);
//find the real received end index
end = (path->end-1)%SNIFF_WINSIZE;
......@@ -241,6 +309,9 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
if (path_id != -1) { //a monitored incoming packet
if (ack_bit == 1) { //has an acknowledgement
ack_seq = ntohl(tp->ack_seq);
throughputProcessAck(&throughput[path_id], ack_seq);
record_id = search_sniff_rcvdb(path_id, ack_seq-1);
if (record_id != -1) { //new ack received
if (flag_resend) { //if the ack is triggered by a resend, skip the delay calculation.
......@@ -327,8 +398,8 @@ void init_pcap(int to_ms) {
char string_filter[128];
//struct in_addr addr;
dev = "vnet"; //"eth0"; //
// dev = "vnet"; //"eth0"; //
dev = "eth1";
/* ask pcap for the network address and mask of the device */
pcap_lookupnet(dev,&netp,&maskp,errbuf);
//For an unknown reason, netp has the wrong 4th number
......
......@@ -27,12 +27,12 @@
#define STDIN 0 // file descriptor for standard input
#define QUANTA 5000 //feed-loop interval in msec
#define MONITOR_PORT 3490 //the port the monitor connects to
#define MONITOR_PORT 4200 //the port the monitor connects to
#define SENDER_PORT 3491 //the port the stub senders connect to
#define PENDING_CONNECTIONS 10 //the pending connections the queue will hold
#define CONCURRENT_SENDERS 50 //concurrent senders the stub maintains
#define CONCURRENT_RECEIVERS 50 //concurrent receivers the stub maintains
#define MAX_PAYLOAD_SIZE 100 //size of the traffic payload
#define MAX_PAYLOAD_SIZE 2000 //size of the traffic payload
#define MAX_TCPDUMP_LINE 256 //the max line size of the tcpdump output
#define SIZEOF_LONG sizeof(long) //message bulding block
#define BANDWIDTH_OVER_THROUGHPUT 0 //the safty margin for estimating the available bandwidth
......@@ -82,6 +82,21 @@ extern int search_rcvdb(unsigned long indexip);
extern void sniff(void);
extern void init_pcap(int to_ms);
typedef struct
{
unsigned int firstUnknown;
unsigned int nextSequence;
unsigned int ackSize;
unsigned int repeatSize;
} ThroughputAckState;
extern ThroughputAckState throughput[CONCURRENT_RECEIVERS];
// Returns the number of acknowledged bytes since the last
// throughputTick() call.
extern unsigned int throughputTick(ThroughputAckState * state);
#endif
......
......@@ -9,6 +9,8 @@
#include "stub.h"
void clean_exit(int);
//Global
short flag_debug;
connection rcvdb[CONCURRENT_RECEIVERS];
......@@ -21,6 +23,129 @@ connection snddb[CONCURRENT_SENDERS];
unsigned long throughputs[CONCURRENT_SENDERS], last_throughputs[CONCURRENT_SENDERS];
fd_set read_fds,write_fds;
int maxfd;
char random_buffer[MAX_PAYLOAD_SIZE];
typedef struct packet_buffer_node_tag
{
struct packet_buffer_node_tag * next;
char * buffer;
int size;
} packet_buffer_node;
typedef struct
{
unsigned long ip;
long delta;
long size;
} packet_info;
packet_buffer_node * packet_buffer_head;
packet_buffer_node * packet_buffer_tail;
int packet_buffer_index;
void packet_buffer_init(void)
{
packet_buffer_head = NULL;
packet_buffer_tail = NULL;
packet_buffer_index = 0;
}
void packet_buffer_cleanup(void)
{
packet_buffer_node * old_head;
while (packet_buffer_head != NULL)
{
old_head = packet_buffer_head;
packet_buffer_head = old_head->next;
free(old_head->buffer);
free(old_head);
}
packet_buffer_tail = NULL;
packet_buffer_index = 0;
}
// Add a buffer with data about packets to send to the end of the list.
void packet_buffer_add(char * buffer, int size)
{
packet_buffer_node * newbuf = malloc(sizeof(packet_buffer_node));
if (newbuf == NULL)
{
perror("allocate");
clean_exit(1);
}
newbuf->next = NULL;
newbuf->buffer = buffer;
newbuf->size = size;
if (packet_buffer_tail == NULL)
{
packet_buffer_head = newbuf;
packet_buffer_tail = newbuf;
packet_buffer_index = 0;
}
else
{
packet_buffer_tail->next = newbuf;
packet_buffer_tail = newbuf;
}
}
// Get info about the next packet to send
packet_info packet_buffer_front(void)
{
packet_info result;
if (packet_buffer_head == NULL)
{
printf("packet_buffer_head == NULL in front\n");
clean_exit(1);
}
else
{
char * base = packet_buffer_head->buffer + packet_buffer_index;
memcpy(&result.ip, base, SIZEOF_LONG);
memcpy(&result.delta, base + SIZEOF_LONG, SIZEOF_LONG);
result.delta = ntohl(result.delta);
memcpy(&result.size, base + SIZEOF_LONG + SIZEOF_LONG, SIZEOF_LONG);
result.size = ntohl(result.size);
}
return result;
}
// Are there any packets to get info about?
int packet_buffer_more(void)
{
return packet_buffer_head != NULL;
}
// Move to the next packet, cleaning up as we go.
void packet_buffer_advance(void)
{
packet_buffer_index += 3*SIZEOF_LONG;
if (packet_buffer_index >= packet_buffer_head->size)
{
packet_buffer_node * old_head = packet_buffer_head;
packet_buffer_head = old_head->next;
packet_buffer_index = 0;
free(old_head->buffer);
free(old_head);
if (packet_buffer_head == NULL)
{
packet_buffer_tail = NULL;
}
}
}
void init_random_buffer(void)
{
int i = 0;
srandom(getpid());
for (i=0; i<MAX_PAYLOAD_SIZE; i++)
{
random_buffer[i]=(char)(random()&0x000000ff);
}
}
void init(void) {
int i;
......@@ -104,6 +229,7 @@ void clean_exit(int code){
close(snddb[i].sockfd);
}
}
packet_buffer_cleanup();
exit(code);
}
......@@ -127,6 +253,11 @@ int get_rcvdb_index(unsigned long destaddr){
perror("connect");
clean_exit(1);
}
// update maxfd
if (sockfd > maxfd)
{
maxfd = sockfd;
}
dbindex=insert_db(destaddr, sockfd, 0); //insert rcvdb
}
return dbindex;
......@@ -202,41 +333,48 @@ void receive_sender(int i) {
}
}
void send_receiver(unsigned long destaddr, char *buf){
int i, index;
void send_receiver(unsigned long destaddr, long size, fd_set * write_fds_copy){
int index;
int sockfd;
//struct timeval sendtime;
//unsigned long tmpulong;
int error = 1;
if (size > MAX_PAYLOAD_SIZE)
{
printf("size exceeded MAX_PAYLOAD_SIZE\n");
size = MAX_PAYLOAD_SIZE;
}
if (size <= 0)
{
size = 1;
}
index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd;
srandom(getpid());
for (i=0; i<MAX_PAYLOAD_SIZE; i++) buf[i]=(char)(random()&0x000000ff);
/* outdated since we use sniff for delay measurement now
* put the send time at the first eight bytes
gettimeofday(&sendtime, NULL);
tmpulong = htonl(sendtime.tv_sec);
memcpy(buf, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(sendtime.tv_usec);
memcpy(buf+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
*/
//send packets
while (send_all(sockfd, buf, MAX_PAYLOAD_SIZE) == 0){ //rcv conn closed
// if (select says its ok to write) || (we just created this connection)
if (FD_ISSET(sockfd, write_fds_copy) || !FD_ISSET(sockfd, &write_fds))
{
FD_SET(sockfd, &write_fds);
error = send_all(sockfd, random_buffer, size);
}
while (error == 0){ //rcv conn closed
// TODO: What reset stuff needs to go here?
rcvdb[index].valid = 0;
FD_CLR(rcvdb[index].sockfd, &read_fds);
FD_CLR(rcvdb[index].sockfd, &write_fds);
index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd;
FD_SET(sockfd, &write_fds);
error = send_all(sockfd, random_buffer, size);
}
}
int receive_monitor(int sockfd) {
int receive_monitor(int sockfd, struct timeval * deadline) {
char buf[MAX_PAYLOAD_SIZE];
char *nextptr;
unsigned long tmpulong, destnum, destaddr;
int i;
unsigned long tmpulong, destnum;
char * packet_buffer = NULL;
//receive first two longs
if (recv_all(sockfd, buf, 2*SIZEOF_LONG)==0) {
......@@ -245,22 +383,31 @@ int receive_monitor(int sockfd) {
nextptr = buf+SIZEOF_LONG;
memcpy(&tmpulong, nextptr, SIZEOF_LONG);
destnum = ntohl(tmpulong);
packet_buffer = malloc(destnum*3*SIZEOF_LONG);
//return success if no dest addr is given
if (destnum == 0){
return 1;
}
//otherwise, receive dest addrs
if (recv_all(sockfd, buf, destnum*SIZEOF_LONG)==0) {
if (recv_all(sockfd, packet_buffer, destnum*3*SIZEOF_LONG)==0) {
free(packet_buffer);
return 0;
}
nextptr=buf;
for (i=0; i<destnum; i++){
memcpy(&tmpulong, nextptr, SIZEOF_LONG);
destaddr = tmpulong; //address should stay in Network Order!
nextptr += SIZEOF_LONG;
send_receiver(destaddr, buf);
} //for
if (!packet_buffer_more())
{