Commit 7c874741 authored by Pramod R Sanaga's avatar Pramod R Sanaga
Browse files

TCP over UDP works as a stand-alone flow and along with itself as the

competing traffic - stemmed the packet losses in rcv buffers. Completely (& unfairly) dominates a competing IPerf TCP flow though.
parent 89a56729
......@@ -26,6 +26,7 @@
#include <map>
#include <sstream>
#include <math.h>
#include <set>
#define REMOTE_SERVER_PORT 9831
//#define MAX_MSG 450000
......@@ -54,8 +55,13 @@ struct tcp_block{
bool lossRecovery;
list< pair<unsigned long, unsigned long> > sackRanges;
set< pair<unsigned long, unsigned long> > sackRanges;
int congAvoidancePackets;
unsigned long sackEdge;
// Temp variables.
unsigned long range_start, range_end;
};
pcap_t *pcapDescriptor = NULL;
......@@ -75,19 +81,38 @@ unsigned long highSeq;
// Handler for one captured UDP packet. Receives the pcap header, the UDP
// header, a pointer to the start of the UDP packet and the enclosing IP packet.
void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr, u_char *const udpPacketStart, struct ip const *ipPacket);
// Presumably transmits messageLen bytes starting at message; the send loop in
// main() stops sending when the returned value signals failure.
int xmit_packet(char *message, int messageLen);
// Returns the current time as an unsigned 64-bit value.
// NOTE(review): presumably milliseconds, given the name and that RTO values
// are added directly to the result - confirm against the definition.
unsigned long long getTimeMilli();
// Count of ACK packets processed; incremented in handlePacket() for every
// packet whose type byte is '1'.
static int ackCount = 0;
template <class T> class In_Range : public std::unary_function<T, bool>
{
public:
bool operator() (T& val)
{
if( (val >= conn_info.range_start) && (val <= conn_info.range_end) )
return true;
else
return false;
}
};
bool IsLost(unsigned long seq)
{
// If this sequence number was acked by any of the SACKs, we wouldn't find
// it in the unacked list.
if(find(unackedPackets.begin(),unackedPackets.end(),seq) == unackedPackets.end())
/*
if(checkUnAcked)
{
return false;
if(find(unackedPackets.begin(),unackedPackets.end(),seq) == unackedPackets.end())
{
return false;
}
}
else
*/
{ // If the packet is still unacked, check that it is below the highest seq.
// sacked to be considered lost.
if( (conn_info.sackRanges.size() > 0) && (seq < conn_info.sackRanges.back().second ) )
//if( (conn_info.sackRanges.size() > 0) && (seq < (*(conn_info.sackRanges.rbegin())).second ) )
if(seq < conn_info.sackEdge)
return true;
else
return false;
......@@ -101,7 +126,9 @@ unsigned long NextSeq(bool timeout)
// Return the sequence number of the first unacked packet in case of a timeout.
if(timeout)
{
return (*listIter);
}
while(listIter != unackedPackets.end())
{
......@@ -109,7 +136,8 @@ unsigned long NextSeq(bool timeout)
{
if( (*listIter) > conn_info.HighRxt )
{
if( (*listIter) < conn_info.sackRanges.back().second )
//if( (*listIter) < (*(conn_info.sackRanges.rbegin())).second )
if( (*listIter) < conn_info.sackEdge )
{
return (*listIter);
}
......@@ -130,17 +158,26 @@ void SetPipe()
conn_info.pipe = 0;
for(i = conn_info.HighACK+1; i <= conn_info.HighData; i++)
//for(i = conn_info.HighACK+1; i <= conn_info.HighData; i++)
list<unsigned long>::iterator listIter = unackedPackets.begin();
// Check only in the unacked list.
while(listIter != unackedPackets.end())
{
if( ! IsLost(i) )
if( ! IsLost((*listIter)) )
conn_info.pipe++;
if(i <= conn_info.HighRxt)
conn_info.pipe++;
//FIXME:
//if((*listIter) <= conn_info.HighRxt)
// conn_info.pipe++;
listIter++;
}
}
/*
bool sackCompare( pair<unsigned long, unsigned long> pair1, pair<unsigned long, unsigned long> pair2)
{
if(pair1.second <= pair2.second)
......@@ -149,58 +186,147 @@ bool sackCompare( pair<unsigned long, unsigned long> pair1, pair<unsigned long,
return false;
}
*/
// Ordering functor for SACK ranges stored as (start, end) pairs.
//
// Ranges are ordered by their right edge (.second) only; the start of the
// range does not take part in the comparison, so two ranges that end at the
// same sequence number are treated as equivalent.
//
// The comparison is a strict '<'. A '<=' comparison is not a strict weak
// ordering (it reports a < a), which is undefined behaviour when the functor
// is passed to std::list::sort, std::sort or used as a std::set comparator.
struct sackCompare
{
    // Returns true when pair1 ends strictly before pair2.
    bool operator()(const std::pair<unsigned long, unsigned long>& pair1,
                    const std::pair<unsigned long, unsigned long>& pair2) const
    {
        return pair1.second < pair2.second;
    }
};
// Processes the SACK blocks attached to an ACK.
//
// numSACKBlocks - number of SACK blocks available at dataPtr.
// dataPtr       - start of the SACK data; each block is read as two
//                 consecutive unsigned long values (range start, range end).
//
// For every block, the sequence numbers inside [start, end] are removed from
// unackedPackets and conn_info.sackEdge is raised to the largest range end
// seen so far. Afterwards the entries of conn_info.sackRanges are walked to
// drop ranges whose end is at or below conn_info.HighACK.
//
// NOTE(review): several statements below appear twice in slightly different
// forms (a byte-wide read followed by an unsigned long read of the same
// offset), and the live sort()/list-iterator code treats sackRanges as a list
// while the commented-out code uses set operations - confirm which variants
// are meant to be live before relying on this function.
void UpdateSACKs(int numSACKBlocks, u_char *dataPtr)
{
// Remove SACK ranges to the left side of packets which have been cumulatively acked.
/*
if(conn_info.HighACK >= (*(conn_info.sackRanges.begin())).second)
{
set<pair<unsigned long, unsigned long> >::iterator sackIter = conn_info.sackRanges.begin();
while(sackIter != conn_info.sackRanges.end())
{
if(conn_info.HighACK >= (*sackIter).second)
conn_info.sackRanges.erase(sackIter++);
else
sackIter++;
}
}
*/
if(numSACKBlocks > 0)
{
// dataPtr is a u_char pointer, so this dereference reads a single byte at the
// offset of the last block's range end. The value is not used again in the
// live code of this function.
unsigned long lastSACKBlock = *(dataPtr + ((numSACKBlocks-1)*2 + 1 )*sizeof(unsigned long));
// unsigned long lastSACKBlock = *(dataPtr + ((numSACKBlocks-1)*2 + 1 )*sizeof(unsigned long));
pair<unsigned long, unsigned long> tmpRange;
printf("Received a SACK\n");
//printf("Received a SACK for seq = %u\n",conn_info.HighACK+1);
// Erase packets acked by the SACK blocks from the unacked list.
// listIter is only referenced by the commented-out loop further down.
list<unsigned long>::iterator listIter;
for(int i = 0;i < numSACKBlocks; i++)
{
// Byte-wide reads (u_char dereference); both values are overwritten by the
// unsigned long reads on the following two lines.
tmpRange.first = *(dataPtr + i*2*sizeof(unsigned long));
tmpRange.second = *(dataPtr + (i*2+1)*sizeof(unsigned long));
// Block i occupies two unsigned long slots: start at 2*i, end at 2*i + 1.
tmpRange.first = *( (unsigned long *)(dataPtr + i*2*sizeof(unsigned long)));
tmpRange.second = *( (unsigned long *)(dataPtr + (i*2+1)*sizeof(unsigned long)));
/*
if(conn_info.sackRanges.find(tmpRange) == conn_info.sackRanges.end())
{
conn_info.sackRanges.insert(tmpRange);
cout << " Inserting "<<tmpRange.first<<", "<<tmpRange.second<<", size = "<<conn_info.sackRanges.size()<<endl;
}
*/
// Publish the range through conn_info, where the In_Range predicate reads its
// bounds, then drop every unacked sequence number inside [first, second].
conn_info.range_start = tmpRange.first;
conn_info.range_end = tmpRange.second;
unackedPackets.remove_if( In_Range<unsigned long>() );
/*
for( unsigned long j = tmpRange.first; j <= tmpRange.second; j++)
{
if(find(unackedPackets.begin(),unackedPackets.end(),j) != unackedPackets.end())
unackedPackets.erase(find(unackedPackets.begin(),unackedPackets.end(),j));
// listIter = find(unackedPackets.begin(),unackedPackets.end(),j);
// if(listIter != unackedPackets.end())
// unackedPackets.erase(listIter);
rexmitMap.erase(j);
packetTimeMap.erase(j);
}
*/
// Remember the highest SACK number seen.
if(tmpRange.second > conn_info.sackEdge)
conn_info.sackEdge = tmpRange.second;
}
// Update the SACK ranges.
// It is not necessary to merge them - all that we care about is their
// right most ( most recent ) edge.
/*
{
for(int i = 0;i < numSACKBlocks; i++)
{
tmpRange.first = *(dataPtr + i*2*sizeof(unsigned long));
tmpRange.second = *(dataPtr + (i*2+1)*sizeof(unsigned long));
tmpRange.first = *( (unsigned long *)(dataPtr + i*2*sizeof(unsigned long)));
tmpRange.second = *( (unsigned long *)(dataPtr + (i*2+1)*sizeof(unsigned long)));
conn_info.sackRanges.push_back(tmpRange);
//cout<<"Adding range " << tmpRange.first << ", "<<tmpRange.second<<endl;
conn_info.sackRanges.insert(tmpRange);
}
}
*/
// NOTE(review): sort() with a comparator argument is a std::list operation;
// confirm the container type of sackRanges and what sackCompare names here.
conn_info.sackRanges.sort(sackCompare);
}
//conn_info.sackRanges.sort(sackCompare);
// Start at the front of sackRanges for the pruning loop below.
list<pair<unsigned long, unsigned long> >::iterator sackIter = conn_info.sackRanges.begin();
/*
// Remove duplicate entries.
list<pair<unsigned long, unsigned long> >::iterator sackIter = conn_info.sackRanges.begin();
unsigned long range_start, range_end;
range_start = (*sackIter).first;
range_end = (*sackIter).second;
sackIter++;
while(sackIter != conn_info.sackRanges.end())
{
if( ( (*sackIter).first == range_start) && ((*sackIter).second == range_end) )
{
conn_info.sackRanges.erase(sackIter++);
}
else
{
range_start = (*sackIter).first;
range_end = (*sackIter).second;
sackIter++;
}
}
*/
// Remove SACK ranges to the left side of packets which have been cumulatively acked.
while(sackIter != conn_info.sackRanges.end())
{
// The post-increment hands erase() the current position while sackIter has
// already moved on to the next element.
if(conn_info.HighACK >= (*sackIter).second)
conn_info.sackRanges.erase(sackIter++);
else
// NOTE(review): no statement follows this else before the closing brace; the
// commented-out variant of this loop near the top of the function advances
// the iterator with "sackIter++" here - confirm whether that line is missing.
/*
list<pair<unsigned long, unsigned long> >::iterator sackIter = conn_info.sackRanges.begin();
printf("SACK Ranges:\n");
while(sackIter != conn_info.sackRanges.end())
{
printf("%d %d : ", (*sackIter).first, (*sackIter).second);
sackIter++;
}
printf("\n");
*/
}
/*
sackIter = conn_info.sackRanges.begin();
printf("SACK Ranges after ( unacked = %d,front = %u, back = %u):\n",unackedPackets.size(), unackedPackets.front(), unackedPackets.back());
while(sackIter != conn_info.sackRanges.end())
{
printf("%d %d :\n", (*sackIter).first, (*sackIter).second);
sackIter++;
}
printf("\n");
*/
}
// Updates all the state variables.
......@@ -242,11 +368,20 @@ void Update(unsigned long seqNum, int numSACKBlocks, u_char *dataPtr, unsigned l
}
}
list<unsigned long>::iterator listIter;
// Remove all cumulatively acked packets from the unacked list.
conn_info.range_start = conn_info.HighACK;
conn_info.range_end = seqNum - 1;
unackedPackets.remove_if( In_Range<unsigned long>() );
for(unsigned long i = conn_info.HighACK; i < seqNum; i++)
{
if(find(unackedPackets.begin(),unackedPackets.end(),i) != unackedPackets.end())
unackedPackets.erase(find(unackedPackets.begin(),unackedPackets.end(),i));
// listIter = find(unackedPackets.begin(),unackedPackets.end(),i);
// if(listIter != unackedPackets.end())
// unackedPackets.erase(listIter);
rexmitMap.erase(i);
packetTimeMap.erase(i);
......@@ -257,7 +392,11 @@ void Update(unsigned long seqNum, int numSACKBlocks, u_char *dataPtr, unsigned l
// If all packets upto the loss recovery point have been acked,
// terminate loss recovery.
if( ( conn_info.lossRecovery ) && ( seqNum >= conn_info.recoveryPoint ) )
{
// printf("Exiting loss recovery at %u\n", seqNum);
conn_info.lossRecovery = false;
conn_info.dup_acks = 0;
}
// Reset the timout value - because this ACK is acknowledging new data.
unsigned long long currTime = getTimeMilli();
......@@ -274,6 +413,7 @@ void Update(unsigned long seqNum, int numSACKBlocks, u_char *dataPtr, unsigned l
// We received three duplicate ACKS - enter the loss recovery stage.
if(conn_info.dup_acks == 3)
{
//printf("Entering loss recovery for seq = %u\n", seqNum);
conn_info.lossRecovery = true;
// Set the recovery point - multiple losses upto this point
......@@ -283,12 +423,25 @@ void Update(unsigned long seqNum, int numSACKBlocks, u_char *dataPtr, unsigned l
// Decrease the congestion window and slow start threshold.
conn_info.ssthresh = unackedPackets.size() / 2;
conn_info.congWindow = unackedPackets.size() / 2;
printf("Setting ssthresh = %d\n", conn_info.ssthresh);
if(conn_info.ssthresh < 2)
{
conn_info.ssthresh = 2;
conn_info.congWindow = 2;
}
//printf("Setting ssthresh = %d\n", conn_info.ssthresh);
}
}
}
unsigned long long measure1, measure2;
// measure1 = getTimeMilli();
UpdateSACKs(numSACKBlocks, dataPtr);
/*
measure2 = getTimeMilli();
if(measure2 - measure1 > 2)
cout << " Spent " << measure2 - measure1 << " ms in select\n";
*/
SetPipe();
......@@ -309,6 +462,7 @@ void Update(unsigned long seqNum, int numSACKBlocks, u_char *dataPtr, unsigned l
conn_info.congWindow += numPacketsAcked;
// printf(", window after = %f\n", conn_info.congWindow);
}
// cout << "CongWin: " << conn_info.congWindow<< " "<< conn_info.ssthresh << " , outstanding=" <<conn_info.pipe << ", last Range = "<<conn_info.sackRanges.back().second<<endl;
}
......@@ -481,7 +635,7 @@ void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr,
}
}
void handlePacket(u_char *const dataPtr, unsigned long long recvTime)
void handlePacket(u_char *const dataPtr, unsigned long long recvTime, bool newPacketTx)
{
char packetType = *(char *)(dataPtr);
......@@ -498,23 +652,34 @@ void handlePacket(u_char *const dataPtr, unsigned long long recvTime)
// Set the retransmission timer.
//retransTimer = currTime + (unsigned long long)conn_info.rto_estimate;
//cout<<"Sending seqNum "<<seqNum<<endl;
// cout<<"Sending seqNum "<<seqNum<<endl;
unackedPackets.push_back(seqNum);
if(newPacketTx)
unackedPackets.push_back(seqNum);
}
else if(packetType == '1')
{
// We received an ACK.
ackCount++;
int sackBlocks = (int)(*(char *)(dataPtr + 1));
unsigned long seqNum = ( *(unsigned long *)(dataPtr + 2));
reportedLost = ( *(unsigned long *)(dataPtr + 2 + sizeof(unsigned long)));
highSeq = ( *(unsigned long *)(dataPtr + 2 + 2*sizeof(unsigned long)));
/*
if(sackBlocks == 0)
cout << "Received an ACK for "<<seqNum<<endl;
else
cout << "HigSeq = "<<highSeq<<endl;
*/
highSeq = ( *(unsigned long *)(dataPtr + 2 + 2*sizeof(unsigned long)));
int sackStart = 2 + 3*sizeof(unsigned long);
Update(seqNum, sackBlocks, (dataPtr + sackStart), recvTime);
}
else
{
......@@ -549,15 +714,19 @@ int main(int argc, char **argv)
memcpy((char *) &localHostAddr.sin_addr.s_addr,
localhostEnt->h_addr_list[0], localhostEnt->h_length);
/*
init_pcap(inet_ntoa(localHostAddr.sin_addr), true);
int pcapfd = pcap_get_selectable_fd(pcapDescriptor);
*/
/*
init_pcap(inet_ntoa(localHostAddr.sin_addr), false);
int pcapfd_read = pcap_get_selectable_fd(pcapDescriptor_Read);
int pcapfd_write = pcap_get_selectable_fd(pcapDescriptor_Write);
*/
clientSocket = socket(AF_INET, SOCK_DGRAM, 0);
clientSocket = socket(PF_INET, SOCK_DGRAM, 0);
fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK);
......@@ -589,10 +758,11 @@ int main(int argc, char **argv)
conn_info.dup_acks = 0;
// Initialize the timeout value to be one second.
conn_info.rto_estimate = 1000;
conn_info.rtt_est = 1000;
conn_info.rto_estimate = 3000;
conn_info.rtt_est = 3000;
conn_info.rtt_deviation_est = 0;
retransTimer = 0;
conn_info.sackEdge = 0;
connDuration *= 1000;
rexmitMap.clear();
......@@ -616,22 +786,27 @@ int main(int argc, char **argv)
}
*/
struct timeval timeoutStruct;
struct timeval timeoutStruct, writeTimeout;
timeoutStruct.tv_sec = 0;
timeoutStruct.tv_usec = 50000;
timeoutStruct.tv_usec = 20000;
writeTimeout.tv_sec = 0;
writeTimeout.tv_usec = 2000;
while (( lastSentTime - startTime) < connDuration)
unsigned long long measure1, measure2;
unsigned long long recvTime = getTimeMilli();
while ( ( ( lastSentTime - startTime) < connDuration) && (( recvTime - startTime) < connDuration))
{
FD_ZERO(&socketReadSet);
FD_SET(clientSocket,&socketReadSet);
FD_SET(pcapfd,&socketReadSet);
//FD_SET(pcapfd_write,&socketReadSet);
FD_ZERO(&socketWriteSet);
FD_SET(clientSocket,&socketWriteSet);
//FD_SET(pcapfd,&socketReadSet);
//select(clientSocket+pcapfd_read+pcapfd_write+1,&socketReadSet,&socketWriteSet,0,&timeoutStruct);
select(clientSocket+1,&socketReadSet,&socketWriteSet,0,&timeoutStruct);
//select(clientSocket+1,&socketReadSet,&socketWriteSet,0,&timeoutStruct);
select(clientSocket+1,&socketReadSet,NULL,0,&timeoutStruct);
//select(pcapfd+clientSocket+1,&socketReadSet,&socketWriteSet,0,&timeoutStruct);
/*
......@@ -651,136 +826,180 @@ int main(int argc, char **argv)
if (FD_ISSET(clientSocket,&socketReadSet) )
{
unsigned long long recvTime = getTimeMilli();
recvTime = getTimeMilli();
//cout << recvTime - startTime << " " << conn_info.congWindow<< " "<< conn_info.ssthresh << " , outstanding=" <<conn_info.pipe <<", reportedLost = "<<reportedLost<<endl;
//memset(msg, 0x0, 100);
int retval = 1;
flags = 0;
// cout<<"Into recv at "<<recvTime - startTime<<endl;
//while( (retval = recvfrom(clientSocket, (void *)msg, MAX_MSG, flags,(struct sockaddr *) &servAddr, &echoLen) ) > 0)
do{
// do
{
retval = recvfrom(clientSocket, (void *)msg, 75, flags,(struct sockaddr *) &servAddr, &echoLen) ;
if(retval > 0)
{
/*
if(retval > 120)
{
printf("ERROR, packet size = %d\n",retval);
exit(1);
}
handlePacket(msg, recvTime);
*/
handlePacket(msg, recvTime,false);
//memset(msg, 0x0, 100);
}
/*
else
{
// printf("Retval = %d\n",retval);
// printf("ERRNO = %d\n", errno);
}
//pcap_dispatch(pcapDescriptor, 300, pcapCallback, NULL);
*/
}
while(retval > 0);
// while(retval > 0);
// recvTime = getTimeMilli();
// cout<<"Out of recv at "<<recvTime - startTime<<endl;
}
if (FD_ISSET(clientSocket,&socketWriteSet) != 0)
if( (conn_info.congWindow - conn_info.pipe >= 1.0) || timeoutFlag )
{
unsigned long long currTime;
unsigned long nextSeq;
FD_ZERO(&socketWriteSet);
FD_SET(clientSocket,&socketWriteSet);
currTime = getTimeMilli();
// Send a packet as long as the oustanding data is less than cong. window
while( (conn_info.congWindow - conn_info.pipe >= 1.0) || timeoutFlag )
{
nextSeq = NextSeq(timeoutFlag);
select(clientSocket+1,NULL,&socketWriteSet,0,&writeTimeout);
memset(messageString, 0x0, 100);
// Data packet.
messageString[0] = '0';
// Copy the sequence number into the packet data.
memcpy(&messageString[1], &nextSeq, sizeof(unsigned long));
if (FD_ISSET(clientSocket,&socketWriteSet) != 0)
{
unsigned long long currTime;
unsigned long nextSeq;
rc = xmit_packet(messageString, 1458);
currTime = getTimeMilli();
if(rc < 0)
break;
else
// Send a packet as long as the oustanding data is less than cong. window
while( (conn_info.congWindow - conn_info.pipe >= 1.0) || timeoutFlag )
{
//Record a timestamp for the packet. This is less accurate than
// the libpcap timestamp, but serves as a backup if we fail to
// catch this packet on the way out due to libpcap buffer overflow.
// In the most common case, this is going to be revised when the
// packet is recorded by the libpcap filter function.
packetTimeMap[conn_info.HighData] = currTime;
nextSeq = NextSeq(timeoutFlag);
retransTimer = currTime + (long long)conn_info.rto_estimate;
// Reduce the congestion window to '1' on a timeout.
if(timeoutFlag)
{
conn_info.ssthresh = unackedPackets.size()/2;
if(conn_info.ssthresh < 2)
conn_info.ssthresh = 2;
memset(messageString, 0x0, 100);
// Data packet.
messageString[0] = '0';
conn_info.congWindow = 1;
// Copy the sequence number into the packet data.
memcpy(&messageString[1], &nextSeq, sizeof(unsigned long));
timeoutFlag = false;
printf("Timed out\n");
}
rc = xmit_packet(messageString, 1458);
//cout << "Transmitting # "<<conn_info.HighData<<", Time = "<<currTime - startTime<<endl;
// This is a new packet, increment the sequence number.
if(nextSeq > conn_info.HighData)
conn_info.HighData = nextSeq;
else if( (nextSeq > conn_info.HighRxt) && conn_info.lossRecovery )
if(rc <= 0)
break;
else
{
conn_info.HighRxt = nextSeq;
// Mark this as a retransmitted packet - doesn't count
// towards RTT/RTO calculation.
rexmitMap[nextSeq] = true;
//Record a timestamp for the packet. This is less accurate than
// the libpcap timestamp, but serves as a backup if we fail to
// catch this packet on the way out due to libpcap buffer overflow.
// In the most common case, this is going to be revised when the
// packet is recorded by the libpcap filter function.
packetTimeMap[conn_info.HighData] = currTime;
retransTimer = currTime + (long long)conn_info.rto_estimate;
// Reduce the congestion window to '1' on a timeout.
if(timeoutFlag)
{