"""
NETCONF for APK Distributions server:
Module Management System
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import hashlib
from logging import getLogger
from netconf import nsmap_add, util
from taillight import Signal
from ncserver.base.util import _
# Module-level taillight Signal, identified by the
# ('base/ModuleManager', 'module_set_changed') tuple, that other code can
# subscribe to.
# NOTE(review): nothing in the ModuleManager class in this file fires this
# signal (load_module does not) -- confirm whether that is still pending.
MODULE_SET_CHANGE_SIGNAL = Signal(('base/ModuleManager', 'module_set_changed'))
"""Signal fired when the module set changes. This implies the
yang-library-change notification should be fired, when this is supported."""
class ModuleManager:
    """Manages the modules for the NETCONF for APK Distributions server.

    The manager registers itself as the first module (under the key
    ``ncserver.base.modman``), providing the ``ietf-yang-library`` state
    data that describes every loaded module.

    :ivar modules:
        Dictionary of loaded modules, keyed by import name.

    :ivar library:
        Dictionary of loaded YANG models, keyed by import name; each value
        is a ``name@revision`` string.

    :ivar logger:
        Logger used for all module-management messages.
    """

    M_ABI_VERSION = 1
    M_PREFIX = "yanglib"
    M_NS = "urn:ietf:params:xml:ns:yang:ietf-yang-library"
    M_NAME = "ietf-yang-library"
    M_REVISION = "2019-01-04"

    # Attributes a module must define (with a non-None value) to be loaded.
    # M_REVISION is included because both load_module() and operational()
    # read it unconditionally.
    _REQUIRED_ATTRS = ('M_ABI_VERSION', 'M_NS', 'M_PREFIX', 'M_NAME',
                       'M_REVISION')

    def __init__(self):
        self.modules = {'ncserver.base.modman': self}
        self.library = {'ncserver.base.modman':
                        '{}@{}'.format(self.M_NAME, self.M_REVISION)}
        nsmap_add(self.M_PREFIX, self.M_NS)
        self.logger = getLogger('base/ModuleManager')

    def load_module(self, name):
        """Load a module.

        The module is imported, checked for the required ``M_*`` attributes
        and a supported ABI version, and only then registered in
        :attr:`modules` and :attr:`library` and added to the namespace map.
        Nothing is registered if any check fails.

        :param str name:
            The name of the module to load.

        :returns:
            Either the name of the module (if loaded), or None (on error).
        """
        if name in self.modules:
            self.logger.warning(_("Module '%s' is already loaded; skipping"),
                                name)
            return name

        self.logger.debug(_("Discovering module '%s'..."), name)

        try:
            # A non-empty fromlist makes __import__ return the leaf module
            # for dotted names instead of the top-level package.
            mod = __import__(name, globals(), locals(), [name], 0)
        except ImportError:
            self.logger.error(_("Module '%s' was not found."), name)
            return None

        if any(getattr(mod, attr, None) is None
               for attr in self._REQUIRED_ATTRS):
            self.logger.error(_("'%s' is not a valid module."), name)
            return None

        if mod.M_ABI_VERSION != 1:
            self.logger.error(_("Module '%s' requires ABI version %d."), name,
                              mod.M_ABI_VERSION)
            return None

        self.logger.info(_("Loading module '%s' with ABI %d for namespace %s"),
                         name, mod.M_ABI_VERSION, mod.M_NS)
        self.modules[name] = mod
        self.library[name] = '{}@{}'.format(mod.M_NAME, mod.M_REVISION)
        nsmap_add(mod.M_PREFIX, mod.M_NS)
        return name

    def running(self, node):
        """Return running configuration information.

        Nothing is added to ``node``.

        :param node:
            The XML node that running configuration would be appended to.
        """
        # ietf-yang-library is state-only.

    @staticmethod
    def _add_module_node(parent, module):
        """Append a yanglib:module entry for ``module`` under ``parent``.

        :param parent:
            The XML element that receives the new ``module`` child.

        :param module:
            Any object exposing ``M_NAME``, ``M_REVISION`` and ``M_NS``.

        :returns:
            The newly created ``module`` element, with name, revision and
            namespace leaves already appended (in that order).
        """
        modnode = util.subelm(parent, 'yanglib:module')
        modnode.append(util.leaf_elm('yanglib:name', module.M_NAME))
        modnode.append(util.leaf_elm('yanglib:revision', module.M_REVISION))
        modnode.append(util.leaf_elm('yanglib:namespace', module.M_NS))
        return modnode

    def operational(self, node):
        """Return configuration and device state information.

        Appends both the ``yang-library`` tree and the deprecated
        ``modules-state`` tree to ``node``, describing every loaded module.

        :param node:
            The XML node that state information is appended to.
        """
        # The identifier is a SHA-256 digest over the loaded module names in
        # load order, so it changes whenever the module set changes.
        module_set_id = hashlib.sha256(','.join(self.modules).encode())
        content_id = module_set_id.hexdigest()

        # NMDA-enabled yang-library node.
        lib = util.subelm(node, 'yanglib:yang-library')
        modset = util.subelm(lib, 'yanglib:module-set')
        modset.append(util.leaf_elm('yanglib:name', 'netconfapk'))
        for module in self.modules.values():
            # XXX or import-only-module, if it's import-only
            self._add_module_node(modset, module)

        schema = util.subelm(lib, 'yanglib:schema')
        schema.append(util.leaf_elm('yanglib:name', 'apkschema'))
        schema.append(util.leaf_elm('yanglib:module-set', 'netconfapk'))

        for store in ['running', 'operational']:
            dsnode = util.subelm(lib, 'yanglib:datastore')
            # These leaves carry the yanglib prefix like every other element
            # in this tree; unprefixed tags would not match their siblings.
            # NOTE(review): the yang-library model appears to type the
            # datastore name as an identityref (e.g. 'ds:running') -- confirm
            # whether the bare value is accepted by clients.
            dsnode.append(util.leaf_elm('yanglib:name', store))
            dsnode.append(util.leaf_elm('yanglib:schema', 'apkschema'))

        lib.append(util.leaf_elm('yanglib:content-id', content_id))

        # Deprecated modules-state node.
        modstate = util.subelm(node, 'yanglib:modules-state')
        modstate.append(util.leaf_elm('yanglib:module-set-id', content_id))
        for module in self.modules.values():
            modnode = self._add_module_node(modstate, module)
            # XXX import-only module support: use 'import'
            modnode.append(util.leaf_elm('yanglib:conformance-type',
                                         'implement'))

    def collect_running(self, node):
        """Collect all available information for the <running> datastore.

        :param node:
            The XML node passed to each loaded module's ``running`` method.
        """
        for mod in self.modules.values():
            mod.running(node)

    def collect_operational(self, node):
        """Collect all available information for the <operational> datastore.

        :param node:
            The XML node passed to each loaded module's ``operational``
            method.
        """
        for mod in self.modules.values():
            mod.operational(node)