"""
NETCONF for APK Distributions server:
Network Management System abstraction module for ifupdown-ng
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import ipaddress
import logging
import socket
import subprocess
import yaml
from ncserver.base.util import _
LOGGER = logging.getLogger(__name__)
"""Logger used by this module for informational and error messages."""

M_ABI_VERSION = 1
"""ABI version implemented by this NETCONF module."""

M_PREFIX = "nmsa"
"""Prefix used for this module's XML tags."""

M_NS = "http://netconf.adelielinux.org/ns/netmgmt"
"""XML namespace of this module."""

M_NAME = "adelie-nms-abstract"
"""Name of the YANG model provided by this module."""

M_REVISION = "2020-11-18"
"""Revision date of the YANG model provided by this module."""

_CONFIG = {}
"""In-memory configuration: interface name -> list of option mappings."""

_TRANSACTION = False
"""True while a transaction is in progress."""
def _load_config():
    """Load the current active configuration by running ifparse.

    Runs ``/sbin/ifparse -AF yaml-raw`` and replaces the module-level
    ``_CONFIG`` with the parsed YAML output.  Nothing is loaded while a
    transaction is in progress, so pending in-memory changes are kept.

    On failure (ifparse cannot be executed, or exits non-zero) an error is
    logged and ``_CONFIG`` is left untouched.
    """
    global _CONFIG  # pylint: disable=W0603

    # Won't load during a transaction.
    if _TRANSACTION:
        return

    try:
        result = subprocess.run(['/sbin/ifparse', '-AF', 'yaml-raw'],
                                stdout=subprocess.PIPE, check=False)
    except OSError:
        LOGGER.error(_("ifupdown-ng may not be installed properly"))
        return

    if result.returncode != 0:
        LOGGER.error(_("ifparse returned error %d"), result.returncode)
        return

    rawyaml = result.stdout.decode('utf-8')
    # safe_load() yields None for an empty document; fall back to an empty
    # mapping so that later _CONFIG.keys() lookups keep working.
    _CONFIG = yaml.safe_load(rawyaml) or dict()
def _save():
    """Save changes to the configuration.

    Renders every interface held in ``_CONFIG`` as an ``iface`` stanza and
    overwrites ``/etc/network/interfaces`` with the result.  A true ``auto``
    option is emitted as an ``auto <iface>`` line ahead of the stanza rather
    than as an option inside it.  Boolean values are written lower-cased.
    """
    stanzas = []
    for iface, items in _CONFIG.items():
        buf = "iface " + iface + "\n"
        # Tolerate an interface entry with no options at all.
        for item in items or ():
            for key, val in item.items():
                # Only strings are inspected for a leading 't'; any other
                # non-True value (e.g. a boolean False) must not be treated
                # as a string, which previously raised AttributeError.
                if key == 'auto' and (
                        val is True or
                        (isinstance(val, str) and
                         val.lower().startswith('t'))):
                    buf = "auto " + iface + "\n" + buf
                    continue

                if isinstance(val, bool):
                    val = str(val).lower()

                buf += " "
                buf += str(key) + " " + str(val)
                buf += "\n"
        stanzas.append(buf)

    # Stanzas are separated by a single blank line, with no trailing blank
    # line after the last one.
    eni = "\n".join(stanzas)

    # The configuration is decoded as UTF-8 when loaded, so write it back
    # with the same encoding.
    with open('/etc/network/interfaces', 'w', encoding='utf-8') as conf_file:
        conf_file.write(eni)
def _find_one(iface: str, key: str):
    """Return the first value stored under +key+ for +iface+.

    Returns None when the interface is unknown or has no such key.
    """
    if iface in _CONFIG:
        matches = (entry[key] for entry in _CONFIG[iface] if key in entry)
        return next(matches, None)

    return None
def _find_many(iface: str, key: str) -> list:
    """Find n instances of configuration +key+ for +iface+.

    Returns a list holding every value stored under +key+, in configuration
    order.  The list is empty when the interface is unknown or has no such
    key, so the result is always a list as the annotation states.
    """
    if iface not in _CONFIG:
        return []

    return [item[key] for item in _CONFIG[iface] if key in item]
def interface_list():
    """Return the names of all configured interfaces as a tuple."""
    _load_config()
    names = tuple(_CONFIG)
    return names
def remove_interface(iface: str):
    """Completely remove configuration for +iface+.

    The configuration is refreshed first (a no-op during a transaction), as
    the other public functions of this module do, so that the file is not
    rewritten from a stale in-memory snapshot.  Outside of a transaction the
    result is saved immediately; inside one it is saved on commit.
    """
    _load_config()

    if iface in _CONFIG:
        del _CONFIG[iface]

    if not _TRANSACTION:
        _save()
def begin_transaction():
    """Open a transaction; log an error if one is already open."""
    global _TRANSACTION  # pylint: disable=W0603

    if not _TRANSACTION:
        _TRANSACTION = True
        return

    # Transactions cannot be nested; the existing one stays open.
    LOGGER.error(_("attempt to nest transactions"))
def commit():
    """Write out pending changes and close the current transaction.

    A warning is logged when no transaction is open; the configuration is
    saved regardless.
    """
    global _TRANSACTION  # pylint: disable=W0603

    in_progress = _TRANSACTION
    if not in_progress:
        LOGGER.warning(_("commit when no transaction is in progress"))

    _save()
    # Only reached when the save did not raise.
    _TRANSACTION = False
def get_param(iface: str, parameter: str):
    """Look up +parameter+ on the given interface.

    Returns None (after logging a warning) when the interface is not
    configured.  Lookup on an existing interface is not implemented yet and
    raises NotImplementedError.
    """
    _load_config()

    if iface in _CONFIG:
        # implement this.
        raise NotImplementedError

    LOGGER.warning(
        _("requested parameter %s for non-existent interface %s"),
        parameter, iface
    )
    return None
def set_param(iface: str, parameter: str, value):
    """Assign +value+ to +parameter+ on the given interface.

    An unknown interface gets an empty configuration entry first.  The
    assignment itself is not implemented yet and raises NotImplementedError.
    """
    _load_config()

    # Allow creation of new interfaces from NETCONF.
    _CONFIG.setdefault(iface, list())

    # implement this.
    raise NotImplementedError
def unset_param(iface: str, parameter: str):
    """Clear +parameter+ on the given interface.

    Logs a warning and returns when the interface is not configured.
    Clearing on an existing interface is not implemented yet and raises
    NotImplementedError.
    """
    _load_config()

    if iface in _CONFIG:
        # implement this.
        raise NotImplementedError

    LOGGER.warning(
        _("attempted to unset parameter %s for non-existent interface %s"),
        parameter, iface
    )
def list_addresses(iface: str) -> list:
    """Retrieve all configured addresses for the specified interface.

    Each address is returned in ``address/prefix`` form.  An address
    configured without a prefix gets one derived from the interface's
    ``netmask`` option, or ``/24`` when no netmask is configured.

    Returns an empty list (after logging a warning) when the interface is
    not configured.  Raises ValueError if the configured netmask is not a
    valid IPv4 netmask or prefix length.
    """
    _load_config()

    if iface not in _CONFIG:
        LOGGER.warning(_("requested addresses for non-existent interface %s"),
                       iface)
        return list()

    # Per comment in lif_address_format_cidr, this is the right thing to do.
    fallback_prefix = "24"

    netmask = _find_one(iface, 'netmask')
    if netmask:
        # YAML may hand back a bare prefix length (e.g. 24) as an int, so
        # coerce to str before building the network string.
        net = ipaddress.IPv4Network('0.0.0.0/' + str(netmask))
        fallback_prefix = str(net.prefixlen)

    def fixup(addr):
        """Append the fallback prefix to an address that lacks one."""
        # NOTE(review): the fallback is an IPv4 prefix and is applied to
        # every prefix-less address - confirm this is intended for IPv6.
        if '/' not in addr:
            addr = addr + "/" + fallback_prefix
        return addr

    return [fixup(addr) for addr in _find_many(iface, 'address')]
def add_address(iface: str, _type, addr: str, prefix):
    """Attach an address of family ``_type`` to the given interface.

    Logs a warning and returns when the interface is not configured; logs
    an error and returns when ``_type`` is neither ``socket.AF_INET`` nor
    ``socket.AF_INET6``.  The addition itself is not implemented yet and
    raises NotImplementedError.
    """
    _load_config()

    if iface in _CONFIG:
        if _type in (socket.AF_INET, socket.AF_INET6):
            # implement this.
            raise NotImplementedError

        LOGGER.error(_("unknown address type %r"), _type)
        return

    LOGGER.warning(
        _("attempted to add address to non-existent interface %s"), iface
    )
def remove_address(iface: str, addr: str):
    """Detach an address from the given interface.

    Logs a warning and returns when the interface is not configured.
    Removal from an existing interface is not implemented yet and raises
    NotImplementedError.
    """
    _load_config()

    if iface in _CONFIG:
        # implement this.
        raise NotImplementedError

    LOGGER.warning(
        _("attempted to remove address from non-existent interface %s"),
        iface
    )
# Load the configuration as soon as this module is imported, so a caller can
# begin a transaction straight away: loading is skipped while a transaction
# is in progress.
_load_config()