Commit d683483b authored by Pramod R Sanaga's avatar Pramod R Sanaga

Changed the log-processing perl script to ignore paths with low variance. All such

paths will now be placed in individual & isolated clusters.

Misc. changes in the client and server modules and minor fixes to sleep more often
using select instead of executing a busy loop.
parent 269136e2
......@@ -26,7 +26,7 @@ $newProjName = $ARGV[2];
$newExpName = $ARGV[3];
$initialConditionsFilename = $ARGV[4];
$logsDir = "/tmp/clusterLogs2";
$logsDir = "/tmp/testLogs";
# Get the initial conditions.
......@@ -149,14 +149,8 @@ foreach $sourceName (readdir(logsDirHandle))
my @scriptOutput;
@scriptOutput = `$waveletScript`;
#print "@scriptOutput\n";
# print "Index = $nodeNameMapping{$destOne}\n";
$denoisedDelays[$nodeNameMapping{$destOne}] = [ @scriptOutput ];
#foreach $testIter (@{$denoisedDelays[$nodeNameMapping{$destOne}]})
#{
# print "$testIter";
#}
}
}
closedir(sourceDirHandle);
......@@ -186,6 +180,7 @@ foreach $sourceName (readdir(logsDirHandle))
push(@newEquivClass, $i);
push(@equivClasses, [@newEquivClass]);
print "$sourceName: WARNING: Creating a new equiv class/cluster for $nodeNumberMapping{$i} due to lack of samples($#scriptOutput)\n";
}
else
{
......@@ -216,27 +211,23 @@ foreach $sourceName (readdir(logsDirHandle))
}
$denominator = sqrt($denominator);
# Exclude paths with low-variance.
if($denominator < 25)
{
my @newEquivClass = ();
push(@newEquivClass, $i);
push(@equivClasses, [@newEquivClass]);
print "$sourceName: WARNING: Creating a new equiv class/cluster for $nodeNumberMapping{$i} due to low variance of samples($denominator)\n";
next;
}
foreach $delayValue (@delayValueArray)
{
$delayValue = ($delayValue - $avgValue)/$denominator;
print RECORDFILE "$delayValue:";
}
#foreach $delayValue (@scriptOutput)
#{
# chomp($delayValue);
#
# if($counter < 128)
# {
# print RECORDFILE "$delayValue:";
# $counter++;
# }
# else
# {
# last;
# }
# }
print RECORDFILE "$i\n";
}
}
......
......@@ -82,10 +82,16 @@ void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr,
memcpy(&oneWayDelay, ( long long *)(dataPtr + 1 + sizeof(short int) + sizeof(unsigned long long)), sizeof(long long));
ostringstream tmpStrStream;
tmpStrStream << origTimestamp;
cout << " Onewaydelay for the ACK = " << oneWayDelay << ", host Index = "<< hostIndex << "\n";
if(actualTimeMaps[hostIndex].count(tmpStrStream.str()) == 0)
{
printf("ERROR: Original timestamp = %llu, and hostIndex = %d was not found in the packet Hash\n", origTimestamp, hostIndex);
delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = oneWayDelay;
}
else
delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = oneWayDelay - ( actualTimeMaps[hostIndex][tmpStrStream.str()] - origTimestamp);
cout << " Onewaydelay for the ACK = " << oneWayDelay << " recorded = "<< delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] <<", host Index = "<< hostIndex << "\n";
cout <<" Orig timestamp was "<< tmpStrStream.str() << " , actual time = "<< actualTimeMaps[hostIndex][tmpStrStream.str()]<<"\n";
cout <<"Packet time map Index = "<< packetTimeMaps[hostIndex][origTimestamp] << ", host index = " << hostIndex << " \n";
delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] = oneWayDelay - ( actualTimeMaps[hostIndex][tmpStrStream.str()] - origTimestamp);
if(oneWayDelay < 50000 && oneWayDelay > -50000 && (delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] > 100000 || delaySequenceArray[hostIndex][packetTimeMaps[hostIndex][origTimestamp]] < -100000 ) )
{
......@@ -93,10 +99,7 @@ void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr,
}
if(actualTimeMaps[hostIndex].count(tmpStrStream.str()) == 0)
{
printf("ERROR: Original timestamp was not found in the packet Hash\n\n");
}
cout <<"Packet time map Index = "<< packetTimeMaps[hostIndex][origTimestamp] << ", host index = " << hostIndex << " \n\n\n";
}
else
{
......@@ -185,7 +188,7 @@ void pcapCallback(u_char *user, const struct pcap_pkthdr *pcap_info, const u_cha
void init_pcap( char *ipAddress)
{
char interface[] = "eth0";
char interface[] = "eth1";
struct bpf_program bpfProg;
char errBuf[PCAP_ERRBUF_SIZE];
char filter[128] = " udp ";
......@@ -226,7 +229,7 @@ int main(int argc, char **argv)
string localHostName = argv[3];
int timeout = 2000; // 1 second
int probeRate = 10; // Hz
int probeRate = 20; // Hz
int probeDuration = 15000; // 15 seconds
vector<string> hostList;
......@@ -307,6 +310,9 @@ int main(int argc, char **argv)
unsigned long long lastSentTime = startTime;
bool endProbesFlag = false;
bool readTimeoutFlag = false;
bool disableWritesFlag = false;
bool firstWriteFlag = true;
unsigned long timeoutValue = 0;
while ((( lastSentTime - startTime) < probeDuration) || !(readTimeoutFlag))
{
......@@ -314,27 +320,53 @@ int main(int argc, char **argv)
// Stop waiting for probe replies after a timeout - calculated from the
// time the last probe was sent out.
if (endProbesFlag && ( (getTimeMilli() - lastSentTime) > timeout))
{
readTimeoutFlag = 1;
}
// Stop sending probes after the given probe duration.
if (!(endProbesFlag) && (lastSentTime - startTime) > probeDuration)
{
endProbesFlag = 1;
disableWritesFlag = true;
}
if (endProbesFlag)
usleep(timeout*100);
{
timeoutValue = timeout*100;
}
else
{
if (!(getTimeMilli() - lastSentTime > targetSleepTime))
{
timeoutValue = ( targetSleepTime - (getTimeMilli() - lastSentTime) )*1000 ;
if(!firstWriteFlag)
disableWritesFlag = true;
}
else
{
timeoutValue = 0;
disableWritesFlag = false;
}
}
fd_set socketReadSet, socketWriteSet;
FD_ZERO(&socketReadSet);
FD_SET(clientSocket,&socketReadSet);
FD_SET(pcapfd,&socketReadSet);
FD_ZERO(&socketWriteSet);
FD_SET(clientSocket,&socketWriteSet);
if(!disableWritesFlag)
FD_SET(clientSocket,&socketWriteSet);
struct timeval timeoutStruct;
timeoutStruct.tv_sec = 0;
timeoutStruct.tv_usec = 0;
if (!endProbesFlag)
timeoutStruct.tv_usec = timeoutValue % 1000000;
timeoutStruct.tv_sec = timeoutValue / 1000000;
if(!disableWritesFlag)
{
select(clientSocket+pcapfd+1,&socketReadSet,&socketWriteSet,0,&timeoutStruct);
}
......@@ -347,29 +379,7 @@ int main(int argc, char **argv)
{
pcap_dispatch(pcapDescriptor, 10000, pcapCallback, NULL);
}
if (!readTimeoutFlag)
{
if (FD_ISSET(clientSocket,&socketReadSet) != 0)
{
while (true)
{
int flags = 0;
if( recvfrom(clientSocket, msg, MAX_MSG, flags,
(struct sockaddr *) &servAddr, &echoLen) != -1)
{
while(pcap_dispatch(pcapDescriptor, 1, pcapCallback, NULL) != 0);
}
else
{
if(endProbesFlag)
usleep(timeout*100);
break;
}
}
}
}
if (!endProbesFlag)
if (!endProbesFlag && !disableWritesFlag)
{
if (FD_ISSET(clientSocket,&socketWriteSet) != 0)
{
......@@ -399,27 +409,48 @@ int main(int argc, char **argv)
}
while(pcap_dispatch(pcapDescriptor, -1, pcapCallback, NULL) != 0);
packetCounter++;
lastSentTime = getTimeMilli();
// Sleep for 99 msec for a 10Hz target probing rate.
usleep(targetSleepTime*1000);
firstWriteFlag = false;
}
else
}
if (!readTimeoutFlag)
{
if (FD_ISSET(clientSocket,&socketReadSet) != 0)
{
if (!(getTimeMilli() - lastSentTime > targetSleepTime))
while (true)
{
cout << " About to sleep for " << ( targetSleepTime - (getTimeMilli() - lastSentTime) )*1000 <<"\n";
usleep( ( targetSleepTime - (getTimeMilli() - lastSentTime) )*1000) ;
int flags = 0;
if( recvfrom(clientSocket, msg, MAX_MSG, flags,(struct sockaddr *) &servAddr, &echoLen) == -1)
{
break;
}
}
}
}
}
//////////////////////////////
usleep(20000000);
fd_set socketReadSet;
FD_ZERO(&socketReadSet);
FD_SET(pcapfd,&socketReadSet);
unsigned long long waitStartTime = getTimeMilli();
while( (getTimeMilli() - waitStartTime) < 20000)
{
struct timeval timeoutStruct;
while(pcap_dispatch(pcapDescriptor, 1, pcapCallback, NULL) != 0);
timeoutStruct.tv_sec = 5;
timeoutStruct.tv_usec = 0;
select(pcapfd+1,&socketReadSet,0,0,&timeoutStruct);
if (FD_ISSET(pcapfd,&socketReadSet) )
{
pcap_dispatch(pcapDescriptor, 10000, pcapCallback, NULL);
}
}
for (int i = 0; i < numHosts; i++)
{
......
......@@ -17,12 +17,15 @@
#include <netinet/if_ether.h>
#include <net/ethernet.h>
#include <netinet/ether.h>
#include <fcntl.h>
#define LOCAL_SERVER_PORT 19835
#define MAX_MSG 1024
pcap_t *pcapDescriptor = NULL;
long receivedPackets = 0;
long processedPackets = 0;
char appAck[60];
unsigned long long milliSec = 0;
......@@ -49,6 +52,10 @@ void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr,
u_char *dataPtr = udpPacketStart + 8;
unsigned char packetType = *(unsigned char *)(dataPtr);
struct sockaddr_in returnAddr;
processedPackets++;
printf("Received packets = %d, processed = %d\n", receivedPackets, processedPackets);
if(packetType == '0')// This is a udp data packet arriving here. Send an
// application level acknowledgement packet for it.
......@@ -59,13 +66,17 @@ void handleUDP(struct pcap_pkthdr const *pcap_info, struct udphdr const *udpHdr,
unsigned long long sendTimestamp = *(unsigned long long *)(dataPtr + sizeof(short int) + 1);
long long oneWayDelay = recvTimestamp - sendTimestamp;
returnAddr.sin_family = AF_INET;
memcpy((char *) &returnAddr.sin_addr, (char *)&(ipPacket->ip_src), sizeof(struct in_addr));
returnAddr.sin_port = udpHdr->source;
appAck[0] = '1';
memcpy(&appAck[1] ,(dataPtr + 1), sizeof(short int));
memcpy(&appAck[1 + sizeof(short int)], &sendTimestamp, sizeof(unsigned long long));
memcpy(&appAck[1 + + sizeof(short int) + sizeof(unsigned long long)], &oneWayDelay, sizeof(long long));
cout << "Sending app level ack to "<< inet_ntoa(cliAddr.sin_addr) << ",at " << sendTimestamp << " , recvtimestamp = "<<recvTimestamp<< ", delay = "<< oneWayDelay << endl;
cout << "Sending app level ack to "<< inet_ntoa(returnAddr.sin_addr) << ",at " << sendTimestamp << " , recvtimestamp = "<<recvTimestamp<< ", delay = "<< oneWayDelay << endl;
sendto(sd,appAck,1 + + sizeof(short int) + 2*sizeof(long long),flags,(struct sockaddr *)&cliAddr,cliLen);
sendto(sd,appAck,1 + + sizeof(short int) + 2*sizeof(long long),flags,(struct sockaddr *)&returnAddr,cliLen);
}
else if(packetType == '1') // TODO:This is an udp ACK packet. If it is being sent
// out from this host, do nothing.
......@@ -161,7 +172,7 @@ void pcapCallback(u_char *user, const struct pcap_pkthdr *pcap_info, const u_cha
void init_pcap( char *ipAddress)
{
char interface[] = "eth0";
char interface[] = "eth1";
struct bpf_program bpfProg;
char errBuf[PCAP_ERRBUF_SIZE];
char filter[128] = " udp ";
......@@ -184,6 +195,7 @@ void init_pcap( char *ipAddress)
pcap_compile(pcapDescriptor, &bpfProg, filter, 1, netp);
pcap_setfilter(pcapDescriptor, &bpfProg);
pcap_setnonblock(pcapDescriptor, 1, errBuf);
}
......@@ -222,32 +234,57 @@ int main(int argc, char *argv[]) {
}
printf("%s: waiting for data on port UDP %u\n",
argv[0],LOCAL_SERVER_PORT);
argv[0],LOCAL_SERVER_PORT);
flags = 0;
init_pcap(inet_ntoa(servAddr.sin_addr));
int pcapfd = pcap_get_selectable_fd(pcapDescriptor);
init_pcap(inet_ntoa(servAddr.sin_addr));
flags = 0;
fcntl(sd, F_SETFL, flags | O_NONBLOCK);
fd_set socketReadSet;
/* server infinite loop */
while(1) {
memset(msg,0x0,MAX_MSG);
/* receive message */
cliLen = sizeof(cliAddr);
n = recvfrom(sd, msg, MAX_MSG, flags,
(struct sockaddr *) &cliAddr, &cliLen);
pcap_dispatch(pcapDescriptor, 1, pcapCallback, NULL);
if(n<0) {
printf("%s: cannot receive data \n",argv[0]);
continue;
}
FD_ZERO(&socketReadSet);
FD_SET(sd,&socketReadSet);
FD_SET(pcapfd,&socketReadSet);
struct timeval timeoutStruct;
timeoutStruct.tv_sec = 2;
timeoutStruct.tv_usec = 0;
select(sd+pcapfd+1,&socketReadSet,0,0,&timeoutStruct);
if (FD_ISSET(sd,&socketReadSet) != 0)
{
while(true)
{
memset(msg,0x0,MAX_MSG);
/* receive message */
cliLen = sizeof(cliAddr);
if( recvfrom(sd, msg, MAX_MSG, flags,(struct sockaddr *) &cliAddr, &cliLen) == -1)
break;
else
{
receivedPackets++;
}
}
}
if (FD_ISSET(pcapfd,&socketReadSet) )
pcap_dispatch(pcapDescriptor, 1000, pcapCallback, NULL);
if(n<0) {
printf("%s: cannot receive data \n",argv[0]);
continue;
}
}
return 0;
return 0;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment