aboutsummaryrefslogblamecommitdiffstats
path: root/tests/plugin_tests.py
blob: 548020509f8e37f7a722e1b0ef6631b964551c15 (plain) (tree)
1
2
3
4
5
6
7
8
9
10






                                                                          


               
             
 
                       
 
                                                                     
                                      

                                                


                                
 


                                       
 
                                





                                                          
 
                                  






























                                          
                                                      
                                                             





                              
 
                                                 





                           











                                       





                                        
                                   
                                        



                            
                                



                                
                    
                   
                
                 
                                
                       

                   




                                         

                                                                




                                                                

                                                                











                                                         
                                  

                                                                           







                                                                      

                                                                       
 


                                     

                         

                              
                                    
                                                                          

                                         



                                       


                                                                          

                                         
          


                                                 


                                                                          

                                         
          


                                              


                                                                          

                                         
          

                                                                            
                                      


                                                                          

                                         
          

                                                           

                                     


                                                                          

                                         
          

                                                         

                                                 


                                                                          

                                         
          
                                                           

                                                 


                                                                          

                                         
          
                                                    

                                           


                                                                          

                                         
          
                                                 




                                                                


                                                                          

                                         
          
                                                          





                                                                 


                                                                          

                                         
          
                                                            

                                                    


                                                                          

                                         
          

                                                                               

                                                            


                                                                          

                                         
          
                                                                      

                                                             


                                                                          

                                         
          

                                                                   

                            
                                      


                                                       
 
                                     
                                               



                                                
                                         
                                    

                                                                          


                                 
                   

                                          



                                         

                                                     



                                       
 


                                               


                              
                                         
                                                                          
                                   
                         


                                       
                                  





                                                               
 
                                        
                                                    

                                  
                                           
                                                       

                                  
                                          
                             
                                                                          
                                    
                                                
                                          
                                       




                                                    
                             

                                                        
 



                                                  


                                            

                                  
                                            


                                                     
                                        
                             
                                   

                                  
                                               

                                                       

                                                    
                             
 
                                           
                                                                        


                                                     
                                                                           

                                                     



                                           
                                 
                                                       
                                   

                                         
          



                                   
                                                


                                      
                                                




                                    
                                                

                                      
                                                





                                       
                                         
                                                                          

                                   



                                       

                                                 

                                   
                                                 
                         

                                                        


                                          

                                  
                                                 
                         

                                                    


                                                                              
 

                          
 
                         
# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.
import unittest
import os
import tempfile
import shutil

from io import StringIO

from sos.report.plugins import Plugin, regex_findall, _mangle_command
from sos.archive import TarFileArchive
from sos.policies.distros import LinuxPolicy
from sos.policies.init_systems import InitSystem

PATH = os.path.dirname(__file__)


def j(filename):
    return os.path.join(PATH, filename)


def create_file(size, dir=None):
    f = tempfile.NamedTemporaryFile(delete=False, dir=dir)
    f.write(b"*" * size * 1024 * 1024)
    f.flush()
    f.close()
    return f.name


class MockArchive(TarFileArchive):

    def __init__(self):
        self.m = {}
        self.strings = {}

    def name(self):
        return "mock.archive"

    def add_file(self, src, dest=None):
        if not dest:
            dest = src
        self.m[src] = dest

    def add_string(self, content, dest):
        self.m[dest] = content

    def add_link(self, dest, link_name):
        pass

    def open_file(self, name):
        return open(self.m.get(name), 'r')

    def close(self):
        pass

    def compress(self, method):
        pass


class MockPlugin(Plugin):

    option_list = [("opt", 'an option', 'fast', None),
                   ("opt2", 'another option', 'fast', False)]

    def setup(self):
        pass


class NamedMockPlugin(Plugin):

    short_desc = "This plugin has a description."
    plugin_name = "testing"

    def setup(self):
        pass


class PostprocMockPlugin(Plugin):

    did_postproc = False

    def setup(self):
        pass

    def postproc(self):
        if self.get_option('postproc'):
            self.did_postproc = True


class ForbiddenMockPlugin(Plugin):
    """This plugin has a description."""

    plugin_name = "forbidden"

    def setup(self):
        self.add_copy_spec("tests")
        self.add_forbidden_path("tests")


class EnablerPlugin(Plugin):

    def is_installed(self, pkg):
        return self.is_installed


class MockOptions(object):
    all_logs = False
    dry_run = False
    since = None
    log_size = 25
    allow_system_changes = False
    no_postproc = False
    skip_files = []
    skip_cmds = []


class PluginToolTests(unittest.TestCase):

    def test_regex_findall(self):
        test_s = u"\n".join(
            ['this is only a test', 'there are only two lines'])
        test_fo = StringIO(test_s)
        matches = regex_findall(r".*lines$", test_fo)
        self.assertEquals(matches, ['there are only two lines'])

    def test_regex_findall_miss(self):
        test_s = u"\n".join(
            ['this is only a test', 'there are only two lines'])
        test_fo = StringIO(test_s)
        matches = regex_findall(r".*not_there$", test_fo)
        self.assertEquals(matches, [])

    def test_regex_findall_bad_input(self):
        matches = regex_findall(r".*", None)
        self.assertEquals(matches, [])
        matches = regex_findall(r".*", [])
        self.assertEquals(matches, [])
        matches = regex_findall(r".*", 1)
        self.assertEquals(matches, [])

    def test_mangle_command(self):
        name_max = 255
        self.assertEquals("foo", _mangle_command("/usr/bin/foo", name_max))
        self.assertEquals(
            "foo_-x", _mangle_command("/usr/bin/foo -x", name_max))
        self.assertEquals(
            "foo_--verbose", _mangle_command("/usr/bin/foo --verbose",
                                             name_max))
        self.assertEquals("foo_.path.to.stuff", _mangle_command(
            "/usr/bin/foo /path/to/stuff", name_max))
        longcmd = "foo is " + "a" * 256 + " long_command"
        expected = longcmd[0:name_max].replace(' ', '_')
        self.assertEquals(expected, _mangle_command(longcmd, name_max))


class PluginTests(unittest.TestCase):

    sysroot = os.getcwd()

    def setUp(self):
        self.mp = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.mp.archive = MockArchive()

    def test_plugin_default_name(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.name(), "mockplugin")

    def test_plugin_set_name(self):
        p = NamedMockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.name(), "testing")

    def test_plugin_no_descrip(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.get_description(), "<no description available>")

    def test_plugin_has_descrip(self):
        p = NamedMockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.get_description(),
                          "This plugin has a description.")

    def test_set_plugin_option(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        p.set_option("opt", "testing")
        self.assertEquals(p.get_option("opt"), "testing")

    def test_set_nonexistant_plugin_option(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertFalse(p.set_option("badopt", "testing"))

    def test_get_nonexistant_plugin_option(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.get_option("badopt"), 0)

    def test_get_unset_plugin_option(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.get_option("opt"), 0)

    def test_get_unset_plugin_option_with_default(self):
        # this shows that even when we pass in a default to get,
        # we'll get the option's default as set in the plugin
        # this might not be what we really want
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.get_option("opt", True), True)

    def test_get_unset_plugin_option_with_default_not_none(self):
        # this shows that even when we pass in a default to get,
        # if the plugin default is not None
        # we'll get the option's default as set in the plugin
        # this might not be what we really want
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.get_option("opt2", True), False)

    def test_get_option_as_list_plugin_option(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        p.set_option("opt", "one,two,three")
        self.assertEquals(p.get_option_as_list("opt"), ['one', 'two', 'three'])

    def test_get_option_as_list_plugin_option_default(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        self.assertEquals(p.get_option_as_list("opt", default=[]), [])

    def test_get_option_as_list_plugin_option_not_list(self):
        p = MockPlugin({
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })
        p.set_option("opt", "testing")
        self.assertEquals(p.get_option_as_list("opt"), ['testing'])

    def test_copy_dir(self):
        self.mp._do_copy_path("tests")
        self.assertEquals(
            self.mp.archive.m["tests/plugin_tests.py"],
            'tests/plugin_tests.py')

    def test_copy_dir_bad_path(self):
        self.mp._do_copy_path("not_here_tests")
        self.assertEquals(self.mp.archive.m, {})

    def test_copy_dir_forbidden_path(self):
        p = ForbiddenMockPlugin({
            'cmdlineopts': MockOptions(),
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'devices': {}
        })
        p.archive = MockArchive()
        p.setup()
        p.collect()
        self.assertEquals(p.archive.m, {})

    def test_postproc_default_on(self):
        p = PostprocMockPlugin({
            'cmdlineopts': MockOptions(),
            'sysroot': self.sysroot,
            'policy': LinuxPolicy(init=InitSystem()),
            'devices': {}
        })
        p.postproc()
        self.assertTrue(p.did_postproc)


class AddCopySpecTests(unittest.TestCase):

    expect_paths = set(['tests/tail_test.txt'])

    def setUp(self):
        self.mp = MockPlugin({
            'cmdlineopts': MockOptions(),
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'sysroot': os.getcwd(),
            'devices': {}
        })
        self.mp.archive = MockArchive()

    def assert_expect_paths(self):
        def pathmunge(path):
            if path[0] == '/':
                path = path[1:]
            return os.path.join(self.mp.sysroot, path)
        expected_paths = set(map(pathmunge, self.expect_paths))
        self.assertEquals(self.mp.copy_paths, expected_paths)

    def test_single_file_no_limit(self):
        self.mp.add_copy_spec("tests/tail_test.txt")
        self.assert_expect_paths()

    def test_single_file_under_limit(self):
        self.mp.add_copy_spec("tests/tail_test.txt", 1)
        self.assert_expect_paths()

    def test_single_file_over_limit(self):
        self.mp.sysroot = '/'
        fn = create_file(2)  # create 2MB file, consider a context manager
        self.mp.add_copy_spec(fn, 1)
        content, fname = self.mp.copy_strings[0]
        self.assertTrue("tailed" in fname)
        self.assertTrue("tmp" in fname)
        self.assertTrue("/" not in fname)
        self.assertEquals(1024 * 1024, len(content))
        os.unlink(fn)

    def test_bad_filename(self):
        self.mp.sysroot = '/'
        self.assertFalse(self.mp.add_copy_spec('', 1))
        self.assertFalse(self.mp.add_copy_spec(None, 1))

    def test_glob_file(self):
        self.mp.add_copy_spec('tests/tail_test.*')
        self.assert_expect_paths()

    def test_glob_file_limit_no_limit(self):
        self.mp.sysroot = '/'
        tmpdir = tempfile.mkdtemp()
        create_file(2, dir=tmpdir)
        create_file(2, dir=tmpdir)
        self.mp.add_copy_spec(tmpdir + "/*")
        self.assertEquals(len(self.mp.copy_paths), 2)
        shutil.rmtree(tmpdir)

    def test_glob_file_over_limit(self):
        self.mp.sysroot = '/'
        tmpdir = tempfile.mkdtemp()
        create_file(2, dir=tmpdir)
        create_file(2, dir=tmpdir)
        self.mp.add_copy_spec(tmpdir + "/*", 1)
        self.assertEquals(len(self.mp.copy_strings), 1)
        content, fname = self.mp.copy_strings[0]
        self.assertTrue("tailed" in fname)
        self.assertEquals(1024 * 1024, len(content))
        shutil.rmtree(tmpdir)

    def test_multiple_files_no_limit(self):
        self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt'])
        self.assertEquals(len(self.mp.copy_paths), 2)

    def test_multiple_files_under_limit(self):
        self.mp.add_copy_spec(['tests/tail_test.txt', 'tests/test.txt'], 1)
        self.assertEquals(len(self.mp.copy_paths), 2)


class CheckEnabledTests(unittest.TestCase):

    def setUp(self):
        self.mp = EnablerPlugin({
            'policy': LinuxPolicy(probe_runtime=False),
            'sysroot': os.getcwd(),
            'cmdlineopts': MockOptions(),
            'devices': {}
        })

    def test_checks_for_file(self):
        f = j("tail_test.txt")
        self.mp.files = (f,)
        self.assertTrue(self.mp.check_enabled())

    def test_checks_for_package(self):
        self.mp.packages = ('foo',)
        self.assertTrue(self.mp.check_enabled())

    def test_allows_bad_tuple(self):
        f = j("tail_test.txt")
        self.mp.files = (f)
        self.mp.packages = ('foo')
        self.assertTrue(self.mp.check_enabled())

    def test_enabled_by_default(self):
        self.assertTrue(self.mp.check_enabled())


class RegexSubTests(unittest.TestCase):

    def setUp(self):
        self.mp = MockPlugin({
            'cmdlineopts': MockOptions(),
            'policy': LinuxPolicy(init=InitSystem(), probe_runtime=False),
            'sysroot': os.getcwd(),
            'devices': {}
        })
        self.mp.archive = MockArchive()

    def test_file_never_copied(self):
        self.assertEquals(0, self.mp.do_file_sub(
            "never_copied", r"^(.*)$", "foobar"))

    def test_no_replacements(self):
        self.mp.add_copy_spec(j("tail_test.txt"))
        self.mp.collect()
        replacements = self.mp.do_file_sub(
            j("tail_test.txt"), r"wont_match", "foobar")
        self.assertEquals(0, replacements)

    def test_replacements(self):
        # test uses absolute paths
        self.mp.sysroot = '/'
        self.mp.add_copy_spec(j("tail_test.txt"))
        self.mp.collect()
        replacements = self.mp.do_file_sub(
            j("tail_test.txt"), r"(tail)", "foobar")
        self.assertEquals(1, replacements)
        self.assertTrue("foobar" in self.mp.archive.m.get(j('tail_test.txt')))


if __name__ == "__main__":
    unittest.main()

# vim: set et ts=4 sw=4 :