From 49a7771336ce09f6d42c7699ef32aecea0e83182 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 7 Dec 2009 20:07:55 -0500 Subject: Initial directory restructuring to clarify dependencies --- libbe/vcs.py | 941 ----------------------------------------------------------- 1 file changed, 941 deletions(-) delete mode 100644 libbe/vcs.py (limited to 'libbe/vcs.py') diff --git a/libbe/vcs.py b/libbe/vcs.py deleted file mode 100644 index 44643a4..0000000 --- a/libbe/vcs.py +++ /dev/null @@ -1,941 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Alexander Belchenko -# Ben Finney -# Chris Ball -# Gianluca Montecchi -# W. Trevor King -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -""" -Define the base VCS (Version Control System) class, which should be -subclassed by other Version Control System backends. The base class -implements a "do not version" VCS. -""" - -import codecs -import os -import os.path -import re -from socket import gethostname -import shutil -import sys -import tempfile - -import libbe -from utility import Dir, search_parent_directories -from subproc import CommandError, invoke -from plugin import get_plugin - -if libbe.TESTING == True: - import unittest - import doctest - - -# List VCS modules in order of preference. -# Don't list this module, it is implicitly last. -VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg'] - -def set_preferred_vcs(name): - global VCS_ORDER - assert name in VCS_ORDER, \ - 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER) - VCS_ORDER.remove(name) - VCS_ORDER.insert(0, name) - -def _get_matching_vcs(matchfn): - """Return the first module for which matchfn(VCS_instance) is true""" - for submodname in VCS_ORDER: - module = get_plugin('libbe', submodname) - vcs = module.new() - if matchfn(vcs) == True: - return vcs - vcs.cleanup() - return VCS() - -def vcs_by_name(vcs_name): - """Return the module for the VCS with the given name""" - return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) - -def detect_vcs(dir): - """Return an VCS instance for the vcs being used in this directory""" - return _get_matching_vcs(lambda vcs: vcs.detect(dir)) - -def installed_vcs(): - """Return an instance of an installed VCS""" - return _get_matching_vcs(lambda vcs: vcs.installed()) - - - -class SettingIDnotSupported(NotImplementedError): - pass - -class VCSnotRooted(Exception): - def __init__(self): - msg = "VCS not rooted" - Exception.__init__(self, msg) - -class PathNotInRoot(Exception): - def __init__(self, path, root): - msg = "Path '%s' not in root '%s'" % (path, root) - Exception.__init__(self, msg) - self.path = path - self.root = root - -class NoSuchFile(Exception): - def __init__(self, pathname, root="."): - path = os.path.abspath(os.path.join(root, pathname)) - Exception.__init__(self, "No such file: %s" % path) - -class EmptyCommit(Exception): - def __init__(self): - Exception.__init__(self, "No changes to commit") - - -def new(): - return VCS() - -class VCS(object): - """ - This class implements a 'no-vcs' interface. - - Support for other VCSs can be added by subclassing this class, and - overriding methods _vcs_*() with code appropriate for your VCS. - - The methods _u_*() are utility methods available to the _vcs_*() - methods. - """ - name = "None" - client = "" # command-line tool for _u_invoke_client - versioned = False - def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()): - self.paranoid = paranoid - self.verboseInvoke = False - self.rootdir = None - self._duplicateBasedir = None - self._duplicateDirname = None - self.encoding = encoding - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, id(self)) - def __repr__(self): - return str(self) - def _vcs_version(self): - """ - Return the VCS version string. - """ - return "0.0" - def _vcs_detect(self, path=None): - """ - Detect whether a directory is revision controlled with this VCS. - """ - return True - def _vcs_root(self, path): - """ - Get the VCS root. This is the default working directory for - future invocations. You would normally set this to the root - directory for your VCS. - """ - if os.path.isdir(path)==False: - path = os.path.dirname(path) - if path == "": - path = os.path.abspath(".") - return path - def _vcs_init(self, path): - """ - Begin versioning the tree based at path. - """ - pass - def _vcs_cleanup(self): - """ - Remove any cruft that _vcs_init() created outside of the - versioned tree. - """ - pass - def _vcs_get_user_id(self): - """ - Get the VCS's suggested user id (e.g. "John Doe "). - If the VCS has not been configured with a username, return None. - """ - return None - def _vcs_set_user_id(self, value): - """ - Set the VCS's suggested user id (e.g "John Doe "). - This is run if the VCS has not been configured with a usename, so - that commits will have a reasonable FROM value. - """ - raise SettingIDnotSupported - def _vcs_add(self, path): - """ - Add the already created file at path to version control. - """ - pass - def _vcs_remove(self, path): - """ - Remove the file at path from version control. Optionally - remove the file from the filesystem as well. - """ - pass - def _vcs_update(self, path): - """ - Notify the versioning system of changes to the versioned file - at path. - """ - pass - def _vcs_get_file_contents(self, path, revision=None, binary=False): - """ - Get the file contents as they were in a given revision. - Revision==None specifies the current revision. - """ - assert revision == None, \ - "The %s VCS does not support revision specifiers" % self.name - if binary == False: - f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding) - else: - f = open(os.path.join(self.rootdir, path), "rb") - contents = f.read() - f.close() - return contents - def _vcs_duplicate_repo(self, directory, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - dir specifies a directory to create the duplicate in. - """ - shutil.copytree(self.rootdir, directory, True) - def _vcs_commit(self, commitfile, allow_empty=False): - """ - Commit the current working directory, using the contents of - commitfile as the comment. Return the name of the old - revision (or None if commits are not supported). - - If allow_empty == False, raise EmptyCommit if there are no - changes to commit. - """ - return None - def _vcs_revision_id(self, index): - """ - Return the name of the th revision. Index will be an - integer (possibly <= 0). The choice of which branch to follow - when crossing branches/merges is not defined. - - Return None if revision IDs are not supported, or if the - specified revision does not exist. - """ - return None - def version(self): - """Cache version string for efficiency.""" - if not hasattr(self, '_version'): - self._version = self._get_version() - return self._version - def _get_version(self): - try: - ret = self._vcs_version() - return ret - except OSError, e: - if e.errno == errno.ENOENT: - return None - else: - raise OSError, e - except CommandError: - return None - def installed(self): - if self.version() != None: - return True - return False - def detect(self, path="."): - """ - Detect whether a directory is revision controlled with this VCS. - """ - return self._vcs_detect(path) - def root(self, path): - """ - Set the root directory to the path's VCS root. This is the - default working directory for future invocations. - """ - self.rootdir = self._vcs_root(path) - def init(self, path): - """ - Begin versioning the tree based at path. - Also roots the vcs at path. - """ - if os.path.isdir(path)==False: - path = os.path.dirname(path) - self._vcs_init(path) - self.root(path) - def cleanup(self): - self._vcs_cleanup() - def get_user_id(self): - """ - Get the VCS's suggested user id (e.g. "John Doe "). - If the VCS has not been configured with a username, return the user's - id. You can override the automatic lookup procedure by setting the - VCS.user_id attribute to a string of your choice. - """ - if hasattr(self, "user_id"): - if self.user_id != None: - return self.user_id - id = self._vcs_get_user_id() - if id == None: - name = self._u_get_fallback_username() - email = self._u_get_fallback_email() - id = self._u_create_id(name, email) - print >> sys.stderr, "Guessing id '%s'" % id - try: - self.set_user_id(id) - except SettingIDnotSupported: - pass - return id - def set_user_id(self, value): - """ - Set the VCS's suggested user id (e.g "John Doe "). - This is run if the VCS has not been configured with a usename, so - that commits will have a reasonable FROM value. - """ - self._vcs_set_user_id(value) - def add(self, path): - """ - Add the already created file at path to version control. - """ - self._vcs_add(self._u_rel_path(path)) - def remove(self, path): - """ - Remove a file from both version control and the filesystem. - """ - self._vcs_remove(self._u_rel_path(path)) - if os.path.exists(path): - os.remove(path) - def recursive_remove(self, dirname): - """ - Remove a file/directory and all its decendents from both - version control and the filesystem. - """ - if not os.path.exists(dirname): - raise NoSuchFile(dirname) - for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): - filenames.extend(dirnames) - for path in filenames: - fullpath = os.path.join(dirpath, path) - if os.path.exists(fullpath) == False: - continue - self._vcs_remove(self._u_rel_path(fullpath)) - if os.path.exists(dirname): - shutil.rmtree(dirname) - def update(self, path): - """ - Notify the versioning system of changes to the versioned file - at path. - """ - self._vcs_update(self._u_rel_path(path)) - def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False): - """ - Get the file as it was in a given revision. - Revision==None specifies the current revision. - - allow_no_vcs==True allows direct access to files through - codecs.open() or open() if the vcs decides it can't handle the - given path. - """ - if not os.path.exists(path): - raise NoSuchFile(path) - if self._use_vcs(path, allow_no_vcs): - relpath = self._u_rel_path(path) - contents = self._vcs_get_file_contents(relpath,revision,binary=binary) - else: - if binary == True: - f = codecs.open(path, "r", self.encoding) - else: - f = open(path, "rb") - contents = f.read() - f.close() - return contents - def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False): - """ - Set the file contents under version control. - """ - add = not os.path.exists(path) - if binary == False: - f = codecs.open(path, "w", self.encoding) - else: - f = open(path, "wb") - f.write(contents) - f.close() - - if self._use_vcs(path, allow_no_vcs): - if add: - self.add(path) - else: - self.update(path) - def mkdir(self, path, allow_no_vcs=False, check_parents=True): - """ - Create (if neccessary) a directory at path under version - control. - """ - if check_parents == True: - parent = os.path.dirname(path) - if not os.path.exists(parent): # recurse through parents - self.mkdir(parent, allow_no_vcs, check_parents) - if not os.path.exists(path): - os.mkdir(path) - if self._use_vcs(path, allow_no_vcs): - self.add(path) - else: - assert os.path.isdir(path) - if self._use_vcs(path, allow_no_vcs): - #self.update(path)# Don't update directories. Changing files - pass # underneath them should be sufficient. - - def duplicate_repo(self, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - Return the path to the arbitrary directory at the base of the new repo. - """ - # Dirname in Basedir to protect against simlink attacks. - if self._duplicateBasedir == None: - self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs') - self._duplicateDirname = \ - os.path.join(self._duplicateBasedir, "duplicate") - self._vcs_duplicate_repo(directory=self._duplicateDirname, - revision=revision) - return self._duplicateDirname - def remove_duplicate_repo(self): - """ - Clean up a duplicate repo created with duplicate_repo(). - """ - if self._duplicateBasedir != None: - shutil.rmtree(self._duplicateBasedir) - self._duplicateBasedir = None - self._duplicateDirname = None - def commit(self, summary, body=None, allow_empty=False): - """ - Commit the current working directory, with a commit message - string summary and body. Return the name of the old revision - (or None if versioning is not supported). - - If allow_empty == False (the default), raise EmptyCommit if - there are no changes to commit. - """ - summary = summary.strip()+'\n' - if body is not None: - summary += '\n' + body.strip() + '\n' - descriptor, filename = tempfile.mkstemp() - revision = None - try: - temp_file = os.fdopen(descriptor, 'wb') - temp_file.write(summary) - temp_file.flush() - self.precommit() - revision = self._vcs_commit(filename, allow_empty=allow_empty) - temp_file.close() - self.postcommit() - finally: - os.remove(filename) - return revision - def precommit(self): - """ - Executed before all attempted commits. - """ - pass - def postcommit(self): - """ - Only executed after successful commits. - """ - pass - def revision_id(self, index=None): - """ - Return the name of the th revision. The choice of - which branch to follow when crossing branches/merges is not - defined. - - Return None if index==None, revision IDs are not supported, or - if the specified revision does not exist. - """ - if index == None: - return None - return self._vcs_revision_id(index) - def _u_any_in_string(self, list, string): - """ - Return True if any of the strings in list are in string. - Otherwise return False. - """ - for list_string in list: - if list_string in string: - return True - return False - def _u_invoke(self, *args, **kwargs): - if 'cwd' not in kwargs: - kwargs['cwd'] = self.rootdir - if 'verbose' not in kwargs: - kwargs['verbose'] = self.verboseInvoke - if 'encoding' not in kwargs: - kwargs['encoding'] = self.encoding - return invoke(*args, **kwargs) - def _u_invoke_client(self, *args, **kwargs): - cl_args = [self.client] - cl_args.extend(args) - return self._u_invoke(cl_args, **kwargs) - def _u_search_parent_directories(self, path, filename): - """ - Find the file (or directory) named filename in path or in any - of path's parents. - - e.g. - search_parent_directories("/a/b/c", ".be") - will return the path to the first existing file from - /a/b/c/.be - /a/b/.be - /a/.be - /.be - or None if none of those files exist. - """ - return search_parent_directories(path, filename) - def _use_vcs(self, path, allow_no_vcs): - """ - Try and decide if _vcs_add/update/mkdir/etc calls will - succeed. Returns True is we think the vcs_call would - succeeed, and False otherwise. - """ - use_vcs = True - exception = None - if self.rootdir != None: - if self.path_in_root(path) == False: - use_vcs = False - exception = PathNotInRoot(path, self.rootdir) - else: - use_vcs = False - exception = VCSnotRooted - if use_vcs == False and allow_no_vcs==False: - raise exception - return use_vcs - def path_in_root(self, path, root=None): - """ - Return the relative path to path from root. - >>> vcs = new() - >>> vcs.path_in_root("/a.b/c/.be", "/a.b/c") - True - >>> vcs.path_in_root("/a.b/.be", "/a.b/c") - False - """ - if root == None: - if self.rootdir == None: - raise VCSnotRooted - root = self.rootdir - path = os.path.abspath(path) - absRoot = os.path.abspath(root) - absRootSlashedDir = os.path.join(absRoot,"") - if not path.startswith(absRootSlashedDir): - return False - return True - def _u_rel_path(self, path, root=None): - """ - Return the relative path to path from root. - >>> vcs = new() - >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c") - '.be' - """ - if root == None: - if self.rootdir == None: - raise VCSnotRooted - root = self.rootdir - path = os.path.abspath(path) - absRoot = os.path.abspath(root) - absRootSlashedDir = os.path.join(absRoot,"") - if not path.startswith(absRootSlashedDir): - raise PathNotInRoot(path, absRootSlashedDir) - assert path != absRootSlashedDir, \ - "file %s == root directory %s" % (path, absRootSlashedDir) - relpath = path[len(absRootSlashedDir):] - return relpath - def _u_abspath(self, path, root=None): - """ - Return the absolute path from a path realtive to root. - >>> vcs = new() - >>> vcs._u_abspath(".be", "/a.b/c") - '/a.b/c/.be' - """ - if root == None: - assert self.rootdir != None, "VCS not rooted" - root = self.rootdir - return os.path.abspath(os.path.join(root, path)) - def _u_create_id(self, name, email=None): - """ - >>> vcs = new() - >>> vcs._u_create_id("John Doe", "jdoe@example.com") - 'John Doe ' - >>> vcs._u_create_id("John Doe") - 'John Doe' - """ - assert len(name) > 0 - if email == None or len(email) == 0: - return name - else: - return "%s <%s>" % (name, email) - def _u_parse_id(self, value): - """ - >>> vcs = new() - >>> vcs._u_parse_id("John Doe ") - ('John Doe', 'jdoe@example.com') - >>> vcs._u_parse_id("John Doe") - ('John Doe', None) - >>> try: - ... vcs._u_parse_id("John Doe ") - ... except AssertionError: - ... print "Invalid match" - Invalid match - """ - emailexp = re.compile("(.*) <([^>]*)>(.*)") - match = emailexp.search(value) - if match == None: - email = None - name = value - else: - assert len(match.groups()) == 3 - assert match.groups()[2] == "", match.groups() - email = match.groups()[1] - name = match.groups()[0] - assert name != None - assert len(name) > 0 - return (name, email) - def _u_get_fallback_username(self): - name = None - for envariable in ["LOGNAME", "USERNAME"]: - if os.environ.has_key(envariable): - name = os.environ[envariable] - break - assert name != None - return name - def _u_get_fallback_email(self): - hostname = gethostname() - name = self._u_get_fallback_username() - return "%s@%s" % (name, hostname) - def _u_parse_commitfile(self, commitfile): - """ - Split the commitfile created in self.commit() back into - summary and header lines. - """ - f = codecs.open(commitfile, "r", self.encoding) - summary = f.readline() - body = f.read() - body.lstrip('\n') - if len(body) == 0: - body = None - f.close() - return (summary, body) - - -if libbe.TESTING == True: - def setup_vcs_test_fixtures(testcase): - """Set up test fixtures for VCS test case.""" - testcase.vcs = testcase.Class() - testcase.dir = Dir() - testcase.dirname = testcase.dir.path - - vcs_not_supporting_uninitialized_user_id = [] - vcs_not_supporting_set_user_id = ["None", "hg"] - testcase.vcs_supports_uninitialized_user_id = ( - testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id) - testcase.vcs_supports_set_user_id = ( - testcase.vcs.name not in vcs_not_supporting_set_user_id) - - if not testcase.vcs.installed(): - testcase.fail( - "%(name)s VCS not found" % vars(testcase.Class)) - - if testcase.Class.name != "None": - testcase.failIf( - testcase.vcs.detect(testcase.dirname), - "Detected %(name)s VCS before initialising" - % vars(testcase.Class)) - - testcase.vcs.init(testcase.dirname) - - class VCSTestCase(unittest.TestCase): - """Test cases for base VCS class.""" - - Class = VCS - - def __init__(self, *args, **kwargs): - super(VCSTestCase, self).__init__(*args, **kwargs) - self.dirname = None - - def setUp(self): - super(VCSTestCase, self).setUp() - setup_vcs_test_fixtures(self) - - def tearDown(self): - self.vcs.cleanup() - self.dir.cleanup() - super(VCSTestCase, self).tearDown() - - def full_path(self, rel_path): - return os.path.join(self.dirname, rel_path) - - - class VCS_init_TestCase(VCSTestCase): - """Test cases for VCS.init method.""" - - def test_detect_should_succeed_after_init(self): - """Should detect VCS in directory after initialization.""" - self.failUnless( - self.vcs.detect(self.dirname), - "Did not detect %(name)s VCS after initialising" - % vars(self.Class)) - - def test_vcs_rootdir_in_specified_root_path(self): - """VCS root directory should be in specified root path.""" - rp = os.path.realpath(self.vcs.rootdir) - dp = os.path.realpath(self.dirname) - vcs_name = self.Class.name - self.failUnless( - dp == rp or rp == None, - "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars()) - - - class VCS_get_user_id_TestCase(VCSTestCase): - """Test cases for VCS.get_user_id method.""" - - def test_gets_existing_user_id(self): - """Should get the existing user ID.""" - if not self.vcs_supports_uninitialized_user_id: - return - - user_id = self.vcs.get_user_id() - self.failUnless( - user_id is not None, - "unable to get a user id") - - - class VCS_set_user_id_TestCase(VCSTestCase): - """Test cases for VCS.set_user_id method.""" - - def setUp(self): - super(VCS_set_user_id_TestCase, self).setUp() - - if self.vcs_supports_uninitialized_user_id: - self.prev_user_id = self.vcs.get_user_id() - else: - self.prev_user_id = "Uninitialized identity " - - if self.vcs_supports_set_user_id: - self.test_new_user_id = "John Doe " - self.vcs.set_user_id(self.test_new_user_id) - - def tearDown(self): - if self.vcs_supports_set_user_id: - self.vcs.set_user_id(self.prev_user_id) - super(VCS_set_user_id_TestCase, self).tearDown() - - def test_raises_error_in_unsupported_vcs(self): - """Should raise an error in a VCS that doesn't support it.""" - if self.vcs_supports_set_user_id: - return - self.assertRaises( - SettingIDnotSupported, - self.vcs.set_user_id, "foo") - - def test_updates_user_id_in_supporting_vcs(self): - """Should update the user ID in an VCS that supports it.""" - if not self.vcs_supports_set_user_id: - return - user_id = self.vcs.get_user_id() - self.failUnlessEqual( - self.test_new_user_id, user_id, - "user id not set correctly (expected %s, got %s)" - % (self.test_new_user_id, user_id)) - - - def setup_vcs_revision_test_fixtures(testcase): - """Set up revision test fixtures for VCS test case.""" - testcase.test_dirs = ['a', 'a/b', 'c'] - for path in testcase.test_dirs: - testcase.vcs.mkdir(testcase.full_path(path)) - - testcase.test_files = ['a/text', 'a/b/text'] - - testcase.test_contents = { - 'rev_1': "Lorem ipsum", - 'uncommitted': "dolor sit amet", - } - - - class VCS_mkdir_TestCase(VCSTestCase): - """Test cases for VCS.mkdir method.""" - - def setUp(self): - super(VCS_mkdir_TestCase, self).setUp() - setup_vcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.vcs.recursive_remove(self.full_path(path)) - super(VCS_mkdir_TestCase, self).tearDown() - - def test_mkdir_creates_directory(self): - """Should create specified directory in filesystem.""" - for path in self.test_dirs: - full_path = self.full_path(path) - self.failUnless( - os.path.exists(full_path), - "path %(full_path)s does not exist" % vars()) - - - class VCS_commit_TestCase(VCSTestCase): - """Test cases for VCS.commit method.""" - - def setUp(self): - super(VCS_commit_TestCase, self).setUp() - setup_vcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.vcs.recursive_remove(self.full_path(path)) - super(VCS_commit_TestCase, self).tearDown() - - def test_file_contents_as_specified(self): - """Should set file contents as specified.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents(full_path, test_contents) - current_contents = self.vcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_committed(self): - """Should have file contents as specified after commit.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents(full_path, test_contents) - revision = self.vcs.commit("Initial file contents.") - current_contents = self.vcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_set_when_uncommitted(self): - """Should set file contents as specified after commit.""" - if not self.vcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Initial file contents.") - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - current_contents = self.vcs.get_file_contents(full_path) - self.failUnlessEqual( - self.test_contents['uncommitted'], current_contents) - - def test_revision_file_contents_as_committed(self): - """Should get file contents as committed to specified revision.""" - if not self.vcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Initial file contents.") - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - committed_contents = self.vcs.get_file_contents( - full_path, revision) - self.failUnlessEqual( - self.test_contents['rev_1'], committed_contents) - - def test_revision_id_as_committed(self): - """Check for compatibility between .commit() and .revision_id()""" - if not self.vcs.versioned: - self.failUnlessEqual(self.vcs.revision_id(5), None) - return - committed_revisions = [] - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Initial %s contents." % path) - committed_revisions.append(revision) - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - revision = self.vcs.commit("Altered %s contents." % path) - committed_revisions.append(revision) - for i,revision in enumerate(committed_revisions): - self.failUnlessEqual(self.vcs.revision_id(i), revision) - i += -len(committed_revisions) # check negative indices - self.failUnlessEqual(self.vcs.revision_id(i), revision) - i = len(committed_revisions) - self.failUnlessEqual(self.vcs.revision_id(i), None) - self.failUnlessEqual(self.vcs.revision_id(-i-1), None) - - def test_revision_id_as_committed(self): - """Check revision id before first commit""" - if not self.vcs.versioned: - self.failUnlessEqual(self.vcs.revision_id(5), None) - return - committed_revisions = [] - for path in self.test_files: - self.failUnlessEqual(self.vcs.revision_id(0), None) - - - class VCS_duplicate_repo_TestCase(VCSTestCase): - """Test cases for VCS.duplicate_repo method.""" - - def setUp(self): - super(VCS_duplicate_repo_TestCase, self).setUp() - setup_vcs_revision_test_fixtures(self) - - def tearDown(self): - self.vcs.remove_duplicate_repo() - for path in reversed(sorted(self.test_dirs)): - self.vcs.recursive_remove(self.full_path(path)) - super(VCS_duplicate_repo_TestCase, self).tearDown() - - def test_revision_file_contents_as_committed(self): - """Should match file contents as committed to specified revision. - """ - if not self.vcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Commit current status") - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - dup_repo_path = self.vcs.duplicate_repo(revision) - dup_file_path = os.path.join(dup_repo_path, path) - dup_file_contents = file(dup_file_path, 'rb').read() - self.failUnlessEqual( - self.test_contents['rev_1'], dup_file_contents) - self.vcs.remove_duplicate_repo() - - - def make_vcs_testcase_subclasses(vcs_class, namespace): - """Make VCSTestCase subclasses for vcs_class in the namespace.""" - vcs_testcase_classes = [ - c for c in ( - ob for ob in globals().values() if isinstance(ob, type)) - if issubclass(c, VCSTestCase)] - - for base_class in vcs_testcase_classes: - testcase_class_name = vcs_class.__name__ + base_class.__name__ - testcase_class_bases = (base_class,) - testcase_class_dict = dict(base_class.__dict__) - testcase_class_dict['Class'] = vcs_class - testcase_class = type( - testcase_class_name, testcase_class_bases, testcase_class_dict) - setattr(namespace, testcase_class_name, testcase_class) - - - unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) - suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) -- cgit