Commit 8679bec6 authored by Jonathon Duerig's avatar Jonathon Duerig

Added versioning to magent. Added UDP support to the sinews of the magent....

Added versioning to magent. Added UDP support to the sinews of the magent. Added a simple monitor mockup (not functional yet) which can drive the magent by giving it UDP write commands.
parent 16aaa101
......@@ -20,12 +20,13 @@ void NewConnectionCommand::run(std::multimap<Time, Connection *> &)
{
logWrite(COMMAND_INPUT, "Running NEW_CONNECTION_COMMAND: %s",
key.toString().c_str());
std::map<Order, Connection>::iterator pos
std::map<ElabOrder, Connection>::iterator pos
= global::connections.find(key);
if (pos == global::connections.end())
{
pos = global::connections.insert(make_pair(key, Connection())).first;
pos->second.reset(key, global::connectionModelExemplar->clone());
pos->second.reset(key, global::connectionModelExemplar->clone(),
transport);
}
}
......@@ -72,7 +73,7 @@ void ConnectCommand::runConnect(Connection * conn,
{
logWrite(COMMAND_INPUT, "Running CONNECT_COMMAND: %s",
key.toString().c_str());
conn->connect();
conn->connect(ip);
}
//-----------------------
......@@ -90,7 +91,7 @@ void DeleteConnectionCommand::run(std::multimap<Time, Connection *> & schedule)
{
logWrite(COMMAND_INPUT, "Running DELETE_CONNECTION_COMMAND: %s",
key.toString().c_str());
std::map<Order, Connection>::iterator pos
std::map<ElabOrder, Connection>::iterator pos
= global::connections.find(key);
if (pos != global::connections.end())
{
......
......@@ -16,7 +16,7 @@ class Command
public:
virtual void run(std::multimap<Time, Connection *> & schedule)
{
std::map<Order, Connection>::iterator pos
std::map<ElabOrder, Connection>::iterator pos
= global::connections.find(key);
if (pos != global::connections.end())
{
......@@ -32,16 +32,19 @@ public:
// We use a key here and look up the connection only on a run()
// because some commands delete a connection and we don't want later
// commands to keep the reference around.
Order key;
ElabOrder key;
};
class NewConnectionCommand : public Command
{
public:
NewConnectionCommand() : transport(TCP_CONNECTION) {}
virtual void run(std::multimap<Time, Connection *> &);
protected:
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
public:
unsigned char transport;
};
class TrafficModelCommand : public Command
......@@ -73,9 +76,13 @@ public:
class ConnectCommand : public Command
{
public:
ConnectCommand() : ip(0) {}
protected:
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
public:
unsigned int ip;
};
class TrafficWriteCommand : public Command
......
......@@ -32,7 +32,7 @@ public:
};
public:
virtual ~CommandOutput() {}
void eventMessage(std::string const & message, Order const & key,
void eventMessage(std::string const & message, ElabOrder const & key,
PathDirection dir=FORWARD_PATH)
{
if (dir == FORWARD_PATH)
......@@ -45,7 +45,8 @@ public:
}
}
void genericMessage(int type, std::string const & message, Order const & key)
void genericMessage(int type, std::string const & message,
ElabOrder const & key)
{
if (message.size() <= 0xffff && message.size() > 0)
{
......
......@@ -61,12 +61,29 @@ Connection & Connection::operator=(Connection const & right)
return *this;
}
void Connection::reset(Order const & newElab,
std::auto_ptr<ConnectionModel> newPeer)
void Connection::reset(ElabOrder const & newElab,
std::auto_ptr<ConnectionModel> newPeer,
unsigned char transport)
{
logWrite(CONNECTION, "Peer added to connection");
elab = newElab;
peer = newPeer;
planet.transport = transport;
switch (transport)
{
case TCP_CONNECTION:
planet.remotePort = global::peerServerPort;
break;
case UDP_CONNECTION:
planet.remotePort = global::peerUdpServerPort;
break;
default:
logWrite(ERROR, "Invalid transport protocol %d, "
"defaulting to TCP_CONNECTION", transport);
planet.transport = TCP_CONNECTION;
planet.remotePort = global::peerServerPort;
break;
}
}
void Connection::setTraffic(std::auto_ptr<TrafficModel> newTraffic)
......@@ -89,14 +106,12 @@ void Connection::addConnectionModelParam(ConnectionModelCommand const & param)
}
}
void Connection::connect(void)
void Connection::connect(unsigned int ip)
{
logWrite(CONNECTION, "Connection connected");
planet.ip = ip;
if (peer.get() != NULL)
{
planet.transport = TCP_CONNECTION;
planet.ip = elab.ip;
planet.remotePort = global::peerServerPort;
// planet is modified by ConnectionModel::connect()
peer->connect(planet);
isConnected = peer->isConnected();
......@@ -159,9 +174,9 @@ ConnectionModel const * Connection::getConnectionModel(void)
Time Connection::writeToConnection(Time const & previousTime)
{
WriteResult result;
result.planet.transport = TCP_CONNECTION;
result.planet.ip = elab.ip;
result.planet.remotePort = global::peerServerPort;
result.planet.transport = planet.transport;
result.planet.ip = planet.ip;
result.planet.remotePort = planet.remotePort;
if (isConnected)
{
result.planet.localPort = planet.localPort;
......
......@@ -14,7 +14,6 @@
class Time;
class ConnectionModel;
class TrafficModel;
class Sensor;
class ConnectionModelCommand;
class TrafficWriteCommand;
class SensorCommand;
......@@ -27,9 +26,12 @@ public:
Connection & operator=(Connection const & right);
// Called after the Connection is added to the map. Sets the
// elabOrder sorting element and sets up the Connection Model.
void reset(Order const & newElab,
std::auto_ptr<ConnectionModel> newPeer);
// elabOrder sorting element and sets up the Connection Model. Also
// sets the transport protocol of this connection (TCP_CONNECTION or
// UDP_CONNECTION)
void reset(ElabOrder const & newElab,
std::auto_ptr<ConnectionModel> newPeer,
unsigned char transport);
// Called when the monitor specifies which traffic model to use.
void setTraffic(std::auto_ptr<TrafficModel> newTraffic);
// Set a connection model parameter. Things like socket buffer
......@@ -37,7 +39,7 @@ public:
void addConnectionModelParam(ConnectionModelCommand const & param);
// This starts an attempt to connect through the connection
// model. Called when the monitor notifies of a connect.
void connect(void);
void connect(unsigned int ip);
// Notifies the traffic model of write information from the monitor.
void addTrafficWrite(TrafficWriteCommand const & newWrite,
std::multimap<Time, Connection *> & schedule);
......@@ -55,8 +57,8 @@ public:
private:
// There are two kinds of ordering. One is for commands from
// emulab. One is for the pcap analysis.
Order elab;
Order planet;
ElabOrder elab;
PlanetOrder planet;
std::auto_ptr<ConnectionModel> peer;
std::auto_ptr<TrafficModel> traffic;
......
......@@ -16,7 +16,7 @@ class ConnectionModel
public:
virtual ~ConnectionModel() {}
virtual std::auto_ptr<ConnectionModel> clone(void)=0;
virtual void connect(Order & planet)=0;
virtual void connect(PlanetOrder & planet)=0;
virtual void addParam(ConnectionModelCommand const & param)=0;
// Returns the number of bytes actually written or -1 if there was
// an error. Errno is not preserved.
......@@ -39,7 +39,7 @@ public:
{
return std::auto_ptr<ConnectionModel>(new ConnectionModelNull());
}
virtual void connect(Order &) {}
virtual void connect(PlanetOrder &) {}
virtual void addParam(ConnectionModelCommand const &) {}
static void init(void) {}
......@@ -52,6 +52,12 @@ public:
result.bufferFull = false;
return 0;
}
virtual int sendToMessage(int size, WriteResult & result)
{
result.isConnected = false;
result.bufferFull = false;
return 0;
}
virtual bool isConnected(void)
{
return false;
......
......@@ -17,6 +17,7 @@ DirectInput::DirectInput()
, monitorAccept(-1)
, monitorSocket(-1)
, index(0)
, versionSize(0)
{
logWrite(COMMAND_INPUT, "Creating the command accept socket on port %d",
global::monitorServerPort);
......@@ -53,8 +54,49 @@ void DirectInput::nextCommand(fd_set * readable)
"connection was not accepted)");
if (monitorSocket != -1)
{
state = HEADER_PREFIX;
}
}
if (state == HEADER_PREFIX && monitorSocket != -1
&& FD_ISSET(monitorSocket, readable))
{
int error = recv(monitorSocket, headerBuffer+index,
Header::PREFIX_SIZE - index, 0);
if (error == Header::PREFIX_SIZE - index)
{
char version = headerBuffer[Header::PREFIX_SIZE - 1];
switch (version)
{
case 0:
versionSize = Header::PREFIX_SIZE + Header::VERSION_0_SIZE;
break;
case 1:
versionSize = Header::PREFIX_SIZE + Header::VERSION_1_SIZE;
break;
default:
logWrite(ERROR, "Unknown version: %d"
", assuming that it really means version 1", version);
versionSize = Header::PREFIX_SIZE + Header::VERSION_1_SIZE;
break;
}
state = HEADER;
}
else if (error > 0)
{
index += error;
}
else if (error == 0)
{
logWrite(EXCEPTION, "Read count of 0 returned (state BODY): %s",
strerror(errno));
disconnect();
}
else if (error == -1)
{
logWrite(EXCEPTION, "Failed read on monitorSocket "
"(state HEADER_PREFIX) index=%d: %s", index,
strerror(errno));
}
}
// logWrite(COMMAND_INPUT, "Before HEADER check");
if (state == HEADER && monitorSocket != -1
......@@ -114,7 +156,7 @@ void DirectInput::nextCommand(fd_set * readable)
}
currentCommand = loadCommand(&commandHeader, bodyBuffer);
index = 0;
state = HEADER;
state = HEADER_PREFIX;
}
else if (error > 0)
{
......
......@@ -25,6 +25,7 @@ private:
enum MonitorState
{
ACCEPTING,
HEADER_PREFIX,
HEADER,
BODY
};
......@@ -34,6 +35,7 @@ private:
int monitorSocket;
int index;
char headerBuffer[Header::headerSize];
int versionSize;
Header commandHeader;
enum { bodyBufferSize = 0xffff };
char bodyBuffer[bodyBufferSize];
......
This diff is collapsed.
......@@ -23,12 +23,15 @@ public:
KernelTcp();
virtual ~KernelTcp();
virtual std::auto_ptr<ConnectionModel> clone(void);
virtual void connect(Order & planet);
virtual void connect(PlanetOrder & planet);
virtual void addParam(ConnectionModelCommand const & param);
virtual int writeMessage(int size, WriteResult & result);
virtual bool isConnected(void);
int getSock(void) const;
private:
int writeTcpMessage(int size, WriteResult & result);
int writeUdpMessage(int size, WriteResult & result);
private:
ConnectionState state;
int peersock;
......@@ -36,7 +39,6 @@ private:
int receiveBufferSize;
int maxSegmentSize;
int useNagles;
static const int MAX_WRITESIZE;
private:
Time debugPrevTime;
int debugByteCount;
......
......@@ -24,7 +24,8 @@
#ifndef SENSOR_LIST_H_STUB_2
#define SENSOR_LIST_H_STUB_2
class Sensor;
#include "Sensor.h"
class SensorCommand;
class NullSensor;
......
// lib.cc
#include "lib.h"
#include "log.h"
#include "saveload.h"
#include "ConnectionModel.h"
#include "Connection.h"
#include "CommandInput.h"
#include "CommandOutput.h"
#include "TrafficModel.h"
using namespace std;
namespace global
{
int connectionModelArg = 0;
unsigned short peerUdpServerPort = 0;
unsigned short peerServerPort = 0;
unsigned short monitorServerPort = 0;
bool doDaemonize = false;
int replayArg = NO_REPLAY;
int replayfd = -1;
list<int> replaySensors;
int peerAccept = -1;
string interface;
auto_ptr<ConnectionModel> connectionModelExemplar;
list< pair<int, string> > peers;
map<ElabOrder, Connection> connections;
// A connection is in this map only if it is currently connected.
map<PlanetOrder, Connection *> planetMap;
fd_set readers;
int maxReader = -1;
std::auto_ptr<CommandInput> input;
std::auto_ptr<CommandOutput> output;
int logFlags = LOG_EVERYTHING /*& ~SENSOR_COMPLETE*/;
// The versionless pre-history was '0'. We can be backwards
// compatible by taking advantage of the fact that the old headers
// has a char 'transport' that was always set to 0 for
// TCP_CONNECTION.
const unsigned char CONTROL_VERSION = 1;
}
size_t PacketInfo::census(void) const
{
savePacket(NULL, *this);
return static_cast<size_t>(getLastSaveloadSize());
}
void setDescriptor(int fd)
{
if (fd > -1 && fd < FD_SETSIZE)
{
FD_SET(fd, &(global::readers));
if (fd > global::maxReader)
{
global::maxReader = fd;
}
}
else
{
logWrite(ERROR, "Invalid descriptor sent to setDescriptor: "
"%d (FDSET_SIZE=%d)", fd, FD_SETSIZE);
}
}
void clearDescriptor(int fd)
{
if (fd > -1 && fd < FD_SETSIZE)
{
FD_CLR(fd, &(global::readers));
if (fd > global::maxReader)
{
global::maxReader = fd;
}
}
else
{
logWrite(ERROR, "Invalid descriptor sent to clearDescriptor: "
"%d (FDSET_SIZE=%d)", fd, FD_SETSIZE);
}
}
string ipToString(unsigned int ip)
{
struct in_addr address;
address.s_addr = ip;
return string(inet_ntoa(address));
}
int createServer(int port, string const & debugString)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1)
{
logWrite(ERROR, "socket(): %s: %s", debugString.c_str(), strerror(errno));
return -1;
}
int doesReuse = 1;
int error = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &doesReuse,
sizeof(doesReuse));
if (error == -1)
{
logWrite(ERROR, "setsockopt(SO_REUSEADDR): %s: %s", debugString.c_str(),
strerror(errno));
close(fd);
return -1;
}
int forcedSize = 262144;
error = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &forcedSize,
sizeof(forcedSize));
if (error == -1)
{
logWrite(ERROR, "Failed to set receive buffer size: %s",
strerror(errno));
close(fd);
return -1;
}
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(port);
address.sin_addr.s_addr = htonl(INADDR_ANY);
error = bind(fd, reinterpret_cast<struct sockaddr *>(&address),
sizeof(address));
if (error == -1)
{
logWrite(ERROR, "bind(): %s: %s", debugString.c_str(), strerror(errno));
close(fd);
return -1;
}
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
logWrite(ERROR, "fcntl(F_GETFL): %s: %s", debugString.c_str(),
strerror(errno));
close(fd);
return -1;
}
error = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (error == -1)
{
logWrite(ERROR, "fcntl(F_SETFL): %s: %s", debugString.c_str(),
strerror(errno));
close(fd);
return -1;
}
error = listen(fd, 4);
if (error == -1)
{
logWrite(ERROR, "listen(): %s: %s", debugString.c_str(), strerror(errno));
close(fd);
return -1;
}
setDescriptor(fd);
return fd;
}
int acceptServer(int acceptfd, struct sockaddr_in * remoteAddress,
string const & debugString)
{
struct sockaddr_in stackAddress;
struct sockaddr_in * address;
socklen_t addressSize = sizeof(struct sockaddr_in);
if (remoteAddress == NULL)
{
address = &stackAddress;
}
else
{
address = remoteAddress;
}
int resultfd = accept(acceptfd,
reinterpret_cast<struct sockaddr *>(address),
&addressSize);
if (resultfd == -1)
{
if (errno != EINTR && errno != EWOULDBLOCK && errno != ECONNABORTED
&& errno != EPROTO)
{
logWrite(EXCEPTION, "accept(): %s: %s", debugString.c_str(),
strerror(errno));
}
return -1;
}
int flags = fcntl(resultfd, F_GETFL, 0);
if (flags == -1)
{
logWrite(EXCEPTION, "fcntl(F_GETFL): %s: %s", debugString.c_str(),
strerror(errno));
close(resultfd);
return -1;
}
int error = fcntl(resultfd, F_SETFL, flags | O_NONBLOCK);
if (error == -1)
{
logWrite(EXCEPTION, "fcntl(F_SETFL): %s: %s", debugString.c_str(),
strerror(errno));
close(resultfd);
return -1;
}
setDescriptor(resultfd);
return resultfd;
}
// Returns true on success
bool replayWrite(char * source, int size)
{
bool result = true;
if (size == 0) {
return true;
}
int error = write(global::replayfd, source, size);
if (error <= 0)
{
if (error == 0)
{
logWrite(EXCEPTION, "replayfd was closed unexpectedly");
}
else if (error == -1)
{
logWrite(EXCEPTION, "Error writing replay output: %s", strerror(errno));
}
result = false;
}
return result;
}
void replayWriteCommand(char * head, char * body, unsigned short bodySize)
{
bool success = true;
success = replayWrite(head, Header::headerSize);
if (success)
{
replayWrite(body, bodySize);
}
}
void replayWritePacket(PacketInfo * packet)
{
Header head;
head.type = packet->packetType;
head.size = packet->census();
head.key = packet->elab;
char headBuffer[Header::headerSize];
saveHeader(headBuffer, head);
bool success = replayWrite(headBuffer, Header::headerSize);
if (success)
{
char *packetBuffer;
packetBuffer = static_cast<char*>(malloc(head.size));
//logWrite(REPLAY,"Making a packet buffer of size %d",head.size);
char* endptr = savePacket(& packetBuffer[0], *packet);
// find out how many bytes were written
int writtensize = (endptr - packetBuffer);
if (writtensize != head.size) {
logWrite(ERROR,"replayWritePacket(): Made packet save buffer of size "
"%d, but wrote %d", head.size, writtensize);
}
replayWrite(& packetBuffer[0], head.size);
free(packetBuffer);
}
}
......@@ -142,21 +142,48 @@ enum
UDP_CONNECTION = 1
};
struct Order
struct ElabOrder
{
static const size_t idSize = 32;
char id[idSize];
ElabOrder()
{
memset(id, '\0', idSize);
}
bool operator<(ElabOrder const & right) const
{
return memcmp(id, right.id, idSize) < 0;
}
bool operator==(ElabOrder const & right) const
{
return memcmp(id, right.id, idSize) == 0;
}
bool operator!=(ElabOrder const & right) const
{
return memcmp(id, right.id, idSize) != 0;
}
std::string toString(void) const
{
return std::string(id, idSize);
}
};
struct PlanetOrder
{
unsigned char transport;
unsigned int ip;
unsigned short localPort;
unsigned short remotePort;
Order()
PlanetOrder()
{
transport = TCP_CONNECTION;
ip = 0;
localPort = 0;
remotePort = 0;
}
bool operator<(Order const & right) const
bool operator<(PlanetOrder const & right) const
{
return std::make_pair(transport,
std::make_pair(ip,
......@@ -167,14 +194,14 @@ struct Order
std::make_pair(right.localPort,
right.remotePort)));
}
bool operator==(Order const & right) const
bool operator==(PlanetOrder const & right) const
{
return transport == right.transport
&& ip == right.ip
&& localPort == right.localPort
&& remotePort == right.remotePort;
}
bool operator!=(Order const & right) const
bool operator!=(PlanetOrder const & right) const
{
return !(*this == right);
}
......@@ -198,7 +225,7 @@ struct WriteResult
{
bool isConnected;
bool bufferFull;
Order planet;
PlanetOrder planet;
Time nextWrite;
};
......@@ -214,6 +241,9 @@ struct PacketInfo
{
size_t census(void) const;
// This should come first because it determines which of the other
// two options are selected.
unsigned char transport;
Time packetTime;
int packetLength;
/* ---IMPORTANT NOTE--- The tcp_info structure is built at the time of
......@@ -226,7 +256,13 @@ struct PacketInfo
std::list<Option> * ipOptions;
struct tcphdr const * tcp;
std::list<Option> * tcpOptions;
Order elab;
struct udphdr const * udp;
// The size of the captured payload. This may be less than the
// payload actually received (due to snaplen).
unsigned int payloadSize;
// A pointer to the first character of a packet after the headers
unsigned char const * payload;
ElabOrder elab;
bool bufferFull;
unsigned char packetType;
};
......@@ -252,6 +288,7 @@ class CommandOutput;
namespace global
{
extern int connectionModelArg;
extern unsigned short peerUdpServerPort;
extern unsigned short peerServerPort;