diff options
author | Jake Hunsaker <jhunsake@redhat.com> | 2021-01-27 09:55:39 -0500 |
---|---|---|
committer | Jake Hunsaker <jhunsake@redhat.com> | 2021-04-15 11:33:03 -0400 |
commit | f568fb219fff5b502882d591ddcc6bd5cc84c7a2 (patch) | |
tree | a37485633c469bfd4d166116da86a0b574446753 /tests/unittests | |
parent | 9191c690a3311b81eeb3480d4f054e93d1ce75e3 (diff) | |
download | sos-f568fb219fff5b502882d591ddcc6bd5cc84c7a2.tar.gz |
[tests] Start using avocado for test suite
This commit represents the start of an overhaul of the test suite used
by sos. Note that several more commits to follow will be required in
order for the test suite to be considered stable.
The new test suite will use the avocado-framework to build out new
tests.
This first part adopts a new 'stageX' naming scheme for our tests as
follows:
stage0 -> Unittests
stage1 -> Basic function tests, no mocking allowed
stage2 -> Mocked tests for specific scenarios/regressions
stage3 -> Complex setups for layered products/environments
At the moment, these unittests are not updated for avocado, though most
should still work with `nosetest` directly.
A new set of base classes is defined in tests/sos_tests.py which provide
the foundation for actual tests cases. This approach entails new test
cases subclassing a base class, such as the new `StageOneReportTest`,
and setting the `sos_cmd` class attr to the _options_ for an sos report
run. By default `sos report --batch` will be run, and targeted to the
test job's directory as a tmpdir.
Each sos command will be executed once, and all test_* methods within a
test case that subclasses `StageOneReportTest` will be checked against
the output of that execution. Note that this diverges from avocado's
typical approach where each test_* method is performed against a brand
new instance of the class (thus meaning any setup including our sos
report run would normally be run fresh). However, after speaking with
the avocado devel team, this is still seen as a valid pattern for the
framework.
The current organizational approach is to separate the tests by
component rather than stage. For example. `tests/report_tests/` should
hold any report-centric tests, and the `plugin_tests` directory therein
should be used for plugin-specific tests. As of this commit, there are
basic functionality tests under `tests/report_tests/` and a single
plugin test under `tests/report_tests/plugin_tests/` to act as a POC.
Further, there is a `tests/vendor_tests/` directory for organizing
vendor-specific bug/feature tests that are not covered by the generic
project-wide tests. A POC test from RHBZ1928628 is available with this
commit.
Note that in order for these tests to be run properly _without_
installing the current branch to the local system, you will need to run
the following command:
`PYTHONPATH=tests/ avocado run -t stageone tests/`
Related: #2431
Signed-off-by: Jake Hunsaker <jhunsake@redhat.com>
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/archive_tests.py | 120 | ||||
-rw-r--r-- | tests/unittests/cleaner_tests.py | 157 | ||||
-rw-r--r-- | tests/unittests/importer_tests.py | 24 | ||||
-rw-r--r-- | tests/unittests/option_tests.py | 49 | ||||
-rw-r--r-- | tests/unittests/path/to/leaf | 0 | ||||
-rw-r--r-- | tests/unittests/plugin_tests.py | 479 | ||||
-rw-r--r-- | tests/unittests/policy_tests.py | 103 | ||||
-rw-r--r-- | tests/unittests/report_tests.py | 155 | ||||
-rw-r--r-- | tests/unittests/sosreport_pexpect.py | 36 | ||||
-rw-r--r-- | tests/unittests/tail_test.txt | 4 | ||||
-rw-r--r-- | tests/unittests/test.txt | 1 | ||||
-rw-r--r-- | tests/unittests/utilities_tests.py | 103 | ||||
-rw-r--r-- | tests/unittests/ziptest | 0 | ||||
l--------- | tests/unittests/ziptest_link | 1 |
15 files changed, 1232 insertions, 0 deletions
diff --git a/tests/unittests/__init__.py b/tests/unittests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/__init__.py diff --git a/tests/unittests/archive_tests.py b/tests/unittests/archive_tests.py new file mode 100644 index 00000000..320006d0 --- /dev/null +++ b/tests/unittests/archive_tests.py @@ -0,0 +1,120 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest +import os +import tarfile +import tempfile +import shutil + +from sos.archive import TarFileArchive +from sos.utilities import tail +from sos.policies import Policy + + +class TarFileArchiveTest(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + enc = {'encrypt': False} + self.tf = TarFileArchive('test', self.tmpdir, Policy(), 1, enc, '/') + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def check_for_file(self, filename): + rtf = tarfile.open(os.path.join(self.tmpdir, 'test.tar')) + rtf.getmember(filename) + rtf.close() + + def test_create(self): + self.tf.finalize('auto') + self.assertTrue(os.path.exists(os.path.join(self.tmpdir, + 'test.tar'))) + + def test_add_file(self): + self.tf.add_file('tests/ziptest') + self.tf.finalize('auto') + + self.check_for_file('test/tests/ziptest') + + def test_add_node_dev_null(self): + st = os.lstat('/dev/null') + dev_maj = os.major(st.st_rdev) + dev_min = os.minor(st.st_rdev) + self.tf.add_node('/dev/null', st.st_mode, os.makedev(dev_maj, dev_min)) + + # when the string comes from tail() output + def test_add_string_from_file(self): + self.copy_strings = [] + testfile = tempfile.NamedTemporaryFile(dir=self.tmpdir, delete=False) + testfile.write(b"*" * 1000) + testfile.flush() + testfile.close() + + self.copy_strings.append((tail(testfile.name, 100), 'string_test.txt')) + self.tf.add_string(self.copy_strings[0][0], 'tests/string_test.txt') + self.tf.finalize('auto') + +# Since commit 179d9bb add_file does not support recursive directory +# addition. Disable this test for now. +# def test_add_dir(self): +# self.tf.add_file('tests/') +# self.tf.close() +# +# self.check_for_file('test/tests/ziptest') + + def test_add_renamed(self): + self.tf.add_file('tests/ziptest', dest='tests/ziptest_renamed') + self.tf.finalize('auto') + + self.check_for_file('test/tests/ziptest_renamed') + +# Since commit 179d9bb add_file does not support recursive directory +# addition. Disable this test for now. +# def test_add_renamed_dir(self): +# self.tf.add_file('tests/', 'tests_renamed/') +# self.tf.close() +# +# self.check_for_file('test/tests_renamed/ziptest') + + def test_add_string(self): + self.tf.add_string('this is content', 'tests/string_test.txt') + self.tf.finalize('auto') + + self.check_for_file('test/tests/string_test.txt') + + def test_get_file(self): + self.tf.add_string('this is my content', 'tests/string_test.txt') + + afp = self.tf.open_file('tests/string_test.txt') + self.assertEquals('this is my content', afp.read()) + + def test_rewrite_file(self): + """Test that re-writing a file with add_string() modifies the content. + """ + self.tf.add_string('this is my content', 'tests/string_test.txt') + self.tf.add_string('this is my new content', 'tests/string_test.txt') + + afp = self.tf.open_file('tests/string_test.txt') + self.assertEquals('this is my new content', afp.read()) + + def test_make_link(self): + self.tf.add_file('tests/ziptest') + self.tf.add_link('tests/ziptest', 'link_name') + + self.tf.finalize('auto') + self.check_for_file('test/link_name') + + def test_compress(self): + self.tf.finalize("auto") + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/cleaner_tests.py b/tests/unittests/cleaner_tests.py new file mode 100644 index 00000000..5510dd80 --- /dev/null +++ b/tests/unittests/cleaner_tests.py @@ -0,0 +1,157 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + +import unittest + +from ipaddress import ip_interface +from sos.cleaner.parsers.ip_parser import SoSIPParser +from sos.cleaner.parsers.mac_parser import SoSMacParser +from sos.cleaner.parsers.hostname_parser import SoSHostnameParser +from sos.cleaner.parsers.keyword_parser import SoSKeywordParser +from sos.cleaner.mappings.ip_map import SoSIPMap +from sos.cleaner.mappings.mac_map import SoSMacMap +from sos.cleaner.mappings.hostname_map import SoSHostnameMap +from sos.cleaner.mappings.keyword_map import SoSKeywordMap + + +class CleanerMapTests(unittest.TestCase): + + def setUp(self): + self.mac_map = SoSMacMap() + self.ip_map = SoSIPMap() + self.host_map = SoSHostnameMap() + self.host_map.load_domains_from_options(['redhat.com']) + self.kw_map = SoSKeywordMap() + + def test_mac_map_obfuscate_valid_v4(self): + _test = self.mac_map.get('12:34:56:78:90:ab') + self.assertNotEqual(_test, '12:34:56:78:90:ab') + + def test_mac_map_obfuscate_valid_v6(self): + _test = self.mac_map.get('12:34:56:ff:fe:78:90:ab') + self.assertNotEqual(_test, '12:34:56:ff:fe:78:90:ab') + + def test_mac_map_obfuscate_valid_v6_quad(self): + _test = self.mac_map.get('1234:56ff:fe78:90ab') + self.assertNotEqual(_test, '1234:56ff:fe78:90ab') + + def test_mac_map_skip_ignores(self): + _test = self.mac_map.get('ff:ff:ff:ff:ff:ff') + self.assertEquals(_test, 'ff:ff:ff:ff:ff:ff') + + def test_mac_map_avoid_duplicate_obfuscation(self): + _test = self.mac_map.get('ab:cd:ef:fe:dc:ba') + _dup = self.mac_map.get(_test) + self.assertEquals(_test, _dup) + + def test_ip_map_obfuscate_v4_with_cidr(self): + _test = self.ip_map.get('192.168.1.0/24') + self.assertNotEqual(_test, '192.168.1.0/24') + + def test_ip_map_obfuscate_no_cidr(self): + _test = self.ip_map.get('192.168.2.2') + self.assertNotEqual(_test, '192.168.2.2') + + def test_ip_map_obfuscate_same_subnet(self): + _net = ip_interface(self.ip_map.get('192.168.3.0/24')) + _test = ip_interface(self.ip_map.get('192.168.3.1')) + self.assertTrue(_test.ip in _net.network) + + def test_ip_map_get_same_with_or_without_cidr(self): + _hostwsub = self.ip_map.get('192.168.4.1/24') + _hostnosub = self.ip_map.get('192.168.4.1') + self.assertEqual(_hostwsub.split('/')[0], _hostnosub) + + def test_ip_skip_ignores(self): + _test = self.ip_map.get('127.0.0.1') + self.assertEquals(_test, '127.0.0.1') + + def test_hostname_obfuscate_domain_options(self): + _test = self.host_map.get('www.redhat.com') + self.assertNotEqual(_test, 'www.redhat.com') + + def test_hostname_obfuscate_same_item(self): + _test1 = self.host_map.get('example.redhat.com') + _test2 = self.host_map.get('example.redhat.com') + self.assertEqual(_test1, _test2) + + def test_hostname_obfuscate_just_domain(self): + _test = self.host_map.get('redhat.com') + self.assertEqual(_test, 'obfuscateddomain0.com') + + def test_hostname_no_obfuscate_non_loaded_domain(self): + _test = self.host_map.get('foobar.com') + self.assertEqual(_test, 'foobar.com') + + def test_hostname_no_obfuscate_non_loaded_fqdn(self): + _test = self.host_map.get('example.foobar.com') + self.assertEqual(_test, 'example.foobar.com') + + def test_keyword_single(self): + _test = self.kw_map.get('foobar') + self.assertEqual(_test, 'obfuscatedword0') + + +class CleanerParserTests(unittest.TestCase): + + def setUp(self): + self.ip_parser = SoSIPParser() + self.mac_parser = SoSMacParser() + self.host_parser = SoSHostnameParser(opt_domains='foobar.com') + self.kw_parser = SoSKeywordParser(keywords=['foobar']) + self.kw_parser_none = SoSKeywordParser() + + def test_ip_parser_valid_ipv4_line(self): + line = 'foobar foo 10.0.0.1/24 barfoo bar' + _test = self.ip_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_ip_parser_invalid_ipv4_line(self): + line = 'foobar foo 10.1.2.350 barfoo bar' + self.assertRaises(ValueError, self.ip_parser.parse_line, line) + + def test_ip_parser_package_version_line(self): + line = 'mycoolpackage-1.2.3.4.5' + _test = self.ip_parser.parse_line(line)[0] + self.assertEqual(line, _test) + + def test_mac_parser_valid_ipv4_line(self): + line = 'foobar foo 13:24:35:46:57:68 bar barfoo' + _test = self.mac_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_mac_parser_valid_ipv6_line(self): + line = 'foobar foo AA:BB:CC:FF:FE:DD:EE:FF bar barfoo' + _test = self.mac_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_hostname_load_hostname_string(self): + fqdn = 'myhost.subnet.example.com' + self.host_parser.load_hostname_into_map(fqdn) + + def test_hostname_valid_domain_line(self): + self.host_parser.load_hostname_into_map('myhost.subnet.example.com') + line = 'testing myhost.subnet.example.com in a string' + _test = self.host_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_hostname_short_name_in_line(self): + self.host_parser.load_hostname_into_map('myhost.subnet.example.com') + line = 'testing just myhost in a line' + _test = self.host_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_keyword_parser_valid_line(self): + line = 'this is my foobar test line' + _test = self.kw_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_keyword_parser_no_change_by_default(self): + line = 'this is my foobar test line' + _test = self.kw_parser_none.parse_line(line)[0] + self.assertEqual(line, _test) diff --git a/tests/unittests/importer_tests.py b/tests/unittests/importer_tests.py new file mode 100644 index 00000000..a2dddaba --- /dev/null +++ b/tests/unittests/importer_tests.py @@ -0,0 +1,24 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +from sos.utilities import ImporterHelper + + +class ImporterHelperTests(unittest.TestCase): + + def test_runs(self): + h = ImporterHelper(unittest) + modules = h.get_modules() + self.assertTrue('main' in modules) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/option_tests.py b/tests/unittests/option_tests.py new file mode 100644 index 00000000..58f54e94 --- /dev/null +++ b/tests/unittests/option_tests.py @@ -0,0 +1,49 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +from sos.report.plugins import Plugin +from sos.policies.distros import LinuxPolicy +from sos.policies.init_systems import InitSystem + + +class MockOptions(object): + all_logs = False + dry_run = False + log_size = 25 + allow_system_changes = False + skip_commands = [] + skip_files = [] + + +class GlobalOptionTest(unittest.TestCase): + + def setUp(self): + self.commons = { + 'sysroot': '/', + 'policy': LinuxPolicy(init=InitSystem()), + 'cmdlineopts': MockOptions(), + 'devices': {} + } + self.plugin = Plugin(self.commons) + self.plugin.opt_names = ['baz', 'empty', 'test_option'] + self.plugin.opt_parms = [ + {'enabled': False}, {'enabled': None}, {'enabled': 'foobar'} + ] + + def test_simple_lookup(self): + self.assertEquals(self.plugin.get_option('test_option'), 'foobar') + + def test_cascade(self): + self.assertEquals(self.plugin.get_option(('baz')), False) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/path/to/leaf b/tests/unittests/path/to/leaf new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/path/to/leaf diff --git a/tests/unittests/plugin_tests.py b/tests/unittests/plugin_tests.py new file mode 100644 index 00000000..2f362c94 --- /dev/null +++ b/tests/unittests/plugin_tests.py @@ -0,0 +1,479 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest +import os +import tempfile +import shutil + +from io import StringIO + +from sos.report.plugins import Plugin, regex_findall, _mangle_command +from sos.archive import TarFileArchive +from sos.policies.distros import LinuxPolicy +from sos.policies.init_systems import InitSystem + +PATH = os.path.dirname(__file__) + + +def j(filename): + return os.path.join(PATH, filename) + + +def create_file(size, dir=None): + f = tempfile.NamedTemporaryFile(delete=False, dir=dir) + f.write(b"*" * size * 1024 * 1024) + f.flush() + f.close() + return f.name + + +class MockArchive(TarFileArchive): + + def __init__(self): + self.m = {} + self.strings = {} + + def name(self): + return "mock.archive" + + def add_file(self, src, dest=None): + if not dest: + dest = src + self.m[src] = dest + + def add_string(self, content, dest): + self.m[dest] = content + + def add_link(self, dest, link_name): + pass + + def open_file(self, name): + return open(self.m.get(name), 'r') + + def close(self): + pass + + def compress(self, method): + pass + + +class MockPlugin(Plugin): + + option_list = [("opt", 'an option', 'fast', None), + ("opt2", 'another option', 'fast', False)] + + def setup(self): + pass + + +class NamedMockPlugin(Plugin): + + short_desc = "This plugin has a description." + plugin_name = "testing" + + def setup(self): + pass + + +class PostprocMockPlugin(Plugin): + + did_postproc = False + + def setup(self): + pass + + def postproc(self): + if self.get_option('postproc'): + self.did_postproc = True + + +class ForbiddenMockPlugin(Plugin): + """This plugin has a description.""" + + plugin_name = "forbidden" + + def setup(self): + self.add_copy_spec("tests") + self.add_forbidden_path("tests") + + +class EnablerPlugin(Plugin): + + def is_installed(self, pkg): + return self.is_installed + + +class MockOptions(object): + all_logs = False + dry_run = False + since = None + log_size = 25 + allow_system_changes = False + no_postproc = False + skip_files = [] + skip_commands = [] + + +class PluginToolTests(unittest.TestCase): + + def test_regex_findall(self): + test_s = u"\n".join( + ['this is only a test', 'there are only two lines']) + test_fo = StringIO(test_s) + matches = regex_findall(r".*lines$", test_fo) + self.assertEquals(matches, ['there are only two lines']) + + def test_regex_findall_miss(self): + test_s = u"\n".join( + ['this is only a test', 'there are only two lines']) + test_fo = StringIO(test_s) + matches = regex_findall(r".*not_there$", test_fo) + self.assertEquals(matches, []) + + def test_regex_findall_bad_input(self): + matches = regex_findall(r".*", None) + self.assertEquals(matches, []) + matches = regex_findall(r".*", []) + self.assertEquals(matches, []) + matches = regex_findall(r".*", 1) + self.assertEquals(matches, []) + + def test_mangle_command(self): + name_max = 255 + self.assertEquals("foo", _mangle_command("/usr/bin/foo", name_max)) + self.assertEquals( + "foo_-x", _mangle_command("/usr/bin/foo -x", name_max)) + self.assertEquals( + "foo_--verbose", _mangle_command("/usr/bin/foo --verbose", + name_max)) + self.assertEquals("foo_.path.to.stuff", _mangle_command( + "/usr/bin/foo /path/to/stuff", name_max)) + longcmd = "foo is " + "a" * 256 + " long_command" + expected = longcmd[0:name_max].replace(' ', '_') + self.assertEquals(expected, _mangle_command(longcmd, name_max)) + + +class PluginTests(unittest.TestCase): + + sysroot = os.getcwd() + + def setUp(self): + self.mp = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.mp.archive = MockArchive() + + def test_plugin_default_name(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.name(), "mockplugin") + + def test_plugin_set_name(self): + p = NamedMockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.name(), "testing") + + def test_plugin_no_descrip(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_description(), "<no description available>") + + def test_plugin_has_descrip(self): + p = NamedMockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_description(), + "This plugin has a description.") + + def test_set_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + p.set_option("opt", "testing") + self.assertEquals(p.get_option("opt"), "testing") + + def test_set_nonexistant_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertFalse(p.set_option("badopt", "testing")) + + def test_get_nonexistant_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("badopt"), 0) + + def test_get_unset_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("opt"), 0) + + def test_get_unset_plugin_option_with_default(self): + # this shows that even when we pass in a default to get, + # we'll get the option's default as set in the plugin + # this might not be what we really want + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("opt", True), True) + + def test_get_unset_plugin_option_with_default_not_none(self): + # this shows that even when we pass in a default to get, + # if the plugin default is not None + # we'll get the option's default as set in the plugin + # this might not be what we really want + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("opt2", True), False) + + def test_get_option_as_list_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + p.set_option("opt", "one,two,three") + self.assertEquals(p.get_option_as_list("opt"), ['one', 'two', 'three']) + + def test_get_option_as_list_plugin_option_default(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option_as_list("opt", default=[]), []) + + def test_get_option_as_list_plugin_option_not_list(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + p.set_option("opt", "testing") + self.assertEquals(p.get_option_as_list("opt"), ['testing']) + + def test_copy_dir(self): + self.mp._do_copy_path("tests") + self.assertEquals( + self.mp.archive.m["tests/plugin_tests.py"], + 'tests/plugin_tests.py') + + def test_copy_dir_bad_path(self): + self.mp._do_copy_path("not_here_tests") + self.assertEquals(self.mp.archive.m, {}) + + def test_copy_dir_forbidden_path(self): + p = ForbiddenMockPlugin({ + 'cmdlineopts': MockOptions(), + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'devices': {} + }) + p.archive = MockArchive() + p.setup() + p.collect() + self.assertEquals(p.archive.m, {}) + + def test_postproc_default_on(self): + p = PostprocMockPlugin({ + 'cmdlineopts': MockOptions(), + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem()), + 'devices': {} + }) + p.postproc() + self.assertTrue(p.did_postproc) + + +class AddCopySpecTests(unittest.TestCase): + + expect_paths = set(['tests/tail_test.txt']) + + def setUp(self): + self.mp = MockPlugin({ + 'cmdlineopts': MockOptions(), + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'sysroot': os.getcwd(), + 'devices': {} + }) + self.mp.archive = MockArchive() + + def assert_expect_paths(self): + def pathmunge(path): + if path[0] == '/': + path = path[1:] + return os.path.join(self.mp.sysroot, path) + expected_paths = set(map(pathmunge, self.expect_paths)) + self.assertEquals(self.mp.copy_paths, expected_paths) + + def test_single_file_no_limit(self): + self.mp.add_copy_spec("tests/tail_test.txt") + self.assert_expect_paths() + + def test_single_file_under_limit(self): + self.mp.add_copy_spec("tests/tail_test.txt", 1) + self.assert_expect_paths() + + def test_single_file_over_limit(self): + self.mp.sysroot = '/' + fn = create_file(2) # create 2MB file, consider a context manager + self.mp.add_copy_spec(fn, 1) + content, fname = self.mp.copy_strings[0] + self.assertTrue("tailed" in fname) + self.assertTrue("tmp" in fname) + self.assertTrue("/" not in fname) + self.assertEquals(1024 * 1024, len(content)) + os.unlink(fn) + + def test_bad_filename(self): + self.mp.sysroot = '/' + self.assertFalse(self.mp.add_copy_spec('', 1)) + self.assertFalse(self.mp.add_copy_spec(None, 1)) + + def test_glob_file(self): + self.mp.add_copy_spec('tests/tail_test.*') + self.assert_expect_paths() + + def test_glob_file_limit_no_limit(self): + self.mp.sysroot = '/' + tmpdir = tempfile.mkdtemp() + create_file(2, dir=tmpdir) + create_file(2, dir=tmpdir) + self.mp.add_copy_spec(tmpdir + "/*") + self.assertEquals(len(self.mp.copy_paths), 2) + shutil.rmtree(tmpdir) + + def test_glob_file_over_limit(self): + self.mp.sysroot = '/' + tmpdir = tempfile.mkdtemp() + create_file(2, dir=tmpdir) + create_file(2, dir=tmpdir) + self.mp.add_copy_spec(tmpdir + "/*", 1) + self.assertEquals(len(self.mp.copy_strings), 1) + content, fname = self.mp.copy_strings[0] + self.assertTrue("tailed" in fname) + self.assertEquals(1024 * 1024, len(content)) + shutil.rmtree(tmpdir) + + def test_multiple_files_no_limit(self): + self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt']) + self.assertEquals(len(self.mp.copy_paths), 2) + + def test_multiple_files_under_limit(self): + self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt'], 1) + self.assertEquals(len(self.mp.copy_paths), 2) + + +class CheckEnabledTests(unittest.TestCase): + + def setUp(self): + self.mp = EnablerPlugin({ + 'policy': LinuxPolicy(probe_runtime=False), + 'sysroot': os.getcwd(), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + + def test_checks_for_file(self): + f = j("tail_test.txt") + self.mp.files = (f,) + self.assertTrue(self.mp.check_enabled()) + + def test_checks_for_package(self): + self.mp.packages = ('foo',) + self.assertTrue(self.mp.check_enabled()) + + def test_allows_bad_tuple(self): + f = j("tail_test.txt") + self.mp.files = (f) + self.mp.packages = ('foo') + self.assertTrue(self.mp.check_enabled()) + + def test_enabled_by_default(self): + self.assertTrue(self.mp.check_enabled()) + + +class RegexSubTests(unittest.TestCase): + + def setUp(self): + self.mp = MockPlugin({ + 'cmdlineopts': MockOptions(), + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'sysroot': os.getcwd(), + 'devices': {} + }) + self.mp.archive = MockArchive() + + def test_file_never_copied(self): + self.assertEquals(0, self.mp.do_file_sub( + "never_copied", r"^(.*)$", "foobar")) + + def test_no_replacements(self): + self.mp.add_copy_spec(j("tail_test.txt")) + self.mp.collect() + replacements = self.mp.do_file_sub( + j("tail_test.txt"), r"wont_match", "foobar") + self.assertEquals(0, replacements) + + def test_replacements(self): + # test uses absolute paths + self.mp.sysroot = '/' + self.mp.add_copy_spec(j("tail_test.txt")) + self.mp.collect() + replacements = self.mp.do_file_sub( + j("tail_test.txt"), r"(tail)", "foobar") + self.assertEquals(1, replacements) + self.assertTrue("foobar" in self.mp.archive.m.get(j('tail_test.txt'))) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/policy_tests.py b/tests/unittests/policy_tests.py new file mode 100644 index 00000000..6d0c42b9 --- /dev/null +++ b/tests/unittests/policy_tests.py @@ -0,0 +1,103 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +from sos.policies import Policy, import_policy +from sos.policies.distros import LinuxPolicy +from sos.policies.package_managers import PackageManager +from sos.report.plugins import (Plugin, IndependentPlugin, + RedHatPlugin, DebianPlugin) + + +class FauxPolicy(Policy): + distro = "Faux" + + +class FauxLinuxPolicy(LinuxPolicy): + distro = "FauxLinux" + + @classmethod + def set_forbidden_paths(cls): + return ['/etc/secret'] + + +class FauxPlugin(Plugin, IndependentPlugin): + pass + + +class FauxRedHatPlugin(Plugin, RedHatPlugin): + pass + + +class FauxDebianPlugin(Plugin, DebianPlugin): + pass + + +class PolicyTests(unittest.TestCase): + + + def test_independent_only(self): + p = FauxPolicy() + p.valid_subclasses = [] + + self.assertTrue(p.validate_plugin(FauxPlugin)) + + def test_forbidden_paths_building(self): + p = FauxLinuxPolicy(probe_runtime=False) + self.assertTrue('*.pyc' in p.forbidden_paths) + self.assertTrue('/etc/passwd' in p.forbidden_paths) + self.assertTrue('/etc/secret' in p.forbidden_paths) + + def test_redhat(self): + p = FauxPolicy() + p.valid_subclasses = [RedHatPlugin] + + self.assertTrue(p.validate_plugin(FauxRedHatPlugin)) + + def test_debian(self): + p = FauxPolicy() + p.valid_subclasses = [DebianPlugin] + + self.assertTrue(p.validate_plugin(FauxDebianPlugin)) + + def test_fails(self): + p = FauxPolicy() + p.valid_subclasses = [] + + self.assertFalse(p.validate_plugin(FauxDebianPlugin)) + + def test_can_import(self): + self.assertTrue(import_policy('redhat') is not None) + + def test_cant_import(self): + self.assertTrue(import_policy('notreal') is None) + + +class PackageManagerTests(unittest.TestCase): + + def setUp(self): + self.pm = PackageManager() + + def test_default_all_pkgs(self): + self.assertEquals(self.pm.all_pkgs(), {}) + + def test_default_all_pkgs_by_name(self): + self.assertEquals(self.pm.all_pkgs_by_name('doesntmatter'), []) + + def test_default_all_pkgs_by_name_regex(self): + self.assertEquals( + self.pm.all_pkgs_by_name_regex('.*doesntmatter$'), []) + + def test_default_pkg_by_name(self): + self.assertEquals(self.pm.pkg_by_name('foo'), None) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/report_tests.py b/tests/unittests/report_tests.py new file mode 100644 index 00000000..bb059012 --- /dev/null +++ b/tests/unittests/report_tests.py @@ -0,0 +1,155 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +try: + import json +except ImportError: + import simplejson as json + +from sos.report.reporting import (Report, Section, Command, CopiedFile, + CreatedFile, Alert, PlainTextReport) + + +class ReportTest(unittest.TestCase): + + def test_empty(self): + report = Report() + + expected = json.dumps({}) + + self.assertEquals(expected, str(report)) + + def test_nested_section(self): + report = Report() + section = Section(name="section") + report.add(section) + + expected = json.dumps({"section": {}}) + + self.assertEquals(expected, str(report)) + + def test_multiple_sections(self): + report = Report() + section = Section(name="section") + report.add(section) + + section2 = Section(name="section2") + report.add(section2) + + expected = json.dumps({"section": {}, + "section2": {}, }) + + self.assertEquals(expected, str(report)) + + def test_deeply_nested(self): + report = Report() + section = Section(name="section") + command = Command(name="a command", return_code=0, + href="does/not/matter") + + section.add(command) + report.add(section) + + expected = json.dumps({"section": { + "commands": [{"name": "a command", + "return_code": 0, + "href": "does/not/matter"}]}}) + + self.assertEquals(expected, str(report)) + + +class TestPlainReport(unittest.TestCase): + + def setUp(self): + self.report = Report() + self.section = Section(name="plugin") + self.div = '\n' + PlainTextReport.PLUGDIVIDER + self.pluglist = "Loaded Plugins:\n{pluglist}" + self.defaultheader = u''.join([ + self.pluglist.format(pluglist=" plugin"), + self.div, + "\nplugin\n" + ]) + + def test_basic(self): + self.assertEquals(self.pluglist.format(pluglist=""), + PlainTextReport(self.report).unicode()) + + def test_one_section(self): + self.report.add(self.section) + + self.assertEquals(self.defaultheader, + PlainTextReport(self.report).unicode() + '\n') + + def test_two_sections(self): + section1 = Section(name="first") + section2 = Section(name="second") + self.report.add(section1, section2) + + self.assertEquals(u''.join([ + self.pluglist.format(pluglist=" first second"), + self.div, + "\nfirst", + self.div, + "\nsecond" + ]), + PlainTextReport(self.report).unicode()) + + def test_command(self): + cmd = Command(name="ls -al /foo/bar/baz", + return_code=0, + href="sos_commands/plugin/ls_-al_foo.bar.baz") + self.section.add(cmd) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- commands executed:\n * ls -al /foo/bar/baz" + ]), + PlainTextReport(self.report).unicode()) + + def test_copied_file(self): + cf = CopiedFile(name="/etc/hosts", href="etc/hosts") + self.section.add(cf) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- files copied:\n * /etc/hosts" + ]), + PlainTextReport(self.report).unicode()) + + def test_created_file(self): + crf = CreatedFile(name="sample.txt", + href="../sos_strings/sample/sample.txt") + self.section.add(crf) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- files created:\n * sample.txt" + ]), + PlainTextReport(self.report).unicode()) + + def test_alert(self): + alrt = Alert("this is an alert") + self.section.add(alrt) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- alerts:\n ! this is an alert" + ]), + PlainTextReport(self.report).unicode()) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/sosreport_pexpect.py b/tests/unittests/sosreport_pexpect.py new file mode 100644 index 00000000..3614fa5b --- /dev/null +++ b/tests/unittests/sosreport_pexpect.py @@ -0,0 +1,36 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest +import pexpect + +from os import kill +from signal import SIGINT + + +class PexpectTest(unittest.TestCase): + def test_plugins_install(self): + sos = pexpect.spawn('/usr/sbin/sosreport -l') + try: + sos.expect('plugin.*does not install, skipping') + except pexpect.EOF: + pass + else: + self.fail("a plugin does not install or sosreport is too slow") + kill(sos.pid, SIGINT) + + def test_batchmode_removes_questions(self): + sos = pexpect.spawn('/usr/sbin/sosreport --batch') + grp = sos.expect('send this file to your support representative.', 15) + self.assertEquals(grp, 0) + kill(sos.pid, SIGINT) + + +if __name__ == '__main__': + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/tail_test.txt b/tests/unittests/tail_test.txt new file mode 100644 index 00000000..8def0f72 --- /dev/null +++ b/tests/unittests/tail_test.txt @@ -0,0 +1,4 @@ +this is a file to test tail with +I have a few lines in here +I just need enough text to mess with it +this is the last line diff --git a/tests/unittests/test.txt b/tests/unittests/test.txt new file mode 100644 index 00000000..d95f3ad1 --- /dev/null +++ b/tests/unittests/test.txt @@ -0,0 +1 @@ +content diff --git a/tests/unittests/utilities_tests.py b/tests/unittests/utilities_tests.py new file mode 100644 index 00000000..64be9f1e --- /dev/null +++ b/tests/unittests/utilities_tests.py @@ -0,0 +1,103 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import os.path +import unittest + +# PYCOMPAT +from io import StringIO + +from sos.utilities import (grep, is_executable, sos_get_command_output, + find, tail, shell_out) + +TEST_DIR = os.path.dirname(__file__) + + +class GrepTest(unittest.TestCase): + + def test_file_obj(self): + test_s = u"\n".join( + ['this is only a test', 'there are only two lines']) + test_fo = StringIO(test_s) + matches = grep(".*test$", test_fo) + self.assertEquals(matches, ['this is only a test\n']) + + def test_real_file(self): + matches = grep(".*unittest$", __file__.replace(".pyc", ".py")) + self.assertEquals(matches, ['import unittest\n']) + + def test_open_file(self): + matches = grep(".*unittest$", open(__file__.replace(".pyc", ".py"))) + self.assertEquals(matches, ['import unittest\n']) + + def test_grep_multiple_files(self): + matches = grep(".*unittest$", + __file__.replace(".pyc", ".py"), "does_not_exist.txt") + self.assertEquals(matches, ['import unittest\n']) + + +class TailTest(unittest.TestCase): + + def test_tail(self): + t = tail("tests/tail_test.txt", 10) + self.assertEquals(t, b"last line\n") + + def test_tail_too_many(self): + t = tail("tests/tail_test.txt", 200) + expected = open("tests/tail_test.txt", "r").read() + self.assertEquals(t, str.encode(expected)) + + +class ExecutableTest(unittest.TestCase): + + def test_nonexe_file(self): + path = os.path.join(TEST_DIR, 'utility_tests.py') + self.assertFalse(is_executable(path)) + + def test_exe_file(self): + self.assertTrue(is_executable('true')) + + def test_exe_file_abs_path(self): + self.assertTrue(is_executable("/usr/bin/timeout")) + + def test_output(self): + result = sos_get_command_output("echo executed") + self.assertEquals(result['status'], 0) + self.assertEquals(result['output'], "executed\n") + + def test_output_non_exe(self): + path = os.path.join(TEST_DIR, 'utility_tests.py') + result = sos_get_command_output(path) + self.assertEquals(result['status'], 127) + self.assertEquals(result['output'], b"") + + def test_output_chdir(self): + cmd = "/bin/bash -c 'echo $PWD'" + result = sos_get_command_output(cmd, chdir=TEST_DIR) + print(result) + self.assertEquals(result['status'], 0) + self.assertEquals(result['output'].strip(), TEST_DIR) + + def test_shell_out(self): + self.assertEquals("executed\n", shell_out('echo executed')) + + +class FindTest(unittest.TestCase): + + def test_find_leaf(self): + leaves = find("leaf", TEST_DIR) + self.assertTrue(any(name.endswith("leaf") for name in leaves)) + + def test_too_shallow(self): + leaves = find("leaf", TEST_DIR, max_depth=1) + self.assertFalse(any(name.endswith("leaf") for name in leaves)) + + def test_not_in_pattern(self): + leaves = find("leaf", TEST_DIR, path_pattern="tests/path") + self.assertFalse(any(name.endswith("leaf") for name in leaves)) + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/ziptest b/tests/unittests/ziptest new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/ziptest diff --git a/tests/unittests/ziptest_link b/tests/unittests/ziptest_link new file mode 120000 index 00000000..e99bb13c --- /dev/null +++ b/tests/unittests/ziptest_link @@ -0,0 +1 @@ +ziptest
\ No newline at end of file |