aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage/http.py
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2010-01-27 08:07:31 -0500
committerW. Trevor King <wking@drexel.edu>2010-01-27 08:07:31 -0500
commite0d0e0825add948a89c8ad305a3b259b743ec91d (patch)
treef5561aa2391b2c5bc6153fa3c60ed52640d21aca /libbe/storage/http.py
parent2cc9755aef6a7b36bf1b32519b5997b5221f4e5a (diff)
downloadbugseverywhere-e0d0e0825add948a89c8ad305a3b259b743ec91d.tar.gz
Streamlined libbe.command.serve, adding --auth option, #/bea/c1b#, and testing.
Diffstat (limited to 'libbe/storage/http.py')
-rw-r--r--libbe/storage/http.py177
1 files changed, 158 insertions, 19 deletions
diff --git a/libbe/storage/http.py b/libbe/storage/http.py
index 2de2aff..5606383 100644
--- a/libbe/storage/http.py
+++ b/libbe/storage/http.py
@@ -36,9 +36,14 @@ import base
from libbe import TESTING
if TESTING == True:
+ import copy
import doctest
+ import StringIO
import unittest
+ import libbe.bugdir
+ import libbe.command.serve
+
USER_AGENT = 'BE-HTTP-Storage'
HTTP_OK = 200
@@ -60,7 +65,7 @@ class InvalidURL (Exception):
return self.error.__str__()
return self.msg
-def get_post_url(url, get=True, data_dict=None):
+def get_post_url(url, get=True, data_dict=None, headers=[]):
"""
get: use GET if True, otherwise use POST.
data_dict: dict of data to send.
@@ -75,7 +80,8 @@ def get_post_url(url, get=True, data_dict=None):
data = None
else:
data = urllib.urlencode(data_dict)
- headers = {'User-Agent':USER_AGENT}
+ headers = dict(headers)
+ headers['User-Agent'] = USER_AGENT
req = urllib2.Request(url, data=data, headers=headers)
try:
response = urllib2.urlopen(req)
@@ -101,8 +107,37 @@ class HTTP (base.VersionedStorage):
"""
name = 'HTTP'
- def __init__(self, *args, **kwargs):
- base.VersionedStorage.__init__(self, *args, **kwargs)
+ def __init__(self, repo, *args, **kwargs):
+ repo,self.uname,self.password = self.parse_repo(repo)
+ base.VersionedStorage.__init__(self, repo, *args, **kwargs)
+
+ def parse_repo(self, repo):
+ """Grab username and password (if any) from the repo URL.
+ >>> s = HTTP('http://host.com/path/to/repo')
+ >>> s.repo
+ 'http://host.com/path/to/repo'
+ >>> s.uname == None
+ True
+ >>> s.password == None
+ True
+ >>> s.parse_repo('http://joe:secret@host.com/path/to/repo')
+ ('http://host.com/path/to/repo', 'joe', 'secret')
+ """
+ scheme,netloc,path,params,query,fragment = urlparse.urlparse(repo)
+ parts = netloc.split('@', 1)
+ if len(parts) == 2:
+ uname,password = parts[0].split(':')
+ repo = urlparse.urlunparse(
+ (scheme, parts[1], path, params, query, fragment))
+ else:
+ uname,password = (None, None)
+ return (repo, uname, password)
+
+ def get_post_url(self, url, get=True, data_dict=None, headers=[]):
+ if self.uname != None and self.password != None:
+ headers.append(('Authorization','Basic %s' % \
+ ('%s:%s' % (self.uname, self.password)).encode('base64')))
+ return get_post_url(url, get, data_dict, headers)
def storage_version(self, revision=None):
"""Return the storage format for this backend."""
@@ -126,32 +161,41 @@ class HTTP (base.VersionedStorage):
def _add(self, id, parent=None, directory=False):
url = urlparse.urljoin(self.repo, 'add')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'id':id, 'parent':parent, 'directory':directory})
+ def _exists(self, id, revision=None):
+ url = urlparse.urljoin(self.repo, 'exists')
+ page,final_url,info = self.get_post_url(
+ url, get=True,
+ data_dict={'id':id, 'revision':revision})
+ if page == 'True':
+ return True
+ return False
+
def _remove(self, id):
url = urlparse.urljoin(self.repo, 'remove')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'id':id, 'recursive':False})
def _recursive_remove(self, id):
url = urlparse.urljoin(self.repo, 'remove')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'id':id, 'recursive':True})
def _ancestors(self, id=None, revision=None):
url = urlparse.urljoin(self.repo, 'ancestors')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'id':id, 'revision':revision})
return page.strip('\n').splitlines()
def _children(self, id=None, revision=None):
url = urlparse.urljoin(self.repo, 'children')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'id':id, 'revision':revision})
return page.strip('\n').splitlines()
@@ -159,7 +203,7 @@ class HTTP (base.VersionedStorage):
def _get(self, id, default=base.InvalidObject, revision=None):
url = urlparse.urljoin(self.repo, '/'.join(['get', id]))
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'revision':revision})
except InvalidURL, e:
@@ -177,13 +221,14 @@ class HTTP (base.VersionedStorage):
def _set(self, id, value):
url = urlparse.urljoin(self.repo, '/'.join(['set', id]))
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'value':value})
except InvalidURL, e:
if not (hasattr(e.error, 'code') and e.error.code in HTTP_VALID):
raise
- if e.error.code == HTTP_USER_ERROR:
+ if e.error.code == HTTP_USER_ERROR \
+ and not 'InvalidID' in str(e.error):
raise base.InvalidDirectory(
'Directory %s cannot have data' % id)
raise base.InvalidID(id)
@@ -191,7 +236,7 @@ class HTTP (base.VersionedStorage):
def _commit(self, summary, body=None, allow_empty=False):
url = urlparse.urljoin(self.repo, 'commit')
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=False,
data_dict={'summary':summary, 'body':body,
'allow_empty':allow_empty})
@@ -218,12 +263,12 @@ class HTTP (base.VersionedStorage):
return None
try:
if int(index) != index:
- raise InvalidRevision(index)
+ raise base.InvalidRevision(index)
except ValueError:
- raise InvalidRevision(index)
+ raise base.InvalidRevision(index)
url = urlparse.urljoin(self.repo, 'revision-id')
try:
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'index':index})
except InvalidURL, e:
@@ -236,7 +281,7 @@ class HTTP (base.VersionedStorage):
def changed(self, revision=None):
url = urlparse.urljoin(self.repo, 'changed')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True,
data_dict={'revision':revision})
lines = page.strip('\n')
@@ -251,7 +296,7 @@ class HTTP (base.VersionedStorage):
def storage_version(self, revision=None):
url = urlparse.urljoin(self.repo, 'version')
- page,final_url,info = get_post_url(
+ page,final_url,info = self.get_post_url(
url, get=True, data_dict={'revision':revision})
return page.rstrip('\n')
@@ -272,7 +317,101 @@ if TESTING == True:
'Redirect?\n Expected: "%s"\n Got: "%s"'
% (expected, final_url))
- #make_storage_testcase_subclasses(VersionedStorage, sys.modules[__name__])
+ class TestingHTTP (HTTP):
+ name = 'TestingHTTP'
+ def __init__(self, repo, *args, **kwargs):
+ self._storage_backend = base.VersionedStorage(repo)
+ self.app = libbe.command.serve.ServerApp(
+ storage=self._storage_backend)
+ HTTP.__init__(self, repo='http://localhost:8000/', *args, **kwargs)
+ self.intitialized = False
+ # duplicated from libbe.storage.serve.WSGITestCase
+ self.default_environ = {
+ 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
+ 'SCRIPT_NAME':'',
+ 'PATH_INFO': '',
+ #'QUERY_STRING':'', # may be empty or absent
+ #'CONTENT_TYPE':'', # may be empty or absent
+ #'CONTENT_LENGTH':'', # may be empty or absent
+ 'SERVER_NAME':'example.com',
+ 'SERVER_PORT':'80',
+ 'SERVER_PROTOCOL':'HTTP/1.1',
+ 'wsgi.version':(1,0),
+ 'wsgi.url_scheme':'http',
+ 'wsgi.input':StringIO.StringIO(),
+ 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.multithread':False,
+ 'wsgi.multiprocess':False,
+ 'wsgi.run_once':False,
+ }
+ def getURL(self, app, path='/', method='GET', data=None,
+ scheme='http', environ={}):
+ # duplicated from libbe.storage.serve.WSGITestCase
+ env = copy.copy(self.default_environ)
+ env['PATH_INFO'] = path
+ env['REQUEST_METHOD'] = method
+ env['scheme'] = scheme
+ if data != None:
+ enc_data = urllib.urlencode(data)
+ if method == 'POST':
+ env['CONTENT_LENGTH'] = len(enc_data)
+ env['wsgi.input'] = StringIO.StringIO(enc_data)
+ else:
+ assert method in ['GET', 'HEAD'], method
+ env['QUERY_STRING'] = enc_data
+ for key,value in environ.items():
+ env[key] = value
+ return ''.join(app(env, self.start_response))
+ def start_response(self, status, response_headers, exc_info=None):
+ self.status = status
+ self.response_headers = response_headers
+ self.exc_info = exc_info
+ def get_post_url(self, url, get=True, data_dict=None, headers=[]):
+ if get == True:
+ method = 'GET'
+ else:
+ method = 'POST'
+ scheme,netloc,path,params,query,fragment = urlparse.urlparse(url)
+ environ = {}
+ for header_name,header_value in headers:
+ environ['HTTP_%s' % header_name] = header_value
+ output = self.getURL(
+ self.app, path, method, data_dict, scheme, environ)
+ if self.status != '200 OK':
+ class __estr (object):
+ def __init__(self, string):
+ self.string = string
+ self.code = int(string.split()[0])
+ def __str__(self):
+ return self.string
+ error = __estr(self.status)
+ raise InvalidURL(error=error, url=url, msg=output)
+ info = dict(self.response_headers)
+ return (output, url, info)
+ def _init(self):
+ try:
+ HTTP._init(self)
+ raise AssertionError
+ except base.NotSupported:
+ pass
+ self._storage_backend._init()
+ def _destroy(self):
+ try:
+ HTTP._destroy(self)
+ raise AssertionError
+ except base.NotSupported:
+ pass
+ self._storage_backend._destroy()
+ def _connect(self):
+ self._storage_backend._connect()
+ HTTP._connect(self)
+ def _disconnect(self):
+ HTTP._disconnect(self)
+ self._storage_backend._disconnect()
+
+
+ base.make_versioned_storage_testcase_subclasses(
+ TestingHTTP, sys.modules[__name__])
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])