Commit aead4580 authored by Leigh Stoller's avatar Leigh Stoller

Rework the event handling for the program agent so that both the

reload and halt events send proper completion events. This is required
for stoprun and startrun to work correctly. On stoprun the logs are
not collected until the programs have stopped, and on startrun we do
not want to proceed until all the agents have reloaded their
environments.
parent 2af56958
......@@ -1242,6 +1242,50 @@ sub ClearPortRegistration($)
return 0;
}
#
# Set up phony program agent event agents and groups. This is so we
# can talk to the program agent itself, not to the programs the agent
# is responsible for.
#
sub SetupProgramAgents($)
{
    my ($self) = @_;

    # Only callable on an instance; anything that is not a reference
    # is rejected up front.
    if (! ref($self)) {
        return -1;
    }

    my $pid = $self->pid();
    my $eid = $self->eid();

    # Collect the distinct vnode values recorded in virt_programs for
    # this pid/eid.
    my $vnode_sql = "select distinct vnode from virt_programs ".
                    "where pid='$pid' and eid='$eid'";
    my $vnode_rows = DBQueryWarn($vnode_sql);

    # Query failure is an error; an empty result just means there is
    # nothing to set up.
    if (!defined($vnode_rows)) {
        return -1;
    }
    if (! $vnode_rows->numrows) {
        return 0;
    }

    while (my ($vnode) = $vnode_rows->fetchrow_array()) {
        # Name under which the program agent on this vnode is
        # registered, both as an agent and as a group member.
        my $agent_name = "__${vnode}_program-agent";

        # Register the agent itself; objecttype is taken from the
        # event_objecttypes row whose type is 'PROGRAM'.
        my $agent_sql = "replace into virt_agents ".
                        " (pid, eid, vname, vnode, objecttype) ".
                        " select '$pid', '$eid', '$agent_name', ".
                        " '$vnode', ".
                        " idx from event_objecttypes where ".
                        " event_objecttypes.type='PROGRAM'";
        if (! DBQueryWarn($agent_sql)) {
            return -1;
        }

        # Add the agent to the '__all_program-agents' event group.
        my $group_sql = "replace into event_groups ".
                        " (pid, eid, idx, group_name, agent_name) ".
                        " values ('$pid', '$eid', NULL, ".
                        " '__all_program-agents', ".
                        " '$agent_name')";
        if (! DBQueryWarn($group_sql)) {
            return -1;
        }
    }

    return 0;
}
#
# Write the virt program data for the program agent that will run on ops.
# Ops does not speak to tmcd for experiments, so need to get this info
......
......@@ -97,6 +97,11 @@ static int numagents;
*/
static char *user;
/**
* Our vnode name, for subscribing to events for the program agent itself.
*/
static char *vnode;
/**
* Pipe used by the SIGCHLD handler to notify the main event loop that one or
* more child processes are ready to be reaped.
......@@ -118,6 +123,11 @@ static char *envfile;
*/
static char *pideid;
/**
* The name of the token file, which holds a completion token.
*/
static char *tokenfile;
/**
* Elvin error object.
*/
......@@ -129,10 +139,12 @@ static elvin_error_t elvin_error;
enum {
PIB_TIMEOUT_FIRED, /*< Indicates that the process was terminated
via a timeout and not a regular signal. */
PIB_HALT_COMPLETION,
};
enum {
PIF_TIMEOUT_FIRED = (1L << PIB_TIMEOUT_FIRED),
PIF_TIMEOUT_FIRED = (1L << PIB_TIMEOUT_FIRED),
PIF_HALT_COMPLETION = (1L << PIB_HALT_COMPLETION),
};
/**
......@@ -156,6 +168,7 @@ struct proginfo {
int pid;
struct timeval started;
unsigned long token;
unsigned long halt_token;
unsigned long flags;
struct proginfo *next;
};
......@@ -192,13 +205,13 @@ static void start_callback(event_handle_t handle,
/**
* Handler for the RELOAD event, which tells the program agent to reload
* its environment, and includes the new environment strings.
* its environment.
*
* @param handle The connection to the event system.
* @param notification The start event.
* @param data NULL
*/
static void startrun_callback(event_handle_t handle,
static void reload_callback(event_handle_t handle,
event_notification_t notification,
void *data);
......@@ -420,7 +433,7 @@ main(int argc, char **argv)
progname = argv[0];
bzero(agentlist, sizeof(agentlist));
while ((c = getopt(argc, argv, "hVdrs:p:l:u:i:e:c:k:f:o:")) != -1) {
while ((c = getopt(argc, argv, "hVdrs:p:l:u:i:e:c:k:f:o:v:t:")) != -1){
switch (c) {
case 'h':
usage(progname);
......@@ -455,7 +468,6 @@ main(int argc, char **argv)
break;
case 'o':
LOGDIR = optarg;
isops = 1;
break;
case 'i':
pidfile = optarg;
......@@ -490,6 +502,14 @@ main(int argc, char **argv)
case 'k':
keyfile = optarg;
break;
case 't':
tokenfile = optarg;
break;
case 'v':
vnode = optarg;
if (strcmp(vnode, "ops") == 0)
isops = 1;
break;
default:
usage(progname);
}
......@@ -771,6 +791,7 @@ main(int argc, char **argv)
TBDB_EVENTTYPE_RUN ","
TBDB_EVENTTYPE_START ","
TBDB_EVENTTYPE_STOP ","
TBDB_EVENTTYPE_HALT ","
TBDB_EVENTTYPE_KILL;
/*
......@@ -791,15 +812,41 @@ main(int argc, char **argv)
fatal("could not subscribe to event");
}
snprintf(buf, sizeof(buf), "__%s_program-agent", vnode);
tuple->objtype = TBDB_OBJECTTYPE_PROGRAM;
tuple->objname = ADDRESSTUPLE_ALL;
tuple->eventtype = TBDB_EVENTTYPE_RELOAD "," TBDB_EVENTTYPE_STOP;
tuple->objname = buf;
tuple->eventtype = TBDB_EVENTTYPE_RELOAD;
if (tokenfile && access(tokenfile, R_OK) == 0) {
FILE *fp;
unsigned long token = ~0;
if ((fp = fopen(tokenfile, "r")) != NULL) {
if (fscanf(fp, "%lu", &token) == 1) {
event_do(handle,
EA_Experiment, pideid,
EA_Type, TBDB_OBJECTTYPE_PROGRAM,
EA_Name, buf,
EA_Event, TBDB_EVENTTYPE_COMPLETE,
EA_ArgInteger, "ERROR", 0,
EA_ArgInteger, "CTOKEN", token,
EA_TAG_DONE);
}
else {
error("tokenfile could not be parsed!\n");
}
}
else {
errorc("Could not open token file for reading!");
}
}
/*
* Subscribe to the RELOAD/STOP start event we specified above.
* Subscribe to the RELOAD start event we specified above.
*/
if (! event_subscribe(handle, startrun_callback, tuple, NULL)) {
fatal("could not subscribe to event");
if (! event_subscribe(handle, reload_callback, tuple, NULL)) {
fatal("could not subscribe to reload event");
}
/*
......@@ -933,6 +980,27 @@ callback(event_handle_t handle, event_notification_t notification, void *data)
else if (strcmp(event, TBDB_EVENTTYPE_STOP) == 0) {
stop_program(pinfo, args);
}
else if (strcmp(event, TBDB_EVENTTYPE_HALT) == 0) {
/*
* HALT is special; it sends an event to the caller when
* the program actually exits.
*/
if (! pinfo->pid) {
event_do(handle,
EA_Experiment, pideid,
EA_Type, TBDB_OBJECTTYPE_PROGRAM,
EA_Name, pinfo->name,
EA_Event, TBDB_EVENTTYPE_COMPLETE,
EA_ArgInteger, "ERROR", 0,
EA_ArgInteger, "CTOKEN", token,
EA_TAG_DONE);
return;
}
pinfo->halt_token = token;
pinfo->flags |= PIF_HALT_COMPLETION;
stop_program(pinfo, args);
}
else if (strcmp(event, TBDB_EVENTTYPE_KILL) == 0) {
signal_program(pinfo, args);
}
......@@ -1066,13 +1134,13 @@ start_callback(event_handle_t handle,
}
static void
startrun_callback(event_handle_t handle,
reload_callback(event_handle_t handle,
event_notification_t notification,
void *data)
{
struct proginfo *pinfo;
char event[TBDB_FLEN_EVEVENTTYPE];
char objname[TBDB_FLEN_EVOBJTYPE];
unsigned long token = ~0;
assert(handle != NULL);
assert(notification != NULL);
......@@ -1088,28 +1156,8 @@ startrun_callback(event_handle_t handle,
error("Could not get objname from notification!\n");
return;
}
/* XXX Ignore events that are not to ALL. */
if (strcmp(objname, ADDRESSTUPLE_ALL))
return;
/*
* XXX Both of these need to send completion events
*/
if (strcmp(event, TBDB_EVENTTYPE_STOP) == 0) {
info("startrun_callback: Got a stop event.\n");
/*
* Stop all running programs so that their log files
* are complete.
*/
for (pinfo = proginfos; pinfo != NULL; pinfo = pinfo->next) {
if (pinfo->pid != 0) {
stop_program(pinfo, NULL);
}
}
return;
}
event_notification_get_int32(handle, notification,
"TOKEN", (int32_t *)&token);
if (strcmp(event, TBDB_EVENTTYPE_RELOAD) == 0) {
info("startrun_callback: Got a reload event.\n");
......@@ -1120,12 +1168,34 @@ startrun_callback(event_handle_t handle,
*/
if (isops) {
parse_configfile_env(envfile);
event_do(handle,
EA_Experiment, pideid,
EA_Type, TBDB_OBJECTTYPE_PROGRAM,
EA_Name, objname,
EA_Event, TBDB_EVENTTYPE_COMPLETE,
EA_ArgInteger, "ERROR", 0,
EA_ArgInteger, "CTOKEN", token,
EA_TAG_DONE);
return;
}
/*
* Wrapper will restart us.
* Wrapper will restart us but first write the token to a
* file so that we can send a completion upon restart.
*/
if (tokenfile) {
FILE *fp;
if ((fp = fopen(tokenfile, "w")) == NULL) {
errorc("Could not open token file");
exit(-1);
}
fprintf(fp, "%lu\n", token);
fflush(fp);
fclose(fp);
}
exit(45);
}
}
......@@ -1915,11 +1985,25 @@ child_callback(elvin_io_handler_t handler,
pi->token = ~0;
pi->flags &= ~(PIF_TIMEOUT_FIRED);
if (pi->flags & PIF_HALT_COMPLETION) {
event_do(handle,
EA_Experiment, pideid,
EA_Type, TBDB_OBJECTTYPE_PROGRAM,
EA_Name, pi->name,
EA_Event, TBDB_EVENTTYPE_COMPLETE,
EA_ArgInteger, "ERROR", exit_code,
EA_ArgInteger, "CTOKEN",
pi->halt_token,
EA_TAG_DONE);
pi->flags &= ~(PIF_HALT_COMPLETION);
}
if (pi->timeout_handle != NULL) {
elvin_sync_remove_timeout(pi->timeout_handle,
eerror);
pi->timeout_handle = NULL;
}
}
}
......
......@@ -553,6 +553,7 @@ int sends_complete(struct agent *agent, const char *evtype)
static char *run_completes[] = {
TBDB_EVENTTYPE_RUN,
TBDB_EVENTTYPE_HALT,
TBDB_EVENTTYPE_RELOAD,
NULL
};
......
......@@ -504,6 +504,9 @@ static void *simulator_agent_looper(void *arg)
(int32_t *)&token);
argsbuf[sizeof(argsbuf) - 1] = '\0';
/* Strictly for the event viewer */
event_notify(handle, se.notification);
if (strcmp(evtype, TBDB_EVENTTYPE_SWAPOUT) == 0) {
EmulabResponse er;
......
......@@ -267,6 +267,7 @@ push(@sched_command_options, ("-s", "localhost", "-k", $keyfile, $pid, $eid));
my @agent_command_options = ("-u", $user, "-d", "-e", "$pid/$eid",
"-k", $keyfile,
"-v", "ops",
"-c", "$EXPDIR/tbdata/program_agents",
"-f", "$EXPDIR/tbdata/environment",
"-o", "$EXPDIR/logs/ops");
......
# -*- tcl -*-
#
# EMULAB-COPYRIGHT
# Copyright (c) 2004 University of Utah and the Flux Group.
# Copyright (c) 2004-2006 University of Utah and the Flux Group.
# All rights reserved.
#
......@@ -38,6 +38,15 @@ EventGroup instproc rename {old new} {
$sim rename_eventgroup $old $new
}
EventGroup instproc rename-agent {old new} {
    $self instvar members

    # Nothing to do unless the old name is currently a member.
    if { ![info exists members($old)] } {
        return
    }

    # Drop the old key and register the new name with an empty value.
    unset members($old)
    set members($new) {}
}
#
# Add members to the event group.
#
......@@ -81,7 +90,6 @@ EventGroup instproc updatedb {DB} {
$self instvar sim
if {[array size members] == 0} {
perror "\[updatedb] $self has no member list."
return
}
......
......@@ -16,10 +16,19 @@ Class Program -superclass NSObject
namespace eval GLOBALS {
set new_classes(Program) {}
variable all_programs {}
}
Program instproc init {s} {
global ::GLOBALS::last_class
global ::GLOBALS::all_programs
if {$all_programs == {}} {
# Create a default event group to hold all program agents.
set foo [uplevel \#0 "set __all_programs [new EventGroup $s]"]
set all_programs $foo
}
$all_programs add $self
$self set sim $s
$self set node {}
......@@ -35,9 +44,11 @@ Program instproc init {s} {
}
# Propagate a rename of this program object from $old to $new.
#   old - the name being replaced
#   new - the replacement name
Program instproc rename {old new} {
# Bring the GLOBALS::all_programs variable into scope.
global ::GLOBALS::all_programs
$self instvar sim
# Tell the simulator object about the new program name.
$sim rename_program $old $new
# Keep the object held in all_programs in step with the rename.
# NOTE(review): presumably the EventGroup created in Program init - confirm.
$all_programs rename-agent $old $new
}
# updatedb DB
......
......@@ -27,6 +27,8 @@ Class EventGroup -superclass NSObject
Class Firewall -superclass NSObject
Simulator instproc init {args} {
var_import ::GLOBALS::program_group
# A counter for internal ids
$self set id_counter 0
......@@ -1689,8 +1691,7 @@ Simulator instproc make_event {outer event} {
}
"reset-lans" {
set otype LINK
set vname "all_lans"
set vnode "*"
set vname "__all_lans"
set etype RESET
}
unknown {
......
......@@ -478,6 +478,10 @@ print "Writing environment strings ...\n";
$experiment->WriteEnvVariables() == 0
or fatal("Could not write environment strings for program agents");
print "Setting up additional program agent support ...\n";
$experiment->SetupProgramAgents() == 0
or fatal("Could not setup program agent support");
print "Writing program agent info ...\n";
$experiment->WriteProgramAgents() == 0
or fatal("Could not write program agent info");
......
......@@ -350,13 +350,11 @@ if ($waitmode) {
# Might not be a current run, which is okay.
#
if (defined($instance->runidx())) {
SignalProgAgents("STOP");
# Ug. I need to figure out how to hook into the event sequence
# mechanism so I can use a completion event.
print "Asking program agents to stop ... this will take a moment.\n";
sleep(5);
SignalProgAgents("STOP");
# This sets the stop time.
$instance->StopCurrentRun() == 0
or fatal(-1, "Could not stop experiment run for $instance!");
......@@ -444,15 +442,14 @@ if ($paramfile) {
DBQueryFatal("update virt_user_environment set value=$value ".
"where pid='$pid' and eid='$eid' and name='$name'");
}
print "Writing environment strings ...\n";
$instance->WriteEnvVariables() == 0
or fatal(-1, "Could not write environment strings for program agents");
print "Asking program agents to reload ... this will take a moment.\n";
SignalProgAgents("RELOAD");
# XXX Need to use a completion event!
sleep(5);
}
print "Writing environment strings ...\n";
$instance->WriteEnvVariables() == 0
or fatal(-1, "Could not write environment strings for program agents");
#
# Restart the event stream from the beginning.
#
......@@ -679,58 +676,22 @@ sub sighandler($) {
}
#
# Send the new environement strings to the program agents as an event.
# This bypasses the scheduler and goes directly. Not sure if this is the
# best approach, but I like it better then restarting the agents and having
# them contact tmcd.
# Use tevc to send an event and wait for completion.
#
sub SignalProgAgents($)
{
my ($action) = @_;
if (!defined($handle)) {
my $URL = "elvin://" . TB_EVENTSERVER();
my $keyfile = TBDB_EVENTKEY($pid, $eid);
$handle = event_register_withkeyfile($URL, 0, $keyfile);
fatal(-1, "Could not connect to event system!")
if (!$handle);
}
my $tuple = address_tuple_alloc();
fatal(-1, "Could not allocate an address tuple\n")
if (!$tuple);
%$tuple = (host => $event::ADDRESSTUPLE_ALL,
objtype => "PROGRAM",
objname => $event::ADDRESSTUPLE_ALL,
eventtype => $action,
expt => "$pid/$eid");
my $notification = event_notification_alloc($handle, $tuple);
fatal(-1, "Could not allocate a notification\n")
if (!$notification);
if ($action eq "RELOAD") {
# Add in the new environ strings.
my $binding_string = "";
foreach my $name (keys(%parameters)) {
my $value = $parameters{$name};
my $agent;
$binding_string .= "$name=$value\n";
}
if (! event_notification_put_string($handle, $notification,
"environment", $binding_string)) {
fatal(-1, "Could not add environment strings to notification\n")
}
if ($action eq "STOP") {
$agent = "__all_programs";
}
if (!event_notify($handle, $notification)) {
fatal(-1, "could not send environment event notification!");
else {
$agent = "__all_program-agents";
}
system("$tevc -w -t 60 -e $pid/$eid now $agent $action") == 0
or fatal(-1, "Could not send event notification!");
}
sub SendCompletionEvent()
......
......@@ -30,6 +30,7 @@ my $WRAPLOG = "$LOGDIR/progwrap.debug";
my $PIDFILE = "/var/run/progagent.pid";
my $CONFIG = "$BOOTDIR/progagents";
my $PAGENT = "$BINDIR/program-agent";
my $TOKEN = "/var/tmp/progagent-token";
#
# Load the OS independent support library. It will load the OS dependent
......@@ -133,6 +134,8 @@ sub doboot()
# Fully disconnect from bootup.
setsid();
$TOKEN .= ".$$";
while (1) {
if (tmcc(TMCCCMD_PROGRAMS, undef, \@agents) < 0) {
fatal("Could not get progagent config from server!");
......@@ -207,10 +210,11 @@ sub doboot()
}
system("$PAGENT -e $pid/$eid -s localhost -l $LOGFILE ".
"-d -i $PIDFILE -k " . TMEVENTKEY() . " -c $CONFIG ".
"-r -p $elvind_port");
"-v $vname -r -p $elvind_port -t $TOKEN");
}
else {
system("$PAGENT -e $pid/$eid -s localhost -l $LOGFILE ".
"-v $vname -t $TOKEN ".
"-d -i $PIDFILE -k " . TMEVENTKEY() . " -c $CONFIG");
exit(0)
if (! $?);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment