summaryrefslogblamecommitdiff
path: root/ncserver/module/ip.py
blob: 024ce4f36308eac276a6a7ac6a0a7029dd051e42 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12











                                                                            

                                    
                

              
                                                     
                               
 

                                                
                                  

 
























                                                         

                                                                             





























                                                                             








                                              

                                                                        



                                                                          
                                                        






                                             


                                                                    


                                                                       




                                                                          


                                                   




                                                                            





















                                                                               
                     
























                                                                             


                    















                                                                  
                     






                                                                             


                                                 
              
                                                
                                                           












                                                                             

                                   
                                                                
              














                                                                             

 































                                                                   








































                                                                           





















                                                                          

                                                                      
                                           
                                                                          










































































                                                                               

                                                                      
                                           
                                                                           









                                                           

                                                    





























                                                                       
"""
NETCONF for APK Distributions server:
    ietf-ip module

Copyright © 2020 Adélie Software in the Public Benefit, Inc.

Released under the terms of the NCSA license.  See the LICENSE file included
with this source distribution for more information.

SPDX-License-Identifier: NCSA
"""

from socket import AF_INET, AF_INET6

import ipaddress
import logging

from lxml.etree import QName  # pylint: disable=E0611
from netconf import error, util

from ncserver.base.log import log_config_change
from ncserver.base.util import _, node_operation
from ncserver.util import get_nmsa


LOGGER = logging.getLogger(__name__)
"""The object used for logging informational messages."""


M_ABI_VERSION = 1
"""The ABI version of this NETCONF module."""


M_PREFIX = "ip"
"""The XML tag prefix for this module's tags."""


M_NS = "urn:ietf:params:xml:ns:yang:ietf-ip"
"""The XML namespace for this module."""


M_NAME = "ietf-ip"
"""The YANG model name for this module."""


M_REVISION = "2018-02-22"
"""The YANG revision date for this module."""


M_IMPORTS = {
    'ietf-inet-types@2013-07-15': {
        'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types", 'prefix': "inet"
    },
    'ietf-yang-types@2013-07-15': {
        'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang"
    }
}
"""The imported YANG modules for this module."""


M_FEATURES = ('ipv6-privacy-autoconf',)
"""The supported features declared in YANG for this module."""


IF_NS = "urn:ietf:params:xml:ns:yang:ietf-interfaces"
"""The namespace of the ietf-interfaces module."""


M_AUGMENTS = (IF_NS,)
"""The namespaces that this YANG module augments."""


def _get_ifaces(node):
    """Retrieve the /if:interfaces node."""
    ifaces = node.find('{'+IF_NS+'}interfaces')
    if ifaces is None:
        LOGGER.error(_("interfaces node not found: "
                       "This module requires ietf-interfaces to be loaded"))

    return ifaces


def from_bool(value: bool) -> str:
    """Turn a Python bool into an XML bool."""
    return str(value).lower()


def _add_ipv4(iface, getter):
    """Add IPv4 configuration nodes."""
    name = iface.find('{'+IF_NS+'}name').text
    ipv4 = util.subelm(iface, 'ip:ipv4')
    #ipv4.append(util.leaf_elm('ip:enabled',
    #                          from_bool(getter(name, 'ipv4_enabled'))))
    ipv4.append(util.leaf_elm('ip:forwarding',
                              from_bool(getter(name, 'ipv4_forwarding'))))
    v4mtu = getter(name, 'ipv4_mtu')
    if v4mtu is not None:
        ipv4.append(util.leaf_elm('ip:mtu', v4mtu - 28))

    return ipv4


def _add_ipv6(iface, getter):
    """Add IPv6 configuration nodes."""
    name = iface.find('{'+IF_NS+'}name').text
    if getter(name, 'ipv6_enabled') is None:
        return None  # Unset means we don't have IPv6 config at all.

    ipv6 = util.subelm(iface, 'ip:ipv6')
    ipv6.append(util.leaf_elm('ip:enabled',
                              from_bool(getter(name, 'ipv6_enabled'))))

    forwarding = getter(name, 'ipv6_forwarding')
    if forwarding is not None:
        ipv6.append(util.leaf_elm('ip:forwarding', from_bool(forwarding)))

    v6mtu = getter(name, 'ipv6_mtu')
    if v6mtu is not None:
        ipv6.append(util.leaf_elm('ip:mtu', v6mtu))

    dad_xmit = getter(name, 'ipv6_dad_xmit')
    if dad_xmit is not None:
        ipv6.append(util.leaf_elm('ip:dup-addr-detect-transmits', dad_xmit))

    if any((getter(name, 'ipv6_slaac_globaladdr'),
            getter(name, 'ipv6_slaac_tempaddr'))):
        autoconf = util.subelm(ipv6, 'ip:autoconf')
        autoconf.append(util.leaf_elm('ip:create-global-addresses',
                                      from_bool(
                                          getter(name, 'ipv6_slaac_globaladdr')
                                      )))
        autoconf.append(util.leaf_elm('ip:create-temporary-addresses',
                                      from_bool(
                                          getter(name, 'ipv6_slaac_tempaddr')
                                      )))
        autoconf.append(util.leaf_elm('ip:temporary-valid-lifetime',
                                      getter(name, 'ipv6_slaac_validlft')))
        autoconf.append(util.leaf_elm('ip:temporary-preferred-lifetime',
                                      getter(name, 'ipv6_slaac_preflft')))

    return ipv6


def running(node):
    """Retrieve the IP configuration for this device."""
    ifaces = _get_ifaces(node)
    nmsa = get_nmsa()

    if None in (ifaces, nmsa):
        # We can't retrieve configuration unless both the ietf-interfaces and
        # the NMSA module is loaded.
        return

    for iface in ifaces.iterchildren():
        name = iface.find('{'+IF_NS+'}name').text

        # IPv4
        ipv4 = _add_ipv4(iface, nmsa.get_param)
        for address in nmsa.list_addresses(name):
            # Only IPv4 addresses count.
            if '.' not in address:
                continue

            ipaddr, subnet = address.split('/')

            addr = util.subelm(ipv4, 'ip:address')
            addr.append(util.leaf_elm('ip:ip', ipaddr))
            addr.append(util.leaf_elm('ip:prefix-length', subnet))
        # No neighbor support.

        # IPv6
        ipv6 = _add_ipv6(iface, nmsa.get_param)
        if not ipv6:
            continue

        for address in nmsa.list_addresses(name):
            # Only IPv6 addesses count.
            if ':' not in address:
                continue

            ipaddr, length = address.split('/')

            addr = util.subelm(ipv6, 'ip:address')
            addr.append(util.leaf_elm('ip:ip', ipaddr))
            addr.append(util.leaf_elm('ip:prefix-length', length))
        # No neighbor support.


def operational(node):
    """Retrieve the IP state for this device."""
    ifaces = _get_ifaces(node)
    nmsa = get_nmsa()

    if None in (ifaces, nmsa):
        # We can't retrieve configuration unless both the ietf-interfaces and
        # the NMSA module is loaded.
        return

    for iface in ifaces.iterchildren():
        name = iface.find('{'+IF_NS+'}name').text
        addrs = nmsa.live_addresses(name)

        # IPv4
        ipv4 = _add_ipv4(iface, nmsa.curr_param)
        apipa_net = ipaddress.IPv4Network("169.254.0.0/16")
        for addr in addrs:
            if not isinstance(addr, ipaddress.IPv4Interface):
                continue

            addr_node = util.subelm(ipv4, 'ip:address')
            addr_node.append(util.leaf_elm('ip:ip', str(addr.ip)))
            addr_node.append(util.leaf_elm('ip:prefix-length',
                                           addr.network.prefixlen))

            origin = "other"
            conf = "{a}/{p}".format(a=str(addr.ip), p=addr.network.prefixlen)
            if conf in nmsa.list_addresses(name):
                origin = "static"
            elif addr in apipa_net:
                origin = "random"
            addr_node.append(util.leaf_elm('ip:origin', origin))
        # IPv6
        ipv6 = _add_ipv6(iface, nmsa.curr_param)
        for addr in addrs:
            if not isinstance(addr, ipaddress.IPv6Interface):
                continue

            addr_node = util.subelm(ipv6, 'ip:address')
            addr_node.append(util.leaf_elm('ip:ip', str(addr.ip)))
            addr_node.append(util.leaf_elm('ip:prefix-length',
                                           addr.network.prefixlen))

            origin = "other"
            conf = "{a}/{p}".format(a=str(addr.ip), p=addr.network.prefixlen)
            if conf in nmsa.list_addresses(name):
                origin = "static"
            addr_node.append(util.leaf_elm('ip:origin', origin))


def _edit_param(iface: str, param: str, operation: str, rpc, node):
    """Edit an NMSA-controlled parameter."""
    nmsa = get_nmsa()
    value = nmsa.get_param(iface, param)

    if operation == 'create' and value is not None:
        raise error.DataExistsAppError(rpc)
    if operation == 'delete' and value is None:
        raise error.DataMissingAppError(rpc)

    if operation in ('delete', 'remove'):
        nmsa.unset_param(iface, param)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    if node.text in ('true', 'false'):
        nmsa.set_param(iface, param, node.text == 'true')
    else:
        nmsa.set_param(iface, param, node.text)


def _clear_ipv4(iface: str):
    """Remove all IPv4 configuration from a given interface."""
    nmsa = get_nmsa()

    nmsa.set_param(iface, 'ipv4_enabled', False)
    nmsa.unset_param(iface, 'ipv4_forwarding')
    nmsa.unset_param(iface, 'ipv4_mtu')


def _edit_address(session, rpc, node, operation, iface: str, _type):
    """Edit an IP address."""
    addr_node = node.find('{'+M_NS+'}ip')
    if addr_node is None:
        raise error.MissingElementAppError(rpc, node)
    address = addr_node.text

    nmsa = get_nmsa()

    if operation in ('delete', 'remove'):
        try:
            nmsa.remove_address(iface, address)
        except KeyError as key_e:
            if operation == 'delete':
                raise error.DataMissingAppError(rpc) from key_e
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "%s IP %s" % (operation, address))
        return

    pref_node = node.find('{'+M_NS+'}prefix-length')
    if pref_node is None:
        raise error.MissingElementAppError(rpc, node)
    prefix = pref_node.text

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    try:
        nmsa.add_address(iface, _type, address, prefix)
    except ValueError as val_e:
        raise error.InvalidValueAppError(rpc, info="invalid IP") from val_e
    except RuntimeError as run_e:
        if operation == 'create':
            raise error.DataExistsAppError(rpc) from run_e
        else:
            nmsa.modify_prefix(iface, address, prefix)
    else:
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "%s IP %s/%s" % (operation, address, prefix))


def _edit_ipv4(session, rpc, node, def_op, iface: str):
    """Edit IPv4 configuration for a given interface."""
    _params = {'enabled': 'ipv4_enabled', 'forwarding': 'ipv4_forwarding',
               'mtu': 'ipv4_mtu'}

    operation = node_operation(node, def_op)
    if operation in ('delete', 'remove'):
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "removing IPv4 configuration")

        _clear_ipv4(iface)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    for xparam in node:
        operation = node_operation(xparam, operation)
        qparam = QName(xparam.tag)
        if qparam.localname in _params.keys():
            param = _params[qparam.localname]
            log_config_change(session, "[ietf-ip %s]" % iface,
                              "IPv4 %s: -> %s" % (param, xparam.text))
            _edit_param(iface, param, operation, rpc, xparam)
        elif qparam.localname == 'address':
            _edit_address(session, rpc, xparam, operation, iface, AF_INET)
        elif qparam.localname == 'neighbor':
            # Oh *no*!
            raise NotImplementedError
        else:
            raise error.UnknownElementAppError(rpc, xparam)


def _clear_ipv6(iface: str):
    """Remove all IPv6 configuration from a given interface."""
    nmsa = get_nmsa()

    nmsa.set_param(iface, 'ipv6_enabled', False)
    for param in ('ipv6_forwarding', 'ipv6_mtu', 'ipv6_dad_xmit',
                  'ipv6_slaac_globaladdr', 'ipv6_slaac_tempaddr',
                  'ipv6_slaac_validlft', 'ipv6_slaac_preflft'):
        nmsa.unset_param(iface, param)


def _clear_slaac(iface: str):
    """Remove all SLAAC configuration from a given interface."""
    nmsa = get_nmsa()

    for param in ('ipv6_slaac_globaladdr', 'ipv6_slaac_tempaddr',
                  'ipv6_slaac_validlft', 'ipv6_slaac_preflft'):
        nmsa.unset_param(iface, param)


def _edit_slaac(session, rpc, node, def_op, iface: str):
    """Edit SLAAC configuration for a given interface."""
    _params = {'create-global-addresses': 'ipv6_slaac_globaladdr',
               'create-temporary-addresses': 'ipv6_slaac_tempaddr',
               'temporary-valid-lifetime': 'ipv6_slaac_validlft',
               'temporary-preferred-lifetime': 'ipv6_slaac_preflft'}

    operation = node_operation(node, def_op)
    if operation in ('delete', 'remove'):
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "removing IPv6 SLAAC configuration")
        _clear_slaac(iface)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    for param in node:
        p_name = QName(param.tag).localname
        if p_name not in _params.keys():
            raise error.UnknownElementAppError(rpc, param)

        p_op = node_operation(param, operation)
        _edit_param(iface, _params[p_name], p_op, rpc, param)


def _edit_ipv6(session, rpc, node, def_op, iface: str):
    """Edit IPv6 configuration for a given interface."""
    _params = {'enabled': 'ipv6_enabled', 'forwarding': 'ipv6_forwarding',
               'mtu': 'ipv6_mtu', 'dup-addr-detect-transmits': 'ipv6_dad_xmit'}

    operation = node_operation(node, def_op)
    if operation in ('delete', 'remove'):
        log_config_change(session, "[ietf-ip %s]" % iface,
                          "removing IPv6 configuration")

        _clear_ipv6(iface)
        return

    if operation not in ('create', 'merge', 'replace'):
        raise error.OperationNotSupportedAppError(rpc)

    for xparam in node:
        p_op = node_operation(xparam, operation)
        qparam = QName(xparam.tag)
        if qparam.localname in _params.keys():
            param = _params[qparam.localname]
            log_config_change(session, "[ietf-ip %s]" % iface,
                              "IPv6 %s: -> %s" % (param, xparam.text))
            _edit_param(iface, param, p_op, rpc, xparam)
        elif qparam.localname == 'address':
            _edit_address(session, rpc, xparam, operation, iface, AF_INET6)
        elif qparam.localname == 'neighbor':
            # Oh *no*!
            raise NotImplementedError
        elif qparam.localname == 'autoconf':
            # Configure SLAAC parameters.
            _edit_slaac(session, rpc, node, def_op, iface)
        else:
            raise error.UnknownElementAppError(rpc, xparam)


def edit(session, rpc, node, def_op):
    """Edit the IP configuration for this device."""
    methods = {'ipv4': _edit_ipv4, 'ipv6': _edit_ipv6}

    nmsa = get_nmsa()
    if nmsa is None:
        raise error.OperationNotSupportedAppError(rpc)

    for interface in node:
        if QName(interface.tag).localname != 'interface' or\
           QName(interface.tag).namespace != IF_NS:
            continue  # Ignore unknown tags given to us.

        name_node = interface.find('{'+IF_NS+'}name')
        if name_node is None:
            raise error.MissingElementAppError(rpc, interface)
        iface = name_node.text

        operation = node_operation(interface, def_op)
        if operation in ('delete', 'remove'):
            # ietf-interfaces already removed the configuration for us.
            continue

        for conf in interface:
            if QName(conf.tag).namespace == M_NS:
                name = QName(conf.tag).localname
                if name in methods:
                    methods[name](session, rpc, conf, operation, iface)
                else:
                    raise error.UnknownElementAppError(rpc, conf)
            else:
                continue