Commit 1099dafc authored by Jonathon Duerig's avatar Jonathon Duerig
Browse files

Added initial conditions handling to monitor/stub. This should also solve the...

Added initial conditions handling to monitor/stub. This should also solve the what does throughput mean when the connection is delay-dominated? problem. Now there are two kinds of bandwidth measurements that the magent can send back to the monitor. There is a throughput measurement which is tentative and which is applied only if it is greater than the last measurement. And there is a bandwidth measurement that is authoritative and is applied regardless.
parent cd5cff20
......@@ -28,20 +28,40 @@ public:
virtual ~CommandOutput() {}
void eventMessage(std::string const & message, Order const & key,
PathDirection dir=FORWARD_PATH)
{
if (dir == FORWARD_PATH)
{
genericMessage(EVENT_FORWARD_PATH, message, key);
}
else
{
genericMessage(EVENT_BACKWARD_PATH, message, key);
}
}
void genericMessage(int type, std::string const & message, Order const & key)
{
if (message.size() <= 0xffff && message.size() > 0)
{
Header prefix;
std::string pathString;
if (dir == FORWARD_PATH)
{
prefix.type = EVENT_FORWARD_PATH;
pathString = "FORWARD";
}
else
prefix.type = type;
switch (prefix.type)
{
prefix.type = EVENT_BACKWARD_PATH;
pathString = "BACKWARD";
case EVENT_FORWARD_PATH:
pathString = "EVENT_FORWARD";
break;
case EVENT_BACKWARD_PATH:
pathString = "EVENT_BACKWARD";
break;
case TENTATIVE_THROUGHPUT:
pathString = "TENTATIVE_THROUGHPUT";
break;
case AUTHORITATIVE_BANDWIDTH:
pathString = "AUTHORITATIVE_BANDWIDTH";
break;
default:
pathString = "Unknown Output Command Type";
}
prefix.size = message.size();
prefix.key = key;
......
......@@ -4,13 +4,17 @@
#include "EwmaThroughputSensor.h"
#include "ThroughputSensor.h"
#include "CommandOutput.h"
#include "StateSensor.h"
using namespace std;
EwmaThroughputSensor::EwmaThroughputSensor(
ThroughputSensor * newThroughputSource)
: throughput(0.0)
ThroughputSensor * newThroughputSource,
StateSensor * newState)
: maxThroughput(0)
, bandwidth(0.0)
, throughputSource(newThroughputSource)
, state(newState)
{
}
......@@ -21,21 +25,39 @@ void EwmaThroughputSensor::localSend(PacketInfo * packet)
void EwmaThroughputSensor::localAck(PacketInfo * packet)
{
int latest = throughputSource->getThroughputInKbps();
if (latest != 0)
if (state->isSaturated())
{
if (throughput == 0.0)
// The link is saturated, so we know that the throughput
// measurement is the real bandwidth.
if (bandwidth == 0.0)
{
throughput = latest;
bandwidth = latest;
}
else
{
static const double alpha = 0.1;
throughput = throughput*(1.0-alpha) + latest*alpha;
bandwidth = bandwidth*(1.0-alpha) + latest*alpha;
}
// We have got an actual bandwidth measurement, so reset
// maxThroughput accordingly.
maxThroughput = static_cast<int>(bandwidth);
ostringstream buffer;
buffer << setiosflags(ios::fixed | ios::showpoint) << setprecision(0);
buffer << "bandwidth=" << throughput;
global::output->eventMessage(buffer.str(), packet->elab,
CommandOutput::FORWARD_PATH);
buffer << static_cast<int>(bandwidth);
global::output->genericMessage(AUTHORITATIVE_BANDWIDTH, buffer.str(),
packet->elab);
}
else
{
// The link isn't saturated, so we don't know whether this
// throughput measurement represents real bandwidth or not.
if (latest > maxThroughput)
{
maxThroughput = latest;
// Send out a tentative number
ostringstream buffer;
buffer << maxThroughput;
global::output->genericMessage(TENTATIVE_THROUGHPUT, buffer.str(),
packet->elab);
}
}
}
......@@ -6,17 +6,23 @@
#include "Sensor.h"
class ThroughputSensor;
class StateSensor;
class EwmaThroughputSensor : public Sensor
{
public:
EwmaThroughputSensor(ThroughputSensor * newThroughputSource);
EwmaThroughputSensor(ThroughputSensor * newThroughputSource,
StateSensor * newState);
protected:
virtual void localSend(PacketInfo * packet);
virtual void localAck(PacketInfo * packet);
private:
double throughput;
// The maximum throughput or latest bandwidth number
int maxThroughput;
// And EWMA of the last several solid bandwidth measurements
double bandwidth;
ThroughputSensor * throughputSource;
StateSensor * state;
};
#endif
......@@ -204,7 +204,7 @@ void SensorList::pushMinDelaySensor(void)
// Dependency set
depMinDelaySensor = newSensor;
}
}
void SensorList::pushMaxDelaySensor(void)
......@@ -244,9 +244,11 @@ void SensorList::pushThroughputSensor(void)
void SensorList::pushEwmaThroughputSensor(void)
{
// Dependency list
pushStateSensor();
pushThroughputSensor();
logWrite(SENSOR, "Adding EwmaThroughputSensor");
std::auto_ptr<Sensor> current(new EwmaThroughputSensor(depThroughputSensor));
std::auto_ptr<Sensor> current(new EwmaThroughputSensor(depThroughputSensor,
depStateSensor));
pushSensor(current);
}
......@@ -63,8 +63,21 @@ void replayWritePacket(PacketInfo * packet);
// Enum of header types -- to-monitor (Events, etc.)
enum
{
// In these two cases, the body of the message is a string to be
// sent as an opaque event.
EVENT_FORWARD_PATH = 0,
EVENT_BACKWARD_PATH = 1
EVENT_BACKWARD_PATH = 1,
// In these two cases, the body of the message is a string
// consisting only of the number of estimated throughput or
// bandwidth in kbps.
// This is the bandwidth number to send if it exceeds the current
// bandwidth on the monitor side.
TENTATIVE_THROUGHPUT = 2,
// This is a bandwidth number to use even if it is smaller than the
// bandwidth on the monitor side
AUTHORITATIVE_BANDWIDTH = 3
};
// Enum of header types -- from-monitor (Commands)
......
10.0.0.1 10.0.0.2 20 2000
10.0.0.2 10.0.0.1 20 2000
10.0.0.1 10.1.0.1 elabc-elab0
10.0.0.2 10.4.0.1 elabc-elab1
10.0.0.1 10.1.0.1 elabc-elab-1
10.0.0.2 10.1.0.2 elabc-elab-2
......@@ -13,10 +13,13 @@ import time
import socket
import select
import re
from optparse import OptionParser
EVENT_FORWARD_PATH = 0
EVENT_BACKWARD_PATH = 1
TENTATIVE_THROUGHPUT = 2
AUTHORITATIVE_BANDWIDTH = 3
NEW_CONNECTION_COMMAND = 0
TRAFFIC_MODEL_COMMAND = 1
CONNECTION_MODEL_COMMAND = 2
......@@ -46,11 +49,17 @@ emulated_to_real = {}
real_to_emulated = {}
emulated_to_interface = {}
ip_mapping_filename = ''
initial_filename = ''
this_experiment = ''
this_ip = ''
stub_ip = ''
stub_port = 0
netmon_output_version = 2
# A throughput measurement is only passed on as an event if it is
# larger than the last bandwidth from that connection
connection_bandwidth = {}
total_size = 0
last_total = -1
prev_time = 0.0
......@@ -61,9 +70,9 @@ def main_loop():
read_args()
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sys.stdout.write("stub_ip is %s\n" % stub_ip)
sys.stdout.write("stub_ip is " + stub_ip + ":" + str(stub_port) + "\n")
sys.stdout.flush()
conn.connect((stub_ip, 4200))
conn.connect((stub_ip, stub_port))
poll = select.poll()
poll.register(sys.stdin, select.POLLIN)
poll.register(conn, select.POLLIN)
......@@ -97,16 +106,60 @@ def main_loop():
# sys.stdout.write('Loop end\n')
def read_args():
global ip_mapping_filename, this_experiment, this_ip, stub_ip
if len(sys.argv) >= 4:
ip_mapping_filename = sys.argv[1]
this_experiment = sys.argv[2]
this_ip = sys.argv[3]
populate_ip_tables()
if len(sys.argv) == 5:
stub_ip = sys.argv[4]
else:
stub_ip = emulated_to_real[this_ip]
global ip_mapping_filename, this_experiment, this_ip, stub_ip, stub_port
global initial_filename
usage = "usage: %prog [options]"
parser = OptionParser(usage=usage)
parser.add_option("--mapping", action="store", type="string",
dest="ip_mapping_filename", metavar="MAPPING_FILE",
help="File mapping IP addresses on Emulab to those on PlanetLab (required)")
parser.add_option("--experiment", action="store", type="string",
dest="this_experiment", metavar="EXPERIMENT_NAME",
help="Experiment name of the form pid/eid (required)")
parser.add_option("--ip", action="store", type="string",
dest="this_ip", metavar="X.X.X.X",
help="The IP address the monitor will use (required)")
parser.add_option("--stub-ip", action="store", type="string",
dest="stub_ip", metavar="X.X.X.X",
help="The IP address of the stub (if not specified, defaults to the one in the ip mapping)")
parser.add_option("--stub-port", action="store", type="int",
dest="stub_port", default=4200,
help="The port used to connect to the stub (defaults to 4200)")
parser.add_option("--initial", action="store", type="string",
dest="initial_filename", metavar="INITIAL_CONDITIONS_FILE",
help="File giving initial conditions for connections (defaults to no shaping)")
(options, args) = parser.parse_args()
ip_mapping_filename = options.ip_mapping_filename
this_experiment = options.this_experiment
this_ip = options.this_ip
stub_ip = options.stub_ip
stub_port = options.stub_port
initial_filename = options.initial_filename
if len(args) != 0:
parser.print_help()
parser.error("Invalid argument(s): " + str(args))
if ip_mapping_filename == None:
parser.print_help()
parser.error("Missing --mapping=MAPPING_FILE option")
if this_experiment == None:
parser.print_help()
parser.error("Missing --experiment=EXPERIMENT_NAME option")
if this_ip == None:
parser.print_help()
parser.error("Missing --ip=X.X.X.X option")
populate_ip_tables()
if stub_ip == None:
sys.stdout.write('stub_ip was None before\n')
stub_ip = emulated_to_real[this_ip]
sys.stdout.write('stub_ip: ' + stub_ip + '\n')
sys.stdout.write('this_ip: ' + this_ip + '\n')
sys.stdout.write('emulated_to_real: ' + str(emulated_to_real) + '\n')
if initial_filename != None:
read_initial_conditions()
def populate_ip_tables():
input = open(ip_mapping_filename, 'r')
......@@ -119,6 +172,25 @@ def populate_ip_tables():
emulated_to_interface[fields[0]] = fields[2]
line = input.readline()
# Format of an initial conditions file is:
#
# List of lines, where each line is of the format:
# <source-ip> <dest-ip> <delay> <bandwidth>
#
# Where source and dest ip addresses are in x.x.x.x format, and delay and
# bandwidth are integral values in milliseconds and kilobits per second
# respectively.
def read_initial_conditions():
input = open(initial_filename, 'r')
line = input.readline()
while line != '':
fields = line.strip().split(' ', 3)
if len(fields) == 4 and fields[0] == this_ip:
set_link(fields[0], fields[1], 'delay=' + str(int(fields[2])/2))
set_link(fields[0], fields[1], 'bandwidth=' + fields[3])
connection_bandwidth[fields[1]] = int(fields[3])
line = input.readline()
def get_next_packet(conn):
global total_size, last_total, prev_time
line = sys.stdin.readline()
......@@ -219,6 +291,7 @@ def get_next_packet(conn):
sys.stdout.write('skipped line in the wrong format: ' + line)
def receive_characteristic(conn):
global connection_bandwidth
buf = conn.recv(12)
if len(buf) != 12:
sys.stdout.write('Event header is the wrong size. Length: '
......@@ -234,10 +307,24 @@ def receive_characteristic(conn):
if len(buf) != size:
sys.stdout.write('Event body is the wrong size.\n')
return False
if (eventType == EVENT_FORWARD_PATH):
if eventType == EVENT_FORWARD_PATH:
set_link(this_ip, dest, buf)
elif (eventType == EVENT_BACKWARD_PATH):
elif eventType == EVENT_BACKWARD_PATH:
set_link(dest, this_ip, buf)
elif eventType == TENTATIVE_THROUGHPUT:
# There is a throughput number, but we don't know whether the link
# is saturated or not. If the link is not saturated, then we just
# need to make sure that emulated bandwidth >= real
# bandwidth. This means that we output a throughput number only if
# it is greater than our previous measurements because we don't
# know that bandwidth has decreased.
if int(buf) > connection_bandwidth[dest]:
connection_bandwidth[dest] = int(buf)
set_link(dest, this_ip, 'bandwidth=' + buf)
elif eventType == AUTHORITATIVE_BANDWIDTH:
# We know that the bandwidth has definitely changed. Reset everything.
connection_bandwidth[dest] = int(buf)
set_link(dest, this_ip, 'bandwidth=' + buf)
else:
sys.stdout.write('Other: ' + str(eventType) + ', ' + str(value) + '\n');
return True
......
......@@ -45,6 +45,7 @@ else
$PERL ${MONITOR_DIR}/$GENIPMAP > $IPMAP
fi
echo "Starting up monitor for $PROJECT/$EXPERIMENT $PELAB_IP $SIP";
exec $NETMON_DIR/$NETMOND -v 2 -f 262144 | tee $LOGDIR/libnetmon.out | $PYTHON $MONITOR_DIR/$MONITOR $IPMAP $PROJECT/$EXPERIMENT $PELAB_IP $SIP
#echo "Starting up monitor for $PROJECT/$EXPERIMENT $PELAB_IP $SIP";
echo "Starting up monitor with options --mapping=$IPMAP --experiment=$PROJECT/$EXPERIMENT --ip=$PELAB_IP --initial=$MONITOR_DIR/initial.txt";
exec $NETMON_DIR/$NETMOND -v 2 -f 262144 | tee $LOGDIR/libnetmon.out | $PYTHON $MONITOR_DIR/$MONITOR --mapping=$IPMAP --experiment=$PROJECT/$EXPERIMENT --ip=$PELAB_IP --initial=$MONITOR_DIR/initial.txt
#exec $NETMON_DIR/$NETMOND -v 2 | $PYTHON $MONITOR_DIR/$MONITOR ip-mapping.txt $PROJECT/$EXPERIMENT $PELAB_IP $SIP
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment