aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJake Hunsaker <jhunsake@redhat.com>2020-05-28 10:39:35 -0400
committerJake Hunsaker <jhunsake@redhat.com>2020-06-17 12:11:29 -0400
commit33bfe244f66af4dedee887799db0b6eaf85c9f08 (patch)
tree6120ee9a5c0866a7229bdb3542d1f09dfc94eb7f
parentaf0f418b7f5fd677cf153b97c7a8f980b5728aaf (diff)
downloadsos-33bfe244f66af4dedee887799db0b6eaf85c9f08.tar.gz
[cleaner] Add hostname parser
Adds a hostname parser to `sos clean` that will attempt to obfuscate FQDNs matching the hostname of the system that generated the sosreport, as found in sos_commands/host/hostname. Additionally, any domains added via the `--domains` option will also be obfuscated, including any subdomains of the domain(s) specified by the option. Signed-off-by: Jake Hunsaker <jhunsake@redhat.com>
-rw-r--r--man/en/sos-clean.18
-rw-r--r--sos/cleaner/__init__.py34
-rw-r--r--sos/cleaner/mappings/__init__.py3
-rw-r--r--sos/cleaner/mappings/hostname_map.py123
-rw-r--r--sos/cleaner/mappings/ip_map.py16
-rw-r--r--sos/cleaner/obfuscation_archive.py1
-rw-r--r--sos/cleaner/parsers/__init__.py1
-rw-r--r--sos/cleaner/parsers/hostname_parser.py59
-rw-r--r--tests/cleaner_tests.py40
9 files changed, 270 insertions, 15 deletions
diff --git a/man/en/sos-clean.1 b/man/en/sos-clean.1
index 729c812f..85bd6dfc 100644
--- a/man/en/sos-clean.1
+++ b/man/en/sos-clean.1
@@ -3,6 +3,7 @@
sos clean - Obfuscate sensitive data from one or more sosreports
.SH SYNOPSIS
.B sos clean TARGET [options]
+ [\-\-domains]
[\-\-map]
[\-\-jobs]
[\-\-no-update]
@@ -37,7 +38,14 @@ directory. If an archive, it will first be extracted and then after obfuscation
using the same compression method as the original.
.SH OPTIONS
+.TP
+.B \-\-domains DOMAINS
+Provide a comma-delimited list of domain names to obfuscate, in addition to those
+matching the hostname of the system that created the sosreport. Subdomains that
+match a domain given via this option will also be obfuscated.
+For example, if \fB\-\-domains redhat.com\fR is specified, then 'redhat.com' will
+be obfuscated, as will subdomains such as 'www.redhat.com' and 'foo.redhat.com'.
.TP
.B \-\-map FILE
Provide a location to a valid mapping file to use as a reference for existing obfuscation pairs.
diff --git a/sos/cleaner/__init__.py b/sos/cleaner/__init__.py
index ef963534..3c4cb053 100644
--- a/sos/cleaner/__init__.py
+++ b/sos/cleaner/__init__.py
@@ -23,6 +23,7 @@ from sos import __version__
from sos.component import SoSComponent
from sos.cleaner.parsers.ip_parser import SoSIPParser
from sos.cleaner.parsers.mac_parser import SoSMacParser
+from sos.cleaner.parsers.hostname_parser import SoSHostnameParser
from sos.cleaner.obfuscation_archive import SoSObfuscationArchive
from sos.utilities import get_human_readable
from textwrap import fill
@@ -37,6 +38,7 @@ class SoSCleaner(SoSComponent):
desc = "Obfuscate sensitive networking information in a report"
arg_defaults = {
+ 'domains': [],
'jobs': 4,
'map_file': '/etc/sos/cleaner/mapping',
'no_update': False,
@@ -74,6 +76,7 @@ class SoSCleaner(SoSComponent):
self.hash_name = self.policy.get_preferred_hash_name()
self.parsers = [
+ SoSHostnameParser(self.opts.map_file, self.opts.domains),
SoSIPParser(self.opts.map_file),
SoSMacParser(self.opts.map_file)
]
@@ -150,8 +153,10 @@ third party.
'Cleaner/Masking Options',
'These options control how data obfuscation is performed'
)
- clean_grp.add_argument('target',
+ clean_grp.add_argument('target', metavar='TARGET',
help='The directory or archive to obfuscate')
+ clean_grp.add_argument('--domains', action='extend', default=[],
+ help='List of domain names to obfuscate')
clean_grp.add_argument('-j', '--jobs', default=4, type=int,
help='Number of concurrent archives to clean')
clean_grp.add_argument('--map', dest='map_file',
@@ -446,7 +451,7 @@ third party.
if archive.should_skip_file(short_name):
continue
try:
- count = self.obfuscate_file(fname)
+ count = self.obfuscate_file(fname, short_name)
if count:
archive.update_sub_count(short_name, count)
except Exception as err:
@@ -486,7 +491,21 @@ third party.
:param archive SoSObfuscationArchive: An open archive object
"""
for parser in self.parsers:
- self.obfuscate_file(archive.get_file_path(parser.prep_map_file))
+ # this is a bit clunky, but we need to load this particular
+ # parser in a different way due to how hostnames are validated for
+ # obfuscation
+ prep_file = archive.get_file_path(parser.prep_map_file)
+ if not prep_file:
+ self.log_debug("Could not prepare %s: %s does not exist"
+ % (parser.name, parser.prep_map_file),
+ caller=archive.archive_name)
+ continue
+ if isinstance(parser, SoSHostnameParser):
+ with open(prep_file, 'r') as host_file:
+ hostname = host_file.readline().strip()
+ parser.load_hostname_into_map(hostname)
+ else:
+ self.obfuscate_file(prep_file)
def obfuscate_file(self, filename, short_name=None, arc_name=None):
"""Obfuscate and individual file, line by line.
@@ -513,7 +532,7 @@ third party.
if not line.strip() or line.startswith('#'):
continue
try:
- line, count = self.obfuscate_line(line)
+ line, count = self.obfuscate_line(line, short_name)
subs += count
tfile.write(line)
except Exception as err:
@@ -525,7 +544,7 @@ third party.
tfile.close()
return subs
- def obfuscate_line(self, line):
+ def obfuscate_line(self, line, filename):
"""Run a line through each of the obfuscation parsers, keeping a
cumulative total of substitutions done on that particular line.
@@ -533,11 +552,16 @@ third party.
:param line str: The raw line as read from the file being
processed
+ :param filename str: Filename the line was read from
Returns the fully obfuscated line and the number of substitutions made
"""
count = 0
for parser in self.parsers:
+ if filename and any([
+ re.match(_s, filename) for _s in parser.skip_files
+ ]):
+ continue
try:
line, _count = parser.parse_line(line)
count += _count
diff --git a/sos/cleaner/mappings/__init__.py b/sos/cleaner/mappings/__init__.py
index 212aeb9f..27fd1d50 100644
--- a/sos/cleaner/mappings/__init__.py
+++ b/sos/cleaner/mappings/__init__.py
@@ -10,7 +10,6 @@
import re
-from collections import OrderedDict
from threading import Lock
@@ -25,7 +24,7 @@ class SoSMap():
ignore_list = []
def __init__(self):
- self.dataset = OrderedDict()
+ self.dataset = {}
self.lock = Lock()
def ignore_item(self, item):
diff --git a/sos/cleaner/mappings/hostname_map.py b/sos/cleaner/mappings/hostname_map.py
new file mode 100644
index 00000000..a53470f1
--- /dev/null
+++ b/sos/cleaner/mappings/hostname_map.py
@@ -0,0 +1,123 @@
+# Copyright 2020 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com>
+
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+
+import re
+
+from sos.cleaner.mappings import SoSMap
+
+
+class SoSHostnameMap(SoSMap):
+ """Mapping store for hostnames and domain names
+
+ Hostnames are obfuscated using an incrementing counter based on the total
+ number of hosts matched regardless of domain name.
+
+ Domain names are obfuscated based on the host's hostname, plus any
+ user-defined domains passed in via the `--domains` option.
+
+ Domains are obfuscated as whole units, meaning the domains 'example.com'
+ and 'host.foo.example.com' will be separately obfuscated with no relation,
+ for example as 'obfuscateddomain1.com' and 'obfuscateddomain2.com'.
+
+ Top-level domains are left untouched.
+ """
+
+ ignore_matches = [
+ 'localhost',
+ '.*localdomain.*',
+ '^com..*'
+ ]
+
+ host_count = 0
+ domain_count = 0
+ _domains = {}
+ hosts = {}
+
+ def __init__(self, opt_domains):
+ super(SoSHostnameMap, self).__init__()
+ self.load_domains_from_options(opt_domains)
+
+ def load_domains_from_options(self, domains):
+ for domain in domains:
+ self.sanitize_domain(domain.split('.'))
+
+ def domain_name_in_loaded_domains(self, domain):
+ """Check if a potential domain is in one of the domains we've loaded
+ and should be obfuscated
+ """
+ host = domain.split('.')
+ if len(host) == 1:
+ # don't block on host's shortname
+ return True
+ if len(host) < 2:
+ return False
+ else:
+ domain = host[0:-1]
+ for known_domain in self._domains:
+ if known_domain in domain:
+ return True
+ return False
+
+ def get(self, item):
+ if item.startswith(('.', '_')):
+ item = item.lstrip('._')
+ item = item.strip()
+ if not self.domain_name_in_loaded_domains(item):
+ return item
+ return super(SoSHostnameMap, self).get(item)
+
+ def sanitize_item(self, item):
+ host = item.split('.')
+ if len(host) == 1:
+ # we have a shortname for a host
+ return self.sanitize_short_name(host[0])
+ if len(host) == 2:
+ # we have just a domain name, e.g. example.com
+ return self.sanitize_domain(host)
+ if len(host) > 2:
+ # we have an FQDN, e.g. foo.example.com
+ hostname = host[0]
+ domain = host[1:]
+ # obfuscate the short name
+ ob_hostname = self.sanitize_short_name(hostname)
+ ob_domain = self.sanitize_domain(domain)
+ return '.'.join([ob_hostname, ob_domain])
+
+ def sanitize_short_name(self, hostname):
+ """Obfuscate the short name of the host with an incremented counter
+ based on the total number of obfuscated host names
+ """
+ if hostname not in self.hosts:
+ ob_host = "host%s" % self.host_count
+ self.hosts[hostname] = ob_host
+ self.host_count += 1
+ return self.hosts[hostname]
+
+ def sanitize_domain(self, domain):
+ """Obfuscate the domainname, broken out into subdomains. Top-level
+ domains are ignored.
+ """
+ for _skip in self.ignore_matches:
+ # don't obfuscate vendor domains
+ if re.match(_skip, '.'.join(domain)):
+ return '.'.join(domain)
+ top_domain = domain[-1]
+ dname = '.'.join(domain[0:-1])
+ ob_domain = self._new_obfuscated_domain(dname)
+ ob_domain = '.'.join([ob_domain, top_domain])
+ return ob_domain
+
+ def _new_obfuscated_domain(self, dname):
+ """Generate an obfuscated domain for each subdomain name given
+ """
+ if dname not in self._domains:
+ self._domains[dname] = "obfuscateddomain%s" % self.domain_count
+ self.domain_count += 1
+ return self._domains[dname]
diff --git a/sos/cleaner/mappings/ip_map.py b/sos/cleaner/mappings/ip_map.py
index 70519538..45fd9739 100644
--- a/sos/cleaner/mappings/ip_map.py
+++ b/sos/cleaner/mappings/ip_map.py
@@ -31,14 +31,14 @@ class SoSIPMap(SoSMap):
"""
ignore_matches = [
- '127.*',
- '::1',
- '0\.(.*)?',
- '1\.(.*)?',
- '8.8.8.8',
- '8.8.4.4',
- '169.254.*',
- '255.*'
+ r'127.*',
+ r'::1',
+ r'0\.(.*)?',
+ r'1\.(.*)?',
+ r'8.8.8.8',
+ r'8.8.4.4',
+ r'169.254.*',
+ r'255.*'
]
_networks = {}
diff --git a/sos/cleaner/obfuscation_archive.py b/sos/cleaner/obfuscation_archive.py
index a5c788ef..848b0133 100644
--- a/sos/cleaner/obfuscation_archive.py
+++ b/sos/cleaner/obfuscation_archive.py
@@ -89,6 +89,7 @@ class SoSObfuscationArchive():
self.extracted_path = self.extract_self()
else:
self.extracted_path = self.archive_path
+ self.log_debug("Extracted path is %s" % self.extracted_path)
def get_compression(self):
"""Return the compression type used by the archive, if any. This is
diff --git a/sos/cleaner/parsers/__init__.py b/sos/cleaner/parsers/__init__.py
index 960ebba2..f781312a 100644
--- a/sos/cleaner/parsers/__init__.py
+++ b/sos/cleaner/parsers/__init__.py
@@ -37,6 +37,7 @@ class SoSCleanerParser():
name = 'Undefined Parser'
regex_patterns = []
+ skip_files = []
map_file_key = 'unset'
prep_map_file = 'unset'
diff --git a/sos/cleaner/parsers/hostname_parser.py b/sos/cleaner/parsers/hostname_parser.py
new file mode 100644
index 00000000..33871c1c
--- /dev/null
+++ b/sos/cleaner/parsers/hostname_parser.py
@@ -0,0 +1,59 @@
+# Copyright 2020 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com>
+
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+
+from sos.cleaner.parsers import SoSCleanerParser
+from sos.cleaner.mappings.hostname_map import SoSHostnameMap
+
+
+class SoSHostnameParser(SoSCleanerParser):
+
+ name = 'Hostname Parser'
+ map_file_key = 'hostname_map'
+ prep_map_file = 'sos_commands/host/hostname'
+ regex_patterns = [
+ r'(((\b|_)[a-zA-Z0-9-\.]{1,200}\.[a-zA-Z]{1,63}\b))'
+ ]
+
+ def __init__(self, conf_file=None, opt_domains=None):
+ self.mapping = SoSHostnameMap(opt_domains)
+ self.short_names = []
+ super(SoSHostnameParser, self).__init__(conf_file)
+
+ def load_hostname_into_map(self, hostname_string):
+ """Force add the domain name found in sos_commands/host/hostname into
+ the map. We have to do this here because the normal map prep approach
+ from the parser would be ignored: the system's hostname is not
+ guaranteed to match a domain that has already been loaded.
+ """
+ if 'localhost' in hostname_string:
+ return
+ domains = hostname_string.split('.')
+ if len(domains) > 1:
+ self.short_names.append(domains[0])
+ else:
+ self.short_names.append(hostname_string)
+ if len(domains) > 3:
+ # make sure we implicitly get example.com if the system's hostname
+ # is something like foo.bar.example.com
+ high_domain = '.'.join(domains[-2:])
+ self.mapping.add(high_domain)
+ self.mapping.add(hostname_string)
+
+ def parse_line(self, line):
+ """Override the default parse_line() method to also check for the
+ shortname of the host derived from the hostname.
+ """
+ count = 0
+ line, count = super(SoSHostnameParser, self).parse_line(line)
+ for short_name in self.short_names:
+ if short_name in line:
+ count += 1
+ line = line.replace(short_name, self.mapping.get(short_name))
+ return line, count
diff --git a/tests/cleaner_tests.py b/tests/cleaner_tests.py
index 0c6ac589..4e292a05 100644
--- a/tests/cleaner_tests.py
+++ b/tests/cleaner_tests.py
@@ -11,8 +11,10 @@ import unittest
from ipaddress import ip_interface
from sos.cleaner.parsers.ip_parser import SoSIPParser
from sos.cleaner.parsers.mac_parser import SoSMacParser
+from sos.cleaner.parsers.hostname_parser import SoSHostnameParser
from sos.cleaner.mappings.ip_map import SoSIPMap
from sos.cleaner.mappings.mac_map import SoSMacMap
+from sos.cleaner.mappings.hostname_map import SoSHostnameMap
class CleanerMapTests(unittest.TestCase):
@@ -20,6 +22,7 @@ class CleanerMapTests(unittest.TestCase):
def setUp(self):
self.mac_map = SoSMacMap()
self.ip_map = SoSIPMap()
+ self.host_map = SoSHostnameMap(['redhat.com'])
def test_mac_map_obfuscate_valid_v4(self):
_test = self.mac_map.get('12:34:56:78:90:ab')
@@ -65,12 +68,33 @@ class CleanerMapTests(unittest.TestCase):
_test = self.ip_map.get('127.0.0.1')
self.assertEquals(_test, '127.0.0.1')
+ def test_hostname_obfuscate_domain_options(self):
+ _test = self.host_map.get('www.redhat.com')
+ self.assertNotEqual(_test, 'www.redhat.com')
+
+ def test_hostname_obfuscate_same_item(self):
+ _test1 = self.host_map.get('example.redhat.com')
+ _test2 = self.host_map.get('example.redhat.com')
+ self.assertEqual(_test1, _test2)
+
+ def test_hostname_obfuscate_just_domain(self):
+ _test = self.host_map.get('redhat.com')
+ self.assertEqual(_test, 'obfuscateddomain0.com')
+
+ def test_hostname_no_obfuscate_non_loaded_domain(self):
+ _test = self.host_map.get('foobar.com')
+ self.assertEqual(_test, 'foobar.com')
+
+ def test_hostname_no_obfuscate_non_loaded_fqdn(self):
+ _test = self.host_map.get('example.foobar.com')
+ self.assertEqual(_test, 'example.foobar.com')
class CleanerParserTests(unittest.TestCase):
def setUp(self):
self.ip_parser = SoSIPParser()
self.mac_parser = SoSMacParser()
+ self.host_parser = SoSHostnameParser(opt_domains='foobar.com')
def test_ip_parser_valid_ipv4_line(self):
line = 'foobar foo 10.0.0.1/24 barfoo bar'
@@ -95,3 +119,19 @@ class CleanerParserTests(unittest.TestCase):
line = 'foobar foo AA:BB:CC:FF:FE:DD:EE:FF bar barfoo'
_test = self.mac_parser.parse_line(line)[0]
self.assertNotEqual(line, _test)
+
+ def test_hostname_load_hostname_string(self):
+ fqdn = 'myhost.subnet.example.com'
+ self.host_parser.load_hostname_into_map(fqdn)
+
+ def test_hostname_valid_domain_line(self):
+ self.host_parser.load_hostname_into_map('myhost.subnet.example.com')
+ line = 'testing myhost.subnet.example.com in a string'
+ _test = self.host_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)
+
+ def test_hostname_short_name_in_line(self):
+ self.host_parser.load_hostname_into_map('myhost.subnet.example.com')
+ line = 'testing just myhost in a line'
+ _test = self.host_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)