"""
NETCONF for APK Distributions server:
    Server module

Copyright © 2020 Adélie Software in the Public Benefit, Inc.

Released under the terms of the NCSA license.  See the LICENSE file included
with this source distribution for more information.

SPDX-License-Identifier: NCSA
"""

import logging

from lxml import etree
import netconf.base
import paramiko
from netconf import error, util
from netconf.server import NetconfSSHServer, SSHAuthorizedKeysController

from ncserver.base.log import configure_logging, LOG_AUTH, LOG_CONFIG, \
    LOG_OPERATIONAL
from ncserver.base.modman import MODMAN
from ncserver.base.util import user_for_session
from ncserver.config import ConfigManager


QName = etree.QName  # pylint: disable=I1101


def _log_config_rpc(template, session, rpc):
    """Emit a debug record on LOG_CONFIG describing a configuration RPC.

    :param str template:
        A %-style format string taking the user, the session ID and the
        serialised RPC, in that order.

    :param session:
        The session the RPC arrived on.

    :param rpc:
        The RPC element; it is serialised with ``etree.tostring``.
    """
    LOG_CONFIG.debug(
        template,
        user_for_session(session), session.session_id, etree.tostring(rpc)
    )


def log_read(session, rpc):
    """Record, at debug level, that a session asked to read configuration."""
    _log_config_rpc(
        '%s on session %d is requesting configuration: %s', session, rpc
    )


def log_write(session, rpc):
    """Record, at debug level, that a session tried to edit configuration."""
    _log_config_rpc(
        '%s on session %d is editing configuration: %s', session, rpc
    )


class AuthenticationController(SSHAuthorizedKeysController):
    """Authorized-keys controller that reports rejected logins to LOG_AUTH."""

    def check_auth_publickey(self, username, key):
        """Delegate to the parent check, logging an error when it fails.

        :returns: Whatever the parent implementation returned, unchanged.
        """
        outcome = super().check_auth_publickey(username, key)
        if outcome == paramiko.AUTH_FAILED:
            LOG_AUTH.error('Authentication failed for %s', username)
        return outcome


# Keep a handle on the library's session opener so the replacement installed
# below can still call it.  getattr/setattr are used for the protected name.
_open_session = getattr(netconf.base.NetconfSession, '_open_session')


def _open_session_and_log(self, is_server):
    """Open the session as usual, then log who was granted access.

    Installed in place of ``NetconfSession._open_session``.  The peer address
    is read from the session's packet stream; an address containing a colon
    is wrapped in square brackets before being joined with the port.
    """
    _open_session(self, is_server)

    peername = self.pkt_stream.stream.getpeername()
    ipaddr, port = peername[0], peername[1]
    if ':' in ipaddr:
        ipaddr = '[{}]'.format(ipaddr)

    LOG_AUTH.notice(
        'Authentication granted to %s from %s:%d, session %d',
        user_for_session(self), ipaddr, port, self.session_id
    )


setattr(netconf.base.NetconfSession, '_open_session', _open_session_and_log)


class Server:
    """The NETCONF server component."""

    def __init__(self, port=830):
        """Read the configuration, set up logging and start the SSH server.

        :param int port:
            The port number to listen on.  Typically 830.
        """
        self.config = ConfigManager()
        self.auth = AuthenticationController(
            users=self.config.get_list('server', 'users')
        )
        self.debug = self.config.get('server', 'debug')

        if self.debug:
            level = logging.DEBUG
        else:
            # A missing 'loglevel' key falls back to 'notice'.
            try:
                level_name = self.config.get('server', 'loglevel')
            except KeyError:
                level_name = 'notice'
            level = getattr(logging, level_name.upper())
        configure_logging(level)

        host_key = self.config.get('server', 'host_key')
        self.server = NetconfSSHServer(
            self.auth, self, port, host_key, self.debug
        )

        for module in self.config.get_list('server', 'modules'):
            MODMAN.load_module(module)

    def __getattr__(self, attr):
        """Resolve otherwise-undefined ``rpc_*`` names to the RPC wrapper.

        Python only calls this hook when normal attribute lookup fails, so
        the ``rpc_*`` methods defined explicitly on this class are unaffected.

        :raises AttributeError: For any name not starting with ``rpc_``.
        """
        if not attr.startswith("rpc_"):
            raise AttributeError(
                "'Server' object has no attribute '{}'".format(attr)
            )
        return self._rpc_wrapper

    def close(self):
        """Close the underlying NETCONF SSH server."""
        self.server.close()

    def nc_append_capabilities(self, capabilities):
        """Append every advertised capability to ``capabilities``.

        Capabilities reported by MODMAN are added first, followed by the ones
        this server provides itself.

        :param capabilities:
            The element that receives one ``capability`` child per URI.
        """
        builtin_capabs = (
            # Write directly to the `running` datastore.
            'urn:ietf:params:netconf:capability:writable-running:1.0',
            # Filter with XPath in addition to subtrees.
            'urn:ietf:params:netconf:capability:xpath:1.0',
        )

        for source in (MODMAN.capabilities(), builtin_capabs):
            for capab in source:
                util.subelm(capabilities, 'capability').text = capab

    def _rpc_wrapper(self, session, rpc, *params):
        """Dispatch an RPC to the module registered for it, with logging.

        The RPC name is the local name of the first child of ``rpc``.

        :returns: The result of the module's RPC handler.

        :raises error.OperationNotSupportedProtoError:
            When MODMAN has no handler for the RPC.

        :raises error.RPCServerError:
            Re-raised, after logging, when the handler raises it.
        """
        name = QName(rpc[0].tag).localname
        LOG_OPERATIONAL.info(
            'RPC %s invoked by %s on session %d',
            name, user_for_session(session), session.session_id
        )

        if not MODMAN.has_rpc(name):
            LOG_OPERATIONAL.warning(
                'RPC %s invoked by %s on session %d is unknown',
                name, user_for_session(session), session.session_id
            )
            raise error.OperationNotSupportedProtoError(rpc)

        try:
            result = MODMAN.rpc(name)(session, rpc, *params)
        except error.RPCServerError as rpc_error:
            LOG_OPERATIONAL.error(
                'RPC %s invoked by %s on session %d encountered error %s',
                name, user_for_session(session), session.session_id,
                rpc_error
            )
            raise
        else:
            LOG_OPERATIONAL.info(
                'RPC %s invoked by %s on session %d completed',
                name, user_for_session(session), session.session_id
            )
            return result

    def rpc_get(self, session, rpc, filter_or_none):  # pylint: disable=W0613
        """Handle <get>: collect operational data and apply the filter."""
        log_read(session, rpc)
        data = util.elm('nc:data')
        MODMAN.collect_operational(data)
        return util.filter_results(rpc, data, filter_or_none, self.debug)

    # pylint: disable=W0613
    def rpc_get_config(self, session, rpc, source, filter_or_none):
        """Handle <get-config>: collect running config and apply the filter.

        ``source`` is accepted but not inspected.
        """
        log_read(session, rpc)
        data = util.elm('nc:data')
        MODMAN.collect_running(data)
        return util.filter_results(rpc, data, filter_or_none, self.debug)

    # pylint: disable=R0913
    def rpc_edit_config(self, session, rpc, target, config, default_or_none):
        """Handle <edit-config> against the ``running`` datastore.

        :returns: An ``nc:ok`` element once MODMAN has processed the edit.

        :raises error.BadElementProtoError:
            When ``target`` does not contain exactly one element.

        :raises error.InvalidValueAppError:
            When the target datastore is anything other than ``running``.
        """
        # We must have exactly 1 target.
        if len(target) != 1:
            raise error.BadElementProtoError(rpc, target)

        log_write(session, rpc)

        # Strip the xmlns off the tag name.
        datastore = QName(target[0].tag).localname
        if datastore != "running":
            raise error.InvalidValueAppError(rpc)

        # With no explicit default operation, fall back to 'merge'.
        def_op = 'merge' if default_or_none is None else default_or_none.text

        MODMAN.collect_edit(session, rpc, config, def_op)
        return util.elm('nc:ok')


if __name__ == "__main__":
    s = Server()
    try:
        s.server.join()
    except KeyboardInterrupt:
        s.close()