Commit 31a2b5bd authored by Dan Gebhardt's avatar Dan Gebhardt

Looks like the UDP communication is working.

parent d81cf406
......@@ -13,12 +13,10 @@
use lib '/usr/testbed/lib';
use libtbdb;
#require Mysql;
#use libpelabdb;
use event;
use Getopt::Std;
use strict;
use Socket;
use IO::Socket::INET;
use IO::Select;
# node and site id caches
my %nodeids;
......@@ -34,27 +32,29 @@ my $maxbatch = 30;
my $maxidletime= 5; # Seconds before forced insert.
my $lastinsert = 0; # Timestamp.
my %lasttimestamp; #prevents adding duplicate entries in DB
#
# Turn off line buffering on output
#
$| = 1;
sub usage {
print "Usage: $0 [-s server] [-p port] [-e pid/eid] [-d] [-i]\n";
print "Usage: $0 [-p receiveport] [-a sendport] [-e pid/eid] [-d] [-i]\n";
return 1;
}
my $debug = 0;
my $impotent = 0;
my $evexpt = "__none";
my ($port, $sendport, $expid);
my %opt = ();
if (!getopts("s:p:e:dih", \%opt)) {
if (!getopts("p:a:e:dih", \%opt)) {
exit &usage;
}
if ($opt{p}) { $port = $opt{p}; } else { $port = 5051; }
if ($opt{p}) { $sendport = $opt{p}; } else { $sendport = 5050; }
if ($opt{h}) { exit &usage; }
if ($opt{e}) { $evexpt = $opt{e}; }
if ($opt{e}) { $expid = $opt{e}; } else { $expid = "none"; }
if ($opt{d}) { $debug = 1; }
if ($opt{i}) { $impotent = 1; }
......@@ -73,40 +73,16 @@ if( $DBPWD =~ /^([\w]*)\s([\w]*)$/ ) {
}else{
fatal("Bad chars in password!");
}
#print "PWD: $DBPWD\n";
#connect to database
my ($DB_data, $DB_sitemap);
TBDBConnect($DBNAME,$DBUSER,$DBPWD);
#DBConnect(\$DB_data, $DBNAME,$DBUSER,$DBPWD);
#DBConnect(\$DB_sitemap, $DBNAME,$DBUSER,$DBPWD);
my ($server,$port);
if ($opt{s}) { $server = $opt{s}; } else { $server = "localhost"; }
if ($opt{p}) { $port = $opt{p}; }
my $URL = "elvin://$server";
if ($port) { $URL .= ":$port"; }
my $handle = event_register($URL,0);
if (!$handle) { die "Unable to register with event system\n"; }
my $tuple = address_tuple_alloc();
if (!$tuple) { die "Could not allocate an address tuple\n"; }
#watch for notifications to ops
%$tuple = ( host => $event::ADDRESSTUPLE_ALL,
objtype => "BGMON",
objname => "ops"
, expt => $evexpt
, scheduler => 1
);
if (!event_subscribe($handle,\&callbackFunc,$tuple)) {
die "Could not subscribe to event\n";
}
my $socket_rcv = IO::Socket::INET->new( LocalPort => $port,
Proto => 'udp' );
my $sel = IO::Select->new();
$sel->add($socket_rcv);
#############################################################################
# Note a difference from tbrecv.c - we don't yet have event_main() functional
......@@ -118,119 +94,79 @@ if (!event_subscribe($handle,\&callbackFunc,$tuple)) {
while (1) {
#check for pending received events
event_poll_blocking($handle, 1000);
# event_poll_blocking($handle, 1000);
SendBatchedInserts()
if ($batchsize && (time() - $lastinsert) > $maxidletime);
}
#############################################################################
handleincomingmsgs();
if (event_unregister($handle) == 0) {
die "Unable to unregister with event system\n";
}
#############################################################################
exit(0);
sub callbackFunc($$$) {
my ($handle,$notification,$data) = @_;
sub handleincomingmsgs()
{
my $inmsg;
#check for pending received events
my @ready = $sel->can_read(1000);
foreach my $handle (@ready){
$socket_rcv->recv( $inmsg, 2048 );
# my %inhash = %{ Storable::thaw $inmsg};
my %inhash = %{ deserialize_hash( $inmsg )};
# foreach my $key (keys %inhash){
# print "key=$key\n";
# print "$key \t$inhash{$key}\n";
# }
my ($expid, $linksrc, $linkdest, $testtype, $result, $tstamp, $index)
= ($inhash{expid}, $inhash{linksrc}, $inhash{linkdest},
$inhash{testtype}, $inhash{result}, $inhash{tstamp},
$inhash{index});
my $time = time();
my $site = event_notification_get_site($handle, $notification);
my $expt = event_notification_get_expt($handle, $notification);
my $group = event_notification_get_group($handle, $notification);
my $host = event_notification_get_host($handle, $notification);
my $objtype = event_notification_get_objtype($handle, $notification);
my $objname = event_notification_get_objname($handle, $notification);
my $eventtype = event_notification_get_eventtype($handle,
$notification);
# print "Event: $time $site $expt $group $host $objtype $objname " .
# "$eventtype\n";
print "EVENT: $time $objtype $eventtype\n"
if ($debug);
my $linksrc = event_notification_get_string($handle,
$notification,
"linksrc");
my $linkdest = event_notification_get_string($handle,
$notification,
"linkdest");
my $testtype = event_notification_get_string($handle,
$notification,
"testtype");
my $result = event_notification_get_string($handle,
$notification,
"result");
my $tstamp = event_notification_get_string($handle,
$notification,
"tstamp");
my $index = event_notification_get_string($handle,
$notification,
"index");
#change values and/or initialize
if ( $debug && $eventtype eq "RESULT" ){
print "***GOT RESULT***\n";
}
print("linksrc=$linksrc\n".
"linkdest=$linkdest\n".
"testtype =$testtype\n".
"result=$result\n".
"index=$index\n".
"tstamp=$tstamp\n")
if ($debug);
saveTestToDB(linksrc => $linksrc,
linkdest => $linkdest,
testtype => $testtype,
result => $result,
tstamp => $tstamp );
if (!exists($ipaddrs{$linksrc})) {
my (undef,undef,undef,undef,@ips) = gethostbyname("$linksrc");
if (!@ips) {
warn "Could not map $linksrc to its ipaddr\n";
return;
"tstamp=$tstamp\n");
if( defined $linksrc ){
my $socket_snd =
IO::Socket::INET->new( PeerPort => $sendport,
Proto => 'udp',
PeerAddr => "$linksrc");
my %ack = ( expid => $expid,
cmdtype => "ACK",
index => $index );
# my $ack_serial = Storable::freeze \%ack;
my $ack_serial = serialize_hash( \%ack );
$socket_snd->send($ack_serial);
print "**SENT ACK**\n";
#=pod
if( !defined $lasttimestamp{$linksrc}{$index} ||
$tstamp ne $lasttimestamp{$linksrc}{$index} )
{
saveTestToDB(linksrc => $linksrc,
linkdest => $linkdest,
testtype => $testtype,
result => $result,
tstamp => $tstamp );
}else{
print "++++++duplicate data\n";
}
$ipaddrs{$linksrc} = inet_ntoa($ips[0]);
}
my $ipaddr = $ipaddrs{$linksrc};
my $tuple = address_tuple_alloc();
if (!$tuple) {
warn "Could not allocate an address tuple for reply to $linksrc\n";
return;
}
%$tuple = ( objtype => "BGMON",
objname => "ops",
eventtype => "ACK",
expt => "__none",
host => "$ipaddr",
);
my $reply_notification = event_notification_alloc($handle, $tuple);
if (!$reply_notification) {
warn "Could not allocate notification for reply to $linksrc\n";
return;
$lasttimestamp{$linksrc}{$index} = $tstamp;
#=cut
}
# So the sender knows which one we actually got.
event_notification_put_string($handle,
$reply_notification, "index", "$index");
}
print "Sending ack event to $ipaddr\n"
if ($debug);
if (!event_notify($handle, $reply_notification)) {
warn("could not send reply notification to $linksrc");
}
event_notification_free($handle, $reply_notification);
}
#############################################################################
sub saveTestToDB()
......@@ -334,3 +270,38 @@ sub SendBatchedInserts()
$batchsize = 0;
$insertions = "";
}
#############################################################################
#
# Custom sub to turn a hash into a string. Hashes must not contain
# the substring of $separator anywhere!!!
#
sub serialize_hash($)
{
my ($hashref) = @_;
my %hash = %$hashref;
my $separator = "::";
my $out = "";
for my $key (keys %hash){
$out .= $separator if( $out ne "" );
$out .= $key.$separator.$hash{$key};
}
return $out;
}
sub deserialize_hash($)
{
my ($string) = @_;
my $separator = "::";
my %hashout;
my @tokens = split( /$separator/, $string );
for( my $i=0; $i<@tokens; $i+=2 ){
$hashout{$tokens[$i]} = $tokens[$i+1];
}
return \%hashout;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment