diff options
Diffstat (limited to 'libbe/storage')
-rw-r--r-- | libbe/storage/__init__.py | 2 | ||||
-rw-r--r-- | libbe/storage/base.py | 94 | ||||
-rw-r--r-- | libbe/storage/util/config.py | 8 | ||||
-rw-r--r-- | libbe/storage/util/mapfile.py | 4 | ||||
-rw-r--r-- | libbe/storage/util/properties.py | 150 | ||||
-rw-r--r-- | libbe/storage/util/settings_object.py | 184 | ||||
-rw-r--r-- | libbe/storage/util/upgrade.py | 31 | ||||
-rw-r--r-- | libbe/storage/vcs/__init__.py | 2 | ||||
-rw-r--r-- | libbe/storage/vcs/base.py | 38 | ||||
-rw-r--r-- | libbe/storage/vcs/bzr.py | 30 | ||||
-rw-r--r-- | libbe/storage/vcs/darcs.py | 6 | ||||
-rw-r--r-- | libbe/storage/vcs/git.py | 6 | ||||
-rw-r--r-- | libbe/storage/vcs/hg.py | 6 |
13 files changed, 279 insertions, 282 deletions
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py index 01885e9..baed80b 100644 --- a/libbe/storage/__init__.py +++ b/libbe/storage/__init__.py @@ -53,7 +53,7 @@ STORAGE_VERSIONS = ['Bugs Everywhere Tree 1 0', STORAGE_VERSION = STORAGE_VERSIONS[-1] def get_vcs_storage(location): - import vcs + from . import vcs s = vcs.detect_vcs(location) s.repo = location return s diff --git a/libbe/storage/base.py b/libbe/storage/base.py index 977179d..4c05287 100644 --- a/libbe/storage/base.py +++ b/libbe/storage/base.py @@ -198,7 +198,7 @@ class Storage (object): f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') root = Entry(id='__ROOT__', directory=True) d = {root.id:root} - pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1) + pickle.dump(dict((k,v._objects_to_ids()) for k,v in list(d.items())), f, -1) f.close() def destroy(self): @@ -223,7 +223,7 @@ class Storage (object): except IOError: raise ConnectionError(self) d = pickle.load(f) - self._data = dict((k,v._ids_to_objects(d)) for k,v in d.items()) + self._data = dict((k,v._ids_to_objects(d)) for k,v in list(d.items())) f.close() def disconnect(self): @@ -238,7 +238,7 @@ class Storage (object): def _disconnect(self): f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') pickle.dump(dict((k,v._objects_to_ids()) - for k,v in self._data.items()), f, -1) + for k,v in list(self._data.items())), f, -1) f.close() self._data = None @@ -336,9 +336,9 @@ class Storage (object): decode = False value = self._get(*args, **kwargs) if value != None: - if decode == True and type(value) != types.UnicodeType: - return unicode(value, self.encoding) - elif decode == False and type(value) != types.StringType: + if decode == True and type(value) != str: + return str(value, self.encoding) + elif decode == False and type(value) != bytes: return value.encode(self.encoding) return value @@ -355,7 +355,7 @@ class Storage (object): """ if self.is_writeable() == False: raise NotWriteable('Cannot set entry in unwriteable storage.') - if type(value) == types.UnicodeType: + if type(value) == str: value = value.encode(self.encoding) self._set(id, value, *args, **kwargs) @@ -386,7 +386,7 @@ class VersionedStorage (Storage): summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit') body = Entry(id='__COMMIT__BODY__') initial_commit = {root.id:root, summary.id:summary, body.id:body} - d = dict((k,v._objects_to_ids()) for k,v in initial_commit.items()) + d = dict((k,v._objects_to_ids()) for k,v in list(initial_commit.items())) pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree] f.close() @@ -396,14 +396,14 @@ class VersionedStorage (Storage): except IOError: raise ConnectionError(self) d = pickle.load(f) - self._data = [dict((k,v._ids_to_objects(t)) for k,v in t.items()) + self._data = [dict((k,v._ids_to_objects(t)) for k,v in list(t.items())) for t in d] f.close() def _disconnect(self): f = open(os.path.join(self.repo, 'repo.pkl'), 'wb') pickle.dump([dict((k,v._objects_to_ids()) - for k,v in t.items()) for t in self._data], f, -1) + for k,v in list(t.items())) for t in self._data], f, -1) f.close() self._data = None @@ -527,7 +527,7 @@ class VersionedStorage (Storage): new = [] modified = [] removed = [] - for id,value in self._data[int(revision)].items(): + for id,value in list(self._data[int(revision)].items()): if id.startswith('__'): continue if not id in self._data[-1]: @@ -624,7 +624,7 @@ if TESTING == True: def test_initially_empty(self): """New repository should be empty.""" - self.failUnless(len(self.s.children()) == 0, self.s.children()) + self.assertTrue(len(self.s.children()) == 0, self.s.children()) def test_add_identical_rooted(self): """Adding entries with the same ID should not increase the number of children. @@ -632,7 +632,7 @@ if TESTING == True: for i in range(10): self.s.add('some id', directory=False) s = sorted(self.s.children()) - self.failUnless(s == ['some id'], s) + self.assertTrue(s == ['some id'], s) def test_add_rooted(self): """Adding entries should increase the number of children (rooted). @@ -642,7 +642,7 @@ if TESTING == True: ids.append(str(i)) self.s.add(ids[-1], directory=(i % 2 == 0)) s = sorted(self.s.children()) - self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids)) def test_add_nonrooted(self): """Adding entries should increase the number of children (nonrooted). @@ -653,9 +653,9 @@ if TESTING == True: ids.append(str(i)) self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) s = sorted(self.s.children('parent')) - self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids)) s = self.s.children() - self.failUnless(s == ['parent'], s) + self.assertTrue(s == ['parent'], s) def test_ancestors(self): """Check ancestors lists. @@ -668,7 +668,7 @@ if TESTING == True: j_id = str(20*(i+1)+j) self.s.add(j_id, i_id, directory=(i%2 == 0)) ancestors = sorted(self.s.ancestors(j_id)) - self.failUnless(ancestors == [i_id, 'parent'], + self.assertTrue(ancestors == [i_id, 'parent'], 'Unexpected ancestors for %s/%s, "%s"' % (i_id, j_id, ancestors)) @@ -681,7 +681,7 @@ if TESTING == True: ids.append('parent/%s' % str(i)) self.s.add(ids[-1], 'parent', directory=(i % 2 == 0)) s = sorted(self.s.children('parent')) - self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids)) def test_grandchildren(self): """Grandchildren should not be returned as children. @@ -699,7 +699,7 @@ if TESTING == True: directory = (j % 2 == 0) self.s.add(grandchild, child, directory=directory) s = sorted(self.s.children('parent')) - self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids)) def test_add_invalid_directory(self): """Should not be able to add children to non-directories. @@ -719,7 +719,7 @@ if TESTING == True: % (vars(self.Class)['name'])) except InvalidDirectory: pass - self.failUnless(len(self.s.children('parent')) == 0, + self.assertTrue(len(self.s.children('parent')) == 0, self.s.children('parent')) def test_remove_rooted(self): @@ -732,7 +732,7 @@ if TESTING == True: for i in range(10): self.s.remove(ids.pop()) s = sorted(self.s.children()) - self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids)) def test_remove_nonrooted(self): """Removing entries should decrease the number of children (nonrooted). @@ -745,10 +745,10 @@ if TESTING == True: for i in range(10): self.s.remove(ids.pop()) s = sorted(self.s.children('parent')) - self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids)) if len(s) > 0: s = self.s.children() - self.failUnless(s == ['parent'], s) + self.assertTrue(s == ['parent'], s) def test_remove_directory_not_empty(self): """Removing a non-empty directory entry should raise exception. @@ -778,7 +778,7 @@ if TESTING == True: self.s.add(str(20*(i+1)+j), ids[-1], directory=(i%2 == 0)) self.s.recursive_remove('parent') s = sorted(self.s.children()) - self.failUnless(s == [], s) + self.assertTrue(s == [], s) class Storage_get_set_TestCase (StorageTestCase): """Test cases for Storage.get and .set methods.""" @@ -790,7 +790,7 @@ if TESTING == True: """Get should return specified default if id not in Storage. """ ret = self.s.get(self.id, default=self.val) - self.failUnless(ret == self.val, + self.assertTrue(ret == self.val, "%s.get() returned %s not %s" % (vars(self.Class)['name'], ret, self.val)) @@ -811,7 +811,7 @@ if TESTING == True: self.s.add(self.id, directory=False) val = 'UNLIKELY DEFAULT' ret = self.s.get(self.id, default=val) - self.failUnless(ret == val, + self.assertTrue(ret == val, "%s.get() returned %s not %s" % (vars(self.Class)['name'], ret, val)) @@ -832,29 +832,29 @@ if TESTING == True: self.s.add(self.id, directory=False) self.s.set(self.id, self.val) ret = self.s.get(self.id) - self.failUnless(ret == self.val, + self.assertTrue(ret == self.val, "%s.get() returned %s not %s" % (vars(self.Class)['name'], ret, self.val)) def test_unicode_set(self): """Set should define the value returned by get. """ - val = u'Fran\xe7ois' + val = 'Fran\xe7ois' self.s.add(self.id, directory=False) self.s.set(self.id, val) ret = self.s.get(self.id, decode=True) - self.failUnless(type(ret) == types.UnicodeType, + self.assertTrue(type(ret) == str, "%s.get() returned %s not UnicodeType" % (vars(self.Class)['name'], type(ret))) - self.failUnless(ret == val, + self.assertTrue(ret == val, "%s.get() returned %s not %s" % (vars(self.Class)['name'], ret, self.val)) ret = self.s.get(self.id) - self.failUnless(type(ret) == types.StringType, + self.assertTrue(type(ret) == bytes, "%s.get() returned %s not StringType" % (vars(self.Class)['name'], type(ret))) - s = unicode(ret, self.s.encoding) - self.failUnless(s == val, + s = str(ret, self.s.encoding) + self.assertTrue(s == val, "%s.get() returned %s not %s" % (vars(self.Class)['name'], s, self.val)) @@ -873,7 +873,7 @@ if TESTING == True: self.s.disconnect() self.s.connect() ret = self.s.get(self.id) - self.failUnless(ret == self.val, + self.assertTrue(ret == self.val, "%s.get() returned %s not %s" % (vars(self.Class)['name'], ret, self.val)) @@ -886,7 +886,7 @@ if TESTING == True: self.s.connect() default = 'UNLIKELY DEFAULT' ret = self.s.get(self.id, default=default) - self.failUnless(ret in ['', default], + self.assertTrue(ret in ['', default], "%s.get() returned %s not in %s" % (vars(self.Class)['name'], ret, ['', default])) @@ -901,9 +901,9 @@ if TESTING == True: self.s.disconnect() self.s.connect() s = sorted(self.s.children('parent')) - self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids)) + self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids)) s = self.s.children() - self.failUnless(s == ['parent'], s) + self.assertTrue(s == ['parent'], s) class VersionedStorageTestCase (StorageTestCase): """Test cases for VersionedStorage methods.""" @@ -972,12 +972,12 @@ if TESTING == True: self.commit_body)) for i in range(10): rev = self.s.revision_id(i+1) - self.failUnless(rev == revs[i], + self.assertTrue(rev == revs[i], "%s.revision_id(%d) returned %s not %s" % (vars(self.Class)['name'], i+1, rev, revs[i])) for i in range(-1, -9, -1): rev = self.s.revision_id(i) - self.failUnless(rev == revs[i], + self.assertTrue(rev == revs[i], "%s.revision_id(%d) returned %s not %s" % (vars(self.Class)['name'], i, rev, revs[i])) @@ -994,7 +994,7 @@ if TESTING == True: self.commit_body)) for i in range(10): ret = self.s.get(self.id, revision=revs[i]) - self.failUnless(ret == val(i), + self.assertTrue(ret == val(i), "%s.get() returned %s not %s for revision %s" % (vars(self.Class)['name'], ret, val(i), revs[i])) @@ -1015,7 +1015,7 @@ if TESTING == True: children.append(list(cur_children)) for i in range(10): ret = sorted(self.s.children('parent', revision=revs[i])) - self.failUnless(ret == children[i], + self.assertTrue(ret == children[i], "%s.children() returned %s not %s for revision %s" % (vars(self.Class)['name'], ret, children[i], revs[i])) @@ -1047,7 +1047,7 @@ if TESTING == True: self.commit_body)) for rev,cur_children in zip(revs, children): ret = sorted(self.s.children('parent', revision=rev)) - self.failUnless(ret == cur_children, + self.assertTrue(ret == cur_children, "%s.children() returned %s not %s for revision %s" % (vars(self.Class)['name'], ret, cur_children, rev)) @@ -1074,18 +1074,18 @@ if TESTING == True: self.s.remove('removed') revB = self.s.commit('Final state') new,mod,rem = self.s.changed(revA) - self.failUnless(sorted(new) == ['moved2', 'new'], + self.assertTrue(sorted(new) == ['moved2', 'new'], 'Unexpected new: %s' % new) - self.failUnless(mod == ['modified'], + self.assertTrue(mod == ['modified'], 'Unexpected modified: %s' % mod) - self.failUnless(sorted(rem) == ['moved', 'removed'], + self.assertTrue(sorted(rem) == ['moved', 'removed'], 'Unexpected removed: %s' % rem) def make_storage_testcase_subclasses(storage_class, namespace): """Make StorageTestCase subclasses for storage_class in namespace.""" storage_testcase_classes = [ c for c in ( - ob for ob in globals().values() if isinstance(ob, type)) + ob for ob in list(globals().values()) if isinstance(ob, type)) if issubclass(c, StorageTestCase) \ and c.Class == Storage] @@ -1102,7 +1102,7 @@ if TESTING == True: """Make VersionedStorageTestCase subclasses for storage_class in namespace.""" storage_testcase_classes = [ c for c in ( - ob for ob in globals().values() if isinstance(ob, type)) + ob for ob in list(globals().values()) if isinstance(ob, type)) if ((issubclass(c, StorageTestCase) \ and c.Class == Storage) or diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py index 4945b1f..f53f5b2 100644 --- a/libbe/storage/util/config.py +++ b/libbe/storage/util/config.py @@ -21,7 +21,7 @@ """Create, save, and load the per-user config file at :py:func:`path`. """ -import ConfigParser +import configparser import codecs import os import os.path @@ -71,7 +71,7 @@ def set_val(name, value, section="DEFAULT", encoding=None): """ if encoding == None: encoding = default_encoding - config = ConfigParser.ConfigParser() + config = configparser.ConfigParser() if os.path.exists(path()) == False: # touch file or config open(path(), 'w').close() # read chokes on missing file f = codecs.open(path(), 'r', encoding) @@ -114,13 +114,13 @@ def get_val(name, section="DEFAULT", default=None, encoding=None): if os.path.exists(path()): if encoding == None: encoding = default_encoding - config = ConfigParser.ConfigParser() + config = configparser.ConfigParser() f = codecs.open(path(), 'r', encoding) config.readfp(f, path()) f.close() try: return config.get(section, name) - except ConfigParser.NoOptionError: + except configparser.NoOptionError: return default else: return default diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py index 1c37e3f..609f782 100644 --- a/libbe/storage/util/mapfile.py +++ b/libbe/storage/util/mapfile.py @@ -70,7 +70,7 @@ def generate(map, context=6): the :py:mod:`~libbe.command.serve_commands` where merging is not important, the amount of context is controllable. - >>> sys.stdout.write(generate({'q':u'Fran\u00e7ais'}, context=0)) + >>> sys.stdout.write(generate({'q':u'Fran\\u00e7ais'}, context=0)) { "q": "Fran\\u00e7ais" } @@ -113,7 +113,7 @@ def parse(contents): u'd' >>> dict['e'] u'f' - >>> contents = generate({'q':u'Fran\u00e7ais'}) + >>> contents = generate({'q':u'Fran\\u00e7ais'}) >>> dict = parse(contents) >>> dict['q'] u'Fran\\xe7ais' diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py index 77c0162..4aa3ce6 100644 --- a/libbe/storage/util/properties.py +++ b/libbe/storage/util/properties.py @@ -415,7 +415,7 @@ if libbe.TESTING == True: @doc_property("A fancy property") def x(): return {} - self.failUnless(Test.x.__doc__ == "A fancy property", + self.assertTrue(Test.x.__doc__ == "A fancy property", Test.x.__doc__) def testLocalProperty(self): class Test(object): @@ -424,11 +424,11 @@ if libbe.TESTING == True: def x(): return {} t = Test() - self.failUnless(t.x == None, str(t.x)) + self.assertTrue(t.x == None, str(t.x)) t.x = 'z' # the first set initializes ._LOCAL_value - self.failUnless(t.x == 'z', str(t.x)) - self.failUnless("_LOCAL_value" in dir(t), dir(t)) - self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value) + self.assertTrue(t.x == 'z', str(t.x)) + self.assertTrue("_LOCAL_value" in dir(t), dir(t)) + self.assertTrue(t._LOCAL_value == 'z', t._LOCAL_value) def testSettingsProperty(self): class Test(object): @Property @@ -438,11 +438,11 @@ if libbe.TESTING == True: def __init__(self): self.settings = {} t = Test() - self.failUnless(t.x == None, str(t.x)) + self.assertTrue(t.x == None, str(t.x)) t.x = 'z' # the first set initializes ._LOCAL_value - self.failUnless(t.x == 'z', str(t.x)) - self.failUnless("attr" in t.settings, t.settings) - self.failUnless(t.settings["attr"] == 'z', t.settings["attr"]) + self.assertTrue(t.x == 'z', str(t.x)) + self.assertTrue("attr" in t.settings, t.settings) + self.assertTrue(t.settings["attr"] == 'z', t.settings["attr"]) def testDefaultingLocalProperty(self): class Test(object): @Property @@ -450,15 +450,15 @@ if libbe.TESTING == True: @local_property(name="DEFAULT", null=5) def x(): return {} t = Test() - self.failUnless(t.x == 5, str(t.x)) + self.assertTrue(t.x == 5, str(t.x)) t.x = 'x' - self.failUnless(t.x == 'y', str(t.x)) + self.assertTrue(t.x == 'y', str(t.x)) t.x = 'y' - self.failUnless(t.x == 'y', str(t.x)) + self.assertTrue(t.x == 'y', str(t.x)) t.x = 'z' - self.failUnless(t.x == 'z', str(t.x)) + self.assertTrue(t.x == 'z', str(t.x)) t.x = 5 - self.failUnless(t.x == 5, str(t.x)) + self.assertTrue(t.x == 5, str(t.x)) def testCheckedLocalProperty(self): class Test(object): @Property @@ -468,13 +468,13 @@ if libbe.TESTING == True: def __init__(self): self._CHECKED_value = 'x' t = Test() - self.failUnless(t.x == 'x', str(t.x)) + self.assertTrue(t.x == 'x', str(t.x)) try: t.x = None e = None - except ValueCheckError, e: + except ValueCheckError as e: pass - self.failUnless(type(e) == ValueCheckError, type(e)) + self.assertTrue(type(e) == ValueCheckError, type(e)) def testTwoCheckedLocalProperties(self): class Test(object): @Property @@ -493,18 +493,18 @@ if libbe.TESTING == True: try: t.x = 'a' e = None - except ValueCheckError, e: + except ValueCheckError as e: pass - self.failUnless(type(e) == ValueCheckError, type(e)) + self.assertTrue(type(e) == ValueCheckError, type(e)) t.x = 'x' t.x = 'y' t.x = 'z' try: t.a = 'x' e = None - except ValueCheckError, e: + except ValueCheckError as e: pass - self.failUnless(type(e) == ValueCheckError, type(e)) + self.assertTrue(type(e) == ValueCheckError, type(e)) t.a = 'a' t.a = 'b' t.a = 'c' @@ -517,13 +517,13 @@ if libbe.TESTING == True: def __init__(self): self._CHECKED_value = 'x' t = Test() - self.failUnless(t.x == 'x', str(t.x)) + self.assertTrue(t.x == 'x', str(t.x)) try: t.x = None e = None - except ValueCheckError, e: + except ValueCheckError as e: pass - self.failUnless(type(e) == ValueCheckError, type(e)) + self.assertTrue(type(e) == ValueCheckError, type(e)) def testCachedLocalProperty(self): class Gen(object): def __init__(self): @@ -537,30 +537,30 @@ if libbe.TESTING == True: @local_property(name="CACHED") def x(): return {} t = Test() - self.failIf("_CACHED_cache" in dir(t), + self.assertFalse("_CACHED_cache" in dir(t), getattr(t, "_CACHED_cache", None)) - self.failUnless(t.x == 1, t.x) - self.failUnless(t.x == 1, t.x) - self.failUnless(t.x == 1, t.x) + self.assertTrue(t.x == 1, t.x) + self.assertTrue(t.x == 1, t.x) + self.assertTrue(t.x == 1, t.x) t.x = 8 - self.failUnless(t.x == 8, t.x) - self.failUnless(t.x == 8, t.x) + self.assertTrue(t.x == 8, t.x) + self.assertTrue(t.x == 8, t.x) t._CACHED_cache = False # Caching is off, but the stored value val = t.x # is 8, not the initVal (None), so we - self.failUnless(val == 8, val) # get 8. + self.assertTrue(val == 8, val) # get 8. t._CACHED_value = None # Now we've set the stored value to None val = t.x # so future calls to fget (like this) - self.failUnless(val == 2, val) # will call the generator every time... + self.assertTrue(val == 2, val) # will call the generator every time... val = t.x - self.failUnless(val == 3, val) + self.assertTrue(val == 3, val) val = t.x - self.failUnless(val == 4, val) + self.assertTrue(val == 4, val) t._CACHED_cache = True # We turn caching back on, and get - self.failUnless(t.x == 1, str(t.x)) # the original cached value. + self.assertTrue(t.x == 1, str(t.x)) # the original cached value. del t._CACHED_cached_value # Removing that value forces a - self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call - self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which - self.failUnless(t.x == 5, str(t.x)) # we get the new cached value. + self.assertTrue(t.x == 5, str(t.x)) # single cache-regenerating call + self.assertTrue(t.x == 5, str(t.x)) # to the genenerator, after which + self.assertTrue(t.x == 5, str(t.x)) # we get the new cached value. def testPrimedLocalProperty(self): class Test(object): def prime(self): @@ -573,23 +573,23 @@ if libbe.TESTING == True: self.settings={} self.primeVal = "initialized" t = Test() - self.failIf("_PRIMED_prime" in dir(t), + self.assertFalse("_PRIMED_prime" in dir(t), getattr(t, "_PRIMED_prime", None)) - self.failUnless(t.x == "initialized", t.x) + self.assertTrue(t.x == "initialized", t.x) t.x = 1 - self.failUnless(t.x == 1, t.x) + self.assertTrue(t.x == 1, t.x) t.x = None - self.failUnless(t.x == "initialized", t.x) + self.assertTrue(t.x == "initialized", t.x) t._PRIMED_prime = True t.x = 3 - self.failUnless(t.x == "initialized", t.x) + self.assertTrue(t.x == "initialized", t.x) t._PRIMED_prime = False t.x = 3 - self.failUnless(t.x == 3, t.x) + self.assertTrue(t.x == 3, t.x) # test unprimableVal t.x = None t.primeVal = None - self.failUnless(t.x == 2, t.x) + self.assertTrue(t.x == 2, t.x) def testChangeHookLocalProperty(self): class Test(object): def _hook(self, old, new): @@ -602,14 +602,14 @@ if libbe.TESTING == True: def x(): return {} t = Test() t.x = 1 - self.failUnless(t.old == None, t.old) - self.failUnless(t.new == 1, t.new) + self.assertTrue(t.old == None, t.old) + self.assertTrue(t.new == 1, t.new) t.x = 1 - self.failUnless(t.old == None, t.old) - self.failUnless(t.new == 1, t.new) + self.assertTrue(t.old == None, t.old) + self.assertTrue(t.new == 1, t.new) t.x = 2 - self.failUnless(t.old == 1, t.old) - self.failUnless(t.new == 2, t.new) + self.assertTrue(t.old == 1, t.old) + self.assertTrue(t.new == 2, t.new) def testChangeHookMutableProperty(self): class Test(object): def _hook(self, old, new): @@ -624,45 +624,45 @@ if libbe.TESTING == True: t = Test() t.hook_calls = 0 t.x = [] - self.failUnless(t.old == None, t.old) - self.failUnless(t.new == [], t.new) - self.failUnless(t.hook_calls == 1, t.hook_calls) + self.assertTrue(t.old == None, t.old) + self.assertTrue(t.new == [], t.new) + self.assertTrue(t.hook_calls == 1, t.hook_calls) a = t.x a.append(5) t.x = a - self.failUnless(t.old == [], t.old) - self.failUnless(t.new == [5], t.new) - self.failUnless(t.hook_calls == 2, t.hook_calls) + self.assertTrue(t.old == [], t.old) + self.assertTrue(t.new == [5], t.new) + self.assertTrue(t.hook_calls == 2, t.hook_calls) t.x = [] - self.failUnless(t.old == [5], t.old) - self.failUnless(t.new == [], t.new) - self.failUnless(t.hook_calls == 3, t.hook_calls) + self.assertTrue(t.old == [5], t.old) + self.assertTrue(t.new == [], t.new) + self.assertTrue(t.hook_calls == 3, t.hook_calls) # now append without reassigning. this doesn't trigger the # change, since we don't ever set t.x, only get it and mess # with it. It does, however, update our t.new, since t.new = # t.x and is not a static copy. t.x.append(5) - self.failUnless(t.old == [5], t.old) - self.failUnless(t.new == [5], t.new) - self.failUnless(t.hook_calls == 3, t.hook_calls) + self.assertTrue(t.old == [5], t.old) + self.assertTrue(t.new == [5], t.new) + self.assertTrue(t.hook_calls == 3, t.hook_calls) # however, the next t.x get _will_ notice the change... a = t.x - self.failUnless(t.old == [], t.old) - self.failUnless(t.new == [5], t.new) - self.failUnless(t.hook_calls == 4, t.hook_calls) + self.assertTrue(t.old == [], t.old) + self.assertTrue(t.new == [5], t.new) + self.assertTrue(t.hook_calls == 4, t.hook_calls) t.x.append(6) # this append(6) is not noticed yet - self.failUnless(t.old == [], t.old) - self.failUnless(t.new == [5,6], t.new) - self.failUnless(t.hook_calls == 4, t.hook_calls) + self.assertTrue(t.old == [], t.old) + self.assertTrue(t.new == [5,6], t.new) + self.assertTrue(t.hook_calls == 4, t.hook_calls) # this append(7) is not noticed, but the t.x get causes the # append(6) to be noticed t.x.append(7) - self.failUnless(t.old == [5], t.old) - self.failUnless(t.new == [5,6,7], t.new) - self.failUnless(t.hook_calls == 5, t.hook_calls) + self.assertTrue(t.old == [5], t.old) + self.assertTrue(t.new == [5,6,7], t.new) + self.assertTrue(t.hook_calls == 5, t.hook_calls) a = t.x # now the append(7) is noticed - self.failUnless(t.old == [5,6], t.old) - self.failUnless(t.new == [5,6,7], t.new) - self.failUnless(t.hook_calls == 6, t.hook_calls) + self.assertTrue(t.old == [5,6], t.old) + self.assertTrue(t.new == [5,6,7], t.new) + self.assertTrue(t.hook_calls == 6, t.hook_calls) suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests) diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py index 819698f..323d6e9 100644 --- a/libbe/storage/util/settings_object.py +++ b/libbe/storage/util/settings_object.py @@ -27,7 +27,7 @@ See Also """ import libbe -from properties import Property, doc_property, local_property, \ +from .properties import Property, doc_property, local_property, \ defaulting_property, checked_property, fn_checked_property, \ cached_property, primed_property, change_hook_property, \ settings_property @@ -324,7 +324,7 @@ if libbe.TESTING == True: required_saved_properties=required_saved_properties) def content_type(): return {} expected = "A test property\n\nThis property defaults to None." - self.failUnless(Test.content_type.__doc__ == expected, + self.assertTrue(Test.content_type.__doc__ == expected, Test.content_type.__doc__) def testSimplePropertyFromMemory(self): """Testing a minimal versioned property from memory""" @@ -338,75 +338,75 @@ if libbe.TESTING == True: required_saved_properties=required_saved_properties) def content_type(): return {} t = Test() - self.failUnless(len(t.settings) == 0, len(t.settings)) + self.assertTrue(len(t.settings) == 0, len(t.settings)) # accessing t.content_type triggers the priming, but # t.storage.is_readable() == False, so nothing happens. t.storage.readable = False - self.failUnless(t.content_type == None, t.content_type) - self.failUnless(t.settings == {}, t.settings) - self.failUnless(len(t.settings) == 0, len(t.settings)) - self.failUnless(t.content_type == None, t.content_type) + self.assertTrue(t.content_type == None, t.content_type) + self.assertTrue(t.settings == {}, t.settings) + self.assertTrue(len(t.settings) == 0, len(t.settings)) + self.assertTrue(t.content_type == None, t.content_type) # accessing t.content_type triggers the priming again, and # now that t.storage.is_readable() == True, this fills out # t.settings with EMPTY data. At this point there should # be one load and no saves. t.storage.readable = True - self.failUnless(t.content_type == None, t.content_type) - self.failUnless(len(t.settings) == 1, len(t.settings)) - self.failUnless(t.settings["Content-type"] == EMPTY, + self.assertTrue(t.content_type == None, t.content_type) + self.assertTrue(len(t.settings) == 1, len(t.settings)) + self.assertTrue(t.settings["Content-type"] == EMPTY, t.settings["Content-type"]) - self.failUnless(t.content_type == None, t.content_type) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 0, len(t.storage)) + self.assertTrue(t.content_type == None, t.content_type) + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 0, len(t.storage)) # an explicit call to load settings forces a reload, # but nothing else changes. t.load_settings() - self.failUnless(len(t.settings) == 1, len(t.settings)) - self.failUnless(t.settings["Content-type"] == EMPTY, + self.assertTrue(len(t.settings) == 1, len(t.settings)) + self.assertTrue(t.settings["Content-type"] == EMPTY, t.settings["Content-type"]) - self.failUnless(t.content_type == None, t.content_type) - self.failUnless(t.load_count == 2, t.load_count) - self.failUnless(len(t.storage) == 0, len(t.storage)) + self.assertTrue(t.content_type == None, t.content_type) + self.assertTrue(t.load_count == 2, t.load_count) + self.assertTrue(len(t.storage) == 0, len(t.storage)) # now we set a value t.content_type = 5 - self.failUnless(t.settings["Content-type"] == 5, + self.assertTrue(t.settings["Content-type"] == 5, t.settings["Content-type"]) - self.failUnless(t.load_count == 2, t.load_count) - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'Content-type':5}], t.storage) + self.assertTrue(t.load_count == 2, t.load_count) + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'Content-type':5}], t.storage) # getting its value changes nothing - self.failUnless(t.content_type == 5, t.content_type) - self.failUnless(t.settings["Content-type"] == 5, + self.assertTrue(t.content_type == 5, t.content_type) + self.assertTrue(t.settings["Content-type"] == 5, t.settings["Content-type"]) - self.failUnless(t.load_count == 2, t.load_count) - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'Content-type':5}], t.storage) + self.assertTrue(t.load_count == 2, t.load_count) + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'Content-type':5}], t.storage) # now we set another value t.content_type = "text/plain" - self.failUnless(t.content_type == "text/plain", t.content_type) - self.failUnless(t.settings["Content-type"] == "text/plain", + self.assertTrue(t.content_type == "text/plain", t.content_type) + self.assertTrue(t.settings["Content-type"] == "text/plain", t.settings["Content-type"]) - self.failUnless(t.load_count == 2, t.load_count) - self.failUnless(len(t.storage) == 2, len(t.storage)) - self.failUnless(t.storage == [{'Content-type':5}, + self.assertTrue(t.load_count == 2, t.load_count) + self.assertTrue(len(t.storage) == 2, len(t.storage)) + self.assertTrue(t.storage == [{'Content-type':5}, {'Content-type':'text/plain'}], t.storage) # t._get_saved_settings() returns a dict of required or # non-default values. - self.failUnless(t._get_saved_settings() == \ + self.assertTrue(t._get_saved_settings() == \ {"Content-type":"text/plain"}, t._get_saved_settings()) # now we clear to the post-primed value t.content_type = EMPTY - self.failUnless(t.settings["Content-type"] == EMPTY, + self.assertTrue(t.settings["Content-type"] == EMPTY, t.settings["Content-type"]) - self.failUnless(t.content_type == None, t.content_type) - self.failUnless(len(t.settings) == 1, len(t.settings)) - self.failUnless(t.settings["Content-type"] == EMPTY, + self.assertTrue(t.content_type == None, t.content_type) + self.assertTrue(len(t.settings) == 1, len(t.settings)) + self.assertTrue(t.settings["Content-type"] == EMPTY, t.settings["Content-type"]) - self.failUnless(t._get_saved_settings() == {}, + self.assertTrue(t._get_saved_settings() == {}, t._get_saved_settings()) - self.failUnless(t.storage == [{'Content-type':5}, + self.assertTrue(t.storage == [{'Content-type':5}, {'Content-type':'text/plain'}, {}], t.storage) @@ -433,16 +433,16 @@ if libbe.TESTING == True: # which also pulls in prop-a. t.prop_b = 'new-b' settings = {'prop-b':'new-b', 'prop-a':'saved'} - self.failUnless(t.settings == settings, t.settings) - self.failUnless(t._get_saved_settings() == settings, + self.assertTrue(t.settings == settings, t.settings) + self.assertTrue(t._get_saved_settings() == settings, t._get_saved_settings()) # test that _get_saved_settings() works even when settings # were _not_ loaded beforehand t = Test() t.storage.append({'prop-a':'saved'}) settings ={'prop-a':'saved'} - self.failUnless(t.settings == {}, t.settings) - self.failUnless(t._get_saved_settings() == settings, + self.assertTrue(t.settings == {}, t.settings) + self.assertTrue(t._get_saved_settings() == settings, t._get_saved_settings()) def testSimplePropertySetStorageSave(self): """Set a property, then attach storage and save""" @@ -467,13 +467,13 @@ if libbe.TESTING == True: t.prop_a = 'text/html' t.storage = storage t.save_settings() - self.failUnless(t.prop_a == 'text/html', t.prop_a) - self.failUnless(t.settings == {'prop-a':'text/html', + self.assertTrue(t.prop_a == 'text/html', t.prop_a) + self.assertTrue(t.settings == {'prop-a':'text/html', 'prop-b':EMPTY}, t.settings) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'prop-a':'text/html'}], + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'prop-a':'text/html'}], t.storage) def testDefaultingProperty(self): """Testing a defaulting versioned property""" @@ -488,24 +488,24 @@ if libbe.TESTING == True: required_saved_properties=required_saved_properties) def content_type(): return {} t = Test() - self.failUnless(t.settings == {}, t.settings) - self.failUnless(t.content_type == "text/plain", t.content_type) - self.failUnless(t.settings == {"Content-type":EMPTY}, + self.assertTrue(t.settings == {}, t.settings) + self.assertTrue(t.content_type == "text/plain", t.content_type) + self.assertTrue(t.settings == {"Content-type":EMPTY}, t.settings) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 0, len(t.storage)) - self.failUnless(t._get_saved_settings() == {}, + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 0, len(t.storage)) + self.assertTrue(t._get_saved_settings() == {}, t._get_saved_settings()) t.content_type = "text/html" - self.failUnless(t.content_type == "text/html", + self.assertTrue(t.content_type == "text/html", t.content_type) - self.failUnless(t.settings == {"Content-type":"text/html"}, + self.assertTrue(t.settings == {"Content-type":"text/html"}, t.settings) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'Content-type':'text/html'}], + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'Content-type':'text/html'}], t.storage) - self.failUnless(t._get_saved_settings() == \ + self.assertTrue(t._get_saved_settings() == \ {"Content-type":"text/html"}, t._get_saved_settings()) def testRequiredDefaultingProperty(self): @@ -522,25 +522,25 @@ if libbe.TESTING == True: require_save=True) def content_type(): return {} t = Test() - self.failUnless(t.settings == {}, t.settings) - self.failUnless(t.content_type == "text/plain", t.content_type) - self.failUnless(t.settings == {"Content-type":EMPTY}, + self.assertTrue(t.settings == {}, t.settings) + self.assertTrue(t.content_type == "text/plain", t.content_type) + self.assertTrue(t.settings == {"Content-type":EMPTY}, t.settings) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 0, len(t.storage)) - self.failUnless(t._get_saved_settings() == \ + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 0, len(t.storage)) + self.assertTrue(t._get_saved_settings() == \ {"Content-type":"text/plain"}, t._get_saved_settings()) t.content_type = "text/html" - self.failUnless(t.content_type == "text/html", + self.assertTrue(t.content_type == "text/html", t.content_type) - self.failUnless(t.settings == {"Content-type":"text/html"}, + self.assertTrue(t.settings == {"Content-type":"text/html"}, t.settings) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'Content-type':'text/html'}], + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'Content-type':'text/html'}], t.storage) - self.failUnless(t._get_saved_settings() == \ + self.assertTrue(t._get_saved_settings() == \ {"Content-type":"text/html"}, t._get_saved_settings()) def testClassVersionedPropertyDefinition(self): @@ -564,18 +564,18 @@ if libbe.TESTING == True: require_save=True) def content_type(): return {} t = Test() - self.failUnless(t._get_saved_settings() == \ + self.assertTrue(t._get_saved_settings() == \ {"Content-type":"text/plain"}, t._get_saved_settings()) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 0, len(t.storage)) + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 0, len(t.storage)) t.content_type = "text/html" - self.failUnless(t._get_saved_settings() == \ + self.assertTrue(t._get_saved_settings() == \ {"Content-type":"text/html"}, t._get_saved_settings()) - self.failUnless(t.load_count == 1, t.load_count) - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'Content-type':'text/html'}], + self.assertTrue(t.load_count == 1, t.load_count) + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'Content-type':'text/html'}], t.storage) def testMutableChangeHookedProperty(self): """Testing a mutable change-hooked property""" @@ -591,26 +591,26 @@ if libbe.TESTING == True: required_saved_properties=required_saved_properties) def list_type(): return {} t = Test() - self.failUnless(len(t.storage) == 0, len(t.storage)) - self.failUnless(t.list_type == None, t.list_type) - self.failUnless(len(t.storage) == 0, len(t.storage)) - self.failUnless(t.settings["List-type"]==EMPTY, + self.assertTrue(len(t.storage) == 0, len(t.storage)) + self.assertTrue(t.list_type == None, t.list_type) + self.assertTrue(len(t.storage) == 0, len(t.storage)) + self.assertTrue(t.settings["List-type"]==EMPTY, t.settings["List-type"]) t.list_type = [] - self.failUnless(t.settings["List-type"] == [], + self.assertTrue(t.settings["List-type"] == [], t.settings["List-type"]) - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'List-type':[]}], + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'List-type':[]}], t.storage) t.list_type.append(5) # external modification not detected yet - self.failUnless(len(t.storage) == 1, len(t.storage)) - self.failUnless(t.storage == [{'List-type':[]}], + self.assertTrue(len(t.storage) == 1, len(t.storage)) + self.assertTrue(t.storage == [{'List-type':[]}], t.storage) - self.failUnless(t.settings["List-type"] == [5], + self.assertTrue(t.settings["List-type"] == [5], t.settings["List-type"]) - self.failUnless(t.list_type == [5], t.list_type)# get triggers save - self.failUnless(len(t.storage) == 2, len(t.storage)) - self.failUnless(t.storage == [{'List-type':[]}, + self.assertTrue(t.list_type == [5], t.list_type)# get triggers save + self.assertTrue(len(t.storage) == 2, len(t.storage)) + self.assertTrue(t.storage == [{'List-type':[]}, {'List-type':[5]}], t.storage) diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py index c3a5df1..12d177a 100644 --- a/libbe/storage/util/upgrade.py +++ b/libbe/storage/util/upgrade.py @@ -49,14 +49,14 @@ def generate_yaml_mapfile(map): >>> generate_yaml_mapfile({'q':'p'}) 'q: p\\n\\n' - >>> generate_yaml_mapfile({'q':u'Fran\u00e7ais'}) + >>> generate_yaml_mapfile({'q':u'Fran\\u00e7ais'}) 'q: Fran\\xc3\\xa7ais\\n\\n' >>> generate_yaml_mapfile({'q':u'hello'}) 'q: hello\\n\\n' """ if yaml is None: raise _yaml_import_error - keys = map.keys() + keys = list(map.keys()) keys.sort() for key in keys: try: @@ -66,9 +66,9 @@ def generate_yaml_mapfile(map): assert(':' not in key) assert(len(key) > 0) except AssertionError: - raise ValueError(unicode(key).encode('unicode_escape')) + raise ValueError(str(key).encode('unicode_escape')) if '\n' in map[key]: - raise ValueError(unicode(map[key]).encode('unicode_escape')) + raise ValueError(str(map[key]).encode('unicode_escape')) lines = [] for key in keys: @@ -94,7 +94,7 @@ def parse_yaml_mapfile(contents): 'd' >>> dict['e'] 'f' - >>> contents = generate_yaml_mapfile({'q':u'Fran\u00e7ais'}) + >>> contents = generate_yaml_mapfile({'q':u'Fran\\u00e7ais'}) >>> dict = parse_yaml_mapfile(contents) >>> dict['q'] u'Fran\\xe7ais' @@ -102,7 +102,7 @@ def parse_yaml_mapfile(contents): if yaml is None: raise _yaml_import_error c = yaml.safe_load(contents) - if type(c) == types.StringType: + if type(c) == bytes: raise mapfile.InvalidMapfileContents( 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents) return c or {} @@ -146,8 +146,8 @@ class Upgrader (object): self.vcs._vcs_update(path) def upgrade(self): - print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \ - % (self.initial_version, self.final_version) + print('upgrading bugdir from "%s" to "%s"' \ + % (self.initial_version, self.final_version), file=sys.stderr) self.check_initial_version() self.set_version() self._upgrade() @@ -191,7 +191,7 @@ class Upgrade_1_0_to_1_1 (Upgrader): contents = '\n'.join(newlines) # load the YAML and save map = parse_yaml_mapfile(contents) - if type(map) == types.StringType: + if type(map) == bytes: raise ValueError((path, contents)) contents = generate_yaml_mapfile(map) encoding.set_file_contents(path, contents) @@ -317,7 +317,7 @@ class Upgrade_1_2_to_1_3 (Upgrader): for bug_uuid in os.listdir(self.get_path('bugs')): self._upgrade_bug_mapfile(bug_uuid) self._upgrade_bugdir_mapfile() - for bug in self._targets.values(): + for bug in list(self._targets.values()): self._save_bug_settings(bug) class Upgrade_1_3_to_1_4 (Upgrader): @@ -418,11 +418,9 @@ def upgrade(path, current_version, use consecutive conversion functions. """ if current_version not in STORAGE_VERSIONS: - raise NotImplementedError, \ - "Cannot handle version '%s' yet." % current_version + raise NotImplementedError("Cannot handle version '%s' yet." % current_version) if target_version not in STORAGE_VERSIONS: - raise NotImplementedError, \ - "Cannot handle version '%s' yet." % current_version + raise NotImplementedError("Cannot handle version '%s' yet." % current_version) if (current_version, target_version) in upgrade_classes: # direct conversion @@ -438,9 +436,8 @@ def upgrade(path, current_version, try: upgrade_class = upgrade_classes[(version_a, version_b)] except KeyError: - raise NotImplementedError, \ - "Cannot convert version '%s' to '%s' yet." \ - % (version_a, version_b) + raise NotImplementedError("Cannot convert version '%s' to '%s' yet." \ + % (version_a, version_b)) u = upgrade_class(path) u.upgrade() if version_b == target_version: diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py index 2fd8dc4..56c66b3 100644 --- a/libbe/storage/vcs/__init__.py +++ b/libbe/storage/vcs/__init__.py @@ -32,7 +32,7 @@ The base `VCS` class also serves as a filesystem Storage backend (not versioning) in the event that a user has no VCS installed. """ -import base +from . import base set_preferred_vcs = base.set_preferred_vcs vcs_by_name = base.vcs_by_name diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py index 687575f..dfd007f 100644 --- a/libbe/storage/vcs/base.py +++ b/libbe/storage/vcs/base.py @@ -255,7 +255,7 @@ class CachedPathID (object): def disconnect(self): if self._changed == True: f = codecs.open(self._cache_path, 'w', self.encoding) - for uuid,path in self._cache.items(): + for uuid,path in list(self._cache.items()): f.write('%s\t%s\n' % (uuid, path)) f.close() self._cache = {} @@ -558,12 +558,12 @@ class VCS (libbe.storage.base.VersionedStorage): for num in num_part.split('.'): try: self._parsed_version.append(int(num)) - except ValueError, e: + except ValueError as e: # bzr version number might contain non-numerical tags splitter = re.compile(r'[\D]') # Match non-digits splits = splitter.split(num) # if len(tag) > 1 some splits will be empty; remove - splits = filter(lambda s: s != '', splits) + splits = [s for s in splits if s != ''] tag_starti = len(splits[0]) num_starti = num.find(splits[1], tag_starti) tag = num[tag_starti:num_starti] @@ -573,7 +573,7 @@ class VCS (libbe.storage.base.VersionedStorage): for current,other in zip(self._parsed_version, args): if type(current) != type (other): # one of them is a pre-release string - if type(current) != types.IntType: + if type(current) != int: return -1 else: return 1 @@ -586,12 +586,12 @@ class VCS (libbe.storage.base.VersionedStorage): if verlen == arglen: return 0 elif verlen > arglen: - if type(self._parsed_version[arglen]) != types.IntType: + if type(self._parsed_version[arglen]) != int: return -1 # self is a prerelease else: return 1 else: - if type(args[verlen]) != types.IntType: + if type(args[verlen]) != int: return 1 # args is a prerelease else: return -1 @@ -732,7 +732,7 @@ class VCS (libbe.storage.base.VersionedStorage): if revision == None: try: path = self.path(id, revision, relpath=False) - except InvalidID, e: + except InvalidID as e: return False return os.path.exists(path) path = self.path(id, revision, relpath=True) @@ -763,7 +763,7 @@ class VCS (libbe.storage.base.VersionedStorage): if os.path.exists(path): shutil.rmtree(path) path = self._cached_path_id.path(id, relpath=True) - for id,p in self._cached_path_id._cache.items(): + for id,p in list(self._cached_path_id._cache.items()): if p.startswith(path): self._cached_path_id.remove_id(id) @@ -818,7 +818,7 @@ class VCS (libbe.storage.base.VersionedStorage): try: relpath = self.path(id, revision, relpath=True) contents = self._vcs_get_file_contents(relpath, revision) - except InvalidID, e: + except InvalidID as e: if default == libbe.util.InvalidObject: raise e return default @@ -833,7 +833,7 @@ class VCS (libbe.storage.base.VersionedStorage): def _set(self, id, value): try: path = self._cached_path_id.path(id) - except InvalidID, e: + except InvalidID as e: raise if not os.path.exists(path): raise InvalidID(id) @@ -923,7 +923,7 @@ class VCS (libbe.storage.base.VersionedStorage): """ try: ret = search_parent_directories(path, filename) - except AssertionError, e: + except AssertionError as e: return None return ret @@ -1054,8 +1054,8 @@ class VCS (libbe.storage.base.VersionedStorage): path, decode=True).rstrip() relpath = self._u_rel_path(path) contents = self._vcs_get_file_contents(relpath, revision=revision) - if type(contents) != types.UnicodeType: - contents = unicode(contents, self.encoding) + if type(contents) != str: + contents = str(contents, self.encoding) return contents.strip() def _setup_storage_version(self): @@ -1104,7 +1104,7 @@ if libbe.TESTING == True: def test_installed(self): """See if the VCS is installed. """ - self.failUnless(self.s.installed() == True, + self.assertTrue(self.s.installed() == True, '%(name)s VCS not found' % vars(self.Class)) @@ -1114,7 +1114,7 @@ if libbe.TESTING == True: """ if self.s.installed(): self.s.disconnect() - self.failUnless(self.s._detect(self.dirname) == True, + self.assertTrue(self.s._detect(self.dirname) == True, 'Did not detected %(name)s VCS after initialising' % vars(self.Class)) self.s.connect() @@ -1125,7 +1125,7 @@ if libbe.TESTING == True: if self.s.installed() and self.Class.name != 'None': self.s.disconnect() self.s.destroy() - self.failUnless(self.s._detect(self.dirname) == False, + self.assertTrue(self.s._detect(self.dirname) == False, 'Detected %(name)s VCS before initialising' % vars(self.Class)) self.s.init() @@ -1136,7 +1136,7 @@ if libbe.TESTING == True: rp = os.path.realpath(self.s.repo) dp = os.path.realpath(self.dirname) vcs_name = self.Class.name - self.failUnless( + self.assertTrue( dp == rp or rp == None, "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars()) @@ -1151,7 +1151,7 @@ if libbe.TESTING == True: return name,email = libbe.ui.util.user.parse_user_id(user_id) if email != None: - self.failUnless('@' in email, email) + self.assertTrue('@' in email, email) def make_vcs_testcase_subclasses(vcs_class, namespace): c = vcs_class() @@ -1167,7 +1167,7 @@ if libbe.TESTING == True: # Make VCSTestCase subclasses for vcs_class in the namespace. vcs_testcase_classes = [ c for c in ( - ob for ob in globals().values() if isinstance(ob, type)) + ob for ob in list(globals().values()) if isinstance(ob, type)) if issubclass(c, VCSTestCase) \ and c.Class == VCS] diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py index 0c92058..6da299a 100644 --- a/libbe/storage/vcs/bzr.py +++ b/libbe/storage/vcs/bzr.py @@ -39,11 +39,11 @@ import os import os.path import re import shutil -import StringIO +import io import sys import libbe -import base +from . import base if libbe.TESTING == True: import doctest @@ -84,7 +84,7 @@ class Bzr(base.VCS): def _vcs_root(self, path): """Find the root of the deepest repository containing path.""" cmd = bzrlib.builtins.cmd_root() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() cmd.run(filename=path) if self.version_cmp(2,2,0) < 0: cmd.cleanup_now() @@ -92,7 +92,7 @@ class Bzr(base.VCS): def _vcs_init(self, path): cmd = bzrlib.builtins.cmd_init() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() cmd.run(location=path) if self.version_cmp(2,2,0) < 0: cmd.cleanup_now() @@ -105,7 +105,7 @@ class Bzr(base.VCS): def _vcs_add(self, path): path = os.path.join(self.repo, path) cmd = bzrlib.builtins.cmd_add() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() kwargs = {'file_ids_from': self.repo} if self.repo == os.path.realpath(os.getcwd()): # Work around bzr file locking on Windows. @@ -126,7 +126,7 @@ class Bzr(base.VCS): # --force to also remove unversioned files. path = os.path.join(self.repo, path) cmd = bzrlib.builtins.cmd_remove() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() cmd.run(file_list=[path], file_deletion_strategy='force') if self.version_cmp(2,2,0) < 0: cmd.cleanup_now() @@ -150,14 +150,14 @@ class Bzr(base.VCS): path = os.path.join(self.repo, path) revision = self._parse_revision_string(revision) cmd = bzrlib.builtins.cmd_cat() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() if self.version_cmp(1,6,0) < 0: # old bzrlib cmd_cat uses sys.stdout not self.outf for output. stdout = sys.stdout sys.stdout = cmd.outf try: cmd.run(filename=path, revision=revision) - except bzrlib.errors.BzrCommandError, e: + except bzrlib.errors.BzrCommandError as e: if 'not present in revision' in str(e): raise base.InvalidPath(path, root=self.repo, revision=revision) raise @@ -177,7 +177,7 @@ class Bzr(base.VCS): def _vcs_isdir(self, path, revision): try: self._vcs_listdir(path, revision) - except AttributeError, e: + except AttributeError as e: if 'children' in str(e): return False raise @@ -187,7 +187,7 @@ class Bzr(base.VCS): path = os.path.join(self.repo, path) revision = self._parse_revision_string(revision) cmd = bzrlib.builtins.cmd_ls() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() try: if self.version_cmp(2,0,0) >= 0: cmd.run(revision=revision, path=path, recursive=recursive) @@ -197,7 +197,7 @@ class Bzr(base.VCS): # (https://bugs.launchpad.net/bzr/+bug/158690) cmd.run(revision=revision, path=path, non_recursive=False) - except bzrlib.errors.BzrCommandError, e: + except bzrlib.errors.BzrCommandError as e: if 'not present in revision' in str(e): raise base.InvalidPath(path, root=self.repo, revision=revision) raise @@ -212,12 +212,12 @@ class Bzr(base.VCS): def _vcs_commit(self, commitfile, allow_empty=False): cmd = bzrlib.builtins.cmd_commit() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() cwd = os.getcwd() os.chdir(self.repo) try: cmd.run(file=commitfile, unchanged=allow_empty) - except bzrlib.errors.BzrCommandError, e: + except bzrlib.errors.BzrCommandError as e: strings = ['no changes to commit.', # bzr 1.3.1 'No changes to commit.'] # bzr 1.15.1 if self._u_any_in_string(strings, str(e)) == True: @@ -231,7 +231,7 @@ class Bzr(base.VCS): def _vcs_revision_id(self, index): cmd = bzrlib.builtins.cmd_revno() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() cmd.run(location=self.repo) if self.version_cmp(2,2,0) < 0: cmd.cleanup_now() @@ -245,7 +245,7 @@ class Bzr(base.VCS): def _diff(self, revision): revision = self._parse_revision_string(revision) cmd = bzrlib.builtins.cmd_diff() - cmd.outf = StringIO.StringIO() + cmd.outf = io.StringIO() # for some reason, cmd_diff uses sys.stdout not self.outf for output. stdout = sys.stdout sys.stdout = cmd.outf diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py index ec100b4..41262f5 100644 --- a/libbe/storage/vcs/darcs.py +++ b/libbe/storage/vcs/darcs.py @@ -104,10 +104,10 @@ class Darcs(base.VCS): for num in num_part.split('.'): try: self._parsed_version.append(int(num)) - except ValueError, e: + except ValueError as e: self._parsed_version.append(num) for current,other in zip(self._parsed_version, args): - if type(current) != types.IntType: + if type(current) != int: raise NotImplementedError( 'Cannot parse non-integer portion "%s" of Darcs version "%s"' % (current, self.version())) @@ -284,7 +284,7 @@ class Darcs(base.VCS): assert patch.tag == 'patch', patch.tag for child in patch.getchildren(): if child.tag == 'name': - text = unescape(unicode(child.text).decode('unicode_escape').strip()) + text = unescape(str(child.text).decode('unicode_escape').strip()) revisions.append(text) revisions.reverse() return revisions diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py index 851af19..9a15c92 100644 --- a/libbe/storage/vcs/git.py +++ b/libbe/storage/vcs/git.py @@ -32,7 +32,7 @@ import unittest try: import pygit2 as _pygit2 -except ImportError, error: +except ImportError as error: _pygit2 = None _pygit2_import_error = error else: @@ -65,7 +65,7 @@ class PygitGit(base.VCS): Using :py:mod:`pygit2` for the Git activity. """ name='pygit2' - _null_hex = u'0' * 40 + _null_hex = '0' * 40 def __init__(self, *args, **kwargs): base.VCS.__init__(self, *args, **kwargs) @@ -162,7 +162,7 @@ class PygitGit(base.VCS): def _git_get_commit(self, revision): if isinstance(revision, str): - revision = unicode(revision, 'ascii') + revision = str(revision, 'ascii') commit = self._pygit_repository.revparse_single(revision) assert commit.type == _pygit2.GIT_OBJ_COMMIT, commit return commit diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py index f2976ff..0ebdfb0 100644 --- a/libbe/storage/vcs/hg.py +++ b/libbe/storage/vcs/hg.py @@ -47,12 +47,12 @@ import os import os.path import re import shutil -import StringIO +import io import sys import time # work around http://mercurial.selenic.com/bts/issue618 import libbe -import base +from . import base if libbe.TESTING == True: import doctest @@ -85,7 +85,7 @@ class Hg(base.VCS): fullargs = ['--cwd', kwargs['cwd']] fullargs.extend(args) cwd = os.getcwd() - output = StringIO.StringIO() + output = io.StringIO() if self.version_cmp(1,9) >= 0: req = mercurial.dispatch.request(fullargs, fout=output) mercurial.dispatch.dispatch(req) |