diff options
author | jhjaggars <jhjaggars@gmail.com> | 2012-02-23 13:39:04 -0800 |
---|---|---|
committer | jhjaggars <jhjaggars@gmail.com> | 2012-02-23 13:39:04 -0800 |
commit | a644f995f5ed5c900293b440431443963fef8855 (patch) | |
tree | 7d417249ee9ffed1b98233a46202f329b1adb7f6 | |
parent | 237f8a0cdfc211126d962fd5dae3790f8599b41d (diff) | |
parent | ba01d5cbc1d83e09868eb0605f70e6c5bfbf20ad (diff) | |
download | sos-a644f995f5ed5c900293b440431443963fef8855.tar.gz |
Merge pull request #33 from jhjaggars/policy_refactoring
refactoring common linux-related policy features into a common superclas...
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | README | 4 | ||||
-rw-r--r-- | sos/policies/__init__.py | 103 | ||||
-rw-r--r-- | sos/policies/debian.py | 115 | ||||
-rw-r--r-- | sos/policies/redhat.py | 115 | ||||
-rw-r--r-- | sos/policies/ubuntu.py | 31 | ||||
-rw-r--r-- | sos/utilities.py | 33 | ||||
-rw-r--r--[-rwxr-xr-x] | tests/archive_tests.py | 0 | ||||
-rw-r--r-- | tests/importer_tests.py | 13 | ||||
-rw-r--r-- | tests/policy_tests.py | 51 |
10 files changed, 207 insertions, 260 deletions
@@ -8,3 +8,5 @@ buildjar/ gpgkeys/rhsupport.* rpm-build/* sos/__init__.py +cover/ +.coverage @@ -13,3 +13,7 @@ To access to the public source code repository for this project run: to install locally (as root) ==> make install to build an rpm ==> make rpm to build a zipfile for use with jython ==> make zip + +I recommend nose to run the unittests: + +nosetests -v --with-cover --cover-package=sos --cover-html diff --git a/sos/policies/__init__.py b/sos/policies/__init__.py index ad6539c0..f5c0e036 100644 --- a/sos/policies/__init__.py +++ b/sos/policies/__init__.py @@ -1,4 +1,5 @@ import os +import re import platform import time @@ -37,19 +38,23 @@ class PackageManager(object): """ Return a list of packages that match name. """ - return [] + return fnmatch.filter(self.allPkgs().keys(), name) def allPkgsByNameRegex(self, regex_name, flags=None): """ Return a list of packages that match regex_name. """ - return [] + reg = re.compile(regex_name, flags) + return [pkg for pkg in self.allPkgs().keys() if reg.match(pkg)] def pkgByName(self, name): """ Return a single package that matches name. """ - return None + try: + self.AllPkgsByName(name)[-1] + except Exception: + return None def allPkgs(self): """ @@ -57,6 +62,12 @@ class PackageManager(object): """ return [] + def pkgNVRA(self, pkg): + fields = pkg.split("-") + version, release, arch = fields[-3:] + name = "-".join(fields[:-3]) + return (name, version, release, arch) + class Policy(object): @@ -82,6 +93,7 @@ No changes will be made to your system. self.reportName = self.hostname self.ticketNumber = None self.package_manager = PackageManager() + self.valid_subclasses = [] def check(self): """ @@ -110,7 +122,8 @@ No changes will be made to your system. """ Verifies that the plugin_class should execute under this policy """ - return issubclass(plugin_class, IndependentPlugin) + valid_subclasses = [IndependentPlugin] + self.valid_subclasses + return any(issubclass(plugin_class, class_) for class_ in valid_subclasses) def preWork(self): """ @@ -131,7 +144,7 @@ No changes will be made to your system. pass def pkgByName(self, pkg): - return None + return self.package_manager.pkgByName(pkg) def _parse_uname(self): (system, node, release, @@ -160,7 +173,6 @@ No changes will be made to your system. archive_fp.close() return digest.hexdigest() - def getPreferredHashAlgorithm(self): """Returns the string name of the hashlib-supported checksum algorithm to use""" @@ -286,3 +298,82 @@ class GenericPolicy(Policy): def get_msg(self): return self.msg % {'distro': self.system} + + +class LinuxPolicy(Policy): + """This policy is meant to be an abc class that provides common implementations used + in Linux distros""" + + def __init__(self): + super(LinuxPolicy, self).__init__() + + def getPreferredHashAlgorithm(self): + checksum = "md5" + try: + fp = open("/proc/sys/crypto/fips_enabled", "r") + except: + return checksum + + fips_enabled = fp.read() + if fips_enabled.find("1") >= 0: + checksum = "sha256" + fp.close() + return checksum + + def runlevelDefault(self): + try: + with open("/etc/inittab") as fp: + pattern = r"id:(\d{1}):initdefault:" + text = fp.read() + return int(re.findall(pattern, text)[0]) + except: + return 3 + + def kernelVersion(self): + return self.release + + def hostName(self): + return self.hostname + + def isKernelSMP(self): + return self.smp + + def getArch(self): + return self.machine + + def getLocalName(self): + """Returns the name usd in the preWork step""" + return self.hostName() + + def preWork(self): + # this method will be called before the gathering begins + + localname = self.getLocalName() + + if not self.commons['cmdlineopts'].batch and not self.commons['cmdlineopts'].silent: + try: + self.reportName = raw_input(_("Please enter your first initial and last name [%s]: ") % localname) + self.reportName = re.sub(r"[^a-zA-Z.0-9]", "", self.reportName) + + self.ticketNumber = raw_input(_("Please enter the case number that you are generating this report for: ")) + self.ticketNumber = re.sub(r"[^0-9]", "", self.ticketNumber) + self._print() + except: + self._print() + sys.exit(0) + + if len(self.reportName) == 0: + self.reportName = localname + + if self.commons['cmdlineopts'].customerName: + self.reportName = self.commons['cmdlineopts'].customerName + self.reportName = re.sub(r"[^a-zA-Z.0-9]", "", self.reportName) + + if self.commons['cmdlineopts'].ticketNumber: + self.ticketNumber = self.commons['cmdlineopts'].ticketNumber + self.ticketNumber = re.sub(r"[^0-9]", "", self.ticketNumber) + + return + + def packageResults(self, archive_filename): + self._print(_("Creating compressed archive...")) diff --git a/sos/policies/debian.py b/sos/policies/debian.py index 6d11482d..0d6bcec9 100644 --- a/sos/policies/debian.py +++ b/sos/policies/debian.py @@ -1,13 +1,10 @@ from __future__ import with_statement -from sos import _sos as _ -from sos.plugins import DebianPlugin, IndependentPlugin -from sos.policies import PackageManager, Policy +from sos.plugins import DebianPlugin +from sos.policies import PackageManager, LinuxPolicy from sos.utilities import shell_out import os -import sys -import re class DebianPackageManager(PackageManager): @@ -24,83 +21,26 @@ class DebianPackageManager(PackageManager): 'version': version } - def allPkgsByName(self, name): - return fnmatch.filter(self.allPkgs().keys(), name) - - def allPkgsByNameRegex(self, regex_name, flags=None): - reg = re.compile(regex_name, flags) - return [pkg for pkg in self.allPkgs().keys() if reg.match(pkg)] - - def pkgByName(self, name): - try: - self.AllPkgsByName(name)[-1] - except Exception: - return None - def allPkgs(self): if not self._debs: self._debs = self._get_deb_list() return self._debs - def pkgNVRA(self, pkg): - fields = pkg.split("-") - version, release, arch = fields[-3:] - name = "-".join(fields[:-3]) - return (name, version, release, arch) -class DebianPolicy(Policy): +class DebianPolicy(LinuxPolicy): def __init__(self): super(DebianPolicy, self).__init__() self.reportName = "" self.ticketNumber = "" self.package_manager = DebianPackageManager() - - def validatePlugin(self, plugin_class): - "Checks that the plugin will execute given the environment" - return issubclass(plugin_class, DebianPlugin) or issubclass(plugin_class, IndependentPlugin) + self.valid_subclasses = [DebianPlugin] + self.distro = "Debian" @classmethod def check(self): """This method checks to see if we are running on Debian. It returns True or False.""" - if os.path.isfile('/etc/debian_version'): - return True - return False - - def preferedArchive(self): - from sos.utilities import TarFileArchive - return TarFileArchive - - def getPreferredHashAlgorithm(self): - checksum = "md5" - try: - fp = open("/proc/sys/crypto/fips_enabled", "r") - except: - return checksum - - fips_enabled = fp.read() - if fips_enabled.find("1") >= 0: - checksum = "sha256" - fp.close() - return checksum - - def pkgByName(self, name): - return self.package_manager.pkgByName(name) - - def runlevelDefault(self): - try: - with open("/etc/inittab") as fp: - pattern = r"id:(\d{1}):initdefault:" - text = fp.read() - return int(re.findall(pattern, text)[0]) - except: - return 3 - - def kernelVersion(self): - return self.release - - def hostName(self): - return self.hostname + return os.path.isfile('/etc/debian_version') def debianVersion(self): try: @@ -111,46 +51,3 @@ class DebianPolicy(Policy): except: pass return False - - def isKernelSMP(self): - return self.smp - - def getArch(self): - return self.machine - - def preWork(self): - # this method will be called before the gathering begins - - localname = self.hostName() - - if not self.commons['cmdlineopts'].batch and not self.commons['cmdlineopts'].silent: - try: - self.reportName = raw_input(_("Please enter your first initial and last name [%s]: ") % localname) - self.reportName = re.sub(r"[^a-zA-Z.0-9]", "", self.reportName) - - self.ticketNumber = raw_input(_("Please enter the case number that you are generating this report for: ")) - self.ticketNumber = re.sub(r"[^0-9]", "", self.ticketNumber) - self._print() - except: - self._print() - sys.exit(0) - - if len(self.reportName) == 0: - self.reportName = localname - - if self.commons['cmdlineopts'].customerName: - self.reportName = self.commons['cmdlineopts'].customerName - self.reportName = re.sub(r"[^a-zA-Z.0-9]", "", self.reportName) - - if self.commons['cmdlineopts'].ticketNumber: - self.ticketNumber = self.commons['cmdlineopts'].ticketNumber - self.ticketNumber = re.sub(r"[^0-9]", "", self.ticketNumber) - - return - - def packageResults(self, archive_filename): - self._print(_("Creating compressed archive...")) - - def get_msg(self): - msg_dict = {"distro": "Debian"} - return self.msg % msg_dict diff --git a/sos/policies/redhat.py b/sos/policies/redhat.py index 398514e1..928d42ab 100644 --- a/sos/policies/redhat.py +++ b/sos/policies/redhat.py @@ -21,16 +21,9 @@ from __future__ import with_statement import os import sys -from tempfile import gettempdir -import random -import re -import platform -import time -from collections import deque - -from sos import _sos as _ -from sos.plugins import RedHatPlugin, IndependentPlugin -from sos.policies import Policy, PackageManager + +from sos.plugins import RedHatPlugin +from sos.policies import LinuxPolicy, PackageManager from sos.utilities import shell_out sys.path.insert(0, "/usr/share/rhn/") @@ -58,68 +51,26 @@ class RHELPackageManager(PackageManager): 'version': version } - def allPkgsByName(self, name): - return fnmatch.filter(self.allPkgs().keys(), name) - - def allPkgsByNameRegex(self, regex_name, flags=None): - reg = re.compile(regex_name, flags) - return [pkg for pkg in self.allPkgs().keys() if reg.match(pkg)] - - def pkgByName(self, name): - try: - self.AllPkgsByName(name)[-1] - except Exception: - return None - def allPkgs(self): if not self._rpms: self._rpms = self._get_rpm_list() return self._rpms - def pkgNVRA(self, pkg): - fields = pkg.split("-") - version, release, arch = fields[-3:] - name = "-".join(fields[:-3]) - return (name, version, release, arch) - -class RHELPolicy(Policy): +class RHELPolicy(LinuxPolicy): def __init__(self): super(RHELPolicy, self).__init__() self.reportName = "" self.ticketNumber = "" self.package_manager = RHELPackageManager() - - def validatePlugin(self, plugin_class): - "Checks that the plugin will execute given the environment" - return issubclass(plugin_class, RedHatPlugin) or issubclass(plugin_class, IndependentPlugin) + self.valid_subclasses = [RedHatPlugin] @classmethod def check(self): "This method checks to see if we are running on RHEL. It returns True or False." return os.path.isfile('/etc/redhat-release') or os.path.isfile('/etc/fedora-release') - def preferedArchive(self): - from sos.utilities import TarFileArchive - return TarFileArchive - - def getPreferredHashAlgorithm(self): - checksum = "md5" - try: - fp = open("/proc/sys/crypto/fips_enabled", "r") - except: - return checksum - - fips_enabled = fp.read() - if fips_enabled.find("1") >= 0: - checksum = "sha256" - fp.close() - return checksum - - def pkgByName(self, name): - return self.package_manager.pkgByName(name) - def runlevelByService(self, name): from subprocess import Popen, PIPE ret = [] @@ -141,21 +92,6 @@ class RHELPolicy(Policy): ret.append(int(runlevel)) return ret - def runlevelDefault(self): - try: - with open("/etc/inittab") as fp: - pattern = r"id:(\d{1}):initdefault:" - text = fp.read() - return int(re.findall(pattern, text)[0]) - except: - return 3 - - def kernelVersion(self): - return self.release - - def hostName(self): - return self.hostname - def rhelVersion(self): try: pkg = self.pkgByName("redhat-release") or \ @@ -180,45 +116,8 @@ class RHELPolicy(Policy): # ignore any exception and return an empty username return "" - def isKernelSMP(self): - return self.smp - - def getArch(self): - return self.machine - - def preWork(self): - # this method will be called before the gathering begins - - localname = self.rhnUsername() - if len(localname) == 0: localname = self.hostName() - - if not self.commons['cmdlineopts'].batch and not self.commons['cmdlineopts'].silent: - try: - self.reportName = raw_input(_("Please enter your first initial and last name [%s]: ") % localname) - self.reportName = re.sub(r"[^a-zA-Z.0-9]", "", self.reportName) - - self.ticketNumber = raw_input(_("Please enter the case number that you are generating this report for: ")) - self.ticketNumber = re.sub(r"[^0-9]", "", self.ticketNumber) - self._print() - except: - self._print() - sys.exit(0) - - if len(self.reportName) == 0: - self.reportName = localname - - if self.commons['cmdlineopts'].customerName: - self.reportName = self.commons['cmdlineopts'].customerName - self.reportName = re.sub(r"[^a-zA-Z.0-9]", "", self.reportName) - - if self.commons['cmdlineopts'].ticketNumber: - self.ticketNumber = self.commons['cmdlineopts'].ticketNumber - self.ticketNumber = re.sub(r"[^0-9]", "", self.ticketNumber) - - return - - def packageResults(self, archive_filename): - self._print(_("Creating compressed archive...")) + def getLocalName(self): + return self.rhnUsername() or self.hostName() def get_msg(self): msg_dict = {"distro": "Red Hat Enterprise Linux"} diff --git a/sos/policies/ubuntu.py b/sos/policies/ubuntu.py index 7d61d054..a5eea4da 100644 --- a/sos/policies/ubuntu.py +++ b/sos/policies/ubuntu.py @@ -1,31 +1,22 @@ from __future__ import with_statement -from sos import _sos as _ -from sos.plugins import UbuntuPlugin, IndependentPlugin -from sos.policies.debian import DebianPolicy, DebianPackageManager -from sos.utilities import shell_out import os - + +from sos.plugins import UbuntuPlugin, IndependentPlugin +from sos.policies.debian import DebianPolicy + class UbuntuPolicy(DebianPolicy): def __init__(self): super(UbuntuPolicy, self).__init__() - - def validatePlugin(self, plugin_class): - "Checks that the plugin will execute given the environment" - return issubclass(plugin_class, UbuntuPlugin) or issubclass(plugin_class, IndependentPlugin) + self.distro = "Ubuntu" + self.valid_subclasses = [UbuntuPlugin] @classmethod def check(self): """This method checks to see if we are running on Ubuntu. It returns True or False.""" - if os.path.isfile('/etc/lsb-release'): - try: - with open('/etc/lsb-release', 'r') as fp: - return "Ubuntu" in fp.read() - except: - return False - return False - - def get_msg(self): - msg_dict = {"distro": "Ubuntu"} - return self.msg % msg_dict + try: + with open('/etc/lsb-release', 'r') as fp: + return "Ubuntu" in fp.read() + except: + return False diff --git a/sos/utilities.py b/sos/utilities.py index 55fab228..5aef0cb8 100644 --- a/sos/utilities.py +++ b/sos/utilities.py @@ -317,15 +317,18 @@ class TarFileArchive(Archive): else: dest = self.prepend(src) - fp = open(src, 'rb') - content = fp.read() - fp.close() + if os.path.isdir(src): + self.tarfile.add(src, dest) + else: + fp = open(src, 'rb') + content = fp.read() + fp.close() - tar_info = tarfile.TarInfo(name=dest) - tar_info.size = len(content) - tar_info.mtime = os.stat(src).st_mtime + tar_info = tarfile.TarInfo(name=dest) + tar_info.size = len(content) + tar_info.mtime = os.stat(src).st_mtime - self.tarfile.addfile(tar_info, StringIO(content)) + self.tarfile.addfile(tar_info, StringIO(content)) def add_string(self, content, dest): dest = self.prepend(dest) @@ -423,29 +426,25 @@ def compress(archive, method): methods = ['xz', 'bzip2', 'gzip'] - if method in ('xz', 'bzip2', 'gzip'): + if method in methods: methods = [method] - compressed = False - last_error = None - for cmd in ('xz', 'bzip2', 'gzip'): - if compressed: - break + last_error = Exception("compression failed for an unknown reason") + log = logging.getLogger('sos') + + for cmd in methods: try: command = shlex.split("%s %s" % (cmd,archive.name())) p = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=-1) stdout, stderr = p.communicate() - log = logging.getLogger('sos') if stdout: log.info(stdout) if stderr: log.error(stderr) - compressed = True return archive.name() + "." + cmd.replace('ip','') except Exception, e: last_error = e - - if not compressed: + else: raise last_error diff --git a/tests/archive_tests.py b/tests/archive_tests.py index 01c46395..01c46395 100755..100644 --- a/tests/archive_tests.py +++ b/tests/archive_tests.py diff --git a/tests/importer_tests.py b/tests/importer_tests.py new file mode 100644 index 00000000..91f82cc0 --- /dev/null +++ b/tests/importer_tests.py @@ -0,0 +1,13 @@ +import unittest + +from sos.utilities import ImporterHelper + +class ImporterHelperTests(unittest.TestCase): + + def test_runs(self): + h = ImporterHelper(unittest) + modules = h.get_modules() + self.assertTrue('main' in modules) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/policy_tests.py b/tests/policy_tests.py new file mode 100644 index 00000000..2654c82c --- /dev/null +++ b/tests/policy_tests.py @@ -0,0 +1,51 @@ +import unittest + +from sos.policies import Policy, import_policy +from sos.plugins import Plugin, IndependentPlugin, RedHatPlugin, DebianPlugin + +class FauxPolicy(Policy): + distro = "Faux" + +class FauxPlugin(Plugin, IndependentPlugin): + pass + +class FauxRedHatPlugin(Plugin, RedHatPlugin): + pass + +class FauxDebianPlugin(Plugin, DebianPlugin): + pass + +class PolicyTests(unittest.TestCase): + + def test_independent_only(self): + p = FauxPolicy() + p.valid_subclasses = [] + + self.assertTrue(p.validatePlugin(FauxPlugin)) + + def test_redhat(self): + p = FauxPolicy() + p.valid_subclasses = [RedHatPlugin] + + self.assertTrue(p.validatePlugin(FauxRedHatPlugin)) + + def test_debian(self): + p = FauxPolicy() + p.valid_subclasses = [DebianPlugin] + + self.assertTrue(p.validatePlugin(FauxDebianPlugin)) + + def test_fails(self): + p = FauxPolicy() + p.valid_subclasses = [] + + self.assertFalse(p.validatePlugin(FauxDebianPlugin)) + + def test_can_import(self): + self.assertTrue(import_policy('redhat') is not None) + + def test_cant_import(self): + self.assertTrue(import_policy('notreal') is None) + +if __name__ == "__main__": + unittest.main() |