aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/util/wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/util/wsgi.py')
-rw-r--r--libbe/util/wsgi.py65
1 files changed, 33 insertions, 32 deletions
diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py
index c509a48..96ddf60 100644
--- a/libbe/util/wsgi.py
+++ b/libbe/util/wsgi.py
@@ -32,13 +32,13 @@ import os.path
import re
import select
import signal
-import StringIO
+import io
import sys
import time
import traceback
import types
-import urllib
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.parse
import wsgiref.simple_server
try:
@@ -109,8 +109,8 @@ class User (object):
string = string.strip()
fields = string.split(':')
if len(fields) != 3:
- raise ValueError, '{}!=3 fields in "{}"'.format(
- len(fields), string)
+ raise ValueError('{}!=3 fields in "{}"'.format(
+ len(fields), string))
self.uname,self.name,self.passhash = fields
def __str__(self):
@@ -229,7 +229,7 @@ class WSGI_Object (object):
def log_request(self, environ, status='-1 OK', bytes=-1):
if self.logger is None or self.logger.level > self.log_level:
return
- req_uri = urllib.quote(environ.get('SCRIPT_NAME', '')
+ req_uri = urllib.parse.quote(environ.get('SCRIPT_NAME', '')
+ environ.get('PATH_INFO', ''))
if environ.get('QUERY_STRING'):
req_uri += '?' + environ['QUERY_STRING']
@@ -276,7 +276,7 @@ class ExceptionApp (WSGI_Middleware):
def _call(self, environ, start_response):
try:
return self.app(environ, start_response)
- except Exception, e:
+ except Exception as e:
etype,value,tb = sys.exc_info()
trace = ''.join(
traceback.format_exception(etype, value, tb, None))
@@ -290,7 +290,7 @@ class HandlerErrorApp (WSGI_Middleware):
def _call(self, environ, start_response):
try:
return self.app(environ, start_response)
- except HandlerError, e:
+ except HandlerError as e:
self.log_request(environ, status=str(e), bytes=0)
start_response('{} {}'.format(e.code, e.msg), e.headers)
return []
@@ -338,7 +338,7 @@ class UppercaseHeaderApp (WSGI_Middleware):
http://www.python.org/dev/peps/pep-0333/#id20
"""
def _call(self, environ, start_response):
- for key,value in environ.items():
+ for key,value in list(environ.items()):
if key.startswith('HTTP_'):
uppercase = key.upper()
if uppercase != key:
@@ -363,11 +363,11 @@ class WSGI_DataObject (WSGI_Object):
if content is None:
start_response('200 OK', [])
return []
- if type(content) is types.UnicodeType:
+ if type(content) is str:
content = content.encode('utf-8')
for i,header in enumerate(headers):
header_name,header_value = header
- if type(header_value) == types.UnicodeType:
+ if type(header_value) == str:
headers[i] = (header_name, header_value.encode('ISO-8859-1'))
response = '200 OK'
content_length = len(content)
@@ -388,9 +388,9 @@ class WSGI_DataObject (WSGI_Object):
def _parse_query(self, query):
if len(query) == 0:
return {}
- data = urlparse.parse_qs(
+ data = urllib.parse.parse_qs(
query, keep_blank_values=True, strict_parsing=True)
- for k,v in data.items():
+ for k,v in list(data.items()):
if len(v) == 1:
data[k] = v[0]
return data
@@ -411,7 +411,7 @@ class WSGI_DataObject (WSGI_Object):
clen = 0
if clen != 0:
if self.maxlen > 0 and clen > self.maxlen:
- raise ValueError, 'Maximum content length exceeded'
+ raise ValueError('Maximum content length exceeded')
return environ['wsgi.input'].read(clen)
return ''
@@ -722,8 +722,8 @@ class WSGICaller (object):
'SERVER_PROTOCOL':'HTTP/1.1',
'wsgi.version':(1,0),
'wsgi.url_scheme':'http',
- 'wsgi.input':StringIO.StringIO(),
- 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.input':io.StringIO(),
+ 'wsgi.errors':io.StringIO(),
'wsgi.multithread':False,
'wsgi.multiprocess':False,
'wsgi.run_once':False,
@@ -737,17 +737,17 @@ class WSGICaller (object):
env['scheme'] = scheme
if data_dict is not None:
assert data is None, (data, data_dict)
- data = urllib.urlencode(data_dict)
+ data = urllib.parse.urlencode(data_dict)
if data is not None:
if data_dict is None:
assert method == 'POST', (method, data)
if method == 'POST':
env['CONTENT_LENGTH'] = len(data)
- env['wsgi.input'] = StringIO.StringIO(data)
+ env['wsgi.input'] = io.StringIO(data)
else:
assert method in ['GET', 'HEAD'], method
env['QUERY_STRING'] = data
- for key,value in environ.items():
+ for key,value in list(environ.items()):
env[key] = value
return ''.join(app(env, self.start_response))
@@ -760,7 +760,7 @@ class WSGICaller (object):
if libbe.TESTING:
class WSGITestCase (unittest.TestCase):
def setUp(self):
- self.logstream = StringIO.StringIO()
+ self.logstream = io.StringIO()
self.logger = logging.getLogger('be-wsgi-test')
console = logging.StreamHandler(self.logstream)
console.setFormatter(logging.Formatter('%(message)s'))
@@ -789,20 +789,20 @@ if libbe.TESTING:
error=123,
message='Dummy Error',
headers=[('X-Dummy-Header','Dummy Value')])
- self.failUnless(contents == ['Dummy Error'], contents)
- self.failUnless(
+ self.assertTrue(contents == ['Dummy Error'], contents)
+ self.assertTrue(
self.caller.status == '123 Dummy Error', self.caller.status)
- self.failUnless(self.caller.response_headers == [
+ self.assertTrue(self.caller.response_headers == [
('Content-Type','text/plain'),
('X-Dummy-Header','Dummy Value')],
self.caller.response_headers)
- self.failUnless(self.caller.exc_info == None, self.caller.exc_info)
+ self.assertTrue(self.caller.exc_info == None, self.caller.exc_info)
def test_log_request(self):
self.app.log_request(
environ=self.caller.default_environ, status='-1 OK', bytes=123)
log = self.logstream.getvalue()
- self.failUnless(log.startswith('192.168.0.123 -'), log)
+ self.assertTrue(log.startswith('192.168.0.123 -'), log)
class ExceptionAppTestCase (WSGITestCase):
@@ -815,12 +815,12 @@ if libbe.TESTING:
def test_traceback(self):
try:
self.getURL(self.app)
- except ValueError, e:
+ except ValueError as e:
pass
log = self.logstream.getvalue()
- self.failUnless(log.startswith('Traceback'), log)
- self.failUnless('child_app' in log, log)
- self.failUnless('ValueError: Dummy Error' in log, log)
+ self.assertTrue(log.startswith('Traceback'), log)
+ self.assertTrue('child_app' in log, log)
+ self.assertTrue('ValueError: Dummy Error' in log, log)
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
@@ -887,15 +887,14 @@ def _create_cert_request(pkey, digest="md5", **name):
req = OpenSSL.crypto.X509Req()
subj = req.get_subject()
- for (key,value) in name.items():
+ for (key,value) in list(name.items()):
setattr(subj, key, value)
req.set_pubkey(pkey)
req.sign(pkey, digest)
return req
-def _create_certificate(req, (issuerCert, issuerKey), serial,
- (notBefore, notAfter), digest='md5'):
+def _create_certificate(req, xxx_todo_changeme, serial, xxx_todo_changeme1, digest='md5'):
"""Generate a certificate given a certificate request.
Returns the signed certificate in an X509 object.
@@ -919,6 +918,8 @@ def _create_certificate(req, (issuerCert, issuerKey), serial,
digest :
Digest method to use for signing, default is md5
"""
+ (issuerCert, issuerKey) = xxx_todo_changeme
+ (notBefore, notAfter) = xxx_todo_changeme1
cert = OpenSSL.crypto.X509()
cert.set_serial_number(serial)
cert.gmtime_adj_notBefore(notBefore)