diff options
Diffstat (limited to 'tests/sos_tests.py')
-rw-r--r-- | tests/sos_tests.py | 490 |
1 files changed, 490 insertions, 0 deletions
diff --git a/tests/sos_tests.py b/tests/sos_tests.py new file mode 100644 index 00000000..8da0195d --- /dev/null +++ b/tests/sos_tests.py @@ -0,0 +1,490 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + + +from avocado.core.exceptions import TestSkipError +from avocado import Test +from avocado.utils import archive, process +from fnmatch import fnmatch + +import glob +import json +import os +import pickle +import socket +import re + +SOS_TEST_DIR = os.path.dirname(os.path.realpath(__file__)) +SOS_BIN = os.path.realpath(os.path.join(SOS_TEST_DIR, '../bin/sos')) + + +def skipIf(cond, message=None): + def decorator(function): + def wrapper(self, *args, **kwargs): + if callable(cond): + if cond(self): + raise TestSkipError(message) + elif cond: + raise TestSkipError(message) + return wrapper + return decorator + + +class BaseSoSTest(Test): + """Base class for all our test classes to build off of. + + Subclasses avocado.Test and then adds wrappers and helper methods that are + needed across sos components. Component specific test classes should in + turn subclass ``BaseSoSTest`` rather than ``avocado.Test`` directly + """ + + _klass_name = None + _tmpdir = None + sos_cmd = '' + + @property + def klass_name(self): + if not self._klass_name: + self._klass_name = os.path.basename(__file__) + '.' + self.__class__.__name__ + return self._klass_name + + @property + def tmpdir(self): + if not self._tmpdir: + self._tmpdir = os.getenv('AVOCADO_TESTS_COMMON_TMPDIR') + self.klass_name + return self._tmpdir + + def generate_sysinfo(self): + """Collects some basic information about the system for later reference + in individual tests + """ + sysinfo = {} + + # get kernel modules + mods = [] + _out = process.run('lsmod').stdout.decode() + for line in _out.splitlines()[1:]: + mods.append(line.split()[0]) + # this particular kmod is both innocuous and unpredictable in terms of + # pre-loading even within the same distribution. For now, turn a blind + # eye to it with regards to the "no new kmods loaded" perspective + if 'binfmt_misc' in mods: + mods.remove('binfmt_misc') + sysinfo['modules'] = sorted(mods, key=str.lower) + + # get networking info + hostname = socket.gethostname() + ip_addr = socket.gethostbyname(hostname) + sysinfo['networking'] = {} + sysinfo['networking']['hostname'] = hostname + sysinfo['networking']['ip_addr'] = ip_addr + + return sysinfo + + def _write_file_to_tmpdir(self, fname, content): + """Write the given content to fname within the test's tmpdir + """ + fname = os.path.join(self.tmpdir, fname) + if isinstance(content, bytes): + content = content.decode() + with open(fname, 'w') as wfile: + wfile.write(content) + + def read_file_from_tmpdir(self, fname): + fname = os.path.join(self.tmpdir, fname) + with open(fname, 'r') as tfile: + return tfile.read() + return '' + + def _write_sysinfo(self, fname): + """Get the current state of sysinfo and write it into our shared + tempdir so it can be loaded in setUp() later + + :param fname: The name of the file to be written in the tempdir + :type fname: ``str`` + """ + sysinfo = self.generate_sysinfo() + self._write_file_to_tmpdir(fname, json.dumps(sysinfo)) + + def _read_sysinfo(self, fname): + sysinfo = {} + content = self.read_file_from_tmpdir(fname) + if content: + sysinfo = json.loads(content) + return sysinfo + + def set_pre_sysinfo(self): + self._write_sysinfo('pre_sysinfo') + + def get_pre_sysinfo(self): + return self._read_sysinfo('pre_sysinfo') + + def set_post_sysinfo(self): + self._write_sysinfo('post_sysinfo') + + def get_post_sysinfo(self): + return self._read_sysinfo('post_sysinfo') + + def get_sysinfo(self): + sinfo = { + 'pre': self.get_pre_sysinfo(), + 'post': self.get_post_sysinfo() + } + return sinfo + + def assertFileExists(self, fname): + """Asserts that fname exists on the filesystem""" + assert os.path.exists(fname), "%s does not exist" % fname + + def assertFileNotExists(self, fname): + """Asserts that fname does not exist on the filesystem""" + assert not os.path.exists(fname), "%s exists" % fname + + +class BaseSoSReportTest(BaseSoSTest): + """This is the class to use for building sos report tests with. + + An instance of this test is expected to set at minimum a ``sos_cmd`` class + attribute that represets the options handed to a specific execution of an + sos command. This should be anything following ``sos report --batch``. + + """ + + archive = None + _manifest = None + + + @property + def manifest(self): + if self._manifest is None: + try: + content = self.read_file_from_tmpdir(self.get_name_in_archive('sos_reports/manifest.json')) + self._manifest = json.loads(content) + except Exception: + self._manifest = '' + self.warn('Could not load manifest for test') + return self._manifest + + def _extract_archive(self, arc_path): + """Extract an archive to the temp directory + """ + _extract_path = self._get_extracted_tarball_path() + try: + archive.extract(arc_path, _extract_path) + self.archive_path = self._get_archive_path() + except Exception as err: + self.cancel("Could not extract archive: %s" % err) + + def _get_extracted_tarball_path(self): + """Based on the klass id setup earlier, provide a name to extract the + archive to within the tmpdir + """ + return os.path.join(self.tmpdir, "sosreport-%s" % self.__class__.__name__) + + + def _execute_sos_cmd(self): + """Run the sos command for this test case, and extract it + """ + _cmd = '%s report --batch --tmp-dir %s %s' + exec_cmd = _cmd % (SOS_BIN, self.tmpdir, self.sos_cmd) + self.cmd_output = process.run(exec_cmd, timeout=300) + with open(os.path.join(self.tmpdir, 'output'), 'wb') as pfile: + pickle.dump(self.cmd_output, pfile) + self.cmd_output.stdout = self.cmd_output.stdout.decode() + self.cmd_output.stderr = self.cmd_output.stderr.decode() + self.archive = re.findall('/.*sosreport-.*tar.*', self.cmd_output.stdout)[-1] + if self.archive: + self._extract_archive(self.archive) + + + def _setup_tmpdir(self): + if not os.path.isdir(self.tmpdir): + os.mkdir(self.tmpdir) + + def _get_archive_path(self): + return glob.glob(self._get_extracted_tarball_path() + '/sosreport*')[0] + + def setup_mocking(self): + """Since we need to use setUp() in our overrides of avocado.Test, + provide an alternate method for test cases that subclass BaseSoSTest + to use. + """ + pass + + def setUp(self): + """Execute and extract the sos report to our temporary location, then + call sos_setup() for individual test case setup and/or mocking. + """ + # check to prevent multiple setUp() runs + if not os.path.isdir(self.tmpdir): + # setup our class-shared tmpdir + self._setup_tmpdir() + + # do our mocking called for in sos_setup + self.setup_mocking() + + # gather some pre-execution information + self.set_pre_sysinfo() + + # run the sos command for this test case + self._execute_sos_cmd() + self.set_post_sysinfo() + else: + with open(os.path.join(self.tmpdir, 'output'), 'rb') as pfile: + self.cmd_output = pickle.load(pfile) + if isinstance(self.cmd_output.stdout, bytes): + self.cmd_output.stdout = self.cmd_output.stdout.decode() + self.cmd_output.stderr = self.cmd_output.stderr.decode() + for f in os.listdir(self.tmpdir): + if fnmatch(f, 'sosreport*.tar.??'): + self.archive = os.path.join(self.tmpdir, f) + break + self.sysinfo = self.get_sysinfo() + self.archive_path = self._get_archive_path() + + def get_name_in_archive(self, fname): + """Get the full path to fname as it (would) exist in the archive + """ + return os.path.join(self.archive_path, fname.lstrip('/')) + + def get_file_content(self, fname): + """Reads the content of fname from within the archive and returns it + + :param fname: The name of the file + :type fname: ``str`` + + :returns: Content of fname + :rtype: ``str`` + """ + content = '' + with open(self.get_name_in_archive(fname), 'r') as gfile: + content = gfile.read() + return content + + def assertFileCollected(self, fname): + """Ensure that a given fname is in the extracted archive if it exists + on the host system + + :param fname: The name of the file within the archive + :type fname: ``str`` + """ + if os.path.exists(fname): + self.assertFileExists(self.get_name_in_archive(fname)) + assert True + + def assertFileNotCollected(self, fname): + """Ensure that a given fname is NOT in the extracted archive + + :param fname: The name of the file within the archive + :type fname: ``str`` + """ + self.assertFileNotExists(self.get_name_in_archive(fname)) + + def assertFileGlobInArchive(self, fname): + """Ensure that at least one file in the archive matches a given fname + glob + + :param fname: The glob to match filenames of + :type fname: ``str`` + """ + files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/'))) + assert files, "No files matching %s found" % fname + + def assertFileGlobNotInArchive(self, fname): + """Ensure that there are NO files in the archive matching a given fname + glob + + :param fname: The glob to match filename(s) of + :type fname: ``str`` + """ + files = glob.glob(os.path.join(self.tmpdir, fname.lstrip('/'))) + self.log.debug(files) + assert not files, "Found files in archive matching %s: %s" % (fname, files) + + def assertFileHasContent(self, fname, content): + """Ensure that the given file fname contains the given content + + :param fname: The name of the file + :type fname: ``str`` + + :param content: The content to match + :type content: ``str`` + """ + matched = False + fname = self.get_name_in_archive(fname) + self.assertFileExists(fname) + with open(fname, 'r') as lfile: + _contents = lfile.read() + for line in _contents.splitlines(): + if re.match(".*%s.*" % content, line, re.I): + matched = True + break + assert matched, "Content '%s' does not appear in %s\n%s" % (content, fname, _contents) + + def assertFileNotHasContent(self, fname, content): + """Ensure that the file file fname does NOT contain the given content + + :param fname: The name of the file + :type fname: ``str`` + + :param content: The content to (not) match + :type content: ``str`` + """ + matched = False + fname = self.get_name_in_archive(fname) + with open(fname, 'r') as mfile: + for line in mfile.read().splitlines(): + if re.match(".*%s.*" % content, line, re.I): + matched = True + break + assert not matched, "Content '%s' appears in file %s" % (content, fname) + + def assertSosLogContains(self, content): + """Ensure that the given content string exists in sos.log + """ + self.assertFileHasContent('sos_logs/sos.log', content) + + def assertSosLogNotContains(self, content): + """Ensure that the given content string does NOT exist in sos.log + """ + self.assertFileNotHasContent('sos_logs/sos.log', content) + + def assertOutputContains(self, content): + """Ensure that stdout did contain the given content string + + :param content: The string that should not be in stdout + :type content: ``str`` + """ + assert content in self.cmd_output.stdout, 'Content string not in output' + + def assertOutputNotContains(self, content): + """Ensure that stdout did NOT contain the given content string + + :param content: The string that should not be in stdout + :type content: ``str`` + """ + assert not re.match(".*%s.*" % content, self.cmd_output.stdout), "String '%s' present in stdout" % content + + def assertPluginIncluded(self, plugin): + """Ensure that the specified plugin did run for the sos execution + + Note that this relies on manifest.json being successfully created + + :param plugin: The name of the plugin + :type plugin: `` str`` + """ + if not self.manifest: + self.error("No manifest found, cannot check for %s execution" % plugin) + assert plugin in self.manifest['components']['report']['plugins'].keys(), 'Plugin not recorded in manifest' + + def assertPluginNotIncluded(self, plugin): + """Ensure that the specified plugin did NOT run for the sos execution + Note that this relies on manifest.json being successfully created + + :param plugin: The name of the plugin + :type plugin: `` str`` + """ + if not self.manifest: + self.error("No manifest found, cannot check for %s execution" % plugin) + assert plugin not in self.manifest['components']['report']['plugins'].keys(), 'Plugin is recorded in manifest' + + def assertOnlyPluginsIncluded(self, plugins): + """Ensure that only the specified plugins are in the manifest + + :param plugins: The plugin names + :type plugins: ``str`` or ``list`` of strings + """ + if not self.manifest: + self.error("No manifest found, cannot check for %s execution" % plugins) + if isinstance(plugins, str): + plugins = [plugins] + _executed = self.manifest['components']['report']['plugins'].keys() + + # test that all requested plugins did run + for i in plugins: + assert i in _executed, "Requested plugin '%s' did not run" % i + + # test that no unrequested plugins ran + for j in _executed: + assert j in plugins, "Unrequested plugin '%s' ran as well" % j + +class StageOneReportTest(BaseSoSReportTest): + """This is the test class to subclass for all Stage One (no mocking) tests + within the sos test suite. + + In addition to any test_* methods defined in the test cases that subclass + this, the methods defined here will ALSO run, to ensure basic consistency + across test cases + + NOTE: You MUST replace the following line in the docstring of your own + test cases, as otherwise the test will be disabled. This line is here to + prevent this base class from being treated as a valid test case. Also, if + you add any tests to this base class, make sure to add a line such as + ':avocado: tags=stageone' to ensure the base tests run with new test cases + + :avocado: disable + :avocado: tags=stageone + """ + + sos_cmd = '' + + def test_archive_created(self): + """Ensure that the archive tarball was created and has the right owner + + :avocado: tags=stageone + """ + self.assertFileExists(self.archive) + self.assertTrue(os.stat(self.archive).st_uid == 0) + + def test_no_new_kmods_loaded(self): + """Ensure that no additional kernel modules have been loaded during an + execution of a test + + :avocado: tags=stageone + """ + self.assertCountEqual(self.sysinfo['pre']['modules'], + self.sysinfo['post']['modules']) + + def test_archive_has_sos_dirs(self): + """Ensure that we have the expected directory layout with in the + archive + + :avocado: tags=stageone + """ + self.assertFileCollected('sos_commands') + self.assertFileCollected('sos_logs') + + def test_manifest_created(self): + """ + :avocado: tags=stageone + """ + self.assertFileCollected('sos_reports/manifest.json') + + @skipIf(lambda x: '--no-report' in x.sos_cmd, '--no-report used in command') + def test_html_reports_created(self): + """ + :avocado: tags=stageone + """ + self.assertFileCollected('sos_reports/sos.html') + + def test_no_exceptions_during_execution(self): + """ + :avocado: tags=stageone + """ + self.assertSosLogNotContains('caught exception in plugin') + self.assertFileGlobNotInArchive('sos_logs/*-plugin-errors.txt') + + def test_no_ip_changes(self): + """ + :avocado: tags=stageone + """ + # I.E. make sure we didn't cause any NIC flaps that for some reason + # resulted in a new primary IP address. TODO: build this out to make + # sure this IP is still bound to the same NIC + self.assertEqual(self.sysinfo['pre']['networking']['ip_addr'], + self.sysinfo['post']['networking']['ip_addr']) |