1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
"""
NETCONF for APK Distributions server:
Server module
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import logging
from lxml import etree
from netconf import error, util
from netconf.server import NetconfSSHServer, SSHAuthorizedKeysController
from ncserver.base.log import configure_logging, LOG_CONFIG
from ncserver.base.modman import ModuleManager
from ncserver.base.util import user_for_session
from ncserver.config import ConfigManager
# Short alias for lxml's qualified-name helper; used below to obtain the
# local (namespace-free) part of an element tag.
QName = etree.QName # pylint: disable=I1101
def log_read(session, rpc):
    """Log a configuration read at debug level.

    :param session:
        The session issuing the read.  Its ``session_id`` and the user
        returned by ``user_for_session`` are included in the message.

    :param rpc:
        The RPC element being handled.  It is serialised with
        ``etree.tostring`` for inclusion in the message.
    """
    # Serialising the whole RPC (and resolving the user) is wasted work when
    # the message would be discarded anyway, so skip it unless debug logging
    # is actually enabled.
    # NOTE(review): assumes LOG_CONFIG is a standard logging.Logger (or
    # LoggerAdapter) exposing isEnabledFor() -- confirm in ncserver.base.log.
    if not LOG_CONFIG.isEnabledFor(logging.DEBUG):
        return

    LOG_CONFIG.debug(
        '%s on session %d is requesting configuration: %s',
        user_for_session(session), session.session_id, etree.tostring(rpc)
    )
class Server:
    """The NETCONF server component.

    An instance owns the configuration manager, the SSH authentication
    controller, the module manager and the underlying ``NetconfSSHServer``.
    The instance itself is passed to ``NetconfSSHServer`` as its second
    argument (presumably the object whose ``nc_*`` / ``rpc_*`` methods the
    library calls back into -- confirm against the netconf library).
    """

    def __init__(self, port=830):
        """Create the NETCONF server.

        :param int port:
            The port number to listen on.  Typically 830.
        """
        self.config = ConfigManager()
        self.auth = SSHAuthorizedKeysController(
            users=self.config.get_list('server', 'users')
        )

        self.debug = self.config.get('server', 'debug')
        if self.debug:
            configure_logging(logging.DEBUG)
        else:
            # 'loglevel' is optional in the configuration; keep the
            # historical default name when it is absent.
            try:
                loglevel = self.config.get('server', 'loglevel')
            except KeyError:
                loglevel = 'notice'
            configure_logging(self._resolve_log_level(loglevel))

        # Load every configured module *before* constructing the SSH server.
        # The server receives ``self`` as its callback object, so it must not
        # be able to call into us while ``self.modman`` is still missing.
        # NOTE(review): presumably NetconfSSHServer starts accepting
        # connections as soon as it is constructed -- confirm.
        self.modman = ModuleManager()
        for module in self.config.get_list('server', 'modules'):
            self.modman.load_module(module)

        self.server = NetconfSSHServer(
            self.auth, self, port, self.config.get('server', 'host_key'),
            self.debug
        )

    @staticmethod
    def _resolve_log_level(name, default=logging.INFO):
        """Translate a configured level name into a numeric logging level.

        The name is upper-cased and looked up as an attribute of the
        ``logging`` module.  Names that do not resolve to an integer level
        (for example ``'notice'`` when no such level has been added to
        ``logging``, or a typo in the configuration) yield ``default``
        instead of raising ``AttributeError``.

        :param name:
            The level name from the configuration, in any letter case.

        :param int default:
            The level returned when ``name`` is not a known level.

        :returns:
            The numeric logging level.
        """
        level = getattr(logging, str(name).upper(), None)
        if isinstance(level, int):
            return level
        return default

    def close(self):
        """Close all connections."""
        self.server.close()

    def nc_append_capabilities(self, capabilities):
        """List all capabilities of this NETCONF server.

        One ``capability`` sub-element is added to ``capabilities`` (via
        ``util.subelm``) for each capability reported by the module manager,
        followed by one for each capability implemented by the server itself.

        :param capabilities:
            The element to which the ``capability`` entries are added.
        """
        our_capabs = [
            # Write directly to the `running` datastore.
            'urn:ietf:params:netconf:capability:writable-running:1.0',
            # Filter with XPath in addition to subtrees.
            'urn:ietf:params:netconf:capability:xpath:1.0'
        ]

        for capab in self.modman.capabilities():
            util.subelm(capabilities, 'capability').text = capab

        for capab in our_capabs:
            util.subelm(capabilities, 'capability').text = capab

    def rpc_get(self, session, rpc, filter_or_none):  # pylint: disable=W0613
        """Handle the <get/> RPC.

        Collects both running configuration and operational data from the
        loaded modules into a fresh ``nc:data`` element.

        :param session:
            The session issuing the request (used for logging).

        :param rpc:
            The RPC element being handled.

        :param filter_or_none:
            The filter supplied with the request, or None.

        :returns:
            The result of ``util.filter_results`` applied to the collected
            data.
        """
        log_read(session, rpc)
        root = util.elm('nc:data')
        self.modman.collect_running(root)
        self.modman.collect_operational(root)
        return util.filter_results(rpc, root, filter_or_none, self.debug)

    # pylint: disable=W0613
    def rpc_get_config(self, session, rpc, source, filter_or_none):
        """Handle the <get-config/> RPC.

        Collects running configuration from the loaded modules into a fresh
        ``nc:data`` element.

        :param session:
            The session issuing the request (used for logging).

        :param rpc:
            The RPC element being handled.

        :param source:
            The requested source datastore.  Currently unused.

        :param filter_or_none:
            The filter supplied with the request, or None.

        :returns:
            The result of ``util.filter_results`` applied to the collected
            data.
        """
        # NOTE(review): ``source`` is not inspected, so the running
        # configuration is returned whichever datastore was requested.
        log_read(session, rpc)
        root = util.elm('nc:data')
        self.modman.collect_running(root)
        return util.filter_results(rpc, root, filter_or_none, self.debug)

    # pylint: disable=R0913
    def rpc_edit_config(self, session, rpc, target, config, default_or_none):
        """Handle the <edit-config/> RPC.

        :param session:
            The session issuing the request.  Not used here.

        :param rpc:
            The RPC element being handled.

        :param target:
            The target element; it must contain exactly one child naming
            the datastore, and that datastore must be ``running``.

        :param config:
            The configuration payload handed to the module manager.

        :param default_or_none:
            The default-operation element, or None.  When None, the
            operation ``merge`` is used.

        :returns:
            An ``nc:ok`` element.

        :raises error.BadElementProtoError:
            If ``target`` does not contain exactly one child.

        :raises error.InvalidValueAppError:
            If the named datastore is not ``running``.
        """
        if len(target) != 1:
            # We must have exactly 1 target.
            raise error.BadElementProtoError(rpc, target)

        # strip the xmlns off the tag name.
        datastore = QName(target[0].tag).localname
        if datastore != "running":
            raise error.InvalidValueAppError(rpc)

        def_op = 'merge' if default_or_none is None else default_or_none.text

        self.modman.collect_edit(rpc, config, def_op)

        return util.elm('nc:ok')
def _main():
    """Run the server until interrupted.

    Creates a ``Server``, waits on its underlying server via ``join()``, and
    closes the server when a ``KeyboardInterrupt`` arrives.
    """
    instance = Server()
    try:
        instance.server.join()
    except KeyboardInterrupt:
        instance.close()


if __name__ == "__main__":
    _main()
|