Commit 585646a5 authored by Leigh B. Stoller's avatar Leigh B. Stoller
Browse files

Commit the widearea proxy client and server. These are not finished,

but they do work, so I put them into the repository.
parent 57b3bd81
......@@ -1420,3 +1420,191 @@ event_notification_check_hmac(event_handle_t handle,
return 0;
}
/*
* Support for packing and unpacking a notification. Packing a notification
* converts it to something the caller can pass around; a set of three arrays,
* types, names, values. Unpacking a notification takes those three arrays
* and returns a new notification with those contents. For packing, the
* caller provides three static arrays, and gives us the length of then. We
* store the contents in those arrays, and return the actual length. The
* arrays must be big enough ...
*/
struct pack_traverse_arg {
int maxlen;
int len;
unsigned char *data;
};
struct pack_bin {
short reclen;
short dlen;
int type;
char name[32];
unsigned char data[0];
};
/*
* The traversal function callback.
*/
static int
pack_traverse(void *rock, char *name, elvin_basetypes_t type,
elvin_value_t value, elvin_error_t status)
{
struct pack_traverse_arg *packarg = (struct pack_traverse_arg *) rock;
struct pack_bin *bin;
int dlen = 0;
unsigned char buf[BUFSIZ];
bin = (struct pack_bin *) (packarg->data + packarg->len);
switch (type) {
case ELVIN_INT32:
sprintf(buf, "%d", value.i);
break;
case ELVIN_INT64:
sprintf(buf, "%lld", value.h);
break;
case ELVIN_REAL64:
sprintf(buf, "%f", value.d);
break;
case ELVIN_STRING:
if (strlen(value.s) >= BUFSIZ) {
ERROR("pack_traverse: string too big\n");
return 0;
}
strcpy(buf, value.s);
break;
case ELVIN_OPAQUE:
if (value.o.length >= BUFSIZ) {
ERROR("pack_traverse: opaque too big\n");
return 0;
}
memcpy(buf, (unsigned char *)(value.o.data), value.o.length);
buf[value.o.length] = (unsigned char) NULL;
dlen = value.o.length + 1;
break;
default:
ERROR("invalid parameter\n");
return 0;
}
if (!dlen)
dlen = strlen(buf) + 1;
/*
* Watch for too much stuff.
*/
if (packarg->len + (dlen + sizeof(*bin)) >= packarg->maxlen) {
ERROR("pack_traverse: Not enough room at %s!\n", name);
return 0;
}
/*
* XXX Name is bogus. Fix later.
*/
if (strlen(name) >= sizeof(bin->name)) {
ERROR("pack_traverse: Name too long %s!\n", name);
return 0;
}
strcpy(bin->name, name);
bin->type = type;
bin->dlen = dlen;
bin->reclen = roundup((dlen + sizeof(*bin)), sizeof(long));
memcpy(bin->data, buf, dlen);
packarg->len += bin->reclen;
return 1;
}
/*
* Extract stuff from inside notification and return.
*/
int
event_notification_pack(event_handle_t handle,
event_notification_t notification,
unsigned char *data, int *len)
{
struct pack_traverse_arg packarg;
packarg.maxlen = *len;
packarg.len = 0;
packarg.data = data;
if (!elvin_notification_traverse(notification->elvin_notification,
pack_traverse, &packarg, handle->status)) {
return 1;
}
*len = packarg.len;
return 0;
}
/*
* Take raw data and stuff it into a notification.
*/
int
event_notification_unpack(event_handle_t handle,
event_notification_t *notification,
unsigned char *data, int len)
{
event_notification_t newnote = event_notification_alloc(handle, NULL);
int rval, offset = 0;
elvin_value_t value;
if (! newnote)
return -1;
while (offset < len) {
struct pack_bin *bin = (struct pack_bin *) (data + offset);
info("type: %d %s %s\n", bin->type, bin->name, bin->data);
switch (bin->type) {
case ELVIN_INT32:
sscanf(bin->data, "%d", &(value.i));
rval = event_notification_put_int32(handle, newnote,
bin->name, value.i);
break;
case ELVIN_INT64:
sscanf(bin->data, "%lld", &(value.h));
rval = event_notification_put_int64(handle, newnote,
bin->name, value.h);
break;
case ELVIN_REAL64:
rval = event_notification_put_double(handle, newnote,
bin->name, value.d);
sscanf(bin->data, "%lf", &(value.d));
break;
case ELVIN_STRING:
rval = event_notification_put_string(handle, newnote,
bin->name, bin->data);
break;
case ELVIN_OPAQUE:
rval = event_notification_put_opaque(handle, newnote,
bin->name, bin->data,
bin->dlen);
break;
default:
ERROR("event_notification_unpack: invalid type\n");
return 0;
}
if (!rval) {
ERROR("event_notification_unpack: insert failed\n");
return 0;
}
offset += bin->reclen;
}
*notification = newnote;
return 0;
}
......@@ -201,6 +201,12 @@ event_subscription_t event_subscribe(event_handle_t handle,
address_tuple_t tuple, void *data);
int event_notification_insert_hmac(event_handle_t handle,
event_notification_t notification);
int event_notification_pack(event_handle_t handle,
event_notification_t notification,
unsigned char *data, int *len);
int event_notification_unpack(event_handle_t handle,
event_notification_t *notification,
unsigned char *data, int len);
/* util.c */
void *xmalloc(int size);
......
......@@ -10,7 +10,7 @@ OBJDIR = ../..
SUBDIR = event/proxy
SYSTEM := $(shell uname -s)
PROGRAMS = evproxy proxytest
PROGRAMS = evproxy proxytest evproxyclient evproxyserver
include $(OBJDIR)/Makeconf
......@@ -71,8 +71,6 @@ $(PROGRAMS): ../lib/libevent.a ../lib/event.h
client-install:
$(INSTALL_PROGRAM) evproxy $(DESTDIR)$(CLIENT_BINDIR)/evproxy
$(INSTALL_PROGRAM) evproxyclient \
$(DESTDIR)$(CLIENT_BINDIR)/evproxyclient
clean:
/bin/rm -f *.o $(PROGRAMS)
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2003 University of Utah and the Flux Group.
* All rights reserved.
*/
/*
*
*/
#include <stdio.h>
#include <ctype.h>
#include <netdb.h>
#include <unistd.h>
#include <time.h>
#include <math.h>
#include <paths.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "config.h"
#include "event.h"
#include "tbdefs.h"
#include "log.h"
static int debug = 1;
static event_handle_t localhandle;
/* protos */
static int RemoteSocket(int *sockp, int *ourport);
static int ServerConnect(int, int, struct in_addr, int);
static int GetEvents(event_handle_t handle, int sock);
void
usage(char *progname)
{
fprintf(stderr, "Usage: %s [-s server] -e pid/eid\n", progname);
exit(-1);
}
int
main(int argc, char **argv)
{
char *progname;
char *server = NULL;
int serverport = 9843;
char *myeid = NULL;
char buf[BUFSIZ];
char hostname[MAXHOSTNAMELEN];
struct hostent *he;
int sock, c, ourport;
struct in_addr myip, serverip;
FILE *fp;
progname = argv[0];
while ((c = getopt(argc, argv, "ds:p:e:")) != -1) {
switch (c) {
case 'd':
debug++;
break;
case 's':
server = optarg;
break;
case 'p':
serverport = atoi(optarg);
break;
case 'e':
myeid = optarg;
break;
default:
usage(progname);
}
}
argc -= optind;
argv += optind;
if (argc)
usage(progname);
if (! myeid)
fatal("Must provide pid/eid");
if (debug)
loginit(0, 0);
else {
loginit(1, "proxyclient");
/* See below for daemonization */
}
/*
* Get our IP address. Thats how we name this host to the
* event System.
*/
if (gethostname(hostname, MAXHOSTNAMELEN) == -1) {
fatal("could not get hostname: %s\n", strerror(errno));
}
if (! (he = gethostbyname(hostname))) {
fatal("could not get IP address from hostname: %s", hostname);
}
memcpy((char *)&myip, he->h_addr, he->h_length);
/*
* If server is not specified, then it defaults to BOSSNODE.
* This allows the client to work on either users.emulab.net
* or on a client node.
*/
if (!server)
server = BOSSNODE;
if (! (he = gethostbyname(server))) {
fatal("could not get IP address for server: %s", server);
}
memcpy((char *)&serverip, he->h_addr, he->h_length);
/* Register with the event system on the local node */
localhandle = event_register("elvin://localhost", 0);
if (localhandle == NULL) {
fatal("could not register with local event system");
}
/*
* Stash the pid away.
*/
sprintf(buf, "%s/evproxy.pid", _PATH_VARRUN);
fp = fopen(buf, "w");
if (fp != NULL) {
fprintf(fp, "%d\n", getpid());
(void) fclose(fp);
}
RemoteSocket(&sock, &ourport);
ServerConnect(sock, ourport, serverip, serverport);
/*
* Do this now, once we have had a chance to fail on the above
* event system calls.
*/
if (!debug)
daemon(0, 0);
while (1) {
ServerConnect(sock, ourport, serverip, serverport);
GetEvents(localhandle, sock);
}
/* Unregister with the local event system: */
if (event_unregister(localhandle) == 0) {
fatal("could not unregister with local event system");
}
return 0;
}
/*
* Create the remote socket to talk to the server side. This is a UDP socket.
* The local port is dynamic.
*/
static int
RemoteSocket(int *sockp, int *ourport)
{
int sock, length, i;
struct sockaddr_in name;
struct timeval timeout;
/* Create socket from which to read. */
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
pfatal("opening stream socket");
}
i = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(char *)&i, sizeof(i)) < 0)
pwarning("setsockopt(SO_REUSEADDR)");
/* Create name. */
name.sin_family = AF_INET;
name.sin_addr.s_addr = INADDR_ANY;
name.sin_port = 0;
if (bind(sock, (struct sockaddr *) &name, sizeof(name))) {
pfatal("binding dgram socket");
}
/* Find assigned port value and print it out. */
length = sizeof(name);
if (getsockname(sock, (struct sockaddr *) &name, &length)) {
pfatal("getsockname");
}
info("listening on UDP port %d\n", ntohs(name.sin_port));
/*
* We use a socket level timeout instead of polling for data.
*/
timeout.tv_sec = 10;
timeout.tv_usec = 0;
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
&timeout, sizeof(timeout)) < 0)
pfatal("setsockopt(SOL_SOCKET, SO_RCVTIMEO)");
*ourport = (int) ntohs(name.sin_port);
*sockp = sock;
return 0;
}
/*
* Tell the server about us. Since the server might crash or whatever,
* we do this every minute as a heartbeat. There is no need to wait
* for a reply; if the packet does not get through then we probably do not
* need to worry about getting events from the server since those packets
* are also UDP.
*/
static int
ServerConnect(int sock, int ourport, struct in_addr serverip, int serverport)
{
struct sockaddr_in request;
char buf[BUFSIZ];
int cc;
/* Server name. */
request.sin_family = AF_INET;
request.sin_addr = serverip;
request.sin_port = htons(serverport);
strcpy(buf, "Hi!");
cc = sendto(sock, buf, strlen(buf) + 1, 0,
(struct sockaddr *)&request, sizeof(request));
if (cc <= 0) {
if (cc < 0) {
if (errno != EWOULDBLOCK)
pfatal("Writing to server socket");
errorc("Writing to server socket");
goto bad;
}
error("short write to server\n");
goto bad;
}
return 0;
bad:
return -1;
}
static int
GetEvents(event_handle_t handle, int sock)
{
unsigned char buf[2*BUFSIZ];
while (1) {
struct sockaddr_in client;
int len, cc;
event_notification_t newnote;
len = sizeof(client);
cc = recvfrom(sock, buf, sizeof(buf), 0,
(struct sockaddr *)&client, &len);
if (cc <= 0) {
if (cc < 0) {
if (errno != EWOULDBLOCK)
pfatal("Reading from socket");
break;
}
error("short read from server\n");
break;
}
event_notification_unpack(handle, &newnote, buf, cc);
event_notify(handle, newnote);
event_notification_free(handle, newnote);
}
return 0;
}
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2003 University of Utah and the Flux Group.
* All rights reserved.
*/
/*
* TODO: Check for root start. Switch to user nobody.
* Require remote client to supply key to confirm its a pid/eid member.
*/
/*
*
*/
#include <stdio.h>
#include <ctype.h>
#include <netdb.h>
#include <unistd.h>
#include <time.h>
#include <math.h>
#include <paths.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "config.h"
#include "event.h"
#include "tbdefs.h"
#include "log.h"
#define SOCKBUFSIZE (128 * 1024)
static int debug = 1;
static event_handle_t bosshandle;
static int clientsock;
/*
* This record for each client that checks in
*/
typedef struct client_record {
struct sockaddr_in address;
struct client_record *next;
} client_record_t;
client_record_t *client_records;
/*
* This record is for each pending event that needs to be sent.
*/
typedef struct event_record {
struct in_addr host;
struct event_record *next;
int dlen;
unsigned char bindata[0];
} event_record_t;
event_record_t *events_tosend;
event_record_t *events_qend; /* Last event, for queuing */
/* Locks for pending events queue */
static pthread_mutex_t event_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t event_queue_cond = PTHREAD_COND_INITIALIZER;
/* protos */
static int RemoteSocket(int *sockp, int *ourport);
static int ServerLoop(void);
static int QueueEvent(unsigned char *data, int len, char *host);
static event_record_t * DeQueueEvent(int waittime);
static void FreeEvent(event_record_t *);
static void SendEvent(event_record_t *record);
void
usage(char *progname)
{
fprintf(stderr, "Usage: %s [-s server] -e pid/eid\n", progname);
exit(-1);
}
static void
callback(event_handle_t handle,
event_notification_t notification, void *data);
int
main(int argc, char **argv)
{
address_tuple_t tuple;
char *progname;
char *server = NULL;
char *port = NULL;
char *myeid = NULL;
char pid[TBDB_FLEN_PID], eid[TBDB_FLEN_EID];
char buf[BUFSIZ], *bp;
int c, ourport;
FILE *fp;
progname = argv[0];
while ((c = getopt(argc, argv, "ds:p:e:")) != -1) {
switch (c) {
case 'd':
debug++;
break;
case 's':
server = optarg;
break;
case 'p':
port = optarg;
break;
case 'e':
myeid = optarg;
break;
default:
usage(progname);
}
}
argc -= optind;
argv += optind;
if (argc)
usage(progname);
if (! myeid)
usage(progname);