Commit c6f58671 authored by Mike Hibler's avatar Mike Hibler

Wait til we get the TIME START event to do connection setup. This will more

closely synch the sender/consumer startup.

Support -N name option to uniquely identify the trafgen on a machine
trafgen uses the name to further qualify event selection.

Support -l logname option to turn on TG's per-packet logging.  There is
no NS interface to this right now.  You would have to turn this on manually
by restarting the trafgens and then restarting the event-sched to replay
events.

Turn on some debugging printfs for now til things have stablized a bit.
The trafficconfig function in libsetup.pm currently redirect stdout/stderr
to a file in /tmp.
parent d29e4189
......@@ -16,10 +16,13 @@
#include "protocol.h"
/* event library stuff */
#include "tbdefs.h"
#include "event.h"
static char *progname;
static event_handle_t ehandle;
static tg_action actions[2]; /* large enough for setup/wait sequence */
static char *logfile;
extern int gotevent;
extern protocol prot;
......@@ -33,8 +36,8 @@ static void
usage()
{
fprintf(stderr,
"Usage: %s [-s serverip] [-p serverport] "
"[ -T targetip.targetport ] [-P proto] [-R role]\n",
"Usage: %s [-s serverip] [-p serverport] [-l logfile]"
"[ -N name ] [ -T targetip.targetport ] [-P proto] [-R role]\n",
progname);
exit(-1);
}
......@@ -63,6 +66,7 @@ tgevent_init(int argc, char *argv[])
char *server = NULL;
char *port = NULL;
char *ipaddr = NULL;
char *myname = NULL;
char buf[BUFSIZ], ipbuf[BUFSIZ];
struct sockaddr tmp;
int c;
......@@ -70,7 +74,7 @@ tgevent_init(int argc, char *argv[])
progname = argv[0];
memset(&tmp, 0, sizeof(tmp));
while ((c = getopt(argc, argv, "s:p:T:P:R:")) != -1) {
while ((c = getopt(argc, argv, "s:p:T:P:R:N:l:")) != -1) {
switch (c) {
case 's':
server = optarg;
......@@ -90,6 +94,13 @@ tgevent_init(int argc, char *argv[])
strcmp(optarg, "sink") == 0)
prot.qos |= QOS_SERVER;
break;
case 'N':
myname = optarg;
break;
case 'l':
logfile = optarg;
break;
default:
usage();
}
......@@ -151,15 +162,12 @@ tgevent_init(int argc, char *argv[])
if (tuple == NULL) {
fatal("could not allocate an address tuple");
}
/*
* Change this stuff as needed.
*/
tuple->host = ipaddr;
tuple->site = ADDRESSTUPLE_ANY;
tuple->group = ADDRESSTUPLE_ANY;
tuple->expt = ADDRESSTUPLE_ANY; /* pid/eid */
tuple->objtype = OBJECTTYPE_TRAFGEN;
tuple->objname = ADDRESSTUPLE_ANY;
tuple->objtype = TBDB_OBJECTTYPE_TRAFGEN;
tuple->objname = myname ? myname : ADDRESSTUPLE_ANY;
tuple->eventtype = ADDRESSTUPLE_ANY;
/*
......@@ -173,9 +181,30 @@ tgevent_init(int argc, char *argv[])
/*
* Subscribe to the event we specified above.
*/
if (! event_subscribe(ehandle, callback, tuple, NULL)) {
fatal("could not subscribe to event");
if (!event_subscribe(ehandle, callback, tuple, NULL)) {
fatal("could not subscribe to TRAFGEN events");
}
address_tuple_free(tuple);
/*
* Also subscribe to the start time event which we use to
* synchronize connection setup.
*/
tuple = address_tuple_alloc();
if (tuple == NULL) {
fatal("could not allocate an address tuple");
}
tuple->host = ADDRESSTUPLE_ANY;
tuple->site = ADDRESSTUPLE_ANY;
tuple->group = ADDRESSTUPLE_ANY;
tuple->expt = ADDRESSTUPLE_ANY; /* pid/eid */
tuple->objtype = TBDB_OBJECTTYPE_TIME;
tuple->objname = ADDRESSTUPLE_ANY;
tuple->eventtype = ADDRESSTUPLE_ANY;
if (!event_subscribe(ehandle, callback, tuple, NULL)) {
fatal("could not subscribe to TIME events");
}
address_tuple_free(tuple);
#endif
}
......@@ -192,9 +221,10 @@ tgevent_shutdown(void)
#ifndef TESTING
#define STARTEVENT "START"
#define STOPEVENT "STOP"
#define MODIFYEVENT "MODIFY"
#define STARTEVENT TBDB_EVENTTYPE_START
#define STOPEVENT TBDB_EVENTTYPE_STOP
#define MODIFYEVENT TBDB_EVENTTYPE_MODIFY
#define TIMESTARTS TBDB_EVENTTYPE_START
#define DEFAULT_PKT_SIZE 64
......@@ -273,7 +303,7 @@ parse_args(char *buf, tg_action *tg)
dist_const_init(&tg->length, psize);
tg->tg_flags |= (TG_ARRIVAL|TG_LENGTH);
#if 1
#if 0
fprintf(stderr, "parse_args: new psize=%d, interval=%f\n",
psize, interval);
#endif
......@@ -288,6 +318,8 @@ callback(event_handle_t handle, event_notification_t notification, void *data)
int len = 64;
static int startdone;
buf[0][0] = buf[1][0] = buf[2][0] = buf[3][0] = 0;
buf[4][0] = buf[5][0] = buf[6][0] = buf[7][0] = 0;
event_notification_get_site(handle, notification, buf[0], len);
event_notification_get_expt(handle, notification, buf[1], len);
event_notification_get_group(handle, notification, buf[2], len);
......@@ -301,16 +333,46 @@ callback(event_handle_t handle, event_notification_t notification, void *data)
{
struct timeval now;
gettimeofday(&now, NULL);
printf("Event: %lu %s %s %s %s %s %s %s %s\n", now.tv_sec,
buf[0], buf[1], buf[2],
buf[3], buf[4], buf[5], buf[6], buf[7]);
fprintf(stderr,
"Event: %lu %s %s %s %s %s %s %s %s\n", now.tv_sec,
buf[0], buf[1], buf[2],
buf[3], buf[4], buf[5], buf[6], buf[7]);
}
#endif
/*
* Perform a setup when we get the "start of time" event.
* XXX setup sink slightly ahead of source?
*/
if (strcmp(buf[4], TBDB_OBJECTTYPE_TIME) == 0) {
if (strcmp(buf[6], TIMESTARTS) == 0) {
/*
* XXX event daemon may have been restarted,
* avoid redoing setup, just go back to waiting.
*/
if (startdone)
goto dowait;
if (logfile) {
struct timeval now;
extern int FlushOutput;
FlushOutput = 1;
gettimeofday(&now, NULL);
start_log(now, NULL);
}
if (tg_first != &actions[0])
fatal("global action list corrupted!");
actions[0].tg_flags = TG_SETUP;
actions[0].next = &actions[1];
actions[1].tg_flags = TG_WAIT;
}
}
/*
* If eventtype is STOP, execute an infinite wait
*/
if (strcmp(buf[6], STOPEVENT) == 0) {
else if (strcmp(buf[6], STOPEVENT) == 0) {
dowait:
tg_first->tg_flags &= ~TG_SETUP;
tg_first->tg_flags |= TG_WAIT;
......@@ -351,23 +413,43 @@ void
tgevent_loop(void)
{
int err;
tg_action setup[2];
struct timeval now;
gettimeofday(&now, NULL);
#if 1
fprintf(stderr, "trafgen: %lu: started\n", now.tv_sec);
#endif
/*
* XXX hand build setup followed by wait forever commands.
* XXX hand build a wait forever command.
* Setup happens when the first event arrives.
*/
memset(setup, 0, sizeof(setup));
setup[0].tg_flags = TG_SETUP;
setup[0].next = &setup[1];
setup[1].tg_flags = TG_WAIT;
setup[1].next = NULL;
memset(actions, 0, sizeof(actions));
actions[0].tg_flags = TG_WAIT;
if (logfile) {
extern char prefix[], suffix[];
extern char *ofile;
char *cp = logfile;
ofile = logfile;
strcpy(prefix, strsep(&cp, "."));
if (cp)
strcpy(suffix, cp);
else
strcpy(suffix, "log");
#if 1
fprintf(stderr, "trafgen: logfile=%s.%s\n", prefix, suffix);
#endif
}
/*
* We loop in here til done
*/
tg_first = &setup[0];
tg_first = &actions[0];
do_actions();
log_close();
tgevent_shutdown();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment