monitor.py 27.4 KB
Newer Older
Mike Hibler's avatar
Mike Hibler committed
1 2 3 4 5 6
#
# EMULAB-COPYRIGHT
# Copyright (c) 2006 University of Utah and the Flux Group.
# All rights reserved.
#

7
# Monitors the application's behaviour and report to the network model.
8 9 10 11 12 13

import sys
import os
import time
import socket
import select
14
import re
15
import traceback
16
import errno
17
from optparse import OptionParser
18 19
sys.path.append("/usr/testbed/lib")
from tbevent import EventClient, address_tuple, ADDRESSTUPLE_ALL
20

21 22
EVENT_FORWARD_PATH = 0
EVENT_BACKWARD_PATH = 1
23 24 25
TENTATIVE_THROUGHPUT = 2
AUTHORITATIVE_BANDWIDTH = 3
  
26 27 28 29 30 31 32 33
NEW_CONNECTION_COMMAND = 0
TRAFFIC_MODEL_COMMAND = 1
CONNECTION_MODEL_COMMAND = 2
SENSOR_COMMAND = 3
CONNECT_COMMAND = 4
TRAFFIC_WRITE_COMMAND = 5
DELETE_CONNECTION_COMMAND = 6

34 35 36 37 38 39 40 41
command_to_string = {NEW_CONNECTION_COMMAND: 'New Connection',
                     TRAFFIC_MODEL_COMMAND: 'Traffic Model',
                     CONNECTION_MODEL_COMMAND: 'Connection Model',
                     SENSOR_COMMAND: 'Sensor',
                     CONNECT_COMMAND: 'Connect',
                     TRAFFIC_WRITE_COMMAND: 'Write',
                     DELETE_CONNECTION_COMMAND: 'Delete'}

42
NULL_SENSOR = 0
43 44 45 46 47 48
STATE_SENSOR = 1
PACKET_SENSOR = 2
DELAY_SENSOR = 3
MIN_DELAY_SENSOR = 4
MAX_DELAY_SENSOR = 5
THROUGHPUT_SENSOR = 6
49
EWMA_THROUGHPUT_SENSOR = 7
50
LEAST_SQUARES_THROUGHPUT = 8
51 52
TSTHROUGHPUT_SENSOR = 9
AVERAGE_THROUGHPUT_SENSOR = 10
53 54 55 56 57 58 59 60
UDP_STATE_SENSOR = 11
UDP_PACKET_SENSOR = 12
UDP_THROUGHPUT_SENSOR = 13
UDP_MINDELAY_SENSOR = 14
UDP_MAXDELAY_SENSOR = 15
UDP_RTT_SENSOR = 16
UDP_LOSS_SENSOR = 17
UDP_AVG_THROUGHPUT_SENSOR = 18
61 62 63 64 65 66 67 68

CONNECTION_SEND_BUFFER_SIZE = 0
CONNECTION_RECEIVE_BUFFER_SIZE = 1
CONNECTION_MAX_SEGMENT_SIZE = 2
CONNECTION_USE_NAGLES = 3

TCP_CONNECTION = 0
UDP_CONNECTION = 1
69

70 71
KEY_SIZE = 32

72 73 74 75 76 77 78
# Client connection to event server.  Make this global since we don't
# want to be continually connecting and disconnecting (and creating/destroying
# the eventsys object).  The object is actually instantiated at the top of
# the main_loop function.
TBEVENT_SERVER = "event-server.emulab.net"
evclient = None

79 80 81 82
emulated_to_real = {}
real_to_emulated = {}
emulated_to_interface = {}
ip_mapping_filename = ''
83
initial_filename = ''
84
this_experiment = ''
85 86
pid = ''
eid = ''
87
this_ip = ''
88
stub_ip = ''
89
stub_port = 0
90 91
netmon_output_version = 3
magent_version = 1
92

93 94
is_fake = False

