"""
NETCONF for APK Distributions server:
Network Management System abstraction module for ifupdown-ng
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import ipaddress
import logging
import socket
import subprocess
import yaml
from ncserver.base.util import _
LOGGER = logging.getLogger(__name__)
"""Module-level logger used for diagnostic and informational messages."""

M_ABI_VERSION = 1
"""ABI version implemented by this NETCONF module."""

M_PREFIX = "nmsa"
"""XML tag prefix used for this module's tags."""

M_NS = "http://netconf.adelielinux.org/ns/netmgmt"
"""XML namespace of this module."""

M_NAME = "adelie-nms-abstract"
"""Name of the YANG model for this module."""

M_REVISION = "2020-11-18"
"""Revision date of the YANG model for this module."""

_CONFIG = {}
"""In-memory configuration handle, keyed by interface name."""

_TRANSACTION = False
"""True while a transaction is in progress."""
#############################
# I N T E R N A L #
#############################
def _load_config():
    """Load the current active configuration as reported by ifparse.

    Runs ``/sbin/ifparse -AF yaml-raw`` and replaces the module-level
    ``_CONFIG`` with the parsed YAML document.  Nothing is loaded while a
    transaction is in progress, so pending in-memory changes are not lost.

    If ifparse cannot be executed or exits with a non-zero status, an error
    is logged and the previously loaded configuration is left untouched.
    """
    global _CONFIG  # pylint: disable=W0603

    # Won't load during a transaction.
    if _TRANSACTION:
        return

    try:
        result = subprocess.run(['/sbin/ifparse', '-AF', 'yaml-raw'],
                                stdout=subprocess.PIPE, check=False)
    except OSError:
        LOGGER.error(_("ifupdown-ng may not be installed properly"))
        return

    if result.returncode != 0:
        LOGGER.error(_("ifparse returned error %d"), result.returncode)
        return

    rawyaml = result.stdout.decode('utf-8')
    # safe_load() yields None for an empty document (no interfaces at all).
    # Every other function in this module treats _CONFIG as a mapping, so
    # normalise that case to an empty dict instead of storing None.
    _CONFIG = yaml.safe_load(rawyaml) or {}
def _save_unlocked():
    """Write the in-memory configuration to /etc/network/interfaces.

    Each interface in ``_CONFIG`` is rendered as an ``iface <name>`` stanza
    followed by one ``<key> <value>`` line per configured item.  A truthy
    ``auto`` item is not written inside the stanza; it is emitted as an
    ``auto <name>`` line in front of it instead.

    This function does not check whether a transaction is in progress.
    """
    stanzas = []
    for iface, items in _CONFIG.items():
        auto_lines = []
        body = ["iface " + iface + "\n"]
        for item in items:
            for key, val in item.items():
                # Values may be real booleans or other scalars rather than
                # strings, so stringify before testing; this also copes
                # with an empty string, which has no first character.
                if str(key).casefold() == 'auto' and \
                        str(val).lower().startswith('t'):
                    auto_lines.append("auto " + iface + "\n")
                    continue
                if isinstance(val, bool):
                    val = str(val).lower()
                body.append(" " + str(key) + " " + str(val) + "\n")
        # Stanzas are separated by a blank line.
        stanzas.append("".join(auto_lines + body) + "\n")

    eni = "".join(stanzas)
    # The configuration is decoded as UTF-8 when loaded; write it back the
    # same way instead of relying on the locale's default encoding.
    with open('/etc/network/interfaces', 'w', encoding='utf-8') as conf_file:
        # snip the trailing blank line off
        conf_file.write(eni[:-1])
def _save():
    """Persist the configuration unless a transaction is in progress."""
    if not _TRANSACTION:
        _save_unlocked()
def _find_one(iface: str, key: str):
    """Return the first value stored under +key+ for +iface+.

    Key comparison is case-insensitive.  Returns None when the interface is
    unknown or has no such key.
    """
    if iface in _CONFIG.keys():
        for entry in _CONFIG[iface]:
            for name, value in entry.items():
                if key.casefold() == name.casefold():
                    return value
    return None
def _find_many(iface: str, key: str) -> list:
    """Find n instances of configuration +key+ for +iface+.

    Key comparison is case-insensitive.  Returns the matching values in
    configuration order; the list is empty when the interface is unknown or
    has no such key, so callers can always iterate the result.
    """
    entries = _CONFIG.get(iface)
    if entries is None:
        # Honour the declared return type instead of handing back None.
        return []

    wanted = key.casefold()
    return [
        item[candidate]
        for item in entries
        for candidate in item
        if wanted == candidate.casefold()
    ]
def _replace_one(iface: str, key: str, value: str):
    """Set +key+ to +value+ for +iface+, then save.

    The first existing item whose key matches case-insensitively is
    overwritten in place; if there is none, a new item is appended.
    """
    entries = _CONFIG[iface]
    hit = next(
        ((entry, name)
         for entry in entries
         for name in entry
         if key.casefold() == name.casefold()),
        None,
    )
    if hit is None:
        entries.append({key: value})
    else:
        entry, name = hit
        entry[name] = value
    _save()
def _remove_one(iface: str, key: str):
    """Drop the first item of +iface+ whose key matches +key+, then save.

    Key comparison is case-insensitive.  Nothing is saved when no item
    matches.
    """
    entries = _CONFIG[iface]
    doomed = next(
        (entry for entry in entries
         if any(key.casefold() == name.casefold() for name in entry)),
        None,
    )
    if doomed is not None:
        entries.remove(doomed)
        _save()
#############################
# P A R A M E T E R S #
#############################
def get_desc(iface: str):
    """Return the stored description of +iface+, or None if there is none."""
    return _find_one(iface, key='netconf-description')
def set_desc(iface: str, value: str):
    """Store +value+ as the description of +iface+."""
    _replace_one(iface, key='netconf-description', value=value)
def unset_desc(iface: str):
    """Remove the stored description of +iface+, if any."""
    _remove_one(iface, key='netconf-description')
_PARAMETERS = {
    # parameter name -> (getter, setter, unsetter)
    "description": (get_desc, set_desc, unset_desc),
}
"""Maps each supported parameter name to its accessor functions."""
#############################
# P U B L I C A P I #
#############################
def interface_list():
    """Reload the configuration and return the interface names as a tuple."""
    _load_config()
    return tuple(_CONFIG)
def remove_interface(iface: str):
    """Completely remove configuration for +iface+.

    Removing an interface that is not configured is not an error.  The
    result is written out immediately unless a transaction is in progress.
    """
    # Refresh first, like every other public entry point, so the removal is
    # applied to the current on-disk state rather than a stale snapshot.
    # This is a no-op while a transaction is in progress.
    _load_config()
    _CONFIG.pop(iface, None)
    # _save() does nothing during a transaction, so no extra check is
    # needed here.
    _save()
def begin_transaction():
    """Start a transaction; log an error if one is already in progress."""
    global _TRANSACTION  # pylint: disable=W0603
    if not _TRANSACTION:
        _TRANSACTION = True
        return
    LOGGER.error(_("attempt to nest transactions"))
def commit():
    """Write out the configuration and end the current transaction.

    Calling this with no transaction in progress logs a warning; the
    configuration is still written.
    """
    global _TRANSACTION  # pylint: disable=W0603
    no_transaction = not _TRANSACTION
    if no_transaction:
        LOGGER.warning(_("commit when no transaction is in progress"))
    _save_unlocked()
    _TRANSACTION = False
def get_param(iface: str, parameter: str):
    """Return the value of +parameter+ for +iface+.

    Returns None, after logging, when either the interface or the
    parameter is unknown.
    """
    _load_config()

    if iface not in _CONFIG:
        LOGGER.warning(
            _("requested parameter %s for non-existent interface %s"),
            parameter, iface
        )
        return None

    handlers = _PARAMETERS.get(parameter)
    if handlers is None:
        LOGGER.error(_("requested non-existent parameter %s for interface %s"),
                     parameter, iface)
        return None

    getter = handlers[0]
    return getter(iface)
def set_param(iface: str, parameter: str, value):
    """Set +parameter+ to +value+ for +iface+.

    An unknown parameter is logged and ignored.  An interface that is not
    configured yet is created.
    """
    _load_config()

    handlers = _PARAMETERS.get(parameter)
    if handlers is None:
        LOGGER.error(
            _("attempted to set non-existent parameter %s for interface %s"),
            parameter, iface
        )
        return

    # Allow creation of new interfaces from NETCONF.
    _CONFIG.setdefault(iface, [])

    setter = handlers[1]
    setter(iface, value)
def unset_param(iface: str, parameter: str):
    """Remove +parameter+ from +iface+.

    An unknown interface or parameter is logged and otherwise ignored.
    """
    _load_config()

    if iface not in _CONFIG:
        LOGGER.warning(
            _("attempted to unset parameter %s for non-existent interface %s"),
            parameter, iface
        )
        return

    handlers = _PARAMETERS.get(parameter)
    if handlers is None:
        LOGGER.error(
            _("attempted to unset non-existent parameter %s for interface %s"),
            parameter, iface
        )
        return

    unsetter = handlers[2]
    unsetter(iface)
def list_addresses(iface: str) -> list:
    """Retrieve all configured addresses for the specified interface.

    Every address is returned in ``address/prefix`` form.  An address
    configured without a prefix gets the prefix length derived from the
    interface's ``netmask`` item, or 24 when no netmask is configured.

    Returns an empty list, after logging a warning, when the interface is
    not configured.  Raises ValueError if the configured netmask is not a
    valid IPv4 netmask or prefix length.
    """
    _load_config()

    if iface not in _CONFIG:
        LOGGER.warning(_("requested addresses for non-existent interface %s"),
                       iface)
        return []

    # Per comment in lif_address_format_cidr, this is the right thing to do.
    fallback_prefix = "24"
    netmask = _find_one(iface, 'netmask')
    if netmask:
        # The YAML loader may hand back an int (e.g. "netmask: 24"), so
        # stringify before building the network specification.
        net = ipaddress.IPv4Network('0.0.0.0/' + str(netmask))
        fallback_prefix = str(net.prefixlen)

    result = []
    for addr in _find_many(iface, 'address') or []:
        addr = str(addr)
        if '/' not in addr:
            addr = addr + "/" + fallback_prefix
        result.append(addr)
    return result
def add_address(iface: str, _type, addr: str, prefix):
    """Add an address of family ``_type`` to the specified interface.

    Not implemented yet: after validating the interface and the address
    family this always raises NotImplementedError.
    """
    _load_config()

    known_iface = iface in _CONFIG
    if not known_iface:
        LOGGER.warning(
            _("attempted to add address to non-existent interface %s"), iface
        )
        return

    supported_families = (socket.AF_INET, socket.AF_INET6)
    if _type not in supported_families:
        LOGGER.error(_("unknown address type %r"), _type)
        return

    # implement this.
    raise NotImplementedError
def remove_address(iface: str, addr: str):
    """Remove an address from the specified interface.

    Not implemented yet: for a configured interface this always raises
    NotImplementedError.
    """
    _load_config()

    known_iface = iface in _CONFIG
    if not known_iface:
        LOGGER.warning(
            _("attempted to remove address from non-existent interface %s"),
            iface
        )
        return

    # implement this.
    raise NotImplementedError
# Populate _CONFIG as soon as the module is imported.  _load_config() does
# nothing while a transaction is in progress, so loading here lets a caller
# go straight to a transaction with the current configuration in memory.
_load_config()