event-sched.c 8.92 KB
Newer Older
Ian Murdock's avatar
Ian Murdock committed
1
/*
2
 * event-sched.c --
Ian Murdock's avatar
Ian Murdock committed
3
4
5
 *
 *      Testbed event scheduler.
 *
6
7
8
9
10
 *      The event scheduler is an event system client; it operates by
 *      subscribing to the EVENT_SCHEDULE event, enqueuing the event
 *      notifications it receives, and resending the notifications at
 *      the indicated times.
 *
Ian Murdock's avatar
Ian Murdock committed
11
12
13
14
15
16
17
18
 * @COPYRIGHT@
 */

#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
19
20
#include <math.h>
#include <ctype.h>
21
#include "event-sched.h"
22
#include "libdb.h"
Ian Murdock's avatar
Ian Murdock committed
23

24
25
static void enqueue(event_handle_t handle,
		    event_notification_t notification, void *data);
26
static void dequeue(event_handle_t handle);
Ian Murdock's avatar
Ian Murdock committed
27

28
29
30
31
32
33
34
35
36
37
38
39
static char	*progname;
static char	*pid, *eid;
static int	get_static_events(event_handle_t handle);

void
usage()
{
	fprintf(stderr,	"Usage: %s [-s server] [-p port] <pid> <eid>\n",
		progname);
	exit(-1);
}

Ian Murdock's avatar
Ian Murdock committed
40
41
42
int
main(int argc, char **argv)
{
43
    address_tuple_t tuple;
Ian Murdock's avatar
Ian Murdock committed
44
    event_handle_t handle;
45
    char *server = NULL;
46
47
    char *port = NULL;
    char buf[BUFSIZ];
48
49
    int c;

50
51
52
    /* Initialize event queue semaphores: */
    sched_event_init();

53
    while ((c = getopt(argc, argv, "s:p:")) != -1) {
54
55
56
57
        switch (c) {
          case 's':
              server = optarg;
              break;
58
59
60
          case 'p':
              port = optarg;
              break;
61
62
63
64
65
          default:
              fprintf(stderr, "Usage: %s [-s SERVER]\n", argv[0]);
              return 1;
        }
    }
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
    argc -= optind;
    argv += optind;

    if (argc != 2)
	    usage();
    pid = argv[0];
    eid = argv[1];

    /*
     * Set up DB state.
     */
    if (!dbinit())
	    return 1;

    /*
     * Convert server/port to elvin thing.
     *
     * XXX This elvin string stuff should be moved down a layer. 
     */
    if (server) {
	    snprintf(buf, sizeof(buf), "elvin://%s%s%s",
		     server,
		     (port ? ":"  : ""),
		     (port ? port : ""));
	    server = buf;
    }
Ian Murdock's avatar
Ian Murdock committed
92
93

    /* Register with the event system: */
94
    handle = event_register(server, 1);
Ian Murdock's avatar
Ian Murdock committed
95
96
97
98
99
    if (handle == NULL) {
        ERROR("could not register with event system\n");
        return 1;
    }

100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
    /*
     * Read the static events list and schedule.
     */
    if (!get_static_events(handle)) {
        ERROR("could not get static event list\n");
        return 1;
    }

    /*
     * Construct an address tuple for event subscription. We set the scheduler
     * flag to indicate we want to capture those notifications.
     */
    tuple = address_tuple_alloc();
    if (tuple == NULL) {
	    ERROR("could not allocate an address tuple\n");
	    return 1;
    }
    tuple->scheduler = 1;

    if (event_subscribe(handle, enqueue, tuple, NULL) == NULL) {
120
121
122
        ERROR("could not subscribe to EVENT_SCHEDULE event\n");
        return 1;
    }
Ian Murdock's avatar
Ian Murdock committed
123

124
125
    /* Dequeue events and process them at the appropriate times: */
    dequeue(handle);
Ian Murdock's avatar
Ian Murdock committed
126
127
128
129
130
131
132
133
134
135

    /* Unregister with the event system: */
    if (event_unregister(handle) == 0) {
        ERROR("could not unregister with event system\n");
        return 1;
    }

    return 0;
}

136
/* Enqueue event notifications as they arrive. */
Ian Murdock's avatar
Ian Murdock committed
137
static void
138
enqueue(event_handle_t handle, event_notification_t notification, void *data)
Ian Murdock's avatar
Ian Murdock committed
139
140
{
    sched_event_t event;
141
142
143
144
145
146
147
148
149
150

    /* Clone the event notification, since we want the notification to
       live beyond the callback function: */
    event.notification = elvin_notification_clone(notification,
                                                  handle->status);
    if (!event.notification) {
        ERROR("elvin_notification_clone failed: ");
        elvin_error_fprintf(stderr, handle->status);
        return;
    }
Ian Murdock's avatar
Ian Murdock committed
151

152
153
154
155
156
    /* Clear the scheduler flag */
    if (! event_notification_remove(handle, event.notification, "SCHEDULER") ||
	! event_notification_put_int32(handle,
				       event.notification, "SCHEDULER", 0)) {
        ERROR("could not clear scheduler attribute of notification %p\n",
157
              event.notification);
Ian Murdock's avatar
Ian Murdock committed
158
159
160
        return;
    }

161
162
163
164
165
166
167
168
    /* Get the event's firing time: */

    if (event_notification_get_int32(handle, event.notification, "time_sec",
                                     (int *) &event.time.tv_sec)
        == 0)
    {
        ERROR("could not get time.tv_sec attribute from notification %p\n",
              event.notification);
Ian Murdock's avatar
Ian Murdock committed
169
170
171
        return;
    }

172
173
174
175
176
177
    if (event_notification_get_int32(handle, event.notification, "time_usec",
                                     (int *) &event.time.tv_usec)
        == 0)
    {
        ERROR("could not get time.tv_usec attribute from notification %p\n",
              event.notification);
Ian Murdock's avatar
Ian Murdock committed
178
179
        return;
    }
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210

    /* Enqueue the event notification for resending at the indicated
       time: */
    sched_event_enqueue(event);
}

/* Returns the amount of time until EVENT fires. */
static struct timeval
sched_time_until_event_fires(sched_event_t event)
{
    struct timeval now, time;

    gettimeofday(&now, NULL);

    time.tv_sec = event.time.tv_sec - now.tv_sec;
    time.tv_usec = event.time.tv_usec - now.tv_usec;
    if (time.tv_usec < 0) {
        time.tv_sec -= 1;
        time.tv_usec += 1000000;
    }

    return time;
}

/* Dequeue events from the event queue and fire them at the
   appropriate time.  Runs in a separate thread. */
static void
dequeue(event_handle_t handle)
{
    sched_event_t next_event;
    struct timeval next_event_wait, now;
211
    int foo;
212
213
214

    while (sched_event_dequeue(&next_event, 1) != 0) {
        /* Determine how long to wait before firing the next event. */
215
    again:
216
217
218
219
220
        next_event_wait = sched_time_until_event_fires(next_event);

        /* If the event's firing time is in the future, then use
           select to wait until the event should fire. */
        if (next_event_wait.tv_sec >= 0 && next_event_wait.tv_usec > 0) {
221
222
223
224
225
226
227
            if ((foo = select(0, NULL, NULL, NULL, &next_event_wait)) != 0) {
		/*
		 * I'll assume that this fails cause of a pthread
		 * related signal issue.
		 */
		ERROR("select did not timeout %d %d\n", foo, errno);
		goto again;
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
            }
        }

        /* Fire event. */

        gettimeofday(&now, NULL);

        TRACE("firing event (event=(notification=%p, "
              "time=(tv_sec=%ld, tv_usec=%ld)) "
              "at time (time=(tv_sec=%ld, tv_usec=%ld))\n",
              next_event.notification,
              next_event.time.tv_sec,
              next_event.time.tv_usec,
              now.tv_sec,
              now.tv_usec);

        if (event_notify(handle, next_event.notification) == 0) {
            ERROR("could not fire event\n");
            return;
        }

        event_notification_free(handle, next_event.notification);
    }
Ian Murdock's avatar
Ian Murdock committed
251
}
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354

/*
 * Get the static event list from the DB and schedule according to
 * the relative time stamps.
 */
static int
get_static_events(event_handle_t handle)
{
	MYSQL_RES	*res;
	MYSQL_ROW	row;
	int		nrows;
	struct timeval	now, time;
	address_tuple_t tuple;
	char		pideid[BUFSIZ];

	res = mydb_query("select ex.time,ex.vnode,ex.vname,ex.arguments,"
			 " ot.type,et.type,i.IP from %s_%s_events as ex "
			 "left join event_eventtypes as et on "
			 " ex.eventtype=et.idx "
			 "left join event_objecttypes as ot on "
			 " ex.objecttype=ot.idx "
			 "left join reserved as r on "
			 " ex.vnode=r.vname and r.pid='%s' and r.eid='%s' "
			 "left join nodes as n on r.node_id=n.node_id "
			 "left join node_types as nt on nt.type=n.type "
			 "left join interfaces as i on "
			 " i.node_id=r.node_id and i.iface=nt.control_iface",
			 7, pid, eid, pid, eid);
#define EXTIME	row[0]
#define EXVNODE	row[1]
#define EXVNAME	row[2]
#define EXARGS	row[3]
#define OBJTYPE	row[4]
#define EVTTYPE	row[5]
#define IPADDR	row[6]

	if (!res) {
		ERROR("getting static event list for %s/%s", pid, eid);
		return 0;
	}

	if ((nrows = (int)mysql_num_rows(res)) == 0) {
		mysql_free_result(res);
		return 1;
	}

	/*
	 * Construct an address tuple for the notifications. We can reuse
	 * the same one over and over since the data is copied out.
	 */
	tuple = address_tuple_alloc();
	if (tuple == NULL) {
		ERROR("could not allocate an address tuple\n");
		return 1;
	}

	sprintf(pideid, "%s/%s", pid, eid);
	gettimeofday(&now, NULL);
	
	while (nrows) {
		sched_event_t	event;
		double		firetime;

		row = mysql_fetch_row(res);
		firetime = atof(EXTIME);
			
		DBG("EV: %8s %10s %10s %10s %10s %10s %10s\n",
		    row[0], row[1], row[2],
		    row[3] ? row[3] : "",
		    row[4], row[5],
		    row[6] ? row[6] : "");
		

		tuple->expt      = pideid;
		tuple->host      = IPADDR;
		tuple->objname   = EXVNAME;
		tuple->objtype   = OBJTYPE;
		tuple->eventtype = EVTTYPE;

		event.notification = event_notification_alloc(handle, tuple);
		if (! event.notification) {
			ERROR("could not allocate notification\n");
			mysql_free_result(res);
			return 1;
		}

		time.tv_sec  = now.tv_sec  + (int)firetime;
		time.tv_usec = now.tv_usec +
			(int)((firetime - (floor(firetime))) * 1000000);

		if (time.tv_usec >= 1000000) {
			time.tv_sec  += 1;
			time.tv_usec -= 1000000;
		}
		event.time.tv_sec  = time.tv_sec;
		event.time.tv_usec = time.tv_usec;
		sched_event_enqueue(event);
		nrows--;
	}
	mysql_free_result(res);
	return 1;
}