monitor.py 12.3 KB
Newer Older
1 2 3 4 5 6 7 8
#Usage: tcpdump | python monitor.py <mapping-file> <experiment-name>
#                                   <my-address> [stub-address]
# mapping-file is a file which maps emulated addresses to real addresses.
# experiment-name is the project/experiment name on emulab. For instance
#   'tbres/pelab'.
# my-address is the IP address of the current node.
# stub-address is the IP address of the corresponding planet-lab node. If this
#   is left out, then the mapping-file is used to determine it.
9 10 11 12 13 14

import sys
import os
import time
import socket
import select
15
import re
16

17 18
# Event types carried in the first byte of an event header received from the
# stub (see receive_characteristic).
EVENT_FORWARD_PATH = 0
EVENT_BACKWARD_PATH = 1

# Command ids placed in the first byte of every message sent to the stub
# (see send_command).
NEW_CONNECTION_COMMAND = 0
TRAFFIC_MODEL_COMMAND = 1
CONNECTION_MODEL_COMMAND = 2
SENSOR_COMMAND = 3
CONNECT_COMMAND = 4
TRAFFIC_WRITE_COMMAND = 5
DELETE_CONNECTION_COMMAND = 6

# Sensor ids sent as the payload of a SENSOR_COMMAND.
NULL_SENSOR = 0
STATE_SENSOR = 1
PACKET_SENSOR = 2
DELAY_SENSOR = 3
MIN_DELAY_SENSOR = 4
MAX_DELAY_SENSOR = 5
THROUGHPUT_SENSOR = 6
EWMA_THROUGHPUT_SENSOR = 7

# Option ids sent as the first field of a CONNECTION_MODEL_COMMAND payload.
CONNECTION_SEND_BUFFER_SIZE = 0
CONNECTION_RECEIVE_BUFFER_SIZE = 1
CONNECTION_MAX_SEGMENT_SIZE = 2
CONNECTION_USE_NAGLES = 3

# Transport protocol ids used in the message header.
TCP_CONNECTION = 0
UDP_CONNECTION = 1

# Address tables filled in by populate_ip_tables() from the mapping file.
emulated_to_real = {}
real_to_emulated = {}
emulated_to_interface = {}

# Command-line settings filled in by read_args().
ip_mapping_filename = ''
this_experiment = ''
this_ip = ''
stub_ip = ''

# Selects which input line format get_next_packet() parses (1 or 2).
netmon_output_version = 2

# Running byte count of reported traffic, and the value last seen by the
# main loop.
total_size = 0
last_total = -1

# Timestamp of the previously reported packet (or of the 'Connected' event).
prev_time = 0.0
57 58

def main_loop():
  """Connect to the stub and relay events until stdin or the stub closes.

  Reads the command line, opens a TCP connection to stub_ip on port 4200,
  then polls stdin and that connection. Each readable line on stdin is
  handed to get_next_packet(); each readable event on the connection is
  handed to receive_characteristic(). The loop ends on EOF from stdin or
  when receive_characteristic() reports failure. The socket is closed on
  the way out, including when an exception propagates.
  """
  global total_size, last_total
  # Initialize
  read_args()
  conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sys.stdout.write("stub_ip is %s\n" % stub_ip)
    sys.stdout.flush()
    conn.connect((stub_ip, 4200))
    poll = select.poll()
    poll.register(sys.stdin, select.POLLIN)
    poll.register(conn, select.POLLIN)
    stdin_fd = sys.stdin.fileno()
    conn_fd = conn.fileno()
    # poll() reports hang-up and error conditions even though only POLLIN
    # was requested. Treat them like readability so the subsequent read
    # observes EOF (or raises) instead of the loop spinning forever.
    readable = select.POLLIN | select.POLLHUP | select.POLLERR
    done = False

    while not done:
      for fd, events in poll.poll():
        if done:
          break
        if fd == stdin_fd and (events & readable) != 0:
          # A line of input data is available (or stdin reached EOF).
          try:
            get_next_packet(conn)
          except EOFError:
            sys.stdout.write('Done: Got EOF on stdin\n')
            done = True
        elif fd == conn_fd and (events & readable) != 0:
          sys.stdout.write('receive_characteristic()\n')
          # A record for change in link characteristics is available.
          done = not receive_characteristic(conn)
        else:
          sys.stdout.write('fd: ' + str(fd) + ' conn-fd: ' + str(conn_fd)
                           + '\n')
      # Remember the byte count seen so far.
      if total_size != last_total:
        last_total = total_size
  finally:
    conn.close()
98

99
def read_args():
  """Load settings from sys.argv into the module-level globals.

  Expects <mapping-file> <experiment-name> <my-address> [stub-address].
  The mapping file is loaded via populate_ip_tables(). When no stub
  address is given, it is looked up in emulated_to_real using my-address.

  Exits with status 1 (after printing a message to stderr) when required
  arguments are missing or when my-address is not in the mapping file and
  no stub address was supplied.
  """
  global ip_mapping_filename, this_experiment, this_ip, stub_ip
  if len(sys.argv) < 4:
    sys.stderr.write('Usage: ' + sys.argv[0] + ' <mapping-file> '
                     + '<experiment-name> <my-address> [stub-address]\n')
    sys.exit(1)
  ip_mapping_filename = sys.argv[1]
  this_experiment = sys.argv[2]
  this_ip = sys.argv[3]
  populate_ip_tables()
  if len(sys.argv) >= 5:
    stub_ip = sys.argv[4]
  elif this_ip in emulated_to_real:
    stub_ip = emulated_to_real[this_ip]
  else:
    sys.stderr.write('No stub address given and ' + this_ip
                     + ' is not in ' + ip_mapping_filename + '\n')
    sys.exit(1)
110 111 112 113 114 115 116 117 118 119 120 121

def populate_ip_tables():
  """Fill the address tables from the file named by ip_mapping_filename.

  Each usable line holds three space-separated fields: emulated address,
  real address, and a third field stored in emulated_to_interface (it may
  itself contain spaces). Lines with fewer than three fields are ignored.
  The file is always closed, even if reading fails.
  """
  mapping_file = open(ip_mapping_filename, 'r')
  try:
    for line in mapping_file:
      fields = line.strip().split(' ', 2)
      if len(fields) == 3:
        emulated, real, interface = fields
        emulated_to_real[emulated] = real
        real_to_emulated[real] = emulated
        emulated_to_interface[emulated] = interface
  finally:
    mapping_file.close()

122 123
# Line formats understood by get_next_packet(). Compiled once and kept next
# to the parsing code that uses them.
# Version 1 packet line: "<time> > <ip>.<port> [(<size>)]"
_PACKET_LINE_V1 = re.compile(
    r'^(\d+\.\d+) > (\d+\.\d+\.\d+\.\d+)\.(\d+) (\((\d+)\))?')
# Version 2 packet line: "<time> > <localport>:<ip>:<port> [(<size>)]"
_PACKET_LINE_V2 = re.compile(
    r'^(\d+\.\d+) > (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) (\((\d+)\))?')
# Version 2 connection event: "<event>: <localport>:<ip>:<port>[ <value>]"
_CONNECTION_LINE = re.compile(
    r'^(New|Connected|Closed|SO_RCVBUF|SO_SNDBUF): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+)( ((?:\d+)(?:\.(?:\d+))?))?')


def _handle_packet_line(conn, match):
  """Send a TRAFFIC_WRITE_COMMAND for one matched packet line."""
  global total_size, prev_time
  timestamp = float(match.group(1))
  if netmon_output_version == 1:
    localport = 0  # Version 1 lines do not carry the local port.
    ipaddr = match.group(2)
    remoteport = int(match.group(3))
    size_text = match.group(5)
  else:
    localport = int(match.group(2))
    ipaddr = match.group(3)
    remoteport = int(match.group(4))
    size_text = match.group(6)

  # An optional group that did not participate in the match is None.
  if size_text is None:
    size = 0
  else:
    size = int(size_text)

  if ipaddr in emulated_to_real:
    total_size = total_size + size
    send_command(conn, TRAFFIC_WRITE_COMMAND, TCP_CONNECTION, ipaddr,
                 localport, remoteport,
                 save_int(int((timestamp - prev_time)*1000))
                 + save_int(size))
    prev_time = timestamp


