aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libbe/command/serve.py994
-rw-r--r--libbe/command/serve_commands.py208
-rw-r--r--libbe/storage/http.py10
-rw-r--r--libbe/util/wsgi.py962
4 files changed, 1054 insertions, 1120 deletions
diff --git a/libbe/command/serve.py b/libbe/command/serve.py
index 4057536..d1c7b9a 100644
--- a/libbe/command/serve.py
+++ b/libbe/command/serve.py
@@ -23,49 +23,21 @@ See Also
:mod:`libbe.storage.http` : the associated client
"""
-import hashlib
import logging
import os.path
-import posixpath
-import re
-import sys
-import time
-import traceback
-import types
-import urllib
-import wsgiref.simple_server
-try:
- # Python >= 2.6
- from urlparse import parse_qs
-except ImportError:
- # Python <= 2.5
- from cgi import parse_qs
-try:
- import cherrypy
- import cherrypy.wsgiserver
-except ImportError:
- cherrypy = None
-if cherrypy != None:
- try: # CherryPy >= 3.2
- import cherrypy.wsgiserver.ssl_builtin
- except ImportError: # CherryPy <= 3.1.X
- cherrypy.wsgiserver.ssl_builtin = None
-try:
- import OpenSSL
-except ImportError:
- OpenSSL = None
import libbe
import libbe.command
import libbe.command.util
-import libbe.util.encoding
import libbe.util.subproc
+import libbe.util.wsgi
import libbe.version
-if libbe.TESTING == True:
+if libbe.TESTING:
import copy
import doctest
import StringIO
+ import sys
import unittest
import wsgiref.validate
try:
@@ -75,419 +47,21 @@ if libbe.TESTING == True:
cherrypy_test_webtest = None
import libbe.bugdir
-
-class _HandlerError (Exception):
- def __init__(self, code, msg, headers=[]):
- Exception.__init__(self, '%d %s' % (code, msg))
- self.code = code
- self.msg = msg
- self.headers = headers
+ import libbe.util.wsgi
-class _Unauthenticated (_HandlerError):
- def __init__(self, realm, msg='User Not Authenticated', headers=[]):
- _HandlerError.__init__(self, 401, msg, headers+[
- ('WWW-Authenticate','Basic realm="%s"' % realm)])
-class _Unauthorized (_HandlerError):
- def __init__(self, msg='User Not Authorized', headers=[]):
- _HandlerError.__init__(self, 403, msg, headers)
+# return callback(environ, start_response)
+# except libbe.storage.NotReadable, e:
+# raise libbe.util.wsgi.HandlerError(403, 'Read permission denied')
+# except libbe.storage.NotWriteable, e:
+# raise libbe.util.wsgi.HandlerError(403, 'Write permission denied')
+# except libbe.storage.InvalidID, e:
+# raise libbe.util.wsgi.HandlerError(
+# self.http_user_error, 'InvalidID %s' % e)
-class User (object):
- def __init__(self, uname=None, name=None, passhash=None, password=None):
- self.uname = uname
- self.name = name
- self.passhash = passhash
- if passhash == None:
- if password != None:
- self.passhash = self.hash(password)
- else:
- assert password == None, \
- 'Redundant password %s with passhash %s' % (password, passhash)
- self.users = None
- def from_string(self, string):
- string = string.strip()
- fields = string.split(':')
- if len(fields) != 3:
- raise ValueError, '%d!=3 fields in "%s"' % (len(fields), string)
- self.uname,self.name,self.passhash = fields
- def __str__(self):
- return ':'.join([self.uname, self.name, self.passhash])
- def __cmp__(self, other):
- return cmp(self.uname, other.uname)
- def hash(self, password):
- return hashlib.sha1(password).hexdigest()
- def valid_login(self, password):
- if self.hash(password) == self.passhash:
- return True
- return False
- def set_name(self, name):
- self._set_property('name', name)
- def set_password(self, password):
- self._set_property('passhash', self.hash(password))
- def _set_property(self, property, value):
- if self.uname == 'guest':
- raise _Unauthorized('guest user not allowed to change %s' % property)
- if getattr(self, property) != value \
- and self.users != None:
- self.users.changed = True
- setattr(self, property, value)
-
-class Users (dict):
- def __init__(self, filename=None):
- dict.__init__(self)
- self.filename = filename
- self.changed = False
- def load(self):
- if self.filename == None:
- return
- user_file = libbe.util.encoding.get_file_contents(
- self.filename, decode=True)
- self.clear()
- for line in user_file.splitlines():
- user = User()
- user.from_string(line)
- self.add_user(user)
- def save(self):
- if self.filename != None and self.changed == True:
- lines = []
- for user in sorted(self.users):
- lines.append(str(user))
- libbe.util.encoding.set_file_contents(self.filename)
- self.changed = False
- def add_user(self, user):
- assert user.users == None, user.users
- user.users = self
- self[user.uname] = user
- def valid_login(self, uname, password):
- if uname in self and \
- self[uname].valid_login(password) == True:
- return True
- return False
-
-class WSGI_Object (object):
- """Utility class for WGSI clients and middleware.
-
- For details on WGSI, see `PEP 333`_
-
- .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
- """
- def __init__(self, logger=None, log_level=logging.INFO, log_format=None):
- self.logger = logger
- self.log_level = log_level
- if log_format == None:
- self.log_format = (
- '%(REMOTE_ADDR)s - %(REMOTE_USER)s [%(time)s] '
- '"%(REQUEST_METHOD)s %(REQUEST_URI)s %(HTTP_VERSION)s" '
- '%(status)s %(bytes)s "%(HTTP_REFERER)s" "%(HTTP_USER_AGENT)s"')
- else:
- self.log_format = log_format
-
- def __call__(self, environ, start_response):
- """The main WSGI entry point."""
- raise NotImplementedError
- # start_response() is a callback for setting response headers
- # start_response(status, response_headers, exc_info=None)
- # status is an HTTP status string (e.g., "200 OK").
- # response_headers is a list of 2-tuples, the HTTP headers in
- # key-value format.
- # exc_info is used in exception handling.
- #
- # The application function then returns an iterable of body chunks.
-
- def error(self, environ, start_response, error, message, headers=[]):
- """Make it easy to call start_response for errors."""
- response = '%d %s' % (error, message)
- self.log_request(environ, status=response, bytes=len(message))
- start_response(response,
- [('Content-Type', 'text/plain')]+headers)
- return [message]
-
- def log_request(self, environ, status='-1 OK', bytes=-1):
- if self.logger is None or self.logger.level > self.log_level:
- return
- req_uri = urllib.quote(environ.get('SCRIPT_NAME', '')
- + environ.get('PATH_INFO', ''))
- if environ.get('QUERY_STRING'):
- req_uri += '?'+environ['QUERY_STRING']
- start = time.localtime()
- if time.daylight:
- offset = time.altzone / 60 / 60 * -100
- else:
- offset = time.timezone / 60 / 60 * -100
- if offset >= 0:
- offset = "+%0.4d" % (offset)
- elif offset < 0:
- offset = "%0.4d" % (offset)
- d = {
- 'REMOTE_ADDR': environ.get('REMOTE_ADDR') or '-',
- 'REMOTE_USER': environ.get('REMOTE_USER') or '-',
- 'REQUEST_METHOD': environ['REQUEST_METHOD'],
- 'REQUEST_URI': req_uri,
- 'HTTP_VERSION': environ.get('SERVER_PROTOCOL'),
- 'time': time.strftime('%d/%b/%Y:%H:%M:%S ', start) + offset,
- 'status': status.split(None, 1)[0],
- 'bytes': bytes,
- 'HTTP_REFERER': environ.get('HTTP_REFERER', '-'),
- 'HTTP_USER_AGENT': environ.get('HTTP_USER_AGENT', '-'),
- }
- self.logger.log(self.log_level, self.log_format % d)
-
-class ExceptionApp (WSGI_Object):
- """Some servers (e.g. cherrypy) eat app-raised exceptions.
-
- Work around that by logging tracebacks by hand.
- """
- def __init__(self, app, *args, **kwargs):
- WSGI_Object.__init__(self, *args, **kwargs)
- self.app = app
-
- def __call__(self, environ, start_response):
- if self.logger != None:
- self.logger.log(logging.DEBUG, 'ExceptionApp')
- try:
- return self.app(environ, start_response)
- except Exception, e:
- etype,value,tb = sys.exc_info()
- trace = ''.join(
- traceback.format_exception(etype, value, tb, None))
- self.logger.log(self.log_level, trace)
- raise
-
-class UppercaseHeaderApp (WSGI_Object):
- """WSGI middleware that uppercases incoming HTTP headers.
-
- From PEP 333, `The start_response() Callable`_ :
-
- A reminder for server/gateway authors: HTTP
- header names are case-insensitive, so be sure
- to take that into consideration when examining
- application-supplied headers!
-
- .. _The start_response() Callable:
- http://www.python.org/dev/peps/pep-0333/#id20
- """
- def __init__(self, app, *args, **kwargs):
- WSGI_Object.__init__(self, *args, **kwargs)
- self.app = app
-
- def __call__(self, environ, start_response):
- if self.logger != None:
- self.logger.log(logging.DEBUG, 'UppercaseHeaderApp')
- for key,value in environ.items():
- if key.startswith('HTTP_'):
- uppercase = key.upper()
- if uppercase != key:
- environ[uppercase] = environ.pop(key)
- return self.app(environ, start_response)
-
-class AuthenticationApp (WSGI_Object):
- """WSGI middleware for handling user authentication.
- """
- def __init__(self, app, realm, setting='be-auth', users=None, *args, **kwargs):
- WSGI_Object.__init__(self, *args, **kwargs)
- self.app = app
- self.realm = realm
- self.setting = setting
- self.users = users
-
- def __call__(self, environ, start_response):
- if self.logger != None:
- self.logger.log(logging.DEBUG, 'AuthenticationApp')
- environ['%s.realm' % self.setting] = self.realm
- try:
- username = self.authenticate(environ)
- environ['%s.user' % self.setting] = username
- environ['%s.user.name' % self.setting] = \
- self.users[username].name
- return self.app(environ, start_response)
- except _Unauthorized, e:
- return self.error(environ, start_response,
- e.code, e.msg, e.headers)
-
- def authenticate(self, environ):
- """Handle user-authentication sent in the "Authorization" header.
-
- This function implements ``Basic`` authentication as described in
- HTTP/1.0 specification [1]_ . Do not use this module unless you
- are using SSL, as it transmits unencrypted passwords.
-
- .. [1] http://www.w3.org/Protocols/HTTP/1.0/draft-ietf-http-spec.html#BasicAA
-
- Examples
- --------
-
- >>> users = Users()
- >>> users.add_user(User('Aladdin', 'Big Al', password='open sesame'))
- >>> app = AuthenticationApp(app=None, realm='Dummy Realm', users=users)
- >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='})
- 'Aladdin'
- >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic AAAAAAAAAAAAAAAAAAAAAAAAAA=='})
-
- Notes
- -----
- Code based on authkit/authenticate/basic.py
- (c) 2005 Clark C. Evans.
- Released under the MIT License:
- http://www.opensource.org/licenses/mit-license.php
- """
- authorization = environ.get('HTTP_AUTHORIZATION', None)
- if authorization == None:
- raise _Unauthorized('Authorization required')
- try:
- authmeth,auth = authorization.split(' ',1)
- except ValueError:
- return None
- if 'basic' != authmeth.lower():
- return None # non-basic HTTP authorization not implemented
- auth = auth.strip().decode('base64')
- try:
- username,password = auth.split(':',1)
- except ValueError:
- return None
- if self.authfunc(environ, username, password) == True:
- return username
-
- def authfunc(self, environ, username, password):
- if not username in self.users:
- return False
- if self.users[username].valid_login(password) == True:
- if self.logger != None:
- self.logger.log(self.log_level,
- 'Authenticated %s' % self.users[username].name)
- return True
- return False
-
-class WSGI_AppObject (WSGI_Object):
- """Useful WSGI utilities for handling data (POST, QUERY) and
- returning responses.
- """
- def __init__(self, *args, **kwargs):
- WSGI_Object.__init__(self, *args, **kwargs)
-
- # Maximum input we will accept when REQUEST_METHOD is POST
- # 0 ==> unlimited input
- self.maxlen = 0
-
- def ok_response(self, environ, start_response, content,
- content_type='application/octet-stream',
- headers=[]):
- if content == None:
- start_response('200 OK', [])
- return []
- if type(content) == types.UnicodeType:
- content = content.encode('utf-8')
- for i,header in enumerate(headers):
- header_name,header_value = header
- if type(header_value) == types.UnicodeType:
- headers[i] = (header_name, header_value.encode('ISO-8859-1'))
- response = '200 OK'
- content_length = len(content)
- self.log_request(environ, status=response, bytes=content_length)
- start_response('200 OK', [
- ('Content-Type', content_type),
- ('Content-Length', str(content_length)),
- ]+headers)
- if self.is_head(environ) == True:
- return []
- return [content]
-
- def query_data(self, environ):
- if not environ['REQUEST_METHOD'] in ['GET', 'HEAD']:
- raise _HandlerError(404, 'Not Found')
- return self._parse_query(environ.get('QUERY_STRING', ''))
-
- def _parse_query(self, query):
- if len(query) == 0:
- return {}
- data = parse_qs(
- query, keep_blank_values=True, strict_parsing=True)
- for k,v in data.items():
- if len(v) == 1:
- data[k] = v[0]
- return data
-
- def post_data(self, environ):
- if environ['REQUEST_METHOD'] != 'POST':
- raise _HandlerError(404, 'Not Found')
- post_data = self._read_post_data(environ)
- return self._parse_post(post_data)
-
- def _parse_post(self, post):
- return self._parse_query(post)
-
- def _read_post_data(self, environ):
- try:
- clen = int(environ.get('CONTENT_LENGTH', '0'))
- except ValueError:
- clen = 0
- if clen != 0:
- if self.maxlen > 0 and clen > self.maxlen:
- raise ValueError, 'Maximum content length exceeded'
- return environ['wsgi.input'].read(clen)
- return ''
-
- def data_get_string(self, data, key, default=None, source='query'):
- if not key in data or data[key] in [None, 'None']:
- if default == _HandlerError:
- raise _HandlerError(406, 'Missing %s key %s' % (source, key))
- return default
- return data[key]
-
- def data_get_id(self, data, key='id', default=_HandlerError,
- source='query'):
- return self.data_get_string(data, key, default, source)
-
- def data_get_boolean(self, data, key, default=False, source='query'):
- val = self.data_get_string(data, key, default, source)
- if val == 'True':
- return True
- elif val == 'False':
- return False
- return val
-
- def is_head(self, environ):
- return environ['REQUEST_METHOD'] == 'HEAD'
-
-
-class AdminApp (WSGI_AppObject):
- """WSGI middleware for managing users (changing passwords,
- usernames, etc.).
- """
- def __init__(self, app, users=None, url=r'^admin/?', *args, **kwargs):
- WSGI_AppObject.__init__(self, *args, **kwargs)
- self.app = app
- self.users = users
- self.url = url
-
- def __call__(self, environ, start_response):
- if self.logger != None:
- self.logger.log(logging.DEBUG, 'AdminApp')
- path = environ.get('PATH_INFO', '').lstrip('/')
- match = re.search(self.url, path)
- if match is not None:
- return self.admin(environ, start_response)
- return self.app(environ, start_response)
-
- def admin(self, environ, start_response):
- if not 'be-auth.user' in environ:
- raise _Unauthenticated(realm=envirion.get('be-auth.realm'))
- uname = environ.get('be-auth.user')
- user = self.users[uname]
- data = self.post_data(environ)
- source = 'post'
- name = self.data_get_string(
- data, 'name', default=None, source=source)
- if name != None:
- self.users[uname].set_name(name)
- password = self.data_get_string(
- data, 'password', default=None, source=source)
- if password != None:
- self.users[uname].set_password(password)
- self.users.save()
- return self.ok_response(environ, start_response, None)
-
-class ServerApp (WSGI_AppObject):
+class ServerApp (libbe.util.wsgi.WSGI_AppObject,
+ libbe.util.wsgi.WSGI_DataObject):
"""WSGI server for a BE Storage instance over HTTP.
RESTful_ WSGI request handler for serving the
@@ -509,59 +83,26 @@ class ServerApp (WSGI_AppObject):
"""
server_version = "BE-server/" + libbe.version.version()
- def __init__(self, storage, notify=False, **kwargs):
- WSGI_AppObject.__init__(self, **kwargs)
+ def __init__(self, storage=None, notify=False, **kwargs):
+ super(ServerApp, self).__init__(
+ urls=[
+ (r'^add/?', self.add),
+ (r'^exists/?', self.exists),
+ (r'^remove/?', self.remove),
+ (r'^ancestors/?', self.ancestors),
+ (r'^children/?', self.children),
+ (r'^get/(.+)', self.get),
+ (r'^set/(.+)', self.set),
+ (r'^commit/?', self.commit),
+ (r'^revision-id/?', self.revision_id),
+ (r'^changed/?', self.changed),
+ (r'^version/?', self.version),
+ ],
+ **kwargs)
self.storage = storage
self.notify = notify
self.http_user_error = 418
- self.urls = [
- (r'^add/?', self.add),
- (r'^exists/?', self.exists),
- (r'^remove/?', self.remove),
- (r'^ancestors/?', self.ancestors),
- (r'^children/?', self.children),
- (r'^get/(.+)', self.get),
- (r'^set/(.+)', self.set),
- (r'^commit/?', self.commit),
- (r'^revision-id/?', self.revision_id),
- (r'^changed/?', self.changed),
- (r'^version/?', self.version),
- ]
-
- def __call__(self, environ, start_response):
- """The main WSGI application.
-
- Dispatch the current request to the functions from above and
- store the regular expression captures in the WSGI environment
- as `be-server.url_args` so that the functions from above can
- access the url placeholders.
-
- URL dispatcher from Armin Ronacher's "Getting Started with WSGI"
- http://lucumr.pocoo.org/2007/5/21/getting-started-with-wsgi
- """
- if self.logger != None:
- self.logger.log(logging.DEBUG, 'ServerApp')
- path = environ.get('PATH_INFO', '').lstrip('/')
- try:
- for regex, callback in self.urls:
- match = re.search(regex, path)
- if match is not None:
- environ['be-server.url_args'] = match.groups()
- try:
- return callback(environ, start_response)
- except libbe.storage.NotReadable, e:
- raise _HandlerError(403, 'Read permission denied')
- except libbe.storage.NotWriteable, e:
- raise _HandlerError(403, 'Write permission denied')
- except libbe.storage.InvalidID, e:
- raise _HandlerError(
- self.http_user_error, 'InvalidID %s' % e)
- raise _HandlerError(404, 'Not Found')
- except _HandlerError, e:
- return self.error(environ, start_response,
- e.code, e.msg, e.headers)
-
# handlers
def add(self, environ, start_response):
self.check_login(environ)
@@ -630,7 +171,7 @@ class ServerApp (WSGI_AppObject):
try:
id = environ['be-server.url_args'][0]
except:
- raise _HandlerError(404, 'Not Found')
+ raise libbe.util.wsgi.HandlerError(404, 'Not Found')
revision = self.data_get_string(
data, 'revision', default=None, source=source)
content = self.storage.get(id, revision=revision)
@@ -644,9 +185,9 @@ class ServerApp (WSGI_AppObject):
try:
id = environ['be-server.url_args'][0]
except:
- raise _HandlerError(404, 'Not Found')
+ raise libbe.util.wsgi.HandlerError(404, 'Not Found')
if not 'value' in data:
- raise _HandlerError(406, 'Missing query key value')
+ raise libbe.util.wsgi.HandlerError(406, 'Missing query key value')
value = data['value']
self.storage.set(id, value)
if self.notify:
@@ -657,7 +198,8 @@ class ServerApp (WSGI_AppObject):
self.check_login(environ)
data = self.post_data(environ)
if not 'summary' in data:
- raise _HandlerError(406, 'Missing query key summary')
+ raise libbe.util.wsgi.HandlerError(
+ 406, 'Missing query key summary')
summary = data['summary']
if not 'body' in data or data['body'] == 'None':
data['body'] = None
@@ -670,7 +212,8 @@ class ServerApp (WSGI_AppObject):
try:
revision = self.storage.commit(summary, body, allow_empty)
except libbe.storage.EmptyCommit, e:
- raise _HandlerError(self.http_user_error, 'EmptyCommit')
+ raise libbe.util.wsgi.HandlerError(
+ self.http_user_error, 'EmptyCommit')
if self.notify:
self._notify(environ, 'commit', id,
[('allow_empty', allow_empty), ('summary', summary),
@@ -682,7 +225,8 @@ class ServerApp (WSGI_AppObject):
data = self.query_data(environ)
source = 'query'
index = int(self.data_get_string(
- data, 'index', default=_HandlerError, source=source))
+ data, 'index', default=libbe.util.wsgi.HandlerError,
+ source=source))
content = self.storage.revision_id(index)
return self.ok_response(environ, start_response, content)
@@ -708,7 +252,7 @@ class ServerApp (WSGI_AppObject):
# handler utility functions
def check_login(self, environ):
user = environ.get('be-auth.user', None)
- if user != None: # we're running under AuthenticationApp
+ if user is not None: # we're running under AuthenticationApp
if environ['REQUEST_METHOD'] == 'POST':
if user == 'guest' or self.storage.is_writeable() == False:
raise _Unauthorized() # only non-guests allowed to write
@@ -744,12 +288,7 @@ class ServerApp (WSGI_AppObject):
libbe.util.subproc.invoke(self.notify, stdin=message, shell=True)
-class SilentRequestHandler (wsgiref.simple_server.WSGIRequestHandler):
- def log_message(self, format, *args):
- pass
-
-
-class Serve (libbe.command.Command):
+class Serve (libbe.util.wsgi.ServerCommand):
"""Serve bug directory storage over HTTP.
This allows you to run local `be` commands interfacing with remote
@@ -761,120 +300,9 @@ class Serve (libbe.command.Command):
name = 'serve'
- def __init__(self, *args, **kwargs):
- libbe.command.Command.__init__(self, *args, **kwargs)
- self.options.extend([
- libbe.command.Option(name='port',
- help='Bind server to port (%default)',
- arg=libbe.command.Argument(
- name='port', metavar='INT', type='int', default=8000)),
- libbe.command.Option(name='host',
- help='Set host string (blank for localhost, %default)',
- arg=libbe.command.Argument(
- name='host', metavar='HOST', default='')),
- libbe.command.Option(name='read-only', short_name='r',
- help='Dissable operations that require writing'),
- libbe.command.Option(name='notify', short_name='n',
- help='Send notification emails for changes.',
- arg=libbe.command.Argument(
- name='notify', metavar='EMAIL-COMMAND', default=None)),
- libbe.command.Option(name='ssl', short_name='s',
- help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'),
- libbe.command.Option(name='auth', short_name='a',
- help='Require authentication. FILE should be a file containing colon-separated UNAME:USER:sha1(PASSWORD) lines, for example: "jdoe:John Doe <jdoe@example.com>:read:d99f8e5a4b02dc25f49da2ea67c0034f61779e72"',
- arg=libbe.command.Argument(
- name='auth', metavar='FILE', default=None,
- completion_callback=libbe.command.util.complete_path)),
- ])
-
- def _run(self, **params):
- self._setup_logging()
- storage = self._get_storage()
- if params['read-only'] == True:
- writeable = storage.writeable
- storage.writeable = False
- if params['host'] == '':
- params['host'] = 'localhost'
- if params['auth'] != None:
- self._check_restricted_access(storage, params['auth'])
- users = Users(params['auth'])
- users.load()
- app = ServerApp(
- storage=storage, notify=params['notify'], logger=self.logger)
- if params['auth'] != None:
- app = AdminApp(app, users=users, logger=self.logger)
- app = AuthenticationApp(app, realm=storage.repo,
- users=users, logger=self.logger)
- app = UppercaseHeaderApp(app, logger=self.logger)
- server,details = self._get_server(params, app)
- details['repo'] = storage.repo
- try:
- self._start_server(params, server, details)
- except KeyboardInterrupt:
- pass
- self._stop_server(params, server)
- if params['read-only'] == True:
- storage.writeable = writeable
-
- def _setup_logging(self, log_level=logging.INFO):
- self.logger = logging.getLogger('be-serve')
- self.log_level = logging.INFO
- console = logging.StreamHandler(self.stdout)
- console.setFormatter(logging.Formatter('%(message)s'))
- self.logger.addHandler(console)
- self.logger.propagate = False
- if log_level is not None:
- console.setLevel(log_level)
- self.logger.setLevel(log_level)
-
- def _get_server(self, params, app):
- details = {'port':params['port']}
- app = ExceptionApp(app, logger=self.logger)
- if params['ssl'] == True:
- details['protocol'] = 'HTTPS'
- if cherrypy == None:
- raise libbe.command.UserError, \
- '--ssl requires the cherrypy module'
- server = cherrypy.wsgiserver.CherryPyWSGIServer(
- (params['host'], params['port']), app)
- #server.throw_errors = True
- #server.show_tracebacks = True
- private_key,certificate = get_cert_filenames(
- 'be-server', logger=self.logger)
- if cherrypy.wsgiserver.ssl_builtin == None:
- server.ssl_module = 'builtin'
- server.ssl_private_key = private_key
- server.ssl_certificate = certificate
- else:
- server.ssl_adapter = \
- cherrypy.wsgiserver.ssl_builtin.BuiltinSSLAdapter(
- certificate=certificate, private_key=private_key)
- details['socket-name'] = params['host']
- else:
- details['protocol'] = 'HTTP'
- server = wsgiref.simple_server.make_server(
- params['host'], params['port'], app,
- handler_class=SilentRequestHandler)
- details['socket-name'] = server.socket.getsockname()[0]
- return (server, details)
-
- def _start_server(self, params, server, details):
- self.logger.log(self.log_level,
- 'Serving %(protocol)s on %(socket-name)s port %(port)s ...' \
- % details)
- self.logger.log(self.log_level,
- 'BE repository %(repo)s' % details)
- if params['ssl'] == True:
- server.start()
- else:
- server.serve_forever()
-
- def _stop_server(self, params, server):
- self.logger.log(self.log_level, 'Clossing server')
- if params['ssl'] == True:
- server.stop()
- else:
- server.server_close()
+ def _get_app(self, logger, storage, **kwargs):
+ return ServerApp(
+ logger=logger, storage=storage, notify=kwargs.get('notify', False))
def _long_help(self):
return """
@@ -895,208 +323,26 @@ for example::
$ be --repo http://username:password@localhost:8000/ list
"""
-def random_string(length=256):
- if os.path.exists(os.path.join('dev', 'urandom')):
- return open("/dev/urandom").read(length)
- else:
- import array
- from random import randint
- d = array.array('B')
- for i in xrange(1000000):
- d.append(randint(0,255))
- return d.tostring()
-
-if libbe.TESTING == True:
- class WSGITestCase (unittest.TestCase):
- def setUp(self):
- self.logstream = StringIO.StringIO()
- self.logger = logging.getLogger('be-serve-test')
- console = logging.StreamHandler(self.logstream)
- console.setFormatter(logging.Formatter('%(message)s'))
- self.logger.addHandler(console)
- self.logger.propagate = False
- console.setLevel(logging.INFO)
- self.logger.setLevel(logging.INFO)
- self.default_environ = { # required by PEP 333
- 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
- 'REMOTE_ADDR': '192.168.0.123',
- 'SCRIPT_NAME':'',
- 'PATH_INFO': '',
- #'QUERY_STRING':'', # may be empty or absent
- #'CONTENT_TYPE':'', # may be empty or absent
- #'CONTENT_LENGTH':'', # may be empty or absent
- 'SERVER_NAME':'example.com',
- 'SERVER_PORT':'80',
- 'SERVER_PROTOCOL':'HTTP/1.1',
- 'wsgi.version':(1,0),
- 'wsgi.url_scheme':'http',
- 'wsgi.input':StringIO.StringIO(),
- 'wsgi.errors':StringIO.StringIO(),
- 'wsgi.multithread':False,
- 'wsgi.multiprocess':False,
- 'wsgi.run_once':False,
- }
- def getURL(self, app, path='/', method='GET', data=None,
- data_dict=None, scheme='http', environ={}):
- env = copy.copy(self.default_environ)
- env['PATH_INFO'] = path
- env['REQUEST_METHOD'] = method
- env['scheme'] = scheme
- if data_dict is not None:
- assert data is None, (data, data_dict)
- data = urllib.urlencode(data_dict)
- if data is not None:
- if data_dict is None:
- assert method == 'POST', (method, data)
- if method == 'POST':
- env['CONTENT_LENGTH'] = len(data)
- env['wsgi.input'] = StringIO.StringIO(data)
- else:
- assert method in ['GET', 'HEAD'], method
- env['QUERY_STRING'] = data
- for key,value in environ.items():
- env[key] = value
- return ''.join(app(env, self.start_response))
- def start_response(self, status, response_headers, exc_info=None):
- self.status = status
- self.response_headers = response_headers
- self.exc_info = exc_info
-
- class WSGI_ObjectTestCase (WSGITestCase):
- def setUp(self):
- WSGITestCase.setUp(self)
- self.app = WSGI_Object(self.logger)
- def test_error(self):
- contents = self.app.error(
- environ=self.default_environ,
- start_response=self.start_response,
- error=123,
- message='Dummy Error',
- headers=[('X-Dummy-Header','Dummy Value')])
- self.failUnless(contents == ['Dummy Error'], contents)
- self.failUnless(self.status == '123 Dummy Error', self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type','text/plain'),
- ('X-Dummy-Header','Dummy Value')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- def test_log_request(self):
- self.app.log_request(
- environ=self.default_environ, status='-1 OK', bytes=123)
- log = self.logstream.getvalue()
- self.failUnless(log.startswith('192.168.0.123 -'), log)
-
- class ExceptionAppTestCase (WSGITestCase):
- def setUp(self):
- WSGITestCase.setUp(self)
- def child_app(environ, start_response):
- raise ValueError('Dummy Error')
- self.app = ExceptionApp(child_app, self.logger)
- def test_traceback(self):
- try:
- self.getURL(self.app)
- except ValueError, e:
- pass
- log = self.logstream.getvalue()
- self.failUnless(log.startswith('Traceback'), log)
- self.failUnless('child_app' in log, log)
- self.failUnless('ValueError: Dummy Error' in log, log)
-
- class AdminAppTestCase (WSGITestCase):
- def setUp(self):
- WSGITestCase.setUp(self)
- self.users = Users()
- self.users.add_user(
- User('Aladdin', 'Big Al', password='open sesame'))
- self.users.add_user(
- User('guest', 'Guest', password='guestpass'))
- def child_app(environ, start_response):
- pass
- self.app = AdminApp(
- child_app, users=self.users, logger=self.logger)
- self.app = AuthenticationApp(
- self.app, realm='Dummy Realm', users=self.users,
- logger=self.logger)
- self.app = UppercaseHeaderApp(self.app, logger=self.logger)
- def basic_auth(self, uname, password):
- """HTTP basic authorization string"""
- return 'Basic %s' % \
- ('%s:%s' % (uname, password)).encode('base64')
- def test_new_name(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'name':'Prince Al'},
- environ={'HTTP_Authorization':
- self.basic_auth('Aladdin', 'open sesame')})
- self.failUnless(self.status == '200 OK', self.status)
- self.failUnless(self.response_headers == [],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['Aladdin'].name == 'Prince Al',
- self.users['Aladdin'].name)
- self.failUnless(self.users.changed == True,
- self.users.changed)
- def test_new_password(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'password':'New Pass'},
- environ={'HTTP_Authorization':
- self.basic_auth('Aladdin', 'open sesame')})
- self.failUnless(self.status == '200 OK', self.status)
- self.failUnless(self.response_headers == [],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['Aladdin'].passhash == \
- self.users['Aladdin'].hash('New Pass'),
- self.users['Aladdin'].passhash)
- self.failUnless(self.users.changed == True,
- self.users.changed)
- def test_guest_name(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'name':'SPAM'},
- environ={'HTTP_Authorization':
- self.basic_auth('guest', 'guestpass')})
- self.failUnless(self.status.startswith('403 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['guest'].name == 'Guest',
- self.users['guest'].name)
- self.failUnless(self.users.changed == False,
- self.users.changed)
- def test_guest_password(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'password':'SPAM'},
- environ={'HTTP_Authorization':
- self.basic_auth('guest', 'guestpass')})
- self.failUnless(self.status.startswith('403 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['guest'].name == 'Guest',
- self.users['guest'].name)
- self.failUnless(self.users.changed == False,
- self.users.changed)
- class ServerAppTestCase (WSGITestCase):
+if libbe.TESTING:
+ class ServerAppTestCase (libbe.util.wsgi.WSGITestCase):
def setUp(self):
- WSGITestCase.setUp(self)
+ super(ServerAppTestCase, self).setUp()
self.bd = libbe.bugdir.SimpleBugDir(memory=False)
self.app = ServerApp(self.bd.storage, logger=self.logger)
+
def tearDown(self):
self.bd.cleanup()
- WSGITestCase.tearDown(self)
+ super(ServerAppTestCase, self).tearDown()
+
def test_add_get(self):
- self.getURL(self.app, '/add/', method='GET')
- self.failUnless(self.status.startswith('404 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
+ try:
+ self.getURL(self.app, '/add/', method='GET')
+ except libbe.util.wsgi.HandlerError as e:
+ self.failUnless(e.code == 404, e)
+ else:
+ self.fail('GET /add/ did not raise 404')
+
def test_add_post(self):
self.getURL(self.app, '/add/', method='POST',
data_dict={'id':'123456', 'parent':'abc123',
@@ -1104,134 +350,10 @@ if libbe.TESTING == True:
self.failUnless(self.status == '200 OK', self.status)
self.failUnless(self.response_headers == [],
self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.exc_info is None, self.exc_info)
# Note: other methods tested in libbe.storage.http
# TODO: integration tests on Serve?
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
-
-
-# The following certificate-creation code is adapted From pyOpenSSL's
-# examples.
-
-def get_cert_filenames(server_name, autogenerate=True, logger=None):
- """
- Generate private key and certification filenames.
- get_cert_filenames(server_name) -> (pkey_filename, cert_filename)
- """
- pkey_file = '%s.pkey' % server_name
- cert_file = '%s.cert' % server_name
- if autogenerate == True:
- for file in [pkey_file, cert_file]:
- if not os.path.exists(file):
- make_certs(server_name, logger)
- return (pkey_file, cert_file)
-
-def createKeyPair(type, bits):
- """Create a public/private key pair.
-
- Returns the public/private key pair in a PKey object.
-
- Parameters
- ----------
- type : TYPE_RSA or TYPE_DSA
- Key type.
- bits : int
- Number of bits to use in the key.
- """
- pkey = OpenSSL.crypto.PKey()
- pkey.generate_key(type, bits)
- return pkey
-
-def createCertRequest(pkey, digest="md5", **name):
- """Create a certificate request.
-
- Returns the certificate request in an X509Req object.
-
- Parameters
- ----------
- pkey : PKey
- The key to associate with the request.
- digest : "md5" or ?
- Digestion method to use for signing, default is "md5",
- `**name` :
- The name of the subject of the request, possible.
- Arguments are:
-
- ============ ========================
- C Country name
- ST State or province name
- L Locality name
- O Organization name
- OU Organizational unit name
- CN Common name
- emailAddress E-mail address
- ============ ========================
- """
- req = OpenSSL.crypto.X509Req()
- subj = req.get_subject()
-
- for (key,value) in name.items():
- setattr(subj, key, value)
-
- req.set_pubkey(pkey)
- req.sign(pkey, digest)
- return req
-
-def createCertificate(req, (issuerCert, issuerKey), serial, (notBefore, notAfter), digest="md5"):
- """Generate a certificate given a certificate request.
-
- Returns the signed certificate in an X509 object.
-
- Parameters
- ----------
- req :
- Certificate reqeust to use
- issuerCert :
- The certificate of the issuer
- issuerKey :
- The private key of the issuer
- serial :
- Serial number for the certificate
- notBefore :
- Timestamp (relative to now) when the certificate
- starts being valid
- notAfter :
- Timestamp (relative to now) when the certificate
- stops being valid
- digest :
- Digest method to use for signing, default is md5
- """
- cert = OpenSSL.crypto.X509()
- cert.set_serial_number(serial)
- cert.gmtime_adj_notBefore(notBefore)
- cert.gmtime_adj_notAfter(notAfter)
- cert.set_issuer(issuerCert.get_subject())
- cert.set_subject(req.get_subject())
- cert.set_pubkey(req.get_pubkey())
- cert.sign(issuerKey, digest)
- return cert
-
-def make_certs(server_name, logger=None) :
- """Generate private key and certification files.
-
- `mk_certs(server_name) -> (pkey_filename, cert_filename)`
- """
- if OpenSSL == None:
- raise libbe.command.UserError, \
- 'SSL certificate generation requires the OpenSSL module'
- pkey_file,cert_file = get_cert_filenames(
- server_name, autogenerate=False)
- if logger != None:
- logger.log(logger._server_level,
- 'Generating certificates', pkey_file, cert_file)
- cakey = createKeyPair(OpenSSL.crypto.TYPE_RSA, 1024)
- careq = createCertRequest(cakey, CN='Certificate Authority')
- cacert = createCertificate(
- careq, (careq, cakey), 0, (0, 60*60*24*365*5)) # five years
- open(pkey_file, 'w').write(OpenSSL.crypto.dump_privatekey(
- OpenSSL.crypto.FILETYPE_PEM, cakey))
- open(cert_file, 'w').write(OpenSSL.crypto.dump_certificate(
- OpenSSL.crypto.FILETYPE_PEM, cacert))
diff --git a/libbe/command/serve_commands.py b/libbe/command/serve_commands.py
index 810f698..75d9c7a 100644
--- a/libbe/command/serve_commands.py
+++ b/libbe/command/serve_commands.py
@@ -23,53 +23,26 @@ See Also
:py:meth:`be-libbe.command.base.Command._run_remote` : the associated client
"""
-import hashlib
import logging
import os.path
import posixpath
import re
-import sys
-import time
-import traceback
-import types
import urllib
import wsgiref.simple_server
import yaml
-try:
- # Python >= 2.6
- from urlparse import parse_qs
-except ImportError:
- # Python <= 2.5
- from cgi import parse_qs
-try:
- import cherrypy
- import cherrypy.wsgiserver
-except ImportError:
- cherrypy = None
-if cherrypy != None:
- try: # CherryPy >= 3.2
- import cherrypy.wsgiserver.ssl_builtin
- except ImportError: # CherryPy <= 3.1.X
- cherrypy.wsgiserver.ssl_builtin = None
-try:
- import OpenSSL
-except ImportError:
- OpenSSL = None
-
import libbe
import libbe.command
-import libbe.command.serve
-import libbe.command.util
-import libbe.util.encoding
-import libbe.util.subproc
+import libbe.command.base
+import libbe.util.wsgi
import libbe.version
-if libbe.TESTING == True:
+if libbe.TESTING:
import copy
import doctest
import StringIO
+ import sys
import unittest
import wsgiref.validate
try:
@@ -82,7 +55,8 @@ if libbe.TESTING == True:
import libbe.command.list
-class ServerApp (libbe.command.serve.WSGI_AppObject):
+class ServerApp (libbe.util.wsgi.WSGI_AppObject,
+ libbe.util.wsgi.WSGI_DataObject):
"""WSGI server for a BE Command invocation over HTTP.
RESTful_ WSGI request handler for serving the
@@ -95,39 +69,16 @@ class ServerApp (libbe.command.serve.WSGI_AppObject):
"""
server_version = "BE-command-server/" + libbe.version.version()
- def __init__(self, storage, notify=False, **kwargs):
- libbe.command.serve.WSGI_AppObject.__init__(self, **kwargs)
+ def __init__(self, storage=None, notify=False, **kwargs):
+ super(ServerApp, self).__init__(
+ urls=[
+ (r'^run/?$', self.run),
+ ],
+ **kwargs)
self.storage = storage
self.ui = libbe.command.base.UserInterface()
self.notify = notify
self.http_user_error = 418
- self.urls = [(r'^run$', self.run)]
-
- def __call__(self, environ, start_response):
- """The main WSGI application.
-
- Dispatch the current request to the functions from above and
- store the regular expression captures in the WSGI environment
- as `be-server.url_args` so that the functions from above can
- access the url placeholders.
-
- URL dispatcher from Armin Ronacher's "Getting Started with WSGI"
- http://lucumr.pocoo.org/2007/5/21/getting-started-with-wsgi
- """
- if self.logger != None:
- self.logger.log(logging.DEBUG, 'ServerApp')
- path = environ.get('PATH_INFO', '').lstrip('/')
- try:
- for regex, callback in self.urls:
- match = re.search(regex, path)
- if match is not None:
- environ['be-server.url_args'] = match.groups()
- return callback(environ, start_response)
- print('not found')
- raise libbe.command.serve._HandlerError(404, 'Not Found')
- except libbe.command.serve._HandlerError, e:
- return self.error(environ, start_response,
- e.code, e.msg, e.headers)
# handlers
def run(self, environ, start_response):
@@ -139,7 +90,7 @@ class ServerApp (libbe.command.serve.WSGI_AppObject):
try:
Class = libbe.command.get_command_class(command_name=name)
except libbe.command.UnknownCommand, e:
- raise libbe.command.serve._HandlerError(
+ raise libbe.util.wsgi.HandlerError(
self.http_user_error, 'UnknownCommand {}'.format(e))
command = Class(ui=self.ui)
self.ui.setup_command(command)
@@ -156,8 +107,9 @@ class ServerApp (libbe.command.serve.WSGI_AppObject):
def check_login(self, environ):
user = environ.get('be-auth.user', None)
- if user != None: # we're running under AuthenticationApp
+ if user is not None: # we're running under AuthenticationApp
if environ['REQUEST_METHOD'] == 'POST':
+ # TODO: better detection of commands requiring writes
if user == 'guest' or self.storage.is_writeable() == False:
raise _Unauthorized() # only non-guests allowed to write
# allow read-only commands for all users
@@ -192,7 +144,7 @@ class ServerApp (libbe.command.serve.WSGI_AppObject):
libbe.util.subproc.invoke(self.notify, stdin=message, shell=True)
-class Serve_Commands (libbe.command.Command):
+class Serve_Commands (libbe.util.wsgi.ServerCommand):
"""Serve commands over HTTP.
This allows you to run local `be` commands interfacing with remote
@@ -204,120 +156,9 @@ class Serve_Commands (libbe.command.Command):
name = 'serve-commands'
- def __init__(self, *args, **kwargs):
- libbe.command.Command.__init__(self, *args, **kwargs)
- self.options.extend([
- libbe.command.Option(name='port',
- help='Bind server to port (%default)',
- arg=libbe.command.Argument(
- name='port', metavar='INT', type='int', default=8000)),
- libbe.command.Option(name='host',
- help='Set host string (blank for localhost, %default)',
- arg=libbe.command.Argument(
- name='host', metavar='HOST', default='')),
- libbe.command.Option(name='read-only', short_name='r',
- help='Dissable operations that require writing'),
- libbe.command.Option(name='notify', short_name='n',
- help='Send notification emails for changes.',
- arg=libbe.command.Argument(
- name='notify', metavar='EMAIL-COMMAND', default=None)),
- libbe.command.Option(name='ssl', short_name='s',
- help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'),
- libbe.command.Option(name='auth', short_name='a',
- help='Require authentication. FILE should be a file containing colon-separated UNAME:USER:sha1(PASSWORD) lines, for example: "jdoe:John Doe <jdoe@example.com>:read:d99f8e5a4b02dc25f49da2ea67c0034f61779e72"',
- arg=libbe.command.Argument(
- name='auth', metavar='FILE', default=None,
- completion_callback=libbe.command.util.complete_path)),
- ])
-
- def _run(self, **params):
- self._setup_logging()
- storage = self._get_storage()
- if params['read-only'] == True:
- writeable = storage.writeable
- storage.writeable = False
- if params['host'] == '':
- params['host'] = 'localhost'
- if params['auth'] != None:
- self._check_restricted_access(storage, params['auth'])
- users = libbe.command.serve.Users(params['auth'])
- users.load()
- app = ServerApp(
- storage=storage, notify=params['notify'], logger=self.logger)
- if params['auth'] != None:
- app = AdminApp(app, users=users, logger=self.logger)
- app = AuthenticationApp(app, realm=storage.repo,
- users=users, logger=self.logger)
- app = libbe.command.serve.UppercaseHeaderApp(app, logger=self.logger)
- server,details = self._get_server(params, app)
- details['repo'] = storage.repo
- try:
- self._start_server(params, server, details)
- except KeyboardInterrupt:
- pass
- self._stop_server(params, server)
- if params['read-only'] == True:
- storage.writeable = writeable
-
- def _setup_logging(self, log_level=logging.INFO):
- self.logger = logging.getLogger('be-serve')
- self.log_level = logging.INFO
- console = logging.StreamHandler(self.stdout)
- console.setFormatter(logging.Formatter('%(message)s'))
- self.logger.addHandler(console)
- self.logger.propagate = False
- if log_level is not None:
- console.setLevel(log_level)
- self.logger.setLevel(log_level)
-
- def _get_server(self, params, app):
- details = {'port':params['port']}
- app = libbe.command.serve.ExceptionApp(app, logger=self.logger)
- if params['ssl'] == True:
- details['protocol'] = 'HTTPS'
- if cherrypy == None:
- raise libbe.command.UserError, \
- '--ssl requires the cherrypy module'
- server = cherrypy.wsgiserver.CherryPyWSGIServer(
- (params['host'], params['port']), app)
- #server.throw_errors = True
- #server.show_tracebacks = True
- private_key,certificate = libbe.command.serve.get_cert_filenames(
- 'be-server', logger=self.logger)
- if cherrypy.wsgiserver.ssl_builtin == None:
- server.ssl_module = 'builtin'
- server.ssl_private_key = private_key
- server.ssl_certificate = certificate
- else:
- server.ssl_adapter = \
- cherrypy.wsgiserver.ssl_builtin.BuiltinSSLAdapter(
- certificate=certificate, private_key=private_key)
- details['socket-name'] = params['host']
- else:
- details['protocol'] = 'HTTP'
- server = wsgiref.simple_server.make_server(
- params['host'], params['port'], app,
- handler_class=libbe.command.serve.SilentRequestHandler)
- details['socket-name'] = server.socket.getsockname()[0]
- return (server, details)
-
- def _start_server(self, params, server, details):
- self.logger.log(self.log_level,
- 'Serving %(protocol)s on %(socket-name)s port %(port)s ...' \
- % details)
- self.logger.log(self.log_level,
- 'BE repository %(repo)s' % details)
- if params['ssl'] == True:
- server.start()
- else:
- server.serve_forever()
-
- def _stop_server(self, params, server):
- self.logger.log(self.log_level, 'Clossing server')
- if params['ssl'] == True:
- server.stop()
- else:
- server.server_close()
+ def _get_app(self, logger, storage, **kwargs):
+ return ServerApp(
+ logger=logger, storage=storage, notify=kwargs.get('notify', False))
def _long_help(self):
return """
@@ -342,15 +183,18 @@ for example::
# alias for libbe.command.base.get_command_class()
Serve_commands = Serve_Commands
-if libbe.TESTING == True:
- class ServerAppTestCase (libbe.command.serve.WSGITestCase):
+
+if libbe.TESTING:
+ class ServerAppTestCase (libbe.util.wsgi.WSGITestCase):
def setUp(self):
- libbe.command.serve.WSGITestCase.setUp(self)
+ libbe.util.wsgi.WSGITestCase.setUp(self)
self.bd = libbe.bugdir.SimpleBugDir(memory=False)
self.app = ServerApp(self.bd.storage, logger=self.logger)
+
def tearDown(self):
self.bd.cleanup()
- libbe.command.serve.WSGITestCase.tearDown(self)
+ libbe.util.wsgi.WSGITestCase.tearDown(self)
+
def test_run_list(self):
list = libbe.command.list.List()
params = list._parse_options_args()
diff --git a/libbe/storage/http.py b/libbe/storage/http.py
index 594fd0d..511e63f 100644
--- a/libbe/storage/http.py
+++ b/libbe/storage/http.py
@@ -45,6 +45,7 @@ if TESTING == True:
import libbe.bugdir
import libbe.command.serve
+ import libbe.util.http
class HTTP (base.VersionedStorage):
@@ -91,7 +92,8 @@ class HTTP (base.VersionedStorage):
headers.append(('Authorization','Basic %s' % \
('%s:%s' % (self.uname, self.password)).encode('base64')))
return libbe.util.http.get_post_url(
- url, get, data_dict, headers, agent=self.user_agent)
+ url, get, data_dict=data_dict, headers=headers,
+ agent=self.user_agent)
def storage_version(self, revision=None):
"""Return the storage format for this backend."""
@@ -303,7 +305,11 @@ if TESTING == True:
env['QUERY_STRING'] = enc_data
for key,value in environ.items():
env[key] = value
- return ''.join(app(env, self.start_response))
+ try:
+ result = app(env, self.start_response)
+ except libbe.util.wsgi.HandlerError as e:
+ raise libbe.util.http.HTTPError(error=e, url=path, msg=str(e))
+ return ''.join(result)
def start_response(self, status, response_headers, exc_info=None):
self.status = status
self.response_headers = response_headers
diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py
new file mode 100644
index 0000000..41d625c
--- /dev/null
+++ b/libbe/util/wsgi.py
@@ -0,0 +1,962 @@
+# Copyright
+
+"""Utilities for building WSGI commands.
+
+See Also
+--------
+:py:mod:`libbe.command.serve` and :py:mod:`libbe.command.serve_commands`.
+"""
+
+import hashlib
+import logging
+import re
+import sys
+import time
+import traceback
+import types
+import urllib
+import urlparse
+import wsgiref.simple_server
+
+try:
+ import cherrypy
+ import cherrypy.wsgiserver
+except ImportError:
+ cherrypy = None
+if cherrypy != None:
+ try: # CherryPy >= 3.2
+ import cherrypy.wsgiserver.ssl_builtin
+ except ImportError: # CherryPy <= 3.1.X
+ cherrypy.wsgiserver.ssl_builtin = None
+
+try:
+ import OpenSSL
+except ImportError:
+ OpenSSL = None
+
+
+import libbe.util.encoding
+import libbe.command
+import libbe.command.base
+
+if libbe.TESTING == True:
+ import copy
+ import doctest
+ import StringIO
+ import unittest
+ import wsgiref.validate
+ try:
+ import cherrypy.test.webtest
+ cherrypy_test_webtest = True
+ except ImportError:
+ cherrypy_test_webtest = None
+
+
+class HandlerError (Exception):
+ def __init__(self, code, msg, headers=[]):
+ super(HandlerError, self).__init__('{} {}'.format(code, msg))
+ self.code = code
+ self.msg = msg
+ self.headers = headers
+
+
+class Unauthenticated (HandlerError):
+ def __init__(self, realm, msg='User Not Authenticated', headers=[]):
+ super(Unauthenticated, self).__init__(401, msg, headers+[
+ ('WWW-Authenticate','Basic realm="{}"'.format(realm))])
+
+
+class Unauthorized (HandlerError):
+ def __init__(self, msg='User Not Authorized', headers=[]):
+ super(Unauthorized, self).__init__(403, msg, headers)
+
+
+class User (object):
+ def __init__(self, uname=None, name=None, passhash=None, password=None):
+ self.uname = uname
+ self.name = name
+ self.passhash = passhash
+ if passhash is None:
+ if password is not None:
+ self.passhash = self.hash(password)
+ else:
+ assert password is None, (
+ 'Redundant password {} with passhash {}'.format(
+ password, passhash))
+ self.users = None
+
+ def from_string(self, string):
+ string = string.strip()
+ fields = string.split(':')
+ if len(fields) != 3:
+ raise ValueError, '{}!=3 fields in "{}"'.format(
+ len(fields), string)
+ self.uname,self.name,self.passhash = fields
+
+ def __str__(self):
+ return ':'.join([self.uname, self.name, self.passhash])
+
+ def __cmp__(self, other):
+ return cmp(self.uname, other.uname)
+
+ def hash(self, password):
+ return hashlib.sha1(password).hexdigest()
+
+ def valid_login(self, password):
+ if self.hash(password) == self.passhash:
+ return True
+ return False
+
+ def set_name(self, name):
+ self._set_property('name', name)
+
+ def set_password(self, password):
+ self._set_property('passhash', self.hash(password))
+
+ def _set_property(self, property, value):
+ if self.uname == 'guest':
+ raise Unauthorized(
+ 'guest user not allowed to change {}'.format(property))
+ if (getattr(self, property) != value and
+ self.users is not None):
+ self.users.changed = True
+ setattr(self, property, value)
+
+
+class Users (dict):
+ def __init__(self, filename=None):
+ super(Users, self).__init__()
+ self.filename = filename
+ self.changed = False
+
+ def load(self):
+ if self.filename is None:
+ return
+ user_file = libbe.util.encoding.get_file_contents(
+ self.filename, decode=True)
+ self.clear()
+ for line in user_file.splitlines():
+ user = User()
+ user.from_string(line)
+ self.add_user(user)
+
+ def save(self):
+ if self.filename is not None and self.changed:
+ lines = []
+ for user in sorted(self.users):
+ lines.append(str(user))
+ libbe.util.encoding.set_file_contents(self.filename)
+ self.changed = False
+
+ def add_user(self, user):
+ assert user.users is None, user.users
+ user.users = self
+ self[user.uname] = user
+
+ def valid_login(self, uname, password):
+ return (uname in self and
+ self[uname].valid_login(password))
+
+
+class WSGI_Object (object):
+ """Utility class for WGSI clients and middleware.
+
+ For details on WGSI, see `PEP 333`_
+
+ .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
+ """
+ def __init__(self, logger=None, log_level=logging.INFO, log_format=None):
+ self.logger = logger
+ self.log_level = log_level
+ if log_format is None:
+ self.log_format = (
+ '{REMOTE_ADDR} - {REMOTE_USER} [{time}] '
+ '"{REQUEST_METHOD} {REQUEST_URI} {HTTP_VERSION}" '
+ '{status} {bytes} "{HTTP_REFERER}" "{HTTP_USER_AGENT}"')
+ else:
+ self.log_format = log_format
+
+ def __call__(self, environ, start_response):
+ if self.logger is not None:
+ self.logger.log(
+ logging.DEBUG, 'entering {}'.format(self.__class__.__name__))
+ ret = self._call(environ, start_response)
+ if self.logger is not None:
+ self.logger.log(
+ logging.DEBUG, 'leaving {}'.format(self.__class__.__name__))
+ return ret
+
+ def _call(self, environ, start_response):
+ """The main WSGI entry point."""
+ raise NotImplementedError
+ # start_response() is a callback for setting response headers
+ # start_response(status, response_headers, exc_info=None)
+ # status is an HTTP status string (e.g., "200 OK").
+ # response_headers is a list of 2-tuples, the HTTP headers in
+ # key-value format.
+ # exc_info is used in exception handling.
+ #
+ # The application function then returns an iterable of body chunks.
+
+ def error(self, environ, start_response, error, message, headers=[]):
+ """Make it easy to call start_response for errors."""
+ response = '{} {}'.format(error, message)
+ self.log_request(environ, status=response, bytes=len(message))
+ start_response(response,
+ [('Content-Type', 'text/plain')]+headers)
+ return [message]
+
+ def log_request(self, environ, status='-1 OK', bytes=-1):
+ if self.logger is None or self.logger.level > self.log_level:
+ return
+ req_uri = urllib.quote(environ.get('SCRIPT_NAME', '')
+ + environ.get('PATH_INFO', ''))
+ if environ.get('QUERY_STRING'):
+ req_uri += '?' + environ['QUERY_STRING']
+ start = time.localtime()
+ if time.daylight:
+ offset = time.altzone / 60 / 60 * -100
+ else:
+ offset = time.timezone / 60 / 60 * -100
+ if offset >= 0:
+ offset = '+{:04d}'.format(offset)
+ elif offset < 0:
+ offset = '{:04d}'.format(offset)
+ d = {
+ 'REMOTE_ADDR': environ.get('REMOTE_ADDR', '-'),
+ 'REMOTE_USER': environ.get('REMOTE_USER', '-'),
+ 'REQUEST_METHOD': environ['REQUEST_METHOD'],
+ 'REQUEST_URI': req_uri,
+ 'HTTP_VERSION': environ.get('SERVER_PROTOCOL'),
+ 'time': time.strftime('%d/%b/%Y:%H:%M:%S ', start) + offset,
+ 'status': status.split(None, 1)[0],
+ 'bytes': bytes,
+ 'HTTP_REFERER': environ.get('HTTP_REFERER', '-'),
+ 'HTTP_USER_AGENT': environ.get('HTTP_USER_AGENT', '-'),
+ }
+ self.logger.log(self.log_level, self.log_format.format(**d))
+
+
+class WSGI_Middleware (WSGI_Object):
+ """Utility class for WGSI middleware.
+ """
+ def __init__(self, app, *args, **kwargs):
+ super(WSGI_Middleware, self).__init__(*args, **kwargs)
+ self.app = app
+
+ def _call(self, environ, start_response):
+ return self.app(environ, start_response)
+
+
+class ExceptionApp (WSGI_Middleware):
+ """Some servers (e.g. cherrypy) eat app-raised exceptions.
+
+ Work around that by logging tracebacks by hand.
+ """
+ def _call(self, environ, start_response):
+ try:
+ return self.app(environ, start_response)
+ except Exception, e:
+ etype,value,tb = sys.exc_info()
+ trace = ''.join(
+ traceback.format_exception(etype, value, tb, None))
+ self.logger.log(self.log_level, trace)
+ raise
+
+
+class UppercaseHeaderApp (WSGI_Middleware):
+ """WSGI middleware that uppercases incoming HTTP headers.
+
+ From PEP 333, `The start_response() Callable`_ :
+
+ A reminder for server/gateway authors: HTTP
+ header names are case-insensitive, so be sure
+ to take that into consideration when examining
+ application-supplied headers!
+
+ .. _The start_response() Callable:
+ http://www.python.org/dev/peps/pep-0333/#id20
+ """
+ def _call(self, environ, start_response):
+ for key,value in environ.items():
+ if key.startswith('HTTP_'):
+ uppercase = key.upper()
+ if uppercase != key:
+ environ[uppercase] = environ.pop(key)
+ return self.app(environ, start_response)
+
+
+class AuthenticationApp (WSGI_Middleware):
+ """WSGI middleware for handling user authentication.
+ """
+ def __init__(self, realm, setting='be-auth', users=None, *args, **kwargs):
+ super(AuthenticationApp, self).__init__(*args, **kwargs)
+ self.realm = realm
+ self.setting = setting
+ self.users = users
+
+ def _call(self, environ, start_response):
+ environ['{}.realm'.format(self.setting)] = self.realm
+ try:
+ username = self.authenticate(environ)
+ environ['{}.user'.format(self.setting)] = username
+ environ['{}.user.name'.format(self.setting)] = self.users[username].name
+ return self.app(environ, start_response)
+ except Unauthorized, e:
+ return self.error(environ, start_response,
+ e.code, e.msg, e.headers)
+
+ def authenticate(self, environ):
+ """Handle user-authentication sent in the "Authorization" header.
+
+ This function implements ``Basic`` authentication as described in
+ HTTP/1.0 specification [1]_ . Do not use this module unless you
+ are using SSL, as it transmits unencrypted passwords.
+
+ .. [1] http://www.w3.org/Protocols/HTTP/1.0/draft-ietf-http-spec.html#BasicAA
+
+ Examples
+ --------
+
+ >>> users = Users()
+ >>> users.add_user(User('Aladdin', 'Big Al', password='open sesame'))
+ >>> app = AuthenticationApp(app=None, realm='Dummy Realm', users=users)
+ >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='})
+ 'Aladdin'
+ >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic AAAAAAAAAAAAAAAAAAAAAAAAAA=='})
+
+ Notes
+ -----
+
+ Code based on authkit/authenticate/basic.py
+ (c) 2005 Clark C. Evans.
+ Released under the MIT License:
+ http://www.opensource.org/licenses/mit-license.php
+ """
+ authorization = environ.get('HTTP_AUTHORIZATION', None)
+ if authorization is None:
+ raise Unauthorized('Authorization required')
+ try:
+ authmeth,auth = authorization.split(' ', 1)
+ except ValueError:
+ return None
+ if 'basic' != authmeth.lower():
+ return None # non-basic HTTP authorization not implemented
+ auth = auth.strip().decode('base64')
+ try:
+ username,password = auth.split(':', 1)
+ except ValueError:
+ return None
+ if self.authfunc(environ, username, password):
+ return username
+
+ def authfunc(self, environ, username, password):
+ if not username in self.users:
+ return False
+ if self.users[username].valid_login(password):
+ if self.logger is not None:
+ self.logger.log(self.log_level,
+ 'Authenticated {}'.format(self.users[username].name))
+ return True
+ return False
+
+
+class WSGI_DataObject (WSGI_Object):
+ """Useful WSGI utilities for handling data (POST, QUERY) and
+ returning responses.
+ """
+ def __init__(self, *args, **kwargs):
+ super(WSGI_DataObject, self).__init__(*args, **kwargs)
+
+ # Maximum input we will accept when REQUEST_METHOD is POST
+ # 0 ==> unlimited input
+ self.maxlen = 0
+
+ def ok_response(self, environ, start_response, content,
+ content_type='application/octet-stream',
+ headers=[]):
+ if content is None:
+ start_response('200 OK', [])
+ return []
+ if type(content) is types.UnicodeType:
+ content = content.encode('utf-8')
+ for i,header in enumerate(headers):
+ header_name,header_value = header
+ if type(header_value) == types.UnicodeType:
+ headers[i] = (header_name, header_value.encode('ISO-8859-1'))
+ response = '200 OK'
+ content_length = len(content)
+ self.log_request(environ, status=response, bytes=content_length)
+ start_response(response, [
+ ('Content-Type', content_type),
+ ('Content-Length', str(content_length)),
+ ]+headers)
+ if self.is_head(environ):
+ return []
+ return [content]
+
+ def query_data(self, environ):
+ if not environ['REQUEST_METHOD'] in ['GET', 'HEAD']:
+ raise HandlerError(404, 'Not Found')
+ return self._parse_query(environ.get('QUERY_STRING', ''))
+
+ def _parse_query(self, query):
+ if len(query) == 0:
+ return {}
+ data = urlparse.parse_qs(
+ query, keep_blank_values=True, strict_parsing=True)
+ for k,v in data.items():
+ if len(v) == 1:
+ data[k] = v[0]
+ return data
+
+ def post_data(self, environ):
+ if environ['REQUEST_METHOD'] != 'POST':
+ raise HandlerError(404, 'Not Found')
+ post_data = self._read_post_data(environ)
+ return self._parse_post(post_data)
+
+ def _parse_post(self, post):
+ return self._parse_query(post)
+
+ def _read_post_data(self, environ):
+ try:
+ clen = int(environ.get('CONTENT_LENGTH', '0'))
+ except ValueError:
+ clen = 0
+ if clen != 0:
+ if self.maxlen > 0 and clen > self.maxlen:
+ raise ValueError, 'Maximum content length exceeded'
+ return environ['wsgi.input'].read(clen)
+ return ''
+
+ def data_get_string(self, data, key, default=None, source='query'):
+ if not key in data or data[key] in [None, 'None']:
+ if default == HandlerError:
+ raise HandlerError(
+ 406, 'Missing {} key {}'.format(source, key))
+ return default
+ return data[key]
+
+ def data_get_id(self, data, key='id', default=HandlerError,
+ source='query'):
+ return self.data_get_string(data, key, default, source)
+
+ def data_get_boolean(self, data, key, default=False, source='query'):
+ val = self.data_get_string(data, key, default, source)
+ if val == 'True':
+ return True
+ elif val == 'False':
+ return False
+ return val
+
+ def is_head(self, environ):
+ return environ['REQUEST_METHOD'] == 'HEAD'
+
+
+class WSGI_AppObject (WSGI_Object):
+ """Useful WSGI utilities for handling URL delegation.
+ """
+ def __init__(self, urls=tuple(), default_handler=None, setting='be-server',
+ *args, **kwargs):
+ super(WSGI_AppObject, self).__init__(*args, **kwargs)
+ self.urls = [(re.compile(regexp),callback) for regexp,callback in urls]
+ self.default_callback = default_handler
+ self.setting = setting
+
+ def _call(self, environ, start_response):
+ path = environ.get('PATH_INFO', '').lstrip('/')
+ for regexp,callback in self.urls:
+ match = regexp.match(path)
+ if match is not None:
+ setting = '{}.url_args'.format(self.setting)
+ environ[setting] = match.groups()
+ return callback(environ, start_response)
+ if self.default_handler is None:
+ raise HandlerError(404, 'Not Found')
+ return self.default_handler(environ, start_response)
+
+
+class AdminApp (WSGI_AppObject, WSGI_DataObject, WSGI_Middleware):
+ """WSGI middleware for managing users
+
+ Changing passwords, usernames, etc.
+ """
+ def __init__(self, users=None, setting='be-auth', *args, **kwargs):
+ handler = ('^admin/?', self.admin)
+ if 'urls' not in kwargs:
+ kwargs['urls'] = [handler]
+ else:
+ kwargs.urls.append(handler)
+ super(AdminApp, self).__init__(*args, **kwargs)
+ self.users = users
+ self.setting = setting
+
+ def admin(self, environ, start_response):
+ if not '{}.user'.format(self.setting) in environ:
+ realm = envirion.get('{}.realm'.format(self.setting))
+ raise Unauthenticated(realm=realm)
+ uname = environ.get('{}.user'.format(self.setting))
+ user = self.users[uname]
+ data = self.post_data(environ)
+ source = 'post'
+ name = self.data_get_string(
+ data, 'name', default=None, source=source)
+ if name is not None:
+ self.users[uname].set_name(name)
+ password = self.data_get_string(
+ data, 'password', default=None, source=source)
+ if password is not None:
+ self.users[uname].set_password(password)
+ self.users.save()
+ return self.ok_response(environ, start_response, None)
+
+
+class SilentRequestHandler (wsgiref.simple_server.WSGIRequestHandler):
+ def log_message(self, format, *args):
+ pass
+
+
+class ServerCommand (libbe.command.base.Command):
+ """Serve something over HTTP.
+
+ Use this as a base class to build commands that serve a web interface.
+ """
+ def __init__(self, *args, **kwargs):
+ super(ServerCommand, self).__init__(*args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='port',
+ help='Bind server to port (%default)',
+ arg=libbe.command.Argument(
+ name='port', metavar='INT', type='int', default=8000)),
+ libbe.command.Option(name='host',
+ help='Set host string (blank for localhost, %default)',
+ arg=libbe.command.Argument(
+ name='host', metavar='HOST', default='localhost')),
+ libbe.command.Option(name='read-only', short_name='r',
+ help='Dissable operations that require writing'),
+ libbe.command.Option(name='notify', short_name='n',
+ help='Send notification emails for changes.',
+ arg=libbe.command.Argument(
+ name='notify', metavar='EMAIL-COMMAND', default=None)),
+ libbe.command.Option(name='ssl', short_name='s',
+ help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'),
+ libbe.command.Option(name='auth', short_name='a',
+ help=('Require authentication. FILE should be a file '
+ 'containing colon-separated '
+ 'UNAME:USER:sha1(PASSWORD) lines, for example: '
+ '"jdoe:John Doe <jdoe@example.com>:'
+ 'd99f8e5a4b02dc25f49da2ea67c0034f61779e72"'),
+ arg=libbe.command.Argument(
+ name='auth', metavar='FILE', default=None,
+ completion_callback=libbe.command.util.complete_path)),
+ ])
+
+ def _run(self, **params):
+ self._setup_logging()
+ storage = self._get_storage()
+ if params['read-only']:
+ writeable = storage.writeable
+ storage.writeable = False
+ if params['auth']:
+ self._check_restricted_access(storage, params['auth'])
+ users = Users(params['auth'])
+ users.load()
+ app = self._get_app(logger=self.logger, storage=storage, **params)
+ if params['auth']:
+ app = AdminApp(app, users=users, logger=self.logger)
+ app = AuthenticationApp(app, realm=storage.repo,
+ users=users, logger=self.logger)
+ app = UppercaseHeaderApp(app, logger=self.logger)
+ server,details = self._get_server(params, app)
+ details['repo'] = storage.repo
+ try:
+ self._start_server(params, server, details)
+ except KeyboardInterrupt:
+ pass
+ self._stop_server(params, server)
+ if params['read-only']:
+ storage.writeable = writeable
+
+ def _get_app(self, logger, storage, **kwargs):
+ raise NotImplementedError()
+
+ def _setup_logging(self, log_level=logging.INFO):
+ self.logger = logging.getLogger('be-{}'.format(self.name))
+ self.log_level = logging.INFO
+ console = logging.StreamHandler(self.stdout)
+ console.setFormatter(logging.Formatter('%(message)s'))
+ self.logger.addHandler(console)
+ self.logger.propagate = False
+ if log_level is not None:
+ console.setLevel(log_level)
+ self.logger.setLevel(log_level)
+
+ def _get_server(self, params, app):
+ details = {
+ 'socket-name':params['host'],
+ 'port':params['port'],
+ }
+ app = ExceptionApp(app, logger=self.logger)
+ if params['ssl'] == True:
+ details['protocol'] = 'HTTPS'
+ if cherrypy == None:
+ raise libbe.command.UserError(
+ '--ssl requires the cherrypy module')
+ server = cherrypy.wsgiserver.CherryPyWSGIServer(
+ (params['host'], params['port']), app)
+ #server.throw_errors = True
+ #server.show_tracebacks = True
+ private_key,certificate = _get_cert_filenames(
+ 'be-server', logger=self.logger)
+ if cherrypy.wsgiserver.ssl_builtin == None:
+ server.ssl_module = 'builtin'
+ server.ssl_private_key = private_key
+ server.ssl_certificate = certificate
+ else:
+ server.ssl_adapter = (
+ cherrypy.wsgiserver.ssl_builtin.BuiltinSSLAdapter(
+ certificate=certificate, private_key=private_key))
+ else:
+ details['protocol'] = 'HTTP'
+ server = wsgiref.simple_server.make_server(
+ params['host'], params['port'], app,
+ handler_class=SilentRequestHandler)
+ return (server, details)
+
+ def _start_server(self, params, server, details):
+ self.logger.log(self.log_level,
+ ('Serving {protocol} on {socket-name} port {port} ...\n'
+ 'BE repository {repo}').format(**details))
+ if params['ssl']:
+ server.start()
+ else:
+ server.serve_forever()
+
+ def _stop_server(self, params, server):
+ self.logger.log(self.log_level, 'Clossing server')
+ if params['ssl'] == True:
+ server.stop()
+ else:
+ server.server_close()
+
+ def _long_help(self):
+ raise NotImplementedError()
+
+
+if libbe.TESTING:
+ class WSGITestCase (unittest.TestCase):
+ def setUp(self):
+ self.logstream = StringIO.StringIO()
+ self.logger = logging.getLogger('be-wsgi-test')
+ console = logging.StreamHandler(self.logstream)
+ console.setFormatter(logging.Formatter('%(message)s'))
+ self.logger.addHandler(console)
+ self.logger.propagate = False
+ console.setLevel(logging.INFO)
+ self.logger.setLevel(logging.INFO)
+ self.default_environ = { # required by PEP 333
+ 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
+ 'REMOTE_ADDR': '192.168.0.123',
+ 'SCRIPT_NAME':'',
+ 'PATH_INFO': '',
+ #'QUERY_STRING':'', # may be empty or absent
+ #'CONTENT_TYPE':'', # may be empty or absent
+ #'CONTENT_LENGTH':'', # may be empty or absent
+ 'SERVER_NAME':'example.com',
+ 'SERVER_PORT':'80',
+ 'SERVER_PROTOCOL':'HTTP/1.1',
+ 'wsgi.version':(1,0),
+ 'wsgi.url_scheme':'http',
+ 'wsgi.input':StringIO.StringIO(),
+ 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.multithread':False,
+ 'wsgi.multiprocess':False,
+ 'wsgi.run_once':False,
+ }
+
+ def getURL(self, app, path='/', method='GET', data=None,
+ data_dict=None, scheme='http', environ={}):
+ env = copy.copy(self.default_environ)
+ env['PATH_INFO'] = path
+ env['REQUEST_METHOD'] = method
+ env['scheme'] = scheme
+ if data_dict is not None:
+ assert data is None, (data, data_dict)
+ data = urllib.urlencode(data_dict)
+ if data is not None:
+ if data_dict is None:
+ assert method == 'POST', (method, data)
+ if method == 'POST':
+ env['CONTENT_LENGTH'] = len(data)
+ env['wsgi.input'] = StringIO.StringIO(data)
+ else:
+ assert method in ['GET', 'HEAD'], method
+ env['QUERY_STRING'] = data
+ for key,value in environ.items():
+ env[key] = value
+ return ''.join(app(env, self.start_response))
+
+ def start_response(self, status, response_headers, exc_info=None):
+ self.status = status
+ self.response_headers = response_headers
+ self.exc_info = exc_info
+
+
+ class WSGI_ObjectTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ self.app = WSGI_Object(self.logger)
+
+ def test_error(self):
+ contents = self.app.error(
+ environ=self.default_environ,
+ start_response=self.start_response,
+ error=123,
+ message='Dummy Error',
+ headers=[('X-Dummy-Header','Dummy Value')])
+ self.failUnless(contents == ['Dummy Error'], contents)
+ self.failUnless(self.status == '123 Dummy Error', self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type','text/plain'),
+ ('X-Dummy-Header','Dummy Value')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+
+ def test_log_request(self):
+ self.app.log_request(
+ environ=self.default_environ, status='-1 OK', bytes=123)
+ log = self.logstream.getvalue()
+ self.failUnless(log.startswith('192.168.0.123 -'), log)
+
+
+ class ExceptionAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ def child_app(environ, start_response):
+ raise ValueError('Dummy Error')
+ self.app = ExceptionApp(child_app, self.logger)
+
+ def test_traceback(self):
+ try:
+ self.getURL(self.app)
+ except ValueError, e:
+ pass
+ log = self.logstream.getvalue()
+ self.failUnless(log.startswith('Traceback'), log)
+ self.failUnless('child_app' in log, log)
+ self.failUnless('ValueError: Dummy Error' in log, log)
+
+
+ class AdminAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ self.users = Users()
+ self.users.add_user(
+ User('Aladdin', 'Big Al', password='open sesame'))
+ self.users.add_user(
+ User('guest', 'Guest', password='guestpass'))
+ def child_app(environ, start_response):
+ pass
+ app = AdminApp(
+ app=child_app, users=self.users, logger=self.logger)
+ app = AuthenticationApp(
+ app=app, realm='Dummy Realm', users=self.users,
+ logger=self.logger)
+ self.app = UppercaseHeaderApp(app=app, logger=self.logger)
+
+ def basic_auth(self, uname, password):
+ """HTTP basic authorization string"""
+ return 'Basic {}'.format(
+ '{}:{}'.format(uname, password).encode('base64'))
+
+ def test_new_name(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data_dict={'name':'Prince Al'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('Aladdin', 'open sesame')})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['Aladdin'].name == 'Prince Al',
+ self.users['Aladdin'].name)
+ self.failUnless(self.users.changed == True,
+ self.users.changed)
+
+ def test_new_password(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data_dict={'password':'New Pass'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('Aladdin', 'open sesame')})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless((self.users['Aladdin'].passhash ==
+ self.users['Aladdin'].hash('New Pass')),
+ self.users['Aladdin'].passhash)
+ self.failUnless(self.users.changed == True,
+ self.users.changed)
+
+ def test_guest_name(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data_dict={'name':'SPAM'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('guest', 'guestpass')})
+ self.failUnless(self.status.startswith('403 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['guest'].name == 'Guest',
+ self.users['guest'].name)
+ self.failUnless(self.users.changed == False,
+ self.users.changed)
+
+ def test_guest_password(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data_dict={'password':'SPAM'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('guest', 'guestpass')})
+ self.failUnless(self.status.startswith('403 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['guest'].name == 'Guest',
+ self.users['guest'].name)
+ self.failUnless(self.users.changed == False,
+ self.users.changed)
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
+
+
+# The following certificate-creation code is adapted From pyOpenSSL's
+# examples.
+
+def _get_cert_filenames(server_name, autogenerate=True, logger=None):
+ """
+ Generate private key and certification filenames.
+ get_cert_filenames(server_name) -> (pkey_filename, cert_filename)
+ """
+ pkey_file = '{}.pkey'.format(server_name)
+ cert_file = '{}.cert'.format(server_name)
+ if autogenerate:
+ for file in [pkey_file, cert_file]:
+ if not os.path.exists(file):
+ _make_certs(server_name, logger)
+ return (pkey_file, cert_file)
+
+def _create_key_pair(type, bits):
+ """Create a public/private key pair.
+
+ Returns the public/private key pair in a PKey object.
+
+ Parameters
+ ----------
+ type : TYPE_RSA or TYPE_DSA
+ Key type.
+ bits : int
+ Number of bits to use in the key.
+ """
+ pkey = OpenSSL.crypto.PKey()
+ pkey.generate_key(type, bits)
+ return pkey
+
+def _create_cert_request(pkey, digest="md5", **name):
+ """Create a certificate request.
+
+ Returns the certificate request in an X509Req object.
+
+ Parameters
+ ----------
+ pkey : PKey
+ The key to associate with the request.
+ digest : "md5" or ?
+ Digestion method to use for signing, default is "md5",
+ `**name` :
+ The name of the subject of the request, possible.
+ Arguments are:
+
+ ============ ========================
+ C Country name
+ ST State or province name
+ L Locality name
+ O Organization name
+ OU Organizational unit name
+ CN Common name
+ emailAddress E-mail address
+ ============ ========================
+ """
+ req = OpenSSL.crypto.X509Req()
+ subj = req.get_subject()
+
+ for (key,value) in name.items():
+ setattr(subj, key, value)
+
+ req.set_pubkey(pkey)
+ req.sign(pkey, digest)
+ return req
+
+def _create_certificate(req, (issuerCert, issuerKey), serial,
+ (notBefore, notAfter), digest='md5'):
+ """Generate a certificate given a certificate request.
+
+ Returns the signed certificate in an X509 object.
+
+ Parameters
+ ----------
+ req :
+ Certificate reqeust to use
+ issuerCert :
+ The certificate of the issuer
+ issuerKey :
+ The private key of the issuer
+ serial :
+ Serial number for the certificate
+ notBefore :
+ Timestamp (relative to now) when the certificate
+ starts being valid
+ notAfter :
+ Timestamp (relative to now) when the certificate
+ stops being valid
+ digest :
+ Digest method to use for signing, default is md5
+ """
+ cert = OpenSSL.crypto.X509()
+ cert.set_serial_number(serial)
+ cert.gmtime_adj_notBefore(notBefore)
+ cert.gmtime_adj_notAfter(notAfter)
+ cert.set_issuer(issuerCert.get_subject())
+ cert.set_subject(req.get_subject())
+ cert.set_pubkey(req.get_pubkey())
+ cert.sign(issuerKey, digest)
+ return cert
+
+def _make_certs(server_name, logger=None) :
+ """Generate private key and certification files.
+
+ `mk_certs(server_name) -> (pkey_filename, cert_filename)`
+ """
+ if OpenSSL == None:
+ raise libbe.command.UserError(
+ 'SSL certificate generation requires the OpenSSL module')
+ pkey_file,cert_file = get_cert_filenames(
+ server_name, autogenerate=False)
+ if logger != None:
+ logger.log(logger._server_level,
+ 'Generating certificates', pkey_file, cert_file)
+ cakey = _create_key_pair(OpenSSL.crypto.TYPE_RSA, 1024)
+ careq = _create_cert_request(cakey, CN='Certificate Authority')
+ cacert = _create_certificate(
+ careq, (careq, cakey), 0, (0, 60*60*24*365*5)) # five years
+ open(pkey_file, 'w').write(OpenSSL.crypto.dump_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, cakey))
+ open(cert_file, 'w').write(OpenSSL.crypto.dump_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, cacert))