Commit 0e8d3df0 authored by Josh Kunz's avatar Josh Kunz

Simplify membrane-based wfas with new capnet features

parent 06ad9212
......@@ -17,9 +17,10 @@ import time
from collections import namedtuple
import traceback
import capnet.capnet as cn
import itertools
Entry = namedtuple("Entry", ["node", "grant", "info", "rp", "flow"])
import capnet as cn
import capnet.util
ME = "SERVICEHADOOPMEMBRANEWFA"
RECVNODES_TIMEOUT = 60.0
......@@ -38,144 +39,51 @@ def main():
ts('proto',0,'start')
p = cn.Protocol(sys.argv[1])
ts('proto',0,'stop')
i = 0
ts('rp0',0,'start')
print "getting rp0"
rp0 = p.rp0()
ts('rp0',0,'stop')
me = None
myname = None
ts('self',0,'start')
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
print "node info"
info = me.info()
print "node flow"
flow = p.create(cn.Flow)
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,None,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
ts('self',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
ts('broker',0,'start')
print "rp0 recv broker"
broker = rp0.recv()
ts('broker',0,'stop')
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
sys.stdout.flush()
sys.stderr.flush()
rp0, me, broker = capnet.util.recv_preamble(p)
# The RP for the hadoop service
ts('broker-rp',0,'start')
service_rp = p.create(cn.RP)
broker.register("hadoop",service_rp)
broker.register("hadoop", service_rp)
print "Registered hadoop service"
ts('broker-rp',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
#
# First receive our membrane.
# First receive the membrane from the client
#
ts('membrane',0,'start')
membrane_ext = None
print "Waiting for membrane..."
ts('membrane',0,'start')
membrane_ext = service_rp.recv_wait()
if type(membrane_ext) != cn.Membrane:
raise Exception("Cap is not a membrane: %s"
% (str(membrane_ext),))
print "Received membrane, looking for nodes"
ts('membrane',0,'stop')
flow_caps = []
sys.stdout.flush()
sys.stderr.flush()
# Now receive node caps until we have a master, a resourcemanager,
# and we haven't heard from the controller for 60 N-second cycles
# (N is currently ~2s).
assert membrane_ext.__class__ == cn.Membrane, "must receive membrane from service rp"
trans_rp = membrane_ext.recv_wait()
assert trans_rp.__class__ == cn.RP
ts('recvnodes',0,'start')
nodes = capnet.util.recv_nodes(p, trans_rp.recv_iter())
master = None
resourcemanager = None
resourcemanager = None
slaves = {}
phase_start_time = 0
ts('recvnodes',0,'start')
while True:
(node,grant,info,flow,node_rp,name) = (None,None,None,None,None,None)
try:
t = time.time()
print "membrane_ext recv"
node = membrane_ext.recv_wait(timeout=RECVNODES_TIMEOUT)
node_rp = p.create(cn.RP)
grant = node.reset(node_rp)
print "node info"
info = node.info()
print "node flow"
flow = grant.flow()
name = info.name
nodes[name] = Entry(node,grant,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop')
phase_start_time = 0
except cn.TimeoutException:
break
sys.stdout.flush()
sys.stderr.flush()
if not name is None:
# Figure out which node this is.
if name == 'master':
master = nodes[name]
elif name == 'resourcemanager':
resourcemanager = nodes[name]
else:
slaves[name] = nodes[name]
pass
# Setup the allpairs communication, because this is what
# hadoop needs if we don't have fine-grained flows.
try:
a = nodes[name]
ts("allpairs",hash(a),'start')
for (bname,b) in nodes.iteritems():
if name == bname:
continue
print "Sending flow ->%s to %s" % (b.info.name,name)
a.grant.grant(b.flow)
#p.debug_grant(a.node,b.flow)
#a.rp.send(b.flow)
if not b.grant is None:
print "Sending flow ->%s to %s" % (name,b.info.name)
b.grant.grant(a.flow)
#p.debug_grant(b.node,a.flow)
#b.rp.send(a.flow)
pass
pass
ts("allpairs",hash(a),'stop')
except:
print "ERROR: while adding node %s into all-pairs mesh:" % (name,)
traceback.print_exc()
pass
pass
sys.stdout.flush()
sys.stderr.flush()
time.sleep(SLEEPTIME)
pass
for (name, node) in nodes.iteritems():
if name == 'master':
master = node
elif name == 'resourcemanager':
resourcemanager = node
else:
slaves[name] = node
ts('recvnodes',0,'stop')
ts("allpairs",0,'start')
for (a, b) in itertools.combinations(nodes.itervalues(), 2):
a.node_grant.grant(b.flow)
b.node_grant.grant(a.flow)
ts("allpairs",0,'stop')
ts('hadoop-setup',0,'start')
print "Assuming all nodes have been granted to us; setting up hadoop!"
......@@ -192,7 +100,7 @@ def main():
for i in range(0,len(slaves)):
slavename = "slave%d" % (i,)
slave = slaves[slavename]
print "slave = %s %s %s" % (slave.info.name,slave.info.ipv4,
print "slave = %s %s %s" % (slave.info.name, slave.info.ipv4,
slave.info.mac)
args += " %s" % (slave.info.ipv4,)
pass
......@@ -216,27 +124,13 @@ def main():
# Ok, now that hadoop is done, send all the flow caps we created
# back through the membrane, and exit.
#
ts('mastersendback',0,'start')
print "master cap back through membrane..."
sys.stdout.flush()
sys.stderr.flush()
##for cap in flow_caps:
## try:
## membrane_ext.send(cap)
## print "Sent flow cap %s" % (str(cap),)
## except Exception, e:
## print "Exception: ",e
## pass
## sys.stdout.flush()
## sys.stderr.flush()
## pass
membrane_ext.send(master.grant)
ts('mastersendback',0,'stop')
print "Finished setting up Hadoop and its network!"
ts('mastersendback',0,'start')
trans_rp.send(master.grant);
ts('mastersendback',0,'stop')
ts('wfa',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
pass
print "Finished setting up Hadoop and its network!"
......@@ -17,19 +17,15 @@ import time
from collections import namedtuple
import traceback
import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "grant", "info", "rp", "flow"])
import capnet as cn
import capnet.util
ME = "USERHADOOPMEMBRANEWFA"
SLEEPTIME = 0.01
PHASETIME = 4.0
def ts(obj,id,op,t=None):
    """Emit a CSV-style timestamp record for benchmarking.

    Prints a line of the form
        <ME>,TIMESTAMP,<obj>,<id>,<op>,<time>
    so the wfa's stdout can later be grepped/parsed for timing data.

    obj -- label of the phase being timed (e.g. 'proto', 'rp0')
    id  -- sub-identifier within the phase (often 0, or a node name)
    op  -- 'start' or 'stop' marker for the phase
    t   -- optional pre-captured time.time() value; defaults to now
    """
    if t is None:
        # No explicit timestamp supplied; capture the current time.
        t = time.time()
    # ME is a module-level constant naming this wfa (e.g. "USERHADOOPMEMBRANEWFA").
    print "%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,)
    pass
def main():
ts('wfa',0,'start')
......@@ -39,180 +35,49 @@ def main():
extraargs=sys.argv[2:]
else:
extraargs=[]
nodes = {}
ts('proto',0,'start')
p = cn.Protocol(sys.argv[1])
ts('proto',0,'stop')
i = 0
ts('rp0',0,'start')
while True:
try:
print "getting rp0 (try %d)" % (i,)
rp0 = p.rp0()
break
except:
print "ERROR: failed to get rp0 in 10 seconds; will keep trying!"
traceback.print_exc()
pass
time.sleep(SLEEPTIME)
i += 1
pass
ts('rp0',0,'stop')
me = None
myname = None
ts('self',0,'start')
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
print "node info"
info = me.info()
print "node flow"
flow = p.create(cn.Flow)
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,None,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
ts('self',0,'stop')
ts('broker',0,'start')
print "rp0 recv broker"
broker = rp0.recv()
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
ts('broker',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
rp0, me, broker = capnet.util.recv_preamble(p)
# The Membrane we send caps through to the hadoop service
ts('membrane-create',0,'start')
print "setting up membrane"
ts('membrane-create',0,'start')
membrane_int = p.create(cn.Membrane)
membrane_ext = membrane_int.external()
assert membrane_int.cptr != membrane_ext.cptr, "should be different cptrs"
ts('membrane-create',0,'stop')
caps_from_membrane = []
# Now receive node caps forever, and grant flows from us to everyone
# else, and vice versa.
ts('recvnodes',0,'start')
first = True
while True:
(node,grant,info,flow,name) = (None,None,None,None,None)
# Only receive node caps if we've started receiving nodes and
# we've seen one in the last 60 seconds. Otherwise, quit
# receiving under the assumption that we've sent them all to the
# hadoop wfa through the membrane.
try:
t = time.time()
print "rp0 recv"
if first:
node = rp0.recv_wait()
first = False
else:
node = rp0.recv_wait(timeout=RECVNODES_TIMEOUT)
print "node info"
info = node.info()
# none of this is important since we don't do anything with the nodes
#node_rp = p.create(cn.RP)
#grant = node.reset(node_rp)
#print "node flow"
#flow = grant.flow()
name = info.name
nodes[name] = Entry(node,grant,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop')
except cn.TimeoutException:
break
sys.stdout.flush()
sys.stderr.flush()
pass
ts('recvnodes',0,'start')
nodes = capnet.util.recv_nodes(p, rp0.recv_iter_timeout(RECVNODES_TIMEOUT))
ts('recvnodes',0,'stop')
ts('service-rp',0,'start')
# The RP for the hadoop service
while not service_rp:
try:
service_rp = broker.lookup("hadoop")
print "Found service hadoop, sending it our membrane"
service_rp.send(membrane_ext)
print "Found service hadoop, sending it %d nodes" \
% (len(nodes.keys()),)
break
except Exception, e:
#print "Exception: ",e
pass
time.sleep(SLEEPTIME)
pass
service_rp = broker.lookup_wait("hadoop")
ts('service-rp',0,'stop')
ts('trans-rp',0,'start')
service_rp.send(membrane_ext)
trans_rp = p.create(cn.RP)
service_rp.send(trans_rp)
ts('trans-rp',0,'stop')
ts('service-node-send',0,'start')
print "Found service hadoop, sending it %d nodes" \
% (len(nodes.keys()),)
# Send all the nodes we've received so far!
for (name,entry) in nodes.iteritems():
# Except don't send our wfa!
if entry.node == me:
continue
else:
print "Sending cap to %s to hadoop service" % (name,)
membrane_int.send(entry.node)
print "Sent cap to %s to hadoop service" % (name,)
pass
pass
ts('service-node-send',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
ts('wait-membrane-drain',0,'start')
# Wait at least 0.25 second per node we sent, to ensure the membrane
# has been drained by the service wfa, OR wait 60 seconds -- the max.
waittime = max(0.5 * (len(nodes.keys()) - 1), 30)
print "Waiting for %f seconds for service wfa to drain the membrane..."
phase_start_time = time.time()
while (time.time() - phase_start_time) < waittime:
time.sleep(SLEEPTIME)
pass
print "Waited for phasetime %f, moving on" % (waittime,)
ts('wait-membrane-drain',0,'stop')
#while True:
# print "Just sleeping, not receiving back from membrane nor clearing it"
# time.sleep(1)
# pass
ts('service-node-send',0,'start')
with trans_rp.send_iter_context():
for (name, node) in nodes.iteritems():
  • Is this really an appropriate use of with?

  • "Appropriate"? I'm on the fence on that one. The other possible interface would be one where some type of generator is passed to rp.send_iter and send_iter itself calls the send method with values produced by the generator. This is probably the better interface, but I couldn't figure out what sort of generator should be passed to rp.send_iter since it could be generating capabilities, messages, or a combination of the two. As a compromise, I made this context thing that sends the appropriate start sequence and stop sequence and lets the user invoke send as many times as they want within that start/stop context. So I mean, it's not wrong per se, but I'm not sure if it's the best interface.

  • Yeah, ok, I can see this as "appropriate"... I just had to go read the PEP. Now the question for tcloud/capnet/__init__.py send_iter_context(): should the yield be wrapped in a try with a finally to send the stop msg no matter what? How to actually give the user control of that part? Maybe yet another method that they can call to clean up by sending the stop signal, if they catch the exception themselves?

  • Yeah, you're right, it should probably be wrapped in a try-finally. At least that way we could theoretically recover and re-use send_iter.

Please register or sign in to reply
trans_rp.send(node.node)
ts('service-node-send',0,'stop')
ts('membrane-recv',0,'start')
first = True
while True:
# Now also try to recv caps back from the service wfa. Once
# len(caps_from_membrane) > 0, and we haven't seen a new cap for
# PHASETIME seconds, we cut the membrane and declare victory.
try:
if first:
cap = membrane_int.recv_wait()
first = False
else:
cap = membrane_int.recv_wait(timeout=RECVNODES_TIMEOUT)
print "Received a cap back from the hadoop service"
caps_from_membrane.append(cap)
except cn.TimeoutException:
break
sys.stdout.flush()
sys.stderr.flush()
ts('membrane-recv',0,'stop')
ts('trans-recv',0,'start')
master_grant = trans_rp.recv_wait()
ts('trans-recv',0,'stop')
# Ok, cut the membrane!
print "Hadoop seems done; cutting membrane!"
......@@ -222,25 +87,9 @@ def main():
print "Finished cutting membrane!"
print "Establishing connectivity to the master"
master_grant = caps_from_membrane[0]
master_flow = master_grant.flow()
master_grant.grant(nodes[myname].flow)
master_grant.grant(me.flow)
print "Finished establishing connectivity to the master"
## if node:
## # Allow it to communicate with us, we'll want to talk to the
## # master and maybe other nodes later.
## try:
## print "Sending flow ->%s to %s" % (myname,name)
## p.debug_grant(node,nodes[myname].flow)
## #a.rp.send(b.flow)
## print "Sending flow ->%s to %s" % (name,myname)
## p.debug_grant(me,flow)
## #b.rp.send(a.flow)
## except Exception, e:
## print "Exception",e
## pass
## pass
# Ok, at this point, we can run some hadoop job!
......@@ -253,7 +102,6 @@ def main():
# By default, do a size of 128 * nslaves * 1024 * 1024
size = (len(nodes.keys()) - 3) * 128 * 1024 * 1024
mastercmd += " %d" % (size,)
pass
master = nodes['master']
cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,' '.join(extraargs))
......@@ -265,5 +113,3 @@ def main():
ts('wfa',0,'stop')
print "DONE"
pass
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment