summaryrefslogtreecommitdiff
path: root/ncserver/module/nms_ifupdownng.py
blob: 16183ea4ae8727bb1c7ef8f0c7a3989ba9236933 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
"""
NETCONF for APK Distributions server:
    Network Management System abstraction module for ifupdown-ng

Copyright © 2020 Adélie Software in the Public Benefit, Inc.

Released under the terms of the NCSA license.  See the LICENSE file included
with this source distribution for more information.

SPDX-License-Identifier: NCSA
"""

from contextlib import contextmanager

import ipaddress
import logging
import pathlib
import socket
import subprocess
import yaml

# pylint: disable=I1101
import netifaces

from ncserver.base.util import _


LOGGER = logging.getLogger(__name__)
"""The object used for logging informational messages."""


M_ABI_VERSION = 1
"""The ABI version of this NETCONF module."""


M_PREFIX = "nmsa"
"""The XML tag prefix for this module's tags."""


M_NS = "http://netconf.adelielinux.org/ns/netmgmt"
"""The XML namespace for this module."""


M_NAME = "adelie-nms-abstract"
"""The YANG model name for this module."""


M_REVISION = "2020-11-18"
"""The YANG revision date for this module."""


_CONFIG = dict()
"""The internal configuration handle."""


_SYSCTL = dict()
"""The internatl sysctl configuration handle."""


_TRANSACTION = False
"""Determines if a transaction is in progress."""


#############################
#      I N T E R N A L      #
#############################


def _load_config():
    """Load the current active configuration from /e/n/i."""
    global _CONFIG  # pylint: disable=W0603

    # Won't load during a transaction.
    if _TRANSACTION:
        return

    result = None
    try:
        result = subprocess.run(['/sbin/ifparse', '-AF',
                                 'yaml-raw'],
                                stdout=subprocess.PIPE, check=False)
    except OSError:
        LOGGER.error(_("ifupdown-ng may not be installed properly"))
        return

    if result.returncode != 0:
        LOGGER.error(_("ifparse returned error %d"), result.returncode)
        return

    rawyaml = result.stdout.decode('utf-8')
    _CONFIG = yaml.safe_load(rawyaml)


def _save_unlocked():
    """Save changes to the configuration."""
    eni = ""
    for iface in _CONFIG.keys():
        buf = "iface " + iface + "\n"
        for item in _CONFIG[iface]:
            for key, val in item.items():
                if key == 'auto' and \
                   (val is True or val.lower()[0] == 't'):
                    buf = "auto " + iface + "\n" + buf
                    continue
                if isinstance(val, bool):
                    val = str(val).lower()
                buf += "  "
                buf += str(key) + " " + str(val)
                buf += "\n"
        eni += buf + "\n"

    with open('/etc/network/interfaces', 'w') as conf_file:
        # snip last double-\n off
        conf_file.write(eni[:-1])


def _sysctl_save_unlocked():
    """Save changes to sysctl.conf (and sysctl itself)."""
    buf = "\n".join("{key}={val}".format(key=key, val=val)
                    for key, val in _SYSCTL.items())

    with open('/etc/netconf/sysctl.conf', 'w') as conf_file:
        conf_file.write(buf)

    subprocess.run(['sysctl', '-f', '/etc/netconf/sysctl.conf'], check=False)


def _save():
    """Save configuration changes, if a transaction is not in progress."""
    if _TRANSACTION:
        return

    _save_unlocked()


def _sysctl_save():
    """Save sysctl changes, if a transaction is not in progress."""
    if _TRANSACTION:
        return

    _sysctl_save_unlocked()


def _find_one(iface: str, key: str):
    """Find a single instance of configuration +key+ for +iface+."""
    if iface not in _CONFIG.keys():
        return None

    for item in _CONFIG[iface]:
        if key in item.keys():
            return item[key]

    return None


def _find_many(iface: str, key: str) -> list:
    """Find n instances of configuration +key+ for +iface+."""
    ret = list()

    if iface not in _CONFIG.keys():
        return None

    for item in _CONFIG[iface]:
        if key in item.keys():
            ret.append(item[key])

    return ret


def _add_one_to_list(iface: str, key: str, value: str):
    """Add +value+ to list +key+ for +iface+."""
    _CONFIG[iface].append({key: value})
    _save()


def _replace_one(iface: str, key: str, value: str):
    """Replace a single instance of +key+ for +iface+ with +value+."""
    iface_cfg = _CONFIG[iface]
    for item in iface_cfg:
        if key in item.keys():
            item[key] = value
            _save()
            return

    iface_cfg.append({key: value})
    _save()
    return


def _remove_one(iface: str, key: str):
    """Remove a single instance of +key+ from +iface+."""
    iface_cfg = _CONFIG[iface]
    for item in iface_cfg:
        if key in item.keys():
            iface_cfg.remove(item)
            _save()
            return


def _remove_one_from_list(iface: str, key: str, value: str):
    """Remove +value+ from list +key+ for +iface+."""
    iface_cfg = _CONFIG[iface]
    for item in iface_cfg:
        if key in item.keys():
            if item[key] == value:
                iface_cfg.remove(item)
                _save()
                return


def _iface_path(iface: str) -> pathlib.Path:
    """Retrieve the system device path for the specified interface."""
    return pathlib.Path('/sys/class/net/' + iface)


def _load_sysctl_cfg() -> dict:
    """Retrieve the desired sysctl configuration."""
    global _SYSCTL  # pylint: disable=W0603

    # Won't load during a transaction.
    if _TRANSACTION:
        return

    sysctl_path = pathlib.Path('/etc/netconf/sysctl.conf')
    if not sysctl_path.exists():
        LOGGER.error(_("NETCONF sysctl config missing; check installation"))
        return

    lines = list()
    with open(sysctl_path, 'r') as sysctl_file:
        lines = sysctl_file.readlines()

    _SYSCTL = dict({
        value[0]: value[1].strip() for value in
        [line.split('=') for line in lines]
    })


#############################
#    P A R A M E T E R S    #
#############################

_ENI_MAPPING = {'description': 'netconf-description',
                'enabled': 'auto',
                'ipv4_mtu': 'mtu'}
"""Mapping of NMSA keys to /e/n/i keys."""


_SYSCTL_MAPPING = {
    'ipv4_forwarding': 'net.ipv4.conf.{_if}.forwarding',
    'ipv6_enabled': 'net.ipv6.conf.{_if}.disable_ipv6',
    'ipv6_forwarding': 'net.ipv6.conf.{_if}.forwarding',
    'ipv6_mtu': 'net.ipv6.conf.{_if}.mtu',
    'ipv6_dad_xmit': 'net.ipv6.conf.{_if}.dad_transmits',
    'ipv6_slaac_enabled': 'net.ipv6.conf.{_if}.autoconf',
    'ipv6_slaac_type': 'net.ipv6.conf.{_if}.use_tempaddr',
    'ipv6_slaac_validlft': 'net.ipv6.conf.{_if}.temp_valid_lft',
    'ipv6_slaac_preflft': 'net.ipv6.conf.{_if}.temp_prefered_lft'
}
"""Mapping of NMSA keys to sysctl nodes."""


# # # # Getters # # # #

def get_one_eni(iface: str, parameter: str):
    """Retrieve the specified parameter from /e/n/i."""
    return _find_one(iface, _ENI_MAPPING[parameter])


def get_sysctl(iface: str, parameter: str):
    """Retrieve the specified parameter from sysctl.conf."""
    key = _SYSCTL_MAPPING[parameter].format(_if=iface)
    fallback = _SYSCTL_MAPPING[parameter].format(_if='all')

    return _SYSCTL.get(key, _SYSCTL.get(fallback, None))


def get_sysctl_bool(iface: str, parameter: str) -> bool:
    """Retrieve the specified boolean from sysctl.conf."""
    value = get_sysctl(iface, parameter)
    if value == '1':
        return True
    return False


def get_sysctl_int(iface: str, parameter: str):
    """Retrieve the specified integer from sysctl.conf."""
    value = get_sysctl(iface, parameter)
    if value is None:
        return None

    return int(value)


def get_ipv4en(iface: str, _) -> bool:
    """Retrieve IPv4 enablement status for the specified interface."""
    executors = _find_many(iface, 'use')
    v4_execs = ('dhcp', 'apipa', 'loopback')
    if any(executor in executors for executor in v4_execs):
        # We can't guarantee that DHCPv4 is in use on this interface,
        # but we can't guarantee it isn't, either.
        return True

    if any('.' in addr for addr in list_addresses(iface)):
        # IPv4 addresses contain '.'s, others don't.
        return True

    return False


def get_ipv6en(iface: str, parameter: str):
    """Retrieve IPv6 enablement status for the specified interface."""
    return not get_sysctl_bool(iface, parameter)


def get_slaac(iface: str, parameter: str):
    """Retrieve SLAAC configuration for the specified interface."""
    enabled = get_sysctl_bool(iface, 'ipv6_slaac_enabled')
    if not enabled:
        return False

    value = get_sysctl_int(iface, 'ipv6_slaac_type')
    if parameter == 'ipv6_slaac_globaladdr':
        return value in ('0', '1')
    return value in ('1', '2')


def live_sysctl(iface: str, parameter: str):
    """Retrieve the live value of a parameter from /proc/sys."""
    key = _SYSCTL_MAPPING[parameter].format(_if=iface)

    path = pathlib.Path("/proc/sys/" + key.replace('.', '/'))
    if path.exists():
        with open(path, 'r') as sysctl:
            return sysctl.read().strip()

    LOGGER.error(_("kernel does not know %s"), parameter)
    return None


def live_sysctl_bool(iface: str, parameter: str) -> bool:
    """Retrieve a live boolean value from /proc/sys."""
    value = live_sysctl(iface, parameter)
    if value is None:
        return None
    if value == '1':
        return True
    return False


def live_sysctl_int(iface: str, parameter: str) -> int:
    """Retrieve a live integer from /proc/sys."""
    value = live_sysctl(iface, parameter)
    if value is None:
        return None

    return int(value)


def live_ipv4en(iface: str, _) -> bool:
    """Retrieve the current IPv4 status for the specified interface."""
    raise NotImplementedError


def live_ipv6en(iface: str, parameter: str):
    """Retrieve current IPv6 status for the specified interface."""
    return not live_sysctl_bool(iface, parameter)


def live_ipv4_mtu(iface: str, _):
    """Determine the IPv4 MTU for the interface."""
    mtupath = _iface_path(iface) / "mtu"
    if mtupath.exists():
        with open(mtupath, 'r') as mtu_file:
            return int(mtu_file.read().strip())

    # Linux kernel: net/ethernet/eth.c line 367
    #               include/uapi/linux/if_ether.h line 36
    return 1500


def live_slaac(iface: str, parameter: str):
    """Determine live SLAAC configuration for the specified interface."""
    enabled = live_sysctl_bool(iface, 'ipv6_slaac_enabled')
    if not enabled:
        return False

    value = live_sysctl_int(iface, 'ipv6_slaac_type')
    if parameter == 'ipv6_slaac_globaladdr':
        return value in ('0', '1')
    return value in ('1', '2')


# # # # Setters # # # #

def set_sysctl(iface: str, parameter: str, value):
    """Set a sysctl value."""
    key = _SYSCTL_MAPPING[parameter].format(_if=iface)
    _SYSCTL[key] = value
    _sysctl_save()


def set_sysctl_bool(iface: str, parameter: str, value: bool):
    """Set a boolean sysctl value."""
    real = '0'
    if value:
        real = '1'

    set_sysctl(iface, parameter, real)


def set_sysctl_int(iface: str, parameter: str, value: int):
    """Set an integer sysctl value."""
    set_sysctl(iface, parameter, str(value))


def set_ipv4en(iface: str, _, value: bool):
    """Set the IPv4 enabled flag."""
    raise NotImplementedError


def set_ipv6en(iface: str, parameter: str, value: bool):
    """Set IPv6 enabled/disabled for the specified interface."""
    set_sysctl_bool(iface, parameter, not value)


def set_desc(iface: str, _, value: str):
    """Set the description for the specified interface."""
    _replace_one(iface, 'netconf-description', value)


def set_auto(iface: str, _, value: bool):
    """Set the auto flag for the specified interface."""
    _replace_one(iface, 'auto', str(value).lower())


def set_ipv4_mtu(iface: str, _, value: int):
    """Set the MTU for the specified interface."""
    _replace_one(iface, 'mtu', str(value))


def set_slaac(iface: str, parameter: str, value: bool):
    # pylint: disable=R0911
    """Set SLAAC parameters for the specified interface."""
    curr_global = get_slaac(iface, 'ipv6_slaac_globaladdr')
    curr_temp = get_slaac(iface, 'ipv6_slaac_tempaddr')

    if parameter == 'ipv6_slaac_globaladdr':
        if not value and not curr_temp:
            set_sysctl_bool(iface, 'ipv6_slaac_enabled', False)
            set_sysctl_int(iface, 'ipv6_slaac_type', '0')
            return
        # all further types enable SLAAC in some way.
        set_sysctl_bool(iface, 'ipv6_slaac_enabled', True)
        if not value and curr_temp:
            set_sysctl_int(iface, 'ipv6_slaac_type', '2')
            return
        if value and not curr_temp:
            set_sysctl_int(iface, 'ipv6_slaac_type', '0')
            return
        if value and curr_temp:
            set_sysctl_int(iface, 'ipv6_slaac_type', '1')
            return

    if parameter == 'ipv6_slaac_tempaddr':
        if not value and not curr_global:
            set_sysctl_bool(iface, 'ipv6_slaac_enabled', False)
            set_sysctl_int(iface, 'ipv6_slaac_type', '0')
            return
        # all further types enable SLAAC in some way.
        set_sysctl_bool(iface, 'ipv6_slaac_enabled', True)
        if not value and curr_global:
            set_sysctl_int(iface, 'ipv6_slaac_type', '0')
            return
        if value and not curr_global:
            set_sysctl_int(iface, 'ipv6_slaac_type', '2')
            return
        if value and curr_global:
            set_sysctl_int(iface, 'ipv6_slaac_type', '1')
            return

    LOGGER.error(_("unknown SLAAC parameter %s"), parameter)


# # # # Unsetters # # # #

def unset_one_eni(iface: str, parameter: str):
    """Unset a parameter in /e/n/i."""
    _remove_one(iface, _ENI_MAPPING[parameter])


def unset_sysctl(iface: str, parameter: str):
    """Unset a sysctl."""
    key = _SYSCTL_MAPPING[parameter].format(_if=iface)
    _SYSCTL.pop(key, None)
    _sysctl_save()


def unset_ipv4en(iface: str, _):
    """Unset the IPv4 enabled flag."""
    raise NotImplementedError


def unset_slaac(iface: str, parameter: str):
    """Set SLAAC parameters for the specified interface."""
    curr_global = get_slaac(iface, 'ipv6_slaac_globaladdr')
    curr_temp = get_slaac(iface, 'ipv6_slaac_tempaddr')

    if parameter == 'ipv6_slaac_globaladdr':
        # unset => default true
        set_sysctl_bool(iface, 'ipv6_slaac_enabled', True)
        if curr_temp:
            set_sysctl_int(iface, 'ipv6_slaac_type', '1')
        else:
            set_sysctl_int(iface, 'ipv6_slaac_type', '0')
        return

    if parameter == 'ipv6_slaac_tempaddr':
        # unset => default false
        set_sysctl_int(iface, 'ipv6_slaac_type', '0')
        if not curr_global:
            set_sysctl_bool(iface, 'ipv6_slaac_enabled', False)
        return

    LOGGER.error(_("unknown SLAAC parameter %s"), parameter)


_PARAMETERS = {
    # "name": (getter, live getter, setter, unsetter)
    'description':         (get_one_eni, get_one_eni, set_desc, unset_one_eni),
    'enabled':             (get_one_eni, get_one_eni, set_auto, unset_one_eni),
    'ipv4_enabled':        (get_ipv4en, live_ipv4en, set_ipv4en, unset_ipv4en),
    'ipv4_forwarding':     (get_sysctl_bool, live_sysctl_bool,
                            set_sysctl_bool, unset_sysctl),
    'ipv4_mtu':            (get_one_eni, live_ipv4_mtu,
                            set_ipv4_mtu, unset_one_eni),
    'ipv6_enabled':        (get_ipv6en, live_ipv6en, set_ipv6en, unset_sysctl),
    'ipv6_forwarding':     (get_sysctl_bool, live_sysctl_bool,
                            set_sysctl_bool, unset_sysctl),
    'ipv6_mtu':            (get_sysctl_int, live_sysctl_int,
                            set_sysctl_int, unset_sysctl),
    'ipv6_dad_xmit':       (get_sysctl_int, live_sysctl_int,
                            set_sysctl_int, unset_sysctl),
    'ipv6_slaac_globaladdr': (get_slaac, live_slaac, set_slaac, unset_slaac),
    'ipv6_slaac_tempaddr': (get_slaac, live_slaac, set_slaac, unset_slaac),
    'ipv6_slaac_validlft': (get_sysctl_int, live_sysctl_int,
                            set_sysctl_int, unset_sysctl),
    'ipv6_slaac_preflft':  (get_sysctl_int, live_sysctl_int,
                            set_sysctl_int, unset_sysctl)
}
"""Describes all supported parameters and their methods."""


#############################
#    P U B L I C   A P I    #
#############################


def interface_list():
    """Return a list of configured interfaces."""
    _load_config()

    return tuple(_CONFIG.keys())


def remove_interface(iface: str):
    """Completely remove configuration for +iface+."""
    if iface in _CONFIG.keys():
        del _CONFIG[iface]

    if not _TRANSACTION:
        _save()


def begin_transaction():
    """Begin a transaction."""
    global _TRANSACTION  # pylint: disable=W0603

    if _TRANSACTION:
        LOGGER.error(_("attempt to nest transactions"))
        return

    _TRANSACTION = True


def commit():
    """Commit any outstanding operations."""
    global _TRANSACTION  # pylint: disable=W0603

    if not _TRANSACTION:
        LOGGER.warning(_("commit when no transaction is in progress"))

    _save_unlocked()
    _sysctl_save_unlocked()

    _TRANSACTION = False


def rollback():
    """Roll back outstanding operations."""
    global _TRANSACTION  # pylint: disable=W0603

    _load_config()
    _load_sysctl_cfg()

    _TRANSACTION = False


@contextmanager
def transaction():
    """Context manager for NMSA transactions."""
    begin_transaction()
    try:
        yield None
    except:
        rollback()
        raise
    else:
        commit()


def get_param(iface: str, parameter: str):
    """Retrieve the parameter for the specified interface."""
    _load_config()
    _load_sysctl_cfg()

    if iface not in _CONFIG.keys():
        LOGGER.warning(
            _("requested parameter %s for non-existent interface %s"),
            parameter, iface
        )
        return None

    if parameter not in _PARAMETERS.keys():
        LOGGER.error(_("requested non-existent parameter %s for interface %s"),
                     parameter, iface)
        return None

    return _PARAMETERS[parameter][0](iface, parameter)


def curr_param(iface: str, parameter: str):
    """Retrieve the current parameter value for the specified interface."""
    _load_config()
    # Won't read from sysctl.conf so don't need to _load_sysctl_cfg.

    if parameter not in _PARAMETERS.keys():
        LOGGER.error(_("requested non-existent parameter %s for interface %s"),
                     parameter, iface)
        return None

    return _PARAMETERS[parameter][1](iface, parameter)


def set_param(iface: str, parameter: str, value):
    """Set the parameter for the specified interface."""
    _load_config()
    _load_sysctl_cfg()

    if parameter not in _PARAMETERS.keys():
        LOGGER.error(
            _("attempted to set non-existent parameter %s for interface %s"),
            parameter, iface
        )
        return

    # Allow creation of new interfaces from NETCONF.
    if iface not in _CONFIG.keys():
        _CONFIG[iface] = list()

    _PARAMETERS[parameter][2](iface, parameter, value)


def unset_param(iface: str, parameter: str):
    """Unset the parameter for the specified interface."""
    _load_config()
    _load_sysctl_cfg()

    if iface not in _CONFIG.keys():
        LOGGER.warning(
            _("attempted to unset parameter %s for non-existent interface %s"),
            parameter, iface
        )
        return

    if parameter not in _PARAMETERS.keys():
        LOGGER.error(
            _("attempted to unset non-existent parameter %s for interface %s"),
            parameter, iface
        )
        return

    _PARAMETERS[parameter][3](iface, parameter)


def list_addresses(iface: str) -> list:
    """Retrieve all configured addresses for the specified interface."""
    _load_config()

    if iface not in _CONFIG.keys():
        LOGGER.warning(_("requested addresses for non-existent interface %s"),
                       iface)
        return list()

    return _find_many(iface, 'address')


def live_addresses(iface: str) -> list:
    """Retrieve live addresses for the specified interface."""
    addresses = list()

    if iface not in netifaces.interfaces():
        LOGGER.warning(_("interface %s is not live"), iface)
        return addresses

    raw = netifaces.ifaddresses(iface)
    for ipv4 in raw.get(socket.AF_INET, tuple()):
        addr = ipv4['addr']
        mask = ipv4['netmask']
        iface = ipaddress.IPv4Interface("{a}/{m}".format(a=addr, m=mask))
        addresses.append(iface)

    for ipv6 in raw.get(socket.AF_INET6, tuple()):
        addr = ipv6['addr'].split('%')[0]
        mask = ipv6['netmask'][ipv6['netmask'].find('/') + 1:]
        iface = ipaddress.IPv6Interface("{a}/{m}".format(a=addr, m=mask))
        addresses.append(iface)

    return addresses


def add_address(iface: str, _type, addr: str, prefix):
    """Add an address of the specified ``type`` to the specified interface."""
    _load_config()

    if iface not in _CONFIG.keys():
        LOGGER.warning(
            _("attempted to add address to non-existent interface %s"), iface
        )
        return

    if _type not in (socket.AF_INET, socket.AF_INET6):
        LOGGER.error(_("unknown address type %r"), _type)
        return

    addr = None
    iface = ipaddress.IPv6Interface
    if _type == socket.AF_INET:
        iface = ipaddress.IPv4Interface

    try:
        addr = iface("{a}/{p}".format(a=addr, p=prefix))
    except Exception as err:
        raise ValueError("IP address is not valid") from err

    s_addr = str(addr)

    if s_addr in list_addresses(iface):
        raise RuntimeError("Duplicate address attempt")

    _add_one_to_list(iface, 'address', s_addr)


def remove_address(iface: str, addr: str):
    """Remove an address from the specified interface."""
    _load_config()

    if iface not in _CONFIG.keys():
        LOGGER.warning(
            _("attempted to remove address from non-existent interface %s"),
            iface
        )
        return

    # implement this.
    raise NotImplementedError


def running(_):
    """Non-YANG functional module only - no nodes."""


def operational(_):
    """Non-YANG functional module only - no nodes."""


def edit(*params):  # pylint: disable=W0613
    """Non-YANG functional module only - no nodes."""


# Load immediately when we're loaded so we can go straight to a transaction
# if desired.
_load_config()
_load_sysctl_cfg()