Commit b30e4f95 authored by Leigh B. Stoller's avatar Leigh B. Stoller
Browse files

Add some small stuff to support the current implementation of the plab

evproxy, which uses set_failover and set_connection_retries. The event
library had these as noops, so that was an easy change. Also, add back
in the async add/remove subscription stuff, which was already
implemented in pubsub but not hooked up from the event library.

In the tmcd/plab directory I purged all mention of elvin and changed it
to "event server". I also renamed the runelvin script to runevents.

In events/proxy I cleaned up the makefile and added evproxyplab to the
targets list since it should now build okay (no longer needs to link
against elvin stuff). Renamed elvindtest program to eventping, and
otherwise purged lots of "elvin" tokens.
parent 5756edfb
......@@ -15,8 +15,6 @@
* make sure handle->status (and error args in general) is correct.
* make sure _t types are passed as pointers-to
* deal with hmac_traverse
* implement async_subscribe/unsubscribe in the pubsub code
* implement set_idle_period and set_failover in pubsub code
*/
#include <stdio.h>
......@@ -25,6 +23,7 @@
#include <string.h>
#include <unistd.h>
#include <netdb.h>
#include <limits.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
......@@ -94,7 +93,8 @@ event_register(char *name, int threaded)
event_handle_t
event_register_withkeyfile(char *name, int threaded, char *keyfile) {
return event_register_withkeyfile_withretry(name, threaded, keyfile, -1);
return event_register_withkeyfile_withretry(name,
threaded, keyfile, INT_MAX);
}
event_handle_t
......@@ -133,7 +133,7 @@ event_handle_t
event_register_withkeydata(char *name, int threaded,
unsigned char *keydata, int keylen){
return event_register_withkeydata_withretry(name, threaded, keydata,
keylen, -1);
keylen, INT_MAX);
}
......@@ -236,9 +236,22 @@ event_register_withkeydata_withretry(char *name, int threaded,
port = atoi(pstr);
}
/* Connect to the elvin server: */
/* Preallocate a pubsub handle so we can set the retry count */
if (pubsub_alloc_handle(&server) != 0) {
ERROR("could not allocate event server handle\n");
goto bad;
}
/* set connection retries */
if (pubsub_set_connection_retries(server,
retrycount, &handle->status) != 0) {
ERROR("pubsub_set_connection_retries failed\n");
goto bad;
}
/* Connect to the event server */
if (handle->connect(sstr, port, &server) != 0) {
ERROR("could not connect to Elvin server\n");
ERROR("could not connect to event server\n");
goto bad;
}
......@@ -407,7 +420,7 @@ event_main(event_handle_t handle)
handle->do_loop = 1;
if (handle->mainloop(handle->server, &handle->do_loop, &handle->status)) {
ERROR("Elvin mainloop failed: ");
ERROR("Event mainloop failed: ");
pubsub_error_fprintf(stderr, &handle->status);
return 0;
}
......@@ -1024,7 +1037,7 @@ struct subscription_callback_arg {
static void subscription_callback(pubsub_handle_t *server,
int result,
pubsub_subscription_t *subscription,
void *rock);
void *rock, pubsub_error_t *myerror);
#define EXPRESSION_LENGTH 8192
......@@ -1245,21 +1258,13 @@ event_async_subscribe(event_handle_t handle, event_notify_callback_t callback,
sarg->data = scb_data;
sarg->handle = handle;
#if 1
ERROR("async_add_subscribe not implemented\n");
retval = -1;
#else
retval = pubsub_async_add_subscription(handle->server,
retval = pubsub_add_subscription_async(handle->server,
expression,
NULL,
1,
notify_callback,
arg,
subscription_callback,
sarg,
&handle->status);
#endif
if (retval != 0) {
free(arg);
free(sarg);
......@@ -1280,16 +1285,10 @@ event_async_unsubscribe(event_handle_t handle, event_subscription_t es)
/* free(es->rock);
es->rock = NULL; */
#if 1
ERROR("async_add_subscribe not implemented\n");
retval = -1;
#else
retval = pubsub_async_delete_subscription(handle->server,
es,
NULL,
NULL,
&handle->status);
#endif
retval = pubsub_rem_subscription_async(handle->server, es,
NULL, NULL,
&handle->status);
return (retval == 0);
}
......@@ -1371,7 +1370,7 @@ static void
subscription_callback(pubsub_handle_t *server,
int result,
pubsub_subscription_t *subscription,
void *rock)
void *rock, pubsub_error_t *myerror)
{
struct subscription_callback_arg *arg =
(struct subscription_callback_arg *) rock;
......@@ -2133,18 +2132,11 @@ int event_set_idle_period(event_handle_t handle, int seconds) {
ERROR("invalid parameter\n");
return 0;
}
#if 1
ERROR("event_set_idle_period not implemented\n");
retval = -1;
#else
retval = pubsub_handle_set_idle_period(handle->server, seconds,
&handle->status);
if (retval == 0) {
retval = pubsub_set_idle_period(handle->server, seconds, &handle->status);
if (retval != 0) {
ERROR("could not set elvin idle period to %i", seconds);
pubsub_error_fprintf(stderr, &handle->status);
}
#endif
return retval;
}
......@@ -2156,17 +2148,10 @@ int event_set_failover(event_handle_t handle, int dofail) {
ERROR("invalid parameter\n");
return 0;
}
#if 1
ERROR("event_set_failover not implemented\n");
retval = -1;
#else
retval = pubsub_handle_set_failover(handle->server, dofail,
&handle->status);
if (retval == 0) {
retval = pubsub_set_failover(handle->server, dofail, &handle->status);
if (retval != 0) {
ERROR("Could not set failover on event handle: ");
pubsub_error_fprintf(stderr, &handle->status);
}
#endif
return retval;
}
......@@ -10,7 +10,7 @@ OBJDIR = ../..
SUBDIR = event/proxy
SYSTEM := $(shell uname -s)
PROGRAMS = evproxy proxytest elvindtest
PROGRAMS = evproxy proxytest eventping evproxyplab
ifneq ($(SYSTEM),Linux)
PROGRAMS +=
endif
......@@ -23,6 +23,7 @@ include $(TESTBED_SRCDIR)/GNUmakerules
LIBTBDIR = $(OBJDIR)/lib/libtb
LIBEVENTDIR = ../lib
CPPC = g++
# Rules to make sure that some libraries we need from other directories get
# built first
......@@ -43,10 +44,14 @@ PFLAGS += -O2 -g -Wall
PFLAGS += -I. -I${OBJDIR} -I$(SRCDIR)/../lib -I$(TESTBED_SRCDIR)/lib/libtb
PFLAGS += -L/usr/local/lib -lpubsub_r -lm
PLDFLAGS += -L$(OBJDIR)/lib -L$(OBJDIR)/lib/libtb -L../lib
PLIBS = -levent_r -ltb -lcrypto
PLIBS += -L/usr/local/lib -lpubsub_r -lm
PLDFLAGS += $(LDSTATIC)
LDFLAGS += -L$(OBJDIR)/lib -L$(OBJDIR)/lib/libtb -L../lib
LIBS += -levent_r -ltb -lcrypto
LIBS += -L/usr/local/lib -lpubsub_r -lm
CPPC = g++
LIBS = -levent -ltb -lcrypto
LIBS += -L/usr/local/lib -lpubsub -lm
LDFLAGS += $(LDSTATIC)
# Deal with the presence/absence of kerberos in the linux ssl library
......@@ -59,45 +64,30 @@ endif
ifeq ($(NOKERB),0)
CFLAGS += `/usr/kerberos/bin/krb5-config --cflags`
LIBS += `/usr/kerberos/bin/krb5-config --libs krb5`
PLIBS += `/usr/kerberos/bin/krb5-config --libs krb5`
ifneq ($(wildcard /usr/lib/libkrb5support.a),)
LIBS += -lkrb5support
PLIBS += -lkrb5support
endif
endif
endif
ifeq ($(SYSTEM),Linux)
LIBS += -ldl -lz
PLIBS += -ldl -lz
endif
PLDFLAGS = -pthread
PLDFLAGS += -L../lib -L${OBJDIR}/lib/libtb
PLIBS = -levent_r -ltb
PLIBS += -L/usr/local/lib -lvin4mt -lvin4c -lvin4 -lcrypto -lm
evproxy-debug: evproxy.o $(LIBTBDIR)/log.o
$(CC) $(LDFLAGS) -o $@ -pthread evproxy.o $(LIBS)
evproxyclient-debug: proxyclient.o $(LIBTBDIR)/log.o
$(CC) $(LDFLAGS) -o $@ proxyclient.o $(LIBS)
evproxyserver-debug: proxyserver.o $(LIBTBDIR)/log.o
$(CC) $(PLDFLAGS) -o $@ proxyserver.o $(PLIBS)
proxyserver.o: proxyserver.c
$(CC) $(PFLAGS) -c $<
proxytest-debug: proxytest.o $(LIBTBDIR)/log.o
$(CC) $(LDFLAGS) -o $@ proxytest.o -pthread $(LIBS)
$(CC) $(PLDFLAGS) -o $@ -pthread evproxy.o $(PLIBS)
evproxyplab.o: evproxyplab.cc
$(CPPC) $(CFLAGS) -c $<
evproxyplab: evproxyplab.o ../lib/libevent.a ../lib/event.h
$(CPPC) $(LDFLAGS) -o $@ evproxyplab.o $(LIBS)
elvindtest-debug: elvindtest.o
$(CC) $(LDFLAGS) -o $@ elvindtest.o -pthread $(LIBS)
evproxyplab-debug: evproxyplab.o ../lib/libevent.a ../lib/event.h
$(CPPC) $(LDFLAGS) -static -o $@ evproxyplab.o $(LIBS)
eventping-debug: eventping.o
$(CC) $(LDFLAGS) -static -o $@ eventping.o -pthread $(LIBS)
$(PROGRAMS): ../lib/libevent.a ../lib/event.h
......
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2003, 2004 University of Utah and the Flux Group.
* Copyright (c) 2007 University of Utah and the Flux Group.
* All rights reserved.
*/
......
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2003, 2004 University of Utah and the Flux Group.
* Copyright (c) 2003, 2004, 2007 University of Utah and the Flux Group.
* All rights reserved.
*/
......@@ -14,6 +14,7 @@
#include <time.h>
#include <math.h>
#include <paths.h>
#include <sys/errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
......@@ -57,12 +58,9 @@ sched_callback(event_handle_t handle,
static void subscribe_callback(event_handle_t handle, int result,
event_subscription_t es, void *data);
static void async_callback(event_handle_t handle, int result,
event_subscription_t es, void *data);
static void status_callback(elvin_handle_t handle, char *url,
elvin_status_event_t event, void *data,
elvin_error_t error);
static void status_callback(pubsub_handle_t *handle,
pubsub_status_t status, void *data,
pubsub_error_t *error);
static void schedule_updateevent();
......@@ -120,7 +118,7 @@ main(int argc, char **argv)
usage(progname);
if ((! pnodeid) || (! lport))
fatal("Must provide pnodeid and local elvin port");
fatal("Must provide pnodeid and local event server port");
if (debug) {
......@@ -198,7 +196,7 @@ main(int argc, char **argv)
}
/*
* Setup local elvind subscriptions:
* Setup local subscriptions:
*/
tuple = address_tuple_alloc();
......@@ -210,7 +208,7 @@ main(int argc, char **argv)
fatal("could not subscribe to events on local server");
}
info("Successfully connected to local elvind.\n");
info("Successfully connected to local pubsubd.\n");
/*
* Stash the pid away.
......@@ -244,13 +242,13 @@ main(int argc, char **argv)
"Sleeping for a bit before trying again...\n");
sleep(10);
}
info("Remote elvind registration complete.\n");
info("Remote pubsub registration complete.\n");
/* Jump into the main event loop. */
event_main(bosshandle);
/*
* If we drop out of the event loop, it's because there was/is
* some kind of problem with the connection to the remote elvind.
* So, clean up and re-register (re-subscribe).
* some kind of problem with the connection to the remote event
* server. So, clean up and re-register (re-subscribe).
*/
error("exited event_main: retrying remote registration.\n");
event_unregister(bosshandle);
......@@ -289,8 +287,9 @@ int do_remote_register(char *server) {
event_set_failover(bosshandle, 0);
/* Setup a status callback to watch the remote connection. */
if (!elvin_handle_set_status_cb(bosshandle->server, status_callback,
server, bosshandle->status)) {
if (pubsub_set_status_callback(bosshandle->server,
status_callback,
server, &bosshandle->status) != 0) {
error("Could not register status callback!");
}
......@@ -395,24 +394,6 @@ callback(event_handle_t handle, event_notification_t notification, void *data)
if (! retval) {
error("could not subscribe to events on remote server.\n");
}
#if 0
/* This stuff is borked right now - part of the pre-staging
* enhancement, but the code was failing to properly track
* (and hence unsubscribe) from both the standard experiment
* path, and the pre-stage path.
*/
tuple->scheduler = 2;
retval = event_async_subscribe(bosshandle,
expt_callback, tuple, NULL,
async_callback,
NULL, 1);
if (! retval) {
error("could not subscribe to events on remote server.\n");
}
#endif
info("Subscribing to experiment: %s\n", expt);
address_tuple_free(tuple);
......@@ -436,7 +417,7 @@ callback(event_handle_t handle, event_notification_t notification, void *data)
if (!success) {
error("not able to delete the subscription.\n");
elvin_error_fprintf(stderr, handle->status);
pubsub_error_fprintf(stderr, &handle->status);
} else {
exptmap.erase(key);
info("Unsubscribing from experiment: %s\n", expt);
......@@ -461,7 +442,7 @@ callback(event_handle_t handle, event_notification_t notification, void *data)
if (!success) {
error("not able to delete the subscription.\n");
elvin_error_fprintf(stderr, handle->status);
pubsub_error_fprintf(stderr, &handle->status);
}
}
......@@ -486,7 +467,6 @@ expt_callback(event_handle_t handle, event_notification_t notification, void *da
char objecttype[TBDB_FLEN_EVOBJTYPE];
char objectname[TBDB_FLEN_EVOBJNAME];
char expt[TBDB_FLEN_PID + TBDB_FLEN_EID + 1];
int plabsched = 0;
event_notification_get_objtype(handle,
notification, objecttype, sizeof(objecttype));
......@@ -499,24 +479,11 @@ expt_callback(event_handle_t handle, event_notification_t notification, void *da
}
if (strcmp(objecttype,TBDB_OBJECTTYPE_EVPROXY) != 0) {
#if 0
/*
* Filter <plabsched,1> events, and for rest resend the notification
* to the local elvind server.
*/
int ret = event_notification_get_int32(handle, notification,
TBDB_PLABSCHED, &plabsched);
if ((!ret) || (plabsched != 1)) {
#endif
if (! event_notify(localhandle, notification))
error("Failed to deliver notification!\n");
if (! event_notify(localhandle, notification))
error("Failed to deliver notification!\n");
}
}
static void
sched_callback(event_handle_t handle,
event_notification_t notification,
......@@ -529,7 +496,6 @@ sched_callback(event_handle_t handle,
/* Callback functions for asysn event subscribe */
void subscribe_callback(event_handle_t handle, int result,
......@@ -540,7 +506,7 @@ void subscribe_callback(event_handle_t handle, int result,
info("Subscription for %s added successfully.\n", (char *)data);
} else {
error("not able to add the subscription.\n");
elvin_error_fprintf(stderr, handle->status);
pubsub_error_fprintf(stderr, &handle->status);
}
free(data);
......@@ -548,41 +514,25 @@ void subscribe_callback(event_handle_t handle, int result,
}
/* Callback functions for async event subscribe/unsubscribe */
void async_callback(event_handle_t handle, int result,
event_subscription_t es, void *data) {
if (!result) {
error("Error in async callback\n");
elvin_error_fprintf(stderr, handle->status);
}
}
/* Status callback function - tries to maintain remote connection */
void status_callback(elvin_handle_t handle, char *url,
elvin_status_event_t event, void *data,
elvin_error_t status) {
switch(event) {
static void status_callback(pubsub_handle_t *handle,
pubsub_status_t status, void *data,
pubsub_error_t *ignored)
{
switch (status) {
case ELVIN_STATUS_CONNECTION_FAILED:
case PUBSUB_STATUS_CONNECTION_FAILED:
/* sleep, and try to connect again. */
error("Failed to connect to remote server");
/* XXX: may need to do something more. */
break;
case ELVIN_STATUS_CONNECTION_LOST:
case ELVIN_STATUS_PROTOCOL_ERROR:
case PUBSUB_STATUS_CONNECTION_LOST:
error("Connection loss/failure, trying to reconnect...\n");
event_stop_main(bosshandle);
//event_unregister(bosshandle);
//bosshandle = NULL;
//do_remote_register(server);
break;
case ELVIN_STATUS_CONNECTION_FOUND:
case PUBSUB_STATUS_CONNECTION_FOUND:
info("Remote connection established.");
break;
......@@ -593,7 +543,7 @@ void status_callback(elvin_handle_t handle, char *url,
void schedule_updateevent() {
/* send a message to elvind on ops */
/* send a message to event server on ops */
address_tuple_t tuple = address_tuple_alloc();
struct timeval now;
......
#
# EMULAB-COPYRIGHT
# Copyright (c) 2003-2006 University of Utah and the Flux Group.
# Copyright (c) 2003-2007 University of Utah and the Flux Group.
# All rights reserved.
#
......@@ -17,7 +17,7 @@ COMSCRIPTS = $(addprefix $(TESTBED_SRCDIR)/tmcd/common/, \
libtmcc.pm libsetup.pm libtestbed.pm startcmddone)
LINSCRIPTS = $(TESTBED_SRCDIR)/tmcd/linux/liblocsetup.pm
PLABSCRIPTS = $(addprefix $(TESTBED_SRCDIR)/tmcd/plab/, \
rc.inplab plabrusage runelvin)
rc.inplab plabrusage runevents)
OSSCRIPTS = $(addprefix $(TESTBED_SRCDIR)/os/, \
install-tarfile install-rpm)
EVSCRIPTS = $(addprefix $(TESTBED_SRCDIR)/event/lib/, \
......
#!/usr/bin/perl -w
#
# EMULAB-COPYRIGHT
# Copyright (c) 2004, 2006 University of Utah and the Flux Group.
# Copyright (c) 2004-2007 University of Utah and the Flux Group.
# All rights reserved.
#
use English;
......@@ -121,13 +121,14 @@ sub doboot()
}
#
# Fire up elvin and evproxy if we are the service sliver on this node
# Fire up event server and evproxy if we are the service sliver
# on this node
#
if (-e $SVCFILE and -x "$BINDIR/runelvin") {
print("Starting elvind and evproxy");
system("$BINDIR/runelvin");
if (-e $SVCFILE and -x "$BINDIR/runevents") {
print("Starting event server and evproxy");
system("$BINDIR/runevents");
if ($?) {
fatal("Error starting up elvind and/or evproxy!");
fatal("Error starting up event server and/or evproxy!");
}
}
......
#!/usr/bin/perl -w
#
# EMULAB-COPYRIGHT
# Copyright (c) 2006 University of Utah and the Flux Group.
# Copyright (c) 2006-2007 University of Utah and the Flux Group.
# All rights reserved.
#
#
# This script is a wrapper around the program-agent to handle
# the runtime knowledge of elvind port on local node
# the runtime knowledge of the event server port on local node
# Drag in path stuff so we can find emulab stuff.
BEGIN { require "/etc/emulab/paths.pm"; import emulabpaths; }
# Script specific goo.
$ELVIN_CONF = "/usr/local/etc/elvind.conf";
$ELVIN_CONF_ORIG = "/usr/local/etc/elvind.conf.tmpl";
$ELVIND_BIN = "/usr/local/sbin/elvind";
$EVENTSERVER = "/usr/local/libexec/pubsubd";
$EVENTPING = "$BINDIR/eventping";
$PORT_SUMMARY = "/proc/scout/ports/summary";
#
# Load the OS independent support library. It will load the OS dependent
# library and initialize itself.
#
use libsetup;
use libtmcc;
......@@ -42,8 +37,7 @@ while ($abc = <FP>) {
}
}
sub start_elvin {
sub start_eventserver {
my $port = shift;
# check in the portmap list if it's already active
......@@ -51,21 +45,13 @@ sub start_elvin {
return 0;
}
# update the port in the elvind.conf
`cp $ELVIN_CONF_ORIG $ELVIN_CONF`;
open(FP, ">>$ELVIN_CONF");
print FP "\nprotocol elvin:/tcp,none,xdr/0.0.0.0:$port\n";
close FP;
system("$ELVIND_BIN");
system("$EVENTSERVER -p $port");
# HACK: allow elvind to complete the bind/listen, as
# elvind daemonize before binding the port
sleep 1; # HACK. arbitary 1 secs.
# HACK: allow server to complete the bind/listen, and daemonize.
sleep(1);
# check if elvind is successfully able to bind the port
system("$BINDIR/elvindtest -s localhost -p $port");
# check if event server is successfully able to bind the port
system("$EVENTPING -s localhost -p $port");
if ($?) {
return 0;
}
......@@ -90,23 +76,23 @@ sub start_evproxy {
fatal("Not able to start evproxy");
}
# make a tmcc call, and let emulab know of elvind-port
# make a tmcc call, and let emulab know of event server port
if (tmcc(TMCCCMD_ELVINDPORT, "$port") < 0) {
fatal("Error sending Elvind port to emulab central!");
fatal("Error sending event server port to emulab central!");
}
}
#
# Start the elvind and evproxy
# Start the event server and evproxy
#
if ((-x "$ELVIND_BIN") && (-x "$BINDIR/evproxy")) {
my $res = start_elvin(2917); # first try 2917
if ((-x "$EVENTSERVER") && (-x "$BINDIR/evproxy")) {
my $res = start_eventserver(16505); # first try 16505
while ($res == 0) {
# select a random port between 10000 and 60000
$num = rand(50000);
$num = int($num) + 10000;
$res = start_elvin($num);
$res = start_eventserver($num);
}