diff options
-rw-r--r-- | libbe/arch.py | 14 | ||||
-rw-r--r-- | libbe/bzr.py | 13 | ||||
-rw-r--r-- | libbe/git.py | 13 | ||||
-rw-r--r-- | libbe/hg.py | 13 | ||||
-rw-r--r-- | libbe/rcs.py | 338 |
5 files changed, 291 insertions, 100 deletions
diff --git a/libbe/arch.py b/libbe/arch.py index a2d6bde..cd0f3b2 100644 --- a/libbe/arch.py +++ b/libbe/arch.py @@ -14,17 +14,20 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + import codecs import os import re import shutil +import sys import time import unittest import doctest import config from beuuid import uuid_gen -from rcs import RCS, RCStestCase, CommandError +import rcs +from rcs import RCS DEFAULT_CLIENT = "tla" @@ -280,9 +283,10 @@ class CantAddFile(Exception): def __init__(self, file): self.file = file Exception.__init__(self, "Can't automatically add file %s" % file) - -class ArchTestCase(RCStestCase): - Class = Arch -unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase) + + +rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__]) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/bzr.py b/libbe/bzr.py index 38af6bb..98ca571 100644 --- a/libbe/bzr.py +++ b/libbe/bzr.py @@ -14,12 +14,15 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + import os import re +import sys import unittest import doctest -from rcs import RCS, RCStestCase, CommandError +import rcs +from rcs import RCS def new(): return Bzr() @@ -79,7 +82,7 @@ class Bzr(RCS): def postcommit(self): try: self._u_invoke_client('merge') - except CommandError, e: + except rcs.CommandError, e: if ('No merge branch known or specified' in e.err_str or 'No merge location known or specified' in e.err_str): pass @@ -91,8 +94,8 @@ class Bzr(RCS): if len(self._u_invoke_client('status', directory=directory)[1]) > 0: self.commit('Merge from upstream') -class BzrTestCase(RCStestCase): - Class = Bzr + +rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__]) -unitsuite = unittest.TestLoader().loadTestsFromTestCase(BzrTestCase) +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/git.py b/libbe/git.py index e57014f..401b5e5 100644 --- a/libbe/git.py +++ b/libbe/git.py @@ -13,12 +13,15 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + import os import re +import sys import unittest import doctest -from rcs import RCS, RCStestCase, CommandError +import rcs +from rcs import RCS def new(): return Git() @@ -91,9 +94,9 @@ class Git(RCS): assert len(match.groups()) == 3 revision = match.groups()[1] return revision - -class GitTestCase(RCStestCase): - Class = Git -unitsuite = unittest.TestLoader().loadTestsFromTestCase(GitTestCase) + +rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__]) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/hg.py b/libbe/hg.py index c00d7e2..a7413af 100644 --- a/libbe/hg.py +++ b/libbe/hg.py @@ -13,12 +13,15 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + import os import re +import sys import unittest import doctest -from rcs import RCS, RCStestCase, CommandError, SettingIDnotSupported +import rcs +from rcs import RCS def new(): return Hg() @@ -49,7 +52,7 @@ class Hg(RCS): standard Mercurial. http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension """ - raise SettingIDnotSupported + raise rcs.SettingIDnotSupported def _rcs_add(self, path): self._u_invoke_client("add", path) def _rcs_remove(self, path): @@ -79,8 +82,8 @@ class Hg(RCS): revision = match.groups()[0] return revision -class HgTestCase(RCStestCase): - Class = Hg + +rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__]) -unitsuite = unittest.TestLoader().loadTestsFromTestCase(HgTestCase) +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/rcs.py b/libbe/rcs.py index 786f9dd..c0b92e7 100644 --- a/libbe/rcs.py +++ b/libbe/rcs.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + from subprocess import Popen, PIPE import codecs import os @@ -553,94 +554,271 @@ class RCS(object): f.close() return (summary, body) + +def setup_rcs_test_fixtures(testcase): + """Set up test fixtures for RCS test case.""" + testcase.rcs = testcase.Class() + testcase.dir = Dir() + testcase.dirname = testcase.dir.path + + testcase.rcs_supports_uninitialized_user_id = ( + testcase.rcs.name not in ["git"]) + testcase.rcs_supports_set_user_id = ( + testcase.rcs.name not in ["None", "hg"]) + + if not testcase.rcs.installed(): + testcase.fail( + "%(name)s RCS not found" % vars(testcase.Class)) + + if testcase.Class.name != "None": + testcase.failIf( + testcase.rcs.detect(testcase.dirname), + "Detected %(name)s RCS before initialising" + % vars(testcase.Class)) + + testcase.rcs.init(testcase.dirname) + + +class RCSTestCase(unittest.TestCase): + """Test cases for base RCS class.""" -class RCStestCase(unittest.TestCase): Class = RCS + def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) + super(RCSTestCase, self).__init__(*args, **kwargs) self.dirname = None - def instantiateRCS(self): - return self.Class() + def setUp(self): - self.dir = Dir() - self.dirname = self.dir.path - self.rcs = self.instantiateRCS() + super(RCSTestCase, self).setUp() + setup_rcs_test_fixtures(self) + def tearDown(self): del(self.rcs) - del(self.dirname) - def fullPath(self, path): - return os.path.join(self.dirname, path) - def assertPathExists(self, path): - fullpath = self.fullPath(path) - self.failUnless(os.path.exists(fullpath)==True, - "path %s does not exist" % fullpath) - def uidTest(self): + super(RCSTestCase, self).tearDown() + + def full_path(self, rel_path): + return os.path.join(self.dirname, rel_path) + + +class RCS_init_TestCase(RCSTestCase): + """Test cases for RCS.init method.""" + + def test_detect_should_succeed_after_init(self): + """Should detect RCS in directory after initialization.""" + self.failUnless( + self.rcs.detect(self.dirname), + "Did not detect %(name)s RCS after initialising" + % vars(self.Class)) + + def test_rcs_rootdir_in_specified_root_path(self): + """RCS root directory should be in specified root path.""" + rp = os.path.realpath(self.rcs.rootdir) + dp = os.path.realpath(self.dirname) + rcs_name = self.Class.name + self.failUnless( + dp == rp or rp == None, + "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars()) + + +class RCS_get_user_id_TestCase(RCSTestCase): + """Test cases for RCS.get_user_id method.""" + + def test_gets_existing_user_id(self): + """Should get the existing user ID.""" + if not self.rcs_supports_uninitialized_user_id: + return + user_id = self.rcs.get_user_id() - self.failUnless(user_id != None, - "unable to get a user id") - user_idB = "John Doe <jdoe@example.com>" - if self.rcs.name in ["None", "hg"]: - self.assertRaises(SettingIDnotSupported, self.rcs.set_user_id, - user_idB) + self.failUnless( + user_id is not None, + "unable to get a user id") + + +class RCS_set_user_id_TestCase(RCSTestCase): + """Test cases for RCS.set_user_id method.""" + + def setUp(self): + super(RCS_set_user_id_TestCase, self).setUp() + + if self.rcs_supports_uninitialized_user_id: + self.prev_user_id = self.rcs.get_user_id() else: - self.rcs.set_user_id(user_idB) - self.failUnless(self.rcs.get_user_id() == user_idB, - "user id not set correctly (was %s, is %s)" \ - % (user_id, self.rcs.get_user_id())) - self.failUnless(self.rcs.set_user_id(user_id) == None, - "unable to restore user id %s" % user_id) - self.failUnless(self.rcs.get_user_id() == user_id, - "unable to restore user id %s" % user_id) - def versionTest(self, path): - origpath = path - path = self.fullPath(path) - contentsA = "Lorem ipsum" - contentsB = "dolor sit amet" - self.rcs.set_file_contents(path,contentsA) - self.failUnless(self.rcs.get_file_contents(path)==contentsA, - "File contents not set or read correctly") - revision = self.rcs.commit("Commit current status") - self.failUnless(self.rcs.get_file_contents(path)==contentsA, - "Committing File contents not set or read correctly") - if self.rcs.versioned == True: - self.rcs.set_file_contents(path,contentsB) - self.failUnless(self.rcs.get_file_contents(path)==contentsB, - "File contents not set correctly after commit") - contentsArev = self.rcs.get_file_contents(path, revision) - self.failUnless(contentsArev==contentsA, \ - "Original file contents not saved in revision %s\n%s\n%s\n" \ - % (revision, contentsA, contentsArev)) - dup = self.rcs.duplicate_repo(revision) - duppath = os.path.join(dup, origpath) - dupcont = file(duppath, "rb").read() - self.failUnless(dupcont == contentsA) + self.prev_user_id = "Uninitialized identity <bogus@example.org>" + + if self.rcs_supports_set_user_id: + self.test_new_user_id = "John Doe <jdoe@example.com>" + self.rcs.set_user_id(self.test_new_user_id) + + def tearDown(self): + if self.rcs_supports_set_user_id: + self.rcs.set_user_id(self.prev_user_id) + super(RCS_set_user_id_TestCase, self).tearDown() + + def test_raises_error_in_unsupported_vcs(self): + """Should raise an error in a VCS that doesn't support it.""" + if self.rcs_supports_set_user_id: + return + self.assertRaises( + SettingIDnotSupported, + self.rcs.set_user_id, "foo") + + def test_updates_user_id_in_supporting_rcs(self): + """Should update the user ID in an RCS that supports it.""" + if not self.rcs_supports_set_user_id: + return + user_id = self.rcs.get_user_id() + self.failUnlessEqual( + self.test_new_user_id, user_id, + "user id not set correctly (expected %s, got %s)" + % (self.test_new_user_id, user_id)) + + +def setup_rcs_revision_test_fixtures(testcase): + """Set up revision test fixtures for RCS test case.""" + testcase.test_dirs = ['a', 'a/b', 'c'] + for path in testcase.test_dirs: + testcase.rcs.mkdir(testcase.full_path(path)) + + testcase.test_files = ['a/text', 'a/b/text'] + + testcase.test_contents = { + 'rev_1': "Lorem ipsum", + 'uncommitted': "dolor sit amet", + } + + +class RCS_mkdir_TestCase(RCSTestCase): + """Test cases for RCS.mkdir method.""" + + def setUp(self): + super(RCS_mkdir_TestCase, self).setUp() + setup_rcs_revision_test_fixtures(self) + + def tearDown(self): + for path in reversed(sorted(self.test_dirs)): + self.rcs.recursive_remove(self.full_path(path)) + super(RCS_mkdir_TestCase, self).tearDown() + + def test_mkdir_creates_directory(self): + """Should create specified directory in filesystem.""" + for path in self.test_dirs: + full_path = self.full_path(path) + self.failUnless( + os.path.exists(full_path), + "path %(full_path)s does not exist" % vars()) + + +class RCS_commit_TestCase(RCSTestCase): + """Test cases for RCS.commit method.""" + + def setUp(self): + super(RCS_commit_TestCase, self).setUp() + setup_rcs_revision_test_fixtures(self) + + def tearDown(self): + for path in reversed(sorted(self.test_dirs)): + self.rcs.recursive_remove(self.full_path(path)) + super(RCS_commit_TestCase, self).tearDown() + + def test_file_contents_as_specified(self): + """Should set file contents as specified.""" + test_contents = self.test_contents['rev_1'] + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents(full_path, test_contents) + current_contents = self.rcs.get_file_contents(full_path) + self.failUnlessEqual(test_contents, current_contents) + + def test_file_contents_as_committed(self): + """Should have file contents as specified after commit.""" + test_contents = self.test_contents['rev_1'] + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents(full_path, test_contents) + revision = self.rcs.commit("Initial file contents.") + current_contents = self.rcs.get_file_contents(full_path) + self.failUnlessEqual(test_contents, current_contents) + + def test_file_contents_as_set_when_uncommitted(self): + """Should set file contents as specified after commit.""" + if not self.rcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.rcs.commit("Initial file contents.") + self.rcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + current_contents = self.rcs.get_file_contents(full_path) + self.failUnlessEqual( + self.test_contents['uncommitted'], current_contents) + + def test_revision_file_contents_as_committed(self): + """Should get file contents as committed to specified revision.""" + if not self.rcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.rcs.commit("Initial file contents.") + self.rcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + committed_contents = self.rcs.get_file_contents( + full_path, revision) + self.failUnlessEqual( + self.test_contents['rev_1'], committed_contents) + + +class RCS_duplicate_repo_TestCase(RCSTestCase): + """Test cases for RCS.duplicate_repo method.""" + + def setUp(self): + super(RCS_duplicate_repo_TestCase, self).setUp() + setup_rcs_revision_test_fixtures(self) + + def tearDown(self): + self.rcs.remove_duplicate_repo() + for path in reversed(sorted(self.test_dirs)): + self.rcs.recursive_remove(self.full_path(path)) + super(RCS_duplicate_repo_TestCase, self).tearDown() + + def test_revision_file_contents_as_committed(self): + """Should match file contents as committed to specified revision.""" + if not self.rcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.rcs.commit("Commit current status") + self.rcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + dup_repo_path = self.rcs.duplicate_repo(revision) + dup_file_path = os.path.join(dup_repo_path, path) + dup_file_contents = file(dup_file_path, 'rb').read() + self.failUnlessEqual( + self.test_contents['rev_1'], dup_file_contents) self.rcs.remove_duplicate_repo() - def testRun(self): - self.failUnless(self.rcs.installed() == True, - "%s RCS not found" % self.Class.name) - if self.Class.name != "None": - self.failUnless(self.rcs.detect(self.dirname)==False, - "Detected %s RCS before initializing" \ - % self.Class.name) - self.rcs.init(self.dirname) - self.failUnless(self.rcs.detect(self.dirname)==True, - "Did not detect %s RCS after initializing" \ - % self.Class.name) - rp = os.path.realpath(self.rcs.rootdir) - dp = os.path.realpath(self.dirname) - self.failUnless(dp == rp or rp == None, - "%s RCS root in wrong dir (%s %s)" \ - % (self.Class.name, dp, rp)) - self.uidTest() - self.rcs.mkdir(self.fullPath('a')) - self.rcs.mkdir(self.fullPath('a/b')) - self.rcs.mkdir(self.fullPath('c')) - self.assertPathExists('a') - self.assertPathExists('a/b') - self.assertPathExists('c') - self.versionTest('a/text') - self.versionTest('a/b/text') - self.rcs.recursive_remove(self.fullPath('a')) - -unitsuite = unittest.TestLoader().loadTestsFromTestCase(RCStestCase) + + +def make_rcs_testcase_subclasses(rcs_class, namespace): + """Make RCSTestCase subclasses for rcs_class in the namespace.""" + rcs_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, RCSTestCase)] + + for base_class in rcs_testcase_classes: + testcase_class_name = rcs_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = rcs_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |