aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Jaggars <jjaggars@redhat.com>2012-03-05 10:30:59 -0600
committerJesse Jaggars <jjaggars@redhat.com>2012-03-05 10:30:59 -0600
commitc10115652a795d0c7971ec9b03c4c58742063ddf (patch)
tree4c14fa591b807350f6db6c7da10f1430049602f5
parent83c6e09756bd228dc76e7a84646de5a9f5b5e694 (diff)
downloadsos-c10115652a795d0c7971ec9b03c4c58742063ddf.tar.gz
Refactoring PackageManagers and Archives
Pulled up PackageManager implementation to simplify subclass responsibilities Moved compress method to Archive classes Re-organized utilities.py Added tests to exercise more utilities methods
-rw-r--r--sos/policies/__init__.py35
-rw-r--r--sos/policies/redhat.py31
-rw-r--r--sos/sosreport.py4
-rw-r--r--sos/utilities.py450
-rw-r--r--tests/archive_tests.py26
-rw-r--r--tests/path/to/leaf0
-rwxr-xr-xtests/test_exe.py2
-rw-r--r--tests/utilities_tests.py48
8 files changed, 348 insertions, 248 deletions
diff --git a/sos/policies/__init__.py b/sos/policies/__init__.py
index 88b00247..c4f7010e 100644
--- a/sos/policies/__init__.py
+++ b/sos/policies/__init__.py
@@ -34,6 +34,21 @@ def load(cache={}):
class PackageManager(object):
+ """Encapsulates a package manager. If you provide a query_command to the
+ constructor it should print each package on the system in the following
+ format:
+ package name|package.version\n
+
+ You may also subclass this class and provide a getPackageList method to
+ build the list of packages and versions.
+ """
+
+ query_command = None
+
+ def __init__(self, query_command=None):
+ self.packages = {}
+ if query_command:
+ self.query_command = query_command
def allPkgsByName(self, name):
"""
@@ -57,11 +72,29 @@ class PackageManager(object):
except Exception:
return None
+ def getPackageList(self):
+ """
+ returns a dictionary of packages in the following format:
+ {'package_name': {'name': 'package_name', 'version': 'major.minor.version'}}
+ """
+ if self.query_command:
+ pkg_list = shell_out(self.query_command).splitlines()
+ for pkg in pkg_list:
+ name, version = pkg.split("|")
+ self.packages[name] = {
+ 'name': name,
+ 'version': version.split(".")
+ }
+
+ return self.packages
+
def allPkgs(self):
"""
Return a list of all packages.
"""
- return {}
+ if not self.packages:
+ self.packages = self.getPackageList()
+ return self.packages
def pkgNVRA(self, pkg):
fields = pkg.split("-")
diff --git a/sos/policies/redhat.py b/sos/policies/redhat.py
index 5d9af55e..89e6e9b2 100644
--- a/sos/policies/redhat.py
+++ b/sos/policies/redhat.py
@@ -36,40 +36,21 @@ except:
pass
-class RHELPackageManager(PackageManager):
-
- _rpms = None
-
- def _get_rpm_list(self):
- cmd = 'rpm -qa --queryformat "%{NAME}|%{VERSION}\\n"'
- pkg_list = shell_out(cmd).splitlines()
- self._rpms = {}
- for pkg in pkg_list:
- name, version = pkg.split("|")
- self._rpms[name] = {
- 'name': name,
- 'version': version
- }
-
- def allPkgs(self):
- if not self._rpms:
- self._rpms = self._get_rpm_list()
- return self._rpms
-
-
class RHELPolicy(LinuxPolicy):
def __init__(self):
super(RHELPolicy, self).__init__()
self.reportName = ""
self.ticketNumber = ""
- self.package_manager = RHELPackageManager()
+ self.package_manager = PackageManager('rpm -qa --queryformat "%{NAME}|%{VERSION}\\n"')
self.valid_subclasses = [RedHatPlugin]
@classmethod
def check(self):
- "This method checks to see if we are running on RHEL. It returns True or False."
- return os.path.isfile('/etc/redhat-release') or os.path.isfile('/etc/fedora-release')
+ """This method checks to see if we are running on RHEL. It returns True
+ or False."""
+ return (os.path.isfile('/etc/redhat-release')
+ or os.path.isfile('/etc/fedora-release'))
def runlevelByService(self, name):
from subprocess import Popen, PIPE
@@ -99,7 +80,7 @@ class RHELPolicy(LinuxPolicy):
pkgname = pkg["version"]
if pkgname[0] == "4":
return 4
- elif pkgname in [ "5Server", "5Client" ]:
+ elif pkgname[0] in [ "5Server", "5Client" ]:
return 5
elif pkgname[0] == "6":
return 6
diff --git a/sos/sosreport.py b/sos/sosreport.py
index 5cfa5db8..2631adfb 100644
--- a/sos/sosreport.py
+++ b/sos/sosreport.py
@@ -801,9 +801,7 @@ class SoSReport(object):
self._finish_logging()
- self.archive.close()
-
- final_filename = compress(self.archive, self.opts.compression_type)
+ final_filename = self.archive.compress(self.opts.compression_type)
# automated submission will go here
if not self.opts.upload:
diff --git a/sos/utilities.py b/sos/utilities.py
index 43ed1128..e9184394 100644
--- a/sos/utilities.py
+++ b/sos/utilities.py
@@ -22,12 +22,11 @@
import os
import re
-import sys
import string
import fnmatch
import inspect
from stat import *
-from itertools import *
+#from itertools import *
from subprocess import Popen, PIPE
import shlex
import logging
@@ -71,185 +70,20 @@ def get_hash_name():
except:
return 'sha256'
-class DirTree(object):
- """Builds an ascii representation of a directory structure"""
-
- def __init__(self, top_directory):
- self.directory_count = 0
- self.file_count = 0
- self.buffer = []
- self.top_directory = top_directory
- self._build_tree()
-
- def buf(self, s):
- self.buffer.append(s)
-
- def printtree(self):
- print str(self)
-
- def as_string(self):
- return str(self)
-
- def __str__(self):
- return "\n".join(self.buffer)
-
- def _build_tree(self):
- self.buf(os.path.abspath(self.top_directory))
- self.tree_i(self.top_directory, first=True)
-
- def _convert_bytes(self, n):
- K, M, G, T = 1 << 10, 1 << 20, 1 << 30, 1 << 40
- if n >= T:
- return '%.1fT' % (float(n) / T)
- elif n >= G:
- return '%.1fG' % (float(n) / G)
- elif n >= M:
- return '%.1fM' % (float(n) / M)
- elif n >= K:
- return '%.1fK' % (float(n) / K)
- else:
- return '%d' % n
-
- def _get_user(self, stats):
- try:
- import pwd
- return pwd.getpwuid(stats.st_uid)[0]
- except ImportError:
- return str(stats.st_uid)
-
- def _get_group(self, stats):
- try:
- import grp
- return grp.getgrgid(stats.st_gid)[0]
- except ImportError:
- return str(stats.st_uid)
-
- def _format(self, path):
- """Conditionally adds detail to paths"""
- stats = os.stat(path)
- details = {
- "filename": os.path.basename(path),
- "user": self._get_user(stats),
- "group": self._get_group(stats),
- "filesize": self._convert_bytes(stats.st_size),
- }
- return ("[%(user)s %(group)s %(filesize)s] " % details, "%(filename)s" % details)
-
- def tree_i(self, dir_, padding='', first=False, fmt="%-30s %s%s%s"):
- if not first:
- details, filename = self._format(os.path.abspath(dir_))
- line = fmt % (details, padding[:-1], "+-- ", filename)
- self.buf(line)
- padding += ' '
-
- count = 0
- files = os.listdir(dir_)
- files.sort(key=string.lower)
- for f in files:
- count += 1
- path = os.path.join(dir_, f)
-
- if f.startswith("."):
- pass
- elif os.path.isfile(path):
- self.file_count += 1
- details, filename = self._format(path)
- line = fmt % (details, padding, "+-- ", filename)
- self.buf(line)
- elif os.path.islink(path):
- self.buf(padding +
- '+-- ' +
- f +
- ' -> ' + os.path.basename(os.path.realpath(path)))
- if os.path.isdir(path):
- self.directory_count += 1
- else:
- self.file_count += 1
- elif os.path.isdir(path):
- self.directory_count += 1
- if count == len(files):
- self.tree_i(path, padding + ' ')
- else:
- self.tree_i(path, padding + '|')
-
-
-class ImporterHelper(object):
- """Provides a list of modules that can be imported in a package.
- Importable modules are located along the module __path__ list and modules
- are files that end in .py. This class will read from PKZip archives as well
- for listing out jar and egg contents."""
-
- def __init__(self, package):
- """package is a package module
- import my.package.module
- helper = ImporterHelper(my.package.module)"""
- self.package = package
-
- def _plugin_name(self, path):
- "Returns the plugin module name given the path"
- base = os.path.basename(path)
- name, ext = os.path.splitext(base)
- return name
-
- def _get_plugins_from_list(self, list_):
- plugins = [self._plugin_name(plugin)
- for plugin in list_
- if "__init__" not in plugin
- and plugin.endswith(".py")]
- plugins.sort()
- return plugins
-
- def _find_plugins_in_dir(self, path):
- if os.path.exists(path):
- py_files = list(find("*.py", path))
- pnames = self._get_plugins_from_list(py_files)
- if pnames:
- return pnames
- else:
- return []
-
- def _get_path_to_zip(self, path, tail_list=None):
- if not tail_list:
- tail_list = ['']
-
- if path.endswith(('.jar', '.zip', '.egg')):
- return path, os.path.join(*tail_list)
-
- head, tail = os.path.split(path)
- tail_list.insert(0, tail)
-
- if head == path:
- raise Exception("not a zip file")
- else:
- return self._get_path_to_zip(head, tail_list)
-
-
- def _find_plugins_in_zipfile(self, path):
- try:
- path_to_zip, tail = self._get_path_to_zip(path)
- zf = zipfile.ZipFile(path_to_zip)
- # the path will have os separators, but the zipfile will always have '/'
- tail = tail.replace(os.path.sep, "/")
- root_names = [name for name in zf.namelist() if tail in name]
- candidates = self._get_plugins_from_list(root_names)
- zf.close()
- if candidates:
- return candidates
- else:
- return []
- except (IOError, Exception):
- return []
-
- def get_modules(self):
- "Returns the list of importable modules in the configured python package."
- plugins = []
- for path in self.package.__path__:
- if os.path.isdir(path) or path == '':
- plugins.extend(self._find_plugins_in_dir(path))
- else:
- plugins.extend(self._find_plugins_in_zipfile(path))
+def convert_bytes(bytes_, K=1 << 10, M=1 << 20, G=1 << 30, T=1 << 40):
+ """Converts a number of bytes to a shorter, more human friendly format"""
+ fn = float(bytes_)
+ if bytes_ >= T:
+ return '%.1fT' % (fn / T)
+ elif bytes_ >= G:
+ return '%.1fG' % (fn / G)
+ elif bytes_ >= M:
+ return '%.1fM' % (fn / M)
+ elif bytes_ >= K:
+ return '%.1fK' % (fn / K)
+ else:
+ return '%d' % bytes_
- return plugins
def find(file_pattern, top_dir, max_depth=None, path_pattern=None):
@@ -284,6 +118,12 @@ def grep(pattern, *files_or_paths):
return matches
+def is_executable(command):
+ """Returns if a command matches an executable on the PATH"""
+
+ paths = os.environ.get("PATH", "").split(os.path.pathsep)
+ candidates = [command] + [os.path.join(p, command) for p in paths]
+ return any(os.access(path, os.X_OK) for path in candidates)
def sosGetCommandOutput(command, timeout=300):
"""Execute a command through the system shell. First checks to see if the
@@ -291,17 +131,18 @@ def sosGetCommandOutput(command, timeout=300):
# XXX: what is this doing this for?
cmdfile = command.strip("(").split()[0]
- possibles = [cmdfile] + [os.path.join(path, cmdfile) for path in os.environ.get("PATH", "").split(":")]
-
- if any(os.access(path, os.X_OK) for path in possibles):
+ if is_executable(cmdfile):
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, bufsize=-1)
stdout, stderr = p.communicate()
return (p.returncode, stdout.strip(), 0)
else:
return (127, "", 0)
-
def import_module(module_fqname, superclass=None):
+ """Imports the module module_fqname and returns a list of defined classes
+ from that module. If superclass is defined then the classes returned will
+ be subclasses of the specified superclass."""
+
module_name = module_fqname.rpartition(".")[-1]
module = __import__(module_fqname, globals(), locals(), [module_name])
modules = [class_ for cname, class_ in
@@ -312,6 +153,13 @@ def import_module(module_fqname, superclass=None):
return modules
+def shell_out(cmd):
+ """Uses subprocess.Popen to make a system call and returns stdout.
+ Does not handle exceptions."""
+ p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
+ return p.communicate()[0]
+
+
class Archive(object):
_name = "unset"
@@ -319,20 +167,29 @@ class Archive(object):
def prepend(self, src):
if src:
name = os.path.split(self._name)[-1]
- return os.path.join(name, src.lstrip(os.sep))
+ renamed = os.path.join(name, src.lstrip(os.sep))
+ return renamed
def add_link(self, dest, link_name):
pass
+ def compress(self, method):
+ """Compress an archive object via method. ZIP archives are ignored. If
+ method is automatic then the following technologies are tried in order: xz,
+ bz2 and gzip"""
+
+ self.close()
+
class TarFileArchive(Archive):
def __init__(self, name):
self._name = name
+ self._suffix = "tar"
self.tarfile = tarfile.open(self.name(), mode="w")
def name(self):
- return "%s.tar" % self._name
+ return "%s.%s" % (self._name, self._suffix)
def add_file(self, src, dest=None):
if dest:
@@ -382,6 +239,33 @@ class TarFileArchive(Archive):
def close(self):
self.tarfile.close()
+ def compress(self, method):
+ super(TarFileArchive, self).compress(method)
+
+ methods = ['xz', 'bzip2', 'gzip']
+
+ if method in methods:
+ methods = [method]
+
+ last_error = Exception("compression failed for an unknown reason")
+ log = logging.getLogger('sos')
+
+ for cmd in methods:
+ try:
+ command = shlex.split("%s %s" % (cmd,self.name()))
+ p = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=-1)
+ stdout, stderr = p.communicate()
+ if stdout:
+ log.info(stdout)
+ if stderr:
+ log.error(stderr)
+ self._suffix += "." + cmd.replace('ip', '')
+ return self.name()
+ except Exception, e:
+ last_error = e
+ else:
+ raise last_error
+
class ZipFileArchive(Archive):
@@ -398,6 +282,10 @@ class ZipFileArchive(Archive):
def name(self):
return "%s.zip" % self._name
+ def compress(self, method):
+ super(ZipFileArchive, self).compress(method)
+ return self.name()
+
def add_file(self, src, dest=None):
if os.path.isdir(src):
# We may not need, this, but if we do I only want to do it
@@ -405,7 +293,7 @@ class ZipFileArchive(Archive):
regex = re.compile(r"^" + src)
for path, dirnames, filenames in os.walk(src):
for filename in filenames:
- filename = path + filename
+ filename = "/".join((path, filename))
if dest:
self.zipfile.write(filename,
self.prepend(re.sub(regex, dest, filename)))
@@ -439,41 +327,169 @@ class ZipFileArchive(Archive):
self.zipfile.close()
-def compress(archive, method):
- """Compress an archive object via method. ZIP archives are ignored. If
- method is automatic then the following technologies are tried in order: xz,
- bz2 and gzip"""
+class DirTree(object):
+ """Builds an ascii representation of a directory structure"""
+
+ def __init__(self, top_directory):
+ self.directory_count = 0
+ self.file_count = 0
+ self.buffer = []
+ self.top_directory = top_directory
+ self._build_tree()
+
+ def buf(self, s):
+ self.buffer.append(s)
- if method == "zip":
- return archive.name()
+ def printtree(self):
+ print str(self)
- methods = ['xz', 'bzip2', 'gzip']
+ def as_string(self):
+ return str(self)
- if method in methods:
- methods = [method]
+ def __str__(self):
+ return "\n".join(self.buffer)
- last_error = Exception("compression failed for an unknown reason")
- log = logging.getLogger('sos')
+ def _build_tree(self):
+ self.buf(os.path.abspath(self.top_directory))
+ self.tree_i(self.top_directory, first=True)
- for cmd in methods:
+ def _get_user(self, stats):
try:
- command = shlex.split("%s %s" % (cmd,archive.name()))
- p = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=-1)
- stdout, stderr = p.communicate()
- if stdout:
- log.info(stdout)
- if stderr:
- log.error(stderr)
- return archive.name() + "." + cmd.replace('ip','')
- except Exception, e:
- last_error = e
- else:
- raise last_error
+ import pwd
+ return pwd.getpwuid(stats.st_uid)[0]
+ except ImportError:
+ return str(stats.st_uid)
+ def _get_group(self, stats):
+ try:
+ import grp
+ return grp.getgrgid(stats.st_gid)[0]
+ except ImportError:
+ return str(stats.st_uid)
-def shell_out(cmd):
- """Uses subprocess.Popen to make a system call and returns stdout.
- Does not handle exceptions."""
- p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
- return p.communicate()[0]
-# vim:ts=4 sw=4 et
+ def _format(self, path):
+ """Conditionally adds detail to paths"""
+ stats = os.stat(path)
+ details = {
+ "filename": os.path.basename(path),
+ "user": self._get_user(stats),
+ "group": self._get_group(stats),
+ "filesize": convert_bytes(stats.st_size),
+ }
+ return ("[%(user)s %(group)s %(filesize)s] " % details, "%(filename)s" % details)
+
+ def tree_i(self, dir_, padding='', first=False, fmt="%-30s %s%s%s"):
+ if not first:
+ details, filename = self._format(os.path.abspath(dir_))
+ line = fmt % (details, padding[:-1], "+-- ", filename)
+ self.buf(line)
+ padding += ' '
+
+ count = 0
+ files = os.listdir(dir_)
+ files.sort(key=string.lower)
+ for f in files:
+ count += 1
+ path = os.path.join(dir_, f)
+
+ if f.startswith("."):
+ pass
+ elif os.path.isfile(path):
+ self.file_count += 1
+ details, filename = self._format(path)
+ line = fmt % (details, padding, "+-- ", filename)
+ self.buf(line)
+ elif os.path.islink(path):
+ self.buf(padding +
+ '+-- ' +
+ f +
+ ' -> ' + os.path.basename(os.path.realpath(path)))
+ if os.path.isdir(path):
+ self.directory_count += 1
+ else:
+ self.file_count += 1
+ elif os.path.isdir(path):
+ self.directory_count += 1
+ if count == len(files):
+ self.tree_i(path, padding + ' ')
+ else:
+ self.tree_i(path, padding + '|')
+
+
+class ImporterHelper(object):
+ """Provides a list of modules that can be imported in a package.
+ Importable modules are located along the module __path__ list and modules
+ are files that end in .py. This class will read from PKZip archives as well
+ for listing out jar and egg contents."""
+
+ def __init__(self, package):
+ """package is a package module
+ import my.package.module
+ helper = ImporterHelper(my.package.module)"""
+ self.package = package
+
+ def _plugin_name(self, path):
+ "Returns the plugin module name given the path"
+ base = os.path.basename(path)
+ name, ext = os.path.splitext(base)
+ return name
+
+ def _get_plugins_from_list(self, list_):
+ plugins = [self._plugin_name(plugin)
+ for plugin in list_
+ if "__init__" not in plugin
+ and plugin.endswith(".py")]
+ plugins.sort()
+ return plugins
+
+ def _find_plugins_in_dir(self, path):
+ if os.path.exists(path):
+ py_files = list(find("*.py", path))
+ pnames = self._get_plugins_from_list(py_files)
+ if pnames:
+ return pnames
+ else:
+ return []
+
+ def _get_path_to_zip(self, path, tail_list=None):
+ if not tail_list:
+ tail_list = ['']
+
+ if path.endswith(('.jar', '.zip', '.egg')):
+ return path, os.path.join(*tail_list)
+
+ head, tail = os.path.split(path)
+ tail_list.insert(0, tail)
+
+ if head == path:
+ raise Exception("not a zip file")
+ else:
+ return self._get_path_to_zip(head, tail_list)
+
+
+ def _find_plugins_in_zipfile(self, path):
+ try:
+ path_to_zip, tail = self._get_path_to_zip(path)
+ zf = zipfile.ZipFile(path_to_zip)
+ # the path will have os separators, but the zipfile will always have '/'
+ tail = tail.replace(os.path.sep, "/")
+ root_names = [name for name in zf.namelist() if tail in name]
+ candidates = self._get_plugins_from_list(root_names)
+ zf.close()
+ if candidates:
+ return candidates
+ else:
+ return []
+ except (IOError, Exception):
+ return []
+
+ def get_modules(self):
+ "Returns the list of importable modules in the configured python package."
+ plugins = []
+ for path in self.package.__path__:
+ if os.path.isdir(path) or path == '':
+ plugins.extend(self._find_plugins_in_dir(path))
+ else:
+ plugins.extend(self._find_plugins_in_zipfile(path))
+
+ return plugins
diff --git a/tests/archive_tests.py b/tests/archive_tests.py
index f982be0a..8a542f73 100644
--- a/tests/archive_tests.py
+++ b/tests/archive_tests.py
@@ -66,13 +66,27 @@ class ZipFileArchiveTest(unittest.TestCase):
afp = self.zf.open_file('tests/string_test.txt')
self.assertEquals('this is my new content', afp.read())
+ def test_make_link(self):
+ self.zf.add_file('tests/ziptest')
+ self.zf.add_link('tests/ziptest', 'link_name')
+
+ self.zf.close()
+ try:
+ self.check_for_file('test/link_name')
+ self.fail("link should not exist")
+ except KeyError:
+ pass
+
+ def test_compress(self):
+ self.assertEquals(self.zf.compress("zip"), self.zf.name())
+
class TarFileArchiveTest(unittest.TestCase):
def setUp(self):
self.tf = TarFileArchive('test')
def tearDown(self):
- os.unlink('test.tar')
+ os.unlink(self.tf.name())
def check_for_file(self, filename):
rtf = tarfile.open('test.tar')
@@ -126,5 +140,15 @@ class TarFileArchiveTest(unittest.TestCase):
afp = self.tf.open_file('tests/string_test.txt')
self.assertEquals('this is my new content', afp.read())
+ def test_make_link(self):
+ self.tf.add_file('tests/ziptest')
+ self.tf.add_link('tests/ziptest', 'link_name')
+
+ self.tf.close()
+ self.check_for_file('test/link_name')
+
+ def test_compress(self):
+ name = self.tf.compress("gzip")
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/path/to/leaf b/tests/path/to/leaf
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/path/to/leaf
diff --git a/tests/test_exe.py b/tests/test_exe.py
new file mode 100755
index 00000000..f35d93f0
--- /dev/null
+++ b/tests/test_exe.py
@@ -0,0 +1,2 @@
+#!/usr/bin/env python
+print "executed"
diff --git a/tests/utilities_tests.py b/tests/utilities_tests.py
index d5115928..fbfc9a55 100644
--- a/tests/utilities_tests.py
+++ b/tests/utilities_tests.py
@@ -2,9 +2,11 @@ import os.path
import unittest
from StringIO import StringIO
-from sos.utilities import grep, DirTree, checksum
+from sos.utilities import grep, DirTree, checksum, get_hash_name, is_executable, sosGetCommandOutput, find
import sos
+TEST_DIR = os.path.dirname(__file__)
+
class GrepTest(unittest.TestCase):
def test_file_obj(self):
@@ -30,8 +32,52 @@ class DirTreeTest(unittest.TestCase):
t = DirTree(os.path.dirname(sos.__file__)).as_string()
self.assertTrue('Makefile' in t)
+
class ChecksumTest(unittest.TestCase):
def test_simple_hash(self):
self.assertEquals(checksum(StringIO('this is a test'), algorithm="sha256"),
'2e99758548972a8e8822ad47fa1017ff72f06f3ff6a016851f45c398732bc50c')
+
+ def test_hash_loading(self):
+ # not the greatest test, since we are asking the policy to pick for us
+ name = get_hash_name()
+ self.assertTrue(name in ('md5', 'sha256'))
+
+
+class ExecutableTest(unittest.TestCase):
+
+ def test_nonexe_file(self):
+ path = os.path.join(TEST_DIR, 'utility_tests.py')
+ self.assertFalse(is_executable(path))
+
+ def test_exe_file(self):
+ path = os.path.join(TEST_DIR, 'test_exe.py')
+ self.assertTrue(is_executable(path))
+
+ def test_output(self):
+ path = os.path.join(TEST_DIR, 'test_exe.py')
+ ret, out, junk = sosGetCommandOutput(path)
+ self.assertEquals(ret, 0)
+ self.assertEquals(out, "executed")
+
+ def test_output_non_exe(self):
+ path = os.path.join(TEST_DIR, 'utility_tests.py')
+ ret, out, junk = sosGetCommandOutput(path)
+ self.assertEquals(ret, 127)
+ self.assertEquals(out, "")
+
+
+class FindTest(unittest.TestCase):
+
+ def test_find_leaf(self):
+ leaves = find("leaf", TEST_DIR)
+ self.assertTrue(any(name.endswith("leaf") for name in leaves))
+
+ def test_too_shallow(self):
+ leaves = find("leaf", TEST_DIR, max_depth=1)
+ self.assertFalse(any(name.endswith("leaf") for name in leaves))
+
+ def test_not_in_pattern(self):
+ leaves = find("leaf", TEST_DIR, path_pattern="tests/path")
+ self.assertFalse(any(name.endswith("leaf") for name in leaves))