Commit 09b8130b authored by Pramod R Sanaga's avatar Pramod R Sanaga

Changes to integrate Rubenstein's and wavelet processing into a common set

of scripts - and a lot of CVS cleanup.

Also, added a check for variance in the case of Rubenstein congestion detection
code - the results are identical to those of wavelets in my Emulab experiment.
I guess we'll carry on working with both..
parent 82f1b29c
# delay2dump.pl
if ($ARGV != 2)
if ($#ARGV != 1)
{
die ("Usage: delay2dump.pl <in-file> <out-path>");
}
......@@ -16,17 +16,35 @@ $sequence = 0;
while ($line = <IN>)
{
$line =~ /^([0-9.]+) ([0-9.]+) ([0-9.]+) ([0-9.]+)$/;
$send1 = $1;
$send2 = $2;
$delay1 = $3;
$delay2 = $4;
$send1 = $2;
$send2 = $4;
$delay1 = $1;
$delay2 = $3;
$send1_delay = ($send1 + $delay1)*1000;
$send2_delay = ($send2 + $delay2)*1000;
$send1 = $send1*1000;
$send2 = $send2*1000;
$send1_sec = $send1/1000000;
$send1_usec = $send1 % 1000000;
$send2_sec = $send2/1000000;
$send2_usec = $send2 % 1000000;
$send1_delay_sec = $send1_delay/1000000;
$send1_delay_usec = $send1_delay % 1000000;
$send2_delay_sec = $send2_delay/1000000;
$send2_delay_usec = $send2_delay % 1000000;
if ($send1 ne -9999 && $send2 ne -9999)
{
print (OUT_SOURCE, $send1." 10.0.0.2 ".$sequence);
print (OUT_SOURCE, $send2." 10.0.0.3 ".$sequence);
print (OUT_DEST1, ($send1 + $delay1)." 10.0.0.1 ".$sequence);
print (OUT_DEST2, ($send2 + $delay2)." 10.0.0.1 ".$sequence);
printf (OUT_SOURCE "%d.%d 10.0.0.2 %d\n", $send1_sec,$send1_usec,$sequence);
printf (OUT_SOURCE "%d.%d 10.0.0.3 %d\n", $send2_sec,$send2_usec,$sequence);
printf (OUT_DEST1 "%d.%d 10.0.0.2 %d\n",$send1_delay_sec,$send1_delay_usec,$sequence);
printf (OUT_DEST2 "%d.%d 10.0.0.3 %d\n",$send2_delay_sec,$send2_delay_usec,$sequence);
}
$sequence++;
......
......@@ -16,27 +16,27 @@ sub setupDest
my %hash;
my $file;
open($file, "<".$fileName)
or die("Couldn't open file $fileName for reading");
or die("Couldn't open file $fileName for reading");
my @array = <$file>;
for ($i = 0; $i < @array; ++$i)
{
$array[$i] =~ /$dumpPattern/;
$hash{$3} = $1;
$array[$i] =~ /$dumpPattern/;
$hash{$3} = $1;
}
$array[0] =~ /$dumpPattern/;
if ($_[0] == 1)
{
$dest1Address = $2;
$dest1Address = $2;
}
else
{
$dest2Address = $2;
$dest2Address = $2;
}
return %hash;
}
open(SOURCE_IN, "<$basePath/source.dump")
or die("Couldn't open file $basePath/source.dump for reading");
or die("Couldn't open file $basePath/source.dump for reading");
@source = <SOURCE_IN>;
close(SOURCE_IN);
......@@ -60,32 +60,33 @@ for ($i = 0; $i < @source; ++$i)
$flowAddress = $2;
$flowSequence = $3;
if ($flowAddress == $dest1Address)
if ($flowAddress eq $dest1Address)
{
$destNum = 1;
$destSequence = $sequence1;
++$sequence1;
$destTime = $dest1{$3};
$destNum = 1;
$destSequence = $sequence1;
++$sequence1;
$destTime = $dest1{$3};
}
else
{
$destNum = 2;
$destSequence = $sequence2;
++$sequence2;
$destTime = $dest2{$3};
$destNum = 2;
$destSequence = $sequence2;
++$sequence2;
$destTime = $dest2{$3};
}
if (defined($destTime))
{
if ($flowAddress == $dest1Address)
{
print DEST1_OUT 'Rcvr: (f'.$destNum.':'.$destSequence.':M'.$merged.')@'.$sourceTime.' arr '.$destTime."\n";
}
else
{
print DEST2_OUT 'Rcvr: (f'.$destNum.':'.$destSequence.':M'.$merged.')@'.$sourceTime.' arr '.$destTime."\n";
}
print SOURCE_OUT 'Src1: send (f'.$destNum.':'.$destSequence.':M'.$merged.')@'.$sourceTime."\n";
++$merged;
if ($flowAddress eq $dest1Address)
{
print DEST1_OUT 'Rcvr: (f'.$destNum.':'.$destSequence.':M'.$merged.')@'.$sourceTime.' arr '.$destTime."\n";
}
else
{
print DEST2_OUT 'Rcvr: (f'.$destNum.':'.$destSequence.':M'.$merged.')@'.$sourceTime.' arr '.$destTime."\n";
}
print SOURCE_OUT 'Src1: send (f'.$destNum.':'.$destSequence.':M'.$merged.')@'.$sourceTime." \n";
++$merged;
}
}
This diff is collapsed.
......@@ -25,7 +25,7 @@
#include <map>
#include <sstream>
#define REMOTE_SERVER_PORT 19835
#define REMOTE_SERVER_PORT 19655
#define MAX_MSG 100
......@@ -34,6 +34,7 @@ pcap_t *pcapDescriptor = NULL;
using namespace std;
vector< vector<unsigned long long> > sendTimesArray;
vector< vector<int> > delaySequenceArray;
vector< map<unsigned long long, int> > packetTimeMaps;
vector< map<string, unsigned long long> > actualTimeMaps;
......@@ -70,6 +71,7 @@ void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr,
ostringstream tmpStrStream;
tmpStrStream << origTimestamp;
actualTimeMaps[hostIndex][tmpStrStream.str()] = (unsigned long long)(secVal*1000 + usecVal/1000);
sendTimesArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = (unsigned long long)(secVal*1000 + usecVal/1000);
printf("Recording Timestamp = %s for Host = %d, value = %llu\n", tmpStrStream.str().c_str(), hostIndex, actualTimeMaps[hostIndex][tmpStrStream.str()]);
}
else if(packetType == '1')
......@@ -88,7 +90,9 @@ void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr,
delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = oneWayDelay;
}
else
{
delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = oneWayDelay - ( actualTimeMaps[hostIndex][tmpStrStream.str()] - origTimestamp);
}
cout << " Onewaydelay for the ACK = " << oneWayDelay << " recorded = "<< delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] <<", host Index = "<< hostIndex << "\n";
cout <<" Orig timestamp was "<< tmpStrStream.str() << " , actual time = "<< actualTimeMaps[hostIndex][tmpStrStream.str()]<<"\n";
......@@ -188,10 +192,15 @@ void pcapCallback(u_char *user, const struct pcap_pkthdr *pcap_info, const u_cha
void init_pcap( char *ipAddress)
{
char interface[] = "eth1";
char interface[20] = "vnet";
struct bpf_program bpfProg;
char errBuf[PCAP_ERRBUF_SIZE];
char filter[128] = " udp ";
ifstream ifmapHandle;
ifmapHandle.open("/var/emulab/boot/ifmap", ios::in);
ifmapHandle >> interface;
ifmapHandle.close();
// IP Address and sub net mask.
bpf_u_int32 maskp, netp;
......@@ -200,8 +209,8 @@ void init_pcap( char *ipAddress)
pcap_lookupnet(interface, &netp, &maskp, errBuf);
pcapDescriptor = pcap_open_live(interface, BUFSIZ, 0, 0, errBuf);
localAddress.s_addr = netp;
printf("IP addr = %s\n", ipAddress);
sprintf(filter," udp and ( (src host %s and dst port 19835 ) or (dst host %s and src port 19835 )) ", ipAddress, ipAddress);
printf("IP addr = %s, Interface = %s\n", ipAddress, interface);
sprintf(filter," udp and ( (src host %s and dst port 19655 ) or (dst host %s and src port 19655 )) ", ipAddress, ipAddress);
if(pcapDescriptor == NULL)
{
......@@ -229,8 +238,8 @@ int main(int argc, char **argv)
string localHostName = argv[3];
int timeout = 2000; // 1 second
int probeRate = 20; // Hz
int probeDuration = 15000; // 15 seconds
int probeRate = 12; // Hz
int probeDuration = 150000; // 150 seconds
vector<string> hostList;
ifstream inputFileHandle;
......@@ -299,6 +308,7 @@ int main(int argc, char **argv)
fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK);
sendTimesArray.resize(numHosts);
delaySequenceArray.resize(numHosts);
packetTimeMaps.resize(numHosts);
actualTimeMaps.resize(numHosts);
......@@ -404,10 +414,10 @@ int main(int argc, char **argv)
packetTimeMaps[i][sendTime] = packetCounter;
delaySequenceArray[i].push_back(-9999);
sendTimesArray[i].push_back(sendTime);
cout<< "TO " << hostList[i] << " :Counter=" << packetCounter << " :SendTime= " << sendTime << endl;
}
packetCounter++;
lastSentTime = getTimeMilli();
......@@ -452,6 +462,82 @@ int main(int argc, char **argv)
}
}
// Find the common sequence of delay indices present for
// all the probed destinations.
int minSequenceLength = 128;
int maxFirstSeenIndex = -999999, minLastSeenIndex = 9999999;
vector<int> delaySequenceSize;
delaySequenceSize.resize(numHosts);
for (int i = 0; i < numHosts; i++)
{
int delaySeqLen = delaySequenceArray[i].size();
int firstSeenIndex = -1;
int lastSeenIndex = -1;
for (int k = 0; k < delaySeqLen; k++)
{
if (delaySequenceArray[i][k] != -9999)
{
lastSeenIndex = k;
if (firstSeenIndex == -1)
firstSeenIndex = k;
}
}
if(lastSeenIndex == -1)
{
delaySequenceSize[i] = 0;
}
else
{
delaySequenceSize[i] = lastSeenIndex - firstSeenIndex + 1;
if(delaySequenceSize[i] >= minSequenceLength)
{
if(lastSeenIndex < minLastSeenIndex)
minLastSeenIndex = lastSeenIndex;
if(firstSeenIndex > maxFirstSeenIndex)
maxFirstSeenIndex = firstSeenIndex;
}
}
}
int delaySeqLen = delaySequenceArray[0].size();
maxFirstSeenIndex = -1;
minLastSeenIndex = -1;
bool missingSampleFlag = false;
for (int k = 0; k < delaySeqLen; k++)
{
missingSampleFlag = false;
for (int l = 0; l < numHosts; l++)
{
// Ignore paths which do not have the minimum
// number of delay samples.
if(delaySequenceSize[l] < minSequenceLength)
continue;
if (delaySequenceArray[l][k] == -9999)
{
missingSampleFlag = true;
}
}
if(!missingSampleFlag)
{
minLastSeenIndex = k;
if (maxFirstSeenIndex == -1)
maxFirstSeenIndex = k;
}
}
printf("Maxfirstseenindex = %d, minlastseenIndex = %d\n", maxFirstSeenIndex, minLastSeenIndex);
fflush(stdout);
for (int i = 0; i < numHosts; i++)
{
string dirPath = outputDirectory + "/" + hostList[i];
......@@ -473,6 +559,21 @@ int main(int argc, char **argv)
int firstSeenIndex = -1;
int lastSeenIndex = -1;
if(delaySequenceSize[i] < minSequenceLength)
{
// Create an empty log file.
ofstream outputFileHandle;
string delayFilePath = dirPath + "/" + "delay.log";
outputFileHandle.close();
continue;
}
else
{
firstSeenIndex = maxFirstSeenIndex;
lastSeenIndex = minLastSeenIndex;
}
/*
for (int k = 0; k < delaySeqLen; k++)
{
if (delaySequenceArray[i][k] != -9999)
......@@ -482,6 +583,7 @@ int main(int argc, char **argv)
firstSeenIndex = k;
}
}
*/
if (lastSeenIndex != -1)
{
......@@ -522,7 +624,7 @@ int main(int argc, char **argv)
{
for (int k = firstSeenIndex; k < lastSeenIndex + 1; k++)
{
outputFileHandle << delaySequenceArray[i][k] << "\n";
outputFileHandle << delaySequenceArray[i][k] << " "<< sendTimesArray[i][k] << "\n";
}
}
outputFileHandle.close();
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment