""" NETCONF for APK Distributions server: ietf-interfaces module Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ import logging import pathlib import subprocess from lxml.etree import QName # pylint: disable=E0611 from netconf import error, util from ncserver.base.log import log_config_change from ncserver.base.util import _, node_operation from ncserver.util import get_nmsa, maybe_raise_on_invalid_node, \ system_boot_time LOGGER = logging.getLogger(__name__) """The object used for logging informational messages.""" M_ABI_VERSION = 1 """The ABI version of this NETCONF module.""" M_PREFIX = "if" """The XML tag prefix for this module's tags.""" M_NS = "urn:ietf:params:xml:ns:yang:ietf-interfaces" """The XML namespace for this module.""" M_NAME = "ietf-interfaces" """The YANG model name for this module.""" M_REVISION = "2018-02-20" """The YANG revision date for this module.""" M_IMPORTS = { 'ietf-yang-types@2013-07-15': { 'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang" }, 'iana-if-type@2017-01-19': { 'ns': "urn:ietf:params:xml:ns:yang:iana-if-type", 'prefix': "ianaift" } } """The imported YANG modules for this module.""" M_FEATURES = ['pre-provisioning'] """The supported features declared in YANG for this module.""" def _add_iface_contents(container, ifaces): """Retrieve the interfaces for this device.""" types = { '1': 'ethernetCsmacd', '3': 'rfc877x25', '7': 'arcnet', '8': 'arap', '19': 'atm', '24': 'ieee1394', '32': 'infiniband', '256': 'slip', '512': 'ppp', '513': 'hdlc', '516': 'lapb', '768': 'tunnel', '770': 'frameRelay', '772': 'softwareLoopback', '774': 'fddi', '804': 'ieee802154' } for ifname in ifaces: iface = util.subelm(container, 'if:interface') iface.append(util.leaf_elm('if:name', ifname.name)) type_path = ifname / "type" if type_path.exists(): type_file = open(type_path, 'r') type_num = type_file.read() type_file.close() type_num = type_num.strip() if type_num in types.keys(): iana_xmlns = "urn:ietf:params:xml:ns:yang:iana-if-type" iface.append(util.leaf_elm('if:type', 'ianaift:'+types[type_num], nsmap={'ianaift': iana_xmlns})) else: LOGGER.warning(_('unknown Linux hwtype for %s: %s'), ifname.name, type_num) def _log_iface_change(session, iface: str, info: str): """Log a change to an interface. :param session: The session performing the change. :param str iface: The interface being changed. :param str info: Information about the change. """ log_config_change(session, "[ietf-interface %s]" % iface, info) def running(node): """Retrieve the service configuration for this device.""" ifaces = util.subelm(node, 'if:interfaces') nmsa = get_nmsa() if nmsa is None: # We can't get any parameters if an NMSA module isn't loaded. return _add_iface_contents(ifaces, [ pathlib.Path('/sys/class/net/{iface}'.format(iface=iface)) for iface in nmsa.interface_list() ]) for iface in ifaces.iterchildren(): name = iface.find('{'+M_NS+'}name').text desc = nmsa.get_param(name, 'description') if desc is not None: iface.append(util.leaf_elm('if:description', desc)) enabled = nmsa.get_param(name, 'enabled') if enabled is not None: iface.append(util.leaf_elm('if:enabled', str(enabled).lower())) def _add_stats_to(iface, name: str): """Add the statistics node to +iface+.""" counter_tags = { 'rx.octets': 'if:in-octets', 'rx.discard': 'if:in-discards', 'rx.errors': 'if:in-errors', 'tx.octets': 'if:out-octets', 'tx.discard': 'if:out-discards', 'tx.errors': 'if:out-errors' } stats = util.subelm(iface, 'if:statistics') stats.append(util.leaf_elm('if:discontinuity-time', system_boot_time())) result = subprocess.run(['/sbin/ifupdown', 'ifctrstat', name], stdout=subprocess.PIPE, check=False) if result.returncode != 0: LOGGER.error(_('ifctrstat failed for %s: %s'), name, result.returncode) return counters = result.stdout.decode('utf-8').split('\n') counters.pop() for counter, value in [data.split(': ') for data in counters]: if counter not in counter_tags.keys(): LOGGER.warning(_('unhandled ifctrstat counter for %s: %s'), name, counter) continue stats.append(util.leaf_elm(counter_tags[counter], value)) def operational(node): """Retrieve the service state for this device.""" ifaces = util.subelm(node, 'if:interfaces') _add_iface_contents(ifaces, pathlib.Path('/sys/class/net').iterdir()) nmsa = get_nmsa() for iface in ifaces.iterchildren(): name = iface.find('{'+M_NS+'}name').text ifpath = pathlib.Path("/sys/class/net/" + name) if nmsa is not None: desc = nmsa.curr_param(name, 'description') if desc is not None: iface.append(util.leaf_elm('if:description', desc)) enabled = nmsa.curr_param(name, 'enabled') if enabled is not None: iface.append(util.leaf_elm('if:enabled', str(enabled).lower())) # if-index if (ifpath / "ifindex").exists(): index_file = open(ifpath / "ifindex", 'r') iface.append(util.leaf_elm('if:if-index', index_file.read().strip())) index_file.close() # oper-status status = "unknown" if (ifpath / "operstate").exists(): status_file = open(ifpath / "operstate", 'r') status = status_file.read().strip() status_file.close() iface.append(util.leaf_elm('if:oper-status', status)) # speed if status == "up" and (ifpath / "speed").exists(): try: with open(ifpath / "speed", 'r') as speed_file: speed = int(speed_file.read().strip()) speed *= 1000 iface.append(util.leaf_elm('if:speed', speed)) except IOError: pass # if the interface does not have measurable speed, omit except ValueError: LOGGER.warning("%s has non-integral speed; kernel bug?", name) # phys-address if (ifpath / "address").exists(): addr_file = open(ifpath / "address", 'r') iface.append(util.leaf_elm('if:phys-address', addr_file.read().strip())) addr_file.close() # statistics _add_stats_to(iface, name) def _edit_description(session, rpc, node, def_op, iface: str): """Edit the description for an interface.""" operation = node_operation(node, def_op) nmsa = get_nmsa() already = nmsa.get_param(iface, 'description') is not None if operation == 'create' and already: raise error.DataExistsAppError(rpc) if operation == 'delete' and not already: raise error.DataMissingAppError(rpc) if operation in ('delete', 'remove'): _log_iface_change(session, iface, "removing description") nmsa.unset_param(iface, 'description') return if operation not in ('create', 'merge', 'replace'): raise error.OperationNotSupportedAppError(rpc) _log_iface_change(session, iface, "description: " + node.text) nmsa.set_param(iface, 'description', node.text) def _edit_enabled(session, rpc, node, def_op, iface: str): """Edit the enabled property of an interface.""" operation = node_operation(node, def_op) nmsa = get_nmsa() if operation == 'create': raise error.DataExistsAppError(rpc) if operation not in ('merge', 'replace'): # You cannot delete the enabled node. raise error.OperationNotSupportedAppError(rpc) enable = (node.text == 'true') _log_iface_change(session, iface, "enabled: " + str(enable)) nmsa.set_param(iface, 'enabled', enable) def edit(session, rpc, node, def_op): # pylint: disable=R0912 """Edit the interface configuration for this device.""" methods = {'description': _edit_description, 'enabled': _edit_enabled} nmsa = get_nmsa() # We can't edit if we don't have an NMSA module loaded. # This guarantees that none of our child functions need to test for nmsa. if nmsa is None: raise error.OperationNotSupportedAppError(rpc) for interface in node: if QName(interface.tag).localname != 'interface': raise error.UnknownElementAppError(rpc, interface) name_node = interface.find('{'+M_NS+'}name') if name_node is None: raise error.MissingElementAppError(rpc, interface) iface = name_node.text operation = node_operation(interface, def_op) if operation in ('create', 'delete', 'remove'): # We are operating on the "entire" interface here, not just a # single element. if operation == 'delete' and iface not in nmsa.interface_list(): raise error.DataMissingAppError(rpc) if operation in ('delete', 'remove'): _log_iface_change(session, iface, "interface removal") nmsa.remove_interface(iface) continue _log_iface_change(session, iface, "interface creation") enable_node = interface.find('{'+M_NS+'}enabled') if enable_node is None: nmsa.set_param(iface, 'enabled', True) # Default. else: nmsa.set_param(iface, 'enabled', enable_node.text == 'true') # Don't let _edit_enabled raise an error due to 'create' op. interface.remove(enable_node) for candidate in interface: name = QName(candidate.tag).localname if name in methods: methods[name](session, rpc, candidate, operation, iface) elif name == 'name': continue else: maybe_raise_on_invalid_node(M_NS, rpc, candidate)