aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/command
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/command')
-rw-r--r--libbe/command/__init__.py40
-rw-r--r--libbe/command/assign.py98
-rw-r--r--libbe/command/base.py554
-rw-r--r--libbe/command/close.py60
-rw-r--r--libbe/command/comment.py169
-rw-r--r--libbe/command/commit.py93
-rw-r--r--libbe/command/depend.py408
-rw-r--r--libbe/command/diff.py139
-rw-r--r--libbe/command/due.py117
-rw-r--r--libbe/command/help.py82
-rw-r--r--libbe/command/html.py686
-rw-r--r--libbe/command/import_xml.py541
-rw-r--r--libbe/command/init.py132
-rw-r--r--libbe/command/list.py279
-rw-r--r--libbe/command/merge.py189
-rw-r--r--libbe/command/new.py103
-rw-r--r--libbe/command/open.py58
-rw-r--r--libbe/command/remove.py79
-rw-r--r--libbe/command/serve.py1172
-rw-r--r--libbe/command/set.py144
-rw-r--r--libbe/command/severity.py98
-rw-r--r--libbe/command/show.py207
-rw-r--r--libbe/command/status.py108
-rw-r--r--libbe/command/subscribe.py385
-rw-r--r--libbe/command/tag.py152
-rw-r--r--libbe/command/target.py209
-rw-r--r--libbe/command/util.py203
27 files changed, 6505 insertions, 0 deletions
diff --git a/libbe/command/__init__.py b/libbe/command/__init__.py
new file mode 100644
index 0000000..0c8d4ff
--- /dev/null
+++ b/libbe/command/__init__.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import base
+
+UserError = base.UserError
+UnknownCommand = base.UnknownCommand
+get_command = base.get_command
+get_command_class = base.get_command_class
+commands = base.commands
+Option = base.Option
+Argument = base.Argument
+Command = base.Command
+InputOutput = base.InputOutput
+StdInputOutput = base.StdInputOutput
+StringInputOutput = base.StringInputOutput
+UnconnectedStorageGetter = base.UnconnectedStorageGetter
+StorageCallbacks = base.StorageCallbacks
+UserInterface = base.UserInterface
+
+__all__ = [UserError, UnknownCommand,
+ get_command, get_command_class, commands,
+ Option, Argument, Command,
+ InputOutput, StdInputOutput, StringInputOutput,
+ StorageCallbacks, UnconnectedStorageGetter,
+ UserInterface]
diff --git a/libbe/command/assign.py b/libbe/command/assign.py
new file mode 100644
index 0000000..6abf05e
--- /dev/null
+++ b/libbe/command/assign.py
@@ -0,0 +1,98 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# Thomas Gerigk <tgerigk@gmx.de>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.command
+import libbe.command.util
+
+
+class Assign (libbe.command.Command):
+ u"""Assign an individual or group to fix a bug
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Assign(ui=ui)
+
+ >>> bd.bug_from_uuid('a').assigned is None
+ True
+ >>> ui._user_id = u'Fran\xe7ois'
+ >>> ret = ui.run(cmd, args=['-', '/a'])
+ >>> bd.flush_reload()
+ >>> bd.bug_from_uuid('a').assigned
+ u'Fran\\xe7ois'
+
+ >>> ret = ui.run(cmd, args=['someone', '/a', '/b'])
+ >>> bd.flush_reload()
+ >>> bd.bug_from_uuid('a').assigned
+ 'someone'
+ >>> bd.bug_from_uuid('b').assigned
+ 'someone'
+
+ >>> ret = ui.run(cmd, args=['none', '/a'])
+ >>> bd.flush_reload()
+ >>> bd.bug_from_uuid('a').assigned is None
+ True
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'assign'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='assigned', metavar='ASSIGNED', default=None,
+ completion_callback=libbe.command.util.complete_assigned),
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID', default=None,
+ repeatable=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ ])
+
+ def _run(self, **params):
+ assigned = params['assigned']
+ if assigned == 'none':
+ assigned = None
+ elif assigned == '-':
+ assigned = self._get_user_id()
+ bugdir = self._get_bugdir()
+ for bug_id in params['bug-id']:
+ bug,dummy_comment = \
+ libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
+ if bug.assigned != assigned:
+ bug.assigned = assigned
+ return 0
+
+ def _long_help(self):
+ return """
+Assign a person to fix a bug.
+
+Assigneds should be the person's Bugs Everywhere identity, the same
+string that appears in Creator fields.
+
+Special assigned strings:
+ "-" assign the bug to yourself
+ "none" un-assigns the bug
+"""
diff --git a/libbe/command/base.py b/libbe/command/base.py
new file mode 100644
index 0000000..6b1c050
--- /dev/null
+++ b/libbe/command/base.py
@@ -0,0 +1,554 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import codecs
+import optparse
+import os.path
+import StringIO
+import sys
+
+import libbe
+import libbe.storage
+import libbe.ui.util.user
+import libbe.util.encoding
+import libbe.util.plugin
+
+class UserError(Exception):
+ pass
+
+class UnknownCommand(UserError):
+ def __init__(self, cmd):
+ Exception.__init__(self, "Unknown command '%s'" % cmd)
+ self.cmd = cmd
+
+def get_command(command_name):
+ """Retrieves the module for a user command
+
+ >>> try:
+ ... get_command('asdf')
+ ... except UnknownCommand, e:
+ ... print e
+ Unknown command 'asdf'
+ >>> repr(get_command('list')).startswith("<module 'libbe.command.list' from ")
+ True
+ """
+ try:
+ cmd = libbe.util.plugin.import_by_name(
+ 'libbe.command.%s' % command_name.replace("-", "_"))
+ except ImportError, e:
+ raise UnknownCommand(command_name)
+ return cmd
+
+def get_command_class(module=None, command_name=None):
+ """Retrieves a command class from a module.
+
+ >>> import_xml_mod = get_command('import-xml')
+ >>> import_xml = get_command_class(import_xml_mod, 'import-xml')
+ >>> repr(import_xml)
+ "<class 'libbe.command.import_xml.Import_XML'>"
+ >>> import_xml = get_command_class(command_name='import-xml')
+ >>> repr(import_xml)
+ "<class 'libbe.command.import_xml.Import_XML'>"
+ """
+ if module == None:
+ module = get_command(command_name)
+ try:
+ cname = command_name.capitalize().replace('-', '_')
+ cmd = getattr(module, cname)
+ except ImportError, e:
+ raise UnknownCommand(command_name)
+ return cmd
+
+def modname_to_command_name(modname):
+ """Little hack to replicate
+ >>> import sys
+ >>> def real_modname_to_command_name(modname):
+ ... mod = libbe.util.plugin.import_by_name(
+ ... 'libbe.command.%s' % modname)
+ ... attrs = [getattr(mod, name) for name in dir(mod)]
+ ... commands = []
+ ... for attr_name in dir(mod):
+ ... attr = getattr(mod, attr_name)
+ ... try:
+ ... if issubclass(attr, Command):
+ ... commands.append(attr)
+ ... except TypeError, e:
+ ... pass
+ ... if len(commands) == 0:
+ ... raise Exception('No Command classes in %s' % dir(mod))
+ ... return commands[0].name
+ >>> real_modname_to_command_name('new')
+ 'new'
+ >>> real_modname_to_command_name('import_xml')
+ 'import-xml'
+ """
+ return modname.replace('_', '-')
+
+def commands(command_names=False):
+ for modname in libbe.util.plugin.modnames('libbe.command'):
+ if modname not in ['base', 'util']:
+ if command_names == False:
+ yield modname
+ else:
+ yield modname_to_command_name(modname)
+
+class CommandInput (object):
+ def __init__(self, name, help=''):
+ self.name = name
+ self.help = help
+
+ def __str__(self):
+ return '<%s %s>' % (self.__class__.__name__, self.name)
+
+ def __repr__(self):
+ return self.__str__()
+
+class Argument (CommandInput):
+ def __init__(self, metavar=None, default=None, type='string',
+ optional=False, repeatable=False,
+ completion_callback=None, *args, **kwargs):
+ CommandInput.__init__(self, *args, **kwargs)
+ self.metavar = metavar
+ self.default = default
+ self.type = type
+ self.optional = optional
+ self.repeatable = repeatable
+ self.completion_callback = completion_callback
+ if self.metavar == None:
+ self.metavar = self.name.upper()
+
+class Option (CommandInput):
+ def __init__(self, callback=None, short_name=None, arg=None,
+ *args, **kwargs):
+ CommandInput.__init__(self, *args, **kwargs)
+ self.callback = callback
+ self.short_name = short_name
+ self.arg = arg
+ if self.arg == None and self.callback == None:
+ # use an implicit boolean argument
+ self.arg = Argument(name=self.name, help=self.help,
+ default=False, type='bool')
+ self.validate()
+
+ def validate(self):
+ if self.arg == None:
+ assert self.callback != None, self.name
+ return
+ assert self.callback == None, '%s: %s' (self.name, self.callback)
+ assert self.arg.name == self.name, \
+ 'Name missmatch: %s != %s' % (self.arg.name, self.name)
+ assert self.arg.optional == False, self.name
+ assert self.arg.repeatable == False, self.name
+
+ def __str__(self):
+ return '--%s' % self.name
+
+ def __repr__(self):
+ return '<Option %s>' % self.__str__()
+
+class _DummyParser (optparse.OptionParser):
+ def __init__(self, command):
+ optparse.OptionParser.__init__(self)
+ self.remove_option('-h')
+ self.command = command
+ self._command_opts = []
+ for option in self.command.options:
+ self._add_option(option)
+
+ def _add_option(self, option):
+ # from libbe.ui.command_line.CmdOptionParser._add_option
+ option.validate()
+ long_opt = '--%s' % option.name
+ if option.short_name != None:
+ short_opt = '-%s' % option.short_name
+ assert '_' not in option.name, \
+ 'Non-reconstructable option name %s' % option.name
+ kwargs = {'dest':option.name.replace('-', '_'),
+ 'help':option.help}
+ if option.arg == None or option.arg.type == 'bool':
+ kwargs['action'] = 'store_true'
+ kwargs['metavar'] = None
+ kwargs['default'] = False
+ else:
+ kwargs['type'] = option.arg.type
+ kwargs['action'] = 'store'
+ kwargs['metavar'] = option.arg.metavar
+ kwargs['default'] = option.arg.default
+ if option.short_name != None:
+ opt = optparse.Option(short_opt, long_opt, **kwargs)
+ else:
+ opt = optparse.Option(long_opt, **kwargs)
+ #option.takes_value = lambda : option.arg != None
+ opt._option = option
+ self._command_opts.append(opt)
+ self.add_option(opt)
+
+class OptionFormatter (optparse.IndentedHelpFormatter):
+ def __init__(self, command):
+ optparse.IndentedHelpFormatter.__init__(self)
+ self.command = command
+ def option_help(self):
+ # based on optparse.OptionParser.format_option_help()
+ parser = _DummyParser(self.command)
+ self.store_option_strings(parser)
+ ret = []
+ ret.append(self.format_heading('Options'))
+ self.indent()
+ for option in parser._command_opts:
+ ret.append(self.format_option(option))
+ ret.append('\n')
+ self.dedent()
+ # Drop the last '\n', or the header if no options or option groups:
+ return ''.join(ret[:-1])
+
+class Command (object):
+ """One-line command description here.
+
+ >>> c = Command()
+ >>> print c.help()
+ usage: be command [options]
+ <BLANKLINE>
+ Options:
+ -h, --help Print a help message.
+ <BLANKLINE>
+ --complete Print a list of possible completions.
+ <BLANKLINE>
+ A detailed help message.
+ """
+
+ name = 'command'
+
+ def __init__(self, ui=None):
+ self.ui = ui # calling user-interface
+ self.status = None
+ self.result = None
+ self.restrict_file_access = True
+ self.options = [
+ Option(name='help', short_name='h',
+ help='Print a help message.',
+ callback=self.help),
+ Option(name='complete',
+ help='Print a list of possible completions.',
+ callback=self.complete),
+ ]
+ self.args = []
+
+ def run(self, options=None, args=None):
+ self.status = 1 # in case we raise an exception
+ params = self._parse_options_args(options, args)
+ if params['help'] == True:
+ pass
+ else:
+ params.pop('help')
+ if params['complete'] != None:
+ pass
+ else:
+ params.pop('complete')
+
+ self.status = self._run(**params)
+ return self.status
+
+ def _parse_options_args(self, options=None, args=None):
+ if options == None:
+ options = {}
+ if args == None:
+ args = []
+ params = {}
+ for option in self.options:
+ assert option.name not in params, params[option.name]
+ if option.name in options:
+ params[option.name] = options.pop(option.name)
+ elif option.arg != None:
+ params[option.name] = option.arg.default
+ else: # non-arg options are flags, set to default flag value
+ params[option.name] = False
+ assert 'user-id' not in params, params['user-id']
+ if 'user-id' in options:
+ self._user_id = options.pop('user-id')
+ if len(options) > 0:
+ raise UserError, 'Invalid option passed to command %s:\n %s' \
+ % (self.name, '\n '.join(['%s: %s' % (k,v)
+ for k,v in options.items()]))
+ in_optional_args = False
+ for i,arg in enumerate(self.args):
+ if arg.repeatable == True:
+ assert i == len(self.args)-1, arg.name
+ if in_optional_args == True:
+ assert arg.optional == True, arg.name
+ else:
+ in_optional_args = arg.optional
+ if i < len(args):
+ if arg.repeatable == True:
+ params[arg.name] = [args[i]]
+ else:
+ params[arg.name] = args[i]
+ else: # no value given
+ assert in_optional_args == True, arg.name
+ params[arg.name] = arg.default
+ if len(args) > len(self.args): # add some additional repeats
+ assert self.args[-1].repeatable == True, self.args[-1].name
+ params[self.args[-1].name].extend(args[len(self.args):])
+ return params
+
+ def _run(self, **kwargs):
+ raise NotImplementedError
+
+ def help(self, *args):
+ return '\n\n'.join([self.usage(),
+ self._option_help(),
+ self._long_help().rstrip('\n')])
+
+ def usage(self):
+ usage = 'usage: be %s [options]' % self.name
+ num_optional = 0
+ for arg in self.args:
+ usage += ' '
+ if arg.optional == True:
+ usage += '['
+ num_optional += 1
+ usage += arg.metavar
+ if arg.repeatable == True:
+ usage += ' ...'
+ usage += ']'*num_optional
+ return usage
+
+ def _option_help(self):
+ o = OptionFormatter(self)
+ return o.option_help().strip('\n')
+
+ def _long_help(self):
+ return "A detailed help message."
+
+ def complete(self, argument=None, fragment=None):
+ if argument == None:
+ ret = ['--%s' % o.name for o in self.options]
+ if len(self.args) > 0 and self.args[0].completion_callback != None:
+ ret.extend(self.args[0].completion_callback(self, argument, fragment))
+ return ret
+ elif argument.completion_callback != None:
+ # finish a particular argument
+ return argument.completion_callback(self, argument, fragment)
+ return [] # the particular argument doesn't supply completion info
+
+ def _check_restricted_access(self, storage, path):
+ """
+ Check that the file at path is inside bugdir.root. This is
+ important if you allow other users to execute becommands with
+ your username (e.g. if you're running be-handle-mail through
+ your ~/.procmailrc). If this check wasn't made, a user could
+ e.g. run
+ be commit -b ~/.ssh/id_rsa "Hack to expose ssh key"
+ which would expose your ssh key to anyone who could read the
+ VCS log.
+
+ >>> class DummyStorage (object): pass
+ >>> s = DummyStorage()
+ >>> s.repo = os.path.expanduser('~/x/')
+ >>> c = Command()
+ >>> try:
+ ... c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
+ ... except UserError, e:
+ ... assert str(e).startswith('file access restricted!'), str(e)
+ ... print 'we got the expected error'
+ we got the expected error
+ >>> c._check_restricted_access(s, os.path.expanduser('~/x'))
+ >>> c._check_restricted_access(s, os.path.expanduser('~/x/y'))
+ >>> c.restrict_file_access = False
+ >>> c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
+ """
+ if self.restrict_file_access == True:
+ path = os.path.abspath(path)
+ repo = os.path.abspath(storage.repo).rstrip(os.path.sep)
+ if path == repo or path.startswith(repo+os.path.sep):
+ return
+ raise UserError('file access restricted!\n %s not in %s'
+ % (path, repo))
+
+ def cleanup(self):
+ pass
+
+class InputOutput (object):
+ def __init__(self, stdin=None, stdout=None):
+ self.stdin = stdin
+ self.stdout = stdout
+
+ def setup_command(self, command):
+ if not hasattr(self.stdin, 'encoding'):
+ self.stdin.encoding = libbe.util.encoding.get_input_encoding()
+ if not hasattr(self.stdout, 'encoding'):
+ self.stdout.encoding = libbe.util.encoding.get_output_encoding()
+ command.stdin = self.stdin
+ command.stdin.encoding = self.stdin.encoding
+ command.stdout = self.stdout
+ command.stdout.encoding = self.stdout.encoding
+
+ def cleanup(self):
+ pass
+
+class StdInputOutput (InputOutput):
+ def __init__(self, input_encoding=None, output_encoding=None):
+ stdin,stdout = self._get_io(input_encoding, output_encoding)
+ InputOutput.__init__(self, stdin, stdout)
+
+ def _get_io(self, input_encoding=None, output_encoding=None):
+ if input_encoding == None:
+ input_encoding = libbe.util.encoding.get_input_encoding()
+ if output_encoding == None:
+ output_encoding = libbe.util.encoding.get_output_encoding()
+ stdin = codecs.getwriter(input_encoding)(sys.stdin)
+ stdin.encoding = input_encoding
+ stdout = codecs.getwriter(output_encoding)(sys.stdout)
+ stdout.encoding = output_encoding
+ return (stdin, stdout)
+
+class StringInputOutput (InputOutput):
+ """
+ >>> s = StringInputOutput()
+ >>> s.set_stdin('hello')
+ >>> s.stdin.read()
+ 'hello'
+ >>> s.stdin.read()
+ ''
+ >>> print >> s.stdout, 'goodbye'
+ >>> s.get_stdout()
+ 'goodbye\\n'
+ >>> s.get_stdout()
+ ''
+
+ Also works with unicode strings
+
+ >>> s.set_stdin(u'hello')
+ >>> s.stdin.read()
+ u'hello'
+ >>> print >> s.stdout, u'goodbye'
+ >>> s.get_stdout()
+ u'goodbye\\n'
+ """
+ def __init__(self):
+ stdin = StringIO.StringIO()
+ stdin.encoding = 'utf-8'
+ stdout = StringIO.StringIO()
+ stdout.encoding = 'utf-8'
+ InputOutput.__init__(self, stdin, stdout)
+
+ def set_stdin(self, stdin_string):
+ self.stdin = StringIO.StringIO(stdin_string)
+
+ def get_stdout(self):
+ ret = self.stdout.getvalue()
+ self.stdout = StringIO.StringIO() # clear stdout for next read
+ self.stdin.encoding = 'utf-8'
+ return ret
+
+class UnconnectedStorageGetter (object):
+ def __init__(self, location):
+ self.location = location
+
+ def __call__(self):
+ return libbe.storage.get_storage(self.location)
+
+class StorageCallbacks (object):
+ def __init__(self, location=None):
+ if location == None:
+ location = '.'
+ self.location = location
+ self._get_unconnected_storage = UnconnectedStorageGetter(location)
+
+ def setup_command(self, command):
+ command._get_unconnected_storage = self.get_unconnected_storage
+ command._get_storage = self.get_storage
+ command._get_bugdir = self.get_bugdir
+
+ def get_unconnected_storage(self):
+ """
+ Callback for use by commands that need it.
+
+ The returned Storage instance is may actually be connected,
+ but commands that make use of the returned value should only
+ make use of non-connected Storage methods. This is mainly
+ intended for the init command, which calls Storage.init().
+ """
+ if not hasattr(self, '_unconnected_storage'):
+ if self._get_unconnected_storage == None:
+ raise NotImplementedError
+ self._unconnected_storage = self._get_unconnected_storage()
+ return self._unconnected_storage
+
+ def set_unconnected_storage(self, unconnected_storage):
+ self._unconnected_storage = unconnected_storage
+
+ def get_storage(self):
+ """Callback for use by commands that need it."""
+ if not hasattr(self, '_storage'):
+ self._storage = self.get_unconnected_storage()
+ self._storage.connect()
+ version = self._storage.storage_version()
+ if version != libbe.storage.STORAGE_VERSION:
+ raise libbe.storage.InvalidStorageVersion(version)
+ return self._storage
+
+ def set_storage(self, storage):
+ self._storage = storage
+
+ def get_bugdir(self):
+ """Callback for use by commands that need it."""
+ if not hasattr(self, '_bugdir'):
+ self._bugdir = libbe.bugdir.BugDir(self.get_storage(),
+ from_storage=True)
+ return self._bugdir
+
+ def set_bugdir(self, bugdir):
+ self._bugdir = bugdir
+
+ def cleanup(self):
+ if hasattr(self, '_storage'):
+ self._storage.disconnect()
+
+class UserInterface (object):
+ def __init__(self, io=None, location=None):
+ if io == None:
+ io = StringInputOutput()
+ self.io = io
+ self.storage_callbacks = StorageCallbacks(location)
+ self.restrict_file_access = True
+
+ def help(self):
+ raise NotImplementedError
+
+ def run(self, command, options=None, args=None):
+ self.setup_command(command)
+ return command.run(options, args)
+
+ def setup_command(self, command):
+ if command.ui == None:
+ command.ui = self
+ if self.io != None:
+ self.io.setup_command(command)
+ if self.storage_callbacks != None:
+ self.storage_callbacks.setup_command(command)
+ command.restrict_file_access = self.restrict_file_access
+ command._get_user_id = self._get_user_id
+
+ def _get_user_id(self):
+ """Callback for use by commands that need it."""
+ if not hasattr(self, '_user_id'):
+ self._user_id = libbe.ui.util.user.get_user_id(
+ self.storage_callbacks.get_storage())
+ return self._user_id
+
+ def cleanup(self):
+ self.storage_callbacks.cleanup()
+ self.io.cleanup()
diff --git a/libbe/command/close.py b/libbe/command/close.py
new file mode 100644
index 0000000..0532ed2
--- /dev/null
+++ b/libbe/command/close.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Marien Zwart <marienz@gentoo.org>
+# Thomas Gerigk <tgerigk@gmx.de>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""Close a bug"""
+from libbe import cmdutil, bugdir
+__desc__ = __doc__
+
+def execute(args, manipulate_encodings=True):
+ """
+ >>> from libbe import bugdir
+ >>> import os
+ >>> bd = bugdir.SimpleBugDir()
+ >>> os.chdir(bd.root)
+ >>> print bd.bug_from_shortname("a").status
+ open
+ >>> execute(["a"], manipulate_encodings=False)
+ >>> bd._clear_bugs()
+ >>> print bd.bug_from_shortname("a").status
+ closed
+ >>> bd.cleanup()
+ """
+ parser = get_parser()
+ options, args = parser.parse_args(args)
+ cmdutil.default_complete(options, args, parser,
+ bugid_args={0: lambda bug : bug.active==True})
+ if len(args) == 0:
+ raise cmdutil.UsageError("Please specify a bug id.")
+ if len(args) > 1:
+ raise cmdutil.UsageError("Too many arguments.")
+ bd = bugdir.BugDir(from_disk=True,
+ manipulate_encodings=manipulate_encodings)
+ bug = cmdutil.bug_from_shortname(bd, args[0])
+ bug.status = "closed"
+ bd.save()
+
+def get_parser():
+ parser = cmdutil.CmdOptionParser("be close BUG-ID")
+ return parser
+
+longhelp="""
+Close the bug identified by BUG-ID.
+"""
+
+def help():
+ return get_parser().help_str() + longhelp
diff --git a/libbe/command/comment.py b/libbe/command/comment.py
new file mode 100644
index 0000000..5bf6acf
--- /dev/null
+++ b/libbe/command/comment.py
@@ -0,0 +1,169 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import os
+import sys
+
+import libbe
+import libbe.command
+import libbe.command.util
+import libbe.comment
+import libbe.ui.util.editor
+import libbe.util.id
+
+
+class Comment (libbe.command.Command):
+ """Add a comment to a bug
+
+ >>> import time
+ >>> import libbe.bugdir
+ >>> import libbe.util.id
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Comment(ui=ui)
+
+ >>> uuid_gen = libbe.util.id.uuid_gen
+ >>> libbe.util.id.uuid_gen = lambda: 'X'
+ >>> ui._user_id = u'Fran\\xe7ois'
+ >>> ret = ui.run(cmd, args=['/a', 'This is a comment about a'])
+ Created comment with ID abc/a/X
+ >>> libbe.util.id.uuid_gen = uuid_gen
+ >>> bd.flush_reload()
+ >>> bug = bd.bug_from_uuid('a')
+ >>> bug.load_comments(load_full=False)
+ >>> comment = bug.comment_root[0]
+ >>> comment.id.storage() == comment.uuid
+ True
+ >>> print comment.body
+ This is a comment about a
+ <BLANKLINE>
+ >>> comment.author
+ u'Fran\\xe7ois'
+ >>> comment.time <= int(time.time())
+ True
+ >>> comment.in_reply_to is None
+ True
+
+ >>> if 'EDITOR' in os.environ:
+ ... del os.environ['EDITOR']
+ >>> if 'VISUAL' in os.environ:
+ ... del os.environ['VISUAL']
+ >>> ui._user_id = u'Frank'
+ >>> ret = ui.run(cmd, args=['/b'])
+ Traceback (most recent call last):
+ UserError: No comment supplied, and EDITOR not specified.
+
+ >>> os.environ['EDITOR'] = "echo 'I like cheese' > "
+ >>> libbe.util.id.uuid_gen = lambda: 'Y'
+ >>> ret = ui.run(cmd, args=['/b'])
+ Created comment with ID abc/b/Y
+ >>> libbe.util.id.uuid_gen = uuid_gen
+ >>> bd.flush_reload()
+ >>> bug = bd.bug_from_uuid('b')
+ >>> bug.load_comments(load_full=False)
+ >>> comment = bug.comment_root[0]
+ >>> print comment.body
+ I like cheese
+ <BLANKLINE>
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ >>> del os.environ["EDITOR"]
+ """
+ name = 'comment'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='author', short_name='a',
+ help='Set the comment author',
+ arg=libbe.command.Argument(
+ name='author', metavar='AUTHOR')),
+ libbe.command.Option(name='alt-id',
+ help='Set an alternate comment ID',
+ arg=libbe.command.Argument(
+ name='alt-id', metavar='ID')),
+ libbe.command.Option(name='content-type', short_name='c',
+ help='Set comment content-type (e.g. text/plain)',
+ arg=libbe.command.Argument(name='content-type',
+ metavar='MIME')),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='id', metavar='ID', default=None,
+ completion_callback=libbe.command.util.complete_bug_comment_id),
+ libbe.command.Argument(
+ name='comment', metavar='COMMENT', default=None,
+ optional=True,
+ completion_callback=libbe.command.util.complete_assigned),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ bug,parent = \
+ libbe.command.util.bug_comment_from_user_id(bugdir, params['id'])
+ if params['comment'] == None:
+ # try to launch an editor for comment-body entry
+ try:
+ if parent == bug.comment_root:
+ parent_body = bug.summary+'\n'
+ else:
+ parent_body = parent.body
+ estr = 'Please enter your comment above\n\n> %s\n' \
+ % ('\n> '.join(parent_body.splitlines()))
+ body = libbe.ui.util.editor.editor_string(estr)
+ except libbe.ui.util.editor.CantFindEditor, e:
+ raise libbe.command.UserError(
+ 'No comment supplied, and EDITOR not specified.')
+ if body is None:
+ raise libbe.command.UserError('No comment entered.')
+ elif params['comment'] == '-': # read body from stdin
+ binary = not (params['content-type'] == None
+ or params['content-type'].startswith("text/"))
+ if not binary:
+ body = self.stdin.read()
+ if not body.endswith('\n'):
+ body += '\n'
+ else: # read-in without decoding
+ body = sys.stdin.read()
+ else: # body given on command line
+ body = params['comment']
+ if not body.endswith('\n'):
+ body+='\n'
+ if params['author'] == None:
+ params['author'] = self._get_user_id()
+
+ new = parent.new_reply(body=body, content_type=params['content-type'])
+ for key in ['alt-id', 'author']:
+ if params[key] != None:
+ setattr(new, new._setting_name_to_attr_name(key), params[key])
+ print >> self.stdout, 'Created comment with ID %s' % new.id.user()
+ return 0
+
+ def _long_help(self):
+ return """
+To add a comment to a bug, use the bug ID as the argument. To reply
+to another comment, specify the comment name (as shown in "be show"
+output). COMMENT, if specified, should be either the text of your
+comment or "-", in which case the text will be read from stdin. If
+you do not specify a COMMENT, $EDITOR is used to launch an editor. If
+COMMENT is unspecified and EDITOR is not set, no comment will be
+created.
+"""
diff --git a/libbe/command/commit.py b/libbe/command/commit.py
new file mode 100644
index 0000000..fd15630
--- /dev/null
+++ b/libbe/command/commit.py
@@ -0,0 +1,93 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import sys
+
+import libbe
+import libbe.bugdir
+import libbe.command
+import libbe.command.util
+import libbe.storage
+import libbe.ui.util.editor
+
+
+class Commit (libbe.command.Command):
+ """Commit the currently pending changes to the repository
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False, versioned=True)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Commit(ui=ui)
+
+ >>> bd.extra_strings = ['hi there']
+ >>> bd.flush_reload()
+ >>> ui.run(cmd, args=['Making a commit']) # doctest: +ELLIPSIS
+ Committed ...
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'commit'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='body', short_name='b',
+ help='Provide the detailed body for the commit message. In the special case that FILE == "EDITOR", spawn an editor to enter the body text (in which case you cannot use stdin for the summary)',
+ arg=libbe.command.Argument(name='body', metavar='FILE',
+ completion_callback=libbe.command.util.complete_path)),
+ libbe.command.Option(name='allow-empty', short_name='a',
+ help='Allow empty commits'),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='comment', metavar='COMMENT', default=None),
+ ])
+
+ def _run(self, **params):
+ if params['comment'] == '-': # read summary from stdin
+ assert params['body'] != 'EDITOR', \
+ 'Cannot spawn and editor when the summary is using stdin.'
+ summary = sys.stdin.readline()
+ else:
+ summary = params['comment']
+ storage = self._get_storage()
+ if params['body'] == None:
+ body = None
+ elif params['body'] == 'EDITOR':
+ body = libbe.ui.util.editor.editor_string(
+ 'Please enter your commit message above')
+ else:
+ self._check_restricted_access(storage, params['body'])
+ body = libbe.util.encoding.get_file_contents(
+ params['body'], decode=True)
+ try:
+ revision = storage.commit(summary, body=body,
+ allow_empty=params['allow-empty'])
+ print >> self.stdout, 'Committed %s' % revision
+ except libbe.storage.EmptyCommit, e:
+ print >> self.stdout, e
+ return 1
+
+ def _long_help(self):
+ return """
+Commit the current repository status. The summary specified on the
+commandline is a string (only one line) that describes the commit
+briefly or "-", in which case the string will be read from stdin.
+"""
diff --git a/libbe/command/depend.py b/libbe/command/depend.py
new file mode 100644
index 0000000..f87657b
--- /dev/null
+++ b/libbe/command/depend.py
@@ -0,0 +1,408 @@
+# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import copy
+import os
+
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.command.util
+import libbe.util.tree
+
+BLOCKS_TAG="BLOCKS:"
+BLOCKED_BY_TAG="BLOCKED-BY:"
+
+class BrokenLink (Exception):
+ def __init__(self, blocked_bug, blocking_bug, blocks=True):
+ if blocks == True:
+ msg = "Missing link: %s blocks %s" \
+ % (blocking_bug.uuid, blocked_bug.uuid)
+ else:
+ msg = "Missing link: %s blocked by %s" \
+ % (blocked_bug.uuid, blocking_bug.uuid)
+ Exception.__init__(self, msg)
+ self.blocked_bug = blocked_bug
+ self.blocking_bug = blocking_bug
+
+class Depend (libbe.command.Command):
+ """Add/remove bug dependencies
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Depend(ui=ui)
+
+ >>> ret = ui.run(cmd, args=['/a', '/b'])
+ a blocked by:
+ b
+ >>> ret = ui.run(cmd, args=['/a'])
+ a blocked by:
+ b
+ >>> ret = ui.run(cmd, {'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ a blocked by:
+ b closed
+ >>> ret = ui.run(cmd, args=['/b', '/a'])
+ b blocked by:
+ a
+ b blocks:
+ a
+ >>> ret = ui.run(cmd, {'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ a blocked by:
+ b closed
+ a blocks:
+ b closed
+ >>> ret = ui.run(cmd, {'repair':True})
+ >>> ret = ui.run(cmd, {'remove':True}, ['/b', '/a'])
+ b blocks:
+ a
+ >>> ret = ui.run(cmd, {'remove':True}, ['/a', '/b'])
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'depend'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='remove', short_name='r',
+ help='Remove dependency (instead of adding it)'),
+ libbe.command.Option(name='show-status', short_name='s',
+ help='Show status of blocking bugs'),
+ libbe.command.Option(name='status',
+ help='Only show bugs matching the STATUS specifier',
+ arg=libbe.command.Argument(
+ name='status', metavar='STATUS', default=None,
+ completion_callback=libbe.command.util.complete_status)),
+ libbe.command.Option(name='severity',
+ help='Only show bugs matching the SEVERITY specifier',
+ arg=libbe.command.Argument(
+ name='severity', metavar='SEVERITY', default=None,
+ completion_callback=libbe.command.util.complete_severity)),
+ libbe.command.Option(name='tree-depth', short_name='t',
+ help='Print dependency tree rooted at BUG-ID with DEPTH levels of both blockers and blockees. Set DEPTH <= 0 to disable the depth limit.',
+ arg=libbe.command.Argument(
+ name='tree-depth', metavar='INT', type='int',
+ completion_callback=libbe.command.util.complete_severity)),
+ libbe.command.Option(name='repair',
+ help='Check for and repair one-way links'),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID', default=None,
+ optional=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ libbe.command.Argument(
+ name='blocking-bug-id', metavar='BUG-ID', default=None,
+ optional=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ ])
+
+ def _run(self, **params):
+ if params['repair'] == True and params['bug-id'] != None:
+ raise libbe.command.UsageError(
+ 'No arguments with --repair calls.')
+ if params['repair'] == False and params['bug-id'] == None:
+ raise libbe.command.UsageError(
+ 'Must specify either --repair or a BUG-ID')
+ if params['tree-depth'] != None \
+ and params['blocking-bug-id'] != None:
+ raise libbe.command.UsageError(
+ 'Only one bug id used in tree mode.')
+ bugdir = self._get_bugdir()
+ if params['repair'] == True:
+ good,fixed,broken = check_dependencies(bugdir, repair_broken_links=True)
+ assert len(broken) == 0, broken
+ if len(fixed) > 0:
+ print >> self.stdout, 'Fixed the following links:'
+ print >> self.stdout, \
+ '\n'.join(['%s |-- %s' % (blockee.uuid, blocker.uuid)
+ for blockee,blocker in fixed])
+ return 0
+ allowed_status_values = \
+ libbe.command.util.select_values(
+ params['status'], libbe.bug.status_values)
+ allowed_severity_values = \
+ libbe.command.util.select_values(
+ params['severity'], libbe.bug.severity_values)
+
+ bugA, dummy_comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['bug-id'])
+
+ if params['tree-depth'] != None:
+ dtree = DependencyTree(bugdir, bugA, params['tree-depth'],
+ allowed_status_values,
+ allowed_severity_values)
+ if len(dtree.blocked_by_tree()) > 0:
+ print >> self.stdout, '%s blocked by:' % bugA.uuid
+ for depth,node in dtree.blocked_by_tree().thread():
+ if depth == 0: continue
+ print >> self.stdout, \
+ '%s%s' % (' '*(depth),
+ node.bug.string(shortlist=True))
+ if len(dtree.blocks_tree()) > 0:
+ print >> self.stdout, '%s blocks:' % bugA.uuid
+ for depth,node in dtree.blocks_tree().thread():
+ if depth == 0: continue
+ print >> self.stdout, \
+ '%s%s' % (' '*(depth),
+ node.bug.string(shortlist=True))
+ return 0
+
+ if params['blocking-bug-id'] != None:
+ bugB,dummy_comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['blocking-bug-id'])
+ if params['remove'] == True:
+ remove_block(bugA, bugB)
+ else: # add the dependency
+ add_block(bugA, bugB)
+
+ blocked_by = get_blocked_by(bugdir, bugA)
+ if len(blocked_by) > 0:
+ print >> self.stdout, '%s blocked by:' % bugA.uuid
+ if params['show-status'] == True:
+ print >> self.stdout, \
+ '\n'.join(['%s\t%s' % (_bug.uuid, _bug.status)
+ for _bug in blocked_by])
+ else:
+ print >> self.stdout, \
+ '\n'.join([_bug.uuid for _bug in blocked_by])
+ blocks = get_blocks(bugdir, bugA)
+ if len(blocks) > 0:
+ print >> self.stdout, '%s blocks:' % bugA.uuid
+ if params['show-status'] == True:
+ print >> self.stdout, \
+ '\n'.join(['%s\t%s' % (_bug.uuid, _bug.status)
+ for _bug in blocks])
+ else:
+ print >> self.stdout, \
+ '\n'.join([_bug.uuid for _bug in blocks])
+ return 0
+
+ def _long_help(self):
+ return """
+Set a dependency with the second bug (B) blocking the first bug (A).
+If bug B is not specified, just print a list of bugs blocking (A).
+
+To search for bugs blocked by a particular bug, try
+ $ be list --extra-strings BLOCKED-BY:<your-bug-uuid>
+
+The --status and --severity options allow you to either blacklist or
+whitelist values, for example
+ $ be list --status open,assigned
+will only follow and print dependencies with open or assigned status.
+You select blacklist mode by starting the list with a minus sign, for
+example
+ $ be list --severity -target
+which will only follow and print dependencies with non-target severity.
+
+If neither bug A nor B is specified, check for and repair the missing
+side of any one-way links.
+
+The "|--" symbol in the repair-mode output is inspired by the
+"negative feedback" arrow common in biochemistry. See, for example
+ http://www.nature.com/nature/journal/v456/n7223/images/nature07513-f5.0.jpg
+"""
+
+# internal helper functions
+
+def _generate_blocks_string(blocked_bug):
+ return '%s%s' % (BLOCKS_TAG, blocked_bug.uuid)
+
+def _generate_blocked_by_string(blocking_bug):
+ return '%s%s' % (BLOCKED_BY_TAG, blocking_bug.uuid)
+
+def _parse_blocks_string(string):
+ assert string.startswith(BLOCKS_TAG)
+ return string[len(BLOCKS_TAG):]
+
+def _parse_blocked_by_string(string):
+ assert string.startswith(BLOCKED_BY_TAG)
+ return string[len(BLOCKED_BY_TAG):]
+
+def _add_remove_extra_string(bug, string, add):
+ estrs = bug.extra_strings
+ if add == True:
+ estrs.append(string)
+ else: # remove the string
+ estrs.remove(string)
+ bug.extra_strings = estrs # reassign to notice change
+
+def _get_blocks(bug):
+ uuids = []
+ for line in bug.extra_strings:
+ if line.startswith(BLOCKS_TAG):
+ uuids.append(_parse_blocks_string(line))
+ return uuids
+
+def _get_blocked_by(bug):
+ uuids = []
+ for line in bug.extra_strings:
+ if line.startswith(BLOCKED_BY_TAG):
+ uuids.append(_parse_blocked_by_string(line))
+ return uuids
+
+def _repair_one_way_link(blocked_bug, blocking_bug, blocks=None):
+ if blocks == True: # add blocks link
+ blocks_string = _generate_blocks_string(blocked_bug)
+ _add_remove_extra_string(blocking_bug, blocks_string, add=True)
+ else: # add blocked by link
+ blocked_by_string = _generate_blocked_by_string(blocking_bug)
+ _add_remove_extra_string(blocked_bug, blocked_by_string, add=True)
+
+# functions exposed to other modules
+
+def add_block(blocked_bug, blocking_bug):
+ blocked_by_string = _generate_blocked_by_string(blocking_bug)
+ _add_remove_extra_string(blocked_bug, blocked_by_string, add=True)
+ blocks_string = _generate_blocks_string(blocked_bug)
+ _add_remove_extra_string(blocking_bug, blocks_string, add=True)
+
+def remove_block(blocked_bug, blocking_bug):
+ blocked_by_string = _generate_blocked_by_string(blocking_bug)
+ _add_remove_extra_string(blocked_bug, blocked_by_string, add=False)
+ blocks_string = _generate_blocks_string(blocked_bug)
+ _add_remove_extra_string(blocking_bug, blocks_string, add=False)
+
+def get_blocks(bugdir, bug):
+ """
+ Return a list of bugs that the given bug blocks.
+ """
+ blocks = []
+ for uuid in _get_blocks(bug):
+ blocks.append(bugdir.bug_from_uuid(uuid))
+ return blocks
+
+def get_blocked_by(bugdir, bug):
+ """
+ Return a list of bugs blocking the given bug.
+ """
+ blocked_by = []
+ for uuid in _get_blocked_by(bug):
+ blocked_by.append(bugdir.bug_from_uuid(uuid))
+ return blocked_by
+
+def check_dependencies(bugdir, repair_broken_links=False):
+ """
+ Check that links are bi-directional for all bugs in bugdir.
+
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir()
+ >>> a = bd.bug_from_uuid("a")
+ >>> b = bd.bug_from_uuid("b")
+ >>> blocked_by_string = _generate_blocked_by_string(b)
+ >>> _add_remove_extra_string(a, blocked_by_string, add=True)
+ >>> good,repaired,broken = check_dependencies(bd, repair_broken_links=False)
+ >>> good
+ []
+ >>> repaired
+ []
+ >>> broken
+ [(Bug(uuid='a'), Bug(uuid='b'))]
+ >>> _get_blocks(b)
+ []
+ >>> good,repaired,broken = check_dependencies(bd, repair_broken_links=True)
+ >>> _get_blocks(b)
+ ['a']
+ >>> good
+ []
+ >>> repaired
+ [(Bug(uuid='a'), Bug(uuid='b'))]
+ >>> broken
+ []
+ """
+ if bugdir.storage != None:
+ bugdir.load_all_bugs()
+ good_links = []
+ fixed_links = []
+ broken_links = []
+ for bug in bugdir:
+ for blocker in get_blocked_by(bugdir, bug):
+ blocks = get_blocks(bugdir, blocker)
+ if (bug, blocks) in good_links+fixed_links+broken_links:
+ continue # already checked that link
+ if bug not in blocks:
+ if repair_broken_links == True:
+ _repair_one_way_link(bug, blocker, blocks=True)
+ fixed_links.append((bug, blocker))
+ else:
+ broken_links.append((bug, blocker))
+ else:
+ good_links.append((bug, blocker))
+ for blockee in get_blocks(bugdir, bug):
+ blocked_by = get_blocked_by(bugdir, blockee)
+ if (blockee, bug) in good_links+fixed_links+broken_links:
+ continue # already checked that link
+ if bug not in blocked_by:
+ if repair_broken_links == True:
+ _repair_one_way_link(blockee, bug, blocks=False)
+ fixed_links.append((blockee, bug))
+ else:
+ broken_links.append((blockee, bug))
+ else:
+ good_links.append((blockee, bug))
+ return (good_links, fixed_links, broken_links)
+
+class DependencyTree (object):
+ """
+ Note: should probably be DependencyDiGraph.
+ """
+ def __init__(self, bugdir, root_bug, depth_limit=0,
+ allowed_status_values=None,
+ allowed_severity_values=None):
+ self.bugdir = bugdir
+ self.root_bug = root_bug
+ self.depth_limit = depth_limit
+ self.allowed_status_values = allowed_status_values
+ self.allowed_severity_values = allowed_severity_values
+
+ def _build_tree(self, child_fn):
+ root = tree.Tree()
+ root.bug = self.root_bug
+ root.depth = 0
+ stack = [root]
+ while len(stack) > 0:
+ node = stack.pop()
+ if self.depth_limit > 0 and node.depth == self.depth_limit:
+ continue
+ for bug in child_fn(self.bugdir, node.bug):
+ if self.allowed_status_values != None \
+ and not bug.status in self.allowed_status_values:
+ continue
+ if self.allowed_severity_values != None \
+ and not bug.severity in self.allowed_severity_values:
+ continue
+ child = tree.Tree()
+ child.bug = bug
+ child.depth = node.depth+1
+ node.append(child)
+ stack.append(child)
+ return root
+
+ def blocks_tree(self):
+ if not hasattr(self, "_blocks_tree"):
+ self._blocks_tree = self._build_tree(get_blocks)
+ return self._blocks_tree
+
+ def blocked_by_tree(self):
+ if not hasattr(self, "_blocked_by_tree"):
+ self._blocked_by_tree = self._build_tree(get_blocked_by)
+ return self._blocked_by_tree
diff --git a/libbe/command/diff.py b/libbe/command/diff.py
new file mode 100644
index 0000000..967ab14
--- /dev/null
+++ b/libbe/command/diff.py
@@ -0,0 +1,139 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.bugdir
+import libbe.bug
+import libbe.command
+import libbe.command.util
+import libbe.storage
+
+import libbe.diff
+
+class Diff (libbe.command.Command):
+ __doc__ = """Compare bug reports with older tree
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False, versioned=True)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Diff()
+
+ >>> original = bd.storage.commit('Original status')
+ >>> bug = bd.bug_from_uuid('a')
+ >>> bug.status = 'closed'
+ >>> changed = bd.storage.commit('Closed bug a')
+ >>> ret = ui.run(cmd, args=[original])
+ Modified bugs:
+ abc/a:cm: Bug A
+ Changed bug settings:
+ status: open -> closed
+ >>> ret = ui.run(cmd, {'subscribe':'%(bugdir_id)s:mod', 'uuids':True}, [original])
+ a
+ >>> bd.storage.versioned = False
+ >>> ret = ui.run(cmd, args=[original])
+ Traceback (most recent call last):
+ ...
+ UserError: This repository is not revision-controlled.
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """ % {'bugdir_id':libbe.diff.BUGDIR_ID}
+ name = 'diff'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='repo', short_name='r',
+ help='Compare with repository in REPO instead'
+ ' of the current repository.',
+ arg=libbe.command.Argument(
+ name='repo', metavar='REPO',
+ completion_callback=libbe.command.util.complete_path)),
+ libbe.command.Option(name='subscribe', short_name='s',
+ help='Only print changes matching SUBSCRIPTION, '
+ 'subscription is a comma-separated list of ID:TYPE '
+ 'tuples. See `be subscribe --help` for descriptions '
+ 'of ID and TYPE.',
+ arg=libbe.command.Argument(
+ name='subscribe', metavar='SUBSCRIPTION')),
+ libbe.command.Option(name='uuids', short_name='u',
+ help='Only print the changed bug UUIDS.'),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='revision', metavar='REVISION', default=None,
+ optional=True)
+ ])
+
+ def _run(self, **params):
+ try:
+ subscriptions = libbe.diff.subscriptions_from_string(
+ params['subscribe'])
+ except ValueError, e:
+ raise libbe.command.UserError(e.msg)
+ bugdir = self._get_bugdir()
+ if bugdir.storage.versioned == False:
+ raise libbe.command.UserError(
+ 'This repository is not revision-controlled.')
+ if params['repo'] == None:
+ if params['revision'] == None: # get the most recent revision
+ params['revision'] = bugdir.storage.revision_id(-1)
+ old_bd = libbe.bugdir.RevisionedBugDir(bugdir, params['revision'])
+ else:
+ old_storage = libbe.storage.get_storage(params['repo'])
+ old_storage.connect()
+ old_bd_current = libbe.bugdir.BugDir(old_storage, from_disk=True)
+ if params['revision'] == None: # use the current working state
+ old_bd = old_bd_current
+ else:
+ if old_bd_current.storage.versioned == False:
+ raise libbe.command.UserError(
+ '%s is not revision-controlled.'
+ % storage.repo)
+ old_bd = libbe.bugdir.RevisionedBugDir(old_bd_current,revision)
+ d = libbe.diff.Diff(old_bd, bugdir)
+ tree = d.report_tree(subscriptions)
+
+ if params['uuids'] == True:
+ uuids = []
+ bugs = tree.child_by_path('/bugs')
+ for bug_type in bugs:
+ uuids.extend([bug.name for bug in bug_type])
+ print >> self.stdout, '\n'.join(uuids)
+ else :
+ rep = tree.report_string()
+ if rep != None:
+ print >> self.stdout, rep
+ return 0
+
+ def _long_help(self):
+ return """
+Uses the storage backend to compare the current tree with a previous
+tree, and prints a pretty report. If REVISION is given, it is a
+specifier for the particular previous tree to use. Specifiers are
+specific to their storage backend.
+
+For Arch your specifier must be a fully-qualified revision name.
+
+Besides the standard summary output, you can use the options to output
+UUIDS for the different categories. This output can be used as the
+input to 'be show' to get an understanding of the current status.
+"""
diff --git a/libbe/command/due.py b/libbe/command/due.py
new file mode 100644
index 0000000..4463455
--- /dev/null
+++ b/libbe/command/due.py
@@ -0,0 +1,117 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.command
+import libbe.command.util
+import libbe.util.utility
+
+
+DUE_TAG = 'DUE:'
+
+
+class Due (libbe.command.Command):
+ """Set bug due dates
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Due(ui=ui)
+
+ >>> ret = ui.run(cmd, args=['/a'])
+ No due date assigned.
+ >>> ret = ui.run(cmd, args=['/a', 'Thu, 01 Jan 1970 00:00:00 +0000'])
+ >>> ret = ui.run(cmd, args=['/a'])
+ Thu, 01 Jan 1970 00:00:00 +0000
+ >>> ret = ui.run(cmd, args=['/a', 'none'])
+ >>> ret = ui.run(cmd, args=['/a'])
+ No due date assigned.
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'due'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID',
+ completion_callback=libbe.command.util.complete_bug_id),
+ libbe.command.Argument(
+ name='due', metavar='DUE', optional=True),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['bug-id'])
+ if params['due'] == None:
+ due_time = get_due(bug)
+ if due_time is None:
+ print >> self.stdout, 'No due date assigned.'
+ else:
+ print >> self.stdout, libbe.util.utility.time_to_str(due_time)
+ else:
+ if params['due'] == 'none':
+ remove_due(bug)
+ else:
+ due_time = libbe.util.utility.str_to_time(params['due'])
+ set_due(bug, due_time)
+
+ def _long_help(self):
+ return """
+If no DATE is specified, the bug's current due date is printed. If
+DATE is specified, it will be assigned to the bug.
+"""
+
+# internal helper functions
+
+def _generate_due_string(time):
+ return "%s%s" % (DUE_TAG, libbe.util.utility.time_to_str(time))
+
+def _parse_due_string(string):
+ assert string.startswith(DUE_TAG)
+ return libbe.util.utility.str_to_time(string[len(DUE_TAG):])
+
+# functions exposed to other modules
+
+def get_due(bug):
+ matched = []
+ for line in bug.extra_strings:
+ if line.startswith(DUE_TAG):
+ matched.append(_parse_due_string(line))
+ if len(matched) == 0:
+ return None
+ if len(matched) > 1:
+ raise Exception('Several due dates for %s?:\n %s'
+ % (bug.uuid, '\n '.join(matched)))
+ return matched[0]
+
+def remove_due(bug):
+ estrs = bug.extra_strings
+ for due_str in [s for s in estrs if s.startswith(DUE_TAG)]:
+ estrs.remove(due_str)
+ bug.extra_strings = estrs # reassign to notice change
+
+def set_due(bug, time):
+ remove_due(bug)
+ estrs = bug.extra_strings
+ estrs.append(_generate_due_string(time))
+ bug.extra_strings = estrs # reassign to notice change
diff --git a/libbe/command/help.py b/libbe/command/help.py
new file mode 100644
index 0000000..1fc88f0
--- /dev/null
+++ b/libbe/command/help.py
@@ -0,0 +1,82 @@
+# Copyright (C) 2006-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# Thomas Gerigk <tgerigk@gmx.de>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.command
+import libbe.command.util
+
+TOPICS = {}
+
+class Help (libbe.command.Command):
+ """Print help for given command or topic
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> cmd = Help()
+
+ >>> ret = ui.run(cmd, args=['help'])
+ usage: be help [options] [TOPIC]
+ <BLANKLINE>
+ Options:
+ -h, --help Print a help message.
+ <BLANKLINE>
+ --complete Print a list of possible completions.
+ <BLANKLINE>
+ <BLANKLINE>
+ Print help for specified command/topic or list of all commands.
+ """
+ name = 'help'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='topic', metavar='TOPIC', default=None,
+ optional=True,
+ completion_callback=self.complete_topic)
+ ])
+
+ def _run(self, **params):
+ if params['topic'] == None:
+ if hasattr(self.ui, 'help'):
+ print >> self.stdout, self.ui.help().rstrip('\n')
+ elif params['topic'] in libbe.command.commands():
+ module = libbe.command.get_command(params['topic'])
+ Class = libbe.command.get_command_class(module,params['topic'])
+ c = Class(ui=self.ui)
+ print >> self.stdout, c.help().rstrip('\n')
+ elif params['topic'] in TOPICS:
+ print >> self.stdout, TOPICS[params['topic']].rstrip('\n')
+ else:
+ raise libbe.command.UserError(
+ '"%s" is neither a command nor topic' % params['topic'])
+ return 0
+
+ def _long_help(self):
+ return """
+Print help for specified command/topic or list of all commands.
+"""
+
+ def complete_topic(self, command, argument, fragment=None):
+ commands = libbe.command.util.complete_command()
+ topics = sorted(TOPICS.keys())
+ return commands + topics
diff --git a/libbe/command/html.py b/libbe/command/html.py
new file mode 100644
index 0000000..fbbdf97
--- /dev/null
+++ b/libbe/command/html.py
@@ -0,0 +1,686 @@
+# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import codecs
+import htmlentitydefs
+import os
+import os.path
+import re
+import string
+import time
+import xml.sax.saxutils
+
+import libbe
+import libbe.command
+import libbe.command.util
+import libbe.comment
+import libbe.util.encoding
+import libbe.util.id
+
+
+class HTML (libbe.command.Command):
+ """Generate a static HTML dump of the current repository status
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = HTML(ui=ui)
+
+ >>> ret = ui.run(cmd, {'output':os.path.join(bd.storage.repo, 'html_export')})
+ >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export'))
+ True
+ >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'index.html'))
+ True
+ >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'index_inactive.html'))
+ True
+ >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs'))
+ True
+ >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs', 'a.html'))
+ True
+ >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs', 'b.html'))
+ True
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'html'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='output', short_name='o',
+ help='Set the output path (%default)',
+ arg=libbe.command.Argument(
+ name='output', metavar='DIR', default='./html_export',
+ completion_callback=libbe.command.util.complete_path)),
+ libbe.command.Option(name='template-dir', short_name='t',
+ help='Use a different template. Defaults to internal templates',
+ arg=libbe.command.Argument(
+ name='template-dir', metavar='DIR',
+ completion_callback=libbe.command.util.complete_path)),
+ libbe.command.Option(name='title',
+ help='Set the bug repository title (%default)',
+ arg=libbe.command.Argument(
+ name='title', metavar='STRING',
+ default='BugsEverywhere Issue Tracker')),
+ libbe.command.Option(name='index-header',
+ help='Set the index page headers (%default)',
+ arg=libbe.command.Argument(
+ name='index-header', metavar='STRING',
+ default='BugsEverywhere Bug List')),
+ libbe.command.Option(name='export-template', short_name='e',
+ help='Export the default template and exit.'),
+ libbe.command.Option(name='export-template-dir', short_name='d',
+ help='Set the directory for the template export (%default)',
+ arg=libbe.command.Argument(
+ name='export-template-dir', metavar='DIR',
+ default='./default-templates/',
+ completion_callback=libbe.command.util.complete_path)),
+ libbe.command.Option(name='verbose', short_name='v',
+ help='Verbose output, default is %default'),
+ ])
+
+ def _run(self, **params):
+ if params['export-template'] == True:
+ html_gen.write_default_template(params['export-template-dir'])
+ return 0
+ bugdir = self._get_bugdir()
+ bugdir.load_all_bugs()
+ html_gen = HTMLGen(bugdir,
+ template=params['template-dir'],
+ title=params['title'],
+ index_header=params['index-header'],
+ verbose=params['verbose'],
+ stdout=self.stdout)
+ html_gen.run(params['output'])
+ return 0
+
+ def _long_help(self):
+ return """
+Generate a set of html pages representing the current state of the bug
+directory.
+"""
+
+Html = HTML # alias for libbe.command.base.get_command_class()
+
+class HTMLGen (object):
+ def __init__(self, bd, template=None,
+ title="Site Title", index_header="Index Header",
+ verbose=False, encoding=None, stdout=None,
+ ):
+ self.generation_time = time.ctime()
+ self.bd = bd
+ if template == None:
+ self.template = "default"
+ else:
+ self.template = os.path.abspath(os.path.expanduser(template))
+ self.title = title
+ self.index_header = index_header
+ self.verbose = verbose
+ self.stdout = stdout
+ if encoding != None:
+ self.encoding = encoding
+ else:
+ self.encoding = libbe.util.encoding.get_filesystem_encoding()
+ self._load_default_templates()
+ if template != None:
+ self._load_user_templates()
+
+ def run(self, out_dir):
+ if self.verbose == True:
+ print >> self.stdout, \
+ 'Creating the html output in %s using templates in %s' \
+ % (out_dir, self.template)
+
+ bugs_active = []
+ bugs_inactive = []
+ bugs = [b for b in self.bd]
+ bugs.sort()
+ bugs_active = [b for b in bugs if b.active == True]
+ bugs_inactive = [b for b in bugs if b.active != True]
+
+ self._create_output_directories(out_dir)
+ self._write_css_file()
+ for b in bugs:
+ if b.active:
+ up_link = '../index.html'
+ else:
+ up_link = '../index_inactive.html'
+ self._write_bug_file(b, up_link)
+ self._write_index_file(
+ bugs_active, title=self.title,
+ index_header=self.index_header, bug_type='active')
+ self._write_index_file(
+ bugs_inactive, title=self.title,
+ index_header=self.index_header, bug_type='inactive')
+
+ def _create_output_directories(self, out_dir):
+ if self.verbose:
+ print >> self.stdout, 'Creating output directories'
+ self.out_dir = self._make_dir(out_dir)
+ self.out_dir_bugs = self._make_dir(
+ os.path.join(self.out_dir, 'bugs'))
+
+ def _write_css_file(self):
+ if self.verbose:
+ print >> self.stdout, 'Writing css file'
+ assert hasattr(self, 'out_dir'), \
+ 'Must run after ._create_output_directories()'
+ self._write_file(self.css_file,
+ [self.out_dir,'style.css'])
+
+ def _write_bug_file(self, bug, up_link):
+ if self.verbose:
+ print >> self.stdout, '\tCreating bug file for %s' % bug.id.user()
+ assert hasattr(self, 'out_dir_bugs'), \
+ 'Must run after ._create_output_directories()'
+
+ bug.load_comments(load_full=True)
+ comment_entries = self._generate_bug_comment_entries(bug)
+ filename = '%s.html' % bug.uuid
+ fullpath = os.path.join(self.out_dir_bugs, filename)
+ template_info = {'title':self.title,
+ 'charset':self.encoding,
+ 'up_link':up_link,
+ 'shortname':bug.id.user(),
+ 'comment_entries':comment_entries,
+ 'generation_time':self.generation_time}
+ for attr in ['uuid', 'severity', 'status', 'assigned',
+ 'reporter', 'creator', 'time_string', 'summary']:
+ template_info[attr] = self._escape(getattr(bug, attr))
+ self._write_file(self.bug_file % template_info, [fullpath])
+
+ def _generate_bug_comment_entries(self, bug):
+ assert hasattr(self, 'out_dir_bugs'), \
+ 'Must run after ._create_output_directories()'
+
+ stack = []
+ comment_entries = []
+ bug.comment_root.sort(cmp=libbe.comment.cmp_time, reverse=True)
+ for depth,comment in bug.comment_root.thread(flatten=False):
+ while len(stack) > depth:
+ # pop non-parents off the stack
+ stack.pop(-1)
+ # close non-parent <div class="comment...
+ comment_entries.append('</div>\n')
+ assert len(stack) == depth
+ stack.append(comment)
+ if depth == 0:
+ comment_entries.append('<div class="comment root">')
+ else:
+ comment_entries.append(
+ '<div class="comment" id="%s">' % comment.uuid)
+ template_info = {'shortname': comment.id.user()}
+ for attr in ['uuid', 'author', 'date', 'body']:
+ value = getattr(comment, attr)
+ if attr == 'body':
+ link_long_ids = False
+ save_body = False
+ if comment.content_type == 'text/html':
+ link_long_ids = True
+ elif comment.content_type.startswith('text/'):
+ value = '<pre>\n'+self._escape(value)+'\n</pre>'
+ link_long_ids = True
+ elif comment.content_type.startswith('image/'):
+ save_body = True
+ value = '<img src="./%s/%s" />' \
+ % (bug.uuid, comment.uuid)
+ else:
+ save_body = True
+ value = '<a href="./%s/%s">Link to %s file</a>.' \
+ % (bug.uuid, comment.uuid, comment.content_type)
+ if link_long_ids == True:
+ value = self._long_to_linked_user(value)
+ if save_body == True:
+ per_bug_dir = os.path.join(self.out_dir_bugs, bug.uuid)
+ if not os.path.exists(per_bug_dir):
+ os.mkdir(per_bug_dir)
+ comment_path = os.path.join(per_bug_dir, comment.uuid)
+ self._write_file(
+ '<Files %s>\n ForceType %s\n</Files>' \
+ % (comment.uuid, comment.content_type),
+ [per_bug_dir, '.htaccess'], mode='a')
+ self._write_file(comment.body,
+ [per_bug_dir, comment.uuid], mode='wb')
+ else:
+ value = self._escape(value)
+ template_info[attr] = value
+ comment_entries.append(self.bug_comment_entry % template_info)
+ while len(stack) > 0:
+ stack.pop(-1)
+ comment_entries.append('</div>\n') # close every remaining <div class='comment...
+ return '\n'.join(comment_entries)
+
+ def _long_to_linked_user(self, text):
+ """
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> h = HTMLGen(bd)
+ >>> h._long_to_linked_user('A link #abc123/a#, and a non-link #x#y#.')
+ 'A link <a href="./a.html">abc/a</a>, and a non-link #x#y#.'
+ >>> bd.cleanup()
+ """
+ replacer = libbe.util.id.IDreplacer(
+ [self.bd], self._long_to_linked_user_replacer, wrap=False)
+ return re.sub(
+ libbe.util.id.REGEXP, replacer, text)
+
+ def _long_to_linked_user_replacer(self, bugdirs, long_id):
+ """
+ >>> import libbe.bugdir
+ >>> import libbe.util.id
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> a = bd.bug_from_uuid('a')
+ >>> uuid_gen = libbe.util.id.uuid_gen
+ >>> libbe.util.id.uuid_gen = lambda : '0123'
+ >>> c = a.new_comment('comment for link testing')
+ >>> libbe.util.id.uuid_gen = uuid_gen
+ >>> c.uuid
+ '0123'
+ >>> h = HTMLGen(bd)
+ >>> h._long_to_linked_user_replacer([bd], 'abc123')
+ '#abc123#'
+ >>> h._long_to_linked_user_replacer([bd], 'abc123/a')
+ '<a href="./a.html">abc/a</a>'
+ >>> h._long_to_linked_user_replacer([bd], 'abc123/a/0123')
+ '<a href="./a.html#0123">abc/a/012</a>'
+ >>> h._long_to_linked_user_replacer([bd], 'x')
+ '#x#'
+ >>> h._long_to_linked_user_replacer([bd], '')
+ '##'
+ >>> bd.cleanup()
+ """
+ try:
+ p = libbe.util.id.parse_user(bugdirs[0], long_id)
+ short_id = libbe.util.id.long_to_short_user(bugdirs, long_id)
+ except (libbe.util.id.MultipleIDMatches,
+ libbe.util.id.NoIDMatches,
+ libbe.util.id.InvalidIDStructure), e:
+ return '#%s#' % long_id # re-wrap failures
+ if p['type'] == 'bugdir':
+ return '#%s#' % long_id
+ elif p['type'] == 'bug':
+ return '<a href="./%s.html">%s</a>' \
+ % (p['bug'], short_id)
+ elif p['type'] == 'comment':
+ return '<a href="./%s.html#%s">%s</a>' \
+ % (p['bug'], p['comment'], short_id)
+ raise Exception('Invalid id type %s for "%s"'
+ % (p['type'], long_id))
+
+ def _write_index_file(self, bugs, title, index_header, bug_type='active'):
+ if self.verbose:
+ print >> self.stdout, 'Writing %s index file for %d bugs' % (bug_type, len(bugs))
+ assert hasattr(self, 'out_dir'), 'Must run after ._create_output_directories()'
+ esc = self._escape
+
+ bug_entries = self._generate_index_bug_entries(bugs)
+
+ if bug_type == 'active':
+ filename = 'index.html'
+ elif bug_type == 'inactive':
+ filename = 'index_inactive.html'
+ else:
+ raise Exception, 'Unrecognized bug_type: "%s"' % bug_type
+ template_info = {'title':title,
+ 'index_header':index_header,
+ 'charset':self.encoding,
+ 'active_class':'tab sel',
+ 'inactive_class':'tab nsel',
+ 'bug_entries':bug_entries,
+ 'generation_time':self.generation_time}
+ if bug_type == 'inactive':
+ template_info['active_class'] = 'tab nsel'
+ template_info['inactive_class'] = 'tab sel'
+
+ self._write_file(self.index_file % template_info,
+ [self.out_dir, filename])
+
+ def _generate_index_bug_entries(self, bugs):
+ bug_entries = []
+ for bug in bugs:
+ if self.verbose:
+ print >> self.stdout, '\tCreating bug entry for %s' % bug.id.user()
+ template_info = {'shortname':bug.id.user()}
+ for attr in ['uuid', 'severity', 'status', 'assigned',
+ 'reporter', 'creator', 'time_string', 'summary']:
+ template_info[attr] = self._escape(getattr(bug, attr))
+ bug_entries.append(self.index_bug_entry % template_info)
+ return '\n'.join(bug_entries)
+
+ def _escape(self, string):
+ if string == None:
+ return ''
+ return xml.sax.saxutils.escape(char)
+
+ def _load_user_templates(self):
+ for filename,attr in [('style.css','css_file'),
+ ('index_file.tpl','index_file'),
+ ('index_bug_entry.tpl','index_bug_entry'),
+ ('bug_file.tpl','bug_file'),
+ ('bug_comment_entry.tpl','bug_comment_entry')]:
+ fullpath = os.path.join(self.template, filename)
+ if os.path.exists(fullpath):
+ setattr(self, attr, self._read_file([fullpath]))
+
+ def _make_dir(self, dir_path):
+ dir_path = os.path.abspath(os.path.expanduser(dir_path))
+ if not os.path.exists(dir_path):
+ try:
+ os.makedirs(dir_path)
+ except:
+ raise libbe.command.UserError(
+ 'Cannot create output directory "%s".' % dir_path)
+ return dir_path
+
+ def _write_file(self, content, path_array, mode='w'):
+ return libbe.util.encoding.set_file_contents(
+ os.path.join(*path_array), content, mode, self.encoding)
+
+ def _read_file(self, path_array, mode='r'):
+ return libbe.util.encoding.get_file_contents(
+ os.path.join(*path_array), mode, self.encoding, decode=True)
+
+ def write_default_template(self, out_dir):
+ if self.verbose:
+ print >> self.stdout, 'Creating output directories'
+ self.out_dir = self._make_dir(out_dir)
+ if self.verbose:
+ print >> self.stdout, 'Creating css file'
+ self._write_css_file()
+ if self.verbose:
+ print >> self.stdout, 'Creating index_file.tpl file'
+ self._write_file(self.index_file,
+ [self.out_dir, 'index_file.tpl'])
+ if self.verbose:
+ print >> self.stdout, 'Creating index_bug_entry.tpl file'
+ self._write_file(self.index_bug_entry,
+ [self.out_dir, 'index_bug_entry.tpl'])
+ if self.verbose:
+ print >> self.stdout, 'Creating bug_file.tpl file'
+ self._write_file(self.bug_file,
+ [self.out_dir, 'bug_file.tpl'])
+ if self.verbose:
+ print >> self.stdout, 'Creating bug_comment_entry.tpl file'
+ self._write_file(self.bug_comment_entry,
+ [self.out_dir, 'bug_comment_entry.tpl'])
+
+ def _load_default_templates(self):
+ self.css_file = """
+ body {
+ font-family: "lucida grande", "sans serif";
+ color: #333;
+ width: auto;
+ margin: auto;
+ }
+
+ div.main {
+ padding: 20px;
+ margin: auto;
+ padding-top: 0;
+ margin-top: 1em;
+ background-color: #fcfcfc;
+ }
+
+ div.footer {
+ font-size: small;
+ padding-left: 20px;
+ padding-right: 20px;
+ padding-top: 5px;
+ padding-bottom: 5px;
+ margin: auto;
+ background: #305275;
+ color: #fffee7;
+ }
+
+ table {
+ border-style: solid;
+ border: 10px #313131;
+ border-spacing: 0;
+ width: auto;
+ }
+
+ tb { border: 1px; }
+
+ tr {
+ vertical-align: top;
+ width: auto;
+ }
+
+ td {
+ border-width: 0;
+ border-style: none;
+ padding-right: 0.5em;
+ padding-left: 0.5em;
+ width: auto;
+ }
+
+ img { border-style: none; }
+
+ h1 {
+ padding: 0.5em;
+ background-color: #305275;
+ margin-top: 0;
+ margin-bottom: 0;
+ color: #fff;
+ margin-left: -20px;
+ margin-right: -20px;
+ }
+
+ ul {
+ list-style-type: none;
+ padding: 0;
+ }
+
+ p { width: auto; }
+
+ a, a:visited {
+ background: inherit;
+ text-decoration: none;
+ }
+
+ a { color: #003d41; }
+ a:visited { color: #553d41; }
+ .footer a { color: #508d91; }
+
+ /* bug index pages */
+
+ td.tab {
+ padding-right: 1em;
+ padding-left: 1em;
+ }
+
+ td.sel.tab {
+ background-color: #afafaf;
+ border: 1px solid #afafaf;
+ font-weight:bold;
+ }
+
+ td.nsel.tab { border: 0px; }
+
+ table.bug_list {
+ background-color: #afafaf;
+ border: 2px solid #afafaf;
+ }
+
+ .bug_list tr { width: auto; }
+ tr.wishlist { background-color: #B4FF9B; }
+ tr.minor { background-color: #FCFF98; }
+ tr.serious { background-color: #FFB648; }
+ tr.critical { background-color: #FF752A; }
+ tr.fatal { background-color: #FF3300; }
+
+ /* bug detail pages */
+
+ td.bug_detail_label { text-align: right; }
+ td.bug_detail { }
+ td.bug_comment_label { text-align: right; vertical-align: top; }
+ td.bug_comment { }
+
+ div.comment {
+ padding: 20px;
+ padding-top: 20px;
+ margin: auto;
+ margin-top: 0;
+ }
+
+ div.root.comment {
+ padding: 0px;
+ /* padding-top: 0px; */
+ padding-bottom: 20px;
+ }
+ """
+
+ self.index_file = """
+ <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
+ "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+ <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
+ <head>
+ <title>%(title)s</title>
+ <meta http-equiv="Content-Type" content="text/html; charset=%(charset)s" />
+ <link rel="stylesheet" href="style.css" type="text/css" />
+ </head>
+ <body>
+
+ <div class="main">
+ <h1>%(index_header)s</h1>
+ <p></p>
+ <table>
+
+ <tr>
+ <td class="%(active_class)s"><a href="index.html">Active Bugs</a></td>
+ <td class="%(inactive_class)s"><a href="index_inactive.html">Inactive Bugs</a></td>
+ </tr>
+
+ </table>
+ <table class="bug_list">
+ <tbody>
+
+ %(bug_entries)s
+
+ </tbody>
+ </table>
+ </div>
+
+ <div class="footer">
+ <p>Generated by <a href="http://www.bugseverywhere.org/">
+ BugsEverywhere</a> on %(generation_time)s</p>
+ <p>
+ <a href="http://validator.w3.org/check?uri=referer">Validate XHTML</a>&nbsp;|&nbsp;
+ <a href="http://jigsaw.w3.org/css-validator/check?uri=referer">Validate CSS</a>
+ </p>
+ </div>
+
+ </body>
+ </html>
+ """
+
+ self.index_bug_entry ="""
+ <tr class="%(severity)s">
+ <td><a href="bugs/%(uuid)s.html">%(shortname)s</a></td>
+ <td><a href="bugs/%(uuid)s.html">%(status)s</a></td>
+ <td><a href="bugs/%(uuid)s.html">%(severity)s</a></td>
+ <td><a href="bugs/%(uuid)s.html">%(summary)s</a></td>
+ <td><a href="bugs/%(uuid)s.html">%(time_string)s</a></td>
+ </tr>
+ """
+
+ self.bug_file = """
+ <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
+ "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+ <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
+ <head>
+ <title>%(title)s</title>
+ <meta http-equiv="Content-Type" content="text/html; charset=%(charset)s" />
+ <link rel="stylesheet" href="../style.css" type="text/css" />
+ </head>
+ <body>
+
+ <div class="main">
+ <h1>BugsEverywhere Bug List</h1>
+ <h5><a href="%(up_link)s">Back to Index</a></h5>
+ <h2>Bug: %(shortname)s</h2>
+ <table>
+ <tbody>
+
+ <tr><td class="bug_detail_label">ID :</td>
+ <td class="bug_detail">%(uuid)s</td></tr>
+ <tr><td class="bug_detail_label">Short name :</td>
+ <td class="bug_detail">%(shortname)s</td></tr>
+ <tr><td class="bug_detail_label">Status :</td>
+ <td class="bug_detail">%(status)s</td></tr>
+ <tr><td class="bug_detail_label">Severity :</td>
+ <td class="bug_detail">%(severity)s</td></tr>
+ <tr><td class="bug_detail_label">Assigned :</td>
+ <td class="bug_detail">%(assigned)s</td></tr>
+ <tr><td class="bug_detail_label">Reporter :</td>
+ <td class="bug_detail">%(reporter)s</td></tr>
+ <tr><td class="bug_detail_label">Creator :</td>
+ <td class="bug_detail">%(creator)s</td></tr>
+ <tr><td class="bug_detail_label">Created :</td>
+ <td class="bug_detail">%(time_string)s</td></tr>
+ <tr><td class="bug_detail_label">Summary :</td>
+ <td class="bug_detail">%(summary)s</td></tr>
+ </tbody>
+ </table>
+
+ <hr/>
+
+ %(comment_entries)s
+
+ </div>
+ <h5><a href="%(up_link)s">Back to Index</a></h5>
+
+ <div class="footer">
+ <p>Generated by <a href="http://www.bugseverywhere.org/">
+ BugsEverywhere</a> on %(generation_time)s</p>
+ <p>
+ <a href="http://validator.w3.org/check?uri=referer">Validate XHTML</a>&nbsp;|&nbsp;
+ <a href="http://jigsaw.w3.org/css-validator/check?uri=referer">Validate CSS</a>
+ </p>
+ </div>
+
+ </body>
+ </html>
+ """
+
+ self.bug_comment_entry ="""
+ <table>
+ <tr>
+ <td class="bug_comment_label">Comment:</td>
+ <td class="bug_comment">
+ --------- Comment ---------<br/>
+ ID: %(uuid)s<br/>
+ Short name: %(shortname)s<br/>
+ From: %(author)s<br/>
+ Date: %(date)s<br/>
+ <br/>
+ %(body)s
+ </td>
+ </tr>
+ </table>
+ """
+
+ # strip leading whitespace
+ for attr in ['css_file', 'index_file', 'index_bug_entry', 'bug_file',
+ 'bug_comment_entry']:
+ value = getattr(self, attr)
+ value = value.replace('\n'+' '*12, '\n')
+ setattr(self, attr, value.strip()+'\n')
diff --git a/libbe/command/import_xml.py b/libbe/command/import_xml.py
new file mode 100644
index 0000000..287d8b7
--- /dev/null
+++ b/libbe/command/import_xml.py
@@ -0,0 +1,541 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import copy
+import os
+import sys
+try: # import core module, Python >= 2.5
+ from xml.etree import ElementTree
+except ImportError: # look for non-core module
+ from elementtree import ElementTree
+
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.command.util
+import libbe.comment
+import libbe.util.encoding
+import libbe.util.utility
+
+if libbe.TESTING == True:
+ import doctest
+ import StringIO
+ import unittest
+
+ import libbe.bugdir
+
+class Import_XML (libbe.command.Command):
+ """Import comments and bugs from XML
+
+ >>> import time
+ >>> import StringIO
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Import_XML(ui=ui)
+
+ >>> ui.io.set_stdin('<be-xml><comment><uuid>c</uuid><body>This is a comment about a</body></comment></be-xml>')
+ >>> ret = ui.run(cmd, {'comment-root':'/a'}, ['-'])
+ >>> bd.flush_reload()
+ >>> bug = bd.bug_from_uuid('a')
+ >>> bug.load_comments(load_full=False)
+ >>> comment = bug.comment_root[0]
+ >>> print comment.body
+ This is a comment about a
+ <BLANKLINE>
+ >>> comment.time <= int(time.time())
+ True
+ >>> comment.in_reply_to is None
+ True
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'import-xml'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='ignore-missing-references', short_name='i',
+ help="If any comment's <in-reply-to> refers to a non-existent comment, ignore it (instead of raising an exception)."),
+ libbe.command.Option(name='add-only', short_name='a',
+ help='If any bug or comment listed in the XML file already exists in the bug repository, do not alter the repository version.'),
+ libbe.command.Option(name='comment-root', short_name='c',
+ help='Supply a bug or comment ID as the root of any <comment> elements that are direct children of the <be-xml> element. If any such <comment> elements exist, you are required to set this option.',
+ arg=libbe.command.Argument(
+ name='comment-root', metavar='ID',
+ completion_callback=libbe.command.util.complete_bug_comment_id)),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='xml-file', metavar='XML-FILE'),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ writeable = bugdir.storage.writeable
+ bugdir.storage.writeable = False
+ if params['comment-root'] != None:
+ croot_bug,croot_comment = \
+ libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['comment-root'])
+ croot_bug.load_comments(load_full=True)
+ if croot_comment.uuid == libbe.comment.INVALID_UUID:
+ croot_comment = croot_bug.comment_root
+ else:
+ croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid)
+ new_croot_bug = libbe.bug.Bug(bugdir=bugdir, uuid=croot_bug.uuid)
+ new_croot_bug.explicit_attrs = []
+ new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root)
+ if croot_comment.uuid == libbe.comment.INVALID_UUID:
+ new_croot_comment = new_croot_bug.comment_root
+ else:
+ new_croot_comment = \
+ new_croot_bug.comment_from_uuid(croot_comment.uuid)
+ for new in new_croot_bug.comments():
+ new.explicit_attrs = []
+ else:
+ croot_bug,croot_comment = (None, None)
+
+ if params['xml-file'] == '-':
+ xml = self.stdin.read().encode(self.stdin.encoding)
+ else:
+ self._check_restricted_access(bugdir.storage, params['xml-file'])
+ xml = libbe.util.encoding.get_file_contents(
+ params['xml-file'])
+
+ # parse the xml
+ root_bugs = []
+ root_comments = []
+ version = {}
+ be_xml = ElementTree.XML(xml)
+ if be_xml.tag != 'be-xml':
+ raise libbe.util.utility.InvalidXML(
+ 'import-xml', be_xml, 'root element must be <be-xml>')
+ for child in be_xml.getchildren():
+ if child.tag == 'bug':
+ new = libbe.bug.Bug(bugdir=bugdir)
+ new.from_xml(child)
+ root_bugs.append(new)
+ elif child.tag == 'comment':
+ new = libbe.comment.Comment(croot_bug)
+ new.from_xml(child)
+ root_comments.append(new)
+ elif child.tag == 'version':
+ for gchild in child.getchildren():
+ if child.tag in ['tag', 'nick', 'revision', 'revision-id']:
+ text = xml.sax.saxutils.unescape(child.text)
+ text = text.decode('unicode_escape').strip()
+ version[child.tag] = text
+ else:
+ print >> sys.stderr, 'ignoring unknown tag %s in %s' \
+ % (gchild.tag, child.tag)
+ else:
+ print >> sys.stderr, 'ignoring unknown tag %s in %s' \
+ % (child.tag, comment_list.tag)
+
+ # merge the new root_comments
+ if params['add-only'] == True:
+ accept_changes = False
+ accept_extra_strings = False
+ else:
+ accept_changes = True
+ accept_extra_strings = True
+ accept_comments = True
+ if len(root_comments) > 0:
+ if croot_bug == None:
+ raise UserError(
+ '--comment-root option is required for your root comments:\n%s'
+ % '\n\n'.join([c.string() for c in root_comments]))
+ try:
+ # link new comments
+ new_croot_bug.add_comments(root_comments,
+ default_parent=new_croot_comment,
+ ignore_missing_references= \
+ params['ignore-missing-references'])
+ except libbe.comment.MissingReference, e:
+ raise libbe.command.UserError(e)
+ croot_bug.merge(new_croot_bug, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_comments=accept_comments)
+
+ # merge the new croot_bugs
+ merged_bugs = []
+ old_bugs = []
+ for new in root_bugs:
+ try:
+ old = bugdir.bug_from_uuid(new.alt_id)
+ except KeyError:
+ old = None
+ if old == None:
+ bd.append(new)
+ else:
+ old.load_comments(load_full=True)
+ old.merge(new, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_comments=accept_comments)
+ merged_bugs.append(new)
+ old_bugs.append(old)
+
+ # protect against programmer error causing data loss:
+ if croot_bug != None:
+ comms = []
+ for c in croot_comment.traverse():
+ comms.append(c.uuid)
+ if c.alt_id != None:
+ comms.append(c.alt_id)
+ if croot_comment.uuid == libbe.comment.INVALID_UUID:
+ root_text = croot_bug.id.user()
+ else:
+ root_text = croot_comment.id.user()
+ for new in root_comments:
+ assert new.uuid in comms or new.alt_id in comms, \
+ "comment %s (alt: %s) wasn't added to %s" \
+ % (new.uuid, new.alt_id, root_text)
+ for new in root_bugs:
+ if not new in merged_bugs:
+ assert bugdir.has_bug(new.uuid), \
+ "bug %s wasn't added" % (new.uuid)
+
+ # save new information
+ bugdir.storage.writeable = writeable
+ if croot_bug != None:
+ croot_bug.save()
+ for new in root_bugs:
+ if not new in merged_bugs:
+ new.save()
+ for old in old_bugs:
+ old.save()
+
+ def _long_help(self):
+ return """
+Import comments and bugs from XMLFILE. If XMLFILE is '-', the file is
+read from stdin.
+
+This command provides a fallback mechanism for passing bugs between
+repositories, in case the repositories VCSs are incompatible. If the
+VCSs are compatible, it's better to use their builtin merge/push/pull
+to share this information, as that will preserve a more detailed
+history.
+
+The XML file should be formatted similarly to
+ <be-xml>
+ <version>
+ <tag>1.0.0</tag>
+ <branch-nick>be</branch-nick>
+ <revno>446</revno>
+ <revision-id>a@b.com-20091119214553-iqyw2cpqluww3zna</revision-id>
+ <version>
+ <bug>
+ ...
+ <comment>...</comment>
+ <comment>...</comment>
+ </bug>
+ <bug>...</bug>
+ <comment>...</comment>
+ <comment>...</comment>
+ </be-xml>
+where the ellipses mark output commpatible with Bug.xml() and
+Comment.xml(). Take a look at the output of `be show --xml` for some
+explicit examples. Unrecognized tags are ignored. Missing tags are
+left at the default value. The version tag is not required, but is
+strongly recommended.
+
+The bug and comment UUIDs are always auto-generated, so if you set a
+<uuid> field, but no <alt-id> field, your <uuid> will be used as the
+comment's <alt-id>. An exception is raised if <alt-id> conflicts with
+an existing comment. Bugs do not have a permantent alt-id, so they
+the <uuid>s you specify are not saved. The <uuid>s _are_ used to
+match agains prexisting bug and comment uuids, and comment alt-ids,
+and fields explicitly given in the XML file will replace old versions
+unless the --add-only flag.
+
+*.extra_strings recieves special treatment, and if --add-only is not
+set, the resulting list concatenates both source lists and removes
+repeats.
+
+Here's an example of import activity:
+ Repository
+ bug (uuid=B, creator=John, status=open)
+ estr (don't forget your towel)
+ estr (helps with space travel)
+ com (uuid=C1, author=Jane, body=Hello)
+ com (uuid=C2, author=Jess, body=World)
+ XML
+ bug (uuid=B, status=fixed)
+ estr (don't forget your towel)
+ estr (watch out for flying dolphins)
+ com (uuid=C1, body=So long)
+ com (uuid=C3, author=Jed, body=And thanks)
+ Result
+ bug (uuid=B, creator=John, status=fixed)
+ estr (don't forget your towel)
+ estr (helps with space travel)
+ estr (watch out for flying dolphins)
+ com (uuid=C1, author=Jane, body=So long)
+ com (uuid=C2, author=Jess, body=World)
+ com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
+ Result, with --add-only
+ bug (uuid=B, creator=John, status=open)
+ estr (don't forget your towel)
+ estr (helps with space travel)
+ com (uuid=C1, author=Jane, body=Hello)
+ com (uuid=C2, author=Jess, body=World)
+ com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
+
+Examples:
+
+Import comments (e.g. emails from an mbox) and append to bug XYZ
+ $ be-mbox-to-xml mail.mbox | be import-xml --c XYZ -
+Or you can append those emails underneath the prexisting comment XYZ-3
+ $ be-mbox-to-xml mail.mbox | be import-xml --c XYZ-3 -
+
+User creates a new bug
+ user$ be new "The demuxulizer is broken"
+ Created bug with ID 48f
+ user$ be comment 48f
+ <Describe bug>
+ ...
+User exports bug as xml and emails it to the developers
+ user$ be show --xml 48f > 48f.xml
+ user$ cat 48f.xml | mail -s "Demuxulizer bug xml" devs@b.com
+or equivalently (with a slightly fancier be-handle-mail compatible
+email):
+ user$ be email-bugs 48f
+Devs recieve email, and save it's contents as demux-bug.xml
+ dev$ cat demux-bug.xml | be import-xml -
+"""
+
+
+Import_xml = Import_XML # alias for libbe.command.base.get_command_class()
+
+if libbe.TESTING == True:
+ class LonghelpTestCase (unittest.TestCase):
+ """
+ Test import scenarios given in longhelp.
+ """
+ def setUp(self):
+ self.bugdir = libbe.bugdir.SimpleBugDir(memory=False)
+ io = libbe.command.StringInputOutput()
+ self.ui = libbe.command.UserInterface(io=io)
+ self.ui.storage_callbacks.set_storage(self.bugdir.storage)
+ self.cmd = Import_XML(ui=self.ui)
+ self.cmd._storage = self.bugdir.storage
+ self.cmd._setup_io = lambda i_enc,o_enc : None
+ bugA = self.bugdir.bug_from_uuid('a')
+ self.bugdir.remove_bug(bugA)
+ self.bugdir.storage.writeable = False
+ bugB = self.bugdir.bug_from_uuid('b')
+ bugB.creator = 'John'
+ bugB.status = 'open'
+ bugB.extra_strings += ["don't forget your towel"]
+ bugB.extra_strings += ['helps with space travel']
+ comm1 = bugB.comment_root.new_reply(body='Hello\n')
+ comm1.uuid = 'c1'
+ comm1.author = 'Jane'
+ comm2 = bugB.comment_root.new_reply(body='World\n')
+ comm2.uuid = 'c2'
+ comm2.author = 'Jess'
+ self.bugdir.storage.writeable = True
+ bugB.save()
+ self.xml = """
+ <be-xml>
+ <bug>
+ <uuid>b</uuid>
+ <status>fixed</status>
+ <summary>a test bug</summary>
+ <extra-string>don't forget your towel</extra-string>
+ <extra-string>watch out for flying dolphins</extra-string>
+ <comment>
+ <uuid>c1</uuid>
+ <body>So long</body>
+ </comment>
+ <comment>
+ <uuid>c3</uuid>
+ <author>Jed</author>
+ <body>And thanks</body>
+ </comment>
+ </bug>
+ </be-xml>
+ """
+ self.root_comment_xml = """
+ <be-xml>
+ <comment>
+ <uuid>c1</uuid>
+ <body>So long</body>
+ </comment>
+ <comment>
+ <uuid>c3</uuid>
+ <author>Jed</author>
+ <body>And thanks</body>
+ </comment>
+ </be-xml>
+ """
+ def tearDown(self):
+ self.bugdir.cleanup()
+ self.ui.cleanup()
+ def _execute(self, xml, params={}, args=[]):
+ self.ui.io.set_stdin(xml)
+ self.ui.run(self.cmd, params, args)
+ self.bugdir.flush_reload()
+ def testCleanBugdir(self):
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ def testNotAddOnly(self):
+ bugB = self.bugdir.bug_from_uuid('b')
+ self._execute(self.xml, {}, ['-'])
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ bugB = self.bugdir.bug_from_uuid('b')
+ self.failUnless(bugB.uuid == 'b', bugB.uuid)
+ self.failUnless(bugB.creator == 'John', bugB.creator)
+ self.failUnless(bugB.status == 'fixed', bugB.status)
+ self.failUnless(bugB.summary == 'a test bug', bugB.summary)
+ estrs = ["don't forget your towel",
+ 'helps with space travel',
+ 'watch out for flying dolphins']
+ self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ comments = list(bugB.comments())
+ self.failUnless(len(comments) == 3,
+ ['%s (%s, %s)' % (c.uuid, c.alt_id, c.body)
+ for c in comments])
+ c1 = bugB.comment_from_uuid('c1')
+ comments.remove(c1)
+ self.failUnless(c1.uuid == 'c1', c1.uuid)
+ self.failUnless(c1.alt_id == None, c1.alt_id)
+ self.failUnless(c1.author == 'Jane', c1.author)
+ self.failUnless(c1.body == 'So long\n', c1.body)
+ c2 = bugB.comment_from_uuid('c2')
+ comments.remove(c2)
+ self.failUnless(c2.uuid == 'c2', c2.uuid)
+ self.failUnless(c2.alt_id == None, c2.alt_id)
+ self.failUnless(c2.author == 'Jess', c2.author)
+ self.failUnless(c2.body == 'World\n', c2.body)
+ c4 = comments[0]
+ self.failUnless(len(c4.uuid) == 36, c4.uuid)
+ self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+ self.failUnless(c4.author == 'Jed', c4.author)
+ self.failUnless(c4.body == 'And thanks\n', c4.body)
+ def testAddOnly(self):
+ bugB = self.bugdir.bug_from_uuid('b')
+ initial_bugB_summary = bugB.summary
+ self._execute(self.xml, {'add-only':True}, ['-'])
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ bugB = self.bugdir.bug_from_uuid('b')
+ self.failUnless(bugB.uuid == 'b', bugB.uuid)
+ self.failUnless(bugB.creator == 'John', bugB.creator)
+ self.failUnless(bugB.status == 'open', bugB.status)
+ self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary)
+ estrs = ["don't forget your towel",
+ 'helps with space travel']
+ self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ comments = list(bugB.comments())
+ self.failUnless(len(comments) == 3,
+ ['%s (%s)' % (c.uuid, c.alt_id) for c in comments])
+ c1 = bugB.comment_from_uuid('c1')
+ comments.remove(c1)
+ self.failUnless(c1.uuid == 'c1', c1.uuid)
+ self.failUnless(c1.alt_id == None, c1.alt_id)
+ self.failUnless(c1.author == 'Jane', c1.author)
+ self.failUnless(c1.body == 'Hello\n', c1.body)
+ c2 = bugB.comment_from_uuid('c2')
+ comments.remove(c2)
+ self.failUnless(c2.uuid == 'c2', c2.uuid)
+ self.failUnless(c2.alt_id == None, c2.alt_id)
+ self.failUnless(c2.author == 'Jess', c2.author)
+ self.failUnless(c2.body == 'World\n', c2.body)
+ c4 = comments[0]
+ self.failUnless(len(c4.uuid) == 36, c4.uuid)
+ self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+ self.failUnless(c4.author == 'Jed', c4.author)
+ self.failUnless(c4.body == 'And thanks\n', c4.body)
+ def testRootCommentsNotAddOnly(self):
+ bugB = self.bugdir.bug_from_uuid('b')
+ initial_bugB_summary = bugB.summary
+ self._execute(self.root_comment_xml, {'comment-root':'/b'}, ['-'])
+ uuids = list(self.bugdir.uuids())
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ bugB = self.bugdir.bug_from_uuid('b')
+ self.failUnless(bugB.uuid == 'b', bugB.uuid)
+ self.failUnless(bugB.creator == 'John', bugB.creator)
+ self.failUnless(bugB.status == 'open', bugB.status)
+ self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary)
+ estrs = ["don't forget your towel",
+ 'helps with space travel']
+ self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ comments = list(bugB.comments())
+ self.failUnless(len(comments) == 3,
+ ['%s (%s, %s)' % (c.uuid, c.alt_id, c.body)
+ for c in comments])
+ c1 = bugB.comment_from_uuid('c1')
+ comments.remove(c1)
+ self.failUnless(c1.uuid == 'c1', c1.uuid)
+ self.failUnless(c1.alt_id == None, c1.alt_id)
+ self.failUnless(c1.author == 'Jane', c1.author)
+ self.failUnless(c1.body == 'So long\n', c1.body)
+ c2 = bugB.comment_from_uuid('c2')
+ comments.remove(c2)
+ self.failUnless(c2.uuid == 'c2', c2.uuid)
+ self.failUnless(c2.alt_id == None, c2.alt_id)
+ self.failUnless(c2.author == 'Jess', c2.author)
+ self.failUnless(c2.body == 'World\n', c2.body)
+ c4 = comments[0]
+ self.failUnless(len(c4.uuid) == 36, c4.uuid)
+ self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+ self.failUnless(c4.author == 'Jed', c4.author)
+ self.failUnless(c4.body == 'And thanks\n', c4.body)
+ def testRootCommentsAddOnly(self):
+ bugB = self.bugdir.bug_from_uuid('b')
+ initial_bugB_summary = bugB.summary
+ self._execute(self.root_comment_xml,
+ {'comment-root':'/b', 'add-only':True}, ['-'])
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ bugB = self.bugdir.bug_from_uuid('b')
+ self.failUnless(bugB.uuid == 'b', bugB.uuid)
+ self.failUnless(bugB.creator == 'John', bugB.creator)
+ self.failUnless(bugB.status == 'open', bugB.status)
+ self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary)
+ estrs = ["don't forget your towel",
+ 'helps with space travel']
+ self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ comments = list(bugB.comments())
+ self.failUnless(len(comments) == 3,
+ ['%s (%s)' % (c.uuid, c.alt_id) for c in comments])
+ c1 = bugB.comment_from_uuid('c1')
+ comments.remove(c1)
+ self.failUnless(c1.uuid == 'c1', c1.uuid)
+ self.failUnless(c1.alt_id == None, c1.alt_id)
+ self.failUnless(c1.author == 'Jane', c1.author)
+ self.failUnless(c1.body == 'Hello\n', c1.body)
+ c2 = bugB.comment_from_uuid('c2')
+ comments.remove(c2)
+ self.failUnless(c2.uuid == 'c2', c2.uuid)
+ self.failUnless(c2.alt_id == None, c2.alt_id)
+ self.failUnless(c2.author == 'Jess', c2.author)
+ self.failUnless(c2.body == 'World\n', c2.body)
+ c4 = comments[0]
+ self.failUnless(len(c4.uuid) == 36, c4.uuid)
+ self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+ self.failUnless(c4.author == 'Jed', c4.author)
+ self.failUnless(c4.body == 'And thanks\n', c4.body)
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/command/init.py b/libbe/command/init.py
new file mode 100644
index 0000000..7b83645
--- /dev/null
+++ b/libbe/command/init.py
@@ -0,0 +1,132 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import os.path
+
+import libbe
+import libbe.bugdir
+import libbe.command
+import libbe.storage
+
+class Init (libbe.command.Command):
+ """Create an on-disk bug repository
+
+ >>> import os, sys
+ >>> import libbe.storage.vcs
+ >>> import libbe.storage.vcs.base
+ >>> import libbe.util.utility
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> cmd = Init()
+
+ >>> dir = libbe.util.utility.Dir()
+ >>> vcs = libbe.storage.vcs.vcs_by_name('None')
+ >>> vcs.repo = dir.path
+ >>> try:
+ ... vcs.connect()
+ ... except libbe.storage.ConnectionError:
+ ... 'got error'
+ 'got error'
+ >>> ui.storage_callbacks.set_unconnected_storage(vcs)
+ >>> ui.run(cmd)
+ No revision control detected.
+ BE repository initialized.
+ >>> bd = libbe.bugdir.BugDir(vcs)
+ >>> vcs.disconnect()
+ >>> vcs.connect()
+ >>> bugdir = libbe.bugdir.BugDir(vcs, from_storage=True)
+ >>> vcs.disconnect()
+ >>> vcs.destroy()
+ >>> dir.cleanup()
+
+ >>> dir = libbe.util.utility.Dir()
+ >>> vcs = libbe.storage.vcs.installed_vcs()
+ >>> vcs.repo = dir.path
+ >>> vcs._vcs_init(vcs.repo)
+ >>> ui.storage_callbacks.set_unconnected_storage(vcs)
+ >>> if vcs.name in libbe.storage.vcs.base.VCS_ORDER:
+ ... ui.run(cmd) # doctest: +ELLIPSIS
+ ... else:
+ ... vcs.init()
+ ... vcs.connect()
+ ... print 'Using ... for revision control.\\nDirectory initialized.'
+ Using ... for revision control.
+ BE repository initialized.
+ >>> vcs.disconnect()
+ >>> vcs.connect()
+ >>> bugdir = libbe.bugdir.BugDir(vcs, from_storage=True)
+ >>> vcs.disconnect()
+ >>> vcs.destroy()
+ >>> dir.cleanup()
+ """
+ name = 'init'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+
+ def _run(self, **params):
+ storage = self._get_unconnected_storage()
+ if not os.path.isdir(storage.repo):
+ raise libbe.command.UserError(
+ 'No such directory: %s' % storage.repo)
+ try:
+ storage.connect()
+ raise libbe.command.UserError(
+ 'Directory already initialized: %s' % storage.repo)
+ except libbe.storage.ConnectionError:
+ pass
+ storage.init()
+ storage.connect()
+ self.ui.storage_callbacks.set_storage(storage)
+ bd = libbe.bugdir.BugDir(storage, from_storage=False)
+ self.ui.storage_callbacks.set_bugdir(bd)
+ if bd.storage.name is not 'None':
+ print >> self.stdout, \
+ 'Using %s for revision control.' % storage.name
+ else:
+ print >> self.stdout, 'No revision control detected.'
+ print >> self.stdout, 'BE repository initialized.'
+
+ def _long_help(self):
+ return """
+This command initializes Bugs Everywhere support for the specified directory
+and all its subdirectories. It will auto-detect any supported revision control
+system. You can use "be set vcs_name" to change the vcs being used.
+
+The directory defaults to your current working directory, but you can
+change that by passing the --repo option to be
+ $ be --repo path/to/new/bug/root init
+
+When initialized in a version-controlled directory, BE sinks to the
+version-control root. In that case, the BE repository will be created
+under that directory, rather than the current directory or the one
+passed in --repo. Consider the following tree, versioned in Git.
+ ~
+ `--projectX
+ |-- .git
+ `-- src
+Calling
+ ~$ be --repo ./projectX/src init
+will create the BE repository rooted in projectX:
+ ~
+ `--projectX
+ |-- .be
+ |-- .git
+ `-- src
+"""
diff --git a/libbe/command/list.py b/libbe/command/list.py
new file mode 100644
index 0000000..3803257
--- /dev/null
+++ b/libbe/command/list.py
@@ -0,0 +1,279 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import os
+import re
+
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.command.depend
+import libbe.command.target
+import libbe.command.util
+
+# get a list of * for cmp_*() comparing two bugs.
+AVAILABLE_CMPS = [fn[4:] for fn in dir(libbe.bug) if fn[:4] == 'cmp_']
+AVAILABLE_CMPS.remove('attr') # a cmp_* template.
+
+class Filter (object):
+ def __init__(self, status='all', severity='all', assigned='all',
+ target='all', extra_strings_regexps=[]):
+ self.status = status
+ self.severity = severity
+ self.assigned = assigned
+ self.target = target
+ self.extra_strings_regexps = extra_strings_regexps
+
+ def __call__(self, bugdir, bug):
+ if self.status != 'all' and not bug.status in self.status:
+ return False
+ if self.severity != 'all' and not bug.severity in self.severity:
+ return False
+ if self.assigned != 'all' and not bug.assigned in self.assigned:
+ return False
+ if self.target == 'all':
+ pass
+ else:
+ target_bug = libbe.command.target.bug_target(bugdir, bug)
+ if self.target in ['none', None]:
+ if target_bug.summary != None:
+ return False
+ else:
+ if target_bug.summary != self.target:
+ return False
+ if len(bug.extra_strings) == 0:
+ if len(self.extra_strings_regexps) > 0:
+ return False
+ else:
+ for string in bug.extra_strings:
+ for regexp in self.extra_strings_regexps:
+ if not regexp.match(string):
+ return False
+ return True
+
+class List (libbe.command.Command):
+ """List bugs
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = List(ui=ui)
+
+ >>> ret = ui.run(cmd)
+ abc/a:om: Bug A
+ >>> ret = ui.run(cmd, {'status':'closed'})
+ abc/b:cm: Bug B
+ >>> bd.storage.writeable
+ True
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+
+ name = 'list'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='status',
+ help='Only show bugs matching the STATUS specifier',
+ arg=libbe.command.Argument(
+ name='status', metavar='STATUS', default='active',
+ completion_callback=libbe.command.util.complete_status)),
+ libbe.command.Option(name='severity',
+ help='Only show bugs matching the SEVERITY specifier',
+ arg=libbe.command.Argument(
+ name='severity', metavar='SEVERITY', default='all',
+ completion_callback=libbe.command.util.complete_severity)),
+ libbe.command.Option(name='important',
+ help='List bugs with >= "serious" severity'),
+ libbe.command.Option(name='assigned', short_name='a',
+ help='Only show bugs matching ASSIGNED',
+ arg=libbe.command.Argument(
+ name='assigned', metavar='ASSIGNED', default=None,
+ completion_callback=libbe.command.util.complete_assigned)),
+ libbe.command.Option(name='mine', short_name='m',
+ help='List bugs assigned to you'),
+ libbe.command.Option(name='extra-strings', short_name='e',
+ help='Only show bugs matching STRINGS, e.g. --extra-strings'
+ ' TAG:working,TAG:xml',
+ arg=libbe.command.Argument(
+ name='extra-strings', metavar='STRINGS', default=None,
+ completion_callback=libbe.command.util.complete_extra_strings)),
+ libbe.command.Option(name='sort', short_name='S',
+ help='Adjust bug-sort criteria with comma-separated list '
+ 'SORT. e.g. "--sort creator,time". '
+ 'Available criteria: %s' % ','.join(AVAILABLE_CMPS),
+ arg=libbe.command.Argument(
+ name='sort', metavar='SORT', default=None,
+ completion_callback=libbe.command.util.Completer(AVAILABLE_CMPS))),
+ libbe.command.Option(name='ids', short_name='i',
+ help='Only print the bug IDS'),
+ libbe.command.Option(name='xml', short_name='x',
+ help='Dump output in XML format'),
+ ])
+# parser.add_option("-S", "--sort", metavar="SORT-BY", dest="sort_by",
+# help="Adjust bug-sort criteria with comma-separated list SORT-BY. e.g. \"--sort creator,time\". Available criteria: %s" % ','.join(AVAILABLE_CMPS), default=None)
+# # boolean options. All but ids and xml are special cases of long forms
+# ("w", "wishlist", "List bugs with 'wishlist' severity"),
+# ("A", "active", "List all active bugs"),
+# ("U", "unconfirmed", "List unconfirmed bugs"),
+# ("o", "open", "List open bugs"),
+# ("T", "test", "List bugs in testing"),
+# for s in bools:
+# attr = s[1].replace('-','_')
+# short = "-%c" % s[0]
+# long = "--%s" % s[1]
+# help = s[2]
+# parser.add_option(short, long, action="store_true",
+# dest=attr, help=help, default=False)
+# return parser
+#
+# ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ writeable = bugdir.storage.writeable
+ bugdir.storage.writeable = False
+ cmp_list, status, severity, assigned, extra_strings_regexps = \
+ self._parse_params(bugdir, params)
+ filter = Filter(status, severity, assigned,
+ extra_strings_regexps=extra_strings_regexps)
+ bugs = [bugdir.bug_from_uuid(uuid) for uuid in bugdir.uuids()]
+ bugs = [b for b in bugs if filter(bugdir, b) == True]
+ self.result = bugs
+ if len(bugs) == 0 and params['xml'] == False:
+ print >> self.stdout, 'No matching bugs found'
+
+ # sort bugs
+ bugs = self._sort_bugs(bugs, cmp_list)
+
+ # print list of bugs
+ if params['ids'] == True:
+ for bug in bugs:
+ print >> self.stdout, bug.id.user()
+ else:
+ self._list_bugs(bugs, xml=params['xml'])
+ bugdir.storage.writeable = writeable
+ return 0
+
+ def _parse_params(self, bugdir, params):
+ cmp_list = []
+ if params['sort'] != None:
+ for cmp in params['sort'].sort_by.split(','):
+ if cmp not in AVAILABLE_CMPS:
+ raise libbe.command.UserError(
+ 'Invalid sort on "%s".\nValid sorts:\n %s'
+ % (cmp, '\n '.join(AVAILABLE_CMPS)))
+ cmp_list.append(eval('libbe.bug.cmp_%s' % cmp))
+ # select status
+ if params['status'] == 'all':
+ status = libbe.bug.status_values
+ elif params['status'] == 'active':
+ status = list(libbe.bug.active_status_values)
+ elif params['status'] == 'inactive':
+ status = list(libbe.bug.inactive_status_values)
+ else:
+ status = libbe.command.util.select_values(
+ params['status'], libbe.bug.status_values)
+ # select severity
+ if params['severity'] == 'all':
+ severity = libbe.bug.severity_values
+ elif params['important'] == True:
+ serious = libbe.bug.severity_values.index('serious')
+ severity.append(list(libbe.bug.severity_values[serious:]))
+ else:
+ severity = libbe.command.util.select_values(
+ params['severity'], libbe.bug.severity_values)
+ # select assigned
+ if params['assigned'] == None:
+ if params['mine'] == True:
+ assigned = [self._get_user_id()]
+ else:
+ assigned = 'all'
+ else:
+ assigned = libbe.command.util.select_values(
+ params['assigned'], libbe.command.util.assignees(bugdir))
+ for i in range(len(assigned)):
+ if assigned[i] == '-':
+ assigned[i] = params['user-id']
+ if params['extra-strings'] == None:
+ extra_strings_regexps = []
+ else:
+ extra_strings_regexps = [re.compile(x)
+ for x in params['extra-strings'].split(',')]
+ return (cmp_list, status, severity, assigned, extra_strings_regexps)
+
+ def _sort_bugs(self, bugs, cmp_list=[]):
+ cmp_list.extend(libbe.bug.DEFAULT_CMP_FULL_CMP_LIST)
+ cmp_fn = libbe.bug.BugCompoundComparator(cmp_list=cmp_list)
+ bugs.sort(cmp_fn)
+ return bugs
+
+ def _list_bugs(self, bugs, xml=False):
+ if xml == True:
+ print >> self.stdout, \
+ '<?xml version="1.0" encoding="%s" ?>' % self.stdout.encoding
+ print >> self.stdout, '<be-xml>'
+ if len(bugs) > 0:
+ for bug in bugs:
+ if xml == True:
+ print >> self.stdout, bug.xml(show_comments=True)
+ else:
+ print >> self.stdout, bug.string(shortlist=True)
+ if xml == True:
+ print >> self.stdout, '</be-xml>'
+
+ def _long_help(self):
+ return """
+This command lists bugs. Normally it prints a short string like
+ bea/576:om: Allow attachments
+Where
+ bea/576 the bug id
+ o the bug status is 'open' (first letter)
+ m the bug severity is 'minor' (first letter)
+ Allo... the bug summary string
+
+You can optionally (-u) print only the bug ids.
+
+There are several criteria that you can filter by:
+ * status
+ * severity
+ * assigned (who the bug is assigned to)
+Allowed values for each criterion may be given in a comma seperated
+list. The special string "all" may be used with any of these options
+to match all values of the criterion. As with the --status and
+--severity options for `be depend`, starting the list with a minus
+sign makes your selections a blacklist instead of the default
+whitelist.
+
+status
+ %s
+severity
+ %s
+assigned
+ free form, with the string '-' being a shortcut for yourself.
+
+In addition, there are some shortcut options that set boolean flags.
+The boolean options are ignored if the matching string option is used.
+""" % (','.join(libbe.bug.status_values),
+ ','.join(libbe.bug.severity_values))
diff --git a/libbe/command/merge.py b/libbe/command/merge.py
new file mode 100644
index 0000000..2dff59c
--- /dev/null
+++ b/libbe/command/merge.py
@@ -0,0 +1,189 @@
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import copy
+import os
+
+import libbe
+import libbe.command
+import libbe.command.util
+
+
+class Merge (libbe.command.Command):
+ """Merge duplicate bugs
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> import libbe.comment
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> cmd = Merge(ui=ui)
+
+ >>> a = bd.bug_from_uuid('a')
+ >>> a.comment_root.time = 0
+ >>> dummy = a.new_comment('Testing')
+ >>> dummy.time = 1
+ >>> dummy = dummy.new_reply('Testing...')
+ >>> dummy.time = 2
+ >>> b = bd.bug_from_uuid('b')
+ >>> b.status = 'open'
+ >>> b.comment_root.time = 0
+ >>> dummy = b.new_comment('1 2')
+ >>> dummy.time = 1
+ >>> dummy = dummy.new_reply('1 2 3 4')
+ >>> dummy.time = 2
+
+ >>> ret = ui.run(cmd, args=['/a', '/b'])
+ Merged bugs #abc/a# and #abc/b#
+ >>> bd.flush_reload()
+ >>> a = bd.bug_from_uuid('a')
+ >>> a.load_comments()
+ >>> a_comments = sorted([c for c in a.comments()],
+ ... cmp=libbe.comment.cmp_time)
+ >>> mergeA = a_comments[0]
+ >>> mergeA.time = 3
+ >>> print a.string(show_comments=True) # doctest: +ELLIPSIS
+ ID : a
+ Short name : abc/a
+ Severity : minor
+ Status : open
+ Assigned :
+ Reporter :
+ Creator : John Doe <jdoe@example.com>
+ Created : ...
+ Bug A
+ --------- Comment ---------
+ Name: abc/a/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ Testing
+ --------- Comment ---------
+ Name: abc/a/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ Testing...
+ --------- Comment ---------
+ Name: abc/a/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ Merged from bug #abc/b#
+ --------- Comment ---------
+ Name: abc/a/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ 1 2
+ --------- Comment ---------
+ Name: abc/a/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ 1 2 3 4
+ >>> b = bd.bug_from_uuid('b')
+ >>> b.load_comments()
+ >>> b_comments = sorted([c for c in b.comments()],
+ ... libbe.comment.cmp_time)
+ >>> mergeB = b_comments[0]
+ >>> mergeB.time = 3
+ >>> print b.string(show_comments=True) # doctest: +ELLIPSIS
+ ID : b
+ Short name : abc/b
+ Severity : minor
+ Status : closed
+ Assigned :
+ Reporter :
+ Creator : Jane Doe <jdoe@example.com>
+ Created : ...
+ Bug B
+ --------- Comment ---------
+ Name: abc/b/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ 1 2
+ --------- Comment ---------
+ Name: abc/b/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ 1 2 3 4
+ --------- Comment ---------
+ Name: abc/b/...
+ From: ...
+ Date: ...
+ <BLANKLINE>
+ Merged into bug #abc/a#
+ >>> print b.status
+ closed
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'merge'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID', default=None,
+ completion_callback=libbe.command.util.complete_bug_id),
+ libbe.command.Argument(
+ name='bug-id-to-merge', metavar='BUG-ID', default=None,
+ completion_callback=libbe.command.util.complete_bug_id),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ bugA,dummy_comment = \
+ libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['bug-id'])
+ bugA.load_comments()
+ bugB,dummy_comment = \
+ libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['bug-id-to-merge'])
+ bugB.load_comments()
+ mergeA = bugA.new_comment('Merged from bug #%s#' % bugB.id.long_user())
+ newCommTree = copy.deepcopy(bugB.comment_root)
+ for comment in newCommTree.traverse(): # all descendant comments
+ comment.bug = bugA
+ # uuids must be unique in storage
+ if comment.alt_id == None:
+ comment.storage = None
+ comment.alt_id = comment.uuid
+ comment.storage = bugdir.storage
+ comment.uuid = libbe.util.id.uuid_gen()
+ comment.save() # force onto disk under bugA
+
+ for comment in newCommTree: # just the child comments
+ mergeA.add_reply(comment, allow_time_inversion=True)
+ bugB.new_comment('Merged into bug #%s#' % bugA.id.long_user())
+ bugB.status = 'closed'
+ print >> self.stdout, 'Merged bugs #%s# and #%s#' \
+ % (bugA.id.user(), bugB.id.user())
+ return 0
+
+ def _long_help(self):
+ return """
+The second bug (B) is merged into the first (A). This adds merge
+comments to both bugs, closes B, and appends B's comment tree to A's
+merge comment.
+"""
diff --git a/libbe/command/new.py b/libbe/command/new.py
new file mode 100644
index 0000000..be18306
--- /dev/null
+++ b/libbe/command/new.py
@@ -0,0 +1,103 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.command
+import libbe.command.util
+
+
+class New (libbe.command.Command):
+ """Create a new bug
+
+ >>> import os
+ >>> import sys
+ >>> import time
+ >>> import libbe.bugdir
+ >>> import libbe.util.id
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = New()
+
+ >>> uuid_gen = libbe.util.id.uuid_gen
+ >>> libbe.util.id.uuid_gen = lambda: 'X'
+ >>> ui._user_id = u'Fran\\xe7ois'
+ >>> ret = ui.run(cmd, args=['this is a test',])
+ Created bug with ID abc/X
+ >>> libbe.util.id.uuid_gen = uuid_gen
+ >>> bd.flush_reload()
+ >>> bug = bd.bug_from_uuid('X')
+ >>> print bug.summary
+ this is a test
+ >>> bug.creator
+ u'Fran\\xe7ois'
+ >>> bug.reporter
+ u'Fran\\xe7ois'
+ >>> bug.time <= int(time.time())
+ True
+ >>> print bug.severity
+ minor
+ >>> print bug.status
+ open
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'new'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='reporter', short_name='r',
+ help='The user who reported the bug',
+ arg=libbe.command.Argument(
+ name='reporter', metavar='NAME')),
+ libbe.command.Option(name='assigned', short_name='a',
+ help='The developer in charge of the bug',
+ arg=libbe.command.Argument(
+ name='assigned', metavar='NAME',
+ completion_callback=libbe.command.util.complete_assigned)),
+ ])
+ self.args.extend([
+ libbe.command.Argument(name='summary', metavar='SUMMARY')
+ ])
+
+ def _run(self, **params):
+ if params['summary'] == '-': # read summary from stdin
+ summary = self.stdin.readline()
+ else:
+ summary = params['summary']
+ bugdir = self._get_bugdir()
+ bug = bugdir.new_bug(summary=summary.strip())
+ bug.creator = self._get_user_id()
+ if params['reporter'] != None:
+ bug.reporter = params['reporter']
+ else:
+ bug.reporter = bug.creator
+ if params['assigned'] != None:
+ bug.assigned = params['assigned']
+ print >> self.stdout, 'Created bug with ID %s' % bug.id.user()
+ return 0
+
+ def _long_help(self):
+ return """
+Create a new bug, with a new ID. The summary specified on the
+commandline is a string (only one line) that describes the bug briefly
+or "-", in which case the string will be read from stdin.
+"""
diff --git a/libbe/command/open.py b/libbe/command/open.py
new file mode 100644
index 0000000..0c6bf05
--- /dev/null
+++ b/libbe/command/open.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Marien Zwart <marienz@gentoo.org>
+# Thomas Gerigk <tgerigk@gmx.de>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""Re-open a bug"""
+from libbe import cmdutil, bugdir
+__desc__ = __doc__
+
+def execute(args, manipulate_encodings=True):
+ """
+ >>> import os
+ >>> bd = bugdir.SimpleBugDir()
+ >>> os.chdir(bd.root)
+ >>> print bd.bug_from_shortname("b").status
+ closed
+ >>> execute(["b"], manipulate_encodings=False)
+ >>> bd._clear_bugs()
+ >>> print bd.bug_from_shortname("b").status
+ open
+ >>> bd.cleanup()
+ """
+ parser = get_parser()
+ options, args = parser.parse_args(args)
+ cmdutil.default_complete(options, args, parser,
+ bugid_args={0: lambda bug : bug.active==False})
+ if len(args) == 0:
+ raise cmdutil.UsageError, "Please specify a bug id."
+ if len(args) > 1:
+ raise cmdutil.UsageError, "Too many arguments."
+ bd = bugdir.BugDir(from_disk=True,
+ manipulate_encodings=manipulate_encodings)
+ bug = cmdutil.bug_from_shortname(bd, args[0])
+ bug.status = "open"
+
+def get_parser():
+ parser = cmdutil.CmdOptionParser("be open BUG-ID")
+ return parser
+
+longhelp="""
+Mark a bug as 'open'.
+"""
+
+def help():
+ return get_parser().help_str() + longhelp
diff --git a/libbe/command/remove.py b/libbe/command/remove.py
new file mode 100644
index 0000000..8d8e641
--- /dev/null
+++ b/libbe/command/remove.py
@@ -0,0 +1,79 @@
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.command
+import libbe.command.util
+
+
+class Remove (libbe.command.Command):
+ """Remove (delete) a bug and its comments
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Remove(ui=ui)
+
+ >>> print bd.bug_from_uuid('b').status
+ closed
+ >>> ret = ui.run(cmd, args=['/b'])
+ Removed bug abc/b
+ >>> bd.flush_reload()
+ >>> try:
+ ... bd.bug_from_uuid('b')
+ ... except libbe.bugdir.NoBugMatches:
+ ... print 'Bug not found'
+ Bug not found
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'remove'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID', default=None,
+ repeatable=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ user_ids = []
+ for bug_id in params['bug-id']:
+ bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, bug_id)
+ user_ids.append(bug.id.user())
+ bugdir.remove_bug(bug)
+ if len(user_ids) == 1:
+ print >> self.stdout, 'Removed bug %s' % user_ids[0]
+ else:
+ print >> self.stdout, 'Removed bugs %s' % ', '.join(user_ids)
+ return 0
+
+ def _long_help(self):
+ return """
+Remove (delete) existing bugs. Use with caution: if you're not using
+a revision control system, there may be no way to recover the lost
+information. You should use this command, for example, to get rid of
+blank or otherwise mangled bugs.
+"""
diff --git a/libbe/command/serve.py b/libbe/command/serve.py
new file mode 100644
index 0000000..7237343
--- /dev/null
+++ b/libbe/command/serve.py
@@ -0,0 +1,1172 @@
+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Define the :class:`Serve` serving BE Storage over HTTP.
+
+See Also
+--------
+:mod:`libbe.storage.http` : the associated client
+"""
+
+import hashlib
+import logging
+import os.path
+import posixpath
+import re
+import sys
+import time
+import traceback
+import types
+import urllib
+import wsgiref.simple_server
+try:
+ # Python >= 2.6
+ from urlparse import parse_qs
+except ImportError:
+ # Python <= 2.5
+ from cgi import parse_qs
+try:
+ import cherrypy
+ import cherrypy.wsgiserver
+except ImportError:
+ cherrypy = None
+if cherrypy != None:
+ try: # CherryPy >= 3.2
+ import cherrypy.wsgiserver.ssl_builtin
+ except ImportError: # CherryPy <= 3.1.X
+ cherrypy.wsgiserver.ssl_builtin = None
+try:
+ import OpenSSL
+except ImportError:
+ OpenSSL = None
+
+import libbe
+import libbe.command
+import libbe.command.util
+import libbe.util.encoding
+import libbe.version
+
+if libbe.TESTING == True:
+ import copy
+ import doctest
+ import StringIO
+ import unittest
+ import wsgiref.validate
+ try:
+ import cherrypy.test.webtest
+ cherrypy_test_webtest = True
+ except ImportError:
+ cherrypy_test_webtest = None
+
+ import libbe.bugdir
+
+class _HandlerError (Exception):
+ def __init__(self, code, msg, headers=[]):
+ Exception.__init__(self, '%d %s' % (code, msg))
+ self.code = code
+ self.msg = msg
+ self.headers = headers
+
+class _Unauthenticated (_HandlerError):
+ def __init__(self, realm, msg='User Not Authenticated', headers=[]):
+ _HandlerError.__init__(self, 401, msg, headers+[
+ ('WWW-Authenticate','Basic realm="%s"' % realm)])
+
+class _Unauthorized (_HandlerError):
+ def __init__(self, msg='User Not Authorized', headers=[]):
+ _HandlerError.__init__(self, 403, msg, headers)
+
+class User (object):
+ def __init__(self, uname=None, name=None, passhash=None, password=None):
+ self.uname = uname
+ self.name = name
+ self.passhash = passhash
+ if passhash == None:
+ if password != None:
+ self.passhash = self.hash(password)
+ else:
+ assert password == None, \
+ 'Redundant password %s with passhash %s' % (password, passhash)
+ self.users = None
+ def from_string(self, string):
+ string = string.strip()
+ fields = string.split(':')
+ if len(fields) != 3:
+ raise ValueError, '%d!=3 fields in "%s"' % (len(fields), string)
+ self.uname,self.name,self.passhash = fields
+ def __str__(self):
+ return ':'.join([self.uname, self.name, self.passhash])
+ def __cmp__(self, other):
+ return cmp(self.uname, other.uname)
+ def hash(self, password):
+ return hashlib.sha1(password).hexdigest()
+ def valid_login(self, password):
+ if self.hash(password) == self.passhash:
+ return True
+ return False
+ def set_name(self, name):
+ self._set_property('name', name)
+ def set_password(self, password):
+ self._set_property('passhash', self.hash(password))
+ def _set_property(self, property, value):
+ if self.uname == 'guest':
+ raise _Unauthorized('guest user not allowed to change %s' % property)
+ if getattr(self, property) != value \
+ and self.users != None:
+ self.users.changed = True
+ setattr(self, property, value)
+
+class Users (dict):
+ def __init__(self, filename=None):
+ dict.__init__(self)
+ self.filename = filename
+ self.changed = False
+ def load(self):
+ if self.filename == None:
+ return
+ user_file = libbe.util.encoding.get_file_contents(
+ self.filename, decode=True)
+ self.clear()
+ for line in user_file.splitlines():
+ user = User()
+ user.from_string(line)
+ self.add_user(user)
+ def save(self):
+ if self.filename != None and self.changed == True:
+ lines = []
+ for user in sorted(self.users):
+ lines.append(str(user))
+ libbe.util.encoding.set_file_contents(self.filename)
+ self.changed = False
+ def add_user(self, user):
+ assert user.users == None, user.users
+ user.users = self
+ self[user.uname] = user
+ def valid_login(self, uname, password):
+ if uname in self and \
+ self[uname].valid_login(password) == True:
+ return True
+ return False
+
+class WSGI_Object (object):
+ """Utility class for WGSI clients and middleware.
+
+ For details on WGSI, see `PEP 333`_
+
+ .. _PEP 333: http://www.python.org/dev/peps/pep-0333/
+ """
+ def __init__(self, logger=None, log_level=logging.INFO, log_format=None):
+ self.logger = logger
+ self.log_level = log_level
+ if log_format == None:
+ self.log_format = (
+ '%(REMOTE_ADDR)s - %(REMOTE_USER)s [%(time)s] '
+ '"%(REQUEST_METHOD)s %(REQUEST_URI)s %(HTTP_VERSION)s" '
+ '%(status)s %(bytes)s "%(HTTP_REFERER)s" "%(HTTP_USER_AGENT)s"')
+ else:
+ self.log_format = log_format
+
+ def __call__(self, environ, start_response):
+ """The main WSGI entry point."""
+ raise NotImplementedError
+ # start_response() is a callback for setting response headers
+ # start_response(status, response_headers, exc_info=None)
+ # status is an HTTP status string (e.g., "200 OK").
+ # response_headers is a list of 2-tuples, the HTTP headers in
+ # key-value format.
+ # exc_info is used in exception handling.
+ #
+ # The application function then returns an iterable of body chunks.
+
+ def error(self, environ, start_response, error, message, headers=[]):
+ """Make it easy to call start_response for errors."""
+ response = '%d %s' % (error, message)
+ self.log_request(environ, status=response, bytes=len(message))
+ start_response(response,
+ [('Content-Type', 'text/plain')]+headers)
+ return [message]
+
+ def log_request(self, environ, status='-1 OK', bytes=-1):
+ if self.logger == None:
+ return
+ req_uri = urllib.quote(environ.get('SCRIPT_NAME', '')
+ + environ.get('PATH_INFO', ''))
+ if environ.get('QUERY_STRING'):
+ req_uri += '?'+environ['QUERY_STRING']
+ start = time.localtime()
+ if time.daylight:
+ offset = time.altzone / 60 / 60 * -100
+ else:
+ offset = time.timezone / 60 / 60 * -100
+ if offset >= 0:
+ offset = "+%0.4d" % (offset)
+ elif offset < 0:
+ offset = "%0.4d" % (offset)
+ d = {
+ 'REMOTE_ADDR': environ.get('REMOTE_ADDR') or '-',
+ 'REMOTE_USER': environ.get('REMOTE_USER') or '-',
+ 'REQUEST_METHOD': environ['REQUEST_METHOD'],
+ 'REQUEST_URI': req_uri,
+ 'HTTP_VERSION': environ.get('SERVER_PROTOCOL'),
+ 'time': time.strftime('%d/%b/%Y:%H:%M:%S ', start) + offset,
+ 'status': status.split(None, 1)[0],
+ 'bytes': bytes,
+ 'HTTP_REFERER': environ.get('HTTP_REFERER', '-'),
+ 'HTTP_USER_AGENT': environ.get('HTTP_USER_AGENT', '-'),
+ }
+ self.logger.log(self.log_level, self.log_format % d)
+
+class ExceptionApp (WSGI_Object):
+ """Some servers (e.g. cherrypy) eat app-raised exceptions.
+
+ Work around that by logging tracebacks by hand.
+ """
+ def __init__(self, app, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
+ self.app = app
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'ExceptionApp')
+ try:
+ return self.app(environ, start_response)
+ except Exception, e:
+ etype,value,tb = sys.exc_info()
+ trace = ''.join(
+ traceback.format_exception(etype, value, tb, None))
+ self.logger.log(self.log_level, trace)
+ raise
+
+class UppercaseHeaderApp (WSGI_Object):
+ """WSGI middleware that uppercases incoming HTTP headers.
+
+ From PEP 333, `The start_response() Callable`_ :
+
+ A reminder for server/gateway authors: HTTP
+ header names are case-insensitive, so be sure
+ to take that into consideration when examining
+ application-supplied headers!
+
+ .. _The start_response() Callable:
+ http://www.python.org/dev/peps/pep-0333/#id20
+ """
+ def __init__(self, app, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
+ self.app = app
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'UppercaseHeaderApp')
+ for key,value in environ.items():
+ if key.startswith('HTTP_'):
+ uppercase = key.upper()
+ if uppercase != key:
+ environ[uppercase] = environ.pop(key)
+ return self.app(environ, start_response)
+
+class AuthenticationApp (WSGI_Object):
+ """WSGI middleware for handling user authentication.
+ """
+ def __init__(self, app, realm, setting='be-auth', users=None, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
+ self.app = app
+ self.realm = realm
+ self.setting = setting
+ self.users = users
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'AuthenticationApp')
+ environ['%s.realm' % self.setting] = self.realm
+ try:
+ username = self.authenticate(environ)
+ environ['%s.user' % self.setting] = username
+ environ['%s.user.name' % self.setting] = \
+ self.users[username].name
+ return self.app(environ, start_response)
+ except _Unauthorized, e:
+ return self.error(environ, start_response,
+ e.code, e.msg, e.headers)
+
+ def authenticate(self, environ):
+ """Handle user-authentication sent in the "Authorization" header.
+
+ This function implements ``Basic`` authentication as described in
+ HTTP/1.0 specification [1]_ . Do not use this module unless you
+ are using SSL, as it transmits unencrypted passwords.
+
+ .. [1] http://www.w3.org/Protocols/HTTP/1.0/draft-ietf-http-spec.html#BasicAA
+
+ Examples
+ --------
+
+ >>> users = Users()
+ >>> users.add_user(User('Aladdin', 'Big Al', password='open sesame'))
+ >>> app = AuthenticationApp(app=None, realm='Dummy Realm', users=users)
+ >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='})
+ 'Aladdin'
+ >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic AAAAAAAAAAAAAAAAAAAAAAAAAA=='})
+
+ Notes
+ -----
+
+ Code based on authkit/authenticate/basic.py
+ (c) 2005 Clark C. Evans.
+ Released under the MIT License:
+ http://www.opensource.org/licenses/mit-license.php
+ """
+ authorization = environ.get('HTTP_AUTHORIZATION', None)
+ if authorization == None:
+ raise _Unauthorized('Authorization required')
+ try:
+ authmeth,auth = authorization.split(' ',1)
+ except ValueError:
+ return None
+ if 'basic' != authmeth.lower():
+ return None # non-basic HTTP authorization not implemented
+ auth = auth.strip().decode('base64')
+ try:
+ username,password = auth.split(':',1)
+ except ValueError:
+ return None
+ if self.authfunc(environ, username, password) == True:
+ return username
+
+ def authfunc(self, environ, username, password):
+ if not username in self.users:
+ return False
+ if self.users[username].valid_login(password) == True:
+ if self.logger != None:
+ self.logger.log(self.log_level,
+ 'Authenticated %s' % self.users[username].name)
+ return True
+ return False
+
+class WSGI_AppObject (WSGI_Object):
+ """Useful WSGI utilities for handling data (POST, QUERY) and
+ returning responses.
+ """
+ def __init__(self, *args, **kwargs):
+ WSGI_Object.__init__(self, *args, **kwargs)
+
+ # Maximum input we will accept when REQUEST_METHOD is POST
+ # 0 ==> unlimited input
+ self.maxlen = 0
+
+ def ok_response(self, environ, start_response, content,
+ content_type='application/octet-stream',
+ headers=[]):
+ if content == None:
+ start_response('200 OK', [])
+ return []
+ if type(content) == types.UnicodeType:
+ content = content.encode('utf-8')
+ for i,header in enumerate(headers):
+ header_name,header_value = header
+ if type(header_value) == types.UnicodeType:
+ headers[i] = (header_name, header_value.encode('ISO-8859-1'))
+ response = '200 OK'
+ content_length = len(content)
+ self.log_request(environ, status=response, bytes=content_length)
+ start_response('200 OK', [
+ ('Content-Type', content_type),
+ ('Content-Length', str(content_length)),
+ ]+headers)
+ if self.is_head(environ) == True:
+ return []
+ return [content]
+
+ def query_data(self, environ):
+ if not environ['REQUEST_METHOD'] in ['GET', 'HEAD']:
+ raise _HandlerError(404, 'Not Found')
+ return self._parse_query(environ.get('QUERY_STRING', ''))
+
+ def _parse_query(self, query):
+ if len(query) == 0:
+ return {}
+ data = parse_qs(
+ query, keep_blank_values=True, strict_parsing=True)
+ for k,v in data.items():
+ if len(v) == 1:
+ data[k] = v[0]
+ return data
+
+ def post_data(self, environ):
+ if environ['REQUEST_METHOD'] != 'POST':
+ raise _HandlerError(404, 'Not Found')
+ post_data = self._read_post_data(environ)
+ return self._parse_post(post_data)
+
+ def _parse_post(self, post):
+ return self._parse_query(post)
+
+ def _read_post_data(self, environ):
+ try:
+ clen = int(environ.get('CONTENT_LENGTH', '0'))
+ except ValueError:
+ clen = 0
+ if clen != 0:
+ if self.maxlen > 0 and clen > self.maxlen:
+ raise ValueError, 'Maximum content length exceeded'
+ return environ['wsgi.input'].read(clen)
+ return ''
+
+ def data_get_string(self, data, key, default=None, source='query'):
+ if not key in data or data[key] in [None, 'None']:
+ if default == _HandlerError:
+ raise _HandlerError(406, 'Missing %s key %s' % (source, key))
+ return default
+ return data[key]
+
+ def data_get_id(self, data, key='id', default=_HandlerError,
+ source='query'):
+ return self.data_get_string(data, key, default, source)
+
+ def data_get_boolean(self, data, key, default=False, source='query'):
+ val = self.data_get_string(data, key, default, source)
+ if val == 'True':
+ return True
+ elif val == 'False':
+ return False
+ return val
+
+ def is_head(self, environ):
+ return environ['REQUEST_METHOD'] == 'HEAD'
+
+
+class AdminApp (WSGI_AppObject):
+ """WSGI middleware for managing users (changing passwords,
+ usernames, etc.).
+ """
+ def __init__(self, app, users=None, url=r'^admin/?', *args, **kwargs):
+ WSGI_AppObject.__init__(self, *args, **kwargs)
+ self.app = app
+ self.users = users
+ self.url = url
+
+ def __call__(self, environ, start_response):
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'AdminApp')
+ path = environ.get('PATH_INFO', '').lstrip('/')
+ match = re.search(self.url, path)
+ if match is not None:
+ return self.admin(environ, start_response)
+ return self.app(environ, start_response)
+
+ def admin(self, environ, start_response):
+ if not 'be-auth.user' in environ:
+ raise _Unauthenticated(realm=envirion.get('be-auth.realm'))
+ uname = environ.get('be-auth.user')
+ user = self.users[uname]
+ data = self.post_data(environ)
+ source = 'post'
+ name = self.data_get_string(
+ data, 'name', default=None, source=source)
+ if name != None:
+ self.users[uname].set_name(name)
+ password = self.data_get_string(
+ data, 'password', default=None, source=source)
+ if password != None:
+ self.users[uname].set_password(password)
+ self.users.save()
+ return self.ok_response(environ, start_response, None)
+
+class ServerApp (WSGI_AppObject):
+ """WSGI server for a BE Storage instance over HTTP.
+
+ RESTful_ WSGI request handler for serving the
+ libbe.storage.http.HTTP backend with GET, POST, and HEAD commands.
+ For more information on authentication and REST, see John
+ Calcote's `Open Sourcery article`_
+
+ .. _RESTful: http://www.ics.uci.edu/~fielding/pubs/dissertation/rest_arch_style.htm
+ .. _Open Sourcery article: http://jcalcote.wordpress.com/2009/08/10/restful-authentication/
+
+ This serves files from a connected storage instance, usually
+ a VCS-based repository located on the local machine.
+
+ Notes
+ -----
+
+ The GET and HEAD requests are identical except that the HEAD
+ request omits the actual content of the file.
+ """
+ server_version = "BE-server/" + libbe.version.version()
+
+ def __init__(self, storage, *args, **kwargs):
+ WSGI_AppObject.__init__(self, *args, **kwargs)
+ self.storage = storage
+ self.http_user_error = 418
+
+ self.urls = [
+ (r'^add/?', self.add),
+ (r'^exists/?', self.exists),
+ (r'^remove/?', self.remove),
+ (r'^ancestors/?', self.ancestors),
+ (r'^children/?', self.children),
+ (r'^get/(.+)', self.get),
+ (r'^set/(.+)', self.set),
+ (r'^commit/?', self.commit),
+ (r'^revision-id/?', self.revision_id),
+ (r'^changed/?', self.changed),
+ (r'^version/?', self.version),
+ ]
+
+ def __call__(self, environ, start_response):
+ """The main WSGI application.
+
+ Dispatch the current request to the functions from above and
+ store the regular expression captures in the WSGI environment
+ as `be-server.url_args` so that the functions from above can
+ access the url placeholders.
+
+ URL dispatcher from Armin Ronacher's "Getting Started with WSGI"
+ http://lucumr.pocoo.org/2007/5/21/getting-started-with-wsgi
+ """
+ if self.logger != None:
+ self.logger.log(logging.DEBUG, 'ServerApp')
+ path = environ.get('PATH_INFO', '').lstrip('/')
+ try:
+ for regex, callback in self.urls:
+ match = re.search(regex, path)
+ if match is not None:
+ environ['be-server.url_args'] = match.groups()
+ try:
+ return callback(environ, start_response)
+ except libbe.storage.NotReadable, e:
+ raise _HandlerError(403, 'Read permission denied')
+ except libbe.storage.NotWriteable, e:
+ raise _HandlerError(403, 'Write permission denied')
+ except libbe.storage.InvalidID, e:
+ raise _HandlerError(
+ self.http_user_error, 'InvalidID %s' % e)
+ raise _HandlerError(404, 'Not Found')
+ except _HandlerError, e:
+ return self.error(environ, start_response,
+ e.code, e.msg, e.headers)
+
+ # handlers
+ def add(self, environ, start_response):
+ self.check_login(environ)
+ data = self.post_data(environ)
+ source = 'post'
+ id = self.data_get_id(data, source=source)
+ parent = self.data_get_string(
+ data, 'parent', default=None, source=source)
+ directory = self.data_get_boolean(
+ data, 'directory', default=False, source=source)
+ self.storage.add(id, parent=parent, directory=directory)
+ return self.ok_response(environ, start_response, None)
+
+ def exists(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ id = self.data_get_id(data, source=source)
+ revision = self.data_get_string(
+ data, 'revision', default=None, source=source)
+ content = str(self.storage.exists(id, revision))
+ return self.ok_response(environ, start_response, content)
+
+ def remove(self, environ, start_response):
+ self.check_login(environ)
+ data = self.post_data(environ)
+ source = 'post'
+ id = self.data_get_id(data, source=source)
+ recursive = self.data_get_boolean(
+ data, 'recursive', default=False, source=source)
+ if recursive == True:
+ self.storage.recursive_remove(id)
+ else:
+ self.storage.remove(id)
+ return self.ok_response(environ, start_response, None)
+
+ def ancestors(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ id = self.data_get_id(data, source=source)
+ revision = self.data_get_string(
+ data, 'revision', default=None, source=source)
+ content = '\n'.join(self.storage.ancestors(id, revision))+'\n'
+ return self.ok_response(environ, start_response, content)
+
+ def children(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ id = self.data_get_id(data, default=None, source=source)
+ revision = self.data_get_string(
+ data, 'revision', default=None, source=source)
+ content = '\n'.join(self.storage.children(id, revision))
+ return self.ok_response(environ, start_response, content)
+
+ def get(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ try:
+ id = environ['be-server.url_args'][0]
+ except:
+ raise _HandlerError(404, 'Not Found')
+ revision = self.data_get_string(
+ data, 'revision', default=None, source=source)
+ content = self.storage.get(id, revision=revision)
+ be_version = self.storage.storage_version(revision)
+ return self.ok_response(environ, start_response, content,
+ headers=[('X-BE-Version', be_version)])
+
+ def set(self, environ, start_response):
+ self.check_login(environ)
+ data = self.post_data(environ)
+ try:
+ id = environ['be-server.url_args'][0]
+ except:
+ raise _HandlerError(404, 'Not Found')
+ if not 'value' in data:
+ raise _HandlerError(406, 'Missing query key value')
+ value = data['value']
+ self.storage.set(id, value)
+ return self.ok_response(environ, start_response, None)
+
+ def commit(self, environ, start_response):
+ self.check_login(environ)
+ data = self.post_data(environ)
+ if not 'summary' in data:
+ raise _HandlerError(406, 'Missing query key summary')
+ summary = data['summary']
+ if not 'body' in data or data['body'] == 'None':
+ data['body'] = None
+ body = data['body']
+ if not 'allow_empty' in data \
+ or data['allow_empty'] == 'True':
+ allow_empty = True
+ else:
+ allow_empty = False
+ try:
+ revision = self.storage.commit(summary, body, allow_empty)
+ except libbe.storage.EmptyCommit, e:
+ raise _HandlerError(self.http_user_error, 'EmptyCommit')
+ return self.ok_response(environ, start_response, revision)
+
+ def revision_id(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ index = int(self.data_get_string(
+ data, 'index', default=_HandlerError, source=source))
+ content = self.storage.revision_id(index)
+ return self.ok_response(environ, start_response, content)
+
+ def changed(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ revision = self.data_get_string(
+ data, 'revision', default=None, source=source)
+ add,mod,rem = self.storage.changed(revision)
+ content = '\n\n'.join(['\n'.join(p) for p in (add,mod,rem)])
+ return self.ok_response(environ, start_response, content)
+
+ def version(self, environ, start_response):
+ self.check_login(environ)
+ data = self.query_data(environ)
+ source = 'query'
+ revision = self.data_get_string(
+ data, 'revision', default=None, source=source)
+ content = self.storage.storage_version(revision)
+ return self.ok_response(environ, start_response, content)
+
+ # handler utility functions
+ def check_login(self, environ):
+ user = environ.get('be-auth.user', None)
+ if user != None: # we're running under AuthenticationApp
+ if environ['REQUEST_METHOD'] == 'POST':
+ if user == 'guest' or self.storage.is_writeable() == False:
+ raise _Unauthorized() # only non-guests allowed to write
+ # allow read-only commands for all users
+
+
+class Serve (libbe.command.Command):
+ """:class:`~libbe.command.base.Command` wrapper around
+ :class:`ServerApp`.
+ """
+
+ name = 'serve'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='port',
+ help='Bind server to port (%default)',
+ arg=libbe.command.Argument(
+ name='port', metavar='INT', type='int', default=8000)),
+ libbe.command.Option(name='host',
+ help='Set host string (blank for localhost, %default)',
+ arg=libbe.command.Argument(
+ name='host', metavar='HOST', default='')),
+ libbe.command.Option(name='read-only', short_name='r',
+ help='Dissable operations that require writing'),
+ libbe.command.Option(name='ssl', short_name='s',
+ help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'),
+ libbe.command.Option(name='auth', short_name='a',
+ help='Require authentication. FILE should be a file containing colon-separated UNAME:USER:sha1(PASSWORD) lines, for example: "jdoe:John Doe <jdoe@example.com>:read:d99f8e5a4b02dc25f49da2ea67c0034f61779e72"',
+ arg=libbe.command.Argument(
+ name='auth', metavar='FILE', default=None,
+ completion_callback=libbe.command.util.complete_path)),
+ ])
+
+ def _run(self, **params):
+ self._setup_logging()
+ storage = self._get_storage()
+ if params['read-only'] == True:
+ writeable = storage.writeable
+ storage.writeable = False
+ if params['host'] == '':
+ params['host'] = 'localhost'
+ if params['auth'] != None:
+ self._check_restricted_access(storage, params['auth'])
+ users = Users(params['auth'])
+ users.load()
+ app = ServerApp(storage=storage, logger=self.logger)
+ if params['auth'] != None:
+ app = AdminApp(app, users=users, logger=self.logger)
+ app = AuthenticationApp(app, realm=storage.repo,
+ users=users, logger=self.logger)
+ app = UppercaseHeaderApp(app, logger=self.logger)
+ server,details = self._get_server(params, app)
+ details['repo'] = storage.repo
+ try:
+ self._start_server(params, server, details)
+ except KeyboardInterrupt:
+ pass
+ self._stop_server(params, server)
+ if params['read-only'] == True:
+ storage.writeable = writeable
+
+ def _setup_logging(self, log_level=logging.INFO):
+ self.logger = logging.getLogger('be-serve')
+ self.log_level = logging.INFO
+ console = logging.StreamHandler(self.stdout)
+ console.setFormatter(logging.Formatter('%(message)s'))
+ self.logger.addHandler(console)
+ self.logger.propagate = False
+ if log_level is not None:
+ console.setLevel(log_level)
+ self.logger.setLevel(log_level)
+
+ def _get_server(self, params, app):
+ details = {'port':params['port']}
+ if params['ssl'] == True:
+ details['protocol'] = 'HTTPS'
+ if cherrypy == None:
+ raise libbe.command.UserError, \
+ '--ssl requires the cherrypy module'
+ app = ExceptionApp(app, logger=self.logger)
+ server = cherrypy.wsgiserver.CherryPyWSGIServer(
+ (params['host'], params['port']), app)
+ #server.throw_errors = True
+ #server.show_tracebacks = True
+ private_key,certificate = get_cert_filenames(
+ 'be-server', logger=self.logger)
+ if cherrypy.wsgiserver.ssl_builtin == None:
+ server.ssl_module = 'builtin'
+ server.ssl_private_key = private_key
+ server.ssl_certificate = certificate
+ else:
+ server.ssl_adapter = \
+ cherrypy.wsgiserver.ssl_builtin.BuiltinSSLAdapter(
+ certificate=certificate, private_key=private_key)
+ details['socket-name'] = params['host']
+ else:
+ details['protocol'] = 'HTTP'
+ server = wsgiref.simple_server.make_server(
+ params['host'], params['port'], app)
+ details['socket-name'] = server.socket.getsockname()[0]
+ return (server, details)
+
+ def _start_server(self, params, server, details):
+ self.logger.log(self.log_level,
+ 'Serving %(protocol)s on %(socket-name)s port %(port)s ...' \
+ % details)
+ self.logger.log(self.log_level,
+ 'BE repository %(repo)s' % details)
+ if params['ssl'] == True:
+ server.start()
+ else:
+ server.serve_forever()
+
+ def _stop_server(self, params, server):
+ self.logger.log(self.log_level, 'Clossing server')
+ if params['ssl'] == True:
+ server.stop()
+ else:
+ server.server_close()
+
+ def _long_help(self):
+ return """
+Example usage::
+
+ $ be serve
+
+And in another terminal (or after backgrounding the server)::
+
+ $ be --repo http://localhost:8000/ list
+
+If you bind your server to a public interface, take a look at the
+``--read-only`` option or the combined ``--ssl --auth FILE``
+options so other people can't mess with your repository. If you do use
+authentication, you'll need to send in your username and password with,
+for example::
+
+ $ be --repo http://username:password@localhost:8000/ list
+"""
+
+def random_string(length=256):
+ if os.path.exists(os.path.join('dev', 'urandom')):
+ return open("/dev/urandom").read(length)
+ else:
+ import array
+ from random import randint
+ d = array.array('B')
+ for i in xrange(1000000):
+ d.append(randint(0,255))
+ return d.tostring()
+
+if libbe.TESTING == True:
+ class WSGITestCase (unittest.TestCase):
+ def setUp(self):
+ self.logstream = StringIO.StringIO()
+ self.logger = logging.getLogger('be-serve-test')
+ console = logging.StreamHandler(self.logstream)
+ console.setFormatter(logging.Formatter('%(message)s'))
+ self.logger.addHandler(console)
+ self.logger.propagate = False
+ console.setLevel(logging.INFO)
+ self.logger.setLevel(logging.INFO)
+ self.default_environ = { # required by PEP 333
+ 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
+ 'SCRIPT_NAME':'',
+ 'PATH_INFO': '',
+ #'QUERY_STRING':'', # may be empty or absent
+ #'CONTENT_TYPE':'', # may be empty or absent
+ #'CONTENT_LENGTH':'', # may be empty or absent
+ 'SERVER_NAME':'example.com',
+ 'SERVER_PORT':'80',
+ 'SERVER_PROTOCOL':'HTTP/1.1',
+ 'wsgi.version':(1,0),
+ 'wsgi.url_scheme':'http',
+ 'wsgi.input':StringIO.StringIO(),
+ 'wsgi.errors':StringIO.StringIO(),
+ 'wsgi.multithread':False,
+ 'wsgi.multiprocess':False,
+ 'wsgi.run_once':False,
+ }
+ def getURL(self, app, path='/', method='GET', data=None,
+ scheme='http', environ={}):
+ env = copy.copy(self.default_environ)
+ env['PATH_INFO'] = path
+ env['REQUEST_METHOD'] = method
+ env['scheme'] = scheme
+ if data != None:
+ enc_data = urllib.urlencode(data)
+ if method == 'POST':
+ env['CONTENT_LENGTH'] = len(enc_data)
+ env['wsgi.input'] = StringIO.StringIO(enc_data)
+ else:
+ assert method in ['GET', 'HEAD'], method
+ env['QUERY_STRING'] = enc_data
+ for key,value in environ.items():
+ env[key] = value
+ return ''.join(app(env, self.start_response))
+ def start_response(self, status, response_headers, exc_info=None):
+ self.status = status
+ self.response_headers = response_headers
+ self.exc_info = exc_info
+
+ class WSGI_ObjectTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ self.app = WSGI_Object(self.logger)
+ def test_error(self):
+ contents = self.app.error(
+ environ=self.default_environ,
+ start_response=self.start_response,
+ error=123,
+ message='Dummy Error',
+ headers=[('X-Dummy-Header','Dummy Value')])
+ self.failUnless(contents == ['Dummy Error'], contents)
+ self.failUnless(self.status == '123 Dummy Error', self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type','text/plain'),
+ ('X-Dummy-Header','Dummy Value')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ def test_log_request(self):
+ self.app.log_request(
+ environ=self.default_environ, status='-1 OK', bytes=123)
+ log = self.logstream.getvalue()
+ self.failUnless(log.startswith('- -'), log)
+
+ class ExceptionAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ def child_app(environ, start_response):
+ raise ValueError('Dummy Error')
+ self.app = ExceptionApp(child_app, self.logger)
+ def test_traceback(self):
+ try:
+ self.getURL(self.app)
+ except ValueError, e:
+ pass
+ log = self.logstream.getvalue()
+ self.failUnless(log.startswith('Traceback'), log)
+ self.failUnless('child_app' in log, log)
+ self.failUnless('ValueError: Dummy Error' in log, log)
+
+ class AdminAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ self.users = Users()
+ self.users.add_user(
+ User('Aladdin', 'Big Al', password='open sesame'))
+ self.users.add_user(
+ User('guest', 'Guest', password='guestpass'))
+ def child_app(environ, start_response):
+ pass
+ self.app = AdminApp(
+ child_app, users=self.users, logger=self.logger)
+ self.app = AuthenticationApp(
+ self.app, realm='Dummy Realm', users=self.users,
+ logger=self.logger)
+ self.app = UppercaseHeaderApp(self.app, logger=self.logger)
+ def basic_auth(self, uname, password):
+ """HTTP basic authorization string"""
+ return 'Basic %s' % \
+ ('%s:%s' % (uname, password)).encode('base64')
+ def test_new_name(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'name':'Prince Al'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('Aladdin', 'open sesame')})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['Aladdin'].name == 'Prince Al',
+ self.users['Aladdin'].name)
+ self.failUnless(self.users.changed == True,
+ self.users.changed)
+ def test_new_password(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'password':'New Pass'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('Aladdin', 'open sesame')})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['Aladdin'].passhash == \
+ self.users['Aladdin'].hash('New Pass'),
+ self.users['Aladdin'].passhash)
+ self.failUnless(self.users.changed == True,
+ self.users.changed)
+ def test_guest_name(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'name':'SPAM'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('guest', 'guestpass')})
+ self.failUnless(self.status.startswith('403 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['guest'].name == 'Guest',
+ self.users['guest'].name)
+ self.failUnless(self.users.changed == False,
+ self.users.changed)
+ def test_guest_password(self):
+ self.getURL(
+ self.app, '/admin/', method='POST',
+ data={'password':'SPAM'},
+ environ={'HTTP_Authorization':
+ self.basic_auth('guest', 'guestpass')})
+ self.failUnless(self.status.startswith('403 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ self.failUnless(self.users['guest'].name == 'Guest',
+ self.users['guest'].name)
+ self.failUnless(self.users.changed == False,
+ self.users.changed)
+
+ class ServerAppTestCase (WSGITestCase):
+ def setUp(self):
+ WSGITestCase.setUp(self)
+ self.bd = libbe.bugdir.SimpleBugDir(memory=False)
+ self.app = ServerApp(self.bd.storage, logger=self.logger)
+ def tearDown(self):
+ self.bd.cleanup()
+ WSGITestCase.tearDown(self)
+ def test_add_get(self):
+ self.getURL(self.app, '/add/', method='GET')
+ self.failUnless(self.status.startswith('404 '), self.status)
+ self.failUnless(self.response_headers == [
+ ('Content-Type', 'text/plain')],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ def test_add_post(self):
+ self.getURL(self.app, '/add/', method='POST',
+ data={'id':'123456', 'parent':'abc123',
+ 'directory':'True'})
+ self.failUnless(self.status == '200 OK', self.status)
+ self.failUnless(self.response_headers == [],
+ self.response_headers)
+ self.failUnless(self.exc_info == None, self.exc_info)
+ # Note: other methods tested in libbe.storage.http
+
+ # TODO: integration tests on Serve?
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
+
+
+# The following certificate-creation code is adapted From pyOpenSSL's
+# examples.
+
+def get_cert_filenames(server_name, autogenerate=True, logger=None):
+ """
+ Generate private key and certification filenames.
+ get_cert_filenames(server_name) -> (pkey_filename, cert_filename)
+ """
+ pkey_file = '%s.pkey' % server_name
+ cert_file = '%s.cert' % server_name
+ if autogenerate == True:
+ for file in [pkey_file, cert_file]:
+ if not os.path.exists(file):
+ make_certs(server_name, logger)
+ return (pkey_file, cert_file)
+
+def createKeyPair(type, bits):
+ """Create a public/private key pair.
+
+ Returns the public/private key pair in a PKey object.
+
+ Parameters
+ ----------
+ type : TYPE_RSA or TYPE_DSA
+ Key type.
+ bits : int
+ Number of bits to use in the key.
+ """
+ pkey = OpenSSL.crypto.PKey()
+ pkey.generate_key(type, bits)
+ return pkey
+
+def createCertRequest(pkey, digest="md5", **name):
+ """Create a certificate request.
+
+ Returns the certificate request in an X509Req object.
+
+ Parameters
+ ----------
+ pkey : PKey
+ The key to associate with the request.
+ digest : "md5" or ?
+ Digestion method to use for signing, default is "md5",
+ `**name` :
+ The name of the subject of the request, possible.
+ Arguments are:
+
+ ============ ========================
+ C Country name
+ ST State or province name
+ L Locality name
+ O Organization name
+ OU Organizational unit name
+ CN Common name
+ emailAddress E-mail address
+ ============ ========================
+ """
+ req = OpenSSL.crypto.X509Req()
+ subj = req.get_subject()
+
+ for (key,value) in name.items():
+ setattr(subj, key, value)
+
+ req.set_pubkey(pkey)
+ req.sign(pkey, digest)
+ return req
+
+def createCertificate(req, (issuerCert, issuerKey), serial, (notBefore, notAfter), digest="md5"):
+ """Generate a certificate given a certificate request.
+
+ Returns the signed certificate in an X509 object.
+
+ Parameters
+ ----------
+ req :
+ Certificate reqeust to use
+ issuerCert :
+ The certificate of the issuer
+ issuerKey :
+ The private key of the issuer
+ serial :
+ Serial number for the certificate
+ notBefore :
+ Timestamp (relative to now) when the certificate
+ starts being valid
+ notAfter :
+ Timestamp (relative to now) when the certificate
+ stops being valid
+ digest :
+ Digest method to use for signing, default is md5
+ """
+ cert = OpenSSL.crypto.X509()
+ cert.set_serial_number(serial)
+ cert.gmtime_adj_notBefore(notBefore)
+ cert.gmtime_adj_notAfter(notAfter)
+ cert.set_issuer(issuerCert.get_subject())
+ cert.set_subject(req.get_subject())
+ cert.set_pubkey(req.get_pubkey())
+ cert.sign(issuerKey, digest)
+ return cert
+
+def make_certs(server_name, logger=None) :
+ """Generate private key and certification files.
+
+ `mk_certs(server_name) -> (pkey_filename, cert_filename)`
+ """
+ if OpenSSL == None:
+ raise libbe.command.UserError, \
+ 'SSL certificate generation requires the OpenSSL module'
+ pkey_file,cert_file = get_cert_filenames(
+ server_name, autogenerate=False)
+ if logger != None:
+ logger.log(logger._server_level,
+ 'Generating certificates', pkey_file, cert_file)
+ cakey = createKeyPair(OpenSSL.crypto.TYPE_RSA, 1024)
+ careq = createCertRequest(cakey, CN='Certificate Authority')
+ cacert = createCertificate(
+ careq, (careq, cakey), 0, (0, 60*60*24*365*5)) # five years
+ open(pkey_file, 'w').write(OpenSSL.crypto.dump_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, cakey))
+ open(cert_file, 'w').write(OpenSSL.crypto.dump_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, cacert))
diff --git a/libbe/command/set.py b/libbe/command/set.py
new file mode 100644
index 0000000..720dd0f
--- /dev/null
+++ b/libbe/command/set.py
@@ -0,0 +1,144 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# Thomas Gerigk <tgerigk@gmx.de>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import textwrap
+
+import libbe
+import libbe.bugdir
+import libbe.command
+import libbe.command.util
+from libbe.storage.util.settings_object import EMPTY
+
+
+class Set (libbe.command.Command):
+ """Change bug directory settings
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Set(ui=ui)
+
+ >>> ret = ui.run(cmd, args=['target'])
+ None
+ >>> ret = ui.run(cmd, args=['target', 'abcdefg'])
+ >>> ret = ui.run(cmd, args=['target'])
+ abcdefg
+ >>> ret = ui.run(cmd, args=['target', 'none'])
+ >>> ret = ui.run(cmd, args=['target'])
+ None
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'set'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='setting', metavar='SETTING', optional=True,
+ completion_callback=complete_bugdir_settings),
+ libbe.command.Argument(
+ name='value', metavar='VALUE', optional=True)
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ if params['setting'] == None:
+ keys = bugdir.settings_properties
+ keys.sort()
+ for key in keys:
+ print >> self.stdout, \
+ '%16s: %s' % (key, _value_string(bugdir, key))
+ return 0
+ if params['setting'] not in bugdir.settings_properties:
+ msg = 'Invalid setting %s\n' % params['setting']
+ msg += 'Allowed settings:\n '
+ msg += '\n '.join(bugdir.settings_properties)
+ raise libbe.command.UserError(msg)
+ if params['value'] == None:
+ print _value_string(bugdir, params['setting'])
+ else:
+ if params['value'] == 'none':
+ params['value'] = EMPTY
+ old_setting = bugdir.settings.get(params['setting'])
+ attr = bugdir._setting_name_to_attr_name(params['setting'])
+ setattr(bugdir, attr, params['value'])
+ return 0
+
+ def _long_help(self):
+ return """
+Show or change per-tree settings.
+
+If name and value are supplied, the name is set to a new value.
+If no value is specified, the current value is printed.
+If no arguments are provided, all names and values are listed.
+
+To unset a setting, set it to "none".
+
+Allowed settings are:
+
+%s""" % ('\n'.join(get_bugdir_settings()),)
+
+def get_bugdir_settings():
+ settings = []
+ for s in libbe.bugdir.BugDir.settings_properties:
+ settings.append(s)
+ settings.sort()
+ documented_settings = []
+ for s in settings:
+ set = getattr(libbe.bugdir.BugDir, s)
+ dstr = set.__doc__.strip()
+ # per-setting comment adjustments
+ if s == 'vcs_name':
+ lines = dstr.split('\n')
+ while lines[0].startswith('This property defaults to') == False:
+ lines.pop(0)
+ assert len(lines) != None, \
+ 'Unexpected vcs_name docstring:\n "%s"' % dstr
+ lines.insert(
+ 0, 'The name of the revision control system to use.\n')
+ dstr = '\n'.join(lines)
+ doc = textwrap.wrap(dstr, width=70, initial_indent=' ',
+ subsequent_indent=' ')
+ documented_settings.append('%s\n%s' % (s, '\n'.join(doc)))
+ return documented_settings
+
+def _value_string(bugdir, setting):
+ val = bugdir.settings.get(setting, EMPTY)
+ if val == EMPTY:
+ default = getattr(bugdir, bugdir._setting_name_to_attr_name(setting))
+ if default not in [None, EMPTY]:
+ val = 'None (%s)' % default
+ else:
+ val = None
+ return str(val)
+
+def complete_bugdir_settings(command, argument, fragment=None):
+ """
+ List possible command completions for fragment.
+
+ Neither the command nor argument arguments are used.
+ """
+ return libbe.bugdir.BugDir.settings_properties
diff --git a/libbe/command/severity.py b/libbe/command/severity.py
new file mode 100644
index 0000000..27898f7
--- /dev/null
+++ b/libbe/command/severity.py
@@ -0,0 +1,98 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# Thomas Gerigk <tgerigk@gmx.de>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.command.util
+
+
+class Severity (libbe.command.Command):
+ """Change a bug's severity level
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> cmd = Severity(ui=ui)
+
+ >>> bd.bug_from_uuid('a').severity
+ 'minor'
+ >>> ret = ui.run(cmd, args=['wishlist', '/a'])
+ >>> bd.flush_reload()
+ >>> bd.bug_from_uuid('a').severity
+ 'wishlist'
+ >>> ret = ui.run(cmd, args=['none', '/a'])
+ Traceback (most recent call last):
+ UserError: Invalid severity level: none
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'severity'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='severity', metavar='SEVERITY', default=None,
+ completion_callback=libbe.command.util.complete_severity),
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID', default=None,
+ repeatable=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ for bug_id in params['bug-id']:
+ bug,dummy_comment = \
+ libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
+ if bug.severity != params['severity']:
+ try:
+ bug.severity = params['severity']
+ except ValueError, e:
+ if e.name != 'severity':
+ raise e
+ raise libbe.command.UserError(
+ 'Invalid severity level: %s' % e.value)
+ return 0
+
+ def _long_help(self):
+ ret = ["""
+Show or change a bug's severity level.
+
+If no severity is specified, the current value is printed. If a severity level
+is specified, it will be assigned to the bug.
+
+Severity levels are:
+"""]
+ try: # See if there are any per-tree severity configurations
+ bd = self._get_bugdir()
+ except NotImplementedError:
+ pass # No tree, just show the defaults
+ longest_severity_len = max([len(s) for s in libbe.bug.severity_values])
+ for severity in libbe.bug.severity_values :
+ description = libbe.bug.severity_description[severity]
+ ret.append('%*s : %s\n' \
+ % (longest_severity_len, severity, description))
+ return ''.join(ret)
diff --git a/libbe/command/show.py b/libbe/command/show.py
new file mode 100644
index 0000000..ab3be73
--- /dev/null
+++ b/libbe/command/show.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# Thomas Gerigk <tgerigk@gmx.de>
+# Thomas Habets <thomas@habets.pp.se>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import sys
+
+import libbe
+import libbe.command
+import libbe.command.util
+import libbe.util.id
+import libbe.version
+import libbe._version
+
+
+class Show (libbe.command.Command):
+ """Show a particular bug, comment, or combination of both.
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> io.stdout.encoding = 'ascii'
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> cmd = Show(ui=ui)
+
+ >>> ret = ui.run(cmd, args=['/a',]) # doctest: +ELLIPSIS
+ ID : a
+ Short name : abc/a
+ Severity : minor
+ Status : open
+ Assigned :
+ Reporter :
+ Creator : John Doe <jdoe@example.com>
+ Created : ...
+ Bug A
+ <BLANKLINE>
+
+ >>> ret = ui.run(cmd, {'xml':True}, ['/a']) # doctest: +ELLIPSIS
+ <?xml version="1.0" encoding="..." ?>
+ <be-xml>
+ <version>
+ <tag>...</tag>
+ <branch-nick>...</branch-nick>
+ <revno>...</revno>
+ <revision-id>...</revision-id>
+ </version>
+ <bug>
+ <uuid>a</uuid>
+ <short-name>abc/a</short-name>
+ <severity>minor</severity>
+ <status>open</status>
+ <creator>John Doe &lt;jdoe@example.com&gt;</creator>
+ <created>Thu, 01 Jan 1970 00:00:00 +0000</created>
+ <summary>Bug A</summary>
+ </bug>
+ </be-xml>
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'show'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='xml', short_name='x',
+ help='Dump as XML'),
+ libbe.command.Option(name='only-raw-body',
+ help="When printing only a single comment, just print it's"
+ " body. This allows extraction of non-text content types."),
+ libbe.command.Option(name='no-comments', short_name='c',
+ help="Disable comment output. This is useful if you just "
+ "want more details on a bug's current status."),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='id', metavar='ID', default=None,
+ optional=True, repeatable=True,
+ completion_callback=libbe.command.util.complete_bug_comment_id),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ if params['only-raw-body'] == True:
+ if len(params['id']) != 1:
+ raise libbe.command.UsageError(
+ 'only one ID accepted with --only-raw-body')
+ bug,comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['id'][0])
+ if comment == bug.comment_root:
+ raise libbe.command.UsageError(
+ "--only-raw-body requires a comment ID, not '%s'"
+ % params['id'][0])
+ sys.__stdout__.write(comment.body)
+ return 0
+ print >> self.stdout, \
+ output(bugdir, params['id'], encoding=self.stdout.encoding,
+ as_xml=params['xml'],
+ with_comments=not params['no-comments'])
+ return 0
+
+ def _long_help(self):
+ return """
+Show all information about the bugs or comments whose IDs are given.
+If no IDs are given, show the entire repository.
+
+Without the --xml flag set, it's probably not a good idea to mix bug
+and comment IDs in a single call, but you're free to do so if you
+like. With the --xml flag set, there will never be any root comments,
+so mix and match away (the bug listings for directly requested
+comments will be restricted to the bug uuid and the requested
+comment(s)).
+
+Directly requested comments will be grouped by their parent bug and
+placed at the end of the output, so the ordering may not match the
+order of the listed IDs.
+"""
+
+def _sort_ids(bugdir, ids, with_comments=True):
+ bugs = []
+ root_comments = {}
+ for id in ids:
+ p = libbe.util.id.parse_user(bugdir, id)
+ if p['type'] == 'bug':
+ bugs.append(p['bug'])
+ elif with_comments == True:
+ if p['bug'] not in root_comments:
+ root_comments[p['bug']] = [p['comment']]
+ else:
+ root_comments[p['bug']].append(p['comment'])
+ for bugname in root_comments.keys():
+ assert bugname not in bugs, \
+ 'specifically requested both #/%s/%s# and #/%s#' \
+ % (bugname, root_comments[bugname][0], bugname)
+ return (bugs, root_comments)
+
+def _xml_header(encoding):
+ lines = ['<?xml version="1.0" encoding="%s" ?>' % encoding,
+ '<be-xml>',
+ ' <version>',
+ ' <tag>%s</tag>' % libbe.version.version()]
+ for tag in ['branch-nick', 'revno', 'revision-id']:
+ value = libbe._version.version_info[tag.replace('-', '_')]
+ lines.append(' <%s>%s</%s>' % (tag, value, tag))
+ lines.append(' </version>')
+ return lines
+
+def _xml_footer():
+ return ['</be-xml>']
+
+def output(bd, ids, encoding, as_xml=True, with_comments=True):
+ if ids == None or len(ids) == 0:
+ bd.load_all_bugs()
+ ids = [bug.id.user() for bug in bd]
+ bugs,root_comments = _sort_ids(bd, ids, with_comments)
+ lines = []
+ if as_xml:
+ lines.extend(_xml_header(encoding))
+ else:
+ spaces_left = len(ids) - 1
+ for bugname in bugs:
+ bug = bd.bug_from_uuid(bugname)
+ if as_xml:
+ lines.append(bug.xml(indent=2, show_comments=with_comments))
+ else:
+ lines.append(bug.string(show_comments=with_comments))
+ if spaces_left > 0:
+ spaces_left -= 1
+ lines.append('') # add a blank line between bugs/comments
+ for bugname,comments in root_comments.items():
+ bug = bd.bug_from_uuid(bugname)
+ if as_xml:
+ lines.extend([' <bug>', ' <uuid>%s</uuid>' % bug.uuid])
+ for commname in comments:
+ try:
+ comment = bug.comment_root.comment_from_uuid(commname)
+ except KeyError, e:
+ raise libbe.command.UserError(e.message)
+ if as_xml:
+ lines.append(comment.xml(indent=4))
+ else:
+ lines.append(comment.string())
+ if spaces_left > 0:
+ spaces_left -= 1
+ lines.append('') # add a blank line between bugs/comments
+ if as_xml:
+ lines.append('</bug>')
+ if as_xml:
+ lines.extend(_xml_footer())
+ return '\n'.join(lines)
diff --git a/libbe/command/status.py b/libbe/command/status.py
new file mode 100644
index 0000000..1659f75
--- /dev/null
+++ b/libbe/command/status.py
@@ -0,0 +1,108 @@
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.command.util
+
+
+class Status (libbe.command.Command):
+ """Change a bug's status level
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> cmd = Status(ui=ui)
+ >>> cmd._storage = bd.storage
+
+ >>> bd.bug_from_uuid('a').status
+ 'open'
+ >>> ret = ui.run(cmd, args=['closed', '/a'])
+ >>> bd.flush_reload()
+ >>> bd.bug_from_uuid('a').status
+ 'closed'
+ >>> ret = ui.run(cmd, args=['none', '/a'])
+ Traceback (most recent call last):
+ UserError: Invalid status level: none
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'status'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='status', metavar='STATUS', default=None,
+ completion_callback=libbe.command.util.complete_status),
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID', default=None,
+ repeatable=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ for bug_id in params['bug-id']:
+ bug,dummy_comment = \
+ libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
+ if bug.status != params['status']:
+ try:
+ bug.status = params['status']
+ except ValueError, e:
+ if e.name != 'status':
+ raise e
+ raise libbe.command.UserError(
+ 'Invalid status level: %s' % e.value)
+ return 0
+
+ def _long_help(self):
+ longest_status_len = max([len(s) for s in libbe.bug.status_values])
+ active_statuses = []
+ for status in libbe.bug.active_status_values :
+ description = libbe.bug.status_description[status]
+ s = '%*s : %s' % (longest_status_len, status, description)
+ active_statuses.append(s)
+ inactive_statuses = []
+ for status in libbe.bug.inactive_status_values :
+ description = libbe.bug.status_description[status]
+ s = '%*s : %s' % (longest_status_len, status, description)
+ inactive_statuses.append(s)
+ ret = """
+Show or change a bug's status.
+
+If no status is specified, the current value is printed. If a status
+is specified, it will be assigned to the bug.
+
+There are two classes of statuses, active and inactive, which are only
+important for commands like "be list" that show only active bugs by
+default.
+
+Active status levels are:
+ %s
+Inactive status levels are:
+ %s
+
+You can overide the list of allowed statuses on a per-repository basis.
+See "be set --help" for more details.
+""" % ('\n '.join(active_statuses), '\n '.join(inactive_statuses))
+ return ret
diff --git a/libbe/command/subscribe.py b/libbe/command/subscribe.py
new file mode 100644
index 0000000..d1cf72e
--- /dev/null
+++ b/libbe/command/subscribe.py
@@ -0,0 +1,385 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import copy
+import os
+
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.diff
+import libbe.command.util
+import libbe.util.id
+import libbe.util.tree
+
+
+TAG="SUBSCRIBE:"
+
+
+class Subscribe (libbe.command.Command):
+ """(Un)subscribe to change notification
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> cmd = Subscribe(ui=ui)
+
+ >>> a = bd.bug_from_uuid('a')
+ >>> print a.extra_strings
+ []
+ >>> ret = ui.run(cmd, {'subscriber':'John Doe <j@doe.com>'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc/a:
+ John Doe <j@doe.com> all *
+ >>> bd.flush_reload()
+ >>> a = bd.bug_from_uuid('a')
+ >>> print a.extra_strings
+ ['SUBSCRIBE:John Doe <j@doe.com>\\tall\\t*']
+ >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'servers':'a.com,b.net'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc/a:
+ Jane Doe <J@doe.com> all a.com,b.net
+ John Doe <j@doe.com> all *
+ >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'servers':'a.edu'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc/a:
+ Jane Doe <J@doe.com> all a.com,a.edu,b.net
+ John Doe <j@doe.com> all *
+ >>> ret = ui.run(cmd, {'unsubscribe':True, 'subscriber':'Jane Doe <J@doe.com>', 'servers':'a.com'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc/a:
+ Jane Doe <J@doe.com> all a.edu,b.net
+ John Doe <j@doe.com> all *
+ >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'servers':'*'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc/a:
+ Jane Doe <J@doe.com> all *
+ John Doe <j@doe.com> all *
+ >>> ret = ui.run(cmd, {'unsubscribe':True, 'subscriber':'Jane Doe <J@doe.com>'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc/a:
+ John Doe <j@doe.com> all *
+ >>> ret = ui.run(cmd, {'unsubscribe':True, 'subscriber':'John Doe <j@doe.com>'}, ['/a'])
+ >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'types':'new'}, ['DIR']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for bug directory:
+ Jane Doe <J@doe.com> new *
+ >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>'}, ['DIR']) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for bug directory:
+ Jane Doe <J@doe.com> all *
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'subscribe'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='unsubscribe', short_name='u',
+ help='Unsubscribe instead of subscribing'),
+ libbe.command.Option(name='list-all', short_name='a',
+ help='List all subscribers (no ID argument, read only action)'),
+ libbe.command.Option(name='list', short_name='l',
+ help='List subscribers (read only action).'),
+ libbe.command.Option(name='subscriber', short_name='s',
+ help='Email address of the subscriber (defaults to your user id).',
+ arg=libbe.command.Argument(
+ name='subscriber', metavar='EMAIL')),
+ libbe.command.Option(name='servers', short_name='S',
+ help='Servers from which you want notification.',
+ arg=libbe.command.Argument(
+ name='servers', metavar='STRING')),
+ libbe.command.Option(name='types', short_name='t',
+ help='Types of changes you wish to be notified about.',
+ arg=libbe.command.Argument(
+ name='types', metavar='STRING')),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='id', metavar='ID', default=tuple(),
+ optional=True, repeatable=True,
+ completion_callback=libbe.command.util.complete_bug_comment_id),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ if params['list-all'] == True or params['list'] == True:
+ writeable = bugdir.storage.writeable
+ bugdir.storage.writeable = False
+ if params['list-all'] == True:
+ assert len(params['id']) == 0, params['id']
+ subscriber = params['subscriber']
+ if subscriber == None:
+ subscriber = self._get_user_id()
+ if params['unsubscribe'] == True:
+ if params['servers'] == None:
+ params['servers'] = 'INVALID'
+ if params['types'] == None:
+ params['types'] = 'INVALID'
+ else:
+ if params['servers'] == None:
+ params['servers'] = '*'
+ if params['types'] == None:
+ params['types'] = 'all'
+ servers = params['servers'].split(',')
+ types = params['types'].split(',')
+
+ if len(params['id']) == 0:
+ params['id'] = [libbe.diff.BUGDIR_ID]
+ for _id in params['id']:
+ if _id == libbe.diff.BUGDIR_ID: # directory-wide subscriptions
+ type_root = libbe.diff.BUGDIR_TYPE_ALL
+ entity = bugdir
+ entity_name = 'bug directory'
+ else: # bug-specific subscriptions
+ type_root = libbe.diff.BUG_TYPE_ALL
+ bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, _id)
+ entity = bug
+ entity_name = bug.id.user()
+ if params['list-all'] == True:
+ entity_name = 'anything in the bug directory'
+ types = [libbe.diff.type_from_name(name, type_root, default=libbe.diff.INVALID_TYPE,
+ default_ok=params['unsubscribe'])
+ for name in types]
+ estrs = entity.extra_strings
+ if params['list'] == True or params['list-all'] == True:
+ pass
+ else: # alter subscriptions
+ if params['unsubscribe'] == True:
+ estrs = unsubscribe(estrs, subscriber, types, servers, type_root)
+ else: # add the tag
+ estrs = subscribe(estrs, subscriber, types, servers, type_root)
+ entity.extra_strings = estrs # reassign to notice change
+
+ if params['list-all'] == True:
+ bugdir.load_all_bugs()
+ subscriptions = get_bugdir_subscribers(bugdir, servers[0])
+ else:
+ subscriptions = []
+ for estr in entity.extra_strings:
+ if estr.startswith(TAG):
+ subscriptions.append(estr[len(TAG):])
+
+ if len(subscriptions) > 0:
+ print >> self.stdout, 'Subscriptions for %s:' % entity_name
+ print >> self.stdout, '\n'.join(subscriptions)
+ if params['list-all'] == True or params['list'] == True:
+ bugdir.storage.writeable = writeable
+ return 0
+
+ def _long_help(self):
+ return """
+ID can be either a bug id, or blank/"DIR", in which case it refers to the
+whole bug directory.
+
+SERVERS specifies the servers from which you would like to receive
+notification. Multiple severs may be specified in a comma-separated
+list, or you can use "*" to match all servers (the default). If you
+have not selected a server, it should politely refrain from notifying
+you of changes, although there is no way to guarantee this behavior.
+
+Available TYPES:
+ For bugs:
+%s
+ For %s:
+%s
+
+For unsubscription, any listed SERVERS and TYPES are removed from your
+subscription. Either the catch-all server "*" or type "%s" will
+remove SUBSCRIBER entirely from the specified ID.
+
+This command is intended for use primarily by public interfaces, since
+if you're just hacking away on your private repository, you'll known
+what's changed ;). This command just (un)sets the appropriate
+subscriptions, and leaves it up to each interface to perform the
+notification.
+""" % (libbe.diff.BUG_TYPE_ALL.string_tree(6), libbe.diff.BUGDIR_ID,
+ libbe.diff.BUGDIR_TYPE_ALL.string_tree(6),
+ libbe.diff.BUGDIR_TYPE_ALL)
+
+
+# internal helper functions
+
+def _generate_string(subscriber, types, servers):
+ types = sorted([str(t) for t in types])
+ servers = sorted(servers)
+ return "%s%s\t%s\t%s" % (TAG,subscriber,",".join(types),",".join(servers))
+
+def _parse_string(string, type_root):
+ assert string.startswith(TAG), string
+ string = string[len(TAG):]
+ subscriber,types,servers = string.split("\t")
+ types = [libbe.diff.type_from_name(name, type_root) for name in types.split(",")]
+ return (subscriber,types,servers.split(","))
+
+def _get_subscriber(extra_strings, subscriber, type_root):
+ for i,string in enumerate(extra_strings):
+ if string.startswith(TAG):
+ s,ts,srvs = _parse_string(string, type_root)
+ if s == subscriber:
+ return i,s,ts,srvs # match!
+ return None # no match
+
+# functions exposed to other modules
+
+def subscribe(extra_strings, subscriber, types, servers, type_root):
+ args = _get_subscriber(extra_strings, subscriber, type_root)
+ if args == None: # no match
+ extra_strings.append(_generate_string(subscriber, types, servers))
+ return extra_strings
+ # Alter matched string
+ i,s,ts,srvs = args
+ for t in types:
+ if t not in ts:
+ ts.append(t)
+ # remove descendant types
+ all_ts = copy.copy(ts)
+ for t in all_ts:
+ for tt in all_ts:
+ if tt in ts and t.has_descendant(tt):
+ ts.remove(tt)
+ if "*" in servers+srvs:
+ srvs = ["*"]
+ else:
+ srvs = list(set(servers+srvs))
+ extra_strings[i] = _generate_string(subscriber, ts, srvs)
+ return extra_strings
+
+def unsubscribe(extra_strings, subscriber, types, servers, type_root):
+ args = _get_subscriber(extra_strings, subscriber, type_root)
+ if args == None: # no match
+ return extra_strings # pass
+ # Remove matched string
+ i,s,ts,srvs = args
+ all_ts = copy.copy(ts)
+ for t in types:
+ for tt in all_ts:
+ if tt in ts and t.has_descendant(tt):
+ ts.remove(tt)
+ if "*" in servers+srvs:
+ srvs = []
+ else:
+ for srv in servers:
+ if srv in srvs:
+ srvs.remove(srv)
+ if len(ts) == 0 or len(srvs) == 0:
+ extra_strings.pop(i)
+ else:
+ extra_strings[i] = _generate_string(subscriber, ts, srvs)
+ return extra_strings
+
+def get_subscribers(extra_strings, type, server, type_root,
+ match_ancestor_types=False,
+ match_descendant_types=False):
+ """
+ Set match_ancestor_types=True if you want to find eveyone who
+ cares about your particular type.
+
+ Set match_descendant_types=True if you want to find subscribers
+ who may only care about some subset of your type. This is useful
+ for generating lists of all the subscribers in a given set of
+ extra_strings.
+
+ >>> def sgs(*args, **kwargs):
+ ... return sorted(get_subscribers(*args, **kwargs))
+ >>> es = []
+ >>> es = subscribe(es, "John Doe <j@doe.com>", [libbe.diff.BUGDIR_TYPE_ALL],
+ ... ["a.com"], libbe.diff.BUGDIR_TYPE_ALL)
+ >>> es = subscribe(es, "Jane Doe <J@doe.com>", [libbe.diff.BUGDIR_TYPE_NEW],
+ ... ["*"], libbe.diff.BUGDIR_TYPE_ALL)
+ >>> sgs(es, libbe.diff.BUGDIR_TYPE_ALL, "a.com", libbe.diff.BUGDIR_TYPE_ALL)
+ ['John Doe <j@doe.com>']
+ >>> sgs(es, libbe.diff.BUGDIR_TYPE_ALL, "a.com", libbe.diff.BUGDIR_TYPE_ALL,
+ ... match_descendant_types=True)
+ ['Jane Doe <J@doe.com>', 'John Doe <j@doe.com>']
+ >>> sgs(es, libbe.diff.BUGDIR_TYPE_ALL, "b.net", libbe.diff.BUGDIR_TYPE_ALL,
+ ... match_descendant_types=True)
+ ['Jane Doe <J@doe.com>']
+ >>> sgs(es, libbe.diff.BUGDIR_TYPE_NEW, "a.com", libbe.diff.BUGDIR_TYPE_ALL)
+ ['Jane Doe <J@doe.com>']
+ >>> sgs(es, libbe.diff.BUGDIR_TYPE_NEW, "a.com", libbe.diff.BUGDIR_TYPE_ALL,
+ ... match_ancestor_types=True)
+ ['Jane Doe <J@doe.com>', 'John Doe <j@doe.com>']
+ """
+ for string in extra_strings:
+ if not string.startswith(TAG):
+ continue
+ subscriber,types,servers = _parse_string(string, type_root)
+ type_match = False
+ if type in types:
+ type_match = True
+ if type_match == False and match_ancestor_types == True:
+ for t in types:
+ if t.has_descendant(type):
+ type_match = True
+ break
+ if type_match == False and match_descendant_types == True:
+ for t in types:
+ if type.has_descendant(t):
+ type_match = True
+ break
+ server_match = False
+ if server in servers or servers == ["*"] or server == "*":
+ server_match = True
+ if type_match == True and server_match == True:
+ yield subscriber
+
+def get_bugdir_subscribers(bugdir, server):
+ """
+ I have a bugdir. Who cares about it, and what do they care about?
+ Returns a dict of dicts:
+ subscribers[user][id] = types
+ where id is either a bug.uuid (in the case of a bug subscription)
+ or "%(bugdir_id)s" (in the case of a bugdir subscription).
+
+ Only checks bugs that are currently in memory, so you might want
+ to call bugdir.load_all_bugs() first.
+
+ >>> bd = bugdir.SimpleBugDir(sync_with_disk=False)
+ >>> a = bd.bug_from_shortname("a")
+ >>> bd.extra_strings = subscribe(bd.extra_strings, "John Doe <j@doe.com>",
+ ... [libbe.diff.BUGDIR_TYPE_ALL], ["a.com"], libbe.diff.BUGDIR_TYPE_ALL)
+ >>> bd.extra_strings = subscribe(bd.extra_strings, "Jane Doe <J@doe.com>",
+ ... [libbe.diff.BUGDIR_TYPE_NEW], ["*"], libbe.diff.BUGDIR_TYPE_ALL)
+ >>> a.extra_strings = subscribe(a.extra_strings, "John Doe <j@doe.com>",
+ ... [libbe.diff.BUG_TYPE_ALL], ["a.com"], libbe.diff.BUG_TYPE_ALL)
+ >>> subscribers = get_bugdir_subscribers(bd, "a.com")
+ >>> subscribers["Jane Doe <J@doe.com>"]["%(bugdir_id)s"]
+ [<SubscriptionType: new>]
+ >>> subscribers["John Doe <j@doe.com>"]["%(bugdir_id)s"]
+ [<SubscriptionType: all>]
+ >>> subscribers["John Doe <j@doe.com>"]["a"]
+ [<SubscriptionType: all>]
+ >>> get_bugdir_subscribers(bd, "b.net")
+ {'Jane Doe <J@doe.com>': {'%(bugdir_id)s': [<SubscriptionType: new>]}}
+ >>> bd.cleanup()
+ """ % {'bugdir_id':libbe.diff.BUGDIR_ID}
+ subscribers = {}
+ for sub in get_subscribers(bugdir.extra_strings, libbe.diff.BUGDIR_TYPE_ALL,
+ server, libbe.diff.BUGDIR_TYPE_ALL,
+ match_descendant_types=True):
+ i,s,ts,srvs = _get_subscriber(bugdir.extra_strings, sub,
+ libbe.diff.BUGDIR_TYPE_ALL)
+ subscribers[sub] = {"DIR":ts}
+ for bug in bugdir:
+ for sub in get_subscribers(bug.extra_strings, libbe.diff.BUG_TYPE_ALL,
+ server, libbe.diff.BUG_TYPE_ALL,
+ match_descendant_types=True):
+ i,s,ts,srvs = _get_subscriber(bug.extra_strings, sub,
+ libbe.diff.BUG_TYPE_ALL)
+ if sub in subscribers:
+ subscribers[sub][bug.uuid] = ts
+ else:
+ subscribers[sub] = {bug.uuid:ts}
+ return subscribers
diff --git a/libbe/command/tag.py b/libbe/command/tag.py
new file mode 100644
index 0000000..f4dc3ba
--- /dev/null
+++ b/libbe/command/tag.py
@@ -0,0 +1,152 @@
+# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.command
+import libbe.command.util
+
+
+TAG_TAG = 'TAG:'
+
+
+class Tag (libbe.command.Command):
+ __doc__ = """Tag a bug, or search bugs for tags
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> cmd = Tag(ui=ui)
+
+ >>> a = bd.bug_from_uuid('a')
+ >>> print a.extra_strings
+ []
+ >>> ret = ui.run(cmd, args=['/a', 'GUI'])
+ Tags for abc/a:
+ GUI
+ >>> bd.flush_reload()
+ >>> a = bd.bug_from_uuid('a')
+ >>> print a.extra_strings
+ ['%(tag_tag)sGUI']
+ >>> ret = ui.run(cmd, args=['/a', 'later'])
+ Tags for abc/a:
+ GUI
+ later
+ >>> ret = ui.run(cmd, args=['/a'])
+ Tags for abc/a:
+ GUI
+ later
+ >>> ret = ui.run(cmd, {'list':True})
+ GUI
+ later
+ >>> ret = ui.run(cmd, args=['/a', 'Alphabetically first'])
+ Tags for abc/a:
+ Alphabetically first
+ GUI
+ later
+ >>> bd.flush_reload()
+ >>> a = bd.bug_from_uuid('a')
+ >>> print a.extra_strings
+ ['%(tag_tag)sAlphabetically first', '%(tag_tag)sGUI', '%(tag_tag)slater']
+ >>> a.extra_strings = []
+ >>> print a.extra_strings
+ []
+ >>> ret = ui.run(cmd, args=['/a'])
+ >>> bd.flush_reload()
+ >>> a = bd.bug_from_uuid('a')
+ >>> print a.extra_strings
+ []
+ >>> ret = ui.run(cmd, args=['/a', 'Alphabetically first'])
+ Tags for abc/a:
+ Alphabetically first
+ >>> ret = ui.run(cmd, {'remove':True}, ['/a', 'Alphabetically first'])
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """ % {'tag_tag':TAG_TAG}
+ name = 'tag'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='remove', short_name='r',
+ help='Remove TAG (instead of adding it)'),
+ libbe.command.Option(name='list', short_name='l',
+ help='List all available tags and exit'),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='id', metavar='BUG-ID', optional=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ libbe.command.Argument(
+ name='tag', metavar='TAG', default=tuple(),
+ optional=True, repeatable=True),
+ ])
+
+ def _run(self, **params):
+ if params['id'] == None and params['list'] == False:
+ raise libbe.command.UserError('Please specify a bug id.')
+ if params['id'] != None and params['list'] == True:
+ raise libbe.command.UserError(
+ 'Do not specify a bug id with the --list option.')
+ bugdir = self._get_bugdir()
+ if params['list'] == True:
+ bugdir.load_all_bugs()
+ tags = []
+ for bug in bugdir:
+ for estr in bug.extra_strings:
+ if estr.startswith(TAG_TAG):
+ tag = estr[len(TAG_TAG):]
+ if tag not in tags:
+ tags.append(tag)
+ tags.sort()
+ if len(tags) > 0:
+ print >> self.stdout, '\n'.join(tags)
+ return 0
+
+ bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['id'])
+ if len(params['tag']) > 0:
+ estrs = bug.extra_strings
+ for tag in params['tag']:
+ tag_string = '%s%s' % (TAG_TAG, tag)
+ if params['remove'] == True:
+ estrs.remove(tag_string)
+ else: # add the tag
+ estrs.append(tag_string)
+ bug.extra_strings = estrs # reassign to notice change
+
+ tags = []
+ for estr in bug.extra_strings:
+ if estr.startswith(TAG_TAG):
+ tags.append(estr[len(TAG_TAG):])
+
+ if len(tags) > 0:
+ print "Tags for %s:" % bug.id.user()
+ print '\n'.join(tags)
+ return 0
+
+ def _long_help(self):
+ return """
+If TAG is given, add TAG to BUG-ID. If it is not specified, just
+print the tags for BUG-ID.
+
+To search for bugs with a particular tag, try
+ $ be list --extra-strings %s<your-tag>
+""" % TAG_TAG
diff --git a/libbe/command/target.py b/libbe/command/target.py
new file mode 100644
index 0000000..f8a956b
--- /dev/null
+++ b/libbe/command/target.py
@@ -0,0 +1,209 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Chris Ball <cjb@laptop.org>
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# Thomas Gerigk <tgerigk@gmx.de>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import libbe
+import libbe.command
+import libbe.command.util
+import libbe.command.depend
+
+
+class Target (libbe.command.Command):
+ """Assorted bug target manipulations and queries
+
+ >>> import os, StringIO, sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> io = libbe.command.StringInputOutput()
+ >>> io.stdout = sys.stdout
+ >>> ui = libbe.command.UserInterface(io=io)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> cmd = Target(ui=ui)
+
+ >>> ret = ui.run(cmd, args=['/a'])
+ No target assigned.
+ >>> ret = ui.run(cmd, args=['/a', 'tomorrow'])
+ >>> ret = ui.run(cmd, args=['/a'])
+ tomorrow
+
+ >>> ui.io.stdout = StringIO.StringIO()
+ >>> ret = ui.run(cmd, {'resolve':True}, ['tomorrow'])
+ >>> output = ui.io.get_stdout().strip()
+ >>> bd.flush_reload()
+ >>> target = bd.bug_from_uuid(output)
+ >>> print target.summary
+ tomorrow
+ >>> print target.severity
+ target
+
+ >>> ui.io.stdout = sys.stdout
+ >>> ret = ui.run(cmd, args=['/a', 'none'])
+ >>> ret = ui.run(cmd, args=['/a'])
+ No target assigned.
+ >>> ui.cleanup()
+ >>> bd.cleanup()
+ """
+ name = 'target'
+
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='resolve', short_name='r',
+ help="Print the UUID for the target bug whose summary "
+ "matches TARGET. If TARGET is not given, print the UUID "
+ "of the current bugdir target."),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='id', metavar='BUG-ID', optional=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ libbe.command.Argument(
+ name='target', metavar='TARGET', optional=True,
+ completion_callback=complete_target),
+ ])
+
+ def _run(self, **params):
+ if params['resolve'] == False:
+ if params['id'] == None:
+ raise libbe.command.UserError('Please specify a bug id.')
+ else:
+ if params['target'] != None:
+ raise libbe.command.UserError('Too many arguments')
+ params['target'] = params.pop('id')
+ bugdir = self._get_bugdir()
+ if params['resolve'] == True:
+ bug = bug_from_target_summary(bugdir, params['target'])
+ if bug == None:
+ print >> self.stdout, 'No target assigned.'
+ else:
+ print >> self.stdout, bug.uuid
+ return 0
+ bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['id'])
+ if params['target'] == None:
+ target = bug_target(bugdir, bug)
+ if target == None:
+ print >> self.stdout, 'No target assigned.'
+ else:
+ print >> self.stdout, target.summary
+ else:
+ if params['target'] == 'none':
+ target = remove_target(bugdir, bug)
+ else:
+ target = add_target(bugdir, bug, params['target'])
+ return 0
+
+ def usage(self):
+ return 'usage: be %(name)s BUG-ID [TARGET]\nor: be %(name)s --resolve [TARGET]' \
+ % vars(self.__class__)
+
+ def _long_help(self):
+ return """
+Assorted bug target manipulations and queries.
+
+If no target is specified, the bug's current target is printed. If
+TARGET is specified, it will be assigned to the bug, creating a new
+target bug if necessary.
+
+Targets are free-form; any text may be specified. They will generally
+be milestone names or release numbers. The value "none" can be used
+to unset the target.
+
+In the alternative `be target --resolve TARGET` form, print the UUID
+of the target-bug with summary TARGET. If target is not given, return
+use the bugdir's current target (see `be set`).
+
+If you want to list all bugs blocking the current target, try
+ $ be depend --status -closed,fixed,wontfix --severity -target \
+ $(be target --resolve)
+
+If you want to set the current bugdir target by summary (rather than
+by UUID), try
+ $ be set target $(be target --resolve SUMMARY)
+"""
+
+def bug_from_target_summary(bugdir, summary=None):
+ if summary == None:
+ if bugdir.target == None:
+ return None
+ else:
+ return bugdir.bug_from_uuid(bugdir.target)
+ matched = []
+ for uuid in bugdir.uuids():
+ bug = bugdir.bug_from_uuid(uuid)
+ if bug.severity == 'target' and bug.summary == summary:
+ matched.append(bug)
+ if len(matched) == 0:
+ return None
+ if len(matched) > 1:
+ raise Exception('Several targets with same summary: %s'
+ % '\n '.join([bug.uuid for bug in matched]))
+ return matched[0]
+
+def bug_target(bugdir, bug):
+ if bug.severity == 'target':
+ return bug
+ matched = []
+ for blocked in libbe.command.depend.get_blocks(bugdir, bug):
+ if blocked.severity == 'target':
+ matched.append(blocked)
+ if len(matched) == 0:
+ return None
+ if len(matched) > 1:
+ raise Exception('This bug (%s) blocks several targets: %s'
+ % (bug.uuid,
+ '\n '.join([b.uuid for b in matched])))
+ return matched[0]
+
+def remove_target(bugdir, bug):
+ target = bug_target(bugdir, bug)
+ libbe.command.depend.remove_block(target, bug)
+ return target
+
+def add_target(bugdir, bug, summary):
+ target = bug_from_target_summary(bugdir, summary)
+ if target == None:
+ target = bugdir.new_bug(summary=summary)
+ target.severity = 'target'
+ libbe.command.depend.add_block(target, bug)
+ return target
+
+def targets(bugdir):
+ """Generate all possible target bug summaries."""
+ bugdir.load_all_bugs()
+ for bug in bugdir:
+ if bug.severity == 'target':
+ yield bug.summary
+
+def target_dict(bugdir):
+ """
+ Return a dict with bug UUID keys and bug summary values for all
+ target bugs.
+ """
+ ret = {}
+ bugdir.load_all_bugs()
+ for bug in bugdir:
+ if bug.severity == 'target':
+ ret[bug.uuid] = bug.summary
+ return ret
+
+def complete_target(command, argument, fragment=None):
+ """List possible command completions for fragment."""
+ return targets(command._get_bugdir())
diff --git a/libbe/command/util.py b/libbe/command/util.py
new file mode 100644
index 0000000..6e8e36c
--- /dev/null
+++ b/libbe/command/util.py
@@ -0,0 +1,203 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import glob
+import os.path
+
+import libbe
+import libbe.command
+
+class Completer (object):
+ def __init__(self, options):
+ self.options = options
+ def __call__(self, bugdir, fragment=None):
+ return [fragment]
+
+def complete_command(command, argument, fragment=None):
+ """
+ List possible command completions for fragment.
+
+ command argument is not used.
+ """
+ return list(libbe.command.commands(command_names=True))
+
+def comp_path(fragment=None):
+ """List possible path completions for fragment."""
+ if fragment == None:
+ fragment = '.'
+ comps = glob.glob(fragment+'*') + glob.glob(fragment+'/*')
+ if len(comps) == 1 and os.path.isdir(comps[0]):
+ comps.extend(glob.glob(comps[0]+'/*'))
+ return comps
+
+def complete_path(command, argument, fragment=None):
+ """List possible path completions for fragment."""
+ return comp_path(fragment)
+
+def complete_status(command, argument, fragment=None):
+ bd = command._get_bugdir()
+ import libbe.bug
+ return libbe.bug.status_values
+
+def complete_severity(command, argument, fragment=None):
+ bd = command._get_bugdir()
+ import libbe.bug
+ return libbe.bug.severity_values
+
+def assignees(bugdir):
+ bugdir.load_all_bugs()
+ return list(set([bug.assigned for bug in bugdir
+ if bug.assigned != None]))
+
+def complete_assigned(command, argument, fragment=None):
+ return assignees(command._get_bugdir())
+
+def complete_extra_strings(command, argument, fragment=None):
+ if fragment == None:
+ return []
+ return [fragment]
+
+def complete_bug_id(command, argument, fragment=None):
+ return complete_bug_comment_id(command, argument, fragment,
+ comments=False)
+
+def complete_bug_comment_id(command, argument, fragment=None,
+ active_only=True, comments=True):
+ import libbe.bugdir
+ import libbe.util.id
+ bd = command._get_bugdir()
+ if fragment == None or len(fragment) == 0:
+ fragment = '/'
+ try:
+ p = libbe.util.id.parse_user(bd, fragment)
+ matches = None
+ root,residual = (fragment, None)
+ if not root.endswith('/'):
+ root += '/'
+ except libbe.util.id.InvalidIDStructure, e:
+ return []
+ except libbe.util.id.NoIDMatches:
+ return []
+ except libbe.util.id.MultipleIDMatches, e:
+ if e.common == None:
+ # choose among bugdirs
+ return e.matches
+ common = e.common
+ matches = e.matches
+ root,residual = libbe.util.id.residual(common, fragment)
+ p = libbe.util.id.parse_user(bd, e.common)
+ bug = None
+ if matches == None: # fragment was complete, get a list of children uuids
+ if p['type'] == 'bugdir':
+ matches = bd.uuids()
+ common = bd.id.user()
+ elif p['type'] == 'bug':
+ if comments == False:
+ return [fragment]
+ bug = bd.bug_from_uuid(p['bug'])
+ matches = bug.uuids()
+ common = bug.id.user()
+ else:
+ assert p['type'] == 'comment', p
+ return [fragment]
+ if p['type'] == 'bugdir':
+ child_fn = bd.bug_from_uuid
+ elif p['type'] == 'bug':
+ if comments == False:
+ return[fragment]
+ if bug == None:
+ bug = bd.bug_from_uuid(p['bug'])
+ child_fn = bug.comment_from_uuid
+ elif p['type'] == 'comment':
+ assert matches == None, matches
+ return [fragment]
+ possible = []
+ common += '/'
+ for m in matches:
+ child = child_fn(m)
+ id = child.id.user()
+ possible.append(id.replace(common, root))
+ return possible
+
+def select_values(string, possible_values, name="unkown"):
+ """
+ This function allows the user to select values from a list of
+ possible values. The default is to select all the values:
+
+ >>> select_values(None, ['abc', 'def', 'hij'])
+ ['abc', 'def', 'hij']
+
+ The user selects values with a comma-separated limit_string.
+ Prepending a minus sign to such a list denotes blacklist mode:
+
+ >>> select_values('-abc,hij', ['abc', 'def', 'hij'])
+ ['def']
+
+ Without the leading -, the selection is in whitelist mode:
+
+ >>> select_values('abc,hij', ['abc', 'def', 'hij'])
+ ['abc', 'hij']
+
+ In either case, appropriate errors are raised if on of the
+ user-values is not in the list of possible values. The name
+ parameter lets you make the error message more clear:
+
+ >>> select_values('-xyz,hij', ['abc', 'def', 'hij'], name="foobar")
+ Traceback (most recent call last):
+ ...
+ UserError: Invalid foobar xyz
+ ['abc', 'def', 'hij']
+ >>> select_values('xyz,hij', ['abc', 'def', 'hij'], name="foobar")
+ Traceback (most recent call last):
+ ...
+ UserError: Invalid foobar xyz
+ ['abc', 'def', 'hij']
+ """
+ possible_values = list(possible_values) # don't alter the original
+ if string == None:
+ pass
+ elif string.startswith('-'):
+ blacklisted_values = set(string[1:].split(','))
+ for value in blacklisted_values:
+ if value not in possible_values:
+ raise libbe.command.UserError('Invalid %s %s\n %s'
+ % (name, value, possible_values))
+ possible_values.remove(value)
+ else:
+ whitelisted_values = string.split(',')
+ for value in whitelisted_values:
+ if value not in possible_values:
+ raise libbe.command.UserError(
+ 'Invalid %s %s\n %s'
+ % (name, value, possible_values))
+ possible_values = whitelisted_values
+ return possible_values
+
+def bug_comment_from_user_id(bugdir, id):
+ p = libbe.util.id.parse_user(bugdir, id)
+ if not p['type'] in ['bug', 'comment']:
+ raise libbe.command.UserError(
+ '%s is a %s id, not a bug or comment id' % (id, p['type']))
+ if p['bugdir'] != bugdir.uuid:
+ raise libbe.command.UserError(
+ "%s doesn't belong to this bugdir (%s)"
+ % (id, bugdir.uuid))
+ bug = bugdir.bug_from_uuid(p['bug'])
+ if 'comment' in p:
+ comment = bug.comment_from_uuid(p['comment'])
+ else:
+ comment = bug.comment_root
+ return (bug, comment)