from unittest import TestCase
import unittest
from cherrypy import NotFound
"""A pseudo-REST dispatching method in which only the noun comes from the path.
The action performed will depend on kwargs.
"""
class AmbiguousAction(Exception):
    """Raised when the supplied keyword arguments match several actions.

    The candidate callables that matched are kept on ``actions`` so the
    caller can inspect them.
    """

    def __init__(self, actions):
        # Store the candidates first; the base initialiser only sets the
        # message and does not touch this attribute.
        self.actions = actions
        super(AmbiguousAction, self).__init__("Supplied action is ambiguous.")
def provide_action(name, value):
    """Build a decorator that tags a callable as an action.

    The returned decorator stores ``(name, value)`` on the callable as
    ``_action_desc`` and hands the very same callable back, unwrapped.
    """
    descriptor = (name, value)

    def tag(func):
        setattr(func, '_action_desc', descriptor)
        return func

    return tag
class PrestHandler(object):
    """One node in a pseudo-REST dispatch tree.

    Child handlers are attached as attributes. ``decode`` walks a path in
    which attribute names alternate with identifiers, recording each
    identifier in a data dict keyed by the attribute name. Callables that
    carry an ``_action_desc = (name, value)`` attribute are registered as
    actions; ``get_action`` selects one when a keyword argument called
    ``name`` has the value ``value``.
    """

    def __init__(self):
        object.__init__(self)
        # Start from the class-level registry so that add_action()
        # registrations are honoured.  Both levels are copied so that an
        # instance never mutates the table shared through the class.
        class_actions = getattr(type(self), 'actions', {})
        self.actions = dict(
            (name, dict(table)) for name, table in class_actions.items())
        # Register every attribute tagged with ``_action_desc``.
        for member in (getattr(self, m) for m in dir(self)):
            if not hasattr(member, '_action_desc'):
                continue
            name, value = member._action_desc
            self.actions.setdefault(name, {})[value] = member

    @classmethod
    def add_action(klass, name, value, function):
        """Register ``function`` as the class-level action for name/value.

        Instances created afterwards pick the registration up in
        ``__init__``.  A class that has no ``actions`` table of its own
        gets one (seeded with a copy of whatever it inherits), so that
        registering on a subclass does not alter its base class.
        """
        if 'actions' not in klass.__dict__:
            inherited = getattr(klass, 'actions', {})
            klass.actions = dict(
                (key, dict(table)) for key, table in inherited.items())
        klass.actions.setdefault(name, {})[value] = function

    def decode(self, path, data=None):
        """Convert the path into a handler, a resource, data, and extra_path.

        ``path[0]`` is not inspected here; ``path[1]`` is looked up as a
        child attribute and ``path[2]``, when present, is stored in
        ``data`` under that attribute name before recursing into the child.

        Returns a ``(handler, resource, data, extra_path)`` tuple.
        ``resource`` is ``None`` when the path is empty or when the handler
        does not override ``instantiate``.

        Raises ``NotFound`` when ``instantiate`` raises a
        ``NotImplementedError`` other than the base-class sentinel.
        """
        if data is None:
            data = {}
        if len(path) < 2 or not hasattr(self, path[1]):
            if len(path) == 0:
                resource = None
            else:
                try:
                    resource = self.instantiate(**data)
                except NotImplementedError as e:
                    # Compare with ``!=`` rather than ``is not``: on
                    # Python 2 every access to PrestHandler.instantiate
                    # builds a new unbound-method object, so an identity
                    # test could never match.  Empty args (a bare
                    # ``raise NotImplementedError``) cannot be the sentinel.
                    if not e.args or e.args[0] != PrestHandler.instantiate:
                        raise NotFound()
                    # The base-class sentinel means this handler simply
                    # has no resource; previously ``resource`` was left
                    # unbound here and the return below failed.
                    resource = None
            return self, resource, data, path[1:]
        if len(path) > 2:
            data[path[1]] = path[2]
        return getattr(self, path[1]).decode(path[2:], data)

    def instantiate(self, **data):
        """Build the resource for this handler from the decoded data.

        The base implementation raises ``NotImplementedError`` carrying
        itself as the sole argument, which ``decode`` recognises as
        "no resource for this handler".
        """
        raise NotImplementedError(PrestHandler.instantiate)

    def default(self, *args, **kwargs):
        """Decode ``args`` as a path and invoke the matching action.

        The callable receives ``data``, ``resource`` and any extra path
        segments positionally, plus ``kwargs``.  When no action matches,
        the decoded handler's ``dispatch`` is called instead.
        """
        # decode() ignores path[0], so pad the front with a placeholder.
        child, resource, data, extra = self.decode([None] + list(args))
        action = child.get_action(**kwargs)
        new_args = [data, resource] + extra
        if action is not None:
            return action(*new_args, **kwargs)
        return child.dispatch(*new_args, **kwargs)

    def get_action(self, **kwargs):
        """Return the action requested by kwargs, if any.

        Returns ``None`` when nothing matches.
        Raises AmbiguousAction if more than one action matches.
        """
        matches = [
            self.actions[key][value]
            for key, value in kwargs.items()
            if key in self.actions and value in self.actions[key]
        ]
        if not matches:
            return None
        if len(matches) > 1:
            raise AmbiguousAction(matches)
        return matches[0]
class PrestTester(TestCase):
    """Exercises PrestHandler path decoding and action dispatch."""

    def test_decode(self):
        """Decode one- and two-level paths and dispatch by keyword args."""

        class ProjectHandler(PrestHandler):
            """Records what it was called with so the test can inspect it."""
            actions = {}

            def dispatch(self, project_data, project, *args, **kwargs):
                self.project_id = project_data['project']
                self.project_data = project_data
                self.resource = project
                self.args = args
                self.kwargs = kwargs

            def instantiate(self, project):
                return [project]

            @provide_action('action', 'Save')
            def save(self, project_data, project, *args, **kwargs):
                self.action = "save"

            @provide_action('behavior', 'Update')
            def update(self, project_data, project, *args, **kwargs):
                self.action = "update"

        foo = PrestHandler()
        foo.project = ProjectHandler()

        # Path decoding: the id is captured under the child's attribute
        # name and unknown trailing segments come back as extra path.
        handler, resource, data, extra = foo.decode(
            [None, 'project', '83', 'bloop', 'yeah'])
        # assertTrue rather than a bare assert, which ``python -O`` strips.
        self.assertTrue(handler is foo.project)
        self.assertEqual({'project': '83'}, data)
        self.assertEqual(['bloop', 'yeah'], extra)

        # No keyword selects an action, so dispatch() is used.
        foo.default('project', '27', 'extra', a='b', b='97')
        self.assertEqual(foo.project.args, ('extra',))
        self.assertEqual(foo.project.kwargs, {'a': 'b', 'b': '97'})
        self.assertEqual(foo.project.project_data, {'project': '27'})
        self.assertEqual(foo.project.resource, ['27'])

        # A single matching keyword selects the tagged method.
        foo.default('project', '27', 'extra', action='Save', b='97')
        self.assertEqual(foo.project.action, 'save')
        foo.default('project', '27', 'extra', behavior='Update', b='97')
        self.assertEqual(foo.project.action, 'update')

        # Two matching keywords are ambiguous.
        self.assertRaises(AmbiguousAction, foo.default,
                          'project', '27', 'extra',
                          behavior='Update', action='Save', b='97')

        class BugHandler(PrestHandler):
            """Second-level handler nested under a project."""
            actions = {}

            def dispatch(self, bug_data, bug, *args, **kwargs):
                # Use this method's own parameters; the previous body
                # referenced undefined ``project_data`` / ``project``.
                self.project_id = bug_data['project']
                self.project_data = bug_data
                self.resource = bug
                self.args = args
                self.kwargs = kwargs

            def instantiate(self, project, bug):
                return [project, bug]

            @provide_action('action', 'Save')
            def save(self, project_data, project, *args, **kwargs):
                self.action = "save"

            @provide_action('behavior', 'Update')
            def update(self, project_data, project, *args, **kwargs):
                self.action = "update"

        foo.project.bug = BugHandler()

        # Two-level path: both ids end up in data and reach instantiate().
        handler, resource, data, extra = foo.decode(
            [None, 'project', '83', 'bug', '92'])
        self.assertTrue(handler is foo.project.bug)
        self.assertEqual(resource[0], '83')
        self.assertEqual(resource[1], '92')
        self.assertEqual([], extra)
        self.assertEqual(data['project'], '83')
        self.assertEqual(data['bug'], '92')
def test():
    """Run every ``test*`` method of PrestTester and return the result.

    Uses ``TestLoader.loadTestsFromTestCase`` instead of
    ``unittest.makeSuite``, which is deprecated and no longer available in
    recent Python releases; the loader's default method prefix is ``test``,
    so the same tests are collected.
    """
    suite = unittest.TestLoader().loadTestsFromTestCase(PrestTester)
    runner = unittest.TextTestRunner(verbosity=0)
    return runner.run(suite)
if __name__ == "__main__":
    # Report the outcome through the process exit status so that a failing
    # run is visible to shells and CI, not only in the printed report.
    raise SystemExit(0 if test().wasSuccessful() else 1)