Commit 7acd041f authored by Pramod R Sanaga's avatar Pramod R Sanaga
Browse files

Changes to Dummynet and delay-agent to introduce 'Backfill' - non responsive

cross-traffic generation in the delay-agent nodes. These changes need to
be applied to FBSD54-DNODE-PUBSUB and delay-agent files from the PELAB branch
in order to create a working delay node image. "ipfw" must also be recompiled with
this modified Dummynet header.

Added a new parameter to the delay-agent event commands -- "BACKFILL", which
is set similar to bandwidth. Amount of link capacity minus backfill is the desired
available bandwidth.
parent 29e2a29e
/*
* EMULAB-COPYRIGHT
* Copyright (c) 2000-2003, 2006-2007 University of Utah and the Flux Group.
* All rights reserved.
*/
/*
* agent-callback.c --
*
* Delay node agent callback handling.
*
*/
/******************************* INCLUDES **************************/
#include "main.h"
#include "systemf.h"
#include <assert.h>
#include <math.h>
/******************************* INCLUDES **************************/
/******************************* EXTERNS **************************/
extern structlink_map *link_map;
extern int link_index;
extern structlink_map *old_map;
extern int old_length;
extern int s_dummy;
extern int debug;
extern void dump_link(structlink_map *);
extern void dump_link_map(void);
/******************************* EXTERNS **************************/
/********************************FUNCTION DEFS *******************/
static struct flowspec blankfs = { "", "", 0, 0 };
/**
* Compare two flowspecs for equality.
*
* @return Zero if the two flowspecs are equal.
*/
static
int flowspeccmp(struct flowspec *fs1, struct flowspec *fs2)
{
int retval = 1;
assert(fs1 != NULL);
assert(fs2->srcport >= 0);
assert(fs2->dstport >= 0);
assert(fs2 != NULL);
assert(fs2->srcport >= 0);
assert(fs2->dstport >= 0);
if (strcmp(fs1->dest, fs2->dest) == 0 &&
strcmp(fs1->protocol, fs2->protocol) == 0 &&
fs1->srcport == fs2->srcport &&
fs1->dstport == fs2->dstport) {
retval = 0;
}
return retval;
}
/**
* Find a structlink_map that matches the given object name and flow
* specification.
*
* @param objname The object name to search for.
* @param fs The flow specification to match.
* @return A structlink_map object that matches the given parameters or NULL.
*/
static
structlink_map_t find_map(char *objname, struct flowspec *fs)
{
structlink_map_t retval = NULL;
int i;
assert(objname != NULL);
assert(strlen(objname) > 0);
assert(fs != NULL);
for(i = 0; i < link_index && !retval; i++){
if(!strcmp(link_map[i].linkvnodes[0], objname) ||
!strcmp(link_map[i].linkvnodes[1], objname)) {
if (flowspeccmp(&link_map[i].fs, fs) == 0) {
retval = &link_map[i];
}
}
}
return retval;
}
/*
* Enable pipes for a node-to-node path in a cloud.
* For every path there is an outgoing-from-the-delay-node (incoming to
* the node) pipe handling delay and PLR. However, not every path will
* have a unique bandwidth pipe. We allow for a node to have a shared,
* incoming-to-the-delay-node (outgoing from the node) BW pipe used for
* all destinations that do not have a unique pipe.
*/
static void
activate_pipe(int mapix, char *args)
{
structlink_map_t link = &link_map[mapix];
char *redir = ">/dev/null";
if (debug)
redir = "";
/*
* If not done already, create the delay pipe
*/
if (link->inactive) {
if (link->clouddir == 3) {
if (debug)
info("activating delay/BW pipe %d\n", link->pipes[0]);
else
info(" create delay/BW pipe %d\n", link->pipes[0]);
systemf("ipfw add %d pipe %d ip from any to %s in recv %s %s",
link->pipes[0],
link->pipes[0],
link->fs.dest,
link->interfaces[0],
redir);
systemf("ipfw pipe %d config bw %d delay %d plr 0 queue %d %s",
link->pipes[0],
link->params[0].bw.bandwidth,
link->params[0].delay.delay,
link->params[0].q_size,
redir);
link->inactive = 0;
}
else if (link->clouddir == 2) {
if (debug)
info("activating delay pipe %d\n", link->pipes[0]);
else
info(" create delay pipe %d\n", link->pipes[0]);
systemf("ipfw add %d pipe %d ip from %s to any in recv %s %s",
link->pipes[0],
link->pipes[0],
link->fs.dest,
link->interfaces[0],
redir);
systemf("ipfw pipe %d config bw 0 delay %d plr %f queue %d %s",
link->pipes[0],
link->params[0].delay.delay,
(double)link->params[0].loss.plr/0x7fffffff,
link->params[0].q_size,
redir);
link->inactive = 0;
}
/*
* XXX determine if there is a flow spec involved, see if there is
* an explicit bandwidth provided. If so, we may need to create a
* flow-specific BW shaping pipe.
*/
else if (link->fs.dest[0] && strstr(args, "BANDWIDTH")) {
if (debug)
info("activating BW pipe %d\n", link->pipes[0]);
else
info(" create BW pipe %d\n", link->pipes[0]);
systemf("ipfw add %d pipe %d ip from any to %s in recv %s %s",
link->pipes[0],
link->pipes[0],
link->fs.dest,
link->interfaces[0],
redir);
systemf("ipfw pipe %d config bw %d delay 0 plr 0 queue %d %s",
link->pipes[0],
link->params[0].bw.bandwidth,
link->params[0].q_size,
redir);
link->inactive = 0;
}
}
}
/*************************** agent_callback **********************
This function is called from the event system when an event
notification is recd. from the server. It checks whether the
notification is valid (sanity check). If not print a warning,else
call handle_pipes which does the rest of thejob
*************************** agent_callback **********************/
#define LO_RULE_NO 100
#define HI_RULE_NO 59999
void agent_callback(event_handle_t handle,
event_notification_t notification, void *data)
{
#define MAX_LEN 50
char objname[MAX_LEN];
char eventtype[MAX_LEN];
char args[BUFSIZ];
struct flowspec fs = { "", "", 0, 0 };
char *dest, *srcport_str, *dstport_str, *protocol;
int i, dest_len, srcport_len, dstport_len, protocol_len;
static int lo_rule_no = LO_RULE_NO;
static int hi_rule_no = HI_RULE_NO;
char *redir = ">/dev/null";
if (debug)
redir = "";
/* get the name of the object, eg. link0 or link1*/
if(event_notification_get_string(handle,
notification,
"OBJNAME", objname, MAX_LEN) == 0 ||
strlen(objname) == 0){
error("could not get the objname \n");
return;
}
/* get the eventtype, eg up/down/modify*/
if(event_notification_get_string(handle,
notification,
"EVENTTYPE", eventtype, MAX_LEN) == 0){
error("could not get the eventtype \n");
return;
}
event_notification_get_arguments(handle,
notification, args, sizeof(args));
/*
* Get the flowspec parameters. If there are none, the default
* initialization (all zeros) will be used.
*/
dest_len = event_arg_get(args, "DEST", &dest);
protocol_len = event_arg_get(args, "PROTOCOL", &protocol);
srcport_len = event_arg_get(args, "SRCPORT", &srcport_str);
dstport_len = event_arg_get(args, "DSTPORT", &dstport_str);
if (dest_len > (int)sizeof(fs.dest)) {
error("DEST is too large: (%d>%d) %s\n", dest_len, sizeof(fs.dest), dest);
return;
}
else if (dest_len > 0) {
strncpy(fs.dest, dest, dest_len);
fs.dest[dest_len] = '\0';
}
if (protocol_len > (int)sizeof(fs.protocol)) {
error("PROTOCOL is too large\n");
return;
}
else if (protocol_len > 0) {
strncpy(fs.protocol, protocol, protocol_len);
fs.protocol[protocol_len] = '\0';
}
if (srcport_len > 0 && sscanf(srcport_str, "%d", &fs.srcport) != 1) {
error("SRCPORT is not a number\n");
return;
}
if (dstport_len > 0 && sscanf(dstport_str, "%d", &fs.dstport) != 1) {
error("DSTPORT is not a number\n");
return;
}
if (strcmp(eventtype, TBDB_EVENTTYPE_CLEAR) == 0) {
structlink_map_t lm;
/* Handle a CLEAR event. */
if (!debug) {
struct timeval tv;
gettimeofday(&tv, NULL);
info("%d.%06d: %s: CLEAR: %s\n",
tv.tv_sec, tv.tv_usec, objname, args);
}
if (dest_len > 0 && srcport_len > 0 && dstport_len > 0 &&
protocol_len > 0) {
if ((lm = find_map(objname, &fs)) == NULL) {
error("unknown flow for agent %s\n", objname);
}
else {
if (debug)
info("clearing pipe %d\n", lm->pipes[0]);
else
info(" clear pipe: %d\n", lm->pipes[0]);
/* Delete the rule/pipe and */
systemf("ipfw delete %d %s", lm->pipes[0], redir);
systemf("ipfw pipe delete %d %s", lm->pipes[0], redir);
/* ... mark the structure as free for another use. */
strcpy(lm->linkvnodes[0], "__free");
strcpy(lm->linkvnodes[1], "__free");
lm->fs = blankfs;
}
}
else {
/*
* We cannot just flush the world, because the delay node
* might be shared. So, we find all pipes associated with
* the indicated object that have non-null flow info.
*/
if (debug)
info("clearing all flow pipes for %s\n", objname);
for (lm = &link_map[0]; lm < &link_map[link_index]; lm++) {
if (strcmp(lm->linkname, objname) != 0 ||
strcmp(lm->linkvnodes[0], "__free") == 0 ||
lm->fs.dest[0] == '\0')
continue;
if (!lm->inactive) {
if (debug)
info("clearing pipe %d\n", lm->pipes[0]);
else
info(" clear pipe: %d\n", lm->pipes[0]);
/* Delete the rule/pipe and */
systemf("ipfw delete %d %s", lm->pipes[0], redir);
systemf("ipfw pipe delete %d %s", lm->pipes[0], redir);
}
/* ... mark the structure as free for another use. */
strcpy(lm->linkvnodes[0], "__free");
strcpy(lm->linkvnodes[1], "__free");
lm->fs = blankfs;
}
if (link_map != old_map) {
free(link_map);
link_map = old_map;
link_index = old_length;
lo_rule_no = LO_RULE_NO;
hi_rule_no = HI_RULE_NO;
}
}
if (debug)
dump_link_map();
return;
}
if (strcmp(eventtype, TBDB_EVENTTYPE_CREATE) == 0) {
if (!debug) {
struct timeval tv;
gettimeofday(&tv, NULL);
info("%d.%06d: %s: CREATE: %s\n",
tv.tv_sec, tv.tv_usec, objname, args);
}
if (dest_len == -1) {
if (debug)
info("creating per-host pipes for %s\n", objname);
if (link_map == old_map) {
link_map = NULL;
link_index = 0;
}
/*
* For every link we were managing, create two pipes for that link
* and a particular destination. The set of destinations are currently
* pulled from the /etc/hosts file.
*/
for (i = 0; i < old_length; i++) {
struct hostent *he;
/* preserve original rules */
realloc_map();
link_map[link_index] = old_map[i];
link_index++;
/* if not related to our link, we are done */
if (strcmp(old_map[i].linkname, objname) != 0)
continue;
while ((he = gethostent()) != NULL) {
char dest[32];
int j;
inet_ntop(he->h_addrtype, he->h_addr, dest, sizeof(dest));
/* addresses we don't care about */
if (strcmp(dest, "127.0.0.1") == 0 || strcmp(dest, "0.0.0.0") == 0)
continue;
/* XXX can only handle duplex links/lans (need two pipes) */
if (old_map[i].numpipes < 2)
continue;
/*
* Pipe 0 is for delay and will be created for all flows
* Pipe 1 is for BW and will only created if a MODIFY (with DEST=)
* event explicitly specifies a BW.
*/
for (j = 0; j < old_map[i].numpipes; j++) {
realloc_map();
link_map[link_index] = old_map[i];
link_map[link_index].islan = 0;
if (old_map[i].islan) {
link_map[link_index].clouddir = j ? 2 : 1;
} else {
/*
* We don't have enough pipes to do the hybrid technique
* for a link. We just arrange to set delay and BW on the
* same link.
*/
link_map[link_index].clouddir = 3;
}
link_map[link_index].numpipes = 1;
strncpy(link_map[link_index].fs.dest, dest,
sizeof(link_map[link_index].fs.dest));
link_map[link_index].inactive = 1;
link_map[link_index].pipes[0] = hi_rule_no;
link_map[link_index].interfaces[0] = old_map[i].interfaces[j];
link_map[link_index].params[0] = old_map[i].params[j];
hi_rule_no -= 1;
link_index += 1;
}
}
endhostent();
}
}
else if (srcport_len <= 0 && dstport_len <= 0 && protocol_len <= 0) {
error("CREATE event missing SRCPORT, DSTPORT, and PROTOCOL args\n");
}
else {
struct flowspec mainfs = { "", "", 0, 0 };
structlink_map_t mainlm, lm;
strcpy(mainfs.dest, fs.dest);
mainlm = find_map(objname, &mainfs);
if (mainlm == NULL) {
error("No such agent: %s\n", objname);
}
else if ((lm = find_map(objname, &fs)) != NULL) {
info("%s flow already exists\n", objname);
}
else {
int rule_no;
if ((lm = find_map("__free", &blankfs)) == NULL) {
/* No free structlink_map objects, allocate a new one. */
realloc_map();
/* XXX need to relocate the basis pipe due to realloc */
if ((mainlm = find_map(objname, &mainfs)) == NULL) {
error("No such agent: %s\n", objname);
}
lm = &link_map[link_index];
link_index += 1;
rule_no = lo_rule_no;
lo_rule_no += 1;
}
else {
/* Reuse an existing structlink_map object. */
rule_no = lm->pipes[0];
}
if (debug)
info("creating per-flow pipe\n");
else
info(" create flow pipe %d\n", rule_no);
*lm = *mainlm;
lm->clouddir = 0;
lm->inactive = 0;
lm->islan = 0;
lm->numpipes = 1;
lm->fs = fs;
lm->pipes[0] = rule_no;
systemf("ipfw add %d pipe %d %s from any to %s "
"src-port %d dst-port %d in recv %s %s",
lm->pipes[0],
lm->pipes[0],
lm->fs.protocol,
lm->fs.dest,
lm->fs.srcport,
lm->fs.dstport,
lm->interfaces[0],
redir);
/*
* Initialize its characteristics from the "basis" pipe.
* Note that if the basis is a funky BW only pipe, we need to
* extract the delay/PLR from the following pipe.
*/
lm->params[0] = mainlm->params[0];
assert(mainlm->clouddir != 2);
if (mainlm->clouddir == 1) {
assert((mainlm+1) < &link_map[link_index]);
lm->params[0].delay = (mainlm+1)->params[0].delay;
lm->params[0].loss = (mainlm+1)->params[0].loss;
}
systemf("ipfw pipe %d config bw %d delay %d plr 0 queue %d %s",
lm->pipes[0],
lm->params[0].bw.bandwidth,
lm->params[0].delay.delay,
lm->params[0].q_size,
redir);
}
}
if (debug)
dump_link_map();
return;
}
/*
* We could be an agent for several nodes on the same lan, so need to
* loop over the pipe sets and possibly repeat all the work multiple
* times. Sigh.
*/
for(i = 0; i < link_index; i++){
if(!strcmp(link_map[i].linkname, objname) ||
!strcmp(link_map[i].linkvnodes[0], objname) ||
!strcmp(link_map[i].linkvnodes[1], objname)) {
if (flowspeccmp(&link_map[i].fs, &fs) == 0) {
handle_pipes(objname, eventtype, args, i);
}
}
}
}
/******************** handle_pipes ***************************************
This dispatch function checks the event type and dispatches to the appropriate
routine to handle
******************** handle_pipes ***************************************/
void handle_pipes (char *objname, char *eventtype, char *args, int l_index)
{
/*link_map[index] contains the relevant info*/
if (!debug) {
struct timeval tv;
gettimeofday(&tv, NULL);
info("%d.%06d: %s(%d): %s: %s\n",
tv.tv_sec, tv.tv_usec, objname, l_index, eventtype, args);
}
if(strcmp(eventtype, TBDB_EVENTTYPE_UP) == 0){
handle_link_up(objname, l_index);
}
else if(strcmp(eventtype, TBDB_EVENTTYPE_DOWN) == 0){
handle_link_down(objname, l_index);
}
else if(strcmp(eventtype, TBDB_EVENTTYPE_MODIFY) == 0){
handle_link_modify(objname, l_index, args);
}
else error("unknown link event type\n");
if(debug){
system ("echo ======================================== >> /tmp/ipfw.log");
system("(date;echo PARAMS ; ipfw pipe show all) >> /tmp/ipfw.log");
}
}
/******************* handle_link_up **************************
This handles the link_up event. If link is already up, it returns
without doing anything. If link is down, it calls set_link_params
for looking up the link params in the link_table and configuring
them into dummynet
******************* handle_link_up ***************************/
void handle_link_up(char * linkname, int l_index)
{
/* get the pipe params from the params field of the
link_map table. Set the pipe params in dummynet
*/
if (debug) {
info("==========================================\n");
info("recd. UP event for link = %s\n", linkname);
}
/* no need to do anything if link is already up*/
if(link_map[l_index].stat == LINK_UP)
return;
link_map[l_index].stat = LINK_UP;
set_link_params(l_index, 0, -1);
}
/******************* handle_link_down **************************
This handles the link_down event. If link is already down, it returns
without doing anything. If link is up, it calls get_link_params
to populate the link_map table with current params. It then calls
set_link_params to write back the pipe params, but with plr = 1.0,
so that the link goes down
******************* handle_link_down***************************/
void handle_link_down(char * linkname, int l_index)
{
/* get the pipe params from dummynet