Commit 99d6dc71 authored by Pramod R Sanaga's avatar Pramod R Sanaga
Browse files

1) Added support for sequence number wrap around of udp packets.

2) Some more error messages will be output to the log when junk packets(which are not
generated by us) are received by the magent, and the throughput calculation will remain unaffected by these anomalies.
3) Changed mock-monitor to work with the change in Header::headerSize() ( as opposed to Header::headerSize variable ) .
parent bcd60460
......@@ -357,7 +357,7 @@ int KernelTcp::writeUdpMessage(int size, WriteResult & result)
// The size of the packet & its timestamp at the
// sender are echoed in the ACKs.
unsigned short networkOrder_size = htons(size);
unsigned short networkOrder_size = (size);
memcpy(&udpPacketBuffer[1 + global::USHORT_INT_SIZE],&networkOrder_size, global::USHORT_INT_SIZE);
// Copy the timestamp of when this packet is being sent.
......@@ -367,7 +367,7 @@ int KernelTcp::writeUdpMessage(int size, WriteResult & result)
// used as a fallback option in case we miss
// this packet because of a libpcap buffer overflow.
unsigned long long curTime = getCurrentTime().toMicroseconds();
curTime = htonll(curTime);
curTime = (curTime);
memcpy(&udpPacketBuffer[1 + 2*global::USHORT_INT_SIZE], &curTime, global::ULONG_LONG_SIZE);
bool socketConnectedFlag = true;
......@@ -402,7 +402,7 @@ int KernelTcp::writeUdpMessage(int size, WriteResult & result)
// which was assigned to the socket.
if(!socketConnectedFlag || ntohs(udpLocalAddr.sin_port) == 0 )
{
socklen_t udpLocalLen;
socklen_t udpLocalLen = sizeof(udpLocalAddr);
getsockname(peersock, (sockaddr *)&udpLocalAddr, &udpLocalLen);
}
......@@ -460,7 +460,8 @@ void KernelTcp::init(void)
/* ask pcap for the network address and mask of the device */
pcap_lookupnet(global::interface.c_str(), &netp, &maskp, errbuf);
filter << "port " << global::peerServerPort << " and tcp";
filter << "\(port " << global::peerServerPort << " and tcp\)"
" or \(port " << global::peerUdpServerPort << " and udp \)";
/* open device for reading.
* NOTE: We use non-promiscuous */
......
......@@ -13,4 +13,4 @@
# The server runs in an infinite while loop - it can be terminated after the session
# is determined to be done at the client - kill using Ctrl-C.
sudo ./UdpServer eth0 3492
sudo ./UdpServer eth1 3492
......@@ -93,6 +93,8 @@ void UdpAvgThroughputSensor::localAck(PacketInfo *packet)
numThroughputAcks++;
tmpUdpAck.timeTaken = ackTimeDiff - timeDiff;
tmpUdpAck.isRedun = true;
tmpUdpAck.seqNum = redunSeqNum;
// We lost the record of the size of this packet due to libpcap
......@@ -154,6 +156,8 @@ void UdpAvgThroughputSensor::localAck(PacketInfo *packet)
}
tmpUdpAck.timeTaken = ackTimeDiff - timeDiff;
tmpUdpAck.isRedun = false;
tmpUdpAck.seqNum = seqNum;
ackList[queuePtr] = tmpUdpAck;
queuePtr = (queuePtr + 1)%MAX_SAMPLES;
......@@ -195,9 +199,23 @@ void UdpAvgThroughputSensor::calculateTput(unsigned long long timeStamp)
return;
}
if(timePeriod > 500000)
{
logWrite(ERROR, " Incorrect UdpAvgThroughput timePeriod = %llu, bytes = %d, i = %d, queuePtr = %d", timePeriod, packetSizeSum, i, queuePtr);
int k;
for(k = 0; k < i; k++)
{
index = (queuePtr -1 - k + MAX_SAMPLES)%MAX_SAMPLES;
logWrite(ERROR, "Wrong UDP seqnum = %d, bytes = %d, timePeriod = %llu, isRedun = %d", ackList[index].seqNum, ackList[index].packetSize, ackList[index].timeTaken, ackList[index].isRedun);
}
return;
}
// Calculate the average throughput.
throughputKbps = 8000000.0*( static_cast<double> (packetSizeSum )) / ( static_cast<double>(timePeriod)*1024.0 );
// Send a message to the monitor with the new bandwidth.
if(lossSensor->getPacketLoss() == 0)
{
......
......@@ -13,6 +13,8 @@ class UdpLossSensor;
struct UdpAck {
unsigned long long timeTaken;
long packetSize;
unsigned short seqNum;
bool isRedun;
};
......
......@@ -116,6 +116,7 @@ void UdpPacketSensor::localAck(PacketInfo *packet)
"We might have received "
" a reordered ACK, which has already been ACKed using redundant ACKs .\n", seqNum);
ackValid = false;
return;
}
else
ackValid = true;
......@@ -218,8 +219,12 @@ void UdpPacketSensor::localAck(PacketInfo *packet)
// were lost - treat this as congestion on the forward path.
// Find out how many packets were lost.
struct UdpPacketCmp comparePacket;
listIterator = find_if(sentPacketList.begin(), curPacketIterator, bind2nd(lessSeqNum(), seqNum));
comparePacket.seqNum = seqNum;
comparePacket.timeStamp = (*curPacketIterator).timeStamp;
listIterator = find_if(sentPacketList.begin(), curPacketIterator, bind2nd(lessSeqNum(), &comparePacket));
if( (listIterator != sentPacketList.end()) && (listIterator != curPacketIterator ))
{
......@@ -243,7 +248,7 @@ void UdpPacketSensor::localAck(PacketInfo *packet)
packetLoss++;
totalPacketLoss++;
listIterator = find_if(sentPacketList.begin(), curPacketIterator, bind2nd(lessSeqNum(), seqNum ));
listIterator = find_if(sentPacketList.begin(), curPacketIterator, bind2nd(lessSeqNum(), &comparePacket ));
}
while( (listIterator != sentPacketList.end()) && (listIterator != curPacketIterator) );
......
......@@ -395,6 +395,11 @@ struct UdpPacketInfo
bool isFake;
};
struct UdpPacketCmp{
unsigned short seqNum;
unsigned long long timeStamp;
};
class equalSeqNum:public std::binary_function<UdpPacketInfo , unsigned short int, bool>
{
public:
......@@ -404,12 +409,12 @@ class equalSeqNum:public std::binary_function<UdpPacketInfo , unsigned short int
}
};
class lessSeqNum:public std::binary_function<UdpPacketInfo , unsigned short int, bool>
class lessSeqNum:public std::binary_function<UdpPacketInfo , UdpPacketCmp *,bool>
{
public:
bool operator()(const UdpPacketInfo& packet,unsigned short int seqNum) const
bool operator()(const UdpPacketInfo& packet,UdpPacketCmp *cmpPacket) const
{
return (packet.seqNum < seqNum);
return ( (packet.seqNum < cmpPacket->seqNum) && (packet.timeStamp < cmpPacket->timeStamp));
}
};
......
import sys
import re
import math
# Read stats from the log output by UdpClient.
inFile = open(sys.argv[1], 'r')
......
......@@ -16,7 +16,7 @@ int SEND_COUNT = 4200;
int PACKET_DELTA = 5;
int PACKET_SIZE = 1000;
void sendCommand(int conn, unsigned char commandId, ElabOrder const & key,
void sendCommand(int connection, unsigned char commandId, ElabOrder const & key,
vector<char> const & command)
{
static char buffer[8096] = {0};
......@@ -29,7 +29,7 @@ void sendCommand(int conn, unsigned char commandId, ElabOrder const & key,
pos = saveHeader(pos, head);
pos = saveBuffer(pos, & command[0], command.size());
send(conn, buffer, Header::headerSize + command.size(), 0);
send(connection, buffer, Header::headerSize() + command.size(), 0);
}
int main(int argc, char * argv[])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment