Commit 0f1f37ee authored by Pramod R Sanaga's avatar Pramod R Sanaga
Browse files

Code to implement the "Wavelet based approach to shared congestion detection". It produces

all the delay agent events correctly - some more work is needed for clustering the paths
properly when transitive property of bottleneck links is violated.
parent 039a86c8
#!/bin/sh
#
# EMULAB-COPYRIGHT
# Copyright (c) 2006, 2007 University of Utah and the Flux Group.
# All rights reserved.
#
InputFile="\"$1\""
OutputFile="\"$2\""
/usr/bin/R --no-save > /dev/null 2>&1 << EOF
library(rwt)
delays <- read.table($InputFile, header=0)
xval <- delays[,1]
yval <- delays[,2]
h <- daubcqf(6)\$h.1
xResult.dwt <- denoise.dwt(xval, h)
yResult.dwt <- denoise.dwt(yval, h)
correctedX <- xResult.dwt\$xd
correctedY <- yResult.dwt\$xd
xvec <- as.vector(correctedX)
yvec <- as.vector(correctedY)
corrObj <- ccf(xvec, yvec, lag.max=0, type="correlation", plot=FALSE)
dataFrame <- data.frame(corrObj[[4]], corrObj[[1]])
write.table(dataFrame,file=$OutputFile,quote=FALSE,row.names=FALSE, col.names=FALSE)
q(runLast=FALSE)
EOF
#!/usr/local/bin/python
#
# EMULAB-COPYRIGHT
# Copyright (c) 2006, 2007 University of Utah and the Flux Group.
# All rights reserved.
#
import sys
import time
import socket
import select
import fcntl, os
hostNameFile = sys.argv[1]
timeout = 5 # seconds
probeRate = 10 # Hz
probeDuration = 15 # seconds
portNum = 5001
outputDirectory = sys.argv[2]
localHostName = sys.argv[3]
# Create the output directory.
os.mkdir(outputDirectory)
# Read the input file having all the planetlab node IDs.
# Get the hostname of this host.
#localHostName = socket.gethostname()
inputFileHandle = open(hostNameFile, 'r')
hostList = []
for hostLine in inputFileHandle.readlines():
#{
hostLine = hostLine.strip()
if hostLine != localHostName:
#{
hostList.append(hostLine)
#}
#}
inputFileHandle.close()
numHosts = len(hostList)
targetSleepTime = float ((1000.0/float(probeRate))/1000.0 ) - 0.001
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
fcntl.fcntl(clientSocket, fcntl.F_SETFL, os.O_NONBLOCK)
for i in range(0, numHosts-1):
#{
firstHostAddr = (hostList[i] + ".flex-plab2.tbres.emulab.net", portNum)
delaySequenceArray = []
delaySequenceArray.append(list())
delaySequenceArray.append(list())
packetTimeMaps = []
packetTimeMaps.append(dict())
packetTimeMaps.append(dict())
packetCounter = 0
for j in range(i+1, numHosts):
#{
secondHostAddr = (hostList[j] + ".flex-plab2.tbres.emulab.net", portNum)
startTime = int(time.time()*1000)
lastSentTime = startTime
endProbesFlag = 0
readTimeoutFlag = 0
# For each combination(pair), send a train of UDP packets.
while (( lastSentTime - startTime)/1000 < probeDuration) or \
not(readTimeoutFlag):
#{
# Stop waiting for probe replies after a timeout - calculated from the
# time the last probe was sent out.
if endProbesFlag and ( (time.time()*1000 - lastSentTime)/1000.0 > timeout):
readTimeoutFlag = 1
# Stop sending probes after the given probe duration.
if not(endProbesFlag) and (lastSentTime - startTime)/1000.0 > probeDuration:
endProbesFlag = 1
if endProbesFlag:
time.sleep(timeout/10.0)
if not(endProbesFlag):
readSet, writeSet, exceptionSet = select.select([clientSocket], [clientSocket], [], 0.0)
else:
readSet, writeSet, exceptionSet = select.select([clientSocket], [], [], 0.0)
if not(readTimeoutFlag):
#{
if clientSocket in readSet:
#{
serverReply = "a"
while serverReply:
#{
try:
#{
serverReply, serverAddr = clientSocket.recvfrom(1024)
print hostList[i] + " " + hostList[j] + " " + serverReply
hostIndex, origTimestamp, oneWayDelay = serverReply.split()
hostIndex = int(hostIndex)
oneWayDelay = int(oneWayDelay)
delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = oneWayDelay
#}
except socket.error:
#{
if endProbesFlag:
#{
time.sleep(timeout/10.0)
#}
break
#}
#}
#}
#}
if not(endProbesFlag):
#{
if clientSocket in writeSet:
#{
# Send the probe packets.
sendTime = int(time.time()*1000)
messageString = "0 "
messageString = messageString + str(sendTime)
clientSocket.sendto(messageString, firstHostAddr)
packetTimeMaps[0][str(sendTime)] = packetCounter
delaySequenceArray[0].append(-9999)
sendTime = int(time.time()*1000)
messageString = "1 "
messageString = messageString + str(sendTime)
clientSocket.sendto(messageString, secondHostAddr)
packetTimeMaps[1][str(sendTime)] = packetCounter
delaySequenceArray[1].append(-9999)
# Sleep for 99 msec for a 10Hz target probing rate.
lastSentTime = time.time()*1000
time.sleep(targetSleepTime)
packetCounter += 1
#}
else:
#{
if not(time.time()*1000 - lastSentTime > targetSleepTime*1000):
time.sleep( float( ( targetSleepTime*1000 - (time.time()*1000 - lastSentTime) )/1000.0) )
#}
#}
#}
# If we lost some replies/packets, linearly interpolate their delay values.
delaySeqLen = len(delaySequenceArray[0])
firstSeenIndex = -1
lastSeenIndex = -1
for k in range(0, delaySeqLen):
#{
if delaySequenceArray[0][k] != -9999 and delaySequenceArray[1][k] != -9999:
if firstSeenIndex == -1:
firstSeenIndex = k
lastSeenIndex = k
#}
if lastSeenIndex != -1:
#{
for k in range(firstSeenIndex, lastSeenIndex+1):
#{
if delaySequenceArray[0][k] == -9999:
delaySequenceArray[0][k] = int( ( delaySequenceArray[0][k-1] + delaySequenceArray[0][k+1]) /2 )
if delaySequenceArray[1][k] == -9999:
delaySequenceArray[1][k] = int( ( delaySequenceArray[1][k-1] + delaySequenceArray[1][k+1]) /2 )
#}
#}
dirPath = outputDirectory + "/" + hostList[i]
if not(os.path.isdir(dirPath)):
os.mkdir(dirPath)
dirPath = dirPath + "/" + hostList[j]
os.mkdir(dirPath)
outputFileHandle = open(dirPath + "/" + "delay.log", 'w')
if lastSeenIndex != -1:
#{
for k in range(firstSeenIndex, lastSeenIndex+1):
#{
outputFileHandle.write(str(delaySequenceArray[0][k]) + " " + str(delaySequenceArray[1][k]) + "\n")
#}
#}
outputFileHandle.close()
if lastSeenIndex == -1:
print "ERROR: No samples were seen for hosts " + hostList[i] + " " + hostList[j]
#}
#}
node-1
node-2
node-3
node-4
node-5
#!/usr/local/bin/python
#
# EMULAB-COPYRIGHT
# Copyright (c) 2006, 2007 University of Utah and the Flux Group.
# All rights reserved.
#
import sys,os
delayFileHandle = open(sys.argv[1], 'r')
pathOneDelayList = []
pathTwoDelayList = []
for delayLine in delayFileHandle.readlines():
#{
pathOneDelay, pathTwoDelay = delayLine.split()
pathOneDelay = int(pathOneDelay)
pathTwoDelay = int(pathTwoDelay)
pathOneDelayList.append(pathOneDelay)
pathTwoDelayList.append(pathTwoDelay)
#}
delayFileHandle.close()
delayListLen = len(pathOneDelayList)
# Check whether there are a minimum of number
# of delay measurements.
if delayListLen < 25:
#{
# Indicate an error.
print "CORRELATION=-2.0"
sys.exit()
#}
powerOf2 = 1
while True:
#{
if delayListLen > powerOf2:
#{
powerOf2 *= 2
#}
else:
#{
break
#}
#}
# Pad the delay sequences with zeros at the end -
# to make the sequences of length power of '2'.
for i in range(0, powerOf2 - delayListLen):
#{
pathOneDelayList.append(0)
pathTwoDelayList.append(0)
#}
tmpWaveletFile = "/tmp/bw-wavelet.tmp"
tmpCorrFile = "/tmp/bw-corr.tmp"
tmpFileHandle = open(tmpWaveletFile, 'w')
for i in range(0, powerOf2):
#{
tmpFileHandle.write(str(pathOneDelayList[i]) + " " + str(pathTwoDelayList[i]) + "\n")
#}
tmpFileHandle.close()
os.system("source WaveletR.sh " + tmpWaveletFile + " " + tmpCorrFile)
tmpFileHandle = open(tmpCorrFile, 'r')
corrLine = tmpFileHandle.readline()
tmpFileHandle.close()
print "CORRELATION=" + str(float(corrLine.split()[1]))
#!/usr/local/bin/python
#
# EMULAB-COPYRIGHT
# Copyright (c) 2006, 2007 University of Utah and the Flux Group.
# All rights reserved.
#
import sys
import socket
import time
hostName = sys.argv[1]
serverPort = 5001
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serverSocket.bind((socket.gethostname(), serverPort))
while True:
clientData, clientAddr = serverSocket.recvfrom(1024)
print str(clientAddr[0]) + " " + str(clientAddr[1]) + " " + clientData
currentTime = int(time.time()*1000)
replyMessage = clientData + " " + str(currentTime - int(clientData.split()[1]))
serverSocket.sendto(replyMessage, clientAddr)
#!/usr/bin/perl
#
# EMULAB-COPYRIGHT
# Copyright (c) 2006 University of Utah and the Flux Group.
# All rights reserved.
#
my $expName;
my $projName;
my $logsDir;
my $newExpName;
my $newProjName;
%bottleNecks = {};
my %nodeClasses;
die "Usage: perl sharedBottle.pl proj_name exp_name newProj_name newExp_name initial_conditions.txt(Optional)"
if($#ARGV < 3);
$projName = $ARGV[0];
$expName = $ARGV[1];
$newProjName = $ARGV[2];
$newExpName = $ARGV[3];
$initialConditionsFilename = $ARGV[4];
$logsDir = "/home/pramod/testLogs";
# Get the initial conditions.
$elabInitScript = "/proj/tbres/duerig/testbed/pelab/init-elabnodes.pl";
$initConditionsCommand = $elabInitScript . " -o /tmp/initial-conditions.txt " . $newProjName . " " . $newExpName;
if($#ARGV == 3)
{
system($initConditionsCommand);
$initialConditionsFilename = "/tmp/initial-conditions.txt";
}
open(CONDITIONS, $initialConditionsFilename);
my @initialConditions = ();
while(<CONDITIONS>)
{
chomp( $_ );
push(@initialConditions, $_);
}
close(CONDITIONS);
my @addressList = ();
my $addrFlag = 0;
my %bwMap = {};
my %delayMap = {};
my %elabMap = {};
# Create & send events.
# Get initial conditions for the paths of interest
# from the database, using init-elabnodes.pl
my $tevc = "/usr/testbed/bin/tevc -e $newProjName/$newExpName now";
#@@@`/usr/testbed/bin/tevc -w -e $newProjName/$newExpName now elabc reset`;
#@@@`$tevc elabc create start`;
# Create a list of the IP addresses.
foreach $conditionLine (@initialConditions)
{
if($conditionLine =~ /(\d*?\.\d*?\.\d*?\.(\d*?))\s(\d*?\.\d*?\.\d*?\.\d*?)\s(\d*?)\s(\d*?)\s[\d\w\-\.]*/)
{
$srcAddress = $1;
$addrFlag = 0;
foreach $addrEntry (@addressList)
{
if($addrEntry eq $srcAddress)
{
$addrFlag = 1;
}
}
if($addrFlag == 0)
{
push(@addressList, $srcAddress);
$elabMap{$srcAddress} = "elabc-elab-" . $2;
print "Mapping $2 TO $elabMap{$srcAddress}\n";
}
# Create a mapping of the initial conditions.
$bwMap{$1}{$3} = $4;
$delayMap{$1}{$3} = $5;
}
}
opendir(logsDirHandle, $logsDir);
my $addrIndex = 0;
my %addrNodeMapping = {};
foreach $sourceName (readdir(logsDirHandle))
{
# Map the elab IP address in the initial conditions file, with
# the node names in the gather-results logs directory.
if( (-d $logsDir . "/" . $sourceName ) && $sourceName ne "." && $sourceName ne ".." )
{
$addrNodeMapping{$sourceName} = $addressList[$addrIndex];
print "Second mapping $addressList[$addrIndex] TO $sourceName\n";
$addrIndex++;
}
}
rewinddir(logsDirHandle);
# Descend into all the source directories
foreach $sourceName (readdir(logsDirHandle))
{
if( (-d $logsDir . "/" . $sourceName ) && $sourceName ne "." && $sourceName ne ".." )
{
my @destLists;
# Then search for all possible destinations for
# a particular source.
opendir(sourceDirHandle, $logsDir . "/" . $sourceName);
foreach $destOne (readdir(sourceDirHandle))
{
if( (-d $logsDir . "/" . $sourceName . "/" . $destOne) && $destOne ne "." && $destOne ne ".." )
{
# Inside each destination directory, look for
# all possible second destinations.
opendir(destDirHandle, $logsDir . "/" . $sourceName . "/" . $destOne);
foreach $destTwo (readdir(destDirHandle))
{
if( (-d $logsDir . "/" . $sourceName . "/" . $destOne . "/" . $destTwo) && $destTwo ne "." && $destTwo ne ".." )
{
$fullPath = "$logsDir/$sourceName/$destOne/$destTwo";
# Run Rubenstein's code on the ".filter" files
# inside the second destination directory.
#`perl /proj/tbres/duerig/testbed/pelab/bw-bottleneck/dump2filter.pl $fullPath`;
#$DansScript = "/proj/tbres/duerig/filter/genjitco.FreeBSD";
$waveletScript = "/usr/bin/python processDelay.py $fullPath/delay.log";
#$filterFile1 = $fullPath . "/" . "source.filter";
#$filterFile2 = $fullPath . "/" . "dest1.filter";
#$filterFile3 = $fullPath . "/" . "dest2.filter";
#if (!(-r $filterFile1) || !(-r $filterFile2) || !($filterFile3))
#{
# print "Missing file. Cannot process $fullPath\n";
# continue;
#}
#$sharedBottleneckCheck = $DansScript ." ". $filterFile1
# ." ". $filterFile2 ." ". $filterFile3;
print "RUN: $waveletScript\n";
my $scriptOutput;
$scriptOutput = `$waveletScript`;
# $scriptOutput[0] = "last CHANGE was CORRELATED, corr case: 30203 pkts, test case: 30203 pkts";
# $scriptOutput[1] = "testingabcdef";
# "CORRELATED" means that these two nodes have
# a shared bottleneck.
if($scriptOutput =~ /CORRELATION=(\-?[\d]*\.[\d]*)/)
{
print "For source $sourceName: Comparing $destOne $destTwo: $scriptOutput";
if($1 > 0.5)
{
push(@{ $bottleNecks{$sourceName} },$destOne . ":" . $destTwo);
push(@destLists,$destOne);
push(@destLists,$destTwo);
print "CORRELATED\n\n";
}
elsif($1 < 0.5)
{
push(@destLists,$destOne);
push(@destLists,$destTwo);
print "UNCORRELATED\n\n";
}
else
{
print "ERROR:($sourceName) Something went wrong in pattern matching phase.\n";
print "Skipping this source node\n";
next;
}
}
else
{
print "ERROR:($sourceName) Output line from Rubenstein's code did not match regular expression.\n";
print "Skipping this source node\n";
next;
}
}
}
closedir(destDirHandle);
}
}
closedir(sourceDirHandle);
# Make an adjacency matrix.
my @destSeen = ();
my $flagDestOne = 0;
my $flagDestTwo = 0;
# Count the number of unique destinations which have measurements
# from this source node.
my $numDests = 0;
my %destHash;
$tmpName = "";
$destElement = "";
foreach $destElement (@destLists)
{
$flagDest = 0;
foreach $tmpName (@destSeen)
{
if($destElement eq $tmpName)
{
$flagDest = 1;
}
}
if($flagDest == 0)
{
push(@destSeen,$destElement);
$destHash{$destElement} = $numDests;
$numDests++;
}
}
local @adjMatrix = ();
print "NUMDESTS = $numDests\n";