Commit b53340e6 authored by Jonathon Duerig's avatar Jonathon Duerig

Host of fixes making the monitor and magent interoperate. Added a --daemonize option.

parent d6dd8938
......@@ -12,6 +12,7 @@ using namespace std;
void NewConnectionCommand::run(std::multimap<Time, Connection *> &)
{
logWrite(COMMAND_INPUT, "Running NEW_CONNECTION_COMMAND");
std::map<Order, Connection>::iterator pos
= global::connections.find(key);
if (pos == global::connections.end())
......@@ -31,6 +32,7 @@ void NewConnectionCommand::runConnect(Connection *,
void TrafficModelCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
logWrite(COMMAND_INPUT, "Running TRAFFIC_MODEL_COMMAND");
std::auto_ptr<TrafficModel> model(new CircularTraffic());
conn->setTraffic(model);
}
......@@ -40,6 +42,7 @@ void TrafficModelCommand::runConnect(Connection * conn,
void ConnectionModelCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
logWrite(COMMAND_INPUT, "Running CONNECTION_MODEL_COMMAND");
conn->addConnectionModelParam(*this);
}
......@@ -48,6 +51,7 @@ void ConnectionModelCommand::runConnect(Connection * conn,
void SensorCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
logWrite(COMMAND_INPUT, "Running SENSOR_COMMAND");
conn->addSensor(*this);
}
......@@ -56,6 +60,7 @@ void SensorCommand::runConnect(Connection * conn,
void ConnectCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
logWrite(COMMAND_INPUT, "Running CONNECT_COMMAND");
conn->connect();
}
......@@ -64,6 +69,7 @@ void ConnectCommand::runConnect(Connection * conn,
void TrafficWriteCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> & schedule)
{
logWrite(COMMAND_INPUT, "Running TRAFFIC_WRITE_COMMAND");
conn->addTrafficWrite(*this, schedule);
}
......@@ -71,6 +77,7 @@ void TrafficWriteCommand::runConnect(Connection * conn,
void DeleteConnectionCommand::run(std::multimap<Time, Connection *> & schedule)
{
logWrite(COMMAND_INPUT, "Running DELETE_CONNECTION_COMMAND");
std::map<Order, Connection>::iterator pos
= global::connections.find(key);
if (pos != global::connections.end())
......
......@@ -14,24 +14,39 @@ Connection::Connection()
: isConnected(false)
, bufferFull(false)
{
logWrite(CONNECTION, "Connection created");
}
Connection::Connection(Connection const & right)
: peer(right.peer->clone())
, traffic(right.traffic->clone())
, measurements(right.measurements)
: measurements(right.measurements)
, isConnected(right.isConnected)
, bufferFull(right.bufferFull)
, nextWrite(right.nextWrite)
{
logWrite(CONNECTION, "Connection copy constructed");
if (right.peer.get() != NULL)
{
peer = right.peer->clone();
}
if (right.traffic.get() != NULL)
{
traffic = right.traffic->clone();
}
}
Connection & Connection::operator=(Connection const & right)
{
logWrite(CONNECTION, "Connection assigned");
if (this != &right)
{
peer = right.peer->clone();
traffic = right.traffic->clone();
if (right.peer.get() != NULL)
{
peer = right.peer->clone();
}
if (right.traffic.get() != NULL)
{
traffic = right.traffic->clone();
}
measurements = right.measurements;
isConnected = right.isConnected;
bufferFull = right.bufferFull;
......@@ -43,17 +58,20 @@ Connection & Connection::operator=(Connection const & right)
void Connection::reset(Order const & newElab,
std::auto_ptr<ConnectionModel> newPeer)
{
logWrite(CONNECTION, "Peer added to connection");
elab = newElab;
peer = newPeer;
}
void Connection::setTraffic(std::auto_ptr<TrafficModel> newTraffic)
{
logWrite(CONNECTION, "Traffic Model added to connection");
traffic = newTraffic;
}
void Connection::addConnectionModelParam(ConnectionModelCommand const & param)
{
logWrite(CONNECTION, "Connection Model Parameter added to connection");
if (peer.get() != NULL)
{
peer->addParam(param);
......@@ -67,6 +85,7 @@ void Connection::addConnectionModelParam(ConnectionModelCommand const & param)
void Connection::connect(void)
{
logWrite(CONNECTION, "Connection connected");
if (peer.get() != NULL)
{
planet.transport = TCP_CONNECTION;
......@@ -108,6 +127,7 @@ void Connection::addTrafficWrite(TrafficWriteCommand const & newWrite,
void Connection::addSensor(SensorCommand const & newSensor)
{
logWrite(CONNECTION, "Sensor added to connection");
measurements.addSensor(newSensor);
}
......@@ -160,6 +180,7 @@ Time Connection::writeToConnection(Time const & previousTime)
void Connection::cleanup(std::multimap<Time, Connection *> & schedule)
{
logWrite(CONNECTION, "Connection cleanup");
if (isConnected)
{
global::planetMap.erase(planet);
......
......@@ -12,64 +12,52 @@ DirectInput::DirectInput()
, monitorSocket(-1)
, index(0)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
{
logWrite(ERROR, "Unable to generate a command accept socket. "
"No incoming command connections will ever be accepted: %s",
strerror(errno));
}
else
{
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(global::monitorServerPort);
address.sin_addr.s_addr = INADDR_ANY;
int error = bind(sockfd, reinterpret_cast<struct sockaddr *>(&address),
sizeof(struct sockaddr));
if (error == -1)
{
logWrite(ERROR, "Unable to bind a command accept socket. "
"No incoming command connections will ever be accepted: %s",
strerror(errno));
close(sockfd);
}
else
{
setDescriptor(sockfd);
monitorAccept = sockfd;
}
}
logWrite(COMMAND_INPUT, "Creating the command accept socket");
monitorAccept = createServer(global::monitorServerPort,
"Command accept socket (No incoming command "
"connections will be accepted.");
}
DirectInput::~DirectInput()
{
logWrite(COMMAND_INPUT, "Destroying DirectInput()");
if (monitorAccept != -1)
{
clearDescriptor(monitorAccept);
close(monitorAccept);
}
if (monitorSocket != -1)
{
clearDescriptor(monitorSocket);
close(monitorSocket);
}
}
void DirectInput::nextCommand(fd_set * readable)
{
currentCommand.reset(NULL);
// logWrite(COMMAND_INPUT, "Before ACCEPTING check");
if (state == ACCEPTING && monitorAccept != -1
&& FD_ISSET(monitorAccept, readable))
{
logWrite(COMMAND_INPUT, "Creating the command socket");
monitorSocket = acceptServer(monitorAccept, NULL,
"Command socket (Incoming command "
"connection was not accepted)");
if (monitorSocket != -1)
{
state = HEADER;
}
}
if (monitorSocket == -1)
{
state = ACCEPTING;
}
if (state == HEADER && FD_ISSET(monitorSocket, readable))
// logWrite(COMMAND_INPUT, "Before HEADER check");
if (state == HEADER && monitorSocket != -1
&& FD_ISSET(monitorSocket, readable))
{
int error = recv(monitorSocket, headerBuffer + index,
Header::headerSize - index, 0);
if (error == Header::headerSize - index)
{
// logWrite(COMMAND_INPUT, "Finished reading a command header");
loadHeader(headerBuffer, &commandHeader);
index = 0;
state = BODY;
......@@ -80,20 +68,31 @@ void DirectInput::nextCommand(fd_set * readable)
}
else if (error == 0)
{
logWrite(EXCEPTION, "Read count of 0 returned (state BODY): %s",
strerror(errno));
disconnect();
}
else if (error == -1)
{
logWrite(EXCEPTION, "Failed read on monitorSocket state HEADER: %s",
logWrite(EXCEPTION, "Failed read on monitorSocket "
"(state HEADER) index=%d: %s", index,
strerror(errno));
}
}
if (state == BODY && FD_ISSET(monitorSocket, readable))
// logWrite(COMMAND_INPUT, "Before BODY check");
if (state == BODY && monitorSocket != -1
&& FD_ISSET(monitorSocket, readable))
{
int error = recv(monitorSocket, bodyBuffer + index,
commandHeader.size - index, 0);
int error = 0;
if (index != commandHeader.size)
{
error = recv(monitorSocket, bodyBuffer + index,
commandHeader.size - index, 0);
}
if (error == commandHeader.size - index)
{
// logWrite(COMMAND_INPUT, "Finished reading a command: CHECKSUM=%d",
// checksum());
currentCommand = loadCommand(&commandHeader, bodyBuffer);
index = 0;
state = HEADER;
......@@ -104,14 +103,19 @@ void DirectInput::nextCommand(fd_set * readable)
}
else if (error == 0)
{
logWrite(EXCEPTION, "Read count of 0 returned (state BODY): %s",
strerror(errno));
disconnect();
}
else if (error == -1)
{
logWrite(EXCEPTION, "Failed read on monitorSocket state BODY: %s",
logWrite(EXCEPTION, "Failed read on monitorSocket "
"(state BODY) index=%d: %s", index,
strerror(errno));
}
}
// logWrite(EXCEPTION, "monitorSocket: %d, FD_ISSET: %d", monitorSocket,
// FD_ISSET(monitorSocket, & global::readers));
}
int DirectInput::getMonitorSocket(void)
......@@ -121,12 +125,32 @@ int DirectInput::getMonitorSocket(void)
void DirectInput::disconnect(void)
{
logWrite(COMMAND_INPUT, "Disconnecting from command socket");
if (monitorSocket != -1)
{
clearDescriptor(monitorSocket);
close(monitorSocket);
monitorSocket = -1;
}
index = 0;
state = ACCEPTING;
currentCommand.reset(NULL);
}
int DirectInput::checksum(void)
{
int flip = 1;
int total = 0;
int i = 0;
for (i = 0; i < Header::headerSize; ++i)
{
total += (headerBuffer[i] & 0xff) * flip;
// flip *= -1;
}
for (i = 0; i < commandHeader.size; ++i)
{
total += (bodyBuffer[i] & 0xff) * flip;
// flip *= -1;
}
return total;
}
......@@ -14,6 +14,7 @@ public:
virtual void nextCommand(fd_set * readable);
virtual int getMonitorSocket(void);
virtual void disconnect(void);
int checksum(void);
private:
enum MonitorState
{
......
......@@ -222,36 +222,10 @@ enum { SNIFF_WAIT = 10 };
void KernelTcp::init(void)
{
int error = 0;
// Set up the peerAccept socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
{
logWrite(ERROR, "Unable to generate a peer accept socket. "
"No incoming peer connections will ever be accepted: %s",
strerror(errno));
}
else
{
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(global::peerServerPort);
address.sin_addr.s_addr = INADDR_ANY;
error = bind(sockfd, reinterpret_cast<struct sockaddr *>(&address),
sizeof(struct sockaddr));
if (error == -1)
{
logWrite(ERROR, "Unable to bind peer accept socket. "
"No incoming peer connections will ever be accepted: %s",
strerror(errno));
close(sockfd);
}
else
{
setDescriptor(sockfd);
global::peerAccept = sockfd;
}
}
global::peerAccept = createServer(global::peerServerPort,
"Peer accept socket (No incoming peer "
"connections will be accepted)");
// Set up the connectionModelExemplar
global::connectionModelExemplar.reset(new KernelTcp());
......@@ -311,43 +285,17 @@ void KernelTcp::addNewPeer(fd_set * readable)
&& FD_ISSET(global::peerAccept, readable))
{
struct sockaddr_in remoteAddress;
socklen_t addressSize = sizeof(remoteAddress);
int fd = accept(global::peerAccept,
reinterpret_cast<struct sockaddr *>(&remoteAddress),
&addressSize);
int fd = acceptServer(global::peerAccept, &remoteAddress,
"Peer socket (Incoming peer connection was not "
"accepted)");
if (fd != -1)
{
// Add the peer.
int flags = fcntl(fd, F_GETFL);
if (flags != -1)
{
int error = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (error != -1)
{
global::peers.push_back(
make_pair(fd, ipToString(remoteAddress.sin_addr.s_addr)));
setDescriptor(fd);
logWrite(PEER_CYCLE,
"Peer connection %d from %s was accepted normally.",
global::peers.back().first,
global::peers.back().second.c_str());
}
else
{
logWrite(EXCEPTION, "fctl(F_SETFL) failed: %s", strerror(errno));
close(fd);
}
}
else
{
logWrite(EXCEPTION, "fctl(F_GETFL) failed: %s", strerror(errno));
close(fd);
}
}
else
{
logWrite(EXCEPTION, "accept() called on a peer connection failed: %s",
strerror(errno));
global::peers.push_back(make_pair(fd, ipToString(
remoteAddress.sin_addr.s_addr)));
logWrite(PEER_CYCLE,
"Peer connection %d from %s was accepted normally.",
global::peers.back().first,
global::peers.back().second.c_str());
}
}
}
......@@ -357,7 +305,7 @@ void KernelTcp::readFromPeers(fd_set * readable)
list< pair<int, string> >::iterator pos = global::peers.begin();
while (pos != global::peers.end())
{
if (FD_ISSET(pos->first, readable))
if (pos->first != -1 && FD_ISSET(pos->first, readable))
{
static const int bufferSize = 8096;
static char buffer[bufferSize];
......@@ -398,7 +346,7 @@ void KernelTcp::readFromPeers(fd_set * readable)
void KernelTcp::packetCapture(fd_set * readable)
{
unsigned char * args = NULL;
if (FD_ISSET(pcapfd, readable))
if (pcapfd != -1 && FD_ISSET(pcapfd, readable))
{
pcap_dispatch(pcapDescriptor, -1, kernelTcpCallback, args);
}
......
......@@ -51,7 +51,7 @@ TrivialCommandOutput.o: TrivialCommandOutput.cc lib.h log.h saveload.h CommandOu
log.o: log.cc lib.h log.h
g++ -I. -g -Wall -c log.cc
main.o: main.cc lib.h log.h CommandInput.h CommandOutput.h Command.h Time.h Connection.h Sensor.h TrafficModel.h KernelTcp.h saveload.h
main.o: main.cc lib.h log.h CommandInput.h CommandOutput.h Command.h Time.h Connection.h Sensor.h TrafficModel.h KernelTcp.h saveload.h DirectInput.h TrivialCommandOutput.h
g++ -I. -g -Wall -c main.cc
saveload.o: saveload.cc lib.h log.h saveload.h Command.h
......
......@@ -7,8 +7,8 @@ ARGS=$*
#
# Start up our own measurement agent
#
echo $SH ${MAGENT_DIR}/run-magent.sh $ARGS
$SH ${MAGENT_DIR}/run-magent.sh $ARGS
echo $SH ${MAGENT_DIR}/run-magent.sh #$ARGS
$SH ${MAGENT_DIR}/run-magent.sh #$ARGS
# Kill the agent if we get killed - TODO: harsher kill?
# Because the magent backgrounds itself, it's harder to figure out
# what its pid is, just just do a killall
......
......@@ -181,6 +181,7 @@ namespace global
extern int connectionModelArg;
extern unsigned short peerServerPort;
extern unsigned short monitorServerPort;
extern bool doDaemonize;
extern int peerAccept;
extern std::auto_ptr<ConnectionModel> connectionModelExemplar;
......@@ -202,5 +203,7 @@ namespace global
void setDescriptor(int fd);
void clearDescriptor(int fd);
std::string ipToString(unsigned int ip);
int createServer(int port, std::string const & debugString);
int acceptServer(int acceptfd, struct sockaddr_in * remoteAddress,
std::string const & debugString);
#endif
......@@ -58,6 +58,14 @@ static void logPrefix(int flags)
{
fprintf(logFile, "MAIN_LOOP ");
}
if (flags & COMMAND_INPUT)
{
fprintf(logFile, "COMMAND_INPUT ");
}
if (flags & CONNECTION)
{
fprintf(logFile, "CONNECTION ");
}
if (logTimestamp)
{
struct timeval now;
......
......@@ -38,16 +38,18 @@ void logWrite(int flags, char const * format, ...);
enum LOG_TYPE
{
ERROR = 0x01,
EXCEPTION = 0x02,
PEER_CYCLE = 0x04,
SENSOR = 0x08,
CONNECTION_MODEL = 0x10,
ROBUST = 0x20,
MAIN_LOOP = 0x40,
ERROR = 0x001,
EXCEPTION = 0x002,
PEER_CYCLE = 0x004,
SENSOR = 0x008,
CONNECTION_MODEL = 0x010,
ROBUST = 0x020,
MAIN_LOOP = 0x040,
COMMAND_INPUT = 0x080,
CONNECTION = 0x100,
// Shortcuts for common cases.
LOG_NOTHING = 0x00,
LOG_EVERYTHING = 0x7f
LOG_NOTHING = 0x000,
LOG_EVERYTHING = 0x1ff
};
#endif
......@@ -12,6 +12,9 @@
#include "KernelTcp.h"
#include "DirectInput.h"
#include "TrivialCommandOutput.h"
#include <iostream>
using namespace std;
......@@ -21,6 +24,7 @@ namespace global
int connectionModelArg = 0;
unsigned short peerServerPort = 0;
unsigned short monitorServerPort = 0;
bool doDaemonize = false;
int peerAccept = -1;
string interface;
......@@ -51,11 +55,14 @@ int main(int argc, char * argv[])
{
processArgs(argc, argv);
init();
// Run with no change in directory, and no closing of standard files.
int error = daemon(1, 1);
if (error == -1)
if (global::doDaemonize)
{
logWrite(ERROR, "Daemonization failed: %s", strerror(errno));
// Run with no change in directory, and no closing of standard files.
int error = daemon(1, 1);
if (error == -1)
{
logWrite(ERROR, "Daemonization failed: %s", strerror(errno));
}
}
mainLoop();
return 0;
......@@ -67,16 +74,18 @@ void usageMessage(char *progname) {
cerr << " --peerserverport=<int> " << endl;
cerr << " --monitorserverport=<int> " << endl;
cerr << " --interface=<iface> " << endl;
cerr << " --daemonize " << endl;
logWrite(ERROR, "Bad command line argument", global::connectionModelArg);
}
void processArgs(int argc, char * argv[])
{
// Defaults, in case the user does not pass us explicit values
global::connectionModelArg = CONNECTION_MODEL_NULL;
global::connectionModelArg = CONNECTION_MODEL_KERNEL;
global::peerServerPort = 3491;
global::monitorServerPort = 4200;
global::interface = "vnet";
global::doDaemonize = false;
static struct option longOptions[] = {
// If you modify this, make sure to modify the shortOptions string below
......@@ -85,11 +94,12 @@ void processArgs(int argc, char * argv[])
{"peerserverport", required_argument, NULL, 'p'},
{"monitorserverport", required_argument, NULL, 'm'},
{"interface", required_argument, NULL, 'i'},
{"daemonize", no_argument , NULL, 'd'},
// Required so that getopt_long can find the end of the list
{NULL, 0, NULL, 0}
};
string shortOptions = "c:p:m:i:";
string shortOptions = "c:p:m:i:d";
// Not used
int longIndex;
......@@ -142,6 +152,9 @@ void processArgs(int argc, char * argv[])
case 'i':
global::interface = optArg;
break;
case 'd':
global::doDaemonize = true;
break;
case '?':
default:
usageMessage(argv[0]);
......@@ -170,8 +183,8 @@ void init(void)
FD_ZERO(&global::readers);
global::maxReader = -1;
global::input.reset(new NullCommandInput());
global::output.reset(new NullCommandOutput());
global::input.reset(new DirectInput());
global::output.reset(new TrivialCommandOutput());
}
void mainLoop(void)
......@@ -181,6 +194,10 @@ void mainLoop(void)
// Select on file descriptors
while (true)
{
// struct timeval debugTimeout;
// debugTimeout.tv_sec = 0;
// debugTimeout.tv_usec = 100000;
fd_set readable = global::readers;
Time timeUntilWrite;
struct timeval * waitPeriod;
......@@ -199,14 +216,16 @@ void mainLoop(void)
}
logWrite(MAIN_LOOP, "Select");
int error = select(global::maxReader + 1, &readable, NULL, NULL,
// &debugTimeout);
waitPeriod);
logWrite(MAIN_LOOP, "After select");
if (error == -1)