Commit 515423f9 authored by Jonathon Duerig's avatar Jonathon Duerig

Fixed several minor bugs in Rob's scripts. Added the capability for multiple...

Fixed several minor bugs in Rob's scripts. Added the capability for multiple streams along the same path to the monitor/stub loop. Replaced the linear searches with logarithmic searches. Cleaned up various associated parts of the code.

TODO: Finish connection recycling capability
TODO: Remove commented out older code after more debugging
parent c808ef0a
......@@ -9,8 +9,9 @@
#
export NICKNAME=`cat /var/emulab/boot/nickname`;
export HOSTNAME=`echo $NICKNAME | cut -d. -f1`;
export PROJECT=`echo $NICKNAME | cut -d. -f2`;
export EXPERIMENT=`echo $NICKNAME | cut -d. -f3`;
export EXPERIMENT=`echo $NICKNAME | cut -d. -f2`;
export PROJECT=`echo $NICKNAME | cut -d. -f3`;
#
# Important directories
......
......@@ -51,13 +51,15 @@ def main_loop():
if pos[0] == sys.stdin.fileno() and not done:
# A line of data from tcpdump is available.
try:
# sys.stdout.write('get_next_packet()\n')
packet = get_next_packet()
except EOFError:
sys.stdout.write('Done: Got EOF on stdin\n')
# sys.stdout.write('Done: Got EOF on stdin\n')
done = 1
if (packet):
packet_list = packet_list + [packet]
elif pos[0] == conn.fileno() and not done:
sys.stdout.write('receive_characteristic()\n')
# A record for change in link characteristics is available.
done = not receive_characteristic(conn)
elif not done:
......@@ -111,7 +113,7 @@ def get_next_packet():
conexp = re.compile('^(New|Closed): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+)')
cmatch = conexp.match(line)
if (match) :
localport = 0 # We man not get this one
localport = 0 # We may not get this one
if (netmon_output_version == 1):
time = float(match.group(1))
ipaddr = match.group(2)
......@@ -131,7 +133,7 @@ def get_next_packet():
if not size_given:
size = 0
total_size = total_size + size
return (ipaddr, time, size)
return (ipaddr, localport, remoteport, time, size)
elif ((netmon_output_version == 2) and cmatch):
#
# Watch for new or closed connections
......@@ -140,27 +142,31 @@ def get_next_packet():
localport = cmatch.group(2)
ipaddr = cmatch.group(3)
sys.stdout.write("Got a connection event: " + event + "\n")
return None
else:
sys.stdout.write('skipped line in the wrong format: ' + line)
return None
def receive_characteristic(conn):
buffer = conn.recv(12)
if len(buffer) == 12:
buffer = conn.recv(16)
if len(buffer) == 16:
dest = real_to_emulated[int_to_ip(load_int(buffer[0:4]))]
command = load_int(buffer[4:8])
value = load_int(buffer[8:12])
# sys.stdout.write('received: ' + str(dest) + ' '
# + str(command) + ' ' + str(value) + '\n')
source_port = load_short(buffer[4:6])
dest_port = load_short(buffer[6:8])
command = load_int(buffer[8:12])
value = load_int(buffer[12:16])
# sys.stdout.write('received: ' + str(dest) + ' ' + str(source_port) + ' '
# + str(dest_port) + ' ' + str(command) + ' ' + str(value)
# + '\n')
if command == 1:
# value is bandwidth in kbps
set_bandwidth(value, dest)
elif command == 2:
# value is delay in milliseconds
set_delay(value, dest)
# elif command == 3:
elif command == 3:
# value is packet loss in packets per billion
# set_loss(value/1000000000.0, dest)
set_loss(value/1000000000.0, dest)
return True
elif len(buffer) == 0:
return False
......@@ -195,24 +201,38 @@ def send_destinations(conn, packet_list):
output = save_int(0) + save_int(len(packet_list))
prev_time = 0.0
if len(packet_list) > 0:
prev_time = packet_list[0][1]
prev_time = packet_list[0][3]
for packet in packet_list:
ip = ip_to_int(emulated_to_real[packet[0]])
output = output + save_int(ip) + save_int(int((packet[1] - prev_time)
* 1000)) + save_int(packet[2])
prev_time = packet[1]
output = (output + save_int(ip) + save_short(packet[1])
+ save_short(packet[2])
+ save_int(int((packet[3] - prev_time) * 1000))
+ save_int(packet[4]))
prev_time = packet[3]
conn.sendall(output)
def load_int(str):
return load_n(str, 4)
def save_int(number):
return save_n(number, 4)
def load_short(str):
return load_n(str, 2)
def save_short(number):
return save_n(number, 2);
def load_n(str, n):
result = 0
for i in range(4):
result = result | ((ord(str[i]) & 0xff) << (8*(3-i)))
for i in range(n):
result = result | ((ord(str[i]) & 0xff) << (8*(n-1-i)))
return result
def save_int(number):
def save_n(number, n):
result = ''
for i in range(4):
result = result + chr((number >> ((3-i)*8)) & 0xff)
for i in range(n):
result = result + chr((number >> ((n-1-i)*8)) & 0xff)
return result
def ip_to_int(ip):
......
......@@ -13,7 +13,7 @@ if [ $# != 1 ]; then
echo "Usage: $0 <my-ip> [stub-ip]"
exit 1;
fi
SIP=$4
SIP=$2
fi
PID=$PROJECT
......
all: stubd stub-monitor stubm
all: stubd stub-monitor
stubd: stubd.o stub-pcap.o stub.h
gcc -g -Wall -lm -lpcap stubd.o stub-pcap.o -o stubd
stubm: stubm.o stub-pcap.o stub.h
gcc -g -Wall -lm -lpcap stubm.o stub-pcap.o -o stubm
stubd: stubd.o stub-pcap.o lookup.o stub.h
g++ -g -Wall stubd.o stub-pcap.o lookup.o -lm -lpcap -o stubd
stub-monitor: stub-monitor.c stub.h
gcc -g -Wall stub-monitor.c -o stub-monitor
stubd.o: stubd.c stub.h
gcc -g -Wall -c stubd.c
stubm.o: stubm.c stub.h
gcc -g -Wall -c stubm.c
stub-pcap.o: stub-pcap.c stub.h
gcc -g -Wall -c stub-pcap.c
lookup.o: lookup.cc stub.h
g++ -g -Wall -c lookup.cc
clean:
rm *.o stubd stubm stub-monitor
// lookup.cc
// Provide lookups of the sender and receiver indices based on a
// number of different keys.
// TODO: Add phasing out of old connections
#include "stub.h"
#include <set>
#include <map>
#include <utility>
struct Stub
{
Stub(unsigned long newIp = 0, unsigned short newPort = 0)
: ip(newIp)
, stub_port(newPort)
{
}
bool operator<(Stub const & right) const
{
return ip < right.ip || (ip == right.ip && stub_port < right.stub_port);
}
unsigned long ip;
unsigned short stub_port;
};
struct Address
{
Address(unsigned long newIp = 0, unsigned short newSource = 0,
unsigned short newDest = 0)
: ip(newIp)
, source_port(newSource)
, dest_port(newDest)
{
}
bool operator<(Address const & right) const
{
return ip < right.ip || (ip == right.ip &&
(source_port < right.source_port ||
(source_port == right.source_port
&& dest_port < right.dest_port)));
}
unsigned long ip;
unsigned short source_port;
unsigned short dest_port;
};
void init_connection(struct connection * conn)
{
if (conn != NULL)
{
conn->valid = 0;
conn->sockfd = -1;
conn->ip = 0;
conn->stub_port = 0;
conn->source_port = 0;
conn->dest_port = 0;
conn->last_usetime = time(NULL);
conn->pending = 0;
}
}
//-----------------------------------------------------------------
// Each of these functions uses and manipulates the sender database.
static std::set<int> empty_senders;
static std::map<Stub, int> stub_senders;
void add_empty_sender(int index)
{
empty_senders.insert(index);
}
void for_each_readable_sender(handle_index function, fd_set * read_fds_copy)
{
std::map<Stub, int>::iterator pos = stub_senders.begin();
std::map<Stub, int>::iterator limit = stub_senders.end();
for (; pos != limit; ++pos)
{
int index = pos->second;
if (FD_ISSET(snddb[index].sockfd, read_fds_copy))
{
function(index);
}
}
}
int replace_sender_by_stub_port(unsigned long ip, unsigned short stub_port,
int sockfd, fd_set * read_fds)
{
int result = FAILED_LOOKUP;
time_t now = time(NULL);
Stub key(ip, stub_port);
std::map<Stub, int>::iterator pos = stub_senders.find(key);
if (pos != stub_senders.end())
{
// There is already a connection. Get rid of the old connection.
int index = pos->second;
snddb[index].last_usetime = now;
snddb[index].pending = 0;
if (snddb[index].sockfd != -1)
{
FD_CLR(snddb[index].sockfd, read_fds);
close(snddb[index].sockfd);
}
if (sockfd > maxfd)
{
maxfd = sockfd;
}
snddb[index].sockfd = sockfd;
FD_SET(snddb[index].sockfd, read_fds);
result = index;
}
else if (! empty_senders.empty())
{
// There aren't any connections. Choose one of the empty slots.
int index = * empty_senders.begin();
empty_senders.erase(index);
// This will not overlap with anything because we checked it above.
// Therefore we don't have to check the return code now.
stub_senders.insert(std::make_pair(key, index));
snddb[index].valid = 1;
snddb[index].ip = ip;
snddb[index].stub_port = stub_port;
snddb[index].source_port = 0;
snddb[index].dest_port = 0;
FD_SET(sockfd, read_fds);
if (sockfd > maxfd)
{
maxfd = sockfd;
}
snddb[index].sockfd = sockfd;
snddb[index].last_usetime = now;
snddb[index].pending = 0;
result = index;
}
return result;
}
void remove_sender_index(int index, fd_set * read_fds)
{
bool was_valid = empty_senders.insert(index).second;
if (was_valid)
{
Stub key(snddb[index].ip, snddb[index].stub_port);
stub_senders.erase(key);
snddb[index].valid = 0;
FD_CLR(snddb[index].sockfd, read_fds);
if (snddb[index].sockfd != -1)
{
close(snddb[index].sockfd);
snddb[index].sockfd = -1;
}
}
}
//-----------------------------------------------------------------
// Each of these functions manipulates the receiver database.
static std::set<int> empty_receivers;
static std::set<int> pending_receivers;
static std::map<Stub, int> stub_receivers;
static std::map<Address, int> address_receivers;
void add_empty_receiver(int index)
{
empty_receivers.insert(index);
}
void for_each_pending(handle_index function, fd_set * write_fds_copy)
{
std::set<int>::iterator pos = pending_receivers.begin();
std::set<int>::iterator limit = pending_receivers.end();
for (; pos != limit; ++pos)
{
int index = *pos;
if (FD_ISSET(rcvdb[index].sockfd, write_fds_copy))
{
function(index);
}
}
}
int for_each_to_monitor(send_to_monitor function, int monitor)
{
int result = 1;
std::map<Stub, int>::iterator pos = stub_receivers.begin();
std::map<Stub, int>::iterator limit = stub_receivers.end();
for (; pos != limit && result == 1; ++pos)
{
int index = pos->second;
result = function(monitor, index);
}
return result;
}
int find_by_address(unsigned long ip, unsigned short source_port,
unsigned short dest_port)
{
int result = FAILED_LOOKUP;
Address key(ip, source_port, dest_port);
std::map<Address, int>::iterator pos = address_receivers.find(key);
if (pos != address_receivers.end())
{
result = pos->second;
}
return result;
}
int insert_by_address(unsigned long ip, unsigned short source_port,
unsigned short dest_port)
{
int result = find_by_address(ip, source_port, dest_port);
if (result == FAILED_LOOKUP && ! empty_receivers.empty())
{
// Update the lookup structures.
int index = * empty_receivers.begin();
empty_receivers.erase(index);
Address address_key = Address(ip, source_port, dest_port);
address_receivers.insert(std::make_pair(address_key, index));
// Connect to the index.
reset_receive_records(index, ip, source_port, dest_port);
reconnect_receiver(index);
result = index;
}
return result;
}
void reconnect_receiver(int index)
{
struct sockaddr_in dest_addr;
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(SENDER_PORT);
dest_addr.sin_addr.s_addr = rcvdb[index].ip;
memset(&(dest_addr.sin_zero), '\0', 8);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
{
perror("socket");
clean_exit(1);
}
int connect_error = connect(sockfd, (struct sockaddr *)&dest_addr,
sizeof(dest_addr));
if (connect_error == -1)
{
perror("connect");
clean_exit(1);
}
struct sockaddr_in source_addr;
socklen_t len = sizeof(source_addr);
int sockname_error = getsockname(sockfd, (struct sockaddr *)&source_addr,
&len);
if (sockname_error == -1)
{
perror("getsockname");
clean_exit(1);
}
unsigned short stub_port = ntohs(source_addr.sin_port);
if (sockfd > maxfd)
{
maxfd = sockfd;
}
// If there was a valid connection before, clean it up.
if (rcvdb[index].sockfd != -1)
{
close(rcvdb[index].sockfd);
Stub erase_key = Stub(rcvdb[index].ip, rcvdb[index].stub_port);
stub_receivers.erase(erase_key);
}
// Add it to the lookup
Stub insert_key = Stub(rcvdb[index].ip, stub_port);
stub_receivers.insert(std::make_pair(insert_key, index));
// Set the new port and sockfd
rcvdb[index].stub_port = stub_port;
rcvdb[index].sockfd = sockfd;
// Reset sniffing records
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
loss_records[index].loss_counter=0;
loss_records[index].total_counter=0;
last_loss_rates[index]=0;
delays[index]=0;
last_delays[index]=0;
}
void reset_receive_records(int index, unsigned long ip,
unsigned short source_port,
unsigned short dest_port)
{
time_t now = time(NULL);
rcvdb[index].valid = 1;
rcvdb[index].ip = ip;
rcvdb[index].source_port = source_port;
rcvdb[index].dest_port = dest_port;
rcvdb[index].last_usetime = now;
}
int find_by_stub_port(unsigned long ip, unsigned short stub_port)
{
int result = FAILED_LOOKUP;
Stub key(ip, stub_port);
std::map<Stub, int>::iterator pos = stub_receivers.find(key);
if (pos != stub_receivers.end())
{
result = pos->second;
}
return result;
}
void set_pending(int index, fd_set * write_fds)
{
if (rcvdb[index].valid == 1)
{
printf("Set sockfd: %d, maxfd: %d\n", rcvdb[index].sockfd, maxfd);
pending_receivers.insert(index);
FD_SET(rcvdb[index].sockfd, write_fds);
}
}
void clear_pending(int index, fd_set * write_fds)
{
if (rcvdb[index].valid == 1)
{
printf("Clear sockfd: %d\n", rcvdb[index].sockfd);
pending_receivers.erase(index);
FD_CLR(rcvdb[index].sockfd, write_fds);
rcvdb[index].pending = 0;
}
}
void remove_index(int index, fd_set * write_fds)
{
bool was_valid = empty_receivers.insert(index).second;
if (was_valid)
{
Stub stub_key(rcvdb[index].ip, rcvdb[index].stub_port);
stub_receivers.erase(stub_key);
Address address_key(rcvdb[index].ip, rcvdb[index].source_port,
rcvdb[index].dest_port);
address_receivers.erase(address_key);
clear_pending(index, write_fds);
rcvdb[index].valid = 0;
if (rcvdb[index].sockfd != -1)
{
close(rcvdb[index].sockfd);
rcvdb[index].sockfd = -1;
}
}
}
......@@ -108,12 +108,6 @@ void throughputProcessAck(ThroughputAckState * state, unsigned int sequence)
// How many bytes have been acknowledged since the last call to
// throughputTick()?
unsigned int bytesThisTick(ThroughputAckState * state) {
return state->ackSize;
}
// What is the bandwidth of the acknowledged bytes since the last call to
// throughputTick()?
unsigned int throughputTick(ThroughputAckState * state)
{
double result = 0.0;
......@@ -262,6 +256,8 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
u_int caplen = pkthdr->caplen;
u_short len, hlen, version, tcp_hlen, ack_bit;
u_long seq_start, seq_end, ack_seq, ip_src, ip_dst;
unsigned short source_port = 0;
unsigned short dest_port = 0;
int path_id, record_id, msecs, end, flag_resend=0;
sniff_path *path;
......@@ -327,8 +323,14 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
seq_start = ntohl(tp->seq);
seq_end = ((unsigned long)(seq_start+length));
ack_bit= ((tp)->ack & 0x0001);
path_id = search_rcvdb(ip_dst);
source_port = htons(tp->source);
dest_port = htons(tp->dest);
// path_id = search_rcvdb(ip_dst);
// I contacted the receiver. Therefore, my port is unique and
// the receiver's port is fixed. The destination is the
// receiver, therefore my port is the one that is of interest.
path_id = find_by_stub_port(ip_dst, source_port);
if (path_id != -1) { //a monitored outgoing packet
//ignore the pure outgoing ack
if ((ack_bit==1) && (seq_end==seq_start)) {
......@@ -371,7 +373,12 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
}
} else {
path_id = search_rcvdb(ip_src);
// path_id = search_rcvdb(ip_src);
// I contacted the receiver, so my port is unique and their
// port is the same every time. This means that if a packet is
// coming from them, the destination port is the one of
// interest.
path_id = find_by_stub_port(ip_src, dest_port);
if (path_id != -1) { //a monitored incoming packet
if (ack_bit == 1) { //has an acknowledgement
ack_seq = ntohl(tp->ack_seq);
......
#ifdef __cplusplus
extern "C"
{
#endif
#ifndef _STUB_H
#define _STUB_H
......@@ -50,11 +55,21 @@
#define CODE_DELAY 0x00000002
#define CODE_LOSS 0x00000003
enum { FAILED_LOOKUP = -1 };
// The int is the index of the connection.
typedef void (*handle_index)(int);
// The first int is the socket of the monitor.
// The second int is the index of the connection.
// This returns 1 for success and 0 for failure.
typedef int (*send_to_monitor)(int, int);
struct connection {
short valid;
int sockfd;
unsigned long ip;
unsigned short stub_port;
unsigned short source_port;
unsigned short dest_port;
time_t last_usetime; //last monitor access time
int pending; // How many bytes are pending to this peer?
};
......@@ -80,15 +95,19 @@ typedef struct loss_record loss_record;
extern short flag_debug;
extern int pcapfd;
extern int maxfd;
extern char sniff_interface[128];
extern connection snddb[CONCURRENT_SENDERS];
extern connection rcvdb[CONCURRENT_RECEIVERS];
extern sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
extern unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
extern unsigned long last_delays[CONCURRENT_RECEIVERS];
extern loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
extern unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
extern int search_rcvdb(unsigned long indexip);
extern void sniff(void);
extern void init_pcap(int to_ms);
void clean_exit(int);
typedef struct
{
......@@ -106,14 +125,84 @@ extern ThroughputAckState throughput[CONCURRENT_RECEIVERS];
// throughputTick() call.
extern unsigned int throughputTick(ThroughputAckState * state);
extern void throughputInit(ThroughputAckState * state, unsigned int sequence);
extern unsigned int bytesThisTick(ThroughputAckState * state);
#endif
// Add a potential sender to the pool.
void add_empty_sender(int index);
// Initialize a receiver or sender connection.
void init_connection(struct connection * conn);
// Run function on the index of every valid sender that is readable.
void for_each_readable_sender(handle_index function, fd_set * read_fds_copy);
// Try to find the sender by the IP address and the source port of the
// actual connection being created by the seinding peer. If there is a
// sender, the socket is closed and replaced with the sockfd
// argument. If the sender cannot be found, create a new sender based
// on the sockfd argument.
// Returns FAILED_LOOKUP if the sender cannot be found and there are
// no empty slots.
int replace_sender_by_stub_port(unsigned long ip, unsigned short stub_port,
int sockfd, fd_set * read_fds);
// Remove the index from the database, invalidating it.
void remove_sender_index(int index, fd_set * read_fds);
// Add a potential receiver to the pool.
void add_empty_receiver(int index);
// Run function on the index of each writable receiver which has some
// bytes pending.
void for_each_pending(handle_index function, fd_set * write_fds_copy);
// Run function on each index which will send an update to the
// monitor. Returns 1 for success and 0 for failure.
int for_each_to_monitor(send_to_monitor function, int monitor);
// Find the index of the receiver based on the IP address of the
// receiver, and the source and destination ports of the Emulab
// connection.
// Returns FAILED_LOOKUP on failure or the index of the receiver.
int find_by_address(unsigned long ip, unsigned short source_port,
unsigned short dest_port);
// Find the index of the receiver based on the above criteria. If this
// fails, create a new connection to the IP address.
// Returns FAILED_LOOKUP on failure or the index of the receiver.
// Failure happens when the receiver is not already in the database
// and there are no more empty slots in the database.
int insert_by_address(