Commit b2298e01 authored by Jonathon Duerig's avatar Jonathon Duerig

Integrated backend and frontend. Backend still just passes on events to the original delay agent.

parent ad120c5f
......@@ -7,56 +7,78 @@ using namespace std;
EventPipe::EventPipe(std::string const & name)
{
agentName = name;
agentName = name;
}
EventPipe::~EventPipe()
{
}
EventPipe::reset(void)
void EventPipe::reset(void)
{
string command;
command = "/usr/testbed/bin/tevc -e "
+ g::experimentName
+ " now " + agentName +" MODIFY " +
+ g::experimentName + " now " + agentName +" UP ";
system(command.c_str());
command = "/usr/testbed/bin/tevc -e "
+ g::experimentName + " now " + agentName +" MODIFY "
+ "BANDWIDTH=100000 "
+ "DELAY=0 "
+ "PLR=0";
system(command);
system(command.c_str());
}
EventPipe::resetParameter(Parameter const & newParameter)
void EventPipe::resetParameter(Parameter const & newParameter)
{
string parameterName;
int parameterValue;
stringstream ss;
string valueString;
string command;
parameterValue = parameter.getValue();
switch(newParameter.getType()) {
case BANDWIDTH:
parameterString = "BANDWIDTH";
ss << parameterValue;
break;
case DELAY:
parameterString = "DELAY";
ss << parameterValue;
break;
case LOSS:
parameterString = "LOSS";
ss << parameterValue;
break;
}
valueString = ss.str();
command = "/usr/testbed/bin/tevc -e " +
projectName + '/' + experimentName +
" now " + agentName + " MODIFY " +
parameterString + '=' + valueString;
system(command);
string parameterString;
stringstream ss;
string valueString;
string command;
int parameterValue = newParameter.getValue();
if (newParameter.getType() == Parameter::LINK_UP)
{
string action;
if (parameterValue == 1)
{
action = "UP";
}
else
{
action = "DOWN";
}
command = "/usr/testbed/bin/tevc -e "
+ g::experimentName + " now " + agentName + " " + action;
system(command.c_str());
}
else
{
switch(newParameter.getType())
{
case Parameter::BANDWIDTH:
parameterString = "BANDWIDTH";
ss << parameterValue;
break;
case Parameter::DELAY:
parameterString = "DELAY";
ss << parameterValue;
break;
case Parameter::LOSS:
parameterString = "LOSS";
ss << parameterValue;
break;
default:
break;
}
valueString = ss.str();
command = "/usr/testbed/bin/tevc -e "
+ g::experimentName + " now "
+ agentName + " MODIFY "
+ parameterString + '=' + valueString;
system(command.c_str());
}
}
......@@ -10,6 +10,8 @@
#ifndef EVENT_PIPE_HH_DELAY_AGENT_1
#define EVENT_PIPE_HH_DELAY_AGENT_1
#include "RootPipe.hh"
class EventPipe : public RootPipe
{
public:
......@@ -20,7 +22,7 @@ public:
virtual void reset(void);
virtual void resetParameter(Parameter const & newParameter);
private:
string agentName;
std::string agentName;
};
#endif
......@@ -2,14 +2,15 @@
all: delay-agent
COMMON_OBJS += main.o
COMMON_OBJS += main.o Pipe.o Parameter.o lib.o RootPipe.o linkCallback.o resetCallback.o
LINUX_OBJS += LinuxPipe.o
FREEBSD_OBJS += FreeBSDPipe.o
EVENT_OBJS += EventPipe.o
OBJS += $(COMMON_OBJS)
LIBS += $(COMMON_LIBS)
CXXFLAGS = -Wall
LIBS += $(COMMON_LIBS) -lcrypto -lpubsub
CXXFLAGS = -DELVIN_COMPAT -Wall -I../../lib/ -I/usr/local/include
LDFLAGS = -L/usr/local/lib
OS =? `uname -s`
ifeq ($(OS), "FreeBSD")
......@@ -27,13 +28,24 @@ ifeq ($(OS), "Linux")
endif
%.o: %.cc
$(CXX) $(CXXFLAGS) -c $< $@
$(CXX) $(CXXFLAGS) -c $< -o $@
delay-agent: $(OBJS)
$(CXX) $(LDFLAGS) -o $@ $< $(LIBS)
delay-agent: $(OBJS) $(COMMON_OBJS) $(EVENT_OBJS) libevent.a
$(CXX) $(LDFLAGS) -o $@ $^ $(LIBS)
event-delay-agent: $(COMMON_OBJS) $(EVENT_OBJS)
$(CXX) $(LDFLAGS) -o $@ $< $(LIBS)
event-delay-agent: $(COMMON_OBJS) $(EVENT_OBJS) libevent.a
$(CXX) $(LDFLAGS) -o $@ $^ $(LIBS)
clean:
rm -f *.o delay-agent event-delay-agent
rm -f *.o *.a delay-agent event-delay-agent
libevent.a: event.o util.o
$(AR) crv libevent.a event.o util.o
ranlib libevent.a
event.o: ../../lib/event.c
gcc $(CXXFLAGS) -c -o event.o $<
util.o: ../../lib/util.c
gcc $(CXXFLAGS) -c -o util.o $<
......@@ -11,7 +11,7 @@ Parameter::Parameter(ParameterType newType, int newValue)
{
}
ParameterType Parameter::getType(void) const
Parameter::ParameterType Parameter::getType(void) const
{
return type;
}
......
......@@ -10,13 +10,16 @@ class Parameter
public:
enum ParameterType
{
NOTHING,
BANDWIDTH,
DELAY,
// This is the loss rate in thousandths. 1000 is 100% loss. 1 is 0.1% loss. Etc.
LOSS
LOSS,
// 1 means reset to link up. 0 means reset to link down.
LINK_UP
};
public:
Parameter(ParameterType newType=DELAY, int newValue=0);
Parameter(ParameterType newType=NOTHING, int newValue=0);
ParameterType getType(void) const;
int getValue(void) const;
......
......@@ -13,7 +13,7 @@ Pipe::Pipe(RootPipe * newSecret)
Pipe::~Pipe()
{
release(secret);
secret->release();
}
Pipe::Pipe(Pipe const & right)
......@@ -25,7 +25,7 @@ Pipe::Pipe(Pipe const & right)
Pipe & Pipe::operator=(Pipe const & right)
{
right.secret->acquire();
secret.release();
secret->release();
secret = right.secret;
return *this;
}
......
// PipeInfo.hh
#ifndef PIPE_INFO_HH_DELAY_AGENT_1
#define PIPE_INFO_HH_DELAY_AGENT_1
#include "Pipe.hh"
#include "EventPipe.hh"
// Each pipe here is uni-directional. Multiple directions are
// accounted for with multiple pipes.
struct PipeInfo
{
PipeInfo(std::string const & newName,
std::string const & newEndName,
bool newIsLan,
std::string const & newInterface,
std::string const & newPipeData)
: name(newName)
, endName(newEndName)
, linkName(newName + "-" + newEndName)
, isLan(newIsLan)
, parameters(g::defaultParameters)
, interface(newInterface)
, pipeData(newPipeData)
, rawPipe(new EventPipe(linkName))
{
}
// The name of the delay node for this pipe. There may be multiple
// pipes with the same delay node name. Ex: link0, link1, etc.
std::string name;
// The name of the node which is the end point of this pipe. Ex: nodeA, nodeB, etc.
std::string endName;
// The name of this pipe as a link name. Ex: link0-nodeA, link1-nodeB, etc.
std::string linkName;
bool isLan;
std::map<Parameter::ParameterType, Parameter> parameters;
// The interface name on the delay node: Ex: fxp0, fxp1, eth0, etc.
std::string interface;
// Implementation-specific data about the pipe. Might be a pipe number, for instance.
std::string pipeData;
// rawPipe must follow linkName in declaration order
Pipe rawPipe;
};
#endif
......@@ -6,6 +6,27 @@ using namespace std;
namespace g
{
string projectName;
string experimentName;
bool debug = false;
map<Parameter::ParameterType, Parameter> defaultParameters;
multimap<std::string, std::list<PipeInfo>::iterator> pipes;
list<PipeInfo> pipeVault;
}
int stringToInt(std::string const & val)
{
int result = 0;
istringstream stream(val);
stream >> result;
return result;
}
string intToString(int val)
{
ostringstream stream;
stream << val;
return stream.str();
}
......@@ -11,11 +11,43 @@
#include <vector>
#include <iostream>
#include <sstream>
#include <fstream>
#include "event.h"
#define TBDB_OBJECTTYPE_LINK "LINK"
#define TBDB_EVENTTYPE_UP "UP"
#define TBDB_EVENTTYPE_DOWN "DOWN"
#define TBDB_EVENTTYPE_MODIFY "MODIFY"
#define TBDB_EVENTTYPE_RESET "RESET"
#define TBDB_EVENTTYPE_COMPLETE "COMPLETE"
enum { EVENT_BUFFER_SIZE = 50 };
int stringToInt(std::string const & val);
std::string intToString(int val);
void resetCallback(event_handle_t handle,
event_notification_t notification, void *data);
void linkCallback(event_handle_t handle,
event_notification_t notification, void *data);
#include "Parameter.hh"
namespace g
{
extern std::string projectName;
extern std::string experimentName;
extern bool debug;
extern std::map<Parameter::ParameterType, Parameter> defaultParameters;
}
#include "PipeInfo.hh"
namespace g
{
extern std::multimap<std::string, std::list<PipeInfo>::iterator> pipes;
extern std::list<PipeInfo> pipeVault;
}
#endif
// linkCallback.cc
#include "lib.hh"
#include "PipeInfo.hh"
using namespace std;
void handleEvent(event_handle_t handle,
event_notification_t notification,
string const & type,
list<PipeInfo>::iterator pipe);
void handleModify(event_handle_t handle,
event_notification_t notification,
list<PipeInfo>::iterator pipe);
Parameter parseArg(string const & arg);
void changeParameter(list<PipeInfo>::iterator pipe, Parameter const & newParameter);
void linkCallback(event_handle_t handle,
event_notification_t notification,
void *data)
{
char name[EVENT_BUFFER_SIZE];
char type[EVENT_BUFFER_SIZE];
if (event_notification_get_string(handle, notification, "OBJNAME", name, EVENT_BUFFER_SIZE) == 0)
{
cerr << "EVENT: Could not get the object name" << endl;
return;
}
if (event_notification_get_string(handle, notification, "EVENTTYPE", type, EVENT_BUFFER_SIZE) == 0)
{
cerr << "EVENT: Could not get the event type" << endl;
return;
}
multimap<string, list<PipeInfo>::iterator>::iterator pos = g::pipes.lower_bound(name);
multimap<string, list<PipeInfo>::iterator>::iterator limit = g::pipes.upper_bound(name);
for (; pos != limit; ++pos)
{
handleEvent(handle, notification, type, pos->second);
}
}
void handleEvent(event_handle_t handle,
event_notification_t notification,
string const & type,
list<PipeInfo>::iterator pipe)
{
if (type == TBDB_EVENTTYPE_UP)
{
changeParameter(pipe, Parameter(Parameter::LINK_UP, 1));
}
else if (type == TBDB_EVENTTYPE_DOWN)
{
changeParameter(pipe, Parameter(Parameter::LINK_UP, 0));
}
else if (type == TBDB_EVENTTYPE_MODIFY)
{
handleModify(handle, notification, pipe);
}
else
{
cerr << "EVENT: Uknown link type" << endl;
}
}
void handleModify(event_handle_t handle,
event_notification_t notification,
list<PipeInfo>::iterator pipe)
{
char args[EVENT_BUFFER_SIZE];
if (event_notification_get_string(handle, notification,
"ARGS", args, EVENT_BUFFER_SIZE) == 0)
{
cerr << "EVENT: Could not get event arguments" << endl;
return;
}
istringstream line(args);
string arg;
line >> arg;
while (line)
{
Parameter newParameter = parseArg(arg);
if (newParameter.getType() != Parameter::NOTHING)
{
changeParameter(pipe, newParameter);
}
line >> arg;
}
}
Parameter parseArg(string const & arg)
{
Parameter result;
size_t equalsPos = arg.find('=');
if (equalsPos == string::npos)
{
cerr << "EVENT: Argument is not in the form of key=value: " << arg << endl;
return result;
}
std::string key = arg.substr(0, equalsPos);
std::string valueString = arg.substr(equalsPos + 1);
if (key == "BANDWIDTH" || key == "bandwidth")
{
int value = stringToInt(valueString);
result = Parameter(Parameter::BANDWIDTH, value);
}
else if (key == "DELAY" || key == "delay")
{
int value = stringToInt(valueString);
result = Parameter(Parameter::DELAY, value);
}
else if (key == "PLR" || key == "plr")
{
// TODO: Implement PLR
cerr << "EVENT: PLR is not yet implmented" << endl;
}
else
{
cerr << "EVENT: Unknown key: " << key << "=" << valueString << endl;
}
return result;
}
void changeParameter(list<PipeInfo>::iterator pipe, Parameter const & newParameter)
{
Parameter & oldParameter = pipe->parameters[newParameter.getType()];
if (newParameter.getType() == Parameter::LINK_UP
&& newParameter.getValue() == 1
&& oldParameter.getValue() == 0)
{
// Bring the link up
oldParameter = newParameter;
pipe->rawPipe.reset();
map<Parameter::ParameterType, Parameter>::iterator pos = pipe->parameters.begin();
map<Parameter::ParameterType, Parameter>::iterator limit = pipe->parameters.end();
for (; pos != limit; ++pos)
{
if (pos->second.getType() != Parameter::LINK_UP)
{
pipe->rawPipe.resetParameter(pos->second);
}
}
}
else if (oldParameter.getValue() != newParameter.getValue())
{
oldParameter = newParameter;
if (pipe->parameters[Parameter::LINK_UP].getValue() == 1)
{
pipe->rawPipe.resetParameter(newParameter);
}
}
}
// main.cc
#include "lib.hh"
using namespace std;
// For getopt
#include <unistd.h>
void setupDefaultParameters(void);
void readArgs(int argc, char * argv[]);
void usage(char * name);
void initLogging(string const & logFile);
// Reads the map file, initializes the pipe and pipeVault data
// structure, and sets up the two subscription strings for events.
void readMapFile(string const & mapFile, string & linkSubscribe, string & resetSubscribe);
// Adds a name to a comma-delimited list.
void addEntry(string & commaList, string const & newName);
void writePidFile(string const & pidFile);
void initEvents(string const & server, string const & port, string const & keyFile,
string const & linkSubscribe, string const & resetSubscribe);
void subscribeLink(event_handle_t handle, address_tuple_t eventTuple, string const & linkSubscription);
void subscribeReset(event_handle_t handle, address_tuple_t eventTuple, string const & resetSubscription);
int main(int argc, char * argv[])
{
//TODO: Daemonize
setupDefaultParameters();
readArgs(argc, argv);
return 0;
}
void setupDefaultParameters(void)
{
g::defaultParameters[Parameter::BANDWIDTH] = Parameter(Parameter::BANDWIDTH, 100000);
g::defaultParameters[Parameter::DELAY] = Parameter(Parameter::DELAY, 0);
g::defaultParameters[Parameter::LOSS] = Parameter(Parameter::LOSS, 0);
g::defaultParameters[Parameter::LINK_UP] = Parameter(Parameter::LINK_UP, 1);
}
void readArgs(int argc, char * argv[])
{
string server;
string port;
string keyFile;
string linkSubscribe;
string resetSubscribe;
string mapFile;
string logFile;
string pidFile;
// Prevent getopt from printing an error message.
opterr = 0;
/* get params from the optstring */
int option = getopt(argc, argv, "s:p:f:dE:l:i:k:j");
while (option != -1)
{
switch (option)
{
case 'd':
g::debug = true;
break;
case 's':
server = optarg;
break;
case 'p':
port = optarg;
break;
case 'f':
mapFile = optarg;
break;
case 'l':
logFile = optarg;
break;
case 'i':
pidFile = optarg;
break;
case 'E':
g::experimentName = optarg;
break;
case 'k':
keyFile = optarg;
break;
case 'j':
// TODO: Figure out what this does
cerr << "-j not yet implemented" << endl;
// myvnode = optarg;
break;
case '?':
default:
usage(argv[0]);
break;
}
}
/*Check if all params are specified, otherwise, print usage and exit*/
if(server == "" || mapFile == "" || g::experimentName == "")
usage(argv[0]);
initLogging(logFile);
readMapFile(mapFile, linkSubscribe, resetSubscribe);
writePidFile(pidFile);
initEvents(server, port, keyFile, linkSubscribe, resetSubscribe);
}
void usage(char * name)
{
cerr << "Usage: " << name << " -E proj/exp -s server -f mapFile [-d] [-p port] [-l logFile] "
<< "[-i pidFile] [-k keyFile] [-j myvnode???]" << endl;
exit(-1);
}
void initLogging(string const & /*logFile*/)
{
}
void readMapFile(string const & mapFile, string & linkSubscribe, string & resetSubscribe)
{
ifstream in(mapFile.c_str(), ios::in);
string lineString;
getline(in, lineString);
while (in)
{
istringstream line(lineString);
string name;
string type;
string end;
string interface;
string pipeData;
line >> name >> type;
if (type == "duplex")
{
string secondEnd;
string secondInterface;
string secondPipeData;
line >> end >> secondEnd;
line >> interface >> secondInterface;
line >> pipeData >> secondPipeData;
PipeInfo first(name, end, end == secondEnd, interface, pipeData);
g::pipeVault.push_front(first);
g::pipes.insert(make_pair("new-" + first.name, g::pipeVault.begin()));
g::pipes.insert(make_pair("new-" + first.linkName, g::pipeVault.begin()));
addEntry(linkSubscribe, "new-" + first.endName);
addEntry(linkSubscribe, "new-" + first.name);
PipeInfo second(name, secondEnd, end == secondEnd, secondInterface,
secondPipeData);
g::pipeVault.push_front(second);
g::pipes.insert(make_pair("new-" + second.name, g::pipeVault.begin()));
g::pipes.insert(make_pair("new-" + second.linkName, g::pipeVault.begin()));
if (!second.isLan)
{
addEntry(linkSubscribe, "new-" + second.endName);
}
addEntry(resetSubscribe, "new-" + first.name);
}
else
{
line >> end >> interface >> pipeData;
PipeInfo first(name, end, false, interface, pipeData);
g::pipeVault.push_front(first);
g::pipes.insert(make_pair("new-" + first.name, g::pipeVault.begin()));
g::pipes.insert(make_pair("new-" + first.linkName, g::pipeVault.begin()));
addEntry(linkSubscribe, "new-" + first.endName);
addEntry(linkSubscribe, "new-" + first.name);
addEntry(resetSubscribe, "new-" + first.name);
}
getline(in, lineString);
}
}
// Adds a name to a comma-delimited list.
void addEntry(string & commaList, string const & newName)
{
if (commaList == "")
{
commaList += newName;
}
else
{
commaList += "," + newName;
}
}
void writePidFile(string const & pidFile)
{
string filename = pidFile;
if (pidFile == "")
{
// TODO: Find a platform-independent way of getting pid file path
filename = "/var/run/delayagent.pid";
// filename = _PATH_VARRUN + "/delayagent.pid";
}
ofstream file(filename.c_str(), ios::out);
if (file)
{
file << getpid();
}
}
void initEvents(string const & server, string const & port, string const & keyFile,
string const & linkSubscription, string const & resetSubscription)
{
string serverString = "elvin://" + server;
event_handle_t handle;
if (port != "")
{
serverString += ":" + port;
}
if (keyFile != "")
{
handle = event_register_withkeyfile(const_cast<char *>(server.c_str()), 0,
const_cast<char *>(keyFile.c_str()));