Commit 48420535 authored by David Johnson's avatar David Johnson

Add a broker-aware allpairs service master and tenant master.

This is the allpairs wfa again, but this time, the user-tenant-allpairs
wfa looks up the "allpairs" service at the broker, and as it receives
its own nodes, it send those caps to the allpairs service's RP that it
got from the broker.

The idea is that you run each of these wfas in a separate tenant.  For
instance, run the service-tenant-allpairs in the new service-0 tenant:

  neutron capnet-wfagent-create --name service-0-wfa0 \
    --tenant-id <SERVICE_0_UUID> \
    --master --wfapp-path=/usr/bin/capnet-wfagent-service-tenant-allpairs

Then, in the new tenant-0 tenant, do

  neutron capnet-wfagent-create --name tenant-0-wfa0 \
    --tenant-id <TENANT_0_UUID> \
    --master --wfapp-path=/usr/bin/capnet-wfagent-user-tenant-allpairs

Then add some nodes into tenant-0, and they will be granted to the
service-0 wfa0, which will add them to the allpairs mesh, i.e.

  nova boot --image trusty-server --flavor m1.small \
    --nic net-id=9bab982f-80d7-427f-a34b-0bf7d3dcd5bc t1

where net-id is the id of the Capnet network, and you have changed
the OS_PROJECT, OS_USERNAME, OS_PASSWORD, OS_TENANT env vars to send
your resource request from the tenant-0 tenant (I can't see that nova
boot supports an admin injecting resources on behalf of a tenant, like
neutron does).
parent 58168bb9
Pipeline #1234 skipped
...@@ -39,6 +39,14 @@ def main(): ...@@ -39,6 +39,14 @@ def main():
node_rp = p.create(cn.RP) node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,info,node_rp,flow) nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name myname = info.name
print "Received self cap: %s" % (info.name,)
print "rp0 recv broker"
broker = rp0.recv()
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
# Now receive node caps forever, and grant flows from us to everyone # Now receive node caps forever, and grant flows from us to everyone
# else, and vice versa. # else, and vice versa.
...@@ -55,7 +63,7 @@ def main(): ...@@ -55,7 +63,7 @@ def main():
node.reset(node_rp) node.reset(node_rp)
name = info.name name = info.name
nodes[name] = Entry(node,info,node_rp,flow) nodes[name] = Entry(node,info,node_rp,flow)
print "New node: %s" % (str(info),) print "Received new node: %s" % (str(info),)
except Exception, e: except Exception, e:
print "Exception", e print "Exception", e
time.sleep(4) time.sleep(4)
......
#!/usr/bin/env python
##
## This is the service half of a simple example that uses a brokered
## service tenant to configure flows on behalf of some compute tenant.
## This is the master wfa for the service tenant. It registers itself
## with the broker as the "allpairs" service, and each time it receives
## a capability to a node, it adds it to an all-pairs flow mesh. The
## allpairs user tenant master wfa is in user_tenant_allpairs_service.py
##
import sys
import time
#import argparse
#from pprint import pprint
from collections import namedtuple
import traceback
import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
def main():
nodes = {}
p = cn.Protocol(sys.argv[1])
i = 0
while True:
try:
print "getting rp0 (try %d)" % (i,)
rp0 = p.rp0(timeout=5)
break
except:
print "ERROR: failed to get rp0 in 5 seconds; will keep trying!"
traceback.print_exc()
pass
i += 1
pass
me = None
myname = None
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
print "node info"
info = me.info()
print "node flow"
flow = me.flow()
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
print "rp0 recv broker"
broker = rp0.recv()
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
# The RP for the allpairs service
service_rp = p.create(cn.RP)
broker.register("allpairs",service_rp)
print "Registered allpairs service"
# Now receive node caps forever, and grant flows between all the nodes.
while True:
(node,info,flow,name) = (None,None,None,None)
try:
print "rp0 recv"
node = rp0.recv()
print "node info"
info = node.info()
print "node flow"
flow = node.flow()
node_rp = p.create(cn.RP)
node.reset(node_rp)
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
except Exception, e:
(node,info,flow,node_rp,name) = (None,None,None,None,None)
print "Exception", e
pass
if not name is None:
try:
a = nodes[name]
for (bname,b) in nodes.iteritems():
# Don't give caps to send to us.
if b.info.name == me:
continue
print "Sending flow ->%s to %s" % (b.info.name,name)
p.debug_grant(a.node,b.flow)
#a.rp.send(b.flow)
print "Sending flow ->%s to %s" % (name,b.info.name)
p.debug_grant(b.node,a.flow)
#b.rp.send(a.flow)
pass
except:
print "ERROR: while adding node %s into all-pairs mesh:" % (name,)
traceback.print_exc()
pass
pass
time.sleep(2)
pass
pass
#!/usr/bin/env python
##
## This is one half of a simple example that uses a brokered service
## tenant to configure flows on behalf of some compute tenant. This is
## the master wfa for the compute tenant. It looks up the "allpairs"
## service, and each time it receives a capability to a node it owns, it
## passes it to the allpairs service tenant. The allpairs service
## tenant master wfa is in service_tenant_allpairs_service.py .
##
import sys
import time
#import argparse
#from pprint import pprint
from collections import namedtuple
import traceback
import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
def main():
nodes = {}
p = cn.Protocol(sys.argv[1])
i = 0
while True:
try:
print "getting rp0 (try %d)" % (i,)
rp0 = p.rp0(timeout=5)
break
except:
print "ERROR: failed to get rp0 in 5 seconds; will keep trying!"
traceback.print_exc()
pass
i += 1
pass
me = None
myname = None
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
print "node info"
info = me.info()
print "node flow"
flow = me.flow()
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
print "rp0 recv broker"
broker = rp0.recv()
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
# The RP for the allpairs service
service_rp = None
# Now receive node caps forever, and grant flows from us to everyone
# else, and vice versa.
while True:
(node,info,flow,name) = (None,None,None,None)
try:
print "rp0 recv"
node = rp0.recv()
print "node info"
info = node.info()
print "node flow"
flow = node.flow()
node_rp = p.create(cn.RP)
node.reset(node_rp)
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
except Exception, e:
(node,info,flow,node_rp) = (None,None,None,None)
print "Exception", e
pass
if not service_rp:
try:
service_rp = broker.lookup("allpairs")
print "Found service allpairs, sending it %d nodes" \
% (len(nodes.keys()),)
# Send all the nodes we've received so far!
for (name,entry) in nodes.iteritems():
# Except don't send our wfa!
if entry.node == me:
continue
else:
service_rp.send(entry.node)
pass
pass
pass
except Exception, e:
print "Exception: ",e
pass
pass
elif not node is None:
try:
service_rp.send(node)
except Exception, e:
print "Exception: ",e
pass
pass
time.sleep(2)
pass
pass
...@@ -58,6 +58,8 @@ console_scripts = ...@@ -58,6 +58,8 @@ console_scripts =
neutron-capnet-agent = networking_capnet.plugins.ml2.drivers.capnet.capnet_neutron_agent:main neutron-capnet-agent = networking_capnet.plugins.ml2.drivers.capnet.capnet_neutron_agent:main
capnet-wfagent-launcher = networking_capnet.tools.wfagent.launcher:main capnet-wfagent-launcher = networking_capnet.tools.wfagent.launcher:main
capnet-wfagent-allpairs = networking_capnet.tools.wfagent.allpairs:main capnet-wfagent-allpairs = networking_capnet.tools.wfagent.allpairs:main
capnet-wfagent-user-tenant-allpairs = networking_capnet.tools.wfagent.user_tenant_allpairs_service:main
capnet-wfagent-service-tenant-allpairs = networking_capnet.tools.wfagent.service_tenant_allpairs_service:main
neutron.db.alembic_migrations = neutron.db.alembic_migrations =
networking-capnet = networking_capnet.db.migration:alembic_migrations networking-capnet = networking_capnet.db.migration:alembic_migrations
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment