aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'libbe')
-rw-r--r--libbe/arch.py14
-rw-r--r--libbe/bzr.py13
-rw-r--r--libbe/git.py13
-rw-r--r--libbe/hg.py13
-rw-r--r--libbe/rcs.py338
5 files changed, 291 insertions, 100 deletions
diff --git a/libbe/arch.py b/libbe/arch.py
index a2d6bde..cd0f3b2 100644
--- a/libbe/arch.py
+++ b/libbe/arch.py
@@ -14,17 +14,20 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
import codecs
import os
import re
import shutil
+import sys
import time
import unittest
import doctest
import config
from beuuid import uuid_gen
-from rcs import RCS, RCStestCase, CommandError
+import rcs
+from rcs import RCS
DEFAULT_CLIENT = "tla"
@@ -280,9 +283,10 @@ class CantAddFile(Exception):
def __init__(self, file):
self.file = file
Exception.__init__(self, "Can't automatically add file %s" % file)
-
-class ArchTestCase(RCStestCase):
- Class = Arch
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase)
+
+
+rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__])
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/bzr.py b/libbe/bzr.py
index 38af6bb..98ca571 100644
--- a/libbe/bzr.py
+++ b/libbe/bzr.py
@@ -14,12 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
import os
import re
+import sys
import unittest
import doctest
-from rcs import RCS, RCStestCase, CommandError
+import rcs
+from rcs import RCS
def new():
return Bzr()
@@ -79,7 +82,7 @@ class Bzr(RCS):
def postcommit(self):
try:
self._u_invoke_client('merge')
- except CommandError, e:
+ except rcs.CommandError, e:
if ('No merge branch known or specified' in e.err_str or
'No merge location known or specified' in e.err_str):
pass
@@ -91,8 +94,8 @@ class Bzr(RCS):
if len(self._u_invoke_client('status', directory=directory)[1]) > 0:
self.commit('Merge from upstream')
-class BzrTestCase(RCStestCase):
- Class = Bzr
+
+rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__])
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(BzrTestCase)
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/git.py b/libbe/git.py
index e57014f..401b5e5 100644
--- a/libbe/git.py
+++ b/libbe/git.py
@@ -13,12 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
import os
import re
+import sys
import unittest
import doctest
-from rcs import RCS, RCStestCase, CommandError
+import rcs
+from rcs import RCS
def new():
return Git()
@@ -91,9 +94,9 @@ class Git(RCS):
assert len(match.groups()) == 3
revision = match.groups()[1]
return revision
-
-class GitTestCase(RCStestCase):
- Class = Git
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(GitTestCase)
+
+rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__])
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/hg.py b/libbe/hg.py
index c00d7e2..a7413af 100644
--- a/libbe/hg.py
+++ b/libbe/hg.py
@@ -13,12 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
import os
import re
+import sys
import unittest
import doctest
-from rcs import RCS, RCStestCase, CommandError, SettingIDnotSupported
+import rcs
+from rcs import RCS
def new():
return Hg()
@@ -49,7 +52,7 @@ class Hg(RCS):
standard Mercurial.
http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
"""
- raise SettingIDnotSupported
+ raise rcs.SettingIDnotSupported
def _rcs_add(self, path):
self._u_invoke_client("add", path)
def _rcs_remove(self, path):
@@ -79,8 +82,8 @@ class Hg(RCS):
revision = match.groups()[0]
return revision
-class HgTestCase(RCStestCase):
- Class = Hg
+
+rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__])
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(HgTestCase)
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/rcs.py b/libbe/rcs.py
index 786f9dd..c0b92e7 100644
--- a/libbe/rcs.py
+++ b/libbe/rcs.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
from subprocess import Popen, PIPE
import codecs
import os
@@ -553,94 +554,271 @@ class RCS(object):
f.close()
return (summary, body)
+
+def setup_rcs_test_fixtures(testcase):
+ """Set up test fixtures for RCS test case."""
+ testcase.rcs = testcase.Class()
+ testcase.dir = Dir()
+ testcase.dirname = testcase.dir.path
+
+ testcase.rcs_supports_uninitialized_user_id = (
+ testcase.rcs.name not in ["git"])
+ testcase.rcs_supports_set_user_id = (
+ testcase.rcs.name not in ["None", "hg"])
+
+ if not testcase.rcs.installed():
+ testcase.fail(
+ "%(name)s RCS not found" % vars(testcase.Class))
+
+ if testcase.Class.name != "None":
+ testcase.failIf(
+ testcase.rcs.detect(testcase.dirname),
+ "Detected %(name)s RCS before initialising"
+ % vars(testcase.Class))
+
+ testcase.rcs.init(testcase.dirname)
+
+
+class RCSTestCase(unittest.TestCase):
+ """Test cases for base RCS class."""
-class RCStestCase(unittest.TestCase):
Class = RCS
+
def __init__(self, *args, **kwargs):
- unittest.TestCase.__init__(self, *args, **kwargs)
+ super(RCSTestCase, self).__init__(*args, **kwargs)
self.dirname = None
- def instantiateRCS(self):
- return self.Class()
+
def setUp(self):
- self.dir = Dir()
- self.dirname = self.dir.path
- self.rcs = self.instantiateRCS()
+ super(RCSTestCase, self).setUp()
+ setup_rcs_test_fixtures(self)
+
def tearDown(self):
del(self.rcs)
- del(self.dirname)
- def fullPath(self, path):
- return os.path.join(self.dirname, path)
- def assertPathExists(self, path):
- fullpath = self.fullPath(path)
- self.failUnless(os.path.exists(fullpath)==True,
- "path %s does not exist" % fullpath)
- def uidTest(self):
+ super(RCSTestCase, self).tearDown()
+
+ def full_path(self, rel_path):
+ return os.path.join(self.dirname, rel_path)
+
+
+class RCS_init_TestCase(RCSTestCase):
+ """Test cases for RCS.init method."""
+
+ def test_detect_should_succeed_after_init(self):
+ """Should detect RCS in directory after initialization."""
+ self.failUnless(
+ self.rcs.detect(self.dirname),
+ "Did not detect %(name)s RCS after initialising"
+ % vars(self.Class))
+
+ def test_rcs_rootdir_in_specified_root_path(self):
+ """RCS root directory should be in specified root path."""
+ rp = os.path.realpath(self.rcs.rootdir)
+ dp = os.path.realpath(self.dirname)
+ rcs_name = self.Class.name
+ self.failUnless(
+ dp == rp or rp == None,
+ "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars())
+
+
+class RCS_get_user_id_TestCase(RCSTestCase):
+ """Test cases for RCS.get_user_id method."""
+
+ def test_gets_existing_user_id(self):
+ """Should get the existing user ID."""
+ if not self.rcs_supports_uninitialized_user_id:
+ return
+
user_id = self.rcs.get_user_id()
- self.failUnless(user_id != None,
- "unable to get a user id")
- user_idB = "John Doe <jdoe@example.com>"
- if self.rcs.name in ["None", "hg"]:
- self.assertRaises(SettingIDnotSupported, self.rcs.set_user_id,
- user_idB)
+ self.failUnless(
+ user_id is not None,
+ "unable to get a user id")
+
+
+class RCS_set_user_id_TestCase(RCSTestCase):
+ """Test cases for RCS.set_user_id method."""
+
+ def setUp(self):
+ super(RCS_set_user_id_TestCase, self).setUp()
+
+ if self.rcs_supports_uninitialized_user_id:
+ self.prev_user_id = self.rcs.get_user_id()
else:
- self.rcs.set_user_id(user_idB)
- self.failUnless(self.rcs.get_user_id() == user_idB,
- "user id not set correctly (was %s, is %s)" \
- % (user_id, self.rcs.get_user_id()))
- self.failUnless(self.rcs.set_user_id(user_id) == None,
- "unable to restore user id %s" % user_id)
- self.failUnless(self.rcs.get_user_id() == user_id,
- "unable to restore user id %s" % user_id)
- def versionTest(self, path):
- origpath = path
- path = self.fullPath(path)
- contentsA = "Lorem ipsum"
- contentsB = "dolor sit amet"
- self.rcs.set_file_contents(path,contentsA)
- self.failUnless(self.rcs.get_file_contents(path)==contentsA,
- "File contents not set or read correctly")
- revision = self.rcs.commit("Commit current status")
- self.failUnless(self.rcs.get_file_contents(path)==contentsA,
- "Committing File contents not set or read correctly")
- if self.rcs.versioned == True:
- self.rcs.set_file_contents(path,contentsB)
- self.failUnless(self.rcs.get_file_contents(path)==contentsB,
- "File contents not set correctly after commit")
- contentsArev = self.rcs.get_file_contents(path, revision)
- self.failUnless(contentsArev==contentsA, \
- "Original file contents not saved in revision %s\n%s\n%s\n" \
- % (revision, contentsA, contentsArev))
- dup = self.rcs.duplicate_repo(revision)
- duppath = os.path.join(dup, origpath)
- dupcont = file(duppath, "rb").read()
- self.failUnless(dupcont == contentsA)
+ self.prev_user_id = "Uninitialized identity <bogus@example.org>"
+
+ if self.rcs_supports_set_user_id:
+ self.test_new_user_id = "John Doe <jdoe@example.com>"
+ self.rcs.set_user_id(self.test_new_user_id)
+
+ def tearDown(self):
+ if self.rcs_supports_set_user_id:
+ self.rcs.set_user_id(self.prev_user_id)
+ super(RCS_set_user_id_TestCase, self).tearDown()
+
+ def test_raises_error_in_unsupported_vcs(self):
+ """Should raise an error in a VCS that doesn't support it."""
+ if self.rcs_supports_set_user_id:
+ return
+ self.assertRaises(
+ SettingIDnotSupported,
+ self.rcs.set_user_id, "foo")
+
+ def test_updates_user_id_in_supporting_rcs(self):
+ """Should update the user ID in an RCS that supports it."""
+ if not self.rcs_supports_set_user_id:
+ return
+ user_id = self.rcs.get_user_id()
+ self.failUnlessEqual(
+ self.test_new_user_id, user_id,
+ "user id not set correctly (expected %s, got %s)"
+ % (self.test_new_user_id, user_id))
+
+
+def setup_rcs_revision_test_fixtures(testcase):
+ """Set up revision test fixtures for RCS test case."""
+ testcase.test_dirs = ['a', 'a/b', 'c']
+ for path in testcase.test_dirs:
+ testcase.rcs.mkdir(testcase.full_path(path))
+
+ testcase.test_files = ['a/text', 'a/b/text']
+
+ testcase.test_contents = {
+ 'rev_1': "Lorem ipsum",
+ 'uncommitted': "dolor sit amet",
+ }
+
+
+class RCS_mkdir_TestCase(RCSTestCase):
+ """Test cases for RCS.mkdir method."""
+
+ def setUp(self):
+ super(RCS_mkdir_TestCase, self).setUp()
+ setup_rcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.rcs.recursive_remove(self.full_path(path))
+ super(RCS_mkdir_TestCase, self).tearDown()
+
+ def test_mkdir_creates_directory(self):
+ """Should create specified directory in filesystem."""
+ for path in self.test_dirs:
+ full_path = self.full_path(path)
+ self.failUnless(
+ os.path.exists(full_path),
+ "path %(full_path)s does not exist" % vars())
+
+
+class RCS_commit_TestCase(RCSTestCase):
+ """Test cases for RCS.commit method."""
+
+ def setUp(self):
+ super(RCS_commit_TestCase, self).setUp()
+ setup_rcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.rcs.recursive_remove(self.full_path(path))
+ super(RCS_commit_TestCase, self).tearDown()
+
+ def test_file_contents_as_specified(self):
+ """Should set file contents as specified."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(full_path, test_contents)
+ current_contents = self.rcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+
+ def test_file_contents_as_committed(self):
+ """Should have file contents as specified after commit."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(full_path, test_contents)
+ revision = self.rcs.commit("Initial file contents.")
+ current_contents = self.rcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+
+ def test_file_contents_as_set_when_uncommitted(self):
+ """Should set file contents as specified after commit."""
+ if not self.rcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.rcs.commit("Initial file contents.")
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ current_contents = self.rcs.get_file_contents(full_path)
+ self.failUnlessEqual(
+ self.test_contents['uncommitted'], current_contents)
+
+ def test_revision_file_contents_as_committed(self):
+ """Should get file contents as committed to specified revision."""
+ if not self.rcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.rcs.commit("Initial file contents.")
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ committed_contents = self.rcs.get_file_contents(
+ full_path, revision)
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], committed_contents)
+
+
+class RCS_duplicate_repo_TestCase(RCSTestCase):
+ """Test cases for RCS.duplicate_repo method."""
+
+ def setUp(self):
+ super(RCS_duplicate_repo_TestCase, self).setUp()
+ setup_rcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ self.rcs.remove_duplicate_repo()
+ for path in reversed(sorted(self.test_dirs)):
+ self.rcs.recursive_remove(self.full_path(path))
+ super(RCS_duplicate_repo_TestCase, self).tearDown()
+
+ def test_revision_file_contents_as_committed(self):
+ """Should match file contents as committed to specified revision."""
+ if not self.rcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.rcs.commit("Commit current status")
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ dup_repo_path = self.rcs.duplicate_repo(revision)
+ dup_file_path = os.path.join(dup_repo_path, path)
+ dup_file_contents = file(dup_file_path, 'rb').read()
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], dup_file_contents)
self.rcs.remove_duplicate_repo()
- def testRun(self):
- self.failUnless(self.rcs.installed() == True,
- "%s RCS not found" % self.Class.name)
- if self.Class.name != "None":
- self.failUnless(self.rcs.detect(self.dirname)==False,
- "Detected %s RCS before initializing" \
- % self.Class.name)
- self.rcs.init(self.dirname)
- self.failUnless(self.rcs.detect(self.dirname)==True,
- "Did not detect %s RCS after initializing" \
- % self.Class.name)
- rp = os.path.realpath(self.rcs.rootdir)
- dp = os.path.realpath(self.dirname)
- self.failUnless(dp == rp or rp == None,
- "%s RCS root in wrong dir (%s %s)" \
- % (self.Class.name, dp, rp))
- self.uidTest()
- self.rcs.mkdir(self.fullPath('a'))
- self.rcs.mkdir(self.fullPath('a/b'))
- self.rcs.mkdir(self.fullPath('c'))
- self.assertPathExists('a')
- self.assertPathExists('a/b')
- self.assertPathExists('c')
- self.versionTest('a/text')
- self.versionTest('a/b/text')
- self.rcs.recursive_remove(self.fullPath('a'))
-
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(RCStestCase)
+
+
+def make_rcs_testcase_subclasses(rcs_class, namespace):
+ """Make RCSTestCase subclasses for rcs_class in the namespace."""
+ rcs_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, RCSTestCase)]
+
+ for base_class in rcs_testcase_classes:
+ testcase_class_name = rcs_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = rcs_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])