Commit de676b89 authored by Leigh B. Stoller
Browse files

Some reliability changes.

* opsrecv sends an ack back to bgmon for each packet it receives.

* Rework caching code a bit, and rearrange slightly to deal with acks.

Note that opsrecv does not deal with duplicates yet ... still thinking
about that, but it actually does not matter too much if the info gets
duplicated in the DP; multiple entries with the same data and the same
timestamp *should* be harmless.
parent a5778049
......@@ -34,6 +34,7 @@ use event;
use Getopt::Std;
use strict;
use DB_File;
use Socket;
sub usage {
warn "Usage: $0 [hostname] -d [workingdir]\n";
......@@ -50,8 +51,9 @@ my %TEST_FAIL_RETRY= (latency => 0.3,
bw => 0.1);
#MARK_RELIABLE
# each result waiting to be acked has an id number and corresponding file
my $resultDBlimit = 100;
my $resIndex = 0;
my $resultDBlimit = 1000;
my %reslist = ();
my $magic = "0xDeAdBeAf";
my %testevents = ();
......@@ -106,6 +108,19 @@ if (!event_subscribe($handle,\&callbackFunc,$tuple)) {
die "Could not subscribe to event\n";
}
# This is for our ack from ops.
$tuple = address_tuple_alloc();
if (!$tuple) { die "Could not allocate an address tuple\n"; }
%$tuple = ( objname => "ops",
eventtype => "ACK",
expt => "__none",
objtype => "BGMON");
if (!event_subscribe($handle,\&callbackFunc,$tuple)) {
die "Could not subscribe to ack event\n";
}
#this call will reconnect event system if it has failed
sendBlankNotification();
......@@ -116,6 +131,16 @@ sendBlankNotification();
#############################################################################
#main()
#
# At startup, look for any old results that did not get acked. Add them to
# the reslist so they get resent below.
#
for (my $i = 0; $i < $resultDBlimit; $i++) {
if (-e createDBfilename($i)) {
$reslist{$i} = createDBfilename($i);
}
}
while (1) {
#check for pending received events
......@@ -188,14 +213,14 @@ while (1) {
"destaddr" => $destaddr,
"testtype" => $testtype,
"result" => $parsedData,
"tstamp" => $testevents{$destaddr}{$testtype}
{"tstamp"} );
"tstamp" => $testevents{$destaddr}{$testtype}{"tstamp"},
"magic" => "$magic",
);
#MARK_RELIABLE
#save result to local DB
saveTestToLocalDB(\%results);
my $index = saveTestToLocalDB(\%results);
#send result to remote DB
sendResults(\%results, $resIndex);
$resIndex = ($resIndex+1) % $resultDBlimit;
sendResults(\%results, $index);
#reset flags
$testevents{$destaddr}{$testtype}{"flag_finished"} = 0;
......@@ -253,24 +278,60 @@ while (1) {
}
}
#MARK_RELIABLE
#check for results that could not be sent due to error
# TODO: horribly inefficient...
for( my $i=0; $i < $resultDBlimit; $i++ ){
if( -e createDBfilename($i) ){
#resend
my %results;
my %db;
tie( %db, "DB_File", createDBfilename($i) )
or die "cannot open db file";
for my $key (keys %db ){
$results{$key} = $db{$key};
}
untie %db;
sendResults(\%db,$i);
#
# Check for results that could not be sent due to error. We want to wait
# a little while though, to avoid resending data that has yet to be
# acked because the network is slow or down.
#
my $count = 0;
my $maxcount = 5; # Wake up and send only this number at once.
for (my $index = 0; $index < $resultDBlimit; $index++) {
next
if (!exists($reslist{$index}));
my $filename = $reslist{$index};
if (! -e $filename) {
# Hmm, something went wrong!
delete($reslist{$index});
next;
}
}
# Stat file to get its ctime (inode change time, not a true creation time).
my (undef,undef,undef,undef,undef,undef,undef,undef,
undef,undef,$ctime) = stat($filename);
next
if ((time() - $ctime) < 10);
#resend
my %results;
my %db;
tie(%db, "DB_File", $filename)
or die "cannot open db file";
for my $key (keys %db ){
$results{$key} = $db{$key};
}
untie(%db);
# Verify results in case the file was scrogged.
if (!exists($results{"magic"}) || $results{"magic"} ne $magic) {
# Hmm, something went wrong!
print "Old results for index $index are scrogged; deleting!\n";
delete($reslist{$index});
unlink($filename);
next;
}
sendResults(\%results, $index);
sleep(1);
$count++;
if ($count > $maxcount) {
print "Delaying a bit before sending more old results!\n";
sleep(2);
last;
}
}
}
#############################################################################
......@@ -299,7 +360,19 @@ sub callbackFunc($$$) {
# print "EVENT: $time $objtype $eventtype\n";
# Ack from ops.
if ($eventtype eq "ACK") {
my $index = event_notification_get_string($handle,
$notification,
"index");
print "Ack for index $index. Deleting backup file\n";
if (exists($reslist{$index})) {
unlink($reslist{$index});
delete($reslist{$index});
}
return;
}
#change values and/or initialize
if( $eventtype eq "EDIT" ){
......@@ -535,15 +608,30 @@ sub printTimeEvents {
#MARK_RELIABLE
#
# saveTestToLocalDB(\%results)
#
# Write one test result into its own DB_File-backed file and register that
# file in %reslist under a newly chosen index.
#
#   $_[0]  - reference to a hash of result fields; every key/value pair is
#            copied into the DB file.
#
# Returns the chosen index (1 .. $resultDBlimit-1), or 0 when every index in
# that range already exists in %reslist; in that case nothing is written.
# Dies if the DB file cannot be tied.
#
sub saveTestToLocalDB($)
{
#
# Find an unused index. Leave zero unused to indicate we ran out.
#
my $index;
for ($index = 1; $index < $resultDBlimit; $index++) {
last
if (!exists($reslist{$index}));
}
# The loop ran to completion, so no free slot was found.
return 0
if ($index == $resultDBlimit);
#save result to DB's in files.
my $results = $_[0];
my %db;
# NOTE(review): $filename is declared twice below, first from $resIndex and
# then from $index. This looks like the removed and added lines of the diff
# shown together; the second declaration is the one in effect -- confirm
# against the real file.
my $filename = createDBfilename($resIndex);
my $filename = createDBfilename($index);
tie( %db, "DB_File", $filename ) or die "cannot create db file";
# Copy every field of the result into the tied hash (i.e. into the file).
for my $key (keys %$results ){
$db{$key} = $$results{$key};
}
untie %db;
# Register the file under its index in %reslist.
$reslist{$index} = createDBfilename($index);
return $index;
}
#############################################################################
......@@ -551,8 +639,6 @@ sub sendResults($$){
my $results = $_[0];
my $index = $_[1];
my $f_success = 1; #flag to indicate error during send
my $tuple_res = address_tuple_alloc();
if (!$tuple_res) { warn "Could not allocate an address tuple\n"; }
......@@ -597,27 +683,17 @@ sub sendResults($$){
"tstamp",
$results->{tstamp} ) )
{ warn "Could not add attribute to notification\n"; }
#MARK_RELIABLE
#send notification, and check for send error from event system
if (!event_notify($handle, $notification_res)) {
warn("could not send test event notification");
$f_success = 0;
}
if( 0 == event_notification_put_string( $handle,
$notification_res,
"index",
"$index" ) )
{ warn "Could not add attribute to notification\n"; }
# if (!event_notify(undef, $notification_res)) {
# warn("could not send test event notification");
# $f_success = 0;
# }
print "Sending results to ops. Index: $index\n";
#MARK_RELIABLE
#check for successful send
if( $f_success == 1 ){
#delete file of event result
print " successful send: ";
print "$results->{testtype}=$results->{result}";
unlink( createDBfilename($index) );
if (!event_notify($handle, $notification_res)) {
warn("could not send test event notification");
}
if( event_notification_free( $handle, $notification_res ) == 0 ){
......
......@@ -18,11 +18,15 @@ use libtbdb;
use event;
use Getopt::Std;
use strict;
use Socket;
# node and site id caches
my %nodeids;
my %siteids;
# ipaddr cache, since nodes are addressed via IP in the event system.
my %ipaddrs;
# Batch up insertions. Simple string.
my $insertions = "";
my $batchsize = 0;
......@@ -154,9 +158,9 @@ sub callbackFunc($$$) {
my $tstamp = event_notification_get_string($handle,
$notification,
"tstamp");
# my $scheduler = event_notification_get_string($handle,
# $notification,
# "SCHEDULER");
my $index = event_notification_get_string($handle,
$notification,
"index");
#change values and/or initialize
if ( $debug && $eventtype eq "RESULT" ){
......@@ -167,6 +171,7 @@ sub callbackFunc($$$) {
"linkdest=$linkdest\n".
"testtype =$testtype\n".
"result=$result\n".
"index=$index\n".
"tstamp=$tstamp\n")
if ($debug);
......@@ -176,11 +181,46 @@ sub callbackFunc($$$) {
result => $result,
tstamp => $tstamp );
if (!exists($ipaddrs{$linksrc})) {
my (undef,undef,undef,undef,@ips) = gethostbyname("$linksrc");
if (!@ips) {
warn "Could not map $linksrc to its ipaddr\n";
return;
}
$ipaddrs{$linksrc} = inet_ntoa($ips[0]);
}
my $ipaddr = $ipaddrs{$linksrc};
my $tuple = address_tuple_alloc();
if (!$tuple) {
warn "Could not allocate an address tuple for reply to $linksrc\n";
return;
}
%$tuple = ( objtype => "BGMON",
objname => "ops",
eventtype => "ACK",
expt => "__none",
host => "$ipaddr",
);
my $reply_notification = event_notification_alloc($handle, $tuple);
if (!$reply_notification) {
warn "Could not allocate notification for reply to $linksrc\n";
return;
}
# So the sender knows which one we actually got.
event_notification_put_string($handle,
$reply_notification, "index", "$index");
# if (event_unregister($handle) == 0) {
# die "Unable to unregister with event system\n";
# }
# exit(0);
print "Sending ack event to $ipaddr\n"
if ($debug);
if (!event_notify($handle, $reply_notification)) {
warn("could not send reply notification to $linksrc");
}
event_notification_free($handle, $reply_notification);
}
#############################################################################
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment