Commit 68d3238e authored by Jonathon Duerig's avatar Jonathon Duerig

Rolled the AverageThroughputSensor into a special case of...

Rolled the AverageThroughputSensor into a special case of LeastSquaresThroughput. It still uses the same command line arguments/constants.
parent 4225023b
// AverageThroughputSensor.cc
#include "lib.h"
#include "AverageThroughputSensor.h"
#include "TSThroughputSensor.h"
#include "CommandOutput.h"
using namespace std;
AverageThroughputSensor::AverageThroughputSensor(
TSThroughputSensor const * newThroughput)
: throughput(newThroughput)
, latest(0)
, sampleCount(0)
{
}
AverageThroughputSensor::~AverageThroughputSensor()
{
}
void AverageThroughputSensor::localSend(PacketInfo * packet)
{
ackValid = false;
sendValid = true;
}
void AverageThroughputSensor::localAck(PacketInfo * packet)
{
sendValid = false;
if (throughput->isAckValid())
{
Ack newAck;
newAck.size = throughput->getLastByteCount();
newAck.period = throughput->getLastPeriod();
if (sampleCount > 0)
{
// If the buffer is not empty, then we want to put the new ack
// in the slot after latest.
latest = (latest + 1) % MAX_SAMPLE_COUNT;
}
// Otherwise, latest just points to the first ack and we want to
// fill it.
++sampleCount;
samples[latest] = newAck;
int byteSum = 0;
uint32_t periodSum = 0;
int i = 0;
int limit = min(static_cast<int>(MAX_SAMPLE_COUNT), sampleCount);
for (; i < limit && periodSum < AVERAGE_PERIOD; ++i)
{
// Add an extra MAX_SAMPLE_COUNT because taking the mod of a
// negative dividend is undefined (could be negative, could be
// from division rounding up or down, etc.)
int index = (latest - i + MAX_SAMPLE_COUNT) % MAX_SAMPLE_COUNT;
byteSum += samples[index].size;
periodSum += samples[index].period;
}
int result = throughput->getThroughputInKbps(periodSum, byteSum);
logWrite(SENSOR, "AVERAGE_THROUGHPUT: %d, period %u, kilobits %f",
result, periodSum, byteSum*(8.0/1000.0));
ackValid = true;
}
}
// AverageThroughputSensor.h
// A throughput sensor which averages the measure over a minimum
// period of time.
#ifndef AVERAGE_THROUGHPUT_SENSOR_H
#define AVERAGE_THROUGHPUT_SENSOR_H
#include "Sensor.h"
class TSThroughputSensor;
class AverageThroughputSensor : public Sensor
{
public:
AverageThroughputSensor(TSThroughputSensor const * newThroughput);
virtual ~AverageThroughputSensor();
protected:
virtual void localSend(PacketInfo * packet);
virtual void localAck(PacketInfo * packet);
private:
struct Ack
{
Ack() : size(0), period(0) {}
// The size begin acked (in bytes)
int size;
// The amount of time to the previous ack.
uint32_t period;
};
private:
TSThroughputSensor const * throughput;
// The period of time to average over (in milliseconds)
static const uint32_t AVERAGE_PERIOD = 500;
enum { MAX_SAMPLE_COUNT = 100 };
Ack samples[MAX_SAMPLE_COUNT];
int latest;
int sampleCount;
};
#endif
......@@ -8,14 +8,19 @@
using namespace std;
const int LeastSquaresThroughput::MAX_SAMPLE_COUNT;
const int LeastSquaresThroughput::DEFAULT_MAX_PERIOD;
LeastSquaresThroughput::LeastSquaresThroughput(
TSThroughputSensor const * newThroughput,
DelaySensor const * newDelay)
DelaySensor const * newDelay,
int newMaxPeriod)
: throughput(newThroughput)
, delay(newDelay)
, oldest(0)
, latest(0)
, totalSamples(0)
, maxPeriod(newMaxPeriod)
, lastReport(0)
{
}
......@@ -35,106 +40,111 @@ void LeastSquaresThroughput::localAck(PacketInfo * packet)
sendValid = false;
if (throughput->isAckValid() && delay->isAckValid())
{
// throughputSamples[oldest] = throughput->getThroughputInKbps();
byteSamples[oldest] = throughput->getLastByteCount();
timeSamples[oldest] = throughput->getLastPeriod();
delaySamples[oldest] = delay->getLastDelay();
oldest = (oldest + 1) % SAMPLE_COUNT;
if (totalSamples > 0)
{
latest = (latest + 1) % MAX_SAMPLE_COUNT;
}
Ack newAck;
newAck.size = throughput->getLastByteCount();
newAck.period = throughput->getLastPeriod();
newAck.rtt = delay->getLastDelay();
samples[latest] = newAck;
++totalSamples;
if (totalSamples >= SAMPLE_COUNT)
int i = 0;
int limit = min(MAX_SAMPLE_COUNT, totalSamples);
int byteTotal = 0;
uint32_t timeTotal = 0;
int x_i = 0;
int y_i = 0;
double numA = 0.0;
double numB = 0.0;
double numC = 0.0;
double numD = limit;
double denomA = 0.0;
double denomB = 0.0;
double denomC = 0.0;
double denomD = limit;
for (; i < limit && (timeTotal <= static_cast<uint32_t>(maxPeriod)
|| maxPeriod <= 0); ++i)
{
int i = 0;
// double throughputAverage = 0;
int byteTotal = 0;
uint32_t timeTotal = 0;
int x_i = 0;
int y_i = 0;
double numA = 0.0;
double numB = 0.0;
double numC = 0.0;
double numD = SAMPLE_COUNT;
double denomA = 0.0;
double denomB = 0.0;
double denomC = 0.0;
double denomD = SAMPLE_COUNT;
for (; i < SAMPLE_COUNT; ++i)
{
int index = (oldest + i) % SAMPLE_COUNT;
// throughputAverage += throughputSamples[index];
byteTotal += byteSamples[index];
timeTotal += timeSamples[index];
logWrite(SENSOR_COMPLETE, "LeastSquares: ***Delay sample #%d: %d", i,
delaySamples[index]);
logWrite(SENSOR_COMPLETE, "LeastSquares: Period sample: %d",
timeSamples[index]);
logWrite(SENSOR_COMPLETE, "LeastSquares: Kilobit sample: %f",
byteSamples[index]*(8.0/1000.0));
x_i += timeSamples[index];
y_i = delaySamples[index];
numA += x_i * y_i;
numB += x_i;
numC += y_i;
denomA += x_i * x_i;
denomB += x_i;
denomC += x_i;
}
// Calculate throughput.
// throughputAverage /= SAMPLE_COUNT;
logWrite(SENSOR, "LeastSquares: timeTotal: %d, kilobitTotal: %f",
timeTotal, byteTotal*(8.0/1000.0));
double throughputAverage = throughput->getThroughputInKbps(timeTotal,
byteTotal);
double num = (numA * numD) - (numB * numC);
double denom = (denomA * denomD) - (denomB * denomC);
// Theoretically denom cannot be 0 because our x values are
// sample numbers which monotonically increase.
double slope = 0.0;
if (fabs(denom) > 0.00001)
{
slope = num/denom;
}
// Add an extra MAX_SAMPLE_COUNT because taking the mod of a
// negative dividend is undefined (could be negative, could be
// from division rounding up or down, etc.)
int index = (latest - i + MAX_SAMPLE_COUNT) % MAX_SAMPLE_COUNT;
byteTotal += samples[index].size;
timeTotal += samples[index].period;
logWrite(SENSOR, "LeastSquares: SLOPE: %f TPA: %i LR:%i", slope,
static_cast<int>(throughputAverage),lastReport);
logWrite(SENSOR_COMPLETE, "LeastSquares: ***Delay sample #%d: %d", i,
samples[index].rtt);
logWrite(SENSOR_COMPLETE, "LeastSquares: Period sample: %d",
samples[index].period);
logWrite(SENSOR_COMPLETE, "LeastSquares: Kilobit sample: %f",
samples[index].size*(8.0/1000.0));
if (slope > 0.0)
{
// The closest linear approximation indicates that buffers are
// being filled up, which means that the link was saturated
// over the last SAMPLE_COUNT samples. So use the average to
// yield a result.
if (static_cast<int>(throughputAverage) != lastReport)
{
lastReport = static_cast<int>(throughputAverage);
ostringstream buffer;
buffer << static_cast<int>(throughputAverage);
global::output->genericMessage(AUTHORITATIVE_BANDWIDTH, buffer.str(),
packet->elab);
}
}
else
x_i += samples[index].period;
y_i = samples[index].rtt;
numA += x_i * y_i;
numB += x_i;
numC += y_i;
denomA += x_i * x_i;
denomB += x_i;
denomC += x_i;
}
// Calculate throughput.
logWrite(SENSOR, "LeastSquares: timeTotal: %d, kilobitTotal: %f",
timeTotal, byteTotal*(8.0/1000.0));
double throughputAverage = throughput->getThroughputInKbps(timeTotal,
byteTotal);
double num = (numA * numD) - (numB * numC);
double denom = (denomA * denomD) - (denomB * denomC);
// Theoretically denom cannot be 0 because our x values are
// sample numbers which monotonically increase.
double slope = 0.0;
if (fabs(denom) > 0.00001)
{
slope = num/denom;
}
logWrite(SENSOR, "LeastSquares: SLOPE: %f TPA: %i LR:%i", slope,
static_cast<int>(throughputAverage),lastReport);
// We have reversed the points left to right. This means that the
// line is reflected. So decreasing slope on the reflected-line
// means that there would have been increasing slope on the
// original line and therefore increasing delays.
if (slope < 0.0)
{
// The closest linear approximation indicates that buffers are
// being filled up, which means that the link was saturated
// over the last MAX_SAMPLE_COUNT samples. So use the average to
// yield a result.
if (static_cast<int>(throughputAverage) != lastReport)
{
// The buffers are not being filled up. So we just have a
// tentative throughput measurement.
if (static_cast<int>(throughputAverage) > lastReport)
{
lastReport = static_cast<int>(throughputAverage);
ostringstream buffer;
buffer << static_cast<int>(throughputAverage);
global::output->genericMessage(TENTATIVE_THROUGHPUT, buffer.str(),
packet->elab);
}
lastReport = static_cast<int>(throughputAverage);
ostringstream buffer;
buffer << static_cast<int>(throughputAverage);
global::output->genericMessage(AUTHORITATIVE_BANDWIDTH, buffer.str(),
packet->elab);
}
ackValid = true;
}
else
{
ackValid = false;
// The buffers are not being filled up. So we just have a
// tentative throughput measurement.
if (static_cast<int>(throughputAverage) > lastReport)
{
lastReport = static_cast<int>(throughputAverage);
ostringstream buffer;
buffer << static_cast<int>(throughputAverage);
global::output->genericMessage(TENTATIVE_THROUGHPUT, buffer.str(),
packet->elab);
}
}
ackValid = true;
}
else
{
......
......@@ -13,33 +13,41 @@ class DelaySensor;
class LeastSquaresThroughput : public Sensor
{
public:
// The default max period to use.
static const int DEFAULT_MAX_PERIOD = 500;
public:
LeastSquaresThroughput(TSThroughputSensor const * newThroughput,
DelaySensor const * newDelay);
DelaySensor const * newDelay,
int newMaxPeriod = 0);
virtual ~LeastSquaresThroughput();
protected:
virtual void localSend(PacketInfo * packet);
virtual void localAck(PacketInfo * packet);
private:
struct Ack
{
Ack() : size(0), period(0), rtt(0) {}
int size; // in bytes
uint32_t period; // in milliseconds
int rtt; // in milliseconds
};
private:
TSThroughputSensor const * throughput;
DelaySensor const * delay;
// The number of samples kept at any given time.
static const int SAMPLE_COUNT = 50;
// Circular buffers of the last SAMPLE_COUNT samples.
// The number of bytes in each sample. Used for average throughput
// calculation.
int byteSamples[SAMPLE_COUNT];
// The delta time of each sample. This is the difference between
// the time of the ack at that sample and the time of the ack at
// the previous sample (in milliseconds).
uint32_t timeSamples[SAMPLE_COUNT];
// int throughputSamples[SAMPLE_COUNT];
int delaySamples[SAMPLE_COUNT];
// The index of the oldest stored sample.
int oldest;
static const int MAX_SAMPLE_COUNT = 100;
// Circular buffer of the last MAX_SAMPLE_COUNT samples.
Ack samples[MAX_SAMPLE_COUNT];
// The index of the latest stored sample.
int latest;
// The total number of samples ever encountered.
int totalSamples;
// The maximum amount of time to look backwards in milliseconds. If
// this number is <= 0 then all available samples are used.
int maxPeriod;
// The last number reported to the monitor in kbps.
// Only send bandwidth if it is different than this number.
......
......@@ -15,7 +15,6 @@
#include "TSThroughputSensor.h"
#include "EwmaThroughputSensor.h"
#include "LeastSquaresThroughput.h"
#include "AverageThroughputSensor.h"
using namespace std;
......@@ -304,8 +303,11 @@ void SensorList::pushAverageThroughputSensor(void)
{
// Dependency list
pushTSThroughputSensor();
pushDelaySensor();
logWrite(SENSOR, "Adding AverageThroughputSensor");
std::auto_ptr<Sensor> current(new AverageThroughputSensor(depTSThroughputSensor));
std::auto_ptr<Sensor> current(
new LeastSquaresThroughput(depTSThroughputSensor, depDelaySensor,
LeastSquaresThroughput::DEFAULT_MAX_PERIOD));
pushSensor(current);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment