Commit 06ad9212 authored by Josh Kunz's avatar Josh Kunz

Fix to use recv_wait instead of polling

parent c5b3f046
...@@ -22,8 +22,7 @@ import capnet.capnet as cn ...@@ -22,8 +22,7 @@ import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "grant", "info", "rp", "flow"]) Entry = namedtuple("Entry", ["node", "grant", "info", "rp", "flow"])
ME = "SERVICEHADOOPMEMBRANEWFA" ME = "SERVICEHADOOPMEMBRANEWFA"
SLEEPTIME = 0.01 RECVNODES_TIMEOUT = 60.0
PHASETIME = 4.0
def ts(obj,id,op,t=None): def ts(obj,id,op,t=None):
if t is None: if t is None:
...@@ -41,20 +40,8 @@ def main(): ...@@ -41,20 +40,8 @@ def main():
ts('proto',0,'stop') ts('proto',0,'stop')
i = 0 i = 0
ts('rp0',0,'start') ts('rp0',0,'start')
while True: print "getting rp0"
try: rp0 = p.rp0()
print "getting rp0 (try %d)" % (i,)
rp0 = p.rp0()
break
except:
#print "ERROR: failed to get rp0 in 1 seconds; will keep trying!"
#traceback.print_exc()
pass
sys.stdout.flush()
sys.stderr.flush()
time.sleep(SLEEPTIME)
i += 1
pass
ts('rp0',0,'stop') ts('rp0',0,'stop')
me = None me = None
...@@ -102,21 +89,11 @@ def main(): ...@@ -102,21 +89,11 @@ def main():
ts('membrane',0,'start') ts('membrane',0,'start')
membrane_ext = None membrane_ext = None
print "Waiting for membrane..." print "Waiting for membrane..."
while True: membrane_ext = service_rp.recv_wait()
try: if type(membrane_ext) != cn.Membrane:
membrane_ext = service_rp.recv() raise Exception("Cap is not a membrane: %s"
if type(membrane_ext) != cn.Membrane: % (str(membrane_ext),))
raise Exception("Cap is not a membrane: %s" print "Received membrane, looking for nodes"
% (str(membrane_ext),))
print "Received membrane, looking for nodes"
break
except Exception, e:
#print "Exception: could not receive membrane",e
time.sleep(SLEEPTIME)
pass
sys.stdout.flush()
sys.stderr.flush()
pass
ts('membrane',0,'stop') ts('membrane',0,'stop')
flow_caps = [] flow_caps = []
...@@ -137,7 +114,7 @@ def main(): ...@@ -137,7 +114,7 @@ def main():
try: try:
t = time.time() t = time.time()
print "membrane_ext recv" print "membrane_ext recv"
node = membrane_ext.recv() node = membrane_ext.recv_wait(timeout=RECVNODES_TIMEOUT)
node_rp = p.create(cn.RP) node_rp = p.create(cn.RP)
grant = node.reset(node_rp) grant = node.reset(node_rp)
print "node info" print "node info"
...@@ -150,12 +127,8 @@ def main(): ...@@ -150,12 +127,8 @@ def main():
ts('recvnode',name,'start',t=t) ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop') ts('recvnode',name,'stop')
phase_start_time = 0 phase_start_time = 0
except Exception, e: except cn.TimeoutException:
if phase_start_time == 0: break
phase_start_time = time.time()
(node,grant,info,flow,node_rp,name) = (None,None,None,None,None,None)
#print "Exception", e
pass
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
...@@ -196,11 +169,6 @@ def main(): ...@@ -196,11 +169,6 @@ def main():
traceback.print_exc() traceback.print_exc()
pass pass
pass pass
if master and resourcemanager and phase_start_time != 0 \
and (time.time() - phase_start_time) > PHASETIME:
print "Waited for phasetime %f, moving on" % (PHASETIME,)
break
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
......
...@@ -102,7 +102,7 @@ def main(): ...@@ -102,7 +102,7 @@ def main():
# Now receive node caps forever, and grant flows from us to everyone # Now receive node caps forever, and grant flows from us to everyone
# else, and vice versa. # else, and vice versa.
ts('recvnodes',0,'start') ts('recvnodes',0,'start')
phase_start_time = 0 first = True
while True: while True:
(node,grant,info,flow,name) = (None,None,None,None,None) (node,grant,info,flow,name) = (None,None,None,None,None)
# Only receive node caps if we've started receiving nodes and # Only receive node caps if we've started receiving nodes and
...@@ -112,7 +112,11 @@ def main(): ...@@ -112,7 +112,11 @@ def main():
try: try:
t = time.time() t = time.time()
print "rp0 recv" print "rp0 recv"
node = rp0.recv() if first:
node = rp0.recv_wait()
first = False
else:
node = rp0.recv_wait(timeout=RECVNODES_TIMEOUT)
print "node info" print "node info"
info = node.info() info = node.info()
# none of this is important since we don't do anything with the nodes # none of this is important since we don't do anything with the nodes
...@@ -125,29 +129,16 @@ def main(): ...@@ -125,29 +129,16 @@ def main():
print "Received new node: %s" % (str(info),) print "Received new node: %s" % (str(info),)
ts('recvnode',name,'start',t=t) ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop') ts('recvnode',name,'stop')
phase_start_time = 0 except cn.TimeoutException:
except Exception, e: break
(node,grant,info,flow,node_rp) = (None,None,None,None,None)
if phase_start_time == 0:
phase_start_time = time.time()
#print "Exception", e
pass
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
if len(nodes.keys()) > 1 and phase_start_time != 0 \
and (time.time() - phase_start_time) > PHASETIME:
print "Waited for phasetime %f, moving on" % (PHASETIME,)
break
time.sleep(SLEEPTIME)
pass pass
ts('recvnodes',0,'stop') ts('recvnodes',0,'stop')
ts('service-rp',0,'start') ts('service-rp',0,'start')
# The RP for the hadoop service # The RP for the hadoop service
service_rp = None
while not service_rp: while not service_rp:
try: try:
service_rp = broker.lookup("hadoop") service_rp = broker.lookup("hadoop")
...@@ -204,30 +195,23 @@ def main(): ...@@ -204,30 +195,23 @@ def main():
# pass # pass
ts('membrane-recv',0,'start') ts('membrane-recv',0,'start')
first = True
while True: while True:
# Now also try to recv caps back from the service wfa. Once # Now also try to recv caps back from the service wfa. Once
# len(caps_from_membrane) > 0, and we haven't seen a new cap for # len(caps_from_membrane) > 0, and we haven't seen a new cap for
# PHASETIME seconds, we cut the membrane and declare victory. # PHASETIME seconds, we cut the membrane and declare victory.
try: try:
cap = membrane_int.recv() if first:
cap = membrane_int.recv_wait()
first = False
else:
cap = membrane_int.recv_wait(timeout=RECVNODES_TIMEOUT)
print "Received a cap back from the hadoop service" print "Received a cap back from the hadoop service"
phase_start_time = 0
caps_from_membrane.append(cap) caps_from_membrane.append(cap)
except: except cn.TimeoutException:
if phase_start_time == 0: break
phase_start_time = time.time()
pass
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
if len(caps_from_membrane) > 0 and phase_start_time != 0 \
and (time.time() - phase_start_time) > PHASETIME:
print "Waited for phasetime %f, moving on" % (PHASETIME,)
break
time.sleep(SLEEPTIME)
pass
ts('membrane-recv',0,'stop') ts('membrane-recv',0,'stop')
# Ok, cut the membrane! # Ok, cut the membrane!
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment