aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/command/base.py
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2009-12-31 14:32:39 -0500
committerW. Trevor King <wking@drexel.edu>2009-12-31 14:32:39 -0500
commitcfae8a8302f06a84196700138d7ddbb25e91ea31 (patch)
treee5e158f424e01edeb83c0d6132af8b5c1d1808ea /libbe/command/base.py
parent80e76f70d58672167b17ddaced6c7856ba703ece (diff)
downloadbugseverywhere-cfae8a8302f06a84196700138d7ddbb25e91ea31.tar.gz
Added UserInterface and other improved abstractions for command handling
Diffstat (limited to 'libbe/command/base.py')
-rw-r--r--libbe/command/base.py221
1 files changed, 172 insertions, 49 deletions
diff --git a/libbe/command/base.py b/libbe/command/base.py
index 2318aa7..357940f 100644
--- a/libbe/command/base.py
+++ b/libbe/command/base.py
@@ -3,6 +3,7 @@
import codecs
import optparse
import os.path
+import StringIO
import sys
import libbe
@@ -37,14 +38,19 @@ def get_command(command_name):
raise UnknownCommand(command_name)
return cmd
-def get_command_class(module, command_name):
+def get_command_class(module=None, command_name=None):
"""Retrieves a command class from a module.
>>> import_xml_mod = get_command('import-xml')
>>> import_xml = get_command_class(import_xml_mod, 'import-xml')
>>> repr(import_xml)
"<class 'libbe.command.import_xml.Import_XML'>"
+ >>> import_xml = get_command_class(command_name='import-xml')
+ >>> repr(import_xml)
+ "<class 'libbe.command.import_xml.Import_XML'>"
"""
+ if module == None:
+ module = get_command(command_name)
try:
cname = command_name.capitalize().replace('-', '_')
cmd = getattr(module, cname)
@@ -167,7 +173,7 @@ class OptionFormatter (optparse.IndentedHelpFormatter):
return ''.join(ret[:-1])
class Command (object):
- """One-line command description.
+ """One-line command description here.
>>> c = Command()
>>> print c.help()
@@ -183,12 +189,8 @@ class Command (object):
name = 'command'
- def __init__(self, input_encoding=None, output_encoding=None,
- get_unconnected_storage=None, ui=None):
- self.input_encoding = input_encoding
- self.output_encoding = output_encoding
- self.get_unconnected_storage = get_unconnected_storage
- self.ui = ui # calling user-interface, e.g. for Help()
+ def __init__(self, ui=None):
+ self.ui = ui # calling user-interface
self.status = None
self.result = None
self.restrict_file_access = True
@@ -203,6 +205,21 @@ class Command (object):
self.args = []
def run(self, options=None, args=None):
+ self.status = 1 # in case we raise an exception
+ params = self._parse_options_args(options, args)
+ if params['help'] == True:
+ pass
+ else:
+ params.pop('help')
+ if params['complete'] != None:
+ pass
+ else:
+ params.pop('complete')
+
+ self.status = self._run(**params)
+ return self.status
+
+ def _parse_options_args(self, options=None, args=None):
if options == None:
options = {}
if args == None:
@@ -242,33 +259,11 @@ class Command (object):
if len(args) > len(self.args): # add some additional repeats
assert self.args[-1].repeatable == True, self.args[-1].name
params[self.args[-1].name].extend(args[len(self.args):])
-
- if params['help'] == True:
- pass
- else:
- params.pop('help')
- if params['complete'] != None:
- pass
- else:
- params.pop('complete')
-
- self._setup_io(self.input_encoding, self.output_encoding)
- self.status = self._run(**params)
- return self.status
+ return params
def _run(self, **kwargs):
raise NotImplementedError
- def _setup_io(self, input_encoding=None, output_encoding=None):
- if input_encoding == None:
- input_encoding = libbe.util.encoding.get_input_encoding()
- if output_encoding == None:
- output_encoding = libbe.util.encoding.get_output_encoding()
- self.stdin = codecs.getwriter(input_encoding)(sys.stdin)
- self.stdin.encoding = input_encoding
- self.stdout = codecs.getwriter(output_encoding)(sys.stdout)
- self.stdout.encoding = output_encoding
-
def help(self, *args):
return '\n\n'.join([self.usage(),
self._option_help(),
@@ -340,43 +335,171 @@ class Command (object):
raise UserError('file access restricted!\n %s not in %s'
% (path, repo))
- def _get_unconnected_storage(self):
- """Callback for use by commands that need it."""
+ def cleanup(self):
+ pass
+
+class InputOutput (object):
+ def __init__(self, stdin=None, stdout=None):
+ self.stdin = stdin
+ self.stdout = stdout
+
+ def setup_command(self, command):
+ if not hasattr(self.stdin, 'encoding'):
+ self.stdin.encoding = libbe.util.encoding.get_input_encoding()
+ if not hasattr(self.stdout, 'encoding'):
+ self.stdout.encoding = libbe.util.encoding.get_output_encoding()
+ command.stdin = self.stdin
+ command.stdin.encoding = self.stdin.encoding
+ command.stdout = self.stdout
+ command.stdout.encoding = self.stdout.encoding
+
+ def cleanup(self):
+ pass
+
+class StdInputOutput (InputOutput):
+ def __init__(self, input_encoding=None, output_encoding=None):
+ stdin,stdout = self._get_io(input_encoding, output_encoding)
+ InputOutput.__init__(self, stdin, stdout)
+
+ def _get_io(self, input_encoding=None, output_encoding=None):
+ if input_encoding == None:
+ input_encoding = libbe.util.encoding.get_input_encoding()
+ if output_encoding == None:
+ output_encoding = libbe.util.encoding.get_output_encoding()
+ stdin = codecs.getwriter(input_encoding)(sys.stdin)
+ stdin.encoding = input_encoding
+ stdout = codecs.getwriter(output_encoding)(sys.stdout)
+ stdout.encoding = output_encoding
+ return (stdin, stdout)
+
+class StringInputOutput (InputOutput):
+ """
+ >>> s = StringInputOutput()
+ >>> s.set_stdin('hello')
+ >>> s.stdin.read()
+ 'hello'
+ >>> s.stdin.read()
+ >>> print >> s.stdout, 'goodbye'
+ >>> s.get_stdout()
+ 'goodbye\n'
+ >>> s.get_stdout()
+ ''
+
+ Also works with unicode strings
+
+ >>> s.set_stdin(u'hello')
+ >>> s.stdin.read()
+ u'hello'
+ >>> print >> s.stdout, u'goodbye'
+ >>> s.get_stdout()
+ u'goodbye\n'
+ """
+ def __init__(self):
+ stdin = StringIO.StringIO()
+ stdin.encoding = 'utf-8'
+ stdout = StringIO.StringIO()
+ stdout.encoding = 'utf-8'
+ InputOutput.__init__(self, stdin, stdout)
+
+ def set_stdin(self, stdin_string):
+ self.stdin = StringIO.StringIO(stdin_string)
+
+ def get_stdout(self):
+ ret = self.stdout.getvalue()
+ self.stdout = StringIO.StringIO() # clear stdout for next read
+ self.stdin.encoding = 'utf-8'
+ return ret
+
+class UnconnectedStorageGetter (object):
+ def __init__(self, location):
+ self.location = location
+
+ def __call__(self):
+ return libbe.storage.get_storage(self.location)
+
+class StorageCallbacks (object):
+ def __init__(self, location=None):
+ if location == None:
+ location = '.'
+ self.location = location
+ self._get_unconnected_storage = UnconnectedStorageGetter(location)
+
+ def setup_command(self, command):
+ command._get_unconnected_storage = self.get_unconnected_storage
+ command._get_storage = self.get_storage
+ command._get_bugdir = self.get_bugdir
+
+ def get_unconnected_storage(self):
+ """
+ Callback for use by commands that need it.
+
+ The returned Storage instance is may actually be connected,
+ but commands that make use of the returned value should only
+ make use of non-connected Storage methods. This is mainly
+ intended for the init command, which calls Storage.init().
+ """
if not hasattr(self, '_unconnected_storage'):
- if self.get_unconnected_storage == None:
+ if self._get_unconnected_storage == None:
raise NotImplementedError
- self._unconnected_storage = self.get_unconnected_storage()
+ self._unconnected_storage = self._get_unconnected_storage()
return self._unconnected_storage
- def _get_storage(self):
- """
- Callback for use by commands that need it.
+ def set_unconnected_storage(self, unconnected_storage):
+ self._unconnected_storage = unconnected_storage
- Note that with the current implementation,
- _get_unconnected_storage() will not work after this method
- runs, but that shouldn't be an issue for any command I can
- think of...
- """
+ def get_storage(self):
+ """Callback for use by commands that need it."""
if not hasattr(self, '_storage'):
- self._storage = self._get_unconnected_storage()
+ self._storage = self.get_unconnected_storage()
self._storage.connect()
version = self._storage.storage_version()
if version != libbe.storage.STORAGE_VERSION:
raise libbe.storage.InvalidStorageVersion(version)
return self._storage
- def _get_bugdir(self):
+ def set_storage(self, storage):
+ self._storage = storage
+
+ def get_bugdir(self):
"""Callback for use by commands that need it."""
if not hasattr(self, '_bugdir'):
- self._bugdir = libbe.bugdir.BugDir(self._get_storage(), from_storage=True)
+ self._bugdir = libbe.bugdir.BugDir(self.get_storage(),
+ from_storage=True)
return self._bugdir
+ def set_bugdir(self, bugdir):
+ self._bugdir = bugdir
+
+ def cleanup(self):
+ if hasattr(self, '_storage'):
+ self._storage.disconnect()
+
+class UserInterface (object):
+ def __init__(self, io=None, location=None):
+ if io == None:
+ io = StringInputOutput()
+ self.io = io
+ self.storage_callbacks = StorageCallbacks(location)
+ self.restrict_file_access = True
+
+ def help(self):
+ raise NotImplementedError
+
+ def run(self, command, options=None, args=None):
+ command.ui = self
+ self.io.setup_command(command)
+ self.storage_callbacks.setup_command(command)
+ command.restrict_file_access = self.restrict_file_access
+ command._get_user_id = self._get_user_id
+ return command.run(options, args)
+
def _get_user_id(self):
"""Callback for use by commands that need it."""
if not hasattr(self, '_user_id'):
- self._user_id = libbe.ui.util.user.get_user_id(self._get_storage())
+ self._user_id = libbe.ui.util.user.get_user_id(
+ self.storage_callbacks.get_storage())
return self._user_id
def cleanup(self):
- if hasattr(self, '_storage'):
- self._storage.disconnect()
+ self.storage_callbacks.cleanup()
+ self.io.cleanup()