""" NETCONF for APK Distributions server: Module Management System Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ import hashlib from logging import getLogger from netconf import nsmap_add, util from taillight import Signal from ncserver.base.util import _ MODULE_SET_CHANGE_SIGNAL = Signal(('base/ModuleManager', 'module_set_changed')) """Signal fired when the module set changes. This implies the yang-library-change notification should be fired, when this is supported.""" def _import_compatible(existing, improps): """Determine if an imported module is compatible with the existing module that has the same name/revision. :param existing: The dictionary of properties for the existing module. :param improps: The dictionary of properties for the candidate module. :returns bool: True if compatible, False otherwise. """ return (improps.keys() == existing.keys() and all([existing[key] == improps[key] for key in existing.keys()])) def _gen_modnode(parent, module): """Generate a yanglib:module node. The generated node is in the NMDA format. With some massaging, it could be used for the deprecated non-NMDA format. :param parent: The parent XML node. :param module: The module. :returns: The generated module node. """ modnode = util.subelm(parent, 'yanglib:module') modnode.append(util.leaf_elm('yanglib:name', module.M_NAME)) modnode.append(util.leaf_elm('yanglib:revision', module.M_REVISION)) modnode.append(util.leaf_elm('yanglib:namespace', module.M_NS)) if getattr(module, 'M_FEATURES', None) is not None: for feature in module.M_FEATURES: modnode.append(util.leaf_elm('yanglib:feature', feature)) return modnode def _gen_modnode_import(parent, nodename, imname, improps): """Generate a node for an import-only module. :param parent: The parent XML node. :param nodename: The name of the XML node to create. For NMDA library contents, use 'yanglib:import-only-module'; otherwise, use 'yanglib:module'. :param imname: The name of the imported module in M_IMPORTS syntax. :param improps: The M_IMPORTS property dictionary for this imported module. :returns: The generated module node. """ imn_name, imn_rev = imname.split('@') modnode = util.subelm(parent, nodename) modnode.append(util.leaf_elm('yanglib:name', imn_name)) modnode.append(util.leaf_elm('yanglib:revision', imn_rev)) modnode.append(util.leaf_elm('yanglib:namespace', improps['ns'])) return modnode class ModuleManager: """Manages the modules for the NETCONF for APK Distributions server. :ivar modules: Dictionary of loaded modules. :ivar library: Dictionary of loaded YANG models. """ M_ABI_VERSION = 1 M_PREFIX = "yanglib" M_NS = "urn:ietf:params:xml:ns:yang:ietf-yang-library" M_NAME = "ietf-yang-library" M_REVISION = "2019-01-04" M_IMPORTS = {'ietf-yang-types@2013-07-15': {'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang"}, 'ietf-inet-types@2013-07-16': {'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types", 'prefix': "inet"}} def __init__(self): self.modules = {'ncserver.base.modman': self} self.imports = dict(self.M_IMPORTS) self.library = {'ncserver.base.modman': '{}@{}'.format(self.M_NAME, self.M_REVISION)} nsmap_add(self.M_PREFIX, self.M_NS) self.logger = getLogger('base/ModuleManager') def load_module(self, name): """Load a module. :param str name: The name of the module to load. :returns: Either the name of the module (if loaded), or None (on error). """ if name in self.modules: self.logger.warning(_("Module '%s' is already loaded; skipping"), name) return name self.logger.debug(_("Discovering module '%s'..."), name) mod = None try: mod = __import__(name, globals(), locals(), [name], 0) except ImportError: self.logger.error(_("Module '%s' was not found."), name) return None if any([getattr(mod, attr, None) is None for attr in ['M_ABI_VERSION', 'M_NS', 'M_PREFIX', 'M_NAME']]): self.logger.error(_("'%s' is not a valid module."), name) return None if mod.M_ABI_VERSION != 1: self.logger.error(_("Module '%s' requires ABI version %d."), name, mod.M_ABI_VERSION) return None if getattr(mod, 'M_IMPORTS', None) is not None: for imname, improps in mod.M_IMPORTS: if imname in self.imports: if not _import_compatible(self.imports[imname], improps): self.logger.error(_("Module '%s' has incompatible " "imported module '%s'"), name, imname) return None else: self.imports[imname] = improps self.logger.info(_("Loading module '%s' with ABI %d for namespace %s"), name, mod.M_ABI_VERSION, mod.M_NS) self.modules[name] = mod self.library[name] = '{}@{}'.format(mod.M_NAME, mod.M_REVISION) nsmap_add(mod.M_PREFIX, mod.M_NS) MODULE_SET_CHANGE_SIGNAL.call(name, mod) return name def running(self, node): """Return running configuration information.""" # ietf-yang-library is state-only. def _gen_yang_library(self, node, content_id): """Generate yanglib:yang-library node. :param node: The XML node to append to. :param content_id: The unique Content ID for this library. """ lib = util.subelm(node, 'yanglib:yang-library') modset = util.subelm(lib, 'yanglib:module-set') modset.append(util.leaf_elm('yanglib:name', 'netconfapk')) for module in self.modules.values(): _gen_modnode(modset, module) for imname, improps in self.imports.items(): _gen_modnode_import(modset, 'yanglib:import-only-module', imname, improps) schema = util.subelm(lib, 'yanglib:schema') schema.append(util.leaf_elm('yanglib:name', 'apkschema')) schema.append(util.leaf_elm('yanglib:module-set', 'netconfapk')) for store in ['running', 'operational']: dsnode = util.subelm(lib, 'yanglib:datastore') dsnode.append(util.leaf_elm('name', store)) dsnode.append(util.leaf_elm('schema', 'apkschema')) lib.append(util.leaf_elm('yanglib:content-id', content_id)) return lib def operational(self, node): """Return configuration and device state information.""" module_set_id = hashlib.sha256(','.join(self.modules.keys()).encode()) content_id = module_set_id.hexdigest() # NDMA-enabled yang-library node. self._gen_yang_library(node, content_id) # Deprecated modules-state node. modstate = util.subelm(node, 'yanglib:modules-state') modstate.append(util.leaf_elm('yanglib:module-set-id', content_id)) for module in self.modules.values(): modnode = _gen_modnode(modstate, module) modnode.append(util.leaf_elm('yanglib:conformance-type', 'implement')) for imname, improps in self.imports.items(): modnode = _gen_modnode_import(modstate, 'yanglib:module', imname, improps) modnode.append(util.leaf_elm('yanglib:conformance-type', 'import')) def collect_running(self, node): """Collect all available information for the datastore.""" for mod in self.modules.values(): mod.running(node) def collect_operational(self, node): """Collect all available information for the datastore""" for mod in self.modules.values(): mod.operational(node)