diff options
author | Jake Hunsaker <jhunsake@redhat.com> | 2022-11-03 10:51:40 -0400 |
---|---|---|
committer | Jake Hunsaker <jhunsake@redhat.com> | 2022-11-30 13:25:53 -0500 |
commit | d70f0a18d5dc632021e3aa5a84e60d4171772a48 (patch) | |
tree | 7dbbb82634e15141d029b01b575c1cab5586bbf3 | |
parent | 54f3e7e0279029cfe980dd92baf64def7bb7cb36 (diff) | |
download | sos-d70f0a18d5dc632021e3aa5a84e60d4171772a48.tar.gz |
[cleaner,ipv6] Add support for IPv6 obfuscation
This commit adds a new parser and accompanying map for obfuscating IPv6
addresses.
This new parser will attempt to capture valid IPv6 networks and
addresses, and produce a mostly-randomized obfuscated pair. Due to the
multiple formats an IPv6 address can take, some identifiers are
necessary to preserve relevant information while still obfuscating
actual addresses and networks.
For example, global unicast addresses that have more than one defined
hextet (greater than /16 prefix) will always generate an obfuscated
address starting with `534f` (or 'so', continuing the style of our mac
address handling that uses 'sos' as an identifier). Addresses with a /16
prefix or less will simply start with '53'. Private addresses, which
start with `fd`, will generate an obfuscated address starting with
`fd53`, so that the contextual understanding that it is a private
network/address can remain. Link-local addresses, which start with
`fe80::`, will remain that way, only having the device hextets obfuscated
- again, keeping the contextual information that it is a link-local
interface intact, as otherwise these obfuscations may confuse end
users reviewing an sos report for problems.
Note that the addresses `::1` and `::/0` are explicitly skipped and never
obfuscated, for the same reasons given above.
Additionally, this parser/map will write data to the default map (and
any per-run private maps) differently than previous parsers. Rather than
simply dumping the obfuscation pairs into the map, it is broken up via
network, with hosts belonging to that network nested inside those
network entries (still being json-formatted). Users will also note that
the ipv6 entries in the map also have a `version` key, which is intended
to be used for handling future updates to the parser/map when upgrading
from an older sos version to a newer one. This may or may not be carried
over to future updates to other parsers.
Closes: #3008
Related: RHBZ#2134906
Signed-off-by: Jake Hunsaker <jhunsake@redhat.com>
-rw-r--r-- | man/en/sos-clean.1 | 4 | ||||
-rw-r--r-- | sos/cleaner/__init__.py | 11 | ||||
-rw-r--r-- | sos/cleaner/mappings/__init__.py | 2 | ||||
-rw-r--r-- | sos/cleaner/mappings/ipv6_map.py | 282 | ||||
-rw-r--r-- | sos/cleaner/parsers/ipv6_parser.py | 60 | ||||
-rw-r--r-- | tests/unittests/cleaner_tests.py | 73 |
6 files changed, 426 insertions, 6 deletions
diff --git a/man/en/sos-clean.1 b/man/en/sos-clean.1 index 7fd2df08..e8506eeb 100644 --- a/man/en/sos-clean.1 +++ b/man/en/sos-clean.1 @@ -60,8 +60,8 @@ Note that using this option is very likely to leave sensitive information in pla the target archive, so only use this option when absolutely necessary or you have complete trust in the party/parties that may handle the generated report. -Valid values for this option are currently: \fBhostname\fR, \fBip\fR, \fBmac\fR, \fBkeyword\fR, -and \fBusername\fR. +Valid values for this option are currently: \fBhostname\fR, \fBip\fR, \fBipv6\fR, +\fBmac\fR, \fBkeyword\fR, and \fBusername\fR. .TP .B \-\-keywords KEYWORDS Provide a comma-delimited list of keywords to scrub in addition to the default parsers. diff --git a/sos/cleaner/__init__.py b/sos/cleaner/__init__.py index ef072a23..5e73bf98 100644 --- a/sos/cleaner/__init__.py +++ b/sos/cleaner/__init__.py @@ -25,6 +25,7 @@ from sos.cleaner.parsers.mac_parser import SoSMacParser from sos.cleaner.parsers.hostname_parser import SoSHostnameParser from sos.cleaner.parsers.keyword_parser import SoSKeywordParser from sos.cleaner.parsers.username_parser import SoSUsernameParser +from sos.cleaner.parsers.ipv6_parser import SoSIPv6Parser from sos.cleaner.archives.sos import (SoSReportArchive, SoSReportDirectory, SoSCollectorArchive, SoSCollectorDirectory) @@ -54,11 +55,14 @@ class SoSCleaner(SoSComponent): that future iterations will maintain the same consistent obfuscation pairing. - In the case of IP addresses, support is for IPv4 and efforts are made to - keep network topology intact so that later analysis is as accurate and + In the case of IP addresses, support is for IPv4 and IPv6 - effort is made + to keep network topology intact so that later analysis is as accurate and easily understandable as possible. If an IP address is encountered that we cannot determine the netmask for, a random IP address is used instead. + For IPv6, note that IPv4-mapped addresses, e.g. 
::ffff:10.11.12.13, are + NOT supported currently, and will remain unobfuscated. + For hostnames, domains are obfuscated as whole units, leaving the TLD in place. @@ -123,6 +127,7 @@ class SoSCleaner(SoSComponent): self.parsers = [ SoSHostnameParser(self.cleaner_mapping, self.opts.domains), SoSIPParser(self.cleaner_mapping), + SoSIPv6Parser(self.cleaner_mapping), SoSMacParser(self.cleaner_mapping), SoSKeywordParser(self.cleaner_mapping, self.opts.keywords, self.opts.keyword_file), @@ -447,7 +452,7 @@ third party. _map = {} for parser in self.parsers: _map[parser.map_file_key] = {} - _map[parser.map_file_key].update(parser.mapping.dataset) + _map[parser.map_file_key].update(parser.get_map_contents()) return _map diff --git a/sos/cleaner/mappings/__init__.py b/sos/cleaner/mappings/__init__.py index a6998f9d..da18d1e5 100644 --- a/sos/cleaner/mappings/__init__.py +++ b/sos/cleaner/mappings/__init__.py @@ -39,7 +39,7 @@ class SoSMap(): if not item or item in self.skip_keys or item in self.dataset.values(): return True for skip in self.ignore_matches: - if re.match(skip, item): + if re.match(skip, item, re.I): return True def add(self, item): diff --git a/sos/cleaner/mappings/ipv6_map.py b/sos/cleaner/mappings/ipv6_map.py new file mode 100644 index 00000000..cfe2900e --- /dev/null +++ b/sos/cleaner/mappings/ipv6_map.py @@ -0,0 +1,282 @@ +# Copyright 2022 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com> + +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + +import ipaddress + +from random import getrandbits +from sos.cleaner.mappings import SoSMap + + +def generate_hextets(hextets): + """Generate a random set of hextets, based on the length of the source + hextet. 
If any hextets are compressed, keep that compression. + + E.G. '::1234:bcd' will generate a leading empty '' hextet, followed by two + 4-character hextets. + + :param hextets: The extracted hextets from a source address + :type hextets: ``list`` + + :returns: A set of randomized hextets for use in an obfuscated + address + :rtype: ``list`` + """ + return [random_hex(4) if h else '' for h in hextets] + + +def random_hex(length): + """Generate a string of size length of random hex characters. + + :param length: The number of characters to generate + :type length: ``int`` + + :returns: A string of ``length`` hex characters + :rtype: ``str`` + """ + return f"{getrandbits(4*length):0{length}x}" + + +class SoSIPv6Map(SoSMap): + """Mapping for IPv6 addresses and networks. + + Much like the IP map handles IPv4 addresses, this map is designed to take + IPv6 strings and obfuscate them consistently to maintain network topology. + To do this, addresses will be manipulated by the ipaddress library. + + If an IPv6 address is encountered without a netmask, it is assumed to be a + /64 address. + """ + + networks = {} + + ignore_matches = [ + r'^::1/.*', + r'::/0', + r'fd53:.*', + r'^53..:' + ] + + first_hexes = ['534f'] + + compile_regexes = False + version = 1 + + def conf_update(self, config): + """Override the base conf_update() so that we can load the existing + networks into ObfuscatedIPv6Network() objects for the current run. 
+ """ + if 'networks' not in config: + return + for network in config['networks']: + _orig = ipaddress.ip_network(network) + _obfuscated = config['networks'][network]['obfuscated'] + _net = self._get_network(_orig, _obfuscated) + self.dataset[_net.original_address] = _net.obfuscated_address + for host in config['networks'][network]['hosts']: + _ob_host = config['networks'][network]['hosts'][host] + _net.add_obfuscated_host_address(host, _ob_host) + self.dataset[host] = _ob_host + + def sanitize_item(self, ipaddr): + _prefix = ipaddr.split('/')[-1] if '/' in ipaddr else '' + _ipaddr = ipaddr + if not _prefix: + # assume a /64 default per protocol + _ipaddr += "/64" + try: + _addr = ipaddress.ip_network(_ipaddr) + # ipaddr was an actual network per protocol + _net = self._get_network(_addr) + _ipaddr = _net.obfuscated_address + except ValueError: + # A ValueError is raised from the ipaddress module when passing + # an address such as 2620:52:0:2d80::4fe/64, which has host bits + # '::4fe' set - the /64 is generally interpreted only for network + # addresses. We use this behavior to properly obfuscate the network + # before obfuscating a host address within that network + _addr = ipaddress.ip_network(_ipaddr, strict=False) + _net = self._get_network(_addr) + if _net.network_addr not in self.dataset: + self.dataset[_net.original_address] = _net.obfuscated_address + # then, get the address within the network + _hostaddr = ipaddress.ip_address(_ipaddr.split('/')[0]) + _ipaddr = _net.obfuscate_host_address(_hostaddr) + + if _prefix and '/' not in _ipaddr: + return f"{_ipaddr}/{_prefix}" + return _ipaddr + + def _get_network(self, address, obfuscated=''): + """Attempt to find an existing ObfuscatedIPv6Network object from which + to either find an existing obfuscated match, or create a new one. If + no such object already exists, create it. 
+ """ + _addr = address.compressed + if _addr not in self.networks: + self.networks[_addr] = ObfuscatedIPv6Network(address, obfuscated, + self.first_hexes) + return self.networks[_addr] + + +class ObfuscatedIPv6Network(): + """An abstraction class that represents a network that is (to be) handled + by sos. + + Each distinct IPv6 network that we encounter will have a representative + instance of this class, from which new obfuscated subnets and host + addresses will be generated. + + This class should be built from an ``ipaddress.IPv6Network`` object. If + an obfuscation string is not passed, one will be created during init. + """ + + def __init__(self, addr, obfuscation='', used_hexes=None): + """Basic setup for the obfuscated network. Minor validation on the addr + used to create the instance, as well as on an optional ``obfuscation`` + which if set, will serve as the obfuscated_network address. + + :param addr: The *un*obfuscated network to be handled + :type addr: ``ipaddress.IPv6Network`` + + :param obfuscation: An optional pre-determined string representation of + the obfuscated network address + :type obfuscation: ``str`` + + :param used_hexes: A list of already used hexes for the first hextet + of a potential global address obfuscation + :type used_hexes: ``list`` + """ + if not isinstance(addr, ipaddress.IPv6Network): + raise Exception('Invalid network: not an IPv6Network object') + self.addr = addr + self.prefix = addr.prefixlen + self.network_addr = addr.network_address.compressed + self.hosts = {} + if used_hexes is None: + self.first_hexes = ['534f'] + else: + self.first_hexes = used_hexes + if not obfuscation: + self._obfuscated_network = self._obfuscate_network_address() + else: + if not isinstance(obfuscation, str): + raise TypeError(f"Pre-determined obfuscated network address " + f"must be str, not {type(obfuscation)}") + self._obfuscated_network = obfuscation.split('/')[0] + + @property + def obfuscated_address(self): + return 
f"{self._obfuscated_network}/{self.prefix}" + + @property + def original_address(self): + return self.addr.compressed + + def _obfuscate_network_address(self): + """Generate the obfuscated pair for the network address. This is + determined based on the netmask of the network this class was built + on top of. + """ + if self.addr.is_global: + return self._obfuscate_global_address() + elif self.addr.is_link_local: + # link-local addresses are always fe80::/64. This is not sensitive + # in itself, and retaining the information that an address is a + # link-local address is important for problem analysis, so don't + # obfuscate this network information. + return self.network_addr + elif self.addr.is_private: + return self._obfuscate_private_address() + return self.network_addr + + def _obfuscate_global_address(self): + """Global unicast addresses have a 48-bit global routing prefix and a + 16-bit subnet. We set the global routing prefix to a static + sos-specific identifier that could never be seen in the wild, + '534f:' + + We then randomize the subnet hextet. + """ + _hextets = self.network_addr.split(':')[1:] + _ob_hex = ['534f'] + if all(not c for c in _hextets): + # we have only a single defined hextet, e.g. ff00::/64, so we need + # to not use the standard first-hex identifier or we'll overlap + # every similar address obfuscation. + # Set the leading bits to 53, but increment upwards from there for + # when we exceed 256 networks obfuscated in this manner. + _start = 53 + (len(self.first_hexes) // 256) + _ob_hex = f"{_start}{random_hex(2)}" + while _ob_hex in self.first_hexes: + # prevent duplicates + _ob_hex = f"{_start}{random_hex(2)}" + self.first_hexes.append(_ob_hex) + _ob_hex = [_ob_hex] + _ob_hex.extend(generate_hextets(_hextets)) + return ':'.join(_ob_hex) + + def _obfuscate_private_address(self): + """The first 8 bits will always be 'fd', the next 40 bits are meant + to be a global ID, followed by 16 bits for the subnet. 
To keep things + relatively simply we maintain the first hextet as 'fd53', and then + randomize any remaining hextets + """ + _hextets = self.network_addr.split(':')[1:] + _ob_hex = ['fd53'] + _ob_hex.extend(generate_hextets(_hextets)) + return ':'.join(_ob_hex) + + def obfuscate_host_address(self, addr): + """Given an unobfuscated address, generate an obfuscated match for it, + and save it to this network for tracking during the execution of clean. + + Note: another way to do this would be to convert the obfuscated network + to bytes, and add a random amount to that based on the number of + addresses that the network can support and from that new bytes count + craft a new IPv6 address. This has the advantage of absolutely + guaranteeing the new address is within the network space (whereas the + method employed below could *theoretically* generate an overlapping + address), but would in turn remove any ability to compress obfuscated + addresses to match the general format/syntax of the address it is + replacing. For the moment, it is assumed that being able to maintain a + quick mental note of "unobfuscated device ff00::1 is obfuscated device + 53ad::a1b2" is more desireable than "ff00::1 is now obfuscated as + 53ad::1234:abcd:9876:a1b2:". 
+ + :param addr: The unobfuscated IPv6 address + :type addr: ``ipaddress.IPv6Address`` + + :returns: An obfuscated address within this network + :rtype: ``str`` + """ + def _generate_address(): + return ''.join([ + self._obfuscated_network, + ':'.join(generate_hextets(_host.split(':'))) + ]) + + if addr.compressed not in self.hosts: + try: + _, _host = addr.compressed.split(self.network_addr.rstrip(':')) + except ValueError: + # network addr is simply '::' + _n, _host = addr.compressed.split(self.network_addr) + _host = _host.lstrip(':') + _ob_host = _generate_address() + while _ob_host in self.hosts.values(): + _ob_host = _generate_address() + self.add_obfuscated_host_address(addr.compressed, _ob_host) + return self.hosts[addr.compressed] + + def add_obfuscated_host_address(self, host, obfuscated): + """Adds an obfuscated pair to the class for tracking and ongoing + consistency in obfuscation. + """ + self.hosts[host] = obfuscated diff --git a/sos/cleaner/parsers/ipv6_parser.py b/sos/cleaner/parsers/ipv6_parser.py new file mode 100644 index 00000000..b209c646 --- /dev/null +++ b/sos/cleaner/parsers/ipv6_parser.py @@ -0,0 +1,60 @@ +# Copyright 2022 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com> + +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. 
+ +from sos.cleaner.parsers import SoSCleanerParser +from sos.cleaner.mappings.ipv6_map import SoSIPv6Map + + +class SoSIPv6Parser(SoSCleanerParser): + """Parser for handling IPv6 networks and addresses""" + + name = 'IPv6 Parser' + map_file_key = 'ipv6_map' + regex_patterns = [ + # Attention: note that this is a single long regex, not several entries + # This is initially based off of two regexes from the Java library + # for validating an IPv6 string. However, this is modified to begin and + # end with a negative lookbehind to ensure that a substring of 'ed::' + # is not extracted from a log message such as 'SomeFuncUsed::ADiffFunc' + # that come components may log with. Further, we optionally try to grab + # a trailing prefix for the network bits. + r"(?<![:\\.\\-a-z0-9])((([0-9a-f]{1,4})(:[0-9a-f]{1,4}){7})|" + r"(([0-9a-f]{1,4}(:[0-9a-f]{0,4}){0,5}))([^.])::(([0-9a-f]{1,4}" + r"(:[0-9a-f]{1,4}){0,5})?))(/\d{1,3})?(?![:\\a-z0-9])" + ] + skip_files = [ + 'etc/dnsmasq.conf.*', + '.*modinfo.*', + ] + compile_regexes = False + + def __init__(self, config): + self.mapping = SoSIPv6Map() + super(SoSIPv6Parser, self).__init__(config) + + def get_map_contents(self): + """Structure the dataset contents properly so that they can be reloaded + on subsequent runs correctly. 
+ """ + _d = { + 'version': self.mapping.version, + 'networks': {} + } + for net in self.mapping.networks: + _net = self.mapping.networks[net] + _d['networks'][_net.original_address] = { + 'obfuscated': _net.obfuscated_address, + 'hosts': {} + } + for host in _net.hosts: + _ob_host = _net.hosts[host] + _d['networks'][_net.original_address]['hosts'][host] = _ob_host + + return _d diff --git a/tests/unittests/cleaner_tests.py b/tests/unittests/cleaner_tests.py index 9759b38a..8cf34341 100644 --- a/tests/unittests/cleaner_tests.py +++ b/tests/unittests/cleaner_tests.py @@ -13,10 +13,12 @@ from sos.cleaner.parsers.ip_parser import SoSIPParser from sos.cleaner.parsers.mac_parser import SoSMacParser from sos.cleaner.parsers.hostname_parser import SoSHostnameParser from sos.cleaner.parsers.keyword_parser import SoSKeywordParser +from sos.cleaner.parsers.ipv6_parser import SoSIPv6Parser from sos.cleaner.mappings.ip_map import SoSIPMap from sos.cleaner.mappings.mac_map import SoSMacMap from sos.cleaner.mappings.hostname_map import SoSHostnameMap from sos.cleaner.mappings.keyword_map import SoSKeywordMap +from sos.cleaner.mappings.ipv6_map import SoSIPv6Map class CleanerMapTests(unittest.TestCase): @@ -27,6 +29,7 @@ class CleanerMapTests(unittest.TestCase): self.host_map = SoSHostnameMap() self.host_map.load_domains_from_options(['redhat.com']) self.kw_map = SoSKeywordMap() + self.ipv6_map = SoSIPv6Map() def test_mac_map_obfuscate_valid_v4(self): _test = self.mac_map.get('12:34:56:78:90:ab') @@ -96,11 +99,57 @@ class CleanerMapTests(unittest.TestCase): _test = self.kw_map.get('foobar') self.assertEqual(_test, 'obfuscatedword0') + def test_ipv6_obfuscate_global(self): + _net = '2022:1104:abcd::' + _ob_net = self.ipv6_map.get(_net) + self.assertNotEqual(_net, _ob_net, 'Address was unchanged') + self.assertTrue(_ob_net.startswith('534f'), 'Global address does not start with identifier') + _host = '2022:1104:abcd::1234' + _ob_host = self.ipv6_map.get(_host) + 
self.assertNotEqual(_host, _ob_host, 'Host address was unchanged') + self.assertTrue(_host.startswith(_net), 'Host address not in network') + + def test_ipv6_link_local(self): + _test = 'fe80::1234' + _ob_test = self.ipv6_map.get(_test) + self.assertTrue(_ob_test.startswith('fe80'), 'Link-local identifier not maintained') + self.assertNotEqual(_test, _ob_test, 'Device address was unchanged') + + def test_ipv6_private(self): + _net = 'fd00:abcd::' + _host = 'fd00:abcd::1234' + _ob_net = self.ipv6_map.get(_net).split('/')[0] + _ob_host = self.ipv6_map.get(_host) + self.assertTrue(_ob_net.startswith('fd53'), 'Private network does not start with identifier') + self.assertTrue(_ob_host.startswith(_ob_net), 'Private address not in same network') + self.assertNotEqual(_net, _ob_net, 'Private network was unchanged') + + def test_ipv6_short_network(self): + _net = 'ff02::' + _ob_net = self.ipv6_map.get(_net) + self.assertTrue(_ob_net.startswith(('53', '54')), f'Short network does not start with identifier: {_ob_net}') + + def test_ipv6_consistent_obfuscation(self): + _test = '2022:1104:abcd::ef09' + _new = self.ipv6_map.get(_test) + _second = self.ipv6_map.get(_test) + self.assertEqual(_new, _second, "Same address produced two different results") + + def test_ipv6_global_no_collision(self): + """Tests that generating more than 256 global network obfuscations does + not produce any repeats""" + _nets = [] + for i in range(1, 300): + _nets.append(self.ipv6_map.get(f"f{i:03}::abcd").split('::')[0]) + # if there are any duplicates, then the length of the set will not match + self.assertTrue(len(set(_nets)) == len(_nets), "Duplicate global network obfuscations produced") + self.assertTrue(_nets[-1].startswith('54'), "First hextet of global network obfuscation over 256 not expected '54'") class CleanerParserTests(unittest.TestCase): def setUp(self): self.ip_parser = SoSIPParser(config={}) + self.ipv6_parser = SoSIPv6Parser(config={}) self.mac_parser = SoSMacParser(config={}) 
self.host_parser = SoSHostnameParser(config={}, opt_domains=['foobar.com']) @@ -193,3 +242,27 @@ class CleanerParserTests(unittest.TestCase): line = 'this is my foobar test line' _test = self.kw_parser_none.parse_line(line)[0] self.assertEqual(line, _test) + + def test_ipv6_parser_strings(self): + t1 = 'testing abcd:ef01::1234 as a compressed address' + t2 = 'testing abcd:ef01::5678:1234 as a separate address' + t3 = 'testing 2607:c540:8c00:3318::34/64 as another address' + t4 = 'testing 2007:1234:5678:90ab:0987:6543:21fe:dcba as a full address' + t1_test = self.ipv6_parser.parse_line(t1)[0] + t2_test = self.ipv6_parser.parse_line(t2)[0] + t3_test = self.ipv6_parser.parse_line(t3)[0] + t4_test = self.ipv6_parser.parse_line(t4)[0] + self.assertNotEqual(t1, t1_test, f"Parser did not match and obfuscate '{t1}'") + self.assertNotEqual(t2, t2_test, f"Parser did not match and obfuscate '{t2}'") + self.assertNotEqual(t3, t3_test, f"Parser did not match and obfuscate '{t3}'") + self.assertNotEqual(t4, t4_test, f"Parser did not match and obfuscate '{t4}'") + + def test_ipv6_no_match_signature(self): + modstr = '2D:4F:6E:55:4F:E8:5E:D2:D2:A3:73:62:AB:FD:F9:C5:A5:53:31:93' + mod_test = self.ipv6_parser.parse_line(modstr)[0] + self.assertEqual(modstr, mod_test, "Parser matched module signature, and should not") + + def test_ipv6_no_match_log_false_positive(self): + logln = 'Automatically imported trusted_ca::ca from trusted_ca/ca into production' + log_test = self.ipv6_parser.parse_line(logln)[0] + self.assertEqual(logln, log_test, "IPv6 parser incorrectly matched a log line of 'trusted_ca::ca'") |