Commit bd627836 authored by Timothy Stack's avatar Timothy Stack

Some event system changes for linktest and any future things we want to
run with the event system in the swapin path:

	* event/linktest/linktest_control.in: Let linktest be run while
	the experiment is activating.

	* event/sched/event-sched.c, event/sched/rpc.h,
	event/sched/rpc.cc: Don't wait for the experiment to become active
	before loading the eventlist so any system defined agents are
	available to use.  Don't start time unless the experiment is
	already active, let boss do it otherwise.  Send out COMPLETE
	events so 'tevc' can listen for them.

	* event/sched/timeline-agent.c: Send a complete event even if the
	timeline is empty.

	* event/tbgen/tevc.c: Add a '-w' option so that tevc can wait for
	an event that sends back a COMPLETE.

	* tbsetup/tbswap.in: Explicitly send an event to start event time.
parent aeed9520
......@@ -139,9 +139,9 @@ if (! ($expstate = ExpState($pid, $eid))) {
die("*** $0:\n".
" No such experiment $pid/$eid!\n");
}
if ($expstate ne EXPTSTATE_ACTIVE) {
if ($expstate ne EXPTSTATE_ACTIVE && $expstate ne EXPTSTATE_ACTIVATING) {
die("*** $0:\n".
" Experiment $pid/$eid must active!\n");
" Experiment $pid/$eid must be active!\n");
}
# Need this to pass to boss.
......
......@@ -63,6 +63,8 @@ static int get_static_events(event_handle_t handle);
int debug;
const char *XMLRPC_ROOT = TBROOT;
expt_state_t expt_state;
struct lnList agents;
static struct lnList timelines;
......@@ -383,9 +385,11 @@ main(int argc, char *argv[])
fatal("could not subscribe to EVENT_SCHEDULE event");
}
if (RPC_waitforactive(pid, eid))
fatal("waitforactive: RPC failed!");
expt_state = RPC_expt_state(pid, eid);
if (expt_state == ES_UNKNOWN)
fatal("Experiment is not activating or active\n");
if (RPC_agentlist(handle, pid, eid))
fatal("Could not get agentlist from RPC server\n");
......@@ -795,6 +799,8 @@ handle_event(event_handle_t handle, sched_event_t *se)
primary_simulator_agent->sa_flags |= SAF_TIME_STARTED;
RPC_notifystart(pid, eid, "", 1);
RPC_drop();
did_start = 1;
}
}
}
......@@ -1240,6 +1246,11 @@ get_static_events(event_handle_t handle)
event.time.tv_usec = 1;
event.length = 1;
event.flags = SEF_SINGLE_HANDLER;
event_notification_put_int32(handle,
event.notification,
"TOKEN",
next_token);
next_token += 1;
sched_event_prepare(handle, &event);
timeline_agent_append(ns_sequence, &event);
......@@ -1265,6 +1276,11 @@ get_static_events(event_handle_t handle)
event.agent.s = NULL;
event.length = 1;
event.flags = SEF_TIME_START;
event_notification_put_int32(handle,
event.notification,
"TOKEN",
next_token);
next_token += 1;
timeline_agent_append(ns_sequence, &event);
......@@ -1280,6 +1296,11 @@ get_static_events(event_handle_t handle)
event.time.tv_usec = 3;
event.length = 1;
event.flags = SEF_SENDS_COMPLETE | SEF_SINGLE_HANDLER;
event_notification_put_int32(handle,
event.notification,
"TOKEN",
next_token);
next_token += 1;
sched_event_prepare(handle, &event);
timeline_agent_append(ns_sequence, &event);
......@@ -1311,12 +1332,14 @@ get_static_events(event_handle_t handle)
return -1;
}
event_do(handle,
EA_Experiment, pideid,
EA_Type, TBDB_OBJECTTYPE_SEQUENCE,
EA_Event, TBDB_EVENTTYPE_START,
EA_Name, ns_sequence_agent.name,
EA_TAG_DONE);
if (expt_state == ES_ACTIVE) {
event_do(handle,
EA_Experiment, pideid,
EA_Type, TBDB_OBJECTTYPE_SEQUENCE,
EA_Event, TBDB_EVENTTYPE_START,
EA_Name, ns_sequence_agent.name,
EA_TAG_DONE);
}
return 0;
}
......@@ -1366,6 +1389,9 @@ handle_completeevent(event_handle_t handle, sched_event_t *eventp)
char objtype[TBDB_FLEN_EVOBJTYPE];
int rc, ctoken = ~0, agerror = 0, handled = 0;
event_notification_insert_hmac(handle, eventp->notification);
event_notify(handle, eventp->notification);
event_notification_get_objname(handle, eventp->notification,
objname, sizeof(objname));
......
......@@ -308,9 +308,10 @@ RPC_metadata(char *pid, char *eid)
return retval;
}
int
RPC_waitforactive(char *pid, char *eid)
expt_state_t
RPC_expt_state(char *pid, char *eid)
{
expt_state_t retval = ES_UNKNOWN;
emulab::EmulabResponse er;
assert(pid != NULL);
......@@ -318,7 +319,19 @@ RPC_waitforactive(char *pid, char *eid)
assert(eid != NULL);
assert(strlen(eid) > 0);
return RPC_invoke(pid, eid, "experiment.waitforactive", &er);
if (RPC_invoke(pid, eid, "experiment.state", &er) == 0) {
ulxr::RpcString tmp;
const char *state;
tmp = er.getValue();
state = tmp.getString().c_str();
if (strcmp(state, "activating") == 0)
retval = ES_ACTIVATING;
else if (strcmp(state, "active") == 0)
retval = ES_ACTIVE;
}
return retval;
}
int RPC_notifystart(char *pid, char *eid, char *timeline, int set_or_clear)
......@@ -431,6 +444,19 @@ RPC_obstaclelist(FILE *emcd_config, char *area)
return 0;
}
int
RPC_waitforactive(char *pid, char *eid)
{
emulab::EmulabResponse er;
assert(pid != NULL);
assert(strlen(pid) > 0);
assert(eid != NULL);
assert(strlen(eid) > 0);
return RPC_invoke(pid, eid, "experiment.waitforactive", &er);
}
int
RPC_waitforrobots(event_handle_t handle, char *pid, char *eid)
{
......
......@@ -54,12 +54,19 @@ extern "C" {
extern const char *topography_name; // XXX temporary
typedef enum {
ES_UNKNOWN,
ES_ACTIVATING,
ES_ACTIVE,
} expt_state_t;
int RPC_init(const char *certpath, const char *host, unsigned short port);
int RPC_grab(void);
void RPC_drop(void);
int RPC_metadata(char *pid, char *eid);
int RPC_waitforrobots(event_handle_t handle, char *pid, char *eid);
expt_state_t RPC_expt_state(char *pid, char *eid);
int RPC_waitforactive(char *pid, char *eid);
int RPC_notifystart(char *pid, char *eid, char *timeline, int set_or_clear);
int RPC_agentlist(event_handle_t handle, char *pid, char *eid);
......
......@@ -332,6 +332,7 @@ static int timeline_agent_immediate(local_agent_t la, sched_event_t *se)
"TOKEN",
(int32_t *)&token);
gettimeofday(&now, NULL);
then = now;
for (lpc = 0; lpc < ta->ta_count; lpc++) {
timeradd(&now,
&ta->ta_events[lpc].time,
......@@ -340,17 +341,16 @@ static int timeline_agent_immediate(local_agent_t la, sched_event_t *se)
&ta->ta_events[lpc],
&then);
}
if (ta->ta_count > 0) {
event_do(la->la_handle,
EA_Experiment, pideid,
EA_When, &then,
EA_Type, TBDB_OBJECTTYPE_TIMELINE,
EA_Name, la->la_link.ln_Name,
EA_Event, TBDB_EVENTTYPE_COMPLETE,
EA_ArgInteger, "ERROR", 0,
EA_ArgInteger, "CTOKEN", token,
EA_TAG_DONE);
}
event_do(la->la_handle,
EA_Experiment, pideid,
EA_When, &then,
EA_Type, TBDB_OBJECTTYPE_TIMELINE,
EA_Name, la->la_link.ln_Name,
EA_Event, TBDB_EVENTTYPE_COMPLETE,
EA_ArgInteger, "ERROR", 0,
EA_ArgInteger, "CTOKEN", token,
EA_TAG_DONE);
retval = 0;
}
......
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2004 University of Utah and the Flux Group.
* Copyright (c) 2000-2005 University of Utah and the Flux Group.
* All rights reserved.
*/
......@@ -33,14 +33,22 @@
static int debug;
static int exit_value = 0;
static void comp_callback(event_handle_t handle,
event_notification_t notification,
void *data);
void
usage(char *progname)
{
fprintf(stderr,
"Usage: %s [-s server] [-c] event\n"
" %s [-s server] [-k keyfile] -e pid/eid time objname "
"Usage: %s [-w] [-s server] [-c] event\n"
" %s [-w] [-s server] [-k keyfile] -e pid/eid time objname "
"event [args ...]\n"
" time: 'now' or '+seconds' or [[[[yy]mm]dd]HH]MMss\n"
"Options:\n"
" -w Wait for the event to complete.\n"
"Examples:\n"
" %s -e pid/eid now cbr0 set interval_=0.2\n"
" %s -e pid/eid +10 cbr0 start\n"
......@@ -59,6 +67,7 @@ main(int argc, char **argv)
char *server = NULL;
char *port = NULL;
int control = 0;
int wait_for_complete = 0;
char *myeid = NULL;
char *keyfile = NULL;
char keyfilebuf[BUFSIZ];
......@@ -68,11 +77,14 @@ main(int argc, char **argv)
progname = argv[0];
while ((c = getopt(argc, argv, "ds:p:ce:k:")) != -1) {
while ((c = getopt(argc, argv, "dws:p:ce:k:")) != -1) {
switch (c) {
case 'd':
debug++;
break;
case 'w':
wait_for_complete = 1;
break;
case 's':
server = optarg;
break;
......@@ -202,6 +214,16 @@ main(int argc, char **argv)
fatal("could not register with event system");
}
if (wait_for_complete) {
tuple->expt = myeid;
tuple->eventtype = TBDB_EVENTTYPE_COMPLETE;
tuple->objname = objname;
if (! event_subscribe(handle, comp_callback, tuple, NULL)) {
fatal("could not subscribe to event");
}
}
if (control) {
/*
* Send a control event.
......@@ -343,10 +365,35 @@ main(int argc, char **argv)
event_notification_free(handle, notification);
if (wait_for_complete) {
event_main(handle);
}
/* Unregister with the event system: */
if (event_unregister(handle) == 0) {
fatal("could not unregister with event system");
}
return 0;
return exit_value;
}
void comp_callback(event_handle_t handle,
event_notification_t notification,
void *data)
{
char *value, argsbuf[BUFSIZ] = "";
event_notification_get_arguments(handle, notification,
argsbuf, sizeof(argsbuf));
if (event_arg_get(argsbuf, "ERROR", &value) > 0) {
if (sscanf(value, "%d", &exit_value) != 1) {
error("bad error value for complete: %s\n", argsbuf);
}
}
else {
warning("completion event is missing ERROR argument\n");
}
event_stop_main(handle);
}
......@@ -1164,6 +1164,17 @@ sub doSwapin($) {
TBDebugTimeStamp("ElabInElab setup finished");
}
if (! ($DISABLE_EVENTS || $elabinelab)) {
if ( $update_Eventsys_restart ||
($type != UPDATE && $type != UPDATE_RECOVER) ) {
TBDebugTimeStamp("Starting event time");
if (system("tevc -e $pid/$eid now __ns_sequence start")) {
print STDERR "*** Failed to start event time.\n";
return 1;
}
}
}
return 0;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment