ForkFramework.pm 12.1 KB
Newer Older
1
my $FFDEBUG = 0;
Kevin Tew's avatar
Kevin Tew committed
2

3 4 5 6
package TestBed::ForkFramework::Channel;
use SemiModern::Perl;
use Mouse;
use Data::Dumper;
use Carp;
use IO::Pipe;
use Storable qw(store_fd fd_retrieve);

# Two-way message channel between a parent and a forked child, built on a
# pair of IO::Pipe objects. Messages are serialized with Storable, wrapped
# in a one-element array ref so that an undef payload can be transmitted.
# An undef message is used as the end-of-stream marker (see sendEnd).
has 'rd' => ( isa => 'Any', is => 'rw');
has 'wr' => ( isa => 'Any', is => 'rw');
has 'pipes' => ( isa => 'ArrayRef', is => 'rw');

# Constructor helper: a channel holding two fresh, not yet directed pipes.
# Takes no arguments; call it before fork.
sub build {
  my @pipes = map { IO::Pipe->new } 1 .. 2;
  return TestBed::ForkFramework::Channel->new( 'pipes' => \@pipes );
}

# Turn the first pipe into this side's read end and the second into its
# write end.
sub buildChannel {
  my ($s, $read_pipe, $write_pipe) = @_;
  $s->rd($read_pipe->reader);
  return $s->wr($write_pipe->writer);
}

# Parent side: pipes are used in stored order.
sub parentAfterFork {
  my ($s) = @_;
  return $s->buildChannel(@{ $s->pipes });
}

# Child side: pipes are used in reverse order, so each side reads what the
# other side writes.
sub childAfterFork {
  my ($s) = @_;
  return $s->buildChannel(reverse @{ $s->pipes });
}

# Read one message from $fd and unwrap it.
sub receivefd {
  my ($fd) = @_;
  my $envelope = fd_retrieve($fd);
  return $envelope->[0];
}

# Wrap the payload, write it to $fd and flush.
sub sendfd {
  my ($fd, $payload) = @_;
  store_fd([$payload], $fd);
  return $fd->flush;
}

# Object-level wrappers around receivefd/sendfd.
sub receive {
  my ($s) = @_;
  return receivefd($s->rd);
}

sub send {
  my ($s, $message) = @_;
  return sendfd($s->wr, $message);
}

# Send the undef end-of-stream marker, then close the write end.
sub sendEnd {
  my ($s) = @_;
  $s->send(undef);
  return $s->closeWr;
}

# Record stored in IO::Select: [ read handle, write handle, eof flag, channel ].
sub selectInit {
  my ($s) = @_;
  return [ $s->rd, $s->wr, 0, $s ];
}

# Close one end (if still open) and forget the handle.
# CORE::close is spelled out because this package also defines a close method.
sub closeRd {
  my ($s) = @_;
  my $fh = $s->rd;
  CORE::close($fh) if defined $fh;
  return $s->rd(undef);
}

sub closeWr {
  my ($s) = @_;
  my $fh = $s->wr;
  CORE::close($fh) if defined $fh;
  return $s->wr(undef);
}

# Close both ends.
sub close {
  my ($s) = @_;
  $s->closeRd;
  return $s->closeWr;
}
28 29 30 31 32 33 34

package TestBed::ForkFramework::Redir;
use SemiModern::Perl;
use Mouse;
use Carp;
use IO::Pipe;

# Redirects a forked child's standard streams through pipes.
# 'pipes' holds three IO::Pipe objects, in order: child STDIN, child STDOUT,
# child STDERR.
has 'pipes' => ( isa => 'ArrayRef', is => 'rw', default => sub { [ map IO::Pipe->new, 1 .. 3 ] } );

# Parent side: become the writer of the child's STDIN pipe and the reader of
# its STDOUT/STDERR pipes.
# Returns the three pipes as a list in list context, as an array ref otherwise.
sub parentAfterFork {
  my ($s) = @_;
  my $ps = $s->pipes;
  $ps->[0]->writer;
  $ps->[1]->reader;
  $ps->[2]->reader;
  return wantarray ? @{$ps} : $ps;
}

