summaryrefslogtreecommitdiff
path: root/ncserver/base/modman.py
blob: 742a890f1f254f475af92f9157885d3cfc3bd1f6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""
NETCONF for APK Distributions server:
    Module Management System

Copyright © 2020 Adélie Software in the Public Benefit, Inc.

Released under the terms of the NCSA license.  See the LICENSE file included
with this source distribution for more information.

SPDX-License-Identifier: NCSA
"""

import hashlib

from logging import getLogger
from lxml.etree import QName  # pylint: disable=E0611
from netconf import error, nsmap_add, util
from taillight import Signal

from ncserver.base.util import _


# ModuleManager.load_module invokes this as ``.call(name, mod)`` right after a
# module has been registered, passing the import name and the module object.
MODULE_SET_CHANGE_SIGNAL = Signal(('base/ModuleManager', 'module_set_changed'))
"""Signal fired when the module set changes.  This implies the
yang-library-change notification should be fired, when this is supported."""


def _import_compatible(existing, improps):
    """Determine if an imported module is compatible with the existing
    module that has the same name/revision.

    :param existing:
    The dictionary of properties for the existing module.

    :param improps:
    The dictionary of properties for the candidate module.

    :returns bool:
    True if compatible, False otherwise.
    """
    return (improps.keys() == existing.keys() and
            all([existing[key] == improps[key] for key in existing.keys()]))


def _gen_modnode(parent, module):
    """Build a yanglib:module element describing an implemented module.

    The element is laid out in the NMDA format.  With some massaging, it
    could be used for the deprecated non-NMDA format.

    :param parent:
    The parent XML node; the new element is created beneath it.

    :param module:
    The module.  Its M_NAME, M_REVISION and M_NS attributes are read, plus
    M_FEATURES when that attribute is present and not None.

    :returns:
    The generated module node.
    """
    node = util.subelm(parent, 'yanglib:module')

    # Leaf tag -> module attribute, in the order the leaves are emitted.
    leaves = (
        ('yanglib:name', 'M_NAME'),
        ('yanglib:revision', 'M_REVISION'),
        ('yanglib:namespace', 'M_NS'),
    )
    for tag, attr in leaves:
        node.append(util.leaf_elm(tag, getattr(module, attr)))

    # Features are optional; one leaf is emitted per advertised feature.
    features = getattr(module, 'M_FEATURES', None)
    if features is not None:
        for feature in features:
            node.append(util.leaf_elm('yanglib:feature', feature))

    return node


def _gen_modnode_import(parent, nodename, imname, improps):
    """Build an element describing an import-only module.

    :param parent:
    The parent XML node; the new element is created beneath it.

    :param nodename:
    The name of the XML node to create.  For NMDA library contents, use
    'yanglib:import-only-module'; otherwise, use 'yanglib:module'.

    :param imname:
    The name of the imported module in M_IMPORTS syntax, i.e.
    'name@revision'.  It must contain exactly one '@'.

    :param improps:
    The M_IMPORTS property dictionary for this imported module.  Only the
    'ns' entry is read here.

    :returns:
    The generated module node.
    """
    # Unpacking into two names raises ValueError unless there is exactly
    # one '@' separator.
    yang_name, yang_revision = imname.split('@')

    node = util.subelm(parent, nodename)

    name_leaf = util.leaf_elm('yanglib:name', yang_name)
    node.append(name_leaf)

    revision_leaf = util.leaf_elm('yanglib:revision', yang_revision)
    node.append(revision_leaf)

    namespace_leaf = util.leaf_elm('yanglib:namespace', improps['ns'])
    node.append(namespace_leaf)

    return node


class ModuleManager:
    """Manages the modules for the NETCONF for APK Distributions server.

    The manager registers itself as the first module: it carries the M_*
    attributes for ietf-yang-library and provides ``running`` and
    ``operational`` like the modules it loads.

    :ivar modules:
        Dictionary of loaded modules, keyed by Python import name.

    :ivar imports:
        Dictionary of import-only YANG modules, keyed by 'name@revision',
        each mapping to its M_IMPORTS property dictionary.

    :ivar library:
        Dictionary of loaded YANG models, mapping the Python import name to
        'name@revision'.

    :ivar rpcs:
        Dictionary of RPCs merged from the M_RPCS of loaded modules.

    :ivar logger:
        The logger used for module management messages.
    """

    M_ABI_VERSION = 1
    M_PREFIX = "yanglib"
    M_NS = "urn:ietf:params:xml:ns:yang:ietf-yang-library"
    M_NAME = "ietf-yang-library"
    M_REVISION = "2019-01-04"
    M_IMPORTS = {'ietf-yang-types@2013-07-15':
                 {'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types",
                  'prefix': "yang"},
                 'ietf-inet-types@2013-07-15':
                 {'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types",
                  'prefix': "inet"},
                 'ietf-datastores@2018-02-14':
                 {'ns': "urn:ietf:params:xml:ns:yang:ietf-datastores",
                  'prefix': "ds"}}

    def __init__(self):
        """Set up the manager with itself as the only loaded module."""
        self.modules = {'ncserver.base.modman': self}
        # Copy, so imports contributed by other modules never end up in the
        # class-level M_IMPORTS dictionary.
        self.imports = dict(self.M_IMPORTS)
        self.library = {'ncserver.base.modman':
                        '{}@{}'.format(self.M_NAME, self.M_REVISION)}
        self.rpcs = {}
        nsmap_add(self.M_PREFIX, self.M_NS)
        self.logger = getLogger('base/ModuleManager')

    def _module_for_ns(self, namespace):
        """Find a module that implements the specified namespace.

        :param str namespace:
            The namespace of the module to locate.

        :returns:
            The first loaded module whose M_NS equals the namespace, or
            None if there is no such module.
        """
        for mod in self.modules.values():
            if mod.M_NS == namespace:
                return mod

        return None

    def _augments_for_ns(self, namespace):
        """Find modules that augment the specified namespace.

        :param str namespace:
            The namespace of the model whose augments are desired.

        :returns:
            A list of modules whose M_AUGMENTS contains the namespace.
            The list may be empty.
        """
        return [
            mod for mod in self.modules.values()
            if getattr(mod, 'M_AUGMENTS', None) is not None
            and namespace in mod.M_AUGMENTS
        ]

    def load_module(self, name):
        """Load a module.

        The module is imported and validated in full before any manager
        state is changed, so a rejected module leaves the manager exactly
        as it was.

        :param str name:
            The name of the module to load.

        :returns:
            Either the name of the module (if loaded), or None (on error).
        """
        if name in self.modules:
            self.logger.warning(_("Module '%s' is already loaded; skipping"),
                                name)
            return name

        self.logger.debug(_("Discovering module '%s'..."), name)

        try:
            mod = __import__(name, globals(), locals(), [name], 0)
        except ImportError:
            self.logger.error(_("Module '%s' was not found."), name)
            return None

        # M_REVISION is required as well: it is read unconditionally when
        # the module is registered in self.library below.
        required = ('M_ABI_VERSION', 'M_NS', 'M_PREFIX', 'M_NAME',
                    'M_REVISION')
        if any(getattr(mod, attr, None) is None for attr in required):
            self.logger.error(_("'%s' is not a valid module."), name)
            return None

        if mod.M_ABI_VERSION != 1:
            self.logger.error(_("Module '%s' requires ABI version %d."), name,
                              mod.M_ABI_VERSION)
            return None

        if self._module_for_ns(mod.M_NS) is not None:
            self.logger.error(
                    _("Module '%s' implements duplicate namespace %s."), name,
                    mod.M_NS
            )
            # Two modules for one namespace would make _module_for_ns
            # ambiguous, so the newcomer is rejected.
            return None

        # Check every import before registering any of them; otherwise a
        # module rejected on its second import would leave its first one
        # behind in self.imports.
        new_imports = {}
        if getattr(mod, 'M_IMPORTS', None) is not None:
            for imname, improps in mod.M_IMPORTS.items():
                if imname not in self.imports:
                    new_imports[imname] = improps
                elif not _import_compatible(self.imports[imname], improps):
                    self.logger.error(_("Module '%s' has incompatible "
                                        "imported module '%s'"), name,
                                      imname)
                    return None

        self.logger.info(_("Loading module '%s' with ABI %d for namespace %s"),
                         name, mod.M_ABI_VERSION, mod.M_NS)

        # Validation passed; commit all state changes together.
        self.imports.update(new_imports)
        if getattr(mod, 'M_RPCS', None) is not None:
            self.rpcs.update(mod.M_RPCS)
        self.modules[name] = mod
        self.library[name] = '{}@{}'.format(mod.M_NAME, mod.M_REVISION)
        nsmap_add(mod.M_PREFIX, mod.M_NS)
        MODULE_SET_CHANGE_SIGNAL.call(name, mod)
        return name

    def capabilities(self) -> list:
        """Determine the NETCONF capabilities for this server.

        :returns:
            A list of capability URI strings: one per import-only module,
            then one per loaded module, then the yang-library capability
            carrying the current module set ID.
        """
        def _cap_for_module(module):
            base = '{ns}?module={mod}&revision={rev}'.format(
                    ns=module.M_NS, mod=module.M_NAME, rev=module.M_REVISION
            )
            if getattr(module, 'M_FEATURES', None) is not None:
                return '{base}&features={feat}'.format(
                        base=base, feat=','.join(module.M_FEATURES)
                )
            return base

        def _cap_for_import(imname, improp):
            name, rev = imname.split('@')
            return '{ns}?module={mod}&revision={rev}'.format(
                    ns=improp['ns'], mod=name, rev=rev
            )

        capabs = [_cap_for_import(imname, improp)
                  for imname, improp in self.imports.items()]
        capabs.extend(_cap_for_module(module)
                      for module in self.modules.values())
        capabs.append('{ns}?revision={rev}&module-set-id={id}'.format(
                ns='urn:ietf:params:netconf:capability:yang-library:1.0',
                rev='2019-01-04', id=self._module_set_id()
        ))

        return capabs

    def has_rpc(self, name: str) -> bool:
        """Determine if an RPC is implemented by a module."""
        return name in self.rpcs

    def rpc(self, name: str):
        """Return an RPC that is implemented by a module, or None."""
        return self.rpcs.get(name, None)

    def running(self, node):
        """Return running configuration information.

        Nothing is added to the node: ietf-yang-library is state-only.
        """

    def _module_set_id(self):
        """Retrieve the module set ID for this server.

        :returns:
            The hexadecimal SHA-256 digest of the comma-joined names under
            which the modules were loaded.
        """
        module_set_id = hashlib.sha256(','.join(self.modules.keys()).encode())
        return module_set_id.hexdigest()

    def _gen_yang_library(self, node):
        """Generate yanglib:yang-library node.

        :param node:
            The XML node to append to.

        :returns:
            The generated yang-library node.
        """
        lib = util.subelm(node, 'yanglib:yang-library', nsmap={
            'ds': "urn:ietf:params:xml:ns:yang:ietf-datastores"
        })
        modset = util.subelm(lib, 'yanglib:module-set')
        modset.append(util.leaf_elm('yanglib:name', 'netconfapk'))
        for module in self.modules.values():
            _gen_modnode(modset, module)
        for imname, improps in self.imports.items():
            _gen_modnode_import(modset, 'yanglib:import-only-module', imname,
                                improps)
        schema = util.subelm(lib, 'yanglib:schema')
        schema.append(util.leaf_elm('yanglib:name', 'apkschema'))
        schema.append(util.leaf_elm('yanglib:module-set', 'netconfapk'))

        # Both datastores share the single schema defined above.
        for store in ['ds:running', 'ds:operational']:
            dsnode = util.subelm(lib, 'yanglib:datastore')
            dsnode.append(util.leaf_elm('yanglib:name', store))
            dsnode.append(util.leaf_elm('yanglib:schema', 'apkschema'))

        lib.append(util.leaf_elm('yanglib:content-id', self._module_set_id()))
        return lib

    def operational(self, node):
        """Return configuration and device state information.

        :param node:
            The XML node to append to.  Both the NMDA yang-library tree and
            the deprecated modules-state tree are added beneath it.
        """
        # NMDA-enabled yang-library node.
        self._gen_yang_library(node)

        # Deprecated modules-state node.
        modstate = util.subelm(node, 'yanglib:modules-state')
        modstate.append(util.leaf_elm('yanglib:module-set-id',
                        self._module_set_id()))
        for module in self.modules.values():
            modnode = _gen_modnode(modstate, module)
            modnode.append(util.leaf_elm('yanglib:conformance-type',
                                         'implement'))
        for imname, improps in self.imports.items():
            modnode = _gen_modnode_import(modstate, 'yanglib:module', imname,
                                          improps)
            modnode.append(util.leaf_elm('yanglib:conformance-type', 'import'))

    def collect_running(self, node):
        """Collect all available information for the <running> datastore."""
        for mod in self.modules.values():
            mod.running(node)

    def collect_operational(self, node):
        """Collect all available information for the <operational> datastore"""
        for mod in self.modules.values():
            mod.operational(node)

    def collect_edit(self, session, rpc, node, def_op):
        """Send off edit operations to the requested module(s).

        Each child of the node is dispatched to the module implementing the
        child's namespace, and then to every module augmenting it.

        :param session:
            Passed through unchanged to each module's ``edit``.

        :param rpc:
            The RPC being processed; also used when building errors.

        :param node:
            The XML node whose children are the edits to dispatch.

        :param def_op:
            Passed through unchanged to each module's ``edit``.

        :raises error.UnknownNamespaceAppError:
            If no loaded module implements a child's namespace.

        :raises error.OperationNotSupportedAppError:
            If the implementing module has no ``edit`` handler.
        """
        for child in node:
            namespace = QName(child.tag).namespace
            module = self._module_for_ns(namespace)
            if module is None:
                raise error.UnknownNamespaceAppError(
                        rpc, child, error.RPCERR_TYPE_APPLICATION
                )
            if getattr(module, 'edit', None) is None:
                raise error.OperationNotSupportedAppError(rpc)
            self.logger.debug('Dispatching edit-config to %s', module.M_NAME)
            module.edit(session, rpc, child, def_op)
            for augment in self._augments_for_ns(namespace):
                # An augment without an edit handler is skipped, the same
                # way as one that reports the operation as unsupported.
                augment_edit = getattr(augment, 'edit', None)
                if augment_edit is None:
                    continue
                try:
                    self.logger.debug(
                        'Augment: Dispatching edit-config for %s to %s',
                        namespace, augment.M_NAME
                    )
                    augment_edit(session, rpc, child, def_op)
                except error.OperationNotSupportedAppError:
                    continue


# Created at import time.  ModuleManager.__init__ registers the yanglib
# namespace prefix via nsmap_add and lists the manager itself as the first
# loaded module.
MODMAN = ModuleManager()
"""The global module manager instance."""