aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage/http.py
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/storage/http.py')
-rw-r--r--libbe/storage/http.py368
1 files changed, 0 insertions, 368 deletions
diff --git a/libbe/storage/http.py b/libbe/storage/http.py
deleted file mode 100644
index 030349d..0000000
--- a/libbe/storage/http.py
+++ /dev/null
@@ -1,368 +0,0 @@
-# Copyright (C) 2010-2012 Chris Ball <cjb@laptop.org>
-# W. Trevor King <wking@tremily.us>
-#
-# This file is part of Bugs Everywhere.
-#
-# Bugs Everywhere is free software: you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 2 of the License, or (at your option) any
-# later version.
-#
-# Bugs Everywhere is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
-
-"""Define an HTTP-based :py:class:`~libbe.storage.base.VersionedStorage`
-implementation.
-
-See Also
---------
-:py:mod:`libbe.command.serve_storage` : the associated server
-"""
-
-from __future__ import absolute_import
-import sys
-import urllib
-import urlparse
-
-import libbe
-import libbe.version
-import libbe.util.http
-from libbe.util.http import HTTP_VALID, HTTP_USER_ERROR
-from . import base
-
-from libbe import TESTING
-
-if TESTING == True:
- import copy
- import doctest
- import StringIO
- import unittest
-
- import libbe.bugdir
- import libbe.command.serve_storage
- import libbe.util.http
- import libbe.util.wsgi
-
-
-class HTTP (base.VersionedStorage):
- """:py:class:`~libbe.storage.base.VersionedStorage` implementation over
- HTTP.
-
- Uses GET to retrieve information and POST to set information.
- """
- name = 'HTTP'
- user_agent = 'BE-HTTP-Storage'
-
- def __init__(self, repo, *args, **kwargs):
- repo,self.uname,self.password = self.parse_repo(repo)
- base.VersionedStorage.__init__(self, repo, *args, **kwargs)
-
- def parse_repo(self, repo):
- """Grab username and password (if any) from the repo URL.
-
- Examples
- --------
-
- >>> s = HTTP('http://host.com/path/to/repo')
- >>> s.repo
- 'http://host.com/path/to/repo'
- >>> s.uname == None
- True
- >>> s.password == None
- True
- >>> s.parse_repo('http://joe:secret@host.com/path/to/repo')
- ('http://host.com/path/to/repo', 'joe', 'secret')
- """
- scheme,netloc,path,params,query,fragment = urlparse.urlparse(repo)
- parts = netloc.split('@', 1)
- if len(parts) == 2:
- uname,password = parts[0].split(':')
- repo = urlparse.urlunparse(
- (scheme, parts[1], path, params, query, fragment))
- else:
- uname,password = (None, None)
- return (repo, uname, password)
-
- def get_post_url(self, url, get=True, data_dict=None, headers=[]):
- if self.uname != None and self.password != None:
- headers.append(('Authorization','Basic %s' % \
- ('%s:%s' % (self.uname, self.password)).encode('base64')))
- return libbe.util.http.get_post_url(
- url, get, data_dict=data_dict, headers=headers,
- agent=self.user_agent)
-
- def storage_version(self, revision=None):
- """Return the storage format for this backend."""
- return libbe.storage.STORAGE_VERSION
-
- def _init(self):
- """Create a new storage repository."""
- raise base.NotSupported(
- 'init', 'Cannot initialize this repository format.')
-
- def _destroy(self):
- """Remove the storage repository."""
- raise base.NotSupported(
- 'destroy', 'Cannot destroy this repository format.')
-
- def _connect(self):
- self.check_storage_version()
-
- def _disconnect(self):
- pass
-
- def _add(self, id, parent=None, directory=False):
- url = urlparse.urljoin(self.repo, 'add')
- page,final_url,info = self.get_post_url(
- url, get=False,
- data_dict={'id':id, 'parent':parent, 'directory':directory})
-
- def _exists(self, id, revision=None):
- url = urlparse.urljoin(self.repo, 'exists')
- page,final_url,info = self.get_post_url(
- url, get=True,
- data_dict={'id':id, 'revision':revision})
- if page == 'True':
- return True
- return False
-
- def _remove(self, id):
- url = urlparse.urljoin(self.repo, 'remove')
- page,final_url,info = self.get_post_url(
- url, get=False,
- data_dict={'id':id, 'recursive':False})
-
- def _recursive_remove(self, id):
- url = urlparse.urljoin(self.repo, 'remove')
- page,final_url,info = self.get_post_url(
- url, get=False,
- data_dict={'id':id, 'recursive':True})
-
- def _ancestors(self, id=None, revision=None):
- url = urlparse.urljoin(self.repo, 'ancestors')
- page,final_url,info = self.get_post_url(
- url, get=True,
- data_dict={'id':id, 'revision':revision})
- return page.strip('\n').splitlines()
-
- def _children(self, id=None, revision=None):
- url = urlparse.urljoin(self.repo, 'children')
- page,final_url,info = self.get_post_url(
- url, get=True,
- data_dict={'id':id, 'revision':revision})
- return page.strip('\n').splitlines()
-
- def _get(self, id, default=base.InvalidObject, revision=None):
- url = urlparse.urljoin(self.repo, '/'.join(['get', id]))
- try:
- page,final_url,info = self.get_post_url(
- url, get=True,
- data_dict={'revision':revision})
- except libbe.util.http.HTTPError, e:
- if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID):
- raise
- elif default == base.InvalidObject:
- raise base.InvalidID(id)
- return default
- version = info['X-BE-Version']
- if version != libbe.storage.STORAGE_VERSION:
- raise base.InvalidStorageVersion(
- version, libbe.storage.STORAGE_VERSION)
- return page
-
- def _set(self, id, value):
- url = urlparse.urljoin(self.repo, '/'.join(['set', id]))
- try:
- page,final_url,info = self.get_post_url(
- url, get=False,
- data_dict={'value':value})
- except libbe.util.http.HTTPError, e:
- if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID):
- raise
- if e.error.code == HTTP_USER_ERROR \
- and not 'InvalidID' in str(e.error):
- raise base.InvalidDirectory(
- 'Directory %s cannot have data' % id)
- raise base.InvalidID(id)
-
- def _commit(self, summary, body=None, allow_empty=False):
- url = urlparse.urljoin(self.repo, 'commit')
- try:
- page,final_url,info = self.get_post_url(
- url, get=False,
- data_dict={'summary':summary, 'body':body,
- 'allow_empty':allow_empty})
- except libbe.util.http.HTTPError, e:
- if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID):
- raise
- if e.error.code == HTTP_USER_ERROR:
- raise base.EmptyCommit
- raise base.InvalidID(id)
- return page.rstrip('\n')
-
- def revision_id(self, index=None):
- """Return the name of the <index>th revision.
-
- The choice of which branch to follow when crossing
- branches/merges is not defined. Revision indices start at 1;
- ID 0 is the blank repository.
-
- Return None if index==None.
-
- Raises
- ------
- InvalidRevision
- If the specified revision does not exist.
- """
- if index == None:
- return None
- try:
- if int(index) != index:
- raise base.InvalidRevision(index)
- except ValueError:
- raise base.InvalidRevision(index)
- url = urlparse.urljoin(self.repo, 'revision-id')
- try:
- page,final_url,info = self.get_post_url(
- url, get=True,
- data_dict={'index':index})
- except libbe.util.http.HTTPError, e:
- if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID):
- raise
- if e.error.code == HTTP_USER_ERROR:
- raise base.InvalidRevision(index)
- raise base.InvalidID(id)
- return page.rstrip('\n')
-
- def changed(self, revision=None):
- url = urlparse.urljoin(self.repo, 'changed')
- page,final_url,info = self.get_post_url(
- url, get=True,
- data_dict={'revision':revision})
- lines = page.strip('\n')
- new,mod,rem = [p.splitlines() for p in page.split('\n\n')]
- return (new, mod, rem)
-
- def check_storage_version(self):
- version = self.storage_version()
- if version != libbe.storage.STORAGE_VERSION:
- raise base.InvalidStorageVersion(
- version, libbe.storage.STORAGE_VERSION)
-
- def storage_version(self, revision=None):
- url = urlparse.urljoin(self.repo, 'version')
- page,final_url,info = self.get_post_url(
- url, get=True, data_dict={'revision':revision})
- return page.rstrip('\n')
-
-if TESTING == True:
- class TestingHTTP (HTTP):
- name = 'TestingHTTP'
- def __init__(self, repo, *args, **kwargs):
- self._storage_backend = base.VersionedStorage(repo)
- app = libbe.command.serve_storage.ServerApp(
- storage=self._storage_backend)
- self.app = libbe.util.wsgi.BEExceptionApp(app=app)
- HTTP.__init__(self, repo='http://localhost:8000/', *args, **kwargs)
- self.intitialized = False
- # duplicated from libbe.util.wsgi.WSGITestCase
- self.default_environ = {
- 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
- 'REMOTE_ADDR': '192.168.0.123',
- 'SCRIPT_NAME':'',
- 'PATH_INFO': '',
- #'QUERY_STRING':'', # may be empty or absent
- #'CONTENT_TYPE':'', # may be empty or absent
- #'CONTENT_LENGTH':'', # may be empty or absent
- 'SERVER_NAME':'example.com',
- 'SERVER_PORT':'80',
- 'SERVER_PROTOCOL':'HTTP/1.1',
- 'wsgi.version':(1,0),
- 'wsgi.url_scheme':'http',
- 'wsgi.input':StringIO.StringIO(),
- 'wsgi.errors':StringIO.StringIO(),
- 'wsgi.multithread':False,
- 'wsgi.multiprocess':False,
- 'wsgi.run_once':False,
- }
- def getURL(self, app, path='/', method='GET', data=None,
- scheme='http', environ={}):
- # duplicated from libbe.util.wsgi.WSGITestCase
- env = copy.copy(self.default_environ)
- env['PATH_INFO'] = path
- env['REQUEST_METHOD'] = method
- env['scheme'] = scheme
- if data != None:
- enc_data = urllib.urlencode(data)
- if method == 'POST':
- env['CONTENT_LENGTH'] = len(enc_data)
- env['wsgi.input'] = StringIO.StringIO(enc_data)
- else:
- assert method in ['GET', 'HEAD'], method
- env['QUERY_STRING'] = enc_data
- for key,value in environ.items():
- env[key] = value
- try:
- result = app(env, self.start_response)
- except libbe.util.wsgi.HandlerError as e:
- raise libbe.util.http.HTTPError(error=e, url=path, msg=str(e))
- return ''.join(result)
- def start_response(self, status, response_headers, exc_info=None):
- self.status = status
- self.response_headers = response_headers
- self.exc_info = exc_info
- def get_post_url(self, url, get=True, data_dict=None, headers=[]):
- if get == True:
- method = 'GET'
- else:
- method = 'POST'
- scheme,netloc,path,params,query,fragment = urlparse.urlparse(url)
- environ = {}
- for header_name,header_value in headers:
- environ['HTTP_%s' % header_name] = header_value
- output = self.getURL(
- self.app, path, method, data_dict, scheme, environ)
- if self.status != '200 OK':
- class __estr (object):
- def __init__(self, string):
- self.string = string
- self.code = int(string.split()[0])
- def __str__(self):
- return self.string
- error = __estr(self.status)
- raise libbe.util.http.HTTPError(
- error=error, url=url, msg=output)
- info = dict(self.response_headers)
- return (output, url, info)
- def _init(self):
- try:
- HTTP._init(self)
- raise AssertionError
- except base.NotSupported:
- pass
- self._storage_backend._init()
- def _destroy(self):
- try:
- HTTP._destroy(self)
- raise AssertionError
- except base.NotSupported:
- pass
- self._storage_backend._destroy()
- def _connect(self):
- self._storage_backend._connect()
- HTTP._connect(self)
- def _disconnect(self):
- HTTP._disconnect(self)
- self._storage_backend._disconnect()
-
-
- base.make_versioned_storage_testcase_subclasses(
- TestingHTTP, sys.modules[__name__])
-
- unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
- suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])