diff options
-rw-r--r-- | sos/plugins/__init__.py | 50 | ||||
-rw-r--r-- | tests/option_tests.py | 3 | ||||
-rw-r--r-- | tests/plugin_tests.py | 57 |
3 files changed, 81 insertions, 29 deletions
diff --git a/sos/plugins/__init__.py b/sos/plugins/__init__.py index 413ee739..790338bc 100644 --- a/sos/plugins/__init__.py +++ b/sos/plugins/__init__.py @@ -101,6 +101,7 @@ class Plugin(object): files = () archive = None profiles = () + sysroot = '/' def __init__(self, commons): if not getattr(self, "option_list", False): @@ -117,6 +118,7 @@ class Plugin(object): self.copy_paths = set() self.copy_strings = [] self.collect_cmds = [] + self.sysroot = commons['sysroot'] self.soslog = self.commons['soslog'] if 'soslog' in self.commons \ else logging.getLogger('sos') @@ -154,6 +156,19 @@ class Plugin(object): def policy(self): return self.commons["policy"] + def join_sysroot(self, path): + if path[0] == os.sep: + path = path[1:] + return os.path.join(self.sysroot, path) + + def strip_sysroot(self, path): + if path.startswith(self.sysroot): + return path[len(self.sysroot):] + return path + + def use_sysroot(self): + return self.sysroot != os.path.abspath(os.sep) + def is_installed(self, package_name): '''Is the package $package_name installed?''' return self.policy().pkg_by_name(package_name) is not None @@ -207,6 +222,7 @@ class Plugin(object): ''' try: path = self._get_dest_for_srcpath(srcpath) + self._log_debug("substituting scrpath '%s'" % srcpath) self._log_debug("substituting '%s' for '%s' in '%s'" % (subst, regexp, path)) if not path: @@ -257,8 +273,9 @@ class Plugin(object): self._log_debug("copying link '%s' pointing to '%s' with isdir=%s" % (srcpath, linkdest, os.path.isdir(absdest))) + dstpath = self.strip_sysroot(srcpath) # use the relative target path in the tarball - self.archive.add_link(reldest, srcpath) + self.archive.add_link(reldest, dstpath) if os.path.isdir(absdest): self._log_debug("link '%s' is a directory, skipping..." % linkdest) @@ -277,7 +294,7 @@ class Plugin(object): % linkdest) self.copied_files.append({'srcpath': srcpath, - 'dstpath': srcpath, + 'dstpath': dstpath, 'symlink': "yes", 'pointsto': linkdest}) @@ -288,6 +305,8 @@ class Plugin(object): self._do_copy_path(os.path.join(srcpath, afile), dest=None) def _get_dest_for_srcpath(self, srcpath): + if self.use_sysroot(): + srcpath = self.join_sysroot(srcpath) for copied in self.copied_files: if srcpath == copied["srcpath"]: return copied["dstpath"] @@ -315,6 +334,9 @@ class Plugin(object): if not dest: dest = srcpath + if self.use_sysroot(): + dest = self.strip_sysroot(dest) + try: st = os.lstat(srcpath) except (OSError, IOError): @@ -333,7 +355,7 @@ class Plugin(object): if not (stat.S_ISREG(st.st_mode) or stat.S_ISDIR(st.st_mode)): ntype = _node_type(st) self._log_debug("creating %s node at archive:'%s'" - % (ntype, srcpath)) + % (ntype, dest)) self._copy_node(srcpath, st) return @@ -347,9 +369,11 @@ class Plugin(object): else: self.archive.add_file(srcpath, dest) - self.copied_files.append({'srcpath': srcpath, - 'dstpath': dest, - 'symlink': "no"}) + self.copied_files.append({ + 'srcpath': srcpath, + 'dstpath': dest, + 'symlink': "no" + }) def add_forbidden_path(self, forbiddenPath): """Specify a path to not copy, even if it's part of a copy_specs[] @@ -416,6 +440,9 @@ class Plugin(object): except Exception: return default + def _add_copy_paths(self, copy_paths): + self.copy_paths.update(copy_paths) + def add_copy_spec_limit(self, copyspec, sizelimit=None, tailit=True): """Add a file or glob but limit it to sizelimit megabytes. If fname is a single file the file will be tailed to meet sizelimit. If the first @@ -424,10 +451,13 @@ class Plugin(object): if not (copyspec and len(copyspec)): return False + if self.use_sysroot(): + copyspec = self.join_sysroot(copyspec) files = glob.glob(copyspec) files.sort() if len(files) == 0: return + current_size = 0 limit_reached = False sizelimit *= 1024 * 1024 # in MB @@ -438,7 +468,7 @@ class Plugin(object): if sizelimit and current_size > sizelimit: limit_reached = True break - self.add_copy_spec(_file) + self._add_copy_paths([_file]) if limit_reached and tailit: file_name = _file @@ -459,12 +489,14 @@ class Plugin(object): if isinstance(copyspecs, six.string_types): copyspecs = [copyspecs] for copyspec in copyspecs: + if self.use_sysroot(): + copyspec = self.join_sysroot(copyspec) if not (copyspec and len(copyspec)): self._log_warn("added null or empty copy spec") return False copy_paths = self._expand_copy_spec(copyspec) - self.copy_paths.update(copy_paths) - self._log_info("added copyspec '%s'" % copyspec) + self._add_copy_paths(copy_paths) + self._log_info("added copyspec '%s'" % copy_paths) def get_command_output(self, prog, timeout=300, runat=None, stderr=True): result = sos_get_command_output(prog, timeout=timeout, runat=runat, diff --git a/tests/option_tests.py b/tests/option_tests.py index fe37ccfe..e8a26e2d 100644 --- a/tests/option_tests.py +++ b/tests/option_tests.py @@ -8,10 +8,11 @@ class GlobalOptionTest(unittest.TestCase): def setUp(self): self.commons = { + 'sysroot': '/', 'global_plugin_options': { 'test_option': 'foobar', 'baz': None, - 'empty_global': True, + 'empty_global': True }, } self.plugin = Plugin(self.commons) diff --git a/tests/plugin_tests.py b/tests/plugin_tests.py index e30ded5c..14d3b49c 100644 --- a/tests/plugin_tests.py +++ b/tests/plugin_tests.py @@ -127,50 +127,53 @@ class PluginToolTests(unittest.TestCase): class PluginTests(unittest.TestCase): + sysroot = os.getcwd() + def setUp(self): self.mp = MockPlugin({ - 'cmdlineopts': MockOptions() + 'cmdlineopts': MockOptions(), + 'sysroot': self.sysroot }) self.mp.archive = MockArchive() def test_plugin_default_name(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.name(), "mockplugin") def test_plugin_set_name(self): - p = NamedMockPlugin({}) + p = NamedMockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.name(), "testing") def test_plugin_no_descrip(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.get_description(), "<no description available>") def test_plugin_no_descrip(self): - p = NamedMockPlugin({}) + p = NamedMockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.get_description(), "This plugin has a description.") def test_set_plugin_option(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) p.set_option("opt", "testing") self.assertEquals(p.get_option("opt"), "testing") def test_set_nonexistant_plugin_option(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertFalse(p.set_option("badopt", "testing")) def test_get_nonexistant_plugin_option(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.get_option("badopt"), 0) def test_get_unset_plugin_option(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.get_option("opt"), 0) def test_get_unset_plugin_option_with_default(self): # this shows that even when we pass in a default to get, # we'll get the option's default as set in the plugin # this might not be what we really want - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.get_option("opt", True), True) def test_get_unset_plugin_option_with_default_not_none(self): @@ -178,20 +181,20 @@ class PluginTests(unittest.TestCase): # if the plugin default is not None # we'll get the option's default as set in the plugin # this might not be what we really want - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.get_option("opt2", True), False) def test_get_option_as_list_plugin_option(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) p.set_option("opt", "one,two,three") self.assertEquals(p.get_option_as_list("opt"), ['one', 'two', 'three']) def test_get_option_as_list_plugin_option_default(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) self.assertEquals(p.get_option_as_list("opt", default=[]), []) def test_get_option_as_list_plugin_option_not_list(self): - p = MockPlugin({}) + p = MockPlugin({'sysroot': self.sysroot}) p.set_option("opt", "testing") self.assertEquals(p.get_option_as_list("opt"), ['testing']) @@ -205,7 +208,8 @@ class PluginTests(unittest.TestCase): def test_copy_dir_forbidden_path(self): p = ForbiddenMockPlugin({ - 'cmdlineopts': MockOptions() + 'cmdlineopts': MockOptions(), + 'sysroot': self.sysroot }) p.archive = MockArchive() p.setup() @@ -219,12 +223,18 @@ class AddCopySpecTests(unittest.TestCase): def setUp(self): self.mp = MockPlugin({ - 'cmdlineopts': MockOptions() + 'cmdlineopts': MockOptions(), + 'sysroot': os.getcwd() }) self.mp.archive = MockArchive() def assert_expect_paths(self): - self.assertEquals(self.mp.copy_paths, self.expect_paths) + def pathmunge(path): + if path[0] == '/': + path = path[1:] + return os.path.join(self.mp.sysroot, path) + expected_paths = set(map(pathmunge, self.expect_paths)) + self.assertEquals(self.mp.copy_paths, expected_paths) # add_copy_spec() @@ -242,6 +252,7 @@ class AddCopySpecTests(unittest.TestCase): # add_copy_spec_limit() def test_single_file_over_limit(self): + self.mp.sysroot = '/' fn = create_file(2) # create 2MB file, consider a context manager self.mp.add_copy_spec_limit(fn, 1) content, fname = self.mp.copy_strings[0] @@ -252,10 +263,12 @@ class AddCopySpecTests(unittest.TestCase): os.unlink(fn) def test_bad_filename(self): + self.mp.sysroot = '/' self.assertFalse(self.mp.add_copy_spec_limit('', 1)) self.assertFalse(self.mp.add_copy_spec_limit(None, 1)) def test_glob_file_over_limit(self): + self.mp.sysroot = '/' # assume these are in /tmp fn = create_file(2) fn2 = create_file(2) @@ -271,7 +284,10 @@ class AddCopySpecTests(unittest.TestCase): class CheckEnabledTests(unittest.TestCase): def setUp(self): - self.mp = EnablerPlugin({'policy': sos.policies.load()}) + self.mp = EnablerPlugin({ + 'policy': sos.policies.load(), + 'sysroot': os.getcwd() + }) def test_checks_for_file(self): f = j("tail_test.txt") @@ -296,7 +312,8 @@ class RegexSubTests(unittest.TestCase): def setUp(self): self.mp = MockPlugin({ - 'cmdlineopts': MockOptions() + 'cmdlineopts': MockOptions(), + 'sysroot': os.getcwd() }) self.mp.archive = MockArchive() @@ -310,6 +327,8 @@ class RegexSubTests(unittest.TestCase): self.assertEquals(0, replacements) def test_replacements(self): + # test uses absolute paths + self.mp.sysroot = '/' self.mp.add_copy_spec(j("tail_test.txt")) self.mp.collect() replacements = self.mp.do_file_sub(j("tail_test.txt"), r"(tail)", "foobar") |