Commit 15e5bb6e authored by David Johnson's avatar David Johnson

Add a User/Hadoop workflow agent pair that uses membranes.

This is a bit weird.  The user wfa receives node caps and passes them
to the membrane until it hasn't received any for 60 seconds; then it
starts trying to receive capabilities from the membrane; that is how it
figures out that Hadoop has finished.  The hadoop wfa doesn't send any
caps back until it has finished setting up; then it sends all the flow
caps it created back to the user wfa.
parent 8974bfbc
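At a glance, the handshake between the two wfas looks like this (a
condensed sketch using the same capnet calls as the two files below;
user_nodes and flow_cap are illustrative placeholders, and the retry and
timeout handling from the real files is omitted):

# --- user wfa side ---
membrane_int = p.create(cn.Membrane)    # internal end stays with us
membrane_ext = membrane_int.external()  # external end goes to the service
service_rp = broker.lookup("hadoop")
service_rp.send(membrane_ext)
for node in user_nodes:                 # node caps received on rp0
    membrane_int.send(node)
# ...no new nodes for 60 seconds: switch to receiving...
flow_cap = membrane_int.recv()          # arrives once hadoop is set up
membrane_int.clear()                    # cut the membrane when done

# --- hadoop service wfa side ---
membrane_ext = service_rp.recv()        # the membrane the user wfa sent
node = membrane_ext.recv()              # one cap per node, until quiet
# ...run the hadoop setup script over ssh...
membrane_ext.send(flow_cap)             # send created flow caps back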
#!/usr/bin/env python
##
## This is the service half of a simple example that uses a brokered
## service tenant to configure flows on behalf of some compute tenant.
## This is the master wfa for the service tenant. It registers itself
## with the broker as the "hadoop" service, and each time it receives
## a capability to a node, it adds it to an all-pairs flow mesh. The
## hadoop user tenant master wfa is in user_tenant_hadoop_service_membrane.py
##
import sys
import os
import time
#import argparse
#from pprint import pprint
from collections import namedtuple
import traceback
import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])

def main():
    nodes = {}
    p = cn.Protocol(sys.argv[1])
    i = 0
    while True:
        try:
            print "getting rp0 (try %d)" % (i,)
            rp0 = p.rp0(timeout=10)
            break
        except:
            print "ERROR: failed to get rp0 in 10 seconds; will keep trying!"
            traceback.print_exc()
            pass
        sys.stdout.flush()
        sys.stderr.flush()
        time.sleep(1)
        i += 1
        pass
    me = None
    myname = None
    # The wfagent cap comes to us first, after rp0.
    print "rp0 recv me"
    me = rp0.recv()
    print "node info"
    info = me.info()
    print "node flow"
    flow = me.flow()
    node_rp = p.create(cn.RP)
    nodes[info.name] = Entry(me,info,node_rp,flow)
    myname = info.name
    print "Received self cap: %s" % (info.name,)
    sys.stdout.flush()
    sys.stderr.flush()
    print "rp0 recv broker"
    broker = rp0.recv()
    if type(broker) != cn.Broker:
        print "Second cap not a broker (was %s)" % (str(type(broker)),)
        pass
    print "Received broker cap"
    sys.stdout.flush()
    sys.stderr.flush()
    # The RP for the hadoop service
    service_rp = p.create(cn.RP)
    broker.register("hadoop",service_rp)
    print "Registered hadoop service"
    sys.stdout.flush()
    sys.stderr.flush()
    #
    # First receive our membrane.
    #
    membrane_ext = None
    while True:
        try:
            membrane_ext = service_rp.recv()
            if type(membrane_ext) != cn.Membrane:
                raise Exception("Cap is not a membrane: %s"
                                % (str(membrane_ext),))
            print "Received membrane, looking for nodes"
            break
        except Exception, e:
            print "Exception: could not receive membrane",e
            time.sleep(1)
            pass
        sys.stdout.flush()
        sys.stderr.flush()
        pass
    flow_caps = []
    sys.stdout.flush()
    sys.stderr.flush()
    # Now receive node caps until we have a master, a resourcemanager,
    # and we haven't heard from the controller for 60 N-second cycles
    # (N is currently ~2s).
    master = None
    resourcemanager = None
    slaves = []
    counter = 0
    while True:
        (node,info,flow,name) = (None,None,None,None)
        try:
            print "membrane recv"
            node = membrane_ext.recv()
            print "node info"
            info = node.info()
            print "node flow"
            flow = node.flow()
            node_rp = p.create(cn.RP)
            node.reset(node_rp)
            name = info.name
            nodes[name] = Entry(node,info,node_rp,flow)
            print "Received new node: %s" % (str(info),)
            counter = 0
        except Exception, e:
            counter += 1
            (node,info,flow,node_rp,name) = (None,None,None,None,None)
            print "Exception", e
            pass
        sys.stdout.flush()
        sys.stderr.flush()
        if name is not None:
            # Figure out which node this is.
            if name == 'master':
                master = nodes[name]
            elif name == 'resourcemanager':
                resourcemanager = nodes[name]
            else:
                slaves.append(nodes[name])
                pass
            # Setup the all-pairs communication, because this is what
            # hadoop needs if we don't have fine-grained flows.
            try:
                a = nodes[name]
                for (bname,b) in nodes.iteritems():
                    print "Sending flow ->%s to %s" % (b.info.name,name)
                    p.debug_grant(a.node,b.flow)
                    #a.rp.send(b.flow)
                    print "Sending flow ->%s to %s" % (name,b.info.name)
                    p.debug_grant(b.node,a.flow)
                    #b.rp.send(a.flow)
                    # Remember the flow caps we just granted so they can
                    # be sent back through the membrane once hadoop is up.
                    if b.flow not in flow_caps:
                        flow_caps.append(b.flow)
                    if a.flow not in flow_caps:
                        flow_caps.append(a.flow)
                    pass
            except:
                print "ERROR: while adding node %s into all-pairs mesh:" % (name,)
                traceback.print_exc()
                pass
            pass
        if master and resourcemanager and counter > 59:
            break
        sys.stdout.flush()
        sys.stderr.flush()
        time.sleep(1)
        pass
print "Assuming all nodes have been granted to us; setting up hadoop!"
args = ""
print "master = %s %s %s" % (master.info.name,master.info.ipv4,
master.info.mac)
args += " %s" % (master.info.ipv4,)
print "resourcemanager = %s %s %s" % (resourcemanager.info.name,
resourcemanager.info.ipv4,
resourcemanager.info.mac)
args += " %s" % (resourcemanager.info.ipv4,)
for slave in slaves:
print "slave = %s %s %s" % (slave.info.name,slave.info.ipv4,
slave.info.mac)
args += " %s" % (slave.info.ipv4,)
pass
sys.stdout.flush()
sys.stderr.flush()
user = 'ubuntu'
mastercmd = "/home/ubuntu/setup.sh"
cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,args)
print "Running '%s'..." % (cmd,)
sys.stdout.flush()
sys.stderr.flush()
os.system(cmd)
print "Finished '%s'..." % (cmd,)
sys.stdout.flush()
sys.stderr.flush()
#
# Ok, now that hadoop is done, send all the flow caps we created
# back through the membrane, and exit.
#
print "Sending %d flow caps back through membrane..." % (len(flow_caps),)
sys.stdout.flush()
sys.stderr.flush()
for cap in flow_caps:
try:
membrane_ext.send(cap)
print "Sent flow cap %s" % (str(cap),)
except Exception, e:
print "Exception: ",e
pass
sys.stdout.flush()
sys.stderr.flush()
pass
print "Finished setting up Hadoop and its network!"
pass
#!/usr/bin/env python
##
## This is one half of a simple example that uses a brokered service
## tenant to configure flows on behalf of some compute tenant. This is
## the master wfa for the compute tenant. It looks up the "hadoop"
## service, and each time it receives a capability to a node it owns, it
## passes it to the hadoop service tenant. The hadoop service
## tenant master wfa is in service_tenant_hadoop_service_membrane.py.
##
import sys
import time
#import argparse
#from pprint import pprint
from collections import namedtuple
import traceback
import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])

def main():
    nodes = {}
    p = cn.Protocol(sys.argv[1])
    i = 0
    while True:
        try:
            print "getting rp0 (try %d)" % (i,)
            rp0 = p.rp0(timeout=10)
            break
        except:
            print "ERROR: failed to get rp0 in 10 seconds; will keep trying!"
            traceback.print_exc()
            pass
        time.sleep(1)
        i += 1
        pass
    me = None
    myname = None
    # The wfagent cap comes to us first, after rp0.
    print "rp0 recv me"
    me = rp0.recv()
    print "node info"
    info = me.info()
    print "node flow"
    flow = me.flow()
    node_rp = p.create(cn.RP)
    nodes[info.name] = Entry(me,info,node_rp,flow)
    myname = info.name
    print "Received self cap: %s" % (info.name,)
    print "rp0 recv broker"
    broker = rp0.recv()
    if type(broker) != cn.Broker:
        print "Second cap not a broker (was %s)" % (str(type(broker)),)
        pass
    print "Received broker cap"
    # The RP for the hadoop service
    service_rp = None
    # The Membrane we send caps through to the hadoop service
    print "setting up membrane"
    membrane_int = p.create(cn.Membrane)
    membrane_ext = membrane_int.external()
    assert membrane_int.cptr != membrane_ext.cptr, "should be different cptrs"
    caps_from_membrane = []
    counter_one = 0
    # Now receive node caps forever, and grant flows from us to everyone
    # else, and vice versa.
    while True:
        (node,info,flow,name) = (None,None,None,None)
        # Only receive node caps if we've started receiving nodes and
        # we've seen one in the last 60 seconds. Otherwise, quit
        # receiving under the assumption that we've sent them all to the
        # hadoop wfa through the membrane.
        try:
            print "rp0 recv"
            node = rp0.recv()
            print "node info"
            info = node.info()
            print "node flow"
            flow = node.flow()
            node_rp = p.create(cn.RP)
            node.reset(node_rp)
            name = info.name
            nodes[name] = Entry(node,info,node_rp,flow)
            print "Received new node: %s" % (str(info),)
            counter_one = 0
        except Exception, e:
            (node,info,flow,node_rp) = (None,None,None,None)
            counter_one += 1
            #print "Exception", e
            pass
        if node:
            # Allow it to communicate with us; we'll want to talk to the
            # master and maybe other nodes later.
            try:
                print "Sending flow ->%s to %s" % (myname,name)
                p.debug_grant(node,nodes[myname].flow)
                #node_rp.send(nodes[myname].flow)
                print "Sending flow ->%s to %s" % (name,myname)
                p.debug_grant(me,flow)
                #nodes[myname].rp.send(flow)
            except Exception, e:
                print "Exception",e
                pass
            pass
        if not service_rp:
            try:
                service_rp = broker.lookup("hadoop")
                print "Found service hadoop, sending it our membrane"
                service_rp.send(membrane_ext)
                print "Found service hadoop, sending it %d nodes" \
                    % (len(nodes.keys()),)
                # Send all the nodes we've received so far!
                for (nname,entry) in nodes.iteritems():
                    # Except don't send our wfa!
                    if entry.node == me:
                        continue
                    else:
                        membrane_int.send(entry.node)
                        pass
                    pass
                pass
            except Exception, e:
                print "Exception: ",e
                pass
            pass
        elif node:
            try:
                print "Sending cap to %s to hadoop service" % (info.name,)
                membrane_int.send(node)
                print "Sent cap to %s to hadoop service" % (info.name,)
            except Exception, e:
                print "Exception: ",e
                pass
            pass
        sys.stdout.flush()
        sys.stderr.flush()
        if len(nodes.keys()) > 1 and counter_one > 60:
            break
        pass
    #while True:
    #    print "Just sleeping, not receiving back from membrane nor clearing it"
    #    time.sleep(1)
    #    pass
    counter_two = 0
    # Now also try to recv caps back from the service wfa. Once
    # len(caps_from_membrane) > 0, and we haven't seen a new cap for
    # 60 seconds, we cut the membrane and declare victory.
    while True:
        try:
            cap = membrane_int.recv()
            print "Received a cap back from the hadoop service"
            counter_two = 0
            caps_from_membrane.append(cap)
        except:
            counter_two += 1
            pass
        sys.stdout.flush()
        sys.stderr.flush()
        if len(caps_from_membrane) > 0 and counter_two > 60:
            break
        time.sleep(1)
        pass
    # Ok, cut the membrane!
    print "Hadoop seems done; cutting membrane!"
    membrane_int.clear()
    print "Finished cutting membrane!"
    # Ok, at this point, we can run some hadoop job!
    pass

# Allow running this file directly as a script (the console_scripts
# entry points call main() as well).
if __name__ == "__main__":
    main()
@@ -62,6 +62,8 @@ console_scripts =
capnet-wfagent-service-tenant-allpairs = networking_capnet.tools.wfagent.service_tenant_allpairs_service:main
capnet-wfagent-user-tenant-hadoop = networking_capnet.tools.wfagent.user_tenant_hadoop_service:main
capnet-wfagent-service-tenant-hadoop = networking_capnet.tools.wfagent.service_tenant_hadoop_service:main
capnet-wfagent-user-tenant-hadoop-membrane = networking_capnet.tools.wfagent.user_tenant_hadoop_service_membrane:main
capnet-wfagent-service-tenant-hadoop-membrane = networking_capnet.tools.wfagent.service_tenant_hadoop_service_membrane:main
neutron.db.alembic_migrations =
networking-capnet = networking_capnet.db.migration:alembic_migrations
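After installation, the two new entry points run the membrane wfa pair.
Each takes a single argument that is handed straight to cn.Protocol as
sys.argv[1] (a minimal sketch; the exact endpoint format depends on the
capnet deployment):

# hypothetical invocations; <protocol-arg> becomes sys.argv[1]
capnet-wfagent-service-tenant-hadoop-membrane <protocol-arg>
capnet-wfagent-user-tenant-hadoop-membrane <protocol-arg>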