From f8e4942f34321932813e757f4f110d53b790ca6e Mon Sep 17 00:00:00 2001 From: Pramod R Sanaga <pramod@flux.utah.edu> Date: Fri, 14 Sep 2007 21:40:16 +0000 Subject: [PATCH] These are all the files required to make the clustering (with Wavelets) approach to shared-bottleneck detection work on Planetlab. Note: The delay calculation/traffic generation method used here is *different* from the one used earlier to calculate equivalence classes ( and Rubenstein's code ) - Hence the different directory entry. --- .../wavelet/Clustering/MinSikKim/Wavelet.cc | 28 + .../wavelet/Clustering/MinSikKim/compile.sh | 1 + .../wavelet/Clustering/MinSikKim/dc.cc | 252 +++++++ .../wavelet/Clustering/MinSikKim/dc.hh | 21 + .../wavelet/Clustering/createClusters.pl | 328 +++++++++ .../wavelet/Clustering/udpClient/compile.sh | 3 + .../wavelet/Clustering/udpClient/udpClient.cc | 502 ++++++++++++++ .../wavelet/Clustering/udpServer/compile.sh | 3 + .../wavelet/Clustering/udpServer/udpServer.cc | 253 +++++++ .../wavelet/Clustering/wvcluster.cc | 625 ++++++++++++++++++ 10 files changed, 2016 insertions(+) create mode 100644 pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/Wavelet.cc create mode 100755 pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/compile.sh create mode 100644 pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.cc create mode 100644 pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.hh create mode 100644 pelab/bw-bottleneck/wavelet/Clustering/createClusters.pl create mode 100755 pelab/bw-bottleneck/wavelet/Clustering/udpClient/compile.sh create mode 100644 pelab/bw-bottleneck/wavelet/Clustering/udpClient/udpClient.cc create mode 100755 pelab/bw-bottleneck/wavelet/Clustering/udpServer/compile.sh create mode 100644 pelab/bw-bottleneck/wavelet/Clustering/udpServer/udpServer.cc create mode 100644 pelab/bw-bottleneck/wavelet/Clustering/wvcluster.cc diff --git a/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/Wavelet.cc b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/Wavelet.cc new file mode 100644 index 0000000000..78c89adcf5 --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/Wavelet.cc @@ -0,0 +1,28 @@ +#include <stdio.h> +#include<iostream> +#include<cstdlib> +#include<vector> +#include<fstream> +#include "dc.hh" + +int main(int argc, char **argv) +{ + std::ifstream fileHandle; + + fileHandle.open(argv[1], std::ios::in); + std::vector<double> delays; + int delayVal; + + while(!(fileHandle.eof())) + { + fileHandle >> delayVal; + if((fileHandle.eof())) + break; + delays.push_back(delayVal); + } + + fileHandle.close(); + + delay_correlation(delays, delays.size()); +} + diff --git a/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/compile.sh b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/compile.sh new file mode 100755 index 0000000000..f76f0423af --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/compile.sh @@ -0,0 +1 @@ +g++ Wavelet.cc dc.cc -lm -o MinSikKimProgram diff --git a/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.cc b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.cc new file mode 100644 index 0000000000..adc61d4695 --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.cc @@ -0,0 +1,252 @@ +#include <math.h> +#include <algorithm> // reverse_copy() +#include <numeric> // accumulate() +#include "dc.hh" +#include <iostream> // DEBUG + +const int MAX_SAMPLE = 1024; +const int TEST_DURATION = 100; +const int FILTER_LENGTH = 12; /* Using 'db6' wavelet */ +const int MAX_LEVEL = 4; /* Maximum decomposition level */ +const double LOG2 = 0.69314718055994529f; // log(2) +const double _SMALL = 1.0e-15f; + +using namespace std; + +//vector<double> wavelet_denoising(vector<double> ); +//double xcor(vector<double> ,vector<double> ); +//vector<double> wdec(vector<double>,int); +//vector<double> wconv(vector<double>, double *); +//vector<double> wrec(vector<double> ,vector<double>,int); + +/* Filter coefficients for 'db6' wavelet */ +double LD[12]={-0.00107730108500,0.00477725751101,0.00055384220099,-0.03158203931803, + 0.02752286553002,0.09750160558708,-0.12976686756710,-0.22626469396517, + 0.31525035170924,0.75113390802158,0.49462389039839,0.11154074335008}; +double HD[12]={-0.11154074335008,0.49462389039839,-0.75113390802158,0.31525035170924, + 0.22626469396517,-0.12976686756710,-0.09750160558708,0.02752286553002, + 0.03158203931803,0.00055384220099,-0.00477725751101,-0.00107730108500}; +double LR[12]={0.11154074335008,0.49462389039839,0.75113390802158,0.31525035170924, + -0.22626469396517,-0.12976686756710,0.09750160558708,0.02752286553002, + -0.03158203931803,0.00055384220099,0.00477725751101,-0.00107730108500}; +double HR[12]={-0.00107730108500,-0.00477725751101,0.00055384220099,0.03158203931803, + 0.02752286553002,-0.09750160558708,-0.12976686756710,0.22626469396517, + 0.31525035170924,-0.75113390802158,0.49462389039839,-0.11154074335008}; + +vector<double> wconv(const vector<double> &data, double *filter) +{ + int lx = data.size(); + int lf = FILTER_LENGTH; + vector<double> result; + int i,j; + double sum = 0; + + for(i=1;i<lx+lf;i++) + { + if(i<lf) + { + for(j=0;j<i;j++) + sum=sum+filter[j]*data[i-j-1]; + } + else if(i<lx+1) + { + for(j=0;j<lf;j++) + sum=sum+filter[j]*data[i-j-1]; + } + else + { + for(j=i-lx;j<lf;j++) + sum=sum+filter[j]*data[i-j-1]; + } + result.push_back(sum); + sum=0; + } + + return result; +} + +vector<double> wdec(vector<double> data,int option) +{ + double diff; + int len,i; + vector<double> conv_result; + vector<double> dyaddown; + vector<double> edata; + + len=data.size(); + + /* Symmetric expansion to prevent edge effect */ + reverse_copy(data.begin(),(data.begin()+FILTER_LENGTH-1),inserter(edata,edata.begin())); + copy(data.begin(),data.end(),inserter(edata,edata.end())); + reverse_copy((data.end()-FILTER_LENGTH+1),data.end(),inserter(edata,edata.end())); + + /* option 1 for detail space, option 2 for approximation space */ + if(option == 1) + conv_result=wconv(edata,HD); + else if(option == 2) + conv_result=wconv(edata,LD); + + diff = (conv_result.size()-(len+FILTER_LENGTH-1))/2.0f; + conv_result.erase((conv_result.end()-(int)floor(diff)),conv_result.end()); + conv_result.erase(conv_result.begin(),(conv_result.begin()+(int)ceil(diff))); + + /* Downsampling */ + vector<double>::iterator index; + i=1; + for(index=conv_result.begin();index!=conv_result.end();index++) + { + if((i++)%2 ==0) + dyaddown.push_back(*(index)); + } + + return dyaddown; +} + +vector<double> wrec(vector<double> app,vector<double> detail,int length) +{ + vector<double> app_up; + vector<double> detail_up; + vector<double> app_conv; + vector<double> detail_conv; + int i,diff; + + /* Upsampling */ + for(i=1;i<=2*(app.size());i++) + { + if(i%2==1) + { + app_up.push_back(0.0); + detail_up.push_back(0.0); + } + else + { + app_up.push_back(app[(i/2)-1]); + detail_up.push_back(detail[(i/2)-1]); + } + } + + app_conv=wconv(app_up,LR); + detail_conv=wconv(detail_up,HR); + + /* Adding detail and approximation */ + for(i=0;i<app_conv.size();i++) + app_conv[i]=app_conv[i] + detail_conv[i]; + + app_conv.erase(app_conv.begin(),(app_conv.begin()+FILTER_LENGTH-1)); + diff=app_conv.size()-length; + app_conv.erase((app_conv.end()-diff),app_conv.end()); + + return app_conv; +} + +vector<double> wavelet_denoising(const vector<double> &data, int nSamples) +{ + double median; + int appr_size,next_size; + int med_position; + + vector<double> wcoeffs; + vector<double> detail; + vector<double> appr; + vector<double> abs_det; + + // Decide the maximum decomposition level. + int nblev; + nblev=(int)floorf(logf(nSamples)/LOG2 - logf(logf(nSamples))/LOG2); + if(nblev > MAX_LEVEL) nblev = MAX_LEVEL; + + /* Deciding threshold value for 'MINIMAXI' scheme */ + int no_wcoeffs = nSamples; /* Add the size of approximation */ + int detail_size = nSamples; + double thr, thr_lev; + for(int i = 1; i <= nblev; i++) { + detail_size= (int)floorf((detail_size+FILTER_LENGTH-1) / 2.0f); + no_wcoeffs += detail_size; + } + if(no_wcoeffs <= 32) thr = 0; + else thr = 0.3936 + 0.1829 * (logf(no_wcoeffs)/LOG2); + + /* Wavelets decomposition & thresholding */ + vector<int> wcolen; + wcolen.push_back(nSamples); + copy(data.begin(), data.end(), inserter(appr, appr.begin())); + for(int i = 1; i <= nblev; i++) { + detail = wdec(appr, 1); /* Fine detail space */ + appr = wdec(appr, 2); /* Coarse approximation space */ + abs_det.clear(); + transform(detail.begin(),detail.end(),inserter(abs_det,abs_det.begin()),fabsf); + + /* Finding the median to scale threshold */ + sort(abs_det.begin(),abs_det.end()); + if( (abs_det.size())%2 == 1 ) + { + med_position=(int)floor(abs_det.size()/2.0f); + median = abs_det[med_position]; + } + else + { + med_position=(int)(abs_det.size()/2); + median =( abs_det[med_position-1] + abs_det[med_position] )/2.0; + } + + thr_lev=thr*(median/0.6745); + + /* Soft-thresholding */ + for(int j=0;j<detail.size();j++) + { + if(detail[j] >= 0) + { + if(detail[j] <= thr_lev) + detail[j]=0; + else + detail[j]=detail[j]-thr_lev; + } + else + { + if(detail[j] >= (-1*thr_lev) ) + detail[j]=0; + else + detail[j]=detail[j]+thr_lev; + } + } + + + copy(detail.begin(),detail.end(),inserter(wcoeffs,wcoeffs.end())); + wcolen.push_back(detail.size()); + + } + wcolen.push_back(appr.size()); + copy(appr.begin(),appr.end(),inserter(wcoeffs,wcoeffs.end())); + + /* Wavelets reconstruction */ + appr.clear(); + detail.clear(); + appr_size=wcolen.back(); + wcolen.pop_back(); + next_size=wcolen.back(); + wcolen.pop_back(); + copy((wcoeffs.end()-appr_size),wcoeffs.end(),inserter(appr,appr.begin())); + wcoeffs.resize(wcoeffs.size()-appr_size); + for(int i=1;i<=nblev;i++) + { + copy((wcoeffs.end()-next_size),wcoeffs.end(),inserter(detail,detail.begin())); + wcoeffs.resize(wcoeffs.size()-next_size); + next_size=wcolen.back(); + wcolen.pop_back(); + appr=wrec(appr,detail,next_size); + } + + return appr; + +} + +void delay_correlation(const vector<double> &delay, + int nSamples) +{ + /* Wavelets denoising */ + vector<double> wd; + wd=wavelet_denoising(delay, nSamples); + + for(int i = 0;i < wd.size(); i++) + cout << wd[i] << "\n"; +} diff --git a/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.hh b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.hh new file mode 100644 index 0000000000..e938768053 --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/MinSikKim/dc.hh @@ -0,0 +1,21 @@ +/* + Detecting Shared Congestion by wavelets + + Header file for shcon.cpp + By Taek H. Kim (thkim@ece.utexas.edu) + + Wavelet : Daubechies 12 (db6) + Thresholding : MINIMAXI soft-thresholding + Noise Scaling : Multi-level scaling + Maximum decomposition level : 4 + + */ + +//#include <iostream> +//#include <fstream> +//#include <algo.h> + +#include <vector> + +void delay_correlation(const std::vector<double> &delay1, + int nSamples); diff --git a/pelab/bw-bottleneck/wavelet/Clustering/createClusters.pl b/pelab/bw-bottleneck/wavelet/Clustering/createClusters.pl new file mode 100644 index 0000000000..8b3b533cfd --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/createClusters.pl @@ -0,0 +1,328 @@ +#!/usr/bin/perl +# +# EMULAB-COPYRIGHT +# Copyright (c) 2006 University of Utah and the Flux Group. +# All rights reserved. +# + + +my $expName; +my $projName; +my $logsDir; + +my $newExpName; +my $newProjName; + +%bottleNecks = {}; +my %nodeClasses; +my %nodeNameMap = {}; + +die "Usage: perl sharedBottle.pl proj_name exp_name newProj_name newExp_name initial_conditions.txt(Optional)" +if($#ARGV < 3); + +$projName = $ARGV[0]; +$expName = $ARGV[1]; +$newProjName = $ARGV[2]; +$newExpName = $ARGV[3]; +$initialConditionsFilename = $ARGV[4]; + +$logsDir = "/tmp/clusterLogs2"; + + +# Get the initial conditions. +$elabInitScript = "/proj/tbres/duerig/testbed/pelab/init-elabnodes.pl"; +$initConditionsCommand = $elabInitScript . " -o /tmp/initial-conditions.txt " . $newProjName . " " . $newExpName; + +if($#ARGV == 3) +{ + system($initConditionsCommand); + $initialConditionsFilename = "/tmp/initial-conditions.txt"; +} + +open(CONDITIONS, $initialConditionsFilename); + +my @initialConditions = (); + +while(<CONDITIONS>) +{ + chomp( $_ ); + push(@initialConditions, $_); +} + +close(CONDITIONS); + +my @addressList = (); +my $addrFlag = 0; + +my %bwMap = {}; +my %delayMap = {}; +my %elabMap = {}; + +# Create & send events. +# Get initial conditions for the paths of interest +# from the database, using init-elabnodes.pl +my $tevc = "/usr/testbed/bin/tevc -e $newProjName/$newExpName now"; + +#@@@`/usr/testbed/bin/tevc -w -e $newProjName/$newExpName now elabc reset`; +#@@@`$tevc elabc create start`; + +# Create a list of the IP addresses. +foreach $conditionLine (@initialConditions) +{ + if($conditionLine =~ /(\d*?\.\d*?\.\d*?\.(\d*?))\s(\d*?\.\d*?\.\d*?\.\d*?)\s(\d*?)\s(\d*?)\s[\d\w\-\.]*/) + { + $srcAddress = $1; + $addrFlag = 0; + + foreach $addrEntry (@addressList) + { + if($addrEntry eq $srcAddress) + { + $addrFlag = 1; + } + } + + if($addrFlag == 0) + { + push(@addressList, $srcAddress); + + $elabMap{$srcAddress} = "elabc-elab-" . $2; + print "Mapping $2 TO $elabMap{$srcAddress}\n"; + } + +# Create a mapping of the initial conditions. + $bwMap{$1}{$3} = $4; + $delayMap{$1}{$3} = $5; + } +} + +opendir(logsDirHandle, $logsDir); + +my $addrIndex = 0; +my %addrNodeMapping = {}; +my %nodeNumberMapping = {}; +my $numDests = 0; + +foreach $sourceName (readdir(logsDirHandle)) +{ +# Map the elab IP address in the initial conditions file, with +# the node names in the gather-results logs directory. + + if( (-d $logsDir . "/" . $sourceName ) && $sourceName ne "." && $sourceName ne ".." ) + { + $addrNodeMapping{$sourceName} = $addressList[$addrIndex]; + $nodeNameMapping{$sourceName} = $addrIndex + 1; + $nodeNumberMapping{$addrIndex + 1} = $sourceName; + $addrIndex++; + $numDests++; + } +} + +rewinddir(logsDirHandle); + +# Descend into all the source directories +foreach $sourceName (readdir(logsDirHandle)) +{ + if( (-d $logsDir . "/" . $sourceName ) && $sourceName ne "." && $sourceName ne ".." ) + { + + my @destLists; +# Then search for all possible destinations for +# a particular source. + opendir(sourceDirHandle, $logsDir . "/" . $sourceName); + + my @denoisedDelays = (); + +# print "Into directory $sourceName\n"; + + foreach $destOne (readdir(sourceDirHandle)) + { + if( (-d $logsDir . "/" . $sourceName . "/" . $destOne) && $destOne ne "." && $destOne ne ".." ) + { + # Inside each destination directory, look for + # delay.log file with delay values. + +# print "Into directory $sourceName/$destOne\n"; + + $fullPath = "$logsDir/$sourceName/$destOne/delay.log"; + $waveletScript = "./MinSikKim/MinSikKimProgram $fullPath"; + my @scriptOutput; + + @scriptOutput = `$waveletScript`; + #print "@scriptOutput\n"; + # print "Index = $nodeNameMapping{$destOne}\n"; + $denoisedDelays[$nodeNameMapping{$destOne}] = [ @scriptOutput ]; + + #foreach $testIter (@{$denoisedDelays[$nodeNameMapping{$destOne}]}) + #{ + # print "$testIter"; + #} + } + } + closedir(sourceDirHandle); + + local @equivClasses = (); + + $tmpTreeRecordFile = "/tmp/bw-wavelet-clustering.rcd"; + + open(RECORDFILE, ">$tmpTreeRecordFile"); + print RECORDFILE "128\n"; + for($i = 1; $i <= $numDests; $i++) + { + if($i == $nodeNameMapping{$sourceName}) + { + next; + } + else + { + my @scriptOutput; + @scriptOutput = @{ $denoisedDelays[$i] }; + + # Put this destination in a seperate cluster - we + # have zero samples/delay values. + if( ($#scriptOutput == 0) or ($#scriptOutput < 120) ) + { + my @newEquivClass = (); + push(@newEquivClass, $i); + + push(@equivClasses, [@newEquivClass]); + } + else + { + my $counter = 0; + my $avgValue = 0; + my @delayValueArray = (); + foreach $delayValue (@scriptOutput) + { + chomp($delayValue); + + if($counter < 128) + { + $avgValue += $delayValue; + push(@delayValueArray, $delayValue); + $counter++; + } + else + { + last; + } + } + $avgValue = $avgValue/128.0; + + my $denominator = 0; + foreach $delayValue (@delayValueArray) + { + $denominator += ($delayValue - $avgValue)*($delayValue - $avgValue); + } + $denominator = sqrt($denominator); + + foreach $delayValue (@delayValueArray) + { + $delayValue = ($delayValue - $avgValue)/$denominator; + print RECORDFILE "$delayValue:"; + } + + #foreach $delayValue (@scriptOutput) + #{ + # chomp($delayValue); +# +# if($counter < 128) +# { +# print RECORDFILE "$delayValue:"; +# $counter++; +# } +# else +# { +# last; +# } +# } + + print RECORDFILE "$i\n"; + } + } + } + close(RECORDFILE); + + $clusteringProgram = "/tmp/clustering/c++-samples/wvcluster $tmpTreeRecordFile /tmp/tmp.idx"; + + my @clusteringOutput ; + + @clusteringOutput = `$clusteringProgram`; + + foreach $clusterLine (@clusteringOutput) + { + @clusterElements = split(/ /, $clusterLine); + my @newEquivClass = (); + + foreach $nodeId (@clusterElements) + { + if($nodeId =~ /\d+/) + { + push(@newEquivClass, $nodeId); + } + + } + push(@equivClasses, [@newEquivClass]); + } + + print "Clusters for $sourceName:\n"; + foreach $tmpName (@equivClasses) + { + foreach $tmpName2 (@$tmpName) + { + print "$nodeNumberMapping{$tmpName2} "; + } + print "\n"; + } + + print "\n"; + +# Send the events to all the nodes which form an equivalent class. + foreach $tmpName (@equivClasses) + { + my $bwEventCommand = "$tevc $elabMap{$addrNodeMapping{$sourceName}} modify DEST="; + my $firstDestFlag = 0; + + my $maxBw = 0; + +# Find the maximum available bandwidth in all the paths of this equivalence class. + foreach $tmpName2 (@$tmpName) + { + if($bwMap{$addrNodeMapping{$sourceName}}{$addrNodeMapping{$nodeNumberMapping{$tmpName2}}} > $maxBw) + { + $maxBw = $bwMap{$addrNodeMapping{$sourceName}}{$addrNodeMapping{$nodeNumberMapping{$tmpName2}}}; + } + } + + foreach $tmpName2 (@$tmpName) + { + if($firstDestFlag == 0) + { + $bwEventCommand = $bwEventCommand . $addrNodeMapping{$nodeNumberMapping{$tmpName2}}; + $firstDestFlag = 1; + } + else + { + $bwEventCommand = $bwEventCommand . "," . $addrNodeMapping{$nodeNumberMapping{$tmpName2}}; + } + + +# my $delayEventCommand = "$tevc $elabMap{$addrNodeMapping{$sourceName}} modify DEST=" . $addrNodeMapping{$destSeen[$tmpName2]}; + my $delayEventCommand = "$tevc ".$elabMap{$addrNodeMapping{$nodeNumberMapping{$tmpName2}}}." modify DEST=" . $addrNodeMapping{$nodeNumberMapping{$tmpName2}}." SRC=".$addrNodeMapping{$sourceName}; + + $delayEventCommand = $delayEventCommand . " " . "DELAY=" . ($delayMap{$addrNodeMapping{$sourceName}}{$addrNodeMapping{$nodeNumberMapping{$tmpName2}}}); +# Execute the delay event command. + print "EXECUTE $delayEventCommand\n"; + #@@@ `$delayEventCommand`; + } + $bwEventCommand = $bwEventCommand . " " . "BANDWIDTH=" . $maxBw; +# Execute the event to set the bandwidth for this equivalence class. + print "EXECUTE $bwEventCommand\n"; + #@@@ `$bwEventCommand`; + } + + print "\n\n"; + } +} + +closedir(logsDirHandle); diff --git a/pelab/bw-bottleneck/wavelet/Clustering/udpClient/compile.sh b/pelab/bw-bottleneck/wavelet/Clustering/udpClient/compile.sh new file mode 100755 index 0000000000..0cc4bfce4d --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/udpClient/compile.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +g++ udpClient.cc -o udpClient -lpcap diff --git a/pelab/bw-bottleneck/wavelet/Clustering/udpClient/udpClient.cc b/pelab/bw-bottleneck/wavelet/Clustering/udpClient/udpClient.cc new file mode 100644 index 0000000000..f4867ecba4 --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/udpClient/udpClient.cc @@ -0,0 +1,502 @@ +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <stdio.h> +#include <unistd.h> +#include <fcntl.h> +#include <string.h> +#include <sys/time.h> +#include <pcap.h> +#include <errno.h> +#include <netinet/in.h> +#include <netinet/ip.h> +#include <netinet/udp.h> +#include <netinet/if_ether.h> +#include <net/ethernet.h> +#include <netinet/ether.h> +#include <iostream> +#include <fstream> +#include <cstdlib> +#include <vector> +#include <string> +#include <map> +#include <sstream> + +#define REMOTE_SERVER_PORT 19835 +#define MAX_MSG 100 + + +#define SOCKET_ERROR -1 +pcap_t *pcapDescriptor = NULL; + +using namespace std; + +vector< vector<int> > delaySequenceArray; +vector< map<unsigned long long, int> > packetTimeMaps; +vector< map<string, unsigned long long> > actualTimeMaps; + +unsigned long long getTimeMilli() +{ + struct timeval tp; + gettimeofday(&tp, NULL); + + long long tmpSecVal = tp.tv_sec; + long long tmpUsecVal = tp.tv_usec; + + return (tmpSecVal*1000 + tmpUsecVal/1000); +} + + +void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr, u_char *const udpPacketStart, struct ip const *ipPacket) +{ + u_char *dataPtr = udpPacketStart + 8; + + + char packetType = *(char *)(dataPtr); + long long milliSec = 0; + int ackLength = 0; + + if(packetType == '0') + { + int hostIndex = ( *(short int *)(dataPtr + 1)); + unsigned long long origTimestamp; + unsigned long long secVal = pcap_info->ts.tv_sec; + unsigned long long usecVal = pcap_info->ts.tv_usec; + + memcpy(&origTimestamp, ( unsigned long long *)(dataPtr + 1 + sizeof(short int)), sizeof(unsigned long long)); + ostringstream tmpStrStream; + tmpStrStream << origTimestamp; + actualTimeMaps[hostIndex][tmpStrStream.str()] = (unsigned long long)(secVal*1000 + usecVal/1000); + printf("Recording Timestamp = %s for Host = %d, value = %llu\n", tmpStrStream.str().c_str(), hostIndex, actualTimeMaps[hostIndex][tmpStrStream.str()]); + } + else if(packetType == '1') + { + // We received an ACK, pass it on to the sensors. + int hostIndex = ( *(short int *)(dataPtr + 1)); + unsigned long long origTimestamp; + long long oneWayDelay; + memcpy(&origTimestamp, ( unsigned long long *)(dataPtr + 1 + sizeof(short int)), sizeof(unsigned long long)); + memcpy(&oneWayDelay, ( long long *)(dataPtr + 1 + sizeof(short int) + sizeof(unsigned long long)), sizeof(long long)); + ostringstream tmpStrStream; + tmpStrStream << origTimestamp; + cout << " Onewaydelay for the ACK = " << oneWayDelay << ", host Index = "<< hostIndex << "\n"; + cout <<" Orig timestamp was "<< tmpStrStream.str() << " , actual time = "<< actualTimeMaps[hostIndex][tmpStrStream.str()]<<"\n"; + cout <<"Packet time map Index = "<< packetTimeMaps[hostIndex][origTimestamp] << ", host index = " << hostIndex << " \n"; + delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = oneWayDelay - ( actualTimeMaps[hostIndex][tmpStrStream.str()] - origTimestamp); + + if(oneWayDelay < 50000 && oneWayDelay > -50000 && (delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] > 100000 || delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] < -100000 ) ) + { + cout<<"ERROR: Incorrect delay value, one way delay = "<< oneWayDelay<<", Calculated delay = "<< delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]]<<"\n"; + + + } + if(actualTimeMaps[hostIndex].count(tmpStrStream.str()) == 0) + { + printf("ERROR: Original timestamp was not found in the packet Hash\n\n"); + } + } + else + { + printf("ERROR: Unknown UDP packet received from remote agent\n"); + return; + } +} + +int getLinkLayer(struct pcap_pkthdr const *pcap_info, const u_char *pkt_data) +{ + unsigned int caplen = pcap_info->caplen; + + if (caplen < sizeof(struct ether_header)) + { + printf("A captured packet was too short to contain " + "an ethernet header"); + return -1; + } + else + { + struct ether_header * etherPacket = (struct ether_header *) pkt_data; + return ntohs(etherPacket->ether_type); + } +} + +void pcapCallback(u_char *user, const struct pcap_pkthdr *pcap_info, const u_char *pkt_data) +{ + int packetType = getLinkLayer(pcap_info, pkt_data); + + if(packetType != ETHERTYPE_IP) + { + printf("Unknown link layer type: %d\n", packetType); + return; + } + + struct ip const *ipPacket; + size_t bytesLeft = pcap_info->caplen - sizeof(struct ether_header); + + if(bytesLeft < sizeof(struct ip)) + { + printf("Captured packet was too short to contain an IP header.\n"); + return; + } + + ipPacket = (struct ip const *)(pkt_data + sizeof(struct ether_header)); + int ipHeaderLength = ipPacket->ip_hl; + int ipVersion = ipPacket->ip_v; + + + if(ipVersion != 4) + { + printf("Captured IP packet is not IPV4.\n"); + return; + } + + if(ipHeaderLength < 5) + { + printf("Captured IP packet has header less than the minimum 20 bytes.\n"); + return; + } + + if(ipPacket->ip_p != IPPROTO_UDP) + { + printf("Captured packet is not a UDP packet.\n"); + return; + } + + // Ignore the IP options for now - but count their length. + ///////////////////////////////////////////////////////// + u_char *udpPacketStart = (u_char *)(pkt_data + sizeof(struct ether_header) + ipHeaderLength*4); + + struct udphdr const *udpPacket; + + udpPacket = (struct udphdr const *)(udpPacketStart); + + bytesLeft -= ipHeaderLength*4; + + if(bytesLeft < sizeof(struct udphdr)) + { + printf("Captured packet is too small to contain a UDP header.\n"); + return; + } + + handleUDP(pcap_info,udpPacket,udpPacketStart, ipPacket); +} + +void init_pcap( char *ipAddress) +{ + char interface[] = "eth0"; + struct bpf_program bpfProg; + char errBuf[PCAP_ERRBUF_SIZE]; + char filter[128] = " udp "; + + // IP Address and sub net mask. + bpf_u_int32 maskp, netp; + struct in_addr localAddress; + + pcap_lookupnet(interface, &netp, &maskp, errBuf); + pcapDescriptor = pcap_open_live(interface, BUFSIZ, 0, 0, errBuf); + localAddress.s_addr = netp; + printf("IP addr = %s\n", ipAddress); + sprintf(filter," udp and ( (src host %s and dst port 19835 ) or (dst host %s and src port 19835 )) ", ipAddress, ipAddress); + + if(pcapDescriptor == NULL) + { + printf("Error opening device %s with libpcap = %s\n", interface, errBuf); + exit(1); + } + + pcap_compile(pcapDescriptor, &bpfProg, filter, 1, netp); + pcap_setfilter(pcapDescriptor, &bpfProg); + pcap_setnonblock(pcapDescriptor, 1, errBuf); + +} + +int main(int argc, char **argv) +{ + int clientSocket, rc, i, n, flags = 0, error, timeOut; + socklen_t echoLen; + struct sockaddr_in cliAddr, remoteServAddr1, remoteServAddr2, servAddr, localHostAddr; + struct hostent *host1, *host2, *localhostEnt; + char msg[MAX_MSG]; + vector<struct sockaddr_in> remoteServAddresses; + + string hostNameFile = argv[1]; + string outputDirectory = argv[2]; + string localHostName = argv[3]; + + int timeout = 2000; // 1 second + int probeRate = 10; // Hz + int probeDuration = 15000; // 15 seconds + vector<string> hostList; + + ifstream inputFileHandle; + + localhostEnt = gethostbyname(argv[3]); + memcpy((char *) &localHostAddr.sin_addr.s_addr, + localhostEnt->h_addr_list[0], localhostEnt->h_length); + init_pcap(inet_ntoa(localHostAddr.sin_addr)); + int pcapfd = pcap_get_selectable_fd(pcapDescriptor); + + // Create the output directory. + string commandString = "mkdir " + outputDirectory; + system(commandString.c_str()); + + // Read the input file having all the planetlab node IDs. + + inputFileHandle.open(hostNameFile.c_str(), std::ios::in); + + char tmpStr[81] = ""; + string tmpString; + bool enableClientFlag = false; + + while(!inputFileHandle.eof()) + { + inputFileHandle.getline(tmpStr, 80); + tmpString = tmpStr; + + if(tmpString.size() < 3) + continue; + if(tmpString != localHostName) + { + hostList.push_back(tmpString); + cout << tmpString << "\n"; + } + else + enableClientFlag = true; + } + inputFileHandle.close(); + + // We don't need to run the probe client on this node - just exit. + if(enableClientFlag == false) + exit(0); + + int numHosts = hostList.size(); + + remoteServAddresses.resize(numHosts); + for(int i = 0;i < numHosts; i++) + { + host1 = NULL; + host1 = gethostbyname(hostList[i].c_str()); + + remoteServAddresses[i].sin_family = host1->h_addrtype; + memcpy((char *) &remoteServAddresses[i].sin_addr.s_addr, + host1->h_addr_list[0], host1->h_length); + remoteServAddresses[i].sin_port = htons(REMOTE_SERVER_PORT); + } + + int targetSleepTime = (1000/probeRate) - 1; + + cliAddr.sin_family = AF_INET; + cliAddr.sin_addr.s_addr = htonl(INADDR_ANY); + cliAddr.sin_port = htons(0); + + clientSocket = socket(AF_INET, SOCK_DGRAM, 0); + rc = bind(clientSocket, (struct sockaddr *) &cliAddr, sizeof(cliAddr)); + + fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK); + + delaySequenceArray.resize(numHosts); + packetTimeMaps.resize(numHosts); + actualTimeMaps.resize(numHosts); + + ///////////////////////////// + // For each destination, send a train of UDP packets. + int packetCounter = 0; + unsigned long long startTime = getTimeMilli(); + unsigned long long lastSentTime = startTime; + bool endProbesFlag = false; + bool readTimeoutFlag = false; + + while ((( lastSentTime - startTime) < probeDuration) || !(readTimeoutFlag)) + { + + // Stop waiting for probe replies after a timeout - calculated from the + // time the last probe was sent out. + if (endProbesFlag && ( (getTimeMilli() - lastSentTime) > timeout)) + readTimeoutFlag = 1; + // Stop sending probes after the given probe duration. + if (!(endProbesFlag) && (lastSentTime - startTime) > probeDuration) + endProbesFlag = 1; + + if (endProbesFlag) + usleep(timeout*100); + + fd_set socketReadSet, socketWriteSet; + FD_ZERO(&socketReadSet); + FD_SET(clientSocket,&socketReadSet); + FD_SET(pcapfd,&socketReadSet); + FD_ZERO(&socketWriteSet); + FD_SET(clientSocket,&socketWriteSet); + + struct timeval timeoutStruct; + + timeoutStruct.tv_sec = 0; + timeoutStruct.tv_usec = 0; + + if (!endProbesFlag) + { + select(clientSocket+pcapfd+1,&socketReadSet,&socketWriteSet,0,&timeoutStruct); + } + else + { + select(clientSocket+pcapfd+1,&socketReadSet,0,0,&timeoutStruct); + } + + if (FD_ISSET(pcapfd,&socketReadSet) ) + { + pcap_dispatch(pcapDescriptor, 10000, pcapCallback, NULL); + } + if (!readTimeoutFlag) + { + if (FD_ISSET(clientSocket,&socketReadSet) != 0) + { + while (true) + { + int flags = 0; + if( recvfrom(clientSocket, msg, MAX_MSG, flags, + (struct sockaddr *) &servAddr, &echoLen) != -1) + { + while(pcap_dispatch(pcapDescriptor, 1, pcapCallback, NULL) != 0); + } + else + { + if(endProbesFlag) + usleep(timeout*100); + break; + } + } + } + } + + if (!endProbesFlag) + { + if (FD_ISSET(clientSocket,&socketWriteSet) != 0) + { + char messageString[60]; + int flags = 0; + short int hostIndex; + for (int i = 0; i < numHosts; i++) + { + // Send the probe packets. + unsigned long long sendTime = getTimeMilli(); + messageString[0] = '0'; + hostIndex = i; + + memcpy(&messageString[1], &hostIndex, sizeof(short int)); + memcpy(&messageString[1 + sizeof(short int)], &sendTime, sizeof(unsigned long long)); + + rc = sendto(clientSocket, messageString, 1 + sizeof(short int) + sizeof(unsigned long long), flags, + (struct sockaddr *) &remoteServAddresses[i], sizeof(remoteServAddresses[i])); + + if(rc < 0) + printf("ERROR sending %dth udp message, packetCounter = %d\n", i, packetCounter); + + packetTimeMaps[i][sendTime] = packetCounter; + delaySequenceArray[i].push_back(-9999); + + cout<< "TO " << hostList[i] << " :Counter=" << packetCounter << " :SendTime= " << sendTime << endl; + + + } + while(pcap_dispatch(pcapDescriptor, -1, pcapCallback, NULL) != 0); + packetCounter++; + lastSentTime = getTimeMilli(); + // Sleep for 99 msec for a 10Hz target probing rate. + usleep(targetSleepTime*1000); + } + else + { + if (!(getTimeMilli() - lastSentTime > targetSleepTime)) + { + cout << " About to sleep for " << ( targetSleepTime - (getTimeMilli() - lastSentTime) )*1000 <<"\n"; + usleep( ( targetSleepTime - (getTimeMilli() - lastSentTime) )*1000) ; + } + } + } + } + ////////////////////////////// + + usleep(20000000); + + while(pcap_dispatch(pcapDescriptor, 1, pcapCallback, NULL) != 0); + + for (int i = 0; i < numHosts; i++) + { + string dirPath = outputDirectory + "/" + hostList[i]; + commandString = "mkdir " + dirPath; + system(commandString.c_str()); + + ofstream tmpFileHandle; + string tmpFilePath = dirPath + "/" + "debug.log"; + tmpFileHandle.open(tmpFilePath.c_str(), std::ios::out); + + for(int k = 0; k < delaySequenceArray[i].size(); k++) + { + tmpFileHandle << delaySequenceArray[i][k] << "\n"; + } + tmpFileHandle.close(); + + // If we lost some replies/packets, linearly interpolate their delay values. + int delaySeqLen = delaySequenceArray[i].size(); + int firstSeenIndex = -1; + int lastSeenIndex = -1; + + for (int k = 0; k < delaySeqLen; k++) + { + if (delaySequenceArray[i][k] != -9999) + { + lastSeenIndex = k; + if (firstSeenIndex == -1) + firstSeenIndex = k; + } + } + + if (lastSeenIndex != -1) + { + for (int k = firstSeenIndex; k < lastSeenIndex + 1; k++) + { + if (delaySequenceArray[i][k] == -9999) + { + // Find the number of missing packets in this range. + int numMissingPackets = 0; + int lastInRange = 0; + for (int l = k; l < lastSeenIndex + 1; l++) + { + if(delaySequenceArray[i][l] == -9999) + { + numMissingPackets++; + } + else + { + lastInRange = l; + break; + } + } + + int step = (delaySequenceArray[i][lastInRange] - delaySequenceArray[i][k-1])/(numMissingPackets + 1); + + // Interpolate delays for the missing packets in this range. + int y = 0; + for (int x = k, y = 1; x < lastInRange; x++, y++) + delaySequenceArray[i][x] = delaySequenceArray[i][k-1] + y*step ; + } + } + } + + ofstream outputFileHandle; + string delayFilePath = dirPath + "/" + "delay.log"; + outputFileHandle.open(delayFilePath.c_str(), std::ios::out); + if (lastSeenIndex != -1) + { + for (int k = firstSeenIndex; k < lastSeenIndex + 1; k++) + { + outputFileHandle << delaySequenceArray[i][k] << "\n"; + } + } + outputFileHandle.close(); + if (lastSeenIndex == -1) + cout<< "ERROR: No samples were seen for host: " << hostList[i] << endl; + } +} + diff --git a/pelab/bw-bottleneck/wavelet/Clustering/udpServer/compile.sh b/pelab/bw-bottleneck/wavelet/Clustering/udpServer/compile.sh new file mode 100755 index 0000000000..edc3d56ef0 --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/udpServer/compile.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +g++ udpServer.cc -o udpServer -lpcap diff --git a/pelab/bw-bottleneck/wavelet/Clustering/udpServer/udpServer.cc b/pelab/bw-bottleneck/wavelet/Clustering/udpServer/udpServer.cc new file mode 100644 index 0000000000..9345d91039 --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/udpServer/udpServer.cc @@ -0,0 +1,253 @@ +#include <iostream> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <stdio.h> +#include <unistd.h> +#include <string.h> +#include <sys/time.h> +#include <pcap.h> +#include <errno.h> +#include <netinet/in.h> +#include <netinet/ip.h> +#include <netinet/udp.h> +#include <netinet/if_ether.h> +#include <net/ethernet.h> +#include <netinet/ether.h> + + +#define LOCAL_SERVER_PORT 19835 +#define MAX_MSG 1024 + +pcap_t *pcapDescriptor = NULL; + +char appAck[60]; +unsigned long long milliSec = 0; + +int sd, rc, n, flags; +socklen_t cliLen; +struct sockaddr_in cliAddr, servAddr; + +using namespace std; + +unsigned long long getTimeMilli() +{ + struct timeval tp; + gettimeofday(&tp, NULL); + + long long tmpSecVal = tp.tv_sec; + long long tmpUsecVal = tp.tv_usec; + + return (tmpSecVal*1000 + tmpUsecVal/1000); +} + +void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr, u_char *const udpPacketStart, struct ip const *ipPacket) +{ + + u_char *dataPtr = udpPacketStart + 8; + unsigned char packetType = *(unsigned char *)(dataPtr); + + if(packetType == '0')// This is a udp data packet arriving here. Send an + // application level acknowledgement packet for it. + { + unsigned long long secVal = pcap_info->ts.tv_sec; + unsigned long long usecVal = pcap_info->ts.tv_usec; + unsigned long long recvTimestamp = secVal*1000 + usecVal/1000; + unsigned long long sendTimestamp = *(unsigned long long *)(dataPtr + sizeof(short int) + 1); + long long oneWayDelay = recvTimestamp - sendTimestamp; + + appAck[0] = '1'; + memcpy(&appAck[1] ,(dataPtr + 1), sizeof(short int)); + memcpy(&appAck[1 + sizeof(short int)], &sendTimestamp, sizeof(unsigned long long)); + memcpy(&appAck[1 + + sizeof(short int) + sizeof(unsigned long long)], &oneWayDelay, sizeof(long long)); + cout << "Sending app level ack to "<< inet_ntoa(cliAddr.sin_addr) << ",at " << sendTimestamp << " , recvtimestamp = "<<recvTimestamp<< ", delay = "<< oneWayDelay << endl; + + sendto(sd,appAck,1 + + sizeof(short int) + 2*sizeof(long long),flags,(struct sockaddr *)&cliAddr,cliLen); + } + else if(packetType == '1') // TODO:This is an udp ACK packet. If it is being sent + // out from this host, do nothing. + { + + } + else + { + printf("ERROR: Unknown UDP packet received from remote agent\n"); + return; + } +} + +int getLinkLayer(struct pcap_pkthdr const *pcap_info, const u_char *pkt_data) +{ + unsigned int caplen = pcap_info->caplen; + + if (caplen < sizeof(struct ether_header)) + { + printf("A captured packet was too short to contain " + "an ethernet header"); + return -1; + } + else + { + struct ether_header * etherPacket = (struct ether_header *) pkt_data; + return ntohs(etherPacket->ether_type); + } +} + +void pcapCallback(u_char *user, const struct pcap_pkthdr *pcap_info, const u_char *pkt_data) +{ + int packetType = getLinkLayer(pcap_info, pkt_data); + + if(packetType != ETHERTYPE_IP) + { + printf("Unknown link layer type: %d\n", packetType); + return; + } + + struct ip const *ipPacket; + size_t bytesLeft = pcap_info->caplen - sizeof(struct ether_header); + + + if(bytesLeft < sizeof(struct ip)) + { + printf("Captured packet was too short to contain an IP header.\n"); + return; + } + + ipPacket = (struct ip const *)(pkt_data + sizeof(struct ether_header)); + int ipHeaderLength = ipPacket->ip_hl; + int ipVersion = ipPacket->ip_v; + + + if(ipVersion != 4) + { + printf("Captured IP packet is not IPV4.\n"); + return; + } + + if(ipHeaderLength < 5) + { + printf("Captured IP packet has header less than the minimum 20 bytes.\n"); + return; + } + + if(ipPacket->ip_p != IPPROTO_UDP) + { + printf("Captured packet is not a UDP packet.\n"); + return; + } + + // Ignore the IP options for now - but count their length. + ///////////////////////////////////////////////////////// + u_char *const udpPacketStart = (u_char *const)(pkt_data + sizeof(struct ether_header) + ipHeaderLength*4); + + struct udphdr const *udpPacket; + + udpPacket = (struct udphdr const *)udpPacketStart; + + bytesLeft -= ipHeaderLength*4; + + if(bytesLeft < sizeof(struct udphdr)) + { + printf("Captured packet is too small to contain a UDP header.\n"); + return; + } + + handleUDP(pcap_info,udpPacket,udpPacketStart, ipPacket); +} + + +void init_pcap( char *ipAddress) +{ + char interface[] = "eth0"; + struct bpf_program bpfProg; + char errBuf[PCAP_ERRBUF_SIZE]; + char filter[128] = " udp "; + + // IP Address and sub net mask. + bpf_u_int32 maskp, netp; + struct in_addr localAddress; + + pcap_lookupnet(interface, &netp, &maskp, errBuf); + pcapDescriptor = pcap_open_live(interface, BUFSIZ, 0, 0, errBuf); + localAddress.s_addr = netp; + printf("IP addr = %s\n", ipAddress); + sprintf(filter," udp and dst host %s and dst port 19835 ", ipAddress); + + if(pcapDescriptor == NULL) + { + printf("Error opening device %s with libpcap = %s\n", interface, errBuf); + exit(1); + } + + pcap_compile(pcapDescriptor, &bpfProg, filter, 1, netp); + pcap_setfilter(pcapDescriptor, &bpfProg); +} + + +int main(int argc, char *argv[]) { + + char msg[MAX_MSG]; + struct hostent *localHost; + + /* socket creation */ + sd=socket(AF_INET, SOCK_DGRAM, 0); + if(sd<0) { + printf("%s: cannot open socket \n",argv[0]); + exit(1); + } + + + + localHost = gethostbyname(argv[1]); + if(localHost == NULL) + { + printf("ERROR: Unknown host %s\n", argv[1]); + exit(1); + } + + /* bind local server port */ + servAddr.sin_family = AF_INET; + servAddr.sin_family = localHost->h_addrtype; + memcpy((char *) &servAddr.sin_addr.s_addr, + localHost->h_addr_list[0], localHost->h_length); + servAddr.sin_port = htons(LOCAL_SERVER_PORT); + rc = bind (sd, (struct sockaddr *) &servAddr,sizeof(servAddr)); + if(rc<0) { + printf("%s: cannot bind port number %d \n", + argv[0], LOCAL_SERVER_PORT); + exit(1); + } + + printf("%s: waiting for data on port UDP %u\n", + argv[0],LOCAL_SERVER_PORT); + + flags = 0; + + + init_pcap(inet_ntoa(servAddr.sin_addr)); + + /* server infinite loop */ + while(1) { + + memset(msg,0x0,MAX_MSG); + + /* receive message */ + cliLen = sizeof(cliAddr); + n = recvfrom(sd, msg, MAX_MSG, flags, + (struct sockaddr *) &cliAddr, &cliLen); + + pcap_dispatch(pcapDescriptor, 1, pcapCallback, NULL); + + if(n<0) { + printf("%s: cannot receive data \n",argv[0]); + continue; + } + } + +return 0; + +} + diff --git a/pelab/bw-bottleneck/wavelet/Clustering/wvcluster.cc b/pelab/bw-bottleneck/wavelet/Clustering/wvcluster.cc new file mode 100644 index 0000000000..1f6c46129e --- /dev/null +++ b/pelab/bw-bottleneck/wavelet/Clustering/wvcluster.cc @@ -0,0 +1,625 @@ +/* + * dynamicBuild.cc + * Copyright (C) 1999 Norio Katayama + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free + * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, + * MA 02111-1307, USA + * + * 10/14/96 katayama@rd.nacsis.ac.jp + * $Id: wvcluster.cc,v 1.1 2007-09-14 21:40:16 pramod Exp $ + */ + +/* Modified into wvcluster.cc by Taekhyun Kim */ +/* 2005.07.03 */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <fstream.h> +#include <vector> +#include <algorithm> + +#ifdef _MSC_VER +#include "HnSRTree/HnGetOpt.h" +#else +#include <unistd.h> +#endif +#include "RecordFileSt.hh" +#include "HnSRTree/HnSRTreeFile.hh" + +#include "HnSRTree/HnStringBuffer.hh" +#include "HnSRTree/HnTimesSt.h" + + +#define DEFAULT_ATTRIBUTE_SIZE 64 +#define DEFAULT_THRESHOLD 0.750 +#define NDATA 100 +#define NDIM 5 + +typedef struct{ + double Ocount; + double FalsePOS; + double FalseNEG; + double CPUtime; +} WVresultSt; + +typedef struct{ + double Ocount; + double FalsePOS; + double FalseNEG; +} CAresultSt; + +static HnTimesSt *ST1, *ST2, *ET1,*ET2, *RT1, *RT2; +static HnSRTreeProfileSt *treeProfile; + +static void printUsage(void); +static WVresultSt wvcluster(const char *recordFileName, int maxCount, + const char *indexFileName, int dataItemSize, + const HnProperties &properties, HnBool debug, + HnBool onMemory,double threshold,int verbose,const char *resultFileName, char *congFileName); +CAresultSt ClusterAccuracy(vector<vector<int> > Clustered,int CMcount,char *congFileName,int verbose); + +/* profiling */ +static void initializeProfileData(void); +static void startProfiling(int); +static void endProfiling(int); +static void printProfileData(int); +static void freeProfileData(void); + + int +main(int argc, char *argv[]) +{ + //char recordFileName[64]; + //char indexFileName[64]; + char *recordFileName; + char *indexFileName; + char congFileName[64]; + char resultFileName[64]; + char buff[16]; + int dataItemSize; + int maxCount; + double CATotal,FalsePOSTotal,FalseNEGTotal; + int verbose; + double CPUtimeTotal; + int dimensions[5]={18,36,61,101,170}; + int npaths[7]={1,2,4,8,16,32,64}; + WVresultSt WVresult; + HnBool debug; + HnBool onMemory; + HnStringBuffer sb; + HnProperties properties; + + int i,j,c, errflag; + extern int optind; + extern char *optarg; + int dim = 18; + + double threshold; + + WVresult.Ocount=0; + WVresult.CPUtime=0; + + dataItemSize = DEFAULT_ATTRIBUTE_SIZE; + threshold = DEFAULT_THRESHOLD; + maxCount = -1; + debug = HnFALSE; + onMemory = HnFALSE; + verbose = 0; + + sb = new_HnStringBuffer(); + + errflag = 0; + /* + while ( (c = getopt(argc, argv, "c:dmp:v:s:t:i:")) != EOF ) { + switch ( c ) { + case 'c': + maxCount = atoi(optarg); + break; + case 'd': + debug = HnTRUE; + break; + case 'm': + onMemory = HnTRUE; + break; + case 'p': + sb.append(optarg); + sb.append('\n'); + break; + case 'v': + verbose = atoi(optarg); + break; + case 's': + dataItemSize = atoi(optarg); + break; + case 't': + threshold = atof(optarg); + if(verbose) + printf("threshold = %f\n",threshold); + break; + case 'i': + dim=atoi(optarg); + break; + case '?': + errflag = 1; + break; + } + } + */ + /* + if ( errflag || optind != argc ) { + printUsage(); + return 1; + } + */ + + maxCount = 10000; + dim = 16; + verbose = 0; + //recordFileName = argv[optind]; + //indexFileName = argv[optind + 1]; + recordFileName = argv[1]; + indexFileName = argv[2]; + + properties = new_HnProperties(); + properties.load(sb.toString()); + + strcpy(resultFileName, "result.file"); + + WVresult = wvcluster(recordFileName, maxCount, + indexFileName, dataItemSize, + properties, debug, onMemory,threshold,verbose,resultFileName,congFileName); + + return 0; +} + + static void +printUsage(void) +{ + fprintf(stderr, "\ + Usage: wvcluster [options] recordFile indexFile\n\ + Options\n\ + -c count set the number of records to be read from recordFile\n\ + -d turn on the debug mode\n\ + -m build the tree on memory and finally dump it to the file\n\ + -p property set the property of the index\n\ + -s dataSize set the size of data attributes (%d by default)\n\ + -t threshold set the threshold for XCOR test (%f by default)\n\ + ", DEFAULT_ATTRIBUTE_SIZE,DEFAULT_THRESHOLD); +} + +static WVresultSt +wvcluster(const char *recordFileName, int maxCount, + const char *indexFileName, int dataItemSize, + const HnProperties &properties, HnBool debug, + HnBool onMemory,double threshold,int verbose,const char *resultFileName, char *congFileName) +{ + RecordFileSt *recordFile; + HnSRTreeFile indexCM; + HnPointVector allPaths; + //HnPointVector allPaths2; + HnDataItemVector allData; + int count; + int i1,i2; + HnProperties treeProperties; + WVresultSt Result; + CAresultSt CAresult; + + vector<vector<int> > Group(1024); + vector<double> CPUtimes1; + vector<double> CPUtimes2; + + ofstream resultFile(resultFileName); + + HnSRTreeFile::setDebug(debug); + + initializeProfileData(); + + recordFile = RecordFileSt_open(recordFileName); + indexCM = new_HnSRTreeFile((char *)NULL , + recordFile->dimension, dataItemSize, + properties); + + if ( indexCM == HnSRTreeFile::null ) { + perror(indexFileName); + exit(1); + } + + treeProperties = indexCM.getProperties(); + //treeProperties.print(); + indexCM.initializeStore(); + + startProfiling(2); + allPaths = new_HnPointVector(); + allData = new_HnDataItemVector(); + count = 0; + + /* Inserting record points to index file */ + + while ( maxCount < 0 || count < maxCount ) { + HnPoint point; + HnDataItem dataItem; + + if ( RecordFileSt_getRecord(recordFile, &point, &dataItem) < 0 ) { + break; + } + //printf("Inserting path %s Clustered(%d) \n",dataItem.toCharArray(),point.isClustered()); + //indexFile.store(point, dataItem); + + double *tmpArray = point.getCoords(); + + //for(int i = 0;i < point.getDimension(); i++) + //{ + // printf("Coord[%d] = %f\n", i, tmpArray[i]); + //} + allPaths.addElement(point); + allData.addElement(dataItem); + + + count ++; + } + + /* End of inserting */ + + /* Clustering */ + + //indexCM = new_HnSRTreeFile( (char *)NULL , + // recordFile->dimension, dataItemSize, + // properties); + + float radius; + double totalTimes = 0.0; + int CMcount = 0; + + radius = sqrt(2-2*threshold); + if(verbose) + printf("Radius = %f \n",radius); + + + for(i1=0;i1<allPaths.size();i1++) + { + + HnPoint querypoint; + HnDataItem querydata; + HnPoint newCM; + HnDataItem newCMdata; + char CMdatabuf[10]; + + HnPointVector CMpoints; + HnDataItemVector CMdataItems; + + querypoint = allPaths.elementAt(i1); + querydata = allData.elementAt(i1); + + //startProfiling(1); + indexCM.getNeighbors(querypoint,1,&CMpoints,&CMdataItems); + //endProfiling(1); + //totalTimes = totalTimes + HnTimesSt_getCPUTime(RT1); + //CPUtimes1.push_back(totalTimes); + + if(CMpoints.size()==0 ) + { + + newCM=new_HnPoint(querypoint); + + CMcount++; + + sprintf(CMdatabuf,"%d",CMcount-1); + newCMdata = new_HnDataItem(CMdatabuf,10); + char tmpStr[100] = "", tmpStr2[100]; + + strcpy(tmpStr2, querydata.toCharArray()); + tmpStr2[strlen(tmpStr2-2)] = '\0'; + strcpy(tmpStr, &tmpStr2[1]); + + + if(verbose>1) + { + printf("CM made--- (%s) %s %d '%s'\n",CMdatabuf,querydata.toCharArray(), atoi(tmpStr), tmpStr); + + } + + allPaths[i1].setClustered(1); + + Group[CMcount-1].push_back(atoi(querydata.toCharArray())); + //Group[CMcount-1].push_back(atoi(tmpStr)); + indexCM.store(newCM, newCMdata); + + } + else if(querypoint.getDistance(CMpoints[0]) > radius) + { + + newCM=new_HnPoint(querypoint); + CMcount++; + sprintf(CMdatabuf,"%d",CMcount-1); + newCMdata = new_HnDataItem(CMdatabuf,10); + + char tmpStr[100] = "", tmpStr2[100]; + + strcpy(tmpStr2, querydata.toCharArray()); + tmpStr2[strlen(tmpStr2-2)] = '\0'; + strcpy(tmpStr, &tmpStr2[1]); + + if(verbose>1) + { + printf("CM made--- (%s) %s %d '%s', distance = %f \n",CMdatabuf,querydata.toCharArray(), atoi(tmpStr), tmpStr, querypoint.getDistance(CMpoints[0])); + } + + allPaths[i1].setClustered(1); + + Group[CMcount-1].push_back(atoi(querydata.toCharArray())); + //Group[CMcount-1].push_back(atoi(tmpStr)); + indexCM.store(newCM, newCMdata); + + } + + } + + // copy(CPUtimes1.begin(),CPUtimes1.end(),ostream_iterator<double>(cout,"\n")); + + for(i2=0;i2<allPaths.size();i2++) + { + HnPoint querypoint; + HnDataItem querydata; + HnDataItem newCMdata; + int Gindex; + + HnPointVector CMpoints; + HnDataItemVector CMdataItems; + + querypoint = allPaths.elementAt(i2); + querydata = allData.elementAt(i2); + + if(querypoint.isClustered()==0) + { + indexCM.getNeighbors(querypoint,1,&CMpoints,&CMdataItems); + + if(CMpoints.size() > 0) + { + Gindex = atoi(CMdataItems[0].toCharArray()); + char tmpStr[100] = "", tmpStr2[100]; + + strcpy(tmpStr2, querydata.toCharArray()); + tmpStr2[strlen(tmpStr2-2)] = '\0'; + strcpy(tmpStr, &tmpStr2[1]); + //Group[Gindex].push_back(atoi(tmpStr)); + Group[Gindex].push_back(atoi(querydata.toCharArray())); + if(verbose >1) + { + printf("Clustering %s to CM(%d) \n",querydata.toCharArray(),Gindex); + } + + } + else + { + printf("What??? \n"); + } + } + + } + + endProfiling(2); + if(verbose) + printf("Finished:Number of Group (%d)\n",CMcount); + + for(int i=0;i<CMcount;i++) + { + sort(Group[i].begin(),Group[i].end()); + if(verbose) + copy(Group[i].begin(),Group[i].end(),ostream_iterator<int>(cout," ")); + //copy(Group[i].begin(),Group[i].end(),ostream_iterator<int>(resultFile," ")); + for(vector<int>::iterator I=Group[i].begin();I!=Group[i].end();I++) + { + resultFile << *I << " "; + std::cout << *I << " "; + } + resultFile << endl; + std::cout << endl; + if(verbose) + printf("\n"); + } + resultFile.close(); + + if(verbose) + printf("End of clustering \n"); + + /* End of clustering */ + + + RecordFileSt_close(recordFile); + indexCM.close(); + + freeProfileData(); + + + return Result; +} + + CAresultSt +ClusterAccuracy(vector<vector<int> > Clustered, int CMcount, char *congFileName,int verbose) +{ + vector<vector<int> > v1(16); + vector<int> g1; + int i,j,k,group,Gcount; + int GC; + int total; + int SHRtotal; + int Ocount,FalsePOS,FalseNEG; + char dummy[32]; + FILE *fp; + int Answer,Test; + CAresultSt Result; + + fp=fopen(congFileName,"r"); + + Gcount = 0; + for(i=16;i<32;i++) + { + fscanf(fp,"%s %d\n",&dummy,&group); + + GC=0; + count(g1.begin(),g1.end(),group,GC); + + if(GC==0) + { + g1.push_back(group); + v1[Gcount].push_back(i); + Gcount++; + } + else + { + v1[Gcount-1].push_back(i); + } + } + fclose(fp); + + + if(verbose) + { + cout << "# of groups (Answer) = " << Gcount << endl; + + for(i=0;i<Gcount;i++) + { + copy(v1[i].begin(),v1[i].end(),ostream_iterator<int>(cout," ")); + cout << endl; + + } + } + + /* Probability of any error */ + Ocount=0; + FalsePOS=0; + FalseNEG=0; + total=0; + SHRtotal=0; + + for(i=16;i<31;i++) + { + for(j=i+1;j<32;j++) + { + total++; + Answer=0; + Test=0; + + for(k=0;k<Gcount;k++) + { + if(find(v1[k].begin(),v1[k].end(),i) < v1[k].end()) + { + if(find(v1[k].begin(),v1[k].end(),j) < v1[k].end()) + { + Answer = 1; + SHRtotal++; + } + break; + } + } + + for(k=0;k<CMcount;k++) + { + if(find(Clustered[k].begin(),Clustered[k].end(),i) < Clustered[k].end()) + { + if(find(Clustered[k].begin(),Clustered[k].end(),j) < Clustered[k].end()) + { + Test = 1; + } + break; + } + } + + //printf("(%d,%d) Answer = %d Test = %d \n",i,j,Answer,Test); + if(Answer ==1 && Test == 0) + { + FalseNEG++; + } + else if(Answer == 0 && Test == 1) + { + FalsePOS++; + } + else + { + Ocount++; + } + } + } + + if(verbose) + printf(" Correct = %d/%d FalsePOS = %d/%d FalseNEG = %d/%d SharedTotal = %d \n",Ocount,total,FalsePOS,total-SHRtotal,FalseNEG,SHRtotal,SHRtotal); + + Result.Ocount = Ocount/(total*1.0); + Result.FalsePOS = FalsePOS/((total-SHRtotal)*1.0); + Result.FalseNEG = FalseNEG/(SHRtotal*1.0); + + return Result ; +} + +/* + * Profiling + */ + + static void +initializeProfileData(void) +{ + ST1 = HnTimesSt_allocate(); + ST2 = HnTimesSt_allocate(); + ET1 = HnTimesSt_allocate(); + ET2 = HnTimesSt_allocate(); + RT1 = HnTimesSt_allocate(); + RT2 = HnTimesSt_allocate(); +} + + static void +startProfiling(int i) +{ + if(i==1) + HnTimesSt_setCurrentTimes(ST1); + else + HnTimesSt_setCurrentTimes(ST2); +} + + static void +endProfiling(int i) +{ + if(i==1) + { + + HnTimesSt_setCurrentTimes(ET1); + HnTimesSt_subtract(RT1,ET1,ST1); + } + else + { + HnTimesSt_setCurrentTimes(ET2); + HnTimesSt_subtract(RT2,ET2,ST2); + } +} + + static void +printProfileData(int i) +{ + if(i==1) + HnTimesSt_print(RT1); + else + HnTimesSt_print(RT2); +} + + static void +freeProfileData(void) +{ + HnTimesSt_free(ST1); + HnTimesSt_free(ET1); + HnTimesSt_free(RT1); + + HnTimesSt_free(ST2); + HnTimesSt_free(ET2); + HnTimesSt_free(RT2); +} -- GitLab