""" NETCONF for APK Distributions server: Server module Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ import logging from netconf import util from netconf.server import NetconfSSHServer, SSHAuthorizedKeysController from ncserver.base.modman import ModuleManager from ncserver.config import ConfigManager class Server: """The NETCONF server component.""" def __init__(self, port=830): """Create the NETCONF server. :param int port: The port number to listen on. Typically 830. """ self.config = ConfigManager() self.auth = SSHAuthorizedKeysController( users=self.config.get_list('server', 'users') ) self.debug = self.config.get('server', 'debug') if self.debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) self.server = NetconfSSHServer( self.auth, self, port, self.config.get('server', 'host_key'), self.debug ) self.modman = ModuleManager() for module in self.config.get_list('server', 'modules'): self.modman.load_module(module) def close(self): """Close all connections.""" self.server.close() def nc_append_capabilities(self, capabilities): """List all capabilities of this NETCONF server.""" for capab in self.modman.capabilities(): util.subelm(capabilities, 'capability').text = capab util.subelm(capabilities, 'capability').text = \ 'urn:ietf:params:netconf:capability:xpath:1.0' def rpc_get(self, session, rpc, filter_or_none): # pylint: disable=W0613 """Handle the RPC.""" root = util.elm('nc:data') self.modman.collect_running(root) self.modman.collect_operational(root) return util.filter_results(rpc, root, filter_or_none, self.debug) # pylint: disable=W0613 def rpc_get_config(self, session, rpc, source, filter_or_none): """Handle the RPC.""" root = util.elm('nc:data') self.modman.collect_running(root) return util.filter_results(rpc, root, filter_or_none, self.debug) if __name__ == "__main__": s = Server() try: s.server.join() except KeyboardInterrupt: s.close()