Commit afa661e8 authored by Jonathon Duerig's avatar Jonathon Duerig

Added sensor replay. It seems to be working perfectly. A replay is...

Added sensor replay. It seems to be working perfectly. A replay is automatically saved after every run in plab-n/local/logs/stub.replay. You can get a replay by running a command similar to: sudo ./magent --replay-load=/proj/tbres/exp/pelab-generated/logs/plab-1/local/logs/stub.replay
parent a62bd59b
......@@ -137,25 +137,28 @@ void Connection::captureSend(PacketInfo * packet)
{
logWrite(SENSOR, "Captured a send packet");
Sensor * head = measurements.getHead();
if (head != NULL && isConnected)
{
packet->elab = elab;
packet->bufferFull = bufferFull;
if (head != NULL && isConnected)
{
head->captureSend(packet);
}
replayWritePacket(PACKET_INFO_SEND_COMMAND, packet);
}
void Connection::captureAck(PacketInfo * packet)
{
logWrite(SENSOR, "Captured an ack packet");
Sensor * head = measurements.getHead();
if (head != NULL && isConnected)
{
packet->elab = elab;
packet->bufferFull = bufferFull;
if (head != NULL && isConnected)
{
head->captureAck(packet);
}
replayWritePacket(PACKET_INFO_ACK_COMMAND, packet);
}
ConnectionModel const * Connection::getConnectionModel(void)
{
return peer.get();
......
......@@ -16,6 +16,13 @@ public:
// an error. Errno is not preserved.
virtual int writeMessage(int size, WriteResult & result)=0;
virtual bool isConnected(void)=0;
// These are static members which are used. They are called from
// switch statements in main.cc
// static void init(void);
// static void addNewPeer(fd_set * readable);
// static void readFromPeers(fd_set * readable);
// static void packetCapture(fd_set * readable);
};
class ConnectionModelNull : public ConnectionModel
......
......@@ -93,6 +93,13 @@ void DirectInput::nextCommand(fd_set * readable)
{
// logWrite(COMMAND_INPUT, "Finished reading a command: CHECKSUM=%d",
// checksum());
if (global::replayArg == REPLAY_SAVE
&& (commandHeader.type == NEW_CONNECTION_COMMAND
|| commandHeader.type == DELETE_CONNECTION_COMMAND
|| commandHeader.type == SENSOR_COMMAND))
{
replayWriteCommand(headerBuffer, bodyBuffer, commandHeader.size);
}
currentCommand = loadCommand(&commandHeader, bodyBuffer);
index = 0;
state = HEADER;
......
......@@ -27,6 +27,11 @@ struct timeval * Time::getTimeval(void)
return &data;
}
struct timeval const * Time::getTimeval(void) const
{
return &data;
}
Time Time::operator+(int const & right) const
{
Time result;
......
......@@ -10,6 +10,7 @@ public:
Time(struct timeval const & newData);
long long toMilliseconds(void) const;
struct timeval * getTimeval(void);
struct timeval const * getTimeval(void) const;
Time operator+(int const & right) const;
Time operator-(Time const & right) const;
bool operator<(Time const & right) const;
......
......@@ -51,6 +51,12 @@ int createServer(int port, std::string const & debugString);
int acceptServer(int acceptfd, struct sockaddr_in * remoteAddress,
std::string const & debugString);
class PacketInfo;
void replayWriteCommand(char * head, char * body, unsigned short bodySize);
void replayWritePacket(int command, PacketInfo * packet);
// Enum of header types -- to-monitor (Events, etc.)
enum
{
......@@ -67,7 +73,10 @@ enum
SENSOR_COMMAND = 3,
CONNECT_COMMAND = 4,
TRAFFIC_WRITE_COMMAND = 5,
DELETE_CONNECTION_COMMAND = 6
DELETE_CONNECTION_COMMAND = 6,
// These are pseudo-commands for replay.
PACKET_INFO_SEND_COMMAND = 7,
PACKET_INFO_ACK_COMMAND = 8
};
// This is used for the type field in a SensorCommand
......@@ -177,6 +186,12 @@ struct IpHeader
struct PacketInfo
{
// packetTime+packetLength+kernel+elab
enum {size = sizeof(int)*(2 +1 +21 + 1) +
sizeof(short)*(0 +0 +0 + 2) +
sizeof(char)*(0 +0 +7 + 1) +
sizeof(IpHeader) + sizeof(struct tcphdr)};
Time packetTime;
int packetLength;
struct tcp_info const * kernel;
......@@ -192,6 +207,13 @@ enum
CONNECTION_MODEL_KERNEL = 1
};
enum
{
NO_REPLAY = 0,
REPLAY_LOAD = 1,
REPLAY_SAVE = 2
};
class ConnectionModel;
class Connection;
class CommandInput;
......@@ -203,6 +225,8 @@ namespace global
extern unsigned short peerServerPort;
extern unsigned short monitorServerPort;
extern bool doDaemonize;
extern int replayArg;
extern int replayfd;
extern int peerAccept;
extern std::auto_ptr<ConnectionModel> connectionModelExemplar;
......
......@@ -25,6 +25,8 @@ namespace global
unsigned short peerServerPort = 0;
unsigned short monitorServerPort = 0;
bool doDaemonize = false;
int replayArg = NO_REPLAY;
int replayfd = -1;
int peerAccept = -1;
string interface;
......@@ -46,6 +48,7 @@ void usageMessage(char *progname);
void processArgs(int argc, char * argv[]);
void init(void);
void mainLoop(void);
void replayLoop(void);
void writeToConnections(multimap<Time, Connection *> & schedule);
void addNewPeer(fd_set * readable);
void readFromPeers(fd_set * readable);
......@@ -53,6 +56,7 @@ void packetCapture(fd_set * readable);
int main(int argc, char * argv[])
{
umask(0);
processArgs(argc, argv);
init();
if (global::doDaemonize)
......@@ -64,17 +68,27 @@ int main(int argc, char * argv[])
logWrite(ERROR, "Daemonization failed: %s", strerror(errno));
}
}
if (global::replayArg == REPLAY_LOAD)
{
replayLoop();
}
else
{
mainLoop();
}
return 0;
}
void usageMessage(char *progname) {
void usageMessage(char *progname)
{
cerr << "Usage: " << progname << " [options]" << endl;
cerr << " --connectionmodel=<null|kerneltcp> " << endl;
cerr << " --peerserverport=<int> " << endl;
cerr << " --monitorserverport=<int> " << endl;
cerr << " --interface=<iface> " << endl;
cerr << " --daemonize " << endl;
cerr << " --replay-save=<filename> " << endl;
cerr << " --replay-loae=<filename> " << endl;
logWrite(ERROR, "Bad command line argument", global::connectionModelArg);
}
......@@ -86,6 +100,8 @@ void processArgs(int argc, char * argv[])
global::monitorServerPort = 4200;
global::interface = "vnet";
global::doDaemonize = false;
global::replayArg = NO_REPLAY;
global::replayfd = -1;
static struct option longOptions[] = {
// If you modify this, make sure to modify the shortOptions string below
......@@ -95,22 +111,26 @@ void processArgs(int argc, char * argv[])
{"monitorserverport", required_argument, NULL, 'm'},
{"interface", required_argument, NULL, 'i'},
{"daemonize", no_argument , NULL, 'd'},
{"replay-save", required_argument, NULL, 's'},
{"replay-load", required_argument, NULL, 'l'},
// Required so that getopt_long can find the end of the list
{NULL, 0, NULL, 0}
};
string shortOptions = "c:p:m:i:d";
string shortOptions = "c:p:m:i:ds:l:";
// Not used
int longIndex;
// Loop until all options have been handled
while (true) {
while (true)
{
int ch =
getopt_long(argc, argv, shortOptions.c_str(), longOptions, &longIndex);
// Detect end of options
if (ch == -1) {
if (ch == -1)
{
break;
}
......@@ -118,34 +138,46 @@ void processArgs(int argc, char * argv[])
// Make a copy of optarg that's more c++-y.
string optArg;
if (optarg != NULL) {
if (optarg != NULL)
{
optArg = optarg;
}
switch ((char) ch) {
switch ((char) ch)
{
case 'c':
if (optArg == "null") {
global::connectionModelArg = CONNECTION_MODEL_NULL;
} else if (optArg == "kerneltcp") {
}
else if (optArg == "kerneltcp")
{
global::connectionModelArg = CONNECTION_MODEL_KERNEL;
} else {
}
else
{
usageMessage(argv[0]);
exit(1);
}
break;
case 'p':
if (sscanf(optarg,"%i",&argIntVal)) {
if (sscanf(optarg,"%i",&argIntVal))
{
usageMessage(argv[0]);
exit(1);
} else {
}
else
{
global::peerServerPort = argIntVal;
}
break;
case 'm':
if (sscanf(optarg,"%i",&argIntVal)) {
if (sscanf(optarg,"%i",&argIntVal))
{
usageMessage(argv[0]);
exit(1);
} else {
}
else
{
global::monitorServerPort = argIntVal;
}
break;
......@@ -155,6 +187,47 @@ void processArgs(int argc, char * argv[])
case 'd':
global::doDaemonize = true;
break;
case 's':
if (global::replayArg == NO_REPLAY)
{
global::replayfd = open(optarg, O_WRONLY | O_CREAT | O_TRUNC,
S_IRWXU | S_IRWXG | S_IRWXO);
if (global::replayfd != -1)
{
global::replayArg = REPLAY_SAVE;
}
else
{
logWrite(ERROR, "Error opening replay-save file: %s: %s", optarg,
strerror(errno));
}
}
else
{
logWrite(ERROR, "replay-save option was invoked when replay "
"was already set");
}
break;
case 'l':
if (global::replayArg == NO_REPLAY)
{
global::replayfd = open(optarg, O_RDONLY);
if (global::replayfd != -1)
{
global::replayArg = REPLAY_LOAD;
}
else
{
logWrite(ERROR, "Error opening replay-load file: %s: %s", optarg,
strerror(errno));
}
}
else
{
logWrite(ERROR, "replay-load option was invoked when replay "
"was already set");
}
break;
case '?':
default:
usageMessage(argv[0]);
......@@ -263,6 +336,142 @@ void mainLoop(void)
}
}
// Returns true on success
bool replayWrite(char * source, int size)
{
bool result = true;
int error = write(global::replayfd, source, size);
if (error <= 0)
{
if (error == 0)
{
logWrite(EXCEPTION, "replayfd was closed unexpectedly");
}
else if (error == -1)
{
logWrite(EXCEPTION, "Error writing replay output: %s", strerror(errno));
}
result = false;
}
return result;
}
void replayWriteCommand(char * head, char * body, unsigned short bodySize)
{
bool success = true;
success = replayWrite(head, Header::headerSize);
if (success)
{
replayWrite(body, bodySize);
}
}
void replayWritePacket(int command, PacketInfo * packet)
{
Header head;
head.type = command;
head.size = PacketInfo::size;
head.key = packet->elab;
char headBuffer[Header::headerSize];
saveHeader(headBuffer, head);
bool success = replayWrite(headBuffer, Header::headerSize);
if (success)
{
char packetBuffer[PacketInfo::size];
savePacket(packetBuffer, *packet);
replayWrite(packetBuffer, PacketInfo::size);
}
}
// Returns true on success
bool replayRead(char * dest, int size)
{
bool result = true;
int error = read(global::replayfd, dest, size);
if (error <= 0)
{
if (error == 0)
{
logWrite(EXCEPTION, "End of replay input");
}
else if (error == -1)
{
logWrite(EXCEPTION, "Error reading replay input: %s", strerror(errno));
}
result = false;
}
return result;
}
void replayLoop(void)
{
bool done = false;
char headerBuffer[Header::headerSize];
Header head;
char packetBuffer[(PacketInfo::size > sizeof(SensorCommand))
? PacketInfo::size : sizeof(SensorCommand)];
struct tcp_info kernel;
IpHeader ip;
struct tcphdr tcp;
PacketInfo packet;
map<Order, SensorList> streams;
done = ! replayRead(headerBuffer, Header::headerSize);
while (!done)
{
loadHeader(headerBuffer, &head);
switch(head.type)
{
case NEW_CONNECTION_COMMAND:
streams.insert(make_pair(head.key, SensorList()));
break;
case DELETE_CONNECTION_COMMAND:
streams.erase(head.key);
break;
case SENSOR_COMMAND:
done = ! replayRead(packetBuffer, sizeof(int));
if (!done)
{
unsigned int sensorType = 0;
loadInt(packetBuffer, & sensorType);
SensorCommand sensor;
sensor.type = sensorType;
streams[head.key].addSensor(sensor);
}
break;
case PACKET_INFO_SEND_COMMAND:
case PACKET_INFO_ACK_COMMAND:
done = ! replayRead(packetBuffer, PacketInfo::size);
if (!done)
{
loadPacket(packetBuffer, &packet, kernel, ip, tcp);
Sensor * sensorHead = streams[head.key].getHead();
if (sensorHead != NULL)
{
if (head.type == PACKET_INFO_SEND_COMMAND)
{
sensorHead->captureSend(&packet);
}
else
{
sensorHead->captureAck(&packet);
}
}
}
break;
default:
logWrite(ERROR, "Invalid command type in replay input: %d", head.type);
done = true;
break;
}
if (!done)
{
done = ! replayRead(headerBuffer, Header::headerSize);
}
}
}
void writeToConnections(multimap<Time, Connection *> & schedule)
{
Time now = getCurrentTime();
......
......@@ -23,4 +23,4 @@ export LIBNETMON_OUTPUTFILE="/local/logs/libnetmon.out"
#
echo "Running PID $$"
echo "Starting magent on $PLAB_IFACE ($PLAB_IP) Extra arguments: $*"
exec $AS_ROOT $NETMON_DIR/instrument-standalone.sh $MAGENT_DIR/$MAGENT --interface=$PLAB_IFACE $*
exec $AS_ROOT $NETMON_DIR/instrument-standalone.sh $MAGENT_DIR/$MAGENT --interface=$PLAB_IFACE --replay-save=/local/logs/stub.replay $*
......@@ -72,6 +72,66 @@ char * saveHeader(char * buffer, Header const & value)
}
}
char * savePacket(char * buffer, PacketInfo const & value)
{
char * pos = buffer;
struct tcp_info const * kernel = value.kernel;
pos = saveInt(pos, value.packetTime.getTimeval()->tv_sec);
pos = saveInt(pos, value.packetTime.getTimeval()->tv_usec);
pos = saveInt(pos, value.packetLength);
pos = saveChar(pos, kernel->tcpi_state);
pos = saveChar(pos, kernel->tcpi_ca_state);
pos = saveChar(pos, kernel->tcpi_retransmits);
pos = saveChar(pos, kernel->tcpi_probes);
pos = saveChar(pos, kernel->tcpi_backoff);
pos = saveChar(pos, kernel->tcpi_options);
unsigned char windowScale = (kernel->tcpi_snd_wscale & 0xf)
| ((kernel->tcpi_rcv_wscale & 0xf) << 4);
pos = saveChar(pos, windowScale);
pos = saveInt(pos, kernel->tcpi_rto);
pos = saveInt(pos, kernel->tcpi_ato);
pos = saveInt(pos, kernel->tcpi_snd_mss);
pos = saveInt(pos, kernel->tcpi_rcv_mss);
pos = saveInt(pos, kernel->tcpi_unacked);
pos = saveInt(pos, kernel->tcpi_sacked);
pos = saveInt(pos, kernel->tcpi_lost);
pos = saveInt(pos, kernel->tcpi_retrans);
pos = saveInt(pos, kernel->tcpi_fackets);
pos = saveInt(pos, kernel->tcpi_last_data_sent);
pos = saveInt(pos, kernel->tcpi_last_ack_sent);
pos = saveInt(pos, kernel->tcpi_last_data_recv);
pos = saveInt(pos, kernel->tcpi_last_ack_recv);
pos = saveInt(pos, kernel->tcpi_pmtu);
pos = saveInt(pos, kernel->tcpi_rcv_ssthresh);
pos = saveInt(pos, kernel->tcpi_rtt);
pos = saveInt(pos, kernel->tcpi_rttvar);
pos = saveInt(pos, kernel->tcpi_snd_ssthresh);
pos = saveInt(pos, kernel->tcpi_snd_cwnd);
pos = saveInt(pos, kernel->tcpi_advmss);
pos = saveInt(pos, kernel->tcpi_reordering);
memcpy(pos, value.ip, sizeof(IpHeader));
pos += sizeof(IpHeader);
memcpy(pos, value.tcp, sizeof(struct tcphdr));
pos += sizeof(struct tcphdr);
pos = saveChar(pos, value.elab.transport);
pos = saveInt(pos, value.elab.ip);
pos = saveShort(pos, value.elab.localPort);
pos = saveShort(pos, value.elab.remotePort);
unsigned char bufferFull = value.bufferFull;
pos = saveChar(pos, bufferFull);
return pos;
}
char * loadChar(char * buffer, unsigned char * value)
{
if (buffer == NULL)
......@@ -216,3 +276,75 @@ auto_ptr<Command> loadCommand(Header * head, char * body)
result->key = head->key;
return result;
}
char * loadPacket(char * buffer, PacketInfo * value, struct tcp_info & kernel,
IpHeader & ip, struct tcphdr & tcp)
{
char * pos = buffer;
value->kernel = &kernel;
value->ip = &ip;
value->tcp = &tcp;
unsigned int timeSeconds = 0;
pos = loadInt(pos, & timeSeconds);
value->packetTime.getTimeval()->tv_sec = timeSeconds;
unsigned int timeMicroseconds = 0;
pos = loadInt(pos, & timeMicroseconds);
value->packetTime.getTimeval()->tv_usec = timeMicroseconds;
unsigned int packetLength = 0;
pos = loadInt(pos, & packetLength);
value->packetLength = static_cast<int>(packetLength);
pos = loadChar(pos, & kernel.tcpi_state);
pos = loadChar(pos, & kernel.tcpi_ca_state);
pos = loadChar(pos, & kernel.tcpi_retransmits);
pos = loadChar(pos, & kernel.tcpi_probes);
pos = loadChar(pos, & kernel.tcpi_backoff);
pos = loadChar(pos, & kernel.tcpi_options);
unsigned char windowScale = 0;
pos = loadChar(pos, &windowScale);
kernel.tcpi_snd_wscale = windowScale & 0xf;
kernel.tcpi_rcv_wscale = (windowScale >> 4) & 0xf;
pos = loadInt(pos, & kernel.tcpi_rto);
pos = loadInt(pos, & kernel.tcpi_ato);
pos = loadInt(pos, & kernel.tcpi_snd_mss);
pos = loadInt(pos, & kernel.tcpi_rcv_mss);
pos = loadInt(pos, & kernel.tcpi_unacked);
pos = loadInt(pos, & kernel.tcpi_sacked);
pos = loadInt(pos, & kernel.tcpi_lost);
pos = loadInt(pos, & kernel.tcpi_retrans);
pos = loadInt(pos, & kernel.tcpi_fackets);
pos = loadInt(pos, & kernel.tcpi_last_data_sent);
pos = loadInt(pos, & kernel.tcpi_last_ack_sent);
pos = loadInt(pos, & kernel.tcpi_last_data_recv);
pos = loadInt(pos, & kernel.tcpi_last_ack_recv);
pos = loadInt(pos, & kernel.tcpi_pmtu);
pos = loadInt(pos, & kernel.tcpi_rcv_ssthresh);
pos = loadInt(pos, & kernel.tcpi_rtt);
pos = loadInt(pos, & kernel.tcpi_rttvar);
pos = loadInt(pos, & kernel.tcpi_snd_ssthresh);
pos = loadInt(pos, & kernel.tcpi_snd_cwnd);
pos = loadInt(pos, & kernel.tcpi_advmss);
pos = loadInt(pos, & kernel.tcpi_reordering);
memcpy(&ip, pos, sizeof(IpHeader));
pos += sizeof(IpHeader);
memcpy(&tcp, pos, sizeof(struct tcphdr));
pos += sizeof(struct tcphdr);