aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/storage')
-rw-r--r--libbe/storage/__init__.py2
-rw-r--r--libbe/storage/base.py94
-rw-r--r--libbe/storage/util/config.py8
-rw-r--r--libbe/storage/util/mapfile.py4
-rw-r--r--libbe/storage/util/properties.py150
-rw-r--r--libbe/storage/util/settings_object.py184
-rw-r--r--libbe/storage/util/upgrade.py31
-rw-r--r--libbe/storage/vcs/__init__.py2
-rw-r--r--libbe/storage/vcs/base.py38
-rw-r--r--libbe/storage/vcs/bzr.py30
-rw-r--r--libbe/storage/vcs/darcs.py6
-rw-r--r--libbe/storage/vcs/git.py6
-rw-r--r--libbe/storage/vcs/hg.py6
13 files changed, 279 insertions, 282 deletions
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py
index 01885e9..baed80b 100644
--- a/libbe/storage/__init__.py
+++ b/libbe/storage/__init__.py
@@ -53,7 +53,7 @@ STORAGE_VERSIONS = ['Bugs Everywhere Tree 1 0',
STORAGE_VERSION = STORAGE_VERSIONS[-1]
def get_vcs_storage(location):
- import vcs
+ from . import vcs
s = vcs.detect_vcs(location)
s.repo = location
return s
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
index 977179d..4c05287 100644
--- a/libbe/storage/base.py
+++ b/libbe/storage/base.py
@@ -198,7 +198,7 @@ class Storage (object):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
root = Entry(id='__ROOT__', directory=True)
d = {root.id:root}
- pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1)
+ pickle.dump(dict((k,v._objects_to_ids()) for k,v in list(d.items())), f, -1)
f.close()
def destroy(self):
@@ -223,7 +223,7 @@ class Storage (object):
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
- self._data = dict((k,v._ids_to_objects(d)) for k,v in d.items())
+ self._data = dict((k,v._ids_to_objects(d)) for k,v in list(d.items()))
f.close()
def disconnect(self):
@@ -238,7 +238,7 @@ class Storage (object):
def _disconnect(self):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump(dict((k,v._objects_to_ids())
- for k,v in self._data.items()), f, -1)
+ for k,v in list(self._data.items())), f, -1)
f.close()
self._data = None
@@ -336,9 +336,9 @@ class Storage (object):
decode = False
value = self._get(*args, **kwargs)
if value != None:
- if decode == True and type(value) != types.UnicodeType:
- return unicode(value, self.encoding)
- elif decode == False and type(value) != types.StringType:
+ if decode == True and type(value) != str:
+ return str(value, self.encoding)
+ elif decode == False and type(value) != bytes:
return value.encode(self.encoding)
return value
@@ -355,7 +355,7 @@ class Storage (object):
"""
if self.is_writeable() == False:
raise NotWriteable('Cannot set entry in unwriteable storage.')
- if type(value) == types.UnicodeType:
+ if type(value) == str:
value = value.encode(self.encoding)
self._set(id, value, *args, **kwargs)
@@ -386,7 +386,7 @@ class VersionedStorage (Storage):
summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit')
body = Entry(id='__COMMIT__BODY__')
initial_commit = {root.id:root, summary.id:summary, body.id:body}
- d = dict((k,v._objects_to_ids()) for k,v in initial_commit.items())
+ d = dict((k,v._objects_to_ids()) for k,v in list(initial_commit.items()))
pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree]
f.close()
@@ -396,14 +396,14 @@ class VersionedStorage (Storage):
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
- self._data = [dict((k,v._ids_to_objects(t)) for k,v in t.items())
+ self._data = [dict((k,v._ids_to_objects(t)) for k,v in list(t.items()))
for t in d]
f.close()
def _disconnect(self):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump([dict((k,v._objects_to_ids())
- for k,v in t.items()) for t in self._data], f, -1)
+ for k,v in list(t.items())) for t in self._data], f, -1)
f.close()
self._data = None
@@ -527,7 +527,7 @@ class VersionedStorage (Storage):
new = []
modified = []
removed = []
- for id,value in self._data[int(revision)].items():
+ for id,value in list(self._data[int(revision)].items()):
if id.startswith('__'):
continue
if not id in self._data[-1]:
@@ -624,7 +624,7 @@ if TESTING == True:
def test_initially_empty(self):
"""New repository should be empty."""
- self.failUnless(len(self.s.children()) == 0, self.s.children())
+ self.assertTrue(len(self.s.children()) == 0, self.s.children())
def test_add_identical_rooted(self):
"""Adding entries with the same ID should not increase the number of children.
@@ -632,7 +632,7 @@ if TESTING == True:
for i in range(10):
self.s.add('some id', directory=False)
s = sorted(self.s.children())
- self.failUnless(s == ['some id'], s)
+ self.assertTrue(s == ['some id'], s)
def test_add_rooted(self):
"""Adding entries should increase the number of children (rooted).
@@ -642,7 +642,7 @@ if TESTING == True:
ids.append(str(i))
self.s.add(ids[-1], directory=(i % 2 == 0))
s = sorted(self.s.children())
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_nonrooted(self):
"""Adding entries should increase the number of children (nonrooted).
@@ -653,9 +653,9 @@ if TESTING == True:
ids.append(str(i))
self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
def test_ancestors(self):
"""Check ancestors lists.
@@ -668,7 +668,7 @@ if TESTING == True:
j_id = str(20*(i+1)+j)
self.s.add(j_id, i_id, directory=(i%2 == 0))
ancestors = sorted(self.s.ancestors(j_id))
- self.failUnless(ancestors == [i_id, 'parent'],
+ self.assertTrue(ancestors == [i_id, 'parent'],
'Unexpected ancestors for %s/%s, "%s"'
% (i_id, j_id, ancestors))
@@ -681,7 +681,7 @@ if TESTING == True:
ids.append('parent/%s' % str(i))
self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_grandchildren(self):
"""Grandchildren should not be returned as children.
@@ -699,7 +699,7 @@ if TESTING == True:
directory = (j % 2 == 0)
self.s.add(grandchild, child, directory=directory)
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_invalid_directory(self):
"""Should not be able to add children to non-directories.
@@ -719,7 +719,7 @@ if TESTING == True:
% (vars(self.Class)['name']))
except InvalidDirectory:
pass
- self.failUnless(len(self.s.children('parent')) == 0,
+ self.assertTrue(len(self.s.children('parent')) == 0,
self.s.children('parent'))
def test_remove_rooted(self):
@@ -732,7 +732,7 @@ if TESTING == True:
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children())
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_remove_nonrooted(self):
"""Removing entries should decrease the number of children (nonrooted).
@@ -745,10 +745,10 @@ if TESTING == True:
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
if len(s) > 0:
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
def test_remove_directory_not_empty(self):
"""Removing a non-empty directory entry should raise exception.
@@ -778,7 +778,7 @@ if TESTING == True:
self.s.add(str(20*(i+1)+j), ids[-1], directory=(i%2 == 0))
self.s.recursive_remove('parent')
s = sorted(self.s.children())
- self.failUnless(s == [], s)
+ self.assertTrue(s == [], s)
class Storage_get_set_TestCase (StorageTestCase):
"""Test cases for Storage.get and .set methods."""
@@ -790,7 +790,7 @@ if TESTING == True:
"""Get should return specified default if id not in Storage.
"""
ret = self.s.get(self.id, default=self.val)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
@@ -811,7 +811,7 @@ if TESTING == True:
self.s.add(self.id, directory=False)
val = 'UNLIKELY DEFAULT'
ret = self.s.get(self.id, default=val)
- self.failUnless(ret == val,
+ self.assertTrue(ret == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, val))
@@ -832,29 +832,29 @@ if TESTING == True:
self.s.add(self.id, directory=False)
self.s.set(self.id, self.val)
ret = self.s.get(self.id)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
def test_unicode_set(self):
"""Set should define the value returned by get.
"""
- val = u'Fran\xe7ois'
+ val = 'Fran\xe7ois'
self.s.add(self.id, directory=False)
self.s.set(self.id, val)
ret = self.s.get(self.id, decode=True)
- self.failUnless(type(ret) == types.UnicodeType,
+ self.assertTrue(type(ret) == str,
"%s.get() returned %s not UnicodeType"
% (vars(self.Class)['name'], type(ret)))
- self.failUnless(ret == val,
+ self.assertTrue(ret == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
ret = self.s.get(self.id)
- self.failUnless(type(ret) == types.StringType,
+ self.assertTrue(type(ret) == bytes,
"%s.get() returned %s not StringType"
% (vars(self.Class)['name'], type(ret)))
- s = unicode(ret, self.s.encoding)
- self.failUnless(s == val,
+ s = str(ret, self.s.encoding)
+ self.assertTrue(s == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], s, self.val))
@@ -873,7 +873,7 @@ if TESTING == True:
self.s.disconnect()
self.s.connect()
ret = self.s.get(self.id)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
@@ -886,7 +886,7 @@ if TESTING == True:
self.s.connect()
default = 'UNLIKELY DEFAULT'
ret = self.s.get(self.id, default=default)
- self.failUnless(ret in ['', default],
+ self.assertTrue(ret in ['', default],
"%s.get() returned %s not in %s"
% (vars(self.Class)['name'], ret, ['', default]))
@@ -901,9 +901,9 @@ if TESTING == True:
self.s.disconnect()
self.s.connect()
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
class VersionedStorageTestCase (StorageTestCase):
"""Test cases for VersionedStorage methods."""
@@ -972,12 +972,12 @@ if TESTING == True:
self.commit_body))
for i in range(10):
rev = self.s.revision_id(i+1)
- self.failUnless(rev == revs[i],
+ self.assertTrue(rev == revs[i],
"%s.revision_id(%d) returned %s not %s"
% (vars(self.Class)['name'], i+1, rev, revs[i]))
for i in range(-1, -9, -1):
rev = self.s.revision_id(i)
- self.failUnless(rev == revs[i],
+ self.assertTrue(rev == revs[i],
"%s.revision_id(%d) returned %s not %s"
% (vars(self.Class)['name'], i, rev, revs[i]))
@@ -994,7 +994,7 @@ if TESTING == True:
self.commit_body))
for i in range(10):
ret = self.s.get(self.id, revision=revs[i])
- self.failUnless(ret == val(i),
+ self.assertTrue(ret == val(i),
"%s.get() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret, val(i), revs[i]))
@@ -1015,7 +1015,7 @@ if TESTING == True:
children.append(list(cur_children))
for i in range(10):
ret = sorted(self.s.children('parent', revision=revs[i]))
- self.failUnless(ret == children[i],
+ self.assertTrue(ret == children[i],
"%s.children() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret,
children[i], revs[i]))
@@ -1047,7 +1047,7 @@ if TESTING == True:
self.commit_body))
for rev,cur_children in zip(revs, children):
ret = sorted(self.s.children('parent', revision=rev))
- self.failUnless(ret == cur_children,
+ self.assertTrue(ret == cur_children,
"%s.children() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret,
cur_children, rev))
@@ -1074,18 +1074,18 @@ if TESTING == True:
self.s.remove('removed')
revB = self.s.commit('Final state')
new,mod,rem = self.s.changed(revA)
- self.failUnless(sorted(new) == ['moved2', 'new'],
+ self.assertTrue(sorted(new) == ['moved2', 'new'],
'Unexpected new: %s' % new)
- self.failUnless(mod == ['modified'],
+ self.assertTrue(mod == ['modified'],
'Unexpected modified: %s' % mod)
- self.failUnless(sorted(rem) == ['moved', 'removed'],
+ self.assertTrue(sorted(rem) == ['moved', 'removed'],
'Unexpected removed: %s' % rem)
def make_storage_testcase_subclasses(storage_class, namespace):
"""Make StorageTestCase subclasses for storage_class in namespace."""
storage_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if issubclass(c, StorageTestCase) \
and c.Class == Storage]
@@ -1102,7 +1102,7 @@ if TESTING == True:
"""Make VersionedStorageTestCase subclasses for storage_class in namespace."""
storage_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if ((issubclass(c, StorageTestCase) \
and c.Class == Storage)
or
diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py
index 4945b1f..f53f5b2 100644
--- a/libbe/storage/util/config.py
+++ b/libbe/storage/util/config.py
@@ -21,7 +21,7 @@
"""Create, save, and load the per-user config file at :py:func:`path`.
"""
-import ConfigParser
+import configparser
import codecs
import os
import os.path
@@ -71,7 +71,7 @@ def set_val(name, value, section="DEFAULT", encoding=None):
"""
if encoding == None:
encoding = default_encoding
- config = ConfigParser.ConfigParser()
+ config = configparser.ConfigParser()
if os.path.exists(path()) == False: # touch file or config
open(path(), 'w').close() # read chokes on missing file
f = codecs.open(path(), 'r', encoding)
@@ -114,13 +114,13 @@ def get_val(name, section="DEFAULT", default=None, encoding=None):
if os.path.exists(path()):
if encoding == None:
encoding = default_encoding
- config = ConfigParser.ConfigParser()
+ config = configparser.ConfigParser()
f = codecs.open(path(), 'r', encoding)
config.readfp(f, path())
f.close()
try:
return config.get(section, name)
- except ConfigParser.NoOptionError:
+ except configparser.NoOptionError:
return default
else:
return default
diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py
index 1c37e3f..609f782 100644
--- a/libbe/storage/util/mapfile.py
+++ b/libbe/storage/util/mapfile.py
@@ -70,7 +70,7 @@ def generate(map, context=6):
the :py:mod:`~libbe.command.serve_commands` where merging is not
important, the amount of context is controllable.
- >>> sys.stdout.write(generate({'q':u'Fran\u00e7ais'}, context=0))
+ >>> sys.stdout.write(generate({'q':u'Fran\\u00e7ais'}, context=0))
{
"q": "Fran\\u00e7ais"
}
@@ -113,7 +113,7 @@ def parse(contents):
u'd'
>>> dict['e']
u'f'
- >>> contents = generate({'q':u'Fran\u00e7ais'})
+ >>> contents = generate({'q':u'Fran\\u00e7ais'})
>>> dict = parse(contents)
>>> dict['q']
u'Fran\\xe7ais'
diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py
index 77c0162..4aa3ce6 100644
--- a/libbe/storage/util/properties.py
+++ b/libbe/storage/util/properties.py
@@ -415,7 +415,7 @@ if libbe.TESTING == True:
@doc_property("A fancy property")
def x():
return {}
- self.failUnless(Test.x.__doc__ == "A fancy property",
+ self.assertTrue(Test.x.__doc__ == "A fancy property",
Test.x.__doc__)
def testLocalProperty(self):
class Test(object):
@@ -424,11 +424,11 @@ if libbe.TESTING == True:
def x():
return {}
t = Test()
- self.failUnless(t.x == None, str(t.x))
+ self.assertTrue(t.x == None, str(t.x))
t.x = 'z' # the first set initializes ._LOCAL_value
- self.failUnless(t.x == 'z', str(t.x))
- self.failUnless("_LOCAL_value" in dir(t), dir(t))
- self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ self.assertTrue(t.x == 'z', str(t.x))
+ self.assertTrue("_LOCAL_value" in dir(t), dir(t))
+ self.assertTrue(t._LOCAL_value == 'z', t._LOCAL_value)
def testSettingsProperty(self):
class Test(object):
@Property
@@ -438,11 +438,11 @@ if libbe.TESTING == True:
def __init__(self):
self.settings = {}
t = Test()
- self.failUnless(t.x == None, str(t.x))
+ self.assertTrue(t.x == None, str(t.x))
t.x = 'z' # the first set initializes ._LOCAL_value
- self.failUnless(t.x == 'z', str(t.x))
- self.failUnless("attr" in t.settings, t.settings)
- self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ self.assertTrue(t.x == 'z', str(t.x))
+ self.assertTrue("attr" in t.settings, t.settings)
+ self.assertTrue(t.settings["attr"] == 'z', t.settings["attr"])
def testDefaultingLocalProperty(self):
class Test(object):
@Property
@@ -450,15 +450,15 @@ if libbe.TESTING == True:
@local_property(name="DEFAULT", null=5)
def x(): return {}
t = Test()
- self.failUnless(t.x == 5, str(t.x))
+ self.assertTrue(t.x == 5, str(t.x))
t.x = 'x'
- self.failUnless(t.x == 'y', str(t.x))
+ self.assertTrue(t.x == 'y', str(t.x))
t.x = 'y'
- self.failUnless(t.x == 'y', str(t.x))
+ self.assertTrue(t.x == 'y', str(t.x))
t.x = 'z'
- self.failUnless(t.x == 'z', str(t.x))
+ self.assertTrue(t.x == 'z', str(t.x))
t.x = 5
- self.failUnless(t.x == 5, str(t.x))
+ self.assertTrue(t.x == 5, str(t.x))
def testCheckedLocalProperty(self):
class Test(object):
@Property
@@ -468,13 +468,13 @@ if libbe.TESTING == True:
def __init__(self):
self._CHECKED_value = 'x'
t = Test()
- self.failUnless(t.x == 'x', str(t.x))
+ self.assertTrue(t.x == 'x', str(t.x))
try:
t.x = None
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
def testTwoCheckedLocalProperties(self):
class Test(object):
@Property
@@ -493,18 +493,18 @@ if libbe.TESTING == True:
try:
t.x = 'a'
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
t.x = 'x'
t.x = 'y'
t.x = 'z'
try:
t.a = 'x'
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
t.a = 'a'
t.a = 'b'
t.a = 'c'
@@ -517,13 +517,13 @@ if libbe.TESTING == True:
def __init__(self):
self._CHECKED_value = 'x'
t = Test()
- self.failUnless(t.x == 'x', str(t.x))
+ self.assertTrue(t.x == 'x', str(t.x))
try:
t.x = None
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
def testCachedLocalProperty(self):
class Gen(object):
def __init__(self):
@@ -537,30 +537,30 @@ if libbe.TESTING == True:
@local_property(name="CACHED")
def x(): return {}
t = Test()
- self.failIf("_CACHED_cache" in dir(t),
+ self.assertFalse("_CACHED_cache" in dir(t),
getattr(t, "_CACHED_cache", None))
- self.failUnless(t.x == 1, t.x)
- self.failUnless(t.x == 1, t.x)
- self.failUnless(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
t.x = 8
- self.failUnless(t.x == 8, t.x)
- self.failUnless(t.x == 8, t.x)
+ self.assertTrue(t.x == 8, t.x)
+ self.assertTrue(t.x == 8, t.x)
t._CACHED_cache = False # Caching is off, but the stored value
val = t.x # is 8, not the initVal (None), so we
- self.failUnless(val == 8, val) # get 8.
+ self.assertTrue(val == 8, val) # get 8.
t._CACHED_value = None # Now we've set the stored value to None
val = t.x # so future calls to fget (like this)
- self.failUnless(val == 2, val) # will call the generator every time...
+ self.assertTrue(val == 2, val) # will call the generator every time...
val = t.x
- self.failUnless(val == 3, val)
+ self.assertTrue(val == 3, val)
val = t.x
- self.failUnless(val == 4, val)
+ self.assertTrue(val == 4, val)
t._CACHED_cache = True # We turn caching back on, and get
- self.failUnless(t.x == 1, str(t.x)) # the original cached value.
+ self.assertTrue(t.x == 1, str(t.x)) # the original cached value.
del t._CACHED_cached_value # Removing that value forces a
- self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
- self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
- self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
+ self.assertTrue(t.x == 5, str(t.x)) # single cache-regenerating call
+ self.assertTrue(t.x == 5, str(t.x)) # to the genenerator, after which
+ self.assertTrue(t.x == 5, str(t.x)) # we get the new cached value.
def testPrimedLocalProperty(self):
class Test(object):
def prime(self):
@@ -573,23 +573,23 @@ if libbe.TESTING == True:
self.settings={}
self.primeVal = "initialized"
t = Test()
- self.failIf("_PRIMED_prime" in dir(t),
+ self.assertFalse("_PRIMED_prime" in dir(t),
getattr(t, "_PRIMED_prime", None))
- self.failUnless(t.x == "initialized", t.x)
+ self.assertTrue(t.x == "initialized", t.x)
t.x = 1
- self.failUnless(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
t.x = None
- self.failUnless(t.x == "initialized", t.x)
+ self.assertTrue(t.x == "initialized", t.x)
t._PRIMED_prime = True
t.x = 3
- self.failUnless(t.x == "initialized", t.x)
+ self.assertTrue(t.x == "initialized", t.x)
t._PRIMED_prime = False
t.x = 3
- self.failUnless(t.x == 3, t.x)
+ self.assertTrue(t.x == 3, t.x)
# test unprimableVal
t.x = None
t.primeVal = None
- self.failUnless(t.x == 2, t.x)
+ self.assertTrue(t.x == 2, t.x)
def testChangeHookLocalProperty(self):
class Test(object):
def _hook(self, old, new):
@@ -602,14 +602,14 @@ if libbe.TESTING == True:
def x(): return {}
t = Test()
t.x = 1
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == 1, t.new)
+ self.assertTrue(t.old == None, t.old)
+ self.assertTrue(t.new == 1, t.new)
t.x = 1
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == 1, t.new)
+ self.assertTrue(t.old == None, t.old)
+ self.assertTrue(t.new == 1, t.new)
t.x = 2
- self.failUnless(t.old == 1, t.old)
- self.failUnless(t.new == 2, t.new)
+ self.assertTrue(t.old == 1, t.old)
+ self.assertTrue(t.new == 2, t.new)
def testChangeHookMutableProperty(self):
class Test(object):
def _hook(self, old, new):
@@ -624,45 +624,45 @@ if libbe.TESTING == True:
t = Test()
t.hook_calls = 0
t.x = []
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == [], t.new)
- self.failUnless(t.hook_calls == 1, t.hook_calls)
+ self.assertTrue(t.old == None, t.old)
+ self.assertTrue(t.new == [], t.new)
+ self.assertTrue(t.hook_calls == 1, t.hook_calls)
a = t.x
a.append(5)
t.x = a
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 2, t.hook_calls)
+ self.assertTrue(t.old == [], t.old)
+ self.assertTrue(t.new == [5], t.new)
+ self.assertTrue(t.hook_calls == 2, t.hook_calls)
t.x = []
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [], t.new)
- self.failUnless(t.hook_calls == 3, t.hook_calls)
+ self.assertTrue(t.old == [5], t.old)
+ self.assertTrue(t.new == [], t.new)
+ self.assertTrue(t.hook_calls == 3, t.hook_calls)
# now append without reassigning. this doesn't trigger the
# change, since we don't ever set t.x, only get it and mess
# with it. It does, however, update our t.new, since t.new =
# t.x and is not a static copy.
t.x.append(5)
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 3, t.hook_calls)
+ self.assertTrue(t.old == [5], t.old)
+ self.assertTrue(t.new == [5], t.new)
+ self.assertTrue(t.hook_calls == 3, t.hook_calls)
# however, the next t.x get _will_ notice the change...
a = t.x
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 4, t.hook_calls)
+ self.assertTrue(t.old == [], t.old)
+ self.assertTrue(t.new == [5], t.new)
+ self.assertTrue(t.hook_calls == 4, t.hook_calls)
t.x.append(6) # this append(6) is not noticed yet
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5,6], t.new)
- self.failUnless(t.hook_calls == 4, t.hook_calls)
+ self.assertTrue(t.old == [], t.old)
+ self.assertTrue(t.new == [5,6], t.new)
+ self.assertTrue(t.hook_calls == 4, t.hook_calls)
# this append(7) is not noticed, but the t.x get causes the
# append(6) to be noticed
t.x.append(7)
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [5,6,7], t.new)
- self.failUnless(t.hook_calls == 5, t.hook_calls)
+ self.assertTrue(t.old == [5], t.old)
+ self.assertTrue(t.new == [5,6,7], t.new)
+ self.assertTrue(t.hook_calls == 5, t.hook_calls)
a = t.x # now the append(7) is noticed
- self.failUnless(t.old == [5,6], t.old)
- self.failUnless(t.new == [5,6,7], t.new)
- self.failUnless(t.hook_calls == 6, t.hook_calls)
+ self.assertTrue(t.old == [5,6], t.old)
+ self.assertTrue(t.new == [5,6,7], t.new)
+ self.assertTrue(t.hook_calls == 6, t.hook_calls)
suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py
index 819698f..323d6e9 100644
--- a/libbe/storage/util/settings_object.py
+++ b/libbe/storage/util/settings_object.py
@@ -27,7 +27,7 @@ See Also
"""
import libbe
-from properties import Property, doc_property, local_property, \
+from .properties import Property, doc_property, local_property, \
defaulting_property, checked_property, fn_checked_property, \
cached_property, primed_property, change_hook_property, \
settings_property
@@ -324,7 +324,7 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def content_type(): return {}
expected = "A test property\n\nThis property defaults to None."
- self.failUnless(Test.content_type.__doc__ == expected,
+ self.assertTrue(Test.content_type.__doc__ == expected,
Test.content_type.__doc__)
def testSimplePropertyFromMemory(self):
"""Testing a minimal versioned property from memory"""
@@ -338,75 +338,75 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def content_type(): return {}
t = Test()
- self.failUnless(len(t.settings) == 0, len(t.settings))
+ self.assertTrue(len(t.settings) == 0, len(t.settings))
# accessing t.content_type triggers the priming, but
# t.storage.is_readable() == False, so nothing happens.
t.storage.readable = False
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(len(t.settings) == 0, len(t.settings))
- self.failUnless(t.content_type == None, t.content_type)
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(len(t.settings) == 0, len(t.settings))
+ self.assertTrue(t.content_type == None, t.content_type)
# accessing t.content_type triggers the priming again, and
# now that t.storage.is_readable() == True, this fills out
# t.settings with EMPTY data. At this point there should
# be one load and no saves.
t.storage.readable = True
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(len(t.settings) == 1, len(t.settings))
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
# an explicit call to load settings forces a reload,
# but nothing else changes.
t.load_settings()
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(len(t.settings) == 1, len(t.settings))
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
# now we set a value
t.content_type = 5
- self.failUnless(t.settings["Content-type"] == 5,
+ self.assertTrue(t.settings["Content-type"] == 5,
t.settings["Content-type"])
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':5}], t.storage)
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':5}], t.storage)
# getting its value changes nothing
- self.failUnless(t.content_type == 5, t.content_type)
- self.failUnless(t.settings["Content-type"] == 5,
+ self.assertTrue(t.content_type == 5, t.content_type)
+ self.assertTrue(t.settings["Content-type"] == 5,
t.settings["Content-type"])
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':5}], t.storage)
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':5}], t.storage)
# now we set another value
t.content_type = "text/plain"
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings["Content-type"] == "text/plain",
+ self.assertTrue(t.content_type == "text/plain", t.content_type)
+ self.assertTrue(t.settings["Content-type"] == "text/plain",
t.settings["Content-type"])
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 2, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':5},
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 2, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':5},
{'Content-type':'text/plain'}],
t.storage)
# t._get_saved_settings() returns a dict of required or
# non-default values.
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
# now we clear to the post-primed value
t.content_type = EMPTY
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(len(t.settings) == 1, len(t.settings))
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t._get_saved_settings() == {},
+ self.assertTrue(t._get_saved_settings() == {},
t._get_saved_settings())
- self.failUnless(t.storage == [{'Content-type':5},
+ self.assertTrue(t.storage == [{'Content-type':5},
{'Content-type':'text/plain'},
{}],
t.storage)
@@ -433,16 +433,16 @@ if libbe.TESTING == True:
# which also pulls in prop-a.
t.prop_b = 'new-b'
settings = {'prop-b':'new-b', 'prop-a':'saved'}
- self.failUnless(t.settings == settings, t.settings)
- self.failUnless(t._get_saved_settings() == settings,
+ self.assertTrue(t.settings == settings, t.settings)
+ self.assertTrue(t._get_saved_settings() == settings,
t._get_saved_settings())
# test that _get_saved_settings() works even when settings
# were _not_ loaded beforehand
t = Test()
t.storage.append({'prop-a':'saved'})
settings ={'prop-a':'saved'}
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(t._get_saved_settings() == settings,
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(t._get_saved_settings() == settings,
t._get_saved_settings())
def testSimplePropertySetStorageSave(self):
"""Set a property, then attach storage and save"""
@@ -467,13 +467,13 @@ if libbe.TESTING == True:
t.prop_a = 'text/html'
t.storage = storage
t.save_settings()
- self.failUnless(t.prop_a == 'text/html', t.prop_a)
- self.failUnless(t.settings == {'prop-a':'text/html',
+ self.assertTrue(t.prop_a == 'text/html', t.prop_a)
+ self.assertTrue(t.settings == {'prop-a':'text/html',
'prop-b':EMPTY},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'prop-a':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'prop-a':'text/html'}],
t.storage)
def testDefaultingProperty(self):
"""Testing a defaulting versioned property"""
@@ -488,24 +488,24 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def content_type(): return {}
t = Test()
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings == {"Content-type":EMPTY},
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(t.content_type == "text/plain", t.content_type)
+ self.assertTrue(t.settings == {"Content-type":EMPTY},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t._get_saved_settings() == {},
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t._get_saved_settings() == {},
t._get_saved_settings())
t.content_type = "text/html"
- self.failUnless(t.content_type == "text/html",
+ self.assertTrue(t.content_type == "text/html",
t.content_type)
- self.failUnless(t.settings == {"Content-type":"text/html"},
+ self.assertTrue(t.settings == {"Content-type":"text/html"},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':'text/html'}],
t.storage)
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
def testRequiredDefaultingProperty(self):
@@ -522,25 +522,25 @@ if libbe.TESTING == True:
require_save=True)
def content_type(): return {}
t = Test()
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings == {"Content-type":EMPTY},
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(t.content_type == "text/plain", t.content_type)
+ self.assertTrue(t.settings == {"Content-type":EMPTY},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
t.content_type = "text/html"
- self.failUnless(t.content_type == "text/html",
+ self.assertTrue(t.content_type == "text/html",
t.content_type)
- self.failUnless(t.settings == {"Content-type":"text/html"},
+ self.assertTrue(t.settings == {"Content-type":"text/html"},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':'text/html'}],
t.storage)
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
def testClassVersionedPropertyDefinition(self):
@@ -564,18 +564,18 @@ if libbe.TESTING == True:
require_save=True)
def content_type(): return {}
t = Test()
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
t.content_type = "text/html"
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':'text/html'}],
t.storage)
def testMutableChangeHookedProperty(self):
"""Testing a mutable change-hooked property"""
@@ -591,26 +591,26 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def list_type(): return {}
t = Test()
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t.list_type == None, t.list_type)
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t.settings["List-type"]==EMPTY,
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.list_type == None, t.list_type)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.settings["List-type"]==EMPTY,
t.settings["List-type"])
t.list_type = []
- self.failUnless(t.settings["List-type"] == [],
+ self.assertTrue(t.settings["List-type"] == [],
t.settings["List-type"])
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'List-type':[]}],
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'List-type':[]}],
t.storage)
t.list_type.append(5) # external modification not detected yet
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'List-type':[]}],
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'List-type':[]}],
t.storage)
- self.failUnless(t.settings["List-type"] == [5],
+ self.assertTrue(t.settings["List-type"] == [5],
t.settings["List-type"])
- self.failUnless(t.list_type == [5], t.list_type)# get triggers save
- self.failUnless(len(t.storage) == 2, len(t.storage))
- self.failUnless(t.storage == [{'List-type':[]},
+ self.assertTrue(t.list_type == [5], t.list_type)# get triggers save
+ self.assertTrue(len(t.storage) == 2, len(t.storage))
+ self.assertTrue(t.storage == [{'List-type':[]},
{'List-type':[5]}],
t.storage)
diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py
index c3a5df1..12d177a 100644
--- a/libbe/storage/util/upgrade.py
+++ b/libbe/storage/util/upgrade.py
@@ -49,14 +49,14 @@ def generate_yaml_mapfile(map):
>>> generate_yaml_mapfile({'q':'p'})
'q: p\\n\\n'
- >>> generate_yaml_mapfile({'q':u'Fran\u00e7ais'})
+ >>> generate_yaml_mapfile({'q':u'Fran\\u00e7ais'})
'q: Fran\\xc3\\xa7ais\\n\\n'
>>> generate_yaml_mapfile({'q':u'hello'})
'q: hello\\n\\n'
"""
if yaml is None:
raise _yaml_import_error
- keys = map.keys()
+ keys = list(map.keys())
keys.sort()
for key in keys:
try:
@@ -66,9 +66,9 @@ def generate_yaml_mapfile(map):
assert(':' not in key)
assert(len(key) > 0)
except AssertionError:
- raise ValueError(unicode(key).encode('unicode_escape'))
+ raise ValueError(str(key).encode('unicode_escape'))
if '\n' in map[key]:
- raise ValueError(unicode(map[key]).encode('unicode_escape'))
+ raise ValueError(str(map[key]).encode('unicode_escape'))
lines = []
for key in keys:
@@ -94,7 +94,7 @@ def parse_yaml_mapfile(contents):
'd'
>>> dict['e']
'f'
- >>> contents = generate_yaml_mapfile({'q':u'Fran\u00e7ais'})
+ >>> contents = generate_yaml_mapfile({'q':u'Fran\\u00e7ais'})
>>> dict = parse_yaml_mapfile(contents)
>>> dict['q']
u'Fran\\xe7ais'
@@ -102,7 +102,7 @@ def parse_yaml_mapfile(contents):
if yaml is None:
raise _yaml_import_error
c = yaml.safe_load(contents)
- if type(c) == types.StringType:
+ if type(c) == bytes:
raise mapfile.InvalidMapfileContents(
'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents)
return c or {}
@@ -146,8 +146,8 @@ class Upgrader (object):
self.vcs._vcs_update(path)
def upgrade(self):
- print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \
- % (self.initial_version, self.final_version)
+ print('upgrading bugdir from "%s" to "%s"' \
+ % (self.initial_version, self.final_version), file=sys.stderr)
self.check_initial_version()
self.set_version()
self._upgrade()
@@ -191,7 +191,7 @@ class Upgrade_1_0_to_1_1 (Upgrader):
contents = '\n'.join(newlines)
# load the YAML and save
map = parse_yaml_mapfile(contents)
- if type(map) == types.StringType:
+ if type(map) == bytes:
raise ValueError((path, contents))
contents = generate_yaml_mapfile(map)
encoding.set_file_contents(path, contents)
@@ -317,7 +317,7 @@ class Upgrade_1_2_to_1_3 (Upgrader):
for bug_uuid in os.listdir(self.get_path('bugs')):
self._upgrade_bug_mapfile(bug_uuid)
self._upgrade_bugdir_mapfile()
- for bug in self._targets.values():
+ for bug in list(self._targets.values()):
self._save_bug_settings(bug)
class Upgrade_1_3_to_1_4 (Upgrader):
@@ -418,11 +418,9 @@ def upgrade(path, current_version,
use consecutive conversion functions.
"""
if current_version not in STORAGE_VERSIONS:
- raise NotImplementedError, \
- "Cannot handle version '%s' yet." % current_version
+ raise NotImplementedError("Cannot handle version '%s' yet." % current_version)
if target_version not in STORAGE_VERSIONS:
- raise NotImplementedError, \
- "Cannot handle version '%s' yet." % current_version
+ raise NotImplementedError("Cannot handle version '%s' yet." % current_version)
if (current_version, target_version) in upgrade_classes:
# direct conversion
@@ -438,9 +436,8 @@ def upgrade(path, current_version,
try:
upgrade_class = upgrade_classes[(version_a, version_b)]
except KeyError:
- raise NotImplementedError, \
- "Cannot convert version '%s' to '%s' yet." \
- % (version_a, version_b)
+ raise NotImplementedError("Cannot convert version '%s' to '%s' yet." \
+ % (version_a, version_b))
u = upgrade_class(path)
u.upgrade()
if version_b == target_version:
diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py
index 2fd8dc4..56c66b3 100644
--- a/libbe/storage/vcs/__init__.py
+++ b/libbe/storage/vcs/__init__.py
@@ -32,7 +32,7 @@ The base `VCS` class also serves as a filesystem Storage backend (not
versioning) in the event that a user has no VCS installed.
"""
-import base
+from . import base
set_preferred_vcs = base.set_preferred_vcs
vcs_by_name = base.vcs_by_name
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
index 687575f..dfd007f 100644
--- a/libbe/storage/vcs/base.py
+++ b/libbe/storage/vcs/base.py
@@ -255,7 +255,7 @@ class CachedPathID (object):
def disconnect(self):
if self._changed == True:
f = codecs.open(self._cache_path, 'w', self.encoding)
- for uuid,path in self._cache.items():
+ for uuid,path in list(self._cache.items()):
f.write('%s\t%s\n' % (uuid, path))
f.close()
self._cache = {}
@@ -558,12 +558,12 @@ class VCS (libbe.storage.base.VersionedStorage):
for num in num_part.split('.'):
try:
self._parsed_version.append(int(num))
- except ValueError, e:
+ except ValueError as e:
# bzr version number might contain non-numerical tags
splitter = re.compile(r'[\D]') # Match non-digits
splits = splitter.split(num)
# if len(tag) > 1 some splits will be empty; remove
- splits = filter(lambda s: s != '', splits)
+ splits = [s for s in splits if s != '']
tag_starti = len(splits[0])
num_starti = num.find(splits[1], tag_starti)
tag = num[tag_starti:num_starti]
@@ -573,7 +573,7 @@ class VCS (libbe.storage.base.VersionedStorage):
for current,other in zip(self._parsed_version, args):
if type(current) != type (other):
# one of them is a pre-release string
- if type(current) != types.IntType:
+ if type(current) != int:
return -1
else:
return 1
@@ -586,12 +586,12 @@ class VCS (libbe.storage.base.VersionedStorage):
if verlen == arglen:
return 0
elif verlen > arglen:
- if type(self._parsed_version[arglen]) != types.IntType:
+ if type(self._parsed_version[arglen]) != int:
return -1 # self is a prerelease
else:
return 1
else:
- if type(args[verlen]) != types.IntType:
+ if type(args[verlen]) != int:
return 1 # args is a prerelease
else:
return -1
@@ -732,7 +732,7 @@ class VCS (libbe.storage.base.VersionedStorage):
if revision == None:
try:
path = self.path(id, revision, relpath=False)
- except InvalidID, e:
+ except InvalidID as e:
return False
return os.path.exists(path)
path = self.path(id, revision, relpath=True)
@@ -763,7 +763,7 @@ class VCS (libbe.storage.base.VersionedStorage):
if os.path.exists(path):
shutil.rmtree(path)
path = self._cached_path_id.path(id, relpath=True)
- for id,p in self._cached_path_id._cache.items():
+ for id,p in list(self._cached_path_id._cache.items()):
if p.startswith(path):
self._cached_path_id.remove_id(id)
@@ -818,7 +818,7 @@ class VCS (libbe.storage.base.VersionedStorage):
try:
relpath = self.path(id, revision, relpath=True)
contents = self._vcs_get_file_contents(relpath, revision)
- except InvalidID, e:
+ except InvalidID as e:
if default == libbe.util.InvalidObject:
raise e
return default
@@ -833,7 +833,7 @@ class VCS (libbe.storage.base.VersionedStorage):
def _set(self, id, value):
try:
path = self._cached_path_id.path(id)
- except InvalidID, e:
+ except InvalidID as e:
raise
if not os.path.exists(path):
raise InvalidID(id)
@@ -923,7 +923,7 @@ class VCS (libbe.storage.base.VersionedStorage):
"""
try:
ret = search_parent_directories(path, filename)
- except AssertionError, e:
+ except AssertionError as e:
return None
return ret
@@ -1054,8 +1054,8 @@ class VCS (libbe.storage.base.VersionedStorage):
path, decode=True).rstrip()
relpath = self._u_rel_path(path)
contents = self._vcs_get_file_contents(relpath, revision=revision)
- if type(contents) != types.UnicodeType:
- contents = unicode(contents, self.encoding)
+ if type(contents) != str:
+ contents = str(contents, self.encoding)
return contents.strip()
def _setup_storage_version(self):
@@ -1104,7 +1104,7 @@ if libbe.TESTING == True:
def test_installed(self):
"""See if the VCS is installed.
"""
- self.failUnless(self.s.installed() == True,
+ self.assertTrue(self.s.installed() == True,
'%(name)s VCS not found' % vars(self.Class))
@@ -1114,7 +1114,7 @@ if libbe.TESTING == True:
"""
if self.s.installed():
self.s.disconnect()
- self.failUnless(self.s._detect(self.dirname) == True,
+ self.assertTrue(self.s._detect(self.dirname) == True,
'Did not detected %(name)s VCS after initialising'
% vars(self.Class))
self.s.connect()
@@ -1125,7 +1125,7 @@ if libbe.TESTING == True:
if self.s.installed() and self.Class.name != 'None':
self.s.disconnect()
self.s.destroy()
- self.failUnless(self.s._detect(self.dirname) == False,
+ self.assertTrue(self.s._detect(self.dirname) == False,
'Detected %(name)s VCS before initialising'
% vars(self.Class))
self.s.init()
@@ -1136,7 +1136,7 @@ if libbe.TESTING == True:
rp = os.path.realpath(self.s.repo)
dp = os.path.realpath(self.dirname)
vcs_name = self.Class.name
- self.failUnless(
+ self.assertTrue(
dp == rp or rp == None,
"%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
@@ -1151,7 +1151,7 @@ if libbe.TESTING == True:
return
name,email = libbe.ui.util.user.parse_user_id(user_id)
if email != None:
- self.failUnless('@' in email, email)
+ self.assertTrue('@' in email, email)
def make_vcs_testcase_subclasses(vcs_class, namespace):
c = vcs_class()
@@ -1167,7 +1167,7 @@ if libbe.TESTING == True:
# Make VCSTestCase subclasses for vcs_class in the namespace.
vcs_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if issubclass(c, VCSTestCase) \
and c.Class == VCS]
diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py
index 0c92058..6da299a 100644
--- a/libbe/storage/vcs/bzr.py
+++ b/libbe/storage/vcs/bzr.py
@@ -39,11 +39,11 @@ import os
import os.path
import re
import shutil
-import StringIO
+import io
import sys
import libbe
-import base
+from . import base
if libbe.TESTING == True:
import doctest
@@ -84,7 +84,7 @@ class Bzr(base.VCS):
def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
cmd = bzrlib.builtins.cmd_root()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(filename=path)
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -92,7 +92,7 @@ class Bzr(base.VCS):
def _vcs_init(self, path):
cmd = bzrlib.builtins.cmd_init()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(location=path)
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -105,7 +105,7 @@ class Bzr(base.VCS):
def _vcs_add(self, path):
path = os.path.join(self.repo, path)
cmd = bzrlib.builtins.cmd_add()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
kwargs = {'file_ids_from': self.repo}
if self.repo == os.path.realpath(os.getcwd()):
# Work around bzr file locking on Windows.
@@ -126,7 +126,7 @@ class Bzr(base.VCS):
# --force to also remove unversioned files.
path = os.path.join(self.repo, path)
cmd = bzrlib.builtins.cmd_remove()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(file_list=[path], file_deletion_strategy='force')
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -150,14 +150,14 @@ class Bzr(base.VCS):
path = os.path.join(self.repo, path)
revision = self._parse_revision_string(revision)
cmd = bzrlib.builtins.cmd_cat()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
if self.version_cmp(1,6,0) < 0:
# old bzrlib cmd_cat uses sys.stdout not self.outf for output.
stdout = sys.stdout
sys.stdout = cmd.outf
try:
cmd.run(filename=path, revision=revision)
- except bzrlib.errors.BzrCommandError, e:
+ except bzrlib.errors.BzrCommandError as e:
if 'not present in revision' in str(e):
raise base.InvalidPath(path, root=self.repo, revision=revision)
raise
@@ -177,7 +177,7 @@ class Bzr(base.VCS):
def _vcs_isdir(self, path, revision):
try:
self._vcs_listdir(path, revision)
- except AttributeError, e:
+ except AttributeError as e:
if 'children' in str(e):
return False
raise
@@ -187,7 +187,7 @@ class Bzr(base.VCS):
path = os.path.join(self.repo, path)
revision = self._parse_revision_string(revision)
cmd = bzrlib.builtins.cmd_ls()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
try:
if self.version_cmp(2,0,0) >= 0:
cmd.run(revision=revision, path=path, recursive=recursive)
@@ -197,7 +197,7 @@ class Bzr(base.VCS):
# (https://bugs.launchpad.net/bzr/+bug/158690)
cmd.run(revision=revision, path=path,
non_recursive=False)
- except bzrlib.errors.BzrCommandError, e:
+ except bzrlib.errors.BzrCommandError as e:
if 'not present in revision' in str(e):
raise base.InvalidPath(path, root=self.repo, revision=revision)
raise
@@ -212,12 +212,12 @@ class Bzr(base.VCS):
def _vcs_commit(self, commitfile, allow_empty=False):
cmd = bzrlib.builtins.cmd_commit()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cwd = os.getcwd()
os.chdir(self.repo)
try:
cmd.run(file=commitfile, unchanged=allow_empty)
- except bzrlib.errors.BzrCommandError, e:
+ except bzrlib.errors.BzrCommandError as e:
strings = ['no changes to commit.', # bzr 1.3.1
'No changes to commit.'] # bzr 1.15.1
if self._u_any_in_string(strings, str(e)) == True:
@@ -231,7 +231,7 @@ class Bzr(base.VCS):
def _vcs_revision_id(self, index):
cmd = bzrlib.builtins.cmd_revno()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(location=self.repo)
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -245,7 +245,7 @@ class Bzr(base.VCS):
def _diff(self, revision):
revision = self._parse_revision_string(revision)
cmd = bzrlib.builtins.cmd_diff()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
# for some reason, cmd_diff uses sys.stdout not self.outf for output.
stdout = sys.stdout
sys.stdout = cmd.outf
diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py
index ec100b4..41262f5 100644
--- a/libbe/storage/vcs/darcs.py
+++ b/libbe/storage/vcs/darcs.py
@@ -104,10 +104,10 @@ class Darcs(base.VCS):
for num in num_part.split('.'):
try:
self._parsed_version.append(int(num))
- except ValueError, e:
+ except ValueError as e:
self._parsed_version.append(num)
for current,other in zip(self._parsed_version, args):
- if type(current) != types.IntType:
+ if type(current) != int:
raise NotImplementedError(
'Cannot parse non-integer portion "%s" of Darcs version "%s"'
% (current, self.version()))
@@ -284,7 +284,7 @@ class Darcs(base.VCS):
assert patch.tag == 'patch', patch.tag
for child in patch.getchildren():
if child.tag == 'name':
- text = unescape(unicode(child.text).decode('unicode_escape').strip())
+ text = unescape(str(child.text).decode('unicode_escape').strip())
revisions.append(text)
revisions.reverse()
return revisions
diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py
index 851af19..9a15c92 100644
--- a/libbe/storage/vcs/git.py
+++ b/libbe/storage/vcs/git.py
@@ -32,7 +32,7 @@ import unittest
try:
import pygit2 as _pygit2
-except ImportError, error:
+except ImportError as error:
_pygit2 = None
_pygit2_import_error = error
else:
@@ -65,7 +65,7 @@ class PygitGit(base.VCS):
Using :py:mod:`pygit2` for the Git activity.
"""
name='pygit2'
- _null_hex = u'0' * 40
+ _null_hex = '0' * 40
def __init__(self, *args, **kwargs):
base.VCS.__init__(self, *args, **kwargs)
@@ -162,7 +162,7 @@ class PygitGit(base.VCS):
def _git_get_commit(self, revision):
if isinstance(revision, str):
- revision = unicode(revision, 'ascii')
+ revision = str(revision, 'ascii')
commit = self._pygit_repository.revparse_single(revision)
assert commit.type == _pygit2.GIT_OBJ_COMMIT, commit
return commit
diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py
index f2976ff..0ebdfb0 100644
--- a/libbe/storage/vcs/hg.py
+++ b/libbe/storage/vcs/hg.py
@@ -47,12 +47,12 @@ import os
import os.path
import re
import shutil
-import StringIO
+import io
import sys
import time # work around http://mercurial.selenic.com/bts/issue618
import libbe
-import base
+from . import base
if libbe.TESTING == True:
import doctest
@@ -85,7 +85,7 @@ class Hg(base.VCS):
fullargs = ['--cwd', kwargs['cwd']]
fullargs.extend(args)
cwd = os.getcwd()
- output = StringIO.StringIO()
+ output = io.StringIO()
if self.version_cmp(1,9) >= 0:
req = mercurial.dispatch.request(fullargs, fout=output)
mercurial.dispatch.dispatch(req)