aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2009-12-13 07:20:31 -0500
committerW. Trevor King <wking@drexel.edu>2009-12-13 07:20:31 -0500
commit6f23fccbde4ede1121d5baf35c81932b7c8aa7bb (patch)
tree6457f240162bb45f7b08c3d4bc9d65d6ab53e340 /libbe/storage
parentc2a50865f1ea73f43f2d347b2e7595a484f43e78 (diff)
downloadbugseverywhere-6f23fccbde4ede1121d5baf35c81932b7c8aa7bb.tar.gz
Converted libbe.storage.vcs.hg to new Storage format.
Diffstat (limited to 'libbe/storage')
-rw-r--r--libbe/storage/base.py16
-rw-r--r--libbe/storage/vcs/base.py57
-rw-r--r--libbe/storage/vcs/hg.py73
3 files changed, 82 insertions, 64 deletions
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
index 8419796..56b59ba 100644
--- a/libbe/storage/base.py
+++ b/libbe/storage/base.py
@@ -149,7 +149,7 @@ class Storage (object):
return self._init()
def _init(self):
- f = open(self.repo, 'wb')
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
root = Entry(id='__ROOT__', directory=True)
d = {root.id:root}
pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1)
@@ -162,7 +162,7 @@ class Storage (object):
return self._destroy()
def _destroy(self):
- os.remove(self.repo)
+ os.remove(os.path.join(self.repo, 'repo.pkl'))
def connect(self):
"""Open a connection to the repository."""
@@ -172,7 +172,7 @@ class Storage (object):
def _connect(self):
try:
- f = open(self.repo, 'rb')
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'rb')
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
@@ -183,7 +183,7 @@ class Storage (object):
"""Close the connection to the repository."""
if self.is_writeable() == False:
return
- f = open(self.repo, 'wb')
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump(dict((k,v._objects_to_ids())
for k,v in self._data.items()), f, -1)
f.close()
@@ -299,7 +299,7 @@ class VersionedStorage (Storage):
self.versioned = True
def _init(self):
- f = open(self.repo, 'wb')
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
root = Entry(id='__ROOT__', directory=True)
summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit')
body = Entry(id='__COMMIT__BODY__')
@@ -310,7 +310,7 @@ class VersionedStorage (Storage):
def _connect(self):
try:
- f = open(self.repo, 'rb')
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'rb')
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
@@ -322,7 +322,7 @@ class VersionedStorage (Storage):
"""Close the connection to the repository."""
if self.is_writeable() == False:
return
- f = open(self.repo, 'wb')
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump([dict((k,v._objects_to_ids())
for k,v in t.items()) for t in self._data], f, -1)
f.close()
@@ -426,7 +426,7 @@ if TESTING == True:
super(StorageTestCase, self).setUp()
self.dir = Dir()
self.dirname = self.dir.path
- self.s = self.Class(repo=os.path.join(self.dirname, 'repo.pkl'))
+ self.s = self.Class(repo=self.dirname)
self.assert_failed_connect()
self.s.init()
self.s.connect()
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
index 8c0ecf5..faa891a 100644
--- a/libbe/storage/vcs/base.py
+++ b/libbe/storage/vcs/base.py
@@ -83,10 +83,17 @@ def installed_vcs():
return _get_matching_vcs(lambda vcs: vcs.installed())
-class VCSnotRooted (libbe.storage.base.ConnectionError):
- def __init__(self):
+class VCSNotRooted (libbe.storage.base.ConnectionError):
+ def __init__(self, vcs):
msg = 'VCS not rooted'
libbe.storage.base.ConnectionError.__init__(self, msg)
+ self.vcs = vcs
+
+class VCSUnableToRoot (libbe.storage.base.ConnectionError):
+ def __init__(self, vcs):
+ msg = 'VCS unable to root'
+ libbe.storage.base.ConnectionError.__init__(self, msg)
+ self.vcs = vcs
class InvalidPath (libbe.storage.base.InvalidID):
def __init__(self, path, root, msg=None):
@@ -589,7 +596,10 @@ os.listdir(self.get_path("bugs")):
Set the root directory to the path's VCS root. This is the
default working directory for future invocations.
"""
- self.repo = os.path.abspath(self._vcs_root(self.repo))
+ root = self._vcs_root(self.repo)
+ if root == None:
+ raise VCSUnableToRoot(self)
+ self.repo = os.path.abspath(root)
if os.path.isdir(self.repo) == False:
self.repo = os.path.dirname(self.repo)
self.be_dir = os.path.join(
@@ -601,8 +611,8 @@ os.listdir(self.get_path("bugs")):
Begin versioning the tree based at self.repo.
Also roots the vcs at path.
"""
+ self._vcs_init(self.repo)
self.root()
- self._vcs_init()
os.mkdir(self.be_dir)
self._vcs_add(self._u_rel_path(self.be_dir))
self._cached_path_id.init()
@@ -798,7 +808,7 @@ os.listdir(self.get_path("bugs")):
exception = InvalidPath(path, self.repo)
else:
use_vcs = False
- exception = VCSnotRooted
+ exception = VCSNotRooted(self)
if use_vcs == False and allow_no_vcs==False:
raise exception
return use_vcs
@@ -814,7 +824,7 @@ os.listdir(self.get_path("bugs")):
"""
if root == None:
if self.repo == None:
- raise VCSnotRooted
+ raise VCSNotRooted(self)
root = self.repo
path = os.path.abspath(path)
absRoot = os.path.abspath(root)
@@ -832,7 +842,7 @@ os.listdir(self.get_path("bugs")):
"""
if root == None:
if self.repo == None:
- raise VCSnotRooted
+ raise VCSNotRooted(self)
root = self.repo
path = os.path.abspath(path)
absRoot = os.path.abspath(root)
@@ -929,6 +939,7 @@ if libbe.TESTING == True:
self.s.destroy()
self.dir.cleanup()
+ class VCS_installed_TestCase (VCSTestCase):
def test_installed(self):
"""
See if the VCS is installed.
@@ -976,21 +987,23 @@ if libbe.TESTING == True:
def test_gets_existing_user_id(self):
"""Should get the existing user ID."""
- user_id = self.s.get_user_id()
- if user_id == None:
- return
- name,email = libbe.ui.util.user.parse_user_id(user_id)
- if email != None:
- self.failUnless('@' in email, email)
-
- def make_vcs_testcase_subclasses(storage_class, namespace):
- c = storage_class()
- if c.versioned == True:
- libbe.storage.base.make_versioned_storage_testcase_subclasses(
- storage_class, namespace)
- else:
- libbe.storage.base.make_storage_testcase_subclasses(
- storage_class, namespace)
+ if self.s.installed():
+ user_id = self.s.get_user_id()
+ if user_id == None:
+ return
+ name,email = libbe.ui.util.user.parse_user_id(user_id)
+ if email != None:
+ self.failUnless('@' in email, email)
+
+ def make_vcs_testcase_subclasses(vcs_class, namespace):
+ c = vcs_class()
+ if c.installed():
+ if c.versioned == True:
+ libbe.storage.base.make_versioned_storage_testcase_subclasses(
+ vcs_class, namespace)
+ else:
+ libbe.storage.base.make_storage_testcase_subclasses(
+ vcs_class, namespace)
if namespace != sys.modules[__name__]:
# Make VCSTestCase subclasses for vcs_class in the namespace.
diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py
index ed27717..67c5bf3 100644
--- a/libbe/storage/vcs/hg.py
+++ b/libbe/storage/vcs/hg.py
@@ -26,7 +26,7 @@ import re
import sys
import libbe
-import vcs
+import base
if libbe.TESTING == True:
import unittest
@@ -36,62 +36,67 @@ if libbe.TESTING == True:
def new():
return Hg()
-class Hg(vcs.VCS):
- name="hg"
- client="hg"
+class Hg(base.VCS):
+ name='hg'
+ client='hg'
versioned=True
+
def _vcs_version(self):
- status,output,error = self._u_invoke_client("--version")
+ status,output,error = self._u_invoke_client('--version')
return output
+
+ def _vcs_get_user_id(self):
+ status,output,error = self._u_invoke_client(
+ 'showconfig', 'ui.username')
+ return output.rstrip('\n')
+
def _vcs_detect(self, path):
"""Detect whether a directory is revision-controlled using Mercurial"""
- if self._u_search_parent_directories(path, ".hg") != None:
+ if self._u_search_parent_directories(path, '.hg') != None:
return True
return False
+
def _vcs_root(self, path):
- status,output,error = self._u_invoke_client("root", cwd=path)
+ status,output,error = self._u_invoke_client(
+ 'root', expect=(0,255), cwd=path)
+ if status == 255:
+ # "abort: There is no Mercurial repository here
+ # (.hg not found)!"
+ return None
return output.rstrip('\n')
+
def _vcs_init(self, path):
- self._u_invoke_client("init", cwd=path)
- def _vcs_get_user_id(self):
- status,output,error = self._u_invoke_client("showconfig","ui.username")
- return output.rstrip('\n')
- def _vcs_set_user_id(self, value):
- """
- Supported by the Config Extension, but that is not part of
- standard Mercurial.
- http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
- """
- raise vcs.SettingIDnotSupported
+ self._u_invoke_client('init', cwd=path)
+
def _vcs_add(self, path):
- self._u_invoke_client("add", path)
+ self._u_invoke_client('add', path)
+
def _vcs_remove(self, path):
- self._u_invoke_client("rm", "--force", path)
+ self._u_invoke_client('rm', '--force', path)
+
def _vcs_update(self, path):
pass
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
+
+ def _vcs_get_file_contents(self, path, revision=None):
if revision == None:
- return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
+ return base.VCS._vcs_get_file_contents(self, path, revision)
else:
status,output,error = \
- self._u_invoke_client("cat","-r",revision,path)
+ self._u_invoke_client('cat', '-r', revision, path)
return output
- def _vcs_duplicate_repo(self, directory, revision=None):
- if revision == None:
- return vcs.VCS._vcs_duplicate_repo(self, directory, revision)
- else:
- self._u_invoke_client("archive", "--rev", revision, directory)
+
def _vcs_commit(self, commitfile, allow_empty=False):
args = ['commit', '--logfile', commitfile]
status,output,error = self._u_invoke_client(*args)
if allow_empty == False:
- strings = ["nothing changed"]
+ strings = ['nothing changed']
if self._u_any_in_string(strings, output) == True:
- raise vcs.EmptyCommit()
+ raise base.EmptyCommit()
return self._vcs_revision_id(-1)
- def _vcs_revision_id(self, index, style="id"):
- args = ["identify", "--rev", str(int(index)), "--%s" % style]
- kwargs = {"expect": (0,255)}
+
+ def _vcs_revision_id(self, index, style='id'):
+ args = ['identify', '--rev', str(int(index)), '--%s' % style]
+ kwargs = {'expect': (0,255)}
status,output,error = self._u_invoke_client(*args, **kwargs)
if status == 0:
id = output.strip()
@@ -102,7 +107,7 @@ class Hg(vcs.VCS):
if libbe.TESTING == True:
- vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
+ base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])