event-sched.c 12.5 KB
Newer Older
Ian Murdock's avatar
Ian Murdock committed
1
/*
 * event-sched.c --
 *
 *      Testbed event scheduler.
 *
 *      The event scheduler is an event system client; it operates by
 *      subscribing to the EVENT_SCHEDULE event, enqueuing the event
 *      notifications it receives, and resending the notifications at
 *      the indicated times.
 *
 * @COPYRIGHT@
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>
#include <math.h>
#include <ctype.h>
#include "event-sched.h"
#include "log.h"
#include "tbdb.h"
#include "config.h"
Ian Murdock's avatar
Ian Murdock committed
26

27
28
/* Forward declarations for the file-local routines defined below. */
static void	enqueue(event_handle_t handle,
			event_notification_t notification, void *data);
static void	dequeue(event_handle_t handle);
static int	get_static_events(event_handle_t handle);
static void	cleanup(void);
static void	quit(int);

/* File-wide state filled in by main() from the command line. */
static char	*progname;	/* argv[0]; printed by usage() */
static char	*pid;		/* first positional argument; NULLed by cleanup() */
static char	*eid;		/* second positional argument; NULLed by cleanup() */
static int	debug;		/* bumped once for every -d option */
37

38
#define MAXAGENTS	200
39
40
41
42
43
44
45
46
47
static struct {
	char    nodeid[TBDB_FLEN_NODEID];
	char    vnode[TBDB_FLEN_VNAME];
	char	objname[TBDB_FLEN_EVOBJNAME];
	char	objtype[TBDB_FLEN_EVOBJTYPE];
	char	ipaddr[32];
} agents[MAXAGENTS];
static int	numagents;

48
49
50
51
52
53
54
55
/*
 * Print the command line synopsis on stderr and terminate the process
 * with exit status -1. Does not return.
 *
 * The option list mirrors the getopt() string used by main()
 * ("s:p:dl:"); keep the two in sync.
 */
void
usage()
{
	fprintf(stderr,
		"Usage: %s [-d] [-l logfile] [-s server] [-p port] "
		"<pid> <eid>\n",
		progname);
	exit(-1);
}

