diff options
author | W. Trevor King <wking@drexel.edu> | 2009-12-13 06:19:23 -0500 |
---|---|---|
committer | W. Trevor King <wking@drexel.edu> | 2009-12-13 06:19:23 -0500 |
commit | 4d057dab603f42ec40b911dbee6792dcf107bd14 (patch) | |
tree | 9a73459aa160e3c96f4893b132543f412ca6e97f /libbe | |
parent | dff6bd9bf89ca80e2265696a478e540476718c9c (diff) | |
download | bugseverywhere-4d057dab603f42ec40b911dbee6792dcf107bd14.tar.gz |
Converted libbe.storage.vcs.base to new Storage format.
Diffstat (limited to 'libbe')
-rw-r--r-- | libbe/bug.py | 5 | ||||
-rw-r--r-- | libbe/bugdir.py | 12 | ||||
-rw-r--r-- | libbe/command/base.py | 7 | ||||
-rw-r--r-- | libbe/command/list.py | 16 | ||||
-rw-r--r-- | libbe/comment.py | 8 | ||||
-rw-r--r-- | libbe/storage/__init__.py | 14 | ||||
-rw-r--r-- | libbe/storage/base.py | 139 | ||||
-rw-r--r-- | libbe/storage/util/upgrade.py (renamed from libbe/storage/vcs/util/upgrade.py) | 26 | ||||
-rw-r--r-- | libbe/storage/vcs/__init__.py | 10 | ||||
-rw-r--r-- | libbe/storage/vcs/base.py | 1105 | ||||
-rwxr-xr-x | libbe/ui/command_line.py | 20 | ||||
-rw-r--r-- | libbe/ui/util/user.py | 18 | ||||
-rw-r--r-- | libbe/util/id.py | 2 |
13 files changed, 704 insertions, 678 deletions
diff --git a/libbe/bug.py b/libbe/bug.py index d62de49..29d95f5 100644 --- a/libbe/bug.py +++ b/libbe/bug.py @@ -657,8 +657,9 @@ class Bug(settings_object.SavedSettingsObject): parent = self.bugdir.id.storage() else: parent = None - self.storage.add(self.id.storage(), parent=parent) - self.storage.add(self.id.storage('values'), parent=self.id.storage()) + self.storage.add(self.id.storage(), parent=parent, directory=True) + self.storage.add(self.id.storage('values'), parent=self.id.storage(), + directory=False) self.save_settings() if len(self.comment_root) > 0: comment.save_comments(self) diff --git a/libbe/bugdir.py b/libbe/bugdir.py index 5f76d3c..c120482 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -30,7 +30,6 @@ import os.path import time import libbe -import libbe.util.encoding as encoding import libbe.storage as storage from libbe.storage.util.properties import Property, doc_property, \ local_property, defaulting_property, checked_property, \ @@ -175,6 +174,8 @@ class BugDir (list, settings_object.SavedSettingsObject): self.storage = storage self.id = libbe.util.id.ID(self, 'bugdir') if from_storage == True: + self.uuid = [c for c in self.storage.children() + if c != 'version'][0] self.load_settings() else: if uuid == None: @@ -192,8 +193,7 @@ class BugDir (list, settings_object.SavedSettingsObject): self.storage.get(self.id.storage('settings'), default='\n') self.settings = mapfile.parse(settings_mapfile) self._setup_saved_settings() - self._setup_user_id(self.user_id) - self._setup_encoding(self.encoding) + #self._setup_user_id(self.user_id) self._setup_severities(self.severities) self._setup_status(self.active_status, self.inactive_status) @@ -219,8 +219,9 @@ class BugDir (list, settings_object.SavedSettingsObject): happen, so calling this method will just waste time (unless something else has been messing with your stored files). """ - self.storage.add(self.id.storage()) - self.storage.add(self.id.storage('settings'), parent=self.id.storage()) + self.storage.add(self.id.storage(), directory=True) + self.storage.add(self.id.storage('settings'), parent=self.id.storage(), + directory=False) self.save_settings() for bug in self: bug.save() @@ -297,7 +298,6 @@ class BugDir (list, settings_object.SavedSettingsObject): dbd.storage.writeable = False added,changed,removed = self.storage.changed_since(revision) for id in added: - pass for id in removed: pass diff --git a/libbe/command/base.py b/libbe/command/base.py index 252dd24..52f193d 100644 --- a/libbe/command/base.py +++ b/libbe/command/base.py @@ -1,5 +1,6 @@ # Copyright +import codecs import optparse import sys @@ -168,7 +169,7 @@ class Command (object): else: # non-arg options are flags, set to default flag value params[option.name] = False if len(options) > 0: - raise UserError, 'Invalid options passed to command %s:\n %s' \ + raise UserError, 'Invalid option passed to command %s:\n %s' \ % (self.name, '\n '.join(['%s: %s' % (k,v) for k,v in options.items()])) for arg in self.args: @@ -190,9 +191,9 @@ class Command (object): def _setup_io(self, input_encoding=None, output_encoding=None): if input_encoding == None: - input_encoding = libbe.util.get_input_encoding() + input_encoding = libbe.util.encoding.get_input_encoding() if output_encoding == None: - output_encoding = libbe.util.get_output_encoding() + output_encoding = libbe.util.encoding.get_output_encoding() self.stdin = codecs.getwriter(input_encoding)(sys.stdin) self.stdin.encoding = input_encoding self.stdout = codecs.getwriter(output_encoding)(sys.stdout) diff --git a/libbe/command/list.py b/libbe/command/list.py index c835815..ce43ec9 100644 --- a/libbe/command/list.py +++ b/libbe/command/list.py @@ -129,6 +129,7 @@ class List (libbe.command.Command): # ]) def _run(self, bugdir, **params): + bugdir.storage.writeable = False cmp_list, status, severity, assigned, extra_strings_regexps = \ self._parse_params(params) filter = Filter(status, severity, assigned, extra_strings_regexps) @@ -136,7 +137,7 @@ class List (libbe.command.Command): bugs = [b for b in bugs if filter(b) == True] self.result = bugs if len(bugs) == 0 and params['xml'] == False: - print "No matching bugs found" + print >> self.stdout, "No matching bugs found" # sort bugs bugs = self._sort_bugs(bugs, cmp_list) @@ -144,7 +145,7 @@ class List (libbe.command.Command): # print list of bugs if params['uuids'] == True: for bug in bugs: - print bug.uuid + print >> self.stdout, bug.uuid else: self._list_bugs(bugs, xml=params['xml']) @@ -205,16 +206,17 @@ class List (libbe.command.Command): def _list_bugs(self, bugs, xml=False): if xml == True: - print '<?xml version="1.0" encoding="%s" ?>' % self.stdout.encoding - print "<bugs>" + print >> self.stdout, \ + '<?xml version="1.0" encoding="%s" ?>' % self.stdout.encoding + print >> self.stdout, '<bugs>' if len(bugs) > 0: for bug in bugs: if xml == True: - print bug.xml(show_comments=True) + print >> self.stdout, bug.xml(show_comments=True) else: - print bug.string(shortlist=True) + print >> self.stdout, bug.string(shortlist=True) if xml == True: - print "</bugs>" + print >> self.stdout, '</bugs>' def _long_help(self): return """ diff --git a/libbe/comment.py b/libbe/comment.py index bf69a69..7b318cb 100644 --- a/libbe/comment.py +++ b/libbe/comment.py @@ -594,9 +594,11 @@ class Comment(Tree, settings_object.SavedSettingsObject): parent = self.bug.id.storage() else: parent = None - self.storage.add(self.id.storage(), parent=parent) - self.storage.add(self.id.storage('values'), parent=self.id.storage()) - self.storage.add(self.id.storage('body'), parent=self.id.storage()) + self.storage.add(self.id.storage(), parent=parent, directory=True) + self.storage.add(self.id.storage('values'), parent=self.id.storage(), + directory=False) + self.storage.add(self.id.storage('body'), parent=self.id.storage(), + directory=False) self.save_settings() self._set_comment_body(new=self.body, force=True) diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py index 9c954ee..5d5b918 100644 --- a/libbe/storage/__init__.py +++ b/libbe/storage/__init__.py @@ -5,9 +5,21 @@ import base ConnectionError = base.ConnectionError InvalidID = base.InvalidID InvalidRevision = base.InvalidRevision +InvalidDirectory = base.InvalidDirectory NotWriteable = base.NotWriteable NotReadable = base.NotReadable EmptyCommit = base.EmptyCommit +def get_storage(location): + """ + Return a Storage instance from a repo location string. + """ + import vcs + #s = vcs.detect_vcs(location) + s = vcs.vcs_by_name('None') + s.repo = location + return s + __all__ = [ConnectionError, InvalidID, InvalidRevision, - NotWriteable, NotReadable, EmptyCommit] + InvalidDirectory, NotWriteable, NotReadable, + EmptyCommit, get_storage] diff --git a/libbe/storage/base.py b/libbe/storage/base.py index eb2b94c..8419796 100644 --- a/libbe/storage/base.py +++ b/libbe/storage/base.py @@ -31,6 +31,12 @@ class InvalidID (KeyError): class InvalidRevision (KeyError): pass +class InvalidDirectory (Exception): + pass + +class DirectoryNotEmpty (InvalidDirectory): + pass + class NotWriteable (NotSupported): def __init__(self, msg): NotSupported.__init__(self, 'write', msg) @@ -44,7 +50,8 @@ class EmptyCommit(Exception): Exception.__init__(self, 'No changes to commit') class Entry (Tree): - def __init__(self, id, value=None, parent=None, children=None): + def __init__(self, id, value=None, parent=None, directory=False, + children=None): if children == None: Tree.__init__(self) else: @@ -53,7 +60,11 @@ class Entry (Tree): self.value = value self.parent = parent if self.parent != None: + if self.parent.directory == False: + raise InvalidDirectory( + 'Non-directory %s cannot have children' % self.parent) parent.append(self) + self.directory = directory def __str__(self): return '<Entry %s: %s>' % (self.id, self.value) @@ -101,7 +112,7 @@ class Storage (object): """ name = 'Storage' - def __init__(self, repo, encoding='utf-8', options=None): + def __init__(self, repo='/', encoding='utf-8', options=None): self.repo = repo self.encoding = encoding self.options = options @@ -113,7 +124,7 @@ class Storage (object): self.can_init = True def __str__(self): - return '<%s %s>' % (self.__class__.__name__, id(self)) + return '<%s %s %s>' % (self.__class__.__name__, id(self), self.repo) def __repr__(self): return str(self) @@ -139,7 +150,7 @@ class Storage (object): def _init(self): f = open(self.repo, 'wb') - root = Entry(id='__ROOT__') + root = Entry(id='__ROOT__', directory=True) d = {root.id:root} pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1) f.close() @@ -178,7 +189,7 @@ class Storage (object): f.close() self._data = None - def add(self, *args, **kwargs): + def add(self, id, *args, **kwargs): """Add an entry""" if self.is_writeable() == False: raise NotWriteable('Cannot add entry to unwriteable storage.') @@ -186,13 +197,13 @@ class Storage (object): self.get(id) pass # yup, no need to add another except InvalidID: - self._add(*args, **kwargs) + self._add(id, *args, **kwargs) - def _add(self, id, parent=None): + def _add(self, id, parent=None, directory=False): if parent == None: parent = '__ROOT__' p = self._data[parent] - self._data[id] = Entry(id, parent=p) + self._data[id] = Entry(id, parent=p, directory=directory) def remove(self, *args, **kwargs): """Remove an entry.""" @@ -202,6 +213,9 @@ class Storage (object): self._remove(*args, **kwargs) def _remove(self, id): + if self._data[id].directory == True \ + and len(self.children(id)) > 0: + raise DirectoryNotEmpty(id) e = self._data.pop(id) e.parent.remove(e) @@ -213,7 +227,7 @@ class Storage (object): self._recursive_remove(*args, **kwargs) def _recursive_remove(self, id): - for entry in self._data[id].traverse(): + for entry in reversed(list(self._data[id].traverse())): self._remove(entry.id) def children(self, *args, **kwargs): @@ -266,6 +280,9 @@ class Storage (object): def _set(self, id, value): if id not in self._data: raise InvalidID(id) + if self._data[id].directory == True: + raise InvalidDirectory( + 'Directory %s cannot have data' % self.parent) self._data[id].value = value class VersionedStorage (Storage): @@ -283,7 +300,7 @@ class VersionedStorage (Storage): def _init(self): f = open(self.repo, 'wb') - root = Entry(id='__ROOT__') + root = Entry(id='__ROOT__', directory=True) summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit') body = Entry(id='__COMMIT__BODY__') initial_commit = {root.id:root, summary.id:summary, body.id:body} @@ -311,18 +328,21 @@ class VersionedStorage (Storage): f.close() self._data = None - def _add(self, id, parent=None): + def _add(self, id, parent=None, directory=False): if parent == None: parent = '__ROOT__' p = self._data[-1][parent] - self._data[-1][id] = Entry(id, parent=p) + self._data[-1][id] = Entry(id, parent=p, directory=directory) def _remove(self, id): + if self._data[-1][id].directory == True \ + and len(self.children(id)) > 0: + raise DirectoryNotEmpty(id) e = self._data[-1].pop(id) e.parent.remove(e) def _recursive_remove(self, id): - for entry in self._data[-1][id].traverse(): + for entry in reversed(list(self._data[-1][id].traverse())): self._remove(entry.id) def _children(self, id=None, revision=None): @@ -416,6 +436,7 @@ if TESTING == True: self.s.disconnect() self.s.destroy() self.assert_failed_connect() + self.dir.cleanup() def assert_failed_connect(self): try: @@ -440,12 +461,12 @@ if TESTING == True: """New repository should be empty.""" self.failUnless(len(self.s.children()) == 0, self.s.children()) - def test_add_rooted(self): + def test_add_identical_rooted(self): """ Adding entries with the same ID should not increase the number of children. """ for i in range(10): - self.s.add('some id') + self.s.add('some id', directory=False) s = sorted(self.s.children()) self.failUnless(s == ['some id'], s) @@ -456,7 +477,7 @@ if TESTING == True: ids = [] for i in range(10): ids.append(str(i)) - self.s.add(ids[-1]) + self.s.add(ids[-1], directory=False) s = sorted(self.s.children()) self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) @@ -464,16 +485,50 @@ if TESTING == True: """ Adding entries should increase the number of children (nonrooted). """ - self.s.add('parent') + self.s.add('parent', directory=True) ids = [] for i in range(10): ids.append(str(i)) - self.s.add(ids[-1], 'parent') + self.s.add(ids[-1], 'parent', directory=True) s = sorted(self.s.children('parent')) self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) s = self.s.children() self.failUnless(s == ['parent'], s) - + + def test_children(self): + """ + Non-UUID ids should be returned as such. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append('parent/%s' % str(i)) + self.s.add(ids[-1], 'parent', directory=True) + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + + def test_add_invalid_directory(self): + """ + Should not be able to add children to non-directories. + """ + self.s.add('parent', directory=False) + try: + self.s.add('child', 'parent', directory=False) + self.fail( + '%s.add() succeeded instead of raising InvalidDirectory' + % (vars(self.Class)['name'])) + except InvalidDirectory: + pass + try: + self.s.add('child', 'parent', directory=True) + self.fail( + '%s.add() succeeded instead of raising InvalidDirectory' + % (vars(self.Class)['name'])) + except InvalidDirectory: + pass + self.failUnless(len(self.s.children('parent')) == 0, + self.s.children('parent')) + def test_remove_rooted(self): """ Removing entries should decrease the number of children (rooted). @@ -481,7 +536,7 @@ if TESTING == True: ids = [] for i in range(10): ids.append(str(i)) - self.s.add(ids[-1]) + self.s.add(ids[-1], directory=True) for i in range(10): self.s.remove(ids.pop()) s = sorted(self.s.children()) @@ -491,11 +546,11 @@ if TESTING == True: """ Removing entries should decrease the number of children (nonrooted). """ - self.s.add('parent') + self.s.add('parent', directory=True) ids = [] for i in range(10): ids.append(str(i)) - self.s.add(ids[-1], 'parent') + self.s.add(ids[-1], 'parent', directory=False) for i in range(10): self.s.remove(ids.pop()) s = sorted(self.s.children('parent')) @@ -503,17 +558,35 @@ if TESTING == True: s = self.s.children() self.failUnless(s == ['parent'], s) + def test_remove_directory_not_empty(self): + """ + Removing a non-empty directory entry should raise exception. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=True) + self.s.remove(ids.pop()) # empty directory removal succeeds + try: + self.s.remove('parent') # empty directory removal succeeds + self.fail( + "%s.remove() didn't raise DirectoryNotEmpty" + % (vars(self.Class)['name'])) + except DirectoryNotEmpty: + pass + def test_recursive_remove(self): """ Recursive remove should empty the tree. """ - self.s.add('parent') + self.s.add('parent', directory=True) ids = [] for i in range(10): ids.append(str(i)) - self.s.add(ids[-1], 'parent') + self.s.add(ids[-1], 'parent', directory=True) for j in range(10): # add some grandkids - self.s.add(str(20*i+j), ids[-i]) + self.s.add(str(20*(i+1)+j), ids[-1], directory=False) self.s.recursive_remove('parent') s = sorted(self.s.children()) self.failUnless(s == [], s) @@ -549,7 +622,7 @@ if TESTING == True: """ Data value should be None before any value has been set. """ - self.s.add(self.id) + self.s.add(self.id, directory=False) ret = self.s.get(self.id) self.failUnless(ret == None, "%s.get() returned %s not None" @@ -571,7 +644,7 @@ if TESTING == True: """ Set should define the value returned by get. """ - self.s.add(self.id) + self.s.add(self.id, directory=False) self.s.set(self.id, self.val) ret = self.s.get(self.id) self.failUnless(ret == self.val, @@ -583,7 +656,7 @@ if TESTING == True: Set should define the value returned by get. """ val = u'Fran\xe7ois' - self.s.add(self.id) + self.s.add(self.id, directory=False) self.s.set(self.id, val) ret = self.s.get(self.id, decode=True) self.failUnless(type(ret) == types.UnicodeType, @@ -612,7 +685,7 @@ if TESTING == True: """ Set should define the value returned by get after reconnect. """ - self.s.add(self.id) + self.s.add(self.id, directory=False) self.s.set(self.id, self.val) self.s.disconnect() self.s.connect() @@ -625,11 +698,11 @@ if TESTING == True: """ Adding entries should increase the number of children after reconnect. """ - self.s.add('parent') + self.s.add('parent', directory=True) ids = [] for i in range(10): ids.append(str(i)) - self.s.add(ids[-1], 'parent') + self.s.add(ids[-1], 'parent', directory=False) self.s.disconnect() self.s.connect() s = sorted(self.s.children('parent')) @@ -707,7 +780,7 @@ if TESTING == True: """ def val(i): return '%s:%d' % (self.val, i+1) - self.s.add(self.id) + self.s.add(self.id, directory=False) revs = [] for i in range(10): self.s.set(self.id, val(i)) @@ -716,7 +789,7 @@ if TESTING == True: for i in range(10): ret = self.s.get(self.id, revision=revs[i]) self.failUnless(ret == val(i), - "%s.get() returned %s not %s for revision %d" + "%s.get() returned %s not %s for revision %s" % (vars(self.Class)['name'], ret, val(i), revs[i])) def make_storage_testcase_subclasses(storage_class, namespace): diff --git a/libbe/storage/vcs/util/upgrade.py b/libbe/storage/util/upgrade.py index dc9d54f..7ef760e 100644 --- a/libbe/storage/vcs/util/upgrade.py +++ b/libbe/storage/util/upgrade.py @@ -15,17 +15,18 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ -Handle conversion between the various on-disk images. +Handle conversion between the various BE storage formats. """ +import codecs import os, os.path import sys import libbe -import bug -import encoding -import mapfile -import vcs +import libbe.bug as bug +import libbe.util.encoding as encoding +import libbe.storage.util.mapfile as mapfile + if libbe.TESTING == True: import doctest @@ -39,30 +40,25 @@ BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0", BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1] class Upgrader (object): - "Class for converting " + "Class for converting between different on-disk BE storage formats." initial_version = None final_version = None def __init__(self, root): self.root = root - # use the "None" VCS to ensure proper encoding/decoding and - # simplify path construction. - self.vcs = vcs.vcs_by_name("None") - self.vcs.root(self.root) - self.vcs.encoding = encoding.get_encoding() def get_path(self, *args): """ Return a path relative to .root. """ - dir = os.path.join(self.root, ".be") + dir = os.path.join(self.root, '.be') if len(args) == 0: return dir - assert args[0] in ["version", "settings", "bugs"], str(args) + assert args[0] in ['version', 'settings', 'bugs'], str(args) return os.path.join(dir, *args) def check_initial_version(self): - path = self.get_path("version") - version = self.vcs.get_file_contents(path).rstrip("\n") + path = self.get_path('version') + version = self.vcs.get_file_contents(path).rstrip('\n') assert version == self.initial_version, version def set_version(self): diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py new file mode 100644 index 0000000..ddfb00a --- /dev/null +++ b/libbe/storage/vcs/__init__.py @@ -0,0 +1,10 @@ +# Copyright + +import base + +set_preferred_vcs = base.set_preferred_vcs +vcs_by_name = base.vcs_by_name +detect_vcs = base.detect_vcs +installed_vcs = base.installed_vcs + +__all__ = [set_preferred_vcs, vcs_by_name, detect_vcs, installed_vcs] diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py index f8b0727..8c0ecf5 100644 --- a/libbe/storage/vcs/base.py +++ b/libbe/storage/vcs/base.py @@ -29,20 +29,23 @@ import codecs import os import os.path import re -from socket import gethostname import shutil import sys import tempfile import libbe -from utility import Dir, search_parent_directories -from subproc import CommandError, invoke -from plugin import get_plugin +import libbe.storage.base +import libbe.util.encoding +from libbe.util.utility import Dir, search_parent_directories +from libbe.util.subproc import CommandError, invoke +from libbe.util.plugin import import_by_name +#import libbe.storage.util.upgrade as upgrade if libbe.TESTING == True: import unittest import doctest + import libbe.ui.util.user # List VCS modules in order of preference. # Don't list this module, it is implicitly last. @@ -58,62 +61,243 @@ def set_preferred_vcs(name): def _get_matching_vcs(matchfn): """Return the first module for which matchfn(VCS_instance) is true""" for submodname in VCS_ORDER: - module = get_plugin('libbe', submodname) + module = import_by_name('libbe.storage.vcs.%s' % submodname) vcs = module.new() if matchfn(vcs) == True: return vcs vcs.cleanup() return VCS() - + def vcs_by_name(vcs_name): """Return the module for the VCS with the given name""" + if vcs_name == VCS.name: + return new() return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) def detect_vcs(dir): """Return an VCS instance for the vcs being used in this directory""" - return _get_matching_vcs(lambda vcs: vcs.detect(dir)) + return _get_matching_vcs(lambda vcs: vcs._detect(dir)) def installed_vcs(): """Return an instance of an installed VCS""" return _get_matching_vcs(lambda vcs: vcs.installed()) - -class SettingIDnotSupported(NotImplementedError): - pass - -class VCSnotRooted(Exception): +class VCSnotRooted (libbe.storage.base.ConnectionError): def __init__(self): - msg = "VCS not rooted" - Exception.__init__(self, msg) - -class PathNotInRoot(Exception): - def __init__(self, path, root): - msg = "Path '%s' not in root '%s'" % (path, root) - Exception.__init__(self, msg) + msg = 'VCS not rooted' + libbe.storage.base.ConnectionError.__init__(self, msg) + +class InvalidPath (libbe.storage.base.InvalidID): + def __init__(self, path, root, msg=None): + if msg == None: + msg = 'Path "%s" not in root "%s"' % (path, root) + libbe.storage.base.InvalidID.__init__(self, msg) self.path = path self.root = root -class NoSuchFile(Exception): - def __init__(self, pathname, root="."): +class SpacerCollision (InvalidPath): + def __init__(self, path, spacer): + msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer) + InvalidPath.__init__(self, path, root=None, msg=msg) + self.spacer = spacer + +class NoSuchFile (libbe.storage.base.InvalidID): + def __init__(self, pathname, root='.'): path = os.path.abspath(os.path.join(root, pathname)) - Exception.__init__(self, "No such file: %s" % path) + libbe.storage.base.InvalidID.__init__(self, 'No such file: %s' % path) -class EmptyCommit(Exception): - def __init__(self): - Exception.__init__(self, "No changes to commit") + +class CachedPathID (object): + """ + Storage ID <-> path policy. + .../.be/BUGDIR/bugs/BUG/comments/COMMENT + ^-- root path + + >>> dir = Dir() + >>> os.mkdir(os.path.join(dir.path, '.be')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456')) + >>> file(os.path.join(dir.path, '.be', 'abc', 'values'), + ... 'w').close() + >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'), + ... 'w').close() + >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'), + ... 'w').close() + >>> c = CachedPathID() + >>> c.root(dir.path) + >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values')) + 'def/values' + >>> c.init() + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc', 'id-cache'] + >>> c.connect() + >>> c.path('123/values') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/values' + >>> c.disconnect() + >>> c.destroy() + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc'] + >>> c.connect() # demonstrate auto init + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc', 'id-cache'] + >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS + u'.../.be/xyz' + >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS + u'.../.be/xyz/def' + >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/comments/qrs' + >>> c.disconnect() + >>> c.connect() + >>> c.path('qrs') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/comments/qrs' + >>> c.remove_id('qrs') + >>> c.path('qrs') + Traceback (most recent call last): + ... + InvalidID: 'qrs' + >>> c.disconnect() + >>> c.destroy() + >>> dir.cleanup() + """ + def __init__(self, encoding=None): + self.encoding = libbe.util.encoding.get_filesystem_encoding() + self._spacer_dirs = ['.be', 'bugs', 'comments'] + + def root(self, path): + self._root = os.path.abspath(path).rstrip(os.path.sep) + self._cache_path = os.path.join( + self._root, self._spacer_dirs[0], 'id-cache') + + def init(self): + """ + Create cache file for an existing .be directory. + File if multiple lines of the form: + UUID\tPATH + """ + self._cache = {} + spaced_root = os.path.join(self._root, self._spacer_dirs[0]) + for dirpath, dirnames, filenames in os.walk(spaced_root): + if dirpath == spaced_root: + continue + try: + id = self.id(dirpath) + relpath = dirpath[len(self._root)+1:] + if id.count('/') == 0: + self._cache[id] = relpath + except InvalidPath: + pass + self._changed = True + self.disconnect() + + def destroy(self): + os.remove(self._cache_path) + + def connect(self): + if not os.path.exists(self._cache_path): + try: + self.init() + except IOError: + raise libbe.storage.base.ConnectionError + self._cache = {} # key: uuid, value: path + self._changed = False + f = codecs.open(self._cache_path, 'r', self.encoding) + for line in f: + fields = line.rstrip('\n').split('\t') + self._cache[fields[0]] = fields[1] + f.close() + + def disconnect(self): + if self._changed == True: + f = codecs.open(self._cache_path, 'w', self.encoding) + for uuid,path in self._cache.items(): + f.write('%s\t%s\n' % (uuid, path)) + f.close() + self._cache = {} + + def path(self, id, relpath=False): + fields = id.split('/', 1) + uuid = fields[0] + if len(fields) == 1: + extra = [] + else: + extra = fields[1:] + if uuid not in self._cache: + raise libbe.storage.base.InvalidID(uuid) + if relpath == True: + return os.path.join(self._cache[uuid], *extra) + return os.path.join(self._root, self._cache[uuid], *extra) + + def add_id(self, id, parent=None): + if id.count('/') > 0: + # not a UUID-level path + assert id.startswith(parent), \ + 'Strange ID: "%s" should start with "%s"' % (id, parent) + path = self.path(id) + elif id in self._cache: + # already added + path = self.path(id) + else: + if parent == None: + parent_path = '' + spacer = self._spacer_dirs[0] + else: + assert parent.count('/') == 0, \ + 'Strange parent ID: "%s" should be UUID' % parent + parent_path = self.path(parent, relpath=True) + parent_spacer = parent_path.split(os.path.sep)[-2] + i = self._spacer_dirs.index(parent_spacer) + spacer = self._spacer_dirs[i+1] + path = os.path.join(parent_path, spacer, id) + self._cache[id] = path + self._changed = True + path = os.path.join(self._root, path) + return path + + def remove_id(self, id): + if id.count('/') > 0: + return # not a UUID-level path + self._cache.pop(id) + self._changed = True + + def id(self, path): + path = os.path.abspath(path) + if not path.startswith(self._root + os.path.sep): + raise InvalidPath('Path %s not in root %s' % (path, self._root)) + path = path[len(self._root)+1:] + orig_path = path + if not path.startswith(self._spacer_dirs[0] + os.path.sep): + raise InvalidPath(path, self._spacer_dirs[0]) + for spacer in self._spacer_dirs: + if not path.startswith(spacer + os.path.sep): + break + id = path[len(spacer)+1:] + fields = path[len(spacer)+1:].split(os.path.sep,1) + if len(fields) == 1: + break + path = fields[1] + for spacer in self._spacer_dirs: + if id.endswith(os.path.sep + spacer): + raise SpacerCollision(orig_path, spacer) + if os.path.sep != '/': + id = id.replace(os.path.sep, '/') + return id def new(): return VCS() -class VCS(object): +class VCS (libbe.storage.base.VersionedStorage): """ This class implements a 'no-vcs' interface. Support for other VCSs can be added by subclassing this class, and overriding methods _vcs_*() with code appropriate for your VCS. - + The methods _u_*() are utility methods available to the _vcs_*() methods. @@ -127,7 +311,7 @@ class VCS(object): /path/to/source/.be However, you're of in some subdirectory like /path/to/source/GUI/testing - and you want to comment on a bug. Setting sink_to_root=True wen + and you want to comment on a bug. Setting sink_to_root=True when you initialize your BugDir will cause it to search for the '.be' file in the ancestors of the path you passed in as 'root'. /path/to/source/GUI/testing/.be miss @@ -245,113 +429,110 @@ class VCS(object): os.listdir(self.get_path("bugs")): """ - name = "None" - client = "" # command-line tool for _u_invoke_client - versioned = False - def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()): - self.paranoid = paranoid - self.verboseInvoke = False - self.rootdir = None - self._duplicateBasedir = None - self._duplicateDirname = None - self.encoding = encoding - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, id(self)) - def __repr__(self): - return str(self) + name = 'None' + client = 'false' # command-line tool for _u_invoke_client + + def __init__(self, *args, **kwargs): + if 'encoding' not in kwargs: + kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding() + libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs) + self.versioned = False + self.verbose_invoke = False + self._cached_path_id = CachedPathID() + def _vcs_version(self): """ Return the VCS version string. """ - return "0.0" + return '0' + + def _vcs_get_user_id(self): + """ + Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the VCS has not been configured with a username, return None. + """ + return None + def _vcs_detect(self, path=None): """ Detect whether a directory is revision controlled with this VCS. """ return True + def _vcs_root(self, path): """ Get the VCS root. This is the default working directory for future invocations. You would normally set this to the root directory for your VCS. """ - if os.path.isdir(path)==False: + if os.path.isdir(path) == False: path = os.path.dirname(path) - if path == "": - path = os.path.abspath(".") + if path == '': + path = os.path.abspath('.') return path - def _vcs_init(self, path): + + def _vcs_init(self): """ - Begin versioning the tree based at path. + Begin versioning the tree based at self.repo. """ pass - def _vcs_cleanup(self): + + def _vcs_destroy(self): """ - Remove any cruft that _vcs_init() created outside of the - versioned tree. + Remove any files used in versioning (e.g. whatever _vcs_init() + created). """ pass - def _vcs_get_user_id(self): - """ - Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). - If the VCS has not been configured with a username, return None. - """ - return None - def _vcs_set_user_id(self, value): - """ - Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>"). - This is run if the VCS has not been configured with a usename, so - that commits will have a reasonable FROM value. - """ - raise SettingIDnotSupported + def _vcs_add(self, path): """ Add the already created file at path to version control. """ pass + def _vcs_remove(self, path): """ Remove the file at path from version control. Optionally remove the file from the filesystem as well. """ pass + def _vcs_update(self, path): """ Notify the versioning system of changes to the versioned file at path. """ pass - def _vcs_get_file_contents(self, path, revision=None, binary=False): + + def _vcs_get_file_contents(self, path, revision=None): """ Get the file contents as they were in a given revision. Revision==None specifies the current revision. """ - assert revision == None, \ - "The %s VCS does not support revision specifiers" % self.name - if binary == False: - f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding) - else: - f = open(os.path.join(self.rootdir, path), "rb") + if revision != None: + raise libbe.storage.base.InvalidRevision( + 'The %s VCS does not support revision specifiers' % self.name) + path = os.path.join(self.repo, path) + if not os.path.exists(path): + return libbe.util.InvalidObject + if os.path.isdir(path): + return libbe.storage.base.InvalidDirectory + f = open(path, 'rb') contents = f.read() f.close() return contents - def _vcs_duplicate_repo(self, directory, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - dir specifies a directory to create the duplicate in. - """ - shutil.copytree(self.rootdir, directory, True) + def _vcs_commit(self, commitfile, allow_empty=False): """ Commit the current working directory, using the contents of commitfile as the comment. Return the name of the old revision (or None if commits are not supported). - + If allow_empty == False, raise EmptyCommit if there are no changes to commit. """ return None + def _vcs_revision_id(self, index): """ Return the name of the <index>th revision. Index will be an @@ -362,11 +543,13 @@ os.listdir(self.get_path("bugs")): specified revision does not exist. """ return None + def version(self): - """Cache version string for efficiency.""" + # Cache version string for efficiency. if not hasattr(self, '_version'): self._version = self._get_version() return self._version + def _get_version(self): try: ret = self._vcs_version() @@ -378,183 +561,166 @@ os.listdir(self.get_path("bugs")): raise OSError, e except CommandError: return None + def installed(self): if self.version() != None: return True return False - def detect(self, path="."): - """ - Detect whether a directory is revision controlled with this VCS. - """ - return self._vcs_detect(path) - def root(self, path): - """ - Set the root directory to the path's VCS root. This is the - default working directory for future invocations. - """ - self.rootdir = self._vcs_root(path) - def init(self, path): - """ - Begin versioning the tree based at path. - Also roots the vcs at path. - """ - if os.path.isdir(path)==False: - path = os.path.dirname(path) - self._vcs_init(path) - self.root(path) - def cleanup(self): - self._vcs_cleanup() + def get_user_id(self): """ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). - If the VCS has not been configured with a username, return the user's - id. You can override the automatic lookup procedure by setting the + If the VCS has not been configured with a username, return None. + You can override the automatic lookup procedure by setting the VCS.user_id attribute to a string of your choice. """ - if hasattr(self, "user_id"): - if self.user_id != None: - return self.user_id - id = self._vcs_get_user_id() - if id == None: - name = self._u_get_fallback_username() - email = self._u_get_fallback_email() - id = self._u_create_id(name, email) - print >> sys.stderr, "Guessing id '%s'" % id - try: - self.set_user_id(id) - except SettingIDnotSupported: - pass - return id - def set_user_id(self, value): + if not hasattr(self, 'user_id'): + self.user_id = self._vcs_get_user_id() + return self.user_id + + def _detect(self, path='.'): """ - Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>"). - This is run if the VCS has not been configured with a usename, so - that commits will have a reasonable FROM value. + Detect whether a directory is revision controlled with this VCS. """ - self._vcs_set_user_id(value) - def add(self, path): + return self._vcs_detect(path) + + def root(self): """ - Add the already created file at path to version control. + Set the root directory to the path's VCS root. This is the + default working directory for future invocations. """ - self._vcs_add(self._u_rel_path(path)) - def remove(self, path): + self.repo = os.path.abspath(self._vcs_root(self.repo)) + if os.path.isdir(self.repo) == False: + self.repo = os.path.dirname(self.repo) + self.be_dir = os.path.join( + self.repo, self._cached_path_id._spacer_dirs[0]) + self._cached_path_id.root(self.repo) + + def _init(self): """ - Remove a file from both version control and the filesystem. + Begin versioning the tree based at self.repo. + Also roots the vcs at path. """ - self._vcs_remove(self._u_rel_path(path)) + self.root() + self._vcs_init() + os.mkdir(self.be_dir) + self._vcs_add(self._u_rel_path(self.be_dir)) + self._cached_path_id.init() + + def _destroy(self): + self._vcs_destroy() + self._cached_path_id.destroy() + shutil.rmtree(self.be_dir) + + def _connect(self): + self.root() + self._cached_path_id.connect() + self.check_disk_version() + + def disconnect(self): + self._cached_path_id.disconnect() + + def _add(self, id, parent=None, directory=False): + path = self._cached_path_id.add_id(id, parent) + relpath = self._u_rel_path(path) + reldirs = relpath.split(os.path.sep) + if directory == False: + reldirs = reldirs[:-1] + dir = self.repo + for reldir in reldirs: + dir = os.path.join(dir, reldir) + if not os.path.exists(dir): + os.mkdir(dir) + self._vcs_add(self._u_rel_path(dir)) + elif not os.path.isdir(dir): + raise libbe.storage.base.InvalidDirectory + if directory == False: + if not os.path.exists(path): + open(path, 'w').close() + self._vcs_add(self._u_rel_path(path)) + + def _remove(self, id): + path = self._cached_path_id.path(id) if os.path.exists(path): - os.remove(path) - def recursive_remove(self, dirname): - """ - Remove a file/directory and all its decendents from both - version control and the filesystem. - """ - if not os.path.exists(dirname): - raise NoSuchFile(dirname) - for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): + if os.path.isdir(path) and len(self.children(id)) > 0: + raise libbe.storage.base.DirectoryNotEmpty(id) + self._vcs_remove(self._u_rel_path(path)) + if os.path.exists(path): + if os.path.isdir(path): + os.rmdir(path) + else: + os.remove(path) + self._cached_path_id.remove_id(id) + + def _recursive_remove(self, id): + path = self._cached_path_id.path(id) + for dirpath,dirnames,filenames in os.walk(path, topdown=False): filenames.extend(dirnames) - for path in filenames: - fullpath = os.path.join(dirpath, path) + for f in filenames: + fullpath = os.path.join(dirpath, f) if os.path.exists(fullpath) == False: continue self._vcs_remove(self._u_rel_path(fullpath)) - if os.path.exists(dirname): - shutil.rmtree(dirname) - def update(self, path): - """ - Notify the versioning system of changes to the versioned file - at path. - """ - self._vcs_update(self._u_rel_path(path)) - def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False): - """ - Get the file as it was in a given revision. - Revision==None specifies the current revision. - - allow_no_vcs==True allows direct access to files through - codecs.open() or open() if the vcs decides it can't handle the - given path. - """ - if not os.path.exists(path): - raise NoSuchFile(path) - if self._use_vcs(path, allow_no_vcs): - relpath = self._u_rel_path(path) - contents = self._vcs_get_file_contents(relpath,revision,binary=binary) + if os.path.exists(path): + shutil.rmtree(path) + path = self._cached_path_id.path(id, relpath=True) + for id,p in self._cached_path_id._cache.items(): + if p.startswith(path): + self._cached_path_id.remove_id(id) + + def _children(self, id=None, revision=None): + if id==None: + path = self.be_dir else: - if binary == True: - f = codecs.open(path, "r", self.encoding) - else: - f = open(path, "rb") - contents = f.read() - f.close() + path = self._cached_path_id.path(id) + if os.path.isdir(path) == False: + return [] + children = os.listdir(path) + for i,c in enumerate(children): + if c in self._cached_path_id._spacer_dirs: + children[i] = None + children.extend([os.path.join(c, c2) for c2 in + os.listdir(os.path.join(path, c))]) + elif c == 'id-cache': + children[i] = None + for i,c in enumerate(children): + if c == None: continue + cpath = os.path.join(path, c) + children[i] = self._cached_path_id.id(cpath) + return [c for c in children if c != None] + + def _get(self, id, default=libbe.util.InvalidObject, revision=None): + try: + path = self._cached_path_id.path(id) + except libbe.storage.base.InvalidID, e: + if default == libbe.util.InvalidObject: + raise e + return default + relpath = self._u_rel_path(path) + contents = self._vcs_get_file_contents(relpath,revision) + if contents == libbe.storage.base.InvalidDirectory: + raise libbe.storage.base.InvalidDirectory(id) + elif contents == libbe.util.InvalidObject: + raise libbe.storage.base.InvalidID(id) + elif len(contents) == 0: + return None return contents - def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False): - """ - Set the file contents under version control. - """ - add = not os.path.exists(path) - if binary == False: - f = codecs.open(path, "w", self.encoding) - else: - f = open(path, "wb") - f.write(contents) - f.close() - - if self._use_vcs(path, allow_no_vcs): - if add: - self.add(path) - else: - self.update(path) - def mkdir(self, path, allow_no_vcs=False, check_parents=True): - """ - Create (if neccessary) a directory at path under version - control. - """ - if check_parents == True: - parent = os.path.dirname(path) - if not os.path.exists(parent): # recurse through parents - self.mkdir(parent, allow_no_vcs, check_parents) + + def _set(self, id, value): + try: + path = self._cached_path_id.path(id) + except libbe.storage.base.InvalidID, e: + raise e if not os.path.exists(path): - os.mkdir(path) - if self._use_vcs(path, allow_no_vcs): - self.add(path) - else: - assert os.path.isdir(path) - if self._use_vcs(path, allow_no_vcs): - #self.update(path)# Don't update directories. Changing files - pass # underneath them should be sufficient. - - def duplicate_repo(self, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - Return the path to the arbitrary directory at the base of the new repo. - """ - # Dirname in Basedir to protect against simlink attacks. - if self._duplicateBasedir == None: - self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs') - self._duplicateDirname = \ - os.path.join(self._duplicateBasedir, "duplicate") - self._vcs_duplicate_repo(directory=self._duplicateDirname, - revision=revision) - return self._duplicateDirname - def remove_duplicate_repo(self): - """ - Clean up a duplicate repo created with duplicate_repo(). - """ - if self._duplicateBasedir != None: - shutil.rmtree(self._duplicateBasedir) - self._duplicateBasedir = None - self._duplicateDirname = None - def commit(self, summary, body=None, allow_empty=False): - """ - Commit the current working directory, with a commit message - string summary and body. Return the name of the old revision - (or None if versioning is not supported). - - If allow_empty == False (the default), raise EmptyCommit if - there are no changes to commit. - """ + raise libbe.storage.base.InvalidID(id) + if os.path.isdir(path): + raise libbe.storage.base.InvalidDirectory(id) + f = open(path, "wb") + f.write(value) + f.close() + self._vcs_update(self._u_rel_path(path)) + + def _commit(self, summary, body=None, allow_empty=False): summary = summary.strip()+'\n' if body is not None: summary += '\n' + body.strip() + '\n' @@ -564,35 +730,20 @@ os.listdir(self.get_path("bugs")): temp_file = os.fdopen(descriptor, 'wb') temp_file.write(summary) temp_file.flush() - self.precommit() revision = self._vcs_commit(filename, allow_empty=allow_empty) temp_file.close() - self.postcommit() finally: os.remove(filename) return revision - def precommit(self): - """ - Executed before all attempted commits. - """ - pass - def postcommit(self): - """ - Only executed after successful commits. - """ - pass - def revision_id(self, index=None): - """ - Return the name of the <index>th revision. The choice of - which branch to follow when crossing branches/merges is not - defined. - Return None if index==None, revision IDs are not supported, or - if the specified revision does not exist. - """ + def revision_id(self, index=None): if index == None: return None - return self._vcs_revision_id(index) + revid = self._vcs_revision_id(index) + if revid == None: + raise libbe.storage.base.InvalidRevision(index) + return revid + def _u_any_in_string(self, list, string): """ Return True if any of the strings in list are in string. @@ -602,23 +753,26 @@ os.listdir(self.get_path("bugs")): if list_string in string: return True return False + def _u_invoke(self, *args, **kwargs): if 'cwd' not in kwargs: - kwargs['cwd'] = self.rootdir + kwargs['cwd'] = self.repo if 'verbose' not in kwargs: - kwargs['verbose'] = self.verboseInvoke + kwargs['verbose'] = self.verbose_invoke if 'encoding' not in kwargs: kwargs['encoding'] = self.encoding return invoke(*args, **kwargs) + def _u_invoke_client(self, *args, **kwargs): cl_args = [self.client] cl_args.extend(args) return self._u_invoke(cl_args, **kwargs) + def _u_search_parent_directories(self, path, filename): """ Find the file (or directory) named filename in path or in any of path's parents. - + e.g. search_parent_directories("/a/b/c", ".be") will return the path to the first existing file from @@ -629,6 +783,7 @@ os.listdir(self.get_path("bugs")): or None if none of those files exist. """ return search_parent_directories(path, filename) + def _use_vcs(self, path, allow_no_vcs): """ Try and decide if _vcs_add/update/mkdir/etc calls will @@ -637,16 +792,17 @@ os.listdir(self.get_path("bugs")): """ use_vcs = True exception = None - if self.rootdir != None: + if self.repo != None: if self.path_in_root(path) == False: use_vcs = False - exception = PathNotInRoot(path, self.rootdir) + exception = InvalidPath(path, self.repo) else: use_vcs = False exception = VCSnotRooted if use_vcs == False and allow_no_vcs==False: raise exception return use_vcs + def path_in_root(self, path, root=None): """ Return the relative path to path from root. @@ -657,15 +813,16 @@ os.listdir(self.get_path("bugs")): False """ if root == None: - if self.rootdir == None: + if self.repo == None: raise VCSnotRooted - root = self.rootdir + root = self.repo path = os.path.abspath(path) absRoot = os.path.abspath(root) absRootSlashedDir = os.path.join(absRoot,"") if not path.startswith(absRootSlashedDir): return False return True + def _u_rel_path(self, path, root=None): """ Return the relative path to path from root. @@ -674,18 +831,19 @@ os.listdir(self.get_path("bugs")): '.be' """ if root == None: - if self.rootdir == None: + if self.repo == None: raise VCSnotRooted - root = self.rootdir + root = self.repo path = os.path.abspath(path) absRoot = os.path.abspath(root) absRootSlashedDir = os.path.join(absRoot,"") if not path.startswith(absRootSlashedDir): - raise PathNotInRoot(path, absRootSlashedDir) + raise InvalidPath(path, absRootSlashedDir) assert path != absRootSlashedDir, \ "file %s == root directory %s" % (path, absRootSlashedDir) relpath = path[len(absRootSlashedDir):] return relpath + def _u_abspath(self, path, root=None): """ Return the absolute path from a path realtive to root. @@ -694,60 +852,10 @@ os.listdir(self.get_path("bugs")): '/a.b/c/.be' """ if root == None: - assert self.rootdir != None, "VCS not rooted" - root = self.rootdir + assert self.repo != None, "VCS not rooted" + root = self.repo return os.path.abspath(os.path.join(root, path)) - def _u_create_id(self, name, email=None): - """ - >>> vcs = new() - >>> vcs._u_create_id("John Doe", "jdoe@example.com") - 'John Doe <jdoe@example.com>' - >>> vcs._u_create_id("John Doe") - 'John Doe' - """ - assert len(name) > 0 - if email == None or len(email) == 0: - return name - else: - return "%s <%s>" % (name, email) - def _u_parse_id(self, value): - """ - >>> vcs = new() - >>> vcs._u_parse_id("John Doe <jdoe@example.com>") - ('John Doe', 'jdoe@example.com') - >>> vcs._u_parse_id("John Doe") - ('John Doe', None) - >>> try: - ... vcs._u_parse_id("John Doe <jdoe@example.com><what?>") - ... except AssertionError: - ... print "Invalid match" - Invalid match - """ - emailexp = re.compile("(.*) <([^>]*)>(.*)") - match = emailexp.search(value) - if match == None: - email = None - name = value - else: - assert len(match.groups()) == 3 - assert match.groups()[2] == "", match.groups() - email = match.groups()[1] - name = match.groups()[0] - assert name != None - assert len(name) > 0 - return (name, email) - def _u_get_fallback_username(self): - name = None - for envariable in ["LOGNAME", "USERNAME"]: - if os.environ.has_key(envariable): - name = os.environ[envariable] - break - assert name != None - return name - def _u_get_fallback_email(self): - hostname = gethostname() - name = self._u_get_fallback_username() - return "%s@%s" % (name, hostname) + def _u_parse_commitfile(self, commitfile): """ Split the commitfile created in self.commit() back into @@ -763,9 +871,9 @@ os.listdir(self.get_path("bugs")): return (summary, body) def check_disk_version(self): - version = self.get_version() - if version != upgrade.BUGDIR_DISK_VERSION: - upgrade.upgrade(self.root, version) + version = self.version() + #if version != upgrade.BUGDIR_DISK_VERSION: + # upgrade.upgrade(self.repo, version) def disk_version(self, path=None, use_none_vcs=False, for_duplicate_bugdir=False): @@ -786,39 +894,17 @@ os.listdir(self.get_path("bugs")): if self.sync_with_disk == False: raise DiskAccessRequired("set version") self.vcs.mkdir(self.get_path()) - self.vcs.set_file_contents(self.get_path("version"), - upgrade.BUGDIR_DISK_VERSION+"\n") + #self.vcs.set_file_contents(self.get_path("version"), + # upgrade.BUGDIR_DISK_VERSION+"\n") + - if libbe.TESTING == True: - def setup_vcs_test_fixtures(testcase): - """Set up test fixtures for VCS test case.""" - testcase.vcs = testcase.Class() - testcase.dir = Dir() - testcase.dirname = testcase.dir.path - - vcs_not_supporting_uninitialized_user_id = [] - vcs_not_supporting_set_user_id = ["None", "hg"] - testcase.vcs_supports_uninitialized_user_id = ( - testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id) - testcase.vcs_supports_set_user_id = ( - testcase.vcs.name not in vcs_not_supporting_set_user_id) - - if not testcase.vcs.installed(): - testcase.fail( - "%(name)s VCS not found" % vars(testcase.Class)) - - if testcase.Class.name != "None": - testcase.failIf( - testcase.vcs.detect(testcase.dirname), - "Detected %(name)s VCS before initialising" - % vars(testcase.Class)) - - testcase.vcs.init(testcase.dirname) - - class VCSTestCase(unittest.TestCase): - """Test cases for base VCS class.""" + class VCSTestCase (unittest.TestCase): + """ + Test cases for base VCS class (in addition to the Storage test + cases). + """ Class = VCS @@ -827,271 +913,102 @@ if libbe.TESTING == True: self.dirname = None def setUp(self): + """Set up test fixtures for Storage test case.""" super(VCSTestCase, self).setUp() - setup_vcs_test_fixtures(self) + self.dir = Dir() + self.dirname = self.dir.path + self.s = self.Class(repo=self.dirname) + if self.s.installed() == True: + self.s.init() + self.s.connect() def tearDown(self): - self.vcs.cleanup() - self.dir.cleanup() super(VCSTestCase, self).tearDown() + if self.s.installed() == True: + self.s.disconnect() + self.s.destroy() + self.dir.cleanup() - def full_path(self, rel_path): - return os.path.join(self.dirname, rel_path) - + def test_installed(self): + """ + See if the VCS is installed. + """ + self.failUnless(self.s.installed() == True, + '%(name)s VCS not found' % vars(self.Class)) - class VCS_init_TestCase(VCSTestCase): - """Test cases for VCS.init method.""" - def test_detect_should_succeed_after_init(self): - """Should detect VCS in directory after initialization.""" - self.failUnless( - self.vcs.detect(self.dirname), - "Did not detect %(name)s VCS after initialising" + class VCS_detection_TestCase (VCSTestCase): + def test_detection(self): + """ + See if the VCS detects its installed repository + """ + if self.s.installed(): + self.s.disconnect() + self.failUnless(self.s._detect(self.dirname) == True, + 'Did not detected %(name)s VCS after initialising' % vars(self.Class)) + self.s.connect() - def test_vcs_rootdir_in_specified_root_path(self): + def test_no_detection(self): + """ + See if the VCS detects its installed repository + """ + if self.s.installed() and self.Class.name != 'None': + self.s.disconnect() + self.s.destroy() + self.failUnless(self.s._detect(self.dirname) == False, + 'Detected %(name)s VCS before initialising' + % self.Class) + self.s.init() + self.s.connect() + + def test_vcs_repo_in_specified_root_path(self): """VCS root directory should be in specified root path.""" - rp = os.path.realpath(self.vcs.rootdir) + rp = os.path.realpath(self.s.repo) dp = os.path.realpath(self.dirname) vcs_name = self.Class.name self.failUnless( dp == rp or rp == None, "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars()) - class VCS_get_user_id_TestCase(VCSTestCase): """Test cases for VCS.get_user_id method.""" def test_gets_existing_user_id(self): """Should get the existing user ID.""" - if not self.vcs_supports_uninitialized_user_id: - return - - user_id = self.vcs.get_user_id() - self.failUnless( - user_id is not None, - "unable to get a user id") - - - class VCS_set_user_id_TestCase(VCSTestCase): - """Test cases for VCS.set_user_id method.""" - - def setUp(self): - super(VCS_set_user_id_TestCase, self).setUp() - - if self.vcs_supports_uninitialized_user_id: - self.prev_user_id = self.vcs.get_user_id() - else: - self.prev_user_id = "Uninitialized identity <bogus@example.org>" - - if self.vcs_supports_set_user_id: - self.test_new_user_id = "John Doe <jdoe@example.com>" - self.vcs.set_user_id(self.test_new_user_id) - - def tearDown(self): - if self.vcs_supports_set_user_id: - self.vcs.set_user_id(self.prev_user_id) - super(VCS_set_user_id_TestCase, self).tearDown() - - def test_raises_error_in_unsupported_vcs(self): - """Should raise an error in a VCS that doesn't support it.""" - if self.vcs_supports_set_user_id: + user_id = self.s.get_user_id() + if user_id == None: return - self.assertRaises( - SettingIDnotSupported, - self.vcs.set_user_id, "foo") - - def test_updates_user_id_in_supporting_vcs(self): - """Should update the user ID in an VCS that supports it.""" - if not self.vcs_supports_set_user_id: - return - user_id = self.vcs.get_user_id() - self.failUnlessEqual( - self.test_new_user_id, user_id, - "user id not set correctly (expected %s, got %s)" - % (self.test_new_user_id, user_id)) - - - def setup_vcs_revision_test_fixtures(testcase): - """Set up revision test fixtures for VCS test case.""" - testcase.test_dirs = ['a', 'a/b', 'c'] - for path in testcase.test_dirs: - testcase.vcs.mkdir(testcase.full_path(path)) - - testcase.test_files = ['a/text', 'a/b/text'] - - testcase.test_contents = { - 'rev_1': "Lorem ipsum", - 'uncommitted': "dolor sit amet", - } - - - class VCS_mkdir_TestCase(VCSTestCase): - """Test cases for VCS.mkdir method.""" - - def setUp(self): - super(VCS_mkdir_TestCase, self).setUp() - setup_vcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.vcs.recursive_remove(self.full_path(path)) - super(VCS_mkdir_TestCase, self).tearDown() - - def test_mkdir_creates_directory(self): - """Should create specified directory in filesystem.""" - for path in self.test_dirs: - full_path = self.full_path(path) - self.failUnless( - os.path.exists(full_path), - "path %(full_path)s does not exist" % vars()) - - - class VCS_commit_TestCase(VCSTestCase): - """Test cases for VCS.commit method.""" - - def setUp(self): - super(VCS_commit_TestCase, self).setUp() - setup_vcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.vcs.recursive_remove(self.full_path(path)) - super(VCS_commit_TestCase, self).tearDown() - - def test_file_contents_as_specified(self): - """Should set file contents as specified.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents(full_path, test_contents) - current_contents = self.vcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_committed(self): - """Should have file contents as specified after commit.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents(full_path, test_contents) - revision = self.vcs.commit("Initial file contents.") - current_contents = self.vcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_set_when_uncommitted(self): - """Should set file contents as specified after commit.""" - if not self.vcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Initial file contents.") - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - current_contents = self.vcs.get_file_contents(full_path) - self.failUnlessEqual( - self.test_contents['uncommitted'], current_contents) - - def test_revision_file_contents_as_committed(self): - """Should get file contents as committed to specified revision.""" - if not self.vcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Initial file contents.") - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - committed_contents = self.vcs.get_file_contents( - full_path, revision) - self.failUnlessEqual( - self.test_contents['rev_1'], committed_contents) - - def test_revision_id_as_committed(self): - """Check for compatibility between .commit() and .revision_id()""" - if not self.vcs.versioned: - self.failUnlessEqual(self.vcs.revision_id(5), None) - return - committed_revisions = [] - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Initial %s contents." % path) - committed_revisions.append(revision) - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - revision = self.vcs.commit("Altered %s contents." % path) - committed_revisions.append(revision) - for i,revision in enumerate(committed_revisions): - self.failUnlessEqual(self.vcs.revision_id(i), revision) - i += -len(committed_revisions) # check negative indices - self.failUnlessEqual(self.vcs.revision_id(i), revision) - i = len(committed_revisions) - self.failUnlessEqual(self.vcs.revision_id(i), None) - self.failUnlessEqual(self.vcs.revision_id(-i-1), None) - - def test_revision_id_as_committed(self): - """Check revision id before first commit""" - if not self.vcs.versioned: - self.failUnlessEqual(self.vcs.revision_id(5), None) - return - committed_revisions = [] - for path in self.test_files: - self.failUnlessEqual(self.vcs.revision_id(0), None) - - - class VCS_duplicate_repo_TestCase(VCSTestCase): - """Test cases for VCS.duplicate_repo method.""" - - def setUp(self): - super(VCS_duplicate_repo_TestCase, self).setUp() - setup_vcs_revision_test_fixtures(self) - - def tearDown(self): - self.vcs.remove_duplicate_repo() - for path in reversed(sorted(self.test_dirs)): - self.vcs.recursive_remove(self.full_path(path)) - super(VCS_duplicate_repo_TestCase, self).tearDown() - - def test_revision_file_contents_as_committed(self): - """Should match file contents as committed to specified revision. - """ - if not self.vcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.vcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.vcs.commit("Commit current status") - self.vcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - dup_repo_path = self.vcs.duplicate_repo(revision) - dup_file_path = os.path.join(dup_repo_path, path) - dup_file_contents = file(dup_file_path, 'rb').read() - self.failUnlessEqual( - self.test_contents['rev_1'], dup_file_contents) - self.vcs.remove_duplicate_repo() - - - def make_vcs_testcase_subclasses(vcs_class, namespace): - """Make VCSTestCase subclasses for vcs_class in the namespace.""" - vcs_testcase_classes = [ - c for c in ( - ob for ob in globals().values() if isinstance(ob, type)) - if issubclass(c, VCSTestCase)] - - for base_class in vcs_testcase_classes: - testcase_class_name = vcs_class.__name__ + base_class.__name__ - testcase_class_bases = (base_class,) - testcase_class_dict = dict(base_class.__dict__) - testcase_class_dict['Class'] = vcs_class - testcase_class = type( - testcase_class_name, testcase_class_bases, testcase_class_dict) - setattr(namespace, testcase_class_name, testcase_class) - + name,email = libbe.ui.util.user.parse_user_id(user_id) + if email != None: + self.failUnless('@' in email, email) + + def make_vcs_testcase_subclasses(storage_class, namespace): + c = storage_class() + if c.versioned == True: + libbe.storage.base.make_versioned_storage_testcase_subclasses( + storage_class, namespace) + else: + libbe.storage.base.make_storage_testcase_subclasses( + storage_class, namespace) + + if namespace != sys.modules[__name__]: + # Make VCSTestCase subclasses for vcs_class in the namespace. + vcs_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, VCSTestCase)] + + for base_class in vcs_testcase_classes: + testcase_class_name = vcs_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = vcs_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + make_vcs_testcase_subclasses(VCS, sys.modules[__name__]) unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/ui/command_line.py b/libbe/ui/command_line.py index e84d32a..b0b52af 100755 --- a/libbe/ui/command_line.py +++ b/libbe/ui/command_line.py @@ -27,6 +27,7 @@ import os import sys import libbe +import libbe.bugdir import libbe.command import libbe.command.util import libbe.version @@ -52,11 +53,14 @@ class CmdOptionParser(optparse.OptionParser): option.validate() self._option_by_name[option.name] = option opt_strings = ['--'+option.name] + dest = option.name.replace('-', '_') + assert '_' not in option.name, \ + 'Non-reconstructable option name %s' % option.name if option.short_name != None: opt_strings.append('-'+option.short_name) if option.arg == None: # a callback option opt = optparse.Option( - *opt_strings, action='callback', + *opt_strings, action='callback', dest=dest, callback=self.callback, help=option.help) else: kwargs = {} @@ -68,7 +72,7 @@ class CmdOptionParser(optparse.OptionParser): opt = optparse.Option( *opt_strings, metavar=option.arg.metavar, default=option.arg.default, action=action, - help=option.help, **kwargs) + dest=dest, help=option.help, **kwargs) opt._option = option self.add_option(opt) @@ -76,7 +80,11 @@ class CmdOptionParser(optparse.OptionParser): args = self._get_args(args) options,parsed_args = optparse.OptionParser.parse_args( self, args=args, values=values) - for name,value in options.__dict__.items(): + options = options.__dict__ + for name,value in options.items(): + if '_' in name: # reconstruct original option name + options[name.replace('_', '-')] = options.pop(name) + for name,value in options.items(): if value == '--complete': argument = None option = self._option_by_name[name] @@ -231,9 +239,9 @@ def main(): return 1 paginate = 'auto' - if options.paginate == True: + if options['paginate'] == True: paginate = 'always' - if options.no_pager== True: + if options['no-pager'] == True: paginate = 'never' libbe.ui.util.pager.run_pager(paginate) @@ -249,7 +257,7 @@ def main(): if command.requires_bugdir == True: storage = libbe.storage.get_storage(options['repo']) storage.connect() - bugdir = BugDir(storage) + bugdir = libbe.bugdir.BugDir(storage, from_storage=True) else: storage = None bugdir = None diff --git a/libbe/ui/util/user.py b/libbe/ui/util/user.py index de2138c..64eb30c 100644 --- a/libbe/ui/util/user.py +++ b/libbe/ui/util/user.py @@ -7,6 +7,10 @@ example, Note that the Arch VCS backend *enforces* ids with this format. """ +import re +from socket import gethostname + +import libbe import libbe.storage.util.config def _get_fallback_username(self): @@ -23,11 +27,11 @@ def _get_fallback_email(self): name = _get_fallback_username() return "%s@%s" % (name, hostname) -def create_user_id(self, name, email=None): +def create_user_id(name, email=None): """ - >>> create_id("John Doe", "jdoe@example.com") + >>> create_user_id("John Doe", "jdoe@example.com") 'John Doe <jdoe@example.com>' - >>> create_id("John Doe") + >>> create_user_id("John Doe") 'John Doe' """ assert len(name) > 0 @@ -36,14 +40,14 @@ def create_user_id(self, name, email=None): else: return "%s <%s>" % (name, email) -def parse_user_id(self, value): +def parse_user_id(value): """ - >>> parse_id("John Doe <jdoe@example.com>") + >>> parse_user_id("John Doe <jdoe@example.com>") ('John Doe', 'jdoe@example.com') - >>> parse_id("John Doe") + >>> parse_user_id("John Doe") ('John Doe', None) >>> try: - ... parse_id("John Doe <jdoe@example.com><what?>") + ... parse_user_id("John Doe <jdoe@example.com><what?>") ... except AssertionError: ... print "Invalid match" Invalid match diff --git a/libbe/util/id.py b/libbe/util/id.py index ab62359..645a17c 100644 --- a/libbe/util/id.py +++ b/libbe/util/id.py @@ -206,7 +206,7 @@ def child_uuids(child_storage_ids): ['123abc', '123def'] """ for id in child_storage_ids: - fields = libbe.util.id._split(id) + fields = _split(id) if len(fields) == 1: yield fields[0] |