"""
NETCONF for APK Distributions server:
Module Management System
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import hashlib
from logging import getLogger
from lxml import etree
from netconf import error, nsmap_add, util
from taillight import Signal
from ncserver.base.util import _
# Module-level signal object.  ModuleManager.load_module invokes it as
# ``MODULE_SET_CHANGE_SIGNAL.call(name, mod)`` once a module has been
# registered successfully.
MODULE_SET_CHANGE_SIGNAL = Signal(('base/ModuleManager', 'module_set_changed'))
"""Signal fired when the module set changes. This implies the
yang-library-change notification should be fired, when this is supported."""
# Short alias for lxml's QName class; collect_edit uses it to extract the
# namespace of an element tag.  The pylint pragma presumably silences the
# "C extension has no member" message for lxml.etree.
QName = etree.QName # pylint: disable=I1101
def _import_compatible(existing, improps):
"""Determine if an imported module is compatible with the existing
module that has the same name/revision.
:param existing:
The dictionary of properties for the existing module.
:param improps:
The dictionary of properties for the candidate module.
:returns bool:
True if compatible, False otherwise.
"""
return (improps.keys() == existing.keys() and
all([existing[key] == improps[key] for key in existing.keys()]))
def _gen_modnode(parent, module):
    """Build a yanglib:module element describing an implemented module.

    The generated node is in the NMDA format. With some massaging, it
    could be used for the deprecated non-NMDA format.

    :param parent:
        The parent XML node; the new element is created beneath it.

    :param module:
        The module.  Its M_NAME, M_REVISION and M_NS attributes are read;
        M_FEATURES is optional.

    :returns:
        The generated module node.
    """
    node = util.subelm(parent, 'yanglib:module')

    # Mandatory leaves, emitted in this fixed order.
    mandatory = (
        ('yanglib:name', 'M_NAME'),
        ('yanglib:revision', 'M_REVISION'),
        ('yanglib:namespace', 'M_NS'),
    )
    for tag, attr in mandatory:
        node.append(util.leaf_elm(tag, getattr(module, attr)))

    # One yanglib:feature leaf per advertised feature, if the module
    # declares any.
    features = getattr(module, 'M_FEATURES', None)
    if features is not None:
        for feature in features:
            node.append(util.leaf_elm('yanglib:feature', feature))

    return node
def _gen_modnode_import(parent, nodename, imname, improps):
    """Build an element describing an import-only module.

    :param parent:
        The parent XML node; the new element is created beneath it.

    :param nodename:
        The name of the XML node to create.  For NMDA library contents, use
        'yanglib:import-only-module'; otherwise, use 'yanglib:module'.

    :param imname:
        The name of the imported module in M_IMPORTS syntax
        ('name@revision').  Anything that does not split into exactly two
        parts on '@' raises ValueError.

    :param improps:
        The M_IMPORTS property dictionary for this imported module; its
        'ns' entry supplies the namespace leaf.

    :returns:
        The generated module node.
    """
    yang_name, yang_rev = imname.split('@')
    node = util.subelm(parent, nodename)

    def _add_leaf(tag, value):
        node.append(util.leaf_elm(tag, value))

    _add_leaf('yanglib:name', yang_name)
    _add_leaf('yanglib:revision', yang_rev)
    _add_leaf('yanglib:namespace', improps['ns'])
    return node
class ModuleManager:
    """Manages the modules for the NETCONF for APK Distributions server.

    The manager registers itself as the first module: it carries the same
    ``M_*`` attributes it requires of loaded modules and describes the
    ietf-yang-library model.

    :ivar modules:
        Dictionary of loaded modules, keyed by Python module name.

    :ivar imports:
        Dictionary of import-only YANG modules, keyed by 'name@revision',
        each mapping to its M_IMPORTS property dictionary.

    :ivar library:
        Dictionary of loaded YANG models ('name@revision' strings), keyed
        by Python module name.

    :ivar logger:
        Logger used for all diagnostics emitted by the manager.
    """

    M_ABI_VERSION = 1
    M_PREFIX = "yanglib"
    M_NS = "urn:ietf:params:xml:ns:yang:ietf-yang-library"
    M_NAME = "ietf-yang-library"
    M_REVISION = "2019-01-04"
    M_IMPORTS = {
        'ietf-yang-types@2013-07-15': {
            'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types",
            'prefix': "yang",
        },
        'ietf-inet-types@2013-07-15': {
            'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types",
            'prefix': "inet",
        },
        'ietf-datastores@2018-02-14': {
            'ns': "urn:ietf:params:xml:ns:yang:ietf-datastores",
            'prefix': "ds",
        },
    }

    def __init__(self):
        """Register the manager itself as the first loaded module."""
        self.modules = {'ncserver.base.modman': self}
        # Copy so that imports added by loaded modules never leak into the
        # class-level M_IMPORTS table.
        self.imports = dict(self.M_IMPORTS)
        self.library = {
            'ncserver.base.modman':
                '{}@{}'.format(self.M_NAME, self.M_REVISION),
        }
        nsmap_add(self.M_PREFIX, self.M_NS)
        self.logger = getLogger('base/ModuleManager')

    def _module_for_ns(self, namespace):
        """Find a module that implements the specified namespace.

        :param str namespace:
            The namespace of the module to locate.

        :returns:
            The module that implements the namespace, or None.
        """
        for mod in self.modules.values():
            if mod.M_NS == namespace:
                return mod
        return None

    def load_module(self, name):
        """Load a module.

        The module is imported and validated.  Only once every check has
        passed is any manager state (modules, imports, library, namespace
        map) modified, so a rejected module leaves the manager untouched.

        :param str name:
            The name of the module to load.

        :returns:
            Either the name of the module (if loaded, or if it was already
            loaded), or None (on error).
        """
        if name in self.modules:
            self.logger.warning(_("Module '%s' is already loaded; skipping"),
                                name)
            return name

        self.logger.debug(_("Discovering module '%s'..."), name)
        try:
            # A non-empty fromlist makes __import__ return the named
            # (sub)module itself rather than the top-level package.
            mod = __import__(name, globals(), locals(), [name], 0)
        except ImportError:
            self.logger.error(_("Module '%s' was not found."), name)
            return None

        # M_REVISION is required as well: it is read below when the
        # library entry is built, and again when describing the module.
        required = ('M_ABI_VERSION', 'M_NS', 'M_PREFIX', 'M_NAME',
                    'M_REVISION')
        if any(getattr(mod, attr, None) is None for attr in required):
            self.logger.error(_("'%s' is not a valid module."), name)
            return None

        if mod.M_ABI_VERSION != 1:
            self.logger.error(_("Module '%s' requires ABI version %d."), name,
                              mod.M_ABI_VERSION)
            return None

        if self._module_for_ns(mod.M_NS) is not None:
            self.logger.error(
                _("Module '%s' implements duplicate namespace %s."), name,
                mod.M_NS
            )
            # Refuse the module: _module_for_ns would only ever find the
            # first implementer, making this one unreachable for edits.
            return None

        # Validate every import before committing any of them, so that a
        # rejected module does not leave stray entries in self.imports.
        new_imports = {}
        if getattr(mod, 'M_IMPORTS', None) is not None:
            for imname, improps in mod.M_IMPORTS.items():
                if imname not in self.imports:
                    new_imports[imname] = improps
                elif not _import_compatible(self.imports[imname], improps):
                    self.logger.error(_("Module '%s' has incompatible "
                                        "imported module '%s'"), name,
                                      imname)
                    return None

        self.logger.info(_("Loading module '%s' with ABI %d for namespace %s"),
                         name, mod.M_ABI_VERSION, mod.M_NS)

        self.imports.update(new_imports)
        self.modules[name] = mod
        self.library[name] = '{}@{}'.format(mod.M_NAME, mod.M_REVISION)
        nsmap_add(mod.M_PREFIX, mod.M_NS)

        MODULE_SET_CHANGE_SIGNAL.call(name, mod)

        return name

    def capabilities(self) -> list:
        """Determine the NETCONF capabilities for this server.

        :returns:
            A list of capability URI strings: one per import-only module,
            then one per loaded module, then the yang-library capability
            carrying the current module set ID.
        """
        def _cap_for_module(module):
            base = '{ns}?module={mod}&revision={rev}'.format(
                ns=module.M_NS, mod=module.M_NAME, rev=module.M_REVISION
            )
            # Only add the parameter when there is something to list; an
            # empty feature list would otherwise leave a dangling
            # '&features=' on the URI.
            features = getattr(module, 'M_FEATURES', None)
            if features:
                return '{base}&features={feat}'.format(
                    base=base, feat=','.join(features)
                )
            return base

        def _cap_for_import(imname, improp):
            name, rev = imname.split('@')
            return '{ns}?module={mod}&revision={rev}'.format(
                ns=improp['ns'], mod=name, rev=rev
            )

        capabs = [_cap_for_import(imname, improp)
                  for imname, improp in self.imports.items()]
        capabs.extend(_cap_for_module(module)
                      for module in self.modules.values())

        capabs.append('{ns}?revision={rev}&module-set-id={id}'.format(
            ns='urn:ietf:params:netconf:capability:yang-library:1.0',
            rev='2019-01-04', id=self._module_set_id()
        ))

        return capabs

    def running(self, node):
        """Return running configuration information.

        Nothing is added to ``node``.
        """
        # ietf-yang-library is state-only.

    def _module_set_id(self):
        """Retrieve the module set ID for this server.

        :returns str:
            The hexadecimal SHA-256 digest of the comma-joined keys of
            ``self.modules``, in insertion order.
        """
        module_set_id = hashlib.sha256(','.join(self.modules.keys()).encode())
        return module_set_id.hexdigest()

    def _gen_yang_library(self, node):
        """Generate yanglib:yang-library node.

        The tree contains a single module-set ('netconfapk') listing all
        loaded and import-only modules, a single schema ('apkschema')
        referring to that set, one datastore entry each for ds:running and
        ds:operational, and the content-id.

        :param node:
            The XML node to append to.

        :returns:
            The generated yang-library node.
        """
        lib = util.subelm(node, 'yanglib:yang-library', nsmap={
            'ds': "urn:ietf:params:xml:ns:yang:ietf-datastores"
        })

        modset = util.subelm(lib, 'yanglib:module-set')
        modset.append(util.leaf_elm('yanglib:name', 'netconfapk'))
        for module in self.modules.values():
            _gen_modnode(modset, module)
        for imname, improps in self.imports.items():
            _gen_modnode_import(modset, 'yanglib:import-only-module', imname,
                                improps)

        schema = util.subelm(lib, 'yanglib:schema')
        schema.append(util.leaf_elm('yanglib:name', 'apkschema'))
        schema.append(util.leaf_elm('yanglib:module-set', 'netconfapk'))

        for store in ['ds:running', 'ds:operational']:
            dsnode = util.subelm(lib, 'yanglib:datastore')
            dsnode.append(util.leaf_elm('yanglib:name', store))
            dsnode.append(util.leaf_elm('yanglib:schema', 'apkschema'))

        lib.append(util.leaf_elm('yanglib:content-id', self._module_set_id()))
        return lib

    def operational(self, node):
        """Return configuration and device state information.

        Appends both the yang-library tree and the deprecated
        modules-state tree to ``node``.

        :param node:
            The XML node to append to.
        """
        # NMDA-enabled yang-library node.
        self._gen_yang_library(node)

        # Deprecated modules-state node.
        modstate = util.subelm(node, 'yanglib:modules-state')
        modstate.append(util.leaf_elm('yanglib:module-set-id',
                                      self._module_set_id()))
        for module in self.modules.values():
            modnode = _gen_modnode(modstate, module)
            modnode.append(util.leaf_elm('yanglib:conformance-type',
                                         'implement'))
        for imname, improps in self.imports.items():
            modnode = _gen_modnode_import(modstate, 'yanglib:module', imname,
                                          improps)
            modnode.append(util.leaf_elm('yanglib:conformance-type', 'import'))

    def collect_running(self, node):
        """Collect all available information for the <running> datastore.

        :param node:
            The XML node passed to every loaded module's ``running``.
        """
        for mod in self.modules.values():
            mod.running(node)

    def collect_operational(self, node):
        """Collect all available information for the <operational> datastore.

        :param node:
            The XML node passed to every loaded module's ``operational``.
        """
        for mod in self.modules.values():
            mod.operational(node)

    def collect_edit(self, rpc, node, def_op):
        """Send off edit operations to the requested module(s).

        Each child of ``node`` is dispatched to the loaded module whose
        M_NS matches the child's XML namespace.

        :param rpc:
            The RPC being processed; passed to the module and to any error
            raised.

        :param node:
            The XML node whose children are the edits to dispatch.

        :param def_op:
            The default operation, handed through to the module's ``edit``.

        :raises error.UnknownNamespaceAppError:
            If no loaded module implements a child's namespace.

        :raises error.OperationNotSupportedAppError:
            If the responsible module has no ``edit`` attribute.
        """
        for child in node:
            namespace = QName(child.tag).namespace
            module = self._module_for_ns(namespace)
            if module is None:
                raise error.UnknownNamespaceAppError(
                    rpc, child, error.RPCERR_TYPE_APPLICATION
                )

            if getattr(module, 'edit', None) is None:
                raise error.OperationNotSupportedAppError(rpc)

            self.logger.debug('Dispatching edit-config to %s', module.M_NAME)
            module.edit(rpc, child, def_op)