aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sos/plugins/__init__.py198
-rw-r--r--sos/utilities.py11
-rw-r--r--tests/plugin_tests.py311
-rw-r--r--tests/tail_test.txt4
-rw-r--r--tests/utilities_tests.py14
5 files changed, 455 insertions, 83 deletions
diff --git a/sos/plugins/__init__.py b/sos/plugins/__init__.py
index 3b12fd6b..fc3a4ed0 100644
--- a/sos/plugins/__init__.py
+++ b/sos/plugins/__init__.py
@@ -24,7 +24,7 @@
# pylint: disable-msg = W0611
# pylint: disable-msg = W0613
-from sos.utilities import sosGetCommandOutput, import_module, grep
+from sos.utilities import sosGetCommandOutput, import_module, grep, fileobj, tail
from sos import _sos as _
import inspect
import os
@@ -46,8 +46,7 @@ except ImportError:
import simplejson as json
def commonPrefix(l1, l2, common = None):
- """
- Returns a tuple like the following:
+ """Returns a tuple like the following:
([common, elements, from l1, and l2], [[tails, from, l1], [tails, from, l2]])
>>> commonPrefix(['usr','share','foo'], ['usr','share','bar'])
@@ -60,9 +59,9 @@ def commonPrefix(l1, l2, common = None):
return commonPrefix(l1[1:], l2[1:], common+[l1[0]])
def sosRelPath(path1, path2, sep=os.path.sep, pardir=os.path.pardir):
- ''' return a relative path from path1 equivalent to path path2.
- In particular: the empty string, if path1 == path2;
- path2, if path1 and path2 have no common prefix.
+ '''Return a relative path from path1 equivalent to path path2. In
+ particular: the empty string, if path1 == path2; path2, if path1 and path2
+ have no common prefix.
'''
try:
common, (u1, u2) = commonPrefix(path1.split(sep), path2.split(sep))
@@ -74,15 +73,41 @@ def sosRelPath(path1, path2, sep=os.path.sep, pardir=os.path.pardir):
return sep.join( [pardir]*len(u1) + u2 )
+def regex_findall(regex, fname):
+ '''Return a list of all non overlapping matches in the string(s)'''
+ try:
+ with fileobj(fname) as f:
+ return re.findall(regex, f.read(), re.MULTILINE)
+ except AttributeError:
+ return []
+
+
class PluginException(Exception):
pass
class Plugin(object):
- """
- This is the base class for sosreport plugins. This class should
- be subclassed by platform specific superclasses. Actual plugins
- should not subclass this class directly.
+ """ This is the base class for sosreport plugins. Plugins should subclass
+ this and set the class variables where applicable.
+
+ plugin_name is a string returned by plugin.name(). If this is set to None
+ (the default) class_.__name__.tolower() will be returned. Be sure to set
+ this if you are defining multiple plugins that do the same thing on
+ different platforms.
+
+ requires_root is a boolean that specifies whether or not sosreport should
+ execute this plugin as a super user.
+
+ version is a string representing the version of the plugin. This can be
+ useful for post-collection tooling.
+
+ packages is an iterable of the names of packages to check for before
+ running this plugin. If any of these packages is found on the system, the
+ default implementation of checkenabled will return True.
+
+ files is an iterable of the paths of files to check for before running this
+ plugin. If any of these packages is found on the system, the default
+ implementation of checkenabled will return True.
"""
plugin_name = None
@@ -120,7 +145,9 @@ class Plugin(object):
@classmethod
def name(class_):
- "Returns the plugin's name as a string"
+ """Returns the plugin's name as a string. This should return a
+ lowercase string.
+ """
if class_.plugin_name:
return class_.plugin_name
return class_.__name__.lower()
@@ -129,16 +156,14 @@ class Plugin(object):
return self.cInfo["policy"]
def isInstalled(self, package_name):
- '''Is the package $package_name installed?
- '''
+ '''Is the package $package_name installed?'''
return (self.policy().pkgByName(package_name) is not None)
def doRegexSub(self, srcpath, regexp, subst):
'''Apply a regexp substitution to a file archived by sosreport.
- srcpath is the path in the archive where the file can be found.
- regexp can be a regexp string or a compiled re object.
- subst is a string to replace each occurance of regexp in the content
- of srcpath.
+ srcpath is the path in the archive where the file can be found. regexp
+ can be a regexp string or a compiled re object. subst is a string to
+ replace each occurance of regexp in the content of srcpath.
This function returns the number of replacements made.
'''
@@ -153,16 +178,11 @@ class Plugin(object):
return replacements
else:
return 0
- except Exception:
+ except Exception, e:
return 0
def doRegexFindAll(self, regex, fname):
- ''' Return a list of all non overlapping matches in the string(s)
- '''
- try:
- return re.findall(regex, open(fname, 'r').read(), re.MULTILINE)
- except: # IOError, AttributeError, etc.
- return []
+ return regex_findall(regex, fname)
def _path_in_path_list(self, path, path_list):
for p in path_list:
@@ -211,13 +231,12 @@ class Plugin(object):
def doCopyFileOrDir(self, srcpath, dest=None, sub=None):
# pylint: disable-msg = R0912
# pylint: disable-msg = R0915
- '''
- Copy file or directory to the destination tree. If a directory,
- then everything below it is recursively copied. A list of copied files
- are saved for use later in preparing a report. sub can be used to
- rename the destination of the file, sub should be a two-tuple of
- (old,new). For example if you passed in ("etc","configurations") for
- use against /etc/my_file.conf the file would end up at
+ '''Copy file or directory to the destination tree. If a directory, then
+ everything below it is recursively copied. A list of copied files are
+ saved for use later in preparing a report. sub can be used to rename
+ the destination of the file, sub should be a two-tuple of (old,new).
+ For example if you passed in ("etc","configurations") for use against
+ /etc/my_file.conf the file would end up at
/configurations/my_file.conf.
'''
@@ -266,21 +285,19 @@ class Plugin(object):
def addForbiddenPath(self, forbiddenPath):
- """Specify a path to not copy, even if it's part of a copyPaths[] entry.
+ """Specify a path to not copy, even if it's part of a copyPaths[]
+ entry.
"""
# Glob case handling is such that a valid non-glob is a reduced glob
for filespec in glob.glob(forbiddenPath):
self.forbiddenPaths.append(filespec)
def getAllOptions(self):
- """
- return a list of all options selected
- """
+ """return a list of all options selected"""
return (self.optNames, self.optParms)
def setOption(self, optionname, value):
- ''' set the named option to value.
- '''
+ '''set the named option to value.'''
for name, parms in izip(self.optNames, self.optParms):
if name == optionname:
parms['enabled'] = value
@@ -289,8 +306,7 @@ class Plugin(object):
return False
def isOptionEnabled(self, optionname):
- ''' Deprecated, use getOption() instead
- '''
+ '''Deprecated, use getOption() instead'''
return self.getOption(optionname)
def getOption(self, optionname, default=0):
@@ -298,8 +314,9 @@ class Plugin(object):
passed in via the command line or set via set_option or via the
global_plugin_options dictionary, in that order.
- optionaname may be iterable, in which case the first option that matches
- any of the option names is returned."""
+ optionaname may be iterable, in which case the first option that
+ matches any of the option names is returned.
+ """
def _check(key):
if hasattr(optionname, "__iter__"):
@@ -320,7 +337,9 @@ class Plugin(object):
return default
def getOptionAsList(self, optionname, delimiter=",", default=None):
- '''Will try to return the option as a list separated by the delimiter'''
+ '''Will try to return the option as a list separated by the
+ delimiter.
+ '''
option = self.getOption(optionname)
try:
opt_list = [opt.strip() for opt in option.split(delimiter)]
@@ -329,37 +348,44 @@ class Plugin(object):
return default
def addCopySpecLimit(self, fname, sizelimit=None, sub=None):
- """Add a file specification (with limits)
+ """Add a file or glob but limit it to sizelimit megabytes. If fname is
+ a single file the file will be tailed to meet sizelimit. If the first
+ file in a glob is too large it will be tailed to meet the sizelimit.
"""
- if not ( fname and len(fname) ):
- # self.soslog.warning("invalid file path")
+ if not (fname and len(fname)):
return False
+
files = glob.glob(fname)
files.sort()
cursize = 0
limit_reached = False
sizelimit *= 1024 * 1024 # in MB
+ flog = None
+
for flog in files:
cursize += os.stat(flog)[ST_SIZE]
if sizelimit and cursize > sizelimit:
limit_reached = True
break
self.addCopySpec(flog, sub)
- # Truncate the first file (others would likely be compressed),
- # ensuring we get at least some logs
- # FIXME: figure this out for jython
- if len(files) == 1 and limit_reached:
- flog = files[0]
- self.collectExtOutput("tail -c%d %s" % (sizelimit, flog),
- "tail_" + os.path.basename(flog), flog[1:] + ".tailed")
+
+ if flog == files[0] and limit_reached:
+ flog_name = flog
+
+ if sub:
+ old, new = sub
+ flog_name = flog.replace(old, new)
+
+ self.addStringAsFile(tail(flog, sizelimit),
+ flog_name.replace(os.path.sep, ".") + ".tailed")
def addCopySpecs(self, copyspecs, sub=None):
for copyspec in copyspecs:
self.addCopySpec(copyspec, sub)
def addCopySpec(self, copyspec, sub=None):
- """ Add a file specification (can be file, dir,or shell glob) to be
- copied into the sosreport by this module
+ """Add a file specification (can be file, dir,or shell glob) to be
+ copied into the sosreport by this module.
"""
if not (copyspec and len(copyspec)):
# self.soslog.warning("invalid file path")
@@ -370,30 +396,31 @@ class Plugin(object):
self.copyPaths.append((filespec, sub))
def callExtProg(self, prog):
- """ Execute a command independantly of the output gathering part of
- sosreport
+ """Execute a command independantly of the output gathering part of
+ sosreport.
"""
# pylint: disable-msg = W0612
status, shout, runtime = sosGetCommandOutput(prog)
return (status, shout, runtime)
def checkExtprog(self, prog):
- """ Execute a command independently of the output gathering part of
+ """Execute a command independently of the output gathering part of
sosreport and check the return code. Return True for a return code of 0
- and False otherwise."""
+ and False otherwise.
+ """
(status, output, runtime) = self.callExtProg(prog)
return (status == 0)
def collectExtOutput(self, exe, suggest_filename=None, root_symlink=None, timeout=300):
- """
- Run a program and collect the output
- """
+ """Run a program and collect the output"""
self.collectProgs.append( (exe, suggest_filename, root_symlink, timeout) )
def fileGrep(self, regexp, *fnames):
- """Returns lines matched in fnames, where fnames can either be pathnames to files
- to grep through or open file objects to grep through line by line"""
+ """Returns lines matched in fnames, where fnames can either be
+ pathnames to files to grep through or open file objects to grep through
+ line by line.
+ """
return grep(regexp, *fnames)
def mangleCommand(self, exe):
@@ -404,7 +431,7 @@ class Plugin(object):
return mangledname
def makeCommandFilename(self, exe):
- """ The internal function to build up a filename based on a command """
+ """The internal function to build up a filename based on a command."""
outfn = os.path.join(self.cInfo['cmddir'], self.name(), self.mangleCommand(exe))
@@ -425,8 +452,8 @@ class Plugin(object):
self.copyStrings.append((content, filename))
def collectOutputNow(self, exe, suggest_filename=None, root_symlink=False, timeout=300):
- """ Execute a command and save the output to a file for inclusion in
- the report
+ """Execute a command and save the output to a file for inclusion in the
+ report.
"""
if self.cInfo['cmdlineopts'].profiler:
start_time = time()
@@ -461,28 +488,26 @@ class Plugin(object):
# For adding warning messages regarding configuration sanity
def addDiagnose(self, alertstring):
- """ Add a configuration sanity warning for this plugin. These
- will be displayed on-screen before collection and in the report as well.
+ """Add a configuration sanity warning for this plugin. These will be
+ displayed on-screen before collection and in the report as well.
"""
self.diagnose_msgs.append(alertstring)
# For adding output
def addAlert(self, alertstring):
- """ Add an alert to the collection of alerts for this plugin. These
+ """Add an alert to the collection of alerts for this plugin. These
will be displayed in the report
"""
self.alerts.append(alertstring)
def addCustomText(self, text):
- """ Append text to the custom text that is included in the report. This
+ """Append text to the custom text that is included in the report. This
is freeform and can include html.
"""
self.customText += text
def copyStuff(self):
- """
- Collect the data for a plugin
- """
+ """Collect the data for a plugin."""
for path, sub in self.copyPaths:
self.doCopyFileOrDir(path, sub=sub)
@@ -513,8 +538,13 @@ class Plugin(object):
return "<no description available>"
def checkenabled(self):
- """ This function can be overidden to let the plugin decide whether
- it should run or not.
+ """This method will be used to verify that a plugin should execute
+ given the condition of the underlying environment. The default
+ implementation will return True if neither class.files or
+ class.packages is specified. If either are specified the plugin will
+ check for the existence of any of the supplied files or packages and
+ return True if any exist. It is encouraged to override this method if
+ this behavior isn't applicabled.
"""
# some files or packages have been specified for this package
if self.files or self.packages:
@@ -634,9 +664,10 @@ class AS7Mixin(object):
self.parameters = {}
def url_parts(self):
- """Generator function to split a url into (key, value) tuples. The url
- should contain an even number of pairs. In the case of / the generator
- will immediately stop iteration."""
+ """Generator function to split a url into (key, value) tuples. The
+ url should contain an even number of pairs. In the case of / the
+ generator will immediately stop iteration.
+ """
parts = self.resource.strip("/").split("/")
if parts == ['']:
@@ -729,9 +760,10 @@ class AS7Mixin(object):
return err_msg
def set_domain_info(self, parameters=None):
- """This function will add host controller and server instance
- name data if it is present to the desired resource. This is to support
- domain-mode operation in AS7"""
+ """This function will add host controller and server instance name data
+ if it is present to the desired resource. This is to support
+ domain-mode operation in AS7.
+ """
host_controller_name = self.getOption("as7_host_controller_name")
server_name = self.getOption("as7_server_name")
@@ -756,7 +788,9 @@ class AS7Mixin(object):
def import_plugin(name, superclasses=None):
"""Import name as a module and return a list of all classes defined in that
- module"""
+ module. superclasses should be a tuple of valid superclasses to import,
+ this defaults to (Plugin,).
+ """
try:
plugin_fqname = "sos.plugins.%s" % name
if not superclasses:
diff --git a/sos/utilities.py b/sos/utilities.py
index 687c2513..3b76b999 100644
--- a/sos/utilities.py
+++ b/sos/utilities.py
@@ -20,6 +20,8 @@
# pylint: disable-msg = W0611
# pylint: disable-msg = W0613
+from __future__ import with_statement
+
import os
import re
import string
@@ -41,7 +43,16 @@ except ImportError:
from StringIO import StringIO
import time
+def tail(filename, number_of_bytes):
+ """Returns the last number_of_bytes of filename"""
+ with open(filename, "rb") as f:
+ if os.stat(filename).st_size > number_of_bytes:
+ f.seek(-number_of_bytes, 2)
+ return f.read()
+
+
def fileobj(path_or_file, mode='r'):
+ """Returns a file-like object that can be used as a context manager"""
if isinstance(path_or_file, basestring):
try:
return open(path_or_file, mode)
diff --git a/tests/plugin_tests.py b/tests/plugin_tests.py
new file mode 100644
index 00000000..87940b8e
--- /dev/null
+++ b/tests/plugin_tests.py
@@ -0,0 +1,311 @@
+import unittest
+import os
+import tempfile
+from StringIO import StringIO
+
+from sos.plugins import Plugin, regex_findall, sosRelPath
+from sos.utilities import Archive
+
+PATH = os.path.dirname(__file__)
+
+def j(filename):
+ return os.path.join(PATH, filename)
+
+def create_file(size):
+ f = tempfile.NamedTemporaryFile(delete=False)
+ f.write("*" * size * 1024 * 1024)
+ f.flush()
+ f.close()
+ return f.name
+
+class MockArchive(Archive):
+
+ def __init__(self):
+ self.m = {}
+ self.strings = {}
+
+ def name(self):
+ return "mock.archive"
+
+ def add_file(self, src, dest=None):
+ if not dest:
+ dest = src
+ self.m[src] = dest
+
+ def add_string(self, content, dest):
+ self.m[dest] = content
+
+ def add_link(self, dest, link_name):
+ pass
+
+ def open_file(self, name):
+ return open(self.m.get(name), 'r')
+
+ def close(self):
+ pass
+
+ def compress(self, method):
+ pass
+
+
+class MockPlugin(Plugin):
+
+ optionList = [("opt", 'an option', 'fast', None),
+ ("opt2", 'another option', 'fast', False)]
+
+ def setup(self):
+ pass
+
+
+class NamedMockPlugin(Plugin):
+ """This plugin has a description."""
+
+ plugin_name = "testing"
+
+ def setup(self):
+ pass
+
+
+class ForbiddenMockPlugin(Plugin):
+ """This plugin has a description."""
+
+ plugin_name = "forbidden"
+
+ def setup(self):
+ self.addForbiddenPath("tests")
+
+
+class EnablerPlugin(Plugin):
+
+ is_installed = False
+
+ def isInstalled(self, pkg):
+ return self.is_installed
+
+
+class MockOptions(object):
+
+ profiler = False
+
+
+
+class PluginToolTests(unittest.TestCase):
+
+ def test_regex_findall(self):
+ test_s = "\n".join(['this is only a test', 'there are only two lines'])
+ test_fo = StringIO(test_s)
+ matches = regex_findall(r".*lines$", test_fo)
+ self.assertEquals(matches, ['there are only two lines'])
+
+ def test_regex_findall_miss(self):
+ test_s = "\n".join(['this is only a test', 'there are only two lines'])
+ test_fo = StringIO(test_s)
+ matches = regex_findall(r".*not_there$", test_fo)
+ self.assertEquals(matches, [])
+
+ def test_regex_findall_bad_input(self):
+ matches = regex_findall(r".*", None)
+ self.assertEquals(matches, [])
+ matches = regex_findall(r".*", [])
+ self.assertEquals(matches, [])
+ matches = regex_findall(r".*", 1)
+ self.assertEquals(matches, [])
+
+ def test_rel_path(self):
+ path1 = "/usr/lib/foo"
+ path2 = "/usr/lib/boo"
+ self.assertEquals(sosRelPath(path1, path2), "../boo")
+
+ def test_abs_path(self):
+ path1 = "usr/lib/foo"
+ path2 = "foo/lib/boo"
+ self.assertEquals(sosRelPath(path1, path2), "foo/lib/boo")
+
+ def test_bad_path(self):
+ path1 = None
+ path2 = "foo/lib/boo"
+ self.assertEquals(sosRelPath(path1, path2), "foo/lib/boo")
+
+
+class PluginTests(unittest.TestCase):
+
+ def setUp(self):
+ self.mp = MockPlugin({
+ 'cmdlineopts': MockOptions()
+ })
+ self.mp.archive = MockArchive()
+
+ def test_plugin_default_name(self):
+ p = MockPlugin({})
+ self.assertEquals(p.name(), "mockplugin")
+
+ def test_plugin_set_name(self):
+ p = NamedMockPlugin({})
+ self.assertEquals(p.name(), "testing")
+
+ def test_plugin_no_descrip(self):
+ p = MockPlugin({})
+ self.assertEquals(p.get_description(), "<no description available>")
+
+ def test_plugin_no_descrip(self):
+ p = NamedMockPlugin({})
+ self.assertEquals(p.get_description(), "This plugin has a description.")
+
+ def test_set_plugin_option(self):
+ p = MockPlugin({})
+ p.setOption("opt", "testing")
+ self.assertEquals(p.getOption("opt"), "testing")
+
+ def test_set_nonexistant_plugin_option(self):
+ p = MockPlugin({})
+ self.assertFalse(p.setOption("badopt", "testing"))
+
+ def test_get_nonexistant_plugin_option(self):
+ p = MockPlugin({})
+ self.assertEquals(p.getOption("badopt"), 0)
+
+ def test_get_unset_plugin_option(self):
+ p = MockPlugin({})
+ self.assertEquals(p.getOption("opt"), 0)
+
+ def test_get_unset_plugin_option_with_default(self):
+ # this shows that even when we pass in a default to get,
+ # we'll get the option's default as set in the plugin
+ # this might not be what we really want
+ p = MockPlugin({})
+ self.assertEquals(p.getOption("opt", True), True)
+
+ def test_get_unset_plugin_option_with_default_not_none(self):
+ # this shows that even when we pass in a default to get,
+ # if the plugin default is not None
+ # we'll get the option's default as set in the plugin
+ # this might not be what we really want
+ p = MockPlugin({})
+ self.assertEquals(p.getOption("opt2", True), False)
+
+ def test_get_option_as_list_plugin_option(self):
+ p = MockPlugin({})
+ p.setOption("opt", "one,two,three")
+ self.assertEquals(p.getOptionAsList("opt"), ['one', 'two', 'three'])
+
+ def test_get_option_as_list_plugin_option_default(self):
+ p = MockPlugin({})
+ self.assertEquals(p.getOptionAsList("opt", default=[]), [])
+
+ def test_get_option_as_list_plugin_option_not_list(self):
+ p = MockPlugin({})
+ p.setOption("opt", "testing")
+ self.assertEquals(p.getOptionAsList("opt"), ['testing'])
+
+ def test_copy_dir(self):
+ self.mp.doCopyFileOrDir("tests")
+ self.assertEquals(self.mp.archive.m["tests/plugin_tests.py"], 'tests/plugin_tests.py')
+
+ def test_copy_dir_sub(self):
+ self.mp.doCopyFileOrDir("tests", sub=("tests/", "foobar/"))
+ self.assertEquals(self.mp.archive.m["tests/plugin_tests.py"], 'foobar/plugin_tests.py')
+
+ def test_copy_dir_bad_path(self):
+ self.mp.doCopyFileOrDir("not_here_tests")
+ self.assertEquals(self.mp.archive.m, {})
+
+ def test_copy_dir_forbidden_path(self):
+ p = ForbiddenMockPlugin({
+ 'cmdlineopts': MockOptions()
+ })
+ p.archive = MockArchive()
+ p.setup()
+ p.doCopyFileOrDir("tests")
+ self.assertEquals(p.archive.m, {})
+
+
+class AddCopySpecLimitTests(unittest.TestCase):
+
+ def setUp(self):
+ self.mp = MockPlugin({
+ 'cmdlineopts': MockOptions()
+ })
+ self.mp.archive = MockArchive()
+
+ def test_single_file_under_limit(self):
+ self.mp.addCopySpecLimit("tests/tail_test.txt", 1)
+ self.assertEquals(self.mp.copyPaths, [('tests/tail_test.txt', None)])
+
+ def test_single_file_over_limit(self):
+ fn = create_file(2) # create 2MB file, consider a context manager
+ self.mp.addCopySpecLimit(fn, 1, sub=('tmp', 'awesome'))
+ content, fname = self.mp.copyStrings[0]
+ self.assertTrue("tailed" in fname)
+ self.assertTrue("awesome" in fname)
+ self.assertTrue("/" not in fname)
+ self.assertEquals(1024 * 1024, len(content))
+ os.unlink(fn)
+
+ def test_bad_filename(self):
+ self.assertFalse(self.mp.addCopySpecLimit('', 1))
+ self.assertFalse(self.mp.addCopySpecLimit(None, 1))
+
+ def test_glob_file_over_limit(self):
+ # assume these are in /tmp
+ fn = create_file(2)
+ fn2 = create_file(2)
+ self.mp.addCopySpecLimit("/tmp/tmp*", 1)
+ self.assertEquals(len(self.mp.copyStrings), 1)
+ content, fname = self.mp.copyStrings[0]
+ self.assertTrue("tailed" in fname)
+ self.assertEquals(1024 * 1024, len(content))
+ os.unlink(fn)
+ os.unlink(fn2)
+
+
+class CheckEnabledTests(unittest.TestCase):
+
+ def setUp(self):
+ self.mp = EnablerPlugin({})
+
+ def test_checks_for_file(self):
+ f = j("tail_test.txt")
+ self.mp.files = (f,)
+ self.assertTrue(self.mp.checkenabled())
+
+ def test_checks_for_package(self):
+ self.mp.packages = ('foo',)
+ self.mp.is_installed = True
+ self.assertTrue(self.mp.checkenabled())
+
+ def test_allows_bad_tuple(self):
+ f = j("tail_test.txt")
+ self.mp.files = (f)
+ self.mp.packages = ('foo')
+ self.assertTrue(self.mp.checkenabled())
+
+ def test_enabled_by_default(self):
+ self.assertTrue(self.mp.checkenabled())
+
+
+class RegexSubTests(unittest.TestCase):
+
+ def setUp(self):
+ self.mp = MockPlugin({
+ 'cmdlineopts': MockOptions()
+ })
+ self.mp.archive = MockArchive()
+
+ def test_file_never_copied(self):
+ self.assertEquals(0, self.mp.doRegexSub("never_copied", r"^(.*)$", "foobar"))
+
+ def test_no_replacements(self):
+ self.mp.addCopySpec(j("tail_test.txt"))
+ self.mp.copyStuff()
+ replacements = self.mp.doRegexSub(j("tail_test.txt"), r"wont_match", "foobar")
+ self.assertEquals(0, replacements)
+
+ def test_replacements(self):
+ self.mp.addCopySpec(j("tail_test.txt"))
+ self.mp.copyStuff()
+ replacements = self.mp.doRegexSub(j("tail_test.txt"), r"(tail)", "foobar")
+ self.assertEquals(1, replacements)
+ self.assertTrue("foobar" in self.mp.archive.m.get(j('tail_test.txt')))
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/tail_test.txt b/tests/tail_test.txt
new file mode 100644
index 00000000..8def0f72
--- /dev/null
+++ b/tests/tail_test.txt
@@ -0,0 +1,4 @@
+this is a file to test tail with
+I have a few lines in here
+I just need enough text to mess with it
+this is the last line
diff --git a/tests/utilities_tests.py b/tests/utilities_tests.py
index 7eecc01a..24bc950b 100644
--- a/tests/utilities_tests.py
+++ b/tests/utilities_tests.py
@@ -2,7 +2,7 @@ import os.path
import unittest
from StringIO import StringIO
-from sos.utilities import grep, DirTree, checksum, get_hash_name, is_executable, sosGetCommandOutput, find
+from sos.utilities import grep, DirTree, checksum, get_hash_name, is_executable, sosGetCommandOutput, find, tail
import sos
TEST_DIR = os.path.dirname(__file__)
@@ -28,6 +28,18 @@ class GrepTest(unittest.TestCase):
self.assertEquals(matches, ['import unittest\n'])
+class TailTest(unittest.TestCase):
+
+ def test_tail(self):
+ t = tail("tests/tail_test.txt", 10)
+ self.assertEquals(t, "last line\n")
+
+ def test_tail_too_many(self):
+ t = tail("tests/tail_test.txt", 200)
+ expected = open("tests/tail_test.txt", "r").read()
+ self.assertEquals(t, expected)
+
+
class DirTreeTest(unittest.TestCase):
def test_makes_tree(self):