Commit 53fcb773 authored by Dan Gebhardt's avatar Dan Gebhardt
Browse files

First update after an architectural change:

- managerclients send their measurement request commands to the manager
  through the event system.
- the automanagerclient sends its measurement requests to the manager
  using TCP.
- The manager is the only entity to send measurement requests to the bgmons
  running on the nodes.
- A queuing system on each bgmon orders requests based on test frequency,
  so that the fastest frequncy measurement is used.
parent 9dc0cbed
#!/usr/bin/perl -w
package Cmdqueue;
use strict;
sub new{
my $class = shift;
my $self = {
CMDS => []
};
bless( $self, $class );
return $self;
}
=pod
overview:
adds a command to the queue, where/when appropriate
properties:
- If a cmd is "forever", it replaces any previous commands by the given
managerid.
- If a cmd is "temp", it is added to the queue. Additional operations
will take place as follows:
+ If there exists a duplicate command in the queue, the last one received
will be used (dup cmds only differ by received timestamp).
+ If the new command has a period of 0, all cmds in the queue with a
matching managerid will be deleted.
- Resulting queue is sorted by ascending test periods.
=cut
sub add{
my $self = shift;
my $cmd = shift;
bless $cmd, "Cmd";
$self->cleanQueue();
# special case if new command is "forever"
if( $cmd->duration() == 0 ){
#search for a matching managerID and "forever" status
my $replaced = 0;
for( my $i=0; $i<scalar(@{$self->{CMDS}}); $i++ ){
if( $self->{CMDS}->[$i]->duration() == 0 &&
($self->{CMDS}->[$i]->managerid() eq $cmd->managerid()) )
{
#replace existing forever
$self->{CMDS}->[$i] = $cmd;
# print "REPLACED\n";
$replaced = 1;
}
}
if(!$replaced){
#add
push @{$self->{CMDS}}, $cmd;
}
}else{
# not forever... just add to queue
#delete duplicates
for( my $i=0; $i<scalar(@{$self->{CMDS}}); $i++ ){
if( $self->{CMDS}->[$i]->eqCmd($cmd) ){
splice( @{$self->{CMDS}}, $i, 1 );
$i--; #since we just removed this position and need
#to check the element moved into it's place
}
}
push @{$self->{CMDS}}, $cmd;
}
# if new cmd has per=0, REMOVE ALL TESTS with its manid
if( $cmd->period() == 0 ){
$self->rmCmds($cmd->managerid());
}
#sort queue
$self->sortQueue();
}
sub getQueueInfo{
my $self = shift;
my $info = "-----------\n";
foreach my $cmd (@{$self->{CMDS}}){
$info .= $cmd->getCmdInfo()."---------\n";
}
return $info;
}
#
# remove expired commands
#
sub cleanQueue{
my $self = shift;
for( my $i=0; $i<scalar(@{$self->{CMDS}}); $i++ ){
if( defined $self->{CMDS}->[$i]->timeleft() &&
$self->{CMDS}->[$i]->timeleft() < 0 )
{
# print "q len = ".scalar(@{$self->{CMDS}})."\n";
#cmd expired, so remove from queue
splice( @{$self->{CMDS}}, $i, 1 );
$i--; #since we just removed this position and need
#to check the element moved into it's place
}
}
$self->sortQueue();
}
sub head{
my $self = shift;
return $self->{CMDS}->[0];
}
#
# remove all commands with a given managerID
# If no managerID, remove ALL commands
# returns: number of deleted elements
sub rmCmds{
my $self = shift;
my $managerID = shift;
# print "CMD: manid=$managerID\n";
my $numDel = 0;
if( !defined $managerID ){
# remove all commands
$numDel = scalar(@{$self->{CMDS}});
$self->{CMDS} = [];
}else{
for( my $i=0; $i < scalar(@{$self->{CMDS}}); $i++ ){
if( $self->{CMDS}->[$i]->managerid() eq $managerID ){
#delete cmd if it has a matching managerid
splice( @{$self->{CMDS}}, $i, 1 );
$i--; #since we just removed this position and need
#to check the element moved into it's place
$numDel++;
}
}
}
#resort
$self->sortQueue();
return $numDel;
}
sub sortQueue{
my $self = shift;
#TODO: do something faster....
my $l = scalar(@{$self->{CMDS}});
for( my $i=0; $i<$l-1; $i++ ){
for( my $j=$i+1; $j<$l; $j++ ){
if( $self->{CMDS}->[$j]->period()
<
$self->{CMDS}->[$i]->period() )
{
my $tmpcmd = $self->{CMDS}->[$j];
$self->{CMDS}->[$j] = $self->{CMDS}->[$i];
$self->{CMDS}->[$i] = $tmpcmd;
}
}
}
}
1; #DON'T REMOVE!
#########################################################################
package Cmd;
use strict;
sub new{
my $class = shift;
my ($manid, $per, $dur) = @_;
my $self = {
MANAGERID => $manid,
PERIOD => scalar($per),
DURATION => scalar($dur),
TIME_RECVD => time()
};
bless( $self, $class );
return $self;
}
sub getCmdInfo{
my $self = shift;
my $info = "";
foreach my $key (keys %{$self}){
$info .= "$key = ".$self->{$key}."\n";
}
return $info;
}
sub managerid{
my $self = shift;
return $self->{MANAGERID};
}
sub period{
my $self = shift;
return $self->{PERIOD};
}
sub duration{
my $self = shift;
return $self->{DURATION};
}
# if given command has no set duration, return undef
sub timeleft{
my $self = shift;
if( !defined $self->{DURATION} || $self->{DURATION} == 0 ){
return undef;
}else{
return ($self->{TIME_RECVD} + $self->{DURATION}) - time();
}
}
sub eqCmd{
my $self = shift;
my $cmd = shift;
if( $self->{PERIOD} == $cmd->period() &&
$self->{MANAGERID} eq $cmd->managerid() &&
$self->{DURATION} == $cmd->duration() )
{
return 1;
}else{
return 0;
}
}
1;
#!/usr/bin/perl -w
use strict;
use lib '/usr/testbed/lib';
use event;
use libxmlrpc;
use libwanetmon qw(!stopnode);
use English;
use Getopt::Std;
use IO::Socket::INET;
use IO::Select;
$| = 1;
my ($constrFilename, $thisManagerID, $expid, $bwdutycycle, $port,$server,
$cmdport);
my $numsites;
my %test_per = ( # defaults
"latency" => 300,
"bw" => 0,
);
$thisManagerID = "automanagerclient";
#$expid = "__none"; #expt field in event system notification.
my $bgmonexpt = "tbres/pelabbgmon";
my %intersitenodes = (); #final list for fully-connected test
my @constrnodes; #test constrained to these nodes
my %sitenodes; #hash listing all sites => nodes
my $CPUUSAGETHRESHOLD = 10; #should help prevent flip-flopping between
#"best" nodes at a site
#TODO: document normal range of values for CPU
my $SITEDIFFTHRESHOLD = 5; #number of site differences between period
#calculations that trigger an update
my $IPERFDURATION = 5; #duration in seconds of iperf test
my %allnodes;
my %deadsites;
# RPC STUFF ##############################################
my $TB = "/usr/testbed";
my $ELABINELAB = 0;
my $RPCSERVER = "boss.emulab.net"; #?
my $RPCPORT = "3069";
#my $RPCCERT = "/etc/outer_emulab.pem"; #?
my $RPCCERT = "~/.ssl/emulab.pem";
my $MODULE = "node";
my $METHOD = "getlist";
# END RPC STUFF ##########################################
sub usage
{
warn "Usage: $0 [-BLP] [-f constraint file] [-l latency test period]".
# " [-e pid/eid] [-c cmdport]".
" <bandwidth duty cycle 0-1>".
# " <number of sites or \"all\">\n".
"where: -B = Do not measure bandwidth\n".
" -L = Do not measure latency\n".
" -P = Do not measure latency to nodes not responding to pings\n";
return 1;
}
my %opt = ();
getopts("B:L:P:f:l:e:s:p:c",\%opt);
#TODO: other options
if( $opt{f}) { $constrFilename = $opt{f}; }
if( $opt{l}) { $test_per{latency} = $opt{l}; }
if ($opt{s}) { $server = $opt{s}; } else { $server = "localhost"; }
if ($opt{p}) { $port = $opt{p}; }
if ($opt{c}) { $cmdport = $opt{c}; } else {$cmdport = 5052;}
if( !defined $ARGV[0] ){
exit &usage;
}
$bwdutycycle = $ARGV[0];
my $lastupdated_numnodes = 0;
#my $socket;
#my $sel = IO::Select->new();
my $URL = "elvin://$server";
if ($port) { $URL .= ":$port"; }
my $handle = event_register($URL,0);
if (!$handle) { die "Unable to register with event system\n"; }
#
# these two should not be needed after "status" architecture worked out
#
setcmdport(5052);
setexpid("tbres/pelabbgmon");
#FORWARD DECL'S
sub main::outputErrors();
sub main::stopnode($);
#############################################################################
#
# Initialization
#
libxmlrpc::Config({"server" => $RPCSERVER,
"verbose" => 0,
# "cert" => $RPCCERT,
"portnum" => $RPCPORT});
# Stop all nodes in constraint set
if( defined $constrFilename )
{
open FILE, "< $constrFilename" or die "cannot open file $constrFilename";
while( <FILE> ){
chomp;
if( $_ =~ m/plab/ ){
push @constrnodes, $_;
print "$_\n";
}
}
close FILE;
foreach my $node (@constrnodes){
# print "stopping $node\n";
stopnode($node);
}
}
#stop all nodes from XML-RPC query
else{
getnodeinfo();
foreach my $site (keys %sitenodes){
foreach my $node (@{$sitenodes{$site}}){
stopnode($node);
}
}
}
###########################################################
#
# Main Loop
#
my $f_firsttime = 1;
while(1)
{
%deadnodes = ();
#update node list
getnodeinfo();
choosenodes();
# if( $f_firsttime ){
# print "FIRST TIME UPDATE\n";
# updateTests();
# }
modifytests($f_firsttime);
# printchosennodes();
outputErrors();
sleep( 60 );
$f_firsttime = 0;
}
###########################################################
sub getnodeinfo
{
#retrieve list of nodes
%sitenodes = ();
my $rval = libxmlrpc::CallMethod($MODULE, $METHOD,
{"class" => "pcplabphys"});
if( defined $rval ){
%allnodes = %$rval;
}else{ return; }
#remove old node-listing file
my $nodesfilename = "allnodelisting_automanage.txt";
unlink $nodesfilename or warn "can't delete node-listing file";
open FILE, "> $nodesfilename"
or warn "can't open file $nodesfilename\n";
#populate sitenodes
foreach my $node (keys %allnodes){
my $siteid = $allnodes{$node}{site};
push @{$sitenodes{$siteid}}, $node;
# print @{$sitenodes{$siteid}}."\n";
print FILE "$node\n";
}
close FILE;
}
sub printNodeInfo($)
{
my ($node) = @_;
foreach my $key (keys %{$allnodes{$node}} ){
print "\t$key = $allnodes{$node}{$key}\n";
}
}
########################################################
#
# choose a node from each possible site
sub choosenodes
{
foreach my $site (keys %sitenodes){
# print "site $site\n";
my $bestnode = choosebestnode($site);
# if( !defined $bestnode ){ print "BESTNODE NOT DEF!!!\n"; }
# if( $bestnode ne "NONE"){ print "bestnode for $site = $bestnode\n"; }
if( $bestnode ne "NONE" &&
!defined $intersitenodes{$site} )
{
print time()." SECTION 1: adding $bestnode at $site\n";
# ** This section handles when a site is seen for the 1st time
#set new node to represent this site
$intersitenodes{$site} = $bestnode;
initNewSiteNode($site,$bestnode)
if( $f_firsttime == 0 );
}
elsif( ("NONE" eq $bestnode) && defined $intersitenodes{$site} )
{
print time()." SECTION 2: removing tests to $site / ".
"$intersitenodes{$site} \n";
# ** This section handles when a site has no nodes available
#no available node at this site, so remove site from hash
foreach my $srcsite (keys %intersitenodes){
stoppairtest( $intersitenodes{$srcsite},
$intersitenodes{$site} );
}
delete $intersitenodes{$site};
}
elsif( defined $intersitenodes{$site} &&
( $intersitenodes{$site} ne $bestnode ||
getstatus($bestnode) eq "anyscheduled_no"
)
)
{
print time()." SECTION 3: node change/restart at $site from ".
"$intersitenodes{$site} to $bestnode\n";
# ** This section handles when a "bestnode" at a site changes
#TODO: This logic should be fixed so the new tests are
# started before old ones are stopped. This may help
# prevent "holes" in the data collection to a site.
# Stop sigs to other nodes using old "bestnode" value
if( defined $intersitenodes{$site} ){
foreach my $srcsite (keys %intersitenodes){
stoppairtest( $intersitenodes{$srcsite},
$intersitenodes{$site} );
}
}
#set new node to represent this site
$intersitenodes{$site} = $bestnode;
initNewSiteNode($site);
}
}
}
sub initNewSiteNode($)
{
my ($site) = @_;
# $intersitenodes{$site} = $bestnode;
# stop any tests remaining on this node.
stopnode($intersitenodes{$site});
# start tests to and from this new site
foreach my $srcsite (keys %intersitenodes){
edittest_amc( $intersitenodes{$srcsite},
$intersitenodes{$site},
$test_per{bw},
"bw",
0,
$thisManagerID );
edittest_amc( $intersitenodes{$site},
$intersitenodes{$srcsite},
$test_per{bw},
"bw",
0,
$thisManagerID);
my $r = rand;
if( $r <= .5 ){
edittest_amc( $intersitenodes{$srcsite},
$intersitenodes{$site},
$test_per{latency},
"latency",
0,
$thisManagerID );
}else{
edittest_amc( $intersitenodes{$site},
$intersitenodes{$srcsite},
$test_per{latency},
"latency",
0,
$thisManagerID );
}
}
}
#
# Re-adjust the test periods of connections based on number of nodes
# Pass a non-zero parameter to force an initialization of all paths
sub modifytests
{
my ($f_forceInit) = @_;
my $numsites = scalar(keys %intersitenodes);
my $bwper = ($numsites - 1) * $IPERFDURATION * 1/$bwdutycycle;
#TODO: ?? dynamically change latency period, too?
#update connections to use newly calculated periods
if( abs($lastupdated_numnodes - $numsites) > $SITEDIFFTHRESHOLD )
{
if( !$opt{B} ){
$test_per{bw} = $bwper;
print "new BW per = $bwper\n";
$lastupdated_numnodes = $numsites;
updateTests(1,0); #handles changing number of sites.
# only update bandwidth
}
}
if( defined $f_forceInit && $f_forceInit != 0 ){
updateTests(1,1);
}
}
sub printchosennodes
{
foreach my $node (values %intersitenodes){
print "site: ". $allnodes{$node}{site} . " = $node\n";
# #TODO:: why does this give an error?
# print "node = $node\n";
}
}
sub choosebestnode($)
{
my ($site) = @_;
my $bestnode = "NONE"; #default to an error value
my $flag_siteIncluded = 0; #set if any node at site is in constraint set
=pod
print "site: $site ";
foreach my $node ( @{$sitenodes{$site}} ){
print "$node ";
}
print "\n";
=cut
foreach my $node ( @{$sitenodes{$site}} ){
=pod
if(isnodeinconstrset($node)){
print "node $node is in constr set ";
if( $allnodes{$node}{free} == 1 ){
print "and free";
}
print "\n";
}
=cut
if( isnodeinconstrset($node) ){
$flag_siteIncluded = 1;
# print "SETTING SITEINCLUDED=1 for $node at $site\n";
}
if( $allnodes{$node}{free} == 1 && isnodeinconstrset($node) ){
# print "choosebestnode: considering $node\n";
# print "choosing best node for site $site\n";
#first time thru loop...
if( $bestnode eq "NONE" ){
#set this to be best node if it responds to a command
# if( edittest($node,"NOADDR",0,"bw") == 1 ){
# $bestnode = $node;
# }
if( getstatus($node) ne "error" ){
$bestnode = $node;
}
}else{
if( ($allnodes{$node}{cpu} < $allnodes{$bestnode}{cpu}
- $CPUUSAGETHRESHOLD) &&