summaryrefslogblamecommitdiff
path: root/ncserver/base/util.py
blob: 164e30981a3925e1b947692037d7a205f50b7245 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13












                                                                            
                                       



                              



                                                                               

 




                                                                   





















                                                                         

 






                                                                    






































                                                                             

                                                                         







                                                                    
                                                                           







































                                                                               
"""
NETCONF for APK Distributions server:
    Base utility functions.

Copyright © 2020 Adélie Software in the Public Benefit, Inc.

Released under the terms of the NCSA license.  See the LICENSE file included
with this source distribution for more information.

SPDX-License-Identifier: NCSA
"""


from datetime import datetime, timezone
from netconf import error


def _(the_string: str) -> str:
    """Translate a user-facing string.

    Currently not implemented, but allows us to implement translation later."""
    return the_string


def user_for_session(session) -> str:
    """Determine the user for the specified session."""
    return session.pkt_stream.stream.get_transport().get_username()


def ensure_leaf(rpc, node):
    """Ensure that the node is a leaf (contains no child nodes)."""
    if len(node) > 0:
        raise error.UnknownElementAppError(rpc, node[0])


def node_operation(node, def_op='merge') -> str:
    """Determine the operation desired for an XML node.

    :param node:
        The node.

    :param str def_op:
        The default operation if none is specified.  Defaults to 'merge'.

    :returns str:
        The operation desired.
    """

    nc10 = '{urn:ietf:params:xml:ns:netconf:base:1.0}operation'
    nc11 = '{urn:ietf:params:xml:ns:netconf:base:1.1}operation'
    return node.attrib.get(nc11, node.attrib.get(nc10, def_op))


def yang_dt_for_timestamp(timestamp) -> str:
    """Provide a yang:date-and-time string for a given timestamp."""
    now = datetime.now(timezone.utc)
    zone = now.astimezone().tzinfo
    return datetime.fromtimestamp(timestamp, tz=zone).isoformat()


def _index_for_list_op(the_list: list, operation: str, insert_value) -> int:
    """Determine the index for a leaf-list insertion.

    :param list the_list:
        The Python list object that represents the leaf-list.

    :param str operation:
        The insertion mode: first, last, before, after.

    :param insert_value:
        The value to insert for modes before and after.  MUST be specified.

    :returns int:
        The index in which to insert.
    """
    if operation == 'last':
        return len(the_list)
    if operation == 'first':
        return 0
    if operation == 'before':
        return the_list.index(insert_value)
    if operation == 'after':
        return the_list.index(insert_value) + 1

    return -1


def handle_list_operation(rpc, the_list: list, node, operation: str) -> list:
    """Perform a <edit-config/> operation on a leaf-list based on RFC 6020.

    :param rpc:
        The RPC <edit-config/> message.

    :param list the_list:
        The Python list object that represents the leaf-list.

    :param node:
        The XML node that represents the item being operated on.

    :param str operation:
        The operation to perform: create, merge, replace, delete, remove.

    :returns list:
        The list with the value manipulated with the desired effect.
    """
    value = node.text

    # RFC 6020 § 7.7.7
    val_attr = '{urn:ietf:params:xml:ns:yang:1}value'
    insert = node.attrib.get('{urn:ietf:params:xml:ns:yang:1}insert', None)
    insert_value = node.attrib.get(val_attr, None)

    if operation == 'create':
        if value in the_list:
            raise error.DataExistsAppError(rpc)
        if insert is None:
            insert = 'last'

    if operation in ['merge', 'replace']:
        # Need clarification about RFC § 7.7.7.  This may be wrong.
        # If so, change `return` to `insert = 'last'`, or move to function top.
        if insert is None:
            return the_list

        # `insert` was specified, so we will move this item.
        if value in the_list:
            the_list.remove(value)

    if operation in ['delete', 'remove']:
        if value not in the_list:
            if operation == 'delete':
                raise error.DataMissingAppError(rpc)
            return the_list  # Job done.

        the_list.remove(value)
        return the_list

    # Okay.  We are performing an addition of some kind on this list.
    if insert in ['before', 'after'] and (
            insert_value is None or insert_value not in the_list):
        raise error.BadAttributeAppError(rpc, node, val_attr)

    index = _index_for_list_op(the_list, insert, insert_value)
    if index == -1:
        raise error.BadAttributeAppError(
                rpc, node, '{urn:ietf:params:xml:ns:yang:1}insert'
        )

    the_list.insert(index, value)
    return the_list