diff options
Diffstat (limited to 'libbe/storage')
-rw-r--r-- | libbe/storage/__init__.py | 74 | ||||
-rw-r--r-- | libbe/storage/base.py | 1070 | ||||
-rw-r--r-- | libbe/storage/http.py | 446 | ||||
-rw-r--r-- | libbe/storage/util/__init__.py | 0 | ||||
-rw-r--r-- | libbe/storage/util/config.py | 114 | ||||
-rw-r--r-- | libbe/storage/util/mapfile.py | 146 | ||||
-rw-r--r-- | libbe/storage/util/properties.py | 666 | ||||
-rw-r--r-- | libbe/storage/util/settings_object.py | 617 | ||||
-rw-r--r-- | libbe/storage/util/upgrade.py | 331 | ||||
-rw-r--r-- | libbe/storage/vcs/__init__.py | 41 | ||||
-rw-r--r-- | libbe/storage/vcs/arch.py | 441 | ||||
-rw-r--r-- | libbe/storage/vcs/base.py | 1127 | ||||
-rw-r--r-- | libbe/storage/vcs/bzr.py | 361 | ||||
-rw-r--r-- | libbe/storage/vcs/darcs.py | 399 | ||||
-rw-r--r-- | libbe/storage/vcs/git.py | 269 | ||||
-rw-r--r-- | libbe/storage/vcs/hg.py | 257 |
16 files changed, 6359 insertions, 0 deletions
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py new file mode 100644 index 0000000..6bceac9 --- /dev/null +++ b/libbe/storage/__init__.py @@ -0,0 +1,74 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Define the :class:`~libbe.storage.base.Storage` and +:class:`~libbe.storage.base.VersionedStorage` classes for storing BE +data. + +Also define assorted implementations for the Storage classes: + +* :mod:`libbe.storage.vcs` +* :mod:`libbe.storage.http` + +Also define an assortment of storage-related tools and utilities: + +* :mod:`libbe.storage.util` +""" + +import base + +ConnectionError = base.ConnectionError +InvalidStorageVersion = base.InvalidStorageVersion +InvalidID = base.InvalidID +InvalidRevision = base.InvalidRevision +InvalidDirectory = base.InvalidDirectory +NotWriteable = base.NotWriteable +NotReadable = base.NotReadable +EmptyCommit = base.EmptyCommit + +# a list of all past versions +STORAGE_VERSIONS = ['Bugs Everywhere Tree 1 0', + 'Bugs Everywhere Directory v1.1', + 'Bugs Everywhere Directory v1.2', + 'Bugs Everywhere Directory v1.3', + 'Bugs Everywhere Directory v1.4', + ] + +# the current version +STORAGE_VERSION = STORAGE_VERSIONS[-1] + +def get_http_storage(location): + import http + return http.HTTP(location) + +def get_vcs_storage(location): + import vcs + s = vcs.detect_vcs(location) + s.repo = location + return s + +def get_storage(location): + """ + Return a Storage instance from a repo location string. + """ + if location.startswith('http://') or location.startswith('https://'): + return get_http_storage(location) + return get_vcs_storage(location) + +__all__ = [ConnectionError, InvalidStorageVersion, InvalidID, + InvalidRevision, InvalidDirectory, NotWriteable, NotReadable, + EmptyCommit, STORAGE_VERSIONS, STORAGE_VERSION, + get_storage] diff --git a/libbe/storage/base.py b/libbe/storage/base.py new file mode 100644 index 0000000..0ae9c53 --- /dev/null +++ b/libbe/storage/base.py @@ -0,0 +1,1070 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Abstract bug repository data storage to easily support multiple backends. +""" + +import copy +import os +import pickle +import types + +from libbe.error import NotSupported +import libbe.storage +from libbe.util.tree import Tree +from libbe.util import InvalidObject +import libbe.version +from libbe import TESTING + +if TESTING == True: + import doctest + import os.path + import sys + import unittest + + from libbe.util.utility import Dir + +class ConnectionError (Exception): + pass + +class InvalidStorageVersion(ConnectionError): + def __init__(self, active_version, expected_version=None): + if expected_version == None: + expected_version = libbe.storage.STORAGE_VERSION + msg = 'Storage in "%s" not the expected "%s"' \ + % (active_version, expected_version) + Exception.__init__(self, msg) + self.active_version = active_version + self.expected_version = expected_version + +class InvalidID (KeyError): + def __init__(self, id=None, revision=None, msg=None): + KeyError.__init__(self, id) + self.msg = msg + self.id = id + self.revision = revision + def __str__(self): + if self.msg == None: + return '%s in revision %s' % (self.id, self.revision) + return self.msg + + +class InvalidRevision (KeyError): + pass + +class InvalidDirectory (Exception): + pass + +class DirectoryNotEmpty (InvalidDirectory): + pass + +class NotWriteable (NotSupported): + def __init__(self, msg): + NotSupported.__init__(self, 'write', msg) + +class NotReadable (NotSupported): + def __init__(self, msg): + NotSupported.__init__(self, 'read', msg) + +class EmptyCommit(Exception): + def __init__(self): + Exception.__init__(self, 'No changes to commit') + +class _EMPTY (object): + """Entry has been added but has no user-set value.""" + pass + +class Entry (Tree): + def __init__(self, id, value=_EMPTY, parent=None, directory=False, + children=None): + if children == None: + Tree.__init__(self) + else: + Tree.__init__(self, children) + self.id = id + self.value = value + self.parent = parent + if self.parent != None: + if self.parent.directory == False: + raise InvalidDirectory( + 'Non-directory %s cannot have children' % self.parent) + parent.append(self) + self.directory = directory + + def __str__(self): + return '<Entry %s: %s>' % (self.id, self.value) + + def __repr__(self): + return str(self) + + def __cmp__(self, other, local=False): + if other == None: + return cmp(1, None) + if cmp(self.id, other.id) != 0: + return cmp(self.id, other.id) + if cmp(self.value, other.value) != 0: + return cmp(self.value, other.value) + if local == False: + if self.parent == None: + if cmp(self.parent, other.parent) != 0: + return cmp(self.parent, other.parent) + elif self.parent.__cmp__(other.parent, local=True) != 0: + return self.parent.__cmp__(other.parent, local=True) + for sc,oc in zip(self, other): + if sc.__cmp__(oc, local=True) != 0: + return sc.__cmp__(oc, local=True) + return 0 + + def _objects_to_ids(self): + if self.parent != None: + self.parent = self.parent.id + for i,c in enumerate(self): + self[i] = c.id + return self + + def _ids_to_objects(self, dict): + if self.parent != None: + self.parent = dict[self.parent] + for i,c in enumerate(self): + self[i] = dict[c] + return self + +class Storage (object): + """ + This class declares all the methods required by a Storage + interface. This implementation just keeps the data in a + dictionary and uses pickle for persistent storage. + """ + name = 'Storage' + + def __init__(self, repo='/', encoding='utf-8', options=None): + self.repo = repo + self.encoding = encoding + self.options = options + self.readable = True # soft limit (user choice) + self._readable = True # hard limit (backend choice) + self.writeable = True # soft limit (user choice) + self._writeable = True # hard limit (backend choice) + self.versioned = False + self.can_init = True + self.connected = False + + def __str__(self): + return '<%s %s %s>' % (self.__class__.__name__, id(self), self.repo) + + def __repr__(self): + return str(self) + + def version(self): + """Return a version string for this backend.""" + return libbe.version.version() + + def storage_version(self, revision=None): + """Return the storage format for this backend.""" + return libbe.storage.STORAGE_VERSION + + def is_readable(self): + return self.readable and self._readable + + def is_writeable(self): + return self.writeable and self._writeable + + def init(self): + """Create a new storage repository.""" + if self.can_init == False: + raise NotSupported('init', + 'Cannot initialize this repository format.') + if self.is_writeable() == False: + raise NotWriteable('Cannot initialize unwriteable storage.') + return self._init() + + def _init(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + root = Entry(id='__ROOT__', directory=True) + d = {root.id:root} + pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1) + f.close() + + def destroy(self): + """Remove the storage repository.""" + if self.is_writeable() == False: + raise NotWriteable('Cannot destroy unwriteable storage.') + return self._destroy() + + def _destroy(self): + os.remove(os.path.join(self.repo, 'repo.pkl')) + + def connect(self): + """Open a connection to the repository.""" + if self.is_readable() == False: + raise NotReadable('Cannot connect to unreadable storage.') + self._connect() + self.connected = True + + def _connect(self): + try: + f = open(os.path.join(self.repo, 'repo.pkl'), 'rb') + except IOError: + raise ConnectionError(self) + d = pickle.load(f) + self._data = dict((k,v._ids_to_objects(d)) for k,v in d.items()) + f.close() + + def disconnect(self): + """Close the connection to the repository.""" + if self.is_writeable() == False: + return + if self.connected == False: + return + self._disconnect() + self.connected = False + + def _disconnect(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + pickle.dump(dict((k,v._objects_to_ids()) + for k,v in self._data.items()), f, -1) + f.close() + self._data = None + + def add(self, id, *args, **kwargs): + """Add an entry""" + if self.is_writeable() == False: + raise NotWriteable('Cannot add entry to unwriteable storage.') + if not self.exists(id): + self._add(id, *args, **kwargs) + + def _add(self, id, parent=None, directory=False): + if parent == None: + parent = '__ROOT__' + p = self._data[parent] + self._data[id] = Entry(id, parent=p, directory=directory) + + def exists(self, *args, **kwargs): + """Check an entry's existence""" + if self.is_readable() == False: + raise NotReadable('Cannot check entry existence in unreadable storage.') + return self._exists(*args, **kwargs) + + def _exists(self, id, revision=None): + return id in self._data + + def remove(self, *args, **kwargs): + """Remove an entry.""" + if self.is_writeable() == False: + raise NotSupported('write', + 'Cannot remove entry from unwriteable storage.') + self._remove(*args, **kwargs) + + def _remove(self, id): + if self._data[id].directory == True \ + and len(self.children(id)) > 0: + raise DirectoryNotEmpty(id) + e = self._data.pop(id) + e.parent.remove(e) + + def recursive_remove(self, *args, **kwargs): + """Remove an entry and all its decendents.""" + if self.is_writeable() == False: + raise NotSupported('write', + 'Cannot remove entries from unwriteable storage.') + self._recursive_remove(*args, **kwargs) + + def _recursive_remove(self, id): + for entry in reversed(list(self._data[id].traverse())): + self._remove(entry.id) + + def ancestors(self, *args, **kwargs): + """Return a list of the specified entry's ancestors' ids.""" + if self.is_readable() == False: + raise NotReadable('Cannot list parents with unreadable storage.') + return self._ancestors(*args, **kwargs) + + def _ancestors(self, id=None, revision=None): + if id == None: + return [] + ancestors = [] + stack = [id] + while len(stack) > 0: + id = stack.pop(0) + parent = self._data[id].parent + if parent != None and not parent.id.startswith('__'): + ancestor = parent.id + ancestors.append(ancestor) + stack.append(ancestor) + return ancestors + + def children(self, *args, **kwargs): + """Return a list of specified entry's children's ids.""" + if self.is_readable() == False: + raise NotReadable('Cannot list children with unreadable storage.') + return self._children(*args, **kwargs) + + def _children(self, id=None, revision=None): + if id == None: + id = '__ROOT__' + return [c.id for c in self._data[id] if not c.id.startswith('__')] + + def get(self, *args, **kwargs): + """ + Get contents of and entry as they were in a given revision. + revision==None specifies the current revision. + + If there is no id, return default, unless default is not + given, in which case raise InvalidID. + """ + if self.is_readable() == False: + raise NotReadable('Cannot get entry with unreadable storage.') + if 'decode' in kwargs: + decode = kwargs.pop('decode') + else: + decode = False + value = self._get(*args, **kwargs) + if value != None: + if decode == True and type(value) != types.UnicodeType: + return unicode(value, self.encoding) + elif decode == False and type(value) != types.StringType: + return value.encode(self.encoding) + return value + + def _get(self, id, default=InvalidObject, revision=None): + if id in self._data and self._data[id].value != _EMPTY: + return self._data[id].value + elif default == InvalidObject: + raise InvalidID(id) + return default + + def set(self, id, value, *args, **kwargs): + """ + Set the entry contents. + """ + if self.is_writeable() == False: + raise NotWriteable('Cannot set entry in unwriteable storage.') + if type(value) == types.UnicodeType: + value = value.encode(self.encoding) + self._set(id, value, *args, **kwargs) + + def _set(self, id, value): + if id not in self._data: + raise InvalidID(id) + if self._data[id].directory == True: + raise InvalidDirectory( + 'Directory %s cannot have data' % self.parent) + self._data[id].value = value + +class VersionedStorage (Storage): + """ + This class declares all the methods required by a Storage + interface that supports versioning. This implementation just + keeps the data in a list and uses pickle for persistent + storage. + """ + name = 'VersionedStorage' + + def __init__(self, *args, **kwargs): + Storage.__init__(self, *args, **kwargs) + self.versioned = True + + def _init(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + root = Entry(id='__ROOT__', directory=True) + summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit') + body = Entry(id='__COMMIT__BODY__') + initial_commit = {root.id:root, summary.id:summary, body.id:body} + d = dict((k,v._objects_to_ids()) for k,v in initial_commit.items()) + pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree] + f.close() + + def _connect(self): + try: + f = open(os.path.join(self.repo, 'repo.pkl'), 'rb') + except IOError: + raise ConnectionError(self) + d = pickle.load(f) + self._data = [dict((k,v._ids_to_objects(t)) for k,v in t.items()) + for t in d] + f.close() + + def _disconnect(self): + f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') + pickle.dump([dict((k,v._objects_to_ids()) + for k,v in t.items()) for t in self._data], f, -1) + f.close() + self._data = None + + def _add(self, id, parent=None, directory=False): + if parent == None: + parent = '__ROOT__' + p = self._data[-1][parent] + self._data[-1][id] = Entry(id, parent=p, directory=directory) + + def _exists(self, id, revision=None): + if revision == None: + revision = -1 + else: + revision = int(revision) + return id in self._data[revision] + + def _remove(self, id): + if self._data[-1][id].directory == True \ + and len(self.children(id)) > 0: + raise DirectoryNotEmpty(id) + e = self._data[-1].pop(id) + e.parent.remove(e) + + def _recursive_remove(self, id): + for entry in reversed(list(self._data[-1][id].traverse())): + self._remove(entry.id) + + def _ancestors(self, id=None, revision=None): + if id == None: + return [] + if revision == None: + revision = -1 + else: + revision = int(revision) + ancestors = [] + stack = [id] + while len(stack) > 0: + id = stack.pop(0) + parent = self._data[revision][id].parent + if parent != None and not parent.id.startswith('__'): + ancestor = parent.id + ancestors.append(ancestor) + stack.append(ancestor) + return ancestors + + def _children(self, id=None, revision=None): + if id == None: + id = '__ROOT__' + if revision == None: + revision = -1 + else: + revision = int(revision) + return [c.id for c in self._data[revision][id] + if not c.id.startswith('__')] + + def _get(self, id, default=InvalidObject, revision=None): + if revision == None: + revision = -1 + else: + revision = int(revision) + if id in self._data[revision] \ + and self._data[revision][id].value != _EMPTY: + return self._data[revision][id].value + elif default == InvalidObject: + raise InvalidID(id) + return default + + def _set(self, id, value): + if id not in self._data[-1]: + raise InvalidID(id) + self._data[-1][id].value = value + + def commit(self, *args, **kwargs): + """ + Commit the current repository, with a commit message string + summary and body. Return the name of the new revision. + + If allow_empty == False (the default), raise EmptyCommit if + there are no changes to commit. + """ + if self.is_writeable() == False: + raise NotWriteable('Cannot commit to unwriteable storage.') + return self._commit(*args, **kwargs) + + def _commit(self, summary, body=None, allow_empty=False): + if self._data[-1] == self._data[-2] and allow_empty == False: + raise EmptyCommit + self._data[-1]["__COMMIT__SUMMARY__"].value = summary + self._data[-1]["__COMMIT__BODY__"].value = body + rev = str(len(self._data)-1) + self._data.append(copy.deepcopy(self._data[-1])) + return rev + + def revision_id(self, index=None): + """ + Return the name of the <index>th revision. The choice of + which branch to follow when crossing branches/merges is not + defined. Revision indices start at 1; ID 0 is the blank + repository. + + Return None if index==None. + + If the specified revision does not exist, raise InvalidRevision. + """ + if index == None: + return None + try: + if int(index) != index: + raise InvalidRevision(index) + except ValueError: + raise InvalidRevision(index) + L = len(self._data) - 1 # -1 b/c of initial commit + if index >= -L and index <= L: + return str(index % L) + raise InvalidRevision(i) + + def changed(self, revision): + """Return a tuple of lists of ids `(new, modified, removed)` from the + specified revision to the current situation. + """ + new = [] + modified = [] + removed = [] + for id,value in self._data[int(revision)].items(): + if id.startswith('__'): + continue + if not id in self._data[-1]: + removed.append(id) + elif value.value != self._data[-1][id].value: + modified.append(id) + for id in self._data[-1]: + if not id in self._data[int(revision)]: + new.append(id) + return (new, modified, removed) + + +if TESTING == True: + class StorageTestCase (unittest.TestCase): + """Test cases for Storage class.""" + + Class = Storage + + def __init__(self, *args, **kwargs): + super(StorageTestCase, self).__init__(*args, **kwargs) + self.dirname = None + + # this class will be the basis of tests for several classes, + # so make sure we print the name of the class we're dealing with. + def _classname(self): + version = '?' + try: + if hasattr(self, 's'): + version = self.s.version() + except: + pass + return '%s:%s' % (self.Class.__name__, version) + + def fail(self, msg=None): + """Fail immediately, with the given message.""" + raise self.failureException, \ + '(%s) %s' % (self._classname(), msg) + + def failIf(self, expr, msg=None): + "Fail the test if the expression is true." + if expr: raise self.failureException, \ + '(%s) %s' % (self._classname(), msg) + + def failUnless(self, expr, msg=None): + """Fail the test unless the expression is true.""" + if not expr: raise self.failureException, \ + '(%s) %s' % (self._classname(), msg) + + def setUp(self): + """Set up test fixtures for Storage test case.""" + super(StorageTestCase, self).setUp() + self.dir = Dir() + self.dirname = self.dir.path + self.s = self.Class(repo=self.dirname) + self.assert_failed_connect() + self.s.init() + self.s.connect() + + def tearDown(self): + super(StorageTestCase, self).tearDown() + self.s.disconnect() + self.s.destroy() + self.assert_failed_connect() + self.dir.cleanup() + + def assert_failed_connect(self): + try: + self.s.connect() + self.fail( + "Connected to %(name)s repository before initialising" + % vars(self.Class)) + except ConnectionError: + pass + + class Storage_init_TestCase (StorageTestCase): + """Test cases for Storage.init method.""" + + def test_connect_should_succeed_after_init(self): + """Should connect after initialization.""" + self.s.connect() + + class Storage_connect_disconnect_TestCase (StorageTestCase): + """Test cases for Storage.connect and .disconnect methods.""" + + def test_multiple_disconnects(self): + """Should be able to call .disconnect multiple times.""" + self.s.disconnect() + self.s.disconnect() + + class Storage_add_remove_TestCase (StorageTestCase): + """Test cases for Storage.add, .remove, and .recursive_remove methods.""" + + def test_initially_empty(self): + """New repository should be empty.""" + self.failUnless(len(self.s.children()) == 0, self.s.children()) + + def test_add_identical_rooted(self): + """Adding entries with the same ID should not increase the number of children. + """ + for i in range(10): + self.s.add('some id', directory=False) + s = sorted(self.s.children()) + self.failUnless(s == ['some id'], s) + + def test_add_rooted(self): + """Adding entries should increase the number of children (rooted). + """ + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], directory=(i % 2 == 0)) + s = sorted(self.s.children()) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + + def test_add_nonrooted(self): + """Adding entries should increase the number of children (nonrooted). + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + s = self.s.children() + self.failUnless(s == ['parent'], s) + + def test_ancestors(self): + """Check ancestors lists. + """ + self.s.add('parent', directory=True) + for i in range(10): + i_id = str(i) + self.s.add(i_id, 'parent', directory=True) + for j in range(10): # add some grandkids + j_id = str(20*(i+1)+j) + self.s.add(j_id, i_id, directory=(i%2 == 0)) + ancestors = sorted(self.s.ancestors(j_id)) + self.failUnless(ancestors == [i_id, 'parent'], + 'Unexpected ancestors for %s/%s, "%s"' + % (i_id, j_id, ancestors)) + + def test_children(self): + """Non-UUID ids should be returned as such. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append('parent/%s' % str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + + def test_add_invalid_directory(self): + """Should not be able to add children to non-directories. + """ + self.s.add('parent', directory=False) + try: + self.s.add('child', 'parent', directory=False) + self.fail( + '%s.add() succeeded instead of raising InvalidDirectory' + % (vars(self.Class)['name'])) + except InvalidDirectory: + pass + try: + self.s.add('child', 'parent', directory=True) + self.fail( + '%s.add() succeeded instead of raising InvalidDirectory' + % (vars(self.Class)['name'])) + except InvalidDirectory: + pass + self.failUnless(len(self.s.children('parent')) == 0, + self.s.children('parent')) + + def test_remove_rooted(self): + """Removing entries should decrease the number of children (rooted). + """ + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], directory=(i % 2 == 0)) + for i in range(10): + self.s.remove(ids.pop()) + s = sorted(self.s.children()) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + + def test_remove_nonrooted(self): + """Removing entries should decrease the number of children (nonrooted). + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=False)#(i % 2 == 0)) + for i in range(10): + self.s.remove(ids.pop()) + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + if len(s) > 0: + s = self.s.children() + self.failUnless(s == ['parent'], s) + + def test_remove_directory_not_empty(self): + """Removing a non-empty directory entry should raise exception. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + self.s.remove(ids.pop()) # empty directory removal succeeds + try: + self.s.remove('parent') # empty directory removal succeeds + self.fail( + "%s.remove() didn't raise DirectoryNotEmpty" + % (vars(self.Class)['name'])) + except DirectoryNotEmpty: + pass + + def test_recursive_remove(self): + """Recursive remove should empty the tree.""" + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=True) + for j in range(10): # add some grandkids + self.s.add(str(20*(i+1)+j), ids[-1], directory=(i%2 == 0)) + self.s.recursive_remove('parent') + s = sorted(self.s.children()) + self.failUnless(s == [], s) + + class Storage_get_set_TestCase (StorageTestCase): + """Test cases for Storage.get and .set methods.""" + + id = 'unlikely id' + val = 'unlikely value' + + def test_get_default(self): + """Get should return specified default if id not in Storage. + """ + ret = self.s.get(self.id, default=self.val) + self.failUnless(ret == self.val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + + def test_get_default_exception(self): + """Get should raise exception if id not in Storage and no default. + """ + try: + ret = self.s.get(self.id) + self.fail( + "%s.get() returned %s instead of raising InvalidID" + % (vars(self.Class)['name'], ret)) + except InvalidID: + pass + + def test_get_initial_value(self): + """Data value should be default before any value has been set. + """ + self.s.add(self.id, directory=False) + val = 'UNLIKELY DEFAULT' + ret = self.s.get(self.id, default=val) + self.failUnless(ret == val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, val)) + + def test_set_exception(self): + """Set should raise exception if id not in Storage. + """ + try: + self.s.set(self.id, self.val) + self.fail( + "%(name)s.set() did not raise InvalidID" + % vars(self.Class)) + except InvalidID: + pass + + def test_set(self): + """Set should define the value returned by get. + """ + self.s.add(self.id, directory=False) + self.s.set(self.id, self.val) + ret = self.s.get(self.id) + self.failUnless(ret == self.val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + + def test_unicode_set(self): + """Set should define the value returned by get. + """ + val = u'Fran\xe7ois' + self.s.add(self.id, directory=False) + self.s.set(self.id, val) + ret = self.s.get(self.id, decode=True) + self.failUnless(type(ret) == types.UnicodeType, + "%s.get() returned %s not UnicodeType" + % (vars(self.Class)['name'], type(ret))) + self.failUnless(ret == val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + ret = self.s.get(self.id) + self.failUnless(type(ret) == types.StringType, + "%s.get() returned %s not StringType" + % (vars(self.Class)['name'], type(ret))) + s = unicode(ret, self.s.encoding) + self.failUnless(s == val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], s, self.val)) + + + class Storage_persistence_TestCase (StorageTestCase): + """Test cases for Storage.disconnect and .connect methods.""" + + id = 'unlikely id' + val = 'unlikely value' + + def test_get_set_persistence(self): + """Set should define the value returned by get after reconnect. + """ + self.s.add(self.id, directory=False) + self.s.set(self.id, self.val) + self.s.disconnect() + self.s.connect() + ret = self.s.get(self.id) + self.failUnless(ret == self.val, + "%s.get() returned %s not %s" + % (vars(self.Class)['name'], ret, self.val)) + + def test_empty_get_set_persistence(self): + """After empty set, get may return either an empty string or default. + """ + self.s.add(self.id, directory=False) + self.s.set(self.id, '') + self.s.disconnect() + self.s.connect() + default = 'UNLIKELY DEFAULT' + ret = self.s.get(self.id, default=default) + self.failUnless(ret in ['', default], + "%s.get() returned %s not in %s" + % (vars(self.Class)['name'], ret, ['', default])) + + def test_add_nonrooted_persistence(self): + """Adding entries should increase the number of children after reconnect. + """ + self.s.add('parent', directory=True) + ids = [] + for i in range(10): + ids.append(str(i)) + self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) + self.s.disconnect() + self.s.connect() + s = sorted(self.s.children('parent')) + self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + s = self.s.children() + self.failUnless(s == ['parent'], s) + + class VersionedStorageTestCase (StorageTestCase): + """Test cases for VersionedStorage methods.""" + + Class = VersionedStorage + + class VersionedStorage_commit_TestCase (VersionedStorageTestCase): + """Test cases for VersionedStorage.commit and revision_ids methods.""" + + id = 'unlikely id' + val = 'Some value' + commit_msg = 'Committing something interesting' + commit_body = 'Some\nlonger\ndescription\n' + + def _setup_for_empty_commit(self): + """ + Initialization might add some files to version control, so + commit those first, before testing the empty commit + functionality. + """ + try: + self.s.commit('Added initialization files') + except EmptyCommit: + pass + + def test_revision_id_exception(self): + """Invalid revision id should raise InvalidRevision. + """ + try: + rev = self.s.revision_id('highly unlikely revision id') + self.fail( + "%s.revision_id() didn't raise InvalidRevision, returned %s." + % (vars(self.Class)['name'], rev)) + except InvalidRevision: + pass + + def test_empty_commit_raises_exception(self): + """Empty commit should raise exception. + """ + self._setup_for_empty_commit() + try: + self.s.commit(self.commit_msg, self.commit_body) + self.fail( + "Empty %(name)s.commit() didn't raise EmptyCommit." + % vars(self.Class)) + except EmptyCommit: + pass + + def test_empty_commit_allowed(self): + """Empty commit should _not_ raise exception if allow_empty=True. + """ + self._setup_for_empty_commit() + self.s.commit(self.commit_msg, self.commit_body, + allow_empty=True) + + def test_commit_revision_ids(self): + """Commit / revision_id should agree on revision ids. + """ + def val(i): + return '%s:%d' % (self.val, i+1) + self.s.add(self.id, directory=False) + revs = [] + for i in range(10): + self.s.set(self.id, val(i)) + revs.append(self.s.commit('%s: %d' % (self.commit_msg, i), + self.commit_body)) + for i in range(10): + rev = self.s.revision_id(i+1) + self.failUnless(rev == revs[i], + "%s.revision_id(%d) returned %s not %s" + % (vars(self.Class)['name'], i+1, rev, revs[i])) + for i in range(-1, -9, -1): + rev = self.s.revision_id(i) + self.failUnless(rev == revs[i], + "%s.revision_id(%d) returned %s not %s" + % (vars(self.Class)['name'], i, rev, revs[i])) + + def test_get_previous_version(self): + """Get should be able to return the previous version. + """ + def val(i): + return '%s:%d' % (self.val, i+1) + self.s.add(self.id, directory=False) + revs = [] + for i in range(10): + self.s.set(self.id, val(i)) + revs.append(self.s.commit('%s: %d' % (self.commit_msg, i), + self.commit_body)) + for i in range(10): + ret = self.s.get(self.id, revision=revs[i]) + self.failUnless(ret == val(i), + "%s.get() returned %s not %s for revision %s" + % (vars(self.Class)['name'], ret, val(i), revs[i])) + + def test_get_previous_children(self): + """Children list should be revision dependent. + """ + self.s.add('parent', directory=True) + revs = [] + cur_children = [] + children = [] + for i in range(10): + new_child = str(i) + self.s.add(new_child, 'parent') + self.s.set(new_child, self.val) + revs.append(self.s.commit('%s: %d' % (self.commit_msg, i), + self.commit_body)) + cur_children.append(new_child) + children.append(list(cur_children)) + for i in range(10): + ret = sorted(self.s.children('parent', revision=revs[i])) + self.failUnless(ret == children[i], + "%s.get() returned %s not %s for revision %s" + % (vars(self.Class)['name'], ret, + children[i], revs[i])) + + class VersionedStorage_changed_TestCase (VersionedStorageTestCase): + """Test cases for VersionedStorage.changed() method.""" + + def test_changed(self): + """Changed lists should reflect past activity""" + self.s.add('dir', directory=True) + self.s.add('modified', parent='dir') + self.s.set('modified', 'some value to be modified') + self.s.add('moved', parent='dir') + self.s.set('moved', 'this entry will be moved') + self.s.add('removed', parent='dir') + self.s.set('removed', 'this entry will be deleted') + revA = self.s.commit('Initial state') + self.s.add('new', parent='dir') + self.s.set('new', 'this entry is new') + self.s.set('modified', 'a new value') + self.s.remove('moved') + self.s.add('moved2', parent='dir') + self.s.set('moved2', 'this entry will be moved') + self.s.remove('removed') + revB = self.s.commit('Final state') + new,mod,rem = self.s.changed(revA) + self.failUnless(sorted(new) == ['moved2', 'new'], + 'Unexpected new: %s' % new) + self.failUnless(mod == ['modified'], + 'Unexpected modified: %s' % mod) + self.failUnless(sorted(rem) == ['moved', 'removed'], + 'Unexpected removed: %s' % rem) + + def make_storage_testcase_subclasses(storage_class, namespace): + """Make StorageTestCase subclasses for storage_class in namespace.""" + storage_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, StorageTestCase) \ + and c.Class == Storage] + + for base_class in storage_testcase_classes: + testcase_class_name = storage_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = storage_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + def make_versioned_storage_testcase_subclasses(storage_class, namespace): + """Make VersionedStorageTestCase subclasses for storage_class in namespace.""" + storage_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if ((issubclass(c, StorageTestCase) \ + and c.Class == Storage) + or + (issubclass(c, VersionedStorageTestCase) \ + and c.Class == VersionedStorage))] + + for base_class in storage_testcase_classes: + testcase_class_name = storage_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = storage_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + make_storage_testcase_subclasses(VersionedStorage, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/http.py b/libbe/storage/http.py new file mode 100644 index 0000000..7ec9f54 --- /dev/null +++ b/libbe/storage/http.py @@ -0,0 +1,446 @@ +# Copyright (C) 2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +# For urllib2 information, see +# urllib2, from urllib2 - The Missing Manual +# http://www.voidspace.org.uk/python/articles/urllib2.shtml +# +# A dictionary of response codes is available in +# httplib.responses + +"""Define an HTTP-based :class:`~libbe.storage.base.VersionedStorage` +implementation. + +See Also +-------- +:mod:`libbe.command.serve` : the associated server + +""" + +import sys +import urllib +import urllib2 +import urlparse + +import libbe +import libbe.version +import base +from libbe import TESTING + +if TESTING == True: + import copy + import doctest + import StringIO + import unittest + + import libbe.bugdir + import libbe.command.serve + + +USER_AGENT = 'BE-HTTP-Storage' +HTTP_OK = 200 +HTTP_FOUND = 302 +HTTP_TEMP_REDIRECT = 307 +HTTP_USER_ERROR = 418 +"""Status returned to indicate exceptions on the server side. + +A BE-specific extension to the HTTP/1.1 protocol (See `RFC 2616`_). + +.. _RFC 2616: http://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html#sec6.1.1 +""" + +HTTP_VALID = [HTTP_OK, HTTP_FOUND, HTTP_TEMP_REDIRECT, HTTP_USER_ERROR] + +class InvalidURL (Exception): + def __init__(self, error=None, url=None, msg=None): + Exception.__init__(self, msg) + self.url = url + self.error = error + self.msg = msg + def __str__(self): + if self.msg == None: + if self.error == None: + return "Unknown URL error: %s" % self.url + return self.error.__str__() + return self.msg + +def get_post_url(url, get=True, data_dict=None, headers=[]): + """Execute a GET or POST transaction. + + Parameters + ---------- + url : str + The base URL (query portion added internally, if necessary). + get : bool + Use GET if True, otherwise use POST. + data_dict : dict + Data to send, either by URL query (if GET) or by POST (if POST). + headers : list + Extra HTTP headers to add to the request. + """ + if data_dict == None: + data_dict = {} + if get == True: + if data_dict != {}: + # encode get parameters in the url + param_string = urllib.urlencode(data_dict) + url = "%s?%s" % (url, param_string) + data = None + else: + data = urllib.urlencode(data_dict) + headers = dict(headers) + headers['User-Agent'] = USER_AGENT + req = urllib2.Request(url, data=data, headers=headers) + try: + response = urllib2.urlopen(req) + except urllib2.HTTPError, e: + if hasattr(e, 'reason'): + msg = 'We failed to reach a server.\nURL: %s\nReason: %s' \ + % (url, e.reason) + elif hasattr(e, 'code'): + msg = "The server couldn't fulfill the request.\nURL: %s\nError code: %s" \ + % (url, e.code) + raise InvalidURL(error=e, url=url, msg=msg) + page = response.read() + final_url = response.geturl() + info = response.info() + response.close() + return (page, final_url, info) + + +class HTTP (base.VersionedStorage): + """:class:`~libbe.storage.base.VersionedStorage` implementation over + HTTP. + + Uses GET to retrieve information and POST to set information. + """ + name = 'HTTP' + + def __init__(self, repo, *args, **kwargs): + repo,self.uname,self.password = self.parse_repo(repo) + base.VersionedStorage.__init__(self, repo, *args, **kwargs) + + def parse_repo(self, repo): + """Grab username and password (if any) from the repo URL. + + Examples + -------- + + >>> s = HTTP('http://host.com/path/to/repo') + >>> s.repo + 'http://host.com/path/to/repo' + >>> s.uname == None + True + >>> s.password == None + True + >>> s.parse_repo('http://joe:secret@host.com/path/to/repo') + ('http://host.com/path/to/repo', 'joe', 'secret') + """ + scheme,netloc,path,params,query,fragment = urlparse.urlparse(repo) + parts = netloc.split('@', 1) + if len(parts) == 2: + uname,password = parts[0].split(':') + repo = urlparse.urlunparse( + (scheme, parts[1], path, params, query, fragment)) + else: + uname,password = (None, None) + return (repo, uname, password) + + def get_post_url(self, url, get=True, data_dict=None, headers=[]): + if self.uname != None and self.password != None: + headers.append(('Authorization','Basic %s' % \ + ('%s:%s' % (self.uname, self.password)).encode('base64'))) + return get_post_url(url, get, data_dict, headers) + + def storage_version(self, revision=None): + """Return the storage format for this backend.""" + return libbe.storage.STORAGE_VERSION + + def _init(self): + """Create a new storage repository.""" + raise base.NotSupported( + 'init', 'Cannot initialize this repository format.') + + def _destroy(self): + """Remove the storage repository.""" + raise base.NotSupported( + 'destroy', 'Cannot destroy this repository format.') + + def _connect(self): + self.check_storage_version() + + def _disconnect(self): + pass + + def _add(self, id, parent=None, directory=False): + url = urlparse.urljoin(self.repo, 'add') + page,final_url,info = self.get_post_url( + url, get=False, + data_dict={'id':id, 'parent':parent, 'directory':directory}) + + def _exists(self, id, revision=None): + url = urlparse.urljoin(self.repo, 'exists') + page,final_url,info = self.get_post_url( + url, get=True, + data_dict={'id':id, 'revision':revision}) + if page == 'True': + return True + return False + + def _remove(self, id): + url = urlparse.urljoin(self.repo, 'remove') + page,final_url,info = self.get_post_url( + url, get=False, + data_dict={'id':id, 'recursive':False}) + + def _recursive_remove(self, id): + url = urlparse.urljoin(self.repo, 'remove') + page,final_url,info = self.get_post_url( + url, get=False, + data_dict={'id':id, 'recursive':True}) + + def _ancestors(self, id=None, revision=None): + url = urlparse.urljoin(self.repo, 'ancestors') + page,final_url,info = self.get_post_url( + url, get=True, + data_dict={'id':id, 'revision':revision}) + return page.strip('\n').splitlines() + + def _children(self, id=None, revision=None): + url = urlparse.urljoin(self.repo, 'children') + page,final_url,info = self.get_post_url( + url, get=True, + data_dict={'id':id, 'revision':revision}) + return page.strip('\n').splitlines() + + def _get(self, id, default=base.InvalidObject, revision=None): + url = urlparse.urljoin(self.repo, '/'.join(['get', id])) + try: + page,final_url,info = self.get_post_url( + url, get=True, + data_dict={'revision':revision}) + except InvalidURL, e: + if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID): + raise + elif default == base.InvalidObject: + raise base.InvalidID(id) + return default + version = info['X-BE-Version'] + if version != libbe.storage.STORAGE_VERSION: + raise base.InvalidStorageVersion( + version, libbe.storage.STORAGE_VERSION) + return page + + def _set(self, id, value): + url = urlparse.urljoin(self.repo, '/'.join(['set', id])) + try: + page,final_url,info = self.get_post_url( + url, get=False, + data_dict={'value':value}) + except InvalidURL, e: + if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID): + raise + if e.error.code == HTTP_USER_ERROR \ + and not 'InvalidID' in str(e.error): + raise base.InvalidDirectory( + 'Directory %s cannot have data' % id) + raise base.InvalidID(id) + + def _commit(self, summary, body=None, allow_empty=False): + url = urlparse.urljoin(self.repo, 'commit') + try: + page,final_url,info = self.get_post_url( + url, get=False, + data_dict={'summary':summary, 'body':body, + 'allow_empty':allow_empty}) + except InvalidURL, e: + if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID): + raise + if e.error.code == HTTP_USER_ERROR: + raise base.EmptyCommit + raise base.InvalidID(id) + return page.rstrip('\n') + + def revision_id(self, index=None): + """Return the name of the <index>th revision. + + The choice of which branch to follow when crossing + branches/merges is not defined. Revision indices start at 1; + ID 0 is the blank repository. + + Return None if index==None. + + Raises + ------ + InvalidRevision + If the specified revision does not exist. + """ + if index == None: + return None + try: + if int(index) != index: + raise base.InvalidRevision(index) + except ValueError: + raise base.InvalidRevision(index) + url = urlparse.urljoin(self.repo, 'revision-id') + try: + page,final_url,info = self.get_post_url( + url, get=True, + data_dict={'index':index}) + except InvalidURL, e: + if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID): + raise + if e.error.code == HTTP_USER_ERROR: + raise base.InvalidRevision(index) + raise base.InvalidID(id) + return page.rstrip('\n') + + def changed(self, revision=None): + url = urlparse.urljoin(self.repo, 'changed') + page,final_url,info = self.get_post_url( + url, get=True, + data_dict={'revision':revision}) + lines = page.strip('\n') + new,mod,rem = [p.splitlines() for p in page.split('\n\n')] + return (new, mod, rem) + + def check_storage_version(self): + version = self.storage_version() + if version != libbe.storage.STORAGE_VERSION: + raise base.InvalidStorageVersion( + version, libbe.storage.STORAGE_VERSION) + + def storage_version(self, revision=None): + url = urlparse.urljoin(self.repo, 'version') + page,final_url,info = self.get_post_url( + url, get=True, data_dict={'revision':revision}) + return page.rstrip('\n') + +if TESTING == True: + class GetPostUrlTestCase (unittest.TestCase): + """Test cases for get_post_url()""" + def test_get(self): + url = 'http://bugseverywhere.org/be/show/HomePage' + page,final_url,info = get_post_url(url=url) + self.failUnless(final_url == url, + 'Redirect?\n Expected: "%s"\n Got: "%s"' + % (url, final_url)) + def test_get_redirect(self): + url = 'http://bugseverywhere.org' + expected = 'http://bugseverywhere.org/be/show/HomePage' + page,final_url,info = get_post_url(url=url) + self.failUnless(final_url == expected, + 'Redirect?\n Expected: "%s"\n Got: "%s"' + % (expected, final_url)) + + class TestingHTTP (HTTP): + name = 'TestingHTTP' + def __init__(self, repo, *args, **kwargs): + self._storage_backend = base.VersionedStorage(repo) + self.app = libbe.command.serve.ServerApp( + storage=self._storage_backend) + HTTP.__init__(self, repo='http://localhost:8000/', *args, **kwargs) + self.intitialized = False + # duplicated from libbe.storage.serve.WSGITestCase + self.default_environ = { + 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD' + 'SCRIPT_NAME':'', + 'PATH_INFO': '', + #'QUERY_STRING':'', # may be empty or absent + #'CONTENT_TYPE':'', # may be empty or absent + #'CONTENT_LENGTH':'', # may be empty or absent + 'SERVER_NAME':'example.com', + 'SERVER_PORT':'80', + 'SERVER_PROTOCOL':'HTTP/1.1', + 'wsgi.version':(1,0), + 'wsgi.url_scheme':'http', + 'wsgi.input':StringIO.StringIO(), + 'wsgi.errors':StringIO.StringIO(), + 'wsgi.multithread':False, + 'wsgi.multiprocess':False, + 'wsgi.run_once':False, + } + def getURL(self, app, path='/', method='GET', data=None, + scheme='http', environ={}): + # duplicated from libbe.storage.serve.WSGITestCase + env = copy.copy(self.default_environ) + env['PATH_INFO'] = path + env['REQUEST_METHOD'] = method + env['scheme'] = scheme + if data != None: + enc_data = urllib.urlencode(data) + if method == 'POST': + env['CONTENT_LENGTH'] = len(enc_data) + env['wsgi.input'] = StringIO.StringIO(enc_data) + else: + assert method in ['GET', 'HEAD'], method + env['QUERY_STRING'] = enc_data + for key,value in environ.items(): + env[key] = value + return ''.join(app(env, self.start_response)) + def start_response(self, status, response_headers, exc_info=None): + self.status = status + self.response_headers = response_headers + self.exc_info = exc_info + def get_post_url(self, url, get=True, data_dict=None, headers=[]): + if get == True: + method = 'GET' + else: + method = 'POST' + scheme,netloc,path,params,query,fragment = urlparse.urlparse(url) + environ = {} + for header_name,header_value in headers: + environ['HTTP_%s' % header_name] = header_value + output = self.getURL( + self.app, path, method, data_dict, scheme, environ) + if self.status != '200 OK': + class __estr (object): + def __init__(self, string): + self.string = string + self.code = int(string.split()[0]) + def __str__(self): + return self.string + error = __estr(self.status) + raise InvalidURL(error=error, url=url, msg=output) + info = dict(self.response_headers) + return (output, url, info) + def _init(self): + try: + HTTP._init(self) + raise AssertionError + except base.NotSupported: + pass + self._storage_backend._init() + def _destroy(self): + try: + HTTP._destroy(self) + raise AssertionError + except base.NotSupported: + pass + self._storage_backend._destroy() + def _connect(self): + self._storage_backend._connect() + HTTP._connect(self) + def _disconnect(self): + HTTP._disconnect(self) + self._storage_backend._disconnect() + + + base.make_versioned_storage_testcase_subclasses( + TestingHTTP, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/util/__init__.py b/libbe/storage/util/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/libbe/storage/util/__init__.py diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py new file mode 100644 index 0000000..724d2d3 --- /dev/null +++ b/libbe/storage/util/config.py @@ -0,0 +1,114 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Create, save, and load the per-user config file at :func:`path`. +""" + +import ConfigParser +import codecs +import os.path + +import libbe +import libbe.util.encoding +if libbe.TESTING == True: + import doctest + + +default_encoding = libbe.util.encoding.get_filesystem_encoding() +"""Default filesystem encoding. + +Initialized with :func:`libbe.util.encoding.get_filesystem_encoding`. +""" + +def path(): + """Return the path to the per-user config file. + """ + return os.path.expanduser("~/.bugs_everywhere") + +def set_val(name, value, section="DEFAULT", encoding=None): + """Set a value in the per-user config file. + + Parameters + ---------- + name : str + The name of the value to set. + value : str or None + The new value to set (or None to delete the value). + section : str + The section to store the name/value in. + encoding : str + The config file's encoding, defaults to :data:`default_encoding`. + """ + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + if os.path.exists(path()) == False: # touch file or config + open(path(), 'w').close() # read chokes on missing file + f = codecs.open(path(), 'r', encoding) + config.readfp(f, path()) + f.close() + if value is not None: + config.set(section, name, value) + else: + config.remove_option(section, name) + f = codecs.open(path(), 'w', encoding) + config.write(f) + f.close() + +def get_val(name, section="DEFAULT", default=None, encoding=None): + """Get a value from the per-user config file + + Parameters + ---------- + name : str + The name of the value to set. + section : str + The section to store the name/value in. + default : + The value to return if `name` is not set. + encoding : str + The config file's encoding, defaults to :data:`default_encoding`. + + Examples + -------- + + >>> get_val("junk") is None + True + >>> set_val("junk", "random") + >>> get_val("junk") + u'random' + >>> set_val("junk", None) + >>> get_val("junk") is None + True + """ + if os.path.exists(path()): + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + f = codecs.open(path(), 'r', encoding) + config.readfp(f, path()) + f.close() + try: + return config.get(section, name) + except ConfigParser.NoOptionError: + return default + else: + return default + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py new file mode 100644 index 0000000..55863d7 --- /dev/null +++ b/libbe/storage/util/mapfile.py @@ -0,0 +1,146 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Serializing and deserializing dictionaries of parameters. + +The serialized "mapfiles" should be clear, flat-text strings, and allow +easy merging of independent/conflicting changes. +""" + +import errno +import os.path +import types +import yaml + +import libbe +if libbe.TESTING == True: + import doctest + + +class IllegalKey(Exception): + def __init__(self, key): + Exception.__init__(self, 'Illegal key "%s"' % key) + self.key = key + +class IllegalValue(Exception): + def __init__(self, value): + Exception.__init__(self, 'Illegal value "%s"' % value) + self.value = value + +class InvalidMapfileContents(Exception): + def __init__(self, contents): + Exception.__init__(self, 'Invalid YAML contents') + self.contents = contents + +def generate(map): + """Generate a YAML mapfile content string. + + Examples + -------- + + >>> generate({'q':'p'}) + 'q: p\\n\\n' + >>> generate({'q':u'Fran\u00e7ais'}) + 'q: Fran\\xc3\\xa7ais\\n\\n' + >>> generate({'q':u'hello'}) + 'q: hello\\n\\n' + >>> generate({'q=':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q=" + >>> generate({'q:':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q:" + >>> generate({'q\\n':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q\\n" + >>> generate({'':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "" + >>> generate({'>q':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key ">q" + >>> generate({'q':'p\\n'}) + Traceback (most recent call last): + IllegalValue: Illegal value "p\\n" + + See Also + -------- + parse : inverse + """ + keys = map.keys() + keys.sort() + for key in keys: + try: + assert not key.startswith('>') + assert('\n' not in key) + assert('=' not in key) + assert(':' not in key) + assert(len(key) > 0) + except AssertionError: + raise IllegalKey(unicode(key).encode('unicode_escape')) + if '\n' in map[key]: + raise IllegalValue(unicode(map[key]).encode('unicode_escape')) + + lines = [] + for key in keys: + lines.append(yaml.safe_dump({key: map[key]}, + default_flow_style=False, + allow_unicode=True)) + lines.append('') + return '\n'.join(lines) + +def parse(contents): + """Parse a YAML mapfile string. + + Examples + -------- + + >>> parse('q: p\\n\\n')['q'] + 'p' + >>> parse('q: \\'p\\'\\n\\n')['q'] + 'p' + >>> contents = generate({'a':'b', 'c':'d', 'e':'f'}) + >>> dict = parse(contents) + >>> dict['a'] + 'b' + >>> dict['c'] + 'd' + >>> dict['e'] + 'f' + >>> contents = generate({'q':u'Fran\u00e7ais'}) + >>> dict = parse(contents) + >>> dict['q'] + u'Fran\\xe7ais' + >>> dict = parse('a!') + Traceback (most recent call last): + ... + InvalidMapfileContents: Invalid YAML contents + + See Also + -------- + generate : inverse + + """ + c = yaml.load(contents) + if type(c) == types.StringType: + raise InvalidMapfileContents( + 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents) + return c or {} + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py new file mode 100644 index 0000000..b5681b1 --- /dev/null +++ b/libbe/storage/util/properties.py @@ -0,0 +1,666 @@ +# Bugs Everywhere - a distributed bugtracker +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Provides a series of useful decorators for defining various types +of properties. + +For example usage, consider the unittests at the end of the module. + +Notes +----- + +See `PEP 318` and Michele Simionato's `decorator documentation` for +more information on decorators. + +.. _PEP 318: http://www.python.org/dev/peps/pep-0318/ +.. _decorator documentation: http://www.phyast.pitt.edu/~micheles/python/documentation.html + +See Also +-------- +:mod:`libbe.storage.util.settings_object` : bundle properties into a convenient package + +""" + +import copy +import types + +import libbe +if libbe.TESTING == True: + import unittest + + +class ValueCheckError (ValueError): + def __init__(self, name, value, allowed): + action = "in" # some list of allowed values + if type(allowed) == types.FunctionType: + action = "allowed by" # some allowed-value check function + msg = "%s not %s %s for %s" % (value, action, allowed, name) + ValueError.__init__(self, msg) + self.name = name + self.value = value + self.allowed = allowed + +def Property(funcs): + """ + End a chain of property decorators, returning a property. + """ + args = {} + args["fget"] = funcs.get("fget", None) + args["fset"] = funcs.get("fset", None) + args["fdel"] = funcs.get("fdel", None) + args["doc"] = funcs.get("doc", None) + + #print "Creating a property with" + #for key, val in args.items(): print key, value + return property(**args) + +def doc_property(doc=None): + """ + Add a docstring to a chain of property decorators. + """ + def decorator(funcs=None): + """ + Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...} + or a function fn() returning such a dict. + """ + if hasattr(funcs, "__call__"): + funcs = funcs() # convert from function-arg to dict + funcs["doc"] = doc + return funcs + return decorator + +def local_property(name, null=None, mutable_null=False): + """ + Define get/set access to per-parent-instance local storage. Uses + ._<name>_value to store the value for a particular owner instance. + If the ._<name>_value attribute does not exist, returns null. + + If mutable_null == True, we only release deepcopies of the null to + the outside world. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + if mutable_null == True: + ret_null = copy.deepcopy(null) + else: + ret_null = null + value = getattr(self, "_%s_value" % name, ret_null) + return value + def _fset(self, value): + setattr(self, "_%s_value" % name, value) + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + +def settings_property(name, null=None): + """ + Similar to local_property, except where local_property stores the + value in instance._<name>_value, settings_property stores the + value in instance.settings[name]. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + value = self.settings.get(name, null) + return value + def _fset(self, value): + self.settings[name] = value + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + + +# Allow comparison and caching with _original_ values for mutables, +# since +# +# >>> a = [] +# >>> b = a +# >>> b.append(1) +# >>> a +# [1] +# >>> a==b +# True +def _hash_mutable_value(value): + return repr(value) +def _init_mutable_property_cache(self): + if not hasattr(self, "_mutable_property_cache_hash"): + # first call to _fget for any mutable property + self._mutable_property_cache_hash = {} + self._mutable_property_cache_copy = {} +def _set_cached_mutable_property(self, cacher_name, property_name, value): + _init_mutable_property_cache(self) + self._mutable_property_cache_hash[(cacher_name, property_name)] = \ + _hash_mutable_value(value) + self._mutable_property_cache_copy[(cacher_name, property_name)] = \ + copy.deepcopy(value) +def _get_cached_mutable_property(self, cacher_name, property_name, default=None): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_copy: + return default + return self._mutable_property_cache_copy[(cacher_name, property_name)] +def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_hash: + _set_cached_mutable_property(self, cacher_name, property_name, default) + old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)] + return cmp(_hash_mutable_value(value), old_hash) + + +def defaulting_property(default=None, null=None, + mutable_default=False): + """ + Define a default value for get access to a property. + If the stored value is null, then default is returned. + + If mutable_default == True, we only release deepcopies of the + default to the outside world. + + null should never escape to the outside world, so don't worry + about it being a mutable. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value == null: + if mutable_default == True: + return copy.deepcopy(default) + else: + return default + return value + def _fset(self, value): + if value == default: + value = null + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def fn_checked_property(value_allowed_fn): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + return value + def _fset(self, value): + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def checked_property(allowed=[]): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value not in allowed: + raise ValueCheckError(name, value, allowed) + return value + def _fset(self, value): + if value not in allowed: + raise ValueCheckError(name, value, allowed) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def cached_property(generator, initVal=None, mutable=False): + """ + Allow caching of values generated by generator(instance), where + instance is the instance to which this property belongs. Uses + ._<name>_cache to store a cache flag for a particular owner + instance. + + When the cache flag is True or missing and the stored value is + initVal, the first fget call triggers the generator function, + whose output is stored in _<name>_cached_value. That and + subsequent calls to fget will return this cached value. + + If the input value is no longer initVal (e.g. a value has been + loaded from disk or set with fset), that value overrides any + cached value, and this property has no effect. + + When the cache flag is False and the stored value is initVal, the + generator is not cached, but is called on every fget. + + The cache flag is missing on initialization. Particular instances + may override by setting their own flag. + + In the case that mutable == True, all caching is disabled and the + generator is called whenever the cached value would otherwise be + used. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "<unknown>") + def _fget(self): + cache = getattr(self, "_%s_cache" % name, True) + value = fget(self) + if value == initVal: + if cache == True and mutable == False: + if hasattr(self, "_%s_cached_value" % name): + value = getattr(self, "_%s_cached_value" % name) + else: + value = generator(self) + setattr(self, "_%s_cached_value" % name, value) + else: + value = generator(self) + return value + funcs["fget"] = _fget + return funcs + return decorator + +def primed_property(primer, initVal=None, unprimeableVal=None): + """ + Just like a cached_property, except that instead of returning a + new value and running fset to cache it, the primer attempts some + background manipulation (e.g. loads data into instance.settings) + such that a _second_ pass through fget succeeds. If the second + pass doesn't succeed (e.g. no readable storage), we give up and + return unprimeableVal. + + The 'cache' flag becomes a 'prime' flag, with priming taking place + whenever ._<name>_prime is True, or is False or missing and + value == initVal. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "<unknown>") + def _fget(self): + prime = getattr(self, "_%s_prime" % name, False) + if prime == False: + value = fget(self) + if prime == True or (prime == False and value == initVal): + primer(self) + value = fget(self) + if prime == False and value == initVal: + return unprimeableVal + return value + funcs["fget"] = _fget + return funcs + return decorator + +def change_hook_property(hook, mutable=False, default=None): + """Call the function `hook` whenever a value different from the + current value is set. + + This is useful for saving changes to disk, etc. This function is + called *after* the new value has been stored, allowing you to + change the stored value if you want. + + In the case of mutables, things are slightly trickier. Because + the property-owning class has no way of knowing when the value + changes. We work around this by caching a private deepcopy of the + mutable value, and checking for changes whenever the property is + set (obviously) or retrieved (to check for external changes). So + long as you're conscientious about accessing the property after + making external modifications, mutability won't be a problem:: + + t.x.append(5) # external modification + t.x # dummy access notices change and triggers hook + + See :class:`testChangeHookMutableProperty` for an example of the + expected behavior. + + Parameters + ---------- + hook : fn + `hook(instance, old_value, new_value)`, where `instance` is a + reference to the class instance to which this property belongs. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self, new_value=None, from_fset=False): # only used if mutable == True + if from_fset == True: + value = new_value # compare new value with cached + else: + value = fget(self) # compare current value with cached + if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0: + # there has been a change, cache new value + old_value = _get_cached_mutable_property(self, "change hook property", name, default) + _set_cached_mutable_property(self, "change hook property", name, value) + if from_fset == True: # return previously cached value + value = old_value + else: # the value changed while we weren't looking + hook(self, old_value, value) + return value + def _fset(self, value): + if mutable == True: # get cached previous value + old_value = _fget(self, new_value=value, from_fset=True) + else: + old_value = fget(self) + fset(self, value) + if value != old_value: + hook(self, old_value, value) + if mutable == True: + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +if libbe.TESTING == True: + class DecoratorTests(unittest.TestCase): + def testLocalDoc(self): + class Test(object): + @Property + @doc_property("A fancy property") + def x(): + return {} + self.failUnless(Test.x.__doc__ == "A fancy property", + Test.x.__doc__) + def testLocalProperty(self): + class Test(object): + @Property + @local_property(name="LOCAL") + def x(): + return {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("_LOCAL_value" in dir(t), dir(t)) + self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value) + def testSettingsProperty(self): + class Test(object): + @Property + @settings_property(name="attr") + def x(): + return {} + def __init__(self): + self.settings = {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("attr" in t.settings, t.settings) + self.failUnless(t.settings["attr"] == 'z', t.settings["attr"]) + def testDefaultingLocalProperty(self): + class Test(object): + @Property + @defaulting_property(default='y', null='x') + @local_property(name="DEFAULT", null=5) + def x(): return {} + t = Test() + self.failUnless(t.x == 5, str(t.x)) + t.x = 'x' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'y' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'z' + self.failUnless(t.x == 'z', str(t.x)) + t.x = 5 + self.failUnless(t.x == 5, str(t.x)) + def testCheckedLocalProperty(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testTwoCheckedLocalProperties(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="X") + def x(): return {} + + @Property + @checked_property(allowed=['a', 'b', 'c']) + @local_property(name="A") + def a(): return {} + def __init__(self): + self._A_value = 'a' + self._X_value = 'x' + t = Test() + try: + t.x = 'a' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.x = 'x' + t.x = 'y' + t.x = 'z' + try: + t.a = 'x' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.a = 'a' + t.a = 'b' + t.a = 'c' + def testFnCheckedLocalProperty(self): + class Test(object): + @Property + @fn_checked_property(lambda v : v in ['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testCachedLocalProperty(self): + class Gen(object): + def __init__(self): + self.i = 0 + def __call__(self, owner): + self.i += 1 + return self.i + class Test(object): + @Property + @cached_property(generator=Gen(), initVal=None) + @local_property(name="CACHED") + def x(): return {} + t = Test() + self.failIf("_CACHED_cache" in dir(t), + getattr(t, "_CACHED_cache", None)) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + t.x = 8 + self.failUnless(t.x == 8, t.x) + self.failUnless(t.x == 8, t.x) + t._CACHED_cache = False # Caching is off, but the stored value + val = t.x # is 8, not the initVal (None), so we + self.failUnless(val == 8, val) # get 8. + t._CACHED_value = None # Now we've set the stored value to None + val = t.x # so future calls to fget (like this) + self.failUnless(val == 2, val) # will call the generator every time... + val = t.x + self.failUnless(val == 3, val) + val = t.x + self.failUnless(val == 4, val) + t._CACHED_cache = True # We turn caching back on, and get + self.failUnless(t.x == 1, str(t.x)) # the original cached value. + del t._CACHED_cached_value # Removing that value forces a + self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call + self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which + self.failUnless(t.x == 5, str(t.x)) # we get the new cached value. + def testPrimedLocalProperty(self): + class Test(object): + def prime(self): + self.settings["PRIMED"] = self.primeVal + @Property + @primed_property(primer=prime, initVal=None, unprimeableVal=2) + @settings_property(name="PRIMED") + def x(): return {} + def __init__(self): + self.settings={} + self.primeVal = "initialized" + t = Test() + self.failIf("_PRIMED_prime" in dir(t), + getattr(t, "_PRIMED_prime", None)) + self.failUnless(t.x == "initialized", t.x) + t.x = 1 + self.failUnless(t.x == 1, t.x) + t.x = None + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = True + t.x = 3 + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = False + t.x = 3 + self.failUnless(t.x == 3, t.x) + # test unprimableVal + t.x = None + t.primeVal = None + self.failUnless(t.x == 2, t.x) + def testChangeHookLocalProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + + @Property + @change_hook_property(_hook) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 2 + self.failUnless(t.old == 1, t.old) + self.failUnless(t.new == 2, t.new) + def testChangeHookMutableProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + self.hook_calls += 1 + + @Property + @change_hook_property(_hook, mutable=True) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.hook_calls = 0 + t.x = [] + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 1, t.hook_calls) + a = t.x + a.append(5) + t.x = a + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 2, t.hook_calls) + t.x = [] + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # now append without reassigning. this doesn't trigger the + # change, since we don't ever set t.x, only get it and mess + # with it. It does, however, update our t.new, since t.new = + # t.x and is not a static copy. + t.x.append(5) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # however, the next t.x get _will_ notice the change... + a = t.x + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + t.x.append(6) # this append(6) is not noticed yet + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5,6], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + # this append(7) is not noticed, but the t.x get causes the + # append(6) to be noticed + t.x.append(7) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 5, t.hook_calls) + a = t.x # now the append(7) is noticed + self.failUnless(t.old == [5,6], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 6, t.hook_calls) + + suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests) diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py new file mode 100644 index 0000000..6e4da55 --- /dev/null +++ b/libbe/storage/util/settings_object.py @@ -0,0 +1,617 @@ +# Bugs Everywhere - a distributed bugtracker +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Provides :class:`SavedSettingsObject` implementing settings-dict +based property storage. + +See Also +-------- +:mod:`libbe.storage.util.properties` : underlying property definitions +""" + +import libbe +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, fn_checked_property, \ + cached_property, primed_property, change_hook_property, \ + settings_property +if libbe.TESTING == True: + import doctest + import unittest + +class _Token (object): + """`Control' value class for properties. + + We want values that only mean something to the `settings_object` + module. + """ + pass + +class UNPRIMED (_Token): + "Property has not been primed (loaded)." + pass + +class EMPTY (_Token): + """Property has been primed but has no user-set value, so use + default/generator value. + """ + pass + + +def prop_save_settings(self, old, new): + """The default action undertaken when a property changes. + """ + if self.storage != None and self.storage.is_writeable(): + self.save_settings() + +def prop_load_settings(self): + """The default action undertaken when an UNPRIMED property is + accessed. + + Attempt to run `.load_settings()`, which calls + `._setup_saved_settings()` internally. If `.storage` is + inaccessible, don't do anything. + """ + if self.storage != None and self.storage.is_readable(): + self.load_settings() + +# Some name-mangling routines for pretty printing setting names +def setting_name_to_attr_name(self, name): + """Convert keys to the `.settings` dict into their associated + SavedSettingsObject attribute names. + + Examples + -------- + + >>> print setting_name_to_attr_name(None,"User-id") + user_id + + See Also + -------- + attr_name_to_setting_name : inverse + """ + return name.lower().replace('-', '_') + +def attr_name_to_setting_name(self, name): + """Convert SavedSettingsObject attribute names to `.settings` dict + keys. + + Examples: + + >>> print attr_name_to_setting_name(None, "user_id") + User-id + + See Also + -------- + setting_name_to_attr_name : inverse + """ + return name.capitalize().replace('_', '-') + + +def versioned_property(name, doc, + default=None, generator=None, + change_hook=prop_save_settings, + mutable=False, + primer=prop_load_settings, + allowed=None, check_fn=None, + settings_properties=[], + required_saved_properties=[], + require_save=False): + """Combine the common decorators in a single function. + + Use zero or one (but not both) of default or generator, since a + working default will keep the generator from functioning. Use the + default if you know what you want the default value to be at + 'coding time'. Use the generator if you can write a function to + determine a valid default at run time. If both default and + generator are None, then the property will be a defaulting + property which defaults to None. + + allowed and check_fn have a similar relationship, although you can + use both of these if you want. allowed compares the proposed + value against a list determined at 'coding time' and check_fn + allows more flexible comparisons to take place at run time. + + Set require_save to True if you want to save the default/generated + value for a property, to protect against future changes. E.g., we + currently expect all comments to be 'text/plain' but in the future + we may want to default to 'text/html'. If we don't want the old + comments to be interpreted as 'text/html', we would require that + the content type be saved. + + change_hook, primer, settings_properties, and + required_saved_properties are only options to get their defaults + into our local scope. Don't mess with them. + + Set mutable=True if: + + * default is a mutable + * your generator function may return mutables + * you set change_hook and might have mutable property values + + See the docstrings in `libbe.properties` for details on how each of + these cases are handled. + + The value stored in `.settings[name]` will be + + * no value (or UNPRIMED) if the property has been neither set, + nor loaded as blank. + * EMPTY if the value has been loaded as blank. + * some value if the property has been either loaded or set. + """ + settings_properties.append(name) + if require_save == True: + required_saved_properties.append(name) + def decorator(funcs): + fulldoc = doc + if default != None or generator == None: + defaulting = defaulting_property(default=default, null=EMPTY, + mutable_default=mutable) + fulldoc += "\n\nThis property defaults to %s." % default + if generator != None: + cached = cached_property(generator=generator, initVal=EMPTY, + mutable=mutable) + fulldoc += "\n\nThis property is generated with %s." % generator + if check_fn != None: + fn_checked = fn_checked_property(value_allowed_fn=check_fn) + fulldoc += "\n\nThis property is checked with %s." % check_fn + if allowed != None: + checked = checked_property(allowed=allowed) + fulldoc += "\n\nThe allowed values for this property are: %s." \ + % (', '.join(allowed)) + hooked = change_hook_property(hook=change_hook, mutable=mutable, + default=EMPTY) + primed = primed_property(primer=primer, initVal=UNPRIMED, + unprimeableVal=EMPTY) + settings = settings_property(name=name, null=UNPRIMED) + docp = doc_property(doc=fulldoc) + deco = hooked(primed(settings(docp(funcs)))) + if default != None or generator == None: + deco = defaulting(deco) + if generator != None: + deco = cached(deco) + if check_fn != None: + deco = fn_checked(deco) + if allowed != None: + deco = checked(deco) + return Property(deco) + return decorator + +class SavedSettingsObject(object): + """Setup a framework for lazy saving and loading of `.settings` + properties. + + This is useful for BE objects with saved properties + (e.g. :class:`~libbe.bugdir.BugDir`, :class:`~libbe.bug.Bug`, + :class:`~libbe.comment.Comment`). For example usage, consider the + unittests at the end of the module. + + See Also + -------- + versioned_property, prop_save_settings, prop_load_settings + setting_name_to_attr_name, attr_name_to_setting_name + """ + # Keep a list of properties that may be stored in the .settings dict. + #settings_properties = [] + + # A list of properties that we save to disk, even if they were + # never set (in which case we save the default value). This + # protects against future changes in default values. + #required_saved_properties = [] + + _setting_name_to_attr_name = setting_name_to_attr_name + _attr_name_to_setting_name = attr_name_to_setting_name + + def __init__(self): + self.storage = None + self.settings = {} + + def load_settings(self): + """Load the settings from disk.""" + # Override. Must call ._setup_saved_settings({}) with + # from-storage settings. + self._setup_saved_settings({}) + + def _setup_saved_settings(self, settings=None): + """ + Sets up a settings dict loaded from storage. Fills in + all missing settings entries with EMPTY. + """ + if settings == None: + settings = {} + for property in self.settings_properties: + if property not in self.settings \ + or self.settings[property] == UNPRIMED: + if property in settings: + self.settings[property] = settings[property] + else: + self.settings[property] = EMPTY + + def save_settings(self): + """Save the settings to disk.""" + # Override. Should save the dict output of ._get_saved_settings() + settings = self._get_saved_settings() + pass # write settings to disk.... + + def _get_saved_settings(self): + """ + In order to avoid overwriting unread on-disk data, make sure + we've loaded anything sitting on the disk. In the current + implementation, all the settings are stored in a single file, + so we need to load _all_ the saved settings. Another approach + would be per-setting saves, in which case you could skip this + step, since any setting changes would have forced that setting + load already. + """ + settings = {} + for k in self.settings_properties: # force full load + if not k in self.settings or self.settings[k] == UNPRIMED: + value = getattr( + self, self._setting_name_to_attr_name(k)) + for k in self.settings_properties: + if k in self.settings and self.settings[k] != EMPTY: + settings[k] = self.settings[k] + elif k in self.required_saved_properties: + settings[k] = getattr( + self, self._setting_name_to_attr_name(k)) + return settings + + def clear_cached_setting(self, setting=None): + "If setting=None, clear *all* cached settings" + if setting != None: + if hasattr(self, "_%s_cached_value" % setting): + delattr(self, "_%s_cached_value" % setting) + else: + for setting in settings_properties: + self.clear_cached_setting(setting) + + +if libbe.TESTING == True: + import copy + + class TestStorage (list): + def __init__(self): + list.__init__(self) + self.readable = True + self.writeable = True + def is_readable(self): + return self.readable + def is_writeable(self): + return self.writeable + + class TestObject (SavedSettingsObject): + def load_settings(self): + self.load_count += 1 + if len(self.storage) == 0: + settings = {} + else: + settings = copy.deepcopy(self.storage[-1]) + self._setup_saved_settings(settings) + def save_settings(self): + settings = self._get_saved_settings() + self.storage.append(copy.deepcopy(settings)) + def __init__(self): + SavedSettingsObject.__init__(self) + self.load_count = 0 + self.storage = TestStorage() + + class SavedSettingsObjectTests(unittest.TestCase): + def testSimplePropertyDoc(self): + """Testing a minimal versioned property docstring""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + expected = "A test property\n\nThis property defaults to None." + self.failUnless(Test.content_type.__doc__ == expected, + Test.content_type.__doc__) + def testSimplePropertyFromMemory(self): + """Testing a minimal versioned property from memory""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + t = Test() + self.failUnless(len(t.settings) == 0, len(t.settings)) + # accessing t.content_type triggers the priming, but + # t.storage.is_readable() == False, so nothing happens. + t.storage.readable = False + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(t.settings == {}, t.settings) + self.failUnless(len(t.settings) == 0, len(t.settings)) + self.failUnless(t.content_type == None, t.content_type) + # accessing t.content_type triggers the priming again, and + # now that t.storage.is_readable() == True, this fills out + # t.settings with EMPTY data. At this point there should + # be one load and no saves. + t.storage.readable = True + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + # an explicit call to load settings forces a reload, + # but nothing else changes. + t.load_settings() + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + # now we set a value + t.content_type = 5 + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':5}], t.storage) + # getting its value changes nothing + self.failUnless(t.content_type == 5, t.content_type) + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':5}], t.storage) + # now we set another value + t.content_type = "text/plain" + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings["Content-type"] == "text/plain", + t.settings["Content-type"]) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 2, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':5}, + {'Content-type':'text/plain'}], + t.storage) + # t._get_saved_settings() returns a dict of required or + # non-default values. + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + # now we clear to the post-primed value + t.content_type = EMPTY + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings() == {}, + t._get_saved_settings()) + self.failUnless(t.storage == [{'Content-type':5}, + {'Content-type':'text/plain'}, + {}], + t.storage) + def testSimplePropertyFromStorage(self): + """Testing a minimal versioned property from storage""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="prop-a", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_a(): return {} + @versioned_property( + name="prop-b", + doc="Another test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_b(): return {} + t = Test() + t.storage.append({'prop-a':'saved'}) + # setting prop-b forces a load (to check for changes), + # which also pulls in prop-a. + t.prop_b = 'new-b' + settings = {'prop-b':'new-b', 'prop-a':'saved'} + self.failUnless(t.settings == settings, t.settings) + self.failUnless(t._get_saved_settings() == settings, + t._get_saved_settings()) + # test that _get_saved_settings() works even when settings + # were _not_ loaded beforehand + t = Test() + t.storage.append({'prop-a':'saved'}) + settings ={'prop-a':'saved'} + self.failUnless(t.settings == {}, t.settings) + self.failUnless(t._get_saved_settings() == settings, + t._get_saved_settings()) + def testSimplePropertySetStorageSave(self): + """Set a property, then attach storage and save""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="prop-a", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_a(): return {} + @versioned_property( + name="prop-b", + doc="Another test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_b(): return {} + t = Test() + storage = t.storage + t.storage = None + t.prop_a = 'text/html' + t.storage = storage + t.save_settings() + self.failUnless(t.prop_a == 'text/html', t.prop_a) + self.failUnless(t.settings == {'prop-a':'text/html', + 'prop-b':EMPTY}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'prop-a':'text/html'}], + t.storage) + def testDefaultingProperty(self): + """Testing a defaulting versioned property""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + t = Test() + self.failUnless(t.settings == {}, t.settings) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings == {"Content-type":EMPTY}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t._get_saved_settings() == {}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t.content_type == "text/html", + t.content_type) + self.failUnless(t.settings == {"Content-type":"text/html"}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':'text/html'}], + t.storage) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + def testRequiredDefaultingProperty(self): + """Testing a required defaulting versioned property""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + require_save=True) + def content_type(): return {} + t = Test() + self.failUnless(t.settings == {}, t.settings) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings == {"Content-type":EMPTY}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t.content_type == "text/html", + t.content_type) + self.failUnless(t.settings == {"Content-type":"text/html"}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':'text/html'}], + t.storage) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + def testClassVersionedPropertyDefinition(self): + """Testing a class-specific _versioned property decorator""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + def _versioned_property( + settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"] = \ + required_saved_properties + return versioned_property(**kwargs) + @_versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + require_save=True) + def content_type(): return {} + t = Test() + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + t.content_type = "text/html" + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':'text/html'}], + t.storage) + def testMutableChangeHookedProperty(self): + """Testing a mutable change-hooked property""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="List-type", + doc="A test property", + mutable=True, + change_hook=prop_save_settings, + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def list_type(): return {} + t = Test() + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t.list_type == None, t.list_type) + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t.settings["List-type"]==EMPTY, + t.settings["List-type"]) + t.list_type = [] + self.failUnless(t.settings["List-type"] == [], + t.settings["List-type"]) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'List-type':[]}], + t.storage) + t.list_type.append(5) # external modification not detected yet + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'List-type':[]}], + t.storage) + self.failUnless(t.settings["List-type"] == [5], + t.settings["List-type"]) + self.failUnless(t.list_type == [5], t.list_type)# get triggers save + self.failUnless(len(t.storage) == 2, len(t.storage)) + self.failUnless(t.storage == [{'List-type':[]}, + {'List-type':[5]}], + t.storage) + + unitsuite = unittest.TestLoader().loadTestsFromTestCase( \ + SavedSettingsObjectTests) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py new file mode 100644 index 0000000..f3c4912 --- /dev/null +++ b/libbe/storage/util/upgrade.py @@ -0,0 +1,331 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Handle conversion between the various BE storage formats. +""" + +import codecs +import os, os.path +import sys + +import libbe +import libbe.bug +import libbe.storage.util.mapfile as mapfile +from libbe.storage import STORAGE_VERSIONS, STORAGE_VERSION +#import libbe.storage.vcs # delay import to avoid cyclic dependency +import libbe.ui.util.editor +import libbe.util +import libbe.util.encoding as encoding +import libbe.util.id + + +class Upgrader (object): + "Class for converting between different on-disk BE storage formats." + initial_version = None + final_version = None + def __init__(self, repo): + import libbe.storage.vcs + + self.repo = repo + vcs_name = self._get_vcs_name() + if vcs_name == None: + vcs_name = 'None' + self.vcs = libbe.storage.vcs.vcs_by_name(vcs_name) + self.vcs.repo = self.repo + self.vcs.root() + + def get_path(self, *args): + """ + Return the absolute path using args relative to .be. + """ + dir = os.path.join(self.repo, '.be') + if len(args) == 0: + return dir + return os.path.join(dir, *args) + + def _get_vcs_name(self): + return None + + def check_initial_version(self): + path = self.get_path('version') + version = encoding.get_file_contents(path, decode=True).rstrip('\n') + assert version == self.initial_version, '%s: %s' % (path, version) + + def set_version(self): + path = self.get_path('version') + encoding.set_file_contents(path, self.final_version+'\n') + self.vcs._vcs_update(path) + + def upgrade(self): + print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \ + % (self.initial_version, self.final_version) + self.check_initial_version() + self.set_version() + self._upgrade() + + def _upgrade(self): + raise NotImplementedError + + +class Upgrade_1_0_to_1_1 (Upgrader): + initial_version = "Bugs Everywhere Tree 1 0" + final_version = "Bugs Everywhere Directory v1.1" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = encoding.get_file_contents(path) + for line in settings.splitlines(False): + fields = line.split('=') + if len(fields) == 2 and fields[0] == 'rcs_name': + return fields[1] + return None + + def _upgrade_mapfile(self, path): + contents = encoding.get_file_contents(path, decode=True) + old_format = False + for line in contents.splitlines(): + if len(line.split('=')) == 2: + old_format = True + break + if old_format == True: + # translate to YAML. + newlines = [] + for line in contents.splitlines(): + line = line.rstrip('\n') + if len(line) == 0: + continue + fields = line.split("=") + if len(fields) == 2: + key,value = fields + newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) + else: + newlines.append(line) + contents = '\n'.join(newlines) + # load the YAML and save + map = mapfile.parse(contents) + contents = mapfile.generate(map) + encoding.set_file_contents(path, contents) + self.vcs._vcs_update(path) + + def _upgrade(self): + """ + Comment value field "From" -> "Author". + Homegrown mapfile -> YAML. + """ + path = self.get_path('settings') + self._upgrade_mapfile(path) + for bug_uuid in os.listdir(self.get_path('bugs')): + path = self.get_path('bugs', bug_uuid, 'values') + self._upgrade_mapfile(path) + c_path = ['bugs', bug_uuid, 'comments'] + if not os.path.exists(self.get_path(*c_path)): + continue # no comments for this bug + for comment_uuid in os.listdir(self.get_path(*c_path)): + path_list = c_path + [comment_uuid, 'values'] + path = self.get_path(*path_list) + self._upgrade_mapfile(path) + settings = mapfile.parse( + encoding.get_file_contents(path)) + if 'From' in settings: + settings['Author'] = settings.pop('From') + encoding.set_file_contents( + path, mapfile.generate(settings)) + self.vcs._vcs_update(path) + + +class Upgrade_1_1_to_1_2 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.1" + final_version = "Bugs Everywhere Directory v1.2" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'rcs_name' in settings: + return settings['rcs_name'] + return None + + def _upgrade(self): + """ + BugDir settings field "rcs_name" -> "vcs_name". + """ + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'rcs_name' in settings: + settings['vcs_name'] = settings.pop('rcs_name') + encoding.set_file_contents(path, mapfile.generate(settings)) + self.vcs._vcs_update(path) + +class Upgrade_1_2_to_1_3 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.2" + final_version = "Bugs Everywhere Directory v1.3" + def __init__(self, *args, **kwargs): + Upgrader.__init__(self, *args, **kwargs) + self._targets = {} # key: target text,value: new target bug + + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'vcs_name' in settings: + return settings['vcs_name'] + return None + + def _save_bug_settings(self, bug): + # The target bugs don't have comments + path = self.get_path('bugs', bug.uuid, 'values') + if not os.path.exists(path): + self.vcs._add_path(path, directory=False) + path = self.get_path('bugs', bug.uuid, 'values') + mf = mapfile.generate(bug._get_saved_settings()) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _target_bug(self, target_text): + if target_text not in self._targets: + bug = libbe.bug.Bug(summary=target_text) + bug.severity = 'target' + self._targets[target_text] = bug + return self._targets[target_text] + + def _upgrade_bugdir_mapfile(self): + path = self.get_path('settings') + mf = encoding.get_file_contents(path) + if mf == libbe.util.InvalidObject: + return # settings file does not exist + settings = mapfile.parse(mf) + if 'target' in settings: + settings['target'] = self._target_bug(settings['target']).uuid + mf = mapfile.generate(settings) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _upgrade_bug_mapfile(self, bug_uuid): + import libbe.command.depend as dep + path = self.get_path('bugs', bug_uuid, 'values') + mf = encoding.get_file_contents(path) + if mf == libbe.util.InvalidObject: + return # settings file does not exist + settings = mapfile.parse(mf) + if 'target' in settings: + target_bug = self._target_bug(settings['target']) + + blocked_by_string = '%s%s' % (dep.BLOCKED_BY_TAG, bug_uuid) + dep._add_remove_extra_string(target_bug, blocked_by_string, add=True) + blocks_string = dep._generate_blocks_string(target_bug) + estrs = settings.get('extra_strings', []) + estrs.append(blocks_string) + settings['extra_strings'] = sorted(estrs) + + settings.pop('target') + mf = mapfile.generate(settings) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _upgrade(self): + """ + Bug value field "target" -> target bugs. + Bugdir value field "target" -> pointer to current target bug. + """ + for bug_uuid in os.listdir(self.get_path('bugs')): + self._upgrade_bug_mapfile(bug_uuid) + self._upgrade_bugdir_mapfile() + for bug in self._targets.values(): + self._save_bug_settings(bug) + +class Upgrade_1_3_to_1_4 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.3" + final_version = "Bugs Everywhere Directory v1.4" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'vcs_name' in settings: + return settings['vcs_name'] + return None + + def _upgrade(self): + """ + add new directory "./be/BUGDIR-UUID" + "./be/bugs" -> "./be/BUGDIR-UUID/bugs" + "./be/settings" -> "./be/BUGDIR-UUID/settings" + """ + self.repo = os.path.abspath(self.repo) + basenames = [p for p in os.listdir(self.get_path())] + if not 'bugs' in basenames and not 'settings' in basenames \ + and len([p for p in basenames if len(p)==36]) == 1: + return # the user has upgraded the directory. + basenames = [p for p in basenames if p in ['bugs','settings']] + uuid = libbe.util.id.uuid_gen() + add = [self.get_path(uuid)] + move = [(self.get_path(p), self.get_path(uuid, p)) for p in basenames] + msg = ['Upgrading BE directory version v1.3 to v1.4', + '', + "Because BE's VCS drivers don't support 'move',", + 'please make the following changes with your VCS', + 'and re-run BE. Note that you can choose a different', + 'bugdir UUID to preserve uniformity across branches', + 'of a distributed repository.' + '', + 'add', + ' ' + '\n '.join(add), + 'move', + ' ' + '\n '.join(['%s %s' % (a,b) for a,b in move]), + ] + self.vcs._cached_path_id.destroy() + raise Exception('Need user assistance\n%s' % '\n'.join(msg)) + + +upgraders = [Upgrade_1_0_to_1_1, + Upgrade_1_1_to_1_2, + Upgrade_1_2_to_1_3, + Upgrade_1_3_to_1_4] +upgrade_classes = {} +for upgrader in upgraders: + upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader + +def upgrade(path, current_version, + target_version=STORAGE_VERSION): + """ + Call the appropriate upgrade function to convert current_version + to target_version. If a direct conversion function does not exist, + use consecutive conversion functions. + """ + if current_version not in STORAGE_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % current_version + if target_version not in STORAGE_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % current_version + + if (current_version, target_version) in upgrade_classes: + # direct conversion + upgrade_class = upgrade_classes[(current_version, target_version)] + u = upgrade_class(path) + u.upgrade() + else: + # consecutive single-step conversion + i = STORAGE_VERSIONS.index(current_version) + while True: + version_a = STORAGE_VERSIONS[i] + version_b = STORAGE_VERSIONS[i+1] + try: + upgrade_class = upgrade_classes[(version_a, version_b)] + except KeyError: + raise NotImplementedError, \ + "Cannot convert version '%s' to '%s' yet." \ + % (version_a, version_b) + u = upgrade_class(path) + u.upgrade() + if version_b == target_version: + break + i += 1 diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py new file mode 100644 index 0000000..552d43e --- /dev/null +++ b/libbe/storage/vcs/__init__.py @@ -0,0 +1,41 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Define the Version Controlled System (VCS)-based +:class:`~libbe.storage.base.Storage` and +:class:`~libbe.storage.base.VersionedStorage` implementations. + +There is a base class (:class:`~libbe.storage.vcs.VCS`) translating +Storage language to VCS language, and a number of `VCS` implementations: + +* :class:`~libbe.storage.vcs.arch.Arch` +* :class:`~libbe.storage.vcs.bzr.Bzr` +* :class:`~libbe.storage.vcs.darcs.Darcs` +* :class:`~libbe.storage.vcs.git.Git` +* :class:`~libbe.storage.vcs.hg.Hg` + +The base `VCS` class also serves as a filesystem Storage backend (not +versioning) in the event that a user has no VCS installed. +""" + +import base + +set_preferred_vcs = base.set_preferred_vcs +vcs_by_name = base.vcs_by_name +detect_vcs = base.detect_vcs +installed_vcs = base.installed_vcs + +__all__ = [set_preferred_vcs, vcs_by_name, detect_vcs, installed_vcs] diff --git a/libbe/storage/vcs/arch.py b/libbe/storage/vcs/arch.py new file mode 100644 index 0000000..3a50414 --- /dev/null +++ b/libbe/storage/vcs/arch.py @@ -0,0 +1,441 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Ben Finney <benf@cybersource.com.au> +# Gianluca Montecchi <gian@grys.it> +# James Rowe <jnrowe@ukfsn.org> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""GNU Arch_ (tla) backend. + +.. _Arch: http://www.gnu.org/software/gnu-arch/ +""" + +import codecs +import os +import os.path +import re +import shutil +import sys +import time # work around http://mercurial.selenic.com/bts/issue618 + +import libbe +import libbe.ui.util.user +import libbe.storage.util.config +from libbe.util.id import uuid_gen +from libbe.util.subproc import CommandError +import base + +if libbe.TESTING == True: + import unittest + import doctest + + +class CantAddFile(Exception): + def __init__(self, file): + self.file = file + Exception.__init__(self, "Can't automatically add file %s" % file) + +DEFAULT_CLIENT = 'tla' + +client = libbe.storage.util.config.get_val( + 'arch_client', default=DEFAULT_CLIENT) + +def new(): + return Arch() + +class Arch(base.VCS): + """:class:`base.VCS` implementation for GNU Arch. + """ + name = 'arch' + client = client + _archive_name = None + _archive_dir = None + _tmp_archive = False + _project_name = None + _tmp_project = False + _arch_paramdir = os.path.expanduser('~/.arch-params') + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + self.interspersed_vcs_files = True + self.paranoid = False + self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_version(self): + status,output,error = self._u_invoke_client('--version') + version = '\n'.join(output.splitlines()[:2]) + return version + + def _vcs_detect(self, path): + """Detect whether a directory is revision-controlled using Arch""" + if self._u_search_parent_directories(path, '{arch}') != None : + libbe.storage.util.config.set_val('arch_client', client) + return True + return False + + def _vcs_init(self, path): + self._create_archive(path) + self._create_project(path) + self._add_project_code(path) + + def _create_archive(self, path): + """Create a temporary Arch archive in the directory PATH. This + archive will be removed by:: + + destroy->_vcs_destroy->_remove_archive + """ + # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive + assert self._archive_name == None + id = self.get_user_id() + name, email = libbe.ui.util.user.parse_user_id(id) + if email == None: + email = '%s@example.com' % name + trailer = '%s-%s' % ('bugs-everywhere-auto', uuid_gen()[0:8]) + self._archive_name = '%s--%s' % (email, trailer) + self._archive_dir = '/tmp/%s' % trailer + self._tmp_archive = True + self._u_invoke_client('make-archive', self._archive_name, + self._archive_dir, cwd=path) + + def _invoke_client(self, *args, **kwargs): + """Invoke the client on our archive. + """ + assert self._archive_name != None + command = args[0] + if len(args) > 1: + tailargs = args[1:] + else: + tailargs = [] + arglist = [command, '-A', self._archive_name] + arglist.extend(tailargs) + args = tuple(arglist) + return self._u_invoke_client(*args, **kwargs) + + def _remove_archive(self): + assert self._tmp_archive == True + assert self._archive_dir != None + assert self._archive_name != None + os.remove(os.path.join(self._arch_paramdir, + '=locations', self._archive_name)) + shutil.rmtree(self._archive_dir) + self._tmp_archive = False + self._archive_dir = False + self._archive_name = False + + def _create_project(self, path): + """ + Create a temporary Arch project in the directory PATH. This + project will be removed by + destroy->_vcs_destroy->_remove_project + """ + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project + category = 'bugs-everywhere' + branch = 'mainline' + version = '0.1' + self._project_name = '%s--%s--%s' % (category, branch, version) + self._invoke_client('archive-setup', self._project_name, + cwd=path) + self._tmp_project = True + + def _remove_project(self): + assert self._tmp_project == True + assert self._project_name != None + assert self._archive_dir != None + shutil.rmtree(os.path.join(self._archive_dir, self._project_name)) + self._tmp_project = False + self._project_name = False + + def _archive_project_name(self): + assert self._archive_name != None + assert self._project_name != None + return '%s/%s' % (self._archive_name, self._project_name) + + def _adjust_naming_conventions(self, path): + """Adjust `Arch naming conventions`_ so ``.be`` is considered source + code. + + By default, Arch restricts source code filenames to:: + + ^[_=a-zA-Z0-9].*$ + + Since our bug directory ``.be`` doesn't satisfy these conventions, + we need to adjust them. The conventions are specified in:: + + project-root/{arch}/=tagging-method + + .. _Arch naming conventions: + http://regexps.srparish.net/tutorial-tla/naming-conventions.html + """ + tagpath = os.path.join(path, '{arch}', '=tagging-method') + lines_out = [] + f = codecs.open(tagpath, 'r', self.encoding) + for line in f: + if line.startswith('source '): + lines_out.append('source ^[._=a-zA-X0-9].*$\n') + else: + lines_out.append(line) + f.close() + f = codecs.open(tagpath, 'w', self.encoding) + f.write(''.join(lines_out)) + f.close() + + def _add_project_code(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-source.html + # http://regexps.srparish.net/tutorial-tla/importing-first.html + self._invoke_client('init-tree', self._project_name, + cwd=path) + self._adjust_naming_conventions(path) + self._invoke_client('import', '--summary', 'Began versioning', + cwd=path) + + def _vcs_destroy(self): + if self._tmp_project == True: + self._remove_project() + if self._tmp_archive == True: + self._remove_archive() + vcs_dir = os.path.join(self.repo, '{arch}') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + self._archive_name = None + + def _vcs_root(self, path): + if not os.path.isdir(path): + dirname = os.path.dirname(path) + else: + dirname = path + status,output,error = self._u_invoke_client('tree-root', dirname) + root = output.rstrip('\n') + + self._get_archive_project_name(root) + + return root + + def _get_archive_name(self, root): + status,output,error = self._u_invoke_client('archives') + lines = output.split('\n') + # e.g. output: + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52 + # /tmp/BEtestXXXXXX/rootdir + # (+ repeats) + for archive,location in zip(lines[::2], lines[1::2]): + if os.path.realpath(location) == os.path.realpath(root): + self._archive_name = archive + assert self._archive_name != None + + def _get_archive_project_name(self, root): + # get project names + status,output,error = self._u_invoke_client('tree-version', cwd=root) + # e.g output + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1 + archive_name,project_name = output.rstrip('\n').split('/') + self._archive_name = archive_name + self._project_name = project_name + + def _vcs_get_user_id(self): + try: + status,output,error = self._u_invoke_client('my-id') + return output.rstrip('\n') + except Exception, e: + if 'no arch user id set' in e.args[0]: + return None + else: + raise + + def _vcs_add(self, path): + self._u_invoke_client('add-id', path) + realpath = os.path.realpath(self._u_abspath(path)) + pathAdded = realpath in self._list_added(self.repo) + if self.paranoid and not pathAdded: + self._force_source(path) + + def _list_added(self, root): + assert os.path.exists(root) + assert os.access(root, os.X_OK) + root = os.path.realpath(root) + status,output,error = self._u_invoke_client('inventory', '--source', + '--both', '--all', root) + inv_str = output.rstrip('\n') + return [os.path.join(root, p) for p in inv_str.split('\n')] + + def _add_dir_rule(self, rule, dirname, root): + inv_path = os.path.join(dirname, '.arch-inventory') + f = codecs.open(inv_path, 'a', self.encoding) + f.write(rule) + f.close() + if os.path.realpath(inv_path) not in self._list_added(root): + paranoid = self.paranoid + self.paranoid = False + self.add(inv_path) + self.paranoid = paranoid + + def _force_source(self, path): + rule = 'source %s\n' % self._u_rel_path(path) + self._add_dir_rule(rule, os.path.dirname(path), self.repo) + if os.path.realpath(path) not in self._list_added(self.repo): + raise CantAddFile(path) + + def _vcs_remove(self, path): + if self._vcs_is_versioned(path): + self._u_invoke_client('delete-id', path) + arch_ids = os.path.join(self.repo, path, '.arch-ids') + if os.path.exists(arch_ids): + shutil.rmtree(arch_ids) + + def _vcs_update(self, path): + self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_is_versioned(self, path): + if '.arch-ids' in path: + return False + return True + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + else: + relpath = self._file_find(path, revision, relpath=True) + return base.VCS._vcs_get_file_contents(self, relpath) + + def _file_find(self, path, revision, relpath=False): + try: + status,output,error = \ + self._invoke_client( + 'file-find', '--unescaped', path, revision) + path = output.rstrip('\n').splitlines()[-1] + except CommandError, e: + if e.status == 2 \ + and 'illegally formed changeset index' in e.stderr: + raise NotImplementedError( +"""Outstanding tla bug, see + https://bugs.launchpad.net/ubuntu/+source/tla/+bug/513472 +""") + raise + if relpath == True: + return path + return os.path.abspath(os.path.join(self.repo, path)) + + def _vcs_path(self, id, revision): + return self._u_find_id(id, revision) + + def _vcs_isdir(self, path, revision): + abspath = self._file_find(path, revision) + return os.path.isdir(abspath) + + def _vcs_listdir(self, path, revision): + abspath = self._file_find(path, revision) + return [p for p in os.listdir(abspath) if self._vcs_is_versioned(p)] + + def _vcs_commit(self, commitfile, allow_empty=False): + if allow_empty == False: + # arch applies empty commits without complaining, so check first + status,output,error = self._u_invoke_client('changes',expect=(0,1)) + if status == 0: + # work around http://mercurial.selenic.com/bts/issue618 + time.sleep(1) + for path in self.__updated: + os.utime(os.path.join(self.repo, path), None) + self.__updated = [] + status,output,error = self._u_invoke_client('changes',expect=(0,1)) + if status == 0: + # end work around + raise base.EmptyCommit() + summary,body = self._u_parse_commitfile(commitfile) + args = ['commit', '--summary', summary] + if body != None: + args.extend(['--log-message',body]) + status,output,error = self._u_invoke_client(*args) + revision = None + revline = re.compile('[*] committed (.*)') + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revpath = match.groups()[0] + assert not " " in revpath, revpath + assert revpath.startswith(self._archive_project_name()+'--') + revision = revpath[len(self._archive_project_name()+'--'):] + return revpath + + def _vcs_revision_id(self, index): + status,output,error = self._u_invoke_client('logs') + logs = output.splitlines() + first_log = logs.pop(0) + assert first_log == 'base-0', first_log + try: + if index > 0: + log = logs[index-1] + elif index < 0: + log = logs[index] + else: + return None + except IndexError: + return None + return '%s--%s' % (self._archive_project_name(), log) + + def _diff(self, revision): + status,output,error = self._u_invoke_client( + 'diff', '--summary', '--unescaped', revision, expect=(0,1)) + return output + + def _parse_diff(self, diff_text): + """ + Example diff text: + + * local directory is at ... + * build pristine tree for ... + * from import revision: ... + * patching for revision: ... + * comparing to ... + D .be/dir/bugs/.arch-ids/moved.id + D .be/dir/bugs/.arch-ids/removed.id + D .be/dir/bugs/moved + D .be/dir/bugs/removed + A .be/dir/bugs/.arch-ids/moved2.id + A .be/dir/bugs/.arch-ids/new.id + A .be/dir/bugs/moved2 + A .be/dir/bugs/new + A {arch}/bugs-everywhere/bugs-everywhere--mainline/... + M .be/dir/bugs/modified + """ + new = [] + modified = [] + removed = [] + lines = diff_text.splitlines() + for i,line in enumerate(lines): + if line.startswith('* ') or '/.arch-ids/' in line: + continue + change,file = line.split(' ',1) + if file.startswith('{arch}/'): + continue + if change == 'A': + new.append(file) + elif change == 'M': + modified.append(file) + elif change == 'D': + removed.append(file) + return (new,modified,removed) + + def _vcs_changed(self, revision): + return self._parse_diff(self._diff(revision)) + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Arch, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py new file mode 100644 index 0000000..d85c94d --- /dev/null +++ b/libbe/storage/vcs/base.py @@ -0,0 +1,1127 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Alexander Belchenko <bialix@ukr.net> +# Ben Finney <benf@cybersource.com.au> +# Chris Ball <cjb@laptop.org> +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Define the base :class:`VCS` (Version Control System) class, which +should be subclassed by other Version Control System backends. The +base class implements a "do not version" VCS. +""" + +import codecs +import os +import os.path +import re +import shutil +import sys +import tempfile +import types + +import libbe +import libbe.storage +import libbe.storage.base +import libbe.util.encoding +from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID +from libbe.util.utility import Dir, search_parent_directories +from libbe.util.subproc import CommandError, invoke +from libbe.util.plugin import import_by_name +import libbe.storage.util.upgrade as upgrade + +if libbe.TESTING == True: + import unittest + import doctest + + import libbe.ui.util.user + +VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg'] +"""List VCS modules in order of preference. + +Don't list this module, it is implicitly last. +""" + +def set_preferred_vcs(name): + """Manipulate :data:`VCS_ORDER` to place `name` first. + + This is primarily indended for testing purposes. + """ + global VCS_ORDER + assert name in VCS_ORDER, \ + 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER) + VCS_ORDER.remove(name) + VCS_ORDER.insert(0, name) + +def _get_matching_vcs(matchfn): + """Return the first module for which matchfn(VCS_instance) is True. + + Searches in :data:`VCS_ORDER`. + """ + for submodname in VCS_ORDER: + module = import_by_name('libbe.storage.vcs.%s' % submodname) + vcs = module.new() + if matchfn(vcs) == True: + return vcs + return VCS() + +def vcs_by_name(vcs_name): + """Return the module for the VCS with the given name. + + Searches in :data:`VCS_ORDER`. + """ + if vcs_name == VCS.name: + return new() + return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) + +def detect_vcs(dir): + """Return an VCS instance for the vcs being used in this directory. + + Searches in :data:`VCS_ORDER`. + """ + return _get_matching_vcs(lambda vcs: vcs._detect(dir)) + +def installed_vcs(): + """Return an instance of an installed VCS. + + Searches in :data:`VCS_ORDER`. + """ + return _get_matching_vcs(lambda vcs: vcs.installed()) + + +class VCSNotRooted (libbe.storage.base.ConnectionError): + def __init__(self, vcs): + msg = 'VCS not rooted' + libbe.storage.base.ConnectionError.__init__(self, msg) + self.vcs = vcs + +class VCSUnableToRoot (libbe.storage.base.ConnectionError): + def __init__(self, vcs): + msg = 'VCS unable to root' + libbe.storage.base.ConnectionError.__init__(self, msg) + self.vcs = vcs + +class InvalidPath (InvalidID): + def __init__(self, path, root, msg=None, **kwargs): + if msg == None: + msg = 'Path "%s" not in root "%s"' % (path, root) + InvalidID.__init__(self, msg=msg, **kwargs) + self.path = path + self.root = root + +class SpacerCollision (InvalidPath): + def __init__(self, path, spacer): + msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer) + InvalidPath.__init__(self, path, root=None, msg=msg) + self.spacer = spacer + +class NoSuchFile (InvalidID): + def __init__(self, pathname, root='.'): + path = os.path.abspath(os.path.join(root, pathname)) + InvalidID.__init__(self, 'No such file: %s' % path) + + +class CachedPathID (object): + """Cache Storage ID <-> path policy. + + Paths generated following:: + + .../.be/BUGDIR/bugs/BUG/comments/COMMENT + ^-- root path + + See :mod:`libbe.util.id` for a discussion of ID formats. + + Examples + -------- + + >>> dir = Dir() + >>> os.mkdir(os.path.join(dir.path, '.be')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def')) + >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456')) + >>> file(os.path.join(dir.path, '.be', 'abc', 'values'), + ... 'w').close() + >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'), + ... 'w').close() + >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'), + ... 'w').close() + >>> c = CachedPathID() + >>> c.root(dir.path) + >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values')) + 'def/values' + >>> c.init() + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc', 'id-cache'] + >>> c.connect() + >>> c.path('123/values') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/values' + >>> c.disconnect() + >>> c.destroy() + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc'] + >>> c.connect() # demonstrate auto init + >>> sorted(os.listdir(os.path.join(c._root, '.be'))) + ['abc', 'id-cache'] + >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS + u'.../.be/xyz' + >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS + u'.../.be/xyz/def' + >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/comments/qrs' + >>> c.disconnect() + >>> c.connect() + >>> c.path('qrs') # doctest: +ELLIPSIS + u'.../.be/abc/bugs/123/comments/qrs' + >>> c.remove_id('qrs') + >>> c.path('qrs') + Traceback (most recent call last): + ... + InvalidID: qrs in revision None + >>> c.disconnect() + >>> c.destroy() + >>> dir.cleanup() + """ + def __init__(self, encoding=None): + self.encoding = libbe.util.encoding.get_filesystem_encoding() + self._spacer_dirs = ['.be', 'bugs', 'comments'] + + def root(self, path): + self._root = os.path.abspath(path).rstrip(os.path.sep) + self._cache_path = os.path.join( + self._root, self._spacer_dirs[0], 'id-cache') + + def init(self, verbose=True, cache=None): + """Create cache file for an existing .be directory. + + The file contains multiple lines of the form:: + + UUID\tPATH + """ + if cache == None: + self._cache = {} + else: + self._cache = cache + spaced_root = os.path.join(self._root, self._spacer_dirs[0]) + for dirpath, dirnames, filenames in os.walk(spaced_root): + if dirpath == spaced_root: + continue + try: + id = self.id(dirpath) + relpath = dirpath[len(self._root)+1:] + if id.count('/') == 0: + if verbose == True and id in self._cache: + print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath) + self._cache[id] = relpath + except InvalidPath: + pass + if self._cache != cache: + self._changed = True + if cache == None: + self.disconnect() + + def destroy(self): + if os.path.exists(self._cache_path): + os.remove(self._cache_path) + + def connect(self): + if not os.path.exists(self._cache_path): + try: + self.init() + except IOError: + raise libbe.storage.base.ConnectionError + self._cache = {} # key: uuid, value: path + self._changed = False + f = codecs.open(self._cache_path, 'r', self.encoding) + for line in f: + fields = line.rstrip('\n').split('\t') + self._cache[fields[0]] = fields[1] + f.close() + + def disconnect(self): + if self._changed == True: + f = codecs.open(self._cache_path, 'w', self.encoding) + for uuid,path in self._cache.items(): + f.write('%s\t%s\n' % (uuid, path)) + f.close() + self._cache = {} + + def path(self, id, relpath=False): + fields = id.split('/', 1) + uuid = fields[0] + if len(fields) == 1: + extra = [] + else: + extra = fields[1:] + if uuid not in self._cache: + self.init(verbose=False, cache=self._cache) + if uuid not in self._cache: + raise InvalidID(uuid) + if relpath == True: + return os.path.join(self._cache[uuid], *extra) + return os.path.join(self._root, self._cache[uuid], *extra) + + def add_id(self, id, parent=None): + if id.count('/') > 0: + # not a UUID-level path + assert id.startswith(parent), \ + 'Strange ID: "%s" should start with "%s"' % (id, parent) + path = self.path(id) + elif id in self._cache: + # already added + path = self.path(id) + else: + if parent == None: + parent_path = '' + spacer = self._spacer_dirs[0] + else: + assert parent.count('/') == 0, \ + 'Strange parent ID: "%s" should be UUID' % parent + parent_path = self.path(parent, relpath=True) + parent_spacer = parent_path.split(os.path.sep)[-2] + i = self._spacer_dirs.index(parent_spacer) + spacer = self._spacer_dirs[i+1] + path = os.path.join(parent_path, spacer, id) + self._cache[id] = path + self._changed = True + path = os.path.join(self._root, path) + return path + + def remove_id(self, id): + if id.count('/') > 0: + return # not a UUID-level path + self._cache.pop(id) + self._changed = True + + def id(self, path): + path = os.path.join(self._root, path) + if not path.startswith(self._root + os.path.sep): + raise InvalidPath(path, self._root) + path = path[len(self._root)+1:] + orig_path = path + if not path.startswith(self._spacer_dirs[0] + os.path.sep): + raise InvalidPath(path, self._spacer_dirs[0]) + for spacer in self._spacer_dirs: + if not path.startswith(spacer + os.path.sep): + break + id = path[len(spacer)+1:] + fields = path[len(spacer)+1:].split(os.path.sep,1) + if len(fields) == 1: + break + path = fields[1] + for spacer in self._spacer_dirs: + if id.endswith(os.path.sep + spacer): + raise SpacerCollision(orig_path, spacer) + if os.path.sep != '/': + id = id.replace(os.path.sep, '/') + return id + + +def new(): + return VCS() + +class VCS (libbe.storage.base.VersionedStorage): + """Implement a 'no-VCS' interface. + + Support for other VCSs can be added by subclassing this class, and + overriding methods `_vcs_*()` with code appropriate for your VCS. + + The methods `_u_*()` are utility methods available to the `_vcs_*()` + methods. + """ + name = 'None' + client = 'false' # command-line tool for _u_invoke_client + + def __init__(self, *args, **kwargs): + if 'encoding' not in kwargs: + kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding() + libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs) + self.versioned = False + self.interspersed_vcs_files = False + self.verbose_invoke = False + self._cached_path_id = CachedPathID() + self._rooted = False + + def _vcs_version(self): + """ + Return the VCS version string. + """ + return '0' + + def _vcs_get_user_id(self): + """ + Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the VCS has not been configured with a username, return None. + """ + return None + + def _vcs_detect(self, path=None): + """ + Detect whether a directory is revision controlled with this VCS. + """ + return True + + def _vcs_root(self, path): + """ + Get the VCS root. This is the default working directory for + future invocations. You would normally set this to the root + directory for your VCS. + """ + if os.path.isdir(path) == False: + path = os.path.dirname(path) + if path == '': + path = os.path.abspath('.') + return path + + def _vcs_init(self, path): + """ + Begin versioning the tree based at path. + """ + pass + + def _vcs_destroy(self): + """ + Remove any files used in versioning (e.g. whatever _vcs_init() + created). + """ + pass + + def _vcs_add(self, path): + """ + Add the already created file at path to version control. + """ + pass + + def _vcs_exists(self, path, revision=None): + """ + Does the path exist in a given revision? (True/False) + """ + raise NotImplementedError('Lazy BE developers') + + def _vcs_remove(self, path): + """ + Remove the file at path from version control. Optionally + remove the file from the filesystem as well. + """ + pass + + def _vcs_update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + pass + + def _vcs_is_versioned(self, path): + """ + Return true if a path is under version control, False + otherwise. You only need to set this if the VCS goes about + dumping VCS-specific files into the .be directory. + + If you do need to implement this method (e.g. Arch), set + self.interspersed_vcs_files = True + """ + assert self.interspersed_vcs_files == False + raise NotImplementedError + + def _vcs_get_file_contents(self, path, revision=None): + """ + Get the file contents as they were in a given revision. + Revision==None specifies the current revision. + """ + if revision != None: + raise libbe.storage.base.InvalidRevision( + 'The %s VCS does not support revision specifiers' % self.name) + path = os.path.join(self.repo, path) + if not os.path.exists(path): + return libbe.util.InvalidObject + if os.path.isdir(path): + return libbe.storage.base.InvalidDirectory + f = open(path, 'rb') + contents = f.read() + f.close() + return contents + + def _vcs_path(self, id, revision): + """ + Return the relative path to object id as of revision. + + Revision will not be None. + """ + raise NotImplementedError + + def _vcs_isdir(self, path, revision): + """ + Return True if path (as returned by _vcs_path) was a directory + as of revision, False otherwise. + + Revision will not be None. + """ + raise NotImplementedError + + def _vcs_listdir(self, path, revision): + """ + Return a list of the contents of the directory path (as + returned by _vcs_path) as of revision. + + Revision will not be None, and ._vcs_isdir(path, revision) + will be True. + """ + raise NotImplementedError + + def _vcs_commit(self, commitfile, allow_empty=False): + """ + Commit the current working directory, using the contents of + commitfile as the comment. Return the name of the old + revision (or None if commits are not supported). + + If allow_empty == False, raise EmptyCommit if there are no + changes to commit. + """ + return None + + def _vcs_revision_id(self, index): + """ + Return the name of the <index>th revision. Index will be an + integer (possibly <= 0). The choice of which branch to follow + when crossing branches/merges is not defined. + + Return None if revision IDs are not supported, or if the + specified revision does not exist. + """ + return None + + def _vcs_changed(self, revision): + """ + Return a tuple of lists of ids + (new, modified, removed) + from the specified revision to the current situation. + """ + return ([], [], []) + + def version(self): + # Cache version string for efficiency. + if not hasattr(self, '_version'): + self._version = self._get_version() + return self._version + + def _get_version(self): + try: + ret = self._vcs_version() + return ret + except OSError, e: + if e.errno == errno.ENOENT: + return None + else: + raise OSError, e + except CommandError: + return None + + def installed(self): + if self.version() != None: + return True + return False + + def get_user_id(self): + """ + Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the VCS has not been configured with a username, return None. + You can override the automatic lookup procedure by setting the + VCS.user_id attribute to a string of your choice. + """ + if not hasattr(self, 'user_id'): + self.user_id = self._vcs_get_user_id() + return self.user_id + + def _detect(self, path='.'): + """ + Detect whether a directory is revision controlled with this VCS. + """ + return self._vcs_detect(path) + + def root(self): + """Set the root directory to the path's VCS root. + + This is the default working directory for future invocations. + Consider the following usage case: + + You have a project rooted in:: + + /path/to/source/ + + by which I mean the VCS repository is in, for example:: + + /path/to/source/.bzr + + However, you're of in some subdirectory like:: + + /path/to/source/ui/testing + + and you want to comment on a bug. `root` will locate your VCS + root (``/path/to/source/``) and set the repo there. This + means that it doesn't matter where you are in your project + tree when you call "be COMMAND", it always acts as if you called + it from the VCS root. + """ + if self._detect(self.repo) == False: + raise VCSUnableToRoot(self) + root = self._vcs_root(self.repo) + self.repo = os.path.abspath(root) + if os.path.isdir(self.repo) == False: + self.repo = os.path.dirname(self.repo) + self.be_dir = os.path.join( + self.repo, self._cached_path_id._spacer_dirs[0]) + self._cached_path_id.root(self.repo) + self._rooted = True + + def _init(self): + """ + Begin versioning the tree based at self.repo. + Also roots the vcs at path. + + See Also + -------- + root : called if the VCS has already been initialized. + """ + if not os.path.exists(self.repo) or not os.path.isdir(self.repo): + raise VCSUnableToRoot(self) + if self._vcs_detect(self.repo) == False: + self._vcs_init(self.repo) + if self._rooted == False: + self.root() + os.mkdir(self.be_dir) + self._vcs_add(self._u_rel_path(self.be_dir)) + self._setup_storage_version() + self._cached_path_id.init() + + def _destroy(self): + self._vcs_destroy() + self._cached_path_id.destroy() + if os.path.exists(self.be_dir): + shutil.rmtree(self.be_dir) + + def _connect(self): + if self._rooted == False: + self.root() + if not os.path.isdir(self.be_dir): + raise libbe.storage.base.ConnectionError(self) + self._cached_path_id.connect() + self.check_storage_version() + + def _disconnect(self): + self._cached_path_id.disconnect() + + def path(self, id, revision=None, relpath=True): + if revision == None: + path = self._cached_path_id.path(id) + if relpath == True: + return self._u_rel_path(path) + return path + path = self._vcs_path(id, revision) + if relpath == True: + return path + return os.path.join(self.repo, path) + + def _add_path(self, path, directory=False): + relpath = self._u_rel_path(path) + reldirs = relpath.split(os.path.sep) + if directory == False: + reldirs = reldirs[:-1] + dir = self.repo + for reldir in reldirs: + dir = os.path.join(dir, reldir) + if not os.path.exists(dir): + os.mkdir(dir) + self._vcs_add(self._u_rel_path(dir)) + elif not os.path.isdir(dir): + raise libbe.storage.base.InvalidDirectory + if directory == False: + if not os.path.exists(path): + open(path, 'w').close() + self._vcs_add(self._u_rel_path(path)) + + def _add(self, id, parent=None, **kwargs): + path = self._cached_path_id.add_id(id, parent) + self._add_path(path, **kwargs) + + def _exists(self, id, revision=None): + if revision == None: + try: + path = self.path(id, revision, relpath=False) + except InvalidID, e: + return False + return os.path.exists(path) + path = self.path(id, revision, relpath=True) + return self._vcs_exists(relpath, revision) + + def _remove(self, id): + path = self._cached_path_id.path(id) + if os.path.exists(path): + if os.path.isdir(path) and len(self.children(id)) > 0: + raise libbe.storage.base.DirectoryNotEmpty(id) + self._vcs_remove(self._u_rel_path(path)) + if os.path.exists(path): + if os.path.isdir(path): + os.rmdir(path) + else: + os.remove(path) + self._cached_path_id.remove_id(id) + + def _recursive_remove(self, id): + path = self._cached_path_id.path(id) + for dirpath,dirnames,filenames in os.walk(path, topdown=False): + filenames.extend(dirnames) + for f in filenames: + fullpath = os.path.join(dirpath, f) + if os.path.exists(fullpath) == False: + continue + self._vcs_remove(self._u_rel_path(fullpath)) + if os.path.exists(path): + shutil.rmtree(path) + path = self._cached_path_id.path(id, relpath=True) + for id,p in self._cached_path_id._cache.items(): + if p.startswith(path): + self._cached_path_id.remove_id(id) + + def _ancestors(self, id=None, revision=None): + if id==None: + path = self.be_dir + else: + path = self.path(id, revision, relpath=False) + ancestors = [] + while True: + if not path.startswith(self.repo + os.path.sep): + break + path = os.path.dirname(path) + try: + id = self._u_path_to_id(path) + ancestors.append(id) + except (SpacerCollision, InvalidPath): + pass + return ancestors + + def _children(self, id=None, revision=None): + if revision == None: + isdir = os.path.isdir + listdir = os.listdir + else: + isdir = lambda path : self._vcs_isdir( + self._u_rel_path(path), revision) + listdir = lambda path : self._vcs_listdir( + self._u_rel_path(path), revision) + if id==None: + path = self.be_dir + else: + path = self.path(id, revision, relpath=False) + if isdir(path) == False: + return [] + children = listdir(path) + for i,c in enumerate(children): + if c in self._cached_path_id._spacer_dirs: + children[i] = None + children.extend([os.path.join(c, c2) for c2 in + listdir(os.path.join(path, c))]) + elif c in ['id-cache', 'version']: + children[i] = None + elif self.interspersed_vcs_files \ + and self._vcs_is_versioned(c) == False: + children[i] = None + for i,c in enumerate(children): + if c == None: continue + cpath = os.path.join(path, c) + if self.interspersed_vcs_files == True \ + and revision != None \ + and self._vcs_is_versioned(cpath) == False: + children[i] = None + else: + children[i] = self._u_path_to_id(cpath) + children[i] + return [c for c in children if c != None] + + def _get(self, id, default=libbe.util.InvalidObject, revision=None): + try: + relpath = self.path(id, revision, relpath=True) + contents = self._vcs_get_file_contents(relpath, revision) + except InvalidID, e: + if default == libbe.util.InvalidObject: + raise e + return default + if contents in [libbe.storage.base.InvalidDirectory, + libbe.util.InvalidObject] \ + or len(contents) == 0: + if default == libbe.util.InvalidObject: + raise InvalidID(id, revision) + return default + return contents + + def _set(self, id, value): + try: + path = self._cached_path_id.path(id) + except InvalidID, e: + raise + if not os.path.exists(path): + raise InvalidID(id) + if os.path.isdir(path): + raise libbe.storage.base.InvalidDirectory(id) + f = open(path, "wb") + f.write(value) + f.close() + self._vcs_update(self._u_rel_path(path)) + + def _commit(self, summary, body=None, allow_empty=False): + summary = summary.strip()+'\n' + if body is not None: + summary += '\n' + body.strip() + '\n' + descriptor, filename = tempfile.mkstemp() + revision = None + try: + temp_file = os.fdopen(descriptor, 'wb') + temp_file.write(summary) + temp_file.flush() + revision = self._vcs_commit(filename, allow_empty=allow_empty) + temp_file.close() + finally: + os.remove(filename) + return revision + + def revision_id(self, index=None): + if index == None: + return None + try: + if int(index) != index: + raise InvalidRevision(index) + except ValueError: + raise InvalidRevision(index) + revid = self._vcs_revision_id(index) + if revid == None: + raise libbe.storage.base.InvalidRevision(index) + return revid + + def changed(self, revision): + new,mod,rem = self._vcs_changed(revision) + def paths_to_ids(paths): + for p in paths: + try: + id = self._u_path_to_id(p) + yield id + except (SpacerCollision, InvalidPath): + pass + new_id = list(paths_to_ids(new)) + mod_id = list(paths_to_ids(mod)) + rem_id = list(paths_to_ids(rem)) + return (new_id, mod_id, rem_id) + + def _u_any_in_string(self, list, string): + """Return True if any of the strings in list are in string. + Otherwise return False. + """ + for list_string in list: + if list_string in string: + return True + return False + + def _u_invoke(self, *args, **kwargs): + if 'cwd' not in kwargs: + kwargs['cwd'] = self.repo + if 'verbose' not in kwargs: + kwargs['verbose'] = self.verbose_invoke + if 'encoding' not in kwargs: + kwargs['encoding'] = self.encoding + return invoke(*args, **kwargs) + + def _u_invoke_client(self, *args, **kwargs): + cl_args = [self.client] + cl_args.extend(args) + return self._u_invoke(cl_args, **kwargs) + + def _u_search_parent_directories(self, path, filename): + """Find the file (or directory) named filename in path or in any of + path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + try: + ret = search_parent_directories(path, filename) + except AssertionError, e: + return None + return ret + + def _u_find_id_from_manifest(self, id, manifest, revision=None): + """Search for the relative path to id using manifest, a list of all + files. + + Returns None if the id is not found. + """ + be_dir = self._cached_path_id._spacer_dirs[0] + be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep + files = [f for f in manifest if f.startswith(be_dir_sep)] + for file in files: + if not file.startswith(be_dir+os.path.sep): + continue + parts = file.split(os.path.sep) + dir = parts.pop(0) # don't add the first spacer dir + for part in parts[:-1]: + dir = os.path.join(dir, part) + if not dir in files: + files.append(dir) + for file in files: + try: + p_id = self._u_path_to_id(file) + if p_id == id: + return file + except (SpacerCollision, InvalidPath): + pass + raise InvalidID(id, revision=revision) + + def _u_find_id(self, id, revision): + """Search for the relative path to id as of revision. + + Returns None if the id is not found. + """ + assert self._rooted == True + be_dir = self._cached_path_id._spacer_dirs[0] + stack = [(be_dir, be_dir)] + while len(stack) > 0: + path,long_id = stack.pop() + if long_id.endswith('/'+id): + return path + if self._vcs_isdir(path, revision) == False: + continue + for child in self._vcs_listdir(path, revision): + stack.append((os.path.join(path, child), + '/'.join([long_id, child]))) + raise InvalidID(id, revision=revision) + + def _u_path_to_id(self, path): + return self._cached_path_id.id(path) + + def _u_rel_path(self, path, root=None): + """Return the relative path to path from root. + + Examples: + + >>> vcs = new() + >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c") + '.be' + >>> vcs._u_rel_path("/a.b/c/", "/a.b/c") + '.' + >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/") + '.' + >>> vcs._u_rel_path("./a", ".") + 'a' + """ + if root == None: + if self.repo == None: + raise VCSNotRooted(self) + root = self.repo + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if path in [absRoot, absRootSlashedDir]: + return '.' + if not path.startswith(absRootSlashedDir): + raise InvalidPath(path, absRootSlashedDir) + relpath = path[len(absRootSlashedDir):] + return relpath + + def _u_abspath(self, path, root=None): + """Return the absolute path from a path realtive to root. + + Examples + -------- + + >>> vcs = new() + >>> vcs._u_abspath(".be", "/a.b/c") + '/a.b/c/.be' + """ + if root == None: + assert self.repo != None, "VCS not rooted" + root = self.repo + return os.path.abspath(os.path.join(root, path)) + + def _u_parse_commitfile(self, commitfile): + """Split the commitfile created in self.commit() back into summary and + header lines. + """ + f = codecs.open(commitfile, 'r', self.encoding) + summary = f.readline() + body = f.read() + body.lstrip('\n') + if len(body) == 0: + body = None + f.close() + return (summary, body) + + def check_storage_version(self): + version = self.storage_version() + if version != libbe.storage.STORAGE_VERSION: + upgrade.upgrade(self.repo, version) + + def storage_version(self, revision=None, path=None): + """Return the storage version of the on-disk files. + + See Also + -------- + :mod:`libbe.storage.util.upgrade` + """ + if path == None: + path = os.path.join(self.repo, '.be', 'version') + if not os.path.exists(path): + raise libbe.storage.InvalidStorageVersion(None) + if revision == None: # don't require connection + return libbe.util.encoding.get_file_contents( + path, decode=True).rstrip('\n') + relpath = self._u_rel_path(path) + contents = self._vcs_get_file_contents(relpath, revision=revision) + if type(contents) != types.UnicodeType: + contents = unicode(contents, self.encoding) + return contents.strip() + + def _setup_storage_version(self): + """ + Requires disk access. + """ + assert self._rooted == True + path = os.path.join(self.be_dir, 'version') + if not os.path.exists(path): + libbe.util.encoding.set_file_contents(path, + libbe.storage.STORAGE_VERSION+'\n') + self._vcs_add(self._u_rel_path(path)) + + +if libbe.TESTING == True: + class VCSTestCase (unittest.TestCase): + """ + Test cases for base VCS class (in addition to the Storage test + cases). + """ + + Class = VCS + + def __init__(self, *args, **kwargs): + super(VCSTestCase, self).__init__(*args, **kwargs) + self.dirname = None + + def setUp(self): + """Set up test fixtures for Storage test case.""" + super(VCSTestCase, self).setUp() + self.dir = Dir() + self.dirname = self.dir.path + self.s = self.Class(repo=self.dirname) + if self.s.installed() == True: + self.s.init() + self.s.connect() + + def tearDown(self): + super(VCSTestCase, self).tearDown() + if self.s.installed() == True: + self.s.disconnect() + self.s.destroy() + self.dir.cleanup() + + class VCS_installed_TestCase (VCSTestCase): + def test_installed(self): + """See if the VCS is installed. + """ + self.failUnless(self.s.installed() == True, + '%(name)s VCS not found' % vars(self.Class)) + + + class VCS_detection_TestCase (VCSTestCase): + def test_detection(self): + """See if the VCS detects its installed repository + """ + if self.s.installed(): + self.s.disconnect() + self.failUnless(self.s._detect(self.dirname) == True, + 'Did not detected %(name)s VCS after initialising' + % vars(self.Class)) + self.s.connect() + + def test_no_detection(self): + """See if the VCS detects its installed repository + """ + if self.s.installed() and self.Class.name != 'None': + self.s.disconnect() + self.s.destroy() + self.failUnless(self.s._detect(self.dirname) == False, + 'Detected %(name)s VCS before initialising' + % vars(self.Class)) + self.s.init() + self.s.connect() + + def test_vcs_repo_in_specified_root_path(self): + """VCS root directory should be in specified root path.""" + rp = os.path.realpath(self.s.repo) + dp = os.path.realpath(self.dirname) + vcs_name = self.Class.name + self.failUnless( + dp == rp or rp == None, + "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars()) + + class VCS_get_user_id_TestCase(VCSTestCase): + """Test cases for VCS.get_user_id method.""" + + def test_gets_existing_user_id(self): + """Should get the existing user ID.""" + if self.s.installed(): + user_id = self.s.get_user_id() + if user_id == None: + return + name,email = libbe.ui.util.user.parse_user_id(user_id) + if email != None: + self.failUnless('@' in email, email) + + def make_vcs_testcase_subclasses(vcs_class, namespace): + c = vcs_class() + if c.installed(): + if c.versioned == True: + libbe.storage.base.make_versioned_storage_testcase_subclasses( + vcs_class, namespace) + else: + libbe.storage.base.make_storage_testcase_subclasses( + vcs_class, namespace) + + if namespace != sys.modules[__name__]: + # Make VCSTestCase subclasses for vcs_class in the namespace. + vcs_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, VCSTestCase) \ + and c.Class == VCS] + + for base_class in vcs_testcase_classes: + testcase_class_name = vcs_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = vcs_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + make_vcs_testcase_subclasses(VCS, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py new file mode 100644 index 0000000..5a62968 --- /dev/null +++ b/libbe/storage/vcs/bzr.py @@ -0,0 +1,361 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Ben Finney <benf@cybersource.com.au> +# Gianluca Montecchi <gian@grys.it> +# Marien Zwart <marienz@gentoo.org> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Bazaar_ (bzr) backend. + +.. _Bazaar: http://bazaar.canonical.com/ +""" + +try: + import bzrlib + import bzrlib.branch + import bzrlib.builtins + import bzrlib.config + import bzrlib.errors + import bzrlib.option +except ImportError: + bzrlib = None +import os +import os.path +import re +import shutil +import StringIO +import sys +import types + +import libbe +import base + +if libbe.TESTING == True: + import doctest + import unittest + + +def new(): + return Bzr() + +class Bzr(base.VCS): + """:class:`base.VCS` implementation for Bazaar. + """ + name = 'bzr' + client = None # bzrlib module + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + + def _vcs_version(self): + if bzrlib == None: + return None + return bzrlib.__version__ + + def version_cmp(self, *args): + """Compare the installed Bazaar version `V_i` with another version + `V_o` (given in `*args`). Returns + + === =============== + 1 if `V_i > V_o` + 0 if `V_i == V_o` + -1 if `V_i < V_o` + === =============== + + Examples + -------- + + >>> b = Bzr(repo='.') + >>> b._vcs_version = lambda : "2.3.1 (release)" + >>> b.version_cmp(2,3,1) + 0 + >>> b.version_cmp(2,3,2) + -1 + >>> b.version_cmp(2,3,0) + 1 + >>> b.version_cmp(3) + -1 + >>> b._vcs_version = lambda : "2.0.0pre2" + >>> b._parsed_version = None + >>> b.version_cmp(3) + -1 + >>> b.version_cmp(2,0,1) + Traceback (most recent call last): + ... + NotImplementedError: Cannot parse non-integer portion "0pre2" of Bzr version "2.0.0pre2" + """ + if not hasattr(self, '_parsed_version') \ + or self._parsed_version == None: + num_part = self._vcs_version().split(' ')[0] + self._parsed_version = [] + for num in num_part.split('.'): + try: + self._parsed_version.append(int(num)) + except ValueError, e: + self._parsed_version.append(num) + for current,other in zip(self._parsed_version, args): + if type(current) != types.IntType: + raise NotImplementedError( + 'Cannot parse non-integer portion "%s" of Bzr version "%s"' + % (current, self._vcs_version())) + c = cmp(current,other) + if c != 0: + return c + return 0 + + def _vcs_get_user_id(self): + # excerpted from bzrlib.builtins.cmd_whoami.run() + try: + c = bzrlib.branch.Branch.open_containing(self.repo)[0].get_config() + except errors.NotBranchError: + c = bzrlib.config.GlobalConfig() + return c.username() + + def _vcs_detect(self, path): + if self._u_search_parent_directories(path, '.bzr') != None : + return True + return False + + def _vcs_root(self, path): + """Find the root of the deepest repository containing path.""" + cmd = bzrlib.builtins.cmd_root() + cmd.outf = StringIO.StringIO() + cmd.run(filename=path) + return cmd.outf.getvalue().rstrip('\n') + + def _vcs_init(self, path): + cmd = bzrlib.builtins.cmd_init() + cmd.outf = StringIO.StringIO() + cmd.run(location=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '.bzr') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + path = os.path.join(self.repo, path) + cmd = bzrlib.builtins.cmd_add() + cmd.outf = StringIO.StringIO() + cmd.run(file_list=[path], file_ids_from=self.repo) + + def _vcs_exists(self, path, revision=None): + manifest = self._vcs_listdir( + self.repo, revision=revision, recursive=True) + if path in manifest: + return True + return False + + def _vcs_remove(self, path): + # --force to also remove unversioned files. + path = os.path.join(self.repo, path) + cmd = bzrlib.builtins.cmd_remove() + cmd.outf = StringIO.StringIO() + cmd.run(file_list=[path], file_deletion_strategy='force') + + def _vcs_update(self, path): + pass + + def _parse_revision_string(self, revision=None): + if revision == None: + return revision + rev_opt = bzrlib.option.Option.OPTIONS['revision'] + try: + rev_spec = rev_opt.type(revision) + except bzrlib.errors.NoSuchRevisionSpec: + raise base.InvalidRevision(revision) + return rev_spec + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + path = os.path.join(self.repo, path) + revision = self._parse_revision_string(revision) + cmd = bzrlib.builtins.cmd_cat() + cmd.outf = StringIO.StringIO() + if self.version_cmp(1,6,0) < 0: + # old bzrlib cmd_cat uses sys.stdout not self.outf for output. + stdout = sys.stdout + sys.stdout = cmd.outf + try: + cmd.run(filename=path, revision=revision) + except bzrlib.errors.BzrCommandError, e: + if 'not present in revision' in str(e): + raise base.InvalidPath(path, root=self.repo, revision=revision) + raise + finally: + if self.version_cmp(2,0,0) < 0: + cmd.outf = sys.stdout + sys.stdout = stdout + return cmd.outf.getvalue() + + def _vcs_path(self, id, revision): + manifest = self._vcs_listdir( + self.repo, revision=revision, recursive=True) + return self._u_find_id_from_manifest(id, manifest, revision=revision) + + def _vcs_isdir(self, path, revision): + try: + self._vcs_listdir(path, revision) + except AttributeError, e: + if 'children' in str(e): + return False + raise + return True + + def _vcs_listdir(self, path, revision, recursive=False): + path = os.path.join(self.repo, path) + revision = self._parse_revision_string(revision) + cmd = bzrlib.builtins.cmd_ls() + cmd.outf = StringIO.StringIO() + try: + if self.version_cmp(2,0,0) >= 0: + cmd.run(revision=revision, path=path, recursive=recursive) + else: + # Pre-2.0 Bazaar (non_recursive) + # + working around broken non_recursive+path implementation + # (https://bugs.launchpad.net/bzr/+bug/158690) + cmd.run(revision=revision, path=path, + non_recursive=False) + except bzrlib.errors.BzrCommandError, e: + if 'not present in revision' in str(e): + raise base.InvalidPath(path, root=self.repo, revision=revision) + raise + children = cmd.outf.getvalue().rstrip('\n').splitlines() + children = [self._u_rel_path(c, path) for c in children] + if self.version_cmp(2,0,0) < 0 and recursive == False: + children = [c for c in children if os.path.sep not in c] + return children + + def _vcs_commit(self, commitfile, allow_empty=False): + cmd = bzrlib.builtins.cmd_commit() + cmd.outf = StringIO.StringIO() + cwd = os.getcwd() + os.chdir(self.repo) + try: + cmd.run(file=commitfile, unchanged=allow_empty) + except bzrlib.errors.BzrCommandError, e: + strings = ['no changes to commit.', # bzr 1.3.1 + 'No changes to commit.'] # bzr 1.15.1 + if self._u_any_in_string(strings, str(e)) == True: + raise base.EmptyCommit() + raise + finally: + os.chdir(cwd) + return self._vcs_revision_id(-1) + + def _vcs_revision_id(self, index): + cmd = bzrlib.builtins.cmd_revno() + cmd.outf = StringIO.StringIO() + cmd.run(location=self.repo) + current_revision = int(cmd.outf.getvalue()) + if index > current_revision or index < -current_revision: + return None + if index >= 0: + return str(index) # bzr commit 0 is the empty tree. + return str(current_revision+index+1) + + def _diff(self, revision): + revision = self._parse_revision_string(revision) + cmd = bzrlib.builtins.cmd_diff() + cmd.outf = StringIO.StringIO() + # for some reason, cmd_diff uses sys.stdout not self.outf for output. + stdout = sys.stdout + sys.stdout = cmd.outf + try: + status = cmd.run(revision=revision, file_list=[self.repo]) + finally: + sys.stdout = stdout + assert status in [0,1], "Invalid status %d" % status + return cmd.outf.getvalue() + + def _parse_diff(self, diff_text): + """_parse_diff(diff_text) -> (new,modified,removed) + + `new`, `modified`, and `removed` are lists of files. + + Example diff text:: + + === modified file 'dir/changed' + --- dir/changed 2010-01-16 01:54:53 +0000 + +++ dir/changed 2010-01-16 01:54:54 +0000 + @@ -1,3 +1,3 @@ + hi + -there + +everyone and + joe + + === removed file 'dir/deleted' + --- dir/deleted 2010-01-16 01:54:53 +0000 + +++ dir/deleted 1970-01-01 00:00:00 +0000 + @@ -1,3 +0,0 @@ + -in + -the + -beginning + + === removed file 'dir/moved' + --- dir/moved 2010-01-16 01:54:53 +0000 + +++ dir/moved 1970-01-01 00:00:00 +0000 + @@ -1,4 +0,0 @@ + -the + -ants + -go + -marching + + === added file 'dir/moved2' + --- dir/moved2 1970-01-01 00:00:00 +0000 + +++ dir/moved2 2010-01-16 01:54:34 +0000 + @@ -0,0 +1,4 @@ + +the + +ants + +go + +marching + + === added file 'dir/new' + --- dir/new 1970-01-01 00:00:00 +0000 + +++ dir/new 2010-01-16 01:54:54 +0000 + @@ -0,0 +1,2 @@ + +hello + +world + + """ + new = [] + modified = [] + removed = [] + for line in diff_text.splitlines(): + if not line.startswith('=== '): + continue + fields = line.split() + action = fields[1] + file = fields[-1].strip("'") + if action == 'added': + new.append(file) + elif action == 'modified': + modified.append(file) + elif action == 'removed': + removed.append(file) + return (new,modified,removed) + + def _vcs_changed(self, revision): + return self._parse_diff(self._diff(revision)) + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py new file mode 100644 index 0000000..0f23278 --- /dev/null +++ b/libbe/storage/vcs/darcs.py @@ -0,0 +1,399 @@ +# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Darcs_ backend. + +.. _Darcs: http://darcs.net/ +""" + +import codecs +import os +import re +import shutil +import sys +import time # work around http://mercurial.selenic.com/bts/issue618 +import types +try: # import core module, Python >= 2.5 + from xml.etree import ElementTree +except ImportError: # look for non-core module + from elementtree import ElementTree +from xml.sax.saxutils import unescape + +import libbe +import base + +if libbe.TESTING == True: + import doctest + import unittest + + +def new(): + return Darcs() + +class Darcs(base.VCS): + """:class:`base.VCS` implementation for Darcs. + """ + name='darcs' + client='darcs' + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_version(self): + status,output,error = self._u_invoke_client('--version') + return output.strip() + + def version_cmp(self, *args): + """Compare the installed Darcs version `V_i` with another version + `V_o` (given in `*args`). Returns + + === =============== + 1 if `V_i > V_o` + 0 if `V_i == V_o` + -1 if `V_i < V_o` + === =============== + + Examples + -------- + + >>> d = Darcs(repo='.') + >>> d._vcs_version = lambda : "2.3.1 (release)" + >>> d.version_cmp(2,3,1) + 0 + >>> d.version_cmp(2,3,2) + -1 + >>> d.version_cmp(2,3,0) + 1 + >>> d.version_cmp(3) + -1 + >>> d._vcs_version = lambda : "2.0.0pre2" + >>> d._parsed_version = None + >>> d.version_cmp(3) + -1 + >>> d.version_cmp(2,0,1) + Traceback (most recent call last): + ... + NotImplementedError: Cannot parse non-integer portion "0pre2" of Darcs version "2.0.0pre2" + """ + if not hasattr(self, '_parsed_version') \ + or self._parsed_version == None: + num_part = self._vcs_version().split(' ')[0] + self._parsed_version = [] + for num in num_part.split('.'): + try: + self._parsed_version.append(int(num)) + except ValueError, e: + self._parsed_version.append(num) + for current,other in zip(self._parsed_version, args): + if type(current) != types.IntType: + raise NotImplementedError( + 'Cannot parse non-integer portion "%s" of Darcs version "%s"' + % (current, self._vcs_version())) + c = cmp(current,other) + if c != 0: + return c + return 0 + + def _vcs_get_user_id(self): + # following http://darcs.net/manual/node4.html#SECTION00410030000000000000 + # as of June 29th, 2009 + if self.repo == None: + return None + darcs_dir = os.path.join(self.repo, '_darcs') + if darcs_dir != None: + for pref_file in ['author', 'email']: + pref_path = os.path.join(darcs_dir, 'prefs', pref_file) + if os.path.exists(pref_path): + return self._vcs_get_file_contents(pref_path).strip() + for env_variable in ['DARCS_EMAIL', 'EMAIL']: + if env_variable in os.environ: + return os.environ[env_variable] + return None + + def _vcs_detect(self, path): + if self._u_search_parent_directories(path, "_darcs") != None : + return True + return False + + def _vcs_root(self, path): + """Find the root of the deepest repository containing path.""" + # Assume that nothing funny is going on; in particular, that we aren't + # dealing with a bare repo. + if os.path.isdir(path) != True: + path = os.path.dirname(path) + darcs_dir = self._u_search_parent_directories(path, '_darcs') + if darcs_dir == None: + return None + return os.path.dirname(darcs_dir) + + def _vcs_init(self, path): + self._u_invoke_client('init', cwd=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '_darcs') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + if os.path.isdir(path): + return + self._u_invoke_client('add', path) + + def _vcs_remove(self, path): + if not os.path.isdir(self._u_abspath(path)): + os.remove(os.path.join(self.repo, path)) # darcs notices removal + + def _vcs_update(self, path): + self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618 + pass # darcs notices changes + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + if self.version_cmp(2, 0, 0) == 1: + status,output,error = self._u_invoke_client( \ + 'show', 'contents', '--patch', revision, path) + return output + # Darcs versions < 2.0.0pre2 lack the 'show contents' command + + patch = self._diff(revision, path=path, unicode_output=False) + + # '--output -' to be supported in GNU patch > 2.5.9 + # but that hasn't been released as of June 30th, 2009. + + # Rewrite path to status before the patch we want + args=['patch', '--reverse', path] + status,output,error = self._u_invoke(args, stdin=patch) + + if os.path.exists(os.path.join(self.repo, path)) == True: + contents = base.VCS._vcs_get_file_contents(self, path) + else: + contents = '' + + # Now restore path to it's current incarnation + args=['patch', path] + status,output,error = self._u_invoke(args, stdin=patch) + return contents + + def _vcs_path(self, id, revision): + return self._u_find_id(id, revision) + + def _vcs_isdir(self, path, revision): + if self.version_cmp(2, 3, 1) == 1: + # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com + # * add versioned show files functionality (darcs show files -p 'some patch') + status,output,error = self._u_invoke_client( \ + 'show', 'files', '--no-files', '--patch', revision) + children = output.rstrip('\n').splitlines() + rpath = '.' + children = [self._u_rel_path(c, rpath) for c in children] + if path in children: + return True + return False + raise NotImplementedError( + 'Darcs versions <= 2.3.1 lack the --patch option for "show files"') + + def _vcs_listdir(self, path, revision): + if self.version_cmp(2, 3, 1) == 1: + # Sun Nov 15 20:32:06 EST 2009 thomashartman1@gmail.com + # * add versioned show files functionality (darcs show files -p 'some patch') + # Wed Dec 9 05:42:21 EST 2009 Luca Molteni <volothamp@gmail.com> + # * resolve issue835 show file with file directory arguments + path = path.rstrip(os.path.sep) + status,output,error = self._u_invoke_client( \ + 'show', 'files', '--patch', revision, path) + files = output.rstrip('\n').splitlines() + if path == '.': + descendents = [self._u_rel_path(f, path) for f in files + if f != '.'] + else: + descendents = [self._u_rel_path(f, path) for f in files + if f.startswith(path)] + return [f for f in descendents if f.count(os.path.sep) == 0] + # Darcs versions <= 2.3.1 lack the --patch option for 'show files' + raise NotImplementedError + + def _vcs_commit(self, commitfile, allow_empty=False): + id = self.get_user_id() + if id == None or '@' not in id: + id = '%s <%s@invalid.com>' % (id, id) + args = ['record', '--all', '--author', id, '--logfile', commitfile] + status,output,error = self._u_invoke_client(*args) + empty_strings = ['No changes!'] + # work around http://mercurial.selenic.com/bts/issue618 + if self._u_any_in_string(empty_strings, output) == True \ + and len(self.__updated) > 0: + time.sleep(1) + for path in self.__updated: + os.utime(os.path.join(self.repo, path), None) + status,output,error = self._u_invoke_client(*args) + self.__updated = [] + # end work around + if self._u_any_in_string(empty_strings, output) == True: + if allow_empty == False: + raise base.EmptyCommit() + # note that darcs does _not_ make an empty revision. + # this returns the last non-empty revision id... + revision = self._vcs_revision_id(-1) + else: + revline = re.compile("Finished recording patch '(.*)'") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] + return revision + + def _revisions(self): + """ + Return a list of revisions in the repository. + """ + status,output,error = self._u_invoke_client('changes', '--xml') + revisions = [] + xml_str = output.encode('unicode_escape').replace(r'\n', '\n') + element = ElementTree.XML(xml_str) + assert element.tag == 'changelog', element.tag + for patch in element.getchildren(): + assert patch.tag == 'patch', patch.tag + for child in patch.getchildren(): + if child.tag == 'name': + text = unescape(unicode(child.text).decode('unicode_escape').strip()) + revisions.append(text) + revisions.reverse() + return revisions + + def _vcs_revision_id(self, index): + revisions = self._revisions() + try: + if index > 0: + return revisions[index-1] + elif index < 0: + return revisions[index] + else: + return None + except IndexError: + return None + + def _diff(self, revision, path=None, unicode_output=True): + revisions = self._revisions() + i = revisions.index(revision) + args = ['diff', '--unified'] + if i+1 < len(revisions): + next_rev = revisions[i+1] + args.extend(['--from-patch', next_rev]) + if path != None: + args.append(path) + kwargs = {'unicode_output':unicode_output} + status,output,error = self._u_invoke_client( + *args, **kwargs) + return output + + def _parse_diff(self, diff_text): + """_parse_diff(diff_text) -> (new,modified,removed) + + `new`, `modified`, and `removed` are lists of files. + + Example diff text:: + + Mon Jan 18 15:19:30 EST 2010 None <None@invalid.com> + * Final state + diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/modified new-BEtestgQtDuD/.be/dir/bugs/modified + --- old-BEtestgQtDuD/.be/dir/bugs/modified 2010-01-18 15:19:30.000000000 -0500 + +++ new-BEtestgQtDuD/.be/dir/bugs/modified 2010-01-18 15:19:30.000000000 -0500 + @@ -1 +1 @@ + -some value to be modified + \ No newline at end of file + +a new value + \ No newline at end of file + diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/moved new-BEtestgQtDuD/.be/dir/bugs/moved + --- old-BEtestgQtDuD/.be/dir/bugs/moved 2010-01-18 15:19:30.000000000 -0500 + +++ new-BEtestgQtDuD/.be/dir/bugs/moved 1969-12-31 19:00:00.000000000 -0500 + @@ -1 +0,0 @@ + -this entry will be moved + \ No newline at end of file + diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/moved2 new-BEtestgQtDuD/.be/dir/bugs/moved2 + --- old-BEtestgQtDuD/.be/dir/bugs/moved2 1969-12-31 19:00:00.000000000 -0500 + +++ new-BEtestgQtDuD/.be/dir/bugs/moved2 2010-01-18 15:19:30.000000000 -0500 + @@ -0,0 +1 @@ + +this entry will be moved + \ No newline at end of file + diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/new new-BEtestgQtDuD/.be/dir/bugs/new + --- old-BEtestgQtDuD/.be/dir/bugs/new 1969-12-31 19:00:00.000000000 -0500 + +++ new-BEtestgQtDuD/.be/dir/bugs/new 2010-01-18 15:19:30.000000000 -0500 + @@ -0,0 +1 @@ + +this entry is new + \ No newline at end of file + diff -rN --unified old-BEtestgQtDuD/.be/dir/bugs/removed new-BEtestgQtDuD/.be/dir/bugs/removed + --- old-BEtestgQtDuD/.be/dir/bugs/removed 2010-01-18 15:19:30.000000000 -0500 + +++ new-BEtestgQtDuD/.be/dir/bugs/removed 1969-12-31 19:00:00.000000000 -0500 + @@ -1 +0,0 @@ + -this entry will be deleted + \ No newline at end of file + + """ + new = [] + modified = [] + removed = [] + lines = diff_text.splitlines() + repodir = os.path.basename(self.repo) + os.path.sep + i = 0 + while i < len(lines): + line = lines[i]; i += 1 + if not line.startswith('diff '): + continue + file_a,file_b = line.split()[-2:] + assert file_a.startswith('old-'), \ + 'missformed file_a %s' % file_a + assert file_b.startswith('new-'), \ + 'missformed file_a %s' % file_b + file = file_a[4:] + assert file_b[4:] == file, \ + 'diff file missmatch %s != %s' % (file_a, file_b) + assert file.startswith(repodir), \ + 'missformed file_a %s' % file_a + file = file[len(repodir):] + lines_added = 0 + lines_removed = 0 + line = lines[i]; i += 1 + assert line.startswith('--- old-'), \ + 'missformed "---" line %s' % line + time_a = line.split('\t')[1] + line = lines[i]; i += 1 + assert line.startswith('+++ new-'), \ + 'missformed "+++" line %s' % line + time_b = line.split('\t')[1] + zero_time = time.strftime('%Y-%m-%d %H:%M:%S.000000000 ', + time.localtime(0)) + # note that zero_time is missing the trailing timezone offset + if time_a.startswith(zero_time): + new.append(file) + elif time_b.startswith(zero_time): + removed.append(file) + else: + modified.append(file) + return (new,modified,removed) + + def _vcs_changed(self, revision): + return self._parse_diff(self._diff(revision)) + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py new file mode 100644 index 0000000..4df9bc8 --- /dev/null +++ b/libbe/storage/vcs/git.py @@ -0,0 +1,269 @@ +# Copyright (C) 2008-2010 Ben Finney <benf@cybersource.com.au> +# Chris Ball <cjb@laptop.org> +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Git_ backend. + +.. _Git: http://git-scm.com/ +""" + +import os +import os.path +import re +import shutil +import unittest + +import libbe +import libbe.ui.util.user +import base + +if libbe.TESTING == True: + import doctest + import sys + + +def new(): + return Git() + +class Git(base.VCS): + """:class:`base.VCS` implementation for Git. + """ + name='git' + client='git' + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + + def _vcs_version(self): + status,output,error = self._u_invoke_client('--version') + return output.strip() + + def _vcs_get_user_id(self): + status,output,error = \ + self._u_invoke_client('config', 'user.name', expect=(0,1)) + if status == 0: + name = output.rstrip('\n') + else: + name = '' + status,output,error = \ + self._u_invoke_client('config', 'user.email', expect=(0,1)) + if status == 0: + email = output.rstrip('\n') + else: + email = '' + if name != '' or email != '': # got something! + # guess missing info, if necessary + if name == '': + name = libbe.ui.util.user.get_fallback_username() + if email == '': + email = libe.ui.util.user.get_fallback_email() + return libbe.ui.util.user.create_user_id(name, email) + return None # Git has no infomation + + def _vcs_detect(self, path): + if self._u_search_parent_directories(path, '.git') != None : + return True + return False + + def _vcs_root(self, path): + """Find the root of the deepest repository containing path.""" + # Assume that nothing funny is going on; in particular, that we aren't + # dealing with a bare repo. + if os.path.isdir(path) != True: + path = os.path.dirname(path) + status,output,error = self._u_invoke_client('rev-parse', '--git-dir', + cwd=path) + gitdir = os.path.join(path, output.rstrip('\n')) + dirname = os.path.abspath(os.path.dirname(gitdir)) + return dirname + + def _vcs_init(self, path): + self._u_invoke_client('init', cwd=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '.git') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + if os.path.isdir(path): + return + self._u_invoke_client('add', path) + + def _vcs_remove(self, path): + if not os.path.isdir(self._u_abspath(path)): + self._u_invoke_client('rm', '-f', path) + + def _vcs_update(self, path): + self._vcs_add(path) + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + else: + arg = '%s:%s' % (revision,path) + status,output,error = self._u_invoke_client('show', arg) + return output + + def _vcs_path(self, id, revision): + return self._u_find_id(id, revision) + + def _vcs_isdir(self, path, revision): + arg = '%s:%s' % (revision,path) + args = ['ls-tree', arg] + kwargs = {'expect':(0,128)} + status,output,error = self._u_invoke_client(*args, **kwargs) + if status != 0: + if 'not a tree object' in error: + return False + raise base.CommandError(args, status, stderr=error) + return True + + def _vcs_listdir(self, path, revision): + arg = '%s:%s' % (revision,path) + status,output,error = self._u_invoke_client( + 'ls-tree', '--name-only', arg) + return output.rstrip('\n').splitlines() + + def _vcs_commit(self, commitfile, allow_empty=False): + args = ['commit', '--all', '--file', commitfile] + if allow_empty == True: + args.append('--allow-empty') + status,output,error = self._u_invoke_client(*args) + else: + kwargs = {'expect':(0,1)} + status,output,error = self._u_invoke_client(*args, **kwargs) + strings = ['nothing to commit', + 'nothing added to commit'] + if self._u_any_in_string(strings, output) == True: + raise base.EmptyCommit() + full_revision = self._vcs_revision_id(-1) + assert full_revision[:7] in output, \ + 'Mismatched revisions:\n%s\n%s' % (full_revision, output) + return full_revision + + def _vcs_revision_id(self, index): + args = ['rev-list', '--first-parent', '--reverse', 'HEAD'] + kwargs = {'expect':(0,128)} + status,output,error = self._u_invoke_client(*args, **kwargs) + if status == 128: + if error.startswith("fatal: ambiguous argument 'HEAD': unknown "): + return None + raise base.CommandError(args, status, stderr=error) + revisions = output.splitlines() + try: + if index > 0: + return revisions[index-1] + elif index < 0: + return revisions[index] + else: + return None + except IndexError: + return None + + def _diff(self, revision): + status,output,error = self._u_invoke_client('diff', revision) + return output + + def _parse_diff(self, diff_text): + """_parse_diff(diff_text) -> (new,modified,removed) + + `new`, `modified`, and `removed` are lists of files. + + Example diff text:: + + diff --git a/dir/changed b/dir/changed + index 6c3ea8c..2f2f7c7 100644 + --- a/dir/changed + +++ b/dir/changed + @@ -1,3 +1,3 @@ + hi + -there + +everyone and + joe + diff --git a/dir/deleted b/dir/deleted + deleted file mode 100644 + index 225ec04..0000000 + --- a/dir/deleted + +++ /dev/null + @@ -1,3 +0,0 @@ + -in + -the + -beginning + diff --git a/dir/moved b/dir/moved + deleted file mode 100644 + index 5ef102f..0000000 + --- a/dir/moved + +++ /dev/null + @@ -1,4 +0,0 @@ + -the + -ants + -go + -marching + diff --git a/dir/moved2 b/dir/moved2 + new file mode 100644 + index 0000000..5ef102f + --- /dev/null + +++ b/dir/moved2 + @@ -0,0 +1,4 @@ + +the + +ants + +go + +marching + diff --git a/dir/new b/dir/new + new file mode 100644 + index 0000000..94954ab + --- /dev/null + +++ b/dir/new + @@ -0,0 +1,2 @@ + +hello + +world + """ + new = [] + modified = [] + removed = [] + lines = diff_text.splitlines() + for i,line in enumerate(lines): + if not line.startswith('diff '): + continue + file_a,file_b = line.split()[-2:] + assert file_a.startswith('a/'), \ + 'missformed file_a %s' % file_a + assert file_b.startswith('b/'), \ + 'missformed file_a %s' % file_b + file = file_a[2:] + assert file_b[2:] == file, \ + 'diff file missmatch %s != %s' % (file_a, file_b) + if lines[i+1].startswith('new '): + new.append(file) + elif lines[i+1].startswith('index '): + modified.append(file) + elif lines[i+1].startswith('deleted '): + removed.append(file) + return (new,modified,removed) + + def _vcs_changed(self, revision): + return self._parse_diff(self._diff(revision)) + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Git, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py new file mode 100644 index 0000000..9378336 --- /dev/null +++ b/libbe/storage/vcs/hg.py @@ -0,0 +1,257 @@ +# Copyright (C) 2007-2010 Aaron Bentley and Panometrics, Inc. +# Ben Finney <benf@cybersource.com.au> +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Mercurial_ (hg) backend. + +.. _Mercurial: http://mercurial.selenic.com/ +""" + +try: + import mercurial + import mercurial.dispatch + import mercurial.ui +except ImportError: + mercurial = None + +try: + # mercurial >= 1.2 + from mercurial.util import version +except ImportError: + try: + # mercurial <= 1.1.2 + from mercurial.version import get_version as version + except ImportError: + version = None + +import os +import os.path +import re +import shutil +import StringIO +import sys +import time # work around http://mercurial.selenic.com/bts/issue618 + +import libbe +import base + +if libbe.TESTING == True: + import doctest + import unittest + + +def new(): + return Hg() + +class Hg(base.VCS): + """:class:`base.VCS` implementation for Mercurial. + """ + name='hg' + client=None # mercurial module + + def __init__(self, *args, **kwargs): + base.VCS.__init__(self, *args, **kwargs) + self.versioned = True + self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_version(self): + if version == None: + return None + return version() + + def _u_invoke_client(self, *args, **kwargs): + if 'cwd' not in kwargs: + kwargs['cwd'] = self.repo + assert len(kwargs) == 1, kwargs + fullargs = ['--cwd', kwargs['cwd']] + fullargs.extend(args) + stdout = sys.stdout + tmp_stdout = StringIO.StringIO() + sys.stdout = tmp_stdout + cwd = os.getcwd() + mercurial.dispatch.dispatch(fullargs) + os.chdir(cwd) + sys.stdout = stdout + return tmp_stdout.getvalue().rstrip('\n') + + def _vcs_get_user_id(self): + output = self._u_invoke_client( + 'showconfig', 'ui.username').rstrip('\n') + if output != '': + return output + return None + + def _vcs_detect(self, path): + """Detect whether a directory is revision-controlled using Mercurial""" + if self._u_search_parent_directories(path, '.hg') != None: + return True + return False + + def _vcs_root(self, path): + return self._u_invoke_client('root', cwd=path) + + def _vcs_init(self, path): + self._u_invoke_client('init', cwd=path) + + def _vcs_destroy(self): + vcs_dir = os.path.join(self.repo, '.hg') + if os.path.exists(vcs_dir): + shutil.rmtree(vcs_dir) + + def _vcs_add(self, path): + self._u_invoke_client('add', path) + + def _vcs_remove(self, path): + self._u_invoke_client('rm', '--force', path) + + def _vcs_update(self, path): + self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618 + + def _vcs_get_file_contents(self, path, revision=None): + if revision == None: + return base.VCS._vcs_get_file_contents(self, path, revision) + else: + return self._u_invoke_client('cat', '-r', revision, path) + + def _vcs_path(self, id, revision): + manifest = self._u_invoke_client( + 'manifest', '--rev', revision).splitlines() + return self._u_find_id_from_manifest(id, manifest, revision=revision) + + def _vcs_isdir(self, path, revision): + output = self._u_invoke_client('manifest', '--rev', revision) + files = output.splitlines() + if path in files: + return False + return True + + def _vcs_listdir(self, path, revision): + output = self._u_invoke_client('manifest', '--rev', revision) + files = output.splitlines() + path = path.rstrip(os.path.sep) + os.path.sep + return [self._u_rel_path(f, path) for f in files if f.startswith(path)] + + def _vcs_commit(self, commitfile, allow_empty=False): + args = ['commit', '--logfile', commitfile] + output = self._u_invoke_client(*args) + # work around http://mercurial.selenic.com/bts/issue618 + strings = ['nothing changed'] + if self._u_any_in_string(strings, output) == True \ + and len(self.__updated) > 0: + time.sleep(1) + for path in self.__updated: + os.utime(os.path.join(self.repo, path), None) + output = self._u_invoke_client(*args) + self.__updated = [] + # end work around + if allow_empty == False: + strings = ['nothing changed'] + if self._u_any_in_string(strings, output) == True: + raise base.EmptyCommit() + return self._vcs_revision_id(-1) + + def _vcs_revision_id(self, index, style='id'): + if index > 0: + index -= 1 + args = ['identify', '--rev', str(int(index)), '--%s' % style] + output = self._u_invoke_client(*args) + id = output.strip() + if id == '000000000000': + return None # before initial commit. + return id + + def _diff(self, revision): + return self._u_invoke_client( + 'diff', '-r', revision, '--git') + + def _parse_diff(self, diff_text): + """_parse_diff(diff_text) -> (new,modified,removed) + + `new`, `modified`, and `removed` are lists of files. + + Example diff text:: + + diff --git a/.be/dir/bugs/modified b/.be/dir/bugs/modified + --- a/.be/dir/bugs/modified + +++ b/.be/dir/bugs/modified + @@ -1,1 +1,1 @@ some value to be modified + -some value to be modified + \ No newline at end of file + +a new value + \ No newline at end of file + diff --git a/.be/dir/bugs/moved b/.be/dir/bugs/moved + deleted file mode 100644 + --- a/.be/dir/bugs/moved + +++ /dev/null + @@ -1,1 +0,0 @@ + -this entry will be moved + \ No newline at end of file + diff --git a/.be/dir/bugs/moved2 b/.be/dir/bugs/moved2 + new file mode 100644 + --- /dev/null + +++ b/.be/dir/bugs/moved2 + @@ -0,0 +1,1 @@ + +this entry will be moved + \ No newline at end of file + diff --git a/.be/dir/bugs/new b/.be/dir/bugs/new + new file mode 100644 + --- /dev/null + +++ b/.be/dir/bugs/new + @@ -0,0 +1,1 @@ + +this entry is new + \ No newline at end of file + diff --git a/.be/dir/bugs/removed b/.be/dir/bugs/removed + deleted file mode 100644 + --- a/.be/dir/bugs/removed + +++ /dev/null + @@ -1,1 +0,0 @@ + -this entry will be deleted + \ No newline at end of file + """ + new = [] + modified = [] + removed = [] + lines = diff_text.splitlines() + for i,line in enumerate(lines): + if not line.startswith('diff '): + continue + file_a,file_b = line.split()[-2:] + assert file_a.startswith('a/'), \ + 'missformed file_a %s' % file_a + assert file_b.startswith('b/'), \ + 'missformed file_a %s' % file_b + file = file_a[2:] + assert file_b[2:] == file, \ + 'diff file missmatch %s != %s' % (file_a, file_b) + if lines[i+1].startswith('new '): + new.append(file) + elif lines[i+1].startswith('deleted '): + removed.append(file) + else: + modified.append(file) + return (new,modified,removed) + + def _vcs_changed(self, revision): + return self._parse_diff(self._diff(revision)) + + +if libbe.TESTING == True: + base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__]) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |