aboutsummaryrefslogtreecommitdiffstats
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/__init__.py0
-rw-r--r--tests/unittests/archive_tests.py120
-rw-r--r--tests/unittests/cleaner_tests.py157
-rw-r--r--tests/unittests/importer_tests.py24
-rw-r--r--tests/unittests/option_tests.py49
-rw-r--r--tests/unittests/path/to/leaf0
-rw-r--r--tests/unittests/plugin_tests.py479
-rw-r--r--tests/unittests/policy_tests.py103
-rw-r--r--tests/unittests/report_tests.py155
-rw-r--r--tests/unittests/sosreport_pexpect.py36
-rw-r--r--tests/unittests/tail_test.txt4
-rw-r--r--tests/unittests/test.txt1
-rw-r--r--tests/unittests/utilities_tests.py103
-rw-r--r--tests/unittests/ziptest0
l---------tests/unittests/ziptest_link1
15 files changed, 1232 insertions, 0 deletions
diff --git a/tests/unittests/__init__.py b/tests/unittests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/__init__.py
diff --git a/tests/unittests/archive_tests.py b/tests/unittests/archive_tests.py
new file mode 100644
index 00000000..320006d0
--- /dev/null
+++ b/tests/unittests/archive_tests.py
@@ -0,0 +1,120 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import unittest
+import os
+import tarfile
+import tempfile
+import shutil
+
+from sos.archive import TarFileArchive
+from sos.utilities import tail
+from sos.policies import Policy
+
+
+class TarFileArchiveTest(unittest.TestCase):
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ enc = {'encrypt': False}
+ self.tf = TarFileArchive('test', self.tmpdir, Policy(), 1, enc, '/')
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def check_for_file(self, filename):
+ rtf = tarfile.open(os.path.join(self.tmpdir, 'test.tar'))
+ rtf.getmember(filename)
+ rtf.close()
+
+ def test_create(self):
+ self.tf.finalize('auto')
+ self.assertTrue(os.path.exists(os.path.join(self.tmpdir,
+ 'test.tar')))
+
+ def test_add_file(self):
+ self.tf.add_file('tests/ziptest')
+ self.tf.finalize('auto')
+
+ self.check_for_file('test/tests/ziptest')
+
+ def test_add_node_dev_null(self):
+ st = os.lstat('/dev/null')
+ dev_maj = os.major(st.st_rdev)
+ dev_min = os.minor(st.st_rdev)
+ self.tf.add_node('/dev/null', st.st_mode, os.makedev(dev_maj, dev_min))
+
+ # when the string comes from tail() output
+ def test_add_string_from_file(self):
+ self.copy_strings = []
+ testfile = tempfile.NamedTemporaryFile(dir=self.tmpdir, delete=False)
+ testfile.write(b"*" * 1000)
+ testfile.flush()
+ testfile.close()
+
+ self.copy_strings.append((tail(testfile.name, 100), 'string_test.txt'))
+ self.tf.add_string(self.copy_strings[0][0], 'tests/string_test.txt')
+ self.tf.finalize('auto')
+
+# Since commit 179d9bb add_file does not support recursive directory
+# addition. Disable this test for now.
+# def test_add_dir(self):
+# self.tf.add_file('tests/')
+# self.tf.close()
+#
+# self.check_for_file('test/tests/ziptest')
+
+ def test_add_renamed(self):
+ self.tf.add_file('tests/ziptest', dest='tests/ziptest_renamed')
+ self.tf.finalize('auto')
+
+ self.check_for_file('test/tests/ziptest_renamed')
+
+# Since commit 179d9bb add_file does not support recursive directory
+# addition. Disable this test for now.
+# def test_add_renamed_dir(self):
+# self.tf.add_file('tests/', 'tests_renamed/')
+# self.tf.close()
+#
+# self.check_for_file('test/tests_renamed/ziptest')
+
+ def test_add_string(self):
+ self.tf.add_string('this is content', 'tests/string_test.txt')
+ self.tf.finalize('auto')
+
+ self.check_for_file('test/tests/string_test.txt')
+
+ def test_get_file(self):
+ self.tf.add_string('this is my content', 'tests/string_test.txt')
+
+ afp = self.tf.open_file('tests/string_test.txt')
+ self.assertEquals('this is my content', afp.read())
+
+ def test_rewrite_file(self):
+ """Test that re-writing a file with add_string() modifies the content.
+ """
+ self.tf.add_string('this is my content', 'tests/string_test.txt')
+ self.tf.add_string('this is my new content', 'tests/string_test.txt')
+
+ afp = self.tf.open_file('tests/string_test.txt')
+ self.assertEquals('this is my new content', afp.read())
+
+ def test_make_link(self):
+ self.tf.add_file('tests/ziptest')
+ self.tf.add_link('tests/ziptest', 'link_name')
+
+ self.tf.finalize('auto')
+ self.check_for_file('test/link_name')
+
+ def test_compress(self):
+ self.tf.finalize("auto")
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/cleaner_tests.py b/tests/unittests/cleaner_tests.py
new file mode 100644
index 00000000..5510dd80
--- /dev/null
+++ b/tests/unittests/cleaner_tests.py
@@ -0,0 +1,157 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+
+import unittest
+
+from ipaddress import ip_interface
+from sos.cleaner.parsers.ip_parser import SoSIPParser
+from sos.cleaner.parsers.mac_parser import SoSMacParser
+from sos.cleaner.parsers.hostname_parser import SoSHostnameParser
+from sos.cleaner.parsers.keyword_parser import SoSKeywordParser
+from sos.cleaner.mappings.ip_map import SoSIPMap
+from sos.cleaner.mappings.mac_map import SoSMacMap
+from sos.cleaner.mappings.hostname_map import SoSHostnameMap
+from sos.cleaner.mappings.keyword_map import SoSKeywordMap
+
+
+class CleanerMapTests(unittest.TestCase):
+
+ def setUp(self):
+ self.mac_map = SoSMacMap()
+ self.ip_map = SoSIPMap()
+ self.host_map = SoSHostnameMap()
+ self.host_map.load_domains_from_options(['redhat.com'])
+ self.kw_map = SoSKeywordMap()
+
+ def test_mac_map_obfuscate_valid_v4(self):
+ _test = self.mac_map.get('12:34:56:78:90:ab')
+ self.assertNotEqual(_test, '12:34:56:78:90:ab')
+
+ def test_mac_map_obfuscate_valid_v6(self):
+ _test = self.mac_map.get('12:34:56:ff:fe:78:90:ab')
+ self.assertNotEqual(_test, '12:34:56:ff:fe:78:90:ab')
+
+ def test_mac_map_obfuscate_valid_v6_quad(self):
+ _test = self.mac_map.get('1234:56ff:fe78:90ab')
+ self.assertNotEqual(_test, '1234:56ff:fe78:90ab')
+
+ def test_mac_map_skip_ignores(self):
+ _test = self.mac_map.get('ff:ff:ff:ff:ff:ff')
+ self.assertEquals(_test, 'ff:ff:ff:ff:ff:ff')
+
+ def test_mac_map_avoid_duplicate_obfuscation(self):
+ _test = self.mac_map.get('ab:cd:ef:fe:dc:ba')
+ _dup = self.mac_map.get(_test)
+ self.assertEquals(_test, _dup)
+
+ def test_ip_map_obfuscate_v4_with_cidr(self):
+ _test = self.ip_map.get('192.168.1.0/24')
+ self.assertNotEqual(_test, '192.168.1.0/24')
+
+ def test_ip_map_obfuscate_no_cidr(self):
+ _test = self.ip_map.get('192.168.2.2')
+ self.assertNotEqual(_test, '192.168.2.2')
+
+ def test_ip_map_obfuscate_same_subnet(self):
+ _net = ip_interface(self.ip_map.get('192.168.3.0/24'))
+ _test = ip_interface(self.ip_map.get('192.168.3.1'))
+ self.assertTrue(_test.ip in _net.network)
+
+ def test_ip_map_get_same_with_or_without_cidr(self):
+ _hostwsub = self.ip_map.get('192.168.4.1/24')
+ _hostnosub = self.ip_map.get('192.168.4.1')
+ self.assertEqual(_hostwsub.split('/')[0], _hostnosub)
+
+ def test_ip_skip_ignores(self):
+ _test = self.ip_map.get('127.0.0.1')
+ self.assertEquals(_test, '127.0.0.1')
+
+ def test_hostname_obfuscate_domain_options(self):
+ _test = self.host_map.get('www.redhat.com')
+ self.assertNotEqual(_test, 'www.redhat.com')
+
+ def test_hostname_obfuscate_same_item(self):
+ _test1 = self.host_map.get('example.redhat.com')
+ _test2 = self.host_map.get('example.redhat.com')
+ self.assertEqual(_test1, _test2)
+
+ def test_hostname_obfuscate_just_domain(self):
+ _test = self.host_map.get('redhat.com')
+ self.assertEqual(_test, 'obfuscateddomain0.com')
+
+ def test_hostname_no_obfuscate_non_loaded_domain(self):
+ _test = self.host_map.get('foobar.com')
+ self.assertEqual(_test, 'foobar.com')
+
+ def test_hostname_no_obfuscate_non_loaded_fqdn(self):
+ _test = self.host_map.get('example.foobar.com')
+ self.assertEqual(_test, 'example.foobar.com')
+
+ def test_keyword_single(self):
+ _test = self.kw_map.get('foobar')
+ self.assertEqual(_test, 'obfuscatedword0')
+
+
+class CleanerParserTests(unittest.TestCase):
+
+ def setUp(self):
+ self.ip_parser = SoSIPParser()
+ self.mac_parser = SoSMacParser()
+ self.host_parser = SoSHostnameParser(opt_domains='foobar.com')
+ self.kw_parser = SoSKeywordParser(keywords=['foobar'])
+ self.kw_parser_none = SoSKeywordParser()
+
+ def test_ip_parser_valid_ipv4_line(self):
+ line = 'foobar foo 10.0.0.1/24 barfoo bar'
+ _test = self.ip_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)
+
+ def test_ip_parser_invalid_ipv4_line(self):
+ line = 'foobar foo 10.1.2.350 barfoo bar'
+ self.assertRaises(ValueError, self.ip_parser.parse_line, line)
+
+ def test_ip_parser_package_version_line(self):
+ line = 'mycoolpackage-1.2.3.4.5'
+ _test = self.ip_parser.parse_line(line)[0]
+ self.assertEqual(line, _test)
+
+ def test_mac_parser_valid_ipv4_line(self):
+ line = 'foobar foo 13:24:35:46:57:68 bar barfoo'
+ _test = self.mac_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)
+
+ def test_mac_parser_valid_ipv6_line(self):
+ line = 'foobar foo AA:BB:CC:FF:FE:DD:EE:FF bar barfoo'
+ _test = self.mac_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)
+
+ def test_hostname_load_hostname_string(self):
+ fqdn = 'myhost.subnet.example.com'
+ self.host_parser.load_hostname_into_map(fqdn)
+
+ def test_hostname_valid_domain_line(self):
+ self.host_parser.load_hostname_into_map('myhost.subnet.example.com')
+ line = 'testing myhost.subnet.example.com in a string'
+ _test = self.host_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)
+
+ def test_hostname_short_name_in_line(self):
+ self.host_parser.load_hostname_into_map('myhost.subnet.example.com')
+ line = 'testing just myhost in a line'
+ _test = self.host_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)
+
+ def test_keyword_parser_valid_line(self):
+ line = 'this is my foobar test line'
+ _test = self.kw_parser.parse_line(line)[0]
+ self.assertNotEqual(line, _test)
+
+ def test_keyword_parser_no_change_by_default(self):
+ line = 'this is my foobar test line'
+ _test = self.kw_parser_none.parse_line(line)[0]
+ self.assertEqual(line, _test)
diff --git a/tests/unittests/importer_tests.py b/tests/unittests/importer_tests.py
new file mode 100644
index 00000000..a2dddaba
--- /dev/null
+++ b/tests/unittests/importer_tests.py
@@ -0,0 +1,24 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import unittest
+
+from sos.utilities import ImporterHelper
+
+
+class ImporterHelperTests(unittest.TestCase):
+
+ def test_runs(self):
+ h = ImporterHelper(unittest)
+ modules = h.get_modules()
+ self.assertTrue('main' in modules)
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/option_tests.py b/tests/unittests/option_tests.py
new file mode 100644
index 00000000..58f54e94
--- /dev/null
+++ b/tests/unittests/option_tests.py
@@ -0,0 +1,49 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import unittest
+
+from sos.report.plugins import Plugin
+from sos.policies.distros import LinuxPolicy
+from sos.policies.init_systems import InitSystem
+
+
+class MockOptions(object):
+ all_logs = False
+ dry_run = False
+ log_size = 25
+ allow_system_changes = False
+ skip_commands = []
+ skip_files = []
+
+
+class GlobalOptionTest(unittest.TestCase):
+
+ def setUp(self):
+ self.commons = {
+ 'sysroot': '/',
+ 'policy': LinuxPolicy(init=InitSystem()),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ }
+ self.plugin = Plugin(self.commons)
+ self.plugin.opt_names = ['baz', 'empty', 'test_option']
+ self.plugin.opt_parms = [
+ {'enabled': False}, {'enabled': None}, {'enabled': 'foobar'}
+ ]
+
+ def test_simple_lookup(self):
+ self.assertEquals(self.plugin.get_option('test_option'), 'foobar')
+
+ def test_cascade(self):
+ self.assertEquals(self.plugin.get_option(('baz')), False)
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/path/to/leaf b/tests/unittests/path/to/leaf
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/path/to/leaf
diff --git a/tests/unittests/plugin_tests.py b/tests/unittests/plugin_tests.py
new file mode 100644
index 00000000..2f362c94
--- /dev/null
+++ b/tests/unittests/plugin_tests.py
@@ -0,0 +1,479 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import unittest
+import os
+import tempfile
+import shutil
+
+from io import StringIO
+
+from sos.report.plugins import Plugin, regex_findall, _mangle_command
+from sos.archive import TarFileArchive
+from sos.policies.distros import LinuxPolicy
+from sos.policies.init_systems import InitSystem
+
+PATH = os.path.dirname(__file__)
+
+
+def j(filename):
+ return os.path.join(PATH, filename)
+
+
+def create_file(size, dir=None):
+ f = tempfile.NamedTemporaryFile(delete=False, dir=dir)
+ f.write(b"*" * size * 1024 * 1024)
+ f.flush()
+ f.close()
+ return f.name
+
+
+class MockArchive(TarFileArchive):
+
+ def __init__(self):
+ self.m = {}
+ self.strings = {}
+
+ def name(self):
+ return "mock.archive"
+
+ def add_file(self, src, dest=None):
+ if not dest:
+ dest = src
+ self.m[src] = dest
+
+ def add_string(self, content, dest):
+ self.m[dest] = content
+
+ def add_link(self, dest, link_name):
+ pass
+
+ def open_file(self, name):
+ return open(self.m.get(name), 'r')
+
+ def close(self):
+ pass
+
+ def compress(self, method):
+ pass
+
+
+class MockPlugin(Plugin):
+
+ option_list = [("opt", 'an option', 'fast', None),
+ ("opt2", 'another option', 'fast', False)]
+
+ def setup(self):
+ pass
+
+
+class NamedMockPlugin(Plugin):
+
+ short_desc = "This plugin has a description."
+ plugin_name = "testing"
+
+ def setup(self):
+ pass
+
+
+class PostprocMockPlugin(Plugin):
+
+ did_postproc = False
+
+ def setup(self):
+ pass
+
+ def postproc(self):
+ if self.get_option('postproc'):
+ self.did_postproc = True
+
+
+class ForbiddenMockPlugin(Plugin):
+ """This plugin has a description."""
+
+ plugin_name = "forbidden"
+
+ def setup(self):
+ self.add_copy_spec("tests")
+ self.add_forbidden_path("tests")
+
+
+class EnablerPlugin(Plugin):
+
+ def is_installed(self, pkg):
+ return self.is_installed
+
+
+class MockOptions(object):
+ all_logs = False
+ dry_run = False
+ since = None
+ log_size = 25
+ allow_system_changes = False
+ no_postproc = False
+ skip_files = []
+ skip_commands = []
+
+
+class PluginToolTests(unittest.TestCase):
+
+ def test_regex_findall(self):
+ test_s = u"\n".join(
+ ['this is only a test', 'there are only two lines'])
+ test_fo = StringIO(test_s)
+ matches = regex_findall(r".*lines$", test_fo)
+ self.assertEquals(matches, ['there are only two lines'])
+
+ def test_regex_findall_miss(self):
+ test_s = u"\n".join(
+ ['this is only a test', 'there are only two lines'])
+ test_fo = StringIO(test_s)
+ matches = regex_findall(r".*not_there$", test_fo)
+ self.assertEquals(matches, [])
+
+ def test_regex_findall_bad_input(self):
+ matches = regex_findall(r".*", None)
+ self.assertEquals(matches, [])
+ matches = regex_findall(r".*", [])
+ self.assertEquals(matches, [])
+ matches = regex_findall(r".*", 1)
+ self.assertEquals(matches, [])
+
+ def test_mangle_command(self):
+ name_max = 255
+ self.assertEquals("foo", _mangle_command("/usr/bin/foo", name_max))
+ self.assertEquals(
+ "foo_-x", _mangle_command("/usr/bin/foo -x", name_max))
+ self.assertEquals(
+ "foo_--verbose", _mangle_command("/usr/bin/foo --verbose",
+ name_max))
+ self.assertEquals("foo_.path.to.stuff", _mangle_command(
+ "/usr/bin/foo /path/to/stuff", name_max))
+ longcmd = "foo is " + "a" * 256 + " long_command"
+ expected = longcmd[0:name_max].replace(' ', '_')
+ self.assertEquals(expected, _mangle_command(longcmd, name_max))
+
+
+class PluginTests(unittest.TestCase):
+
+ sysroot = os.getcwd()
+
+ def setUp(self):
+ self.mp = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.mp.archive = MockArchive()
+
+ def test_plugin_default_name(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.name(), "mockplugin")
+
+ def test_plugin_set_name(self):
+ p = NamedMockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.name(), "testing")
+
+ def test_plugin_no_descrip(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.get_description(), "<no description available>")
+
+ def test_plugin_has_descrip(self):
+ p = NamedMockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.get_description(),
+ "This plugin has a description.")
+
+ def test_set_plugin_option(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ p.set_option("opt", "testing")
+ self.assertEquals(p.get_option("opt"), "testing")
+
+ def test_set_nonexistant_plugin_option(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertFalse(p.set_option("badopt", "testing"))
+
+ def test_get_nonexistant_plugin_option(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.get_option("badopt"), 0)
+
+ def test_get_unset_plugin_option(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.get_option("opt"), 0)
+
+ def test_get_unset_plugin_option_with_default(self):
+ # this shows that even when we pass in a default to get,
+ # we'll get the option's default as set in the plugin
+ # this might not be what we really want
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.get_option("opt", True), True)
+
+ def test_get_unset_plugin_option_with_default_not_none(self):
+ # this shows that even when we pass in a default to get,
+ # if the plugin default is not None
+ # we'll get the option's default as set in the plugin
+ # this might not be what we really want
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.get_option("opt2", True), False)
+
+ def test_get_option_as_list_plugin_option(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ p.set_option("opt", "one,two,three")
+ self.assertEquals(p.get_option_as_list("opt"), ['one', 'two', 'three'])
+
+ def test_get_option_as_list_plugin_option_default(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ self.assertEquals(p.get_option_as_list("opt", default=[]), [])
+
+ def test_get_option_as_list_plugin_option_not_list(self):
+ p = MockPlugin({
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+ p.set_option("opt", "testing")
+ self.assertEquals(p.get_option_as_list("opt"), ['testing'])
+
+ def test_copy_dir(self):
+ self.mp._do_copy_path("tests")
+ self.assertEquals(
+ self.mp.archive.m["tests/plugin_tests.py"],
+ 'tests/plugin_tests.py')
+
+ def test_copy_dir_bad_path(self):
+ self.mp._do_copy_path("not_here_tests")
+ self.assertEquals(self.mp.archive.m, {})
+
+ def test_copy_dir_forbidden_path(self):
+ p = ForbiddenMockPlugin({
+ 'cmdlineopts': MockOptions(),
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'devices': {}
+ })
+ p.archive = MockArchive()
+ p.setup()
+ p.collect()
+ self.assertEquals(p.archive.m, {})
+
+ def test_postproc_default_on(self):
+ p = PostprocMockPlugin({
+ 'cmdlineopts': MockOptions(),
+ 'sysroot': self.sysroot,
+ 'policy': LinuxPolicy(init=InitSystem()),
+ 'devices': {}
+ })
+ p.postproc()
+ self.assertTrue(p.did_postproc)
+
+
+class AddCopySpecTests(unittest.TestCase):
+
+ expect_paths = set(['tests/tail_test.txt'])
+
+ def setUp(self):
+ self.mp = MockPlugin({
+ 'cmdlineopts': MockOptions(),
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'sysroot': os.getcwd(),
+ 'devices': {}
+ })
+ self.mp.archive = MockArchive()
+
+ def assert_expect_paths(self):
+ def pathmunge(path):
+ if path[0] == '/':
+ path = path[1:]
+ return os.path.join(self.mp.sysroot, path)
+ expected_paths = set(map(pathmunge, self.expect_paths))
+ self.assertEquals(self.mp.copy_paths, expected_paths)
+
+ def test_single_file_no_limit(self):
+ self.mp.add_copy_spec("tests/tail_test.txt")
+ self.assert_expect_paths()
+
+ def test_single_file_under_limit(self):
+ self.mp.add_copy_spec("tests/tail_test.txt", 1)
+ self.assert_expect_paths()
+
+ def test_single_file_over_limit(self):
+ self.mp.sysroot = '/'
+ fn = create_file(2) # create 2MB file, consider a context manager
+ self.mp.add_copy_spec(fn, 1)
+ content, fname = self.mp.copy_strings[0]
+ self.assertTrue("tailed" in fname)
+ self.assertTrue("tmp" in fname)
+ self.assertTrue("/" not in fname)
+ self.assertEquals(1024 * 1024, len(content))
+ os.unlink(fn)
+
+ def test_bad_filename(self):
+ self.mp.sysroot = '/'
+ self.assertFalse(self.mp.add_copy_spec('', 1))
+ self.assertFalse(self.mp.add_copy_spec(None, 1))
+
+ def test_glob_file(self):
+ self.mp.add_copy_spec('tests/tail_test.*')
+ self.assert_expect_paths()
+
+ def test_glob_file_limit_no_limit(self):
+ self.mp.sysroot = '/'
+ tmpdir = tempfile.mkdtemp()
+ create_file(2, dir=tmpdir)
+ create_file(2, dir=tmpdir)
+ self.mp.add_copy_spec(tmpdir + "/*")
+ self.assertEquals(len(self.mp.copy_paths), 2)
+ shutil.rmtree(tmpdir)
+
+ def test_glob_file_over_limit(self):
+ self.mp.sysroot = '/'
+ tmpdir = tempfile.mkdtemp()
+ create_file(2, dir=tmpdir)
+ create_file(2, dir=tmpdir)
+ self.mp.add_copy_spec(tmpdir + "/*", 1)
+ self.assertEquals(len(self.mp.copy_strings), 1)
+ content, fname = self.mp.copy_strings[0]
+ self.assertTrue("tailed" in fname)
+ self.assertEquals(1024 * 1024, len(content))
+ shutil.rmtree(tmpdir)
+
+ def test_multiple_files_no_limit(self):
+ self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt'])
+ self.assertEquals(len(self.mp.copy_paths), 2)
+
+ def test_multiple_files_under_limit(self):
+ self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt'], 1)
+ self.assertEquals(len(self.mp.copy_paths), 2)
+
+
+class CheckEnabledTests(unittest.TestCase):
+
+ def setUp(self):
+ self.mp = EnablerPlugin({
+ 'policy': LinuxPolicy(probe_runtime=False),
+ 'sysroot': os.getcwd(),
+ 'cmdlineopts': MockOptions(),
+ 'devices': {}
+ })
+
+ def test_checks_for_file(self):
+ f = j("tail_test.txt")
+ self.mp.files = (f,)
+ self.assertTrue(self.mp.check_enabled())
+
+ def test_checks_for_package(self):
+ self.mp.packages = ('foo',)
+ self.assertTrue(self.mp.check_enabled())
+
+ def test_allows_bad_tuple(self):
+ f = j("tail_test.txt")
+ self.mp.files = (f)
+ self.mp.packages = ('foo')
+ self.assertTrue(self.mp.check_enabled())
+
+ def test_enabled_by_default(self):
+ self.assertTrue(self.mp.check_enabled())
+
+
+class RegexSubTests(unittest.TestCase):
+
+ def setUp(self):
+ self.mp = MockPlugin({
+ 'cmdlineopts': MockOptions(),
+ 'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
+ 'sysroot': os.getcwd(),
+ 'devices': {}
+ })
+ self.mp.archive = MockArchive()
+
+ def test_file_never_copied(self):
+ self.assertEquals(0, self.mp.do_file_sub(
+ "never_copied", r"^(.*)$", "foobar"))
+
+ def test_no_replacements(self):
+ self.mp.add_copy_spec(j("tail_test.txt"))
+ self.mp.collect()
+ replacements = self.mp.do_file_sub(
+ j("tail_test.txt"), r"wont_match", "foobar")
+ self.assertEquals(0, replacements)
+
+ def test_replacements(self):
+ # test uses absolute paths
+ self.mp.sysroot = '/'
+ self.mp.add_copy_spec(j("tail_test.txt"))
+ self.mp.collect()
+ replacements = self.mp.do_file_sub(
+ j("tail_test.txt"), r"(tail)", "foobar")
+ self.assertEquals(1, replacements)
+ self.assertTrue("foobar" in self.mp.archive.m.get(j('tail_test.txt')))
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/policy_tests.py b/tests/unittests/policy_tests.py
new file mode 100644
index 00000000..6d0c42b9
--- /dev/null
+++ b/tests/unittests/policy_tests.py
@@ -0,0 +1,103 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import unittest
+
+from sos.policies import Policy, import_policy
+from sos.policies.distros import LinuxPolicy
+from sos.policies.package_managers import PackageManager
+from sos.report.plugins import (Plugin, IndependentPlugin,
+ RedHatPlugin, DebianPlugin)
+
+
+class FauxPolicy(Policy):
+ distro = "Faux"
+
+
+class FauxLinuxPolicy(LinuxPolicy):
+ distro = "FauxLinux"
+
+ @classmethod
+ def set_forbidden_paths(cls):
+ return ['/etc/secret']
+
+
+class FauxPlugin(Plugin, IndependentPlugin):
+ pass
+
+
+class FauxRedHatPlugin(Plugin, RedHatPlugin):
+ pass
+
+
+class FauxDebianPlugin(Plugin, DebianPlugin):
+ pass
+
+
+class PolicyTests(unittest.TestCase):
+
+
+ def test_independent_only(self):
+ p = FauxPolicy()
+ p.valid_subclasses = []
+
+ self.assertTrue(p.validate_plugin(FauxPlugin))
+
+ def test_forbidden_paths_building(self):
+ p = FauxLinuxPolicy(probe_runtime=False)
+ self.assertTrue('*.pyc' in p.forbidden_paths)
+ self.assertTrue('/etc/passwd' in p.forbidden_paths)
+ self.assertTrue('/etc/secret' in p.forbidden_paths)
+
+ def test_redhat(self):
+ p = FauxPolicy()
+ p.valid_subclasses = [RedHatPlugin]
+
+ self.assertTrue(p.validate_plugin(FauxRedHatPlugin))
+
+ def test_debian(self):
+ p = FauxPolicy()
+ p.valid_subclasses = [DebianPlugin]
+
+ self.assertTrue(p.validate_plugin(FauxDebianPlugin))
+
+ def test_fails(self):
+ p = FauxPolicy()
+ p.valid_subclasses = []
+
+ self.assertFalse(p.validate_plugin(FauxDebianPlugin))
+
+ def test_can_import(self):
+ self.assertTrue(import_policy('redhat') is not None)
+
+ def test_cant_import(self):
+ self.assertTrue(import_policy('notreal') is None)
+
+
+class PackageManagerTests(unittest.TestCase):
+
+ def setUp(self):
+ self.pm = PackageManager()
+
+ def test_default_all_pkgs(self):
+ self.assertEquals(self.pm.all_pkgs(), {})
+
+ def test_default_all_pkgs_by_name(self):
+ self.assertEquals(self.pm.all_pkgs_by_name('doesntmatter'), [])
+
+ def test_default_all_pkgs_by_name_regex(self):
+ self.assertEquals(
+ self.pm.all_pkgs_by_name_regex('.*doesntmatter$'), [])
+
+ def test_default_pkg_by_name(self):
+ self.assertEquals(self.pm.pkg_by_name('foo'), None)
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/report_tests.py b/tests/unittests/report_tests.py
new file mode 100644
index 00000000..bb059012
--- /dev/null
+++ b/tests/unittests/report_tests.py
@@ -0,0 +1,155 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import unittest
+
+try:
+ import json
+except ImportError:
+ import simplejson as json
+
+from sos.report.reporting import (Report, Section, Command, CopiedFile,
+ CreatedFile, Alert, PlainTextReport)
+
+
+class ReportTest(unittest.TestCase):
+
+ def test_empty(self):
+ report = Report()
+
+ expected = json.dumps({})
+
+ self.assertEquals(expected, str(report))
+
+ def test_nested_section(self):
+ report = Report()
+ section = Section(name="section")
+ report.add(section)
+
+ expected = json.dumps({"section": {}})
+
+ self.assertEquals(expected, str(report))
+
+ def test_multiple_sections(self):
+ report = Report()
+ section = Section(name="section")
+ report.add(section)
+
+ section2 = Section(name="section2")
+ report.add(section2)
+
+ expected = json.dumps({"section": {},
+ "section2": {}, })
+
+ self.assertEquals(expected, str(report))
+
+ def test_deeply_nested(self):
+ report = Report()
+ section = Section(name="section")
+ command = Command(name="a command", return_code=0,
+ href="does/not/matter")
+
+ section.add(command)
+ report.add(section)
+
+ expected = json.dumps({"section": {
+ "commands": [{"name": "a command",
+ "return_code": 0,
+ "href": "does/not/matter"}]}})
+
+ self.assertEquals(expected, str(report))
+
+
+class TestPlainReport(unittest.TestCase):
+
+ def setUp(self):
+ self.report = Report()
+ self.section = Section(name="plugin")
+ self.div = '\n' + PlainTextReport.PLUGDIVIDER
+ self.pluglist = "Loaded Plugins:\n{pluglist}"
+ self.defaultheader = u''.join([
+ self.pluglist.format(pluglist=" plugin"),
+ self.div,
+ "\nplugin\n"
+ ])
+
+ def test_basic(self):
+ self.assertEquals(self.pluglist.format(pluglist=""),
+ PlainTextReport(self.report).unicode())
+
+ def test_one_section(self):
+ self.report.add(self.section)
+
+ self.assertEquals(self.defaultheader,
+ PlainTextReport(self.report).unicode() + '\n')
+
+ def test_two_sections(self):
+ section1 = Section(name="first")
+ section2 = Section(name="second")
+ self.report.add(section1, section2)
+
+ self.assertEquals(u''.join([
+ self.pluglist.format(pluglist=" first second"),
+ self.div,
+ "\nfirst",
+ self.div,
+ "\nsecond"
+ ]),
+ PlainTextReport(self.report).unicode())
+
+ def test_command(self):
+ cmd = Command(name="ls -al /foo/bar/baz",
+ return_code=0,
+ href="sos_commands/plugin/ls_-al_foo.bar.baz")
+ self.section.add(cmd)
+ self.report.add(self.section)
+
+ self.assertEquals(u''.join([
+ self.defaultheader,
+ "- commands executed:\n * ls -al /foo/bar/baz"
+ ]),
+ PlainTextReport(self.report).unicode())
+
+ def test_copied_file(self):
+ cf = CopiedFile(name="/etc/hosts", href="etc/hosts")
+ self.section.add(cf)
+ self.report.add(self.section)
+
+ self.assertEquals(u''.join([
+ self.defaultheader,
+ "- files copied:\n * /etc/hosts"
+ ]),
+ PlainTextReport(self.report).unicode())
+
+ def test_created_file(self):
+ crf = CreatedFile(name="sample.txt",
+ href="../sos_strings/sample/sample.txt")
+ self.section.add(crf)
+ self.report.add(self.section)
+
+ self.assertEquals(u''.join([
+ self.defaultheader,
+ "- files created:\n * sample.txt"
+ ]),
+ PlainTextReport(self.report).unicode())
+
+ def test_alert(self):
+ alrt = Alert("this is an alert")
+ self.section.add(alrt)
+ self.report.add(self.section)
+
+ self.assertEquals(u''.join([
+ self.defaultheader,
+ "- alerts:\n ! this is an alert"
+ ]),
+ PlainTextReport(self.report).unicode())
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/sosreport_pexpect.py b/tests/unittests/sosreport_pexpect.py
new file mode 100644
index 00000000..3614fa5b
--- /dev/null
+++ b/tests/unittests/sosreport_pexpect.py
@@ -0,0 +1,36 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import unittest
+import pexpect
+
+from os import kill
+from signal import SIGINT
+
+
+class PexpectTest(unittest.TestCase):
+ def test_plugins_install(self):
+ sos = pexpect.spawn('/usr/sbin/sosreport -l')
+ try:
+ sos.expect('plugin.*does not install, skipping')
+ except pexpect.EOF:
+ pass
+ else:
+ self.fail("a plugin does not install or sosreport is too slow")
+ kill(sos.pid, SIGINT)
+
+ def test_batchmode_removes_questions(self):
+ sos = pexpect.spawn('/usr/sbin/sosreport --batch')
+ grp = sos.expect('send this file to your support representative.', 15)
+ self.assertEquals(grp, 0)
+ kill(sos.pid, SIGINT)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/tail_test.txt b/tests/unittests/tail_test.txt
new file mode 100644
index 00000000..8def0f72
--- /dev/null
+++ b/tests/unittests/tail_test.txt
@@ -0,0 +1,4 @@
+this is a file to test tail with
+I have a few lines in here
+I just need enough text to mess with it
+this is the last line
diff --git a/tests/unittests/test.txt b/tests/unittests/test.txt
new file mode 100644
index 00000000..d95f3ad1
--- /dev/null
+++ b/tests/unittests/test.txt
@@ -0,0 +1 @@
+content
diff --git a/tests/unittests/utilities_tests.py b/tests/unittests/utilities_tests.py
new file mode 100644
index 00000000..64be9f1e
--- /dev/null
+++ b/tests/unittests/utilities_tests.py
@@ -0,0 +1,103 @@
+# This file is part of the sos project: https://github.com/sosreport/sos
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions of
+# version 2 of the GNU General Public License.
+#
+# See the LICENSE file in the source distribution for further information.
+import os.path
+import unittest
+
+# PYCOMPAT
+from io import StringIO
+
+from sos.utilities import (grep, is_executable, sos_get_command_output,
+ find, tail, shell_out)
+
+TEST_DIR = os.path.dirname(__file__)
+
+
+class GrepTest(unittest.TestCase):
+
+ def test_file_obj(self):
+ test_s = u"\n".join(
+ ['this is only a test', 'there are only two lines'])
+ test_fo = StringIO(test_s)
+ matches = grep(".*test$", test_fo)
+ self.assertEquals(matches, ['this is only a test\n'])
+
+ def test_real_file(self):
+ matches = grep(".*unittest$", __file__.replace(".pyc", ".py"))
+ self.assertEquals(matches, ['import unittest\n'])
+
+ def test_open_file(self):
+ matches = grep(".*unittest$", open(__file__.replace(".pyc", ".py")))
+ self.assertEquals(matches, ['import unittest\n'])
+
+ def test_grep_multiple_files(self):
+ matches = grep(".*unittest$",
+ __file__.replace(".pyc", ".py"), "does_not_exist.txt")
+ self.assertEquals(matches, ['import unittest\n'])
+
+
+class TailTest(unittest.TestCase):
+
+ def test_tail(self):
+ t = tail("tests/tail_test.txt", 10)
+ self.assertEquals(t, b"last line\n")
+
+ def test_tail_too_many(self):
+ t = tail("tests/tail_test.txt", 200)
+ expected = open("tests/tail_test.txt", "r").read()
+ self.assertEquals(t, str.encode(expected))
+
+
+class ExecutableTest(unittest.TestCase):
+
+ def test_nonexe_file(self):
+ path = os.path.join(TEST_DIR, 'utility_tests.py')
+ self.assertFalse(is_executable(path))
+
+ def test_exe_file(self):
+ self.assertTrue(is_executable('true'))
+
+ def test_exe_file_abs_path(self):
+ self.assertTrue(is_executable("/usr/bin/timeout"))
+
+ def test_output(self):
+ result = sos_get_command_output("echo executed")
+ self.assertEquals(result['status'], 0)
+ self.assertEquals(result['output'], "executed\n")
+
+ def test_output_non_exe(self):
+ path = os.path.join(TEST_DIR, 'utility_tests.py')
+ result = sos_get_command_output(path)
+ self.assertEquals(result['status'], 127)
+ self.assertEquals(result['output'], b"")
+
+ def test_output_chdir(self):
+ cmd = "/bin/bash -c 'echo $PWD'"
+ result = sos_get_command_output(cmd, chdir=TEST_DIR)
+ print(result)
+ self.assertEquals(result['status'], 0)
+ self.assertEquals(result['output'].strip(), TEST_DIR)
+
+ def test_shell_out(self):
+ self.assertEquals("executed\n", shell_out('echo executed'))
+
+
+class FindTest(unittest.TestCase):
+
+ def test_find_leaf(self):
+ leaves = find("leaf", TEST_DIR)
+ self.assertTrue(any(name.endswith("leaf") for name in leaves))
+
+ def test_too_shallow(self):
+ leaves = find("leaf", TEST_DIR, max_depth=1)
+ self.assertFalse(any(name.endswith("leaf") for name in leaves))
+
+ def test_not_in_pattern(self):
+ leaves = find("leaf", TEST_DIR, path_pattern="tests/path")
+ self.assertFalse(any(name.endswith("leaf") for name in leaves))
+
+# vim: set et ts=4 sw=4 :
diff --git a/tests/unittests/ziptest b/tests/unittests/ziptest
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/ziptest
diff --git a/tests/unittests/ziptest_link b/tests/unittests/ziptest_link
new file mode 120000
index 00000000..e99bb13c
--- /dev/null
+++ b/tests/unittests/ziptest_link
@@ -0,0 +1 @@
+ziptest \ No newline at end of file