aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins_overview.py35
-rw-r--r--tests/unittests/archive_tests.py3
-rw-r--r--tests/unittests/cleaner_tests.py82
-rw-r--r--tests/unittests/conformance_tests.py9
-rw-r--r--tests/unittests/plugin_tests.py15
-rw-r--r--tests/unittests/policy_tests.py1
-rw-r--r--tests/unittests/report_tests.py4
7 files changed, 101 insertions, 48 deletions
diff --git a/plugins_overview.py b/plugins_overview.py
index 46bcbf5a..d847548c 100644
--- a/plugins_overview.py
+++ b/plugins_overview.py
@@ -8,7 +8,7 @@
# - list of paths it forbits to collect (add_forbidden_path)
# - list of commands it calls (add_cmd_output)
#
-# Output of the script:
+# Output of the script:
# - a JSON object with plugins in keys
# - or CSV format in case "csv" cmdline is provided
#
@@ -29,9 +29,11 @@ PLUGDIR = 'sos/report/plugins'
plugs_data = {} # the map of all plugins data to collect
plugcontent = '' # content of plugin file just being processed
+
# method to parse an item of a_s_c/a_c_o/.. methods
# we work on an assumption the item is a string quoted by \" or optionally
-# by \'. If we detect at least 2 such chars in the item, take what is between those.
+# by \'. If we detect at least 2 such chars in the item, take what is between
+# those.
def add_valid_item(dest, item):
for qoutemark in "\"\'":
split = item.split(qoutemark)
@@ -39,13 +41,14 @@ def add_valid_item(dest, item):
dest.append(split[1])
return
-# method to find in `plugcontent` all items of given method (a_c_s/a_c_o/..) split
-# by comma; add each valid item to the `dest` list
+
+# method to find in `plugcontent` all items of given method (a_c_s/a_c_o/..)
+# split by comma; add each valid item to the `dest` list
def add_all_items(method, dest, wrapopen='\(', wrapclose='\)'):
regexp = "%s%s(.*?)%s" % (method, wrapopen, wrapclose)
for match in re.findall(regexp, plugcontent, flags=re.MULTILINE|re.DOTALL):
# tuple of distros ended by either (class|from|import)
- if isinstance(match,tuple):
+ if isinstance(match, tuple):
for item in list(match):
if item not in ['class', 'from', 'import']:
for it in item.split(','):
@@ -63,7 +66,8 @@ def add_all_items(method, dest, wrapopen='\(', wrapclose='\)'):
else:
add_valid_item(dest, match)
-# main body: traverse report's plugins directory and for each plugin, grep for
+
+# main body: traverse report's plugins directory and for each plugin, grep for
# add_copy_spec / add_forbidden_path / add_cmd_output there
for plugfile in sorted(os.listdir(PLUGDIR)):
# ignore non-py files and __init__.py
@@ -73,7 +77,8 @@ for plugfile in sorted(os.listdir(PLUGDIR)):
# if plugname != 'bcache':
# continue
plugs_data[plugname] = {
- 'sourcecode': 'https://github.com/sosreport/sos/blob/main/sos/report/plugins/%s.py' % plugname,
+ 'sourcecode': 'https://github.com/sosreport/sos/blob/'
+ f'main/sos/report/plugins/{plugname}.py',
'distros': [],
'profiles': [],
'packages': [],
@@ -84,8 +89,13 @@ for plugfile in sorted(os.listdir(PLUGDIR)):
'journals': [],
'env': [],
}
- plugcontent = open(os.path.join(PLUGDIR, plugfile)).read().replace('\n','')
- add_all_items("from sos.report.plugins import ", plugs_data[plugname]['distros'], wrapopen='', wrapclose='(class|from|import)')
+ plugcontent = open(os.path.join(PLUGDIR, plugfile)).read().replace('\n', '')
+ add_all_items(
+ "from sos.report.plugins import ",
+ plugs_data[plugname]['distros'],
+ wrapopen='',
+ wrapclose='(class|from|import)'
+ )
add_all_items("profiles = ", plugs_data[plugname]['profiles'], wrapopen='')
add_all_items("packages = ", plugs_data[plugname]['packages'], wrapopen='')
add_all_items("add_copy_spec", plugs_data[plugname]['copyspecs'])
@@ -98,7 +108,8 @@ for plugfile in sorted(os.listdir(PLUGDIR)):
# print output; if "csv" is cmdline argument, print in CSV format, else JSON
if (len(sys.argv) > 1) and (sys.argv[1] == "csv"):
- print("plugin;url;distros;profiles;packages;copyspecs;forbidden;commands;service_status;journals;env_vars")
+ print("plugin;url;distros;profiles;packages;copyspecs;forbidden;commands;"
+ "service_status;journals;env_vars")
for plugname in plugs_data.keys():
plugin = plugs_data[plugname]
# determine max number of lines - usually "max(len(copyspec),len(commands))"
@@ -109,10 +120,10 @@ if (len(sys.argv) > 1) and (sys.argv[1] == "csv"):
for key in plugkeys:
maxline = max(maxline, len(plugin[key]))
for line in range(maxline):
- out = ";" if line>0 else ("%s;%s" % (plugname, plugin['sourcecode']))
+ out = ";" if line > 0 else f"{plugname};{plugin['sourcecode']}"
for key in plugkeys:
out += ";"
- if line<len(plugin[key]):
+ if line < len(plugin[key]):
out += plugin[key][line]
print(out)
else:
diff --git a/tests/unittests/archive_tests.py b/tests/unittests/archive_tests.py
index aac4edc7..d3d1809d 100644
--- a/tests/unittests/archive_tests.py
+++ b/tests/unittests/archive_tests.py
@@ -69,7 +69,8 @@ class TarFileArchiveTest(unittest.TestCase):
# self.check_for_file('test/tests/ziptest')
def test_add_renamed(self):
- self.tf.add_file('tests/unittests/ziptest', dest='tests/unittests/ziptest_renamed')
+ self.tf.add_file('tests/unittests/ziptest',
+ dest='tests/unittests/ziptest_renamed')
self.tf.finalize('auto')
self.check_for_file('test/tests/unittests/ziptest_renamed')
diff --git a/tests/unittests/cleaner_tests.py b/tests/unittests/cleaner_tests.py
index eec84e9c..5dffb2fd 100644
--- a/tests/unittests/cleaner_tests.py
+++ b/tests/unittests/cleaner_tests.py
@@ -26,6 +26,7 @@ from sos.cleaner.preppers.ip import IPPrepper
from sos.cleaner.archives.sos import SoSReportArchive
from sos.options import SoSOptions
+
class CleanerMapTests(unittest.TestCase):
def setUp(self):
@@ -108,7 +109,8 @@ class CleanerMapTests(unittest.TestCase):
_net = '2022:1104:abcd::'
_ob_net = self.ipv6_map.get(_net)
self.assertNotEqual(_net, _ob_net, 'Address was unchanged')
- self.assertTrue(_ob_net.startswith('534f'), 'Global address does not start with identifier')
+ self.assertTrue(_ob_net.startswith('534f'),
+ 'Global address does not start with identifier')
_host = '2022:1104:abcd::1234'
_ob_host = self.ipv6_map.get(_host)
self.assertNotEqual(_host, _ob_host, 'Host address was unchanged')
@@ -117,7 +119,8 @@ class CleanerMapTests(unittest.TestCase):
def test_ipv6_link_local(self):
_test = 'fe80::1234'
_ob_test = self.ipv6_map.get(_test)
- self.assertTrue(_ob_test.startswith('fe80'), 'Link-local identifier not maintained')
+ self.assertTrue(_ob_test.startswith('fe80'),
+ 'Link-local identifier not maintained')
self.assertNotEqual(_test, _ob_test, 'Device address was unchanged')
def test_ipv6_private(self):
@@ -125,20 +128,25 @@ class CleanerMapTests(unittest.TestCase):
_host = 'fd00:abcd::1234'
_ob_net = self.ipv6_map.get(_net).split('/')[0]
_ob_host = self.ipv6_map.get(_host)
- self.assertTrue(_ob_net.startswith('fd53'), 'Private network does not start with identifier')
- self.assertTrue(_ob_host.startswith(_ob_net), 'Private address not in same network')
+ self.assertTrue(_ob_net.startswith('fd53'),
+ 'Private network does not start with identifier')
+ self.assertTrue(_ob_host.startswith(_ob_net),
+ 'Private address not in same network')
self.assertNotEqual(_net, _ob_net, 'Private network was unchanged')
def test_ipv6_short_network(self):
_net = 'ff02::'
_ob_net = self.ipv6_map.get(_net)
- self.assertTrue(_ob_net.startswith(('53', '54')), f'Short network does not start with identifier: {_ob_net}')
+ self.assertTrue(_ob_net.startswith(('53', '54')),
+ f'Short network does not start with identifier: '
+ f'{_ob_net}')
def test_ipv6_consistent_obfuscation(self):
_test = '2022:1104:abcd::ef09'
_new = self.ipv6_map.get(_test)
_second = self.ipv6_map.get(_test)
- self.assertEqual(_new, _second, "Same address produced two different results")
+ self.assertEqual(_new, _second,
+ "Same address produced two different results")
def test_ipv6_global_no_collision(self):
"""Tests that generating more than 256 global network obfuscations does
@@ -146,9 +154,14 @@ class CleanerMapTests(unittest.TestCase):
_nets = []
for i in range(1, 300):
_nets.append(self.ipv6_map.get(f"f{i:03}::abcd").split('::')[0])
- # if there are any duplicates, then the length of the set will not match
- self.assertTrue(len(set(_nets)) == len(_nets), "Duplicate global network obfuscations produced")
- self.assertTrue(_nets[-1].startswith('54'), "First hextet of global network obfuscation over 256 not expected '54'")
+ # if there are any duplicates, then the length of the set will not
+ # match
+ self.assertTrue(len(set(_nets)) == len(_nets),
+ "Duplicate global network obfuscations produced")
+ self.assertTrue(_nets[-1].startswith('54'),
+ "First hextet of global network obfuscation over 256"
+ " not expected '54'")
+
class CleanerParserTests(unittest.TestCase):
@@ -163,7 +176,7 @@ class CleanerParserTests(unittest.TestCase):
self.kw_parser_none = SoSKeywordParser(config={})
self.kw_parser.generate_item_regexes()
self.uname_parser = SoSUsernameParser(config={})
- self.uname_parser.mapping.add('DOMAIN\myusername')
+ self.uname_parser.mapping.add('DOMAIN\\myusername')
self.uname_parser.mapping.add('foo')
def test_ip_parser_valid_ipv4_line(self):
@@ -261,28 +274,37 @@ class CleanerParserTests(unittest.TestCase):
t1 = 'testing abcd:ef01::1234 as a compressed address'
t2 = 'testing abcd:ef01::5678:1234 as a separate address'
t3 = 'testing 2607:c540:8c00:3318::34/64 as another address'
- t4 = 'testing 2007:1234:5678:90ab:0987:6543:21fe:dcba as a full address'
+ t4 = ('testing 2007:1234:5678:90ab:0987:6543:21fe:dcba as a full '
+ 'address')
t1_test = self.ipv6_parser.parse_line(t1)[0]
t2_test = self.ipv6_parser.parse_line(t2)[0]
t3_test = self.ipv6_parser.parse_line(t3)[0]
t4_test = self.ipv6_parser.parse_line(t4)[0]
- self.assertNotEqual(t1, t1_test, f"Parser did not match and obfuscate '{t1}'")
- self.assertNotEqual(t2, t2_test, f"Parser did not match and obfuscate '{t2}'")
- self.assertNotEqual(t3, t3_test, f"Parser did not match and obfuscate '{t3}'")
- self.assertNotEqual(t4, t4_test, f"Parser did not match and obfuscate '{t4}'")
+ self.assertNotEqual(t1, t1_test,
+ f"Parser did not match and obfuscate '{t1}'")
+ self.assertNotEqual(t2, t2_test,
+ f"Parser did not match and obfuscate '{t2}'")
+ self.assertNotEqual(t3, t3_test,
+ f"Parser did not match and obfuscate '{t3}'")
+ self.assertNotEqual(t4, t4_test,
+ f"Parser did not match and obfuscate '{t4}'")
def test_ipv6_no_match_signature(self):
modstr = '2D:4F:6E:55:4F:E8:5E:D2:D2:A3:73:62:AB:FD:F9:C5:A5:53:31:93'
mod_test = self.ipv6_parser.parse_line(modstr)[0]
- self.assertEqual(modstr, mod_test, "Parser matched module signature, and should not")
+ self.assertEqual(modstr, mod_test,
+ "Parser matched module signature, and should not")
def test_ipv6_no_match_log_false_positive(self):
- logln = 'Automatically imported trusted_ca::ca from trusted_ca/ca into production'
+ logln = ('Automatically imported trusted_ca::ca from trusted_ca/ca'
+ ' into production')
log_test = self.ipv6_parser.parse_line(logln)[0]
- self.assertEqual(logln, log_test, "IPv6 parser incorrectly matched a log line of 'trusted_ca::ca'")
+ self.assertEqual(logln, log_test,
+ "IPv6 parser incorrectly matched a log line of"
+ "'trusted_ca::ca'")
def test_ad_username(self):
- line = "DOMAIN\myusername"
+ line = "DOMAIN\\myusername"
_test = self.uname_parser.parse_line(line)[0]
self.assertNotEqual(line, _test)
@@ -301,24 +323,34 @@ class PrepperTests(unittest.TestCase):
def setUp(self):
self.prepper = SoSPrepper(SoSOptions())
self.archive = SoSReportArchive(
- archive_path='tests/test_data/sosreport-cleanertest-2021-08-03-qpkxdid.tar.xz',
+ archive_path='tests/test_data/'
+ 'sosreport-cleanertest-2021-08-03-qpkxdid.tar.xz',
tmpdir='/tmp'
)
self.host_prepper = HostnamePrepper(SoSOptions(domains=[]))
self.ipv4_prepper = IPPrepper(SoSOptions())
def test_parser_method_translation(self):
- self.assertEqual([], self.prepper.get_parser_file_list('hostname', None))
+ self.assertEqual([],
+ self.prepper.get_parser_file_list('hostname', None))
def test_mapping_method_translation(self):
self.assertEqual([], self.prepper.get_items_for_map('foobar', None))
def test_hostname_prepper_map_items(self):
- self.assertEqual(['cleanertest'], self.host_prepper.get_items_for_map('hostname', self.archive))
+ self.assertEqual(
+ ['cleanertest'],
+ self.host_prepper.get_items_for_map('hostname', self.archive)
+ )
def test_ipv4_prepper_parser_files(self):
- self.assertEqual(['sos_commands/networking/ip_-o_addr'], self.ipv4_prepper.get_parser_file_list('ip', self.archive))
+ self.assertEqual(
+ ['sos_commands/networking/ip_-o_addr'],
+ self.ipv4_prepper.get_parser_file_list('ip', self.archive)
+ )
def test_ipv4_prepper_invalid_parser_files(self):
- self.assertEqual([], self.ipv4_prepper.get_parser_file_list('foobar', self.archive))
-
+ self.assertEqual(
+ [],
+ self.ipv4_prepper.get_parser_file_list('foobar', self.archive)
+ )
diff --git a/tests/unittests/conformance_tests.py b/tests/unittests/conformance_tests.py
index 5419f9d3..137121ea 100644
--- a/tests/unittests/conformance_tests.py
+++ b/tests/unittests/conformance_tests.py
@@ -26,7 +26,8 @@ class PluginConformance(unittest.TestCase):
def test_plugin_tuples_set_correctly(self):
for plug in self.plug_classes:
- for tup in ['packages', 'commands', 'files', 'profiles', 'kernel_mods', 'containers']:
+ for tup in ['packages', 'commands', 'files', 'profiles',
+ 'kernel_mods', 'containers']:
_attr = getattr(plug, tup)
self.assertIsInstance(
_attr, tuple,
@@ -35,14 +36,16 @@ class PluginConformance(unittest.TestCase):
def test_plugin_description_is_str(self):
for plug in self.plug_classes:
- self.assertIsInstance(plug.short_desc, str, "%s name not string" % plug.__name__)
+ self.assertIsInstance(plug.short_desc, str,
+ "%s name not string" % plug.__name__)
# make sure the description is not empty
self.assertNotEqual(plug.short_desc, '',
"%s description unset" % plug.__name__)
def test_plugin_name_is_str(self):
for plug in self.plug_classes:
- self.assertIsInstance(plug.plugin_name, str, "%s name not string" % plug.__name__)
+ self.assertIsInstance(plug.plugin_name, str,
+ "%s name not string" % plug.__name__)
self.assertNotEqual(plug.plugin_name, '',
"%s name unset" % plug.__name__)
diff --git a/tests/unittests/plugin_tests.py b/tests/unittests/plugin_tests.py
index acc2df3a..4be93780 100644
--- a/tests/unittests/plugin_tests.py
+++ b/tests/unittests/plugin_tests.py
@@ -13,7 +13,8 @@ import random
from io import StringIO
-from sos.report.plugins import Plugin, regex_findall, _mangle_command, PluginOpt
+from sos.report.plugins import (Plugin, regex_findall,
+ _mangle_command, PluginOpt)
from sos.archive import TarFileArchive
from sos.policies.distros import LinuxPolicy
from sos.policies.init_systems import InitSystem
@@ -213,7 +214,7 @@ class PluginTests(unittest.TestCase):
'devices': {}
})
self.assertEqual(p.get_description(),
- "This plugin has a description.")
+ "This plugin has a description.")
def test_set_plugin_option(self):
p = MockPlugin({
@@ -394,11 +395,17 @@ class AddCopySpecTests(unittest.TestCase):
shutil.rmtree(tmpdir)
def test_multiple_files_no_limit(self):
- self.mp.add_copy_spec(['tests/unittests/tail_test.txt', 'tests/unittests/test.txt'])
+ self.mp.add_copy_spec([
+ 'tests/unittests/tail_test.txt',
+ 'tests/unittests/test.txt',
+ ])
self.assertEqual(len(self.mp.copy_paths), 2)
def test_multiple_files_under_limit(self):
- self.mp.add_copy_spec(['tests/unittests/tail_test.txt', 'tests/unittests/test.txt'], 1)
+ self.mp.add_copy_spec([
+ 'tests/unittests/tail_test.txt',
+ 'tests/unittests/test.txt',
+ ], 1)
self.assertEqual(len(self.mp.copy_paths), 2)
diff --git a/tests/unittests/policy_tests.py b/tests/unittests/policy_tests.py
index 02d5ae30..1cefaab7 100644
--- a/tests/unittests/policy_tests.py
+++ b/tests/unittests/policy_tests.py
@@ -44,7 +44,6 @@ class FauxDebianPlugin(Plugin, DebianPlugin):
class PolicyTests(unittest.TestCase):
-
def test_independent_only(self):
p = FauxPolicy()
p.valid_subclasses = []
diff --git a/tests/unittests/report_tests.py b/tests/unittests/report_tests.py
index 6e202cef..f9ea2daa 100644
--- a/tests/unittests/report_tests.py
+++ b/tests/unittests/report_tests.py
@@ -79,13 +79,13 @@ class TestPlainReport(unittest.TestCase):
def test_basic(self):
self.assertEqual(self.pluglist.format(pluglist=""),
- PlainTextReport(self.report).unicode())
+ PlainTextReport(self.report).unicode())
def test_one_section(self):
self.report.add(self.section)
self.assertEqual(self.defaultheader,
- PlainTextReport(self.report).unicode() + '\n')
+ PlainTextReport(self.report).unicode() + '\n')
def test_two_sections(self):
section1 = Section(name="first")