#!/usr/bin/perl -w
#
# Copyright (c) 2008-2018 University of Utah and the Flux Group.
# 
# {{{GENIPUBLIC-LICENSE
# 
# GENI Public License
# 
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and/or hardware specification (the "Work") to
# deal in the Work without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Work, and to permit persons to whom the Work
# is furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
# 
# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
# IN THE WORK.
# 
# }}}
#
use strict;
use English;
use Getopt::Std;
use Data::Dumper;
use JSON;

#
# Look for APT things that need to be dealt with.
#
# usage() prints the accepted options and exits with status 1. Keep the
# option list in sync with $optlist below:
#   -d  debug: stay in the foreground (no TBBackGround)
#   -n  impotent: only report what would be updated
#   -v  verbose: print every event that is handled
#   -s  accepted by getopts, but nothing in this file consults it
#
sub usage()
{
    print "Usage: aptevent_daemon [-d] [-s] [-n] [-v]\n";
    exit(1);
}
# Option letters handed to getopts(); see usage().
my $optlist   = "dnsv";
my $debug     = 0;
my $impotent  = 0;
my $verbose   = 0;

# Running count of handled events; only used in the event trace output.
my $count     = 0;

#
# Configure variables. The @...@ placeholders are filled in when this
# .in template is configured. $MAINSITE is left unquoted, presumably
# because its substituted value is a bare number.
#
my $TB               = "@prefix@";
my $TBOPS            = "@TBOPSEMAIL@";
my $TBLOGS           = "@TBLOGSEMAIL@";
my $MAINSITE         = @TBMAINSITE@;
my $BOSSNODE         = "@BOSSNODE@";
my $LOGFILE          = "$TB/log/aptevent_daemon.log";

# Portal SSL pubsubd running on this host:port
my $CLUSTER_PORTAL          = "@CLUSTER_PORTAL@";
my $CLUSTER_PUBSUBD_SSLPORT = "@CLUSTER_PUBSUBD_SSLPORT@";
my $CLUSTER_PUBSUBD_ALTPORT = "@CLUSTER_PUBSUBD_ALTPORT@";

# un-taint path
$ENV{'PATH'} = '/bin:/usr/bin:/usr/local/bin:/usr/site/bin';
delete @ENV{'IFS', 'CDPATH', 'ENV', 'BASH_ENV'};

# Protos: forward declarations so calls that appear before the
# definitions are parsed against these prototypes.
sub HandleSliverStatus($$$$);
sub HandleSliceStatus($$$);
sub HandleImageStatus($$$);
sub HandleFrisbeeStatus($$$$);
sub fatal($);
#
# Turn off line buffering on output
#
$| = 1;

# Note: fatal() runs here before any options are parsed, so $debug and
# $impotent are still 0 at this point.
if ($UID != 0) {
    fatal("Must be root to run this script\n");
}

#
# Check args early so we get the right DB. Nothing below consults
# $options{"s"}; -s is accepted but has no effect in this file.
#
my %options = ();
if (! getopts($optlist, \%options)) {
    usage();
}
if (defined($options{"d"})) {
    $debug = 1;
}
if (defined($options{"n"})) {
    $impotent = 1;
}
if (defined($options{"v"})) {
    $verbose = 1;
}

# Do this early so that we talk to the right DB. $GENI_DBNAME is a
# package global (presumably read by GeniDB -- confirm). The Geni modules
# are pulled in with require, which runs at run time after this
# assignment; a plain use would load them at compile time, before it.
our $GENI_DBNAME;
$GENI_DBNAME = "geni-cm";

# Load the Testbed support stuff.
use lib "@prefix@/lib";
use emdb;
require GeniUtil;
require GeniDB;
require GeniSlice;
require GeniSliver;
use libtestbed;
use emutil;
use libEmulab;
use APT_Instance;
use event;

if (!$impotent) {
    if (CheckDaemonRunning("aptevent_daemon")) {
	#
	# Do not go through fatal() here. fatal() calls
	# MarkDaemonStopped("aptevent_daemon"), which would presumably
	# clear the marker set by the copy that is already running (so a
	# later start would no longer be refused). It would also mail ops
	# that the daemon died when it did not. Just report and exit.
	#
	die("*** $0:\n".
	    "    Not starting another aptevent daemon!\n");
    }
    # Go to ground. A true return from TBBackGround makes this process
    # exit; presumably that is the parent, and the child carries on
    # logging to $LOGFILE.
    if (! $debug) {
	if (TBBackGround($LOGFILE)) {
	    exit(0);
	}
    }
    if (MarkDaemonRunning("aptevent_daemon")) {
	fatal("Could not mark daemon as running!");
    }
}

# We receive a lot of events but we do not send any.
event_set_sockbufsizes(1024 * 32, 1024 * 192);

#
# At the Mothership (Cloudlab Portal) we get events from the SSL
# enabled version of pubsubd, which is getting events from all of the
# clusters including the local cluster.
#
# Otherwise, we listen on the regular event server since that is where
# the events first go anyway. It is only clusters that are part of
# Cloudlab that are forwarding to the Mothership.
#
my $url = "elvin://localhost";
if ($CLUSTER_PORTAL ne "" && $CLUSTER_PORTAL eq $BOSSNODE) {
    # This host is the portal; connect on the local alternate port.
    $url .= ":${CLUSTER_PUBSUBD_ALTPORT}";
}
my $localhandle = event_register($url, 0);
if (!$localhandle) {
    fatal("Unable to register with event system");
}

#
# Subscribe to all events.
#
my $tuple = address_tuple_alloc();
if (!$tuple) {
    fatal("Could not allocate an address tuple");
}
if (!event_subscribe($localhandle, \&callback, $tuple)) {
    fatal("Could not subscribe to all events");
}

#
# Flag to know when there are no more events to process. callback()
# increments it; the polling loops reset it to 0 before each poll.
#
my $gotone;

# Cloned notifications waiting to be handled. callback() adds at the
# front and the main loop pops from the back, so the oldest is handled
# first.
my @notifications = ();

#
# Callback registered with event_subscribe(). It is invoked from inside
# event_poll() for each received notification. It bumps $gotone so the
# polling loops know something arrived. It then queues a clone of the
# notification (presumably because the original is only valid for the
# duration of the callback -- TODO confirm) at the front of
# @notifications. If cloning fails, the event is dropped with a message
# on STDERR.
#
sub callback($$$)
{
    my ($handle, $notification, $data) = @_;

    $gotone++;

    my $clone = event_notification_clone($handle, $notification);
    if (!$clone) {
	print STDERR "Could not clone notification\n";
	return;
    }
    unshift(@notifications, $clone);
}

#
# Process one queued notification. It reads the site, slice, urn, type
# and details fields, finds the APT instance for the slice, and
# dispatches the details to the handler for the event type. Events
# without a site or slice, and events for slices that have no APT
# instance, are ignored. Unknown types are ignored, but the instance
# and caches are still purged/flushed.
#
sub HandleNotification($$)
{
    my ($handle, $note) = @_;
    my $time      = time();
    my $site      = event_notification_get_site($handle, $note);
    my $slice     = event_notification_get_string($handle, $note, "slice");

    # Ignore extraneous events. They happen when listening to the local
    # cluster pubsubd.
    return
	if (! (defined($site) && defined($slice)));

    my $urn       = event_notification_get_string($handle, $note, "urn");
    my $type      = event_notification_get_string($handle, $note, "type");
    my $details   = event_notification_get_string($handle, $note, "details");

    # A missing type matches none of the handlers below. Use "" so the
    # string comparisons do not emit uninitialized warnings under -w.
    $type = ""
	if (!defined($type));

    #
    # Not sure why this is happening, but sometimes the slice urn has
    # extra double quotes around it. Kill them so the instance lookup
    # does not fail. Someday I will figure out why this is happening.
    #
    if ($slice =~ /^\"(.*)\"$/) {
	$slice = $1;
    }
    $count++;

    if ($debug || $verbose || $slice =~ /XXpurpnurp/) {
	print "Event: $count $time $site $type $urn $slice $details\n";
    }

    my $instance = APT_Instance->LookupBySlice($slice);
    return
	if (!defined($instance));

    if ($type eq "SLIVERSTATUS") {
	HandleSliverStatus($site, $urn, $instance, $details);
    }
    elsif ($type eq "SLICESTATUS") {
	HandleSliceStatus($site, $instance, $details);
    }
    elsif ($type eq "IMAGESTATUS") {
	HandleImageStatus($site, $instance, $details);
    }
    elsif ($type eq "FRISBEESTATUS") {
	HandleFrisbeeStatus($site, $urn, $instance, $details);
    }

    # This HAS TO BE DONE, to break a circular dependency that causes
    # the daemon to grow and grow till it consumes boss.
    $instance->Purge();
    emutil::FlushCaches();
    GeniUtil::FlushCaches();
}

#
# Handle a SLIVERSTATUS event. If the instance has an aggregate entry
# for $site, decode the JSON details. Then pass the sliver URN, the
# decoded data and the raw JSON to that entry's UpdateSliverStatus().
# In impotent mode, only report what would be done.
#
sub HandleSliverStatus($$$$)
{
    my ($site, $sliver_urn, $instance, $json) = @_;

    #print "HandleSliverStatus: $site, $sliver_urn, $instance\n";

    return
	if (!exists($instance->AggregateHash()->{$site}));
    my $sliver = $instance->AggregateHash()->{$site};

    if ($impotent) {
	print "Would update sliver status for $sliver from details\n";
	return;
    }
    if ($debug || $verbose) {
	print "Updating sliver status for sliver from $json\n";
    }
    # decode_json() dies on bad input; trap it so one malformed event
    # does not take the daemon down.
    my $hash = eval { decode_json($json) };
    if ($@) {
	print STDERR "Could not decode json data: $json\n";
	return;
    }
    $sliver->UpdateSliverStatus($sliver_urn, $hash, $json);
}

