aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/util/wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/util/wsgi.py')
-rw-r--r--libbe/util/wsgi.py213
1 files changed, 1 insertions, 212 deletions
diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py
index dcddcf3..2e97941 100644
--- a/libbe/util/wsgi.py
+++ b/libbe/util/wsgi.py
@@ -353,81 +353,6 @@ class UppercaseHeaderApp (WSGI_Middleware):
return self.app(environ, start_response)
-class AuthenticationApp (WSGI_Middleware):
- """WSGI middleware for handling user authentication.
- """
- def __init__(self, realm, setting='be-auth', users=None, *args, **kwargs):
- super(AuthenticationApp, self).__init__(*args, **kwargs)
- self.realm = realm
- self.setting = setting
- self.users = users
-
- def _call(self, environ, start_response):
- environ['{}.realm'.format(self.setting)] = self.realm
- try:
- username = self.authenticate(environ)
- environ['{}.user'.format(self.setting)] = username
- environ['{}.user.name'.format(self.setting)] = self.users[username].name
- return self.app(environ, start_response)
- except Unauthorized, e:
- return self.error(environ, start_response,
- e.code, e.msg, e.headers)
-
- def authenticate(self, environ):
- """Handle user-authentication sent in the "Authorization" header.
-
- This function implements ``Basic`` authentication as described in
- HTTP/1.0 specification [1]_ . Do not use this module unless you
- are using SSL, as it transmits unencrypted passwords.
-
- .. [1] http://www.w3.org/Protocols/HTTP/1.0/draft-ietf-http-spec.html#BasicAA
-
- Examples
- --------
-
- >>> users = Users()
- >>> users.add_user(User('Aladdin', 'Big Al', password='open sesame'))
- >>> app = AuthenticationApp(app=None, realm='Dummy Realm', users=users)
- >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='})
- 'Aladdin'
- >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic AAAAAAAAAAAAAAAAAAAAAAAAAA=='})
-
- Notes
- -----
-
- Code based on authkit/authenticate/basic.py
- (c) 2005 Clark C. Evans.
- Released under the MIT License:
- http://www.opensource.org/licenses/mit-license.php
- """
- authorization = environ.get('HTTP_AUTHORIZATION', None)
- if authorization is None:
- raise Unauthorized('Authorization required')
- try:
- authmeth,auth = authorization.split(' ', 1)
- except ValueError:
- return None
- if 'basic' != authmeth.lower():
- return None # non-basic HTTP authorization not implemented
- auth = auth.strip().decode('base64')
- try:
- username,password = auth.split(':', 1)
- except ValueError:
- return None
- if self.authfunc(environ, username, password):
- return username
-
- def authfunc(self, environ, username, password):
- if not username in self.users:
- return False
- if self.users[username].valid_login(password):
- if self.logger is not None:
- self.logger.log(self.log_level,
- 'Authenticated {}'.format(self.users[username].name))
- return True
- return False
-
-
class WSGI_DataObject (WSGI_Object):
"""Useful WSGI utilities for handling data (POST, QUERY) and
returning responses.
@@ -544,41 +469,6 @@ class WSGI_AppObject (WSGI_Object):
return self.default_handler(environ, start_response)
-class AdminApp (WSGI_AppObject, WSGI_DataObject, WSGI_Middleware):
- """WSGI middleware for managing users
-
- Changing passwords, usernames, etc.
- """
- def __init__(self, users=None, setting='be-auth', *args, **kwargs):
- handler = ('^admin/?', self.admin)
- if 'urls' not in kwargs:
- kwargs['urls'] = [handler]
- else:
- kwargs.urls.append(handler)
- super(AdminApp, self).__init__(*args, **kwargs)
- self.users = users
- self.setting = setting
-
- def admin(self, environ, start_response):
- if not '{}.user'.format(self.setting) in environ:
- realm = envirion.get('{}.realm'.format(self.setting))
- raise Unauthenticated(realm=realm)
- uname = environ.get('{}.user'.format(self.setting))
- user = self.users[uname]
- data = self.post_data(environ)
- source = 'post'
- name = self.data_get_string(
- data, 'name', default=None, source=source)
- if name is not None:
- self.users[uname].set_name(name)
- password = self.data_get_string(
- data, 'password', default=None, source=source)
- if password is not None:
- self.users[uname].set_password(password)
- self.users.save()
- return self.ok_response(environ, start_response, None)
-
-
class SilentRequestHandler (wsgiref.simple_server.WSGIRequestHandler):
def log_message(self, format, *args):
pass
@@ -631,15 +521,6 @@ class ServerCommand (libbe.command.base.Command):
name='notify', metavar='EMAIL-COMMAND', default=None)),
libbe.command.Option(name='ssl', short_name='s',
help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'),
- libbe.command.Option(name='auth', short_name='a',
- help=('Require authentication. FILE should be a file '
- 'containing colon-separated '
- 'UNAME:USER:sha1(PASSWORD) lines, for example: '
- '"jdoe:John Doe <jdoe@example.com>:'
- 'd99f8e5a4b02dc25f49da2ea67c0034f61779e72"'),
- arg=libbe.command.Argument(
- name='auth', metavar='FILE', default=None,
- completion_callback=libbe.command.util.complete_path)),
])
def _run(self, **params):
@@ -655,15 +536,9 @@ class ServerCommand (libbe.command.base.Command):
if params['read-only']:
writeable = storage.writeable
storage.writeable = False
- if params['auth']:
- self._check_restricted_access(storage, params['auth'])
- users = Users(params['auth'])
+ users = Users()
users.load()
app = self._get_app(logger=self.logger, storage=storage, **params)
- if params['auth']:
- app = AdminApp(app, users=users, logger=self.logger)
- app = AuthenticationApp(app, realm=storage.repo,
- users=users, logger=self.logger)
app = UppercaseHeaderApp(app, logger=self.logger)
server,details = self._get_server(params, app)
details['repo'] = storage.repo
@@ -954,92 +829,6 @@ if libbe.TESTING:
self.failUnless('child_app' in log, log)
self.failUnless('ValueError: Dummy Error' in log, log)
-
- class AdminAppTestCase (WSGITestCase):
- def setUp(self):
- WSGITestCase.setUp(self)
- self.users = Users()
- self.users.add_user(
- User('Aladdin', 'Big Al', password='open sesame'))
- self.users.add_user(
- User('guest', 'Guest', password='guestpass'))
- def child_app(environ, start_response):
- pass
- app = AdminApp(
- app=child_app, users=self.users, logger=self.logger)
- app = AuthenticationApp(
- app=app, realm='Dummy Realm', users=self.users,
- logger=self.logger)
- self.app = UppercaseHeaderApp(app=app, logger=self.logger)
-
- def basic_auth(self, uname, password):
- """HTTP basic authorization string"""
- return 'Basic {}'.format(
- '{}:{}'.format(uname, password).encode('base64'))
-
- def test_new_name(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'name':'Prince Al'},
- environ={'HTTP_Authorization':
- self.basic_auth('Aladdin', 'open sesame')})
- self.failUnless(self.status == '200 OK', self.status)
- self.failUnless(self.response_headers == [],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['Aladdin'].name == 'Prince Al',
- self.users['Aladdin'].name)
- self.failUnless(self.users.changed == True,
- self.users.changed)
-
- def test_new_password(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'password':'New Pass'},
- environ={'HTTP_Authorization':
- self.basic_auth('Aladdin', 'open sesame')})
- self.failUnless(self.status == '200 OK', self.status)
- self.failUnless(self.response_headers == [],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless((self.users['Aladdin'].passhash ==
- self.users['Aladdin'].hash('New Pass')),
- self.users['Aladdin'].passhash)
- self.failUnless(self.users.changed == True,
- self.users.changed)
-
- def test_guest_name(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'name':'SPAM'},
- environ={'HTTP_Authorization':
- self.basic_auth('guest', 'guestpass')})
- self.failUnless(self.status.startswith('403 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['guest'].name == 'Guest',
- self.users['guest'].name)
- self.failUnless(self.users.changed == False,
- self.users.changed)
-
- def test_guest_password(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data_dict={'password':'SPAM'},
- environ={'HTTP_Authorization':
- self.basic_auth('guest', 'guestpass')})
- self.failUnless(self.status.startswith('403 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['guest'].name == 'Guest',
- self.users['guest'].name)
- self.failUnless(self.users.changed == False,
- self.users.changed)
-
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])