aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'libbe')
-rw-r--r--libbe/command/serve.py911
-rw-r--r--libbe/storage/__init__.py2
-rw-r--r--libbe/storage/http.py177
3 files changed, 894 insertions, 196 deletions
diff --git a/libbe/command/serve.py b/libbe/command/serve.py
index 608e623..43e07cc 100644
--- a/libbe/command/serve.py
+++ b/libbe/command/serve.py
@@ -14,11 +14,16 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import hashlib
+import logging
import os.path
import posixpath
import re
import sys
+import time
+import traceback
import types
+import urllib
import wsgiref.simple_server
try:
# Python >= 2.6
@@ -31,10 +36,11 @@ try:
import cherrypy.wsgiserver
except ImportError:
cherrypy = None
-try: # CherryPy >= 3.2
- import cherrypy.wsgiserver.ssl_builtin
-except ImportError: # CherryPy <= 3.1.X
- cherrypy.wsgiserver.ssl_builtin = None
+if cherrypy != None:
+ try: # CherryPy >= 3.2
+ import cherrypy.wsgiserver.ssl_builtin
+ except ImportError: # CherryPy <= 3.1.X
+ cherrypy.wsgiserver.ssl_builtin = None
try:
import OpenSSL
except ImportError:
@@ -43,55 +49,459 @@ except ImportError:
import libbe
import libbe.command
import libbe.command.util
+import libbe.util.encoding
import libbe.version
if libbe.TESTING == True:
+ import copy
import doctest
import StringIO
import unittest
import wsgiref.validate
+ try:
+ import cherrypy.test.webtest
+ cherrypy_test_webtest = True
+ except ImportError:
+ cherrypy_test_webtest = None
import libbe.bugdir
-
+
class _HandlerError (Exception):
- def __init__(self, code, msg):
+ def __init__(self, code, msg, headers=[]):
Exception.__init__(self, '%d %s' % (code, msg))
self.code = code
self.msg = msg
+ self.headers = headers
-class ServerApp (object):
- """Simple WSGI request handler for serving the
- libbe.storage.http.HTTP backend with GET, POST, and HEAD commands.
+class _Unauthenticated (_HandlerError):
+ def __init__(self, realm, msg='User Not Authenticated', headers=[]):
+ _HandlerError.__init__(self, 401, msg, headers+[
+ ('WWW-Authenticate','Basic realm="%s"' % realm)])
- This serves files from a connected storage instance, usually
- a VCS-based repository located on the local machine.
+class _Unauthorized (_HandlerError):
+ def __init__(self, msg='User Not Authorized', headers=[]):
+ _HandlerError.__init__(self, 403, msg, headers)
- The GET and HEAD requests are identical except that the HEAD
- request omits the actual content of the file.
+class User (object):
+ def __init__(self, uname=None, name=None, passhash=None, password=None):
+ self.uname = uname
+ self.name = name
+ self.passhash = passhash
+ if passhash == None:
+ if password != None:
+ self.passhash = self.hash(password)
+ else:
+ assert password == None, \
+ 'Redundant password %s with passhash %s' % (password, passhash)
+ self.users = None
+ def from_string(self, string):
+ string = string.strip()
+ fields = string.split(':')
+ if len(fields) != 3:
+ raise ValueError, '%d!=3 fields in "%s"' % (len(fields), string)
+ self.uname,self.name,self.passhash = fields
+ def __str__(self):
+ return ':'.join([self.uname, self.name, self.passhash])
+ def __cmp__(self, other):
+ return cmp(self.uname, other.uname)
+ def hash(self, password):
+ return hashlib.sha1(password).hexdigest()
+ def valid_login(self, password):
+ if self.hash(password) == self.passhash:
+ return True
+ return False
+ def set_name(self, name):
+ self._set_property('name', name)
+ def set_password(self, password):
+ self._set_property('passhash', self.hash(password))
+ def _set_property(self, property, value):
+ if self.uname == 'guest':
+ raise _Unauthorized('guest user not allowed to change %s' % property)
+ if getattr(self, property) != value \
+ and self.users != None:
+ self.users.changed = True
+ setattr(self, property, value)
+class Users (dict):
+ def __init__(self, filename=None):
+ dict.__init__(self)
+ self.filename = filename
+ self.changed = False
+ def load(self):
+ if self.filename == None:
+ return
+ user_file = libbe.util.encoding.get_file_contents(
+ self.filename, decode=True)
+ self.clear()
+ for line in user_file.splitlines():
+ user = User()
+ user.from_string(line)
+ self.add_user(user)
+ def save(self):
+ if self.filename != None and self.changed == True:
+ lines = []
+ for user in sorted(self.users):
+ lines.append(str(user))
+ libbe.util.encoding.set_file_contents(self.filename)
+ self.changed = False
+ def add_user(self, user):
+ assert user.users == None, user.users
+ user.users = self
+ self[user.uname] = user
+ def valid_login(self, uname, password):
+ if uname in self and \
+ self[uname].valid_login(password) == True:
+ return True
+ return False
+
+class WSGI_Object (object):
+ """Utility class for WGSI clients and middleware.
For details on WGSI, see `PEP 333`_
.. PEP 333: http://www.python.org/dev/peps/pep-0333/
"""
- server_version = "BE-server/" + libbe.version.version()
+ def __init__(self, logger=None, log_level=logging.INFO, log_format=None):
+ self.logger = logger
+ self.log_level = log_level
+ if log_format == None:
+ self.log_format = (
+ '%(REMOTE_ADDR)s - %(REMOTE_USER)s [%(time)s] '
+ '"%(REQUEST_METHOD)s %(REQUEST_URI)s %(HTTP_VERSION)s" '
+ '%(status)s %(bytes)s "%(HTTP_REFERER)s" "%(HTTP_USER_AGENT)s"')
+ else:
+ self.log_format = log_format
- def __init__(self, command, storage):
- self.command = command
- self.storage = storage
- self.http_user_error = 418
+ def __call__(self, environ, start_response):
+ """The main WSGI entry point."""
+ raise NotImplementedError
+ # start_response() is a callback for setting response headers
+ # start_response(status, response_headers, exc_info=None)
+ # status is an HTTP status string (e.g., "200 OK").
+ # response_headers is a list of 2-tuples, the HTTP headers in
+ # key-value format.
+ # exc_info is used in exception handling.
+ #
+ # The application function then returns an iterable of body chunks.
+
+ def error(self, environ, start_response, error, message, headers=[]):
+ """Make it easy to call start_response for errors."""
+ response = '%d %s' % (error, message)
+ self.log_request(environ, status=response, bytes=len(message))
+ start_response(response,
+ [('Content-Type', 'text/plain')]+headers)
+ return [message]
+
+ def log_request(self, environ, status='-1 OK', bytes=-1):
+ if self.logger == None:
+ return
+ req_uri = urllib.quote(environ.get('SCRIPT_NAME', '')
+ + environ.get('PATH_INFO', ''))
+ if environ.get('QUERY_STRING'):
+ req_uri += '?'+environ['QUERY_STRING']
+ start = time.localtime()
+ if time.daylight:
+ offset = time.altzone / 60 / 60 * -100
+ else:
+ offset = time.timezone / 60 / 60 * -100
+ if offset >= 0:
+ offset = "+%0.4d" % (offset)
+ elif offset < 0:
+ offset = "%0.4d" % (offset)
+ d = {
+ 'REMOTE_ADDR': environ.get('REMOTE_ADDR') or '-',
+ 'REMOTE_USER': environ.get('REMOTE_USER') or '-',
+ 'REQUEST_METHOD': environ['REQUEST_METHOD'],
+ 'REQUEST_URI': req_uri,
+ 'HTTP_VERSION': environ.get('SERVER_PROTOCOL'),
+ 'time': time.strftime('%d/%b/%Y:%H:%M:%S ', start) + offset,
+ 'status': status.split(None, 1)[0],
+ 'bytes': bytes,
+ 'HTTP_REFERER': environ.get('HTTP_REFERER', '-'),
+ 'HTTP_USER_AGENT': environ.get('HTTP_USER_AGENT', '-'),
+ }
+ self.logger.log(self.log_level, self.log_format % d)
+
+class ExceptionApp (WSGI_Object):
+ """Some servers (e.g. cherrypy) eat app-raised exceptions.
+ Work around that by logging tracebacks by hand.
+ """
+ def __init__(self, app, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
+ self.app = app
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'ExceptionApp')
+ try:
+ return self.app(environ, start_response)
+ except Exception, e:
+ etype,value,tb = sys.exc_info()
+ trace = ''.join(
+ traceback.format_exception(etype, value, tb, None))
+ self.logger.log(self.log_level, trace)
+ raise
+
+class UppercaseHeaderApp (WSGI_Object):
+ """From PEP 333, `The start_response() Callable`_ :
+
+ A reminder for server/gateway authors: HTTP
+ header names are case-insensitive, so be sure
+ to take that into consideration when examining
+ application-supplied headers!
+
+ .. _The start_response() Callable:
+ http://www.python.org/dev/peps/pep-0333/#id20
+ """
+ def __init__(self, app, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
+ self.app = app
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'UppercaseHeaderApp')
+ for key,value in environ.items():
+ if key.startswith('HTTP_'):
+ uppercase = key.upper()
+ if uppercase != key:
+ environ[uppercase] = environ.pop(key)
+ return self.app(environ, start_response)
+
+class AuthenticationApp (WSGI_Object):
+ """WSGI middleware for handling user authentication.
+ """
+ def __init__(self, app, realm, setting='be-auth', users=None, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
+ self.app = app
+ self.realm = realm
+ self.setting = setting
+ self.users = users
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'AuthenticationApp')
+ environ['%s.realm' % self.setting] = self.realm
+ try:
+ username = self.authenticate(environ)
+ environ['%s.user' % self.setting] = username
+ environ['%s.user.name' % self.setting] = \
+ self.users[username].name
+ return self.app(environ, start_response)
+ except _Unauthorized, e:
+ return self.error(environ, start_response,
+ e.code, e.msg, e.headers)
+
+ def authenticate(self, environ):
+ """Handle user-authentication sent in the 'Authorization' header.
+
+ Basic HTTP/1.0 Authentication
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This function implements ``Basic`` authentication as described in
+ HTTP/1.0 specification [1]_ . Do not use this module unless you
+ are using SSL, as it transmits unencrypted passwords.
+
+ .. [1] http://www.w3.org/Protocols/HTTP/1.0/draft-ietf-http-spec.html#BasicAA
+
+ >>> users = Users()
+ >>> users.add_user(User('Aladdin', 'Big Al', password='open sesame'))
+ >>> app = AuthenticationApp(app=None, realm='Dummy Realm', users=users)
+ >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='})
+ 'Aladdin'
+ >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic AAAAAAAAAAAAAAAAAAAAAAAAAA=='})
+
+ Code based on authkit/authenticate/basic.py
+ (c) 2005 Clark C. Evans.
+ Released under the MIT License:
+ http://www.opensource.org/licenses/mit-license.php
+ """
+ authorization = environ.get('HTTP_AUTHORIZATION', None)
+ if authorization == None:
+ raise _Unauthorized('Authorization required')
+ try:
+ authmeth,auth = authorization.split(' ',1)
+ except ValueError:
+ return None
+ if 'basic' != authmeth.lower():
+ return None # non-basic HTTP authorization not implemented
+ auth = auth.strip().decode('base64')
+ try:
+ username,password = auth.split(':',1)
+ except ValueError:
+ return None
+ if self.authfunc(environ, username, password) == True:
+ return username
+
+ def authfunc(self, environ, username, password):
+ if not username in self.users:
+ return False
+ if self.users[username].valid_login(password) == True:
+ if self.logger != None:
+ self.logger.log(self.log_level,
+ 'Authenticated %s' % self.users[username].name)
+ return True
+ return False
+
+class WSGI_AppObject (WSGI_Object):
+ """Utility class for WGSI clients and middleware with
+ useful utilities for handling data (POST, QUERY) and
+ returning responses.
+ """
+ def __init__(self, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
# Maximum input we will accept when REQUEST_METHOD is POST
# 0 ==> unlimited input
self.maxlen = 0
+ def ok_response(self, environ, start_response, content,
+ content_type='application/octet-stream',
+ headers=[]):
+ if content == None:
+ start_response('200 OK', [])
+ return []
+ if type(content) == types.UnicodeType:
+ content = content.encode('utf-8')
+ for i,header in enumerate(headers):
+ header_name,header_value = header
+ if type(header_value) == types.UnicodeType:
+ headers[i] = (header_name, header_value.encode('ISO-8859-1'))
+ response = '200 OK'
+ content_length = len(content)
+ self.log_request(environ, status=response, bytes=content_length)
+ start_response('200 OK', [
+ ('Content-Type', content_type),
+ ('Content-Length', str(content_length)),
+ ]+headers)
+ if self.is_head(environ) == True:
+ return []
+ return [content]
+
+ def query_data(self, environ):
+ if not environ['REQUEST_METHOD'] in ['GET', 'HEAD']:
+ raise _HandlerError(404, 'Not Found')
+ return self._parse_query(environ.get('QUERY_STRING', ''))
+
+ def _parse_query(self, query):
+ if len(query) == 0:
+ return {}
+ data = parse_qs(
+ query, keep_blank_values=True, strict_parsing=True)
+ for k,v in data.items():
+ if len(v) == 1:
+ data[k] = v[0]
+ return data
+
+ def post_data(self, environ):
+ if environ['REQUEST_METHOD'] != 'POST':
+ raise _HandlerError(404, 'Not Found')
+ post_data = self._read_post_data(environ)
+ return self._parse_post(post_data)
+
+ def _parse_post(self, post):
+ return self._parse_query(post)
+
+ def _read_post_data(self, environ):
+ try:
+ clen = int(environ.get('CONTENT_LENGTH', '0'))
+ except ValueError:
+ clen = 0
+ if clen != 0:
+ if self.maxlen > 0 and clen > self.maxlen:
+ raise ValueError, 'Maximum content length exceeded'
+ return environ['wsgi.input'].read(clen)
+ return ''
+
+ def data_get_string(self, data, key, default=None, source='query'):
+ if not key in data or data[key] in [None, 'None']:
+ if default == _HandlerError:
+ raise _HandlerError(406, 'Missing %s key %s' % (source, key))
+ return default
+ return data[key]
+
+ def data_get_id(self, data, key='id', default=_HandlerError,
+ source='query'):
+ return self.data_get_string(data, key, default, source)
+
+ def data_get_boolean(self, data, key, default=False, source='query'):
+ val = self.data_get_string(data, key, default, source)
+ if val == 'True':
+ return True
+ elif val == 'False':
+ return False
+ return val
+
+ def is_head(self, environ):
+ return environ['REQUEST_METHOD'] == 'HEAD'
+
+
+class AdminApp (WSGI_AppObject):
+ """WSGI middleware for managing users (changing passwords,
+ usernames, etc.).
+ """
+ def __init__(self, app, users=None, url=r'^admin/?', *args, **kwargs):
+ WSGI_AppObject.__init__(self, *args, **kwargs)
+ self.app = app
+ self.users = users
+ self.url = url
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'AdminApp')
+ path = environ.get('PATH_INFO', '').lstrip('/')
+ match = re.search(self.url, path)
+ if match is not None:
+ return self.admin(environ, start_response)
+ return self.app(environ, start_response)
+
+ def admin(self, environ, start_response):
+ if not 'be-auth.user' in environ:
+ raise _Unauthenticated(realm=envirion.get('be-auth.realm'))
+ uname = environ.get('be-auth.user')
+ user = self.users[uname]
+ data = self.post_data(environ)
+ source = 'post'
+ name = self.data_get_string(
+ data, 'name', default=None, source=source)
+ if name != None:
+ self.users[uname].set_name(name)
+ password = self.data_get_string(
+ data, 'password', default=None, source=source)
+ if password != None:
+ self.users[uname].set_password(password)
+ self.users.save()
+ return self.ok_response(environ, start_response, None)
+
+class ServerApp (WSGI_AppObject):
+ """RESTful_ WSGI request handler for serving the
+ libbe.storage.http.HTTP backend with GET, POST, and HEAD commands.
+ For more information on authentication and REST, see John Calcote's
+ `Open Sourcery article`_
+
+ .. _RESTful: http://www.ics.uci.edu/~fielding/pubs/dissertation/rest_arch_style.htm
+ .. _Open Sourcery article: http://jcalcote.wordpress.com/2009/08/10/restful-authentication/
+
+ This serves files from a connected storage instance, usually
+ a VCS-based repository located on the local machine.
+
+ The GET and HEAD requests are identical except that the HEAD
+ request omits the actual content of the file.
+ """
+ server_version = "BE-server/" + libbe.version.version()
+
+ def __init__(self, storage, *args, **kwargs):
+ WSGI_AppObject.__init__(self, *args, **kwargs)
+ self.storage = storage
+ self.http_user_error = 418
+
self.urls = [
- (r'^add/(.+)', self.add),
- (r'^remove/(.+)', self.remove),
+ (r'^add/?', self.add),
+ (r'^exists/?', self.exists),
+ (r'^remove/?', self.remove),
(r'^ancestors/?', self.ancestors),
(r'^children/?', self.children),
(r'^get/(.+)', self.get),
(r'^set/(.+)', self.set),
- (r'^commit/(.+)', self.commit),
+ (r'^commit/?', self.commit),
(r'^revision-id/?', self.revision_id),
(r'^changed/?', self.changed),
(r'^version/?', self.version),
@@ -102,18 +512,12 @@ class ServerApp (object):
the functions from above and store the regular expression
captures in the WSGI environment as `be-server.url_args` so
that the functions from above can access the url placeholders.
+
+ URL dispatcher from Armin Ronacher's "Getting Started with WSGI"
+ http://lucumr.pocoo.org/2007/5/21/getting-started-with-wsgi
"""
- # start_response() is a callback for setting response headers
- # start_response(status, response_headers, exc_info=None)
- # status is an HTTP status string (e.g., "200 OK").
- # response_headers is a list of 2-tuples, the HTTP headers in
- # key-value format.
- # exc_info is used in exception handling.
- #
- # The application function then returns an iterable of body chunks.
- self.log_request(environ)
- # URL dispatcher from Armin Ronacher's "Getting Started with WSGI"
- # http://lucumr.pocoo.org/2007/5/21/getting-started-with-wsgi
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'ServerApp')
path = environ.get('PATH_INFO', '').lstrip('/')
try:
for regex, callback in self.urls:
@@ -131,26 +535,39 @@ class ServerApp (object):
self.http_user_error, 'InvalidID %s' % e)
raise _HandlerError(404, 'Not Found')
except _HandlerError, e:
- return self.error(start_response, e.code, e.msg)
+ return self.error(environ, start_response,
+ e.code, e.msg, e.headers)
# handlers
def add(self, environ, start_response):
+ self.check_login(environ)
data = self.post_data(environ)
source = 'post'
id = self.data_get_id(data, source=source)
parent = self.data_get_string(
data, 'parent', default=None, source=source)
directory = self.data_get_boolean(
- data, 'directory', default=False, souce=source)
+ data, 'directory', default=False, source=source)
self.storage.add(id, parent=parent, directory=directory)
return self.ok_response(environ, start_response, None)
+ def exists(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ id = self.data_get_id(data, source=source)
+ revision = self.data_get_string(
+ data, 'revision', default=None, source=source)
+ content = str(self.storage.exists(id, revision))
+ return self.ok_response(environ, start_response, content)
+
def remove(self, environ, start_response):
+ self.check_login(environ)
data = self.post_data(environ)
source = 'post'
id = self.data_get_id(data, source=source)
recursive = self.data_get_boolean(
- data, 'recursive', default=False, souce=source)
+ data, 'recursive', default=False, source=source)
if recursive == True:
self.storage.recursive_remove(id)
else:
@@ -158,6 +575,7 @@ class ServerApp (object):
return self.ok_response(environ, start_response, None)
def ancestors(self, environ, start_response):
+ self.check_login(environ)
data = self.query_data(environ)
source = 'query'
id = self.data_get_id(data, source=source)
@@ -167,6 +585,7 @@ class ServerApp (object):
return self.ok_response(environ, start_response, content)
def children(self, environ, start_response):
+ self.check_login(environ)
data = self.query_data(environ)
source = 'query'
id = self.data_get_id(data, default=None, source=source)
@@ -176,6 +595,7 @@ class ServerApp (object):
return self.ok_response(environ, start_response, content)
def get(self, environ, start_response):
+ self.check_login(environ)
data = self.query_data(environ)
source = 'query'
try:
@@ -190,6 +610,7 @@ class ServerApp (object):
headers=[('X-BE-Version', be_version)])
def set(self, environ, start_response):
+ self.check_login(environ)
data = self.post_data(environ)
try:
id = environ['be-server.url_args'][0]
@@ -202,9 +623,10 @@ class ServerApp (object):
return self.ok_response(environ, start_response, None)
def commit(self, environ, start_response):
+ self.check_login(environ)
data = self.post_data(environ)
if not 'summary' in data:
- return self.error(start_response, 406, 'Missing query key summary')
+ raise _HandlerError(406, 'Missing query key summary')
summary = data['summary']
if not 'body' in data or data['body'] == 'None':
data['body'] = None
@@ -215,21 +637,22 @@ class ServerApp (object):
else:
allow_empty = False
try:
- self.storage.commit(summary, body, allow_empty)
+ revision = self.storage.commit(summary, body, allow_empty)
except libbe.storage.EmptyCommit, e:
- return self.error(
- start_response, self.http_user_error, 'EmptyCommit')
- return self.ok_response(environ, start_response, None)
+ raise _HandlerError(self.http_user_error, 'EmptyCommit')
+ return self.ok_response(environ, start_response, revision)
def revision_id(self, environ, start_response):
+ self.check_login(environ)
data = self.query_data(environ)
source = 'query'
- index = self.data_get_string(
- data, 'index', default=_HandlerError, source=source)
+ index = int(self.data_get_string(
+ data, 'index', default=_HandlerError, source=source))
content = self.storage.revision_id(index)
return self.ok_response(environ, start_response, content)
def changed(self, environ, start_response):
+ self.check_login(environ)
data = self.query_data(environ)
source = 'query'
revision = self.data_get_string(
@@ -239,6 +662,7 @@ class ServerApp (object):
return self.ok_response(environ, start_response, content)
def version(self, environ, start_response):
+ self.check_login(environ)
data = self.query_data(environ)
source = 'query'
revision = self.data_get_string(
@@ -247,116 +671,17 @@ class ServerApp (object):
return self.ok_response(environ, start_response, content)
# handler utility functions
- def log_request(self, environ):
- print >> self.command.stdout, \
- environ.get('REQUEST_METHOD'), environ.get('PATH_INFO', '')
-
- def error(self, start_response, error, message):
- """Called if no URL matches."""
- start_response('%d %s' % (error, message.upper()),
- [('Content-Type', 'text/plain')])
- return [message]
-
- def ok_response(self, environ, start_response, content,
- content_type='application/octet-stream',
- headers=[]):
- if content == None:
- start_response('200 OK', [])
- return []
- if type(content) == types.UnicodeType:
- content = content.encode('utf-8')
- for i,header in enumerate(headers):
- header_name,header_value = header
- if type(header_value) == types.UnicodeType:
- headers[i] = (header_name, header_value.encode('ISO-8859-1'))
- start_response('200 OK', [
- ('Content-Type', content_type),
- ('Content-Length', str(len(content))),
- ]+headers)
- if self.is_head(environ) == True:
- return []
- return [content]
-
- def query_data(self, environ):
- if not environ['REQUEST_METHOD'] in ['GET', 'HEAD']:
- raise _HandlerError(404, 'Not Found')
- return self._parse_query(environ.get('QUERY_STRING', ''))
-
- def _parse_query(self, query):
- if len(query) == 0:
- return {}
- data = parse_qs(
- query, keep_blank_values=True, strict_parsing=True)
- for k,v in data.items():
- if len(v) == 1:
- data[k] = v[0]
- return data
-
- def post_data(self, environ):
- if environ['REQUEST_METHOD'] != 'POST':
- raise _HandlerError(404, 'Not Found')
- post_data = self._read_post_data(environ)
- return self._parse_post(post_data)
-
- def _parse_post(self, post):
- return self._parse_query(post)
-
- def _read_post_data(self, environ):
- try:
- clen = int(environ.get('CONTENT_LENGTH', '0'))
- except ValueError:
- clen = 0
- if clen != 0:
- if self.maxlen > 0 and clen > self.maxlen:
- raise ValueError, 'Maximum content length exceeded'
- return environ['wsgi.input'].read(clen)
- return ''
-
- def data_get_string(self, data, key, default=None, source='query'):
- if not key in data or data[key] in [None, 'None']:
- if default == _HandlerError:
- raise _HandlerError(406, 'Missing %s key %s' % (source, key))
- return default
- return data[key]
-
- def data_get_id(self, data, key='id', default=_HandlerError,
- source='query'):
- return self.data_get_string(data, key, default, source)
-
- def data_get_boolean(self, data, key, default=False, source='query'):
- val = self.data_get_string(self, data, key, default, source)
- if val == 'True':
- return True
- elif val == 'False':
- return False
- return val
-
- def is_head(self, environ):
- return environ['REQUEST_METHOD'] == 'HEAD'
+ def check_login(self, environ):
+ user = environ.get('be-auth.user', None)
+ if user != None: # we're running under AuthenticationApp
+ if environ['REQUEST_METHOD'] == 'POST':
+ if user == 'guest' or self.storage.is_writeable() == False:
+ raise _Unauthorized() # only non-guests allowed to write
+ # allow read-only commands for all users
class Serve (libbe.command.Command):
"""Serve a Storage backend for the HTTP storage client
-
- >>> raise NotImplementedError, "Serve tests not yet implemented"
- >>> import sys
- >>> import libbe.bugdir
- >>> import libbe.command.list
- >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
- >>> io = libbe.command.StringInputOutput()
- >>> io.stdout = sys.stdout
- >>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_storage(bd.storage)
- >>> cmd = libbe.command.list.List(ui=ui)
-
- >>> ret = ui.run(cmd)
- abc/a:om: Bug A
- >>> ret = ui.run(cmd, {'status':'closed'})
- abc/b:cm: Bug B
- >>> bd.storage.writeable
- True
- >>> ui.cleanup()
- >>> bd.cleanup()
"""
name = 'serve'
@@ -374,18 +699,33 @@ class Serve (libbe.command.Command):
name='host', metavar='HOST', default='')),
libbe.command.Option(name='read-only', short_name='r',
help='Dissable operations that require writing'),
- libbe.command.Option(name='ssl',
+ libbe.command.Option(name='ssl', short_name='s',
help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'),
+ libbe.command.Option(name='auth', short_name='a',
+ help='Require authentication. FILE should be a file containing colon-separated UNAME:USER:sha1(PASSWORD) lines, for example: "jdoe:John Doe <jdoe@example.com>:read:d99f8e5a4b02dc25f49da2ea67c0034f61779e72"',
+ arg=libbe.command.Argument(
+ name='auth', metavar='FILE', default=None,
+ completion_callback=libbe.command.util.complete_path)),
])
def _run(self, **params):
+ self._setup_logging()
storage = self._get_storage()
if params['read-only'] == True:
writeable = storage.writeable
storage.writeable = False
if params['host'] == '':
params['host'] = 'localhost'
- app = ServerApp(command=self, storage=storage)
+ if params['auth'] != None:
+ self._check_restricted_access(storage, params['auth'])
+ users = Users(params['auth'])
+ users.load()
+ app = ServerApp(storage=storage, logger=self.logger)
+ if params['auth'] != None:
+ app = AdminApp(app, users=users, logger=self.logger)
+ app = AuthenticationApp(app, realm=storage.repo,
+ users=users, logger=self.logger)
+ app = UppercaseHeaderApp(app, logger=self.logger)
server,details = self._get_server(params, app)
details['repo'] = storage.repo
try:
@@ -396,6 +736,17 @@ class Serve (libbe.command.Command):
if params['read-only'] == True:
storage.writeable = writeable
+ def _setup_logging(self, log_level=logging.INFO):
+ self.logger = logging.getLogger('be-serve')
+ self.log_level = logging.INFO
+ console = logging.StreamHandler(self.stdout)
+ console.setFormatter(logging.Formatter('%(message)s'))
+ self.logger.addHandler(console)
+ self.logger.propagate = False
+ if log_level is not None:
+ console.setLevel(log_level)
+ self.logger.setLevel(log_level)
+
def _get_server(self, params, app):
details = {'port':params['port']}
if params['ssl'] == True:
@@ -403,9 +754,13 @@ class Serve (libbe.command.Command):
if cherrypy == None:
raise libbe.command.UserError, \
'--ssl requires the cherrypy module'
+ app = ExceptionApp(app, logger=self.logger)
server = cherrypy.wsgiserver.CherryPyWSGIServer(
(params['host'], params['port']), app)
- private_key,certificate = get_cert_filenames('be-server')
+ #server.throw_errors = True
+ #server.show_tracebacks = True
+ private_key,certificate = get_cert_filenames(
+ 'be-server', logger=self.logger)
if cherrypy.wsgiserver.ssl_builtin == None:
server.ssl_module = 'builtin'
server.ssl_private_key = private_key
@@ -423,17 +778,18 @@ class Serve (libbe.command.Command):
return (server, details)
def _start_server(self, params, server, details):
- print >> self.stdout, \
+ self.logger.log(self.log_level,
'Serving %(protocol)s on %(socket-name)s port %(port)s ...' \
- % details
- print >> self.stdout, 'BE repository %(repo)s' % details
+ % details)
+ self.logger.log(self.log_level,
+ 'BE repository %(repo)s' % details)
if params['ssl'] == True:
server.start()
else:
server.serve_forever()
def _stop_server(self, params, server):
- print >> self.stdout, 'Closing server'
+ self.logger.log(self.log_level, 'Clossing server')
if params['ssl'] == True:
server.stop()
else:
@@ -441,30 +797,231 @@ class Serve (libbe.command.Command):
def _long_help(self):
return """
-Example usage:
- $ be serve
-And in another terminal (or after backgrounding the server)
- $ be --repo http://localhost:8000 list
-
-If you bind your server to a public interface, you should probably use
-the --read-only option so other people can't mess with your
-repository.
+Example usage::
+
+ $ be serve
+
+And in another terminal (or after backgrounding the server)::
+
+ $ be --repo http://localhost:8000/ list
+
+If you bind your server to a public interface, take a look at the
+``--read-only`` option or the combined ``--ssl --auth FILE``
+options so other people can't mess with your repository. If you do use
+authentication, you'll need to send in your username and password with,
+for example::
+
+ $ be --repo http://username:password@localhost:8000/ list
"""
+def random_string(length=256):
+ if os.path.exists(os.path.join('dev', 'urandom')):
+ return open("/dev/urandom").read(length)
+ else:
+ import array
+ from random import randint
+ d = array.array('B')
+ for i in xrange(1000000):
+ d.append(randint(0,255))
+ return d.tostring()
+
if libbe.TESTING == True:
- class ServerAppTestCase (unittest.TestCase):
+ class WSGITestCase (unittest.TestCase):
+ def setUp(self):
+ self.logstream = StringIO.StringIO()
+ self.logger = logging.getLogger('be-serve-test')
+ console = logging.StreamHandler(self.logstream)
+ console.setFormatter(logging.Formatter('%(message)s'))
+ self.logger.addHandler(console)
+ self.logger.propagate = False
+ console.setLevel(logging.INFO)
+ self.logger.setLevel(logging.INFO)
+ self.default_environ = { # required by PEP 333
+ 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
+ 'SCRIPT_NAME':'',
+ 'PATH_INFO': '',
+ #'QUERY_STRING':'', # may be empty or absent
+ #'CONTENT_TYPE':'', # may be empty or absent
+ #'CONTENT_LENGTH':'', # may be empty or absent
+ 'SERVER_NAME':'example.com',
+ 'SERVER_PORT':'80',
+ 'SERVER_PROTOCOL':'HTTP/1.1',
+ 'wsgi.version':(1,0),
+ 'wsgi.url_scheme':'http',
+ 'wsgi.input':StringIO.StringIO(),
+ 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.multithread':False,
+ 'wsgi.multiprocess':False,
+ 'wsgi.run_once':False,
+ }
+ def getURL(self, app, path='/', method='GET', data=None,
+ scheme='http', environ={}):
+ env = copy.copy(self.default_environ)
+ env['PATH_INFO'] = path
+ env['REQUEST_METHOD'] = method
+ env['scheme'] = scheme
+ if data != None:
+ enc_data = urllib.urlencode(data)
+ if method == 'POST':
+ env['CONTENT_LENGTH'] = len(enc_data)
+ env['wsgi.input'] = StringIO.StringIO(enc_data)
+ else:
+ assert method in ['GET', 'HEAD'], method
+ env['QUERY_STRING'] = enc_data
+ for key,value in environ.items():
+ env[key] = value
+ return ''.join(app(env, self.start_response))
+ def start_response(self, status, response_headers, exc_info=None):
+ self.status = status
+ self.response_headers = response_headers
+ self.exc_info = exc_info
+
+ class WSGI_ObjectTestCase (WSGITestCase):
def setUp(self):
+ WSGITestCase.setUp(self)
+ self.app = WSGI_Object(self.logger)
+ def test_error(self):
+ contents = self.app.error(
+ environ=self.default_environ,
+ start_response=self.start_response,
+ error=123,
+ message='Dummy Error',
+ headers=[('X-Dummy-Header','Dummy Value')])
+ self.failUnless(contents == ['Dummy Error'], contents)
+ self.failUnless(self.status == '123 Dummy Error', self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type','text/plain'),
+ ('X-Dummy-Header','Dummy Value')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ def test_log_request(self):
+ self.app.log_request(
+ environ=self.default_environ, status='-1 OK', bytes=123)
+ log = self.logstream.getvalue()
+ self.failUnless(log.startswith('- -'), log)
+
+ class ExceptionAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ def child_app(environ, start_response):
+ raise ValueError('Dummy Error')
+ self.app = ExceptionApp(child_app, self.logger)
+ def test_traceback(self):
+ try:
+ self.getURL(self.app)
+ except ValueError, e:
+ pass
+ log = self.logstream.getvalue()
+ self.failUnless(log.startswith('Traceback'), log)
+ self.failUnless('child_app' in log, log)
+ self.failUnless('ValueError: Dummy Error' in log, log)
+
+ class AdminAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ self.users = Users()
+ self.users.add_user(
+ User('Aladdin', 'Big Al', password='open sesame'))
+ self.users.add_user(
+ User('guest', 'Guest', password='guestpass'))
+ def child_app(environ, start_response):
+ pass
+ self.app = AdminApp(
+ child_app, users=self.users, logger=self.logger)
+ self.app = AuthenticationApp(
+ self.app, realm='Dummy Realm', users=self.users,
+ logger=self.logger)
+ self.app = UppercaseHeaderApp(self.app, logger=self.logger)
+ def basic_auth(self, uname, password):
+ """HTTP basic authorization string"""
+ return 'Basic %s' % \
+ ('%s:%s' % (uname, password)).encode('base64')
+ def test_new_name(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'name':'Prince Al'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('Aladdin', 'open sesame')})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['Aladdin'].name == 'Prince Al',
+ self.users['Aladdin'].name)
+ self.failUnless(self.users.changed == True,
+ self.users.changed)
+ def test_new_password(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'password':'New Pass'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('Aladdin', 'open sesame')})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['Aladdin'].passhash == \
+ self.users['Aladdin'].hash('New Pass'),
+ self.users['Aladdin'].passhash)
+ self.failUnless(self.users.changed == True,
+ self.users.changed)
+ def test_guest_name(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'name':'SPAM'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('guest', 'guestpass')})
+ self.failUnless(self.status.startswith('403 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['guest'].name == 'Guest',
+ self.users['guest'].name)
+ self.failUnless(self.users.changed == False,
+ self.users.changed)
+ def test_guest_password(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'password':'SPAM'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('guest', 'guestpass')})
+ self.failUnless(self.status.startswith('403 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['guest'].name == 'Guest',
+ self.users['guest'].name)
+ self.failUnless(self.users.changed == False,
+ self.users.changed)
+
+ class ServerAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
self.bd = libbe.bugdir.SimpleBugDir(memory=False)
- storage = self.bd.storage
- command = object()
- command.stdout = StringIO.StringIO()
- command.stdout.encoding = 'utf-8'
- self.app = ServerApp(command=self, storage=storage)
+ self.app = ServerApp(self.bd.storage, logger=self.logger)
def tearDown(self):
self.bd.cleanup()
- def testValidWSGI(self):
- wsgiref.validate.validator(self.app)
- pass
+ WSGITestCase.tearDown(self)
+ def test_add_get(self):
+ self.getURL(self.app, '/add/', method='GET')
+ self.failUnless(self.status.startswith('404 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ def test_add_post(self):
+ self.getURL(self.app, '/add/', method='POST',
+ data={'id':'123456', 'parent':'abc123',
+ 'directory':'True'})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ # Note: other methods tested in libbe.storage.http
+
+ # TODO: integration tests on Serve?
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
@@ -473,7 +1030,7 @@ if libbe.TESTING == True:
# The following certificate-creation code is adapted From pyOpenSSL's
# examples.
-def get_cert_filenames(server_name, autogenerate=True):
+def get_cert_filenames(server_name, autogenerate=True, logger=None):
"""
Generate private key and certification filenames.
get_cert_filenames(server_name) -> (pkey_filename, cert_filename)
@@ -483,7 +1040,7 @@ def get_cert_filenames(server_name, autogenerate=True):
if autogenerate == True:
for file in [pkey_file, cert_file]:
if not os.path.exists(file):
- make_certs(server_name)
+ make_certs(server_name, logger)
return (pkey_file, cert_file)
def createKeyPair(type, bits):
@@ -550,7 +1107,7 @@ def createCertificate(req, (issuerCert, issuerKey), serial, (notBefore, notAfter
cert.sign(issuerKey, digest)
return cert
-def make_certs(server_name) :
+def make_certs(server_name, logger=None) :
"""
Generate private key and certification files.
mk_certs(server_name) -> (pkey_filename, cert_filename)
@@ -560,7 +1117,9 @@ def make_certs(server_name) :
'SSL certificate generation requires the OpenSSL module'
pkey_file,cert_file = get_cert_filenames(
server_name, autogenerate=False)
- print >> sys.stderr, 'Generating certificates', pkey_file, cert_file
+ if logger != None:
+ logger.log(logger._server_level,
+ 'Generating certificates', pkey_file, cert_file)
cakey = createKeyPair(OpenSSL.crypto.TYPE_RSA, 1024)
careq = createCertRequest(cakey, CN='Certificate Authority')
cacert = createCertificate(
diff --git a/libbe/storage/__init__.py b/libbe/storage/__init__.py
index b6b0ac1..c3bda4b 100644
--- a/libbe/storage/__init__.py
+++ b/libbe/storage/__init__.py
@@ -50,7 +50,7 @@ def get_storage(location):
"""
Return a Storage instance from a repo location string.
"""
- if location.startswith('http://'):
+ if location.startswith('http://') or location.startswith('https://'):
return get_http_storage(location)
return get_vcs_storage(location)
diff --git a/libbe/storage/http.py b/libbe/storage/http.py
index 2de2aff..5606383 100644
--- a/libbe/storage/http.py
+++ b/libbe/storage/http.py
@@ -36,9 +36,14 @@ import base
from libbe import TESTING
if TESTING == True:
+ import copy
import doctest
+ import StringIO
import unittest
+ import libbe.bugdir
+ import libbe.command.serve
+
USER_AGENT = 'BE-HTTP-Storage'
HTTP_OK = 200
@@ -60,7 +65,7 @@ class InvalidURL (Exception):
return self.error.__str__()
return self.msg
-def get_post_url(url, get=True, data_dict=None):
+def get_post_url(url, get=True, data_dict=None, headers=[]):
"""
get: use GET if True, otherwise use POST.
data_dict: dict of data to send.
@@ -75,7 +80,8 @@ def get_post_url(url, get=True, data_dict=None):
data = None
else:
data = urllib.urlencode(data_dict)
- headers = {'User-Agent':USER_AGENT}
+ headers = dict(headers)
+ headers['User-Agent'] = USER_AGENT
req = urllib2.Request(url, data=data, headers=headers)
try:
response = urllib2.urlopen(req)
@@ -101,8 +107,37 @@ class HTTP (base.VersionedStorage):
"""
name = 'HTTP'
- def __init__(self, *args, **kwargs):
- base.VersionedStorage.__init__(self, *args, **kwargs)
+ def __init__(self, repo, *args, **kwargs):
+ repo,self.uname,self.password = self.parse_repo(repo)
+ base.VersionedStorage.__init__(self, repo, *args, **kwargs)
+
+ def parse_repo(self, repo):
+ """Grab username and password (if any) from the repo URL.
+ >>> s = HTTP('http://host.com/path/to/repo')
+ >>> s.repo
+ 'http://host.com/path/to/repo'
+ >>> s.uname == None
+ True
+ >>> s.password == None
+ True
+ >>> s.parse_repo('http://joe:secret@host.com/path/to/repo')
+ ('http://host.com/path/to/repo', 'joe', 'secret')
+ """
+ scheme,netloc,path,params,query,fragment = urlparse.urlparse(repo)
+ parts = netloc.split('@', 1)
+ if len(parts) == 2:
+ uname,password = parts[0].split(':')
+ repo = urlparse.urlunparse(
+ (scheme, parts[1], path, params, query, fragment))
+ else:
+ uname,password = (None, None)
+ return (repo, uname, password)
+
+ def get_post_url(self, url, get=True, data_dict=None, headers=[]):
+ if self.uname != None and self.password != None:
+ headers.append(('Authorization','Basic %s' % \
+ ('%s:%s' % (self.uname, self.password)).encode('base64')))
+ return get_post_url(url, get, data_dict, headers)
def storage_version(self, revision=None):
"""Return the storage format for this backend."""
@@ -126,32 +161,41 @@ class HTTP (base.VersionedStorage):
def _add(self, id, parent=None, directory=False):
url = urlparse.urljoin(self.repo, 'add')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'id':id, 'parent':parent, 'directory':directory})
+ def _exists(self, id, revision=None):
+ url = urlparse.urljoin(self.repo, 'exists')
+ page,final_url,info = self.get_post_url(
+ url, get=True,
+ data_dict={'id':id, 'revision':revision})
+ if page == 'True':
+ return True
+ return False
+
def _remove(self, id):
url = urlparse.urljoin(self.repo, 'remove')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'id':id, 'recursive':False})
def _recursive_remove(self, id):
url = urlparse.urljoin(self.repo, 'remove')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'id':id, 'recursive':True})
def _ancestors(self, id=None, revision=None):
url = urlparse.urljoin(self.repo, 'ancestors')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'id':id, 'revision':revision})
return page.strip('\n').splitlines()
def _children(self, id=None, revision=None):
url = urlparse.urljoin(self.repo, 'children')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'id':id, 'revision':revision})
return page.strip('\n').splitlines()
@@ -159,7 +203,7 @@ class HTTP (base.VersionedStorage):
def _get(self, id, default=base.InvalidObject, revision=None):
url = urlparse.urljoin(self.repo, '/'.join(['get', id]))
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'revision':revision})
except InvalidURL, e:
@@ -177,13 +221,14 @@ class HTTP (base.VersionedStorage):
def _set(self, id, value):
url = urlparse.urljoin(self.repo, '/'.join(['set', id]))
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'value':value})
except InvalidURL, e:
if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID):
raise
- if e.error.code == HTTP_USER_ERROR:
+ if e.error.code == HTTP_USER_ERROR \
+ and not 'InvalidID' in str(e.error):
raise base.InvalidDirectory(
'Directory %s cannot have data' % id)
raise base.InvalidID(id)
@@ -191,7 +236,7 @@ class HTTP (base.VersionedStorage):
def _commit(self, summary, body=None, allow_empty=False):
url = urlparse.urljoin(self.repo, 'commit')
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'summary':summary, 'body':body,
'allow_empty':allow_empty})
@@ -218,12 +263,12 @@ class HTTP (base.VersionedStorage):
return None
try:
if int(index) != index:
- raise InvalidRevision(index)
+ raise base.InvalidRevision(index)
except ValueError:
- raise InvalidRevision(index)
+ raise base.InvalidRevision(index)
url = urlparse.urljoin(self.repo, 'revision-id')
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'index':index})
except InvalidURL, e:
@@ -236,7 +281,7 @@ class HTTP (base.VersionedStorage):
def changed(self, revision=None):
url = urlparse.urljoin(self.repo, 'changed')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'revision':revision})
lines = page.strip('\n')
@@ -251,7 +296,7 @@ class HTTP (base.VersionedStorage):
def storage_version(self, revision=None):
url = urlparse.urljoin(self.repo, 'version')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True, data_dict={'revision':revision})
return page.rstrip('\n')
@@ -272,7 +317,101 @@ if TESTING == True:
'Redirect?\n Expected: "%s"\n Got: "%s"'
% (expected, final_url))
- #make_storage_testcase_subclasses(VersionedStorage, sys.modules[__name__])
+ class TestingHTTP (HTTP):
+ name = 'TestingHTTP'
+ def __init__(self, repo, *args, **kwargs):
+ self._storage_backend = base.VersionedStorage(repo)
+ self.app = libbe.command.serve.ServerApp(
+ storage=self._storage_backend)
+ HTTP.__init__(self, repo='http://localhost:8000/', *args, **kwargs)
+ self.intitialized = False
+ # duplicated from libbe.storage.serve.WSGITestCase
+ self.default_environ = {
+ 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
+ 'SCRIPT_NAME':'',
+ 'PATH_INFO': '',
+ #'QUERY_STRING':'', # may be empty or absent
+ #'CONTENT_TYPE':'', # may be empty or absent
+ #'CONTENT_LENGTH':'', # may be empty or absent
+ 'SERVER_NAME':'example.com',
+ 'SERVER_PORT':'80',
+ 'SERVER_PROTOCOL':'HTTP/1.1',
+ 'wsgi.version':(1,0),
+ 'wsgi.url_scheme':'http',
+ 'wsgi.input':StringIO.StringIO(),
+ 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.multithread':False,
+ 'wsgi.multiprocess':False,
+ 'wsgi.run_once':False,
+ }
+ def getURL(self, app, path='/', method='GET', data=None,
+ scheme='http', environ={}):
+ # duplicated from libbe.storage.serve.WSGITestCase
+ env = copy.copy(self.default_environ)
+ env['PATH_INFO'] = path
+ env['REQUEST_METHOD'] = method
+ env['scheme'] = scheme
+ if data != None:
+ enc_data = urllib.urlencode(data)
+ if method == 'POST':
+ env['CONTENT_LENGTH'] = len(enc_data)
+ env['wsgi.input'] = StringIO.StringIO(enc_data)
+ else:
+ assert method in ['GET', 'HEAD'], method
+ env['QUERY_STRING'] = enc_data
+ for key,value in environ.items():
+ env[key] = value
+ return ''.join(app(env, self.start_response))
+ def start_response(self, status, response_headers, exc_info=None):
+ self.status = status
+ self.response_headers = response_headers
+ self.exc_info = exc_info
+ def get_post_url(self, url, get=True, data_dict=None, headers=[]):
+ if get == True:
+ method = 'GET'
+ else:
+ method = 'POST'
+ scheme,netloc,path,params,query,fragment = urlparse.urlparse(url)
+ environ = {}
+ for header_name,header_value in headers:
+ environ['HTTP_%s' % header_name] = header_value
+ output = self.getURL(
+ self.app, path, method, data_dict, scheme, environ)
+ if self.status != '200 OK':
+ class __estr (object):
+ def __init__(self, string):
+ self.string = string
+ self.code = int(string.split()[0])
+ def __str__(self):
+ return self.string
+ error = __estr(self.status)
+ raise InvalidURL(error=error, url=url, msg=output)
+ info = dict(self.response_headers)
+ return (output, url, info)
+ def _init(self):
+ try:
+ HTTP._init(self)
+ raise AssertionError
+ except base.NotSupported:
+ pass
+ self._storage_backend._init()
+ def _destroy(self):
+ try:
+ HTTP._destroy(self)
+ raise AssertionError
+ except base.NotSupported:
+ pass
+ self._storage_backend._destroy()
+ def _connect(self):
+ self._storage_backend._connect()
+ HTTP._connect(self)
+ def _disconnect(self):
+ HTTP._disconnect(self)
+ self._storage_backend._disconnect()
+
+
+ base.make_versioned_storage_testcase_subclasses(
+ TestingHTTP, sys.modules[__name__])
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])