Commit 51100be1 authored by David Johnson's avatar David Johnson

Hadoop workflow agent tandem.

This is a bit odd, because the hadoop service wfa does not know when
it's done receiving node caps from the user/tenant wfa, and it has no
way to signal the user when it's done setting up.  Oh well.
parent 12a79dc9
Pipeline #1297 skipped
#!/usr/bin/env python
##
## This is the service half of a simple example that uses a brokered
## service tenant to configure flows on behalf of some compute tenant.
## This is the master wfa for the service tenant. It registers itself
## with the broker as the "hadoop" service, and each time it receives
## a capability to a node, it adds it to an all-pairs flow mesh. The
## hadoop user tenant master wfa is in user_tenant_hadoop_service.py
##
import os
import sys
import time
import traceback
#import argparse
#from pprint import pprint
from collections import namedtuple

import capnet.capnet as cn
# Per-node record: the node capability, its info struct, the RP we
# created for it, and its flow capability.
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
def main():
nodes = {}
p = cn.Protocol(sys.argv[1])
i = 0
while True:
try:
print "getting rp0 (try %d)" % (i,)
rp0 = p.rp0(timeout=5)
break
except:
print "ERROR: failed to get rp0 in 5 seconds; will keep trying!"
traceback.print_exc()
pass
i += 1
pass
me = None
myname = None
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
print "node info"
info = me.info()
print "node flow"
flow = me.flow()
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
print "rp0 recv broker"
broker = rp0.recv()
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
# The RP for the hadoop service
service_rp = p.create(cn.RP)
broker.register("hadoop",service_rp)
print "Registered hadoop service"
# Now receive node caps until we have a master, a resourcemanager,
# and we haven't heard from the controller for 60 N-second cycles
# (N is currently ~2s).
master = None
resourcemanager = None
slaves = []
counter = 0
while True:
(node,info,flow,name) = (None,None,None,None)
try:
print "service_rp recv"
node = service_rp.recv()
print "node info"
info = node.info()
print "node flow"
flow = node.flow()
node_rp = p.create(cn.RP)
node.reset(node_rp)
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
counter = 0
except Exception, e:
counter += 1
(node,info,flow,node_rp,name) = (None,None,None,None,None)
print "Exception", e
pass
if not name is None:
# Figure out which node this is.
if name == 'master':
master = nodes[name]
elif name == 'resourcemanager':
resourcemanager = nodes[name]
else:
slaves.append(nodes[name])
pass
# Setup the allpairs communication, because this is what
# hadoop needs if we don't have fine-grained flows.
try:
a = nodes[name]
for (bname,b) in nodes.iteritems():
print "Sending flow ->%s to %s" % (b.info.name,name)
p.debug_grant(a.node,b.flow)
#a.rp.send(b.flow)
print "Sending flow ->%s to %s" % (name,b.info.name)
p.debug_grant(b.node,a.flow)
#b.rp.send(a.flow)
pass
except:
print "ERROR: while adding node %s into all-pairs mesh:" % (name,)
traceback.print_exc()
pass
pass
if master and resourcemanager and counter > 59:
break
sys.stdout.flush()
sys.stderr.flush()
time.sleep(1)
pass
print "Assuming all nodes have been granted to us; setting up hadoop!"
args = ""
print "master = %s %s %s" % (master.info.name,master.info.ipv4,
master.info.mac)
args += " %s" % (master.info.ipv4,)
print "resourcemanager = %s %s %s" % (resourcemanager.info.name,
resourcemanager.info.ipv4,
resourcemanager.info.mac)
args += " %s" % (resourcemanager.info.ipv4,)
for slave in slaves:
print "slave = %s %s %s" % (slave.info.name,slave.info.ipv4,
slave.info.mac)
args += " %s" % (slave.info.ipv4,)
pass
sys.stdout.flush()
sys.stderr.flush()
user = 'ubuntu'
mastercmd = "/home/ubuntu/setup.sh"
os.
cmd = "ssh %s@%s %s %s" % (user,master.info.ipv4,mastercmd,args))
print "Running '%s'..." % (cmd,)
os.system(cmd)
print "Finished '%s'..." % (cmd,)
pass
#!/usr/bin/env python
##
## This is one half of a simple example that uses a brokered service
## tenant to configure flows on behalf of some compute tenant. This is
## the master wfa for the compute tenant. It looks up the "hadoop"
## service, and each time it receives a capability to a node it owns, it
## passes it to the hadoop service tenant. The hadoop service
## tenant master wfa is in service_tenant_hadoop_service.py .
##
import sys
import time
#import argparse
#from pprint import pprint
from collections import namedtuple
import traceback
import capnet.capnet as cn
# Per-node record: the node capability, its info struct, the RP we
# created for it, and its flow capability.
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
def main():
nodes = {}
p = cn.Protocol(sys.argv[1])
i = 0
while True:
try:
print "getting rp0 (try %d)" % (i,)
rp0 = p.rp0(timeout=5)
break
except:
print "ERROR: failed to get rp0 in 5 seconds; will keep trying!"
traceback.print_exc()
pass
i += 1
pass
me = None
myname = None
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
print "node info"
info = me.info()
print "node flow"
flow = me.flow()
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
print "rp0 recv broker"
broker = rp0.recv()
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
# The RP for the hadoop service
service_rp = None
# Now receive node caps forever, and grant flows from us to everyone
# else, and vice versa.
while True:
(node,info,flow,name) = (None,None,None,None)
try:
print "rp0 recv"
node = rp0.recv()
print "node info"
info = node.info()
print "node flow"
flow = node.flow()
node_rp = p.create(cn.RP)
node.reset(node_rp)
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
except Exception, e:
(node,info,flow,node_rp) = (None,None,None,None)
print "Exception", e
pass
if not service_rp:
try:
service_rp = broker.lookup("hadoop")
print "Found service hadoop, sending it %d nodes" \
% (len(nodes.keys()),)
# Send all the nodes we've received so far!
for (name,entry) in nodes.iteritems():
# Except don't send our wfa!
if entry.node == me:
continue
else:
service_rp.send(entry.node)
pass
pass
pass
except Exception, e:
print "Exception: ",e
pass
pass
elif node:
try:
print "Sending cap to %s to hadoop service" % (info.name,)
service_rp.send(node)
print "Sent cap to %s to hadoop service" % (info.name,)
except Exception, e:
print "Exception: ",e
pass
pass
sys.stdout.flush()
sys.stderr.flush()
time.sleep(2)
pass
pass
......@@ -60,6 +60,8 @@ console_scripts =
capnet-wfagent-allpairs = networking_capnet.tools.wfagent.allpairs:main
capnet-wfagent-user-tenant-allpairs = networking_capnet.tools.wfagent.user_tenant_allpairs_service:main
capnet-wfagent-service-tenant-allpairs = networking_capnet.tools.wfagent.service_tenant_allpairs_service:main
capnet-wfagent-user-tenant-hadoop = networking_capnet.tools.wfagent.user_tenant_hadoop_service:main
capnet-wfagent-service-tenant-hadoop = networking_capnet.tools.wfagent.service_tenant_hadoop_service:main
neutron.db.alembic_migrations =
networking-capnet = networking_capnet.db.migration:alembic_migrations
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment