#!/usr/bin/env python

##
## This is the service half of a simple example that uses a brokered
## service tenant to configure flows on behalf of some compute tenant.
## This is the master wfa for the service tenant.  It registers itself
## with the broker as the "hadoop" service, and each time it receives
## a capability to a node, it adds it to an all-pairs flow mesh.  The
## hadoop user tenant master wfa is in user_tenant_hadoop_service.py
##

import sys
import os
import time
#import argparse
#from pprint import pprint
from collections import namedtuple
import traceback

import itertools

import capnet as cn
import capnet.util

ME = "SERVICEHADOOPMEMBRANEWFA"
RECVNODES_TIMEOUT = 60.0

def log(msg):
    print msg
    sys.stdout.flush()
    pass

def ts(obj,id,op,t=None):
    if t is None:
        t = time.time()
    log("%s,TIMESTAMP,%s,%s,%s,%f" % (ME,str(obj),str(id),str(op),t,))
    pass

def main():
    log("args: %s" % (' '.join(sys.argv)))

    nodes = {}
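    # Parse the control interface name and any optional key=value
    # arguments (broker_name=..., runhadoop=...).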
    iface = sys.argv[1]
    broker_name = "hadoop"
    runhadoop = True
    extraargs = []
    if len(sys.argv) > 2:
        for arg in sys.argv[2:]:
            sa = arg.split('=')
            if len(sa) == 2:
                if sa[0] == 'broker_name':
                    broker_name = sa[1]
                    log("Using broker name %s" % (broker_name,))
                elif sa[0] == 'runhadoop':
                    if sa[1] in ('False','false','F','f','0','No','no','N','n'):
                        runhadoop = False
                        log("Will not configure hadoop, only flows!")
                else:
                    log("WARNING: unknown argument %s" % (arg,))
                    pass
            else:
                extraargs.append(arg)
                pass
            pass
        pass
    log("iface = %s, broker_name = %s" % (iface,broker_name))
    
    ts('wfa',0,'start')
    
    ts('proto',0,'start')
    p = cn.Protocol(iface)
    ts('proto',0,'stop')

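    # Receive the bootstrap preamble: the initial RP, a capability to
    # our own node, and a capability to the broker.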
    rp0, me, broker = capnet.util.recv_preamble(p)
    
    # The RP for the hadoop service
    ts('broker-rp',0,'start')
    service_rp = p.create(cn.RP)
    broker.register(broker_name, service_rp)
    log("Registered hadoop service")
    ts('broker-rp',0,'stop')

    #
    # First receive the membrane from the client
    #

    log("Waiting for membrane...")
    ts('membrane',0,'start')
    membrane_ext = service_rp.recv_wait()
    ts('membrane',0,'stop')
    assert membrane_ext.__class__ == cn.Membrane, "must receive membrane from service rp"

    # Get our "receive" RP through the membrane
    trans_rp_r = membrane_ext.recv_wait()
    assert trans_rp_r.__class__ == cn.RP

    # Get our "send" RP through the membrane
    trans_rp_s = membrane_ext.recv_wait()
    assert trans_rp_s.__class__ == cn.RP
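    # Wrap the receive iterator so each capability arriving through the
    # membrane is logged for debugging.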
    def trans_rp_recv_iter_debug_wrapper(it):
        for x in it:
            log("trans rp r iter got: %s" % repr(x))
            yield x

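    # Receive the node caps forwarded through the membrane; each node
    # record carries info, grant, and flow caps, keyed by node name.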
    ts('recvnodes',0,'start')
    _it = trans_rp_recv_iter_debug_wrapper(trans_rp_r.recv_iter())
    nodes = capnet.util.recv_nodes(p, _it)
    master = None
    resourcemanager = None
    slaves = {}
    for (name, node) in nodes.iteritems():
        if name == 'master':
            master = node
        elif name == 'resourcemanager':
            resourcemanager = node 
        else:
            slaves[name] = node 
    ts('recvnodes',0,'stop')

    ts("allpairs",0,'start')
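    # Build the all-pairs flow mesh: first grant flows between the
    # master and every other node, then between every remaining pair.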
    ts("master flows",0,'start')
    #for (a, b) in itertools.combinations(nodes.itervalues(), 2):
    for a in nodes.values():
        if a == master:
            continue
        a.grant.grant(master.flow)
        master.grant.grant(a.flow)
        pass
    ts("master flows",0,'stop')
    for a in nodes.values():
        if a == master:
            continue
        for b in nodes.values():
            if a == b or b == master:
                continue
            a.grant.grant(b.flow)
            b.grant.grant(a.flow)
            pass
        pass
    ts("allpairs",0,'stop')

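    # Grant a flow between this wfa and the master node; the ssh-based
    # setup below needs to reach it.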
    log("Establishing connectivity to the master")
    ts("wfactlchannel",0,'start')
    master.grant.grant(me.flow)
    ts("wfactlchannel",0,'stop')
    log("Finished establishing connectivity to the master")
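    # Optionally configure Hadoop itself by driving setup scripts on
    # the master node over ssh.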
    ts('hadoop-setup',0,'start')
    if not runhadoop:
        log("Not setting up Hadoop, as commanded.")
    else:
        log("Assuming all nodes have been granted to us; setting up hadoop!")
        args = ""
        pargs = ""
        log("master = %s %s %s" % (master.info.name,master.info.ipv4,
                                   master.info.mac))
        args += " %s" % (master.info.ipv4,)
        log("resourcemanager = %s %s %s" % (resourcemanager.info.name,
                                            resourcemanager.info.ipv4,
                                            resourcemanager.info.mac))
        args += " %s" % (resourcemanager.info.ipv4,)
        # The IP addrs must be ordered.  We can get away with this because of
        # our naming scheme.
        for i in range(1,len(slaves)+1):
            slavename = "slave-%d" % (i,)
            slave = slaves[slavename]
            log("slave = %s %s %s" % (slave.info.name, slave.info.ipv4,
                                      slave.info.mac))
            args += " %s" % (slave.info.ipv4,)
            pargs += " -H %s" % (slave.info.ipv4,)
            pass

        user = 'ubuntu'

        # First, check to see that all slave nodes can be reached via ssh.
        checkcmd = "ssh -o StrictHostKeyChecking=no %s@%s parallel-ssh -i -O StrictHostKeyChecking=no -t 5 %s 'ls >& /dev/null'" % (user,master.info.ipv4,pargs)
        retries = 8
        success = False
        while retries > 0:
            status = os.system(checkcmd)
            if status == 0:
                success = True
                break
            else:
                log("WARNING: could not reach all slave nodes (trying %d more times)" % (retries-1,))
                time.sleep(5)
            retries -= 1
            pass
        if not success:
            log("ERROR: could not reach all slave nodes via ssh!")
            pass

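        # Run the Hadoop setup script on the master, passing the ordered
        # node IPs; retry a few times on failure.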
        mastercmd = "/home/ubuntu/setup.sh"
        cmd = "ssh -o StrictHostKeyChecking=no %s@%s %s %s" % (user,master.info.ipv4,mastercmd,args)
        status = 1
        retries = 3
        retryinterval = 4
        i = 1
        while status != 0 and i <= retries:
            log("Running '%s'... (%d)" % (cmd,i))
            i += 1
            status = os.system(cmd)
            if status == 0:
                break
            else:
                time.sleep(retryinterval)
            pass
        log("Finished '%s'... (%d, status %d)" % (cmd,i,status))
        pass
    ts('hadoop-setup',0,'stop')

    #
    # Ok, now that hadoop is done, send the master's grant cap back
    # through the membrane, and exit.
    #
    log("Sending master cap back through the membrane...")

    ts('mastersendback',0,'start')
    trans_rp_s.send(master.grant)
    ts('mastersendback',0,'stop')
    
    ts('wfa',0,'stop')
    log("Finished setting up Hadoop and its network!")

if __name__ == "__main__":
    main()