diff options
Diffstat (limited to 'libbe/util')
-rw-r--r-- | libbe/util/wsgi.py | 213 |
1 files changed, 1 insertions, 212 deletions
diff --git a/libbe/util/wsgi.py b/libbe/util/wsgi.py index dcddcf3..2e97941 100644 --- a/libbe/util/wsgi.py +++ b/libbe/util/wsgi.py @@ -353,81 +353,6 @@ class UppercaseHeaderApp (WSGI_Middleware): return self.app(environ, start_response) -class AuthenticationApp (WSGI_Middleware): - """WSGI middleware for handling user authentication. - """ - def __init__(self, realm, setting='be-auth', users=None, *args, **kwargs): - super(AuthenticationApp, self).__init__(*args, **kwargs) - self.realm = realm - self.setting = setting - self.users = users - - def _call(self, environ, start_response): - environ['{}.realm'.format(self.setting)] = self.realm - try: - username = self.authenticate(environ) - environ['{}.user'.format(self.setting)] = username - environ['{}.user.name'.format(self.setting)] = self.users[username].name - return self.app(environ, start_response) - except Unauthorized, e: - return self.error(environ, start_response, - e.code, e.msg, e.headers) - - def authenticate(self, environ): - """Handle user-authentication sent in the "Authorization" header. - - This function implements ``Basic`` authentication as described in - HTTP/1.0 specification [1]_ . Do not use this module unless you - are using SSL, as it transmits unencrypted passwords. - - .. [1] http://www.w3.org/Protocols/HTTP/1.0/draft-ietf-http-spec.html#BasicAA - - Examples - -------- - - >>> users = Users() - >>> users.add_user(User('Aladdin', 'Big Al', password='open sesame')) - >>> app = AuthenticationApp(app=None, realm='Dummy Realm', users=users) - >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='}) - 'Aladdin' - >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic AAAAAAAAAAAAAAAAAAAAAAAAAA=='}) - - Notes - ----- - - Code based on authkit/authenticate/basic.py - (c) 2005 Clark C. Evans. - Released under the MIT License: - http://www.opensource.org/licenses/mit-license.php - """ - authorization = environ.get('HTTP_AUTHORIZATION', None) - if authorization is None: - raise Unauthorized('Authorization required') - try: - authmeth,auth = authorization.split(' ', 1) - except ValueError: - return None - if 'basic' != authmeth.lower(): - return None # non-basic HTTP authorization not implemented - auth = auth.strip().decode('base64') - try: - username,password = auth.split(':', 1) - except ValueError: - return None - if self.authfunc(environ, username, password): - return username - - def authfunc(self, environ, username, password): - if not username in self.users: - return False - if self.users[username].valid_login(password): - if self.logger is not None: - self.logger.log(self.log_level, - 'Authenticated {}'.format(self.users[username].name)) - return True - return False - - class WSGI_DataObject (WSGI_Object): """Useful WSGI utilities for handling data (POST, QUERY) and returning responses. @@ -544,41 +469,6 @@ class WSGI_AppObject (WSGI_Object): return self.default_handler(environ, start_response) -class AdminApp (WSGI_AppObject, WSGI_DataObject, WSGI_Middleware): - """WSGI middleware for managing users - - Changing passwords, usernames, etc. - """ - def __init__(self, users=None, setting='be-auth', *args, **kwargs): - handler = ('^admin/?', self.admin) - if 'urls' not in kwargs: - kwargs['urls'] = [handler] - else: - kwargs.urls.append(handler) - super(AdminApp, self).__init__(*args, **kwargs) - self.users = users - self.setting = setting - - def admin(self, environ, start_response): - if not '{}.user'.format(self.setting) in environ: - realm = envirion.get('{}.realm'.format(self.setting)) - raise Unauthenticated(realm=realm) - uname = environ.get('{}.user'.format(self.setting)) - user = self.users[uname] - data = self.post_data(environ) - source = 'post' - name = self.data_get_string( - data, 'name', default=None, source=source) - if name is not None: - self.users[uname].set_name(name) - password = self.data_get_string( - data, 'password', default=None, source=source) - if password is not None: - self.users[uname].set_password(password) - self.users.save() - return self.ok_response(environ, start_response, None) - - class SilentRequestHandler (wsgiref.simple_server.WSGIRequestHandler): def log_message(self, format, *args): pass @@ -631,15 +521,6 @@ class ServerCommand (libbe.command.base.Command): name='notify', metavar='EMAIL-COMMAND', default=None)), libbe.command.Option(name='ssl', short_name='s', help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'), - libbe.command.Option(name='auth', short_name='a', - help=('Require authentication. FILE should be a file ' - 'containing colon-separated ' - 'UNAME:USER:sha1(PASSWORD) lines, for example: ' - '"jdoe:John Doe <jdoe@example.com>:' - 'd99f8e5a4b02dc25f49da2ea67c0034f61779e72"'), - arg=libbe.command.Argument( - name='auth', metavar='FILE', default=None, - completion_callback=libbe.command.util.complete_path)), ]) def _run(self, **params): @@ -655,15 +536,9 @@ class ServerCommand (libbe.command.base.Command): if params['read-only']: writeable = storage.writeable storage.writeable = False - if params['auth']: - self._check_restricted_access(storage, params['auth']) - users = Users(params['auth']) + users = Users() users.load() app = self._get_app(logger=self.logger, storage=storage, **params) - if params['auth']: - app = AdminApp(app, users=users, logger=self.logger) - app = AuthenticationApp(app, realm=storage.repo, - users=users, logger=self.logger) app = UppercaseHeaderApp(app, logger=self.logger) server,details = self._get_server(params, app) details['repo'] = storage.repo @@ -954,92 +829,6 @@ if libbe.TESTING: self.failUnless('child_app' in log, log) self.failUnless('ValueError: Dummy Error' in log, log) - - class AdminAppTestCase (WSGITestCase): - def setUp(self): - WSGITestCase.setUp(self) - self.users = Users() - self.users.add_user( - User('Aladdin', 'Big Al', password='open sesame')) - self.users.add_user( - User('guest', 'Guest', password='guestpass')) - def child_app(environ, start_response): - pass - app = AdminApp( - app=child_app, users=self.users, logger=self.logger) - app = AuthenticationApp( - app=app, realm='Dummy Realm', users=self.users, - logger=self.logger) - self.app = UppercaseHeaderApp(app=app, logger=self.logger) - - def basic_auth(self, uname, password): - """HTTP basic authorization string""" - return 'Basic {}'.format( - '{}:{}'.format(uname, password).encode('base64')) - - def test_new_name(self): - self.getURL( - self.app, '/admin/', method='POST', - data_dict={'name':'Prince Al'}, - environ={'HTTP_Authorization': - self.basic_auth('Aladdin', 'open sesame')}) - self.failUnless(self.status == '200 OK', self.status) - self.failUnless(self.response_headers == [], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless(self.users['Aladdin'].name == 'Prince Al', - self.users['Aladdin'].name) - self.failUnless(self.users.changed == True, - self.users.changed) - - def test_new_password(self): - self.getURL( - self.app, '/admin/', method='POST', - data_dict={'password':'New Pass'}, - environ={'HTTP_Authorization': - self.basic_auth('Aladdin', 'open sesame')}) - self.failUnless(self.status == '200 OK', self.status) - self.failUnless(self.response_headers == [], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless((self.users['Aladdin'].passhash == - self.users['Aladdin'].hash('New Pass')), - self.users['Aladdin'].passhash) - self.failUnless(self.users.changed == True, - self.users.changed) - - def test_guest_name(self): - self.getURL( - self.app, '/admin/', method='POST', - data_dict={'name':'SPAM'}, - environ={'HTTP_Authorization': - self.basic_auth('guest', 'guestpass')}) - self.failUnless(self.status.startswith('403 '), self.status) - self.failUnless(self.response_headers == [ - ('Content-Type', 'text/plain')], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless(self.users['guest'].name == 'Guest', - self.users['guest'].name) - self.failUnless(self.users.changed == False, - self.users.changed) - - def test_guest_password(self): - self.getURL( - self.app, '/admin/', method='POST', - data_dict={'password':'SPAM'}, - environ={'HTTP_Authorization': - self.basic_auth('guest', 'guestpass')}) - self.failUnless(self.status.startswith('403 '), self.status) - self.failUnless(self.response_headers == [ - ('Content-Type', 'text/plain')], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless(self.users['guest'].name == 'Guest', - self.users['guest'].name) - self.failUnless(self.users.changed == False, - self.users.changed) - unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |