event.c 62.6 KB
Newer Older
Leigh B. Stoller's avatar
Leigh B. Stoller committed
1
/*
2
 * Copyright (c) 2000-2016 University of Utah and the Flux Group.
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 * 
 * {{{EMULAB-LICENSE
 * 
 * This file is part of the Emulab network testbed software.
 * 
 * This file is free software: you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at
 * your option) any later version.
 * 
 * This file is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this file.  If not, see <http://www.gnu.org/licenses/>.
 * 
 * }}}
Leigh B. Stoller's avatar
Leigh B. Stoller committed
22
23
 */

Ian Murdock's avatar
Ian Murdock committed
24
25
26
27
28
/*
 * event.c --
 *
 *      Testbed event library.  Currently uses the Elvin publish/
 *      subscribe system for routing event notifications.
29
30
31
32
33
34
 *
 * TODO:
 *	check all pubsub_* call sites to get return value sense correct.
 *	make sure handle->status (and error args in general) is correct.
 *	make sure _t types are passed as pointers-to
 *	deal with hmac_traverse
Ian Murdock's avatar
Ian Murdock committed
35
36
37
38
39
40
 */

#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
41
#include <stdarg.h>
Ian Murdock's avatar
Ian Murdock committed
42
#include <unistd.h>
43
#include <netdb.h>
44
#include <limits.h>
45
46
#include <netinet/in.h>
#include <arpa/inet.h>
47
#include <sys/time.h>
48
49
#include <sys/param.h>
#include <time.h>
50
#include "event.h"
51

52
53
54
55
#ifdef ELVIN_COMPAT
#include <pubsub/elvin_hash.h>
#endif

56
57
58
59
60
61
62
63
64
/* So we can tell with a strings what kind of event library. */
char	emulab_event_library_buildinfo[] =
    "Emulab Event Library: Version " EVENT_LIBRARY_VERSION
#ifdef ELVIN_COMPAT
    " with Elvin Compat";
#else
    "";
#endif

Mike Hibler's avatar
Mike Hibler committed
65
66
67
68
#define ERROR(fmt,...) \
 { fputs(__FUNCTION__,stderr); fprintf(stderr,": " fmt, ## __VA_ARGS__); }
#define INFO(fmt,...) \
 { fputs(__FUNCTION__,stderr); fprintf(stderr,": " fmt, ## __VA_ARGS__); }
69
#ifdef  DEBUG
Mike Hibler's avatar
Mike Hibler committed
70
71
#define TRACE(fmt,...) \
 { fputs(__FUNCTION__,stderr); fprintf(stderr,": " fmt, ## __VA_ARGS__); }
72
#else
73
#define TRACE(fmt,...)
74
#endif
Ian Murdock's avatar
Ian Murdock committed
75

Kirk Webb's avatar
   
Kirk Webb committed
76
77
#define IPADDRFILE "/var/emulab/boot/myip"

78
79
80
static int event_notification_check_hmac(event_handle_t handle,
					  event_notification_t notification);

81
static char hostname[MAXHOSTNAMELEN+1];
82
static char ipaddr[32];
Ian Murdock's avatar
Ian Murdock committed
83

84
85
86
87
88
89
/*
 * Count of how many handles are in use, so that we can avoid cleaning up until
 * the last one is unregistered
 */
static int handles_in_use = 0;

90
91
92
93
94
95
/*
 * Register with the testbed event system.  NAME specifies the name of
 * the event server.  Returns a pointer to a handle that may be passed
 * to other event system routines if the operation is successful, NULL
 * otherwise.
 *
Ian Murdock's avatar
Ian Murdock committed
96
97
98
99
100
101
102
103
104
 * The THREADED parameter should be set to 1 if the registering
 * client is multi-threaded. If THREADED is 1, the event
 * library will call routines that are thread-safe, and event
 * notifications will be dispatched using background threads (i.e.,
 * the client will supply its own event loop). If THREADED is 0, event
 * notifications will be dispatched using an event system-provided
 * event loop, and the client must call event_main after connecting in
 * order to receive notifications.
 *
105
106
107
108
109
110
111
112
113
114
 * Elvin note: NAME is a URL of the form "elvin:/[protocol
 * stack]/[endpoint]", where a protocol stack names a transport
 * module, a security module, and a marshaling module as a comma
 * separated list (e.g., "http,none,xml"), and the endpoint format
 * is dependent on the transport module used.  If no protocol
 * stack is given, the default stack (tcp, none, xdr) is used.  For the
 * testbed's purposes, "elvin://HOSTNAME" should suffice.  If NAME
 * is NULL, then Elvin's server discovery protocol will be used to find
 * the Elvin server.
 */
Ian Murdock's avatar
Ian Murdock committed
115
event_handle_t
116
event_register(char *name, int threaded)
117
118
119
120
121
{
	return event_register_withkeydata(name, threaded, NULL, 0);
}

event_handle_t
Sachin Goyal's avatar
   
Sachin Goyal committed
122
event_register_withkeyfile(char *name, int threaded, char *keyfile) {
123
124
  return event_register_withkeyfile_withretry(name,
					      threaded, keyfile, INT_MAX);
Sachin Goyal's avatar
   
Sachin Goyal committed
125
126
127
128
129
}

event_handle_t
event_register_withkeyfile_withretry(char *name, int threaded, 
				     char *keyfile, int retrycount)
130
131
132
133
134
135
136
137
{
    /* Grab the key data and stick it into the handle. */
    if (keyfile) {
	FILE		*fp;
	unsigned char   buf[2*BUFSIZ];
	int		cc;

	if ((fp = fopen(keyfile, "r")) == NULL) {
138
		ERROR("could not open keyfile: %s\n", keyfile);
139
140
141
		return 0;
	}
	if ((cc = fread(buf, sizeof(char), sizeof(buf), fp)) == 0) {
142
		ERROR("could not read keyfile: %s\n", keyfile);
143
144
145
146
		fclose(fp);
		return 0;
	}
	if (cc == sizeof(buf)) {
147
		ERROR("keyfile is too big: %s\n", keyfile);
148
149
150
151
		fclose(fp);
		return 0;
	}
	fclose(fp);
Sachin Goyal's avatar
   
Sachin Goyal committed
152
153
	return event_register_withkeydata_withretry(name, threaded, 
					  buf, cc, retrycount);
154
    }
Sachin Goyal's avatar
   
Sachin Goyal committed
155
156
    return event_register_withkeydata_withretry(name, threaded, NULL, 
						0, retrycount);
157
158
159
160
}

event_handle_t
event_register_withkeydata(char *name, int threaded,
Sachin Goyal's avatar
   
Sachin Goyal committed
161
162
			   unsigned char *keydata, int keylen){
    return event_register_withkeydata_withretry(name, threaded, keydata,
163
						keylen, INT_MAX);
Sachin Goyal's avatar
   
Sachin Goyal committed
164
165
166
167
168
169
170

}

event_handle_t
event_register_withkeydata_withretry(char *name, int threaded,
			   unsigned char *keydata, int keylen,
			   int retrycount)
Ian Murdock's avatar
Ian Murdock committed
171
{
172
#ifndef __CYGWIN__
173
    extern int pubsub_is_threaded[] __attribute__ ((weak));
174
#endif
175
    
176
    event_handle_t	handle;
177
    pubsub_handle_t    *server;
178
179
    struct hostent     *he;
    struct in_addr	myip;
180
181
    char	       *sstr = 0, *pstr = 0, *cp;
    int			port = PUBSUB_SERVER_PORTNUM;
Ian Murdock's avatar
Ian Murdock committed
182

183
184
185
186
    /*
     * So, this might fail if the hostame is too long, but lets not
     * give up; we can still get our IP on experimental nodes. 
     */
187
    if (gethostname(hostname, sizeof(hostname)) == -1) {
Ian Murdock's avatar
Ian Murdock committed
188
        ERROR("could not get hostname: %s\n", strerror(errno));
189
190
191
192
193
194
195
196
197
198
199
200
        bzero(hostname, sizeof(hostname));
    }

    /*
     * Make sure hostname is qualified, else we could get the
     * IP of an experimental interface. Just clear the hostname
     * so that we use the fallback method below. This should never
     * happen on server nodes (boss, ops, etc) and if it does, we
     * want to fail below. 
     */
    if (! strchr(hostname, '.')) {
        bzero(hostname, sizeof(hostname));
Ian Murdock's avatar
Ian Murdock committed
201
202
    }

203
204
205
206
    /*
     * Get our IP address. Thats how we name ourselves to the
     * Testbed Event System. 
     */
207
208
    if (strlen(hostname) && 
	(he = gethostbyname(hostname)) != NULL) {
Kirk Webb's avatar
   
Kirk Webb committed
209
210
        memcpy((char *)&myip, he->h_addr, he->h_length);
        strcpy(ipaddr, inet_ntoa(myip));
211
212
    }
    else {
213
214
215
216
	unsigned int        o1, o2, o3, o4;
	int                 scanres;
	FILE               *fp;

217
218
219
220
221
222
	if (strlen(hostname)) {
		ERROR("could not get IP address from hostname");
	} else {
		ERROR("hostname not set or not fully qualified");
	}
	fprintf(stderr, ", reading IP from %s.\n", IPADDRFILE);
Kirk Webb's avatar
   
Kirk Webb committed
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
        /* Try getting the node's ID from BOOTDIR/myip before giving up. */
	fp = fopen(IPADDRFILE, "r");
	if (fp != NULL) {
            scanres = fscanf(fp, "%3u.%3u.%3u.%3u", &o1, &o2, &o3, &o4);
	    (void) fclose(fp);
            if (scanres != 4) {
                ERROR("IP address not found on first line of file!\n");
                return 0;
            }
            if (o1 > 255 || o2 > 255 || o3 > 255 || o4 > 255) {
                ERROR("IP address inside file is invalid!\n");
                return 0;
            }
            snprintf(ipaddr, sizeof(ipaddr), "%u.%u.%u.%u", o1, o2, o3, o4);
        } else {
238
            ERROR("could not get IP from local file %s either!\n", IPADDRFILE);
Kirk Webb's avatar
   
Kirk Webb committed
239
240
            return 0;
        }
241
242
    }

243
    TRACE("registering with event system (ipaddr=\"%s\")\n", ipaddr);
Ian Murdock's avatar
Ian Murdock committed
244

245
246
    /* Allocate a handle to be returned to the caller: */
    handle = xmalloc(sizeof(*handle));
247
248
249
250
251
252
253
254
255
    bzero(handle, sizeof(*handle));

    /* Grab the key data and stick it into the handle. */
    if (keydata) {
	handle->keydata = xmalloc(keylen + 1);
	handle->keylen  = keylen;
	memcpy(handle->keydata, keydata, keylen);
	handle->keydata[keylen] = (unsigned char)0;
    }
Ian Murdock's avatar
Ian Murdock committed
256

257
258
259
    /* Set up the interface pointers: */
    handle->connect = pubsub_connect;
    handle->disconnect = pubsub_disconnect;
260
#ifdef THREADED
261
    assert(threaded == 1);
262
#ifndef __CYGWIN__
263
    assert(pubsub_is_threaded != NULL);
264
#endif
265
    handle->mainloop = NULL; /* no mainloop for mt programs */
266
#else
267
    assert(threaded == 0);
268
#ifndef __CYGWIN__
269
    assert(pubsub_is_threaded == NULL);
270
#endif
271
    handle->mainloop = pubsub_mainloop;
272
#endif
273
274
275
    handle->notify = pubsub_notify;
    handle->subscribe = pubsub_add_subscription;
    handle->unsubscribe = pubsub_rem_subscription;
Ian Murdock's avatar
Ian Murdock committed
276

277
278
279
280
    /* XXX parse server and port from "elvin://host:port" */
    cp = strdup(name);
    if (cp) {
      sstr = strrchr(cp, '/');
Ian Murdock's avatar
Ian Murdock committed
281
    }
282
    if (!sstr) {
283
      ERROR("could not parse: %s\n", name);
284
      goto bad;
285
    }
286
287
288
289
290
    *sstr++ = '\0';
    pstr = strrchr(sstr, ':');
    if (pstr) {
	    *pstr++ = '\0';
	    port = atoi(pstr);
Sachin Goyal's avatar
   
Sachin Goyal committed
291
292
    }

293
294
295
296
297
298
299
300
301
302
303
304
305
306
    /* Preallocate a pubsub handle so we can set the retry count */
    if (pubsub_alloc_handle(&server) != 0) {
        ERROR("could not allocate event server handle\n");
	goto bad;
    }

    /* set connection retries */
    if (pubsub_set_connection_retries(server,
				      retrycount, &handle->status) != 0) {
	ERROR("pubsub_set_connection_retries failed\n");
	goto bad;
    }

    /* Connect to the event server */
307
    if (handle->connect(sstr, port, &server) != 0) {
308
        ERROR("could not connect to event server\n");
309
	goto bad;
Ian Murdock's avatar
Ian Murdock committed
310
311
312
313
    }

    handle->server = server;

314
315
316
317
    /*
     * Keep track of how many handles we have outstanding
     */
    handles_in_use++;
318
    free(cp);
Ian Murdock's avatar
Ian Murdock committed
319
    return handle;
320
321
322
323
324

 bad:
    if (handle->keydata)
        free(handle->keydata);
    free(handle);
325
    free(cp);
326
    return 0;
Ian Murdock's avatar
Ian Murdock committed
327
328
}

329
330

/*
Ian Murdock's avatar
Ian Murdock committed
331
 * Unregister with the testbed event system. Returns non-zero if the
332
333
334
 * operation is successful, 0 otherwise.
 */

Ian Murdock's avatar
Ian Murdock committed
335
336
337
338
int
event_unregister(event_handle_t handle)
{
    if (!handle) {
Ian Murdock's avatar
Ian Murdock committed
339
        ERROR("invalid parameter\n");
Ian Murdock's avatar
Ian Murdock committed
340
341
342
        return 0;
    }

343
    TRACE("unregistering with event system (ipaddr=\"%s\")\n", ipaddr);
Ian Murdock's avatar
Ian Murdock committed
344

345
346
347
    /* Disconnect from the server: */
    if (handle->disconnect(handle->server) != 0) {
        ERROR("could not disconnect from Pubsub server\n");
Ian Murdock's avatar
Ian Murdock committed
348
349
350
        return 0;
    }

351
352
353
354
    TRACE("disconnect completed\n");

    handles_in_use--;

355
356
    if (handle->keydata)
        free(handle->keydata);
Ian Murdock's avatar
Ian Murdock committed
357
358
359
360
361
    free(handle);

    return 1;
}

362
363
364
365
366
367
368
369
370
371
372
373
374
375
/*
 * Callback for event_poll timeout that just records that the timeout
 * happened.
 */
static int
timeout_callback(pubsub_handle_t *handle, pubsub_timeout_t *timeout,
		 void *data, pubsub_error_t *error)
{
	assert(data != 0);
	assert(*(int *)data == 0);
	*(int *)data = 1;

	return 0;
}
376

Mike Hibler's avatar
Mike Hibler committed
377
/*
378
379
 * An internal function to handle the two different event_poll calls, without
 * making the library user mess around with arguments they don't care about.
Mike Hibler's avatar
Mike Hibler committed
380
381
 */
int
382
internal_event_poll(event_handle_t handle, int blocking, unsigned int timeout)
Mike Hibler's avatar
Mike Hibler committed
383
{
384
385
	int rv, triggered = 0;
	pubsub_timeout_t *pubsub_timeout = NULL;
Mike Hibler's avatar
Mike Hibler committed
386
387
388
389
390
391

	if (!handle->mainloop) {
		ERROR("multithreaded programs cannot use event_poll\n");
		return 0;
	}

392
393
394
395
396
397
	/*
	 * If the user wants a timeout, set up an elvin timeout now. We just
	 * use a NULL callback, so that it simply causes a timeout, and doesn't
	 * actually do anything.
	 */
	if (timeout) {
398
399
400
401
402
403
404
405
406
		pubsub_timeout = pubsub_add_timeout(handle->server, NULL,
						    timeout,
						    timeout_callback,
						    (void *)&triggered,
						    &handle->status);
		if (!pubsub_timeout) {
			ERROR("Elvin pubsub_sync_add_timeout failed\n");
			pubsub_error_fprintf(stderr, &handle->status);
			return pubsub_error_get_code(&handle->status);
407
408
		}
	}
409
410
411
412
	rv = pubsub_dispatch(handle->server, blocking, &handle->status);
	if (rv != 0) {
		ERROR("Pubsub dispatcher failed\n");
		pubsub_error_fprintf(stderr, &handle->status);
Mike Hibler's avatar
Mike Hibler committed
413
414
	}

415
416
/*	rv = pubsub_error_get_code(&handle->status); */

417
418
419
420
421
422
	/*
	 * Try to remove the timeout - if it didn't go off, we don't want to
	 * hit it later. We don't check the return value, since, if it did go
	 * off (and we don't really have a good way of knowing that), it's not
	 * there any more, so it looks like an error.
	 */
423
424
425
	if (timeout && pubsub_timeout && !triggered)
		pubsub_remove_timeout(handle->server, pubsub_timeout,
				      &handle->status);
426

427
	return rv;
Mike Hibler's avatar
Mike Hibler committed
428
429
}

430
/*
431
432
 * A non-blocking poll of the event system.
 * XXX not an actual poll, rather a "dispatch at most once".
433
434
435
436
437
438
439
440
441
 */
int
event_poll(event_handle_t handle)
{
	return internal_event_poll(handle,0,0);
}

/*
 * A blocking poll of the event system, with an optional timeout
442
 * XXX not an actual poll either, rather a "dispatch for awhile".
443
444
445
446
447
 */
int event_poll_blocking(event_handle_t handle, unsigned int timeout)
{
	return internal_event_poll(handle,1,timeout);
}
Mike Hibler's avatar
Mike Hibler committed
448

449
450
451
452
453
454
/*
 * Enter the main loop of the event system, waiting to receive event
 * notifications. Returns non-zero if the operation is successful, 0
 * otherwise.
 */

Ian Murdock's avatar
Ian Murdock committed
455
456
457
458
int
event_main(event_handle_t handle)
{
    if (!handle) {
Ian Murdock's avatar
Ian Murdock committed
459
        ERROR("invalid parameter\n"); 
Ian Murdock's avatar
Ian Murdock committed
460
461
462
        return 0;
    }

463
464
465
466
467
    if (!handle->mainloop) {
        ERROR("multithreaded programs don't need to call event_main\n");
        return 0;
    }

Timothy Stack's avatar
   
Timothy Stack committed
468
469
470
471
472
    if (handle->do_loop) {
	ERROR("loop is already running\n");
	return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
473
474
    TRACE("entering event loop...\n");

Timothy Stack's avatar
   
Timothy Stack committed
475
    handle->do_loop = 1;
476
    if (handle->mainloop(handle->server, &handle->do_loop, &handle->status)) {
477
        ERROR("Event mainloop failed: ");
478
        pubsub_error_fprintf(stderr, &handle->status);
479
	handle->do_loop = 0;
Ian Murdock's avatar
Ian Murdock committed
480
481
        return 0;
    }
482
    handle->do_loop = 0;
Ian Murdock's avatar
Ian Murdock committed
483
484
485
    return 1;
}

486

Timothy Stack's avatar
   
Timothy Stack committed
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
int event_stop_main(event_handle_t handle)
{
    if (!handle) {
	ERROR("invalid parameter\n");
	return 0;
    }

    if (!handle->mainloop) {
	ERROR("multithreaded programs do not have a mainloop\n");
	return 0;
    }

    if (!handle->do_loop) {
	ERROR("mainloop is not running\n");
	return 0;
    }

    handle->do_loop = 0;
    
    return 1;
}


510
511
512
/*
 * Send the event notification NOTIFICATION.  NOTIFICATION is
 * allocated by event_notification_alloc, and may optionally
513
 * have attributes added to it by event_notification_put_*.
514
 * Returns non-zero if the operation is successful, 0 otherwise.
Ian Murdock's avatar
Ian Murdock committed
515
516
517
518
 *
 * Note that NOTIFICATION is not deallocated by event_notify.  The
 * caller is responsible for deallocating the notification when it
 * is finished with it.
519
520
 */

Ian Murdock's avatar
Ian Murdock committed
521
522
523
int
event_notify(event_handle_t handle, event_notification_t notification)
{
Ian Murdock's avatar
Ian Murdock committed
524
525
    if (!handle || !notification) {
        ERROR("invalid parameter\n");
Ian Murdock's avatar
Ian Murdock committed
526
527
        return 0;
    }
528
529
530
531
    if (handle->keydata && !notification->has_hmac &&
	event_notification_insert_hmac(handle, notification)) {
        return 0;
    }
Ian Murdock's avatar
Ian Murdock committed
532

Ian Murdock's avatar
Ian Murdock committed
533
    TRACE("sending event notification %p\n", notification);
Ian Murdock's avatar
Ian Murdock committed
534
535

    /* Send notification to Elvin server for routing: */
536
537
    if (handle->notify(handle->server, notification->pubsub_notification,
		       &handle->status)) {
538
        ERROR("could not send event notification: ");
539
        pubsub_error_fprintf(stderr, &handle->status);
Ian Murdock's avatar
Ian Murdock committed
540
541
542
543
544
545
        return 0;
    }

    return 1;
}

546

547
548
549
550
551
552
/*
 * Schedule the event notification NOTIFICATION to be sent at time
 * TIME.  NOTIFICATION is allocated by event_notification_alloc,
 * and may optionally have attributes added to it by
 * event_notification_put_*.  Returns non-zero if the operation
 * is successful, 0 otherwise.
Ian Murdock's avatar
Ian Murdock committed
553
554
555
556
 *
 * This function essentially operates as a deferred event_notify.
 * event_notify sends notifications immediately,
 * whereas event_schedule sends notifications at some later time.
Ian Murdock's avatar
Ian Murdock committed
557
558
559
560
 *
 * Note that NOTIFICATION is not deallocated by event_schedule.
 * The caller is responsible for deallocating the notification
 * when it is finished with it.
561
562
563
564
565
566
 */

int
event_schedule(event_handle_t handle, event_notification_t notification,
               struct timeval *time)
{
Ian Murdock's avatar
Ian Murdock committed
567
568
    if (!handle || !notification || !time) {
        ERROR("invalid parameter\n");
569
570
571
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
572
573
    TRACE("scheduling event notification %p to be sent at time (%ld, %ld)\n",
          notification, time->tv_sec, time->tv_usec);
574

575
576
577
578
    /*
     * Add an attribute that signifies its a scheduler operation.
     */
    if (! event_notification_remove(handle, notification, "SCHEDULER") ||
579
580
	! event_notification_put_int32(handle,
				       notification, "SCHEDULER", 1)) {
581
	ERROR("could not add scheduler attribute to notification %p\n",
Ian Murdock's avatar
Ian Murdock committed
582
              notification);
583
584
585
586
587
588
589
590
591
592
        return 0;
    }

    /* Add the time this event should be fired to the notification
       structure: */

    if (event_notification_put_int32(handle, notification, "time_sec",
                                     time->tv_sec)
        == 0)
    {
Ian Murdock's avatar
Ian Murdock committed
593
594
        ERROR("could not add time.tv_sec attribute to notification %p\n",
              notification);
595
596
597
598
599
600
601
        return 0;
    }
    if (event_notification_put_int32(handle, notification, "time_usec",
                                     time->tv_usec)
        == 0)

    {
Ian Murdock's avatar
Ian Murdock committed
602
603
        ERROR("could not add time.tv_usec attribute to notification %p\n",
              notification);
604
605
606
607
608
609
610
        return 0;
    }

    /* Send the event notification: */
    return event_notify(handle, notification);
}

611
/*
612
613
614
 * Allocate an event notification.  The address TUPLE defines who
 * should receive the notification. Returns a pointer to an event
 * notification structure if the operation is successful, 0 otherwise.
615
616
 */

Ian Murdock's avatar
Ian Murdock committed
617
event_notification_t
618
event_notification_alloc(event_handle_t handle, address_tuple_t tuple)
Ian Murdock's avatar
Ian Murdock committed
619
{
620
    event_notification_t notification;
621
    pubsub_notification_t *pubsub_notification;
Ian Murdock's avatar
Ian Murdock committed
622

623
    if (!handle) {
Ian Murdock's avatar
Ian Murdock committed
624
        ERROR("invalid paramater\n");
Ian Murdock's avatar
Ian Murdock committed
625
626
627
        return NULL;
    }

628
    TRACE("allocating notification (tuple=%p)\n", tuple);
Ian Murdock's avatar
Ian Murdock committed
629

Timothy Stack's avatar
   
Timothy Stack committed
630
    notification = xmalloc(sizeof(struct event_notification));
631
632
633
634
635
    pubsub_notification = pubsub_notification_alloc(handle->server,
						    &handle->status);
    if (pubsub_notification == NULL) {
        ERROR("pubsub_notification_alloc failed: ");
        pubsub_error_fprintf(stderr, &handle->status);
Ian Murdock's avatar
Ian Murdock committed
636
637
        return NULL;
    }
638
    notification->pubsub_notification = pubsub_notification;
639
640
    notification->has_hmac = 0;

641
642
643
644
645
646
647
648
649
    /*
     * Event version number
     */
    if (!event_notification_set_version(handle, notification,
					EVENT_LIBRARY_VERSION)) {
        ERROR("pubsub_notification_alloc failed to set version number\n");
	event_notification_free(handle, notification);
        return NULL;
    }
650
651
    if (tuple == NULL)
	    return notification;
Ian Murdock's avatar
Ian Murdock committed
652

Ian Murdock's avatar
Ian Murdock committed
653
    TRACE("allocated notification %p\n", notification);
654
655
656
657
658
659
#define EVPUT(name, field) \
({ \
	char *foo = (tuple->field ? tuple->field : ADDRESSTUPLE_ALL); \
	\
	event_notification_put_string(handle, notification, name, foo); \
})
660

661
662
663
664
665
666
667
668
    /* Add the target address stuff to the notification */
    if (!EVPUT("SITE", site) ||
	!EVPUT("EXPT", expt) ||
	!EVPUT("GROUP", group) ||
	!EVPUT("HOST", host) ||
	!EVPUT("OBJTYPE", objtype) ||
	!EVPUT("OBJNAME", objname) ||
	!EVPUT("EVENTTYPE", eventtype) ||
Timothy Stack's avatar
   
Timothy Stack committed
669
	!EVPUT("TIMELINE", timeline) ||
670
671
672
	!event_notification_put_int32(handle,
				      notification, "SCHEDULER",
				      tuple->scheduler)) {
673
	ERROR("could not add attributes to notification %p\n", notification);
Ian Murdock's avatar
Ian Murdock committed
674
675
        return NULL;
    }
676
677
678
    /* Add our address */
    event_notification_set_sender(handle, notification, ipaddr);

Ian Murdock's avatar
Ian Murdock committed
679
680
681
    return notification;
}

682
683

/*
Ian Murdock's avatar
Ian Murdock committed
684
 * Free the event notification NOTIFICATION. Returns non-zero if the
685
686
687
 * operation is successful, 0 otherwise.
 */

Ian Murdock's avatar
Ian Murdock committed
688
689
int
event_notification_free(event_handle_t handle,
690
			event_notification_t notification)
Ian Murdock's avatar
Ian Murdock committed
691
{
Timothy Stack's avatar
   
Timothy Stack committed
692
693
694
695
    if (!notification) {
	return 1;
    }
    
696
    if (!handle || !notification->pubsub_notification) {
Ian Murdock's avatar
Ian Murdock committed
697
        ERROR("invalid parameter\n");
Ian Murdock's avatar
Ian Murdock committed
698
699
700
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
701
    TRACE("freeing notification %p\n", notification);
Ian Murdock's avatar
Ian Murdock committed
702

703
704
    pubsub_notification_free(handle->server, notification->pubsub_notification,
			     &handle->status);
705
    free(notification);
Ian Murdock's avatar
Ian Murdock committed
706
707
708
709

    return 1;
}

710
711
712
713
714
715
716
717
718
719
/*
 * Clones (copies) the event notificaion. Returns the copy if successful,
 * or NULL if it is not.
 */
event_notification_t
event_notification_clone(event_handle_t handle,
			 event_notification_t notification)
{
    event_notification_t clone;

720
    if (!handle || !notification || !notification->pubsub_notification) {
721
722
723
724
725
726
        ERROR("invalid parameter\n");
        return 0;
    }

    TRACE("cloning notification %p\n", notification);

Timothy Stack's avatar
   
Timothy Stack committed
727
    clone = xmalloc(sizeof(struct event_notification));
728
729
730
731
732
733
    if (! (clone->pubsub_notification =
	   pubsub_notification_clone(handle->server,
				     notification->pubsub_notification,
				     &handle->status))) {
        ERROR("pubsub_notification_clone failed: ");
        pubsub_error_fprintf(stderr, &handle->status);
734
	free(clone);
735
736
        return 0;
    }
737
    clone->has_hmac = notification->has_hmac;
738
739
740
741

    return clone;
}

742
743

/*
744
745
746
747
 * Get the double attribute with name NAME from the event
 * notification NOTIFICATION.
 * Writes the value of the attribute to *VALUE and returns
 * non-zero if the named attribute is found, 0 otherwise.
748
749
 */

Ian Murdock's avatar
Ian Murdock committed
750
int
751
752
753
event_notification_get_double(event_handle_t handle,
                              event_notification_t notification,
                              char *name, double *value)
Ian Murdock's avatar
Ian Murdock committed
754
{
Ian Murdock's avatar
Ian Murdock committed
755
756
    if (!handle || !notification || !name || !value) {
        ERROR("invalid parameter\n");
757
758
759
        return 0;
    }

760
761
    if (pubsub_notification_get_real64(notification->pubsub_notification,
				       name, value, &handle->status) != 0) {
Ian Murdock's avatar
Ian Murdock committed
762
        ERROR("could not get double attribute \"%s\" from notification %p\n",
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
              name, notification);
        return 0;
    }

    return 1;
}


/*
 * Get the int32 attribute with name NAME from the event
 * notification NOTIFICATION.
 * Writes the value of the attribute to *VALUE and returns
 * non-zero if the named attribute is found, 0 otherwise.
 */

int
event_notification_get_int32(event_handle_t handle,
                             event_notification_t notification,
                             char *name, int32_t *value)
{
Ian Murdock's avatar
Ian Murdock committed
783
784
    if (!handle || !notification || !name || !value) {
        ERROR("invalid parameter\n");
785
786
787
        return 0;
    }

788
789
    if (pubsub_notification_get_int32(notification->pubsub_notification,
				      name, value, &handle->status) != 0) {
Ian Murdock's avatar
Ian Murdock committed
790
        ERROR("could not get int32 attribute \"%s\" from notification %p\n",
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
              name, notification);
        return 0;
    }

    return 1;
}


/*
 * Get the int64 attribute with name NAME from the event
 * notification NOTIFICATION.
 * Writes the value of the attribute to *VALUE and returns
 * non-zero if the named attribute is found, 0 otherwise.
 */

int
event_notification_get_int64(event_handle_t handle,
                             event_notification_t notification,
                             char *name, int64_t *value)
{
Ian Murdock's avatar
Ian Murdock committed
811
812
    if (!handle || !notification || !name || !value) {
        ERROR("invalid parameter\n");
813
814
815
        return 0;
    }

816
817
    if (pubsub_notification_get_int64(notification->pubsub_notification,
				      name, value, &handle->status) != 0) {
Ian Murdock's avatar
Ian Murdock committed
818
        ERROR("could not get int64 attribute \"%s\" from notification %p\n",
819
820
821
822
823
824
825
826
              name, notification);
        return 0;
    }

    return 1;
}


827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
/*
 * Return the length of a attribute with name NAME.
 * Used to dynamically size buffers for the event_notification_get_* calls.
 * Returns the length or -1 on error.
 *
 * Note that we only do this for opaques and strings as the other types
 * all have a "standard" size.
 */

int
event_notification_get_opaque_length(event_handle_t handle,
				     event_notification_t notification,
				     char *name)
{
    char *v;
    int len;

    if (!handle || !notification || !name) {
        ERROR("invalid parameter\n");
        return -1;
    }

    if (pubsub_notification_get_opaque(notification->pubsub_notification,
				       name, &v, &len, &handle->status) != 0) {
        ERROR("could not get opaque attribute \"%s\" from notification %p\n",
              name, notification);
        return -1;
    }

    return len;
}

int
event_notification_get_string_length(event_handle_t handle,
				     event_notification_t notification,
				     char *name)
{
    char *v;

    if (!handle || !notification || !name) {
        ERROR("invalid parameter\n");
        return -1;
    }

    if (pubsub_notification_get_string(notification->pubsub_notification,
				       name, &v, &handle->status) != 0) {
        ERROR("could not get string attribute \"%s\" from notification %p\n",
              name, notification);
        return -1;
    }

    return strlen(v);
}

881
882
883
/*
 * Get the opaque attribute with name NAME from the event
 * notification NOTIFICATION.
Ian Murdock's avatar
Ian Murdock committed
884
885
 * Writes LENGTH bytes into *BUFFER and returns non-zero if the named
 * attribute is found, 0 otherwise.
886
887
888
889
890
891
892
 */

int
event_notification_get_opaque(event_handle_t handle,
                              event_notification_t notification,
                              char *name, void *buffer, int length)
{
893
894
    char *v;
    int len;
895

Ian Murdock's avatar
Ian Murdock committed
896
897
    if (!handle || !notification || !name || !buffer || !length) {
        ERROR("invalid parameter\n");
898
899
900
        return 0;
    }

901
902
    if (pubsub_notification_get_opaque(notification->pubsub_notification,
				       name, &v, &len, &handle->status) != 0) {
Ian Murdock's avatar
Ian Murdock committed
903
        ERROR("could not get opaque attribute \"%s\" from notification %p\n",
904
905
906
907
              name, notification);
        return 0;
    }

908
909
910
911
912
913
    if (len < length) {
	memcpy(buffer, v, len);
	memset(buffer+len, 0, length-len);
    } else {
	memcpy(buffer, v, length);
    }
914
915
916
917
918
919
920
921

    return 1;
}


/*
 * Get the string attribute with name NAME from the event
 * notification NOTIFICATION.
922
 * Writes up to LENGTH bytes into *BUFFER and returns non-zero if the named
Ian Murdock's avatar
Ian Murdock committed
923
 * attribute is found, 0 otherwise.
924
 * The returned value will always be null terminated.
925
926
927
928
929
930
931
 */

int
event_notification_get_string(event_handle_t handle,
                              event_notification_t notification,
                              char *name, char *buffer, int length)
{
932
    char *v;
933

Ian Murdock's avatar
Ian Murdock committed
934
935
    if (!handle || !notification || !name || !buffer || !length) {
        ERROR("invalid parameter\n");
936
937
938
        return 0;
    }

939
940
    if (pubsub_notification_get_string(notification->pubsub_notification,
				       name, &v, &handle->status) != 0) {
941
	buffer[0] = '\0';
942
943
944
        return 0;
    }

945
    strncpy(buffer, v, length);
946

947
948
949
950
951
952
953
954
955
956
957
    /*
     * We never documented whether returned strings for values whose
     * length is >= "length" are null-terminated or not.  Previously,
     * there were not, but most of our callers never checked.  Hence,
     * as of 1/25/11 we force null termination.
     *
     * Note that if someone passes a bogus length value, this will
     * now blow up where it formerly might not have.
     */
    buffer[length-1] = '\0';

958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
    return 1;
}


/*
 * Add a double attribute with name NAME and value VALUE to the
 * notification NOTIFICATION.
 * Returns non-zero if the operation is successful, 0 otherwise.
 */

int
event_notification_put_double(event_handle_t handle,
                              event_notification_t notification,
                              char *name, double value)
{
Ian Murdock's avatar
Ian Murdock committed
973
974
    if (!handle || !notification || !name) {
        ERROR("invalid parameter\n");
975
976
977
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
978
979
    TRACE("adding attribute (name=\"%s\", value=%f) to notification %p\n",
          name, value, notification);
980

981
982
    if (pubsub_notification_add_real64(notification->pubsub_notification,
				       name, value, &handle->status) != 0)
983
    {
984
985
        ERROR("pubsub_notification_add_real64 failed: ");
        pubsub_error_fprintf(stderr, &handle->status);
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
        return 0;
    }
    return 1;
}


/*
 * Add an int32 attribute with name NAME and value VALUE to the
 * notification NOTIFICATION.
 * Returns non-zero if the operation is successful, 0 otherwise.
 */

int
event_notification_put_int32(event_handle_t handle,
                             event_notification_t notification,
1001
                             char *name, int value)
1002
{
Ian Murdock's avatar
Ian Murdock committed
1003
1004
    if (!handle || !notification || !name) {
        ERROR("invalid parameter\n");
1005
1006
1007
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
1008
1009
    TRACE("adding attribute (name=\"%s\", value=%d) to notification %p\n",
          name, value, notification);
1010

1011
1012
    if (pubsub_notification_add_int32(notification->pubsub_notification,
				      name, value, &handle->status) != 0)
1013
    {
1014
1015
        ERROR("pubsub_notification_add_int32 failed: ");
        pubsub_error_fprintf(stderr, &handle->status);
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
        return 0;
    }

    return 1;
}


/*
 * Add an int64 attribute with name NAME and value VALUE to the
 * notification NOTIFICATION.
 * Returns non-zero if the operation is successful, 0 otherwise.
 */

int
event_notification_put_int64(event_handle_t handle,
                             event_notification_t notification,
                             char *name, int64_t value)
{
Ian Murdock's avatar
Ian Murdock committed
1034
1035
    if (!handle || !notification || !name) {
        ERROR("invalid parameter\n");
1036
1037
1038
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
1039
1040
    TRACE("adding attribute (name=\"%s\", value=%lld) to notification %p\n",
          name, value, notification);
1041

1042
1043
    if (pubsub_notification_add_int64(notification->pubsub_notification,
				      name, value, &handle->status) != 0)
1044
    {
1045
1046
        ERROR("pubsub_notification_add_int64 failed: ");
        pubsub_error_fprintf(stderr, &handle->status);
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
        return 0;
    }

    return 1;
}


/*
 * Add an opaque attribute with name NAME to the notification
 * NOTIFICATION. The attribute is stored in the buffer BUFFER
 * with length LENGTH.
 * Returns non-zero if the operation is successful, 0 otherwise.
 */

int
event_notification_put_opaque(event_handle_t handle,
                              event_notification_t notification,
                              char *name, void *buffer, int length)
{
Ian Murdock's avatar
Ian Murdock committed
1066
1067
    if (!handle || !notification || !buffer || !length) {
        ERROR("invalid parameter\n");
1068
1069
1070
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
1071
1072
    TRACE("adding attribute (name=\"%s\", value=<opaque>) "
          "to notification %p\n", name, notification);
1073

1074
1075
1076
    if (pubsub_notification_add_opaque(notification->pubsub_notification,
				       name, buffer, length,
				       &handle->status) != 0)
1077
    {
1078
1079
        ERROR("pubsub_notification_add_opaque failed: ");
        pubsub_error_fprintf(stderr, &handle->status);
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
        return 0;
    }

    return 1;
}


/*
 * Add a string attribute with name NAME and value VALUE to the
 * notification NOTIFICATION.
 * Returns non-zero if the operation is successful, 0 otherwise.
 */

int
event_notification_put_string(event_handle_t handle,
                              event_notification_t notification,
                              char *name, char *value)
{
Ian Murdock's avatar
Ian Murdock committed
1098
1099
    if (!handle || !notification || !name || !value) {
        ERROR("invalid parameter\n");
1100
1101
1102
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
1103
1104
    TRACE("adding attribute (name=\"%s\", value=\"%s\") to notification %p\n",
          name, value, notification);
1105

1106
1107
    if (pubsub_notification_add_string(notification->pubsub_notification,
				       name, value, &handle->status) != 0)
1108
    {
1109
1110
        ERROR("pubsub_notification_add_string failed: ");
        pubsub_error_fprintf(stderr, &handle->status);
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
        return 0;
    }

    return 1;
}


/*
 * Remove the attribute with name NAME and type TYPE from the event
 * notification NOTIFICATION.  Returns non-zero if the operation is
 * successful, 0 otherwise.
 */

int
event_notification_remove(event_handle_t handle,
                          event_notification_t notification, char *name)
{
Ian Murdock's avatar
Ian Murdock committed
1128
1129
    if (!handle || !notification || !name) {
        ERROR("invalid parameter\n");
1130
1131
1132
        return 0;
    }

Ian Murdock's avatar
Ian Murdock committed
1133
1134
    TRACE("removing attribute \"%s\" from notification %p\n",
          name, notification);
1135

1136
1137
1138
1139
    if (pubsub_notification_remove(notification->pubsub_notification,
				   name, &handle->status) != 0) {
        ERROR("pubsub_notification_remove of %s failed: ", name);
        pubsub_error_fprintf(stderr, &handle->status);
1140
        return 0;
Ian Murdock's avatar
Ian Murdock committed
1141
1142
1143
1144
1145
    }

    return 1;
}

1146

Ian Murdock's avatar
Ian Murdock committed
1147
1148
1149
struct notify_callback_arg {
    event_notify_callback_t callback;
    void *data;
1150
    event_handle_t handle;
Timothy Stack's avatar
   
Timothy Stack committed
1151
    int do_auth;
Ian Murdock's avatar
Ian Murdock committed
1152
1153
};

1154
1155
1156
1157
static void notify_callback(pubsub_handle_t *server,
                            pubsub_subscription_t *subscription,
                            pubsub_notification_t *notification,
			    void *rock);
Ian Murdock's avatar
Ian Murdock committed
1158

1159
1160
1161
1162
1163
1164
struct subscription_callback_arg {
    event_subscription_callback_t callback;
    void *data;
    event_handle_t handle;
};

1165
static void subscription_callback(pubsub_handle_t *server,
1166
				  int result,
1167
				  pubsub_subscription_t *subscription,
1168
				  void *rock, pubsub_error_t *myerror);
1169

1170
#define EXPRESSION_LENGTH 8192
Ian Murdock's avatar
Ian Murdock committed
1171

1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
/*
 * Subscribe to events of type TYPE.  Event notifications that match
 * TYPE will be passed to the callback function CALLBACK; DATA is
 * an arbitrary pointer that will be passed to the callback function.
 * Callback functions are of the form
 *
 *     void callback(event_handle_t handle,
 *                   event_notification_t notification,
 *                   void *data);
 *
Ian Murdock's avatar
Ian Murdock committed
1182
 * where HANDLE is the handle to the event server, NOTIFICATION is the
1183
 * event notification, and DATA is the arbitrary pointer passed to
1184
1185
1186
 * event_subscribe.  Returns a pointer to an event
 * subscription structure if the operation is successful, 0 otherwise.
 */
1187

1188
1189
1190
1191
1192
1193
1194
1195
1196
/*
 * This routine takes a "FOO,BAR" string and breaks it up into
 * separate (TAG==FOO || TAG==BAR) clauses.
 */
static int
addclause(char *tag, char *clause, char *exp, int size, int *index)
{
	int	count = 0;
	char	*bp;
1197
1198
	char    clausecopy[EXPRESSION_LENGTH], *strp = clausecopy;
	char	buf[EXPRESSION_LENGTH];
1199
#if 0
1200
	int     needglob = 1;
1201
#endif
1202
1203

	/* Must copy clause since we use strsep! */
1204
1205
	if (strlen(clause) >= sizeof(clausecopy)-1)
		goto bad;
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
	strcpy(clausecopy, clause);

	/*
	 * Build expression of "or" statements.
	 */
	while (strp && count < sizeof(buf)) {
		bp = strsep(&strp, " ,");

		/* Empty token (two delimiters next to each other) */
		if (! *bp)
			continue;
1217

1218
#if 0
1219
1220
		if (! strcmp("*", bp))
			needglob = 0;
1221
#endif
1222
1223
1224
1225
1226
1227
1228
		
		count += snprintf(&buf[count], sizeof(buf) - count,
				  "%s %s == \"%s\" ",
				  (count ? "||" : ""), tag, bp);
	}
	if (strp || count >= sizeof(buf))
		goto bad;
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
#if 0
	if (needglob) {
		count += snprintf(&buf[count], sizeof(buf) - count,
				  "%s %s == \"*\" ",
				  (count ? "||" : ""), tag);

		if (count >= size)
			goto bad;
	}
#endif
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
	/*
	 * And wrap in parens (add an "and" if not the first clause).
	 */
	count = snprintf(exp, size, "%s (%s) ", (*index ? "&&" : ""), buf);
	if (count >= size)
		goto bad;
	
	*index += count;
	return 1;
 bad:
	ERROR("Ran out of room for subscription clause: %s %s\n", tag, clause);
	return 0;	
}

1253
1254
static char *
tuple_expression(address_tuple_t tuple, char *expression, int elen)
Timothy Stack's avatar
   
Timothy Stack committed
1255
{
1256
    char *retval = expression;
1257
    int index = 0;
Ian Murdock's avatar
Ian Murdock committed
1258

1259
1260
    if (tuple->site &&
	! addclause("SITE", tuple->site,
1261
		    &expression[index], elen - index, &index))