""" NETCONF for APK Distributions server: ietf-system module Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ import logging import os.path import platform import subprocess import time from datetime import datetime from math import floor from socket import gethostname from netconf import util from ncserver.base.util import _ LOGGER = logging.getLogger(__name__) """The object used for logging informational messages.""" M_ABI_VERSION = 1 """The ABI version of this NETCONF module.""" M_PREFIX = "sys" """The XML tag prefix for this module's tags.""" M_NS = "urn:ietf:params:xml:ns:yang:ietf-system" """The XML namespace for this module.""" M_NAME = "ietf-system" """The YANG model name for this module.""" M_REVISION = "2014-08-06" """The YANG revision date for this module.""" M_IMPORTS = { 'ietf-yang-types@2013-07-15': { 'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang" }, 'ietf-inet-types@2013-07-15': { 'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types", 'prefix': "inet" }, 'ietf-netconf-acm@2018-02-14': { 'ns': "urn:ietf:params:xml:ns:yang:ietf-netconf-acm", 'prefix': "nacm" }, 'iana-crypt-hash@2014-08-06': { 'ns': "urn:ietf:params:xml:ns:yang:iana-crypt-hash", 'prefix': "ianach" } } """The imported YANG modules for this module.""" def _parse_ntp_conf_to(sys_node): """Parse NTP configuration into /sys:system/ntp format.""" root = util.subelm(sys_node, 'sys:ntp') if not os.path.exists('/etc/ntpsec/ntp.conf'): root.append(util.leaf_elm('sys:enabled', 'false')) return # XXX TODO: parse NTP configuration. def _parse_resolv_conf_to(sys_node): """Parse /etc/resolv.conf into /sys:system/dns-resolver format.""" root = util.subelm(sys_node, 'sys:dns-resolver') with open('/etc/resolv.conf', 'r') as rconf_f: rconf = rconf_f.readlines() # Stores the last NAME: line seen. last_name = 'Resolver' # Containers for actual data. resolvers = dict() search = list() # musl defaults; see musl:include/resolv.h attempts = 2 timeout = 5 for line in rconf: line = line.rstrip() if line.startswith('# NAME: '): last_name = line[8:] elif line.startswith('nameserver '): if len(resolvers) >= 3: continue # invalid configuration, so don't report it. if last_name in resolvers: last_name = last_name + "-1" resolvers[last_name] = line[11:] elif line.startswith('options attempts:'): try: # musl caps attempts at 10 attempts = min(10, int(line[17:])) except ValueError: LOGGER.warning(_("invalid resolv.conf: non-integral attempts")) elif line.startswith('options timeout:'): try: # musl caps timeout at 60 timeout = min(60, int(line[16:])) except ValueError: LOGGER.warning(_("invalid resolv.conf: non-integral timeout")) # This logic is taken directly from musl source code. elif line.startswith('domain') or line.startswith('search'): search = line[7:].split(' ') for domain in search: root.append(util.leaf_elm('sys:search', domain)) for pair in resolvers.items(): res = util.subelm(root, 'sys:server') res.append(util.leaf_elm('sys:name', pair[0])) udp = util.subelm(res, 'sys:udp-and-tcp') udp.append(util.leaf_elm('sys:address', pair[1])) opts = util.subelm(root, 'sys:options') opts.append(util.leaf_elm('sys:timeout', timeout)) opts.append(util.leaf_elm('sys:attempts', attempts)) def _parse_authn_to(sys_node): """Parse current config into /sys:system/authentication format.""" sys_node.append(util.leaf_elm('sys:authentication', '')) def running(node): """Retrieve the running configuration for this system.""" sys = util.subelm(node, 'sys:system') if os.path.exists('/etc/netconf/contact…txt'): with open('/etc/netconf/contact.txt', 'r') as contact_f: sys.append(util.leaf_elm('sys:contact', contact_f.read())) else: sys.append(util.leaf_elm('sys:contact', '')) if os.path.exists('/etc/netconf/location.txt'): with open('/etc/netconf/location.txt', 'r') as loc_f: sys.append(util.leaf_elm('sys:location', loc_f.read())) else: sys.append(util.leaf_elm('sys:location', '')) sys.append(util.leaf_elm('sys:hostname', gethostname())) _parse_ntp_conf_to(sys) _parse_resolv_conf_to(sys) _parse_authn_to(sys) def operational(node): """Retrieve the operational configuration for this system.""" state = util.subelm(node, 'sys:system-state') plat = util.subelm(state, 'sys:platform') plat.append(util.leaf_elm('sys:os-name', platform.system())) plat.append(util.leaf_elm('sys:os-release', platform.release())) try: osv = subprocess.run(['/bin/sh', '-c', '( . /etc/os-release && echo -n $PRETTY_NAME )'], stdout=subprocess.PIPE, check=True) version = osv.stdout.decode('utf-8') except subprocess.CalledProcessError: version = "Unknown Distribution" plat.append(util.leaf_elm('sys:os-version', version)) plat.append(util.leaf_elm('sys:machine', platform.machine())) clock = util.subelm(state, 'sys:clock') clock.append(util.leaf_elm( 'sys:current-datetime', datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') )) with open('/proc/uptime', 'r') as upfile: raw = upfile.read().split(' ')[0] boot = floor(time.time() - float(raw)) fmted = datetime.fromtimestamp(boot).strftime('%Y-%m-%dT%H:%M:%S.%f') clock.append(util.leaf_elm('sys:boot-datetime', fmted))