aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage/vcs
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/storage/vcs')
-rw-r--r--libbe/storage/vcs/__init__.py41
-rw-r--r--libbe/storage/vcs/arch.py441
-rw-r--r--libbe/storage/vcs/base.py1127
-rw-r--r--libbe/storage/vcs/bzr.py361
-rw-r--r--libbe/storage/vcs/darcs.py399
-rw-r--r--libbe/storage/vcs/git.py269
-rw-r--r--libbe/storage/vcs/hg.py257
7 files changed, 2895 insertions, 0 deletions
diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py
new file mode 100644
index 0000000..552d43e
--- /dev/null
+++ b/libbe/storage/vcs/__init__.py
@@ -0,0 +1,41 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Define the Version Controlled System (VCS)-based
+:class:`~libbe.storage.base.Storage` and
+:class:`~libbe.storage.base.VersionedStorage` implementations.
+
+There is a base class (:class:`~libbe.storage.vcs.VCS`) translating
+Storage language to VCS language, and a number of `VCS` implementations:
+
+* :class:`~libbe.storage.vcs.arch.Arch`
+* :class:`~libbe.storage.vcs.bzr.Bzr`
+* :class:`~libbe.storage.vcs.darcs.Darcs`
+* :class:`~libbe.storage.vcs.git.Git`
+* :class:`~libbe.storage.vcs.hg.Hg`
+
+The base `VCS` class also serves as a filesystem Storage backend (not
+versioning) in the event that a user has no VCS installed.
+"""
+
+import base
+
+set_preferred_vcs = base.set_preferred_vcs
+vcs_by_name = base.vcs_by_name
+detect_vcs = base.detect_vcs
+installed_vcs = base.installed_vcs
+
+__all__ = [set_preferred_vcs, vcs_by_name, detect_vcs, installed_vcs]
diff --git a/libbe/storage/vcs/arch.py b/libbe/storage/vcs/arch.py
new file mode 100644
index 0000000..3a50414
--- /dev/null
+++ b/libbe/storage/vcs/arch.py
@@ -0,0 +1,441 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# James Rowe <jnrowe@ukfsn.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""GNU Arch_ (tla) backend.
+
+.. _Arch: http://www.gnu.org/software/gnu-arch/
+"""
+
+import codecs
+import os
+import os.path
+import re
+import shutil
+import sys
+import time # work around http://mercurial.selenic.com/bts/issue618
+
+import libbe
+import libbe.ui.util.user
+import libbe.storage.util.config
+from libbe.util.id import uuid_gen
+from libbe.util.subproc import CommandError
+import base
+
+if libbe.TESTING == True:
+ import unittest
+ import doctest
+
+
+class CantAddFile(Exception):
+ def __init__(self, file):
+ self.file = file
+ Exception.__init__(self, "Can't automatically add file %s" % file)
+
+DEFAULT_CLIENT = 'tla'
+
+client = libbe.storage.util.config.get_val(
+ 'arch_client', default=DEFAULT_CLIENT)
+
+def new():
+ return Arch()
+
+class Arch(base.VCS):
+ """:class:`base.VCS` implementation for GNU Arch.
+ """
+ name = 'arch'
+ client = client
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser('~/.arch-params')
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+ self.interspersed_vcs_files = True
+ self.paranoid = False
+ self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client('--version')
+ version = '\n'.join(output.splitlines()[:2])
+ return version
+
+ def _vcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, '{arch}') != None :
+ libbe.storage.util.config.set_val('arch_client', client)
+ return True
+ return False
+
+ def _vcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+
+ def _create_archive(self, path):
+ """Create a temporary Arch archive in the directory PATH. This
+ archive will be removed by::
+
+ destroy->_vcs_destroy->_remove_archive
+ """
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = libbe.ui.util.user.parse_user_id(id)
+ if email == None:
+ email = '%s@example.com' % name
+ trailer = '%s-%s' % ('bugs-everywhere-auto', uuid_gen()[0:8])
+ self._archive_name = '%s--%s' % (email, trailer)
+ self._archive_dir = '/tmp/%s' % trailer
+ self._tmp_archive = True
+ self._u_invoke_client('make-archive', self._archive_name,
+ self._archive_dir, cwd=path)
+
+ def _invoke_client(self, *args, **kwargs):
+ """Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
+ else:
+ tailargs = []
+ arglist = [command, '-A', self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ '=locations', self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+
+ def _create_project(self, path):
+ """
+ Create a temporary Arch project in the directory PATH. This
+ project will be removed by
+ destroy->_vcs_destroy->_remove_project
+ """
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = 'bugs-everywhere'
+ branch = 'mainline'
+ version = '0.1'
+ self._project_name = '%s--%s--%s' % (category, branch, version)
+ self._invoke_client('archive-setup', self._project_name,
+ cwd=path)
+ self._tmp_project = True
+
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return '%s/%s' % (self._archive_name, self._project_name)
+
+ def _adjust_naming_conventions(self, path):
+ """Adjust `Arch naming conventions`_ so ``.be`` is considered source
+ code.
+
+ By default, Arch restricts source code filenames to::
+
+ ^[_=a-zA-Z0-9].*$
+
+ Since our bug directory ``.be`` doesn't satisfy these conventions,
+ we need to adjust them. The conventions are specified in::
+
+ project-root/{arch}/=tagging-method
+
+ .. _Arch naming conventions:
+ http://regexps.srparish.net/tutorial-tla/naming-conventions.html
+ """
+ tagpath = os.path.join(path, '{arch}', '=tagging-method')
+ lines_out = []
+ f = codecs.open(tagpath, 'r', self.encoding)
+ for line in f:
+ if line.startswith('source '):
+ lines_out.append('source ^[._=a-zA-X0-9].*$\n')
+ else:
+ lines_out.append(line)
+ f.close()
+ f = codecs.open(tagpath, 'w', self.encoding)
+ f.write(''.join(lines_out))
+ f.close()
+
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-source.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html
+ self._invoke_client('init-tree', self._project_name,
+ cwd=path)
+ self._adjust_naming_conventions(path)
+ self._invoke_client('import', '--summary', 'Began versioning',
+ cwd=path)
+
+ def _vcs_destroy(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+ vcs_dir = os.path.join(self.repo, '{arch}')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+ self._archive_name = None
+
+ def _vcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
+ else:
+ dirname = path
+ status,output,error = self._u_invoke_client('tree-root', dirname)
+ root = output.rstrip('\n')
+
+ self._get_archive_project_name(root)
+
+ return root
+
+ def _get_archive_name(self, root):
+ status,output,error = self._u_invoke_client('archives')
+ lines = output.split('\n')
+ # e.g. output:
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
+ # /tmp/BEtestXXXXXX/rootdir
+ # (+ repeats)
+ for archive,location in zip(lines[::2], lines[1::2]):
+ if os.path.realpath(location) == os.path.realpath(root):
+ self._archive_name = archive
+ assert self._archive_name != None
+
+ def _get_archive_project_name(self, root):
+ # get project names
+ status,output,error = self._u_invoke_client('tree-version', cwd=root)
+ # e.g output
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
+ archive_name,project_name = output.rstrip('\n').split('/')
+ self._archive_name = archive_name
+ self._project_name = project_name
+
+ def _vcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+
+ def _vcs_add(self, path):
+ self._u_invoke_client('add-id', path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.repo)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client('inventory', '--source',
+ '--both', '--all', root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ f = codecs.open(inv_path, 'a', self.encoding)
+ f.write(rule)
+ f.close()
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+
+ def _force_source(self, path):
+ rule = 'source %s\n' % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.repo)
+ if os.path.realpath(path) not in self._list_added(self.repo):
+ raise CantAddFile(path)
+
+ def _vcs_remove(self, path):
+ if self._vcs_is_versioned(path):
+ self._u_invoke_client('delete-id', path)
+ arch_ids = os.path.join(self.repo, path, '.arch-ids')
+ if os.path.exists(arch_ids):
+ shutil.rmtree(arch_ids)
+
+ def _vcs_update(self, path):
+ self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_is_versioned(self, path):
+ if '.arch-ids' in path:
+ return False
+ return True
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ else:
+ relpath = self._file_find(path, revision, relpath=True)
+ return base.VCS._vcs_get_file_contents(self, relpath)
+
+ def _file_find(self, path, revision, relpath=False):
+ try:
+ status,output,error = \
+ self._invoke_client(
+ 'file-find', '--unescaped', path, revision)
+ path = output.rstrip('\n').splitlines()[-1]
+ except CommandError, e:
+ if e.status == 2 \
+ and 'illegally formed changeset index' in e.stderr:
+ raise NotImplementedError(
+"""Outstanding tla bug, see
+ https://bugs.launchpad.net/ubuntu/+source/tla/+bug/513472
+""")
+ raise
+ if relpath == True:
+ return path
+ return os.path.abspath(os.path.join(self.repo, path))
+
+ def _vcs_path(self, id, revision):
+ return self._u_find_id(id, revision)
+
+ def _vcs_isdir(self, path, revision):
+ abspath = self._file_find(path, revision)
+ return os.path.isdir(abspath)
+
+ def _vcs_listdir(self, path, revision):
+ abspath = self._file_find(path, revision)
+ return [p for p in os.listdir(abspath) if self._vcs_is_versioned(p)]
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ if allow_empty == False:
+ # arch applies empty commits without complaining, so check first
+ status,output,error = self._u_invoke_client('changes',expect=(0,1))
+ if status == 0:
+ # work around http://mercurial.selenic.com/bts/issue618
+ time.sleep(1)
+ for path in self.__updated:
+ os.utime(os.path.join(self.repo, path), None)
+ self.__updated = []
+ status,output,error = self._u_invoke_client('changes',expect=(0,1))
+ if status == 0:
+ # end work around
+ raise base.EmptyCommit()
+ summary,body = self._u_parse_commitfile(commitfile)
+ args = ['commit', '--summary', summary]
+ if body != None:
+ args.extend(['--log-message',body])
+ status,output,error = self._u_invoke_client(*args)
+ revision = None
+ revline = re.compile('[*] committed (.*)')
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
+
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client('logs')
+ logs = output.splitlines()
+ first_log = logs.pop(0)
+ assert first_log == 'base-0', first_log
+ try:
+ if index > 0:
+ log = logs[index-1]
+ elif index < 0:
+ log = logs[index]
+ else:
+ return None
+ except IndexError:
+ return None
+ return '%s--%s' % (self._archive_project_name(), log)
+
+ def _diff(self, revision):
+ status,output,error = self._u_invoke_client(
+ 'diff', '--summary', '--unescaped', revision, expect=(0,1))
+ return output
+
+ def _parse_diff(self, diff_text):
+ """
+ Example diff text:
+
+ * local directory is at ...
+ * build pristine tree for ...
+ * from import revision: ...
+ * patching for revision: ...
+ * comparing to ...
+ D .be/dir/bugs/.arch-ids/moved.id
+ D .be/dir/bugs/.arch-ids/removed.id
+ D .be/dir/bugs/moved
+ D .be/dir/bugs/removed
+ A .be/dir/bugs/.arch-ids/moved2.id
+ A .be/dir/bugs/.arch-ids/new.id
+ A .be/dir/bugs/moved2
+ A .be/dir/bugs/new
+ A {arch}/bugs-everywhere/bugs-everywhere--mainline/...
+ M .be/dir/bugs/modified
+ """
+ new = []
+ modified = []
+ removed = []
+ lines = diff_text.splitlines()
+ for i,line in enumerate(lines):
+ if line.startswith('* ') or '/.arch-ids/' in line:
+ continue
+ change,file = line.split(' ',1)
+ if file.startswith('{arch}/'):
+ continue
+ if change == 'A':
+ new.append(file)
+ elif change == 'M':
+ modified.append(file)
+ elif change == 'D':
+ removed.append(file)
+ return (new,modified,removed)
+
+ def _vcs_changed(self, revision):
+ return self._parse_diff(self._diff(revision))
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Arch, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
new file mode 100644
index 0000000..d85c94d
--- /dev/null
+++ b/libbe/storage/vcs/base.py
@@ -0,0 +1,1127 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Alexander Belchenko <bialix@ukr.net>
+# Ben Finney <benf@cybersource.com.au>
+# Chris Ball <cjb@laptop.org>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Define the base :class:`VCS` (Version Control System) class, which
+should be subclassed by other Version Control System backends. The
+base class implements a "do not version" VCS.
+"""
+
+import codecs
+import os
+import os.path
+import re
+import shutil
+import sys
+import tempfile
+import types
+
+import libbe
+import libbe.storage
+import libbe.storage.base
+import libbe.util.encoding
+from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
+from libbe.util.utility import Dir, search_parent_directories
+from libbe.util.subproc import CommandError, invoke
+from libbe.util.plugin import import_by_name
+import libbe.storage.util.upgrade as upgrade
+
+if libbe.TESTING == True:
+ import unittest
+ import doctest
+
+ import libbe.ui.util.user
+
+VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
+"""List VCS modules in order of preference.
+
+Don't list this module, it is implicitly last.
+"""
+
+def set_preferred_vcs(name):
+ """Manipulate :data:`VCS_ORDER` to place `name` first.
+
+ This is primarily indended for testing purposes.
+ """
+ global VCS_ORDER
+ assert name in VCS_ORDER, \
+ 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
+ VCS_ORDER.remove(name)
+ VCS_ORDER.insert(0, name)
+
+def _get_matching_vcs(matchfn):
+ """Return the first module for which matchfn(VCS_instance) is True.
+
+ Searches in :data:`VCS_ORDER`.
+ """
+ for submodname in VCS_ORDER:
+ module = import_by_name('libbe.storage.vcs.%s' % submodname)
+ vcs = module.new()
+ if matchfn(vcs) == True:
+ return vcs
+ return VCS()
+
+def vcs_by_name(vcs_name):
+ """Return the module for the VCS with the given name.
+
+ Searches in :data:`VCS_ORDER`.
+ """
+ if vcs_name == VCS.name:
+ return new()
+ return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
+
+def detect_vcs(dir):
+ """Return an VCS instance for the vcs being used in this directory.
+
+ Searches in :data:`VCS_ORDER`.
+ """
+ return _get_matching_vcs(lambda vcs: vcs._detect(dir))
+
+def installed_vcs():
+ """Return an instance of an installed VCS.
+
+ Searches in :data:`VCS_ORDER`.
+ """
+ return _get_matching_vcs(lambda vcs: vcs.installed())
+
+
+class VCSNotRooted (libbe.storage.base.ConnectionError):
+ def __init__(self, vcs):
+ msg = 'VCS not rooted'
+ libbe.storage.base.ConnectionError.__init__(self, msg)
+ self.vcs = vcs
+
+class VCSUnableToRoot (libbe.storage.base.ConnectionError):
+ def __init__(self, vcs):
+ msg = 'VCS unable to root'
+ libbe.storage.base.ConnectionError.__init__(self, msg)
+ self.vcs = vcs
+
+class InvalidPath (InvalidID):
+ def __init__(self, path, root, msg=None, **kwargs):
+ if msg == None:
+ msg = 'Path "%s" not in root "%s"' % (path, root)
+ InvalidID.__init__(self, msg=msg, **kwargs)
+ self.path = path
+ self.root = root
+
+class SpacerCollision (InvalidPath):
+ def __init__(self, path, spacer):
+ msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
+ InvalidPath.__init__(self, path, root=None, msg=msg)
+ self.spacer = spacer
+
+class NoSuchFile (InvalidID):
+ def __init__(self, pathname, root='.'):
+ path = os.path.abspath(os.path.join(root, pathname))
+ InvalidID.__init__(self, 'No such file: %s' % path)
+
+
+class CachedPathID (object):
+ """Cache Storage ID <-> path policy.
+
+ Paths generated following::
+
+ .../.be/BUGDIR/bugs/BUG/comments/COMMENT
+ ^-- root path
+
+ See :mod:`libbe.util.id` for a discussion of ID formats.
+
+ Examples
+ --------
+
+ >>> dir = Dir()
+ >>> os.mkdir(os.path.join(dir.path, '.be'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
+ ... 'w').close()
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
+ ... 'w').close()
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
+ ... 'w').close()
+ >>> c = CachedPathID()
+ >>> c.root(dir.path)
+ >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
+ 'def/values'
+ >>> c.init()
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc', 'id-cache']
+ >>> c.connect()
+ >>> c.path('123/values') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/values'
+ >>> c.disconnect()
+ >>> c.destroy()
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc']
+ >>> c.connect() # demonstrate auto init
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc', 'id-cache']
+ >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
+ u'.../.be/xyz'
+ >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
+ u'.../.be/xyz/def'
+ >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/comments/qrs'
+ >>> c.disconnect()
+ >>> c.connect()
+ >>> c.path('qrs') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/comments/qrs'
+ >>> c.remove_id('qrs')
+ >>> c.path('qrs')
+ Traceback (most recent call last):
+ ...
+ InvalidID: qrs in revision None
+ >>> c.disconnect()
+ >>> c.destroy()
+ >>> dir.cleanup()
+ """
+ def __init__(self, encoding=None):
+ self.encoding = libbe.util.encoding.get_filesystem_encoding()
+ self._spacer_dirs = ['.be', 'bugs', 'comments']
+
+ def root(self, path):
+ self._root = os.path.abspath(path).rstrip(os.path.sep)
+ self._cache_path = os.path.join(
+ self._root, self._spacer_dirs[0], 'id-cache')
+
+ def init(self, verbose=True, cache=None):
+ """Create cache file for an existing .be directory.
+
+ The file contains multiple lines of the form::
+
+ UUID\tPATH
+ """
+ if cache == None:
+ self._cache = {}
+ else:
+ self._cache = cache
+ spaced_root = os.path.join(self._root, self._spacer_dirs[0])
+ for dirpath, dirnames, filenames in os.walk(spaced_root):
+ if dirpath == spaced_root:
+ continue
+ try:
+ id = self.id(dirpath)
+ relpath = dirpath[len(self._root)+1:]
+ if id.count('/') == 0:
+ if verbose == True and id in self._cache:
+ print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
+ self._cache[id] = relpath
+ except InvalidPath:
+ pass
+ if self._cache != cache:
+ self._changed = True
+ if cache == None:
+ self.disconnect()
+
+ def destroy(self):
+ if os.path.exists(self._cache_path):
+ os.remove(self._cache_path)
+
+ def connect(self):
+ if not os.path.exists(self._cache_path):
+ try:
+ self.init()
+ except IOError:
+ raise libbe.storage.base.ConnectionError
+ self._cache = {} # key: uuid, value: path
+ self._changed = False
+ f = codecs.open(self._cache_path, 'r', self.encoding)
+ for line in f:
+ fields = line.rstrip('\n').split('\t')
+ self._cache[fields[0]] = fields[1]
+ f.close()
+
+ def disconnect(self):
+ if self._changed == True:
+ f = codecs.open(self._cache_path, 'w', self.encoding)
+ for uuid,path in self._cache.items():
+ f.write('%s\t%s\n' % (uuid, path))
+ f.close()
+ self._cache = {}
+
+ def path(self, id, relpath=False):
+ fields = id.split('/', 1)
+ uuid = fields[0]
+ if len(fields) == 1:
+ extra = []
+ else:
+ extra = fields[1:]
+ if uuid not in self._cache:
+ self.init(verbose=False, cache=self._cache)
+ if uuid not in self._cache:
+ raise InvalidID(uuid)
+ if relpath == True:
+ return os.path.join(self._cache[uuid], *extra)
+ return os.path.join(self._root, self._cache[uuid], *extra)
+
+ def add_id(self, id, parent=None):
+ if id.count('/') > 0:
+ # not a UUID-level path
+ assert id.startswith(parent), \
+ 'Strange ID: "%s" should start with "%s"' % (id, parent)
+ path = self.path(id)
+ elif id in self._cache:
+ # already added
+ path = self.path(id)
+ else:
+ if parent == None:
+ parent_path = ''
+ spacer = self._spacer_dirs[0]
+ else:
+ assert parent.count('/') == 0, \
+ 'Strange parent ID: "%s" should be UUID' % parent
+ parent_path = self.path(parent, relpath=True)
+ parent_spacer = parent_path.split(os.path.sep)[-2]
+ i = self._spacer_dirs.index(parent_spacer)
+ spacer = self._spacer_dirs[i+1]
+ path = os.path.join(parent_path, spacer, id)
+ self._cache[id] = path
+ self._changed = True
+ path = os.path.join(self._root, path)
+ return path
+
+ def remove_id(self, id):
+ if id.count('/') > 0:
+ return # not a UUID-level path
+ self._cache.pop(id)
+ self._changed = True
+
+ def id(self, path):
+ path = os.path.join(self._root, path)
+ if not path.startswith(self._root + os.path.sep):
+ raise InvalidPath(path, self._root)
+ path = path[len(self._root)+1:]
+ orig_path = path
+ if not path.startswith(self._spacer_dirs[0] + os.path.sep):
+ raise InvalidPath(path, self._spacer_dirs[0])
+ for spacer in self._spacer_dirs:
+ if not path.startswith(spacer + os.path.sep):
+ break
+ id = path[len(spacer)+1:]
+ fields = path[len(spacer)+1:].split(os.path.sep,1)
+ if len(fields) == 1:
+ break
+ path = fields[1]
+ for spacer in self._spacer_dirs:
+ if id.endswith(os.path.sep + spacer):
+ raise SpacerCollision(orig_path, spacer)
+ if os.path.sep != '/':
+ id = id.replace(os.path.sep, '/')
+ return id
+
+
+def new():
+ return VCS()
+
+class VCS (libbe.storage.base.VersionedStorage):
+ """Implement a 'no-VCS' interface.
+
+ Support for other VCSs can be added by subclassing this class, and
+ overriding methods `_vcs_*()` with code appropriate for your VCS.
+
+ The methods `_u_*()` are utility methods available to the `_vcs_*()`
+ methods.
+ """
+ name = 'None'
+ client = 'false' # command-line tool for _u_invoke_client
+
+ def __init__(self, *args, **kwargs):
+ if 'encoding' not in kwargs:
+ kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
+ libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
+ self.versioned = False
+ self.interspersed_vcs_files = False
+ self.verbose_invoke = False
+ self._cached_path_id = CachedPathID()
+ self._rooted = False
+
+ def _vcs_version(self):
+ """
+ Return the VCS version string.
+ """
+ return '0'
+
+ def _vcs_get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
+ """
+ return None
+
+ def _vcs_detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return True
+
+ def _vcs_root(self, path):
+ """
+ Get the VCS root. This is the default working directory for
+ future invocations. You would normally set this to the root
+ directory for your VCS.
+ """
+ if os.path.isdir(path) == False:
+ path = os.path.dirname(path)
+ if path == '':
+ path = os.path.abspath('.')
+ return path
+
+ def _vcs_init(self, path):
+ """
+ Begin versioning the tree based at path.
+ """
+ pass
+
+ def _vcs_destroy(self):
+ """
+ Remove any files used in versioning (e.g. whatever _vcs_init()
+ created).
+ """
+ pass
+
+ def _vcs_add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ pass
+
+ def _vcs_exists(self, path, revision=None):
+ """
+ Does the path exist in a given revision? (True/False)
+ """
+ raise NotImplementedError('Lazy BE developers')
+
+ def _vcs_remove(self, path):
+ """
+ Remove the file at path from version control. Optionally
+ remove the file from the filesystem as well.
+ """
+ pass
+
+ def _vcs_update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ pass
+
+ def _vcs_is_versioned(self, path):
+ """
+ Return true if a path is under version control, False
+ otherwise. You only need to set this if the VCS goes about
+ dumping VCS-specific files into the .be directory.
+
+ If you do need to implement this method (e.g. Arch), set
+ self.interspersed_vcs_files = True
+ """
+ assert self.interspersed_vcs_files == False
+ raise NotImplementedError
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ """
+ Get the file contents as they were in a given revision.
+ Revision==None specifies the current revision.
+ """
+ if revision != None:
+ raise libbe.storage.base.InvalidRevision(
+ 'The %s VCS does not support revision specifiers' % self.name)
+ path = os.path.join(self.repo, path)
+ if not os.path.exists(path):
+ return libbe.util.InvalidObject
+ if os.path.isdir(path):
+ return libbe.storage.base.InvalidDirectory
+ f = open(path, 'rb')
+ contents = f.read()
+ f.close()
+ return contents
+
+ def _vcs_path(self, id, revision):
+ """
+ Return the relative path to object id as of revision.
+
+ Revision will not be None.
+ """
+ raise NotImplementedError
+
+ def _vcs_isdir(self, path, revision):
+ """
+ Return True if path (as returned by _vcs_path) was a directory
+ as of revision, False otherwise.
+
+ Revision will not be None.
+ """
+ raise NotImplementedError
+
+ def _vcs_listdir(self, path, revision):
+ """
+ Return a list of the contents of the directory path (as
+ returned by _vcs_path) as of revision.
+
+ Revision will not be None, and ._vcs_isdir(path, revision)
+ will be True.
+ """
+ raise NotImplementedError
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ """
+ Commit the current working directory, using the contents of
+ commitfile as the comment. Return the name of the old
+ revision (or None if commits are not supported).
+
+ If allow_empty == False, raise EmptyCommit if there are no
+ changes to commit.
+ """
+ return None
+
+ def _vcs_revision_id(self, index):
+ """
+ Return the name of the <index>th revision. Index will be an
+ integer (possibly <= 0). The choice of which branch to follow
+ when crossing branches/merges is not defined.
+
+ Return None if revision IDs are not supported, or if the
+ specified revision does not exist.
+ """
+ return None
+
+ def _vcs_changed(self, revision):
+ """
+ Return a tuple of lists of ids
+ (new, modified, removed)
+ from the specified revision to the current situation.
+ """
+ return ([], [], [])
+
+ def version(self):
+ # Cache version string for efficiency.
+ if not hasattr(self, '_version'):
+ self._version = self._get_version()
+ return self._version
+
+ def _get_version(self):
+ try:
+ ret = self._vcs_version()
+ return ret
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ return None
+ else:
+ raise OSError, e
+ except CommandError:
+ return None
+
+ def installed(self):
+ if self.version() != None:
+ return True
+ return False
+
+ def get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
+ You can override the automatic lookup procedure by setting the
+ VCS.user_id attribute to a string of your choice.
+ """
+ if not hasattr(self, 'user_id'):
+ self.user_id = self._vcs_get_user_id()
+ return self.user_id
+
+ def _detect(self, path='.'):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return self._vcs_detect(path)
+
+ def root(self):
+ """Set the root directory to the path's VCS root.
+
+ This is the default working directory for future invocations.
+ Consider the following usage case:
+
+ You have a project rooted in::
+
+ /path/to/source/
+
+ by which I mean the VCS repository is in, for example::
+
+ /path/to/source/.bzr
+
+ However, you're of in some subdirectory like::
+
+ /path/to/source/ui/testing
+
+ and you want to comment on a bug. `root` will locate your VCS
+ root (``/path/to/source/``) and set the repo there. This
+ means that it doesn't matter where you are in your project
+ tree when you call "be COMMAND", it always acts as if you called
+ it from the VCS root.
+ """
+ if self._detect(self.repo) == False:
+ raise VCSUnableToRoot(self)
+ root = self._vcs_root(self.repo)
+ self.repo = os.path.abspath(root)
+ if os.path.isdir(self.repo) == False:
+ self.repo = os.path.dirname(self.repo)
+ self.be_dir = os.path.join(
+ self.repo, self._cached_path_id._spacer_dirs[0])
+ self._cached_path_id.root(self.repo)
+ self._rooted = True
+
+ def _init(self):
+ """
+ Begin versioning the tree based at self.repo.
+ Also roots the vcs at path.
+
+ See Also
+ --------
+ root : called if the VCS has already been initialized.
+ """
+ if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
+ raise VCSUnableToRoot(self)
+ if self._vcs_detect(self.repo) == False:
+ self._vcs_init(self.repo)
+ if self._rooted == False:
+ self.root()
+ os.mkdir(self.be_dir)
+ self._vcs_add(self._u_rel_path(self.be_dir))
+ self._setup_storage_version()
+ self._cached_path_id.init()
+
+ def _destroy(self):
+ self._vcs_destroy()
+ self._cached_path_id.destroy()
+ if os.path.exists(self.be_dir):
+ shutil.rmtree(self.be_dir)
+
+ def _connect(self):
+ if self._rooted == False:
+ self.root()
+ if not os.path.isdir(self.be_dir):
+ raise libbe.storage.base.ConnectionError(self)
+ self._cached_path_id.connect()
+ self.check_storage_version()
+
+ def _disconnect(self):
+ self._cached_path_id.disconnect()
+
+ def path(self, id, revision=None, relpath=True):
+ if revision == None:
+ path = self._cached_path_id.path(id)
+ if relpath == True:
+ return self._u_rel_path(path)
+ return path
+ path = self._vcs_path(id, revision)
+ if relpath == True:
+ return path
+ return os.path.join(self.repo, path)
+
+ def _add_path(self, path, directory=False):
+ relpath = self._u_rel_path(path)
+ reldirs = relpath.split(os.path.sep)
+ if directory == False:
+ reldirs = reldirs[:-1]
+ dir = self.repo
+ for reldir in reldirs:
+ dir = os.path.join(dir, reldir)
+ if not os.path.exists(dir):
+ os.mkdir(dir)
+ self._vcs_add(self._u_rel_path(dir))
+ elif not os.path.isdir(dir):
+ raise libbe.storage.base.InvalidDirectory
+ if directory == False:
+ if not os.path.exists(path):
+ open(path, 'w').close()
+ self._vcs_add(self._u_rel_path(path))
+
+ def _add(self, id, parent=None, **kwargs):
+ path = self._cached_path_id.add_id(id, parent)
+ self._add_path(path, **kwargs)
+
+ def _exists(self, id, revision=None):
+ if revision == None:
+ try:
+ path = self.path(id, revision, relpath=False)
+ except InvalidID, e:
+ return False
+ return os.path.exists(path)
+ path = self.path(id, revision, relpath=True)
+ return self._vcs_exists(relpath, revision)
+
+ def _remove(self, id):
+ path = self._cached_path_id.path(id)
+ if os.path.exists(path):
+ if os.path.isdir(path) and len(self.children(id)) > 0:
+ raise libbe.storage.base.DirectoryNotEmpty(id)
+ self._vcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ if os.path.isdir(path):
+ os.rmdir(path)
+ else:
+ os.remove(path)
+ self._cached_path_id.remove_id(id)
+
+ def _recursive_remove(self, id):
+ path = self._cached_path_id.path(id)
+ for dirpath,dirnames,filenames in os.walk(path, topdown=False):
+ filenames.extend(dirnames)
+ for f in filenames:
+ fullpath = os.path.join(dirpath, f)
+ if os.path.exists(fullpath) == False:
+ continue
+ self._vcs_remove(self._u_rel_path(fullpath))
+ if os.path.exists(path):
+ shutil.rmtree(path)
+ path = self._cached_path_id.path(id, relpath=True)
+ for id,p in self._cached_path_id._cache.items():
+ if p.startswith(path):
+ self._cached_path_id.remove_id(id)
+
+ def _ancestors(self, id=None, revision=None):
+ if id==None:
+ path = self.be_dir
+ else:
+ path = self.path(id, revision, relpath=False)
+ ancestors = []
+ while True:
+ if not path.startswith(self.repo + os.path.sep):
+ break
+ path = os.path.dirname(path)
+ try:
+ id = self._u_path_to_id(path)
+ ancestors.append(id)
+ except (SpacerCollision, InvalidPath):
+ pass
+ return ancestors
+
+ def _children(self, id=None, revision=None):
+ if revision == None:
+ isdir = os.path.isdir
+ listdir = os.listdir
+ else:
+ isdir = lambda path : self._vcs_isdir(
+ self._u_rel_path(path), revision)
+ listdir = lambda path : self._vcs_listdir(
+ self._u_rel_path(path), revision)
+ if id==None:
+ path = self.be_dir
+ else:
+ path = self.path(id, revision, relpath=False)
+ if isdir(path) == False:
+ return []
+ children = listdir(path)
+ for i,c in enumerate(children):
+ if c in self._cached_path_id._spacer_dirs:
+ children[i] = None
+ children.extend([os.path.join(c, c2) for c2 in
+ listdir(os.path.join(path, c))])
+ elif c in ['id-cache', 'version']:
+ children[i] = None
+ elif self.interspersed_vcs_files \
+ and self._vcs_is_versioned(c) == False:
+ children[i] = None
+ for i,c in enumerate(children):
+ if c == None: continue
+ cpath = os.path.join(path, c)
+ if self.interspersed_vcs_files == True \
+ and revision != None \
+ and self._vcs_is_versioned(cpath) == False:
+ children[i] = None
+ else:
+ children[i] = self._u_path_to_id(cpath)
+ children[i]
+ return [c for c in children if c != None]
+
+ def _get(self, id, default=libbe.util.InvalidObject, revision=None):
+ try:
+ relpath = self.path(id, revision, relpath=True)
+ contents = self._vcs_get_file_contents(relpath, revision)
+ except InvalidID, e:
+ if default == libbe.util.InvalidObject:
+ raise e
+ return default
+ if contents in [libbe.storage.base.InvalidDirectory,
+ libbe.util.InvalidObject] \
+ or len(contents) == 0:
+ if default == libbe.util.InvalidObject:
+ raise InvalidID(id, revision)
+ return default
+ return contents
+
+ def _set(self, id, value):
+ try:
+ path = self._cached_path_id.path(id)
+ except InvalidID, e:
+ raise
+ if not os.path.exists(path):
+ raise InvalidID(id)
+ if os.path.isdir(path):
+ raise libbe.storage.base.InvalidDirectory(id)
+ f = open(path, "wb")
+ f.write(value)
+ f.close()
+ self._vcs_update(self._u_rel_path(path))
+
+ def _commit(self, summary, body=None, allow_empty=False):
+ summary = summary.strip()+'\n'
+ if body is not None:
+ summary += '\n' + body.strip() + '\n'
+ descriptor, filename = tempfile.mkstemp()
+ revision = None
+ try:
+ temp_file = os.fdopen(descriptor, 'wb')
+ temp_file.write(summary)
+ temp_file.flush()
+ revision = self._vcs_commit(filename, allow_empty=allow_empty)
+ temp_file.close()
+ finally:
+ os.remove(filename)
+ return revision
+
+ def revision_id(self, index=None):
+ if index == None:
+ return None
+ try:
+ if int(index) != index:
+ raise InvalidRevision(index)
+ except ValueError:
+ raise InvalidRevision(index)
+ revid = self._vcs_revision_id(index)
+ if revid == None:
+ raise libbe.storage.base.InvalidRevision(index)
+ return revid
+
+ def changed(self, revision):
+ new,mod,rem = self._vcs_changed(revision)
+ def paths_to_ids(paths):
+ for p in paths:
+ try:
+ id = self._u_path_to_id(p)
+ yield id
+ except (SpacerCollision, InvalidPath):
+ pass
+ new_id = list(paths_to_ids(new))
+ mod_id = list(paths_to_ids(mod))
+ rem_id = list(paths_to_ids(rem))
+ return (new_id, mod_id, rem_id)
+
+ def _u_any_in_string(self, list, string):
+ """Return True if any of the strings in list are in string.
+ Otherwise return False.
+ """
+ for list_string in list:
+ if list_string in string:
+ return True
+ return False
+
+ def _u_invoke(self, *args, **kwargs):
+ if 'cwd' not in kwargs:
+ kwargs['cwd'] = self.repo
+ if 'verbose' not in kwargs:
+ kwargs['verbose'] = self.verbose_invoke
+ if 'encoding' not in kwargs:
+ kwargs['encoding'] = self.encoding
+ return invoke(*args, **kwargs)
+
+ def _u_invoke_client(self, *args, **kwargs):
+ cl_args = [self.client]
+ cl_args.extend(args)
+ return self._u_invoke(cl_args, **kwargs)
+
+ def _u_search_parent_directories(self, path, filename):
+ """Find the file (or directory) named filename in path or in any of
+ path's parents.
+
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ try:
+ ret = search_parent_directories(path, filename)
+ except AssertionError, e:
+ return None
+ return ret
+
+ def _u_find_id_from_manifest(self, id, manifest, revision=None):
+ """Search for the relative path to id using manifest, a list of all
+ files.
+
+ Returns None if the id is not found.
+ """
+ be_dir = self._cached_path_id._spacer_dirs[0]
+ be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
+ files = [f for f in manifest if f.startswith(be_dir_sep)]
+ for file in files:
+ if not file.startswith(be_dir+os.path.sep):
+ continue
+ parts = file.split(os.path.sep)
+ dir = parts.pop(0) # don't add the first spacer dir
+ for part in parts[:-1]:
+ dir = os.path.join(dir, part)
+ if not dir in files:
+ files.append(dir)
+ for file in files:
+ try:
+ p_id = self._u_path_to_id(file)
+ if p_id == id:
+ return file
+ except (SpacerCollision, InvalidPath):
+ pass
+ raise InvalidID(id, revision=revision)
+
+ def _u_find_id(self, id, revision):
+ """Search for the relative path to id as of revision.
+
+ Returns None if the id is not found.
+ """
+ assert self._rooted == True
+ be_dir = self._cached_path_id._spacer_dirs[0]
+ stack = [(be_dir, be_dir)]
+ while len(stack) > 0:
+ path,long_id = stack.pop()
+ if long_id.endswith('/'+id):
+ return path
+ if self._vcs_isdir(path, revision) == False:
+ continue
+ for child in self._vcs_listdir(path, revision):
+ stack.append((os.path.join(path, child),
+ '/'.join([long_id, child])))
+ raise InvalidID(id, revision=revision)
+
+ def _u_path_to_id(self, path):
+ return self._cached_path_id.id(path)
+
+ def _u_rel_path(self, path, root=None):
+ """Return the relative path to path from root.
+
+ Examples:
+
+ >>> vcs = new()
+ >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ '.be'
+ >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
+ '.'
+ >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
+ '.'
+ >>> vcs._u_rel_path("./a", ".")
+ 'a'
+ """
+ if root == None:
+ if self.repo == None:
+ raise VCSNotRooted(self)
+ root = self.repo
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if path in [absRoot, absRootSlashedDir]:
+ return '.'
+ if not path.startswith(absRootSlashedDir):
+ raise InvalidPath(path, absRootSlashedDir)
+ relpath = path[len(absRootSlashedDir):]
+ return relpath
+
+ def _u_abspath(self, path, root=None):
+ """Return the absolute path from a path realtive to root.
+
+ Examples
+ --------
+
+ >>> vcs = new()
+ >>> vcs._u_abspath(".be", "/a.b/c")
+ '/a.b/c/.be'
+ """
+ if root == None:
+ assert self.repo != None, "VCS not rooted"
+ root = self.repo
+ return os.path.abspath(os.path.join(root, path))
+
+ def _u_parse_commitfile(self, commitfile):
+ """Split the commitfile created in self.commit() back into summary and
+ header lines.
+ """
+ f = codecs.open(commitfile, 'r', self.encoding)
+ summary = f.readline()
+ body = f.read()
+ body.lstrip('\n')
+ if len(body) == 0:
+ body = None
+ f.close()
+ return (summary, body)
+
+ def check_storage_version(self):
+ version = self.storage_version()
+ if version != libbe.storage.STORAGE_VERSION:
+ upgrade.upgrade(self.repo, version)
+
+ def storage_version(self, revision=None, path=None):
+ """Return the storage version of the on-disk files.
+
+ See Also
+ --------
+ :mod:`libbe.storage.util.upgrade`
+ """
+ if path == None:
+ path = os.path.join(self.repo, '.be', 'version')
+ if not os.path.exists(path):
+ raise libbe.storage.InvalidStorageVersion(None)
+ if revision == None: # don't require connection
+ return libbe.util.encoding.get_file_contents(
+ path, decode=True).rstrip('\n')
+ relpath = self._u_rel_path(path)
+ contents = self._vcs_get_file_contents(relpath, revision=revision)
+ if type(contents) != types.UnicodeType:
+ contents = unicode(contents, self.encoding)
+ return contents.strip()
+
+ def _setup_storage_version(self):
+ """
+ Requires disk access.
+ """
+ assert self._rooted == True
+ path = os.path.join(self.be_dir, 'version')
+ if not os.path.exists(path):
+ libbe.util.encoding.set_file_contents(path,
+ libbe.storage.STORAGE_VERSION+'\n')
+ self._vcs_add(self._u_rel_path(path))
+
+
+if libbe.TESTING == True:
+ class VCSTestCase (unittest.TestCase):
+ """
+ Test cases for base VCS class (in addition to the Storage test
+ cases).
+ """
+
+ Class = VCS
+
+ def __init__(self, *args, **kwargs):
+ super(VCSTestCase, self).__init__(*args, **kwargs)
+ self.dirname = None
+
+ def setUp(self):
+ """Set up test fixtures for Storage test case."""
+ super(VCSTestCase, self).setUp()
+ self.dir = Dir()
+ self.dirname = self.dir.path
+ self.s = self.Class(repo=self.dirname)
+ if self.s.installed() == True:
+ self.s.init()
+ self.s.connect()
+
+ def tearDown(self):
+ super(VCSTestCase, self).tearDown()
+ if self.s.installed() == True:
+ self.s.disconnect()
+ self.s.destroy()
+ self.dir.cleanup()
+
+ class VCS_installed_TestCase (VCSTestCase):
+ def test_installed(self):
+ """See if the VCS is installed.
+ """
+ self.failUnless(self.s.installed() == True,
+ '%(name)s VCS not found' % vars(self.Class))
+
+
+ class VCS_detection_TestCase (VCSTestCase):
+ def test_detection(self):
+ """See if the VCS detects its installed repository
+ """
+ if self.s.installed():
+ self.s.disconnect()
+ self.failUnless(self.s._detect(self.dirname) == True,
+ 'Did not detected %(name)s VCS after initialising'
+ % vars(self.Class))
+ self.s.connect()
+
+ def test_no_detection(self):
+ """See if the VCS detects its installed repository
+ """
+ if self.s.installed() and self.Class.name != 'None':
+ self.s.disconnect()
+ self.s.destroy()
+ self.failUnless(self.s._detect(self.dirname) == False,
+ 'Detected %(name)s VCS before initialising'
+ % vars(self.Class))
+ self.s.init()
+ self.s.connect()
+
+ def test_vcs_repo_in_specified_root_path(self):
+ """VCS root directory should be in specified root path."""
+ rp = os.path.realpath(self.s.repo)
+ dp = os.path.realpath(self.dirname)
+ vcs_name = self.Class.name
+ self.failUnless(
+ dp == rp or rp == None,
+ "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
+
+ class VCS_get_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.get_user_id method."""
+
+ def test_gets_existing_user_id(self):
+ """Should get the existing user ID."""
+ if self.s.installed():
+ user_id = self.s.get_user_id()
+ if user_id == None:
+ return
+ name,email = libbe.ui.util.user.parse_user_id(user_id)
+ if email != None:
+ self.failUnless('@' in email, email)
+
+ def make_vcs_testcase_subclasses(vcs_class, namespace):
+ c = vcs_class()
+ if c.installed():
+ if c.versioned == True:
+ libbe.storage.base.make_versioned_storage_testcase_subclasses(
+ vcs_class, namespace)
+ else:
+ libbe.storage.base.make_storage_testcase_subclasses(
+ vcs_class, namespace)
+
+ if namespace != sys.modules[__name__]:
+ # Make VCSTestCase subclasses for vcs_class in the namespace.
+ vcs_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, VCSTestCase) \
+ and c.Class == VCS]
+
+ for base_class in vcs_testcase_classes:
+ testcase_class_name = vcs_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = vcs_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+ make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py
new file mode 100644
index 0000000..5a62968
--- /dev/null
+++ b/libbe/storage/vcs/bzr.py
@@ -0,0 +1,361 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Bazaar_ (bzr) backend.
+
+.. _Bazaar: http://bazaar.canonical.com/
+"""
+
+try:
+ import bzrlib
+ import bzrlib.branch
+ import bzrlib.builtins
+ import bzrlib.config
+ import bzrlib.errors
+ import bzrlib.option
+except ImportError:
+ bzrlib = None
+import os
+import os.path
+import re
+import shutil
+import StringIO
+import sys
+import types
+
+import libbe
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+
+def new():
+ return Bzr()
+
+class Bzr(base.VCS):
+ """:class:`base.VCS` implementation for Bazaar.
+ """
+ name = 'bzr'
+ client = None # bzrlib module
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+
+ def _vcs_version(self):
+ if bzrlib == None:
+ return None
+ return bzrlib.__version__
+
+ def version_cmp(self, *args):
+ """Compare the installed Bazaar version `V_i` with another version
+ `V_o` (given in `*args`). Returns
+
+ === ===============
+ 1 if `V_i > V_o`
+ 0 if `V_i == V_o`
+ -1 if `V_i < V_o`
+ === ===============
+
+ Examples
+ --------
+
+ >>> b = Bzr(repo='.')
+ >>> b._vcs_version = lambda : "2.3.1 (release)"
+ >>> b.version_cmp(2,3,1)
+ 0
+ >>> b.version_cmp(2,3,2)
+ -1
+ >>> b.version_cmp(2,3,0)
+ 1
+ >>> b.version_cmp(3)
+ -1
+ >>> b._vcs_version = lambda : "2.0.0pre2"
+ >>> b._parsed_version = None
+ >>> b.version_cmp(3)
+ -1
+ >>> b.version_cmp(2,0,1)
+ Traceback (most recent call last):
+ ...
+ NotImplementedError: Cannot parse non-integer portion "0pre2" of Bzr version "2.0.0pre2"
+ """
+ if not hasattr(self, '_parsed_version') \
+ or self._parsed_version == None:
+ num_part = self._vcs_version().split(' ')[0]
+ self._parsed_version = []
+ for num in num_part.split('.'):
+ try:
+ self._parsed_version.append(int(num))
+ except ValueError, e:
+ self._parsed_version.append(num)
+ for current,other in zip(self._parsed_version, args):
+ if type(current) != types.IntType:
+ raise NotImplementedError(
+ 'Cannot parse non-integer portion "%s" of Bzr version "%s"'
+ % (current, self._vcs_version()))
+ c = cmp(current,other)
+ if c != 0:
+ return c
+ return 0
+
+ def _vcs_get_user_id(self):
+ # excerpted from bzrlib.builtins.cmd_whoami.run()
+ try:
+ c = bzrlib.branch.Branch.open_containing(self.repo)[0].get_config()
+ except errors.NotBranchError:
+ c = bzrlib.config.GlobalConfig()
+ return c.username()
+
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, '.bzr') != None :
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ cmd = bzrlib.builtins.cmd_root()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(filename=path)
+ return cmd.outf.getvalue().rstrip('\n')
+
+ def _vcs_init(self, path):
+ cmd = bzrlib.builtins.cmd_init()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(location=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '.bzr')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ path = os.path.join(self.repo, path)
+ cmd = bzrlib.builtins.cmd_add()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(file_list=[path], file_ids_from=self.repo)
+
+ def _vcs_exists(self, path, revision=None):
+ manifest = self._vcs_listdir(
+ self.repo, revision=revision, recursive=True)
+ if path in manifest:
+ return True
+ return False
+
+ def _vcs_remove(self, path):
+ # --force to also remove unversioned files.
+ path = os.path.join(self.repo, path)
+ cmd = bzrlib.builtins.cmd_remove()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(file_list=[path], file_deletion_strategy='force')
+
+ def _vcs_update(self, path):
+ pass
+
+ def _parse_revision_string(self, revision=None):
+ if revision == None:
+ return revision
+ rev_opt = bzrlib.option.Option.OPTIONS['revision']
+ try:
+ rev_spec = rev_opt.type(revision)
+ except bzrlib.errors.NoSuchRevisionSpec:
+ raise base.InvalidRevision(revision)
+ return rev_spec
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ path = os.path.join(self.repo, path)
+ revision = self._parse_revision_string(revision)
+ cmd = bzrlib.builtins.cmd_cat()
+ cmd.outf = StringIO.StringIO()
+ if self.version_cmp(1,6,0) < 0:
+ # old bzrlib cmd_cat uses sys.stdout not self.outf for output.
+ stdout = sys.stdout
+ sys.stdout = cmd.outf
+ try:
+ cmd.run(filename=path, revision=revision)
+ except bzrlib.errors.BzrCommandError, e:
+ if 'not present in revision' in str(e):
+ raise base.InvalidPath(path, root=self.repo, revision=revision)
+ raise
+ finally:
+ if self.version_cmp(2,0,0) < 0:
+ cmd.outf = sys.stdout
+ sys.stdout = stdout
+ return cmd.outf.getvalue()
+
+ def _vcs_path(self, id, revision):
+ manifest = self._vcs_listdir(
+ self.repo, revision=revision, recursive=True)
+ return self._u_find_id_from_manifest(id, manifest, revision=revision)
+
+ def _vcs_isdir(self, path, revision):
+ try:
+ self._vcs_listdir(path, revision)
+ except AttributeError, e:
+ if 'children' in str(e):
+ return False
+ raise
+ return True
+
+ def _vcs_listdir(self, path, revision, recursive=False):
+ path = os.path.join(self.repo, path)
+ revision = self._parse_revision_string(revision)
+ cmd = bzrlib.builtins.cmd_ls()
+ cmd.outf = StringIO.StringIO()
+ try:
+ if self.version_cmp(2,0,0) >= 0:
+ cmd.run(revision=revision, path=path, recursive=recursive)
+ else:
+ # Pre-2.0 Bazaar (non_recursive)
+ # + working around broken non_recursive+path implementation
+ # (https://bugs.launchpad.net/bzr/+bug/158690)
+ cmd.run(revision=revision, path=path,
+ non_recursive=False)
+ except bzrlib.errors.BzrCommandError, e:
+ if 'not present in revision' in str(e):
+ raise base.InvalidPath(path, root=self.repo, revision=revision)
+ raise
+ children = cmd.outf.getvalue().rstrip('\n').splitlines()
+ children = [self._u_rel_path(c, path) for c in children]
+ if self.version_cmp(2,0,0) < 0 and recursive == False:
+ children = [c for c in children if os.path.sep not in c]
+ return children
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ cmd = bzrlib.builtins.cmd_commit()
+ cmd.outf = StringIO.StringIO()
+ cwd = os.getcwd()
+ os.chdir(self.repo)
+ try:
+ cmd.run(file=commitfile, unchanged=allow_empty)
+ except bzrlib.errors.BzrCommandError, e:
+ strings = ['no changes to commit.', # bzr 1.3.1
+ 'No changes to commit.'] # bzr 1.15.1
+ if self._u_any_in_string(strings, str(e)) == True:
+ raise base.EmptyCommit()
+ raise
+ finally:
+ os.chdir(cwd)
+ return self._vcs_revision_id(-1)
+
+ def _vcs_revision_id(self, index):
+ cmd = bzrlib.builtins.cmd_revno()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(location=self.repo)
+ current_revision = int(cmd.outf.getvalue())
+ if index > current_revision or index < -current_revision:
+ return None
+ if index >= 0:
+ return str(index) # bzr commit 0 is the empty tree.
+ return str(current_revision+index+1)
+
+ def _diff(self, revision):
+ revision = self._parse_revision_string(revision)
+ cmd = bzrlib.builtins.cmd_diff()
+ cmd.outf = StringIO.StringIO()
+ # for some reason, cmd_diff uses sys.stdout not self.outf for output.
+ stdout = sys.stdout
+ sys.stdout = cmd.outf
+ try:
+ status = cmd.run(revision=revision, file_list=[self.repo])
+ finally:
+ sys.stdout = stdout
+ assert status in [0,1], "Invalid status %d" % status
+ return cmd.outf.getvalue()
+
+ def _parse_diff(self, diff_text):
+ """_parse_diff(diff_text) -> (new,modified,removed)
+
+ `new`, `modified`, and `removed` are lists of files.
+
+ Example diff text::
+
+ === modified file 'dir/changed'
+ --- dir/changed 2010-01-16 01:54:53 +0000
+ +++ dir/changed 2010-01-16 01:54:54 +0000
+ @@ -1,3 +1,3 @@
+ hi
+ -there
+ +everyone and
+ joe
+
+ === removed file 'dir/deleted'
+ --- dir/deleted 2010-01-16 01:54:53 +0000
+ +++ dir/deleted 1970-01-01 00:00:00 +0000
+ @@ -1,3 +0,0 @@
+ -in
+ -the
+ -beginning
+
+ === removed file 'dir/moved'
+ --- dir/moved 2010-01-16 01:54:53 +0000
+ +++ dir/moved 1970-01-01 00:00:00 +0000
+ @@ -1,4 +0,0 @@
+ -the
+ -ants
+ -go
+ -marching
+
+ === added file 'dir/moved2'
+ --- dir/moved2 1970-01-01 00:00:00 +0000
+ +++ dir/moved2 2010-01-16 01:54:34 +0000
+ @@ -0,0 +1,4 @@
+ +the
+ +ants
+ +go
+ +marching
+
+ === added file 'dir/new'
+ --- dir/new 1970-01-01 00:00:00 +0000
+ +++ dir/new 2010-01-16 01:54:54 +0000
+ @@ -0,0 +1,2 @@
+ +hello
+ +world
+
+ """
+ new = []
+ modified = []
+ removed = []
+ for line in diff_text.splitlines():
+ if not line.startswith('=== '):
+ continue
+ fields = line.split()
+ action = fields[1]
+ file = fields[-1].strip("'")
+ if action == 'added':
+ new.append(file)
+ elif action == 'modified':
+ modified.append(file)
+ elif action == 'removed':
+ removed.append(file)
+ return (new,modified,removed)
+
+ def _vcs_changed(self, revision):
+ return self._parse_diff(self._diff(revision))
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py
new file mode 100644
index 0000000..4a21888
--- /dev/null
+++ b/libbe/storage/vcs/darcs.py
@@ -0,0 +1,399 @@
+# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Darcs_ backend.
+
+.. _Darcs: http://darcs.net/
+"""
+
+import codecs
+import os
+import re
+import shutil
+import sys
+import time # work around http://mercurial.selenic.com/bts/issue618
+import types
+try: # import core module, Python >= 2.5
+ from xml.etree import ElementTree
+except ImportError: # look for non-core module
+ from elementtree import ElementTree
+from xml.sax.saxutils import unescape
+
+import libbe
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+
+def new():
+ return Darcs()
+
+class Darcs(base.VCS):
+ """:class:`base.VCS` implementation for Darcs.
+ """
+ name='darcs'
+ client='darcs'
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+ self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client('--version')
+ return output.strip()
+
+ def version_cmp(self, *args):
+ """Compare the installed Darcs version `V_i` with another version
+ `V_o` (given in `*args`). Returns
+
+ === ===============
+ 1 if `V_i > V_o`
+ 0 if `V_i == V_o`
+ -1 if `V_i < V_o`
+ === ===============
+
+ Examples
+ --------
+
+ >>> d = Darcs(repo='.')
+ >>> d._vcs_version = lambda : "2.3.1 (release)"
+ >>> d.version_cmp(2,3,1)
+ 0
+ >>> d.version_cmp(2,3,2)
+ -1
+ >>> d.version_cmp(2,3,0)
+ 1
+ >>> d.version_cmp(3)
+ -1
+ >>> d._vcs_version = lambda : "2.0.0pre2"
+ >>> d._parsed_version = None
+ >>> d.version_cmp(3)
+ -1
+ >>> d.version_cmp(2,0,1)
+ Traceback (most recent call last):
+ ...
+ NotImplementedError: Cannot parse non-integer portion "0pre2" of Darcs version "2.0.0pre2"
+ """
+ if not hasattr(self, '_parsed_version') \
+ or self._parsed_version == None:
+ num_part = self._vcs_version().split(' ')[0]
+ self._parsed_version = []
+ for num in num_part.split('.'):
+ try:
+ self._parsed_version.append(int(num))
+ except ValueError, e:
+ self._parsed_version.append(num)
+ for current,other in zip(self._parsed_version, args):
+ if type(current) != types.IntType:
+ raise NotImplementedError(
+ 'Cannot parse non-integer portion "%s" of Darcs version "%s"'
+ % (current, self._vcs_version()))
+ c = cmp(current,other)
+ if c != 0:
+ return c
+ return 0
+
+ def _vcs_get_user_id(self):
+ # following http://darcs.net/manual/node4.html#SECTION00410030000000000000
+ # as of June 29th, 2009
+ if self.repo == None:
+ return None
+ darcs_dir = os.path.join(self.repo, '_darcs')
+ if darcs_dir != None:
+ for pref_file in ['author', 'email']:
+ pref_path = os.path.join(darcs_dir, 'prefs', pref_file)
+ if os.path.exists(pref_path):
+ return self.get_file_contents(pref_path)
+ for env_variable in ['DARCS_EMAIL', 'EMAIL']:
+ if env_variable in os.environ:
+ return os.environ[env_variable]
+ return None
+
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, "_darcs") != None :
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ darcs_dir = self._u_search_parent_directories(path, '_darcs')
+ if darcs_dir == None:
+ return None
+ return os.path.dirname(darcs_dir)
+
+ def _vcs_init(self, path):
+ self._u_invoke_client('init', cwd=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '_darcs')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client('add', path)
+
+ def _vcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ os.remove(os.path.join(self.repo, path)) # darcs notices removal
+
+ def _vcs_update(self, path):
+ self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618
+ pass # darcs notices changes
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ if self.version_cmp(2, 0, 0) == 1:
+ status,output,error = self._u_invoke_client( \
+ 'show', 'contents', '--patch', revision, path)
+ return output
+ # Darcs versions < 2.0.0pre2 lack the 'show contents' command
+
+ patch = self._diff(revision, path=path, unicode_output=False)
+
+ # '--output -' to be supported in GNU patch > 2.5.9
+ # but that hasn't been released as of June 30th, 2009.
+
+ # Rewrite path to status before the patch we want
+ args=['patch', '--reverse', path]
+ status,output,error = self._u_invoke(args, stdin=patch)
+
+ if os.path.exists(os.path.join(self.repo, path)) == True:
+ contents = base.VCS._vcs_get_file_contents(self, path)
+ else:
+ contents = ''
+
+ # Now restore path to it's current incarnation
+ args=['patch', path]
+ status,output,error = self._u_invoke(args, stdin=patch)
+ return contents
+
+ def _vcs_path(self, id, revision):
+ return self._u_find_id(id, revision)
+
+ def _vcs_isdir(self, path, revision):
+ if self.version_cmp(2, 3, 1) == 1:
+ # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com
+ # * add versioned show files functionality (darcs show files -p 'some patch')
+ status,output,error = self._u_invoke_client( \
+ 'show', 'files', '--no-files', '--patch', revision)
+ children = output.rstrip('\n').splitlines()
+ rpath = '.'
+ children = [self._u_rel_path(c, rpath) for c in children]
+ if path in children:
+ return True
+ return False
+ raise NotImplementedError(
+ 'Darcs versions <= 2.3.1 lack the --patch option for "show files"')
+
+ def _vcs_listdir(self, path, revision):
+ if self.version_cmp(2, 3, 1) == 1:
+ # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com
+ # * add versioned show files functionality (darcs show files -p 'some patch')
+ # Wed Dec 9 05:42:21 EST 2009 Luca Molteni <volothamp@gmail.com>
+ # * resolve issue835 show file with file directory arguments
+ path = path.rstrip(os.path.sep)
+ status,output,error = self._u_invoke_client( \
+ 'show', 'files', '--patch', revision, path)
+ files = output.rstrip('\n').splitlines()
+ if path == '.':
+ descendents = [self._u_rel_path(f, path) for f in files
+ if f != '.']
+ else:
+ descendents = [self._u_rel_path(f, path) for f in files
+ if f.startswith(path)]
+ return [f for f in descendents if f.count(os.path.sep) == 0]
+ # Darcs versions <= 2.3.1 lack the --patch option for 'show files'
+ raise NotImplementedError
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ id = self.get_user_id()
+ if id == None or '@' not in id:
+ id = '%s <%s@invalid.com>' % (id, id)
+ args = ['record', '--all', '--author', id, '--logfile', commitfile]
+ status,output,error = self._u_invoke_client(*args)
+ empty_strings = ['No changes!']
+ # work around http://mercurial.selenic.com/bts/issue618
+ if self._u_any_in_string(empty_strings, output) == True \
+ and len(self.__updated) > 0:
+ time.sleep(1)
+ for path in self.__updated:
+ os.utime(os.path.join(self.repo, path), None)
+ status,output,error = self._u_invoke_client(*args)
+ self.__updated = []
+ # end work around
+ if self._u_any_in_string(empty_strings, output) == True:
+ if allow_empty == False:
+ raise base.EmptyCommit()
+ # note that darcs does _not_ make an empty revision.
+ # this returns the last non-empty revision id...
+ revision = self._vcs_revision_id(-1)
+ else:
+ revline = re.compile("Finished recording patch '(.*)'")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+
+ def _revisions(self):
+ """
+ Return a list of revisions in the repository.
+ """
+ status,output,error = self._u_invoke_client('changes', '--xml')
+ revisions = []
+ xml_str = output.encode('unicode_escape').replace(r'\n', '\n')
+ element = ElementTree.XML(xml_str)
+ assert element.tag == 'changelog', element.tag
+ for patch in element.getchildren():
+ assert patch.tag == 'patch', patch.tag
+ for child in patch.getchildren():
+ if child.tag == 'name':
+ text = unescape(unicode(child.text).decode('unicode_escape').strip())
+ revisions.append(text)
+ revisions.reverse()
+ return revisions
+
+ def _vcs_revision_id(self, index):
+ revisions = self._revisions()
+ try:
+ if index > 0:
+ return revisions[index-1]
+ elif index < 0:
+ return revisions[index]
+ else:
+ return None
+ except IndexError:
+ return None
+
+ def _diff(self, revision, path=None, unicode_output=True):
+ revisions = self._revisions()
+ i = revisions.index(revision)
+ args = ['diff', '--unified']
+ if i+1 < len(revisions):
+ next_rev = revisions[i+1]
+ args.extend(['--from-patch', next_rev])
+ if path != None:
+ args.append(path)
+ kwargs = {'unicode_output':unicode_output}
+ status,output,error = self._u_invoke_client(
+ *args, **kwargs)
+ return output
+
+ def _parse_diff(self, diff_text):
+ """_parse_diff(diff_text) -> (new,modified,removed)
+
+ `new`, `modified`, and `removed` are lists of files.
+
+ Example diff text::
+
+ Mon Jan 18 15:19:30 EST 2010 None <None@invalid.com>
+ * Final state
+ diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/modified new-BEtestgQtDuD/.be/dir/bugs/modified
+ --- old-BEtestgQtDuD/.be/dir/bugs/modified 2010-01-18 15:19:30.000000000 -0500
+ +++ new-BEtestgQtDuD/.be/dir/bugs/modified 2010-01-18 15:19:30.000000000 -0500
+ @@ -1 +1 @@
+ -some value to be modified
+ \ No newline at end of file
+ +a new value
+ \ No newline at end of file
+ diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/moved new-BEtestgQtDuD/.be/dir/bugs/moved
+ --- old-BEtestgQtDuD/.be/dir/bugs/moved 2010-01-18 15:19:30.000000000 -0500
+ +++ new-BEtestgQtDuD/.be/dir/bugs/moved 1969-12-31 19:00:00.000000000 -0500
+ @@ -1 +0,0 @@
+ -this entry will be moved
+ \ No newline at end of file
+ diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/moved2 new-BEtestgQtDuD/.be/dir/bugs/moved2
+ --- old-BEtestgQtDuD/.be/dir/bugs/moved2 1969-12-31 19:00:00.000000000 -0500
+ +++ new-BEtestgQtDuD/.be/dir/bugs/moved2 2010-01-18 15:19:30.000000000 -0500
+ @@ -0,0 +1 @@
+ +this entry will be moved
+ \ No newline at end of file
+ diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/new new-BEtestgQtDuD/.be/dir/bugs/new
+ --- old-BEtestgQtDuD/.be/dir/bugs/new 1969-12-31 19:00:00.000000000 -0500
+ +++ new-BEtestgQtDuD/.be/dir/bugs/new 2010-01-18 15:19:30.000000000 -0500
+ @@ -0,0 +1 @@
+ +this entry is new
+ \ No newline at end of file
+ diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/removed new-BEtestgQtDuD/.be/dir/bugs/removed
+ --- old-BEtestgQtDuD/.be/dir/bugs/removed 2010-01-18 15:19:30.000000000 -0500
+ +++ new-BEtestgQtDuD/.be/dir/bugs/removed 1969-12-31 19:00:00.000000000 -0500
+ @@ -1 +0,0 @@
+ -this entry will be deleted
+ \ No newline at end of file
+
+ """
+ new = []
+ modified = []
+ removed = []
+ lines = diff_text.splitlines()
+ repodir = os.path.basename(self.repo) + os.path.sep
+ i = 0
+ while i < len(lines):
+ line = lines[i]; i += 1
+ if not line.startswith('diff '):
+ continue
+ file_a,file_b = line.split()[-2:]
+ assert file_a.startswith('old-'), \
+ 'missformed file_a %s' % file_a
+ assert file_b.startswith('new-'), \
+ 'missformed file_a %s' % file_b
+ file = file_a[4:]
+ assert file_b[4:] == file, \
+ 'diff file missmatch %s != %s' % (file_a, file_b)
+ assert file.startswith(repodir), \
+ 'missformed file_a %s' % file_a
+ file = file[len(repodir):]
+ lines_added = 0
+ lines_removed = 0
+ line = lines[i]; i += 1
+ assert line.startswith('--- old-'), \
+ 'missformed "---" line %s' % line
+ time_a = line.split('\t')[1]
+ line = lines[i]; i += 1
+ assert line.startswith('+++ new-'), \
+ 'missformed "+++" line %s' % line
+ time_b = line.split('\t')[1]
+ zero_time = time.strftime('%Y-%m-%d %H:%M:%S.000000000 ',
+ time.localtime(0))
+ # note that zero_time is missing the trailing timezone offset
+ if time_a.startswith(zero_time):
+ new.append(file)
+ elif time_b.startswith(zero_time):
+ removed.append(file)
+ else:
+ modified.append(file)
+ return (new,modified,removed)
+
+ def _vcs_changed(self, revision):
+ return self._parse_diff(self._diff(revision))
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py
new file mode 100644
index 0000000..4df9bc8
--- /dev/null
+++ b/libbe/storage/vcs/git.py
@@ -0,0 +1,269 @@
+# Copyright (C) 2008-2010 Ben Finney <benf@cybersource.com.au>
+# Chris Ball <cjb@laptop.org>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Git_ backend.
+
+.. _Git: http://git-scm.com/
+"""
+
+import os
+import os.path
+import re
+import shutil
+import unittest
+
+import libbe
+import libbe.ui.util.user
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import sys
+
+
+def new():
+ return Git()
+
+class Git(base.VCS):
+ """:class:`base.VCS` implementation for Git.
+ """
+ name='git'
+ client='git'
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client('--version')
+ return output.strip()
+
+ def _vcs_get_user_id(self):
+ status,output,error = \
+ self._u_invoke_client('config', 'user.name', expect=(0,1))
+ if status == 0:
+ name = output.rstrip('\n')
+ else:
+ name = ''
+ status,output,error = \
+ self._u_invoke_client('config', 'user.email', expect=(0,1))
+ if status == 0:
+ email = output.rstrip('\n')
+ else:
+ email = ''
+ if name != '' or email != '': # got something!
+ # guess missing info, if necessary
+ if name == '':
+ name = libbe.ui.util.user.get_fallback_username()
+ if email == '':
+ email = libe.ui.util.user.get_fallback_email()
+ return libbe.ui.util.user.create_user_id(name, email)
+ return None # Git has no infomation
+
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, '.git') != None :
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ status,output,error = self._u_invoke_client('rev-parse', '--git-dir',
+ cwd=path)
+ gitdir = os.path.join(path, output.rstrip('\n'))
+ dirname = os.path.abspath(os.path.dirname(gitdir))
+ return dirname
+
+ def _vcs_init(self, path):
+ self._u_invoke_client('init', cwd=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '.git')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client('add', path)
+
+ def _vcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ self._u_invoke_client('rm', '-f', path)
+
+ def _vcs_update(self, path):
+ self._vcs_add(path)
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ else:
+ arg = '%s:%s' % (revision,path)
+ status,output,error = self._u_invoke_client('show', arg)
+ return output
+
+ def _vcs_path(self, id, revision):
+ return self._u_find_id(id, revision)
+
+ def _vcs_isdir(self, path, revision):
+ arg = '%s:%s' % (revision,path)
+ args = ['ls-tree', arg]
+ kwargs = {'expect':(0,128)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status != 0:
+ if 'not a tree object' in error:
+ return False
+ raise base.CommandError(args, status, stderr=error)
+ return True
+
+ def _vcs_listdir(self, path, revision):
+ arg = '%s:%s' % (revision,path)
+ status,output,error = self._u_invoke_client(
+ 'ls-tree', '--name-only', arg)
+ return output.rstrip('\n').splitlines()
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--all', '--file', commitfile]
+ if allow_empty == True:
+ args.append('--allow-empty')
+ status,output,error = self._u_invoke_client(*args)
+ else:
+ kwargs = {'expect':(0,1)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ strings = ['nothing to commit',
+ 'nothing added to commit']
+ if self._u_any_in_string(strings, output) == True:
+ raise base.EmptyCommit()
+ full_revision = self._vcs_revision_id(-1)
+ assert full_revision[:7] in output, \
+ 'Mismatched revisions:\n%s\n%s' % (full_revision, output)
+ return full_revision
+
+ def _vcs_revision_id(self, index):
+ args = ['rev-list', '--first-parent', '--reverse', 'HEAD']
+ kwargs = {'expect':(0,128)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status == 128:
+ if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
+ return None
+ raise base.CommandError(args, status, stderr=error)
+ revisions = output.splitlines()
+ try:
+ if index > 0:
+ return revisions[index-1]
+ elif index < 0:
+ return revisions[index]
+ else:
+ return None
+ except IndexError:
+ return None
+
+ def _diff(self, revision):
+ status,output,error = self._u_invoke_client('diff', revision)
+ return output
+
+ def _parse_diff(self, diff_text):
+ """_parse_diff(diff_text) -> (new,modified,removed)
+
+ `new`, `modified`, and `removed` are lists of files.
+
+ Example diff text::
+
+ diff --git a/dir/changed b/dir/changed
+ index 6c3ea8c..2f2f7c7 100644
+ --- a/dir/changed
+ +++ b/dir/changed
+ @@ -1,3 +1,3 @@
+ hi
+ -there
+ +everyone and
+ joe
+ diff --git a/dir/deleted b/dir/deleted
+ deleted file mode 100644
+ index 225ec04..0000000
+ --- a/dir/deleted
+ +++ /dev/null
+ @@ -1,3 +0,0 @@
+ -in
+ -the
+ -beginning
+ diff --git a/dir/moved b/dir/moved
+ deleted file mode 100644
+ index 5ef102f..0000000
+ --- a/dir/moved
+ +++ /dev/null
+ @@ -1,4 +0,0 @@
+ -the
+ -ants
+ -go
+ -marching
+ diff --git a/dir/moved2 b/dir/moved2
+ new file mode 100644
+ index 0000000..5ef102f
+ --- /dev/null
+ +++ b/dir/moved2
+ @@ -0,0 +1,4 @@
+ +the
+ +ants
+ +go
+ +marching
+ diff --git a/dir/new b/dir/new
+ new file mode 100644
+ index 0000000..94954ab
+ --- /dev/null
+ +++ b/dir/new
+ @@ -0,0 +1,2 @@
+ +hello
+ +world
+ """
+ new = []
+ modified = []
+ removed = []
+ lines = diff_text.splitlines()
+ for i,line in enumerate(lines):
+ if not line.startswith('diff '):
+ continue
+ file_a,file_b = line.split()[-2:]
+ assert file_a.startswith('a/'), \
+ 'missformed file_a %s' % file_a
+ assert file_b.startswith('b/'), \
+ 'missformed file_a %s' % file_b
+ file = file_a[2:]
+ assert file_b[2:] == file, \
+ 'diff file missmatch %s != %s' % (file_a, file_b)
+ if lines[i+1].startswith('new '):
+ new.append(file)
+ elif lines[i+1].startswith('index '):
+ modified.append(file)
+ elif lines[i+1].startswith('deleted '):
+ removed.append(file)
+ return (new,modified,removed)
+
+ def _vcs_changed(self, revision):
+ return self._parse_diff(self._diff(revision))
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Git, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py
new file mode 100644
index 0000000..9378336
--- /dev/null
+++ b/libbe/storage/vcs/hg.py
@@ -0,0 +1,257 @@
+# Copyright (C) 2007-2010 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Mercurial_ (hg) backend.
+
+.. _Mercurial: http://mercurial.selenic.com/
+"""
+
+try:
+ import mercurial
+ import mercurial.dispatch
+ import mercurial.ui
+except ImportError:
+ mercurial = None
+
+try:
+ # mercurial >= 1.2
+ from mercurial.util import version
+except ImportError:
+ try:
+ # mercurial <= 1.1.2
+ from mercurial.version import get_version as version
+ except ImportError:
+ version = None
+
+import os
+import os.path
+import re
+import shutil
+import StringIO
+import sys
+import time # work around http://mercurial.selenic.com/bts/issue618
+
+import libbe
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+
+def new():
+ return Hg()
+
+class Hg(base.VCS):
+ """:class:`base.VCS` implementation for Mercurial.
+ """
+ name='hg'
+ client=None # mercurial module
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+ self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_version(self):
+ if version == None:
+ return None
+ return version()
+
+ def _u_invoke_client(self, *args, **kwargs):
+ if 'cwd' not in kwargs:
+ kwargs['cwd'] = self.repo
+ assert len(kwargs) == 1, kwargs
+ fullargs = ['--cwd', kwargs['cwd']]
+ fullargs.extend(args)
+ stdout = sys.stdout
+ tmp_stdout = StringIO.StringIO()
+ sys.stdout = tmp_stdout
+ cwd = os.getcwd()
+ mercurial.dispatch.dispatch(fullargs)
+ os.chdir(cwd)
+ sys.stdout = stdout
+ return tmp_stdout.getvalue().rstrip('\n')
+
+ def _vcs_get_user_id(self):
+ output = self._u_invoke_client(
+ 'showconfig', 'ui.username').rstrip('\n')
+ if output != '':
+ return output
+ return None
+
+ def _vcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Mercurial"""
+ if self._u_search_parent_directories(path, '.hg') != None:
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ return self._u_invoke_client('root', cwd=path)
+
+ def _vcs_init(self, path):
+ self._u_invoke_client('init', cwd=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '.hg')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ self._u_invoke_client('add', path)
+
+ def _vcs_remove(self, path):
+ self._u_invoke_client('rm', '--force', path)
+
+ def _vcs_update(self, path):
+ self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ else:
+ return self._u_invoke_client('cat', '-r', revision, path)
+
+ def _vcs_path(self, id, revision):
+ manifest = self._u_invoke_client(
+ 'manifest', '--rev', revision).splitlines()
+ return self._u_find_id_from_manifest(id, manifest, revision=revision)
+
+ def _vcs_isdir(self, path, revision):
+ output = self._u_invoke_client('manifest', '--rev', revision)
+ files = output.splitlines()
+ if path in files:
+ return False
+ return True
+
+ def _vcs_listdir(self, path, revision):
+ output = self._u_invoke_client('manifest', '--rev', revision)
+ files = output.splitlines()
+ path = path.rstrip(os.path.sep) + os.path.sep
+ return [self._u_rel_path(f, path) for f in files if f.startswith(path)]
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--logfile', commitfile]
+ output = self._u_invoke_client(*args)
+ # work around http://mercurial.selenic.com/bts/issue618
+ strings = ['nothing changed']
+ if self._u_any_in_string(strings, output) == True \
+ and len(self.__updated) > 0:
+ time.sleep(1)
+ for path in self.__updated:
+ os.utime(os.path.join(self.repo, path), None)
+ output = self._u_invoke_client(*args)
+ self.__updated = []
+ # end work around
+ if allow_empty == False:
+ strings = ['nothing changed']
+ if self._u_any_in_string(strings, output) == True:
+ raise base.EmptyCommit()
+ return self._vcs_revision_id(-1)
+
+ def _vcs_revision_id(self, index, style='id'):
+ if index > 0:
+ index -= 1
+ args = ['identify', '--rev', str(int(index)), '--%s' % style]
+ output = self._u_invoke_client(*args)
+ id = output.strip()
+ if id == '000000000000':
+ return None # before initial commit.
+ return id
+
+ def _diff(self, revision):
+ return self._u_invoke_client(
+ 'diff', '-r', revision, '--git')
+
+ def _parse_diff(self, diff_text):
+ """_parse_diff(diff_text) -> (new,modified,removed)
+
+ `new`, `modified`, and `removed` are lists of files.
+
+ Example diff text::
+
+ diff --git a/.be/dir/bugs/modified b/.be/dir/bugs/modified
+ --- a/.be/dir/bugs/modified
+ +++ b/.be/dir/bugs/modified
+ @@ -1,1 +1,1 @@ some value to be modified
+ -some value to be modified
+ \ No newline at end of file
+ +a new value
+ \ No newline at end of file
+ diff --git a/.be/dir/bugs/moved b/.be/dir/bugs/moved
+ deleted file mode 100644
+ --- a/.be/dir/bugs/moved
+ +++ /dev/null
+ @@ -1,1 +0,0 @@
+ -this entry will be moved
+ \ No newline at end of file
+ diff --git a/.be/dir/bugs/moved2 b/.be/dir/bugs/moved2
+ new file mode 100644
+ --- /dev/null
+ +++ b/.be/dir/bugs/moved2
+ @@ -0,0 +1,1 @@
+ +this entry will be moved
+ \ No newline at end of file
+ diff --git a/.be/dir/bugs/new b/.be/dir/bugs/new
+ new file mode 100644
+ --- /dev/null
+ +++ b/.be/dir/bugs/new
+ @@ -0,0 +1,1 @@
+ +this entry is new
+ \ No newline at end of file
+ diff --git a/.be/dir/bugs/removed b/.be/dir/bugs/removed
+ deleted file mode 100644
+ --- a/.be/dir/bugs/removed
+ +++ /dev/null
+ @@ -1,1 +0,0 @@
+ -this entry will be deleted
+ \ No newline at end of file
+ """
+ new = []
+ modified = []
+ removed = []
+ lines = diff_text.splitlines()
+ for i,line in enumerate(lines):
+ if not line.startswith('diff '):
+ continue
+ file_a,file_b = line.split()[-2:]
+ assert file_a.startswith('a/'), \
+ 'missformed file_a %s' % file_a
+ assert file_b.startswith('b/'), \
+ 'missformed file_a %s' % file_b
+ file = file_a[2:]
+ assert file_b[2:] == file, \
+ 'diff file missmatch %s != %s' % (file_a, file_b)
+ if lines[i+1].startswith('new '):
+ new.append(file)
+ elif lines[i+1].startswith('deleted '):
+ removed.append(file)
+ else:
+ modified.append(file)
+ return (new,modified,removed)
+
+ def _vcs_changed(self, revision):
+ return self._parse_diff(self._diff(revision))
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])