aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/util
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/util')
-rw-r--r--libbe/util/encoding.py2
-rw-r--r--libbe/util/http.py20
-rw-r--r--libbe/util/id.py48
-rw-r--r--libbe/util/subproc.py12
-rw-r--r--libbe/util/utility.py2
-rw-r--r--libbe/util/wsgi.py65
6 files changed, 75 insertions, 74 deletions
diff --git a/libbe/util/encoding.py b/libbe/util/encoding.py
index 712a3fa..c5e242b 100644
--- a/libbe/util/encoding.py
+++ b/libbe/util/encoding.py
@@ -93,7 +93,7 @@ def get_file_contents(path, mode='r', encoding=None, decode=False):
return contents
def set_file_contents(path, contents, mode='w', encoding=None):
- if type(contents) == types.UnicodeType:
+ if type(contents) == str:
if encoding == None:
encoding = get_text_file_encoding()
f = codecs.open(path, mode, encoding)
diff --git a/libbe/util/http.py b/libbe/util/http.py
index a1ab06a..2e4dae8 100644
--- a/libbe/util/http.py
+++ b/libbe/util/http.py
@@ -23,8 +23,8 @@
# httplib.responses
# but it is slow to load.
-import urllib
-import urllib2
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
from libbe import TESTING
@@ -92,19 +92,19 @@ def get_post_url(url, get=True, data=None, data_dict=None, headers=[],
if get is True:
if data_dict != {}:
# encode get parameters in the url
- param_string = urllib.urlencode(data_dict)
+ param_string = urllib.parse.urlencode(data_dict)
url = '{}?{}'.format(url, param_string)
else:
- data = urllib.urlencode(data_dict)
+ data = urllib.parse.urlencode(data_dict)
else:
assert get is False, (data, get)
assert data_dict is None, (data, data_dict)
headers = dict(headers)
headers['User-Agent'] = agent
- req = urllib2.Request(url, data=data, headers=headers)
+ req = urllib.request.Request(url, data=data, headers=headers)
try:
- response = urllib2.urlopen(req)
- except urllib2.HTTPError, e:
+ response = urllib.request.urlopen(req)
+ except urllib.error.HTTPError as e:
if e.code == HTTP_USER_ERROR:
lines = ['The server reported a user error (HTTPError)']
else:
@@ -115,7 +115,7 @@ def get_post_url(url, get=True, data=None, data_dict=None, headers=[],
lines.append('Error code: {}'.format(e.code))
msg = '\n'.join(lines)
raise HTTPError(error=e, url=url, msg=msg)
- except urllib2.URLError, e:
+ except urllib.error.URLError as e:
msg = ('We failed to connect to the server (URLError).\nURL: {}\n'
'Reason: {}').format(url, e.reason)
raise HTTPError(error=e, url=url, msg=msg)
@@ -132,7 +132,7 @@ if TESTING:
def test_get(self):
url = 'http://bugseverywhere.org/'
page,final_url,info = get_post_url(url=url)
- self.failUnless(final_url == url,
+ self.assertTrue(final_url == url,
'Redirect?\n Expected: "{}"\n Got: "{}"'.format(
url, final_url))
@@ -140,6 +140,6 @@ if TESTING:
url = 'http://physics.drexel.edu/~wking/code/be/redirect'
expected = 'http://physics.drexel.edu/~wking/'
page,final_url,info = get_post_url(url=url)
- self.failUnless(final_url == expected,
+ self.assertTrue(final_url == expected,
'Redirect?\n Expected: "{}"\n Got: "{}"'.format(
expected, final_url))
diff --git a/libbe/util/id.py b/libbe/util/id.py
index de9edf9..078dd3f 100644
--- a/libbe/util/id.py
+++ b/libbe/util/id.py
@@ -103,14 +103,14 @@ except ImportError:
# win32 don't have os.execvp() so have to run command in a shell
q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
shell=True, cwd=cwd)
- except OSError, e :
+ except OSError as e :
strerror = "%s\nwhile executing %s" % (e.args[1], args)
- raise OSError, strerror
+ raise OSError(strerror)
output, error = q.communicate()
status = q.wait()
if status != 0:
strerror = "%s\nwhile executing %s" % (status, args)
- raise Exception, strerror
+ raise Exception(strerror)
return output.rstrip('\n')
@@ -414,11 +414,11 @@ def long_to_short_user(bugdirs, id):
long_to_short_text : conversion on a block of text
"""
ids = _split(id, check_length=True)
- matching_bugdirs = [bd for bd in bugdirs.values() if bd.uuid == ids[0]]
+ matching_bugdirs = [bd for bd in list(bugdirs.values()) if bd.uuid == ids[0]]
if len(matching_bugdirs) == 0:
- raise NoIDMatches(id, [bd.uuid for bd in bugdirs.values()])
+ raise NoIDMatches(id, [bd.uuid for bd in list(bugdirs.values())])
elif len(matching_bugdirs) > 1:
- raise MultipleIDMatches(id, '', [bd.uuid for bd in bugdirs.values()])
+ raise MultipleIDMatches(id, '', [bd.uuid for bd in list(bugdirs.values())])
bugdir = matching_bugdirs[0]
objects = [bugdir]
if len(ids) >= 2:
@@ -443,10 +443,10 @@ def short_to_long_user(bugdirs, id):
"""
ids = _split(id, check_length=True)
ids[0] = _expand(ids[0], common=None,
- other_ids=[bd.uuid for bd in bugdirs.values()])
+ other_ids=[bd.uuid for bd in list(bugdirs.values())])
if len(ids) == 1:
return _assemble(ids)
- bugdir = [bd for bd in bugdirs.values() if bd.uuid == ids[0]][0]
+ bugdir = [bd for bd in list(bugdirs.values()) if bd.uuid == ids[0]][0]
ids[1] = _expand(ids[1], common=bugdir.id.user(),
other_ids=bugdir.uuids())
if len(ids) == 2:
@@ -603,7 +603,7 @@ if libbe.TESTING == True:
class UUIDtestCase(unittest.TestCase):
def testUUID_gen(self):
id = uuid_gen()
- self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id)
+ self.assertTrue(len(id) == 36, 'invalid UUID "%s"' % id)
class DummyObject (object):
def __init__(self, uuid, parent=None, siblings=[]):
@@ -629,31 +629,31 @@ if libbe.TESTING == True:
self.b_id = self.bug.id
self.c_id = self.comment.id
def test_storage(self):
- self.failUnless(self.bd_id.storage() == self.bugdir.uuid,
+ self.assertTrue(self.bd_id.storage() == self.bugdir.uuid,
self.bd_id.storage())
- self.failUnless(self.b_id.storage() == self.bug.uuid,
+ self.assertTrue(self.b_id.storage() == self.bug.uuid,
self.b_id.storage())
- self.failUnless(self.c_id.storage() == self.comment.uuid,
+ self.assertTrue(self.c_id.storage() == self.comment.uuid,
self.c_id.storage())
- self.failUnless(self.bd_id.storage('x', 'y', 'z') == \
+ self.assertTrue(self.bd_id.storage('x', 'y', 'z') == \
'1234abcd/x/y/z',
self.bd_id.storage('x', 'y', 'z'))
def test_long_user(self):
- self.failUnless(self.bd_id.long_user() == self.bugdir.uuid,
+ self.assertTrue(self.bd_id.long_user() == self.bugdir.uuid,
self.bd_id.long_user())
- self.failUnless(self.b_id.long_user() == \
+ self.assertTrue(self.b_id.long_user() == \
'/'.join([self.bugdir.uuid, self.bug.uuid]),
self.b_id.long_user())
- self.failUnless(self.c_id.long_user() ==
+ self.assertTrue(self.c_id.long_user() ==
'/'.join([self.bugdir.uuid, self.bug.uuid,
self.comment.uuid]),
self.c_id.long_user)
def test_user(self):
- self.failUnless(self.bd_id.user() == '123',
+ self.assertTrue(self.bd_id.user() == '123',
self.bd_id.user())
- self.failUnless(self.b_id.user() == '123/abc',
+ self.assertTrue(self.b_id.user() == '123/abc',
self.b_id.user())
- self.failUnless(self.c_id.user() == '123/abc/12345',
+ self.assertTrue(self.c_id.user() == '123/abc/12345',
self.c_id.user())
class ShortLongParseTestCase(unittest.TestCase):
@@ -690,12 +690,12 @@ if libbe.TESTING == True:
None, '123/abc', ['1234abcd','1234cdef','12345678'])),
]
def test_short_to_long_text(self):
- self.failUnless(short_to_long_text(
+ self.assertTrue(short_to_long_text(
self.bugdirs, self.short) == self.long,
'\n' + self.short + '\n' + short_to_long_text(
self.bugdirs, self.short) + '\n' + self.long)
def test_long_to_short_text(self):
- self.failUnless(long_to_short_text(
+ self.assertTrue(long_to_short_text(
self.bugdirs, self.long) == self.short,
'\n' + long_to_short_text(
self.bugdirs, self.long
@@ -703,7 +703,7 @@ if libbe.TESTING == True:
def test_parse_user(self):
for short_id,parsed in self.short_id_parse_pairs:
ret = parse_user(self.bugdirs, short_id)
- self.failUnless(ret == parsed,
+ self.assertTrue(ret == parsed,
'got %s\nexpected %s' % (ret, parsed))
def test_parse_user_exceptions(self):
for short_id,exception in self.short_id_exception_pairs:
@@ -712,13 +712,13 @@ if libbe.TESTING == True:
self.fail('Expected parse_user(bugdir, "%s") to raise %s,'
'\n but it returned %s'
% (short_id, exception.__class__.__name__, ret))
- except exception.__class__, e:
+ except exception.__class__ as e:
for attr in dir(e):
if attr.startswith('_') or attr == 'args':
continue
value = getattr(e, attr)
expected = getattr(exception, attr)
- self.failUnless(
+ self.assertTrue(
value == expected,
'Expected parse_user(bugdir, "%s") %s.%s'
'\n to be %s, but it is %s\n\n%s'
diff --git a/libbe/util/subproc.py b/libbe/util/subproc.py
index 0ad010c..5412b08 100644
--- a/libbe/util/subproc.py
+++ b/libbe/util/subproc.py
@@ -25,7 +25,7 @@ import sys
import types
import libbe
-from encoding import get_encoding
+from .encoding import get_encoding
if libbe.TESTING == True:
import doctest
@@ -56,7 +56,7 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
"""
if cwd == None:
cwd = '.'
- if isinstance(shell, types.StringTypes):
+ if isinstance(shell, (str,)):
list_args = ' '.split(args) # sloppy, but just for logging
str_args = args
else:
@@ -76,7 +76,7 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
# win32 don't have os.execvp() so have to run command in a shell
q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
shell=shell, cwd=cwd, **kwargs)
- except OSError, e:
+ except OSError as e:
raise CommandError(list_args, status=e.args[0], stderr=e)
stdout,stderr = q.communicate(input=stdin)
status = q.wait()
@@ -84,10 +84,10 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
if encoding == None:
encoding = get_encoding()
if stdout != None:
- stdout = unicode(stdout, encoding)
+ stdout = str(stdout, encoding)
if stderr != None:
- stderr = unicode(stderr, encoding)
- libbe.LOG.debug(u'{0}\n{1}{2}'.format(status, stdout, stderr))
+ stderr = str(stderr, encoding)
+ libbe.LOG.debug('{0}\n{1}{2}'.format(status, stdout, stderr))
else:
libbe.LOG.debug('{0}\n{1}{2}'.format(status, stdout, stderr))
if status not in expect:
diff --git a/libbe/util/utility.py b/libbe/util/utility.py
index 08ffca8..28de34f 100644
--- a/libbe/util/utility.py
+++ b/libbe/util/utility.py
@@ -227,7 +227,7 @@ def iterable_full_of_strings(value, alternative=None):
elif not hasattr(value, '__iter__'):
return False
for x in value:
- if type(x) not in types.StringTypes:
+ if type(x) not in (str,):
return False
return True
diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py
index c509a48..96ddf60 100644
--- a/libbe/util/wsgi.py
+++ b/libbe/util/wsgi.py
@@ -32,13 +32,13 @@ import os.path
import re
import select
import signal
-import StringIO
+import io
import sys
import time
import traceback
import types
-import urllib
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.parse
import wsgiref.simple_server
try:
@@ -109,8 +109,8 @@ class User (object):
string = string.strip()
fields = string.split(':')
if len(fields) != 3:
- raise ValueError, '{}!=3 fields in "{}"'.format(
- len(fields), string)
+ raise ValueError('{}!=3 fields in "{}"'.format(
+ len(fields), string))
self.uname,self.name,self.passhash = fields
def __str__(self):
@@ -229,7 +229,7 @@ class WSGI_Object (object):
def log_request(self, environ, status='-1 OK', bytes=-1):
if self.logger is None or self.logger.level > self.log_level:
return
- req_uri = urllib.quote(environ.get('SCRIPT_NAME', '')
+ req_uri = urllib.parse.quote(environ.get('SCRIPT_NAME', '')
+ environ.get('PATH_INFO', ''))
if environ.get('QUERY_STRING'):
req_uri += '?' + environ['QUERY_STRING']
@@ -276,7 +276,7 @@ class ExceptionApp (WSGI_Middleware):
def _call(self, environ, start_response):
try:
return self.app(environ, start_response)
- except Exception, e:
+ except Exception as e:
etype,value,tb = sys.exc_info()
trace = ''.join(
traceback.format_exception(etype, value, tb, None))
@@ -290,7 +290,7 @@ class HandlerErrorApp (WSGI_Middleware):
def _call(self, environ, start_response):
try:
return self.app(environ, start_response)
- except HandlerError, e:
+ except HandlerError as e:
self.log_request(environ, status=str(e), bytes=0)
start_response('{} {}'.format(e.code, e.msg), e.headers)
return []
@@ -338,7 +338,7 @@ class UppercaseHeaderApp (WSGI_Middleware):
http://www.python.org/dev/peps/pep-0333/#id20
"""
def _call(self, environ, start_response):
- for key,value in environ.items():
+ for key,value in list(environ.items()):
if key.startswith('HTTP_'):
uppercase = key.upper()
if uppercase != key:
@@ -363,11 +363,11 @@ class WSGI_DataObject (WSGI_Object):
if content is None:
start_response('200 OK', [])
return []
- if type(content) is types.UnicodeType:
+ if type(content) is str:
content = content.encode('utf-8')
for i,header in enumerate(headers):
header_name,header_value = header
- if type(header_value) == types.UnicodeType:
+ if type(header_value) == str:
headers[i] = (header_name, header_value.encode('ISO-8859-1'))
response = '200 OK'
content_length = len(content)
@@ -388,9 +388,9 @@ class WSGI_DataObject (WSGI_Object):
def _parse_query(self, query):
if len(query) == 0:
return {}
- data = urlparse.parse_qs(
+ data = urllib.parse.parse_qs(
query, keep_blank_values=True, strict_parsing=True)
- for k,v in data.items():
+ for k,v in list(data.items()):
if len(v) == 1:
data[k] = v[0]
return data
@@ -411,7 +411,7 @@ class WSGI_DataObject (WSGI_Object):
clen = 0
if clen != 0:
if self.maxlen > 0 and clen > self.maxlen:
- raise ValueError, 'Maximum content length exceeded'
+ raise ValueError('Maximum content length exceeded')
return environ['wsgi.input'].read(clen)
return ''
@@ -722,8 +722,8 @@ class WSGICaller (object):
'SERVER_PROTOCOL':'HTTP/1.1',
'wsgi.version':(1,0),
'wsgi.url_scheme':'http',
- 'wsgi.input':StringIO.StringIO(),
- 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.input':io.StringIO(),
+ 'wsgi.errors':io.StringIO(),
'wsgi.multithread':False,
'wsgi.multiprocess':False,
'wsgi.run_once':False,
@@ -737,17 +737,17 @@ class WSGICaller (object):
env['scheme'] = scheme
if data_dict is not None:
assert data is None, (data, data_dict)
- data = urllib.urlencode(data_dict)
+ data = urllib.parse.urlencode(data_dict)
if data is not None:
if data_dict is None:
assert method == 'POST', (method, data)
if method == 'POST':
env['CONTENT_LENGTH'] = len(data)
- env['wsgi.input'] = StringIO.StringIO(data)
+ env['wsgi.input'] = io.StringIO(data)
else:
assert method in ['GET', 'HEAD'], method
env['QUERY_STRING'] = data
- for key,value in environ.items():
+ for key,value in list(environ.items()):
env[key] = value
return ''.join(app(env, self.start_response))
@@ -760,7 +760,7 @@ class WSGICaller (object):
if libbe.TESTING:
class WSGITestCase (unittest.TestCase):
def setUp(self):
- self.logstream = StringIO.StringIO()
+ self.logstream = io.StringIO()
self.logger = logging.getLogger('be-wsgi-test')
console = logging.StreamHandler(self.logstream)
console.setFormatter(logging.Formatter('%(message)s'))
@@ -789,20 +789,20 @@ if libbe.TESTING:
error=123,
message='Dummy Error',
headers=[('X-Dummy-Header','Dummy Value')])
- self.failUnless(contents == ['Dummy Error'], contents)
- self.failUnless(
+ self.assertTrue(contents == ['Dummy Error'], contents)
+ self.assertTrue(
self.caller.status == '123 Dummy Error', self.caller.status)
- self.failUnless(self.caller.response_headers == [
+ self.assertTrue(self.caller.response_headers == [
('Content-Type','text/plain'),
('X-Dummy-Header','Dummy Value')],
self.caller.response_headers)
- self.failUnless(self.caller.exc_info == None, self.caller.exc_info)
+ self.assertTrue(self.caller.exc_info == None, self.caller.exc_info)
def test_log_request(self):
self.app.log_request(
environ=self.caller.default_environ, status='-1 OK', bytes=123)
log = self.logstream.getvalue()
- self.failUnless(log.startswith('192.168.0.123 -'), log)
+ self.assertTrue(log.startswith('192.168.0.123 -'), log)
class ExceptionAppTestCase (WSGITestCase):
@@ -815,12 +815,12 @@ if libbe.TESTING:
def test_traceback(self):
try:
self.getURL(self.app)
- except ValueError, e:
+ except ValueError as e:
pass
log = self.logstream.getvalue()
- self.failUnless(log.startswith('Traceback'), log)
- self.failUnless('child_app' in log, log)
- self.failUnless('ValueError: Dummy Error' in log, log)
+ self.assertTrue(log.startswith('Traceback'), log)
+ self.assertTrue('child_app' in log, log)
+ self.assertTrue('ValueError: Dummy Error' in log, log)
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
@@ -887,15 +887,14 @@ def _create_cert_request(pkey, digest="md5", **name):
req = OpenSSL.crypto.X509Req()
subj = req.get_subject()
- for (key,value) in name.items():
+ for (key,value) in list(name.items()):
setattr(subj, key, value)
req.set_pubkey(pkey)
req.sign(pkey, digest)
return req
-def _create_certificate(req, (issuerCert, issuerKey), serial,
- (notBefore, notAfter), digest='md5'):
+def _create_certificate(req, xxx_todo_changeme, serial, xxx_todo_changeme1, digest='md5'):
"""Generate a certificate given a certificate request.
Returns the signed certificate in an X509 object.
@@ -919,6 +918,8 @@ def _create_certificate(req, (issuerCert, issuerKey), serial,
digest :
Digest method to use for signing, default is md5
"""
+ (issuerCert, issuerKey) = xxx_todo_changeme
+ (notBefore, notAfter) = xxx_todo_changeme1
cert = OpenSSL.crypto.X509()
cert.set_serial_number(serial)
cert.gmtime_adj_notBefore(notBefore)