aboutsummaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/cleaner_tests/basic_function_tests/binary_test.py1
-rw-r--r--tests/cleaner_tests/basic_function_tests/report_with_mask.py55
-rw-r--r--tests/cleaner_tests/existing_archive.py63
-rw-r--r--tests/cleaner_tests/full_report/full_report_run.py18
-rw-r--r--tests/cleaner_tests/help_output_tests.py3
-rw-r--r--tests/cleaner_tests/ipv6_test/ipv6_test.py11
-rw-r--r--tests/cleaner_tests/report_disabled_parsers.py32
-rw-r--r--tests/cleaner_tests/skip_versioning/skip_version_ip_parser.py1
-rw-r--r--tests/collect_tests/help_output_tests.py39
-rw-r--r--tests/product_tests/foreman/foreman_tests.py37
-rw-r--r--tests/report_tests/__init__.py1
-rw-r--r--tests/report_tests/basic_report_tests.py15
-rw-r--r--tests/report_tests/command_priority_tests.py7
-rw-r--r--tests/report_tests/encryption_tests.py10
-rw-r--r--tests/report_tests/exception_tests.py6
-rw-r--r--tests/report_tests/help_output_tests.py4
-rw-r--r--tests/report_tests/low_priority_tests.py5
-rw-r--r--tests/report_tests/options_tests/options_tests.py12
-rw-r--r--tests/report_tests/plugin_tests/collect_manual_tests.py8
-rw-r--r--tests/report_tests/plugin_tests/defaults.py7
-rw-r--r--tests/report_tests/plugin_tests/krb5.py1
-rw-r--r--tests/report_tests/plugin_tests/logs.py21
-rw-r--r--tests/report_tests/plugin_tests/sos_extras/sos_extras.py6
-rw-r--r--tests/report_tests/plugin_tests/sudo/sudo.py1
-rw-r--r--tests/report_tests/plugin_tests/teamd.py8
-rw-r--r--tests/report_tests/smoke_tests.py12
-rw-r--r--tests/report_tests/timeout/timeout_test.py1
-rw-r--r--tests/report_tests/timeout/timeout_tests.py8
-rw-r--r--tests/sos_tests.py145
-rw-r--r--tests/vendor_tests/redhat/rhbz1950350/rhbz1950350.py15
-rw-r--r--tests/vendor_tests/redhat/rhbz1965001.py2
-rw-r--r--tests/vendor_tests/redhat/rhbz2018033/rhbz2018033.py3
32 files changed, 399 insertions, 159 deletions
diff --git a/tests/cleaner_tests/basic_function_tests/binary_test.py b/tests/cleaner_tests/basic_function_tests/binary_test.py
index 80bc841b..3fdbbd69 100644
--- a/tests/cleaner_tests/basic_function_tests/binary_test.py
+++ b/tests/cleaner_tests/basic_function_tests/binary_test.py
@@ -16,6 +16,5 @@ class BinaryPlugin(Plugin, IndependentPlugin):
plugin_name = 'binary_test'
short_desc = 'test plugin for removing binaries with --clean'
-
def setup(self):
self.add_copy_spec('/var/log/binary_test.tar.xz')
diff --git a/tests/cleaner_tests/basic_function_tests/report_with_mask.py b/tests/cleaner_tests/basic_function_tests/report_with_mask.py
index bb72cdde..2b694554 100644
--- a/tests/cleaner_tests/basic_function_tests/report_with_mask.py
+++ b/tests/cleaner_tests/basic_function_tests/report_with_mask.py
@@ -25,7 +25,8 @@ class ReportWithMask(StageOneReportTest):
# obfuscate a random word from /etc/hosts and ensure the updated
# sanitised file has same permissions (a+r)
try:
- self.hosts_obfuscated = open('/etc/hosts').read().strip('#\n').split()[-1]
+ self.hosts_obfuscated = open(
+ '/etc/hosts').read().strip('#\n').split()[-1]
except (FileNotFoundError, IndexError) as e:
self.warning(f"Unable to process /etc/hosts: {e}")
if self.hosts_obfuscated:
@@ -36,8 +37,10 @@ class ReportWithMask(StageOneReportTest):
self.assertOutputContains('Obfuscation completed')
def test_private_map_was_generated(self):
- self.assertOutputContains('A mapping of obfuscated elements is available at')
- map_file = re.findall('/.*sosreport-.*-private_map', self.cmd_output.stdout)[-1]
+ self.assertOutputContains(
+ 'A mapping of obfuscated elements is available at')
+ map_file = re.findall(
+ '/.*sosreport-.*-private_map', self.cmd_output.stdout)[-1]
self.assertFileExists(map_file)
def test_tarball_named_obfuscated(self):
@@ -53,40 +56,53 @@ class ReportWithMask(StageOneReportTest):
# Note: do not test for starting with the 100.* block here, as test
# machines may have /32 addresses. Instead, test that the actual
# IP address is not present
- self.assertFileNotHasContent('ip_addr', self.sysinfo['pre']['networking']['ip_addr'])
+ self.assertFileNotHasContent(
+ 'ip_addr',
+ self.sysinfo['pre']['networking']['ip_addr']
+ )
def test_loopback_was_not_obfuscated(self):
self.assertFileHasContent('ip_addr', '127.0.0.1/8')
def test_mac_addrs_were_obfuscated(self):
- content = self.get_file_content('sos_commands/networking/ip_maddr_show')
+ content = self.get_file_content(
+ 'sos_commands/networking/ip_maddr_show'
+ )
for line in content.splitlines():
if line.strip().startswith('link'):
mac = line.strip().split()[1]
- assert mac.startswith('53:4f:53'), "Found unobfuscated mac addr %s" % mac
+ assert \
+ mac.startswith('53:4f:53'), \
+ f"Found unobfuscated mac addr {mac}"
def test_perms_unchanged_on_modified_file(self):
if self.hosts_obfuscated:
imode_orig = stat('/etc/hosts').st_mode
- imode_obfuscated = stat(self.get_name_in_archive('etc/hosts')).st_mode
+ imode_obfuscated = stat(
+ self.get_name_in_archive('etc/hosts')).st_mode
self.assertEqual(imode_orig, imode_obfuscated)
class ReportWithUserCustomisations(StageOneReportTest):
- """Testing for 1) obfuscated keywords provided by the user (--keywords option),
- and 2) skipping to clean specific files (--skip-cleaning-files option)
+ """Testing for 1) obfuscated keywords provided by the user (--keywords
+ option), and 2) skipping to clean specific files (--skip-cleaning-files
+ option)
:avocado: tags=stageone
"""
- sos_cmd = '--clean -o filesys,kernel --keywords=fstab,Linux,tmp,BOOT_IMAGE,fs.dentry-state \
- --skip-cleaning-files proc/cmdline,sos_commands/*/sysctl* --no-update'
+ sos_cmd = ('--clean -o filesys,kernel --keywords=fstab,Linux,tmp,'
+ 'BOOT_IMAGE,fs.dentry-state --skip-cleaning-files '
+ 'proc/cmdline,sos_commands/*/sysctl* --no-update')
- # Will the 'tmp' be properly treated in path to working dir without raising an error?
- # To make this test effective, we assume the test runs on a system / with Policy
- # returning '/var/tmp' as temp.dir
+ # Will the 'tmp' be properly treated in path to working dir without
+ # raising an error?
+ # To make this test effective, we assume the test runs on a system / with
+ # Policy returning '/var/tmp' as temp.dir
def test_keyword_in_tempdir_path(self):
- self.assertOutputContains('Your sosreport has been generated and saved in:')
+ self.assertOutputContains(
+ 'Your sosreport has been generated and saved in:'
+ )
self.assertTrue('tmp/' in self.archive)
# Ok, sort of cheesy here but this does actually test filename changes on
@@ -102,7 +118,10 @@ class ReportWithUserCustomisations(StageOneReportTest):
self.assertFileHasContent('proc/cmdline', 'BOOT_IMAGE')
def test_skip_cleaning_glob_file(self):
- self.assertFileHasContent('sos_commands/kernel/sysctl_-a', 'fs.dentry-state')
+ self.assertFileHasContent(
+ 'sos_commands/kernel/sysctl_-a',
+ 'fs.dentry-state'
+ )
class DefaultRemoveBinaryFilesTest(StageTwoReportTest):
@@ -134,8 +153,8 @@ class KeepBinaryFilesTest(StageTwoReportTest):
def test_warning_message_shown(self):
self.assertOutputContains(
- 'WARNING: binary files that potentially contain sensitive information '
- 'will NOT be removed from the final archive'
+ 'WARNING: binary files that potentially contain sensitive '
+ 'information will NOT be removed from the final archive'
)
def test_binary_is_in_archive(self):
diff --git a/tests/cleaner_tests/existing_archive.py b/tests/cleaner_tests/existing_archive.py
index 7f31f88e..57bae713 100644
--- a/tests/cleaner_tests/existing_archive.py
+++ b/tests/cleaner_tests/existing_archive.py
@@ -13,6 +13,7 @@ from sos_tests import StageTwoReportTest
ARCHIVE = 'sosreport-cleanertest-2021-08-03-qpkxdid'
+
class ExistingArchiveCleanTest(StageTwoReportTest):
"""Ensure that we can extract an already created archive and clean it the
same as we would an in-line run of `report --clean`.
@@ -26,34 +27,52 @@ class ExistingArchiveCleanTest(StageTwoReportTest):
sos_component = 'clean'
def test_obfuscation_log_created(self):
- self.assertFileExists(os.path.join(self.tmpdir, '%s-obfuscation.log' % ARCHIVE))
+ self.assertFileExists(
+ os.path.join(self.tmpdir, f'{ARCHIVE}-obfuscation.log')
+ )
def test_archive_type_correct(self):
- with open(os.path.join(self.tmpdir, '%s-obfuscation.log' % ARCHIVE), 'r') as log:
+ with open(os.path.join(
+ self.tmpdir,
+ f'{ARCHIVE}-obfuscation.log'), 'r') as log:
for line in log:
if "Loaded %s" % ARCHIVE in line:
- assert 'as type sos report archive' in line, "Incorrect archive type detected: %s" % line
+ assert \
+ 'as type sos report archive' in line, \
+ f"Incorrect archive type detected: {line}"
break
def test_from_cmdline_logged(self):
- with open(os.path.join(self.tmpdir, '%s-obfuscation.log' % ARCHIVE), 'r') as log:
+ with open(os.path.join(
+ self.tmpdir,
+ f'{ARCHIVE}-obfuscation.log'), 'r') as log:
for line in log:
if 'From cmdline' in line:
- assert 'From cmdline: True' in line, "Did not properly log cmdline run"
+ assert \
+ 'From cmdline: True' in line, \
+ "Did not properly log cmdline run"
break
def test_extraction_completed_successfully(self):
- with open(os.path.join(self.tmpdir, '%s-obfuscation.log' % ARCHIVE), 'r') as log:
+ with open(os.path.join(
+ self.tmpdir,
+ f'{ARCHIVE}-obfuscation.log'), 'r') as log:
for line in log:
if 'Extracted path is' in line:
path = line.split('Extracted path is')[-1].strip()
- assert path.startswith(self.tmpdir), "Extracted path appears wrong: %s (tmpdir: %s)" % (path, self.tmpdir)
+ assert \
+ path.startswith(self.tmpdir), \
+ (f"Extracted path appears wrong: {path} "
+ f"(tmpdir: {self.tmpdir})")
return
self.fail("Extracted path not logged")
def test_private_map_was_generated(self):
- self.assertOutputContains('A mapping of obfuscated elements is available at')
- map_file = re.findall('/.*sosreport-.*-private_map', self.cmd_output.stdout)[-1]
+ self.assertOutputContains(
+ 'A mapping of obfuscated elements is available at'
+ )
+ map_file = re.findall(
+ '/.*sosreport-.*-private_map', self.cmd_output.stdout)[-1]
self.assertFileExists(map_file)
def test_tarball_named_obfuscated(self):
@@ -70,7 +89,9 @@ class ExistingArchiveCleanTest(StageTwoReportTest):
def test_no_empty_obfuscations(self):
# get the private map file name
- map_file = re.findall('/.*sosreport-.*-private_map', self.cmd_output.stdout)[-1]
+ map_file = re.findall(
+ '/.*sosreport-.*-private_map', self.cmd_output.stdout
+ )[-1]
with open(map_file, 'r') as mf:
map_json = json.load(mf)
for mapping in map_json:
@@ -83,12 +104,17 @@ class ExistingArchiveCleanTest(StageTwoReportTest):
if not content:
assert True
else:
- self.fail("IP appears in files: %s" % "\n".join(f for f in content))
+ new_content = "\n".join(f for f in content)
+ self.fail(f'IP appears in files: {new_content}')
def test_user_is_obfuscated(self):
"""Ensure that the 'testuser1' user created at install is obfuscated
"""
- self.assertFileNotHasContent('var/log/anaconda/journal.log', 'testuser1')
+ self.assertFileNotHasContent(
+ 'var/log/anaconda/journal.log',
+ 'testuser1'
+ )
+
class ExistingArchiveCleanTmpTest(StageTwoReportTest):
"""Continuation of above tests which requires cleaning var / tmp keywords
@@ -98,13 +124,14 @@ class ExistingArchiveCleanTmpTest(StageTwoReportTest):
:avocado: tags=stagetwo
"""
- sos_cmd = '-v --keywords var,tmp,avocado --disable-parsers ip,ipv6,mac,username \
- --no-update tests/test_data/%s.tar.xz' % ARCHIVE
+ sos_cmd = f'-v --keywords var,tmp,avocado --disable-parsers \
+ ip,ipv6,mac,username --no-update tests/test_data/{ARCHIVE}.tar.xz'
sos_component = 'clean'
def test_sys_tmp_not_obfuscated(self):
- """ Ensure that keywords var, tmp and avocado remains in the final archive
- path despite they are parts of the --tmp-dir
+ """ Ensure that keywords var, tmp and avocado remains in the final
+ archive path despite they are parts of the --tmp-dir
"""
- self.assertTrue(self.archive.startswith(os.getenv('AVOCADO_TESTS_COMMON_TMPDIR')))
-
+ self.assertTrue(
+ self.archive.startswith(os.getenv('AVOCADO_TESTS_COMMON_TMPDIR'))
+ )
diff --git a/tests/cleaner_tests/full_report/full_report_run.py b/tests/cleaner_tests/full_report/full_report_run.py
index d17287a8..b7ce0742 100644
--- a/tests/cleaner_tests/full_report/full_report_run.py
+++ b/tests/cleaner_tests/full_report/full_report_run.py
@@ -9,7 +9,6 @@
import json
import re
-from avocado.utils import process
from sos_tests import StageTwoReportTest
@@ -46,8 +45,13 @@ class FullCleanTest(StageTwoReportTest):
)
def test_private_map_was_generated(self):
- self.assertOutputContains('A mapping of obfuscated elements is available at')
- map_file = re.findall('/.*sosreport-.*-private_map', self.cmd_output.stdout)[-1]
+ self.assertOutputContains(
+ 'A mapping of obfuscated elements is available at'
+ )
+ map_file = re.findall(
+ '/.*sosreport-.*-private_map',
+ self.cmd_output.stdout
+ )[-1]
self.assertFileExists(map_file)
def test_tarball_named_obfuscated(self):
@@ -69,7 +73,10 @@ class FullCleanTest(StageTwoReportTest):
def test_no_empty_obfuscations(self):
# get the private map file name
- map_file = re.findall('/.*sosreport-.*-private_map', self.cmd_output.stdout)[-1]
+ map_file = re.findall(
+ '/.*sosreport-.*-private_map',
+ self.cmd_output.stdout
+ )[-1]
with open(map_file, 'r') as mf:
map_json = json.load(mf)
for mapping in map_json:
@@ -83,4 +90,5 @@ class FullCleanTest(StageTwoReportTest):
if not content:
assert True
else:
- self.fail("IP appears in files: %s" % "\n".join(f for f in content))
+ new_content = "\n".join(f for f in content)
+ self.fail(f'IP appears in files: {new_content}')
diff --git a/tests/cleaner_tests/help_output_tests.py b/tests/cleaner_tests/help_output_tests.py
index 2f14438c..b77895f6 100644
--- a/tests/cleaner_tests/help_output_tests.py
+++ b/tests/cleaner_tests/help_output_tests.py
@@ -21,7 +21,8 @@ class CleanHelpTest(StageOneOutputTest):
def test_all_help_sections_present(self):
self.assertOutputContains('Global Options:')
self.assertOutputContains('Cleaner/Masking Options:')
- self.assertOutputContains('TARGET The directory or archive to obfuscate')
+ self.assertOutputContains('TARGET The directory or '
+ 'archive to obfuscate')
class MaskHelpTest(CleanHelpTest):
diff --git a/tests/cleaner_tests/ipv6_test/ipv6_test.py b/tests/cleaner_tests/ipv6_test/ipv6_test.py
index 851b7678..1334d92d 100644
--- a/tests/cleaner_tests/ipv6_test/ipv6_test.py
+++ b/tests/cleaner_tests/ipv6_test/ipv6_test.py
@@ -10,6 +10,7 @@ from sos_tests import StageTwoReportTest
MOCK_FILE = '/tmp/sos-test-ipv6.txt'
+
class IPv6Test(StageTwoReportTest):
"""Place artificial plugin collecting crafted text file with ipv6 adresses
to make sure ipv6 obfuscation works when calling 'sos clean' like a user
@@ -31,8 +32,14 @@ class IPv6Test(StageTwoReportTest):
def test_valid_ipv6(self):
self.assertFileCollected(MOCK_FILE)
self.assertFileHasContent(MOCK_FILE, 'GOOD_IP=')
- self.assertFileNotHasContent(MOCK_FILE, 'GOOD_IP=3000:505f:505f:505f:505f:505f:505f:505f')
+ self.assertFileNotHasContent(
+ MOCK_FILE,
+ 'GOOD_IP=3000:505f:505f:505f:505f:505f:505f:505f'
+ )
def test_bad_ipv6(self):
self.assertFileHasContent(MOCK_FILE, 'BAD_IP=')
- self.assertFileNotHasContent(MOCK_FILE, 'BAD_IP=505f:505f:505f:505f:505f:505f:505f:505f')
+ self.assertFileNotHasContent(
+ MOCK_FILE,
+ 'BAD_IP=505f:505f:505f:505f:505f:505f:505f:505f'
+ )
diff --git a/tests/cleaner_tests/report_disabled_parsers.py b/tests/cleaner_tests/report_disabled_parsers.py
index 286c6aa8..c313485a 100644
--- a/tests/cleaner_tests/report_disabled_parsers.py
+++ b/tests/cleaner_tests/report_disabled_parsers.py
@@ -11,6 +11,7 @@ from sos_tests import StageOneReportTest, StageTwoReportTest
ARCHIVE = 'sosreport-cleanertest-2021-08-03-qpkxdid.tar.xz'
+
class ReportDisabledParsersTest(StageOneReportTest):
"""Run report with selected disabled parsers and ensure those parsers are
in fact disabled and unused.
@@ -21,14 +22,18 @@ class ReportDisabledParsersTest(StageOneReportTest):
sos_cmd = '--clean -o host,kernel,networking --disable-parsers=ip'
def test_local_ip_not_obfuscated(self):
- self.assertFileHasContent('ip_addr', self.sysinfo['pre']['networking']['ip_addr'])
+ self.assertFileHasContent(
+ 'ip_addr',
+ self.sysinfo['pre']['networking']['ip_addr']
+ )
def test_disable_message_logged(self):
self.assertSosLogContains('Disabling parser: ip')
def test_ui_log_message_shown(self):
self.assertSosUILogContains(
- '.*Be aware that this may leave sensitive plain-text data in the archive.'
+ '.*Be aware that this may leave sensitive plain-text data in '
+ 'the archive.'
)
# make sure that the other parsers remain functional
@@ -36,11 +41,15 @@ class ReportDisabledParsersTest(StageOneReportTest):
self.assertFileHasContent('hostname', 'host0')
def test_mac_addrs_were_obfuscated(self):
- content = self.get_file_content('sos_commands/networking/ip_maddr_show')
+ content = self.get_file_content(
+ 'sos_commands/networking/ip_maddr_show'
+ )
for line in content.splitlines():
if line.strip().startswith('link'):
mac = line.strip().split()[1]
- assert mac.startswith('53:4f:53'), "Found unobfuscated mac addr %s" % mac
+ assert \
+ mac.startswith('53:4f:53'), \
+ f"Found unobfuscated mac addr {mac}"
class NativeCleanDisabledParsersTest(StageTwoReportTest):
@@ -54,8 +63,17 @@ class NativeCleanDisabledParsersTest(StageTwoReportTest):
sos_component = 'clean'
def test_localhost_not_obfuscated(self):
- self.assertFileNotHasContent('hostname', self.sysinfo['pre']['networking']['hostname'])
- self.assertFileNotHasContent('uname', self.sysinfo['pre']['networking']['hostname'])
+ self.assertFileNotHasContent(
+ 'hostname',
+ self.sysinfo['pre']['networking']['hostname']
+ )
+ self.assertFileNotHasContent(
+ 'uname',
+ self.sysinfo['pre']['networking']['hostname']
+ )
def test_local_ip_was_obfuscated(self):
- self.assertFileNotHasContent('ip_addr', self.sysinfo['pre']['networking']['ip_addr'])
+ self.assertFileNotHasContent(
+ 'ip_addr',
+ self.sysinfo['pre']['networking']['ip_addr']
+ )
diff --git a/tests/cleaner_tests/skip_versioning/skip_version_ip_parser.py b/tests/cleaner_tests/skip_versioning/skip_version_ip_parser.py
index 957585dd..b74eaa18 100644
--- a/tests/cleaner_tests/skip_versioning/skip_version_ip_parser.py
+++ b/tests/cleaner_tests/skip_versioning/skip_version_ip_parser.py
@@ -11,6 +11,7 @@ from sos_tests import StageTwoReportTest
DO_SKIP = '/tmp/sos-test-version.txt'
NO_SKIP = '/tmp/sos-test-version-noskip'
+
class SkipVersionIPParser(StageTwoReportTest):
"""Ensures that we _skip_ files ending in 'version' (or 'version.txt') to
avoid incorrectly obfuscating version numbers.
diff --git a/tests/collect_tests/help_output_tests.py b/tests/collect_tests/help_output_tests.py
index 584ad498..4a253f56 100644
--- a/tests/collect_tests/help_output_tests.py
+++ b/tests/collect_tests/help_output_tests.py
@@ -35,13 +35,20 @@ class CollectHelpOutputTest(StageOneOutputTest):
sos_cmd = 'collect --help'
- @skipIf(PEXPECT_PRESENT, "python3-pexpect is installed, placeholder will not be used")
+ @skipIf(
+ PEXPECT_PRESENT,
+ "python3-pexpect is installed, placeholder will not be used"
+ )
def test_placeholder_warning_shown(self):
- self.assertOutputContains("WARNING: `collect` is not available with this installation!")
+ self.assertOutputContains(
+ "WARNING: `collect` is not available with this installation!"
+ )
@skipIf(PEXPECT_PRESENT is False, "python3-pexpect not installed locally")
def test_help_sections_present(self):
- self.assertOutputNotContains("WARNING: `collect` is not available with this installation!")
+ self.assertOutputNotContains(
+ "WARNING: `collect` is not available with this installation!"
+ )
self.assertOutputContains("Global Options:")
self.assertOutputContains("Report Passthru Options:")
self.assertOutputContains("Collector Options:")
@@ -65,7 +72,9 @@ class CollectOptionsHelpTest(StageOneOutputTest):
@skipIf(PEXPECT_PRESENT is False, "python3-pexpect not installed locally")
def test_cluster_profiles_shown(self):
- _out = re.search(r"Use the short name with --cluster-type or cluster options \(-c\)(.*?)The following cluster options are available:",
+ _out = re.search("Use the short name with --cluster-type or cluster "
+ r"options \(-c\)(.*?)The following cluster options "
+ "are available:",
self.cmd_output.stdout, re.S).group(1).splitlines()
_profs = {}
for ln in _out:
@@ -75,8 +84,9 @@ class CollectOptionsHelpTest(StageOneOutputTest):
_profs[ln[0]] = ln[1]
clusters = []
- # get a list of names of profile pyfiles
- for pyfile in glob.glob(os.path.join(SOS_REPO_ROOT, 'sos/collector/clusters/*.py')):
+ # get a list of names of profile pyfiles
+ for pyfile in glob.glob(os.path.join(SOS_REPO_ROOT,
+ 'sos/collector/clusters/*.py')):
pyf = os.path.basename(pyfile)
if pyf.startswith('__'):
continue
@@ -88,17 +98,26 @@ class CollectOptionsHelpTest(StageOneOutputTest):
# this has the secondary effect of enforcing a stylistic requirement
# where at least one profile must match the pyfile name
for clus in clusters:
- assert clus in _profs, "Cluster '%s' not displayed in --list-options" % clus
+ assert \
+ clus in _profs, \
+ f"Cluster '{clus}' not displayed in --list-options"
@skipIf(PEXPECT_PRESENT is False, "python3-pexpect not installed locally")
def test_cluster_options_shown(self):
- _opts = re.search(" Cluster Option Name Type Default Description(.*?)Options take the form of cluster.name=value",
+ _opts = re.search(" Cluster Option Name "
+ "Type Default Description(.*?)Options "
+ "take the form of cluster.name=value",
self.cmd_output.stdout, re.S).group(1).splitlines()
_opts = [o for o in _opts if o]
assert _opts, "No option output detected"
- @skipIf(PEXPECT_PRESENT, "python3-pexpect is installed, placeholder will be unused")
+ @skipIf(
+ PEXPECT_PRESENT,
+ "python3-pexpect is installed, placeholder will be unused"
+ )
def test_placeholder_component_used(self):
- self.assertOutputContains("(unavailable) Collect an sos report from multiple nodes")
+ self.assertOutputContains(
+ "(unavailable) Collect an sos report from multiple nodes"
+ )
diff --git a/tests/product_tests/foreman/foreman_tests.py b/tests/product_tests/foreman/foreman_tests.py
index dd37f793..b524c8ce 100644
--- a/tests/product_tests/foreman/foreman_tests.py
+++ b/tests/product_tests/foreman/foreman_tests.py
@@ -14,7 +14,11 @@ FOREMAN_DB_PASSWORD = r'S0Sdb=p@ssw0rd!'
FOREMAN_ADMIN_PASSWORD = r'S0S@dmin\\p@ssw0rd!'
CANDLEPIN_DB_PASSWORD = r'S0SKatello%sp@ssw0rd!'
-FOREMAN_PASSWORDS = [FOREMAN_DB_PASSWORD, FOREMAN_ADMIN_PASSWORD, CANDLEPIN_DB_PASSWORD]
+FOREMAN_PASSWORDS = [
+ FOREMAN_DB_PASSWORD,
+ FOREMAN_ADMIN_PASSWORD,
+ CANDLEPIN_DB_PASSWORD
+]
class ForemanBasicTest(StageOneReportTest):
@@ -47,7 +51,9 @@ class ForemanBasicTest(StageOneReportTest):
self.assertFileGlobNotInArchive("/etc/foreman*/*key.pem")
def test_foreman_database_sizes_collected(self):
- self.assertFileCollected('sos_commands/foreman/foreman_db_tables_sizes')
+ self.assertFileCollected(
+ 'sos_commands/foreman/foreman_db_tables_sizes'
+ )
def test_foreman_installer_dirs_collected(self):
self.assertFileGlobInArchive("/etc/foreman-installer/*")
@@ -63,23 +69,34 @@ class ForemanBasicTest(StageOneReportTest):
self.assertFileCollected('sos_commands/foreman/foreman_tasks_tasks')
def test_proxyfeatures_not_collected(self):
- self.assertFileGlobNotInArchive("sos_commands/foreman/smart_proxies_features/*")
+ self.assertFileGlobNotInArchive(
+ "sos_commands/foreman/smart_proxies_features/*"
+ )
def test_foreman_config_postproc_worked(self):
- self.assertFileNotHasContent('/etc/foreman/database.yml', FOREMAN_DB_PASSWORD)
+ self.assertFileNotHasContent(
+ '/etc/foreman/database.yml',
+ FOREMAN_DB_PASSWORD
+ )
def test_foreman_password_postproc_worked(self):
- for _check in ['/var/log/foreman-installer/foreman.log', '/etc/foreman-installer/scenarios.d/foreman-answers.yaml']:
+ for _check in ['/var/log/foreman-installer/foreman.log',
+ '/etc/foreman-installer/scenarios.d/'
+ 'foreman-answers.yaml']:
for passwd in FOREMAN_PASSWORDS:
self.assertFileNotHasContent(_check, passwd)
@redhat_only
def test_candlepin_table_sizes_collected(self):
- self.assertFileCollected('sos_commands/candlepin/candlepin_db_tables_sizes')
+ self.assertFileCollected(
+ 'sos_commands/candlepin/candlepin_db_tables_sizes'
+ )
@redhat_only
def test_katello_password_postproc_worked(self):
- for _check in ['/var/log/foreman-installer/katello.log', '/etc/foreman-installer/scenarios.d/katello-answers.yaml']:
+ for _check in ['/var/log/foreman-installer/katello.log',
+ '/etc/foreman-installer/scenarios.d/'
+ 'katello-answers.yaml']:
for passwd in FOREMAN_PASSWORDS:
self.assertFileNotHasContent(_check, passwd)
@@ -107,7 +124,10 @@ class ForemanWithOptionsTest(StageOneReportTest):
@redhat_only
def test_proxyfeatures_collected(self):
- self.assertFileGlobInArchive("sos_commands/foreman/smart_proxies_features/*")
+ self.assertFileGlobInArchive(
+ "sos_commands/foreman/smart_proxies_features/*"
+ )
+
class ForemanInstallerTest(StageOneReportTest):
"""Check whether foreman-installer related data are properly collected
@@ -122,6 +142,7 @@ class ForemanInstallerTest(StageOneReportTest):
def test_foreman_installer_etc_collected(self):
self.assertFileCollected("/etc/foreman-installer/scenarios.d")
+
class ForemanProxyTest(StageOneReportTest):
"""Check whether foreman-proxy related data are properly collected
independently on main foreman plugin.
diff --git a/tests/report_tests/__init__.py b/tests/report_tests/__init__.py
index 8b137891..e69de29b 100644
--- a/tests/report_tests/__init__.py
+++ b/tests/report_tests/__init__.py
@@ -1 +0,0 @@
-
diff --git a/tests/report_tests/basic_report_tests.py b/tests/report_tests/basic_report_tests.py
index 58618cbc..acdf5e76 100644
--- a/tests/report_tests/basic_report_tests.py
+++ b/tests/report_tests/basic_report_tests.py
@@ -40,7 +40,10 @@ class NormalSoSReport(StageOneReportTest):
"No tag summary generated in report"
)
self.assertTrue(
- isinstance(self.manifest['components']['report']['tag_summary'], dict),
+ isinstance(
+ self.manifest['components']['report']['tag_summary'],
+ dict
+ ),
"Tag summary malformed"
)
@@ -73,7 +76,8 @@ class RestrictedSoSReport(StageOneReportTest):
:avocado: tags=stageone
"""
- sos_cmd = '-o kernel,host,sudo,hardware,dbus,x11 --no-env-var --no-report -t1 --no-postproc'
+ sos_cmd = ('-o kernel,host,sudo,hardware,dbus,x11 --no-env-var '
+ '--no-report -t1 --no-postproc')
def test_no_env_vars_collected(self):
self.assertFileNotCollected('environment')
@@ -90,7 +94,8 @@ class RestrictedSoSReport(StageOneReportTest):
self.assertOutputNotContains('substituting')
def test_only_selected_plugins_run(self):
- self.assertOnlyPluginsIncluded(['kernel', 'host', 'sudo', 'hardware', 'dbus', 'x11'])
+ self.assertOnlyPluginsIncluded(['kernel', 'host', 'sudo',
+ 'hardware', 'dbus', 'x11'])
class DisabledCollectionsReport(StageOneReportTest):
@@ -98,7 +103,8 @@ class DisabledCollectionsReport(StageOneReportTest):
:avocado: tags=stageone
"""
- sos_cmd = "-n networking,system,logs --skip-files=/etc/fstab --skip-commands='journalctl*'"
+ sos_cmd = ("-n networking,system,logs --skip-files=/etc/fstab "
+ "--skip-commands='journalctl*'")
def test_plugins_disabled(self):
self.assertPluginNotIncluded('networking')
@@ -115,4 +121,3 @@ class DisabledCollectionsReport(StageOneReportTest):
def test_skip_commands_working(self):
self.assertFileGlobNotInArchive('sos_commands/*/journalctl*')
-
diff --git a/tests/report_tests/command_priority_tests.py b/tests/report_tests/command_priority_tests.py
index ea6b38fa..f5a601de 100644
--- a/tests/report_tests/command_priority_tests.py
+++ b/tests/report_tests/command_priority_tests.py
@@ -33,7 +33,9 @@ class CommandPriorityTest(StageOneReportTest):
def test_process_correct_priorities(self):
cmds = self.get_plugin_manifest('process')['commands']
# ensure root symlinked ps ran first
- self.assertTrue(cmds[0]['priority'] == 1 and 'ps_aux' in cmds[0]['tags'])
+ self.assertTrue(
+ cmds[0]['priority'] == 1 and 'ps_aux' in cmds[0]['tags']
+ )
# get lsof and iotop command entries
_lsof = None
@@ -49,6 +51,3 @@ class CommandPriorityTest(StageOneReportTest):
self.assertEqual(_lsof['priority'], 50)
self.assertEqual(_iotop['priority'], 100)
self.assertTrue(_lsof['start_time'] < _iotop['start_time'])
-
-
-
diff --git a/tests/report_tests/encryption_tests.py b/tests/report_tests/encryption_tests.py
index d60d9d78..64b01fde 100644
--- a/tests/report_tests/encryption_tests.py
+++ b/tests/report_tests/encryption_tests.py
@@ -25,8 +25,9 @@ class EncryptedReportTest(StageOneReportTest):
self.assertOutputContains(r'/.*sosreport-.*tar.*\.gpg')
_cmd = "file %s" % self.encrypted_path
res = process.run(_cmd)
- self.assertTrue(("GPG symmetrically encrypted data" in res.stdout.decode())
- or ("PGP symmetric key encrypted data" in res.stdout.decode()))
+ self.assertTrue(
+ ("GPG symmetrically encrypted data" in res.stdout.decode())
+ or ("PGP symmetric key encrypted data" in res.stdout.decode()))
def test_tarball_named_secure(self):
self.assertTrue('secured-' in self.encrypted_path)
@@ -49,4 +50,7 @@ class EncryptedCleanedReportTest(EncryptedReportTest):
self.assertTrue('obfuscated' in self.archive)
def test_ip_address_was_obfuscated(self):
- self.assertFileNotHasContent('ip_addr', self.sysinfo['pre']['networking']['ip_addr'])
+ self.assertFileNotHasContent(
+ 'ip_addr',
+ self.sysinfo['pre']['networking']['ip_addr']
+ )
diff --git a/tests/report_tests/exception_tests.py b/tests/report_tests/exception_tests.py
index 678c06d6..a5f1b8ab 100644
--- a/tests/report_tests/exception_tests.py
+++ b/tests/report_tests/exception_tests.py
@@ -28,7 +28,8 @@ class InvalidPluginOptionTest(StageOneReportExceptionTest):
sos_cmd = '-o kernel -k kernel.colonel=on'
def test_caught_invalid_plugin_option(self):
- self.assertOutputContains(r'no such option "colonel" for plugin \(kernel\)')
+ self.assertOutputContains('no such option "colonel" for plugin '
+ r'\(kernel\)')
class InvalidReportOptionTest(StageOneReportExceptionTest):
@@ -51,4 +52,5 @@ class InvalidPluginDisableTest(StageOneReportTest):
sos_cmd = '-n logs,foobar,networking'
def test_caught_invalid_plugin_name(self):
- self.assertOutputContains("Requested to skip non-existing plugin 'foobar'")
+ self.assertOutputContains("Requested to skip non-existing plugin "
+ "'foobar'")
diff --git a/tests/report_tests/help_output_tests.py b/tests/report_tests/help_output_tests.py
index f1a4104b..8321f193 100644
--- a/tests/report_tests/help_output_tests.py
+++ b/tests/report_tests/help_output_tests.py
@@ -41,11 +41,11 @@ class ReportListPluginsTest(StageOneOutputTest):
self.assertOutputContains('Profiles:')
def test_no_missing_plugin_descriptions(self):
- _out = re.search("The following plugins are currently enabled:(.*?)The following plugins are currently disabled:",
+ _out = re.search("The following plugins are currently enabled:(.*?)"
+ "The following plugins are currently disabled:",
self.cmd_output.stdout, re.S).group(1).splitlines()
for ln in _out:
# Ignore newlines
if not ln:
continue
assert len(ln) > 1, "Plugin '%s' missing description" % ln[0]
-
diff --git a/tests/report_tests/low_priority_tests.py b/tests/report_tests/low_priority_tests.py
index 97bf0a28..0859cf73 100644
--- a/tests/report_tests/low_priority_tests.py
+++ b/tests/report_tests/low_priority_tests.py
@@ -30,4 +30,7 @@ class LowPrioTest(StageOneReportTest):
def test_niceness_set(self):
self.assertSosLogContains('Set niceness of report to 19')
- self.assertEqual(self.manifest['components']['report']['priority']['niceness'], 19)
+ self.assertEqual(
+ self.manifest['components']['report']['priority']['niceness'],
+ 19
+ )
diff --git a/tests/report_tests/options_tests/options_tests.py b/tests/report_tests/options_tests/options_tests.py
index e912da8d..15f0b572 100644
--- a/tests/report_tests/options_tests/options_tests.py
+++ b/tests/report_tests/options_tests/options_tests.py
@@ -27,10 +27,14 @@ class OptionsFromConfigTest(StageTwoReportTest):
def test_plugopts_logged_from_config(self):
self.assertSosLogContains(
- r"Set kernel plugin option to \(name=with-timer, desc='gather /proc/timer\* statistics', value=True, default=False\)"
+ r"Set kernel plugin option to \(name=with-timer, "
+ r"desc='gather /proc/timer\* statistics', value=True, "
+ r"default=False\)"
)
self.assertSosLogContains(
- r"Set kernel plugin option to \(name=trace, desc='gather /sys/kernel/debug/tracing/trace file', value=True, default=False\)"
+ r"Set kernel plugin option to \(name=trace, "
+ "desc='gather /sys/kernel/debug/tracing/trace file', "
+ r"value=True, default=False\)"
)
def test_disabled_plugopts_not_loaded(self):
@@ -41,5 +45,7 @@ class OptionsFromConfigTest(StageTwoReportTest):
def test_effective_options_logged_correctly(self):
self.assertSosLogContains(
- "effective options now: --batch --case-id 8675309 --only-plugins host,kernel --plugopts kernel.with-timer=on,kernel.trace=yes"
+ "effective options now: --batch --case-id 8675309 "
+ "--only-plugins host,kernel "
+ "--plugopts kernel.with-timer=on,kernel.trace=yes"
)
diff --git a/tests/report_tests/plugin_tests/collect_manual_tests.py b/tests/report_tests/plugin_tests/collect_manual_tests.py
index f63a074e..b0c92333 100644
--- a/tests/report_tests/plugin_tests/collect_manual_tests.py
+++ b/tests/report_tests/plugin_tests/collect_manual_tests.py
@@ -32,6 +32,10 @@ class CollectManualTest(StageOneReportTest):
def test_manifest_collections_correct(self):
pkgman = self.get_plugin_manifest('unpackaged')
- self.assertTrue(any(c['name'] == 'unpackaged' for c in pkgman['collections']))
+ self.assertTrue(
+ any(c['name'] == 'unpackaged' for c in pkgman['collections'])
+ )
pyman = self.get_plugin_manifest('python')
- self.assertTrue(any(c['name'] == 'digests.json' for c in pyman['collections']))
+ self.assertTrue(
+ any(c['name'] == 'digests.json' for c in pyman['collections'])
+ )
diff --git a/tests/report_tests/plugin_tests/defaults.py b/tests/report_tests/plugin_tests/defaults.py
index 4cdaabd4..4d790114 100644
--- a/tests/report_tests/plugin_tests/defaults.py
+++ b/tests/report_tests/plugin_tests/defaults.py
@@ -31,7 +31,8 @@ class DefaultCollectionsTest(StageTwoReportTest):
assert ent, "No manifest entry for systemctl status cups"
def test_journal_collected(self):
- self.assertFileCollected('sos_commands/cups/journalctl_--no-pager_--unit_cups')
+ self.assertFileCollected('sos_commands/cups/journalctl_--no-pager_'
+ '--unit_cups')
_m = self.get_plugin_manifest('cups')
ent = None
for cmd in _m['commands']:
@@ -39,4 +40,6 @@ class DefaultCollectionsTest(StageTwoReportTest):
ent = cmd
assert ent, "No manifest entry for journalctl cups"
- assert 'journal_cups' in ent['tags'], "Journal tags not correct: %s" % ent['tags']
+ assert \
+ 'journal_cups' in ent['tags'], \
+ f"Journal tags not correct: {ent['tags']}"
diff --git a/tests/report_tests/plugin_tests/krb5.py b/tests/report_tests/plugin_tests/krb5.py
index ab6b2379..260e5736 100644
--- a/tests/report_tests/plugin_tests/krb5.py
+++ b/tests/report_tests/plugin_tests/krb5.py
@@ -9,6 +9,7 @@
from sos_tests import StageTwoReportTest, redhat_only, ubuntu_only
+
class Krb5PluginTest(StageTwoReportTest):
"""Ensure that the krb5 plugin activates for the distros that we support it
on.
diff --git a/tests/report_tests/plugin_tests/logs.py b/tests/report_tests/plugin_tests/logs.py
index 49f1c592..c3811b3f 100644
--- a/tests/report_tests/plugin_tests/logs.py
+++ b/tests/report_tests/plugin_tests/logs.py
@@ -14,6 +14,7 @@ from sos_tests import StageOneReportTest, StageTwoReportTest
from string import ascii_uppercase, digits
from time import sleep
+
class LogsPluginTest(StageOneReportTest):
"""Ensure common collections from the `logs` plugin are properly collected
@@ -24,7 +25,8 @@ class LogsPluginTest(StageOneReportTest):
def test_journalctl_collections(self):
self.assertFileCollected('sos_commands/logs/journalctl_--disk-usage')
- self.assertFileCollected('sos_commands/logs/journalctl_--no-pager_--boot')
+ self.assertFileCollected('sos_commands/logs/journalctl_--no-pager_'
+ '--boot')
def test_journal_runtime_collected(self):
self.assertFileGlobInArchive('/var/log/journal/*')
@@ -63,7 +65,8 @@ class JournalSizeLimitTest(StageTwoReportTest):
for i in range(2):
# generate 10MB, write it, then write it in reverse.
# Spend less time generating new strings
- rand = ''.join(random.choice(ascii_uppercase + digits) for _ in range(rsize))
+ rand = ''.join(
+ random.choice(ascii_uppercase + digits) for _ in range(rsize))
sosfd.write(rand + '\n')
# sleep to avoid burst rate-limiting
sleep(5)
@@ -73,10 +76,16 @@ class JournalSizeLimitTest(StageTwoReportTest):
journ = 'sos_commands/logs/journalctl_--no-pager'
self.assertFileCollected(journ)
jsize = os.stat(self.get_name_in_archive(journ)).st_size
- assert jsize <= 20971520, "Collected journal is larger than 20MB (size: %s)" % jsize
+ assert \
+ jsize <= 20971520, \
+ f"Collected journal is larger than 20MB (size: {jsize})"
def test_journal_tailed_and_linked(self):
- tailed = self.get_name_in_archive('sos_strings/logs/journalctl_--no-pager.tailed')
+ tailed = self.get_name_in_archive('sos_strings/logs/'
+ 'journalctl_--no-pager.tailed')
self.assertFileExists(tailed)
- journ = self.get_name_in_archive('sos_commands/logs/journalctl_--no-pager')
- assert os.path.islink(journ), "Journal in sos_commands/logs is not a symlink"
+ journ = self.get_name_in_archive('sos_commands/logs/'
+ 'journalctl_--no-pager')
+ assert \
+ os.path.islink(journ), \
+ "Journal in sos_commands/logs is not a symlink"
diff --git a/tests/report_tests/plugin_tests/sos_extras/sos_extras.py b/tests/report_tests/plugin_tests/sos_extras/sos_extras.py
index ae5c347a..4c7f9783 100644
--- a/tests/report_tests/plugin_tests/sos_extras/sos_extras.py
+++ b/tests/report_tests/plugin_tests/sos_extras/sos_extras.py
@@ -25,8 +25,10 @@ class SosExtrasPluginTest(StageTwoReportTest):
self.assertPluginIncluded('sos_extras')
def test_setup_message_displayed(self):
- self.assertOutputContains('Collecting data from extras file /etc/sos/extras.d/sos_testing.conf')
+ self.assertOutputContains('Collecting data from extras file '
+ '/etc/sos/extras.d/sos_testing.conf')
def test_extras_config_parsed(self):
self.assertFileCollected('/etc/fstab')
- self.assertFileCollected('sos_commands/sos_extras/sos_testing.conf/echo_sos_test')
+ self.assertFileCollected('sos_commands/sos_extras/sos_testing.conf'
+ '/echo_sos_test')
diff --git a/tests/report_tests/plugin_tests/sudo/sudo.py b/tests/report_tests/plugin_tests/sudo/sudo.py
index e04a73a7..1539294c 100644
--- a/tests/report_tests/plugin_tests/sudo/sudo.py
+++ b/tests/report_tests/plugin_tests/sudo/sudo.py
@@ -33,4 +33,3 @@ class SudoLdapScrubbedTest(StageTwoReportTest):
def test_bindpw_scrubbed(self):
self.assertFileNotHasContent('/etc/sudo-ldap.conf', 'sostestpassword')
-
diff --git a/tests/report_tests/plugin_tests/teamd.py b/tests/report_tests/plugin_tests/teamd.py
index 1c64e457..6b1b9337 100644
--- a/tests/report_tests/plugin_tests/teamd.py
+++ b/tests/report_tests/plugin_tests/teamd.py
@@ -32,11 +32,15 @@ class TeamdPluginTest(StageTwoReportTest):
# create the team device
res = process.run('nmcli con add type team ifname sostesting',
timeout=30)
- assert res.exit_status == 0, "Failed creating team device: %s" % res.stdout_text
+ assert \
+ res.exit_status == 0, \
+ f"Failed creating team device: {res.stdout_text}"
def post_test_tear_down(self):
res = process.run('nmcli con delete team-sostesting', timeout=30)
- assert res.exit_status == 0, "Failed to delete temp team device: %s" % res.stdout_text
+ assert \
+ res.exit_status == 0, \
+ f"Failed to delete temp team device: {res.stdout_text}"
def test_teamd_plugin_executed(self):
self.assertPluginIncluded('teamd')
diff --git a/tests/report_tests/smoke_tests.py b/tests/report_tests/smoke_tests.py
index 2d5c41e3..593e9a0c 100644
--- a/tests/report_tests/smoke_tests.py
+++ b/tests/report_tests/smoke_tests.py
@@ -52,9 +52,14 @@ class AllPluginSmokeTest(StageOneReportTest):
Make sure our warnings are displayed
"""
- self.assertOutputContains('Not logged in to OCP API, and no login token provided. Will not collect `oc` commands')
- self.assertOutputContains('Source the environment file for the user intended to connect to the OpenStack environment.')
- self.assertOutputContains('Some or all of the skydive params are not set properly.')
+ self.assertOutputContains('Not logged in to OCP API, and no login '
+ 'token provided. Will not collect `oc` '
+ 'commands')
+ self.assertOutputContains('Source the environment file for the user '
+ 'intended to connect to the OpenStack '
+ 'environment.')
+ self.assertOutputContains('Some or all of the skydive params are not '
+ 'set properly.')
class ExpectedDefaultPluginsTest(StageOneReportTest):
@@ -109,4 +114,3 @@ class ExpectedDefaultPluginsTest(StageOneReportTest):
'apt',
'ubuntu'
])
-
diff --git a/tests/report_tests/timeout/timeout_test.py b/tests/report_tests/timeout/timeout_test.py
index cfb148d7..8bc5ae61 100644
--- a/tests/report_tests/timeout/timeout_test.py
+++ b/tests/report_tests/timeout/timeout_test.py
@@ -15,7 +15,6 @@ class TimeoutTest(Plugin, IndependentPlugin):
short_desc = 'Tests timeout functionality in test suite'
plugin_timeout = 100
-
def setup(self):
self.add_cmd_output('sleep 15')
self.add_cmd_output('echo I slept great', suggest_filename='echo_good')
diff --git a/tests/report_tests/timeout/timeout_tests.py b/tests/report_tests/timeout/timeout_tests.py
index 94117e8e..f18b7b38 100644
--- a/tests/report_tests/timeout/timeout_tests.py
+++ b/tests/report_tests/timeout/timeout_tests.py
@@ -50,9 +50,12 @@ class NativeCmdTimeoutTest(StageTwoReportTest):
self.assertFileCollected('sos_commands/timeout_test/echo_good')
def test_command_timed_out(self):
- self.assertSosLogContains(r"\[plugin:timeout_test\] command 'sleep 30' timed out after 10s")
+ self.assertSosLogContains(
+ r"\[plugin:timeout_test\] command 'sleep 30' timed out after 10s"
+ )
self.assertFileCollected('sos_commands/timeout_test/sleep_30')
+
class MultipleTimeoutValues(NativeCmdTimeoutTest):
"""Test that our plugin timeout option priority is functioning correctly
@@ -60,7 +63,8 @@ class MultipleTimeoutValues(NativeCmdTimeoutTest):
"""
install_plugins = ['timeout_test']
- sos_cmd = '-o timeout_test,host --plugin-timeout=30 -k timeout_test.timeout=60'
+ sos_cmd = ('-o timeout_test,host --plugin-timeout=30 -k '
+ 'timeout_test.timeout=60')
def test_correct_plugin_timeout(self):
man = self.get_plugin_manifest('timeout_test')
diff --git a/tests/sos_tests.py b/tests/sos_tests.py
index 1fb31875..4ba98eac 100644
--- a/tests/sos_tests.py
+++ b/tests/sos_tests.py
@@ -25,13 +25,15 @@ import re
SOS_TEST_DIR = os.path.dirname(os.path.realpath(__file__))
SOS_REPO_ROOT = os.path.realpath(os.path.join(SOS_TEST_DIR, '../'))
-SOS_PLUGIN_DIR = os.path.realpath(os.path.join(SOS_REPO_ROOT, 'sos/report/plugins'))
+SOS_PLUGIN_DIR = os.path.realpath(
+ os.path.join(SOS_REPO_ROOT, 'sos/report/plugins'))
SOS_TEST_DATA_DIR = os.path.realpath(os.path.join(SOS_TEST_DIR, 'test_data'))
SOS_TEST_BIN = os.path.realpath(os.path.join(SOS_TEST_DIR, '../bin/sos'))
RH_DIST = ['rhel', 'centos', 'fedora']
UBUNTU_DIST = ['Ubuntu', 'debian']
+
def skipIf(cond, message=None):
def decorator(function):
def wrapper(self, *args, **kwargs):
@@ -43,18 +45,21 @@ def skipIf(cond, message=None):
return wrapper
return decorator
+
def redhat_only(tst):
def wrapper(func):
if distro.detect().name not in RH_DIST:
raise TestSkipError('Not running on a Red Hat distro')
return wrapper
+
def ubuntu_only(tst):
def wrapper(func):
if distro.detect().name not in UBUNTU_DIST:
raise TestSkipError('Not running on a Ubuntu or Debian distro')
return wrapper
+
class BaseSoSTest(Test):
"""Base class for all our test classes to build off of.
@@ -77,18 +82,22 @@ class BaseSoSTest(Test):
@property
def klass_name(self):
if not self._klass_name:
- self._klass_name = os.path.basename(__file__) + '.' + self.__class__.__name__
+ self._klass_name = (f"{os.path.basename(__file__)}."
+ f"{self.__class__.__name__}")
return self._klass_name
@property
def tmpdir(self):
if not self._tmpdir:
- self._tmpdir = os.getenv('AVOCADO_TESTS_COMMON_TMPDIR') + self.klass_name
+ self._tmpdir = (f"{os.getenv('AVOCADO_TESTS_COMMON_TMPDIR')}"
+ f"{self.klass_name}")
return self._tmpdir
@property
def sos_bin(self):
- return self._local_sos_bin if self.params.get('TESTLOCAL') == 'true' else SOS_TEST_BIN
+ if self.params.get('TESTLOCAL') == 'true':
+ return self._local_sos_bin
+ return SOS_TEST_BIN
def generate_sysinfo(self):
"""Collects some basic information about the system for later reference
@@ -142,7 +151,9 @@ class BaseSoSTest(Test):
# a little hacky, but using self.log methods here will not
# print to console unless we ratchet up the verbosity for the
# entire test suite, which will become very difficult to read
- LOG_UI.error('ERROR:\n' + msg[:8196]) # don't flood w/ super verbose logs
+
+ # don't flood w/ super verbose logs
+ LOG_UI.error('ERROR:\n' + msg[:8196])
if err.result.interrupted:
raise Exception("Timeout exceeded, see output above")
else:
@@ -218,9 +229,9 @@ class BaseSoSTest(Test):
This allows us to define distro-specific test classes much the same way
we can define distro-specific tests _within_ a test class using the
- appropriate decorators. We can't use the decorators for the class however
- due to how avocado catches instantiation exceptions, so instead we need
- to raise the skip exception after instantiation is done.
+ appropriate decorators. We can't use the decorators for the class
+ however due to how avocado catches instantiation exceptions, so instead
+ we need to raise the skip exception after instantiation is done.
"""
if self.redhat_only:
if self.local_distro not in RH_DIST:
@@ -244,7 +255,6 @@ class BaseSoSTest(Test):
raise TestSkipError(f"Unsupported architecture {sys_arch} for test "
f"(supports: {self.arch})")
-
def setUp(self):
"""Setup the tmpdir and any needed mocking for the test, then execute
the defined sos command. Ensure that we only run the sos command once
@@ -333,7 +343,9 @@ class BaseSoSTest(Test):
:param content: The string that should not be in stdout
:type content: ``str``
"""
- found = re.search(r"(.*)?%s(.*)?" % content, self.cmd_output.stdout + self.cmd_output.stderr)
+ found = re.search(
+ fr"(.*)?{content}(.*)?",
+ self.cmd_output.stdout + self.cmd_output.stderr)
assert found, "Content string '%s' not in output" % content
def assertOutputNotContains(self, content):
@@ -342,7 +354,9 @@ class BaseSoSTest(Test):
:param content: The string that should not be in stdout
:type content: ``str``
"""
- found = re.search(r"(.*)?%s(.*)?" % content, self.cmd_output.stdout + self.cmd_output.stderr)
+ found = re.search(
+ fr"(.*)?{content}(.*)?",
+ self.cmd_output.stdout + self.cmd_output.stderr)
assert not found, "String '%s' present in stdout" % content
@@ -365,7 +379,8 @@ class BaseSoSReportTest(BaseSoSTest):
def manifest(self):
if self._manifest is None:
try:
- content = self.read_file_from_tmpdir(self.get_name_in_archive('sos_reports/manifest.json'))
+ content = self.read_file_from_tmpdir(
+ self.get_name_in_archive('sos_reports/manifest.json'))
self._manifest = json.loads(content)
except Exception:
self._manifest = ''
@@ -381,7 +396,7 @@ class BaseSoSReportTest(BaseSoSTest):
cmd = ("gpg --batch --passphrase %s -o %s --decrypt %s"
% (self.encrypt_pass, _archive, archive))
try:
- res = process.run(cmd, timeout=10)
+ process.run(cmd, timeout=10)
except Exception as err:
if err.result.interrupted:
self.error("Timeout while decrypting")
@@ -424,8 +439,11 @@ class BaseSoSReportTest(BaseSoSTest):
override
"""
try:
- return re.findall(r'/.*sosreport-.*tar.*\.gpg', self.cmd_output.stdout)[-1]
- except:
+ return re.findall(
+ r'/.*sosreport-.*tar.*\.gpg',
+ self.cmd_output.stdout
+ )[-1]
+ except Exception:
return None
def _extract_archive(self, arc_path):
@@ -444,14 +462,21 @@ class BaseSoSReportTest(BaseSoSTest):
"""Based on the klass id setup earlier, provide a name to extract the
archive to within the tmpdir
"""
- return os.path.join(self.tmpdir, "sosreport-%s" % self.__class__.__name__)
-
+ return os.path.join(
+ self.tmpdir,
+ f"sosreport-{self.__class__.__name__}"
+ )
+
def _generate_sos_command(self):
- return "%s %s -v --batch --tmp-dir %s %s" % (self.sos_bin, self.sos_component, self.tmpdir, self.sos_cmd)
+ return (f"{self.sos_bin} {self.sos_component} -v --batch "
+ f"--tmp-dir {self.tmpdir} {self.sos_cmd}")
def _execute_sos_cmd(self):
super(BaseSoSReportTest, self)._execute_sos_cmd()
- self.archive = re.findall('/.*sosreport-.*tar.*', self.cmd_output.stdout)
+ self.archive = re.findall(
+ '/.*sosreport-.*tar.*',
+ self.cmd_output.stdout
+ )
if self.archive:
self.archive = self.archive[-1]
self._extract_archive(self.archive)
@@ -513,13 +538,17 @@ class BaseSoSReportTest(BaseSoSTest):
:type fname: ``str``
"""
if fname.startswith(('sos_', '/sos_')):
- files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/')))
+ files = glob.glob(
+ os.path.join(self.archive_path, fname.lstrip('/'))
+ )
elif not glob.glob(fname):
# force the test to pass since the file glob could not have been
# collected
files = True
else:
- files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/')))
+ files = glob.glob(
+ os.path.join(self.archive_path, fname.lstrip('/'))
+ )
assert files, "No files matching %s found" % fname
def assertFileGlobNotInArchive(self, fname):
@@ -531,7 +560,9 @@ class BaseSoSReportTest(BaseSoSTest):
"""
files = glob.glob(os.path.join(self.tmpdir, fname.lstrip('/')))
self.log.debug(files)
- assert not files, "Found files in archive matching %s: %s" % (fname, files)
+ assert \
+ not files, \
+ f"Found files in archive matching {fname}: {files}"
def assertFileHasContent(self, fname, content):
"""Ensure that the given file fname contains the given content
@@ -551,7 +582,9 @@ class BaseSoSReportTest(BaseSoSTest):
if re.match(".*%s.*" % content, line, re.I):
matched = True
break
- assert matched, "Content '%s' does not appear in %s\n%s" % (content, fname, _contents)
+ assert \
+ matched, \
+ f"Content '{content}' does not appear in {fname}\n{_contents}"
def assertFileNotHasContent(self, fname, content):
"""Ensure that the file file fname does NOT contain the given content
@@ -569,7 +602,9 @@ class BaseSoSReportTest(BaseSoSTest):
if re.match(".*%s.*" % content, line, re.I):
matched = True
break
- assert not matched, "Content '%s' appears in file %s" % (content, fname)
+ assert \
+ not matched, \
+ f"Content '{content}' appears in file {fname}"
def assertSosLogContains(self, content):
"""Ensure that the given content string exists in sos.log
@@ -600,11 +635,15 @@ class BaseSoSReportTest(BaseSoSTest):
:type plugin: `` str``
"""
if not self.manifest:
- self.error("No manifest found, cannot check for %s execution" % plugin)
+ self.error(
+ f"No manifest found, cannot check for {plugin} execution"
+ )
if isinstance(plugin, str):
plugin = [plugin]
for plug in plugin:
- assert plug in self.manifest['components']['report']['plugins'].keys(), "Plugin '%s' not recorded in manifest" % plug
+ assert \
+ plug in self.manifest['components']['report']['plugins'], \
+ f"Plugin '{plug}' not recorded in manifest"
def assertPluginNotIncluded(self, plugin):
"""Ensure that the specified plugin did NOT run for the sos execution
@@ -614,11 +653,15 @@ class BaseSoSReportTest(BaseSoSTest):
:type plugin: `` str``
"""
if not self.manifest:
- self.error("No manifest found, cannot check for %s execution" % plugin)
+ self.error(
+ f"No manifest found, cannot check for {plugin} execution"
+ )
if isinstance(plugin, str):
plugin = [plugin]
for plug in plugin:
- assert plug not in self.manifest['components']['report']['plugins'].keys(), "Plugin '%s' is recorded in manifest" % plug
+ assert \
+ plug not in self.manifest['components']['report']['plugins'], \
+ f"Plugin '{plug}' is recorded in manifest"
def assertOnlyPluginsIncluded(self, plugins):
"""Ensure that only the specified plugins are in the manifest
@@ -627,7 +670,9 @@ class BaseSoSReportTest(BaseSoSTest):
:type plugins: ``str`` or ``list`` of strings
"""
if not self.manifest:
- self.error("No manifest found, cannot check for %s execution" % plugins)
+ self.error(
+ f"No manifest found, cannot check for {plugins} execution"
+ )
if isinstance(plugins, str):
plugins = [plugins]
_executed = self.manifest['components']['report']['plugins'].keys()
@@ -686,7 +731,8 @@ class StageOneReportTest(BaseSoSReportTest):
_chk = re.findall('sha256\t.*\n', self.cmd_output.stdout)
_chk = _chk[0].split('sha256\t')[1].strip()
assert _chk, "No checksum reported"
- _found = process.run("sha256sum %s" % (self.encrypted_path or self.archive)).stdout.decode().split()[0]
+ cmd = f"sha256sum {(self.encrypted_path or self.archive)}"
+ _found = process.run(cmd).stdout.decode().split()[0]
self.assertEqual(_chk, _found)
def test_no_new_kmods_loaded(self):
@@ -706,7 +752,8 @@ class StageOneReportTest(BaseSoSReportTest):
def test_manifest_created(self):
self.assertFileCollected('sos_reports/manifest.json')
- @skipIf(lambda x: '--no-report' in x.sos_cmd, '--no-report used in command')
+ @skipIf(lambda x: '--no-report' in x.sos_cmd,
+ '--no-report used in command')
def test_html_reports_created(self):
self.assertFileCollected('sos_reports/sos.html')
@@ -816,10 +863,15 @@ class StageTwoReportTest(BaseSoSReportTest):
for plug in self.install_plugins:
if not plug.endswith('.py'):
plug += '.py'
- fake_plug = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), plug)
+ fake_plug = os.path.join(
+ os.path.dirname(inspect.getfile(self.__class__)),
+ plug
+ )
if os.path.exists(fake_plug):
shutil.copy(fake_plug, SOS_PLUGIN_DIR)
- _installed.append(os.path.realpath(os.path.join(SOS_PLUGIN_DIR, plug)))
+ _installed.append(
+ os.path.realpath(os.path.join(SOS_PLUGIN_DIR, plug))
+ )
self._write_file_to_tmpdir('mocked_plugins', json.dumps(_installed))
def teardown_mocked_plugins(self):
@@ -849,7 +901,10 @@ class StageTwoReportTest(BaseSoSReportTest):
% ', '.join(self.packages[self.local_distro])
)
# save installed package list to our tmpdir to be removed later
- self._write_file_to_tmpdir('mocked_packages', json.dumps(self.packages[self.local_distro]))
+ self._write_file_to_tmpdir(
+ 'mocked_packages',
+ json.dumps(self.packages[self.local_distro])
+ )
def _strip_installed_packages(self):
"""For the list of packages given for a test, if any of the packages
@@ -885,7 +940,10 @@ class StageTwoReportTest(BaseSoSReportTest):
os.makedirs(_dir)
self._created_files.append(_dir)
dir_added = True
- _test_file = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), src.lstrip('/'))
+ _test_file = os.path.join(
+ os.path.dirname(inspect.getfile(self.__class__)),
+ src.lstrip('/')
+ )
shutil.copy(_test_file, dest)
if not dir_added:
self._created_files.append(dest)
@@ -900,10 +958,14 @@ class StageTwoReportTest(BaseSoSReportTest):
"""
for mfile in self.files:
if not isinstance(mfile, tuple):
- raise Exception(f"Mocked files must be provided via tuples, not {mfile.__class__}")
+ raise Exception("Mocked files must be provided via tuples,"
+ f"not {mfile.__class__}")
self._copy_test_file(mfile)
if self._created_files:
- self._write_file_to_tmpdir('mocked_files', json.dumps(self._created_files))
+ self._write_file_to_tmpdir(
+ 'mocked_files',
+ json.dumps(self._created_files)
+ )
def teardown_mocked_files(self):
"""Remove any mocked files from the test system's filesystem, and
@@ -982,10 +1044,13 @@ class StageOneOutputTest(BaseSoSTest):
def test_help_output_successful(self):
self.assertTrue(self.cmd_output.exit_status == 0)
assert self.cmd_output.stdout, "No stdout output generated"
- assert not self.cmd_output.stderr, "stderr received, but not expected: %s" % self.cmd_output.stderr
+ assert not self.cmd_output.stderr, (
+ f"stderr received, but not expected: {self.cmd_output.stderr}")
- @skipIf(lambda x: not x._exception_expected, "Not anticipating stderr output")
+ @skipIf(lambda x: not x._exception_expected,
+ "Not anticipating stderr output")
def test_help_error_reported(self):
self.assertTrue(self.cmd_output.exit_status != 0)
- assert not self.cmd_output.stdout, "stdout received, but not expected: %s" % self.cmd_output.stdout
+ assert not self.cmd_output.stdout, (
+ f"stdout received, but not expected: {self.cmd_output.stdout}")
assert self.cmd_output.stderr, "No stderr output generated"
diff --git a/tests/vendor_tests/redhat/rhbz1950350/rhbz1950350.py b/tests/vendor_tests/redhat/rhbz1950350/rhbz1950350.py
index 991cb76d..50897811 100644
--- a/tests/vendor_tests/redhat/rhbz1950350/rhbz1950350.py
+++ b/tests/vendor_tests/redhat/rhbz1950350/rhbz1950350.py
@@ -26,13 +26,22 @@ class rhbz1950350(StageTwoReportTest):
sos_cmd = '-v -o sos_extras --clean'
def test_clean_config_loaded(self):
- self.assertSosLogContains("effective options now: (.*)? --clean --domains (.*)? --keywords (.*)?")
+ self.assertSosLogContains(
+ "effective options now: (.*)? --clean --domains (.*)? "
+ "--keywords (.*)?"
+ )
def test_clean_config_performed(self):
self.assertFileCollected('var/log/clean_config_test.txt')
- self.assertFileHasContent('var/log/clean_config_test.txt', 'The domain example.com should not be removed.')
+ self.assertFileHasContent(
+ 'var/log/clean_config_test.txt',
+ 'The domain example.com should not be removed.'
+ )
self.assertFileNotHasContent(
'var/log/clean_config_test.txt',
"This line contains 'shibboleth' which should be scrubbed."
)
- self.assertFileNotHasContent('var/log/clean_config_test.txt', 'sosexample.com')
+ self.assertFileNotHasContent(
+ 'var/log/clean_config_test.txt',
+ 'sosexample.com'
+ )
diff --git a/tests/vendor_tests/redhat/rhbz1965001.py b/tests/vendor_tests/redhat/rhbz1965001.py
index aa16ba81..2e0aa448 100644
--- a/tests/vendor_tests/redhat/rhbz1965001.py
+++ b/tests/vendor_tests/redhat/rhbz1965001.py
@@ -7,8 +7,6 @@
# See the LICENSE file in the source distribution for further information.
-import tempfile
-import shutil
from sos_tests import StageOneReportTest
diff --git a/tests/vendor_tests/redhat/rhbz2018033/rhbz2018033.py b/tests/vendor_tests/redhat/rhbz2018033/rhbz2018033.py
index 25b9090c..7a947db1 100644
--- a/tests/vendor_tests/redhat/rhbz2018033/rhbz2018033.py
+++ b/tests/vendor_tests/redhat/rhbz2018033/rhbz2018033.py
@@ -19,7 +19,8 @@ class rhbz2018033(StageTwoReportTest):
"""
install_plugins = ['timeout_test']
- sos_cmd = '-vvv -o timeout_test,networking -k timeout_test.timeout=1 --plugin-timeout=123'
+ sos_cmd = ('-vvv -o timeout_test,networking '
+ '-k timeout_test.timeout=1 --plugin-timeout=123')
def test_timeouts_separate(self):
self.assertSosUILogContains('Plugin timeout_test timed out')