""" NETCONF for APK Distributions server: Base utility functions. Copyright © 2020 Adélie Software in the Public Benefit, Inc. Released under the terms of the NCSA license. See the LICENSE file included with this source distribution for more information. SPDX-License-Identifier: NCSA """ from datetime import datetime, timezone from netconf import error def _(the_string: str) -> str: """Translate a user-facing string. Currently not implemented, but allows us to implement translation later.""" return the_string def user_for_session(session) -> str: """Determine the user for the specified session.""" return session.pkt_stream.stream.get_transport().get_username() def ensure_leaf(rpc, node): """Ensure that the node is a leaf (contains no child nodes).""" if len(node) > 0: raise error.UnknownElementAppError(rpc, node[0]) def node_operation(node, def_op='merge') -> str: """Determine the operation desired for an XML node. :param node: The node. :param str def_op: The default operation if none is specified. Defaults to 'merge'. :returns str: The operation desired. """ nc10 = '{urn:ietf:params:xml:ns:netconf:base:1.0}operation' nc11 = '{urn:ietf:params:xml:ns:netconf:base:1.1}operation' return node.attrib.get(nc11, node.attrib.get(nc10, def_op)) def yang_dt_for_timestamp(timestamp) -> str: """Provide a yang:date-and-time string for a given timestamp.""" now = datetime.now(timezone.utc) zone = now.astimezone().tzinfo return datetime.fromtimestamp(timestamp, tz=zone).isoformat() def _index_for_list_op(the_list: list, operation: str, insert_value) -> int: """Determine the index for a leaf-list insertion. :param list the_list: The Python list object that represents the leaf-list. :param str operation: The insertion mode: first, last, before, after. :param insert_value: The value to insert for modes before and after. MUST be specified. :returns int: The index in which to insert. """ if operation == 'last': return len(the_list) if operation == 'first': return 0 if operation == 'before': return the_list.index(insert_value) if operation == 'after': return the_list.index(insert_value) + 1 return -1 def handle_list_operation(rpc, the_list: list, node, operation: str) -> list: """Perform a operation on a leaf-list based on RFC 6020. :param rpc: The RPC message. :param list the_list: The Python list object that represents the leaf-list. :param node: The XML node that represents the item being operated on. :param str operation: The operation to perform: create, merge, replace, delete, remove. :returns list: The list with the value manipulated with the desired effect. """ value = node.text # RFC 6020 § 7.7.7 val_attr = '{urn:ietf:params:xml:ns:yang:1}value' insert = node.attrib.get('{urn:ietf:params:xml:ns:yang:1}insert', None) insert_value = node.attrib.get(val_attr, None) if operation == 'create': if value in the_list: raise error.DataExistsAppError(rpc) if insert is None: insert = 'last' if operation in ['merge', 'replace']: # Need clarification about RFC § 7.7.7. This may be wrong. # If so, change `return` to `insert = 'last'`, or move to function top. if insert is None: return the_list # `insert` was specified, so we will move this item. if value in the_list: the_list.remove(value) if operation in ['delete', 'remove']: if value not in the_list: if operation == 'delete': raise error.DataMissingAppError(rpc) return the_list # Job done. the_list.remove(value) return the_list # Okay. We are performing an addition of some kind on this list. if insert in ['before', 'after'] and ( insert_value is None or insert_value not in the_list): raise error.BadAttributeAppError(rpc, node, val_attr) index = _index_for_list_op(the_list, insert, insert_value) if index == -1: raise error.BadAttributeAppError( rpc, node, '{urn:ietf:params:xml:ns:yang:1}insert' ) the_list.insert(index, value) return the_list