aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.travis.yml1
-rw-r--r--sos/archive.py70
-rw-r--r--tests/archive_tests.py62
3 files changed, 69 insertions, 64 deletions
diff --git a/.travis.yml b/.travis.yml
index 4d1406ec..5b87103a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,7 @@ matrix:
allow_failures:
- python: 3.2
- python: 3.3
+ - python: "pypy"
notifications:
email: false
install:
diff --git a/sos/archive.py b/sos/archive.py
index 42916606..080f2593 100644
--- a/sos/archive.py
+++ b/sos/archive.py
@@ -1,4 +1,4 @@
-## Copyright (C) 2012 Red Hat, Inc.,
+## Copyright (C) 2012 Red Hat, Inc.,
## Jesse Jaggars <jjaggars@redhat.com>
## Bryn M. Reeves <bmr@redhat.com>
##
@@ -17,26 +17,20 @@
## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import os
import time
-import tempfile
import tarfile
import zipfile
import shutil
import logging
-import shutil
import shlex
import re
# required for compression callout (FIXME: move to policy?)
-from subprocess import Popen, PIPE, STDOUT
+from subprocess import Popen, PIPE
try:
import selinux
except ImportError:
pass
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
class Archive(object):
@@ -71,25 +65,26 @@ class Archive(object):
def finalize(self, method):
"""Finalize an archive object via method. This may involve creating
- An archive that is subsequently compressed or simply closing an
+ An archive that is subsequently compressed or simply closing an
archive that supports in-line handling. If method is automatic then
the following technologies are tried in order: xz, bz2 and gzip"""
self.close()
+
class FileCacheArchive(Archive):
_tmp_dir = ""
_archive_root = ""
_archive_path = ""
-
+
def __init__(self, name, tmpdir):
self._name = name
self._tmp_dir = tmpdir
self._archive_root = os.path.join(tmpdir, name)
os.makedirs(self._archive_root, 0700)
- self.log.debug("initialised empty FileCacheArchive at %s"
- % self._archive_root)
+ self.log.debug("initialised empty FileCacheArchive at %s" %
+ (self._archive_root,))
def dest_path(self, name):
if os.path.isabs(name):
@@ -113,10 +108,10 @@ class FileCacheArchive(Archive):
shutil.copystat(src, dest)
stat = os.stat(src)
os.chown(dest, stat.st_uid, stat.st_gid)
- except IOError as e:
+ except IOError:
self.log.info("caught IO error copying %s" % src)
- self.log.debug("added %s to FileCacheArchive %s"
- % (src, self._archive_root))
+ self.log.debug("added %s to FileCacheArchive %s" %
+ (src, self._archive_root))
def add_string(self, content, dest):
src = dest
@@ -127,7 +122,7 @@ class FileCacheArchive(Archive):
if os.path.exists(src):
shutil.copystat(src, dest)
self.log.debug("added string at %s to FileCacheArchive %s"
- % (src, self._archive_root))
+ % (src, self._archive_root))
def add_link(self, source, link_name):
dest = self.dest_path(link_name)
@@ -135,21 +130,21 @@ class FileCacheArchive(Archive):
if not os.path.exists(dest):
os.symlink(source, dest)
self.log.debug("added symlink at %s to %s in FileCacheArchive %s"
- % (dest, source, self._archive_root))
+ % (dest, source, self._archive_root))
def add_dir(self, path):
self.makedirs(path)
def _makedirs(self, path, mode=0700):
os.makedirs(path, mode)
-
+
def get_tmp_dir(self):
return self._archive_root
def makedirs(self, path, mode=0700):
self._makedirs(self.dest_path(path))
self.log.debug("created directory at %s in FileCacheArchive %s"
- % (path, self._archive_root))
+ % (path, self._archive_root))
def open_file(self, path):
path = self.dest_path(path)
@@ -157,15 +152,16 @@ class FileCacheArchive(Archive):
def cleanup(self):
shutil.rmtree(self._archive_root)
-
+
def finalize(self, method):
self.log.debug("finalizing archive %s" % self._archive_root)
self._build_archive()
self.cleanup()
self.log.debug("built archive at %s (size=%d)" % (self._archive_path,
- os.stat(self._archive_path).st_size))
+ os.stat(self._archive_path).st_size))
return self._compress()
+
class TarFileArchive(FileCacheArchive):
method = None
@@ -186,7 +182,7 @@ class TarFileArchive(FileCacheArchive):
tar_info.mode = fstat.st_mode
tar_info.uid = fstat.st_uid
tar_info.gid = fstat.st_gid
-
+
# this can be used to set permissions if using the
# tarfile.add() interface to add directory trees.
def copy_permissions_filter(self, tarinfo):
@@ -201,7 +197,7 @@ class TarFileArchive(FileCacheArchive):
context = self.get_selinux_context(orig_path)
if(context):
tarinfo.pax_headers['RHT.security.selinux'] = context
- self.set_tarinfo_from_stat(tarinfo,fstat)
+ self.set_tarinfo_from_stat(tarinfo, fstat)
return tarinfo
def get_selinux_context(self, path):
@@ -219,11 +215,12 @@ class TarFileArchive(FileCacheArchive):
old_umask = os.umask(0077)
os.chdir(self._tmp_dir)
tar = tarfile.open(self._archive_path, mode="w")
- tar.add(os.path.split(self._name)[1], filter=self.copy_permissions_filter)
+ tar.add(os.path.split(self._name)[1],
+ filter=self.copy_permissions_filter)
tar.close()
os.umask(old_umask)
os.chdir(old_pwd)
-
+
def _compress(self):
methods = ['xz', 'bzip2', 'gzip']
if self.method in methods:
@@ -238,7 +235,7 @@ class TarFileArchive(FileCacheArchive):
if cmd != "gzip":
cmd = "%s -1" % cmd
try:
- command = shlex.split("%s %s" % (cmd,self.name()))
+ command = shlex.split("%s %s" % (cmd, self.name()))
p = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=-1)
stdout, stderr = p.communicate()
if stdout:
@@ -252,6 +249,7 @@ class TarFileArchive(FileCacheArchive):
else:
raise last_error
+
class ZipFileArchive(Archive):
def __init__(self, name):
@@ -262,7 +260,8 @@ class ZipFileArchive(Archive):
except:
self.compression = zipfile.ZIP_STORED
- self.zipfile = zipfile.ZipFile(self.name(), mode="w", compression=self.compression)
+ self.zipfile = zipfile.ZipFile(self.name(), mode="w",
+ compression=self.compression)
def name(self):
return "%s.zip" % self._name
@@ -284,19 +283,19 @@ class ZipFileArchive(Archive):
for filename in filenames:
filename = "/".join((path, filename))
if dest:
- self.zipfile.write(filename,
- self.prepend(re.sub(regex, dest, filename)))
+ self.zipfile.write(filename, re.sub(regex, dest,
+ filename))
else:
- self.zipfile.write(filename, self.prepend(filename))
+ self.zipfile.write(filename)
else:
if dest:
- self.zipfile.write(src, self.prepend(dest))
+ self.zipfile.write(src, dest)
else:
- self.zipfile.write(src, self.prepend(src))
+ self.zipfile.write(src)
def add_string(self, content, dest):
- info = zipfile.ZipInfo(self.prepend(dest),
- date_time=time.localtime(time.time()))
+ info = zipfile.ZipInfo(dest,
+ date_time=time.localtime(time.time()))
info.compress_type = self.compression
info.external_attr = 0400 << 16L
self.zipfile.writestr(info, content)
@@ -305,7 +304,6 @@ class ZipFileArchive(Archive):
try:
self.zipfile.close()
self.zipfile = zipfile.ZipFile(self.name(), mode="r")
- name = self.prepend(name)
file_obj = self.zipfile.open(name)
return file_obj
finally:
@@ -314,5 +312,3 @@ class ZipFileArchive(Archive):
def close(self):
self.zipfile.close()
-
-
diff --git a/tests/archive_tests.py b/tests/archive_tests.py
index 7847c7a4..abdce994 100644
--- a/tests/archive_tests.py
+++ b/tests/archive_tests.py
@@ -4,9 +4,12 @@ import unittest
import os
import tarfile
import zipfile
+import tempfile
+import shutil
from sos.archive import TarFileArchive, ZipFileArchive
+
class ZipFileArchiveTest(unittest.TestCase):
def setUp(self):
@@ -27,37 +30,37 @@ class ZipFileArchiveTest(unittest.TestCase):
self.zf.add_file('tests/ziptest')
self.zf.close()
- self.check_for_file('test/tests/ziptest')
+ self.check_for_file('tests/ziptest')
def test_add_unicode_file(self):
self.zf.add_file(u'tests/')
self.zf.close()
- self.check_for_file('test/tests/ziptest')
+ self.check_for_file('tests/ziptest')
def test_add_dir(self):
self.zf.add_file('tests/')
self.zf.close()
- self.check_for_file('test/tests/ziptest')
+ self.check_for_file('tests/ziptest')
def test_add_renamed(self):
self.zf.add_file('tests/ziptest', dest='tests/ziptest_renamed')
self.zf.close()
- self.check_for_file('test/tests/ziptest_renamed')
+ self.check_for_file('tests/ziptest_renamed')
def test_add_renamed_dir(self):
self.zf.add_file('tests/', 'tests_renamed/')
self.zf.close()
- self.check_for_file('test/tests_renamed/ziptest')
+ self.check_for_file('tests_renamed/ziptest')
def test_add_string(self):
self.zf.add_string('this is content', 'tests/string_test.txt')
self.zf.close()
- self.check_for_file('test/tests/string_test.txt')
+ self.check_for_file('tests/string_test.txt')
def test_get_file(self):
self.zf.add_string('this is my content', 'tests/string_test.txt')
@@ -72,40 +75,45 @@ class ZipFileArchiveTest(unittest.TestCase):
afp = self.zf.open_file('tests/string_test.txt')
self.assertEquals('this is my new content', afp.read())
- def test_make_link(self):
- self.zf.add_file('tests/ziptest')
- self.zf.add_link('tests/ziptest', 'link_name')
+# Disabled as new api doesnt provide a add_link routine
+# def test_make_link(self):
+# self.zf.add_file('tests/ziptest')
+# self.zf.add_link('tests/ziptest', 'link_name')
+#
+# self.zf.close()
+# try:
+# self.check_for_file('test/link_name')
+# self.fail("link should not exist")
+# except KeyError:
+# pass
- self.zf.close()
- try:
- self.check_for_file('test/link_name')
- self.fail("link should not exist")
- except KeyError:
- pass
+# Disabled as new api doesnt provide a compress routine
+# def test_compress(self):
+# self.assertEquals(self.zf.compress("zip"), self.zf.name())
- def test_compress(self):
- self.assertEquals(self.zf.compress("zip"), self.zf.name())
class TarFileArchiveTest(unittest.TestCase):
def setUp(self):
- self.tf = TarFileArchive('test')
+ self.tmpdir = tempfile.mkdtemp()
+ self.tf = TarFileArchive('test', self.tmpdir)
def tearDown(self):
- os.unlink(self.tf.name())
+ shutil.rmtree(self.tmpdir)
def check_for_file(self, filename):
- rtf = tarfile.open('test.tar')
+ rtf = tarfile.open(os.path.join(self.tmpdir, 'test.tar'))
rtf.getmember(filename)
rtf.close()
def test_create(self):
- self.tf.close()
- self.assertTrue(os.path.exists('test.tar'))
+ self.tf.finalize('auto')
+ self.assertTrue(os.path.exists(os.path.join(self.tmpdir,
+ 'test.tar')))
def test_add_file(self):
self.tf.add_file('tests/ziptest')
- self.tf.close()
+ self.tf.finalize('auto')
self.check_for_file('test/tests/ziptest')
@@ -119,7 +127,7 @@ class TarFileArchiveTest(unittest.TestCase):
def test_add_renamed(self):
self.tf.add_file('tests/ziptest', dest='tests/ziptest_renamed')
- self.tf.close()
+ self.tf.finalize('auto')
self.check_for_file('test/tests/ziptest_renamed')
@@ -133,7 +141,7 @@ class TarFileArchiveTest(unittest.TestCase):
def test_add_string(self):
self.tf.add_string('this is content', 'tests/string_test.txt')
- self.tf.close()
+ self.tf.finalize('auto')
self.check_for_file('test/tests/string_test.txt')
@@ -154,11 +162,11 @@ class TarFileArchiveTest(unittest.TestCase):
self.tf.add_file('tests/ziptest')
self.tf.add_link('tests/ziptest', 'link_name')
- self.tf.close()
+ self.tf.finalize('auto')
self.check_for_file('test/link_name')
def test_compress(self):
- name = self.tf.compress("gzip")
+ self.tf.finalize("auto")
if __name__ == "__main__":
unittest.main()