diff options
-rw-r--r-- | plugins_overview.py | 35 | ||||
-rw-r--r-- | tests/unittests/archive_tests.py | 3 | ||||
-rw-r--r-- | tests/unittests/cleaner_tests.py | 82 | ||||
-rw-r--r-- | tests/unittests/conformance_tests.py | 9 | ||||
-rw-r--r-- | tests/unittests/plugin_tests.py | 15 | ||||
-rw-r--r-- | tests/unittests/policy_tests.py | 1 | ||||
-rw-r--r-- | tests/unittests/report_tests.py | 4 |
7 files changed, 101 insertions, 48 deletions
diff --git a/plugins_overview.py b/plugins_overview.py index 46bcbf5a..d847548c 100644 --- a/plugins_overview.py +++ b/plugins_overview.py @@ -8,7 +8,7 @@ # - list of paths it forbits to collect (add_forbidden_path) # - list of commands it calls (add_cmd_output) # -# Output of the script: +# Output of the script: # - a JSON object with plugins in keys # - or CSV format in case "csv" cmdline is provided # @@ -29,9 +29,11 @@ PLUGDIR = 'sos/report/plugins' plugs_data = {} # the map of all plugins data to collect plugcontent = '' # content of plugin file just being processed + # method to parse an item of a_s_c/a_c_o/.. methods # we work on an assumption the item is a string quoted by \" or optionally -# by \'. If we detect at least 2 such chars in the item, take what is between those. +# by \'. If we detect at least 2 such chars in the item, take what is between +# those. def add_valid_item(dest, item): for qoutemark in "\"\'": split = item.split(qoutemark) @@ -39,13 +41,14 @@ def add_valid_item(dest, item): dest.append(split[1]) return -# method to find in `plugcontent` all items of given method (a_c_s/a_c_o/..) split -# by comma; add each valid item to the `dest` list + +# method to find in `plugcontent` all items of given method (a_c_s/a_c_o/..) +# split by comma; add each valid item to the `dest` list def add_all_items(method, dest, wrapopen='\(', wrapclose='\)'): regexp = "%s%s(.*?)%s" % (method, wrapopen, wrapclose) for match in re.findall(regexp, plugcontent, flags=re.MULTILINE|re.DOTALL): # tuple of distros ended by either (class|from|import) - if isinstance(match,tuple): + if isinstance(match, tuple): for item in list(match): if item not in ['class', 'from', 'import']: for it in item.split(','): @@ -63,7 +66,8 @@ def add_all_items(method, dest, wrapopen='\(', wrapclose='\)'): else: add_valid_item(dest, match) -# main body: traverse report's plugins directory and for each plugin, grep for + +# main body: traverse report's plugins directory and for each plugin, grep for # add_copy_spec / add_forbidden_path / add_cmd_output there for plugfile in sorted(os.listdir(PLUGDIR)): # ignore non-py files and __init__.py @@ -73,7 +77,8 @@ for plugfile in sorted(os.listdir(PLUGDIR)): # if plugname != 'bcache': # continue plugs_data[plugname] = { - 'sourcecode': 'https://github.com/sosreport/sos/blob/main/sos/report/plugins/%s.py' % plugname, + 'sourcecode': 'https://github.com/sosreport/sos/blob/' + f'main/sos/report/plugins/{plugname}.py', 'distros': [], 'profiles': [], 'packages': [], @@ -84,8 +89,13 @@ for plugfile in sorted(os.listdir(PLUGDIR)): 'journals': [], 'env': [], } - plugcontent = open(os.path.join(PLUGDIR, plugfile)).read().replace('\n','') - add_all_items("from sos.report.plugins import ", plugs_data[plugname]['distros'], wrapopen='', wrapclose='(class|from|import)') + plugcontent = open(os.path.join(PLUGDIR, plugfile)).read().replace('\n', '') + add_all_items( + "from sos.report.plugins import ", + plugs_data[plugname]['distros'], + wrapopen='', + wrapclose='(class|from|import)' + ) add_all_items("profiles = ", plugs_data[plugname]['profiles'], wrapopen='') add_all_items("packages = ", plugs_data[plugname]['packages'], wrapopen='') add_all_items("add_copy_spec", plugs_data[plugname]['copyspecs']) @@ -98,7 +108,8 @@ for plugfile in sorted(os.listdir(PLUGDIR)): # print output; if "csv" is cmdline argument, print in CSV format, else JSON if (len(sys.argv) > 1) and (sys.argv[1] == "csv"): - print("plugin;url;distros;profiles;packages;copyspecs;forbidden;commands;service_status;journals;env_vars") + print("plugin;url;distros;profiles;packages;copyspecs;forbidden;commands;" + "service_status;journals;env_vars") for plugname in plugs_data.keys(): plugin = plugs_data[plugname] # determine max number of lines - usually "max(len(copyspec),len(commands))" @@ -109,10 +120,10 @@ if (len(sys.argv) > 1) and (sys.argv[1] == "csv"): for key in plugkeys: maxline = max(maxline, len(plugin[key])) for line in range(maxline): - out = ";" if line>0 else ("%s;%s" % (plugname, plugin['sourcecode'])) + out = ";" if line > 0 else f"{plugname};{plugin['sourcecode']}" for key in plugkeys: out += ";" - if line<len(plugin[key]): + if line < len(plugin[key]): out += plugin[key][line] print(out) else: diff --git a/tests/unittests/archive_tests.py b/tests/unittests/archive_tests.py index aac4edc7..d3d1809d 100644 --- a/tests/unittests/archive_tests.py +++ b/tests/unittests/archive_tests.py @@ -69,7 +69,8 @@ class TarFileArchiveTest(unittest.TestCase): # self.check_for_file('test/tests/ziptest') def test_add_renamed(self): - self.tf.add_file('tests/unittests/ziptest', dest='tests/unittests/ziptest_renamed') + self.tf.add_file('tests/unittests/ziptest', + dest='tests/unittests/ziptest_renamed') self.tf.finalize('auto') self.check_for_file('test/tests/unittests/ziptest_renamed') diff --git a/tests/unittests/cleaner_tests.py b/tests/unittests/cleaner_tests.py index eec84e9c..5dffb2fd 100644 --- a/tests/unittests/cleaner_tests.py +++ b/tests/unittests/cleaner_tests.py @@ -26,6 +26,7 @@ from sos.cleaner.preppers.ip import IPPrepper from sos.cleaner.archives.sos import SoSReportArchive from sos.options import SoSOptions + class CleanerMapTests(unittest.TestCase): def setUp(self): @@ -108,7 +109,8 @@ class CleanerMapTests(unittest.TestCase): _net = '2022:1104:abcd::' _ob_net = self.ipv6_map.get(_net) self.assertNotEqual(_net, _ob_net, 'Address was unchanged') - self.assertTrue(_ob_net.startswith('534f'), 'Global address does not start with identifier') + self.assertTrue(_ob_net.startswith('534f'), + 'Global address does not start with identifier') _host = '2022:1104:abcd::1234' _ob_host = self.ipv6_map.get(_host) self.assertNotEqual(_host, _ob_host, 'Host address was unchanged') @@ -117,7 +119,8 @@ class CleanerMapTests(unittest.TestCase): def test_ipv6_link_local(self): _test = 'fe80::1234' _ob_test = self.ipv6_map.get(_test) - self.assertTrue(_ob_test.startswith('fe80'), 'Link-local identifier not maintained') + self.assertTrue(_ob_test.startswith('fe80'), + 'Link-local identifier not maintained') self.assertNotEqual(_test, _ob_test, 'Device address was unchanged') def test_ipv6_private(self): @@ -125,20 +128,25 @@ class CleanerMapTests(unittest.TestCase): _host = 'fd00:abcd::1234' _ob_net = self.ipv6_map.get(_net).split('/')[0] _ob_host = self.ipv6_map.get(_host) - self.assertTrue(_ob_net.startswith('fd53'), 'Private network does not start with identifier') - self.assertTrue(_ob_host.startswith(_ob_net), 'Private address not in same network') + self.assertTrue(_ob_net.startswith('fd53'), + 'Private network does not start with identifier') + self.assertTrue(_ob_host.startswith(_ob_net), + 'Private address not in same network') self.assertNotEqual(_net, _ob_net, 'Private network was unchanged') def test_ipv6_short_network(self): _net = 'ff02::' _ob_net = self.ipv6_map.get(_net) - self.assertTrue(_ob_net.startswith(('53', '54')), f'Short network does not start with identifier: {_ob_net}') + self.assertTrue(_ob_net.startswith(('53', '54')), + f'Short network does not start with identifier: ' + f'{_ob_net}') def test_ipv6_consistent_obfuscation(self): _test = '2022:1104:abcd::ef09' _new = self.ipv6_map.get(_test) _second = self.ipv6_map.get(_test) - self.assertEqual(_new, _second, "Same address produced two different results") + self.assertEqual(_new, _second, + "Same address produced two different results") def test_ipv6_global_no_collision(self): """Tests that generating more than 256 global network obfuscations does @@ -146,9 +154,14 @@ class CleanerMapTests(unittest.TestCase): _nets = [] for i in range(1, 300): _nets.append(self.ipv6_map.get(f"f{i:03}::abcd").split('::')[0]) - # if there are any duplicates, then the length of the set will not match - self.assertTrue(len(set(_nets)) == len(_nets), "Duplicate global network obfuscations produced") - self.assertTrue(_nets[-1].startswith('54'), "First hextet of global network obfuscation over 256 not expected '54'") + # if there are any duplicates, then the length of the set will not + # match + self.assertTrue(len(set(_nets)) == len(_nets), + "Duplicate global network obfuscations produced") + self.assertTrue(_nets[-1].startswith('54'), + "First hextet of global network obfuscation over 256" + " not expected '54'") + class CleanerParserTests(unittest.TestCase): @@ -163,7 +176,7 @@ class CleanerParserTests(unittest.TestCase): self.kw_parser_none = SoSKeywordParser(config={}) self.kw_parser.generate_item_regexes() self.uname_parser = SoSUsernameParser(config={}) - self.uname_parser.mapping.add('DOMAIN\myusername') + self.uname_parser.mapping.add('DOMAIN\\myusername') self.uname_parser.mapping.add('foo') def test_ip_parser_valid_ipv4_line(self): @@ -261,28 +274,37 @@ class CleanerParserTests(unittest.TestCase): t1 = 'testing abcd:ef01::1234 as a compressed address' t2 = 'testing abcd:ef01::5678:1234 as a separate address' t3 = 'testing 2607:c540:8c00:3318::34/64 as another address' - t4 = 'testing 2007:1234:5678:90ab:0987:6543:21fe:dcba as a full address' + t4 = ('testing 2007:1234:5678:90ab:0987:6543:21fe:dcba as a full ' + 'address') t1_test = self.ipv6_parser.parse_line(t1)[0] t2_test = self.ipv6_parser.parse_line(t2)[0] t3_test = self.ipv6_parser.parse_line(t3)[0] t4_test = self.ipv6_parser.parse_line(t4)[0] - self.assertNotEqual(t1, t1_test, f"Parser did not match and obfuscate '{t1}'") - self.assertNotEqual(t2, t2_test, f"Parser did not match and obfuscate '{t2}'") - self.assertNotEqual(t3, t3_test, f"Parser did not match and obfuscate '{t3}'") - self.assertNotEqual(t4, t4_test, f"Parser did not match and obfuscate '{t4}'") + self.assertNotEqual(t1, t1_test, + f"Parser did not match and obfuscate '{t1}'") + self.assertNotEqual(t2, t2_test, + f"Parser did not match and obfuscate '{t2}'") + self.assertNotEqual(t3, t3_test, + f"Parser did not match and obfuscate '{t3}'") + self.assertNotEqual(t4, t4_test, + f"Parser did not match and obfuscate '{t4}'") def test_ipv6_no_match_signature(self): modstr = '2D:4F:6E:55:4F:E8:5E:D2:D2:A3:73:62:AB:FD:F9:C5:A5:53:31:93' mod_test = self.ipv6_parser.parse_line(modstr)[0] - self.assertEqual(modstr, mod_test, "Parser matched module signature, and should not") + self.assertEqual(modstr, mod_test, + "Parser matched module signature, and should not") def test_ipv6_no_match_log_false_positive(self): - logln = 'Automatically imported trusted_ca::ca from trusted_ca/ca into production' + logln = ('Automatically imported trusted_ca::ca from trusted_ca/ca' + ' into production') log_test = self.ipv6_parser.parse_line(logln)[0] - self.assertEqual(logln, log_test, "IPv6 parser incorrectly matched a log line of 'trusted_ca::ca'") + self.assertEqual(logln, log_test, + "IPv6 parser incorrectly matched a log line of" + "'trusted_ca::ca'") def test_ad_username(self): - line = "DOMAIN\myusername" + line = "DOMAIN\\myusername" _test = self.uname_parser.parse_line(line)[0] self.assertNotEqual(line, _test) @@ -301,24 +323,34 @@ class PrepperTests(unittest.TestCase): def setUp(self): self.prepper = SoSPrepper(SoSOptions()) self.archive = SoSReportArchive( - archive_path='tests/test_data/sosreport-cleanertest-2021-08-03-qpkxdid.tar.xz', + archive_path='tests/test_data/' + 'sosreport-cleanertest-2021-08-03-qpkxdid.tar.xz', tmpdir='/tmp' ) self.host_prepper = HostnamePrepper(SoSOptions(domains=[])) self.ipv4_prepper = IPPrepper(SoSOptions()) def test_parser_method_translation(self): - self.assertEqual([], self.prepper.get_parser_file_list('hostname', None)) + self.assertEqual([], + self.prepper.get_parser_file_list('hostname', None)) def test_mapping_method_translation(self): self.assertEqual([], self.prepper.get_items_for_map('foobar', None)) def test_hostname_prepper_map_items(self): - self.assertEqual(['cleanertest'], self.host_prepper.get_items_for_map('hostname', self.archive)) + self.assertEqual( + ['cleanertest'], + self.host_prepper.get_items_for_map('hostname', self.archive) + ) def test_ipv4_prepper_parser_files(self): - self.assertEqual(['sos_commands/networking/ip_-o_addr'], self.ipv4_prepper.get_parser_file_list('ip', self.archive)) + self.assertEqual( + ['sos_commands/networking/ip_-o_addr'], + self.ipv4_prepper.get_parser_file_list('ip', self.archive) + ) def test_ipv4_prepper_invalid_parser_files(self): - self.assertEqual([], self.ipv4_prepper.get_parser_file_list('foobar', self.archive)) - + self.assertEqual( + [], + self.ipv4_prepper.get_parser_file_list('foobar', self.archive) + ) diff --git a/tests/unittests/conformance_tests.py b/tests/unittests/conformance_tests.py index 5419f9d3..137121ea 100644 --- a/tests/unittests/conformance_tests.py +++ b/tests/unittests/conformance_tests.py @@ -26,7 +26,8 @@ class PluginConformance(unittest.TestCase): def test_plugin_tuples_set_correctly(self): for plug in self.plug_classes: - for tup in ['packages', 'commands', 'files', 'profiles', 'kernel_mods', 'containers']: + for tup in ['packages', 'commands', 'files', 'profiles', + 'kernel_mods', 'containers']: _attr = getattr(plug, tup) self.assertIsInstance( _attr, tuple, @@ -35,14 +36,16 @@ class PluginConformance(unittest.TestCase): def test_plugin_description_is_str(self): for plug in self.plug_classes: - self.assertIsInstance(plug.short_desc, str, "%s name not string" % plug.__name__) + self.assertIsInstance(plug.short_desc, str, + "%s name not string" % plug.__name__) # make sure the description is not empty self.assertNotEqual(plug.short_desc, '', "%s description unset" % plug.__name__) def test_plugin_name_is_str(self): for plug in self.plug_classes: - self.assertIsInstance(plug.plugin_name, str, "%s name not string" % plug.__name__) + self.assertIsInstance(plug.plugin_name, str, + "%s name not string" % plug.__name__) self.assertNotEqual(plug.plugin_name, '', "%s name unset" % plug.__name__) diff --git a/tests/unittests/plugin_tests.py b/tests/unittests/plugin_tests.py index acc2df3a..4be93780 100644 --- a/tests/unittests/plugin_tests.py +++ b/tests/unittests/plugin_tests.py @@ -13,7 +13,8 @@ import random from io import StringIO -from sos.report.plugins import Plugin, regex_findall, _mangle_command, PluginOpt +from sos.report.plugins import (Plugin, regex_findall, + _mangle_command, PluginOpt) from sos.archive import TarFileArchive from sos.policies.distros import LinuxPolicy from sos.policies.init_systems import InitSystem @@ -213,7 +214,7 @@ class PluginTests(unittest.TestCase): 'devices': {} }) self.assertEqual(p.get_description(), - "This plugin has a description.") + "This plugin has a description.") def test_set_plugin_option(self): p = MockPlugin({ @@ -394,11 +395,17 @@ class AddCopySpecTests(unittest.TestCase): shutil.rmtree(tmpdir) def test_multiple_files_no_limit(self): - self.mp.add_copy_spec(['tests/unittests/tail_test.txt', 'tests/unittests/test.txt']) + self.mp.add_copy_spec([ + 'tests/unittests/tail_test.txt', + 'tests/unittests/test.txt', + ]) self.assertEqual(len(self.mp.copy_paths), 2) def test_multiple_files_under_limit(self): - self.mp.add_copy_spec(['tests/unittests/tail_test.txt', 'tests/unittests/test.txt'], 1) + self.mp.add_copy_spec([ + 'tests/unittests/tail_test.txt', + 'tests/unittests/test.txt', + ], 1) self.assertEqual(len(self.mp.copy_paths), 2) diff --git a/tests/unittests/policy_tests.py b/tests/unittests/policy_tests.py index 02d5ae30..1cefaab7 100644 --- a/tests/unittests/policy_tests.py +++ b/tests/unittests/policy_tests.py @@ -44,7 +44,6 @@ class FauxDebianPlugin(Plugin, DebianPlugin): class PolicyTests(unittest.TestCase): - def test_independent_only(self): p = FauxPolicy() p.valid_subclasses = [] diff --git a/tests/unittests/report_tests.py b/tests/unittests/report_tests.py index 6e202cef..f9ea2daa 100644 --- a/tests/unittests/report_tests.py +++ b/tests/unittests/report_tests.py @@ -79,13 +79,13 @@ class TestPlainReport(unittest.TestCase): def test_basic(self): self.assertEqual(self.pluglist.format(pluglist=""), - PlainTextReport(self.report).unicode()) + PlainTextReport(self.report).unicode()) def test_one_section(self): self.report.add(self.section) self.assertEqual(self.defaultheader, - PlainTextReport(self.report).unicode() + '\n') + PlainTextReport(self.report).unicode() + '\n') def test_two_sections(self): section1 = Section(name="first") |