diff options
-rw-r--r-- | man/en/sos-clean.1 | 8 | ||||
-rw-r--r-- | sos/cleaner/__init__.py | 34 | ||||
-rw-r--r-- | sos/cleaner/mappings/__init__.py | 3 | ||||
-rw-r--r-- | sos/cleaner/mappings/hostname_map.py | 123 | ||||
-rw-r--r-- | sos/cleaner/mappings/ip_map.py | 16 | ||||
-rw-r--r-- | sos/cleaner/obfuscation_archive.py | 1 | ||||
-rw-r--r-- | sos/cleaner/parsers/__init__.py | 1 | ||||
-rw-r--r-- | sos/cleaner/parsers/hostname_parser.py | 59 | ||||
-rw-r--r-- | tests/cleaner_tests.py | 40 |
9 files changed, 270 insertions, 15 deletions
diff --git a/man/en/sos-clean.1 b/man/en/sos-clean.1 index 729c812f..85bd6dfc 100644 --- a/man/en/sos-clean.1 +++ b/man/en/sos-clean.1 @@ -3,6 +3,7 @@ sos clean - Obfuscate sensitive data from one or more sosreports .SH SYNOPSIS .B sos clean TARGET [options] + [\-\-domains] [\-\-map] [\-\-jobs] [\-\-no-update] @@ -37,7 +38,14 @@ directory. If an archive, it will first be extracted and then after obfuscation using the same compression method as the original. .SH OPTIONS +.TP +.B \-\-domains DOMAINS +Provide a comma-delimited list of domain names to obfuscate, in addition to those +matching the hostname of the system that created the sosreport. Subdomains that +match a domain given via this option will also be obfuscated. +For example, if \fB\-\-domains redhat.com\fR is specified, then 'redhat.com' will +be obfuscated, as will 'www.redhat.com' and subdomains such as 'foo.redhat.com'. .TP .B \-\-map FILE Provide a location to a valid mapping file to use as a reference for existing obfuscation pairs. diff --git a/sos/cleaner/__init__.py b/sos/cleaner/__init__.py index ef963534..3c4cb053 100644 --- a/sos/cleaner/__init__.py +++ b/sos/cleaner/__init__.py @@ -23,6 +23,7 @@ from sos import __version__ from sos.component import SoSComponent from sos.cleaner.parsers.ip_parser import SoSIPParser from sos.cleaner.parsers.mac_parser import SoSMacParser +from sos.cleaner.parsers.hostname_parser import SoSHostnameParser from sos.cleaner.obfuscation_archive import SoSObfuscationArchive from sos.utilities import get_human_readable from textwrap import fill @@ -37,6 +38,7 @@ class SoSCleaner(SoSComponent): desc = "Obfuscate sensitive networking information in a report" arg_defaults = { + 'domains': [], 'jobs': 4, 'map_file': '/etc/sos/cleaner/mapping', 'no_update': False, @@ -74,6 +76,7 @@ class SoSCleaner(SoSComponent): self.hash_name = self.policy.get_preferred_hash_name() self.parsers = [ + SoSHostnameParser(self.opts.map_file, self.opts.domains), SoSIPParser(self.opts.map_file), SoSMacParser(self.opts.map_file) ] @@ -150,8 +153,10 @@ third party. 'Cleaner/Masking Options', 'These options control how data obfuscation is performed' ) - clean_grp.add_argument('target', + clean_grp.add_argument('target', metavar='TARGET', help='The directory or archive to obfuscate') + clean_grp.add_argument('--domains', action='extend', default=[], + help='List of domain names to obfuscate') clean_grp.add_argument('-j', '--jobs', default=4, type=int, help='Number of concurrent archives to clean') clean_grp.add_argument('--map', dest='map_file', @@ -446,7 +451,7 @@ third party. if archive.should_skip_file(short_name): continue try: - count = self.obfuscate_file(fname) + count = self.obfuscate_file(fname, short_name) if count: archive.update_sub_count(short_name, count) except Exception as err: @@ -486,7 +491,21 @@ third party. :param archive SoSObfuscationArchive: An open archive object """ for parser in self.parsers: - self.obfuscate_file(archive.get_file_path(parser.prep_map_file)) + # this is a bit clunky, but we need to load this particular + # parser in a different way due to how hostnames are validated for + # obfuscation + prep_file = archive.get_file_path(parser.prep_map_file) + if not prep_file: + self.log_debug("Could not prepare %s: %s does not exist" + % (parser.name, parser.prep_map_file), + caller=archive.archive_name) + continue + if isinstance(parser, SoSHostnameParser): + with open(prep_file, 'r') as host_file: + hostname = host_file.readline().strip() + parser.load_hostname_into_map(hostname) + else: + self.obfuscate_file(prep_file) def obfuscate_file(self, filename, short_name=None, arc_name=None): """Obfuscate and individual file, line by line. @@ -513,7 +532,7 @@ third party. if not line.strip() or line.startswith('#'): continue try: - line, count = self.obfuscate_line(line) + line, count = self.obfuscate_line(line, short_name) subs += count tfile.write(line) except Exception as err: @@ -525,7 +544,7 @@ third party. tfile.close() return subs - def obfuscate_line(self, line): + def obfuscate_line(self, line, filename): """Run a line through each of the obfuscation parsers, keeping a cumulative total of substitutions done on that particular line. @@ -533,11 +552,16 @@ third party. :param line str: The raw line as read from the file being processed + :param filename str: Filename the line was read from Returns the fully obfuscated line and the number of substitutions made """ count = 0 for parser in self.parsers: + if filename and any([ + re.match(_s, filename) for _s in parser.skip_files + ]): + continue try: line, _count = parser.parse_line(line) count += _count diff --git a/sos/cleaner/mappings/__init__.py b/sos/cleaner/mappings/__init__.py index 212aeb9f..27fd1d50 100644 --- a/sos/cleaner/mappings/__init__.py +++ b/sos/cleaner/mappings/__init__.py @@ -10,7 +10,6 @@ import re -from collections import OrderedDict from threading import Lock @@ -25,7 +24,7 @@ class SoSMap(): ignore_list = [] def __init__(self): - self.dataset = OrderedDict() + self.dataset = {} self.lock = Lock() def ignore_item(self, item): diff --git a/sos/cleaner/mappings/hostname_map.py b/sos/cleaner/mappings/hostname_map.py new file mode 100644 index 00000000..a53470f1 --- /dev/null +++ b/sos/cleaner/mappings/hostname_map.py @@ -0,0 +1,123 @@ +# Copyright 2020 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com> + +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + +import re + +from sos.cleaner.mappings import SoSMap + + +class SoSHostnameMap(SoSMap): + """Mapping store for hostnames and domain names + + Hostnames are obfuscated using an incrementing counter based on the total + number of hosts matched regardless of domain name. + + Domain names are obfuscated based on the host's hostname, plus any user + defined domains passed in by the `--domains` options. + + Domains are obfuscated as whole units, meaning the domains 'example.com' + and 'host.foo.example.com' will be separately obfuscated with no relation + for example as 'obfuscatedomdain1.com' and 'obfuscatedomain2.com'. + + Top-level domains are left untouched. + """ + + ignore_matches = [ + 'localhost', + '.*localdomain.*', + '^com..*' + ] + + host_count = 0 + domain_count = 0 + _domains = {} + hosts = {} + + def __init__(self, opt_domains): + super(SoSHostnameMap, self).__init__() + self.load_domains_from_options(opt_domains) + + def load_domains_from_options(self, domains): + for domain in domains: + self.sanitize_domain(domain.split('.')) + + def domain_name_in_loaded_domains(self, domain): + """Check if a potential domain is in one of the domains we've loaded + and should be obfuscated + """ + host = domain.split('.') + if len(host) == 1: + # don't block on host's shortname + return True + if len(host) < 2: + return False + else: + domain = host[0:-1] + for known_domain in self._domains: + if known_domain in domain: + return True + return False + + def get(self, item): + if item.startswith(('.', '_')): + item = item.lstrip('._') + item = item.strip() + if not self.domain_name_in_loaded_domains(item): + return item + return super(SoSHostnameMap, self).get(item) + + def sanitize_item(self, item): + host = item.split('.') + if len(host) == 1: + # we have a shortname for a host + return self.sanitize_short_name(host[0]) + if len(host) == 2: + # we have just a domain name, e.g. example.com + return self.sanitize_domain(host) + if len(host) > 2: + # we have an FQDN, e.g. foo.example.com + hostname = host[0] + domain = host[1:] + # obfuscate the short name + ob_hostname = self.sanitize_short_name(hostname) + ob_domain = self.sanitize_domain(domain) + return '.'.join([ob_hostname, ob_domain]) + + def sanitize_short_name(self, hostname): + """Obfuscate the short name of the host with an incremented counter + based on the total number of obfuscated host names + """ + if hostname not in self.hosts: + ob_host = "host%s" % self.host_count + self.hosts[hostname] = ob_host + self.host_count += 1 + return self.hosts[hostname] + + def sanitize_domain(self, domain): + """Obfuscate the domainname, broken out into subdomains. Top-level + domains are ignored. + """ + for _skip in self.ignore_matches: + # don't obfuscate vendor domains + if re.match(_skip, '.'.join(domain)): + return '.'.join(domain) + top_domain = domain[-1] + dname = '.'.join(domain[0:-1]) + ob_domain = self._new_obfuscated_domain(dname) + ob_domain = '.'.join([ob_domain, top_domain]) + return ob_domain + + def _new_obfuscated_domain(self, dname): + """Generate an obfuscated domain for each subdomain name given + """ + if dname not in self._domains: + self._domains[dname] = "obfuscateddomain%s" % self.domain_count + self.domain_count += 1 + return self._domains[dname] diff --git a/sos/cleaner/mappings/ip_map.py b/sos/cleaner/mappings/ip_map.py index 70519538..45fd9739 100644 --- a/sos/cleaner/mappings/ip_map.py +++ b/sos/cleaner/mappings/ip_map.py @@ -31,14 +31,14 @@ class SoSIPMap(SoSMap): """ ignore_matches = [ - '127.*', - '::1', - '0\.(.*)?', - '1\.(.*)?', - '8.8.8.8', - '8.8.4.4', - '169.254.*', - '255.*' + r'127.*', + r'::1', + r'0\.(.*)?', + r'1\.(.*)?', + r'8.8.8.8', + r'8.8.4.4', + r'169.254.*', + r'255.*' ] _networks = {} diff --git a/sos/cleaner/obfuscation_archive.py b/sos/cleaner/obfuscation_archive.py index a5c788ef..848b0133 100644 --- a/sos/cleaner/obfuscation_archive.py +++ b/sos/cleaner/obfuscation_archive.py @@ -89,6 +89,7 @@ class SoSObfuscationArchive(): self.extracted_path = self.extract_self() else: self.extracted_path = self.archive_path + self.log_debug("Extracted path is %s" % self.extracted_path) def get_compression(self): """Return the compression type used by the archive, if any. This is diff --git a/sos/cleaner/parsers/__init__.py b/sos/cleaner/parsers/__init__.py index 960ebba2..f781312a 100644 --- a/sos/cleaner/parsers/__init__.py +++ b/sos/cleaner/parsers/__init__.py @@ -37,6 +37,7 @@ class SoSCleanerParser(): name = 'Undefined Parser' regex_patterns = [] + skip_files = [] map_file_key = 'unset' prep_map_file = 'unset' diff --git a/sos/cleaner/parsers/hostname_parser.py b/sos/cleaner/parsers/hostname_parser.py new file mode 100644 index 00000000..33871c1c --- /dev/null +++ b/sos/cleaner/parsers/hostname_parser.py @@ -0,0 +1,59 @@ +# Copyright 2020 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com> + +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + +from sos.cleaner.parsers import SoSCleanerParser +from sos.cleaner.mappings.hostname_map import SoSHostnameMap + + +class SoSHostnameParser(SoSCleanerParser): + + name = 'Hostname Parser' + map_file_key = 'hostname_map' + prep_map_file = 'sos_commands/host/hostname' + regex_patterns = [ + r'(((\b|_)[a-zA-Z0-9-\.]{1,200}\.[a-zA-Z]{1,63}\b))' + ] + + def __init__(self, conf_file=None, opt_domains=None): + self.mapping = SoSHostnameMap(opt_domains) + self.short_names = [] + super(SoSHostnameParser, self).__init__(conf_file) + + def load_hostname_into_map(self, hostname_string): + """Force add the domainname found in /sos_commands/host/hostname into + the map. We have to do this here since the normal map prep approach + from the parser would be ignored since the system's hostname is not + guaranteed + """ + if 'localhost' in hostname_string: + return + domains = hostname_string.split('.') + if len(domains) > 1: + self.short_names.append(domains[0]) + else: + self.short_names.append(hostname_string) + if len(domains) > 3: + # make sure we implicitly get example.com if the system's hostname + # is something like foo.bar.example.com + high_domain = '.'.join(domains[-2:]) + self.mapping.add(high_domain) + self.mapping.add(hostname_string) + + def parse_line(self, line): + """Override the default parse_line() method to also check for the + shortname of the host derived from the hostname. + """ + count = 0 + line, count = super(SoSHostnameParser, self).parse_line(line) + for short_name in self.short_names: + if short_name in line: + count += 1 + line = line.replace(short_name, self.mapping.get(short_name)) + return line, count diff --git a/tests/cleaner_tests.py b/tests/cleaner_tests.py index 0c6ac589..4e292a05 100644 --- a/tests/cleaner_tests.py +++ b/tests/cleaner_tests.py @@ -11,8 +11,10 @@ import unittest from ipaddress import ip_interface from sos.cleaner.parsers.ip_parser import SoSIPParser from sos.cleaner.parsers.mac_parser import SoSMacParser +from sos.cleaner.parsers.hostname_parser import SoSHostnameParser from sos.cleaner.mappings.ip_map import SoSIPMap from sos.cleaner.mappings.mac_map import SoSMacMap +from sos.cleaner.mappings.hostname_map import SoSHostnameMap class CleanerMapTests(unittest.TestCase): @@ -20,6 +22,7 @@ class CleanerMapTests(unittest.TestCase): def setUp(self): self.mac_map = SoSMacMap() self.ip_map = SoSIPMap() + self.host_map = SoSHostnameMap(['redhat.com']) def test_mac_map_obfuscate_valid_v4(self): _test = self.mac_map.get('12:34:56:78:90:ab') @@ -65,12 +68,33 @@ class CleanerMapTests(unittest.TestCase): _test = self.ip_map.get('127.0.0.1') self.assertEquals(_test, '127.0.0.1') + def test_hostname_obfuscate_domain_options(self): + _test = self.host_map.get('www.redhat.com') + self.assertNotEqual(_test, 'www.redhat.com') + + def test_hostname_obfuscate_same_item(self): + _test1 = self.host_map.get('example.redhat.com') + _test2 = self.host_map.get('example.redhat.com') + self.assertEqual(_test1, _test2) + + def test_hostname_obfuscate_just_domain(self): + _test = self.host_map.get('redhat.com') + self.assertEqual(_test, 'obfuscateddomain0.com') + + def test_hostname_no_obfuscate_non_loaded_domain(self): + _test = self.host_map.get('foobar.com') + self.assertEqual(_test, 'foobar.com') + + def test_hostname_no_obfuscate_non_loaded_fqdn(self): + _test = self.host_map.get('example.foobar.com') + self.assertEqual(_test, 'example.foobar.com') class CleanerParserTests(unittest.TestCase): def setUp(self): self.ip_parser = SoSIPParser() self.mac_parser = SoSMacParser() + self.host_parser = SoSHostnameParser(opt_domains='foobar.com') def test_ip_parser_valid_ipv4_line(self): line = 'foobar foo 10.0.0.1/24 barfoo bar' @@ -95,3 +119,19 @@ class CleanerParserTests(unittest.TestCase): line = 'foobar foo AA:BB:CC:FF:FE:DD:EE:FF bar barfoo' _test = self.mac_parser.parse_line(line)[0] self.assertNotEqual(line, _test) + + def test_hostname_load_hostname_string(self): + fqdn = 'myhost.subnet.example.com' + self.host_parser.load_hostname_into_map(fqdn) + + def test_hostname_valid_domain_line(self): + self.host_parser.load_hostname_into_map('myhost.subnet.example.com') + line = 'testing myhost.subnet.example.com in a string' + _test = self.host_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_hostname_short_name_in_line(self): + self.host_parser.load_hostname_into_map('myhost.subnet.example.com') + line = 'testing just myhost in a line' + _test = self.host_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) |