Commit 1855869c authored by Dan Gebhardt
Browse files

- use a single Berkeley DB file to store all measurement buffer entries.

- added parameterized send rate of buffered entries.
- ACKs are checked for a timestamp to prevent old ACKs with the same index
  number from being interpreted as current.
- common routines have been moved to a .pm file.
- added 'die' API function to force bgmon to exit.
parent 8f313e23
......@@ -47,6 +47,8 @@ use strict;
use DB_File;
use IO::Socket::INET;
use IO::Select;
use libwanetmon;
$| = 1;
......@@ -57,9 +59,11 @@ sub usage {
#*****************************************
my $pollPer = 0.1; #number of seconds to sleep between poll loops
my %MAX_SIMU_TESTS = (latency => "10",
bw => "1");
my $iperfduration = 10;
my $cacheSendRate = 5; #number of cached results per second
my $iperfduration = 5; #length of each iperf test in seconds
my $iperftimeout = 30; #kill an iperf process lasting longer than this.
# percentage of testing period to wait after a test process abnormally exits
# note: 0.1 = 10%
......@@ -70,7 +74,6 @@ my %outsockets = (); #sockets keyed by dest node
#MARK_RELIABLE
# each result waiting to be acked has an id number and corresponding file
my $resultDBlimit = 100;
my %reslist = ();
my $magic = "0xDeAdBeAf";
my %testevents = ();
......@@ -84,10 +87,7 @@ my %testevents = ();
my %waitq = ( latency => [],
bw => [] );
my %ERRID;
$ERRID{timeout} = -1;
$ERRID{ttlexceed} = -2;
$ERRID{unknown} = -3;
#*****************************************
......@@ -100,7 +100,7 @@ getopts("s:a:p:e:d:i:h",\%opt);
my ($server, $cmdport, $cmdackport, $sendport, $ackport,$expid,$workingdir,$iperfport);
if ($opt{s}) { $server = $opt{s}; } else { $server = "ops"; }
if ($opt{c}) { $cmdport = $opt{c}; } else { $cmdport = 5060; }
if ($opt{c}) { $cmdport = $opt{c}; } else { $cmdport = 5052; }
if ($opt{a}) { $ackport = $opt{a}; } else { $ackport = 5050; }
if ($opt{p}) { $sendport = $opt{p}; } else { $sendport = 5051; }
if ($opt{e}) { $expid = $opt{e}; } else { $expid = "none"; }
......@@ -139,17 +139,6 @@ my $sel = IO::Select->new();
$sel->add($socket_cmd);
$sel->add($socket_ack);
#
# At startup, look for any old results that did not get acked. Add them to
# the reslist so they get resent below.
#
for (my $i = 0; $i < $resultDBlimit; $i++) {
if (-e createDBfilename($i)) {
$reslist{$i} = createDBfilename($i);
}
}
my $subtimer_reset = 50; # subtimer reaches 0 this many times thru poll loop
my $subtimer = $subtimer_reset; #decrement every poll-loop.
......@@ -160,45 +149,59 @@ sub handleincomingmsgs()
my $cmdHandle;
#check for pending received events
my @ready = $sel->can_read(0.1); #wait max of 0.1 sec. Don't want to
my @ready = $sel->can_read($pollPer); #wait max of 0.1 sec. Don't want to
#have 0 here, or CPU usage goes high
foreach my $handle (@ready){
my %sockIn;
if( $handle eq $socket_ack ){
$handle->recv( $inmsg, $rcvBufferSize );
%sockIn = %{ deserialize_hash($inmsg) };
}elsif( $handle eq $socket_cmd ){
my $cmdHandle = $handle->accept();
$inmsg = <$cmdHandle>;
chomp $inmsg;
%sockIn = %{ deserialize_hash($inmsg) };
if( !isMsgValid(\%sockIn) ){ return 0; }
print $cmdHandle "ACK\n";
close $cmdHandle;
}
# print "received msg: $inmsg\n";
my %sockIn = %{ deserialize_hash($inmsg) };
my $cmdtype = $sockIn{cmdtype};
if( !defined($cmdtype) ){
warn "bad message format\n";
return 0; #bad message
}
if( $sockIn{expid} ne $expid ){
return 0; #not addressed to this experiment
}
if( !isMsgValid(\%sockIn) ){ return 0; }
my $cmdtype = $sockIn{cmdtype};
if( $cmdtype eq "ACK" ){
my $index = $sockIn{index};
print time()." Ack for index $index. Deleting backup file\n";
if (exists($reslist{$index})) {
unlink($reslist{$index});
delete($reslist{$index});
}
my $tstamp =$sockIn{tstamp};
# delete cache entry if ACK tstamp matches the actual stored data
my %db;
my $filename = createDBfilename();
tie( %db, "DB_File", $filename) or
eval {
warn time()." cannot open db file";
return -1;
};
my %results = %{ deserialize_hash($db{$index}) };
print time()." Ack for index $index. Deleting cache entry\n";
delLocalDBEntry($index)
if( $tstamp eq $results{tstamp} );
}
elsif( $cmdtype eq "EDIT" ){
my $linkdest = $sockIn{dstnode};
my $testtype = $sockIn{testtype};
my $testper = $sockIn{testper};
$testevents{$linkdest}{$testtype}{"testper"} = $testper;
$testevents{$linkdest}{$testtype}{"flag_scheduled"} = 0;
$testevents{$linkdest}{$testtype}{"timeOfNextRun"} = time_all();
my $testev = \%{ $testevents{$linkdest}{$testtype} };
# $testev->{"limitTime"} = time_all()+10;
# TODO: Implement a limit on time of change
# need to save "baseline" testing period
$testev->{"limitTime"} = $sockIn{limitTime};
$testev->{"testper"} = $testper;
$testev->{"flag_scheduled"} = 0;
$testev->{"timeOfNextRun"} = time_all();
print time()." EDIT:\n";
print( "linkdest=$linkdest\n".
"testype =$testtype\n".
......@@ -210,7 +213,6 @@ sub handleincomingmsgs()
my @destnodes
= split(" ",$sockIn{destnodes});
my $testper = $sockIn{testper};
#TOOD: Add a start time offset, so as to schedule the initial test
#distribute start times from offset 0 to testper/2
my $offsetinc = 0;
if( (scalar @destnodes) != 0 ){
......@@ -218,20 +220,20 @@ sub handleincomingmsgs()
}
my $offset = 0;
foreach my $linkdest (@destnodes){
my $testev = \%{ $testevents{$linkdest}{$testtype} };
#be smart about adding tests
# don't want to change already running tests
# only change those tests which have been updated
if( defined($testevents{$linkdest}{$testtype}{"testper"}) &&
$testper == $testevents{$linkdest}{$testtype}{"testper"} )
if( defined($testev->{"testper"}) &&
$testper == $testev->{"testper"} )
{
# do nothing... keep test as it is
}else{
# update test
$testevents{$linkdest}{$testtype}{"testper"} =$testper;
$testevents{$linkdest}{$testtype}{"flag_scheduled"} =0;
$testev->{"testper"} =$testper;
$testev->{"flag_scheduled"} =0;
# TODO? be smart about when the first test should run?
$testevents{$linkdest}{$testtype}{"timeOfNextRun"} =
time_all() + $offset;
$testev->{"timeOfNextRun"} = time_all() + $offset;
$offset += $offsetinc;
}
}
......@@ -240,10 +242,11 @@ sub handleincomingmsgs()
elsif( $cmdtype eq "SINGLE" ){
my $linkdest = $sockIn{dstnode};
my $testtype = $sockIn{testtype};
$testevents{$linkdest}{$testtype}{"testper"} = 0;
$testevents{$linkdest}{$testtype}{"timeOfNextRun"} = time_all()-1;
$testevents{$linkdest}{$testtype}{"flag_scheduled"} = 1;
$testevents{$linkdest}{$testtype}{"pid"} = 0;
my $testev = \%{ $testevents{$linkdest}{$testtype} };
$testev->{"testper"} = 0;
$testev->{"timeOfNextRun"} = time_all()-1;
$testev->{"flag_scheduled"} = 1;
$testev->{"pid"} = 0;
print( time()." SINGLE\n".
"linkdest=$linkdest\n".
......@@ -254,6 +257,9 @@ sub handleincomingmsgs()
%testevents = ();
%waitq = ();
}
elsif( $cmdtype eq "DIE" ){
die "Received termination command. Exiting.\n";
}
}
}
......@@ -266,12 +272,13 @@ while (1) {
$subtimer--;
sendOldResults();
handleincomingmsgs();
#re-try wait Q every $subtimer_reset times through poll loop
#try to run tests on queue
if( $subtimer == 0 ){
sendOldResults();
if( $subtimer == 0 ){
foreach my $testtype (keys %waitq){
......@@ -292,9 +299,11 @@ while (1) {
for my $destaddr ( keys %testevents ) {
for my $testtype ( keys %{ $testevents{$destaddr} } ){
my $testev = \%{ $testevents{$destaddr}{$testtype} };
#mark flags of finished tests
#check if process is running
my $pid = $testevents{$destaddr}{$testtype}{"pid"};
my $pid = $testev->{"pid"};
if( $pid != 0 ){
use POSIX ":sys_wait_h";
my $kid = waitpid( $pid, &WNOHANG );
......@@ -302,19 +311,18 @@ while (1) {
{
if( $? == 0 ){
#process finished, so mark it's "finished" flag
$testevents{$destaddr}{$testtype}{"flag_finished"} = 1;
$testevents{$destaddr}{$testtype}{"pid"} = 0;
$testev->{"flag_finished"} = 1;
$testev->{"pid"} = 0;
# print "test $destaddr / $testtype finished\n";
}else{
#process exited abnormally
#reset pid
$testevents{$destaddr}{$testtype}{"pid"} = 0;
$testev->{"pid"} = 0;
#schedule next test at a % of a normal period from now
my $nextrun = time_all() +
$testevents{$destaddr}{$testtype}{"testper"} *
$testev->{"testper"} *
$TEST_FAIL_RETRY{$testtype};
$testevents{$destaddr}{$testtype}{"timeOfNextRun"} =
$nextrun;
$testev->{"timeOfNextRun"} = $nextrun;
#delete tmp filename
my $filename = createtmpfilename($destaddr, $testtype);
unlink($filename) or warn "can't delete temp file";
......@@ -324,9 +332,8 @@ while (1) {
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => "$ERRID{unknown}",
"tstamp" =>
$testevents{$destaddr}{$testtype}{"tstamp"},
"result" => $ERRID{unknown},
"tstamp" => $testev->{"tstamp"},
"magic" => "$magic",
);
# sendResults(\%results, 0); #0 is for a generic index
......@@ -338,7 +345,7 @@ while (1) {
# been running too long (iperf has a looong timeout)
elsif( $testtype eq "bw" &&
time_all() >
$testevents{$destaddr}{$testtype}{"tstamp"} +
$testev->{"tstamp"} +
$iperftimeout )
{
kill 'TERM', $pid;
......@@ -346,8 +353,8 @@ while (1) {
"pid=$pid\n";
# NEW: (not tested below) - send an "outage" result
$testevents{$destaddr}{$testtype}{"pid"} = 0;
$testevents{$destaddr}{$testtype}{"flag_scheduled"} = 0;
$testev->{"pid"} = 0;
$testev->{"flag_scheduled"} = 0;
#delete tmp filename
my $filename = createtmpfilename($destaddr, $testtype);
unlink($filename) or warn "can't delete temp file";
......@@ -355,9 +362,8 @@ while (1) {
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => "$ERRID{timeout}",
"tstamp" =>
$testevents{$destaddr}{$testtype}{"tstamp"},
"result" => $ERRID{timeout},
"tstamp" => $testev->{"tstamp"},
"magic" => "$magic",
);
sendResults(\%results, 0); #0 is for a generic index
......@@ -367,7 +373,7 @@ while (1) {
}
#check for finished events
if( $testevents{$destaddr}{$testtype}{"flag_finished"} == 1 ){
if( $testev->{"flag_finished"} == 1 ){
#read raw results from temp file
my $filename = createtmpfilename($destaddr, $testtype);
open FILE, "< $filename"
......@@ -381,16 +387,16 @@ while (1) {
unlink($filename) or die "can't delete temp file";
#parse raw data
my $parsedData = parsedata($testtype,$raw);
$testevents{$destaddr}{$testtype}{"results_parsed"} =
$parsedData;
$testev->{"results_parsed"} = $parsedData;
my %results =
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => $parsedData,
"tstamp" => $testevents{$destaddr}{$testtype}{"tstamp"},
"tstamp" => $testev->{"tstamp"},
"magic" => "$magic",
"ts_finished" => time()
);
#MARK_RELIABLE
......@@ -401,28 +407,25 @@ while (1) {
sendResults(\%results, $index);
#reset flags
$testevents{$destaddr}{$testtype}{"flag_finished"} = 0;
$testevents{$destaddr}{$testtype}{"flag_scheduled"} = 0;
$testev->{"flag_finished"} = 0;
$testev->{"flag_scheduled"} = 0;
}
#schedule new tests
if( $testevents{$destaddr}{$testtype}{"flag_scheduled"} == 0 &&
$testevents{$destaddr}{$testtype}{"testper"} > 0 )
if( $testev->{"flag_scheduled"} == 0 && $testev->{"testper"} > 0 )
{
if( time_all() <
$testevents{$destaddr}{$testtype}{"timeOfNextRun"}
+ $testevents{$destaddr}{$testtype}{"testper"} )
$testev->{"timeOfNextRun"} + $testev->{"testper"} )
{
#if time of next run is in the future, set it to that
$testevents{$destaddr}{$testtype}{"timeOfNextRun"}
+= $testevents{$destaddr}{$testtype}{"testper"};
$testev->{"timeOfNextRun"} += $testev->{"testper"};
}else{
#if time of next run is in the past, set to current time
$testevents{$destaddr}{$testtype}{"timeOfNextRun"}
$testev->{"timeOfNextRun"}
= time_all();
}
$testevents{$destaddr}{$testtype}{"flag_scheduled"} = 1;
$testev->{"flag_scheduled"} = 1;
}
# print "nextrun="
......@@ -432,10 +435,9 @@ while (1) {
# print "pid=".$testevents{$destaddr}{$testtype}{"pid"}."\n";
#check for new tests ready to run
if( $testevents{$destaddr}{$testtype}{"timeOfNextRun"}
<= time_all() &&
$testevents{$destaddr}{$testtype}{"flag_scheduled"} == 1 &&
$testevents{$destaddr}{$testtype}{"pid"} == 0 )
if( $testev->{"timeOfNextRun"} <= time_all() &&
$testev->{"flag_scheduled"} == 1 &&
$testev->{"pid"} == 0 )
{
#run test
spawnTest( $destaddr, $testtype );
......@@ -458,7 +460,7 @@ while (1) {
}
my $iterSinceLastRun = 0; #handles when cacheSendRate < 1/pollPer
sub sendOldResults()
{
#
......@@ -467,55 +469,50 @@ sub sendOldResults()
# acked because the network is slow or down.
#
my $count = 0;
my $maxcount = 10; # Wake up and send only this number at once.
for (my $index = 0; $index < $resultDBlimit; $index++) {
next
if (!exists($reslist{$index}));
my $filename = $reslist{$index};
# Wake up and send only this number at once.
my $maxcount = int( $pollPer*$cacheSendRate );
if (! -e $filename) {
# Hmm, something went wrong!
delete($reslist{$index});
next;
}
# Stat file to get create time.
my (undef,undef,undef,undef,undef,undef,undef,undef,
undef,undef,$ctime) = stat($filename);
my %db;
my $filename = createDBfilename();
tie( %db, "DB_File", $filename) or
eval {
warn time()." cannot open db file";
return -1;
};
next
if ((time() - $ctime) < 10);
my @ids = keys %db;
#resend
my %results;
my %db;
tie(%db, "DB_File", $filename)
or die "cannot open db file";
for my $key (keys %db ){
$results{$key} = $db{$key};
$iterSinceLastRun++;
while( ($count < $maxcount && $count < scalar(@ids))
|| ($iterSinceLastRun > (1/($pollPer*$cacheSendRate))) )
{
if( scalar(@ids) == 0 ){
last;
}
untie(%db);
# Verify results in case the file was scrogged.
my $index = $ids[int( rand scalar(@ids) )];
my %results = %{ deserialize_hash($db{$index}) };
if (!exists($results{"magic"}) || $results{"magic"} ne $magic) {
# Hmm, something went wrong!
print "Old results for index $index are scrogged; deleting!\n";
delete($reslist{$index});
unlink($filename);
next;
delLocalDBEntry($index);
@ids = keys %db;
# next;
}
sendResults(\%results, $index);
sleep(1);
$count++;
if ($count > $maxcount) {
# print "Delaying a bit before sending more old results!\n";
# sleep(2);
last;
}
$iterSinceLastRun = 0;
#don't send recently completed tests
if( time() - $results{ts_finished} < 10 ){ next; }
print "sending old result: $index\n";
sendResults(\%results, $index);
}
untie %db;
#TODO?: combine re-sends into a large message?
}
#############################################################################
......@@ -685,34 +682,50 @@ sub printTimeEvents {
#MARK_RELIABLE
sub saveTestToLocalDB($)
{
my ($results) = @_;
my %db;
my $filename = createDBfilename();
tie( %db, "DB_File", $filename ) or
eval {
warn time()." cannot create db file";
return -1;
};
#
# Find an unused index. Leave zero unused to indicate we ran out.
#
my $index;
for ($index = 1; $index < $resultDBlimit; $index++) {
last
if (!exists($reslist{$index}));
if( !defined($db{$index}) );
}
return 0
if ($index == $resultDBlimit);
#save result to DB's in files.
my $results = $_[0];
$db{$index} = serialize_hash($results);
untie %db;
return $index;
}
sub delLocalDBEntry($)
{
my ($index) = @_;
my %db;
my $filename = createDBfilename($index);
tie( %db, "DB_File", $filename ) or
my $filename = createDBfilename();
tie( %db, "DB_File", $filename) or
eval {
warn time()." cannot create db file";
warn time()." cannot open db file";
return -1;
};
for my $key (keys %$results ){
$db{$key} = $$results{$key};
};
if( defined $db{$index} ){
delete $db{$index};
}
untie %db;
$reslist{$index} = createDBfilename($index);
return $index;
untie %db;
}
#############################################################################
......@@ -732,18 +745,6 @@ sub sendResults($$){
$socket_snd->send($result_serial);
}
#############################################################################
sub time_all()
{
package main;
require 'sys/syscall.ph';
my $tv = pack("LL",());
syscall( &SYS_gettimeofday, $tv, undef ) >=0
or warn "gettimeofday: $!";
my ($sec, $usec) = unpack ("LL",$tv);
return $sec + ($usec / 1_000_000);
# return time();
}
#############################################################################
......@@ -752,9 +753,10 @@ sub createtmpfilename($$)
return "$workingdir$_[0]-$_[1].tmp";
}
sub createDBfilename($)
sub createDBfilename()
{
return "$workingdir$_[0].bgmonbdb";
# return "$workingdir$_[0].bgmonbdb";
return "$workingdir"."cachedResults.bdb";
}
......@@ -777,37 +779,16 @@ sub detectHang($)
}
#############################################################################
#
# Custom sub to turn a hash into a string. Hashes must not contain
# the substring of $separator anywhere!!!
#
sub serialize_hash($)
sub isMsgValid(\%)
{
my ($hashref) = @_;
my %hash = %$hashref;
my $separator = "::";
my $out = "";
for my $key (keys %hash){
$out .= $separator if( $out ne "" );
$out .= $key.$separator.$hash{$key};
my %msgHash = %$hashref;
if( !defined($msgHash{cmdtype}) ){
warn "bad message format\n";
return 0; #bad message
}
return $out;
}
sub deserialize_hash($)
{
my ($string) = @_;
my $separator = "::";
my %hashout;
my @tokens = split( /$separator/, $string );
for( my $i=0; $i<@tokens; $i+=2 ){
$hashout{$tokens[$i]} = $tokens[$i+1];
# print "setting $tokens[$i] => $tokens[$i+1]\n";
if( $msgHash{expid} ne $expid ){
return 0; #not addressed to this experiment
}
return \%hashout;
return 1;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment