diff options
Diffstat (limited to 'libbe/util/wsgi.py')
-rw-r--r-- | libbe/util/wsgi.py | 65 |
1 files changed, 33 insertions, 32 deletions
diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py index c509a48..96ddf60 100644 --- a/libbe/util/wsgi.py +++ b/libbe/util/wsgi.py @@ -32,13 +32,13 @@ import os.path import re import select import signal -import StringIO +import io import sys import time import traceback import types -import urllib -import urlparse +import urllib.request, urllib.parse, urllib.error +import urllib.parse import wsgiref.simple_server try: @@ -109,8 +109,8 @@ class User (object): string = string.strip() fields = string.split(':') if len(fields) != 3: - raise ValueError, '{}!=3 fields in "{}"'.format( - len(fields), string) + raise ValueError('{}!=3 fields in "{}"'.format( + len(fields), string)) self.uname,self.name,self.passhash = fields def __str__(self): @@ -229,7 +229,7 @@ class WSGI_Object (object): def log_request(self, environ, status='-1 OK', bytes=-1): if self.logger is None or self.logger.level > self.log_level: return - req_uri = urllib.quote(environ.get('SCRIPT_NAME', '') + req_uri = urllib.parse.quote(environ.get('SCRIPT_NAME', '') + environ.get('PATH_INFO', '')) if environ.get('QUERY_STRING'): req_uri += '?' + environ['QUERY_STRING'] @@ -276,7 +276,7 @@ class ExceptionApp (WSGI_Middleware): def _call(self, environ, start_response): try: return self.app(environ, start_response) - except Exception, e: + except Exception as e: etype,value,tb = sys.exc_info() trace = ''.join( traceback.format_exception(etype, value, tb, None)) @@ -290,7 +290,7 @@ class HandlerErrorApp (WSGI_Middleware): def _call(self, environ, start_response): try: return self.app(environ, start_response) - except HandlerError, e: + except HandlerError as e: self.log_request(environ, status=str(e), bytes=0) start_response('{} {}'.format(e.code, e.msg), e.headers) return [] @@ -338,7 +338,7 @@ class UppercaseHeaderApp (WSGI_Middleware): http://www.python.org/dev/peps/pep-0333/#id20 """ def _call(self, environ, start_response): - for key,value in environ.items(): + for key,value in list(environ.items()): if key.startswith('HTTP_'): uppercase = key.upper() if uppercase != key: @@ -363,11 +363,11 @@ class WSGI_DataObject (WSGI_Object): if content is None: start_response('200 OK', []) return [] - if type(content) is types.UnicodeType: + if type(content) is str: content = content.encode('utf-8') for i,header in enumerate(headers): header_name,header_value = header - if type(header_value) == types.UnicodeType: + if type(header_value) == str: headers[i] = (header_name, header_value.encode('ISO-8859-1')) response = '200 OK' content_length = len(content) @@ -388,9 +388,9 @@ class WSGI_DataObject (WSGI_Object): def _parse_query(self, query): if len(query) == 0: return {} - data = urlparse.parse_qs( + data = urllib.parse.parse_qs( query, keep_blank_values=True, strict_parsing=True) - for k,v in data.items(): + for k,v in list(data.items()): if len(v) == 1: data[k] = v[0] return data @@ -411,7 +411,7 @@ class WSGI_DataObject (WSGI_Object): clen = 0 if clen != 0: if self.maxlen > 0 and clen > self.maxlen: - raise ValueError, 'Maximum content length exceeded' + raise ValueError('Maximum content length exceeded') return environ['wsgi.input'].read(clen) return '' @@ -722,8 +722,8 @@ class WSGICaller (object): 'SERVER_PROTOCOL':'HTTP/1.1', 'wsgi.version':(1,0), 'wsgi.url_scheme':'http', - 'wsgi.input':StringIO.StringIO(), - 'wsgi.errors':StringIO.StringIO(), + 'wsgi.input':io.StringIO(), + 'wsgi.errors':io.StringIO(), 'wsgi.multithread':False, 'wsgi.multiprocess':False, 'wsgi.run_once':False, @@ -737,17 +737,17 @@ class WSGICaller (object): env['scheme'] = scheme if data_dict is not None: assert data is None, (data, data_dict) - data = urllib.urlencode(data_dict) + data = urllib.parse.urlencode(data_dict) if data is not None: if data_dict is None: assert method == 'POST', (method, data) if method == 'POST': env['CONTENT_LENGTH'] = len(data) - env['wsgi.input'] = StringIO.StringIO(data) + env['wsgi.input'] = io.StringIO(data) else: assert method in ['GET', 'HEAD'], method env['QUERY_STRING'] = data - for key,value in environ.items(): + for key,value in list(environ.items()): env[key] = value return ''.join(app(env, self.start_response)) @@ -760,7 +760,7 @@ class WSGICaller (object): if libbe.TESTING: class WSGITestCase (unittest.TestCase): def setUp(self): - self.logstream = StringIO.StringIO() + self.logstream = io.StringIO() self.logger = logging.getLogger('be-wsgi-test') console = logging.StreamHandler(self.logstream) console.setFormatter(logging.Formatter('%(message)s')) @@ -789,20 +789,20 @@ if libbe.TESTING: error=123, message='Dummy Error', headers=[('X-Dummy-Header','Dummy Value')]) - self.failUnless(contents == ['Dummy Error'], contents) - self.failUnless( + self.assertTrue(contents == ['Dummy Error'], contents) + self.assertTrue( self.caller.status == '123 Dummy Error', self.caller.status) - self.failUnless(self.caller.response_headers == [ + self.assertTrue(self.caller.response_headers == [ ('Content-Type','text/plain'), ('X-Dummy-Header','Dummy Value')], self.caller.response_headers) - self.failUnless(self.caller.exc_info == None, self.caller.exc_info) + self.assertTrue(self.caller.exc_info == None, self.caller.exc_info) def test_log_request(self): self.app.log_request( environ=self.caller.default_environ, status='-1 OK', bytes=123) log = self.logstream.getvalue() - self.failUnless(log.startswith('192.168.0.123 -'), log) + self.assertTrue(log.startswith('192.168.0.123 -'), log) class ExceptionAppTestCase (WSGITestCase): @@ -815,12 +815,12 @@ if libbe.TESTING: def test_traceback(self): try: self.getURL(self.app) - except ValueError, e: + except ValueError as e: pass log = self.logstream.getvalue() - self.failUnless(log.startswith('Traceback'), log) - self.failUnless('child_app' in log, log) - self.failUnless('ValueError: Dummy Error' in log, log) + self.assertTrue(log.startswith('Traceback'), log) + self.assertTrue('child_app' in log, log) + self.assertTrue('ValueError: Dummy Error' in log, log) unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) @@ -887,15 +887,14 @@ def _create_cert_request(pkey, digest="md5", **name): req = OpenSSL.crypto.X509Req() subj = req.get_subject() - for (key,value) in name.items(): + for (key,value) in list(name.items()): setattr(subj, key, value) req.set_pubkey(pkey) req.sign(pkey, digest) return req -def _create_certificate(req, (issuerCert, issuerKey), serial, - (notBefore, notAfter), digest='md5'): +def _create_certificate(req, xxx_todo_changeme, serial, xxx_todo_changeme1, digest='md5'): """Generate a certificate given a certificate request. Returns the signed certificate in an X509 object. @@ -919,6 +918,8 @@ def _create_certificate(req, (issuerCert, issuerKey), serial, digest : Digest method to use for signing, default is md5 """ + (issuerCert, issuerKey) = xxx_todo_changeme + (notBefore, notAfter) = xxx_todo_changeme1 cert = OpenSSL.crypto.X509() cert.set_serial_number(serial) cert.gmtime_adj_notBefore(notBefore) |