Commit c8f724c4 authored by Jonathon Duerig's avatar Jonathon Duerig

Many minor fixes. Still not quite functional. TODO: Move the rest of the...

Many minor fixes. Still not quite functional. TODO: Move the rest of the networking code from stubd. Move measurement code from stubd.
parent 98e27f9f
// Command.cc
#include "lib.h"
#include "Command.h"
#include "Sensor.h"
#include "Connection.h"
#include "ConnectionModel.h"
#include "TrafficModel.h"
using namespace std;
void NewConnectionCommand::run(std::multimap<Time, Connection *> &)
{
std::map<Order, Connection>::iterator pos
= global::connections.find(key);
if (pos == global::connections.end())
{
pos = global::connections.insert(make_pair(key, Connection())).first;
pos->second.reset(key, global::connectionModelExemplar->clone());
}
}
void NewConnectionCommand::runConnect(Connection *,
std::multimap<Time, Connection *> &)
{
}
//-----------------------
void TrafficModelCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
}
//-----------------------
void ConnectionModelCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
conn->addConnectionModelParam(*this);
}
//-----------------------
void SensorCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
conn->addSensor(*this);
}
//-----------------------
void ConnectCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
conn->connect();
}
//-----------------------
void TrafficWriteCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> & schedule)
{
conn->addTrafficWrite(*this, schedule);
}
//-----------------------
void DeleteConnectionCommand::run(std::multimap<Time, Connection *> & schedule)
{
std::map<Order, Connection>::iterator pos
= global::connections.find(key);
if (pos != global::connections.end())
{
pos->second.cleanup(schedule);
global::connections.erase(pos);
}
}
void DeleteConnectionCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> &)
{
}
......@@ -3,94 +3,76 @@
#ifndef COMMAND_H_STUB_2
#define COMMAND_H_STUB_2
#include "Connection.h"
class Command
{
public:
virtual void run(std::multimap<Time, Connection *> & schedule)
{
std::multimap<ElabOrder, Connection>::iterator pos
std::map<Order, Connection>::iterator pos
= global::connections.find(key);
if (pos != global::connections.end())
{
runConnect(&(pos->second));
runConnect(&(pos->second), schedule);
}
}
virtual ~Command() {}
protected:
virtual void runConnect(Connection * conn)=0;
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> & schedule)=0;
// We use a key here and look up the connection only on a run()
// because some commands delete a connection and we don't want later
// commands to keep the reference around.
ElabOrder key;
Order key;
};
class NewConnectionCommand : public Command
{
public:
virtual void run(std::multimap<Time, Connection *> &)
{
std::multimap<ElabOrder, Connection>::iterator pos
= global::connections.find(key);
if (pos == global::connections.end())
{
pos = global::connections.insert(make_pair(key, Command()));
pos->second.reset(elab, connectionModelExemplar.clone());
}
}
virtual void run(std::multimap<Time, Connection *> &);
protected:
virtual void runConnect(Connection *)
{
}
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
};
class TrafficModelCommand : public Command
{
protected:
virtual void runConnect(Connection * conn)
{
}
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
};
class ConnectionModelCommand : public Command
{
protected:
virtual void runConnect(Connection * conn)
{
conn->addConnectionModelParam(*this);
}
public:
int type;
unsigned int value;
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
};
class SensorCommand : public Command
{
protected:
virtual void runConnect(Connection * conn)
{
conn->addSensor(*this);
}
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
public:
int type;
vector<char> parameters;
std::vector<char> parameters;
};
class ConnectCommand : public Command
{
protected:
virtual void runConnect(Connection * conn)
{
conn->connect();
}
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
};
class TrafficWriteCommand : public Command
{
protected:
virtual void runConnect(Connection * conn)
{
conn->addTrafficWrite(*this, schedule);
}
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> & schedule);
public:
unsigned int delta;
unsigned int size;
......@@ -99,20 +81,10 @@ public:
class DeleteConnectionCommand : public Command
{
public:
virtual void run(std::multimap<Time, Connection *> & schedule)
{
std::multimap<ElabOrder, Connection>::iterator pos
= global::connections.find(key);
if (pos != global::connections.end())
{
pos->second.cleanup();
global::connections.erase(pos);
}
}
virtual void run(std::multimap<Time, Connection *> & schedule);
protected:
virtual void runConnect(Connection * conn)
{
}
virtual void runConnect(Connection * conn,
std::multimap<Time, Connection *> &);
};
#endif
......@@ -10,16 +10,17 @@
#ifndef COMMAND_INPUT_H_STUB_2
#define COMMAND_INPUT_H_STUB_2
class Command;
#include "Command.h"
class CommandInput
{
public:
virtual ~CommandInput() {}
virtual Command * getCommand(void)
{
return currentCommand.get();
}
virtual void nextCommand(void)=0;
virtual void nextCommand(fd_set & readable)=0;
protected:
std::auto_ptr<Command> currentCommand;
};
......
......@@ -11,12 +11,13 @@
class CommandOutput
{
public:
void eventMessage(string const & message, ElabOrder const & key)
virtual ~CommandOutput() {}
void eventMessage(std::string const & message, Order const & key)
{
if (message.size() <= 0xffff && message.size() > 0)
{
writeHeader(EVENT_TO_MONITOR, message.size(), key);
writeMessage(message.c_str(), message.size(), key);
writeMessage(message.c_str(), message.size());
}
else
{
......@@ -26,19 +27,19 @@ public:
}
private:
void writeHeader(int type, unsigned short size, ElabOrder const & key)
void writeHeader(int type, unsigned short size, Order const & key)
{
int bufferSize = sizeof(unsigned char)*2 + sizeof(unsigned short)*3
+ sizeof(unsigned long);
char buffer[bufferSize];
char * pos = buffer;
pos += saveChar(pos, special);
pos += saveShort(pos, size);
pos += saveChar(pos, key.transport);
pos += saveInt(pos, key.ip);
pos += saveShort(pos, key.localPort);
pos += saveShort(pos, key.remotePort);
pos = saveChar(pos, type);
pos = saveShort(pos, size);
pos = saveChar(pos, key.transport);
pos = saveInt(pos, key.ip);
pos = saveShort(pos, key.localPort);
pos = saveShort(pos, key.remotePort);
writeMessage(buffer, bufferSize);
}
......@@ -62,7 +63,7 @@ private:
return buffer + sizeof(ordered);
}
protected:
virtual void writeMessage(char const * message, int count);
virtual void writeMessage(char const * message, int count)=0;
};
#endif
// Connection.cc
#include "lib.h"
#include "log.h"
#include "Connection.h"
#include "Time.h"
#include "ConnectionModel.h"
......@@ -9,6 +10,36 @@
using namespace std;
Connection::Connection()
: isConnected(false)
, bufferFull(false)
{
}
Connection::Connection(Connection const & right)
: peer(right.peer->clone())
, traffic(right.traffic->clone())
, measurements(right.measurements)
, isConnected(right.isConnected)
, bufferFull(right.bufferFull)
, nextWrite(right.nextWrite)
{
}
Connection & Connection::operator=(Connection const & right)
{
if (this != &right)
{
peer = right.peer->clone();
traffic = right.traffic->clone();
measurements = right.measurements;
isConnected = right.isConnected;
bufferFull = right.bufferFull;
nextWrite = right.nextWrite;
}
return *this;
}
void Connection::reset(Order const & newElab,
std::auto_ptr<ConnectionModel> newPeer)
{
......@@ -34,15 +65,16 @@ void Connection::connect(void)
}
}
void Connection::addTrafficWrite(TrafficWriteCommand const & newWrite
std::multimap<Time, Connection *> const & schedule)
void Connection::addTrafficWrite(TrafficWriteCommand const & newWrite,
multimap<Time, Connection *> & schedule)
{
if (traffic.get() != NULL)
{
Time nextTime = traffic->addWrite(newWrite);
if (nextTime != Time())
Time temp = traffic->addWrite(newWrite, nextWrite);
if (temp != Time() && nextWrite == Time())
{
schedule.insert(make_pair(nextTime, this));
nextWrite = temp;
schedule.insert(make_pair(nextWrite, this));
}
}
else
......@@ -89,18 +121,34 @@ Time Connection::writeToConnection(Time const & previousTime)
&& planet != result.planet)
{
global::planetMap.erase(planet);
planet = result;
planet = result.planet;
global::planetMap.insert(make_pair(planet, this));
}
isConnected = result.isConnected;
bufferFull = result.bufferFull;
nextWrite = result.nextWrite;
return result.nextWrite;
}
void cleanup(void)
void Connection::cleanup(std::multimap<Time, Connection *> & schedule)
{
if (isConnected)
{
global::planetMap.erase(planet);
}
if (nextWrite != Time())
{
std::multimap<Time, Connection *>::iterator pos
= schedule.lower_bound(nextWrite);
std::multimap<Time, Connection *>::iterator limit
= schedule.upper_bound(nextWrite);
for (; pos != limit; ++pos)
{
if (pos->second == this)
{
schedule.erase(pos);
break;
}
}
}
}
......@@ -3,10 +3,15 @@
#ifndef CONNECTION_H_STUB_2
#define CONNECTION_H_STUB_2
#include "SensorList.h"
class Time;
class ConnectionModel;
class TrafficModel;
class Sensor;
class ConnectionModelCommand;
class TrafficWriteCommand;
class SensorCommand;
class Connection
{
......@@ -29,7 +34,7 @@ public:
void connect(void);
// Notifies the traffic model of write information from the monitor.
void addTrafficWrite(TrafficWriteCommand const & newWrite,
std::multimap<Time, Connection *> const & schedule);
std::multimap<Time, Connection *> & schedule);
// Adds a particular kind of sensor when requested by the monitor.
void addSensor(SensorCommand const & newSensor);
// Notifies the sensors of a data packet which was sent.
......@@ -42,7 +47,7 @@ public:
// scheduler.
Time writeToConnection(Time const & previousTime);
// Called just before a connection is removed from the map.
void cleanup(void);
void cleanup(std::multimap<Time, Connection *> & schedule);
private:
// There are two kinds of ordering. One is for commands from
// emulab. One is for the pcap analysis.
......@@ -54,6 +59,7 @@ private:
SensorList measurements;
bool isConnected;
bool bufferFull;
Time nextWrite;
};
#endif
......@@ -5,6 +5,10 @@
class ConnectionModel
{
public:
virtual ~ConnectionModel() {}
virtual std::auto_ptr<ConnectionModel> clone(void)=0;
virtual void connect(void)=0;
};
#endif
// KernelTcp.cc
#include "lib.h"
#include "log.h"
#include "KernelTcp.h"
using namespace std;
void kernelTcp_init(void)
{
int error = 0;
// Set up the peerAccept socket
address.sin_family = AF_INET;
address.sin_port = htons(SENDER_PORT);
address.sin_arrd.s_addr = INADDR_ANY;
error = bind(global::peerAccept,
reinterpret_cast<struct sockaddr *>(address),
sizeof(struct sockaddr))
// Set up the connectionModelExemplar
// Set up packet capture
}
void kernelTcp_addNewPeer(fd_set * readable)
{
if (global::peerAccept != -1
......@@ -10,14 +27,16 @@ void kernelTcp_addNewPeer(fd_set * readable)
{
struct sockaddr_in remoteAddress;
socklen_t addressSize = sizeof(remoteAddress);
int fd = accept(global::peerAccept, &remoteAddress, &addressSize);
int fd = accept(global::peerAccept,
reinterpret_cast<struct sockaddr *>(&remoteAddress),
&addressSize);
if (fd != -1)
{
// Add the peer.
int flags = fctl(fd, F_GETFL);
int flags = fcntl(fd, F_GETFL);
if (flags != -1)
{
int error = fctl(fd, F_SETFL, flags | O_NONBLOCK);
int error = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (error != -1)
{
global::peers.push_back(
......@@ -25,7 +44,8 @@ void kernelTcp_addNewPeer(fd_set * readable)
addDescriptor(fd);
logWrite(PEER_CYCLE,
"Peer connection %d from %s was accepted normally.",
global::peers.back().first, global::peers.back().second);
global::peers.back().first,
global::peers.back().second.c_str());
}
else
{
......@@ -49,8 +69,8 @@ void kernelTcp_addNewPeer(fd_set * readable)
void kernelTcp_readFromPeers(fd_set * readable)
{
list<int>::iterator pos = global::peers.begin();
while (pos != peers.end())
list< pair<int, string> >::iterator pos = global::peers.begin();
while (pos != global::peers.end())
{
if (FD_ISSET(pos->first, readable))
{
......@@ -61,9 +81,9 @@ void kernelTcp_readFromPeers(fd_set * readable)
{
logWrite(PEER_CYCLE,
"Peer connection %d from %s is closing normally.",
pos->first, pos->second);
pos->first, pos->second.c_str());
close(pos->first);
list<int>::iterator temp = pos;
list< pair<int, string> >::iterator temp = pos;
++pos;
global::peers.erase(temp);
}
......@@ -71,10 +91,10 @@ void kernelTcp_readFromPeers(fd_set * readable)
{
logWrite(EXCEPTION,
"Failed to read peer connection %d from %s so "
"I'm shutting it down: %s", pos->first, pos->second,
"I'm shutting it down: %s", pos->first, pos->second.c_str(),
strerror(errno));
close(pos->first);
list<int>::iterator temp = pos;
list< pair<int, string> >::iterator temp = pos;
++pos;
global::peers.erase(temp);
}
......@@ -89,3 +109,7 @@ void kernelTcp_readFromPeers(fd_set * readable)
}
}
}
void kernelTcp_packetCapture(fd_set * readable)
{
}
......@@ -23,6 +23,7 @@ class KernelTcp : public ConnectionModel
bool useNagles;
};
void kernelTcp_init(void);
void kernelTcp_addNewPeer(fd_set * readable);
void kernelTcp_readFromPeers(fd_set * readable);
void kernelTcp_packetCapture(fd_set * readable);
......
all: magent
magent: Command.o Connection.o Decayer.o KernelTcp.o SensorList.o Time.o log.o main.o
g++ -I. -g -Wall Command.o Connection.o Decayer.o KernelTcp.o SensorList.o Time.o log.o main.o -lm -lpcap -o magent
Command.o: Command.cc lib.h Command.h Sensor.h Connection.h ConnectionModel.h TrafficModel.h
g++ -I. -g -Wall -c Command.cc
Connection.o: Connection.cc lib.h log.h Connection.h Time.h ConnectionModel.h TrafficModel.h Sensor.h
g++ -I. -g -Wall -c Connection.cc
Decayer.o: Decayer.cc lib.h Decayer.h
g++ -I. -g -Wall -c Decayer.cc
KernelTcp.o: KernelTcp.cc lib.h log.h KernelTcp.h
g++ -I. -g -Wall -c KernelTcp.cc
SensorList.o: SensorList.cc lib.h log.h SensorList.h Sensor.h Command.h
g++ -I. -g -Wall -c SensorList.cc
Time.o: Time.cc lib.h Time.h
g++ -I. -g -Wall -c Time.cc
log.o: log.cc lib.h log.h
g++ -I. -g -Wall -c log.cc
main.o: main.cc lib.h log.h CommandInput.h CommandOutput.h Command.h Time.h Connection.h Sensor.h TrafficModel.h KernelTcp.h
g++ -I. -g -Wall -c main.cc
clean:
rm *.o magent
......@@ -10,6 +10,7 @@
class Sensor
{
public:
virtual ~Sensor() {}
Sensor * getTail(void)
{
if (next.get() == NULL)
......@@ -45,12 +46,12 @@ public:
next->captureAck(packetTime, kernel, tcp, elab, bufferFull);
}
}
std::auto_ptr<Sensor> clone(void) const;
std::auto_ptr<Sensor> clone(void) const
{
std::auto_ptr<Sensor> result(localClone());
if (next.get() != NULL)
{
result.next = next->clone();
result->next = next->clone();
}
return result;
}
......@@ -65,7 +66,7 @@ protected:
struct tcp_info const * kernel,
struct tcphdr const * tcp, Order const & elab,
bool bufferFull)=0;
virtual std::auto_ptr<Sensor> clone(void) const=0;
virtual std::auto_ptr<Sensor> localClone(void) const=0;
};
#endif
// SensorList.cc
#include "lib.h"
#include "log.h"
#include "SensorList.h"
#include "Sensor.h"
#include "Command.h"
using namespace std;
SensorList::SensorList()
{
tail = NULL;
// Dependency = NULL here
tail = NULL;
// Dependency = NULL here
}
SensorList::SensorList(SensorList const & right)
{
*this = right;
*this = right;
}
SensorList & SensorList::operator=(SensorList const & right)
{
if (this != &right)
if (this != &right)
{
if (right.head.get() != NULL)
{
if (right.head.get() != NULL)
{
head = right.head->clone();
tail = ;
}
else
{
head.reset(NULL);
tail = NULL;
}
// Copy dependency pointers here.
head = right.head->clone();
tail = head->getTail();
}
else
{
head.reset(NULL);
tail = NULL;
}
// Copy dependency pointers here.
}
return *this;
}
void SensorList::addSensor(SensorCommand const & newSensor)
{
// Add dependency type demux here.
switch(newSensor.type)
{
default:
logWrite(ERROR,
"Incorrect sensor type (%d). Ignoring add sensor command.",
newSensor.type);
break;
}
// Add dependency type demux here.
switch(newSensor.type)
{
default:
logWrite(ERROR,
"Incorrect sensor type (%d). Ignoring add sensor command.",
newSensor.type);
break;
}
}
Sensor * SensorList::getHead(void)
{
return head.get();
return head.get();
}
void pushSensor(std::auto_ptr<Sensor> newSensor)
void SensorList::pushSensor(std::auto_ptr<Sensor> newSensor)
{
if (tail != NULL)
{
tail->addNode(newSensor);
}
else
{
head = newSensor;
tail = head.get();
}
if (tail != NULL)
{
tail->addNode(newSensor);
}
else
{
head = newSensor;
tail = head.get();
}
}
// Add individual pushSensor functions here.
......
......@@ -13,6 +13,9 @@
#ifndef SENSOR_LIST_H_STUB_2
#define SENSOR_LIST_H_STUB_2
class Sensor;
class SensorCommand;
class SensorList
{
public:
......
......@@ -19,6 +19,12 @@ Time::Time(struct timeval const & newData)
long long Time::toMilliseconds(void) const
{
long long result = data.tv_sec * 1000 + data.tv_usec / 1000;