diff options
author | W. Trevor King <wking@drexel.edu> | 2009-12-31 15:54:12 -0500 |
---|---|---|
committer | W. Trevor King <wking@drexel.edu> | 2009-12-31 15:54:12 -0500 |
commit | b0b5341c4045dd27cfbb3e2585cb2614ed9ad903 (patch) | |
tree | 37c7c2d011617ccd7a6f28a24ea77bb1b3cddfe7 /libbe/storage | |
parent | a06030436d3940dddfba37b344f90651366d67e1 (diff) | |
parent | 2d1562d951e763fed71fe60e77cc9921be9abdc9 (diff) | |
download | bugseverywhere-b0b5341c4045dd27cfbb3e2585cb2614ed9ad903.tar.gz |
Merged be.restructure, major internal reorganization.
Added a bunch of classes to make the commands, user interfaces, and
storage backends more abstract and distinct. This should make it much
easier to extend and maintain BE.
Features:
* Directory restructured:
becommands/ -> libbe/commands
submods sorted by functionality.
* Lots of new classes:
Option, Argument, Command
InputOutput, StorageCallbacks, UserInterface
Storage
* Consolidated ID handling in libbe.util.id
* Transitioned VCS backends for Python-based VCSs from subprocess
calss to internal python calls.
Plus the user-visible changes:
* New bugdir/bug/comment ID format replaces old bug:comment format.
* Deprecated support for `be diff` on Arch and Darcs <= 2.3.1. A new
backend abstraction (Storage) makes the former implementation
ungainly.
* Improved command completion.
* Removed commands close, open, email_bugs,
* Flipped some arguments
`be assign BUG-ID [ASSIGNEE]` -> `be status ASSIGNED BUG-ID ...`
`be severity BUG-ID SEVERITY` -> `be severity SEVERITY BUG-ID ...`
`be status BUG-ID STATUS` -> `be status STATUS BUG-ID ...`
In the merge:
* Added 'commit' to list of pagerless commands.
* Updated doc/README.dev
See
#bea86499-824e-4e77-b085-2d581fa9ccab/1100c966-9671-4bc6-8b68-6d408a910da1#
for a discussion of why the changes were made and some of the
difficulties en-route.
Diffstat (limited to 'libbe/storage')
-rw-r--r-- | libbe/storage/__init__.py | 37 | ||||
-rw-r--r-- | libbe/storage/base.py | 909 | ||||
-rw-r--r-- | libbe/storage/util/__init__.py | 0 | ||||
-rw-r--r-- | libbe/storage/util/config.py | 93 | ||||
-rw-r--r-- | libbe/storage/util/mapfile.py | 130 | ||||
-rw-r--r-- | libbe/storage/util/properties.py | 642 | ||||
-rw-r--r-- | libbe/storage/util/settings_object.py | 432 | ||||
-rw-r--r-- | libbe/storage/util/upgrade.py | 331 | ||||
-rw-r--r-- | libbe/storage/vcs/__init__.py | 10 | ||||
-rw-r--r-- | libbe/storage/vcs/arch.py | 350 | ||||
-rw-r--r-- | libbe/storage/vcs/base.py | 1106 | ||||
-rw-r--r-- | libbe/storage/vcs/bzr.py | 196 | ||||
-rw-r--r-- | libbe/storage/vcs/darcs.py | 288 | ||||
-rw-r--r-- | libbe/storage/vcs/git.py | 183 | ||||
-rw-r--r-- | libbe/storage/vcs/hg.py | 180 |
15 files changed, 4887 insertions, 0 deletions
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py new file mode 100644 index 0000000..e99f799 --- /dev/null +++ b/libbe/storage/__init__.py @@ -0,0 +1,37 @@ +# Copyright + +import base + +ConnectionError = base.ConnectionError +InvalidStorageVersion = base.InvalidStorageVersion +InvalidID = base.InvalidID +InvalidRevision = base.InvalidRevision +InvalidDirectory = base.InvalidDirectory +NotWriteable = base.NotWriteable +NotReadable = base.NotReadable +EmptyCommit = base.EmptyCommit + +# a list of all past versions +STORAGE_VERSIONS = ['Bugs Everywhere Tree 1 0', + 'Bugs Everywhere Directory v1.1', + 'Bugs Everywhere Directory v1.2', + 'Bugs Everywhere Directory v1.3', + 'Bugs Everywhere Directory v1.4', + ] + +# the current version +STORAGE_VERSION = STORAGE_VERSIONS[-1] + +def get_storage(location): + """ + Return a Storage instance from a repo location string. + """ + import vcs + s = vcs.detect_vcs(location) + s.repo = location + return s + +__all__ = [ConnectionError, InvalidStorageVersion, InvalidID, + InvalidRevision, InvalidDirectory, NotWriteable, NotReadable, + EmptyCommit, STORAGE_VERSIONS, STORAGE_VERSION, + get_storage] diff --git a/libbe/storage/base.py b/libbe/storage/base.py new file mode 100644 index 0000000..1c711fa --- /dev/null +++ b/libbe/storage/base.py @@ -0,0 +1,909 @@ +# Copyright + +""" +Abstract bug repository data storage to easily support multiple backends. +""" + +import copy +import os +import pickle +import types + +from libbe.error import NotSupported +import libbe.storage +from libbe.util.tree import Tree +from libbe.util import InvalidObject +from libbe import TESTING + +if TESTING == True: + import doctest + import os.path + import sys + import unittest + + from libbe.util.utility import Dir + +class ConnectionError (Exception): + pass + +class InvalidStorageVersion(ConnectionError): + def __init__(self, active_version, expected_version=None): + if expected_version == None: + expected_version = libbe.storage.STORAGE_VERSION + msg = 'Storage in "%s" not the expected "%s"' \ + % (active_version, expected_version) + Exception.__init__(self, msg) + self.active_version = active_version + self.expected_version = expected_version + +class InvalidID (KeyError): + def __init__(self, id=None, revision=None, msg=None): + if msg == None and id != None: + msg = id + KeyError.__init__(self, msg) + self.id = id + self.revision = revision + +class InvalidRevision (KeyError): + pass + +class InvalidDirectory (Exception): + pass + +class DirectoryNotEmpty (InvalidDirectory): + pass + +class NotWriteable (NotSupported): + def __init__(self, msg): + NotSupported.__init__(self, 'write', msg) + +class NotReadable (NotSupported): + def __init__(self, msg): + NotSupported.__init__(self, 'read', msg) + +class EmptyCommit(Exception): + def __init__(self): + Exception.__init__(self, 'No changes to commit') + + +class Entry (Tree): + def __init__(self, id, value=None, parent=None, directory=False, + children=None): + if children == None: + Tree.__init__(self) + else: + Tree.__init__(self, children) + self.id = id + self.value = value + self.parent = parent + if self.parent != None: + if self.parent.directory == False: + raise InvalidDirectory( + 'Non-directory %s cannot have children' % self.parent) + parent.append(self) + self.directory = directory + + def __str__(self): + return '<Entry %s: %s>' % (self.id, self.value) + + def __repr__(self): + return str(self) + + def __cmp__(self, other, local=False): + if other == None: + return cmp(1, None) + if cmp(self.id, other.id) != 0: + return cmp(self.id, other.id) + if cmp(self.value, other.value) != 0: + return cmp(self.value, other.value) + if local == False: + if self.parent == None: + if cmp(self.parent, other.parent) != 0: + return cmp(self.parent, other.parent) + elif self.parent.__cmp__(other.parent, local=True) != 0: + return self.parent.__cmp__(other.parent, local=True) + for sc,oc in zip(self, other): + if sc.__cmp__(oc, local=True) != 0: + return sc.__cmp__(oc, local=True) + return 0 + + def _objects_to_ids(self): + if self.parent != None: + self.parent = self.parent.id + for i,c in enumerate(self): + self[i] = c.id + return self + + def _ids_to_objects(self, dict): + if self.parent != None: + self.parent = dict[self.parent] + for i,c in enumerate(self): + self[i] = dict[c] + return self + +class Storage (object): + """ + This class declares all the methods required by a Storage + interface. This implementation just keeps the data in a + dictionary and uses pickle for persistent storage. + """ + name = 'Storage' + + def __init__(self, repo='/', encoding='utf-8', options=None): + self.repo = repo + self.encoding = encoding + self.options = options + self.readable = True # soft limit (user choice) + self._readable = True # hard limit (backend choice) + self.writeable = True # soft limit (user choice) + self._writeable = True # hard limit (backend choice) + self.versioned = False + self.can_init = True + self.connected = False + + def __str__(self): + return '<%s %s %s>' % (self.__class__.__name__, id(self), self.repo) + + def __repr__(self): + return str(self) + + def version(self): + """Return a version string for this backend.""" + return '0' + + def storage_version(self, revision=None): + """Return the storage format for this backend.""" + return libbe.storage.STORAGE_VERSION + + def is_readable(self): + return self.readable and self._readable + + def is_writeable(self): + return self.writeable and self._writeable + + def init(self): + """Create a new storage repository.""" + if self.can_init == False: + raise NotSupported('init', + 'Cannot initialize this repository format.') + if self.is_writeable() == False: + raise NotWriteable('Cannot initialize unwriteable storage.') + return self._init() + + def _init(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + root = Entry(id='__ROOT__', directory=True) + d = {root.id:root} + pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1) + f.close() + + def destroy(self): + """Remove the storage repository.""" + if self.is_writeable() == False: + raise NotWriteable('Cannot destroy unwriteable storage.') + return self._destroy() + + def _destroy(self): + os.remove(os.path.join(self.repo, 'repo.pkl')) + + def connect(self): + """Open a connection to the repository.""" + if self.is_readable() == False: + raise NotReadable('Cannot connect to unreadable storage.') + self._connect() + self.connected = True + + def _connect(self): + try: + f = open(os.path.join(self.repo, 'repo.pkl'), 'rb') + except IOError: + raise ConnectionError(self) + d = pickle.load(f) + self._data = dict((k,v._ids_to_objects(d)) for k,v in d.items()) + f.close() + + def disconnect(self): + """Close the connection to the repository.""" + if self.is_writeable() == False: + return + if self.connected == False: + return + self._disconnect() + self.connected = False + + def _disconnect(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + pickle.dump(dict((k,v._objects_to_ids()) + for k,v in self._data.items()), f, -1) + f.close() + self._data = None + + def add(self, id, *args, **kwargs): + """Add an entry""" + if self.is_writeable() == False: + raise NotWriteable('Cannot add entry to unwriteable storage.') + try: # Maybe we've already added that id? + self.get(id) + pass # yup, no need to add another + except InvalidID: + self._add(id, *args, **kwargs) + + def _add(self, id, parent=None, directory=False): + if parent == None: + parent = '__ROOT__' + p = self._data[parent] + self._data[id] = Entry(id, parent=p, directory=directory) + + def remove(self, *args, **kwargs): + """Remove an entry.""" + if self.is_writeable() == False: + raise NotSupported('write', + 'Cannot remove entry from unwriteable storage.') + self._remove(*args, **kwargs) + + def _remove(self, id): + if self._data[id].directory == True \ + and len(self.children(id)) > 0: + raise DirectoryNotEmpty(id) + e = self._data.pop(id) + e.parent.remove(e) + + def recursive_remove(self, *args, **kwargs): + """Remove an entry and all its decendents.""" + if self.is_writeable() == False: + raise NotSupported('write', + 'Cannot remove entries from unwriteable storage.') + self._recursive_remove(*args, **kwargs) + + def _recursive_remove(self, id): + for entry in reversed(list(self._data[id].traverse())): + self._remove(entry.id) + + def children(self, *args, **kwargs): + """Return a list of specified entry's children's ids.""" + if self.is_readable() == False: + raise NotReadable('Cannot list children with unreadable storage.') + return self._children(*args, **kwargs) + + def _children(self, id=None, revision=None): + if id == None: + id = '__ROOT__' + return [c.id for c in self._data[id] if not c.id.startswith('__')] + + def get(self, *args, **kwargs): + """ + Get contents of and entry as they were in a given revision. + revision==None specifies the current revision. + + If there is no id, return default, unless default is not + given, in which case raise InvalidID. + """ + if self.is_readable() == False: + raise NotReadable('Cannot get entry with unreadable storage.') + if 'decode' in kwargs: + decode = kwargs.pop('decode') + else: + decode = False + value = self._get(*args, **kwargs) + if value != None: + if decode == True and type(value) != types.UnicodeType: + return unicode(value, self.encoding) + elif decode == False and type(value) != types.StringType: + return value.encode(self.encoding) + return value + + def _get(self, id, default=InvalidObject, revision=None): + if id in self._data: + return self._data[id].value + elif default == InvalidObject: + raise InvalidID(id) + return default + + def set(self, id, value, *args, **kwargs): + """ + Set the entry contents. + """ + if self.is_writeable() == False: + raise NotWriteable('Cannot set entry in unwriteable storage.') + if type(value) == types.UnicodeType: + value = value.encode(self.encoding) + self._set(id, value, *args, **kwargs) + + def _set(self, id, value): + if id not in self._data: + raise InvalidID(id) + if self._data[id].directory == True: + raise InvalidDirectory( + 'Directory %s cannot have data' % self.parent) + self._data[id].value = value + +class VersionedStorage (Storage): + """ + This class declares all the methods required by a Storage + interface that supports versioning. This implementation just + keeps the data in a list and uses pickle for persistent + storage. + """ + name = 'VersionedStorage' + + def __init__(self, *args, **kwargs): + Storage.__init__(self, *args, **kwargs) + self.versioned = True + + def _init(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + root = Entry(id='__ROOT__', directory=True) + summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit') + body = Entry(id='__COMMIT__BODY__') + initial_commit = {root.id:root, summary.id:summary, body.id:body} + d = dict((k,v._objects_to_ids()) for k,v in initial_commit.items()) + pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree] + f.close() + + def _connect(self): + try: + f = open(os.path.join(self.repo, 'repo.pkl'), 'rb') + except IOError: + raise ConnectionError(self) + d = pickle.load(f) + self._data = [dict((k,v._ids_to_objects(t)) for k,v in t.items()) + for t in d] + f.close() + + def _disconnect(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + pickle.dump([dict((k,v._objects_to_ids()) + for k,v in t.items()) for t in self._data], f, -1) + f.close() + self._data = None + + def _add(self, id, parent=None, directory=False): + if parent == None: + parent = '__ROOT__' + p = self._data[-1][parent] + self._data[-1][id] = Entry(id, parent=p, directory=directory) + + def _remove(self, id): + if self._data[-1][id].directory == True \ + and len(self.children(id)) > 0: + raise DirectoryNotEmpty(id) + e = self._data[-1].pop(id) + e.parent.remove(e) + + def _recursive_remove(self, id): + for entry in reversed(list(self._data[-1][id].traverse())): + self._remove(entry.id) + + def _children(self, id=None, revision=None): + if id == None: + id = '__ROOT__' + if revision == None: + revision = -1 + return [c.id for c in self._data[revision][id] + if not c.id.startswith('__')] + + def _get(self, id, default=InvalidObject, revision=None): + if revision == None: + revision = -1 + if id in self._data[revision]: + return self._data[revision][id].value + elif default == InvalidObject: + raise InvalidID(id) + return default + + def _set(self, id, value): + if id not in self._data[-1]: + raise InvalidID(id) + self._data[-1][id].value = value + + def commit(self, *args, **kwargs): + """ + Commit the current repository, with a commit message string + summary and body. Return the name of the new revision. + + If allow_empty == False (the default), raise EmptyCommit if + there are no changes to commit. + """ + if self.is_writeable() == False: + raise NotWriteable('Cannot commit to unwriteable storage.') + return self._commit(*args, **kwargs) + + def _commit(self, summary, body=None, allow_empty=False): + if self._data[-1] == self._data[-2] and allow_empty == False: + raise EmptyCommit + self._data[-1]["__COMMIT__SUMMARY__"].value = summary + self._data[-1]["__COMMIT__BODY__"].value = body + rev = len(self._data)-1 + self._data.append(copy.deepcopy(self._data[-1])) + return rev + + def revision_id(self, index=None): + """ + Return the name of the <index>th revision. The choice of + which branch to follow when crossing branches/merges is not + defined. Revision indices start at 1; ID 0 is the blank + repository. + + Return None if index==None. + + If the specified revision does not exist, raise InvalidRevision. + """ + if index == None: + return None + try: + if int(index) != index: + raise InvalidRevision(index) + except ValueError: + raise InvalidRevision(index) + L = len(self._data) - 1 # -1 b/c of initial commit + if index >= -L and index <= L: + return index % L + raise InvalidRevision(i) + +if TESTING == True: + class StorageTestCase (unittest.TestCase): + """Test cases for base Storage class.""" + + Class = Storage + + def __init__(self, *args, **kwargs): + super(StorageTestCase, self).__init__(*args, **kwargs) + self.dirname = None + + def setUp(self): + """Set up test fixtures for Storage test case.""" + super(StorageTestCase, self).setUp() + self.dir = Dir() + self.dirname = self.dir.path + self.s = self.Class(repo=self.dirname) + self.assert_failed_connect() + self.s.init() + self.s.connect() + + def tearDown(self): + super(StorageTestCase, self).tearDown() + self.s.disconnect() + self.s.destroy() + self.assert_failed_connect() + self.dir.cleanup() + + def assert_failed_connect(self): + try: + self.s.connect() + self.fail( + "Connected to %(name)s repository before initialising" + % vars(self.Class)) + except ConnectionError: + pass + + class Storage_init_TestCase (StorageTestCase): + """Test cases for Storage.init method.""" + + def test_connect_should_succeed_after_init(self): + """Should connect after initialization.""" + self.s.connect() + + class Storage_connect_disconnect_TestCase (StorageTestCase): + """Test cases for Storage.connect and .disconnect methods.""" + + def test_multiple_disconnects(self): + """Should be able to call .disconnect multiple times.""" + self.s.disconnect() + self.s.disconnect() + + class Storage_add_remove_TestCase (StorageTestCase): + """Test cases for Storage.add, .remove, and .recursive_remove methods.""" + + def test_initially_empty(self): + """New repository should be empty.""" + self.failUnless(len(self.s.children()) == 0, self.s.children()) + + def test_add_identical_rooted(self): + """ + Adding entries with the same ID should not increase the number of children. + """ + for i in range(10): + self.s.add('some id', directory=False) + s = sorted(self.s.children()) + self.failUnless(s == ['some id'], s) + + def test_add_rooted(self): + """ + Adding entries should increase the number of children (rooted). + """ + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], directory=(i % 2 == 0)) + s = sorted(self.s.children()) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + + def test_add_nonrooted(self): + """ + Adding entries should increase the number of children (nonrooted). + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + s = self.s.children() + self.failUnless(s == ['parent'], s) + + def test_children(self): + """ + Non-UUID ids should be returned as such. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append('parent/%s' % str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + + def test_add_invalid_directory(self): + """ + Should not be able to add children to non-directories. + """ + self.s.add('parent', directory=False) + try: + self.s.add('child', 'parent', directory=False) + self.fail( + '%s.add() succeeded instead of raising InvalidDirectory' + % (vars(self.Class)['name'])) + except InvalidDirectory: + pass + try: + self.s.add('child', 'parent', directory=True) + self.fail( + '%s.add() succeeded instead of raising InvalidDirectory' + % (vars(self.Class)['name'])) + except InvalidDirectory: + pass + self.failUnless(len(self.s.children('parent')) == 0, + self.s.children('parent')) + + def test_remove_rooted(self): + """ + Removing entries should decrease the number of children (rooted). + """ + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], directory=(i % 2 == 0)) + for i in range(10): + self.s.remove(ids.pop()) + s = sorted(self.s.children()) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + + def test_remove_nonrooted(self): + """ + Removing entries should decrease the number of children (nonrooted). + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=False)#(i % 2 == 0)) + for i in range(10): + self.s.remove(ids.pop()) + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + if len(s) > 0: + s = self.s.children() + self.failUnless(s == ['parent'], s) + + def test_remove_directory_not_empty(self): + """ + Removing a non-empty directory entry should raise exception. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + self.s.remove(ids.pop()) # empty directory removal succeeds + try: + self.s.remove('parent') # empty directory removal succeeds + self.fail( + "%s.remove() didn't raise DirectoryNotEmpty" + % (vars(self.Class)['name'])) + except DirectoryNotEmpty: + pass + + def test_recursive_remove(self): + """ + Recursive remove should empty the tree. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=True) + for j in range(10): # add some grandkids + self.s.add(str(20*(i+1)+j), ids[-1], directory=(i%2 == 0)) + self.s.recursive_remove('parent') + s = sorted(self.s.children()) + self.failUnless(s == [], s) + + class Storage_get_set_TestCase (StorageTestCase): + """Test cases for Storage.get and .set methods.""" + + id = 'unlikely id' + val = 'unlikely value' + + def test_get_default(self): + """ + Get should return specified default if id not in Storage. + """ + ret = self.s.get(self.id, default=self.val) + self.failUnless(ret == self.val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + + def test_get_default_exception(self): + """ + Get should raise exception if id not in Storage and no default. + """ + try: + ret = self.s.get(self.id) + self.fail( + "%s.get() returned %s instead of raising InvalidID" + % (vars(self.Class)['name'], ret)) + except InvalidID: + pass + + def test_get_initial_value(self): + """ + Data value should be None before any value has been set. + """ + self.s.add(self.id, directory=False) + ret = self.s.get(self.id) + self.failUnless(ret == None, + "%s.get() returned %s not None" + % (vars(self.Class)['name'], ret)) + + def test_set_exception(self): + """ + Set should raise exception if id not in Storage. + """ + try: + self.s.set(self.id, self.val) + self.fail( + "%(name)s.set() did not raise InvalidID" + % vars(self.Class)) + except InvalidID: + pass + + def test_set(self): + """ + Set should define the value returned by get. + """ + self.s.add(self.id, directory=False) + self.s.set(self.id, self.val) + ret = self.s.get(self.id) + self.failUnless(ret == self.val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + + def test_unicode_set(self): + """ + Set should define the value returned by get. + """ + val = u'Fran\xe7ois' + self.s.add(self.id, directory=False) + self.s.set(self.id, val) + ret = self.s.get(self.id, decode=True) + self.failUnless(type(ret) == types.UnicodeType, + "%s.get() returned %s not UnicodeType" + % (vars(self.Class)['name'], type(ret))) + self.failUnless(ret == val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + ret = self.s.get(self.id) + self.failUnless(type(ret) == types.StringType, + "%s.get() returned %s not StringType" + % (vars(self.Class)['name'], type(ret))) + s = unicode(ret, self.s.encoding) + self.failUnless(s == val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], s, self.val)) + + + class Storage_persistence_TestCase (StorageTestCase): + """Test cases for Storage.disconnect and .connect methods.""" + + id = 'unlikely id' + val = 'unlikely value' + + def test_get_set_persistence(self): + """ + Set should define the value returned by get after reconnect. + """ + self.s.add(self.id, directory=False) + self.s.set(self.id, self.val) + self.s.disconnect() + self.s.connect() + ret = self.s.get(self.id) + self.failUnless(ret == self.val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + + def test_add_nonrooted_persistence(self): + """ + Adding entries should increase the number of children after reconnect. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + self.s.disconnect() + self.s.connect() + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + s = self.s.children() + self.failUnless(s == ['parent'], s) + + class VersionedStorageTestCase (StorageTestCase): + """Test cases for base VersionedStorage class.""" + + Class = VersionedStorage + + class VersionedStorage_commit_TestCase (VersionedStorageTestCase): + """Test cases for VersionedStorage methods.""" + + id = 'unlikely id' + val = 'Some value' + commit_msg = 'Committing something interesting' + commit_body = 'Some\nlonger\ndescription\n' + + def _setup_for_empty_commit(self): + """ + Initialization might add some files to version control, so + commit those first, before testing the empty commit + functionality. + """ + try: + self.s.commit('Added initialization files') + except EmptyCommit: + pass + + def test_revision_id_exception(self): + """ + Invalid revision id should raise InvalidRevision. + """ + try: + rev = self.s.revision_id('highly unlikely revision id') + self.fail( + "%s.revision_id() didn't raise InvalidRevision, returned %s." + % (vars(self.Class)['name'], rev)) + except InvalidRevision: + pass + + def test_empty_commit_raises_exception(self): + """ + Empty commit should raise exception. + """ + self._setup_for_empty_commit() + try: + self.s.commit(self.commit_msg, self.commit_body) + self.fail( + "Empty %(name)s.commit() didn't raise EmptyCommit." + % vars(self.Class)) + except EmptyCommit: + pass + + def test_empty_commit_allowed(self): + """ + Empty commit should _not_ raise exception if allow_empty=True. + """ + self._setup_for_empty_commit() + self.s.commit(self.commit_msg, self.commit_body, + allow_empty=True) + + def test_commit_revision_ids(self): + """ + Commit / revision_id should agree on revision ids. + """ + def val(i): + return '%s:%d' % (self.val, i+1) + self.s.add(self.id, directory=False) + revs = [] + for i in range(10): + self.s.set(self.id, val(i)) + revs.append(self.s.commit('%s: %d' % (self.commit_msg, i), + self.commit_body)) + for i in range(10): + rev = self.s.revision_id(i+1) + self.failUnless(rev == revs[i], + "%s.revision_id(%d) returned %s not %s" + % (vars(self.Class)['name'], i+1, rev, revs[i])) + for i in range(-1, -9, -1): + rev = self.s.revision_id(i) + self.failUnless(rev == revs[i], + "%s.revision_id(%d) returned %s not %s" + % (vars(self.Class)['name'], i, rev, revs[i])) + + def test_get_previous_version(self): + """ + Get should be able to return the previous version. + """ + def val(i): + return '%s:%d' % (self.val, i+1) + self.s.add(self.id, directory=False) + revs = [] + for i in range(10): + self.s.set(self.id, val(i)) + revs.append(self.s.commit('%s: %d' % (self.commit_msg, i), + self.commit_body)) + for i in range(10): + ret = self.s.get(self.id, revision=revs[i]) + self.failUnless(ret == val(i), + "%s.get() returned %s not %s for revision %s" + % (vars(self.Class)['name'], ret, val(i), revs[i])) + + def test_get_previous_children(self): + """ + Children list should be revision dependent. + """ + self.s.add('parent', directory=True) + revs = [] + cur_children = [] + children = [] + for i in range(10): + new_child = str(i) + self.s.add(new_child, 'parent', directory=(i % 2 == 0)) + self.s.set(new_child, self.val) + revs.append(self.s.commit('%s: %d' % (self.commit_msg, i), + self.commit_body)) + cur_children.append(new_child) + children.append(list(cur_children)) + for i in range(10): + ret = self.s.children('parent', revision=revs[i]) + self.failUnless(ret == children[i], + "%s.get() returned %s not %s for revision %s" + % (vars(self.Class)['name'], ret, + children[i], revs[i])) + + def make_storage_testcase_subclasses(storage_class, namespace): + """Make StorageTestCase subclasses for storage_class in namespace.""" + storage_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, StorageTestCase) \ + and c.Class == Storage] + + for base_class in storage_testcase_classes: + testcase_class_name = storage_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = storage_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + def make_versioned_storage_testcase_subclasses(storage_class, namespace): + """Make VersionedStorageTestCase subclasses for storage_class in namespace.""" + storage_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, StorageTestCase) \ + and c.Class == Storage] + + for base_class in storage_testcase_classes: + testcase_class_name = storage_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = storage_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + make_storage_testcase_subclasses(VersionedStorage, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/util/__init__.py b/libbe/storage/util/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/libbe/storage/util/__init__.py diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py new file mode 100644 index 0000000..9f95d14 --- /dev/null +++ b/libbe/storage/util/config.py @@ -0,0 +1,93 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Create, save, and load the per-user config file at path(). +""" + +import ConfigParser +import codecs +import os.path + +import libbe +import libbe.util.encoding +if libbe.TESTING == True: + import doctest + + +default_encoding = libbe.util.encoding.get_filesystem_encoding() + +def path(): + """Return the path to the per-user config file""" + return os.path.expanduser("~/.bugs_everywhere") + +def set_val(name, value, section="DEFAULT", encoding=None): + """Set a value in the per-user config file + + :param name: The name of the value to set + :param value: The new value to set (or None to delete the value) + :param section: The section to store the name/value in + """ + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + if os.path.exists(path()) == False: # touch file or config + open(path(), 'w').close() # read chokes on missing file + f = codecs.open(path(), 'r', encoding) + config.readfp(f, path()) + f.close() + if value is not None: + config.set(section, name, value) + else: + config.remove_option(section, name) + f = codecs.open(path(), 'w', encoding) + config.write(f) + f.close() + +def get_val(name, section="DEFAULT", default=None, encoding=None): + """ + Get a value from the per-user config file + + :param name: The name of the value to get + :section: The section that the name is in + :return: The value, or None + >>> get_val("junk") is None + True + >>> set_val("junk", "random") + >>> get_val("junk") + u'random' + >>> set_val("junk", None) + >>> get_val("junk") is None + True + """ + if os.path.exists(path()): + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + f = codecs.open(path(), 'r', encoding) + config.readfp(f, path()) + f.close() + try: + return config.get(section, name) + except ConfigParser.NoOptionError: + return default + else: + return default + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py new file mode 100644 index 0000000..35ae1a0 --- /dev/null +++ b/libbe/storage/util/mapfile.py @@ -0,0 +1,130 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Provide a means of saving and loading dictionaries of parameters. The +saved "mapfiles" should be clear, flat-text files, and allow easy merging of +independent/conflicting changes. +""" + +import errno +import os.path +import types +import yaml + +import libbe +if libbe.TESTING == True: + import doctest + + +class IllegalKey(Exception): + def __init__(self, key): + Exception.__init__(self, 'Illegal key "%s"' % key) + self.key = key + +class IllegalValue(Exception): + def __init__(self, value): + Exception.__init__(self, 'Illegal value "%s"' % value) + self.value = value + +class InvalidMapfileContents(Exception): + def __init__(self, contents): + Exception.__init__(self, 'Invalid YAML contents') + self.contents = contents + +def generate(map): + """Generate a YAML mapfile content string. + >>> generate({'q':'p'}) + 'q: p\\n\\n' + >>> generate({'q':u'Fran\u00e7ais'}) + 'q: Fran\\xc3\\xa7ais\\n\\n' + >>> generate({'q':u'hello'}) + 'q: hello\\n\\n' + >>> generate({'q=':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q=" + >>> generate({'q:':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q:" + >>> generate({'q\\n':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q\\n" + >>> generate({'':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "" + >>> generate({'>q':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key ">q" + >>> generate({'q':'p\\n'}) + Traceback (most recent call last): + IllegalValue: Illegal value "p\\n" + """ + keys = map.keys() + keys.sort() + for key in keys: + try: + assert not key.startswith('>') + assert('\n' not in key) + assert('=' not in key) + assert(':' not in key) + assert(len(key) > 0) + except AssertionError: + raise IllegalKey(unicode(key).encode('unicode_escape')) + if '\n' in map[key]: + raise IllegalValue(unicode(map[key]).encode('unicode_escape')) + + lines = [] + for key in keys: + lines.append(yaml.safe_dump({key: map[key]}, + default_flow_style=False, + allow_unicode=True)) + lines.append('') + return '\n'.join(lines) + +def parse(contents): + """ + Parse a YAML mapfile string. + >>> parse('q: p\\n\\n')['q'] + 'p' + >>> parse('q: \\'p\\'\\n\\n')['q'] + 'p' + >>> contents = generate({'a':'b', 'c':'d', 'e':'f'}) + >>> dict = parse(contents) + >>> dict['a'] + 'b' + >>> dict['c'] + 'd' + >>> dict['e'] + 'f' + >>> contents = generate({'q':u'Fran\u00e7ais'}) + >>> dict = parse(contents) + >>> dict['q'] + u'Fran\\xe7ais' + >>> dict = parse('a!') + Traceback (most recent call last): + ... + InvalidMapfileContents: Invalid YAML contents + """ + c = yaml.load(contents) + if type(c) == types.StringType: + raise InvalidMapfileContents( + 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents) + return c or {} + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py new file mode 100644 index 0000000..ddd7b25 --- /dev/null +++ b/libbe/storage/util/properties.py @@ -0,0 +1,642 @@ +# Bugs Everywhere - a distributed bugtracker +# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +This module provides a series of useful decorators for defining +various types of properties. For example usage, consider the +unittests at the end of the module. + +See + http://www.python.org/dev/peps/pep-0318/ +and + http://www.phyast.pitt.edu/~micheles/python/documentation.html +for more information on decorators. +""" + +import copy +import types + +import libbe +if libbe.TESTING == True: + import unittest + + +class ValueCheckError (ValueError): + def __init__(self, name, value, allowed): + action = "in" # some list of allowed values + if type(allowed) == types.FunctionType: + action = "allowed by" # some allowed-value check function + msg = "%s not %s %s for %s" % (value, action, allowed, name) + ValueError.__init__(self, msg) + self.name = name + self.value = value + self.allowed = allowed + +def Property(funcs): + """ + End a chain of property decorators, returning a property. + """ + args = {} + args["fget"] = funcs.get("fget", None) + args["fset"] = funcs.get("fset", None) + args["fdel"] = funcs.get("fdel", None) + args["doc"] = funcs.get("doc", None) + + #print "Creating a property with" + #for key, val in args.items(): print key, value + return property(**args) + +def doc_property(doc=None): + """ + Add a docstring to a chain of property decorators. + """ + def decorator(funcs=None): + """ + Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...} + or a function fn() returning such a dict. + """ + if hasattr(funcs, "__call__"): + funcs = funcs() # convert from function-arg to dict + funcs["doc"] = doc + return funcs + return decorator + +def local_property(name, null=None, mutable_null=False): + """ + Define get/set access to per-parent-instance local storage. Uses + ._<name>_value to store the value for a particular owner instance. + If the ._<name>_value attribute does not exist, returns null. + + If mutable_null == True, we only release deepcopies of the null to + the outside world. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + if mutable_null == True: + ret_null = copy.deepcopy(null) + else: + ret_null = null + value = getattr(self, "_%s_value" % name, ret_null) + return value + def _fset(self, value): + setattr(self, "_%s_value" % name, value) + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + +def settings_property(name, null=None): + """ + Similar to local_property, except where local_property stores the + value in instance._<name>_value, settings_property stores the + value in instance.settings[name]. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + value = self.settings.get(name, null) + return value + def _fset(self, value): + self.settings[name] = value + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + + +# Allow comparison and caching with _original_ values for mutables, +# since +# +# >>> a = [] +# >>> b = a +# >>> b.append(1) +# >>> a +# [1] +# >>> a==b +# True +def _hash_mutable_value(value): + return repr(value) +def _init_mutable_property_cache(self): + if not hasattr(self, "_mutable_property_cache_hash"): + # first call to _fget for any mutable property + self._mutable_property_cache_hash = {} + self._mutable_property_cache_copy = {} +def _set_cached_mutable_property(self, cacher_name, property_name, value): + _init_mutable_property_cache(self) + self._mutable_property_cache_hash[(cacher_name, property_name)] = \ + _hash_mutable_value(value) + self._mutable_property_cache_copy[(cacher_name, property_name)] = \ + copy.deepcopy(value) +def _get_cached_mutable_property(self, cacher_name, property_name, default=None): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_copy: + return default + return self._mutable_property_cache_copy[(cacher_name, property_name)] +def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_hash: + _set_cached_mutable_property(self, cacher_name, property_name, default) + old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)] + return cmp(_hash_mutable_value(value), old_hash) + + +def defaulting_property(default=None, null=None, + mutable_default=False): + """ + Define a default value for get access to a property. + If the stored value is null, then default is returned. + + If mutable_default == True, we only release deepcopies of the + default to the outside world. + + null should never escape to the outside world, so don't worry + about it being a mutable. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value == null: + if mutable_default == True: + return copy.deepcopy(default) + else: + return default + return value + def _fset(self, value): + if value == default: + value = null + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def fn_checked_property(value_allowed_fn): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + return value + def _fset(self, value): + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def checked_property(allowed=[]): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value not in allowed: + raise ValueCheckError(name, value, allowed) + return value + def _fset(self, value): + if value not in allowed: + raise ValueCheckError(name, value, allowed) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def cached_property(generator, initVal=None, mutable=False): + """ + Allow caching of values generated by generator(instance), where + instance is the instance to which this property belongs. Uses + ._<name>_cache to store a cache flag for a particular owner + instance. + + When the cache flag is True or missing and the stored value is + initVal, the first fget call triggers the generator function, + whose output is stored in _<name>_cached_value. That and + subsequent calls to fget will return this cached value. + + If the input value is no longer initVal (e.g. a value has been + loaded from disk or set with fset), that value overrides any + cached value, and this property has no effect. + + When the cache flag is False and the stored value is initVal, the + generator is not cached, but is called on every fget. + + The cache flag is missing on initialization. Particular instances + may override by setting their own flag. + + In the case that mutable == True, all caching is disabled and the + generator is called whenever the cached value would otherwise be + used. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "<unknown>") + def _fget(self): + cache = getattr(self, "_%s_cache" % name, True) + value = fget(self) + if value == initVal: + if cache == True and mutable == False: + if hasattr(self, "_%s_cached_value" % name): + value = getattr(self, "_%s_cached_value" % name) + else: + value = generator(self) + setattr(self, "_%s_cached_value" % name, value) + else: + value = generator(self) + return value + funcs["fget"] = _fget + return funcs + return decorator + +def primed_property(primer, initVal=None): + """ + Just like a cached_property, except that instead of returning a + new value and running fset to cache it, the primer performs some + background manipulation (e.g. loads data into instance.settings) + such that a _second_ pass through fget succeeds. + + The 'cache' flag becomes a 'prime' flag, with priming taking place + whenever ._<name>_prime is True, or is False or missing and + value == initVal. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "<unknown>") + def _fget(self): + prime = getattr(self, "_%s_prime" % name, False) + if prime == False: + value = fget(self) + if prime == True or (prime == False and value == initVal): + primer(self) + value = fget(self) + return value + funcs["fget"] = _fget + return funcs + return decorator + +def change_hook_property(hook, mutable=False, default=None): + """ + Call the function hook(instance, old_value, new_value) whenever a + value different from the current value is set (instance is a a + reference to the class instance to which this property belongs). + This is useful for saving changes to disk, etc. This function is + called _after_ the new value has been stored, allowing you to + change the stored value if you want. + + In the case of mutables, things are slightly trickier. Because + the property-owning class has no way of knowing when the value + changes. We work around this by caching a private deepcopy of the + mutable value, and checking for changes whenever the property is + set (obviously) or retrieved (to check for external changes). So + long as you're conscientious about accessing the property after + making external modifications, mutability won't be a problem. + t.x.append(5) # external modification + t.x # dummy access notices change and triggers hook + See testChangeHookMutableProperty for an example of the expected + behavior. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self, new_value=None, from_fset=False): # only used if mutable == True + if from_fset == True: + value = new_value # compare new value with cached + else: + value = fget(self) # compare current value with cached + if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0: + # there has been a change, cache new value + old_value = _get_cached_mutable_property(self, "change hook property", name, default) + _set_cached_mutable_property(self, "change hook property", name, value) + if from_fset == True: # return previously cached value + value = old_value + else: # the value changed while we weren't looking + hook(self, old_value, value) + return value + def _fset(self, value): + if mutable == True: # get cached previous value + old_value = _fget(self, new_value=value, from_fset=True) + else: + old_value = fget(self) + fset(self, value) + if value != old_value: + hook(self, old_value, value) + if mutable == True: + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +if libbe.TESTING == True: + class DecoratorTests(unittest.TestCase): + def testLocalDoc(self): + class Test(object): + @Property + @doc_property("A fancy property") + def x(): + return {} + self.failUnless(Test.x.__doc__ == "A fancy property", + Test.x.__doc__) + def testLocalProperty(self): + class Test(object): + @Property + @local_property(name="LOCAL") + def x(): + return {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("_LOCAL_value" in dir(t), dir(t)) + self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value) + def testSettingsProperty(self): + class Test(object): + @Property + @settings_property(name="attr") + def x(): + return {} + def __init__(self): + self.settings = {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("attr" in t.settings, t.settings) + self.failUnless(t.settings["attr"] == 'z', t.settings["attr"]) + def testDefaultingLocalProperty(self): + class Test(object): + @Property + @defaulting_property(default='y', null='x') + @local_property(name="DEFAULT", null=5) + def x(): return {} + t = Test() + self.failUnless(t.x == 5, str(t.x)) + t.x = 'x' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'y' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'z' + self.failUnless(t.x == 'z', str(t.x)) + t.x = 5 + self.failUnless(t.x == 5, str(t.x)) + def testCheckedLocalProperty(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testTwoCheckedLocalProperties(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="X") + def x(): return {} + + @Property + @checked_property(allowed=['a', 'b', 'c']) + @local_property(name="A") + def a(): return {} + def __init__(self): + self._A_value = 'a' + self._X_value = 'x' + t = Test() + try: + t.x = 'a' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.x = 'x' + t.x = 'y' + t.x = 'z' + try: + t.a = 'x' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.a = 'a' + t.a = 'b' + t.a = 'c' + def testFnCheckedLocalProperty(self): + class Test(object): + @Property + @fn_checked_property(lambda v : v in ['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testCachedLocalProperty(self): + class Gen(object): + def __init__(self): + self.i = 0 + def __call__(self, owner): + self.i += 1 + return self.i + class Test(object): + @Property + @cached_property(generator=Gen(), initVal=None) + @local_property(name="CACHED") + def x(): return {} + t = Test() + self.failIf("_CACHED_cache" in dir(t), + getattr(t, "_CACHED_cache", None)) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + t.x = 8 + self.failUnless(t.x == 8, t.x) + self.failUnless(t.x == 8, t.x) + t._CACHED_cache = False # Caching is off, but the stored value + val = t.x # is 8, not the initVal (None), so we + self.failUnless(val == 8, val) # get 8. + t._CACHED_value = None # Now we've set the stored value to None + val = t.x # so future calls to fget (like this) + self.failUnless(val == 2, val) # will call the generator every time... + val = t.x + self.failUnless(val == 3, val) + val = t.x + self.failUnless(val == 4, val) + t._CACHED_cache = True # We turn caching back on, and get + self.failUnless(t.x == 1, str(t.x)) # the original cached value. + del t._CACHED_cached_value # Removing that value forces a + self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call + self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which + self.failUnless(t.x == 5, str(t.x)) # we get the new cached value. + def testPrimedLocalProperty(self): + class Test(object): + def prime(self): + self.settings["PRIMED"] = "initialized" + @Property + @primed_property(primer=prime, initVal=None) + @settings_property(name="PRIMED") + def x(): return {} + def __init__(self): + self.settings={} + t = Test() + self.failIf("_PRIMED_prime" in dir(t), + getattr(t, "_PRIMED_prime", None)) + self.failUnless(t.x == "initialized", t.x) + t.x = 1 + self.failUnless(t.x == 1, t.x) + t.x = None + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = True + t.x = 3 + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = False + t.x = 3 + self.failUnless(t.x == 3, t.x) + def testChangeHookLocalProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + + @Property + @change_hook_property(_hook) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 2 + self.failUnless(t.old == 1, t.old) + self.failUnless(t.new == 2, t.new) + def testChangeHookMutableProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + self.hook_calls += 1 + + @Property + @change_hook_property(_hook, mutable=True) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.hook_calls = 0 + t.x = [] + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 1, t.hook_calls) + a = t.x + a.append(5) + t.x = a + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 2, t.hook_calls) + t.x = [] + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # now append without reassigning. this doesn't trigger the + # change, since we don't ever set t.x, only get it and mess + # with it. It does, however, update our t.new, since t.new = + # t.x and is not a static copy. + t.x.append(5) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # however, the next t.x get _will_ notice the change... + a = t.x + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + t.x.append(6) # this append(6) is not noticed yet + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5,6], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + # this append(7) is not noticed, but the t.x get causes the + # append(6) to be noticed + t.x.append(7) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 5, t.hook_calls) + a = t.x # now the append(7) is noticed + self.failUnless(t.old == [5,6], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 6, t.hook_calls) + + suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests) diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py new file mode 100644 index 0000000..8b86829 --- /dev/null +++ b/libbe/storage/util/settings_object.py @@ -0,0 +1,432 @@ +# Bugs Everywhere - a distributed bugtracker +# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +This module provides a base class implementing settings-dict based +property storage useful for BE objects with saved properties +(e.g. BugDir, Bug, Comment). For example usage, consider the +unittests at the end of the module. +""" + +import libbe +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, fn_checked_property, \ + cached_property, primed_property, change_hook_property, \ + settings_property +if libbe.TESTING == True: + import doctest + import unittest + +class _Token (object): + """ + `Control' value class for properties. We want values that only + mean something to the settings_object module. + """ + pass + +class UNPRIMED (_Token): + "Property has not been primed." + pass + +class EMPTY (_Token): + """ + Property has been primed but has no user-set value, so use + default/generator value. + """ + pass + + +def prop_save_settings(self, old, new): + """ + The default action undertaken when a property changes. + """ + if self.storage != None and self.storage.is_writeable(): + self.save_settings() + +def prop_load_settings(self): + """ + The default action undertaken when an UNPRIMED property is accessed. + """ + if self.storage != None and self.storage.is_readable() \ + and self._settings_loaded==False: + self.load_settings() + else: + self._setup_saved_settings(flag_as_loaded=False) + +# Some name-mangling routines for pretty printing setting names +def setting_name_to_attr_name(self, name): + """ + Convert keys to the .settings dict into their associated + SavedSettingsObject attribute names. + >>> print setting_name_to_attr_name(None,"User-id") + user_id + """ + return name.lower().replace('-', '_') + +def attr_name_to_setting_name(self, name): + """ + The inverse of setting_name_to_attr_name. + >>> print attr_name_to_setting_name(None, "user_id") + User-id + """ + return name.capitalize().replace('_', '-') + + +def versioned_property(name, doc, + default=None, generator=None, + change_hook=prop_save_settings, + mutable=False, + primer=prop_load_settings, + allowed=None, check_fn=None, + settings_properties=[], + required_saved_properties=[], + require_save=False): + """ + Combine the common decorators in a single function. + + Use zero or one (but not both) of default or generator, since a + working default will keep the generator from functioning. Use the + default if you know what you want the default value to be at + 'coding time'. Use the generator if you can write a function to + determine a valid default at run time. If both default and + generator are None, then the property will be a defaulting + property which defaults to None. + + allowed and check_fn have a similar relationship, although you can + use both of these if you want. allowed compares the proposed + value against a list determined at 'coding time' and check_fn + allows more flexible comparisons to take place at run time. + + Set require_save to True if you want to save the default/generated + value for a property, to protect against future changes. E.g., we + currently expect all comments to be 'text/plain' but in the future + we may want to default to 'text/html'. If we don't want the old + comments to be interpreted as 'text/html', we would require that + the content type be saved. + + change_hook, primer, settings_properties, and + required_saved_properties are only options to get their defaults + into our local scope. Don't mess with them. + + Set mutable=True if: + * default is a mutable + * your generator function may return mutables + * you set change_hook and might have mutable property values + See the docstrings in libbe.properties for details on how each of + these cases are handled. + """ + settings_properties.append(name) + if require_save == True: + required_saved_properties.append(name) + def decorator(funcs): + fulldoc = doc + if default != None or generator == None: + defaulting = defaulting_property(default=default, null=EMPTY, + mutable_default=mutable) + fulldoc += "\n\nThis property defaults to %s." % default + if generator != None: + cached = cached_property(generator=generator, initVal=EMPTY, + mutable=mutable) + fulldoc += "\n\nThis property is generated with %s." % generator + if check_fn != None: + fn_checked = fn_checked_property(value_allowed_fn=check_fn) + fulldoc += "\n\nThis property is checked with %s." % check_fn + if allowed != None: + checked = checked_property(allowed=allowed) + fulldoc += "\n\nThe allowed values for this property are: %s." \ + % (', '.join(allowed)) + hooked = change_hook_property(hook=change_hook, mutable=mutable, + default=EMPTY) + primed = primed_property(primer=primer, initVal=UNPRIMED) + settings = settings_property(name=name, null=UNPRIMED) + docp = doc_property(doc=fulldoc) + deco = hooked(primed(settings(docp(funcs)))) + if default != None or generator == None: + deco = defaulting(deco) + if generator != None: + deco = cached(deco) + if check_fn != None: + deco = fn_checked(deco) + if allowed != None: + deco = checked(deco) + return Property(deco) + return decorator + +class SavedSettingsObject(object): + + # Keep a list of properties that may be stored in the .settings dict. + #settings_properties = [] + + # A list of properties that we save to disk, even if they were + # never set (in which case we save the default value). This + # protects against future changes in default values. + #required_saved_properties = [] + + _setting_name_to_attr_name = setting_name_to_attr_name + _attr_name_to_setting_name = attr_name_to_setting_name + + def __init__(self): + self._settings_loaded = False + self.storage = None + self.settings = {} + + def load_settings(self): + """Load the settings from disk.""" + # Override. Must call ._setup_saved_settings() after loading. + self.settings = {} + self._setup_saved_settings() + + def _setup_saved_settings(self, flag_as_loaded=True): + """ + To be run after setting self.settings up from disk. Marks all + settings as primed. + """ + for property in self.settings_properties: + if property not in self.settings \ + or self.settings[property] == UNPRIMED: + self.settings[property] = EMPTY + if flag_as_loaded == True: + self._settings_loaded = True + + def save_settings(self): + """Load the settings from disk.""" + # Override. Should save the dict output of ._get_saved_settings() + settings = self._get_saved_settings() + pass # write settings to disk.... + + def _get_saved_settings(self): + settings = {} + for k,v in self.settings.items(): + if v != None and v != EMPTY: + settings[k] = v + for k in self.required_saved_properties: + settings[k] = getattr(self, self._setting_name_to_attr_name(k)) + return settings + + def clear_cached_setting(self, setting=None): + "If setting=None, clear *all* cached settings" + if setting != None: + if hasattr(self, "_%s_cached_value" % setting): + delattr(self, "_%s_cached_value" % setting) + else: + for setting in settings_properties: + self.clear_cached_setting(setting) + + +if libbe.TESTING == True: + class SavedSettingsObjectTests(unittest.TestCase): + def testSimpleProperty(self): + """Testing a minimal versioned property""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="Content-type", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties= \ + required_saved_properties) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + # access missing setting + self.failUnless(t._settings_loaded == False, t._settings_loaded) + self.failUnless(len(t.settings) == 0, len(t.settings)) + self.failUnless(t.content_type == None, t.content_type) + # accessing t.content_type triggers the priming, which runs + # t._setup_saved_settings, which fills out t.settings with + # EMPTY data. t._settings_loaded is still false though, since + # the default priming does not do any of the `official' loading + # that occurs in t.load_settings. + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t._settings_loaded == False, t._settings_loaded) + # load settings creates an EMPTY value in the settings array + t.load_settings() + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + # now we set a value + t.content_type = 5 + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + self.failUnless(t.content_type == 5, t.content_type) + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + # now we set another value + t.content_type = "text/plain" + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings["Content-type"] == "text/plain", + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + # now we clear to the post-primed value + t.content_type = EMPTY + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + def testDefaultingProperty(self): + """Testing a defaulting versioned property""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties= \ + required_saved_properties) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._settings_loaded == False, t._settings_loaded) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t._settings_loaded == False, t._settings_loaded) + t.load_settings() + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings() == {}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t.content_type == "text/html", + t.content_type) + self.failUnless(t.settings["Content-type"] == "text/html", + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + def testRequiredDefaultingProperty(self): + """Testing a required defaulting versioned property""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties= \ + required_saved_properties, + require_save=True) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + def testClassVersionedPropertyDefinition(self): + """Testing a class-specific _versioned property decorator""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + def _versioned_property(settings_properties= \ + settings_properties, + required_saved_properties= \ + required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"] = \ + required_saved_properties + return versioned_property(**kwargs) + @_versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + require_save=True) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + def testMutableChangeHookedProperty(self): + """Testing a mutable change-hooked property""" + SAVES = [] + def prop_log_save_settings(self, old, new, saves=SAVES): + saves.append("'%s' -> '%s'" % (str(old), str(new))) + prop_save_settings(self, old, new) + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="List-type", + doc="A test property", + mutable=True, + change_hook=prop_log_save_settings, + settings_properties=settings_properties, + required_saved_properties= \ + required_saved_properties) + def list_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._settings_loaded == False, t._settings_loaded) + t.load_settings() + self.failUnless(SAVES == [], SAVES) + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.list_type == None, t.list_type) + self.failUnless(SAVES == [], SAVES) + self.failUnless(t.settings["List-type"]==EMPTY, + t.settings["List-type"]) + t.list_type = [] + self.failUnless(t.settings["List-type"] == [], + t.settings["List-type"]) + self.failUnless(SAVES == [ + "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'" + ], SAVES) + t.list_type.append(5) + self.failUnless(SAVES == [ + "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'", + ], SAVES) + self.failUnless(t.settings["List-type"] == [5], + t.settings["List-type"]) + self.failUnless(SAVES == [ # the append(5) has not yet been saved + "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'", + ], SAVES) + self.failUnless(t.list_type == [5], t.list_type)#get triggers saved + + self.failUnless(SAVES == [ # now the append(5) has been saved. + "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'", + "'[]' -> '[5]'" + ], SAVES) + + unitsuite = unittest.TestLoader().loadTestsFromTestCase( \ + SavedSettingsObjectTests) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py new file mode 100644 index 0000000..20ef1e4 --- /dev/null +++ b/libbe/storage/util/upgrade.py @@ -0,0 +1,331 @@ +# Copyright (C) 2009 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Handle conversion between the various BE storage formats. +""" + +import codecs +import os, os.path +import sys + +import libbe +import libbe.bug +import libbe.storage.util.mapfile as mapfile +from libbe.storage import STORAGE_VERSIONS, STORAGE_VERSION +#import libbe.storage.vcs # delay import to avoid cyclic dependency +import libbe.ui.util.editor +import libbe.util +import libbe.util.encoding as encoding +import libbe.util.id + + +class Upgrader (object): + "Class for converting between different on-disk BE storage formats." + initial_version = None + final_version = None + def __init__(self, repo): + import libbe.storage.vcs + + self.repo = repo + vcs_name = self._get_vcs_name() + if vcs_name == None: + vcs_name = 'None' + self.vcs = libbe.storage.vcs.vcs_by_name(vcs_name) + self.vcs.repo = self.repo + self.vcs.root() + + def get_path(self, *args): + """ + Return the absolute path using args relative to .be. + """ + dir = os.path.join(self.repo, '.be') + if len(args) == 0: + return dir + return os.path.join(dir, *args) + + def _get_vcs_name(self): + return None + + def check_initial_version(self): + path = self.get_path('version') + version = encoding.get_file_contents(path, decode=True).rstrip('\n') + assert version == self.initial_version, '%s: %s' % (path, version) + + def set_version(self): + path = self.get_path('version') + encoding.set_file_contents(path, self.final_version+'\n') + self.vcs._vcs_update(path) + + def upgrade(self): + print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \ + % (self.initial_version, self.final_version) + self.check_initial_version() + self.set_version() + self._upgrade() + + def _upgrade(self): + raise NotImplementedError + + +class Upgrade_1_0_to_1_1 (Upgrader): + initial_version = "Bugs Everywhere Tree 1 0" + final_version = "Bugs Everywhere Directory v1.1" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = encoding.get_file_contents(path) + for line in settings.splitlines(False): + fields = line.split('=') + if len(fields) == 2 and fields[0] == 'rcs_name': + return fields[1] + return None + + def _upgrade_mapfile(self, path): + contents = encoding.get_file_contents(path, decode=True) + old_format = False + for line in contents.splitlines(): + if len(line.split('=')) == 2: + old_format = True + break + if old_format == True: + # translate to YAML. + newlines = [] + for line in contents.splitlines(): + line = line.rstrip('\n') + if len(line) == 0: + continue + fields = line.split("=") + if len(fields) == 2: + key,value = fields + newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) + else: + newlines.append(line) + contents = '\n'.join(newlines) + # load the YAML and save + map = mapfile.parse(contents) + contents = mapfile.generate(map) + encoding.set_file_contents(path, contents) + self.vcs._vcs_update(path) + + def _upgrade(self): + """ + Comment value field "From" -> "Author". + Homegrown mapfile -> YAML. + """ + path = self.get_path('settings') + self._upgrade_mapfile(path) + for bug_uuid in os.listdir(self.get_path('bugs')): + path = self.get_path('bugs', bug_uuid, 'values') + self._upgrade_mapfile(path) + c_path = ['bugs', bug_uuid, 'comments'] + if not os.path.exists(self.get_path(*c_path)): + continue # no comments for this bug + for comment_uuid in os.listdir(self.get_path(*c_path)): + path_list = c_path + [comment_uuid, 'values'] + path = self.get_path(*path_list) + self._upgrade_mapfile(path) + settings = mapfile.parse( + encoding.get_file_contents(path)) + if 'From' in settings: + settings['Author'] = settings.pop('From') + encoding.set_file_contents( + path, mapfile.generate(settings)) + self.vcs._vcs_update(path) + + +class Upgrade_1_1_to_1_2 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.1" + final_version = "Bugs Everywhere Directory v1.2" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'rcs_name' in settings: + return settings['rcs_name'] + return None + + def _upgrade(self): + """ + BugDir settings field "rcs_name" -> "vcs_name". + """ + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'rcs_name' in settings: + settings['vcs_name'] = settings.pop('rcs_name') + encoding.set_file_contents(path, mapfile.generate(settings)) + self.vcs._vcs_update(path) + +class Upgrade_1_2_to_1_3 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.2" + final_version = "Bugs Everywhere Directory v1.3" + def __init__(self, *args, **kwargs): + Upgrader.__init__(self, *args, **kwargs) + self._targets = {} # key: target text,value: new target bug + + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'vcs_name' in settings: + return settings['vcs_name'] + return None + + def _save_bug_settings(self, bug): + # The target bugs don't have comments + path = self.get_path('bugs', bug.uuid, 'values') + if not os.path.exists(path): + self.vcs._add_path(path, directory=False) + path = self.get_path('bugs', bug.uuid, 'values') + mf = mapfile.generate(bug._get_saved_settings()) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _target_bug(self, target_text): + if target_text not in self._targets: + bug = libbe.bug.Bug(summary=target_text) + bug.severity = 'target' + self._targets[target_text] = bug + return self._targets[target_text] + + def _upgrade_bugdir_mapfile(self): + path = self.get_path('settings') + mf = encoding.get_file_contents(path) + if mf == libbe.util.InvalidObject: + return # settings file does not exist + settings = mapfile.parse(mf) + if 'target' in settings: + settings['target'] = self._target_bug(settings['target']).uuid + mf = mapfile.generate(settings) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _upgrade_bug_mapfile(self, bug_uuid): + import libbe.command.depend as dep + path = self.get_path('bugs', bug_uuid, 'values') + mf = encoding.get_file_contents(path) + if mf == libbe.util.InvalidObject: + return # settings file does not exist + settings = mapfile.parse(mf) + if 'target' in settings: + target_bug = self._target_bug(settings['target']) + + blocked_by_string = '%s%s' % (dep.BLOCKED_BY_TAG, bug_uuid) + dep._add_remove_extra_string(target_bug, blocked_by_string, add=True) + blocks_string = dep._generate_blocks_string(target_bug) + estrs = settings.get('extra_strings', []) + estrs.append(blocks_string) + settings['extra_strings'] = sorted(estrs) + + settings.pop('target') + mf = mapfile.generate(settings) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _upgrade(self): + """ + Bug value field "target" -> target bugs. + Bugdir value field "target" -> pointer to current target bug. + """ + for bug_uuid in os.listdir(self.get_path('bugs')): + self._upgrade_bug_mapfile(bug_uuid) + self._upgrade_bugdir_mapfile() + for bug in self._targets.values(): + self._save_bug_settings(bug) + +class Upgrade_1_3_to_1_4 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.3" + final_version = "Bugs Everywhere Directory v1.4" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'vcs_name' in settings: + return settings['vcs_name'] + return None + + def _upgrade(self): + """ + add new directory "./be/BUGDIR-UUID" + "./be/bugs" -> "./be/BUGDIR-UUID/bugs" + "./be/settings" -> "./be/BUGDIR-UUID/settings" + """ + self.repo = os.path.abspath(self.repo) + basenames = [p for p in os.listdir(self.get_path())] + if not 'bugs' in basenames and not 'settings' in basenames \ + and len([p for p in basenames if len(p)==36]) == 1: + return # the user has upgraded the directory. + basenames = [p for p in basenames if p in ['bugs','settings']] + uuid = libbe.util.id.uuid_gen() + add = [self.get_path(uuid)] + move = [(self.get_path(p), self.get_path(uuid, p)) for p in basenames] + msg = ['Upgrading BE directory version v1.3 to v1.4', + '', + "Because BE's VCS drivers don't support 'move',", + 'please make the following changes with your VCS', + 'and re-run BE. Note that you can choose a different', + 'bugdir UUID to preserve uniformity across branches', + 'of a distributed repository.' + '', + 'add', + ' ' + '\n '.join(add), + 'move', + ' ' + '\n '.join(['%s %s' % (a,b) for a,b in move]), + ] + self.vcs._cached_path_id.destroy() + raise Exception('Need user assistance\n%s' % '\n'.join(msg)) + + +upgraders = [Upgrade_1_0_to_1_1, + Upgrade_1_1_to_1_2, + Upgrade_1_2_to_1_3, + Upgrade_1_3_to_1_4] +upgrade_classes = {} +for upgrader in upgraders: + upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader + +def upgrade(path, current_version, + target_version=STORAGE_VERSION): + """ + Call the appropriate upgrade function to convert current_version + to target_version. If a direct conversion function does not exist, + use consecutive conversion functions. + """ + if current_version not in STORAGE_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % current_version + if target_version not in STORAGE_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % current_version + + if (current_version, target_version) in upgrade_classes: + # direct conversion + upgrade_class = upgrade_classes[(current_version, target_version)] + u = upgrade_class(path) + u.upgrade() + else: + # consecutive single-step conversion + i = STORAGE_VERSIONS.index(current_version) + while True: + version_a = STORAGE_VERSIONS[i] + version_b = STORAGE_VERSIONS[i+1] + try: + upgrade_class = upgrade_classes[(version_a, version_b)] + except KeyError: + raise NotImplementedError, \ + "Cannot convert version '%s' to '%s' yet." \ + % (version_a, version_b) + u = upgrade_class(path) + u.upgrade() + if version_b == target_version: + break + i += 1 diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py new file mode 100644 index 0000000..ddfb00a --- /dev/null +++ b/libbe/storage/vcs/__init__.py @@ -0,0 +1,10 @@ +# Copyright + +import base + +set_preferred_vcs = base.set_preferred_vcs +vcs_by_name = base.vcs_by_name +detect_vcs = base.detect_vcs +installed_vcs = base.installed_vcs + +__all__ = [set_preferred_vcs, vcs_by_name, detect_vcs, installed_vcs] diff --git a/libbe/storage/vcs/arch.py b/libbe/storage/vcs/arch.py new file mode 100644 index 0000000..f1b5b7b --- /dev/null +++ b/libbe/storage/vcs/arch.py @@ -0,0 +1,350 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Ben Finney <benf@cybersource.com.au> +# Gianluca Montecchi <gian@grys.it> +# James Rowe <jnrowe@ukfsn.org> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +GNU Arch (tla) backend. +""" + +import codecs +import os +import re +import shutil +import sys +import time + +import libbe +import libbe.ui.util.user +import libbe.storage.util.config +from libbe.util.id import uuid_gen +import base + +if libbe.TESTING == True: + import unittest + import doctest + + +class CantAddFile(Exception): + def __init__(self, file): + self.file = file + Exception.__init__(self, "Can't automatically add file %s" % file) + +DEFAULT_CLIENT = 'tla' + +client = libbe.storage.util.config.get_val( + 'arch_client', default=DEFAULT_CLIENT) + +def new(): + return Arch() + +class Arch(base.VCS): + name = 'arch' + client = client + _archive_name = None + _archive_dir = None + _tmp_archive = False + _project_name = None + _tmp_project = False + _arch_paramdir = os.path.expanduser('~/.arch-params') + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + self.interspersed_vcs_files = True + self.paranoid = False + + def _vcs_version(self): + status,output,error = self._u_invoke_client('--version') + return output + + def _vcs_detect(self, path): + """Detect whether a directory is revision-controlled using Arch""" + if self._u_search_parent_directories(path, '{arch}') != None : + libbe.storage.util.config.set_val('arch_client', client) + return True + return False + + def _vcs_init(self, path): + self._create_archive(path) + self._create_project(path) + self._add_project_code(path) + + def _create_archive(self, path): + """ + Create a temporary Arch archive in the directory PATH. This + archive will be removed by + destroy->_vcs_destroy->_remove_archive + """ + # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive + assert self._archive_name == None + id = self.get_user_id() + name, email = libbe.ui.util.user.parse_user_id(id) + if email == None: + email = '%s@example.com' % name + trailer = '%s-%s' % ('bugs-everywhere-auto', uuid_gen()[0:8]) + self._archive_name = '%s--%s' % (email, trailer) + self._archive_dir = '/tmp/%s' % trailer + self._tmp_archive = True + self._u_invoke_client('make-archive', self._archive_name, + self._archive_dir, cwd=path) + + def _invoke_client(self, *args, **kwargs): + """ + Invoke the client on our archive. + """ + assert self._archive_name != None + command = args[0] + if len(args) > 1: + tailargs = args[1:] + else: + tailargs = [] + arglist = [command, '-A', self._archive_name] + arglist.extend(tailargs) + args = tuple(arglist) + return self._u_invoke_client(*args, **kwargs) + + def _remove_archive(self): + assert self._tmp_archive == True + assert self._archive_dir != None + assert self._archive_name != None + os.remove(os.path.join(self._arch_paramdir, + '=locations', self._archive_name)) + shutil.rmtree(self._archive_dir) + self._tmp_archive = False + self._archive_dir = False + self._archive_name = False + + def _create_project(self, path): + """ + Create a temporary Arch project in the directory PATH. This + project will be removed by + destroy->_vcs_destroy->_remove_project + """ + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project + category = 'bugs-everywhere' + branch = 'mainline' + version = '0.1' + self._project_name = '%s--%s--%s' % (category, branch, version) + self._invoke_client('archive-setup', self._project_name, + cwd=path) + self._tmp_project = True + + def _remove_project(self): + assert self._tmp_project == True + assert self._project_name != None + assert self._archive_dir != None + shutil.rmtree(os.path.join(self._archive_dir, self._project_name)) + self._tmp_project = False + self._project_name = False + + def _archive_project_name(self): + assert self._archive_name != None + assert self._project_name != None + return '%s/%s' % (self._archive_name, self._project_name) + + def _adjust_naming_conventions(self, path): + """ + By default, Arch restricts source code filenames to + ^[_=a-zA-Z0-9].*$ + See + http://regexps.srparish.net/tutorial-tla/naming-conventions.html + Since our bug directory '.be' doesn't satisfy these conventions, + we need to adjust them. + + The conventions are specified in + project-root/{arch}/=tagging-method + """ + tagpath = os.path.join(path, '{arch}', '=tagging-method') + lines_out = [] + f = codecs.open(tagpath, 'r', self.encoding) + for line in f: + if line.startswith('source '): + lines_out.append('source ^[._=a-zA-X0-9].*$\n') + else: + lines_out.append(line) + f.close() + f = codecs.open(tagpath, 'w', self.encoding) + f.write(''.join(lines_out)) + f.close() + + def _add_project_code(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-source.html + # http://regexps.srparish.net/tutorial-tla/importing-first.html + self._invoke_client('init-tree', self._project_name, + cwd=path) + self._adjust_naming_conventions(path) + self._invoke_client('import', '--summary', 'Began versioning', + cwd=path) + + def _vcs_destroy(self): + if self._tmp_project == True: + self._remove_project() + if self._tmp_archive == True: + self._remove_archive() + vcs_dir = os.path.join(self.repo, '{arch}') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + self._archive_name = None + + def _vcs_root(self, path): + if not os.path.isdir(path): + dirname = os.path.dirname(path) + else: + dirname = path + status,output,error = self._u_invoke_client('tree-root', dirname) + root = output.rstrip('\n') + + self._get_archive_project_name(root) + + return root + + def _get_archive_name(self, root): + status,output,error = self._u_invoke_client('archives') + lines = output.split('\n') + # e.g. output: + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52 + # /tmp/BEtestXXXXXX/rootdir + # (+ repeats) + for archive,location in zip(lines[::2], lines[1::2]): + if os.path.realpath(location) == os.path.realpath(root): + self._archive_name = archive + assert self._archive_name != None + + def _get_archive_project_name(self, root): + # get project names + status,output,error = self._u_invoke_client('tree-version', cwd=root) + # e.g output + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1 + archive_name,project_name = output.rstrip('\n').split('/') + self._archive_name = archive_name + self._project_name = project_name + + def _vcs_get_user_id(self): + try: + status,output,error = self._u_invoke_client('my-id') + return output.rstrip('\n') + except Exception, e: + if 'no arch user id set' in e.args[0]: + return None + else: + raise + + def _vcs_add(self, path): + self._u_invoke_client('add-id', path) + realpath = os.path.realpath(self._u_abspath(path)) + pathAdded = realpath in self._list_added(self.repo) + if self.paranoid and not pathAdded: + self._force_source(path) + + def _list_added(self, root): + assert os.path.exists(root) + assert os.access(root, os.X_OK) + root = os.path.realpath(root) + status,output,error = self._u_invoke_client('inventory', '--source', + '--both', '--all', root) + inv_str = output.rstrip('\n') + return [os.path.join(root, p) for p in inv_str.split('\n')] + + def _add_dir_rule(self, rule, dirname, root): + inv_path = os.path.join(dirname, '.arch-inventory') + f = codecs.open(inv_path, 'a', self.encoding) + f.write(rule) + f.close() + if os.path.realpath(inv_path) not in self._list_added(root): + paranoid = self.paranoid + self.paranoid = False + self.add(inv_path) + self.paranoid = paranoid + + def _force_source(self, path): + rule = 'source %s\n' % self._u_rel_path(path) + self._add_dir_rule(rule, os.path.dirname(path), self.repo) + if os.path.realpath(path) not in self._list_added(self.repo): + raise CantAddFile(path) + + def _vcs_remove(self, path): + if self._vcs_is_versioned(path): + self._u_invoke_client('delete-id', path) + arch_ids = os.path.join(self.repo, path, '.arch-ids') + if os.path.exists(arch_ids): + shutil.rmtree(arch_ids) + + def _vcs_update(self, path): + pass + + def _vcs_is_versioned(self, path): + if '.arch-ids' in path: + return False + return True + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + else: + status,output,error = \ + self._invoke_client('file-find', path, revision) + relpath = output.rstrip('\n') + return base.VCS._vcs_get_file_contents(self, relpath) + + def _vcs_commit(self, commitfile, allow_empty=False): + if allow_empty == False: + # arch applies empty commits without complaining, so check first + status,output,error = self._u_invoke_client('changes',expect=(0,1)) + if status == 0: + raise base.EmptyCommit() + summary,body = self._u_parse_commitfile(commitfile) + args = ['commit', '--summary', summary] + if body != None: + args.extend(['--log-message',body]) + status,output,error = self._u_invoke_client(*args) + revision = None + revline = re.compile('[*] committed (.*)') + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revpath = match.groups()[0] + assert not " " in revpath, revpath + assert revpath.startswith(self._archive_project_name()+'--') + revision = revpath[len(self._archive_project_name()+'--'):] + return revpath + + def _vcs_revision_id(self, index): + status,output,error = self._u_invoke_client('logs') + logs = output.splitlines() + first_log = logs.pop(0) + assert first_log == 'base-0', first_log + try: + if index > 0: + log = logs[index-1] + elif index < 0: + log = logs[index] + else: + return None + except IndexError: + return None + return '%s--%s' % (self._archive_project_name(), log) + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Arch, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py new file mode 100644 index 0000000..99f43f3 --- /dev/null +++ b/libbe/storage/vcs/base.py @@ -0,0 +1,1106 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Alexander Belchenko <bialix@ukr.net> +# Ben Finney <benf@cybersource.com.au> +# Chris Ball <cjb@laptop.org> +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define the base VCS (Version Control System) class, which should be +subclassed by other Version Control System backends. The base class +implements a "do not version" VCS. +""" + +import codecs +import os +import os.path +import re +import shutil +import sys +import tempfile +import types + +import libbe +import libbe.storage +import libbe.storage.base +import libbe.util.encoding +from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID +from libbe.util.utility import Dir, search_parent_directories +from libbe.util.subproc import CommandError, invoke +from libbe.util.plugin import import_by_name +import libbe.storage.util.upgrade as upgrade + +if libbe.TESTING == True: + import unittest + import doctest + + import libbe.ui.util.user + +# List VCS modules in order of preference. +# Don't list this module, it is implicitly last. +VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg'] + +def set_preferred_vcs(name): + global VCS_ORDER + assert name in VCS_ORDER, \ + 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER) + VCS_ORDER.remove(name) + VCS_ORDER.insert(0, name) + +def _get_matching_vcs(matchfn): + """Return the first module for which matchfn(VCS_instance) is true""" + for submodname in VCS_ORDER: + module = import_by_name('libbe.storage.vcs.%s' % submodname) + vcs = module.new() + if matchfn(vcs) == True: + return vcs + return VCS() + +def vcs_by_name(vcs_name): + """Return the module for the VCS with the given name""" + if vcs_name == VCS.name: + return new() + return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) + +def detect_vcs(dir): + """Return an VCS instance for the vcs being used in this directory""" + return _get_matching_vcs(lambda vcs: vcs._detect(dir)) + +def installed_vcs(): + """Return an instance of an installed VCS""" + return _get_matching_vcs(lambda vcs: vcs.installed()) + + +class VCSNotRooted (libbe.storage.base.ConnectionError): + def __init__(self, vcs): + msg = 'VCS not rooted' + libbe.storage.base.ConnectionError.__init__(self, msg) + self.vcs = vcs + +class VCSUnableToRoot (libbe.storage.base.ConnectionError): + def __init__(self, vcs): + msg = 'VCS unable to root' + libbe.storage.base.ConnectionError.__init__(self, msg) + self.vcs = vcs + +class InvalidPath (InvalidID): + def __init__(self, path, root, msg=None, **kwargs): + if msg == None: + msg = 'Path "%s" not in root "%s"' % (path, root) + InvalidID.__init__(self, msg=msg, **kwargs) + self.path = path + self.root = root + +class SpacerCollision (InvalidPath): + def __init__(self, path, spacer): + msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer) + InvalidPath.__init__(self, path, root=None, msg=msg) + self.spacer = spacer + +class NoSuchFile (InvalidID): + def __init__(self, pathname, root='.'): + path = os.path.abspath(os.path.join(root, pathname)) + InvalidID.__init__(self, 'No such file: %s' % path) + + +class CachedPathID (object): + """ + Storage ID <-> path policy. + .../.be/BUGDIR/bugs/BUG/comments/COMMENT + ^-- root path + + >>> dir = Dir() + >>> os.mkdir(os.path.join(dir.path, '.be')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456')) + >>> file(os.path.join(dir.path, '.be', 'abc', 'values'), + ... 'w').close() + >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'), + ... 'w').close() + >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'), + ... 'w').close() + >>> c = CachedPathID() + >>> c.root(dir.path) + >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values')) + 'def/values' + >>> c.init() + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc', 'id-cache'] + >>> c.connect() + >>> c.path('123/values') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/values' + >>> c.disconnect() + >>> c.destroy() + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc'] + >>> c.connect() # demonstrate auto init + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc', 'id-cache'] + >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS + u'.../.be/xyz' + >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS + u'.../.be/xyz/def' + >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/comments/qrs' + >>> c.disconnect() + >>> c.connect() + >>> c.path('qrs') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/comments/qrs' + >>> c.remove_id('qrs') + >>> c.path('qrs') + Traceback (most recent call last): + ... + InvalidID: 'qrs' + >>> c.disconnect() + >>> c.destroy() + >>> dir.cleanup() + """ + def __init__(self, encoding=None): + self.encoding = libbe.util.encoding.get_filesystem_encoding() + self._spacer_dirs = ['.be', 'bugs', 'comments'] + + def root(self, path): + self._root = os.path.abspath(path).rstrip(os.path.sep) + self._cache_path = os.path.join( + self._root, self._spacer_dirs[0], 'id-cache') + + def init(self): + """ + Create cache file for an existing .be directory. + File if multiple lines of the form: + UUID\tPATH + """ + self._cache = {} + spaced_root = os.path.join(self._root, self._spacer_dirs[0]) + for dirpath, dirnames, filenames in os.walk(spaced_root): + if dirpath == spaced_root: + continue + try: + id = self.id(dirpath) + relpath = dirpath[len(self._root)+1:] + if id.count('/') == 0: + if id in self._cache: + print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath) + self._cache[id] = relpath + except InvalidPath: + pass + self._changed = True + self.disconnect() + + def destroy(self): + if os.path.exists(self._cache_path): + os.remove(self._cache_path) + + def connect(self): + if not os.path.exists(self._cache_path): + try: + self.init() + except IOError: + raise libbe.storage.base.ConnectionError + self._cache = {} # key: uuid, value: path + self._changed = False + f = codecs.open(self._cache_path, 'r', self.encoding) + for line in f: + fields = line.rstrip('\n').split('\t') + self._cache[fields[0]] = fields[1] + f.close() + + def disconnect(self): + if self._changed == True: + f = codecs.open(self._cache_path, 'w', self.encoding) + for uuid,path in self._cache.items(): + f.write('%s\t%s\n' % (uuid, path)) + f.close() + self._cache = {} + + def path(self, id, relpath=False): + fields = id.split('/', 1) + uuid = fields[0] + if len(fields) == 1: + extra = [] + else: + extra = fields[1:] + if uuid not in self._cache: + raise InvalidID(uuid) + if relpath == True: + return os.path.join(self._cache[uuid], *extra) + return os.path.join(self._root, self._cache[uuid], *extra) + + def add_id(self, id, parent=None): + if id.count('/') > 0: + # not a UUID-level path + assert id.startswith(parent), \ + 'Strange ID: "%s" should start with "%s"' % (id, parent) + path = self.path(id) + elif id in self._cache: + # already added + path = self.path(id) + else: + if parent == None: + parent_path = '' + spacer = self._spacer_dirs[0] + else: + assert parent.count('/') == 0, \ + 'Strange parent ID: "%s" should be UUID' % parent + parent_path = self.path(parent, relpath=True) + parent_spacer = parent_path.split(os.path.sep)[-2] + i = self._spacer_dirs.index(parent_spacer) + spacer = self._spacer_dirs[i+1] + path = os.path.join(parent_path, spacer, id) + self._cache[id] = path + self._changed = True + path = os.path.join(self._root, path) + return path + + def remove_id(self, id): + if id.count('/') > 0: + return # not a UUID-level path + self._cache.pop(id) + self._changed = True + + def id(self, path): + path = os.path.join(self._root, path) + if not path.startswith(self._root + os.path.sep): + raise InvalidPath(path, self._root) + path = path[len(self._root)+1:] + orig_path = path + if not path.startswith(self._spacer_dirs[0] + os.path.sep): + raise InvalidPath(path, self._spacer_dirs[0]) + for spacer in self._spacer_dirs: + if not path.startswith(spacer + os.path.sep): + break + id = path[len(spacer)+1:] + fields = path[len(spacer)+1:].split(os.path.sep,1) + if len(fields) == 1: + break + path = fields[1] + for spacer in self._spacer_dirs: + if id.endswith(os.path.sep + spacer): + raise SpacerCollision(orig_path, spacer) + if os.path.sep != '/': + id = id.replace(os.path.sep, '/') + return id + + +def new(): + return VCS() + +class VCS (libbe.storage.base.VersionedStorage): + """ + This class implements a 'no-vcs' interface. + + Support for other VCSs can be added by subclassing this class, and + overriding methods _vcs_*() with code appropriate for your VCS. + + The methods _u_*() are utility methods available to the _vcs_*() + methods. + + Sink to existing root + ====================== + + Consider the following usage case: + You have a bug directory rooted in + /path/to/source + by which I mean the '.be' directory is at + /path/to/source/.be + However, you're of in some subdirectory like + /path/to/source/GUI/testing + and you want to comment on a bug. Setting sink_to_root=True when + you initialize your BugDir will cause it to search for the '.be' + file in the ancestors of the path you passed in as 'root'. + /path/to/source/GUI/testing/.be miss + /path/to/source/GUI/.be miss + /path/to/source/.be hit! + So it still roots itself appropriately without much work for you. + + File-system access + ================== + + BugDirs live completely in memory when .sync_with_disk is False. + This is the default configuration setup by BugDir(from_disk=False). + If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then + any changes to the BugDir will be immediately written to disk. + + If you want to change .sync_with_disk, we suggest you use + .set_sync_with_disk(), which propogates the new setting through to + all bugs/comments/etc. that have been loaded into memory. If + you've been living in memory and want to move to + .sync_with_disk==True, but you're not sure if anything has been + changed in memory, a call to .save() immediately before the + .set_sync_with_disk(True) call is a safe move. + + Regardless of .sync_with_disk, a call to .save() will write out + all the contents that the BugDir instance has loaded into memory. + If sync_with_disk has been True over the course of all interesting + changes, this .save() call will be a waste of time. + + The BugDir will only load information from the file system when it + loads new settings/bugs/comments that it doesn't already have in + memory and .sync_with_disk == True. + + Allow storage initialization + ======================== + + This one is for testing purposes. Setting it to True allows the + BugDir to search for an installed Storage backend and initialize + it in the root directory. This is a convenience option for + supporting tests of versioning functionality + (e.g. .duplicate_bugdir). + + Disable encoding manipulation + ============================= + + This one is for testing purposed. You might have non-ASCII + Unicode in your bugs, comments, files, etc. BugDir instances try + and support your preferred encoding scheme (e.g. "utf-8") when + dealing with stream and file input/output. For stream output, + this involves replacing sys.stdout and sys.stderr + (libbe.encode.set_IO_stream_encodings). However this messes up + doctest's output catching. In order to support doctest tests + using BugDirs, set manipulate_encodings=False, and stick to ASCII + in your tests. + + if root == None: + root = os.getcwd() + if sink_to_existing_root == True: + self.root = self._find_root(root) + else: + if not os.path.exists(root): + self.root = None + raise NoRootEntry(root) + self.root = root + # get a temporary storage until we've loaded settings + self.sync_with_disk = False + self.storage = self._guess_storage() + + if assert_new_BugDir == True: + if os.path.exists(self.get_path()): + raise AlreadyInitialized, self.get_path() + if storage == None: + storage = self._guess_storage(allow_storage_init) + self.storage = storage + self._setup_user_id(self.user_id) + + + # methods for getting the BugDir situated in the filesystem + + def _find_root(self, path): + ''' + Search for an existing bug database dir and it's ancestors and + return a BugDir rooted there. Only called by __init__, and + then only if sink_to_existing_root == True. + ''' + if not os.path.exists(path): + self.root = None + raise NoRootEntry(path) + versionfile=utility.search_parent_directories(path, + os.path.join(".be", "version")) + if versionfile != None: + beroot = os.path.dirname(versionfile) + root = os.path.dirname(beroot) + return root + else: + beroot = utility.search_parent_directories(path, ".be") + if beroot == None: + self.root = None + raise NoBugDir(path) + return beroot + + def _guess_storage(self, allow_storage_init=False): + ''' + Only called by __init__. + ''' + deepdir = self.get_path() + if not os.path.exists(deepdir): + deepdir = os.path.dirname(deepdir) + new_storage = storage.detect_storage(deepdir) + install = False + if new_storage.name == "None": + if allow_storage_init == True: + new_storage = storage.installed_storage() + new_storage.init(self.root) + return new_storage + +os.listdir(self.get_path("bugs")): + """ + name = 'None' + client = 'false' # command-line tool for _u_invoke_client + + def __init__(self, *args, **kwargs): + if 'encoding' not in kwargs: + kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding() + libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs) + self.versioned = False + self.interspersed_vcs_files = False + self.verbose_invoke = False + self._cached_path_id = CachedPathID() + self._rooted = False + + def _vcs_version(self): + """ + Return the VCS version string. + """ + return '0' + + def _vcs_get_user_id(self): + """ + Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the VCS has not been configured with a username, return None. + """ + return None + + def _vcs_detect(self, path=None): + """ + Detect whether a directory is revision controlled with this VCS. + """ + return True + + def _vcs_root(self, path): + """ + Get the VCS root. This is the default working directory for + future invocations. You would normally set this to the root + directory for your VCS. + """ + if os.path.isdir(path) == False: + path = os.path.dirname(path) + if path == '': + path = os.path.abspath('.') + return path + + def _vcs_init(self, path): + """ + Begin versioning the tree based at path. + """ + pass + + def _vcs_destroy(self): + """ + Remove any files used in versioning (e.g. whatever _vcs_init() + created). + """ + pass + + def _vcs_add(self, path): + """ + Add the already created file at path to version control. + """ + pass + + def _vcs_remove(self, path): + """ + Remove the file at path from version control. Optionally + remove the file from the filesystem as well. + """ + pass + + def _vcs_update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + pass + + def _vcs_is_versioned(self, path): + """ + Return true if a path is under version control, False + otherwise. You only need to set this if the VCS goes about + dumping VCS-specific files into the .be directory. + + If you do need to implement this method (e.g. Arch), set + self.interspersed_vcs_files = True + """ + assert self.interspersed_vcs_files == False + raise NotImplementedError + + def _vcs_get_file_contents(self, path, revision=None): + """ + Get the file contents as they were in a given revision. + Revision==None specifies the current revision. + """ + if revision != None: + raise libbe.storage.base.InvalidRevision( + 'The %s VCS does not support revision specifiers' % self.name) + path = os.path.join(self.repo, path) + if not os.path.exists(path): + return libbe.util.InvalidObject + if os.path.isdir(path): + return libbe.storage.base.InvalidDirectory + f = open(path, 'rb') + contents = f.read() + f.close() + return contents + + def _vcs_path(self, id, revision): + """ + Return the path to object id as of revision. + + Revision will not be None. + """ + raise NotImplementedError + + def _vcs_isdir(self, path, revision): + """ + Return True if path (as returned by _vcs_path) was a directory + as of revision, False otherwise. + + Revision will not be None. + """ + raise NotImplementedError + + def _vcs_listdir(self, path, revision): + """ + Return a list of the contents of the directory path (as + returned by _vcs_path) as of revision. + + Revision will not be None, and ._vcs_isdir(path, revision) + will be True. + """ + raise NotImplementedError + + def _vcs_commit(self, commitfile, allow_empty=False): + """ + Commit the current working directory, using the contents of + commitfile as the comment. Return the name of the old + revision (or None if commits are not supported). + + If allow_empty == False, raise EmptyCommit if there are no + changes to commit. + """ + return None + + def _vcs_revision_id(self, index): + """ + Return the name of the <index>th revision. Index will be an + integer (possibly <= 0). The choice of which branch to follow + when crossing branches/merges is not defined. + + Return None if revision IDs are not supported, or if the + specified revision does not exist. + """ + return None + + def version(self): + # Cache version string for efficiency. + if not hasattr(self, '_version'): + self._version = self._get_version() + return self._version + + def _get_version(self): + try: + ret = self._vcs_version() + return ret + except OSError, e: + if e.errno == errno.ENOENT: + return None + else: + raise OSError, e + except CommandError: + return None + + def installed(self): + if self.version() != None: + return True + return False + + def get_user_id(self): + """ + Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the VCS has not been configured with a username, return None. + You can override the automatic lookup procedure by setting the + VCS.user_id attribute to a string of your choice. + """ + if not hasattr(self, 'user_id'): + self.user_id = self._vcs_get_user_id() + return self.user_id + + def _detect(self, path='.'): + """ + Detect whether a directory is revision controlled with this VCS. + """ + return self._vcs_detect(path) + + def root(self): + """ + Set the root directory to the path's VCS root. This is the + default working directory for future invocations. + """ + if self._detect(self.repo) == False: + raise VCSUnableToRoot(self) + root = self._vcs_root(self.repo) + self.repo = os.path.abspath(root) + if os.path.isdir(self.repo) == False: + self.repo = os.path.dirname(self.repo) + self.be_dir = os.path.join( + self.repo, self._cached_path_id._spacer_dirs[0]) + self._cached_path_id.root(self.repo) + self._rooted = True + + def _init(self): + """ + Begin versioning the tree based at self.repo. + Also roots the vcs at path. + """ + if not os.path.exists(self.repo) or not os.path.isdir(self.repo): + raise VCSUnableToRoot(self) + if self._vcs_detect(self.repo) == False: + self._vcs_init(self.repo) + if self._rooted == False: + self.root() + os.mkdir(self.be_dir) + self._vcs_add(self._u_rel_path(self.be_dir)) + self._setup_storage_version() + self._cached_path_id.init() + + def _destroy(self): + self._vcs_destroy() + self._cached_path_id.destroy() + if os.path.exists(self.be_dir): + shutil.rmtree(self.be_dir) + + def _connect(self): + if self._rooted == False: + self.root() + if not os.path.isdir(self.be_dir): + raise libbe.storage.base.ConnectionError(self) + self._cached_path_id.connect() + self.check_storage_version() + + def _disconnect(self): + self._cached_path_id.disconnect() + + def _add_path(self, path, directory=False): + relpath = self._u_rel_path(path) + reldirs = relpath.split(os.path.sep) + if directory == False: + reldirs = reldirs[:-1] + dir = self.repo + for reldir in reldirs: + dir = os.path.join(dir, reldir) + if not os.path.exists(dir): + os.mkdir(dir) + self._vcs_add(self._u_rel_path(dir)) + elif not os.path.isdir(dir): + raise libbe.storage.base.InvalidDirectory + if directory == False: + if not os.path.exists(path): + open(path, 'w').close() + self._vcs_add(self._u_rel_path(path)) + + def _add(self, id, parent=None, **kwargs): + path = self._cached_path_id.add_id(id, parent) + self._add_path(path, **kwargs) + + def _remove(self, id): + path = self._cached_path_id.path(id) + if os.path.exists(path): + if os.path.isdir(path) and len(self.children(id)) > 0: + raise libbe.storage.base.DirectoryNotEmpty(id) + self._vcs_remove(self._u_rel_path(path)) + if os.path.exists(path): + if os.path.isdir(path): + os.rmdir(path) + else: + os.remove(path) + self._cached_path_id.remove_id(id) + + def _recursive_remove(self, id): + path = self._cached_path_id.path(id) + for dirpath,dirnames,filenames in os.walk(path, topdown=False): + filenames.extend(dirnames) + for f in filenames: + fullpath = os.path.join(dirpath, f) + if os.path.exists(fullpath) == False: + continue + self._vcs_remove(self._u_rel_path(fullpath)) + if os.path.exists(path): + shutil.rmtree(path) + path = self._cached_path_id.path(id, relpath=True) + for id,p in self._cached_path_id._cache.items(): + if p.startswith(path): + self._cached_path_id.remove_id(id) + + def _children(self, id=None, revision=None): + if revision == None: + id_to_path = self._cached_path_id.path + isdir = os.path.isdir + listdir = os.listdir + else: + id_to_path = lambda id : self._vcs_path(id, revision) + isdir = lambda path : self._vcs_isdir(path, revision) + listdir = lambda path : self._vcs_listdir(path, revision) + if id==None: + path = self.be_dir + else: + path = id_to_path(id) + if isdir(path) == False: + return [] + children = listdir(path) + for i,c in enumerate(children): + if c in self._cached_path_id._spacer_dirs: + children[i] = None + children.extend([os.path.join(c, c2) for c2 in + listdir(os.path.join(path, c))]) + elif c in ['id-cache', 'version']: + children[i] = None + elif self.interspersed_vcs_files \ + and self._vcs_is_versioned(c) == False: + children[i] = None + for i,c in enumerate(children): + if c == None: continue + cpath = os.path.join(path, c) + if self.interspersed_vcs_files == True \ + and revision != None \ + and self._vcs_is_versioned(cpath) == False: + children[i] = None + else: + children[i] = self._u_path_to_id(cpath) + children[i] + return [c for c in children if c != None] + + def _get(self, id, default=libbe.util.InvalidObject, revision=None): + try: + path = self._cached_path_id.path(id) + except InvalidID, e: + if default == libbe.util.InvalidObject: + raise e + return default + relpath = self._u_rel_path(path) + try: + contents = self._vcs_get_file_contents(relpath, revision) + except InvalidID, e: + if InvalidID == None: + e.id = InvalidID + raise + if contents in [libbe.storage.base.InvalidDirectory, + libbe.util.InvalidObject]: + raise InvalidID(id) + elif len(contents) == 0: + return None + return contents + + def _set(self, id, value): + try: + path = self._cached_path_id.path(id) + except InvalidID, e: + raise e + if not os.path.exists(path): + raise InvalidID(id) + if os.path.isdir(path): + raise libbe.storage.base.InvalidDirectory(id) + f = open(path, "wb") + f.write(value) + f.close() + self._vcs_update(self._u_rel_path(path)) + + def _commit(self, summary, body=None, allow_empty=False): + summary = summary.strip()+'\n' + if body is not None: + summary += '\n' + body.strip() + '\n' + descriptor, filename = tempfile.mkstemp() + revision = None + try: + temp_file = os.fdopen(descriptor, 'wb') + temp_file.write(summary) + temp_file.flush() + revision = self._vcs_commit(filename, allow_empty=allow_empty) + temp_file.close() + finally: + os.remove(filename) + return revision + + def revision_id(self, index=None): + if index == None: + return None + try: + if int(index) != index: + raise InvalidRevision(index) + except ValueError: + raise InvalidRevision(index) + revid = self._vcs_revision_id(index) + if revid == None: + raise libbe.storage.base.InvalidRevision(index) + return revid + + def _u_any_in_string(self, list, string): + """ + Return True if any of the strings in list are in string. + Otherwise return False. + """ + for list_string in list: + if list_string in string: + return True + return False + + def _u_invoke(self, *args, **kwargs): + if 'cwd' not in kwargs: + kwargs['cwd'] = self.repo + if 'verbose' not in kwargs: + kwargs['verbose'] = self.verbose_invoke + if 'encoding' not in kwargs: + kwargs['encoding'] = self.encoding + return invoke(*args, **kwargs) + + def _u_invoke_client(self, *args, **kwargs): + cl_args = [self.client] + cl_args.extend(args) + return self._u_invoke(cl_args, **kwargs) + + def _u_search_parent_directories(self, path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + return search_parent_directories(path, filename) + + def _u_find_id(self, id, revision): + """ + Search for the relative path to id as of revision. + Returns None if the id is not found. + """ + assert self._rooted == True + be_dir = self._cached_path_id._spacer_dirs[0] + stack = [(be_dir, be_dir)] + while len(stack) > 0: + path,long_id = stack.pop() + if long_id.endswith('/'+id): + return path + if self._vcs_isdir(path, revision) == False: + continue + for child in self._vcs_listdir(path, revision): + stack.append((os.path.join(path, child), + '/'.join([long_id, child]))) + raise InvalidID(id, revision=revision) + + def _u_path_to_id(self, path): + return self._cached_path_id.id(path) + + def _u_rel_path(self, path, root=None): + """ + Return the relative path to path from root. + >>> vcs = new() + >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c") + '.be' + >>> vcs._u_rel_path("/a.b/c/", "/a.b/c") + '.' + >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/") + '.' + >>> vcs._u_rel_path("./a", ".") + 'a' + """ + if root == None: + if self.repo == None: + raise VCSNotRooted(self) + root = self.repo + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if path in [absRoot, absRootSlashedDir]: + return '.' + if not path.startswith(absRootSlashedDir): + raise InvalidPath(path, absRootSlashedDir) + relpath = path[len(absRootSlashedDir):] + return relpath + + def _u_abspath(self, path, root=None): + """ + Return the absolute path from a path realtive to root. + >>> vcs = new() + >>> vcs._u_abspath(".be", "/a.b/c") + '/a.b/c/.be' + """ + if root == None: + assert self.repo != None, "VCS not rooted" + root = self.repo + return os.path.abspath(os.path.join(root, path)) + + def _u_parse_commitfile(self, commitfile): + """ + Split the commitfile created in self.commit() back into + summary and header lines. + """ + f = codecs.open(commitfile, 'r', self.encoding) + summary = f.readline() + body = f.read() + body.lstrip('\n') + if len(body) == 0: + body = None + f.close() + return (summary, body) + + def check_storage_version(self): + version = self.storage_version() + if version != libbe.storage.STORAGE_VERSION: + upgrade.upgrade(self.repo, version) + + def storage_version(self, revision=None, path=None): + """ + Requires disk access. + """ + if path == None: + path = os.path.join(self.repo, '.be', 'version') + if not os.path.exists(path): + raise libbe.storage.InvalidStorageVersion(None) + if revision == None: # don't require connection + return libbe.util.encoding.get_file_contents( + path, decode=True).rstrip('\n') + contents = self._vcs_get_file_contents(path, revision=revision) + if type(contents) != types.UnicodeType: + contents = unicode(contents, self.encoding) + return contents.strip() + + def _setup_storage_version(self): + """ + Requires disk access. + """ + assert self._rooted == True + path = os.path.join(self.be_dir, 'version') + if not os.path.exists(path): + libbe.util.encoding.set_file_contents(path, + libbe.storage.STORAGE_VERSION+'\n') + self._vcs_add(self._u_rel_path(path)) + + +if libbe.TESTING == True: + class VCSTestCase (unittest.TestCase): + """ + Test cases for base VCS class (in addition to the Storage test + cases). + """ + + Class = VCS + + def __init__(self, *args, **kwargs): + super(VCSTestCase, self).__init__(*args, **kwargs) + self.dirname = None + + def setUp(self): + """Set up test fixtures for Storage test case.""" + super(VCSTestCase, self).setUp() + self.dir = Dir() + self.dirname = self.dir.path + self.s = self.Class(repo=self.dirname) + if self.s.installed() == True: + self.s.init() + self.s.connect() + + def tearDown(self): + super(VCSTestCase, self).tearDown() + if self.s.installed() == True: + self.s.disconnect() + self.s.destroy() + self.dir.cleanup() + + class VCS_installed_TestCase (VCSTestCase): + def test_installed(self): + """ + See if the VCS is installed. + """ + self.failUnless(self.s.installed() == True, + '%(name)s VCS not found' % vars(self.Class)) + + + class VCS_detection_TestCase (VCSTestCase): + def test_detection(self): + """ + See if the VCS detects its installed repository + """ + if self.s.installed(): + self.s.disconnect() + self.failUnless(self.s._detect(self.dirname) == True, + 'Did not detected %(name)s VCS after initialising' + % vars(self.Class)) + self.s.connect() + + def test_no_detection(self): + """ + See if the VCS detects its installed repository + """ + if self.s.installed() and self.Class.name != 'None': + self.s.disconnect() + self.s.destroy() + self.failUnless(self.s._detect(self.dirname) == False, + 'Detected %(name)s VCS before initialising' + % vars(self.Class)) + self.s.init() + self.s.connect() + + def test_vcs_repo_in_specified_root_path(self): + """VCS root directory should be in specified root path.""" + rp = os.path.realpath(self.s.repo) + dp = os.path.realpath(self.dirname) + vcs_name = self.Class.name + self.failUnless( + dp == rp or rp == None, + "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars()) + + class VCS_get_user_id_TestCase(VCSTestCase): + """Test cases for VCS.get_user_id method.""" + + def test_gets_existing_user_id(self): + """Should get the existing user ID.""" + if self.s.installed(): + user_id = self.s.get_user_id() + if user_id == None: + return + name,email = libbe.ui.util.user.parse_user_id(user_id) + if email != None: + self.failUnless('@' in email, email) + + def make_vcs_testcase_subclasses(vcs_class, namespace): + c = vcs_class() + if c.installed(): + if c.versioned == True: + libbe.storage.base.make_versioned_storage_testcase_subclasses( + vcs_class, namespace) + else: + libbe.storage.base.make_storage_testcase_subclasses( + vcs_class, namespace) + + if namespace != sys.modules[__name__]: + # Make VCSTestCase subclasses for vcs_class in the namespace. + vcs_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, VCSTestCase) \ + and c.Class == VCS] + + for base_class in vcs_testcase_classes: + testcase_class_name = vcs_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = vcs_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + make_vcs_testcase_subclasses(VCS, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py new file mode 100644 index 0000000..7335861 --- /dev/null +++ b/libbe/storage/vcs/bzr.py @@ -0,0 +1,196 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Ben Finney <benf@cybersource.com.au> +# Gianluca Montecchi <gian@grys.it> +# Marien Zwart <marienz@gentoo.org> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Bazaar (bzr) backend. +""" + +try: + import bzrlib + import bzrlib.branch + import bzrlib.builtins + import bzrlib.config + import bzrlib.errors + import bzrlib.option +except ImportError: + bzrlib = None +import os +import os.path +import re +import shutil +import StringIO + +import libbe +import base + +if libbe.TESTING == True: + import doctest + import sys + import unittest + + +def new(): + return Bzr() + +class Bzr(base.VCS): + name = 'bzr' + client = None # bzrlib module + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + + def _vcs_version(self): + if bzrlib == None: + return None + return bzrlib.__version__ + + def _vcs_get_user_id(self): + # excerpted from bzrlib.builtins.cmd_whoami.run() + try: + c = bzrlib.branch.Branch.open_containing(self.repo)[0].get_config() + except errors.NotBranchError: + c = bzrlib.config.GlobalConfig() + return c.username() + + def _vcs_detect(self, path): + if self._u_search_parent_directories(path, '.bzr') != None : + return True + return False + + def _vcs_root(self, path): + """Find the root of the deepest repository containing path.""" + cmd = bzrlib.builtins.cmd_root() + cmd.outf = StringIO.StringIO() + cmd.run(filename=path) + return cmd.outf.getvalue().rstrip('\n') + + def _vcs_init(self, path): + cmd = bzrlib.builtins.cmd_init() + cmd.outf = StringIO.StringIO() + cmd.run(location=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '.bzr') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + path = os.path.join(self.repo, path) + cmd = bzrlib.builtins.cmd_add() + cmd.outf = StringIO.StringIO() + cmd.run(file_list=[path], file_ids_from=self.repo) + + def _vcs_remove(self, path): + # --force to also remove unversioned files. + path = os.path.join(self.repo, path) + cmd = bzrlib.builtins.cmd_remove() + cmd.outf = StringIO.StringIO() + cmd.run(file_list=[path], file_deletion_strategy='force') + + def _vcs_update(self, path): + pass + + def _parse_revision_string(self, revision=None): + if revision == None: + return revision + rev_opt = bzrlib.option.Option.OPTIONS['revision'] + try: + rev_spec = rev_opt.type(revision) + except bzrlib.errors.NoSuchRevisionSpec: + raise base.InvalidRevision(revision) + return rev_spec + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + path = os.path.join(self.repo, path) + revision = self._parse_revision_string(revision) + cmd = bzrlib.builtins.cmd_cat() + cmd.outf = StringIO.StringIO() + try: + cmd.run(filename=path, revision=revision) + except bzrlib.errors.BzrCommandError, e: + if 'not present in revision' in str(e): + raise base.InvalidPath(path, root=self.repo, revision=revision) + raise + return cmd.outf.getvalue() + + def _vcs_path(self, id, revision): + return self._u_find_id(id, revision) + + def _vcs_isdir(self, path, revision): + try: + self._vcs_listdir(path, revision) + except AttributeError, e: + if 'children' in str(e): + return False + raise + return True + + def _vcs_listdir(self, path, revision): + path = os.path.join(self.repo, path) + revision = self._parse_revision_string(revision) + cmd = bzrlib.builtins.cmd_ls() + cmd.outf = StringIO.StringIO() + try: + cmd.run(revision=revision, path=path) + except bzrlib.errors.BzrCommandError, e: + if 'not present in revision' in str(e): + raise base.InvalidPath(path, root=self.repo, revision=revision) + raise + children = cmd.outf.getvalue().rstrip('\n').splitlines() + children = [self._u_rel_path(c, path) for c in children] + return children + + def _vcs_commit(self, commitfile, allow_empty=False): + cmd = bzrlib.builtins.cmd_commit() + cmd.outf = StringIO.StringIO() + cwd = os.getcwd() + os.chdir(self.repo) + try: + cmd.run(file=commitfile, unchanged=allow_empty) + except bzrlib.errors.BzrCommandError, e: + strings = ['no changes to commit.', # bzr 1.3.1 + 'No changes to commit.'] # bzr 1.15.1 + if self._u_any_in_string(strings, str(e)) == True: + raise base.EmptyCommit() + raise + finally: + os.chdir(cwd) + return self._vcs_revision_id(-1) + + def _vcs_revision_id(self, index): + cmd = bzrlib.builtins.cmd_revno() + cmd.outf = StringIO.StringIO() + cmd.run(location=self.repo) + current_revision = int(cmd.outf.getvalue()) + if index > current_revision or index < -current_revision: + return None + if index >= 0: + return str(index) # bzr commit 0 is the empty tree. + return str(current_revision+index+1) + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py new file mode 100644 index 0000000..6d47ea5 --- /dev/null +++ b/libbe/storage/vcs/darcs.py @@ -0,0 +1,288 @@ +# Copyright (C) 2009 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Darcs backend. +""" + +import codecs +import os +import re +import shutil +import sys +import time # work around http://mercurial.selenic.com/bts/issue618 +try: # import core module, Python >= 2.5 + from xml.etree import ElementTree +except ImportError: # look for non-core module + from elementtree import ElementTree +from xml.sax.saxutils import unescape + +import libbe +import base + +if libbe.TESTING == True: + import doctest + import unittest + + +def new(): + return Darcs() + +class Darcs(base.VCS): + name='darcs' + client='darcs' + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_version(self): + status,output,error = self._u_invoke_client('--version') + return output.rstrip('\n') + + def version_cmp(self, *args): + """ + Compare the installed darcs version V_i with another version + V_o (given in *args). Returns + 1 if V_i > V_o, + 0 if V_i == V_o, and + -1 if V_i < V_o + >>> d = Darcs(repo='.') + >>> d._vcs_version = lambda : "2.3.1 (release)" + >>> d.version_cmp(2,3,1) + 0 + >>> d.version_cmp(2,3,2) + -1 + >>> d.version_cmp(2,3,0) + 1 + >>> d.version_cmp(3) + -1 + >>> d._vcs_version = lambda : "2.0.0pre2" + >>> d._parsed_version = None + >>> d.version_cmp(3) + Traceback (most recent call last): + ... + NotImplementedError: Cannot parse "2.0.0pre2" portion of Darcs version "2.0.0pre2" + invalid literal for int() with base 10: '0pre2' + """ + if not hasattr(self, '_parsed_version') \ + or self._parsed_version == None: + num_part = self._vcs_version().split(' ')[0] + try: + self._parsed_version = [int(i) for i in num_part.split('.')] + except ValueError, e: + raise NotImplementedError( + 'Cannot parse "%s" portion of Darcs version "%s"\n %s' + % (num_part, self._vcs_version(), str(e))) + cmps = [cmp(a,b) for a,b in zip(self._parsed_version, args)] + for c in cmps: + if c != 0: + return c + return 0 + + def _vcs_get_user_id(self): + # following http://darcs.net/manual/node4.html#SECTION00410030000000000000 + # as of June 29th, 2009 + if self.repo == None: + return None + darcs_dir = os.path.join(self.repo, '_darcs') + if darcs_dir != None: + for pref_file in ['author', 'email']: + pref_path = os.path.join(darcs_dir, 'prefs', pref_file) + if os.path.exists(pref_path): + return self.get_file_contents(pref_path) + for env_variable in ['DARCS_EMAIL', 'EMAIL']: + if env_variable in os.environ: + return os.environ[env_variable] + return None + + def _vcs_detect(self, path): + if self._u_search_parent_directories(path, "_darcs") != None : + return True + return False + + def _vcs_root(self, path): + """Find the root of the deepest repository containing path.""" + # Assume that nothing funny is going on; in particular, that we aren't + # dealing with a bare repo. + if os.path.isdir(path) != True: + path = os.path.dirname(path) + darcs_dir = self._u_search_parent_directories(path, '_darcs') + if darcs_dir == None: + return None + return os.path.dirname(darcs_dir) + + def _vcs_init(self, path): + self._u_invoke_client('init', cwd=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '_darcs') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + if os.path.isdir(path): + return + self._u_invoke_client('add', path) + + def _vcs_remove(self, path): + if not os.path.isdir(self._u_abspath(path)): + os.remove(os.path.join(self.repo, path)) # darcs notices removal + + def _vcs_update(self, path): + self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618 + pass # darcs notices changes + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + if self.version_cmp(2, 0, 0) == 1: + status,output,error = self._u_invoke_client( \ + 'show', 'contents', '--patch', revision, path) + return output + # Darcs versions < 2.0.0pre2 lack the 'show contents' command + + status,output,error = self._u_invoke_client( \ + 'diff', '--unified', '--from-patch', revision, path, + unicode_output=False) + major_patch = output + status,output,error = self._u_invoke_client( \ + 'diff', '--unified', '--patch', revision, path, + unicode_output=False) + target_patch = output + + # '--output -' to be supported in GNU patch > 2.5.9 + # but that hasn't been released as of June 30th, 2009. + + # Rewrite path to status before the patch we want + args=['patch', '--reverse', path] + status,output,error = self._u_invoke(args, stdin=major_patch) + # Now apply the patch we want + args=['patch', path] + status,output,error = self._u_invoke(args, stdin=target_patch) + + if os.path.exists(os.path.join(self.repo, path)) == True: + contents = base.VCS._vcs_get_file_contents(self, path) + else: + contents = '' + + # Now restore path to it's current incarnation + args=['patch', '--reverse', path] + status,output,error = self._u_invoke(args, stdin=target_patch) + args=['patch', path] + status,output,error = self._u_invoke(args, stdin=major_patch) + current_contents = base.VCS._vcs_get_file_contents(self, path) + return contents + + def _vcs_path(self, id, revision): + return self._u_find_id(id, revision) + + def _vcs_isdir(self, path, revision): + if self.version_cmp(2, 3, 1) == 1: + # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com + # * add versioned show files functionality (darcs show files -p 'some patch') + status,output,error = self._u_invoke_client( \ + 'show', 'files', '--no-files', '--patch', revision) + children = output.rstrip('\n').splitlines() + rpath = '.' + children = [self._u_rel_path(c, rpath) for c in children] + if path in children: + return True + return False + # Darcs versions <= 2.3.1 lack the --patch option for 'show files' + raise NotImplementedError + + def _vcs_listdir(self, path, revision): + if self.version_cmp(2, 3, 1) == 1: + # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com + # * add versioned show files functionality (darcs show files -p 'some patch') + # Wed Dec 9 05:42:21 EST 2009 Luca Molteni <volothamp@gmail.com> + # * resolve issue835 show file with file directory arguments + path = path.rstrip(os.path.sep) + status,output,error = self._u_invoke_client( \ + 'show', 'files', '--patch', revision, path) + files = output.rstrip('\n').splitlines() + if path == '.': + descendents = [self._u_rel_path(f, path) for f in files + if f != '.'] + else: + descendents = [self._u_rel_path(f, path) for f in files + if f.startswith(path)] + return [f for f in descendents if f.count(os.path.sep) == 0] + # Darcs versions <= 2.3.1 lack the --patch option for 'show files' + raise NotImplementedError + + def _vcs_commit(self, commitfile, allow_empty=False): + id = self.get_user_id() + if id == None or '@' not in id: + id = '%s <%s@invalid.com>' % (id, id) + args = ['record', '--all', '--author', id, '--logfile', commitfile] + status,output,error = self._u_invoke_client(*args) + empty_strings = ['No changes!'] + # work around http://mercurial.selenic.com/bts/issue618 + if self._u_any_in_string(empty_strings, output) == True \ + and len(self.__updated) > 0: + time.sleep(1) + for path in self.__updated: + os.utime(os.path.join(self.repo, path), None) + status,output,error = self._u_invoke_client(*args) + self.__updated = [] + # end work around + if self._u_any_in_string(empty_strings, output) == True: + if allow_empty == False: + raise base.EmptyCommit() + # note that darcs does _not_ make an empty revision. + # this returns the last non-empty revision id... + revision = self._vcs_revision_id(-1) + else: + revline = re.compile("Finished recording patch '(.*)'") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] + return revision + + def _vcs_revision_id(self, index): + status,output,error = self._u_invoke_client('changes', '--xml') + revisions = [] + xml_str = output.encode('unicode_escape').replace(r'\n', '\n') + element = ElementTree.XML(xml_str) + assert element.tag == 'changelog', element.tag + for patch in element.getchildren(): + assert patch.tag == 'patch', patch.tag + for child in patch.getchildren(): + if child.tag == 'name': + text = unescape(unicode(child.text).decode('unicode_escape').strip()) + revisions.append(text) + revisions.reverse() + try: + if index > 0: + return revisions[index-1] + elif index < 0: + return revisions[index] + else: + return None + except IndexError: + return None + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py new file mode 100644 index 0000000..2280665 --- /dev/null +++ b/libbe/storage/vcs/git.py @@ -0,0 +1,183 @@ +# Copyright (C) 2008-2009 Ben Finney <benf@cybersource.com.au> +# Chris Ball <cjb@laptop.org> +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Git backend. +""" + +import os +import os.path +import re +import shutil +import unittest + +import libbe +import libbe.ui.util.user +import base + +if libbe.TESTING == True: + import doctest + import sys + + +def new(): + return Git() + +class Git(base.VCS): + name='git' + client='git' + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + + def _vcs_version(self): + status,output,error = self._u_invoke_client('--version') + return output + + def _vcs_get_user_id(self): + status,output,error = \ + self._u_invoke_client('config', 'user.name', expect=(0,1)) + if status == 0: + name = output.rstrip('\n') + else: + name = '' + status,output,error = \ + self._u_invoke_client('config', 'user.email', expect=(0,1)) + if status == 0: + email = output.rstrip('\n') + else: + email = '' + if name != '' or email != '': # got something! + # guess missing info, if necessary + if name == '': + name = libbe.ui.util.user.get_fallback_username() + if email == '': + email = libe.ui.util.user.get_fallback_email() + return libbe.ui.util.user.create_user_id(name, email) + return None # Git has no infomation + + def _vcs_detect(self, path): + if self._u_search_parent_directories(path, '.git') != None : + return True + return False + + def _vcs_root(self, path): + """Find the root of the deepest repository containing path.""" + # Assume that nothing funny is going on; in particular, that we aren't + # dealing with a bare repo. + if os.path.isdir(path) != True: + path = os.path.dirname(path) + status,output,error = self._u_invoke_client('rev-parse', '--git-dir', + cwd=path) + gitdir = os.path.join(path, output.rstrip('\n')) + dirname = os.path.abspath(os.path.dirname(gitdir)) + return dirname + + def _vcs_init(self, path): + self._u_invoke_client('init', cwd=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '.git') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + if os.path.isdir(path): + return + self._u_invoke_client('add', path) + + def _vcs_remove(self, path): + if not os.path.isdir(self._u_abspath(path)): + self._u_invoke_client('rm', '-f', path) + + def _vcs_update(self, path): + self._vcs_add(path) + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + else: + arg = '%s:%s' % (revision,path) + status,output,error = self._u_invoke_client('show', arg) + return output + + + def _vcs_path(self, id, revision): + return self._u_find_id(id, revision) + + def _vcs_isdir(self, path, revision): + arg = '%s:%s' % (revision,path) + args = ['ls-tree', arg] + kwargs = {'expect':(0,128)} + status,output,error = self._u_invoke_client(*args, **kwargs) + if status != 0: + if 'not a tree object' in error: + return False + raise base.CommandError(args, status, stderr=error) + return True + + def _vcs_listdir(self, path, revision): + arg = '%s:%s' % (revision,path) + status,output,error = self._u_invoke_client( + 'ls-tree', '--name-only', arg) + return output.rstrip('\n').splitlines() + + def _vcs_commit(self, commitfile, allow_empty=False): + args = ['commit', '--all', '--file', commitfile] + if allow_empty == True: + args.append('--allow-empty') + status,output,error = self._u_invoke_client(*args) + else: + kwargs = {'expect':(0,1)} + status,output,error = self._u_invoke_client(*args, **kwargs) + strings = ['nothing to commit', + 'nothing added to commit'] + if self._u_any_in_string(strings, output) == True: + raise base.EmptyCommit() + full_revision = self._vcs_revision_id(-1) + assert full_revision[:7] in output, \ + 'Mismatched revisions:\n%s\n%s' % (full_revision, output) + return full_revision + + def _vcs_revision_id(self, index): + args = ['rev-list', '--first-parent', '--reverse', 'HEAD'] + kwargs = {'expect':(0,128)} + status,output,error = self._u_invoke_client(*args, **kwargs) + if status == 128: + if error.startswith("fatal: ambiguous argument 'HEAD': unknown "): + return None + raise base.CommandError(args, status, stderr=error) + revisions = output.splitlines() + try: + if index > 0: + return revisions[index-1] + elif index < 0: + return revisions[index] + else: + return None + except IndexError: + return None + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Git, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py new file mode 100644 index 0000000..11494a9 --- /dev/null +++ b/libbe/storage/vcs/hg.py @@ -0,0 +1,180 @@ +# Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc. +# Ben Finney <benf@cybersource.com.au> +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Mercurial (hg) backend. +""" + +try: + import mercurial + import mercurial.version + import mercurial.dispatch + import mercurial.ui +except ImportError: + mercurial = None +import os +import os.path +import re +import shutil +import StringIO +import sys +import time # work around http://mercurial.selenic.com/bts/issue618 + +import libbe +import base + +if libbe.TESTING == True: + import doctest + import unittest + + +def new(): + return Hg() + +class Hg(base.VCS): + name='hg' + client=None # mercurial module + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_version(self): + if mercurial == None: + return None + return mercurial.version.get_version() + + def _u_invoke_client(self, *args, **kwargs): + if 'cwd' not in kwargs: + kwargs['cwd'] = self.repo + assert len(kwargs) == 1, kwargs + fullargs = ['--cwd', kwargs['cwd']] + fullargs.extend(args) + stdout = sys.stdout + tmp_stdout = StringIO.StringIO() + sys.stdout = tmp_stdout + cwd = os.getcwd() + mercurial.dispatch.dispatch(fullargs) + os.chdir(cwd) + sys.stdout = stdout + return tmp_stdout.getvalue().rstrip('\n') + + def _vcs_get_user_id(self): + return self._u_invoke_client('showconfig', 'ui.username') + + def _vcs_detect(self, path): + """Detect whether a directory is revision-controlled using Mercurial""" + if self._u_search_parent_directories(path, '.hg') != None: + return True + return False + + def _vcs_root(self, path): + return self._u_invoke_client('root', cwd=path) + + def _vcs_init(self, path): + self._u_invoke_client('init', cwd=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '.hg') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + self._u_invoke_client('add', path) + + def _vcs_remove(self, path): + self._u_invoke_client('rm', '--force', path) + + def _vcs_update(self, path): + self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + else: + return self._u_invoke_client('cat', '-r', revision, path) + + def _vcs_path(self, id, revision): + output = self._u_invoke_client('manifest', '--rev', revision) + be_dir = self._cached_path_id._spacer_dirs[0] + be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep + files = [f for f in output.splitlines() if f.startswith(be_dir_sep)] + for file in files: + if not file.startswith(be_dir+os.path.sep): + continue + parts = file.split(os.path.sep) + dir = parts.pop(0) # don't add the first spacer dir + for part in parts[:-1]: + dir = os.path.join(dir, part) + if not dir in files: + files.append(dir) + for file in files: + if self._u_path_to_id(file) == id: + return file + raise base.InvalidId(id, revision=revision) + + def _vcs_isdir(self, path, revision): + output = self._u_invoke_client('manifest', '--rev', revision) + files = output.splitlines() + if path in files: + return False + return True + + def _vcs_listdir(self, path, revision): + output = self._u_invoke_client('manifest', '--rev', revision) + files = output.splitlines() + path = path.rstrip(os.path.sep) + os.path.sep + return [self._u_rel_path(f, path) for f in files if f.startswith(path)] + + def _vcs_commit(self, commitfile, allow_empty=False): + args = ['commit', '--logfile', commitfile] + output = self._u_invoke_client(*args) + # work around http://mercurial.selenic.com/bts/issue618 + strings = ['nothing changed'] + if self._u_any_in_string(strings, output) == True \ + and len(self.__updated) > 0: + time.sleep(1) + for path in self.__updated: + os.utime(os.path.join(self.repo, path), None) + output = self._u_invoke_client(*args) + self.__updated = [] + # end work around + if allow_empty == False: + strings = ['nothing changed'] + if self._u_any_in_string(strings, output) == True: + raise base.EmptyCommit() + return self._vcs_revision_id(-1) + + def _vcs_revision_id(self, index, style='id'): + if index > 0: + index -= 1 + args = ['identify', '--rev', str(int(index)), '--%s' % style] + output = self._u_invoke_client(*args) + id = output.strip() + if id == '000000000000': + return None # before initial commit. + return id + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |