event-sched.c 15.3 KB
Newer Older
Leigh B. Stoller's avatar
Leigh B. Stoller committed
1
2
/*
 * EMULAB-COPYRIGHT
 * Copyright (c) 2000-2003 University of Utah and the Flux Group.
 * All rights reserved.
 */

Ian Murdock's avatar
Ian Murdock committed
7
/*
 * event-sched.c --
 *
 *      Testbed event scheduler.
 *
 *      The event scheduler is an event system client; it operates by
 *      subscribing to the EVENT_SCHEDULE event, enqueuing the event
 *      notifications it receives, and resending the notifications at
 *      the indicated times.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>
#include <math.h>
#include <ctype.h>
#include "event-sched.h"
#include "log.h"
#include "tbdb.h"
#include "config.h"
Ian Murdock's avatar
Ian Murdock committed
31

32
33
static void enqueue(event_handle_t handle,
		    event_notification_t notification, void *data);
34
static void dequeue(event_handle_t handle);
35
static int  handle_simevent(event_handle_t handle, sched_event_t *eventp);
Ian Murdock's avatar
Ian Murdock committed
36

37
38
39
static char	*progname;
static char	*pid, *eid;
static int	get_static_events(event_handle_t handle);
Leigh B. Stoller's avatar
Leigh B. Stoller committed
40
41
static int	debug;
static void	cleanup(void);
42
static void	quit(int);
43

44
struct agent {
45
46
47
48
49
	char    nodeid[TBDB_FLEN_NODEID];
	char    vnode[TBDB_FLEN_VNAME];
	char	objname[TBDB_FLEN_EVOBJNAME];
	char	objtype[TBDB_FLEN_EVOBJTYPE];
	char	ipaddr[32];
50
51
52
};
static struct agent	*agents;
static int		numagents;
53

54
55
56
void
usage()
{
57
58
59
60
61
62
63
64
	fprintf(stderr,
		"Usage: %s <options> -k keyfile <pid> <eid>\n"
		"options:\n"
		"-d         - Turn on debugging\n"
		"-s server  - Specify location of elvind server\n"
		"-p port    - Specify port number of elvind server\n"
		"-l logfile - Specify logfile to direct output\n"
		"-k keyfile - Specify keyfile name\n",
65
66
67
68
		progname);
	exit(-1);
}

Ian Murdock's avatar
Ian Murdock committed
69
70
71
int
main(int argc, char **argv)
{
Leigh B. Stoller's avatar
Leigh B. Stoller committed
72
73
74
75
	address_tuple_t tuple;
	event_handle_t handle;
	char *server = NULL;
	char *port = NULL;
76
	char *log = NULL;
77
	char *keyfile = NULL;
78
	char pideid[BUFSIZ], buf[BUFSIZ];
Leigh B. Stoller's avatar
Leigh B. Stoller committed
79
80
	int c, count;

81
82
	progname = argv[0];

Leigh B. Stoller's avatar
Leigh B. Stoller committed
83
84
85
	/* Initialize event queue semaphores: */
	sched_event_init();

86
	while ((c = getopt(argc, argv, "s:p:dl:k:")) != -1) {
Leigh B. Stoller's avatar
Leigh B. Stoller committed
87
88
		switch (c) {
		case 'd':
89
			debug++;
Leigh B. Stoller's avatar
Leigh B. Stoller committed
90
91
92
93
94
95
96
			break;
		case 's':
			server = optarg;
			break;
		case 'p':
			port = optarg;
			break;
97
98
99
		case 'l':
			log = optarg;
			break;
100
101
102
		case 'k':
			keyfile = optarg;
			break;
Leigh B. Stoller's avatar
Leigh B. Stoller committed
103
104
105
106
107
108
109
		default:
			fprintf(stderr, "Usage: %s [-s SERVER]\n", argv[0]);
			return 1;
		}
	}
	argc -= optind;
	argv += optind;
Ian Murdock's avatar
Ian Murdock committed
110

111
	if (argc != 2 || !keyfile)
Leigh B. Stoller's avatar
Leigh B. Stoller committed
112
		usage();
113

Leigh B. Stoller's avatar
Leigh B. Stoller committed
114
115
	pid = argv[0];
	eid = argv[1];
116
	sprintf(pideid, "%s/%s", pid, eid);
Ian Murdock's avatar
Ian Murdock committed
117

118
119
120
121
	signal(SIGINT, quit);
	signal(SIGTERM, quit);
	signal(SIGHUP, quit);

122
123
124
125
	if (debug)
		loginit(0, log);
	else
		loginit(1, "event-sched");
126

Leigh B. Stoller's avatar
Leigh B. Stoller committed
127
128
129
130
131
	/*
	 * Set up DB state.
	 */
	if (!dbinit())
		return 1;
132

Leigh B. Stoller's avatar
Leigh B. Stoller committed
133
	/*
134
135
136
	 * Set our pid in the DB. This will fail if there is already
	 * a non-zero value in the DB (although its okay to set it to
	 * zero no matter what). 
Leigh B. Stoller's avatar
Leigh B. Stoller committed
137
138
139
	 */
	if (! mydb_seteventschedulerpid(pid, eid, getpid()))
		fatal("Could not update DB with process id!");
140
	atexit(cleanup);
Ian Murdock's avatar
Ian Murdock committed
141

Leigh B. Stoller's avatar
Leigh B. Stoller committed
142
143
144
145
146
147
148
149
150
151
152
153
	/*
	 * Convert server/port to elvin thing.
	 *
	 * XXX This elvin string stuff should be moved down a layer. 
	 */
	if (server) {
		snprintf(buf, sizeof(buf), "elvin://%s%s%s",
			 server,
			 (port ? ":"  : ""),
			 (port ? port : ""));
		server = buf;
	}
Ian Murdock's avatar
Ian Murdock committed
154

Leigh B. Stoller's avatar
Leigh B. Stoller committed
155
	/* Register with the event system: */
156
	handle = event_register_withkeyfile(server, 1, keyfile);
Leigh B. Stoller's avatar
Leigh B. Stoller committed
157
158
159
160
161
162
163
164
165
166
167
168
169
	if (handle == NULL) {
		fatal("could not register with event system");
	}

	/*
	 * Construct an address tuple for event subscription. We set the 
	 * scheduler flag to indicate we want to capture those notifications.
	 */
	tuple = address_tuple_alloc();
	if (tuple == NULL) {
		fatal("could not allocate an address tuple");
	}
	tuple->scheduler = 1;
170
	tuple->expt      = pideid;
Leigh B. Stoller's avatar
Leigh B. Stoller committed
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188

	if (event_subscribe(handle, enqueue, tuple, NULL) == NULL) {
		fatal("could not subscribe to EVENT_SCHEDULE event");
	}

	/*
	 * Hacky. Need to wait until all nodes in the experiment are
	 * in the ISUP state before we can start the event list rolling.
	 */
	count = 0;
	c     = 1;
	while (c) {
		if (! mydb_checkexptnodeeventstate(pid, eid,
						   TBDB_EVENTTYPE_ISUP, &c)) {
			fatal("Could not get node event state");
		}
		count++;
		if ((count % 10) == 0)
189
190
			info("Waiting for nodes in %s/%s to come up ...\n",
			     pid, eid);
Leigh B. Stoller's avatar
Leigh B. Stoller committed
191

192
193
194
195
		/*
		 * Don't want to pound the DB too much.
		 */
		sleep(3);
Leigh B. Stoller's avatar
Leigh B. Stoller committed
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
	}

	/*
	 * Read the static events list and schedule.
	 */
	if (!get_static_events(handle)) {
		fatal("could not get static event list");
	}

	/* Dequeue events and process them at the appropriate times: */
	dequeue(handle);

	/* Unregister with the event system: */
	if (event_unregister(handle) == 0) {
		fatal("could not unregister with event system");
	}
Ian Murdock's avatar
Ian Murdock committed
212

213
214
	info("Scheduler for %s/%s exiting\n", pid, eid);
	return 0;
Ian Murdock's avatar
Ian Murdock committed
215
216
}

217
/* Enqueue event notifications as they arrive. */
Ian Murdock's avatar
Ian Murdock committed
218
static void
219
enqueue(event_handle_t handle, event_notification_t notification, void *data)
Ian Murdock's avatar
Ian Murdock committed
220
{
221
222
223
    sched_event_t	event;
    char		objname[TBDB_FLEN_EVOBJNAME];
    int			x;
224
225
226

    /* Clone the event notification, since we want the notification to
       live beyond the callback function: */
227
    event.notification = event_notification_clone(handle, notification);
228
    if (!event.notification) {
229
	    error("event_notification_clone failed!\n");
230
	    return;
231
    }
Ian Murdock's avatar
Ian Murdock committed
232

233
234
235
236
    /* Clear the scheduler flag */
    if (! event_notification_remove(handle, event.notification, "SCHEDULER") ||
	! event_notification_put_int32(handle,
				       event.notification, "SCHEDULER", 0)) {
237
238
239
	    error("could not clear scheduler attribute of notification %p\n",
		  event.notification);
	    goto bad;
Ian Murdock's avatar
Ian Murdock committed
240
241
    }

242
    /* Get the event's firing time: */
243
244
245
246
247
248
249
    if (! event_notification_get_int32(handle, event.notification, "time_usec",
				       (int *) &event.time.tv_usec) ||
	! event_notification_get_int32(handle, event.notification, "time_sec",
				       (int *) &event.time.tv_sec)) {
	    error("could not get time from notification %p\n",
		  event.notification);
	    goto bad;
Ian Murdock's avatar
Ian Murdock committed
250
251
    }

252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
    /*
     * Must map the event to the proper agent running on a particular
     * node. 
     */
    if (! event_notification_get_objname(handle, event.notification,
					 objname, sizeof(objname))) {
	    error("could not get object name from notification %p\n",
		  event.notification);
	    goto bad;
    }
    for (x = 0; x < numagents; x++) {
	    if (!strcmp(agents[x].objname, objname))
		    break;
    }
    if (x == numagents) {
	    error("Could not map object to an agent: %s\n", objname);
	    goto bad;
    }
Leigh B. Stoller's avatar
Leigh B. Stoller committed
270
    
271
272
273
274
275
276
    event_notification_clear_host(handle, event.notification);
    event_notification_set_host(handle,
				event.notification, agents[x].ipaddr);
    event_notification_clear_objtype(handle, event.notification);
    event_notification_set_objtype(handle,
				   event.notification, agents[x].objtype);
277
    event_notification_insert_hmac(handle, event.notification);
278

Leigh B. Stoller's avatar
Leigh B. Stoller committed
279
280
    event.simevent = !strcmp(agents[x].objtype, TBDB_OBJECTTYPE_SIMULATOR);

281
282
283
284
285
286
287
288
289
290
    if (debug > 1) {
	    struct timeval now;
	    
	    gettimeofday(&now, NULL);
	    
	    info("Sched: note:%p at:%ld:%d now:%ld:%d agent:%d\n",
                 event.notification,
		 event.time.tv_sec, event.time.tv_usec,
		 now.tv_sec, now.tv_usec,
		 x);
Ian Murdock's avatar
Ian Murdock committed
291
    }
292
293
294
295

    /* Enqueue the event notification for resending at the indicated
       time: */
    sched_event_enqueue(event);
296
297
298
    return;
 bad:
    event_notification_free(handle, event.notification);
299
300
301
302
303
304
305
306
}

/*
 * Dequeue events from the event queue and fire them. Called directly
 * from main(); it returns when sched_event_dequeue() reports a
 * negative value or when an ordinary event cannot be delivered.
 *
 * Simulator events are handled locally through handle_simevent();
 * everything else is sent with event_notify(). Every dequeued
 * notification is freed here, including on the failure return.
 *
 * handle - event system handle used to send and free notifications.
 */
static void
dequeue(event_handle_t handle)
{
	sched_event_t	next_event;
	struct timeval	now = { 0, 0 };

	/* NOTE(review): the second argument presumably asks
	   sched_event_dequeue() to wait for the next event; confirm. */
	while (sched_event_dequeue(&next_event, 1) >= 0) {
		int	fired = 1;

		/* Fire event. */
		if (debug > 1)
			gettimeofday(&now, NULL);

		/*
		 * Sim events are special right now since we handle them here.
		 */
		if (next_event.simevent) {
			fired = handle_simevent(handle, &next_event);
		}
		else if (event_notify(handle, next_event.notification) == 0) {
			error("could not fire event\n");
			/* Do not leak the notification on the way out. */
			event_notification_free(handle,
						next_event.notification);
			return;
		}

		if (fired && debug > 1) {
			info("Fire:  note:%p at:%ld:%ld now:%ld:%ld\n",
			     next_event.notification,
			     (long) next_event.time.tv_sec,
			     (long) next_event.time.tv_usec,
			     (long) now.tv_sec,
			     (long) now.tv_usec);
		}

		event_notification_free(handle, next_event.notification);
	}
}
342
343
344
345
346
347
348
349
350
351
352
353
354
355

/*
 * Get the static event list from the DB and schedule according to
 * the relative time stamps.
 */
static int
get_static_events(event_handle_t handle)
{
	MYSQL_RES	*res;
	MYSQL_ROW	row;
	int		nrows;
	struct timeval	now, time;
	address_tuple_t tuple;
	char		pideid[BUFSIZ];
356
	event_notification_t notification;
357
	int		adx = 0;
358
	sched_event_t	event;
359
360
361

	/*
	 * Build up a table of agents that can receive dynamic events.
362
	 * These are stored in the virt_agents table, which we join
363
364
365
366
367
	 * with the reserved table to get the physical node name where
	 * the agent is running.
	 *
	 * That is, we want to be able to quickly map from "cbr0" to
	 * the node on which it lives (for dynamic events).
368
	 */
369
370
	res = mydb_query("select vi.vname,vi.vnode,r.node_id,o.type "
			 " from virt_agents as vi "
371
372
			 "left join reserved as r on "
			 " r.vname=vi.vnode and r.pid=vi.pid and r.eid=vi.eid "
373
374
375
376
			 "left join event_objecttypes as o on "
			 " o.idx=vi.objecttype "
			 "where vi.pid='%s' and vi.eid='%s'",
			 4, pid, eid);
377
378

	if (!res) {
379
		error("getting virt_agents list for %s/%s", pid, eid);
380
381
382
		return 0;
	}
	nrows = mysql_num_rows(res);
383
384
385
386
387
388
	agents = calloc(nrows, sizeof(struct agent));
	if (agents == NULL) {
		error("cannot allocate memory, too many agents (%d)\n", nrows);
		return 0;
	}

389
390
391
	while (nrows--) {
		row = mysql_fetch_row(res);

392
		if (!row[0] || !row[1] || !row[3])
393
394
395
396
			continue;

		strcpy(agents[numagents].objname, row[0]);
		strcpy(agents[numagents].vnode,   row[1]);
397
		strcpy(agents[numagents].objtype, row[3]);
398

399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
		/*
		 * Look for a wildcard in the vnode slot. As a result
		 * the node_id will come back null from the reserved
		 * table.
		 */
		if (strcmp("*", row[1])) {
			if (! row[2]) {
				error("No node_id for vnode %s", row[1]);
				continue;
			}
			
			strcpy(agents[numagents].nodeid,  row[2]);

			if (! mydb_nodeidtoip(row[2],
					      agents[numagents].ipaddr)) {
				error("No ipaddr for node_id %s", row[2]);
				continue;
			}
		}
		else {
			/*
			 * Force events to all nodes. The agents will
			 * need to discriminate on stuff inside the event.
			 */
			strcpy(agents[numagents].nodeid, ADDRESSTUPLE_ALL);
			strcpy(agents[numagents].ipaddr, ADDRESSTUPLE_ALL);
		}
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
		numagents++;
	}
	mysql_free_result(res);
	
	if (debug) {
		for (adx = 0; adx < numagents; adx++) {
			info("Agent %d: %10s %10s %10s %8s %16s\n", adx,
			     agents[adx].objname,
			     agents[adx].objtype,
			     agents[adx].vnode,
			     agents[adx].nodeid,
			     agents[adx].ipaddr);
		}
	}

	/*
	 * Now get the eventlist. There should be entries in the
	 * agents table for anything we find in the list.
	 */
445
	res = mydb_query("select ex.idx,ex.time,ex.vname,"
446
			 " ex.arguments,ot.type,et.type from eventlist as ex "
447
448
449
450
			 "left join event_eventtypes as et on "
			 " ex.eventtype=et.idx "
			 "left join event_objecttypes as ot on "
			 " ex.objecttype=ot.idx "
451
452
			 "where ex.pid='%s' and ex.eid='%s' "
			 "order by ex.time ASC",
453
			 6, pid, eid);
454
455
#define EXIDX	 row[0]
#define EXTIME	 row[1]
456
457
458
459
#define OBJNAME  row[2]
#define EXARGS	 row[3]
#define OBJTYPE	 row[4]
#define EVTTYPE	 row[5]
460
461

	if (!res) {
Leigh B. Stoller's avatar
Leigh B. Stoller committed
462
		error("getting static event list for %s/%s", pid, eid);
463
464
		return 0;
	}
465
	nrows = (int) mysql_num_rows(res);
466
467
468
469
470
471
472

	/*
	 * Construct an address tuple for the notifications. We can reuse
	 * the same one over and over since the data is copied out.
	 */
	tuple = address_tuple_alloc();
	if (tuple == NULL) {
Leigh B. Stoller's avatar
Leigh B. Stoller committed
473
		error("could not allocate an address tuple");
474
		return 0;
475
476
477
478
	}

	sprintf(pideid, "%s/%s", pid, eid);
	gettimeofday(&now, NULL);
479
480
481
482
483
484

	info("Getting event stream at: %lu:%d\n",  now.tv_sec, now.tv_usec);

	/*
	 * Pad the start time out a bit to give this code a chance to run.
	 */
485
	now.tv_sec += 30;
486
487
488
489
490
491
	
	while (nrows) {
		double		firetime;

		row = mysql_fetch_row(res);
		firetime = atof(EXTIME);
Leigh B. Stoller's avatar
Leigh B. Stoller committed
492

493
494
495
496
497
498
499
500
		for (adx = 0; adx < numagents; adx++) {
			if (!strcmp(agents[adx].objname, OBJNAME))
				break;
		}
		if (adx == numagents) {
			error("Could not map event index %s", EXIDX);
			return 0;
		}
501
502

		tuple->expt      = pideid;
503
504
		tuple->host      = agents[adx].ipaddr;
		tuple->objname   = OBJNAME;
505
506
507
		tuple->objtype   = OBJTYPE;
		tuple->eventtype = EVTTYPE;

508
		if (debug) 
509
510
			info("%8s %10s %10s %10s %10s %10s\n",
			     EXTIME, OBJNAME, OBJTYPE,
511
512
513
			     EVTTYPE, agents[adx].ipaddr, 
			     EXARGS ? EXARGS : "");

514
515
		event.notification = event_notification_alloc(handle, tuple);
		if (! event.notification) {
Leigh B. Stoller's avatar
Leigh B. Stoller committed
516
			error("could not allocate notification");
517
			mysql_free_result(res);
518
			return 0;
519
		}
520
521
		event_notification_set_arguments(handle,
						 event.notification, EXARGS);
522

523
		event_notification_insert_hmac(handle, event.notification);
524
525
526
527
528
529
530
531
532
533
		time.tv_sec  = now.tv_sec  + (int)firetime;
		time.tv_usec = now.tv_usec +
			(int)((firetime - (floor(firetime))) * 1000000);

		if (time.tv_usec >= 1000000) {
			time.tv_sec  += 1;
			time.tv_usec -= 1000000;
		}
		event.time.tv_sec  = time.tv_sec;
		event.time.tv_usec = time.tv_usec;
534
535
		event.simevent     = !strcmp(OBJTYPE,
					     TBDB_OBJECTTYPE_SIMULATOR);
536
537
538
539
		sched_event_enqueue(event);
		nrows--;
	}
	mysql_free_result(res);
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554

	/*
	 * Generate a TIME starts message.
	 */
	tuple->expt      = pideid;
	tuple->host      = ADDRESSTUPLE_ALL;
	tuple->objname   = ADDRESSTUPLE_ANY;
	tuple->objtype   = TBDB_OBJECTTYPE_TIME;
	tuple->eventtype = TBDB_EVENTTYPE_START;
	
	notification = event_notification_alloc(handle, tuple);
	if (! notification) {
		error("could not allocate notification");
		return 0;
	}
555
	event_notification_insert_hmac(handle, notification);
556
	event.simevent     = 0;
557
558
559
560
	event.notification = notification;
	event.time.tv_sec  = now.tv_sec;
	event.time.tv_usec = now.tv_usec;
	sched_event_enqueue(event);
561

562
563
564
565
	info("TIME STARTS will be sent at: %lu:%d\n", now.tv_sec, now.tv_usec);

	gettimeofday(&now, NULL);
	info("The time is now: %lu:%d\n", now.tv_sec, now.tv_usec);
566

567
568
569
	return 1;
}
	