95
class Dest:
96 97 98 99 100 101 102 103 104 105 106 107
  def __init__(self, tup=None):
    if tup == None:
      self.local_port = ''
      self.remote_ip = ''
      self.remote_port = ''
    else:
      self.local_port = tup[0]
      self.remote_ip = tup[1]
      self.remote_port = tup[2]

  def toTuple(self):
    return (self.local_port, self.remote_ip, self.remote_port)
108 109 110 111 112

class Connection:
  def __init__(self, new_dest, new_number):
    # The actual local port bound to a connection. Used for events.
    self.dest = new_dest
113 114
    # A throughput measurement is only passed on as an event if it is
    # larger than the last bandwidth from that connection
115 116

    # Initialized when a RemoteIP or SendTo command is received
117
    self.last_bandwidth = 0
118
    # Initialized when a Connect or SendTo command is received
119
    self.prev_time = 0.0
120 121 122 123
    # Initialized by start_real_connection() or lookup()
    self.number = new_number
    # Initialized by finalize_real_connection() or lookup()
    self.is_connected = False
124 125
    # Initialized when a RemoteIP is received
    self.is_valid = True
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141

class Socket:
  def __init__(self):
    # Initialized when a New is received
    self.protocol = TCP_CONNECTION
    self.dest_to_number = {}
    self.number_to_connection = {}
    # Kept consistent by lookup() and start_real_connection() and
    # finalize_real_connection()
    self.count = 0
    # Initialized by SO_RCVBUF and SO_SNDBUF
    self.receive_buffer_size = 0
    self.send_buffer_size = 0

# Searches for a dest, inserts if dest not found. Returns the connection associated with dest.
  def lookup(self, dest):
142 143 144 145
    sys.stdout.write('Looking up: ' + dest.local_port + ':'
                     + dest.remote_ip + ':' + dest.remote_port + '\n')
    if self.dest_to_number.has_key(dest.toTuple()):
      number = self.dest_to_number[dest.toTuple()]
146 147
      return self.number_to_connection[number]
    else:
148 149
      sys.stdout.write('Lookup: Failed, creating new connection\n')
      self.dest_to_number[dest.toTuple()] = self.count
150 151
      self.number_to_connection[self.count] = Connection(dest, self.count)
      self.count = self.count + 1
152
      return self.number_to_connection[self.count - 1]
153 154

initial_connection_bandwidth = {}
155 156
socket_map = {}

157 158
##########################################################################

159 160
def main_loop():
  # Initialize
161
  read_args()
162
  conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
163 164 165 166 167
  if not is_fake:
    #conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sys.stdout.write("Connect: stub_ip is " + stub_ip + ":" + str(stub_port)
                     + "\n")
    conn.connect((stub_ip, stub_port))
168 169
  poll = select.poll()
  poll.register(sys.stdin, select.POLLIN)
170 171
  if not is_fake:
    poll.register(conn, select.POLLIN)
172 173 174 175
  done = False

  while not done:
    # Collect data until the next quanta boundary
176 177 178 179 180
    try:
      fdlist = poll.poll()
      for pos in fdlist:
        if (pos[0] == sys.stdin.fileno() and (pos[1] & select.POLLIN) != 0
            and not done):
181
          # A line of data from tcpdump is available.
182 183 184 185 186 187 188 189 190 191 192 193
          get_next_packet(conn)
        elif (pos[0] == conn.fileno() and (pos[1] & select.POLLIN) != 0
              and not done):
          # A record for change in link characteristics is available.
          receive_characteristic(conn)
        elif not done:
          raise Exception('ERROR: Unknown fd on select: fd: ' + str(pos[0])
                          + ' conn-fd: ' + str(conn.fileno()))
    except EOFError:
      sys.stdout.write('Done: Got an EOF on stdin\n')
      done = True
    except Exception, err:
194 195
      traceback.print_exc(10, sys.stderr)
      sys.stderr.write('----\n')
196

197 198
##########################################################################

199
def read_args():
200
  global ip_mapping_filename, this_experiment, this_ip, stub_ip, stub_port
201
  global pid, eid, evclient
202
  global initial_filename
203
  global is_fake
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224

  usage = "usage: %prog [options]"
  parser = OptionParser(usage=usage)
  parser.add_option("--mapping", action="store", type="string",
                    dest="ip_mapping_filename", metavar="MAPPING_FILE",
                    help="File mapping IP addresses on Emulab to those on PlanetLab (required)")
  parser.add_option("--experiment", action="store", type="string",
                    dest="this_experiment", metavar="EXPERIMENT_NAME",
                    help="Experiment name of the form pid/eid (required)")
  parser.add_option("--ip", action="store", type="string",
                    dest="this_ip", metavar="X.X.X.X",
                    help="The IP address the monitor will use (required)")
  parser.add_option("--stub-ip", action="store", type="string",
                    dest="stub_ip", metavar="X.X.X.X",
                    help="The IP address of the stub (if not specified, defaults to the one in the ip mapping)")
  parser.add_option("--stub-port", action="store", type="int",
                    dest="stub_port", default=4200,
                    help="The port used to connect to the stub (defaults to 4200)")
  parser.add_option("--initial", action="store", type="string",
                    dest="initial_filename", metavar="INITIAL_CONDITIONS_FILE",
                    help="File giving initial conditions for connections (defaults to no shaping)")
225
  parser.add_option("--fake", action="store_true", dest="is_fake")
226 227 228 229 230 231 232 233

  (options, args) = parser.parse_args()
  ip_mapping_filename = options.ip_mapping_filename
  this_experiment = options.this_experiment
  this_ip = options.this_ip
  stub_ip = options.stub_ip
  stub_port = options.stub_port
  initial_filename = options.initial_filename
234 235 236 237 238 239
  is_fake = options.is_fake

  if is_fake:
    sys.stderr.write('***FAKE***\n')
  else:
    sys.stderr.write('***REAL***\n')
240 241 242 243 244 245 246 247 248 249 250 251 252 253

  if len(args) != 0:
    parser.print_help()
    parser.error("Invalid argument(s): " + str(args))
  if ip_mapping_filename == None:
    parser.print_help()
    parser.error("Missing --mapping=MAPPING_FILE option")
  if this_experiment == None:
    parser.print_help()
    parser.error("Missing --experiment=EXPERIMENT_NAME option")
  if this_ip == None:
    parser.print_help()
    parser.error("Missing --ip=X.X.X.X option")

254 255 256 257 258 259 260
  (pid,eid) = this_experiment.split('/')

  # XXX: The name of the event server is hardwired here, and due to the
  #      lack of autoconf, isn't even an autoconf var.  Could add a command
  #      line option to set it if needed.
  # XXX: This HAS to be done before here, becuase read_args can fire
  #      off set_link().
261 262 263 264 265
  #
  if not is_fake:
    evclient = EventClient(server=TBEVENT_SERVER,
                           keyfile="/proj/%s/exp/%s/tbdata/eventkey"
                           % (pid,eid))
266

267 268
  populate_ip_tables()
  if stub_ip == None:
269 270 271 272 273 274 275 276 277
    if emulated_to_real.has_key(this_ip):
      sys.stdout.write('Init: stub_ip was None before. '
                       + 'Using ip-mapping to find stub.\n')
      stub_ip = emulated_to_real[this_ip]
    elif is_fake:
      stub_ip = 'stub_ip'
    else:
      raise Exception('ERROR: No stub_ip was given. '
                      + 'Could not look one up in ip_mapping.')
278 279
  if initial_filename != None:
    read_initial_conditions()
280

281 282
##########################################################################

283 284 285 286 287 288 289 290 291 292 293
def populate_ip_tables():
  input = open(ip_mapping_filename, 'r')
  line = input.readline()
  while line != '':
    fields = line.strip().split(' ', 2)
    if len(fields) == 3:
      emulated_to_real[fields[0]] = fields[1]
      real_to_emulated[fields[1]] = fields[0]
      emulated_to_interface[fields[0]] = fields[2]
    line = input.readline()

294 295
##########################################################################

296 297 298
# Format of an initial conditions file is:
#
# List of lines, where each line is of the format:
299
# <source-ip> <dest-ip> <bandwidth> <delay> <loss>
300 301 302 303 304
#
# Where source and dest ip addresses are in x.x.x.x format, and delay and
# bandwidth are integral values in milliseconds and kilobits per second
# respectively.
def read_initial_conditions():
305
  global initial_connection_bandwidth
306 307 308
  input = open(initial_filename, 'r')
  line = input.readline()
  while line != '':
309 310 311
    # Don't worry about loss for now. Just discard the value.
    fields = line.strip().split(' ', 4)
    if len(fields) == 5 and fields[0] == this_ip:
312 313
      set_link(fields[0], fields[1], 'delay=' + str(int(fields[3])/2))
      set_link(fields[0], fields[1], 'bandwidth=' + fields[2])
314
      initial_connection_bandwidth[fields[1]] = int(fields[2])
315 316
    line = input.readline()

317 318
##########################################################################

319
def get_next_packet(conn):
320
  line = sys.stdin.readline()
321 322
  if line == "":
      raise EOFError
323
  if netmon_output_version == 3:
324
    linexp = re.compile('^(New|RemoteIP|RemotePort|LocalPort|TCP_NODELAY|TCP_MAXSEG|SO_RCVBUF|SO_SNDBUF|Connected|Accepted|Send|SendTo|Closed): ([^ ]*) (\d+\.\d+) ([^ ]*)\n$')
325
  else:
326
    raise Exception("ERROR: Only input version 3 is supported")
327
  match = linexp.match(line)
328
  if match:
329 330 331 332 333 334
    event = match.group(1)
    key = match.group(2)
    timestamp = float(match.group(3))
    value = match.group(4)
    process_event(conn, event, key, timestamp, value)
  else:
Jonathon Duerig's avatar
Jonathon Duerig committed
335
    sys.stderr.write('ERROR: skipped line in the wrong format: ' + line + '\n')
336 337 338 339 340 341

##########################################################################

def process_event(conn, event, key, timestamp, value):
  global socket_map
  if len(key) > KEY_SIZE - 2:
342 343
    sys.stderr.write('ERROR: Event has a key with > KEY_SIZE - 2 characters.'
                     + ' Truncating: ' + key + '\n')
344 345 346 347 348
    key = key[:KEY_SIZE - 2]
  key = key.ljust(KEY_SIZE - 2)
  if event == 'New':
    socket_map[key] = Socket()
  if not socket_map.has_key(key):
349 350
    raise Exception('ERROR: libnetmon event received that does not '
                    + 'correspond to a socket. Key: ' + key)
351 352 353 354 355 356
  sock = socket_map[key]
  if event == 'New':
    if value == 'TCP':
      sock.protocol = TCP_CONNECTION
    elif value == 'UDP':
      sock.protocol = UDP_CONNECTION
357
      sys.stderr.write('Notify: Set socket to UDP_CONNECTION: ' + key + '\n')
358 359
    else:
      sock.protocol = TCP_CONNECTION
360 361
      sys.stderr.write('ERROR: Received "New" event with invalid protocol.'
                       'Assuming TCP_CONNECTION. Protocol: ' + value + '\n')
362 363 364
  elif event == 'RemoteIP':
    start_real_connection(sock)
    app_connection = sock.number_to_connection[0]
365 366
    if emulated_to_real.has_key(value):
      app_connection.is_valid = True
367 368
      app_connection.dest.remote_ip = value
    else:
369 370
      sys.stderr.write('ERROR: Connection invalid: ' + key + ': ' + value
                       + '\n')
371
      app_connection.is_valid = False
372 373 374 375
    finalize_real_connection(conn, key, sock, app_connection)
    if initial_connection_bandwidth.has_key(value):
      sock.number_to_connection[0].last_bandwidth = initial_connection_bandwidth[value]
    else:
376
      sys.stderr.write("ERROR: No initial condition for " + value + "\n")
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
      sock.number_to_connection[0].last_bandwidth = 0
  elif event == 'RemotePort':
    start_real_connection(sock)
    sock.number_to_connection[0].dest.remote_port = value
    app_connection = sock.number_to_connection[0]
    finalize_real_connection(conn, key, sock, app_connection)
  elif event == 'LocalPort':
    start_real_connection(sock)
    sock.number_to_connection[0].dest.local_port = value
    app_connection = sock.number_to_connection[0]
    finalize_real_connection(conn, key, sock, app_connection)
  elif event == 'SO_RCVBUF':
    sock.receive_buffer_size = int(value)
  elif event == 'SO_SNDBUF':
    sock.send_buffer_size = int(value)
392
  elif event == 'Connected' or event == 'Accepted':
393 394 395 396
    if sock.protocol != UDP_CONNECTION:
      app_connection = sock.number_to_connection[0]
      finalize_real_connection(conn, key, sock, app_connection)
      sock.number_to_connection[0].prev_time = timestamp
397 398
  elif event == 'Send':
    app_connection = sock.number_to_connection[0]
399 400 401
    if app_connection.is_valid:
      send_write(conn, key, timestamp, app_connection, value)
      app_connection.prev_time = timestamp
402 403
    else:
      sys.stderr.write('Invalid Send\n')
404 405 406 407 408 409 410 411 412 413 414
  elif event == 'SendTo':
    # If this is a 'new connection' as well, then do not actually send
    # the sendto command. This is so that there is no '0' delta, and
    # so that trivial (single packet) connections do cause wierd
    # measurements.
    regexp = re.compile('^(\d+):(\d+\.\d+\.\d+\.\d+):(\d+):(\d+)')
    match = regexp.match(value)
    if match:
      dest = Dest()
      dest.local_port = match.group(1)
      dest.remote_ip = match.group(2)
415
      dest.remote_port = match.group(3)
416
      size = int(match.group(4))
417 418 419 420 421 422 423 424 425
      if emulated_to_real.has_key(dest.remote_ip):
        app_connection = sock.lookup(dest)
        if app_connection.is_connected == False:
          if initial_connection_bandwidth.has_key(app_connection.dest.remote_ip):
            app_connection.last_bandwidth = initial_connection_bandwidth[app_connection.dest.remote_ip]
          else:
            app_connection.last_bandwidth = 0
          app_connection.prev_time = timestamp
          app_connection.is_connected = True
426 427 428 429
          set_connection(this_ip, app_connection.dest.local_port,
                         app_connection.dest.remote_ip,
                         app_connection.dest.remote_port,
                         proto_to_string(sock.protocol), 'CREATE')
430 431
          send_connect(conn, key + save_short(app_connection.number),
                       sock, app_connection)
432
        else:
433 434
          send_write(conn, key, timestamp, app_connection, size)
          app_connection.prev_time = timestamp
435 436
  elif event == 'Closed':
    for pos in sock.number_to_connection.itervalues():
437 438 439 440 441 442
      if pos.is_valid:
        send_command(conn, key + save_short(pos.number),
                     DELETE_CONNECTION_COMMAND, '')
        set_connection(this_ip, pos.dest.local_port,
                       pos.dest.remote_ip, pos.dest.remote_port,
                       proto_to_string(sock.protocol), 'CLEAR')
443
  else:
Jonathon Duerig's avatar
Jonathon Duerig committed
444 445
    sys.stderr.write('ERROR: skipped line with an invalid event: '
                     + event + '\n')
446

447 448 449 450 451 452 453 454 455
def proto_to_string(proto):
  if proto == TCP_CONNECTION:
    return 'protocol=tcp'
  elif proto == UDP_CONNECTION:
    return 'protocol=udp'
  else:
    sys.stderr.write('ERROR: Invalid protocol received in proto_to_string: '
                     + str(proto) + '\n')
    return 'protocol=tcp'
456 457
##########################################################################
  
458 459 460 461 462
def start_real_connection(sock):
  if sock.count == 0:
    sock.number_to_connection[0] = Connection(Dest(), 0)
    sock.count = 1

463 464
##########################################################################
    
465 466 467 468 469 470 471
def finalize_real_connection(conn, key, sock, app_connection):
  dest = sock.number_to_connection[0].dest
  if (dest.local_port != ''
      and dest.remote_port != ''
      and dest.remote_ip != ''
      and not sock.number_to_connection[0].is_connected):
    sock.number_to_connection[0].is_connected = True
472
    sock.dest_to_number[dest.toTuple()] = 0
473 474 475 476 477 478
    if app_connection.is_valid:
      set_connection(this_ip, app_connection.dest.local_port,
                     app_connection.dest.remote_ip,
                     app_connection.dest.remote_port,
                     proto_to_string(sock.protocol), 'CREATE')
      send_connect(conn, key + save_short(0), sock, app_connection)
479

480 481
##########################################################################
    
482 483 484 485 486 487 488 489 490
def send_connect(conn, key, sock, app_connection):
  if sock.protocol == TCP_CONNECTION:
    min_delay = MIN_DELAY_SENSOR
    max_delay = MAX_DELAY_SENSOR
    average_throughput = AVERAGE_THROUGHPUT_SENSOR
  elif sock.protocol == UDP_CONNECTION:
    min_delay = UDP_MINDELAY_SENSOR
    max_delay = UDP_MAXDELAY_SENSOR
    average_throughput = UDP_AVG_THROUGHPUT_SENSOR
491
  else:
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
    return
  send_command(conn, key, NEW_CONNECTION_COMMAND,
               save_char(sock.protocol))
  send_command(conn, key, TRAFFIC_MODEL_COMMAND, '')
  send_command(conn, key, SENSOR_COMMAND, save_int(min_delay))
  send_command(conn, key, SENSOR_COMMAND, save_int(max_delay))
  #send_command(conn, key, SENSOR_COMMAND, save_int(NULL_SENSOR))
  #send_command(conn, key, SENSOR_COMMAND, save_int(EWMA_THROUGHPUT_SENSOR)
  send_command(conn, key, SENSOR_COMMAND,
               save_int(average_throughput))
  send_command(conn, key, CONNECTION_MODEL_COMMAND,
               save_int(CONNECTION_RECEIVE_BUFFER_SIZE)
               + save_int(sock.receive_buffer_size))
  send_command(conn, key, CONNECTION_MODEL_COMMAND,
               save_int(CONNECTION_SEND_BUFFER_SIZE)
               + save_int(sock.send_buffer_size))
  send_command(conn, key, CONNECT_COMMAND,
               save_int(ip_to_int(
                 emulated_to_real[app_connection.dest.remote_ip])))

def send_write(conn, key, timestamp, app_connection, size):
  send_command(conn, key + save_short(app_connection.number),
               TRAFFIC_WRITE_COMMAND,
               save_int(int((timestamp - app_connection.prev_time)*1000))
               + save_int(int(size)))
517 518 519

##########################################################################

520 521 522 523 524 525 526 527 528
RECEIVE_HEAD = 0
RECEIVE_BODY = 1

receive_head_buffer = ''
receive_body_buffer = ''
receive_state = RECEIVE_HEAD
receive_head_size = 36
receive_body_size = 0

529
def receive_characteristic(conn):
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
  global receive_head_buffer, receive_body_buffer, receive_state
  global receive_body_size

  if receive_state == RECEIVE_HEAD:
    receive_head_buffer = (receive_head_buffer
                           + conn.recv(receive_head_size
                                       - len(receive_head_buffer),
                                       socket.MSG_DONTWAIT))
    if len(receive_head_buffer) == receive_head_size:
      receive_state = RECEIVE_BODY
      receive_body_buffer = ''
      receive_body_size = load_short(receive_head_buffer[1:3])
  if receive_state == RECEIVE_BODY:
    receive_body_buffer = (receive_body_buffer
                           + conn.recv(receive_body_size
                                       - len(receive_body_buffer),
                                       socket.MSG_DONTWAIT))
    if len(receive_body_buffer) == receive_body_size:
548 549 550 551 552
      try:
        receive_parse_body()
      finally:
        receive_state = RECEIVE_HEAD
        receive_head_buffer = ''
553 554

def receive_parse_body():
555
  global socket_map
556 557 558
  eventType = load_char(receive_head_buffer[0:1])
  size = load_short(receive_head_buffer[1:3])
  version = load_char(receive_head_buffer[3:4])
559
  if version != magent_version:
560
    raise Exception('ERROR: Wrong version from magent: version('
561 562 563
                    + str(version) + ')')
  socketKey = receive_head_buffer[4:34];
  connectionKey = load_short(receive_head_buffer[34:36]);
564 565

  if not socket_map.has_key(socketKey):
566 567
    raise Exception('ERROR: magent event received that does not '
                    + 'correspond to a socket. Key: ' + socketKey)
568 569 570 571

  sock = socket_map[socketKey]

  if not sock.number_to_connection.has_key(connectionKey):
572 573
    raise Exception('ERROR: magent event received that does not '
                    + 'correspond to a connection. socketKey: ' + socketKey
574
                    + ' connectionKey: ' + str(connectionKey))
575
  app_connection = sock.number_to_connection[connectionKey]
576
  if eventType == EVENT_FORWARD_PATH:
577 578
    set_connection(this_ip, app_connection.dest.local_port,
                   app_connection.dest.remote_ip,
579 580
                   app_connection.dest.remote_port,
                   proto_to_string(sock.protocol) + ' ' + receive_body_buffer)
581
  elif eventType == EVENT_BACKWARD_PATH:
582 583
    set_connection(app_connection.dest.remote_ip,
                   app_connection.dest.remote_port,
584 585
                   this_ip, app_connection.dest.local_port,
                   proto_to_string(sock.protocol) + ' ' + receive_body_buffer)
586 587 588 589 590 591 592
  elif eventType == TENTATIVE_THROUGHPUT:
    # There is a throughput number, but we don't know whether the link
    # is saturated or not. If the link is not saturated, then we just
    # need to make sure that emulated bandwidth >= real
    # bandwidth. This means that we output a throughput number only if
    # it is greater than our previous measurements because we don't
    # know that bandwidth has decreased.
593 594
    # If we don't know what the bandwidth is for this host, ignore tentative
    # measurments until we find out (due to an authoritative message)
595 596 597 598 599 600

    # There is no way to know whether the link is saturated or not. If
    # it was saturated and we knew it, then the event type would be
    # AUTHORITATIVE_BANDWIDTH. Therefore, we assume that it is
    # authoritative if it is greater than the previous measurement,
    # and that it is just a throughput number if it is less.
601 602
    if int(receive_body_buffer) > app_connection.last_bandwidth:
      app_connection.last_bandwidth = int(receive_body_buffer)
603 604 605
      set_connection(this_ip, app_connection.dest.local_port,
                     app_connection.dest.remote_ip,
                     app_connection.dest.remote_port,
606 607
                     proto_to_string(sock.protocol)
                     + ' bandwidth=' + receive_body_buffer)
608
    else:
609 610
      sys.stdout.write('Recieve: Ignored TENTATIVE_THROUGHPUT for '
                       + app_connection.dest.remote_ip + ' - '
611 612
                       + receive_body_buffer + ' vs '
                       + str(app_connection.last_bandwidth)
613
                       + '\n')
614
  elif eventType == AUTHORITATIVE_BANDWIDTH and int(receive_body_buffer) > 0:
615
    # We know that the bandwidth has definitely changed. Reset everything.
616
    app_connection.last_bandwidth = int(receive_body_buffer)
617 618 619
    set_connection(this_ip, app_connection.dest.local_port,
                   app_connection.dest.remote_ip,
                   app_connection.dest.remote_port,
620 621
                   proto_to_string(sock.protocol)
                   + ' bandwidth=' + receive_body_buffer)
622
  else:
623
    raise Exception('ERROR: Unknown command type: ' + str(eventType)
624
                    + ', ' + str(receive_body_buffer));
625

626 627 628 629
##########################################################################

def set_connection(source, source_port, dest, dest_port, ending, event_type='MODIFY'):
  set_link(source, dest, 'srcport=' + source_port
630
           + ' dstport=' + dest_port + ' ' + ending, event_type)
631 632 633 634

##########################################################################

def set_link(source, dest, ending, event_type='MODIFY'):
635 636 637 638
  sys.stdout.write('Event: ' + source + ' -> ' + dest + '(' + event_type
                   + '): ' + ending + '\n')
  if is_fake:
    return 0
639
  # Create event system address tuple to identify the notification.
640 641
  # The event is not sent through the scheduler; it is sent as an
  # immediate notification.
642 643 644 645 646
  global evclient
  evtuple = address_tuple()
  evtuple.host  = ADDRESSTUPLE_ALL
  evtuple.site  = ADDRESSTUPLE_ALL
  evtuple.group = ADDRESSTUPLE_ALL
647
  evtuple.eventtype = event_type.upper() #'MODIFY'
648
  evtuple.objtype = 'LINK'
649 650 651 652 653 654 655 656 657 658 659
  evtuple.expt = this_experiment
  evtuple.objname   = emulated_to_interface[source]
  evnotification = evclient.create_notification(evtuple)
  evargs = ['DEST=' + dest, ]
  # Uppercase all arguments.
  for part in ending.split():
    arg, val = part.split('=',1)
    evargs.append(arg.upper() + '=' + val)
    pass
  evargstr = ' '.join(evargs)
  evnotification.setArguments(evargstr)
660 661
#  sys.stdout.write('event: ' + '(' + event_type.upper() + ') '
#                   + evargstr + '\n')
662 663 664
  # Must invert the return value of the notification send function as the
  # original code here returned the exit code of tevc (hence 0 means success).
  return not evclient.notify(evnotification)
665

666 667
##########################################################################

668 669
send_buffer = ''

670
def send_command(conn, key, command_id, command):
671
  global send_buffer
672
  if is_fake:
673 674
    sys.stdout.write('Command: ' + key + ' version(' + str(magent_version)
                     + ') ' + command_to_string[command_id] + '\n')
675
    return
676 677
  output = (save_char(command_id)
            + save_short(len(command))
678 679
            + save_char(magent_version)
            + key
680
            + command)
681
  send_buffer = send_buffer + output
682 683 684 685 686 687 688
  try:
    sent = conn.send(send_buffer, socket.MSG_DONTWAIT)
    send_buffer = send_buffer[sent:]
  except socket.error, inst:
    num = inst[0]
    if num != errno.EWOULDBLOCK:
      raise
689

690 691
##########################################################################

692
def load_int(str):
693 694 695 696 697
  return load_n(str, 4)

def save_int(number):
  return save_n(number, 4)

698 699
##########################################################################

700 701 702 703
def load_short(str):
  return load_n(str, 2)

def save_short(number):
704 705
  return save_n(number, 2)

706 707
##########################################################################

708 709 710 711 712
def load_char(str):
  return load_n(str, 1)

def save_char(number):
  return save_n(number, 1)
713

714 715
##########################################################################

716
def load_n(str, n):
717
  result = 0
718 719
  for i in range(n):
    result = result | ((ord(str[i]) & 0xff) << (8*(n-1-i)))
720 721
  return result

722
def save_n(number, n):
723
  result = ''
724 725
  for i in range(n):
    result = result + chr((number >> ((n-1-i)*8)) & 0xff)
726 727
  return result

728 729
##########################################################################

730 731 732 733 734 735
def ip_to_int(ip):
  ip_list = ip.split('.', 3)
  result = 0
  for ip_byte in ip_list:
    result = result << 8
    result = result | (int(ip_byte, 10) & 0xff)
736 737
  return result

738 739 740 741 742 743
def int_to_ip(num):
  ip_list = ['0', '0', '0', '0']
  for index in range(4):
    ip_list[3-index] = str(num & 0xff)
    num = num >> 8
  return '.'.join(ip_list)
744

745 746
##########################################################################

747
main_loop()