Commit 2cb89492 authored by Leigh Stoller's avatar Leigh Stoller

Merge branch 'master' of git-public.flux.utah.edu:/flux/git/emulab-devel

parents d95ef44f 16b7a5f3
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2007-2010 University of Utah and the Flux Group.
* All rights reserved.
*/
#include <sys/types.h>
#include <stdio.h>
#include <errno.h>
#include <syslog.h>
#include <unistd.h>
#include <signal.h>
#include <stdarg.h>
#include <stdlib.h>
#include <netdb.h>
#include <string.h>
#include <sys/time.h>
#include <sys/param.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <time.h>
#include <assert.h>
#include <sys/types.h>
#include <setjmp.h>
#include <paths.h>
#include <pubsub/pubsub.h>
#include "log.h"
static int debug = 0;
static int verbose = 1;
static int stop = 0;
/* Forward decls */
static void pubsub_callback(pubsub_handle_t *handle,
pubsub_subscription_t *subscription,
pubsub_notification_t *notification,
void *data);
static void v0_callback(pubsub_handle_t *handle,
pubsub_subscription_t *subscription,
pubsub_notification_t *notification,
void *data);
/* A few things are easier if global */
static pubsub_handle_t *pubsub_handle;
static pubsub_handle_t *v0_handle;
char *usagestr =
"usage: pubsubv0_gateway [options]\n"
" -h Display this message\n"
" -d Turn on debugging\n"
" -v Turn on verbose logging\n"
" -p portnum Specify a pubsub port number to connect to\n"
"\n";
void
usage()
{
fprintf(stderr, "%s", usagestr);
exit(1);
}
static void
setverbose(int sig)
{
if (sig == SIGUSR1)
verbose = 1;
else
verbose = 0;
}
static void
sigterm(int sig)
{
stop = 1;
}
int
main(int argc, char **argv)
{
int ch;
int portnum = PUBSUB_SERVER_PORTNUM;
char buf[BUFSIZ];
pubsub_error_t pubsub_error;
while ((ch = getopt(argc, argv, "hdvp:")) != -1) {
switch(ch) {
case 'd':
debug++;
break;
case 'v':
verbose++;
break;
case 'p':
if (sscanf(optarg, "%d", &portnum) == 0) {
fprintf(stderr,
"Error: -p value is not a number: "
"%s\n",
optarg);
usage();
}
else if ((portnum <= 0) || (portnum >= 65536)) {
fprintf(stderr,
"Error: -p value is not between "
"0 and 65536: %d\n",
portnum);
usage();
}
break;
case 'h':
fprintf(stderr, "%s", usagestr);
exit(0);
break;
default:
usage();
}
}
argv += optind;
argc -= optind;
if (argc) {
error("Unrecognized command line arguments: %s ...\n",argv[0]);
usage();
}
if (!debug) {
loginit(1, "v0_gateway");
/* See below for daemonization */
pubsub_debug = (PUBSUB_DEBUG_CONNECT|
PUBSUB_DEBUG_RECONNECT|
PUBSUB_DEBUG_CALLBACK|
PUBSUB_DEBUG_DISPATCHER); /* XXX for now */
} else {
/* XXX backward compat */
pubsub_debug = (PUBSUB_DEBUG_CONNECT|
PUBSUB_DEBUG_RECONNECT|
PUBSUB_DEBUG_CALLBACK|
PUBSUB_DEBUG_DISPATCHER);
if (debug > 1)
pubsub_debug |= PUBSUB_DEBUG_DETAIL;
}
info("version0<->version1 gateway daemon starting\n");
/*
* XXX Need to daemonize earlier or the threads go away.
*/
if (!debug)
daemon(0, 0);
signal(SIGUSR1, setverbose);
signal(SIGUSR2, setverbose);
signal(SIGTERM, sigterm);
/*
* Stash the pid away.
*/
if (!geteuid()) {
FILE *fp;
sprintf(buf, "%s/version0_gateway.pid", _PATH_VARRUN);
fp = fopen(buf, "w");
if (fp != NULL) {
fprintf(fp, "%d\n", getpid());
(void) fclose(fp);
}
}
if (pubsub_connect("localhost",
PUBSUB_SERVER_PORTNUM, &pubsub_handle) < 0) {
error("Could not connect to pubsub server on localhost!\n");
exit(-1);
}
info("connected to pubsubd server\n");
if (pubsub_connect("localhost", portnum, &v0_handle) < 0) {
error("Could not connect to v0 pubsub on localhost!\n");
exit(-1);
}
info("connected to pubsubd server\n");
/* Subscribe to *all* events */
if (!pubsub_add_subscription(pubsub_handle, "",
pubsub_callback, NULL,
&pubsub_error)) {
error("subscription to pubsub server failed\n");
exit(-1);
}
info("pubsub subscription added\n");
/* Subscribe to *all* events */
if (!pubsub_add_subscription(v0_handle, "",
v0_callback, NULL,
&pubsub_error)) {
error("subscription to v0 server failed\n");
exit(-1);
}
info("v0 subscription added\n");
/*
* Threaded mode; stuff just happens.
*/
while (!stop) {
struct timeval tv = { 5, 0 };
select(0, NULL, NULL, NULL, &tv);
}
pubsub_disconnect(pubsub_handle);
pubsub_disconnect(v0_handle);
unlink(buf);
return 0;
}
static char notify_debug_string[2*BUFSIZ];
static int
pubsub_notify_traverse_debug(void *arg, char *name,
pubsub_type_t type, pubsub_value_t value,
pubsub_error_t *error)
{
char **bp = (char **) arg;
char buf[BUFSIZ], *tp = buf;
unsigned char *up;
int i, cc;
switch (type) {
case STRING_TYPE:
snprintf(*bp,
&notify_debug_string[sizeof(notify_debug_string)-1]-
*bp,
"%s=%s,", name, value.pv_string);
break;
case INT32_TYPE:
snprintf(*bp,
&notify_debug_string[sizeof(notify_debug_string)-1]-
*bp,
"%s=%d,", name, value.pv_int32);
break;
case OPAQUE_TYPE:
up = (unsigned char *) value.pv_opaque.data;
for (i = 0; i < value.pv_opaque.length; i++, up++) {
cc = snprintf(tp,
&buf[sizeof(buf)-1]-tp, "%02hhx", *up);
tp += cc;
if (tp >= &buf[sizeof(buf)-1])
break;
}
snprintf(*bp,
&notify_debug_string[sizeof(notify_debug_string)-1]-
*bp,
"%s=%s,", name, buf);
break;
default:
snprintf(*bp,
&notify_debug_string[sizeof(notify_debug_string)-1]-
*bp,
"%s=...,", name);
break;
}
*bp = *bp + strlen(*bp);
return 1;
}
static void
pubsub_callback(pubsub_handle_t *handle,
pubsub_subscription_t *subscription,
pubsub_notification_t *notification,
void *arg)
{
pubsub_error_t pubsub_error;
/*
* These are events coming from the version 1 server. We need to
* remove the ___elvin_ordered___ ordered flag since the client
* will not understand it (no elvin compat on them).
*/
pubsub_notification_remove(notification,
"___elvin_ordered___", &pubsub_error);
if (pubsub_notify(v0_handle, notification, &pubsub_error) < 0) {
error("Failed to send pubsub notification to v0!\n");
}
}
static void
v0_callback(pubsub_handle_t *handle,
pubsub_subscription_t *subscription,
pubsub_notification_t *notification,
void *arg)
{
pubsub_error_t pubsub_error;
/*
* These are events coming from version 0 clients. We do not
* need to do anything except forward them.
*/
if (pubsub_notify(pubsub_handle, notification, &pubsub_error) < 0) {
error("Failed to send v0 notification to pubsub!\n");
}
}
......@@ -15,9 +15,12 @@
#include <list>
#include <vector>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <fstream>
#include <sys/time.h>
using namespace std;
// For getopt
......@@ -73,6 +76,7 @@ void readArgs(int argc, char * argv[])
{
case 'd':
g::debug = true;
pubsub_debug = 1;
break;
case 's':
server = optarg;
......@@ -175,10 +179,13 @@ void callback(event_handle_t handle,
char name[EVENT_BUFFER_SIZE];
char type[EVENT_BUFFER_SIZE];
char args[EVENT_BUFFER_SIZE];
time_t basicTime = time(NULL);
struct tm * structTime = gmtime(&basicTime);
char timestamp[1024];
strftime(timestamp, 1024, "%Y-%m-%dT%H:%M:%S", structTime);
struct timeval basicTime;
gettimeofday(&basicTime, NULL);
double floatTime = basicTime.tv_sec + basicTime.tv_usec/1000000.0;
ostringstream timeStream;
timeStream << setprecision(3) << setiosflags(ios::fixed | ios::showpoint);
timeStream << floatTime;
string timestamp = timeStream.str();
if (event_notification_get_string(handle, notification, "OBJNAME", name, EVENT_BUFFER_SIZE) == 0)
{
......@@ -203,6 +210,5 @@ void callback(event_handle_t handle,
int sequence = 0;
line >> sequence;
cerr << timestamp << " name=" << name << " type=" << type
<< " sequence=" << sequence << endl;
cout << "RECEIVE: " << name << " " << sequence << " " << timestamp << endl;
}
......@@ -767,13 +767,13 @@ CONFIG_FEATURE_PS_WIDE=y
# CONFIG_FEATURE_PS_UNUSUAL_SYSTEMS is not set
# CONFIG_RENICE is not set
CONFIG_BB_SYSCTL=y
# CONFIG_TOP is not set
# CONFIG_FEATURE_TOP_CPU_USAGE_PERCENTAGE is not set
# CONFIG_FEATURE_TOP_CPU_GLOBAL_PERCENTS is not set
# CONFIG_FEATURE_TOP_SMP_CPU is not set
# CONFIG_FEATURE_TOP_DECIMALS is not set
# CONFIG_FEATURE_TOP_SMP_PROCESS is not set
# CONFIG_FEATURE_TOPMEM is not set
CONFIG_TOP=y
CONFIG_FEATURE_TOP_CPU_USAGE_PERCENTAGE=y
CONFIG_FEATURE_TOP_CPU_GLOBAL_PERCENTS=y
CONFIG_FEATURE_TOP_SMP_CPU=y
CONFIG_FEATURE_TOP_DECIMALS=y
CONFIG_FEATURE_TOP_SMP_PROCESS=y
CONFIG_FEATURE_TOPMEM=y
CONFIG_UPTIME=y
CONFIG_WATCH=y
......
This diff is collapsed.
This diff is collapsed.