Commit 9b361140 authored by Dan Gebhardt's avatar Dan Gebhardt
Browse files

Periodic checkin. Some bug fixes, feature additions, etc.. things seem to

be working well right now.
parent 66788d1b
......@@ -141,7 +141,7 @@ while(1)
outputErrors();
sleep( 60 );
sleep( 60*5 );
$f_firsttime = 0;
}
......@@ -195,7 +195,7 @@ sub choosenodes
if( $bestnode ne "NONE" &&
!defined $intersitenodes{$site} )
{
print "SECTION 1: adding $bestnode at $site\n";
print time()." SECTION 1: adding $bestnode at $site\n";
# ** This section handles when a site is seen for the 1st time
#set new node to represent this site
......@@ -205,7 +205,7 @@ sub choosenodes
}
elsif( ("NONE" eq $bestnode) && defined $intersitenodes{$site} )
{
print "SECTION 2: removing tests to $site / ".
print time()." SECTION 2: removing tests to $site / ".
"$intersitenodes{$site} \n";
# ** This section handles when a site has no nodes available
......@@ -221,7 +221,7 @@ sub choosenodes
#&& isnodeinconstrset($bestnode)
)
{
print "SECTION 3: node change at $site from ".
print time()." SECTION 3: node change at $site from ".
"$intersitenodes{$site} to $bestnode\n";
# ** This section handles when a "bestnode" at a site changes
......@@ -341,18 +341,9 @@ sub choosebestnode($)
$flag_siteIncluded = 1;
# print "SETTING SITEINCLUDED=1 for $node at $site\n";
}
#this command acts like a bgmon "ping" - used to
#determine if bgmon running correctly
=pod
my %cmd = ( expid => $expid,
cmdtype => "EDIT",
dstnode => "NOADDR",
testtype => "bw",
testper => 0,
limitTime=> 0 );
=cut
if( $allnodes{$node}{free} == 1 && isnodeinconstrset($node) ){
print "choosebestnode: considering $node\n";
# print "choosebestnode: considering $node\n";
# print "choosing best node for site $site\n";
#first time thru loop...
if( $bestnode eq "NONE" ){
......@@ -492,7 +483,7 @@ sub stoppairtest($$)
foreach my $testtype (@testtypes){
edittest($srcnode, $destnode, 0, $testtype);
}
print "stopping pair tests from $srcnode to $destnode\n";
# print "stopping pair tests from $srcnode to $destnode\n";
}
#
......
......@@ -39,7 +39,10 @@ OPERATION NUANCES:
- If a testing process abnormally exits with a value not 0 (say, iperf dies),
then the test is rescheduled to be run at a future time. This time is
the normal period times the %TEST_FAIL_RETRY ratio. (TODO: add error
reporting here.)
reporting here.) CHANGED: 8/3/06 This behavior has changed to the following:
- regardless of the return value, parsing the results continues as normal,
with the parsing function dealing with setting the error codes in the
result.
=cut
use Getopt::Std;
......@@ -63,12 +66,16 @@ my $pollPer = 0.1; #number of seconds to sleep between poll loops
my %MAX_SIMU_TESTS = (latency => "10",
bw => "1");
my $cacheSendRate = 5; #number of cached results per second
my %cacheLastSentAt; #{indx} -> tstamp of last sent cache result
my $cacheIndxWaitPer = 5; #wait x sec between cache send attempts of same indx
my $iperfduration = 5; #length of each iperf test in seconds
my $iperftimeout = 30; #kill an iperf process lasting longer than this.
# percentage of testing period to wait after a test process abnormally exits
# note: 0.1 = 10%
=pod
my %TEST_FAIL_RETRY= (latency => 0.3,
bw => 0.5);
=cut
my $rcvBufferSize = 2048; #max size for an incoming UDP message
my %outsockets = (); #sockets keyed by dest node
#MARK_RELIABLE
......@@ -87,7 +94,7 @@ my %testevents = ();
my %waitq = ( latency => [],
bw => [] );
my %runningtestPIDs = (); # maps from PID's -> array of destnode, bw
#*****************************************
......@@ -295,89 +302,113 @@ while (1) {
}
}
#iterate through all event structures
for my $destaddr ( keys %testevents ) {
for my $testtype ( keys %{ $testevents{$destaddr} } ){
#iterate through all outstanding running tests
#mark flags of finished tests
foreach my $pid (keys %runningtestPIDs){
use POSIX ":sys_wait_h";
my $testev = \%{ $testevents{$destaddr}{$testtype} };
#get nodeid and testtype
my $destaddr = $runningtestPIDs{$pid}[0];
my $testtype = $runningtestPIDs{$pid}[1];
my $testev = \%{ $testevents{$destaddr}{$testtype} };
#mark flags of finished tests
#check if process is running
my $pid = $testev->{"pid"};
if( $pid != 0 ){
use POSIX ":sys_wait_h";
my $kid = waitpid( $pid, &WNOHANG );
if( $kid != 0 )
{
if( $? == 0 ){
#process finished, so mark it's "finished" flag
$testev->{"flag_finished"} = 1;
$testev->{"pid"} = 0;
my $kid = waitpid( $pid, &WNOHANG );
if( $kid == $pid )
{
# process is dead
$testev->{"flag_finished"} = 1;
$testev->{"pid"} = 0;
=pod #NOT USED ANYMORE
if( $testtype eq "bw" && $? != 0 ){
# IPERF SPECIFIC... exited with error
#process exited abnormally
#reset pid
$testev->{"pid"} = 0;
#schedule next test at a % of a normal period from now
my $nextrun = time_all() +
$testev->{"testper"} *
$TEST_FAIL_RETRY{$testtype};
$testev->{"timeOfNextRun"} = $nextrun;
#delete tmp filename
my $filename = createtmpfilename($destaddr, $testtype);
unlink($filename) or warn "can't delete temp file";
# TODO - send errors to find down paths
my %results =
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => $ERRID{unknown},
"tstamp" => $testev->{"tstamp"},
"magic" => "$magic",
);
#save result to local DB
my $index = saveTestToLocalDB(\%results);
#send result to remote DB
sendResults(\%results, $index);
print time_all().
" $testev->{testtype} proc $pid exited abnormally\n";
}
else{
#process finished, so mark it's "finished" flag
$testev->{"flag_finished"} = 1;
$testev->{"pid"} = 0;
# print "test $destaddr / $testtype finished\n";
}else{
#process exited abnormally
#reset pid
$testev->{"pid"} = 0;
#schedule next test at a % of a normal period from now
my $nextrun = time_all() +
$testev->{"testper"} *
$TEST_FAIL_RETRY{$testtype};
$testev->{"timeOfNextRun"} = $nextrun;
#delete tmp filename
my $filename = createtmpfilename($destaddr, $testtype);
unlink($filename) or warn "can't delete temp file";
# NEW: (TODO) - send errors to find down paths
my %results =
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => $ERRID{unknown},
"tstamp" => $testev->{"tstamp"},
"magic" => "$magic",
);
# sendResults(\%results, 0); #0 is for a generic index
print time_all()." bw proc $pid exited abnormally\n";
}
}
}
=cut
#If this process is bandwidth, kill it if it has
# been running too long (iperf has a looong timeout)
elsif( $testtype eq "bw" &&
time_all() >
$testev->{"tstamp"} +
$iperftimeout )
{
kill 'TERM', $pid;
print time_all()." bw timeout: killed $destaddr, ".
"pid=$pid\n";
# NEW: (not tested below) - send an "outage" result
$testev->{"pid"} = 0;
$testev->{"flag_scheduled"} = 0;
#delete tmp filename
my $filename = createtmpfilename($destaddr, $testtype);
unlink($filename) or warn "can't delete temp file";
my %results =
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => $ERRID{timeout},
"tstamp" => $testev->{"tstamp"},
"magic" => "$magic",
);
sendResults(\%results, 0); #0 is for a generic index
}
delete $runningtestPIDs{$pid};
}
#If this process is bandwidth, kill it if it has
# been running too long (iperf has a looong timeout)
elsif( $testtype eq "bw" &&
time_all() >
$testev->{"tstamp"} +
$iperftimeout )
{
kill 'TERM', $pid;
delete $runningtestPIDs{$pid};
print time_all()." bw timeout: killed $destaddr, ".
"pid=$pid\n";
}
$testev->{"pid"} = 0;
$testev->{"flag_scheduled"} = 0;
#delete tmp filename
my $filename = createtmpfilename($destaddr, $testtype);
unlink($filename) or warn "can't delete temp file";
my %results =
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => $ERRID{timeout},
"tstamp" => $testev->{"tstamp"},
"magic" => "$magic",
);
#save result to local DB
my $index = saveTestToLocalDB(\%results);
#send result to remote DB
sendResults(\%results, $index);
}
}
#iterate through all event structures
for my $destaddr ( keys %testevents ) {
for my $testtype ( keys %{ $testevents{$destaddr} } ){
my $testev = \%{ $testevents{$destaddr}{$testtype} };
#check for finished events
if( $testev->{"flag_finished"} == 1 ){
#read raw results from temp file
my $filename = createtmpfilename($destaddr, $testtype);
open FILE, "< $filename"
or die "can't open file $filename";
open FILE, "< $filename"
or warn "can't open file $filename";
my @raw_lines = <FILE>;
my $raw;
foreach my $line (@raw_lines){
......@@ -460,7 +491,9 @@ while (1) {
}
my $iterSinceLastRun = 0; #handles when cacheSendRate < 1/pollPer
my $iterSinceLastRun = 0; #when cacheSendRate < 1/pollPer, only send old
# results a fraction of the number of calls to
# sendOldResults
sub sendOldResults()
{
#
......@@ -481,7 +514,7 @@ sub sendOldResults()
};
my @ids = keys %db;
$iterSinceLastRun++;
while( ($count < $maxcount && $count < scalar(@ids))
|| ($iterSinceLastRun > (1/($pollPer*$cacheSendRate))) )
......@@ -491,6 +524,10 @@ sub sendOldResults()
}
my $index = $ids[int( rand scalar(@ids) )];
my %results = %{ deserialize_hash($db{$index}) };
next if( (defined $cacheLastSentAt{$index}) &&
time() - $cacheLastSentAt{$index} <
$cacheIndxWaitPer );
if (!exists($results{"magic"}) || $results{"magic"} ne $magic) {
# Hmm, something went wrong!
print "Old results for index $index are scrogged; deleting!\n";
......@@ -503,9 +540,11 @@ sub sendOldResults()
$iterSinceLastRun = 0;
#don't send recently completed tests
if( time() - $results{ts_finished} < 10 ){ next; }
next if( time() - $results{ts_finished} < 10 );
print "sending old result: $index\n";
sendResults(\%results, $index);
# TODO: set last tstamp here in cacheLastSentAt
$cacheLastSentAt{$index} = time();
}
......@@ -569,7 +608,8 @@ sub spawnTest($$)
#save child pid in test event
$testevents{$linkdest}{$testtype}{"pid"} = $pid;
$testevents{$linkdest}{$testtype}{"tstamp"} = time_all();
$runningtestPIDs{$pid} = [$linkdest, $testtype];
}elsif( defined $pid ){
#child
#exec 'ls','-l' or die "can't exec: $!\n";
......@@ -582,8 +622,10 @@ sub spawnTest($$)
if( $testtype eq "latency" ){
#command line for "LATENCY TEST"
# print "##########latTest\n";
# one ping, with timeout of 60 sec
exec "ping -c 1 -t 60 $linkdest >$filename"
#one ping, using fping (2 total attempts, 10 sec timeout between)
exec "sudo $workingdir".
"fping -t10000 -s -r1 $linkdest >& $filename"
# exec "ping -c 1 -w 60 $linkdest >$filename"
or die "can't exec: $!\n";
}elsif( $testtype eq "bw" ){
#command line for "BANDWIDTH TEST"
......@@ -634,21 +676,71 @@ sub parsedata($$)
#############################
#latency test
if( $type eq "latency" ){
if( /time=(.*) ms/ ){
# for fping results parsing, using -s option.
# Note, these must be in this order!
# TODO: get these regex's to work as OR logic...
=pod
if(
/ICMP Network Unreachable/ ||
/ICMP Host Unreachable from/ ||
/ICMP Protocol Unreachable/ ||
/ICMP Port Unreachable/ ||
/ICMP Unreachable/
=cut
if( /^ICMP / )
{
$parsed = $ERRID{ICMPunreachable};
}elsif( /address not found/ ){
$parsed = $ERRID{unknownhost};
}elsif( /2 timeouts/ ){
$parsed = $ERRID{timeout};
}elsif( /[\s]+([\S]*) ms \(avg round trip time\)/ ){
$parsed = "$1" if( $1 ne "0.00" );
}else{
$parsed = $ERRID{unknown};
}
=pod
#this section of parsing is for linux ping hosts.
if( /100\% packet loss/ ){
$parsed = $ERRID{timeout};
}elsif( /Time to live exceeded/ ){ # ?
$parsed = $ERRID{ttlexceed};
}elsif( /unknown host/ ){
$parsed = $ERRID{unknownhost};
}elsif( /time=(.*)\s?ms/ ){
$parsed = "$1";
}elsif( /0 packets received/ ){
}else{
$parsed = $ERRID{unknown};
}
# This section of parsing is a bit wrong.. for freebsd hosts??
if( /0 packets received/ ){
$parsed = $ERRID{timeout};
}elsif( /Time to live exceeded/ ){
$parsed = $ERRID{ttlexceed};
}elsif( /time=(.*)\s?ms/ ){
$parsed = "$1";
}else{
$parsed = $ERRID{unknown};
}
=cut
}elsif( $type eq "bw" ){
/\s+(\S*)\s+([MK])bits\/sec/;
$parsed = $1;
if( $2 eq "M" ){
$parsed *= 1000;
if( /connect failed: Connection timed out/ ){
# this one shouldn't happen, if the timeout check done by
# bgmon is set low enough.
$parsed = $ERRID{timeout};
}elsif( /write1 failed:/ ){
$parsed = $ERRID{iperfHostUnreachable};
}elsif( /error: Name or service not known/ ){
$parsed = $ERRID{unknownhost};
}elsif( /\s+(\S*)\s+([MK])bits\/sec/ ){
$parsed = $1;
if( $2 eq "M" ){
$parsed *= 1000;
}
}else{
$parsed = $ERRID{unknown};
}
# print "parsed=$parsed\n";
}
......@@ -683,7 +775,7 @@ sub printTimeEvents {
sub saveTestToLocalDB($)
{
my ($results) = @_;
my %db;
my $filename = createDBfilename();
tie( %db, "DB_File", $filename ) or
......@@ -792,3 +884,4 @@ sub isMsgValid(\%)
}
return 1;
}
......@@ -10,7 +10,6 @@ use IO::Select;
require Exporter;
@ISA = "Exporter";
@EXPORT = qw (
%deadnodes
%ERRID
......@@ -26,16 +25,24 @@ require Exporter;
);
# These errors define specifics of when a measurement value cannot be
# reported due to some error in the network or at the remote end.
our %ERRID;
$ERRID{timeout} = -1;
$ERRID{ttlexceed} = -2; # was an error for "ping", but is not seen in fping.
$ERRID{unknown} = -3; # general error, which cannot be classified into others
$ERRID{unknownhost} = -4;
$ERRID{ICMPunreachable} = -5; #used for all ICMP errors (see fping.c for strs)
$ERRID{iperfHostUnreachable} = -6; #for iperf: error flagged by: write1 failed: Broken pipe
our %deadnodes;
my $socket;
my $sel = IO::Select->new();
my $port;
my $expid;
my %deadnodes;
my %ERRID;
$ERRID{timeout} = -1;
$ERRID{ttlexceed} = -2;
$ERRID{unknown} = -3;
sub setcmdport($)
......@@ -114,12 +121,10 @@ sub sendcmd($$)
PeerAddr => $node,
Timeout => 1 );
if( defined $socket ){
$sel->add($socket);
# $sel->add($socket);
print $socket "$sercmd\n";
#todo: wait for ack;
# timeout period?
$sel->add($socket);
my ($ready) = $sel->can_read(1);
my ($ready) = $sel->can_read(2); #timeout (seconds) (?)
if( defined($ready) && $ready eq $socket ){
my $ack = <$ready>;
# chomp $ack;
......
......@@ -43,22 +43,24 @@ my $duplicateKillThresh = 5; #kill node if this many duplicates occur per index
$| = 1;
sub usage {
print "Usage: $0 [-p receiveport] [-a sendport] [-e pid/eid] [-d] [-i]\n";
print "Usage: $0 [-p receiveport] [-a sendport] [-e pid/eid]".
" [-d debuglevel] [-i]\n";
return 1;
}
my $debug = 0;
my $impotent = 0;
my $debug = 0;
my ($port, $sendport, $expid);
my %opt = ();
if (!getopts("p:a:e:dih", \%opt)) {
if (!getopts("p:a:e:d:ih", \%opt)) {
exit &usage;
}
if ($opt{p}) { $port = $opt{p}; } else { $port = 5051; }
if ($opt{a}) { $sendport = $opt{a}; } else { $sendport = 5050; }
if ($opt{h}) { exit &usage; }
if ($opt{e}) { $expid = $opt{e}; } else { $expid = "none"; }
if ($opt{d}) { $debug = 1; }
if ($opt{d}) { $debug = $opt{d}; } else { $debug = 0; }
if ($opt{i}) { $impotent = 1; }
if (@ARGV !=0) { exit &usage; }
......@@ -133,13 +135,13 @@ sub handleincomingmsgs()
return;
}
print "\n";
print "\n" if( $debug > 1 );
print("linksrc=$linksrc\n".
"linkdest=$linkdest\n".
"testtype =$testtype\n".
"result=$result\n".
"index=$index\n".
"tstamp=$tstamp\n");
"tstamp=$tstamp\n") if( $debug > 1 );
if( defined $linksrc ){
my $socket_snd =
......@@ -153,7 +155,7 @@ sub handleincomingmsgs()
if( defined %ack && defined $socket_snd ){
my $ack_serial = serialize_hash( \%ack );
$socket_snd->send($ack_serial);
print "**SENT ACK**\n";
print "**SENT ACK**\n" if( $debug > 1 );
if( !defined $lasttimestamp{$linksrc}{$index} ||
$tstamp ne $lasttimestamp{$linksrc}{$index} )
......@@ -163,16 +165,19 @@ sub handleincomingmsgs()
testtype => $testtype,
result => $result,
tstamp => $tstamp );
#decrement duplicatecnt for corresponding result index
#clear duplicatecnt for corresponding result index
if( defined($duplicatecnt{$linksrc}{$index}) ){
$duplicatecnt{$linksrc}{$index}--
if( $duplicatecnt{$linksrc}{$index} > 0 );
}else{
$duplicatecnt{$linksrc}{$index} = 0;
delete $duplicatecnt{$linksrc}{$index};
}
}else{
print "++++++duplicate data\n";
print("++++++duplicate data\n".
"linksrc=$linksrc\n".
"linkdest=$linkdest\n".
"testtype =$testtype\n".
"result=$result\n".
"index=$index\n".
"tstamp=$tstamp\n") if( $debug > 0);
#increment duplicatecnt for this src and index number
if( defined($duplicatecnt{$linksrc}{$index}) ){
$duplicatecnt{$linksrc}{$index}++;
......@@ -181,7 +186,8 @@ sub handleincomingmsgs()
> $duplicateKillThresh )
{
killnode($linksrc);
print "KILLING OFF BGMON at $linksrc\n";
print "KILLING OFF BGMON at $linksrc".
" for index $index\n" if( $debug > 0 );
delete $duplicatecnt{$linksrc};
}
}else{
......@@ -297,7 +303,7 @@ sub SendBatchedInserts()
DBQueryWarn($insertions)
if (!$impotent);
print "$insertions\n"
if ($debug);
if ($debug > 2);
$lastinsert = time();
}
$batchsize = 0;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment