monitor.py 28.1 KB
Newer Older
Mike Hibler's avatar
Mike Hibler committed
1 2
#
# Copyright (c) 2006 University of Utah and the Flux Group.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
# 
# {{{EMULAB-LICENSE
# 
# This file is part of the Emulab network testbed software.
# 
# This file is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
# 
# This file is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
# License for more details.
# 
# You should have received a copy of the GNU Affero General Public License
# along with this file.  If not, see <http://www.gnu.org/licenses/>.
# 
# }}}
Mike Hibler's avatar
Mike Hibler committed
22 23
#

24
# Monitors the application's behaviour and reports it to the network model.
25 26 27 28 29 30

import sys
import os
import time
import socket
import select
31
import re
32
import traceback
33
import errno
34
from optparse import OptionParser
35 36
sys.path.append("/usr/testbed/lib")
from tbevent import EventClient, address_tuple, ADDRESSTUPLE_ALL
37

38 39
# Event type codes read from the first header byte of a message received on
# the stub connection (see receive_parse_body).  These two carry link
# settings that are applied from this_ip towards the remote end (forward) or
# from the remote end towards this_ip (backward).
EVENT_FORWARD_PATH = 0
EVENT_BACKWARD_PATH = 1
40 41 42
# Event type codes for bandwidth reports (see receive_parse_body).  A
# tentative throughput is only applied when it exceeds the connection's
# last recorded bandwidth; an authoritative bandwidth replaces it.
TENTATIVE_THROUGHPUT = 2
AUTHORITATIVE_BANDWIDTH = 3
  
43 44 45 46 47 48 49 50
# Command ids; send_command() writes one of these as the first byte of
# every message it queues for the stub connection.
NEW_CONNECTION_COMMAND = 0
TRAFFIC_MODEL_COMMAND = 1
CONNECTION_MODEL_COMMAND = 2
SENSOR_COMMAND = 3
CONNECT_COMMAND = 4
TRAFFIC_WRITE_COMMAND = 5
DELETE_CONNECTION_COMMAND = 6

51 52 53 54 55 56 57 58
# Human-readable names for the command ids; send_command() uses them when it
# logs a command instead of sending it (fake mode).
command_to_string = {NEW_CONNECTION_COMMAND: 'New Connection',
                     TRAFFIC_MODEL_COMMAND: 'Traffic Model',
                     CONNECTION_MODEL_COMMAND: 'Connection Model',
                     SENSOR_COMMAND: 'Sensor',
                     CONNECT_COMMAND: 'Connect',
                     TRAFFIC_WRITE_COMMAND: 'Write',
                     DELETE_CONNECTION_COMMAND: 'Delete'}

59
NULL_SENSOR = 0
60 61 62 63 64 65
# Sensor ids, sent as the payload of a SENSOR_COMMAND.  Of this group,
# send_connect() only requests MIN_DELAY_SENSOR and MAX_DELAY_SENSOR.
STATE_SENSOR = 1
PACKET_SENSOR = 2
DELAY_SENSOR = 3
MIN_DELAY_SENSOR = 4
MAX_DELAY_SENSOR = 5
THROUGHPUT_SENSOR = 6
66
EWMA_THROUGHPUT_SENSOR = 7
67
LEAST_SQUARES_THROUGHPUT = 8
68 69
TSTHROUGHPUT_SENSOR = 9
AVERAGE_THROUGHPUT_SENSOR = 10
70 71 72 73 74 75 76 77
# Sensor ids for UDP sockets.  Of this group, send_connect() only requests
# UDP_MINDELAY_SENSOR, UDP_MAXDELAY_SENSOR and UDP_AVG_THROUGHPUT_SENSOR.
UDP_STATE_SENSOR = 11
UDP_PACKET_SENSOR = 12
UDP_THROUGHPUT_SENSOR = 13
UDP_MINDELAY_SENSOR = 14
UDP_MAXDELAY_SENSOR = 15
UDP_RTT_SENSOR = 16
UDP_LOSS_SENSOR = 17
UDP_AVG_THROUGHPUT_SENSOR = 18
78 79 80 81 82 83 84 85

# Option ids placed at the start of a CONNECTION_MODEL_COMMAND payload.
# send_connect() currently only sends the two buffer-size options.
CONNECTION_SEND_BUFFER_SIZE = 0
CONNECTION_RECEIVE_BUFFER_SIZE = 1
CONNECTION_MAX_SEGMENT_SIZE = 2
CONNECTION_USE_NAGLES = 3

# Protocol ids stored in Socket.protocol and sent as the payload of
# NEW_CONNECTION_COMMAND.
TCP_CONNECTION = 0
UDP_CONNECTION = 1
86

87 88
# Size of a full connection key: process_event() pads or truncates the socket
# key to KEY_SIZE - 2 characters and callers append a 2-byte connection
# number (save_short) to it.
KEY_SIZE = 32

89 90 91 92 93 94 95
# Client connection to event server.  Make this global since we don't
# want to be continually connecting and disconnecting (and creating/destroying
# the eventsys object).  The object is actually instantiated in read_args()
# (unless running in fake mode), before anything calls set_link().
TBEVENT_SERVER = "event-server.emulab.net"
evclient = None

96 97 98 99
# Lookup tables filled by populate_ip_tables() from the three
# space-separated fields of each mapping-file line: field 0 is the key of
# emulated_to_real and emulated_to_interface, field 1 is the "real" address
# and field 2 the interface (used as the event object name in set_link()).
emulated_to_real = {}
real_to_emulated = {}
emulated_to_interface = {}
# Path of the mapping file; set from the --mapping option in read_args().
ip_mapping_filename = ''
100
initial_filename = ''
101
this_experiment = ''
102 103
pid = ''
eid = ''
104
this_ip = ''
105
stub_ip = ''
106
stub_port = 0
107 108
# Format version of the event lines read from stdin; get_next_packet() only
# accepts version 3.
netmon_output_version = 3
# Version byte written into every outgoing command header and required in
# every incoming message header (see send_command / receive_parse_body).
magent_version = 1
109

110 111
is_fake = False

112
class Dest:
  """Endpoint triple (local port, remote IP, remote port) of one flow.

  The fields are kept as strings: they come straight from the text of the
  monitored events, and Socket.lookup() concatenates them into log lines.
  An empty string means "not known yet".
  """

  def __init__(self, tup=None):
    """Build from a (local_port, remote_ip, remote_port) tuple, or empty."""
    # Identity test: None is a singleton, and this avoids invoking __eq__
    # on whatever sequence was passed in.
    if tup is None:
      self.local_port = ''
      self.remote_ip = ''
      self.remote_port = ''
    else:
      self.local_port = tup[0]
      self.remote_ip = tup[1]
      self.remote_port = tup[2]

  def toTuple(self):
    """Return the triple as a hashable tuple, suitable as a dict key."""
    return (self.local_port, self.remote_ip, self.remote_port)
125 126 127 128 129

class Connection:
  """State kept for one destination of a monitored socket."""

  def __init__(self, new_dest, new_number):
    # Destination (local port / remote ip / remote port) of this connection.
    # The local port is the one actually bound; it is used in events.
    self.dest = new_dest
    # Index of this connection inside its Socket.  Set by
    # start_real_connection() or Socket.lookup().
    self.number = new_number
    # Becomes True in finalize_real_connection() or when the first SendTo
    # for this destination is processed.
    self.is_connected = False
    # Updated when a RemoteIP event is received.
    self.is_valid = True
    # Last bandwidth applied for this connection.  A tentative throughput
    # measurement is only passed on as an event if it is larger than this.
    # Initialized when a RemoteIP or SendTo event is received.
    self.last_bandwidth = 0
    # Timestamp of the previous event used to compute write deltas.
    # Initialized when a Connected/Accepted or SendTo event is received.
    self.prev_time = 0.0
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158

class Socket:
  """One monitored application socket and the connections made through it."""

  def __init__(self):
    # Set when a New event is received.
    self.protocol = TCP_CONNECTION
    # Maps Dest.toTuple() -> connection number, and number -> Connection.
    self.dest_to_number = {}
    self.number_to_connection = {}
    # Number of connections allocated so far (also the next free number).
    # Kept consistent by lookup(), start_real_connection() and
    # finalize_real_connection().
    self.count = 0
    # Set by the SO_RCVBUF and SO_SNDBUF events.
    self.receive_buffer_size = 0
    self.send_buffer_size = 0

  def lookup(self, dest):
    """Return the Connection for dest, creating and registering it if new.

    dest is a Dest; its fields must be strings because they are written to
    the log below.
    """
    sys.stdout.write('Looking up: ' + dest.local_port + ':'
                     + dest.remote_ip + ':' + dest.remote_port + '\n')
    dest_key = dest.toTuple()
    # "in" replaces the deprecated dict.has_key().
    if dest_key in self.dest_to_number:
      return self.number_to_connection[self.dest_to_number[dest_key]]

    sys.stdout.write('Lookup: Failed, creating new connection\n')
    number = self.count
    self.dest_to_number[dest_key] = number
    connection = Connection(dest, number)
    self.number_to_connection[number] = connection
    self.count = number + 1
    return connection
170 171

# Destination IP -> initial bandwidth (int), filled by
# read_initial_conditions() and used to seed Connection.last_bandwidth.
initial_connection_bandwidth = {}
172 173
# Padded socket key (KEY_SIZE - 2 characters) -> Socket; entries are created
# by process_event() when a 'New' event arrives.
socket_map = {}

174 175
##########################################################################

176 177
def main_loop():
  """Run the monitor until stdin reaches end of file.

  Parses the command line, connects to the stub (unless in fake mode) and
  then multiplexes between event lines on stdin and messages from the stub.
  Any exception other than EOFError raised while handling input is printed
  to stderr and the loop continues.
  """
  # Initialize
  read_args()
  conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    if not is_fake:
      #conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
      sys.stdout.write("Connect: stub_ip is " + stub_ip + ":" + str(stub_port)
                       + "\n")
      conn.connect((stub_ip, stub_port))
    poll = select.poll()
    poll.register(sys.stdin, select.POLLIN)
    if not is_fake:
      poll.register(conn, select.POLLIN)

    stdin_fd = sys.stdin.fileno()
    # A closed pipe may be reported as POLLHUP without POLLIN.  Treat it as
    # readable so that readline() returns '' and the EOF path below runs,
    # instead of falling through to the "unknown fd" error forever.
    stdin_ready = select.POLLIN | select.POLLHUP

    done = False
    while not done:
      try:
        for fd, flags in poll.poll():
          if fd == stdin_fd and (flags & stdin_ready) != 0:
            # A line of monitoring data is available (or stdin was closed).
            get_next_packet(conn)
          elif fd == conn.fileno() and (flags & select.POLLIN) != 0:
            # A record for change in link characteristics is available.
            receive_characteristic(conn)
          else:
            raise Exception('ERROR: Unknown fd on select: fd: ' + str(fd)
                            + ' conn-fd: ' + str(conn.fileno()))
      except EOFError:
        sys.stdout.write('Done: Got an EOF on stdin\n')
        done = True
      except Exception:
        # Best effort: report the failure and keep monitoring.
        traceback.print_exc(10, sys.stderr)
        sys.stderr.write('----\n')
  finally:
    conn.close()
213

214 215
##########################################################################

216
def read_args():
  """Parse the command line and initialise the module-level configuration.

  Besides storing the option values in the module globals, this creates the
  event client (unless --fake is given), loads the IP mapping file, resolves
  the stub address from the mapping when --stub-ip is absent, and applies
  the initial conditions file when one is given.

  Exits through OptionParser.error() on missing or malformed options and
  raises Exception when no stub address can be determined.
  """
  global ip_mapping_filename, this_experiment, this_ip, stub_ip, stub_port
  global pid, eid, evclient
  global initial_filename
  global is_fake

  usage = "usage: %prog [options]"
  parser = OptionParser(usage=usage)
  parser.add_option("--mapping", action="store", type="string",
                    dest="ip_mapping_filename", metavar="MAPPING_FILE",
                    help="File mapping IP addresses on Emulab to those on PlanetLab (required)")
  parser.add_option("--experiment", action="store", type="string",
                    dest="this_experiment", metavar="EXPERIMENT_NAME",
                    help="Experiment name of the form pid/eid (required)")
  parser.add_option("--ip", action="store", type="string",
                    dest="this_ip", metavar="X.X.X.X",
                    help="The IP address the monitor will use (required)")
  parser.add_option("--stub-ip", action="store", type="string",
                    dest="stub_ip", metavar="X.X.X.X",
                    help="The IP address of the stub (if not specified, defaults to the one in the ip mapping)")
  parser.add_option("--stub-port", action="store", type="int",
                    dest="stub_port", default=4200,
                    help="The port used to connect to the stub (defaults to 4200)")
  parser.add_option("--initial", action="store", type="string",
                    dest="initial_filename", metavar="INITIAL_CONDITIONS_FILE",
                    help="File giving initial conditions for connections (defaults to no shaping)")
  parser.add_option("--fake", action="store_true", dest="is_fake")

  (options, args) = parser.parse_args()
  ip_mapping_filename = options.ip_mapping_filename
  this_experiment = options.this_experiment
  this_ip = options.this_ip
  stub_ip = options.stub_ip
  stub_port = options.stub_port
  initial_filename = options.initial_filename
  is_fake = options.is_fake

  if is_fake:
    sys.stderr.write('***FAKE***\n')
  else:
    sys.stderr.write('***REAL***\n')

  if len(args) != 0:
    parser.print_help()
    parser.error("Invalid argument(s): " + str(args))
  if ip_mapping_filename is None:
    parser.print_help()
    parser.error("Missing --mapping=MAPPING_FILE option")
  if this_experiment is None:
    parser.print_help()
    parser.error("Missing --experiment=EXPERIMENT_NAME option")
  if this_ip is None:
    parser.print_help()
    parser.error("Missing --ip=X.X.X.X option")

  # Report a malformed experiment name as a usage error rather than letting
  # the tuple unpacking fail with a bare ValueError.
  experiment_parts = this_experiment.split('/')
  if len(experiment_parts) != 2:
    parser.print_help()
    parser.error("Invalid --experiment value (expected pid/eid): "
                 + this_experiment)
  (pid, eid) = experiment_parts

  # XXX: The name of the event server is hardwired here, and due to the
  #      lack of autoconf, isn't even an autoconf var.  Could add a command
  #      line option to set it if needed.
  # XXX: The event client HAS to be created at this point, because
  #      read_initial_conditions() below can fire off set_link().
  #
  if not is_fake:
    evclient = EventClient(server=TBEVENT_SERVER,
                           keyfile="/proj/%s/exp/%s/tbdata/eventkey"
                           % (pid,eid))

  populate_ip_tables()
  if stub_ip is None:
    if this_ip in emulated_to_real:
      sys.stdout.write('Init: stub_ip was None before. '
                       + 'Using ip-mapping to find stub.\n')
      stub_ip = emulated_to_real[this_ip]
    elif is_fake:
      # Placeholder only; nothing connects to the stub in fake mode.
      stub_ip = 'stub_ip'
    else:
      raise Exception('ERROR: No stub_ip was given. '
                      + 'Could not look one up in ip_mapping.')
  if initial_filename is not None:
    read_initial_conditions()
297

298 299
##########################################################################

300 301 302 303 304 305 306 307 308 309 310
def populate_ip_tables():
  """Load the IP mapping file into the three module-level lookup tables.

  Each line is split on single spaces into at most three fields.  Lines
  that do not yield exactly three fields are skipped.  Raises IOError if
  the file named by ip_mapping_filename cannot be opened.
  """
  mapping_file = open(ip_mapping_filename, 'r')
  # try/finally (rather than "with") keeps this valid on old Python 2
  # interpreters while still guaranteeing the handle is closed.
  try:
    for line in mapping_file:
      fields = line.strip().split(' ', 2)
      if len(fields) == 3:
        emulated_to_real[fields[0]] = fields[1]
        real_to_emulated[fields[1]] = fields[0]
        emulated_to_interface[fields[0]] = fields[2]
  finally:
    mapping_file.close()

311 312
##########################################################################

313 314 315
# Format of an initial conditions file is:
#
# List of lines, where each line is of the format:
316
# <source-ip> <dest-ip> <bandwidth> <delay> <loss>
317 318 319 320 321
#
# Where source and dest ip addresses are in x.x.x.x format, and delay and
# bandwidth are integral values in milliseconds and kilobits per second
# respectively.
def read_initial_conditions():
  """Apply the initial link conditions that concern this host.

  Every line of the file named by initial_filename has five space-separated
  fields: source-ip dest-ip bandwidth delay loss.  For lines whose source is
  this_ip, a delay event and a bandwidth event are sent through set_link()
  and the bandwidth is remembered in initial_connection_bandwidth, keyed by
  destination IP.  Other lines are ignored.
  """
  conditions_file = open(initial_filename, 'r')
  try:
    for line in conditions_file:
      # Don't worry about loss for now. Just discard the value.
      fields = line.strip().split(' ', 4)
      if len(fields) == 5 and fields[0] == this_ip:
        # Half of the configured delay is applied; "//" makes the integer
        # division explicit.  NOTE(review): presumably the other half is
        # applied by the peer's monitor -- confirm.
        set_link(fields[0], fields[1],
                 'delay=' + str(int(fields[3]) // 2))
        set_link(fields[0], fields[1], 'bandwidth=' + fields[2])
        initial_connection_bandwidth[fields[1]] = int(fields[2])
  finally:
    conditions_file.close()

334 335
##########################################################################

336
# One line of version 3 input: "Event: key seconds.fraction value\n".
# Compiled once, and written as a raw string so that \d and \. reach the
# regex engine instead of being treated as (invalid) string escapes.
_NETMON_V3_LINE = re.compile(r'^(New|RemoteIP|RemotePort|LocalPort|TCP_NODELAY|TCP_MAXSEG|SO_RCVBUF|SO_SNDBUF|Connected|Accepted|Send|SendTo|Closed): ([^ ]*) (\d+\.\d+) ([^ ]*)\n$')

def get_next_packet(conn):
  """Read one event line from stdin and dispatch it to process_event().

  conn is the stub connection, passed through to process_event().

  Raises EOFError when stdin is exhausted and Exception when
  netmon_output_version is not 3.  Lines that do not match the expected
  format are reported on stderr and skipped.
  """
  line = sys.stdin.readline()
  if line == "":
    raise EOFError
  if netmon_output_version != 3:
    raise Exception("ERROR: Only input version 3 is supported")
  match = _NETMON_V3_LINE.match(line)
  if match:
    event = match.group(1)
    key = match.group(2)
    timestamp = float(match.group(3))
    value = match.group(4)
    process_event(conn, event, key, timestamp, value)
  else:
    sys.stderr.write('ERROR: skipped line in the wrong format: ' + line + '\n')
353 354 355 356 357 358

##########################################################################

# Value of a 'SendTo' event: local-port:remote-ip:remote-port:size.
# Compiled once; raw string so the \d and \. escapes are regex escapes.
_SENDTO_VALUE = re.compile(r'^(\d+):(\d+\.\d+\.\d+\.\d+):(\d+):(\d+)')

def process_event(conn, event, key, timestamp, value):
  """Update socket state for one monitored event and notify the stub.

  conn      -- stub connection, handed to the send_* helpers
  event     -- event name as matched by get_next_packet()
  key       -- socket key; padded or truncated to KEY_SIZE - 2 characters
  timestamp -- event time as a float
  value     -- event-specific payload string

  Raises Exception when the event refers to a socket for which no 'New'
  event has been seen.  Events that touch connection 0 before it exists
  raise KeyError, exactly as before.
  """
  if len(key) > KEY_SIZE - 2:
    sys.stderr.write('ERROR: Event has a key with > KEY_SIZE - 2 characters.'
                     + ' Truncating: ' + key + '\n')
    key = key[:KEY_SIZE - 2]
  key = key.ljust(KEY_SIZE - 2)
  if event == 'New':
    socket_map[key] = Socket()
  if key not in socket_map:
    raise Exception('ERROR: libnetmon event received that does not '
                    + 'correspond to a socket. Key: ' + key)
  sock = socket_map[key]
  if event == 'New':
    if value == 'TCP':
      sock.protocol = TCP_CONNECTION
    elif value == 'UDP':
      sock.protocol = UDP_CONNECTION
      sys.stderr.write('Notify: Set socket to UDP_CONNECTION: ' + key + '\n')
    else:
      sock.protocol = TCP_CONNECTION
      sys.stderr.write('ERROR: Received "New" event with invalid protocol.'
                       'Assuming TCP_CONNECTION. Protocol: ' + value + '\n')
  elif event == 'RemoteIP':
    start_real_connection(sock)
    app_connection = sock.number_to_connection[0]
    if value in emulated_to_real:
      app_connection.is_valid = True
      app_connection.dest.remote_ip = value
    else:
      sys.stderr.write('ERROR: Connection invalid: ' + key + ': ' + value
                       + '\n')
      app_connection.is_valid = False
    finalize_real_connection(conn, key, sock, app_connection)
    if value in initial_connection_bandwidth:
      app_connection.last_bandwidth = initial_connection_bandwidth[value]
    else:
      sys.stderr.write("ERROR: No initial condition for " + value + "\n")
      app_connection.last_bandwidth = 0
  elif event == 'RemotePort':
    start_real_connection(sock)
    app_connection = sock.number_to_connection[0]
    app_connection.dest.remote_port = value
    finalize_real_connection(conn, key, sock, app_connection)
  elif event == 'LocalPort':
    start_real_connection(sock)
    app_connection = sock.number_to_connection[0]
    app_connection.dest.local_port = value
    finalize_real_connection(conn, key, sock, app_connection)
  elif event == 'SO_RCVBUF':
    sock.receive_buffer_size = int(value)
  elif event == 'SO_SNDBUF':
    sock.send_buffer_size = int(value)
  elif event == 'Connected' or event == 'Accepted':
    if sock.protocol != UDP_CONNECTION:
      app_connection = sock.number_to_connection[0]
      finalize_real_connection(conn, key, sock, app_connection)
      app_connection.prev_time = timestamp
  elif event == 'Send':
    app_connection = sock.number_to_connection[0]
    if app_connection.is_valid:
      send_write(conn, key, timestamp, app_connection, value)
      app_connection.prev_time = timestamp
    else:
      sys.stderr.write('Invalid Send\n')
  elif event == 'SendTo':
    # If this is a 'new connection' as well, then do not actually send
    # the sendto command. This is so that there is no '0' delta, and
    # so that trivial (single packet) connections do not cause weird
    # measurements.
    match = _SENDTO_VALUE.match(value)
    if match:
      dest = Dest()
      dest.local_port = match.group(1)
      dest.remote_ip = match.group(2)
      dest.remote_port = match.group(3)
      size = int(match.group(4))
      # Destinations outside the mapping are silently ignored.
      if dest.remote_ip in emulated_to_real:
        app_connection = sock.lookup(dest)
        if app_connection.is_connected == False:
          remote_ip = app_connection.dest.remote_ip
          if remote_ip in initial_connection_bandwidth:
            app_connection.last_bandwidth = initial_connection_bandwidth[remote_ip]
          else:
            app_connection.last_bandwidth = 0
          app_connection.prev_time = timestamp
          app_connection.is_connected = True
          set_connection(this_ip, app_connection.dest.local_port,
                         app_connection.dest.remote_ip,
                         app_connection.dest.remote_port,
                         proto_to_string(sock.protocol), 'CREATE')
          send_connect(conn, key + save_short(app_connection.number),
                       sock, app_connection)
        else:
          send_write(conn, key, timestamp, app_connection, size)
          app_connection.prev_time = timestamp
  elif event == 'Closed':
    # values() replaces the Python-2-only itervalues().
    for pos in sock.number_to_connection.values():
      if pos.is_valid:
        send_command(conn, key + save_short(pos.number),
                     DELETE_CONNECTION_COMMAND, '')
        set_connection(this_ip, pos.dest.local_port,
                       pos.dest.remote_ip, pos.dest.remote_port,
                       proto_to_string(sock.protocol), 'CLEAR')
  else:
    sys.stderr.write('ERROR: skipped line with an invalid event: '
                     + event + '\n')
463

464 465 466 467 468 469 470 471 472
def proto_to_string(proto):
  """Return the 'protocol=...' event argument for a protocol id.

  Unknown ids are reported on stderr and treated as TCP.
  """
  if proto == UDP_CONNECTION:
    return 'protocol=udp'
  if proto != TCP_CONNECTION:
    sys.stderr.write('ERROR: Invalid protocol received in proto_to_string: '
                     + str(proto) + '\n')
  return 'protocol=tcp'
473 474
##########################################################################
  
475 476 477 478 479
def start_real_connection(sock):
  """Make sure sock has its connection number 0, creating it if needed."""
  if sock.count != 0:
    # Connection 0 was already allocated.
    return
  sock.number_to_connection[0] = Connection(Dest(), 0)
  sock.count = 1

480 481
##########################################################################
    
482 483 484 485 486 487 488
def finalize_real_connection(conn, key, sock, app_connection):
  """Mark connection 0 of sock as connected once its endpoint is complete.

  Nothing happens until local port, remote port and remote IP are all
  known, or if the connection is already marked connected.  Otherwise the
  destination is registered under number 0 and, when app_connection is
  valid, a CREATE event and the connect commands are sent.
  """
  primary = sock.number_to_connection[0]
  dest = primary.dest
  endpoint_known = (dest.local_port != ''
                    and dest.remote_port != ''
                    and dest.remote_ip != '')
  if not endpoint_known or primary.is_connected:
    return

  primary.is_connected = True
  sock.dest_to_number[dest.toTuple()] = 0
  if not app_connection.is_valid:
    return
  set_connection(this_ip, app_connection.dest.local_port,
                 app_connection.dest.remote_ip,
                 app_connection.dest.remote_port,
                 proto_to_string(sock.protocol), 'CREATE')
  send_connect(conn, key + save_short(0), sock, app_connection)
496

497 498
##########################################################################
    
499 500 501 502 503 504 505 506 507
def send_connect(conn, key, sock, app_connection):
  """Send the command sequence that sets up one connection on the stub.

  key must already include the 2-byte connection number.  Sockets whose
  protocol is neither TCP nor UDP are ignored.  Raises KeyError if the
  connection's remote IP is not in emulated_to_real.
  """
  # Sensors requested, in order: minimum delay, maximum delay, average
  # throughput.  (NULL_SENSOR and EWMA_THROUGHPUT_SENSOR are not requested.)
  if sock.protocol == TCP_CONNECTION:
    sensors = (MIN_DELAY_SENSOR, MAX_DELAY_SENSOR, AVERAGE_THROUGHPUT_SENSOR)
  elif sock.protocol == UDP_CONNECTION:
    sensors = (UDP_MINDELAY_SENSOR, UDP_MAXDELAY_SENSOR,
               UDP_AVG_THROUGHPUT_SENSOR)
  else:
    return

  send_command(conn, key, NEW_CONNECTION_COMMAND, save_char(sock.protocol))
  send_command(conn, key, TRAFFIC_MODEL_COMMAND, '')
  for sensor in sensors:
    send_command(conn, key, SENSOR_COMMAND, save_int(sensor))

  buffer_options = ((CONNECTION_RECEIVE_BUFFER_SIZE, sock.receive_buffer_size),
                    (CONNECTION_SEND_BUFFER_SIZE, sock.send_buffer_size))
  for option, size in buffer_options:
    send_command(conn, key, CONNECTION_MODEL_COMMAND,
                 save_int(option) + save_int(size))

  remote_real_ip = emulated_to_real[app_connection.dest.remote_ip]
  send_command(conn, key, CONNECT_COMMAND,
               save_int(ip_to_int(remote_real_ip)))

def send_write(conn, key, timestamp, app_connection, size):
  """Send a write command: time since the previous event, then the size.

  key is the bare socket key; the connection number is appended here.
  """
  full_key = key + save_short(app_connection.number)
  # Delta scaled by 1000 and truncated to an integer.
  # NOTE(review): milliseconds if the timestamps are in seconds -- confirm.
  scaled_delta = int((timestamp - app_connection.prev_time)*1000)
  payload = save_int(scaled_delta) + save_int(int(size))
  send_command(conn, full_key, TRAFFIC_WRITE_COMMAND, payload)
534 535 536

##########################################################################

537 538 539 540 541 542 543 544 545
# States of the incremental reader in receive_characteristic().
RECEIVE_HEAD = 0
RECEIVE_BODY = 1

# Bytes collected so far for the message currently being received.
receive_head_buffer = ''
receive_body_buffer = ''
receive_state = RECEIVE_HEAD
# Fixed header length.  As read by receive_parse_body(): byte 0 is the event
# type, bytes 1-2 the body size, byte 3 the version, bytes 4-33 the socket
# key and bytes 34-35 the connection number.
receive_head_size = 36
# Body length of the current message, taken from its header.
receive_body_size = 0

546
def receive_characteristic(conn):
  """Read whatever part of the next stub message is available on conn.

  The message is assembled across calls: first the fixed-size header, then
  a body whose length comes from header bytes 1-2.  Once the body is
  complete it is handed to receive_parse_body() and the reader is reset
  for the next header, even if parsing raises.
  """
  # NOTE(review): an empty string from recv() is not treated specially
  # here; confirm how a stub disconnect is meant to be handled.
  global receive_head_buffer, receive_body_buffer, receive_state
  global receive_body_size

  if receive_state == RECEIVE_HEAD:
    missing = receive_head_size - len(receive_head_buffer)
    receive_head_buffer += conn.recv(missing, socket.MSG_DONTWAIT)
    if len(receive_head_buffer) == receive_head_size:
      receive_state = RECEIVE_BODY
      receive_body_buffer = ''
      receive_body_size = load_short(receive_head_buffer[1:3])

  # Not "elif": a header completed just above is followed by a body read
  # in the same call.
  if receive_state != RECEIVE_BODY:
    return
  missing = receive_body_size - len(receive_body_buffer)
  receive_body_buffer += conn.recv(missing, socket.MSG_DONTWAIT)
  if len(receive_body_buffer) != receive_body_size:
    return
  try:
    receive_parse_body()
  finally:
    receive_state = RECEIVE_HEAD
    receive_head_buffer = ''
570 571

def receive_parse_body():
  """Act on the complete message held in the receive buffers.

  Header layout: byte 0 event type, bytes 1-2 body size, byte 3 version,
  bytes 4-33 socket key, bytes 34-35 connection number.  The body is passed
  on as event arguments (path events) or parsed as an integer bandwidth.

  Raises Exception for a wrong version, an unknown socket or connection,
  and for any event that matches no branch below (including an
  authoritative bandwidth that is not positive).
  """
  eventType = load_char(receive_head_buffer[0:1])
  version = load_char(receive_head_buffer[3:4])
  if version != magent_version:
    raise Exception('ERROR: Wrong version from magent: version('
                    + str(version) + ')')
  socketKey = receive_head_buffer[4:34]
  connectionKey = load_short(receive_head_buffer[34:36])

  if socketKey not in socket_map:
    raise Exception('ERROR: magent event received that does not '
                    + 'correspond to a socket. Key: ' + socketKey)

  sock = socket_map[socketKey]

  if connectionKey not in sock.number_to_connection:
    raise Exception('ERROR: magent event received that does not '
                    + 'correspond to a connection. socketKey: ' + socketKey
                    + ' connectionKey: ' + str(connectionKey))
  app_connection = sock.number_to_connection[connectionKey]
  if eventType == EVENT_FORWARD_PATH:
    set_connection(this_ip, app_connection.dest.local_port,
                   app_connection.dest.remote_ip,
                   app_connection.dest.remote_port,
                   proto_to_string(sock.protocol) + ' ' + receive_body_buffer)
  elif eventType == EVENT_BACKWARD_PATH:
    set_connection(app_connection.dest.remote_ip,
                   app_connection.dest.remote_port,
                   this_ip, app_connection.dest.local_port,
                   proto_to_string(sock.protocol) + ' ' + receive_body_buffer)
  elif eventType == TENTATIVE_THROUGHPUT:
    # There is a throughput number, but we don't know whether the link
    # is saturated or not. If the link is not saturated, then we just
    # need to make sure that emulated bandwidth >= real
    # bandwidth. This means that we output a throughput number only if
    # it is greater than our previous measurements because we don't
    # know that bandwidth has decreased.

    # There is no way to know whether the link is saturated or not. If
    # it was saturated and we knew it, then the event type would be
    # AUTHORITATIVE_BANDWIDTH. Therefore, we assume that it is
    # authoritative if it is greater than the previous measurement,
    # and that it is just a throughput number if it is less.
    bandwidth = int(receive_body_buffer)
    if bandwidth > app_connection.last_bandwidth:
      app_connection.last_bandwidth = bandwidth
      set_connection(this_ip, app_connection.dest.local_port,
                     app_connection.dest.remote_ip,
                     app_connection.dest.remote_port,
                     proto_to_string(sock.protocol)
                     + ' bandwidth=' + receive_body_buffer)
    else:
      sys.stdout.write('Recieve: Ignored TENTATIVE_THROUGHPUT for '
                       + app_connection.dest.remote_ip + ' - '
                       + receive_body_buffer + ' vs '
                       + str(app_connection.last_bandwidth)
                       + '\n')
  elif eventType == AUTHORITATIVE_BANDWIDTH and int(receive_body_buffer) > 0:
    # We know that the bandwidth has definitely changed. Reset everything.
    app_connection.last_bandwidth = int(receive_body_buffer)
    set_connection(this_ip, app_connection.dest.local_port,
                   app_connection.dest.remote_ip,
                   app_connection.dest.remote_port,
                   proto_to_string(sock.protocol)
                   + ' bandwidth=' + receive_body_buffer)
  else:
    raise Exception('ERROR: Unknown command type: ' + str(eventType)
                    + ', ' + str(receive_body_buffer))
642

643 644 645 646
##########################################################################

def set_connection(source, source_port, dest, dest_port, ending, event_type='MODIFY'):
  """Send a link event for one connection, identified by its two ports.

  The ports are prepended to ending as srcport=/dstport= arguments and the
  result is handed to set_link().  Both ports must be strings.
  """
  port_args = 'srcport=' + source_port + ' dstport=' + dest_port
  set_link(source, dest, port_args + ' ' + ending, event_type)
648 649 650 651

##########################################################################

def set_link(source, dest, ending, event_type='MODIFY'):
  """Log a link event and, unless in fake mode, send it immediately.

  source     -- address whose interface (from emulated_to_interface) names
                the LINK object the event is addressed to
  dest       -- passed along as the DEST= argument
  ending     -- whitespace-separated name=value arguments; names are
                uppercased, values are left alone
  event_type -- event type; uppercased before sending

  Returns 0 in fake mode, otherwise the negation of what
  evclient.notify() returns.  Raises ValueError if a part of ending has no
  '=' and KeyError if source has no known interface.
  """
  sys.stdout.write('Event: ' + source + ' -> ' + dest + '(' + event_type
                   + '): ' + ending + '\n')
  if is_fake:
    return 0

  # Create event system address tuple to identify the notification.
  # The event is not sent through the scheduler; it is sent as an
  # immediate notification.
  evtuple = address_tuple()
  evtuple.host = ADDRESSTUPLE_ALL
  evtuple.site = ADDRESSTUPLE_ALL
  evtuple.group = ADDRESSTUPLE_ALL
  evtuple.eventtype = event_type.upper()
  evtuple.objtype = 'LINK'
  evtuple.expt = this_experiment
  evtuple.objname = emulated_to_interface[source]
  evnotification = evclient.create_notification(evtuple)

  # Uppercase the name of every argument.
  evargs = ['DEST=' + dest]
  for part in ending.split():
    name, val = part.split('=', 1)
    evargs.append(name.upper() + '=' + val)
  evnotification.setArguments(' '.join(evargs))

  # Must invert the return value of the notification send function as the
  # original code here returned the exit code of tevc (hence 0 means success).
  return not evclient.notify(evnotification)
682

683 684
##########################################################################

685 686
# Bytes queued for the stub that a non-blocking send has not accepted yet;
# send_command() appends to it and retries the whole buffer on each call.
send_buffer = ''

687
def send_command(conn, key, command_id, command):
  """Queue one command for the stub and try to flush the queue.

  The message is: command id (1 byte), payload length (2 bytes), protocol
  version (1 byte), key, payload.  In fake mode the command is only logged.

  The send is non-blocking.  Whatever the socket does not accept stays in
  send_buffer and is retried on the next call.  Socket errors other than
  "would block" are re-raised.
  """
  global send_buffer
  if is_fake:
    sys.stdout.write('Command: ' + key + ' version(' + str(magent_version)
                     + ') ' + command_to_string[command_id] + '\n')
    return
  send_buffer += (save_char(command_id)
                  + save_short(len(command))
                  + save_char(magent_version)
                  + key
                  + command)
  try:
    sent = conn.send(send_buffer, socket.MSG_DONTWAIT)
    send_buffer = send_buffer[sent:]
  except socket.error:
    # sys.exc_info() and .args work on every Python version, unlike the
    # "except X, e" syntax and indexing the exception object directly.
    err = sys.exc_info()[1]
    if err.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
      raise
706

707 708
##########################################################################

709
def load_int(str):
  """Decode the first 4 characters of str as a big-endian unsigned number."""
  width = 4
  return load_n(str, width)

def save_int(number):
  """Encode the low 32 bits of number as 4 characters, big-endian."""
  width = 4
  return save_n(number, width)

715 716
##########################################################################

717 718 719 720
def load_short(str):
  """Decode the first 2 characters of str as a big-endian unsigned number."""
  width = 2
  return load_n(str, width)

def save_short(number):
  """Encode the low 16 bits of number as 2 characters, big-endian."""
  width = 2
  return save_n(number, width)

723 724
##########################################################################

725 726 727 728 729
def load_char(str):
  """Decode the first character of str as an unsigned number."""
  width = 1
  return load_n(str, width)

def save_char(number):
  """Encode the low 8 bits of number as a single character."""
  width = 1
  return save_n(number, width)
730

731 732
##########################################################################

733
def load_n(str, n):
  """Decode the first n characters of str as a big-endian unsigned number.

  Raises IndexError if str holds fewer than n characters.
  """
  value = 0
  for index in range(n):
    # Shift in one byte at a time, most significant first.
    value = (value << 8) | (ord(str[index]) & 0xff)
  return value

739
def save_n(number, n):
  """Encode the low 8*n bits of number as n characters, big-endian."""
  # Bit offsets of the bytes, most significant first: 8*(n-1), ..., 8, 0.
  shifts = range((n - 1) * 8, -1, -8)
  return ''.join([chr((number >> shift) & 0xff) for shift in shifts])

745 746
##########################################################################

747 748 749 750 751 752
def ip_to_int(ip):
  """Pack a dotted-decimal address string into a single integer.

  The string is split on at most three dots.  Each piece is parsed as a
  decimal number and contributes its low 8 bits, first piece most
  significant.  Raises ValueError if a piece is not a decimal integer.
  """
  packed = 0
  for octet in ip.split('.', 3):
    packed = (packed << 8) | (int(octet, 10) & 0xff)
  return packed

755 756 757 758 759 760
def int_to_ip(num):
  """Format the low 32 bits of num as a dotted-decimal address string."""
  octets = [str((num >> shift) & 0xff) for shift in (24, 16, 8, 0)]
  return '.'.join(octets)
761

762 763
##########################################################################

764
# Script entry point: runs as soon as the file is executed.  main_loop()
# leaves its loop once stdin reports end of file.
main_loop()