Commit 639758e6 authored by Dan Gebhardt's avatar Dan Gebhardt

Some very basic infrastructure for generalized tools was added.

Some parts very "hacky", lots of XXX / TODOs.
BARELY tested!!!! Don't use on the production system yet.
Need to verify that things didn't break, especially with the automanager,
the bgmon.pl "outage detection stuff", etc...

See code's todos and also the file
flexlab-canonicalization-misc-notes.txe

At this point:
 - mangerclient & automanagerclient did not change.
 - The manager, when a probe request is received, looks at the DB table
   "tool_spec" to retrieve the proper wrapperpath, formal parameter list, etc..
 - The manager assembles a message and sends it out to the appropriate
   path probers, which operate on the request as they have been
   (no major internal changes to the path probers).
 - The path probers call the toolwrapper to run the tests
 - The toolwrappers return a canonical result for that type of metric.
 - The path probers send the result and the actual parameters used by the
   tool to the data collector.
 - The data collector uses the toolname and actual parameters to get an
   idx number. This idx is stored in the "measured_by" field of this
   measurement's entry in the "pair_data" table.
parent caa4023a
......@@ -444,35 +444,35 @@ while (1) {
my $destaddr = $runningtestPIDs{$pid}[0];
my $testtype = $runningtestPIDs{$pid}[1];
my $testev = \%{ $testevents{$destaddr}{$testtype} };
my $killit = 0;
if ($testev->{"continuous"}) {
my $filename = $testev->{"outfile"};
my $fsize = $testev->{"lastsize"};
my $cursize = (stat($filename))[7];
print time_all().": c$testtype test: fn=$filename, fs=$cursize, lastfs=$fsize\n" if ($debug > 1);
if (defined($cursize)) {
if ($cursize > $fsize) {
$testev->{"new_results"} = $fsize;
$testev->{"tstamp"} = time_all();
} else {
undef $testev->{"new_results"};
}
} else {
# something is horribly wrong, kill the process
$killit = "no output file";
}
}
my $killit = 0;
if ($testev->{"continuous"}) {
my $filename = $testev->{"outfile"};
my $fsize = $testev->{"lastsize"};
my $cursize = (stat($filename))[7];
print time_all().": c$testtype test: fn=$filename, fs=$cursize, lastfs=$fsize\n" if ($debug > 1);
if (defined($cursize)) {
if ($cursize > $fsize) {
$testev->{"new_results"} = $fsize;
$testev->{"tstamp"} = time_all();
} else {
undef $testev->{"new_results"};
}
} else {
# something is horribly wrong, kill the process
$killit = "no output file";
}
}
if( $testtype eq "bw" &&
time_all() >
$testev->{"tstamp"} +
$iperftimeout )
{
# bw test is running too long, so kill it
$killit = "timeout";
}
$killit = "timeout";
}
if ($killit) {
if ($killit) {
kill 'TERM', $pid;
print time_all()." $testtype $killit: killed $destaddr, ".
"pid=$pid\n";
......@@ -480,11 +480,14 @@ while (1) {
$testev->{"timedout"} = 1;
#delete tmp filename
my $filename = $testev->{"outfile"};
my $filename = $testev->{"outfile"};
unlink($filename) or warn "can't delete temp file";
undef $testev->{"new_results"};
undef $testev->{"lastsize"};
undef $testev->{"new_results"};
undef $testev->{"lastsize"};
#
# The special iperf timeout hack is to be handled by the iperf wrapper.
#
=pod
my %results =
("sourceaddr" => $thismonaddr,
"destaddr" => $destaddr,
......@@ -493,11 +496,12 @@ while (1) {
"tstamp" => $testev->{tstamp},
"magic" => "$magic",
"ts_finished" => time()
);
);
#save result to local DB
my $index = saveTestToLocalDB(\%results);
#send result to remote DB
sendResults(\%results, $index);
=cut
}
}
......@@ -505,41 +509,41 @@ while (1) {
#iterate through all event structures
for my $destaddr ( keys %testevents ) {
for my $testtype ( keys %{ $testevents{$destaddr} } ){
my $testev = \%{ $testevents{$destaddr}{$testtype} };
#check for finished events
if( $testev->{"flag_finished"} == 1 ||
($testev->{"continuous"} &&
defined($testev->{"new_results"})) ){
my @raw_lines;
#read raw results from temp file
# NOTE: parsing is now done in the wrapper, and we return the
# entire wrapper output
# this stuff left in for the "continuous hack" (for now)
my $filename = $testev->{"outfile"};
if (!$testev->{"flag_finished"} ||
!$testev->{"continuous"}) {
open FILE, "< $filename"
or warn "can't open file $filename";
if ($testev->{"new_results"}) {
seek(FILE, $testev->{"new_results"}, 0);
}
@raw_lines = <FILE>;
if (!$testev->{"flag_finished"}) {
$testev->{"lastsize"} = tell(FILE);
}
close FILE;
}
if ($testev->{"flag_finished"}) {
unlink($filename) or warn "can't delete temp file";
undef $testev->{"new_results"};
undef $testev->{"lastsize"};
} else {
print "c$testtype test: read ", scalar(@raw_lines),
" lines, size=", $testev->{"lastsize"}, "\n" if ($debug > 1);
($testev->{"continuous"} &&
defined($testev->{"new_results"})) ){
my @raw_lines;
#read raw results from temp file
# NOTE: parsing is now done in the wrapper, and we return the
# entire wrapper output
# this stuff left in for the "continuous hack" (for now)
my $filename = $testev->{"outfile"};
if (!$testev->{"flag_finished"} ||
!$testev->{"continuous"}) {
open FILE, "< $filename"
or warn "can't open file $filename";
if ($testev->{"new_results"}) {
seek(FILE, $testev->{"new_results"}, 0);
}
@raw_lines = <FILE>;
if (!$testev->{"flag_finished"}) {
$testev->{"lastsize"} = tell(FILE);
}
close FILE;
}
if ($testev->{"flag_finished"}) {
unlink($filename) or warn "can't delete temp file";
undef $testev->{"new_results"};
undef $testev->{"lastsize"};
} else {
print "c$testtype test: read ", scalar(@raw_lines),
" lines, size=", $testev->{"lastsize"}, "\n" if ($debug > 1);
}
#
......@@ -561,7 +565,6 @@ while (1) {
$testev->{"continuous"},
$raw);
$testev->{"results_parsed"} = $parsedData;
print "PARSED data = $parsedData\n";
my %results =
("sourceaddr" => $thismonaddr,
......@@ -571,6 +574,11 @@ while (1) {
"tstamp" => $testev->{tstamp},
"magic" => "$magic",
"ts_finished" => time()
,"toolname" => $testev->{toolname}
,"toolwrapperpath" => $testev->{toolwrapperpath}
,"tooltype" => $testev->{tooltype}
,"req_params" => $testev->{req_params}
,"opt_params" => $testev->{opt_params}
);
#MARK_RELIABLE
......@@ -607,16 +615,16 @@ while (1) {
time() >= $testev->{limitTime} )
{
my $oldper = $testev->{testper};
print time().": Ending test $testtype for $destaddr\n";
$testev->{"end_stats"} = getstats();
printstats("End", $testev->{"end_stats"});
my $diff = diffstats($testev->{"start_stats"},
$testev->{"end_stats"});
printstats("Total", $diff);
print time().": Ending test $testtype for $destaddr\n";
$testev->{"end_stats"} = getstats();
printstats("End", $testev->{"end_stats"});
my $diff = diffstats($testev->{"start_stats"},
$testev->{"end_stats"});
printstats("Total", $diff);
# Rate increase expired. Get new value from Q
$testev->{cmdq}->cleanQueue(); #rid expired values
$testev->{testper} = 0; #reset existing period
undef $testev->{continuous};
undef $testev->{continuous};
updateTestEvent($testev);
print "resetting period from $oldper";
print " to ".$testev->{testper}."\n";
......@@ -628,18 +636,18 @@ while (1) {
#if time of next run is in the future, set it to that
$testev->{"timeOfNextRun"} += $testev->{"testper"};
}else{
if ( ($testev->{"timeOfNextRun"} == 0) &&
( $testev->{"managerID"} eq "automanagerclient" ) &&
($testtype eq "bw" ) ) {
# init the test based on random initial time
my $range = $testev->{"testper"} - 2 * $iperfduration;
my $random_init = int(rand($range));
$testev->{"timeOfNextRun"} = time_all() + $random_init;
} else {
#if time of next run is in the past, set to current time
$testev->{"timeOfNextRun"}
= time_all();
}
if ( ($testev->{"timeOfNextRun"} == 0) &&
( $testev->{"managerID"} eq "automanagerclient" ) &&
($testtype eq "bw" ) ) {
# init the test based on random initial time
my $range = $testev->{"testper"} - 2 * $iperfduration;
my $random_init = int(rand($range));
$testev->{"timeOfNextRun"} = time_all() + $random_init;
} else {
#if time of next run is in the past, set to current time
$testev->{"timeOfNextRun"}
= time_all();
}
}
$testev->{"flag_scheduled"} = 1;
......@@ -651,9 +659,10 @@ while (1) {
$testev->{"pid"} == 0 )
{
#run test
print time().": Starting test $testtype for $destaddr\n";
$testev->{"start_stats"} = getstats();
printstats("Start", $testev->{"start_stats"});
print time().": Starting test $testtype for $destaddr\n"
if( $debug > 1 );
$testev->{"start_stats"} = getstats();
printstats("Start", $testev->{"start_stats"});
spawnTest( $destaddr, $testtype );
}
......@@ -806,13 +815,13 @@ sub spawnTest($$)
{
my $toolwrapperpath
= "$testevents{$linkdest}{$testtype}{toolwrapperpath}";
print "toolwrapperpath=$toolwrapperpath\n";
my $toolparams =
$testevents{$linkdest}{$testtype}{req_params} ." ".
# $testevents{$linkdest}{$testtype}{opt_params} .
" target $linkdest";
print "Running test: $toolwrapperpath $toolparams\n";
print "Running test: $toolwrapperpath $toolparams\n"
if( $debug > 2 );
exec "sudo $toolwrapperpath $toolparams >$filename 2>&1"
or die "can't exec: $!\n";
}
......@@ -1077,7 +1086,13 @@ sub sendResults($$){
testtype => $results->{testtype},
result => $results->{result},
tstamp => $results->{tstamp},
index => $index );
index => $index
,"toolname" => $results->{toolname}
,"toolwrapperpath" => $results->{toolwrapperpath}
,"tooltype" => $results->{tooltype}
,"req_params" => $results->{req_params}
,"opt_params" => $results->{opt_params}
);
my $result_serial = serialize_hash( \%result );
$socket_snd->send($result_serial);
}
......@@ -1437,7 +1452,7 @@ sub diffstats($$)
my %after = %{$re};
my %diff;
foreach my $key (keys(%before)) {
$diff{$key} = $after{$key} - $before{$key};
$diff{$key} = $after{$key} - $before{$key};
}
return \%diff;
}
......@@ -1447,7 +1462,8 @@ sub printstats($$)
my ($hdr,$stats) = @_;
print("$hdr: utime=", $stats->{utime},
", stime=", $stats->{stime},
", cutime=", $stats->{cutime},
", cstime=", $stats->{cstime}, "\n");
", stime=", $stats->{stime},
", cutime=", $stats->{cutime},
", cstime=", $stats->{cstime}, "\n")
if( $debug > 0 );
}
NOTICE: The "measured_by" field in the table "pair_data" is
a short integer... should be longer (?)
TODO:
2) Change bgmon queuing so that different tools of the
same class do not overwrite each other.
3) Add generalization into the managerclient
4) Make Iperfwrapper do the iperf timeout stuff.
5) move probe "target" into the actual param list? NO:
don't want a new entry in tool_idx for every destination!
6) make "outage detection system" a separate tool.
Data Collector
Gets result packet from path prober, containing extra
info about the tool type and actual parameters used.
Find the proper ToolID in the "ToolParamTable" indexing with
toolname and actual_params. If no match, add new entry
with new ID number.
Manager:
For now, just "hardwires" the tool parameters so that the
managerclient does not need to be changed
Managerclient:
no changes for now.
Wrapper scripts for iperf, fping, and ping:
- iperf
- input: list of arguments, e.g.: duration=5,target=plab222
- output: result, e.g: 500
wrapper executes actual tool with appropriate command-lines
wrapper outputs a key-value pair of results and error codes.
Path Prober:
bgmon.pl receives probe request:
cmdtype: INIT
toolname: iperf
type: one-shot
duration: 12000
wrapper_path: /tmp/iperfwrapper
required_params: target=plab222,period=600,port=6666,duration=5
optional_params:
bgmon.pl stores request into "testevents"
Test is started at appropriate time by running the wrapper_path with the argument fields. Wrapper output read from file, as is currently done.
......@@ -6,6 +6,10 @@
#
#
# TODO: Watch iperf, and kill if a timeout (30 sec) is exceeded.
#
use strict;
my %params = (@ARGV);
......
......@@ -89,8 +89,9 @@ sub serialize_hash($)
my $out = "";
for my $key (keys %hash){
$out .= $separator if( $out ne "" );
$out .= $key.$separator.$hash{$key};
$out .= $separator if( $out ne "" );
$out .= $key.$separator;
$out .= $hash{$key} if( defined $hash{$key} );
}
return $out;
}
......
......@@ -168,6 +168,51 @@ while (1) {
}
}
#
#XXX / TODO: stuff for tool generalization
# hacked in here for now, but source should be from the
# managerclient' message (?)
#
sub addToolSpecificFields
{
my ($cmdRef) = @_;
my $testtype = $cmdRef->{testtype};
my $toolname;
my ($req_params_actual, $opt_params_actual);
if( $testtype eq "bw" ){
$toolname = "iperf";
# $toolwrapperpath = "/tmp/iperfwrapper";
# $tooltype = "one-shot";
$req_params_actual = "port 5002 duration 5";
}elsif( $testtype eq "latency"){
$toolname = "fping";
# $toolwrapperpath = "/tmp/fpingwrapper";
# $tooltype = "one-shot";
$req_params_actual = "timeout 10000 retries 1";
}
my $sth = DBQuery("select * from tool_spec where toolname='$toolname'");
my ( $toolname, $metric, $type, $toolwrapperpath,
$req_params_formal, $opt_params_formal)
= ( $sth->fetchrow_array() );
#XXX / TODO Check that all given actual parameters match the formal params
#
$cmdRef->{toolname} = $toolname;
$cmdRef->{toolwrapperpath} = $toolwrapperpath;
$cmdRef->{tooltype} = $type;
$cmdRef->{req_params} = $req_params_actual;
$cmdRef->{opt_params} = $opt_params_actual;
print "CMD: \n";
foreach my $key (keys %{$cmdRef}){
my $value = ${$cmdRef}{$key};
print " $key=$value \n";
}
}
#
# callback for managerclient requests
......@@ -236,28 +281,6 @@ sub callbackFunc($$$) {
$notification,
"expid");
#
#XXX / TODO: stuff for tool generalization
# hacked in here for now, but source should be from the
# managerclient' message (?)
#
my $toolname;
my $toolwrapperpath;
my $tooltype; #one-shot or continuous
my $req_params; #params required, but universal for each tool instance
my $opt_params = "";
if( $testtype eq "bw" ){
$toolname = "iperf";
$toolwrapperpath = "/tmp/iperfwrapper";
$tooltype = "one-shot";
$req_params = "port 5002 duration 5";
}elsif( $testtype eq "latency"){
$toolname = "fping";
$toolwrapperpath = "/tmp/fpingwrapper";
$tooltype = "one-shot";
$req_params = "timeout 10000 printstats 1 retries 1";
}
if( !defined $newexpid || $newexpid eq "" ){
$newexpid = $bgmonexpt;
......@@ -270,13 +293,9 @@ sub callbackFunc($$$) {
testper => "$period",
duration => "$duration",
managerID => $managerID
,toolname => $toolname
,toolwrapperpath=>$toolwrapperpath
,tooltype => $tooltype
,req_params=> $req_params
,opt_params=> $opt_params
);
addToolSpecificFields(\%cmd);
print "got EDIT: $srcnode, $dstnode: $newexpid\n";
......@@ -310,28 +329,6 @@ sub callbackFunc($$$) {
my $newexpid = event_notification_get_string($handle,
$notification,
"expid");
#
#XXX / TODO: stuff for tool generalization
# hacked in here for now, but source should be from the
# managerclient' message (?)
#
my $toolname;
my $toolwrapperpath;
my $tooltype; #one-shot or continuous
my $req_params; #params required, but universal for each tool instance
my $opt_params = "";
if( $testtype eq "bw" ){
$toolname = "iperf";
$toolwrapperpath = "/tmp/iperfwrapper.pl";
$tooltype = "one-shot";
$req_params = "port 5002 duration 5";
}elsif( $testtype eq "latency"){
$toolname = "fping";
$toolwrapperpath = "/tmp/fpingwrapper.pl";
$tooltype = "one-shot";
$req_params = "timeout 10000 retries 1";
}
if( !defined $newexpid || $newexpid eq "" ){
$newexpid = $bgmonexpt;
......@@ -345,27 +342,18 @@ sub callbackFunc($$$) {
testper => "$testper",
duration => "$duration"
,managerID => $managerID
,toolname => $toolname
,toolwrapperpath=>$toolwrapperpath
,tooltype => $tooltype
,req_params=> $req_params
,opt_params=> $opt_params
);
addToolSpecificFields(\%cmd);
print "got $eventtype:$srcnode,$destnodes,$testtype,".
"$testper,$duration,$managerID,$newexpid\n";
# only automanager can send "forever" edits (duration=0)
# if( $duration > 0 ){ #|| $managerID eq "automanagerclient" ){
# print "sending cmd from $srcnode\n";
# sendcmd( $srcnode, \%cmd );
# }
if( isCmdValid(\%cmd) ){
# print "sending cmd from $srcnode\n";
print "sending cmd to $srcnode on behalf of $managerID\n";
sendcmd( $srcnode, \%cmd );
}else{
print "rejecting $testtype cmd for $srcnode\n";
print "rejecting $testtype cmd for $srcnode\n";
}
}
elsif( $eventtype eq "STOPALL" ){
......@@ -611,6 +599,7 @@ sub getBandwidth() {
}
}
=pod
sub event_poll_amc($){
my ($handle) = @_;
......
......@@ -135,98 +135,111 @@ sub handleincomingmsgs()
#check for pending received results
my @ready = $sel->can_read(1000);
foreach my $handle (@ready){
$socket_rcv->recv( $inmsg, 2048 );
chomp $inmsg;
print "debug: got a udp message: $inmsg\n" if( $debug > 2 );
my %inhash = %{ deserialize_hash( $inmsg )};
$socket_rcv->recv( $inmsg, 2048 );
chomp $inmsg;
print "debug: got a udp message: $inmsg\n" if( $debug > 2 );
my %inhash = %{ deserialize_hash( $inmsg )};
# foreach my $key (keys %inhash){
# print "key=$key\n";
# print "$key \t$inhash{$key}\n";
# }
my ($exp_in, $linksrc, $linkdest, $testtype, $result, $tstamp, $index)
= ($inhash{expid}, $inhash{linksrc}, $inhash{linkdest},
$inhash{testtype}, $inhash{result}, $inhash{tstamp},
$inhash{index});
# if incoming result is not of this expid, return
if( $exp_in ne $expid ){
print "ignored msg from expid=$exp_in\n" if( $debug > 2 );
return;
}
print "\n" if( $debug > 1 );
print("linksrc =$linksrc\n".
"linkdest=$linkdest\n".
"testtype=$testtype\n".
"result =$result\n".
"index =$index\n".
"tstamp =$tstamp\n") if( $debug > 1 );
if( defined $linksrc ){
my $socket_snd;
eval{
$socket_snd =
IO::Socket::INET->new( PeerPort => $sendport,
Proto => 'udp',
PeerAddr => "$linksrc");
};
if( $@ ){
#socket creation was fatal
warn "Socket creation failed: $@\n";
}
my %ack = ( expid => $expid,
cmdtype => "ACK",
index => $index,
tstamp => $tstamp );
if( defined %ack && defined $socket_snd ){
my $ack_serial = serialize_hash( \%ack );
$socket_snd->send($ack_serial);
print "**SENT ACK**\n" if( $debug > 1 );
if( !defined $lasttimestamp{$linksrc}{$index} ||
$tstamp ne $lasttimestamp{$linksrc}{$index} )
{
saveTestToDB(linksrc => $linksrc,
linkdest => $linkdest,
testtype => $testtype,
result => $result,
tstamp => $tstamp );
#clear duplicatecnt for corresponding result index
if( defined($duplicatecnt{$linksrc}{$index}) ){
delete $duplicatecnt{$linksrc}{$index};
}
}else{
print("++++++duplicate data\n".
"linksrc=$linksrc\n".
"linkdest=$linkdest\n".
"testtype =$testtype\n".
"result=$result\n".
"index=$index\n".
"tstamp=$tstamp\n") if( $debug > 0);
#increment duplicatecnt for this src and index number
if( defined($duplicatecnt{$linksrc}{$index}) ){
$duplicatecnt{$linksrc}{$index}++;
#kill off offending node, if > threshold
if( $duplicatecnt{$linksrc}{$index}
> $duplicateKillThresh )
{
killnode($linksrc);
print "KILLING OFF BGMON at $linksrc".
" for index $index\n" if( $debug > 0 );
delete $duplicatecnt{$linksrc};
}
}else{
$duplicatecnt{$linksrc}{$index} = 1;
}
}
$lasttimestamp{$linksrc}{$index} = $tstamp;
}