Commit e5d36e0d authored by Leigh B Stoller's avatar Leigh B Stoller
Browse files

Rework how we store the sliver/slice status from the clusters:

In the beginning, the number and size of experiments was small, and so
storing the entire slice/sliver status blob as json in the web task was
fine, even though we had to lock tables to prevent races between the
event updates and the local polling.

But lately the size of those json blobs is getting huge and the lock is
bogging things down, including not being able to keep up with the number
of events coming from all the clusters, we get really far behind.

So I have moved the status blobs out of the per-instance web task and
into new tables, once per slice and one per node (sliver). This keeps
the blobs very small and thus the lock time very small. So now we can
keep up with the event stream.

If we grow big enough that this problem comes big enough, we can switch
to innodb for the per-sliver table and do row locking instead of table
locking, but I do not think that will happen
parent 60e65004
......@@ -29,6 +29,7 @@ use English;
use Data::Dumper;
use Date::Parse;
use POSIX qw(tmpnam);
use JSON;
use Exporter;
use vars qw(@ISA @EXPORT $AUTOLOAD
$RECORDHISTORY_TERMINATED $RECORDHISTORY_EXPIRED
......@@ -135,6 +136,7 @@ sub Lookup($$)
$self->{'BRAND'} = Brand->Create($self->{'INSTANCE'}->{'servername'});
$self->{'HASH'} = {};
$self->{'AGGREGATES'} = {};
$self->{'WEBTASK'} = undef;
bless($self, $class);
#
......@@ -150,27 +152,6 @@ sub Lookup($$)
APT_Instance::Aggregate->GenTemp($self)};
}
$self->{'AGGREGATES'} = $aggregates;
#
# Grab the webtask. Backwards compat mode, see if there is one associated
# with the object, use that. Otherwise create a new one.
#
my $webtask;
if (defined($self->webtask_id())) {
$webtask = WebTask->Lookup($self->webtask_id());
}
if (!defined($webtask)) {
$webtask = WebTask->LookupByObject($self->uuid());
if (!defined($webtask)) {
$webtask = WebTask->Create();
return undef
if (!defined($webtask));
}
$self->Update({"webtask_id" => $webtask->task_id()}) == 0
or return undef;
}
$self->{'WEBTASK'} = $webtask;
# Add to cache.
$instances{$self->uuid()} = $self;
......@@ -204,10 +185,38 @@ sub Brand($) { return $_[0]->{'BRAND'}; }
sub isAPT($) { return $_[0]->Brand()->isAPT() ? 1 : 0; }
sub isCloud($) { return $_[0]->Brand()->isCloud() ? 1 : 0; }
sub isPNet($) { return $_[0]->Brand()->isPNet() ? 1 : 0; }
sub webtask($) { return $_[0]->{'WEBTASK'}; }
sub AggregateList($) { return values(%{ $_[0]->{'AGGREGATES'} }); }
sub AggregateHash($) { return $_[0]->{'AGGREGATES'}; }
#
# Grab the webtask. Backwards compat mode, see if there is one associated
# with the object, use that. Otherwise create a new one.
#
sub webtask($)
{
my ($self) = @_;
my $webtask;
return $self->{'WEBTASK'}
if (defined($self->{'WEBTASK'}));
if (defined($self->webtask_id())) {
$webtask = WebTask->Lookup($self->webtask_id());
}
if (!defined($webtask)) {
$webtask = WebTask->LookupByObject($self->uuid());
if (!defined($webtask)) {
$webtask = WebTask->Create();
return undef
if (!defined($webtask));
}
$self->Update({"webtask_id" => $webtask->task_id()}) == 0
or return undef;
}
$self->{'WEBTASK'} = $webtask;
return $webtask;
}
# Break circular reference someplace to avoid exit errors.
sub DESTROY {
my $self = shift;
......@@ -368,7 +377,8 @@ sub LookupBySlice($$)
if (!$query_result || !$query_result->numrows);
my ($uuid) = $query_result->fetchrow_array();
return APT_Instance->Lookup($uuid);
my $foo = APT_Instance->Lookup($uuid);
return $foo;
}
#
......@@ -1213,6 +1223,7 @@ use WebTask;
use libtestbed;
use Carp;
use POSIX qw(tmpnam);
use JSON;
use English;
use GeniResponse;
use Genixmlrpc;
......@@ -1244,6 +1255,8 @@ sub Lookup($$$)
$self->{'DBROW'} = $query_result->fetchrow_hashref();
$self->{'HASH'} = {};
$self->{'INSTANCE'} = $instance;
$self->{'WEBTASK'} = undef;
$self->{'STATUS'} = undef;
bless($self, $class);
# Handy;
......@@ -1251,13 +1264,6 @@ sub Lookup($$$)
# Kludge
$self->{'ISAL2S'} = ($self->aggregate_urn() =~ /al2s/ ? 1 : 0);
my $webtask = WebTask->Lookup($self->webtask_id());
return $self
if (!defined($webtask));
$self->{'WEBTASK'} = $webtask;
$webtask->AutoStore(1);
return $self;
}
......@@ -1283,11 +1289,75 @@ AUTOLOAD {
carp("No such slot '$name' field in class $type");
return undef;
}
sub webtask($) { return $_[0]->{'WEBTASK'}; }
sub instance($) { return $_[0]->{'INSTANCE'}; }
sub domain($) { return $_[0]->{'AGGURN'}->domain(); }
sub isAL2S($) { return $_[0]->{'ISAL2S'}; }
#
# Grab the webtask.
#
sub webtask($)
{
my ($self) = @_;
return $self->{'WEBTASK'}
if (defined($self->{'WEBTASK'}));
my $webtask = WebTask->Lookup($self->webtask_id());
return undef
if (!defined($webtask));
$self->{'WEBTASK'} = $webtask;
$webtask->AutoStore(1);
return $webtask;
}
#
# Get sliver status rows. Turn into a class at some point.
#
sub SliverStatus($)
{
my ($self) = @_;
my $result = {};
my $uuid = $self->uuid();
my $aggregate_urn = $self->aggregate_urn();
my $query_result =
DBQueryWarn("select * from apt_instance_sliver_status ".
"where uuid='$uuid' and aggregate_urn='$aggregate_urn'");
return $result
if (!$query_result);
while (my $row = $query_result->fetchrow_hashref()) {
my $json = $row->{"sliver_data"};
my $hash = eval { decode_json($json) };
if ($@) {
print STDERR "Could not decode json data: $json\n";
next;
}
$row->{"sliver_data"} = $hash;
$result->{$row->{"client_id"}} = $row;
}
return $result;
}
#
# Delete sliver status for a node.
#
sub DeleteSliverStatus($$)
{
my ($self, $client_id) = @_;
my $uuid = $self->uuid();
my $aggregate_urn = $self->aggregate_urn();
my $safe_clientid = DBQuoteSpecial($client_id);
DBQueryWarn("delete from apt_instance_sliver_status ".
"where uuid='$uuid' and aggregate_urn='$aggregate_urn' and ".
" client_id=$safe_clientid")
or return -1;
return 0;
}
# Backwards compat for a while
sub GenTemp($$)
{
......@@ -1407,6 +1477,14 @@ sub Delete($)
return 0
if ($self->{'FAKE'});
DBQueryWarn("delete from apt_instance_slice_status ".
"where uuid='$uuid' and aggregate_urn='$urn'")
or return -1;
DBQueryWarn("delete from apt_instance_sliver_status ".
"where uuid='$uuid' and aggregate_urn='$urn'")
or return -1;
DBQueryWarn("delete from apt_instance_aggregates ".
"where uuid='$uuid' and aggregate_urn='$urn'")
or return -1;
......@@ -1434,7 +1512,7 @@ sub Refresh($)
$self->{'DBROW'} = $query_result->fetchrow_hashref();
return -1
if ($self->webtask()->Refresh());
if (defined($self->{'WEBTASK'}) && $self->webtask()->Refresh());
return 0;
}
......@@ -1641,6 +1719,207 @@ sub UpdateFrisbeeStatus($$)
return $current;
}
#
# Update the slice status for the instance.
#
sub UpdateSliceStatus($$$)
{
my ($self, $hash, $json) = @_;
my $uuid = $self->uuid();
my $aggregate_urn = $self->aggregate_urn();
my $timestamp = $hash->{'utc'};
my $safe_stamp = DBQuoteSpecial($timestamp);
my $safe_json;
if (defined($json)) {
$safe_json = DBQuoteSpecial($json);
}
else {
$json = eval { encode_json($hash); };
if ($@) {
print STDERR "Could not json encode sliver data\n";
return -1;
}
$safe_json = DBQuoteSpecial($json);
}
DBQueryWarn("lock tables apt_instance_slice_status write")
or return -1;
my $query_result =
DBQueryWarn("select timestamp from apt_instance_slice_status ".
"where uuid='$uuid' and aggregate_urn='$aggregate_urn' ");
goto bad
if (!$query_result);
if (! $query_result->numrows) {
my $name = $self->name();
DBQueryWarn("insert into apt_instance_slice_status set ".
" uuid='$uuid', name='$name', ".
" aggregate_urn='$aggregate_urn', ".
" timestamp=$safe_stamp, modified=now(), ".
" slice_data=$safe_json")
or goto bad;
}
else {
my $ref = $query_result->fetchrow_hashref();
my $clause = "";
if ($timestamp >= $ref->{'timestamp'}) {
DBQueryWarn("update apt_instance_slice_status set modified=now(), ".
" timestamp=$safe_stamp,slice_data=$safe_json ".
"where uuid='$uuid' and ".
" aggregate_urn='$aggregate_urn'")
or goto bad;
}
}
DBQueryWarn("unlock tables");
return 0;
bad:
DBQueryWarn("unlock tables");
return -1;
}
#
# Update the sliverstatus for the instance.
#
sub UpdateSliverStatus($$$)
{
my ($self, $urn, $hash, $json) = @_;
my $uuid = $self->uuid();
my $aggregate_urn = $self->aggregate_urn();
my $sliver_urn = DBQuoteSpecial($urn);
my $timestamp = $hash->{'utc'};
my $safe_stamp = DBQuoteSpecial($timestamp);
my $safe_json;
if (defined($json)) {
$safe_json = DBQuoteSpecial($json);
}
else {
$json = eval { encode_json($hash); };
if ($@) {
print STDERR "Could not json encode sliver data\n";
return -1;
}
$safe_json = DBQuoteSpecial($json);
}
DBQueryWarn("lock tables apt_instance_sliver_status write")
or return -1;
my $query_result =
DBQueryWarn("select timestamp from apt_instance_sliver_status ".
"where uuid='$uuid' and aggregate_urn='$aggregate_urn' and ".
" sliver_urn=$sliver_urn");
goto bad
if (!$query_result);
if (! $query_result->numrows) {
my $name = $self->name();
my $client_id = DBQuoteSpecial($hash->{'client_id'});
my $node_id = DBQuoteSpecial($hash->{'component_urn'});
DBQueryWarn("insert into apt_instance_sliver_status set ".
" uuid='$uuid', name='$name', ".
" aggregate_urn='$aggregate_urn', ".
" resource_id=$node_id, client_id=$client_id, ".
" sliver_urn=$sliver_urn, ".
" timestamp=$safe_stamp, modified=now(), ".
" sliver_data=$safe_json")
or goto bad;
}
else {
my $ref = $query_result->fetchrow_hashref();
my $clause = "";
if ($timestamp >= $ref->{'timestamp'}) {
#
# Keep frisbeestatus for reloading, but we want to clear
# it otherwise.
#
if (exists($hash->{"rawstate"}) &&
! ($hash->{"rawstate"} eq TBDB_NODESTATE_RELOADING() ||
$hash->{"rawstate"} eq TBDB_NODESTATE_TBSETUP())) {
$clause = ",frisbee_data=NULL";
}
DBQueryWarn("update apt_instance_sliver_status set modified=now(), ".
" timestamp=$safe_stamp,sliver_data=$safe_json ".
" $clause ".
"where uuid='$uuid' and ".
" aggregate_urn='$aggregate_urn' and ".
" sliver_urn=$sliver_urn")
or goto bad;
}
}
DBQueryWarn("unlock tables");
return 0;
bad:
DBQueryWarn("unlock tables");
return -1;
}
#
# Update the frisbee status for the instance.
#
sub UpdateFrisbeeStatusNew($$$)
{
my ($self, $urn, $hash, $json) = @_;
my $uuid = $self->uuid();
my $aggregate_urn = $self->aggregate_urn();
my $sliver_urn = DBQuoteSpecial($urn);
my $safe_json = DBQuoteSpecial($json);
DBQueryWarn("lock tables apt_instance_sliver_status write")
or return -1;
my $query_result =
DBQueryWarn("select uuid from apt_instance_sliver_status ".
"where uuid='$uuid' and aggregate_urn='$aggregate_urn' and ".
" sliver_urn=$sliver_urn");
goto bad
if (!$query_result);
# Ignore if no existing sliver status,
if (! $query_result->numrows) {
DBQueryWarn("unlock tables");
return 0;
}
DBQueryWarn("update apt_instance_sliver_status set modified=now(), ".
" frisbee_data=$safe_json ".
"where uuid='$uuid' and ".
" aggregate_urn='$aggregate_urn' and ".
" sliver_urn=$sliver_urn")
or goto bad;
DBQueryWarn("unlock tables");
return 0;
bad:
DBQueryWarn("unlock tables");
return -1;
}
#
# A variant for updating above from the results of a SliverStatus() API call.
#
sub UpdateSliverStatusAll($$)
{
my ($self, $blob) = @_;
foreach my $urn (keys(%{$blob})) {
my $details = $blob->{$urn};
# Update sliver status, but no json from here,
$self->UpdateSliverStatus($urn, $details, undef);
}
return 0;
}
#
# Ask aggregate to terminate a sliver.
#
......@@ -2616,7 +2895,7 @@ sub WaitForSliver($)
# cares about. We get this on each loop, update so the web
# interface can show changes.
#
my $statusblob = $aggobj->UpdateWebStatus($repblob->{'details'});
my $statusblob = {};
#print STDERR Dumper($statusblob);
......@@ -2625,6 +2904,11 @@ sub WaitForSliver($)
my $details = $repblob->{'details'}->{$urn};
my $node_id = $details->{'client_id'};
# Update sliver status, but no json from here,
$aggobj->UpdateSliverStatus($urn, $details, undef);
# For next loop.
$statusblob->{$node_id} = $details;
#
# Look at the last blob. If we changed, view that as progress.
#
......
......@@ -45,6 +45,7 @@ my $optlist = "dnsv";
my $debug = 0;
my $impotent = 0;
my $verbose = 0;
my $count = 0;
#
# Configure variables
......@@ -62,9 +63,10 @@ $ENV{'PATH'} = '/bin:/usr/bin:/usr/local/bin:/usr/site/bin';
delete @ENV{'IFS', 'CDPATH', 'ENV', 'BASH_ENV'};
# Protos
sub HandleSliverStatus($$$);
sub HandleSliverStatus($$$$);
sub HandleSliceStatus($$$);
sub HandleImageStatus($$$);
sub HandleFrisbeeStatus($$$);
sub HandleFrisbeeStatus($$$$);
sub fatal($);
#
......@@ -166,16 +168,17 @@ sub callback($$$)
my $time = time();
my $site = event_notification_get_site($handle, $note);
my $urn = event_notification_get_string($handle, $note, "urn");
my $slice = event_notification_get_string($handle, $note, "slice");
my $type = event_notification_get_string($handle, $note, "type");
my $details = event_notification_get_string($handle, $note, "details");
# Ignore extraneous events. They happen when listening to the local
# cluster pubsubd.
return
if (! (defined($site) && defined($slice)));
my $urn = event_notification_get_string($handle, $note, "urn");
my $type = event_notification_get_string($handle, $note, "type");
my $details = event_notification_get_string($handle, $note, "details");
#
# Not sure why this is happening, but sometime the slice urn has
# extra double quotes around it. Kill them so the instance lookup
......@@ -187,16 +190,25 @@ sub callback($$$)
# Debugging.
return
if (0 && $slice !~ /stoller/);
$count++;
if ($debug || $verbose) {
print "Event: $time $site $type $urn $slice $details\n";
print "Event: $count $time $site $type $urn $slice $details\n";
}
my $instance = APT_Instance->LookupBySlice($slice);
return
if (!defined($instance));
if (0) {
goto done;
}
if ($type eq "SLIVERSTATUS") {
HandleSliverStatus($site, $instance, $details);
HandleSliverStatus($site, $urn, $instance, $details);
goto done;
}
elsif ($type eq "SLICESTATUS") {
HandleSliceStatus($site, $instance, $details);
goto done;
}
elsif ($type eq "IMAGESTATUS") {
......@@ -204,7 +216,7 @@ sub callback($$$)
goto done;
}
elsif ($type eq "FRISBEESTATUS") {
HandleFrisbeeStatus($site, $instance, $details);
HandleFrisbeeStatus($site, $urn, $instance, $details);
goto done;
}
done:
......@@ -218,9 +230,11 @@ sub callback($$$)
#
# Handle an Sliverstatus event.
#
sub HandleSliverStatus($$$)
sub HandleSliverStatus($$$$)
{
my ($site, $instance, $details) = @_;
my ($site, $sliver_urn, $instance, $json) = @_;
print "HandleSliverStatus: $site, $sliver_urn, $instance\n";
if (exists($instance->AggregateHash()->{$site})) {
my $sliver = $instance->AggregateHash()->{$site};
......@@ -229,14 +243,42 @@ sub HandleSliverStatus($$$)
}
else {
if ($debug || $verbose) {
print "Updating sliver status for sliver from $details\n";
print "Updating sliver status for sliver from $json\n";
}
$details = eval { decode_json($details) };
my $hash = eval { decode_json($json) };
if ($@) {
print STDERR "Could not decode json data: $details\n";
print STDERR "Could not decode json data: $json\n";
return;
}
$sliver->UpdateWebStatus({$site => $details});
$sliver->UpdateSliverStatus($sliver_urn, $hash, $json);
}
}
}
#
# Handle an Slice status event.
#
sub HandleSliceStatus($$$)
{
my ($site, $instance, $json) = @_;
print "HandleSliceStatus: $site, $instance\n";
if (exists($instance->AggregateHash()->{$site})) {
my $sliver = $instance->AggregateHash()->{$site};
if ($impotent) {
print "Would update slice status for $sliver from details\n";
}
else {
if ($debug || $verbose) {
print "Updating slice status for sliver from $json\n";
}
my $hash = eval { decode_json($json) };
if ($@) {
print STDERR "Could not decode json data: $json\n";
return;
}
$sliver->UpdateSliceStatus($hash, $json);
}
}
}
......@@ -246,7 +288,7 @@ sub HandleSliverStatus($$$)
#
sub HandleImageStatus($$$)
{
my ($site, $instance, $details) = @_;
my ($site, $instance, $json) = @_;
if (exists($instance->AggregateHash()->{$site})) {
if ($impotent) {
......@@ -254,14 +296,14 @@ sub HandleImageStatus($$$)
}
else {
if ($debug || $verbose) {
print "Updating image status for instance from $details\n";
print "Updating image status for instance from $json\n";
}
$details = eval { decode_json($details) };
my $hash = eval { decode_json($json) };
if ($@) {
print STDERR "Could not decode json data: $details\n";
print STDERR "Could not decode json data: $json\n";
return;
}
$instance->UpdateImageStatus($details);
$instance->UpdateImageStatusNew($hash, $json);
}
}
}
......@@ -269,25 +311,25 @@ sub HandleImageStatus($$$)
#
# Handle an FRISBEESTATUS event.
#
sub HandleFrisbeeStatus($$$)
sub HandleFrisbeeStatus($$$$)