"""
NETCONF for APK Distributions server:
Network Management System abstraction module for ifupdown-ng
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
from contextlib import contextmanager
import logging
import pathlib
import socket
import subprocess
import yaml
from ncserver.base.util import _
LOGGER = logging.getLogger(__name__)
"""The object used for logging informational messages."""
M_ABI_VERSION = 1
"""The ABI version of this NETCONF module."""
M_PREFIX = "nmsa"
"""The XML tag prefix for this module's tags."""
M_NS = "http://netconf.adelielinux.org/ns/netmgmt"
"""The XML namespace for this module."""
M_NAME = "adelie-nms-abstract"
"""The YANG model name for this module."""
M_REVISION = "2020-11-18"
"""The YANG revision date for this module."""
_CONFIG = dict()
"""The internal configuration handle."""
_SYSCTL = dict()
"""The internatl sysctl configuration handle."""
_TRANSACTION = False
"""Determines if a transaction is in progress."""
#############################
# I N T E R N A L #
#############################
def _load_config():
"""Load the current active configuration from /e/n/i."""
global _CONFIG # pylint: disable=W0603
# Won't load during a transaction.
if _TRANSACTION:
return
result = None
try:
result = subprocess.run(['/sbin/ifparse', '-AF',
'yaml-raw'],
stdout=subprocess.PIPE, check=False)
except OSError:
LOGGER.error(_("ifupdown-ng may not be installed properly"))
return
if result.returncode != 0:
LOGGER.error(_("ifparse returned error %d"), result.returncode)
return
rawyaml = result.stdout.decode('utf-8')
_CONFIG = yaml.safe_load(rawyaml)
def _save_unlocked():
"""Save changes to the configuration."""
eni = ""
for iface in _CONFIG.keys():
buf = "iface " + iface + "\n"
for item in _CONFIG[iface]:
for key, val in item.items():
if key == 'auto' and \
(val is True or val.lower()[0] == 't'):
buf = "auto " + iface + "\n" + buf
continue
if isinstance(val, bool):
val = str(val).lower()
buf += " "
buf += str(key) + " " + str(val)
buf += "\n"
eni += buf + "\n"
with open('/etc/network/interfaces', 'w') as conf_file:
# snip last double-\n off
conf_file.write(eni[:-1])
def _sysctl_save_unlocked():
"""Save changes to sysctl.conf (and sysctl itself)."""
buf = "\n".join("{key}={val}".format(key=key, val=val)
for key, val in _SYSCTL.items())
with open('/etc/netconf/sysctl.conf', 'w') as conf_file:
conf_file.write(buf)
subprocess.run(['sysctl', '-f', '/etc/netconf/sysctl.conf'], check=False)
def _save():
"""Save configuration changes, if a transaction is not in progress."""
if _TRANSACTION:
return
_save_unlocked()
def _sysctl_save():
"""Save sysctl changes, if a transaction is not in progress."""
if _TRANSACTION:
return
_sysctl_save_unlocked()
def _find_one(iface: str, key: str):
"""Find a single instance of configuration +key+ for +iface+."""
if iface not in _CONFIG.keys():
return None
for item in _CONFIG[iface]:
if key in item.keys():
return item[key]
return None
def _find_many(iface: str, key: str) -> list:
"""Find n instances of configuration +key+ for +iface+."""
ret = list()
if iface not in _CONFIG.keys():
return None
for item in _CONFIG[iface]:
if key in item.keys():
ret.append(item[key])
return ret
def _replace_one(iface: str, key: str, value: str):
"""Replace a single instance of +key+ for +iface+ with +value+."""
iface_cfg = _CONFIG[iface]
for item in iface_cfg:
if key in item.keys():
item[key] = value
_save()
return
iface_cfg.append({key: value})
_save()
return
def _remove_one(iface: str, key: str):
"""Remove a single instance of +key+ from +iface+."""
iface_cfg = _CONFIG[iface]
for item in iface_cfg:
if key in item.keys():
iface_cfg.remove(item)
_save()
return
def _iface_path(iface: str) -> pathlib.Path:
"""Retrieve the system device path for the specified interface."""
return pathlib.Path('/sys/class/net/' + iface)
def _load_sysctl_cfg() -> dict:
"""Retrieve the desired sysctl configuration."""
global _SYSCTL # pylint: disable=W0603
# Won't load during a transaction.
if _TRANSACTION:
return
sysctl_path = pathlib.Path('/etc/netconf/sysctl.conf')
if not sysctl_path.exists():
LOGGER.error(_("NETCONF sysctl config missing; check installation"))
return
lines = list()
with open(sysctl_path, 'r') as sysctl_file:
lines = sysctl_file.readlines()
_SYSCTL = dict({
value[0]: value[1].strip() for value in
[line.split('=') for line in lines]
})
#############################
# P A R A M E T E R S #
#############################
_ENI_MAPPING = {'description': 'netconf-description',
'enabled': 'auto',
'ipv4_mtu': 'mtu'}
"""Mapping of NMSA keys to /e/n/i keys."""
_SYSCTL_MAPPING = {
'ipv4_forwarding': 'net.ipv4.conf.{_if}.forwarding',
'ipv6_enabled': 'net.ipv6.conf.{_if}.disable_ipv6',
'ipv6_forwarding': 'net.ipv6.conf.{_if}.forwarding',
'ipv6_mtu': 'net.ipv6.conf.{_if}.mtu',
'ipv6_dad_xmit': 'net.ipv6.conf.{_if}.dad_transmits',
'ipv6_slaac_enabled': 'net.ipv6.conf.{_if}.autoconf',
'ipv6_slaac_type': 'net.ipv6.conf.{_if}.use_tempaddr',
'ipv6_slaac_validlft': 'net.ipv6.conf.{_if}.temp_valid_lft',
'ipv6_slaac_preflft': 'net.ipv6.conf.{_if}.temp_prefered_lft'
}
"""Mapping of NMSA keys to sysctl nodes."""
# # # # Getters # # # #
def get_one_eni(iface: str, parameter: str):
"""Retrieve the specified parameter from /e/n/i."""
return _find_one(iface, _ENI_MAPPING[parameter])
def get_sysctl(iface: str, parameter: str):
"""Retrieve the specified parameter from sysctl.conf."""
key = _SYSCTL_MAPPING[parameter].format(_if=iface)
fallback = _SYSCTL_MAPPING[parameter].format(_if='all')
return _SYSCTL.get(key, _SYSCTL.get(fallback, None))
def get_sysctl_bool(iface: str, parameter: str) -> bool:
"""Retrieve the specified boolean from sysctl.conf."""
value = get_sysctl(iface, parameter)
if value == '1':
return True
return False
def get_sysctl_int(iface: str, parameter: str):
"""Retrieve the specified integer from sysctl.conf."""
value = get_sysctl(iface, parameter)
if value is None:
return None
return int(value)
def get_ipv4en(iface: str, _) -> bool:
"""Retrieve IPv4 enablement status for the specified interface."""
executors = _find_many(iface, 'use')
v4_execs = ('dhcp', 'apipa', 'loopback')
if any(executor in executors for executor in v4_execs):
# We can't guarantee that DHCPv4 is in use on this interface,
# but we can't guarantee it isn't, either.
return True
if any('.' in addr for addr in list_addresses(iface)):
# IPv4 addresses contain '.'s, others don't.
return True
return False
def get_ipv6en(iface: str, parameter: str):
"""Retrieve IPv6 enablement status for the specified interface."""
return not get_sysctl_bool(iface, parameter)
def get_slaac(iface: str, parameter: str):
"""Retrieve SLAAC configuration for the specified interface."""
enabled = get_sysctl_bool(iface, 'ipv6_slaac_enabled')
if not enabled:
return False
value = get_sysctl_int(iface, 'ipv6_slaac_type')
if parameter == 'ipv6_slaac_globaladdr':
return value in ('0', '1')
return value in ('1', '2')
def live_sysctl(iface: str, parameter: str):
"""Retrieve the live value of a parameter from /proc/sys."""
key = _SYSCTL_MAPPING[parameter].format(_if=iface)
path = pathlib.Path("/proc/sys/" + key.replace('.', '/'))
if path.exists():
with open(path, 'r') as sysctl:
return sysctl.read().strip()
LOGGER.error(_("kernel does not know %s"), parameter)
return None
def live_sysctl_bool(iface: str, parameter: str) -> bool:
"""Retrieve a live boolean value from /proc/sys."""
value = live_sysctl(iface, parameter)
if value is None:
return None
if value == '1':
return True
return False
def live_sysctl_int(iface: str, parameter: str) -> int:
"""Retrieve a live integer from /proc/sys."""
value = live_sysctl(iface, parameter)
if value is None:
return None
return int(value)
def live_ipv4en(iface: str, _) -> bool:
"""Retrieve the current IPv4 status for the specified interface."""
raise NotImplementedError
def live_ipv6en(iface: str, parameter: str):
"""Retrieve current IPv6 status for the specified interface."""
return not live_sysctl_bool(iface, parameter)
def live_ipv4_mtu(iface: str, _):
"""Determine the IPv4 MTU for the interface."""
mtupath = _iface_path(iface) / "mtu"
if mtupath.exists():
with open(mtupath, 'r') as mtu_file:
return int(mtu_file.read().strip())
# Linux kernel: net/ethernet/eth.c line 367
# include/uapi/linux/if_ether.h line 36
return 1500
def live_slaac(iface: str, parameter: str):
"""Determine live SLAAC configuration for the specified interface."""
enabled = live_sysctl_bool(iface, 'ipv6_slaac_enabled')
if not enabled:
return False
value = live_sysctl_int(iface, 'ipv6_slaac_type')
if parameter == 'ipv6_slaac_globaladdr':
return value in ('0', '1')
return value in ('1', '2')
# # # # Setters # # # #
def set_sysctl(iface: str, parameter: str, value):
"""Set a sysctl value."""
key = _SYSCTL_MAPPING[parameter].format(_if=iface)
_SYSCTL[key] = value
_sysctl_save()
def set_sysctl_bool(iface: str, parameter: str, value: bool):
"""Set a boolean sysctl value."""
real = '0'
if value:
real = '1'
set_sysctl(iface, parameter, real)
def set_sysctl_int(iface: str, parameter: str, value: int):
"""Set an integer sysctl value."""
set_sysctl(iface, parameter, str(value))
def set_ipv4en(iface: str, _, value: bool):
"""Set the IPv4 enabled flag."""
raise NotImplementedError
def set_ipv6en(iface: str, parameter: str, value: bool):
"""Set IPv6 enabled/disabled for the specified interface."""
set_sysctl_bool(iface, parameter, not value)
def set_desc(iface: str, _, value: str):
"""Set the description for the specified interface."""
_replace_one(iface, 'netconf-description', value)
def set_auto(iface: str, _, value: bool):
"""Set the auto flag for the specified interface."""
_replace_one(iface, 'auto', str(value).lower())
def set_ipv4_mtu(iface: str, _, value: int):
"""Set the MTU for the specified interface."""
_replace_one(iface, 'mtu', str(value))
def set_slaac(iface: str, parameter: str, value: bool):
# pylint: disable=R0911
"""Set SLAAC parameters for the specified interface."""
curr_global = get_slaac(iface, 'ipv6_slaac_globaladdr')
curr_temp = get_slaac(iface, 'ipv6_slaac_tempaddr')
if parameter == 'ipv6_slaac_globaladdr':
if not value and not curr_temp:
set_sysctl_bool(iface, 'ipv6_slaac_enabled', False)
set_sysctl_int(iface, 'ipv6_slaac_type', '0')
return
# all further types enable SLAAC in some way.
set_sysctl_bool(iface, 'ipv6_slaac_enabled', True)
if not value and curr_temp:
set_sysctl_int(iface, 'ipv6_slaac_type', '2')
return
if value and not curr_temp:
set_sysctl_int(iface, 'ipv6_slaac_type', '0')
return
if value and curr_temp:
set_sysctl_int(iface, 'ipv6_slaac_type', '1')
return
if parameter == 'ipv6_slaac_tempaddr':
if not value and not curr_global:
set_sysctl_bool(iface, 'ipv6_slaac_enabled', False)
set_sysctl_int(iface, 'ipv6_slaac_type', '0')
return
# all further types enable SLAAC in some way.
set_sysctl_bool(iface, 'ipv6_slaac_enabled', True)
if not value and curr_global:
set_sysctl_int(iface, 'ipv6_slaac_type', '0')
return
if value and not curr_global:
set_sysctl_int(iface, 'ipv6_slaac_type', '2')
return
if value and curr_global:
set_sysctl_int(iface, 'ipv6_slaac_type', '1')
return
LOGGER.error(_("unknown SLAAC parameter %s"), parameter)
# # # # Unsetters # # # #
def unset_one_eni(iface: str, parameter: str):
"""Unset a parameter in /e/n/i."""
_remove_one(iface, _ENI_MAPPING[parameter])
def unset_sysctl(iface: str, parameter: str):
"""Unset a sysctl."""
key = _SYSCTL_MAPPING[parameter].format(_if=iface)
_SYSCTL.pop(key, None)
_sysctl_save()
def unset_ipv4en(iface: str, _):
"""Unset the IPv4 enabled flag."""
raise NotImplementedError
def unset_slaac(iface: str, parameter: str):
"""Set SLAAC parameters for the specified interface."""
curr_global = get_slaac(iface, 'ipv6_slaac_globaladdr')
curr_temp = get_slaac(iface, 'ipv6_slaac_tempaddr')
if parameter == 'ipv6_slaac_globaladdr':
# unset => default true
set_sysctl_bool(iface, 'ipv6_slaac_enabled', True)
if curr_temp:
set_sysctl_int(iface, 'ipv6_slaac_type', '1')
else:
set_sysctl_int(iface, 'ipv6_slaac_type', '0')
return
if parameter == 'ipv6_slaac_tempaddr':
# unset => default false
set_sysctl_int(iface, 'ipv6_slaac_type', '0')
if not curr_global:
set_sysctl_bool(iface, 'ipv6_slaac_enabled', False)
return
LOGGER.error(_("unknown SLAAC parameter %s"), parameter)
_PARAMETERS = {
# "name": (getter, live getter, setter, unsetter)
'description': (get_one_eni, get_one_eni, set_desc, unset_one_eni),
'enabled': (get_one_eni, get_one_eni, set_auto, unset_one_eni),
'ipv4_enabled': (get_ipv4en, live_ipv4en, set_ipv4en, unset_ipv4en),
'ipv4_forwarding': (get_sysctl_bool, live_sysctl_bool,
set_sysctl_bool, unset_sysctl),
'ipv4_mtu': (get_one_eni, live_ipv4_mtu,
set_ipv4_mtu, unset_one_eni),
'ipv6_enabled': (get_ipv6en, live_ipv6en, set_ipv6en, unset_sysctl),
'ipv6_forwarding': (get_sysctl_bool, live_sysctl_bool,
set_sysctl_bool, unset_sysctl),
'ipv6_mtu': (get_sysctl_int, live_sysctl_int,
set_sysctl_int, unset_sysctl),
'ipv6_dad_xmit': (get_sysctl_int, live_sysctl_int,
set_sysctl_int, unset_sysctl),
'ipv6_slaac_globaladdr': (get_slaac, live_slaac, set_slaac, unset_slaac),
'ipv6_slaac_tempaddr': (get_slaac, live_slaac, set_slaac, unset_slaac),
'ipv6_slaac_validlft': (get_sysctl_int, live_sysctl_int,
set_sysctl_int, unset_sysctl),
'ipv6_slaac_preflft': (get_sysctl_int, live_sysctl_int,
set_sysctl_int, unset_sysctl)
}
"""Describes all supported parameters and their methods."""
#############################
# P U B L I C A P I #
#############################
def interface_list():
"""Return a list of configured interfaces."""
_load_config()
return tuple(_CONFIG.keys())
def remove_interface(iface: str):
"""Completely remove configuration for +iface+."""
if iface in _CONFIG.keys():
del _CONFIG[iface]
if not _TRANSACTION:
_save()
def begin_transaction():
"""Begin a transaction."""
global _TRANSACTION # pylint: disable=W0603
if _TRANSACTION:
LOGGER.error(_("attempt to nest transactions"))
return
_TRANSACTION = True
def commit():
"""Commit any outstanding operations."""
global _TRANSACTION # pylint: disable=W0603
if not _TRANSACTION:
LOGGER.warning(_("commit when no transaction is in progress"))
_save_unlocked()
_sysctl_save_unlocked()
_TRANSACTION = False
def rollback():
"""Roll back outstanding operations."""
global _TRANSACTION # pylint: disable=W0603
_load_config()
_load_sysctl_cfg()
_TRANSACTION = False
@contextmanager
def transaction():
"""Context manager for NMSA transactions."""
begin_transaction()
try:
yield None
except:
rollback()
raise
else:
commit()
def get_param(iface: str, parameter: str):
"""Retrieve the parameter for the specified interface."""
_load_config()
_load_sysctl_cfg()
if iface not in _CONFIG.keys():
LOGGER.warning(
_("requested parameter %s for non-existent interface %s"),
parameter, iface
)
return None
if parameter not in _PARAMETERS.keys():
LOGGER.error(_("requested non-existent parameter %s for interface %s"),
parameter, iface)
return None
return _PARAMETERS[parameter][0](iface, parameter)
def curr_param(iface: str, parameter: str):
"""Retrieve the current parameter value for the specified interface."""
_load_config()
# Won't read from sysctl.conf so don't need to _load_sysctl_cfg.
if parameter not in _PARAMETERS.keys():
LOGGER.error(_("requested non-existent parameter %s for interface %s"),
parameter, iface)
return None
return _PARAMETERS[parameter][1](iface, parameter)
def set_param(iface: str, parameter: str, value):
"""Set the parameter for the specified interface."""
_load_config()
_load_sysctl_cfg()
if parameter not in _PARAMETERS.keys():
LOGGER.error(
_("attempted to set non-existent parameter %s for interface %s"),
parameter, iface
)
return
# Allow creation of new interfaces from NETCONF.
if iface not in _CONFIG.keys():
_CONFIG[iface] = list()
_PARAMETERS[parameter][2](iface, parameter, value)
def unset_param(iface: str, parameter: str):
"""Unset the parameter for the specified interface."""
_load_config()
_load_sysctl_cfg()
if iface not in _CONFIG.keys():
LOGGER.warning(
_("attempted to unset parameter %s for non-existent interface %s"),
parameter, iface
)
return
if parameter not in _PARAMETERS.keys():
LOGGER.error(
_("attempted to unset non-existent parameter %s for interface %s"),
parameter, iface
)
return
_PARAMETERS[parameter][3](iface, parameter)
def list_addresses(iface: str) -> list:
"""Retrieve all configured addresses for the specified interface."""
_load_config()
if iface not in _CONFIG.keys():
LOGGER.warning(_("requested addresses for non-existent interface %s"),
iface)
return list()
return _find_many(iface, 'address')
def add_address(iface: str, _type, addr: str, prefix):
"""Add an address of the specified ``type`` to the specified interface."""
_load_config()
if iface not in _CONFIG.keys():
LOGGER.warning(
_("attempted to add address to non-existent interface %s"), iface
)
return
if _type not in (socket.AF_INET, socket.AF_INET6):
LOGGER.error(_("unknown address type %r"), _type)
return
# implement this.
raise NotImplementedError
def remove_address(iface: str, addr: str):
"""Remove an address from the specified interface."""
_load_config()
if iface not in _CONFIG.keys():
LOGGER.warning(
_("attempted to remove address from non-existent interface %s"),
iface
)
return
# implement this.
raise NotImplementedError
def running(_):
"""Non-YANG functional module only - no nodes."""
def operational(_):
"""Non-YANG functional module only - no nodes."""
def edit(*params): # pylint: disable=W0613
"""Non-YANG functional module only - no nodes."""
# Load immediately when we're loaded so we can go straight to a transaction
# if desired.
_load_config()
_load_sysctl_cfg()