diff options
Diffstat (limited to 'libbe/util')
-rw-r--r-- | libbe/util/encoding.py | 2 | ||||
-rw-r--r-- | libbe/util/http.py | 20 | ||||
-rw-r--r-- | libbe/util/id.py | 48 | ||||
-rw-r--r-- | libbe/util/subproc.py | 12 | ||||
-rw-r--r-- | libbe/util/utility.py | 2 | ||||
-rw-r--r-- | libbe/util/wsgi.py | 65 |
6 files changed, 75 insertions, 74 deletions
diff --git a/libbe/util/encoding.py b/libbe/util/encoding.py index 712a3fa..c5e242b 100644 --- a/libbe/util/encoding.py +++ b/libbe/util/encoding.py @@ -93,7 +93,7 @@ def get_file_contents(path, mode='r', encoding=None, decode=False): return contents def set_file_contents(path, contents, mode='w', encoding=None): - if type(contents) == types.UnicodeType: + if type(contents) == str: if encoding == None: encoding = get_text_file_encoding() f = codecs.open(path, mode, encoding) diff --git a/libbe/util/http.py b/libbe/util/http.py index a1ab06a..2e4dae8 100644 --- a/libbe/util/http.py +++ b/libbe/util/http.py @@ -23,8 +23,8 @@ # httplib.responses # but it is slow to load. -import urllib -import urllib2 +import urllib.request, urllib.parse, urllib.error +import urllib.request, urllib.error, urllib.parse from libbe import TESTING @@ -92,19 +92,19 @@ def get_post_url(url, get=True, data=None, data_dict=None, headers=[], if get is True: if data_dict != {}: # encode get parameters in the url - param_string = urllib.urlencode(data_dict) + param_string = urllib.parse.urlencode(data_dict) url = '{}?{}'.format(url, param_string) else: - data = urllib.urlencode(data_dict) + data = urllib.parse.urlencode(data_dict) else: assert get is False, (data, get) assert data_dict is None, (data, data_dict) headers = dict(headers) headers['User-Agent'] = agent - req = urllib2.Request(url, data=data, headers=headers) + req = urllib.request.Request(url, data=data, headers=headers) try: - response = urllib2.urlopen(req) - except urllib2.HTTPError, e: + response = urllib.request.urlopen(req) + except urllib.error.HTTPError as e: if e.code == HTTP_USER_ERROR: lines = ['The server reported a user error (HTTPError)'] else: @@ -115,7 +115,7 @@ def get_post_url(url, get=True, data=None, data_dict=None, headers=[], lines.append('Error code: {}'.format(e.code)) msg = '\n'.join(lines) raise HTTPError(error=e, url=url, msg=msg) - except urllib2.URLError, e: + except urllib.error.URLError as e: msg = ('We failed to connect to the server (URLError).\nURL: {}\n' 'Reason: {}').format(url, e.reason) raise HTTPError(error=e, url=url, msg=msg) @@ -132,7 +132,7 @@ if TESTING: def test_get(self): url = 'http://bugseverywhere.org/' page,final_url,info = get_post_url(url=url) - self.failUnless(final_url == url, + self.assertTrue(final_url == url, 'Redirect?\n Expected: "{}"\n Got: "{}"'.format( url, final_url)) @@ -140,6 +140,6 @@ if TESTING: url = 'http://physics.drexel.edu/~wking/code/be/redirect' expected = 'http://physics.drexel.edu/~wking/' page,final_url,info = get_post_url(url=url) - self.failUnless(final_url == expected, + self.assertTrue(final_url == expected, 'Redirect?\n Expected: "{}"\n Got: "{}"'.format( expected, final_url)) diff --git a/libbe/util/id.py b/libbe/util/id.py index de9edf9..078dd3f 100644 --- a/libbe/util/id.py +++ b/libbe/util/id.py @@ -103,14 +103,14 @@ except ImportError: # win32 don't have os.execvp() so have to run command in a shell q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True, cwd=cwd) - except OSError, e : + except OSError as e : strerror = "%s\nwhile executing %s" % (e.args[1], args) - raise OSError, strerror + raise OSError(strerror) output, error = q.communicate() status = q.wait() if status != 0: strerror = "%s\nwhile executing %s" % (status, args) - raise Exception, strerror + raise Exception(strerror) return output.rstrip('\n') @@ -414,11 +414,11 @@ def long_to_short_user(bugdirs, id): long_to_short_text : conversion on a block of text """ ids = _split(id, check_length=True) - matching_bugdirs = [bd for bd in bugdirs.values() if bd.uuid == ids[0]] + matching_bugdirs = [bd for bd in list(bugdirs.values()) if bd.uuid == ids[0]] if len(matching_bugdirs) == 0: - raise NoIDMatches(id, [bd.uuid for bd in bugdirs.values()]) + raise NoIDMatches(id, [bd.uuid for bd in list(bugdirs.values())]) elif len(matching_bugdirs) > 1: - raise MultipleIDMatches(id, '', [bd.uuid for bd in bugdirs.values()]) + raise MultipleIDMatches(id, '', [bd.uuid for bd in list(bugdirs.values())]) bugdir = matching_bugdirs[0] objects = [bugdir] if len(ids) >= 2: @@ -443,10 +443,10 @@ def short_to_long_user(bugdirs, id): """ ids = _split(id, check_length=True) ids[0] = _expand(ids[0], common=None, - other_ids=[bd.uuid for bd in bugdirs.values()]) + other_ids=[bd.uuid for bd in list(bugdirs.values())]) if len(ids) == 1: return _assemble(ids) - bugdir = [bd for bd in bugdirs.values() if bd.uuid == ids[0]][0] + bugdir = [bd for bd in list(bugdirs.values()) if bd.uuid == ids[0]][0] ids[1] = _expand(ids[1], common=bugdir.id.user(), other_ids=bugdir.uuids()) if len(ids) == 2: @@ -603,7 +603,7 @@ if libbe.TESTING == True: class UUIDtestCase(unittest.TestCase): def testUUID_gen(self): id = uuid_gen() - self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id) + self.assertTrue(len(id) == 36, 'invalid UUID "%s"' % id) class DummyObject (object): def __init__(self, uuid, parent=None, siblings=[]): @@ -629,31 +629,31 @@ if libbe.TESTING == True: self.b_id = self.bug.id self.c_id = self.comment.id def test_storage(self): - self.failUnless(self.bd_id.storage() == self.bugdir.uuid, + self.assertTrue(self.bd_id.storage() == self.bugdir.uuid, self.bd_id.storage()) - self.failUnless(self.b_id.storage() == self.bug.uuid, + self.assertTrue(self.b_id.storage() == self.bug.uuid, self.b_id.storage()) - self.failUnless(self.c_id.storage() == self.comment.uuid, + self.assertTrue(self.c_id.storage() == self.comment.uuid, self.c_id.storage()) - self.failUnless(self.bd_id.storage('x', 'y', 'z') == \ + self.assertTrue(self.bd_id.storage('x', 'y', 'z') == \ '1234abcd/x/y/z', self.bd_id.storage('x', 'y', 'z')) def test_long_user(self): - self.failUnless(self.bd_id.long_user() == self.bugdir.uuid, + self.assertTrue(self.bd_id.long_user() == self.bugdir.uuid, self.bd_id.long_user()) - self.failUnless(self.b_id.long_user() == \ + self.assertTrue(self.b_id.long_user() == \ '/'.join([self.bugdir.uuid, self.bug.uuid]), self.b_id.long_user()) - self.failUnless(self.c_id.long_user() == + self.assertTrue(self.c_id.long_user() == '/'.join([self.bugdir.uuid, self.bug.uuid, self.comment.uuid]), self.c_id.long_user) def test_user(self): - self.failUnless(self.bd_id.user() == '123', + self.assertTrue(self.bd_id.user() == '123', self.bd_id.user()) - self.failUnless(self.b_id.user() == '123/abc', + self.assertTrue(self.b_id.user() == '123/abc', self.b_id.user()) - self.failUnless(self.c_id.user() == '123/abc/12345', + self.assertTrue(self.c_id.user() == '123/abc/12345', self.c_id.user()) class ShortLongParseTestCase(unittest.TestCase): @@ -690,12 +690,12 @@ if libbe.TESTING == True: None, '123/abc', ['1234abcd','1234cdef','12345678'])), ] def test_short_to_long_text(self): - self.failUnless(short_to_long_text( + self.assertTrue(short_to_long_text( self.bugdirs, self.short) == self.long, '\n' + self.short + '\n' + short_to_long_text( self.bugdirs, self.short) + '\n' + self.long) def test_long_to_short_text(self): - self.failUnless(long_to_short_text( + self.assertTrue(long_to_short_text( self.bugdirs, self.long) == self.short, '\n' + long_to_short_text( self.bugdirs, self.long @@ -703,7 +703,7 @@ if libbe.TESTING == True: def test_parse_user(self): for short_id,parsed in self.short_id_parse_pairs: ret = parse_user(self.bugdirs, short_id) - self.failUnless(ret == parsed, + self.assertTrue(ret == parsed, 'got %s\nexpected %s' % (ret, parsed)) def test_parse_user_exceptions(self): for short_id,exception in self.short_id_exception_pairs: @@ -712,13 +712,13 @@ if libbe.TESTING == True: self.fail('Expected parse_user(bugdir, "%s") to raise %s,' '\n but it returned %s' % (short_id, exception.__class__.__name__, ret)) - except exception.__class__, e: + except exception.__class__ as e: for attr in dir(e): if attr.startswith('_') or attr == 'args': continue value = getattr(e, attr) expected = getattr(exception, attr) - self.failUnless( + self.assertTrue( value == expected, 'Expected parse_user(bugdir, "%s") %s.%s' '\n to be %s, but it is %s\n\n%s' diff --git a/libbe/util/subproc.py b/libbe/util/subproc.py index 0ad010c..5412b08 100644 --- a/libbe/util/subproc.py +++ b/libbe/util/subproc.py @@ -25,7 +25,7 @@ import sys import types import libbe -from encoding import get_encoding +from .encoding import get_encoding if libbe.TESTING == True: import doctest @@ -56,7 +56,7 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,), """ if cwd == None: cwd = '.' - if isinstance(shell, types.StringTypes): + if isinstance(shell, (str,)): list_args = ' '.split(args) # sloppy, but just for logging str_args = args else: @@ -76,7 +76,7 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,), # win32 don't have os.execvp() so have to run command in a shell q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, **kwargs) - except OSError, e: + except OSError as e: raise CommandError(list_args, status=e.args[0], stderr=e) stdout,stderr = q.communicate(input=stdin) status = q.wait() @@ -84,10 +84,10 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,), if encoding == None: encoding = get_encoding() if stdout != None: - stdout = unicode(stdout, encoding) + stdout = str(stdout, encoding) if stderr != None: - stderr = unicode(stderr, encoding) - libbe.LOG.debug(u'{0}\n{1}{2}'.format(status, stdout, stderr)) + stderr = str(stderr, encoding) + libbe.LOG.debug('{0}\n{1}{2}'.format(status, stdout, stderr)) else: libbe.LOG.debug('{0}\n{1}{2}'.format(status, stdout, stderr)) if status not in expect: diff --git a/libbe/util/utility.py b/libbe/util/utility.py index 08ffca8..28de34f 100644 --- a/libbe/util/utility.py +++ b/libbe/util/utility.py @@ -227,7 +227,7 @@ def iterable_full_of_strings(value, alternative=None): elif not hasattr(value, '__iter__'): return False for x in value: - if type(x) not in types.StringTypes: + if type(x) not in (str,): return False return True diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py index c509a48..96ddf60 100644 --- a/libbe/util/wsgi.py +++ b/libbe/util/wsgi.py @@ -32,13 +32,13 @@ import os.path import re import select import signal -import StringIO +import io import sys import time import traceback import types -import urllib -import urlparse +import urllib.request, urllib.parse, urllib.error +import urllib.parse import wsgiref.simple_server try: @@ -109,8 +109,8 @@ class User (object): string = string.strip() fields = string.split(':') if len(fields) != 3: - raise ValueError, '{}!=3 fields in "{}"'.format( - len(fields), string) + raise ValueError('{}!=3 fields in "{}"'.format( + len(fields), string)) self.uname,self.name,self.passhash = fields def __str__(self): @@ -229,7 +229,7 @@ class WSGI_Object (object): def log_request(self, environ, status='-1 OK', bytes=-1): if self.logger is None or self.logger.level > self.log_level: return - req_uri = urllib.quote(environ.get('SCRIPT_NAME', '') + req_uri = urllib.parse.quote(environ.get('SCRIPT_NAME', '') + environ.get('PATH_INFO', '')) if environ.get('QUERY_STRING'): req_uri += '?' + environ['QUERY_STRING'] @@ -276,7 +276,7 @@ class ExceptionApp (WSGI_Middleware): def _call(self, environ, start_response): try: return self.app(environ, start_response) - except Exception, e: + except Exception as e: etype,value,tb = sys.exc_info() trace = ''.join( traceback.format_exception(etype, value, tb, None)) @@ -290,7 +290,7 @@ class HandlerErrorApp (WSGI_Middleware): def _call(self, environ, start_response): try: return self.app(environ, start_response) - except HandlerError, e: + except HandlerError as e: self.log_request(environ, status=str(e), bytes=0) start_response('{} {}'.format(e.code, e.msg), e.headers) return [] @@ -338,7 +338,7 @@ class UppercaseHeaderApp (WSGI_Middleware): http://www.python.org/dev/peps/pep-0333/#id20 """ def _call(self, environ, start_response): - for key,value in environ.items(): + for key,value in list(environ.items()): if key.startswith('HTTP_'): uppercase = key.upper() if uppercase != key: @@ -363,11 +363,11 @@ class WSGI_DataObject (WSGI_Object): if content is None: start_response('200 OK', []) return [] - if type(content) is types.UnicodeType: + if type(content) is str: content = content.encode('utf-8') for i,header in enumerate(headers): header_name,header_value = header - if type(header_value) == types.UnicodeType: + if type(header_value) == str: headers[i] = (header_name, header_value.encode('ISO-8859-1')) response = '200 OK' content_length = len(content) @@ -388,9 +388,9 @@ class WSGI_DataObject (WSGI_Object): def _parse_query(self, query): if len(query) == 0: return {} - data = urlparse.parse_qs( + data = urllib.parse.parse_qs( query, keep_blank_values=True, strict_parsing=True) - for k,v in data.items(): + for k,v in list(data.items()): if len(v) == 1: data[k] = v[0] return data @@ -411,7 +411,7 @@ class WSGI_DataObject (WSGI_Object): clen = 0 if clen != 0: if self.maxlen > 0 and clen > self.maxlen: - raise ValueError, 'Maximum content length exceeded' + raise ValueError('Maximum content length exceeded') return environ['wsgi.input'].read(clen) return '' @@ -722,8 +722,8 @@ class WSGICaller (object): 'SERVER_PROTOCOL':'HTTP/1.1', 'wsgi.version':(1,0), 'wsgi.url_scheme':'http', - 'wsgi.input':StringIO.StringIO(), - 'wsgi.errors':StringIO.StringIO(), + 'wsgi.input':io.StringIO(), + 'wsgi.errors':io.StringIO(), 'wsgi.multithread':False, 'wsgi.multiprocess':False, 'wsgi.run_once':False, @@ -737,17 +737,17 @@ class WSGICaller (object): env['scheme'] = scheme if data_dict is not None: assert data is None, (data, data_dict) - data = urllib.urlencode(data_dict) + data = urllib.parse.urlencode(data_dict) if data is not None: if data_dict is None: assert method == 'POST', (method, data) if method == 'POST': env['CONTENT_LENGTH'] = len(data) - env['wsgi.input'] = StringIO.StringIO(data) + env['wsgi.input'] = io.StringIO(data) else: assert method in ['GET', 'HEAD'], method env['QUERY_STRING'] = data - for key,value in environ.items(): + for key,value in list(environ.items()): env[key] = value return ''.join(app(env, self.start_response)) @@ -760,7 +760,7 @@ class WSGICaller (object): if libbe.TESTING: class WSGITestCase (unittest.TestCase): def setUp(self): - self.logstream = StringIO.StringIO() + self.logstream = io.StringIO() self.logger = logging.getLogger('be-wsgi-test') console = logging.StreamHandler(self.logstream) console.setFormatter(logging.Formatter('%(message)s')) @@ -789,20 +789,20 @@ if libbe.TESTING: error=123, message='Dummy Error', headers=[('X-Dummy-Header','Dummy Value')]) - self.failUnless(contents == ['Dummy Error'], contents) - self.failUnless( + self.assertTrue(contents == ['Dummy Error'], contents) + self.assertTrue( self.caller.status == '123 Dummy Error', self.caller.status) - self.failUnless(self.caller.response_headers == [ + self.assertTrue(self.caller.response_headers == [ ('Content-Type','text/plain'), ('X-Dummy-Header','Dummy Value')], self.caller.response_headers) - self.failUnless(self.caller.exc_info == None, self.caller.exc_info) + self.assertTrue(self.caller.exc_info == None, self.caller.exc_info) def test_log_request(self): self.app.log_request( environ=self.caller.default_environ, status='-1 OK', bytes=123) log = self.logstream.getvalue() - self.failUnless(log.startswith('192.168.0.123 -'), log) + self.assertTrue(log.startswith('192.168.0.123 -'), log) class ExceptionAppTestCase (WSGITestCase): @@ -815,12 +815,12 @@ if libbe.TESTING: def test_traceback(self): try: self.getURL(self.app) - except ValueError, e: + except ValueError as e: pass log = self.logstream.getvalue() - self.failUnless(log.startswith('Traceback'), log) - self.failUnless('child_app' in log, log) - self.failUnless('ValueError: Dummy Error' in log, log) + self.assertTrue(log.startswith('Traceback'), log) + self.assertTrue('child_app' in log, log) + self.assertTrue('ValueError: Dummy Error' in log, log) unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) @@ -887,15 +887,14 @@ def _create_cert_request(pkey, digest="md5", **name): req = OpenSSL.crypto.X509Req() subj = req.get_subject() - for (key,value) in name.items(): + for (key,value) in list(name.items()): setattr(subj, key, value) req.set_pubkey(pkey) req.sign(pkey, digest) return req -def _create_certificate(req, (issuerCert, issuerKey), serial, - (notBefore, notAfter), digest='md5'): +def _create_certificate(req, xxx_todo_changeme, serial, xxx_todo_changeme1, digest='md5'): """Generate a certificate given a certificate request. Returns the signed certificate in an X509 object. @@ -919,6 +918,8 @@ def _create_certificate(req, (issuerCert, issuerKey), serial, digest : Digest method to use for signing, default is md5 """ + (issuerCert, issuerKey) = xxx_todo_changeme + (notBefore, notAfter) = xxx_todo_changeme1 cert = OpenSSL.crypto.X509() cert.set_serial_number(serial) cert.gmtime_adj_notBefore(notBefore) |