monitor.py 16.3 KB
Newer Older
1
2
3
4
5
6
7
8
# Usage: tcpdump | python monitor.py --mapping=MAPPING_FILE
#            --experiment=EXPERIMENT_NAME --ip=X.X.X.X
#            [--stub-ip=X.X.X.X] [--stub-port=PORT]
#            [--initial=INITIAL_CONDITIONS_FILE]
# MAPPING_FILE maps emulated addresses to real addresses.
# EXPERIMENT_NAME is the project/experiment name on Emulab, for instance
#   'tbres/pelab'.
# --ip gives the IP address of the current node.
# --stub-ip gives the IP address of the corresponding PlanetLab node. If it is
#   left out, the mapping file is used to determine it.
# --stub-port gives the port the stub listens on (default 4200).
# --initial names a file of initial link conditions (default: no shaping).
9
10
11
12
13
14

import sys
import os
import time
import socket
import select
15
import re
16
from optparse import OptionParser
17

# Event types found in byte 0 of each record the stub sends back
# (compared against in receive_characteristic). Values are part of the wire
# protocol: they count up from 0, so only ever append new names.
(EVENT_FORWARD_PATH,
 EVENT_BACKWARD_PATH,
 TENTATIVE_THROUGHPUT,
 AUTHORITATIVE_BANDWIDTH) = range(4)

# Command identifiers written as byte 0 of every record send_command emits.
(NEW_CONNECTION_COMMAND,
 TRAFFIC_MODEL_COMMAND,
 CONNECTION_MODEL_COMMAND,
 SENSOR_COMMAND,
 CONNECT_COMMAND,
 TRAFFIC_WRITE_COMMAND,
 DELETE_CONNECTION_COMMAND) = range(7)

# Sensor identifiers sent as the payload of a SENSOR_COMMAND.
(NULL_SENSOR,
 STATE_SENSOR,
 PACKET_SENSOR,
 DELAY_SENSOR,
 MIN_DELAY_SENSOR,
 MAX_DELAY_SENSOR,
 THROUGHPUT_SENSOR,
 EWMA_THROUGHPUT_SENSOR) = range(8)

# Parameter selectors that lead the payload of a CONNECTION_MODEL_COMMAND.
(CONNECTION_SEND_BUFFER_SIZE,
 CONNECTION_RECEIVE_BUFFER_SIZE,
 CONNECTION_MAX_SEGMENT_SIZE,
 CONNECTION_USE_NAGLES) = range(4)

# Transport protocol byte of a command header.
(TCP_CONNECTION,
 UDP_CONNECTION) = range(2)

# Address tables, filled from the mapping file by populate_ip_tables().
emulated_to_real = {}
real_to_emulated = {}
emulated_to_interface = {}

# Settings, filled from the command line by read_args().
ip_mapping_filename = ''
initial_filename = ''
this_experiment = ''
this_ip = ''
stub_ip = ''
stub_port = 0

# Which netmon line format get_next_packet() parses (1 or 2).
netmon_output_version = 2

# A throughput measurement is only passed on as an event if it is
# larger than the last bandwidth from that connection (keyed by the
# emulated destination address).
connection_bandwidth = {}

# Running byte count of forwarded packets and the value last seen by
# main_loop().
total_size = 0
last_total = -1
# Timestamp of the previous forwarded packet (or 'Connected' event).
prev_time = 0.0

def main_loop():
  """Run the monitor: parse options, connect to the stub, and relay events.

  Lines arriving on stdin are handed to get_next_packet(), which forwards
  them to the stub over the TCP connection; records arriving from the stub
  are handed to receive_characteristic(), which applies them to the
  emulated links. The loop ends when stdin reaches EOF or
  receive_characteristic() returns False. The stub connection is closed on
  the way out, including when an exception escapes.
  """
  global total_size, last_total
  # Initialize
  read_args()
  conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sys.stdout.write("stub_ip is " + stub_ip + ":" + str(stub_port) + "\n")
    sys.stdout.flush()
    conn.connect((stub_ip, stub_port))

    stdin_fd = sys.stdin.fileno()
    conn_fd = conn.fileno()
    # poll() reports a closed pipe/socket as POLLHUP (or POLLERR), possibly
    # without POLLIN. Treat those as readable so the next read sees EOF or
    # the error and ends the loop; testing POLLIN alone spins forever once
    # tcpdump exits.
    readable = select.POLLIN | select.POLLHUP | select.POLLERR
    poll = select.poll()
    poll.register(stdin_fd, select.POLLIN)
    poll.register(conn_fd, select.POLLIN)
    # NOTE(review): sys.stdin is buffered, so one readline() may pull several
    # lines into Python's buffer while poll() only watches the kernel pipe;
    # buffered lines then wait for the next arrival. Confirm whether input
    # is bursty enough for this to matter.
    done = False

    while not done:
      for fd, events in poll.poll():
        if done:
          break
        if fd == stdin_fd and (events & readable) != 0:
          # A line of data from tcpdump is available.
          try:
            get_next_packet(conn)
          except EOFError:
            sys.stdout.write('Done: Got EOF on stdin\n')
            done = True
        elif fd == conn_fd and (events & readable) != 0:
          sys.stdout.write('receive_characteristic()\n')
          # A record for change in link characteristics is available.
          done = not receive_characteristic(conn)
        else:
          sys.stdout.write('fd: ' + str(fd) + ' conn-fd: '
                           + str(conn_fd) + '\n')
      # Update the stub
      if total_size != last_total:
        last_total = total_size
  finally:
    conn.close()

def read_args():
  """Parse the command line into the module-level settings.

  Sets ip_mapping_filename, this_experiment, this_ip, stub_ip, stub_port and
  initial_filename, then loads the address tables with populate_ip_tables().
  When --stub-ip is omitted, the stub address is the real address that the
  mapping file gives for --ip. When --initial is given,
  read_initial_conditions() applies that file.

  Positional arguments, a missing required option, or (without --stub-ip)
  an --ip that has no mapping entry all exit through parser.error().
  """
  global ip_mapping_filename, this_experiment, this_ip, stub_ip, stub_port
  global initial_filename

  usage = "usage: %prog [options]"
  parser = OptionParser(usage=usage)
  parser.add_option("--mapping", action="store", type="string",
                    dest="ip_mapping_filename", metavar="MAPPING_FILE",
                    help="File mapping IP addresses on Emulab to those on PlanetLab (required)")
  parser.add_option("--experiment", action="store", type="string",
                    dest="this_experiment", metavar="EXPERIMENT_NAME",
                    help="Experiment name of the form pid/eid (required)")
  parser.add_option("--ip", action="store", type="string",
                    dest="this_ip", metavar="X.X.X.X",
                    help="The IP address the monitor will use (required)")
  parser.add_option("--stub-ip", action="store", type="string",
                    dest="stub_ip", metavar="X.X.X.X",
                    help="The IP address of the stub (if not specified, defaults to the one in the ip mapping)")
  parser.add_option("--stub-port", action="store", type="int",
                    dest="stub_port", default=4200,
                    help="The port used to connect to the stub (defaults to 4200)")
  parser.add_option("--initial", action="store", type="string",
                    dest="initial_filename", metavar="INITIAL_CONDITIONS_FILE",
                    help="File giving initial conditions for connections (defaults to no shaping)")

  (options, args) = parser.parse_args()
  ip_mapping_filename = options.ip_mapping_filename
  this_experiment = options.this_experiment
  this_ip = options.this_ip
  stub_ip = options.stub_ip
  stub_port = options.stub_port
  initial_filename = options.initial_filename

  if len(args) != 0:
    parser.print_help()
    parser.error("Invalid argument(s): " + str(args))
  if ip_mapping_filename is None:
    parser.print_help()
    parser.error("Missing --mapping=MAPPING_FILE option")
  if this_experiment is None:
    parser.print_help()
    parser.error("Missing --experiment=EXPERIMENT_NAME option")
  if this_ip is None:
    parser.print_help()
    parser.error("Missing --ip=X.X.X.X option")

  populate_ip_tables()
  if stub_ip is None:
    sys.stdout.write('stub_ip was None before\n')
    # Report a usable message instead of dying with a bare KeyError.
    if this_ip not in emulated_to_real:
      parser.error("No entry for --ip=" + this_ip + " in mapping file "
                   + ip_mapping_filename + "; pass --stub-ip=X.X.X.X")
    stub_ip = emulated_to_real[this_ip]
  sys.stdout.write('stub_ip: ' + stub_ip + '\n')
  sys.stdout.write('this_ip: ' + this_ip + '\n')
  sys.stdout.write('emulated_to_real: ' + str(emulated_to_real) + '\n')
  if initial_filename is not None:
    read_initial_conditions()


def populate_ip_tables():
  """Load the emulated <-> real address mapping from ip_mapping_filename.

  Each line is split on single spaces into at most three fields:
  ``<emulated-ip> <real-ip> <interface>`` (everything after the second
  space is kept as the interface). Lines that yield fewer than three fields
  are ignored. Entries are added to emulated_to_real, real_to_emulated and
  emulated_to_interface in place.
  """
  mapping_file = open(ip_mapping_filename, 'r')
  try:
    for line in mapping_file:
      fields = line.strip().split(' ', 2)
      if len(fields) == 3:
        emulated_to_real[fields[0]] = fields[1]
        real_to_emulated[fields[1]] = fields[0]
        emulated_to_interface[fields[0]] = fields[2]
  finally:
    # The original never closed the file.
    mapping_file.close()

# Format of an initial conditions file is:
#
# List of lines, where each line is of the format:
# <source-ip> <dest-ip> <delay> <bandwidth> <loss>
#
# Where source and dest ip addresses are in x.x.x.x format, and delay and
# bandwidth are integral values in milliseconds and kilobits per second
# respectively. The loss field is currently ignored.
def read_initial_conditions():
  """Apply the initial link conditions listed in initial_filename.

  Only lines whose source address equals this_ip are used. For each such
  line, the link this_ip -> dest gets half the listed delay and the listed
  bandwidth, and the bandwidth is recorded in connection_bandwidth[dest].
  NOTE(review): halving suggests the listed delay is a round-trip time
  (as set_delay assumes) -- confirm.
  """
  conditions_file = open(initial_filename, 'r')
  try:
    for line in conditions_file:
      # Don't worry about loss for now. Just discard the value.
      fields = line.strip().split(' ', 4)
      if len(fields) == 5 and fields[0] == this_ip:
        # Parse both numbers first so a malformed line fails before any
        # tevc command for it has been issued.
        delay = int(fields[2])
        bandwidth = int(fields[3])
        # '//' keeps the delay integral on Python 3 too (same as Python 2 '/').
        set_link(fields[0], fields[1], 'delay=' + str(delay // 2))
        set_link(fields[0], fields[1], 'bandwidth=' + fields[3])
        connection_bandwidth[fields[1]] = bandwidth
  finally:
    # The original never closed the file.
    conditions_file.close()

def get_next_packet(conn):
  """Read one line of monitor input from stdin and forward it to the stub.

  Two kinds of line are recognised:

  * packet lines -- version 1: ``<time> > <ip>.<remoteport> [(<size>)]``,
    version 2: ``<time> > <localport>:<ip>:<remoteport> [(<size>)]`` --
    become a TRAFFIC_WRITE_COMMAND carrying the milliseconds since
    prev_time and the size (0 when absent);
  * (version 2 only) connection lines,
    ``<Event>: <localport>:<ip>:<remoteport>[ <value>]``, become
    NEW_CONNECTION (plus traffic-model and sensor setup), CONNECT,
    DELETE_CONNECTION or CONNECTION_MODEL commands.

  The format is selected by netmon_output_version. Packet lines for
  addresses missing from emulated_to_real are dropped silently; other
  unusable lines are logged to stdout and skipped.

  Raises:
    EOFError: stdin has been exhausted.
    ValueError: netmon_output_version is neither 1 nor 2.
  """
  global total_size, prev_time

  line = sys.stdin.readline()
  if line == "":
    raise EOFError

  #
  # Check for a packet from netmond
  #
  # The patterns stay next to the code that reads their groups for
  # readability; the re module caches compiled patterns between calls.
  if netmon_output_version == 1:
    linexp = re.compile(r'^(\d+\.\d+) > (\d+\.\d+\.\d+\.\d+)\.(\d+) (\((\d+)\))?')
  elif netmon_output_version == 2:
    linexp = re.compile(r'^(\d+\.\d+) > (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) (\((\d+)\))?')
  else:
    raise ValueError('unsupported netmon_output_version: '
                     + str(netmon_output_version))
  match = linexp.match(line)
  conexp = re.compile(r'^(New|Connected|Closed|SO_RCVBUF|SO_SNDBUF): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+)( ((?:\d+)(?:\.(?:\d+))?))?')
  cmatch = conexp.match(line)

  if match:
    localport = 0 # We may not get this one
    timestamp = float(match.group(1))
    if netmon_output_version == 1:
      ipaddr = match.group(2)
      remoteport = int(match.group(3))
      size_text = match.group(5)
    else:
      localport = int(match.group(2))
      ipaddr = match.group(3)
      remoteport = int(match.group(4))
      size_text = match.group(6)
    # An optional group that did not take part in the match is None, not
    # '': the old "!= ''" test never fired and int(None) raised TypeError.
    if size_text is None:
      size = 0
    else:
      size = int(size_text)
    # Forwarding applies to both formats; it used to be nested under the
    # version 2 branch, so version 1 packets were parsed and then dropped.
    if ipaddr in emulated_to_real:
      total_size = total_size + size
      send_command(conn, TRAFFIC_WRITE_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport,
                   save_int(int((timestamp - prev_time)*1000)) + save_int(size))
      prev_time = timestamp
  elif netmon_output_version == 2 and cmatch:
    #
    # Watch for new or closed connections
    #
    event = cmatch.group(1)
    localport = int(cmatch.group(2))
    ipaddr = cmatch.group(3)
    remoteport = int(cmatch.group(4))
    # Same None-vs-'' issue as above: a missing value used to stay None and
    # crash float()/int() below.
    value = cmatch.group(6)
    if value is None:
      value = '0'

    if ipaddr not in emulated_to_real:
      sys.stdout.write('skipped line with an invalid destination: '
                       + ipaddr + '\n')
      return

    sys.stdout.write("Got a connection event: " + event + "\n")
    if event == 'New':
      send_command(conn, NEW_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport, '')
      send_command(conn, TRAFFIC_MODEL_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport, '')
      for sensor in (MIN_DELAY_SENSOR, MAX_DELAY_SENSOR,
                     EWMA_THROUGHPUT_SENSOR):
        send_command(conn, SENSOR_COMMAND, TCP_CONNECTION, ipaddr,
                     localport, remoteport, save_int(sensor))
    elif event == 'Closed':
      send_command(conn, DELETE_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport, '')
    elif event == 'Connected':
      send_command(conn, CONNECT_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport, '')
      # Packet deltas for this connection are measured from connect time.
      prev_time = float(value)
    elif event == 'SO_RCVBUF':
      send_command(conn, CONNECTION_MODEL_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport,
                   save_int(CONNECTION_RECEIVE_BUFFER_SIZE)
                   + save_int(int(value)))
    elif event == 'SO_SNDBUF':
      send_command(conn, CONNECTION_MODEL_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport,
                   save_int(CONNECTION_SEND_BUFFER_SIZE)
                   + save_int(int(value)))
    else:
      sys.stdout.write('skipped line with an invalid event: '
                       + event + '\n')
  else:
    sys.stdout.write('skipped line in the wrong format: ' + line)

def _recv_exactly(conn, count):
  """Read *count* bytes from *conn*, looping over short reads.

  TCP may deliver one record in several pieces, so a single recv() can
  return less than requested. Fewer than *count* bytes are returned only if
  the peer closes the connection first.
  """
  data = conn.recv(count)
  while data and len(data) < count:
    more = conn.recv(count - len(data))
    if not more:
      break
    data = data + more
  return data


def receive_characteristic(conn):
  """Read one event record from the stub and apply it to the emulated links.

  A record is a 12-byte header followed by a body of ``size`` bytes.
  Header layout (multi-byte fields decoded big-endian by load_n): event
  type (byte 0), body size (bytes 1-2), transport (byte 3, unused here),
  real IP address of the remote end (bytes 4-7), source and destination
  ports (bytes 8-11, unused here). For path events the body is passed to
  set_link() as tevc settings. For throughput/bandwidth events it is parsed
  as an integer bandwidth.

  Returns:
    False if the header or body could not be read in full (for example
    because the stub closed the connection); True otherwise.
  """
  header = _recv_exactly(conn, 12)
  if len(header) != 12:
    sys.stdout.write('Event header is the wrong size. Length: '
                     + str(len(header)) + '\n')
    return False
  eventType = load_char(header[0:1])
  size = load_short(header[1:3])
  real_ip = int_to_ip(load_int(header[4:8]))
  buf = _recv_exactly(conn, size)
  if len(buf) != size:
    sys.stdout.write('Event body is the wrong size.\n')
    return False
  # The body has been consumed, so skipping an unknown address keeps the
  # stream in sync (this used to raise KeyError and kill the monitor).
  if real_ip not in real_to_emulated:
    sys.stdout.write('Event for unknown address: ' + real_ip + '\n')
    return True
  dest = real_to_emulated[real_ip]

  if eventType == EVENT_FORWARD_PATH:
    set_link(this_ip, dest, buf)
  elif eventType == EVENT_BACKWARD_PATH:
    set_link(dest, this_ip, buf)
  elif eventType == TENTATIVE_THROUGHPUT:
    # There is a throughput number, but we don't know whether the link
    # is saturated or not. If the link is not saturated, then we just
    # need to make sure that emulated bandwidth >= real
    # bandwidth. This means that we output a throughput number only if
    # it is greater than our previous measurements because we don't
    # know that bandwidth has decreased.
    throughput = int(buf)
    # With no earlier measurement for dest, any throughput is an increase.
    if throughput > connection_bandwidth.get(dest, 0):
      connection_bandwidth[dest] = throughput
      set_link(dest, this_ip, 'bandwidth=' + buf)
  elif eventType == AUTHORITATIVE_BANDWIDTH:
    # We know that the bandwidth has definitely changed. Reset everything.
    connection_bandwidth[dest] = int(buf)
    set_link(dest, this_ip, 'bandwidth=' + buf)
  else:
    # Previously referenced an undefined name 'value' (NameError).
    sys.stdout.write('Other: ' + str(eventType) + ', ' + str(buf) + '\n')
  return True

def set_bandwidth(kbps, dest):
  """Set the bandwidth of the link from this node to *dest* to *kbps*.

  Before doing so, writes a 'BANDWIDTH!purple' line and a
  'BANDWIDTH!line <now> 0 <now> <kbps*1000/8>' line to stderr
  (presumably input for a plotting tool -- confirm).

  Returns set_link()'s result.
  """
  stamp = '%0.6f' % time.time()
  sys.stderr.write('BANDWIDTH!purple\n')
  rate = kbps*1000/8
  sys.stderr.write(' '.join(['BANDWIDTH!line', stamp, '0', stamp, str(rate)])
                   + '\n')
  return set_link(this_ip, dest, 'bandwidth=' + str(kbps))

def set_delay(milliseconds, dest):
  """Set the link delay to *dest* from a round-trip time in *milliseconds*.

  Each direction gets half the round-trip time. The reverse direction
  (dest -> this node) is only set when the forward one returned 0. First
  writes an 'RTT!orange' line and an 'RTT!line <now> 0 <now> <rtt>' line to
  stderr (presumably input for a plotting tool -- confirm).

  Returns the forward set_link() result if it was nonzero, otherwise the
  reverse one.
  """
  stamp = '%0.6f' % time.time()
  sys.stderr.write('RTT!orange\n')
  sys.stderr.write(' '.join(['RTT!line', stamp, '0', stamp, str(milliseconds)])
                   + '\n')
  # Set the delay from here to there to 1/2 rtt.
  one_way = 'delay=' + str(milliseconds/2)
  error = set_link(this_ip, dest, one_way)
  if error != 0:
    return error
  # That succeeded, so set the delay from there to here.
  return set_link(dest, this_ip, one_way)

def set_loss(probability, dest):
  """Set the 'plr' (loss) setting of the link from this node to *dest*.

  Returns set_link()'s result.
  """
  setting = 'plr=' + str(probability)
  return set_link(this_ip, dest, setting)

def set_max_delay(milliseconds, dest):
  """Set the 'MAXINQ' setting of the link from this node to *dest*.

  Returns set_link()'s result.
  """
  setting = 'MAXINQ=' + str(milliseconds)
  return set_link(this_ip, dest, setting)

def set_link(source, dest, ending):
  """Modify the emulated link from *source* to *dest* with tevc.

  *ending* holds one or more space-separated settings such as
  ``'delay=10'`` or ``'bandwidth=1000'``. It can arrive verbatim from the
  stub over the network (receive_characteristic passes the record body
  through), so tevc is run WITHOUT a shell. The settings are split on
  whitespace into separate arguments and shell metacharacters are passed
  literally instead of being executed.

  Returns 0 on success. Any failure gives a nonzero value: tevc's exit
  code, or 127 if tevc could not be started. Code in this module only
  compares the result with 0.
  """
  import subprocess
  interface = emulated_to_interface[source]
  args = (['/usr/testbed/bin/tevc', '-e', this_experiment, 'now',
           interface, 'modify', 'dest=' + dest]
          + ending.split())
  command = ('/usr/testbed/bin/tevc -e ' + this_experiment + ' now '
             + interface + ' modify dest=' + dest + ' '
             + ending)
  sys.stdout.write('event: ' + command + '\n')
  # Flush so the log line precedes anything tevc itself prints.
  sys.stdout.flush()
  try:
    return subprocess.call(args)
  except OSError:
    sys.stdout.write('event failed: could not run tevc: '
                     + str(sys.exc_info()[1]) + '\n')
    return 127

def send_command(conn, command_id, protocol, ipaddr, localport, remoteport,
                 command):
  """Send one command record to the stub over *conn*.

  Record layout: command id (1 byte), length of *command* (2), protocol
  (1), the real address mapped from emulated *ipaddr* (4), local port (2),
  remote port (2), then the *command* payload itself. Multi-byte fields are
  encoded by save_n (most significant byte first).
  """
  header = save_char(command_id) + save_short(len(command)) + save_char(protocol)
  header = header + save_int(ip_to_int(emulated_to_real[ipaddr]))
  header = header + save_short(localport) + save_short(remoteport)
  conn.sendall(header + command)

def send_destinations(conn, packet_list):
  """Send a batch of packet records to the stub over *conn*.

  Message layout: a 4-byte zero, a 4-byte record count, then for each
  entry of *packet_list*: the real address of entry[0] looked up through
  emulated_to_real (4 bytes), entry[1] (2), entry[2] (2), a time delta (4),
  entry[4] (4), entry[5] (2).
  NOTE(review): entry[1]/entry[2] look like ports and entry[3] like a
  timestamp in seconds -- confirm against callers.

  The delta is int((entry[3] - reference) * 1000). The reference starts as
  the first entry's entry[3]. Whenever it is 0.0 it is replaced by the
  current entry[3], and it then advances to every non-zero entry[3].
  Entries whose entry[3] is 0 are sent with a delta of 0.
  """
  count = len(packet_list)
  message = save_int(0) + save_int(count)
  reference = 0.0
  if count > 0:
    reference = packet_list[0][3]
  for packet in packet_list:
    real_ip = ip_to_int(emulated_to_real[packet[0]])
    stamp = packet[3]
    if reference == 0.0:
      reference = stamp
    elapsed_ms = int((stamp - reference) * 1000)
    if stamp == 0:
      elapsed_ms = 0
    record = save_int(real_ip) + save_short(packet[1]) + save_short(packet[2])
    record = record + save_int(elapsed_ms) + save_int(packet[4])
    message = message + record + save_short(packet[5])
    if stamp != 0:
      reference = stamp
  conn.sendall(message)

def load_int(str):
  """Decode the first 4 chars of *str* as an unsigned int, high byte first."""
  return load_n(str, n=4)

def save_int(number):
  """Encode the low 32 bits of *number* as 4 chars, high byte first."""
  return save_n(number, n=4)

def load_short(str):
  """Decode the first 2 chars of *str* as an unsigned int, high byte first."""
  return load_n(str, n=2)

def save_short(number):
  """Encode the low 16 bits of *number* as 2 chars, high byte first."""
  return save_n(number, n=2)

def load_char(str):
  """Decode the first char of *str* as an unsigned byte value."""
  return load_n(str, n=1)

def save_char(number):
  """Encode the low 8 bits of *number* as a single char."""
  return save_n(number, n=1)

def load_n(str, n):
  """Decode the first *n* chars of *str* as an unsigned integer.

  The first char is the most significant byte; each char contributes
  ord(char) & 0xff. Raises IndexError if *str* is shorter than *n*.
  """
  value = 0
  for position in range(n):
    value = (value << 8) | (ord(str[position]) & 0xff)
  return value

def save_n(number, n):
  """Encode the low 8*n bits of *number* as *n* chars, high byte first."""
  shifts = [8 * k for k in range(n)]
  shifts.reverse()
  return ''.join([chr((number >> shift) & 0xff) for shift in shifts])

def ip_to_int(ip):
  """Pack a dotted address string into an int, first octet most significant.

  The string is split on '.' into at most four parts; each part is parsed
  in base 10 and masked to 8 bits.
  """
  octets = ip.split('.', 3)
  last = len(octets) - 1
  packed = 0
  for position, octet in enumerate(octets):
    packed |= (int(octet, 10) & 0xff) << (8 * (last - position))
  return packed

def int_to_ip(num):
  """Format the low 32 bits of *num* as a dotted-quad string."""
  octets = [str((num >> shift) & 0xff) for shift in (24, 16, 8, 0)]
  return '.'.join(octets)

def checksum(buf):
  """Return the plain sum of ord(c) & 0xff over every char of *buf*.

  (A commented-out sign flip once made this an alternating sum.)
  """
  return sum([ord(ch) & 0xff for ch in buf])

# Start the monitor only when run as a script, so importing this module
# (e.g. to reuse or test the packing helpers) does not parse argv or
# connect to the stub.
if __name__ == '__main__':
  main_loop()