Ian Murdock's avatar
Ian Murdock committed
56
57
58
int
main(int argc, char **argv)
{
Leigh B. Stoller's avatar
Leigh B. Stoller committed
59
60
61
62
	address_tuple_t tuple;
	event_handle_t handle;
	char *server = NULL;
	char *port = NULL;
63
	char *log = NULL;
64
	char pideid[BUFSIZ], buf[BUFSIZ];
Leigh B. Stoller's avatar
Leigh B. Stoller committed
65
66
	int c, count;

67
68
	progname = argv[0];

Leigh B. Stoller's avatar
Leigh B. Stoller committed
69
70
71
	/* Initialize event queue semaphores: */
	sched_event_init();

72
	while ((c = getopt(argc, argv, "s:p:dl:")) != -1) {
Leigh B. Stoller's avatar
Leigh B. Stoller committed
73
74
		switch (c) {
		case 'd':
75
			debug++;
Leigh B. Stoller's avatar
Leigh B. Stoller committed
76
77
78
79
80
81
82
			break;
		case 's':
			server = optarg;
			break;
		case 'p':
			port = optarg;
			break;
83
84
85
		case 'l':
			log = optarg;
			break;
Leigh B. Stoller's avatar
Leigh B. Stoller committed
86
87
88
89
90
91
92
		default:
			fprintf(stderr, "Usage: %s [-s SERVER]\n", argv[0]);
			return 1;
		}
	}
	argc -= optind;
	argv += optind;
Ian Murdock's avatar
Ian Murdock committed
93

Leigh B. Stoller's avatar
Leigh B. Stoller committed
94
95
96
97
	if (argc != 2)
		usage();
	pid = argv[0];
	eid = argv[1];
98
	sprintf(pideid, "%s/%s", pid, eid);
Ian Murdock's avatar
Ian Murdock committed
99

100
101
102
103
	signal(SIGINT, quit);
	signal(SIGTERM, quit);
	signal(SIGHUP, quit);

104
105
106
107
	if (debug)
		loginit(0, log);
	else
		loginit(1, "event-sched");
108

Leigh B. Stoller's avatar
Leigh B. Stoller committed
109
110
111
112
113
	/*
	 * Set up DB state.
	 */
	if (!dbinit())
		return 1;
114

Leigh B. Stoller's avatar
Leigh B. Stoller committed
115
	/*
116
117
118
	 * Set our pid in the DB. This will fail if there is already
	 * a non-zero value in the DB (although its okay to set it to
	 * zero no matter what). 
Leigh B. Stoller's avatar
Leigh B. Stoller committed
119
120
121
	 */
	if (! mydb_seteventschedulerpid(pid, eid, getpid()))
		fatal("Could not update DB with process id!");
122
	atexit(cleanup);
Ian Murdock's avatar
Ian Murdock committed
123

Leigh B. Stoller's avatar
Leigh B. Stoller committed
124
125
126
127
128
129
130
131
132
133
134
135
	/*
	 * Convert server/port to elvin thing.
	 *
	 * XXX This elvin string stuff should be moved down a layer. 
	 */
	if (server) {
		snprintf(buf, sizeof(buf), "elvin://%s%s%s",
			 server,
			 (port ? ":"  : ""),
			 (port ? port : ""));
		server = buf;
	}
Ian Murdock's avatar
Ian Murdock committed
136

Leigh B. Stoller's avatar
Leigh B. Stoller committed
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
	/* Register with the event system: */
	handle = event_register(server, 1);
	if (handle == NULL) {
		fatal("could not register with event system");
	}

	/*
	 * Construct an address tuple for event subscription. We set the 
	 * scheduler flag to indicate we want to capture those notifications.
	 */
	tuple = address_tuple_alloc();
	if (tuple == NULL) {
		fatal("could not allocate an address tuple");
	}
	tuple->scheduler = 1;
152
	tuple->expt      = pideid;
Leigh B. Stoller's avatar
Leigh B. Stoller committed
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170

	if (event_subscribe(handle, enqueue, tuple, NULL) == NULL) {
		fatal("could not subscribe to EVENT_SCHEDULE event");
	}

	/*
	 * Hacky. Need to wait until all nodes in the experiment are
	 * in the ISUP state before we can start the event list rolling.
	 */
	count = 0;
	c     = 1;
	while (c) {
		if (! mydb_checkexptnodeeventstate(pid, eid,
						   TBDB_EVENTTYPE_ISUP, &c)) {
			fatal("Could not get node event state");
		}
		count++;
		if ((count % 10) == 0)
171
172
			info("Waiting for nodes in %s/%s to come up ...\n",
			     pid, eid);
Leigh B. Stoller's avatar
Leigh B. Stoller committed
173

174
175
176
177
		/*
		 * Don't want to pound the DB too much.
		 */
		sleep(3);
Leigh B. Stoller's avatar
Leigh B. Stoller committed
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
	}

	/*
	 * Read the static events list and schedule.
	 */
	if (!get_static_events(handle)) {
		fatal("could not get static event list");
	}

	/* Dequeue events and process them at the appropriate times: */
	dequeue(handle);

	/* Unregister with the event system: */
	if (event_unregister(handle) == 0) {
		fatal("could not unregister with event system");
	}
Ian Murdock's avatar
Ian Murdock committed
194
195
196
197

    return 0;
}

