Commit 8d184eed authored by Jonathon Duerig's avatar Jonathon Duerig
Browse files

Added sensors, commands, peer communication, etc.

parent 34587863
...@@ -49,6 +49,8 @@ class ConnectionModelCommand : public Command ...@@ -49,6 +49,8 @@ class ConnectionModelCommand : public Command
protected: protected:
virtual void runConnect(Connection * conn, virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &); std::multimap<Time, Connection *> &);
int type;
int value;
}; };
class SensorCommand : public Command class SensorCommand : public Command
......
...@@ -20,9 +20,20 @@ public: ...@@ -20,9 +20,20 @@ public:
{ {
return currentCommand.get(); return currentCommand.get();
} }
virtual void nextCommand(fd_set & readable)=0; virtual void nextCommand(fd_set * readable)=0;
virtual int getMonitorSocket(void)=0;
virtual void disconnect(void)=0;
protected: protected:
std::auto_ptr<Command> currentCommand; std::auto_ptr<Command> currentCommand;
}; };
class NullCommandInput : public CommandInput
{
public:
virtual ~NullCommandInput() {}
virtual void nextCommand(fd_set *) {}
virtual int getMonitorSocket(void) { return -1; }
virtual void disconnect(void) {}
};
#endif #endif
...@@ -10,14 +10,34 @@ ...@@ -10,14 +10,34 @@
class CommandOutput class CommandOutput
{ {
public:
enum
{
SENDING_MESSAGE = 0,
DISCARDING_MESSAGE
}
public: public:
virtual ~CommandOutput() {} virtual ~CommandOutput() {}
void eventMessage(std::string const & message, Order const & key) void eventMessage(std::string const & message, Order const & key,
char direction)
{ {
if (message.size() <= 0xffff && message.size() > 0) if (message.size() <= 0xffff && message.size() > 0)
{ {
writeHeader(EVENT_TO_MONITOR, message.size(), key); Header prefix;
writeMessage(message.c_str(), message.size()); prefix.type = EVENT_TO_MONITOR;
prefix.size = message.size();
prefix.key = key;
char headerBuffer[Header::headerSize];
saveHeader(headerBuffer, prefix);
int result = startMessage(Header::headerSize + sizeof(direction)
+ message.size());
if (result == SENDING_MESSAGE)
{
writeMessage(headerBuffer, Header::headerSize);
writeMessage(&direction, sizeof(direction));
writeMessage(message.c_str(), message.size());
finishMessage();
}
} }
else else
{ {
...@@ -25,45 +45,20 @@ public: ...@@ -25,45 +45,20 @@ public:
"Size: %ud", message.size()); "Size: %ud", message.size());
} }
} }
private:
void writeHeader(int type, unsigned short size, Order const & key)
{
int bufferSize = sizeof(unsigned char)*2 + sizeof(unsigned short)*3
+ sizeof(unsigned long);
char buffer[bufferSize];
char * pos = buffer;
pos = saveChar(pos, type);
pos = saveShort(pos, size);
pos = saveChar(pos, key.transport);
pos = saveInt(pos, key.ip);
pos = saveShort(pos, key.localPort);
pos = saveShort(pos, key.remotePort);
writeMessage(buffer, bufferSize);
}
char * saveChar(char * buffer, unsigned char value)
{
memcpy(buffer, &value, sizeof(value));
return buffer + sizeof(value);
}
char * saveShort(char * buffer, unsigned short value)
{
unsigned short ordered = htons(value);
memcpy(buffer, &ordered, sizeof(ordered));
return buffer + sizeof(ordered);
}
char * saveInt(char * buffer, unsigned short value)
{
unsigned int ordered = htonl(value);
memcpy(buffer, &ordered, sizeof(ordered));
return buffer + sizeof(ordered);
}
protected: protected:
virtual int startMessage(int size)=0;
virtual void endMessage(void)=0;
virtual void writeMessage(char const * message, int count)=0; virtual void writeMessage(char const * message, int count)=0;
}; };
class NullCommandOutput : public CommandOutput
{
public:
virtual ~NullCommandOutput() {}
protected:
virtual int startMessage(int size) { return DISCARDING_MESSAGE; }
virtual void endMessage(void) {}
virtual void writeMessage(char const *, int) {}
};
#endif #endif
...@@ -52,11 +52,33 @@ void Connection::setTraffic(std::auto_ptr<TrafficModel> newTraffic) ...@@ -52,11 +52,33 @@ void Connection::setTraffic(std::auto_ptr<TrafficModel> newTraffic)
traffic = newTraffic; traffic = newTraffic;
} }
void Connection::addConnectionModelParam(ConnectionModelCommand const & param)
{
if (peer.get() != NULL)
{
peer->addParam(param);
}
else
{
logWrite(ERROR, "Connection model has not been initialized before an "
"addConnectionModelParam call");
}
}
void Connection::connect(void) void Connection::connect(void)
{ {
if (peer.get() != NULL) if (peer.get() != NULL)
{ {
peer->connect(); planet.transport = TCP_CONNECTION;
planet.ip = elab.ip;
planet.remotePort = global::peerServerPort;
// planet is modified by ConnectionModel::connect()
peer->connect(planet);
isConnected = peer->isConnected();
if (isConnected)
{
global::planetMap.insert(make_pair(planet, this));
}
} }
else else
{ {
...@@ -89,25 +111,31 @@ void Connection::addSensor(SensorCommand const & newSensor) ...@@ -89,25 +111,31 @@ void Connection::addSensor(SensorCommand const & newSensor)
measurements.addSensor(newSensor); measurements.addSensor(newSensor);
} }
void Connection::captureSend(Time & packetTime, struct tcp_info * kernel, void Connection::captureSend(PacketInfo * packet)
struct tcphdr * tcp)
{ {
Sensor * head = measurements.getHead(); Sensor * head = measurements.getHead();
if (head != NULL && isConnected) if (head != NULL && isConnected)
{ {
head->captureSend(packetTime, kernel, tcp, elab, bufferFull); packet->elab = elab;
packet->bufferFull = bufferFull;
head->captureSend(packet);
} }
} }
void Connection::captureAck(Time & packetTime, struct tcp_info * kernel, void Connection::captureAck(PacketInfo * packet)
struct tcphdr * tcp)
{ {
Sensor * head = measurements.getHead(); Sensor * head = measurements.getHead();
if (head != NULL && isConnected) if (head != NULL && isConnected)
{ {
head->captureAck(packetTime, kernel, tcp, elab, bufferFull); packet->elab = elab;
packet->bufferFull = bufferFull;
head->captureAck(packet);
} }
} }
ConnectionModel const * Connection::getConnectionModel(void)
{
return peer.get();
}
Time Connection::writeToConnection(Time const & previousTime) Time Connection::writeToConnection(Time const & previousTime)
{ {
......
...@@ -38,11 +38,11 @@ public: ...@@ -38,11 +38,11 @@ public:
// Adds a particular kind of sensor when requested by the monitor. // Adds a particular kind of sensor when requested by the monitor.
void addSensor(SensorCommand const & newSensor); void addSensor(SensorCommand const & newSensor);
// Notifies the sensors of a data packet which was sent. // Notifies the sensors of a data packet which was sent.
void captureSend(Time & packetTime, struct tcp_info * kernel, void captureSend(PacketInfo * packet);
struct tcphdr * tcp);
// Notifies the sensors of an acknowledged packet which was received. // Notifies the sensors of an acknowledged packet which was received.
void captureAck(Time & packetTime, struct tcp_info * kernel, void captureAck(PacketInfo * packet);
struct tcphdr * tcp); // Allows the connection model to be viewed.
ConnectionModel const * getConnectionModel(void);
// Notifies the traffic model that a timer has expired on the // Notifies the traffic model that a timer has expired on the
// scheduler. // scheduler.
Time writeToConnection(Time const & previousTime); Time writeToConnection(Time const & previousTime);
......
...@@ -3,12 +3,45 @@ ...@@ -3,12 +3,45 @@
#ifndef CONNECTION_MODEL_H_PELAB_2 #ifndef CONNECTION_MODEL_H_PELAB_2
#define CONNECTION_MODEL_H_PELAB_2 #define CONNECTION_MODEL_H_PELAB_2
class ConnectionModelCommand;
class ConnectionModel class ConnectionModel
{ {
public: public:
virtual ~ConnectionModel() {} virtual ~ConnectionModel() {}
virtual std::auto_ptr<ConnectionModel> clone(void)=0; virtual std::auto_ptr<ConnectionModel> clone(void)=0;
virtual void connect(void)=0; virtual void connect(void)=0;
virtual void addParam(ConnectionModelCommand const & param)=0;
// Returns the number of bytes actually written or -1 if there was
// an error. Errno is not preserved.
virtual int writeMessage(int size, WriteResult & result)=0;
virtual bool isConnected(void)=0;
};
class ConnectionModelNull : public ConnectionModel
{
public:
virtual ~ConnectionModelNull() {}
virtual std::auto_ptr<ConnectionModel> clone(void)
{
return std::auto_ptr<ConnectionModel>(new ConnectionModelNull());
}
virtual void connect(void) {}
virtual void addParam(ConnectionModelCommand const &) {}
static void init(void) {}
static void addNewPeer(fd_set *) {}
static void readFromPeers(fd_set *) {}
static void packetCapture(fd_set *) {}
virtual void writeMessage(int, WriteResult & result)
{
result.isConnected = false;
result.bufferFull = false;
}
virtual bool isConnected(void)
{
return false;
}
}; };
#endif #endif
...@@ -6,21 +6,291 @@ ...@@ -6,21 +6,291 @@
using namespace std; using namespace std;
void kernelTcp_init(void) KernelTcp::KernelTcp()
: state(DISCONNECTED)
, peersock(-1)
, sendBufferSize(0)
, receiveBufferSize(0)
, maxSegmentSize(0)
, useNagles(1)
{
}
KernelTcp::~KernelTcp()
{
if (peersock != -1)
{
close(peersock);
}
}
auto_ptr<ConnectionModel> KernelTcp::clone(void)
{
auto_ptr<KernelTcp> result(new KernelTcp());
result.sendBufferSize = sendBufferSize;
result.receiveBufferSize = receiveBufferSize;
result.maxSegmentSize = maxSegmentSize;
result.useNagles = useNagles;
return result;
}
void KernelTcp::connect(Order & planet)
{
if (state == DISCONNECTED)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
{
logWrite(EXCEPTION, "Cannot create a peer socket: %s", strerror(errno));
return;
}
// Set up all parameters
if ((sendBufferSize != 0 && !changeSocket(sockfd, SOL_SOCKET, SO_SNDBUF,
sendBufferSize, "SO_SNDBUF"))
|| (receiveBufferSize != 0 && !changeSocket(sockfd, SOL_SOCKET,
SO_RCVBUF,
receiveBufferSize,
"SO_RCVBUF"))
|| !changeSocket(sockfd, IPPROTO_TCP, TCP_NODELAY, !useNagles,
"TCP_NODELAY"))
{
close(sockfd);
return;
}
struct sockaddr_in destAddress;
destAddress.sin_family = AF_INET;
destAddress.sin_port = htons(global::peerServerPort);
destAddress.sin_addr.s_addr = htonl(planet.ip);
int error = ::connect(sockfd, (struct sockaddr *)&destAddress,
sizeof(destAddress));
if (error == -1)
{
logWrite(EXCEPTION, "Cannot connect to peer: %s", strerror(errno));
close(sockfd);
return;
}
if (maxSegmentSize != 0 && !changeSocket(sockfd, IPPROTO_TCP,
TCP_MAXSEG, maxSegmentSize,
"TCP_MAXSEG"))
{
close(sockfd);
return;
}
int flags = fcntl(sockfd, F_GETFL);
if (flags == -1)
{
logWrite(EXCEPTION, "Cannot get fcntl flags from a peer socket: %s",
strerror(errno));
close(sockfd);
return;
}
error = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
if (error == =1)
{
logWrite(EXCEPTION, "Cannot set fcntl flags (nonblocking) "
"on a peer socket: %s", strerror(errno));
close(sockfd);
return;
}
struct sockaddr_in sourceAddress;
socklen_t len = sizeof(sourceAddress);
error = getsockname(sockfd, (struct sockaddr *)&sourceAddress,
&len);
if (error == -1)
{
logWrite(EXCEPTION, "Cannot find the source address for a peer: %s");
close(sockfd);
return;
}
planet.localPort = ntohs(sourceAddress.sin_port);
peersock = sockfd;
state = CONNECTED;
}
}
void KernelTcp::addParam(ConnectionModelCommand const & param)
{
switch (param.type)
{
case CONNECTION_SEND_BUFFER_SIZE:
sendBufferSize = param.value;
break;
case CONNECTION_RECEIVE_BUFFER_SIZE:
receiveBufferSize = param.value;
break;
case CONNECTION_MAX_SEGMENT_SIZE:
maxSegmentSize = param.value;
break;
case CONNECTION_USE_NAGLES:
useNagles = param.value;
break;
default:
logWrite(ERROR, "Invalid ConnectionModelCommand type: %d", param.type);
}
}
bool KernelTcp::changeSocket(int sockfd, int optname, int value,
string optstring)
{
int error = setsockopt(sockfd, SOL_SOCKET, optname, &value, sizeof(value));
if (error == -1)
{
logWrite(ERROR, "Cannot set socket option %s", optstring.c_str());
return false;
}
int newValue = 0;
int newValueLength = sizeof(newValue);
error = getsockopt(sockfd, SOL_SOCKET, optname, &newValue, &newValueLength);
if (error == -1)
{
logWrite(ERROR, "Cannot read back socket option %s", optstring.c_str());
return false;
}
logWrite(CONNECTION_MODEL, "Socket option %s is now %d", optstring.c_str(),
newValue);
return true;
}
int KernelTcp::writeMessage(int size, WriteResult & result)
{
if (state == DISCONNECTED)
{
connect(result.planet);
}
if (state == CONNECTED)
{
// Create a different random write each time.
vector<char> buffer;
buffer.resize(size);
size_t i = 0;
for (i = 0; i < buffer.size(); ++i)
{
buffer[i] = static_cast<char>(random() & 0xff);
}
// Actually write the darn thing.
int error = send(peersock, & buffer[0], buffer.size(), 0);
if (error == 0)
{
close(peersock);
peersock = -1;
state = DISCONNECTED;
result.fullBuffer = false;
result.isConnected = false;
return 0;
}
else if (error == -1)
{
logWrite(EXCEPTION, "Failed write to peer: %s", strerror(errno));
return -1;
}
else
{
return error;
}
}
result.fullBuffer = false;
result.isConnected = false;
return -1;
}
bool KernelTcp::isConnected(void)
{
return state == CONNECTED;
}
enum { SNIFF_WAIT = 10 };
void KernelTcp::init(void)
{ {
int error = 0; int error = 0;
// Set up the peerAccept socket // Set up the peerAccept socket
address.sin_family = AF_INET; int sockfd = socket(AF_INET, SOCK_STREAM, 0);
address.sin_port = htons(SENDER_PORT); if (sockfd == -1)
address.sin_arrd.s_addr = INADDR_ANY; {
error = bind(global::peerAccept, logWrite(ERROR, "Unable to generate a peer accept socket. "
reinterpret_cast<struct sockaddr *>(address), "No incoming peer connections will ever be accepted: %s",
sizeof(struct sockaddr)) strerror(errno));
}
else
{
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(global::peerServerPort);
address.sin_addr.s_addr = INADDR_ANY;
error = bind(sockfd, reinterpret_cast<struct sockaddr *>(&address),
sizeof(struct sockaddr));
if (error == -1)
{
logWrite(ERROR, "Unable to bind peer accept socket. "
"No incoming peer connections will ever be accepted: %s",,
strerror(errno));
close(sockfd);
}
else
{
setDescriptor(sockfd);
global::peerAccept = sockfd;
}
}
// Set up the connectionModelExemplar // Set up the connectionModelExemplar
global::connectionModelExemplar.reset(new KernelTcp());
// Set up packet capture // Set up packet capture
char errorbuf[PCAP_ERRBUF_SIZE];
struct bpf_program fp; /* hold compiled program */
bpf_u_int32 maskp; /* subnet mask */
bpf_u_int32 netp; /* ip */
ostringstream filter;
/* ask pcap for the network address and mask of the device */
pcap_lookupnet(global::interface.c_str(), &netp, &maskp, errbuf);
filter << "port " << global::peerServerPort << " and tcp";
/* open device for reading.
* NOTE: We use non-promiscuous */
pcapDescriptor = pcap_open_live(global::interface.c_str(), BUFSIZ, 0,
SNIFF_WAIT, errbuf);
if(pcapDescriptor == NULL)
{
logWrite(ERROR, "pcap_open_live() failed: %s", errbuf);
}
// Lets try and compile the program, optimized
else if(pcap_compile(pcapDescriptor, &fp, filter, 1, maskp) == -1)
{
logWrite(ERROR, "pcap_compile() failed");
}
// set the compiled program as the filter
else if(pcap_setfilter(pcapDescriptor,&fp) == -1)
{
logWrite(ERROR, "pcap_filter() failed");
}
else
{
pcapfd = pcap_get_selectable_fd(pcapDescriptor);