diff options
Diffstat (limited to 'libbe/rcs.py')
-rw-r--r-- | libbe/rcs.py | 578 |
1 files changed, 533 insertions, 45 deletions
diff --git a/libbe/rcs.py b/libbe/rcs.py index 4487fba..2993a80 100644 --- a/libbe/rcs.py +++ b/libbe/rcs.py @@ -15,42 +15,43 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA from subprocess import Popen, PIPE +import os +import os.path +from socket import gethostname +import re import sys +import tempfile +import shutil +import unittest +import doctest +from utility import Dir -def rcs_by_name(rcs_name): - """Return the module for the RCS with the given name""" - if rcs_name == "Arch": - import arch - return arch - elif rcs_name == "bzr": - import bzr - return bzr - elif rcs_name == "hg": - import hg - return hg - elif rcs_name == "git": - import git - return git - elif rcs_name == "None": - import no_rcs - return no_rcs - -def detect(dir): - """Return the module for the rcs being used in this directory""" +def _get_matching_rcs(matchfn): + """Return the first module for which matchfn(RCS_instance) is true""" import arch import bzr import hg import git - if arch.detect(dir): - return arch - elif bzr.detect(dir): - return bzr - elif hg.detect(dir): - return hg - elif git.detect(dir): - return git - import no_rcs - return no_rcs + for module in [arch, bzr, hg, git]: + rcs = module.new() + if matchfn(rcs): + return rcs + else: + del(rcs) + return RCS() + +def rcs_by_name(rcs_name): + """Return the module for the RCS with the given name""" + return _get_matching_rcs(lambda rcs: rcs.name == rcs_name) + +def detect_rcs(dir): + """Return an RCS instance for the rcs being used in this directory""" + return _get_matching_rcs(lambda rcs: rcs.detect(dir)) + +def installed_rcs(): + """Return an instance of an installed RCS""" + return _get_matching_rcs(lambda rcs: rcs.installed()) + class CommandError(Exception): def __init__(self, err_str, status): @@ -58,19 +59,506 @@ class CommandError(Exception): self.err_str = err_str self.status = status -def invoke(args, expect=(0,), cwd=None): - try : - if sys.platform != "win32": - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) +class SettingIDnotSupported(NotImplementedError): + pass + +def new(): + return RCS() + +class RCS(object): + """ + Implement the 'no-rcs' interface. + + Support for other RCSs can be added by subclassing this class, and + overriding methods _rcs_*() with code appropriate for your RCS. + + The methods _u_*() are utility methods available to the _rcs_*() + methods. + """ + name = "None" + client = "" # command-line tool for _u_invoke_client + versioned = False + def __init__(self, paranoid=False): + self.paranoid = paranoid + self.verboseInvoke = False + self.rootdir = None + self._duplicateBasedir = None + self._duplicateDirname = None + def __del__(self): + self.cleanup() + + def _rcs_help(self): + """ + Return the command help string. + (Allows a simple test to see if the client is installed.) + """ + pass + def _rcs_detect(self, path=None): + """ + Detect whether a directory is revision controlled with this RCS. + """ + return True + def _rcs_root(self, path): + """ + Get the RCS root. This is the default working directory for + future invocations. You would normally set this to the root + directory for your RCS. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + if path == "": + path = os.path.abspath(".") + return path + def _rcs_init(self, path): + """ + Begin versioning the tree based at path. + """ + pass + def _rcs_cleanup(self): + """ + Remove any cruft that _rcs_init() created outside of the + versioned tree. + """ + pass + def _rcs_get_user_id(self): + """ + Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the RCS has not been configured with a username, return None. + """ + return None + def _rcs_set_user_id(self, value): + """ + Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). + This is run if the RCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + raise SettingIDnotSupported + def _rcs_add(self, path): + """ + Add the already created file at path to version control. + """ + pass + def _rcs_remove(self, path): + """ + Remove the file at path from version control. Optionally + remove the file from the filesystem as well. + """ + pass + def _rcs_update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + pass + def _rcs_get_file_contents(self, path, revision=None): + """ + Get the file as it was in a given revision. + Revision==None specifies the current revision. + """ + assert revision == None, \ + "The %s RCS does not support revision specifiers" % self.name + return file(os.path.join(self.rootdir, path), "rb").read() + def _rcs_duplicate_repo(self, directory, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + dir specifies a directory to create the duplicate in. + """ + shutil.copytree(self.rootdir, directory, True) + def _rcs_commit(self, commitfile): + """ + Commit the current working directory, using the contents of + commitfile as the comment. Return the name of the old + revision. + """ + return None + def installed(self): + try: + self._rcs_help() + return True + except OSError, e: + if e.errno == errno.ENOENT: + return False + raise e + def detect(self, path=None): + """ + Detect whether a directory is revision controlled with this RCS. + """ + return self._rcs_detect(path) + def root(self, path): + """ + Set the root directory to the path's RCS root. This is the + default working directory for future invocations. + """ + self.rootdir = self._rcs_root(path) + def init(self, path): + """ + Begin versioning the tree based at path. + Also roots the rcs at path. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + self._rcs_init(path) + self.root(path) + def cleanup(self): + self._rcs_cleanup() + def get_user_id(self): + """ + Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the RCS has not been configured with a username, return the user's + id. + """ + id = self._rcs_get_user_id() + if id == None: + name = self._u_get_fallback_username() + email = self._u_get_fallback_email() + id = self._u_create_id(name, email) + print >> sys.stderr, "Guessing id '%s'" % id + try: + self.set_user_id(id) + except SettingIDnotSupported: + pass + return id + def set_user_id(self, value): + """ + Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). + This is run if the RCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + self._rcs_set_user_id(value) + def add(self, path): + """ + Add the already created file at path to version control. + """ + self._rcs_add(self._u_rel_path(path)) + def remove(self, path): + """ + Remove a file from both version control and the filesystem. + """ + self._rcs_remove(self._u_rel_path(path)) + if os.path.exists(path): + os.remove(path) + def recursive_remove(self, dirname): + """ + Remove a file/directory and all its decendents from both + version control and the filesystem. + """ + for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): + filenames.extend(dirnames) + for path in filenames: + fullpath = os.path.join(dirpath, path) + if os.path.exists(fullpath) == False: + continue + self._rcs_remove(self._u_rel_path(fullpath)) + if os.path.exists(dirname): + shutil.rmtree(dirname) + def update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + self._rcs_update(self._u_rel_path(path)) + def get_file_contents(self, path, revision=None): + """ + Get the file as it was in a given revision. + Revision==None specifies the current revision. + """ + relpath = self._u_rel_path(path) + return self._rcs_get_file_contents(relpath, revision) + def set_file_contents(self, path, contents): + """ + Set the file contents under version control. + """ + add = not os.path.exists(path) + file(path, "wb").write(contents) + if add: + self.add(path) + else: + self.update(path) + def mkdir(self, path): + """ + Created directory at path under version control. + """ + os.mkdir(path) + self.add(path) + def duplicate_repo(self, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + Return the path to the arbitrary directory at the base of the new repo. + """ + # Dirname in Baseir to protect against simlink attacks. + if self._duplicateBasedir == None: + self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs') + self._duplicateDirname = \ + os.path.join(self._duplicateBasedir, "duplicate") + self._rcs_duplicate_repo(directory=self._duplicateDirname, + revision=revision) + return self._duplicateDirname + def remove_duplicate_repo(self): + """ + Clean up a duplicate repo created with duplicate_repo(). + """ + if self._duplicateBasedir != None: + shutil.rmtree(self._duplicateBasedir) + self._duplicateBasedir = None + self._duplicateDirname = None + def commit(self, summary, body=None): + """ + Commit the current working directory, with a commit message + string summary and body. Return the name of the old revision + (or None if versioning is not supported). + """ + if body is not None: + summary += '\n' + body + descriptor, filename = tempfile.mkstemp() + revision = None + try: + temp_file = os.fdopen(descriptor, 'wb') + temp_file.write(summary) + temp_file.flush() + revision = self._rcs_commit(filename) + temp_file.close() + finally: + os.remove(filename) + return revision + def precommit(self, directory): + pass + def postcommit(self, directory): + pass + def _u_invoke(self, args, expect=(0,), cwd=None): + if cwd == None: + cwd = self.rootdir + try : + if self.verboseInvoke == True: + print "%s$ %s" % (cwd, " ".join(args)) + if sys.platform != "win32": + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) + else: + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + shell=True, cwd=cwd) + except OSError, e : + strerror = "%s\nwhile executing %s" % (e.args[1], args) + raise CommandError(strerror, e.args[0]) + output, error = q.communicate() + status = q.wait() + if status not in expect: + raise CommandError(error, status) + return status, output, error + def _u_invoke_client(self, *args, **kwargs): + directory = kwargs.get('directory',None) + expect = kwargs.get('expect', (0,)) + cl_args = [self.client] + cl_args.extend(args) + return self._u_invoke(cl_args, expect, cwd=directory) + def _u_search_parent_directories(self, path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + path = os.path.realpath(path) + assert os.path.exists(path) + old_path = None + while True: + if os.path.exists(os.path.join(path, filename)): + return os.path.join(path, filename) + if path == old_path: + return None + old_path = path + path = os.path.dirname(path) + def _u_rel_path(self, path, root=None): + """ + Return the relative path to path from root. + >>> rcs = new() + >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c") + '.be' + """ + if root == None: + assert self.rootdir != None, "RCS not rooted" + root = self.rootdir + if os.path.isabs(path): + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + assert path.startswith(absRootSlashedDir), \ + "file %s not in root %s" % (path, absRootSlashedDir) + assert path != absRootSlashedDir, \ + "file %s == root directory %s" % (path, absRootSlashedDir) + path = path[len(absRootSlashedDir):] + return path + def _u_abspath(self, path, root=None): + """ + Return the absolute path from a path realtive to root. + >>> rcs = new() + >>> rcs._u_abspath(".be", "/a.b/c") + '/a.b/c/.be' + """ + if root == None: + assert self.rootdir != None, "RCS not rooted" + root = self.rootdir + return os.path.abspath(os.path.join(root, path)) + def _u_create_id(self, name, email=None): + """ + >>> rcs = new() + >>> rcs._u_create_id("John Doe", "jdoe@example.com") + 'John Doe <jdoe@example.com>' + >>> rcs._u_create_id("John Doe") + 'John Doe' + """ + assert len(name) > 0 + if email == None or len(email) == 0: + return name else: - # win32 don't have os.execvp() so have to run command in a shell - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True, - cwd=cwd) - except OSError, e : - strerror = "%s\nwhile executing %s" % (e.args[1], args) - raise CommandError(strerror, e.args[0]) - output, error = q.communicate() - status = q.wait() - if status not in expect: - raise CommandError(error, status) - return status, output, error + return "%s <%s>" % (name, email) + def _u_parse_id(self, value): + """ + >>> rcs = new() + >>> rcs._u_parse_id("John Doe <jdoe@example.com>") + ('John Doe', 'jdoe@example.com') + >>> rcs._u_parse_id("John Doe") + ('John Doe', None) + >>> try: + ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>") + ... except AssertionError: + ... print "Invalid match" + Invalid match + """ + emailexp = re.compile("(.*) <([^>]*)>(.*)") + match = emailexp.search(value) + if match == None: + email = None + name = value + else: + assert len(match.groups()) == 3 + assert match.groups()[2] == "", match.groups() + email = match.groups()[1] + name = match.groups()[0] + assert name != None + assert len(name) > 0 + return (name, email) + def _u_get_fallback_username(self): + name = None + for envariable in ["LOGNAME", "USERNAME"]: + if os.environ.has_key(envariable): + name = os.environ[envariable] + break + assert name != None + return name + def _u_get_fallback_email(self): + hostname = gethostname() + name = self._u_get_fallback_username() + return "%s@%s" % (name, hostname) + def _u_parse_commitfile(self, commitfile): + """ + Split the commitfile created in self.commit() back into + summary and header lines. + """ + f = file(commitfile, "rb") + summary = f.readline() + body = f.read() + body.lstrip('\n') + if len(body) == 0: + body = None + f.close + return (summary, body) + + +class RCStestCase(unittest.TestCase): + Class = RCS + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + self.dirname = None + def instantiateRCS(self): + return self.Class() + def setUp(self): + self.dir = Dir() + self.dirname = self.dir.path + self.rcs = self.instantiateRCS() + def tearDown(self): + del(self.rcs) + del(self.dirname) + def fullPath(self, path): + return os.path.join(self.dirname, path) + def assertPathExists(self, path): + fullpath = self.fullPath(path) + self.failUnless(os.path.exists(fullpath)==True, + "path %s does not exist" % fullpath) + def uidTest(self): + user_id = self.rcs.get_user_id() + self.failUnless(user_id != None, + "unable to get a user id") + user_idB = "John Doe <jdoe@example.com>" + if self.rcs.name in ["None", "hg"]: + self.assertRaises(SettingIDnotSupported, self.rcs.set_user_id, + user_idB) + else: + self.rcs.set_user_id(user_idB) + self.failUnless(self.rcs.get_user_id() == user_idB, + "user id not set correctly (was %s, is %s)" \ + % (user_id, self.rcs.get_user_id())) + self.failUnless(self.rcs.set_user_id(user_id) == None, + "unable to restore user id %s" % user_id) + self.failUnless(self.rcs.get_user_id() == user_id, + "unable to restore user id %s" % user_id) + def versionTest(self, path): + origpath = path + path = self.fullPath(path) + contentsA = "Lorem ipsum" + contentsB = "dolor sit amet" + self.rcs.set_file_contents(path,contentsA) + self.failUnless(self.rcs.get_file_contents(path)==contentsA, + "File contents not set or read correctly") + revision = self.rcs.commit("Commit current status") + self.failUnless(self.rcs.get_file_contents(path)==contentsA, + "Committing File contents not set or read correctly") + if self.rcs.versioned == True: + self.rcs.set_file_contents(path,contentsB) + self.failUnless(self.rcs.get_file_contents(path)==contentsB, + "File contents not set correctly after commit") + contentsArev = self.rcs.get_file_contents(path, revision) + self.failUnless(contentsArev==contentsA, \ + "Original file contents not saved in revision %s\n%s\n%s\n" \ + % (revision, contentsA, contentsArev)) + dup = self.rcs.duplicate_repo(revision) + duppath = os.path.join(dup, origpath) + dupcont = file(duppath, "rb").read() + self.failUnless(dupcont == contentsA) + self.rcs.remove_duplicate_repo() + def testRun(self): + self.failUnless(self.rcs.installed() == True, + "%s RCS not found" % self.Class.name) + if self.Class.name != "None": + self.failUnless(self.rcs.detect(self.dirname)==False, + "Detected %s RCS before initializing" \ + % self.Class.name) + self.rcs.init(self.dirname) + self.failUnless(self.rcs.detect(self.dirname)==True, + "Did not detect %s RCS after initializing" \ + % self.Class.name) + rp = os.path.realpath(self.rcs.rootdir) + dp = os.path.realpath(self.dirname) + self.failUnless(dp == rp or rp == None, + "%s RCS root in wrong dir (%s %s)" \ + % (self.Class.name, dp, rp)) + self.uidTest() + self.rcs.mkdir(self.fullPath('a')) + self.rcs.mkdir(self.fullPath('a/b')) + self.rcs.mkdir(self.fullPath('c')) + self.assertPathExists('a') + self.assertPathExists('a/b') + self.assertPathExists('c') + self.versionTest('a/text') + self.versionTest('a/b/text') + self.rcs.recursive_remove(self.fullPath('a')) + +unitsuite = unittest.TestLoader().loadTestsFromTestCase(RCStestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |