All new accounts created on Gitlab now require administrator approval. If you invite any collaborators, please let Flux staff know so they can approve the accounts.

Commit aa6875f7 authored by Josh Kunz's avatar Josh Kunz

Fix the wfas to work with reset/node_grant stuff

parent 7e3d1ea6
Pipeline #1383 skipped
......@@ -19,7 +19,7 @@ import traceback
import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
Entry = namedtuple("Entry", ["node", "grant", "info", "rp", "flow"])
ME = "SERVICEHADOOPMEMBRANEWFA"
SLEEPTIME = 0.01
......@@ -67,7 +67,7 @@ def main():
print "node info"
info = me.info()
print "node flow"
flow = me.flow()
flow = p.create(cn.Flow)
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name
......@@ -133,19 +133,19 @@ def main():
phase_start_time = 0
ts('recvnodes',0,'start')
while True:
(node,info,flow,name) = (None,None,None,None)
(node,grant,info,flow,name) = (None,None,None,None)
try:
t = time.time()
node = membrane_ext.recv()
print "membrane_ext recv"
node = membrane_ext.recv()
node_rp = p.create(cn.RP)
grant = node.reset(node_rp)
print "node info"
info = node.info()
print "node flow"
flow = node.flow()
node_rp = p.create(cn.RP)
node.reset(node_rp)
flow = grant.flow()
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
nodes[name] = Entry(node,grant,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop')
......@@ -153,7 +153,7 @@ def main():
except Exception, e:
if phase_start_time == 0:
phase_start_time = time.time()
(node,info,flow,node_rp,name) = (None,None,None,None,None)
(node,grant,info,flow,node_rp,name) = (None,None,None,None,None)
#print "Exception", e
pass
sys.stdout.flush()
......@@ -179,11 +179,13 @@ def main():
continue
print "Sending flow ->%s to %s" % (b.info.name,name)
p.debug_grant(a.node,b.flow)
a.grant.grant(b.flow)
#p.debug_grant(a.node,b.flow)
#a.rp.send(b.flow)
print "Sending flow ->%s to %s" % (name,b.info.name)
p.debug_grant(b.node,a.flow)
b.grant.grant(a.flow)
#p.debug_grant(b.node,a.flow)
#b.rp.send(a.flow)
pass
ts("allpairs",a,'stop')
......@@ -240,21 +242,22 @@ def main():
# Ok, now that hadoop is done, send all the flow caps we created
# back through the membrane, and exit.
#
ts('flowsendback',0,'start')
print "Sending %d flow caps back through membrane..." % (len(flow_caps),)
ts('mastersendback',0,'start')
print "master cap back through membrane..."
sys.stdout.flush()
sys.stderr.flush()
for cap in flow_caps:
try:
membrane_ext.send(cap)
print "Sent flow cap %s" % (str(cap),)
except Exception, e:
print "Exception: ",e
pass
sys.stdout.flush()
sys.stderr.flush()
pass
ts('flowsendback',0,'stop')
##for cap in flow_caps:
## try:
## membrane_ext.send(cap)
## print "Sent flow cap %s" % (str(cap),)
## except Exception, e:
## print "Exception: ",e
## pass
## sys.stdout.flush()
## sys.stderr.flush()
## pass
membrane_ext.send(master.grant)
ts('mastersendback',0,'stop')
print "Finished setting up Hadoop and its network!"
......
......@@ -18,7 +18,7 @@ import traceback
import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
Entry = namedtuple("Entry", ["node", "grant", "info", "rp", "flow"])
ME = "USERHADOOPMEMBRANEWFA"
SLEEPTIME = 0.01
......@@ -70,9 +70,9 @@ def main():
print "node info"
info = me.info()
print "node flow"
flow = me.flow()
flow = p.create(cn.Flow)
node_rp = p.create(cn.RP)
nodes[info.name] = Entry(me,info,node_rp,flow)
nodes[info.name] = Entry(me,None,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
ts('self',0,'stop')
......@@ -103,7 +103,7 @@ def main():
ts('recvnodes',0,'start')
phase_start_time = 0
while True:
(node,info,flow,name) = (None,None,None,None)
(node,grant,info,flow,name) = (None,None,None,None)
# Only receive node caps if we've started receiving nodes and
# we've seen one in the last 60 seconds. Otherwise, quit
# receiving under the assumption that we've sent them all to the
......@@ -114,18 +114,19 @@ def main():
node = rp0.recv()
print "node info"
info = node.info()
print "node flow"
flow = node.flow()
node_rp = p.create(cn.RP)
node.reset(node_rp)
# none of this is important since we don't do anything with the nodes
#node_rp = p.create(cn.RP)
#grant = node.reset(node_rp)
#print "node flow"
#flow = grant.flow()
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
nodes[name] = Entry(node,grant,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop')
phase_start_time = 0
except Exception, e:
(node,info,flow,node_rp) = (None,None,None,None)
(node,grant,info,flow,node_rp) = (None,None,None,None)
if phase_start_time == 0:
phase_start_time = time.time()
#print "Exception", e
......@@ -230,9 +231,17 @@ def main():
# Ok, cut the membrane!
print "Hadoop seems done; cutting membrane!"
ts('membrane-clear',0,'start')
membrane_int.clear()
ts('membrane-clear',0,'stop')
print "Finished cutting membrane!"
print "Establishing connectivity to the master"
master_grant = caps_from_membrane[0]
master_flow = master_grant.flow()
master_grant.grant(nodes[myname].flow)
print "Finished establishing connectivity to the master"
## if node:
## # Allow it to communicate with us, we'll want to talk to the
## # master and maybe other nodes later.
......@@ -268,8 +277,8 @@ def main():
print "Finished '%s'..." % (cmd,)
ts('hadoop-job-run',0,'stop')
print "DONE"
ts('wfa',0,'stop')
print "DONE"
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment