diff options
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/archive_tests.py | 120 | ||||
-rw-r--r-- | tests/unittests/cleaner_tests.py | 157 | ||||
-rw-r--r-- | tests/unittests/importer_tests.py | 24 | ||||
-rw-r--r-- | tests/unittests/option_tests.py | 49 | ||||
-rw-r--r-- | tests/unittests/path/to/leaf | 0 | ||||
-rw-r--r-- | tests/unittests/plugin_tests.py | 479 | ||||
-rw-r--r-- | tests/unittests/policy_tests.py | 103 | ||||
-rw-r--r-- | tests/unittests/report_tests.py | 155 | ||||
-rw-r--r-- | tests/unittests/sosreport_pexpect.py | 36 | ||||
-rw-r--r-- | tests/unittests/tail_test.txt | 4 | ||||
-rw-r--r-- | tests/unittests/test.txt | 1 | ||||
-rw-r--r-- | tests/unittests/utilities_tests.py | 103 | ||||
-rw-r--r-- | tests/unittests/ziptest | 0 | ||||
l--------- | tests/unittests/ziptest_link | 1 |
15 files changed, 1232 insertions, 0 deletions
diff --git a/tests/unittests/__init__.py b/tests/unittests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/__init__.py diff --git a/tests/unittests/archive_tests.py b/tests/unittests/archive_tests.py new file mode 100644 index 00000000..320006d0 --- /dev/null +++ b/tests/unittests/archive_tests.py @@ -0,0 +1,120 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest +import os +import tarfile +import tempfile +import shutil + +from sos.archive import TarFileArchive +from sos.utilities import tail +from sos.policies import Policy + + +class TarFileArchiveTest(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + enc = {'encrypt': False} + self.tf = TarFileArchive('test', self.tmpdir, Policy(), 1, enc, '/') + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def check_for_file(self, filename): + rtf = tarfile.open(os.path.join(self.tmpdir, 'test.tar')) + rtf.getmember(filename) + rtf.close() + + def test_create(self): + self.tf.finalize('auto') + self.assertTrue(os.path.exists(os.path.join(self.tmpdir, + 'test.tar'))) + + def test_add_file(self): + self.tf.add_file('tests/ziptest') + self.tf.finalize('auto') + + self.check_for_file('test/tests/ziptest') + + def test_add_node_dev_null(self): + st = os.lstat('/dev/null') + dev_maj = os.major(st.st_rdev) + dev_min = os.minor(st.st_rdev) + self.tf.add_node('/dev/null', st.st_mode, os.makedev(dev_maj, dev_min)) + + # when the string comes from tail() output + def test_add_string_from_file(self): + self.copy_strings = [] + testfile = tempfile.NamedTemporaryFile(dir=self.tmpdir, delete=False) + testfile.write(b"*" * 1000) + testfile.flush() + testfile.close() + + self.copy_strings.append((tail(testfile.name, 100), 'string_test.txt')) + self.tf.add_string(self.copy_strings[0][0], 'tests/string_test.txt') + self.tf.finalize('auto') + +# Since commit 179d9bb add_file does not support recursive directory +# addition. Disable this test for now. +# def test_add_dir(self): +# self.tf.add_file('tests/') +# self.tf.close() +# +# self.check_for_file('test/tests/ziptest') + + def test_add_renamed(self): + self.tf.add_file('tests/ziptest', dest='tests/ziptest_renamed') + self.tf.finalize('auto') + + self.check_for_file('test/tests/ziptest_renamed') + +# Since commit 179d9bb add_file does not support recursive directory +# addition. Disable this test for now. +# def test_add_renamed_dir(self): +# self.tf.add_file('tests/', 'tests_renamed/') +# self.tf.close() +# +# self.check_for_file('test/tests_renamed/ziptest') + + def test_add_string(self): + self.tf.add_string('this is content', 'tests/string_test.txt') + self.tf.finalize('auto') + + self.check_for_file('test/tests/string_test.txt') + + def test_get_file(self): + self.tf.add_string('this is my content', 'tests/string_test.txt') + + afp = self.tf.open_file('tests/string_test.txt') + self.assertEquals('this is my content', afp.read()) + + def test_rewrite_file(self): + """Test that re-writing a file with add_string() modifies the content. + """ + self.tf.add_string('this is my content', 'tests/string_test.txt') + self.tf.add_string('this is my new content', 'tests/string_test.txt') + + afp = self.tf.open_file('tests/string_test.txt') + self.assertEquals('this is my new content', afp.read()) + + def test_make_link(self): + self.tf.add_file('tests/ziptest') + self.tf.add_link('tests/ziptest', 'link_name') + + self.tf.finalize('auto') + self.check_for_file('test/link_name') + + def test_compress(self): + self.tf.finalize("auto") + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/cleaner_tests.py b/tests/unittests/cleaner_tests.py new file mode 100644 index 00000000..5510dd80 --- /dev/null +++ b/tests/unittests/cleaner_tests.py @@ -0,0 +1,157 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + +import unittest + +from ipaddress import ip_interface +from sos.cleaner.parsers.ip_parser import SoSIPParser +from sos.cleaner.parsers.mac_parser import SoSMacParser +from sos.cleaner.parsers.hostname_parser import SoSHostnameParser +from sos.cleaner.parsers.keyword_parser import SoSKeywordParser +from sos.cleaner.mappings.ip_map import SoSIPMap +from sos.cleaner.mappings.mac_map import SoSMacMap +from sos.cleaner.mappings.hostname_map import SoSHostnameMap +from sos.cleaner.mappings.keyword_map import SoSKeywordMap + + +class CleanerMapTests(unittest.TestCase): + + def setUp(self): + self.mac_map = SoSMacMap() + self.ip_map = SoSIPMap() + self.host_map = SoSHostnameMap() + self.host_map.load_domains_from_options(['redhat.com']) + self.kw_map = SoSKeywordMap() + + def test_mac_map_obfuscate_valid_v4(self): + _test = self.mac_map.get('12:34:56:78:90:ab') + self.assertNotEqual(_test, '12:34:56:78:90:ab') + + def test_mac_map_obfuscate_valid_v6(self): + _test = self.mac_map.get('12:34:56:ff:fe:78:90:ab') + self.assertNotEqual(_test, '12:34:56:ff:fe:78:90:ab') + + def test_mac_map_obfuscate_valid_v6_quad(self): + _test = self.mac_map.get('1234:56ff:fe78:90ab') + self.assertNotEqual(_test, '1234:56ff:fe78:90ab') + + def test_mac_map_skip_ignores(self): + _test = self.mac_map.get('ff:ff:ff:ff:ff:ff') + self.assertEquals(_test, 'ff:ff:ff:ff:ff:ff') + + def test_mac_map_avoid_duplicate_obfuscation(self): + _test = self.mac_map.get('ab:cd:ef:fe:dc:ba') + _dup = self.mac_map.get(_test) + self.assertEquals(_test, _dup) + + def test_ip_map_obfuscate_v4_with_cidr(self): + _test = self.ip_map.get('192.168.1.0/24') + self.assertNotEqual(_test, '192.168.1.0/24') + + def test_ip_map_obfuscate_no_cidr(self): + _test = self.ip_map.get('192.168.2.2') + self.assertNotEqual(_test, '192.168.2.2') + + def test_ip_map_obfuscate_same_subnet(self): + _net = ip_interface(self.ip_map.get('192.168.3.0/24')) + _test = ip_interface(self.ip_map.get('192.168.3.1')) + self.assertTrue(_test.ip in _net.network) + + def test_ip_map_get_same_with_or_without_cidr(self): + _hostwsub = self.ip_map.get('192.168.4.1/24') + _hostnosub = self.ip_map.get('192.168.4.1') + self.assertEqual(_hostwsub.split('/')[0], _hostnosub) + + def test_ip_skip_ignores(self): + _test = self.ip_map.get('127.0.0.1') + self.assertEquals(_test, '127.0.0.1') + + def test_hostname_obfuscate_domain_options(self): + _test = self.host_map.get('www.redhat.com') + self.assertNotEqual(_test, 'www.redhat.com') + + def test_hostname_obfuscate_same_item(self): + _test1 = self.host_map.get('example.redhat.com') + _test2 = self.host_map.get('example.redhat.com') + self.assertEqual(_test1, _test2) + + def test_hostname_obfuscate_just_domain(self): + _test = self.host_map.get('redhat.com') + self.assertEqual(_test, 'obfuscateddomain0.com') + + def test_hostname_no_obfuscate_non_loaded_domain(self): + _test = self.host_map.get('foobar.com') + self.assertEqual(_test, 'foobar.com') + + def test_hostname_no_obfuscate_non_loaded_fqdn(self): + _test = self.host_map.get('example.foobar.com') + self.assertEqual(_test, 'example.foobar.com') + + def test_keyword_single(self): + _test = self.kw_map.get('foobar') + self.assertEqual(_test, 'obfuscatedword0') + + +class CleanerParserTests(unittest.TestCase): + + def setUp(self): + self.ip_parser = SoSIPParser() + self.mac_parser = SoSMacParser() + self.host_parser = SoSHostnameParser(opt_domains='foobar.com') + self.kw_parser = SoSKeywordParser(keywords=['foobar']) + self.kw_parser_none = SoSKeywordParser() + + def test_ip_parser_valid_ipv4_line(self): + line = 'foobar foo 10.0.0.1/24 barfoo bar' + _test = self.ip_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_ip_parser_invalid_ipv4_line(self): + line = 'foobar foo 10.1.2.350 barfoo bar' + self.assertRaises(ValueError, self.ip_parser.parse_line, line) + + def test_ip_parser_package_version_line(self): + line = 'mycoolpackage-1.2.3.4.5' + _test = self.ip_parser.parse_line(line)[0] + self.assertEqual(line, _test) + + def test_mac_parser_valid_ipv4_line(self): + line = 'foobar foo 13:24:35:46:57:68 bar barfoo' + _test = self.mac_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_mac_parser_valid_ipv6_line(self): + line = 'foobar foo AA:BB:CC:FF:FE:DD:EE:FF bar barfoo' + _test = self.mac_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_hostname_load_hostname_string(self): + fqdn = 'myhost.subnet.example.com' + self.host_parser.load_hostname_into_map(fqdn) + + def test_hostname_valid_domain_line(self): + self.host_parser.load_hostname_into_map('myhost.subnet.example.com') + line = 'testing myhost.subnet.example.com in a string' + _test = self.host_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_hostname_short_name_in_line(self): + self.host_parser.load_hostname_into_map('myhost.subnet.example.com') + line = 'testing just myhost in a line' + _test = self.host_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_keyword_parser_valid_line(self): + line = 'this is my foobar test line' + _test = self.kw_parser.parse_line(line)[0] + self.assertNotEqual(line, _test) + + def test_keyword_parser_no_change_by_default(self): + line = 'this is my foobar test line' + _test = self.kw_parser_none.parse_line(line)[0] + self.assertEqual(line, _test) diff --git a/tests/unittests/importer_tests.py b/tests/unittests/importer_tests.py new file mode 100644 index 00000000..a2dddaba --- /dev/null +++ b/tests/unittests/importer_tests.py @@ -0,0 +1,24 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +from sos.utilities import ImporterHelper + + +class ImporterHelperTests(unittest.TestCase): + + def test_runs(self): + h = ImporterHelper(unittest) + modules = h.get_modules() + self.assertTrue('main' in modules) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/option_tests.py b/tests/unittests/option_tests.py new file mode 100644 index 00000000..58f54e94 --- /dev/null +++ b/tests/unittests/option_tests.py @@ -0,0 +1,49 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +from sos.report.plugins import Plugin +from sos.policies.distros import LinuxPolicy +from sos.policies.init_systems import InitSystem + + +class MockOptions(object): + all_logs = False + dry_run = False + log_size = 25 + allow_system_changes = False + skip_commands = [] + skip_files = [] + + +class GlobalOptionTest(unittest.TestCase): + + def setUp(self): + self.commons = { + 'sysroot': '/', + 'policy': LinuxPolicy(init=InitSystem()), + 'cmdlineopts': MockOptions(), + 'devices': {} + } + self.plugin = Plugin(self.commons) + self.plugin.opt_names = ['baz', 'empty', 'test_option'] + self.plugin.opt_parms = [ + {'enabled': False}, {'enabled': None}, {'enabled': 'foobar'} + ] + + def test_simple_lookup(self): + self.assertEquals(self.plugin.get_option('test_option'), 'foobar') + + def test_cascade(self): + self.assertEquals(self.plugin.get_option(('baz')), False) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/path/to/leaf b/tests/unittests/path/to/leaf new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/path/to/leaf diff --git a/tests/unittests/plugin_tests.py b/tests/unittests/plugin_tests.py new file mode 100644 index 00000000..2f362c94 --- /dev/null +++ b/tests/unittests/plugin_tests.py @@ -0,0 +1,479 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest +import os +import tempfile +import shutil + +from io import StringIO + +from sos.report.plugins import Plugin, regex_findall, _mangle_command +from sos.archive import TarFileArchive +from sos.policies.distros import LinuxPolicy +from sos.policies.init_systems import InitSystem + +PATH = os.path.dirname(__file__) + + +def j(filename): + return os.path.join(PATH, filename) + + +def create_file(size, dir=None): + f = tempfile.NamedTemporaryFile(delete=False, dir=dir) + f.write(b"*" * size * 1024 * 1024) + f.flush() + f.close() + return f.name + + +class MockArchive(TarFileArchive): + + def __init__(self): + self.m = {} + self.strings = {} + + def name(self): + return "mock.archive" + + def add_file(self, src, dest=None): + if not dest: + dest = src + self.m[src] = dest + + def add_string(self, content, dest): + self.m[dest] = content + + def add_link(self, dest, link_name): + pass + + def open_file(self, name): + return open(self.m.get(name), 'r') + + def close(self): + pass + + def compress(self, method): + pass + + +class MockPlugin(Plugin): + + option_list = [("opt", 'an option', 'fast', None), + ("opt2", 'another option', 'fast', False)] + + def setup(self): + pass + + +class NamedMockPlugin(Plugin): + + short_desc = "This plugin has a description." + plugin_name = "testing" + + def setup(self): + pass + + +class PostprocMockPlugin(Plugin): + + did_postproc = False + + def setup(self): + pass + + def postproc(self): + if self.get_option('postproc'): + self.did_postproc = True + + +class ForbiddenMockPlugin(Plugin): + """This plugin has a description.""" + + plugin_name = "forbidden" + + def setup(self): + self.add_copy_spec("tests") + self.add_forbidden_path("tests") + + +class EnablerPlugin(Plugin): + + def is_installed(self, pkg): + return self.is_installed + + +class MockOptions(object): + all_logs = False + dry_run = False + since = None + log_size = 25 + allow_system_changes = False + no_postproc = False + skip_files = [] + skip_commands = [] + + +class PluginToolTests(unittest.TestCase): + + def test_regex_findall(self): + test_s = u"\n".join( + ['this is only a test', 'there are only two lines']) + test_fo = StringIO(test_s) + matches = regex_findall(r".*lines$", test_fo) + self.assertEquals(matches, ['there are only two lines']) + + def test_regex_findall_miss(self): + test_s = u"\n".join( + ['this is only a test', 'there are only two lines']) + test_fo = StringIO(test_s) + matches = regex_findall(r".*not_there$", test_fo) + self.assertEquals(matches, []) + + def test_regex_findall_bad_input(self): + matches = regex_findall(r".*", None) + self.assertEquals(matches, []) + matches = regex_findall(r".*", []) + self.assertEquals(matches, []) + matches = regex_findall(r".*", 1) + self.assertEquals(matches, []) + + def test_mangle_command(self): + name_max = 255 + self.assertEquals("foo", _mangle_command("/usr/bin/foo", name_max)) + self.assertEquals( + "foo_-x", _mangle_command("/usr/bin/foo -x", name_max)) + self.assertEquals( + "foo_--verbose", _mangle_command("/usr/bin/foo --verbose", + name_max)) + self.assertEquals("foo_.path.to.stuff", _mangle_command( + "/usr/bin/foo /path/to/stuff", name_max)) + longcmd = "foo is " + "a" * 256 + " long_command" + expected = longcmd[0:name_max].replace(' ', '_') + self.assertEquals(expected, _mangle_command(longcmd, name_max)) + + +class PluginTests(unittest.TestCase): + + sysroot = os.getcwd() + + def setUp(self): + self.mp = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.mp.archive = MockArchive() + + def test_plugin_default_name(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.name(), "mockplugin") + + def test_plugin_set_name(self): + p = NamedMockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.name(), "testing") + + def test_plugin_no_descrip(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_description(), "<no description available>") + + def test_plugin_has_descrip(self): + p = NamedMockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_description(), + "This plugin has a description.") + + def test_set_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + p.set_option("opt", "testing") + self.assertEquals(p.get_option("opt"), "testing") + + def test_set_nonexistant_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertFalse(p.set_option("badopt", "testing")) + + def test_get_nonexistant_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("badopt"), 0) + + def test_get_unset_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("opt"), 0) + + def test_get_unset_plugin_option_with_default(self): + # this shows that even when we pass in a default to get, + # we'll get the option's default as set in the plugin + # this might not be what we really want + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("opt", True), True) + + def test_get_unset_plugin_option_with_default_not_none(self): + # this shows that even when we pass in a default to get, + # if the plugin default is not None + # we'll get the option's default as set in the plugin + # this might not be what we really want + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option("opt2", True), False) + + def test_get_option_as_list_plugin_option(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + p.set_option("opt", "one,two,three") + self.assertEquals(p.get_option_as_list("opt"), ['one', 'two', 'three']) + + def test_get_option_as_list_plugin_option_default(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + self.assertEquals(p.get_option_as_list("opt", default=[]), []) + + def test_get_option_as_list_plugin_option_not_list(self): + p = MockPlugin({ + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + p.set_option("opt", "testing") + self.assertEquals(p.get_option_as_list("opt"), ['testing']) + + def test_copy_dir(self): + self.mp._do_copy_path("tests") + self.assertEquals( + self.mp.archive.m["tests/plugin_tests.py"], + 'tests/plugin_tests.py') + + def test_copy_dir_bad_path(self): + self.mp._do_copy_path("not_here_tests") + self.assertEquals(self.mp.archive.m, {}) + + def test_copy_dir_forbidden_path(self): + p = ForbiddenMockPlugin({ + 'cmdlineopts': MockOptions(), + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'devices': {} + }) + p.archive = MockArchive() + p.setup() + p.collect() + self.assertEquals(p.archive.m, {}) + + def test_postproc_default_on(self): + p = PostprocMockPlugin({ + 'cmdlineopts': MockOptions(), + 'sysroot': self.sysroot, + 'policy': LinuxPolicy(init=InitSystem()), + 'devices': {} + }) + p.postproc() + self.assertTrue(p.did_postproc) + + +class AddCopySpecTests(unittest.TestCase): + + expect_paths = set(['tests/tail_test.txt']) + + def setUp(self): + self.mp = MockPlugin({ + 'cmdlineopts': MockOptions(), + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'sysroot': os.getcwd(), + 'devices': {} + }) + self.mp.archive = MockArchive() + + def assert_expect_paths(self): + def pathmunge(path): + if path[0] == '/': + path = path[1:] + return os.path.join(self.mp.sysroot, path) + expected_paths = set(map(pathmunge, self.expect_paths)) + self.assertEquals(self.mp.copy_paths, expected_paths) + + def test_single_file_no_limit(self): + self.mp.add_copy_spec("tests/tail_test.txt") + self.assert_expect_paths() + + def test_single_file_under_limit(self): + self.mp.add_copy_spec("tests/tail_test.txt", 1) + self.assert_expect_paths() + + def test_single_file_over_limit(self): + self.mp.sysroot = '/' + fn = create_file(2) # create 2MB file, consider a context manager + self.mp.add_copy_spec(fn, 1) + content, fname = self.mp.copy_strings[0] + self.assertTrue("tailed" in fname) + self.assertTrue("tmp" in fname) + self.assertTrue("/" not in fname) + self.assertEquals(1024 * 1024, len(content)) + os.unlink(fn) + + def test_bad_filename(self): + self.mp.sysroot = '/' + self.assertFalse(self.mp.add_copy_spec('', 1)) + self.assertFalse(self.mp.add_copy_spec(None, 1)) + + def test_glob_file(self): + self.mp.add_copy_spec('tests/tail_test.*') + self.assert_expect_paths() + + def test_glob_file_limit_no_limit(self): + self.mp.sysroot = '/' + tmpdir = tempfile.mkdtemp() + create_file(2, dir=tmpdir) + create_file(2, dir=tmpdir) + self.mp.add_copy_spec(tmpdir + "/*") + self.assertEquals(len(self.mp.copy_paths), 2) + shutil.rmtree(tmpdir) + + def test_glob_file_over_limit(self): + self.mp.sysroot = '/' + tmpdir = tempfile.mkdtemp() + create_file(2, dir=tmpdir) + create_file(2, dir=tmpdir) + self.mp.add_copy_spec(tmpdir + "/*", 1) + self.assertEquals(len(self.mp.copy_strings), 1) + content, fname = self.mp.copy_strings[0] + self.assertTrue("tailed" in fname) + self.assertEquals(1024 * 1024, len(content)) + shutil.rmtree(tmpdir) + + def test_multiple_files_no_limit(self): + self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt']) + self.assertEquals(len(self.mp.copy_paths), 2) + + def test_multiple_files_under_limit(self): + self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt'], 1) + self.assertEquals(len(self.mp.copy_paths), 2) + + +class CheckEnabledTests(unittest.TestCase): + + def setUp(self): + self.mp = EnablerPlugin({ + 'policy': LinuxPolicy(probe_runtime=False), + 'sysroot': os.getcwd(), + 'cmdlineopts': MockOptions(), + 'devices': {} + }) + + def test_checks_for_file(self): + f = j("tail_test.txt") + self.mp.files = (f,) + self.assertTrue(self.mp.check_enabled()) + + def test_checks_for_package(self): + self.mp.packages = ('foo',) + self.assertTrue(self.mp.check_enabled()) + + def test_allows_bad_tuple(self): + f = j("tail_test.txt") + self.mp.files = (f) + self.mp.packages = ('foo') + self.assertTrue(self.mp.check_enabled()) + + def test_enabled_by_default(self): + self.assertTrue(self.mp.check_enabled()) + + +class RegexSubTests(unittest.TestCase): + + def setUp(self): + self.mp = MockPlugin({ + 'cmdlineopts': MockOptions(), + 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False), + 'sysroot': os.getcwd(), + 'devices': {} + }) + self.mp.archive = MockArchive() + + def test_file_never_copied(self): + self.assertEquals(0, self.mp.do_file_sub( + "never_copied", r"^(.*)$", "foobar")) + + def test_no_replacements(self): + self.mp.add_copy_spec(j("tail_test.txt")) + self.mp.collect() + replacements = self.mp.do_file_sub( + j("tail_test.txt"), r"wont_match", "foobar") + self.assertEquals(0, replacements) + + def test_replacements(self): + # test uses absolute paths + self.mp.sysroot = '/' + self.mp.add_copy_spec(j("tail_test.txt")) + self.mp.collect() + replacements = self.mp.do_file_sub( + j("tail_test.txt"), r"(tail)", "foobar") + self.assertEquals(1, replacements) + self.assertTrue("foobar" in self.mp.archive.m.get(j('tail_test.txt'))) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/policy_tests.py b/tests/unittests/policy_tests.py new file mode 100644 index 00000000..6d0c42b9 --- /dev/null +++ b/tests/unittests/policy_tests.py @@ -0,0 +1,103 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +from sos.policies import Policy, import_policy +from sos.policies.distros import LinuxPolicy +from sos.policies.package_managers import PackageManager +from sos.report.plugins import (Plugin, IndependentPlugin, + RedHatPlugin, DebianPlugin) + + +class FauxPolicy(Policy): + distro = "Faux" + + +class FauxLinuxPolicy(LinuxPolicy): + distro = "FauxLinux" + + @classmethod + def set_forbidden_paths(cls): + return ['/etc/secret'] + + +class FauxPlugin(Plugin, IndependentPlugin): + pass + + +class FauxRedHatPlugin(Plugin, RedHatPlugin): + pass + + +class FauxDebianPlugin(Plugin, DebianPlugin): + pass + + +class PolicyTests(unittest.TestCase): + + + def test_independent_only(self): + p = FauxPolicy() + p.valid_subclasses = [] + + self.assertTrue(p.validate_plugin(FauxPlugin)) + + def test_forbidden_paths_building(self): + p = FauxLinuxPolicy(probe_runtime=False) + self.assertTrue('*.pyc' in p.forbidden_paths) + self.assertTrue('/etc/passwd' in p.forbidden_paths) + self.assertTrue('/etc/secret' in p.forbidden_paths) + + def test_redhat(self): + p = FauxPolicy() + p.valid_subclasses = [RedHatPlugin] + + self.assertTrue(p.validate_plugin(FauxRedHatPlugin)) + + def test_debian(self): + p = FauxPolicy() + p.valid_subclasses = [DebianPlugin] + + self.assertTrue(p.validate_plugin(FauxDebianPlugin)) + + def test_fails(self): + p = FauxPolicy() + p.valid_subclasses = [] + + self.assertFalse(p.validate_plugin(FauxDebianPlugin)) + + def test_can_import(self): + self.assertTrue(import_policy('redhat') is not None) + + def test_cant_import(self): + self.assertTrue(import_policy('notreal') is None) + + +class PackageManagerTests(unittest.TestCase): + + def setUp(self): + self.pm = PackageManager() + + def test_default_all_pkgs(self): + self.assertEquals(self.pm.all_pkgs(), {}) + + def test_default_all_pkgs_by_name(self): + self.assertEquals(self.pm.all_pkgs_by_name('doesntmatter'), []) + + def test_default_all_pkgs_by_name_regex(self): + self.assertEquals( + self.pm.all_pkgs_by_name_regex('.*doesntmatter$'), []) + + def test_default_pkg_by_name(self): + self.assertEquals(self.pm.pkg_by_name('foo'), None) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/report_tests.py b/tests/unittests/report_tests.py new file mode 100644 index 00000000..bb059012 --- /dev/null +++ b/tests/unittests/report_tests.py @@ -0,0 +1,155 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest + +try: + import json +except ImportError: + import simplejson as json + +from sos.report.reporting import (Report, Section, Command, CopiedFile, + CreatedFile, Alert, PlainTextReport) + + +class ReportTest(unittest.TestCase): + + def test_empty(self): + report = Report() + + expected = json.dumps({}) + + self.assertEquals(expected, str(report)) + + def test_nested_section(self): + report = Report() + section = Section(name="section") + report.add(section) + + expected = json.dumps({"section": {}}) + + self.assertEquals(expected, str(report)) + + def test_multiple_sections(self): + report = Report() + section = Section(name="section") + report.add(section) + + section2 = Section(name="section2") + report.add(section2) + + expected = json.dumps({"section": {}, + "section2": {}, }) + + self.assertEquals(expected, str(report)) + + def test_deeply_nested(self): + report = Report() + section = Section(name="section") + command = Command(name="a command", return_code=0, + href="does/not/matter") + + section.add(command) + report.add(section) + + expected = json.dumps({"section": { + "commands": [{"name": "a command", + "return_code": 0, + "href": "does/not/matter"}]}}) + + self.assertEquals(expected, str(report)) + + +class TestPlainReport(unittest.TestCase): + + def setUp(self): + self.report = Report() + self.section = Section(name="plugin") + self.div = '\n' + PlainTextReport.PLUGDIVIDER + self.pluglist = "Loaded Plugins:\n{pluglist}" + self.defaultheader = u''.join([ + self.pluglist.format(pluglist=" plugin"), + self.div, + "\nplugin\n" + ]) + + def test_basic(self): + self.assertEquals(self.pluglist.format(pluglist=""), + PlainTextReport(self.report).unicode()) + + def test_one_section(self): + self.report.add(self.section) + + self.assertEquals(self.defaultheader, + PlainTextReport(self.report).unicode() + '\n') + + def test_two_sections(self): + section1 = Section(name="first") + section2 = Section(name="second") + self.report.add(section1, section2) + + self.assertEquals(u''.join([ + self.pluglist.format(pluglist=" first second"), + self.div, + "\nfirst", + self.div, + "\nsecond" + ]), + PlainTextReport(self.report).unicode()) + + def test_command(self): + cmd = Command(name="ls -al /foo/bar/baz", + return_code=0, + href="sos_commands/plugin/ls_-al_foo.bar.baz") + self.section.add(cmd) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- commands executed:\n * ls -al /foo/bar/baz" + ]), + PlainTextReport(self.report).unicode()) + + def test_copied_file(self): + cf = CopiedFile(name="/etc/hosts", href="etc/hosts") + self.section.add(cf) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- files copied:\n * /etc/hosts" + ]), + PlainTextReport(self.report).unicode()) + + def test_created_file(self): + crf = CreatedFile(name="sample.txt", + href="../sos_strings/sample/sample.txt") + self.section.add(crf) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- files created:\n * sample.txt" + ]), + PlainTextReport(self.report).unicode()) + + def test_alert(self): + alrt = Alert("this is an alert") + self.section.add(alrt) + self.report.add(self.section) + + self.assertEquals(u''.join([ + self.defaultheader, + "- alerts:\n ! this is an alert" + ]), + PlainTextReport(self.report).unicode()) + + +if __name__ == "__main__": + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/sosreport_pexpect.py b/tests/unittests/sosreport_pexpect.py new file mode 100644 index 00000000..3614fa5b --- /dev/null +++ b/tests/unittests/sosreport_pexpect.py @@ -0,0 +1,36 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import unittest +import pexpect + +from os import kill +from signal import SIGINT + + +class PexpectTest(unittest.TestCase): + def test_plugins_install(self): + sos = pexpect.spawn('/usr/sbin/sosreport -l') + try: + sos.expect('plugin.*does not install, skipping') + except pexpect.EOF: + pass + else: + self.fail("a plugin does not install or sosreport is too slow") + kill(sos.pid, SIGINT) + + def test_batchmode_removes_questions(self): + sos = pexpect.spawn('/usr/sbin/sosreport --batch') + grp = sos.expect('send this file to your support representative.', 15) + self.assertEquals(grp, 0) + kill(sos.pid, SIGINT) + + +if __name__ == '__main__': + unittest.main() + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/tail_test.txt b/tests/unittests/tail_test.txt new file mode 100644 index 00000000..8def0f72 --- /dev/null +++ b/tests/unittests/tail_test.txt @@ -0,0 +1,4 @@ +this is a file to test tail with +I have a few lines in here +I just need enough text to mess with it +this is the last line diff --git a/tests/unittests/test.txt b/tests/unittests/test.txt new file mode 100644 index 00000000..d95f3ad1 --- /dev/null +++ b/tests/unittests/test.txt @@ -0,0 +1 @@ +content diff --git a/tests/unittests/utilities_tests.py b/tests/unittests/utilities_tests.py new file mode 100644 index 00000000..64be9f1e --- /dev/null +++ b/tests/unittests/utilities_tests.py @@ -0,0 +1,103 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. +import os.path +import unittest + +# PYCOMPAT +from io import StringIO + +from sos.utilities import (grep, is_executable, sos_get_command_output, + find, tail, shell_out) + +TEST_DIR = os.path.dirname(__file__) + + +class GrepTest(unittest.TestCase): + + def test_file_obj(self): + test_s = u"\n".join( + ['this is only a test', 'there are only two lines']) + test_fo = StringIO(test_s) + matches = grep(".*test$", test_fo) + self.assertEquals(matches, ['this is only a test\n']) + + def test_real_file(self): + matches = grep(".*unittest$", __file__.replace(".pyc", ".py")) + self.assertEquals(matches, ['import unittest\n']) + + def test_open_file(self): + matches = grep(".*unittest$", open(__file__.replace(".pyc", ".py"))) + self.assertEquals(matches, ['import unittest\n']) + + def test_grep_multiple_files(self): + matches = grep(".*unittest$", + __file__.replace(".pyc", ".py"), "does_not_exist.txt") + self.assertEquals(matches, ['import unittest\n']) + + +class TailTest(unittest.TestCase): + + def test_tail(self): + t = tail("tests/tail_test.txt", 10) + self.assertEquals(t, b"last line\n") + + def test_tail_too_many(self): + t = tail("tests/tail_test.txt", 200) + expected = open("tests/tail_test.txt", "r").read() + self.assertEquals(t, str.encode(expected)) + + +class ExecutableTest(unittest.TestCase): + + def test_nonexe_file(self): + path = os.path.join(TEST_DIR, 'utility_tests.py') + self.assertFalse(is_executable(path)) + + def test_exe_file(self): + self.assertTrue(is_executable('true')) + + def test_exe_file_abs_path(self): + self.assertTrue(is_executable("/usr/bin/timeout")) + + def test_output(self): + result = sos_get_command_output("echo executed") + self.assertEquals(result['status'], 0) + self.assertEquals(result['output'], "executed\n") + + def test_output_non_exe(self): + path = os.path.join(TEST_DIR, 'utility_tests.py') + result = sos_get_command_output(path) + self.assertEquals(result['status'], 127) + self.assertEquals(result['output'], b"") + + def test_output_chdir(self): + cmd = "/bin/bash -c 'echo $PWD'" + result = sos_get_command_output(cmd, chdir=TEST_DIR) + print(result) + self.assertEquals(result['status'], 0) + self.assertEquals(result['output'].strip(), TEST_DIR) + + def test_shell_out(self): + self.assertEquals("executed\n", shell_out('echo executed')) + + +class FindTest(unittest.TestCase): + + def test_find_leaf(self): + leaves = find("leaf", TEST_DIR) + self.assertTrue(any(name.endswith("leaf") for name in leaves)) + + def test_too_shallow(self): + leaves = find("leaf", TEST_DIR, max_depth=1) + self.assertFalse(any(name.endswith("leaf") for name in leaves)) + + def test_not_in_pattern(self): + leaves = find("leaf", TEST_DIR, path_pattern="tests/path") + self.assertFalse(any(name.endswith("leaf") for name in leaves)) + +# vim: set et ts=4 sw=4 : diff --git a/tests/unittests/ziptest b/tests/unittests/ziptest new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/ziptest diff --git a/tests/unittests/ziptest_link b/tests/unittests/ziptest_link new file mode 120000 index 00000000..e99bb13c --- /dev/null +++ b/tests/unittests/ziptest_link @@ -0,0 +1 @@ +ziptest
\ No newline at end of file |