All new accounts created on GitLab now require administrator approval. If you invite any collaborators, please let Flux staff know so they can approve the accounts.

Commit 7e3d1ea6 authored by David Johnson

Timestamp things; and improve the polling so the timestamps are better!

All recv() are polling right now... hopefully this is a good set of
timestamps.
parent d914fca1
Pipeline #1372 skipped
......@@ -21,29 +21,46 @@ import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
ME = "SERVICEHADOOPMEMBRANEWFA"
SLEEPTIME = 0.01
PHASETIME = 4.0
def ts(obj,id,op,t=None):
if t is None:
t = time.time()
print "%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,)
pass
def main():
nodes = {}
ts('wfa',0,'start')
ts('proto',0,'start')
p = cn.Protocol(sys.argv[1])
ts('proto',0,'stop')
i = 0
ts('rp0',0,'start')
while True:
try:
print "getting rp0 (try %d)" % (i,)
rp0 = p.rp0(timeout=10)
rp0 = p.rp0(timeout=1)
break
except:
print "ERROR: failed to get rp0 in 10 seconds; will keep trying!"
traceback.print_exc()
#print "ERROR: failed to get rp0 in 1 seconds; will keep trying!"
#traceback.print_exc()
pass
sys.stdout.flush()
sys.stderr.flush()
time.sleep(1)
time.sleep(SLEEPTIME)
i += 1
pass
ts('rp0',0,'stop')
me = None
myname = None
ts('self',0,'start')
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
......@@ -55,11 +72,14 @@ def main():
nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
ts('self',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
ts('broker',0,'start')
print "rp0 recv broker"
broker = rp0.recv()
ts('broker',0,'stop')
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
......@@ -68,16 +88,20 @@ def main():
sys.stderr.flush()
# The RP for the hadoop service
ts('broker-rp',0,'start')
service_rp = p.create(cn.RP)
broker.register("hadoop",service_rp)
print "Registered hadoop service"
ts('broker-rp',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
#
# First receive our membrane.
#
ts('membrane',0,'start')
membrane_ext = None
print "Waiting for membrane..."
while True:
try:
membrane_ext = service_rp.recv()
......@@ -87,12 +111,13 @@ def main():
print "Received membrane, looking for nodes"
break
except Exception, e:
print "Exception: could not receive membrane",e
time.sleep(1)
#print "Exception: could not receive membrane",e
time.sleep(SLEEPTIME)
pass
sys.stdout.flush()
sys.stderr.flush()
pass
ts('membrane',0,'stop')
flow_caps = []
......@@ -105,12 +130,14 @@ def main():
master = None
resourcemanager = None
slaves = []
counter = 0
phase_start_time = 0
ts('recvnodes',0,'start')
while True:
(node,info,flow,name) = (None,None,None,None)
try:
print "service_rp recv"
t = time.time()
node = membrane_ext.recv()
print "membrane_ext recv"
print "node info"
info = node.info()
print "node flow"
......@@ -120,11 +147,14 @@ def main():
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
counter = 0
ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop')
phase_start_time = 0
except Exception, e:
counter += 1
if phase_start_time == 0:
phase_start_time = time.time()
(node,info,flow,node_rp,name) = (None,None,None,None,None)
print "Exception", e
#print "Exception", e
pass
sys.stdout.flush()
sys.stderr.flush()
......@@ -143,6 +173,7 @@ def main():
# hadoop needs if we don't have fine-grained flows.
try:
a = nodes[name]
ts("allpairs",a,'start')
for (bname,b) in nodes.iteritems():
if name == bname:
continue
......@@ -155,21 +186,26 @@ def main():
p.debug_grant(b.node,a.flow)
#b.rp.send(a.flow)
pass
ts("allpairs",a,'stop')
except:
print "ERROR: while adding node %s into all-pairs mesh:" % (name,)
traceback.print_exc()
#print "ERROR: while adding node %s into all-pairs mesh:" % (name,)
#traceback.print_exc()
pass
pass
if master and resourcemanager and counter > 59:
if master and resourcemanager and phase_start_time != 0 \
and (time.time() - phase_start_time) > PHASETIME:
print "Waited for phasetime %f, moving on" % (PHASETIME,)
break
sys.stdout.flush()
sys.stderr.flush()
time.sleep(1)
time.sleep(SLEEPTIME)
pass
ts('recvnodes',0,'stop')
ts('hadoop-setup',0,'start')
print "Assuming all nodes have been granted to us; setting up hadoop!"
args = ""
print "master = %s %s %s" % (master.info.name,master.info.ipv4,
......@@ -196,6 +232,7 @@ def main():
sys.stderr.flush()
os.system(cmd)
print "Finished '%s'..." % (cmd,)
ts('hadoop-setup',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
......@@ -203,6 +240,7 @@ def main():
# Ok, now that hadoop is done, send all the flow caps we created
# back through the membrane, and exit.
#
ts('flowsendback',0,'start')
print "Sending %d flow caps back through membrane..." % (len(flow_caps),)
sys.stdout.flush()
sys.stderr.flush()
......@@ -216,7 +254,12 @@ def main():
sys.stdout.flush()
sys.stderr.flush()
pass
ts('flowsendback',0,'stop')
print "Finished setting up Hadoop and its network!"
ts('wfa',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
pass
......@@ -20,11 +20,32 @@ import capnet.capnet as cn
Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
ME = "USERHADOOPMEMBRANEWFA"
SLEEPTIME = 0.01
PHASETIME = 4.0
def ts(obj,id,op,t=None):
if t is None:
t = time.time()
print "%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,)
pass
def main():
ts('wfa',0,'start')
iface = sys.argv[1]
if len(sys.argv) > 2:
extraargs=sys.argv[2:]
else:
extraargs=[]
nodes = {}
ts('proto',0,'start')
p = cn.Protocol(sys.argv[1])
ts('proto',0,'stop')
i = 0
ts('rp0',0,'start')
while True:
try:
print "getting rp0 (try %d)" % (i,)
......@@ -34,13 +55,15 @@ def main():
print "ERROR: failed to get rp0 in 10 seconds; will keep trying!"
traceback.print_exc()
pass
time.sleep(1)
time.sleep(SLEEPTIME)
i += 1
pass
ts('rp0',0,'stop')
me = None
myname = None
ts('self',0,'start')
# The wfagent cap comes to us first, after rp0.
print "rp0 recv me"
me = rp0.recv()
......@@ -52,27 +75,33 @@ def main():
nodes[info.name] = Entry(me,info,node_rp,flow)
myname = info.name
print "Received self cap: %s" % (info.name,)
ts('self',0,'stop')
ts('broker',0,'start')
print "rp0 recv broker"
broker = rp0.recv()
if type(broker) != cn.Broker:
print "Second cap not a broker (was %s)" % (str(type(broker)),)
pass
print "Received broker cap"
ts('broker',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
# The RP for the hadoop service
service_rp = None
# The Membrane we send caps through to the hadoop service
ts('membrane-create',0,'start')
print "setting up membrane"
membrane_int = p.create(cn.Membrane)
membrane_ext = membrane_int.external()
assert membrane_int.cptr != membrane_ext.cptr, "should be different cptrs"
ts('membrane-create',0,'stop')
caps_from_membrane = []
counter_one = 0
# Now receive node caps forever, and grant flows from us to everyone
# else, and vice versa.
ts('recvnodes',0,'start')
phase_start_time = 0
while True:
(node,info,flow,name) = (None,None,None,None)
# Only receive node caps if we've started receiving nodes and
......@@ -80,6 +109,7 @@ def main():
# receiving under the assumption that we've sent them all to the
# hadoop wfa through the membrane.
try:
t = time.time()
print "rp0 recv"
node = rp0.recv()
print "node info"
......@@ -91,105 +121,155 @@ def main():
name = info.name
nodes[name] = Entry(node,info,node_rp,flow)
print "Received new node: %s" % (str(info),)
counter_one = 0
ts('recvnode',name,'start',t=t)
ts('recvnode',name,'stop')
phase_start_time = 0
except Exception, e:
(node,info,flow,node_rp) = (None,None,None,None)
counter_one += 1
if phase_start_time == 0:
phase_start_time = time.time()
#print "Exception", e
pass
if node:
# Allow it to communicate with us, we'll want to talk to the
# master and maybe other nodes later.
try:
print "Sending flow ->%s to %s" % (myname,name)
p.debug_grant(node,nodes[myname].flow)
#a.rp.send(b.flow)
print "Sending flow ->%s to %s" % (name,myname)
p.debug_grant(me,flow)
#b.rp.send(a.flow)
except Exception, e:
print "Exception",e
pass
pass
if not service_rp:
try:
service_rp = broker.lookup("hadoop")
print "Found service hadoop, sending it our membrane"
service_rp.send(membrane_ext)
print "Found service hadoop, sending it %d nodes" \
% (len(nodes.keys()),)
# Send all the nodes we've received so far!
for (name,entry) in nodes.iteritems():
# Except don't send our wfa!
if entry.node == me:
continue
else:
membrane_int.send(entry.node)
pass
pass
pass
except Exception, e:
print "Exception: ",e
pass
pass
elif node:
try:
print "Sending cap to %s to hadoop service" % (info.name,)
membrane_int.send(node)
print "Sent cap to %s to hadoop service" % (info.name,)
except Exception, e:
print "Exception: ",e
pass
pass
sys.stdout.flush()
sys.stderr.flush()
if len(nodes.keys()) > 1 and counter_one > 60:
if len(nodes.keys()) > 1 and phase_start_time != 0 \
and (time.time() - phase_start_time) > PHASETIME:
print "Waited for phasetime %f, moving on" % (PHASETIME,)
break
time.sleep(SLEEPTIME)
pass
ts('recvnodes',0,'stop')
ts('service-rp',0,'start')
# The RP for the hadoop service
service_rp = None
while not service_rp:
try:
service_rp = broker.lookup("hadoop")
print "Found service hadoop, sending it our membrane"
service_rp.send(membrane_ext)
print "Found service hadoop, sending it %d nodes" \
% (len(nodes.keys()),)
break
except Exception, e:
#print "Exception: ",e
pass
time.sleep(SLEEPTIME)
pass
ts('service-rp',0,'stop')
ts('service-node-send',0,'start')
print "Found service hadoop, sending it %d nodes" \
% (len(nodes.keys()),)
# Send all the nodes we've received so far!
for (name,entry) in nodes.iteritems():
# Except don't send our wfa!
if entry.node == me:
continue
else:
print "Sending cap to %s to hadoop service" % (name,)
membrane_int.send(entry.node)
print "Sent cap to %s to hadoop service" % (name,)
pass
pass
ts('service-node-send',0,'stop')
sys.stdout.flush()
sys.stderr.flush()
ts('wait-membrane-drain',0,'start')
# Wait at least 0.25 second per node we sent, to ensure the membrane
# has been drained by the service wfa, OR wait 30 seconds -- the max.
waittime = max(0.25 * (len(nodes.keys()) - 1), 30)
print "Waiting for %f seconds for service wfa to drain the membrane..."
phase_start_time = time.time()
while (time.time() - phase_start_time) < waittime:
time.sleep(SLEEPTIME)
pass
print "Waited for phasetime %f, moving on" % (waittime,)
ts('wait-membrane-drain',0,'stop')
#while True:
# print "Just sleeping, not receiving back from membrane nor clearing it"
# time.sleep(1)
# pass
counter_two = 0
ts('membrane-recv',0,'start')
while True:
# Now also try to recv caps back from the service wfa. Once
# len(caps_from_membrane) > 0, and we haven't seen a new cap for
# 60 seconds, we cut the membrane and declare victory.
# PHASETIME seconds, we cut the membrane and declare victory.
try:
cap = membrane_int.recv()
print "Received a cap back from the hadoop service"
counter_two = 0
phase_start_time = 0
caps_from_membrane.append(cap)
except:
counter_two += 1
if phase_start_time == 0:
phase_start_time = time.time()
pass
sys.stdout.flush()
sys.stderr.flush()
if len(caps_from_membrane) > 0 and counter_two > 60:
if len(caps_from_membrane) > 0 and phase_start_time != 0 \
and (time.time() - phase_start_time) > PHASETIME:
print "Waited for phasetime %f, moving on" % (PHASETIME,)
break
time.sleep(1)
time.sleep(SLEEPTIME)
pass
ts('membrane-recv',0,'stop')
# Ok, cut the membrane!
print "Hadoop seems done; cutting membrane!"
membrane_int.clear()
print "Finished cutting membrane!"
## if node:
## # Allow it to communicate with us, we'll want to talk to the
## # master and maybe other nodes later.
## try:
## print "Sending flow ->%s to %s" % (myname,name)
## p.debug_grant(node,nodes[myname].flow)
## #a.rp.send(b.flow)
## print "Sending flow ->%s to %s" % (name,myname)
## p.debug_grant(me,flow)
## #b.rp.send(a.flow)
## except Exception, e:
## print "Exception",e
## pass
## pass
# Ok, at this point, we can run some hadoop job!
ts('hadoop-job-run',0,'start')
user = 'ubuntu'
mastercmd = "/home/ubuntu/hadoop-test-wordcount.sh"
if len(extraargs):
mastercmd += " " + " ".join(extraargs)
else:
# By default, do a size of 128 * nslaves * 1024 * 1024
size = (len(nodes.keys()) - 3) * 128 * 1024 * 1024
mastercmd += " %d" % (size,)
pass
master = nodes['master']
cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,' '.join(extraargs))
print "Running '%s'..." % (cmd,)
os.system(cmd)
print "Finished '%s'..." % (cmd,)
ts('hadoop-job-run',0,'stop')
print "DONE"
ts('wfa',0,'stop')
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment