""" NETCONF for APK Distributions server: ietf-ip module Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ import ipaddress import logging from lxml.etree import QName # pylint: disable=E0611 from netconf import error, util from ncserver.base.log import log_config_change from ncserver.base.util import _, node_operation from ncserver.util import get_nmsa LOGGER = logging.getLogger(__name__) """The object used for logging informational messages.""" M_ABI_VERSION = 1 """The ABI version of this NETCONF module.""" M_PREFIX = "ip" """The XML tag prefix for this module's tags.""" M_NS = "urn:ietf:params:xml:ns:yang:ietf-ip" """The XML namespace for this module.""" M_NAME = "ietf-ip" """The YANG model name for this module.""" M_REVISION = "2018-02-22" """The YANG revision date for this module.""" M_IMPORTS = { 'ietf-inet-types@2013-07-15': { 'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types", 'prefix': "inet" }, 'ietf-yang-types@2013-07-15': { 'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang" } } """The imported YANG modules for this module.""" M_FEATURES = ('ipv6-privacy-autoconf',) """The supported features declared in YANG for this module.""" IF_NS = "urn:ietf:params:xml:ns:yang:ietf-interfaces" """The namespace of the ietf-interfaces module.""" M_AUGMENTS = (IF_NS,) """The namespaces that this YANG module augments.""" def _get_ifaces(node): """Retrieve the /if:interfaces node.""" ifaces = node.find('{'+IF_NS+'}interfaces') if ifaces is None: LOGGER.error(_("interfaces node not found: " "This module requires ietf-interfaces to be loaded")) return ifaces def from_bool(value: bool) -> str: """Turn a Python bool into an XML bool.""" return str(value).lower() def _add_ipv4(iface, getter): """Add IPv4 configuration nodes.""" name = iface.find('{'+IF_NS+'}name').text ipv4 = util.subelm(iface, 'ip:ipv4') #ipv4.append(util.leaf_elm('ip:enabled', # from_bool(getter(name, 'ipv4_enabled')))) ipv4.append(util.leaf_elm('ip:forwarding', from_bool(getter(name, 'ipv4_forwarding')))) v4mtu = getter(name, 'ipv4_mtu') if v4mtu is not None: ipv4.append(util.leaf_elm('ip:mtu', v4mtu - 28)) return ipv4 def _add_ipv6(iface, getter): """Add IPv6 configuration nodes.""" name = iface.find('{'+IF_NS+'}name').text if getter(name, 'ipv6_enabled') is None: return None # Unset means we don't have IPv6 config at all. ipv6 = util.subelm(iface, 'ip:ipv6') ipv6.append(util.leaf_elm('ip:enabled', from_bool(getter(name, 'ipv6_enabled')))) forwarding = getter(name, 'ipv6_forwarding') if forwarding is not None: ipv6.append(util.leaf_elm('ip:forwarding', from_bool(forwarding))) v6mtu = getter(name, 'ipv6_mtu') if v6mtu is not None: ipv6.append(util.leaf_elm('ip:mtu', v6mtu)) dad_xmit = getter(name, 'ipv6_dad_xmit') if dad_xmit is not None: ipv6.append(util.leaf_elm('ip:dup-addr-detect-transmits', dad_xmit)) if any((getter(name, 'ipv6_slaac_globaladdr'), getter(name, 'ipv6_slaac_tempaddr'))): autoconf = util.subelm(ipv6, 'ip:autoconf') autoconf.append(util.leaf_elm('ip:create-global-addresses', from_bool( getter(name, 'ipv6_slaac_globaladdr') ))) autoconf.append(util.leaf_elm('ip:create-temporary-addresses', from_bool( getter(name, 'ipv6_slaac_tempaddr') ))) autoconf.append(util.leaf_elm('ip:temporary-valid-lifetime', getter(name, 'ipv6_slaac_validlft'))) autoconf.append(util.leaf_elm('ip:temporary-preferred-lifetime', getter(name, 'ipv6_slaac_preflft'))) return ipv6 def running(node): """Retrieve the IP configuration for this device.""" ifaces = _get_ifaces(node) nmsa = get_nmsa() if None in (ifaces, nmsa): # We can't retrieve configuration unless both the ietf-interfaces and # the NMSA module is loaded. return for iface in ifaces.iterchildren(): name = iface.find('{'+IF_NS+'}name').text # IPv4 ipv4 = _add_ipv4(iface, nmsa.get_param) for address in nmsa.list_addresses(name): # Only IPv4 addresses count. if '.' not in address: continue ipaddr, subnet = address.split('/') addr = util.subelm(ipv4, 'ip:address') addr.append(util.leaf_elm('ip:ip', ipaddr)) addr.append(util.leaf_elm('ip:prefix-length', subnet)) # No neighbor support. # IPv6 ipv6 = _add_ipv6(iface, nmsa.get_param) if not ipv6: continue for address in nmsa.list_addresses(name): # Only IPv6 addesses count. if ':' not in address: continue ipaddr, length = address.split('/') addr = util.subelm(ipv6, 'ip:address') addr.append(util.leaf_elm('ip:ip', ipaddr)) addr.append(util.leaf_elm('ip:prefix-length', length)) # No neighbor support. def operational(node): """Retrieve the IP state for this device.""" ifaces = _get_ifaces(node) nmsa = get_nmsa() if None in (ifaces, nmsa): # We can't retrieve configuration unless both the ietf-interfaces and # the NMSA module is loaded. return for iface in ifaces.iterchildren(): name = iface.find('{'+IF_NS+'}name').text addrs = nmsa.live_addresses(name) # IPv4 ipv4 = _add_ipv4(iface, nmsa.curr_param) apipa_net = ipaddress.IPv4Network("169.254.0.0/16") for addr in addrs: if not isinstance(addr, ipaddress.IPv4Interface): continue addr_node = util.subelm(ipv4, 'ip:address') addr_node.append(util.leaf_elm('ip:ip', str(addr.ip))) addr_node.append(util.leaf_elm('ip:prefix-length', addr.network.prefixlen)) origin = "other" conf = "{a}/{p}".format(a=str(addr.ip), p=addr.network.prefixlen) if conf in nmsa.list_addresses(name): origin = "static" elif addr in apipa_net: origin = "random" addr_node.append(util.leaf_elm('ip:origin', origin)) # IPv6 ipv6 = _add_ipv6(iface, nmsa.curr_param) for addr in addrs: if not isinstance(addr, ipaddress.IPv6Interface): continue addr_node = util.subelm(ipv6, 'ip:address') addr_node.append(util.leaf_elm('ip:ip', str(addr.ip))) addr_node.append(util.leaf_elm('ip:prefix-length', addr.network.prefixlen)) origin = "other" conf = "{a}/{p}".format(a=str(addr.ip), p=addr.network.prefixlen) if conf in nmsa.list_addresses(name): origin = "static" addr_node.append(util.leaf_elm('ip:origin', origin)) def _edit_param(iface: str, param: str, operation: str, rpc, node): """Edit an NMSA-controlled parameter.""" nmsa = get_nmsa() value = nmsa.get_param(iface, param) if operation == 'create' and value is not None: raise error.DataExistsAppError(rpc) if operation == 'delete' and value is None: raise error.DataMissingAppError(rpc) if operation in ('delete', 'remove'): nmsa.unset_param(iface, param) return if operation not in ('create', 'merge', 'replace'): raise error.OperationNotSupportedAppError(rpc) if node.text in ('true', 'false'): nmsa.set_param(iface, param, node.text == 'true') else: nmsa.set_param(iface, param, node.text) def _clear_ipv4(iface: str): """Remove all IPv4 configuration from a given interface.""" nmsa = get_nmsa() nmsa.set_param(iface, 'ipv4_enabled', False) nmsa.unset_param(iface, 'ipv4_forwarding') nmsa.unset_param(iface, 'ipv4_mtu') def _edit_ipv4(session, rpc, node, def_op, iface: str): """Edit IPv4 configuration for a given interface.""" _params = {'enabled': 'ipv4_enabled', 'forwarding': 'ipv4_forwarding', 'mtu': 'ipv4_mtu'} operation = node_operation(node, def_op) if operation in ('delete', 'remove'): log_config_change(session, "[ietf-ip %s]" % iface, "removing IPv4 configuration") _clear_ipv4(iface) return if operation not in ('create', 'merge', 'replace'): raise error.OperationNotSupportedAppError(rpc) for xparam in node: operation = node_operation(xparam, operation) qparam = QName(xparam.tag) if qparam.localname in _params.keys(): param = _params[qparam.localname] log_config_change(session, "[ietf-ip %s]" % iface, "IPv4 %s: -> %s" % (param, node.text)) _edit_param(iface, param, operation, rpc, node) elif qparam.localname == 'address': # Oh no. raise NotImplementedError elif qparam.localname == 'neighbor': # Oh *no*! raise NotImplementedError else: raise error.UnknownElementAppError(rpc, xparam) def _clear_ipv6(iface: str): """Remove all IPv6 configuration from a given interface.""" nmsa = get_nmsa() nmsa.set_param(iface, 'ipv6_enabled', False) for param in ('ipv6_forwarding', 'ipv6_mtu', 'ipv6_dad_xmit', 'ipv6_slaac_globaladdr', 'ipv6_slaac_tempaddr', 'ipv6_slaac_validlft', 'ipv6_slaac_preflft'): nmsa.unset_param(iface, param) def _clear_slaac(iface: str): """Remove all SLAAC configuration from a given interface.""" nmsa = get_nmsa() for param in ('ipv6_slaac_globaladdr', 'ipv6_slaac_tempaddr', 'ipv6_slaac_validlft', 'ipv6_slaac_preflft'): nmsa.unset_param(iface, param) def _edit_slaac(session, rpc, node, def_op, iface: str): """Edit SLAAC configuration for a given interface.""" _params = {'create-global-addresses': 'ipv6_slaac_globaladdr', 'create-temporary-addresses': 'ipv6_slaac_tempaddr', 'temporary-valid-lifetime': 'ipv6_slaac_validlft', 'temporary-preferred-lifetime': 'ipv6_slaac_preflft'} operation = node_operation(node, def_op) if operation in ('delete', 'remove'): log_config_change(session, "[ietf-ip %s]" % iface, "removing IPv6 SLAAC configuration") _clear_slaac(iface) return if operation not in ('create', 'merge', 'replace'): raise error.OperationNotSupportedAppError(rpc) for param in node: p_name = QName(param.tag).localname if p_name not in _params.keys(): raise error.UnknownElementAppError(rpc, param) p_op = node_operation(param, operation) _edit_param(iface, _params[p_name], p_op, rpc, param) def _edit_ipv6(session, rpc, node, def_op, iface: str): """Edit IPv6 configuration for a given interface.""" _params = {'enabled': 'ipv6_enabled', 'forwarding': 'ipv6_forwarding', 'mtu': 'ipv6_mtu', 'dup-addr-detect-transmits': 'ipv6_dad_xmit'} operation = node_operation(node, def_op) if operation in ('delete', 'remove'): log_config_change(session, "[ietf-ip %s]" % iface, "removing IPv6 configuration") _clear_ipv6(iface) return if operation not in ('create', 'merge', 'replace'): raise error.OperationNotSupportedAppError(rpc) for xparam in node: p_op = node_operation(xparam, operation) qparam = QName(xparam.tag) if qparam.localname in _params.keys(): param = _params[qparam.localname] log_config_change(session, "[ietf-ip %s]" % iface, "IPv6 %s: -> %s" % (param, node.text)) _edit_param(iface, param, p_op, rpc, node) elif qparam.localname == 'address': # Oh no. raise NotImplementedError elif qparam.localname == 'neighbor': # Oh *no*! raise NotImplementedError elif qparam.localname == 'autoconf': # Configure SLAAC parameters. _edit_slaac(session, rpc, node, def_op, iface) else: raise error.UnknownElementAppError(rpc, xparam) def edit(session, rpc, node, def_op): """Edit the IP configuration for this device.""" methods = {'ipv4': _edit_ipv4, 'ipv6': _edit_ipv6} nmsa = get_nmsa() if nmsa is None: raise error.OperationNotSupportedAppError(rpc) for interface in node: if QName(interface.tag).localname != 'interface' or\ QName(interface.tag).namespace != IF_NS: continue # Ignore unknown tags given to us. name_node = interface.find('{'+IF_NS+'}name') if name_node is None: raise error.MissingElementAppError(rpc, interface) iface = name_node.text operation = node_operation(interface, def_op) if operation in ('delete', 'remove'): # ietf-interfaces already removed the configuration for us. continue for conf in interface: if QName(conf.tag).namespace == M_NS: name = QName(conf.tag).localname if name in methods: methods[name](session, rpc, conf, operation, iface) else: raise error.UnknownElementAppError(rpc, conf) else: continue