diff options
Diffstat (limited to 'tests/sos_tests.py')
-rw-r--r-- | tests/sos_tests.py | 145 |
1 files changed, 105 insertions, 40 deletions
diff --git a/tests/sos_tests.py b/tests/sos_tests.py index 1fb31875..4ba98eac 100644 --- a/tests/sos_tests.py +++ b/tests/sos_tests.py @@ -25,13 +25,15 @@ import re SOS_TEST_DIR = os.path.dirname(os.path.realpath(__file__)) SOS_REPO_ROOT = os.path.realpath(os.path.join(SOS_TEST_DIR, '../')) -SOS_PLUGIN_DIR = os.path.realpath(os.path.join(SOS_REPO_ROOT, 'sos/report/plugins')) +SOS_PLUGIN_DIR = os.path.realpath( + os.path.join(SOS_REPO_ROOT, 'sos/report/plugins')) SOS_TEST_DATA_DIR = os.path.realpath(os.path.join(SOS_TEST_DIR, 'test_data')) SOS_TEST_BIN = os.path.realpath(os.path.join(SOS_TEST_DIR, '../bin/sos')) RH_DIST = ['rhel', 'centos', 'fedora'] UBUNTU_DIST = ['Ubuntu', 'debian'] + def skipIf(cond, message=None): def decorator(function): def wrapper(self, *args, **kwargs): @@ -43,18 +45,21 @@ def skipIf(cond, message=None): return wrapper return decorator + def redhat_only(tst): def wrapper(func): if distro.detect().name not in RH_DIST: raise TestSkipError('Not running on a Red Hat distro') return wrapper + def ubuntu_only(tst): def wrapper(func): if distro.detect().name not in UBUNTU_DIST: raise TestSkipError('Not running on a Ubuntu or Debian distro') return wrapper + class BaseSoSTest(Test): """Base class for all our test classes to build off of. @@ -77,18 +82,22 @@ class BaseSoSTest(Test): @property def klass_name(self): if not self._klass_name: - self._klass_name = os.path.basename(__file__) + '.' + self.__class__.__name__ + self._klass_name = (f"{os.path.basename(__file__)}." + f"{self.__class__.__name__}") return self._klass_name @property def tmpdir(self): if not self._tmpdir: - self._tmpdir = os.getenv('AVOCADO_TESTS_COMMON_TMPDIR') + self.klass_name + self._tmpdir = (f"{os.getenv('AVOCADO_TESTS_COMMON_TMPDIR')}" + f"{self.klass_name}") return self._tmpdir @property def sos_bin(self): - return self._local_sos_bin if self.params.get('TESTLOCAL') == 'true' else SOS_TEST_BIN + if self.params.get('TESTLOCAL') == 'true': + return self._local_sos_bin + return SOS_TEST_BIN def generate_sysinfo(self): """Collects some basic information about the system for later reference @@ -142,7 +151,9 @@ class BaseSoSTest(Test): # a little hacky, but using self.log methods here will not # print to console unless we ratchet up the verbosity for the # entire test suite, which will become very difficult to read - LOG_UI.error('ERROR:\n' + msg[:8196]) # don't flood w/ super verbose logs + + # don't flood w/ super verbose logs + LOG_UI.error('ERROR:\n' + msg[:8196]) if err.result.interrupted: raise Exception("Timeout exceeded, see output above") else: @@ -218,9 +229,9 @@ class BaseSoSTest(Test): This allows us to define distro-specific test classes much the same way we can define distro-specific tests _within_ a test class using the - appropriate decorators. We can't use the decorators for the class however - due to how avocado catches instantiation exceptions, so instead we need - to raise the skip exception after instantiation is done. + appropriate decorators. We can't use the decorators for the class + however due to how avocado catches instantiation exceptions, so instead + we need to raise the skip exception after instantiation is done. """ if self.redhat_only: if self.local_distro not in RH_DIST: @@ -244,7 +255,6 @@ class BaseSoSTest(Test): raise TestSkipError(f"Unsupported architecture {sys_arch} for test " f"(supports: {self.arch})") - def setUp(self): """Setup the tmpdir and any needed mocking for the test, then execute the defined sos command. Ensure that we only run the sos command once @@ -333,7 +343,9 @@ class BaseSoSTest(Test): :param content: The string that should not be in stdout :type content: ``str`` """ - found = re.search(r"(.*)?%s(.*)?" % content, self.cmd_output.stdout + self.cmd_output.stderr) + found = re.search( + fr"(.*)?{content}(.*)?", + self.cmd_output.stdout + self.cmd_output.stderr) assert found, "Content string '%s' not in output" % content def assertOutputNotContains(self, content): @@ -342,7 +354,9 @@ class BaseSoSTest(Test): :param content: The string that should not be in stdout :type content: ``str`` """ - found = re.search(r"(.*)?%s(.*)?" % content, self.cmd_output.stdout + self.cmd_output.stderr) + found = re.search( + fr"(.*)?{content}(.*)?", + self.cmd_output.stdout + self.cmd_output.stderr) assert not found, "String '%s' present in stdout" % content @@ -365,7 +379,8 @@ class BaseSoSReportTest(BaseSoSTest): def manifest(self): if self._manifest is None: try: - content = self.read_file_from_tmpdir(self.get_name_in_archive('sos_reports/manifest.json')) + content = self.read_file_from_tmpdir( + self.get_name_in_archive('sos_reports/manifest.json')) self._manifest = json.loads(content) except Exception: self._manifest = '' @@ -381,7 +396,7 @@ class BaseSoSReportTest(BaseSoSTest): cmd = ("gpg --batch --passphrase %s -o %s --decrypt %s" % (self.encrypt_pass, _archive, archive)) try: - res = process.run(cmd, timeout=10) + process.run(cmd, timeout=10) except Exception as err: if err.result.interrupted: self.error("Timeout while decrypting") @@ -424,8 +439,11 @@ class BaseSoSReportTest(BaseSoSTest): override """ try: - return re.findall(r'/.*sosreport-.*tar.*\.gpg', self.cmd_output.stdout)[-1] - except: + return re.findall( + r'/.*sosreport-.*tar.*\.gpg', + self.cmd_output.stdout + )[-1] + except Exception: return None def _extract_archive(self, arc_path): @@ -444,14 +462,21 @@ class BaseSoSReportTest(BaseSoSTest): """Based on the klass id setup earlier, provide a name to extract the archive to within the tmpdir """ - return os.path.join(self.tmpdir, "sosreport-%s" % self.__class__.__name__) - + return os.path.join( + self.tmpdir, + f"sosreport-{self.__class__.__name__}" + ) + def _generate_sos_command(self): - return "%s %s -v --batch --tmp-dir %s %s" % (self.sos_bin, self.sos_component, self.tmpdir, self.sos_cmd) + return (f"{self.sos_bin} {self.sos_component} -v --batch " + f"--tmp-dir {self.tmpdir} {self.sos_cmd}") def _execute_sos_cmd(self): super(BaseSoSReportTest, self)._execute_sos_cmd() - self.archive = re.findall('/.*sosreport-.*tar.*', self.cmd_output.stdout) + self.archive = re.findall( + '/.*sosreport-.*tar.*', + self.cmd_output.stdout + ) if self.archive: self.archive = self.archive[-1] self._extract_archive(self.archive) @@ -513,13 +538,17 @@ class BaseSoSReportTest(BaseSoSTest): :type fname: ``str`` """ if fname.startswith(('sos_', '/sos_')): - files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/'))) + files = glob.glob( + os.path.join(self.archive_path, fname.lstrip('/')) + ) elif not glob.glob(fname): # force the test to pass since the file glob could not have been # collected files = True else: - files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/'))) + files = glob.glob( + os.path.join(self.archive_path, fname.lstrip('/')) + ) assert files, "No files matching %s found" % fname def assertFileGlobNotInArchive(self, fname): @@ -531,7 +560,9 @@ class BaseSoSReportTest(BaseSoSTest): """ files = glob.glob(os.path.join(self.tmpdir, fname.lstrip('/'))) self.log.debug(files) - assert not files, "Found files in archive matching %s: %s" % (fname, files) + assert \ + not files, \ + f"Found files in archive matching {fname}: {files}" def assertFileHasContent(self, fname, content): """Ensure that the given file fname contains the given content @@ -551,7 +582,9 @@ class BaseSoSReportTest(BaseSoSTest): if re.match(".*%s.*" % content, line, re.I): matched = True break - assert matched, "Content '%s' does not appear in %s\n%s" % (content, fname, _contents) + assert \ + matched, \ + f"Content '{content}' does not appear in {fname}\n{_contents}" def assertFileNotHasContent(self, fname, content): """Ensure that the file file fname does NOT contain the given content @@ -569,7 +602,9 @@ class BaseSoSReportTest(BaseSoSTest): if re.match(".*%s.*" % content, line, re.I): matched = True break - assert not matched, "Content '%s' appears in file %s" % (content, fname) + assert \ + not matched, \ + f"Content '{content}' appears in file {fname}" def assertSosLogContains(self, content): """Ensure that the given content string exists in sos.log @@ -600,11 +635,15 @@ class BaseSoSReportTest(BaseSoSTest): :type plugin: `` str`` """ if not self.manifest: - self.error("No manifest found, cannot check for %s execution" % plugin) + self.error( + f"No manifest found, cannot check for {plugin} execution" + ) if isinstance(plugin, str): plugin = [plugin] for plug in plugin: - assert plug in self.manifest['components']['report']['plugins'].keys(), "Plugin '%s' not recorded in manifest" % plug + assert \ + plug in self.manifest['components']['report']['plugins'], \ + f"Plugin '{plug}' not recorded in manifest" def assertPluginNotIncluded(self, plugin): """Ensure that the specified plugin did NOT run for the sos execution @@ -614,11 +653,15 @@ class BaseSoSReportTest(BaseSoSTest): :type plugin: `` str`` """ if not self.manifest: - self.error("No manifest found, cannot check for %s execution" % plugin) + self.error( + f"No manifest found, cannot check for {plugin} execution" + ) if isinstance(plugin, str): plugin = [plugin] for plug in plugin: - assert plug not in self.manifest['components']['report']['plugins'].keys(), "Plugin '%s' is recorded in manifest" % plug + assert \ + plug not in self.manifest['components']['report']['plugins'], \ + f"Plugin '{plug}' is recorded in manifest" def assertOnlyPluginsIncluded(self, plugins): """Ensure that only the specified plugins are in the manifest @@ -627,7 +670,9 @@ class BaseSoSReportTest(BaseSoSTest): :type plugins: ``str`` or ``list`` of strings """ if not self.manifest: - self.error("No manifest found, cannot check for %s execution" % plugins) + self.error( + f"No manifest found, cannot check for {plugins} execution" + ) if isinstance(plugins, str): plugins = [plugins] _executed = self.manifest['components']['report']['plugins'].keys() @@ -686,7 +731,8 @@ class StageOneReportTest(BaseSoSReportTest): _chk = re.findall('sha256\t.*\n', self.cmd_output.stdout) _chk = _chk[0].split('sha256\t')[1].strip() assert _chk, "No checksum reported" - _found = process.run("sha256sum %s" % (self.encrypted_path or self.archive)).stdout.decode().split()[0] + cmd = f"sha256sum {(self.encrypted_path or self.archive)}" + _found = process.run(cmd).stdout.decode().split()[0] self.assertEqual(_chk, _found) def test_no_new_kmods_loaded(self): @@ -706,7 +752,8 @@ class StageOneReportTest(BaseSoSReportTest): def test_manifest_created(self): self.assertFileCollected('sos_reports/manifest.json') - @skipIf(lambda x: '--no-report' in x.sos_cmd, '--no-report used in command') + @skipIf(lambda x: '--no-report' in x.sos_cmd, + '--no-report used in command') def test_html_reports_created(self): self.assertFileCollected('sos_reports/sos.html') @@ -816,10 +863,15 @@ class StageTwoReportTest(BaseSoSReportTest): for plug in self.install_plugins: if not plug.endswith('.py'): plug += '.py' - fake_plug = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), plug) + fake_plug = os.path.join( + os.path.dirname(inspect.getfile(self.__class__)), + plug + ) if os.path.exists(fake_plug): shutil.copy(fake_plug, SOS_PLUGIN_DIR) - _installed.append(os.path.realpath(os.path.join(SOS_PLUGIN_DIR, plug))) + _installed.append( + os.path.realpath(os.path.join(SOS_PLUGIN_DIR, plug)) + ) self._write_file_to_tmpdir('mocked_plugins', json.dumps(_installed)) def teardown_mocked_plugins(self): @@ -849,7 +901,10 @@ class StageTwoReportTest(BaseSoSReportTest): % ', '.join(self.packages[self.local_distro]) ) # save installed package list to our tmpdir to be removed later - self._write_file_to_tmpdir('mocked_packages', json.dumps(self.packages[self.local_distro])) + self._write_file_to_tmpdir( + 'mocked_packages', + json.dumps(self.packages[self.local_distro]) + ) def _strip_installed_packages(self): """For the list of packages given for a test, if any of the packages @@ -885,7 +940,10 @@ class StageTwoReportTest(BaseSoSReportTest): os.makedirs(_dir) self._created_files.append(_dir) dir_added = True - _test_file = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), src.lstrip('/')) + _test_file = os.path.join( + os.path.dirname(inspect.getfile(self.__class__)), + src.lstrip('/') + ) shutil.copy(_test_file, dest) if not dir_added: self._created_files.append(dest) @@ -900,10 +958,14 @@ class StageTwoReportTest(BaseSoSReportTest): """ for mfile in self.files: if not isinstance(mfile, tuple): - raise Exception(f"Mocked files must be provided via tuples, not {mfile.__class__}") + raise Exception("Mocked files must be provided via tuples," + f"not {mfile.__class__}") self._copy_test_file(mfile) if self._created_files: - self._write_file_to_tmpdir('mocked_files', json.dumps(self._created_files)) + self._write_file_to_tmpdir( + 'mocked_files', + json.dumps(self._created_files) + ) def teardown_mocked_files(self): """Remove any mocked files from the test system's filesystem, and @@ -982,10 +1044,13 @@ class StageOneOutputTest(BaseSoSTest): def test_help_output_successful(self): self.assertTrue(self.cmd_output.exit_status == 0) assert self.cmd_output.stdout, "No stdout output generated" - assert not self.cmd_output.stderr, "stderr received, but not expected: %s" % self.cmd_output.stderr + assert not self.cmd_output.stderr, ( + f"stderr received, but not expected: {self.cmd_output.stderr}") - @skipIf(lambda x: not x._exception_expected, "Not anticipating stderr output") + @skipIf(lambda x: not x._exception_expected, + "Not anticipating stderr output") def test_help_error_reported(self): self.assertTrue(self.cmd_output.exit_status != 0) - assert not self.cmd_output.stdout, "stdout received, but not expected: %s" % self.cmd_output.stdout + assert not self.cmd_output.stdout, ( + f"stdout received, but not expected: {self.cmd_output.stdout}") assert self.cmd_output.stderr, "No stderr output generated" |