Commit 29563eb0 authored by Ian Murdock's avatar Ian Murdock
Browse files

Rewrote event scheduler. Event system clients may now call

event_schedule (see event library), which essentially operates as a
deferred event_notify. event_schedule accepts a notification and a
firing time, alters the notification to change the type attribute to
EVENT_SCHEDULE and add a firing time attribute, and then sends the
altered notification using event_notify.  The event scheduler
subscribes to EVENT_SCHEDULE notifications. As they arrive,
it restores the type in the notification to that of the
original event and enqueues the notification in a priority queue
for firing at the indicated time. When the time arrives, the
scheduler removes the notification from the queue and resends it
using event_notify.

With these changes, the event system now supports dynamic events.
parent 7a518579
/*
* sched.c --
* event-sched.c --
*
* Testbed event scheduler.
*
* The event scheduler is an event system client; it operates by
* subscribing to the EVENT_SCHEDULE event, enqueuing the event
* notifications it receives, and resending the notifications at
* the indicated times.
*
* @COPYRIGHT@
*/
static char rcsid[] = "$Id: event-sched.c,v 1.1 2001-12-04 15:15:32 imurdock Exp $";
static char rcsid[] = "$Id: event-sched.c,v 1.2 2002-01-29 17:08:14 imurdock Exp $";
#include <stdio.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
......@@ -16,39 +22,21 @@ static char rcsid[] = "$Id: event-sched.c,v 1.1 2001-12-04 15:15:32 imurdock Exp
#include <event-sched.h>
static void sched_seed_event_queue(void);
#ifdef TEST_SCHED
static void sched_seed_event_queue_debug(void);
#endif /* TEST_SCHED */
/* Returns the amount of time until EVENT fires. */
static struct timeval
sched_time_until_event_fires(sched_event_t event)
{
struct timeval now, time;
gettimeofday(&now, NULL);
time.tv_sec = event.time.tv_sec - now.tv_sec;
time.tv_usec = event.time.tv_usec - now.tv_usec;
if (time.tv_usec < 0) {
time.tv_sec -= 1;
time.tv_usec += 1000000;
}
return time;
}
static void enqueue(event_handle_t handle, event_notification_t notification,
char *host, event_type_t type, void *data);
static void dequeue(event_handle_t handle);
int
main(int argc, char **argv)
{
sched_event_t next_event;
struct timeval next_event_wait, now;
event_handle_t handle;
event_notification_t notification;
pthread_t thread;
char *server = NULL;
int c;
/* Initialize event queue semaphores: */
sched_event_init();
while ((c = getopt(argc, argv, "s:")) != -1) {
switch (c) {
case 's':
......@@ -67,55 +55,19 @@ main(int argc, char **argv)
return 1;
}
#ifdef TEST_SCHED
/* Seed event queue with scripted events, for testing. */
sched_seed_event_queue_debug();
#endif /* TEST_SCHED */
/* Get static events from testbed database. */
sched_seed_event_queue();
while (sched_event_dequeue(&next_event) != 0) {
/* Determine how long to wait before firing the next event. */
next_event_wait = sched_time_until_event_fires(next_event);
/* If the event's firing time is in the future, then use
select to wait until the event should fire. */
if (next_event_wait.tv_sec >= 0 && next_event_wait.tv_usec > 0) {
if (select(0, NULL, NULL, NULL, &next_event_wait) != 0) {
ERROR("select did not timeout\n");
return 1;
}
}
/* Fire event. */
gettimeofday(&now, NULL);
TRACE("firing event (event=(time=(tv_sec=%ld, tv_usec=%ld)), "
"host=%s, type=%d) "
"at time (time=(tv_sec=%ld, tv_usec=%ld))\n",
next_event.time.tv_sec,
next_event.time.tv_usec,
next_event.host,
next_event.type,
now.tv_sec,
now.tv_usec);
notification = event_notification_alloc(handle, next_event.host,
next_event.type);
if (notification == NULL) {
ERROR("could not allocate notification\n");
return 1;
}
/* Subscribe to the EVENT_SCHEDULE event, and enqueue events as
they arrive: */
if (event_subscribe(handle, enqueue, EVENT_SCHEDULE, NULL) == NULL) {
ERROR("could not subscribe to EVENT_SCHEDULE event\n");
return 1;
}
if (event_notify(handle, notification) == 0) {
ERROR("could not fire event\n");
return 1;
}
/* Create a thread to dequeue events: */
pthread_create(&thread, NULL, (void *(*)(void *)) dequeue, handle);
pthread_detach(thread);
event_notification_free(handle, notification);
}
/* Begin the event loop, waiting to receive enqueue requests: */
event_main(handle);
/* Unregister with the event system: */
if (event_unregister(handle) == 0) {
......@@ -126,58 +78,132 @@ main(int argc, char **argv)
return 0;
}
/* Enqueue event notifications as they arrive. */
static void
sched_seed_event_queue(void)
{
/* XXX: Load events from database here. */
}
#ifdef TEST_SCHED
/* Load scripted events into the event queue. */
static void
sched_seed_event_queue_debug(void)
enqueue(event_handle_t handle, event_notification_t notification, char *host,
event_type_t type, void *data)
{
sched_event_t event;
struct timeval now;
strncpy(event.host, EVENT_HOST_ANY, MAXHOSTNAMELEN);
event.type = EVENT_TEST;
event_type_t old_type;
/* Clone the event notification, since we want the notification to
live beyond the callback function: */
event.notification = elvin_notification_clone(notification,
handle->status);
if (!event.notification) {
ERROR("elvin_notification_clone failed: ");
elvin_error_fprintf(stderr, handle->status);
return;
}
gettimeofday(&now, NULL);
/* Restore the original event type: */
event.time = now;
event.time.tv_sec += 5;
if (sched_event_enqueue(event) == 0) {
ERROR("could not enqueue event\n");
if (event_notification_get_int32(handle, event.notification, "old_type",
(int *) &old_type)
== 0)
{
ERROR("could not restore type attribute of notification %p\n",
event.notification);
return;
}
/* Do two events at once, to make sure we can deal with that. */
event.time = now;
event.time.tv_sec += 60;
if (sched_event_enqueue(event) == 0) {
ERROR("could not enqueue event\n");
if (event_notification_remove(handle, event.notification, "type") == 0) {
ERROR("could not restore type attribute of notification %p\n",
event.notification);
return;
}
event.time = now;
event.time.tv_sec += 60;
if (sched_event_enqueue(event) == 0) {
ERROR("could not enqueue event\n");
if (event_notification_put_int32(handle, event.notification, "type",
old_type)
== 0)
{
ERROR("could not restore type attribute of notification %p\n",
event.notification);
return;
}
event.time = now;
event.time.tv_sec += 120;
if (sched_event_enqueue(event) == 0) {
ERROR("could not enqueue event\n");
/* Get the event's firing time: */
if (event_notification_get_int32(handle, event.notification, "time_sec",
(int *) &event.time.tv_sec)
== 0)
{
ERROR("could not get time.tv_sec attribute from notification %p\n",
event.notification);
return;
}
event.time = now;
event.time.tv_sec += 300;
if (sched_event_enqueue(event) == 0) {
ERROR("could not enqueue event\n");
if (event_notification_get_int32(handle, event.notification, "time_usec",
(int *) &event.time.tv_usec)
== 0)
{
ERROR("could not get time.tv_usec attribute from notification %p\n",
event.notification);
return;
}
/* Enqueue the event notification for resending at the indicated
time: */
sched_event_enqueue(event);
}
/* Returns the amount of time until EVENT fires. */
static struct timeval
sched_time_until_event_fires(sched_event_t event)
{
struct timeval now, time;
gettimeofday(&now, NULL);
time.tv_sec = event.time.tv_sec - now.tv_sec;
time.tv_usec = event.time.tv_usec - now.tv_usec;
if (time.tv_usec < 0) {
time.tv_sec -= 1;
time.tv_usec += 1000000;
}
return time;
}
/* Dequeue events from the event queue and fire them at the
appropriate time. Runs in a separate thread. */
static void
dequeue(event_handle_t handle)
{
sched_event_t next_event;
struct timeval next_event_wait, now;
while (sched_event_dequeue(&next_event, 1) != 0) {
/* Determine how long to wait before firing the next event. */
next_event_wait = sched_time_until_event_fires(next_event);
/* If the event's firing time is in the future, then use
select to wait until the event should fire. */
if (next_event_wait.tv_sec >= 0 && next_event_wait.tv_usec > 0) {
if (select(0, NULL, NULL, NULL, &next_event_wait) != 0) {
ERROR("select did not timeout\n");
return;
}
}
/* Fire event. */
gettimeofday(&now, NULL);
TRACE("firing event (event=(notification=%p, "
"time=(tv_sec=%ld, tv_usec=%ld)) "
"at time (time=(tv_sec=%ld, tv_usec=%ld))\n",
next_event.notification,
next_event.time.tv_sec,
next_event.time.tv_usec,
now.tv_sec,
now.tv_usec);
if (event_notify(handle, next_event.notification) == 0) {
ERROR("could not fire event\n");
return;
}
event_notification_free(handle, next_event.notification);
}
}
#endif /* TEST_SCHED */
/*
* sched.h --
* event-sched.h --
*
* This file contains definitions for the testbed event
* scheduler.
*
* @COPYRIGHT@
*
* $Id: event-sched.h,v 1.1 2001-12-04 15:15:32 imurdock Exp $
* $Id: event-sched.h,v 1.2 2002-01-29 17:08:14 imurdock Exp $
*/
#ifndef __SCHED_H__
......@@ -22,10 +22,8 @@
/* Scheduler-internal representation of an event. */
typedef struct sched_event {
event_notification_t notification; /* event notification */
struct timeval time; /* event firing time */
char host[MAXHOSTNAMELEN]; /* host to send the event to, or "*"
* for all hosts */
event_type_t type; /* event type */
} sched_event_t;
/*
......@@ -33,9 +31,9 @@ typedef struct sched_event {
*/
/* queue.c */
void sched_event_init(void);
int sched_event_enqueue(sched_event_t event);
int sched_event_dequeue(sched_event_t *event);
int sched_event_dequeue(sched_event_t *event, int wait);
void sched_event_queue_dump(FILE *fp);
void sched_event_queue_verify(void);
#endif /* __SCHED_H__ */
......@@ -6,10 +6,12 @@
* @COPYRIGHT@
*/
static char rcsid[] = "$Id: queue.c,v 1.2 2001-12-04 15:17:09 imurdock Exp $";
static char rcsid[] = "$Id: queue.c,v 1.3 2002-01-29 17:08:14 imurdock Exp $";
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <sys/time.h>
......@@ -24,7 +26,7 @@ static char rcsid[] = "$Id: queue.c,v 1.2 2001-12-04 15:17:09 imurdock Exp $";
the queue is EVENT_QUEUE[1], and the tail is
EVENT_QUEUE[EVENT_QUEUE_TAIL]. EVENT_QUEUE[0] is initialized to 0,
since it should never be used; this makes it easier to catch errors. */
static sched_event_t event_queue[EVENT_QUEUE_LENGTH] = { { { 0, 0 } } };
static sched_event_t event_queue[EVENT_QUEUE_LENGTH];
/* The index of the event queue head (fixed). */
#define EVENT_QUEUE_HEAD 1
......@@ -32,10 +34,19 @@ static sched_event_t event_queue[EVENT_QUEUE_LENGTH] = { { { 0, 0 } } };
/* The index of the event queue tail. */
static int event_queue_tail = 0;
static pthread_mutex_t event_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Producer/consumer semaphores. */
static sem_t empty_slot, event_to_consume;
static int initialized = 0;
static void sched_event_queue_dump_node_and_descendents(FILE *fp,
int index,
int level);
static void sched_event_queue_verify(void);
/* Returns non-zero if EVENT1 is more recent than EVENT2, 0 otherwise. */
static inline int
event_is_more_recent(sched_event_t event1, sched_event_t event2)
......@@ -47,6 +58,15 @@ event_is_more_recent(sched_event_t event1, sched_event_t event2)
}
}
/* Initialize priority queue semaphores. */
void
sched_event_init(void)
{
sem_init(&empty_slot, 0, EVENT_QUEUE_LENGTH);
sem_init(&event_to_consume, 0, 0);
initialized = 1;
}
/* Enqueue the event EVENT to the priority queue. Returns non-zero if
successful, 0 otherwise. */
int
......@@ -54,19 +74,23 @@ sched_event_enqueue(sched_event_t event)
{
int parent, child;
if (event_queue_tail == EVENT_QUEUE_LENGTH - 1) {
ERROR("queue full\n");
return 0;
}
assert(initialized);
/* Wait until there is an empty slot in the event queue. */
sem_wait(&empty_slot);
pthread_mutex_lock(&event_queue_mutex);
assert(event_queue_tail < EVENT_QUEUE_LENGTH - 1);
/* Add the event to the priority queue. The event is first
inserted as a leaf of the tree, then propogated up the tree
until the heap property is again satisfied. At each iteration,
we check to see if the event being inserted is more
recent that it's parent. If it is, we swap parent and child,
move up one level in the tree, and iterate again; if it
isn't, we're as far up the tree as we're going to get and are
done. */
recent that its parent. If it is, we swap parent and child,
move up one level in the tree, and iterate again; if
it isn't, we're as far up the tree as we're going to get and
are done. */
event_queue[++event_queue_tail] = event;
......@@ -88,36 +112,56 @@ sched_event_enqueue(sched_event_t event)
}
}
TRACE("enqueued event (event=(time=(tv_sec=%ld, tv_usec=%ld)), "
"host=%s, type=%d)\n",
TRACE("enqueued event (event=(notification=%p, "
"time=(tv_sec=%ld, tv_usec=%ld)))\n",
event.notification,
event.time.tv_sec,
event.time.tv_usec,
event.host,
event.type);
event.time.tv_usec);
/* Sanity check: Make sure the heap property is satisfied. */
sched_event_queue_verify();
pthread_mutex_unlock(&event_queue_mutex);
/* Signal that there is now an event to be consumed. */
sem_post(&event_to_consume);
return 1;
}
/* Dequeue the next event from the priority queue. Stores the event
/* Dequeue the next event from the priority queue. If WAIT is
non-zero, block until there is an event to dequeue, otherwise
return immediately if the queue is empty. Stores the event
at *EVENT and returns non-zero if successful, 0 otherwise. */
int
sched_event_dequeue(sched_event_t *event)
sched_event_dequeue(sched_event_t *event, int wait)
{
int parent, child;
assert(initialized);
if (event == NULL) {
ERROR("invalid event pointer\n");
return 0;
}
if (event_queue_tail == 0) {
ERROR("queue empty\n");
return 0;
if (wait) {
/* Wait until there is an event to be consumed. */
sem_wait(&event_to_consume);
} else {
/* Return immediately if the queue is empty. */
int val;
sem_getvalue(&event_to_consume, &val);
if (val == 0) {
TRACE("queue empty\n");
return 0;
}
}
pthread_mutex_lock(&event_queue_mutex);
assert(event_queue_tail > 0);
/* Remove the next event from the priority queue. */
/* Store the item at the head of the queue in *EVENT; this is the
......@@ -157,16 +201,20 @@ sched_event_dequeue(sched_event_t *event)
}
}
TRACE("dequeued event (event=(time=(tv_sec=%ld, tv_usec=%ld)), "
"host=%s, type=%d)\n",
TRACE("dequeued event (event=(notification=%p, "
"time=(tv_sec=%ld, tv_usec=%ld)))\n",
event->notification,
event->time.tv_sec,
event->time.tv_usec,
event->host,
event->type);
event->time.tv_usec);
/* Sanity check: Make sure the heap property is satisfied. */
sched_event_queue_verify();
pthread_mutex_unlock(&event_queue_mutex);
/* Signal that there is now an empty slot in the event queue. */
sem_post(&empty_slot);
return 1;
}
......@@ -174,11 +222,13 @@ sched_event_dequeue(sched_event_t *event)
void
sched_event_queue_dump(FILE *fp)
{
pthread_mutex_lock(&event_queue_mutex);
sched_event_queue_dump_node_and_descendents(fp, EVENT_QUEUE_HEAD, 0);
pthread_mutex_unlock(&event_queue_mutex);
}
/* Dump an event queue node and its descendents, with indentation for
readability. */
readability. Expects EVENT_QUEUE_MUTEX to be locked. */
static void
sched_event_queue_dump_node_and_descendents(FILE *fp, int index, int level)
{
......@@ -211,8 +261,9 @@ sched_event_queue_dump_node_and_descendents(FILE *fp, int index, int level)
}
}
/* Verify that the event queue satisfies the heap property. */
void
/* Verify that the event queue satisfies the heap property. Expects
EVENT_QUEUE_MUTEX to be locked. */
static void
sched_event_queue_verify(void)
{
int error, i;
......@@ -264,6 +315,8 @@ main(int argc, char **argv)
gettimeofday(&now, NULL);
srand((int) now.tv_usec);
sched_event_init();
/* Enqueue events. */
printf("Enqueueing events...\n");
fflush(stdout);
......@@ -285,7 +338,7 @@ main(int argc, char **argv)
printf("Dequeueing events...\n");
fflush(stdout);
for (i = 0; i < events; i++) {
if (sched_event_dequeue(&event) == 0) {
if (sched_event_dequeue(&event, 0) == 0) {
ERROR("could not dequeue event\n");
return 1;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment