Commit c6fa578a authored by Jonathon Duerig's avatar Jonathon Duerig

Fixed the traffic model so that it is now in line with what Rob and I...

Fixed the traffic model so that it is now in line with what Rob and I discussed. Delays are before the write with the write size cached. Writes expire based on an expiration date.

Miscellaneous fixes. Open problem: PacketSensor does not deal correctly with zero-sized packets. I changed KernelTcp to pass such packets because otherwise there is no way to do state changes based on SYN/FIN/other packets. SYN is handled ok for now because of a change noted below. FIN is not.

Added a StateSensor. As I discovered, using the kernel tcp_info data structure isn't useful when dealing with fields that change on a packet by packet basis because the kernel information is retrieved at processing time and not capture time. For instance, it is useless when trying to determine whether a connection was established by the time a particular packet was sent (to determine whether it is part of the three-way-handshake). The StateSensor keeps track of the state machine and correlates it to the packet involved. This allows the other sensors to rely on it to distinguish between connection setup and the rest of the connection traffic. Added references to it in the PacketSensor, the DelaySensor and the ThroughputSensor.

Changed the way packet information was transmitted to the sensors to make it easier to add new packet types (as will be necessary when accept()s are handled).

Fixed all outstanding flaws in the basic feedback mechanism. In short, "Its alive!". Currently the only data being transmitted is the base rtt (MinDelay). Now that the feedback is working in a basic form, it will be easier to get the other characteristics online.
parent 19389112
......@@ -10,10 +10,7 @@ using namespace std;
CircularTraffic::CircularTraffic()
{
begin = 0;
usedCount = 0;
current = 0;
writes.resize(DEFAULT_SIZE);
current = writes.end();
}
CircularTraffic::~CircularTraffic()
......@@ -23,10 +20,9 @@ CircularTraffic::~CircularTraffic()
auto_ptr<TrafficModel> CircularTraffic::clone(void)
{
auto_ptr<CircularTraffic> result;
result->begin = begin;
result->usedCount = usedCount;
result->current = current;
result->writes = writes;
result->nextWriteSize = nextWriteSize;
auto_ptr<TrafficModel> model(result.release());
return model;
}
......@@ -34,22 +30,27 @@ auto_ptr<TrafficModel> CircularTraffic::clone(void)
Time CircularTraffic::addWrite(TrafficWriteCommand const & newWrite,
Time const & deadline)
{
if (usedCount < static_cast<int>(writes.size()))
Time now = getCurrentTime();
writes.push_back(newWrite);
writes.back().localTime = now;
// XXX: end() is not necessarily a constant value. Though this
// should work, if there is erratic behaviour, here is a good place
// to check.
if (current == writes.end())
{
writes[usedCount] = newWrite;
++usedCount;
}
else
{
writes[begin] = newWrite;
begin = (begin + 1) % writes.size();
current = writes.begin();
}
if (deadline == Time())
{
return getCurrentTime() + writes[current].delta;
// If there is no current deadline, return the next one.
nextWriteSize = writes.back().size;
return now + min(writes.back().delta,
static_cast<unsigned int>(EXPIRATION_TIME));
}
else
{
// Otherwise, it doesn't matter what we return.
return Time();
}
}
......@@ -58,11 +59,50 @@ void CircularTraffic::writeToPeer(ConnectionModel * peer,
Time const & previousTime,
WriteResult & result)
{
if (usedCount > 0)
if (current != writes.end())
{
current = (current + 1) % usedCount;
peer->writeMessage(writes[current].size, result);
result.nextWrite = previousTime + writes[current].delta;
// Write a cached message of the specified size.
peer->writeMessage(nextWriteSize, result);
// Iterate over any stale writes, removing them as we go along.
// First, start out with one past the current write.
current = advance(current);
// next is used to cache the next value in case we need to delete current.
std::list<TrafficWriteCommand>::iterator next = advance(current);
// Unless we find a recent write, we don't schedule another write.
result.nextWrite = Time();
bool done = false;
Time now = getCurrentTime();
while (current != writes.end() && !done)
{
if ((now - current->localTime).toMilliseconds() < EXPIRATION_TIME)
{
// The current write is recent. Use it.
result.nextWrite = previousTime + current->delta;
nextWriteSize = current->size;
done = true;
}
else
{
// The current write is stale.
if (current == next)
{
// Special case when our list is of size 1, we just nuke the
// list and give up.
writes.clear();
current = writes.end();
nextWriteSize = 0;
}
else
{
// Common case. list is > size 2. We erase the current
// node. Then we advance it using next.
writes.erase(current);
current = next;
next = advance(next);
}
}
}
result.isConnected = peer->isConnected();
}
else
......@@ -74,3 +114,18 @@ void CircularTraffic::writeToPeer(ConnectionModel * peer,
result.nextWrite = Time();
}
}
list<TrafficWriteCommand>::iterator CircularTraffic::advance(
list<TrafficWriteCommand>::iterator old)
{
list<TrafficWriteCommand>::iterator result = old;
if (result == writes.end())
{
result = writes.begin();
}
else
{
++result;
}
return result;
}
......@@ -8,7 +8,7 @@
class CircularTraffic : public TrafficModel
{
public:
enum { DEFAULT_SIZE = 20 };
enum { EXPIRATION_TIME = 500 }; // in milliseconds
public:
CircularTraffic();
virtual ~CircularTraffic();
......@@ -19,10 +19,14 @@ public:
Time const & previousTime,
WriteResult & result);
private:
int begin;
int usedCount;
int current;
std::vector<TrafficWriteCommand> writes;
// Quick function to treat the writes list as though it were
// circular
std::list<TrafficWriteCommand>::iterator advance(
std::list<TrafficWriteCommand>::iterator old);
private:
std::list<TrafficWriteCommand>::iterator current;
std::list<TrafficWriteCommand> writes;
unsigned int nextWriteSize;
};
#endif
......@@ -80,6 +80,7 @@ protected:
public:
unsigned int delta;
unsigned int size;
Time localTime;
};
class DeleteConnectionCommand : public Command
......
......@@ -133,28 +133,16 @@ void Connection::addSensor(SensorCommand const & newSensor)
measurements.addSensor(newSensor);
}
void Connection::captureSend(PacketInfo * packet)
void Connection::capturePacket(PacketInfo * packet)
{
Sensor * head = measurements.getHead();
packet->elab = elab;
packet->bufferFull = bufferFull;
if (head != NULL && isConnected)
{
head->captureSend(packet);
head->capturePacket(packet);
}
replayWritePacket(PACKET_INFO_SEND_COMMAND, packet);
}
void Connection::captureAck(PacketInfo * packet)
{
Sensor * head = measurements.getHead();
packet->elab = elab;
packet->bufferFull = bufferFull;
if (head != NULL && isConnected)
{
head->captureAck(packet);
}
replayWritePacket(PACKET_INFO_ACK_COMMAND, packet);
replayWritePacket(packet);
}
ConnectionModel const * Connection::getConnectionModel(void)
......
......@@ -37,10 +37,8 @@ public:
std::multimap<Time, Connection *> & schedule);
// Adds a particular kind of sensor when requested by the monitor.
void addSensor(SensorCommand const & newSensor);
// Notifies the sensors of a data packet which was sent.
void captureSend(PacketInfo * packet);
// Notifies the sensors of an acknowledged packet which was received.
void captureAck(PacketInfo * packet);
// Notifies the sensors of a captured packet.
void capturePacket(PacketInfo * packet);
// Allows the connection model to be viewed.
ConnectionModel const * getConnectionModel(void);
// Notifies the traffic model that a timer has expired on the
......
......@@ -3,11 +3,14 @@
#include "lib.h"
#include "DelaySensor.h"
#include "PacketSensor.h"
#include "StateSensor.h"
#include "Time.h"
using namespace std;
DelaySensor::DelaySensor(PacketSensor * newPacketHistory)
DelaySensor::DelaySensor(PacketSensor * newPacketHistory,
StateSensor * newState)
: state(newState)
{
lastDelay = 0;
packetHistory = newPacketHistory;
......@@ -24,6 +27,14 @@ void DelaySensor::localSend(PacketInfo *)
void DelaySensor::localAck(PacketInfo * packet)
{
Time diff = packet->packetTime - packetHistory->getAckedSendTime();
lastDelay = diff.toMilliseconds();
if (state->getState() == StateSensor::ESTABLISHED)
{
Time diff = packet->packetTime - packetHistory->getAckedSendTime();
lastDelay = diff.toMilliseconds();
logWrite(SENSOR, "DELAY realDiff: %f seconds, calcDiff: %f",
packet->packetTime.toDouble()
- packetHistory->getAckedSendTime().toDouble(),
diff.toDouble());
logWrite(SENSOR, "DELAY: %d", lastDelay);
}
}
......@@ -6,11 +6,12 @@
#include "Sensor.h"
class PacketSensor;
class StateSensor;
class DelaySensor : public Sensor
{
public:
DelaySensor(PacketSensor * newPacketHistory);
DelaySensor(PacketSensor * newPacketHistory, StateSensor * newState);
int getLastDelay(void) const;
protected:
virtual void localSend(PacketInfo * packet);
......@@ -18,6 +19,7 @@ protected:
private:
int lastDelay;
PacketSensor * packetHistory;
StateSensor * state;
};
#endif
......@@ -588,22 +588,28 @@ namespace
*/
//logWrite(ERROR,"Unhandled: outgoing ACK");
}
if (hasData) {
pos->second->captureSend(&packet);
}
// JD: This should be called even when there is no data. Otherwise,
// there is confusion at connection startup and when there are other
// zero length segments which should be sent.
// if (hasData) {
packet.packetType = PACKET_INFO_SEND_COMMAND;
pos->second->capturePacket(&packet);
// }
} else {
/*
* Incoming packets
*/
if (isAck) {
pos->second->captureAck(&packet);
packet.packetType = PACKET_INFO_ACK_COMMAND;
pos->second->capturePacket(&packet);
}
if (hasData) {
// See above
// if (hasData) {
/*
* XXX - This is not yet implemented (I think)
*/
//logWrite(ERROR,"Unhandled: incoming data");
}
// }
}
}
......
all: magent
magent: CircularTraffic.o Command.o Connection.o Decayer.o DelaySensor.o DirectInput.o KernelTcp.o MaxDelaySensor.o MinDelaySensor.o PacketSensor.o Sensor.o SensorList.o ThroughputSensor.o Time.o TrivialCommandOutput.o log.o main.o saveload.o
g++ -I. -g -Wall CircularTraffic.o Command.o Connection.o Decayer.o DelaySensor.o DirectInput.o KernelTcp.o MaxDelaySensor.o MinDelaySensor.o PacketSensor.o Sensor.o SensorList.o ThroughputSensor.o Time.o TrivialCommandOutput.o log.o main.o saveload.o -lm -lpcap -o magent
magent: CircularTraffic.o Command.o Connection.o Decayer.o DelaySensor.o DirectInput.o KernelTcp.o MaxDelaySensor.o MinDelaySensor.o PacketSensor.o Sensor.o SensorList.o StateSensor.o ThroughputSensor.o Time.o TrivialCommandOutput.o log.o main.o saveload.o
g++ -I. -g -Wall CircularTraffic.o Command.o Connection.o Decayer.o DelaySensor.o DirectInput.o KernelTcp.o MaxDelaySensor.o MinDelaySensor.o PacketSensor.o Sensor.o SensorList.o StateSensor.o ThroughputSensor.o Time.o TrivialCommandOutput.o log.o main.o saveload.o -lm -lpcap -o magent
CircularTraffic.o: CircularTraffic.cc lib.h log.h TrafficModel.h CircularTraffic.h Command.h ConnectionModel.h
g++ -I. -g -Wall -c CircularTraffic.cc
......@@ -15,7 +15,7 @@ Connection.o: Connection.cc lib.h log.h Connection.h Time.h ConnectionModel.h Tr
Decayer.o: Decayer.cc lib.h Decayer.h
g++ -I. -g -Wall -c Decayer.cc
DelaySensor.o: DelaySensor.cc lib.h Sensor.h DelaySensor.h PacketSensor.h Time.h
DelaySensor.o: DelaySensor.cc lib.h Sensor.h DelaySensor.h PacketSensor.h Time.h StateSensor.h
g++ -I. -g -Wall -c DelaySensor.cc
DirectInput.o: DirectInput.cc lib.h log.h CommandInput.h saveload.h DirectInput.h
......@@ -30,16 +30,19 @@ MaxDelaySensor.o: MaxDelaySensor.cc lib.h Sensor.h Decayer.h MaxDelaySensor.h De
MinDelaySensor.o: MinDelaySensor.cc lib.h Sensor.h Decayer.h MinDelaySensor.h DelaySensor.h log.h saveload.h CommandOutput.h
g++ -I. -g -Wall -c MinDelaySensor.cc
PacketSensor.o: PacketSensor.cc lib.h Sensor.h PacketSensor.h
PacketSensor.o: PacketSensor.cc lib.h Sensor.h PacketSensor.h StateSensor.h
g++ -I. -g -Wall -c PacketSensor.cc
Sensor.o: Sensor.cc lib.h Sensor.h
g++ -I. -g -Wall -c Sensor.cc
SensorList.o: SensorList.cc lib.h log.h SensorList.h Sensor.h Command.h PacketSensor.h DelaySensor.h MinDelaySensor.h MaxDelaySensor.h ThroughputSensor.h
SensorList.o: SensorList.cc lib.h log.h SensorList.h Sensor.h Command.h PacketSensor.h DelaySensor.h MinDelaySensor.h MaxDelaySensor.h ThroughputSensor.h StateSensor.h
g++ -I. -g -Wall -c SensorList.cc
ThroughputSensor.o: ThroughputSensor.cc lib.h Sensor.h ThroughputSensor.h PacketSensor.h
StateSensor.o: StateSensor.cc lib.h Sensor.h StateSensor.h
g++ -I. -g -Wall -c StateSensor.cc
ThroughputSensor.o: ThroughputSensor.cc lib.h Sensor.h ThroughputSensor.h PacketSensor.h StateSensor.h
g++ -I. -g -Wall -c ThroughputSensor.cc
Time.o: Time.cc lib.h Time.h
......
......@@ -2,10 +2,13 @@
#include "lib.h"
#include "PacketSensor.h"
#include "StateSensor.h"
using namespace std;
PacketSensor::PacketSensor() : globalSequence()
PacketSensor::PacketSensor(StateSensor * newState)
: globalSequence()
, state(newState)
{
ackedSize = 0;
ackedSendTime = Time();
......@@ -32,148 +35,153 @@ Time const & PacketSensor::getAckedSendTime(void) const
void PacketSensor::localSend(PacketInfo * packet)
{
logWrite(SENSOR,
"PacketSensor::localSend() for sequence number %u",
ntohl(packet->tcp->seq));
unsigned int startSequence = ntohl(packet->tcp->seq);
if (globalSequence.inSequenceBlock(startSequence))
if (state->getState() == StateSensor::ESTABLISHED)
{
logWrite(SENSOR,
"PacketSensor::localSend() within globalSequence");
/*
* This packet should be in our list of sent packets - ie. it's a
* retransmit.
*/
list<SentPacket>::iterator pos = unacked.begin();
list<SentPacket>::iterator limit = unacked.end();
bool done = false;
for (; pos != limit && !done; ++pos)
"PacketSensor::localSend() for sequence number %u",
ntohl(packet->tcp->seq));
unsigned int startSequence = ntohl(packet->tcp->seq);
if (globalSequence.inSequenceBlock(startSequence))
{
if (pos->inSequenceBlock(startSequence))
logWrite(SENSOR,
"PacketSensor::localSend() within globalSequence");
/*
* This packet should be in our list of sent packets - ie. it's a
* retransmit.
*/
list<SentPacket>::iterator pos = unacked.begin();
list<SentPacket>::iterator limit = unacked.end();
bool done = false;
for (; pos != limit && !done; ++pos)
{
pos->timestamp = packet->packetTime;
done = true;
if (pos->inSequenceBlock(startSequence))
{
pos->timestamp = packet->packetTime;
done = true;
}
}
}
if (!done) {
logWrite(ERROR, "localSend() unable to find packet record to update.");
if (!done) {
logWrite(ERROR, "localSend() unable to find packet record to update.");
}
}
}
else
{
/*
* Not in the current window of unacked packets - create a new
* SentPacket record for it
*/
SentPacket record;
record.seqStart = startSequence;
/*
* Calculate the packet payload size - we have to make sure to take into
* account IP and TCP option headers
*/
unsigned int sequenceLength =
else
{
/*
* Not in the current window of unacked packets - create a new
* SentPacket record for it
*/
SentPacket record;
record.seqStart = startSequence;
/*
* Calculate the packet payload size - we have to make sure to take into
* account IP and TCP option headers
*/
unsigned int sequenceLength =
// Total length of the IP part of the packet
(ntohs(packet->ip->ip_len))
// Total length of all IP headers (including options)
- (packet->ip->ip_hl*4)
- (packet->ip->ip_hl*4)
// Total length of all TCP headers (including options)
- (packet->tcp->doff*4);
// We want to get the sequence number of the last data byte, not the
// sequence number of the first byte of the next segment
record.seqEnd = record.seqStart + sequenceLength - 1;
record.totalLength = packet->packetLength;
record.timestamp = packet->packetTime;
logWrite(SENSOR,
"PacketSensor::localSend() new record: ss=%u,sl=%u,se=%u,tl=%u",
record.seqStart, sequenceLength, record.seqEnd,
record.totalLength);
globalSequence.seqEnd = record.seqEnd;
if (unacked.empty())
{
globalSequence.seqStart = record.seqStart;
- (packet->tcp->doff*4);
// We want to get the sequence number of the last data byte, not the
// sequence number of the first byte of the next segment
record.seqEnd = record.seqStart + sequenceLength - 1;
record.totalLength = packet->packetLength;
record.timestamp = packet->packetTime;
logWrite(SENSOR,
"PacketSensor::localSend() new record: ss=%u,sl=%u,se=%u,tl=%u",
record.seqStart, sequenceLength, record.seqEnd,
record.totalLength);
globalSequence.seqEnd = record.seqEnd;
if (unacked.empty())
{
globalSequence.seqStart = record.seqStart;
globalSequence.seqEnd = record.seqEnd;
}
logWrite(SENSOR,
"PacketSensor::localSend(): global start = %u, global end = %u",
globalSequence.seqStart, globalSequence.seqEnd);
unacked.push_back(record);
}
logWrite(SENSOR,
"PacketSensor::localSend(): global start = %u, global end = %u",
globalSequence.seqStart, globalSequence.seqEnd);
unacked.push_back(record);
}
ackedSize = 0;
ackedSendTime = Time();
ackedSize = 0;
ackedSendTime = Time();
}
}
void PacketSensor::localAck(PacketInfo * packet)
{
list<SentPacket>::iterator pos = unacked.begin();
list<SentPacket>::iterator limit = unacked.end();
bool found = false;
ackedSize = 0;
if (state->getState() == StateSensor::ESTABLISHED)
{
list<SentPacket>::iterator pos = unacked.begin();
list<SentPacket>::iterator limit = unacked.end();
bool found = false;
ackedSize = 0;
/*
* When we get an ACK, the sequence number is really the next one the peer
* excects to see: thus, the last sequence number it's ACKing is one less
* than this.
* Note: This should handle wraparound properly
*/
uint32_t ack_for = ntohl(packet->tcp->ack_seq) - 1;
logWrite(SENSOR, "PacketSensor::localAck() for sequence number %u",
ack_for);
/*
* Make sure this packet doesn't have a SACK option, in which case our
* calculation is wrong.
*/
list<Option>::iterator opt;
for (opt = packet->tcpOptions->begin();
opt != packet->tcpOptions->end();
++opt) {
if (opt->type == TCPOPT_SACK) {
logWrite(ERROR,"Packet has a SACK option!");
}
}
/*
* When we get an ACK, the sequence number is really the next one the peer
* excects to see: thus, the last sequence number it's ACKing is one less
* than this.
* Note: This should handle wraparound properly
*/
uint32_t ack_for = ntohl(packet->tcp->ack_seq) - 1;
logWrite(SENSOR, "PacketSensor::localAck() for sequence number %u",
ack_for);
while (pos != limit && !found)
{
found = pos->inSequenceBlock(ack_for);
/*
* XXX: Assumes that SACK is not in use - assumes that this ACK is for all
* sequence numbers up to the one it's ACKing
* Make sure this packet doesn't have a SACK option, in which case our
* calculation is wrong.
*/
ackedSize += pos->totalLength;
if (found)
list<Option>::iterator opt;
for (opt = packet->tcpOptions->begin();
opt != packet->tcpOptions->end();
++opt) {
if (opt->type == TCPOPT_SACK) {
logWrite(ERROR,"Packet has a SACK option!");
}
}
while (pos != limit && !found)
{
ackedSendTime = pos->timestamp;
found = pos->inSequenceBlock(ack_for);
/*
* XXX: Assumes that SACK is not in use - assumes that this ACK
* is for all sequence numbers up to the one it's ACKing
*/
ackedSize += pos->totalLength;
if (found)
{
ackedSendTime = pos->timestamp;
}
if (pos != limit)
{
++pos;
}
}
if (pos != limit)
if (!found)
{
++pos;
logWrite(ERROR, "Could not find the ack sequence number in list "
"of unacked packets.");
ackedSize = 0;
ackedSendTime = Time();
return;
}
unacked.erase(unacked.begin(), pos);
if (unacked.empty())
{
globalSequence.seqStart = 0;
globalSequence.seqEnd = 0;
}
else
{
globalSequence.seqStart = unacked.front().seqStart;
}
}
if (!found)
{
logWrite(ERROR, "Could not find the ack sequence number in list "
"of unacked packets.");
ackedSize = 0;
ackedSendTime = Time();
return;
}
unacked.erase(unacked.begin(), pos);
if (unacked.empty())
{
globalSequence.seqStart = 0;
globalSequence.seqEnd = 0;
}
else
{
globalSequence.seqStart = unacked.front().seqStart;
}
logWrite(SENSOR, "PacketSensor::localAck() decided on size %u",
ackedSize);
logWrite(SENSOR, "PacketSensor::localAck() decided on size %u",
ackedSize);
}
}
bool PacketSensor::SentPacket::inSequenceBlock(unsigned int sequence)
......
......@@ -8,10 +8,12 @@
#include "Sensor.h"
#include "Time.h"
class StateSensor;
class PacketSensor : public Sensor
{
public:
PacketSensor();
PacketSensor(StateSensor * newState);
// Get the size of the acknowledgement in bytes.
int getAckedSize(void) const;
// Get the time of the last packet sent which was acked.
......@@ -36,6 +38,8 @@ private:
Time ackedSendTime;
std::list<SentPacket> unacked;
SentPacket globalSequence;
StateSensor * state;
};
#endif
......@@ -33,24 +33,29 @@ Sensor * Sensor::getTail(void)
void Sensor::addNode(auto_ptr<Sensor> node)
{
next = node;
}
void Sensor::captureSend(PacketInfo * packet)
{
localSend(packet);
if (next.get() != NULL)
if (next.get() == NULL)
{
next->captureSend(packet);
next = node;
}
else
{
logWrite(ERROR, "Sensor::addNode(): A list tail was asked to add a node.");
}
}
void Sensor::captureAck(PacketInfo * packet)
void Sensor::capturePacket(PacketInfo * packet)
{
localAck(packet);
if (packet->packetType == PACKET_INFO_SEND_COMMAND)
{
localSend(packet);
}
else
{
localAck(packet);
}
if (next.get() != NULL)
{
next->captureAck(packet);
next->capturePacket(packet);
}
}
......
......@@ -17,8 +17,7 @@ public:
virtual ~Sensor();
Sensor * getTail(void);
void addNode(std::auto_ptr<Sensor> node);
void captureSend(PacketInfo * packet);
void captureAck(PacketInfo * packet);
void capturePacket(PacketInfo * packet);
private:
std::auto_ptr<Sensor> next;
protected:
......
......@@ -6,6 +6,7 @@
#include "Sensor.h"
#include "Command.h"
#include "StateSensor.h"
#include "PacketSensor.h"
#include "DelaySensor.h"
#include "MinDelaySensor.h"
......@@ -53,6 +54,9 @@ void SensorList::addSensor(SensorCommand const & newSensor)
case NULL_SENSOR:
pushNullSensor();
break;
case STATE_SENSOR:
pushStateSensor();
break;
case PACKET_SENSOR:
pushPacketSensor();
break;
......@@ -87,6 +91,7 @@ void SensorList::reset(void)
tail = NULL;
// Dependency = NULL here
depNullSensor = NULL;
depStateSensor = NULL;
depPacketSensor = NULL;
depDelaySensor = NULL;
depThroughputSensor = NULL;
......@@ -97,6 +102,7 @@ void SensorList::pushSensor(std::auto_ptr<Sensor> newSensor)