diff --git a/pelab/magent/Connection.cc b/pelab/magent/Connection.cc index 44b95ec6e63242a9f55ef226d030de7abd852e76..0a8201b277dda3da430ad4fe91533bc70a86f2a0 100755 --- a/pelab/magent/Connection.cc +++ b/pelab/magent/Connection.cc @@ -137,25 +137,28 @@ void Connection::captureSend(PacketInfo * packet) { logWrite(SENSOR, "Captured a send packet"); Sensor * head = measurements.getHead(); + packet->elab = elab; + packet->bufferFull = bufferFull; if (head != NULL && isConnected) { - packet->elab = elab; - packet->bufferFull = bufferFull; head->captureSend(packet); } + replayWritePacket(PACKET_INFO_SEND_COMMAND, packet); } void Connection::captureAck(PacketInfo * packet) { logWrite(SENSOR, "Captured an ack packet"); Sensor * head = measurements.getHead(); + packet->elab = elab; + packet->bufferFull = bufferFull; if (head != NULL && isConnected) { - packet->elab = elab; - packet->bufferFull = bufferFull; head->captureAck(packet); } + replayWritePacket(PACKET_INFO_ACK_COMMAND, packet); } + ConnectionModel const * Connection::getConnectionModel(void) { return peer.get(); diff --git a/pelab/magent/ConnectionModel.h b/pelab/magent/ConnectionModel.h index bc86a2df343c3dcb070ec7673578a7fc8f482cde..703ee8f51f1358f49c0d3df41f8badd38a8aac41 100644 --- a/pelab/magent/ConnectionModel.h +++ b/pelab/magent/ConnectionModel.h @@ -16,6 +16,13 @@ public: // an error. Errno is not preserved. virtual int writeMessage(int size, WriteResult & result)=0; virtual bool isConnected(void)=0; + + // These are static members which are used. They are called from + // switch statements in main.cc + // static void init(void); + // static void addNewPeer(fd_set * readable); + // static void readFromPeers(fd_set * readable); + // static void packetCapture(fd_set * readable); }; class ConnectionModelNull : public ConnectionModel diff --git a/pelab/magent/DirectInput.cc b/pelab/magent/DirectInput.cc index 9892a6eb9099aca08798fd59862ca62fd9e569c0..90c3460e09628b3d84f292013f122d4197964fdc 100644 --- a/pelab/magent/DirectInput.cc +++ b/pelab/magent/DirectInput.cc @@ -93,6 +93,13 @@ void DirectInput::nextCommand(fd_set * readable) { // logWrite(COMMAND_INPUT, "Finished reading a command: CHECKSUM=%d", // checksum()); + if (global::replayArg == REPLAY_SAVE + && (commandHeader.type == NEW_CONNECTION_COMMAND + || commandHeader.type == DELETE_CONNECTION_COMMAND + || commandHeader.type == SENSOR_COMMAND)) + { + replayWriteCommand(headerBuffer, bodyBuffer, commandHeader.size); + } currentCommand = loadCommand(&commandHeader, bodyBuffer); index = 0; state = HEADER; diff --git a/pelab/magent/Time.cc b/pelab/magent/Time.cc index 614b69dd9e257383da388a9942c73fac2b37168a..fb1d3bfa35ff39c8bca07a2e63cc8c953bdfa2ae 100644 --- a/pelab/magent/Time.cc +++ b/pelab/magent/Time.cc @@ -27,6 +27,11 @@ struct timeval * Time::getTimeval(void) return &data; } +struct timeval const * Time::getTimeval(void) const +{ + return &data; +} + Time Time::operator+(int const & right) const { Time result; diff --git a/pelab/magent/Time.h b/pelab/magent/Time.h index 75f299f6bcf425232e1407790aa01f5bfd9c5a70..7fb50e5e88daebaee5e9a8e7ab800390d2fc0d47 100755 --- a/pelab/magent/Time.h +++ b/pelab/magent/Time.h @@ -10,6 +10,7 @@ public: Time(struct timeval const & newData); long long toMilliseconds(void) const; struct timeval * getTimeval(void); + struct timeval const * getTimeval(void) const; Time operator+(int const & right) const; Time operator-(Time const & right) const; bool operator<(Time const & right) const; diff --git a/pelab/magent/lib.h b/pelab/magent/lib.h index a323822c862d7c0c8c139e0c6f05e778ddd62a6a..22bf0af12452118f098b4fd4aafa7e3c34063701 100755 --- a/pelab/magent/lib.h +++ b/pelab/magent/lib.h @@ -51,6 +51,12 @@ int createServer(int port, std::string const & debugString); int acceptServer(int acceptfd, struct sockaddr_in * remoteAddress, std::string const & debugString); +class PacketInfo; + +void replayWriteCommand(char * head, char * body, unsigned short bodySize); +void replayWritePacket(int command, PacketInfo * packet); + + // Enum of header types -- to-monitor (Events, etc.) enum { @@ -67,7 +73,10 @@ enum SENSOR_COMMAND = 3, CONNECT_COMMAND = 4, TRAFFIC_WRITE_COMMAND = 5, - DELETE_CONNECTION_COMMAND = 6 + DELETE_CONNECTION_COMMAND = 6, + // These are pseudo-commands for replay. + PACKET_INFO_SEND_COMMAND = 7, + PACKET_INFO_ACK_COMMAND = 8 }; // This is used for the type field in a SensorCommand @@ -177,6 +186,12 @@ struct IpHeader struct PacketInfo { + // packetTime+packetLength+kernel+elab + enum {size = sizeof(int)*(2 +1 +21 + 1) + + sizeof(short)*(0 +0 +0 + 2) + + sizeof(char)*(0 +0 +7 + 1) + + sizeof(IpHeader) + sizeof(struct tcphdr)}; + Time packetTime; int packetLength; struct tcp_info const * kernel; @@ -192,6 +207,13 @@ enum CONNECTION_MODEL_KERNEL = 1 }; +enum +{ + NO_REPLAY = 0, + REPLAY_LOAD = 1, + REPLAY_SAVE = 2 +}; + class ConnectionModel; class Connection; class CommandInput; @@ -203,6 +225,8 @@ namespace global extern unsigned short peerServerPort; extern unsigned short monitorServerPort; extern bool doDaemonize; + extern int replayArg; + extern int replayfd; extern int peerAccept; extern std::auto_ptr connectionModelExemplar; diff --git a/pelab/magent/main.cc b/pelab/magent/main.cc index 7b5b76cce2bf4b5e65213d802ac959f7a427fa71..1784dfce708b63546f5d3efa79dfc167988153f3 100755 --- a/pelab/magent/main.cc +++ b/pelab/magent/main.cc @@ -25,6 +25,8 @@ namespace global unsigned short peerServerPort = 0; unsigned short monitorServerPort = 0; bool doDaemonize = false; + int replayArg = NO_REPLAY; + int replayfd = -1; int peerAccept = -1; string interface; @@ -46,6 +48,7 @@ void usageMessage(char *progname); void processArgs(int argc, char * argv[]); void init(void); void mainLoop(void); +void replayLoop(void); void writeToConnections(multimap & schedule); void addNewPeer(fd_set * readable); void readFromPeers(fd_set * readable); @@ -53,6 +56,7 @@ void packetCapture(fd_set * readable); int main(int argc, char * argv[]) { + umask(0); processArgs(argc, argv); init(); if (global::doDaemonize) @@ -64,17 +68,27 @@ int main(int argc, char * argv[]) logWrite(ERROR, "Daemonization failed: %s", strerror(errno)); } } - mainLoop(); + if (global::replayArg == REPLAY_LOAD) + { + replayLoop(); + } + else + { + mainLoop(); + } return 0; } -void usageMessage(char *progname) { +void usageMessage(char *progname) +{ cerr << "Usage: " << progname << " [options]" << endl; cerr << " --connectionmodel= " << endl; cerr << " --peerserverport= " << endl; cerr << " --monitorserverport= " << endl; cerr << " --interface= " << endl; cerr << " --daemonize " << endl; + cerr << " --replay-save= " << endl; + cerr << " --replay-loae= " << endl; logWrite(ERROR, "Bad command line argument", global::connectionModelArg); } @@ -86,6 +100,8 @@ void processArgs(int argc, char * argv[]) global::monitorServerPort = 4200; global::interface = "vnet"; global::doDaemonize = false; + global::replayArg = NO_REPLAY; + global::replayfd = -1; static struct option longOptions[] = { // If you modify this, make sure to modify the shortOptions string below @@ -95,22 +111,26 @@ void processArgs(int argc, char * argv[]) {"monitorserverport", required_argument, NULL, 'm'}, {"interface", required_argument, NULL, 'i'}, {"daemonize", no_argument , NULL, 'd'}, + {"replay-save", required_argument, NULL, 's'}, + {"replay-load", required_argument, NULL, 'l'}, // Required so that getopt_long can find the end of the list {NULL, 0, NULL, 0} }; - string shortOptions = "c:p:m:i:d"; + string shortOptions = "c:p:m:i:ds:l:"; // Not used int longIndex; // Loop until all options have been handled - while (true) { + while (true) + { int ch = getopt_long(argc, argv, shortOptions.c_str(), longOptions, &longIndex); // Detect end of options - if (ch == -1) { + if (ch == -1) + { break; } @@ -118,47 +138,100 @@ void processArgs(int argc, char * argv[]) // Make a copy of optarg that's more c++-y. string optArg; - if (optarg != NULL) { + if (optarg != NULL) + { optArg = optarg; } - switch ((char) ch) { - case 'c': - if (optArg == "null") { - global::connectionModelArg = CONNECTION_MODEL_NULL; - } else if (optArg == "kerneltcp") { - global::connectionModelArg = CONNECTION_MODEL_KERNEL; - } else { - usageMessage(argv[0]); - exit(1); + switch ((char) ch) + { + case 'c': + if (optArg == "null") { + global::connectionModelArg = CONNECTION_MODEL_NULL; + } + else if (optArg == "kerneltcp") + { + global::connectionModelArg = CONNECTION_MODEL_KERNEL; + } + else + { + usageMessage(argv[0]); + exit(1); + } + break; + case 'p': + if (sscanf(optarg,"%i",&argIntVal)) + { + usageMessage(argv[0]); + exit(1); + } + else + { + global::peerServerPort = argIntVal; + } + break; + case 'm': + if (sscanf(optarg,"%i",&argIntVal)) + { + usageMessage(argv[0]); + exit(1); + } + else + { + global::monitorServerPort = argIntVal; + } + break; + case 'i': + global::interface = optArg; + break; + case 'd': + global::doDaemonize = true; + break; + case 's': + if (global::replayArg == NO_REPLAY) + { + global::replayfd = open(optarg, O_WRONLY | O_CREAT | O_TRUNC, + S_IRWXU | S_IRWXG | S_IRWXO); + if (global::replayfd != -1) + { + global::replayArg = REPLAY_SAVE; } - break; - case 'p': - if (sscanf(optarg,"%i",&argIntVal)) { - usageMessage(argv[0]); - exit(1); - } else { - global::peerServerPort = argIntVal; + else + { + logWrite(ERROR, "Error opening replay-save file: %s: %s", optarg, + strerror(errno)); } - break; - case 'm': - if (sscanf(optarg,"%i",&argIntVal)) { - usageMessage(argv[0]); - exit(1); - } else { - global::monitorServerPort = argIntVal; + } + else + { + logWrite(ERROR, "replay-save option was invoked when replay " + "was already set"); + } + break; + case 'l': + if (global::replayArg == NO_REPLAY) + { + global::replayfd = open(optarg, O_RDONLY); + if (global::replayfd != -1) + { + global::replayArg = REPLAY_LOAD; } - break; - case 'i': - global::interface = optArg; - break; - case 'd': - global::doDaemonize = true; - break; - case '?': - default: - usageMessage(argv[0]); - exit(1); + else + { + logWrite(ERROR, "Error opening replay-load file: %s: %s", optarg, + strerror(errno)); + } + } + else + { + logWrite(ERROR, "replay-load option was invoked when replay " + "was already set"); + } + break; + case '?': + default: + usageMessage(argv[0]); + exit(1); } } } @@ -263,6 +336,142 @@ void mainLoop(void) } } +// Returns true on success +bool replayWrite(char * source, int size) +{ + bool result = true; + int error = write(global::replayfd, source, size); + if (error <= 0) + { + if (error == 0) + { + logWrite(EXCEPTION, "replayfd was closed unexpectedly"); + } + else if (error == -1) + { + logWrite(EXCEPTION, "Error writing replay output: %s", strerror(errno)); + } + result = false; + } + return result; +} + +void replayWriteCommand(char * head, char * body, unsigned short bodySize) +{ + bool success = true; + success = replayWrite(head, Header::headerSize); + if (success) + { + replayWrite(body, bodySize); + } +} + +void replayWritePacket(int command, PacketInfo * packet) +{ + Header head; + head.type = command; + head.size = PacketInfo::size; + head.key = packet->elab; + char headBuffer[Header::headerSize]; + saveHeader(headBuffer, head); + + bool success = replayWrite(headBuffer, Header::headerSize); + if (success) + { + char packetBuffer[PacketInfo::size]; + savePacket(packetBuffer, *packet); + replayWrite(packetBuffer, PacketInfo::size); + } +} + +// Returns true on success +bool replayRead(char * dest, int size) +{ + bool result = true; + int error = read(global::replayfd, dest, size); + if (error <= 0) + { + if (error == 0) + { + logWrite(EXCEPTION, "End of replay input"); + } + else if (error == -1) + { + logWrite(EXCEPTION, "Error reading replay input: %s", strerror(errno)); + } + result = false; + } + return result; +} + +void replayLoop(void) +{ + bool done = false; + char headerBuffer[Header::headerSize]; + Header head; + char packetBuffer[(PacketInfo::size > sizeof(SensorCommand)) + ? PacketInfo::size : sizeof(SensorCommand)]; + struct tcp_info kernel; + IpHeader ip; + struct tcphdr tcp; + PacketInfo packet; + map streams; + + done = ! replayRead(headerBuffer, Header::headerSize); + while (!done) + { + loadHeader(headerBuffer, &head); + switch(head.type) + { + case NEW_CONNECTION_COMMAND: + streams.insert(make_pair(head.key, SensorList())); + break; + case DELETE_CONNECTION_COMMAND: + streams.erase(head.key); + break; + case SENSOR_COMMAND: + done = ! replayRead(packetBuffer, sizeof(int)); + if (!done) + { + unsigned int sensorType = 0; + loadInt(packetBuffer, & sensorType); + SensorCommand sensor; + sensor.type = sensorType; + streams[head.key].addSensor(sensor); + } + break; + case PACKET_INFO_SEND_COMMAND: + case PACKET_INFO_ACK_COMMAND: + done = ! replayRead(packetBuffer, PacketInfo::size); + if (!done) + { + loadPacket(packetBuffer, &packet, kernel, ip, tcp); + Sensor * sensorHead = streams[head.key].getHead(); + if (sensorHead != NULL) + { + if (head.type == PACKET_INFO_SEND_COMMAND) + { + sensorHead->captureSend(&packet); + } + else + { + sensorHead->captureAck(&packet); + } + } + } + break; + default: + logWrite(ERROR, "Invalid command type in replay input: %d", head.type); + done = true; + break; + } + if (!done) + { + done = ! replayRead(headerBuffer, Header::headerSize); + } + } +} + void writeToConnections(multimap & schedule) { Time now = getCurrentTime(); diff --git a/pelab/magent/run-magent.sh b/pelab/magent/run-magent.sh index 27774b8879527bd4ee0198c7135c7357813ad38c..ec5c88dbea290d8ed5a3435815e1fd50cd1154ea 100644 --- a/pelab/magent/run-magent.sh +++ b/pelab/magent/run-magent.sh @@ -23,4 +23,4 @@ export LIBNETMON_OUTPUTFILE="/local/logs/libnetmon.out" # echo "Running PID $$" echo "Starting magent on $PLAB_IFACE ($PLAB_IP) Extra arguments: $*" -exec $AS_ROOT $NETMON_DIR/instrument-standalone.sh $MAGENT_DIR/$MAGENT --interface=$PLAB_IFACE $* +exec $AS_ROOT $NETMON_DIR/instrument-standalone.sh $MAGENT_DIR/$MAGENT --interface=$PLAB_IFACE --replay-save=/local/logs/stub.replay $* diff --git a/pelab/magent/saveload.cc b/pelab/magent/saveload.cc index 8c477d6c5d2b650d4a7ee0210fcf5e61544ea173..67f7d2ce93d82c6a3f5bbe3923800034f63c3fe5 100644 --- a/pelab/magent/saveload.cc +++ b/pelab/magent/saveload.cc @@ -72,6 +72,66 @@ char * saveHeader(char * buffer, Header const & value) } } +char * savePacket(char * buffer, PacketInfo const & value) +{ + char * pos = buffer; + struct tcp_info const * kernel = value.kernel; + + pos = saveInt(pos, value.packetTime.getTimeval()->tv_sec); + pos = saveInt(pos, value.packetTime.getTimeval()->tv_usec); + pos = saveInt(pos, value.packetLength); + pos = saveChar(pos, kernel->tcpi_state); + pos = saveChar(pos, kernel->tcpi_ca_state); + pos = saveChar(pos, kernel->tcpi_retransmits); + pos = saveChar(pos, kernel->tcpi_probes); + pos = saveChar(pos, kernel->tcpi_backoff); + pos = saveChar(pos, kernel->tcpi_options); + unsigned char windowScale = (kernel->tcpi_snd_wscale & 0xf) + | ((kernel->tcpi_rcv_wscale & 0xf) << 4); + pos = saveChar(pos, windowScale); + + pos = saveInt(pos, kernel->tcpi_rto); + pos = saveInt(pos, kernel->tcpi_ato); + pos = saveInt(pos, kernel->tcpi_snd_mss); + pos = saveInt(pos, kernel->tcpi_rcv_mss); + + pos = saveInt(pos, kernel->tcpi_unacked); + pos = saveInt(pos, kernel->tcpi_sacked); + pos = saveInt(pos, kernel->tcpi_lost); + pos = saveInt(pos, kernel->tcpi_retrans); + pos = saveInt(pos, kernel->tcpi_fackets); + + pos = saveInt(pos, kernel->tcpi_last_data_sent); + pos = saveInt(pos, kernel->tcpi_last_ack_sent); + pos = saveInt(pos, kernel->tcpi_last_data_recv); + pos = saveInt(pos, kernel->tcpi_last_ack_recv); + + pos = saveInt(pos, kernel->tcpi_pmtu); + pos = saveInt(pos, kernel->tcpi_rcv_ssthresh); + pos = saveInt(pos, kernel->tcpi_rtt); + pos = saveInt(pos, kernel->tcpi_rttvar); + pos = saveInt(pos, kernel->tcpi_snd_ssthresh); + pos = saveInt(pos, kernel->tcpi_snd_cwnd); + pos = saveInt(pos, kernel->tcpi_advmss); + pos = saveInt(pos, kernel->tcpi_reordering); + + memcpy(pos, value.ip, sizeof(IpHeader)); + pos += sizeof(IpHeader); + + memcpy(pos, value.tcp, sizeof(struct tcphdr)); + pos += sizeof(struct tcphdr); + + pos = saveChar(pos, value.elab.transport); + pos = saveInt(pos, value.elab.ip); + pos = saveShort(pos, value.elab.localPort); + pos = saveShort(pos, value.elab.remotePort); + + unsigned char bufferFull = value.bufferFull; + pos = saveChar(pos, bufferFull); + + return pos; +} + char * loadChar(char * buffer, unsigned char * value) { if (buffer == NULL) @@ -216,3 +276,75 @@ auto_ptr loadCommand(Header * head, char * body) result->key = head->key; return result; } + +char * loadPacket(char * buffer, PacketInfo * value, struct tcp_info & kernel, + IpHeader & ip, struct tcphdr & tcp) +{ + char * pos = buffer; + value->kernel = &kernel; + value->ip = &ip; + value->tcp = &tcp; + + unsigned int timeSeconds = 0; + pos = loadInt(pos, & timeSeconds); + value->packetTime.getTimeval()->tv_sec = timeSeconds; + unsigned int timeMicroseconds = 0; + pos = loadInt(pos, & timeMicroseconds); + value->packetTime.getTimeval()->tv_usec = timeMicroseconds; + unsigned int packetLength = 0; + pos = loadInt(pos, & packetLength); + value->packetLength = static_cast(packetLength); + pos = loadChar(pos, & kernel.tcpi_state); + pos = loadChar(pos, & kernel.tcpi_ca_state); + pos = loadChar(pos, & kernel.tcpi_retransmits); + pos = loadChar(pos, & kernel.tcpi_probes); + pos = loadChar(pos, & kernel.tcpi_backoff); + pos = loadChar(pos, & kernel.tcpi_options); + unsigned char windowScale = 0; + pos = loadChar(pos, &windowScale); + kernel.tcpi_snd_wscale = windowScale & 0xf; + kernel.tcpi_rcv_wscale = (windowScale >> 4) & 0xf; + + pos = loadInt(pos, & kernel.tcpi_rto); + pos = loadInt(pos, & kernel.tcpi_ato); + pos = loadInt(pos, & kernel.tcpi_snd_mss); + pos = loadInt(pos, & kernel.tcpi_rcv_mss); + + pos = loadInt(pos, & kernel.tcpi_unacked); + pos = loadInt(pos, & kernel.tcpi_sacked); + pos = loadInt(pos, & kernel.tcpi_lost); + pos = loadInt(pos, & kernel.tcpi_retrans); + pos = loadInt(pos, & kernel.tcpi_fackets); + + pos = loadInt(pos, & kernel.tcpi_last_data_sent); + pos = loadInt(pos, & kernel.tcpi_last_ack_sent); + pos = loadInt(pos, & kernel.tcpi_last_data_recv); + pos = loadInt(pos, & kernel.tcpi_last_ack_recv); + + pos = loadInt(pos, & kernel.tcpi_pmtu); + pos = loadInt(pos, & kernel.tcpi_rcv_ssthresh); + pos = loadInt(pos, & kernel.tcpi_rtt); + pos = loadInt(pos, & kernel.tcpi_rttvar); + pos = loadInt(pos, & kernel.tcpi_snd_ssthresh); + pos = loadInt(pos, & kernel.tcpi_snd_cwnd); + pos = loadInt(pos, & kernel.tcpi_advmss); + pos = loadInt(pos, & kernel.tcpi_reordering); + + memcpy(&ip, pos, sizeof(IpHeader)); + pos += sizeof(IpHeader); + + memcpy(&tcp, pos, sizeof(struct tcphdr)); + pos += sizeof(struct tcphdr); + + pos = loadChar(pos, & value->elab.transport); + pos = loadInt(pos, & value->elab.ip); + pos = loadShort(pos, & value->elab.localPort); + pos = loadShort(pos, & value->elab.remotePort); + + unsigned char bufferFull = 0; + pos = loadChar(pos, &bufferFull); + value->bufferFull = (bufferFull == 1); + + return pos; +} + diff --git a/pelab/magent/saveload.h b/pelab/magent/saveload.h index 09217a52cd861da5ea2d627c80511731fb85b15f..3ff455d0d96fbebfa70ebc18622db97fc22893be 100644 --- a/pelab/magent/saveload.h +++ b/pelab/magent/saveload.h @@ -19,14 +19,23 @@ public: + sizeof(unsigned int) }; }; +// These take in a current position in the buffer and return the +// position after the saved or loaded item. + char * saveChar(char * buffer, unsigned char value); char * saveShort(char * buffer, unsigned short value); char * saveInt(char * buffer, unsigned int value); char * saveHeader(char * buffer, Header const & value); +char * savePacket(char * buffer, PacketInfo const & value); + char * loadChar(char * buffer, unsigned char * value); char * loadShort(char * buffer, unsigned short * value); char * loadInt(char * buffer, unsigned int * value); char * loadHeader(char * buffer, Header * value); std::auto_ptr loadCommand(Header * head, char * body); +// It is presumed that value contains pointers to the various +// substructures that need to be filled. +char * loadPacket(char * buffer, PacketInfo * value, struct tcp_info & kernel, + IpHeader & ip, struct tcphdr & tcp); #endif