Commit 2d953a1b authored by Kevin Tew's avatar Kevin Tew

testsuite/testswap ErrorStrategy

parent 74919953
DOCS TODO
Overview / howto write a test
TODO
TIMEOUT of XMLRPC Calls
TESTLWP
scp cleanup
chmod +x
wait for end of experiment
cmdline params (-D)
VERBOSENESS
Add basic image-test parameterization examples
EXAMPLES, Get some tests
traffic generation
convert more old tests
general result code handling framework
STILL MESSY
XMLRPC/Client/Experiment TestSuite/Experiment return codes, exceptions, composability
Calling parallel tests
XMLRPC/Client/Experiment TestSuite/Experiment dsl
EXPAND CURRENT IMPLEMENTATION
event subsystem
......@@ -30,7 +26,6 @@ LATER
Large External Tars and Resources for experiements
buildup, teardown using Test::Class
create BSD virtual machine for testing
possibly collapse tbts and t/harness
Parallel TODOS
retry
......
my $FFD = 0;
package TestBed::ForkFramework::Channel;
use SemiModern::Perl;
use Mouse;
use Data::Dumper;
use Carp;
use IO::Pipe;
use Storable qw(store_fd fd_retrieve);
has 'rd' => ( isa => 'Any', is => 'rw');
has 'wr' => ( isa => 'Any', is => 'rw');
sub build { TestBed::ForkFramework::Channel->new( 'rd' => shift, 'wr' => shift ); }
sub receive { receivefd(shift->rd); }
sub send { sendfd(shift->wr, shift); }
sub receivefd { my $fd = shift; my $r = fd_retrieve $fd; return $r->[0]; }
sub sendfd { my $fd = shift; store_fd [shift], $fd; $fd->flush; }
sub sendEnd { my $self = shift; $self->send(undef); $self->closeWr }
sub sendError { shift->send( [ 'E', @_ ] ) }
sub sendResult { shift->send( [ 'R', @_ ] ) }
sub selectInit { my $self = shift; return [ $self->rd, $self->wr, 0, $self ]; }
sub sendWorkStatus {
my ($self, $error, $jobid, $result) = @_;
if ($error) { $self->sendError($error, $jobid); }
else { $self->sendResult($result, $jobid); }
}
sub closeRd { my $self = shift; my $fh = $self->rd; close($fh) if defined $fh; $self->rd(undef); }
sub closeWr { my $self = shift; my $fh = $self->wr; close($fh) if defined $fh; $self->wr(undef); }
sub close { my $self = shift; $self->closeRd; $self->closeWr; }
package TestBed::ForkFramework::BiPipe;
use SemiModern::Perl;
use Mouse;
use Carp;
use IO::Pipe;
has 'pipes' => ( isa => 'ArrayRef', is => 'rw');
sub build { TestBed::ForkFramework::BiPipe->new( 'pipes' => [ map IO::Pipe->new, 1 .. 2 ]); }
sub parentAfterFork { buildChannel(@{shift->pipes}); }
sub childAfterFork { buildChannel(reverse(@{shift->pipes})); }
sub buildChannel { my ($rd, $wr) = @_; $rd->reader; $wr->writer; TestBed::ForkFramework::Channel::build($rd, $wr); }
sub build { TestBed::ForkFramework::Channel->new( 'pipes' => [ map IO::Pipe->new, 1 .. 2 ]); }
sub parentAfterFork { my $s = shift; $s->buildChannel(@{ $s->pipes}); }
sub childAfterFork { my $s = shift; $s->buildChannel(reverse(@{ $s->pipes})); }
sub buildChannel { my $s = shift; $s->rd(shift->reader); $s->wr(shift->writer); }
sub receive { receivefd(shift->rd); }
sub send { sendfd(shift->wr, shift); }
sub receivefd { my $fd = shift; my $r = fd_retrieve $fd; return $r->[0]; }
sub sendfd { my $fd = shift; store_fd [shift], $fd; $fd->flush; }
sub sendEnd { my $s = shift; $s->send(undef); $s->closeWr }
sub sendError { shift->send( [ 'E', @_ ] ) }
sub sendResult { shift->send( [ 'R', @_ ] ) }
sub selectInit { my $s = shift; return [ $s->rd, $s->wr, 0, $s ]; }
sub sendWorkStatus {
my ($self, $jobid, $result, $error) = @_;
if ($error) { $self->sendError($jobid, $error); }
else { $self->sendResult($jobid, $result); }
}
sub closeRd { my $s = shift; my $fh = $s->rd; close($fh) if defined $fh; $s->rd(undef); }
sub closeWr { my $s = shift; my $fh = $s->wr; close($fh) if defined $fh; $s->wr(undef); }
sub close { my $s = shift; $s->closeRd; $s->closeWr; }
package TestBed::ForkFramework::Redir;
use SemiModern::Perl;
......@@ -44,42 +39,31 @@ use Mouse;
use Carp;
use IO::Pipe;
has 'pipes' => ( isa => 'ArrayRef', is => 'rw');
has 'pipes' => ( isa => 'ArrayRef', is => 'rw', default => sub { [ map IO::Pipe->new, 1 .. 3 ] } );
sub build { TestBed::ForkFramework::Redir->new( 'pipes' => [ map IO::Pipe->new, 1 .. 3 ]); }
sub parentAfterFork { my $ps = shift->pipes; $ps->[0]->writer; $ps->[1]->reader; $ps->[2]->reader; return wantarray ? @{$ps} : $ps; }
sub childAfterFork {
sub childAfterFork {
my $s = shift;
my ($in, $out, $err) = @{$s->pipes};
$in->reader; $out->writer; $err->writer;
close STDIN;
close STDOUT;
close STDERR;
$in->reader; $out->writer; $err->writer;
close STDIN; close STDOUT; close STDERR;
open(STDIN, "<&", $in ->fileno);
open(STDOUT, ">&", $out->fileno);
open(STDOUT, ">&", $out->fileno);
open(STDERR, ">&", $err->fileno);
$s->close
}
sub handles { my $hs = shift->pipes; return wantarray ? @{$hs} : $hs; }
sub close { my $hs = shift->pipes; map { close $_; } @$hs; }
sub in { shift->pipes->[0]; }
sub out { shift->pipes->[1]; }
sub err { shift->pipes->[2]; }
package TestBed::ForkFramework;
sub redir_fork {
my ($worker) = @_;
my $redir = TestBed::ForkFramework::Redir::build;
sub forkit {
my ($parent_worker, $worker) = @_;
if ( my $pid = fork ) {
#Parent
my $handles = $redir->parentAfterFork;
return (@$handles, $pid);
return $parent_worker->($pid);
}
else {
#Child
$redir->childAfterFork;
use POSIX '_exit';
eval q{END { _exit 0 }};
......@@ -89,6 +73,31 @@ sub redir_fork {
}
}
sub fork_child_redir {
my ($worker) = @_;
fork_redir( sub { return @_; }, $worker);
}
sub fork_redir {
my ($parent_worker, $worker) = @_;
my $redir = TestBed::ForkFramework::Redir->new;
forkit( sub {
#Parent
my ($pid) = @_;
my $handles = $redir->parentAfterFork;
return $parent_worker->(@$handles, $pid);
#waitpid($pid, 0);
#return $pworker->(@$handles, $pid);
#return (@$handles, $pid);
},
sub {
$redir->childAfterFork;
$worker->();
}
);
}
package TestBed::ForkFramework::Scheduler;
use SemiModern::Perl;
use Mouse;
......@@ -96,12 +105,13 @@ use IO::Select;
use Carp;
use Data::Dumper;
has 'workers' => ( is => 'rw');
has 'results' => ( is => 'rw');
has 'errors' => ( is => 'rw');
has 'selector' => ( is => 'rw');
has 'items' => ( isa => 'ArrayRef' , is => 'rw');
has 'proc' => ( isa => 'CodeRef' , is => 'rw');
has 'workers' => ( is => 'rw', default => sub { [] });
has 'results' => ( is => 'rw', default => sub { [] });
has 'errors' => ( is => 'rw', default => sub { [] });
has 'selector' => ( is => 'rw', default => sub { IO::Select->new; });
has 'items' => ( is => 'rw', isa => 'ArrayRef', required => 1 );
has 'proc' => ( is => 'rw', isa => 'CodeRef' , required => 1 );
sub _gen_iterator {
my $items = shift;
......@@ -115,17 +125,24 @@ sub _gen_iterator {
}
}
sub wait_for_all_children_to_exit {
my ($self) = @_;
waitpid( $_, 0 ) for @{ $self->workers };
}
sub workloop {
my ($self) = @_;
LOOP: {
while( defined ( my $jobid = $self->spawnWorker ) ) {
say "spawnWorker $jobid" if $FFD;
$self->fffork($jobid);
}
say "CALL SELECT" if $FFD;
if ($self->selectloop) {
redo LOOP;
}
}
waitpid( $_, 0 ) for @{ $self->workers };
$self->wait_for_all_children_to_exit;
my @results = (scalar @{$self->errors}, $self->results, $self->errors);
return wantarray ? @results : \@results;
......@@ -142,50 +159,53 @@ sub selectloop {
for my $r ($selector->can_read) {
my ($rh, $wh, $eof, $ch) = @$r;
if (defined (my $result = $ch->receive)) {
my $type = shift @$result;
if ( $type eq 'R' ) { push @{ $self->results }, $result }
elsif ( $type eq 'E' ) { push @{ $self->errors }, $result }
else { die "Bad result type: $type"; }
$self->jobDone($result->[1]);
$self->jobDone(@$result);
unless ( $eof ) {
if( my $jobid = $self->nextJob ) { $ch->send($jobid); }
if( my $jobid = $self->nextJob ) {
say "newjob $jobid" if $FFD;
$ch->send($jobid); }
else {
say "no work killing $rh" if $FFD;
$ch->sendEnd;
@{$r}[1,2] = (undef, 1);
}
}
}
else {
say "received null ack from $rh" if $FFD;
$selector->remove($r);
$ch->close;
}
}
};
if ( my $error = $@ ) {
say "SELECT HAS ERRORS" if $FFD;
$_->[3]->sendEnd for $selector->handles;
waitpid( $_, 0 ) for @{ $self->workers };
$self->wait_for_all_children_to_exit;
die $error;
}
say "SELECT_HAS_HANDLES" if $FFD;
return SELECT_HAS_HANDLES;
}
say "SELECT_NO_HANDLES" if $FFD;
return SELECT_NO_HANDLES;
}
sub fffork {
my ($self, $workid) = @_;
my $bipipe = TestBed::ForkFramework::BiPipe::build;
my $ch = TestBed::ForkFramework::Channel::build;
if ( my $pid = fork ) {
#Parent
my $ch = $bipipe->parentAfterFork;
$ch->parentAfterFork;
push @{ $self->workers }, $pid;
$self->selector->add($ch->selectInit);
$ch->send($workid);
}
else {
#Child
my $ch = $bipipe->childAfterFork;
$ch->childAfterFork;
use POSIX '_exit';
eval q{END { _exit 0 }};
......@@ -193,7 +213,7 @@ sub fffork {
while ( defined( my $itemid = $ch->receive )) {
my $result = eval { $self->doItem($itemid); };
my $error = $@;
$ch->sendWorkStatus($error, $itemid, $result);
$ch->sendWorkStatus($itemid, $result, $error);
}
$ch->sendEnd;
$ch->close;
......@@ -202,39 +222,19 @@ sub fffork {
}
}
sub redir_std_fork {
my ($self, $pworker, $worker) = @_;
my $redir = TestBed::ForkFramework::Redir::build;
if ( my $pid = fork ) {
#Parent
my $handles = $redir->parentAfterFork;
$pworker->(@$handles, $pid);
waitpid($pid, 0);
}
else {
#Child
$redir->childAfterFork;
use POSIX '_exit';
eval q{END { _exit 0 }};
$worker->();
CORE::exit;
}
}
sub doItem { my ($s, $itemid) = @_; $s->proc->($s->items->[$itemid]); }
sub jobDone { }
sub jobDone {
my ($self, $type, @rest) = @_;
if ( $type eq 'R' ) { push @{ $self->results }, \@rest}
elsif ( $type eq 'E' ) { push @{ $self->errors }, \@rest}
else { die "Bad result type: $type"; }
}
package TestBed::ForkFramework::ForEach;
use SemiModern::Perl;
use Mouse;
has 'iter' => ( isa => 'CodeRef' , is => 'rw');
has 'iter' => ( isa => 'CodeRef' , is => 'rw', required => 1);
extends 'TestBed::ForkFramework::Scheduler';
......@@ -247,13 +247,10 @@ sub nextJob {
sub work {
my ($proc, $items) = @_;
my $s = TestBed::ForkFramework::ForEach->new(
'workers' => [],
'results' => [],
'items' => $items,
'errors' => [],
'proc' => $proc,
'iter' => TestBed::ForkFramework::Scheduler::_gen_iterator($items),
'selector' => IO::Select->new);
);
$s->workloop;
}
......@@ -261,9 +258,9 @@ package TestBed::ForkFramework::MaxWorkersScheduler;
use SemiModern::Perl;
use Mouse;
has 'maxworkers' => ( isa => 'Int' , is => 'rw');
has 'pos' => ( isa => 'Int' , is => 'rw');
has 'currworkers' => ( isa => 'Int' , is => 'rw');
has 'maxworkers' => ( isa => 'Int' , is => 'rw', default => 4);
has 'pos' => ( isa => 'Int' , is => 'rw', default => 0);
has 'currworkers' => ( isa => 'Int' , is => 'rw', default => 0);
extends 'TestBed::ForkFramework::Scheduler';
......@@ -271,14 +268,9 @@ sub work {
my ($max_workers, $proc, $items) = @_;
my $s = TestBed::ForkFramework::MaxWorkersScheduler->new(
'maxworkers' => $max_workers,
'currworkers' => 0,
'workers' => [],
'results' => [],
'items' => $items,
'errors' => [],
'proc' => $proc,
'pos' => 0,
'selector' => IO::Select->new);
);
$s->workloop;
}
......@@ -305,26 +297,30 @@ use Mouse;
extends 'TestBed::ForkFramework::Scheduler';
has 'maxnodes' => ( isa => 'Int' , is => 'rw');
has 'currnodes' => ( isa => 'Int' , is => 'rw');
has 'schedule' => ( isa => 'ArrayRef' , is => 'rw');
has 'weight' => ( isa => 'ArrayRef' , is => 'rw');
has 'maxnodes' => ( isa => 'Int' , is => 'rw', default => 20);
has 'currnodes' => ( isa => 'Int' , is => 'rw', default => 0);
has 'schedule' => ( isa => 'ArrayRef' , is => 'rw', required => 1);
has 'weight' => ( isa => 'ArrayRef' , is => 'rw', required => 1);
has 'retry' => ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'inRetry' => ( isa => 'Int' , is => 'rw', default => 0);
sub work {
my ($max_nodes, $proc, $weight, $items) = @_;
my $s = TestBed::ForkFramework::RateScheduler->new(
'maxnodes' => $max_nodes,
'currnodes' => 0,
'workers' => [],
'results' => [],
'items' => $items,
'errors' => [],
'proc' => $proc,
'schedule' => $weight,
'weight' => [ map { $_->[0] } (sort { $a->[1] <=> $b->[1] } @$weight) ],
'selector' => IO::Select->new);
#sayperl($s->schedule);
#sayperl($s->weight);
);
say toperl("SCHEDULE", $s->schedule) if $FFD;
say toperl("WEIGHTS", $s->weight) if $FFD;
$s->workloop;
say("RETRYING") if $FFD;
$s->inRetry(1);
$s->schedule( [ map { [$s->weight->[$_], $_] } @{$s->retry} ] );
say toperl("SCHEDULE", $s->schedule) if $FFD;
say toperl("WEIGHTS", $s->weight) if $FFD;
$s->workloop;
}
......@@ -344,6 +340,7 @@ sub find_largest_item {
return $found;
}
sub spawnWorker { shift->nextJob; }
sub nextJob {
my $s = shift;
......@@ -352,7 +349,7 @@ sub nextJob {
if ($tuple) {
my ($e_node_size, $eindex) = @$tuple;
#say sprintf("found %s size %s max_size $max_size currnodes %s maxnodes %s", $eindex, $e_node_size, $s->currnodes, $s->maxnodes);
say(sprintf("found %s size %s max_size $max_size currnodes %s maxnodes %s newcurrnodes %s", $eindex, $e_node_size, $s->currnodes, $s->maxnodes, $s->currnodes +$e_node_size)) if $FFD;
$s->{'currnodes'} += $e_node_size;
return $eindex;
}
......@@ -360,5 +357,24 @@ sub nextJob {
return;
}
}
sub jobDone { my ($s, $itemid) = @_; $s->{'currnodes'} -= $s->weight->[$itemid]; }
sub jobDone {
my $s = shift;
my ($type, $itemid, $result) = @_;
if ($type eq 'E') {
if ($result->isa('TestBed::ParallelRunner::SwapOutFailed')) { return; }
elsif ($result->isa('TestBed::ParallelRunner::RetryAtEnd')) {
if ($s->inRetry) {
push @{ $s->retry }, $itemid;
$s->{'currnodes'} -= $s->weight->[$itemid];
return;
}
else {
$result = $result->original;
}
}
}
$s->SUPER::jobDone($type, $itemid, $result);
$s->{'currnodes'} -= $s->weight->[$itemid];
}
1;
......@@ -25,13 +25,13 @@ sub _initialize {
return $self;
}
package TestBed::ParallelRunner;
use SemiModern::Perl;
use TestBed::ParallelRunner::Test;
use TestBed::ForkFramework;
use Data::Dumper;
use Mouse;
use TBConfig;
our $ExperimentTests = [];
......@@ -42,12 +42,16 @@ Not enough arguments to teste
teste($pid, $gid, $eid, $ns, $sub, $test_count, $desc);
END
sub add_experiment { push @$ExperimentTests, TestBed::ParallelRunner::Test::tn(@_); }
sub add_experiment {
my $te = TestBed::ParallelRunner::Test::tn(@_);
push @$ExperimentTests, $te;
$te;
}
sub runtests {
my ($concurrent_pre_runs, $concurrent_node_count_usage ) = @_;
$concurrent_pre_runs ||= 4;
$concurrent_node_count_usage ||= 20;
$concurrent_pre_runs ||= $TBConfig::concurrent_prerun_jobs;
$concurrent_node_count_usage ||= $TBConfig::concurrent_node_usage;
#prerun step
my $result = TestBed::ForkFramework::MaxWorkersScheduler::work($concurrent_pre_runs, sub { $_[0]->prep }, $ExperimentTests);
......@@ -63,14 +67,18 @@ sub runtests {
my $maximum_nodes = $hash->{'maximum_nodes'};
my $eid = $ExperimentTests->[$item_id]->e->eid;
#say "$eid $item_id $maximum_nodes";
push @weighted_experiements, [ $maximum_nodes, $item_id ];
if ($maximum_nodes > $concurrent_node_count_usage) {
warn "$eid requires upto $maximum_nodes nodes, only $concurrent_node_count_usage concurrent nodes permitted\n$eid will not be run";
}
else {
push @weighted_experiements, [ +$maximum_nodes, +$item_id ];
}
}
@weighted_experiements = sort { $a->[0] <=> $b->[0] } @weighted_experiements;
#count tests step
my $test_count = 0;
map { $test_count += $_->test_count } @$ExperimentTests;
map { $test_count += $ExperimentTests->[$_->[1]]->test_count } @weighted_experiements;
#run tests
reset_test_builder($test_count, no_numbers => 1);
......@@ -114,7 +122,7 @@ sub tap_wrapper {
my ($te) = @_;
if ($ENABLE_SUBTESTS_FEATURE) {
TestBed::ForkFramework::Scheduler->redir_std_fork( sub {
TestBed::ForkFramework::fork_redir( sub {
my ($in, $out, $err, $pid) = @_;
#while(<$out>) { print "K2" . $_; }
use TAP::Parser;
......@@ -138,7 +146,7 @@ sub tap_wrapper {
sub build_TAP_stream {
use TestBed::TestSuite;
my ($in, $out, $err, $pid) = TestBed::ForkFramework::redir_fork(sub { runtests; });
my ($in, $out, $err, $pid) = TestBed::ForkFramework::fork_child_redir(sub { runtests; });
return TAP::Parser::Iterator::StdOutErr->new($out, $err, $pid);
}
......
#!/usr/bin/perl
package TestBed::ParallelRunner::Exception;
use Mouse;
has original => ( is => 'rw');
no Mouse;
package TestBed::ParallelRunner::RetryAtEnd;
use Mouse;
extends('TestBed::ParallelRunner::Exception');
no Mouse;
package TestBed::ParallelRunner::SwapOutFailed;
use Mouse;
extends('TestBed::ParallelRunner::Exception');
no Mouse;
package TestBed::ParallelRunner::ErrorStrategy;
use SemiModern::Perl;
use Mouse;
sub swapin_error { }
sub run_error { }
sub swapout_error {
my ($xmlrpc_error) = @_;
die TestBed::ParallelRunner::SwapOutFailed->new($xmlrpc_error);
}
sub end_error { }
package TestBed::ParallelRunner::ErrorRetryStrategy;
use SemiModern::Perl;
use Mouse;
extends 'TestBed::ParallelRunner::ErrorStrategy';
sub swapin_error {
my @retry_causes = qw( temp internal software hardware canceled unknown);
my ($xmlrpc_error) = @_;
if ($xmlrpc_error =~ /RPC::XML::Struct/) {
my $cause = $xmlrpc_error->value->value->{'cause'};
if ( grep { /$cause/ } @retry_causes) {
die TestBed::ParallelRunner::RetryAtEnd->new($xmlrpc_error);
}
}
}
=head1 NAME
TestBed::ParallelRunner::ErrorStrategy
handle parallel run errors;
=over 4
=item C<< swapin_error($error) >>
=item C<< run_error($error) >>
=item C<< swapout_error($error) >>
=item C<< end_error($error) >>
=back
=cut