diff options
-rw-r--r-- | libbe/storage/base.py | 16 | ||||
-rw-r--r-- | libbe/storage/vcs/base.py | 57 | ||||
-rw-r--r-- | libbe/storage/vcs/hg.py | 73 |
3 files changed, 82 insertions, 64 deletions
diff --git a/libbe/storage/base.py b/libbe/storage/base.py index 8419796..56b59ba 100644 --- a/libbe/storage/base.py +++ b/libbe/storage/base.py @@ -149,7 +149,7 @@ class Storage (object): return self._init() def _init(self): - f = open(self.repo, 'wb') + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') root = Entry(id='__ROOT__', directory=True) d = {root.id:root} pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1) @@ -162,7 +162,7 @@ class Storage (object): return self._destroy() def _destroy(self): - os.remove(self.repo) + os.remove(os.path.join(self.repo, 'repo.pkl')) def connect(self): """Open a connection to the repository.""" @@ -172,7 +172,7 @@ class Storage (object): def _connect(self): try: - f = open(self.repo, 'rb') + f = open(os.path.join(self.repo, 'repo.pkl'), 'rb') except IOError: raise ConnectionError(self) d = pickle.load(f) @@ -183,7 +183,7 @@ class Storage (object): """Close the connection to the repository.""" if self.is_writeable() == False: return - f = open(self.repo, 'wb') + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') pickle.dump(dict((k,v._objects_to_ids()) for k,v in self._data.items()), f, -1) f.close() @@ -299,7 +299,7 @@ class VersionedStorage (Storage): self.versioned = True def _init(self): - f = open(self.repo, 'wb') + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') root = Entry(id='__ROOT__', directory=True) summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit') body = Entry(id='__COMMIT__BODY__') @@ -310,7 +310,7 @@ class VersionedStorage (Storage): def _connect(self): try: - f = open(self.repo, 'rb') + f = open(os.path.join(self.repo, 'repo.pkl'), 'rb') except IOError: raise ConnectionError(self) d = pickle.load(f) @@ -322,7 +322,7 @@ class VersionedStorage (Storage): """Close the connection to the repository.""" if self.is_writeable() == False: return - f = open(self.repo, 'wb') + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') pickle.dump([dict((k,v._objects_to_ids()) for k,v in t.items()) for t in self._data], f, -1) f.close() @@ -426,7 +426,7 @@ if TESTING == True: super(StorageTestCase, self).setUp() self.dir = Dir() self.dirname = self.dir.path - self.s = self.Class(repo=os.path.join(self.dirname, 'repo.pkl')) + self.s = self.Class(repo=self.dirname) self.assert_failed_connect() self.s.init() self.s.connect() diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py index 8c0ecf5..faa891a 100644 --- a/libbe/storage/vcs/base.py +++ b/libbe/storage/vcs/base.py @@ -83,10 +83,17 @@ def installed_vcs(): return _get_matching_vcs(lambda vcs: vcs.installed()) -class VCSnotRooted (libbe.storage.base.ConnectionError): - def __init__(self): +class VCSNotRooted (libbe.storage.base.ConnectionError): + def __init__(self, vcs): msg = 'VCS not rooted' libbe.storage.base.ConnectionError.__init__(self, msg) + self.vcs = vcs + +class VCSUnableToRoot (libbe.storage.base.ConnectionError): + def __init__(self, vcs): + msg = 'VCS unable to root' + libbe.storage.base.ConnectionError.__init__(self, msg) + self.vcs = vcs class InvalidPath (libbe.storage.base.InvalidID): def __init__(self, path, root, msg=None): @@ -589,7 +596,10 @@ os.listdir(self.get_path("bugs")): Set the root directory to the path's VCS root. This is the default working directory for future invocations. """ - self.repo = os.path.abspath(self._vcs_root(self.repo)) + root = self._vcs_root(self.repo) + if root == None: + raise VCSUnableToRoot(self) + self.repo = os.path.abspath(root) if os.path.isdir(self.repo) == False: self.repo = os.path.dirname(self.repo) self.be_dir = os.path.join( @@ -601,8 +611,8 @@ os.listdir(self.get_path("bugs")): Begin versioning the tree based at self.repo. Also roots the vcs at path. """ + self._vcs_init(self.repo) self.root() - self._vcs_init() os.mkdir(self.be_dir) self._vcs_add(self._u_rel_path(self.be_dir)) self._cached_path_id.init() @@ -798,7 +808,7 @@ os.listdir(self.get_path("bugs")): exception = InvalidPath(path, self.repo) else: use_vcs = False - exception = VCSnotRooted + exception = VCSNotRooted(self) if use_vcs == False and allow_no_vcs==False: raise exception return use_vcs @@ -814,7 +824,7 @@ os.listdir(self.get_path("bugs")): """ if root == None: if self.repo == None: - raise VCSnotRooted + raise VCSNotRooted(self) root = self.repo path = os.path.abspath(path) absRoot = os.path.abspath(root) @@ -832,7 +842,7 @@ os.listdir(self.get_path("bugs")): """ if root == None: if self.repo == None: - raise VCSnotRooted + raise VCSNotRooted(self) root = self.repo path = os.path.abspath(path) absRoot = os.path.abspath(root) @@ -929,6 +939,7 @@ if libbe.TESTING == True: self.s.destroy() self.dir.cleanup() + class VCS_installed_TestCase (VCSTestCase): def test_installed(self): """ See if the VCS is installed. @@ -976,21 +987,23 @@ if libbe.TESTING == True: def test_gets_existing_user_id(self): """Should get the existing user ID.""" - user_id = self.s.get_user_id() - if user_id == None: - return - name,email = libbe.ui.util.user.parse_user_id(user_id) - if email != None: - self.failUnless('@' in email, email) - - def make_vcs_testcase_subclasses(storage_class, namespace): - c = storage_class() - if c.versioned == True: - libbe.storage.base.make_versioned_storage_testcase_subclasses( - storage_class, namespace) - else: - libbe.storage.base.make_storage_testcase_subclasses( - storage_class, namespace) + if self.s.installed(): + user_id = self.s.get_user_id() + if user_id == None: + return + name,email = libbe.ui.util.user.parse_user_id(user_id) + if email != None: + self.failUnless('@' in email, email) + + def make_vcs_testcase_subclasses(vcs_class, namespace): + c = vcs_class() + if c.installed(): + if c.versioned == True: + libbe.storage.base.make_versioned_storage_testcase_subclasses( + vcs_class, namespace) + else: + libbe.storage.base.make_storage_testcase_subclasses( + vcs_class, namespace) if namespace != sys.modules[__name__]: # Make VCSTestCase subclasses for vcs_class in the namespace. diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py index ed27717..67c5bf3 100644 --- a/libbe/storage/vcs/hg.py +++ b/libbe/storage/vcs/hg.py @@ -26,7 +26,7 @@ import re import sys import libbe -import vcs +import base if libbe.TESTING == True: import unittest @@ -36,62 +36,67 @@ if libbe.TESTING == True: def new(): return Hg() -class Hg(vcs.VCS): - name="hg" - client="hg" +class Hg(base.VCS): + name='hg' + client='hg' versioned=True + def _vcs_version(self): - status,output,error = self._u_invoke_client("--version") + status,output,error = self._u_invoke_client('--version') return output + + def _vcs_get_user_id(self): + status,output,error = self._u_invoke_client( + 'showconfig', 'ui.username') + return output.rstrip('\n') + def _vcs_detect(self, path): """Detect whether a directory is revision-controlled using Mercurial""" - if self._u_search_parent_directories(path, ".hg") != None: + if self._u_search_parent_directories(path, '.hg') != None: return True return False + def _vcs_root(self, path): - status,output,error = self._u_invoke_client("root", cwd=path) + status,output,error = self._u_invoke_client( + 'root', expect=(0,255), cwd=path) + if status == 255: + # "abort: There is no Mercurial repository here + # (.hg not found)!" + return None return output.rstrip('\n') + def _vcs_init(self, path): - self._u_invoke_client("init", cwd=path) - def _vcs_get_user_id(self): - status,output,error = self._u_invoke_client("showconfig","ui.username") - return output.rstrip('\n') - def _vcs_set_user_id(self, value): - """ - Supported by the Config Extension, but that is not part of - standard Mercurial. - http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension - """ - raise vcs.SettingIDnotSupported + self._u_invoke_client('init', cwd=path) + def _vcs_add(self, path): - self._u_invoke_client("add", path) + self._u_invoke_client('add', path) + def _vcs_remove(self, path): - self._u_invoke_client("rm", "--force", path) + self._u_invoke_client('rm', '--force', path) + def _vcs_update(self, path): pass - def _vcs_get_file_contents(self, path, revision=None, binary=False): + + def _vcs_get_file_contents(self, path, revision=None): if revision == None: - return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary) + return base.VCS._vcs_get_file_contents(self, path, revision) else: status,output,error = \ - self._u_invoke_client("cat","-r",revision,path) + self._u_invoke_client('cat', '-r', revision, path) return output - def _vcs_duplicate_repo(self, directory, revision=None): - if revision == None: - return vcs.VCS._vcs_duplicate_repo(self, directory, revision) - else: - self._u_invoke_client("archive", "--rev", revision, directory) + def _vcs_commit(self, commitfile, allow_empty=False): args = ['commit', '--logfile', commitfile] status,output,error = self._u_invoke_client(*args) if allow_empty == False: - strings = ["nothing changed"] + strings = ['nothing changed'] if self._u_any_in_string(strings, output) == True: - raise vcs.EmptyCommit() + raise base.EmptyCommit() return self._vcs_revision_id(-1) - def _vcs_revision_id(self, index, style="id"): - args = ["identify", "--rev", str(int(index)), "--%s" % style] - kwargs = {"expect": (0,255)} + + def _vcs_revision_id(self, index, style='id'): + args = ['identify', '--rev', str(int(index)), '--%s' % style] + kwargs = {'expect': (0,255)} status,output,error = self._u_invoke_client(*args, **kwargs) if status == 0: id = output.strip() @@ -102,7 +107,7 @@ class Hg(vcs.VCS): if libbe.TESTING == True: - vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__]) + base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__]) unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |