aboutsummaryrefslogtreecommitdiffstats
path: root/tests/sos_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sos_tests.py')
-rw-r--r--tests/sos_tests.py145
1 files changed, 105 insertions, 40 deletions
diff --git a/tests/sos_tests.py b/tests/sos_tests.py
index 1fb31875..4ba98eac 100644
--- a/tests/sos_tests.py
+++ b/tests/sos_tests.py
@@ -25,13 +25,15 @@ import re
SOS_TEST_DIR = os.path.dirname(os.path.realpath(__file__))
SOS_REPO_ROOT = os.path.realpath(os.path.join(SOS_TEST_DIR, '../'))
-SOS_PLUGIN_DIR = os.path.realpath(os.path.join(SOS_REPO_ROOT, 'sos/report/plugins'))
+SOS_PLUGIN_DIR = os.path.realpath(
+ os.path.join(SOS_REPO_ROOT, 'sos/report/plugins'))
SOS_TEST_DATA_DIR = os.path.realpath(os.path.join(SOS_TEST_DIR, 'test_data'))
SOS_TEST_BIN = os.path.realpath(os.path.join(SOS_TEST_DIR, '../bin/sos'))
RH_DIST = ['rhel', 'centos', 'fedora']
UBUNTU_DIST = ['Ubuntu', 'debian']
+
def skipIf(cond, message=None):
def decorator(function):
def wrapper(self, *args, **kwargs):
@@ -43,18 +45,21 @@ def skipIf(cond, message=None):
return wrapper
return decorator
+
def redhat_only(tst):
def wrapper(func):
if distro.detect().name not in RH_DIST:
raise TestSkipError('Not running on a Red Hat distro')
return wrapper
+
def ubuntu_only(tst):
def wrapper(func):
if distro.detect().name not in UBUNTU_DIST:
raise TestSkipError('Not running on a Ubuntu or Debian distro')
return wrapper
+
class BaseSoSTest(Test):
"""Base class for all our test classes to build off of.
@@ -77,18 +82,22 @@ class BaseSoSTest(Test):
@property
def klass_name(self):
if not self._klass_name:
- self._klass_name = os.path.basename(__file__) + '.' + self.__class__.__name__
+ self._klass_name = (f"{os.path.basename(__file__)}."
+ f"{self.__class__.__name__}")
return self._klass_name
@property
def tmpdir(self):
if not self._tmpdir:
- self._tmpdir = os.getenv('AVOCADO_TESTS_COMMON_TMPDIR') + self.klass_name
+ self._tmpdir = (f"{os.getenv('AVOCADO_TESTS_COMMON_TMPDIR')}"
+ f"{self.klass_name}")
return self._tmpdir
@property
def sos_bin(self):
- return self._local_sos_bin if self.params.get('TESTLOCAL') == 'true' else SOS_TEST_BIN
+ if self.params.get('TESTLOCAL') == 'true':
+ return self._local_sos_bin
+ return SOS_TEST_BIN
def generate_sysinfo(self):
"""Collects some basic information about the system for later reference
@@ -142,7 +151,9 @@ class BaseSoSTest(Test):
# a little hacky, but using self.log methods here will not
# print to console unless we ratchet up the verbosity for the
# entire test suite, which will become very difficult to read
- LOG_UI.error('ERROR:\n' + msg[:8196]) # don't flood w/ super verbose logs
+
+ # don't flood w/ super verbose logs
+ LOG_UI.error('ERROR:\n' + msg[:8196])
if err.result.interrupted:
raise Exception("Timeout exceeded, see output above")
else:
@@ -218,9 +229,9 @@ class BaseSoSTest(Test):
This allows us to define distro-specific test classes much the same way
we can define distro-specific tests _within_ a test class using the
- appropriate decorators. We can't use the decorators for the class however
- due to how avocado catches instantiation exceptions, so instead we need
- to raise the skip exception after instantiation is done.
+ appropriate decorators. We can't use the decorators for the class
+ however due to how avocado catches instantiation exceptions, so instead
+ we need to raise the skip exception after instantiation is done.
"""
if self.redhat_only:
if self.local_distro not in RH_DIST:
@@ -244,7 +255,6 @@ class BaseSoSTest(Test):
raise TestSkipError(f"Unsupported architecture {sys_arch} for test "
f"(supports: {self.arch})")
-
def setUp(self):
"""Setup the tmpdir and any needed mocking for the test, then execute
the defined sos command. Ensure that we only run the sos command once
@@ -333,7 +343,9 @@ class BaseSoSTest(Test):
:param content: The string that should not be in stdout
:type content: ``str``
"""
- found = re.search(r"(.*)?%s(.*)?" % content, self.cmd_output.stdout + self.cmd_output.stderr)
+ found = re.search(
+ fr"(.*)?{content}(.*)?",
+ self.cmd_output.stdout + self.cmd_output.stderr)
assert found, "Content string '%s' not in output" % content
def assertOutputNotContains(self, content):
@@ -342,7 +354,9 @@ class BaseSoSTest(Test):
:param content: The string that should not be in stdout
:type content: ``str``
"""
- found = re.search(r"(.*)?%s(.*)?" % content, self.cmd_output.stdout + self.cmd_output.stderr)
+ found = re.search(
+ fr"(.*)?{content}(.*)?",
+ self.cmd_output.stdout + self.cmd_output.stderr)
assert not found, "String '%s' present in stdout" % content
@@ -365,7 +379,8 @@ class BaseSoSReportTest(BaseSoSTest):
def manifest(self):
if self._manifest is None:
try:
- content = self.read_file_from_tmpdir(self.get_name_in_archive('sos_reports/manifest.json'))
+ content = self.read_file_from_tmpdir(
+ self.get_name_in_archive('sos_reports/manifest.json'))
self._manifest = json.loads(content)
except Exception:
self._manifest = ''
@@ -381,7 +396,7 @@ class BaseSoSReportTest(BaseSoSTest):
cmd = ("gpg --batch --passphrase %s -o %s --decrypt %s"
% (self.encrypt_pass, _archive, archive))
try:
- res = process.run(cmd, timeout=10)
+ process.run(cmd, timeout=10)
except Exception as err:
if err.result.interrupted:
self.error("Timeout while decrypting")
@@ -424,8 +439,11 @@ class BaseSoSReportTest(BaseSoSTest):
override
"""
try:
- return re.findall(r'/.*sosreport-.*tar.*\.gpg', self.cmd_output.stdout)[-1]
- except:
+ return re.findall(
+ r'/.*sosreport-.*tar.*\.gpg',
+ self.cmd_output.stdout
+ )[-1]
+ except Exception:
return None
def _extract_archive(self, arc_path):
@@ -444,14 +462,21 @@ class BaseSoSReportTest(BaseSoSTest):
"""Based on the klass id setup earlier, provide a name to extract the
archive to within the tmpdir
"""
- return os.path.join(self.tmpdir, "sosreport-%s" % self.__class__.__name__)
-
+ return os.path.join(
+ self.tmpdir,
+ f"sosreport-{self.__class__.__name__}"
+ )
+
def _generate_sos_command(self):
- return "%s %s -v --batch --tmp-dir %s %s" % (self.sos_bin, self.sos_component, self.tmpdir, self.sos_cmd)
+ return (f"{self.sos_bin} {self.sos_component} -v --batch "
+ f"--tmp-dir {self.tmpdir} {self.sos_cmd}")
def _execute_sos_cmd(self):
super(BaseSoSReportTest, self)._execute_sos_cmd()
- self.archive = re.findall('/.*sosreport-.*tar.*', self.cmd_output.stdout)
+ self.archive = re.findall(
+ '/.*sosreport-.*tar.*',
+ self.cmd_output.stdout
+ )
if self.archive:
self.archive = self.archive[-1]
self._extract_archive(self.archive)
@@ -513,13 +538,17 @@ class BaseSoSReportTest(BaseSoSTest):
:type fname: ``str``
"""
if fname.startswith(('sos_', '/sos_')):
- files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/')))
+ files = glob.glob(
+ os.path.join(self.archive_path, fname.lstrip('/'))
+ )
elif not glob.glob(fname):
# force the test to pass since the file glob could not have been
# collected
files = True
else:
- files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/')))
+ files = glob.glob(
+ os.path.join(self.archive_path, fname.lstrip('/'))
+ )
assert files, "No files matching %s found" % fname
def assertFileGlobNotInArchive(self, fname):
@@ -531,7 +560,9 @@ class BaseSoSReportTest(BaseSoSTest):
"""
files = glob.glob(os.path.join(self.tmpdir, fname.lstrip('/')))
self.log.debug(files)
- assert not files, "Found files in archive matching %s: %s" % (fname, files)
+ assert \
+ not files, \
+ f"Found files in archive matching {fname}: {files}"
def assertFileHasContent(self, fname, content):
"""Ensure that the given file fname contains the given content
@@ -551,7 +582,9 @@ class BaseSoSReportTest(BaseSoSTest):
if re.match(".*%s.*" % content, line, re.I):
matched = True
break
- assert matched, "Content '%s' does not appear in %s\n%s" % (content, fname, _contents)
+ assert \
+ matched, \
+ f"Content '{content}' does not appear in {fname}\n{_contents}"
def assertFileNotHasContent(self, fname, content):
"""Ensure that the file file fname does NOT contain the given content
@@ -569,7 +602,9 @@ class BaseSoSReportTest(BaseSoSTest):
if re.match(".*%s.*" % content, line, re.I):
matched = True
break
- assert not matched, "Content '%s' appears in file %s" % (content, fname)
+ assert \
+ not matched, \
+ f"Content '{content}' appears in file {fname}"
def assertSosLogContains(self, content):
"""Ensure that the given content string exists in sos.log
@@ -600,11 +635,15 @@ class BaseSoSReportTest(BaseSoSTest):
:type plugin: `` str``
"""
if not self.manifest:
- self.error("No manifest found, cannot check for %s execution" % plugin)
+ self.error(
+ f"No manifest found, cannot check for {plugin} execution"
+ )
if isinstance(plugin, str):
plugin = [plugin]
for plug in plugin:
- assert plug in self.manifest['components']['report']['plugins'].keys(), "Plugin '%s' not recorded in manifest" % plug
+ assert \
+ plug in self.manifest['components']['report']['plugins'], \
+ f"Plugin '{plug}' not recorded in manifest"
def assertPluginNotIncluded(self, plugin):
"""Ensure that the specified plugin did NOT run for the sos execution
@@ -614,11 +653,15 @@ class BaseSoSReportTest(BaseSoSTest):
:type plugin: `` str``
"""
if not self.manifest:
- self.error("No manifest found, cannot check for %s execution" % plugin)
+ self.error(
+ f"No manifest found, cannot check for {plugin} execution"
+ )
if isinstance(plugin, str):
plugin = [plugin]
for plug in plugin:
- assert plug not in self.manifest['components']['report']['plugins'].keys(), "Plugin '%s' is recorded in manifest" % plug
+ assert \
+ plug not in self.manifest['components']['report']['plugins'], \
+ f"Plugin '{plug}' is recorded in manifest"
def assertOnlyPluginsIncluded(self, plugins):
"""Ensure that only the specified plugins are in the manifest
@@ -627,7 +670,9 @@ class BaseSoSReportTest(BaseSoSTest):
:type plugins: ``str`` or ``list`` of strings
"""
if not self.manifest:
- self.error("No manifest found, cannot check for %s execution" % plugins)
+ self.error(
+ f"No manifest found, cannot check for {plugins} execution"
+ )
if isinstance(plugins, str):
plugins = [plugins]
_executed = self.manifest['components']['report']['plugins'].keys()
@@ -686,7 +731,8 @@ class StageOneReportTest(BaseSoSReportTest):
_chk = re.findall('sha256\t.*\n', self.cmd_output.stdout)
_chk = _chk[0].split('sha256\t')[1].strip()
assert _chk, "No checksum reported"
- _found = process.run("sha256sum %s" % (self.encrypted_path or self.archive)).stdout.decode().split()[0]
+ cmd = f"sha256sum {(self.encrypted_path or self.archive)}"
+ _found = process.run(cmd).stdout.decode().split()[0]
self.assertEqual(_chk, _found)
def test_no_new_kmods_loaded(self):
@@ -706,7 +752,8 @@ class StageOneReportTest(BaseSoSReportTest):
def test_manifest_created(self):
self.assertFileCollected('sos_reports/manifest.json')
- @skipIf(lambda x: '--no-report' in x.sos_cmd, '--no-report used in command')
+ @skipIf(lambda x: '--no-report' in x.sos_cmd,
+ '--no-report used in command')
def test_html_reports_created(self):
self.assertFileCollected('sos_reports/sos.html')
@@ -816,10 +863,15 @@ class StageTwoReportTest(BaseSoSReportTest):
for plug in self.install_plugins:
if not plug.endswith('.py'):
plug += '.py'
- fake_plug = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), plug)
+ fake_plug = os.path.join(
+ os.path.dirname(inspect.getfile(self.__class__)),
+ plug
+ )
if os.path.exists(fake_plug):
shutil.copy(fake_plug, SOS_PLUGIN_DIR)
- _installed.append(os.path.realpath(os.path.join(SOS_PLUGIN_DIR, plug)))
+ _installed.append(
+ os.path.realpath(os.path.join(SOS_PLUGIN_DIR, plug))
+ )
self._write_file_to_tmpdir('mocked_plugins', json.dumps(_installed))
def teardown_mocked_plugins(self):
@@ -849,7 +901,10 @@ class StageTwoReportTest(BaseSoSReportTest):
% ', '.join(self.packages[self.local_distro])
)
# save installed package list to our tmpdir to be removed later
- self._write_file_to_tmpdir('mocked_packages', json.dumps(self.packages[self.local_distro]))
+ self._write_file_to_tmpdir(
+ 'mocked_packages',
+ json.dumps(self.packages[self.local_distro])
+ )
def _strip_installed_packages(self):
"""For the list of packages given for a test, if any of the packages
@@ -885,7 +940,10 @@ class StageTwoReportTest(BaseSoSReportTest):
os.makedirs(_dir)
self._created_files.append(_dir)
dir_added = True
- _test_file = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), src.lstrip('/'))
+ _test_file = os.path.join(
+ os.path.dirname(inspect.getfile(self.__class__)),
+ src.lstrip('/')
+ )
shutil.copy(_test_file, dest)
if not dir_added:
self._created_files.append(dest)
@@ -900,10 +958,14 @@ class StageTwoReportTest(BaseSoSReportTest):
"""
for mfile in self.files:
if not isinstance(mfile, tuple):
- raise Exception(f"Mocked files must be provided via tuples, not {mfile.__class__}")
+ raise Exception("Mocked files must be provided via tuples,"
+ f"not {mfile.__class__}")
self._copy_test_file(mfile)
if self._created_files:
- self._write_file_to_tmpdir('mocked_files', json.dumps(self._created_files))
+ self._write_file_to_tmpdir(
+ 'mocked_files',
+ json.dumps(self._created_files)
+ )
def teardown_mocked_files(self):
"""Remove any mocked files from the test system's filesystem, and
@@ -982,10 +1044,13 @@ class StageOneOutputTest(BaseSoSTest):
def test_help_output_successful(self):
self.assertTrue(self.cmd_output.exit_status == 0)
assert self.cmd_output.stdout, "No stdout output generated"
- assert not self.cmd_output.stderr, "stderr received, but not expected: %s" % self.cmd_output.stderr
+ assert not self.cmd_output.stderr, (
+ f"stderr received, but not expected: {self.cmd_output.stderr}")
- @skipIf(lambda x: not x._exception_expected, "Not anticipating stderr output")
+ @skipIf(lambda x: not x._exception_expected,
+ "Not anticipating stderr output")
def test_help_error_reported(self):
self.assertTrue(self.cmd_output.exit_status != 0)
- assert not self.cmd_output.stdout, "stdout received, but not expected: %s" % self.cmd_output.stdout
+ assert not self.cmd_output.stdout, (
+ f"stdout received, but not expected: {self.cmd_output.stdout}")
assert self.cmd_output.stderr, "No stderr output generated"