/*
 * Copyright (c) 2006 University of Utah and the Flux Group.
 *
 * {{{EMULAB-LICENSE
 *
 * This file is part of the Emulab network testbed software.
 *
 * This file is free software: you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at
 * your option) any later version.
 *
 * This file is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this file.  If not, see <http://www.gnu.org/licenses/>.
 *
 * }}}
 */

#include "UdpAvgThroughputSensor.h"
#include "CommandOutput.h"

using namespace std;

namespace
{
    // Copy an unsigned short out of the ACK payload. The payload fields sit
    // at odd byte offsets, so dereferencing a casted pointer would be a
    // misaligned (and aliasing-violating) access; memcpy is always safe.
    unsigned short int loadPayloadUShort(const void *source)
    {
        unsigned short int value = 0;
        memcpy(&value, source, sizeof(value));
        return value;
    }

    // Copy an unsigned long long out of the ACK payload (see above for why
    // this is not a plain pointer dereference).
    unsigned long long loadPayloadULongLong(const void *source)
    {
        unsigned long long value = 0;
        memcpy(&value, source, sizeof(value));
        return value;
    }
}

// Stores the two sensors this one reads from and starts with an empty
// sample ring (numSamples == 0, queuePtr == 0) and no previous ACK time.
UdpAvgThroughputSensor::UdpAvgThroughputSensor(UdpPacketSensor const *packetHistoryVal,
                                               UdpLossSensor const *lossSensorVal)
    : packetHistory(packetHistoryVal),
      lossSensor(lossSensorVal),
      lastAckTime(0),
      throughputKbps(0.0),
      lastSeenThroughput(0),
      numSamples(0),
      queuePtr(0)
{
}

UdpAvgThroughputSensor::~UdpAvgThroughputSensor()
{
}

// Outgoing packets carry no information this sensor needs.
void UdpAvgThroughputSensor::localSend(PacketInfo *packet)
{
    // Do nothing.
}

