Commit c02cf0b3 authored by David Johnson

Ensure we flush all our log lines immediately.

parent 09d3699e
Pipeline #1852 passed with stage in 2 seconds
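The pattern throughout this commit is to replace bare `print` statements with a small `log()` helper that flushes stdout after every line, presumably so output is not lost or delayed in block-buffered pipes when the workflow agents run non-interactively. A minimal sketch of the helper, in the Python 2 style of the code below:

import sys

def log(msg):
    # Print and immediately flush, so each line reaches the consumer
    # even when stdout is a block-buffered pipe rather than a terminal.
    print msg
    sys.stdout.flush()

An alternative with the same effect is to run the interpreter unbuffered (python -u, or PYTHONUNBUFFERED=1 in the environment), which avoids touching every call site.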
@@ -19,6 +19,11 @@ import traceback
 import capnet.capnet as cn
 
+def log(msg):
+    print msg
+    sys.stdout.flush()
+    pass
+
 Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
 
 def main():
@@ -28,11 +33,11 @@ def main()
     i = 0
     while True:
         try:
-            print "getting rp0 (try %d)" % (i,)
+            log("getting rp0 (try %d)" % (i,))
             rp0 = p.rp0()
             break
         except:
-            print "ERROR: failed to get rp0 in 5 seconds; will keep trying!"
+            log("ERROR: failed to get rp0 in 5 seconds; will keep trying!")
             traceback.print_exc()
             pass
         i += 1
@@ -42,28 +47,28 @@ def main()
     myname = None
 
     # The wfagent cap comes to us first, after rp0.
-    print "rp0 recv me"
+    log("rp0 recv me")
     me = rp0.recv()
-    print "node info"
+    log("node info")
     info = me.info()
-    print "node flow"
+    log("node flow")
     flow = me.flow()
     node_rp = p.create(cn.RP)
     nodes[info.name] = Entry(me,info,node_rp,flow)
     myname = info.name
-    print "Received self cap: %s" % (info.name,)
+    log("Received self cap: %s" % (info.name,))
 
-    print "rp0 recv broker"
+    log("rp0 recv broker")
     broker = rp0.recv()
     if type(broker) != cn.Broker:
-        print "Second cap not a broker (was %s)" % (str(type(broker)),)
+        log("Second cap not a broker (was %s)" % (str(type(broker)),))
         pass
-    print "Received broker cap"
+    log("Received broker cap")
 
     # The RP for the hadoop service
     service_rp = p.create(cn.RP)
     broker.register("hadoop",service_rp)
-    print "Registered hadoop service"
+    log("Registered hadoop service")
 
     # Now receive node caps until we have a master, a resourcemanager,
     # and we haven't heard from the controller for 60 N-second cycles
@@ -75,22 +80,22 @@ def main()
     while True:
         (node,info,flow,name) = (None,None,None,None)
         try:
-            print "service_rp recv"
+            log("service_rp recv")
             node = service_rp.recv()
-            print "node info"
+            log("node info")
             info = node.info()
-            print "node flow"
+            log("node flow")
             flow = node.flow()
             node_rp = p.create(cn.RP)
             node.reset(node_rp)
             name = info.name
             nodes[name] = Entry(node,info,node_rp,flow)
-            print "Received new node: %s" % (str(info),)
+            log("Received new node: %s" % (str(info),))
             counter = 0
         except Exception, e:
             counter += 1
             (node,info,flow,node_rp,name) = (None,None,None,None,None)
-            print "Exception", e
+            log("Exception %s" % (str(e),))
             pass
 
         if not name is None:
@@ -108,16 +113,16 @@ def main()
             try:
                 a = nodes[name]
                 for (bname,b) in nodes.iteritems():
-                    print "Sending flow ->%s to %s" % (b.info.name,name)
+                    log("Sending flow ->%s to %s" % (b.info.name,name))
                     p.debug_grant(a.node,b.flow)
                     #a.rp.send(b.flow)
-                    print "Sending flow ->%s to %s" % (name,b.info.name)
+                    log("Sending flow ->%s to %s" % (name,b.info.name))
                     p.debug_grant(b.node,a.flow)
                     #b.rp.send(a.flow)
                     pass
             except:
-                print "ERROR: while adding node %s into all-pairs mesh:" % (name,)
+                log("ERROR: while adding node %s into all-pairs mesh:" % (name,))
                 traceback.print_exc()
                 pass
             pass
@@ -125,35 +130,30 @@ def main()
         if master and resourcemanager and counter > 59:
             break
-        sys.stdout.flush()
-        sys.stderr.flush()
         time.sleep(1)
         pass
 
-    print "Assuming all nodes have been granted to us; setting up hadoop!"
+    log("Assuming all nodes have been granted to us; setting up hadoop!")
 
     args = ""
-    print "master = %s %s %s" % (master.info.name,master.info.ipv4,
-                                 master.info.mac)
+    log("master = %s %s %s" % (master.info.name,master.info.ipv4,
+                               master.info.mac))
     args += " %s" % (master.info.ipv4,)
-    print "resourcemanager = %s %s %s" % (resourcemanager.info.name,
-                                          resourcemanager.info.ipv4,
-                                          resourcemanager.info.mac)
+    log("resourcemanager = %s %s %s" % (resourcemanager.info.name,
+                                        resourcemanager.info.ipv4,
+                                        resourcemanager.info.mac))
     args += " %s" % (resourcemanager.info.ipv4,)
     for slave in slaves:
-        print "slave = %s %s %s" % (slave.info.name,slave.info.ipv4,
-                                    slave.info.mac)
+        log("slave = %s %s %s" % (slave.info.name,slave.info.ipv4,
+                                  slave.info.mac))
         args += " %s" % (slave.info.ipv4,)
         pass
-    sys.stdout.flush()
-    sys.stderr.flush()
 
     user = 'ubuntu'
     mastercmd = "/home/ubuntu/setup.sh"
     cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,args)
-    print "Running '%s'..." % (cmd,)
+    log("Running '%s'..." % (cmd,))
     os.system(cmd)
-    print "Finished '%s'..." % (cmd,)
+    log("Finished '%s'..." % (cmd,))
     pass
@@ -25,10 +25,15 @@ import capnet.util
 ME = "SERVICEHADOOPMEMBRANEWFA"
 RECVNODES_TIMEOUT = 60.0
 
+def log(msg):
+    print msg
+    sys.stdout.flush()
+    pass
+
 def ts(obj,id,op,t=None):
     if t is None:
         t = time.time()
-    print "%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,)
+    log("%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,))
     pass
 
 def main():
@@ -46,17 +51,14 @@ def main()
     ts('broker-rp',0,'start')
     service_rp = p.create(cn.RP)
     broker.register("hadoop", service_rp)
-    print "Registered hadoop service"
+    log("Registered hadoop service")
     ts('broker-rp',0,'stop')
 
-    sys.stdout.flush()
-    sys.stderr.flush()
-
     #
     # First receive the membrane from the client
     #
-    print "Waiting for membrane..."
+    log("Waiting for membrane...")
     ts('membrane',0,'start')
     membrane_ext = service_rp.recv_wait()
     ts('membrane',0,'stop')
@@ -72,7 +74,7 @@ def main()
     def trans_rp_recv_iter_debug_wrapper(it):
         for x in it:
-            print "trans rp r iter got:", repr(x)
+            log("trans rp r iter got: %s" % repr(x))
             yield x
 
     ts('recvnodes',0,'start')
@@ -97,54 +99,56 @@ def main()
     ts("allpairs",0,'stop')
 
     ts('hadoop-setup',0,'start')
-    print "Assuming all nodes have been granted to us; setting up hadoop!"
+    log("Assuming all nodes have been granted to us; setting up hadoop!")
 
     args = ""
-    print "master = %s %s %s" % (master.info.name,master.info.ipv4,
-                                 master.info.mac)
+    log("master = %s %s %s" % (master.info.name,master.info.ipv4,
+                               master.info.mac))
     args += " %s" % (master.info.ipv4,)
-    print "resourcemanager = %s %s %s" % (resourcemanager.info.name,
-                                          resourcemanager.info.ipv4,
-                                          resourcemanager.info.mac)
+    log("resourcemanager = %s %s %s" % (resourcemanager.info.name,
+                                        resourcemanager.info.ipv4,
+                                        resourcemanager.info.mac))
     args += " %s" % (resourcemanager.info.ipv4,)
     # The IP addrs must be ordered. We can get away with this because of
     # our naming scheme.
     for i in range(0,len(slaves)):
         slavename = "slave%d" % (i,)
         slave = slaves[slavename]
-        print "slave = %s %s %s" % (slave.info.name, slave.info.ipv4,
-                                    slave.info.mac)
+        log("slave = %s %s %s" % (slave.info.name, slave.info.ipv4,
+                                  slave.info.mac))
         args += " %s" % (slave.info.ipv4,)
         pass
-    sys.stdout.flush()
-    sys.stderr.flush()
 
     user = 'ubuntu'
    mastercmd = "/home/ubuntu/setup.sh"
     cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,args)
-    print "Running '%s'..." % (cmd,)
-    sys.stdout.flush()
-    sys.stderr.flush()
-    os.system(cmd)
-    print "Finished '%s'..." % (cmd,)
+    status = 1
+    retries = 3
+    retryinterval = 4
+    i = 1
+    while status != 0 and i < retries:
+        log("Running '%s'... (%d)" % (cmd,i))
+        i += 1
+        status = os.system(cmd)
+        if status == 0:
+            break
+        else:
+            time.sleep(retryinterval)
+        pass
+    log("Finished '%s'... (%d, status %d)" % (cmd,i,status))
     ts('hadoop-setup',0,'stop')
 
-    sys.stdout.flush()
-    sys.stderr.flush()
-
     #
     # Ok, now that hadoop is done, send all the flow caps we created
     # back through the membrane, and exit.
     #
-    print "master cap back through membrane..."
-    sys.stdout.flush()
-    sys.stderr.flush()
+    log("master cap back through membrane...")
     ts('mastersendback',0,'start')
     trans_rp_s.send(master.grant);
     ts('mastersendback',0,'stop')
 
     ts('wfa',0,'stop')
-    print "Finished setting up Hadoop and its network!"
+    log("Finished setting up Hadoop and its network!")
 
 if __name__ == "__main__":
     main()
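Beyond the print-to-log conversion, the hunk above also wraps the hadoop setup command in a retry loop. Note that as written (`i` starts at 1 and the loop requires `i < retries`), it makes at most `retries - 1` attempts, i.e. two attempts for `retries = 3`. A sketch of a helper that makes the attempt count explicit (the name `run_with_retries` is hypothetical; the flush-on-write `log()` is the same helper the diff introduces):

import os
import sys
import time

def log(msg):
    # Same flush-on-write helper the diff introduces.
    print msg
    sys.stdout.flush()

def run_with_retries(cmd, retries=3, interval=4):
    # Hypothetical helper: run a shell command up to `retries` times,
    # sleeping `interval` seconds between failed attempts, and return
    # the last exit status (0 on success).
    status = 1
    for attempt in range(1, retries + 1):
        log("Running '%s'... (%d)" % (cmd, attempt))
        status = os.system(cmd)
        if status == 0:
            break
        if attempt < retries:
            time.sleep(interval)
    return status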
@@ -18,6 +18,11 @@ import traceback
 import capnet.capnet as cn
 
+def log(msg):
+    print msg
+    sys.stdout.flush()
+    pass
+
 Entry = namedtuple("Entry", ["node", "info", "rp", "flow"])
 
 def main():
@@ -27,11 +32,11 @@ def main()
     i = 0
     while True:
         try:
-            print "getting rp0 (try %d)" % (i,)
+            log("getting rp0 (try %d)" % (i,))
             rp0 = p.rp0()
             break
        except:
-            print "ERROR: failed to get rp0 in 5 seconds; will keep trying!"
+            log("ERROR: failed to get rp0 in 5 seconds; will keep trying!")
             traceback.print_exc()
             pass
         i += 1
@@ -41,23 +46,23 @@ def main()
     myname = None
 
     # The wfagent cap comes to us first, after rp0.
-    print "rp0 recv me"
+    log("rp0 recv me")
     me = rp0.recv()
-    print "node info"
+    log("node info")
     info = me.info()
-    print "node flow"
+    log("node flow")
     flow = me.flow()
     node_rp = p.create(cn.RP)
     nodes[info.name] = Entry(me,info,node_rp,flow)
     myname = info.name
-    print "Received self cap: %s" % (info.name,)
+    log("Received self cap: %s" % (info.name,))
 
-    print "rp0 recv broker"
+    log("rp0 recv broker")
     broker = rp0.recv()
     if type(broker) != cn.Broker:
-        print "Second cap not a broker (was %s)" % (str(type(broker)),)
+        log("Second cap not a broker (was %s)" % (str(type(broker)),))
         pass
-    print "Received broker cap"
+    log("Received broker cap")
 
     # The RP for the hadoop service
     service_rp = None
@@ -67,35 +72,35 @@ def main()
     while True:
         (node,info,flow,name) = (None,None,None,None)
         try:
-            print "rp0 recv"
+            log("rp0 recv")
             node = rp0.recv()
-            print "node info"
+            log("node info")
             info = node.info()
-            print "node flow"
+            log("node flow")
             flow = node.flow()
             node_rp = p.create(cn.RP)
             node.reset(node_rp)
             name = info.name
             nodes[name] = Entry(node,info,node_rp,flow)
-            print "Received new node: %s" % (str(info),)
+            log("Received new node: %s" % (str(info),))
         except Exception, e:
             (node,info,flow,node_rp) = (None,None,None,None)
-            print "Exception", e
+            log("Exception %s" % (str(e),))
             pass
 
         if node:
            # Allow it to communicate with us, we'll want to talk to the
             # master and maybe other nodes later.
             try:
-                print "Sending flow ->%s to %s" % (myname,name)
+                log("Sending flow ->%s to %s" % (myname,name))
                 p.debug_grant(node,nodes[myname].flow)
                 #a.rp.send(b.flow)
-                print "Sending flow ->%s to %s" % (name,myname)
+                log("Sending flow ->%s to %s" % (name,myname))
                 p.debug_grant(me,flow)
                 #b.rp.send(a.flow)
             except:
-                print "Exception",e
+                log("Exception %s" % (str(e),))
                 pass
             pass
@@ -103,8 +108,8 @@ def main()
             try:
                 service_rp = broker.lookup("hadoop")
-                print "Found service hadoop, sending it %d nodes" \
-                    % (len(nodes.keys()),)
+                log("Found service hadoop, sending it %d nodes" \
+                    % (len(nodes.keys()),))
 
                 # Send all the nodes we've received so far!
                 for (name,entry) in nodes.iteritems():
@@ -117,21 +122,19 @@ def main()
                     pass
                 pass
             except Exception, e:
-                print "Exception: ",e
+                log("Exception %s" % (str(e),))
                 pass
             pass
         elif node:
             try:
-                print "Sending cap to %s to hadoop service" % (info.name,)
+                log("Sending cap to %s to hadoop service" % (info.name,))
                 service_rp.send(node)
-                print "Sent cap to %s to hadoop service" % (info.name,)
+                log("Sent cap to %s to hadoop service" % (info.name,))
             except Exception, e:
-                print "Exception: ",e
+                log("Exception %s" % (str(e),))
                 pass
             pass
 
-        sys.stdout.flush()
-        sys.stderr.flush()
         time.sleep(2)
         pass
@@ -24,10 +24,15 @@ RECVNODES_TIMEOUT = 30.0
 ME = "USERHADOOPMEMBRANEWFA"
 
+def log(msg):
+    print msg
+    sys.stdout.flush()
+    pass
+
 def ts(obj,id,op,t=None):
     if t is None:
         t = time.time()
-    print "%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,)
+    log("%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,))
 
 def main():
     ts('wfa',0,'start')
@@ -45,7 +50,7 @@ def main()
     rp0, me, broker = capnet.util.recv_preamble(p)
 
     # The Membrane we send caps through to the hadoop service
-    print "setting up membrane"
+    log("setting up membrane")
     ts('membrane-create',0,'start')
     membrane_int = p.create(cn.Membrane)
     membrane_ext = membrane_int.external()
@@ -76,8 +81,7 @@ def main()
     membrane_int.send(trans_rp_r)
     ts('trans-rp',0,'stop')
 
-    print "Found service hadoop, sending it %d nodes" \
-        % (len(nodes.keys()),)
+    log("Found service hadoop, sending it %d nodes" % (len(nodes.keys()),))
     ts('service-node-send',0,'start')
     with trans_rp_s.send_iter_context():
@@ -90,16 +94,16 @@ def main()
     ts('trans-recv',0,'stop')
 
     # Ok, cut the membrane!
-    print "Hadoop seems done; cutting membrane!"
+    log("Hadoop seems done; cutting membrane!")
     ts('membrane-clear',0,'start')
     membrane_int.clear()
     ts('membrane-clear',0,'stop')
-    print "Finished cutting membrane!"
+    log("Finished cutting membrane!")
 
-    print "Establishing connectivity to the master"
+    log("Establishing connectivity to the master")
     master_flow = master_grant.flow()
     master_grant.grant(me.flow)
-    print "Finished establishing connectivity to the master"
+    log("Finished establishing connectivity to the master")
 
     # Ok, at this point, we can run some hadoop job!
@@ -115,14 +119,11 @@ def main()
     master = nodes['master']
     cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,' '.join(extraargs))
-    print "Running '%s'..." % (cmd,)
+    log("Running '%s'..." % (cmd,))
     os.system(cmd)
-    print "Finished '%s'..." % (cmd,)
+    log("Finished '%s'..." % (cmd,))
     ts('hadoop-job-run',0,'stop')
 
     ts('wfa',0,'stop')
-    print "DONE"
+    log("DONE")
 
 if __name__ == "__main__":
     main()