aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2009-12-13 06:19:23 -0500
committerW. Trevor King <wking@drexel.edu>2009-12-13 06:19:23 -0500
commit4d057dab603f42ec40b911dbee6792dcf107bd14 (patch)
tree9a73459aa160e3c96f4893b132543f412ca6e97f /libbe/storage
parentdff6bd9bf89ca80e2265696a478e540476718c9c (diff)
downloadbugseverywhere-4d057dab603f42ec40b911dbee6792dcf107bd14.tar.gz
Converted libbe.storage.vcs.base to new Storage format.
Diffstat (limited to 'libbe/storage')
-rw-r--r--libbe/storage/__init__.py14
-rw-r--r--libbe/storage/base.py139
-rw-r--r--libbe/storage/util/upgrade.py (renamed from libbe/storage/vcs/util/upgrade.py)26
-rw-r--r--libbe/storage/vcs/__init__.py10
-rw-r--r--libbe/storage/vcs/base.py1105
5 files changed, 651 insertions, 643 deletions
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py
index 9c954ee..5d5b918 100644
--- a/libbe/storage/__init__.py
+++ b/libbe/storage/__init__.py
@@ -5,9 +5,21 @@ import base
ConnectionError = base.ConnectionError
InvalidID = base.InvalidID
InvalidRevision = base.InvalidRevision
+InvalidDirectory = base.InvalidDirectory
NotWriteable = base.NotWriteable
NotReadable = base.NotReadable
EmptyCommit = base.EmptyCommit
+def get_storage(location):
+ """
+ Return a Storage instance from a repo location string.
+ """
+ import vcs
+ #s = vcs.detect_vcs(location)
+ s = vcs.vcs_by_name('None')
+ s.repo = location
+ return s
+
__all__ = [ConnectionError, InvalidID, InvalidRevision,
- NotWriteable, NotReadable, EmptyCommit]
+ InvalidDirectory, NotWriteable, NotReadable,
+ EmptyCommit, get_storage]
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
index eb2b94c..8419796 100644
--- a/libbe/storage/base.py
+++ b/libbe/storage/base.py
@@ -31,6 +31,12 @@ class InvalidID (KeyError):
class InvalidRevision (KeyError):
pass
+class InvalidDirectory (Exception):
+ pass
+
+class DirectoryNotEmpty (InvalidDirectory):
+ pass
+
class NotWriteable (NotSupported):
def __init__(self, msg):
NotSupported.__init__(self, 'write', msg)
@@ -44,7 +50,8 @@ class EmptyCommit(Exception):
Exception.__init__(self, 'No changes to commit')
class Entry (Tree):
- def __init__(self, id, value=None, parent=None, children=None):
+ def __init__(self, id, value=None, parent=None, directory=False,
+ children=None):
if children == None:
Tree.__init__(self)
else:
@@ -53,7 +60,11 @@ class Entry (Tree):
self.value = value
self.parent = parent
if self.parent != None:
+ if self.parent.directory == False:
+ raise InvalidDirectory(
+ 'Non-directory %s cannot have children' % self.parent)
parent.append(self)
+ self.directory = directory
def __str__(self):
return '<Entry %s: %s>' % (self.id, self.value)
@@ -101,7 +112,7 @@ class Storage (object):
"""
name = 'Storage'
- def __init__(self, repo, encoding='utf-8', options=None):
+ def __init__(self, repo='/', encoding='utf-8', options=None):
self.repo = repo
self.encoding = encoding
self.options = options
@@ -113,7 +124,7 @@ class Storage (object):
self.can_init = True
def __str__(self):
- return '<%s %s>' % (self.__class__.__name__, id(self))
+ return '<%s %s %s>' % (self.__class__.__name__, id(self), self.repo)
def __repr__(self):
return str(self)
@@ -139,7 +150,7 @@ class Storage (object):
def _init(self):
f = open(self.repo, 'wb')
- root = Entry(id='__ROOT__')
+ root = Entry(id='__ROOT__', directory=True)
d = {root.id:root}
pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1)
f.close()
@@ -178,7 +189,7 @@ class Storage (object):
f.close()
self._data = None
- def add(self, *args, **kwargs):
+ def add(self, id, *args, **kwargs):
"""Add an entry"""
if self.is_writeable() == False:
raise NotWriteable('Cannot add entry to unwriteable storage.')
@@ -186,13 +197,13 @@ class Storage (object):
self.get(id)
pass # yup, no need to add another
except InvalidID:
- self._add(*args, **kwargs)
+ self._add(id, *args, **kwargs)
- def _add(self, id, parent=None):
+ def _add(self, id, parent=None, directory=False):
if parent == None:
parent = '__ROOT__'
p = self._data[parent]
- self._data[id] = Entry(id, parent=p)
+ self._data[id] = Entry(id, parent=p, directory=directory)
def remove(self, *args, **kwargs):
"""Remove an entry."""
@@ -202,6 +213,9 @@ class Storage (object):
self._remove(*args, **kwargs)
def _remove(self, id):
+ if self._data[id].directory == True \
+ and len(self.children(id)) > 0:
+ raise DirectoryNotEmpty(id)
e = self._data.pop(id)
e.parent.remove(e)
@@ -213,7 +227,7 @@ class Storage (object):
self._recursive_remove(*args, **kwargs)
def _recursive_remove(self, id):
- for entry in self._data[id].traverse():
+ for entry in reversed(list(self._data[id].traverse())):
self._remove(entry.id)
def children(self, *args, **kwargs):
@@ -266,6 +280,9 @@ class Storage (object):
def _set(self, id, value):
if id not in self._data:
raise InvalidID(id)
+ if self._data[id].directory == True:
+ raise InvalidDirectory(
+ 'Directory %s cannot have data' % self.parent)
self._data[id].value = value
class VersionedStorage (Storage):
@@ -283,7 +300,7 @@ class VersionedStorage (Storage):
def _init(self):
f = open(self.repo, 'wb')
- root = Entry(id='__ROOT__')
+ root = Entry(id='__ROOT__', directory=True)
summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit')
body = Entry(id='__COMMIT__BODY__')
initial_commit = {root.id:root, summary.id:summary, body.id:body}
@@ -311,18 +328,21 @@ class VersionedStorage (Storage):
f.close()
self._data = None
- def _add(self, id, parent=None):
+ def _add(self, id, parent=None, directory=False):
if parent == None:
parent = '__ROOT__'
p = self._data[-1][parent]
- self._data[-1][id] = Entry(id, parent=p)
+ self._data[-1][id] = Entry(id, parent=p, directory=directory)
def _remove(self, id):
+ if self._data[-1][id].directory == True \
+ and len(self.children(id)) > 0:
+ raise DirectoryNotEmpty(id)
e = self._data[-1].pop(id)
e.parent.remove(e)
def _recursive_remove(self, id):
- for entry in self._data[-1][id].traverse():
+ for entry in reversed(list(self._data[-1][id].traverse())):
self._remove(entry.id)
def _children(self, id=None, revision=None):
@@ -416,6 +436,7 @@ if TESTING == True:
self.s.disconnect()
self.s.destroy()
self.assert_failed_connect()
+ self.dir.cleanup()
def assert_failed_connect(self):
try:
@@ -440,12 +461,12 @@ if TESTING == True:
"""New repository should be empty."""
self.failUnless(len(self.s.children()) == 0, self.s.children())
- def test_add_rooted(self):
+ def test_add_identical_rooted(self):
"""
Adding entries with the same ID should not increase the number of children.
"""
for i in range(10):
- self.s.add('some id')
+ self.s.add('some id', directory=False)
s = sorted(self.s.children())
self.failUnless(s == ['some id'], s)
@@ -456,7 +477,7 @@ if TESTING == True:
ids = []
for i in range(10):
ids.append(str(i))
- self.s.add(ids[-1])
+ self.s.add(ids[-1], directory=False)
s = sorted(self.s.children())
self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
@@ -464,16 +485,50 @@ if TESTING == True:
"""
Adding entries should increase the number of children (nonrooted).
"""
- self.s.add('parent')
+ self.s.add('parent', directory=True)
ids = []
for i in range(10):
ids.append(str(i))
- self.s.add(ids[-1], 'parent')
+ self.s.add(ids[-1], 'parent', directory=True)
s = sorted(self.s.children('parent'))
self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
s = self.s.children()
self.failUnless(s == ['parent'], s)
-
+
+ def test_children(self):
+ """
+ Non-UUID ids should be returned as such.
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append('parent/%s' % str(i))
+ self.s.add(ids[-1], 'parent', directory=True)
+ s = sorted(self.s.children('parent'))
+ self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+
+ def test_add_invalid_directory(self):
+ """
+ Should not be able to add children to non-directories.
+ """
+ self.s.add('parent', directory=False)
+ try:
+ self.s.add('child', 'parent', directory=False)
+ self.fail(
+ '%s.add() succeeded instead of raising InvalidDirectory'
+ % (vars(self.Class)['name']))
+ except InvalidDirectory:
+ pass
+ try:
+ self.s.add('child', 'parent', directory=True)
+ self.fail(
+ '%s.add() succeeded instead of raising InvalidDirectory'
+ % (vars(self.Class)['name']))
+ except InvalidDirectory:
+ pass
+ self.failUnless(len(self.s.children('parent')) == 0,
+ self.s.children('parent'))
+
def test_remove_rooted(self):
"""
Removing entries should decrease the number of children (rooted).
@@ -481,7 +536,7 @@ if TESTING == True:
ids = []
for i in range(10):
ids.append(str(i))
- self.s.add(ids[-1])
+ self.s.add(ids[-1], directory=True)
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children())
@@ -491,11 +546,11 @@ if TESTING == True:
"""
Removing entries should decrease the number of children (nonrooted).
"""
- self.s.add('parent')
+ self.s.add('parent', directory=True)
ids = []
for i in range(10):
ids.append(str(i))
- self.s.add(ids[-1], 'parent')
+ self.s.add(ids[-1], 'parent', directory=False)
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children('parent'))
@@ -503,17 +558,35 @@ if TESTING == True:
s = self.s.children()
self.failUnless(s == ['parent'], s)
+ def test_remove_directory_not_empty(self):
+ """
+ Removing a non-empty directory entry should raise exception.
+ """
+ self.s.add('parent', directory=True)
+ ids = []
+ for i in range(10):
+ ids.append(str(i))
+ self.s.add(ids[-1], 'parent', directory=True)
+ self.s.remove(ids.pop()) # empty directory removal succeeds
+ try:
+ self.s.remove('parent') # empty directory removal succeeds
+ self.fail(
+ "%s.remove() didn't raise DirectoryNotEmpty"
+ % (vars(self.Class)['name']))
+ except DirectoryNotEmpty:
+ pass
+
def test_recursive_remove(self):
"""
Recursive remove should empty the tree.
"""
- self.s.add('parent')
+ self.s.add('parent', directory=True)
ids = []
for i in range(10):
ids.append(str(i))
- self.s.add(ids[-1], 'parent')
+ self.s.add(ids[-1], 'parent', directory=True)
for j in range(10): # add some grandkids
- self.s.add(str(20*i+j), ids[-i])
+ self.s.add(str(20*(i+1)+j), ids[-1], directory=False)
self.s.recursive_remove('parent')
s = sorted(self.s.children())
self.failUnless(s == [], s)
@@ -549,7 +622,7 @@ if TESTING == True:
"""
Data value should be None before any value has been set.
"""
- self.s.add(self.id)
+ self.s.add(self.id, directory=False)
ret = self.s.get(self.id)
self.failUnless(ret == None,
"%s.get() returned %s not None"
@@ -571,7 +644,7 @@ if TESTING == True:
"""
Set should define the value returned by get.
"""
- self.s.add(self.id)
+ self.s.add(self.id, directory=False)
self.s.set(self.id, self.val)
ret = self.s.get(self.id)
self.failUnless(ret == self.val,
@@ -583,7 +656,7 @@ if TESTING == True:
Set should define the value returned by get.
"""
val = u'Fran\xe7ois'
- self.s.add(self.id)
+ self.s.add(self.id, directory=False)
self.s.set(self.id, val)
ret = self.s.get(self.id, decode=True)
self.failUnless(type(ret) == types.UnicodeType,
@@ -612,7 +685,7 @@ if TESTING == True:
"""
Set should define the value returned by get after reconnect.
"""
- self.s.add(self.id)
+ self.s.add(self.id, directory=False)
self.s.set(self.id, self.val)
self.s.disconnect()
self.s.connect()
@@ -625,11 +698,11 @@ if TESTING == True:
"""
Adding entries should increase the number of children after reconnect.
"""
- self.s.add('parent')
+ self.s.add('parent', directory=True)
ids = []
for i in range(10):
ids.append(str(i))
- self.s.add(ids[-1], 'parent')
+ self.s.add(ids[-1], 'parent', directory=False)
self.s.disconnect()
self.s.connect()
s = sorted(self.s.children('parent'))
@@ -707,7 +780,7 @@ if TESTING == True:
"""
def val(i):
return '%s:%d' % (self.val, i+1)
- self.s.add(self.id)
+ self.s.add(self.id, directory=False)
revs = []
for i in range(10):
self.s.set(self.id, val(i))
@@ -716,7 +789,7 @@ if TESTING == True:
for i in range(10):
ret = self.s.get(self.id, revision=revs[i])
self.failUnless(ret == val(i),
- "%s.get() returned %s not %s for revision %d"
+ "%s.get() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret, val(i), revs[i]))
def make_storage_testcase_subclasses(storage_class, namespace):
diff --git a/libbe/storage/vcs/util/upgrade.py b/libbe/storage/util/upgrade.py
index dc9d54f..7ef760e 100644
--- a/libbe/storage/vcs/util/upgrade.py
+++ b/libbe/storage/util/upgrade.py
@@ -15,17 +15,18 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
-Handle conversion between the various on-disk images.
+Handle conversion between the various BE storage formats.
"""
+import codecs
import os, os.path
import sys
import libbe
-import bug
-import encoding
-import mapfile
-import vcs
+import libbe.bug as bug
+import libbe.util.encoding as encoding
+import libbe.storage.util.mapfile as mapfile
+
if libbe.TESTING == True:
import doctest
@@ -39,30 +40,25 @@ BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0",
BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1]
class Upgrader (object):
- "Class for converting "
+ "Class for converting between different on-disk BE storage formats."
initial_version = None
final_version = None
def __init__(self, root):
self.root = root
- # use the "None" VCS to ensure proper encoding/decoding and
- # simplify path construction.
- self.vcs = vcs.vcs_by_name("None")
- self.vcs.root(self.root)
- self.vcs.encoding = encoding.get_encoding()
def get_path(self, *args):
"""
Return a path relative to .root.
"""
- dir = os.path.join(self.root, ".be")
+ dir = os.path.join(self.root, '.be')
if len(args) == 0:
return dir
- assert args[0] in ["version", "settings", "bugs"], str(args)
+ assert args[0] in ['version', 'settings', 'bugs'], str(args)
return os.path.join(dir, *args)
def check_initial_version(self):
- path = self.get_path("version")
- version = self.vcs.get_file_contents(path).rstrip("\n")
+ path = self.get_path('version')
+ version = self.vcs.get_file_contents(path).rstrip('\n')
assert version == self.initial_version, version
def set_version(self):
diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py
new file mode 100644
index 0000000..ddfb00a
--- /dev/null
+++ b/libbe/storage/vcs/__init__.py
@@ -0,0 +1,10 @@
+# Copyright
+
+import base
+
+set_preferred_vcs = base.set_preferred_vcs
+vcs_by_name = base.vcs_by_name
+detect_vcs = base.detect_vcs
+installed_vcs = base.installed_vcs
+
+__all__ = [set_preferred_vcs, vcs_by_name, detect_vcs, installed_vcs]
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
index f8b0727..8c0ecf5 100644
--- a/libbe/storage/vcs/base.py
+++ b/libbe/storage/vcs/base.py
@@ -29,20 +29,23 @@ import codecs
import os
import os.path
import re
-from socket import gethostname
import shutil
import sys
import tempfile
import libbe
-from utility import Dir, search_parent_directories
-from subproc import CommandError, invoke
-from plugin import get_plugin
+import libbe.storage.base
+import libbe.util.encoding
+from libbe.util.utility import Dir, search_parent_directories
+from libbe.util.subproc import CommandError, invoke
+from libbe.util.plugin import import_by_name
+#import libbe.storage.util.upgrade as upgrade
if libbe.TESTING == True:
import unittest
import doctest
+ import libbe.ui.util.user
# List VCS modules in order of preference.
# Don't list this module, it is implicitly last.
@@ -58,62 +61,243 @@ def set_preferred_vcs(name):
def _get_matching_vcs(matchfn):
"""Return the first module for which matchfn(VCS_instance) is true"""
for submodname in VCS_ORDER:
- module = get_plugin('libbe', submodname)
+ module = import_by_name('libbe.storage.vcs.%s' % submodname)
vcs = module.new()
if matchfn(vcs) == True:
return vcs
vcs.cleanup()
return VCS()
-
+
def vcs_by_name(vcs_name):
"""Return the module for the VCS with the given name"""
+ if vcs_name == VCS.name:
+ return new()
return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
def detect_vcs(dir):
"""Return an VCS instance for the vcs being used in this directory"""
- return _get_matching_vcs(lambda vcs: vcs.detect(dir))
+ return _get_matching_vcs(lambda vcs: vcs._detect(dir))
def installed_vcs():
"""Return an instance of an installed VCS"""
return _get_matching_vcs(lambda vcs: vcs.installed())
-
-class SettingIDnotSupported(NotImplementedError):
- pass
-
-class VCSnotRooted(Exception):
+class VCSnotRooted (libbe.storage.base.ConnectionError):
def __init__(self):
- msg = "VCS not rooted"
- Exception.__init__(self, msg)
-
-class PathNotInRoot(Exception):
- def __init__(self, path, root):
- msg = "Path '%s' not in root '%s'" % (path, root)
- Exception.__init__(self, msg)
+ msg = 'VCS not rooted'
+ libbe.storage.base.ConnectionError.__init__(self, msg)
+
+class InvalidPath (libbe.storage.base.InvalidID):
+ def __init__(self, path, root, msg=None):
+ if msg == None:
+ msg = 'Path "%s" not in root "%s"' % (path, root)
+ libbe.storage.base.InvalidID.__init__(self, msg)
self.path = path
self.root = root
-class NoSuchFile(Exception):
- def __init__(self, pathname, root="."):
+class SpacerCollision (InvalidPath):
+ def __init__(self, path, spacer):
+ msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
+ InvalidPath.__init__(self, path, root=None, msg=msg)
+ self.spacer = spacer
+
+class NoSuchFile (libbe.storage.base.InvalidID):
+ def __init__(self, pathname, root='.'):
path = os.path.abspath(os.path.join(root, pathname))
- Exception.__init__(self, "No such file: %s" % path)
+ libbe.storage.base.InvalidID.__init__(self, 'No such file: %s' % path)
-class EmptyCommit(Exception):
- def __init__(self):
- Exception.__init__(self, "No changes to commit")
+
+class CachedPathID (object):
+ """
+ Storage ID <-> path policy.
+ .../.be/BUGDIR/bugs/BUG/comments/COMMENT
+ ^-- root path
+
+ >>> dir = Dir()
+ >>> os.mkdir(os.path.join(dir.path, '.be'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
+ >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
+ ... 'w').close()
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
+ ... 'w').close()
+ >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
+ ... 'w').close()
+ >>> c = CachedPathID()
+ >>> c.root(dir.path)
+ >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
+ 'def/values'
+ >>> c.init()
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc', 'id-cache']
+ >>> c.connect()
+ >>> c.path('123/values') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/values'
+ >>> c.disconnect()
+ >>> c.destroy()
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc']
+ >>> c.connect() # demonstrate auto init
+ >>> sorted(os.listdir(os.path.join(c._root, '.be')))
+ ['abc', 'id-cache']
+ >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
+ u'.../.be/xyz'
+ >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
+ u'.../.be/xyz/def'
+ >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/comments/qrs'
+ >>> c.disconnect()
+ >>> c.connect()
+ >>> c.path('qrs') # doctest: +ELLIPSIS
+ u'.../.be/abc/bugs/123/comments/qrs'
+ >>> c.remove_id('qrs')
+ >>> c.path('qrs')
+ Traceback (most recent call last):
+ ...
+ InvalidID: 'qrs'
+ >>> c.disconnect()
+ >>> c.destroy()
+ >>> dir.cleanup()
+ """
+ def __init__(self, encoding=None):
+ self.encoding = libbe.util.encoding.get_filesystem_encoding()
+ self._spacer_dirs = ['.be', 'bugs', 'comments']
+
+ def root(self, path):
+ self._root = os.path.abspath(path).rstrip(os.path.sep)
+ self._cache_path = os.path.join(
+ self._root, self._spacer_dirs[0], 'id-cache')
+
+ def init(self):
+ """
+ Create cache file for an existing .be directory.
+ File if multiple lines of the form:
+ UUID\tPATH
+ """
+ self._cache = {}
+ spaced_root = os.path.join(self._root, self._spacer_dirs[0])
+ for dirpath, dirnames, filenames in os.walk(spaced_root):
+ if dirpath == spaced_root:
+ continue
+ try:
+ id = self.id(dirpath)
+ relpath = dirpath[len(self._root)+1:]
+ if id.count('/') == 0:
+ self._cache[id] = relpath
+ except InvalidPath:
+ pass
+ self._changed = True
+ self.disconnect()
+
+ def destroy(self):
+ os.remove(self._cache_path)
+
+ def connect(self):
+ if not os.path.exists(self._cache_path):
+ try:
+ self.init()
+ except IOError:
+ raise libbe.storage.base.ConnectionError
+ self._cache = {} # key: uuid, value: path
+ self._changed = False
+ f = codecs.open(self._cache_path, 'r', self.encoding)
+ for line in f:
+ fields = line.rstrip('\n').split('\t')
+ self._cache[fields[0]] = fields[1]
+ f.close()
+
+ def disconnect(self):
+ if self._changed == True:
+ f = codecs.open(self._cache_path, 'w', self.encoding)
+ for uuid,path in self._cache.items():
+ f.write('%s\t%s\n' % (uuid, path))
+ f.close()
+ self._cache = {}
+
+ def path(self, id, relpath=False):
+ fields = id.split('/', 1)
+ uuid = fields[0]
+ if len(fields) == 1:
+ extra = []
+ else:
+ extra = fields[1:]
+ if uuid not in self._cache:
+ raise libbe.storage.base.InvalidID(uuid)
+ if relpath == True:
+ return os.path.join(self._cache[uuid], *extra)
+ return os.path.join(self._root, self._cache[uuid], *extra)
+
+ def add_id(self, id, parent=None):
+ if id.count('/') > 0:
+ # not a UUID-level path
+ assert id.startswith(parent), \
+ 'Strange ID: "%s" should start with "%s"' % (id, parent)
+ path = self.path(id)
+ elif id in self._cache:
+ # already added
+ path = self.path(id)
+ else:
+ if parent == None:
+ parent_path = ''
+ spacer = self._spacer_dirs[0]
+ else:
+ assert parent.count('/') == 0, \
+ 'Strange parent ID: "%s" should be UUID' % parent
+ parent_path = self.path(parent, relpath=True)
+ parent_spacer = parent_path.split(os.path.sep)[-2]
+ i = self._spacer_dirs.index(parent_spacer)
+ spacer = self._spacer_dirs[i+1]
+ path = os.path.join(parent_path, spacer, id)
+ self._cache[id] = path
+ self._changed = True
+ path = os.path.join(self._root, path)
+ return path
+
+ def remove_id(self, id):
+ if id.count('/') > 0:
+ return # not a UUID-level path
+ self._cache.pop(id)
+ self._changed = True
+
+ def id(self, path):
+ path = os.path.abspath(path)
+ if not path.startswith(self._root + os.path.sep):
+ raise InvalidPath('Path %s not in root %s' % (path, self._root))
+ path = path[len(self._root)+1:]
+ orig_path = path
+ if not path.startswith(self._spacer_dirs[0] + os.path.sep):
+ raise InvalidPath(path, self._spacer_dirs[0])
+ for spacer in self._spacer_dirs:
+ if not path.startswith(spacer + os.path.sep):
+ break
+ id = path[len(spacer)+1:]
+ fields = path[len(spacer)+1:].split(os.path.sep,1)
+ if len(fields) == 1:
+ break
+ path = fields[1]
+ for spacer in self._spacer_dirs:
+ if id.endswith(os.path.sep + spacer):
+ raise SpacerCollision(orig_path, spacer)
+ if os.path.sep != '/':
+ id = id.replace(os.path.sep, '/')
+ return id
def new():
return VCS()
-class VCS(object):
+class VCS (libbe.storage.base.VersionedStorage):
"""
This class implements a 'no-vcs' interface.
Support for other VCSs can be added by subclassing this class, and
overriding methods _vcs_*() with code appropriate for your VCS.
-
+
The methods _u_*() are utility methods available to the _vcs_*()
methods.
@@ -127,7 +311,7 @@ class VCS(object):
/path/to/source/.be
However, you're of in some subdirectory like
/path/to/source/GUI/testing
- and you want to comment on a bug. Setting sink_to_root=True wen
+ and you want to comment on a bug. Setting sink_to_root=True when
you initialize your BugDir will cause it to search for the '.be'
file in the ancestors of the path you passed in as 'root'.
/path/to/source/GUI/testing/.be miss
@@ -245,113 +429,110 @@ class VCS(object):
os.listdir(self.get_path("bugs")):
"""
- name = "None"
- client = "" # command-line tool for _u_invoke_client
- versioned = False
- def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()):
- self.paranoid = paranoid
- self.verboseInvoke = False
- self.rootdir = None
- self._duplicateBasedir = None
- self._duplicateDirname = None
- self.encoding = encoding
- def __str__(self):
- return "<%s %s>" % (self.__class__.__name__, id(self))
- def __repr__(self):
- return str(self)
+ name = 'None'
+ client = 'false' # command-line tool for _u_invoke_client
+
+ def __init__(self, *args, **kwargs):
+ if 'encoding' not in kwargs:
+ kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
+ libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
+ self.versioned = False
+ self.verbose_invoke = False
+ self._cached_path_id = CachedPathID()
+
def _vcs_version(self):
"""
Return the VCS version string.
"""
- return "0.0"
+ return '0'
+
+ def _vcs_get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
+ """
+ return None
+
def _vcs_detect(self, path=None):
"""
Detect whether a directory is revision controlled with this VCS.
"""
return True
+
def _vcs_root(self, path):
"""
Get the VCS root. This is the default working directory for
future invocations. You would normally set this to the root
directory for your VCS.
"""
- if os.path.isdir(path)==False:
+ if os.path.isdir(path) == False:
path = os.path.dirname(path)
- if path == "":
- path = os.path.abspath(".")
+ if path == '':
+ path = os.path.abspath('.')
return path
- def _vcs_init(self, path):
+
+ def _vcs_init(self):
"""
- Begin versioning the tree based at path.
+ Begin versioning the tree based at self.repo.
"""
pass
- def _vcs_cleanup(self):
+
+ def _vcs_destroy(self):
"""
- Remove any cruft that _vcs_init() created outside of the
- versioned tree.
+ Remove any files used in versioning (e.g. whatever _vcs_init()
+ created).
"""
pass
- def _vcs_get_user_id(self):
- """
- Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the VCS has not been configured with a username, return None.
- """
- return None
- def _vcs_set_user_id(self, value):
- """
- Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the VCS has not been configured with a usename, so
- that commits will have a reasonable FROM value.
- """
- raise SettingIDnotSupported
+
def _vcs_add(self, path):
"""
Add the already created file at path to version control.
"""
pass
+
def _vcs_remove(self, path):
"""
Remove the file at path from version control. Optionally
remove the file from the filesystem as well.
"""
pass
+
def _vcs_update(self, path):
"""
Notify the versioning system of changes to the versioned file
at path.
"""
pass
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
+
+ def _vcs_get_file_contents(self, path, revision=None):
"""
Get the file contents as they were in a given revision.
Revision==None specifies the current revision.
"""
- assert revision == None, \
- "The %s VCS does not support revision specifiers" % self.name
- if binary == False:
- f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
- else:
- f = open(os.path.join(self.rootdir, path), "rb")
+ if revision != None:
+ raise libbe.storage.base.InvalidRevision(
+ 'The %s VCS does not support revision specifiers' % self.name)
+ path = os.path.join(self.repo, path)
+ if not os.path.exists(path):
+ return libbe.util.InvalidObject
+ if os.path.isdir(path):
+ return libbe.storage.base.InvalidDirectory
+ f = open(path, 'rb')
contents = f.read()
f.close()
return contents
- def _vcs_duplicate_repo(self, directory, revision=None):
- """
- Get the repository as it was in a given revision.
- revision==None specifies the current revision.
- dir specifies a directory to create the duplicate in.
- """
- shutil.copytree(self.rootdir, directory, True)
+
def _vcs_commit(self, commitfile, allow_empty=False):
"""
Commit the current working directory, using the contents of
commitfile as the comment. Return the name of the old
revision (or None if commits are not supported).
-
+
If allow_empty == False, raise EmptyCommit if there are no
changes to commit.
"""
return None
+
def _vcs_revision_id(self, index):
"""
Return the name of the <index>th revision. Index will be an
@@ -362,11 +543,13 @@ os.listdir(self.get_path("bugs")):
specified revision does not exist.
"""
return None
+
def version(self):
- """Cache version string for efficiency."""
+ # Cache version string for efficiency.
if not hasattr(self, '_version'):
self._version = self._get_version()
return self._version
+
def _get_version(self):
try:
ret = self._vcs_version()
@@ -378,183 +561,166 @@ os.listdir(self.get_path("bugs")):
raise OSError, e
except CommandError:
return None
+
def installed(self):
if self.version() != None:
return True
return False
- def detect(self, path="."):
- """
- Detect whether a directory is revision controlled with this VCS.
- """
- return self._vcs_detect(path)
- def root(self, path):
- """
- Set the root directory to the path's VCS root. This is the
- default working directory for future invocations.
- """
- self.rootdir = self._vcs_root(path)
- def init(self, path):
- """
- Begin versioning the tree based at path.
- Also roots the vcs at path.
- """
- if os.path.isdir(path)==False:
- path = os.path.dirname(path)
- self._vcs_init(path)
- self.root(path)
- def cleanup(self):
- self._vcs_cleanup()
+
def get_user_id(self):
"""
Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the VCS has not been configured with a username, return the user's
- id. You can override the automatic lookup procedure by setting the
+ If the VCS has not been configured with a username, return None.
+ You can override the automatic lookup procedure by setting the
VCS.user_id attribute to a string of your choice.
"""
- if hasattr(self, "user_id"):
- if self.user_id != None:
- return self.user_id
- id = self._vcs_get_user_id()
- if id == None:
- name = self._u_get_fallback_username()
- email = self._u_get_fallback_email()
- id = self._u_create_id(name, email)
- print >> sys.stderr, "Guessing id '%s'" % id
- try:
- self.set_user_id(id)
- except SettingIDnotSupported:
- pass
- return id
- def set_user_id(self, value):
+ if not hasattr(self, 'user_id'):
+ self.user_id = self._vcs_get_user_id()
+ return self.user_id
+
+ def _detect(self, path='.'):
"""
- Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the VCS has not been configured with a usename, so
- that commits will have a reasonable FROM value.
+ Detect whether a directory is revision controlled with this VCS.
"""
- self._vcs_set_user_id(value)
- def add(self, path):
+ return self._vcs_detect(path)
+
+ def root(self):
"""
- Add the already created file at path to version control.
+ Set the root directory to the path's VCS root. This is the
+ default working directory for future invocations.
"""
- self._vcs_add(self._u_rel_path(path))
- def remove(self, path):
+ self.repo = os.path.abspath(self._vcs_root(self.repo))
+ if os.path.isdir(self.repo) == False:
+ self.repo = os.path.dirname(self.repo)
+ self.be_dir = os.path.join(
+ self.repo, self._cached_path_id._spacer_dirs[0])
+ self._cached_path_id.root(self.repo)
+
+ def _init(self):
"""
- Remove a file from both version control and the filesystem.
+ Begin versioning the tree based at self.repo.
+ Also roots the vcs at path.
"""
- self._vcs_remove(self._u_rel_path(path))
+ self.root()
+ self._vcs_init()
+ os.mkdir(self.be_dir)
+ self._vcs_add(self._u_rel_path(self.be_dir))
+ self._cached_path_id.init()
+
+ def _destroy(self):
+ self._vcs_destroy()
+ self._cached_path_id.destroy()
+ shutil.rmtree(self.be_dir)
+
+ def _connect(self):
+ self.root()
+ self._cached_path_id.connect()
+ self.check_disk_version()
+
+ def disconnect(self):
+ self._cached_path_id.disconnect()
+
+ def _add(self, id, parent=None, directory=False):
+ path = self._cached_path_id.add_id(id, parent)
+ relpath = self._u_rel_path(path)
+ reldirs = relpath.split(os.path.sep)
+ if directory == False:
+ reldirs = reldirs[:-1]
+ dir = self.repo
+ for reldir in reldirs:
+ dir = os.path.join(dir, reldir)
+ if not os.path.exists(dir):
+ os.mkdir(dir)
+ self._vcs_add(self._u_rel_path(dir))
+ elif not os.path.isdir(dir):
+ raise libbe.storage.base.InvalidDirectory
+ if directory == False:
+ if not os.path.exists(path):
+ open(path, 'w').close()
+ self._vcs_add(self._u_rel_path(path))
+
+ def _remove(self, id):
+ path = self._cached_path_id.path(id)
if os.path.exists(path):
- os.remove(path)
- def recursive_remove(self, dirname):
- """
- Remove a file/directory and all its decendents from both
- version control and the filesystem.
- """
- if not os.path.exists(dirname):
- raise NoSuchFile(dirname)
- for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
+ if os.path.isdir(path) and len(self.children(id)) > 0:
+ raise libbe.storage.base.DirectoryNotEmpty(id)
+ self._vcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ if os.path.isdir(path):
+ os.rmdir(path)
+ else:
+ os.remove(path)
+ self._cached_path_id.remove_id(id)
+
+ def _recursive_remove(self, id):
+ path = self._cached_path_id.path(id)
+ for dirpath,dirnames,filenames in os.walk(path, topdown=False):
filenames.extend(dirnames)
- for path in filenames:
- fullpath = os.path.join(dirpath, path)
+ for f in filenames:
+ fullpath = os.path.join(dirpath, f)
if os.path.exists(fullpath) == False:
continue
self._vcs_remove(self._u_rel_path(fullpath))
- if os.path.exists(dirname):
- shutil.rmtree(dirname)
- def update(self, path):
- """
- Notify the versioning system of changes to the versioned file
- at path.
- """
- self._vcs_update(self._u_rel_path(path))
- def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False):
- """
- Get the file as it was in a given revision.
- Revision==None specifies the current revision.
-
- allow_no_vcs==True allows direct access to files through
- codecs.open() or open() if the vcs decides it can't handle the
- given path.
- """
- if not os.path.exists(path):
- raise NoSuchFile(path)
- if self._use_vcs(path, allow_no_vcs):
- relpath = self._u_rel_path(path)
- contents = self._vcs_get_file_contents(relpath,revision,binary=binary)
+ if os.path.exists(path):
+ shutil.rmtree(path)
+ path = self._cached_path_id.path(id, relpath=True)
+ for id,p in self._cached_path_id._cache.items():
+ if p.startswith(path):
+ self._cached_path_id.remove_id(id)
+
+ def _children(self, id=None, revision=None):
+ if id==None:
+ path = self.be_dir
else:
- if binary == True:
- f = codecs.open(path, "r", self.encoding)
- else:
- f = open(path, "rb")
- contents = f.read()
- f.close()
+ path = self._cached_path_id.path(id)
+ if os.path.isdir(path) == False:
+ return []
+ children = os.listdir(path)
+ for i,c in enumerate(children):
+ if c in self._cached_path_id._spacer_dirs:
+ children[i] = None
+ children.extend([os.path.join(c, c2) for c2 in
+ os.listdir(os.path.join(path, c))])
+ elif c == 'id-cache':
+ children[i] = None
+ for i,c in enumerate(children):
+ if c == None: continue
+ cpath = os.path.join(path, c)
+ children[i] = self._cached_path_id.id(cpath)
+ return [c for c in children if c != None]
+
+ def _get(self, id, default=libbe.util.InvalidObject, revision=None):
+ try:
+ path = self._cached_path_id.path(id)
+ except libbe.storage.base.InvalidID, e:
+ if default == libbe.util.InvalidObject:
+ raise e
+ return default
+ relpath = self._u_rel_path(path)
+ contents = self._vcs_get_file_contents(relpath,revision)
+ if contents == libbe.storage.base.InvalidDirectory:
+ raise libbe.storage.base.InvalidDirectory(id)
+ elif contents == libbe.util.InvalidObject:
+ raise libbe.storage.base.InvalidID(id)
+ elif len(contents) == 0:
+ return None
return contents
- def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False):
- """
- Set the file contents under version control.
- """
- add = not os.path.exists(path)
- if binary == False:
- f = codecs.open(path, "w", self.encoding)
- else:
- f = open(path, "wb")
- f.write(contents)
- f.close()
-
- if self._use_vcs(path, allow_no_vcs):
- if add:
- self.add(path)
- else:
- self.update(path)
- def mkdir(self, path, allow_no_vcs=False, check_parents=True):
- """
- Create (if neccessary) a directory at path under version
- control.
- """
- if check_parents == True:
- parent = os.path.dirname(path)
- if not os.path.exists(parent): # recurse through parents
- self.mkdir(parent, allow_no_vcs, check_parents)
+
+ def _set(self, id, value):
+ try:
+ path = self._cached_path_id.path(id)
+ except libbe.storage.base.InvalidID, e:
+ raise e
if not os.path.exists(path):
- os.mkdir(path)
- if self._use_vcs(path, allow_no_vcs):
- self.add(path)
- else:
- assert os.path.isdir(path)
- if self._use_vcs(path, allow_no_vcs):
- #self.update(path)# Don't update directories. Changing files
- pass # underneath them should be sufficient.
-
- def duplicate_repo(self, revision=None):
- """
- Get the repository as it was in a given revision.
- revision==None specifies the current revision.
- Return the path to the arbitrary directory at the base of the new repo.
- """
- # Dirname in Basedir to protect against simlink attacks.
- if self._duplicateBasedir == None:
- self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs')
- self._duplicateDirname = \
- os.path.join(self._duplicateBasedir, "duplicate")
- self._vcs_duplicate_repo(directory=self._duplicateDirname,
- revision=revision)
- return self._duplicateDirname
- def remove_duplicate_repo(self):
- """
- Clean up a duplicate repo created with duplicate_repo().
- """
- if self._duplicateBasedir != None:
- shutil.rmtree(self._duplicateBasedir)
- self._duplicateBasedir = None
- self._duplicateDirname = None
- def commit(self, summary, body=None, allow_empty=False):
- """
- Commit the current working directory, with a commit message
- string summary and body. Return the name of the old revision
- (or None if versioning is not supported).
-
- If allow_empty == False (the default), raise EmptyCommit if
- there are no changes to commit.
- """
+ raise libbe.storage.base.InvalidID(id)
+ if os.path.isdir(path):
+ raise libbe.storage.base.InvalidDirectory(id)
+ f = open(path, "wb")
+ f.write(value)
+ f.close()
+ self._vcs_update(self._u_rel_path(path))
+
+ def _commit(self, summary, body=None, allow_empty=False):
summary = summary.strip()+'\n'
if body is not None:
summary += '\n' + body.strip() + '\n'
@@ -564,35 +730,20 @@ os.listdir(self.get_path("bugs")):
temp_file = os.fdopen(descriptor, 'wb')
temp_file.write(summary)
temp_file.flush()
- self.precommit()
revision = self._vcs_commit(filename, allow_empty=allow_empty)
temp_file.close()
- self.postcommit()
finally:
os.remove(filename)
return revision
- def precommit(self):
- """
- Executed before all attempted commits.
- """
- pass
- def postcommit(self):
- """
- Only executed after successful commits.
- """
- pass
- def revision_id(self, index=None):
- """
- Return the name of the <index>th revision. The choice of
- which branch to follow when crossing branches/merges is not
- defined.
- Return None if index==None, revision IDs are not supported, or
- if the specified revision does not exist.
- """
+ def revision_id(self, index=None):
if index == None:
return None
- return self._vcs_revision_id(index)
+ revid = self._vcs_revision_id(index)
+ if revid == None:
+ raise libbe.storage.base.InvalidRevision(index)
+ return revid
+
def _u_any_in_string(self, list, string):
"""
Return True if any of the strings in list are in string.
@@ -602,23 +753,26 @@ os.listdir(self.get_path("bugs")):
if list_string in string:
return True
return False
+
def _u_invoke(self, *args, **kwargs):
if 'cwd' not in kwargs:
- kwargs['cwd'] = self.rootdir
+ kwargs['cwd'] = self.repo
if 'verbose' not in kwargs:
- kwargs['verbose'] = self.verboseInvoke
+ kwargs['verbose'] = self.verbose_invoke
if 'encoding' not in kwargs:
kwargs['encoding'] = self.encoding
return invoke(*args, **kwargs)
+
def _u_invoke_client(self, *args, **kwargs):
cl_args = [self.client]
cl_args.extend(args)
return self._u_invoke(cl_args, **kwargs)
+
def _u_search_parent_directories(self, path, filename):
"""
Find the file (or directory) named filename in path or in any
of path's parents.
-
+
e.g.
search_parent_directories("/a/b/c", ".be")
will return the path to the first existing file from
@@ -629,6 +783,7 @@ os.listdir(self.get_path("bugs")):
or None if none of those files exist.
"""
return search_parent_directories(path, filename)
+
def _use_vcs(self, path, allow_no_vcs):
"""
Try and decide if _vcs_add/update/mkdir/etc calls will
@@ -637,16 +792,17 @@ os.listdir(self.get_path("bugs")):
"""
use_vcs = True
exception = None
- if self.rootdir != None:
+ if self.repo != None:
if self.path_in_root(path) == False:
use_vcs = False
- exception = PathNotInRoot(path, self.rootdir)
+ exception = InvalidPath(path, self.repo)
else:
use_vcs = False
exception = VCSnotRooted
if use_vcs == False and allow_no_vcs==False:
raise exception
return use_vcs
+
def path_in_root(self, path, root=None):
"""
Return the relative path to path from root.
@@ -657,15 +813,16 @@ os.listdir(self.get_path("bugs")):
False
"""
if root == None:
- if self.rootdir == None:
+ if self.repo == None:
raise VCSnotRooted
- root = self.rootdir
+ root = self.repo
path = os.path.abspath(path)
absRoot = os.path.abspath(root)
absRootSlashedDir = os.path.join(absRoot,"")
if not path.startswith(absRootSlashedDir):
return False
return True
+
def _u_rel_path(self, path, root=None):
"""
Return the relative path to path from root.
@@ -674,18 +831,19 @@ os.listdir(self.get_path("bugs")):
'.be'
"""
if root == None:
- if self.rootdir == None:
+ if self.repo == None:
raise VCSnotRooted
- root = self.rootdir
+ root = self.repo
path = os.path.abspath(path)
absRoot = os.path.abspath(root)
absRootSlashedDir = os.path.join(absRoot,"")
if not path.startswith(absRootSlashedDir):
- raise PathNotInRoot(path, absRootSlashedDir)
+ raise InvalidPath(path, absRootSlashedDir)
assert path != absRootSlashedDir, \
"file %s == root directory %s" % (path, absRootSlashedDir)
relpath = path[len(absRootSlashedDir):]
return relpath
+
def _u_abspath(self, path, root=None):
"""
Return the absolute path from a path realtive to root.
@@ -694,60 +852,10 @@ os.listdir(self.get_path("bugs")):
'/a.b/c/.be'
"""
if root == None:
- assert self.rootdir != None, "VCS not rooted"
- root = self.rootdir
+ assert self.repo != None, "VCS not rooted"
+ root = self.repo
return os.path.abspath(os.path.join(root, path))
- def _u_create_id(self, name, email=None):
- """
- >>> vcs = new()
- >>> vcs._u_create_id("John Doe", "jdoe@example.com")
- 'John Doe <jdoe@example.com>'
- >>> vcs._u_create_id("John Doe")
- 'John Doe'
- """
- assert len(name) > 0
- if email == None or len(email) == 0:
- return name
- else:
- return "%s <%s>" % (name, email)
- def _u_parse_id(self, value):
- """
- >>> vcs = new()
- >>> vcs._u_parse_id("John Doe <jdoe@example.com>")
- ('John Doe', 'jdoe@example.com')
- >>> vcs._u_parse_id("John Doe")
- ('John Doe', None)
- >>> try:
- ... vcs._u_parse_id("John Doe <jdoe@example.com><what?>")
- ... except AssertionError:
- ... print "Invalid match"
- Invalid match
- """
- emailexp = re.compile("(.*) <([^>]*)>(.*)")
- match = emailexp.search(value)
- if match == None:
- email = None
- name = value
- else:
- assert len(match.groups()) == 3
- assert match.groups()[2] == "", match.groups()
- email = match.groups()[1]
- name = match.groups()[0]
- assert name != None
- assert len(name) > 0
- return (name, email)
- def _u_get_fallback_username(self):
- name = None
- for envariable in ["LOGNAME", "USERNAME"]:
- if os.environ.has_key(envariable):
- name = os.environ[envariable]
- break
- assert name != None
- return name
- def _u_get_fallback_email(self):
- hostname = gethostname()
- name = self._u_get_fallback_username()
- return "%s@%s" % (name, hostname)
+
def _u_parse_commitfile(self, commitfile):
"""
Split the commitfile created in self.commit() back into
@@ -763,9 +871,9 @@ os.listdir(self.get_path("bugs")):
return (summary, body)
def check_disk_version(self):
- version = self.get_version()
- if version != upgrade.BUGDIR_DISK_VERSION:
- upgrade.upgrade(self.root, version)
+ version = self.version()
+ #if version != upgrade.BUGDIR_DISK_VERSION:
+ # upgrade.upgrade(self.repo, version)
def disk_version(self, path=None, use_none_vcs=False,
for_duplicate_bugdir=False):
@@ -786,39 +894,17 @@ os.listdir(self.get_path("bugs")):
if self.sync_with_disk == False:
raise DiskAccessRequired("set version")
self.vcs.mkdir(self.get_path())
- self.vcs.set_file_contents(self.get_path("version"),
- upgrade.BUGDIR_DISK_VERSION+"\n")
+ #self.vcs.set_file_contents(self.get_path("version"),
+ # upgrade.BUGDIR_DISK_VERSION+"\n")
+
-
if libbe.TESTING == True:
- def setup_vcs_test_fixtures(testcase):
- """Set up test fixtures for VCS test case."""
- testcase.vcs = testcase.Class()
- testcase.dir = Dir()
- testcase.dirname = testcase.dir.path
-
- vcs_not_supporting_uninitialized_user_id = []
- vcs_not_supporting_set_user_id = ["None", "hg"]
- testcase.vcs_supports_uninitialized_user_id = (
- testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id)
- testcase.vcs_supports_set_user_id = (
- testcase.vcs.name not in vcs_not_supporting_set_user_id)
-
- if not testcase.vcs.installed():
- testcase.fail(
- "%(name)s VCS not found" % vars(testcase.Class))
-
- if testcase.Class.name != "None":
- testcase.failIf(
- testcase.vcs.detect(testcase.dirname),
- "Detected %(name)s VCS before initialising"
- % vars(testcase.Class))
-
- testcase.vcs.init(testcase.dirname)
-
- class VCSTestCase(unittest.TestCase):
- """Test cases for base VCS class."""
+ class VCSTestCase (unittest.TestCase):
+ """
+ Test cases for base VCS class (in addition to the Storage test
+ cases).
+ """
Class = VCS
@@ -827,271 +913,102 @@ if libbe.TESTING == True:
self.dirname = None
def setUp(self):
+ """Set up test fixtures for Storage test case."""
super(VCSTestCase, self).setUp()
- setup_vcs_test_fixtures(self)
+ self.dir = Dir()
+ self.dirname = self.dir.path
+ self.s = self.Class(repo=self.dirname)
+ if self.s.installed() == True:
+ self.s.init()
+ self.s.connect()
def tearDown(self):
- self.vcs.cleanup()
- self.dir.cleanup()
super(VCSTestCase, self).tearDown()
+ if self.s.installed() == True:
+ self.s.disconnect()
+ self.s.destroy()
+ self.dir.cleanup()
- def full_path(self, rel_path):
- return os.path.join(self.dirname, rel_path)
-
+ def test_installed(self):
+ """
+ See if the VCS is installed.
+ """
+ self.failUnless(self.s.installed() == True,
+ '%(name)s VCS not found' % vars(self.Class))
- class VCS_init_TestCase(VCSTestCase):
- """Test cases for VCS.init method."""
- def test_detect_should_succeed_after_init(self):
- """Should detect VCS in directory after initialization."""
- self.failUnless(
- self.vcs.detect(self.dirname),
- "Did not detect %(name)s VCS after initialising"
+ class VCS_detection_TestCase (VCSTestCase):
+ def test_detection(self):
+ """
+ See if the VCS detects its installed repository
+ """
+ if self.s.installed():
+ self.s.disconnect()
+ self.failUnless(self.s._detect(self.dirname) == True,
+ 'Did not detected %(name)s VCS after initialising'
% vars(self.Class))
+ self.s.connect()
- def test_vcs_rootdir_in_specified_root_path(self):
+ def test_no_detection(self):
+ """
+ See if the VCS detects its installed repository
+ """
+ if self.s.installed() and self.Class.name != 'None':
+ self.s.disconnect()
+ self.s.destroy()
+ self.failUnless(self.s._detect(self.dirname) == False,
+ 'Detected %(name)s VCS before initialising'
+ % self.Class)
+ self.s.init()
+ self.s.connect()
+
+ def test_vcs_repo_in_specified_root_path(self):
"""VCS root directory should be in specified root path."""
- rp = os.path.realpath(self.vcs.rootdir)
+ rp = os.path.realpath(self.s.repo)
dp = os.path.realpath(self.dirname)
vcs_name = self.Class.name
self.failUnless(
dp == rp or rp == None,
"%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
-
class VCS_get_user_id_TestCase(VCSTestCase):
"""Test cases for VCS.get_user_id method."""
def test_gets_existing_user_id(self):
"""Should get the existing user ID."""
- if not self.vcs_supports_uninitialized_user_id:
- return
-
- user_id = self.vcs.get_user_id()
- self.failUnless(
- user_id is not None,
- "unable to get a user id")
-
-
- class VCS_set_user_id_TestCase(VCSTestCase):
- """Test cases for VCS.set_user_id method."""
-
- def setUp(self):
- super(VCS_set_user_id_TestCase, self).setUp()
-
- if self.vcs_supports_uninitialized_user_id:
- self.prev_user_id = self.vcs.get_user_id()
- else:
- self.prev_user_id = "Uninitialized identity <bogus@example.org>"
-
- if self.vcs_supports_set_user_id:
- self.test_new_user_id = "John Doe <jdoe@example.com>"
- self.vcs.set_user_id(self.test_new_user_id)
-
- def tearDown(self):
- if self.vcs_supports_set_user_id:
- self.vcs.set_user_id(self.prev_user_id)
- super(VCS_set_user_id_TestCase, self).tearDown()
-
- def test_raises_error_in_unsupported_vcs(self):
- """Should raise an error in a VCS that doesn't support it."""
- if self.vcs_supports_set_user_id:
+ user_id = self.s.get_user_id()
+ if user_id == None:
return
- self.assertRaises(
- SettingIDnotSupported,
- self.vcs.set_user_id, "foo")
-
- def test_updates_user_id_in_supporting_vcs(self):
- """Should update the user ID in an VCS that supports it."""
- if not self.vcs_supports_set_user_id:
- return
- user_id = self.vcs.get_user_id()
- self.failUnlessEqual(
- self.test_new_user_id, user_id,
- "user id not set correctly (expected %s, got %s)"
- % (self.test_new_user_id, user_id))
-
-
- def setup_vcs_revision_test_fixtures(testcase):
- """Set up revision test fixtures for VCS test case."""
- testcase.test_dirs = ['a', 'a/b', 'c']
- for path in testcase.test_dirs:
- testcase.vcs.mkdir(testcase.full_path(path))
-
- testcase.test_files = ['a/text', 'a/b/text']
-
- testcase.test_contents = {
- 'rev_1': "Lorem ipsum",
- 'uncommitted': "dolor sit amet",
- }
-
-
- class VCS_mkdir_TestCase(VCSTestCase):
- """Test cases for VCS.mkdir method."""
-
- def setUp(self):
- super(VCS_mkdir_TestCase, self).setUp()
- setup_vcs_revision_test_fixtures(self)
-
- def tearDown(self):
- for path in reversed(sorted(self.test_dirs)):
- self.vcs.recursive_remove(self.full_path(path))
- super(VCS_mkdir_TestCase, self).tearDown()
-
- def test_mkdir_creates_directory(self):
- """Should create specified directory in filesystem."""
- for path in self.test_dirs:
- full_path = self.full_path(path)
- self.failUnless(
- os.path.exists(full_path),
- "path %(full_path)s does not exist" % vars())
-
-
- class VCS_commit_TestCase(VCSTestCase):
- """Test cases for VCS.commit method."""
-
- def setUp(self):
- super(VCS_commit_TestCase, self).setUp()
- setup_vcs_revision_test_fixtures(self)
-
- def tearDown(self):
- for path in reversed(sorted(self.test_dirs)):
- self.vcs.recursive_remove(self.full_path(path))
- super(VCS_commit_TestCase, self).tearDown()
-
- def test_file_contents_as_specified(self):
- """Should set file contents as specified."""
- test_contents = self.test_contents['rev_1']
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(full_path, test_contents)
- current_contents = self.vcs.get_file_contents(full_path)
- self.failUnlessEqual(test_contents, current_contents)
-
- def test_file_contents_as_committed(self):
- """Should have file contents as specified after commit."""
- test_contents = self.test_contents['rev_1']
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(full_path, test_contents)
- revision = self.vcs.commit("Initial file contents.")
- current_contents = self.vcs.get_file_contents(full_path)
- self.failUnlessEqual(test_contents, current_contents)
-
- def test_file_contents_as_set_when_uncommitted(self):
- """Should set file contents as specified after commit."""
- if not self.vcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Initial file contents.")
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- current_contents = self.vcs.get_file_contents(full_path)
- self.failUnlessEqual(
- self.test_contents['uncommitted'], current_contents)
-
- def test_revision_file_contents_as_committed(self):
- """Should get file contents as committed to specified revision."""
- if not self.vcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Initial file contents.")
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- committed_contents = self.vcs.get_file_contents(
- full_path, revision)
- self.failUnlessEqual(
- self.test_contents['rev_1'], committed_contents)
-
- def test_revision_id_as_committed(self):
- """Check for compatibility between .commit() and .revision_id()"""
- if not self.vcs.versioned:
- self.failUnlessEqual(self.vcs.revision_id(5), None)
- return
- committed_revisions = []
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Initial %s contents." % path)
- committed_revisions.append(revision)
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- revision = self.vcs.commit("Altered %s contents." % path)
- committed_revisions.append(revision)
- for i,revision in enumerate(committed_revisions):
- self.failUnlessEqual(self.vcs.revision_id(i), revision)
- i += -len(committed_revisions) # check negative indices
- self.failUnlessEqual(self.vcs.revision_id(i), revision)
- i = len(committed_revisions)
- self.failUnlessEqual(self.vcs.revision_id(i), None)
- self.failUnlessEqual(self.vcs.revision_id(-i-1), None)
-
- def test_revision_id_as_committed(self):
- """Check revision id before first commit"""
- if not self.vcs.versioned:
- self.failUnlessEqual(self.vcs.revision_id(5), None)
- return
- committed_revisions = []
- for path in self.test_files:
- self.failUnlessEqual(self.vcs.revision_id(0), None)
-
-
- class VCS_duplicate_repo_TestCase(VCSTestCase):
- """Test cases for VCS.duplicate_repo method."""
-
- def setUp(self):
- super(VCS_duplicate_repo_TestCase, self).setUp()
- setup_vcs_revision_test_fixtures(self)
-
- def tearDown(self):
- self.vcs.remove_duplicate_repo()
- for path in reversed(sorted(self.test_dirs)):
- self.vcs.recursive_remove(self.full_path(path))
- super(VCS_duplicate_repo_TestCase, self).tearDown()
-
- def test_revision_file_contents_as_committed(self):
- """Should match file contents as committed to specified revision.
- """
- if not self.vcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Commit current status")
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- dup_repo_path = self.vcs.duplicate_repo(revision)
- dup_file_path = os.path.join(dup_repo_path, path)
- dup_file_contents = file(dup_file_path, 'rb').read()
- self.failUnlessEqual(
- self.test_contents['rev_1'], dup_file_contents)
- self.vcs.remove_duplicate_repo()
-
-
- def make_vcs_testcase_subclasses(vcs_class, namespace):
- """Make VCSTestCase subclasses for vcs_class in the namespace."""
- vcs_testcase_classes = [
- c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
- if issubclass(c, VCSTestCase)]
-
- for base_class in vcs_testcase_classes:
- testcase_class_name = vcs_class.__name__ + base_class.__name__
- testcase_class_bases = (base_class,)
- testcase_class_dict = dict(base_class.__dict__)
- testcase_class_dict['Class'] = vcs_class
- testcase_class = type(
- testcase_class_name, testcase_class_bases, testcase_class_dict)
- setattr(namespace, testcase_class_name, testcase_class)
-
+ name,email = libbe.ui.util.user.parse_user_id(user_id)
+ if email != None:
+ self.failUnless('@' in email, email)
+
+ def make_vcs_testcase_subclasses(storage_class, namespace):
+ c = storage_class()
+ if c.versioned == True:
+ libbe.storage.base.make_versioned_storage_testcase_subclasses(
+ storage_class, namespace)
+ else:
+ libbe.storage.base.make_storage_testcase_subclasses(
+ storage_class, namespace)
+
+ if namespace != sys.modules[__name__]:
+ # Make VCSTestCase subclasses for vcs_class in the namespace.
+ vcs_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, VCSTestCase)]
+
+ for base_class in vcs_testcase_classes:
+ testcase_class_name = vcs_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = vcs_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+ make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])