/*
 * Copyright (c) 2006 University of Utah and the Flux Group.
 * 
 * {{{EMULAB-LICENSE
 * 
 * This file is part of the Emulab network testbed software.
 * 
 * This file is free software: you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at
 * your option) any later version.
 * 
 * This file is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this file.  If not, see <http://www.gnu.org/licenses/>.
 * 
 * }}}
 */

#include "UdpAvgThroughputSensor.h"

#include <cstring>

#include "CommandOutput.h"

using namespace std;

// Constructs the sensor around the two sensors it reads from.
//
// packetHistoryVal - packet sensor; localAck() asks it whether the current
//                    ACK is valid/fake and for the list of acked packets.
// lossSensorVal    - loss sensor; queried for the current and total packet
//                    loss when samples are recorded and reported.
//
// Neither pointer is checked here; both are dereferenced unconditionally
// by localAck() / calculateTput(). All counters and the sample ring
// position start at zero, and lastAckTime == 0 is what localAck() uses to
// recognise "no ACK seen yet".
UdpAvgThroughputSensor::UdpAvgThroughputSensor(UdpPacketSensor const *packetHistoryVal, UdpLossSensor const *lossSensorVal)
	: packetHistory(packetHistoryVal),
	lossSensor(lossSensorVal),
	lastAckTime(0),
	throughputKbps(0.0),
	lastSeenThroughput(0),
	numSamples(0),
	queuePtr(0)
{
}

// Destructor: intentionally empty, no cleanup is performed here.
UdpAvgThroughputSensor::~UdpAvgThroughputSensor() {}

// Send-side hook. This sensor takes no action when a packet is sent;
// all of its work happens on the ACK path (localAck).
void UdpAvgThroughputSensor::localSend(PacketInfo *packet)
{
	// The packet is deliberately ignored.
	(void)packet;
}

namespace
{
	// Copies a value of type T out of an arbitrary position in a packet
	// payload. The ACK fields sit at odd byte offsets (1, 3, 5, ...), so
	// dereferencing a casted pointer would be a misaligned access;
	// memcpy is valid for any alignment.
	template <typename T>
	inline T readPayloadField(const void *source)
	{
		T value;
		memcpy(&value, source, sizeof(T));
		return value;
	}
}

// Handles one incoming UDP ACK: records a (size, time taken) sample for
// the packet being acknowledged, plus one sample for every redundant ACK
// entry that refers to a packet in packetHistory's acked-packets list,
// then recomputes the average throughput via calculateTput().
//
// Payload layout, as read by this function:
//   offset 0                          : number of redundant ACK entries (1 byte)
//   offset 1                          : sequence number (unsigned short)
//   offset 1 + USHORT_INT_SIZE        : echoed packet size (unsigned short)
//   offset 1 + 2*USHORT_INT_SIZE      : receiver timestamp (unsigned long long)
//   offset udpMinAckPacketSize + i*udpRedunAckSize : redundant entry i =
//       sequence number, packet size (unsigned shorts), time difference
//       (unsigned long long)
//
// NOTE(review): the redundant-ACK count is taken straight from the packet
// and no payload length is checked in this function before indexing into
// the payload - confirm the caller guarantees the buffer is large enough.
// NOTE(review): this function only ever clears ackValid; nothing here sets
// it back to true - presumably that happens elsewhere, confirm.
void UdpAvgThroughputSensor::localAck(PacketInfo *packet)
{
	// This is a re-ordered ACK - don't do anything
	// with it - just return.
	if( packetHistory->isAckValid() == false )
	{
		ackValid = false;
		return;
	}

	// Find out how many redundant ACKs this packet is carrying - 0 to 121.
	unsigned char numRedunAcksChar = 0;
	memcpy(&numRedunAcksChar, &packet->payload[0], global::UCHAR_SIZE);
	int numRedunAcks = static_cast<int>(numRedunAcksChar);

	// This is the timestamp at the receiver, when the original packet was received.
	unsigned long long currentAckTimeStamp = readPayloadField<unsigned long long>(packet->payload + 1 + 2*global::USHORT_INT_SIZE);
	unsigned long long timeStamp = packet->packetTime.toMicroseconds();

	// This is the first ACK we have seen, store its receiver timestamp
	// and return, we cannot calculate throughput from just one ACK - at least 2.
	if(lastAckTime == 0)
	{
		lastAckTime = currentAckTimeStamp;
		return;
	}

	// The sequence number is a 16-bit field. Reading it as a wider integer
	// and truncating only yields the right bytes on little-endian hosts,
	// so read exactly an unsigned short.
	unsigned short int seqNum = readPayloadField<unsigned short int>(packet->payload + 1);
	unsigned short int echoedPacketSize = readPayloadField<unsigned short int>(packet->payload + 1 + global::USHORT_INT_SIZE);

	// Receiver-side time elapsed since the previous ACK. Each accepted
	// redundant entry below takes its share out of this interval.
	unsigned long long ackTimeDiff = (currentAckTimeStamp - lastAckTime);
	vector<UdpPacketInfo>::iterator vecIterator;
	UdpAck tmpUdpAck;
	vector<UdpPacketInfo > ackedPackets = packetHistory->getAckedPackets();

	// Average the throughput over all the packets being acknowledged.
	for(int i = 0; i < numRedunAcks; i++)
	{
		unsigned short int redunSeqNum = readPayloadField<unsigned short int>(packet->payload + global::udpMinAckPacketSize + i*global::udpRedunAckSize);
		unsigned short int redunPacketSize = readPayloadField<unsigned short int>(packet->payload + global::udpMinAckPacketSize + i*global::udpRedunAckSize + global::USHORT_INT_SIZE);

		// Find if this redundant ACK is useful - or it was acked before.
		vecIterator = find_if(ackedPackets.begin(), ackedPackets.end(), bind2nd(equalSeqNum(), redunSeqNum));
		if(vecIterator == ackedPackets.end())
			continue;

		// Calculate throughput for the packet being acked by
		// the redundant ACK.
		unsigned long long timeDiff = readPayloadField<unsigned long long>(packet->payload + global::udpMinAckPacketSize + i*global::udpRedunAckSize + 2*global::USHORT_INT_SIZE);

		// The entry must leave a strictly positive time for its packet;
		// otherwise it is logged and skipped.
		if((timeDiff > ackTimeDiff) || (ackTimeDiff - timeDiff == 0))
		{
			logWrite(EXCEPTION, "Error using UDP redun Seqnum = %d, for seqNum = %d, time taken = %llu,ackTimeDiff = %llu, timeDiff = %llu, i = %d, numAcks = %d",redunSeqNum,seqNum, ackTimeDiff - timeDiff,ackTimeDiff,timeDiff, i, numRedunAcks);
			continue;
		}

		tmpUdpAck.timeTaken = ackTimeDiff - timeDiff;
		tmpUdpAck.isRedun = true;
		tmpUdpAck.seqNum = redunSeqNum;
		tmpUdpAck.numPackets = 1;

		// We lost the record of the size of this packet due to libpcap
		// loss, use the length echoed back in the ACK.
		if((*vecIterator).isFake == true)
		{
			tmpUdpAck.packetSize = redunPacketSize;
		}
		else
		{
			tmpUdpAck.packetSize = (*vecIterator).packetSize;
		}

		// Store the sample in the ring buffer, overwriting the oldest
		// entry once MAX_SAMPLES have been collected.
		ackList[queuePtr] = tmpUdpAck;
		queuePtr = (queuePtr + 1)%MAX_SAMPLES;
		if(numSamples < MAX_SAMPLES)
			numSamples++;

		// What remains of the interval belongs to the packets still
		// to be processed.
		ackTimeDiff = timeDiff;
	}

	// No time is left for the packet acked by this ACK itself: report
	// with the samples gathered so far and do not add a zero-time sample.
	if(ackTimeDiff == 0)
	{
		calculateTput(timeStamp, packet);
		lastAckTime = currentAckTimeStamp;
		logWrite(ERROR,"Error - Two UDP ACKs report the same receive time, for seqNum = %d",seqNum);
		return;
	}

	// Calculate the throughput for the current packet being ACKed.
	vecIterator = find_if(ackedPackets.begin(), ackedPackets.end(), bind2nd(equalSeqNum(), seqNum));

	if(vecIterator == ackedPackets.end())
	{
		logWrite(ERROR,"Error - Incorrect ack packets state passed to UdpAvgThroughputSensor");
		return;
	}

	// We lost the record of the size of this packet due to libpcap
	// loss, use the length echoed back in the ACK.
	if(packetHistory->isAckFake() == true)
	{
		tmpUdpAck.packetSize = echoedPacketSize;
	}
	else
	{
		tmpUdpAck.packetSize = (*vecIterator).packetSize;
	}

	tmpUdpAck.timeTaken = ackTimeDiff;
	tmpUdpAck.isRedun = false;
	tmpUdpAck.seqNum = seqNum;
	// This sample stands for the acked packet plus the packets the loss
	// sensor currently reports as lost.
	tmpUdpAck.numPackets = lossSensor->getPacketLoss() + 1;

	ackList[queuePtr] = tmpUdpAck;
	queuePtr = (queuePtr + 1)%MAX_SAMPLES;

	if(numSamples < MAX_SAMPLES)
		numSamples++;

	calculateTput(timeStamp, packet);

	// Save the receiver timestamp of this ACK packet, so that we can
	// use for calculating throughput for the next ACK packet.
	lastAckTime = currentAckTimeStamp;
}

193
void UdpAvgThroughputSensor::calculateTput(unsigned long long timeStamp, PacketInfo *packet)
194 195 196 197 198 199 200 201 202
{
	// We don't have enough samples to calculate the average throughput.
	if(numSamples < MIN_SAMPLES)
		return;

	int sampleCount = ( numSamples < MAX_SAMPLES )?numSamples:MAX_SAMPLES;
	int i, index;
	unsigned long long timePeriod = 0;
	long packetSizeSum = 0;
203 204 205 206
	int packetCount = 0;
	
	int firstPacketIndex = (queuePtr -1 + MAX_SAMPLES)%MAX_SAMPLES;
	bool heavyLossFlag = false;
207

208 209 210 211 212 213 214 215
	if(ackList[firstPacketIndex].numPackets >= MAX_SAMPLES)
	{
		logWrite(SENSOR,"Setting heavy loss flag");
		heavyLossFlag = true;
	}

	//for(i = 0;(i < sampleCount && timePeriod < MAX_TIME_PERIOD); i++)
	for(i = 0;i < sampleCount; i = i + packetCount)
216 217 218
	{
		index = (queuePtr -1 - i + MAX_SAMPLES)%MAX_SAMPLES;

219 220 221 222 223 224 225 226 227 228 229
		if(i + ackList[index].numPackets >= MAX_SAMPLES)
		{
			if(heavyLossFlag == false)
			{
				logWrite(SENSOR,"Breaking out of Tput loop");
				break;
			}
			else
				heavyLossFlag = false;
		}

230 231
		timePeriod += ackList[index].timeTaken;
		packetSizeSum += ackList[index].packetSize;
232
		packetCount = ackList[index].numPackets;
233 234 235 236 237
	}

	// Avoid dividing by zero.
	if(timePeriod == 0)
	{
238
		logWrite(ERROR, "Timeperiod is zero in UdpAvgThroughput calculation, i = %d, index = %d, seqNum = %d", i, index, ackList[index].seqNum);
239 240 241
		return;
	}

242
	if(timePeriod > 50000000)
243
	{
244
		logWrite(ERROR, " Incorrect UdpAvgThroughput timePeriod = %llu, bytes = %d, i = %d, numSamples = %d, sampleCount = %d queuePtr = %d", timePeriod, packetSizeSum, i,numSamples,sampleCount, queuePtr);
245 246 247 248
		int k;
		for(k = 0; k < i; k++)
		{
			index = (queuePtr -1 - k + MAX_SAMPLES)%MAX_SAMPLES;
249
			logWrite(ERROR, "Wrong UDP seqnum = %d, index = %d, bytes = %d, timePeriod = %llu, isRedun = %d", ackList[index].seqNum,index, ackList[index].packetSize, ackList[index].timeTaken, ackList[index].isRedun);
250 251 252 253 254
		}

		return;
	}

255 256 257
	// Calculate the average throughput.
	throughputKbps = 8000000.0*( static_cast<double> (packetSizeSum ))  / ( static_cast<double>(timePeriod)*1024.0 );

258

259 260 261 262
	int tputValue = static_cast<int>(throughputKbps);
	//Avoid sending a zero throughput value to the monitor.
	if(tputValue == 0)
		tputValue = 1;
263
	// Send a message to the monitor with the new bandwidth.
264
	if(lossSensor->getPacketLoss() == 0 )
265
	{
266
		if(tputValue > lastSeenThroughput )
267 268 269 270
		{
			// Send this available bandwidth as a tentative value.
			// To be used for dummynet events only if it is greater
			// than the last seen value.
271

272
			ostringstream messageBuffer;
273
			messageBuffer << tputValue;
274 275
			global::output->genericMessage(TENTATIVE_THROUGHPUT, messageBuffer.str(), packet->elab);
		}
276 277 278 279 280
		logWrite(SENSOR, "AVGTPUT:TIME=%llu,TENTATIVE=%f",timeStamp,throughputKbps);
		logWrite(SENSOR, "LOSS:TIME=%llu,LOSS=0",timeStamp);
	}
	else
	{
281
		ostringstream messageBuffer;
282
		messageBuffer << tputValue;
283
		global::output->genericMessage(AUTHORITATIVE_BANDWIDTH, messageBuffer.str(), packet->elab);
284 285 286 287 288 289
		// Send this as the authoritative available bandwidth value.
		logWrite(SENSOR, "AVGTPUT:TIME=%llu,AUTHORITATIVE=%f",timeStamp,throughputKbps);
		logWrite(SENSOR, "LOSS:TIME=%llu,LOSS=%d",timeStamp,lossSensor->getPacketLoss());

		(const_cast<UdpLossSensor *>(lossSensor))->resetLoss();
	}
290
	lastSeenThroughput = tputValue;
291 292
	logWrite(SENSOR, "STAT:TOTAL_LOSS = %d",lossSensor->getTotalPacketLoss());
}