aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/hg.py
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2008-11-24 18:29:16 -0500
committerW. Trevor King <wking@drexel.edu>2008-11-24 18:29:16 -0500
commita711ecf10df62e30d83c1941065404c53fecd35b (patch)
tree4111ef606fa52dc7f21ca3eb357ff83fae74fe1e /libbe/hg.py
parentc5d7551e6a6e98bb6da7c7d11360224edfda2f14 (diff)
parent2c3f6c066ceb03ae3579dff029bf01f0b62c1f82 (diff)
downloadbugseverywhere-a711ecf10df62e30d83c1941065404c53fecd35b.tar.gz
Merge from W. Trevor King's tree.
Diffstat (limited to 'libbe/hg.py')
-rw-r--r--libbe/hg.py167
1 files changed, 69 insertions, 98 deletions
diff --git a/libbe/hg.py b/libbe/hg.py
index 35de8e0..27cbb79 100644
--- a/libbe/hg.py
+++ b/libbe/hg.py
@@ -14,102 +14,73 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import tempfile
-
-import config
-from rcs import invoke, CommandError
-
-def invoke_client(*args, **kwargs):
- directory = kwargs['directory']
- expect = kwargs.get('expect', (0, 1))
- cl_args = ["hg"]
- cl_args.extend(args)
- status,output,error = invoke(cl_args, expect, cwd=directory)
- return status, output
-
-def add_id(filename, paranoid=False):
- invoke_client("add", filename, directory='.')
-
-def delete_id(filename):
- invoke_client("rm", filename, directory='.')
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def lookup_revision(revno, directory):
- return invoke_client('log', '--rev', str(revno), '--template={node}',
- directory=directory)[1].rstrip('\n')
-
-def export(revno, directory, revision_dir):
- invoke_client("archive", "--rev", str(revno), revision_dir,
- directory=directory)
-
-def find_or_make_export(revno, directory):
- revision_id = lookup_revision(revno, directory)
- home = os.path.expanduser("~")
- revision_root = os.path.join(home, ".be_revs")
- if not os.path.exists(revision_root):
- os.mkdir(revision_root)
- revision_dir = os.path.join(revision_root, revision_id)
- if not os.path.exists(revision_dir):
- export(revno, directory, revision_dir)
- return revision_dir
-
-def hg_root(path):
- return invoke_client("root", "-R", path, directory=None)[1].rstrip('\r')
-
-def path_in_reference(bug_dir, spec):
- if spec is None:
- spec = int(invoke_client('tip', '--template="{rev}"',
- directory=bug_dir)[1])
- rel_bug_dir = bug_dir[len(hg_root(bug_dir)):]
- export_root = find_or_make_export(spec, directory=bug_dir)
- return os.path.join(export_root, rel_bug_dir)
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using Mercurial"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, ".hg")):
+import re
+import unittest
+import doctest
+
+from rcs import RCS, RCStestCase, CommandError, SettingIDnotSupported
+
+def new():
+ return Hg()
+
+class Hg(RCS):
+ name="hg"
+ client="hg"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Mercurial"""
+ if self._u_search_parent_directories(path, ".hg") != None:
return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.dirname(path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- if body is not None:
- summary += '\n' + body
- descriptor, filename = tempfile.mkstemp()
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.close()
- invoke_client('commit', '--logfile', filename, directory=directory)
- finally:
- os.unlink(filename)
-
-def postcommit(directory):
- pass
-
-name = "hg"
+ return False
+ def _rcs_root(self, path):
+ status,output,error = self._u_invoke_client("root", directory=path)
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("showconfig","ui.username")
+ return output.rstrip('\n')
+ def _rcs_set_user_id(self, value):
+ """
+ Supported by the Config Extension, but that is not part of
+ standard Mercurial.
+ http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
+ """
+ raise SettingIDnotSupported
+ def _rcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ self._u_invoke_client("rm", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(os.path.join(self.rootdir, path), "rb").read()
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("archive", "--rev", revision, directory)
+ def _rcs_commit(self, commitfile):
+ self._u_invoke_client('commit', '--logfile', commitfile)
+ status,output,error = self._u_invoke_client('identify')
+ revision = None
+ revline = re.compile("(.*) tip")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+
+class HgTestCase(RCStestCase):
+ Class = Hg
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(HgTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])