Commit f80056e0 authored by Pramod R Sanaga's avatar Pramod R Sanaga

Changed the cross-correlation to use FFT - makes it really fast!

Cross-correlation calculation now takes less than 15 sec for a 15 minute
TCP transfer - averaging time is still huge though...

Added documentation to explain what is going on and how a run
is declared correct/incorrect.
parent 3d9a3a47
......@@ -6,18 +6,26 @@
#
#########################################################
# This script requires two command line arguments
# Input:
# This script requires one command line argument
# Argument 1 - Pointer to the
# /proj/.../exp/project_name/ directory
#
# (The elab-*, planet-* directories should
# have tcpdump log files named SanityCheck.log)
#
# Argument 2 - Output file name
# Output:
# The report will be a file named SanityCheckReport.txt
# in the "given_directory/logs"
# Details for each pair of machines - the exact
# cross correlation value, Percentage difference between
# Flexlab & Planetlab transfer rates etc. can be found
# in "given_directory/logs/SanityCheckDetails.txt"
#
#########################################################
#########################################################
# Sanity check for Flexlab UDP runs
# Sanity check for Flexlab TCP & UDP runs
#
# Calculates the correlation of the traffic
# on Planetlab nodes versus the traffic on the
......@@ -25,8 +33,11 @@
#
# Requires that Tcpdump files be saved on elab-*
# and planet-* nodes during the Flexlab run.
# This can be done by passing "-t" option
# to start-experiment and stop-experiment scripts
#
# Run this script after collecting the logs
# This script can also be run in stand-alone mode
# after collecting the logs
# with stop-experiment and point to the
# directory having the collected logs.
#
......@@ -39,6 +50,8 @@ import sys
import re
import os
import array
from numpy import *
from numpy.fft import *
import math
from posix import *
from optparse import OptionParser
......@@ -131,6 +144,7 @@ def CalcBucketAvgThroughput(fileName, outputFile, bucketSize, transportFlag):
tmpFileHandle.write(str(tmpTimeCounter)+" "+str(throughput)+"\n")
iter += 1
numLines += 1
tmpTimeCounter = tmpTimeCounter + 10000
lastBucketThreshold = lastBucketThreshold + timePeriod / 10000
......@@ -156,6 +170,12 @@ def CalcBucketAvgThroughput(fileName, outputFile, bucketSize, transportFlag):
os.system("cat " + tmpFile + " >> " + outputFile)
os.remove(tmpFile)
del packetSizeArray
del packetTimeArray
# Return the avg. transfer rate - in kbps
return (8000000*packetAvgBytes/(packetAvgTime*1024.0))
#}
###############################################################
......@@ -263,6 +283,12 @@ def CalcMovingAvgThroughput(fileName, outputFile, minSamples, avgTimeScale, maxS
os.system("cat " + tmpFile + " >> " + outputFile)
os.remove(tmpFile)
del packetSizeArray
del packetTimeArray
# Return the avg. transfer rate - in kbps
return (8000000*packetAvgBytes/(packetAvgTime*1024.0))
#}
###############################################################
......@@ -360,11 +386,166 @@ def PreProcess(inputFile, outputFile):
os.system("cat " + tmpFile + " >> " + outputFile)
os.remove(tmpFile)
del timeArray
del valueArray
#}
##############################################################
##############################################################
def CalcCorrelation(InFile1, InFile2, transportFlag):
def _ReadRowCount(fileHandle):
#{
    # Parse the row count from the first line of a throughput file.
    # An empty file (no header line) is treated as having zero rows.
    headerFields = fileHandle.readline().split()
    if not headerFields:
        return 0
    return int(headerFields[0])
#}

def _ReadSamples(fileHandle, n):
#{
    # Read up to n samples (second column of each remaining line).
    # Rows missing from a short file are left at 0.0.
    samples = [0.0] * n
    count = 0
    for lineRead in fileHandle:
    #{
        if count >= n:
            break
        samples[count] = float( lineRead.split()[1] )
        count += 1
    #}
    return samples
#}

def CalcCorrelationFFT(InFile1, InFile2, transportFlag, plabAvg, elabAvg):
#{
    """Cross-correlate two throughput series and log the best match.

    Each input file starts with a header line whose first field is the
    number of rows; every following line carries the sample value in its
    second column. Only the first min(rows1, rows2) samples are used.

    The shift with the largest raw (circular) cross-correlation is found
    with an FFT, searching up to 1000 samples either side of zero. The
    normalised correlation coefficient is then computed for that shift
    only. One result line is appended to the module-level
    OutputFileHandle; the delay is reported as shift*10 ms.

    InFile1, InFile2 -- paths of the two series (x and y respectively).
    transportFlag    -- 0 for TCP, 1 for UDP (anything else is logged
                        as "UNKNOWN").
    plabAvg, elabAvg -- average transfer rates used for the
                        KbpsPercentDiff field.

    Degenerate input is reported instead of raising: empty series or a
    series with zero variance give Max_Correlation=0, and series shorter
    than the search window are searched over their own length.
    """

    # NOTE: file() is kept rather than open(): this module star-imports
    # posix, whose open() would presumably shadow the builtin.
    InFileHandle1 = file(InFile1, "r")
    try:
    #{
        InFileHandle2 = file(InFile2, "r")
        try:
        #{
            rows1 = _ReadRowCount(InFileHandle1)
            rows2 = _ReadRowCount(InFileHandle2)

            if rows1 < rows2:
                n = rows1
            else:
                n = rows2

            # A bogus negative header is treated like an empty file.
            if n < 0:
                n = 0

            x = _ReadSamples(InFileHandle1, n)
            y = _ReadSamples(InFileHandle2, n)
        #}
        finally:
        #{
            InFileHandle2.close()
        #}
    #}
    finally:
    #{
        InFileHandle1.close()
    #}

    delay = 0
    maxCorr = 0

    # Nothing to correlate when either file is empty - report zeros.
    if n > 0:
    #{
        # Calculate the delay interval with maximum
        # correlation using FFT
        xFFT = fft(x)
        yFFT = fft(y)
        corrArray = ifft(xFFT.conjugate() * yFFT)

        # Find out the optimal shift in -10 sec: 10 sec range.
        # Negative indices wrap around, so the window must never be
        # wider than the series itself or the lookup goes out of bounds.
        shiftLimit = 1000
        if n < shiftLimit:
            shiftLimit = n

        bestValue = -99999999
        for shift in range(-shiftLimit, shiftLimit):
        #{
            if corrArray[shift].real > bestValue:
            #{
                bestValue = corrArray[shift].real
                delay = shift
            #}
        #}

        # Calculate the mean of the two series x[], y[]
        mx = 0
        my = 0
        for i in range(0, n):
        #{
            mx += x[i]
            my += y[i]
        #}
        mx /= n
        my /= n

        # Accumulate the variance terms (denominator) and the
        # covariance at the shift determined above (numerator).
        sx = 0
        sy = 0
        sxy = 0
        for i in range(0, n):
        #{
            dx = x[i] - mx
            dy = y[i] - my
            sx += dx * dx
            sy += dy * dy

            j = i + delay
            if j >= 0 and j < n:
                sxy += dx * (y[j] - my)
        #}

        denom = math.sqrt(sx*sy)

        # A constant series has zero variance; the coefficient is
        # undefined there, so report "no correlation" instead of
        # dividing by zero.
        if denom != 0:
            maxCorr = sxy / denom
    #}

    if transportFlag == 0:
        transport = "TCP"
    elif transportFlag == 1:
        transport = "UDP"
    else:
        transport = "UNKNOWN"

    diffTransferRate = elabAvg - plabAvg

    if float(elabAvg) != 0.0:
        diffPercentage = 100.0*(float(diffTransferRate))/(float(elabAvg))
    elif float(diffTransferRate) == 0.0:
        # Both rates are zero - no difference to report.
        diffPercentage = 0.0
    else:
        # Flexlab rate is zero but Planetlab's is not: the relative
        # difference is undefined, so log a full -100% shortfall, which
        # is well outside a +/-40% tolerance.
        diffPercentage = -100.0

    OutputFileHandle.write("Max_Correlation=" + str(maxCorr) + ":Delay=" + str(delay*10) + "ms:Transport=" + transport + ":N=" + str(n) + ":KbpsPercentDiff=" + str(diffPercentage) + "\n")
#}
##############################################################
##############################################################
def CalcCorrelation(InFile1, InFile2, transportFlag, plabAvg, elabAvg):
#{
i = 0; j = 0
......@@ -416,7 +597,6 @@ def CalcCorrelation(InFile1, InFile2, transportFlag):
InFileHandle2.close()
# Calculate the mean of the two series x[], y[]
mx = 0
my = 0
......@@ -444,6 +624,14 @@ def CalcCorrelation(InFile1, InFile2, transportFlag):
sy += (y[i] - my) * (y[i] - my)
#}
# Subtract the mean of each series from individual values
# This saves us a lot of time in the nested for loop below.
for i in range(0,n):
#{
x[i] = x[i] - mx
y[i] = y[i] - my
#}
denom = math.sqrt(sx*sy)
maxCorr = -1.0
......@@ -455,6 +643,8 @@ def CalcCorrelation(InFile1, InFile2, transportFlag):
for delay in range(-maxdelay,maxdelay):
#{
sxy = 0
sys.stdout.write("$")
sys.stdout.flush()
for i in range(0,n):
#{
......@@ -463,7 +653,8 @@ def CalcCorrelation(InFile1, InFile2, transportFlag):
if j < 0 or j >= n:
continue
else:
sxy += (x[i] - mx) * (y[j] - my)
#sxy += (x[i] - mx) * (y[j] - my)
sxy += (x[i]) * (y[j])
#}
r = sxy / denom
# OutputFileHandle.write(str(delay) + " " +str(r) + "\n")
......@@ -491,7 +682,15 @@ def CalcCorrelation(InFile1, InFile2, transportFlag):
elif transportFlag == 1:
transport = "UDP"
OutputFileHandle.write("Max_Correlation=" + str(maxCorr) + ":Delay=" + str(maxDelay*10) + "ms:Transport=" + transport + ":N=" + str(n) + "\n")
diffTransferRate = elabAvg - plabAvg
diffPercentage = 100.0*(float(diffTransferRate))/(float(elabAvg))
OutputFileHandle.write("Max_Correlation=" + str(maxCorr) + ":Delay=" + str(maxDelay*10) + "ms:Transport=" + transport + ":N=" + str(n) + ":KbpsPercentDiff=" + str(diffPercentage) + "\n")
del x
del y
#}
......@@ -507,10 +706,10 @@ def processFiles(fileNameElab, fileNamePlab, averagingMethod, transportFlag):
if averagingMethod == 0: # Moving average
#{
CalcMovingAvgThroughput(fileNameElab, tmpFileElab1, minAvgSamples, avgTime, maxAvgSamples, transportFlag)
elabAvg = CalcMovingAvgThroughput(fileNameElab, tmpFileElab1, minAvgSamples, avgTime, maxAvgSamples, transportFlag)
sys.stdout.write("#")
sys.stdout.flush()
CalcMovingAvgThroughput(fileNamePlab, tmpFilePlab1, minAvgSamples, avgTime, maxAvgSamples, transportFlag)
plabAvg = CalcMovingAvgThroughput(fileNamePlab, tmpFilePlab1, minAvgSamples, avgTime, maxAvgSamples, transportFlag)
sys.stdout.write("#")
sys.stdout.flush()
......@@ -520,7 +719,7 @@ def processFiles(fileNameElab, fileNamePlab, averagingMethod, transportFlag):
PreProcess(tmpFilePlab1, tmpFilePlab2)
sys.stdout.write("#")
sys.stdout.flush()
CalcCorrelation(tmpFilePlab2, tmpFileElab2, transportFlag)
CalcCorrelationFFT(tmpFilePlab2, tmpFileElab2, transportFlag, plabAvg, elabAvg)
sys.stdout.write("#")
sys.stdout.flush()
......@@ -531,13 +730,13 @@ def processFiles(fileNameElab, fileNamePlab, averagingMethod, transportFlag):
#}
elif averagingMethod == 1: # Bucket average
#{
CalcBucketAvgThroughput(fileNameElab, tmpFileElab1, avgTime, transportFlag)
elabAvg = CalcBucketAvgThroughput(fileNameElab, tmpFileElab1, avgTime, transportFlag)
sys.stdout.write("#")
sys.stdout.flush()
CalcBucketAvgThroughput(fileNamePlab, tmpFilePlab1, avgTime, transportFlag)
plabAvg = CalcBucketAvgThroughput(fileNamePlab, tmpFilePlab1, avgTime, transportFlag)
sys.stdout.write("#")
sys.stdout.flush()
CalcCorrelation(tmpFilePlab1, tmpFileElab1, transportFlag)
CalcCorrelationFFT(tmpFilePlab1, tmpFileElab1, transportFlag, plabAvg, elabAvg)
sys.stdout.write("#")
sys.stdout.flush()
......@@ -545,17 +744,26 @@ def processFiles(fileNameElab, fileNamePlab, averagingMethod, transportFlag):
os.remove(tmpFilePlab1)
#}
#}
###############################################################
###############################################################
# This is the current set of criteria for declaring a run
# to be valid.
#
# For all pairs of nodes in the Flexlab experiment:
# a) Cross-correlation > 0.5
# b) Offset used for this value of cross-correlation >= 0
# c) Offset < 8 seconds
# d) Difference between Flexlab & Planetlab transfer rates
# is less than 40%
# e) SanityCheck Tcpdump files exist for all pairs of nodes.
#
def PrintReport(correlationDataFile, numNodes, numNodesProcessed, reportFileName):
#{
correlationFileHandle = file(correlationDataFile, "r")
reportFileHandle = file (reportFileName, "w")
correlationRegex = re.compile('Max_Correlation=([\d\.]*?):Delay=([\-\d]*?)ms:Transport=([\w]*?):N=([\d]*)$')
correlationRegex = re.compile('Max_Correlation=([\d\.]*?):Delay=([\-\d]*?)ms:Transport=([\w]*?):N=([\d]*):KbpsPercentDiff=([\-\d\.]*?)$')
errorFlag = 0
......@@ -573,10 +781,10 @@ def PrintReport(correlationDataFile, numNodes, numNodesProcessed, reportFileName
matchObj = correlationRegex.match(correlationLine)
if ( ( int(matchObj.group(4)) ) and \
( ( float(matchObj.group(1)) < 0.5 ) or ( int(matchObj.group(2)) < 0 ) or ( int(matchObj.group(2)) > 8000 ) ) ) :
( ( float(matchObj.group(1)) < 0.5 ) or ( int(matchObj.group(2)) < 0 ) or ( int(matchObj.group(2)) > 8000 ) or ( float(matchObj.group(5)) > 40.0 or float(matchObj.group(5)) < -40.0 ) ) ) :
#{
print "\n***** ERROR: This Flexlab run is possibly tainted. \n Detected " + matchObj.group(3) + " cross-correlation mismatch for files '" + flexLabFile + "' and '" + pLabFile + "'"
reportFileHandle.write( "***** ERROR: This Flexlab run is possibly tainted. \n Detected " + matchObj.group(3) + " cross-correlation mismatch for files '" + flexLabFile + "' and '" + pLabFile + "'\n")
print "\n***** ERROR: This Flexlab run is possibly tainted. \nDetected " + matchObj.group(3) + " cross-correlation mismatch for files '" + flexLabFile + "' and '" + pLabFile + "'"
reportFileHandle.write( "***** ERROR: This Flexlab run is possibly tainted. \nDetected " + matchObj.group(3) + " cross-correlation mismatch for files '" + flexLabFile + "' and '" + pLabFile + "'\n")
errorFlag = 1
#}
......@@ -587,8 +795,8 @@ def PrintReport(correlationDataFile, numNodes, numNodesProcessed, reportFileName
if numNodesProcessed != numNodes :
#{
print "\n***** ERROR: " + str(numNodes - numNodesProcessed) + " files required for cross-correlation sanity check are missing.\n This Flexlab run is possibly tainted."
reportFileHandle.write("***** ERROR: " + str(numNodes - numNodesProcessed) + " files required for cross-correlation sanity check are missing.\n This Flexlab run is possibly tainted.\n")
print "\n***** ERROR: " + str(numNodes - numNodesProcessed) + " files required for cross-correlation sanity check are missing.\nThis Flexlab run is possibly tainted."
reportFileHandle.write("***** ERROR: " + str(numNodes - numNodesProcessed) + " files required for cross-correlation sanity check are missing.\nThis Flexlab run is possibly tainted.\n")
errorFlag = 1
#}
......@@ -686,10 +894,27 @@ def Main():
#{
numNodesProcessed = numNodesProcessed + 1
OutputFileHandle.write("Flexlab_File = " + fileNameElab + " \nPlab_File = " + fileNamePlab + "\n")
if (os.stat(fileNameElab).st_size > 0 and os.stat(fileNamePlab).st_size > 0):
#{
for j in range(iterator, transportFlag):
processFiles(fileNameElab, fileNamePlab, avgMethod, j)
OutputFileHandle.write("Flexlab_File = " + fileNameElab + " \nPlab_File = " + fileNamePlab + "\n")
if transportFlag == 2 and iterator == 1:
OutputFileHandle.write("Max_Correlation=0.000000" + ":Delay=0" + "ms:Transport=TCP" + ":N=0:KbpsPercentDiff=0.000000" + "\n")
for j in range(iterator, transportFlag):
processFiles(fileNameElab, fileNamePlab, avgMethod, j)
if transportFlag == 1 and iterator == 0:
OutputFileHandle.write("Max_Correlation=0.000000" + ":Delay=0" + "ms:Transport=UDP" + ":N=0:KbpsPercentDiff=0.000000" + "\n")
#}
else:
#{
OutputFileHandle.write("Flexlab_File = " + fileNameElab + " \nPlab_File = " + fileNamePlab + "\n")
OutputFileHandle.write("Max_Correlation=0.000000" + ":Delay=0" + "ms:Transport=TCP" + ":N=0:KbpsPercentDiff=0.000000" + "\n")
OutputFileHandle.write("Max_Correlation=0.000000" + ":Delay=0" + "ms:Transport=UDP" + ":N=0:KbpsPercentDiff=0.000000" + "\n")
#}
#}
#}
else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment