"""
NETCONF for APK Distributions server:
ietf-interfaces module
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import logging
import pathlib
import subprocess
from lxml import etree
from netconf import error, util
from ncserver.base.log import log_config_change
from ncserver.base.modman import MODMAN
from ncserver.base.util import _, node_operation, yang_dt_for_timestamp
from ncserver.util import maybe_raise_on_invalid_node
QName = etree.QName # pylint: disable=I1101
LOGGER = logging.getLogger(__name__)
"""The object used for logging informational messages."""
M_ABI_VERSION = 1
"""The ABI version of this NETCONF module."""
M_PREFIX = "if"
"""The XML tag prefix for this module's tags."""
M_NS = "urn:ietf:params:xml:ns:yang:ietf-interfaces"
"""The XML namespace for this module."""
M_NAME = "ietf-interfaces"
"""The YANG model name for this module."""
M_REVISION = "2018-02-20"
"""The YANG revision date for this module."""
M_IMPORTS = {
'ietf-yang-types@2013-07-15': {
'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang"
},
'iana-if-type@2017-01-19': {
'ns': "urn:ietf:params:xml:ns:yang:iana-if-type", 'prefix': "ianaift"
}
}
"""The imported YANG modules for this module."""
M_FEATURES = ['pre-provisioning']
"""The supported features declared in YANG for this module."""
def _add_iface_contents(container, ifaces):
"""Retrieve the interfaces for this device."""
types = {
'1': 'ethernetCsmacd',
'3': 'rfc877x25',
'7': 'arcnet',
'8': 'arap',
'19': 'atm',
'24': 'ieee1394',
'32': 'infiniband',
'256': 'slip',
'512': 'ppp',
'513': 'hdlc',
'516': 'lapb',
'768': 'tunnel',
'770': 'frameRelay',
'772': 'softwareLoopback',
'774': 'fddi',
'804': 'ieee802154'
}
for ifname in ifaces:
iface = util.subelm(container, 'if:interface')
iface.append(util.leaf_elm('if:name', ifname.name))
type_path = ifname / "type"
if type_path.exists():
type_file = open(type_path, 'r')
type_num = type_file.read()
type_file.close()
type_num = type_num.strip()
if type_num in types.keys():
iana_xmlns = "urn:ietf:params:xml:ns:yang:iana-if-type"
iface.append(util.leaf_elm('if:type',
'ianaift:'+types[type_num],
nsmap={'ianaift': iana_xmlns}))
else:
LOGGER.warning(_('unknown Linux hwtype for %s: %s'),
ifname.name, type_num)
def _get_nmsa():
"""Retrieve our NMSA module handle."""
nmsa_ns = "http://netconf.adelielinux.org/ns/netmgmt"
nmsa = MODMAN._module_for_ns(nmsa_ns) # pylint: disable=W0212
return nmsa
def _log_iface_change(session, iface: str, info: str):
"""Log a change to an interface.
:param session:
The session performing the change.
:param str iface:
The interface being changed.
:param str info:
Information about the change.
"""
log_config_change(session, "[ietf-interface %s]" % iface, info)
def running(node):
"""Retrieve the service configuration for this device."""
ifaces = util.subelm(node, 'if:interfaces')
nmsa = _get_nmsa()
if nmsa is None:
# We can't get any parameters if an NMSA module isn't loaded.
return
_add_iface_contents(ifaces, [
pathlib.Path('/sys/class/net/{iface}'.format(iface=iface))
for iface in nmsa.interface_list()
])
for iface in ifaces.iterchildren():
name = iface.find('{'+M_NS+'}name').text
desc = nmsa.get_param(name, 'description')
if desc is not None:
iface.append(util.leaf_elm('if:description', desc))
enabled = nmsa.get_param(name, 'enabled')
if enabled is not None:
iface.append(util.leaf_elm('if:enabled', str(enabled).lower()))
def _add_stats_to(iface, name: str):
"""Add the statistics node to +iface+."""
counter_tags = {
'rx.octets': 'if:in-octets',
'rx.discard': 'if:in-discards',
'rx.errors': 'if:in-errors',
'tx.octets': 'if:out-octets',
'tx.discard': 'if:out-discards',
'tx.errors': 'if:out-errors'
}
stats = util.subelm(iface, 'if:statistics')
# XXX BAD vvv
stats.append(util.leaf_elm('if:discontinuity-time',
'2020-01-01T01:01:01.011Z'))
# XXX BAD ^^^
result = subprocess.run(['/sbin/ifupdown', 'ifctrstat', name],
stdout=subprocess.PIPE, check=False)
if result.returncode != 0:
LOGGER.error(_('ifctrstat failed for %s: %s'),
name, result.returncode)
return
counters = result.stdout.decode('utf-8').split('\n')
counters.pop()
for counter, value in [data.split(': ') for data in counters]:
if counter not in counter_tags.keys():
LOGGER.warning(_('unhandled ifctrstat counter for %s: %s'),
name, counter)
continue
stats.append(util.leaf_elm(counter_tags[counter], value))
def operational(node):
"""Retrieve the service state for this device."""
ifaces = util.subelm(node, 'if:interfaces')
_add_iface_contents(ifaces, pathlib.Path('/sys/class/net').iterdir())
nmsa = _get_nmsa()
for iface in ifaces.iterchildren():
name = iface.find('{'+M_NS+'}name').text
ifpath = pathlib.Path("/sys/class/net/" + name)
if nmsa is not None:
desc = nmsa.curr_param(name, 'description')
if desc is not None:
iface.append(util.leaf_elm('if:description', desc))
enabled = nmsa.curr_param(name, 'enabled')
if enabled is not None:
iface.append(util.leaf_elm('if:enabled', str(enabled).lower()))
# if-index
if (ifpath / "ifindex").exists():
index_file = open(ifpath / "ifindex", 'r')
iface.append(util.leaf_elm('if:if-index',
index_file.read().strip()))
index_file.close()
# oper-status
status = "unknown"
if (ifpath / "operstate").exists():
status_file = open(ifpath / "operstate", 'r')
status = status_file.read().strip()
status_file.close()
iface.append(util.leaf_elm('if:oper-status', status))
# speed
if status == "up" and (ifpath / "speed").exists():
try:
with open(ifpath / "speed", 'r') as speed_file:
speed = int(speed_file.read().strip())
speed *= 1000
iface.append(util.leaf_elm('if:speed', speed))
except IOError:
pass # if the interface does not have measurable speed, omit
except ValueError:
LOGGER.warning("%s has non-integral speed; kernel bug?", name)
# phys-address
if (ifpath / "address").exists():
addr_file = open(ifpath / "address", 'r')
iface.append(util.leaf_elm('if:phys-address',
addr_file.read().strip()))
addr_file.close()
# statistics
_add_stats_to(iface, name)
def _edit_description(session, rpc, node, def_op, iface: str):
"""Edit the description for an interface."""
operation = node_operation(node, def_op)
nmsa = _get_nmsa()
already = nmsa.get_param(iface, 'description') is not None
if operation == 'create' and already:
raise error.DataExistsAppError(rpc)
if operation == 'delete' and not already:
raise error.DataMissingAppError(rpc)
if operation in ('delete', 'remove'):
_log_iface_change(session, iface, "removing description")
nmsa.unset_param(iface, 'description')
return
if operation not in ('create', 'merge', 'replace'):
raise error.OperationNotSupportedAppError(rpc)
_log_iface_change(session, iface, "description: " + node.text)
nmsa.set_param(iface, 'description', node.text)
def _edit_enabled(session, rpc, node, def_op, iface: str):
"""Edit the enabled property of an interface."""
operation = node_operation(node, def_op)
nmsa = _get_nmsa()
if operation == 'create':
raise error.DataExistsAppError(rpc)
if operation not in ('merge', 'replace'):
# You cannot delete the enabled node.
raise error.OperationNotSupportedAppError(rpc)
enable = (node.text == 'true')
_log_iface_change(session, iface, "enabled: " + str(enable))
nmsa.set_param(iface, 'enabled', enable)
def edit(session, rpc, node, def_op):
"""Edit the interface configuration for this device."""
methods = {'description': _edit_description, 'enabled': _edit_enabled}
nmsa = _get_nmsa()
# We can't edit if we don't have an NMSA module loaded.
# This guarantees that none of our child functions need to test for nmsa.
if nmsa is None:
raise error.OperationNotSupportedAppError(rpc)
for interface in node:
if QName(interface.tag).localname != 'interface':
raise error.UnknownElementAppError(rpc, interface)
name_node = interface.find('{'+M_NS+'}name')
if name_node is None:
raise error.MissingElementAppError(rpc, interface)
iface = name_node.text
operation = node_operation(interface, def_op)
if operation in ('create', 'delete', 'remove'):
# We are operating on the "entire" interface here, not just a
# single element.
if operation == 'delete' and iface not in nmsa.interface_list():
raise error.DataMissingAppError(rpc)
if operation in ('delete', 'remove'):
_log_iface_change(session, iface, "interface removal")
nmsa.remove_interface(iface)
continue
_log_iface_change(session, iface, "interface creation")
enable_node = interface.find('{'+M_NS+'}enabled')
if enable_node is None:
nmsa.set_param(iface, 'enabled', True) # Default.
else:
nmsa.set_param(iface, 'enabled', enable_node.text == 'true')
# Don't let _edit_enabled raise an error due to 'create' op.
interface.remove(enable_node)
for candidate in interface:
name = QName(candidate.tag).localname
if name in methods:
methods[name](session, rpc, candidate, operation, iface)
elif name == 'name':
continue
else:
maybe_raise_on_invalid_node(M_NS, rpc, candidate)