// Records one throughput sample per packet acknowledged by this ACK (the
// packet itself plus any useful redundant ACK entries it carries) into the
// ackList ring, then recomputes the average throughput.
//
// ACK payload layout, as read below:
//   byte 0                          : number of redundant ACK entries
//   byte 1                          : sequence number (unsigned short)
//   byte 1 + USHORT_INT_SIZE        : echoed packet size (unsigned short)
//   byte 1 + 2*USHORT_INT_SIZE      : receiver timestamp (unsigned long long)
//   byte udpMinAckPacketSize + i*udpRedunAckSize :
//       redundant entry i = sequence number, packet size (unsigned shorts),
//       then a time difference (unsigned long long).
void UdpAvgThroughputSensor::localAck(PacketInfo *packet)
{
    // This is a re-ordered ACK - don't do anything
    // with it - just return.
    if( packetHistory->isAckValid() == false )
    {
        ackValid = false;
        return;
    }

    // Find out how many redundant ACKs this packet is carrying - 0 to 121.
    unsigned char numRedunAcksChar = 0;
    memcpy(&numRedunAcksChar, &packet->payload[0], global::UCHAR_SIZE);
    int numRedunAcks = static_cast<int>(numRedunAcksChar);

    // This is the timestamp at the receiver, when the original packet was received.
    unsigned long long currentAckTimeStamp
        = loadPayloadULongLong(packet->payload + 1 + 2*global::USHORT_INT_SIZE);
    unsigned long long timeStamp = packet->packetTime.toMicroseconds();

    // This is the first ACK we have seen, store its receiver timestamp
    // and return, we cannot calculate throughput from just one ACK - at least 2.
    if(lastAckTime == 0)
    {
        lastAckTime = currentAckTimeStamp;
        return;
    }

    // The sequence number is an unsigned short; it used to be loaded through
    // an unsigned int pointer, which over-read the field and picked the wrong
    // half of the word on big-endian machines.
    unsigned short int seqNum = loadPayloadUShort(packet->payload + 1);
    unsigned short int echoedPacketSize
        = loadPayloadUShort(packet->payload + 1 + global::USHORT_INT_SIZE);

    // Receiver-side time elapsed since the previously processed ACK. It is
    // shrunk below as redundant entries claim their share of the interval.
    unsigned long long ackTimeDiff = (currentAckTimeStamp - lastAckTime);

    // NOTE(review): the template arguments were lost from this copy of the
    // file; UdpPacketInfo is the assumed element type of getAckedPackets() -
    // confirm against UdpPacketSensor.h.
    vector<UdpPacketInfo>::iterator vecIterator;
    UdpAck tmpUdpAck;
    vector<UdpPacketInfo> ackedPackets = packetHistory->getAckedPackets();

    unsigned long long timeDiff = 0;

    // Average the throughput over all the packets being acknowledged.
    for(int i = 0; i < numRedunAcks; i++)
    {
        unsigned short int redunSeqNum
            = loadPayloadUShort(packet->payload + global::udpMinAckPacketSize
                                + i*global::udpRedunAckSize);
        unsigned short int redunPacketSize
            = loadPayloadUShort(packet->payload + global::udpMinAckPacketSize
                                + i*global::udpRedunAckSize
                                + global::USHORT_INT_SIZE);

        // Find if this redundant ACK is useful - or it was acked before.
        vecIterator = find_if(ackedPackets.begin(), ackedPackets.end(),
                              bind2nd(equalSeqNum(), redunSeqNum));
        if(vecIterator == ackedPackets.end())
        {
            continue;
        }

        // Calculate throughput for the packet being acked by
        // the redundant ACK.
        timeDiff = loadPayloadULongLong(packet->payload + global::udpMinAckPacketSize
                                        + i*global::udpRedunAckSize
                                        + 2*global::USHORT_INT_SIZE);

        // The entry must leave a strictly positive slice of the interval,
        // otherwise the sample would have a zero or wrapped-around duration.
        if((timeDiff > ackTimeDiff) || (ackTimeDiff - timeDiff == 0))
        {
            logWrite(EXCEPTION, "Error using UDP redun Seqnum = %d, for seqNum = %d, time taken = %llu,ackTimeDiff = %llu, timeDiff = %llu, i = %d, numAcks = %d",redunSeqNum,seqNum, ackTimeDiff - timeDiff,ackTimeDiff,timeDiff, i, numRedunAcks);
            continue;
        }

        tmpUdpAck.timeTaken = ackTimeDiff - timeDiff;
        tmpUdpAck.isRedun = true;
        tmpUdpAck.seqNum = redunSeqNum;
        tmpUdpAck.numPackets = 1;

        // We lost the record of the size of this packet due to libpcap
        // loss, use the length echoed back in the ACK.
        if((*vecIterator).isFake == true)
        {
            tmpUdpAck.packetSize = redunPacketSize;
        }
        else
        {
            tmpUdpAck.packetSize = (*vecIterator).packetSize;
        }

        // Overwrite the oldest slot of the ring buffer.
        ackList[queuePtr] = tmpUdpAck;
        queuePtr = (queuePtr + 1)%MAX_SAMPLES;
        if(numSamples < MAX_SAMPLES)
        {
            numSamples++;
        }

        // Whatever is left of the interval belongs to the remaining packets.
        ackTimeDiff = timeDiff;
    }

    if(ackTimeDiff == 0)
    {
        calculateTput(timeStamp, packet);
        lastAckTime = currentAckTimeStamp;
        logWrite(ERROR,"Error - Two UDP ACKs report the same receive time, for seqNum = %d",seqNum);
        return;
    }

    // Calculate the throughput for the current packet being ACKed.
    vecIterator = find_if(ackedPackets.begin(), ackedPackets.end(),
                          bind2nd(equalSeqNum(), seqNum));

    if(vecIterator == ackedPackets.end())
    {
        logWrite(ERROR,"Error - Incorrect ack packets state passed to UdpAvgThroughputSensor");
        return;
    }

    // We lost the record of the size of this packet due to libpcap
    // loss, use the length echoed back in the ACK.
    if(packetHistory->isAckFake() == true)
    {
        tmpUdpAck.packetSize = echoedPacketSize;
    }
    else
    {
        tmpUdpAck.packetSize = (*vecIterator).packetSize;
    }

    tmpUdpAck.timeTaken = ackTimeDiff;
    tmpUdpAck.isRedun = false;
    tmpUdpAck.seqNum = seqNum;
    // This sample also stands in for the packets reported lost before it.
    tmpUdpAck.numPackets = lossSensor->getPacketLoss() + 1;

    ackList[queuePtr] = tmpUdpAck;
    queuePtr = (queuePtr + 1)%MAX_SAMPLES;
    if(numSamples < MAX_SAMPLES)
    {
        numSamples++;
    }

    calculateTput(timeStamp, packet);

    // Save the receiver timestamp of this ACK packet, so that we can
    // use for calculating throughput for the next ACK packet.
    lastAckTime = currentAckTimeStamp;
}

// Walks the sample ring from the newest entry backwards, summing bytes and
// elapsed time, and reports the resulting throughput to the monitor:
// TENTATIVE_THROUGHPUT when no loss is pending (and only if it exceeds the
// last value seen), AUTHORITATIVE_BANDWIDTH otherwise, after which the loss
// counter is reset.
//
// timeStamp - local time of the ACK, used only in the log lines.
// packet    - the ACK being processed; its elab field addresses the message.
void UdpAvgThroughputSensor::calculateTput(unsigned long long timeStamp, PacketInfo *packet)
{
    // We don't have enough samples to calculate the average throughput.
    if(numSamples < MIN_SAMPLES)
    {
        return;
    }

    int sampleCount = ( numSamples < MAX_SAMPLES ) ? numSamples : MAX_SAMPLES;
    // Initialised so the diagnostics below never read indeterminate values,
    // even if the loop body does not run.
    int i = 0;
    int index = 0;
    unsigned long long timePeriod = 0;
    long packetSizeSum = 0;
    int packetCount = 0;

    // Index of the most recently stored sample.
    int firstPacketIndex = (queuePtr -1 + MAX_SAMPLES)%MAX_SAMPLES;
    bool heavyLossFlag = false;

    // If the newest sample alone accounts for a full window of packets,
    // allow it to be counted once instead of bailing out immediately.
    if(ackList[firstPacketIndex].numPackets >= MAX_SAMPLES)
    {
        logWrite(SENSOR,"Setting heavy loss flag");
        heavyLossFlag = true;
    }

    // Each sample advances the walk by the number of packets it represents.
    for(i = 0; i < sampleCount; i = i + packetCount)
    {
        index = (queuePtr -1 - i + MAX_SAMPLES)%MAX_SAMPLES;

        if(i + ackList[index].numPackets >= MAX_SAMPLES)
        {
            if(heavyLossFlag == false)
            {
                logWrite(SENSOR,"Breaking out of Tput loop");
                break;
            }
            else
            {
                heavyLossFlag = false;
            }
        }

        timePeriod += ackList[index].timeTaken;
        packetSizeSum += ackList[index].packetSize;
        packetCount = ackList[index].numPackets;
    }

    // Avoid dividing by zero.
    if(timePeriod == 0)
    {
        logWrite(ERROR, "Timeperiod is zero in UdpAvgThroughput calculation, i = %d, index = %d, seqNum = %d", i, index, ackList[index].seqNum);
        return;
    }

    // Reject implausibly long periods and dump the samples that led to it.
    if(timePeriod > 50000000)
    {
        // packetSizeSum is a long, so it must be printed with %ld.
        logWrite(ERROR, " Incorrect UdpAvgThroughput timePeriod = %llu, bytes = %ld, i = %d, numSamples = %d, sampleCount = %d queuePtr = %d", timePeriod, packetSizeSum, i,numSamples,sampleCount, queuePtr);
        int k;
        for(k = 0; k < i; k++)
        {
            index = (queuePtr -1 - k + MAX_SAMPLES)%MAX_SAMPLES;
            logWrite(ERROR, "Wrong UDP seqnum = %d, index = %d, bytes = %d, timePeriod = %llu, isRedun = %d", ackList[index].seqNum,index, ackList[index].packetSize, ackList[index].timeTaken, ackList[index].isRedun);
        }
        return;
    }

    // Calculate the average throughput.
    // (bytes * 8 bits * 1e6) / (period * 1024); presumably the period is in
    // microseconds, which makes the result kilobits per second.
    throughputKbps = 8000000.0*( static_cast<double>(packetSizeSum) ) / ( static_cast<double>(timePeriod)*1024.0 );

    int tputValue = static_cast<int>(throughputKbps);

    // Avoid sending a zero throughput value to the monitor.
    if(tputValue == 0)
    {
        tputValue = 1;
    }

    // Send a message to the monitor with the new bandwidth.
    if(lossSensor->getPacketLoss() == 0 )
    {
        if(tputValue > lastSeenThroughput )
        {
            // Send this available bandwidth as a tentative value.
            // To be used for dummynet events only if it is greater
            // than the last seen value.
            ostringstream messageBuffer;
            messageBuffer << tputValue;
            global::output->genericMessage(TENTATIVE_THROUGHPUT, messageBuffer.str(), packet->elab);
        }
        logWrite(SENSOR, "AVGTPUT:TIME=%llu,TENTATIVE=%f",timeStamp,throughputKbps);
        logWrite(SENSOR, "LOSS:TIME=%llu,LOSS=0",timeStamp);
    }
    else
    {
        // Send this as the authoritative available bandwidth value.
        ostringstream messageBuffer;
        messageBuffer << tputValue;
        global::output->genericMessage(AUTHORITATIVE_BANDWIDTH, messageBuffer.str(), packet->elab);

        logWrite(SENSOR, "AVGTPUT:TIME=%llu,AUTHORITATIVE=%f",timeStamp,throughputKbps);
        logWrite(SENSOR, "LOSS:TIME=%llu,LOSS=%d",timeStamp,lossSensor->getPacketLoss());

        // The loss sensor is held through a pointer-to-const; cast it away
        // to clear the loss count that has just been reported.
        (const_cast<UdpLossSensor *>(lossSensor))->resetLoss();
    }

    lastSeenThroughput = tputValue;
    logWrite(SENSOR, "STAT:TOTAL_LOSS = %d",lossSensor->getTotalPacketLoss());
}