Commit db3ca65a authored by Robert Ricci's avatar Robert Ricci
Browse files

Revert to revision 1.17 - 1.18, which was rushed for the paper, had a

bunch of bugs, and wasn't working in production use.
parent fc9aae80
......@@ -135,111 +135,98 @@ sub handleincomingmsgs()
#check for pending received results
my @ready = $sel->can_read(1000);
foreach my $handle (@ready){
$socket_rcv->recv( $inmsg, 2048 );
chomp $inmsg;
print "debug: got a udp message: $inmsg\n" if( $debug > 2 );
my %inhash = %{ deserialize_hash( $inmsg )};
$socket_rcv->recv( $inmsg, 2048 );
chomp $inmsg;
print "debug: got a udp message: $inmsg\n" if( $debug > 2 );
my %inhash = %{ deserialize_hash( $inmsg )};
# foreach my $key (keys %inhash){
# print "key=$key\n";
# print "$key \t$inhash{$key}\n";
# }
my ($exp_in, $linksrc, $linkdest, $testtype, $result, $tstamp, $index
, $toolname, $toolwrapperpath, $tooltype, $req_params, $opt_params
)
= ($inhash{expid}, $inhash{linksrc}, $inhash{linkdest},
$inhash{testtype}, $inhash{result}, $inhash{tstamp},
$inhash{index}
,$inhash{toolname}
,$inhash{toolwrapperpath}
,$inhash{tooltype}
,$inhash{req_params}
,$inhash{opt_params}
);
# if incoming result is not of this expid, return
if( $exp_in ne $expid ){
print "ignored msg from expid=$exp_in\n" if( $debug > 2 );
return;
}
print "\n" if( $debug > 1 );
print("linksrc =$linksrc\n".
"linkdest=$linkdest\n".
"testtype=$testtype\n".
"result =$result\n".
"index =$index\n".
"tstamp =$tstamp\n"
,"toolname=$toolname\n"
,"tooltype=$tooltype\n"
,"actualparams=$req_params $opt_params"
) if( $debug > 1 );
if( defined $linksrc ){
my $socket_snd;
eval{
$socket_snd =
IO::Socket::INET->new( PeerPort => $sendport,
Proto => 'udp',
PeerAddr => "$linksrc");
};
if( $@ ){
#socket creation was fatal
warn "Socket creation failed: $@\n";
}
my %ack = ( expid => $expid,
cmdtype => "ACK",
index => $index,
tstamp => $tstamp );
if( defined %ack && defined $socket_snd ){
my $ack_serial = serialize_hash( \%ack );
$socket_snd->send($ack_serial);
print "**SENT ACK**\n" if( $debug > 1 );
if( !defined $lasttimestamp{$linksrc}{$index} ||
$tstamp ne $lasttimestamp{$linksrc}{$index} )
{
saveTestToDB(linksrc => $linksrc,
linkdest => $linkdest,
testtype => $testtype,
result => $result,
tstamp => $tstamp
,toolname => $toolname
,actual_params=>"$req_params $opt_params"
);
#clear duplicatecnt for corresponding result index
if( defined($duplicatecnt{$linksrc}{$index}) ){
delete $duplicatecnt{$linksrc}{$index};
}
}else{
print("++++++duplicate data\n".
"linksrc=$linksrc\n".
"linkdest=$linkdest\n".
"testtype =$testtype\n".
"result=$result\n".
"index=$index\n".
"tstamp=$tstamp\n") if( $debug > 0);
#increment duplicatecnt for this src and index number
if( defined($duplicatecnt{$linksrc}{$index}) ){
$duplicatecnt{$linksrc}{$index}++;
#kill off offending node, if > threshold
if( $duplicatecnt{$linksrc}{$index}
> $duplicateKillThresh )
{
killnode($linksrc);
print "KILLING OFF BGMON at $linksrc".
" for index $index\n" if( $debug > 0 );
delete $duplicatecnt{$linksrc};
}
}else{
$duplicatecnt{$linksrc}{$index} = 1;
}
}
$lasttimestamp{$linksrc}{$index} = $tstamp;
}
}
my ($exp_in, $linksrc, $linkdest, $testtype, $result, $tstamp, $index)
= ($inhash{expid}, $inhash{linksrc}, $inhash{linkdest},
$inhash{testtype}, $inhash{result}, $inhash{tstamp},
$inhash{index});
# if incoming result is not of this expid, return
if( $exp_in ne $expid ){
print "ignored msg from expid=$exp_in\n" if( $debug > 2 );
return;
}
print "\n" if( $debug > 1 );
print("linksrc =$linksrc\n".
"linkdest=$linkdest\n".
"testtype=$testtype\n".
"result =$result\n".
"index =$index\n".
"tstamp =$tstamp\n") if( $debug > 1 );
if( defined $linksrc ){
my $socket_snd;
eval{
$socket_snd =
IO::Socket::INET->new( PeerPort => $sendport,
Proto => 'udp',
PeerAddr => "$linksrc");
};
if( $@ ){
#socket creation was fatal
warn "Socket creation failed: $@\n";
}
my %ack = ( expid => $expid,
cmdtype => "ACK",
index => $index,
tstamp => $tstamp );
if( defined %ack && defined $socket_snd ){
my $ack_serial = serialize_hash( \%ack );
$socket_snd->send($ack_serial);
print "**SENT ACK**\n" if( $debug > 1 );
if( !defined $lasttimestamp{$linksrc}{$index} ||
$tstamp ne $lasttimestamp{$linksrc}{$index} )
{
saveTestToDB(linksrc => $linksrc,
linkdest => $linkdest,
testtype => $testtype,
result => $result,
tstamp => $tstamp );
#clear duplicatecnt for corresponding result index
if( defined($duplicatecnt{$linksrc}{$index}) ){
delete $duplicatecnt{$linksrc}{$index};
}
}else{
print("++++++duplicate data\n".
"linksrc=$linksrc\n".
"linkdest=$linkdest\n".
"testtype =$testtype\n".
"result=$result\n".
"index=$index\n".
"tstamp=$tstamp\n") if( $debug > 0);
#increment duplicatecnt for this src and index number
if( defined($duplicatecnt{$linksrc}{$index}) ){
$duplicatecnt{$linksrc}{$index}++;
#kill off offending node, if > threshold
if( $duplicatecnt{$linksrc}{$index}
> $duplicateKillThresh )
{
killnode($linksrc);
print "KILLING OFF BGMON at $linksrc".
" for index $index\n" if( $debug > 0 );
delete $duplicatecnt{$linksrc};
}
}else{
$duplicatecnt{$linksrc}{$index} = 1;
}
}
$lasttimestamp{$linksrc}{$index} = $tstamp;
}
}
}
}
......@@ -255,143 +242,112 @@ sub saveTestToDB()
my ($srcsite, $srcnode, $dstsite, $dstnode);
if( !exists $siteids{$results{linksrc}} ){
@tmp = DBQuery("SELECT site_idx FROM site_mapping ".
"WHERE node_id='$results{linksrc}'") ->fetch;
$srcsite = $tmp[0][0];
$siteids{$results{linksrc}} = $srcsite;
@tmp = DBQuery("SELECT site_idx FROM site_mapping ".
"WHERE node_id='$results{linksrc}'") ->fetch;
$srcsite = $tmp[0][0];
$siteids{$results{linksrc}} = $srcsite;
}else{
$srcsite = $siteids{$results{linksrc}};
$srcsite = $siteids{$results{linksrc}};
}
if( !exists $nodeids{$results{linksrc}} ){
@tmp = DBQuery("SELECT node_idx FROM site_mapping ".
"WHERE node_id='$results{linksrc}'") ->fetch;
$srcnode = $tmp[0][0];
$nodeids{$results{linksrc}} = $srcnode;
@tmp = DBQuery("SELECT node_idx FROM site_mapping ".
"WHERE node_id='$results{linksrc}'") ->fetch;
$srcnode = $tmp[0][0];
$nodeids{$results{linksrc}} = $srcnode;
}else{
$srcnode = $nodeids{$results{linksrc}};
$srcnode = $nodeids{$results{linksrc}};
}
if( !exists $siteids{$results{linkdest}} ){
@tmp = DBQuery("SELECT site_idx FROM site_mapping ".
"WHERE node_id='$results{linkdest}'") ->fetch;
$dstsite = $tmp[0][0];
$siteids{$results{linkdest}} = $dstsite;
@tmp = DBQuery("SELECT site_idx FROM site_mapping ".
"WHERE node_id='$results{linkdest}'") ->fetch;
$dstsite = $tmp[0][0];
$siteids{$results{linkdest}} = $dstsite;
}else{
$dstsite = $siteids{$results{linkdest}};
$dstsite = $siteids{$results{linkdest}};
}
if( !exists $nodeids{$results{linkdest}} ){
@tmp = DBQuery("SELECT node_idx FROM site_mapping ".
"WHERE node_id='$results{linkdest}'") ->fetch;
$dstnode = $tmp[0][0];
$nodeids{$results{linkdest}} = $dstnode;
@tmp = DBQuery("SELECT node_idx FROM site_mapping ".
"WHERE node_id='$results{linkdest}'") ->fetch;
$dstnode = $tmp[0][0];
$nodeids{$results{linkdest}} = $dstnode;
}else{
$dstnode = $nodeids{$results{linkdest}};
$dstnode = $nodeids{$results{linkdest}};
}
my $testtype = $results{'testtype'};
my $result = $results{'result'};
my $tstamp = $results{'tstamp'};
# my $latency = ($testtype eq "latency" ? "$result" : "NULL");
# my $loss = ($testtype eq "loss" ? "$result" : "NULL");
# my $bw = ($testtype eq "bw" ? "$result" : "NULL");
# my $measured_by = 0; #assigned to later
my $latency = "NULL";
my $loss = "NULL";
my $bw = "NULL";
my $measured_by = "0";
my %resulthash = (split(/[=,]/, $result));
if( $testtype eq "latency" ){
$latency = ($resulthash{error} !=0 ?
$resulthash{error} : $resulthash{latency});
}elsif( $testtype eq "bw" ){
$bw = ($resulthash{error} !=0 ?
$resulthash{error} : $resulthash{bw});
}
my $latency = ($testtype eq "latency" ? "$result" : "NULL");
my $loss = ($testtype eq "loss" ? "$result" : "NULL");
my $bw = ($testtype eq "bw" ? "$result" : "NULL");
# TODO: hacky... log "outage" markers of 100% loss
elsif( $testtype eq "outage" && $result eq "down"){
$loss = "1";
print " LOSS = 100%\n\n" if( $debug > 2 );
if( $testtype eq "outage" && $result eq "down"){
$loss = "1";
print " LOSS = 100%\n\n" if( $debug > 2 );
}
#
# Find proper Measurement_id
# --based on test toolname and actualparams
# xxx / TODO: make a hash/cache these, so it doesn't query the DB every time
@tmp = DBQuery("SELECT idx FROM tool_idx ".
"WHERE toolname='$results{toolname}' and ".
"actual_params='$results{actual_params}'"
) -> fetch;
$measured_by = $tmp[0][0];
print "Result to be written to DB: $testtype, bw=$bw, lat=$latency, ".
"loss=$loss, tool=$measured_by\n" if( $debug > 3 );
#
#XXX / TODO: do we automatically add an entry to this table if
# no match is found? If not, what do we do if there is no match?
#
#
# Check for valid DB id's.. RETURN from sub if invalid
if( $srcsite eq "" || $srcnode eq "" || $dstsite eq "" || $dstnode eq "" ){
warn "No results matching node id's $results{linksrc} and/or ".
"$results{linkdest}. Results:\n";
warn "srcsite=$srcsite\n";
warn "srcnode=$srcnode\n";
warn "dstsite=$dstsite\n";
warn "dstnode=$dstnode\n";
return;
}
warn "No results matching node id's $results{linksrc} and/or ".
"$results{linkdest}. Results:\n";
warn "srcsite=$srcsite\n";
warn "srcnode=$srcnode\n";
warn "dstsite=$dstsite\n";
warn "dstnode=$dstnode\n";
return;
}
if ($bw eq "") {
my $src = $results{'linksrc'};
my $dst = $results{'linkdest'};
warn("BW came in as null string at $tstamp for $src,$dst\n");
return;
my $src = $results{'linksrc'};
my $dst = $results{'linkdest'};
warn("BW came in as null string at $tstamp for $src,$dst\n");
return;
}
if ($latency eq "") {
my $src = $results{'linksrc'};
my $dst = $results{'linkdest'};
warn("Latency came in as null string at $tstamp for $src,$dst\n");
return;
my $src = $results{'linksrc'};
my $dst = $results{'linkdest'};
warn("Latency came in as null string at $tstamp for $src,$dst\n");
return;
}
if ($batchsize == 0) {
$insertions =
"INSERT INTO pair_data (srcsite_idx, srcnode_idx, ".
"dstsite_idx, dstnode_idx, unixstamp, ".
"latency, loss, bw, measured_by) values ";
$insertions =
"INSERT INTO pair_data (srcsite_idx, srcnode_idx, ".
"dstsite_idx, dstnode_idx, unixstamp, ".
"latency, loss, bw) values ";
}
$insertions .= ","
if ($batchsize);
if ($batchsize);
$insertions .=
"($srcsite, $srcnode, $dstsite, $dstnode, $tstamp, ".
" $latency, $loss, $bw, $measured_by)";
"($srcsite, $srcnode, $dstsite, $dstnode, $tstamp, ".
" $latency, $loss, $bw)";
push @queued_data, sprintf("RECORD ADDED $results{linksrc} $results{linkdest} : %.6f", $results{tstamp});
$batchsize++;
SendBatchedInserts()
if ($batchsize > $maxbatch);
if ($batchsize > $maxbatch);
}
sub SendBatchedInserts()
{
if ($batchsize) {
DBQueryWarn($insertions)
if (!$impotent);
print "$insertions\n"
if ($debug > 2);
$lastinsert = time();
DBQueryWarn($insertions)
if (!$impotent);
print "$insertions\n"
if ($debug > 2);
$lastinsert = time();
}
my ($seconds, $microseconds) = gettimeofday;
my $time = $seconds + $microseconds/1000000;
foreach my $d (@queued_data) {
printf "$d %.6f\n", $time;
printf "$d %.6f\n", $time;
}
$batchsize = 0;
$insertions = "";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment