Commit 8a7e6259 authored by Kevin Tew's avatar Kevin Tew
Browse files

Backoff and function documentation

parent debd552d
ImageTest
Parameters
OldTests
DOCS TODO
retry example
backoff example
TODO
TIMEOUT of XMLRPC Calls
......@@ -18,11 +22,10 @@ STILL MESSY
EXPAND CURRENT IMPLEMENTATION
event subsystem
parallel support (custom test harness)
Test::Builder support
LATER
Client.pm duplicate code elimination - Maybe this would make the code too unreadable
TestSuite::Experiment::Macros should be a monadic language like JQuery
test groupings
Large External Tars and Resources for experiements
buildup, teardown using Test::Class
......
......@@ -73,9 +73,6 @@ has 'itemid' => ( is => 'rw');
sub is_error { shift->error; }
use SemiModern::Perl;
use Mouse;
package TestBed::ForkFramework;
sub forkit {
my ($parent_worker, $worker) = @_;
......@@ -95,11 +92,6 @@ sub forkit {
}
}
sub fork_child_redir {
my ($worker) = @_;
fork_redir( sub { return @_; }, $worker);
}
sub fork_redir {
my ($parent_worker, $worker) = @_;
my $redir = TestBed::ForkFramework::Redir->new;
......@@ -116,6 +108,11 @@ sub fork_redir {
);
}
sub fork_child_redir {
my ($worker) = @_;
fork_redir( sub { return @_; }, $worker);
}
package TestBed::ForkFramework::Scheduler;
use SemiModern::Perl;
use Mouse;
......@@ -123,24 +120,12 @@ use IO::Select;
use Carp;
use Data::Dumper;
has 'workers' => ( is => 'rw', default => sub { [] });
has 'results' => ( is => 'rw', default => sub { TestBed::ForkFramework::Results->new; });
has 'selector' => ( is => 'rw', default => sub { IO::Select->new; });
has 'items' => ( is => 'rw', isa => 'ArrayRef', required => 1 );
has 'proc' => ( is => 'rw', isa => 'CodeRef' , required => 1 );
has 'workers' => ( is => 'rw', default => sub { [] });
has 'results' => ( is => 'rw', default => sub { TestBed::ForkFramework::Results->new; });
has 'selector' => ( is => 'rw', default => sub { IO::Select->new; });
has 'selecttimeout' => ( is => 'rw', default => 10 ); #seconds
has 'proc' => ( is => 'rw', isa => 'CodeRef' , required => 1 );
sub _gen_iterator {
my $items = shift;
my @ar = @$items;
my $pos = 0;
return sub {
return if $pos >= @ar;
my @r = ( $pos, $ar[$pos] );
$pos++;
return @r;
}
}
sub wait_for_all_children_to_exit {
my ($self) = @_;
......@@ -154,10 +139,10 @@ sub workloop {
say "spawnWorker $jobid" if $FFDEBUG;
$self->fffork($jobid);
}
say "CALL SELECT" if $FFDEBUG;
if ($self->selectloop) {
redo LOOP;
}
my $selectrc = $self->process_select; say "CALL SELECT" if $FFDEBUG;
my $schedulerc = $self->schedule;
if ($selectrc || $schedulerc) { redo LOOP; }
}
$self->wait_for_all_children_to_exit;
......@@ -167,12 +152,12 @@ sub workloop {
use constant SELECT_HAS_HANDLES => 1;
use constant SELECT_NO_HANDLES => 0;
sub selectloop {
sub process_select {
my ($self) = @_;
my $selector = $self->selector;
if ($selector->count) {
eval {
for my $r ($selector->can_read) {
for my $r ($selector->can_read($self->selecttimeout)) {
my ($rh, $wh, $eof, $ch) = @$r;
if (defined (my $result = $ch->receive)) {
$self->handleResult($result);
......@@ -238,19 +223,29 @@ sub fffork {
}
}
sub doItem { my ($s, $itemid) = @_; $s->proc->($s->items->[$itemid]); }
sub doItem { die "HAVE TO IMPLEMENT doItem"; }
sub handleResult { recordResult(@_); }
sub recordResult { shift->results->handle_result(shift); }
sub schedule { 0; }
package TestBed::ForkFramework::ForEach;
use SemiModern::Perl;
use Mouse;
has 'iter' => ( isa => 'CodeRef' , is => 'rw', required => 1);
has 'maxworkers' => ( is => 'rw', isa => 'Int' , default => 4);
has 'currworkers' => ( is => 'rw', isa => 'Int' , default => 0);
has 'iter' => ( is => 'rw', isa => 'CodeRef' , required => 1);
has 'items' => ( is => 'rw', isa => 'ArrayRef', required => 1 );
extends 'TestBed::ForkFramework::Scheduler';
sub spawnWorker { shift->nextJob; }
sub spawnWorker {
my $s = shift;
return if ($s->currworkers >= $s->maxworkers);
$s->{'currworkers'}++;
$s->nextJob;
}
sub nextJob {
my @res = shift->iter->();
$res[0];
......@@ -258,50 +253,55 @@ sub nextJob {
sub work {
my ($proc, $items) = @_;
my $s = TestBed::ForkFramework::ForEach->new(
'items' => $items,
'proc' => $proc,
'iter' => TestBed::ForkFramework::Scheduler::_gen_iterator($items),
);
$s->workloop;
return max_work(scalar @$items, $proc, $items);
}
package TestBed::ForkFramework::MaxWorkersScheduler;
use SemiModern::Perl;
use Mouse;
has 'maxworkers' => ( isa => 'Int' , is => 'rw', default => 4);
has 'pos' => ( isa => 'Int' , is => 'rw', default => 0);
has 'currworkers' => ( isa => 'Int' , is => 'rw', default => 0);
sub _gen_iterator {
my $items = shift;
my @ar = @$items;
my $pos = 0;
return sub {
return if $pos >= @ar;
my @r = ( $pos, $ar[$pos] );
$pos++;
return @r;
}
}
extends 'TestBed::ForkFramework::Scheduler';
sub work {
sub max_work {
my ($max_workers, $proc, $items) = @_;
my $s = TestBed::ForkFramework::MaxWorkersScheduler->new(
my $s = TestBed::ForkFramework::ForEach->new(
'maxworkers' => $max_workers,
'items' => $items,
'proc' => $proc,
'iter' => _gen_iterator($items),
);
$s->workloop;
}
sub spawnWorker {
my $s = shift;
return if ($s->currworkers >= $s->maxworkers);
$s->{'currworkers'}++;
$s->nextJob;
}
sub doItem { my ($s, $itemid) = @_; $s->proc->($s->items->[$itemid]); }
sub nextJob {
my $s = shift;
my $pos = $s->pos;
return if ($pos >= scalar @{ $s->items });
$s->{'pos'}++;
$pos;
package TestBed::ForkFramework::WeightedScheduler::Task;
use SemiModern::Perl;
use Mouse;
has 'id' => (is => 'rw');
has 'item' => (is => 'rw');
has 'runtime' => (is => 'rw', default => 0);
has 'weight' => (is => 'rw', default => 0);
sub build {
shift;
return TestBed::ForkFramework::WeightedScheduler::Task->new(
id => shift,
item => shift,
weight => shift
);
}
package TestBed::ForkFramework::RateScheduler;
sub ready { return time >= shift->runtime; }
package TestBed::ForkFramework::WeightedScheduler;
use SemiModern::Perl;
use Data::Dumper;
use Tools;
......@@ -309,13 +309,31 @@ use Mouse;
extends 'TestBed::ForkFramework::Scheduler';
has 'ids' => ( isa => 'Int' , is => 'rw', default => 0);
has 'maxnodes' => ( isa => 'Int' , is => 'rw', default => 20);
has 'currnodes' => ( isa => 'Int' , is => 'rw', default => 0);
has 'schedule' => ( isa => 'ArrayRef' , is => 'rw', required => 1);
has 'weight' => ( isa => 'ArrayRef' , is => 'rw', required => 1);
has 'retryItems'=> ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'runqueue' => ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'tasks' => ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'retryTasks'=> ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'waitTasks' => ( isa => 'ArrayRef' , is => 'rw', default => sub { [] } );
has 'inRetry' => ( isa => 'Int' , is => 'rw', default => 0);
sub nextID {
my ($s) = @_;
my $id = $s->ids;
$s->ids($id + 1);
$id;
}
sub task { shift->tasks->[shift]; }
sub add_task {
my ($s, $item, $weight) = @_;
my $id = $s->nextID;
my $task = TestBed::ForkFramework::WeightedScheduler::Task->build($id, $item, $weight);
push @{$s->runqueue}, $task;
$s->tasks->[$id] = $task;
}
sub incr_currnodes {
my ($s, $quantity) = @_;
......@@ -323,47 +341,55 @@ sub incr_currnodes {
}
sub return_node_resources {
my ($s, $itemid) = @_;
$s->{'currnodes'} -= $s->weight->[$itemid];
my ($s, $task) = @_;
$s->{'currnodes'} -= $task->weight;
}
sub sort_runqueue {
my ($s) = @_;
$s->runqueue( [ sort { $a->weight <=> $b->weight } @{$s->runqueue} ] );
}
sub work {
my ($max_nodes, $proc, $schedule, $items) = @_;
my $s = TestBed::ForkFramework::RateScheduler->new(
'maxnodes' => $max_nodes,
'items' => $items,
'proc' => $proc,
'schedule' => $schedule,
'weight' => [ map { $_->[0] } (sort { $a->[1] <=> $b->[1] } @$schedule) ],
);
say toperl("SCHEDULE", $s->schedule) if $FFDEBUG;
say toperl("WEIGHTS", $s->weight) if $FFDEBUG;
my ($maxnodes, $proc, $items_weights) = @_;
my $s = TestBed::ForkFramework::WeightedScheduler->new(
maxnodes => $maxnodes,
proc => $proc,
);
$s->add_task($_->[0], $_->[1]) for (@$items_weights);
$s->run;
}
sub run {
my ($s) = @_;
$s->sort_runqueue;
$s->workloop;
say("RETRYING") if $FFDEBUG;
$s->inRetry(1);
$s->schedule( [ map { [$s->weight->[$_], $_] } @{$s->retryItems} ] );
$s->retryItems([]);
say toperl("SCHEDULE", $s->schedule) if $FFDEBUG;
say toperl("WEIGHTS", $s->weight) if $FFDEBUG;
$s->runqueue( [ @{$s->retryTasks} ] );
$s->retryTasks([]);
$s->sort_runqueue;
$s->workloop;
}
sub find_largest_item {
my ($s, $max_size) = @_;
my ($s, $max_weight) = @_;
my $found = undef;
#find largest item that is small enough
for (@{ $s->schedule }) {
my $itemsize = $_->[0];
#find largest task that is small enough
for (@{ $s->runqueue }) {
my $item_weight = $_->weight;
last if $itemsize > $max_size;
next if ($found and $found->[0] >= $itemsize);
$found = $_ if $itemsize <= $max_size;
last if $item_weight > $max_weight;
next if ($found and $found->weight >= $item_weight);
$found = $_ if $item_weight <= $max_weight;
}
#remove found from schedule
#remove found from runqueue
if (defined $found) {
$s->schedule( [ grep { !($_->[1] == $found->[1]) } @{ $s->schedule} ]);
$s->runqueue( [ grep { !($_->id == $found->id) } @{ $s->runqueue} ]);
}
return $found;
......@@ -373,28 +399,28 @@ sub spawnWorker { shift->nextJob; }
sub nextJob {
my $s = shift;
my $max_size = $s->maxnodes - $s->currnodes;
my $tuple = $s->find_largest_item($max_size);
my $task = $s->find_largest_item($max_size);
if ($tuple) {
my ($e_node_size, $eindex) = @$tuple;
say(sprintf("found %s size %s max_size $max_size currnodes %s maxnodes %s newcurrnodes %s", $eindex, $e_node_size, $s->currnodes, $s->maxnodes, $s->currnodes +$e_node_size)) if $FFDEBUG;
$s->{'currnodes'} += $e_node_size;
return $eindex;
if ($task) {
say(sprintf("found %s size %s max_size $max_size currnodes %s maxnodes %s newcurrnodes %s", $task->id, $task->weight, $s->currnodes, $s->maxnodes, $s->currnodes + $task->weight)) if $FFDEBUG;
$s->{'currnodes'} += $task->weight;
return $task->id;
}
else { return; }
}
sub doItem { my ($s, $taskid) = @_; $s->proc->($s->tasks->[$taskid]->item); }
use TestBed::ParallelRunner::ErrorConstants;
sub return_and_report {
my ($s, $result) = @_;
$s->recordResult($result);
$s->return_node_resources($result->itemid);
$s->return_node_resources($s->task($result->itemid));
}
sub handleResult {
my ($s, $result) = @_;
my $executor = $s->items->[$result->itemid];
my $executor = $s->tasks->[$result->itemid]->item;
if ($executor->can('handleResult')) {
my $rc = $executor->handleResult($s, $result);
if ($rc == RETURN_AND_REPORT) { $s->return_and_report($result) }
......@@ -404,17 +430,40 @@ sub handleResult {
}
}
sub schedule_at {
my ($s, $result, $runtime) = @_;
my $task = $s->task($result->itemid);
$task->runtime($runtime);
$s->return_node_resources($task);
push @{ $s->waitTasks }, $task;
}
sub schedule {
my ($s) = @_;
my $new_wait_list = [];
#iterate through waiting tasks adding ready tasks to runqueue
for (@{$s->waitTasks}) {
my $id = $_->id;
if ($_->ready) { push @{$s->runqueue}, $_; }
else { push @$new_wait_list, $_; }
}
$s->sort_runqueue;
$s->waitTasks($new_wait_list);
return (scalar @$new_wait_list) || scalar (@{$s->runqueue});
}
sub retry {
my ($s, $result) = @_;
my $itemid = $result->itemid;
if (!$s->inRetry) {
push @{ $s->retryItems }, $itemid;
$s->return_node_resources($itemid);
say "RETRYING item# $itemid";
push @{ $s->retryTasks }, $s->task($itemid);
$s->return_node_resources($s->task($itemid));
# say "RETRYING task# $itemid";
return 1;
}
else {
say "DONE RETRYING";
# say "DONE RETRYING";
$s->return_and_report($result);
}
}
......
......@@ -59,38 +59,53 @@ sub runtests {
$concurrent_node_count_usage ||= $TBConfig::concurrent_node_usage;
#prerun step
my $result = TestBed::ForkFramework::MaxWorkersScheduler::work($concurrent_pre_runs, sub { shift->prep }, $Executors);
my $result = TestBed::ForkFramework::ForEach::max_work($concurrent_pre_runs, sub { shift->prep }, $Executors);
if ($result->has_errors) {
sayd($result->errors);
warn 'TestBed::ParallelRunner::runtests died during test prep';
}
#create schedule step
my @schedule;
my $workscheduler = TestBed::ForkFramework::WeightedScheduler->new(
items => $Executors,
proc => &tap_wrapper,
maxnodes => $concurrent_node_count_usage,
);
#add taskss to scheduler step
my $total_test_count = 0;
for (@{$result->successes}) {
my $item_id = $_->itemid;
my $itemId = $_->itemid;
my $executor = $Executors->[$itemId];
my $maximum_nodes = $_->result->{'maximum_nodes'};
my $eid = $Executors->[$item_id]->e->eid;
#say "$eid $item_id $maximum_nodes";
my $eid = $executor->e->eid;
if ($maximum_nodes > $concurrent_node_count_usage) {
warn "$eid requires upto $maximum_nodes nodes, only $concurrent_node_count_usage concurrent nodes permitted\n$eid will not be run";
}
else {
push @schedule, [ +$maximum_nodes, +$item_id ];
$workscheduler->add_task($itemId, $maximum_nodes);
$total_test_count += $executor->test_count;
}
}
@schedule = sort { $a->[0] <=> $b->[0] } @schedule;
#count tests step
my $test_count = 0;
map { $test_count += $Executors->[$_->[1]]->test_count } @schedule;
USE_TESTBULDER_PREAMBLE: {
reset_test_builder($total_test_count, no_numbers => 1);
}
#run tests
reset_test_builder($test_count, no_numbers => 1);
$result = TestBed::ForkFramework::RateScheduler::work($concurrent_node_count_usage, \&tap_wrapper, \@schedule, $Executors);
set_test_builder_to_end_state($test_count);
$result = $workscheduler->run;
USE_TESTBULDER_POSTAMBLE: {
$total_test_count = 0;
for (@{$result->successes}) {
my $item_id = $_->itemid;
my $executor = $Executors->[$item_id];
$total_test_count += $executor->test_count;
}
set_test_builder_to_end_state($total_test_count);
}
if ($result->has_errors) {
sayd($result->errors);
die 'TestBed::ParallelRunner::runtests died during test execution';
......@@ -167,10 +182,14 @@ TestBed::ParallelRunner
=over 4
=item C<< add_executor >>
=item C<< build_executor >>
helper function called by rege.
creates a TestBed::ParallelRunner::Execu:or job and pushes it onto @$Executors
creates a TestBed::ParallelRunner::Executor job
=item C<< add_executor($executor) >>
pushes $executor onto @$Executors
=item C<< runtests >>
......
......@@ -9,3 +9,17 @@ our @EXPORT = qw(RETURN_AND_REPORT);
use constant RETURN_AND_REPORT => 4096;
1;
=head1 NAME
TestBed::ParallelRunner::ErrorConstants
contains RETURN_AND_REPORT error constant;
=over 4
=back
=cut
1;
......@@ -14,6 +14,7 @@ sub handleResult {
$s->executor($executor);
$s->scheduler($scheduler);
$s->result($result);
if ($result->is_error) {
my $error = $result->error;
if ( $error->isa ( 'TestBed::ParallelRunner::Executor::SwapinError')) { return $s->swapin_error ( @_); }
......@@ -38,6 +39,14 @@ sub xmlrpc_error_cause {
return;
}
sub is_retry_cause {
my $s = shift;
my $cause = $s->xmlrpc_error_cause;
my @retry_causes = qw(temp internal software hardware canceled unknown);
if ( grep { /$$cause/ } @retry_causes) { return 1; }
return 0;
}
sub swapin_error { return RETURN_AND_REPORT; }
sub run_error { return RETURN_AND_REPORT; }
sub swapout_error { return RETURN_AND_REPORT; }
......@@ -52,16 +61,73 @@ extends 'TestBed::ParallelRunner::ErrorStrategy';
sub swapin_error {
my ($s, $executor, $scheduler, $result) = @_;
if ($s->is_retry_cause) { return $scheduler->retry($result); }
if ($s->is_retry_cause) {
warn "Retrying";# . $executor->e->eid;
return $s->scheduler->retry($result);
}
else { return RETURN_AND_REPORT; }
}
sub is_retry_cause {
my $s = shift;
my $cause = $s->xmlrpc_error_cause;
my @retry_causes = qw(temp internal software hardware canceled unknown);
if ( grep { /$$cause/ } @retry_causes) { return 1; }
return 0;
package TestBed::ParallelRunner::BackoffStrategy;