Commit 385c0a40 authored by Dan Gebhardt's avatar Dan Gebhardt
Browse files

reliable commands implemented.

parent 469b9924
#!/usr/bin/perl -w
use strict;
use lib '/usr/testbed/lib';
use libxmlrpc;
use event;
use English;
use Getopt::Std;
use IO::Socket::INET;
use IO::Select;
use strict;
my $constrFilename;
my $bwdutycycle;
my ($constrFilename, $expid, $bwdutycycle, $port);
my $numsites;
my %test_per = ( # defaults
"latency" => 300,
......@@ -21,10 +19,11 @@ my @constrnodes; #test constrained to these nodes
my %sitenodes; #hash listing all sites => nodes
my $CPUUSAGETHRESHOLD = 10; #should help prevent flip-flopping between
#"best" nodes at a site
my $SITEDIFFTHRESHOLD = 3; #number of site differences between period
my $SITEDIFFTHRESHOLD = 1; #number of site differences between period
#calculations that trigger an update
my $IPERFDURATION = 10; #duration in seconds of iperf test
my %allnodes;
my %deadnodes;
# RPC STUFF ##############################################
my $TB = "/usr/testbed";
......@@ -38,11 +37,11 @@ my $METHOD = "getlist";
# END RPC STUFF ##########################################
sub usage
{
warn "Usage: $0 [-BLP] [-f constraint file] [-l latency test period]".
" <bandwidth duty cycle>".
" [-e pid/eid]".
" <bandwidth duty cycle 0-1>".
# " <number of sites or \"all\">\n".
"where: -B = Do not measure bandwidth\n".
" -L = Do not measure latency\n".
......@@ -50,34 +49,30 @@ sub usage
return 1;
}
my %opt = ();
getopts("B:L:P:f:l:e:s:p",\%opt);
#TODO: other options
if( $opt{f}) { $constrFilename = $opt{f}; }
if( $opt{l}) { $test_per{latency} = $opt{l}; }
if ($opt{e}) { $expid = $opt{e}; } else { $expid = "none"; }
if ($opt{p}) { $port = $opt{p}; } else{ $port = 5060; }
if( !defined $ARGV[0] ){
exit &usage;
}
$bwdutycycle = $ARGV[0];
#if( !defined $ARGV[1] ){
# exit &usage;
#}
#$numsites = $ARGV[1];
my %opt;
getopt("B:L:P:f:l:s:p",\%opt,);
if( $opt{f}) { $constrFilename = $opt{f}; }
if( $opt{l}) { $test_per{latency} = $opt{l}; }
#TODO: other options
my ($server,$port);
if ($opt{s}) { $server = $opt{s}; } else { $server = "localhost"; }
if ($opt{p}) { $port = $opt{p}; }
my $URL = "elvin://$server";
if ($port) { $URL .= ":$port"; }
my $lastupdated_numnodes = 0;
my $handle = event_register($URL,0);
if (!$handle) { die "Unable to register with event system\n"; }
my $socket;
my $sel = IO::Select->new();
my $tuple = address_tuple_alloc();
if (!$tuple) { die "Could not allocate an address tuple\n"; }
sub stopnode($);
print "exp = $expid\n";
#############################################################################
#
# Initialization
......@@ -87,8 +82,22 @@ libxmlrpc::Config({"server" => $RPCSERVER,
# "cert" => $RPCCERT,
"portnum" => $RPCPORT});
my $lastupdated_numnodes = 0;
# TODO: Stop all nodes in constraint set
# Stop all nodes in constraint set
open FILE, "< $constrFilename"
or die "cannot open file $constrFilename";
while( <FILE> ){
chomp;
if( $_ =~ m/plab/ ){
push @constrnodes, $_;
print "$_\n";
}
}
close FILE;
foreach my $node (@constrnodes){
# print "stopping $node\n";
stopnode($node);
}
###########################################################
#
......@@ -100,6 +109,7 @@ while(1)
getnodeinfo();
choosenodes();
modifytests();
# printchosennodes();
select(undef, undef, undef, 5.0);
}
......@@ -112,14 +122,11 @@ sub getnodeinfo
#retrieve list of nodes
my $rval = libxmlrpc::CallMethod($MODULE, $METHOD,
{"class" => "pcplabphys"});
print "here";
%allnodes = %$rval;
#populate sitenodes
foreach my $node (keys %allnodes){
my $siteid = $allnodes{$node}{site};
# print $siteid;
# print $node;
push @{$sitenodes{$siteid}}, $node;
# print @{$sitenodes{$siteid}}."\n";
}
......@@ -134,25 +141,58 @@ sub getnodeinfo
sub choosenodes
{
foreach my $site (keys %sitenodes){
# print "site $site\n";
my $bestnode = choosebestnode($site);
if( "NONE" eq $bestnode ){
# ** This section handles when a site has no nodes available
#no available node at this site, so remove site from hash
#TODO: send "stop" signals to all other nodes having this
# node as the destination
#(done?)TODO: send "stop" signals to all other nodes having this
# site as the destination
foreach my $srcsite (keys %intersitenodes){
if( defined $intersitenodes{$site} ){
stoppairtest( $intersitenodes{$srcsite},
$intersitenodes{$site} );
}
}
delete $intersitenodes{$site};
}
else{
if( !defined $intersitenodes{$site} ||
$intersitenodes{$site} ne $bestnode )
if( (!defined $intersitenodes{$site} ||
$intersitenodes{$site} ne $bestnode)
#&& isnodeinconstrset($bestnode)
)
{
# ** This section handles when a "bestnode" at a site changes
# TODO: Stop sigs to other nodes using old "bestnode" value
#set new node to represent this site
#(done?)TODO
# Stop sigs to other nodes using old "bestnode" value
if( defined $intersitenodes{$site} ){
foreach my $srcsite (keys %intersitenodes){
stoppairtest( $intersitenodes{$srcsite},
$intersitenodes{$site} );
}
}
#set new node to represent this site
$intersitenodes{$site} = $bestnode;
# TODO: start other nodes using this new "bestnode"
# (This uses the EDIT bgmon signal - see bgmon.pl)
#(done?)TODO: start other nodes using this new "bestnode"
# (This uses the EDIT bgmon command - see bgmon.pl)
foreach my $srcsite (keys %intersitenodes){
edittest( $intersitenodes{$srcsite},
$intersitenodes{$site},
$test_per{bw},
"bw" );
}
#TODO: need to do this smartly...
=pod
edittest( $intersitenodes{$srcsite},
$intersitenodes{$site},
$test_per{latency},
"latency" );
=cut
}
}
}
......@@ -165,7 +205,6 @@ sub choosenodes
sub modifytests
{
my $numsites = scalar(keys %intersitenodes);
my $bwper = ($numsites - 1) * $IPERFDURATION * 1/$bwdutycycle;
#TODO: ?? dynamically change latency period, too?
......@@ -189,8 +228,8 @@ sub printchosennodes
{
foreach my $node (values %intersitenodes){
print "site: ". $allnodes{$node}{site} . " = $node\n";
#TODO:: why does this give an error?
print "node = $node\n";
# #TODO:: why does this give an error?
# print "node = $node\n";
}
}
......@@ -198,6 +237,7 @@ sub printchosennodes
sub choosebestnode($)
{
my ($site) = @_;
my $bestnode = "NONE"; #default to an error value
=pod
print "$site ";
......@@ -207,7 +247,17 @@ sub choosebestnode($)
print "\n";
=cut
foreach my $node ( @{$sitenodes{$site}} ){
=pod
if(isnodeinconstrset($node)){
print "node $node is in constr set ";
if( $allnodes{$node}{free} == 1 ){
print "and free";
}
print "\n";
}
=cut
if( $allnodes{$node}{free} == 1 && isnodeinconstrset($node) ) {
# print "choosing best node for site $site\n";
#first time thru loop...
if( $bestnode eq "NONE" ){
#set this to be best node
......@@ -221,7 +271,7 @@ sub choosebestnode($)
}
}
}
print "bestnode for $site = $bestnode\n";
# print "bestnode for $site = $bestnode\n";
return $bestnode;
}
......@@ -231,11 +281,16 @@ sub isnodeinconstrset($)
my ($node) = @_;
#if constraint set is empty, return true
if( ! defined(@constrnodes) ){
if( @constrnodes == 0 ){
return 1;
}else{
#TODO: check if node exists in contraint set
return -1;
#check if node exists in contraint set
foreach my $cnode (@constrnodes){
if( $node eq $cnode ){
return 1;
}
}
return 0;
}
}
......@@ -252,7 +307,9 @@ sub updateTests
#init bandwidth
foreach my $srcsite (keys %intersitenodes){
$srcnode = $intersitenodes{$srcsite};
foreach my $destsite (%intersitenodes){
$bw_destnodes = "";
foreach my $destsite (keys %intersitenodes){
# print "looking at site $destsite\n";
if( defined $intersitenodes{$destsite}) {
$destnode = $intersitenodes{$destsite};
}
......@@ -261,7 +318,7 @@ sub updateTests
$bw_destnodes .= " ".$destnode;
}
}
# initnode($srcnode, $bw_destnodes, $test_per{bw}, "bw");
initnode($srcnode, $bw_destnodes, $test_per{bw}, "bw");
#TODO! Distribute initialization times evenly
}
......@@ -269,27 +326,41 @@ sub updateTests
#TODO: LATENCY
}
#
# Stop all tests from a node
#
sub stopnode($)
{
my ($node) = @_;
if( isnodeinconstrset($node) ){
my %cmd = ( expid => $expid,
cmdtype => "STOPALL" );
sendcmd($node,\%cmd);
}
}
#
# Stops all nodes in constraint set
#
sub stopall()
#
sub edittest($$$$)
{
foreach my $node (values %sitenodes){
if( isnodeinconstrset($node) ){
%$tuple = ( objtype => "BGMON",
objname => $node,
eventtype => "STOPALL",
expt => "__none" );
my $notification = event_notification_alloc($handle,$tuple);
if (!$notification) { die "Could not allocate notification\n"; }
#send notification
if (!event_notify($handle, $notification)) {
die("could not send test event notification");
}
}
my ($srcnode, $destnode, $testper, $testtype) = @_;
if ($srcnode eq $destnode ){
return -1;
}
print "editing test: $srcnode\n".
" $destnode\n".
" $testtype\n".
" $testper\n";
my %cmd = ( expid => $expid,
cmdtype => "EDIT",
dstnode => $destnode,
testtype => $testtype,
testper => $testper );
sendcmd($srcnode,\%cmd);
}
#
......@@ -298,45 +369,13 @@ sub stopall()
sub stoppairtest($$)
{
my ($srcnode, $destnode) = @_;
print "stopping pair tests from $srcnode to $destnode\n";
my $testper = 0;
my @testtypes = ("latency","bw");
%$tuple = ( objtype => "BGMON",
objname => $srcnode,
eventtype => "EDIT",
expt => "__none" );
foreach my $testtype(@testtypes){
my $notification = event_notification_alloc($handle,$tuple);
if (!$notification) { die "Could not allocate notification\n"; }
#add destination nodes attribute
if( 0 == event_notification_put_string( $handle,
$notification,
"linkdest",
$destnode ) )
{ warn "Could not add attribute to notification\n"; }
#add tests and their default values
if( 0 == event_notification_put_string( $handle,
$notification,
"testper",
$testper ) )
{ warn "Could not add attribute to notification\n"; }
#add test type
if( 0 == event_notification_put_string( $handle,
$notification,
"testtype",
$testtype ) )
{ warn "Could not add attribute to notification\n"; }
#send notification
if (!event_notify($handle, $notification)) {
die("could not send test event notification");
}
foreach my $testtype (@testtypes){
edittest($srcnode, $destnode, 0, $testtype);
}
print "stopping pair tests from $srcnode to $destnode\n";
}
#
......@@ -346,41 +385,97 @@ sub initnode($$$$)
{
my ($node, $destnodes, $testper, $testtype) = @_;
print "SENDING INIT: *$node*$destnodes*$testper*$testtype*\n";
%$tuple = ( objtype => "BGMON",
objname => $node,
eventtype => "INIT",
expt => "__none" );
my $notification = event_notification_alloc($handle,$tuple);
if (!$notification) { die "Could not allocate notification\n"; }
#add destination nodes attribute
if( 0 == event_notification_put_string( $handle,
$notification,
"destnodes",
$destnodes ) )
{ warn "Could not add attribute to notification\n"; }
#add tests and their default values
if( 0 == event_notification_put_string( $handle,
$notification,
"testper",
$testper ) )
{ warn "Could not add attribute to notification\n"; }
#add test type
if( 0 == event_notification_put_string( $handle,
$notification,
"testtype",
$testtype ) )
{ warn "Could not add attribute to notification\n"; }
#send notification
if (!event_notify($handle, $notification)) {
die("could not send test event notification");
}
my %cmd = ( expid => $expid,
cmdtype => "INIT",
destnodes => $destnodes,
testtype => $testtype,
testper => $testper
);
sendcmd($node,\%cmd);
print "sent initnode to $node\n";
print "destnodes = $destnodes\n";
print "testper = $testper\n";
print "testtype = $testtype\n";
}
sub sendcmd($$)
{
my $node = $_[0];
my $hashref = $_[1];
my %cmd = %$hashref;
my $sercmd = serialize_hash( \%cmd );
my $f_success = 0;
my $max_tries = 5;
do{
$socket = IO::Socket::INET->new( PeerPort => $port,
Proto => 'tcp',
PeerAddr => $node );
$sel->add($socket);
if( defined $socket ){
print $socket "$sercmd\n";
#todo: wait for ack;
# timeout period?
$sel->add($socket);
my ($ready) = $sel->can_read(1);
if( defined $ready && $ready eq $socket ){
my $ack = <$ready>;
chomp $ack;
if( $ack eq "ACK" ){
$f_success = 1;
# print "Got ACK from $node for command\n";
close $socket;
}else{
$max_tries--;
}
}
$sel->remove($socket);
close($socket);
}else{
select(undef, undef, undef, 0.2);
$max_tries--;
}
}while( $f_success != 1 && $max_tries != 0 );
if( $max_tries == 0 ){
$deadnodes{$node} = 1;
}
outputErrors();
}
sub outputErrors()
{
print "Nodes not responding to Command:\n";
foreach my $node (keys %deadnodes){
print "$node ";
}
print "\n";
}
#
# Custom sub to turn a hash into a string. Hashes must not contain
# the substring of $separator anywhere!!!
#
sub serialize_hash($)
{
my ($hashref) = @_;
my %hash = %$hashref;
my $separator = "::";
my $out = "";
for my $key (keys %hash){
$out .= $separator if( $out ne "" );
$out .= $key.$separator.$hash{$key};
}
return $out;
}
......@@ -121,20 +121,22 @@ if( defined $ARGV[0] ){
}
print "thismonaddr = $thismonaddr\n";
# Create a UDP socket to receive commands on
# Create a TCP socket to receive commands on
my $socket_cmd = IO::Socket::INET->new( LocalPort => $cmdport,
Proto => 'udp' )
LocalHost => $thismonaddr,
Proto => 'tcp',
Blocking => 0,
Listen => 1,
ReuseAddr=> 1 )
or die "Couldn't create socket on $cmdport\n";
# Create a UDP socket to receive acks on
my $socket_ack = IO::Socket::INET->new( LocalPort => $ackport,
Proto => 'udp' )
or die "Couldn't create socket on $ackport\n";
print time()." creating socket\n";
my $socket_snd = IO::Socket::INET->new( PeerPort => $sendport,
Proto => 'udp',
PeerAddr => $server );
print time()." end creating socket\n";
#create Select object.
my $sel = IO::Select->new();
......@@ -159,26 +161,35 @@ my $subtimer = $subtimer_reset; #decrement every poll-loop.
sub handleincomingmsgs()
{
my $inmsg;
my $cmdHandle;
#check for pending received events
my @ready = $sel->can_read(0.1); #wait max of 0.1 sec. Don't want to
#have 0 here, or CPU usage goes high
foreach my $handle (@ready){
$handle->recv( $inmsg, $rcvBufferSize );
if( $handle eq $socket_ack ){
$handle->recv( $inmsg, $rcvBufferSize );
}elsif( $handle eq $socket_cmd ){
my $cmdHandle = $handle->accept();
$inmsg = <$cmdHandle>;
chomp $inmsg;
print $cmdHandle "ACK\n";
close $cmdHandle;
}
# print "received msg: $inmsg\n";
# my %udpin = %{ Storable::thaw $inmsg};
my %udpin = %{ deserialize_hash($inmsg) };
my $cmdtype = $udpin{cmdtype};
my %sockIn = %{ deserialize_hash($inmsg) };
my $cmdtype = $sockIn{cmdtype};
if( !defined($cmdtype) ){
warn "bad message format\n";
return 0; #bad message
}
if( $udpin{expid} ne $expid ){
if( $sockIn{expid} ne $expid ){
return 0; #not addressed to this experiment
}
if( $cmdtype eq "ACK" ){
my $index = $udpin{index};
my $index = $sockIn{index};
print time()." Ack for index $index. Deleting backup file\n";
if (exists($reslist{$index})) {
unlink($reslist{$index});
......@@ -186,23 +197,23 @@ sub handleincomingmsgs()
}
}
elsif( $cmdtype eq "EDIT" ){
my $linkdest = $udpin{dstnode};
my $testtype = $udpin{testtype};
my $testper = $udpin{testper};
my $linkdest = $sockIn{dstnode};
my $testtype = $sockIn{testtype};
my $testper = $sockIn{testper};
$testevents{$linkdest}{$testtype}{"testper"} = $testper;
$testevents{$linkdest}{$testtype}{"flag_scheduled"} = 0;
$testevents{$linkdest}{$testtype}{"timeOfNextRun"} = time_all();
print( "EDIT:\n");
print time()." EDIT:\n";
print( "linkdest=$linkdest\n".
"testype =$testtype\n".
"testper=$testper\n" );
}
elsif( $cmdtype eq "INIT" ){
print "INIT: ";
my $testtype = $udpin{testtype};
print time()." INIT: ";
my $testtype = $sockIn{testtype};
my @destnodes
= split(" ",$udpin{destnodes});
my $testper = $udpin{testper};
= split(" ",$sockIn{destnodes});