diff --git a/pelab/magent/CircularTraffic.cc b/pelab/magent/CircularTraffic.cc index e6b0af8d0335b0a17a6d1507293e578425c67d65..527554058dc4884eccf8014d616747afc7b774b7 100644 --- a/pelab/magent/CircularTraffic.cc +++ b/pelab/magent/CircularTraffic.cc @@ -54,26 +54,23 @@ Time CircularTraffic::addWrite(TrafficWriteCommand const & newWrite, } } -WriteResult CircularTraffic::writeToPeer(ConnectionModel * peer, - Time const & previousTime) +void CircularTraffic::writeToPeer(ConnectionModel * peer, + Time const & previousTime, + WriteResult & result) { if (usedCount > 0) { - WriteResult result; current = (current + 1) % usedCount; peer->writeMessage(writes[current].size, result); result.nextWrite = previousTime + writes[current].delta; - return result; } else { logWrite(ERROR, "writeToPeer() called without addWrite() being " "called first. This should be impossible."); - WriteResult result; result.isConnected = peer->isConnected(); result.bufferFull = false; result.nextWrite = Time(); - return result; } } diff --git a/pelab/magent/CircularTraffic.h b/pelab/magent/CircularTraffic.h index ea2cea1db5b14ce745315d379aec7ccdec296bcb..4e5d9562770760d1b0aacf8f1457b8d161a59233 100644 --- a/pelab/magent/CircularTraffic.h +++ b/pelab/magent/CircularTraffic.h @@ -15,8 +15,9 @@ public: virtual std::auto_ptr clone(void); virtual Time addWrite(TrafficWriteCommand const & newWrite, Time const & deadline); - virtual WriteResult writeToPeer(ConnectionModel * peer, - Time const & previousTime); + virtual void writeToPeer(ConnectionModel * peer, + Time const & previousTime, + WriteResult & result); private: int begin; int usedCount; diff --git a/pelab/magent/Command.cc b/pelab/magent/Command.cc index 0088b6df547019eaec5f6ada9c9b38af7aa91be5..aedf034311091fded3f7636766a212b0f6127090 100644 --- a/pelab/magent/Command.cc +++ b/pelab/magent/Command.cc @@ -69,7 +69,7 @@ void ConnectCommand::runConnect(Connection * conn, void TrafficWriteCommand::runConnect(Connection * conn, std::multimap & schedule) { - logWrite(COMMAND_INPUT, "Running TRAFFIC_WRITE_COMMAND"); +// logWrite(COMMAND_INPUT, "Running TRAFFIC_WRITE_COMMAND"); conn->addTrafficWrite(*this, schedule); } diff --git a/pelab/magent/Connection.cc b/pelab/magent/Connection.cc index b280606aeadc565cdccdd7a6e0070e221e08e566..7d66e46862ca39f189b4272c9ec76364c8cacea5 100755 --- a/pelab/magent/Connection.cc +++ b/pelab/magent/Connection.cc @@ -159,7 +159,11 @@ ConnectionModel const * Connection::getConnectionModel(void) Time Connection::writeToConnection(Time const & previousTime) { - WriteResult result = traffic->writeToPeer(peer.get(), previousTime); + WriteResult result; + result.planet.transport = TCP_CONNECTION; + result.planet.ip = elab.ip; + result.planet.remotePort = global::peerServerPort; + traffic->writeToPeer(peer.get(), previousTime, result); if (!isConnected && result.isConnected) { planet = result.planet; diff --git a/pelab/magent/KernelTcp.cc b/pelab/magent/KernelTcp.cc index a32a15bddf0abbae84b13e350b42bd786abad5dd..45de482886bb14c2eddea33ca341d42987b6964a 100755 --- a/pelab/magent/KernelTcp.cc +++ b/pelab/magent/KernelTcp.cc @@ -76,6 +76,7 @@ void KernelTcp::connect(Order & planet) || !changeSocket(sockfd, IPPROTO_TCP, TCP_NODELAY, !useNagles, "TCP_NODELAY")) { + // Error message already printed close(sockfd); return; } @@ -85,7 +86,6 @@ void KernelTcp::connect(Order & planet) destAddress.sin_family = AF_INET; destAddress.sin_port = htons(global::peerServerPort); destAddress.sin_addr.s_addr = htonl(planet.ip); - int error = ::connect(sockfd, (struct sockaddr *)&destAddress, sizeof(destAddress)); if (error == -1) @@ -134,6 +134,8 @@ void KernelTcp::connect(Order & planet) } planet.localPort = ntohs(sourceAddress.sin_port); + logWrite(CONNECTION_MODEL, "Connected to peer at %s:%d", + ipToString(htonl(planet.ip)).c_str(), global::peerServerPort); peersock = sockfd; state = CONNECTED; } @@ -170,6 +172,8 @@ int KernelTcp::writeMessage(int size, WriteResult & result) { if (state == DISCONNECTED) { + logWrite(CONNECTION_MODEL, + "writeMessage() called while disconnected from peer."); connect(result.planet); } if (state == CONNECTED) @@ -195,7 +199,10 @@ int KernelTcp::writeMessage(int size, WriteResult & result) } else if (error == -1) { - logWrite(EXCEPTION, "Failed write to peer: %s", strerror(errno)); + if (errno != EWOULDBLOCK) + { + logWrite(EXCEPTION, "Failed write to peer: %s", strerror(errno)); + } return -1; } else @@ -226,6 +233,8 @@ void KernelTcp::init(void) global::peerAccept = createServer(global::peerServerPort, "Peer accept socket (No incoming peer " "connections will be accepted)"); + logWrite(PEER_CYCLE, "Created peer server on port %d", + global::peerServerPort); // Set up the connectionModelExemplar global::connectionModelExemplar.reset(new KernelTcp()); @@ -316,6 +325,7 @@ void KernelTcp::readFromPeers(fd_set * readable) "Peer connection %d from %s is closing normally.", pos->first, pos->second.c_str()); close(pos->first); + clearDescriptor(pos->first); list< pair >::iterator temp = pos; ++pos; global::peers.erase(temp); @@ -348,7 +358,7 @@ void KernelTcp::packetCapture(fd_set * readable) unsigned char * args = NULL; if (pcapfd != -1 && FD_ISSET(pcapfd, readable)) { - pcap_dispatch(pcapDescriptor, -1, kernelTcpCallback, args); + pcap_dispatch(pcapDescriptor, 1, kernelTcpCallback, args); } } @@ -370,7 +380,7 @@ namespace if (error == -1) { logWrite(ERROR, "Cannot read back socket option %s", optstring.c_str()); - return false; + return false; } logWrite(CONNECTION_MODEL, "Socket option %s is now %d", optstring.c_str(), newValue); diff --git a/pelab/magent/TrafficModel.h b/pelab/magent/TrafficModel.h index b6235e9ab81996ad29278884b695442e2eba8af0..e7b338c722e68d4cc44ae5f51e43165899eef4c2 100644 --- a/pelab/magent/TrafficModel.h +++ b/pelab/magent/TrafficModel.h @@ -13,8 +13,9 @@ public: virtual std::auto_ptr clone(void)=0; virtual Time addWrite(TrafficWriteCommand const & newWrite, Time const & deadline)=0; - virtual WriteResult writeToPeer(ConnectionModel * peer, - Time const & previousTime)=0; + virtual void writeToPeer(ConnectionModel * peer, + Time const & previousTime, + WriteResult & result)=0; }; class NullTrafficModel : public TrafficModel @@ -30,14 +31,13 @@ public: { return Time(); } - virtual WriteResult writeToPeer(ConnectionModel * peer, - Time const & previousTime) + virtual void writeToPeer(ConnectionModel * peer, + Time const & previousTime, + WriteResult & result) { - WriteResult result; result.isConnected = false; result.bufferFull = false; result.nextWrite = Time(); - return result; } }; diff --git a/pelab/magent/main.cc b/pelab/magent/main.cc index 7e2aee70cc324cd84bcc462f4be48fb2e4d19207..4bf877661e36d3ba211db49315e15f1de0c0cc7a 100755 --- a/pelab/magent/main.cc +++ b/pelab/magent/main.cc @@ -165,6 +165,9 @@ void processArgs(int argc, char * argv[]) void init(void) { + FD_ZERO(&global::readers); + global::maxReader = -1; + logInit(stderr, LOG_EVERYTHING, true); srandom(getpid()); switch (global::connectionModelArg) @@ -180,9 +183,6 @@ void init(void) "Terminating program.", global::connectionModelArg); } - FD_ZERO(&global::readers); - global::maxReader = -1; - global::input.reset(new DirectInput()); global::output.reset(new TrivialCommandOutput()); } @@ -190,6 +190,8 @@ void init(void) void mainLoop(void) { multimap schedule; + fd_set readable; + FD_ZERO(&readable); // Select on file descriptors while (true) @@ -198,7 +200,7 @@ void mainLoop(void) // debugTimeout.tv_sec = 0; // debugTimeout.tv_usec = 100000; - fd_set readable = global::readers; + readable = global::readers; Time timeUntilWrite; struct timeval * waitPeriod; Time now = getCurrentTime(); @@ -214,11 +216,9 @@ void mainLoop(void) timeUntilWrite = Time(); waitPeriod = NULL; } - logWrite(MAIN_LOOP, "Select"); int error = select(global::maxReader + 1, &readable, NULL, NULL, // &debugTimeout); waitPeriod); - logWrite(MAIN_LOOP, "After select"); if (error == -1) { switch (errno) @@ -239,25 +239,18 @@ void mainLoop(void) } } - logWrite(MAIN_LOOP, "Before command loop"); global::input->nextCommand(&readable); Command * current = global::input->getCommand(); - while (current != NULL) + if (current != NULL) { - logWrite(MAIN_LOOP, "Command Loop"); current->run(schedule); - global::input->nextCommand(&readable); - current = global::input->getCommand(); +// global::input->nextCommand(&readable); +// current = global::input->getCommand(); } -// logWrite(MAIN_LOOP, "Before writeToConnections()"); writeToConnections(schedule); -// logWrite(MAIN_LOOP, "Before addNewPeer()"); addNewPeer(&readable); -// logWrite(MAIN_LOOP, "Before readFromPeers()"); readFromPeers(&readable); -// logWrite(MAIN_LOOP, "Before packetCapture()"); packetCapture(&readable); -// logWrite(MAIN_LOOP, "End of main loop"); } } @@ -403,7 +396,7 @@ int createServer(int port, string const & debugString) } struct sockaddr_in address; - address.sin_family = htons(AF_INET); + address.sin_family = AF_INET; address.sin_port = htons(port); address.sin_addr.s_addr = htonl(INADDR_ANY); error = bind(fd, reinterpret_cast(&address),