def _handle_connection_line(conn, cmatch):
  """Send the stub commands that correspond to one connection event line."""
  global prev_time
  event = cmatch.group(1)
  localport = int(cmatch.group(2))
  ipaddr = cmatch.group(3)
  remoteport = int(cmatch.group(4))
  value = cmatch.group(6)
  if value is None:
    # No value on the line; fall back to zero.
    value = '0'

  if ipaddr not in emulated_to_real:
    sys.stdout.write('skipped line with an invalid destination: '
                     + ipaddr + '\n')
    return

  sys.stdout.write("Got a connection event: " + event + "\n")
  if event == 'New':
    send_command(conn, NEW_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr,
                 localport, remoteport, '')
    send_command(conn, TRAFFIC_MODEL_COMMAND, TCP_CONNECTION, ipaddr,
                 localport, remoteport, '')
    for sensor in (MIN_DELAY_SENSOR, MAX_DELAY_SENSOR,
                   EWMA_THROUGHPUT_SENSOR):
      send_command(conn, SENSOR_COMMAND, TCP_CONNECTION, ipaddr,
                   localport, remoteport, save_int(sensor))
  elif event == 'Closed':
    send_command(conn, DELETE_CONNECTION_COMMAND, TCP_CONNECTION, ipaddr,
                 localport, remoteport, '')
  elif event == 'Connected':
    send_command(conn, CONNECT_COMMAND, TCP_CONNECTION, ipaddr,
                 localport, remoteport, '')
    # The value on a 'Connected' line becomes the base for packet deltas.
    prev_time = float(value)
  elif event == 'SO_RCVBUF':
    send_command(conn, CONNECTION_MODEL_COMMAND, TCP_CONNECTION, ipaddr,
                 localport, remoteport,
                 save_int(CONNECTION_RECEIVE_BUFFER_SIZE)
                 + save_int(int(value)))
  elif event == 'SO_SNDBUF':
    send_command(conn, CONNECTION_MODEL_COMMAND, TCP_CONNECTION, ipaddr,
                 localport, remoteport,
                 save_int(CONNECTION_SEND_BUFFER_SIZE)
                 + save_int(int(value)))
  else:
    sys.stdout.write('skipped line with an invalid event: '
                     + event + '\n')


def get_next_packet(conn):
  """Read one line from stdin and forward what it describes to the stub.

  A packet line produces a TRAFFIC_WRITE_COMMAND carrying the time since
  the previous packet (in milliseconds) and the packet size. With
  netmon_output_version 2, a connection event line (New, Connected, Closed,
  SO_RCVBUF, SO_SNDBUF) produces the matching connection commands. Lines
  for destinations missing from emulated_to_real, and lines in any other
  format, are skipped.

  Raises EOFError when stdin is exhausted.
  """
  line = sys.stdin.readline()
  if line == "":
    raise EOFError

  match = None
  if netmon_output_version == 1:
    match = _PACKET_LINE_V1.match(line)
  elif netmon_output_version == 2:
    match = _PACKET_LINE_V2.match(line)

  if match:
    _handle_packet_line(conn, match)
    return

  cmatch = None
  if netmon_output_version == 2:
    # Watch for new or closed connections.
    cmatch = _CONNECTION_LINE.match(line)

  if cmatch:
    _handle_connection_line(conn, cmatch)
  else:
    sys.stdout.write('skipped line in the wrong format: ' + line)
220
    
221
def _recv_exactly(conn, count):
  """Read count bytes from conn; returns fewer only if the peer closes."""
  data = conn.recv(count)
  while len(data) < count:
    more = conn.recv(count - len(data))
    if not more:
      break
    data = data + more
  return data


def receive_characteristic(conn):
  """Read one event from the stub and apply it to the emulated link.

  The 12-byte header is: event type (1 byte), body size (2), transport (1),
  real destination address (4), source port (2), destination port (2).
  The body that follows is passed unchanged to set_link() as the trailing
  part of the command.

  Returns False when the header or body cannot be read in full (which
  stops the main loop), True otherwise. Events for an address missing from
  real_to_emulated are reported and skipped after their body has been
  consumed, so the stream stays aligned.
  """
  header = _recv_exactly(conn, 12)
  if len(header) != 12:
    sys.stdout.write('Event header is the wrong size. Length: '
                     + str(len(header)) + '\n')
    return False
  eventType = load_char(header[0:1])
  size = load_short(header[1:3])
  # Bytes 3, 8-9 and 10-11 (transport and ports) are not used here.
  real_dest = int_to_ip(load_int(header[4:8]))

  body = _recv_exactly(conn, size)
  if len(body) != size:
    sys.stdout.write('Event body is the wrong size.\n')
    return False

  if real_dest not in real_to_emulated:
    sys.stdout.write('skipped event with an unknown destination: '
                     + real_dest + '\n')
    return True
  dest = real_to_emulated[real_dest]

  if eventType == EVENT_FORWARD_PATH:
    set_link(this_ip, dest, body)
  elif eventType == EVENT_BACKWARD_PATH:
    set_link(dest, this_ip, body)
  else:
    sys.stdout.write('Other: ' + str(eventType) + ', ' + repr(body) + '\n')
  return True
244

245
def set_bandwidth(kbps, dest):
  """Log the bandwidth to stderr and apply it to the link towards dest.

  Writes two 'BANDWIDTH!' records to stderr (the second carries the current
  time and kbps*1000/8), then returns the status of
  set_link(this_ip, dest, 'bandwidth=<kbps>').
  """
  stamp = '%0.6f' % time.time()
  sys.stderr.write('BANDWIDTH!purple\n')
  sys.stderr.write('BANDWIDTH!line ' + stamp + ' 0 ' + stamp
                   + ' ' + str(kbps*1000/8) + '\n')
  return set_link(this_ip, dest, 'bandwidth=' + str(kbps))
253

254
# Set delay on the link. We are given round trip time.
255
def set_delay(milliseconds, dest):
  """Log the round trip time and set half of it on each link direction.

  Writes two 'RTT!' records to stderr, then sets delay=<milliseconds/2> on
  the link from this_ip to dest. Only if that returns 0 is the same delay
  set on the link from dest to this_ip. Returns the status of the last
  set_link() call made.
  """
  stamp = '%0.6f' % time.time()
  sys.stderr.write('RTT!orange\n')
  sys.stderr.write('RTT!line ' + stamp + ' 0 ' + stamp
                   + ' ' + str(milliseconds) + '\n')
  one_way = 'delay=' + str(milliseconds/2)
  # Set the delay from here to there to 1/2 rtt.
  error = set_link(this_ip, dest, one_way)
  if error != 0:
    return error
  # That succeeded, so set the delay from there to here.
  return set_link(dest, this_ip, one_way)
267

268
def set_loss(probability, dest):
  """Set 'plr=<probability>' on the link from this_ip to dest.

  Returns the status reported by set_link().
  """
  return set_link(this_ip, dest, 'plr=' + str(probability))

271 272 273 274 275 276
def set_max_delay(delay, dest):
  """Set 'MAXINQ' on the link from this_ip to dest.

  The value sent is delay scaled by 10000.0/1000.0 and truncated to an
  integer. Returns the status reported by set_link().
  """
  hertz = 10000.0
  milliseconds = 1000.0
  scaled = int((hertz/milliseconds)*delay)
  setting = 'MAXINQ=' + str(scaled)
  return set_link(this_ip, dest, setting)

277 278 279 280
def set_link(source, dest, ending):
  """Run tevc to modify the link from source towards dest.

  source -- emulated address whose entry in emulated_to_interface names the
            object to modify.
  dest   -- address placed in the 'dest=' argument.
  ending -- whitespace-separated 'key=value' settings appended to the
            command.

  Returns 0 on success and a non-zero value on failure. The value is the
  exit code of tevc, or 127 when tevc could not be started.

  The command is executed without a shell. 'ending' can come straight from
  the network (see receive_characteristic), so shell metacharacters in it
  must not be interpreted.
  """
  import subprocess
  command = ('/usr/testbed/bin/tevc -e ' + this_experiment + ' now '
             + emulated_to_interface[source] + ' modify dest=' + dest + ' '
             + ending)
  sys.stdout.write('event: ' + command + '\n')
  try:
    # Split on whitespace only: same word boundaries a shell would use for
    # plain arguments, but no expansion, quoting or command chaining.
    return subprocess.call(command.split())
  except OSError:
    sys.stdout.write('event failed: could not run tevc\n')
    return 127

284 285 286 287 288 289 290 291 292
def send_command(conn, command_id, protocol, ipaddr, localport, remoteport,
                 command):
  """Send one command message to the stub over conn.

  The message is a 12-byte header followed by the payload 'command':
  command id (1 byte), payload length (2), protocol (1), real address of
  ipaddr looked up in emulated_to_real (4), local port (2), remote port
  (2). Multi-byte fields are written most significant byte first.

  Raises KeyError if ipaddr is not in emulated_to_real.
  """
  header = [save_char(command_id),
            save_short(len(command)),
            save_char(protocol),
            save_int(ip_to_int(emulated_to_real[ipaddr])),
            save_short(localport),
            save_short(remoteport)]
  conn.sendall(''.join(header) + command)

296
def send_destinations(conn, packet_list):
  """Send a batch of packet records to the stub in a single message.

  The message starts with a 4-byte zero and a 4-byte record count. Each
  record holds: real address for packet[0] (4 bytes), packet[1] (2),
  packet[2] (2), milliseconds since the previous timestamped packet (4),
  packet[4] (4) and packet[5] (2).
  NOTE(review): packet[3] is treated as a timestamp in seconds; the meaning
  of the other tuple fields is not visible here -- confirm with the caller.

  A packet whose timestamp is 0 gets a delta of 0 and does not advance the
  reference time.
  """
  parts = [save_int(0), save_int(len(packet_list))]
  prev_time = 0.0
  if len(packet_list) > 0:
    prev_time = packet_list[0][3]
  for packet in packet_list:
    ip = ip_to_int(emulated_to_real[packet[0]])
    stamp = packet[3]
    if prev_time == 0.0:
      prev_time = stamp
    if stamp == 0:
      delta = 0
    else:
      delta = int((stamp - prev_time) * 1000)
    parts.append(save_int(ip))
    parts.append(save_short(packet[1]))
    parts.append(save_short(packet[2]))
    parts.append(save_int(delta))
    parts.append(save_int(packet[4]))
    parts.append(save_short(packet[5]))
    if stamp != 0:
      prev_time = stamp
  conn.sendall(''.join(parts))

319
def load_int(str):
  """Decode the first 4 bytes of str as an unsigned big-endian integer."""
  return load_n(str, 4)

def save_int(number):
  """Encode the low 32 bits of number as 4 bytes, most significant first."""
  width = 4
  return save_n(number, width)

def load_short(str):
  """Decode the first 2 bytes of str as an unsigned big-endian integer."""
  width = 2
  return load_n(str, width)

def save_short(number):
  """Encode the low 16 bits of number as 2 bytes, most significant first."""
  return save_n(number, 2)

def load_char(str):
  """Decode the first byte of str as an unsigned integer."""
  width = 1
  return load_n(str, width)

def save_char(number):
  """Encode the low 8 bits of number as a single byte."""
  width = 1
  return save_n(number, width)
336 337

def load_n(str, n):
  """Decode the first n bytes of str as an unsigned big-endian integer.

  str may be a character string, a bytes object or a bytearray; elements
  that are already integers are used directly, others go through ord().
  Raises IndexError if str holds fewer than n elements.
  """
  result = 0
  for i in range(n):
    byte = str[i]
    if not isinstance(byte, int):
      byte = ord(byte)
    # Shift what we have so far and append the next (less significant) byte.
    result = (result << 8) | (byte & 0xff)
  return result

343
def save_n(number, n):
  """Encode the low n bytes of number as a string, most significant first.

  Each output character holds one byte value (0-255). Bits of number above
  the low n bytes are discarded.
  """
  return ''.join([chr((number >> ((n-1-i)*8)) & 0xff) for i in range(n)])

def ip_to_int(ip):
  """Convert a dotted-decimal address such as '10.0.0.1' to an integer.

  The first field becomes the most significant byte. Each field is masked
  to 8 bits. Raises ValueError if a field is not a decimal number.
  """
  result = 0
  for ip_byte in ip.split('.', 3):
    result = (result << 8) | (int(ip_byte, 10) & 0xff)
  return result

357 358 359 360 361 362 363 364 365 366 367 368 369 370
#def int_to_ip(num):
#  ip_list = ['0', '0', '0', '0']
#  for ip_byte in ip_list:
#    ip_byte = str(num & 0xff)
#    num = num >> 8
#  ip_list.reverse()
#  return '.'.join(ip_list)

def int_to_ip(num):
  """Format the low 32 bits of num as a dotted-decimal address.

  The most significant of the four bytes comes first in the result.
  """
  octets = [str((num >> shift) & 0xff) for shift in (24, 16, 8, 0)]
  return '.'.join(octets)
371

372 373 374 375 376 377 378 379
def checksum(buf):
  """Return the sum of the byte values (0-255) of the characters in buf.

  Every character is weighted by +1; the alternating-sign variant is not
  in use.
  """
  return sum([ord(ch) & 0xff for ch in buf])

380
# Entry point: run the monitor. This call is unconditional, so it also runs
# when the file is imported as a module.
main_loop()