# Child side: take the opposite pipe ends, re-open STDIN/STDOUT/STDERR as
# duplicates of them, then close the original pipe handles.
# Croaks if any of the standard handles cannot be re-opened; previously a
# failed dup was ignored and the child carried on with a closed handle.
sub childAfterFork  {
  my ($s) = @_;
  my ($in, $out, $err) = @{ $s->pipes };
  $in->reader;  $out->writer;  $err->writer;

  close STDIN; close STDOUT; close STDERR;
  open(STDIN,  "<&", $in ->fileno) or croak "cannot redirect STDIN to pipe: $!";
  open(STDOUT, ">&", $out->fileno) or croak "cannot redirect STDOUT to pipe: $!";
  open(STDERR, ">&", $err->fileno) or croak "cannot redirect STDERR to pipe: $!";
  return $s->close;
}

# Close all three pipe handles; returns the individual close() results.
sub close {
  my ($s) = @_;
  my @closed = map { CORE::close($_) } @{ $s->pipes };
  return @closed;
}

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
package TestBed::ForkFramework::Results;
use SemiModern::Perl;
use Mouse;

# Collects item results, separated into successes and errors.
has 'successes' => ( isa => 'ArrayRef', is => 'rw', default => sub { [ ] } );
has 'errors' => ( isa => 'ArrayRef', is => 'rw', default => sub { [ ] } );

# Append a result to the success list.
sub push_success {
  my ($self, $result) = @_;
  return push @{ $self->successes }, $result;
}

# Append a result to the error list.
sub push_error {
  my ($self, $result) = @_;
  return push @{ $self->errors }, $result;
}

# Number of recorded errors (0 when there are none).
sub has_errors {
  my ($self) = @_;
  return scalar @{ $self->errors };
}

# File a result under errors or successes according to its is_error method.
sub handle_result {
  my ($self, $result) = @_;
  return $result->is_error
    ? $self->push_error($result)
    : $self->push_success($result);
}

package TestBed::ForkFramework::ItemResult;
use SemiModern::Perl;
use Mouse;

# Outcome of processing one item: the item's id, the value produced and the
# error captured while producing it.
has 'result' => ( is => 'rw');
has 'error'  => ( is => 'rw');
has 'itemid' => ( is => 'rw');

# True when an error value was recorded for this item.
sub is_error {
  my ($self) = @_;
  return $self->error;
}

76
package TestBed::ForkFramework;
Kevin Tew's avatar
Kevin Tew committed
77 78
# Fork the process.
#   $parent_worker - code ref run in the parent with the child's pid; its
#                    return value is returned to the caller.
#   $worker        - code ref run in the child; the child exits afterwards
#                    and never returns to the caller.
# Dies in the calling process if fork itself fails.
sub forkit {
  my ($parent_worker, $worker) = @_;

  my $pid = fork;
  # fork returns undef on failure. Without this check the failure would be
  # mistaken for the child branch and the parent would run the worker and exit.
  die "fork failed: $!" unless defined $pid;

  if ( $pid ) {
    #Parent
    return $parent_worker->($pid);
  }

  #Child
  use POSIX '_exit';
  # Install an END block (in the child only) that leaves via _exit.
  eval q{END { _exit 0 }};

  $worker->();

  CORE::exit;
}

Kevin Tew's avatar
Kevin Tew committed
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
# Fork with the child's STDIN/STDOUT/STDERR redirected through pipes.
#   $parent_worker - called in the parent with the three pipe handles
#                    followed by the child's pid.
#   $worker        - called in the child once its standard streams have
#                    been redirected.
sub fork_redir {
  my ($parent_worker, $worker) = @_;
  my $redir = TestBed::ForkFramework::Redir->new;

  my $in_parent = sub {
    my ($pid) = @_;
    my $handles = $redir->parentAfterFork;
    return $parent_worker->(@$handles, $pid);
  };

  my $in_child = sub {
    $redir->childAfterFork;
    $worker->();
  };

  return forkit($in_parent, $in_child);
}

Kevin Tew's avatar
Kevin Tew committed
111 112 113 114 115
# Like fork_redir, but the parent simply gets back what fork_redir passes to
# its parent callback: the pipe handles followed by the child's pid.
sub fork_child_redir {
  my ($worker) = @_;
  my $passthrough = sub { return @_; };
  return fork_redir($passthrough, $worker);
}

116 117 118 119 120 121 122
package TestBed::ForkFramework::Scheduler;
use SemiModern::Perl;
use Mouse;
use IO::Select;
use Carp;
use Data::Dumper;

Kevin Tew's avatar
Kevin Tew committed
123 124 125 126 127
# Pids of forked workers; appended to by fffork and waited on by
# wait_for_all_children_to_exit.
has 'workers'       => ( is => 'rw', default => sub { [] });
# Accumulator for item results; returned by workloop.
has 'results'       => ( is => 'rw', default => sub { TestBed::ForkFramework::Results->new; });
# IO::Select set holding one selectInit record per worker channel.
has 'selector'      => ( is => 'rw', default => sub { IO::Select->new; });
# Timeout handed to can_read in process_select.
has 'selecttimeout' => ( is => 'rw', default => 10 ); #seconds
# Code ref to run per item; not invoked in this class, only by the doItem
# implementations of the subclasses in this file.
has 'proc'          => ( is => 'rw', isa => 'CodeRef' , required => 1 );
128 129


Kevin Tew's avatar
Kevin Tew committed
130 131 132 133 134
# Block until every pid recorded in 'workers' has been waited for.
sub wait_for_all_children_to_exit {
  my ($self) = @_;
  foreach my $pid ( @{ $self->workers } ) {
    waitpid( $pid, 0 );
  }
}

135 136 137 138
# Main scheduling loop. Each round:
#   1. forks a worker for every job id spawnWorker hands out,
#   2. services worker channels (process_select),
#   3. lets the subclass reschedule (schedule).
# The loop ends when neither step 2 nor step 3 reports anything pending.
# Waits for all children and returns the results accumulator.
sub workloop {
  my ($self) = @_;

  while (1) {
    while ( defined( my $jobid = $self->spawnWorker ) ) {
      say "spawnWorker $jobid" if $FFDEBUG;
      $self->fffork($jobid);
    }

    my $selectrc = $self->process_select;
    say "CALL SELECT" if $FFDEBUG;
    my $schedulerc = $self->schedule;

    last unless $selectrc || $schedulerc;
  }

  $self->wait_for_all_children_to_exit;

  return $self->results;
}

152
use constant SELECT_HAS_HANDLES => 1;
Kevin Tew's avatar
Kevin Tew committed
153 154
use constant SELECT_NO_HANDLES  => 0;

Kevin Tew's avatar
Kevin Tew committed
155
# Service every worker channel that becomes readable within 'selecttimeout'.
#
# For each readable channel:
#   - a defined message is a result: pass it to handleResult, then either
#     send the worker its next job id or, when there is none, send the end
#     marker and flag the selector record as finished (write handle cleared,
#     eof flag set);
#   - an undef message is the worker's final acknowledgement: drop the
#     channel from the selector and close it.
#
# Returns SELECT_HAS_HANDLES when the selector held any channel on entry,
# SELECT_NO_HANDLES otherwise.
# If anything dies while servicing channels, the remaining workers are told
# to stop, all children are waited for, and the original error is re-thrown.
sub process_select {
  my ($self) = @_;
  my $selector = $self->selector;

  unless ($selector->count) {
    say "SELECT_NO_HANDLES" if $FFDEBUG;
    return SELECT_NO_HANDLES;
  }

  eval {
    for my $r ($selector->can_read($self->selecttimeout)) {
      my ($rh, $wh, $eof, $ch) = @$r;
      if (defined (my $result = $ch->receive)) {
        $self->handleResult($result);

        unless ( $eof ) {
          # Test definedness, not truth: 0 is a valid job id.
          if ( defined( my $jobid = $self->nextJob ) ) {
            say "newjob $jobid" if $FFDEBUG;
            $ch->send($jobid);
          }
          else {
            say "no work killing $rh" if $FFDEBUG;
            $ch->sendEnd;
            @{$r}[1,2] = (undef, 1);
          }
        }
      }
      else {
        say "received null ack from $rh" if $FFDEBUG;
        $selector->remove($r);
        $ch->close;
      }
    }
  };
  if ( my $error = $@ ) {
    say "SELECT HAS ERRORS" if $FFDEBUG;
    for my $record ($selector->handles) {
      # Channels already sent the end marker have no write end left.
      next if $record->[2];
      # Best effort: a failure here must not hide the original error.
      eval { $record->[3]->sendEnd };
    }
    $self->wait_for_all_children_to_exit;
    die $error;
  }

  say "SELECT_HAS_HANDLES" if $FFDEBUG;
  return SELECT_HAS_HANDLES;
}

# Fork one worker process and hand it its first work item.
#
# Parent: records the child's pid, registers the channel with the selector
#         and sends $workid.
# Child:  loops receiving item ids, runs doItem on each inside eval, and
#         sends back an ItemResult carrying the return value and the error
#         text; on the undef end marker it acknowledges with its own end
#         marker, closes the channel and exits. The child never returns.
#
# Croaks in the calling process if fork fails.
sub fffork {
  my ($self, $workid) = @_;
  my $ch = TestBed::ForkFramework::Channel::build();

  my $pid = fork;
  # fork returns undef on failure. Without this check the failure would be
  # mistaken for the child branch and the scheduler process itself would
  # block on the channel and then exit.
  croak "fork failed: $!" unless defined $pid;

  if ( $pid ) {
    #Parent
    $ch->parentAfterFork;
    push @{ $self->workers }, $pid;
    $self->selector->add($ch->selectInit);
    $ch->send($workid);
  }
  else {
    #Child
    $ch->childAfterFork;

    use POSIX '_exit';
    # Install an END block (in the child only) that leaves via _exit.
    eval q{END { _exit 0 }};

    while ( defined( my $itemid = $ch->receive )) {
      my $result = eval { $self->doItem($itemid); };
      my $error  = $@;
      $ch->send(TestBed::ForkFramework::ItemResult->new(itemid => $itemid, result => $result, error => $error));
    }
    $ch->sendEnd;
    $ch->close;

    CORE::exit;
  }
}

Kevin Tew's avatar
Kevin Tew committed
226
# Hook called in a worker process (from fffork) with an item id.
# The base implementation only dies; the subclasses in this file override it.
sub doItem { die "HAVE TO IMPLEMENT doItem"; }
227 228
# Called in the parent for every result received from a worker.
# Default behavior: record it. recordResult is called as a plain function
# here, so this always reaches this package's recordResult.
sub handleResult {
  my ($self, @rest) = @_;
  return recordResult($self, @rest);
}
# File a result in the scheduler's results accumulator.
sub recordResult {
  my ($self, $result) = @_;
  return $self->results->handle_result($result);
}
Kevin Tew's avatar
Kevin Tew committed
229
# Scheduling hook called once per workloop round. workloop keeps looping
# while this returns a true value; the base implementation always returns 0.
sub schedule { 0; }
230 231 232 233 234

package TestBed::ForkFramework::ForEach;
use SemiModern::Perl;
use Mouse;

Kevin Tew's avatar
Kevin Tew committed
235 236 237 238
# Upper bound checked by spawnWorker before it hands out another job.
has 'maxworkers'  => ( is => 'rw', isa => 'Int'     , default => 4);
# Counter incremented by spawnWorker each time it passes the bound check.
has 'currworkers' => ( is => 'rw', isa => 'Int'     , default => 0);
# Iterator code ref called by nextJob; the first element of the list it
# returns is used as the job id.
has 'iter'        => ( is => 'rw', isa => 'CodeRef' , required => 1);
# Work items; doItem indexes this array with the job id.
has 'items'       => ( is => 'rw', isa => 'ArrayRef', required => 1 );
239 240 241

extends 'TestBed::ForkFramework::Scheduler';

Kevin Tew's avatar
Kevin Tew committed
242 243 244 245 246 247 248
# Hand out a job id for a new worker, or nothing once 'maxworkers' workers
# have been requested (or the iterator has no job left).
sub spawnWorker {
  my ($s) = @_;
  if ($s->currworkers >= $s->maxworkers) {
    return;
  }
  $s->{'currworkers'} += 1;
  return $s->nextJob;
}

249 250 251 252 253 254 255
# Advance the iterator and return the first element of what it yields
# (the job id); undef when the iterator yields nothing.
sub nextJob {
  my ($s) = @_;
  my ($jobid) = $s->iter->();
  return $jobid;
}

# Run $proc over every element of $items, allowing as many workers as there
# are items. Returns whatever max_work returns.
sub work {
  my ($proc, $items) = @_;
  my $worker_count = scalar @$items;
  return max_work($worker_count, $proc, $items);
}

Kevin Tew's avatar
Kevin Tew committed
259 260 261 262 263 264 265 266 267 268 269
# Build an iterator over a copy of @$items.
# Each call returns the pair (index, element); once the copy is exhausted
# it returns nothing.
sub _gen_iterator {
  my ($items) = @_;
  my @snapshot = @{$items};
  my $next = 0;

  return sub {
    if ($next < @snapshot) {
      my @pair = ( $next, $snapshot[$next] );
      $next += 1;
      return @pair;
    }
    return;
  };
}
270

Kevin Tew's avatar
Kevin Tew committed
271
# Run $proc over every element of $items with at most $max_workers workers.
# Returns the value of workloop (the results accumulator).
sub max_work {
  my ($max_workers, $proc, $items) = @_;

  my %args = (
    'maxworkers' => $max_workers,
    'items'      => $items,
    'proc'       => $proc,
    'iter'       => _gen_iterator($items),
  );
  my $s = TestBed::ForkFramework::ForEach->new(%args);

  return $s->workloop;
}

Kevin Tew's avatar
Kevin Tew committed
282
# Worker-side hook: apply 'proc' to the item stored at index $itemid.
sub doItem {
  my ($s, $itemid) = @_;
  my $item = $s->items->[$itemid];
  return $s->proc->($item);
}
283

Kevin Tew's avatar
Kevin Tew committed
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
package TestBed::ForkFramework::WeightedScheduler::Task;
use SemiModern::Perl;
use Mouse;

# A schedulable unit: an id, the item to process, the earliest time at
# which it may run ('runtime', compared against time() in ready) and its
# weight.
has 'id'      => (is => 'rw');
has 'item'    => (is => 'rw');
has 'runtime' => (is => 'rw', default => 0);
has 'weight'  => (is => 'rw', default => 0);

# Class-method constructor taking positional arguments: id, item, weight.
sub build {
  my ($class, $id, $item, $weight) = @_;
  return TestBed::ForkFramework::WeightedScheduler::Task->new(
    id     => $id,
    item   => $item,
    weight => $weight,
  );
}

# True once the current time has reached the task's 'runtime'.
sub ready {
  my ($self) = @_;
  return time >= $self->runtime;
}

package TestBed::ForkFramework::WeightedScheduler;
305 306 307 308 309 310 311
use SemiModern::Perl;
use Data::Dumper;
use Tools;
use Mouse;

extends 'TestBed::ForkFramework::Scheduler';

Kevin Tew's avatar
Kevin Tew committed
312
# Next task id to hand out (see nextID).
has 'ids'        => ( is => 'rw', isa => 'Int',      default => 0 );
# Node budget: nextJob only picks tasks weighing at most maxnodes - currnodes.
has 'maxnodes'   => ( is => 'rw', isa => 'Int',      default => 20 );
has 'currnodes'  => ( is => 'rw', isa => 'Int',      default => 0 );
# Tasks waiting to be picked by find_largest_item.
has 'runqueue'   => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
# Every task ever added, indexed by task id.
has 'tasks'      => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
# Tasks queued by retry for the second pass made by run.
has 'retryTasks' => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
# Tasks parked by schedule_at until they report ready.
has 'waitTasks'  => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
# Set to 1 by run before the retry pass.
has 'inRetry'    => ( is => 'rw', isa => 'Int',      default => 0 );

Kevin Tew's avatar
Kevin Tew committed
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
# Return the current value of the 'ids' counter and advance it by one.
sub nextID {
  my ($s) = @_;
  my $allocated = $s->ids;
  $s->ids($allocated + 1);
  return $allocated;
}

# Look up a task by its id.
sub task {
  my ($s, $id) = @_;
  return $s->tasks->[$id];
}

# Create a task for $item with the given $weight, register it under a fresh
# id and append it to the run queue. Returns the new task.
sub add_task {
  my ($s, $item, $weight) = @_;
  my $id = $s->nextID;
  my $task = TestBed::ForkFramework::WeightedScheduler::Task->build($id, $item, $weight);

  $s->tasks->[$id] = $task;
  push @{ $s->runqueue }, $task;

  return $task;
}
337 338 339 340 341 342 343

# Add $quantity to the count of nodes currently in use; returns the new count.
sub incr_currnodes {
  my ($s, $quantity) = @_;
  return $s->{'currnodes'} = $s->{'currnodes'} + $quantity;
}

# Give back the nodes reserved for $task: subtract its weight from the
# count of nodes in use. Returns the new count.
sub return_node_resources {
  my ($s, $task) = @_;
  my $released = $task->weight;
  return $s->{'currnodes'} -= $released;
}

# Replace the run queue with a copy sorted by ascending task weight.
sub sort_runqueue {
  my ($s) = @_;
  my @by_weight = sort { $a->weight <=> $b->weight } @{ $s->runqueue };
  return $s->runqueue( \@by_weight );
}
352 353

# Convenience entry point: schedule every [item, weight] pair of
# @$items_weights under a budget of $maxnodes nodes, running $proc on each
# item. Returns whatever run returns.
sub work {
  my ($maxnodes, $proc, $items_weights) = @_;

  my $s = TestBed::ForkFramework::WeightedScheduler->new(
    maxnodes => $maxnodes,
    proc => $proc,
  );

  foreach my $pair (@$items_weights) {
    my ($item, $weight) = @{$pair}[0, 1];
    $s->add_task($item, $weight);
  }

  return $s->run;
}

# Process the run queue in two passes: a first pass over all tasks, then a
# second pass (with 'inRetry' set) over the tasks collected in 'retryTasks'.
# Returns the value of the second workloop call.
sub run {
  my ($s) = @_;

  # First pass.
  $s->sort_runqueue;
  $s->workloop;

  # Retry pass: move the retry list into the run queue and start over.
  say("RETRYING") if $FFDEBUG;
  $s->inRetry(1);
  my @retry_queue = @{ $s->retryTasks };
  $s->runqueue( \@retry_queue );
  $s->retryTasks([]);

  $s->sort_runqueue;
  return $s->workloop;
}

# Pick the heaviest task in the run queue whose weight does not exceed
# $max_weight, remove it from the queue and return it; returns undef when
# nothing fits.
# The scan stops at the first task that is too heavy, so the queue is
# expected to be sorted by ascending weight (see sort_runqueue).
sub find_largest_item {
  my ($s, $max_weight) = @_;
  my $best;

  for my $candidate (@{ $s->runqueue }) {
    my $candidate_weight = $candidate->weight;

    last if $candidate_weight > $max_weight;

    # Among tasks of equal weight the first one encountered is kept.
    my $already_as_heavy = $best && $best->weight >= $candidate_weight;
    if (!$already_as_heavy && $candidate_weight <= $max_weight) {
      $best = $candidate;
    }
  }

  if (defined $best) {
    my @remaining = grep { $_->id != $best->id } @{ $s->runqueue };
    $s->runqueue( \@remaining );
  }

  return $best;
}
Kevin Tew's avatar
Kevin Tew committed
397

398 399 400 401
# A new worker is started whenever nextJob can hand out a task id.
sub spawnWorker {
  my ($s) = @_;
  return $s->nextJob;
}
# Pick the heaviest queued task that fits into the free node budget
# (maxnodes - currnodes), reserve its weight and return its id.
# Returns nothing when no queued task fits.
sub nextJob {
  my ($s) = @_;
  my $max_size = $s->maxnodes - $s->currnodes;
  my $task = $s->find_largest_item($max_size);

  return unless $task;

  say(sprintf("found %s size %s max_size $max_size currnodes %s maxnodes %s newcurrnodes %s", $task->id, $task->weight, $s->currnodes, $s->maxnodes, $s->currnodes + $task->weight)) if $FFDEBUG;
  $s->{'currnodes'} += $task->weight;
  return $task->id;
}
Kevin Tew's avatar
Kevin Tew committed
411
# Worker-side hook: apply 'proc' to the item of the task with id $taskid.
sub doItem {
  my ($s, $taskid) = @_;
  my $item = $s->tasks->[$taskid]->item;
  return $s->proc->($item);
}
412 413 414 415 416 417

use TestBed::ParallelRunner::ErrorConstants;

# Record $result and release the nodes reserved for the task it belongs to.
sub return_and_report {
  my ($s, $result) = @_;
  $s->recordResult($result);
  my $task = $s->task($result->itemid);
  return $s->return_node_resources($task);
}

# Parent-side handling of a worker result.
# If the task's item provides its own handleResult method, that method gets
# the scheduler and the result, and the result is recorded only when it
# answers RETURN_AND_REPORT. Items without such a method are always
# recorded and have their nodes released.
sub handleResult {
  my ($s, $result) = @_;
  my $executor = $s->tasks->[$result->itemid]->item;

  unless ($executor->can('handleResult')) {
    return $s->return_and_report($result);
  }

  my $rc = $executor->handleResult($s, $result);
  $s->return_and_report($result) if $rc == RETURN_AND_REPORT;
}
Kevin Tew's avatar
Kevin Tew committed
432

Kevin Tew's avatar
Kevin Tew committed
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
# Park the task behind $result until $runtime: set its 'runtime', release
# its nodes and append it to the wait list (picked up again by schedule).
sub schedule_at {
  my ($s, $result, $runtime) = @_;
  my $task = $s->task($result->itemid);

  $task->runtime($runtime);
  $s->return_node_resources($task);

  return push @{ $s->waitTasks }, $task;
}

# Move every waiting task that reports ready onto the run queue, re-sort
# the queue, and keep the rest on the wait list.
# Returns a true value while tasks are still waiting or queued (workloop
# keeps looping on it), a false value otherwise.
sub schedule {
  my ($s) = @_;
  my $still_waiting = [];

  # ready is time-dependent, so each task is asked exactly once.
  for my $task (@{ $s->waitTasks }) {
    if ($task->ready) { push @{ $s->runqueue }, $task; }
    else { push @$still_waiting, $task; }
  }

  $s->sort_runqueue;
  $s->waitTasks($still_waiting);
  return (scalar @$still_waiting) || scalar (@{ $s->runqueue });
}

456 457 458 459
# Ask for the task behind $result to be run again.
# During the first pass the task is put on the retry list, its nodes are
# released and 1 is returned. During the retry pass ('inRetry' set) the
# result is recorded as final instead.
sub retry {
  my ($s, $result) = @_;

  if ($s->inRetry) {
    # Already retried once: report the result as it is.
    return $s->return_and_report($result);
  }

  my $task = $s->task($result->itemid);
  push @{ $s->retryTasks }, $task;
  $s->return_node_resources($task);
  return 1;
}
470

471
1;