"""
NETCONF for APK Distributions server:
ietf-ip module
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
from socket import AF_INET, AF_INET6
import ipaddress
import logging
from lxml.etree import QName # pylint: disable=E0611
from netconf import error, util
from ncserver.base.log import log_config_change
from ncserver.base.util import _, node_operation
from ncserver.util import get_nmsa
# Module-level metadata for the ietf-ip NETCONF module.  Each constant is
# followed by a bare string describing it.
LOGGER = logging.getLogger(__name__)
"""The object used for logging informational messages."""
M_ABI_VERSION = 1
"""The ABI version of this NETCONF module."""
M_PREFIX = "ip"
"""The XML tag prefix for this module's tags."""
M_NS = "urn:ietf:params:xml:ns:yang:ietf-ip"
"""The XML namespace for this module."""
M_NAME = "ietf-ip"
"""The YANG model name for this module."""
M_REVISION = "2018-02-22"
"""The YANG revision date for this module."""
# Keyed by "module@revision"; each entry carries that module's XML namespace
# ('ns') and tag prefix ('prefix').
M_IMPORTS = {
'ietf-inet-types@2013-07-15': {
'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types", 'prefix': "inet"
},
'ietf-yang-types@2013-07-15': {
'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang"
}
}
"""The imported YANG modules for this module."""
# One-element tuple: the trailing comma is what makes this a tuple.
M_FEATURES = ('ipv6-privacy-autoconf',)
"""The supported features declared in YANG for this module."""
# Used throughout this module to build '{namespace}tag' lookups for the
# interfaces container and each interface's name leaf.
IF_NS = "urn:ietf:params:xml:ns:yang:ietf-interfaces"
"""The namespace of the ietf-interfaces module."""
M_AUGMENTS = (IF_NS,)
"""The namespaces that this YANG module augments."""
def _get_ifaces(node):
    """Locate the ietf-interfaces ``interfaces`` element directly under node.

    :param node: The XML element to search.
    :returns: The ``interfaces`` child element, or ``None`` when it is
              absent (an error is logged in that case).
    """
    found = node.find('{%s}interfaces' % IF_NS)
    if found is not None:
        return found

    # Without the interfaces container there is nothing to augment.
    LOGGER.error(_("interfaces node not found: "
                   "This module requires ietf-interfaces to be loaded"))
    return None
def from_bool(value: bool) -> str:
    """Render a Python bool as the lowercase text used for XML booleans.

    The value is stringified and lower-cased, so ``True`` becomes ``"true"``
    and ``False`` becomes ``"false"``.  Any other object is stringified the
    same way (for example ``None`` becomes ``"none"``).

    :param value: The value to render.
    :returns: The lower-cased string form of ``value``.
    """
    text = str(value)
    return text.lower()
def _add_ipv4(iface, getter):
    """Attach an ``ip:ipv4`` container to an interface element.

    :param iface: The interface XML element; its ietf-interfaces ``name``
                  child supplies the interface name.
    :param getter: Callable taking ``(interface name, parameter name)`` and
                   returning the parameter's value.
    :returns: The newly created ``ip:ipv4`` element.
    """
    ifname = iface.find('{%s}name' % IF_NS).text
    container = util.subelm(iface, 'ip:ipv4')

    # The ip:enabled leaf is currently not emitted for IPv4.
    forwarding = from_bool(getter(ifname, 'ipv4_forwarding'))
    container.append(util.leaf_elm('ip:forwarding', forwarding))

    mtu = getter(ifname, 'ipv4_mtu')
    if mtu is not None:
        # NOTE(review): 28 is subtracted from the stored value before it is
        # reported; presumably this accounts for header overhead -- confirm.
        container.append(util.leaf_elm('ip:mtu', mtu - 28))

    return container
def _add_ipv6(iface, getter):
    """Attach an ``ip:ipv6`` container to an interface element.

    Every leaf other than ``ip:enabled`` is optional: a leaf is only emitted
    when its parameter has a value.  This also applies to the leaves inside
    ``ip:autoconf``, so an unset parameter is never rendered as the text
    "None" or "none".

    :param iface: The interface XML element; its ietf-interfaces ``name``
                  child supplies the interface name.
    :param getter: Callable taking ``(interface name, parameter name)`` and
                   returning the parameter's value, or ``None`` when unset.
    :returns: The newly created ``ip:ipv6`` element, or ``None`` when the
              ``ipv6_enabled`` parameter is unset.
    """
    name = iface.find('{'+IF_NS+'}name').text

    enabled = getter(name, 'ipv6_enabled')
    if enabled is None:
        return None  # Unset means we don't have IPv6 config at all.

    def add_leaf(parent, tag, value, convert=None):
        """Append a leaf to parent unless the value is unset."""
        if value is None:
            return
        if convert is not None:
            value = convert(value)
        parent.append(util.leaf_elm(tag, value))

    ipv6 = util.subelm(iface, 'ip:ipv6')
    add_leaf(ipv6, 'ip:enabled', enabled, from_bool)
    add_leaf(ipv6, 'ip:forwarding', getter(name, 'ipv6_forwarding'),
             from_bool)
    add_leaf(ipv6, 'ip:mtu', getter(name, 'ipv6_mtu'))
    add_leaf(ipv6, 'ip:dup-addr-detect-transmits',
             getter(name, 'ipv6_dad_xmit'))

    global_addr = getter(name, 'ipv6_slaac_globaladdr')
    temp_addr = getter(name, 'ipv6_slaac_tempaddr')
    # The autoconf container is only reported when at least one of the two
    # address-creation flags is truthy.
    # NOTE(review): flags explicitly set to False on both sides are therefore
    # not reported; confirm whether that is intended.
    if global_addr or temp_addr:
        autoconf = util.subelm(ipv6, 'ip:autoconf')
        add_leaf(autoconf, 'ip:create-global-addresses', global_addr,
                 from_bool)
        add_leaf(autoconf, 'ip:create-temporary-addresses', temp_addr,
                 from_bool)
        add_leaf(autoconf, 'ip:temporary-valid-lifetime',
                 getter(name, 'ipv6_slaac_validlft'))
        add_leaf(autoconf, 'ip:temporary-preferred-lifetime',
                 getter(name, 'ipv6_slaac_preflft'))

    return ipv6
def running(node):
    """Retrieve the IP configuration for this device.

    Each interface element found under the ietf-interfaces ``interfaces``
    container of ``node`` is augmented in place with an ``ip:ipv4``
    container and, when IPv6 configuration exists, an ``ip:ipv6`` container.
    Both include the configured addresses of the matching family.

    :param node: The XML element containing the ``interfaces`` container.
    :returns: ``None``; the tree is modified in place.  Nothing is done when
              either the interfaces container or the NMSA module is missing.
    """
    ifaces = _get_ifaces(node)
    nmsa = get_nmsa()
    if ifaces is None or nmsa is None:
        # We can't retrieve configuration unless both the ietf-interfaces and
        # the NMSA module is loaded.
        return

    for iface in ifaces.iterchildren():
        name = iface.find('{'+IF_NS+'}name').text
        # Fetch the configured addresses once; both families read from it.
        addresses = tuple(nmsa.list_addresses(name))

        # IPv4
        ipv4 = _add_ipv4(iface, nmsa.get_param)
        for address in addresses:
            # Only IPv4 addresses count.
            if '.' not in address:
                continue
            ipaddr, subnet = address.split('/')
            addr = util.subelm(ipv4, 'ip:address')
            addr.append(util.leaf_elm('ip:ip', ipaddr))
            addr.append(util.leaf_elm('ip:prefix-length', subnet))
        # No neighbor support.

        # IPv6
        ipv6 = _add_ipv6(iface, nmsa.get_param)
        # Compare against None explicitly: truth-testing an XML element
        # depends on whether it has children, not on whether it exists.
        if ipv6 is None:
            continue
        for address in addresses:
            # Only IPv6 addresses count.
            if ':' not in address:
                continue
            ipaddr, length = address.split('/')
            addr = util.subelm(ipv6, 'ip:address')
            addr.append(util.leaf_elm('ip:ip', ipaddr))
            addr.append(util.leaf_elm('ip:prefix-length', length))
        # No neighbor support.
def _add_live_addresses(parent, addrs, family, configured, random_net=None):
    """Append an ``ip:address`` entry to parent for each address of family.

    :param parent: The ``ip:ipv4`` or ``ip:ipv6`` element to append to.
    :param addrs: Iterable of live interface address objects.
    :param family: The ``ipaddress`` interface class to select.
    :param configured: Container of configured ``"address/prefix"`` strings;
                       a live address found in it has origin "static".
    :param random_net: Optional network; a non-static address inside it has
                       origin "random".  Anything else has origin "other".
    """
    for addr in addrs:
        if not isinstance(addr, family):
            continue
        ip_text = str(addr.ip)
        prefixlen = addr.network.prefixlen

        addr_node = util.subelm(parent, 'ip:address')
        addr_node.append(util.leaf_elm('ip:ip', ip_text))
        addr_node.append(util.leaf_elm('ip:prefix-length', prefixlen))

        if "{a}/{p}".format(a=ip_text, p=prefixlen) in configured:
            origin = "static"
        elif random_net is not None and addr in random_net:
            origin = "random"
        else:
            origin = "other"
        addr_node.append(util.leaf_elm('ip:origin', origin))


def operational(node):
    """Retrieve the IP state for this device.

    Each interface element found under the ietf-interfaces ``interfaces``
    container of ``node`` is augmented in place with an ``ip:ipv4``
    container and, when IPv6 state exists, an ``ip:ipv6`` container.  Both
    include the live addresses of the matching family and their origin.

    :param node: The XML element containing the ``interfaces`` container.
    :returns: ``None``; the tree is modified in place.  Nothing is done when
              either the interfaces container or the NMSA module is missing.
    """
    ifaces = _get_ifaces(node)
    nmsa = get_nmsa()
    if ifaces is None or nmsa is None:
        # We can't retrieve state unless both the ietf-interfaces and
        # the NMSA module is loaded.
        return

    # IPv4 addresses inside this network are reported with origin "random".
    apipa_net = ipaddress.IPv4Network("169.254.0.0/16")

    for iface in ifaces.iterchildren():
        name = iface.find('{'+IF_NS+'}name').text
        addrs = nmsa.live_addresses(name)
        # Looked up once per interface instead of once per live address.
        configured = nmsa.list_addresses(name)

        # IPv4
        ipv4 = _add_ipv4(iface, nmsa.curr_param)
        _add_live_addresses(ipv4, addrs, ipaddress.IPv4Interface,
                            configured, apipa_net)

        # IPv6
        ipv6 = _add_ipv6(iface, nmsa.curr_param)
        if ipv6 is None:
            # No IPv6 container was created, so there is nothing to attach
            # addresses to.
            continue
        _add_live_addresses(ipv6, addrs, ipaddress.IPv6Interface, configured)
def _edit_param(iface: str, param: str, operation: str, rpc, node):
    """Apply one edit operation to an NMSA-controlled parameter.

    :param iface: Name of the interface being edited.
    :param param: Name of the NMSA parameter.
    :param operation: The edit operation to apply to the parameter.
    :param rpc: The RPC being served; passed to any error raised.
    :param node: The XML element whose text is the new value.
    :raises error.DataExistsAppError: 'create' on a parameter already set.
    :raises error.DataMissingAppError: 'delete' on a parameter not set.
    :raises error.OperationNotSupportedAppError: any operation other than
        create, merge, replace, delete or remove.
    """
    nmsa = get_nmsa()
    already_set = nmsa.get_param(iface, param) is not None

    if operation == 'create' and already_set:
        raise error.DataExistsAppError(rpc)
    if operation == 'delete' and not already_set:
        raise error.DataMissingAppError(rpc)

    if operation in ('delete', 'remove'):
        nmsa.unset_param(iface, param)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    text = node.text
    # The texts 'true' and 'false' are stored as Python bools; anything else
    # is stored as the raw text.
    new_value = (text == 'true') if text in ('true', 'false') else text
    nmsa.set_param(iface, param, new_value)
def _clear_ipv4(iface: str):
    """Drop the IPv4 configuration of an interface.

    ``ipv4_enabled`` is set to False; the forwarding and MTU parameters are
    unset.

    :param iface: Name of the interface to clear.
    """
    nmsa = get_nmsa()
    nmsa.set_param(iface, 'ipv4_enabled', False)
    for leftover in ('ipv4_forwarding', 'ipv4_mtu'):
        nmsa.unset_param(iface, leftover)
def _edit_address(session, rpc, node, operation, iface: str, _type):
    """Edit an IP address on an interface.

    :param session: The session making the change; used for change logging.
    :param rpc: The RPC being served; passed to any error raised.
    :param node: The ``address`` XML element.  It must contain an ``ip``
                 child and, unless deleting or removing, a ``prefix-length``
                 child.
    :param operation: The edit operation to apply to the address.
    :param iface: Name of the interface being edited.
    :param _type: Address family constant handed to ``add_address``.
    :raises error.MissingElementAppError: a required child is absent.
    :raises error.DataMissingAppError: 'delete' of an address that NMSA
        reports as missing.
    :raises error.OperationNotSupportedAppError: unsupported operation.
    :raises error.InvalidValueAppError: NMSA rejected the address.
    :raises error.DataExistsAppError: 'create' of an address that NMSA
        refuses to add again.
    """
    addr_node = node.find('{'+M_NS+'}ip')
    if addr_node is None:
        raise error.MissingElementAppError(rpc, node)
    address = addr_node.text

    nmsa = get_nmsa()

    if operation in ('delete', 'remove'):
        try:
            nmsa.remove_address(iface, address)
        except KeyError as key_e:
            # Only 'delete' treats a missing address as an error.
            if operation == 'delete':
                raise error.DataMissingAppError(rpc) from key_e
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "%s IP %s" % (operation, address))
        return

    pref_node = node.find('{'+M_NS+'}prefix-length')
    if pref_node is None:
        raise error.MissingElementAppError(rpc, node)
    prefix = pref_node.text

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    try:
        nmsa.add_address(iface, _type, address, prefix)
    except ValueError as val_e:
        raise error.InvalidValueAppError(rpc, info="invalid IP") from val_e
    except RuntimeError as run_e:
        if operation == 'create':
            raise error.DataExistsAppError(rpc) from run_e
        # merge/replace when the add was refused: update the prefix instead.
        nmsa.modify_prefix(iface, address, prefix)

    # Reached after a successful add and after a prefix modification, so
    # both kinds of change are recorded.
    log_config_change(session, "[ietf-ip %s]" % iface,
                      "%s IP %s/%s" % (operation, address, prefix))
def _edit_ipv4(session, rpc, node, def_op, iface: str):
    """Edit IPv4 configuration for a given interface.

    :param session: The session making the change; used for change logging.
    :param rpc: The RPC being served; passed to any error raised.
    :param node: The ``ipv4`` XML element.
    :param def_op: Operation used when ``node`` does not specify its own.
    :param iface: Name of the interface being edited.
    :raises error.OperationNotSupportedAppError: unsupported operation.
    :raises error.UnknownElementAppError: unrecognised child element.
    :raises NotImplementedError: a ``neighbor`` child was supplied.
    """
    _params = {'enabled': 'ipv4_enabled', 'forwarding': 'ipv4_forwarding',
               'mtu': 'ipv4_mtu'}

    operation = node_operation(node, def_op)
    if operation in ('delete', 'remove'):
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "removing IPv4 configuration")
        _clear_ipv4(iface)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    for xparam in node:
        # Resolve each child's operation against the container's operation.
        # It is kept in its own variable so that one child's override does
        # not become the default for the children that follow it.
        p_op = node_operation(xparam, operation)
        localname = QName(xparam.tag).localname
        if localname in _params:
            param = _params[localname]
            log_config_change(session, "[ietf-ip %s]" % iface,
                              "IPv4 %s: -> %s" % (param, xparam.text))
            _edit_param(iface, param, p_op, rpc, xparam)
        elif localname == 'address':
            _edit_address(session, rpc, xparam, p_op, iface, AF_INET)
        elif localname == 'neighbor':
            # Neighbor entries are not implemented.
            raise NotImplementedError
        else:
            raise error.UnknownElementAppError(rpc, xparam)
def _clear_ipv6(iface: str):
    """Drop the IPv6 configuration of an interface.

    ``ipv6_enabled`` is set to False; every other IPv6 parameter, including
    the SLAAC ones, is unset.

    :param iface: Name of the interface to clear.
    """
    general = ('ipv6_forwarding', 'ipv6_mtu', 'ipv6_dad_xmit')
    slaac = ('ipv6_slaac_globaladdr', 'ipv6_slaac_tempaddr',
             'ipv6_slaac_validlft', 'ipv6_slaac_preflft')

    nmsa = get_nmsa()
    nmsa.set_param(iface, 'ipv6_enabled', False)
    for leftover in general + slaac:
        nmsa.unset_param(iface, leftover)
def _clear_slaac(iface: str):
    """Unset every SLAAC parameter of an interface.

    :param iface: Name of the interface to clear.
    """
    slaac_params = [
        'ipv6_slaac_globaladdr',
        'ipv6_slaac_tempaddr',
        'ipv6_slaac_validlft',
        'ipv6_slaac_preflft',
    ]
    nmsa = get_nmsa()
    for key in slaac_params:
        nmsa.unset_param(iface, key)
def _edit_slaac(session, rpc, node, def_op, iface: str):
    """Edit SLAAC configuration for a given interface.

    :param session: The session making the change; used for change logging.
    :param rpc: The RPC being served; passed to any error raised.
    :param node: The XML element whose children are the SLAAC leaves.
    :param def_op: Operation used when ``node`` does not specify its own.
    :param iface: Name of the interface being edited.
    :raises error.OperationNotSupportedAppError: unsupported operation.
    :raises error.UnknownElementAppError: unrecognised child element.
    """
    _params = {'create-global-addresses': 'ipv6_slaac_globaladdr',
               'create-temporary-addresses': 'ipv6_slaac_tempaddr',
               'temporary-valid-lifetime': 'ipv6_slaac_validlft',
               'temporary-preferred-lifetime': 'ipv6_slaac_preflft'}

    operation = node_operation(node, def_op)
    if operation in ('delete', 'remove'):
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "removing IPv6 SLAAC configuration")
        _clear_slaac(iface)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    for param in node:
        p_name = QName(param.tag).localname
        if p_name not in _params:
            raise error.UnknownElementAppError(rpc, param)
        p_op = node_operation(param, operation)
        # Record the change the same way the other IPv6 leaves are recorded.
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "IPv6 %s: -> %s" % (_params[p_name], param.text))
        _edit_param(iface, _params[p_name], p_op, rpc, param)
def _edit_ipv6(session, rpc, node, def_op, iface: str):
    """Edit IPv6 configuration for a given interface.

    :param session: The session making the change; used for change logging.
    :param rpc: The RPC being served; passed to any error raised.
    :param node: The ``ipv6`` XML element.
    :param def_op: Operation used when ``node`` does not specify its own.
    :param iface: Name of the interface being edited.
    :raises error.OperationNotSupportedAppError: unsupported operation.
    :raises error.UnknownElementAppError: unrecognised child element.
    :raises NotImplementedError: a ``neighbor`` child was supplied.
    """
    _params = {'enabled': 'ipv6_enabled', 'forwarding': 'ipv6_forwarding',
               'mtu': 'ipv6_mtu', 'dup-addr-detect-transmits': 'ipv6_dad_xmit'}

    operation = node_operation(node, def_op)
    if operation in ('delete', 'remove'):
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "removing IPv6 configuration")
        _clear_ipv6(iface)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    for xparam in node:
        # Each child's operation is resolved against the container's.
        p_op = node_operation(xparam, operation)
        localname = QName(xparam.tag).localname
        if localname in _params:
            param = _params[localname]
            log_config_change(session, "[ietf-ip %s]" % iface,
                              "IPv6 %s: -> %s" % (param, xparam.text))
            _edit_param(iface, param, p_op, rpc, xparam)
        elif localname == 'address':
            # Pass the child's own operation so that an operation given on
            # the address element itself is honoured.
            _edit_address(session, rpc, xparam, p_op, iface, AF_INET6)
        elif localname == 'neighbor':
            # Neighbor entries are not implemented.
            raise NotImplementedError
        elif localname == 'autoconf':
            # Configure SLAAC parameters.  Hand over the autoconf element
            # itself, with this container's operation as its default;
            # _edit_slaac resolves the element's own operation from there.
            _edit_slaac(session, rpc, xparam, operation, iface)
        else:
            raise error.UnknownElementAppError(rpc, xparam)
def edit(session, rpc, node, def_op):
    """Apply an IP configuration edit to this device.

    Walks the ``interface`` children of ``node`` and hands every child of
    each interface that lives in this module's namespace to the IPv4 or IPv6
    editor.

    :param session: The session making the change.
    :param rpc: The RPC being served; passed to any error raised.
    :param node: The XML element whose children are interface elements.
    :param def_op: Operation used when an interface does not specify one.
    :raises error.OperationNotSupportedAppError: the NMSA module is missing.
    :raises error.MissingElementAppError: an interface has no name.
    :raises error.UnknownElementAppError: an element in this module's
        namespace is neither ``ipv4`` nor ``ipv6``.
    """
    handlers = {'ipv4': _edit_ipv4, 'ipv6': _edit_ipv6}

    if get_nmsa() is None:
        raise error.OperationNotSupportedAppError(rpc)

    for interface in node:
        if_tag = QName(interface.tag)
        if not (if_tag.localname == 'interface'
                and if_tag.namespace == IF_NS):
            continue  # Ignore unknown tags given to us.

        name_node = interface.find('{%s}name' % IF_NS)
        if name_node is None:
            raise error.MissingElementAppError(rpc, interface)
        iface = name_node.text

        operation = node_operation(interface, def_op)
        if operation in ('delete', 'remove'):
            # ietf-interfaces already removed the configuration for us.
            continue

        for conf in interface:
            conf_tag = QName(conf.tag)
            if conf_tag.namespace != M_NS:
                # Elements from other modules are not ours to handle.
                continue
            handler = handlers.get(conf_tag.localname)
            if handler is None:
                raise error.UnknownElementAppError(rpc, conf)
            handler(session, rpc, conf, operation, iface)