1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
|
"""
NETCONF for APK Distributions server:
Server module
Copyright © 2020 Adélie Software in the Public Benefit, Inc.
Released under the terms of the NCSA license. See the LICENSE file included
with this source distribution for more information.
SPDX-License-Identifier: NCSA
"""
import logging
from lxml import etree
import netconf.base
import paramiko
from netconf import error, util
from netconf.server import NetconfSSHServer, SSHAuthorizedKeysController
from ncserver.base.log import configure_logging, LOG_AUTH, LOG_CONFIG, \
LOG_OPERATIONAL
from ncserver.base.modman import MODMAN
from ncserver.base.util import user_for_session
from ncserver.config import ConfigManager
QName = etree.QName # pylint: disable=I1101
def log_read(session, rpc):
"""Log a configuration read."""
# pylint: disable=I1101
LOG_CONFIG.debug(
'%s on session %d is requesting configuration: %s',
user_for_session(session), session.session_id, etree.tostring(rpc)
)
def log_write(session, rpc):
"""Log a configuration write attempt."""
# pylint: disable=I1101
LOG_CONFIG.debug(
'%s on session %d is editing configuration: %s',
user_for_session(session), session.session_id, etree.tostring(rpc)
)
class AuthenticationController(SSHAuthorizedKeysController):
"""Authentication controller that logs failures."""
def check_auth_publickey(self, username, key):
result = super().check_auth_publickey(username, key)
if result == paramiko.AUTH_FAILED:
LOG_AUTH.error('Authentication failed for %s', username)
return result
_open_session = getattr(netconf.base.NetconfSession, '_open_session')
def _open_session_and_log(self, is_server):
"""Log that authentication has been successful."""
_open_session(self, is_server)
peername = self.pkt_stream.stream.getpeername()
ipaddr = peername[0]
port = peername[1]
if ':' in ipaddr:
ipaddr = '[' + ipaddr + ']'
LOG_AUTH.notice(
'Authentication granted to %s from %s:%d, session %d',
user_for_session(self), ipaddr, port, self.session_id
)
setattr(netconf.base.NetconfSession, '_open_session', _open_session_and_log)
class Server:
"""The NETCONF server component."""
def __init__(self, port=830):
"""Create the NETCONF server.
:param int port:
The port number to listen on. Typically 830.
"""
self.config = ConfigManager()
self.auth = AuthenticationController(
users=self.config.get_list('server', 'users')
)
self.debug = self.config.get('server', 'debug')
if self.debug:
configure_logging(logging.DEBUG)
else:
loglevel = 'notice'
try:
loglevel = self.config.get('server', 'loglevel')
except KeyError:
pass
configure_logging(getattr(logging, loglevel.upper()))
self.server = NetconfSSHServer(
self.auth, self, port, self.config.get('server', 'host_key'),
self.debug
)
for module in self.config.get_list('server', 'modules'):
MODMAN.load_module(module)
def __getattr__(self, attr):
"""Maybe pass RPC calls on."""
if attr.startswith("rpc_"):
return self._rpc_wrapper
raise AttributeError("'Server' object has no attribute '" + attr + "'")
def close(self):
"""Close all connections."""
self.server.close()
def nc_append_capabilities(self, capabilities): # pylint: disable=R0201
"""List all capabilities of this NETCONF server."""
our_capabs = [
# Write directly to the `running` datastore.
'urn:ietf:params:netconf:capability:writable-running:1.0',
# Filter with XPath in addition to subtrees.
'urn:ietf:params:netconf:capability:xpath:1.0'
]
for capab in MODMAN.capabilities():
util.subelm(capabilities, 'capability').text = capab
for capab in our_capabs:
util.subelm(capabilities, 'capability').text = capab
def _rpc_wrapper(self, session, rpc, *params): # pylint: disable=R0201
"""Handle module RPCs."""
name = QName(rpc[0].tag).localname
LOG_OPERATIONAL.info(
'RPC %s invoked by %s on session %d',
name, user_for_session(session), session.session_id
)
if MODMAN.has_rpc(name):
try:
result = MODMAN.rpc(name)(session, rpc, *params)
LOG_OPERATIONAL.info(
'RPC %s invoked by %s on session %d completed',
name, user_for_session(session), session.session_id
)
return result
except error.RPCServerError as rpc_error:
LOG_OPERATIONAL.error(
'RPC %s invoked by %s on session %d encountered error %s',
name, user_for_session(session), session.session_id,
rpc_error
)
raise
else:
LOG_OPERATIONAL.warning(
'RPC %s invoked by %s on session %d is unknown',
name, user_for_session(session), session.session_id
)
raise error.OperationNotSupportedProtoError(rpc)
def rpc_get(self, session, rpc, filter_or_none): # pylint: disable=W0613
"""Handle the <get/> RPC."""
log_read(session, rpc)
root = util.elm('nc:data')
MODMAN.collect_operational(root)
return util.filter_results(rpc, root, filter_or_none, self.debug)
# pylint: disable=W0613
def rpc_get_config(self, session, rpc, source, filter_or_none):
"""Handle the <get-config/> RPC."""
log_read(session, rpc)
root = util.elm('nc:data')
MODMAN.collect_running(root)
return util.filter_results(rpc, root, filter_or_none, self.debug)
# pylint: disable=R0913,R0201
def rpc_edit_config(self, session, rpc, target, config, default_or_none):
"""Handle the <edit-config/> RPC."""
if len(target) != 1:
# We must have exactly 1 target.
raise error.BadElementProtoError(rpc, target)
log_write(session, rpc)
# strip the xmlns off the tag name.
datastore = QName(target[0].tag).localname
if datastore != "running":
raise error.InvalidValueAppError(rpc)
if default_or_none is None:
def_op = 'merge'
else:
def_op = default_or_none.text
MODMAN.collect_edit(session, rpc, config, def_op)
root = util.elm('nc:ok')
return root
if __name__ == "__main__":
s = Server()
try:
s.server.join()
except KeyboardInterrupt:
s.close()
|