Leigh B. Stoller's avatar
Leigh B. Stoller committed
570
571
572
573
574
/*
 * atexit() handler registered by main(): reset the scheduler process
 * id recorded in the DB for this experiment to zero. pid and eid are
 * cleared afterwards so that a second call does nothing.
 */
static void
cleanup(void)
{
	if (pid != NULL)
		mydb_seteventschedulerpid(pid, eid, 0);

	pid = NULL;
	eid = NULL;
}

/*
 * Signal handler installed by main() for SIGINT, SIGTERM and SIGHUP:
 * log the signal number and exit with status 0.
 *
 * NOTE(review): info() and exit() are called from signal context here,
 * and neither is guaranteed to be async-signal-safe. Left unchanged
 * because the atexit() cleanup depends on exit() being used.
 */
static void
quit(int sig)
{
	info("Got sig %d, exiting\n", sig);

	/* cleanup() will be called from atexit() */
	exit(0);
}
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604

/*
 * Handle a SIMULATOR event locally by running an external command
 * for the experiment.
 *
 * Only the SWAPOUT, HALT and NSESWAP event types are understood; they
 * run "swapexp -s out", "endexp" and "nseswap" respectively, each
 * followed by the pid and eid. Nothing is run when the real or
 * effective uid is root.
 *
 * handle - event system handle used to read the notification.
 * eventp - the scheduled event whose notification is examined.
 *
 * Returns 1 when system() returned 0, and 0 otherwise (unknown event
 * type, running as root, command too long, or a non-zero result).
 */
static int
handle_simevent(event_handle_t handle, sched_event_t *eventp)
{
	char		evtype[TBDB_FLEN_EVEVENTTYPE];
	const char	*prog;
	int		rcode, len;
	char		cmd[BUFSIZ];

	if (! event_notification_get_eventtype(handle,
					       eventp->notification,
					       evtype, sizeof(evtype))) {
		error("could not get event type from notification %p\n",
		      eventp->notification);
		return 0;
	}

	/*
	 * All we know about is the "SWAPOUT" and "HALT" event!
	 * Also NSESWAP event
	 */
	if (!strcmp(evtype, TBDB_EVENTTYPE_SWAPOUT)) {
		prog = "swapexp -s out";
	}
	else if (!strcmp(evtype, TBDB_EVENTTYPE_HALT)) {
		prog = "endexp";
	}
	else if (!strcmp(evtype, TBDB_EVENTTYPE_NSESWAP)) {
		prog = "nseswap";
	}
	else {
		error("cannot handle SIMULATOR event %s.\n", evtype);
		return 0;
	}

	/*
	 * We are lucky! The event scheduler runs as the user! But just in
	 * case, check our uid to make sure we are not root.
	 */
	if (!getuid() || !geteuid()) {
		error("Cannot run SIMULATOR %s as root.\n", evtype);
		return 0;
	}

	/*
	 * Build the command with a bounded write; pid and eid come from
	 * the command line and have no length limit of their own.
	 *
	 * NOTE(review): pid and eid reach the shell unquoted through
	 * system(); this relies on whoever starts the scheduler passing
	 * sane values.
	 */
	len = snprintf(cmd, sizeof(cmd), "%s %s %s", prog, pid, eid);
	if (len < 0 || (size_t) len >= sizeof(cmd)) {
		error("SIMULATOR %s command too long\n", evtype);
		return 0;
	}

	/*
	 * Run the command. Output goes ...
	 */
	rcode = system(cmd);

	/* Should not return, but ... */
	return (rcode == 0);
}