summaryrefslogtreecommitdiff
path: root/ncserver/base/util.py
blob: 164e30981a3925e1b947692037d7a205f50b7245 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
NETCONF for APK Distributions server:
    Base utility functions.

Copyright © 2020 Adélie Software in the Public Benefit, Inc.

Released under the terms of the NCSA license.  See the LICENSE file included
with this source distribution for more information.

SPDX-License-Identifier: NCSA
"""


from datetime import datetime, timezone
from netconf import error


def _(the_string: str) -> str:
    """Return the translation of a user-facing string.

    Translation is not implemented yet, so the input is handed back
    unchanged.  Routing user-facing text through this function now means a
    real translation backend can be added later without touching callers.

    :param str the_string:
        The user-facing text.

    :returns str:
        The same text, untranslated.
    """
    translated = the_string
    return translated


def user_for_session(session) -> str:
    """Return the username associated with a session.

    The name is whatever the transport underneath the session's packet
    stream reports from ``get_username()``.

    :param session:
        The session to inspect.

    :returns str:
        The username reported by the session's transport.
    """
    transport = session.pkt_stream.stream.get_transport()
    return transport.get_username()


def ensure_leaf(rpc, node):
    """Verify that an XML node is a leaf, i.e. has no child nodes.

    :param rpc:
        The RPC message being processed; passed to the error if one is
        raised.

    :param node:
        The node to check.

    :raises error.UnknownElementAppError:
        If the node has at least one child.  The first child is reported as
        the unknown element.
    """
    if len(node) == 0:
        return

    first_child = node[0]
    raise error.UnknownElementAppError(rpc, first_child)


def node_operation(node, def_op='merge') -> str:
    """Work out which edit operation an XML node is asking for.

    The ``operation`` attribute is looked up in the NETCONF base 1.1
    namespace first, then in the base 1.0 namespace.

    :param node:
        The node.

    :param str def_op:
        The operation to report when the node carries no operation
        attribute.  Defaults to 'merge'.

    :returns str:
        The operation desired.
    """
    attributes = node.attrib

    # Ordered by precedence: a 1.1 attribute wins over a 1.0 attribute.
    qualified_names = (
        '{urn:ietf:params:xml:ns:netconf:base:1.1}operation',
        '{urn:ietf:params:xml:ns:netconf:base:1.0}operation',
    )
    for qualified in qualified_names:
        if qualified in attributes:
            return attributes[qualified]

    return def_op


def yang_dt_for_timestamp(timestamp) -> str:
    """Format a POSIX timestamp as a yang:date-and-time string.

    The result is an ISO 8601 string carrying a numeric UTC offset.  The
    offset used is the local offset in effect when this function is called,
    not the one that applied at the moment the timestamp describes.

    :param timestamp:
        Seconds since the epoch.

    :returns str:
        The formatted date and time.
    """
    local_zone = datetime.now(timezone.utc).astimezone().tzinfo
    moment = datetime.fromtimestamp(timestamp, tz=local_zone)
    return moment.isoformat()


def _index_for_list_op(the_list: list, operation: str, insert_value) -> int:
    """Determine the index for a leaf-list insertion.

    :param list the_list:
        The Python list object that represents the leaf-list.

    :param str operation:
        The insertion mode: first, last, before, after.

    :param insert_value:
        The existing entry that anchors the insertion for modes before and
        after.  MUST be present in the list for those modes; it is not
        consulted for first and last.

    :returns int:
        The index in which to insert, or -1 if the insertion mode is not
        one of the four recognised modes.

    :raises ValueError:
        If the mode is before or after and insert_value is not in the list.
    """
    if operation == 'last':
        return len(the_list)
    if operation == 'first':
        return 0
    if operation == 'before':
        return the_list.index(insert_value)
    if operation == 'after':
        return the_list.index(insert_value) + 1

    return -1


def handle_list_operation(rpc, the_list: list, node, operation: str) -> list:
    """Perform a <edit-config/> operation on a leaf-list based on RFC 6020.

    The list is modified in place and also returned.  If an error is raised,
    the list is left exactly as it was passed in.

    :param rpc:
        The RPC <edit-config/> message.

    :param list the_list:
        The Python list object that represents the leaf-list.

    :param node:
        The XML node that represents the item being operated on.  Its text
        is the entry value; its yang:insert and yang:value attributes
        select the position for an added or moved entry.

    :param str operation:
        The operation to perform: create, merge, replace, delete, remove.

    :returns list:
        The list with the value manipulated with the desired effect.

    :raises error.DataExistsAppError:
        If the operation is create and the value is already in the list.

    :raises error.DataMissingAppError:
        If the operation is delete and the value is not in the list.

    :raises error.BadAttributeAppError:
        If yang:insert is missing or unrecognised where a position is
        needed, or if yang:insert is before/after and yang:value is
        missing or does not name another entry in the list.
    """
    value = node.text

    # RFC 6020 § 7.7.7
    insert_attr = '{urn:ietf:params:xml:ns:yang:1}insert'
    val_attr = '{urn:ietf:params:xml:ns:yang:1}value'
    insert = node.attrib.get(insert_attr, None)
    insert_value = node.attrib.get(val_attr, None)

    # True when an existing entry is being repositioned (merge/replace only).
    moving = False

    if operation == 'create':
        if value in the_list:
            raise error.DataExistsAppError(rpc)
        if insert is None:
            insert = 'last'

    if operation in ['merge', 'replace']:
        moving = value in the_list
        if insert is None:
            if moving:
                # Already present and no repositioning requested.
                return the_list
            # RFC 6020 § 7.7.7: merge/replace create the entry if it does
            # not exist; without an explicit position it goes at the end.
            insert = 'last'

    if operation in ['delete', 'remove']:
        if value not in the_list:
            if operation == 'delete':
                raise error.DataMissingAppError(rpc)
            return the_list  # Job done.

        the_list.remove(value)
        return the_list

    # Okay.  We are performing an addition of some kind on this list.
    # Work on a copy so that a bad attribute cannot leave the caller's list
    # with the moved entry already removed.
    candidate = list(the_list)
    if moving:
        candidate.remove(value)

    if insert in ['before', 'after'] and (
            insert_value is None or insert_value not in candidate):
        raise error.BadAttributeAppError(rpc, node, val_attr)

    index = _index_for_list_op(candidate, insert, insert_value)
    if index == -1:
        raise error.BadAttributeAppError(rpc, node, insert_attr)

    candidate.insert(index, value)
    the_list[:] = candidate  # Commit in place; callers may hold this list.
    return the_list