diff options
author | Gianluca Montecchi <gian@grys.it> | 2010-02-10 00:03:38 +0100 |
---|---|---|
committer | Gianluca Montecchi <gian@grys.it> | 2010-02-10 00:03:38 +0100 |
commit | c67a5863826771001f009e1ee90262ccb7a2e172 (patch) | |
tree | 64c7f83238685959bf40a13c876168071a085556 /libbe/command | |
parent | a60e599798d43ba930efc1f8e2f184d3e8262189 (diff) | |
parent | 50444209eee408dde7d240fdf59bfc9e82b714ce (diff) | |
download | bugseverywhere-c67a5863826771001f009e1ee90262ccb7a2e172.tar.gz |
Merged Trevor's tree
Diffstat (limited to 'libbe/command')
-rw-r--r-- | libbe/command/__init__.py | 40 | ||||
-rw-r--r-- | libbe/command/assign.py | 98 | ||||
-rw-r--r-- | libbe/command/base.py | 554 | ||||
-rw-r--r-- | libbe/command/close.py | 60 | ||||
-rw-r--r-- | libbe/command/comment.py | 169 | ||||
-rw-r--r-- | libbe/command/commit.py | 93 | ||||
-rw-r--r-- | libbe/command/depend.py | 408 | ||||
-rw-r--r-- | libbe/command/diff.py | 139 | ||||
-rw-r--r-- | libbe/command/due.py | 117 | ||||
-rw-r--r-- | libbe/command/help.py | 82 | ||||
-rw-r--r-- | libbe/command/html.py | 686 | ||||
-rw-r--r-- | libbe/command/import_xml.py | 541 | ||||
-rw-r--r-- | libbe/command/init.py | 132 | ||||
-rw-r--r-- | libbe/command/list.py | 279 | ||||
-rw-r--r-- | libbe/command/merge.py | 189 | ||||
-rw-r--r-- | libbe/command/new.py | 103 | ||||
-rw-r--r-- | libbe/command/open.py | 58 | ||||
-rw-r--r-- | libbe/command/remove.py | 79 | ||||
-rw-r--r-- | libbe/command/serve.py | 1172 | ||||
-rw-r--r-- | libbe/command/set.py | 144 | ||||
-rw-r--r-- | libbe/command/severity.py | 98 | ||||
-rw-r--r-- | libbe/command/show.py | 207 | ||||
-rw-r--r-- | libbe/command/status.py | 108 | ||||
-rw-r--r-- | libbe/command/subscribe.py | 385 | ||||
-rw-r--r-- | libbe/command/tag.py | 152 | ||||
-rw-r--r-- | libbe/command/target.py | 209 | ||||
-rw-r--r-- | libbe/command/util.py | 203 |
27 files changed, 6505 insertions, 0 deletions
diff --git a/libbe/command/__init__.py b/libbe/command/__init__.py new file mode 100644 index 0000000..0c8d4ff --- /dev/null +++ b/libbe/command/__init__.py @@ -0,0 +1,40 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import base + +UserError = base.UserError +UnknownCommand = base.UnknownCommand +get_command = base.get_command +get_command_class = base.get_command_class +commands = base.commands +Option = base.Option +Argument = base.Argument +Command = base.Command +InputOutput = base.InputOutput +StdInputOutput = base.StdInputOutput +StringInputOutput = base.StringInputOutput +UnconnectedStorageGetter = base.UnconnectedStorageGetter +StorageCallbacks = base.StorageCallbacks +UserInterface = base.UserInterface + +__all__ = [UserError, UnknownCommand, + get_command, get_command_class, commands, + Option, Argument, Command, + InputOutput, StdInputOutput, StringInputOutput, + StorageCallbacks, UnconnectedStorageGetter, + UserInterface] diff --git a/libbe/command/assign.py b/libbe/command/assign.py new file mode 100644 index 0000000..6abf05e --- /dev/null +++ b/libbe/command/assign.py @@ -0,0 +1,98 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# Marien Zwart <marienz@gentoo.org> +# Thomas Gerigk <tgerigk@gmx.de> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.command +import libbe.command.util + + +class Assign (libbe.command.Command): + u"""Assign an individual or group to fix a bug + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Assign(ui=ui) + + >>> bd.bug_from_uuid('a').assigned is None + True + >>> ui._user_id = u'Fran\xe7ois' + >>> ret = ui.run(cmd, args=['-', '/a']) + >>> bd.flush_reload() + >>> bd.bug_from_uuid('a').assigned + u'Fran\\xe7ois' + + >>> ret = ui.run(cmd, args=['someone', '/a', '/b']) + >>> bd.flush_reload() + >>> bd.bug_from_uuid('a').assigned + 'someone' + >>> bd.bug_from_uuid('b').assigned + 'someone' + + >>> ret = ui.run(cmd, args=['none', '/a']) + >>> bd.flush_reload() + >>> bd.bug_from_uuid('a').assigned is None + True + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'assign' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='assigned', metavar='ASSIGNED', default=None, + completion_callback=libbe.command.util.complete_assigned), + libbe.command.Argument( + name='bug-id', metavar='BUG-ID', default=None, + repeatable=True, + completion_callback=libbe.command.util.complete_bug_id), + ]) + + def _run(self, **params): + assigned = params['assigned'] + if assigned == 'none': + assigned = None + elif assigned == '-': + assigned = self._get_user_id() + bugdir = self._get_bugdir() + for bug_id in params['bug-id']: + bug,dummy_comment = \ + libbe.command.util.bug_comment_from_user_id(bugdir, bug_id) + if bug.assigned != assigned: + bug.assigned = assigned + return 0 + + def _long_help(self): + return """ +Assign a person to fix a bug. + +Assigneds should be the person's Bugs Everywhere identity, the same +string that appears in Creator fields. + +Special assigned strings: + "-" assign the bug to yourself + "none" un-assigns the bug +""" diff --git a/libbe/command/base.py b/libbe/command/base.py new file mode 100644 index 0000000..6b1c050 --- /dev/null +++ b/libbe/command/base.py @@ -0,0 +1,554 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import codecs +import optparse +import os.path +import StringIO +import sys + +import libbe +import libbe.storage +import libbe.ui.util.user +import libbe.util.encoding +import libbe.util.plugin + +class UserError(Exception): + pass + +class UnknownCommand(UserError): + def __init__(self, cmd): + Exception.__init__(self, "Unknown command '%s'" % cmd) + self.cmd = cmd + +def get_command(command_name): + """Retrieves the module for a user command + + >>> try: + ... get_command('asdf') + ... except UnknownCommand, e: + ... print e + Unknown command 'asdf' + >>> repr(get_command('list')).startswith("<module 'libbe.command.list' from ") + True + """ + try: + cmd = libbe.util.plugin.import_by_name( + 'libbe.command.%s' % command_name.replace("-", "_")) + except ImportError, e: + raise UnknownCommand(command_name) + return cmd + +def get_command_class(module=None, command_name=None): + """Retrieves a command class from a module. + + >>> import_xml_mod = get_command('import-xml') + >>> import_xml = get_command_class(import_xml_mod, 'import-xml') + >>> repr(import_xml) + "<class 'libbe.command.import_xml.Import_XML'>" + >>> import_xml = get_command_class(command_name='import-xml') + >>> repr(import_xml) + "<class 'libbe.command.import_xml.Import_XML'>" + """ + if module == None: + module = get_command(command_name) + try: + cname = command_name.capitalize().replace('-', '_') + cmd = getattr(module, cname) + except ImportError, e: + raise UnknownCommand(command_name) + return cmd + +def modname_to_command_name(modname): + """Little hack to replicate + >>> import sys + >>> def real_modname_to_command_name(modname): + ... mod = libbe.util.plugin.import_by_name( + ... 'libbe.command.%s' % modname) + ... attrs = [getattr(mod, name) for name in dir(mod)] + ... commands = [] + ... for attr_name in dir(mod): + ... attr = getattr(mod, attr_name) + ... try: + ... if issubclass(attr, Command): + ... commands.append(attr) + ... except TypeError, e: + ... pass + ... if len(commands) == 0: + ... raise Exception('No Command classes in %s' % dir(mod)) + ... return commands[0].name + >>> real_modname_to_command_name('new') + 'new' + >>> real_modname_to_command_name('import_xml') + 'import-xml' + """ + return modname.replace('_', '-') + +def commands(command_names=False): + for modname in libbe.util.plugin.modnames('libbe.command'): + if modname not in ['base', 'util']: + if command_names == False: + yield modname + else: + yield modname_to_command_name(modname) + +class CommandInput (object): + def __init__(self, name, help=''): + self.name = name + self.help = help + + def __str__(self): + return '<%s %s>' % (self.__class__.__name__, self.name) + + def __repr__(self): + return self.__str__() + +class Argument (CommandInput): + def __init__(self, metavar=None, default=None, type='string', + optional=False, repeatable=False, + completion_callback=None, *args, **kwargs): + CommandInput.__init__(self, *args, **kwargs) + self.metavar = metavar + self.default = default + self.type = type + self.optional = optional + self.repeatable = repeatable + self.completion_callback = completion_callback + if self.metavar == None: + self.metavar = self.name.upper() + +class Option (CommandInput): + def __init__(self, callback=None, short_name=None, arg=None, + *args, **kwargs): + CommandInput.__init__(self, *args, **kwargs) + self.callback = callback + self.short_name = short_name + self.arg = arg + if self.arg == None and self.callback == None: + # use an implicit boolean argument + self.arg = Argument(name=self.name, help=self.help, + default=False, type='bool') + self.validate() + + def validate(self): + if self.arg == None: + assert self.callback != None, self.name + return + assert self.callback == None, '%s: %s' (self.name, self.callback) + assert self.arg.name == self.name, \ + 'Name missmatch: %s != %s' % (self.arg.name, self.name) + assert self.arg.optional == False, self.name + assert self.arg.repeatable == False, self.name + + def __str__(self): + return '--%s' % self.name + + def __repr__(self): + return '<Option %s>' % self.__str__() + +class _DummyParser (optparse.OptionParser): + def __init__(self, command): + optparse.OptionParser.__init__(self) + self.remove_option('-h') + self.command = command + self._command_opts = [] + for option in self.command.options: + self._add_option(option) + + def _add_option(self, option): + # from libbe.ui.command_line.CmdOptionParser._add_option + option.validate() + long_opt = '--%s' % option.name + if option.short_name != None: + short_opt = '-%s' % option.short_name + assert '_' not in option.name, \ + 'Non-reconstructable option name %s' % option.name + kwargs = {'dest':option.name.replace('-', '_'), + 'help':option.help} + if option.arg == None or option.arg.type == 'bool': + kwargs['action'] = 'store_true' + kwargs['metavar'] = None + kwargs['default'] = False + else: + kwargs['type'] = option.arg.type + kwargs['action'] = 'store' + kwargs['metavar'] = option.arg.metavar + kwargs['default'] = option.arg.default + if option.short_name != None: + opt = optparse.Option(short_opt, long_opt, **kwargs) + else: + opt = optparse.Option(long_opt, **kwargs) + #option.takes_value = lambda : option.arg != None + opt._option = option + self._command_opts.append(opt) + self.add_option(opt) + +class OptionFormatter (optparse.IndentedHelpFormatter): + def __init__(self, command): + optparse.IndentedHelpFormatter.__init__(self) + self.command = command + def option_help(self): + # based on optparse.OptionParser.format_option_help() + parser = _DummyParser(self.command) + self.store_option_strings(parser) + ret = [] + ret.append(self.format_heading('Options')) + self.indent() + for option in parser._command_opts: + ret.append(self.format_option(option)) + ret.append('\n') + self.dedent() + # Drop the last '\n', or the header if no options or option groups: + return ''.join(ret[:-1]) + +class Command (object): + """One-line command description here. + + >>> c = Command() + >>> print c.help() + usage: be command [options] + <BLANKLINE> + Options: + -h, --help Print a help message. + <BLANKLINE> + --complete Print a list of possible completions. + <BLANKLINE> + A detailed help message. + """ + + name = 'command' + + def __init__(self, ui=None): + self.ui = ui # calling user-interface + self.status = None + self.result = None + self.restrict_file_access = True + self.options = [ + Option(name='help', short_name='h', + help='Print a help message.', + callback=self.help), + Option(name='complete', + help='Print a list of possible completions.', + callback=self.complete), + ] + self.args = [] + + def run(self, options=None, args=None): + self.status = 1 # in case we raise an exception + params = self._parse_options_args(options, args) + if params['help'] == True: + pass + else: + params.pop('help') + if params['complete'] != None: + pass + else: + params.pop('complete') + + self.status = self._run(**params) + return self.status + + def _parse_options_args(self, options=None, args=None): + if options == None: + options = {} + if args == None: + args = [] + params = {} + for option in self.options: + assert option.name not in params, params[option.name] + if option.name in options: + params[option.name] = options.pop(option.name) + elif option.arg != None: + params[option.name] = option.arg.default + else: # non-arg options are flags, set to default flag value + params[option.name] = False + assert 'user-id' not in params, params['user-id'] + if 'user-id' in options: + self._user_id = options.pop('user-id') + if len(options) > 0: + raise UserError, 'Invalid option passed to command %s:\n %s' \ + % (self.name, '\n '.join(['%s: %s' % (k,v) + for k,v in options.items()])) + in_optional_args = False + for i,arg in enumerate(self.args): + if arg.repeatable == True: + assert i == len(self.args)-1, arg.name + if in_optional_args == True: + assert arg.optional == True, arg.name + else: + in_optional_args = arg.optional + if i < len(args): + if arg.repeatable == True: + params[arg.name] = [args[i]] + else: + params[arg.name] = args[i] + else: # no value given + assert in_optional_args == True, arg.name + params[arg.name] = arg.default + if len(args) > len(self.args): # add some additional repeats + assert self.args[-1].repeatable == True, self.args[-1].name + params[self.args[-1].name].extend(args[len(self.args):]) + return params + + def _run(self, **kwargs): + raise NotImplementedError + + def help(self, *args): + return '\n\n'.join([self.usage(), + self._option_help(), + self._long_help().rstrip('\n')]) + + def usage(self): + usage = 'usage: be %s [options]' % self.name + num_optional = 0 + for arg in self.args: + usage += ' ' + if arg.optional == True: + usage += '[' + num_optional += 1 + usage += arg.metavar + if arg.repeatable == True: + usage += ' ...' + usage += ']'*num_optional + return usage + + def _option_help(self): + o = OptionFormatter(self) + return o.option_help().strip('\n') + + def _long_help(self): + return "A detailed help message." + + def complete(self, argument=None, fragment=None): + if argument == None: + ret = ['--%s' % o.name for o in self.options] + if len(self.args) > 0 and self.args[0].completion_callback != None: + ret.extend(self.args[0].completion_callback(self, argument, fragment)) + return ret + elif argument.completion_callback != None: + # finish a particular argument + return argument.completion_callback(self, argument, fragment) + return [] # the particular argument doesn't supply completion info + + def _check_restricted_access(self, storage, path): + """ + Check that the file at path is inside bugdir.root. This is + important if you allow other users to execute becommands with + your username (e.g. if you're running be-handle-mail through + your ~/.procmailrc). If this check wasn't made, a user could + e.g. run + be commit -b ~/.ssh/id_rsa "Hack to expose ssh key" + which would expose your ssh key to anyone who could read the + VCS log. + + >>> class DummyStorage (object): pass + >>> s = DummyStorage() + >>> s.repo = os.path.expanduser('~/x/') + >>> c = Command() + >>> try: + ... c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa')) + ... except UserError, e: + ... assert str(e).startswith('file access restricted!'), str(e) + ... print 'we got the expected error' + we got the expected error + >>> c._check_restricted_access(s, os.path.expanduser('~/x')) + >>> c._check_restricted_access(s, os.path.expanduser('~/x/y')) + >>> c.restrict_file_access = False + >>> c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa')) + """ + if self.restrict_file_access == True: + path = os.path.abspath(path) + repo = os.path.abspath(storage.repo).rstrip(os.path.sep) + if path == repo or path.startswith(repo+os.path.sep): + return + raise UserError('file access restricted!\n %s not in %s' + % (path, repo)) + + def cleanup(self): + pass + +class InputOutput (object): + def __init__(self, stdin=None, stdout=None): + self.stdin = stdin + self.stdout = stdout + + def setup_command(self, command): + if not hasattr(self.stdin, 'encoding'): + self.stdin.encoding = libbe.util.encoding.get_input_encoding() + if not hasattr(self.stdout, 'encoding'): + self.stdout.encoding = libbe.util.encoding.get_output_encoding() + command.stdin = self.stdin + command.stdin.encoding = self.stdin.encoding + command.stdout = self.stdout + command.stdout.encoding = self.stdout.encoding + + def cleanup(self): + pass + +class StdInputOutput (InputOutput): + def __init__(self, input_encoding=None, output_encoding=None): + stdin,stdout = self._get_io(input_encoding, output_encoding) + InputOutput.__init__(self, stdin, stdout) + + def _get_io(self, input_encoding=None, output_encoding=None): + if input_encoding == None: + input_encoding = libbe.util.encoding.get_input_encoding() + if output_encoding == None: + output_encoding = libbe.util.encoding.get_output_encoding() + stdin = codecs.getwriter(input_encoding)(sys.stdin) + stdin.encoding = input_encoding + stdout = codecs.getwriter(output_encoding)(sys.stdout) + stdout.encoding = output_encoding + return (stdin, stdout) + +class StringInputOutput (InputOutput): + """ + >>> s = StringInputOutput() + >>> s.set_stdin('hello') + >>> s.stdin.read() + 'hello' + >>> s.stdin.read() + '' + >>> print >> s.stdout, 'goodbye' + >>> s.get_stdout() + 'goodbye\\n' + >>> s.get_stdout() + '' + + Also works with unicode strings + + >>> s.set_stdin(u'hello') + >>> s.stdin.read() + u'hello' + >>> print >> s.stdout, u'goodbye' + >>> s.get_stdout() + u'goodbye\\n' + """ + def __init__(self): + stdin = StringIO.StringIO() + stdin.encoding = 'utf-8' + stdout = StringIO.StringIO() + stdout.encoding = 'utf-8' + InputOutput.__init__(self, stdin, stdout) + + def set_stdin(self, stdin_string): + self.stdin = StringIO.StringIO(stdin_string) + + def get_stdout(self): + ret = self.stdout.getvalue() + self.stdout = StringIO.StringIO() # clear stdout for next read + self.stdin.encoding = 'utf-8' + return ret + +class UnconnectedStorageGetter (object): + def __init__(self, location): + self.location = location + + def __call__(self): + return libbe.storage.get_storage(self.location) + +class StorageCallbacks (object): + def __init__(self, location=None): + if location == None: + location = '.' + self.location = location + self._get_unconnected_storage = UnconnectedStorageGetter(location) + + def setup_command(self, command): + command._get_unconnected_storage = self.get_unconnected_storage + command._get_storage = self.get_storage + command._get_bugdir = self.get_bugdir + + def get_unconnected_storage(self): + """ + Callback for use by commands that need it. + + The returned Storage instance is may actually be connected, + but commands that make use of the returned value should only + make use of non-connected Storage methods. This is mainly + intended for the init command, which calls Storage.init(). + """ + if not hasattr(self, '_unconnected_storage'): + if self._get_unconnected_storage == None: + raise NotImplementedError + self._unconnected_storage = self._get_unconnected_storage() + return self._unconnected_storage + + def set_unconnected_storage(self, unconnected_storage): + self._unconnected_storage = unconnected_storage + + def get_storage(self): + """Callback for use by commands that need it.""" + if not hasattr(self, '_storage'): + self._storage = self.get_unconnected_storage() + self._storage.connect() + version = self._storage.storage_version() + if version != libbe.storage.STORAGE_VERSION: + raise libbe.storage.InvalidStorageVersion(version) + return self._storage + + def set_storage(self, storage): + self._storage = storage + + def get_bugdir(self): + """Callback for use by commands that need it.""" + if not hasattr(self, '_bugdir'): + self._bugdir = libbe.bugdir.BugDir(self.get_storage(), + from_storage=True) + return self._bugdir + + def set_bugdir(self, bugdir): + self._bugdir = bugdir + + def cleanup(self): + if hasattr(self, '_storage'): + self._storage.disconnect() + +class UserInterface (object): + def __init__(self, io=None, location=None): + if io == None: + io = StringInputOutput() + self.io = io + self.storage_callbacks = StorageCallbacks(location) + self.restrict_file_access = True + + def help(self): + raise NotImplementedError + + def run(self, command, options=None, args=None): + self.setup_command(command) + return command.run(options, args) + + def setup_command(self, command): + if command.ui == None: + command.ui = self + if self.io != None: + self.io.setup_command(command) + if self.storage_callbacks != None: + self.storage_callbacks.setup_command(command) + command.restrict_file_access = self.restrict_file_access + command._get_user_id = self._get_user_id + + def _get_user_id(self): + """Callback for use by commands that need it.""" + if not hasattr(self, '_user_id'): + self._user_id = libbe.ui.util.user.get_user_id( + self.storage_callbacks.get_storage()) + return self._user_id + + def cleanup(self): + self.storage_callbacks.cleanup() + self.io.cleanup() diff --git a/libbe/command/close.py b/libbe/command/close.py new file mode 100644 index 0000000..0532ed2 --- /dev/null +++ b/libbe/command/close.py @@ -0,0 +1,60 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Marien Zwart <marienz@gentoo.org> +# Thomas Gerigk <tgerigk@gmx.de> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +"""Close a bug""" +from libbe import cmdutil, bugdir +__desc__ = __doc__ + +def execute(args, manipulate_encodings=True): + """ + >>> from libbe import bugdir + >>> import os + >>> bd = bugdir.SimpleBugDir() + >>> os.chdir(bd.root) + >>> print bd.bug_from_shortname("a").status + open + >>> execute(["a"], manipulate_encodings=False) + >>> bd._clear_bugs() + >>> print bd.bug_from_shortname("a").status + closed + >>> bd.cleanup() + """ + parser = get_parser() + options, args = parser.parse_args(args) + cmdutil.default_complete(options, args, parser, + bugid_args={0: lambda bug : bug.active==True}) + if len(args) == 0: + raise cmdutil.UsageError("Please specify a bug id.") + if len(args) > 1: + raise cmdutil.UsageError("Too many arguments.") + bd = bugdir.BugDir(from_disk=True, + manipulate_encodings=manipulate_encodings) + bug = cmdutil.bug_from_shortname(bd, args[0]) + bug.status = "closed" + bd.save() + +def get_parser(): + parser = cmdutil.CmdOptionParser("be close BUG-ID") + return parser + +longhelp=""" +Close the bug identified by BUG-ID. +""" + +def help(): + return get_parser().help_str() + longhelp diff --git a/libbe/command/comment.py b/libbe/command/comment.py new file mode 100644 index 0000000..5bf6acf --- /dev/null +++ b/libbe/command/comment.py @@ -0,0 +1,169 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import os +import sys + +import libbe +import libbe.command +import libbe.command.util +import libbe.comment +import libbe.ui.util.editor +import libbe.util.id + + +class Comment (libbe.command.Command): + """Add a comment to a bug + + >>> import time + >>> import libbe.bugdir + >>> import libbe.util.id + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Comment(ui=ui) + + >>> uuid_gen = libbe.util.id.uuid_gen + >>> libbe.util.id.uuid_gen = lambda: 'X' + >>> ui._user_id = u'Fran\\xe7ois' + >>> ret = ui.run(cmd, args=['/a', 'This is a comment about a']) + Created comment with ID abc/a/X + >>> libbe.util.id.uuid_gen = uuid_gen + >>> bd.flush_reload() + >>> bug = bd.bug_from_uuid('a') + >>> bug.load_comments(load_full=False) + >>> comment = bug.comment_root[0] + >>> comment.id.storage() == comment.uuid + True + >>> print comment.body + This is a comment about a + <BLANKLINE> + >>> comment.author + u'Fran\\xe7ois' + >>> comment.time <= int(time.time()) + True + >>> comment.in_reply_to is None + True + + >>> if 'EDITOR' in os.environ: + ... del os.environ['EDITOR'] + >>> if 'VISUAL' in os.environ: + ... del os.environ['VISUAL'] + >>> ui._user_id = u'Frank' + >>> ret = ui.run(cmd, args=['/b']) + Traceback (most recent call last): + UserError: No comment supplied, and EDITOR not specified. + + >>> os.environ['EDITOR'] = "echo 'I like cheese' > " + >>> libbe.util.id.uuid_gen = lambda: 'Y' + >>> ret = ui.run(cmd, args=['/b']) + Created comment with ID abc/b/Y + >>> libbe.util.id.uuid_gen = uuid_gen + >>> bd.flush_reload() + >>> bug = bd.bug_from_uuid('b') + >>> bug.load_comments(load_full=False) + >>> comment = bug.comment_root[0] + >>> print comment.body + I like cheese + <BLANKLINE> + >>> ui.cleanup() + >>> bd.cleanup() + >>> del os.environ["EDITOR"] + """ + name = 'comment' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='author', short_name='a', + help='Set the comment author', + arg=libbe.command.Argument( + name='author', metavar='AUTHOR')), + libbe.command.Option(name='alt-id', + help='Set an alternate comment ID', + arg=libbe.command.Argument( + name='alt-id', metavar='ID')), + libbe.command.Option(name='content-type', short_name='c', + help='Set comment content-type (e.g. text/plain)', + arg=libbe.command.Argument(name='content-type', + metavar='MIME')), + ]) + self.args.extend([ + libbe.command.Argument( + name='id', metavar='ID', default=None, + completion_callback=libbe.command.util.complete_bug_comment_id), + libbe.command.Argument( + name='comment', metavar='COMMENT', default=None, + optional=True, + completion_callback=libbe.command.util.complete_assigned), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + bug,parent = \ + libbe.command.util.bug_comment_from_user_id(bugdir, params['id']) + if params['comment'] == None: + # try to launch an editor for comment-body entry + try: + if parent == bug.comment_root: + parent_body = bug.summary+'\n' + else: + parent_body = parent.body + estr = 'Please enter your comment above\n\n> %s\n' \ + % ('\n> '.join(parent_body.splitlines())) + body = libbe.ui.util.editor.editor_string(estr) + except libbe.ui.util.editor.CantFindEditor, e: + raise libbe.command.UserError( + 'No comment supplied, and EDITOR not specified.') + if body is None: + raise libbe.command.UserError('No comment entered.') + elif params['comment'] == '-': # read body from stdin + binary = not (params['content-type'] == None + or params['content-type'].startswith("text/")) + if not binary: + body = self.stdin.read() + if not body.endswith('\n'): + body += '\n' + else: # read-in without decoding + body = sys.stdin.read() + else: # body given on command line + body = params['comment'] + if not body.endswith('\n'): + body+='\n' + if params['author'] == None: + params['author'] = self._get_user_id() + + new = parent.new_reply(body=body, content_type=params['content-type']) + for key in ['alt-id', 'author']: + if params[key] != None: + setattr(new, new._setting_name_to_attr_name(key), params[key]) + print >> self.stdout, 'Created comment with ID %s' % new.id.user() + return 0 + + def _long_help(self): + return """ +To add a comment to a bug, use the bug ID as the argument. To reply +to another comment, specify the comment name (as shown in "be show" +output). COMMENT, if specified, should be either the text of your +comment or "-", in which case the text will be read from stdin. If +you do not specify a COMMENT, $EDITOR is used to launch an editor. If +COMMENT is unspecified and EDITOR is not set, no comment will be +created. +""" diff --git a/libbe/command/commit.py b/libbe/command/commit.py new file mode 100644 index 0000000..fd15630 --- /dev/null +++ b/libbe/command/commit.py @@ -0,0 +1,93 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import sys + +import libbe +import libbe.bugdir +import libbe.command +import libbe.command.util +import libbe.storage +import libbe.ui.util.editor + + +class Commit (libbe.command.Command): + """Commit the currently pending changes to the repository + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False, versioned=True) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Commit(ui=ui) + + >>> bd.extra_strings = ['hi there'] + >>> bd.flush_reload() + >>> ui.run(cmd, args=['Making a commit']) # doctest: +ELLIPSIS + Committed ... + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'commit' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='body', short_name='b', + help='Provide the detailed body for the commit message. In the special case that FILE == "EDITOR", spawn an editor to enter the body text (in which case you cannot use stdin for the summary)', + arg=libbe.command.Argument(name='body', metavar='FILE', + completion_callback=libbe.command.util.complete_path)), + libbe.command.Option(name='allow-empty', short_name='a', + help='Allow empty commits'), + ]) + self.args.extend([ + libbe.command.Argument( + name='comment', metavar='COMMENT', default=None), + ]) + + def _run(self, **params): + if params['comment'] == '-': # read summary from stdin + assert params['body'] != 'EDITOR', \ + 'Cannot spawn and editor when the summary is using stdin.' + summary = sys.stdin.readline() + else: + summary = params['comment'] + storage = self._get_storage() + if params['body'] == None: + body = None + elif params['body'] == 'EDITOR': + body = libbe.ui.util.editor.editor_string( + 'Please enter your commit message above') + else: + self._check_restricted_access(storage, params['body']) + body = libbe.util.encoding.get_file_contents( + params['body'], decode=True) + try: + revision = storage.commit(summary, body=body, + allow_empty=params['allow-empty']) + print >> self.stdout, 'Committed %s' % revision + except libbe.storage.EmptyCommit, e: + print >> self.stdout, e + return 1 + + def _long_help(self): + return """ +Commit the current repository status. The summary specified on the +commandline is a string (only one line) that describes the commit +briefly or "-", in which case the string will be read from stdin. +""" diff --git a/libbe/command/depend.py b/libbe/command/depend.py new file mode 100644 index 0000000..f87657b --- /dev/null +++ b/libbe/command/depend.py @@ -0,0 +1,408 @@ +# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import copy +import os + +import libbe +import libbe.bug +import libbe.command +import libbe.command.util +import libbe.util.tree + +BLOCKS_TAG="BLOCKS:" +BLOCKED_BY_TAG="BLOCKED-BY:" + +class BrokenLink (Exception): + def __init__(self, blocked_bug, blocking_bug, blocks=True): + if blocks == True: + msg = "Missing link: %s blocks %s" \ + % (blocking_bug.uuid, blocked_bug.uuid) + else: + msg = "Missing link: %s blocked by %s" \ + % (blocked_bug.uuid, blocking_bug.uuid) + Exception.__init__(self, msg) + self.blocked_bug = blocked_bug + self.blocking_bug = blocking_bug + +class Depend (libbe.command.Command): + """Add/remove bug dependencies + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Depend(ui=ui) + + >>> ret = ui.run(cmd, args=['/a', '/b']) + a blocked by: + b + >>> ret = ui.run(cmd, args=['/a']) + a blocked by: + b + >>> ret = ui.run(cmd, {'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + a blocked by: + b closed + >>> ret = ui.run(cmd, args=['/b', '/a']) + b blocked by: + a + b blocks: + a + >>> ret = ui.run(cmd, {'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + a blocked by: + b closed + a blocks: + b closed + >>> ret = ui.run(cmd, {'repair':True}) + >>> ret = ui.run(cmd, {'remove':True}, ['/b', '/a']) + b blocks: + a + >>> ret = ui.run(cmd, {'remove':True}, ['/a', '/b']) + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'depend' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='remove', short_name='r', + help='Remove dependency (instead of adding it)'), + libbe.command.Option(name='show-status', short_name='s', + help='Show status of blocking bugs'), + libbe.command.Option(name='status', + help='Only show bugs matching the STATUS specifier', + arg=libbe.command.Argument( + name='status', metavar='STATUS', default=None, + completion_callback=libbe.command.util.complete_status)), + libbe.command.Option(name='severity', + help='Only show bugs matching the SEVERITY specifier', + arg=libbe.command.Argument( + name='severity', metavar='SEVERITY', default=None, + completion_callback=libbe.command.util.complete_severity)), + libbe.command.Option(name='tree-depth', short_name='t', + help='Print dependency tree rooted at BUG-ID with DEPTH levels of both blockers and blockees. Set DEPTH <= 0 to disable the depth limit.', + arg=libbe.command.Argument( + name='tree-depth', metavar='INT', type='int', + completion_callback=libbe.command.util.complete_severity)), + libbe.command.Option(name='repair', + help='Check for and repair one-way links'), + ]) + self.args.extend([ + libbe.command.Argument( + name='bug-id', metavar='BUG-ID', default=None, + optional=True, + completion_callback=libbe.command.util.complete_bug_id), + libbe.command.Argument( + name='blocking-bug-id', metavar='BUG-ID', default=None, + optional=True, + completion_callback=libbe.command.util.complete_bug_id), + ]) + + def _run(self, **params): + if params['repair'] == True and params['bug-id'] != None: + raise libbe.command.UsageError( + 'No arguments with --repair calls.') + if params['repair'] == False and params['bug-id'] == None: + raise libbe.command.UsageError( + 'Must specify either --repair or a BUG-ID') + if params['tree-depth'] != None \ + and params['blocking-bug-id'] != None: + raise libbe.command.UsageError( + 'Only one bug id used in tree mode.') + bugdir = self._get_bugdir() + if params['repair'] == True: + good,fixed,broken = check_dependencies(bugdir, repair_broken_links=True) + assert len(broken) == 0, broken + if len(fixed) > 0: + print >> self.stdout, 'Fixed the following links:' + print >> self.stdout, \ + '\n'.join(['%s |-- %s' % (blockee.uuid, blocker.uuid) + for blockee,blocker in fixed]) + return 0 + allowed_status_values = \ + libbe.command.util.select_values( + params['status'], libbe.bug.status_values) + allowed_severity_values = \ + libbe.command.util.select_values( + params['severity'], libbe.bug.severity_values) + + bugA, dummy_comment = libbe.command.util.bug_comment_from_user_id( + bugdir, params['bug-id']) + + if params['tree-depth'] != None: + dtree = DependencyTree(bugdir, bugA, params['tree-depth'], + allowed_status_values, + allowed_severity_values) + if len(dtree.blocked_by_tree()) > 0: + print >> self.stdout, '%s blocked by:' % bugA.uuid + for depth,node in dtree.blocked_by_tree().thread(): + if depth == 0: continue + print >> self.stdout, \ + '%s%s' % (' '*(depth), + node.bug.string(shortlist=True)) + if len(dtree.blocks_tree()) > 0: + print >> self.stdout, '%s blocks:' % bugA.uuid + for depth,node in dtree.blocks_tree().thread(): + if depth == 0: continue + print >> self.stdout, \ + '%s%s' % (' '*(depth), + node.bug.string(shortlist=True)) + return 0 + + if params['blocking-bug-id'] != None: + bugB,dummy_comment = libbe.command.util.bug_comment_from_user_id( + bugdir, params['blocking-bug-id']) + if params['remove'] == True: + remove_block(bugA, bugB) + else: # add the dependency + add_block(bugA, bugB) + + blocked_by = get_blocked_by(bugdir, bugA) + if len(blocked_by) > 0: + print >> self.stdout, '%s blocked by:' % bugA.uuid + if params['show-status'] == True: + print >> self.stdout, \ + '\n'.join(['%s\t%s' % (_bug.uuid, _bug.status) + for _bug in blocked_by]) + else: + print >> self.stdout, \ + '\n'.join([_bug.uuid for _bug in blocked_by]) + blocks = get_blocks(bugdir, bugA) + if len(blocks) > 0: + print >> self.stdout, '%s blocks:' % bugA.uuid + if params['show-status'] == True: + print >> self.stdout, \ + '\n'.join(['%s\t%s' % (_bug.uuid, _bug.status) + for _bug in blocks]) + else: + print >> self.stdout, \ + '\n'.join([_bug.uuid for _bug in blocks]) + return 0 + + def _long_help(self): + return """ +Set a dependency with the second bug (B) blocking the first bug (A). +If bug B is not specified, just print a list of bugs blocking (A). + +To search for bugs blocked by a particular bug, try + $ be list --extra-strings BLOCKED-BY:<your-bug-uuid> + +The --status and --severity options allow you to either blacklist or +whitelist values, for example + $ be list --status open,assigned +will only follow and print dependencies with open or assigned status. +You select blacklist mode by starting the list with a minus sign, for +example + $ be list --severity -target +which will only follow and print dependencies with non-target severity. + +If neither bug A nor B is specified, check for and repair the missing +side of any one-way links. + +The "|--" symbol in the repair-mode output is inspired by the +"negative feedback" arrow common in biochemistry. See, for example + http://www.nature.com/nature/journal/v456/n7223/images/nature07513-f5.0.jpg +""" + +# internal helper functions + +def _generate_blocks_string(blocked_bug): + return '%s%s' % (BLOCKS_TAG, blocked_bug.uuid) + +def _generate_blocked_by_string(blocking_bug): + return '%s%s' % (BLOCKED_BY_TAG, blocking_bug.uuid) + +def _parse_blocks_string(string): + assert string.startswith(BLOCKS_TAG) + return string[len(BLOCKS_TAG):] + +def _parse_blocked_by_string(string): + assert string.startswith(BLOCKED_BY_TAG) + return string[len(BLOCKED_BY_TAG):] + +def _add_remove_extra_string(bug, string, add): + estrs = bug.extra_strings + if add == True: + estrs.append(string) + else: # remove the string + estrs.remove(string) + bug.extra_strings = estrs # reassign to notice change + +def _get_blocks(bug): + uuids = [] + for line in bug.extra_strings: + if line.startswith(BLOCKS_TAG): + uuids.append(_parse_blocks_string(line)) + return uuids + +def _get_blocked_by(bug): + uuids = [] + for line in bug.extra_strings: + if line.startswith(BLOCKED_BY_TAG): + uuids.append(_parse_blocked_by_string(line)) + return uuids + +def _repair_one_way_link(blocked_bug, blocking_bug, blocks=None): + if blocks == True: # add blocks link + blocks_string = _generate_blocks_string(blocked_bug) + _add_remove_extra_string(blocking_bug, blocks_string, add=True) + else: # add blocked by link + blocked_by_string = _generate_blocked_by_string(blocking_bug) + _add_remove_extra_string(blocked_bug, blocked_by_string, add=True) + +# functions exposed to other modules + +def add_block(blocked_bug, blocking_bug): + blocked_by_string = _generate_blocked_by_string(blocking_bug) + _add_remove_extra_string(blocked_bug, blocked_by_string, add=True) + blocks_string = _generate_blocks_string(blocked_bug) + _add_remove_extra_string(blocking_bug, blocks_string, add=True) + +def remove_block(blocked_bug, blocking_bug): + blocked_by_string = _generate_blocked_by_string(blocking_bug) + _add_remove_extra_string(blocked_bug, blocked_by_string, add=False) + blocks_string = _generate_blocks_string(blocked_bug) + _add_remove_extra_string(blocking_bug, blocks_string, add=False) + +def get_blocks(bugdir, bug): + """ + Return a list of bugs that the given bug blocks. + """ + blocks = [] + for uuid in _get_blocks(bug): + blocks.append(bugdir.bug_from_uuid(uuid)) + return blocks + +def get_blocked_by(bugdir, bug): + """ + Return a list of bugs blocking the given bug. + """ + blocked_by = [] + for uuid in _get_blocked_by(bug): + blocked_by.append(bugdir.bug_from_uuid(uuid)) + return blocked_by + +def check_dependencies(bugdir, repair_broken_links=False): + """ + Check that links are bi-directional for all bugs in bugdir. + + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir() + >>> a = bd.bug_from_uuid("a") + >>> b = bd.bug_from_uuid("b") + >>> blocked_by_string = _generate_blocked_by_string(b) + >>> _add_remove_extra_string(a, blocked_by_string, add=True) + >>> good,repaired,broken = check_dependencies(bd, repair_broken_links=False) + >>> good + [] + >>> repaired + [] + >>> broken + [(Bug(uuid='a'), Bug(uuid='b'))] + >>> _get_blocks(b) + [] + >>> good,repaired,broken = check_dependencies(bd, repair_broken_links=True) + >>> _get_blocks(b) + ['a'] + >>> good + [] + >>> repaired + [(Bug(uuid='a'), Bug(uuid='b'))] + >>> broken + [] + """ + if bugdir.storage != None: + bugdir.load_all_bugs() + good_links = [] + fixed_links = [] + broken_links = [] + for bug in bugdir: + for blocker in get_blocked_by(bugdir, bug): + blocks = get_blocks(bugdir, blocker) + if (bug, blocks) in good_links+fixed_links+broken_links: + continue # already checked that link + if bug not in blocks: + if repair_broken_links == True: + _repair_one_way_link(bug, blocker, blocks=True) + fixed_links.append((bug, blocker)) + else: + broken_links.append((bug, blocker)) + else: + good_links.append((bug, blocker)) + for blockee in get_blocks(bugdir, bug): + blocked_by = get_blocked_by(bugdir, blockee) + if (blockee, bug) in good_links+fixed_links+broken_links: + continue # already checked that link + if bug not in blocked_by: + if repair_broken_links == True: + _repair_one_way_link(blockee, bug, blocks=False) + fixed_links.append((blockee, bug)) + else: + broken_links.append((blockee, bug)) + else: + good_links.append((blockee, bug)) + return (good_links, fixed_links, broken_links) + +class DependencyTree (object): + """ + Note: should probably be DependencyDiGraph. + """ + def __init__(self, bugdir, root_bug, depth_limit=0, + allowed_status_values=None, + allowed_severity_values=None): + self.bugdir = bugdir + self.root_bug = root_bug + self.depth_limit = depth_limit + self.allowed_status_values = allowed_status_values + self.allowed_severity_values = allowed_severity_values + + def _build_tree(self, child_fn): + root = tree.Tree() + root.bug = self.root_bug + root.depth = 0 + stack = [root] + while len(stack) > 0: + node = stack.pop() + if self.depth_limit > 0 and node.depth == self.depth_limit: + continue + for bug in child_fn(self.bugdir, node.bug): + if self.allowed_status_values != None \ + and not bug.status in self.allowed_status_values: + continue + if self.allowed_severity_values != None \ + and not bug.severity in self.allowed_severity_values: + continue + child = tree.Tree() + child.bug = bug + child.depth = node.depth+1 + node.append(child) + stack.append(child) + return root + + def blocks_tree(self): + if not hasattr(self, "_blocks_tree"): + self._blocks_tree = self._build_tree(get_blocks) + return self._blocks_tree + + def blocked_by_tree(self): + if not hasattr(self, "_blocked_by_tree"): + self._blocked_by_tree = self._build_tree(get_blocked_by) + return self._blocked_by_tree diff --git a/libbe/command/diff.py b/libbe/command/diff.py new file mode 100644 index 0000000..967ab14 --- /dev/null +++ b/libbe/command/diff.py @@ -0,0 +1,139 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.bugdir +import libbe.bug +import libbe.command +import libbe.command.util +import libbe.storage + +import libbe.diff + +class Diff (libbe.command.Command): + __doc__ = """Compare bug reports with older tree + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False, versioned=True) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Diff() + + >>> original = bd.storage.commit('Original status') + >>> bug = bd.bug_from_uuid('a') + >>> bug.status = 'closed' + >>> changed = bd.storage.commit('Closed bug a') + >>> ret = ui.run(cmd, args=[original]) + Modified bugs: + abc/a:cm: Bug A + Changed bug settings: + status: open -> closed + >>> ret = ui.run(cmd, {'subscribe':'%(bugdir_id)s:mod', 'uuids':True}, [original]) + a + >>> bd.storage.versioned = False + >>> ret = ui.run(cmd, args=[original]) + Traceback (most recent call last): + ... + UserError: This repository is not revision-controlled. + >>> ui.cleanup() + >>> bd.cleanup() + """ % {'bugdir_id':libbe.diff.BUGDIR_ID} + name = 'diff' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='repo', short_name='r', + help='Compare with repository in REPO instead' + ' of the current repository.', + arg=libbe.command.Argument( + name='repo', metavar='REPO', + completion_callback=libbe.command.util.complete_path)), + libbe.command.Option(name='subscribe', short_name='s', + help='Only print changes matching SUBSCRIPTION, ' + 'subscription is a comma-separated list of ID:TYPE ' + 'tuples. See `be subscribe --help` for descriptions ' + 'of ID and TYPE.', + arg=libbe.command.Argument( + name='subscribe', metavar='SUBSCRIPTION')), + libbe.command.Option(name='uuids', short_name='u', + help='Only print the changed bug UUIDS.'), + ]) + self.args.extend([ + libbe.command.Argument( + name='revision', metavar='REVISION', default=None, + optional=True) + ]) + + def _run(self, **params): + try: + subscriptions = libbe.diff.subscriptions_from_string( + params['subscribe']) + except ValueError, e: + raise libbe.command.UserError(e.msg) + bugdir = self._get_bugdir() + if bugdir.storage.versioned == False: + raise libbe.command.UserError( + 'This repository is not revision-controlled.') + if params['repo'] == None: + if params['revision'] == None: # get the most recent revision + params['revision'] = bugdir.storage.revision_id(-1) + old_bd = libbe.bugdir.RevisionedBugDir(bugdir, params['revision']) + else: + old_storage = libbe.storage.get_storage(params['repo']) + old_storage.connect() + old_bd_current = libbe.bugdir.BugDir(old_storage, from_disk=True) + if params['revision'] == None: # use the current working state + old_bd = old_bd_current + else: + if old_bd_current.storage.versioned == False: + raise libbe.command.UserError( + '%s is not revision-controlled.' + % storage.repo) + old_bd = libbe.bugdir.RevisionedBugDir(old_bd_current,revision) + d = libbe.diff.Diff(old_bd, bugdir) + tree = d.report_tree(subscriptions) + + if params['uuids'] == True: + uuids = [] + bugs = tree.child_by_path('/bugs') + for bug_type in bugs: + uuids.extend([bug.name for bug in bug_type]) + print >> self.stdout, '\n'.join(uuids) + else : + rep = tree.report_string() + if rep != None: + print >> self.stdout, rep + return 0 + + def _long_help(self): + return """ +Uses the storage backend to compare the current tree with a previous +tree, and prints a pretty report. If REVISION is given, it is a +specifier for the particular previous tree to use. Specifiers are +specific to their storage backend. + +For Arch your specifier must be a fully-qualified revision name. + +Besides the standard summary output, you can use the options to output +UUIDS for the different categories. This output can be used as the +input to 'be show' to get an understanding of the current status. +""" diff --git a/libbe/command/due.py b/libbe/command/due.py new file mode 100644 index 0000000..4463455 --- /dev/null +++ b/libbe/command/due.py @@ -0,0 +1,117 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.command +import libbe.command.util +import libbe.util.utility + + +DUE_TAG = 'DUE:' + + +class Due (libbe.command.Command): + """Set bug due dates + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Due(ui=ui) + + >>> ret = ui.run(cmd, args=['/a']) + No due date assigned. + >>> ret = ui.run(cmd, args=['/a', 'Thu, 01 Jan 1970 00:00:00 +0000']) + >>> ret = ui.run(cmd, args=['/a']) + Thu, 01 Jan 1970 00:00:00 +0000 + >>> ret = ui.run(cmd, args=['/a', 'none']) + >>> ret = ui.run(cmd, args=['/a']) + No due date assigned. + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'due' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='bug-id', metavar='BUG-ID', + completion_callback=libbe.command.util.complete_bug_id), + libbe.command.Argument( + name='due', metavar='DUE', optional=True), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + bug,dummy_comment = libbe.command.util.bug_comment_from_user_id( + bugdir, params['bug-id']) + if params['due'] == None: + due_time = get_due(bug) + if due_time is None: + print >> self.stdout, 'No due date assigned.' + else: + print >> self.stdout, libbe.util.utility.time_to_str(due_time) + else: + if params['due'] == 'none': + remove_due(bug) + else: + due_time = libbe.util.utility.str_to_time(params['due']) + set_due(bug, due_time) + + def _long_help(self): + return """ +If no DATE is specified, the bug's current due date is printed. If +DATE is specified, it will be assigned to the bug. +""" + +# internal helper functions + +def _generate_due_string(time): + return "%s%s" % (DUE_TAG, libbe.util.utility.time_to_str(time)) + +def _parse_due_string(string): + assert string.startswith(DUE_TAG) + return libbe.util.utility.str_to_time(string[len(DUE_TAG):]) + +# functions exposed to other modules + +def get_due(bug): + matched = [] + for line in bug.extra_strings: + if line.startswith(DUE_TAG): + matched.append(_parse_due_string(line)) + if len(matched) == 0: + return None + if len(matched) > 1: + raise Exception('Several due dates for %s?:\n %s' + % (bug.uuid, '\n '.join(matched))) + return matched[0] + +def remove_due(bug): + estrs = bug.extra_strings + for due_str in [s for s in estrs if s.startswith(DUE_TAG)]: + estrs.remove(due_str) + bug.extra_strings = estrs # reassign to notice change + +def set_due(bug, time): + remove_due(bug) + estrs = bug.extra_strings + estrs.append(_generate_due_string(time)) + bug.extra_strings = estrs # reassign to notice change diff --git a/libbe/command/help.py b/libbe/command/help.py new file mode 100644 index 0000000..1fc88f0 --- /dev/null +++ b/libbe/command/help.py @@ -0,0 +1,82 @@ +# Copyright (C) 2006-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# Thomas Gerigk <tgerigk@gmx.de> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.command +import libbe.command.util + +TOPICS = {} + +class Help (libbe.command.Command): + """Print help for given command or topic + + >>> import sys + >>> import libbe.bugdir + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> cmd = Help() + + >>> ret = ui.run(cmd, args=['help']) + usage: be help [options] [TOPIC] + <BLANKLINE> + Options: + -h, --help Print a help message. + <BLANKLINE> + --complete Print a list of possible completions. + <BLANKLINE> + <BLANKLINE> + Print help for specified command/topic or list of all commands. + """ + name = 'help' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='topic', metavar='TOPIC', default=None, + optional=True, + completion_callback=self.complete_topic) + ]) + + def _run(self, **params): + if params['topic'] == None: + if hasattr(self.ui, 'help'): + print >> self.stdout, self.ui.help().rstrip('\n') + elif params['topic'] in libbe.command.commands(): + module = libbe.command.get_command(params['topic']) + Class = libbe.command.get_command_class(module,params['topic']) + c = Class(ui=self.ui) + print >> self.stdout, c.help().rstrip('\n') + elif params['topic'] in TOPICS: + print >> self.stdout, TOPICS[params['topic']].rstrip('\n') + else: + raise libbe.command.UserError( + '"%s" is neither a command nor topic' % params['topic']) + return 0 + + def _long_help(self): + return """ +Print help for specified command/topic or list of all commands. +""" + + def complete_topic(self, command, argument, fragment=None): + commands = libbe.command.util.complete_command() + topics = sorted(TOPICS.keys()) + return commands + topics diff --git a/libbe/command/html.py b/libbe/command/html.py new file mode 100644 index 0000000..fbbdf97 --- /dev/null +++ b/libbe/command/html.py @@ -0,0 +1,686 @@ +# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import codecs +import htmlentitydefs +import os +import os.path +import re +import string +import time +import xml.sax.saxutils + +import libbe +import libbe.command +import libbe.command.util +import libbe.comment +import libbe.util.encoding +import libbe.util.id + + +class HTML (libbe.command.Command): + """Generate a static HTML dump of the current repository status + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = HTML(ui=ui) + + >>> ret = ui.run(cmd, {'output':os.path.join(bd.storage.repo, 'html_export')}) + >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export')) + True + >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'index.html')) + True + >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'index_inactive.html')) + True + >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs')) + True + >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs', 'a.html')) + True + >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs', 'b.html')) + True + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'html' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='output', short_name='o', + help='Set the output path (%default)', + arg=libbe.command.Argument( + name='output', metavar='DIR', default='./html_export', + completion_callback=libbe.command.util.complete_path)), + libbe.command.Option(name='template-dir', short_name='t', + help='Use a different template. Defaults to internal templates', + arg=libbe.command.Argument( + name='template-dir', metavar='DIR', + completion_callback=libbe.command.util.complete_path)), + libbe.command.Option(name='title', + help='Set the bug repository title (%default)', + arg=libbe.command.Argument( + name='title', metavar='STRING', + default='BugsEverywhere Issue Tracker')), + libbe.command.Option(name='index-header', + help='Set the index page headers (%default)', + arg=libbe.command.Argument( + name='index-header', metavar='STRING', + default='BugsEverywhere Bug List')), + libbe.command.Option(name='export-template', short_name='e', + help='Export the default template and exit.'), + libbe.command.Option(name='export-template-dir', short_name='d', + help='Set the directory for the template export (%default)', + arg=libbe.command.Argument( + name='export-template-dir', metavar='DIR', + default='./default-templates/', + completion_callback=libbe.command.util.complete_path)), + libbe.command.Option(name='verbose', short_name='v', + help='Verbose output, default is %default'), + ]) + + def _run(self, **params): + if params['export-template'] == True: + html_gen.write_default_template(params['export-template-dir']) + return 0 + bugdir = self._get_bugdir() + bugdir.load_all_bugs() + html_gen = HTMLGen(bugdir, + template=params['template-dir'], + title=params['title'], + index_header=params['index-header'], + verbose=params['verbose'], + stdout=self.stdout) + html_gen.run(params['output']) + return 0 + + def _long_help(self): + return """ +Generate a set of html pages representing the current state of the bug +directory. +""" + +Html = HTML # alias for libbe.command.base.get_command_class() + +class HTMLGen (object): + def __init__(self, bd, template=None, + title="Site Title", index_header="Index Header", + verbose=False, encoding=None, stdout=None, + ): + self.generation_time = time.ctime() + self.bd = bd + if template == None: + self.template = "default" + else: + self.template = os.path.abspath(os.path.expanduser(template)) + self.title = title + self.index_header = index_header + self.verbose = verbose + self.stdout = stdout + if encoding != None: + self.encoding = encoding + else: + self.encoding = libbe.util.encoding.get_filesystem_encoding() + self._load_default_templates() + if template != None: + self._load_user_templates() + + def run(self, out_dir): + if self.verbose == True: + print >> self.stdout, \ + 'Creating the html output in %s using templates in %s' \ + % (out_dir, self.template) + + bugs_active = [] + bugs_inactive = [] + bugs = [b for b in self.bd] + bugs.sort() + bugs_active = [b for b in bugs if b.active == True] + bugs_inactive = [b for b in bugs if b.active != True] + + self._create_output_directories(out_dir) + self._write_css_file() + for b in bugs: + if b.active: + up_link = '../index.html' + else: + up_link = '../index_inactive.html' + self._write_bug_file(b, up_link) + self._write_index_file( + bugs_active, title=self.title, + index_header=self.index_header, bug_type='active') + self._write_index_file( + bugs_inactive, title=self.title, + index_header=self.index_header, bug_type='inactive') + + def _create_output_directories(self, out_dir): + if self.verbose: + print >> self.stdout, 'Creating output directories' + self.out_dir = self._make_dir(out_dir) + self.out_dir_bugs = self._make_dir( + os.path.join(self.out_dir, 'bugs')) + + def _write_css_file(self): + if self.verbose: + print >> self.stdout, 'Writing css file' + assert hasattr(self, 'out_dir'), \ + 'Must run after ._create_output_directories()' + self._write_file(self.css_file, + [self.out_dir,'style.css']) + + def _write_bug_file(self, bug, up_link): + if self.verbose: + print >> self.stdout, '\tCreating bug file for %s' % bug.id.user() + assert hasattr(self, 'out_dir_bugs'), \ + 'Must run after ._create_output_directories()' + + bug.load_comments(load_full=True) + comment_entries = self._generate_bug_comment_entries(bug) + filename = '%s.html' % bug.uuid + fullpath = os.path.join(self.out_dir_bugs, filename) + template_info = {'title':self.title, + 'charset':self.encoding, + 'up_link':up_link, + 'shortname':bug.id.user(), + 'comment_entries':comment_entries, + 'generation_time':self.generation_time} + for attr in ['uuid', 'severity', 'status', 'assigned', + 'reporter', 'creator', 'time_string', 'summary']: + template_info[attr] = self._escape(getattr(bug, attr)) + self._write_file(self.bug_file % template_info, [fullpath]) + + def _generate_bug_comment_entries(self, bug): + assert hasattr(self, 'out_dir_bugs'), \ + 'Must run after ._create_output_directories()' + + stack = [] + comment_entries = [] + bug.comment_root.sort(cmp=libbe.comment.cmp_time, reverse=True) + for depth,comment in bug.comment_root.thread(flatten=False): + while len(stack) > depth: + # pop non-parents off the stack + stack.pop(-1) + # close non-parent <div class="comment... + comment_entries.append('</div>\n') + assert len(stack) == depth + stack.append(comment) + if depth == 0: + comment_entries.append('<div class="comment root">') + else: + comment_entries.append( + '<div class="comment" id="%s">' % comment.uuid) + template_info = {'shortname': comment.id.user()} + for attr in ['uuid', 'author', 'date', 'body']: + value = getattr(comment, attr) + if attr == 'body': + link_long_ids = False + save_body = False + if comment.content_type == 'text/html': + link_long_ids = True + elif comment.content_type.startswith('text/'): + value = '<pre>\n'+self._escape(value)+'\n</pre>' + link_long_ids = True + elif comment.content_type.startswith('image/'): + save_body = True + value = '<img src="./%s/%s" />' \ + % (bug.uuid, comment.uuid) + else: + save_body = True + value = '<a href="./%s/%s">Link to %s file</a>.' \ + % (bug.uuid, comment.uuid, comment.content_type) + if link_long_ids == True: + value = self._long_to_linked_user(value) + if save_body == True: + per_bug_dir = os.path.join(self.out_dir_bugs, bug.uuid) + if not os.path.exists(per_bug_dir): + os.mkdir(per_bug_dir) + comment_path = os.path.join(per_bug_dir, comment.uuid) + self._write_file( + '<Files %s>\n ForceType %s\n</Files>' \ + % (comment.uuid, comment.content_type), + [per_bug_dir, '.htaccess'], mode='a') + self._write_file(comment.body, + [per_bug_dir, comment.uuid], mode='wb') + else: + value = self._escape(value) + template_info[attr] = value + comment_entries.append(self.bug_comment_entry % template_info) + while len(stack) > 0: + stack.pop(-1) + comment_entries.append('</div>\n') # close every remaining <div class='comment... + return '\n'.join(comment_entries) + + def _long_to_linked_user(self, text): + """ + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> h = HTMLGen(bd) + >>> h._long_to_linked_user('A link #abc123/a#, and a non-link #x#y#.') + 'A link <a href="./a.html">abc/a</a>, and a non-link #x#y#.' + >>> bd.cleanup() + """ + replacer = libbe.util.id.IDreplacer( + [self.bd], self._long_to_linked_user_replacer, wrap=False) + return re.sub( + libbe.util.id.REGEXP, replacer, text) + + def _long_to_linked_user_replacer(self, bugdirs, long_id): + """ + >>> import libbe.bugdir + >>> import libbe.util.id + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> a = bd.bug_from_uuid('a') + >>> uuid_gen = libbe.util.id.uuid_gen + >>> libbe.util.id.uuid_gen = lambda : '0123' + >>> c = a.new_comment('comment for link testing') + >>> libbe.util.id.uuid_gen = uuid_gen + >>> c.uuid + '0123' + >>> h = HTMLGen(bd) + >>> h._long_to_linked_user_replacer([bd], 'abc123') + '#abc123#' + >>> h._long_to_linked_user_replacer([bd], 'abc123/a') + '<a href="./a.html">abc/a</a>' + >>> h._long_to_linked_user_replacer([bd], 'abc123/a/0123') + '<a href="./a.html#0123">abc/a/012</a>' + >>> h._long_to_linked_user_replacer([bd], 'x') + '#x#' + >>> h._long_to_linked_user_replacer([bd], '') + '##' + >>> bd.cleanup() + """ + try: + p = libbe.util.id.parse_user(bugdirs[0], long_id) + short_id = libbe.util.id.long_to_short_user(bugdirs, long_id) + except (libbe.util.id.MultipleIDMatches, + libbe.util.id.NoIDMatches, + libbe.util.id.InvalidIDStructure), e: + return '#%s#' % long_id # re-wrap failures + if p['type'] == 'bugdir': + return '#%s#' % long_id + elif p['type'] == 'bug': + return '<a href="./%s.html">%s</a>' \ + % (p['bug'], short_id) + elif p['type'] == 'comment': + return '<a href="./%s.html#%s">%s</a>' \ + % (p['bug'], p['comment'], short_id) + raise Exception('Invalid id type %s for "%s"' + % (p['type'], long_id)) + + def _write_index_file(self, bugs, title, index_header, bug_type='active'): + if self.verbose: + print >> self.stdout, 'Writing %s index file for %d bugs' % (bug_type, len(bugs)) + assert hasattr(self, 'out_dir'), 'Must run after ._create_output_directories()' + esc = self._escape + + bug_entries = self._generate_index_bug_entries(bugs) + + if bug_type == 'active': + filename = 'index.html' + elif bug_type == 'inactive': + filename = 'index_inactive.html' + else: + raise Exception, 'Unrecognized bug_type: "%s"' % bug_type + template_info = {'title':title, + 'index_header':index_header, + 'charset':self.encoding, + 'active_class':'tab sel', + 'inactive_class':'tab nsel', + 'bug_entries':bug_entries, + 'generation_time':self.generation_time} + if bug_type == 'inactive': + template_info['active_class'] = 'tab nsel' + template_info['inactive_class'] = 'tab sel' + + self._write_file(self.index_file % template_info, + [self.out_dir, filename]) + + def _generate_index_bug_entries(self, bugs): + bug_entries = [] + for bug in bugs: + if self.verbose: + print >> self.stdout, '\tCreating bug entry for %s' % bug.id.user() + template_info = {'shortname':bug.id.user()} + for attr in ['uuid', 'severity', 'status', 'assigned', + 'reporter', 'creator', 'time_string', 'summary']: + template_info[attr] = self._escape(getattr(bug, attr)) + bug_entries.append(self.index_bug_entry % template_info) + return '\n'.join(bug_entries) + + def _escape(self, string): + if string == None: + return '' + return xml.sax.saxutils.escape(char) + + def _load_user_templates(self): + for filename,attr in [('style.css','css_file'), + ('index_file.tpl','index_file'), + ('index_bug_entry.tpl','index_bug_entry'), + ('bug_file.tpl','bug_file'), + ('bug_comment_entry.tpl','bug_comment_entry')]: + fullpath = os.path.join(self.template, filename) + if os.path.exists(fullpath): + setattr(self, attr, self._read_file([fullpath])) + + def _make_dir(self, dir_path): + dir_path = os.path.abspath(os.path.expanduser(dir_path)) + if not os.path.exists(dir_path): + try: + os.makedirs(dir_path) + except: + raise libbe.command.UserError( + 'Cannot create output directory "%s".' % dir_path) + return dir_path + + def _write_file(self, content, path_array, mode='w'): + return libbe.util.encoding.set_file_contents( + os.path.join(*path_array), content, mode, self.encoding) + + def _read_file(self, path_array, mode='r'): + return libbe.util.encoding.get_file_contents( + os.path.join(*path_array), mode, self.encoding, decode=True) + + def write_default_template(self, out_dir): + if self.verbose: + print >> self.stdout, 'Creating output directories' + self.out_dir = self._make_dir(out_dir) + if self.verbose: + print >> self.stdout, 'Creating css file' + self._write_css_file() + if self.verbose: + print >> self.stdout, 'Creating index_file.tpl file' + self._write_file(self.index_file, + [self.out_dir, 'index_file.tpl']) + if self.verbose: + print >> self.stdout, 'Creating index_bug_entry.tpl file' + self._write_file(self.index_bug_entry, + [self.out_dir, 'index_bug_entry.tpl']) + if self.verbose: + print >> self.stdout, 'Creating bug_file.tpl file' + self._write_file(self.bug_file, + [self.out_dir, 'bug_file.tpl']) + if self.verbose: + print >> self.stdout, 'Creating bug_comment_entry.tpl file' + self._write_file(self.bug_comment_entry, + [self.out_dir, 'bug_comment_entry.tpl']) + + def _load_default_templates(self): + self.css_file = """ + body { + font-family: "lucida grande", "sans serif"; + color: #333; + width: auto; + margin: auto; + } + + div.main { + padding: 20px; + margin: auto; + padding-top: 0; + margin-top: 1em; + background-color: #fcfcfc; + } + + div.footer { + font-size: small; + padding-left: 20px; + padding-right: 20px; + padding-top: 5px; + padding-bottom: 5px; + margin: auto; + background: #305275; + color: #fffee7; + } + + table { + border-style: solid; + border: 10px #313131; + border-spacing: 0; + width: auto; + } + + tb { border: 1px; } + + tr { + vertical-align: top; + width: auto; + } + + td { + border-width: 0; + border-style: none; + padding-right: 0.5em; + padding-left: 0.5em; + width: auto; + } + + img { border-style: none; } + + h1 { + padding: 0.5em; + background-color: #305275; + margin-top: 0; + margin-bottom: 0; + color: #fff; + margin-left: -20px; + margin-right: -20px; + } + + ul { + list-style-type: none; + padding: 0; + } + + p { width: auto; } + + a, a:visited { + background: inherit; + text-decoration: none; + } + + a { color: #003d41; } + a:visited { color: #553d41; } + .footer a { color: #508d91; } + + /* bug index pages */ + + td.tab { + padding-right: 1em; + padding-left: 1em; + } + + td.sel.tab { + background-color: #afafaf; + border: 1px solid #afafaf; + font-weight:bold; + } + + td.nsel.tab { border: 0px; } + + table.bug_list { + background-color: #afafaf; + border: 2px solid #afafaf; + } + + .bug_list tr { width: auto; } + tr.wishlist { background-color: #B4FF9B; } + tr.minor { background-color: #FCFF98; } + tr.serious { background-color: #FFB648; } + tr.critical { background-color: #FF752A; } + tr.fatal { background-color: #FF3300; } + + /* bug detail pages */ + + td.bug_detail_label { text-align: right; } + td.bug_detail { } + td.bug_comment_label { text-align: right; vertical-align: top; } + td.bug_comment { } + + div.comment { + padding: 20px; + padding-top: 20px; + margin: auto; + margin-top: 0; + } + + div.root.comment { + padding: 0px; + /* padding-top: 0px; */ + padding-bottom: 20px; + } + """ + + self.index_file = """ + <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" + "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> + <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en"> + <head> + <title>%(title)s</title> + <meta http-equiv="Content-Type" content="text/html; charset=%(charset)s" /> + <link rel="stylesheet" href="style.css" type="text/css" /> + </head> + <body> + + <div class="main"> + <h1>%(index_header)s</h1> + <p></p> + <table> + + <tr> + <td class="%(active_class)s"><a href="index.html">Active Bugs</a></td> + <td class="%(inactive_class)s"><a href="index_inactive.html">Inactive Bugs</a></td> + </tr> + + </table> + <table class="bug_list"> + <tbody> + + %(bug_entries)s + + </tbody> + </table> + </div> + + <div class="footer"> + <p>Generated by <a href="http://www.bugseverywhere.org/"> + BugsEverywhere</a> on %(generation_time)s</p> + <p> + <a href="http://validator.w3.org/check?uri=referer">Validate XHTML</a> | + <a href="http://jigsaw.w3.org/css-validator/check?uri=referer">Validate CSS</a> + </p> + </div> + + </body> + </html> + """ + + self.index_bug_entry =""" + <tr class="%(severity)s"> + <td><a href="bugs/%(uuid)s.html">%(shortname)s</a></td> + <td><a href="bugs/%(uuid)s.html">%(status)s</a></td> + <td><a href="bugs/%(uuid)s.html">%(severity)s</a></td> + <td><a href="bugs/%(uuid)s.html">%(summary)s</a></td> + <td><a href="bugs/%(uuid)s.html">%(time_string)s</a></td> + </tr> + """ + + self.bug_file = """ + <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" + "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> + <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en"> + <head> + <title>%(title)s</title> + <meta http-equiv="Content-Type" content="text/html; charset=%(charset)s" /> + <link rel="stylesheet" href="../style.css" type="text/css" /> + </head> + <body> + + <div class="main"> + <h1>BugsEverywhere Bug List</h1> + <h5><a href="%(up_link)s">Back to Index</a></h5> + <h2>Bug: %(shortname)s</h2> + <table> + <tbody> + + <tr><td class="bug_detail_label">ID :</td> + <td class="bug_detail">%(uuid)s</td></tr> + <tr><td class="bug_detail_label">Short name :</td> + <td class="bug_detail">%(shortname)s</td></tr> + <tr><td class="bug_detail_label">Status :</td> + <td class="bug_detail">%(status)s</td></tr> + <tr><td class="bug_detail_label">Severity :</td> + <td class="bug_detail">%(severity)s</td></tr> + <tr><td class="bug_detail_label">Assigned :</td> + <td class="bug_detail">%(assigned)s</td></tr> + <tr><td class="bug_detail_label">Reporter :</td> + <td class="bug_detail">%(reporter)s</td></tr> + <tr><td class="bug_detail_label">Creator :</td> + <td class="bug_detail">%(creator)s</td></tr> + <tr><td class="bug_detail_label">Created :</td> + <td class="bug_detail">%(time_string)s</td></tr> + <tr><td class="bug_detail_label">Summary :</td> + <td class="bug_detail">%(summary)s</td></tr> + </tbody> + </table> + + <hr/> + + %(comment_entries)s + + </div> + <h5><a href="%(up_link)s">Back to Index</a></h5> + + <div class="footer"> + <p>Generated by <a href="http://www.bugseverywhere.org/"> + BugsEverywhere</a> on %(generation_time)s</p> + <p> + <a href="http://validator.w3.org/check?uri=referer">Validate XHTML</a> | + <a href="http://jigsaw.w3.org/css-validator/check?uri=referer">Validate CSS</a> + </p> + </div> + + </body> + </html> + """ + + self.bug_comment_entry =""" + <table> + <tr> + <td class="bug_comment_label">Comment:</td> + <td class="bug_comment"> + --------- Comment ---------<br/> + ID: %(uuid)s<br/> + Short name: %(shortname)s<br/> + From: %(author)s<br/> + Date: %(date)s<br/> + <br/> + %(body)s + </td> + </tr> + </table> + """ + + # strip leading whitespace + for attr in ['css_file', 'index_file', 'index_bug_entry', 'bug_file', + 'bug_comment_entry']: + value = getattr(self, attr) + value = value.replace('\n'+' '*12, '\n') + setattr(self, attr, value.strip()+'\n') diff --git a/libbe/command/import_xml.py b/libbe/command/import_xml.py new file mode 100644 index 0000000..287d8b7 --- /dev/null +++ b/libbe/command/import_xml.py @@ -0,0 +1,541 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import copy +import os +import sys +try: # import core module, Python >= 2.5 + from xml.etree import ElementTree +except ImportError: # look for non-core module + from elementtree import ElementTree + +import libbe +import libbe.bug +import libbe.command +import libbe.command.util +import libbe.comment +import libbe.util.encoding +import libbe.util.utility + +if libbe.TESTING == True: + import doctest + import StringIO + import unittest + + import libbe.bugdir + +class Import_XML (libbe.command.Command): + """Import comments and bugs from XML + + >>> import time + >>> import StringIO + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Import_XML(ui=ui) + + >>> ui.io.set_stdin('<be-xml><comment><uuid>c</uuid><body>This is a comment about a</body></comment></be-xml>') + >>> ret = ui.run(cmd, {'comment-root':'/a'}, ['-']) + >>> bd.flush_reload() + >>> bug = bd.bug_from_uuid('a') + >>> bug.load_comments(load_full=False) + >>> comment = bug.comment_root[0] + >>> print comment.body + This is a comment about a + <BLANKLINE> + >>> comment.time <= int(time.time()) + True + >>> comment.in_reply_to is None + True + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'import-xml' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='ignore-missing-references', short_name='i', + help="If any comment's <in-reply-to> refers to a non-existent comment, ignore it (instead of raising an exception)."), + libbe.command.Option(name='add-only', short_name='a', + help='If any bug or comment listed in the XML file already exists in the bug repository, do not alter the repository version.'), + libbe.command.Option(name='comment-root', short_name='c', + help='Supply a bug or comment ID as the root of any <comment> elements that are direct children of the <be-xml> element. If any such <comment> elements exist, you are required to set this option.', + arg=libbe.command.Argument( + name='comment-root', metavar='ID', + completion_callback=libbe.command.util.complete_bug_comment_id)), + ]) + self.args.extend([ + libbe.command.Argument( + name='xml-file', metavar='XML-FILE'), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + writeable = bugdir.storage.writeable + bugdir.storage.writeable = False + if params['comment-root'] != None: + croot_bug,croot_comment = \ + libbe.command.util.bug_comment_from_user_id( + bugdir, params['comment-root']) + croot_bug.load_comments(load_full=True) + if croot_comment.uuid == libbe.comment.INVALID_UUID: + croot_comment = croot_bug.comment_root + else: + croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid) + new_croot_bug = libbe.bug.Bug(bugdir=bugdir, uuid=croot_bug.uuid) + new_croot_bug.explicit_attrs = [] + new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root) + if croot_comment.uuid == libbe.comment.INVALID_UUID: + new_croot_comment = new_croot_bug.comment_root + else: + new_croot_comment = \ + new_croot_bug.comment_from_uuid(croot_comment.uuid) + for new in new_croot_bug.comments(): + new.explicit_attrs = [] + else: + croot_bug,croot_comment = (None, None) + + if params['xml-file'] == '-': + xml = self.stdin.read().encode(self.stdin.encoding) + else: + self._check_restricted_access(bugdir.storage, params['xml-file']) + xml = libbe.util.encoding.get_file_contents( + params['xml-file']) + + # parse the xml + root_bugs = [] + root_comments = [] + version = {} + be_xml = ElementTree.XML(xml) + if be_xml.tag != 'be-xml': + raise libbe.util.utility.InvalidXML( + 'import-xml', be_xml, 'root element must be <be-xml>') + for child in be_xml.getchildren(): + if child.tag == 'bug': + new = libbe.bug.Bug(bugdir=bugdir) + new.from_xml(child) + root_bugs.append(new) + elif child.tag == 'comment': + new = libbe.comment.Comment(croot_bug) + new.from_xml(child) + root_comments.append(new) + elif child.tag == 'version': + for gchild in child.getchildren(): + if child.tag in ['tag', 'nick', 'revision', 'revision-id']: + text = xml.sax.saxutils.unescape(child.text) + text = text.decode('unicode_escape').strip() + version[child.tag] = text + else: + print >> sys.stderr, 'ignoring unknown tag %s in %s' \ + % (gchild.tag, child.tag) + else: + print >> sys.stderr, 'ignoring unknown tag %s in %s' \ + % (child.tag, comment_list.tag) + + # merge the new root_comments + if params['add-only'] == True: + accept_changes = False + accept_extra_strings = False + else: + accept_changes = True + accept_extra_strings = True + accept_comments = True + if len(root_comments) > 0: + if croot_bug == None: + raise UserError( + '--comment-root option is required for your root comments:\n%s' + % '\n\n'.join([c.string() for c in root_comments])) + try: + # link new comments + new_croot_bug.add_comments(root_comments, + default_parent=new_croot_comment, + ignore_missing_references= \ + params['ignore-missing-references']) + except libbe.comment.MissingReference, e: + raise libbe.command.UserError(e) + croot_bug.merge(new_croot_bug, accept_changes=accept_changes, + accept_extra_strings=accept_extra_strings, + accept_comments=accept_comments) + + # merge the new croot_bugs + merged_bugs = [] + old_bugs = [] + for new in root_bugs: + try: + old = bugdir.bug_from_uuid(new.alt_id) + except KeyError: + old = None + if old == None: + bd.append(new) + else: + old.load_comments(load_full=True) + old.merge(new, accept_changes=accept_changes, + accept_extra_strings=accept_extra_strings, + accept_comments=accept_comments) + merged_bugs.append(new) + old_bugs.append(old) + + # protect against programmer error causing data loss: + if croot_bug != None: + comms = [] + for c in croot_comment.traverse(): + comms.append(c.uuid) + if c.alt_id != None: + comms.append(c.alt_id) + if croot_comment.uuid == libbe.comment.INVALID_UUID: + root_text = croot_bug.id.user() + else: + root_text = croot_comment.id.user() + for new in root_comments: + assert new.uuid in comms or new.alt_id in comms, \ + "comment %s (alt: %s) wasn't added to %s" \ + % (new.uuid, new.alt_id, root_text) + for new in root_bugs: + if not new in merged_bugs: + assert bugdir.has_bug(new.uuid), \ + "bug %s wasn't added" % (new.uuid) + + # save new information + bugdir.storage.writeable = writeable + if croot_bug != None: + croot_bug.save() + for new in root_bugs: + if not new in merged_bugs: + new.save() + for old in old_bugs: + old.save() + + def _long_help(self): + return """ +Import comments and bugs from XMLFILE. If XMLFILE is '-', the file is +read from stdin. + +This command provides a fallback mechanism for passing bugs between +repositories, in case the repositories VCSs are incompatible. If the +VCSs are compatible, it's better to use their builtin merge/push/pull +to share this information, as that will preserve a more detailed +history. + +The XML file should be formatted similarly to + <be-xml> + <version> + <tag>1.0.0</tag> + <branch-nick>be</branch-nick> + <revno>446</revno> + <revision-id>a@b.com-20091119214553-iqyw2cpqluww3zna</revision-id> + <version> + <bug> + ... + <comment>...</comment> + <comment>...</comment> + </bug> + <bug>...</bug> + <comment>...</comment> + <comment>...</comment> + </be-xml> +where the ellipses mark output commpatible with Bug.xml() and +Comment.xml(). Take a look at the output of `be show --xml` for some +explicit examples. Unrecognized tags are ignored. Missing tags are +left at the default value. The version tag is not required, but is +strongly recommended. + +The bug and comment UUIDs are always auto-generated, so if you set a +<uuid> field, but no <alt-id> field, your <uuid> will be used as the +comment's <alt-id>. An exception is raised if <alt-id> conflicts with +an existing comment. Bugs do not have a permantent alt-id, so they +the <uuid>s you specify are not saved. The <uuid>s _are_ used to +match agains prexisting bug and comment uuids, and comment alt-ids, +and fields explicitly given in the XML file will replace old versions +unless the --add-only flag. + +*.extra_strings recieves special treatment, and if --add-only is not +set, the resulting list concatenates both source lists and removes +repeats. + +Here's an example of import activity: + Repository + bug (uuid=B, creator=John, status=open) + estr (don't forget your towel) + estr (helps with space travel) + com (uuid=C1, author=Jane, body=Hello) + com (uuid=C2, author=Jess, body=World) + XML + bug (uuid=B, status=fixed) + estr (don't forget your towel) + estr (watch out for flying dolphins) + com (uuid=C1, body=So long) + com (uuid=C3, author=Jed, body=And thanks) + Result + bug (uuid=B, creator=John, status=fixed) + estr (don't forget your towel) + estr (helps with space travel) + estr (watch out for flying dolphins) + com (uuid=C1, author=Jane, body=So long) + com (uuid=C2, author=Jess, body=World) + com (uuid=C4, alt-id=C3, author=Jed, body=And thanks) + Result, with --add-only + bug (uuid=B, creator=John, status=open) + estr (don't forget your towel) + estr (helps with space travel) + com (uuid=C1, author=Jane, body=Hello) + com (uuid=C2, author=Jess, body=World) + com (uuid=C4, alt-id=C3, author=Jed, body=And thanks) + +Examples: + +Import comments (e.g. emails from an mbox) and append to bug XYZ + $ be-mbox-to-xml mail.mbox | be import-xml --c XYZ - +Or you can append those emails underneath the prexisting comment XYZ-3 + $ be-mbox-to-xml mail.mbox | be import-xml --c XYZ-3 - + +User creates a new bug + user$ be new "The demuxulizer is broken" + Created bug with ID 48f + user$ be comment 48f + <Describe bug> + ... +User exports bug as xml and emails it to the developers + user$ be show --xml 48f > 48f.xml + user$ cat 48f.xml | mail -s "Demuxulizer bug xml" devs@b.com +or equivalently (with a slightly fancier be-handle-mail compatible +email): + user$ be email-bugs 48f +Devs recieve email, and save it's contents as demux-bug.xml + dev$ cat demux-bug.xml | be import-xml - +""" + + +Import_xml = Import_XML # alias for libbe.command.base.get_command_class() + +if libbe.TESTING == True: + class LonghelpTestCase (unittest.TestCase): + """ + Test import scenarios given in longhelp. + """ + def setUp(self): + self.bugdir = libbe.bugdir.SimpleBugDir(memory=False) + io = libbe.command.StringInputOutput() + self.ui = libbe.command.UserInterface(io=io) + self.ui.storage_callbacks.set_storage(self.bugdir.storage) + self.cmd = Import_XML(ui=self.ui) + self.cmd._storage = self.bugdir.storage + self.cmd._setup_io = lambda i_enc,o_enc : None + bugA = self.bugdir.bug_from_uuid('a') + self.bugdir.remove_bug(bugA) + self.bugdir.storage.writeable = False + bugB = self.bugdir.bug_from_uuid('b') + bugB.creator = 'John' + bugB.status = 'open' + bugB.extra_strings += ["don't forget your towel"] + bugB.extra_strings += ['helps with space travel'] + comm1 = bugB.comment_root.new_reply(body='Hello\n') + comm1.uuid = 'c1' + comm1.author = 'Jane' + comm2 = bugB.comment_root.new_reply(body='World\n') + comm2.uuid = 'c2' + comm2.author = 'Jess' + self.bugdir.storage.writeable = True + bugB.save() + self.xml = """ + <be-xml> + <bug> + <uuid>b</uuid> + <status>fixed</status> + <summary>a test bug</summary> + <extra-string>don't forget your towel</extra-string> + <extra-string>watch out for flying dolphins</extra-string> + <comment> + <uuid>c1</uuid> + <body>So long</body> + </comment> + <comment> + <uuid>c3</uuid> + <author>Jed</author> + <body>And thanks</body> + </comment> + </bug> + </be-xml> + """ + self.root_comment_xml = """ + <be-xml> + <comment> + <uuid>c1</uuid> + <body>So long</body> + </comment> + <comment> + <uuid>c3</uuid> + <author>Jed</author> + <body>And thanks</body> + </comment> + </be-xml> + """ + def tearDown(self): + self.bugdir.cleanup() + self.ui.cleanup() + def _execute(self, xml, params={}, args=[]): + self.ui.io.set_stdin(xml) + self.ui.run(self.cmd, params, args) + self.bugdir.flush_reload() + def testCleanBugdir(self): + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + def testNotAddOnly(self): + bugB = self.bugdir.bug_from_uuid('b') + self._execute(self.xml, {}, ['-']) + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + bugB = self.bugdir.bug_from_uuid('b') + self.failUnless(bugB.uuid == 'b', bugB.uuid) + self.failUnless(bugB.creator == 'John', bugB.creator) + self.failUnless(bugB.status == 'fixed', bugB.status) + self.failUnless(bugB.summary == 'a test bug', bugB.summary) + estrs = ["don't forget your towel", + 'helps with space travel', + 'watch out for flying dolphins'] + self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings) + comments = list(bugB.comments()) + self.failUnless(len(comments) == 3, + ['%s (%s, %s)' % (c.uuid, c.alt_id, c.body) + for c in comments]) + c1 = bugB.comment_from_uuid('c1') + comments.remove(c1) + self.failUnless(c1.uuid == 'c1', c1.uuid) + self.failUnless(c1.alt_id == None, c1.alt_id) + self.failUnless(c1.author == 'Jane', c1.author) + self.failUnless(c1.body == 'So long\n', c1.body) + c2 = bugB.comment_from_uuid('c2') + comments.remove(c2) + self.failUnless(c2.uuid == 'c2', c2.uuid) + self.failUnless(c2.alt_id == None, c2.alt_id) + self.failUnless(c2.author == 'Jess', c2.author) + self.failUnless(c2.body == 'World\n', c2.body) + c4 = comments[0] + self.failUnless(len(c4.uuid) == 36, c4.uuid) + self.failUnless(c4.alt_id == 'c3', c4.alt_id) + self.failUnless(c4.author == 'Jed', c4.author) + self.failUnless(c4.body == 'And thanks\n', c4.body) + def testAddOnly(self): + bugB = self.bugdir.bug_from_uuid('b') + initial_bugB_summary = bugB.summary + self._execute(self.xml, {'add-only':True}, ['-']) + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + bugB = self.bugdir.bug_from_uuid('b') + self.failUnless(bugB.uuid == 'b', bugB.uuid) + self.failUnless(bugB.creator == 'John', bugB.creator) + self.failUnless(bugB.status == 'open', bugB.status) + self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary) + estrs = ["don't forget your towel", + 'helps with space travel'] + self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings) + comments = list(bugB.comments()) + self.failUnless(len(comments) == 3, + ['%s (%s)' % (c.uuid, c.alt_id) for c in comments]) + c1 = bugB.comment_from_uuid('c1') + comments.remove(c1) + self.failUnless(c1.uuid == 'c1', c1.uuid) + self.failUnless(c1.alt_id == None, c1.alt_id) + self.failUnless(c1.author == 'Jane', c1.author) + self.failUnless(c1.body == 'Hello\n', c1.body) + c2 = bugB.comment_from_uuid('c2') + comments.remove(c2) + self.failUnless(c2.uuid == 'c2', c2.uuid) + self.failUnless(c2.alt_id == None, c2.alt_id) + self.failUnless(c2.author == 'Jess', c2.author) + self.failUnless(c2.body == 'World\n', c2.body) + c4 = comments[0] + self.failUnless(len(c4.uuid) == 36, c4.uuid) + self.failUnless(c4.alt_id == 'c3', c4.alt_id) + self.failUnless(c4.author == 'Jed', c4.author) + self.failUnless(c4.body == 'And thanks\n', c4.body) + def testRootCommentsNotAddOnly(self): + bugB = self.bugdir.bug_from_uuid('b') + initial_bugB_summary = bugB.summary + self._execute(self.root_comment_xml, {'comment-root':'/b'}, ['-']) + uuids = list(self.bugdir.uuids()) + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + bugB = self.bugdir.bug_from_uuid('b') + self.failUnless(bugB.uuid == 'b', bugB.uuid) + self.failUnless(bugB.creator == 'John', bugB.creator) + self.failUnless(bugB.status == 'open', bugB.status) + self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary) + estrs = ["don't forget your towel", + 'helps with space travel'] + self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings) + comments = list(bugB.comments()) + self.failUnless(len(comments) == 3, + ['%s (%s, %s)' % (c.uuid, c.alt_id, c.body) + for c in comments]) + c1 = bugB.comment_from_uuid('c1') + comments.remove(c1) + self.failUnless(c1.uuid == 'c1', c1.uuid) + self.failUnless(c1.alt_id == None, c1.alt_id) + self.failUnless(c1.author == 'Jane', c1.author) + self.failUnless(c1.body == 'So long\n', c1.body) + c2 = bugB.comment_from_uuid('c2') + comments.remove(c2) + self.failUnless(c2.uuid == 'c2', c2.uuid) + self.failUnless(c2.alt_id == None, c2.alt_id) + self.failUnless(c2.author == 'Jess', c2.author) + self.failUnless(c2.body == 'World\n', c2.body) + c4 = comments[0] + self.failUnless(len(c4.uuid) == 36, c4.uuid) + self.failUnless(c4.alt_id == 'c3', c4.alt_id) + self.failUnless(c4.author == 'Jed', c4.author) + self.failUnless(c4.body == 'And thanks\n', c4.body) + def testRootCommentsAddOnly(self): + bugB = self.bugdir.bug_from_uuid('b') + initial_bugB_summary = bugB.summary + self._execute(self.root_comment_xml, + {'comment-root':'/b', 'add-only':True}, ['-']) + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + bugB = self.bugdir.bug_from_uuid('b') + self.failUnless(bugB.uuid == 'b', bugB.uuid) + self.failUnless(bugB.creator == 'John', bugB.creator) + self.failUnless(bugB.status == 'open', bugB.status) + self.failUnless(bugB.summary == initial_bugB_summary, bugB.summary) + estrs = ["don't forget your towel", + 'helps with space travel'] + self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings) + comments = list(bugB.comments()) + self.failUnless(len(comments) == 3, + ['%s (%s)' % (c.uuid, c.alt_id) for c in comments]) + c1 = bugB.comment_from_uuid('c1') + comments.remove(c1) + self.failUnless(c1.uuid == 'c1', c1.uuid) + self.failUnless(c1.alt_id == None, c1.alt_id) + self.failUnless(c1.author == 'Jane', c1.author) + self.failUnless(c1.body == 'Hello\n', c1.body) + c2 = bugB.comment_from_uuid('c2') + comments.remove(c2) + self.failUnless(c2.uuid == 'c2', c2.uuid) + self.failUnless(c2.alt_id == None, c2.alt_id) + self.failUnless(c2.author == 'Jess', c2.author) + self.failUnless(c2.body == 'World\n', c2.body) + c4 = comments[0] + self.failUnless(len(c4.uuid) == 36, c4.uuid) + self.failUnless(c4.alt_id == 'c3', c4.alt_id) + self.failUnless(c4.author == 'Jed', c4.author) + self.failUnless(c4.body == 'And thanks\n', c4.body) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/command/init.py b/libbe/command/init.py new file mode 100644 index 0000000..7b83645 --- /dev/null +++ b/libbe/command/init.py @@ -0,0 +1,132 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import os.path + +import libbe +import libbe.bugdir +import libbe.command +import libbe.storage + +class Init (libbe.command.Command): + """Create an on-disk bug repository + + >>> import os, sys + >>> import libbe.storage.vcs + >>> import libbe.storage.vcs.base + >>> import libbe.util.utility + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> cmd = Init() + + >>> dir = libbe.util.utility.Dir() + >>> vcs = libbe.storage.vcs.vcs_by_name('None') + >>> vcs.repo = dir.path + >>> try: + ... vcs.connect() + ... except libbe.storage.ConnectionError: + ... 'got error' + 'got error' + >>> ui.storage_callbacks.set_unconnected_storage(vcs) + >>> ui.run(cmd) + No revision control detected. + BE repository initialized. + >>> bd = libbe.bugdir.BugDir(vcs) + >>> vcs.disconnect() + >>> vcs.connect() + >>> bugdir = libbe.bugdir.BugDir(vcs, from_storage=True) + >>> vcs.disconnect() + >>> vcs.destroy() + >>> dir.cleanup() + + >>> dir = libbe.util.utility.Dir() + >>> vcs = libbe.storage.vcs.installed_vcs() + >>> vcs.repo = dir.path + >>> vcs._vcs_init(vcs.repo) + >>> ui.storage_callbacks.set_unconnected_storage(vcs) + >>> if vcs.name in libbe.storage.vcs.base.VCS_ORDER: + ... ui.run(cmd) # doctest: +ELLIPSIS + ... else: + ... vcs.init() + ... vcs.connect() + ... print 'Using ... for revision control.\\nDirectory initialized.' + Using ... for revision control. + BE repository initialized. + >>> vcs.disconnect() + >>> vcs.connect() + >>> bugdir = libbe.bugdir.BugDir(vcs, from_storage=True) + >>> vcs.disconnect() + >>> vcs.destroy() + >>> dir.cleanup() + """ + name = 'init' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + + def _run(self, **params): + storage = self._get_unconnected_storage() + if not os.path.isdir(storage.repo): + raise libbe.command.UserError( + 'No such directory: %s' % storage.repo) + try: + storage.connect() + raise libbe.command.UserError( + 'Directory already initialized: %s' % storage.repo) + except libbe.storage.ConnectionError: + pass + storage.init() + storage.connect() + self.ui.storage_callbacks.set_storage(storage) + bd = libbe.bugdir.BugDir(storage, from_storage=False) + self.ui.storage_callbacks.set_bugdir(bd) + if bd.storage.name is not 'None': + print >> self.stdout, \ + 'Using %s for revision control.' % storage.name + else: + print >> self.stdout, 'No revision control detected.' + print >> self.stdout, 'BE repository initialized.' + + def _long_help(self): + return """ +This command initializes Bugs Everywhere support for the specified directory +and all its subdirectories. It will auto-detect any supported revision control +system. You can use "be set vcs_name" to change the vcs being used. + +The directory defaults to your current working directory, but you can +change that by passing the --repo option to be + $ be --repo path/to/new/bug/root init + +When initialized in a version-controlled directory, BE sinks to the +version-control root. In that case, the BE repository will be created +under that directory, rather than the current directory or the one +passed in --repo. Consider the following tree, versioned in Git. + ~ + `--projectX + |-- .git + `-- src +Calling + ~$ be --repo ./projectX/src init +will create the BE repository rooted in projectX: + ~ + `--projectX + |-- .be + |-- .git + `-- src +""" diff --git a/libbe/command/list.py b/libbe/command/list.py new file mode 100644 index 0000000..3803257 --- /dev/null +++ b/libbe/command/list.py @@ -0,0 +1,279 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import os +import re + +import libbe +import libbe.bug +import libbe.command +import libbe.command.depend +import libbe.command.target +import libbe.command.util + +# get a list of * for cmp_*() comparing two bugs. +AVAILABLE_CMPS = [fn[4:] for fn in dir(libbe.bug) if fn[:4] == 'cmp_'] +AVAILABLE_CMPS.remove('attr') # a cmp_* template. + +class Filter (object): + def __init__(self, status='all', severity='all', assigned='all', + target='all', extra_strings_regexps=[]): + self.status = status + self.severity = severity + self.assigned = assigned + self.target = target + self.extra_strings_regexps = extra_strings_regexps + + def __call__(self, bugdir, bug): + if self.status != 'all' and not bug.status in self.status: + return False + if self.severity != 'all' and not bug.severity in self.severity: + return False + if self.assigned != 'all' and not bug.assigned in self.assigned: + return False + if self.target == 'all': + pass + else: + target_bug = libbe.command.target.bug_target(bugdir, bug) + if self.target in ['none', None]: + if target_bug.summary != None: + return False + else: + if target_bug.summary != self.target: + return False + if len(bug.extra_strings) == 0: + if len(self.extra_strings_regexps) > 0: + return False + else: + for string in bug.extra_strings: + for regexp in self.extra_strings_regexps: + if not regexp.match(string): + return False + return True + +class List (libbe.command.Command): + """List bugs + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = List(ui=ui) + + >>> ret = ui.run(cmd) + abc/a:om: Bug A + >>> ret = ui.run(cmd, {'status':'closed'}) + abc/b:cm: Bug B + >>> bd.storage.writeable + True + >>> ui.cleanup() + >>> bd.cleanup() + """ + + name = 'list' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='status', + help='Only show bugs matching the STATUS specifier', + arg=libbe.command.Argument( + name='status', metavar='STATUS', default='active', + completion_callback=libbe.command.util.complete_status)), + libbe.command.Option(name='severity', + help='Only show bugs matching the SEVERITY specifier', + arg=libbe.command.Argument( + name='severity', metavar='SEVERITY', default='all', + completion_callback=libbe.command.util.complete_severity)), + libbe.command.Option(name='important', + help='List bugs with >= "serious" severity'), + libbe.command.Option(name='assigned', short_name='a', + help='Only show bugs matching ASSIGNED', + arg=libbe.command.Argument( + name='assigned', metavar='ASSIGNED', default=None, + completion_callback=libbe.command.util.complete_assigned)), + libbe.command.Option(name='mine', short_name='m', + help='List bugs assigned to you'), + libbe.command.Option(name='extra-strings', short_name='e', + help='Only show bugs matching STRINGS, e.g. --extra-strings' + ' TAG:working,TAG:xml', + arg=libbe.command.Argument( + name='extra-strings', metavar='STRINGS', default=None, + completion_callback=libbe.command.util.complete_extra_strings)), + libbe.command.Option(name='sort', short_name='S', + help='Adjust bug-sort criteria with comma-separated list ' + 'SORT. e.g. "--sort creator,time". ' + 'Available criteria: %s' % ','.join(AVAILABLE_CMPS), + arg=libbe.command.Argument( + name='sort', metavar='SORT', default=None, + completion_callback=libbe.command.util.Completer(AVAILABLE_CMPS))), + libbe.command.Option(name='ids', short_name='i', + help='Only print the bug IDS'), + libbe.command.Option(name='xml', short_name='x', + help='Dump output in XML format'), + ]) +# parser.add_option("-S", "--sort", metavar="SORT-BY", dest="sort_by", +# help="Adjust bug-sort criteria with comma-separated list SORT-BY. e.g. \"--sort creator,time\". Available criteria: %s" % ','.join(AVAILABLE_CMPS), default=None) +# # boolean options. All but ids and xml are special cases of long forms +# ("w", "wishlist", "List bugs with 'wishlist' severity"), +# ("A", "active", "List all active bugs"), +# ("U", "unconfirmed", "List unconfirmed bugs"), +# ("o", "open", "List open bugs"), +# ("T", "test", "List bugs in testing"), +# for s in bools: +# attr = s[1].replace('-','_') +# short = "-%c" % s[0] +# long = "--%s" % s[1] +# help = s[2] +# parser.add_option(short, long, action="store_true", +# dest=attr, help=help, default=False) +# return parser +# +# ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + writeable = bugdir.storage.writeable + bugdir.storage.writeable = False + cmp_list, status, severity, assigned, extra_strings_regexps = \ + self._parse_params(bugdir, params) + filter = Filter(status, severity, assigned, + extra_strings_regexps=extra_strings_regexps) + bugs = [bugdir.bug_from_uuid(uuid) for uuid in bugdir.uuids()] + bugs = [b for b in bugs if filter(bugdir, b) == True] + self.result = bugs + if len(bugs) == 0 and params['xml'] == False: + print >> self.stdout, 'No matching bugs found' + + # sort bugs + bugs = self._sort_bugs(bugs, cmp_list) + + # print list of bugs + if params['ids'] == True: + for bug in bugs: + print >> self.stdout, bug.id.user() + else: + self._list_bugs(bugs, xml=params['xml']) + bugdir.storage.writeable = writeable + return 0 + + def _parse_params(self, bugdir, params): + cmp_list = [] + if params['sort'] != None: + for cmp in params['sort'].sort_by.split(','): + if cmp not in AVAILABLE_CMPS: + raise libbe.command.UserError( + 'Invalid sort on "%s".\nValid sorts:\n %s' + % (cmp, '\n '.join(AVAILABLE_CMPS))) + cmp_list.append(eval('libbe.bug.cmp_%s' % cmp)) + # select status + if params['status'] == 'all': + status = libbe.bug.status_values + elif params['status'] == 'active': + status = list(libbe.bug.active_status_values) + elif params['status'] == 'inactive': + status = list(libbe.bug.inactive_status_values) + else: + status = libbe.command.util.select_values( + params['status'], libbe.bug.status_values) + # select severity + if params['severity'] == 'all': + severity = libbe.bug.severity_values + elif params['important'] == True: + serious = libbe.bug.severity_values.index('serious') + severity.append(list(libbe.bug.severity_values[serious:])) + else: + severity = libbe.command.util.select_values( + params['severity'], libbe.bug.severity_values) + # select assigned + if params['assigned'] == None: + if params['mine'] == True: + assigned = [self._get_user_id()] + else: + assigned = 'all' + else: + assigned = libbe.command.util.select_values( + params['assigned'], libbe.command.util.assignees(bugdir)) + for i in range(len(assigned)): + if assigned[i] == '-': + assigned[i] = params['user-id'] + if params['extra-strings'] == None: + extra_strings_regexps = [] + else: + extra_strings_regexps = [re.compile(x) + for x in params['extra-strings'].split(',')] + return (cmp_list, status, severity, assigned, extra_strings_regexps) + + def _sort_bugs(self, bugs, cmp_list=[]): + cmp_list.extend(libbe.bug.DEFAULT_CMP_FULL_CMP_LIST) + cmp_fn = libbe.bug.BugCompoundComparator(cmp_list=cmp_list) + bugs.sort(cmp_fn) + return bugs + + def _list_bugs(self, bugs, xml=False): + if xml == True: + print >> self.stdout, \ + '<?xml version="1.0" encoding="%s" ?>' % self.stdout.encoding + print >> self.stdout, '<be-xml>' + if len(bugs) > 0: + for bug in bugs: + if xml == True: + print >> self.stdout, bug.xml(show_comments=True) + else: + print >> self.stdout, bug.string(shortlist=True) + if xml == True: + print >> self.stdout, '</be-xml>' + + def _long_help(self): + return """ +This command lists bugs. Normally it prints a short string like + bea/576:om: Allow attachments +Where + bea/576 the bug id + o the bug status is 'open' (first letter) + m the bug severity is 'minor' (first letter) + Allo... the bug summary string + +You can optionally (-u) print only the bug ids. + +There are several criteria that you can filter by: + * status + * severity + * assigned (who the bug is assigned to) +Allowed values for each criterion may be given in a comma seperated +list. The special string "all" may be used with any of these options +to match all values of the criterion. As with the --status and +--severity options for `be depend`, starting the list with a minus +sign makes your selections a blacklist instead of the default +whitelist. + +status + %s +severity + %s +assigned + free form, with the string '-' being a shortcut for yourself. + +In addition, there are some shortcut options that set boolean flags. +The boolean options are ignored if the matching string option is used. +""" % (','.join(libbe.bug.status_values), + ','.join(libbe.bug.severity_values)) diff --git a/libbe/command/merge.py b/libbe/command/merge.py new file mode 100644 index 0000000..2dff59c --- /dev/null +++ b/libbe/command/merge.py @@ -0,0 +1,189 @@ +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import copy +import os + +import libbe +import libbe.command +import libbe.command.util + + +class Merge (libbe.command.Command): + """Merge duplicate bugs + + >>> import sys + >>> import libbe.bugdir + >>> import libbe.comment + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_bugdir(bd) + >>> cmd = Merge(ui=ui) + + >>> a = bd.bug_from_uuid('a') + >>> a.comment_root.time = 0 + >>> dummy = a.new_comment('Testing') + >>> dummy.time = 1 + >>> dummy = dummy.new_reply('Testing...') + >>> dummy.time = 2 + >>> b = bd.bug_from_uuid('b') + >>> b.status = 'open' + >>> b.comment_root.time = 0 + >>> dummy = b.new_comment('1 2') + >>> dummy.time = 1 + >>> dummy = dummy.new_reply('1 2 3 4') + >>> dummy.time = 2 + + >>> ret = ui.run(cmd, args=['/a', '/b']) + Merged bugs #abc/a# and #abc/b# + >>> bd.flush_reload() + >>> a = bd.bug_from_uuid('a') + >>> a.load_comments() + >>> a_comments = sorted([c for c in a.comments()], + ... cmp=libbe.comment.cmp_time) + >>> mergeA = a_comments[0] + >>> mergeA.time = 3 + >>> print a.string(show_comments=True) # doctest: +ELLIPSIS + ID : a + Short name : abc/a + Severity : minor + Status : open + Assigned : + Reporter : + Creator : John Doe <jdoe@example.com> + Created : ... + Bug A + --------- Comment --------- + Name: abc/a/... + From: ... + Date: ... + <BLANKLINE> + Testing + --------- Comment --------- + Name: abc/a/... + From: ... + Date: ... + <BLANKLINE> + Testing... + --------- Comment --------- + Name: abc/a/... + From: ... + Date: ... + <BLANKLINE> + Merged from bug #abc/b# + --------- Comment --------- + Name: abc/a/... + From: ... + Date: ... + <BLANKLINE> + 1 2 + --------- Comment --------- + Name: abc/a/... + From: ... + Date: ... + <BLANKLINE> + 1 2 3 4 + >>> b = bd.bug_from_uuid('b') + >>> b.load_comments() + >>> b_comments = sorted([c for c in b.comments()], + ... libbe.comment.cmp_time) + >>> mergeB = b_comments[0] + >>> mergeB.time = 3 + >>> print b.string(show_comments=True) # doctest: +ELLIPSIS + ID : b + Short name : abc/b + Severity : minor + Status : closed + Assigned : + Reporter : + Creator : Jane Doe <jdoe@example.com> + Created : ... + Bug B + --------- Comment --------- + Name: abc/b/... + From: ... + Date: ... + <BLANKLINE> + 1 2 + --------- Comment --------- + Name: abc/b/... + From: ... + Date: ... + <BLANKLINE> + 1 2 3 4 + --------- Comment --------- + Name: abc/b/... + From: ... + Date: ... + <BLANKLINE> + Merged into bug #abc/a# + >>> print b.status + closed + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'merge' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='bug-id', metavar='BUG-ID', default=None, + completion_callback=libbe.command.util.complete_bug_id), + libbe.command.Argument( + name='bug-id-to-merge', metavar='BUG-ID', default=None, + completion_callback=libbe.command.util.complete_bug_id), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + bugA,dummy_comment = \ + libbe.command.util.bug_comment_from_user_id( + bugdir, params['bug-id']) + bugA.load_comments() + bugB,dummy_comment = \ + libbe.command.util.bug_comment_from_user_id( + bugdir, params['bug-id-to-merge']) + bugB.load_comments() + mergeA = bugA.new_comment('Merged from bug #%s#' % bugB.id.long_user()) + newCommTree = copy.deepcopy(bugB.comment_root) + for comment in newCommTree.traverse(): # all descendant comments + comment.bug = bugA + # uuids must be unique in storage + if comment.alt_id == None: + comment.storage = None + comment.alt_id = comment.uuid + comment.storage = bugdir.storage + comment.uuid = libbe.util.id.uuid_gen() + comment.save() # force onto disk under bugA + + for comment in newCommTree: # just the child comments + mergeA.add_reply(comment, allow_time_inversion=True) + bugB.new_comment('Merged into bug #%s#' % bugA.id.long_user()) + bugB.status = 'closed' + print >> self.stdout, 'Merged bugs #%s# and #%s#' \ + % (bugA.id.user(), bugB.id.user()) + return 0 + + def _long_help(self): + return """ +The second bug (B) is merged into the first (A). This adds merge +comments to both bugs, closes B, and appends B's comment tree to A's +merge comment. +""" diff --git a/libbe/command/new.py b/libbe/command/new.py new file mode 100644 index 0000000..be18306 --- /dev/null +++ b/libbe/command/new.py @@ -0,0 +1,103 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.command +import libbe.command.util + + +class New (libbe.command.Command): + """Create a new bug + + >>> import os + >>> import sys + >>> import time + >>> import libbe.bugdir + >>> import libbe.util.id + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = New() + + >>> uuid_gen = libbe.util.id.uuid_gen + >>> libbe.util.id.uuid_gen = lambda: 'X' + >>> ui._user_id = u'Fran\\xe7ois' + >>> ret = ui.run(cmd, args=['this is a test',]) + Created bug with ID abc/X + >>> libbe.util.id.uuid_gen = uuid_gen + >>> bd.flush_reload() + >>> bug = bd.bug_from_uuid('X') + >>> print bug.summary + this is a test + >>> bug.creator + u'Fran\\xe7ois' + >>> bug.reporter + u'Fran\\xe7ois' + >>> bug.time <= int(time.time()) + True + >>> print bug.severity + minor + >>> print bug.status + open + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'new' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='reporter', short_name='r', + help='The user who reported the bug', + arg=libbe.command.Argument( + name='reporter', metavar='NAME')), + libbe.command.Option(name='assigned', short_name='a', + help='The developer in charge of the bug', + arg=libbe.command.Argument( + name='assigned', metavar='NAME', + completion_callback=libbe.command.util.complete_assigned)), + ]) + self.args.extend([ + libbe.command.Argument(name='summary', metavar='SUMMARY') + ]) + + def _run(self, **params): + if params['summary'] == '-': # read summary from stdin + summary = self.stdin.readline() + else: + summary = params['summary'] + bugdir = self._get_bugdir() + bug = bugdir.new_bug(summary=summary.strip()) + bug.creator = self._get_user_id() + if params['reporter'] != None: + bug.reporter = params['reporter'] + else: + bug.reporter = bug.creator + if params['assigned'] != None: + bug.assigned = params['assigned'] + print >> self.stdout, 'Created bug with ID %s' % bug.id.user() + return 0 + + def _long_help(self): + return """ +Create a new bug, with a new ID. The summary specified on the +commandline is a string (only one line) that describes the bug briefly +or "-", in which case the string will be read from stdin. +""" diff --git a/libbe/command/open.py b/libbe/command/open.py new file mode 100644 index 0000000..0c6bf05 --- /dev/null +++ b/libbe/command/open.py @@ -0,0 +1,58 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Marien Zwart <marienz@gentoo.org> +# Thomas Gerigk <tgerigk@gmx.de> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +"""Re-open a bug""" +from libbe import cmdutil, bugdir +__desc__ = __doc__ + +def execute(args, manipulate_encodings=True): + """ + >>> import os + >>> bd = bugdir.SimpleBugDir() + >>> os.chdir(bd.root) + >>> print bd.bug_from_shortname("b").status + closed + >>> execute(["b"], manipulate_encodings=False) + >>> bd._clear_bugs() + >>> print bd.bug_from_shortname("b").status + open + >>> bd.cleanup() + """ + parser = get_parser() + options, args = parser.parse_args(args) + cmdutil.default_complete(options, args, parser, + bugid_args={0: lambda bug : bug.active==False}) + if len(args) == 0: + raise cmdutil.UsageError, "Please specify a bug id." + if len(args) > 1: + raise cmdutil.UsageError, "Too many arguments." + bd = bugdir.BugDir(from_disk=True, + manipulate_encodings=manipulate_encodings) + bug = cmdutil.bug_from_shortname(bd, args[0]) + bug.status = "open" + +def get_parser(): + parser = cmdutil.CmdOptionParser("be open BUG-ID") + return parser + +longhelp=""" +Mark a bug as 'open'. +""" + +def help(): + return get_parser().help_str() + longhelp diff --git a/libbe/command/remove.py b/libbe/command/remove.py new file mode 100644 index 0000000..8d8e641 --- /dev/null +++ b/libbe/command/remove.py @@ -0,0 +1,79 @@ +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.command +import libbe.command.util + + +class Remove (libbe.command.Command): + """Remove (delete) a bug and its comments + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Remove(ui=ui) + + >>> print bd.bug_from_uuid('b').status + closed + >>> ret = ui.run(cmd, args=['/b']) + Removed bug abc/b + >>> bd.flush_reload() + >>> try: + ... bd.bug_from_uuid('b') + ... except libbe.bugdir.NoBugMatches: + ... print 'Bug not found' + Bug not found + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'remove' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='bug-id', metavar='BUG-ID', default=None, + repeatable=True, + completion_callback=libbe.command.util.complete_bug_id), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + user_ids = [] + for bug_id in params['bug-id']: + bug,dummy_comment = libbe.command.util.bug_comment_from_user_id( + bugdir, bug_id) + user_ids.append(bug.id.user()) + bugdir.remove_bug(bug) + if len(user_ids) == 1: + print >> self.stdout, 'Removed bug %s' % user_ids[0] + else: + print >> self.stdout, 'Removed bugs %s' % ', '.join(user_ids) + return 0 + + def _long_help(self): + return """ +Remove (delete) existing bugs. Use with caution: if you're not using +a revision control system, there may be no way to recover the lost +information. You should use this command, for example, to get rid of +blank or otherwise mangled bugs. +""" diff --git a/libbe/command/serve.py b/libbe/command/serve.py new file mode 100644 index 0000000..7237343 --- /dev/null +++ b/libbe/command/serve.py @@ -0,0 +1,1172 @@ +# Copyright (C) 2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Define the :class:`Serve` serving BE Storage over HTTP. + +See Also +-------- +:mod:`libbe.storage.http` : the associated client +""" + +import hashlib +import logging +import os.path +import posixpath +import re +import sys +import time +import traceback +import types +import urllib +import wsgiref.simple_server +try: + # Python >= 2.6 + from urlparse import parse_qs +except ImportError: + # Python <= 2.5 + from cgi import parse_qs +try: + import cherrypy + import cherrypy.wsgiserver +except ImportError: + cherrypy = None +if cherrypy != None: + try: # CherryPy >= 3.2 + import cherrypy.wsgiserver.ssl_builtin + except ImportError: # CherryPy <= 3.1.X + cherrypy.wsgiserver.ssl_builtin = None +try: + import OpenSSL +except ImportError: + OpenSSL = None + +import libbe +import libbe.command +import libbe.command.util +import libbe.util.encoding +import libbe.version + +if libbe.TESTING == True: + import copy + import doctest + import StringIO + import unittest + import wsgiref.validate + try: + import cherrypy.test.webtest + cherrypy_test_webtest = True + except ImportError: + cherrypy_test_webtest = None + + import libbe.bugdir + +class _HandlerError (Exception): + def __init__(self, code, msg, headers=[]): + Exception.__init__(self, '%d %s' % (code, msg)) + self.code = code + self.msg = msg + self.headers = headers + +class _Unauthenticated (_HandlerError): + def __init__(self, realm, msg='User Not Authenticated', headers=[]): + _HandlerError.__init__(self, 401, msg, headers+[ + ('WWW-Authenticate','Basic realm="%s"' % realm)]) + +class _Unauthorized (_HandlerError): + def __init__(self, msg='User Not Authorized', headers=[]): + _HandlerError.__init__(self, 403, msg, headers) + +class User (object): + def __init__(self, uname=None, name=None, passhash=None, password=None): + self.uname = uname + self.name = name + self.passhash = passhash + if passhash == None: + if password != None: + self.passhash = self.hash(password) + else: + assert password == None, \ + 'Redundant password %s with passhash %s' % (password, passhash) + self.users = None + def from_string(self, string): + string = string.strip() + fields = string.split(':') + if len(fields) != 3: + raise ValueError, '%d!=3 fields in "%s"' % (len(fields), string) + self.uname,self.name,self.passhash = fields + def __str__(self): + return ':'.join([self.uname, self.name, self.passhash]) + def __cmp__(self, other): + return cmp(self.uname, other.uname) + def hash(self, password): + return hashlib.sha1(password).hexdigest() + def valid_login(self, password): + if self.hash(password) == self.passhash: + return True + return False + def set_name(self, name): + self._set_property('name', name) + def set_password(self, password): + self._set_property('passhash', self.hash(password)) + def _set_property(self, property, value): + if self.uname == 'guest': + raise _Unauthorized('guest user not allowed to change %s' % property) + if getattr(self, property) != value \ + and self.users != None: + self.users.changed = True + setattr(self, property, value) + +class Users (dict): + def __init__(self, filename=None): + dict.__init__(self) + self.filename = filename + self.changed = False + def load(self): + if self.filename == None: + return + user_file = libbe.util.encoding.get_file_contents( + self.filename, decode=True) + self.clear() + for line in user_file.splitlines(): + user = User() + user.from_string(line) + self.add_user(user) + def save(self): + if self.filename != None and self.changed == True: + lines = [] + for user in sorted(self.users): + lines.append(str(user)) + libbe.util.encoding.set_file_contents(self.filename) + self.changed = False + def add_user(self, user): + assert user.users == None, user.users + user.users = self + self[user.uname] = user + def valid_login(self, uname, password): + if uname in self and \ + self[uname].valid_login(password) == True: + return True + return False + +class WSGI_Object (object): + """Utility class for WGSI clients and middleware. + + For details on WGSI, see `PEP 333`_ + + .. _PEP 333: http://www.python.org/dev/peps/pep-0333/ + """ + def __init__(self, logger=None, log_level=logging.INFO, log_format=None): + self.logger = logger + self.log_level = log_level + if log_format == None: + self.log_format = ( + '%(REMOTE_ADDR)s - %(REMOTE_USER)s [%(time)s] ' + '"%(REQUEST_METHOD)s %(REQUEST_URI)s %(HTTP_VERSION)s" ' + '%(status)s %(bytes)s "%(HTTP_REFERER)s" "%(HTTP_USER_AGENT)s"') + else: + self.log_format = log_format + + def __call__(self, environ, start_response): + """The main WSGI entry point.""" + raise NotImplementedError + # start_response() is a callback for setting response headers + # start_response(status, response_headers, exc_info=None) + # status is an HTTP status string (e.g., "200 OK"). + # response_headers is a list of 2-tuples, the HTTP headers in + # key-value format. + # exc_info is used in exception handling. + # + # The application function then returns an iterable of body chunks. + + def error(self, environ, start_response, error, message, headers=[]): + """Make it easy to call start_response for errors.""" + response = '%d %s' % (error, message) + self.log_request(environ, status=response, bytes=len(message)) + start_response(response, + [('Content-Type', 'text/plain')]+headers) + return [message] + + def log_request(self, environ, status='-1 OK', bytes=-1): + if self.logger == None: + return + req_uri = urllib.quote(environ.get('SCRIPT_NAME', '') + + environ.get('PATH_INFO', '')) + if environ.get('QUERY_STRING'): + req_uri += '?'+environ['QUERY_STRING'] + start = time.localtime() + if time.daylight: + offset = time.altzone / 60 / 60 * -100 + else: + offset = time.timezone / 60 / 60 * -100 + if offset >= 0: + offset = "+%0.4d" % (offset) + elif offset < 0: + offset = "%0.4d" % (offset) + d = { + 'REMOTE_ADDR': environ.get('REMOTE_ADDR') or '-', + 'REMOTE_USER': environ.get('REMOTE_USER') or '-', + 'REQUEST_METHOD': environ['REQUEST_METHOD'], + 'REQUEST_URI': req_uri, + 'HTTP_VERSION': environ.get('SERVER_PROTOCOL'), + 'time': time.strftime('%d/%b/%Y:%H:%M:%S ', start) + offset, + 'status': status.split(None, 1)[0], + 'bytes': bytes, + 'HTTP_REFERER': environ.get('HTTP_REFERER', '-'), + 'HTTP_USER_AGENT': environ.get('HTTP_USER_AGENT', '-'), + } + self.logger.log(self.log_level, self.log_format % d) + +class ExceptionApp (WSGI_Object): + """Some servers (e.g. cherrypy) eat app-raised exceptions. + + Work around that by logging tracebacks by hand. + """ + def __init__(self, app, *args, **kwargs): + WSGI_Object.__init__(self, *args, **kwargs) + self.app = app + + def __call__(self, environ, start_response): + if self.logger != None: + self.logger.log(logging.DEBUG, 'ExceptionApp') + try: + return self.app(environ, start_response) + except Exception, e: + etype,value,tb = sys.exc_info() + trace = ''.join( + traceback.format_exception(etype, value, tb, None)) + self.logger.log(self.log_level, trace) + raise + +class UppercaseHeaderApp (WSGI_Object): + """WSGI middleware that uppercases incoming HTTP headers. + + From PEP 333, `The start_response() Callable`_ : + + A reminder for server/gateway authors: HTTP + header names are case-insensitive, so be sure + to take that into consideration when examining + application-supplied headers! + + .. _The start_response() Callable: + http://www.python.org/dev/peps/pep-0333/#id20 + """ + def __init__(self, app, *args, **kwargs): + WSGI_Object.__init__(self, *args, **kwargs) + self.app = app + + def __call__(self, environ, start_response): + if self.logger != None: + self.logger.log(logging.DEBUG, 'UppercaseHeaderApp') + for key,value in environ.items(): + if key.startswith('HTTP_'): + uppercase = key.upper() + if uppercase != key: + environ[uppercase] = environ.pop(key) + return self.app(environ, start_response) + +class AuthenticationApp (WSGI_Object): + """WSGI middleware for handling user authentication. + """ + def __init__(self, app, realm, setting='be-auth', users=None, *args, **kwargs): + WSGI_Object.__init__(self, *args, **kwargs) + self.app = app + self.realm = realm + self.setting = setting + self.users = users + + def __call__(self, environ, start_response): + if self.logger != None: + self.logger.log(logging.DEBUG, 'AuthenticationApp') + environ['%s.realm' % self.setting] = self.realm + try: + username = self.authenticate(environ) + environ['%s.user' % self.setting] = username + environ['%s.user.name' % self.setting] = \ + self.users[username].name + return self.app(environ, start_response) + except _Unauthorized, e: + return self.error(environ, start_response, + e.code, e.msg, e.headers) + + def authenticate(self, environ): + """Handle user-authentication sent in the "Authorization" header. + + This function implements ``Basic`` authentication as described in + HTTP/1.0 specification [1]_ . Do not use this module unless you + are using SSL, as it transmits unencrypted passwords. + + .. [1] http://www.w3.org/Protocols/HTTP/1.0/draft-ietf-http-spec.html#BasicAA + + Examples + -------- + + >>> users = Users() + >>> users.add_user(User('Aladdin', 'Big Al', password='open sesame')) + >>> app = AuthenticationApp(app=None, realm='Dummy Realm', users=users) + >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='}) + 'Aladdin' + >>> app.authenticate({'HTTP_AUTHORIZATION':'Basic AAAAAAAAAAAAAAAAAAAAAAAAAA=='}) + + Notes + ----- + + Code based on authkit/authenticate/basic.py + (c) 2005 Clark C. Evans. + Released under the MIT License: + http://www.opensource.org/licenses/mit-license.php + """ + authorization = environ.get('HTTP_AUTHORIZATION', None) + if authorization == None: + raise _Unauthorized('Authorization required') + try: + authmeth,auth = authorization.split(' ',1) + except ValueError: + return None + if 'basic' != authmeth.lower(): + return None # non-basic HTTP authorization not implemented + auth = auth.strip().decode('base64') + try: + username,password = auth.split(':',1) + except ValueError: + return None + if self.authfunc(environ, username, password) == True: + return username + + def authfunc(self, environ, username, password): + if not username in self.users: + return False + if self.users[username].valid_login(password) == True: + if self.logger != None: + self.logger.log(self.log_level, + 'Authenticated %s' % self.users[username].name) + return True + return False + +class WSGI_AppObject (WSGI_Object): + """Useful WSGI utilities for handling data (POST, QUERY) and + returning responses. + """ + def __init__(self, *args, **kwargs): + WSGI_Object.__init__(self, *args, **kwargs) + + # Maximum input we will accept when REQUEST_METHOD is POST + # 0 ==> unlimited input + self.maxlen = 0 + + def ok_response(self, environ, start_response, content, + content_type='application/octet-stream', + headers=[]): + if content == None: + start_response('200 OK', []) + return [] + if type(content) == types.UnicodeType: + content = content.encode('utf-8') + for i,header in enumerate(headers): + header_name,header_value = header + if type(header_value) == types.UnicodeType: + headers[i] = (header_name, header_value.encode('ISO-8859-1')) + response = '200 OK' + content_length = len(content) + self.log_request(environ, status=response, bytes=content_length) + start_response('200 OK', [ + ('Content-Type', content_type), + ('Content-Length', str(content_length)), + ]+headers) + if self.is_head(environ) == True: + return [] + return [content] + + def query_data(self, environ): + if not environ['REQUEST_METHOD'] in ['GET', 'HEAD']: + raise _HandlerError(404, 'Not Found') + return self._parse_query(environ.get('QUERY_STRING', '')) + + def _parse_query(self, query): + if len(query) == 0: + return {} + data = parse_qs( + query, keep_blank_values=True, strict_parsing=True) + for k,v in data.items(): + if len(v) == 1: + data[k] = v[0] + return data + + def post_data(self, environ): + if environ['REQUEST_METHOD'] != 'POST': + raise _HandlerError(404, 'Not Found') + post_data = self._read_post_data(environ) + return self._parse_post(post_data) + + def _parse_post(self, post): + return self._parse_query(post) + + def _read_post_data(self, environ): + try: + clen = int(environ.get('CONTENT_LENGTH', '0')) + except ValueError: + clen = 0 + if clen != 0: + if self.maxlen > 0 and clen > self.maxlen: + raise ValueError, 'Maximum content length exceeded' + return environ['wsgi.input'].read(clen) + return '' + + def data_get_string(self, data, key, default=None, source='query'): + if not key in data or data[key] in [None, 'None']: + if default == _HandlerError: + raise _HandlerError(406, 'Missing %s key %s' % (source, key)) + return default + return data[key] + + def data_get_id(self, data, key='id', default=_HandlerError, + source='query'): + return self.data_get_string(data, key, default, source) + + def data_get_boolean(self, data, key, default=False, source='query'): + val = self.data_get_string(data, key, default, source) + if val == 'True': + return True + elif val == 'False': + return False + return val + + def is_head(self, environ): + return environ['REQUEST_METHOD'] == 'HEAD' + + +class AdminApp (WSGI_AppObject): + """WSGI middleware for managing users (changing passwords, + usernames, etc.). + """ + def __init__(self, app, users=None, url=r'^admin/?', *args, **kwargs): + WSGI_AppObject.__init__(self, *args, **kwargs) + self.app = app + self.users = users + self.url = url + + def __call__(self, environ, start_response): + if self.logger != None: + self.logger.log(logging.DEBUG, 'AdminApp') + path = environ.get('PATH_INFO', '').lstrip('/') + match = re.search(self.url, path) + if match is not None: + return self.admin(environ, start_response) + return self.app(environ, start_response) + + def admin(self, environ, start_response): + if not 'be-auth.user' in environ: + raise _Unauthenticated(realm=envirion.get('be-auth.realm')) + uname = environ.get('be-auth.user') + user = self.users[uname] + data = self.post_data(environ) + source = 'post' + name = self.data_get_string( + data, 'name', default=None, source=source) + if name != None: + self.users[uname].set_name(name) + password = self.data_get_string( + data, 'password', default=None, source=source) + if password != None: + self.users[uname].set_password(password) + self.users.save() + return self.ok_response(environ, start_response, None) + +class ServerApp (WSGI_AppObject): + """WSGI server for a BE Storage instance over HTTP. + + RESTful_ WSGI request handler for serving the + libbe.storage.http.HTTP backend with GET, POST, and HEAD commands. + For more information on authentication and REST, see John + Calcote's `Open Sourcery article`_ + + .. _RESTful: http://www.ics.uci.edu/~fielding/pubs/dissertation/rest_arch_style.htm + .. _Open Sourcery article: http://jcalcote.wordpress.com/2009/08/10/restful-authentication/ + + This serves files from a connected storage instance, usually + a VCS-based repository located on the local machine. + + Notes + ----- + + The GET and HEAD requests are identical except that the HEAD + request omits the actual content of the file. + """ + server_version = "BE-server/" + libbe.version.version() + + def __init__(self, storage, *args, **kwargs): + WSGI_AppObject.__init__(self, *args, **kwargs) + self.storage = storage + self.http_user_error = 418 + + self.urls = [ + (r'^add/?', self.add), + (r'^exists/?', self.exists), + (r'^remove/?', self.remove), + (r'^ancestors/?', self.ancestors), + (r'^children/?', self.children), + (r'^get/(.+)', self.get), + (r'^set/(.+)', self.set), + (r'^commit/?', self.commit), + (r'^revision-id/?', self.revision_id), + (r'^changed/?', self.changed), + (r'^version/?', self.version), + ] + + def __call__(self, environ, start_response): + """The main WSGI application. + + Dispatch the current request to the functions from above and + store the regular expression captures in the WSGI environment + as `be-server.url_args` so that the functions from above can + access the url placeholders. + + URL dispatcher from Armin Ronacher's "Getting Started with WSGI" + http://lucumr.pocoo.org/2007/5/21/getting-started-with-wsgi + """ + if self.logger != None: + self.logger.log(logging.DEBUG, 'ServerApp') + path = environ.get('PATH_INFO', '').lstrip('/') + try: + for regex, callback in self.urls: + match = re.search(regex, path) + if match is not None: + environ['be-server.url_args'] = match.groups() + try: + return callback(environ, start_response) + except libbe.storage.NotReadable, e: + raise _HandlerError(403, 'Read permission denied') + except libbe.storage.NotWriteable, e: + raise _HandlerError(403, 'Write permission denied') + except libbe.storage.InvalidID, e: + raise _HandlerError( + self.http_user_error, 'InvalidID %s' % e) + raise _HandlerError(404, 'Not Found') + except _HandlerError, e: + return self.error(environ, start_response, + e.code, e.msg, e.headers) + + # handlers + def add(self, environ, start_response): + self.check_login(environ) + data = self.post_data(environ) + source = 'post' + id = self.data_get_id(data, source=source) + parent = self.data_get_string( + data, 'parent', default=None, source=source) + directory = self.data_get_boolean( + data, 'directory', default=False, source=source) + self.storage.add(id, parent=parent, directory=directory) + return self.ok_response(environ, start_response, None) + + def exists(self, environ, start_response): + self.check_login(environ) + data = self.query_data(environ) + source = 'query' + id = self.data_get_id(data, source=source) + revision = self.data_get_string( + data, 'revision', default=None, source=source) + content = str(self.storage.exists(id, revision)) + return self.ok_response(environ, start_response, content) + + def remove(self, environ, start_response): + self.check_login(environ) + data = self.post_data(environ) + source = 'post' + id = self.data_get_id(data, source=source) + recursive = self.data_get_boolean( + data, 'recursive', default=False, source=source) + if recursive == True: + self.storage.recursive_remove(id) + else: + self.storage.remove(id) + return self.ok_response(environ, start_response, None) + + def ancestors(self, environ, start_response): + self.check_login(environ) + data = self.query_data(environ) + source = 'query' + id = self.data_get_id(data, source=source) + revision = self.data_get_string( + data, 'revision', default=None, source=source) + content = '\n'.join(self.storage.ancestors(id, revision))+'\n' + return self.ok_response(environ, start_response, content) + + def children(self, environ, start_response): + self.check_login(environ) + data = self.query_data(environ) + source = 'query' + id = self.data_get_id(data, default=None, source=source) + revision = self.data_get_string( + data, 'revision', default=None, source=source) + content = '\n'.join(self.storage.children(id, revision)) + return self.ok_response(environ, start_response, content) + + def get(self, environ, start_response): + self.check_login(environ) + data = self.query_data(environ) + source = 'query' + try: + id = environ['be-server.url_args'][0] + except: + raise _HandlerError(404, 'Not Found') + revision = self.data_get_string( + data, 'revision', default=None, source=source) + content = self.storage.get(id, revision=revision) + be_version = self.storage.storage_version(revision) + return self.ok_response(environ, start_response, content, + headers=[('X-BE-Version', be_version)]) + + def set(self, environ, start_response): + self.check_login(environ) + data = self.post_data(environ) + try: + id = environ['be-server.url_args'][0] + except: + raise _HandlerError(404, 'Not Found') + if not 'value' in data: + raise _HandlerError(406, 'Missing query key value') + value = data['value'] + self.storage.set(id, value) + return self.ok_response(environ, start_response, None) + + def commit(self, environ, start_response): + self.check_login(environ) + data = self.post_data(environ) + if not 'summary' in data: + raise _HandlerError(406, 'Missing query key summary') + summary = data['summary'] + if not 'body' in data or data['body'] == 'None': + data['body'] = None + body = data['body'] + if not 'allow_empty' in data \ + or data['allow_empty'] == 'True': + allow_empty = True + else: + allow_empty = False + try: + revision = self.storage.commit(summary, body, allow_empty) + except libbe.storage.EmptyCommit, e: + raise _HandlerError(self.http_user_error, 'EmptyCommit') + return self.ok_response(environ, start_response, revision) + + def revision_id(self, environ, start_response): + self.check_login(environ) + data = self.query_data(environ) + source = 'query' + index = int(self.data_get_string( + data, 'index', default=_HandlerError, source=source)) + content = self.storage.revision_id(index) + return self.ok_response(environ, start_response, content) + + def changed(self, environ, start_response): + self.check_login(environ) + data = self.query_data(environ) + source = 'query' + revision = self.data_get_string( + data, 'revision', default=None, source=source) + add,mod,rem = self.storage.changed(revision) + content = '\n\n'.join(['\n'.join(p) for p in (add,mod,rem)]) + return self.ok_response(environ, start_response, content) + + def version(self, environ, start_response): + self.check_login(environ) + data = self.query_data(environ) + source = 'query' + revision = self.data_get_string( + data, 'revision', default=None, source=source) + content = self.storage.storage_version(revision) + return self.ok_response(environ, start_response, content) + + # handler utility functions + def check_login(self, environ): + user = environ.get('be-auth.user', None) + if user != None: # we're running under AuthenticationApp + if environ['REQUEST_METHOD'] == 'POST': + if user == 'guest' or self.storage.is_writeable() == False: + raise _Unauthorized() # only non-guests allowed to write + # allow read-only commands for all users + + +class Serve (libbe.command.Command): + """:class:`~libbe.command.base.Command` wrapper around + :class:`ServerApp`. + """ + + name = 'serve' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='port', + help='Bind server to port (%default)', + arg=libbe.command.Argument( + name='port', metavar='INT', type='int', default=8000)), + libbe.command.Option(name='host', + help='Set host string (blank for localhost, %default)', + arg=libbe.command.Argument( + name='host', metavar='HOST', default='')), + libbe.command.Option(name='read-only', short_name='r', + help='Dissable operations that require writing'), + libbe.command.Option(name='ssl', short_name='s', + help='Use CherryPy to serve HTTPS (HTTP over SSL/TLS)'), + libbe.command.Option(name='auth', short_name='a', + help='Require authentication. FILE should be a file containing colon-separated UNAME:USER:sha1(PASSWORD) lines, for example: "jdoe:John Doe <jdoe@example.com>:read:d99f8e5a4b02dc25f49da2ea67c0034f61779e72"', + arg=libbe.command.Argument( + name='auth', metavar='FILE', default=None, + completion_callback=libbe.command.util.complete_path)), + ]) + + def _run(self, **params): + self._setup_logging() + storage = self._get_storage() + if params['read-only'] == True: + writeable = storage.writeable + storage.writeable = False + if params['host'] == '': + params['host'] = 'localhost' + if params['auth'] != None: + self._check_restricted_access(storage, params['auth']) + users = Users(params['auth']) + users.load() + app = ServerApp(storage=storage, logger=self.logger) + if params['auth'] != None: + app = AdminApp(app, users=users, logger=self.logger) + app = AuthenticationApp(app, realm=storage.repo, + users=users, logger=self.logger) + app = UppercaseHeaderApp(app, logger=self.logger) + server,details = self._get_server(params, app) + details['repo'] = storage.repo + try: + self._start_server(params, server, details) + except KeyboardInterrupt: + pass + self._stop_server(params, server) + if params['read-only'] == True: + storage.writeable = writeable + + def _setup_logging(self, log_level=logging.INFO): + self.logger = logging.getLogger('be-serve') + self.log_level = logging.INFO + console = logging.StreamHandler(self.stdout) + console.setFormatter(logging.Formatter('%(message)s')) + self.logger.addHandler(console) + self.logger.propagate = False + if log_level is not None: + console.setLevel(log_level) + self.logger.setLevel(log_level) + + def _get_server(self, params, app): + details = {'port':params['port']} + if params['ssl'] == True: + details['protocol'] = 'HTTPS' + if cherrypy == None: + raise libbe.command.UserError, \ + '--ssl requires the cherrypy module' + app = ExceptionApp(app, logger=self.logger) + server = cherrypy.wsgiserver.CherryPyWSGIServer( + (params['host'], params['port']), app) + #server.throw_errors = True + #server.show_tracebacks = True + private_key,certificate = get_cert_filenames( + 'be-server', logger=self.logger) + if cherrypy.wsgiserver.ssl_builtin == None: + server.ssl_module = 'builtin' + server.ssl_private_key = private_key + server.ssl_certificate = certificate + else: + server.ssl_adapter = \ + cherrypy.wsgiserver.ssl_builtin.BuiltinSSLAdapter( + certificate=certificate, private_key=private_key) + details['socket-name'] = params['host'] + else: + details['protocol'] = 'HTTP' + server = wsgiref.simple_server.make_server( + params['host'], params['port'], app) + details['socket-name'] = server.socket.getsockname()[0] + return (server, details) + + def _start_server(self, params, server, details): + self.logger.log(self.log_level, + 'Serving %(protocol)s on %(socket-name)s port %(port)s ...' \ + % details) + self.logger.log(self.log_level, + 'BE repository %(repo)s' % details) + if params['ssl'] == True: + server.start() + else: + server.serve_forever() + + def _stop_server(self, params, server): + self.logger.log(self.log_level, 'Clossing server') + if params['ssl'] == True: + server.stop() + else: + server.server_close() + + def _long_help(self): + return """ +Example usage:: + + $ be serve + +And in another terminal (or after backgrounding the server):: + + $ be --repo http://localhost:8000/ list + +If you bind your server to a public interface, take a look at the +``--read-only`` option or the combined ``--ssl --auth FILE`` +options so other people can't mess with your repository. If you do use +authentication, you'll need to send in your username and password with, +for example:: + + $ be --repo http://username:password@localhost:8000/ list +""" + +def random_string(length=256): + if os.path.exists(os.path.join('dev', 'urandom')): + return open("/dev/urandom").read(length) + else: + import array + from random import randint + d = array.array('B') + for i in xrange(1000000): + d.append(randint(0,255)) + return d.tostring() + +if libbe.TESTING == True: + class WSGITestCase (unittest.TestCase): + def setUp(self): + self.logstream = StringIO.StringIO() + self.logger = logging.getLogger('be-serve-test') + console = logging.StreamHandler(self.logstream) + console.setFormatter(logging.Formatter('%(message)s')) + self.logger.addHandler(console) + self.logger.propagate = False + console.setLevel(logging.INFO) + self.logger.setLevel(logging.INFO) + self.default_environ = { # required by PEP 333 + 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD' + 'SCRIPT_NAME':'', + 'PATH_INFO': '', + #'QUERY_STRING':'', # may be empty or absent + #'CONTENT_TYPE':'', # may be empty or absent + #'CONTENT_LENGTH':'', # may be empty or absent + 'SERVER_NAME':'example.com', + 'SERVER_PORT':'80', + 'SERVER_PROTOCOL':'HTTP/1.1', + 'wsgi.version':(1,0), + 'wsgi.url_scheme':'http', + 'wsgi.input':StringIO.StringIO(), + 'wsgi.errors':StringIO.StringIO(), + 'wsgi.multithread':False, + 'wsgi.multiprocess':False, + 'wsgi.run_once':False, + } + def getURL(self, app, path='/', method='GET', data=None, + scheme='http', environ={}): + env = copy.copy(self.default_environ) + env['PATH_INFO'] = path + env['REQUEST_METHOD'] = method + env['scheme'] = scheme + if data != None: + enc_data = urllib.urlencode(data) + if method == 'POST': + env['CONTENT_LENGTH'] = len(enc_data) + env['wsgi.input'] = StringIO.StringIO(enc_data) + else: + assert method in ['GET', 'HEAD'], method + env['QUERY_STRING'] = enc_data + for key,value in environ.items(): + env[key] = value + return ''.join(app(env, self.start_response)) + def start_response(self, status, response_headers, exc_info=None): + self.status = status + self.response_headers = response_headers + self.exc_info = exc_info + + class WSGI_ObjectTestCase (WSGITestCase): + def setUp(self): + WSGITestCase.setUp(self) + self.app = WSGI_Object(self.logger) + def test_error(self): + contents = self.app.error( + environ=self.default_environ, + start_response=self.start_response, + error=123, + message='Dummy Error', + headers=[('X-Dummy-Header','Dummy Value')]) + self.failUnless(contents == ['Dummy Error'], contents) + self.failUnless(self.status == '123 Dummy Error', self.status) + self.failUnless(self.response_headers == [ + ('Content-Type','text/plain'), + ('X-Dummy-Header','Dummy Value')], + self.response_headers) + self.failUnless(self.exc_info == None, self.exc_info) + def test_log_request(self): + self.app.log_request( + environ=self.default_environ, status='-1 OK', bytes=123) + log = self.logstream.getvalue() + self.failUnless(log.startswith('- -'), log) + + class ExceptionAppTestCase (WSGITestCase): + def setUp(self): + WSGITestCase.setUp(self) + def child_app(environ, start_response): + raise ValueError('Dummy Error') + self.app = ExceptionApp(child_app, self.logger) + def test_traceback(self): + try: + self.getURL(self.app) + except ValueError, e: + pass + log = self.logstream.getvalue() + self.failUnless(log.startswith('Traceback'), log) + self.failUnless('child_app' in log, log) + self.failUnless('ValueError: Dummy Error' in log, log) + + class AdminAppTestCase (WSGITestCase): + def setUp(self): + WSGITestCase.setUp(self) + self.users = Users() + self.users.add_user( + User('Aladdin', 'Big Al', password='open sesame')) + self.users.add_user( + User('guest', 'Guest', password='guestpass')) + def child_app(environ, start_response): + pass + self.app = AdminApp( + child_app, users=self.users, logger=self.logger) + self.app = AuthenticationApp( + self.app, realm='Dummy Realm', users=self.users, + logger=self.logger) + self.app = UppercaseHeaderApp(self.app, logger=self.logger) + def basic_auth(self, uname, password): + """HTTP basic authorization string""" + return 'Basic %s' % \ + ('%s:%s' % (uname, password)).encode('base64') + def test_new_name(self): + self.getURL( + self.app, '/admin/', method='POST', + data={'name':'Prince Al'}, + environ={'HTTP_Authorization': + self.basic_auth('Aladdin', 'open sesame')}) + self.failUnless(self.status == '200 OK', self.status) + self.failUnless(self.response_headers == [], + self.response_headers) + self.failUnless(self.exc_info == None, self.exc_info) + self.failUnless(self.users['Aladdin'].name == 'Prince Al', + self.users['Aladdin'].name) + self.failUnless(self.users.changed == True, + self.users.changed) + def test_new_password(self): + self.getURL( + self.app, '/admin/', method='POST', + data={'password':'New Pass'}, + environ={'HTTP_Authorization': + self.basic_auth('Aladdin', 'open sesame')}) + self.failUnless(self.status == '200 OK', self.status) + self.failUnless(self.response_headers == [], + self.response_headers) + self.failUnless(self.exc_info == None, self.exc_info) + self.failUnless(self.users['Aladdin'].passhash == \ + self.users['Aladdin'].hash('New Pass'), + self.users['Aladdin'].passhash) + self.failUnless(self.users.changed == True, + self.users.changed) + def test_guest_name(self): + self.getURL( + self.app, '/admin/', method='POST', + data={'name':'SPAM'}, + environ={'HTTP_Authorization': + self.basic_auth('guest', 'guestpass')}) + self.failUnless(self.status.startswith('403 '), self.status) + self.failUnless(self.response_headers == [ + ('Content-Type', 'text/plain')], + self.response_headers) + self.failUnless(self.exc_info == None, self.exc_info) + self.failUnless(self.users['guest'].name == 'Guest', + self.users['guest'].name) + self.failUnless(self.users.changed == False, + self.users.changed) + def test_guest_password(self): + self.getURL( + self.app, '/admin/', method='POST', + data={'password':'SPAM'}, + environ={'HTTP_Authorization': + self.basic_auth('guest', 'guestpass')}) + self.failUnless(self.status.startswith('403 '), self.status) + self.failUnless(self.response_headers == [ + ('Content-Type', 'text/plain')], + self.response_headers) + self.failUnless(self.exc_info == None, self.exc_info) + self.failUnless(self.users['guest'].name == 'Guest', + self.users['guest'].name) + self.failUnless(self.users.changed == False, + self.users.changed) + + class ServerAppTestCase (WSGITestCase): + def setUp(self): + WSGITestCase.setUp(self) + self.bd = libbe.bugdir.SimpleBugDir(memory=False) + self.app = ServerApp(self.bd.storage, logger=self.logger) + def tearDown(self): + self.bd.cleanup() + WSGITestCase.tearDown(self) + def test_add_get(self): + self.getURL(self.app, '/add/', method='GET') + self.failUnless(self.status.startswith('404 '), self.status) + self.failUnless(self.response_headers == [ + ('Content-Type', 'text/plain')], + self.response_headers) + self.failUnless(self.exc_info == None, self.exc_info) + def test_add_post(self): + self.getURL(self.app, '/add/', method='POST', + data={'id':'123456', 'parent':'abc123', + 'directory':'True'}) + self.failUnless(self.status == '200 OK', self.status) + self.failUnless(self.response_headers == [], + self.response_headers) + self.failUnless(self.exc_info == None, self.exc_info) + # Note: other methods tested in libbe.storage.http + + # TODO: integration tests on Serve? + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) + + +# The following certificate-creation code is adapted From pyOpenSSL's +# examples. + +def get_cert_filenames(server_name, autogenerate=True, logger=None): + """ + Generate private key and certification filenames. + get_cert_filenames(server_name) -> (pkey_filename, cert_filename) + """ + pkey_file = '%s.pkey' % server_name + cert_file = '%s.cert' % server_name + if autogenerate == True: + for file in [pkey_file, cert_file]: + if not os.path.exists(file): + make_certs(server_name, logger) + return (pkey_file, cert_file) + +def createKeyPair(type, bits): + """Create a public/private key pair. + + Returns the public/private key pair in a PKey object. + + Parameters + ---------- + type : TYPE_RSA or TYPE_DSA + Key type. + bits : int + Number of bits to use in the key. + """ + pkey = OpenSSL.crypto.PKey() + pkey.generate_key(type, bits) + return pkey + +def createCertRequest(pkey, digest="md5", **name): + """Create a certificate request. + + Returns the certificate request in an X509Req object. + + Parameters + ---------- + pkey : PKey + The key to associate with the request. + digest : "md5" or ? + Digestion method to use for signing, default is "md5", + `**name` : + The name of the subject of the request, possible. + Arguments are: + + ============ ======================== + C Country name + ST State or province name + L Locality name + O Organization name + OU Organizational unit name + CN Common name + emailAddress E-mail address + ============ ======================== + """ + req = OpenSSL.crypto.X509Req() + subj = req.get_subject() + + for (key,value) in name.items(): + setattr(subj, key, value) + + req.set_pubkey(pkey) + req.sign(pkey, digest) + return req + +def createCertificate(req, (issuerCert, issuerKey), serial, (notBefore, notAfter), digest="md5"): + """Generate a certificate given a certificate request. + + Returns the signed certificate in an X509 object. + + Parameters + ---------- + req : + Certificate reqeust to use + issuerCert : + The certificate of the issuer + issuerKey : + The private key of the issuer + serial : + Serial number for the certificate + notBefore : + Timestamp (relative to now) when the certificate + starts being valid + notAfter : + Timestamp (relative to now) when the certificate + stops being valid + digest : + Digest method to use for signing, default is md5 + """ + cert = OpenSSL.crypto.X509() + cert.set_serial_number(serial) + cert.gmtime_adj_notBefore(notBefore) + cert.gmtime_adj_notAfter(notAfter) + cert.set_issuer(issuerCert.get_subject()) + cert.set_subject(req.get_subject()) + cert.set_pubkey(req.get_pubkey()) + cert.sign(issuerKey, digest) + return cert + +def make_certs(server_name, logger=None) : + """Generate private key and certification files. + + `mk_certs(server_name) -> (pkey_filename, cert_filename)` + """ + if OpenSSL == None: + raise libbe.command.UserError, \ + 'SSL certificate generation requires the OpenSSL module' + pkey_file,cert_file = get_cert_filenames( + server_name, autogenerate=False) + if logger != None: + logger.log(logger._server_level, + 'Generating certificates', pkey_file, cert_file) + cakey = createKeyPair(OpenSSL.crypto.TYPE_RSA, 1024) + careq = createCertRequest(cakey, CN='Certificate Authority') + cacert = createCertificate( + careq, (careq, cakey), 0, (0, 60*60*24*365*5)) # five years + open(pkey_file, 'w').write(OpenSSL.crypto.dump_privatekey( + OpenSSL.crypto.FILETYPE_PEM, cakey)) + open(cert_file, 'w').write(OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_PEM, cacert)) diff --git a/libbe/command/set.py b/libbe/command/set.py new file mode 100644 index 0000000..720dd0f --- /dev/null +++ b/libbe/command/set.py @@ -0,0 +1,144 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# Marien Zwart <marienz@gentoo.org> +# Thomas Gerigk <tgerigk@gmx.de> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import textwrap + +import libbe +import libbe.bugdir +import libbe.command +import libbe.command.util +from libbe.storage.util.settings_object import EMPTY + + +class Set (libbe.command.Command): + """Change bug directory settings + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Set(ui=ui) + + >>> ret = ui.run(cmd, args=['target']) + None + >>> ret = ui.run(cmd, args=['target', 'abcdefg']) + >>> ret = ui.run(cmd, args=['target']) + abcdefg + >>> ret = ui.run(cmd, args=['target', 'none']) + >>> ret = ui.run(cmd, args=['target']) + None + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'set' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='setting', metavar='SETTING', optional=True, + completion_callback=complete_bugdir_settings), + libbe.command.Argument( + name='value', metavar='VALUE', optional=True) + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + if params['setting'] == None: + keys = bugdir.settings_properties + keys.sort() + for key in keys: + print >> self.stdout, \ + '%16s: %s' % (key, _value_string(bugdir, key)) + return 0 + if params['setting'] not in bugdir.settings_properties: + msg = 'Invalid setting %s\n' % params['setting'] + msg += 'Allowed settings:\n ' + msg += '\n '.join(bugdir.settings_properties) + raise libbe.command.UserError(msg) + if params['value'] == None: + print _value_string(bugdir, params['setting']) + else: + if params['value'] == 'none': + params['value'] = EMPTY + old_setting = bugdir.settings.get(params['setting']) + attr = bugdir._setting_name_to_attr_name(params['setting']) + setattr(bugdir, attr, params['value']) + return 0 + + def _long_help(self): + return """ +Show or change per-tree settings. + +If name and value are supplied, the name is set to a new value. +If no value is specified, the current value is printed. +If no arguments are provided, all names and values are listed. + +To unset a setting, set it to "none". + +Allowed settings are: + +%s""" % ('\n'.join(get_bugdir_settings()),) + +def get_bugdir_settings(): + settings = [] + for s in libbe.bugdir.BugDir.settings_properties: + settings.append(s) + settings.sort() + documented_settings = [] + for s in settings: + set = getattr(libbe.bugdir.BugDir, s) + dstr = set.__doc__.strip() + # per-setting comment adjustments + if s == 'vcs_name': + lines = dstr.split('\n') + while lines[0].startswith('This property defaults to') == False: + lines.pop(0) + assert len(lines) != None, \ + 'Unexpected vcs_name docstring:\n "%s"' % dstr + lines.insert( + 0, 'The name of the revision control system to use.\n') + dstr = '\n'.join(lines) + doc = textwrap.wrap(dstr, width=70, initial_indent=' ', + subsequent_indent=' ') + documented_settings.append('%s\n%s' % (s, '\n'.join(doc))) + return documented_settings + +def _value_string(bugdir, setting): + val = bugdir.settings.get(setting, EMPTY) + if val == EMPTY: + default = getattr(bugdir, bugdir._setting_name_to_attr_name(setting)) + if default not in [None, EMPTY]: + val = 'None (%s)' % default + else: + val = None + return str(val) + +def complete_bugdir_settings(command, argument, fragment=None): + """ + List possible command completions for fragment. + + Neither the command nor argument arguments are used. + """ + return libbe.bugdir.BugDir.settings_properties diff --git a/libbe/command/severity.py b/libbe/command/severity.py new file mode 100644 index 0000000..27898f7 --- /dev/null +++ b/libbe/command/severity.py @@ -0,0 +1,98 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# Marien Zwart <marienz@gentoo.org> +# Thomas Gerigk <tgerigk@gmx.de> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.bug +import libbe.command +import libbe.command.util + + +class Severity (libbe.command.Command): + """Change a bug's severity level + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_bugdir(bd) + >>> cmd = Severity(ui=ui) + + >>> bd.bug_from_uuid('a').severity + 'minor' + >>> ret = ui.run(cmd, args=['wishlist', '/a']) + >>> bd.flush_reload() + >>> bd.bug_from_uuid('a').severity + 'wishlist' + >>> ret = ui.run(cmd, args=['none', '/a']) + Traceback (most recent call last): + UserError: Invalid severity level: none + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'severity' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='severity', metavar='SEVERITY', default=None, + completion_callback=libbe.command.util.complete_severity), + libbe.command.Argument( + name='bug-id', metavar='BUG-ID', default=None, + repeatable=True, + completion_callback=libbe.command.util.complete_bug_id), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + for bug_id in params['bug-id']: + bug,dummy_comment = \ + libbe.command.util.bug_comment_from_user_id(bugdir, bug_id) + if bug.severity != params['severity']: + try: + bug.severity = params['severity'] + except ValueError, e: + if e.name != 'severity': + raise e + raise libbe.command.UserError( + 'Invalid severity level: %s' % e.value) + return 0 + + def _long_help(self): + ret = [""" +Show or change a bug's severity level. + +If no severity is specified, the current value is printed. If a severity level +is specified, it will be assigned to the bug. + +Severity levels are: +"""] + try: # See if there are any per-tree severity configurations + bd = self._get_bugdir() + except NotImplementedError: + pass # No tree, just show the defaults + longest_severity_len = max([len(s) for s in libbe.bug.severity_values]) + for severity in libbe.bug.severity_values : + description = libbe.bug.severity_description[severity] + ret.append('%*s : %s\n' \ + % (longest_severity_len, severity, description)) + return ''.join(ret) diff --git a/libbe/command/show.py b/libbe/command/show.py new file mode 100644 index 0000000..ab3be73 --- /dev/null +++ b/libbe/command/show.py @@ -0,0 +1,207 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# Thomas Gerigk <tgerigk@gmx.de> +# Thomas Habets <thomas@habets.pp.se> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import sys + +import libbe +import libbe.command +import libbe.command.util +import libbe.util.id +import libbe.version +import libbe._version + + +class Show (libbe.command.Command): + """Show a particular bug, comment, or combination of both. + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> io.stdout.encoding = 'ascii' + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_bugdir(bd) + >>> cmd = Show(ui=ui) + + >>> ret = ui.run(cmd, args=['/a',]) # doctest: +ELLIPSIS + ID : a + Short name : abc/a + Severity : minor + Status : open + Assigned : + Reporter : + Creator : John Doe <jdoe@example.com> + Created : ... + Bug A + <BLANKLINE> + + >>> ret = ui.run(cmd, {'xml':True}, ['/a']) # doctest: +ELLIPSIS + <?xml version="1.0" encoding="..." ?> + <be-xml> + <version> + <tag>...</tag> + <branch-nick>...</branch-nick> + <revno>...</revno> + <revision-id>...</revision-id> + </version> + <bug> + <uuid>a</uuid> + <short-name>abc/a</short-name> + <severity>minor</severity> + <status>open</status> + <creator>John Doe <jdoe@example.com></creator> + <created>Thu, 01 Jan 1970 00:00:00 +0000</created> + <summary>Bug A</summary> + </bug> + </be-xml> + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'show' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='xml', short_name='x', + help='Dump as XML'), + libbe.command.Option(name='only-raw-body', + help="When printing only a single comment, just print it's" + " body. This allows extraction of non-text content types."), + libbe.command.Option(name='no-comments', short_name='c', + help="Disable comment output. This is useful if you just " + "want more details on a bug's current status."), + ]) + self.args.extend([ + libbe.command.Argument( + name='id', metavar='ID', default=None, + optional=True, repeatable=True, + completion_callback=libbe.command.util.complete_bug_comment_id), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + if params['only-raw-body'] == True: + if len(params['id']) != 1: + raise libbe.command.UsageError( + 'only one ID accepted with --only-raw-body') + bug,comment = libbe.command.util.bug_comment_from_user_id( + bugdir, params['id'][0]) + if comment == bug.comment_root: + raise libbe.command.UsageError( + "--only-raw-body requires a comment ID, not '%s'" + % params['id'][0]) + sys.__stdout__.write(comment.body) + return 0 + print >> self.stdout, \ + output(bugdir, params['id'], encoding=self.stdout.encoding, + as_xml=params['xml'], + with_comments=not params['no-comments']) + return 0 + + def _long_help(self): + return """ +Show all information about the bugs or comments whose IDs are given. +If no IDs are given, show the entire repository. + +Without the --xml flag set, it's probably not a good idea to mix bug +and comment IDs in a single call, but you're free to do so if you +like. With the --xml flag set, there will never be any root comments, +so mix and match away (the bug listings for directly requested +comments will be restricted to the bug uuid and the requested +comment(s)). + +Directly requested comments will be grouped by their parent bug and +placed at the end of the output, so the ordering may not match the +order of the listed IDs. +""" + +def _sort_ids(bugdir, ids, with_comments=True): + bugs = [] + root_comments = {} + for id in ids: + p = libbe.util.id.parse_user(bugdir, id) + if p['type'] == 'bug': + bugs.append(p['bug']) + elif with_comments == True: + if p['bug'] not in root_comments: + root_comments[p['bug']] = [p['comment']] + else: + root_comments[p['bug']].append(p['comment']) + for bugname in root_comments.keys(): + assert bugname not in bugs, \ + 'specifically requested both #/%s/%s# and #/%s#' \ + % (bugname, root_comments[bugname][0], bugname) + return (bugs, root_comments) + +def _xml_header(encoding): + lines = ['<?xml version="1.0" encoding="%s" ?>' % encoding, + '<be-xml>', + ' <version>', + ' <tag>%s</tag>' % libbe.version.version()] + for tag in ['branch-nick', 'revno', 'revision-id']: + value = libbe._version.version_info[tag.replace('-', '_')] + lines.append(' <%s>%s</%s>' % (tag, value, tag)) + lines.append(' </version>') + return lines + +def _xml_footer(): + return ['</be-xml>'] + +def output(bd, ids, encoding, as_xml=True, with_comments=True): + if ids == None or len(ids) == 0: + bd.load_all_bugs() + ids = [bug.id.user() for bug in bd] + bugs,root_comments = _sort_ids(bd, ids, with_comments) + lines = [] + if as_xml: + lines.extend(_xml_header(encoding)) + else: + spaces_left = len(ids) - 1 + for bugname in bugs: + bug = bd.bug_from_uuid(bugname) + if as_xml: + lines.append(bug.xml(indent=2, show_comments=with_comments)) + else: + lines.append(bug.string(show_comments=with_comments)) + if spaces_left > 0: + spaces_left -= 1 + lines.append('') # add a blank line between bugs/comments + for bugname,comments in root_comments.items(): + bug = bd.bug_from_uuid(bugname) + if as_xml: + lines.extend([' <bug>', ' <uuid>%s</uuid>' % bug.uuid]) + for commname in comments: + try: + comment = bug.comment_root.comment_from_uuid(commname) + except KeyError, e: + raise libbe.command.UserError(e.message) + if as_xml: + lines.append(comment.xml(indent=4)) + else: + lines.append(comment.string()) + if spaces_left > 0: + spaces_left -= 1 + lines.append('') # add a blank line between bugs/comments + if as_xml: + lines.append('</bug>') + if as_xml: + lines.extend(_xml_footer()) + return '\n'.join(lines) diff --git a/libbe/command/status.py b/libbe/command/status.py new file mode 100644 index 0000000..1659f75 --- /dev/null +++ b/libbe/command/status.py @@ -0,0 +1,108 @@ +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.bug +import libbe.command +import libbe.command.util + + +class Status (libbe.command.Command): + """Change a bug's status level + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_bugdir(bd) + >>> cmd = Status(ui=ui) + >>> cmd._storage = bd.storage + + >>> bd.bug_from_uuid('a').status + 'open' + >>> ret = ui.run(cmd, args=['closed', '/a']) + >>> bd.flush_reload() + >>> bd.bug_from_uuid('a').status + 'closed' + >>> ret = ui.run(cmd, args=['none', '/a']) + Traceback (most recent call last): + UserError: Invalid status level: none + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'status' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.args.extend([ + libbe.command.Argument( + name='status', metavar='STATUS', default=None, + completion_callback=libbe.command.util.complete_status), + libbe.command.Argument( + name='bug-id', metavar='BUG-ID', default=None, + repeatable=True, + completion_callback=libbe.command.util.complete_bug_id), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + for bug_id in params['bug-id']: + bug,dummy_comment = \ + libbe.command.util.bug_comment_from_user_id(bugdir, bug_id) + if bug.status != params['status']: + try: + bug.status = params['status'] + except ValueError, e: + if e.name != 'status': + raise e + raise libbe.command.UserError( + 'Invalid status level: %s' % e.value) + return 0 + + def _long_help(self): + longest_status_len = max([len(s) for s in libbe.bug.status_values]) + active_statuses = [] + for status in libbe.bug.active_status_values : + description = libbe.bug.status_description[status] + s = '%*s : %s' % (longest_status_len, status, description) + active_statuses.append(s) + inactive_statuses = [] + for status in libbe.bug.inactive_status_values : + description = libbe.bug.status_description[status] + s = '%*s : %s' % (longest_status_len, status, description) + inactive_statuses.append(s) + ret = """ +Show or change a bug's status. + +If no status is specified, the current value is printed. If a status +is specified, it will be assigned to the bug. + +There are two classes of statuses, active and inactive, which are only +important for commands like "be list" that show only active bugs by +default. + +Active status levels are: + %s +Inactive status levels are: + %s + +You can overide the list of allowed statuses on a per-repository basis. +See "be set --help" for more details. +""" % ('\n '.join(active_statuses), '\n '.join(inactive_statuses)) + return ret diff --git a/libbe/command/subscribe.py b/libbe/command/subscribe.py new file mode 100644 index 0000000..d1cf72e --- /dev/null +++ b/libbe/command/subscribe.py @@ -0,0 +1,385 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import copy +import os + +import libbe +import libbe.bug +import libbe.command +import libbe.diff +import libbe.command.util +import libbe.util.id +import libbe.util.tree + + +TAG="SUBSCRIBE:" + + +class Subscribe (libbe.command.Command): + """(Un)subscribe to change notification + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_bugdir(bd) + >>> cmd = Subscribe(ui=ui) + + >>> a = bd.bug_from_uuid('a') + >>> print a.extra_strings + [] + >>> ret = ui.run(cmd, {'subscriber':'John Doe <j@doe.com>'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for abc/a: + John Doe <j@doe.com> all * + >>> bd.flush_reload() + >>> a = bd.bug_from_uuid('a') + >>> print a.extra_strings + ['SUBSCRIBE:John Doe <j@doe.com>\\tall\\t*'] + >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'servers':'a.com,b.net'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for abc/a: + Jane Doe <J@doe.com> all a.com,b.net + John Doe <j@doe.com> all * + >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'servers':'a.edu'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for abc/a: + Jane Doe <J@doe.com> all a.com,a.edu,b.net + John Doe <j@doe.com> all * + >>> ret = ui.run(cmd, {'unsubscribe':True, 'subscriber':'Jane Doe <J@doe.com>', 'servers':'a.com'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for abc/a: + Jane Doe <J@doe.com> all a.edu,b.net + John Doe <j@doe.com> all * + >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'servers':'*'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for abc/a: + Jane Doe <J@doe.com> all * + John Doe <j@doe.com> all * + >>> ret = ui.run(cmd, {'unsubscribe':True, 'subscriber':'Jane Doe <J@doe.com>'}, ['/a']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for abc/a: + John Doe <j@doe.com> all * + >>> ret = ui.run(cmd, {'unsubscribe':True, 'subscriber':'John Doe <j@doe.com>'}, ['/a']) + >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'types':'new'}, ['DIR']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for bug directory: + Jane Doe <J@doe.com> new * + >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>'}, ['DIR']) # doctest: +NORMALIZE_WHITESPACE + Subscriptions for bug directory: + Jane Doe <J@doe.com> all * + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'subscribe' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='unsubscribe', short_name='u', + help='Unsubscribe instead of subscribing'), + libbe.command.Option(name='list-all', short_name='a', + help='List all subscribers (no ID argument, read only action)'), + libbe.command.Option(name='list', short_name='l', + help='List subscribers (read only action).'), + libbe.command.Option(name='subscriber', short_name='s', + help='Email address of the subscriber (defaults to your user id).', + arg=libbe.command.Argument( + name='subscriber', metavar='EMAIL')), + libbe.command.Option(name='servers', short_name='S', + help='Servers from which you want notification.', + arg=libbe.command.Argument( + name='servers', metavar='STRING')), + libbe.command.Option(name='types', short_name='t', + help='Types of changes you wish to be notified about.', + arg=libbe.command.Argument( + name='types', metavar='STRING')), + ]) + self.args.extend([ + libbe.command.Argument( + name='id', metavar='ID', default=tuple(), + optional=True, repeatable=True, + completion_callback=libbe.command.util.complete_bug_comment_id), + ]) + + def _run(self, **params): + bugdir = self._get_bugdir() + if params['list-all'] == True or params['list'] == True: + writeable = bugdir.storage.writeable + bugdir.storage.writeable = False + if params['list-all'] == True: + assert len(params['id']) == 0, params['id'] + subscriber = params['subscriber'] + if subscriber == None: + subscriber = self._get_user_id() + if params['unsubscribe'] == True: + if params['servers'] == None: + params['servers'] = 'INVALID' + if params['types'] == None: + params['types'] = 'INVALID' + else: + if params['servers'] == None: + params['servers'] = '*' + if params['types'] == None: + params['types'] = 'all' + servers = params['servers'].split(',') + types = params['types'].split(',') + + if len(params['id']) == 0: + params['id'] = [libbe.diff.BUGDIR_ID] + for _id in params['id']: + if _id == libbe.diff.BUGDIR_ID: # directory-wide subscriptions + type_root = libbe.diff.BUGDIR_TYPE_ALL + entity = bugdir + entity_name = 'bug directory' + else: # bug-specific subscriptions + type_root = libbe.diff.BUG_TYPE_ALL + bug,dummy_comment = libbe.command.util.bug_comment_from_user_id( + bugdir, _id) + entity = bug + entity_name = bug.id.user() + if params['list-all'] == True: + entity_name = 'anything in the bug directory' + types = [libbe.diff.type_from_name(name, type_root, default=libbe.diff.INVALID_TYPE, + default_ok=params['unsubscribe']) + for name in types] + estrs = entity.extra_strings + if params['list'] == True or params['list-all'] == True: + pass + else: # alter subscriptions + if params['unsubscribe'] == True: + estrs = unsubscribe(estrs, subscriber, types, servers, type_root) + else: # add the tag + estrs = subscribe(estrs, subscriber, types, servers, type_root) + entity.extra_strings = estrs # reassign to notice change + + if params['list-all'] == True: + bugdir.load_all_bugs() + subscriptions = get_bugdir_subscribers(bugdir, servers[0]) + else: + subscriptions = [] + for estr in entity.extra_strings: + if estr.startswith(TAG): + subscriptions.append(estr[len(TAG):]) + + if len(subscriptions) > 0: + print >> self.stdout, 'Subscriptions for %s:' % entity_name + print >> self.stdout, '\n'.join(subscriptions) + if params['list-all'] == True or params['list'] == True: + bugdir.storage.writeable = writeable + return 0 + + def _long_help(self): + return """ +ID can be either a bug id, or blank/"DIR", in which case it refers to the +whole bug directory. + +SERVERS specifies the servers from which you would like to receive +notification. Multiple severs may be specified in a comma-separated +list, or you can use "*" to match all servers (the default). If you +have not selected a server, it should politely refrain from notifying +you of changes, although there is no way to guarantee this behavior. + +Available TYPES: + For bugs: +%s + For %s: +%s + +For unsubscription, any listed SERVERS and TYPES are removed from your +subscription. Either the catch-all server "*" or type "%s" will +remove SUBSCRIBER entirely from the specified ID. + +This command is intended for use primarily by public interfaces, since +if you're just hacking away on your private repository, you'll known +what's changed ;). This command just (un)sets the appropriate +subscriptions, and leaves it up to each interface to perform the +notification. +""" % (libbe.diff.BUG_TYPE_ALL.string_tree(6), libbe.diff.BUGDIR_ID, + libbe.diff.BUGDIR_TYPE_ALL.string_tree(6), + libbe.diff.BUGDIR_TYPE_ALL) + + +# internal helper functions + +def _generate_string(subscriber, types, servers): + types = sorted([str(t) for t in types]) + servers = sorted(servers) + return "%s%s\t%s\t%s" % (TAG,subscriber,",".join(types),",".join(servers)) + +def _parse_string(string, type_root): + assert string.startswith(TAG), string + string = string[len(TAG):] + subscriber,types,servers = string.split("\t") + types = [libbe.diff.type_from_name(name, type_root) for name in types.split(",")] + return (subscriber,types,servers.split(",")) + +def _get_subscriber(extra_strings, subscriber, type_root): + for i,string in enumerate(extra_strings): + if string.startswith(TAG): + s,ts,srvs = _parse_string(string, type_root) + if s == subscriber: + return i,s,ts,srvs # match! + return None # no match + +# functions exposed to other modules + +def subscribe(extra_strings, subscriber, types, servers, type_root): + args = _get_subscriber(extra_strings, subscriber, type_root) + if args == None: # no match + extra_strings.append(_generate_string(subscriber, types, servers)) + return extra_strings + # Alter matched string + i,s,ts,srvs = args + for t in types: + if t not in ts: + ts.append(t) + # remove descendant types + all_ts = copy.copy(ts) + for t in all_ts: + for tt in all_ts: + if tt in ts and t.has_descendant(tt): + ts.remove(tt) + if "*" in servers+srvs: + srvs = ["*"] + else: + srvs = list(set(servers+srvs)) + extra_strings[i] = _generate_string(subscriber, ts, srvs) + return extra_strings + +def unsubscribe(extra_strings, subscriber, types, servers, type_root): + args = _get_subscriber(extra_strings, subscriber, type_root) + if args == None: # no match + return extra_strings # pass + # Remove matched string + i,s,ts,srvs = args + all_ts = copy.copy(ts) + for t in types: + for tt in all_ts: + if tt in ts and t.has_descendant(tt): + ts.remove(tt) + if "*" in servers+srvs: + srvs = [] + else: + for srv in servers: + if srv in srvs: + srvs.remove(srv) + if len(ts) == 0 or len(srvs) == 0: + extra_strings.pop(i) + else: + extra_strings[i] = _generate_string(subscriber, ts, srvs) + return extra_strings + +def get_subscribers(extra_strings, type, server, type_root, + match_ancestor_types=False, + match_descendant_types=False): + """ + Set match_ancestor_types=True if you want to find eveyone who + cares about your particular type. + + Set match_descendant_types=True if you want to find subscribers + who may only care about some subset of your type. This is useful + for generating lists of all the subscribers in a given set of + extra_strings. + + >>> def sgs(*args, **kwargs): + ... return sorted(get_subscribers(*args, **kwargs)) + >>> es = [] + >>> es = subscribe(es, "John Doe <j@doe.com>", [libbe.diff.BUGDIR_TYPE_ALL], + ... ["a.com"], libbe.diff.BUGDIR_TYPE_ALL) + >>> es = subscribe(es, "Jane Doe <J@doe.com>", [libbe.diff.BUGDIR_TYPE_NEW], + ... ["*"], libbe.diff.BUGDIR_TYPE_ALL) + >>> sgs(es, libbe.diff.BUGDIR_TYPE_ALL, "a.com", libbe.diff.BUGDIR_TYPE_ALL) + ['John Doe <j@doe.com>'] + >>> sgs(es, libbe.diff.BUGDIR_TYPE_ALL, "a.com", libbe.diff.BUGDIR_TYPE_ALL, + ... match_descendant_types=True) + ['Jane Doe <J@doe.com>', 'John Doe <j@doe.com>'] + >>> sgs(es, libbe.diff.BUGDIR_TYPE_ALL, "b.net", libbe.diff.BUGDIR_TYPE_ALL, + ... match_descendant_types=True) + ['Jane Doe <J@doe.com>'] + >>> sgs(es, libbe.diff.BUGDIR_TYPE_NEW, "a.com", libbe.diff.BUGDIR_TYPE_ALL) + ['Jane Doe <J@doe.com>'] + >>> sgs(es, libbe.diff.BUGDIR_TYPE_NEW, "a.com", libbe.diff.BUGDIR_TYPE_ALL, + ... match_ancestor_types=True) + ['Jane Doe <J@doe.com>', 'John Doe <j@doe.com>'] + """ + for string in extra_strings: + if not string.startswith(TAG): + continue + subscriber,types,servers = _parse_string(string, type_root) + type_match = False + if type in types: + type_match = True + if type_match == False and match_ancestor_types == True: + for t in types: + if t.has_descendant(type): + type_match = True + break + if type_match == False and match_descendant_types == True: + for t in types: + if type.has_descendant(t): + type_match = True + break + server_match = False + if server in servers or servers == ["*"] or server == "*": + server_match = True + if type_match == True and server_match == True: + yield subscriber + +def get_bugdir_subscribers(bugdir, server): + """ + I have a bugdir. Who cares about it, and what do they care about? + Returns a dict of dicts: + subscribers[user][id] = types + where id is either a bug.uuid (in the case of a bug subscription) + or "%(bugdir_id)s" (in the case of a bugdir subscription). + + Only checks bugs that are currently in memory, so you might want + to call bugdir.load_all_bugs() first. + + >>> bd = bugdir.SimpleBugDir(sync_with_disk=False) + >>> a = bd.bug_from_shortname("a") + >>> bd.extra_strings = subscribe(bd.extra_strings, "John Doe <j@doe.com>", + ... [libbe.diff.BUGDIR_TYPE_ALL], ["a.com"], libbe.diff.BUGDIR_TYPE_ALL) + >>> bd.extra_strings = subscribe(bd.extra_strings, "Jane Doe <J@doe.com>", + ... [libbe.diff.BUGDIR_TYPE_NEW], ["*"], libbe.diff.BUGDIR_TYPE_ALL) + >>> a.extra_strings = subscribe(a.extra_strings, "John Doe <j@doe.com>", + ... [libbe.diff.BUG_TYPE_ALL], ["a.com"], libbe.diff.BUG_TYPE_ALL) + >>> subscribers = get_bugdir_subscribers(bd, "a.com") + >>> subscribers["Jane Doe <J@doe.com>"]["%(bugdir_id)s"] + [<SubscriptionType: new>] + >>> subscribers["John Doe <j@doe.com>"]["%(bugdir_id)s"] + [<SubscriptionType: all>] + >>> subscribers["John Doe <j@doe.com>"]["a"] + [<SubscriptionType: all>] + >>> get_bugdir_subscribers(bd, "b.net") + {'Jane Doe <J@doe.com>': {'%(bugdir_id)s': [<SubscriptionType: new>]}} + >>> bd.cleanup() + """ % {'bugdir_id':libbe.diff.BUGDIR_ID} + subscribers = {} + for sub in get_subscribers(bugdir.extra_strings, libbe.diff.BUGDIR_TYPE_ALL, + server, libbe.diff.BUGDIR_TYPE_ALL, + match_descendant_types=True): + i,s,ts,srvs = _get_subscriber(bugdir.extra_strings, sub, + libbe.diff.BUGDIR_TYPE_ALL) + subscribers[sub] = {"DIR":ts} + for bug in bugdir: + for sub in get_subscribers(bug.extra_strings, libbe.diff.BUG_TYPE_ALL, + server, libbe.diff.BUG_TYPE_ALL, + match_descendant_types=True): + i,s,ts,srvs = _get_subscriber(bug.extra_strings, sub, + libbe.diff.BUG_TYPE_ALL) + if sub in subscribers: + subscribers[sub][bug.uuid] = ts + else: + subscribers[sub] = {bug.uuid:ts} + return subscribers diff --git a/libbe/command/tag.py b/libbe/command/tag.py new file mode 100644 index 0000000..f4dc3ba --- /dev/null +++ b/libbe/command/tag.py @@ -0,0 +1,152 @@ +# Copyright (C) 2009-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.command +import libbe.command.util + + +TAG_TAG = 'TAG:' + + +class Tag (libbe.command.Command): + __doc__ = """Tag a bug, or search bugs for tags + + >>> import sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_bugdir(bd) + >>> cmd = Tag(ui=ui) + + >>> a = bd.bug_from_uuid('a') + >>> print a.extra_strings + [] + >>> ret = ui.run(cmd, args=['/a', 'GUI']) + Tags for abc/a: + GUI + >>> bd.flush_reload() + >>> a = bd.bug_from_uuid('a') + >>> print a.extra_strings + ['%(tag_tag)sGUI'] + >>> ret = ui.run(cmd, args=['/a', 'later']) + Tags for abc/a: + GUI + later + >>> ret = ui.run(cmd, args=['/a']) + Tags for abc/a: + GUI + later + >>> ret = ui.run(cmd, {'list':True}) + GUI + later + >>> ret = ui.run(cmd, args=['/a', 'Alphabetically first']) + Tags for abc/a: + Alphabetically first + GUI + later + >>> bd.flush_reload() + >>> a = bd.bug_from_uuid('a') + >>> print a.extra_strings + ['%(tag_tag)sAlphabetically first', '%(tag_tag)sGUI', '%(tag_tag)slater'] + >>> a.extra_strings = [] + >>> print a.extra_strings + [] + >>> ret = ui.run(cmd, args=['/a']) + >>> bd.flush_reload() + >>> a = bd.bug_from_uuid('a') + >>> print a.extra_strings + [] + >>> ret = ui.run(cmd, args=['/a', 'Alphabetically first']) + Tags for abc/a: + Alphabetically first + >>> ret = ui.run(cmd, {'remove':True}, ['/a', 'Alphabetically first']) + >>> ui.cleanup() + >>> bd.cleanup() + """ % {'tag_tag':TAG_TAG} + name = 'tag' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='remove', short_name='r', + help='Remove TAG (instead of adding it)'), + libbe.command.Option(name='list', short_name='l', + help='List all available tags and exit'), + ]) + self.args.extend([ + libbe.command.Argument( + name='id', metavar='BUG-ID', optional=True, + completion_callback=libbe.command.util.complete_bug_id), + libbe.command.Argument( + name='tag', metavar='TAG', default=tuple(), + optional=True, repeatable=True), + ]) + + def _run(self, **params): + if params['id'] == None and params['list'] == False: + raise libbe.command.UserError('Please specify a bug id.') + if params['id'] != None and params['list'] == True: + raise libbe.command.UserError( + 'Do not specify a bug id with the --list option.') + bugdir = self._get_bugdir() + if params['list'] == True: + bugdir.load_all_bugs() + tags = [] + for bug in bugdir: + for estr in bug.extra_strings: + if estr.startswith(TAG_TAG): + tag = estr[len(TAG_TAG):] + if tag not in tags: + tags.append(tag) + tags.sort() + if len(tags) > 0: + print >> self.stdout, '\n'.join(tags) + return 0 + + bug,dummy_comment = libbe.command.util.bug_comment_from_user_id( + bugdir, params['id']) + if len(params['tag']) > 0: + estrs = bug.extra_strings + for tag in params['tag']: + tag_string = '%s%s' % (TAG_TAG, tag) + if params['remove'] == True: + estrs.remove(tag_string) + else: # add the tag + estrs.append(tag_string) + bug.extra_strings = estrs # reassign to notice change + + tags = [] + for estr in bug.extra_strings: + if estr.startswith(TAG_TAG): + tags.append(estr[len(TAG_TAG):]) + + if len(tags) > 0: + print "Tags for %s:" % bug.id.user() + print '\n'.join(tags) + return 0 + + def _long_help(self): + return """ +If TAG is given, add TAG to BUG-ID. If it is not specified, just +print the tags for BUG-ID. + +To search for bugs with a particular tag, try + $ be list --extra-strings %s<your-tag> +""" % TAG_TAG diff --git a/libbe/command/target.py b/libbe/command/target.py new file mode 100644 index 0000000..f8a956b --- /dev/null +++ b/libbe/command/target.py @@ -0,0 +1,209 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Chris Ball <cjb@laptop.org> +# Gianluca Montecchi <gian@grys.it> +# Marien Zwart <marienz@gentoo.org> +# Thomas Gerigk <tgerigk@gmx.de> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import libbe +import libbe.command +import libbe.command.util +import libbe.command.depend + + +class Target (libbe.command.Command): + """Assorted bug target manipulations and queries + + >>> import os, StringIO, sys + >>> import libbe.bugdir + >>> bd = libbe.bugdir.SimpleBugDir(memory=False) + >>> io = libbe.command.StringInputOutput() + >>> io.stdout = sys.stdout + >>> ui = libbe.command.UserInterface(io=io) + >>> ui.storage_callbacks.set_storage(bd.storage) + >>> cmd = Target(ui=ui) + + >>> ret = ui.run(cmd, args=['/a']) + No target assigned. + >>> ret = ui.run(cmd, args=['/a', 'tomorrow']) + >>> ret = ui.run(cmd, args=['/a']) + tomorrow + + >>> ui.io.stdout = StringIO.StringIO() + >>> ret = ui.run(cmd, {'resolve':True}, ['tomorrow']) + >>> output = ui.io.get_stdout().strip() + >>> bd.flush_reload() + >>> target = bd.bug_from_uuid(output) + >>> print target.summary + tomorrow + >>> print target.severity + target + + >>> ui.io.stdout = sys.stdout + >>> ret = ui.run(cmd, args=['/a', 'none']) + >>> ret = ui.run(cmd, args=['/a']) + No target assigned. + >>> ui.cleanup() + >>> bd.cleanup() + """ + name = 'target' + + def __init__(self, *args, **kwargs): + libbe.command.Command.__init__(self, *args, **kwargs) + self.options.extend([ + libbe.command.Option(name='resolve', short_name='r', + help="Print the UUID for the target bug whose summary " + "matches TARGET. If TARGET is not given, print the UUID " + "of the current bugdir target."), + ]) + self.args.extend([ + libbe.command.Argument( + name='id', metavar='BUG-ID', optional=True, + completion_callback=libbe.command.util.complete_bug_id), + libbe.command.Argument( + name='target', metavar='TARGET', optional=True, + completion_callback=complete_target), + ]) + + def _run(self, **params): + if params['resolve'] == False: + if params['id'] == None: + raise libbe.command.UserError('Please specify a bug id.') + else: + if params['target'] != None: + raise libbe.command.UserError('Too many arguments') + params['target'] = params.pop('id') + bugdir = self._get_bugdir() + if params['resolve'] == True: + bug = bug_from_target_summary(bugdir, params['target']) + if bug == None: + print >> self.stdout, 'No target assigned.' + else: + print >> self.stdout, bug.uuid + return 0 + bug,dummy_comment = libbe.command.util.bug_comment_from_user_id( + bugdir, params['id']) + if params['target'] == None: + target = bug_target(bugdir, bug) + if target == None: + print >> self.stdout, 'No target assigned.' + else: + print >> self.stdout, target.summary + else: + if params['target'] == 'none': + target = remove_target(bugdir, bug) + else: + target = add_target(bugdir, bug, params['target']) + return 0 + + def usage(self): + return 'usage: be %(name)s BUG-ID [TARGET]\nor: be %(name)s --resolve [TARGET]' \ + % vars(self.__class__) + + def _long_help(self): + return """ +Assorted bug target manipulations and queries. + +If no target is specified, the bug's current target is printed. If +TARGET is specified, it will be assigned to the bug, creating a new +target bug if necessary. + +Targets are free-form; any text may be specified. They will generally +be milestone names or release numbers. The value "none" can be used +to unset the target. + +In the alternative `be target --resolve TARGET` form, print the UUID +of the target-bug with summary TARGET. If target is not given, return +use the bugdir's current target (see `be set`). + +If you want to list all bugs blocking the current target, try + $ be depend --status -closed,fixed,wontfix --severity -target \ + $(be target --resolve) + +If you want to set the current bugdir target by summary (rather than +by UUID), try + $ be set target $(be target --resolve SUMMARY) +""" + +def bug_from_target_summary(bugdir, summary=None): + if summary == None: + if bugdir.target == None: + return None + else: + return bugdir.bug_from_uuid(bugdir.target) + matched = [] + for uuid in bugdir.uuids(): + bug = bugdir.bug_from_uuid(uuid) + if bug.severity == 'target' and bug.summary == summary: + matched.append(bug) + if len(matched) == 0: + return None + if len(matched) > 1: + raise Exception('Several targets with same summary: %s' + % '\n '.join([bug.uuid for bug in matched])) + return matched[0] + +def bug_target(bugdir, bug): + if bug.severity == 'target': + return bug + matched = [] + for blocked in libbe.command.depend.get_blocks(bugdir, bug): + if blocked.severity == 'target': + matched.append(blocked) + if len(matched) == 0: + return None + if len(matched) > 1: + raise Exception('This bug (%s) blocks several targets: %s' + % (bug.uuid, + '\n '.join([b.uuid for b in matched]))) + return matched[0] + +def remove_target(bugdir, bug): + target = bug_target(bugdir, bug) + libbe.command.depend.remove_block(target, bug) + return target + +def add_target(bugdir, bug, summary): + target = bug_from_target_summary(bugdir, summary) + if target == None: + target = bugdir.new_bug(summary=summary) + target.severity = 'target' + libbe.command.depend.add_block(target, bug) + return target + +def targets(bugdir): + """Generate all possible target bug summaries.""" + bugdir.load_all_bugs() + for bug in bugdir: + if bug.severity == 'target': + yield bug.summary + +def target_dict(bugdir): + """ + Return a dict with bug UUID keys and bug summary values for all + target bugs. + """ + ret = {} + bugdir.load_all_bugs() + for bug in bugdir: + if bug.severity == 'target': + ret[bug.uuid] = bug.summary + return ret + +def complete_target(command, argument, fragment=None): + """List possible command completions for fragment.""" + return targets(command._get_bugdir()) diff --git a/libbe/command/util.py b/libbe/command/util.py new file mode 100644 index 0000000..6e8e36c --- /dev/null +++ b/libbe/command/util.py @@ -0,0 +1,203 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import glob +import os.path + +import libbe +import libbe.command + +class Completer (object): + def __init__(self, options): + self.options = options + def __call__(self, bugdir, fragment=None): + return [fragment] + +def complete_command(command, argument, fragment=None): + """ + List possible command completions for fragment. + + command argument is not used. + """ + return list(libbe.command.commands(command_names=True)) + +def comp_path(fragment=None): + """List possible path completions for fragment.""" + if fragment == None: + fragment = '.' + comps = glob.glob(fragment+'*') + glob.glob(fragment+'/*') + if len(comps) == 1 and os.path.isdir(comps[0]): + comps.extend(glob.glob(comps[0]+'/*')) + return comps + +def complete_path(command, argument, fragment=None): + """List possible path completions for fragment.""" + return comp_path(fragment) + +def complete_status(command, argument, fragment=None): + bd = command._get_bugdir() + import libbe.bug + return libbe.bug.status_values + +def complete_severity(command, argument, fragment=None): + bd = command._get_bugdir() + import libbe.bug + return libbe.bug.severity_values + +def assignees(bugdir): + bugdir.load_all_bugs() + return list(set([bug.assigned for bug in bugdir + if bug.assigned != None])) + +def complete_assigned(command, argument, fragment=None): + return assignees(command._get_bugdir()) + +def complete_extra_strings(command, argument, fragment=None): + if fragment == None: + return [] + return [fragment] + +def complete_bug_id(command, argument, fragment=None): + return complete_bug_comment_id(command, argument, fragment, + comments=False) + +def complete_bug_comment_id(command, argument, fragment=None, + active_only=True, comments=True): + import libbe.bugdir + import libbe.util.id + bd = command._get_bugdir() + if fragment == None or len(fragment) == 0: + fragment = '/' + try: + p = libbe.util.id.parse_user(bd, fragment) + matches = None + root,residual = (fragment, None) + if not root.endswith('/'): + root += '/' + except libbe.util.id.InvalidIDStructure, e: + return [] + except libbe.util.id.NoIDMatches: + return [] + except libbe.util.id.MultipleIDMatches, e: + if e.common == None: + # choose among bugdirs + return e.matches + common = e.common + matches = e.matches + root,residual = libbe.util.id.residual(common, fragment) + p = libbe.util.id.parse_user(bd, e.common) + bug = None + if matches == None: # fragment was complete, get a list of children uuids + if p['type'] == 'bugdir': + matches = bd.uuids() + common = bd.id.user() + elif p['type'] == 'bug': + if comments == False: + return [fragment] + bug = bd.bug_from_uuid(p['bug']) + matches = bug.uuids() + common = bug.id.user() + else: + assert p['type'] == 'comment', p + return [fragment] + if p['type'] == 'bugdir': + child_fn = bd.bug_from_uuid + elif p['type'] == 'bug': + if comments == False: + return[fragment] + if bug == None: + bug = bd.bug_from_uuid(p['bug']) + child_fn = bug.comment_from_uuid + elif p['type'] == 'comment': + assert matches == None, matches + return [fragment] + possible = [] + common += '/' + for m in matches: + child = child_fn(m) + id = child.id.user() + possible.append(id.replace(common, root)) + return possible + +def select_values(string, possible_values, name="unkown"): + """ + This function allows the user to select values from a list of + possible values. The default is to select all the values: + + >>> select_values(None, ['abc', 'def', 'hij']) + ['abc', 'def', 'hij'] + + The user selects values with a comma-separated limit_string. + Prepending a minus sign to such a list denotes blacklist mode: + + >>> select_values('-abc,hij', ['abc', 'def', 'hij']) + ['def'] + + Without the leading -, the selection is in whitelist mode: + + >>> select_values('abc,hij', ['abc', 'def', 'hij']) + ['abc', 'hij'] + + In either case, appropriate errors are raised if on of the + user-values is not in the list of possible values. The name + parameter lets you make the error message more clear: + + >>> select_values('-xyz,hij', ['abc', 'def', 'hij'], name="foobar") + Traceback (most recent call last): + ... + UserError: Invalid foobar xyz + ['abc', 'def', 'hij'] + >>> select_values('xyz,hij', ['abc', 'def', 'hij'], name="foobar") + Traceback (most recent call last): + ... + UserError: Invalid foobar xyz + ['abc', 'def', 'hij'] + """ + possible_values = list(possible_values) # don't alter the original + if string == None: + pass + elif string.startswith('-'): + blacklisted_values = set(string[1:].split(',')) + for value in blacklisted_values: + if value not in possible_values: + raise libbe.command.UserError('Invalid %s %s\n %s' + % (name, value, possible_values)) + possible_values.remove(value) + else: + whitelisted_values = string.split(',') + for value in whitelisted_values: + if value not in possible_values: + raise libbe.command.UserError( + 'Invalid %s %s\n %s' + % (name, value, possible_values)) + possible_values = whitelisted_values + return possible_values + +def bug_comment_from_user_id(bugdir, id): + p = libbe.util.id.parse_user(bugdir, id) + if not p['type'] in ['bug', 'comment']: + raise libbe.command.UserError( + '%s is a %s id, not a bug or comment id' % (id, p['type'])) + if p['bugdir'] != bugdir.uuid: + raise libbe.command.UserError( + "%s doesn't belong to this bugdir (%s)" + % (id, bugdir.uuid)) + bug = bugdir.bug_from_uuid(p['bug']) + if 'comment' in p: + comment = bug.comment_from_uuid(p['comment']) + else: + comment = bug.comment_root + return (bug, comment) |