Commit 80d60cf6 authored by Pramod R Sanaga's avatar Pramod R Sanaga
Browse files

1)Added 3 new sensors - Loss, Rtt and AvgThroughput.

2) Modified SensorList.h, SensorList.cc and lib.h to include the new sensors.
3) made changes in UdppacketSensor to accommodate the above new ones.
4) Added a script makeUdpPlots.py to create gnuplot graphs from the output
of UDP sensors.
parent c5d2f208
......@@ -105,6 +105,15 @@ void SensorList::addSensor(SensorCommand const & newSensor)
case UDP_MAXDELAY_SENSOR:
pushUdpMaxDelaySensor();
break;
case UDP_RTT_SENSOR:
pushUdpRttSensor();
break;
case UDP_LOSS_SENSOR:
pushUdpLossSensor();
break;
case UDP_AVG_THROUGHPUT_SENSOR:
pushUdpAvgThroughputSensor();
break;
default:
logWrite(ERROR,
"Incorrect sensor type (%d). Ignoring add sensor command.",
......@@ -401,3 +410,62 @@ void SensorList::pushUdpMaxDelaySensor()
}
}
void SensorList::pushUdpRttSensor()
{
pushUdpPacketSensor();
// Example dependency check
if (depUdpRttSensor == NULL)
{
logWrite(SENSOR, "Adding UdpRttSensor");
UdpRttSensor * newSensor = new UdpRttSensor(depUdpPacketSensor);
std::auto_ptr<Sensor> current(newSensor);
pushSensor(current);
// Example dependency set
depUdpRttSensor = newSensor;
}
}
void SensorList::pushUdpLossSensor()
{
pushUdpPacketSensor();
pushUdpRttSensor();
// Example dependency check
if (depUdpLossSensor == NULL)
{
logWrite(SENSOR, "Adding UdpLossSensor");
UdpLossSensor * newSensor = new UdpLossSensor(depUdpPacketSensor, depUdpRttSensor);
std::auto_ptr<Sensor> current(newSensor);
pushSensor(current);
// Example dependency set
depUdpLossSensor = newSensor;
}
}
void SensorList::pushUdpAvgThroughputSensor()
{
pushUdpPacketSensor();
pushUdpLossSensor();
// Example dependency check
if (depUdpAvgThroughputSensor == NULL)
{
logWrite(SENSOR, "Adding UdpAvgThroughputSensor");
UdpAvgThroughputSensor * newSensor = new UdpAvgThroughputSensor(depUdpPacketSensor, depUdpLossSensor);
std::auto_ptr<Sensor> current(newSensor);
pushSensor(current);
// Example dependency set
depUdpAvgThroughputSensor = newSensor;
}
}
......@@ -29,6 +29,9 @@
#include "UdpThroughputSensor.h"
#include "UdpMinDelaySensor.h"
#include "UdpMaxDelaySensor.h"
#include "UdpRttSensor.h"
#include "UdpLossSensor.h"
#include "UdpAvgThroughputSensor.h"
class SensorCommand;
......@@ -49,6 +52,9 @@ class UdpPacketSensor;
class UdpThroughputSensor;
class UdpMaxDelaySensor;
class UdpMinDelaySensor;
class UdpRttSensor;
class UdpLossSensor;
class UdpAvgThroughputSensor;
// Udp - CHANGES - End
class SensorList
......@@ -84,6 +90,9 @@ private:
void pushUdpThroughputSensor(void);
void pushUdpMinDelaySensor(void);
void pushUdpMaxDelaySensor(void);
void pushUdpRttSensor(void);
void pushUdpLossSensor(void);
void pushUdpAvgThroughputSensor(void);
// Udp - CHANGES - End
private:
......@@ -102,7 +111,10 @@ private:
UdpThroughputSensor const * depUdpThroughputSensor;
UdpMinDelaySensor const * depUdpMinDelaySensor;
UdpMaxDelaySensor const * depUdpMaxDelaySensor;
// Udp - CHANGES - End
UdpRttSensor const * depUdpRttSensor;
UdpLossSensor const * depUdpLossSensor;
UdpAvgThroughputSensor const * depUdpAvgThroughputSensor;
//Udp - CHANGES - End
};
......
#include "UdpAvgThroughputSensor.h"
using namespace std;
UdpAvgThroughputSensor::UdpAvgThroughputSensor(UdpPacketSensor const *packetHistoryVal, UdpLossSensor const *lossSensorVal)
: packetHistory(packetHistoryVal),
lossSensor(lossSensorVal),
lastAckTime(0),
throughputKbps(0.0),
numSamples(0),
queuePtr(0)
{
}
UdpAvgThroughputSensor::~UdpAvgThroughputSensor()
{
}
void UdpAvgThroughputSensor::localSend(PacketInfo *packet)
{
// Do nothing.
}
void UdpAvgThroughputSensor::localAck(PacketInfo *packet)
{
// This is a re-ordered ACK - don't do anything
// with it - just return.
if( packetHistory->isAckValid() == false )
{
ackValid = false;
return;
}
// Find out how many redundant ACKs this packet is carrying - 0 to 121.
unsigned char numRedunAcksChar = 0;
memcpy(&numRedunAcksChar, &packet->payload[0], global::UCHAR_SIZE);
int numRedunAcks = static_cast<int>(numRedunAcksChar);
int numThroughputAcks = 1;
double avgThroughput = 0;
// This is the timestamp at the receiver, when the original packet was received.
unsigned long long currentAckTimeStamp = *(unsigned long long *)(packet->payload + 1 + 2*global::USHORT_INT_SIZE );
unsigned long long timeStamp = packet->packetTime.toMicroseconds();
// This is the first ACK we have seen, store its receiver timestamp
// and return, we cannot calculate throughput from just one ACK - at least 2.
if(lastAckTime == 0)
{
lastAckTime = currentAckTimeStamp;
return;
}
unsigned short int seqNum = *(unsigned int *)(packet->payload + 1);
unsigned short int echoedPacketSize = *(unsigned short int *)(packet->payload + 1 + global::USHORT_INT_SIZE);
unsigned long long ackTimeDiff = (currentAckTimeStamp - lastAckTime);
vector<UdpPacketInfo>::iterator vecIterator;
UdpAck tmpUdpAck;
vector<UdpPacketInfo > ackedPackets = packetHistory->getAckedPackets();
unsigned long long timeDiff = 0;
// Average the throughput over all the packets being acknowledged.
if(numRedunAcks > 0)
{
int i;
unsigned short int redunSeqNum;
unsigned short int redunPacketSize;
for(i = 0;i < numRedunAcks; i++)
{
redunSeqNum = *(unsigned short int *)(packet->payload + 1 + global::udpMinAckPacketSize + i*global::udpRedunAckSize);
redunPacketSize = *(unsigned short int *)(packet->payload + 1 + global::udpMinAckPacketSize + i*global::udpRedunAckSize + global::USHORT_INT_SIZE);
// Find if this redundant ACK is useful - or it was acked before.
vecIterator = find_if(ackedPackets.begin(), ackedPackets.end(), bind2nd(equalSeqNum(), redunSeqNum));
if(vecIterator != ackedPackets.end())
{
// Calculate throughput for the packet being acked by
// the redundant ACK.
timeDiff = *(unsigned long long *)(packet->payload + 1 + global::udpMinAckPacketSize + i*global::udpRedunAckSize + global::udpSeqNumSize);
if(ackTimeDiff - timeDiff == 0)
continue;
numThroughputAcks++;
tmpUdpAck.timeTaken = ackTimeDiff - timeDiff;
// We lost the record of the size of this packet due to libpcap
// loss, use the length echoed back in the ACK.
if((*vecIterator).isFake == true)
{
avgThroughput += 8000000.0*( static_cast<double> ( redunPacketSize )) / ( static_cast<double>(ackTimeDiff - timeDiff)*1024.0 );
tmpUdpAck.packetSize = redunPacketSize;
}
else
{
avgThroughput += 8000000.0*( static_cast<double> ( (*vecIterator).packetSize )) / ( static_cast<double>(ackTimeDiff - timeDiff)*1024.0 );
tmpUdpAck.packetSize = (*vecIterator).packetSize;
}
if(tmpUdpAck.packetSize < 0)
{
printf("Packetsize negative for redunAck = %d, isFake = %d\n", redunSeqNum,packetHistory->isAckFake());
}
ackList[queuePtr] = tmpUdpAck;
queuePtr = (queuePtr + 1)%MAX_SAMPLES;
if(numSamples < MAX_SAMPLES)
numSamples++;
ackTimeDiff = timeDiff;
}
}
}
if(ackTimeDiff == 0)
{
calculateTput(timeStamp);
lastAckTime = currentAckTimeStamp;
return;
}
// Calculate the throughput for the current packet being ACKed.
vecIterator = find_if(ackedPackets.begin(), ackedPackets.end(), bind2nd(equalSeqNum(), seqNum));
if(vecIterator == ackedPackets.end())
{
logWrite(ERROR,"Error - Incorrect ack packets state passed to UdpAvgThroughputSensor");
return;
}
// We lost the record of the size of this packet due to libpcap
// loss, use the length echoed back in the ACK.
if(packetHistory->isAckFake() == true)
{
avgThroughput += 8000000.0*( static_cast<double> (echoedPacketSize )) / ( static_cast<double>(ackTimeDiff)*1024.0 );
tmpUdpAck.packetSize = echoedPacketSize;
}
else
{
avgThroughput += 8000000.0*( static_cast<double> ((*vecIterator).packetSize )) / ( static_cast<double>(ackTimeDiff)*1024.0 );
tmpUdpAck.packetSize = (*vecIterator).packetSize;
}
tmpUdpAck.timeTaken = ackTimeDiff - timeDiff;
ackList[queuePtr] = tmpUdpAck;
queuePtr = (queuePtr + 1)%MAX_SAMPLES;
numSamples++;
if(numSamples < MAX_SAMPLES)
numSamples++;
calculateTput(timeStamp);
// Save the receiver timestamp of this ACK packet, so that we can
// use for calculating throughput for the next ACK packet.
lastAckTime = currentAckTimeStamp;
}
void UdpAvgThroughputSensor::calculateTput(unsigned long long timeStamp)
{
// We don't have enough samples to calculate the average throughput.
if(numSamples < MIN_SAMPLES)
return;
int sampleCount = ( numSamples < MAX_SAMPLES )?numSamples:MAX_SAMPLES;
int i, index;
unsigned long long timePeriod = 0;
long packetSizeSum = 0;
for(i = 0;(i < sampleCount && timePeriod < MAX_TIME_PERIOD); i++)
{
index = (queuePtr -1 - i + MAX_SAMPLES)%MAX_SAMPLES;
timePeriod += ackList[index].timeTaken;
packetSizeSum += ackList[index].packetSize;
}
// Avoid dividing by zero.
if(timePeriod == 0)
{
logWrite(ERROR, "Timeperiod is zero in UdpAvgThroughput calculation");
return;
}
// Calculate the average throughput.
throughputKbps = 8000000.0*( static_cast<double> (packetSizeSum )) / ( static_cast<double>(timePeriod)*1024.0 );
// Send a message to the monitor with the new bandwidth.
if(lossSensor->getPacketLoss() == 0)
{
// Send this available bandwidth as a tentative value.
// To be used for dummynet events only if it is greater
// than the last seen value.
logWrite(SENSOR, "AVGTPUT:TIME=%llu,TENTATIVE=%f",timeStamp,throughputKbps);
logWrite(SENSOR, "LOSS:TIME=%llu,LOSS=0",timeStamp);
}
else
{
// Send this as the authoritative available bandwidth value.
logWrite(SENSOR, "AVGTPUT:TIME=%llu,AUTHORITATIVE=%f",timeStamp,throughputKbps);
logWrite(SENSOR, "LOSS:TIME=%llu,LOSS=%d",timeStamp,lossSensor->getPacketLoss());
(const_cast<UdpLossSensor *>(lossSensor))->resetLoss();
}
logWrite(SENSOR, "STAT:TOTAL_LOSS = %d",lossSensor->getTotalPacketLoss());
}
#ifndef UDP_AVG_THROUGHPUT_SENSOR_PELAB_H
#define UDP_AVG_THROUGHPUT_SENSOR_PELAB_H
#include "lib.h"
#include "Sensor.h"
#include "UdpPacketSensor.h"
#include "UdpLossSensor.h"
class Sensor;
class UdpPacketSensor;
class UdpLossSensor;
struct UdpAck {
unsigned long long timeTaken;
long packetSize;
};
class UdpAvgThroughputSensor:public Sensor{
public:
explicit UdpAvgThroughputSensor(UdpPacketSensor const *packetHistoryVal, UdpLossSensor const *lossSensorVal);
~UdpAvgThroughputSensor();
void localSend(PacketInfo *packet);
void localAck(PacketInfo *packet);
private:
void calculateTput(unsigned long long timeStamp);
UdpPacketSensor const *packetHistory;
UdpLossSensor const *lossSensor;
const static int MIN_SAMPLES = 5;
const static int MAX_SAMPLES = 100;
const static unsigned long long MAX_TIME_PERIOD = 500000;
unsigned long long lastAckTime;
double throughputKbps;
UdpAck ackList[100];
int numSamples;
int queuePtr;
};
#endif
#include "UdpLossSensor.h"
using namespace std;
UdpLossSensor::UdpLossSensor(UdpPacketSensor const *packetHistoryVal, UdpRttSensor const *rttSensorVal)
: packetLoss(0),
totalLoss(0),
packetHistory(packetHistoryVal),
rttSensor(rttSensorVal)
{
}
UdpLossSensor::~UdpLossSensor()
{
}
void UdpLossSensor::localSend(PacketInfo *packet)
{
}
void UdpLossSensor::localAck(PacketInfo *packet)
{
// This is a re-ordered ACK - don't do anything
// with it - just return.
if( packetHistory->isAckValid() == false )
{
ackValid = false;
return;
}
unsigned short int seqNum = *(unsigned short int *)(packet->payload + 1);
list<UdpPacketInfo>& unAckedPackets = (const_cast<UdpPacketSensor *>(packetHistory))->getUnAckedPacketList();
list<UdpPacketInfo>::iterator vecIterator = unAckedPackets.begin();
list<UdpPacketInfo>::iterator tempIterator;
unsigned long long ewmaRtt = rttSensor->getRtt();
unsigned long long ewmaDevRtt = rttSensor->getDevRtt();
unsigned long long timeStamp = packet->packetTime.toMicroseconds();
while(vecIterator != unAckedPackets.end())
{
if(seqNum > (*vecIterator).seqNum + 10 || ( timeStamp > (*vecIterator).timeStamp + 10*( ewmaRtt + 4*ewmaDevRtt)) )
{
tempIterator = vecIterator;
vecIterator++;
unAckedPackets.erase(tempIterator);
packetLoss++;
}
else
vecIterator++;
}
}
long UdpLossSensor::getPacketLoss() const
{
return packetLoss;
}
long UdpLossSensor::getTotalPacketLoss() const
{
return totalLoss;
}
void UdpLossSensor::resetLoss()
{
totalLoss += packetLoss;
packetLoss = 0;
}
#ifndef UDP_LOSS_SENSOR_PELAB_H
#define UDP_LOSS_SENSOR_PELAB_H
#include "lib.h"
#include "Sensor.h"
#include "UdpPacketSensor.h"
#include "UdpRttSensor.h"
class Sensor;
class UdpPacketSensor;
class UdpRttSensor;
class UdpLossSensor:public Sensor{
public:
explicit UdpLossSensor(UdpPacketSensor const *packetHistoryVal, UdpRttSensor const *rttSensorVal);
~UdpLossSensor();
void localSend(PacketInfo *packet);
void localAck(PacketInfo *packet);
long getPacketLoss() const;
long getTotalPacketLoss() const;
void resetLoss();
private:
long packetLoss;
long totalLoss;
UdpPacketSensor const *packetHistory;
UdpRttSensor const *rttSensor;
};
#endif
......@@ -50,7 +50,6 @@ void UdpMinDelaySensor::localAck(PacketInfo *packet)
ackValid = true;
sendValid = false;
int overheadLen = 14 + 4 + 8 + packet->ip->ip_hl*4;
unsigned short int seqNum = *(unsigned short int *)(packet->payload + 1);
unsigned short int echoedPacketSize = *(unsigned short int *)(packet->payload + 1 + global::USHORT_INT_SIZE);
unsigned long long echoedTimestamp = *(unsigned long long *)(packet->payload + 1 + 2*global::USHORT_INT_SIZE + global::ULONG_LONG_SIZE);
......@@ -75,11 +74,9 @@ void UdpMinDelaySensor::localAck(PacketInfo *packet)
// Calculate the delay for the maximum sized packet.
// We lost this packet size details due to loss in libpcap, use the
// size echoed in the ACK packet - this does not included the header
// overhead for the packet - we assume that the packet on the reverse path
// has the same overhead length as the original packet.
// size echoed in the ACK packet
if(packetHistory->isAckFake() == true)
oneWayDelay = ( oneWayDelay ) * 1518 / (overheadLen + echoedPacketSize);
oneWayDelay = ( oneWayDelay ) * 1518 / (echoedPacketSize);
else
oneWayDelay = ( oneWayDelay ) * 1518 / ( (*vecIterator).packetSize);
......
......@@ -7,6 +7,7 @@ UdpPacketSensor::UdpPacketSensor()
packetLoss(0),
totalPacketLoss(0),
libpcapSendLoss(0),
statReorderedPackets(0),
ackFake(false)
{
......@@ -53,8 +54,9 @@ void UdpPacketSensor::localSend(PacketInfo *packet)
sendValid = true;
ackValid = false;
unsigned short int seqNum = *(unsigned short int *)(packet->payload);
unsigned short int packetSize = *(unsigned short int *)(packet->payload + global::USHORT_INT_SIZE) + overheadLen;
// CHANGE:
unsigned short int seqNum = ntohs(*(unsigned short int *)(packet->payload + 1));
unsigned short int packetSize = (*(unsigned short int *)(packet->payload + 1 + global::USHORT_INT_SIZE)) + overheadLen;
UdpPacketInfo tmpPacketInfo;
if(lastSeenSeqNum != -1)
......@@ -106,10 +108,17 @@ void UdpPacketSensor::localAck(PacketInfo *packet)
if(listIterator == sentPacketList.end())
{
logWrite(EXCEPTION, "Unknown UDP seq number %d is being ACKed. "
"We might have received "
" a reordered ACK, which has already been ACKed using redundant ACKs .\n", seqNum);
ackValid = false;
bool isReordered = handleReorderedAck(packet);
if(isReordered == false)
{
logWrite(EXCEPTION, "Unknown UDP seq number %d is being ACKed. "
"We might have received "
" a reordered ACK, which has already been ACKed using redundant ACKs .\n", seqNum);
ackValid = false;
}
else
ackValid = true;
}
else
ackValid = true;
......@@ -169,6 +178,30 @@ void UdpPacketSensor::localAck(PacketInfo *packet)
sentPacketList.erase(listIterator);
}
else
{
// Check whether this ACK corresponds to a packet reordered
// on the forward path.
list<UdpPacketInfo >::iterator reOrderedPacketIterator ;
reOrderedPacketIterator = find_if(unAckedPacketList.begin(), unAckedPacketList.end(), bind2nd(equalSeqNum(), redunSeqNum));
// An unacked packet exists with this sequence number, delete it
// from the list and consider it acked.
if(reOrderedPacketIterator != unAckedPacketList.end())
{
tmpPacketInfo.seqNum = (*reOrderedPacketIterator).seqNum;
tmpPacketInfo.packetSize = (*reOrderedPacketIterator).packetSize;
tmpPacketInfo.timeStamp = (*reOrderedPacketIterator).timeStamp;
tmpPacketInfo.isFake = (*reOrderedPacketIterator).isFake;
ackedPackets.push_back(tmpPacketInfo);
unAckedPacketList.erase(reOrderedPacketIterator);
statReorderedPackets++;
logWrite(SENSOR,"STAT:: Number of reordered packets = %d",statReorderedPackets);
}
}
}
}
......@@ -194,6 +227,16 @@ void UdpPacketSensor::localAck(PacketInfo *packet)
do{
logWrite(SENSOR, "STAT::Lost packet seqnum = %d",(*listIterator).seqNum);
// This packet might have been lost - but store it as UnAcked
// to account for reordering on the forward path.
tmpPacketInfo.seqNum = (*listIterator).seqNum;
tmpPacketInfo.packetSize = (*listIterator).packetSize;
tmpPacketInfo.timeStamp = (*listIterator).timeStamp;
tmpPacketInfo.isFake = (*listIterator).isFake;
unAckedPacketList.push_back(tmpPacketInfo);
sentPacketList.erase(listIterator);
listIterator = sentPacketList.end();
......@@ -209,3 +252,29 @@ void UdpPacketSensor::localAck(PacketInfo *packet)
sentPacketList.erase(curPacketIterator);
}
bool UdpPacketSensor::handleReorderedAck(PacketInfo *packet)
{
list<UdpPacketInfo >::iterator listIterator;
unsigned short int seqNum = *(unsigned short int *)(packet->payload + 1);
bool retVal = false;