""" NETCONF for APK Distributions server: Network Management System abstraction module for ifupdown-ng Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ from contextlib import contextmanager import ipaddress import logging import pathlib import socket import subprocess import yaml # pylint: disable=I1101 import netifaces from ncserver.base.util import _ LOGGER = logging.getLogger(__name__) """The object used for logging informational messages.""" M_ABI_VERSION = 1 """The ABI version of this NETCONF module.""" M_PREFIX = "nmsa" """The XML tag prefix for this module's tags.""" M_NS = "http://netconf.adelielinux.org/ns/netmgmt" """The XML namespace for this module.""" M_NAME = "adelie-nms-abstract" """The YANG model name for this module.""" M_REVISION = "2020-11-18" """The YANG revision date for this module.""" _CONFIG = dict() """The internal configuration handle.""" _SYSCTL = dict() """The internatl sysctl configuration handle.""" _TRANSACTION = False """Determines if a transaction is in progress.""" ############################# # I N T E R N A L # ############################# def _load_config(): """Load the current active configuration from /e/n/i.""" global _CONFIG # pylint: disable=W0603 # Won't load during a transaction. if _TRANSACTION: return result = None try: result = subprocess.run(['/sbin/ifparse', '-AF', 'yaml-raw'], stdout=subprocess.PIPE, check=False) except OSError: LOGGER.error(_("ifupdown-ng may not be installed properly")) return if result.returncode != 0: LOGGER.error(_("ifparse returned error %d"), result.returncode) return rawyaml = result.stdout.decode('utf-8') _CONFIG = yaml.safe_load(rawyaml) def _save_unlocked(): """Save changes to the configuration.""" eni = "" for iface in _CONFIG.keys(): buf = "iface " + iface + "\n" for item in _CONFIG[iface]: for key, val in item.items(): if key == 'auto' and \ (val is True or val.lower()[0] == 't'): buf = "auto " + iface + "\n" + buf continue if isinstance(val, bool): val = str(val).lower() buf += " " buf += str(key) + " " + str(val) buf += "\n" eni += buf + "\n" with open('/etc/network/interfaces', 'w') as conf_file: # snip last double-\n off conf_file.write(eni[:-1]) def _sysctl_save_unlocked(): """Save changes to sysctl.conf (and sysctl itself).""" buf = "\n".join("{key}={val}".format(key=key, val=val) for key, val in _SYSCTL.items()) with open('/etc/netconf/sysctl.conf', 'w') as conf_file: conf_file.write(buf) subprocess.run(['sysctl', '-f', '/etc/netconf/sysctl.conf'], check=False) def _save(): """Save configuration changes, if a transaction is not in progress.""" if _TRANSACTION: return _save_unlocked() def _sysctl_save(): """Save sysctl changes, if a transaction is not in progress.""" if _TRANSACTION: return _sysctl_save_unlocked() def _find_one(iface: str, key: str): """Find a single instance of configuration +key+ for +iface+.""" if iface not in _CONFIG.keys(): return None for item in _CONFIG[iface]: if key in item.keys(): return item[key] return None def _find_many(iface: str, key: str) -> list: """Find n instances of configuration +key+ for +iface+.""" ret = list() if iface not in _CONFIG.keys(): return None for item in _CONFIG[iface]: if key in item.keys(): ret.append(item[key]) return ret def _add_one_to_list(iface: str, key: str, value: str): """Add +value+ to list +key+ for +iface+.""" _CONFIG[iface].append({key: value}) _save() def _replace_one(iface: str, key: str, value: str): """Replace a single instance of +key+ for +iface+ with +value+.""" iface_cfg = _CONFIG[iface] for item in iface_cfg: if key in item.keys(): item[key] = value _save() return iface_cfg.append({key: value}) _save() return def _replace_one_in_list(iface: str, key: str, old: str, value: str): """Replace +old+ with +value+ in +iface+'s list of +key+.""" iface_cfg = _CONFIG[iface] for item in iface_cfg: if key in item.keys(): if item[key] == old: item[key] = value _save() return raise ValueError(old) def _remove_one(iface: str, key: str): """Remove a single instance of +key+ from +iface+.""" iface_cfg = _CONFIG[iface] for item in iface_cfg: if key in item.keys(): iface_cfg.remove(item) _save() return def _remove_one_from_list(iface: str, key: str, value: str): """Remove +value+ from list +key+ for +iface+.""" iface_cfg = _CONFIG[iface] for item in iface_cfg: if key in item.keys(): if item[key] == value: iface_cfg.remove(item) _save() return def _iface_path(iface: str) -> pathlib.Path: """Retrieve the system device path for the specified interface.""" return pathlib.Path('/sys/class/net/' + iface) def _load_sysctl_cfg() -> dict: """Retrieve the desired sysctl configuration.""" global _SYSCTL # pylint: disable=W0603 # Won't load during a transaction. if _TRANSACTION: return sysctl_path = pathlib.Path('/etc/netconf/sysctl.conf') if not sysctl_path.exists(): LOGGER.error(_("NETCONF sysctl config missing; check installation")) return lines = list() with open(sysctl_path, 'r') as sysctl_file: lines = sysctl_file.readlines() _SYSCTL = dict({ value[0]: value[1].strip() for value in [line.split('=') for line in lines] }) ############################# # P A R A M E T E R S # ############################# _ENI_MAPPING = {'description': 'netconf-description', 'enabled': 'auto', 'ipv4_mtu': 'mtu'} """Mapping of NMSA keys to /e/n/i keys.""" _SYSCTL_MAPPING = { 'ipv4_forwarding': 'net.ipv4.conf.{_if}.forwarding', 'ipv6_enabled': 'net.ipv6.conf.{_if}.disable_ipv6', 'ipv6_forwarding': 'net.ipv6.conf.{_if}.forwarding', 'ipv6_mtu': 'net.ipv6.conf.{_if}.mtu', 'ipv6_dad_xmit': 'net.ipv6.conf.{_if}.dad_transmits', 'ipv6_slaac_enabled': 'net.ipv6.conf.{_if}.autoconf', 'ipv6_slaac_type': 'net.ipv6.conf.{_if}.use_tempaddr', 'ipv6_slaac_validlft': 'net.ipv6.conf.{_if}.temp_valid_lft', 'ipv6_slaac_preflft': 'net.ipv6.conf.{_if}.temp_prefered_lft' } """Mapping of NMSA keys to sysctl nodes.""" # # # # Getters # # # # def get_one_eni(iface: str, parameter: str): """Retrieve the specified parameter from /e/n/i.""" return _find_one(iface, _ENI_MAPPING[parameter]) def get_sysctl(iface: str, parameter: str): """Retrieve the specified parameter from sysctl.conf.""" key = _SYSCTL_MAPPING[parameter].format(_if=iface) fallback = _SYSCTL_MAPPING[parameter].format(_if='all') return _SYSCTL.get(key, _SYSCTL.get(fallback, None)) def get_sysctl_bool(iface: str, parameter: str) -> bool: """Retrieve the specified boolean from sysctl.conf.""" value = get_sysctl(iface, parameter) if value == '1': return True return False def get_sysctl_int(iface: str, parameter: str): """Retrieve the specified integer from sysctl.conf.""" value = get_sysctl(iface, parameter) if value is None: return None return int(value) def get_ipv4en(iface: str, _) -> bool: """Retrieve IPv4 enablement status for the specified interface.""" executors = _find_many(iface, 'use') v4_execs = ('dhcp', 'apipa', 'loopback') if any(executor in executors for executor in v4_execs): # We can't guarantee that DHCPv4 is in use on this interface, # but we can't guarantee it isn't, either. return True if any('.' in addr for addr in list_addresses(iface)): # IPv4 addresses contain '.'s, others don't. return True return False def get_ipv6en(iface: str, parameter: str): """Retrieve IPv6 enablement status for the specified interface.""" return not get_sysctl_bool(iface, parameter) def get_slaac(iface: str, parameter: str): """Retrieve SLAAC configuration for the specified interface.""" enabled = get_sysctl_bool(iface, 'ipv6_slaac_enabled') if not enabled: return False value = get_sysctl_int(iface, 'ipv6_slaac_type') if parameter == 'ipv6_slaac_globaladdr': return value in ('0', '1') return value in ('1', '2') def live_sysctl(iface: str, parameter: str): """Retrieve the live value of a parameter from /proc/sys.""" key = _SYSCTL_MAPPING[parameter].format(_if=iface) path = pathlib.Path("/proc/sys/" + key.replace('.', '/')) if path.exists(): with open(path, 'r') as sysctl: return sysctl.read().strip() LOGGER.error(_("kernel does not know %s"), parameter) return None def live_sysctl_bool(iface: str, parameter: str) -> bool: """Retrieve a live boolean value from /proc/sys.""" value = live_sysctl(iface, parameter) if value is None: return None if value == '1': return True return False def live_sysctl_int(iface: str, parameter: str) -> int: """Retrieve a live integer from /proc/sys.""" value = live_sysctl(iface, parameter) if value is None: return None return int(value) def live_ipv4en(iface: str, _) -> bool: """Retrieve the current IPv4 status for the specified interface.""" raise NotImplementedError def live_ipv6en(iface: str, parameter: str): """Retrieve current IPv6 status for the specified interface.""" return not live_sysctl_bool(iface, parameter) def live_ipv4_mtu(iface: str, _): """Determine the IPv4 MTU for the interface.""" mtupath = _iface_path(iface) / "mtu" if mtupath.exists(): with open(mtupath, 'r') as mtu_file: return int(mtu_file.read().strip()) # Linux kernel: net/ethernet/eth.c line 367 # include/uapi/linux/if_ether.h line 36 return 1500 def live_slaac(iface: str, parameter: str): """Determine live SLAAC configuration for the specified interface.""" enabled = live_sysctl_bool(iface, 'ipv6_slaac_enabled') if not enabled: return False value = live_sysctl_int(iface, 'ipv6_slaac_type') if parameter == 'ipv6_slaac_globaladdr': return value in ('0', '1') return value in ('1', '2') # # # # Setters # # # # def set_sysctl(iface: str, parameter: str, value): """Set a sysctl value.""" key = _SYSCTL_MAPPING[parameter].format(_if=iface) _SYSCTL[key] = value _sysctl_save() def set_sysctl_bool(iface: str, parameter: str, value: bool): """Set a boolean sysctl value.""" real = '0' if value: real = '1' set_sysctl(iface, parameter, real) def set_sysctl_int(iface: str, parameter: str, value: int): """Set an integer sysctl value.""" set_sysctl(iface, parameter, str(value)) def set_ipv4en(iface: str, _, value: bool): """Set the IPv4 enabled flag.""" raise NotImplementedError def set_ipv6en(iface: str, parameter: str, value: bool): """Set IPv6 enabled/disabled for the specified interface.""" set_sysctl_bool(iface, parameter, not value) def set_desc(iface: str, _, value: str): """Set the description for the specified interface.""" _replace_one(iface, 'netconf-description', value) def set_auto(iface: str, _, value: bool): """Set the auto flag for the specified interface.""" _replace_one(iface, 'auto', str(value).lower()) def set_ipv4_mtu(iface: str, _, value: int): """Set the MTU for the specified interface.""" _replace_one(iface, 'mtu', str(value)) def set_slaac(iface: str, parameter: str, value: bool): # pylint: disable=R0911 """Set SLAAC parameters for the specified interface.""" curr_global = get_slaac(iface, 'ipv6_slaac_globaladdr') curr_temp = get_slaac(iface, 'ipv6_slaac_tempaddr') if parameter == 'ipv6_slaac_globaladdr': if not value and not curr_temp: set_sysctl_bool(iface, 'ipv6_slaac_enabled', False) set_sysctl_int(iface, 'ipv6_slaac_type', '0') return # all further types enable SLAAC in some way. set_sysctl_bool(iface, 'ipv6_slaac_enabled', True) if not value and curr_temp: set_sysctl_int(iface, 'ipv6_slaac_type', '2') return if value and not curr_temp: set_sysctl_int(iface, 'ipv6_slaac_type', '0') return if value and curr_temp: set_sysctl_int(iface, 'ipv6_slaac_type', '1') return if parameter == 'ipv6_slaac_tempaddr': if not value and not curr_global: set_sysctl_bool(iface, 'ipv6_slaac_enabled', False) set_sysctl_int(iface, 'ipv6_slaac_type', '0') return # all further types enable SLAAC in some way. set_sysctl_bool(iface, 'ipv6_slaac_enabled', True) if not value and curr_global: set_sysctl_int(iface, 'ipv6_slaac_type', '0') return if value and not curr_global: set_sysctl_int(iface, 'ipv6_slaac_type', '2') return if value and curr_global: set_sysctl_int(iface, 'ipv6_slaac_type', '1') return LOGGER.error(_("unknown SLAAC parameter %s"), parameter) # # # # Unsetters # # # # def unset_one_eni(iface: str, parameter: str): """Unset a parameter in /e/n/i.""" _remove_one(iface, _ENI_MAPPING[parameter]) def unset_sysctl(iface: str, parameter: str): """Unset a sysctl.""" key = _SYSCTL_MAPPING[parameter].format(_if=iface) _SYSCTL.pop(key, None) _sysctl_save() def unset_ipv4en(iface: str, _): """Unset the IPv4 enabled flag.""" raise NotImplementedError def unset_slaac(iface: str, parameter: str): """Set SLAAC parameters for the specified interface.""" curr_global = get_slaac(iface, 'ipv6_slaac_globaladdr') curr_temp = get_slaac(iface, 'ipv6_slaac_tempaddr') if parameter == 'ipv6_slaac_globaladdr': # unset => default true set_sysctl_bool(iface, 'ipv6_slaac_enabled', True) if curr_temp: set_sysctl_int(iface, 'ipv6_slaac_type', '1') else: set_sysctl_int(iface, 'ipv6_slaac_type', '0') return if parameter == 'ipv6_slaac_tempaddr': # unset => default false set_sysctl_int(iface, 'ipv6_slaac_type', '0') if not curr_global: set_sysctl_bool(iface, 'ipv6_slaac_enabled', False) return LOGGER.error(_("unknown SLAAC parameter %s"), parameter) _PARAMETERS = { # "name": (getter, live getter, setter, unsetter) 'description': (get_one_eni, get_one_eni, set_desc, unset_one_eni), 'enabled': (get_one_eni, get_one_eni, set_auto, unset_one_eni), 'ipv4_enabled': (get_ipv4en, live_ipv4en, set_ipv4en, unset_ipv4en), 'ipv4_forwarding': (get_sysctl_bool, live_sysctl_bool, set_sysctl_bool, unset_sysctl), 'ipv4_mtu': (get_one_eni, live_ipv4_mtu, set_ipv4_mtu, unset_one_eni), 'ipv6_enabled': (get_ipv6en, live_ipv6en, set_ipv6en, unset_sysctl), 'ipv6_forwarding': (get_sysctl_bool, live_sysctl_bool, set_sysctl_bool, unset_sysctl), 'ipv6_mtu': (get_sysctl_int, live_sysctl_int, set_sysctl_int, unset_sysctl), 'ipv6_dad_xmit': (get_sysctl_int, live_sysctl_int, set_sysctl_int, unset_sysctl), 'ipv6_slaac_globaladdr': (get_slaac, live_slaac, set_slaac, unset_slaac), 'ipv6_slaac_tempaddr': (get_slaac, live_slaac, set_slaac, unset_slaac), 'ipv6_slaac_validlft': (get_sysctl_int, live_sysctl_int, set_sysctl_int, unset_sysctl), 'ipv6_slaac_preflft': (get_sysctl_int, live_sysctl_int, set_sysctl_int, unset_sysctl) } """Describes all supported parameters and their methods.""" ############################# # P U B L I C A P I # ############################# def interface_list(): """Return a list of configured interfaces.""" _load_config() return tuple(_CONFIG.keys()) def remove_interface(iface: str): """Completely remove configuration for +iface+.""" if iface in _CONFIG.keys(): del _CONFIG[iface] if not _TRANSACTION: _save() def begin_transaction(): """Begin a transaction.""" global _TRANSACTION # pylint: disable=W0603 if _TRANSACTION: LOGGER.error(_("attempt to nest transactions")) return _TRANSACTION = True def commit(): """Commit any outstanding operations.""" global _TRANSACTION # pylint: disable=W0603 if not _TRANSACTION: LOGGER.warning(_("commit when no transaction is in progress")) _save_unlocked() _sysctl_save_unlocked() _TRANSACTION = False def rollback(): """Roll back outstanding operations.""" global _TRANSACTION # pylint: disable=W0603 _load_config() _load_sysctl_cfg() _TRANSACTION = False @contextmanager def transaction(): """Context manager for NMSA transactions.""" begin_transaction() try: yield None except: rollback() raise else: commit() def get_param(iface: str, parameter: str): """Retrieve the parameter for the specified interface.""" _load_config() _load_sysctl_cfg() if iface not in _CONFIG.keys(): LOGGER.warning( _("requested parameter %s for non-existent interface %s"), parameter, iface ) return None if parameter not in _PARAMETERS.keys(): LOGGER.error(_("requested non-existent parameter %s for interface %s"), parameter, iface) return None return _PARAMETERS[parameter][0](iface, parameter) def curr_param(iface: str, parameter: str): """Retrieve the current parameter value for the specified interface.""" _load_config() # Won't read from sysctl.conf so don't need to _load_sysctl_cfg. if parameter not in _PARAMETERS.keys(): LOGGER.error(_("requested non-existent parameter %s for interface %s"), parameter, iface) return None return _PARAMETERS[parameter][1](iface, parameter) def set_param(iface: str, parameter: str, value): """Set the parameter for the specified interface.""" _load_config() _load_sysctl_cfg() if parameter not in _PARAMETERS.keys(): LOGGER.error( _("attempted to set non-existent parameter %s for interface %s"), parameter, iface ) return # Allow creation of new interfaces from NETCONF. if iface not in _CONFIG.keys(): _CONFIG[iface] = list() _PARAMETERS[parameter][2](iface, parameter, value) def unset_param(iface: str, parameter: str): """Unset the parameter for the specified interface.""" _load_config() _load_sysctl_cfg() if iface not in _CONFIG.keys(): LOGGER.warning( _("attempted to unset parameter %s for non-existent interface %s"), parameter, iface ) return if parameter not in _PARAMETERS.keys(): LOGGER.error( _("attempted to unset non-existent parameter %s for interface %s"), parameter, iface ) return _PARAMETERS[parameter][3](iface, parameter) def list_addresses(iface: str) -> list: """Retrieve all configured addresses for the specified interface.""" _load_config() if iface not in _CONFIG.keys(): LOGGER.warning(_("requested addresses for non-existent interface %s"), iface) return list() return _find_many(iface, 'address') def live_addresses(iface: str) -> list: """Retrieve live addresses for the specified interface.""" addresses = list() if iface not in netifaces.interfaces(): LOGGER.warning(_("interface %s is not live"), iface) return addresses raw = netifaces.ifaddresses(iface) for ipv4 in raw.get(socket.AF_INET, tuple()): addr = ipv4['addr'] mask = ipv4['netmask'] iface = ipaddress.IPv4Interface("{a}/{m}".format(a=addr, m=mask)) addresses.append(iface) for ipv6 in raw.get(socket.AF_INET6, tuple()): addr = ipv6['addr'].split('%')[0] mask = ipv6['netmask'][ipv6['netmask'].find('/') + 1:] iface = ipaddress.IPv6Interface("{a}/{m}".format(a=addr, m=mask)) addresses.append(iface) return addresses def add_address(iface: str, _type, addr: str, prefix): """Add an address of the specified ``type`` to the specified interface.""" _load_config() if iface not in _CONFIG.keys(): LOGGER.warning( _("attempted to add address to non-existent interface %s"), iface ) return if _type not in (socket.AF_INET, socket.AF_INET6): LOGGER.error(_("unknown address type %r"), _type) return ip_addr = None ctor = ipaddress.IPv6Interface if _type == socket.AF_INET: ctor = ipaddress.IPv4Interface try: ip_addr = ctor("{a}/{p}".format(a=addr, p=prefix)) except Exception as err: raise ValueError("IP address is not valid") from err for candidate in list_addresses(iface): cand_addr = candidate.split('/')[0] if addr == cand_addr: raise RuntimeError("Duplicate address attempt") s_addr = str(ip_addr) _add_one_to_list(iface, 'address', s_addr) def modify_prefix(iface: str, addr: str, prefix: str): """Change the prefix of existing IP address +addr+ to +prefix+.""" _load_config() if iface not in _CONFIG.keys(): LOGGER.warning( _("attempted to change addr prefix on non-existent interface %s"), iface ) return if '/' in addr: # Wow, the entire address! Much simpler. the_ip = addr.split('/')[0] if addr not in list_addresses(iface): raise KeyError("Non-existent address cannot be modified") _replace_one_in_list(iface, 'address', addr, the_ip + '/' + prefix) return for candidate in list_addresses(iface): cand_addr = candidate.split('/')[0] if addr == cand_addr: _replace_one_in_list(iface, 'address', candidate, addr + '/' + prefix) return raise KeyError("Non-existent address cannot be modified") def remove_address(iface: str, addr: str): """Remove an address from the specified interface.""" _load_config() if iface not in _CONFIG.keys(): LOGGER.warning( _("attempted to remove address from non-existent interface %s"), iface ) return if '/' in addr: # We have the entire address, including the prefix length. if addr not in list_addresses(iface): raise KeyError("Non-existent address cannot be removed") _remove_one_from_list(iface, 'address', addr) return for candidate in list_addresses(iface): cand_addr = candidate.split('/')[0] if addr == cand_addr: _remove_one_from_list(iface, 'address', candidate) return raise KeyError("Non-existent address cannot be removed") def running(_): """Non-YANG functional module only - no nodes.""" def operational(_): """Non-YANG functional module only - no nodes.""" def edit(*params): # pylint: disable=W0613 """Non-YANG functional module only - no nodes.""" # Load immediately when we're loaded so we can go straight to a transaction # if desired. _load_config() _load_sysctl_cfg()