"""
NETCONF for APK Distributions server:
ietf-system module
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import logging
import os.path
import pathlib
import platform
import subprocess
from datetime import datetime
from socket import gethostname, sethostname
from lxml.etree import QName # pylint: disable=E0611
from netconf import error, util
from ncserver.base.log import HOSTNAME_CHANGED, log_config_change, \
log_operation
from ncserver.base.util import _, ensure_leaf, handle_list_operation, \
node_operation, yang_dt_for_timestamp
from ncserver.util import maybe_raise_on_invalid_node, system_boot_time
LOGGER = logging.getLogger(__name__)
"""The object used for logging informational messages."""
M_ABI_VERSION = 1
"""The ABI version of this NETCONF module."""
M_PREFIX = "sys"
"""The XML tag prefix for this module's tags."""
M_NS = "urn:ietf:params:xml:ns:yang:ietf-system"
"""The XML namespace for this module."""
M_NAME = "ietf-system"
"""The YANG model name for this module."""
M_REVISION = "2014-08-06"
"""The YANG revision date for this module."""
M_IMPORTS = {
'ietf-yang-types@2013-07-15': {
'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types", 'prefix': "yang"
},
'ietf-inet-types@2013-07-15': {
'ns': "urn:ietf:params:xml:ns:yang:ietf-inet-types", 'prefix': "inet"
},
'ietf-netconf-acm@2018-02-14': {
'ns': "urn:ietf:params:xml:ns:yang:ietf-netconf-acm", 'prefix': "nacm"
},
'iana-crypt-hash@2014-08-06': {
'ns': "urn:ietf:params:xml:ns:yang:iana-crypt-hash", 'prefix': "ianach"
}
}
"""The imported YANG modules for this module."""
M_FEATURES = ['ntp']
"""The supported features declared in YANG for this module."""
_CONTACT_PATH = '/etc/netconf/contact.txt'
"""The file path for the contents of /sys:system/contact."""
_LOCATION_PATH = '/etc/netconf/location.txt'
"""The file path for the contents of /sys:system/location."""
class NTPServer: # pylint: disable=R0902
"""Represents an NTP server entry."""
def __init__(self, host: str, **kwargs):
"""Create a new NTP server entry.
:param str host:
The host, in IP or DNS form.
"""
self.host = host
self.mode = kwargs.pop('mode', 'pool')
if self.mode in ('server', 'peer'):
self.key = kwargs.pop('key', None)
else:
self.key = None
self.burst = kwargs.pop('burst', False)
self.iburst = kwargs.pop('iburst', False)
self.version = kwargs.pop('version', None)
self.prefer = kwargs.pop('prefer', False)
self.minpoll = kwargs.pop('minpoll', None)
self.maxpoll = kwargs.pop('maxpoll', None)
if self.mode == 'poll':
self.preempt = kwargs.pop('preempt', False)
else:
self.preempt = False
def __repr__(self):
"""Return a representation of this object."""
args = {'mode': '"' + self.mode + '"'}
if self.burst:
args['burst'] = True
if self.iburst:
args['iburst'] = True
if self.version is not None:
args['version'] = '"' + self.version + '"'
if self.prefer:
args['prefer'] = True
if self.minpoll is not None:
args['minpoll'] = '"' + self.minpoll + '"'
if self.maxpoll is not None:
args['maxpoll'] = '"' + self.maxpoll + '"'
if self.mode in ('server', 'peer') and self.key is not None:
args['key'] = '"' + self.key + '"'
if self.mode == 'poll' and self.preempt:
args['preempt'] = True
nondef_args = ', '.join(
['{}={}'.format(*item) for item in args.items()]
)
return "NTPServer('{host}', {nondef_args})".format(
host=self.host, nondef_args=nondef_args
)
def __str__(self): # pylint: disable=R0912
"""Return the configuration line for this server."""
line = self.mode + ' '
if ':' in self.host and '[' not in self.host:
if '%' in self.host:
addr, dev = self.host.split('%')
line += '[' + addr + ']%' + dev
else:
line += '[' + self.host + ']'
else:
line += self.host
if self.mode in ('server', 'peer') and self.key is not None:
line += ' key ' + self.key
if self.mode in ('pool', 'server'):
if self.burst:
line += ' burst'
if self.iburst:
line += ' iburst'
if self.version is not None:
line += ' version ' + str(self.version)
if self.prefer:
line += ' prefer'
if self.minpoll is not None:
line += ' minpoll ' + self.minpoll
if self.maxpoll is not None:
line += ' maxpoll ' + self.maxpoll
if self.mode == 'pool' and self.preempt:
line += ' preempt'
return line
@classmethod
def from_line(cls, line: str):
"""Create an NTPServer object from a configuration file line."""
tokens = line.strip().split(' ')
tokens.reverse()
kwargs = {'mode': tokens.pop()}
host = tokens.pop()
bool_keys = ('burst', 'iburst', 'prefer', 'preempt')
value_keys = ('key', 'version', 'minpoll', 'maxpoll')
while len(tokens) > 0:
key = tokens.pop()
if key in bool_keys:
kwargs[key] = True
elif key in value_keys:
kwargs[key] = tokens.pop()
else:
LOGGER.warning('unknown NTP server conf key: %s', key)
return cls(host, **kwargs)
def _parse_ntp_config() -> dict:
"""Parse /etc/ntp.conf and return the configuration.
The return value is a dictionary with the following keys:
* servers: a list of NTPServer objects.
"""
conf = open('/etc/ntp.conf', 'r')
lines = conf.readlines()
conf.close()
result = {'servers': list()}
for line in lines:
line.strip()
if line.startswith('#') or len(line) == 0:
continue
if any([line.startswith(word) for word in ('server', 'pool', 'peer')]):
result['servers'].append(NTPServer.from_line(line))
else:
LOGGER.warning('unhandled NTP configuration line: %s', line)
return result
def _parse_ntp_conf_to(sys_node):
"""Parse NTP configuration into /sys:system/ntp format."""
root = util.subelm(sys_node, 'sys:ntp')
if not os.path.exists('/etc/ntp.conf'):
root.append(util.leaf_elm('sys:enabled', 'false'))
return
conf = _parse_ntp_config()
for server in conf['servers']:
serv_node = util.subelm(root, 'sys:server')
serv_node.append(util.leaf_elm('sys:name', server.host))
serv_node.append(util.leaf_elm('sys:association-type', server.mode))
serv_node.append(util.leaf_elm('sys:iburst',
str(server.iburst).lower()))
serv_node.append(util.leaf_elm('sys:prefer',
str(server.prefer).lower()))
udp = util.subelm(serv_node, 'sys:udp')
udp.append(util.leaf_elm('sys:address', server.host))
return
def _parse_resolv_conf() -> dict:
"""Parse /etc/resolv.conf and return the configuration.
The return value is a dictionary with the following keys:
* resolvers: a dict, with each key being the name of the resolver
and the value being the resolver IP
* search: a list, containing search domains
* attempts: the number of attempts to resolve a name
* timeout: the timeout in seconds to resolve a name
"""
with open('/etc/resolv.conf', 'r') as rconf_f:
rconf = rconf_f.readlines()
# Stores the last NAME: line seen.
last_name = 'Resolver'
# Containers for actual data.
resolvers = dict()
search = list()
# musl defaults; see musl:include/resolv.h
attempts = 2
attempts_default = True
timeout = 5
timeout_default = True
for line in rconf:
line = line.rstrip()
if line.startswith('# NAME: '):
last_name = line[8:]
elif line.startswith('nameserver '):
if len(resolvers) >= 3:
continue # invalid configuration, so don't report it.
if last_name in resolvers:
last_name = last_name + "-1"
resolvers[last_name] = line[11:]
elif line.startswith('options attempts:'):
try:
# musl caps attempts at 10
attempts = min(10, int(line[17:]))
attempts_default = False
except ValueError:
LOGGER.warning(_("invalid resolv.conf: non-integral attempts"))
elif line.startswith('options timeout:'):
try:
# musl caps timeout at 60
timeout = min(60, int(line[16:]))
timeout_default = False
except ValueError:
LOGGER.warning(_("invalid resolv.conf: non-integral timeout"))
# This logic is taken directly from musl source code.
elif line.startswith('domain') or line.startswith('search'):
search = line[7:].split(' ')
return {'resolvers': resolvers, 'search': search, 'attempts': attempts,
'timeout': timeout, 'attempts_default': attempts_default,
'timeout_default': timeout_default}
def _parse_resolv_conf_to(sys_node):
"""Parse /etc/resolv.conf into /sys:system/dns-resolver format."""
root = util.subelm(sys_node, 'sys:dns-resolver')
resolv_conf = _parse_resolv_conf()
for domain in resolv_conf['search']:
root.append(util.leaf_elm('sys:search', domain))
for pair in resolv_conf['resolvers'].items():
res = util.subelm(root, 'sys:server')
res.append(util.leaf_elm('sys:name', pair[0]))
udp = util.subelm(res, 'sys:udp-and-tcp')
udp.append(util.leaf_elm('sys:address', pair[1]))
opts = util.subelm(root, 'sys:options')
opts.append(util.leaf_elm('sys:timeout', resolv_conf['timeout']))
opts.append(util.leaf_elm('sys:attempts', resolv_conf['attempts']))
def _parse_authn_to(sys_node):
"""Parse current config into /sys:system/authentication format."""
sys_node.append(util.leaf_elm('sys:authentication', ''))
def running(node):
"""Retrieve the running configuration for this system."""
sys = util.subelm(node, 'sys:system')
if os.path.exists(_CONTACT_PATH):
with open(_CONTACT_PATH, 'r') as contact_f:
sys.append(util.leaf_elm('sys:contact', contact_f.read()))
else:
sys.append(util.leaf_elm('sys:contact', ''))
if os.path.exists(_LOCATION_PATH):
with open(_LOCATION_PATH, 'r') as loc_f:
sys.append(util.leaf_elm('sys:location', loc_f.read()))
else:
sys.append(util.leaf_elm('sys:location', ''))
sys.append(util.leaf_elm('sys:hostname', gethostname()))
_parse_ntp_conf_to(sys)
_parse_resolv_conf_to(sys)
_parse_authn_to(sys)
def operational(node):
"""Retrieve the operational configuration for this system."""
running(node) # Include the running configuration.
state = util.subelm(node, 'sys:system-state')
plat = util.subelm(state, 'sys:platform')
plat.append(util.leaf_elm('sys:os-name', platform.system()))
plat.append(util.leaf_elm('sys:os-release', platform.release()))
try:
osv = subprocess.run(['/bin/sh', '-c',
'( . /etc/os-release && echo -n $PRETTY_NAME )'],
stdout=subprocess.PIPE, check=True)
version = osv.stdout.decode('utf-8')
except subprocess.CalledProcessError:
version = "Unknown Distribution"
plat.append(util.leaf_elm('sys:os-version', version))
plat.append(util.leaf_elm('sys:machine', platform.machine()))
clock = util.subelm(state, 'sys:clock')
clock.append(util.leaf_elm(
'sys:current-datetime',
yang_dt_for_timestamp(datetime.now().timestamp())
))
clock.append(util.leaf_elm('sys:boot-datetime', system_boot_time()))
# -- Editing functions --
def _save_resolv_conf(conf: dict):
"""Save the current configuration to /etc/resolv.conf."""
rcfile = open('/etc/resolv.conf', 'w')
rcfile.write("# Generated by NETCONFAPK\n")
rcfile.write("# This file will be overwritten upon configuration. "
"Do not edit directly.\n\n")
if 'attempts' in conf and not conf.get('attempts_default', False):
rcfile.write("options attempts:{}\n".format(conf['attempts']))
if 'timeout' in conf and not conf.get('timeout_default', False):
rcfile.write("options timeout:{}\n".format(conf['timeout']))
if len(conf.get('search', list())) > 0:
rcfile.write("search {}\n".format(" ".join(conf['search'])))
for pair in conf.get('resolvers', dict()).items():
rcfile.write("# NAME: {}\nnameserver {}\n".format(pair[0], pair[1]))
rcfile.close()
def _edit_dns_option(session, rpc, node, def_op, curr):
"""Edit DNS option configuration."""
root_op = node_operation(node, def_op)
if root_op in ['replace', 'delete', 'remove']:
# delete must raise if no options are set.
if root_op == 'delete':
if ('attempts' not in curr or curr.get('attempts_default', False))\
or ('timeout' not in curr or curr.get('timeout_default', False)):
raise error.DataMissingAppError(rpc)
curr.pop('attempts', None)
curr.pop('timeout', None)
elif root_op == 'create':
if 'attempts' in curr and not curr.get('attempts_default', False) \
or 'timeout' in curr and not curr.get('timeout_default', False):
raise error.DataExistsAppError(rpc)
for opt in node:
option = QName(opt.tag).localname
operation = node_operation(opt, root_op)
ensure_leaf(rpc, opt)
if option == 'attempts':
limit = 10
elif option == 'timeout':
limit = 60
else:
raise error.UnknownElementAppError(rpc, opt)
if operation in ['delete', 'remove']:
if option in curr:
log_config_change(session, opt.tag, '( deleted )')
del curr[option]
continue
try:
value = int(opt.text)
log_config_change(session, opt.tag, value)
except ValueError as value:
raise error.InvalidValueAppError(rpc) from value
curr[option] = min(limit, value)
curr['{}_default'.format(option)] = False
_save_resolv_conf(curr)
def _edit_dns_search(session, rpc, node, def_op, curr: dict):
"""Edit DNS search-domain configuration."""
operation = node_operation(node, def_op)
log_config_change(session, 'sys:search', operation + ':' + node.text)
handle_list_operation(rpc, curr['search'], node, operation)
_save_resolv_conf(curr)
def _edit_dns_server(session, rpc, node, def_op, curr: dict):
# yes, this function is too complex.
# pylint: disable=R0912,R0914,R0915
"""Edit DNS nameserver configuration."""
operation = node_operation(node, def_op)
xmlmap = {'': 'urn:ietf:params:xml:ns:yang:ietf-system'}
name = node.find('name', xmlmap)
# XXX TODO: log changes
# This lets us handle cases where there are no resolvers yet.
# We set the dict key to the value in case we used the default.
resolvers = curr.get('resolvers', dict())
curr['resolvers'] = resolvers
if name is None:
raise error.MissingElementAppError(
rpc, '{urn:ietf:params:xml:ns:yang:ietf-system}name'
)
name = name.text
if operation in ['delete', 'remove']:
if operation == 'delete' and name not in resolvers.keys():
raise error.DataMissingAppError(rpc)
resolvers.pop(name, None)
_save_resolv_conf(curr)
return
insert = node.attrib.get('{urn:ietf:params:xml:ns:yang:1}insert', None)
other_key = node.attrib.get('{urn:ietf:params:xml:ns:yang:1}key', None)
value = None
for opt in node:
opt_name = QName(opt.tag).localname
if opt_name == 'name':
continue # already handled
if opt_name == 'udp-and-tcp':
for subopt in opt:
if QName(subopt.tag).localname != 'address':
raise error.UnknownElementAppError(rpc, subopt)
value = subopt.text
else:
raise error.UnknownElementAppError(rpc, opt)
if operation == 'create':
if name in resolvers:
raise error.DataExistsAppError(rpc)
if len(resolvers) >= 3:
# there is no guidance in the RFC for what to do with this...
raise error.BadElementAppError(rpc, node)
if insert is None:
insert = 'last'
elif operation in ['merge', 'replace']:
if value is None:
raise error.MissingElementAppError(
rpc, '{urn:ietf:params:xml:ns:yang:ietf-system}udp-and-tcp'
)
if name not in resolvers and len(resolvers) >= 3:
# ditto from above
raise error.BadElementAppError(rpc, node)
# Okay, we have to juggle everything around.
if insert is None or insert == 'last':
resolvers[name] = value
elif insert == 'first':
# update always adds keys to the end of a dict.
curr['resolvers'] = {name: value}
curr['resolvers'].update(resolvers)
elif insert in ('before', 'after'):
if other_key is None:
raise error.MissingAttributeAppError(
rpc, node, '{urn:ietf:params:xml:ns:yang:1}key'
)
if other_key not in resolvers:
raise error.DataMissingAppError(rpc)
work = dict()
for r_name, r_ip in resolvers.items():
if r_name == other_key and insert == 'before':
work[name] = value
work[r_name] = r_ip
if r_name == other_key and insert == 'after':
work[name] = value
curr['resolvers'] = work
else:
raise error.BadAttributeAppError(
rpc, node, '{urn:ietf:params:xml:ns:yang:1}insert'
)
_save_resolv_conf(curr)
def _is_resolv_conf_empty(curr: dict) -> bool:
"""Determine if a DNS configuration is empty/default."""
return (
len(curr.get('resolvers', dict())) == 0 and
len(curr.get('search', list())) == 0 and
('attempts' not in curr or curr.get('attempts_default', True)) and
('timeout' not in curr or curr.get('timeout_default', True))
)
def _edit_dns(session, rpc, node, def_op):
"""Edit the DNS configuration for this system."""
curr = _parse_resolv_conf()
root_op = node_operation(node, def_op)
if root_op == 'replace':
curr = {}
elif root_op == 'remove':
curr = {}
log_config_change(session, 'sys:dns-resolver', '( deleted )')
_save_resolv_conf(curr)
elif root_op == 'delete':
if _is_resolv_conf_empty(curr):
raise error.DataMissingAppError(rpc)
curr = {}
log_config_change(session, 'sys:dns-resolver', '( deleted )')
_save_resolv_conf(curr)
elif root_op == 'create':
if not _is_resolv_conf_empty(curr):
raise error.DataExistsAppError(rpc)
editors = {'options': _edit_dns_option, 'search': _edit_dns_search,
'server': _edit_dns_server}
for key_node in node:
key = QName(key_node.tag).localname
if key not in editors:
raise error.UnknownElementAppError(rpc, key)
editors[key](session, rpc, key_node, root_op, curr)
def _edit_file(session, rpc, node, def_op):
"""Edit a file-based configuration for this system."""
operation = node_operation(node, def_op)
ensure_leaf(rpc, node)
paths = {'contact': _CONTACT_PATH, 'location': _LOCATION_PATH}
selected = paths[QName(node.tag).localname]
already = os.path.exists(selected)
if operation == 'create' and already:
raise error.DataExistsAppError(rpc)
if operation == 'delete' and not already:
raise error.DataMissingAppError(rpc)
if operation in ['delete', 'remove']:
if already:
log_config_change(session, node.tag, '( deleted )')
pathlib.Path(selected).unlink()
return
log_config_change(session, node.tag, node.text)
with open(selected, 'wt') as myfile:
myfile.write(node.text)
def _edit_hostname(session, rpc, node, def_op):
"""Edit the hostname for this system."""
operation = node_operation(node, def_op)
ensure_leaf(rpc, node)
# you can't *unset* a hostname.
if operation in ['delete', 'remove']:
raise error.OperationNotSupportedAppError(rpc)
if operation == 'create':
raise error.DataExistsAppError(rpc)
if operation not in ['merge', 'replace']:
raise error.OperationNotSupportedAppError(rpc)
newname = node.text
log_config_change(session, 'sys:hostname', newname)
with open('/etc/hostname', 'wt') as hostfile:
hostfile.write(newname + '\n')
sethostname(newname)
HOSTNAME_CHANGED.call(newname)
def _edit_ntp(session, rpc, node, def_op):
"""Edit NTP configuration for this system."""
def edit(session, rpc, node, def_op):
"""Edit the configuration for this system."""
methods = {'dns-resolver': _edit_dns,
'contact': _edit_file, 'location': _edit_file,
'hostname': _edit_hostname,
'ntp': _edit_ntp}
for subsystem in node:
name = QName(subsystem.tag).localname
if name in methods:
methods[name](session, rpc, subsystem, def_op)
else:
maybe_raise_on_invalid_node(M_NS, rpc, subsystem)
def rpc_system_shutdown(session, _, *params):
"""Shutdown the system."""
log_operation(session, "shutting down the system")
subprocess.Popen(["/sbin/poweroff"])
return util.elm('nc:ok')
def rpc_system_restart(session, _, *params):
"""Restart the system."""
log_operation(session, "restarting the system")
subprocess.Popen(["/sbin/reboot"])
return util.elm('nc:ok')
M_RPCS = {'system-shutdown': rpc_system_shutdown,
'system-restart': rpc_system_restart}
"""The RPCs implemented by this module."""