Commit 302e1855 authored by Jonathon Duerig's avatar Jonathon Duerig

Added standalone support for multiple streams. Added a logging infrastructure....

Added standalone support for multiple streams. Added a logging infrastructure. Added a trivial application for testing purposes (dumb-client and dumb-server). Added the capability for running the standalone stub with a tcpdump replay. Added support for changing buffer sizes. Added shell files for experiment management, reset.sh which is to reset the dummynet settings between runs and run-iperf.sh which encapsulates a set of options to iperf.
parent ea55f687
...@@ -14,6 +14,15 @@ import socket ...@@ -14,6 +14,15 @@ import socket
import select import select
import re import re
CODE_BANDWIDTH = 1
CODE_DELAY = 2
CODE_LOSS = 3
CODE_LIST_DELAY = 4
PACKET_WRITE = 1
PACKET_SEND_BUFFER = 2
PACKET_RECEIVE_BUFFER = 3
emulated_to_real = {} emulated_to_real = {}
real_to_emulated = {} real_to_emulated = {}
emulated_to_interface = {} emulated_to_interface = {}
...@@ -21,7 +30,7 @@ ip_mapping_filename = '' ...@@ -21,7 +30,7 @@ ip_mapping_filename = ''
this_experiment = '' this_experiment = ''
this_ip = '' this_ip = ''
stub_ip = '' stub_ip = ''
netmon_output_version = 1 netmon_output_version = 2
total_size = 0 total_size = 0
last_total = -1 last_total = -1
...@@ -96,6 +105,7 @@ def populate_ip_tables(): ...@@ -96,6 +105,7 @@ def populate_ip_tables():
def get_next_packet(): def get_next_packet():
global total_size, last_total global total_size, last_total
event_code = PACKET_WRITE
line = sys.stdin.readline() line = sys.stdin.readline()
if line == "": if line == "":
raise EOFError raise EOFError
...@@ -110,11 +120,11 @@ def get_next_packet(): ...@@ -110,11 +120,11 @@ def get_next_packet():
linexp = re.compile('^(\d+\.\d+) > (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) (\((\d+)\))?') linexp = re.compile('^(\d+\.\d+) > (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) (\((\d+)\))?')
match = linexp.match(line) match = linexp.match(line)
conexp = re.compile('^(New|Closed): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+)') conexp = re.compile('^(New|Closed|SO_RCVBUF|SO_SNDBUF): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) ((\d+))?')
cmatch = conexp.match(line) cmatch = conexp.match(line)
if (match) : if match:
localport = 0 # We may not get this one localport = 0 # We may not get this one
if (netmon_output_version == 1): if netmon_output_version == 1:
time = float(match.group(1)) time = float(match.group(1))
ipaddr = match.group(2) ipaddr = match.group(2)
remoteport = int(match.group(3)) remoteport = int(match.group(3))
...@@ -122,7 +132,7 @@ def get_next_packet(): ...@@ -122,7 +132,7 @@ def get_next_packet():
size = int(match.group(5)) size = int(match.group(5))
elif (netmon_output_version == 2): elif (netmon_output_version == 2):
time = float(match.group(1)) time = float(match.group(1))
localport = match.group(2) localport = int(match.group(2))
ipaddr = match.group(3) ipaddr = match.group(3)
remoteport = int(match.group(4)) remoteport = int(match.group(4))
size_given = match.group(5) != '' size_given = match.group(5) != ''
...@@ -133,16 +143,29 @@ def get_next_packet(): ...@@ -133,16 +143,29 @@ def get_next_packet():
if not size_given: if not size_given:
size = 0 size = 0
total_size = total_size + size total_size = total_size + size
return (ipaddr, localport, remoteport, time, size) return (ipaddr, localport, remoteport, time, size, event_code)
elif ((netmon_output_version == 2) and cmatch): elif ((netmon_output_version == 2) and cmatch):
# #
# Watch for new or closed connections # Watch for new or closed connections
# #
event = cmatch.group(1) event = cmatch.group(1)
localport = cmatch.group(2) localport = int(cmatch.group(2))
ipaddr = cmatch.group(3) ipaddr = cmatch.group(3)
remoteport = int(cmatch.group(4))
value_given = cmatch.group(5) != ''
value = int(cmatch.group(6))
if not value_given:
value = 0
sys.stdout.write("Got a connection event: " + event + "\n") sys.stdout.write("Got a connection event: " + event + "\n")
return None if event == 'SO_RCVBUF':
event_code = PACKET_RECEIVE_BUFFER
elif event == 'SO_SNDBUF':
event_code = PACKET_SEND_BUFFER
sys.stdout.write('Packet send buffer was set to: ' + str(value) + '\n')
else:
return None
return (ipaddr, localport, remoteport, 0, value, event_code)
else: else:
sys.stdout.write('skipped line in the wrong format: ' + line) sys.stdout.write('skipped line in the wrong format: ' + line)
return None return None
...@@ -158,25 +181,39 @@ def receive_characteristic(conn): ...@@ -158,25 +181,39 @@ def receive_characteristic(conn):
# sys.stdout.write('received: ' + str(dest) + ' ' + str(source_port) + ' ' # sys.stdout.write('received: ' + str(dest) + ' ' + str(source_port) + ' '
# + str(dest_port) + ' ' + str(command) + ' ' + str(value) # + str(dest_port) + ' ' + str(command) + ' ' + str(value)
# + '\n') # + '\n')
if command == 1: if command == CODE_BANDWIDTH:
# value is bandwidth in kbps # value is bandwidth in kbps
sys.stdout.write('Bandwidth: ' + str(value) + '\n');
set_bandwidth(value, dest) set_bandwidth(value, dest)
elif command == 2: elif command == CODE_DELAY:
# value is delay in milliseconds # value is delay in milliseconds
sys.stdout.write('Delay: ' + str(value) + '\n');
set_delay(value, dest) set_delay(value, dest)
elif command == 3: elif command == CODE_LOSS:
# value is packet loss in packets per billion # value is packet loss in packets per billion
sys.stdout.write('Loss: ' + str(value) + '\n');
set_loss(value/1000000000.0, dest) set_loss(value/1000000000.0, dest)
elif command == CODE_LIST_DELAY:
# value is the number of samples
buffer = conn.recv(value*8)
# Dummynet isn't quite set up to deal with this yet, so ignore it.
sys.stdout.write('Ignoring delay list of size: ' + str(value) + '\n')
else:
sys.stdout.write('Other: ' + str(command) + ', ' + str(value) + '\n');
return True return True
elif len(buffer) == 0: elif len(buffer) == 0:
return False return False
def set_bandwidth(kbps, dest): def set_bandwidth(kbps, dest):
sys.stdout.write('<event> bandwidth=' + str(kbps) + '\n') # sys.stdout.write('<event> bandwidth=' + str(kbps) + '\n')
return set_link(this_ip, dest, 'bandwidth=' + str(kbps)) return set_link(this_ip, dest, 'bandwidth=' + str(kbps))
# Set delay on the link. We are given round trip time. # Set delay on the link. We are given round trip time.
def set_delay(milliseconds, dest): def set_delay(milliseconds, dest):
now = time.time()
sys.stderr.write('purple\n')
sys.stderr.write('line ' + ('%0.6f' % now) + ' 0 ' + ('%0.6f' % now)
+ ' ' + str(milliseconds) + '\n')
# Set the delay from here to there to 1/2 rtt. # Set the delay from here to there to 1/2 rtt.
error = set_link(this_ip, dest, 'delay=' + str(milliseconds/2)) error = set_link(this_ip, dest, 'delay=' + str(milliseconds/2))
if error == 0: if error == 0:
...@@ -204,11 +241,16 @@ def send_destinations(conn, packet_list): ...@@ -204,11 +241,16 @@ def send_destinations(conn, packet_list):
prev_time = packet_list[0][3] prev_time = packet_list[0][3]
for packet in packet_list: for packet in packet_list:
ip = ip_to_int(emulated_to_real[packet[0]]) ip = ip_to_int(emulated_to_real[packet[0]])
delta = int((packet[3] - prev_time) * 1000)
if packet[3] == 0:
delta = 0
output = (output + save_int(ip) + save_short(packet[1]) output = (output + save_int(ip) + save_short(packet[1])
+ save_short(packet[2]) + save_short(packet[2])
+ save_int(int((packet[3] - prev_time) * 1000)) + save_int(delta)
+ save_int(packet[4])) + save_int(packet[4])
prev_time = packet[3] + save_short(packet[5]))
if packet[3] != 0:
prev_time = packet[3]
conn.sendall(output) conn.sendall(output)
def load_int(str): def load_int(str):
......
sh instrument.sh ../iperf -i 0.5 -c elab1 -t 30
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 delay=5
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 delay=5
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 bandwidth=100000
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 bandwidth=100000
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 delay=5
/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 delay=5
/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 bandwidth=10000
/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 bandwidth=10000
/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 lpr=0.0
all: stubd stub-monitor all: stubd stub-monitor dumb-client dumb-server
stubd: stubd.o stub-pcap.o lookup.o stub.h stubd: stubd.o stub-pcap.o lookup.o log.o
g++ -g -Wall stubd.o stub-pcap.o lookup.o -lm -lpcap -o stubd g++ -g -Wall stubd.o stub-pcap.o lookup.o log.o -lm -lpcap -o stubd
stub-monitor: stub-monitor.c stub.h stub-monitor: stub-monitor.c stub.h
gcc -g -Wall stub-monitor.c -o stub-monitor gcc -g -Wall stub-monitor.c -o stub-monitor
stubd.o: stubd.c stub.h stubd.o: stubd.c stub.h log.h
gcc -g -Wall -c stubd.c gcc -g -Wall -c stubd.c
stub-pcap.o: stub-pcap.c stub.h stub-pcap.o: stub-pcap.c stub.h log.h
gcc -g -Wall -c stub-pcap.c gcc -g -Wall -c stub-pcap.c
lookup.o: lookup.cc stub.h lookup.o: lookup.cc stub.h log.h
g++ -g -Wall -c lookup.cc g++ -g -Wall -c lookup.cc
log.o: log.c log.h
gcc -g -Wall -c log.c
dumb-client: dumb-client.o
gcc -g -Wall dumb-client.o -lm -lpcap -o dumb-client
dumb-client.o: dumb-client.c stub.h
gcc -g -Wall -c dumb-client.c
dumb-server: dumb-server.o
gcc -g -Wall dumb-server.o -lm -lpcap -o dumb-server
dumb-server.o: dumb-server.c stub.h
gcc -g -Wall -c dumb-server.c
clean: clean:
rm *.o stubd stubm stub-monitor rm *.o stubd stubm stub-monitor
// dumb-client.c
#include "stub.h"
int main(int argc, char * argv[])
{
char buffer[MAX_PAYLOAD_SIZE] = {0};
int connection = -1;
int error = 0;
struct sockaddr_in address;
if (argc != 3)
{
printf("Usage: %s <server-ip> <server-port>\n", argv[0]);
return 1;
}
connection = socket(AF_INET, SOCK_STREAM, 0);
if (connection == -1)
{
printf("Failed socket()\n");
return 1;
}
address.sin_family = AF_INET;
address.sin_port = htons(atoi(argv[2]));
inet_aton(argv[1], &address.sin_addr);
memset(address.sin_zero, '\0', 8);
error = connect(connection, (struct sockaddr *)&address, sizeof(address));
if (error == -1)
{
printf("Failed connect()\n");
return 1;
}
int count = 0;
while (1)
{
// usleep(1000);
count = send(connection, buffer, MAX_PAYLOAD_SIZE, MSG_DONTWAIT);
// printf("Sent %d bytes\n", count);
}
return 0;
}
// dumb-server.c
#include "stub.h"
int main(int argc, char * argv[])
{
char buffer[MAX_PAYLOAD_SIZE] = {0};
int connection = -1;
int error = 0;
struct sockaddr_in address;
int client = -1;
int address_size = sizeof(address);
if (argc != 2)
{
printf("Usage: %s <port-number>\n", argv[0]);
return 1;
}
connection = socket(AF_INET, SOCK_STREAM, 0);
if (connection == -1)
{
printf("Failed socket()\n");
return 1;
}
address.sin_family = AF_INET;
address.sin_port = htons(atoi(argv[1]));
address.sin_addr.s_addr = htonl(INADDR_ANY);
memset(address.sin_zero, '\0', 8);
error = bind(connection, (struct sockaddr *)&address, sizeof(address));
if (error == -1)
{
printf("Failed bind()\n");
return 1;
}
error = listen(connection, 1);
if (error == -1)
{
printf("Failed listen()\n");
return 1;
}
client = accept(connection, (struct sockaddr *)&address, &address_size);
if (client == -1)
{
printf("Failed accept()\n");
return 1;
}
while (1)
{
recv(client, buffer, MAX_PAYLOAD_SIZE, 0);
}
return 0;
}
// log.c
#include "stub.h"
#include "log.h"
static FILE * logFile;
static int logFlags;
static int logTimestamp;
void logInit(FILE * destFile, int flags, int useTimestamp)
{
if (destFile == NULL || flags == LOG_NOTHING)
{
logFile = NULL;
logFlags = LOG_NOTHING;
logTimestamp = 0;
}
else
{
logFile = destFile;
logFlags = flags;
logTimestamp = useTimestamp;
}
}
void logCleanup(void)
{
}
// Print the timestamp and type of logging to the logFile.
static void logPrefix(int flags, struct timeval const * timestamp)
{
if (flags & CONTROL_SEND)
{
fprintf(logFile, "CONTROL_SEND ");
}
if (flags & CONTROL_RECEIVE)
{
fprintf(logFile, "CONTROL_RECEIVE ");
}
if (flags & TCPTRACE_SEND)
{
fprintf(logFile, "TCPTRACE_SEND ");
}
if (flags & TCPTRACE_RECEIVE)
{
fprintf(logFile, "TCPTRACE_RECEIVE ");
}
if (flags & SNIFF_SEND)
{
fprintf(logFile, "SNIFF_SEND ");
}
if (flags & SNIFF_RECEIVE)
{
fprintf(logFile, "SNIFF_RECEIVE ");
}
if (flags & PEER_WRITE)
{
fprintf(logFile, "PEER_WRITE ");
}
if (flags & PEER_READ)
{
fprintf(logFile, "PEER_READ ");
}
if (flags & MAIN_LOOP)
{
fprintf(logFile, "MAIN_LOOP ");
}
if (flags & LOOKUP_DB)
{
fprintf(logFile, "LOOKUP_DB ");
}
if (flags & DELAY_DETAIL)
{
fprintf(logFile, "DELAY_DETAIL ");
}
if (logTimestamp)
{
struct timeval now;
struct timeval const * timeptr = timestamp;
if (timeptr == NULL)
{
gettimeofday(&now, NULL);
timeptr = &now;
}
fprintf(logFile, "%d.%d ", (int)(timeptr->tv_sec),
(int)((timeptr->tv_usec)/1000));
}
fprintf(logFile, ": ");
}
void logWrite(int flags, struct timeval const * timestamp,
char const * format, ...)
{
va_list va;
va_start(va, format);
if ((flags & logFlags) != 0)
{
logPrefix(flags & logFlags, timestamp);
vfprintf(logFile, format, va);
fprintf(logFile, "\n");
}
va_end(va);
}
// log.h
#ifndef _LOG_H
#define _LOG_H
#ifdef __cplusplus
extern "C"
{
#endif
// The logging framework. Use logInit() during initialization to set
// up logging. Then use log() to generate messages. logCleanup()
// should be called during program shutdown.
// logInit() opens a file, sets what types of logging messages to
// actually print, and determines whether timestamps should be
// prepended onto messages. Logging is disabled if 'filename' is NULL
// or 'flags' is 0.
//
// 'filename' is the name of the file to be used for logging. It is
// opened in append mode.
//
// 'flags' is the logical or of one or more of the LOG_TYPEs defined
// below or 0. This determines which of the message types to print out.
//
// If 'useTimestamp' is 1, then timestamps are prepended to each
// message, otherwise, they are ignored.
void logInit(FILE * destFile, int flags, int useTimestamp);
// logCleanup() cleans up the logging state.
void logCleanup(void);
// The logWrite() function is used to print a logging message.
//
// 'flags' is the logical or of one or more of the LOG_TYPEs defined
// below. This tags the logging message so it can be selectively
// filtered. This should never be '0'. If it is '0', the message will
// not be printed.
// 'timestamp' is a pointer to a timestamp which will be prepended
// onto the logging message. If it is NULL, the current time is
// used. Note that whether or not the timestamp is printed at all is
// set elsewhere.
// 'format' printf-style formatting string. It and any extra variables
// act just like a normal printf call. This is where you actually
// write your message. It will be written to the logging file.
void logWrite(int flags, struct timeval const * timestamp,
char const * format, ...);
enum LOG_TYPE
{
// Information about control messages being sent to the monitor
CONTROL_SEND = 0x1,
// Information about control messages being received from the monitor
CONTROL_RECEIVE = 0x2,
// Information about sent control messages in a format used in
// tcptrace graphs.
TCPTRACE_SEND = 0x4,
// Information about received control messages in a format used in
// tcptrace graphs.
TCPTRACE_RECEIVE = 0x8,
// Information about sent packets which were sniffed by stub-pcap.
SNIFF_SEND = 0x10,
// Information about received packets which were sniffed by stub-pcap.
SNIFF_RECEIVE = 0x20,
// Information about writing to peers.
PEER_WRITE = 0x40,
// Information about reading from peers.
PEER_READ = 0x80,
// Which stages are being run in the main loop.
MAIN_LOOP = 0x100,
// Database storage and reterieval
LOOKUP_DB = 0x200,
DELAY_DETAIL = 0x400,
// Shortcuts for common cases.
LOG_NOTHING = 0x00,
LOG_EVERYTHING = 0x7ff
};
#ifdef __cplusplus
}
#endif
#endif
...@@ -48,7 +48,7 @@ struct Address ...@@ -48,7 +48,7 @@ struct Address
}; };
void init_connection(struct connection * conn) void init_connection(connection * conn)
{ {
if (conn != NULL) if (conn != NULL)
{ {
...@@ -191,8 +191,8 @@ int for_each_to_monitor(send_to_monitor function, int monitor) ...@@ -191,8 +191,8 @@ int for_each_to_monitor(send_to_monitor function, int monitor)
std::map<Stub, int>::iterator limit = stub_receivers.end(); std::map<Stub, int>::iterator limit = stub_receivers.end();
for (; pos != limit && result == 1; ++pos) for (; pos != limit && result == 1; ++pos)
{ {
int index = pos->second; int index = pos->second;
result = function(monitor, index); result = function(monitor, index);
} }
return result; return result;
} }
...@@ -230,13 +230,48 @@ int insert_by_address(unsigned long ip, unsigned short source_port, ...@@ -230,13 +230,48 @@ int insert_by_address(unsigned long ip, unsigned short source_port,
return result; return result;
} }
int insert_fake(unsigned long ip, unsigned short port)
{
int result = find_by_address(ip, 0, 0);
if (result == FAILED_LOOKUP && ! empty_receivers.empty())
{
// Update the lookup structures.
int index = * empty_receivers.begin();
empty_receivers.erase(index);
Address address_key = Address(ip, 0, 0);
address_receivers.insert(std::make_pair(address_key, index));
Stub stub_key = Stub(ip, port);
stub_receivers.insert(std::make_pair(stub_key, index));
// Connect to the index.
reset_receive_records(index, ip, 0, 0);
rcvdb[index].sockfd = -1;
// Set the new port and sockfd
rcvdb[index].stub_port = port;
// Reset sniffing records
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
loss_records[index].loss_counter=0;
loss_records[index].total_counter=0;
last_loss_rates[index]=0;