aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage/base.py
diff options
context:
space:
mode:
authorMatěj Cepl <mcepl@cepl.eu>2024-01-18 19:09:58 +0100
committerMatěj Cepl <mcepl@cepl.eu>2024-01-18 19:09:58 +0100
commitcc7362d28bd9c43cb6839809f86e59874f2fe458 (patch)
tree1053748f5890c769d1e99806bc0c8bcffca6f0fe /libbe/storage/base.py
parent003bd13629d9db2f14156f97b74a4672e9ecdf77 (diff)
downloadbugseverywhere-cc7362d28bd9c43cb6839809f86e59874f2fe458.tar.gz
2to3 conversion of the repo.
Diffstat (limited to 'libbe/storage/base.py')
-rw-r--r--libbe/storage/base.py94
1 files changed, 47 insertions, 47 deletions
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
index 977179d..4c05287 100644
--- a/libbe/storage/base.py
+++ b/libbe/storage/base.py
@@ -198,7 +198,7 @@ class Storage (object):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
root = Entry(id='__ROOT__', directory=True)
d = {root.id:root}
- pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1)
+ pickle.dump(dict((k,v._objects_to_ids()) for k,v in list(d.items())), f, -1)
f.close()
def destroy(self):
@@ -223,7 +223,7 @@ class Storage (object):
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
- self._data = dict((k,v._ids_to_objects(d)) for k,v in d.items())
+ self._data = dict((k,v._ids_to_objects(d)) for k,v in list(d.items()))
f.close()
def disconnect(self):
@@ -238,7 +238,7 @@ class Storage (object):
def _disconnect(self):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump(dict((k,v._objects_to_ids())
- for k,v in self._data.items()), f, -1)
+ for k,v in list(self._data.items())), f, -1)
f.close()
self._data = None
@@ -336,9 +336,9 @@ class Storage (object):
decode = False
value = self._get(*args, **kwargs)
if value != None:
- if decode == True and type(value) != types.UnicodeType:
- return unicode(value, self.encoding)
- elif decode == False and type(value) != types.StringType:
+ if decode == True and type(value) != str:
+ return str(value, self.encoding)
+ elif decode == False and type(value) != bytes:
return value.encode(self.encoding)
return value
@@ -355,7 +355,7 @@ class Storage (object):
"""
if self.is_writeable() == False:
raise NotWriteable('Cannot set entry in unwriteable storage.')
- if type(value) == types.UnicodeType:
+ if type(value) == str:
value = value.encode(self.encoding)
self._set(id, value, *args, **kwargs)
@@ -386,7 +386,7 @@ class VersionedStorage (Storage):
summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit')
body = Entry(id='__COMMIT__BODY__')
initial_commit = {root.id:root, summary.id:summary, body.id:body}
- d = dict((k,v._objects_to_ids()) for k,v in initial_commit.items())
+ d = dict((k,v._objects_to_ids()) for k,v in list(initial_commit.items()))
pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree]
f.close()
@@ -396,14 +396,14 @@ class VersionedStorage (Storage):
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
- self._data = [dict((k,v._ids_to_objects(t)) for k,v in t.items())
+ self._data = [dict((k,v._ids_to_objects(t)) for k,v in list(t.items()))
for t in d]
f.close()
def _disconnect(self):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump([dict((k,v._objects_to_ids())
- for k,v in t.items()) for t in self._data], f, -1)
+ for k,v in list(t.items())) for t in self._data], f, -1)
f.close()
self._data = None
@@ -527,7 +527,7 @@ class VersionedStorage (Storage):
new = []
modified = []
removed = []
- for id,value in self._data[int(revision)].items():
+ for id,value in list(self._data[int(revision)].items()):
if id.startswith('__'):
continue
if not id in self._data[-1]:
@@ -624,7 +624,7 @@ if TESTING == True:
def test_initially_empty(self):
"""New repository should be empty."""
- self.failUnless(len(self.s.children()) == 0, self.s.children())
+ self.assertTrue(len(self.s.children()) == 0, self.s.children())
def test_add_identical_rooted(self):
"""Adding entries with the same ID should not increase the number of children.
@@ -632,7 +632,7 @@ if TESTING == True:
for i in range(10):
self.s.add('some id', directory=False)
s = sorted(self.s.children())
- self.failUnless(s == ['some id'], s)
+ self.assertTrue(s == ['some id'], s)
def test_add_rooted(self):
"""Adding entries should increase the number of children (rooted).
@@ -642,7 +642,7 @@ if TESTING == True:
ids.append(str(i))
self.s.add(ids[-1], directory=(i % 2 == 0))
s = sorted(self.s.children())
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_nonrooted(self):
"""Adding entries should increase the number of children (nonrooted).
@@ -653,9 +653,9 @@ if TESTING == True:
ids.append(str(i))
self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
def test_ancestors(self):
"""Check ancestors lists.
@@ -668,7 +668,7 @@ if TESTING == True:
j_id = str(20*(i+1)+j)
self.s.add(j_id, i_id, directory=(i%2 == 0))
ancestors = sorted(self.s.ancestors(j_id))
- self.failUnless(ancestors == [i_id, 'parent'],
+ self.assertTrue(ancestors == [i_id, 'parent'],
'Unexpected ancestors for %s/%s, "%s"'
% (i_id, j_id, ancestors))
@@ -681,7 +681,7 @@ if TESTING == True:
ids.append('parent/%s' % str(i))
self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_grandchildren(self):
"""Grandchildren should not be returned as children.
@@ -699,7 +699,7 @@ if TESTING == True:
directory = (j % 2 == 0)
self.s.add(grandchild, child, directory=directory)
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_invalid_directory(self):
"""Should not be able to add children to non-directories.
@@ -719,7 +719,7 @@ if TESTING == True:
% (vars(self.Class)['name']))
except InvalidDirectory:
pass
- self.failUnless(len(self.s.children('parent')) == 0,
+ self.assertTrue(len(self.s.children('parent')) == 0,
self.s.children('parent'))
def test_remove_rooted(self):
@@ -732,7 +732,7 @@ if TESTING == True:
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children())
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_remove_nonrooted(self):
"""Removing entries should decrease the number of children (nonrooted).
@@ -745,10 +745,10 @@ if TESTING == True:
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
if len(s) > 0:
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
def test_remove_directory_not_empty(self):
"""Removing a non-empty directory entry should raise exception.
@@ -778,7 +778,7 @@ if TESTING == True:
self.s.add(str(20*(i+1)+j), ids[-1], directory=(i%2 == 0))
self.s.recursive_remove('parent')
s = sorted(self.s.children())
- self.failUnless(s == [], s)
+ self.assertTrue(s == [], s)
class Storage_get_set_TestCase (StorageTestCase):
"""Test cases for Storage.get and .set methods."""
@@ -790,7 +790,7 @@ if TESTING == True:
"""Get should return specified default if id not in Storage.
"""
ret = self.s.get(self.id, default=self.val)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
@@ -811,7 +811,7 @@ if TESTING == True:
self.s.add(self.id, directory=False)
val = 'UNLIKELY DEFAULT'
ret = self.s.get(self.id, default=val)
- self.failUnless(ret == val,
+ self.assertTrue(ret == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, val))
@@ -832,29 +832,29 @@ if TESTING == True:
self.s.add(self.id, directory=False)
self.s.set(self.id, self.val)
ret = self.s.get(self.id)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
def test_unicode_set(self):
"""Set should define the value returned by get.
"""
- val = u'Fran\xe7ois'
+ val = 'Fran\xe7ois'
self.s.add(self.id, directory=False)
self.s.set(self.id, val)
ret = self.s.get(self.id, decode=True)
- self.failUnless(type(ret) == types.UnicodeType,
+ self.assertTrue(type(ret) == str,
"%s.get() returned %s not UnicodeType"
% (vars(self.Class)['name'], type(ret)))
- self.failUnless(ret == val,
+ self.assertTrue(ret == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
ret = self.s.get(self.id)
- self.failUnless(type(ret) == types.StringType,
+ self.assertTrue(type(ret) == bytes,
"%s.get() returned %s not StringType"
% (vars(self.Class)['name'], type(ret)))
- s = unicode(ret, self.s.encoding)
- self.failUnless(s == val,
+ s = str(ret, self.s.encoding)
+ self.assertTrue(s == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], s, self.val))
@@ -873,7 +873,7 @@ if TESTING == True:
self.s.disconnect()
self.s.connect()
ret = self.s.get(self.id)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
@@ -886,7 +886,7 @@ if TESTING == True:
self.s.connect()
default = 'UNLIKELY DEFAULT'
ret = self.s.get(self.id, default=default)
- self.failUnless(ret in ['', default],
+ self.assertTrue(ret in ['', default],
"%s.get() returned %s not in %s"
% (vars(self.Class)['name'], ret, ['', default]))
@@ -901,9 +901,9 @@ if TESTING == True:
self.s.disconnect()
self.s.connect()
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
class VersionedStorageTestCase (StorageTestCase):
"""Test cases for VersionedStorage methods."""
@@ -972,12 +972,12 @@ if TESTING == True:
self.commit_body))
for i in range(10):
rev = self.s.revision_id(i+1)
- self.failUnless(rev == revs[i],
+ self.assertTrue(rev == revs[i],
"%s.revision_id(%d) returned %s not %s"
% (vars(self.Class)['name'], i+1, rev, revs[i]))
for i in range(-1, -9, -1):
rev = self.s.revision_id(i)
- self.failUnless(rev == revs[i],
+ self.assertTrue(rev == revs[i],
"%s.revision_id(%d) returned %s not %s"
% (vars(self.Class)['name'], i, rev, revs[i]))
@@ -994,7 +994,7 @@ if TESTING == True:
self.commit_body))
for i in range(10):
ret = self.s.get(self.id, revision=revs[i])
- self.failUnless(ret == val(i),
+ self.assertTrue(ret == val(i),
"%s.get() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret, val(i), revs[i]))
@@ -1015,7 +1015,7 @@ if TESTING == True:
children.append(list(cur_children))
for i in range(10):
ret = sorted(self.s.children('parent', revision=revs[i]))
- self.failUnless(ret == children[i],
+ self.assertTrue(ret == children[i],
"%s.children() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret,
children[i], revs[i]))
@@ -1047,7 +1047,7 @@ if TESTING == True:
self.commit_body))
for rev,cur_children in zip(revs, children):
ret = sorted(self.s.children('parent', revision=rev))
- self.failUnless(ret == cur_children,
+ self.assertTrue(ret == cur_children,
"%s.children() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret,
cur_children, rev))
@@ -1074,18 +1074,18 @@ if TESTING == True:
self.s.remove('removed')
revB = self.s.commit('Final state')
new,mod,rem = self.s.changed(revA)
- self.failUnless(sorted(new) == ['moved2', 'new'],
+ self.assertTrue(sorted(new) == ['moved2', 'new'],
'Unexpected new: %s' % new)
- self.failUnless(mod == ['modified'],
+ self.assertTrue(mod == ['modified'],
'Unexpected modified: %s' % mod)
- self.failUnless(sorted(rem) == ['moved', 'removed'],
+ self.assertTrue(sorted(rem) == ['moved', 'removed'],
'Unexpected removed: %s' % rem)
def make_storage_testcase_subclasses(storage_class, namespace):
"""Make StorageTestCase subclasses for storage_class in namespace."""
storage_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if issubclass(c, StorageTestCase) \
and c.Class == Storage]
@@ -1102,7 +1102,7 @@ if TESTING == True:
"""Make VersionedStorageTestCase subclasses for storage_class in namespace."""
storage_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if ((issubclass(c, StorageTestCase) \
and c.Class == Storage)
or