"""
NETCONF for APK Distributions server:
adelie-services module, OpenRC edition
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import errno
import logging
import os
import pathlib
import subprocess
from netconf import error, util
from ncserver.base.service import Service, ServiceStatus
from ncserver.base.util import _, yang_dt_for_timestamp
LOGGER = logging.getLogger(__name__)
"""Module-level logger used for informational messages."""

M_ABI_VERSION = 1
"""ABI version of this NETCONF module."""

M_PREFIX = "svcs"
"""XML tag prefix used for this module's tags."""

M_NS = "http://netconf.adelielinux.org/ns/service"
"""XML namespace of this module."""

M_NAME = "adelie-services"
"""Name of the YANG model for this module."""

M_REVISION = "2020-09-22"
"""Revision date of the YANG model for this module."""

M_IMPORTS = {
    'ietf-yang-types@2013-07-15': {
        'ns': "urn:ietf:params:xml:ns:yang:ietf-yang-types",
        'prefix': "yang",
    },
}
"""YANG modules imported by this module, keyed by ``name@revision``."""
# Characters permitted in a service name: ASCII letters, digits, "_", "-", ".".
_VALID_NAME_CHARS = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890_-."
)


def valid_name(name):
    """Determine if the given name is a valid OpenRC service name.

    A name is valid when it is non-empty, is not one of the special directory
    entries ``.`` and ``..``, and consists solely of ASCII letters, digits,
    underscores, hyphens and full stops.

    :param str name:
        The candidate service name.

    :returns:
        True if the name is acceptable, False otherwise.
    """
    # The empty string, "." and ".." pass a pure character check, yet they
    # name directories rather than init scripts once joined to /etc/init.d.
    if name in ("", ".", ".."):
        return False
    return all(char in _VALID_NAME_CHARS for char in name)
def is_enabled(name):
    """Determine if this OpenRC service is enabled.

    A service is considered enabled when an entry in the ``boot``, ``sysinit``
    or ``default`` runlevel directory resolves to a file whose name equals
    ``name``.

    :param str name:
        The name of the service to look for.

    :returns:
        True if a matching entry exists in any of the runlevels checked,
        False otherwise.
    """
    runlevels = (
        pathlib.Path('/etc/runlevels/boot'),
        pathlib.Path('/etc/runlevels/sysinit'),
        pathlib.Path('/etc/runlevels/default'),
    )

    for level in runlevels:
        try:
            entries = list(level.iterdir())
        except (FileNotFoundError, NotADirectoryError):
            # A runlevel that is absent cannot have anything enabled in it.
            continue

        # Resolve each symlink. If it points at the service name, we have a
        # match.
        if any(entry.resolve().name == name for entry in entries):
            return True

    return False
def get_var(shell_like, var_name: str, default="") -> str:
    """Retrieve the value of var_name from a shell script-like file shell_like.

    The file is sourced by ``/bin/sh -e`` in a subshell, after
    ``/etc/init.d/functions.sh`` has been sourced, and the value of the
    variable is printed and captured.

    :param shell_like:
        The path to the shell script-like file.

    :param str var_name:
        The name of the variable to extract.

    :param str default:
        The default value to return if the variable is not set.

    :returns:
        The value of the variable, or ``default`` if the value is empty.

    :raises ValueError:
        If ``var_name`` is not a plain shell variable name.

    :raises subprocess.CalledProcessError:
        If the shell exits with a non-zero status.
    """
    import shlex  # local import: only needed to quote the path below

    # Both values are spliced into a shell command line, so make sure the
    # variable name cannot smuggle in extra shell syntax.
    letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz_"
    digits = "0123456789"
    if (not var_name or var_name[0] not in letters
            or any(char not in letters + digits for char in var_name)):
        raise ValueError(
            "{!r} is not a valid shell variable name".format(var_name)
        )

    script = ". /etc/init.d/functions.sh;(. {shlike} && printf %s \"${var}\")"
    # Quoting keeps paths containing spaces or metacharacters intact; paths
    # made only of safe characters are passed through unchanged.
    script = script.format(shlike=shlex.quote(str(shell_like)), var=var_name)

    proc = subprocess.run(
        ['/bin/sh', '-e', '-c', script], check=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )

    # Do not let a stray non-UTF-8 byte in a script abort the lookup.
    value = proc.stdout.decode('utf-8', errors='replace')
    return value if value else default
def check_alive(pidfile) -> bool:
    """Determine if the PID contained in a pidfile is alive.

    :param pidfile:
        The path to the file containing the PID.

    :returns:
        True if the process exists, or if the pidfile cannot be read due to
        insufficient permissions. False if the pidfile cannot be opened for
        any other reason, does not contain a usable PID, or the process does
        not exist.
    """
    try:
        with open(pidfile) as fhandle:
            pid = int(fhandle.read().strip())
    except OSError as ose:
        # A pidfile we are not permitted to read is assumed to belong to a
        # live process; any other failure to open it counts as dead.
        return ose.errno in (errno.EPERM, errno.EACCES)
    except ValueError:
        return False  # the pidfile doesn't contain a PID

    # Zero and negative values address process groups, not a single process.
    if pid <= 0:
        return False

    try:
        os.kill(pid, 0)  # signal 0 only performs the existence check
    except OverflowError:
        return False  # the number is too large to be a PID at all
    except OSError as ose:
        # If EPERM, we can't send the signal because it's alive.
        return ose.errno == errno.EPERM
    return True
def _iter_entries(directory):
    """Yield the entries of directory, or nothing if it cannot be listed."""
    try:
        yield from pathlib.Path(directory).iterdir()
    except (FileNotFoundError, NotADirectoryError):
        return


class OpenRCService(Service):
    """The OpenRC implementation of the Service class."""

    def __init__(self, name):
        """Initialise the structure.

        :param str name:
            The name of the service; it must have a script of the same name
            in /etc/init.d.

        :raises NameError:
            If ``name`` is not a valid service name.

        :raises ValueError:
            If /etc/init.d/``name`` does not exist.
        """
        super().__init__()

        if not valid_name(name):
            raise NameError(_("{}: invalid service name").format(name))

        svcpath = "/etc/init.d/{name}".format(name=name)
        if not os.path.exists(svcpath):
            raise ValueError(_("{} does not exist").format(name))

        self._name = name
        # Fall back to the service name when the script has no description.
        self._description = get_var(svcpath, "description", name)
        self._enabled = is_enabled(name)

        # Update status and start-time.
        self.status()

    def status(self):
        """Retrieve the service's status.

        The OpenRC state directories under /run/openrc are searched for an
        entry resolving to this service. A service found in the started
        directory is reported as crashed if any of its recorded daemons has
        a pidfile whose process is no longer alive; otherwise its start time
        is refreshed from the modification time of the state entry.

        State directories that do not exist are treated as empty.

        :returns:
            The ServiceStatus value describing the service.
        """
        stat = ServiceStatus.Stopped
        path = None

        # Checked in this order; if several directories contain the service,
        # the last match wins.
        status_dirs = (
            (ServiceStatus.Starting, "/run/openrc/starting"),
            (ServiceStatus.Running, "/run/openrc/started"),
            (ServiceStatus.Stopping, "/run/openrc/stopping"),
        )

        for value, directory in status_dirs:
            for service in _iter_entries(directory):
                if service.resolve().name == self.name:
                    stat = value
                    path = service
                    break

        if stat == ServiceStatus.Running:
            # Check if crashed
            daemons = pathlib.Path("/run/openrc/daemons") / self.name
            for daemon in _iter_entries(daemons):
                dpid = get_var(daemon, 'pidfile')
                if dpid and not check_alive(dpid):
                    return ServiceStatus.Crashed

            # Update start-time
            self._start_time = path.lstat().st_mtime

        return stat

    def start(self):
        """Start the service.

        Nothing is done if the service is already starting or running. The
        rc-service command is launched without waiting for it to finish.
        """
        if self.status() in (ServiceStatus.Starting, ServiceStatus.Running):
            return

        subprocess.Popen(["/sbin/rc-service", self.name, "start"])

    def stop(self):
        """Stop the service.

        The rc-service command is launched without waiting for it to finish.
        """
        subprocess.Popen(["/sbin/rc-service", self.name, "stop"])

    def reload(self):
        """Reload the service's configuration, if possible.

        This implementation currently performs no action.
        """

    def restart(self, full: bool):
        """Restart the service.

        OpenRC does not support a concept of full restart, so this parameter
        is always ignored.

        The rc-service command is launched without waiting for it to finish.
        """
        subprocess.Popen(["/sbin/rc-service", self.name, "restart"])

    def __str__(self):
        """Return the name of the service."""
        return self.name

    def __repr__(self):
        """Return a debugging representation; this queries the status."""
        return "<OpenRC service {name} ({status})>".format(
            name=self.name, status=self.status()
        )
def service_list():
    """Return the list of services available on this device.

    Every entry in /etc/init.d whose name does not end in ``.sh`` is treated
    as a service. Entries that cannot be represented as a service (invalid
    name, vanished file, or a file the shell fails to source) are logged and
    skipped so that a single stray file does not hide every other service.

    :returns:
        A list of OpenRCService objects; empty if /etc/init.d does not exist.
    """
    try:
        scripts = list(pathlib.Path('/etc/init.d').iterdir())
    except (FileNotFoundError, NotADirectoryError):
        return []

    services = []
    for script in scripts:
        if script.suffix == '.sh':
            continue

        try:
            services.append(OpenRCService(script.name))
        except (NameError, ValueError, subprocess.CalledProcessError) as exc:
            # NameError is what OpenRCService raises for an invalid name.
            LOGGER.warning(
                "skipping /etc/init.d entry %s: %s", script.name, exc
            )

    return services
def info(service: str) -> OpenRCService:
    """Look up a single service by name.

    :param str service:
        The name of the service to look up.

    :returns:
        The Service object for that name, or None if constructing it raised
        ValueError (which is what happens when no such service exists).
        A NameError raised for an invalid service name is not caught here.
    """
    try:
        found = OpenRCService(service)
    except ValueError:
        found = None
    return found
def running(node):
    """Populate node with the service configuration for this device.

    A ``svcs:services`` element is added beneath ``node``, holding one
    ``svcs:service`` child per known service with its name and enabled flag.

    :param node:
        The element under which the configuration is added.
    """
    container = util.subelm(node, 'svcs:services')

    for svc in service_list():
        entry = util.subelm(container, 'svcs:service')
        leaves = (
            ('svcs:name', svc.name),
            ('svcs:enabled', svc.enabled),
        )
        for tag, value in leaves:
            entry.append(util.leaf_elm(tag, value))
def operational(node):
    """Retrieve the service state for this device.

    A ``svcs:services`` element is added beneath ``node``, holding one
    ``svcs:service`` child per known service with its name, description,
    enabled flag and status. Running services also get a ``svcs:start-time``
    leaf.

    :param node:
        The element under which the state is added.
    """
    svcs = util.subelm(node, 'svcs:services')

    for service in service_list():
        svcnode = util.subelm(svcs, 'svcs:service')
        svcnode.append(util.leaf_elm('svcs:name', service.name))
        svcnode.append(util.leaf_elm('svcs:description', service.description))
        svcnode.append(util.leaf_elm('svcs:enabled', service.enabled))

        # Query the status once: each query inspects the system afresh, so
        # asking twice could report one state and act upon another.
        status = service.status()
        svcnode.append(util.leaf_elm('svcs:status', status.name.lower()))

        if status == ServiceStatus.Running:
            svcnode.append(util.leaf_elm(
                'svcs:start-time', yang_dt_for_timestamp(service.start_time)
            ))
def edit(session, rpc, node, def_op):
"""Edit the service configuration for this device."""