Commit 4c08dcee authored by Kevin Tew's avatar Kevin Tew

testsuite/testswap retry example, cleanups

parent d149ca13
DOCS TODO
retry example
TODO
TIMEOUT of XMLRPC Calls
......@@ -28,12 +29,5 @@ LATER
create BSD virtual machine for testing
Parallel TODOS
retry
clean up FFF, Roles, constructors
CartProd teste - image test example
DONE
POD function documentation
better general result code handling (ping_test single_node_test)
ForkFramework Constants
my $FFD = 0;
my $FFDEBUG = 0;
package TestBed::ForkFramework::Channel;
use SemiModern::Perl;
......@@ -21,14 +21,7 @@ sub send { sendfd(shift->wr, shift); }
sub receivefd { my $fd = shift; my $r = fd_retrieve $fd; return $r->[0]; }
sub sendfd { my $fd = shift; store_fd [shift], $fd; $fd->flush; }
sub sendEnd { my $s = shift; $s->send(undef); $s->closeWr }
sub sendError { shift->send( [ 'E', @_ ] ) }
sub sendResult { shift->send( [ 'R', @_ ] ) }
sub selectInit { my $s = shift; return [ $s->rd, $s->wr, 0, $s ]; }
sub sendWorkStatus {
my ($self, $jobid, $result, $error) = @_;
if ($error) { $self->sendError($jobid, $error); }
else { $self->sendResult($jobid, $result); }
}
sub closeRd { my $s = shift; my $fh = $s->rd; close($fh) if defined $fh; $s->rd(undef); }
sub closeWr { my $s = shift; my $fh = $s->wr; close($fh) if defined $fh; $s->wr(undef); }
sub close { my $s = shift; $s->closeRd; $s->closeWr; }
......@@ -54,6 +47,35 @@ sub childAfterFork {
}
sub close { my $hs = shift->pipes; map { close $_; } @$hs; }
package TestBed::ForkFramework::Results;
use SemiModern::Perl;
use Mouse;
has 'successes' => ( isa => 'ArrayRef', is => 'rw', default => sub { [ ] } );
has 'errors' => ( isa => 'ArrayRef', is => 'rw', default => sub { [ ] } );
sub push_success { push @{shift->successes}, shift; }
sub push_error { push @{shift->errors}, shift; }
sub has_errors { scalar @{shift->errors};}
sub handle_result {
my ($self, $result) = @_;
if ( $result->is_error ) { $self->push_error($result); }
else { $self->push_success($result); }
}
package TestBed::ForkFramework::ItemResult;
use SemiModern::Perl;
use Mouse;
has 'result' => ( is => 'rw');
has 'error' => ( is => 'rw');
has 'itemid' => ( is => 'rw');
sub is_error { shift->error; }
use SemiModern::Perl;
use Mouse;
package TestBed::ForkFramework;
sub forkit {
my ($parent_worker, $worker) = @_;
......@@ -86,9 +108,6 @@ sub fork_redir {
my ($pid) = @_;
my $handles = $redir->parentAfterFork;
return $parent_worker->(@$handles, $pid);
#waitpid($pid, 0);
#return $pworker->(@$handles, $pid);
#return (@$handles, $pid);
},
sub {
$redir->childAfterFork;
......@@ -97,7 +116,6 @@ sub fork_redir {
);
}
package TestBed::ForkFramework::Scheduler;
use SemiModern::Perl;
use Mouse;
......@@ -106,8 +124,7 @@ use Carp;
use Data::Dumper;
has 'workers' => ( is => 'rw', default => sub { [] });
has 'results' => ( is => 'rw', default => sub { [] });
has 'errors' => ( is => 'rw', default => sub { [] });
has 'results' => ( is => 'rw', default => sub { TestBed::ForkFramework::Results->new; });
has 'selector' => ( is => 'rw', default => sub { IO::Select->new; });
has 'items' => ( is => 'rw', isa => 'ArrayRef', required => 1 );
has 'proc' => ( is => 'rw', isa => 'CodeRef' , required => 1 );
......@@ -134,21 +151,20 @@ sub workloop {
my ($self) = @_;
LOOP: {
while( defined ( my $jobid = $self->spawnWorker ) ) {
say "spawnWorker $jobid" if $FFD;
say "spawnWorker $jobid" if $FFDEBUG;
$self->fffork($jobid);
}
say "CALL SELECT" if $FFD;
say "CALL SELECT" if $FFDEBUG;
if ($self->selectloop) {
redo LOOP;
}
}
$self->wait_for_all_children_to_exit;
my @results = (scalar @{$self->errors}, $self->results, $self->errors);
return wantarray ? @results : \@results;
return $self->results;
}
use constant SELECT_HAS_HANDLES => 1;
use constant SELECT_HAS_HANDLES => 1;
use constant SELECT_NO_HANDLES => 0;
sub selectloop {
......@@ -159,36 +175,36 @@ sub selectloop {
for my $r ($selector->can_read) {
my ($rh, $wh, $eof, $ch) = @$r;
if (defined (my $result = $ch->receive)) {
$self->jobDone(@$result);
$self->handleResult($result);
unless ( $eof ) {
if( my $jobid = $self->nextJob ) {
say "newjob $jobid" if $FFD;
say "newjob $jobid" if $FFDEBUG;
$ch->send($jobid); }
else {
say "no work killing $rh" if $FFD;
say "no work killing $rh" if $FFDEBUG;
$ch->sendEnd;
@{$r}[1,2] = (undef, 1);
}
}
}
else {
say "received null ack from $rh" if $FFD;
say "received null ack from $rh" if $FFDEBUG;
$selector->remove($r);
$ch->close;
}
}
};
if ( my $error = $@ ) {
say "SELECT HAS ERRORS" if $FFD;
say "SELECT HAS ERRORS" if $FFDEBUG;
$_->[3]->sendEnd for $selector->handles;
$self->wait_for_all_children_to_exit;
die $error;
}
say "SELECT_HAS_HANDLES" if $FFD;
say "SELECT_HAS_HANDLES" if $FFDEBUG;
return SELECT_HAS_HANDLES;
}
say "SELECT_NO_HANDLES" if $FFD;
say "SELECT_NO_HANDLES" if $FFDEBUG;
return SELECT_NO_HANDLES;
}
......@@ -213,7 +229,7 @@ sub fffork {
while ( defined( my $itemid = $ch->receive )) {
my $result = eval { $self->doItem($itemid); };
my $error = $@;
$ch->sendWorkStatus($itemid, $result, $error);
$ch->send(TestBed::ForkFramework::ItemResult->new(itemid => $itemid, result => $result, error => $error));
}
$ch->sendEnd;
$ch->close;
......@@ -223,12 +239,8 @@ sub fffork {
}
sub doItem { my ($s, $itemid) = @_; $s->proc->($s->items->[$itemid]); }
sub jobDone {
my ($self, $type, @rest) = @_;
if ( $type eq 'R' ) { push @{ $self->results }, \@rest}
elsif ( $type eq 'E' ) { push @{ $self->errors }, \@rest}
else { die "Bad result type: $type"; }
}
sub handleResult { recordResult(@_); }
sub recordResult { shift->results->handle_result(shift); }
package TestBed::ForkFramework::ForEach;
use SemiModern::Perl;
......@@ -301,26 +313,38 @@ has 'maxnodes' => ( isa => 'Int' , is => 'rw', default => 20);
has 'currnodes' => ( isa => 'Int' , is => 'rw', default => 0);
has 'schedule' => ( isa => 'ArrayRef' , is => 'rw', required => 1);
has 'weight' => ( isa => 'ArrayRef' , is => 'rw', required => 1);
has 'retry' => ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'inRetry' => ( isa => 'Int' , is => 'rw', default => 0);
has 'retryItems'=> ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'inRetry' => ( isa => 'Int' , is => 'rw', default => 0);
sub incr_currnodes {
my ($s, $quantity) = @_;
$s->{'currnodes'} += $quantity;
}
sub return_node_resources {
my ($s, $itemid) = @_;
$s->{'currnodes'} -= $s->weight->[$itemid];
}
sub work {
my ($max_nodes, $proc, $weight, $items) = @_;
my ($max_nodes, $proc, $schedule, $items) = @_;
my $s = TestBed::ForkFramework::RateScheduler->new(
'maxnodes' => $max_nodes,
'items' => $items,
'proc' => $proc,
'schedule' => $weight,
'weight' => [ map { $_->[0] } (sort { $a->[1] <=> $b->[1] } @$weight) ],
'schedule' => $schedule,
'weight' => [ map { $_->[0] } (sort { $a->[1] <=> $b->[1] } @$schedule) ],
);
say toperl("SCHEDULE", $s->schedule) if $FFD;
say toperl("WEIGHTS", $s->weight) if $FFD;
say toperl("SCHEDULE", $s->schedule) if $FFDEBUG;
say toperl("WEIGHTS", $s->weight) if $FFDEBUG;
$s->workloop;
say("RETRYING") if $FFD;
say("RETRYING") if $FFDEBUG;
$s->inRetry(1);
$s->schedule( [ map { [$s->weight->[$_], $_] } @{$s->retry} ] );
say toperl("SCHEDULE", $s->schedule) if $FFD;
say toperl("WEIGHTS", $s->weight) if $FFD;
$s->schedule( [ map { [$s->weight->[$_], $_] } @{$s->retryItems} ] );
$s->retryItems([]);
say toperl("SCHEDULE", $s->schedule) if $FFDEBUG;
say toperl("WEIGHTS", $s->weight) if $FFDEBUG;
$s->workloop;
}
......@@ -330,7 +354,11 @@ sub find_largest_item {
#find largest item that is small enough
for (@{ $s->schedule }) {
$found = $_ if $_->[0] <= $max_size;
my $itemsize = $_->[0];
last if $itemsize > $max_size;
next if ($found and $found->[0] >= $itemsize);
$found = $_ if $itemsize <= $max_size;
}
#remove found from schedule
......@@ -349,32 +377,46 @@ sub nextJob {
if ($tuple) {
my ($e_node_size, $eindex) = @$tuple;
say(sprintf("found %s size %s max_size $max_size currnodes %s maxnodes %s newcurrnodes %s", $eindex, $e_node_size, $s->currnodes, $s->maxnodes, $s->currnodes +$e_node_size)) if $FFD;
say(sprintf("found %s size %s max_size $max_size currnodes %s maxnodes %s newcurrnodes %s", $eindex, $e_node_size, $s->currnodes, $s->maxnodes, $s->currnodes +$e_node_size)) if $FFDEBUG;
$s->{'currnodes'} += $e_node_size;
return $eindex;
}
else { return; }
}
use TestBed::ParallelRunner::ErrorConstants;
sub return_and_report {
my ($s, $result) = @_;
$s->recordResult($result);
$s->return_node_resources($result->itemid);
}
sub handleResult {
my ($s, $result) = @_;
my $executor = $s->items->[$result->itemid];
if ($executor->can('handleResult')) {
my $rc = $executor->handleResult($s, $result);
if ($rc == RETURN_AND_REPORT) { $s->return_and_report($result) }
}
else {
return;
$s->return_and_report($result);
}
}
sub jobDone {
my $s = shift;
my ($type, $itemid, $result) = @_;
if ($type eq 'E') {
if ($result->isa('TestBed::ParallelRunner::SwapOutFailed')) { return; }
elsif ($result->isa('TestBed::ParallelRunner::RetryAtEnd')) {
if ($s->inRetry) {
push @{ $s->retry }, $itemid;
$s->{'currnodes'} -= $s->weight->[$itemid];
return;
}
else {
$result = $result->original;
}
}
sub retry {
my ($s, $result) = @_;
my $itemid = $result->itemid;
if (!$s->inRetry) {
push @{ $s->retryItems }, $itemid;
$s->return_node_resources($itemid);
say "RETRYING item# $itemid";
return 1;
}
else {
say "DONE RETRYING";
$s->return_and_report($result);
}
$s->SUPER::jobDone($type, $itemid, $result);
$s->{'currnodes'} -= $s->weight->[$itemid];
}
1;
......@@ -28,7 +28,10 @@ sub split_t_pm {
sub runharness {
my @parts = my ($ts, $pms) = split_t_pm(@_);
for (@$pms) { eval "require \'$_\';"; }
for (@$pms) {
eval "require \'$_\';";
die $@ if $@;
}
my %harness_args = (
verbosity => 1,
......
......@@ -27,62 +27,69 @@ sub _initialize {
package TestBed::ParallelRunner;
use SemiModern::Perl;
use TestBed::ParallelRunner::Test;
use TestBed::ParallelRunner::Executor;
use TestBed::ForkFramework;
use Data::Dumper;
use Mouse;
use TBConfig;
our $ExperimentTests = [];
our $Executors = [];
my $teste_desc = <<'END';
Not enough arguments to teste
teste(eid, $ns, $sub, $test_count, $desc);
teste($pid, $eid, $ns, $sub, $test_count, $desc);
teste($pid, $gid, $eid, $ns, $sub, $test_count, $desc);
rege(eid, $ns, $sub, $test_count, $desc);
rege($pid, $eid, $ns, $sub, $test_count, $desc);
rege($pid, $gid, $eid, $ns, $sub, $test_count, $desc);
END
sub add_experiment {
my $te = TestBed::ParallelRunner::Test::tn(@_);
push @$ExperimentTests, $te;
$te;
sub add_executor {
my $executor = shift;
push @$Executors, $executor;
$executor;
}
sub build_executor {
my $executor = TestBed::ParallelRunner::Executor::build(@_);
add_executor($executor);
}
sub runtests {
my ($concurrent_pre_runs, $concurrent_node_count_usage ) = @_;
$concurrent_pre_runs ||= $TBConfig::concurrent_prerun_jobs;
$concurrent_pre_runs ||= $TBConfig::concurrent_prerun_jobs;
$concurrent_node_count_usage ||= $TBConfig::concurrent_node_usage;
#prerun step
my $result = TestBed::ForkFramework::MaxWorkersScheduler::work($concurrent_pre_runs, sub { $_[0]->prep }, $ExperimentTests);
if ($result->[0]) {
sayd($result->[2]);
my $result = TestBed::ForkFramework::MaxWorkersScheduler::work($concurrent_pre_runs, sub { shift->prep }, $Executors);
if ($result->has_errors) {
sayd($result->errors);
die 'TestBed::ParallelRunner::runtests died during test prep';
}
#create schedule step
my @weighted_experiements;
for (@{$result->[1]}) {
my ($hash, $item_id) = @$_;
my $maximum_nodes = $hash->{'maximum_nodes'};
my $eid = $ExperimentTests->[$item_id]->e->eid;
my @schedule;
for (@{$result->successes}) {
my $item_id = $_->itemid;
my $maximum_nodes = $_->result->{'maximum_nodes'};
my $eid = $Executors->[$item_id]->e->eid;
#say "$eid $item_id $maximum_nodes";
if ($maximum_nodes > $concurrent_node_count_usage) {
warn "$eid requires upto $maximum_nodes nodes, only $concurrent_node_count_usage concurrent nodes permitted\n$eid will not be run";
}
else {
push @weighted_experiements, [ +$maximum_nodes, +$item_id ];
push @schedule, [ +$maximum_nodes, +$item_id ];
}
}
@weighted_experiements = sort { $a->[0] <=> $b->[0] } @weighted_experiements;
@schedule = sort { $a->[0] <=> $b->[0] } @schedule;
#count tests step
my $test_count = 0;
map { $test_count += $ExperimentTests->[$_->[1]]->test_count } @weighted_experiements;
map { $test_count += $Executors->[$_->[1]]->test_count } @schedule;
#run tests
reset_test_builder($test_count, no_numbers => 1);
$result = TestBed::ForkFramework::RateScheduler::work($concurrent_node_count_usage, \&tap_wrapper, \@weighted_experiements, $ExperimentTests);
$result = TestBed::ForkFramework::RateScheduler::work($concurrent_node_count_usage, \&tap_wrapper, \@schedule, $Executors);
set_test_builder_to_end_state($test_count);
return;
}
......@@ -135,11 +142,11 @@ sub tap_wrapper {
sub {
reset_test_builder($te->test_count) if $ENABLE_SUBTESTS_FEATURE;
setup_test_builder_ouputs(*STDOUT, *STDERR);
$te->run_ensure_kill;
$te->execute;
});
}
else {
$te->run_ensure_kill;
$te->execute;
}
return 0;
}
......@@ -156,10 +163,10 @@ TestBed::ParallelRunner
=over 4
=item C<< add_experiment >>
=item C<< add_executor >>
helper function called by rege.
creates a TestBed::ParallelRunner::Test job and pushes it onto @$ExperimentTests
creates a TestBed::ParallelRunner::Execu:or job and pushes it onto @$Executors
=item C<< runtests >>
......
#!/usr/bin/perl
package TestBed::ParallelRunner::ErrorConstants;
use SemiModern::Perl;
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(RETURN_AND_REPORT);
use constant RETURN_AND_REPORT => 4096;
1;
#!/usr/bin/perl
package TestBed::ParallelRunner::Exception;
use Mouse;
has original => ( is => 'rw');
no Mouse;
package TestBed::ParallelRunner::RetryAtEnd;
use Mouse;
extends('TestBed::ParallelRunner::Exception');
no Mouse;
package TestBed::ParallelRunner::SwapOutFailed;
use Mouse;
extends('TestBed::ParallelRunner::Exception');
no Mouse;
package TestBed::ParallelRunner::ErrorStrategy;
use SemiModern::Perl;
use Mouse;
use TestBed::ParallelRunner::ErrorConstants;
has 'executor' => (is => 'rw');
has 'scheduler' => (is => 'rw');
has 'result' => (is => 'rw');
sub handleResult {
my $s = shift;
my ($executor, $scheduler, $result) = @_;
$s->executor($executor);
$s->scheduler($scheduler);
$s->result($result);
if ($result->is_error) {
my $error = $result->error;
if ( $error->isa ( 'TestBed::ParallelRunner::Executor::SwapinError')) { return $s->swapin_error ( @_); }
elsif ( $error->isa ( 'TestBed::ParallelRunner::Executor::RunError')) { return $s->run_error ( @_); }
elsif ( $error->isa ( 'TestBed::ParallelRunner::Executor::SwapoutError')) { return $s->swapout_error ( @_); }
elsif ( $error->isa ( 'TestBed::ParallelRunner::Executor::KillError')) { return $s->end_error ( @_); }
elsif ( $error->isa ( 'TestBed::ParallelRunner::Executor::Exception')) { return $s->run_error ( @_); }
else { return RETURN_AND_REPORT; }
}
else { return RETURN_AND_REPORT; }
}
sub swapin_error { }
sub run_error { }
sub swapout_error {
my ($xmlrpc_error) = @_;
die TestBed::ParallelRunner::SwapOutFailed->new($xmlrpc_error);
sub xmlrpc_error_cause {
my ($s) = @_;
my $result = $s->result;
if ( $result->error->original ) {
my $xmlrpc_error = $result->error->original;
if ($xmlrpc_error->isa ( 'RPC::XML::struct' ) ) {
return $xmlrpc_error->{value}->{'cause'};
}
}
return;
}
sub end_error { }
sub swapin_error { return RETURN_AND_REPORT; }
sub run_error { return RETURN_AND_REPORT; }
sub swapout_error { return RETURN_AND_REPORT; }
sub end_error { return RETURN_AND_REPORT; }
package TestBed::ParallelRunner::ErrorRetryStrategy;
use SemiModern::Perl;
use Mouse;
use TestBed::ParallelRunner::ErrorConstants;
extends 'TestBed::ParallelRunner::ErrorStrategy';
sub swapin_error {
my @retry_causes = qw( temp internal software hardware canceled unknown);
my ($xmlrpc_error) = @_;
if ($xmlrpc_error =~ /RPC::XML::Struct/) {
my $cause = $xmlrpc_error->value->value->{'cause'};
if ( grep { /$cause/ } @retry_causes) {
die TestBed::ParallelRunner::RetryAtEnd->new($xmlrpc_error);
}
}
my ($s, $executor, $scheduler, $result) = @_;
if ($s->is_retry_cause) { return $scheduler->retry($result); }
else { return RETURN_AND_REPORT; }
}
sub is_retry_cause {
my $s = shift;
my $cause = $s->xmlrpc_error_cause;
my @retry_causes = qw(temp internal software hardware canceled unknown);
if ( grep { /$$cause/ } @retry_causes) { return 1; }
return 0;
}
=head1 NAME
......@@ -51,6 +72,10 @@ handle parallel run errors;
=over 4
=item C<< $es->handleResult( $executor, $scheduler, $result ) >>
dispatch experiment errors to the right handler
=item C<< swapin_error($error) >>
=item C<< run_error($error) >>
=item C<< swapout_error($error) >>
......@@ -60,5 +85,4 @@ handle parallel run errors;
=cut
1;
#!/usr/bin/perl
package TestBed::ParallelRunner::Executor::Exception;
use Mouse;
has original => ( is => 'rw');
no Mouse;
package TestBed::ParallelRunner::Executor::PrepError;
use Mouse;
extends('TestBed::ParallelRunner::Executor::Exception');
no Mouse;
package TestBed::ParallelRunner::Executor::SwapinError;
use Mouse;
extends('TestBed::ParallelRunner::Executor::Exception');
no Mouse;
package TestBed::ParallelRunner::Executor::RunError;
use Mouse;
extends('TestBed::ParallelRunner::Executor::Exception');
no Mouse;
package TestBed::ParallelRunner::Executor::SwapoutError;
use Mouse;
extends('TestBed::ParallelRunner::Executor::Exception');
no Mouse;
package TestBed::ParallelRunner::Executor::KillError;
use Mouse;
extends('TestBed::ParallelRunner::Executor::Exception');
no Mouse;
package TestBed::ParallelRunner::Executor;
use TestBed::ParallelRunner::ErrorStrategy;
use SemiModern::Perl;
use TestBed::TestSuite::Experiment;
use Mouse;
use Data::Dumper;
has 'e' => ( isa => 'TestBed::TestSuite::Experiment', is => 'rw');
has 'desc' => ( isa => 'Str', is => 'rw');
has 'ns' => ( isa => 'Str', is => 'rw');
has 'proc' => ( isa => 'CodeRef', is => 'rw');
has 'test_count' => ( isa => 'Any', is => 'rw');
has 'error_strategy' => ( is => 'rw', lazy => 1, default => sub { TestBed::ParallelRunner::ErrorStrategy->new; } );
has 'pre_result_handler' => ( isa => 'CodeRef', is => 'rw');
sub build {
my ($e, $ns, $sub, $test_count, $desc) = (shift, shift, shift, shift, shift);
my %options = @_;
if (defined (delete $options{retry})) {
$options{error_strategy} = TestBed::ParallelRunner::ErrorRetryStrategy->new;
}
if (defined (my $strategy = delete $options{strategy})) {
$options{error_strategy} = $strategy;
}
return TestBed::ParallelRunner::Executor->new(
'e' => $e,
'ns' => $ns,
'proc' => $sub,
'test_count' => $test_count,
'desc' => $desc,
%options
);
}
sub handleResult {
my ($s) = @_;
my $prh = $s->pre_result_handler;
$prh->(@_) if $prh;
$s->error_strategy->handleResult( @_);
}
sub prep {
my $self = shift;
my $r = eval { $self->e->create_and_get_metadata($self->ns); };
die TestBed::ParallelRunner::Executor::PrepError->new( original => $@ ) if $@;
return $r;
}
sub execute {
my $self = shift;
my $e = $self->e;
eval { $e->swapin_wait; };
die TestBed::ParallelRunner::Executor::SwapinError->new( original => $@ ) if $@;
eval { $self->proc->($e); };
die TestBed::ParallelRunner::Executor::RunError->new( original => $@ ) if $@;
eval { $e->swapout_wait; };
die TestBed::ParallelRunner::Executor::SwapoutError->new( original => $@ ) if $@;
eval { $e->end_wait; };
die TestBed::ParallelRunner::Executor::KillError->new( original => $@ ) if $@;
return 1;
}
=head1 NAME
TestBed::ParallelRunner::Executor
Represents a ParallelRunner Job
=over 4
=item C<< build($e, $ns, $sub, $test_count, $desc) >>