aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatěj Cepl <mcepl@cepl.eu>2024-01-18 19:09:58 +0100
committerMatěj Cepl <mcepl@cepl.eu>2024-01-18 19:09:58 +0100
commitcc7362d28bd9c43cb6839809f86e59874f2fe458 (patch)
tree1053748f5890c769d1e99806bc0c8bcffca6f0fe
parent003bd13629d9db2f14156f97b74a4672e9ecdf77 (diff)
downloadbugseverywhere-cc7362d28bd9c43cb6839809f86e59874f2fe458.tar.gz
2to3 conversion of the repo.
-rw-r--r--Makefile7
-rw-r--r--doc/conf.py8
-rw-r--r--interfaces/email/interactive/send_pgp_mime.py30
-rwxr-xr-xinterfaces/web/cfbe.py2
-rw-r--r--interfaces/web/web.py22
-rw-r--r--libbe/bug.py24
-rw-r--r--libbe/bugdir.py32
-rw-r--r--libbe/command/__init__.py2
-rw-r--r--libbe/command/assign.py2
-rw-r--r--libbe/command/base.py22
-rw-r--r--libbe/command/comment.py2
-rw-r--r--libbe/command/commit.py6
-rw-r--r--libbe/command/depend.py37
-rw-r--r--libbe/command/diff.py6
-rw-r--r--libbe/command/due.py4
-rw-r--r--libbe/command/help.py6
-rw-r--r--libbe/command/html.py14
-rw-r--r--libbe/command/import_xml.py156
-rw-r--r--libbe/command/init.py7
-rw-r--r--libbe/command/list.py19
-rw-r--r--libbe/command/merge.py4
-rw-r--r--libbe/command/new.py2
-rw-r--r--libbe/command/remove.py4
-rw-r--r--libbe/command/serve_commands.py12
-rw-r--r--libbe/command/set.py7
-rw-r--r--libbe/command/severity.py2
-rw-r--r--libbe/command/show.py13
-rw-r--r--libbe/command/status.py2
-rw-r--r--libbe/command/subscribe.py8
-rw-r--r--libbe/command/tag.py8
-rw-r--r--libbe/command/target.py12
-rw-r--r--libbe/command/util.py10
-rw-r--r--libbe/comment.py15
-rw-r--r--libbe/diff.py10
-rw-r--r--libbe/storage/__init__.py2
-rw-r--r--libbe/storage/base.py94
-rw-r--r--libbe/storage/util/config.py8
-rw-r--r--libbe/storage/util/mapfile.py4
-rw-r--r--libbe/storage/util/properties.py150
-rw-r--r--libbe/storage/util/settings_object.py184
-rw-r--r--libbe/storage/util/upgrade.py31
-rw-r--r--libbe/storage/vcs/__init__.py2
-rw-r--r--libbe/storage/vcs/base.py38
-rw-r--r--libbe/storage/vcs/bzr.py30
-rw-r--r--libbe/storage/vcs/darcs.py6
-rw-r--r--libbe/storage/vcs/git.py6
-rw-r--r--libbe/storage/vcs/hg.py6
-rw-r--r--libbe/ui/command_line.py74
-rw-r--r--libbe/ui/util/editor.py4
-rw-r--r--libbe/ui/util/user.py4
-rw-r--r--libbe/util/encoding.py2
-rw-r--r--libbe/util/http.py20
-rw-r--r--libbe/util/id.py48
-rw-r--r--libbe/util/subproc.py12
-rw-r--r--libbe/util/utility.py2
-rw-r--r--libbe/util/wsgi.py65
-rwxr-xr-xlibbe/version.py2
-rwxr-xr-xrelease.py32
58 files changed, 666 insertions, 677 deletions
diff --git a/Makefile b/Makefile
index fe62840..9d2993b 100644
--- a/Makefile
+++ b/Makefile
@@ -31,6 +31,7 @@ SHELL = /bin/bash
RM = /bin/rm
RST2MAN = /usr/bin/rst2man
RST2HTML = /usr/bin/rst2html
+PYTHON = /usr/bin/python3
#PATH = /usr/bin:/bin # must include sphinx-build for 'sphinx' target.
@@ -59,17 +60,17 @@ all: build
.PHONY: build
build: $(LIBBE_VERSION)
- python setup.py build
+ $(PYTHON) setup.py build
.PHONY: doc
doc: $(DOC)
.PHONY: install
install: build doc
- python setup.py install ${INSTALL_OPTIONS}
+ $(PYTHON) setup.py install ${INSTALL_OPTIONS}
test: build
- python test.py
+ $(PYTHON) test.py
.PHONY: clean
clean:
diff --git a/doc/conf.py b/doc/conf.py
index 75b9031..efe6eb3 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -42,8 +42,8 @@ source_suffix = '.txt'
master_doc = 'index'
# General information about the project.
-project = u'bugs-everywhere'
-copyright = u'2010, W. Trevor King'
+project = 'bugs-everywhere'
+copyright = '2010, W. Trevor King'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
@@ -188,8 +188,8 @@ htmlhelp_basename = 'bugs-everywheredoc'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
- ('index', 'bugs-everywhere.tex', u'bugs-everywhere Documentation',
- u'W. Trevor King', 'manual'),
+ ('index', 'bugs-everywhere.tex', 'bugs-everywhere Documentation',
+ 'W. Trevor King', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
diff --git a/interfaces/email/interactive/send_pgp_mime.py b/interfaces/email/interactive/send_pgp_mime.py
index 8febe10..e0ffbd5 100644
--- a/interfaces/email/interactive/send_pgp_mime.py
+++ b/interfaces/email/interactive/send_pgp_mime.py
@@ -27,7 +27,7 @@ the pgp_* commands. You may need to adjust the sendmail command to
point to whichever sendmail-compatible mailer you have on your system.
"""
-from cStringIO import StringIO
+from io import StringIO
import os
import re
#import GnuPGInterface # Maybe should use this instead of subprocess
@@ -150,16 +150,16 @@ def header_from_text(text, encoding="us-ascii"):
<BLANKLINE>
"""
text = text.strip()
- if type(text) == types.UnicodeType:
+ if type(text) == str:
text = text.encode(encoding)
# assume StringType arguments are already encoded
p = Parser()
return p.parsestr(text, headersonly=True)
def guess_encoding(text):
- if type(text) == types.StringType:
+ if type(text) == bytes:
encoding = "us-ascii"
- elif type(text) == types.UnicodeType:
+ elif type(text) == str:
for encoding in ["us-ascii", "iso-8859-1", "utf-8"]:
try:
text.encode(encoding)
@@ -181,7 +181,7 @@ def encodedMIMEText(body, encoding=None):
def append_text(text_part, new_text):
original_payload = text_part.get_payload(decode=True)
- new_payload = u"%s%s" % (original_payload, new_text)
+ new_payload = "%s%s" % (original_payload, new_text)
new_encoding = guess_encoding(new_payload)
text_part.set_payload(new_payload.encode(new_encoding), new_encoding)
@@ -190,7 +190,7 @@ def attach_root(header, root_part):
Attach the email.Message root_part to the email.Message header
without generating a multi-part message.
"""
- for k,v in header.items():
+ for k,v in list(header.items()):
root_part[k] = v
return root_part
@@ -199,19 +199,19 @@ def execute(args, stdin=None, expect=(0,)):
Execute a command (allows us to drive gpg).
"""
if verboseInvoke == True:
- print >> sys.stderr, '$ '+args
+ print('$ '+args, file=sys.stderr)
try:
p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
- except OSError, e:
+ except OSError as e:
strerror = '%s\nwhile executing %s' % (e.args[1], args)
- raise Exception, strerror
+ raise Exception(strerror)
output, error = p.communicate(input=stdin)
status = p.wait()
if verboseInvoke == True:
- print >> sys.stderr, '(status: %d)\n%s%s' % (status, output, error)
+ print('(status: %d)\n%s%s' % (status, output, error), file=sys.stderr)
if status not in expect:
strerror = '%s\nwhile executing %s\n%s\n%d' % (args[1], args, error, status)
- raise Exception, strerror
+ raise Exception(strerror)
return status, output, error
def replace(template, format_char, replacement_text):
@@ -245,7 +245,7 @@ def flatten(msg, to_unicode=False):
text = fp.getvalue()
if to_unicode == True:
encoding = msg.get_content_charset() or "utf-8"
- text = unicode(text, encoding=encoding)
+ text = str(text, encoding=encoding)
return text
def source_email(msg, return_realname=False):
@@ -581,7 +581,7 @@ if __name__ == '__main__':
else:
header = file(options.header_filename, 'r').read()
if header == None:
- raise Exception, "missing header"
+ raise Exception("missing header")
headermsg = header_from_text(header)
body = None
if options.body_filename != None:
@@ -592,7 +592,7 @@ if __name__ == '__main__':
else:
body = file(options.body_filename, 'r').read()
if body == None:
- raise Exception, "missing body"
+ raise Exception("missing body")
m = PGPMimeMessageFactory(body)
if options.mode == "sign":
@@ -609,6 +609,6 @@ if __name__ == '__main__':
message = attach_root(headermsg, bodymsg)
if options.output == True:
message = flatten(message)
- print message
+ print(message)
else:
mail(message, sendmail)
diff --git a/interfaces/web/cfbe.py b/interfaces/web/cfbe.py
index 68c484d..33edc0d 100755
--- a/interfaces/web/cfbe.py
+++ b/interfaces/web/cfbe.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
import cherrypy
-import web
+from . import web
from optparse import OptionParser
from os import path
diff --git a/interfaces/web/web.py b/interfaces/web/web.py
index 2907932..ed87c78 100644
--- a/interfaces/web/web.py
+++ b/interfaces/web/web.py
@@ -1,5 +1,5 @@
from datetime import datetime
-from urllib import urlencode
+from urllib.parse import urlencode
from jinja2 import Environment, FileSystemLoader
import cherrypy
@@ -30,7 +30,7 @@ class WebInterface:
store = storage.get_storage(self.bug_root)
store.connect()
version = store.storage_version()
- print version
+ print(version)
self.bd = bugdir.BugDir(store, from_storage=True)
self.repository_name = self.bug_root.split('/')[-1]
self.env = Environment(loader=FileSystemLoader(template_root))
@@ -39,20 +39,20 @@ class WebInterface:
def get_common_information(self):
"""Returns a dict of common information that most pages will need."""
possible_assignees = list(set(
- [unicode(bug.assigned) for bug in self.bd if bug.assigned != EMPTY]))
- possible_assignees.sort(key=unicode.lower)
+ [str(bug.assigned) for bug in self.bd if bug.assigned != EMPTY]))
+ possible_assignees.sort(key=str.lower)
possible_targets = list(set(
- [unicode(bug.summary.rstrip("\n")) for bug in self.bd \
- if bug.severity == u"target"]))
+ [str(bug.summary.rstrip("\n")) for bug in self.bd \
+ if bug.severity == "target"]))
- possible_targets.sort(key=unicode.lower)
+ possible_targets.sort(key=str.lower)
- possible_statuses = [u'open', u'assigned', u'test', u'unconfirmed',
- u'closed', u'disabled', u'fixed', u'wontfix']
+ possible_statuses = ['open', 'assigned', 'test', 'unconfirmed',
+ 'closed', 'disabled', 'fixed', 'wontfix']
- possible_severities = [u'minor', u'serious', u'critical', u'fatal',
- u'wishlist']
+ possible_severities = ['minor', 'serious', 'critical', 'fatal',
+ 'wishlist']
return {'possible_assignees': possible_assignees,
'possible_targets': possible_targets,
diff --git a/libbe/bug.py b/libbe/bug.py
index 7b3931d..a4ef53d 100644
--- a/libbe/bug.py
+++ b/libbe/bug.py
@@ -279,7 +279,7 @@ class Bug (settings_object.SavedSettingsObject):
value = getattr(self, setting)
if value == None:
return ""
- if type(value) not in types.StringTypes:
+ if type(value) not in (str,):
return str(value)
return value
@@ -440,7 +440,7 @@ class Bug (settings_object.SavedSettingsObject):
return istring + sep.join(lines).rstrip('\n')
def from_xml(self, xml_string, preserve_uuids=False):
- u"""
+ """
Note: If a bug uuid is given, set .alt_id to it's value.
>>> bugA = Bug(uuid="0123", summary="Need to test Bug.from_xml()")
>>> bugA.date = "Thu, 01 Jan 1970 00:00:00 +0000"
@@ -469,7 +469,7 @@ class Bug (settings_object.SavedSettingsObject):
>>> bugC.uuid == bugA.uuid
True
"""
- if type(xml_string) == types.UnicodeType:
+ if type(xml_string) == str:
xml_string = xml_string.strip().encode('unicode_escape')
if hasattr(xml_string, 'getchildren'): # already an ElementTree Element
bug = xml_string
@@ -497,7 +497,7 @@ class Bug (settings_object.SavedSettingsObject):
text = settings_object.EMPTY
else:
text = xml.sax.saxutils.unescape(child.text)
- if not isinstance(text, unicode):
+ if not isinstance(text, str):
text = text.decode('unicode_escape')
text = text.strip()
if child.tag == 'uuid' and not preserve_uuids:
@@ -718,16 +718,15 @@ class Bug (settings_object.SavedSettingsObject):
if accept_extra_strings == True:
self.extra_strings += [estr]
elif change_exception == True:
- raise ValueError, \
- 'Merge would add extra string "%s" for bug %s' \
- % (estr, self.uuid)
+ raise ValueError('Merge would add extra string "%s" for bug %s' \
+ % (estr, self.uuid))
for o_comm in other.comments():
try:
s_comm = self.comment_root.comment_from_uuid(o_comm.uuid)
- except KeyError, e:
+ except KeyError as e:
try:
s_comm = self.comment_root.comment_from_uuid(o_comm.alt_id)
- except KeyError, e:
+ except KeyError as e:
s_comm = None
if s_comm == None:
if accept_comments == True:
@@ -736,9 +735,8 @@ class Bug (settings_object.SavedSettingsObject):
o_comm_copy.id = libbe.util.id.ID(o_comm_copy, 'comment')
self.comment_root.add_reply(o_comm_copy)
elif change_exception == True:
- raise ValueError, \
- 'Merge would add comment %s (alt: %s) to bug %s' \
- % (o_comm.uuid, o_comm.alt_id, self.uuid)
+ raise ValueError('Merge would add comment %s (alt: %s) to bug %s' \
+ % (o_comm.uuid, o_comm.alt_id, self.uuid))
else:
s_comm.merge(o_comm, accept_changes=accept_changes,
accept_extra_strings=accept_extra_strings,
@@ -752,7 +750,7 @@ class Bug (settings_object.SavedSettingsObject):
self.id.storage('values'), '{}\n')
try:
settings = mapfile.parse(settings_mapfile)
- except mapfile.InvalidMapfileContents, e:
+ except mapfile.InvalidMapfileContents as e:
raise Exception('Invalid settings file for bug %s\n'
'(BE version missmatch?)' % self.id.user())
self._setup_saved_settings(settings)
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 8b9e1e7..5738a80 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -186,7 +186,7 @@ class BugDir (list, settings_object.SavedSettingsObject):
self.storage.get(self.id.storage('settings'), default='{}\n')
try:
settings = mapfile.parse(settings_mapfile)
- except mapfile.InvalidMapfileContents, e:
+ except mapfile.InvalidMapfileContents as e:
raise Exception('Invalid settings file for bugdir %s\n'
'(BE version missmatch?)' % self.id.user())
self._setup_saved_settings(settings)
@@ -438,7 +438,7 @@ class BugDir (list, settings_object.SavedSettingsObject):
... inactive_status_def=bug.inactive_status_def)
>>> bugdirA.cleanup()
"""
- if type(xml_string) == types.UnicodeType:
+ if type(xml_string) == str:
xml_string = xml_string.strip().encode('unicode_escape')
if hasattr(xml_string, 'getchildren'): # already an ElementTree Element
bugdir = xml_string
@@ -500,7 +500,7 @@ class BugDir (list, settings_object.SavedSettingsObject):
text = entries
else:
text = xml.sax.saxutils.unescape(child.text)
- if not isinstance(text, unicode):
+ if not isinstance(text, str):
text = text.decode('unicode_escape')
text = text.strip()
if child.tag == 'uuid' and not preserve_uuids:
@@ -695,11 +695,11 @@ class RevisionedBugDir (BugDir):
kwargs['revision'] = self.r
return self.sget(*args, **kwargs)
def ancestors(self, *args, **kwargs):
- print 'getting ancestors', args, kwargs
+ print('getting ancestors', args, kwargs)
if not 'revision' in kwargs or kwargs['revision'] == None:
kwargs['revision'] = self.r
ret = self.sancestors(*args, **kwargs)
- print 'got ancestors', ret
+ print('got ancestors', ret)
return ret
def children(self, *args, **kwargs):
if not 'revision' in kwargs or kwargs['revision'] == None:
@@ -895,20 +895,20 @@ if libbe.TESTING == True:
preexisting bugs.
"""
bugdir = SimpleBugDir(memory=False)
- self.failUnless(bugdir.storage.is_readable() == True,
+ self.assertTrue(bugdir.storage.is_readable() == True,
bugdir.storage.is_readable())
- self.failUnless(bugdir.storage.is_writeable() == True,
+ self.assertTrue(bugdir.storage.is_writeable() == True,
bugdir.storage.is_writeable())
uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
+ self.assertTrue(uuids == ['a', 'b'], uuids)
bugdir.flush_reload()
uuids = sorted(bugdir.uuids())
- self.failUnless(uuids == ['a', 'b'], uuids)
+ self.assertTrue(uuids == ['a', 'b'], uuids)
uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == [], uuids)
+ self.assertTrue(uuids == [], uuids)
bugdir.load_all_bugs()
uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
+ self.assertTrue(uuids == ['a', 'b'], uuids)
bugdir.cleanup()
def testInMemoryCleanLoad(self):
"""
@@ -916,16 +916,16 @@ if libbe.TESTING == True:
preexisting bugs.
"""
bugdir = SimpleBugDir(memory=True)
- self.failUnless(bugdir.storage == None, bugdir.storage)
+ self.assertTrue(bugdir.storage == None, bugdir.storage)
uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
+ self.assertTrue(uuids == ['a', 'b'], uuids)
uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
+ self.assertTrue(uuids == ['a', 'b'], uuids)
bugdir._clear_bugs()
uuids = sorted(bugdir.uuids())
- self.failUnless(uuids == [], uuids)
+ self.assertTrue(uuids == [], uuids)
uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == [], uuids)
+ self.assertTrue(uuids == [], uuids)
bugdir.cleanup()
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
diff --git a/libbe/command/__init__.py b/libbe/command/__init__.py
index f1c74d5..18d12a5 100644
--- a/libbe/command/__init__.py
+++ b/libbe/command/__init__.py
@@ -17,7 +17,7 @@
# You should have received a copy of the GNU General Public License along with
# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
-import base
+from . import base
UserError = base.UserError
UsageError = base.UsageError
diff --git a/libbe/command/assign.py b/libbe/command/assign.py
index edc8ed1..4cadc11 100644
--- a/libbe/command/assign.py
+++ b/libbe/command/assign.py
@@ -27,7 +27,7 @@ import libbe.command.util
class Assign (libbe.command.Command):
- u"""Assign an individual or group to fix a bug
+ """Assign an individual or group to fix a bug
>>> import sys
>>> import libbe.bugdir
diff --git a/libbe/command/base.py b/libbe/command/base.py
index f019450..efba1b2 100644
--- a/libbe/command/base.py
+++ b/libbe/command/base.py
@@ -21,9 +21,9 @@
import codecs
import optparse
import os.path
-import StringIO
+import io
import sys
-import urlparse
+import urllib.parse
import libbe
import libbe.storage
@@ -84,8 +84,8 @@ def get_command(command_name):
try:
cmd = libbe.util.plugin.import_by_name(
'libbe.command.%s' % command_name.replace("-", "_"))
- except ImportError, e:
- raise UnknownCommand(command_name, message=unicode(e))
+ except ImportError as e:
+ raise UnknownCommand(command_name, message=str(e))
return cmd
def get_command_class(module=None, command_name=None):
@@ -104,7 +104,7 @@ def get_command_class(module=None, command_name=None):
try:
cname = command_name.capitalize().replace('-', '_')
cmd = getattr(module, cname)
- except ImportError, e:
+ except ImportError as e:
raise UnknownCommand(command_name)
return cmd
@@ -320,9 +320,9 @@ class Command (object):
if 'user-id' in options:
self._user_id = options.pop('user-id')
if len(options) > 0:
- raise UserError, 'Invalid option passed to command %s:\n %s' \
+ raise UserError('Invalid option passed to command %s:\n %s' \
% (self.name, '\n '.join(['%s: %s' % (k,v)
- for k,v in options.items()]))
+ for k,v in list(options.items())])))
in_optional_args = False
for i,arg in enumerate(self.args):
if arg.repeatable == True:
@@ -352,7 +352,7 @@ class Command (object):
'command': self.name,
'parameters': kwargs,
}, context=0)
- url = urlparse.urljoin(self.server, 'run')
+ url = urllib.parse.urljoin(self.server, 'run')
page,final_url,info = libbe.util.http.get_post_url(
url=url, get=False, data=data, agent=self.user_agent)
self.stdout.write(page)
@@ -491,14 +491,14 @@ class StringInputOutput (InputOutput):
u'goodbye\\n'
"""
def __init__(self):
- stdin = StringIO.StringIO()
+ stdin = io.StringIO()
stdin.encoding = 'utf-8'
- stdout = StringIO.StringIO()
+ stdout = io.StringIO()
stdout.encoding = 'utf-8'
InputOutput.__init__(self, stdin, stdout)
def set_stdin(self, stdin_string):
- self.stdin = StringIO.StringIO(stdin_string)
+ self.stdin = io.StringIO(stdin_string)
def get_stdout(self):
ret = self.stdout.getvalue()
diff --git a/libbe/command/comment.py b/libbe/command/comment.py
index 302445c..e110c81 100644
--- a/libbe/command/comment.py
+++ b/libbe/command/comment.py
@@ -138,7 +138,7 @@ class Comment (libbe.command.Command):
estr = 'Please enter your comment above\n\n%s\n\n> %s\n' \
% (header, '\n> '.join(parent_body.splitlines()))
body = libbe.ui.util.editor.editor_string(estr)
- except libbe.ui.util.editor.CantFindEditor, e:
+ except libbe.ui.util.editor.CantFindEditor as e:
raise libbe.command.UserError(
'No comment supplied, and EDITOR not specified.')
if body is None:
diff --git a/libbe/command/commit.py b/libbe/command/commit.py
index 453311f..077980d 100644
--- a/libbe/command/commit.py
+++ b/libbe/command/commit.py
@@ -93,9 +93,9 @@ class Commit (libbe.command.Command):
try:
revision = storage.commit(summary, body=body,
allow_empty=params['allow-empty'])
- print >> self.stdout, 'Committed %s' % revision
- except libbe.storage.EmptyCommit, e:
- print >> self.stdout, e
+ print('Committed %s' % revision, file=self.stdout)
+ except libbe.storage.EmptyCommit as e:
+ print(e, file=self.stdout)
return 1
def _long_help(self):
diff --git a/libbe/command/depend.py b/libbe/command/depend.py
index 0040cf3..4cb0efd 100644
--- a/libbe/command/depend.py
+++ b/libbe/command/depend.py
@@ -211,10 +211,9 @@ class Depend (libbe.command.Command):
bugdirs, repair_broken_links=True)
assert len(broken) == 0, broken
if len(fixed) > 0:
- print >> self.stdout, 'Fixed the following links:'
- print >> self.stdout, \
- '\n'.join(['%s |-- %s' % (blockee.id.user(), blocker.id.user())
- for blockee,blocker in fixed])
+ print('Fixed the following links:', file=self.stdout)
+ print('\n'.join(['%s |-- %s' % (blockee.id.user(), blocker.id.user())
+ for blockee,blocker in fixed]), file=self.stdout)
return 0
status = parse_status(params['status'])
severity = parse_severity(params['severity'])
@@ -227,19 +226,19 @@ class Depend (libbe.command.Command):
if params['tree-depth'] != None:
dtree = DependencyTree(bugdirs, bugA, params['tree-depth'], filter)
if len(dtree.blocked_by_tree()) > 0:
- print >> self.stdout, '%s blocked by:' % bugA.id.user()
+ print('%s blocked by:' % bugA.id.user(), file=self.stdout)
for depth,node in dtree.blocked_by_tree().thread():
if depth == 0: continue
- print >> self.stdout, (
+ print((
'%s%s'
- % (' '*(depth), self.bug_string(node.bug, params)))
+ % (' '*(depth), self.bug_string(node.bug, params))), file=self.stdout)
if len(dtree.blocks_tree()) > 0:
- print >> self.stdout, '%s blocks:' % bugA.id.user()
+ print('%s blocks:' % bugA.id.user(), file=self.stdout)
for depth,node in dtree.blocks_tree().thread():
if depth == 0: continue
- print >> self.stdout, (
+ print((
'%s%s'
- % (' '*(depth), self.bug_string(node.bug, params)))
+ % (' '*(depth), self.bug_string(node.bug, params))), file=self.stdout)
return 0
if params['blocking-bug-id'] != None:
@@ -254,16 +253,14 @@ class Depend (libbe.command.Command):
blocked_by = get_blocked_by(bugdirs, bugA)
if len(blocked_by) > 0:
- print >> self.stdout, '%s blocked by:' % bugA.id.user()
- print >> self.stdout, \
- '\n'.join([self.bug_string(_bug, params)
- for _bug in blocked_by])
+ print('%s blocked by:' % bugA.id.user(), file=self.stdout)
+ print('\n'.join([self.bug_string(_bug, params)
+ for _bug in blocked_by]), file=self.stdout)
blocks = get_blocks(bugdirs, bugA)
if len(blocks) > 0:
- print >> self.stdout, '%s blocks:' % bugA.id.user()
- print >> self.stdout, \
- '\n'.join([self.bug_string(_bug, params)
- for _bug in blocks])
+ print('%s blocks:' % bugA.id.user(), file=self.stdout)
+ print('\n'.join([self.bug_string(_bug, params)
+ for _bug in blocks]), file=self.stdout)
return 0
def bug_string(self, _bug, params):
@@ -410,13 +407,13 @@ def check_dependencies(bugdirs, repair_broken_links=False):
[]
>>> bugdir.cleanup()
"""
- for bugdir in bugdirs.values():
+ for bugdir in list(bugdirs.values()):
if bugdir.storage is not None:
bugdir.load_all_bugs()
good_links = []
fixed_links = []
broken_links = []
- for bugdir in bugdirs.values():
+ for bugdir in list(bugdirs.values()):
for bug in bugdir:
for blocker in get_blocked_by(bugdirs, bug):
blocks = get_blocks(bugdirs, blocker)
diff --git a/libbe/command/diff.py b/libbe/command/diff.py
index 1368654..d8aea37 100644
--- a/libbe/command/diff.py
+++ b/libbe/command/diff.py
@@ -89,7 +89,7 @@ class Diff (libbe.command.Command):
try:
subscriptions = libbe.diff.subscriptions_from_string(
params['subscribe'])
- except ValueError, e:
+ except ValueError as e:
raise libbe.command.UserError(e.msg)
bugdirs = self._get_bugdirs()
for uuid,bugdir in sorted(bugdirs.items()):
@@ -124,11 +124,11 @@ class Diff (libbe.command.Command):
bugs = tree.child_by_path('/bugs')
for bug_type in bugs:
uuids.extend([bug.name for bug in bug_type])
- print >> self.stdout, '\n'.join(uuids)
+ print('\n'.join(uuids), file=self.stdout)
else :
rep = tree.report_string()
if rep != None:
- print >> self.stdout, rep
+ print(rep, file=self.stdout)
return 0
def _long_help(self):
diff --git a/libbe/command/due.py b/libbe/command/due.py
index b2a281f..ddf111a 100644
--- a/libbe/command/due.py
+++ b/libbe/command/due.py
@@ -68,9 +68,9 @@ class Due (libbe.command.Command):
if params['due'] == None:
due_time = get_due(bug)
if due_time is None:
- print >> self.stdout, 'No due date assigned.'
+ print('No due date assigned.', file=self.stdout)
else:
- print >> self.stdout, libbe.util.utility.time_to_str(due_time)
+ print(libbe.util.utility.time_to_str(due_time), file=self.stdout)
else:
if params['due'] == 'none':
remove_due(bug)
diff --git a/libbe/command/help.py b/libbe/command/help.py
index 3af7769..981ea1a 100644
--- a/libbe/command/help.py
+++ b/libbe/command/help.py
@@ -95,15 +95,15 @@ class Help (libbe.command.Command):
def _run(self, **params):
if params['topic'] == None:
if hasattr(self.ui, 'help'):
- print >> self.stdout, self.ui.help().rstrip('\n')
+ print(self.ui.help().rstrip('\n'), file=self.stdout)
elif params['topic'] in libbe.command.commands(command_names=True):
module = libbe.command.get_command(params['topic'])
Class = libbe.command.get_command_class(module,params['topic'])
c = Class(ui=self.ui)
self.ui.setup_command(c)
- print >> self.stdout, c.help().rstrip('\n')
+ print(c.help().rstrip('\n'), file=self.stdout)
elif params['topic'] in TOPICS:
- print >> self.stdout, TOPICS[params['topic']].rstrip('\n')
+ print(TOPICS[params['topic']].rstrip('\n'), file=self.stdout)
else:
raise libbe.command.UserError(
'"%s" is neither a command nor topic' % params['topic'])
diff --git a/libbe/command/html.py b/libbe/command/html.py
index 3dfeb75..df87450 100644
--- a/libbe/command/html.py
+++ b/libbe/command/html.py
@@ -23,7 +23,7 @@
import codecs
import email.utils
-import htmlentitydefs
+import html.entities
import itertools
import os
import os.path
@@ -97,7 +97,7 @@ class ServerApp (libbe.util.wsgi.WSGI_AppObject,
filter_ = self._filters.get(bug_type, self._filters['active'])
bugs = list(itertools.chain(*list(
[bug for bug in bugdir if filter_(bug)]
- for bugdir in self.bugdirs.values())))
+ for bugdir in list(self.bugdirs.values()))))
bugs.sort()
if self.logger:
self.logger.log(
@@ -187,13 +187,13 @@ class ServerApp (libbe.util.wsgi.WSGI_AppObject,
if time.time() > self._refresh:
if self.logger:
self.logger.log(self.log_level, 'refresh bugdirs')
- for bugdir in self.bugdirs.values():
+ for bugdir in list(self.bugdirs.values()):
bugdir.load_all_bugs()
self._refresh = time.time() + 60
def _truncated_bugdir_id(self, bugdir):
return libbe.util.id._truncate(
- bugdir.uuid, self.bugdirs.keys(),
+ bugdir.uuid, list(self.bugdirs.keys()),
min_length=self.min_id_length)
def _truncated_bug_id(self, bug):
@@ -255,7 +255,7 @@ class ServerApp (libbe.util.wsgi.WSGI_AppObject,
p = libbe.util.id.parse_user(bugdirs, long_id)
except (libbe.util.id.MultipleIDMatches,
libbe.util.id.NoIDMatches,
- libbe.util.id.InvalidIDStructure), e:
+ libbe.util.id.InvalidIDStructure) as e:
return '#%s#' % long_id # re-wrap failures
if p['type'] == 'bugdir':
return '#%s#' % long_id
@@ -881,7 +881,7 @@ will exit after the dump without serving anything over the wire.
def _write_default_template(self, template_dict, out_dir):
out_dir = self._make_dir(out_dir)
- for filename,text in template_dict.iteritems():
+ for filename,text in template_dict.items():
self._write_file(text, [out_dir, filename])
def _write_static_pages(self, app, out_dir):
@@ -904,7 +904,7 @@ will exit after the dump without serving anything over the wire.
for url_,path_ in url_mappings:
content = content.replace(url_, path_)
self._write_file(content=content, path_array=[out_dir, path])
- for bugdir in app.bugdirs.values():
+ for bugdir in list(app.bugdirs.values()):
for bug in bugdir:
bug_dir_url = app.bug_dir(bug=bug)
segments = bug_dir_url.split('/')
diff --git a/libbe/command/import_xml.py b/libbe/command/import_xml.py
index fbf456b..8699e70 100644
--- a/libbe/command/import_xml.py
+++ b/libbe/command/import_xml.py
@@ -37,7 +37,7 @@ import libbe.util.utility
if libbe.TESTING == True:
import doctest
- import StringIO
+ import io
import unittest
import libbe.bugdir
@@ -474,147 +474,147 @@ if libbe.TESTING == True:
self.bugdir.flush_reload()
def testCleanBugdir(self):
uuids = list(self.bugdir.uuids())
- self.failUnless(uuids == ['b'], uuids)
+ self.assertTrue(uuids == ['b'], uuids)
def testNotAddOnly(self):
bugB = self.bugdir.bug_from_uuid('b')
self._execute(self.xml, {}, ['-'])
uuids = list(self.bugdir.uuids())
- self.failUnless(uuids == ['b'], uuids)
+ self.assertTrue(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
- self.failUnless(bugB.uuid == 'b', bugB.uuid)
- self.failUnless(bugB.creator == 'John', bugB.creator)
- self.failUnless(bugB.status == 'fixed', bugB.status)
- self.failUnless(bugB.summary == 'a test bug', bugB.summary)
+ self.assertTrue(bugB.uuid == 'b', bugB.uuid)
+ self.assertTrue(bugB.creator == 'John', bugB.creator)
+ self.assertTrue(bugB.status == 'fixed', bugB.status)
+ self.assertTrue(bugB.summary == 'a test bug', bugB.summary)
estrs = ["don't forget your towel",
'helps with space travel',
'watch out for flying dolphins']
- self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ self.assertTrue(bugB.extra_strings == estrs, bugB.extra_strings)
comments = list(bugB.comments())
- self.failUnless(len(comments) == 3,
+ self.assertTrue(len(comments) == 3,
['%s (%s, %s)' % (c.uuid, c.alt_id, c.body)
for c in comments])
c1 = bugB.comment_from_uuid('c1')
comments.remove(c1)
- self.failUnless(c1.uuid == 'c1', c1.uuid)
- self.failUnless(c1.alt_id == None, c1.alt_id)
- self.failUnless(c1.author == 'Jane', c1.author)
- self.failUnless(c1.body == 'So long\n', c1.body)
+ self.assertTrue(c1.uuid == 'c1', c1.uuid)
+ self.assertTrue(c1.alt_id == None, c1.alt_id)
+ self.assertTrue(c1.author == 'Jane', c1.author)
+ self.assertTrue(c1.body == 'So long\n', c1.body)
c2 = bugB.comment_from_uuid('c2')
comments.remove(c2)
- self.failUnless(c2.uuid == 'c2', c2.uuid)
- self.failUnless(c2.alt_id == None, c2.alt_id)
- self.failUnless(c2.author == 'Jess', c2.author)
- self.failUnless(c2.body == 'World\n', c2.body)
+ self.assertTrue(c2.uuid == 'c2', c2.uuid)
+ self.assertTrue(c2.alt_id == None, c2.alt_id)
+ self.assertTrue(c2.author == 'Jess', c2.author)
+ self.assertTrue(c2.body == 'World\n', c2.body)
c4 = comments[0]
- self.failUnless(len(c4.uuid) == 36, c4.uuid)
- self.failUnless(c4.alt_id == 'c3', c4.alt_id)
- self.failUnless(c4.author == 'Jed', c4.author)
- self.failUnless(c4.body == 'And thanks\n', c4.body)
+ self.assertTrue(len(c4.uuid) == 36, c4.uuid)
+ self.assertTrue(c4.alt_id == 'c3', c4.alt_id)
+ self.assertTrue(c4.author == 'Jed', c4.author)
+ self.assertTrue(c4.body == 'And thanks\n', c4.body)
def testAddOnly(self):
bugB = self.bugdir.bug_from_uuid('b')
initial_bugB_summary = bugB.summary
self._execute(self.xml, {'add-only':True}, ['-'])
uuids = list(self.bugdir.uuids())
- self.failUnless(uuids == ['b'], uuids)
+ self.assertTrue(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
- self.failUnless(bugB.uuid == 'b', bugB.uuid)
- self.failUnless(bugB.creator == 'John', bugB.creator)
- self.failUnless(bugB.status == 'open', bugB.status)
- self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary)
+ self.assertTrue(bugB.uuid == 'b', bugB.uuid)
+ self.assertTrue(bugB.creator == 'John', bugB.creator)
+ self.assertTrue(bugB.status == 'open', bugB.status)
+ self.assertTrue(bugB.summary == initial_bugB_summary, bugB.summary)
estrs = ["don't forget your towel",
'helps with space travel']
- self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ self.assertTrue(bugB.extra_strings == estrs, bugB.extra_strings)
comments = list(bugB.comments())
- self.failUnless(len(comments) == 3,
+ self.assertTrue(len(comments) == 3,
['%s (%s)' % (c.uuid, c.alt_id) for c in comments])
c1 = bugB.comment_from_uuid('c1')
comments.remove(c1)
- self.failUnless(c1.uuid == 'c1', c1.uuid)
- self.failUnless(c1.alt_id == None, c1.alt_id)
- self.failUnless(c1.author == 'Jane', c1.author)
- self.failUnless(c1.body == 'Hello\n', c1.body)
+ self.assertTrue(c1.uuid == 'c1', c1.uuid)
+ self.assertTrue(c1.alt_id == None, c1.alt_id)
+ self.assertTrue(c1.author == 'Jane', c1.author)
+ self.assertTrue(c1.body == 'Hello\n', c1.body)
c2 = bugB.comment_from_uuid('c2')
comments.remove(c2)
- self.failUnless(c2.uuid == 'c2', c2.uuid)
- self.failUnless(c2.alt_id == None, c2.alt_id)
- self.failUnless(c2.author == 'Jess', c2.author)
- self.failUnless(c2.body == 'World\n', c2.body)
+ self.assertTrue(c2.uuid == 'c2', c2.uuid)
+ self.assertTrue(c2.alt_id == None, c2.alt_id)
+ self.assertTrue(c2.author == 'Jess', c2.author)
+ self.assertTrue(c2.body == 'World\n', c2.body)
c4 = comments[0]
- self.failUnless(len(c4.uuid) == 36, c4.uuid)
- self.failUnless(c4.alt_id == 'c3', c4.alt_id)
- self.failUnless(c4.author == 'Jed', c4.author)
- self.failUnless(c4.body == 'And thanks\n', c4.body)
+ self.assertTrue(len(c4.uuid) == 36, c4.uuid)
+ self.assertTrue(c4.alt_id == 'c3', c4.alt_id)
+ self.assertTrue(c4.author == 'Jed', c4.author)
+ self.assertTrue(c4.body == 'And thanks\n', c4.body)
def testRootCommentsNotAddOnly(self):
bugB = self.bugdir.bug_from_uuid('b')
initial_bugB_summary = bugB.summary
self._execute(self.root_comment_xml, {'root':'/b'}, ['-'])
uuids = list(self.bugdir.uuids())
uuids = list(self.bugdir.uuids())
- self.failUnless(uuids == ['b'], uuids)
+ self.assertTrue(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
- self.failUnless(bugB.uuid == 'b', bugB.uuid)
- self.failUnless(bugB.creator == 'John', bugB.creator)
- self.failUnless(bugB.status == 'open', bugB.status)
- self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary)
+ self.assertTrue(bugB.uuid == 'b', bugB.uuid)
+ self.assertTrue(bugB.creator == 'John', bugB.creator)
+ self.assertTrue(bugB.status == 'open', bugB.status)
+ self.assertTrue(bugB.summary == initial_bugB_summary, bugB.summary)
estrs = ["don't forget your towel",
'helps with space travel']
- self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ self.assertTrue(bugB.extra_strings == estrs, bugB.extra_strings)
comments = list(bugB.comments())
- self.failUnless(len(comments) == 3,
+ self.assertTrue(len(comments) == 3,
['%s (%s, %s)' % (c.uuid, c.alt_id, c.body)
for c in comments])
c1 = bugB.comment_from_uuid('c1')
comments.remove(c1)
- self.failUnless(c1.uuid == 'c1', c1.uuid)
- self.failUnless(c1.alt_id == None, c1.alt_id)
- self.failUnless(c1.author == 'Jane', c1.author)
- self.failUnless(c1.body == 'So long\n', c1.body)
+ self.assertTrue(c1.uuid == 'c1', c1.uuid)
+ self.assertTrue(c1.alt_id == None, c1.alt_id)
+ self.assertTrue(c1.author == 'Jane', c1.author)
+ self.assertTrue(c1.body == 'So long\n', c1.body)
c2 = bugB.comment_from_uuid('c2')
comments.remove(c2)
- self.failUnless(c2.uuid == 'c2', c2.uuid)
- self.failUnless(c2.alt_id == None, c2.alt_id)
- self.failUnless(c2.author == 'Jess', c2.author)
- self.failUnless(c2.body == 'World\n', c2.body)
+ self.assertTrue(c2.uuid == 'c2', c2.uuid)
+ self.assertTrue(c2.alt_id == None, c2.alt_id)
+ self.assertTrue(c2.author == 'Jess', c2.author)
+ self.assertTrue(c2.body == 'World\n', c2.body)
c4 = comments[0]
- self.failUnless(len(c4.uuid) == 36, c4.uuid)
- self.failUnless(c4.alt_id == 'c3', c4.alt_id)
- self.failUnless(c4.author == 'Jed', c4.author)
- self.failUnless(c4.body == 'And thanks\n', c4.body)
+ self.assertTrue(len(c4.uuid) == 36, c4.uuid)
+ self.assertTrue(c4.alt_id == 'c3', c4.alt_id)
+ self.assertTrue(c4.author == 'Jed', c4.author)
+ self.assertTrue(c4.body == 'And thanks\n', c4.body)
def testRootCommentsAddOnly(self):
bugB = self.bugdir.bug_from_uuid('b')
initial_bugB_summary = bugB.summary
self._execute(self.root_comment_xml,
{'root':'/b', 'add-only':True}, ['-'])
uuids = list(self.bugdir.uuids())
- self.failUnless(uuids == ['b'], uuids)
+ self.assertTrue(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
- self.failUnless(bugB.uuid == 'b', bugB.uuid)
- self.failUnless(bugB.creator == 'John', bugB.creator)
- self.failUnless(bugB.status == 'open', bugB.status)
- self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary)
+ self.assertTrue(bugB.uuid == 'b', bugB.uuid)
+ self.assertTrue(bugB.creator == 'John', bugB.creator)
+ self.assertTrue(bugB.status == 'open', bugB.status)
+ self.assertTrue(bugB.summary == initial_bugB_summary, bugB.summary)
estrs = ["don't forget your towel",
'helps with space travel']
- self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ self.assertTrue(bugB.extra_strings == estrs, bugB.extra_strings)
comments = list(bugB.comments())
- self.failUnless(len(comments) == 3,
+ self.assertTrue(len(comments) == 3,
['%s (%s)' % (c.uuid, c.alt_id) for c in comments])
c1 = bugB.comment_from_uuid('c1')
comments.remove(c1)
- self.failUnless(c1.uuid == 'c1', c1.uuid)
- self.failUnless(c1.alt_id == None, c1.alt_id)
- self.failUnless(c1.author == 'Jane', c1.author)
- self.failUnless(c1.body == 'Hello\n', c1.body)
+ self.assertTrue(c1.uuid == 'c1', c1.uuid)
+ self.assertTrue(c1.alt_id == None, c1.alt_id)
+ self.assertTrue(c1.author == 'Jane', c1.author)
+ self.assertTrue(c1.body == 'Hello\n', c1.body)
c2 = bugB.comment_from_uuid('c2')
comments.remove(c2)
- self.failUnless(c2.uuid == 'c2', c2.uuid)
- self.failUnless(c2.alt_id == None, c2.alt_id)
- self.failUnless(c2.author == 'Jess', c2.author)
- self.failUnless(c2.body == 'World\n', c2.body)
+ self.assertTrue(c2.uuid == 'c2', c2.uuid)
+ self.assertTrue(c2.alt_id == None, c2.alt_id)
+ self.assertTrue(c2.author == 'Jess', c2.author)
+ self.assertTrue(c2.body == 'World\n', c2.body)
c4 = comments[0]
- self.failUnless(len(c4.uuid) == 36, c4.uuid)
- self.failUnless(c4.alt_id == 'c3', c4.alt_id)
- self.failUnless(c4.author == 'Jed', c4.author)
- self.failUnless(c4.body == 'And thanks\n', c4.body)
+ self.assertTrue(len(c4.uuid) == 36, c4.uuid)
+ self.assertTrue(c4.alt_id == 'c3', c4.alt_id)
+ self.assertTrue(c4.author == 'Jed', c4.author)
+ self.assertTrue(c4.body == 'And thanks\n', c4.body)
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/command/init.py b/libbe/command/init.py
index b4b3456..8640ae4 100644
--- a/libbe/command/init.py
+++ b/libbe/command/init.py
@@ -99,11 +99,10 @@ class Init (libbe.command.Command):
bd = libbe.bugdir.BugDir(storage, from_storage=False)
self.ui.storage_callbacks.set_bugdirs({bd.uuid: bd})
if bd.storage.name is not 'None':
- print >> self.stdout, \
- 'Using %s for revision control.' % storage.name
+ print('Using %s for revision control.' % storage.name, file=self.stdout)
else:
- print >> self.stdout, 'No revision control detected.'
- print >> self.stdout, 'BE repository initialized.'
+ print('No revision control detected.', file=self.stdout)
+ print('BE repository initialized.', file=self.stdout)
def _long_help(self):
return """
diff --git a/libbe/command/list.py b/libbe/command/list.py
index 4bb9c7b..6d4fcd8 100644
--- a/libbe/command/list.py
+++ b/libbe/command/list.py
@@ -135,11 +135,11 @@ class List (libbe.command.Command):
extra_strings_regexps=extra_strings_regexps)
bugs = list(itertools.chain(*list(
[bugdir.bug_from_uuid(uuid) for uuid in bugdir.uuids()]
- for bugdir in bugdirs.values())))
- bugs = [b for b in bugs if filter(bugdirs, b) == True]
+ for bugdir in list(bugdirs.values()))))
+ bugs = [b for b in bugs if list(filter(bugdirs, b)) == True]
self.result = bugs
if len(bugs) == 0 and params['xml'] == False:
- print >> self.stdout, 'No matching bugs found'
+ print('No matching bugs found', file=self.stdout)
# sort bugs
bugs = self._sort_bugs(bugs, cmp_list)
@@ -147,7 +147,7 @@ class List (libbe.command.Command):
# print list of bugs
if params['ids'] == True:
for bug in bugs:
- print >> self.stdout, bug.id.user()
+ print(bug.id.user(), file=self.stdout)
else:
self._list_bugs(bugs, show_tags=params['tags'], xml=params['xml'])
storage.writeable = writeable
@@ -194,13 +194,12 @@ class List (libbe.command.Command):
def _list_bugs(self, bugs, show_tags=False, xml=False):
if xml == True:
- print >> self.stdout, \
- '<?xml version="1.0" encoding="%s" ?>' % self.stdout.encoding
- print >> self.stdout, '<be-xml>'
+ print('<?xml version="1.0" encoding="%s" ?>' % self.stdout.encoding, file=self.stdout)
+ print('<be-xml>', file=self.stdout)
if len(bugs) > 0:
for bug in bugs:
if xml == True:
- print >> self.stdout, bug.xml(show_comments=True)
+ print(bug.xml(show_comments=True), file=self.stdout)
else:
bug_string = bug.string(shortlist=True)
if show_tags == True:
@@ -210,9 +209,9 @@ class List (libbe.command.Command):
% (attrs,
','.join(libbe.command.tag.get_tags(bug)),
summary))
- print >> self.stdout, bug_string
+ print(bug_string, file=self.stdout)
if xml == True:
- print >> self.stdout, '</be-xml>'
+ print('</be-xml>', file=self.stdout)
def _long_help(self):
return """
diff --git a/libbe/command/merge.py b/libbe/command/merge.py
index 2d52492..85fbb60 100644
--- a/libbe/command/merge.py
+++ b/libbe/command/merge.py
@@ -182,8 +182,8 @@ class Merge (libbe.command.Command):
mergeA.add_reply(comment, allow_time_inversion=True)
bugB.new_comment('Merged into bug #%s#' % bugA.id.long_user())
bugB.status = 'closed'
- print >> self.stdout, 'Merged bugs #%s# and #%s#' \
- % (bugA.id.user(), bugB.id.user())
+ print('Merged bugs #%s# and #%s#' \
+ % (bugA.id.user(), bugB.id.user()), file=self.stdout)
return 0
def _long_help(self):
diff --git a/libbe/command/new.py b/libbe/command/new.py
index 33ede50..b9a3147 100644
--- a/libbe/command/new.py
+++ b/libbe/command/new.py
@@ -120,7 +120,7 @@ class New (libbe.command.Command):
if params['bugdir']:
bugdir = bugdirs[params['bugdir']]
elif len(bugdirs) == 1:
- bugdir = bugdirs.values()[0]
+ bugdir = list(bugdirs.values())[0]
else:
raise libbe.command.UserError(
'Ambiguous bugdir {}'.format(sorted(bugdirs.values())))
diff --git a/libbe/command/remove.py b/libbe/command/remove.py
index 08eb4d1..2ca2699 100644
--- a/libbe/command/remove.py
+++ b/libbe/command/remove.py
@@ -71,9 +71,9 @@ class Remove (libbe.command.Command):
user_ids.append(bug.id.user())
bugdir.remove_bug(bug)
if len(user_ids) == 1:
- print >> self.stdout, 'Removed bug %s' % user_ids[0]
+ print('Removed bug %s' % user_ids[0], file=self.stdout)
else:
- print >> self.stdout, 'Removed bugs %s' % ', '.join(user_ids)
+ print('Removed bugs %s' % ', '.join(user_ids), file=self.stdout)
return 0
def _long_help(self):
diff --git a/libbe/command/serve_commands.py b/libbe/command/serve_commands.py
index d380a8d..dbdb76b 100644
--- a/libbe/command/serve_commands.py
+++ b/libbe/command/serve_commands.py
@@ -27,7 +27,7 @@ import logging
import os.path
import posixpath
import re
-import urllib
+import urllib.request, urllib.parse, urllib.error
import wsgiref.simple_server
import libbe
@@ -40,7 +40,7 @@ import libbe.version
if libbe.TESTING:
import copy
import doctest
- import StringIO
+ import io
import sys
import unittest
import wsgiref.validate
@@ -90,7 +90,7 @@ class ServerApp (libbe.util.wsgi.WSGI_AppObject,
parameters = data.get('parameters', {})
try:
Class = libbe.command.get_command_class(command_name=name)
- except libbe.command.UnknownCommand, e:
+ except libbe.command.UnknownCommand as e:
raise libbe.util.wsgi.HandlerError(
libbe.util.http.HTTP_USER_ERROR, 'UnknownCommand {}'.format(e))
command = Class(ui=self.ui)
@@ -197,12 +197,12 @@ if libbe.TESTING:
'parameters': params,
}, context=0)
self.getURL(self.app, '/run', method='POST', data=data)
- self.failUnless(self.status.startswith('200 '), self.status)
- self.failUnless(
+ self.assertTrue(self.status.startswith('200 '), self.status)
+ self.assertTrue(
('Content-Type', 'application/octet-stream'
) in self.response_headers,
self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
+ self.assertTrue(self.exc_info == None, self.exc_info)
# TODO: integration tests on ServeCommands?
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
diff --git a/libbe/command/set.py b/libbe/command/set.py
index 859a731..9ec7e2c 100644
--- a/libbe/command/set.py
+++ b/libbe/command/set.py
@@ -79,7 +79,7 @@ class Set (libbe.command.Command):
if params['bugdir']:
bugdir = bugdirs[params['bugdir']]
elif len(bugdirs) == 1:
- bugdir = bugdirs.values()[0]
+ bugdir = list(bugdirs.values())[0]
else:
raise libbe.command.UserError(
'Ambiguous bugdir {}'.format(sorted(bugdirs.values())))
@@ -87,8 +87,7 @@ class Set (libbe.command.Command):
keys = bugdir.settings_properties
keys.sort()
for key in keys:
- print >> self.stdout, \
- '%16s: %s' % (key, _value_string(bugdir, key))
+ print('%16s: %s' % (key, _value_string(bugdir, key)), file=self.stdout)
return 0
if params['setting'] not in bugdir.settings_properties:
msg = 'Invalid setting %s\n' % params['setting']
@@ -96,7 +95,7 @@ class Set (libbe.command.Command):
msg += '\n '.join(bugdir.settings_properties)
raise libbe.command.UserError(msg)
if params['value'] == None:
- print _value_string(bugdir, params['setting'])
+ print(_value_string(bugdir, params['setting']))
else:
if params['value'] == 'none':
params['value'] = EMPTY
diff --git a/libbe/command/severity.py b/libbe/command/severity.py
index 302d523..b97643e 100644
--- a/libbe/command/severity.py
+++ b/libbe/command/severity.py
@@ -74,7 +74,7 @@ class Severity (libbe.command.Command):
if bug.severity != params['severity']:
try:
bug.severity = params['severity']
- except ValueError, e:
+ except ValueError as e:
if e.name != 'severity':
raise e
raise libbe.command.UserError(
diff --git a/libbe/command/show.py b/libbe/command/show.py
index 3b61cc5..c08fd1f 100644
--- a/libbe/command/show.py
+++ b/libbe/command/show.py
@@ -112,10 +112,9 @@ class Show (libbe.command.Command):
% params['id'][0])
sys.__stdout__.write(comment.body)
return 0
- print >> self.stdout, \
- output(bugdirs, params['id'], encoding=self.stdout.encoding,
+ print(output(bugdirs, params['id'], encoding=self.stdout.encoding,
as_xml=params['xml'],
- with_comments=not params['no-comments'])
+ with_comments=not params['no-comments']), file=self.stdout)
return 0
def _long_help(self):
@@ -147,7 +146,7 @@ def _sort_ids(bugdirs, ids, with_comments=True):
root_comments[p['bug']] = [p['comment']]
else:
root_comments[p['bug']].append(p['comment'])
- for bugname in root_comments.keys():
+ for bugname in list(root_comments.keys()):
assert bugname not in bugs, \
'specifically requested both #/%s/%s# and #/%s#' \
% (bugname, root_comments[bugname][0], bugname)
@@ -169,7 +168,7 @@ def _xml_footer():
def output(bugdirs, ids, encoding, as_xml=True, with_comments=True):
if ids == None or len(ids) == 0:
ids = []
- for bugdir in bugdirs.values():
+ for bugdir in list(bugdirs.values()):
bugdir.load_all_bugs()
ids.extend([bug.id.user() for bug in bugdir])
uuids,root_comments = _sort_ids(bugdirs, ids, with_comments)
@@ -187,14 +186,14 @@ def output(bugdirs, ids, encoding, as_xml=True, with_comments=True):
if spaces_left > 0:
spaces_left -= 1
lines.append('') # add a blank line between bugs/comments
- for bugname,comments in root_comments.items():
+ for bugname,comments in list(root_comments.items()):
bug = libbe.command.util.bug_from_uuid(bugdirs, bugname)
if as_xml:
lines.extend([' <bug>', ' <uuid>%s</uuid>' % bug.uuid])
for commname in comments:
try:
comment = bug.comment_root.comment_from_uuid(commname)
- except KeyError, e:
+ except KeyError as e:
raise libbe.command.UserError(e.message)
if as_xml:
lines.append(comment.xml(indent=4))
diff --git a/libbe/command/status.py b/libbe/command/status.py
index 96448df..b5dbf2e 100644
--- a/libbe/command/status.py
+++ b/libbe/command/status.py
@@ -75,7 +75,7 @@ class Status (libbe.command.Command):
if bug.status != params['status']:
try:
bug.status = params['status']
- except ValueError, e:
+ except ValueError as e:
if e.name != 'status':
raise e
raise libbe.command.UserError(
diff --git a/libbe/command/subscribe.py b/libbe/command/subscribe.py
index 5884a9a..8541970 100644
--- a/libbe/command/subscribe.py
+++ b/libbe/command/subscribe.py
@@ -143,7 +143,7 @@ class Subscribe (libbe.command.Command):
types = params['types'].split(',')
if len(params['id']) == 0:
- params['id'] = bugdirs.keys()
+ params['id'] = list(bugdirs.keys())
for _id in params['id']:
p = libbe.util.id.parse_user(bugdirs, _id)
if p['type'] == 'bugdir':
@@ -173,7 +173,7 @@ class Subscribe (libbe.command.Command):
if params['list-all'] == True:
subscriptions = []
- for bugdir in bugdirs.values():
+ for bugdir in list(bugdirs.values()):
bugdir.load_all_bugs()
subscriptions.extend(
get_bugdir_subscribers(bugdir, servers[0]))
@@ -184,8 +184,8 @@ class Subscribe (libbe.command.Command):
subscriptions.append(estr[len(TAG):])
if len(subscriptions) > 0:
- print >> self.stdout, 'Subscriptions for %s:' % entity_name
- print >> self.stdout, '\n'.join(subscriptions)
+ print('Subscriptions for %s:' % entity_name, file=self.stdout)
+ print('\n'.join(subscriptions), file=self.stdout)
if params['list-all'] == True or params['list'] == True:
storage.writeable = writeable
return 0
diff --git a/libbe/command/tag.py b/libbe/command/tag.py
index 94030b8..3b11e99 100644
--- a/libbe/command/tag.py
+++ b/libbe/command/tag.py
@@ -112,10 +112,10 @@ class Tag (libbe.command.Command):
bugdirs = self._get_bugdirs()
if params['list'] == True:
tags = list(itertools.chain(*
- [get_all_tags(bugdir) for bugdir in bugdirs.values()]))
+ [get_all_tags(bugdir) for bugdir in list(bugdirs.values())]))
tags.sort()
if len(tags) > 0:
- print >> self.stdout, '\n'.join(tags)
+ print('\n'.join(tags), file=self.stdout)
return 0
bugdir,bug,comment = (
@@ -136,8 +136,8 @@ class Tag (libbe.command.Command):
tags.append(estr[len(TAG_TAG):])
if len(tags) > 0:
- print "Tags for %s:" % bug.id.user()
- print '\n'.join(tags)
+ print("Tags for %s:" % bug.id.user())
+ print('\n'.join(tags))
return 0
def _long_help(self):
diff --git a/libbe/command/target.py b/libbe/command/target.py
index 9f76303..0886fbf 100644
--- a/libbe/command/target.py
+++ b/libbe/command/target.py
@@ -101,15 +101,15 @@ class Target (libbe.command.Command):
if params['bugdir']:
bugdir = bugdirs[params['bugdir']]
elif len(bugdirs) == 1:
- bugdir = bugdirs.values()[0]
+ bugdir = list(bugdirs.values())[0]
else:
raise libbe.command.UserError(
'Ambiguous bugdir {}'.format(sorted(bugdirs.values())))
bug = bug_from_target_summary(bugdirs, bugdir, params['target'])
if bug == None:
- print >> self.stdout, 'No target assigned.'
+ print('No target assigned.', file=self.stdout)
else:
- print >> self.stdout, bug.id.long_user()
+ print(bug.id.long_user(), file=self.stdout)
return 0
bugdir,bug,comment = (
libbe.command.util.bugdir_bug_comment_from_user_id(
@@ -117,9 +117,9 @@ class Target (libbe.command.Command):
if params['target'] == None:
target = bug_target(bugdirs, bug)
if target == None:
- print >> self.stdout, 'No target assigned.'
+ print('No target assigned.', file=self.stdout)
else:
- print >> self.stdout, target.summary
+ print(target.summary, file=self.stdout)
else:
if params['target'] == 'none':
target = remove_target(bugdirs, bug)
@@ -204,7 +204,7 @@ def add_target(bugdirs, bugdir, bug, summary):
def targets(bugdirs):
"""Generate all possible target bug summaries."""
- for bugdir in bugdirs.values():
+ for bugdir in list(bugdirs.values()):
bugdir.load_all_bugs()
for bug in bugdir:
if bug.severity == 'target':
diff --git a/libbe/command/util.py b/libbe/command/util.py
index 9f76ad8..49f3490 100644
--- a/libbe/command/util.py
+++ b/libbe/command/util.py
@@ -61,7 +61,7 @@ def complete_severity(command, argument, fragment=None):
def assignees(bugdirs):
ret = set()
- for bugdir in bugdirs.values():
+ for bugdir in list(bugdirs.values()):
bugdir.load_all_bugs()
ret.update(set([bug.assigned for bug in bugdir
if bug.assigned != None]))
@@ -77,7 +77,7 @@ def complete_extra_strings(command, argument, fragment=None):
def complete_bugdir_id(command, argument, fragment=None):
bugdirs = command._get_bugdirs()
- return bugdirs.keys()
+ return list(bugdirs.keys())
def complete_bug_id(command, argument, fragment=None):
return complete_bug_comment_id(command, argument, fragment,
@@ -96,11 +96,11 @@ def complete_bug_comment_id(command, argument, fragment=None,
root,residual = (fragment, None)
if not root.endswith('/'):
root += '/'
- except libbe.util.id.InvalidIDStructure, e:
+ except libbe.util.id.InvalidIDStructure as e:
return []
except libbe.util.id.NoIDMatches:
return []
- except libbe.util.id.MultipleIDMatches, e:
+ except libbe.util.id.MultipleIDMatches as e:
if e.common == None:
# choose among bugdirs
return e.matches
@@ -226,7 +226,7 @@ def bugdir_bug_comment_from_user_id(bugdirs, id):
def bug_from_uuid(bugdirs, uuid):
error = None
- for bugdir in bugdirs.values():
+ for bugdir in list(bugdirs.values()):
try:
bug = bugdir.bug_from_uuid(uuid)
except libbe.bugdir.NoBugMatches as e:
diff --git a/libbe/comment.py b/libbe/comment.py
index 9bef50a..d03f8ce 100644
--- a/libbe/comment.py
+++ b/libbe/comment.py
@@ -256,7 +256,7 @@ class Comment (Tree, settings_object.SavedSettingsObject):
value = getattr(self, setting)
if value == None:
return ""
- if type(value) not in types.StringTypes:
+ if type(value) not in (str,):
return str(value)
return value
@@ -334,7 +334,7 @@ class Comment (Tree, settings_object.SavedSettingsObject):
return istring + sep.join(lines).rstrip('\n')
def from_xml(self, xml_string, preserve_uuids=False):
- u"""
+ """
Note: If alt-id is not given, translates any <uuid> fields to
<alt-id> fields.
>>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
@@ -358,7 +358,7 @@ class Comment (Tree, settings_object.SavedSettingsObject):
>>> commC.uuid == commA.uuid
True
"""
- if type(xml_string) == types.UnicodeType:
+ if type(xml_string) == str:
xml_string = xml_string.strip().encode('unicode_escape')
if hasattr(xml_string, 'getchildren'): # already an ElementTree Element
comment = xml_string
@@ -381,7 +381,7 @@ class Comment (Tree, settings_object.SavedSettingsObject):
text = settings_object.EMPTY
else:
text = xml.sax.saxutils.unescape(child.text)
- if not isinstance(text, unicode):
+ if not isinstance(text, str):
text = text.decode('unicode_escape')
text = text.strip()
if child.tag == 'uuid' and not preserve_uuids:
@@ -481,9 +481,8 @@ class Comment (Tree, settings_object.SavedSettingsObject):
if accept_extra_strings == True:
self.extra_strings.append(estr)
elif change_exception == True:
- raise ValueError, \
- 'Merge would add extra string "%s" to comment %s' \
- % (estr, self.uuid)
+ raise ValueError('Merge would add extra string "%s" to comment %s' \
+ % (estr, self.uuid))
def string(self, indent=0):
"""
@@ -613,7 +612,7 @@ class Comment (Tree, settings_object.SavedSettingsObject):
self.id.storage('values'), '{}\n')
try:
settings = mapfile.parse(settings_mapfile)
- except mapfile.InvalidMapfileContents, e:
+ except mapfile.InvalidMapfileContents as e:
raise Exception('Invalid settings file for comment %s\n'
'(BE version missmatch?)' % self.id.user())
self._setup_saved_settings(settings)
diff --git a/libbe/diff.py b/libbe/diff.py
index c2f4e68..c2f9c8e 100644
--- a/libbe/diff.py
+++ b/libbe/diff.py
@@ -99,7 +99,7 @@ class Subscription (object):
kwargs['type_root'] = BUGDIR_TYPE_ALL
else:
kwargs['type_root'] = BUG_TYPE_ALL
- if type(subscription_type) in types.StringTypes:
+ if type(subscription_type) in (str,):
subscription_type = type_from_name(subscription_type, **kwargs)
self.id = id
self.type = subscription_type
@@ -221,8 +221,8 @@ class DiffTree (libbe.util.tree.Tree):
if names[1] == child.name:
return child.child_by_path(names[1:])
if len(names) == 1:
- raise KeyError, "%s doesn't match '%s'" % (names, self.name)
- raise KeyError, '%s points to child not in %s' % (names, [c.name for c in self])
+ raise KeyError("%s doesn't match '%s'" % (names, self.name))
+ raise KeyError('%s points to child not in %s' % (names, [c.name for c in self]))
def report_string(self):
report = self.report()
if report == None:
@@ -649,10 +649,10 @@ class Diff (object):
def attribute_change_string(self, attribute_changes, indent=0):
indent_string = ' '*indent
- change_strings = [u'%s: %s -> %s' % f for f in attribute_changes]
+ change_strings = ['%s: %s -> %s' % f for f in attribute_changes]
for i,change_string in enumerate(change_strings):
change_strings[i] = indent_string+change_string
- return u'\n'.join(change_strings)
+ return '\n'.join(change_strings)
def bugdir_attribute_change_string(self, attribute_changes):
return 'Changed bug directory settings:\n%s' % \
self.attribute_change_string(attribute_changes, indent=1)
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py
index 01885e9..baed80b 100644
--- a/libbe/storage/__init__.py
+++ b/libbe/storage/__init__.py
@@ -53,7 +53,7 @@ STORAGE_VERSIONS = ['Bugs Everywhere Tree 1 0',
STORAGE_VERSION = STORAGE_VERSIONS[-1]
def get_vcs_storage(location):
- import vcs
+ from . import vcs
s = vcs.detect_vcs(location)
s.repo = location
return s
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
index 977179d..4c05287 100644
--- a/libbe/storage/base.py
+++ b/libbe/storage/base.py
@@ -198,7 +198,7 @@ class Storage (object):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
root = Entry(id='__ROOT__', directory=True)
d = {root.id:root}
- pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1)
+ pickle.dump(dict((k,v._objects_to_ids()) for k,v in list(d.items())), f, -1)
f.close()
def destroy(self):
@@ -223,7 +223,7 @@ class Storage (object):
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
- self._data = dict((k,v._ids_to_objects(d)) for k,v in d.items())
+ self._data = dict((k,v._ids_to_objects(d)) for k,v in list(d.items()))
f.close()
def disconnect(self):
@@ -238,7 +238,7 @@ class Storage (object):
def _disconnect(self):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump(dict((k,v._objects_to_ids())
- for k,v in self._data.items()), f, -1)
+ for k,v in list(self._data.items())), f, -1)
f.close()
self._data = None
@@ -336,9 +336,9 @@ class Storage (object):
decode = False
value = self._get(*args, **kwargs)
if value != None:
- if decode == True and type(value) != types.UnicodeType:
- return unicode(value, self.encoding)
- elif decode == False and type(value) != types.StringType:
+ if decode == True and type(value) != str:
+ return str(value, self.encoding)
+ elif decode == False and type(value) != bytes:
return value.encode(self.encoding)
return value
@@ -355,7 +355,7 @@ class Storage (object):
"""
if self.is_writeable() == False:
raise NotWriteable('Cannot set entry in unwriteable storage.')
- if type(value) == types.UnicodeType:
+ if type(value) == str:
value = value.encode(self.encoding)
self._set(id, value, *args, **kwargs)
@@ -386,7 +386,7 @@ class VersionedStorage (Storage):
summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit')
body = Entry(id='__COMMIT__BODY__')
initial_commit = {root.id:root, summary.id:summary, body.id:body}
- d = dict((k,v._objects_to_ids()) for k,v in initial_commit.items())
+ d = dict((k,v._objects_to_ids()) for k,v in list(initial_commit.items()))
pickle.dump([d, copy.deepcopy(d)], f, -1) # [inital tree, working tree]
f.close()
@@ -396,14 +396,14 @@ class VersionedStorage (Storage):
except IOError:
raise ConnectionError(self)
d = pickle.load(f)
- self._data = [dict((k,v._ids_to_objects(t)) for k,v in t.items())
+ self._data = [dict((k,v._ids_to_objects(t)) for k,v in list(t.items()))
for t in d]
f.close()
def _disconnect(self):
f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
pickle.dump([dict((k,v._objects_to_ids())
- for k,v in t.items()) for t in self._data], f, -1)
+ for k,v in list(t.items())) for t in self._data], f, -1)
f.close()
self._data = None
@@ -527,7 +527,7 @@ class VersionedStorage (Storage):
new = []
modified = []
removed = []
- for id,value in self._data[int(revision)].items():
+ for id,value in list(self._data[int(revision)].items()):
if id.startswith('__'):
continue
if not id in self._data[-1]:
@@ -624,7 +624,7 @@ if TESTING == True:
def test_initially_empty(self):
"""New repository should be empty."""
- self.failUnless(len(self.s.children()) == 0, self.s.children())
+ self.assertTrue(len(self.s.children()) == 0, self.s.children())
def test_add_identical_rooted(self):
"""Adding entries with the same ID should not increase the number of children.
@@ -632,7 +632,7 @@ if TESTING == True:
for i in range(10):
self.s.add('some id', directory=False)
s = sorted(self.s.children())
- self.failUnless(s == ['some id'], s)
+ self.assertTrue(s == ['some id'], s)
def test_add_rooted(self):
"""Adding entries should increase the number of children (rooted).
@@ -642,7 +642,7 @@ if TESTING == True:
ids.append(str(i))
self.s.add(ids[-1], directory=(i % 2 == 0))
s = sorted(self.s.children())
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_nonrooted(self):
"""Adding entries should increase the number of children (nonrooted).
@@ -653,9 +653,9 @@ if TESTING == True:
ids.append(str(i))
self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
def test_ancestors(self):
"""Check ancestors lists.
@@ -668,7 +668,7 @@ if TESTING == True:
j_id = str(20*(i+1)+j)
self.s.add(j_id, i_id, directory=(i%2 == 0))
ancestors = sorted(self.s.ancestors(j_id))
- self.failUnless(ancestors == [i_id, 'parent'],
+ self.assertTrue(ancestors == [i_id, 'parent'],
'Unexpected ancestors for %s/%s, "%s"'
% (i_id, j_id, ancestors))
@@ -681,7 +681,7 @@ if TESTING == True:
ids.append('parent/%s' % str(i))
self.s.add(ids[-1], 'parent', directory=(i % 2 == 0))
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_grandchildren(self):
"""Grandchildren should not be returned as children.
@@ -699,7 +699,7 @@ if TESTING == True:
directory = (j % 2 == 0)
self.s.add(grandchild, child, directory=directory)
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_invalid_directory(self):
"""Should not be able to add children to non-directories.
@@ -719,7 +719,7 @@ if TESTING == True:
% (vars(self.Class)['name']))
except InvalidDirectory:
pass
- self.failUnless(len(self.s.children('parent')) == 0,
+ self.assertTrue(len(self.s.children('parent')) == 0,
self.s.children('parent'))
def test_remove_rooted(self):
@@ -732,7 +732,7 @@ if TESTING == True:
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children())
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_remove_nonrooted(self):
"""Removing entries should decrease the number of children (nonrooted).
@@ -745,10 +745,10 @@ if TESTING == True:
for i in range(10):
self.s.remove(ids.pop())
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
if len(s) > 0:
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
def test_remove_directory_not_empty(self):
"""Removing a non-empty directory entry should raise exception.
@@ -778,7 +778,7 @@ if TESTING == True:
self.s.add(str(20*(i+1)+j), ids[-1], directory=(i%2 == 0))
self.s.recursive_remove('parent')
s = sorted(self.s.children())
- self.failUnless(s == [], s)
+ self.assertTrue(s == [], s)
class Storage_get_set_TestCase (StorageTestCase):
"""Test cases for Storage.get and .set methods."""
@@ -790,7 +790,7 @@ if TESTING == True:
"""Get should return specified default if id not in Storage.
"""
ret = self.s.get(self.id, default=self.val)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
@@ -811,7 +811,7 @@ if TESTING == True:
self.s.add(self.id, directory=False)
val = 'UNLIKELY DEFAULT'
ret = self.s.get(self.id, default=val)
- self.failUnless(ret == val,
+ self.assertTrue(ret == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, val))
@@ -832,29 +832,29 @@ if TESTING == True:
self.s.add(self.id, directory=False)
self.s.set(self.id, self.val)
ret = self.s.get(self.id)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
def test_unicode_set(self):
"""Set should define the value returned by get.
"""
- val = u'Fran\xe7ois'
+ val = 'Fran\xe7ois'
self.s.add(self.id, directory=False)
self.s.set(self.id, val)
ret = self.s.get(self.id, decode=True)
- self.failUnless(type(ret) == types.UnicodeType,
+ self.assertTrue(type(ret) == str,
"%s.get() returned %s not UnicodeType"
% (vars(self.Class)['name'], type(ret)))
- self.failUnless(ret == val,
+ self.assertTrue(ret == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
ret = self.s.get(self.id)
- self.failUnless(type(ret) == types.StringType,
+ self.assertTrue(type(ret) == bytes,
"%s.get() returned %s not StringType"
% (vars(self.Class)['name'], type(ret)))
- s = unicode(ret, self.s.encoding)
- self.failUnless(s == val,
+ s = str(ret, self.s.encoding)
+ self.assertTrue(s == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], s, self.val))
@@ -873,7 +873,7 @@ if TESTING == True:
self.s.disconnect()
self.s.connect()
ret = self.s.get(self.id)
- self.failUnless(ret == self.val,
+ self.assertTrue(ret == self.val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
@@ -886,7 +886,7 @@ if TESTING == True:
self.s.connect()
default = 'UNLIKELY DEFAULT'
ret = self.s.get(self.id, default=default)
- self.failUnless(ret in ['', default],
+ self.assertTrue(ret in ['', default],
"%s.get() returned %s not in %s"
% (vars(self.Class)['name'], ret, ['', default]))
@@ -901,9 +901,9 @@ if TESTING == True:
self.s.disconnect()
self.s.connect()
s = sorted(self.s.children('parent'))
- self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
+ self.assertTrue(s == ids, '\n %s\n !=\n %s' % (s, ids))
s = self.s.children()
- self.failUnless(s == ['parent'], s)
+ self.assertTrue(s == ['parent'], s)
class VersionedStorageTestCase (StorageTestCase):
"""Test cases for VersionedStorage methods."""
@@ -972,12 +972,12 @@ if TESTING == True:
self.commit_body))
for i in range(10):
rev = self.s.revision_id(i+1)
- self.failUnless(rev == revs[i],
+ self.assertTrue(rev == revs[i],
"%s.revision_id(%d) returned %s not %s"
% (vars(self.Class)['name'], i+1, rev, revs[i]))
for i in range(-1, -9, -1):
rev = self.s.revision_id(i)
- self.failUnless(rev == revs[i],
+ self.assertTrue(rev == revs[i],
"%s.revision_id(%d) returned %s not %s"
% (vars(self.Class)['name'], i, rev, revs[i]))
@@ -994,7 +994,7 @@ if TESTING == True:
self.commit_body))
for i in range(10):
ret = self.s.get(self.id, revision=revs[i])
- self.failUnless(ret == val(i),
+ self.assertTrue(ret == val(i),
"%s.get() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret, val(i), revs[i]))
@@ -1015,7 +1015,7 @@ if TESTING == True:
children.append(list(cur_children))
for i in range(10):
ret = sorted(self.s.children('parent', revision=revs[i]))
- self.failUnless(ret == children[i],
+ self.assertTrue(ret == children[i],
"%s.children() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret,
children[i], revs[i]))
@@ -1047,7 +1047,7 @@ if TESTING == True:
self.commit_body))
for rev,cur_children in zip(revs, children):
ret = sorted(self.s.children('parent', revision=rev))
- self.failUnless(ret == cur_children,
+ self.assertTrue(ret == cur_children,
"%s.children() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret,
cur_children, rev))
@@ -1074,18 +1074,18 @@ if TESTING == True:
self.s.remove('removed')
revB = self.s.commit('Final state')
new,mod,rem = self.s.changed(revA)
- self.failUnless(sorted(new) == ['moved2', 'new'],
+ self.assertTrue(sorted(new) == ['moved2', 'new'],
'Unexpected new: %s' % new)
- self.failUnless(mod == ['modified'],
+ self.assertTrue(mod == ['modified'],
'Unexpected modified: %s' % mod)
- self.failUnless(sorted(rem) == ['moved', 'removed'],
+ self.assertTrue(sorted(rem) == ['moved', 'removed'],
'Unexpected removed: %s' % rem)
def make_storage_testcase_subclasses(storage_class, namespace):
"""Make StorageTestCase subclasses for storage_class in namespace."""
storage_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if issubclass(c, StorageTestCase) \
and c.Class == Storage]
@@ -1102,7 +1102,7 @@ if TESTING == True:
"""Make VersionedStorageTestCase subclasses for storage_class in namespace."""
storage_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if ((issubclass(c, StorageTestCase) \
and c.Class == Storage)
or
diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py
index 4945b1f..f53f5b2 100644
--- a/libbe/storage/util/config.py
+++ b/libbe/storage/util/config.py
@@ -21,7 +21,7 @@
"""Create, save, and load the per-user config file at :py:func:`path`.
"""
-import ConfigParser
+import configparser
import codecs
import os
import os.path
@@ -71,7 +71,7 @@ def set_val(name, value, section="DEFAULT", encoding=None):
"""
if encoding == None:
encoding = default_encoding
- config = ConfigParser.ConfigParser()
+ config = configparser.ConfigParser()
if os.path.exists(path()) == False: # touch file or config
open(path(), 'w').close() # read chokes on missing file
f = codecs.open(path(), 'r', encoding)
@@ -114,13 +114,13 @@ def get_val(name, section="DEFAULT", default=None, encoding=None):
if os.path.exists(path()):
if encoding == None:
encoding = default_encoding
- config = ConfigParser.ConfigParser()
+ config = configparser.ConfigParser()
f = codecs.open(path(), 'r', encoding)
config.readfp(f, path())
f.close()
try:
return config.get(section, name)
- except ConfigParser.NoOptionError:
+ except configparser.NoOptionError:
return default
else:
return default
diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py
index 1c37e3f..609f782 100644
--- a/libbe/storage/util/mapfile.py
+++ b/libbe/storage/util/mapfile.py
@@ -70,7 +70,7 @@ def generate(map, context=6):
the :py:mod:`~libbe.command.serve_commands` where merging is not
important, the amount of context is controllable.
- >>> sys.stdout.write(generate({'q':u'Fran\u00e7ais'}, context=0))
+ >>> sys.stdout.write(generate({'q':u'Fran\\u00e7ais'}, context=0))
{
"q": "Fran\\u00e7ais"
}
@@ -113,7 +113,7 @@ def parse(contents):
u'd'
>>> dict['e']
u'f'
- >>> contents = generate({'q':u'Fran\u00e7ais'})
+ >>> contents = generate({'q':u'Fran\\u00e7ais'})
>>> dict = parse(contents)
>>> dict['q']
u'Fran\\xe7ais'
diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py
index 77c0162..4aa3ce6 100644
--- a/libbe/storage/util/properties.py
+++ b/libbe/storage/util/properties.py
@@ -415,7 +415,7 @@ if libbe.TESTING == True:
@doc_property("A fancy property")
def x():
return {}
- self.failUnless(Test.x.__doc__ == "A fancy property",
+ self.assertTrue(Test.x.__doc__ == "A fancy property",
Test.x.__doc__)
def testLocalProperty(self):
class Test(object):
@@ -424,11 +424,11 @@ if libbe.TESTING == True:
def x():
return {}
t = Test()
- self.failUnless(t.x == None, str(t.x))
+ self.assertTrue(t.x == None, str(t.x))
t.x = 'z' # the first set initializes ._LOCAL_value
- self.failUnless(t.x == 'z', str(t.x))
- self.failUnless("_LOCAL_value" in dir(t), dir(t))
- self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ self.assertTrue(t.x == 'z', str(t.x))
+ self.assertTrue("_LOCAL_value" in dir(t), dir(t))
+ self.assertTrue(t._LOCAL_value == 'z', t._LOCAL_value)
def testSettingsProperty(self):
class Test(object):
@Property
@@ -438,11 +438,11 @@ if libbe.TESTING == True:
def __init__(self):
self.settings = {}
t = Test()
- self.failUnless(t.x == None, str(t.x))
+ self.assertTrue(t.x == None, str(t.x))
t.x = 'z' # the first set initializes ._LOCAL_value
- self.failUnless(t.x == 'z', str(t.x))
- self.failUnless("attr" in t.settings, t.settings)
- self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ self.assertTrue(t.x == 'z', str(t.x))
+ self.assertTrue("attr" in t.settings, t.settings)
+ self.assertTrue(t.settings["attr"] == 'z', t.settings["attr"])
def testDefaultingLocalProperty(self):
class Test(object):
@Property
@@ -450,15 +450,15 @@ if libbe.TESTING == True:
@local_property(name="DEFAULT", null=5)
def x(): return {}
t = Test()
- self.failUnless(t.x == 5, str(t.x))
+ self.assertTrue(t.x == 5, str(t.x))
t.x = 'x'
- self.failUnless(t.x == 'y', str(t.x))
+ self.assertTrue(t.x == 'y', str(t.x))
t.x = 'y'
- self.failUnless(t.x == 'y', str(t.x))
+ self.assertTrue(t.x == 'y', str(t.x))
t.x = 'z'
- self.failUnless(t.x == 'z', str(t.x))
+ self.assertTrue(t.x == 'z', str(t.x))
t.x = 5
- self.failUnless(t.x == 5, str(t.x))
+ self.assertTrue(t.x == 5, str(t.x))
def testCheckedLocalProperty(self):
class Test(object):
@Property
@@ -468,13 +468,13 @@ if libbe.TESTING == True:
def __init__(self):
self._CHECKED_value = 'x'
t = Test()
- self.failUnless(t.x == 'x', str(t.x))
+ self.assertTrue(t.x == 'x', str(t.x))
try:
t.x = None
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
def testTwoCheckedLocalProperties(self):
class Test(object):
@Property
@@ -493,18 +493,18 @@ if libbe.TESTING == True:
try:
t.x = 'a'
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
t.x = 'x'
t.x = 'y'
t.x = 'z'
try:
t.a = 'x'
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
t.a = 'a'
t.a = 'b'
t.a = 'c'
@@ -517,13 +517,13 @@ if libbe.TESTING == True:
def __init__(self):
self._CHECKED_value = 'x'
t = Test()
- self.failUnless(t.x == 'x', str(t.x))
+ self.assertTrue(t.x == 'x', str(t.x))
try:
t.x = None
e = None
- except ValueCheckError, e:
+ except ValueCheckError as e:
pass
- self.failUnless(type(e) == ValueCheckError, type(e))
+ self.assertTrue(type(e) == ValueCheckError, type(e))
def testCachedLocalProperty(self):
class Gen(object):
def __init__(self):
@@ -537,30 +537,30 @@ if libbe.TESTING == True:
@local_property(name="CACHED")
def x(): return {}
t = Test()
- self.failIf("_CACHED_cache" in dir(t),
+ self.assertFalse("_CACHED_cache" in dir(t),
getattr(t, "_CACHED_cache", None))
- self.failUnless(t.x == 1, t.x)
- self.failUnless(t.x == 1, t.x)
- self.failUnless(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
t.x = 8
- self.failUnless(t.x == 8, t.x)
- self.failUnless(t.x == 8, t.x)
+ self.assertTrue(t.x == 8, t.x)
+ self.assertTrue(t.x == 8, t.x)
t._CACHED_cache = False # Caching is off, but the stored value
val = t.x # is 8, not the initVal (None), so we
- self.failUnless(val == 8, val) # get 8.
+ self.assertTrue(val == 8, val) # get 8.
t._CACHED_value = None # Now we've set the stored value to None
val = t.x # so future calls to fget (like this)
- self.failUnless(val == 2, val) # will call the generator every time...
+ self.assertTrue(val == 2, val) # will call the generator every time...
val = t.x
- self.failUnless(val == 3, val)
+ self.assertTrue(val == 3, val)
val = t.x
- self.failUnless(val == 4, val)
+ self.assertTrue(val == 4, val)
t._CACHED_cache = True # We turn caching back on, and get
- self.failUnless(t.x == 1, str(t.x)) # the original cached value.
+ self.assertTrue(t.x == 1, str(t.x)) # the original cached value.
del t._CACHED_cached_value # Removing that value forces a
- self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
- self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
- self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
+ self.assertTrue(t.x == 5, str(t.x)) # single cache-regenerating call
+ self.assertTrue(t.x == 5, str(t.x)) # to the genenerator, after which
+ self.assertTrue(t.x == 5, str(t.x)) # we get the new cached value.
def testPrimedLocalProperty(self):
class Test(object):
def prime(self):
@@ -573,23 +573,23 @@ if libbe.TESTING == True:
self.settings={}
self.primeVal = "initialized"
t = Test()
- self.failIf("_PRIMED_prime" in dir(t),
+ self.assertFalse("_PRIMED_prime" in dir(t),
getattr(t, "_PRIMED_prime", None))
- self.failUnless(t.x == "initialized", t.x)
+ self.assertTrue(t.x == "initialized", t.x)
t.x = 1
- self.failUnless(t.x == 1, t.x)
+ self.assertTrue(t.x == 1, t.x)
t.x = None
- self.failUnless(t.x == "initialized", t.x)
+ self.assertTrue(t.x == "initialized", t.x)
t._PRIMED_prime = True
t.x = 3
- self.failUnless(t.x == "initialized", t.x)
+ self.assertTrue(t.x == "initialized", t.x)
t._PRIMED_prime = False
t.x = 3
- self.failUnless(t.x == 3, t.x)
+ self.assertTrue(t.x == 3, t.x)
# test unprimableVal
t.x = None
t.primeVal = None
- self.failUnless(t.x == 2, t.x)
+ self.assertTrue(t.x == 2, t.x)
def testChangeHookLocalProperty(self):
class Test(object):
def _hook(self, old, new):
@@ -602,14 +602,14 @@ if libbe.TESTING == True:
def x(): return {}
t = Test()
t.x = 1
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == 1, t.new)
+ self.assertTrue(t.old == None, t.old)
+ self.assertTrue(t.new == 1, t.new)
t.x = 1
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == 1, t.new)
+ self.assertTrue(t.old == None, t.old)
+ self.assertTrue(t.new == 1, t.new)
t.x = 2
- self.failUnless(t.old == 1, t.old)
- self.failUnless(t.new == 2, t.new)
+ self.assertTrue(t.old == 1, t.old)
+ self.assertTrue(t.new == 2, t.new)
def testChangeHookMutableProperty(self):
class Test(object):
def _hook(self, old, new):
@@ -624,45 +624,45 @@ if libbe.TESTING == True:
t = Test()
t.hook_calls = 0
t.x = []
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == [], t.new)
- self.failUnless(t.hook_calls == 1, t.hook_calls)
+ self.assertTrue(t.old == None, t.old)
+ self.assertTrue(t.new == [], t.new)
+ self.assertTrue(t.hook_calls == 1, t.hook_calls)
a = t.x
a.append(5)
t.x = a
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 2, t.hook_calls)
+ self.assertTrue(t.old == [], t.old)
+ self.assertTrue(t.new == [5], t.new)
+ self.assertTrue(t.hook_calls == 2, t.hook_calls)
t.x = []
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [], t.new)
- self.failUnless(t.hook_calls == 3, t.hook_calls)
+ self.assertTrue(t.old == [5], t.old)
+ self.assertTrue(t.new == [], t.new)
+ self.assertTrue(t.hook_calls == 3, t.hook_calls)
# now append without reassigning. this doesn't trigger the
# change, since we don't ever set t.x, only get it and mess
# with it. It does, however, update our t.new, since t.new =
# t.x and is not a static copy.
t.x.append(5)
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 3, t.hook_calls)
+ self.assertTrue(t.old == [5], t.old)
+ self.assertTrue(t.new == [5], t.new)
+ self.assertTrue(t.hook_calls == 3, t.hook_calls)
# however, the next t.x get _will_ notice the change...
a = t.x
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 4, t.hook_calls)
+ self.assertTrue(t.old == [], t.old)
+ self.assertTrue(t.new == [5], t.new)
+ self.assertTrue(t.hook_calls == 4, t.hook_calls)
t.x.append(6) # this append(6) is not noticed yet
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5,6], t.new)
- self.failUnless(t.hook_calls == 4, t.hook_calls)
+ self.assertTrue(t.old == [], t.old)
+ self.assertTrue(t.new == [5,6], t.new)
+ self.assertTrue(t.hook_calls == 4, t.hook_calls)
# this append(7) is not noticed, but the t.x get causes the
# append(6) to be noticed
t.x.append(7)
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [5,6,7], t.new)
- self.failUnless(t.hook_calls == 5, t.hook_calls)
+ self.assertTrue(t.old == [5], t.old)
+ self.assertTrue(t.new == [5,6,7], t.new)
+ self.assertTrue(t.hook_calls == 5, t.hook_calls)
a = t.x # now the append(7) is noticed
- self.failUnless(t.old == [5,6], t.old)
- self.failUnless(t.new == [5,6,7], t.new)
- self.failUnless(t.hook_calls == 6, t.hook_calls)
+ self.assertTrue(t.old == [5,6], t.old)
+ self.assertTrue(t.new == [5,6,7], t.new)
+ self.assertTrue(t.hook_calls == 6, t.hook_calls)
suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py
index 819698f..323d6e9 100644
--- a/libbe/storage/util/settings_object.py
+++ b/libbe/storage/util/settings_object.py
@@ -27,7 +27,7 @@ See Also
"""
import libbe
-from properties import Property, doc_property, local_property, \
+from .properties import Property, doc_property, local_property, \
defaulting_property, checked_property, fn_checked_property, \
cached_property, primed_property, change_hook_property, \
settings_property
@@ -324,7 +324,7 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def content_type(): return {}
expected = "A test property\n\nThis property defaults to None."
- self.failUnless(Test.content_type.__doc__ == expected,
+ self.assertTrue(Test.content_type.__doc__ == expected,
Test.content_type.__doc__)
def testSimplePropertyFromMemory(self):
"""Testing a minimal versioned property from memory"""
@@ -338,75 +338,75 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def content_type(): return {}
t = Test()
- self.failUnless(len(t.settings) == 0, len(t.settings))
+ self.assertTrue(len(t.settings) == 0, len(t.settings))
# accessing t.content_type triggers the priming, but
# t.storage.is_readable() == False, so nothing happens.
t.storage.readable = False
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(len(t.settings) == 0, len(t.settings))
- self.failUnless(t.content_type == None, t.content_type)
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(len(t.settings) == 0, len(t.settings))
+ self.assertTrue(t.content_type == None, t.content_type)
# accessing t.content_type triggers the priming again, and
# now that t.storage.is_readable() == True, this fills out
# t.settings with EMPTY data. At this point there should
# be one load and no saves.
t.storage.readable = True
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(len(t.settings) == 1, len(t.settings))
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
# an explicit call to load settings forces a reload,
# but nothing else changes.
t.load_settings()
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(len(t.settings) == 1, len(t.settings))
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
# now we set a value
t.content_type = 5
- self.failUnless(t.settings["Content-type"] == 5,
+ self.assertTrue(t.settings["Content-type"] == 5,
t.settings["Content-type"])
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':5}], t.storage)
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':5}], t.storage)
# getting its value changes nothing
- self.failUnless(t.content_type == 5, t.content_type)
- self.failUnless(t.settings["Content-type"] == 5,
+ self.assertTrue(t.content_type == 5, t.content_type)
+ self.assertTrue(t.settings["Content-type"] == 5,
t.settings["Content-type"])
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':5}], t.storage)
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':5}], t.storage)
# now we set another value
t.content_type = "text/plain"
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings["Content-type"] == "text/plain",
+ self.assertTrue(t.content_type == "text/plain", t.content_type)
+ self.assertTrue(t.settings["Content-type"] == "text/plain",
t.settings["Content-type"])
- self.failUnless(t.load_count == 2, t.load_count)
- self.failUnless(len(t.storage) == 2, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':5},
+ self.assertTrue(t.load_count == 2, t.load_count)
+ self.assertTrue(len(t.storage) == 2, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':5},
{'Content-type':'text/plain'}],
t.storage)
# t._get_saved_settings() returns a dict of required or
# non-default values.
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
# now we clear to the post-primed value
t.content_type = EMPTY
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
+ self.assertTrue(t.content_type == None, t.content_type)
+ self.assertTrue(len(t.settings) == 1, len(t.settings))
+ self.assertTrue(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t._get_saved_settings() == {},
+ self.assertTrue(t._get_saved_settings() == {},
t._get_saved_settings())
- self.failUnless(t.storage == [{'Content-type':5},
+ self.assertTrue(t.storage == [{'Content-type':5},
{'Content-type':'text/plain'},
{}],
t.storage)
@@ -433,16 +433,16 @@ if libbe.TESTING == True:
# which also pulls in prop-a.
t.prop_b = 'new-b'
settings = {'prop-b':'new-b', 'prop-a':'saved'}
- self.failUnless(t.settings == settings, t.settings)
- self.failUnless(t._get_saved_settings() == settings,
+ self.assertTrue(t.settings == settings, t.settings)
+ self.assertTrue(t._get_saved_settings() == settings,
t._get_saved_settings())
# test that _get_saved_settings() works even when settings
# were _not_ loaded beforehand
t = Test()
t.storage.append({'prop-a':'saved'})
settings ={'prop-a':'saved'}
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(t._get_saved_settings() == settings,
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(t._get_saved_settings() == settings,
t._get_saved_settings())
def testSimplePropertySetStorageSave(self):
"""Set a property, then attach storage and save"""
@@ -467,13 +467,13 @@ if libbe.TESTING == True:
t.prop_a = 'text/html'
t.storage = storage
t.save_settings()
- self.failUnless(t.prop_a == 'text/html', t.prop_a)
- self.failUnless(t.settings == {'prop-a':'text/html',
+ self.assertTrue(t.prop_a == 'text/html', t.prop_a)
+ self.assertTrue(t.settings == {'prop-a':'text/html',
'prop-b':EMPTY},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'prop-a':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'prop-a':'text/html'}],
t.storage)
def testDefaultingProperty(self):
"""Testing a defaulting versioned property"""
@@ -488,24 +488,24 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def content_type(): return {}
t = Test()
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings == {"Content-type":EMPTY},
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(t.content_type == "text/plain", t.content_type)
+ self.assertTrue(t.settings == {"Content-type":EMPTY},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t._get_saved_settings() == {},
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t._get_saved_settings() == {},
t._get_saved_settings())
t.content_type = "text/html"
- self.failUnless(t.content_type == "text/html",
+ self.assertTrue(t.content_type == "text/html",
t.content_type)
- self.failUnless(t.settings == {"Content-type":"text/html"},
+ self.assertTrue(t.settings == {"Content-type":"text/html"},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':'text/html'}],
t.storage)
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
def testRequiredDefaultingProperty(self):
@@ -522,25 +522,25 @@ if libbe.TESTING == True:
require_save=True)
def content_type(): return {}
t = Test()
- self.failUnless(t.settings == {}, t.settings)
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings == {"Content-type":EMPTY},
+ self.assertTrue(t.settings == {}, t.settings)
+ self.assertTrue(t.content_type == "text/plain", t.content_type)
+ self.assertTrue(t.settings == {"Content-type":EMPTY},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
t.content_type = "text/html"
- self.failUnless(t.content_type == "text/html",
+ self.assertTrue(t.content_type == "text/html",
t.content_type)
- self.failUnless(t.settings == {"Content-type":"text/html"},
+ self.assertTrue(t.settings == {"Content-type":"text/html"},
t.settings)
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':'text/html'}],
t.storage)
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
def testClassVersionedPropertyDefinition(self):
@@ -564,18 +564,18 @@ if libbe.TESTING == True:
require_save=True)
def content_type(): return {}
t = Test()
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
t.content_type = "text/html"
- self.failUnless(t._get_saved_settings() == \
+ self.assertTrue(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
- self.failUnless(t.load_count == 1, t.load_count)
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ self.assertTrue(t.load_count == 1, t.load_count)
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'Content-type':'text/html'}],
t.storage)
def testMutableChangeHookedProperty(self):
"""Testing a mutable change-hooked property"""
@@ -591,26 +591,26 @@ if libbe.TESTING == True:
required_saved_properties=required_saved_properties)
def list_type(): return {}
t = Test()
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t.list_type == None, t.list_type)
- self.failUnless(len(t.storage) == 0, len(t.storage))
- self.failUnless(t.settings["List-type"]==EMPTY,
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.list_type == None, t.list_type)
+ self.assertTrue(len(t.storage) == 0, len(t.storage))
+ self.assertTrue(t.settings["List-type"]==EMPTY,
t.settings["List-type"])
t.list_type = []
- self.failUnless(t.settings["List-type"] == [],
+ self.assertTrue(t.settings["List-type"] == [],
t.settings["List-type"])
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'List-type':[]}],
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'List-type':[]}],
t.storage)
t.list_type.append(5) # external modification not detected yet
- self.failUnless(len(t.storage) == 1, len(t.storage))
- self.failUnless(t.storage == [{'List-type':[]}],
+ self.assertTrue(len(t.storage) == 1, len(t.storage))
+ self.assertTrue(t.storage == [{'List-type':[]}],
t.storage)
- self.failUnless(t.settings["List-type"] == [5],
+ self.assertTrue(t.settings["List-type"] == [5],
t.settings["List-type"])
- self.failUnless(t.list_type == [5], t.list_type)# get triggers save
- self.failUnless(len(t.storage) == 2, len(t.storage))
- self.failUnless(t.storage == [{'List-type':[]},
+ self.assertTrue(t.list_type == [5], t.list_type)# get triggers save
+ self.assertTrue(len(t.storage) == 2, len(t.storage))
+ self.assertTrue(t.storage == [{'List-type':[]},
{'List-type':[5]}],
t.storage)
diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py
index c3a5df1..12d177a 100644
--- a/libbe/storage/util/upgrade.py
+++ b/libbe/storage/util/upgrade.py
@@ -49,14 +49,14 @@ def generate_yaml_mapfile(map):
>>> generate_yaml_mapfile({'q':'p'})
'q: p\\n\\n'
- >>> generate_yaml_mapfile({'q':u'Fran\u00e7ais'})
+ >>> generate_yaml_mapfile({'q':u'Fran\\u00e7ais'})
'q: Fran\\xc3\\xa7ais\\n\\n'
>>> generate_yaml_mapfile({'q':u'hello'})
'q: hello\\n\\n'
"""
if yaml is None:
raise _yaml_import_error
- keys = map.keys()
+ keys = list(map.keys())
keys.sort()
for key in keys:
try:
@@ -66,9 +66,9 @@ def generate_yaml_mapfile(map):
assert(':' not in key)
assert(len(key) > 0)
except AssertionError:
- raise ValueError(unicode(key).encode('unicode_escape'))
+ raise ValueError(str(key).encode('unicode_escape'))
if '\n' in map[key]:
- raise ValueError(unicode(map[key]).encode('unicode_escape'))
+ raise ValueError(str(map[key]).encode('unicode_escape'))
lines = []
for key in keys:
@@ -94,7 +94,7 @@ def parse_yaml_mapfile(contents):
'd'
>>> dict['e']
'f'
- >>> contents = generate_yaml_mapfile({'q':u'Fran\u00e7ais'})
+ >>> contents = generate_yaml_mapfile({'q':u'Fran\\u00e7ais'})
>>> dict = parse_yaml_mapfile(contents)
>>> dict['q']
u'Fran\\xe7ais'
@@ -102,7 +102,7 @@ def parse_yaml_mapfile(contents):
if yaml is None:
raise _yaml_import_error
c = yaml.safe_load(contents)
- if type(c) == types.StringType:
+ if type(c) == bytes:
raise mapfile.InvalidMapfileContents(
'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents)
return c or {}
@@ -146,8 +146,8 @@ class Upgrader (object):
self.vcs._vcs_update(path)
def upgrade(self):
- print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \
- % (self.initial_version, self.final_version)
+ print('upgrading bugdir from "%s" to "%s"' \
+ % (self.initial_version, self.final_version), file=sys.stderr)
self.check_initial_version()
self.set_version()
self._upgrade()
@@ -191,7 +191,7 @@ class Upgrade_1_0_to_1_1 (Upgrader):
contents = '\n'.join(newlines)
# load the YAML and save
map = parse_yaml_mapfile(contents)
- if type(map) == types.StringType:
+ if type(map) == bytes:
raise ValueError((path, contents))
contents = generate_yaml_mapfile(map)
encoding.set_file_contents(path, contents)
@@ -317,7 +317,7 @@ class Upgrade_1_2_to_1_3 (Upgrader):
for bug_uuid in os.listdir(self.get_path('bugs')):
self._upgrade_bug_mapfile(bug_uuid)
self._upgrade_bugdir_mapfile()
- for bug in self._targets.values():
+ for bug in list(self._targets.values()):
self._save_bug_settings(bug)
class Upgrade_1_3_to_1_4 (Upgrader):
@@ -418,11 +418,9 @@ def upgrade(path, current_version,
use consecutive conversion functions.
"""
if current_version not in STORAGE_VERSIONS:
- raise NotImplementedError, \
- "Cannot handle version '%s' yet." % current_version
+ raise NotImplementedError("Cannot handle version '%s' yet." % current_version)
if target_version not in STORAGE_VERSIONS:
- raise NotImplementedError, \
- "Cannot handle version '%s' yet." % current_version
+ raise NotImplementedError("Cannot handle version '%s' yet." % current_version)
if (current_version, target_version) in upgrade_classes:
# direct conversion
@@ -438,9 +436,8 @@ def upgrade(path, current_version,
try:
upgrade_class = upgrade_classes[(version_a, version_b)]
except KeyError:
- raise NotImplementedError, \
- "Cannot convert version '%s' to '%s' yet." \
- % (version_a, version_b)
+ raise NotImplementedError("Cannot convert version '%s' to '%s' yet." \
+ % (version_a, version_b))
u = upgrade_class(path)
u.upgrade()
if version_b == target_version:
diff --git a/libbe/storage/vcs/__init__.py b/libbe/storage/vcs/__init__.py
index 2fd8dc4..56c66b3 100644
--- a/libbe/storage/vcs/__init__.py
+++ b/libbe/storage/vcs/__init__.py
@@ -32,7 +32,7 @@ The base `VCS` class also serves as a filesystem Storage backend (not
versioning) in the event that a user has no VCS installed.
"""
-import base
+from . import base
set_preferred_vcs = base.set_preferred_vcs
vcs_by_name = base.vcs_by_name
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
index 687575f..dfd007f 100644
--- a/libbe/storage/vcs/base.py
+++ b/libbe/storage/vcs/base.py
@@ -255,7 +255,7 @@ class CachedPathID (object):
def disconnect(self):
if self._changed == True:
f = codecs.open(self._cache_path, 'w', self.encoding)
- for uuid,path in self._cache.items():
+ for uuid,path in list(self._cache.items()):
f.write('%s\t%s\n' % (uuid, path))
f.close()
self._cache = {}
@@ -558,12 +558,12 @@ class VCS (libbe.storage.base.VersionedStorage):
for num in num_part.split('.'):
try:
self._parsed_version.append(int(num))
- except ValueError, e:
+ except ValueError as e:
# bzr version number might contain non-numerical tags
splitter = re.compile(r'[\D]') # Match non-digits
splits = splitter.split(num)
# if len(tag) > 1 some splits will be empty; remove
- splits = filter(lambda s: s != '', splits)
+ splits = [s for s in splits if s != '']
tag_starti = len(splits[0])
num_starti = num.find(splits[1], tag_starti)
tag = num[tag_starti:num_starti]
@@ -573,7 +573,7 @@ class VCS (libbe.storage.base.VersionedStorage):
for current,other in zip(self._parsed_version, args):
if type(current) != type (other):
# one of them is a pre-release string
- if type(current) != types.IntType:
+ if type(current) != int:
return -1
else:
return 1
@@ -586,12 +586,12 @@ class VCS (libbe.storage.base.VersionedStorage):
if verlen == arglen:
return 0
elif verlen > arglen:
- if type(self._parsed_version[arglen]) != types.IntType:
+ if type(self._parsed_version[arglen]) != int:
return -1 # self is a prerelease
else:
return 1
else:
- if type(args[verlen]) != types.IntType:
+ if type(args[verlen]) != int:
return 1 # args is a prerelease
else:
return -1
@@ -732,7 +732,7 @@ class VCS (libbe.storage.base.VersionedStorage):
if revision == None:
try:
path = self.path(id, revision, relpath=False)
- except InvalidID, e:
+ except InvalidID as e:
return False
return os.path.exists(path)
path = self.path(id, revision, relpath=True)
@@ -763,7 +763,7 @@ class VCS (libbe.storage.base.VersionedStorage):
if os.path.exists(path):
shutil.rmtree(path)
path = self._cached_path_id.path(id, relpath=True)
- for id,p in self._cached_path_id._cache.items():
+ for id,p in list(self._cached_path_id._cache.items()):
if p.startswith(path):
self._cached_path_id.remove_id(id)
@@ -818,7 +818,7 @@ class VCS (libbe.storage.base.VersionedStorage):
try:
relpath = self.path(id, revision, relpath=True)
contents = self._vcs_get_file_contents(relpath, revision)
- except InvalidID, e:
+ except InvalidID as e:
if default == libbe.util.InvalidObject:
raise e
return default
@@ -833,7 +833,7 @@ class VCS (libbe.storage.base.VersionedStorage):
def _set(self, id, value):
try:
path = self._cached_path_id.path(id)
- except InvalidID, e:
+ except InvalidID as e:
raise
if not os.path.exists(path):
raise InvalidID(id)
@@ -923,7 +923,7 @@ class VCS (libbe.storage.base.VersionedStorage):
"""
try:
ret = search_parent_directories(path, filename)
- except AssertionError, e:
+ except AssertionError as e:
return None
return ret
@@ -1054,8 +1054,8 @@ class VCS (libbe.storage.base.VersionedStorage):
path, decode=True).rstrip()
relpath = self._u_rel_path(path)
contents = self._vcs_get_file_contents(relpath, revision=revision)
- if type(contents) != types.UnicodeType:
- contents = unicode(contents, self.encoding)
+ if type(contents) != str:
+ contents = str(contents, self.encoding)
return contents.strip()
def _setup_storage_version(self):
@@ -1104,7 +1104,7 @@ if libbe.TESTING == True:
def test_installed(self):
"""See if the VCS is installed.
"""
- self.failUnless(self.s.installed() == True,
+ self.assertTrue(self.s.installed() == True,
'%(name)s VCS not found' % vars(self.Class))
@@ -1114,7 +1114,7 @@ if libbe.TESTING == True:
"""
if self.s.installed():
self.s.disconnect()
- self.failUnless(self.s._detect(self.dirname) == True,
+ self.assertTrue(self.s._detect(self.dirname) == True,
'Did not detected %(name)s VCS after initialising'
% vars(self.Class))
self.s.connect()
@@ -1125,7 +1125,7 @@ if libbe.TESTING == True:
if self.s.installed() and self.Class.name != 'None':
self.s.disconnect()
self.s.destroy()
- self.failUnless(self.s._detect(self.dirname) == False,
+ self.assertTrue(self.s._detect(self.dirname) == False,
'Detected %(name)s VCS before initialising'
% vars(self.Class))
self.s.init()
@@ -1136,7 +1136,7 @@ if libbe.TESTING == True:
rp = os.path.realpath(self.s.repo)
dp = os.path.realpath(self.dirname)
vcs_name = self.Class.name
- self.failUnless(
+ self.assertTrue(
dp == rp or rp == None,
"%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
@@ -1151,7 +1151,7 @@ if libbe.TESTING == True:
return
name,email = libbe.ui.util.user.parse_user_id(user_id)
if email != None:
- self.failUnless('@' in email, email)
+ self.assertTrue('@' in email, email)
def make_vcs_testcase_subclasses(vcs_class, namespace):
c = vcs_class()
@@ -1167,7 +1167,7 @@ if libbe.TESTING == True:
# Make VCSTestCase subclasses for vcs_class in the namespace.
vcs_testcase_classes = [
c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
+ ob for ob in list(globals().values()) if isinstance(ob, type))
if issubclass(c, VCSTestCase) \
and c.Class == VCS]
diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py
index 0c92058..6da299a 100644
--- a/libbe/storage/vcs/bzr.py
+++ b/libbe/storage/vcs/bzr.py
@@ -39,11 +39,11 @@ import os
import os.path
import re
import shutil
-import StringIO
+import io
import sys
import libbe
-import base
+from . import base
if libbe.TESTING == True:
import doctest
@@ -84,7 +84,7 @@ class Bzr(base.VCS):
def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
cmd = bzrlib.builtins.cmd_root()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(filename=path)
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -92,7 +92,7 @@ class Bzr(base.VCS):
def _vcs_init(self, path):
cmd = bzrlib.builtins.cmd_init()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(location=path)
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -105,7 +105,7 @@ class Bzr(base.VCS):
def _vcs_add(self, path):
path = os.path.join(self.repo, path)
cmd = bzrlib.builtins.cmd_add()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
kwargs = {'file_ids_from': self.repo}
if self.repo == os.path.realpath(os.getcwd()):
# Work around bzr file locking on Windows.
@@ -126,7 +126,7 @@ class Bzr(base.VCS):
# --force to also remove unversioned files.
path = os.path.join(self.repo, path)
cmd = bzrlib.builtins.cmd_remove()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(file_list=[path], file_deletion_strategy='force')
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -150,14 +150,14 @@ class Bzr(base.VCS):
path = os.path.join(self.repo, path)
revision = self._parse_revision_string(revision)
cmd = bzrlib.builtins.cmd_cat()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
if self.version_cmp(1,6,0) < 0:
# old bzrlib cmd_cat uses sys.stdout not self.outf for output.
stdout = sys.stdout
sys.stdout = cmd.outf
try:
cmd.run(filename=path, revision=revision)
- except bzrlib.errors.BzrCommandError, e:
+ except bzrlib.errors.BzrCommandError as e:
if 'not present in revision' in str(e):
raise base.InvalidPath(path, root=self.repo, revision=revision)
raise
@@ -177,7 +177,7 @@ class Bzr(base.VCS):
def _vcs_isdir(self, path, revision):
try:
self._vcs_listdir(path, revision)
- except AttributeError, e:
+ except AttributeError as e:
if 'children' in str(e):
return False
raise
@@ -187,7 +187,7 @@ class Bzr(base.VCS):
path = os.path.join(self.repo, path)
revision = self._parse_revision_string(revision)
cmd = bzrlib.builtins.cmd_ls()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
try:
if self.version_cmp(2,0,0) >= 0:
cmd.run(revision=revision, path=path, recursive=recursive)
@@ -197,7 +197,7 @@ class Bzr(base.VCS):
# (https://bugs.launchpad.net/bzr/+bug/158690)
cmd.run(revision=revision, path=path,
non_recursive=False)
- except bzrlib.errors.BzrCommandError, e:
+ except bzrlib.errors.BzrCommandError as e:
if 'not present in revision' in str(e):
raise base.InvalidPath(path, root=self.repo, revision=revision)
raise
@@ -212,12 +212,12 @@ class Bzr(base.VCS):
def _vcs_commit(self, commitfile, allow_empty=False):
cmd = bzrlib.builtins.cmd_commit()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cwd = os.getcwd()
os.chdir(self.repo)
try:
cmd.run(file=commitfile, unchanged=allow_empty)
- except bzrlib.errors.BzrCommandError, e:
+ except bzrlib.errors.BzrCommandError as e:
strings = ['no changes to commit.', # bzr 1.3.1
'No changes to commit.'] # bzr 1.15.1
if self._u_any_in_string(strings, str(e)) == True:
@@ -231,7 +231,7 @@ class Bzr(base.VCS):
def _vcs_revision_id(self, index):
cmd = bzrlib.builtins.cmd_revno()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
cmd.run(location=self.repo)
if self.version_cmp(2,2,0) < 0:
cmd.cleanup_now()
@@ -245,7 +245,7 @@ class Bzr(base.VCS):
def _diff(self, revision):
revision = self._parse_revision_string(revision)
cmd = bzrlib.builtins.cmd_diff()
- cmd.outf = StringIO.StringIO()
+ cmd.outf = io.StringIO()
# for some reason, cmd_diff uses sys.stdout not self.outf for output.
stdout = sys.stdout
sys.stdout = cmd.outf
diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py
index ec100b4..41262f5 100644
--- a/libbe/storage/vcs/darcs.py
+++ b/libbe/storage/vcs/darcs.py
@@ -104,10 +104,10 @@ class Darcs(base.VCS):
for num in num_part.split('.'):
try:
self._parsed_version.append(int(num))
- except ValueError, e:
+ except ValueError as e:
self._parsed_version.append(num)
for current,other in zip(self._parsed_version, args):
- if type(current) != types.IntType:
+ if type(current) != int:
raise NotImplementedError(
'Cannot parse non-integer portion "%s" of Darcs version "%s"'
% (current, self.version()))
@@ -284,7 +284,7 @@ class Darcs(base.VCS):
assert patch.tag == 'patch', patch.tag
for child in patch.getchildren():
if child.tag == 'name':
- text = unescape(unicode(child.text).decode('unicode_escape').strip())
+ text = unescape(str(child.text).decode('unicode_escape').strip())
revisions.append(text)
revisions.reverse()
return revisions
diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py
index 851af19..9a15c92 100644
--- a/libbe/storage/vcs/git.py
+++ b/libbe/storage/vcs/git.py
@@ -32,7 +32,7 @@ import unittest
try:
import pygit2 as _pygit2
-except ImportError, error:
+except ImportError as error:
_pygit2 = None
_pygit2_import_error = error
else:
@@ -65,7 +65,7 @@ class PygitGit(base.VCS):
Using :py:mod:`pygit2` for the Git activity.
"""
name='pygit2'
- _null_hex = u'0' * 40
+ _null_hex = '0' * 40
def __init__(self, *args, **kwargs):
base.VCS.__init__(self, *args, **kwargs)
@@ -162,7 +162,7 @@ class PygitGit(base.VCS):
def _git_get_commit(self, revision):
if isinstance(revision, str):
- revision = unicode(revision, 'ascii')
+ revision = str(revision, 'ascii')
commit = self._pygit_repository.revparse_single(revision)
assert commit.type == _pygit2.GIT_OBJ_COMMIT, commit
return commit
diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py
index f2976ff..0ebdfb0 100644
--- a/libbe/storage/vcs/hg.py
+++ b/libbe/storage/vcs/hg.py
@@ -47,12 +47,12 @@ import os
import os.path
import re
import shutil
-import StringIO
+import io
import sys
import time # work around http://mercurial.selenic.com/bts/issue618
import libbe
-import base
+from . import base
if libbe.TESTING == True:
import doctest
@@ -85,7 +85,7 @@ class Hg(base.VCS):
fullargs = ['--cwd', kwargs['cwd']]
fullargs.extend(args)
cwd = os.getcwd()
- output = StringIO.StringIO()
+ output = io.StringIO()
if self.version_cmp(1,9) >= 0:
req = mercurial.dispatch.request(fullargs, fout=output)
mercurial.dispatch.dispatch(req)
diff --git a/libbe/ui/command_line.py b/libbe/ui/command_line.py
index 5d4d80a..be6f80a 100644
--- a/libbe/ui/command_line.py
+++ b/libbe/ui/command_line.py
@@ -89,10 +89,10 @@ class CmdOptionParser(optparse.OptionParser):
options,parsed_args = optparse.OptionParser.parse_args(
self, args=args, values=values)
options = options.__dict__
- for name,value in options.items():
+ for name,value in list(options.items()):
if '_' in name: # reconstruct original option name
options[name.replace('_', '-')] = options.pop(name)
- for name,value in options.items():
+ for name,value in list(options.items()):
argument = None
option = self._option_by_name[name]
if option.arg != None:
@@ -157,8 +157,8 @@ class CmdOptionParser(optparse.OptionParser):
fragment = parser.rargs[0]
self.complete(argument, fragment)
else:
- print >> self.command.stdout, command_option.callback(
- self.command, command_option, value)
+ print(command_option.callback(
+ self.command, command_option, value), file=self.command.stdout)
raise CallbackExit
def complete(self, argument=None, fragment=None):
@@ -166,7 +166,7 @@ class CmdOptionParser(optparse.OptionParser):
if fragment != None:
comps = [c for c in comps if c.startswith(fragment)]
if len(comps) > 0:
- print >> self.command.stdout, '\n'.join(comps)
+ print('\n'.join(comps), file=self.command.stdout)
raise CallbackExit
def process_raw_argument(self, argument, value):
@@ -175,7 +175,7 @@ class CmdOptionParser(optparse.OptionParser):
if argument.type == 'string':
if not hasattr(self, 'argv_encoding'):
self.argv_encoding = libbe.util.encoding.get_argv_encoding()
- return unicode(value, self.argv_encoding)
+ return str(value, self.argv_encoding)
return value
@@ -305,36 +305,36 @@ def dispatch(ui, command, args):
ret = ui.run(command, options, args)
except CallbackExit:
return 0
- except UnicodeDecodeError, e:
- print >> ui.io.stdout, '\n'.join([
+ except UnicodeDecodeError as e:
+ print('\n'.join([
'ERROR:', str(e),
'You should set a locale that supports unicode, e.g.',
' export LANG=en_US.utf8',
'See http://docs.python.org/library/locale.html for details',
- ])
+ ]), file=ui.io.stdout)
return 1
- except libbe.command.UsageError, e:
- print >> ui.io.stdout, 'Usage Error:\n', e
+ except libbe.command.UsageError as e:
+ print('Usage Error:\n', e, file=ui.io.stdout)
if e.command:
- print >> ui.io.stdout, e.command.usage()
- print >> ui.io.stdout, 'For usage information, try'
- print >> ui.io.stdout, ' be help %s' % e.command_name
+ print(e.command.usage(), file=ui.io.stdout)
+ print('For usage information, try', file=ui.io.stdout)
+ print(' be help %s' % e.command_name, file=ui.io.stdout)
return 1
- except libbe.command.UserError, e:
- print >> ui.io.stdout, 'ERROR:\n', e
+ except libbe.command.UserError as e:
+ print('ERROR:\n', e, file=ui.io.stdout)
return 1
- except OSError, e:
- print >> ui.io.stdout, 'OSError:\n', e
+ except OSError as e:
+ print('OSError:\n', e, file=ui.io.stdout)
return 1
- except libbe.storage.ConnectionError, e:
- print >> ui.io.stdout, 'Connection Error:\n', e
+ except libbe.storage.ConnectionError as e:
+ print('Connection Error:\n', e, file=ui.io.stdout)
return 1
- except libbe.util.http.HTTPError, e:
- print >> ui.io.stdout, 'HTTP Error:\n', e
+ except libbe.util.http.HTTPError as e:
+ print('HTTP Error:\n', e, file=ui.io.stdout)
return 1
except (libbe.util.id.MultipleIDMatches, libbe.util.id.NoIDMatches,
- libbe.util.id.InvalidIDStructure), e:
- print >> ui.io.stdout, 'Invalid id:\n', e
+ libbe.util.id.InvalidIDStructure) as e:
+ print('Invalid id:\n', e, file=ui.io.stdout)
return 1
finally:
command.cleanup()
@@ -352,26 +352,26 @@ def main():
options,args = parser.parse_args()
except CallbackExit:
return 0
- except libbe.command.UsageError, e:
+ except libbe.command.UsageError as e:
if isinstance(e.command, BE):
# no command given, print usage string
- print >> ui.io.stdout, 'Usage Error:\n', e
- print >> ui.io.stdout, be.usage()
- print >> ui.io.stdout, 'For example, try'
- print >> ui.io.stdout, ' be help'
+ print('Usage Error:\n', e, file=ui.io.stdout)
+ print(be.usage(), file=ui.io.stdout)
+ print('For example, try', file=ui.io.stdout)
+ print(' be help', file=ui.io.stdout)
else:
- print >> ui.io.stdout, 'Usage Error:\n', e
+ print('Usage Error:\n', e, file=ui.io.stdout)
if e.command:
- print >> ui.io.stdout, e.command.usage()
- print >> ui.io.stdout, 'For usage information, try'
- print >> ui.io.stdout, ' be help %s' % e.command_name
+ print(e.command.usage(), file=ui.io.stdout)
+ print('For usage information, try', file=ui.io.stdout)
+ print(' be help %s' % e.command_name, file=ui.io.stdout)
return 1
command_name = args.pop(0)
try:
Class = libbe.command.get_command_class(command_name=command_name)
- except libbe.command.UnknownCommand, e:
- print >> ui.io.stdout, e
+ except libbe.command.UnknownCommand as e:
+ print(e, file=ui.io.stdout)
return 1
ui.storage_callbacks = libbe.command.StorageCallbacks(options['repo'])
@@ -392,8 +392,8 @@ def main():
ret = dispatch(ui, command, args)
try:
ui.cleanup()
- except IOError, e:
- print >> ui.io.stdout, 'IOError:\n', e
+ except IOError as e:
+ print('IOError:\n', e, file=ui.io.stdout)
return 1
return ret
diff --git a/libbe/ui/util/editor.py b/libbe/ui/util/editor.py
index f852c01..8412ff6 100644
--- a/libbe/ui/util/editor.py
+++ b/libbe/ui/util/editor.py
@@ -36,7 +36,7 @@ if libbe.TESTING == True:
import doctest
-comment_marker = u"== Anything below this line will be ignored\n"
+comment_marker = "== Anything below this line will be ignored\n"
class CantFindEditor(Exception):
def __init__(self):
@@ -76,7 +76,7 @@ def editor_string(comment=None, encoding=None):
fhandle, fname = tempfile.mkstemp()
try:
if comment is not None:
- cstring = u'\n'+comment_string(comment)
+ cstring = '\n'+comment_string(comment)
os.write(fhandle, cstring.encode(encoding))
os.close(fhandle)
oldmtime = os.path.getmtime(fname)
diff --git a/libbe/ui/util/user.py b/libbe/ui/util/user.py
index 5306593..4a4e7f1 100644
--- a/libbe/ui/util/user.py
+++ b/libbe/ui/util/user.py
@@ -46,7 +46,7 @@ def get_fallback_username():
"""
name = None
for env in ['LOGNAME', 'USERNAME']:
- if os.environ.has_key(env):
+ if env in os.environ:
name = os.environ[env]
break
if name is None and pwd:
@@ -60,7 +60,7 @@ def get_fallback_fullname():
"""
name = None
for env in ['FULLNAME']:
- if os.environ.has_key(env):
+ if env in os.environ:
name = os.environ[env]
break
if pwd and not name:
diff --git a/libbe/util/encoding.py b/libbe/util/encoding.py
index 712a3fa..c5e242b 100644
--- a/libbe/util/encoding.py
+++ b/libbe/util/encoding.py
@@ -93,7 +93,7 @@ def get_file_contents(path, mode='r', encoding=None, decode=False):
return contents
def set_file_contents(path, contents, mode='w', encoding=None):
- if type(contents) == types.UnicodeType:
+ if type(contents) == str:
if encoding == None:
encoding = get_text_file_encoding()
f = codecs.open(path, mode, encoding)
diff --git a/libbe/util/http.py b/libbe/util/http.py
index a1ab06a..2e4dae8 100644
--- a/libbe/util/http.py
+++ b/libbe/util/http.py
@@ -23,8 +23,8 @@
# httplib.responses
# but it is slow to load.
-import urllib
-import urllib2
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
from libbe import TESTING
@@ -92,19 +92,19 @@ def get_post_url(url, get=True, data=None, data_dict=None, headers=[],
if get is True:
if data_dict != {}:
# encode get parameters in the url
- param_string = urllib.urlencode(data_dict)
+ param_string = urllib.parse.urlencode(data_dict)
url = '{}?{}'.format(url, param_string)
else:
- data = urllib.urlencode(data_dict)
+ data = urllib.parse.urlencode(data_dict)
else:
assert get is False, (data, get)
assert data_dict is None, (data, data_dict)
headers = dict(headers)
headers['User-Agent'] = agent
- req = urllib2.Request(url, data=data, headers=headers)
+ req = urllib.request.Request(url, data=data, headers=headers)
try:
- response = urllib2.urlopen(req)
- except urllib2.HTTPError, e:
+ response = urllib.request.urlopen(req)
+ except urllib.error.HTTPError as e:
if e.code == HTTP_USER_ERROR:
lines = ['The server reported a user error (HTTPError)']
else:
@@ -115,7 +115,7 @@ def get_post_url(url, get=True, data=None, data_dict=None, headers=[],
lines.append('Error code: {}'.format(e.code))
msg = '\n'.join(lines)
raise HTTPError(error=e, url=url, msg=msg)
- except urllib2.URLError, e:
+ except urllib.error.URLError as e:
msg = ('We failed to connect to the server (URLError).\nURL: {}\n'
'Reason: {}').format(url, e.reason)
raise HTTPError(error=e, url=url, msg=msg)
@@ -132,7 +132,7 @@ if TESTING:
def test_get(self):
url = 'http://bugseverywhere.org/'
page,final_url,info = get_post_url(url=url)
- self.failUnless(final_url == url,
+ self.assertTrue(final_url == url,
'Redirect?\n Expected: "{}"\n Got: "{}"'.format(
url, final_url))
@@ -140,6 +140,6 @@ if TESTING:
url = 'http://physics.drexel.edu/~wking/code/be/redirect'
expected = 'http://physics.drexel.edu/~wking/'
page,final_url,info = get_post_url(url=url)
- self.failUnless(final_url == expected,
+ self.assertTrue(final_url == expected,
'Redirect?\n Expected: "{}"\n Got: "{}"'.format(
expected, final_url))
diff --git a/libbe/util/id.py b/libbe/util/id.py
index de9edf9..078dd3f 100644
--- a/libbe/util/id.py
+++ b/libbe/util/id.py
@@ -103,14 +103,14 @@ except ImportError:
# win32 don't have os.execvp() so have to run command in a shell
q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
shell=True, cwd=cwd)
- except OSError, e :
+ except OSError as e :
strerror = "%s\nwhile executing %s" % (e.args[1], args)
- raise OSError, strerror
+ raise OSError(strerror)
output, error = q.communicate()
status = q.wait()
if status != 0:
strerror = "%s\nwhile executing %s" % (status, args)
- raise Exception, strerror
+ raise Exception(strerror)
return output.rstrip('\n')
@@ -414,11 +414,11 @@ def long_to_short_user(bugdirs, id):
long_to_short_text : conversion on a block of text
"""
ids = _split(id, check_length=True)
- matching_bugdirs = [bd for bd in bugdirs.values() if bd.uuid == ids[0]]
+ matching_bugdirs = [bd for bd in list(bugdirs.values()) if bd.uuid == ids[0]]
if len(matching_bugdirs) == 0:
- raise NoIDMatches(id, [bd.uuid for bd in bugdirs.values()])
+ raise NoIDMatches(id, [bd.uuid for bd in list(bugdirs.values())])
elif len(matching_bugdirs) > 1:
- raise MultipleIDMatches(id, '', [bd.uuid for bd in bugdirs.values()])
+ raise MultipleIDMatches(id, '', [bd.uuid for bd in list(bugdirs.values())])
bugdir = matching_bugdirs[0]
objects = [bugdir]
if len(ids) >= 2:
@@ -443,10 +443,10 @@ def short_to_long_user(bugdirs, id):
"""
ids = _split(id, check_length=True)
ids[0] = _expand(ids[0], common=None,
- other_ids=[bd.uuid for bd in bugdirs.values()])
+ other_ids=[bd.uuid for bd in list(bugdirs.values())])
if len(ids) == 1:
return _assemble(ids)
- bugdir = [bd for bd in bugdirs.values() if bd.uuid == ids[0]][0]
+ bugdir = [bd for bd in list(bugdirs.values()) if bd.uuid == ids[0]][0]
ids[1] = _expand(ids[1], common=bugdir.id.user(),
other_ids=bugdir.uuids())
if len(ids) == 2:
@@ -603,7 +603,7 @@ if libbe.TESTING == True:
class UUIDtestCase(unittest.TestCase):
def testUUID_gen(self):
id = uuid_gen()
- self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id)
+ self.assertTrue(len(id) == 36, 'invalid UUID "%s"' % id)
class DummyObject (object):
def __init__(self, uuid, parent=None, siblings=[]):
@@ -629,31 +629,31 @@ if libbe.TESTING == True:
self.b_id = self.bug.id
self.c_id = self.comment.id
def test_storage(self):
- self.failUnless(self.bd_id.storage() == self.bugdir.uuid,
+ self.assertTrue(self.bd_id.storage() == self.bugdir.uuid,
self.bd_id.storage())
- self.failUnless(self.b_id.storage() == self.bug.uuid,
+ self.assertTrue(self.b_id.storage() == self.bug.uuid,
self.b_id.storage())
- self.failUnless(self.c_id.storage() == self.comment.uuid,
+ self.assertTrue(self.c_id.storage() == self.comment.uuid,
self.c_id.storage())
- self.failUnless(self.bd_id.storage('x', 'y', 'z') == \
+ self.assertTrue(self.bd_id.storage('x', 'y', 'z') == \
'1234abcd/x/y/z',
self.bd_id.storage('x', 'y', 'z'))
def test_long_user(self):
- self.failUnless(self.bd_id.long_user() == self.bugdir.uuid,
+ self.assertTrue(self.bd_id.long_user() == self.bugdir.uuid,
self.bd_id.long_user())
- self.failUnless(self.b_id.long_user() == \
+ self.assertTrue(self.b_id.long_user() == \
'/'.join([self.bugdir.uuid, self.bug.uuid]),
self.b_id.long_user())
- self.failUnless(self.c_id.long_user() ==
+ self.assertTrue(self.c_id.long_user() ==
'/'.join([self.bugdir.uuid, self.bug.uuid,
self.comment.uuid]),
self.c_id.long_user)
def test_user(self):
- self.failUnless(self.bd_id.user() == '123',
+ self.assertTrue(self.bd_id.user() == '123',
self.bd_id.user())
- self.failUnless(self.b_id.user() == '123/abc',
+ self.assertTrue(self.b_id.user() == '123/abc',
self.b_id.user())
- self.failUnless(self.c_id.user() == '123/abc/12345',
+ self.assertTrue(self.c_id.user() == '123/abc/12345',
self.c_id.user())
class ShortLongParseTestCase(unittest.TestCase):
@@ -690,12 +690,12 @@ if libbe.TESTING == True:
None, '123/abc', ['1234abcd','1234cdef','12345678'])),
]
def test_short_to_long_text(self):
- self.failUnless(short_to_long_text(
+ self.assertTrue(short_to_long_text(
self.bugdirs, self.short) == self.long,
'\n' + self.short + '\n' + short_to_long_text(
self.bugdirs, self.short) + '\n' + self.long)
def test_long_to_short_text(self):
- self.failUnless(long_to_short_text(
+ self.assertTrue(long_to_short_text(
self.bugdirs, self.long) == self.short,
'\n' + long_to_short_text(
self.bugdirs, self.long
@@ -703,7 +703,7 @@ if libbe.TESTING == True:
def test_parse_user(self):
for short_id,parsed in self.short_id_parse_pairs:
ret = parse_user(self.bugdirs, short_id)
- self.failUnless(ret == parsed,
+ self.assertTrue(ret == parsed,
'got %s\nexpected %s' % (ret, parsed))
def test_parse_user_exceptions(self):
for short_id,exception in self.short_id_exception_pairs:
@@ -712,13 +712,13 @@ if libbe.TESTING == True:
self.fail('Expected parse_user(bugdir, "%s") to raise %s,'
'\n but it returned %s'
% (short_id, exception.__class__.__name__, ret))
- except exception.__class__, e:
+ except exception.__class__ as e:
for attr in dir(e):
if attr.startswith('_') or attr == 'args':
continue
value = getattr(e, attr)
expected = getattr(exception, attr)
- self.failUnless(
+ self.assertTrue(
value == expected,
'Expected parse_user(bugdir, "%s") %s.%s'
'\n to be %s, but it is %s\n\n%s'
diff --git a/libbe/util/subproc.py b/libbe/util/subproc.py
index 0ad010c..5412b08 100644
--- a/libbe/util/subproc.py
+++ b/libbe/util/subproc.py
@@ -25,7 +25,7 @@ import sys
import types
import libbe
-from encoding import get_encoding
+from .encoding import get_encoding
if libbe.TESTING == True:
import doctest
@@ -56,7 +56,7 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
"""
if cwd == None:
cwd = '.'
- if isinstance(shell, types.StringTypes):
+ if isinstance(shell, (str,)):
list_args = ' '.split(args) # sloppy, but just for logging
str_args = args
else:
@@ -76,7 +76,7 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
# win32 don't have os.execvp() so have to run command in a shell
q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
shell=shell, cwd=cwd, **kwargs)
- except OSError, e:
+ except OSError as e:
raise CommandError(list_args, status=e.args[0], stderr=e)
stdout,stderr = q.communicate(input=stdin)
status = q.wait()
@@ -84,10 +84,10 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
if encoding == None:
encoding = get_encoding()
if stdout != None:
- stdout = unicode(stdout, encoding)
+ stdout = str(stdout, encoding)
if stderr != None:
- stderr = unicode(stderr, encoding)
- libbe.LOG.debug(u'{0}\n{1}{2}'.format(status, stdout, stderr))
+ stderr = str(stderr, encoding)
+ libbe.LOG.debug('{0}\n{1}{2}'.format(status, stdout, stderr))
else:
libbe.LOG.debug('{0}\n{1}{2}'.format(status, stdout, stderr))
if status not in expect:
diff --git a/libbe/util/utility.py b/libbe/util/utility.py
index 08ffca8..28de34f 100644
--- a/libbe/util/utility.py
+++ b/libbe/util/utility.py
@@ -227,7 +227,7 @@ def iterable_full_of_strings(value, alternative=None):
elif not hasattr(value, '__iter__'):
return False
for x in value:
- if type(x) not in types.StringTypes:
+ if type(x) not in (str,):
return False
return True
diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py
index c509a48..96ddf60 100644
--- a/libbe/util/wsgi.py
+++ b/libbe/util/wsgi.py
@@ -32,13 +32,13 @@ import os.path
import re
import select
import signal
-import StringIO
+import io
import sys
import time
import traceback
import types
-import urllib
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.parse
import wsgiref.simple_server
try:
@@ -109,8 +109,8 @@ class User (object):
string = string.strip()
fields = string.split(':')
if len(fields) != 3:
- raise ValueError, '{}!=3 fields in "{}"'.format(
- len(fields), string)
+ raise ValueError('{}!=3 fields in "{}"'.format(
+ len(fields), string))
self.uname,self.name,self.passhash = fields
def __str__(self):
@@ -229,7 +229,7 @@ class WSGI_Object (object):
def log_request(self, environ, status='-1 OK', bytes=-1):
if self.logger is None or self.logger.level > self.log_level:
return
- req_uri = urllib.quote(environ.get('SCRIPT_NAME', '')
+ req_uri = urllib.parse.quote(environ.get('SCRIPT_NAME', '')
+ environ.get('PATH_INFO', ''))
if environ.get('QUERY_STRING'):
req_uri += '?' + environ['QUERY_STRING']
@@ -276,7 +276,7 @@ class ExceptionApp (WSGI_Middleware):
def _call(self, environ, start_response):
try:
return self.app(environ, start_response)
- except Exception, e:
+ except Exception as e:
etype,value,tb = sys.exc_info()
trace = ''.join(
traceback.format_exception(etype, value, tb, None))
@@ -290,7 +290,7 @@ class HandlerErrorApp (WSGI_Middleware):
def _call(self, environ, start_response):
try:
return self.app(environ, start_response)
- except HandlerError, e:
+ except HandlerError as e:
self.log_request(environ, status=str(e), bytes=0)
start_response('{} {}'.format(e.code, e.msg), e.headers)
return []
@@ -338,7 +338,7 @@ class UppercaseHeaderApp (WSGI_Middleware):
http://www.python.org/dev/peps/pep-0333/#id20
"""
def _call(self, environ, start_response):
- for key,value in environ.items():
+ for key,value in list(environ.items()):
if key.startswith('HTTP_'):
uppercase = key.upper()
if uppercase != key:
@@ -363,11 +363,11 @@ class WSGI_DataObject (WSGI_Object):
if content is None:
start_response('200 OK', [])
return []
- if type(content) is types.UnicodeType:
+ if type(content) is str:
content = content.encode('utf-8')
for i,header in enumerate(headers):
header_name,header_value = header
- if type(header_value) == types.UnicodeType:
+ if type(header_value) == str:
headers[i] = (header_name, header_value.encode('ISO-8859-1'))
response = '200 OK'
content_length = len(content)
@@ -388,9 +388,9 @@ class WSGI_DataObject (WSGI_Object):
def _parse_query(self, query):
if len(query) == 0:
return {}
- data = urlparse.parse_qs(
+ data = urllib.parse.parse_qs(
query, keep_blank_values=True, strict_parsing=True)
- for k,v in data.items():
+ for k,v in list(data.items()):
if len(v) == 1:
data[k] = v[0]
return data
@@ -411,7 +411,7 @@ class WSGI_DataObject (WSGI_Object):
clen = 0
if clen != 0:
if self.maxlen > 0 and clen > self.maxlen:
- raise ValueError, 'Maximum content length exceeded'
+ raise ValueError('Maximum content length exceeded')
return environ['wsgi.input'].read(clen)
return ''
@@ -722,8 +722,8 @@ class WSGICaller (object):
'SERVER_PROTOCOL':'HTTP/1.1',
'wsgi.version':(1,0),
'wsgi.url_scheme':'http',
- 'wsgi.input':StringIO.StringIO(),
- 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.input':io.StringIO(),
+ 'wsgi.errors':io.StringIO(),
'wsgi.multithread':False,
'wsgi.multiprocess':False,
'wsgi.run_once':False,
@@ -737,17 +737,17 @@ class WSGICaller (object):
env['scheme'] = scheme
if data_dict is not None:
assert data is None, (data, data_dict)
- data = urllib.urlencode(data_dict)
+ data = urllib.parse.urlencode(data_dict)
if data is not None:
if data_dict is None:
assert method == 'POST', (method, data)
if method == 'POST':
env['CONTENT_LENGTH'] = len(data)
- env['wsgi.input'] = StringIO.StringIO(data)
+ env['wsgi.input'] = io.StringIO(data)
else:
assert method in ['GET', 'HEAD'], method
env['QUERY_STRING'] = data
- for key,value in environ.items():
+ for key,value in list(environ.items()):
env[key] = value
return ''.join(app(env, self.start_response))
@@ -760,7 +760,7 @@ class WSGICaller (object):
if libbe.TESTING:
class WSGITestCase (unittest.TestCase):
def setUp(self):
- self.logstream = StringIO.StringIO()
+ self.logstream = io.StringIO()
self.logger = logging.getLogger('be-wsgi-test')
console = logging.StreamHandler(self.logstream)
console.setFormatter(logging.Formatter('%(message)s'))
@@ -789,20 +789,20 @@ if libbe.TESTING:
error=123,
message='Dummy Error',
headers=[('X-Dummy-Header','Dummy Value')])
- self.failUnless(contents == ['Dummy Error'], contents)
- self.failUnless(
+ self.assertTrue(contents == ['Dummy Error'], contents)
+ self.assertTrue(
self.caller.status == '123 Dummy Error', self.caller.status)
- self.failUnless(self.caller.response_headers == [
+ self.assertTrue(self.caller.response_headers == [
('Content-Type','text/plain'),
('X-Dummy-Header','Dummy Value')],
self.caller.response_headers)
- self.failUnless(self.caller.exc_info == None, self.caller.exc_info)
+ self.assertTrue(self.caller.exc_info == None, self.caller.exc_info)
def test_log_request(self):
self.app.log_request(
environ=self.caller.default_environ, status='-1 OK', bytes=123)
log = self.logstream.getvalue()
- self.failUnless(log.startswith('192.168.0.123 -'), log)
+ self.assertTrue(log.startswith('192.168.0.123 -'), log)
class ExceptionAppTestCase (WSGITestCase):
@@ -815,12 +815,12 @@ if libbe.TESTING:
def test_traceback(self):
try:
self.getURL(self.app)
- except ValueError, e:
+ except ValueError as e:
pass
log = self.logstream.getvalue()
- self.failUnless(log.startswith('Traceback'), log)
- self.failUnless('child_app' in log, log)
- self.failUnless('ValueError: Dummy Error' in log, log)
+ self.assertTrue(log.startswith('Traceback'), log)
+ self.assertTrue('child_app' in log, log)
+ self.assertTrue('ValueError: Dummy Error' in log, log)
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
@@ -887,15 +887,14 @@ def _create_cert_request(pkey, digest="md5", **name):
req = OpenSSL.crypto.X509Req()
subj = req.get_subject()
- for (key,value) in name.items():
+ for (key,value) in list(name.items()):
setattr(subj, key, value)
req.set_pubkey(pkey)
req.sign(pkey, digest)
return req
-def _create_certificate(req, (issuerCert, issuerKey), serial,
- (notBefore, notAfter), digest='md5'):
+def _create_certificate(req, xxx_todo_changeme, serial, xxx_todo_changeme1, digest='md5'):
"""Generate a certificate given a certificate request.
Returns the signed certificate in an X509 object.
@@ -919,6 +918,8 @@ def _create_certificate(req, (issuerCert, issuerKey), serial,
digest :
Digest method to use for signing, default is md5
"""
+ (issuerCert, issuerKey) = xxx_todo_changeme
+ (notBefore, notAfter) = xxx_todo_changeme1
cert = OpenSSL.crypto.X509()
cert.set_serial_number(serial)
cert.gmtime_adj_notBefore(notBefore)
diff --git a/libbe/version.py b/libbe/version.py
index 33277e2..a0bfe03 100755
--- a/libbe/version.py
+++ b/libbe/version.py
@@ -68,4 +68,4 @@ def version(verbose=False):
if __name__ == '__main__':
- print(version(verbose=True))
+ print((version(verbose=True)))
diff --git a/release.py b/release.py
index 2650a3f..fdf45d7 100755
--- a/release.py
+++ b/release.py
@@ -67,34 +67,34 @@ def pending_changes():
return True
def set_release_version(tag):
- print "set libbe.version._VERSION = '%s'" % tag
+ print("set libbe.version._VERSION = '%s'" % tag)
invoke(['sed', '-i', "s/^[# ]*_VERSION *=.*/_VERSION = '%s'/" % tag,
os.path.join('libbe', 'version.py')])
def remove_makefile_libbe_version_dependencies(filename):
- print "set %s LIBBE_VERSION :=" % filename
+ print("set %s LIBBE_VERSION :=" % filename)
invoke(['sed', '-i', "s/^LIBBE_VERSION *:=.*/LIBBE_VERSION :=/",
filename])
def commit(commit_message):
- print 'commit current status:', commit_message
+ print('commit current status:', commit_message)
invoke(['git', 'commit', '-a', '-m', commit_message])
def tag(tag):
- print 'tag current revision', tag
+ print('tag current revision', tag)
invoke(['git', 'tag', '-s', '-m', 'version {}'.format(tag), tag])
def export(target_dir):
if not target_dir.endswith(os.path.sep):
target_dir += os.path.sep
- print 'export current revision to', target_dir
+ print('export current revision to', target_dir)
status,stdout,stderr = invoke(
['git', 'archive', '--prefix', target_dir, 'HEAD'],
unicode_output=False)
status,stdout,stderr = invoke(['tar', '-xv'], stdin=stdout)
def make_version():
- print 'generate libbe/_version.py'
+ print('generate libbe/_version.py')
invoke(['make', os.path.join('libbe', '_version.py')])
def make_changelog(filename, tag):
@@ -103,13 +103,13 @@ def make_changelog(filename, tag):
Not the most ChangeLog-esque format, but iterating through commits
by hand is just too slow.
"""
- print 'generate ChangeLog file', filename, 'up to tag', tag
+ print('generate ChangeLog file', filename, 'up to tag', tag)
status,stdout,stderr = invoke(
['git', 'log', '--no-merges', '{}..{}'.format(INITIAL_COMMIT, tag)])
with codecs.open(filename, 'w', 'utf-8') as f:
for line in stdout.splitlines():
f.write(line.rstrip())
- f.write(u'\n')
+ f.write('\n')
def set_vcs_name(be_dir, vcs_name='None'):
"""Exported directory is not a git repository, so set vcs_name to
@@ -121,7 +121,7 @@ def set_vcs_name(be_dir, vcs_name='None'):
continue
filename = os.path.join(be_dir, directory, 'settings')
if os.path.exists(filename):
- print 'set vcs_name in', filename, 'to', vcs_name
+ print('set vcs_name in', filename, 'to', vcs_name)
invoke(['sed', '-i', "s/^vcs_name:.*/vcs_name: %s/" % vcs_name,
filename])
@@ -133,7 +133,7 @@ def make_id_cache():
def make_html_docs(docdir):
"""Generate docs so users won't need to install Sphinx, etc.
"""
- print('generate HTML docs in {}'.format(docdir))
+ print(('generate HTML docs in {}'.format(docdir)))
status,stdout,stderr = invoke(
['make', 'SPHINXBUILD=sphinx-build-2.7', 'dirhtml'], cwd=docdir)
@@ -144,20 +144,20 @@ def create_tarball(tag):
make_version()
remove_makefile_libbe_version_dependencies(
os.path.join(export_dir, 'Makefile'))
- print 'copy libbe/_version.py to %s/libbe/_version.py' % export_dir
+ print('copy libbe/_version.py to %s/libbe/_version.py' % export_dir)
shutil.copy(os.path.join('libbe', '_version.py'),
os.path.join(export_dir, 'libbe', '_version.py'))
make_changelog(os.path.join(export_dir, 'ChangeLog'), tag)
make_id_cache()
make_html_docs(os.path.join(export_dir, 'doc'))
- print 'copy .be/id-cache to %s/.be/id-cache' % export_dir
+ print('copy .be/id-cache to %s/.be/id-cache' % export_dir)
shutil.copy(os.path.join('.be', 'id-cache'),
os.path.join(export_dir, '.be', 'id-cache'))
set_vcs_name(os.path.join(export_dir, '.be'))
tarball_file = '%s.tar.gz' % release_name
- print 'create tarball', tarball_file
+ print('create tarball', tarball_file)
invoke(['tar', '-czf', tarball_file, export_dir])
- print 'remove', export_dir
+ print('remove', export_dir)
shutil.rmtree(export_dir)
def test():
@@ -189,10 +189,10 @@ If you don't like what got committed, you can undo the release with
validate_tag(_tag)
if pending_changes() == True:
- print "Handle pending changes before releasing."
+ print("Handle pending changes before releasing.")
sys.exit(1)
set_release_version(_tag)
- print "Update copyright information..."
+ print("Update copyright information...")
env = dict(os.environ)
pythonpath = os.path.abspath('update-copyright')
if 'PYTHONPATH' in env: