Commit ed9b3581 authored by Jonathon Duerig's avatar Jonathon Duerig
Browse files

The framework works now. Now to make sure that the sensors are still working.

parent 2638b5bf
......@@ -54,26 +54,23 @@ Time CircularTraffic::addWrite(TrafficWriteCommand const & newWrite,
}
}
WriteResult CircularTraffic::writeToPeer(ConnectionModel * peer,
Time const & previousTime)
void CircularTraffic::writeToPeer(ConnectionModel * peer,
Time const & previousTime,
WriteResult & result)
{
if (usedCount > 0)
{
WriteResult result;
current = (current + 1) % usedCount;
peer->writeMessage(writes[current].size, result);
result.nextWrite = previousTime + writes[current].delta;
return result;
}
else
{
logWrite(ERROR, "writeToPeer() called without addWrite() being "
"called first. This should be impossible.");
WriteResult result;
result.isConnected = peer->isConnected();
result.bufferFull = false;
result.nextWrite = Time();
return result;
}
}
......@@ -15,8 +15,9 @@ public:
virtual std::auto_ptr<TrafficModel> clone(void);
virtual Time addWrite(TrafficWriteCommand const & newWrite,
Time const & deadline);
virtual WriteResult writeToPeer(ConnectionModel * peer,
Time const & previousTime);
virtual void writeToPeer(ConnectionModel * peer,
Time const & previousTime,
WriteResult & result);
private:
int begin;
int usedCount;
......
......@@ -69,7 +69,7 @@ void ConnectCommand::runConnect(Connection * conn,
void TrafficWriteCommand::runConnect(Connection * conn,
std::multimap<Time, Connection *> & schedule)
{
logWrite(COMMAND_INPUT, "Running TRAFFIC_WRITE_COMMAND");
// logWrite(COMMAND_INPUT, "Running TRAFFIC_WRITE_COMMAND");
conn->addTrafficWrite(*this, schedule);
}
......
......@@ -159,7 +159,11 @@ ConnectionModel const * Connection::getConnectionModel(void)
Time Connection::writeToConnection(Time const & previousTime)
{
WriteResult result = traffic->writeToPeer(peer.get(), previousTime);
WriteResult result;
result.planet.transport = TCP_CONNECTION;
result.planet.ip = elab.ip;
result.planet.remotePort = global::peerServerPort;
traffic->writeToPeer(peer.get(), previousTime, result);
if (!isConnected && result.isConnected)
{
planet = result.planet;
......
......@@ -76,6 +76,7 @@ void KernelTcp::connect(Order & planet)
|| !changeSocket(sockfd, IPPROTO_TCP, TCP_NODELAY, !useNagles,
"TCP_NODELAY"))
{
// Error message already printed
close(sockfd);
return;
}
......@@ -85,7 +86,6 @@ void KernelTcp::connect(Order & planet)
destAddress.sin_family = AF_INET;
destAddress.sin_port = htons(global::peerServerPort);
destAddress.sin_addr.s_addr = htonl(planet.ip);
int error = ::connect(sockfd, (struct sockaddr *)&destAddress,
sizeof(destAddress));
if (error == -1)
......@@ -134,6 +134,8 @@ void KernelTcp::connect(Order & planet)
}
planet.localPort = ntohs(sourceAddress.sin_port);
logWrite(CONNECTION_MODEL, "Connected to peer at %s:%d",
ipToString(htonl(planet.ip)).c_str(), global::peerServerPort);
peersock = sockfd;
state = CONNECTED;
}
......@@ -170,6 +172,8 @@ int KernelTcp::writeMessage(int size, WriteResult & result)
{
if (state == DISCONNECTED)
{
logWrite(CONNECTION_MODEL,
"writeMessage() called while disconnected from peer.");
connect(result.planet);
}
if (state == CONNECTED)
......@@ -195,7 +199,10 @@ int KernelTcp::writeMessage(int size, WriteResult & result)
}
else if (error == -1)
{
logWrite(EXCEPTION, "Failed write to peer: %s", strerror(errno));
if (errno != EWOULDBLOCK)
{
logWrite(EXCEPTION, "Failed write to peer: %s", strerror(errno));
}
return -1;
}
else
......@@ -226,6 +233,8 @@ void KernelTcp::init(void)
global::peerAccept = createServer(global::peerServerPort,
"Peer accept socket (No incoming peer "
"connections will be accepted)");
logWrite(PEER_CYCLE, "Created peer server on port %d",
global::peerServerPort);
// Set up the connectionModelExemplar
global::connectionModelExemplar.reset(new KernelTcp());
......@@ -316,6 +325,7 @@ void KernelTcp::readFromPeers(fd_set * readable)
"Peer connection %d from %s is closing normally.",
pos->first, pos->second.c_str());
close(pos->first);
clearDescriptor(pos->first);
list< pair<int, string> >::iterator temp = pos;
++pos;
global::peers.erase(temp);
......@@ -348,7 +358,7 @@ void KernelTcp::packetCapture(fd_set * readable)
unsigned char * args = NULL;
if (pcapfd != -1 && FD_ISSET(pcapfd, readable))
{
pcap_dispatch(pcapDescriptor, -1, kernelTcpCallback, args);
pcap_dispatch(pcapDescriptor, 1, kernelTcpCallback, args);
}
}
......@@ -370,7 +380,7 @@ namespace
if (error == -1)
{
logWrite(ERROR, "Cannot read back socket option %s", optstring.c_str());
return false;
return false;
}
logWrite(CONNECTION_MODEL, "Socket option %s is now %d", optstring.c_str(),
newValue);
......
......@@ -13,8 +13,9 @@ public:
virtual std::auto_ptr<TrafficModel> clone(void)=0;
virtual Time addWrite(TrafficWriteCommand const & newWrite,
Time const & deadline)=0;
virtual WriteResult writeToPeer(ConnectionModel * peer,
Time const & previousTime)=0;
virtual void writeToPeer(ConnectionModel * peer,
Time const & previousTime,
WriteResult & result)=0;
};
class NullTrafficModel : public TrafficModel
......@@ -30,14 +31,13 @@ public:
{
return Time();
}
virtual WriteResult writeToPeer(ConnectionModel * peer,
Time const & previousTime)
virtual void writeToPeer(ConnectionModel * peer,
Time const & previousTime,
WriteResult & result)
{
WriteResult result;
result.isConnected = false;
result.bufferFull = false;
result.nextWrite = Time();
return result;
}
};
......
......@@ -165,6 +165,9 @@ void processArgs(int argc, char * argv[])
void init(void)
{
FD_ZERO(&global::readers);
global::maxReader = -1;
logInit(stderr, LOG_EVERYTHING, true);
srandom(getpid());
switch (global::connectionModelArg)
......@@ -180,9 +183,6 @@ void init(void)
"Terminating program.", global::connectionModelArg);
}
FD_ZERO(&global::readers);
global::maxReader = -1;
global::input.reset(new DirectInput());
global::output.reset(new TrivialCommandOutput());
}
......@@ -190,6 +190,8 @@ void init(void)
void mainLoop(void)
{
multimap<Time, Connection *> schedule;
fd_set readable;
FD_ZERO(&readable);
// Select on file descriptors
while (true)
......@@ -198,7 +200,7 @@ void mainLoop(void)
// debugTimeout.tv_sec = 0;
// debugTimeout.tv_usec = 100000;
fd_set readable = global::readers;
readable = global::readers;
Time timeUntilWrite;
struct timeval * waitPeriod;
Time now = getCurrentTime();
......@@ -214,11 +216,9 @@ void mainLoop(void)
timeUntilWrite = Time();
waitPeriod = NULL;
}
logWrite(MAIN_LOOP, "Select");
int error = select(global::maxReader + 1, &readable, NULL, NULL,
// &debugTimeout);
waitPeriod);
logWrite(MAIN_LOOP, "After select");
if (error == -1)
{
switch (errno)
......@@ -239,25 +239,18 @@ void mainLoop(void)
}
}
logWrite(MAIN_LOOP, "Before command loop");
global::input->nextCommand(&readable);
Command * current = global::input->getCommand();
while (current != NULL)
if (current != NULL)
{
logWrite(MAIN_LOOP, "Command Loop");
current->run(schedule);
global::input->nextCommand(&readable);
current = global::input->getCommand();
// global::input->nextCommand(&readable);
// current = global::input->getCommand();
}
// logWrite(MAIN_LOOP, "Before writeToConnections()");
writeToConnections(schedule);
// logWrite(MAIN_LOOP, "Before addNewPeer()");
addNewPeer(&readable);
// logWrite(MAIN_LOOP, "Before readFromPeers()");
readFromPeers(&readable);
// logWrite(MAIN_LOOP, "Before packetCapture()");
packetCapture(&readable);
// logWrite(MAIN_LOOP, "End of main loop");
}
}
......@@ -403,7 +396,7 @@ int createServer(int port, string const & debugString)
}
struct sockaddr_in address;
address.sin_family = htons(AF_INET);
address.sin_family = AF_INET;
address.sin_port = htons(port);
address.sin_addr.s_addr = htonl(INADDR_ANY);
error = bind(fd, reinterpret_cast<struct sockaddr *>(&address),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment