main.py 31 KB
Newer Older
1
'''
2 3
Quick start
-----------
4

5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
The goal of NDB is to provide an easy access to RTNL info and entities via
Python objects, like `pyroute2.ndb.objects.interface` (see also:
:ref:`ndbinterfaces`), `pyroute2.ndb.objects.route` (see also:
:ref:`ndbroutes`) etc. These objects do not
only reflect the system state for the time of their instantiation, but
continuously monitor the system for relevant updates. The monitoring is
done via netlink notifications, thus no polling. Also the objects allow
to apply changes back to the system and rollback the changes.

On the other hand it's too expensive to create Python objects for all the
available RTNL entities, e.g. when there are hundreds of interfaces and
thousands of routes. Thus NDB creates objects only upon request, when
the user calls `.create()` to create new objects or runs
`ndb.<view>[selector]` (e.g. `ndb.interfaces['eth0']`) to access an
existing object.

To list existing RTNL entities NDB uses objects of the class `RecordSet`
that `yield` individual `Record` objects for every entity (see also:
:ref:`ndbreports`). An object of the `Record` class is immutable, doesn't
monitor any updates, doesn't contain any links to other objects and essentially
behaves like a simple named tuple.

.. aafig::
    :scale: 80
    :textual:


      +---------------------+
      |                     |
34
      |                     |
35 36
      | `NDB() instance`    |
      |                     |
37
      |                     |
38 39 40 41 42
      +---------------------+
                 |
                 |
        +-------------------+
      +-------------------+ |
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
    +-------------------+ | |-----------+--------------------------+
    |                   | | |           |                          |
    |                   | | |           |                          |
    | `View()`          | | |           |                          |
    |                   | |-+           |                          |
    |                   |-+             |                          |
    +-------------------+               |                          |
                               +------------------+       +------------------+
                               |                  |       |                  |
                               |                  |       |                  |
                               | `.dump()`        |       | `.create()`      |
                               | `.summary()`     |       | `.__getitem__()` |
                               |                  |       |                  |
                               |                  |       |                  |
                               +------------------+       +------------------+
                                        |                           |
59 60
                                        |                           |
                                        v                           v
61 62 63 64 65 66 67 68 69 70
                              +-------------------+        +------------------+
                              |                   |      +------------------+ |
                              |                   |    +------------------+ | |
                              | `RecordSet()`     |    | `Interface()`    | | |
                              |                   |    | `Address()`      | | |
                              |                   |    | `Route()`        | | |
                              +-------------------+    | `Neighbour()`    | | |
                                        |              | `Rule()`         | |-+
                                        |              |  ...             |-+
                                        v              +------------------+
71 72 73
                                +-------------------+
                              +-------------------+ |
                            +-------------------+ | |
74 75 76 77 78 79 80 81 82 83 84 85
                            | `filter()`        | | |
                            | `select()`        | | |
                            | `transform()`     | | |
                            | `join()`          | |-+
                            |  ...              |-+
                            +-------------------+
                                        |
                                        v
                                +-------------------+
                              +-------------------+ |
                            +-------------------+ | |
                            |                   | | |
86
                            |                   | | |
87 88
                            | `Record()`        | | |
                            |                   | |-+
89 90 91 92 93 94 95 96 97 98 99 100
                            |                   |-+
                            +-------------------+

Here are some simple NDB usage examples. More info see in the reference
documentation below.

Print all the interface names on the system, assume we have an NDB
instance `ndb`::

    for interface in ndb.interfaces.dump():
        print(interface.ifname)

101
Print the routing information in the CSV format::
102

103 104
    for line in ndb.routes.summary().format('csv'):
        print(record)
105

Peter V. Saveliev's avatar
Peter V. Saveliev committed
106 107 108 109
.. note:: More on report filtering and formatting: :ref:`ndbreports`
.. note:: Since 0.5.11; versions 0.5.10 and earlier used
          syntax `summary(format='csv', match={...})`

110
Print IP addresses of interfaces in several network namespaces as::
111 112 113 114

    nslist = ['netns01',
              'netns02',
              'netns03']
Peter V. Saveliev's avatar
Peter V. Saveliev committed
115

116 117 118 119 120
    for nsname in nslist:
        ndb.sources.add(netns=nsname)

    for line in ndb.addresses.summary().format('json'):
        print(line)
121 122

Add an IP address on an interface::
Peter V. Saveliev's avatar
Peter V. Saveliev committed
123

124 125 126 127 128
    (ndb
     .interfaces['eth0']
     .add_ip('10.0.0.1/24')
     .commit())
    # ---> <---  NDB waits until the address actually
Peter V. Saveliev's avatar
Peter V. Saveliev committed
129

130
Change an interface property::
Peter V. Saveliev's avatar
Peter V. Saveliev committed
131

132 133 134 135 136 137 138 139 140 141 142 143 144
    (ndb
     .interfaces['eth0']
     .set('state', 'up')
     .set('address', '00:11:22:33:44:55')
     .commit()
    # ---> <---  NDB waits here for the changes to be applied

    # ... or with another syntax
    with ndb.interfaces['eth0'] as i:
        i['state'] = 'up'
        i['address'] = '00:11:22:33:44:55'
    # ---> <---  the commit() is called authomatically by
    #            the context manager's __exit__()
145

146
'''
147
import gc
Peter V. Saveliev's avatar
Peter V. Saveliev committed
148
import sys
Peter V. Saveliev's avatar
Peter V. Saveliev committed
149
import json
150
import time
151
import errno
152
import atexit
Peter V. Saveliev's avatar
Peter V. Saveliev committed
153
import sqlite3
Peter V. Saveliev's avatar
Peter V. Saveliev committed
154
import logging
155
import logging.handlers
Peter V. Saveliev's avatar
Peter V. Saveliev committed
156
import threading
157
import traceback
Peter V. Saveliev's avatar
Peter V. Saveliev committed
158 159
import ctypes
import ctypes.util
160
from functools import partial
161
from collections import OrderedDict
162
from pyroute2 import config
163
from pyroute2 import cli
Peter V. Saveliev's avatar
Peter V. Saveliev committed
164
from pyroute2.common import basestring
Peter V. Saveliev's avatar
Peter V. Saveliev committed
165
from pyroute2.ndb import schema
166
from pyroute2.ndb.events import (DBMExitException,
167
                                 ShutdownException,
168 169
                                 InvalidateHandlerException,
                                 RescheduleException)
170 171 172 173
from pyroute2.ndb.messages import (cmsg,
                                   cmsg_event,
                                   cmsg_failed,
                                   cmsg_sstart)
174
from pyroute2.ndb.source import Source
175 176
from pyroute2.ndb.auth_manager import check_auth
from pyroute2.ndb.auth_manager import AuthManager
Peter V. Saveliev's avatar
Peter V. Saveliev committed
177
from pyroute2.ndb.objects.interface import Interface
178
from pyroute2.ndb.objects.interface import Vlan
Peter V. Saveliev's avatar
Peter V. Saveliev committed
179 180 181
from pyroute2.ndb.objects.address import Address
from pyroute2.ndb.objects.route import Route
from pyroute2.ndb.objects.neighbour import Neighbour
182
from pyroute2.ndb.objects.rule import Rule
183
from pyroute2.ndb.objects.netns import NetNS
184
# from pyroute2.ndb.query import Query
185
from pyroute2.ndb.report import (RecordSet,
Peter V. Saveliev's avatar
Peter V. Saveliev committed
186
                                 Record)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
187 188 189 190 191
try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

Peter V. Saveliev's avatar
Peter V. Saveliev committed
192 193 194
try:
    import queue
except ImportError:
Peter V. Saveliev's avatar
Peter V. Saveliev committed
195
    import Queue as queue
Peter V. Saveliev's avatar
Peter V. Saveliev committed
196

197 198 199 200
try:
    import psycopg2
except ImportError:
    psycopg2 = None
Peter V. Saveliev's avatar
Peter V. Saveliev committed
201

202
log = logging.getLogger(__name__)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
203 204


Peter V. Saveliev's avatar
Peter V. Saveliev committed
205
def target_adapter(value):
206 207 208
    #
    # MPLS target adapter for SQLite3
    #
Peter V. Saveliev's avatar
Peter V. Saveliev committed
209 210 211
    return json.dumps(value)


212 213 214 215 216 217 218 219 220
class PostgreSQLAdapter(object):

    def __init__(self, obj):
        self.obj = obj

    def getquoted(self):
        return "'%s'" % json.dumps(self.obj)


Peter V. Saveliev's avatar
Peter V. Saveliev committed
221
sqlite3.register_adapter(list, target_adapter)
222 223 224 225
sqlite3.register_adapter(dict, target_adapter)
if psycopg2 is not None:
    psycopg2.extensions.register_adapter(list, PostgreSQLAdapter)
    psycopg2.extensions.register_adapter(dict, PostgreSQLAdapter)
226 227


228
class View(dict):
229
    '''
230
    The View() object returns RTNL objects on demand::
231

232 233 234 235 236
        ifobj1 = ndb.interfaces['eth0']
        ifobj2 = ndb.interfaces['eth0']
        # ifobj1 != ifobj2
    '''

Peter V. Saveliev's avatar
Peter V. Saveliev committed
237 238 239
    def __init__(self,
                 ndb,
                 table,
240
                 chain=None,
241 242
                 default_target='localhost',
                 auth_managers=None):
243
        self.ndb = ndb
244
        self.log = ndb.log.channel('view.%s' % table)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
245
        self.table = table
246
        self.event = table  # FIXME
Peter V. Saveliev's avatar
Peter V. Saveliev committed
247
        self.chain = chain
248
        self.cache = {}
249 250 251 252
        if auth_managers is None:
            auth_managers = []
        if chain:
            auth_managers += chain.auth_managers
253
        self.default_target = default_target
254
        self.auth_managers = auth_managers
Peter V. Saveliev's avatar
Peter V. Saveliev committed
255
        self.constraints = {}
256 257 258 259 260
        self.classes = OrderedDict()
        self.classes['interfaces'] = Interface
        self.classes['addresses'] = Address
        self.classes['neighbours'] = Neighbour
        self.classes['routes'] = Route
261
        self.classes['rules'] = Rule
262
        self.classes['netns'] = NetNS
263
        self.classes['vlans'] = Vlan
264

265 266 267 268 269 270
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

271 272 273 274 275 276 277
    @property
    def context(self):
        if self.chain is not None:
            return self.chain.context
        else:
            return {}

278 279 280 281 282 283 284 285
    def getmany(self, spec, table=None):
        return self.ndb.schema.get(table or self.table, spec)

    def getone(self, spec, table=None):
        for obj in self.getmany(spec, table):
            return obj

    def get(self, spec, table=None):
286 287 288 289
        try:
            return self.__getitem__(spec, table)
        except KeyError:
            return None
Peter V. Saveliev's avatar
Peter V. Saveliev committed
290

291
    @cli.change_pointer
292
    def create(self, *argspec, **kwspec):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
293 294 295 296
        if self.chain:
            context = self.chain.context
        else:
            context = {}
297 298
        spec = (self
                .classes[self.table]
Peter V. Saveliev's avatar
Peter V. Saveliev committed
299
                .adjust_spec(kwspec or argspec[0], context))
Peter V. Saveliev's avatar
Peter V. Saveliev committed
300 301
        if self.chain:
            spec['ndb_chain'] = self.chain
Peter V. Saveliev's avatar
Peter V. Saveliev committed
302 303 304
        spec['create'] = True
        return self[spec]

305 306 307 308 309 310 311 312 313 314 315 316 317
    @cli.change_pointer
    def add(self, *argspec, **kwspec):
        self.log.warning('''\n
        The name add() will be removed in future releases, use create()
        instead. If you believe that the idea to rename is wrong, please
        file your opinion to the project's bugtracker.

        The reason behind the rename is not to confuse interfaces.add() with
        bridge and bond port operations, that don't create any new interfaces
        but work on existing ones.
        ''')
        return self.create(*argspec, **kwspec)

318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
    def wait(self, **spec):
        ret = None

        # install a limited events queue -- for a possible immediate reaction
        evq = queue.Queue(maxsize=100)

        def handler(evq, target, event):
            # ignore the "queue full" exception
            #
            # if we miss some events here, nothing bad happens: we just
            # load them from the DB after a timeout, falling back to
            # the DB polling
            #
            # the most important here is not to allocate too much memory
            try:
                evq.put_nowait((target, event))
            except queue.Full:
                pass
        #
        hdl = partial(handler, evq)
        (self
         .ndb
         .register_handler(self
                           .ndb
                           .schema
                           .classes[self.event], hdl))
        #
        try:
            ret = self.__getitem__(spec)
347 348 349 350
            for key in spec:
                if ret[key] != spec[key]:
                    ret = None
                    break
351
        except KeyError:
352
            ret = None
353 354 355 356 357 358 359

        while ret is None:
            try:
                target, msg = evq.get(timeout=1)
            except queue.Empty:
                try:
                    ret = self.__getitem__(spec)
360 361 362 363
                    for key in spec:
                        if ret[key] != spec[key]:
                            ret = None
                            raise KeyError()
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
                    break
                except KeyError:
                    continue

            #
            for key, value in spec.items():
                if key == 'target' and value != target:
                    break
                elif value not in (msg.get(key),
                                   msg.get_attr(msg.name2nla(key))):
                    break
            else:
                while ret is None:
                    try:
                        ret = self.__getitem__(spec)
                    except KeyError:
                        time.sleep(0.1)

        #
        (self
         .ndb
         .unregister_handler(self
                             .ndb
                             .schema
                             .classes[self.event], hdl))

        del evq
        del hdl
        gc.collect()
        return ret

395
    @check_auth('obj:read')
Peter V. Saveliev's avatar
Peter V. Saveliev committed
396
    def __getitem__(self, key, table=None):
397

Peter V. Saveliev's avatar
Peter V. Saveliev committed
398 399
        if self.chain:
            context = self.chain.context
Peter V. Saveliev's avatar
Peter V. Saveliev committed
400
        else:
Peter V. Saveliev's avatar
Peter V. Saveliev committed
401 402 403 404 405
            context = {}
        iclass = self.classes[table or self.table]
        if isinstance(key, Record):
            key = key._as_dict()
        key = iclass.adjust_spec(key, context)
406 407 408 409 410
        ret = iclass(self,
                     key,
                     load=False,
                     master=self.chain,
                     auth_managers=self.auth_managers)
411 412 413 414 415 416

        # rtnl_object.key() returns a dcitionary that can not
        # be used as a cache key. Create here a tuple from it.
        # The key order guaranteed by the dictionary.
        cache_key = tuple(ret.key.items())

417 418
        rtime = time.time()

419
        # Iterate all the cache to remove unused and clean
420 421
        # (without any started transaction) objects.
        for ckey in tuple(self.cache):
422
            # Skip the current cache_key to avoid extra
423 424 425
            # cache del/add records in the logs
            if ckey == cache_key:
                continue
426
            # The number of referrers must be > 1, the first
427 428
            # one is the cache itself
            rcount = len(gc.get_referrers(self.cache[ckey]))
429 430
            # Remove only expired items
            expired = (rtime - self.cache[ckey].atime) > config.cache_expire
431
            # The number of changed rtnl_object fields must
432
            # be 0 which means that no transaction is started
433
            if rcount == 1 and self.cache[ckey].clean and expired:
434
                self.log.debug('cache del %s' % (ckey, ))
435
                self.cache.pop(ckey, None)
436

437 438 439 440 441 442 443 444 445 446 447 448
        if cache_key in self.cache:
            self.log.debug('cache hit %s' % (cache_key, ))
            # Explicitly get rid of the created object
            del ret
            # The object from the cache has already
            # registered callbacks, simply return it
            ret = self.cache[cache_key]
            ret.atime = rtime
            return ret
        else:
            # Cache only existing objects
            if ret.load_sql():
449 450 451
                self.log.debug('cache add %s' % (cache_key, ))
                self.cache[cache_key] = ret

452
        ret.register()
453
        return ret
454 455 456 457 458 459 460

    def __setitem__(self, key, value):
        raise NotImplementedError()

    def __delitem__(self, key):
        raise NotImplementedError()

461 462 463
    def __iter__(self):
        return self.keys()

464
    @check_auth('obj:list')
465
    def keys(self):
466 467 468
        for record in self.dump():
            yield record

469
    @check_auth('obj:list')
470 471 472
    def values(self):
        for key in self.keys():
            yield self[key]
473

474
    @check_auth('obj:list')
475
    def items(self):
476 477
        for key in self.keys():
            yield (key, self[key])
478

Peter V. Saveliev's avatar
Peter V. Saveliev committed
479
    @cli.show_result
480
    def count(self):
481 482 483 484
        return (self
                .ndb
                .schema
                .fetchone('SELECT count(*) FROM %s' % self.table))[0]
485 486 487 488

    def __len__(self):
        return self.count()

489
    def _keys(self, iclass):
490 491
        return (['target', 'tflags'] +
                self.ndb.schema.compiled[iclass.view or iclass.table]['names'])
492

Peter V. Saveliev's avatar
Peter V. Saveliev committed
493
    def _native(self, dump):
494 495 496
        fnames = next(dump)
        for record in dump:
            yield Record(fnames, record)
497

Peter V. Saveliev's avatar
Peter V. Saveliev committed
498
    @cli.show_result
499
    @check_auth('obj:list')
Peter V. Saveliev's avatar
Peter V. Saveliev committed
500 501
    def dump(self):
        iclass = self.classes[self.table]
502
        return RecordSet(self._native(iclass.dump(self)))
Peter V. Saveliev's avatar
Peter V. Saveliev committed
503

Peter V. Saveliev's avatar
Peter V. Saveliev committed
504
    @cli.show_result
505
    @check_auth('obj:list')
506
    def summary(self):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
507
        iclass = self.classes[self.table]
508
        return RecordSet(self._native(iclass.summary(self)))
509 510 511 512 513 514 515 516


class SourcesView(View):

    def __init__(self, ndb):
        super(SourcesView, self).__init__(ndb, 'sources')
        self.classes['sources'] = Source
        self.cache = {}
517 518 519 520 521 522
        self.lock = threading.Lock()

    def async_add(self, **spec):
        spec = dict(Source.defaults(spec))
        self.cache[spec['target']] = Source(self.ndb, **spec).start()
        return self.cache[spec['target']]
523

Peter V. Saveliev's avatar
Peter V. Saveliev committed
524
    def add(self, **spec):
525
        spec = dict(Source.defaults(spec))
526 527 528 529 530
        if 'event' not in spec:
            sync = True
            spec['event'] = threading.Event()
        else:
            sync = False
531
        self.cache[spec['target']] = Source(self.ndb, **spec).start()
532 533
        if sync:
            self.cache[spec['target']].event.wait()
534 535
        return self.cache[spec['target']]

536 537 538 539 540 541
    def remove(self, target, code=errno.ECONNRESET, sync=True):
        with self.lock:
            if target in self.cache:
                source = self.cache[target]
                source.close(code=code, sync=sync)
                return self.cache.pop(target)
542

543 544 545 546
    def keys(self):
        for key in self.cache:
            yield key

547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565
    def _keys(self, iclass):
        return ['target', 'kind']

    def wait(self, **spec):
        raise NotImplementedError()

    def _summary(self, *argv, **kwarg):
        return self._dump(*argv, **kwarg)

    def __getitem__(self, key, table=None):
        if isinstance(key, basestring):
            target = key
        elif isinstance(key, dict) and 'target' in key.keys():
            target = key['target']
        else:
            raise ValueError('key format not supported')

        return self.cache[target]

566

567
class Log(object):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
568

Peter V. Saveliev's avatar
Peter V. Saveliev committed
569
    def __init__(self, log_id=None):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
570
        self.logger = None
Peter V. Saveliev's avatar
Peter V. Saveliev committed
571 572 573
        self.state = False
        self.log_id = log_id or id(self)
        self.logger = logging.getLogger('pyroute2.ndb.%s' % self.log_id)
574
        self.main = self.channel('main')
Peter V. Saveliev's avatar
Peter V. Saveliev committed
575

576
    def __call__(self, target=None, level=logging.INFO):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
577 578 579
        if target is None:
            return self.logger is not None

Peter V. Saveliev's avatar
Peter V. Saveliev committed
580 581 582 583 584 585 586 587
        if self.logger is not None:
            for handler in tuple(self.logger.handlers):
                self.logger.removeHandler(handler)

        if target in ('off', False):
            if self.state:
                self.logger.setLevel(0)
                self.logger.addHandler(logging.NullHandler())
Peter V. Saveliev's avatar
Peter V. Saveliev committed
588 589
            return

590
        if target in ('on', 'stderr'):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
591
            handler = logging.StreamHandler()
592 593 594
        elif target == 'debug':
            handler = logging.StreamHandler()
            level = logging.DEBUG
Peter V. Saveliev's avatar
Peter V. Saveliev committed
595 596 597 598 599
        elif isinstance(target, basestring):
            url = urlparse(target)
            if not url.scheme and url.path:
                handler = logging.FileHandler(url.path)
            elif url.scheme == 'syslog':
Peter V. Saveliev's avatar
Peter V. Saveliev committed
600 601 602
                handler = (logging
                           .handlers
                           .SysLogHandler(address=url.netloc.split(':')))
Peter V. Saveliev's avatar
Peter V. Saveliev committed
603 604 605 606 607
            else:
                raise ValueError('logging scheme not supported')
        else:
            handler = target

608 609 610 611 612 613
        # set formatting only for new created logging handlers
        if handler is not target:
            fmt = '%(asctime)s %(levelname)8s %(name)s: %(message)s'
            formatter = logging.Formatter(fmt)
            handler.setFormatter(formatter)

Peter V. Saveliev's avatar
Peter V. Saveliev committed
614
        self.logger.addHandler(handler)
615
        self.logger.setLevel(level)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
616 617 618 619 620 621 622 623 624

    @property
    def on(self):
        self.__call__(target='on')

    @property
    def off(self):
        self.__call__(target='off')

Peter V. Saveliev's avatar
Peter V. Saveliev committed
625 626 627
    def channel(self, name):
        return logging.getLogger('pyroute2.ndb.%s.%s' % (self.log_id, name))

628 629 630 631 632 633 634 635 636 637 638 639 640 641
    def debug(self, *argv, **kwarg):
        return self.main.debug(*argv, **kwarg)

    def info(self, *argv, **kwarg):
        return self.main.info(*argv, **kwarg)

    def warning(self, *argv, **kwarg):
        return self.main.warning(*argv, **kwarg)

    def error(self, *argv, **kwarg):
        return self.main.error(*argv, **kwarg)

    def critical(self, *argv, **kwarg):
        return self.main.critical(*argv, **kwarg)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
642

643 644 645 646 647 648 649 650 651 652 653 654 655 656

class ReadOnly(object):

    def __init__(self, ndb):
        self.ndb = ndb

    def __enter__(self):
        self.ndb.schema.allow_write(False)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.ndb.schema.allow_write(True)


657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683
class DeadEnd(object):

    def put(self, *argv, **kwarg):
        raise ShutdownException('shutdown in progress')


class EventQueue(object):

    def __init__(self, *argv, **kwarg):
        self._bypass = self._queue = queue.Queue(*argv, **kwarg)

    def put(self, *argv, **kwarg):
        return self._queue.put(*argv, **kwarg)

    def shutdown(self):
        self._queue = DeadEnd()

    def bypass(self, *argv, **kwarg):
        return self._bypass.put(*argv, **kwarg)

    def get(self, *argv, **kwarg):
        return self._bypass.get(*argv, **kwarg)

    def qsize(self):
        return self._bypass.qsize()


684 685 686 687 688 689 690
def Events(*argv):
    for sequence in argv:
        if sequence is not None:
            for item in sequence:
                yield item


691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710
class AuthProxy(object):

    def __init__(self, ndb, auth_managers):
        self._ndb = ndb
        self._auth_managers = auth_managers

        for spec in (('interfaces', 'localhost'),
                     ('addresses', 'localhost'),
                     ('routes', 'localhost'),
                     ('neighbours', 'localhost'),
                     ('rules', 'localhost'),
                     ('netns', 'nsmanager'),
                     ('vlans', 'localhost')):
            view = View(self._ndb,
                        spec[0],
                        default_target=spec[1],
                        auth_managers=self._auth_managers)
            setattr(self, spec[0], view)


Peter V. Saveliev's avatar
Peter V. Saveliev committed
711 712
class NDB(object):

Peter V. Saveliev's avatar
Peter V. Saveliev committed
713
    def __init__(self,
Peter V. Saveliev's avatar
Peter V. Saveliev committed
714
                 sources=None,
Peter V. Saveliev's avatar
Peter V. Saveliev committed
715 716
                 db_provider='sqlite3',
                 db_spec=':memory:',
717
                 rtnl_debug=False,
718
                 log=False,
Peter V. Saveliev's avatar
Peter V. Saveliev committed
719 720
                 auto_netns=False,
                 libc=None):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
721

722
        self.ctime = self.gctime = time.time()
723
        self.schema = None
724
        self.config = {}
Peter V. Saveliev's avatar
Peter V. Saveliev committed
725 726
        self.libc = libc or ctypes.CDLL(ctypes.util.find_library('c'),
                                        use_errno=True)
727
        self.log = Log(log_id=id(self))
728
        self.readonly = ReadOnly(self)
729
        self._auto_netns = auto_netns
730
        self._db = None
Peter V. Saveliev's avatar
Peter V. Saveliev committed
731
        self._dbm_thread = None
Peter V. Saveliev's avatar
Peter V. Saveliev committed
732
        self._dbm_ready = threading.Event()
Peter V. Saveliev's avatar
Peter V. Saveliev committed
733
        self._dbm_shutdown = threading.Event()
Peter V. Saveliev's avatar
Peter V. Saveliev committed
734
        self._global_lock = threading.Lock()
735
        self._event_map = None
736
        self._event_queue = EventQueue(maxsize=100)
737
        #
738
        if log:
739 740 741 742 743 744 745 746
            if isinstance(log, basestring):
                self.log(log)
            elif isinstance(log, (tuple, list)):
                self.log(*log)
            elif isinstance(log, dict):
                self.log(**log)
            else:
                raise TypeError('wrong log spec format')
Peter V. Saveliev's avatar
Peter V. Saveliev committed
747
        #
748
        # fix sources prime
Peter V. Saveliev's avatar
Peter V. Saveliev committed
749
        if sources is None:
750 751
            sources = [{'target': 'localhost',
                        'kind': 'local',
Peter V. Saveliev's avatar
Peter V. Saveliev committed
752 753
                        'nlm_generator': 1}]
            if sys.platform.startswith('linux'):
754
                sources.append({'target': 'nsmanager',
Peter V. Saveliev's avatar
Peter V. Saveliev committed
755
                                'kind': 'nsmanager'})
756 757 758 759 760
        elif not isinstance(sources, (list, tuple)):
            raise ValueError('sources format not supported')

        self.sources = SourcesView(self)
        self._nl = sources
761 762
        self._db_provider = db_provider
        self._db_spec = db_spec
763
        self._db_rtnl_log = rtnl_debug
764
        atexit.register(self.close)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
765
        self._dbm_ready.clear()
766
        self._dbm_autoload = set()
Peter V. Saveliev's avatar
Peter V. Saveliev committed
767 768
        self._dbm_thread = threading.Thread(target=self.__dbm__,
                                            name='NDB main loop')
769
        self._dbm_thread.setDaemon(True)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
770
        self._dbm_thread.start()
771
        self._dbm_ready.wait()
772 773 774
        for event in tuple(self._dbm_autoload):
            event.wait()
        self._dbm_autoload = None
775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791
        am = AuthManager({'obj:list': True,
                          'obj:read': True,
                          'obj:modify': True},
                         self.log.channel('auth'))
        for spec in (('interfaces', 'localhost'),
                     ('addresses', 'localhost'),
                     ('routes', 'localhost'),
                     ('neighbours', 'localhost'),
                     ('rules', 'localhost'),
                     ('netns', 'nsmanager'),
                     ('vlans', 'localhost')):
            view = View(self,
                        spec[0],
                        default_target=spec[1],
                        auth_managers=[am])
            setattr(self, spec[0], view)
        # self.query = Query(self.schema)
792

Peter V. Saveliev's avatar
Peter V. Saveliev committed
793 794
    def _get_view(self, name, chain=None):
        return View(self, name, chain)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
795

Peter V. Saveliev's avatar
Peter V. Saveliev committed
796 797 798 799 800 801
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

802 803 804
    def auth_proxy(self, auth_manager):
        return AuthProxy(self, [auth_manager, ])

805 806 807 808
    def register_handler(self, event, handler):
        if event not in self._event_map:
            self._event_map[event] = []
        self._event_map[event].append(handler)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
809

Peter V. Saveliev's avatar
Peter V. Saveliev committed
810 811 812
    def unregister_handler(self, event, handler):
        self._event_map[event].remove(handler)

813
    def execute(self, *argv, **kwarg):
814
        return self.schema.execute(*argv, **kwarg)
815

816
    def close(self):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
817
        with self._global_lock:
Peter V. Saveliev's avatar
Peter V. Saveliev committed
818 819 820 821
            if self._dbm_shutdown.is_set():
                return
            else:
                self._dbm_shutdown.set()
822 823 824 825 826 827 828
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self.close)
            else:
                try:
                    atexit._exithandlers.remove((self.close, (), {}))
                except ValueError:
                    pass
829 830
            # shutdown the _dbm_thread
            self._event_queue.shutdown()
831
            self._event_queue.bypass((cmsg(None, ShutdownException()), ))
832
            self._dbm_thread.join()
833

Peter V. Saveliev's avatar
Peter V. Saveliev committed
834
    def __dbm__(self):
Peter V. Saveliev's avatar
Peter V. Saveliev committed
835

836
        def default_handler(target, event):
837 838
            if isinstance(getattr(event, 'payload', None), Exception):
                raise event.payload
Peter V. Saveliev's avatar
Peter V. Saveliev committed
839
            log.warning('unsupported event ignored: %s' % type(event))
840

Peter V. Saveliev's avatar
Peter V. Saveliev committed
841 842 843 844 845 846
        def check_sources_started(self, _locals, target, event):
            _locals['countdown'] -= 1
            if _locals['countdown'] == 0:
                self._dbm_ready.set()

        _locals = {'countdown': len(self._nl)}
847

Peter V. Saveliev's avatar
Peter V. Saveliev committed
848
        # init the events map
849 850 851 852 853 854
        event_map = {cmsg_event: [lambda t, x: x.payload.set()],
                     cmsg_failed: [lambda t, x: (self
                                                 .schema
                                                 .mark(t, 1))],
                     cmsg_sstart: [partial(check_sources_started,
                                           self, _locals)]}
Peter V. Saveliev's avatar
Peter V. Saveliev committed
855 856
        self._event_map = event_map

Peter V. Saveliev's avatar
Peter V. Saveliev committed
857
        event_queue = self._event_queue
Peter V. Saveliev's avatar
Peter V. Saveliev committed
858

859 860 861 862 863
        if self._db_provider == 'sqlite3':
            self._db = sqlite3.connect(self._db_spec)
        elif self._db_provider == 'psycopg2':
            self._db = psycopg2.connect(**self._db_spec)

Peter V. Saveliev's avatar
Peter V. Saveliev committed
864 865 866 867 868
        self.schema = schema.init(self,
                                  self._db,
                                  self._db_provider,
                                  self._db_rtnl_log,
                                  id(threading.current_thread()))
869

870
        for spec in self._nl:
871
            spec['event'] = None
Peter V. Saveliev's avatar
Peter V. Saveliev committed
872
            self.sources.add(**spec)
873

874 875 876
        for (event, handlers) in self.schema.event_map.items():
            for handler in handlers:
                self.register_handler(event, handler)
Peter V. Saveliev's avatar
Peter V. Saveliev committed
877

878
        stop = False
879
        reschedule = []
880
        while not stop:
881 882
            events = Events(event_queue.get(), reschedule)
            reschedule = []
883 884 885 886 887 888
            try:
                for event in events:
                    handlers = event_map.get(event.__class__,
                                             [default_handler, ])
                    for handler in tuple(handlers):
                        try:
889
                            target = event['header']['target']
890
                            handler(target, event)
891 892 893 894 895 896 897 898 899
                        except RescheduleException:
                            if 'rcounter' not in event['header']:
                                event['header']['rcounter'] = 0
                            if event['header']['rcounter'] < 3:
                                event['header']['rcounter'] += 1
                                self.log.debug('reschedule %s' % (event, ))
                                reschedule.append(event)
                            else:
                                self.log.error('drop %s' % (event, ))
900
                        except InvalidateHandlerException:
901
                            try:
902
                                handlers.remove(handler)
903
                            except:
904 905 906
                                self.log.error('could not invalidate '
                                               'event handler:\n%s'
                                               % traceback.format_exc())
907
                        except ShutdownException:
908 909
                            stop = True
                            break
910 911 912
                        except DBMExitException:
                            return
                        except:
913 914
                            self.log.error('could not load event:\n%s\n%s'
                                           % (event, traceback.format_exc()))
915 916 917 918 919
                    if time.time() - self.gctime > config.gc_timeout:
                        self.gctime = time.time()
            except Exception as e:
                self.log.error('exception <%s> in source %s' % (e, target))
                # restart the target
920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936
                try:
                    self.sources[target].restart(reason=e)
                except KeyError:
                    pass

        # release all the sources
        for target in tuple(self.sources.cache):
            source = self.sources.remove(target, sync=False)
            if source is not None and source.th is not None:
                source.shutdown.set()
                source.th.join()
                self.log.debug('flush DB for the target %s' % target)
                self.schema.flush(target)

        # close the database
        self.schema.commit()
        self.schema.close()