# source.py
'''

Local RTNL
----------

Local RTNL source is a simple `IPRoute` instance. By default NDB
starts with one local RTNL source named `localhost`::

    >>> ndb = NDB()
    >>> ndb.sources.details()
    {'kind': u'local', u'nlm_generator': 1, 'target': u'localhost'}
    >>> ndb.sources['localhost']
    [running] <IPRoute {'nlm_generator': 1}>

The `localhost` RTNL source starts an additional async cache thread.
The `nlm_generator` option means that the `IPRoute` object returns
generators instead of collections, so `IPRoute` responses do not
consume memory regardless of the number of RTNL objects::

    >>> ndb.sources['localhost'].nl.link('dump')
    <generator object _match at 0x7fa444961e10>

See also: :ref:`iproute`

Network namespaces
------------------

There are two ways to connect additional sources to an NDB instance.
One is to specify sources when creating an NDB object::

    ndb = NDB(sources=[{'target': 'localhost'}, {'netns': 'test01'}])

Another way is to call `ndb.sources.add()` method::

    ndb.sources.add(netns='test01')

The syntax `{'target': 'localhost'}` and `{'netns': 'test01'}` is the
short form. The full form would be::

    {'target': 'localhost', # the label for the DB
     'kind': 'local',       # use IPRoute class to start the source
     'nlm_generator': 1}    #

    {'target': 'test01',    # the label
     'kind': 'netns',       # use NetNS class
     'netns': 'test01'}     #

See also: :ref:`netns`

Remote systems
--------------

It is also possible to connect to remote systems using SSH. To use
this kind of source, the `mitogen <https://github.com/dw/mitogen>`_
module must be installed. The `remote` kind of source uses the
`RemoteIPRoute` class. The short form::

    ndb.sources.add(hostname='worker1.example.com')


A more extended form::

    ndb.sources.add(**{'target': 'worker1.example.com',
                       'kind': 'remote',
                       'hostname': 'worker1.example.com',
                       'username': 'jenkins',
                       'check_host_keys': False})

See also: :ref:`remote`
'''
import sys
import time
import errno
import socket
import struct
import threading

from pyroute2 import IPRoute
from pyroute2 import RemoteIPRoute
from pyroute2.ndb.events import (ShutdownException,
                                 State)
from pyroute2.ndb.messages import (cmsg_event,
                                   cmsg_failed,
                                   cmsg_sstart)
from pyroute2.netlink.nlsocket import NetlinkMixin
from pyroute2.netlink.exceptions import NetlinkError

# the netns modules are imported on Linux only; on other platforms
# all the netns-related names are defined as None
if sys.platform.startswith('linux'):
    from pyroute2 import netns
    from pyroute2.netns.nslink import NetNS
    from pyroute2.netns.manager import NetNSManager
else:
    netns = None
    NetNS = None
    NetNSManager = None

# seconds a persistent source waits before reconnecting after a failure
SOURCE_FAIL_PAUSE = 5


class Source(dict):
    '''
    The RTNL source. The source that is used to init the object
    must comply with the IPRoute API and must support the async_cache.
    If the source starts additional threads, they must be joined
    in the source.close()

    The dict part of the object mirrors the source record stored in
    the `sources` and `sources_options` tables, see `load_sql()`.
    '''
    table_alias = 'src'
    dump = None
    dump_header = None
    summary = None
    summary_header = None
    view = None
    table = 'sources'
    # source kind -> class used to create the RTNL channel; the
    # netns-related classes are None on non-Linux platforms
    vmap = {'local': IPRoute,
            'netns': NetNS,
            'remote': RemoteIPRoute,
            'nsmanager': NetNSManager}
    # api(): number of attempts for a call failing with an unexpected
    # exception, and the pause (in seconds) between the attempts
    api_retries = 100
    api_retry_pause = 1

    def __init__(self, ndb, **spec):
        '''
        Create the source and register it in the NDB schema.

        :param ndb: the NDB instance that owns the source
        :param spec: the source spec; `target` is required and is kept
            in the spec, while `kind` (default `'local'`), `persistent`
            (default `True`) and `event` (default `None`) are removed
            from it. The rest of the spec is stored in the
            `sources_options` table and passed to the RTNL class
            when the channel is created.
        '''
        self.th = None
        self.nl = None
        self.ndb = ndb
        self.evq = self.ndb._event_queue
        # the target id -- just in case
        self.target = spec['target']
        self.kind = spec.pop('kind', 'local')
        self.persistent = spec.pop('persistent', True)
        # presumably a threading.Event to report the startup with;
        # if None, a generic "source started" message is sent instead
        self.event = spec.pop('event', None)
        # RTNL API
        self.nl_prime = self.vmap[self.kind]
        self.nl_kwarg = spec
        #
        self.shutdown = threading.Event()
        self.started = threading.Event()
        self.lock = threading.RLock()
        self.shutdown_lock = threading.RLock()
        self.started.clear()
        self.log = ndb.log.channel('sources.%s' % self.target)
        self.state = State(log=self.log)
        self.state.set('init')
        self.ndb.schema.execute('''
                                INSERT INTO sources (f_target, f_kind)
                                VALUES (%s, %s)
                                ''' % (self.ndb.schema.plch,
                                       self.ndb.schema.plch),
                                (self.target, self.kind))
        for key, value in spec.items():
            # the type tag lets load_sql() restore integer values
            vtype = 'int' if isinstance(value, int) else 'str'
            self.ndb.schema.execute('''
                                    INSERT INTO sources_options
                                    (f_target, f_name, f_type, f_value)
                                    VALUES (%s, %s, %s, %s)
                                    ''' % (self.ndb.schema.plch,
                                           self.ndb.schema.plch,
                                           self.ndb.schema.plch,
                                           self.ndb.schema.plch),
                                    (self.target, key, vtype, value))

        self.load_sql()

    @classmethod
    def defaults(cls, spec):
        '''
        Expand a short-form source spec into the full form.

        `hostname` implies a `remote` source using `ssh`, `netns`
        implies a `netns` source; in both cases the value is also the
        default `target`. Keys present in `spec` are never overridden,
        except `netns`, which is always passed through
        `netns._get_netnspath()`. If both are given, the `netns`
        defaults win.

        :param spec: the short-form spec (not modified)
        :returns: a new dict with the full-form spec
        :raises RuntimeError: for a `netns` spec on a platform where
            the netns modules are not available
        '''
        ret = dict(spec)
        implied = {}
        if 'hostname' in spec:
            implied['kind'] = 'remote'
            implied['protocol'] = 'ssh'
            implied['target'] = spec['hostname']
        if 'netns' in spec:
            if NetNS is None:
                # the netns modules are imported on Linux only
                raise RuntimeError('network namespaces are not '
                                   'supported on %s' % sys.platform)
            implied['kind'] = 'netns'
            implied['target'] = spec['netns']
            ret['netns'] = netns._get_netnspath(spec['netns'])
        for key, value in implied.items():
            ret.setdefault(key, value)
        return ret

    def __repr__(self):
        # nl_prime is normally a class taken from vmap, but it may be
        # an already created netlink socket, or None for an
        # unsupported kind
        if isinstance(self.nl_prime, NetlinkMixin):
            name = self.nl_prime.__class__.__name__
        elif isinstance(self.nl_prime, type):
            name = self.nl_prime.__name__
        else:
            name = repr(self.nl_prime)
        return '[%s] <%s %s>' % (self.state.get(), name, self.nl_kwarg)

    @classmethod
    def nla2name(cls, name):
        # identity mapping: source field names are used as is
        return name

    @classmethod
    def name2nla(cls, name):
        # identity mapping: source field names are used as is
        return name

    def api(self, name, *argv, **kwarg):
        '''
        Call `self.nl.<name>(*argv, **kwarg)` with the source lock held.

        NetlinkError, AttributeError, ValueError, KeyError, TypeError,
        socket and struct errors are re-raised immediately. Any other
        exception is treated as transient -- probably the source is
        restarting -- and the call is retried up to `api_retries`
        times, pausing `api_retry_pause` seconds between attempts.

        :raises RuntimeError: if all the attempts fail
        '''
        for _ in range(self.api_retries):
            with self.lock:
                try:
                    return getattr(self.nl, name)(*argv, **kwarg)
                except (NetlinkError,
                        AttributeError,
                        ValueError,
                        KeyError,
                        TypeError,
                        socket.error,
                        struct.error):
                    raise
                except Exception as e:
                    # probably the source is restarting
                    self.log.debug('source api error: %s' % e)
            # pause with the lock released, so the receiver thread
            # can take it and reconnect the channel meanwhile
            time.sleep(self.api_retry_pause)
        raise RuntimeError('api call failed')

    def _connect(self):
        '''
        Create the RTNL channel, enqueue the initial dump and announce
        the source. Called by receiver() with `self.lock` held; any
        exception means the connection attempt failed.
        '''
        self.state.set('connecting')
        if not isinstance(self.nl_prime, type):
            raise TypeError('source channel not supported')
        spec = dict(self.nl_kwarg)
        if self.kind in ('nsmanager', ):
            spec['libc'] = self.ndb.libc
        self.nl = self.nl_prime(**spec)
        self.state.set('loading')
        #
        self.nl.bind(async_cache=True, clone_socket=True)
        #
        # Initial load -- enqueue the data; schema reads are disabled
        # while the target is flushed and the dump is enqueued
        #
        self.ndb.schema.allow_read(False)
        try:
            self.ndb.schema.flush(self.target)
            self.evq.put(self.nl.dump())
        finally:
            self.ndb.schema.allow_read(True)
        self.started.set()
        # NOTE(review): this also discards a shutdown request made by
        # close() while the channel was being connected -- confirm
        # that this is intended
        self.shutdown.clear()
        self.state.set('running')
        if self.event is not None:
            self.evq.put((cmsg_event(self.target, self.event), ))
        else:
            self.evq.put((cmsg_sstart(self.target), ))

    def _forward_events(self):
        '''
        Forward messages from the RTNL channel into the event queue
        until the channel fails or is reset.

        :returns: True if the receiver must stop, False if it should
            reconnect the channel
        '''
        while True:
            try:
                msg = tuple(self.nl.get())
            except Exception as e:
                self.log.error('source error: %s %s' % (type(e), e))
                # persistent sources reconnect, unless a shutdown is
                # in progress: then stop right away instead of going
                # back for self.lock, which restart() may be holding
                # while it joins this thread
                return not self.persistent or self.shutdown.is_set()

            code = 0
            if msg and msg[0]['header']['error']:
                code = msg[0]['header']['error'].code

            if code == errno.ECONNRESET:
                return True

            self.ndb.schema._allow_write.wait()
            try:
                self.evq.put(msg)
            except ShutdownException:
                return True

    def receiver(self):
        '''
        The source thread routine -- (re)connect the RTNL channel, get
        events from it and forward them into the common event queue.

        The routine exits when the channel returns a message with the
        ECONNRESET error code, on shutdown, on a ShutdownException
        from the event queue, or -- for non-persistent sources -- on
        the first failure.
        '''
        stop = False
        while not stop:
            with self.lock:
                if self.shutdown.is_set():
                    break

                # drop the previous channel, if any, before reconnecting
                if self.nl is not None:
                    try:
                        self.nl.close(code=0)
                    except Exception as e:
                        self.log.warning('source restart: %s' % e)
                try:
                    self._connect()
                except Exception as e:
                    # unblock restart() waiting for the startup result
                    self.started.set()
                    self.state.set('failed')
                    self.log.error('source error: %s %s' % (type(e), e))
                    try:
                        self.evq.put((cmsg_failed(self.target), ))
                    except ShutdownException:
                        stop = True
                        break
                    if not self.persistent:
                        if self.event is not None:
                            self.event.set()
                        return
                    self.log.debug('sleeping before restart')
                    self.shutdown.wait(SOURCE_FAIL_PAUSE)
                    if self.shutdown.is_set():
                        self.log.debug('source shutdown')
                        stop = True
                        break
                    continue

            stop = self._forward_events()

        # thus we make sure that all the events from
        # this source are consumed by the main loop
        # in __dbm__() routine
        try:
            self.sync()
            self.log.debug('flush DB for the target')
            self.ndb.schema.flush(self.target)
        except ShutdownException:
            self.log.debug('shutdown handled by the main thread')
        self.state.set('stopped')

    def sync(self):
        '''
        Put a marker event into the event queue and block until it is
        set -- presumably by the main loop once it reaches the marker,
        i.e. after all the events queued before it.
        '''
        self.log.debug('sync')
        sync = threading.Event()
        self.evq.put((cmsg_event(self.target, sync), ))
        sync.wait()

    def start(self):
        '''
        Start the receiver thread.

        :returns: self
        :raises RuntimeError: if the receiver thread is already running
        '''
        with self.lock:
            self.log.debug('starting the source')
            if self.th is not None and self.th.is_alive():
                raise RuntimeError('source is running')

            thread_name = 'NDB event source: %s' % (self.target)
            self.th = threading.Thread(target=self.receiver,
                                       name=thread_name)
            self.th.start()
            return self

    def close(self, code=errno.ECONNRESET, sync=True):
        '''
        Stop the source: set the shutdown flag and close the RTNL
        channel with `code`. A repeated call returns immediately.

        :param code: passed to `self.nl.close()`; the receiver stops on
            a message with the ECONNRESET error code, which is
            presumably how the default code reaches it
        :param sync: if true, join the receiver thread
        '''
        with self.shutdown_lock:
            if self.shutdown.is_set():
                self.log.debug('already stopped')
                return
            self.log.info('source shutdown')
            self.shutdown.set()
            if self.nl is not None:
                try:
                    self.nl.close(code=code)
                except Exception as e:
                    self.log.error('source close: %s' % e)
        if sync:
            if self.th is not None:
                self.th.join()
                self.th = None
            else:
                self.log.debug('receiver thread missing')

    def restart(self, reason='unknown'):
        '''
        Stop the source and start it again, then wait until the new
        receiver has either connected or failed. Schema reads are
        disabled while the source is being stopped and started.

        :param reason: used only in the debug log
        '''
        with self.lock:
            with self.shutdown_lock:
                self.log.debug('restarting the source, reason <%s>' % (reason))
                self.started.clear()
                self.ndb.schema.allow_read(False)
                try:
                    self.close()
                    if self.th:
                        self.th.join()
                    self.shutdown.clear()
                    self.start()
                finally:
                    self.ndb.schema.allow_read(True)
        self.started.wait()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # close the source; exceptions are not suppressed
        self.close()

    def load_sql(self):
        '''
        Fill the dict part of the object from the `sources` and
        `sources_options` tables, converting options tagged 'int'
        back to integers.
        '''
        spec = self.ndb.schema.fetchone('''
                                        SELECT * FROM sources
                                        WHERE f_target = %s
                                        ''' % self.ndb.schema.plch,
                                        (self.target, ))
        self['target'], self['kind'] = spec
        for spec in self.ndb.schema.fetch('''
                                          SELECT * FROM sources_options
                                          WHERE f_target = %s
                                          ''' % self.ndb.schema.plch,
                                          (self.target, )):
            f_target, f_name, f_type, f_value = spec
            self[f_name] = int(f_value) if f_type == 'int' else f_value