""" NETCONF for APK Distributions server: Module Management System Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ import hashlib from logging import getLogger from lxml import etree from netconf import error, nsmap_add, util from taillight import Signal from ncserver.base.util import _ MODULE_SET_CHANGE_SIGNAL = Signal(('base/ModuleManager', 'module_set_changed')) """Signal fired when the module set changes. This implies the yang-library-change notification should be fired, when this is supported.""" QName = etree.QName # pylint: disable=I1101 def _import_compatible(existing, improps): """Determine if an imported module is compatible with the existing module that has the same name/revision. :param existing: The dictionary of properties for the existing module. :param improps: The dictionary of properties for the candidate module. :returns bool: True if compatible, False otherwise. """ return (improps.keys() == existing.keys() and all([existing[key] == improps[key] for key in existing.keys()])) def _gen_modnode(parent, module): """Generate a yanglib:module node. The generated node is in the NMDA format. With some massaging, it could be used for the deprecated non-NMDA format. :param parent: The parent XML node. :param module: The module. :returns: The generated module node. """ modnode = util.subelm(parent, 'yanglib:module') modnode.append(util.leaf_elm('yanglib:name', module.M_NAME)) modnode.append(util.leaf_elm('yanglib:revision', module.M_REVISION)) modnode.append(util.leaf_elm('yanglib:namespace', module.M_NS)) if getattr(module, 'M_FEATURES', None) is not None: for feature in module.M_FEATURES: modnode.append(util.leaf_elm('yanglib:feature', feature)) return modnode def _gen_modnode_import(parent, nodename, imname, improps): """Generate a node for an import-only module. :param parent: The parent XML node. :param nodename: The name of the XML node to create. For NMDA library contents, use 'yanglib:import-only-module'; otherwise, use 'yanglib:module'. :param imname: The name of the imported module in M_IMPORTS syntax. :param improps: The M_IMPORTS property dictionary for this imported module. :returns: The generated module node. """ imn_name, imn_rev = imname.split('@') modnode = util.subelm(parent, nodename) modnode.append(util.leaf_elm('yanglib:name', imn_name)) modnode.append(util.leaf_elm('yanglib:revision', imn_rev)) modnode.append(util.leaf_elm('yanglib:namespace', improps['ns'])) return modnode class ModuleManager: """Manages the modules for the NETCONF for APK Distributions server. :ivar modules: Dictionary of loaded modules. :ivar library: Dictionary of loaded YANG models. """ M_ABI_VERSION = 1 M_PREFIX = "yanglib" M_NS = "urn:ietf:params:xml:ns:yang:ietf-yang-library" M_NAME = "ietf-yang-library" M_REVISION = "2019-01-04" M_IMPORTS = {'ietf-yang-types@2013-07-15': {'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang"}, 'ietf-inet-types@2013-07-15': {'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types", 'prefix': "inet"}, 'ietf-datastores@2018-02-14': {'ns': "urn:ietf:params:xml:ns:yang:ietf-datastores", 'prefix': "ds"}} def __init__(self): self.modules = {'ncserver.base.modman': self} self.imports = dict(self.M_IMPORTS) self.library = {'ncserver.base.modman': '{}@{}'.format(self.M_NAME, self.M_REVISION)} nsmap_add(self.M_PREFIX, self.M_NS) self.logger = getLogger('base/ModuleManager') def _module_for_ns(self, namespace): """Find a module that implements the specified namespace. :param str namespace: The namespace of the module to locate. :returns: The module that implements the namespace, or None. """ for mod in self.modules.values(): if mod.M_NS == namespace: return mod return None def load_module(self, name): """Load a module. :param str name: The name of the module to load. :returns: Either the name of the module (if loaded), or None (on error). """ if name in self.modules: self.logger.warning(_("Module '%s' is already loaded; skipping"), name) return name self.logger.debug(_("Discovering module '%s'..."), name) mod = None try: mod = __import__(name, globals(), locals(), [name], 0) except ImportError: self.logger.error(_("Module '%s' was not found."), name) return None if any([getattr(mod, attr, None) is None for attr in ['M_ABI_VERSION', 'M_NS', 'M_PREFIX', 'M_NAME']]): self.logger.error(_("'%s' is not a valid module."), name) return None if mod.M_ABI_VERSION != 1: self.logger.error(_("Module '%s' requires ABI version %d."), name, mod.M_ABI_VERSION) return None if self._module_for_ns(mod.M_NS) is not None: self.logger.error( _("Module '%s' implements duplicate namespace %s."), name, mod.M_NS ) if getattr(mod, 'M_IMPORTS', None) is not None: for imname, improps in mod.M_IMPORTS.items(): if imname in self.imports: if not _import_compatible(self.imports[imname], improps): self.logger.error(_("Module '%s' has incompatible " "imported module '%s'"), name, imname) return None else: self.imports[imname] = improps self.logger.info(_("Loading module '%s' with ABI %d for namespace %s"), name, mod.M_ABI_VERSION, mod.M_NS) self.modules[name] = mod self.library[name] = '{}@{}'.format(mod.M_NAME, mod.M_REVISION) nsmap_add(mod.M_PREFIX, mod.M_NS) MODULE_SET_CHANGE_SIGNAL.call(name, mod) return name def capabilities(self) -> list: """Determine the NETCONF capabilities for this server.""" capabs = [] def _cap_for_module(module): base = '{ns}?module={mod}&revision={rev}'.format( ns=module.M_NS, mod=module.M_NAME, rev=module.M_REVISION ) if getattr(module, 'M_FEATURES', None) is not None: return '{base}&features={feat}'.format( base=base, feat=','.join(module.M_FEATURES) ) return base def _cap_for_import(imname, improp): name, rev = imname.split('@') return '{ns}?module={mod}&revision={rev}'.format( ns=improp['ns'], mod=name, rev=rev ) for imdata in self.imports.items(): capabs.append(_cap_for_import(*imdata)) for module in self.modules.values(): capabs.append(_cap_for_module(module)) capabs.append('{ns}?revision={rev}&module-set-id={id}'.format( ns='urn:ietf:params:netconf:capability:yang-library:1.0', rev='2019-01-04', id=self._module_set_id() )) return capabs def running(self, node): """Return running configuration information.""" # ietf-yang-library is state-only. def _module_set_id(self): """Retrieve the module set ID for this server.""" module_set_id = hashlib.sha256(','.join(self.modules.keys()).encode()) return module_set_id.hexdigest() def _gen_yang_library(self, node): """Generate yanglib:yang-library node. :param node: The XML node to append to. """ lib = util.subelm(node, 'yanglib:yang-library', nsmap={ 'ds': "urn:ietf:params:xml:ns:yang:ietf-datastores" }) modset = util.subelm(lib, 'yanglib:module-set') modset.append(util.leaf_elm('yanglib:name', 'netconfapk')) for module in self.modules.values(): _gen_modnode(modset, module) for imname, improps in self.imports.items(): _gen_modnode_import(modset, 'yanglib:import-only-module', imname, improps) schema = util.subelm(lib, 'yanglib:schema') schema.append(util.leaf_elm('yanglib:name', 'apkschema')) schema.append(util.leaf_elm('yanglib:module-set', 'netconfapk')) for store in ['ds:running', 'ds:operational']: dsnode = util.subelm(lib, 'yanglib:datastore') dsnode.append(util.leaf_elm('yanglib:name', store)) dsnode.append(util.leaf_elm('yanglib:schema', 'apkschema')) lib.append(util.leaf_elm('yanglib:content-id', self._module_set_id())) return lib def operational(self, node): """Return configuration and device state information.""" # NDMA-enabled yang-library node. self._gen_yang_library(node) # Deprecated modules-state node. modstate = util.subelm(node, 'yanglib:modules-state') modstate.append(util.leaf_elm('yanglib:module-set-id', self._module_set_id())) for module in self.modules.values(): modnode = _gen_modnode(modstate, module) modnode.append(util.leaf_elm('yanglib:conformance-type', 'implement')) for imname, improps in self.imports.items(): modnode = _gen_modnode_import(modstate, 'yanglib:module', imname, improps) modnode.append(util.leaf_elm('yanglib:conformance-type', 'import')) def collect_running(self, node): """Collect all available information for the datastore.""" for mod in self.modules.values(): mod.running(node) def collect_operational(self, node): """Collect all available information for the datastore""" for mod in self.modules.values(): mod.operational(node) def collect_edit(self, rpc, node, def_op): """Send off edit operations to the requested module(s).""" for child in node: namespace = QName(child.tag).namespace module = self._module_for_ns(namespace) if module is None: raise error.UnknownNamespaceAppError( rpc, child, error.RPCERR_TYPE_APPLICATION ) if getattr(module, 'edit', None) is None: raise error.OperationNotSupportedAppError(rpc) self.logger.debug('Dispatching edit-config to %s', module.M_NAME) module.edit(rpc, child, def_op)