198
/* Enqueue event notifications as they arrive. */
Ian Murdock's avatar
Ian Murdock committed
199
static void
200
enqueue(event_handle_t handle, event_notification_t notification, void *data)
Ian Murdock's avatar
Ian Murdock committed
201
{
202
203
204
    sched_event_t	event;
    char		objname[TBDB_FLEN_EVOBJNAME];
    int			x;
205
206
207
208
209
210

    /* Clone the event notification, since we want the notification to
       live beyond the callback function: */
    event.notification = elvin_notification_clone(notification,
                                                  handle->status);
    if (!event.notification) {
211
212
	    error("elvin_notification_clone failed!\n");
	    return;
213
    }
Ian Murdock's avatar
Ian Murdock committed
214

215
216
217
218
    /* Clear the scheduler flag */
    if (! event_notification_remove(handle, event.notification, "SCHEDULER") ||
	! event_notification_put_int32(handle,
				       event.notification, "SCHEDULER", 0)) {
219
220
221
	    error("could not clear scheduler attribute of notification %p\n",
		  event.notification);
	    goto bad;
Ian Murdock's avatar
Ian Murdock committed
222
223
    }

224
    /* Get the event's firing time: */
225
226
227
228
229
230
231
    if (! event_notification_get_int32(handle, event.notification, "time_usec",
				       (int *) &event.time.tv_usec) ||
	! event_notification_get_int32(handle, event.notification, "time_sec",
				       (int *) &event.time.tv_sec)) {
	    error("could not get time from notification %p\n",
		  event.notification);
	    goto bad;
Ian Murdock's avatar
Ian Murdock committed
232
233
    }

234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
    /*
     * Must map the event to the proper agent running on a particular
     * node. 
     */
    if (! event_notification_get_objname(handle, event.notification,
					 objname, sizeof(objname))) {
	    error("could not get object name from notification %p\n",
		  event.notification);
	    goto bad;
    }
    for (x = 0; x < numagents; x++) {
	    if (!strcmp(agents[x].objname, objname))
		    break;
    }
    if (x == numagents) {
	    error("Could not map object to an agent: %s\n", objname);
	    goto bad;
    }
    event_notification_clear_host(handle, event.notification);
    event_notification_set_host(handle,
				event.notification, agents[x].ipaddr);
    event_notification_clear_objtype(handle, event.notification);
    event_notification_set_objtype(handle,
				   event.notification, agents[x].objtype);

    if (debug > 1) {
	    struct timeval now;
	    
	    gettimeofday(&now, NULL);
	    
	    info("Sched: note:%p at:%ld:%d now:%ld:%d agent:%d\n",
                 event.notification,
		 event.time.tv_sec, event.time.tv_usec,
		 now.tv_sec, now.tv_usec,
		 x);
Ian Murdock's avatar
Ian Murdock committed
269
    }
270
271
272
273

    /* Enqueue the event notification for resending at the indicated
       time: */
    sched_event_enqueue(event);
274
275
276
    return;
 bad:
    event_notification_free(handle, event.notification);
277
278
279
280
281
282
283
284
}

/*
 * Dequeue events from the event queue and fire them.
 *
 * Loops until sched_event_dequeue() returns a negative value or an
 * event cannot be sent. Each dequeued notification is sent with
 * event_notify() and then freed, on both the success and failure path.
 * main() calls this directly after the static event list is loaded.
 *
 * handle: event system handle used to send the notifications.
 *
 * NOTE(review): the second argument to sched_event_dequeue() is
 * presumably a "wait until an event is due" flag -- confirm.
 */
static void
dequeue(event_handle_t handle)
{
    sched_event_t next_event;
    struct timeval now;

    while (1) {
	if (sched_event_dequeue(&next_event, 1) < 0)
	    break;

	/* Timestamp taken just before firing; only used for debug output. */
	if (debug > 1)
	    gettimeofday(&now, NULL);

	/* Fire event. */
	if (event_notify(handle, next_event.notification) == 0) {
	    error("could not fire event\n");
	    event_notification_free(handle, next_event.notification);
	    return;
	}

	if (debug > 1) {
	    /* Cast: the timeval member types vary between platforms. */
	    info("Fire:  note:%p at:%ld:%ld now:%ld:%ld\n",
                 next_event.notification,
		 (long) next_event.time.tv_sec,
		 (long) next_event.time.tv_usec,
		 (long) now.tv_sec,
		 (long) now.tv_usec);
	}

	event_notification_free(handle, next_event.notification);
    }
}
310
311
312
313
314
315
316
317
318
319
320
321
322
323

/*
 * Get the static event list from the DB and schedule according to
 * the relative time stamps.
 *
 * First fills agents[]/numagents from the virt_agents table, then reads
 * the eventlist table for pid/eid and enqueues one notification per
 * row. Event times are relative to a start time of "now + 30 seconds".
 * Finally a TIME/START notification is enqueued for that start time.
 *
 * handle: event system handle used to allocate notifications.
 *
 * Returns 1 on success (including an empty event list, in which case
 * nothing is enqueued) and 0 on failure. Events enqueued before a
 * failure stay in the queue.
 *
 * NOTE(review): the address tuple allocated here is never released in
 * this function -- confirm whether the event library offers a matching
 * free routine.
 */
static int
get_static_events(event_handle_t handle)
{
	MYSQL_RES	*res;
	MYSQL_ROW	row;
	int		nrows;
	struct timeval	now, firetv;
	address_tuple_t tuple;
	char		pideid[BUFSIZ];
	event_notification_t notification;
	int		adx = 0;
	sched_event_t	event;

	/*
	 * Build up a table of agents that can receive dynamic events.
	 * These are stored in the virt_agents table, which we join
	 * with the reserved table to get the physical node name where
	 * the agent is running, and with event_objecttypes to get the
	 * object type name.
	 *
	 * That is, we want to be able to quickly map from "cbr0" to
	 * the node on which it lives (for dynamic events).
	 */
	res = mydb_query("select vi.vname,vi.vnode,r.node_id,o.type "
			 " from virt_agents as vi "
			 "left join reserved as r on "
			 " r.vname=vi.vnode and r.pid=vi.pid and r.eid=vi.eid "
			 "left join event_objecttypes as o on "
			 " o.idx=vi.objecttype "
			 "where vi.pid='%s' and vi.eid='%s'",
			 4, pid, eid);

	if (!res) {
		error("getting virt_agents list for %s/%s", pid, eid);
		return 0;
	}
	for (nrows = (int)mysql_num_rows(res); nrows > 0; nrows--) {
		row = mysql_fetch_row(res);
		if (!row)
			break;

		/* The left joins can leave columns NULL; skip such rows. */
		if (!row[0] || !row[1] || !row[2] || !row[3])
			continue;

		/* Check for room before writing the next slot. */
		if (numagents >= MAXAGENTS) {
			fatal("Too many agents!");
		}

		/* Bounded copies: the DB strings are not under our control. */
		snprintf(agents[numagents].objname,
			 sizeof(agents[numagents].objname), "%s", row[0]);
		snprintf(agents[numagents].vnode,
			 sizeof(agents[numagents].vnode), "%s", row[1]);
		snprintf(agents[numagents].nodeid,
			 sizeof(agents[numagents].nodeid), "%s", row[2]);
		snprintf(agents[numagents].objtype,
			 sizeof(agents[numagents].objtype), "%s", row[3]);

		/* The slot is only committed once the address is known. */
		if (! mydb_nodeidtoip(row[2], agents[numagents].ipaddr))
			continue;
		numagents++;
	}
	mysql_free_result(res);

	if (debug) {
		for (adx = 0; adx < numagents; adx++) {
			info("Agent %d: %10s %10s %10s %8s %16s\n", adx,
			     agents[adx].objname,
			     agents[adx].objtype,
			     agents[adx].vnode,
			     agents[adx].nodeid,
			     agents[adx].ipaddr);
		}
	}

	/*
	 * Now get the eventlist. There should be entries in the
	 * agents table for anything we find in the list.
	 */
	res = mydb_query("select ex.idx,ex.time,ex.vnode,ex.vname,"
			 " ex.arguments,ot.type,et.type from eventlist as ex "
			 "left join event_eventtypes as et on "
			 " ex.eventtype=et.idx "
			 "left join event_objecttypes as ot on "
			 " ex.objecttype=ot.idx "
			 "where ex.pid='%s' and ex.eid='%s' "
			 "order by ex.time ASC",
			 7, pid, eid);
#define EXIDX	 row[0]
#define EXTIME	 row[1]
#define EXVNODE	 row[2]
#define OBJNAME  row[3]
#define EXARGS	 row[4]
#define OBJTYPE	 row[5]
#define EVTTYPE	 row[6]

	if (!res) {
		error("getting static event list for %s/%s", pid, eid);
		return 0;
	}

	if ((nrows = (int)mysql_num_rows(res)) == 0) {
		mysql_free_result(res);
		return 1;
	}

	/*
	 * Construct an address tuple for the notifications. We can reuse
	 * the same one over and over since the data is copied out.
	 */
	tuple = address_tuple_alloc();
	if (tuple == NULL) {
		error("could not allocate an address tuple");
		goto bad;
	}

	snprintf(pideid, sizeof(pideid), "%s/%s", pid, eid);
	gettimeofday(&now, NULL);

	/* Cast: the timeval member types vary between platforms. */
	info("Getting event stream at: %ld:%ld\n",
	     (long) now.tv_sec, (long) now.tv_usec);

	/*
	 * Pad the start time out a bit to give this code a chance to run.
	 */
	now.tv_sec += 30;

	for (; nrows > 0; nrows--) {
		double		firetime;
		char		*endp;

		row = mysql_fetch_row(res);
		if (!row)
			break;

		/* Both are needed below and must not be NULL. */
		if (!EXTIME || !OBJNAME) {
			error("Missing time or object name for event index %s",
			      EXIDX ? EXIDX : "?");
			goto bad;
		}
		firetime = strtod(EXTIME, &endp);
		if (endp == EXTIME) {
			error("Bad time '%s' for event index %s",
			      EXTIME, EXIDX ? EXIDX : "?");
			goto bad;
		}

		for (adx = 0; adx < numagents; adx++) {
			if (!strcmp(agents[adx].objname, OBJNAME))
				break;
		}
		if (adx == numagents) {
			error("Could not map event index %s", EXIDX);
			goto bad;
		}

		tuple->expt      = pideid;
		tuple->host      = agents[adx].ipaddr;
		tuple->objname   = OBJNAME;
		tuple->objtype   = OBJTYPE;
		tuple->eventtype = EVTTYPE;

		if (debug)
			info("%8s %10s %10s %10s %10s %10s %10s\n",
			     EXTIME, EXVNODE, OBJNAME, OBJTYPE,
			     EVTTYPE, agents[adx].ipaddr,
			     EXARGS ? EXARGS : "");

		event.notification = event_notification_alloc(handle, tuple);
		if (! event.notification) {
			error("could not allocate notification");
			goto bad;
		}
		/*
		 * NOTE(review): EXARGS may be NULL here (the debug print
		 * above allows for it) -- confirm the callee accepts that.
		 */
		event_notification_set_arguments(handle,
						 event.notification, EXARGS);

		/* Absolute firing time = padded start time + offset. */
		firetv.tv_sec  = now.tv_sec  + (int)firetime;
		firetv.tv_usec = now.tv_usec +
			(int)((firetime - (floor(firetime))) * 1000000);

		if (firetv.tv_usec >= 1000000) {
			firetv.tv_sec  += 1;
			firetv.tv_usec -= 1000000;
		}
		event.time.tv_sec  = firetv.tv_sec;
		event.time.tv_usec = firetv.tv_usec;
		sched_event_enqueue(event);
	}
	mysql_free_result(res);

	/*
	 * Generate a TIME starts message.
	 */
	tuple->expt      = pideid;
	tuple->host      = ADDRESSTUPLE_ALL;
	tuple->objname   = ADDRESSTUPLE_ANY;
	tuple->objtype   = TBDB_OBJECTTYPE_TIME;
	tuple->eventtype = TBDB_EVENTTYPE_START;

	notification = event_notification_alloc(handle, tuple);
	if (! notification) {
		error("could not allocate notification");
		return 0;
	}
	event.notification = notification;
	event.time.tv_sec  = now.tv_sec;
	event.time.tv_usec = now.tv_usec;
	sched_event_enqueue(event);

	info("TIME STARTS will be sent at: %ld:%ld\n",
	     (long) now.tv_sec, (long) now.tv_usec);

	gettimeofday(&now, NULL);
	info("The time is now: %ld:%ld\n",
	     (long) now.tv_sec, (long) now.tv_usec);

	return 1;

 bad:
	/* Every failure after the second query must release its result. */
	mysql_free_result(res);
	return 0;
}
	
Leigh B. Stoller's avatar
Leigh B. Stoller committed
511
512
513
514
515
/*
 * atexit() handler registered by main(): writes 0 as the scheduler
 * process id for pid/eid in the DB, then forgets both names so that a
 * second invocation does nothing.
 */
static void
cleanup(void)
{
	if (pid != NULL) {
		mydb_seteventschedulerpid(pid, eid, 0);
	}

	eid = NULL;
	pid = NULL;
}

/*
 * Signal handler installed by main() for SIGINT, SIGTERM and SIGHUP.
 * Terminates the process with status 0; the DB entry is reset by
 * cleanup(), which runs through atexit().
 *
 * NOTE(review): exit() is not on the POSIX list of async-signal-safe
 * functions, but _exit() would skip cleanup(); left as is.
 */
static void
quit(int sig)
{
	(void) sig;	/* every handled signal is treated the same */

	exit(0);
}