#
# Handle a SLICESTATUS event. If the instance has an aggregate entry
# for $site, decode the JSON details. Then pass the decoded data and
# the raw JSON to that entry's UpdateSliceStatus(). In impotent mode,
# only report what would be done.
#
sub HandleSliceStatus($$$)
{
    my ($site, $instance, $json) = @_;

    #print "HandleSliceStatus: $site, $instance\n";

    return
	if (!exists($instance->AggregateHash()->{$site}));
    my $sliver = $instance->AggregateHash()->{$site};

    if ($impotent) {
	print "Would update slice status for $sliver from details\n";
	return;
    }
    if ($debug || $verbose) {
	print "Updating slice status for sliver from $json\n";
    }
    # decode_json() dies on bad input; trap it so one malformed event
    # does not take the daemon down.
    my $hash = eval { decode_json($json) };
    if ($@) {
	print STDERR "Could not decode json data: $json\n";
	return;
    }
    $sliver->UpdateSliceStatus($hash, $json);
}

#
# Handle an IMAGESTATUS event. If the instance has an aggregate entry
# for $site, decode the JSON details and pass them, with the raw JSON,
# to the instance's own UpdateImageStatus(). Unlike the other handlers,
# the update goes to the instance rather than to the per-site entry.
# In impotent mode, only report what would be done.
#
sub HandleImageStatus($$$)
{
    my ($site, $instance, $json) = @_;

    return
	if (!exists($instance->AggregateHash()->{$site}));

    if ($impotent) {
	print "Would update image status for $instance from details\n";
	return;
    }
    if ($debug || $verbose) {
	print "Updating image status for instance from $json\n";
    }
    # decode_json() dies on bad input; trap it so one malformed event
    # does not take the daemon down.
    my $hash = eval { decode_json($json) };
    if ($@) {
	print STDERR "Could not decode json data: $json\n";
	return;
    }
    $instance->UpdateImageStatus($hash, $json);
}

#
# Handle a FRISBEESTATUS event. If the instance has an aggregate entry
# for $site, decode the JSON details. Then pass the sliver URN, the
# decoded data and the raw JSON to that entry's
# UpdateFrisbeeStatusNew(). In impotent mode, only report what would
# be done.
#
sub HandleFrisbeeStatus($$$$)
{
    my ($site, $sliver_urn, $instance, $json) = @_;

    return
	if (!exists($instance->AggregateHash()->{$site}));
    my $sliver = $instance->AggregateHash()->{$site};

    if ($impotent) {
	print "Would update frisbee status for $sliver from details\n";
	return;
    }
    if ($debug || $verbose) {
	print "Updating frisbee status for sliver from $json\n";
    }
    # decode_json() dies on bad input; trap it so one malformed event
    # does not take the daemon down.
    my $hash = eval { decode_json($json) };
    if ($@) {
	print STDERR "Could not decode json data: $json\n";
	return;
    }
    $sliver->UpdateFrisbeeStatusNew($sliver_urn, $hash, $json);
}

#
# Setup a signal handler for newsyslog. On SIGHUP, reopen $LOGFILE with
# the effective uid temporarily set to 0, then restore the previous
# euid. It is only installed when the daemon has backgrounded itself
# with a log file; debug and impotent runs never call TBBackGround.
#
sub handler()
{
    my $SAVEEUID = $EUID;

    $EUID = 0;
    ReOpenLog($LOGFILE);
    $EUID = $SAVEEUID;
}
$SIG{HUP} = \&handler
    if (! ($debug || $impotent));

#
# Poll the event socket until a poll delivers nothing new. callback()
# increments $gotone for every notification it receives, so a 0 after
# a poll means nothing more is pending right now.
#
sub DrainEventSocket()
{
    $gotone = 1;
    while ($gotone) {
	$gotone = 0;
	event_poll($localhandle);
    }
}

#
# Loop processing events. Each pass drains the socket, handles the
# queued notifications oldest-first, and then blocks waiting for more.
# The 1000 passed to event_poll_blocking is presumably a timeout in
# milliseconds -- confirm against the event library.
#
while (1)
{
    DrainEventSocket();

    while (@notifications) {
	my $notification = pop(@notifications);
	HandleNotification($localhandle, $notification);
	event_notification_free($localhandle, $notification);

	#
	# Keep the incoming queue drained! If the socket buffer
	# fills up because we are running slow, we lose events and
	# pubsubd starts throwing errors back to the sender.
	#
	DrainEventSocket();
    }
    event_poll_blocking($localhandle, 1000);
}

exit(0);

#
# Report an unrecoverable error and terminate via die(). Unless running
# in debug or impotent mode, the message is first mailed to $TBOPS. In
# any non-impotent run, the daemon's running marker is cleared before
# dying.
#
sub fatal($)
{
    my ($mesg) = @_;
    my $quiet  = ($debug || $impotent);

    # Let the testbed list know the daemon is going away.
    SENDMAIL($TBOPS, "APT Event daemon died", $mesg, $TBOPS)
	unless ($quiet);

    MarkDaemonStopped("aptevent_daemon")
	unless ($impotent);

    die("*** $0:\n" .
	"    $mesg\n");
}