"""
NETCONF for APK Distributions server:
ietf-interfaces module
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import logging
import pathlib
import subprocess
from lxml import etree
from netconf import util
from ncserver.base.util import _, yang_dt_for_timestamp
# Module-level alias for lxml's qualified-name class.
QName = etree.QName # pylint: disable=I1101

LOGGER = logging.getLogger(__name__)
"""The object used for logging informational messages."""

# --- Module metadata constants -------------------------------------------

M_ABI_VERSION = 1
"""The ABI version of this NETCONF module."""

M_PREFIX = "if"
"""The XML tag prefix for this module's tags."""

M_NS = "urn:ietf:params:xml:ns:yang:ietf-interfaces"
"""The XML namespace for this module."""

M_NAME = "ietf-interfaces"
"""The YANG model name for this module."""

M_REVISION = "2018-02-20"
"""The YANG revision date for this module."""

# Keys have the form 'module-name@revision'; each value records the XML
# namespace ('ns') and the tag prefix ('prefix') of that imported module.
M_IMPORTS = {
    'ietf-yang-types@2013-07-15': {
        'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang"
    },
    'iana-if-type@2017-01-19': {
        'ns': "urn:ietf:params:xml:ns:yang:iana-if-type", 'prefix': "ianaift"
    }
}
"""The imported YANG modules for this module."""

M_FEATURES = ['pre-provisioning']
"""The supported features declared in YANG for this module."""
def _add_running_contents(ifaces):
    """Retrieve the interface configuration for this device.

    Appends one ``if:interface`` child to +ifaces+ for every network device
    found under /sys/class/net.  Each child carries an ``if:name`` leaf and,
    when the Linux hardware type number is recognised, an ``if:type`` leaf
    holding the matching ``ianaift:`` identity.  Devices with an unrecognised
    hardware type are still listed, but without ``if:type``; a warning is
    logged for them.

    Allows returning the 'config true' data for both datastores.

    :param ifaces: The ``if:interfaces`` element to populate.
    """
    # Linux hardware type number (contents of the sysfs 'type' file, as a
    # string) -> iana-if-type identity name.
    hw_types = {
        '1': 'ethernetCsmacd',
        '3': 'rfc877x25',
        '7': 'arcnet',
        '8': 'arap',
        '19': 'atm',
        '24': 'ieee1394',
        '32': 'infiniband',
        '256': 'slip',
        '512': 'ppp',
        '513': 'hdlc',
        '516': 'lapb',
        '768': 'tunnel',
        '770': 'frameRelay',
        '772': 'softwareLoopback',
        '774': 'fddi',
        '804': 'ieee802154',
    }
    iana_xmlns = "urn:ietf:params:xml:ns:yang:iana-if-type"

    for ifname in pathlib.Path('/sys/class/net').iterdir():
        # /sys/class/net can also contain plain control files (for example
        # 'bonding_masters' when the bonding driver is loaded).  Those are
        # not interfaces and have no 'type' attribute, so skip them instead
        # of failing on the read below.
        if not ifname.is_dir():
            continue

        iface = util.subelm(ifaces, 'if:interface')
        iface.append(util.leaf_elm('if:name', ifname.name))

        # read_text() opens and closes the file for us, so the handle is
        # released even if the read itself fails.
        type_num = (ifname / "type").read_text().strip()

        iana_type = hw_types.get(type_num)
        if iana_type is None:
            LOGGER.warning(_('unknown Linux hwtype for %s: %s'),
                           ifname.name, type_num)
            continue

        iface.append(util.leaf_elm('if:type', 'ianaift:' + iana_type,
                                   nsmap={'ianaift': iana_xmlns}))
def running(node):
    """Retrieve the interface configuration for this device.

    Creates the ``if:interfaces`` container under +node+ and fills it with
    the configuration data of every interface.
    """
    _add_running_contents(util.subelm(node, 'if:interfaces'))
def _add_stats_to(iface, name: str):
    """Add the statistics node to +iface+.

    Runs ``/sbin/ifupdown ifctrstat <name>`` and converts each
    ``counter: value`` line of its output into the matching ``if:statistics``
    leaf.  Counters without a known mapping, and lines that are not of the
    ``counter: value`` form, are logged and skipped.  If the command exits
    with a non-zero status, an error is logged and only the
    ``if:discontinuity-time`` leaf is present.

    :param iface: The ``if:interface`` element to add statistics to.
    :param name: The name of the interface to query.
    """
    # ifctrstat counter name -> ietf-interfaces statistics leaf.
    counter_tags = {
        'rx.octets': 'if:in-octets',
        'rx.discard': 'if:in-discards',
        'rx.errors': 'if:in-errors',
        'tx.octets': 'if:out-octets',
        'tx.discard': 'if:out-discards',
        'tx.errors': 'if:out-errors'
    }

    stats = util.subelm(iface, 'if:statistics')

    # XXX BAD vvv
    # Placeholder value: the real discontinuity time is not determined yet.
    stats.append(util.leaf_elm('if:discontinuity-time',
                               '2020-01-01T01:01:01.011Z'))
    # XXX BAD ^^^

    result = subprocess.run(['/sbin/ifupdown', 'ifctrstat', name],
                            stdout=subprocess.PIPE, check=False)
    if result.returncode != 0:
        LOGGER.error(_('ifctrstat failed for %s: %s'),
                     name, result.returncode)
        return

    # splitlines() copes with output that lacks a trailing newline; blindly
    # dropping the last element would lose the final counter in that case.
    for line in result.stdout.decode('utf-8').splitlines():
        if not line:
            continue

        counter, separator, value = line.partition(': ')
        if not separator:
            # A single malformed line must not abort the whole reply.
            LOGGER.warning(_('malformed ifctrstat output for %s: %s'),
                           name, line)
            continue

        tag = counter_tags.get(counter)
        if tag is None:
            LOGGER.warning(_('unhandled ifctrstat counter for %s: %s'),
                           name, counter)
            continue

        stats.append(util.leaf_elm(tag, value))
def _read_sysfs_attr(path):
    """Return the stripped text of the sysfs file +path+, or None if it
    is missing or cannot be read."""
    try:
        return path.read_text().strip()
    except OSError:
        return None


def operational(node):
    """Retrieve the interface state for this device.

    Creates the ``if:interfaces`` container under +node+, fills it with the
    configuration data of every interface, and then adds the operational
    leaves read from /sys/class/net/<name>/: ``if:if-index``,
    ``if:oper-status``, ``if:speed`` (only while the interface is up and
    reports a usable speed), ``if:phys-address`` and the ``if:statistics``
    container.  Attributes that cannot be read are omitted, except for
    ``if:oper-status`` which then reports "unknown".
    """
    # The kernel spells two operstate values without hyphens, while the
    # ietf-interfaces oper-status enumeration uses hyphenated names.
    oper_status_names = {
        'notpresent': 'not-present',
        'lowerlayerdown': 'lower-layer-down',
    }

    ifaces = util.subelm(node, 'if:interfaces')
    _add_running_contents(ifaces)

    for iface in ifaces.iterchildren():
        name = iface.find('{' + M_NS + '}name').text
        ifpath = pathlib.Path('/sys/class/net') / name

        # if-index
        index = _read_sysfs_attr(ifpath / "ifindex")
        if index is not None:
            iface.append(util.leaf_elm('if:if-index', index))

        # oper-status
        status = _read_sysfs_attr(ifpath / "operstate")
        if status is None:
            status = "unknown"
        status = oper_status_names.get(status, status)
        iface.append(util.leaf_elm('if:oper-status', status))

        # speed
        if status == "up":
            # Reading 'speed' fails on interfaces without a measurable
            # speed; the helper returns None then and the leaf is omitted.
            raw_speed = _read_sysfs_attr(ifpath / "speed")
            if raw_speed is not None:
                try:
                    speed = int(raw_speed)
                except ValueError:
                    LOGGER.warning("%s has non-integral speed; kernel bug?",
                                   name)
                else:
                    # sysfs reports Mbit/s (negative when unknown), whereas
                    # if:speed is an unsigned value in bits per second.
                    if speed >= 0:
                        iface.append(util.leaf_elm('if:speed',
                                                   speed * 1000000))

        # phys-address
        address = _read_sysfs_attr(ifpath / "address")
        if address is not None:
            iface.append(util.leaf_elm('if:phys-address', address))

        # statistics
        _add_stats_to(iface, name)
def edit(session, rpc, node, def_op):
"""Edit the interface configuration for this device."""