Commit 73aca117 authored by David Johnson's avatar David Johnson

Parameterize hadoop wfas; change master flow setup; check master flows.

First, we add key-value parameters.  broker_name=foo sets the broker name
to foo for registration and lookup (defaults to 'hadoop').
runhadoop=False (or any other False equivalent) forces the wfas to skip
Hadoop configuration and execution.

We also separate out master<->slave flow setup and put it at the
beginning of the allpairs flow mesh setup; this was for debugging.

Finally, we also try (and retry) a simple parallel ssh task from the
Hadoop master to all slaves, to "verify" that they are all up and
reachable, and to "wait" until they are.  This helps us catch problems
such as a VM failing to boot fully because it could not fetch its
metadata.
parent 4703e6c2
Pipeline #1916 passed with stage
in 2 seconds
......@@ -37,12 +37,38 @@ def ts(obj,id,op,t=None):
pass
def main():
log("args: %s" % (' '.join(sys.argv)))
nodes = {}
iface = sys.argv[1]
broker_name = "hadoop"
runhadoop = True
extraargs = []
if len(sys.argv) > 2:
for arg in sys.argv[2:]:
sa = arg.split('=')
if len(sa) == 2:
if sa[0] == 'broker_name':
broker_name = sa[1]
log("Using broker name %s" % (broker_name,))
elif sa[0] == 'runhadoop':
if sa[0] in ('False','false','F','f','0','No','no','N','n'):
runhadoop = False
log("Will not configure hadoop, only flows!")
else:
log("WARNING: unknown argument %s" % (arg,))
pass
else:
extraargs.append(arg)
pass
pass
pass
log("iface = %s, broker_name = %s" % (iface,broker_name))
ts('wfa',0,'start')
ts('proto',0,'start')
p = cn.Protocol(sys.argv[1])
p = cn.Protocol(iface)
ts('proto',0,'stop')
rp0, me, broker = capnet.util.recv_preamble(p)
......@@ -50,7 +76,7 @@ def main():
# The RP for the hadoop service
ts('broker-rp',0,'start')
service_rp = p.create(cn.RP)
broker.register("hadoop", service_rp)
broker.register(broker_name, service_rp)
log("Registered hadoop service")
ts('broker-rp',0,'stop')
......@@ -93,9 +119,25 @@ def main():
ts('recvnodes',0,'stop')
ts("allpairs",0,'start')
for (a, b) in itertools.combinations(nodes.itervalues(), 2):
ts("master flows",0,'start')
#for (a, b) in itertools.combinations(nodes.itervalues(), 2):
for a in nodes.values():
if a == master:
continue
a.grant.grant(master.flow)
master.grant.grant(a.flow)
pass
ts("master flows",0,'stop')
for a in nodes.values():
if a == master:
continue
for b in nodes.values():
if a == b or b == master:
continue
a.grant.grant(b.flow)
b.grant.grant(a.flow)
pass
pass
ts("allpairs",0,'stop')
log("Establishing connectivity to the master")
......@@ -105,8 +147,12 @@ def main():
log("Finished establishing connectivity to the master")
ts('hadoop-setup',0,'start')
if runhadoop:
log("Not setting up Hadoop, on command.")
else:
log("Assuming all nodes have been granted to us; setting up hadoop!")
args = ""
pargs = ""
log("master = %s %s %s" % (master.info.name,master.info.ipv4,
master.info.mac))
args += " %s" % (master.info.ipv4,)
......@@ -122,9 +168,29 @@ def main():
log("slave = %s %s %s" % (slave.info.name, slave.info.ipv4,
slave.info.mac))
args += " %s" % (slave.info.ipv4,)
pargs += " -H %s" % (slave.info.ipv4,)
pass
user = 'ubuntu'
# First, check to see that all slave nodes can be reached via ssh.
checkcmd = "ssh -o StrictHostKeyChecking=no %s@%s parallel-ssh -i -O StrictHostKeyChecking=no -t 5 %s 'ls >& /dev/null'" % (user,master.info.ipv4,pargs)
retries = 8
success = False
while retries > 0:
status = os.system(checkcmd)
if status == 0:
success = True
break
else:
log("WARNING: could not reach all slave nodes (trying %d more times)" % (retries-1,))
time.sleep(5)
retries -= 1
pass
if not success:
log("ERROR: could not reach all slave nodes via ssh!")
pass
mastercmd = "/home/ubuntu/setup.sh"
cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,args)
status = 1
......@@ -142,6 +208,7 @@ def main():
pass
log("Finished '%s'... (%d, status %d)" % (cmd,i,status))
ts('hadoop-setup',0,'stop')
pass
#
# Ok, now that hadoop is done, send all the flow caps we created
......
......@@ -35,16 +35,37 @@ def ts(obj,id,op,t=None):
log("%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,))
def main():
ts('wfa',0,'start')
log("args: %s" % (' '.join(sys.argv)))
iface = sys.argv[1]
broker_name = "hadoop"
runhadoop = True
extraargs = []
if len(sys.argv) > 2:
extraargs=sys.argv[2:]
for arg in sys.argv[2:]:
sa = arg.split('=')
if len(sa) == 2:
if sa[0] == 'broker_name':
broker_name = sa[1]
log("Using broker name %s" % (broker_name,))
elif sa[0] == 'runhadoop':
if sa[0] in ('False','false','F','f','0','No','no','N','n'):
runhadoop = False
log("Will not run hadoop!")
else:
log("WARNING: unknown argument %s" % (arg,))
pass
else:
extraargs=[]
extraargs.append(arg)
pass
pass
pass
log("iface = %s, broker_name = %s" % (iface,broker_name))
ts('wfa',0,'start')
ts('proto',0,'start')
p = cn.Protocol(sys.argv[1])
p = cn.Protocol(iface)
ts('proto',0,'stop')
rp0, me, broker = capnet.util.recv_preamble(p)
......@@ -64,7 +85,7 @@ def main():
ts('recvnodes',0,'stop')
ts('service-rp',0,'start')
service_rp = broker.lookup_wait("hadoop")
service_rp = broker.lookup_wait(broker_name)
ts('service-rp',0,'stop')
ts('trans-rp',0,'start')
......@@ -91,6 +112,7 @@ def main():
ts('trans-recv',0,'start')
master_grant = trans_rp_r.recv_wait()
log("Received master_grant back from membrane: %s" % (str(master_grant)))
ts('trans-recv',0,'stop')
# Ok, cut the membrane!
......@@ -108,6 +130,9 @@ def main():
# Ok, at this point, we can run some hadoop job!
ts('hadoop-job-run',0,'start')
if not runhadoop:
log("Not running Hadoop, on command.")
else:
user = 'ubuntu'
mastercmd = "/home/ubuntu/hadoop-test-wordcount.sh"
if len(extraargs):
......@@ -122,6 +147,7 @@ def main():
log("Running '%s'..." % (cmd,))
os.system(cmd)
log("Finished '%s'..." % (cmd,))
pass
ts('hadoop-job-run',0,'stop')
ts('wfa',0,'stop')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment