Commit 75842e6b authored by Dan Gebhardt's avatar Dan Gebhardt
Browse files

- ACK's timestamped. See bgmon.pl log for details.

- if a node cannot receive ACK's due to firewalled UDP, oprecv will kill
  off bgmon on that node. To discover this, opsrecv detects when a node
  sends many duplicates of the same data to which opsrecv has sent ACKs
  about in the past.
parent 1855869c
......@@ -13,6 +13,7 @@
use lib '/usr/testbed/lib';
use libtbdb;
use libwanetmon;
use Getopt::Std;
use strict;
use IO::Socket::INET;
......@@ -33,6 +34,8 @@ my $maxidletime= 5; # Seconds before forced insert.
my $lastinsert = 0; # Timestamp.
my %lasttimestamp; #prevents adding duplicate entries in DB
my %duplicatecnt; #counters keeping track of duplicate entries.
my $duplicateKillThresh = 5; #kill node if this many duplicates occur per index
#
# Turn off line buffering on output
......@@ -52,7 +55,7 @@ if (!getopts("p:a:e:dih", \%opt)) {
exit &usage;
}
if ($opt{p}) { $port = $opt{p}; } else { $port = 5051; }
if ($opt{p}) { $sendport = $opt{p}; } else { $sendport = 5050; }
if ($opt{a}) { $sendport = $opt{a}; } else { $sendport = 5050; }
if ($opt{h}) { exit &usage; }
if ($opt{e}) { $expid = $opt{e}; } else { $expid = "none"; }
if ($opt{d}) { $debug = 1; }
......@@ -84,12 +87,12 @@ my $socket_rcv = IO::Socket::INET->new( LocalPort => $port,
my $sel = IO::Select->new();
$sel->add($socket_rcv);
#############################################################################
# Note a difference from tbrecv.c - we don't yet have event_main() functional
# in perl, so we have to poll. (Nothing special about the select, it's just
# a wacky way to get usleep() )
#############################################################################
#main()
#
# MAIN LOOP
#
setcmdport(5052); #TODO: Parameterize this port number
setexpid($expid);
while (1) {
......@@ -111,22 +114,26 @@ exit(0);
sub handleincomingmsgs()
{
my $inmsg;
#check for pending received events
#check for pending received results
my @ready = $sel->can_read(1000);
foreach my $handle (@ready){
$socket_rcv->recv( $inmsg, 2048 );
# my %inhash = %{ Storable::thaw $inmsg};
my %inhash = %{ deserialize_hash( $inmsg )};
# foreach my $key (keys %inhash){
# print "key=$key\n";
# print "$key \t$inhash{$key}\n";
# }
my ($expid, $linksrc, $linkdest, $testtype, $result, $tstamp, $index)
my ($exp_in, $linksrc, $linkdest, $testtype, $result, $tstamp, $index)
= ($inhash{expid}, $inhash{linksrc}, $inhash{linkdest},
$inhash{testtype}, $inhash{result}, $inhash{tstamp},
$inhash{index});
# if incoming result is not of this expid, return
if( $exp_in ne $expid ){
return;
}
print "\n";
print("linksrc=$linksrc\n".
"linkdest=$linkdest\n".
"testtype =$testtype\n".
......@@ -141,12 +148,13 @@ sub handleincomingmsgs()
PeerAddr => "$linksrc");
my %ack = ( expid => $expid,
cmdtype => "ACK",
index => $index );
index => $index,
tstamp => $tstamp );
if( defined %ack && defined $socket_snd ){
my $ack_serial = serialize_hash( \%ack );
$socket_snd->send($ack_serial);
print "**SENT ACK**\n";
#=pod
if( !defined $lasttimestamp{$linksrc}{$index} ||
$tstamp ne $lasttimestamp{$linksrc}{$index} )
{
......@@ -155,11 +163,34 @@ sub handleincomingmsgs()
testtype => $testtype,
result => $result,
tstamp => $tstamp );
#decrement duplicatecnt for corresponding result index
if( defined($duplicatecnt{$linksrc}{$index}) ){
$duplicatecnt{$linksrc}{$index}--
if( $duplicatecnt{$linksrc}{$index} > 0 );
}else{
$duplicatecnt{$linksrc}{$index} = 0;
}
}else{
print "++++++duplicate data\n";
#increment duplicatecnt for this src and index number
if( defined($duplicatecnt{$linksrc}{$index}) ){
$duplicatecnt{$linksrc}{$index}++;
#kill off offending node, if > threshold
if( $duplicatecnt{$linksrc}{$index}
> $duplicateKillThresh )
{
killnode($linksrc);
print "KILLING OFF BGMON at $linksrc\n";
delete $duplicatecnt{$linksrc};
}
}else{
$duplicatecnt{$linksrc}{$index} = 1;
}
}
$lasttimestamp{$linksrc}{$index} = $tstamp;
#=cut
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment