aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2009-12-31 15:54:12 -0500
committerW. Trevor King <wking@drexel.edu>2009-12-31 15:54:12 -0500
commitb0b5341c4045dd27cfbb3e2585cb2614ed9ad903 (patch)
tree37c7c2d011617ccd7a6f28a24ea77bb1b3cddfe7 /libbe/storage
parenta06030436d3940dddfba37b344f90651366d67e1 (diff)
parent2d1562d951e763fed71fe60e77cc9921be9abdc9 (diff)
downloadbugseverywhere-b0b5341c4045dd27cfbb3e2585cb2614ed9ad903.tar.gz
Merged be.restructure, major internal reorganization.
Added a bunch of classes to make the commands, user interfaces, and storage backends more abstract and distinct. This should make it much easier to extend and maintain BE. Features: * Directory restructured: becommands/ -> libbe/commands submods sorted by functionality. * Lots of new classes: Option, Argument, Command InputOutput, StorageCallbacks, UserInterface Storage * Consolidated ID handling in libbe.util.id * Transitioned VCS backends for Python-based VCSs from subprocess calss to internal python calls. Plus the user-visible changes: * New bugdir/bug/comment ID format replaces old bug:comment format. * Deprecated support for `be diff` on Arch and Darcs <= 2.3.1. A new backend abstraction (Storage) makes the former implementation ungainly. * Improved command completion. * Removed commands close, open, email_bugs, * Flipped some arguments `be assign BUG-ID [ASSIGNEE]` -> `be status ASSIGNED BUG-ID ...` `be severity BUG-ID SEVERITY` -> `be severity SEVERITY BUG-ID ...` `be status BUG-ID STATUS` -> `be status STATUS BUG-ID ...` In the merge: * Added 'commit' to list of pagerless commands. * Updated doc/README.dev See #bea86499-824e-4e77-b085-2d581fa9ccab/1100c966-9671-4bc6-8b68-6d408a910da1# for a discussion of why the changes were made and some of the difficulties en-route.
Diffstat (limited to 'libbe/storage')
-rw-r--r--libbe/storage/__init__.py37
-rw-r--r--libbe/storage/base.py909
-rw-r--r--libbe/storage/util/__init__.py0
-rw-r--r--libbe/storage/util/config.py93
-rw-r--r--libbe/storage/util/mapfile.py130
-rw-r--r--libbe/storage/util/properties.py642
-rw-r--r--libbe/storage/util/settings_object.py432
-rw-r--r--libbe/storage/util/upgrade.py331
-rw-r--r--libbe/storage/vcs/__init__.py10
-rw-r--r--libbe/storage/vcs/arch.py350
-rw-r--r--libbe/storage/vcs/base.py1106
-rw-r--r--libbe/storage/vcs/bzr.py196
-rw-r--r--libbe/storage/vcs/darcs.py288
-rw-r--r--libbe/storage/vcs/git.py183
-rw-r--r--libbe/storage/vcs/hg.py180
15 files changed, 4887 insertions, 0 deletions
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py
new file mode 100644
index 0000000..e99f799
--- /dev/null
+++ b/libbe/storage/__init__.py
@@ -0,0 +1,37 @@
+# Copyright
+
+import base
+
+ConnectionError = base.ConnectionError
+InvalidStorageVersion = base.InvalidStorageVersion
+InvalidID = base.InvalidID
+InvalidRevision = base.InvalidRevision
+InvalidDirectory = base.InvalidDirectory
+NotWriteable = base.NotWriteable
+NotReadable = base.NotReadable
+EmptyCommit = base.EmptyCommit
+
+# a list of all past versions
+STORAGE_VERSIONS = ['Bugs Everywhere Tree 1 0',
+ 'Bugs Everywhere Directory v1.1',
+ 'Bugs Everywhere Directory v1.2',
+ 'Bugs Everywhere Directory v1.3',
+ 'Bugs Everywhere Directory v1.4',
+ ]
+
+# the current version
+STORAGE_VERSION = STORAGE_VERSIONS[-1]
+
+def get_storage(location):
+ """
+ Return a Storage instance from a repo location string.
+ """
+ import vcs
+ s = vcs.detect_vcs(location)
+ s.repo = location
+ return s
+
+__all__ = [ConnectionError, InvalidStorageVersion, InvalidID,
+ InvalidRevision, InvalidDirectory, NotWriteable, NotReadable,
+ EmptyCommit, STORAGE_VERSIONS, STORAGE_VERSION,
+ get_storage]
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
new file mode 100644
index 0000000..1c711fa
--- /dev/null
+++ b/libbe/storage/base.py
@@ -0,0 +1,909 @@
+# Copyright
+
+"""
+Abstract bug repository data storage to easily support multiple backends.
+"""
+
+import copy
+import os
+import pickle
+import types
+
+from libbe.error import NotSupported
+import libbe.storage
+from libbe.util.tree import Tree
+from libbe.util import InvalidObject
+from libbe import TESTING
+
+if TESTING == True:
+ import doctest
+ import os.path
+ import sys
+ import unittest
+
+ from libbe.util.utility import Dir
+
+class ConnectionError (Exception):
+ pass
+
+class InvalidStorageVersion(ConnectionError):
+ def __init__(self, active_version, expected_version=None):
+ if expected_version == None:
+ expected_version = libbe.storage.STORAGE_VERSION
+ msg = 'Storage in "%s" not the expected "%s"' \
+ % (active_version, expected_version)
+ Exception.__init__(self, msg)
+ self.active_version = active_version
+ self.expected_version = expected_version
+
+class InvalidID (KeyError):
+ def __init__(self, id=None, revision=None, msg=None):
+ if msg == None and id != None:
+ msg = id
+ KeyError.__init__(self, msg)
+ self.id = id
+ self.revision = revision
+
+class InvalidRevision (KeyError):
+ pass
+
+class InvalidDirectory (Exception):
+ pass
+
+class DirectoryNotEmpty (InvalidDirectory):
+ pass
+
+class NotWriteable (NotSupported):
+ def __init__(self, msg):
+ NotSupported.__init__(self, 'write', msg)
+
+class NotReadable (NotSupported):
+ def __init__(self, msg):
+ NotSupported.__init__(self, 'read', msg)
+
+class EmptyCommit(Exception):
+ def __init__(self):
+ Exception.__init__(self, 'No changes to commit')
+
+
+class Entry (Tree):
+ def __init__(self, id, value=None, parent=None, directory=False,
+ children=None):
+ if children == None:
+ Tree.__init__(self)
+ else:
+ Tree.__init__(self, children)
+ self.id = id
+ self.value = value
+ self.parent = parent
+ if self.parent != None:
+ if self.parent.directory == False:
+ raise InvalidDirectory(
+ 'Non-directory %s cannot have children' % self.parent)
+ parent.append(self)
+ self.directory = directory
+
+ def __str__(self):
+ return '<Entry %s: %s>' % (self.id, self.value)
+
+ def __repr__(self):
+ return str(self)
+
+ def __cmp__(self, other, local=False):
+ if other == None:
+ return cmp(1, None)
+ if cmp(self.id, other.id) != 0:
+ return cmp(self.id, other.id)
+ if cmp(self.value, other.value) != 0:
+ return cmp(self.value, other.value)
+ if local == False:
+ if self.parent == None:
+ if cmp(self.parent, other.parent) != 0:
+ return cmp(self.parent, other.parent)
+ elif self.parent.__cmp__(other.parent, local=True) != 0:
+ return self.parent.__cmp__(other.parent, local=True)
+ for sc,oc in zip(self, other):
+ if sc.__cmp__(oc, local=True) != 0:
+ return sc.__cmp__(oc, local=True)
+ return 0
+
+ def _objects_to_ids(self):
+ if self.parent != None:
+ self.parent = self.parent.id
+ for i,c in enumerate(self):
+ self[i] = c.id
+ return self
+
+ def _ids_to_objects(self, dict):
+ if self.parent != None:
+ self.parent = dict[self.parent]
+ for i,c in enumerate(self):
+ self[i] = dict[c]
+ return self
+
+class Storage (object):
+ """
+ This class declares all the methods required by a Storage
+ interface. This implementation just keeps the data in a
+ dictionary and uses pickle for persistent storage.
+ """
+ name = 'Storage'
+
+ def __init__(self, repo='/', encoding='utf-8', options=None):
+ self.repo = repo
+ self.encoding = encoding
+ self.options = options
+ self.readable = True # soft limit (user choice)
+ self._readable = True # hard limit (backend choice)
+ self.writeable = True # soft limit (user choice)
+ self._writeable = True # hard limit (backend choice)
+ self.versioned = False
+ self.can_init = True
+ self.connected = False
+
+ def __str__(self):
+ return '<%s %s %s>' % (self.__class__.__name__, id(self), self.repo)
+
+ def __repr__(self):
+ return str(self)
+
+ def version(self):
+ """Return a version string for this backend."""
+ return '0'
+
+ def storage_version(self, revision=None):
+ """Return the storage format for this backend."""
+ return libbe.storage.STORAGE_VERSION
+
+ def is_readable(self):
+ return self.readable and self._readable
+
+ def is_writeable(self):
+ return self.writeable and self._writeable
+
+ def init(self):
+ """Create a new storage repository."""
+ if self.can_init == False:
+ raise NotSupported('init',
+ 'Cannot initialize this repository format.')
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot initialize unwriteable storage.')
+ return self._init()
+
+ def _init(self):
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
+ root = Entry(id='__ROOT__', directory=True)
+ d = {root.id:root}
+ pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1)
+ f.close()
+
+ def destroy(self):
+ """Remove the storage repository."""
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot destroy unwriteable storage.')
+ return self._destroy()
+
+ def _destroy(self):
+ os.remove(os.path.join(self.repo, 'repo.pkl'))
+
+ def connect(self):
+ """Open a connection to the repository."""
+ if self.is_readable() == False:
+ raise NotReadable('Cannot connect to unreadable storage.')
+ self._connect()
+ self.connected = True
+
+ def _connect(self):
+ try:
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'rb')
+ except IOError:
+ raise ConnectionError(self)
+ d = pickle.load(f)
+ self._data = dict((k,v._ids_to_objects(d)) for k,v in d.items())
+ f.close()
+
+ def disconnect(self):
+ """Close the connection to the repository."""
+ if self.is_writeable() == False:
+ return
+ if self.connected == False:
+ return
+ self._disconnect()
+ self.connected = False
+
+ def _disconnect(self):
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
+ pickle.dump(dict((k,v._objects_to_ids())
+ for k,v in self._data.items()), f, -1)
+ f.close()
+ self._data = None
+
+ def add(self, id, *args, **kwargs):
+ """Add an entry"""
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot add entry to unwriteable storage.')
+ try: # Maybe we've already added that id?
+ self.get(id)
+ pass # yup, no need to add another
+ except InvalidID:
+ self._add(id, *args, **kwargs)
+
+ def _add(self, id, parent=None, directory=False):
+ if parent == None:
+ parent = '__ROOT__'
+ p = self._data[parent]
+ self._data[id] = Entry(id, parent=p, directory=directory)
+
+ def remove(self, *args, **kwargs):
+ """Remove an entry."""
+ if self.is_writeable() == False:
+ raise NotSupported('write',
+ 'Cannot remove entry from unwriteable storage.')
+ self._remove(*args, **kwargs)
+
+ def _remove(self, id):
+ if self._data[id].directory == True \
+ and len(self.children(id)) > 0:
+ raise DirectoryNotEmpty(id)
+ e = self._data.pop(id)
+ e.parent.remove(e)
+
+ def recursive_remove(self, *args, **kwargs):
+ """Remove an entry and all its decendents."""
+ if self.is_writeable() == False:
+ raise NotSupported('write',
+ 'Cannot remove entries from unwriteable storage.')
+ self._recursive_remove(*args, **kwargs)
+
+ def _recursive_remove(self, id):
+ for entry in reversed(list(self._data[id].traverse())):
+ self._remove(entry.id)
+
+ def children(self, *args, **kwargs):
+ """Return a list of specified entry's children's ids."""
+ if self.is_readable() == False:
+ raise NotReadable('Cannot list children with unreadable storage.')
+ return self._children(*args, **kwargs)
+
+ def _children(self, id=None, revision=None):
+ if id == None:
+ id = '__ROOT__'
+ return [c.id for c in self._data[id] if not c.id.startswith('__')]
+
+ def get(self, *args, **kwargs):
+ """
+ Get contents of and entry as they were in a given revision.
+ revision==None specifies the current revision.
+
+ If there is no id, return default, unless default is not
+ given, in which case raise InvalidID.
+ """
+ if self.is_readable() == False:
+ raise NotReadable('Cannot get entry with unreadable storage.')
+ if 'decode' in kwargs:
+ decode = kwargs.pop('decode')
+ else:
+ decode = False
+ value = self._get(*args, **kwargs)
+ if value != None:
+ if decode == True and type(value) != types.UnicodeType:
+ return unicode(value, self.encoding)
+ elif decode == False and type(value) != types.StringType:
+ return value.encode(self.encoding)
+ return value
+
+ def _get(self, id, default=InvalidObject, revision=None):
+ if id in self._data:
+ return self._data[id].value
+ elif default == InvalidObject:
+ raise InvalidID(id)
+ return default
+
+ def set(self, id, value, *args, **kwargs):
+ """
+ Set the entry contents.
+ """
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot set entry in unwriteable storage.')
+ if type(value) == types.UnicodeType:
+ value = value.encode(self.encoding)
+ self._set(id, value, *args, **kwargs)
+
+ def _set(self, id, value):
+ if id not in self._data:
+ raise InvalidID(id)
+ if self._data[id].directory == True:
+ raise InvalidDirectory(
+ 'Directory %s cannot have data' % self.parent)
+ self._data[id].value = value
+
+class VersionedStorage (Storage):
+ """
+ This class declares all the methods required by a Storage
+ interface that supports versioning. This implementation just
+ keeps the data in a list and uses pickle for persistent
+ storage.
+ """
+ name = 'VersionedStorage'
+
+ def __init__(self, *args, **kwargs):
+ Storage.__init__(self, *args, **kwargs)
+ self.versioned = True
+
+ def _init(self):
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
+ root = Entry(id='__ROOT__', directory=True)
+ summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit')
+ body = Entry(id='__COMMIT__BODY__')
+ initial_commit = {root.id:root, summary.id:summary, body.id:body}
+ d = dict((k,v._objects_to_ids()) for k,v in initial_commit.items())
+ pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree]
+ f.close()
+
+ def _connect(self):
+ try:
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'rb')
+ except IOError:
+ raise ConnectionError(self)
+ d = pickle.load(f)
+ self._data = [dict((k,v._ids_to_objects(t)) for k,v in t.items())
+ for t in d]
+ f.close()
+
+ def _disconnect(self):
+ f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
+ pickle.dump([dict((k,v._objects_to_ids())
+ for k,v in t.items()) for t in self._data], f, -1)
+ f.close()
+ self._data = None
+
+ def _add(self, id, parent=None, directory=False):
+ if parent == None:
+ parent = '__ROOT__'
+ p = self._data[-1][parent]
+ self._data[-1][id] = Entry(id, parent=p, directory=directory)
+
+ def _remove(self, id):
+ if self._data[-1][id].directory == True \
+ and len(self.children(id)) > 0:
+ raise DirectoryNotEmpty(id)
+ e = self._data[-1].pop(id)
+ e.parent.remove(e)
+
+ def _recursive_remove(self, id):
+ for entry in reversed(list(self._data[-1][id].traverse())):
+ self._remove(entry.id)
+
+ def _children(self, id=None, revision=None):
+ if id == None:
+ id = '__ROOT__'
+ if revision == None:
+ revision = -1
+ return [c.id for c in self._data[revision][id]
+ if not c.id.startswith('__')]
+
+ def _get(self, id, default=InvalidObject, revision=None):
+ if revision == None:
+ revision = -1
+ if id in self._data[revision]:
+ return self._data[revision][id].value
+ elif default == InvalidObject:
+ raise InvalidID(id)
+ return default
+
+ def _set(self, id, value):
+ if id not in self._data[-1]:
+ raise InvalidID(id)
+ self._data[-1][id].value = value
+
+ def commit(self, *args, **kwargs):
+ """
+ Commit the current repository, with a commit message string
+ summary and body. Return the name of the new revision.
+
+ If allow_empty == False (the default), raise EmptyCommit if
+ there are no changes to commit.
+ """
+ if self.is_writeable() == False:
+ raise NotWriteable('Cannot commit to unwriteable storage.')
+ return self._commit(*args, **kwargs)
+
+ def _commit(self, summary, body=None, allow_empty=False):
+ if self._data[-1] == self._data[-2] and allow_empty == False:
+ raise EmptyCommit
+ self._data[-1]["__COMMIT__SUMMARY__"].value = summary
+ self._data[-1]["__COMMIT__BODY__"].value = body
+ rev = len(self._data)-1
+ self._data.append(copy.deepcopy(self._data[-1]))
+ return rev
+
+ def revision_id(self, index=None):
+ """
+ Return the name of the <index>th revision. The choice of
+ which branch to follow when crossing branches/merges is not
+ defined. Revision indices start at 1; ID 0 is the blank
+ repository.
+
+ Return None if index==None.
+
+ If the specified revision does not exist, raise InvalidRevision.
+ """
+ if index == None:
+ return None
+ try:
+ if int(index) != index:
+ raise InvalidRevision(index)
+ except ValueError:
+ raise InvalidRevision(index)
+ L = len(self._data) - 1 # -1 b/c of initial commit
+ if index >= -L and index <= L:
+ return index % L
+ raise InvalidRevision(i)
+
+if TESTING == True:
+ class StorageTestCase (unittest.TestCase):
+ """Test cases for base Storage class."""
+
+ Class = Storage
+
+ def __init__(self, *args, **kwargs):
+ super(StorageTestCase, self).__init__(*args, **kwargs)
+ self.dirname = None
+
+ def setUp(self):
+ """Set up test fixtures for Storage test case."""
+ super(StorageTestCase, self).setUp()
+ self.dir = Dir()
+ self.dirname = self.dir.path
+ self.s = self.Class(repo=self.dirname)
+ self.assert_failed_connect()
+ self.s.init()
+ self.s.connect()
+
+ def tearDown(self):
+ super(StorageTestCase, self).tearDown()
+ self.s.disconnect()
+ self.s.destroy()
+ self.assert_failed_connect()
+ self.dir.cleanup()
+
+ def assert_failed_connect(self):
+ try:
+ self.s.connect()
+ self.fail(
+ "Connected to %(name)s repository before initialising"
+ % vars(self.Class))
+ except ConnectionError:
+ pass
+
+ class Storage_init_TestCase (StorageTestCase):
+ """Test cases for Storage.init method."""
+
+ def test_connect_should_succeed_after_init(self):
+ """Should connect after initialization."""
+ self.s.connect()
+
+ class Storage_connect_disconnect_TestCase (StorageTestCase):
+ """Test cases for Storage.connect and .disconnect methods."""
+
+ def test_multiple_disconnects(self):
+ """Should be able to call .disconnect multiple times."""
+ self.s.disconnect()
+ self.s.disconnect()
+
+ class Storage_add_remove_TestCase (StorageTestCase):
+ """Test cases for Storage.add, .remove, and .recursive_remove methods."""
+
+ def test_initially_empty(self):
+ """New repository should be empty."""
+ self.failUnless(len(self.s.children()) == 0, self.s.children())
+
+ def test_add_identical_rooted(self):
+ """
+ Adding entries with the same ID should not increase the number of children.
+ """
+ for i in range(10):
+ self.s.add('some id', directory=False)
+ s = sorted(self.s.children())
+ self.failUnless(s == ['some id'], s)
+
+ def test_add_rooted(self):
+ """
+ Adding entries should increase the number of children (rooted).
+ """
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], directory=(i % 2 == 0))
+ s = sorted(self.s.children())
+ self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+
+ def test_add_nonrooted(self):
+ """
+ Adding entries should increase the number of children (nonrooted).
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
+ s = sorted(self.s.children('parent'))
+ self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ s = self.s.children()
+ self.failUnless(s == ['parent'], s)
+
+ def test_children(self):
+ """
+ Non-UUID ids should be returned as such.
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append('parent/%s' % str(i))
+ self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
+ s = sorted(self.s.children('parent'))
+ self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+
+ def test_add_invalid_directory(self):
+ """
+ Should not be able to add children to non-directories.
+ """
+ self.s.add('parent', directory=False)
+ try:
+ self.s.add('child', 'parent', directory=False)
+ self.fail(
+ '%s.add() succeeded instead of raising InvalidDirectory'
+ % (vars(self.Class)['name']))
+ except InvalidDirectory:
+ pass
+ try:
+ self.s.add('child', 'parent', directory=True)
+ self.fail(
+ '%s.add() succeeded instead of raising InvalidDirectory'
+ % (vars(self.Class)['name']))
+ except InvalidDirectory:
+ pass
+ self.failUnless(len(self.s.children('parent')) == 0,
+ self.s.children('parent'))
+
+ def test_remove_rooted(self):
+ """
+ Removing entries should decrease the number of children (rooted).
+ """
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], directory=(i % 2 == 0))
+ for i in range(10):
+ self.s.remove(ids.pop())
+ s = sorted(self.s.children())
+ self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+
+ def test_remove_nonrooted(self):
+ """
+ Removing entries should decrease the number of children (nonrooted).
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], 'parent', directory=False)#(i % 2 == 0))
+ for i in range(10):
+ self.s.remove(ids.pop())
+ s = sorted(self.s.children('parent'))
+ self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ if len(s) > 0:
+ s = self.s.children()
+ self.failUnless(s == ['parent'], s)
+
+ def test_remove_directory_not_empty(self):
+ """
+ Removing a non-empty directory entry should raise exception.
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
+ self.s.remove(ids.pop()) # empty directory removal succeeds
+ try:
+ self.s.remove('parent') # empty directory removal succeeds
+ self.fail(
+ "%s.remove() didn't raise DirectoryNotEmpty"
+ % (vars(self.Class)['name']))
+ except DirectoryNotEmpty:
+ pass
+
+ def test_recursive_remove(self):
+ """
+ Recursive remove should empty the tree.
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], 'parent', directory=True)
+ for j in range(10): # add some grandkids
+ self.s.add(str(20*(i+1)+j), ids[-1], directory=(i%2 == 0))
+ self.s.recursive_remove('parent')
+ s = sorted(self.s.children())
+ self.failUnless(s == [], s)
+
+ class Storage_get_set_TestCase (StorageTestCase):
+ """Test cases for Storage.get and .set methods."""
+
+ id = 'unlikely id'
+ val = 'unlikely value'
+
+ def test_get_default(self):
+ """
+ Get should return specified default if id not in Storage.
+ """
+ ret = self.s.get(self.id, default=self.val)
+ self.failUnless(ret == self.val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], ret, self.val))
+
+ def test_get_default_exception(self):
+ """
+ Get should raise exception if id not in Storage and no default.
+ """
+ try:
+ ret = self.s.get(self.id)
+ self.fail(
+ "%s.get() returned %s instead of raising InvalidID"
+ % (vars(self.Class)['name'], ret))
+ except InvalidID:
+ pass
+
+ def test_get_initial_value(self):
+ """
+ Data value should be None before any value has been set.
+ """
+ self.s.add(self.id, directory=False)
+ ret = self.s.get(self.id)
+ self.failUnless(ret == None,
+ "%s.get() returned %s not None"
+ % (vars(self.Class)['name'], ret))
+
+ def test_set_exception(self):
+ """
+ Set should raise exception if id not in Storage.
+ """
+ try:
+ self.s.set(self.id, self.val)
+ self.fail(
+ "%(name)s.set() did not raise InvalidID"
+ % vars(self.Class))
+ except InvalidID:
+ pass
+
+ def test_set(self):
+ """
+ Set should define the value returned by get.
+ """
+ self.s.add(self.id, directory=False)
+ self.s.set(self.id, self.val)
+ ret = self.s.get(self.id)
+ self.failUnless(ret == self.val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], ret, self.val))
+
+ def test_unicode_set(self):
+ """
+ Set should define the value returned by get.
+ """
+ val = u'Fran\xe7ois'
+ self.s.add(self.id, directory=False)
+ self.s.set(self.id, val)
+ ret = self.s.get(self.id, decode=True)
+ self.failUnless(type(ret) == types.UnicodeType,
+ "%s.get() returned %s not UnicodeType"
+ % (vars(self.Class)['name'], type(ret)))
+ self.failUnless(ret == val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], ret, self.val))
+ ret = self.s.get(self.id)
+ self.failUnless(type(ret) == types.StringType,
+ "%s.get() returned %s not StringType"
+ % (vars(self.Class)['name'], type(ret)))
+ s = unicode(ret, self.s.encoding)
+ self.failUnless(s == val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], s, self.val))
+
+
+ class Storage_persistence_TestCase (StorageTestCase):
+ """Test cases for Storage.disconnect and .connect methods."""
+
+ id = 'unlikely id'
+ val = 'unlikely value'
+
+ def test_get_set_persistence(self):
+ """
+ Set should define the value returned by get after reconnect.
+ """
+ self.s.add(self.id, directory=False)
+ self.s.set(self.id, self.val)
+ self.s.disconnect()
+ self.s.connect()
+ ret = self.s.get(self.id)
+ self.failUnless(ret == self.val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], ret, self.val))
+
+ def test_add_nonrooted_persistence(self):
+ """
+ Adding entries should increase the number of children after reconnect.
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
+ self.s.disconnect()
+ self.s.connect()
+ s = sorted(self.s.children('parent'))
+ self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ s = self.s.children()
+ self.failUnless(s == ['parent'], s)
+
+ class VersionedStorageTestCase (StorageTestCase):
+ """Test cases for base VersionedStorage class."""
+
+ Class = VersionedStorage
+
+ class VersionedStorage_commit_TestCase (VersionedStorageTestCase):
+ """Test cases for VersionedStorage methods."""
+
+ id = 'unlikely id'
+ val = 'Some value'
+ commit_msg = 'Committing something interesting'
+ commit_body = 'Some\nlonger\ndescription\n'
+
+ def _setup_for_empty_commit(self):
+ """
+ Initialization might add some files to version control, so
+ commit those first, before testing the empty commit
+ functionality.
+ """
+ try:
+ self.s.commit('Added initialization files')
+ except EmptyCommit:
+ pass
+
+ def test_revision_id_exception(self):
+ """
+ Invalid revision id should raise InvalidRevision.
+ """
+ try:
+ rev = self.s.revision_id('highly unlikely revision id')
+ self.fail(
+ "%s.revision_id() didn't raise InvalidRevision, returned %s."
+ % (vars(self.Class)['name'], rev))
+ except InvalidRevision:
+ pass
+
+ def test_empty_commit_raises_exception(self):
+ """
+ Empty commit should raise exception.
+ """
+ self._setup_for_empty_commit()
+ try:
+ self.s.commit(self.commit_msg, self.commit_body)
+ self.fail(
+ "Empty %(name)s.commit() didn't raise EmptyCommit."
+ % vars(self.Class))
+ except EmptyCommit:
+ pass
+
+ def test_empty_commit_allowed(self):
+ """
+ Empty commit should _not_ raise exception if allow_empty=True.
+ """
+ self._setup_for_empty_commit()
+ self.s.commit(self.commit_msg, self.commit_body,
+ allow_empty=True)
+
+ def test_commit_revision_ids(self):
+ """
+ Commit / revision_id should agree on revision ids.
+ """
+ def val(i):
+ return '%s:%d' % (self.val, i+1)
+ self.s.add(self.id, directory=False)
+ revs = []
+ for i in range(10):
+ self.s.set(self.id, val(i))
+ revs.append(self.s.commit('%s: %d' % (self.commit_msg, i),
+ self.commit_body))
+ for i in range(10):
+ rev = self.s.revision_id(i+1)
+ self.failUnless(rev == revs[i],
+ "%s.revision_id(%d) returned %s not %s"
+ % (vars(self.Class)['name'], i+1, rev, revs[i]))
+ for i in range(-1, -9, -1):
+ rev = self.s.revision_id(i)
+ self.failUnless(rev == revs[i],
+ "%s.revision_id(%d) returned %s not %s"
+ % (vars(self.Class)['name'], i, rev, revs[i]))
+
+ def test_get_previous_version(self):
+ """
+ Get should be able to return the previous version.
+ """
+ def val(i):
+ return '%s:%d' % (self.val, i+1)
+ self.s.add(self.id, directory=False)
+ revs = []
+ for i in range(10):
+ self.s.set(self.id, val(i))
+ revs.append(self.s.commit('%s: %d' % (self.commit_msg, i),
+ self.commit_body))
+ for i in range(10):
+ ret = self.s.get(self.id, revision=revs[i])
+ self.failUnless(ret == val(i),
+ "%s.get() returned %s not %s for revision %s"
+ % (vars(self.Class)['name'], ret, val(i), revs[i]))
+
+ def test_get_previous_children(self):
+ """
+ Children list should be revision dependent.
+ """
+ self.s.add('parent', directory=True)
+ revs = []
+ cur_children = []
+ children = []
+ for i in range(10):
+ new_child = str(i)
+ self.s.add(new_child, 'parent', directory=(i % 2 == 0))
+ self.s.set(new_child, self.val)
+ revs.append(self.s.commit('%s: %d' % (self.commit_msg, i),
+ self.commit_body))
+ cur_children.append(new_child)
+ children.append(list(cur_children))
+ for i in range(10):
+ ret = self.s.children('parent', revision=revs[i])
+ self.failUnless(ret == children[i],
+ "%s.get() returned %s not %s for revision %s"
+ % (vars(self.Class)['name'], ret,
+ children[i], revs[i]))
+
+ def make_storage_testcase_subclasses(storage_class, namespace):
+ """Make StorageTestCase subclasses for storage_class in namespace."""
+ storage_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, StorageTestCase) \
+ and c.Class == Storage]
+
+ for base_class in storage_testcase_classes:
+ testcase_class_name = storage_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = storage_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+ def make_versioned_storage_testcase_subclasses(storage_class, namespace):
+ """Make VersionedStorageTestCase subclasses for storage_class in namespace."""
+ storage_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, StorageTestCase) \
+ and c.Class == Storage]
+
+ for base_class in storage_testcase_classes:
+ testcase_class_name = storage_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = storage_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+ make_storage_testcase_subclasses(VersionedStorage, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/util/__init__.py b/libbe/storage/util/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/libbe/storage/util/__init__.py
diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py
new file mode 100644
index 0000000..9f95d14
--- /dev/null
+++ b/libbe/storage/util/config.py
@@ -0,0 +1,93 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Create, save, and load the per-user config file at path().
+"""
+
+import ConfigParser
+import codecs
+import os.path
+
+import libbe
+import libbe.util.encoding
+if libbe.TESTING == True:
+ import doctest
+
+
+default_encoding = libbe.util.encoding.get_filesystem_encoding()
+
+def path():
+ """Return the path to the per-user config file"""
+ return os.path.expanduser("~/.bugs_everywhere")
+
+def set_val(name, value, section="DEFAULT", encoding=None):
+ """Set a value in the per-user config file
+
+ :param name: The name of the value to set
+ :param value: The new value to set (or None to delete the value)
+ :param section: The section to store the name/value in
+ """
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ if os.path.exists(path()) == False: # touch file or config
+ open(path(), 'w').close() # read chokes on missing file
+ f = codecs.open(path(), 'r', encoding)
+ config.readfp(f, path())
+ f.close()
+ if value is not None:
+ config.set(section, name, value)
+ else:
+ config.remove_option(section, name)
+ f = codecs.open(path(), 'w', encoding)
+ config.write(f)
+ f.close()
+
+def get_val(name, section="DEFAULT", default=None, encoding=None):
+ """
+ Get a value from the per-user config file
+
+ :param name: The name of the value to get
+ :section: The section that the name is in
+ :return: The value, or None
+ >>> get_val("junk") is None
+ True
+ >>> set_val("junk", "random")
+ >>> get_val("junk")
+ u'random'
+ >>> set_val("junk", None)
+ >>> get_val("junk") is None
+ True
+ """
+ if os.path.exists(path()):
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ f = codecs.open(path(), 'r', encoding)
+ config.readfp(f, path())
+ f.close()
+ try:
+ return config.get(section, name)
+ except ConfigParser.NoOptionError:
+ return default
+ else:
+ return default
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py
new file mode 100644
index 0000000..35ae1a0
--- /dev/null
+++ b/libbe/storage/util/mapfile.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Provide a means of saving and loading dictionaries of parameters. The
+saved "mapfiles" should be clear, flat-text files, and allow easy merging of
+independent/conflicting changes.
+"""
+
+import errno
+import os.path
+import types
+import yaml
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+
+class IllegalKey(Exception):
+ def __init__(self, key):
+ Exception.__init__(self, 'Illegal key "%s"' % key)
+ self.key = key
+
+class IllegalValue(Exception):
+ def __init__(self, value):
+ Exception.__init__(self, 'Illegal value "%s"' % value)
+ self.value = value
+
+class InvalidMapfileContents(Exception):
+ def __init__(self, contents):
+ Exception.__init__(self, 'Invalid YAML contents')
+ self.contents = contents
+
+def generate(map):
+ """Generate a YAML mapfile content string.
+ >>> generate({'q':'p'})
+ 'q: p\\n\\n'
+ >>> generate({'q':u'Fran\u00e7ais'})
+ 'q: Fran\\xc3\\xa7ais\\n\\n'
+ >>> generate({'q':u'hello'})
+ 'q: hello\\n\\n'
+ >>> generate({'q=':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q="
+ >>> generate({'q:':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q:"
+ >>> generate({'q\\n':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q\\n"
+ >>> generate({'':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ""
+ >>> generate({'>q':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ">q"
+ >>> generate({'q':'p\\n'})
+ Traceback (most recent call last):
+ IllegalValue: Illegal value "p\\n"
+ """
+ keys = map.keys()
+ keys.sort()
+ for key in keys:
+ try:
+ assert not key.startswith('>')
+ assert('\n' not in key)
+ assert('=' not in key)
+ assert(':' not in key)
+ assert(len(key) > 0)
+ except AssertionError:
+ raise IllegalKey(unicode(key).encode('unicode_escape'))
+ if '\n' in map[key]:
+ raise IllegalValue(unicode(map[key]).encode('unicode_escape'))
+
+ lines = []
+ for key in keys:
+ lines.append(yaml.safe_dump({key: map[key]},
+ default_flow_style=False,
+ allow_unicode=True))
+ lines.append('')
+ return '\n'.join(lines)
+
+def parse(contents):
+ """
+ Parse a YAML mapfile string.
+ >>> parse('q: p\\n\\n')['q']
+ 'p'
+ >>> parse('q: \\'p\\'\\n\\n')['q']
+ 'p'
+ >>> contents = generate({'a':'b', 'c':'d', 'e':'f'})
+ >>> dict = parse(contents)
+ >>> dict['a']
+ 'b'
+ >>> dict['c']
+ 'd'
+ >>> dict['e']
+ 'f'
+ >>> contents = generate({'q':u'Fran\u00e7ais'})
+ >>> dict = parse(contents)
+ >>> dict['q']
+ u'Fran\\xe7ais'
+ >>> dict = parse('a!')
+ Traceback (most recent call last):
+ ...
+ InvalidMapfileContents: Invalid YAML contents
+ """
+ c = yaml.load(contents)
+ if type(c) == types.StringType:
+ raise InvalidMapfileContents(
+ 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents)
+ return c or {}
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py
new file mode 100644
index 0000000..ddd7b25
--- /dev/null
+++ b/libbe/storage/util/properties.py
@@ -0,0 +1,642 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+This module provides a series of useful decorators for defining
+various types of properties. For example usage, consider the
+unittests at the end of the module.
+
+See
+ http://www.python.org/dev/peps/pep-0318/
+and
+ http://www.phyast.pitt.edu/~micheles/python/documentation.html
+for more information on decorators.
+"""
+
+import copy
+import types
+
+import libbe
+if libbe.TESTING == True:
+ import unittest
+
+
+class ValueCheckError (ValueError):
+ def __init__(self, name, value, allowed):
+ action = "in" # some list of allowed values
+ if type(allowed) == types.FunctionType:
+ action = "allowed by" # some allowed-value check function
+ msg = "%s not %s %s for %s" % (value, action, allowed, name)
+ ValueError.__init__(self, msg)
+ self.name = name
+ self.value = value
+ self.allowed = allowed
+
+def Property(funcs):
+ """
+ End a chain of property decorators, returning a property.
+ """
+ args = {}
+ args["fget"] = funcs.get("fget", None)
+ args["fset"] = funcs.get("fset", None)
+ args["fdel"] = funcs.get("fdel", None)
+ args["doc"] = funcs.get("doc", None)
+
+ #print "Creating a property with"
+ #for key, val in args.items(): print key, value
+ return property(**args)
+
+def doc_property(doc=None):
+ """
+ Add a docstring to a chain of property decorators.
+ """
+ def decorator(funcs=None):
+ """
+ Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...}
+ or a function fn() returning such a dict.
+ """
+ if hasattr(funcs, "__call__"):
+ funcs = funcs() # convert from function-arg to dict
+ funcs["doc"] = doc
+ return funcs
+ return decorator
+
+def local_property(name, null=None, mutable_null=False):
+ """
+ Define get/set access to per-parent-instance local storage. Uses
+ ._<name>_value to store the value for a particular owner instance.
+ If the ._<name>_value attribute does not exist, returns null.
+
+ If mutable_null == True, we only release deepcopies of the null to
+ the outside world.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ if mutable_null == True:
+ ret_null = copy.deepcopy(null)
+ else:
+ ret_null = null
+ value = getattr(self, "_%s_value" % name, ret_null)
+ return value
+ def _fset(self, value):
+ setattr(self, "_%s_value" % name, value)
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+def settings_property(name, null=None):
+ """
+ Similar to local_property, except where local_property stores the
+ value in instance._<name>_value, settings_property stores the
+ value in instance.settings[name].
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = self.settings.get(name, null)
+ return value
+ def _fset(self, value):
+ self.settings[name] = value
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+
+# Allow comparison and caching with _original_ values for mutables,
+# since
+#
+# >>> a = []
+# >>> b = a
+# >>> b.append(1)
+# >>> a
+# [1]
+# >>> a==b
+# True
+def _hash_mutable_value(value):
+ return repr(value)
+def _init_mutable_property_cache(self):
+ if not hasattr(self, "_mutable_property_cache_hash"):
+ # first call to _fget for any mutable property
+ self._mutable_property_cache_hash = {}
+ self._mutable_property_cache_copy = {}
+def _set_cached_mutable_property(self, cacher_name, property_name, value):
+ _init_mutable_property_cache(self)
+ self._mutable_property_cache_hash[(cacher_name, property_name)] = \
+ _hash_mutable_value(value)
+ self._mutable_property_cache_copy[(cacher_name, property_name)] = \
+ copy.deepcopy(value)
+def _get_cached_mutable_property(self, cacher_name, property_name, default=None):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_copy:
+ return default
+ return self._mutable_property_cache_copy[(cacher_name, property_name)]
+def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_hash:
+ _set_cached_mutable_property(self, cacher_name, property_name, default)
+ old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)]
+ return cmp(_hash_mutable_value(value), old_hash)
+
+
+def defaulting_property(default=None, null=None,
+ mutable_default=False):
+ """
+ Define a default value for get access to a property.
+ If the stored value is null, then default is returned.
+
+ If mutable_default == True, we only release deepcopies of the
+ default to the outside world.
+
+ null should never escape to the outside world, so don't worry
+ about it being a mutable.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value == null:
+ if mutable_default == True:
+ return copy.deepcopy(default)
+ else:
+ return default
+ return value
+ def _fset(self, value):
+ if value == default:
+ value = null
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def fn_checked_property(value_allowed_fn):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ return value
+ def _fset(self, value):
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def checked_property(allowed=[]):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ return value
+ def _fset(self, value):
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def cached_property(generator, initVal=None, mutable=False):
+ """
+ Allow caching of values generated by generator(instance), where
+ instance is the instance to which this property belongs. Uses
+ ._<name>_cache to store a cache flag for a particular owner
+ instance.
+
+ When the cache flag is True or missing and the stored value is
+ initVal, the first fget call triggers the generator function,
+ whose output is stored in _<name>_cached_value. That and
+ subsequent calls to fget will return this cached value.
+
+ If the input value is no longer initVal (e.g. a value has been
+ loaded from disk or set with fset), that value overrides any
+ cached value, and this property has no effect.
+
+ When the cache flag is False and the stored value is initVal, the
+ generator is not cached, but is called on every fget.
+
+ The cache flag is missing on initialization. Particular instances
+ may override by setting their own flag.
+
+ In the case that mutable == True, all caching is disabled and the
+ generator is called whenever the cached value would otherwise be
+ used.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ cache = getattr(self, "_%s_cache" % name, True)
+ value = fget(self)
+ if value == initVal:
+ if cache == True and mutable == False:
+ if hasattr(self, "_%s_cached_value" % name):
+ value = getattr(self, "_%s_cached_value" % name)
+ else:
+ value = generator(self)
+ setattr(self, "_%s_cached_value" % name, value)
+ else:
+ value = generator(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def primed_property(primer, initVal=None):
+ """
+ Just like a cached_property, except that instead of returning a
+ new value and running fset to cache it, the primer performs some
+ background manipulation (e.g. loads data into instance.settings)
+ such that a _second_ pass through fget succeeds.
+
+ The 'cache' flag becomes a 'prime' flag, with priming taking place
+ whenever ._<name>_prime is True, or is False or missing and
+ value == initVal.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ prime = getattr(self, "_%s_prime" % name, False)
+ if prime == False:
+ value = fget(self)
+ if prime == True or (prime == False and value == initVal):
+ primer(self)
+ value = fget(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def change_hook_property(hook, mutable=False, default=None):
+ """
+ Call the function hook(instance, old_value, new_value) whenever a
+ value different from the current value is set (instance is a a
+ reference to the class instance to which this property belongs).
+ This is useful for saving changes to disk, etc. This function is
+ called _after_ the new value has been stored, allowing you to
+ change the stored value if you want.
+
+ In the case of mutables, things are slightly trickier. Because
+ the property-owning class has no way of knowing when the value
+ changes. We work around this by caching a private deepcopy of the
+ mutable value, and checking for changes whenever the property is
+ set (obviously) or retrieved (to check for external changes). So
+ long as you're conscientious about accessing the property after
+ making external modifications, mutability won't be a problem.
+ t.x.append(5) # external modification
+ t.x # dummy access notices change and triggers hook
+ See testChangeHookMutableProperty for an example of the expected
+ behavior.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self, new_value=None, from_fset=False): # only used if mutable == True
+ if from_fset == True:
+ value = new_value # compare new value with cached
+ else:
+ value = fget(self) # compare current value with cached
+ if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0:
+ # there has been a change, cache new value
+ old_value = _get_cached_mutable_property(self, "change hook property", name, default)
+ _set_cached_mutable_property(self, "change hook property", name, value)
+ if from_fset == True: # return previously cached value
+ value = old_value
+ else: # the value changed while we weren't looking
+ hook(self, old_value, value)
+ return value
+ def _fset(self, value):
+ if mutable == True: # get cached previous value
+ old_value = _fget(self, new_value=value, from_fset=True)
+ else:
+ old_value = fget(self)
+ fset(self, value)
+ if value != old_value:
+ hook(self, old_value, value)
+ if mutable == True:
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+if libbe.TESTING == True:
+ class DecoratorTests(unittest.TestCase):
+ def testLocalDoc(self):
+ class Test(object):
+ @Property
+ @doc_property("A fancy property")
+ def x():
+ return {}
+ self.failUnless(Test.x.__doc__ == "A fancy property",
+ Test.x.__doc__)
+ def testLocalProperty(self):
+ class Test(object):
+ @Property
+ @local_property(name="LOCAL")
+ def x():
+ return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("_LOCAL_value" in dir(t), dir(t))
+ self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ def testSettingsProperty(self):
+ class Test(object):
+ @Property
+ @settings_property(name="attr")
+ def x():
+ return {}
+ def __init__(self):
+ self.settings = {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("attr" in t.settings, t.settings)
+ self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ def testDefaultingLocalProperty(self):
+ class Test(object):
+ @Property
+ @defaulting_property(default='y', null='x')
+ @local_property(name="DEFAULT", null=5)
+ def x(): return {}
+ t = Test()
+ self.failUnless(t.x == 5, str(t.x))
+ t.x = 'x'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'y'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'z'
+ self.failUnless(t.x == 'z', str(t.x))
+ t.x = 5
+ self.failUnless(t.x == 5, str(t.x))
+ def testCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testTwoCheckedLocalProperties(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="X")
+ def x(): return {}
+
+ @Property
+ @checked_property(allowed=['a', 'b', 'c'])
+ @local_property(name="A")
+ def a(): return {}
+ def __init__(self):
+ self._A_value = 'a'
+ self._X_value = 'x'
+ t = Test()
+ try:
+ t.x = 'a'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.x = 'x'
+ t.x = 'y'
+ t.x = 'z'
+ try:
+ t.a = 'x'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.a = 'a'
+ t.a = 'b'
+ t.a = 'c'
+ def testFnCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @fn_checked_property(lambda v : v in ['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testCachedLocalProperty(self):
+ class Gen(object):
+ def __init__(self):
+ self.i = 0
+ def __call__(self, owner):
+ self.i += 1
+ return self.i
+ class Test(object):
+ @Property
+ @cached_property(generator=Gen(), initVal=None)
+ @local_property(name="CACHED")
+ def x(): return {}
+ t = Test()
+ self.failIf("_CACHED_cache" in dir(t),
+ getattr(t, "_CACHED_cache", None))
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ t.x = 8
+ self.failUnless(t.x == 8, t.x)
+ self.failUnless(t.x == 8, t.x)
+ t._CACHED_cache = False # Caching is off, but the stored value
+ val = t.x # is 8, not the initVal (None), so we
+ self.failUnless(val == 8, val) # get 8.
+ t._CACHED_value = None # Now we've set the stored value to None
+ val = t.x # so future calls to fget (like this)
+ self.failUnless(val == 2, val) # will call the generator every time...
+ val = t.x
+ self.failUnless(val == 3, val)
+ val = t.x
+ self.failUnless(val == 4, val)
+ t._CACHED_cache = True # We turn caching back on, and get
+ self.failUnless(t.x == 1, str(t.x)) # the original cached value.
+ del t._CACHED_cached_value # Removing that value forces a
+ self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
+ self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
+ self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
+ def testPrimedLocalProperty(self):
+ class Test(object):
+ def prime(self):
+ self.settings["PRIMED"] = "initialized"
+ @Property
+ @primed_property(primer=prime, initVal=None)
+ @settings_property(name="PRIMED")
+ def x(): return {}
+ def __init__(self):
+ self.settings={}
+ t = Test()
+ self.failIf("_PRIMED_prime" in dir(t),
+ getattr(t, "_PRIMED_prime", None))
+ self.failUnless(t.x == "initialized", t.x)
+ t.x = 1
+ self.failUnless(t.x == 1, t.x)
+ t.x = None
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = True
+ t.x = 3
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = False
+ t.x = 3
+ self.failUnless(t.x == 3, t.x)
+ def testChangeHookLocalProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+
+ @Property
+ @change_hook_property(_hook)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 2
+ self.failUnless(t.old == 1, t.old)
+ self.failUnless(t.new == 2, t.new)
+ def testChangeHookMutableProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+ self.hook_calls += 1
+
+ @Property
+ @change_hook_property(_hook, mutable=True)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.hook_calls = 0
+ t.x = []
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 1, t.hook_calls)
+ a = t.x
+ a.append(5)
+ t.x = a
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 2, t.hook_calls)
+ t.x = []
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # now append without reassigning. this doesn't trigger the
+ # change, since we don't ever set t.x, only get it and mess
+ # with it. It does, however, update our t.new, since t.new =
+ # t.x and is not a static copy.
+ t.x.append(5)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # however, the next t.x get _will_ notice the change...
+ a = t.x
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ t.x.append(6) # this append(6) is not noticed yet
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5,6], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ # this append(7) is not noticed, but the t.x get causes the
+ # append(6) to be noticed
+ t.x.append(7)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 5, t.hook_calls)
+ a = t.x # now the append(7) is noticed
+ self.failUnless(t.old == [5,6], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 6, t.hook_calls)
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py
new file mode 100644
index 0000000..8b86829
--- /dev/null
+++ b/libbe/storage/util/settings_object.py
@@ -0,0 +1,432 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+This module provides a base class implementing settings-dict based
+property storage useful for BE objects with saved properties
+(e.g. BugDir, Bug, Comment). For example usage, consider the
+unittests at the end of the module.
+"""
+
+import libbe
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+class _Token (object):
+ """
+ `Control' value class for properties. We want values that only
+ mean something to the settings_object module.
+ """
+ pass
+
+class UNPRIMED (_Token):
+ "Property has not been primed."
+ pass
+
+class EMPTY (_Token):
+ """
+ Property has been primed but has no user-set value, so use
+ default/generator value.
+ """
+ pass
+
+
+def prop_save_settings(self, old, new):
+ """
+ The default action undertaken when a property changes.
+ """
+ if self.storage != None and self.storage.is_writeable():
+ self.save_settings()
+
+def prop_load_settings(self):
+ """
+ The default action undertaken when an UNPRIMED property is accessed.
+ """
+ if self.storage != None and self.storage.is_readable() \
+ and self._settings_loaded==False:
+ self.load_settings()
+ else:
+ self._setup_saved_settings(flag_as_loaded=False)
+
+# Some name-mangling routines for pretty printing setting names
+def setting_name_to_attr_name(self, name):
+ """
+ Convert keys to the .settings dict into their associated
+ SavedSettingsObject attribute names.
+ >>> print setting_name_to_attr_name(None,"User-id")
+ user_id
+ """
+ return name.lower().replace('-', '_')
+
+def attr_name_to_setting_name(self, name):
+ """
+ The inverse of setting_name_to_attr_name.
+ >>> print attr_name_to_setting_name(None, "user_id")
+ User-id
+ """
+ return name.capitalize().replace('_', '-')
+
+
+def versioned_property(name, doc,
+ default=None, generator=None,
+ change_hook=prop_save_settings,
+ mutable=False,
+ primer=prop_load_settings,
+ allowed=None, check_fn=None,
+ settings_properties=[],
+ required_saved_properties=[],
+ require_save=False):
+ """
+ Combine the common decorators in a single function.
+
+ Use zero or one (but not both) of default or generator, since a
+ working default will keep the generator from functioning. Use the
+ default if you know what you want the default value to be at
+ 'coding time'. Use the generator if you can write a function to
+ determine a valid default at run time. If both default and
+ generator are None, then the property will be a defaulting
+ property which defaults to None.
+
+ allowed and check_fn have a similar relationship, although you can
+ use both of these if you want. allowed compares the proposed
+ value against a list determined at 'coding time' and check_fn
+ allows more flexible comparisons to take place at run time.
+
+ Set require_save to True if you want to save the default/generated
+ value for a property, to protect against future changes. E.g., we
+ currently expect all comments to be 'text/plain' but in the future
+ we may want to default to 'text/html'. If we don't want the old
+ comments to be interpreted as 'text/html', we would require that
+ the content type be saved.
+
+ change_hook, primer, settings_properties, and
+ required_saved_properties are only options to get their defaults
+ into our local scope. Don't mess with them.
+
+ Set mutable=True if:
+ * default is a mutable
+ * your generator function may return mutables
+ * you set change_hook and might have mutable property values
+ See the docstrings in libbe.properties for details on how each of
+ these cases are handled.
+ """
+ settings_properties.append(name)
+ if require_save == True:
+ required_saved_properties.append(name)
+ def decorator(funcs):
+ fulldoc = doc
+ if default != None or generator == None:
+ defaulting = defaulting_property(default=default, null=EMPTY,
+ mutable_default=mutable)
+ fulldoc += "\n\nThis property defaults to %s." % default
+ if generator != None:
+ cached = cached_property(generator=generator, initVal=EMPTY,
+ mutable=mutable)
+ fulldoc += "\n\nThis property is generated with %s." % generator
+ if check_fn != None:
+ fn_checked = fn_checked_property(value_allowed_fn=check_fn)
+ fulldoc += "\n\nThis property is checked with %s." % check_fn
+ if allowed != None:
+ checked = checked_property(allowed=allowed)
+ fulldoc += "\n\nThe allowed values for this property are: %s." \
+ % (', '.join(allowed))
+ hooked = change_hook_property(hook=change_hook, mutable=mutable,
+ default=EMPTY)
+ primed = primed_property(primer=primer, initVal=UNPRIMED)
+ settings = settings_property(name=name, null=UNPRIMED)
+ docp = doc_property(doc=fulldoc)
+ deco = hooked(primed(settings(docp(funcs))))
+ if default != None or generator == None:
+ deco = defaulting(deco)
+ if generator != None:
+ deco = cached(deco)
+ if check_fn != None:
+ deco = fn_checked(deco)
+ if allowed != None:
+ deco = checked(deco)
+ return Property(deco)
+ return decorator
+
+class SavedSettingsObject(object):
+
+ # Keep a list of properties that may be stored in the .settings dict.
+ #settings_properties = []
+
+ # A list of properties that we save to disk, even if they were
+ # never set (in which case we save the default value). This
+ # protects against future changes in default values.
+ #required_saved_properties = []
+
+ _setting_name_to_attr_name = setting_name_to_attr_name
+ _attr_name_to_setting_name = attr_name_to_setting_name
+
+ def __init__(self):
+ self._settings_loaded = False
+ self.storage = None
+ self.settings = {}
+
+ def load_settings(self):
+ """Load the settings from disk."""
+ # Override. Must call ._setup_saved_settings() after loading.
+ self.settings = {}
+ self._setup_saved_settings()
+
+ def _setup_saved_settings(self, flag_as_loaded=True):
+ """
+ To be run after setting self.settings up from disk. Marks all
+ settings as primed.
+ """
+ for property in self.settings_properties:
+ if property not in self.settings \
+ or self.settings[property] == UNPRIMED:
+ self.settings[property] = EMPTY
+ if flag_as_loaded == True:
+ self._settings_loaded = True
+
+ def save_settings(self):
+ """Load the settings from disk."""
+ # Override. Should save the dict output of ._get_saved_settings()
+ settings = self._get_saved_settings()
+ pass # write settings to disk....
+
+ def _get_saved_settings(self):
+ settings = {}
+ for k,v in self.settings.items():
+ if v != None and v != EMPTY:
+ settings[k] = v
+ for k in self.required_saved_properties:
+ settings[k] = getattr(self, self._setting_name_to_attr_name(k))
+ return settings
+
+ def clear_cached_setting(self, setting=None):
+ "If setting=None, clear *all* cached settings"
+ if setting != None:
+ if hasattr(self, "_%s_cached_value" % setting):
+ delattr(self, "_%s_cached_value" % setting)
+ else:
+ for setting in settings_properties:
+ self.clear_cached_setting(setting)
+
+
+if libbe.TESTING == True:
+ class SavedSettingsObjectTests(unittest.TestCase):
+ def testSimpleProperty(self):
+ """Testing a minimal versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ # access missing setting
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(len(t.settings) == 0, len(t.settings))
+ self.failUnless(t.content_type == None, t.content_type)
+ # accessing t.content_type triggers the priming, which runs
+ # t._setup_saved_settings, which fills out t.settings with
+ # EMPTY data. t._settings_loaded is still false though, since
+ # the default priming does not do any of the `official' loading
+ # that occurs in t.load_settings.
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ # load settings creates an EMPTY value in the settings array
+ t.load_settings()
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ # now we set a value
+ t.content_type = 5
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == 5, t.content_type)
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ # now we set another value
+ t.content_type = "text/plain"
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/plain",
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ # now we clear to the post-primed value
+ t.content_type = EMPTY
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ def testDefaultingProperty(self):
+ """Testing a defaulting versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ t.load_settings()
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == {},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t.content_type == "text/html",
+ t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/html",
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testRequiredDefaultingProperty(self):
+ """Testing a required defaulting versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties,
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testClassVersionedPropertyDefinition(self):
+ """Testing a class-specific _versioned property decorator"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ def _versioned_property(settings_properties= \
+ settings_properties,
+ required_saved_properties= \
+ required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"] = \
+ required_saved_properties
+ return versioned_property(**kwargs)
+ @_versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testMutableChangeHookedProperty(self):
+ """Testing a mutable change-hooked property"""
+ SAVES = []
+ def prop_log_save_settings(self, old, new, saves=SAVES):
+ saves.append("'%s' -> '%s'" % (str(old), str(new)))
+ prop_save_settings(self, old, new)
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="List-type",
+ doc="A test property",
+ mutable=True,
+ change_hook=prop_log_save_settings,
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties)
+ def list_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ t.load_settings()
+ self.failUnless(SAVES == [], SAVES)
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.list_type == None, t.list_type)
+ self.failUnless(SAVES == [], SAVES)
+ self.failUnless(t.settings["List-type"]==EMPTY,
+ t.settings["List-type"])
+ t.list_type = []
+ self.failUnless(t.settings["List-type"] == [],
+ t.settings["List-type"])
+ self.failUnless(SAVES == [
+ "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'"
+ ], SAVES)
+ t.list_type.append(5)
+ self.failUnless(SAVES == [
+ "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'",
+ ], SAVES)
+ self.failUnless(t.settings["List-type"] == [5],
+ t.settings["List-type"])
+ self.failUnless(SAVES == [ # the append(5) has not yet been saved
+ "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'",
+ ], SAVES)
+ self.failUnless(t.list_type == [5], t.list_type)#get triggers saved
+
+ self.failUnless(SAVES == [ # now the append(5) has been saved.
+ "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'",
+ "'[]' -> '[5]'"
+ ], SAVES)
+
+ unitsuite = unittest.TestLoader().loadTestsFromTestCase( \
+ SavedSettingsObjectTests)
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py
new file mode 100644
index 0000000..20ef1e4
--- /dev/null
+++ b/libbe/storage/util/upgrade.py
@@ -0,0 +1,331 @@
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Handle conversion between the various BE storage formats.
+"""
+
+import codecs
+import os, os.path
+import sys
+
+import libbe
+import libbe.bug
+import libbe.storage.util.mapfile as mapfile
+from libbe.storage import STORAGE_VERSIONS, STORAGE_VERSION
+#import libbe.storage.vcs # delay import to avoid cyclic dependency
+import libbe.ui.util.editor
+import libbe.util
+import libbe.util.encoding as encoding
+import libbe.util.id
+
+
+class Upgrader (object):
+ "Class for converting between different on-disk BE storage formats."
+ initial_version = None
+ final_version = None
+ def __init__(self, repo):
+ import libbe.storage.vcs
+
+ self.repo = repo
+ vcs_name = self._get_vcs_name()
+ if vcs_name == None:
+ vcs_name = 'None'
+ self.vcs = libbe.storage.vcs.vcs_by_name(vcs_name)
+ self.vcs.repo = self.repo
+ self.vcs.root()
+
+ def get_path(self, *args):
+ """
+ Return the absolute path using args relative to .be.
+ """
+ dir = os.path.join(self.repo, '.be')
+ if len(args) == 0:
+ return dir
+ return os.path.join(dir, *args)
+
+ def _get_vcs_name(self):
+ return None
+
+ def check_initial_version(self):
+ path = self.get_path('version')
+ version = encoding.get_file_contents(path, decode=True).rstrip('\n')
+ assert version == self.initial_version, '%s: %s' % (path, version)
+
+ def set_version(self):
+ path = self.get_path('version')
+ encoding.set_file_contents(path, self.final_version+'\n')
+ self.vcs._vcs_update(path)
+
+ def upgrade(self):
+ print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \
+ % (self.initial_version, self.final_version)
+ self.check_initial_version()
+ self.set_version()
+ self._upgrade()
+
+ def _upgrade(self):
+ raise NotImplementedError
+
+
+class Upgrade_1_0_to_1_1 (Upgrader):
+ initial_version = "Bugs Everywhere Tree 1 0"
+ final_version = "Bugs Everywhere Directory v1.1"
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = encoding.get_file_contents(path)
+ for line in settings.splitlines(False):
+ fields = line.split('=')
+ if len(fields) == 2 and fields[0] == 'rcs_name':
+ return fields[1]
+ return None
+
+ def _upgrade_mapfile(self, path):
+ contents = encoding.get_file_contents(path, decode=True)
+ old_format = False
+ for line in contents.splitlines():
+ if len(line.split('=')) == 2:
+ old_format = True
+ break
+ if old_format == True:
+ # translate to YAML.
+ newlines = []
+ for line in contents.splitlines():
+ line = line.rstrip('\n')
+ if len(line) == 0:
+ continue
+ fields = line.split("=")
+ if len(fields) == 2:
+ key,value = fields
+ newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
+ else:
+ newlines.append(line)
+ contents = '\n'.join(newlines)
+ # load the YAML and save
+ map = mapfile.parse(contents)
+ contents = mapfile.generate(map)
+ encoding.set_file_contents(path, contents)
+ self.vcs._vcs_update(path)
+
+ def _upgrade(self):
+ """
+ Comment value field "From" -> "Author".
+ Homegrown mapfile -> YAML.
+ """
+ path = self.get_path('settings')
+ self._upgrade_mapfile(path)
+ for bug_uuid in os.listdir(self.get_path('bugs')):
+ path = self.get_path('bugs', bug_uuid, 'values')
+ self._upgrade_mapfile(path)
+ c_path = ['bugs', bug_uuid, 'comments']
+ if not os.path.exists(self.get_path(*c_path)):
+ continue # no comments for this bug
+ for comment_uuid in os.listdir(self.get_path(*c_path)):
+ path_list = c_path + [comment_uuid, 'values']
+ path = self.get_path(*path_list)
+ self._upgrade_mapfile(path)
+ settings = mapfile.parse(
+ encoding.get_file_contents(path))
+ if 'From' in settings:
+ settings['Author'] = settings.pop('From')
+ encoding.set_file_contents(
+ path, mapfile.generate(settings))
+ self.vcs._vcs_update(path)
+
+
+class Upgrade_1_1_to_1_2 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.1"
+ final_version = "Bugs Everywhere Directory v1.2"
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'rcs_name' in settings:
+ return settings['rcs_name']
+ return None
+
+ def _upgrade(self):
+ """
+ BugDir settings field "rcs_name" -> "vcs_name".
+ """
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'rcs_name' in settings:
+ settings['vcs_name'] = settings.pop('rcs_name')
+ encoding.set_file_contents(path, mapfile.generate(settings))
+ self.vcs._vcs_update(path)
+
+class Upgrade_1_2_to_1_3 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.2"
+ final_version = "Bugs Everywhere Directory v1.3"
+ def __init__(self, *args, **kwargs):
+ Upgrader.__init__(self, *args, **kwargs)
+ self._targets = {} # key: target text,value: new target bug
+
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'vcs_name' in settings:
+ return settings['vcs_name']
+ return None
+
+ def _save_bug_settings(self, bug):
+ # The target bugs don't have comments
+ path = self.get_path('bugs', bug.uuid, 'values')
+ if not os.path.exists(path):
+ self.vcs._add_path(path, directory=False)
+ path = self.get_path('bugs', bug.uuid, 'values')
+ mf = mapfile.generate(bug._get_saved_settings())
+ encoding.set_file_contents(path, mf)
+ self.vcs._vcs_update(path)
+
+ def _target_bug(self, target_text):
+ if target_text not in self._targets:
+ bug = libbe.bug.Bug(summary=target_text)
+ bug.severity = 'target'
+ self._targets[target_text] = bug
+ return self._targets[target_text]
+
+ def _upgrade_bugdir_mapfile(self):
+ path = self.get_path('settings')
+ mf = encoding.get_file_contents(path)
+ if mf == libbe.util.InvalidObject:
+ return # settings file does not exist
+ settings = mapfile.parse(mf)
+ if 'target' in settings:
+ settings['target'] = self._target_bug(settings['target']).uuid
+ mf = mapfile.generate(settings)
+ encoding.set_file_contents(path, mf)
+ self.vcs._vcs_update(path)
+
+ def _upgrade_bug_mapfile(self, bug_uuid):
+ import libbe.command.depend as dep
+ path = self.get_path('bugs', bug_uuid, 'values')
+ mf = encoding.get_file_contents(path)
+ if mf == libbe.util.InvalidObject:
+ return # settings file does not exist
+ settings = mapfile.parse(mf)
+ if 'target' in settings:
+ target_bug = self._target_bug(settings['target'])
+
+ blocked_by_string = '%s%s' % (dep.BLOCKED_BY_TAG, bug_uuid)
+ dep._add_remove_extra_string(target_bug, blocked_by_string, add=True)
+ blocks_string = dep._generate_blocks_string(target_bug)
+ estrs = settings.get('extra_strings', [])
+ estrs.append(blocks_string)
+ settings['extra_strings'] = sorted(estrs)
+
+ settings.pop('target')
+ mf = mapfile.generate(settings)
+ encoding.set_file_contents(path, mf)
+ self.vcs._vcs_update(path)
+
+ def _upgrade(self):
+ """
+ Bug value field "target" -> target bugs.
+ Bugdir value field "target" -> pointer to current target bug.
+ """
+ for bug_uuid in os.listdir(self.get_path('bugs')):
+ self._upgrade_bug_mapfile(bug_uuid)
+ self._upgrade_bugdir_mapfile()
+ for bug in self._targets.values():
+ self._save_bug_settings(bug)
+
+class Upgrade_1_3_to_1_4 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.3"
+ final_version = "Bugs Everywhere Directory v1.4"
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'vcs_name' in settings:
+ return settings['vcs_name']
+ return None
+
+ def _upgrade(self):
+ """
+ add new directory "./be/BUGDIR-UUID"
+ "./be/bugs" -> "./be/BUGDIR-UUID/bugs"
+ "./be/settings" -> "./be/BUGDIR-UUID/settings"
+ """
+ self.repo = os.path.abspath(self.repo)
+ basenames = [p for p in os.listdir(self.get_path())]
+ if not 'bugs' in basenames and not 'settings' in basenames \
+ and len([p for p in basenames if len(p)==36]) == 1:
+ return # the user has upgraded the directory.
+ basenames = [p for p in basenames if p in ['bugs','settings']]
+ uuid = libbe.util.id.uuid_gen()
+ add = [self.get_path(uuid)]
+ move = [(self.get_path(p), self.get_path(uuid, p)) for p in basenames]
+ msg = ['Upgrading BE directory version v1.3 to v1.4',
+ '',
+ "Because BE's VCS drivers don't support 'move',",
+ 'please make the following changes with your VCS',
+ 'and re-run BE. Note that you can choose a different',
+ 'bugdir UUID to preserve uniformity across branches',
+ 'of a distributed repository.'
+ '',
+ 'add',
+ ' ' + '\n '.join(add),
+ 'move',
+ ' ' + '\n '.join(['%s %s' % (a,b) for a,b in move]),
+ ]
+ self.vcs._cached_path_id.destroy()
+ raise Exception('Need user assistance\n%s' % '\n'.join(msg))
+
+
+upgraders = [Upgrade_1_0_to_1_1,
+ Upgrade_1_1_to_1_2,
+ Upgrade_1_2_to_1_3,
+ Upgrade_1_3_to_1_4]
+upgrade_classes = {}
+for upgrader in upgraders:
+ upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
+
+def upgrade(path, current_version,
+ target_version=STORAGE_VERSION):
+ """
+ Call the appropriate upgrade function to convert current_version
+ to target_version. If a direct conversion function does not exist,
+ use consecutive conversion functions.
+ """
+ if current_version not in STORAGE_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % current_version
+ if target_version not in STORAGE_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % current_version
+
+ if (current_version, target_version) in upgrade_classes:
+ # direct conversion
+ upgrade_class = upgrade_classes[(current_version, target_version)]
+ u = upgrade_class(path)
+ u.upgrade()
+ else:
+ # consecutive single-step conversion
+ i = STORAGE_VERSIONS.index(current_version)
+ while True:
+ version_a = STORAGE_VERSIONS[i]
+ version_b = STORAGE_VERSIONS[i+1]
+ try:
+ upgrade_class = upgrade_classes[(version_a, version_b)]
+ except KeyError:
+ raise NotImplementedError, \
+ "Cannot convert version '%s' to '%s' yet." \
+ % (version_a, version_b)
+ u = upgrade_class(path)
+ u.upgrade()
+ if version_b == target_version:
+ break
+ i += 1
diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py
new file mode 100644
index 0000000..ddfb00a
--- /dev/null
+++ b/libbe/storage/vcs/__init__.py
@@ -0,0 +1,10 @@
+# Copyright
+
+import base
+
+set_preferred_vcs = base.set_preferred_vcs
+vcs_by_name = base.vcs_by_name
+detect_vcs = base.detect_vcs
+installed_vcs = base.installed_vcs
+
+__all__ = [set_preferred_vcs, vcs_by_name, detect_vcs, installed_vcs]
diff --git a/libbe/storage/vcs/arch.py b/libbe/storage/vcs/arch.py
new file mode 100644
index 0000000..f1b5b7b
--- /dev/null
+++ b/libbe/storage/vcs/arch.py
@@ -0,0 +1,350 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# James Rowe <jnrowe@ukfsn.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+GNU Arch (tla) backend.
+"""
+
+import codecs
+import os
+import re
+import shutil
+import sys
+import time
+
+import libbe
+import libbe.ui.util.user
+import libbe.storage.util.config
+from libbe.util.id import uuid_gen
+import base
+
+if libbe.TESTING == True:
+ import unittest
+ import doctest
+
+
+class CantAddFile(Exception):
+ def __init__(self, file):
+ self.file = file
+ Exception.__init__(self, "Can't automatically add file %s" % file)
+
+DEFAULT_CLIENT = 'tla'
+
+client = libbe.storage.util.config.get_val(
+ 'arch_client', default=DEFAULT_CLIENT)
+
+def new():
+ return Arch()
+
+class Arch(base.VCS):
+ name = 'arch'
+ client = client
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser('~/.arch-params')
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+ self.interspersed_vcs_files = True
+ self.paranoid = False
+
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client('--version')
+ return output
+
+ def _vcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, '{arch}') != None :
+ libbe.storage.util.config.set_val('arch_client', client)
+ return True
+ return False
+
+ def _vcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+
+ def _create_archive(self, path):
+ """
+ Create a temporary Arch archive in the directory PATH. This
+ archive will be removed by
+ destroy->_vcs_destroy->_remove_archive
+ """
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = libbe.ui.util.user.parse_user_id(id)
+ if email == None:
+ email = '%s@example.com' % name
+ trailer = '%s-%s' % ('bugs-everywhere-auto', uuid_gen()[0:8])
+ self._archive_name = '%s--%s' % (email, trailer)
+ self._archive_dir = '/tmp/%s' % trailer
+ self._tmp_archive = True
+ self._u_invoke_client('make-archive', self._archive_name,
+ self._archive_dir, cwd=path)
+
+ def _invoke_client(self, *args, **kwargs):
+ """
+ Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
+ else:
+ tailargs = []
+ arglist = [command, '-A', self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ '=locations', self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+
+ def _create_project(self, path):
+ """
+ Create a temporary Arch project in the directory PATH. This
+ project will be removed by
+ destroy->_vcs_destroy->_remove_project
+ """
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = 'bugs-everywhere'
+ branch = 'mainline'
+ version = '0.1'
+ self._project_name = '%s--%s--%s' % (category, branch, version)
+ self._invoke_client('archive-setup', self._project_name,
+ cwd=path)
+ self._tmp_project = True
+
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return '%s/%s' % (self._archive_name, self._project_name)
+
+ def _adjust_naming_conventions(self, path):
+ """
+ By default, Arch restricts source code filenames to
+ ^[_=a-zA-Z0-9].*$
+ See
+ http://regexps.srparish.net/tutorial-tla/naming-conventions.html
+ Since our bug directory '.be' doesn't satisfy these conventions,
+ we need to adjust them.
+
+ The conventions are specified in
+ project-root/{arch}/=tagging-method
+ """
+ tagpath = os.path.join(path, '{arch}', '=tagging-method')
+ lines_out = []
+ f = codecs.open(tagpath, 'r', self.encoding)
+ for line in f:
+ if line.startswith('source '):
+ lines_out.append('source ^[._=a-zA-X0-9].*$\n')
+ else:
+ lines_out.append(line)
+ f.close()
+ f = codecs.open(tagpath, 'w', self.encoding)
+ f.write(''.join(lines_out))
+ f.close()
+
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-source.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html
+ self._invoke_client('init-tree', self._project_name,
+ cwd=path)
+ self._adjust_naming_conventions(path)
+ self._invoke_client('import', '--summary', 'Began versioning',
+ cwd=path)
+
+ def _vcs_destroy(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+ vcs_dir = os.path.join(self.repo, '{arch}')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+ self._archive_name = None
+
+ def _vcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
+ else:
+ dirname = path
+ status,output,error = self._u_invoke_client('tree-root', dirname)
+ root = output.rstrip('\n')
+
+ self._get_archive_project_name(root)
+
+ return root
+
+ def _get_archive_name(self, root):
+ status,output,error = self._u_invoke_client('archives')
+ lines = output.split('\n')
+ # e.g. output:
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
+ # /tmp/BEtestXXXXXX/rootdir
+ # (+ repeats)
+ for archive,location in zip(lines[::2], lines[1::2]):
+ if os.path.realpath(location) == os.path.realpath(root):
+ self._archive_name = archive
+ assert self._archive_name != None
+
+ def _get_archive_project_name(self, root):
+ # get project names
+ status,output,error = self._u_invoke_client('tree-version', cwd=root)
+ # e.g output
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
+ archive_name,project_name = output.rstrip('\n').split('/')
+ self._archive_name = archive_name
+ self._project_name = project_name
+
+ def _vcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+
+ def _vcs_add(self, path):
+ self._u_invoke_client('add-id', path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.repo)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client('inventory', '--source',
+ '--both', '--all', root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ f = codecs.open(inv_path, 'a', self.encoding)
+ f.write(rule)
+ f.close()
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+
+ def _force_source(self, path):
+ rule = 'source %s\n' % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.repo)
+ if os.path.realpath(path) not in self._list_added(self.repo):
+ raise CantAddFile(path)
+
+ def _vcs_remove(self, path):
+ if self._vcs_is_versioned(path):
+ self._u_invoke_client('delete-id', path)
+ arch_ids = os.path.join(self.repo, path, '.arch-ids')
+ if os.path.exists(arch_ids):
+ shutil.rmtree(arch_ids)
+
+ def _vcs_update(self, path):
+ pass
+
+ def _vcs_is_versioned(self, path):
+ if '.arch-ids' in path:
+ return False
+ return True
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ else:
+ status,output,error = \
+ self._invoke_client('file-find', path, revision)
+ relpath = output.rstrip('\n')
+ return base.VCS._vcs_get_file_contents(self, relpath)
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ if allow_empty == False:
+ # arch applies empty commits without complaining, so check first
+ status,output,error = self._u_invoke_client('changes',expect=(0,1))
+ if status == 0:
+ raise base.EmptyCommit()
+ summary,body = self._u_parse_commitfile(commitfile)
+ args = ['commit', '--summary', summary]
+ if body != None:
+ args.extend(['--log-message',body])
+ status,output,error = self._u_invoke_client(*args)
+ revision = None
+ revline = re.compile('[*] committed (.*)')
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
+
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client('logs')
+ logs = output.splitlines()
+ first_log = logs.pop(0)
+ assert first_log == 'base-0', first_log
+ try:
+ if index > 0:
+ log = logs[index-1]
+ elif index < 0:
+ log = logs[index]
+ else:
+ return None
+ except IndexError:
+ return None
+ return '%s--%s' % (self._archive_project_name(), log)
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Arch, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
new file mode 100644
index 0000000..99f43f3
--- /dev/null
+++ b/libbe/storage/vcs/base.py
@@ -0,0 +1,1106 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Alexander Belchenko <bialix@ukr.net>
+# Ben Finney <benf@cybersource.com.au>
+# Chris Ball <cjb@laptop.org>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the base VCS (Version Control System) class, which should be
+subclassed by other Version Control System backends. The base class
+implements a "do not version" VCS.
+"""
+
+import codecs
+import os
+import os.path
+import re
+import shutil
+import sys
+import tempfile
+import types
+
+import libbe
+import libbe.storage
+import libbe.storage.base
+import libbe.util.encoding
+from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
+from libbe.util.utility import Dir, search_parent_directories
+from libbe.util.subproc import CommandError, invoke
+from libbe.util.plugin import import_by_name
+import libbe.storage.util.upgrade as upgrade
+
+if libbe.TESTING == True:
+ import unittest
+ import doctest
+
+ import libbe.ui.util.user
+
+# List VCS modules in order of preference.
+# Don't list this module, it is implicitly last.
+VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
+
+def set_preferred_vcs(name):
+ global VCS_ORDER
+ assert name in VCS_ORDER, \
+ 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
+ VCS_ORDER.remove(name)
+ VCS_ORDER.insert(0, name)
+
+def _get_matching_vcs(matchfn):
+ """Return the first module for which matchfn(VCS_instance) is true"""
+ for submodname in VCS_ORDER:
+ module = import_by_name('libbe.storage.vcs.%s' % submodname)
+ vcs = module.new()
+ if matchfn(vcs) == True:
+ return vcs
+ return VCS()
+
+def vcs_by_name(vcs_name):
+ """Return the module for the VCS with the given name"""
+ if vcs_name == VCS.name:
+ return new()
+ return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
+
+def detect_vcs(dir):
+ """Return an VCS instance for the vcs being used in this directory"""
+ return _get_matching_vcs(lambda vcs: vcs._detect(dir))
+
+def installed_vcs():
+ """Return an instance of an installed VCS"""
+ return _get_matching_vcs(lambda vcs: vcs.installed())
+
+
+class VCSNotRooted (libbe.storage.base.ConnectionError):
+ def __init__(self, vcs):
+ msg = 'VCS not rooted'
+ libbe.storage.base.ConnectionError.__init__(self, msg)
+ self.vcs = vcs
+
+class VCSUnableToRoot (libbe.storage.base.ConnectionError):
+ def __init__(self, vcs):
+ msg = 'VCS unable to root'
+ libbe.storage.base.ConnectionError.__init__(self, msg)
+ self.vcs = vcs
+
+class InvalidPath (InvalidID):
+ def __init__(self, path, root, msg=None, **kwargs):
+ if msg == None:
+ msg = 'Path "%s" not in root "%s"' % (path, root)
+ InvalidID.__init__(self, msg=msg, **kwargs)
+ self.path = path
+ self.root = root
+
+class SpacerCollision (InvalidPath):
+ def __init__(self, path, spacer):
+ msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
+ InvalidPath.__init__(self, path, root=None, msg=msg)
+ self.spacer = spacer
+
+class NoSuchFile (InvalidID):
+ def __init__(self, pathname, root='.'):
+ path = os.path.abspath(os.path.join(root, pathname))
+ InvalidID.__init__(self, 'No such file: %s' % path)
+
+
+class CachedPathID (object):
+ """
+ Storage ID <-> path policy.
+ .../.be/BUGDIR/bugs/BUG/comments/COMMENT
+ ^-- root path
+
+ >>> dir = Dir()
+ >>> os.mkdir(os.path.join(dir.path, '.be'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
+ ... 'w').close()
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
+ ... 'w').close()
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
+ ... 'w').close()
+ >>> c = CachedPathID()
+ >>> c.root(dir.path)
+ >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
+ 'def/values'
+ >>> c.init()
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc', 'id-cache']
+ >>> c.connect()
+ >>> c.path('123/values') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/values'
+ >>> c.disconnect()
+ >>> c.destroy()
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc']
+ >>> c.connect() # demonstrate auto init
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc', 'id-cache']
+ >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
+ u'.../.be/xyz'
+ >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
+ u'.../.be/xyz/def'
+ >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/comments/qrs'
+ >>> c.disconnect()
+ >>> c.connect()
+ >>> c.path('qrs') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/comments/qrs'
+ >>> c.remove_id('qrs')
+ >>> c.path('qrs')
+ Traceback (most recent call last):
+ ...
+ InvalidID: 'qrs'
+ >>> c.disconnect()
+ >>> c.destroy()
+ >>> dir.cleanup()
+ """
+ def __init__(self, encoding=None):
+ self.encoding = libbe.util.encoding.get_filesystem_encoding()
+ self._spacer_dirs = ['.be', 'bugs', 'comments']
+
+ def root(self, path):
+ self._root = os.path.abspath(path).rstrip(os.path.sep)
+ self._cache_path = os.path.join(
+ self._root, self._spacer_dirs[0], 'id-cache')
+
+ def init(self):
+ """
+ Create cache file for an existing .be directory.
+ File if multiple lines of the form:
+ UUID\tPATH
+ """
+ self._cache = {}
+ spaced_root = os.path.join(self._root, self._spacer_dirs[0])
+ for dirpath, dirnames, filenames in os.walk(spaced_root):
+ if dirpath == spaced_root:
+ continue
+ try:
+ id = self.id(dirpath)
+ relpath = dirpath[len(self._root)+1:]
+ if id.count('/') == 0:
+ if id in self._cache:
+ print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
+ self._cache[id] = relpath
+ except InvalidPath:
+ pass
+ self._changed = True
+ self.disconnect()
+
+ def destroy(self):
+ if os.path.exists(self._cache_path):
+ os.remove(self._cache_path)
+
+ def connect(self):
+ if not os.path.exists(self._cache_path):
+ try:
+ self.init()
+ except IOError:
+ raise libbe.storage.base.ConnectionError
+ self._cache = {} # key: uuid, value: path
+ self._changed = False
+ f = codecs.open(self._cache_path, 'r', self.encoding)
+ for line in f:
+ fields = line.rstrip('\n').split('\t')
+ self._cache[fields[0]] = fields[1]
+ f.close()
+
+ def disconnect(self):
+ if self._changed == True:
+ f = codecs.open(self._cache_path, 'w', self.encoding)
+ for uuid,path in self._cache.items():
+ f.write('%s\t%s\n' % (uuid, path))
+ f.close()
+ self._cache = {}
+
+ def path(self, id, relpath=False):
+ fields = id.split('/', 1)
+ uuid = fields[0]
+ if len(fields) == 1:
+ extra = []
+ else:
+ extra = fields[1:]
+ if uuid not in self._cache:
+ raise InvalidID(uuid)
+ if relpath == True:
+ return os.path.join(self._cache[uuid], *extra)
+ return os.path.join(self._root, self._cache[uuid], *extra)
+
+ def add_id(self, id, parent=None):
+ if id.count('/') > 0:
+ # not a UUID-level path
+ assert id.startswith(parent), \
+ 'Strange ID: "%s" should start with "%s"' % (id, parent)
+ path = self.path(id)
+ elif id in self._cache:
+ # already added
+ path = self.path(id)
+ else:
+ if parent == None:
+ parent_path = ''
+ spacer = self._spacer_dirs[0]
+ else:
+ assert parent.count('/') == 0, \
+ 'Strange parent ID: "%s" should be UUID' % parent
+ parent_path = self.path(parent, relpath=True)
+ parent_spacer = parent_path.split(os.path.sep)[-2]
+ i = self._spacer_dirs.index(parent_spacer)
+ spacer = self._spacer_dirs[i+1]
+ path = os.path.join(parent_path, spacer, id)
+ self._cache[id] = path
+ self._changed = True
+ path = os.path.join(self._root, path)
+ return path
+
+ def remove_id(self, id):
+ if id.count('/') > 0:
+ return # not a UUID-level path
+ self._cache.pop(id)
+ self._changed = True
+
+ def id(self, path):
+ path = os.path.join(self._root, path)
+ if not path.startswith(self._root + os.path.sep):
+ raise InvalidPath(path, self._root)
+ path = path[len(self._root)+1:]
+ orig_path = path
+ if not path.startswith(self._spacer_dirs[0] + os.path.sep):
+ raise InvalidPath(path, self._spacer_dirs[0])
+ for spacer in self._spacer_dirs:
+ if not path.startswith(spacer + os.path.sep):
+ break
+ id = path[len(spacer)+1:]
+ fields = path[len(spacer)+1:].split(os.path.sep,1)
+ if len(fields) == 1:
+ break
+ path = fields[1]
+ for spacer in self._spacer_dirs:
+ if id.endswith(os.path.sep + spacer):
+ raise SpacerCollision(orig_path, spacer)
+ if os.path.sep != '/':
+ id = id.replace(os.path.sep, '/')
+ return id
+
+
+def new():
+ return VCS()
+
+class VCS (libbe.storage.base.VersionedStorage):
+ """
+ This class implements a 'no-vcs' interface.
+
+ Support for other VCSs can be added by subclassing this class, and
+ overriding methods _vcs_*() with code appropriate for your VCS.
+
+ The methods _u_*() are utility methods available to the _vcs_*()
+ methods.
+
+ Sink to existing root
+ ======================
+
+ Consider the following usage case:
+ You have a bug directory rooted in
+ /path/to/source
+ by which I mean the '.be' directory is at
+ /path/to/source/.be
+ However, you're of in some subdirectory like
+ /path/to/source/GUI/testing
+ and you want to comment on a bug. Setting sink_to_root=True when
+ you initialize your BugDir will cause it to search for the '.be'
+ file in the ancestors of the path you passed in as 'root'.
+ /path/to/source/GUI/testing/.be miss
+ /path/to/source/GUI/.be miss
+ /path/to/source/.be hit!
+ So it still roots itself appropriately without much work for you.
+
+ File-system access
+ ==================
+
+ BugDirs live completely in memory when .sync_with_disk is False.
+ This is the default configuration setup by BugDir(from_disk=False).
+ If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
+ any changes to the BugDir will be immediately written to disk.
+
+ If you want to change .sync_with_disk, we suggest you use
+ .set_sync_with_disk(), which propogates the new setting through to
+ all bugs/comments/etc. that have been loaded into memory. If
+ you've been living in memory and want to move to
+ .sync_with_disk==True, but you're not sure if anything has been
+ changed in memory, a call to .save() immediately before the
+ .set_sync_with_disk(True) call is a safe move.
+
+ Regardless of .sync_with_disk, a call to .save() will write out
+ all the contents that the BugDir instance has loaded into memory.
+ If sync_with_disk has been True over the course of all interesting
+ changes, this .save() call will be a waste of time.
+
+ The BugDir will only load information from the file system when it
+ loads new settings/bugs/comments that it doesn't already have in
+ memory and .sync_with_disk == True.
+
+ Allow storage initialization
+ ========================
+
+ This one is for testing purposes. Setting it to True allows the
+ BugDir to search for an installed Storage backend and initialize
+ it in the root directory. This is a convenience option for
+ supporting tests of versioning functionality
+ (e.g. .duplicate_bugdir).
+
+ Disable encoding manipulation
+ =============================
+
+ This one is for testing purposed. You might have non-ASCII
+ Unicode in your bugs, comments, files, etc. BugDir instances try
+ and support your preferred encoding scheme (e.g. "utf-8") when
+ dealing with stream and file input/output. For stream output,
+ this involves replacing sys.stdout and sys.stderr
+ (libbe.encode.set_IO_stream_encodings). However this messes up
+ doctest's output catching. In order to support doctest tests
+ using BugDirs, set manipulate_encodings=False, and stick to ASCII
+ in your tests.
+
+ if root == None:
+ root = os.getcwd()
+ if sink_to_existing_root == True:
+ self.root = self._find_root(root)
+ else:
+ if not os.path.exists(root):
+ self.root = None
+ raise NoRootEntry(root)
+ self.root = root
+ # get a temporary storage until we've loaded settings
+ self.sync_with_disk = False
+ self.storage = self._guess_storage()
+
+ if assert_new_BugDir == True:
+ if os.path.exists(self.get_path()):
+ raise AlreadyInitialized, self.get_path()
+ if storage == None:
+ storage = self._guess_storage(allow_storage_init)
+ self.storage = storage
+ self._setup_user_id(self.user_id)
+
+
+ # methods for getting the BugDir situated in the filesystem
+
+ def _find_root(self, path):
+ '''
+ Search for an existing bug database dir and it's ancestors and
+ return a BugDir rooted there. Only called by __init__, and
+ then only if sink_to_existing_root == True.
+ '''
+ if not os.path.exists(path):
+ self.root = None
+ raise NoRootEntry(path)
+ versionfile=utility.search_parent_directories(path,
+ os.path.join(".be", "version"))
+ if versionfile != None:
+ beroot = os.path.dirname(versionfile)
+ root = os.path.dirname(beroot)
+ return root
+ else:
+ beroot = utility.search_parent_directories(path, ".be")
+ if beroot == None:
+ self.root = None
+ raise NoBugDir(path)
+ return beroot
+
+ def _guess_storage(self, allow_storage_init=False):
+ '''
+ Only called by __init__.
+ '''
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_storage = storage.detect_storage(deepdir)
+ install = False
+ if new_storage.name == "None":
+ if allow_storage_init == True:
+ new_storage = storage.installed_storage()
+ new_storage.init(self.root)
+ return new_storage
+
+os.listdir(self.get_path("bugs")):
+ """
+ name = 'None'
+ client = 'false' # command-line tool for _u_invoke_client
+
+ def __init__(self, *args, **kwargs):
+ if 'encoding' not in kwargs:
+ kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
+ libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
+ self.versioned = False
+ self.interspersed_vcs_files = False
+ self.verbose_invoke = False
+ self._cached_path_id = CachedPathID()
+ self._rooted = False
+
+ def _vcs_version(self):
+ """
+ Return the VCS version string.
+ """
+ return '0'
+
+ def _vcs_get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
+ """
+ return None
+
+ def _vcs_detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return True
+
+ def _vcs_root(self, path):
+ """
+ Get the VCS root. This is the default working directory for
+ future invocations. You would normally set this to the root
+ directory for your VCS.
+ """
+ if os.path.isdir(path) == False:
+ path = os.path.dirname(path)
+ if path == '':
+ path = os.path.abspath('.')
+ return path
+
+ def _vcs_init(self, path):
+ """
+ Begin versioning the tree based at path.
+ """
+ pass
+
+ def _vcs_destroy(self):
+ """
+ Remove any files used in versioning (e.g. whatever _vcs_init()
+ created).
+ """
+ pass
+
+ def _vcs_add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ pass
+
+ def _vcs_remove(self, path):
+ """
+ Remove the file at path from version control. Optionally
+ remove the file from the filesystem as well.
+ """
+ pass
+
+ def _vcs_update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ pass
+
+ def _vcs_is_versioned(self, path):
+ """
+ Return true if a path is under version control, False
+ otherwise. You only need to set this if the VCS goes about
+ dumping VCS-specific files into the .be directory.
+
+ If you do need to implement this method (e.g. Arch), set
+ self.interspersed_vcs_files = True
+ """
+ assert self.interspersed_vcs_files == False
+ raise NotImplementedError
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ """
+ Get the file contents as they were in a given revision.
+ Revision==None specifies the current revision.
+ """
+ if revision != None:
+ raise libbe.storage.base.InvalidRevision(
+ 'The %s VCS does not support revision specifiers' % self.name)
+ path = os.path.join(self.repo, path)
+ if not os.path.exists(path):
+ return libbe.util.InvalidObject
+ if os.path.isdir(path):
+ return libbe.storage.base.InvalidDirectory
+ f = open(path, 'rb')
+ contents = f.read()
+ f.close()
+ return contents
+
+ def _vcs_path(self, id, revision):
+ """
+ Return the path to object id as of revision.
+
+ Revision will not be None.
+ """
+ raise NotImplementedError
+
+ def _vcs_isdir(self, path, revision):
+ """
+ Return True if path (as returned by _vcs_path) was a directory
+ as of revision, False otherwise.
+
+ Revision will not be None.
+ """
+ raise NotImplementedError
+
+ def _vcs_listdir(self, path, revision):
+ """
+ Return a list of the contents of the directory path (as
+ returned by _vcs_path) as of revision.
+
+ Revision will not be None, and ._vcs_isdir(path, revision)
+ will be True.
+ """
+ raise NotImplementedError
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ """
+ Commit the current working directory, using the contents of
+ commitfile as the comment. Return the name of the old
+ revision (or None if commits are not supported).
+
+ If allow_empty == False, raise EmptyCommit if there are no
+ changes to commit.
+ """
+ return None
+
+ def _vcs_revision_id(self, index):
+ """
+ Return the name of the <index>th revision. Index will be an
+ integer (possibly <= 0). The choice of which branch to follow
+ when crossing branches/merges is not defined.
+
+ Return None if revision IDs are not supported, or if the
+ specified revision does not exist.
+ """
+ return None
+
+ def version(self):
+ # Cache version string for efficiency.
+ if not hasattr(self, '_version'):
+ self._version = self._get_version()
+ return self._version
+
+ def _get_version(self):
+ try:
+ ret = self._vcs_version()
+ return ret
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ return None
+ else:
+ raise OSError, e
+ except CommandError:
+ return None
+
+ def installed(self):
+ if self.version() != None:
+ return True
+ return False
+
+ def get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
+ You can override the automatic lookup procedure by setting the
+ VCS.user_id attribute to a string of your choice.
+ """
+ if not hasattr(self, 'user_id'):
+ self.user_id = self._vcs_get_user_id()
+ return self.user_id
+
+ def _detect(self, path='.'):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return self._vcs_detect(path)
+
+ def root(self):
+ """
+ Set the root directory to the path's VCS root. This is the
+ default working directory for future invocations.
+ """
+ if self._detect(self.repo) == False:
+ raise VCSUnableToRoot(self)
+ root = self._vcs_root(self.repo)
+ self.repo = os.path.abspath(root)
+ if os.path.isdir(self.repo) == False:
+ self.repo = os.path.dirname(self.repo)
+ self.be_dir = os.path.join(
+ self.repo, self._cached_path_id._spacer_dirs[0])
+ self._cached_path_id.root(self.repo)
+ self._rooted = True
+
+ def _init(self):
+ """
+ Begin versioning the tree based at self.repo.
+ Also roots the vcs at path.
+ """
+ if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
+ raise VCSUnableToRoot(self)
+ if self._vcs_detect(self.repo) == False:
+ self._vcs_init(self.repo)
+ if self._rooted == False:
+ self.root()
+ os.mkdir(self.be_dir)
+ self._vcs_add(self._u_rel_path(self.be_dir))
+ self._setup_storage_version()
+ self._cached_path_id.init()
+
+ def _destroy(self):
+ self._vcs_destroy()
+ self._cached_path_id.destroy()
+ if os.path.exists(self.be_dir):
+ shutil.rmtree(self.be_dir)
+
+ def _connect(self):
+ if self._rooted == False:
+ self.root()
+ if not os.path.isdir(self.be_dir):
+ raise libbe.storage.base.ConnectionError(self)
+ self._cached_path_id.connect()
+ self.check_storage_version()
+
+ def _disconnect(self):
+ self._cached_path_id.disconnect()
+
+ def _add_path(self, path, directory=False):
+ relpath = self._u_rel_path(path)
+ reldirs = relpath.split(os.path.sep)
+ if directory == False:
+ reldirs = reldirs[:-1]
+ dir = self.repo
+ for reldir in reldirs:
+ dir = os.path.join(dir, reldir)
+ if not os.path.exists(dir):
+ os.mkdir(dir)
+ self._vcs_add(self._u_rel_path(dir))
+ elif not os.path.isdir(dir):
+ raise libbe.storage.base.InvalidDirectory
+ if directory == False:
+ if not os.path.exists(path):
+ open(path, 'w').close()
+ self._vcs_add(self._u_rel_path(path))
+
+ def _add(self, id, parent=None, **kwargs):
+ path = self._cached_path_id.add_id(id, parent)
+ self._add_path(path, **kwargs)
+
+ def _remove(self, id):
+ path = self._cached_path_id.path(id)
+ if os.path.exists(path):
+ if os.path.isdir(path) and len(self.children(id)) > 0:
+ raise libbe.storage.base.DirectoryNotEmpty(id)
+ self._vcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ if os.path.isdir(path):
+ os.rmdir(path)
+ else:
+ os.remove(path)
+ self._cached_path_id.remove_id(id)
+
+ def _recursive_remove(self, id):
+ path = self._cached_path_id.path(id)
+ for dirpath,dirnames,filenames in os.walk(path, topdown=False):
+ filenames.extend(dirnames)
+ for f in filenames:
+ fullpath = os.path.join(dirpath, f)
+ if os.path.exists(fullpath) == False:
+ continue
+ self._vcs_remove(self._u_rel_path(fullpath))
+ if os.path.exists(path):
+ shutil.rmtree(path)
+ path = self._cached_path_id.path(id, relpath=True)
+ for id,p in self._cached_path_id._cache.items():
+ if p.startswith(path):
+ self._cached_path_id.remove_id(id)
+
+ def _children(self, id=None, revision=None):
+ if revision == None:
+ id_to_path = self._cached_path_id.path
+ isdir = os.path.isdir
+ listdir = os.listdir
+ else:
+ id_to_path = lambda id : self._vcs_path(id, revision)
+ isdir = lambda path : self._vcs_isdir(path, revision)
+ listdir = lambda path : self._vcs_listdir(path, revision)
+ if id==None:
+ path = self.be_dir
+ else:
+ path = id_to_path(id)
+ if isdir(path) == False:
+ return []
+ children = listdir(path)
+ for i,c in enumerate(children):
+ if c in self._cached_path_id._spacer_dirs:
+ children[i] = None
+ children.extend([os.path.join(c, c2) for c2 in
+ listdir(os.path.join(path, c))])
+ elif c in ['id-cache', 'version']:
+ children[i] = None
+ elif self.interspersed_vcs_files \
+ and self._vcs_is_versioned(c) == False:
+ children[i] = None
+ for i,c in enumerate(children):
+ if c == None: continue
+ cpath = os.path.join(path, c)
+ if self.interspersed_vcs_files == True \
+ and revision != None \
+ and self._vcs_is_versioned(cpath) == False:
+ children[i] = None
+ else:
+ children[i] = self._u_path_to_id(cpath)
+ children[i]
+ return [c for c in children if c != None]
+
+ def _get(self, id, default=libbe.util.InvalidObject, revision=None):
+ try:
+ path = self._cached_path_id.path(id)
+ except InvalidID, e:
+ if default == libbe.util.InvalidObject:
+ raise e
+ return default
+ relpath = self._u_rel_path(path)
+ try:
+ contents = self._vcs_get_file_contents(relpath, revision)
+ except InvalidID, e:
+ if InvalidID == None:
+ e.id = InvalidID
+ raise
+ if contents in [libbe.storage.base.InvalidDirectory,
+ libbe.util.InvalidObject]:
+ raise InvalidID(id)
+ elif len(contents) == 0:
+ return None
+ return contents
+
+ def _set(self, id, value):
+ try:
+ path = self._cached_path_id.path(id)
+ except InvalidID, e:
+ raise e
+ if not os.path.exists(path):
+ raise InvalidID(id)
+ if os.path.isdir(path):
+ raise libbe.storage.base.InvalidDirectory(id)
+ f = open(path, "wb")
+ f.write(value)
+ f.close()
+ self._vcs_update(self._u_rel_path(path))
+
+ def _commit(self, summary, body=None, allow_empty=False):
+ summary = summary.strip()+'\n'
+ if body is not None:
+ summary += '\n' + body.strip() + '\n'
+ descriptor, filename = tempfile.mkstemp()
+ revision = None
+ try:
+ temp_file = os.fdopen(descriptor, 'wb')
+ temp_file.write(summary)
+ temp_file.flush()
+ revision = self._vcs_commit(filename, allow_empty=allow_empty)
+ temp_file.close()
+ finally:
+ os.remove(filename)
+ return revision
+
+ def revision_id(self, index=None):
+ if index == None:
+ return None
+ try:
+ if int(index) != index:
+ raise InvalidRevision(index)
+ except ValueError:
+ raise InvalidRevision(index)
+ revid = self._vcs_revision_id(index)
+ if revid == None:
+ raise libbe.storage.base.InvalidRevision(index)
+ return revid
+
+ def _u_any_in_string(self, list, string):
+ """
+ Return True if any of the strings in list are in string.
+ Otherwise return False.
+ """
+ for list_string in list:
+ if list_string in string:
+ return True
+ return False
+
+ def _u_invoke(self, *args, **kwargs):
+ if 'cwd' not in kwargs:
+ kwargs['cwd'] = self.repo
+ if 'verbose' not in kwargs:
+ kwargs['verbose'] = self.verbose_invoke
+ if 'encoding' not in kwargs:
+ kwargs['encoding'] = self.encoding
+ return invoke(*args, **kwargs)
+
+ def _u_invoke_client(self, *args, **kwargs):
+ cl_args = [self.client]
+ cl_args.extend(args)
+ return self._u_invoke(cl_args, **kwargs)
+
+ def _u_search_parent_directories(self, path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ return search_parent_directories(path, filename)
+
+ def _u_find_id(self, id, revision):
+ """
+ Search for the relative path to id as of revision.
+ Returns None if the id is not found.
+ """
+ assert self._rooted == True
+ be_dir = self._cached_path_id._spacer_dirs[0]
+ stack = [(be_dir, be_dir)]
+ while len(stack) > 0:
+ path,long_id = stack.pop()
+ if long_id.endswith('/'+id):
+ return path
+ if self._vcs_isdir(path, revision) == False:
+ continue
+ for child in self._vcs_listdir(path, revision):
+ stack.append((os.path.join(path, child),
+ '/'.join([long_id, child])))
+ raise InvalidID(id, revision=revision)
+
+ def _u_path_to_id(self, path):
+ return self._cached_path_id.id(path)
+
+ def _u_rel_path(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> vcs = new()
+ >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ '.be'
+ >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
+ '.'
+ >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
+ '.'
+ >>> vcs._u_rel_path("./a", ".")
+ 'a'
+ """
+ if root == None:
+ if self.repo == None:
+ raise VCSNotRooted(self)
+ root = self.repo
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if path in [absRoot, absRootSlashedDir]:
+ return '.'
+ if not path.startswith(absRootSlashedDir):
+ raise InvalidPath(path, absRootSlashedDir)
+ relpath = path[len(absRootSlashedDir):]
+ return relpath
+
+ def _u_abspath(self, path, root=None):
+ """
+ Return the absolute path from a path realtive to root.
+ >>> vcs = new()
+ >>> vcs._u_abspath(".be", "/a.b/c")
+ '/a.b/c/.be'
+ """
+ if root == None:
+ assert self.repo != None, "VCS not rooted"
+ root = self.repo
+ return os.path.abspath(os.path.join(root, path))
+
+ def _u_parse_commitfile(self, commitfile):
+ """
+ Split the commitfile created in self.commit() back into
+ summary and header lines.
+ """
+ f = codecs.open(commitfile, 'r', self.encoding)
+ summary = f.readline()
+ body = f.read()
+ body.lstrip('\n')
+ if len(body) == 0:
+ body = None
+ f.close()
+ return (summary, body)
+
+ def check_storage_version(self):
+ version = self.storage_version()
+ if version != libbe.storage.STORAGE_VERSION:
+ upgrade.upgrade(self.repo, version)
+
+ def storage_version(self, revision=None, path=None):
+ """
+ Requires disk access.
+ """
+ if path == None:
+ path = os.path.join(self.repo, '.be', 'version')
+ if not os.path.exists(path):
+ raise libbe.storage.InvalidStorageVersion(None)
+ if revision == None: # don't require connection
+ return libbe.util.encoding.get_file_contents(
+ path, decode=True).rstrip('\n')
+ contents = self._vcs_get_file_contents(path, revision=revision)
+ if type(contents) != types.UnicodeType:
+ contents = unicode(contents, self.encoding)
+ return contents.strip()
+
+ def _setup_storage_version(self):
+ """
+ Requires disk access.
+ """
+ assert self._rooted == True
+ path = os.path.join(self.be_dir, 'version')
+ if not os.path.exists(path):
+ libbe.util.encoding.set_file_contents(path,
+ libbe.storage.STORAGE_VERSION+'\n')
+ self._vcs_add(self._u_rel_path(path))
+
+
+if libbe.TESTING == True:
+ class VCSTestCase (unittest.TestCase):
+ """
+ Test cases for base VCS class (in addition to the Storage test
+ cases).
+ """
+
+ Class = VCS
+
+ def __init__(self, *args, **kwargs):
+ super(VCSTestCase, self).__init__(*args, **kwargs)
+ self.dirname = None
+
+ def setUp(self):
+ """Set up test fixtures for Storage test case."""
+ super(VCSTestCase, self).setUp()
+ self.dir = Dir()
+ self.dirname = self.dir.path
+ self.s = self.Class(repo=self.dirname)
+ if self.s.installed() == True:
+ self.s.init()
+ self.s.connect()
+
+ def tearDown(self):
+ super(VCSTestCase, self).tearDown()
+ if self.s.installed() == True:
+ self.s.disconnect()
+ self.s.destroy()
+ self.dir.cleanup()
+
+ class VCS_installed_TestCase (VCSTestCase):
+ def test_installed(self):
+ """
+ See if the VCS is installed.
+ """
+ self.failUnless(self.s.installed() == True,
+ '%(name)s VCS not found' % vars(self.Class))
+
+
+ class VCS_detection_TestCase (VCSTestCase):
+ def test_detection(self):
+ """
+ See if the VCS detects its installed repository
+ """
+ if self.s.installed():
+ self.s.disconnect()
+ self.failUnless(self.s._detect(self.dirname) == True,
+ 'Did not detected %(name)s VCS after initialising'
+ % vars(self.Class))
+ self.s.connect()
+
+ def test_no_detection(self):
+ """
+ See if the VCS detects its installed repository
+ """
+ if self.s.installed() and self.Class.name != 'None':
+ self.s.disconnect()
+ self.s.destroy()
+ self.failUnless(self.s._detect(self.dirname) == False,
+ 'Detected %(name)s VCS before initialising'
+ % vars(self.Class))
+ self.s.init()
+ self.s.connect()
+
+ def test_vcs_repo_in_specified_root_path(self):
+ """VCS root directory should be in specified root path."""
+ rp = os.path.realpath(self.s.repo)
+ dp = os.path.realpath(self.dirname)
+ vcs_name = self.Class.name
+ self.failUnless(
+ dp == rp or rp == None,
+ "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
+
+ class VCS_get_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.get_user_id method."""
+
+ def test_gets_existing_user_id(self):
+ """Should get the existing user ID."""
+ if self.s.installed():
+ user_id = self.s.get_user_id()
+ if user_id == None:
+ return
+ name,email = libbe.ui.util.user.parse_user_id(user_id)
+ if email != None:
+ self.failUnless('@' in email, email)
+
+ def make_vcs_testcase_subclasses(vcs_class, namespace):
+ c = vcs_class()
+ if c.installed():
+ if c.versioned == True:
+ libbe.storage.base.make_versioned_storage_testcase_subclasses(
+ vcs_class, namespace)
+ else:
+ libbe.storage.base.make_storage_testcase_subclasses(
+ vcs_class, namespace)
+
+ if namespace != sys.modules[__name__]:
+ # Make VCSTestCase subclasses for vcs_class in the namespace.
+ vcs_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, VCSTestCase) \
+ and c.Class == VCS]
+
+ for base_class in vcs_testcase_classes:
+ testcase_class_name = vcs_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = vcs_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+ make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py
new file mode 100644
index 0000000..7335861
--- /dev/null
+++ b/libbe/storage/vcs/bzr.py
@@ -0,0 +1,196 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Bazaar (bzr) backend.
+"""
+
+try:
+ import bzrlib
+ import bzrlib.branch
+ import bzrlib.builtins
+ import bzrlib.config
+ import bzrlib.errors
+ import bzrlib.option
+except ImportError:
+ bzrlib = None
+import os
+import os.path
+import re
+import shutil
+import StringIO
+
+import libbe
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import sys
+ import unittest
+
+
+def new():
+ return Bzr()
+
+class Bzr(base.VCS):
+ name = 'bzr'
+ client = None # bzrlib module
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+
+ def _vcs_version(self):
+ if bzrlib == None:
+ return None
+ return bzrlib.__version__
+
+ def _vcs_get_user_id(self):
+ # excerpted from bzrlib.builtins.cmd_whoami.run()
+ try:
+ c = bzrlib.branch.Branch.open_containing(self.repo)[0].get_config()
+ except errors.NotBranchError:
+ c = bzrlib.config.GlobalConfig()
+ return c.username()
+
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, '.bzr') != None :
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ cmd = bzrlib.builtins.cmd_root()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(filename=path)
+ return cmd.outf.getvalue().rstrip('\n')
+
+ def _vcs_init(self, path):
+ cmd = bzrlib.builtins.cmd_init()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(location=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '.bzr')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ path = os.path.join(self.repo, path)
+ cmd = bzrlib.builtins.cmd_add()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(file_list=[path], file_ids_from=self.repo)
+
+ def _vcs_remove(self, path):
+ # --force to also remove unversioned files.
+ path = os.path.join(self.repo, path)
+ cmd = bzrlib.builtins.cmd_remove()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(file_list=[path], file_deletion_strategy='force')
+
+ def _vcs_update(self, path):
+ pass
+
+ def _parse_revision_string(self, revision=None):
+ if revision == None:
+ return revision
+ rev_opt = bzrlib.option.Option.OPTIONS['revision']
+ try:
+ rev_spec = rev_opt.type(revision)
+ except bzrlib.errors.NoSuchRevisionSpec:
+ raise base.InvalidRevision(revision)
+ return rev_spec
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ path = os.path.join(self.repo, path)
+ revision = self._parse_revision_string(revision)
+ cmd = bzrlib.builtins.cmd_cat()
+ cmd.outf = StringIO.StringIO()
+ try:
+ cmd.run(filename=path, revision=revision)
+ except bzrlib.errors.BzrCommandError, e:
+ if 'not present in revision' in str(e):
+ raise base.InvalidPath(path, root=self.repo, revision=revision)
+ raise
+ return cmd.outf.getvalue()
+
+ def _vcs_path(self, id, revision):
+ return self._u_find_id(id, revision)
+
+ def _vcs_isdir(self, path, revision):
+ try:
+ self._vcs_listdir(path, revision)
+ except AttributeError, e:
+ if 'children' in str(e):
+ return False
+ raise
+ return True
+
+ def _vcs_listdir(self, path, revision):
+ path = os.path.join(self.repo, path)
+ revision = self._parse_revision_string(revision)
+ cmd = bzrlib.builtins.cmd_ls()
+ cmd.outf = StringIO.StringIO()
+ try:
+ cmd.run(revision=revision, path=path)
+ except bzrlib.errors.BzrCommandError, e:
+ if 'not present in revision' in str(e):
+ raise base.InvalidPath(path, root=self.repo, revision=revision)
+ raise
+ children = cmd.outf.getvalue().rstrip('\n').splitlines()
+ children = [self._u_rel_path(c, path) for c in children]
+ return children
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ cmd = bzrlib.builtins.cmd_commit()
+ cmd.outf = StringIO.StringIO()
+ cwd = os.getcwd()
+ os.chdir(self.repo)
+ try:
+ cmd.run(file=commitfile, unchanged=allow_empty)
+ except bzrlib.errors.BzrCommandError, e:
+ strings = ['no changes to commit.', # bzr 1.3.1
+ 'No changes to commit.'] # bzr 1.15.1
+ if self._u_any_in_string(strings, str(e)) == True:
+ raise base.EmptyCommit()
+ raise
+ finally:
+ os.chdir(cwd)
+ return self._vcs_revision_id(-1)
+
+ def _vcs_revision_id(self, index):
+ cmd = bzrlib.builtins.cmd_revno()
+ cmd.outf = StringIO.StringIO()
+ cmd.run(location=self.repo)
+ current_revision = int(cmd.outf.getvalue())
+ if index > current_revision or index < -current_revision:
+ return None
+ if index >= 0:
+ return str(index) # bzr commit 0 is the empty tree.
+ return str(current_revision+index+1)
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py
new file mode 100644
index 0000000..6d47ea5
--- /dev/null
+++ b/libbe/storage/vcs/darcs.py
@@ -0,0 +1,288 @@
+# Copyright (C) 2009 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Darcs backend.
+"""
+
+import codecs
+import os
+import re
+import shutil
+import sys
+import time # work around http://mercurial.selenic.com/bts/issue618
+try: # import core module, Python >= 2.5
+ from xml.etree import ElementTree
+except ImportError: # look for non-core module
+ from elementtree import ElementTree
+from xml.sax.saxutils import unescape
+
+import libbe
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+
+def new():
+ return Darcs()
+
+class Darcs(base.VCS):
+ name='darcs'
+ client='darcs'
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+ self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client('--version')
+ return output.rstrip('\n')
+
+ def version_cmp(self, *args):
+ """
+ Compare the installed darcs version V_i with another version
+ V_o (given in *args). Returns
+ 1 if V_i > V_o,
+ 0 if V_i == V_o, and
+ -1 if V_i < V_o
+ >>> d = Darcs(repo='.')
+ >>> d._vcs_version = lambda : "2.3.1 (release)"
+ >>> d.version_cmp(2,3,1)
+ 0
+ >>> d.version_cmp(2,3,2)
+ -1
+ >>> d.version_cmp(2,3,0)
+ 1
+ >>> d.version_cmp(3)
+ -1
+ >>> d._vcs_version = lambda : "2.0.0pre2"
+ >>> d._parsed_version = None
+ >>> d.version_cmp(3)
+ Traceback (most recent call last):
+ ...
+ NotImplementedError: Cannot parse "2.0.0pre2" portion of Darcs version "2.0.0pre2"
+ invalid literal for int() with base 10: '0pre2'
+ """
+ if not hasattr(self, '_parsed_version') \
+ or self._parsed_version == None:
+ num_part = self._vcs_version().split(' ')[0]
+ try:
+ self._parsed_version = [int(i) for i in num_part.split('.')]
+ except ValueError, e:
+ raise NotImplementedError(
+ 'Cannot parse "%s" portion of Darcs version "%s"\n %s'
+ % (num_part, self._vcs_version(), str(e)))
+ cmps = [cmp(a,b) for a,b in zip(self._parsed_version, args)]
+ for c in cmps:
+ if c != 0:
+ return c
+ return 0
+
+ def _vcs_get_user_id(self):
+ # following http://darcs.net/manual/node4.html#SECTION00410030000000000000
+ # as of June 29th, 2009
+ if self.repo == None:
+ return None
+ darcs_dir = os.path.join(self.repo, '_darcs')
+ if darcs_dir != None:
+ for pref_file in ['author', 'email']:
+ pref_path = os.path.join(darcs_dir, 'prefs', pref_file)
+ if os.path.exists(pref_path):
+ return self.get_file_contents(pref_path)
+ for env_variable in ['DARCS_EMAIL', 'EMAIL']:
+ if env_variable in os.environ:
+ return os.environ[env_variable]
+ return None
+
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, "_darcs") != None :
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ darcs_dir = self._u_search_parent_directories(path, '_darcs')
+ if darcs_dir == None:
+ return None
+ return os.path.dirname(darcs_dir)
+
+ def _vcs_init(self, path):
+ self._u_invoke_client('init', cwd=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '_darcs')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client('add', path)
+
+ def _vcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ os.remove(os.path.join(self.repo, path)) # darcs notices removal
+
+ def _vcs_update(self, path):
+ self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618
+ pass # darcs notices changes
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ if self.version_cmp(2, 0, 0) == 1:
+ status,output,error = self._u_invoke_client( \
+ 'show', 'contents', '--patch', revision, path)
+ return output
+ # Darcs versions < 2.0.0pre2 lack the 'show contents' command
+
+ status,output,error = self._u_invoke_client( \
+ 'diff', '--unified', '--from-patch', revision, path,
+ unicode_output=False)
+ major_patch = output
+ status,output,error = self._u_invoke_client( \
+ 'diff', '--unified', '--patch', revision, path,
+ unicode_output=False)
+ target_patch = output
+
+ # '--output -' to be supported in GNU patch > 2.5.9
+ # but that hasn't been released as of June 30th, 2009.
+
+ # Rewrite path to status before the patch we want
+ args=['patch', '--reverse', path]
+ status,output,error = self._u_invoke(args, stdin=major_patch)
+ # Now apply the patch we want
+ args=['patch', path]
+ status,output,error = self._u_invoke(args, stdin=target_patch)
+
+ if os.path.exists(os.path.join(self.repo, path)) == True:
+ contents = base.VCS._vcs_get_file_contents(self, path)
+ else:
+ contents = ''
+
+ # Now restore path to it's current incarnation
+ args=['patch', '--reverse', path]
+ status,output,error = self._u_invoke(args, stdin=target_patch)
+ args=['patch', path]
+ status,output,error = self._u_invoke(args, stdin=major_patch)
+ current_contents = base.VCS._vcs_get_file_contents(self, path)
+ return contents
+
+ def _vcs_path(self, id, revision):
+ return self._u_find_id(id, revision)
+
+ def _vcs_isdir(self, path, revision):
+ if self.version_cmp(2, 3, 1) == 1:
+ # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com
+ # * add versioned show files functionality (darcs show files -p 'some patch')
+ status,output,error = self._u_invoke_client( \
+ 'show', 'files', '--no-files', '--patch', revision)
+ children = output.rstrip('\n').splitlines()
+ rpath = '.'
+ children = [self._u_rel_path(c, rpath) for c in children]
+ if path in children:
+ return True
+ return False
+ # Darcs versions <= 2.3.1 lack the --patch option for 'show files'
+ raise NotImplementedError
+
+ def _vcs_listdir(self, path, revision):
+ if self.version_cmp(2, 3, 1) == 1:
+ # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com
+ # * add versioned show files functionality (darcs show files -p 'some patch')
+ # Wed Dec 9 05:42:21 EST 2009 Luca Molteni <volothamp@gmail.com>
+ # * resolve issue835 show file with file directory arguments
+ path = path.rstrip(os.path.sep)
+ status,output,error = self._u_invoke_client( \
+ 'show', 'files', '--patch', revision, path)
+ files = output.rstrip('\n').splitlines()
+ if path == '.':
+ descendents = [self._u_rel_path(f, path) for f in files
+ if f != '.']
+ else:
+ descendents = [self._u_rel_path(f, path) for f in files
+ if f.startswith(path)]
+ return [f for f in descendents if f.count(os.path.sep) == 0]
+ # Darcs versions <= 2.3.1 lack the --patch option for 'show files'
+ raise NotImplementedError
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ id = self.get_user_id()
+ if id == None or '@' not in id:
+ id = '%s <%s@invalid.com>' % (id, id)
+ args = ['record', '--all', '--author', id, '--logfile', commitfile]
+ status,output,error = self._u_invoke_client(*args)
+ empty_strings = ['No changes!']
+ # work around http://mercurial.selenic.com/bts/issue618
+ if self._u_any_in_string(empty_strings, output) == True \
+ and len(self.__updated) > 0:
+ time.sleep(1)
+ for path in self.__updated:
+ os.utime(os.path.join(self.repo, path), None)
+ status,output,error = self._u_invoke_client(*args)
+ self.__updated = []
+ # end work around
+ if self._u_any_in_string(empty_strings, output) == True:
+ if allow_empty == False:
+ raise base.EmptyCommit()
+ # note that darcs does _not_ make an empty revision.
+ # this returns the last non-empty revision id...
+ revision = self._vcs_revision_id(-1)
+ else:
+ revline = re.compile("Finished recording patch '(.*)'")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client('changes', '--xml')
+ revisions = []
+ xml_str = output.encode('unicode_escape').replace(r'\n', '\n')
+ element = ElementTree.XML(xml_str)
+ assert element.tag == 'changelog', element.tag
+ for patch in element.getchildren():
+ assert patch.tag == 'patch', patch.tag
+ for child in patch.getchildren():
+ if child.tag == 'name':
+ text = unescape(unicode(child.text).decode('unicode_escape').strip())
+ revisions.append(text)
+ revisions.reverse()
+ try:
+ if index > 0:
+ return revisions[index-1]
+ elif index < 0:
+ return revisions[index]
+ else:
+ return None
+ except IndexError:
+ return None
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py
new file mode 100644
index 0000000..2280665
--- /dev/null
+++ b/libbe/storage/vcs/git.py
@@ -0,0 +1,183 @@
+# Copyright (C) 2008-2009 Ben Finney <benf@cybersource.com.au>
+# Chris Ball <cjb@laptop.org>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Git backend.
+"""
+
+import os
+import os.path
+import re
+import shutil
+import unittest
+
+import libbe
+import libbe.ui.util.user
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import sys
+
+
+def new():
+ return Git()
+
+class Git(base.VCS):
+ name='git'
+ client='git'
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client('--version')
+ return output
+
+ def _vcs_get_user_id(self):
+ status,output,error = \
+ self._u_invoke_client('config', 'user.name', expect=(0,1))
+ if status == 0:
+ name = output.rstrip('\n')
+ else:
+ name = ''
+ status,output,error = \
+ self._u_invoke_client('config', 'user.email', expect=(0,1))
+ if status == 0:
+ email = output.rstrip('\n')
+ else:
+ email = ''
+ if name != '' or email != '': # got something!
+ # guess missing info, if necessary
+ if name == '':
+ name = libbe.ui.util.user.get_fallback_username()
+ if email == '':
+ email = libe.ui.util.user.get_fallback_email()
+ return libbe.ui.util.user.create_user_id(name, email)
+ return None # Git has no infomation
+
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, '.git') != None :
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ status,output,error = self._u_invoke_client('rev-parse', '--git-dir',
+ cwd=path)
+ gitdir = os.path.join(path, output.rstrip('\n'))
+ dirname = os.path.abspath(os.path.dirname(gitdir))
+ return dirname
+
+ def _vcs_init(self, path):
+ self._u_invoke_client('init', cwd=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '.git')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client('add', path)
+
+ def _vcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ self._u_invoke_client('rm', '-f', path)
+
+ def _vcs_update(self, path):
+ self._vcs_add(path)
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ else:
+ arg = '%s:%s' % (revision,path)
+ status,output,error = self._u_invoke_client('show', arg)
+ return output
+
+
+ def _vcs_path(self, id, revision):
+ return self._u_find_id(id, revision)
+
+ def _vcs_isdir(self, path, revision):
+ arg = '%s:%s' % (revision,path)
+ args = ['ls-tree', arg]
+ kwargs = {'expect':(0,128)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status != 0:
+ if 'not a tree object' in error:
+ return False
+ raise base.CommandError(args, status, stderr=error)
+ return True
+
+ def _vcs_listdir(self, path, revision):
+ arg = '%s:%s' % (revision,path)
+ status,output,error = self._u_invoke_client(
+ 'ls-tree', '--name-only', arg)
+ return output.rstrip('\n').splitlines()
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--all', '--file', commitfile]
+ if allow_empty == True:
+ args.append('--allow-empty')
+ status,output,error = self._u_invoke_client(*args)
+ else:
+ kwargs = {'expect':(0,1)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ strings = ['nothing to commit',
+ 'nothing added to commit']
+ if self._u_any_in_string(strings, output) == True:
+ raise base.EmptyCommit()
+ full_revision = self._vcs_revision_id(-1)
+ assert full_revision[:7] in output, \
+ 'Mismatched revisions:\n%s\n%s' % (full_revision, output)
+ return full_revision
+
+ def _vcs_revision_id(self, index):
+ args = ['rev-list', '--first-parent', '--reverse', 'HEAD']
+ kwargs = {'expect':(0,128)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status == 128:
+ if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
+ return None
+ raise base.CommandError(args, status, stderr=error)
+ revisions = output.splitlines()
+ try:
+ if index > 0:
+ return revisions[index-1]
+ elif index < 0:
+ return revisions[index]
+ else:
+ return None
+ except IndexError:
+ return None
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Git, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py
new file mode 100644
index 0000000..11494a9
--- /dev/null
+++ b/libbe/storage/vcs/hg.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Mercurial (hg) backend.
+"""
+
+try:
+ import mercurial
+ import mercurial.version
+ import mercurial.dispatch
+ import mercurial.ui
+except ImportError:
+ mercurial = None
+import os
+import os.path
+import re
+import shutil
+import StringIO
+import sys
+import time # work around http://mercurial.selenic.com/bts/issue618
+
+import libbe
+import base
+
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+
+def new():
+ return Hg()
+
+class Hg(base.VCS):
+ name='hg'
+ client=None # mercurial module
+
+ def __init__(self, *args, **kwargs):
+ base.VCS.__init__(self, *args, **kwargs)
+ self.versioned = True
+ self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_version(self):
+ if mercurial == None:
+ return None
+ return mercurial.version.get_version()
+
+ def _u_invoke_client(self, *args, **kwargs):
+ if 'cwd' not in kwargs:
+ kwargs['cwd'] = self.repo
+ assert len(kwargs) == 1, kwargs
+ fullargs = ['--cwd', kwargs['cwd']]
+ fullargs.extend(args)
+ stdout = sys.stdout
+ tmp_stdout = StringIO.StringIO()
+ sys.stdout = tmp_stdout
+ cwd = os.getcwd()
+ mercurial.dispatch.dispatch(fullargs)
+ os.chdir(cwd)
+ sys.stdout = stdout
+ return tmp_stdout.getvalue().rstrip('\n')
+
+ def _vcs_get_user_id(self):
+ return self._u_invoke_client('showconfig', 'ui.username')
+
+ def _vcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Mercurial"""
+ if self._u_search_parent_directories(path, '.hg') != None:
+ return True
+ return False
+
+ def _vcs_root(self, path):
+ return self._u_invoke_client('root', cwd=path)
+
+ def _vcs_init(self, path):
+ self._u_invoke_client('init', cwd=path)
+
+ def _vcs_destroy(self):
+ vcs_dir = os.path.join(self.repo, '.hg')
+ if os.path.exists(vcs_dir):
+ shutil.rmtree(vcs_dir)
+
+ def _vcs_add(self, path):
+ self._u_invoke_client('add', path)
+
+ def _vcs_remove(self, path):
+ self._u_invoke_client('rm', '--force', path)
+
+ def _vcs_update(self, path):
+ self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618
+
+ def _vcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return base.VCS._vcs_get_file_contents(self, path, revision)
+ else:
+ return self._u_invoke_client('cat', '-r', revision, path)
+
+ def _vcs_path(self, id, revision):
+ output = self._u_invoke_client('manifest', '--rev', revision)
+ be_dir = self._cached_path_id._spacer_dirs[0]
+ be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
+ files = [f for f in output.splitlines() if f.startswith(be_dir_sep)]
+ for file in files:
+ if not file.startswith(be_dir+os.path.sep):
+ continue
+ parts = file.split(os.path.sep)
+ dir = parts.pop(0) # don't add the first spacer dir
+ for part in parts[:-1]:
+ dir = os.path.join(dir, part)
+ if not dir in files:
+ files.append(dir)
+ for file in files:
+ if self._u_path_to_id(file) == id:
+ return file
+ raise base.InvalidId(id, revision=revision)
+
+ def _vcs_isdir(self, path, revision):
+ output = self._u_invoke_client('manifest', '--rev', revision)
+ files = output.splitlines()
+ if path in files:
+ return False
+ return True
+
+ def _vcs_listdir(self, path, revision):
+ output = self._u_invoke_client('manifest', '--rev', revision)
+ files = output.splitlines()
+ path = path.rstrip(os.path.sep) + os.path.sep
+ return [self._u_rel_path(f, path) for f in files if f.startswith(path)]
+
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--logfile', commitfile]
+ output = self._u_invoke_client(*args)
+ # work around http://mercurial.selenic.com/bts/issue618
+ strings = ['nothing changed']
+ if self._u_any_in_string(strings, output) == True \
+ and len(self.__updated) > 0:
+ time.sleep(1)
+ for path in self.__updated:
+ os.utime(os.path.join(self.repo, path), None)
+ output = self._u_invoke_client(*args)
+ self.__updated = []
+ # end work around
+ if allow_empty == False:
+ strings = ['nothing changed']
+ if self._u_any_in_string(strings, output) == True:
+ raise base.EmptyCommit()
+ return self._vcs_revision_id(-1)
+
+ def _vcs_revision_id(self, index, style='id'):
+ if index > 0:
+ index -= 1
+ args = ['identify', '--rev', str(int(index)), '--%s' % style]
+ output = self._u_invoke_client(*args)
+ id = output.strip()
+ if id == '000000000000':
+ return None # before initial commit.
+ return id
+
+
+if libbe.TESTING == True:
+ base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])