Commit fad8051b authored by Jonathon Duerig's avatar Jonathon Duerig
Browse files

The monitor now correctly distinguishes between different connections on the...

The monitor now correctly distinguishes between different connections on the same pairs of nodes and interfaces with dummynet to implement this. It is now a good citizen and resets all of the pipes except those used for initial conditions when a connection ends.
parent da655189
...@@ -76,14 +76,23 @@ stub_ip = '' ...@@ -76,14 +76,23 @@ stub_ip = ''
stub_port = 0 stub_port = 0
netmon_output_version = 2 netmon_output_version = 2
# A throughput measurement is only passed on as an event if it is class Connection:
# larger than the last bandwidth from that connection def __init__(self):
connection_bandwidth = {} # The actual local port bound to a connection. Used for events.
self.local_port = ''
# A throughput measurement is only passed on as an event if it is
# larger than the last bandwidth from that connection
self.last_bandwidth = 0
initial_connection_bandwidth = {}
connection_map = {}
total_size = 0 total_size = 0
last_total = -1 last_total = -1
prev_time = 0.0 prev_time = 0.0
##########################################################################
def main_loop(): def main_loop():
global total_size, last_total global total_size, last_total
...@@ -126,6 +135,8 @@ def main_loop(): ...@@ -126,6 +135,8 @@ def main_loop():
last_total = total_size last_total = total_size
# sys.stdout.write('Loop end\n') # sys.stdout.write('Loop end\n')
##########################################################################
def read_args(): def read_args():
global ip_mapping_filename, this_experiment, this_ip, stub_ip, stub_port global ip_mapping_filename, this_experiment, this_ip, stub_ip, stub_port
global pid, eid, evclient global pid, eid, evclient
...@@ -194,6 +205,8 @@ def read_args(): ...@@ -194,6 +205,8 @@ def read_args():
if initial_filename != None: if initial_filename != None:
read_initial_conditions() read_initial_conditions()
##########################################################################
def populate_ip_tables(): def populate_ip_tables():
input = open(ip_mapping_filename, 'r') input = open(ip_mapping_filename, 'r')
line = input.readline() line = input.readline()
...@@ -205,6 +218,8 @@ def populate_ip_tables(): ...@@ -205,6 +218,8 @@ def populate_ip_tables():
emulated_to_interface[fields[0]] = fields[2] emulated_to_interface[fields[0]] = fields[2]
line = input.readline() line = input.readline()
##########################################################################
# Format of an initial conditions file is: # Format of an initial conditions file is:
# #
# List of lines, where each line is of the format: # List of lines, where each line is of the format:
...@@ -214,6 +229,7 @@ def populate_ip_tables(): ...@@ -214,6 +229,7 @@ def populate_ip_tables():
# bandwidth are integral values in milliseconds and kilobits per second # bandwidth are integral values in milliseconds and kilobits per second
# respectively. # respectively.
def read_initial_conditions(): def read_initial_conditions():
global initial_connection_bandwidth
input = open(initial_filename, 'r') input = open(initial_filename, 'r')
line = input.readline() line = input.readline()
while line != '': while line != '':
...@@ -222,11 +238,13 @@ def read_initial_conditions(): ...@@ -222,11 +238,13 @@ def read_initial_conditions():
if len(fields) == 5 and fields[0] == this_ip: if len(fields) == 5 and fields[0] == this_ip:
set_link(fields[0], fields[1], 'delay=' + str(int(fields[3])/2)) set_link(fields[0], fields[1], 'delay=' + str(int(fields[3])/2))
set_link(fields[0], fields[1], 'bandwidth=' + fields[2]) set_link(fields[0], fields[1], 'bandwidth=' + fields[2])
connection_bandwidth[fields[1]] = int(fields[2]) initial_connection_bandwidth[fields[1]] = int(fields[2])
line = input.readline() line = input.readline()
##########################################################################
def get_next_packet(conn): def get_next_packet(conn):
global total_size, last_total, prev_time global total_size, last_total, prev_time, connection_map
line = sys.stdin.readline() line = sys.stdin.readline()
if line == "": if line == "":
raise EOFError raise EOFError
...@@ -241,7 +259,7 @@ def get_next_packet(conn): ...@@ -241,7 +259,7 @@ def get_next_packet(conn):
linexp = re.compile('^(\d+\.\d+) > (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) (\((\d+)\))?') linexp = re.compile('^(\d+\.\d+) > (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) (\((\d+)\))?')
match = linexp.match(line) match = linexp.match(line)
conexp = re.compile('^(New|Connected|Closed|SO_RCVBUF|SO_SNDBUF): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+)( ((?:\d+)(?:\.(?:\d+))?))?') conexp = re.compile('^(New|Connected|LocalPort|Closed|SO_RCVBUF|SO_SNDBUF): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+)( ((?:\d+)(?:\.(?:\d+))?))?')
cmatch = conexp.match(line) cmatch = conexp.match(line)
if match: if match:
localport = 0 # We may not get this one localport = 0 # We may not get this one
...@@ -281,10 +299,16 @@ def get_next_packet(conn): ...@@ -281,10 +299,16 @@ def get_next_packet(conn):
value = cmatch.group(6) value = cmatch.group(6)
if not value_given: if not value_given:
value = '0' value = '0'
key = (ipaddr, localport, remoteport)
if emulated_to_real.has_key(ipaddr): if emulated_to_real.has_key(ipaddr):
sys.stdout.write("Got a connection event: " + event + "\n") sys.stdout.write("Got a connection event: " + event + "\n")
if event == 'New': if event == 'New':
connection_map[key] = Connection()
if initial_connection_bandwidth.has_key(ipaddr):
connection_map[key].last_bandwidth = initial_connection_bandwidth[ipaddr]
else:
sys.stderr.write("No initial condition for " + ipaddr + "\n")
connection_map[key].last_bandwidth = 0
send_command(conn, NEW_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr, send_command(conn, NEW_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr,
localport, remoteport, '') localport, remoteport, '')
send_command(conn, TRAFFIC_MODEL_COMMAND, TCP_CONNECTION, ipaddr, send_command(conn, TRAFFIC_MODEL_COMMAND, TCP_CONNECTION, ipaddr,
...@@ -302,6 +326,8 @@ def get_next_packet(conn): ...@@ -302,6 +326,8 @@ def get_next_packet(conn):
elif event == 'Closed': elif event == 'Closed':
send_command(conn, DELETE_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr, send_command(conn, DELETE_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr,
localport, remoteport, '') localport, remoteport, '')
set_connection(this_ip, connection_map[key].local_port, ipaddr,
str(remoteport), '', 'CLEAR')
elif event == 'Connected': elif event == 'Connected':
send_command(conn, CONNECT_COMMAND, TCP_CONNECTION, ipaddr, send_command(conn, CONNECT_COMMAND, TCP_CONNECTION, ipaddr,
localport, remoteport, '') localport, remoteport, '')
...@@ -317,6 +343,10 @@ def get_next_packet(conn): ...@@ -317,6 +343,10 @@ def get_next_packet(conn):
localport, remoteport, localport, remoteport,
save_int(CONNECTION_SEND_BUFFER_SIZE) save_int(CONNECTION_SEND_BUFFER_SIZE)
+ save_int(int(value))) + save_int(int(value)))
elif event == 'LocalPort':
connection_map[key].local_port = value
set_connection(this_ip, value,
ipaddr, str(remoteport), '', 'CREATE')
else: else:
sys.stdout.write('skipped line with an invalid event: ' sys.stdout.write('skipped line with an invalid event: '
+ event + '\n') + event + '\n')
...@@ -325,9 +355,11 @@ def get_next_packet(conn): ...@@ -325,9 +355,11 @@ def get_next_packet(conn):
+ ipaddr + '\n') + ipaddr + '\n')
else: else:
sys.stdout.write('skipped line in the wrong format: ' + line) sys.stdout.write('skipped line in the wrong format: ' + line)
##########################################################################
def receive_characteristic(conn): def receive_characteristic(conn):
global connection_bandwidth global connection_map
buf = conn.recv(12) buf = conn.recv(12)
if len(buf) != 12: if len(buf) != 12:
sys.stdout.write('Event header is the wrong size. Length: ' sys.stdout.write('Event header is the wrong size. Length: '
...@@ -339,14 +371,18 @@ def receive_characteristic(conn): ...@@ -339,14 +371,18 @@ def receive_characteristic(conn):
dest = real_to_emulated[int_to_ip(load_int(buf[4:8]))] dest = real_to_emulated[int_to_ip(load_int(buf[4:8]))]
source_port = load_short(buf[8:10]) source_port = load_short(buf[8:10])
dest_port = load_short(buf[10:12]) dest_port = load_short(buf[10:12])
key = (dest, source_port, dest_port);
real_local_port = connection_map[key].local_port
buf = conn.recv(size); buf = conn.recv(size);
if len(buf) != size: if len(buf) != size:
sys.stdout.write('Event body is the wrong size.\n') sys.stdout.write('Event body is the wrong size.\n')
return False return False
if eventType == EVENT_FORWARD_PATH: if eventType == EVENT_FORWARD_PATH:
set_link(this_ip, dest, buf) # set_link(this_ip, dest, buf)
set_connection(this_ip, real_local_port, dest, str(dest_port), buf)
elif eventType == EVENT_BACKWARD_PATH: elif eventType == EVENT_BACKWARD_PATH:
set_link(dest, this_ip, buf) # set_link(dest, this_ip, buf)
set_connection(dest, str(dest_port), this_ip, real_local_port, buf)
elif eventType == TENTATIVE_THROUGHPUT: elif eventType == TENTATIVE_THROUGHPUT:
# There is a throughput number, but we don't know whether the link # There is a throughput number, but we don't know whether the link
# is saturated or not. If the link is not saturated, then we just # is saturated or not. If the link is not saturated, then we just
...@@ -362,23 +398,29 @@ def receive_characteristic(conn): ...@@ -362,23 +398,29 @@ def receive_characteristic(conn):
# AUTHORITATIVE_BANDWIDTH. Therefore, we assume that it is # AUTHORITATIVE_BANDWIDTH. Therefore, we assume that it is
# authoritative if it is greater than the previous measurement, # authoritative if it is greater than the previous measurement,
# and that it is just a throughput number if it is less. # and that it is just a throughput number if it is less.
if connection_bandwidth.has_key(dest): if connection_map.has_key(key):
if int(buf) > connection_bandwidth[dest]: if int(buf) > connection_map[key].last_bandwidth:
connection_bandwidth[dest] = int(buf) connection_map[key].last_bandwidth = int(buf)
set_link(this_ip, dest, 'bandwidth=' + buf) # set_link(this_ip, dest, 'bandwidth=' + buf)
set_connection(this_ip, real_local_port, dest, str(dest_port),
'bandwidth=' + buf)
else: else:
sys.stdout.write('ignored TENTATIVE_THROUGHPUT for %s - %i vs %i\n' sys.stdout.write('ignored TENTATIVE_THROUGHPUT for %s - %i vs %i\n'
% (dest,int(buf), connection_bandwidth[dest])) % (dest,int(buf),connection_map[key].last_bandwidth))
else: else:
sys.stdout.write('ignored TENTATIVE_THROUGHPUT for %s - no data\n' % (dest)) sys.stdout.write('ignored TENTATIVE_THROUGHPUT for %s - no data\n' % (dest))
elif eventType == AUTHORITATIVE_BANDWIDTH and int(buf) > 0: elif eventType == AUTHORITATIVE_BANDWIDTH and int(buf) > 0:
# We know that the bandwidth has definitely changed. Reset everything. # We know that the bandwidth has definitely changed. Reset everything.
connection_bandwidth[dest] = int(buf) connection_map[key].last_bandwidth = int(buf)
set_link(this_ip, dest, 'bandwidth=' + buf) # set_link(this_ip, dest, 'bandwidth=' + buf)
set_connection(this_ip, real_local_port, dest, str(dest_port),
'bandwidth=' + buf)
else: else:
sys.stdout.write('Other: ' + str(eventType) + ', ' + str(value) + '\n'); sys.stdout.write('Other: ' + str(eventType) + ', ' + str(value) + '\n');
return True return True
##########################################################################
def set_bandwidth(kbps, dest): def set_bandwidth(kbps, dest):
# sys.stdout.write('<event> bandwidth=' + str(kbps) + '\n') # sys.stdout.write('<event> bandwidth=' + str(kbps) + '\n')
now = time.time() now = time.time()
...@@ -388,6 +430,8 @@ def set_bandwidth(kbps, dest): ...@@ -388,6 +430,8 @@ def set_bandwidth(kbps, dest):
+ ' ' + str(kbps*1000/8) + '\n') + ' ' + str(kbps*1000/8) + '\n')
return set_link(this_ip, dest, 'bandwidth=' + str(kbps)) return set_link(this_ip, dest, 'bandwidth=' + str(kbps))
##########################################################################
# Set delay on the link. We are given round trip time. # Set delay on the link. We are given round trip time.
def set_delay(milliseconds, dest): def set_delay(milliseconds, dest):
now = time.time() now = time.time()
...@@ -402,22 +446,34 @@ def set_delay(milliseconds, dest): ...@@ -402,22 +446,34 @@ def set_delay(milliseconds, dest):
else: else:
return error return error
##########################################################################
def set_loss(probability, dest): def set_loss(probability, dest):
return set_link(this_ip, dest, 'plr=' + str(probability)) return set_link(this_ip, dest, 'plr=' + str(probability))
##########################################################################
def set_max_delay(milliseconds, dest): def set_max_delay(milliseconds, dest):
return set_link(this_ip, dest, 'MAXINQ=' + str(milliseconds)) return set_link(this_ip, dest, 'MAXINQ=' + str(milliseconds))
def set_link(source, dest, ending): ##########################################################################
def set_connection(source, source_port, dest, dest_port, ending, event_type='MODIFY'):
set_link(source, dest, 'srcport=' + source_port
+ ' dstport=' + dest_port + ' protocol=tcp ' + ending, event_type)
##########################################################################
def set_link(source, dest, ending, event_type='MODIFY'):
# Create event system address tuple to identify the notification. # Create event system address tuple to identify the notification.
# The event type is currently always "MODIFY", and the event is not # The event is not sent through the scheduler; it is sent as an
# sent through the scheduler; it is sent as an immediate notification. # immediate notification.
global evclient global evclient
evtuple = address_tuple() evtuple = address_tuple()
evtuple.host = ADDRESSTUPLE_ALL evtuple.host = ADDRESSTUPLE_ALL
evtuple.site = ADDRESSTUPLE_ALL evtuple.site = ADDRESSTUPLE_ALL
evtuple.group = ADDRESSTUPLE_ALL evtuple.group = ADDRESSTUPLE_ALL
evtuple.eventtype = 'MODIFY' evtuple.eventtype = event_type.upper() #'MODIFY'
evtuple.objtype = 'LINK' evtuple.objtype = 'LINK'
evtuple.expt = this_experiment evtuple.expt = this_experiment
evtuple.objname = emulated_to_interface[source] evtuple.objname = emulated_to_interface[source]
...@@ -430,11 +486,14 @@ def set_link(source, dest, ending): ...@@ -430,11 +486,14 @@ def set_link(source, dest, ending):
pass pass
evargstr = ' '.join(evargs) evargstr = ' '.join(evargs)
evnotification.setArguments(evargstr) evnotification.setArguments(evargstr)
sys.stdout.write('event: ' + evargstr + '\n') sys.stdout.write('event: ' + '(' + event_type.upper() + ') '
+ evargstr + '\n')
# Must invert the return value of the notification send function as the # Must invert the return value of the notification send function as the
# original code here returned the exit code of tevc (hence 0 means success). # original code here returned the exit code of tevc (hence 0 means success).
return not evclient.notify(evnotification) return not evclient.notify(evnotification)
##########################################################################
def send_command(conn, command_id, protocol, ipaddr, localport, remoteport, def send_command(conn, command_id, protocol, ipaddr, localport, remoteport,
command): command):
output = (save_char(command_id) output = (save_char(command_id)
...@@ -447,6 +506,8 @@ def send_command(conn, command_id, protocol, ipaddr, localport, remoteport, ...@@ -447,6 +506,8 @@ def send_command(conn, command_id, protocol, ipaddr, localport, remoteport,
# sys.stdout.write('Sending command: CHECKSUM=' + str(checksum(output)) + '\n') # sys.stdout.write('Sending command: CHECKSUM=' + str(checksum(output)) + '\n')
conn.sendall(output) conn.sendall(output)
##########################################################################
def send_destinations(conn, packet_list): def send_destinations(conn, packet_list):
# sys.stdout.write('<send> total size:' + str(total_size) + ' packet count:' + str(len(packet_list)) + '\n')# + ' -- ' # sys.stdout.write('<send> total size:' + str(total_size) + ' packet count:' + str(len(packet_list)) + '\n')# + ' -- '
# + str(packet_list) + '\n') # + str(packet_list) + '\n')
...@@ -470,36 +531,54 @@ def send_destinations(conn, packet_list): ...@@ -470,36 +531,54 @@ def send_destinations(conn, packet_list):
prev_time = packet[3] prev_time = packet[3]
conn.sendall(output) conn.sendall(output)
##########################################################################
def load_int(str): def load_int(str):
return load_n(str, 4) return load_n(str, 4)
##########################################################################
def save_int(number): def save_int(number):
return save_n(number, 4) return save_n(number, 4)
##########################################################################
def load_short(str): def load_short(str):
return load_n(str, 2) return load_n(str, 2)
##########################################################################
def save_short(number): def save_short(number):
return save_n(number, 2) return save_n(number, 2)
##########################################################################
def load_char(str): def load_char(str):
return load_n(str, 1) return load_n(str, 1)
##########################################################################
def save_char(number): def save_char(number):
return save_n(number, 1) return save_n(number, 1)
##########################################################################
def load_n(str, n): def load_n(str, n):
result = 0 result = 0
for i in range(n): for i in range(n):
result = result | ((ord(str[i]) & 0xff) << (8*(n-1-i))) result = result | ((ord(str[i]) & 0xff) << (8*(n-1-i)))
return result return result
##########################################################################
def save_n(number, n): def save_n(number, n):
result = '' result = ''
for i in range(n): for i in range(n):
result = result + chr((number >> ((n-1-i)*8)) & 0xff) result = result + chr((number >> ((n-1-i)*8)) & 0xff)
return result return result
##########################################################################
def ip_to_int(ip): def ip_to_int(ip):
ip_list = ip.split('.', 3) ip_list = ip.split('.', 3)
result = 0 result = 0
...@@ -516,6 +595,8 @@ def ip_to_int(ip): ...@@ -516,6 +595,8 @@ def ip_to_int(ip):
# ip_list.reverse() # ip_list.reverse()
# return '.'.join(ip_list) # return '.'.join(ip_list)
##########################################################################
def int_to_ip(num): def int_to_ip(num):
ip_list = ['0', '0', '0', '0'] ip_list = ['0', '0', '0', '0']
for index in range(4): for index in range(4):
...@@ -523,6 +604,8 @@ def int_to_ip(num): ...@@ -523,6 +604,8 @@ def int_to_ip(num):
num = num >> 8 num = num >> 8
return '.'.join(ip_list) return '.'.join(ip_list)
##########################################################################
def checksum(buf): def checksum(buf):
total = 0 total = 0
flip = 1 flip = 1
...@@ -531,4 +614,6 @@ def checksum(buf): ...@@ -531,4 +614,6 @@ def checksum(buf):
# flip *= -1 # flip *= -1
return total return total
##########################################################################
main_loop() main_loop()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment