""" NETCONF for APK Distributions server: ietf-interfaces module Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ import logging import pathlib import subprocess from lxml import etree from netconf import util from ncserver.base.util import _, yang_dt_for_timestamp QName = etree.QName # pylint: disable=I1101 LOGGER = logging.getLogger(__name__) """The object used for logging informational messages.""" M_ABI_VERSION = 1 """The ABI version of this NETCONF module.""" M_PREFIX = "if" """The XML tag prefix for this module's tags.""" M_NS = "urn:ietf:params:xml:ns:yang:ietf-interfaces" """The XML namespace for this module.""" M_NAME = "ietf-interfaces" """The YANG model name for this module.""" M_REVISION = "2018-02-20" """The YANG revision date for this module.""" M_IMPORTS = { 'ietf-yang-types@2013-07-15': { 'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang" }, 'iana-if-type@2017-01-19': { 'ns': "urn:ietf:params:xml:ns:yang:iana-if-type", 'prefix': "ianaift" } } """The imported YANG modules for this module.""" M_FEATURES = ['pre-provisioning'] """The supported features declared in YANG for this module.""" def _add_running_contents(ifaces): """Retrieve the interface configuration for this device. Allows returning the 'config true' data for both datastores.""" types = { '1': 'ethernetCsmacd', '3': 'rfc877x25', '7': 'arcnet', '8': 'arap', '19': 'atm', '24': 'ieee1394', '32': 'infiniband', '256': 'slip', '512': 'ppp', '513': 'hdlc', '516': 'lapb', '768': 'tunnel', '770': 'frameRelay', '772': 'softwareLoopback', '774': 'fddi', '804': 'ieee802154' } for ifname in pathlib.Path('/sys/class/net').iterdir(): iface = util.subelm(ifaces, 'if:interface') iface.append(util.leaf_elm('if:name', ifname.name)) type_file = open(ifname / "type", 'r') type_num = type_file.read() type_file.close() type_num = type_num.strip() if type_num in types.keys(): iana_xmlns = "urn:ietf:params:xml:ns:yang:iana-if-type" iface.append(util.leaf_elm('if:type', 'ianaift:'+types[type_num], nsmap={'ianaift': iana_xmlns})) else: LOGGER.warning(_('unknown Linux hwtype for %s: %s'), ifname.name, type_num) def running(node): """Retrieve the service configuration for this device.""" ifaces = util.subelm(node, 'if:interfaces') _add_running_contents(ifaces) def _add_stats_to(iface, name: str): """Add the statistics node to +iface+.""" counter_tags = { 'rx.octets': 'if:in-octets', 'rx.discard': 'if:in-discards', 'rx.errors': 'if:in-errors', 'tx.octets': 'if:out-octets', 'tx.discard': 'if:out-discards', 'tx.errors': 'if:out-errors' } stats = util.subelm(iface, 'if:statistics') # XXX BAD vvv stats.append(util.leaf_elm('if:discontinuity-time', '2020-01-01T01:01:01.011Z')) # XXX BAD ^^^ result = subprocess.run(['/sbin/ifupdown', 'ifctrstat', name], stdout=subprocess.PIPE, check=False) if result.returncode != 0: LOGGER.error(_('ifctrstat failed for %s: %s'), name, result.returncode) return counters = result.stdout.decode('utf-8').split('\n') counters.pop() for counter, value in [data.split(': ') for data in counters]: if counter not in counter_tags.keys(): LOGGER.warning(_('unhandled ifctrstat counter for %s: %s'), name, counter) continue stats.append(util.leaf_elm(counter_tags[counter], value)) def operational(node): """Retrieve the service state for this device.""" ifaces = util.subelm(node, 'if:interfaces') _add_running_contents(ifaces) for iface in ifaces.iterchildren(): name = iface.find('{'+M_NS+'}name').text ifpath = pathlib.Path("/sys/class/net/" + name) # if-index if (ifpath / "ifindex").exists(): index_file = open(ifpath / "ifindex", 'r') iface.append(util.leaf_elm('if:if-index', index_file.read().strip())) index_file.close() # oper-status status = "unknown" if (ifpath / "operstate").exists(): status_file = open(ifpath / "operstate", 'r') status = status_file.read().strip() status_file.close() iface.append(util.leaf_elm('if:oper-status', status)) # speed if status == "up" and (ifpath / "speed").exists(): try: with open(ifpath / "speed", 'r') as speed_file: speed = int(speed_file.read().strip()) speed *= 1000 iface.append(util.leaf_elm('if:speed', speed)) except IOError: pass # if the interface does not have measurable speed, omit except ValueError: LOGGER.warning("%s has non-integral speed; kernel bug?", name) # phys-address if (ifpath / "address").exists(): addr_file = open(ifpath / "address", 'r') iface.append(util.leaf_elm('if:phys-address', addr_file.read().strip())) addr_file.close() # statistics _add_stats_to(iface, name) def edit(session, rpc, node, def_op): """Edit the interface configuration for this device."""