diff options
Diffstat (limited to 'interfaces/email/interactive/be-handle-mail')
-rwxr-xr-x | interfaces/email/interactive/be-handle-mail | 673 |
1 files changed, 673 insertions, 0 deletions
diff --git a/interfaces/email/interactive/be-handle-mail b/interfaces/email/interactive/be-handle-mail new file mode 100755 index 0000000..f457b6a --- /dev/null +++ b/interfaces/email/interactive/be-handle-mail @@ -0,0 +1,673 @@ +#!/usr/bin/env python +# +# Copyright (C) 2009 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Provide and email interface to the distributed bugtracker Bugs +Everywhere. Recieves incoming email via procmail. Provides an +interface similar to the Debian Bug Tracker. There are currently +three distinct email types: submits, comments, and controls. The +email types are differentiated by tags in the email subject. See +SUBJECT_TAG* for the current values. + +Submit emails create a bug (and optionally add some intitial +comments). The post-tag subject is used as the bug summary, and the +email body is parsed for a pseudo-header. Any text after the +psuedo-header but before a possible line starting with BREAK is added +as the initial bug comment. + +Comment emails... + +Control emails... + +Any changes made to the repository are commited after the email is +executed, with the email's post-tag subject as the commit message. +""" + +import codecs +import cStringIO as StringIO +import email +from email.mime.multipart import MIMEMultipart +import email.utils +import os +import os.path +import re +import shlex +import sys +import time +import traceback +import doctest +import unittest + +import libbe.cmdutil, libbe.encoding, libbe.utility +import send_pgp_mime + +HANDLER_ADDRESS = u"BE Bugs <wking@thor.physics.drexel.edu>" +_THIS_DIR = os.path.abspath(os.path.dirname(__file__)) +BE_DIR = _THIS_DIR +LOGPATH = os.path.join(_THIS_DIR, u"be-handle-mail.log") +LOGFILE = None + +# Tag strings generated by generate_global_tags() +SUBJECT_TAG_BASE = u"be-bug" +SUBJECT_TAG_RESPONSE = None +SUBJECT_TAG_START = None +SUBJECT_TAG_NEW = None +SUBJECT_TAG_COMMENT = None +SUBJECT_TAG_CONTROL = None + +BREAK = u"--" +NEW_REQUIRED_PSEUDOHEADERS = [u"Version"] +NEW_OPTIONAL_PSEUDOHEADERS = [u"Reporter", u"Assign", u"Depend", u"Severity", + u"Status", u"Tag", u"Target"] +CONTROL_COMMENT = u"#" +ALLOWED_COMMANDS = [u"assign", u"comment", u"commit", u"depend", u"help", + u"list", u"merge", u"new", u"open", u"severity", u"show", + u"status", u"tag", u"target"] + +AUTOCOMMIT = True + +libbe.encoding.ENCODING = u"utf-8" # force default encoding +ENCODING = libbe.encoding.get_encoding() + +class InvalidEmail (ValueError): + def __init__(self, msg, message): + ValueError.__init__(self, message) + self.msg = msg + def response(self): + header = self.msg.response_header + body = [u"Error processing email:\n", + self.response_body(), u""] + response_generator = \ + send_pgp_mime.PGPMimeMessageFactory(u"\n".join(body)) + response = MIMEMultipart() + response.attach(response_generator.plain()) + response.attach(self.msg.msg) + ret = send_pgp_mime.attach_root(header, response) + return ret + def response_body(self): + err_text = [unicode(self)] + return u"\n".join(err_text) + +class InvalidSubject (InvalidEmail): + def __init__(self, msg, message=None): + if message == None: + message = u"Invalid subject" + InvalidEmail.__init__(self, msg, message) + def response_body(self): + err_text = u"\n".join([unicode(self), u"", + u"full subject was:", + self.msg.subject()]) + return err_text + +class InvalidPseudoHeader (InvalidEmail): + def response_body(self): + err_text = [u"Invalid pseudo-header:\n", + unicode(self)] + return u"\n".join(err_text) + +class InvalidCommand (InvalidEmail): + def __init__(self, msg, command, message=None): + bigmessage = u"Invalid execution command '%s'" % command + if message != None: + bigmessage += u"\n%s" % message + InvalidEmail.__init__(self, msg, bigmessage) + self.command = command + +class InvalidOption (InvalidCommand): + def __init__(self, msg, option, message=None): + bigmessage = u"Invalid option '%s'" % (option) + if message != None: + bigmessage += u"\n%s" % message + InvalidCommand.__init__(self, msg, info, command, bigmessage) + self.option = option + +class ID (object): + """ + Sometimes you want to reference the output of a command that + hasn't been executed yet. ID is there for situations like + > a = Command(msg, "new", ["create a bug"]) + > b = Command(msg, "comment", [ID(a), "and comment on it"]) + """ + def __init__(self, command): + self.command = command + def extract_id(self): + if hasattr(self, "cached_id"): + return self.cached_id + assert self.command.ret == 0, self.command.ret + if self.command.command == u"new": + regexp = re.compile(u"Created bug with ID (.*)") + else: + raise NotImplementedError, self.command.command + match = regexp.match(self.command.stdout) + assert len(match.groups()) == 1, str(match.groups()) + self.cached_id = match.group(1) + return self.cached_id + def __str__(self): + if self.command.ret != 0: + return "<id for %s>" % repr(self.command) + return "<id %s>" % self.extract_id() + +class Command (object): + """ + A becommands command wrapper. + Doesn't validate input, so do that before initializing. + + Initialize with + Command(msg, command, args=None, stdin=None) + where + msg: the Message instance prompting this command + command: name of becommand to execute, e.g. "new" + args: list of arguments to pass to the command + stdin: if non-null, a string to pipe into the command's stdin + """ + def __init__(self, msg, command, args=None, stdin=None): + self.msg = msg + self.command = command + if args == None: + self.args = [] + else: + self.args = args + self.stdin = stdin + self.ret = None + self.stdout = None + self.stderr = None + self.err = None + def __str__(self): + return "<command: %s %s>" % (self.command, " ".join([str(s) for s in self.args])) + def normalize_args(self): + """ + Expand any ID placeholders in self.args. + """ + for i,arg in enumerate(self.args): + if isinstance(arg, ID): + self.args[i] = arg.extract_id() + def run(self): + """ + Attempt to execute the command whose info is given in the dictionary + info. Returns the exit code, stdout, and stderr produced by the + command. + """ + if self.command in [None, u""]: # don't accept blank commands + raise InvalidCommand(self.msg, self, "Blank") + elif self.command not in ALLOWED_COMMANDS: + raise InvalidCommand(self.msg, self, "Not allowed") + assert self.ret == None, u"running %s twice!" % unicode(self) + self.normalize_args() + # set stdin and catch stdout and stderr + if self.stdin != None: + new_stdin = StringIO.StringIO(self.stdin) + orig___stdin = sys.__stdin__ + sys.__stdin__ = new_stdin + orig_stdin = sys.stdin + sys.stdin = new_stdin + new_stdout = codecs.getwriter(ENCODING)(StringIO.StringIO()) + new_stderr = codecs.getwriter(ENCODING)(StringIO.StringIO()) + orig_stdout = sys.stdout + orig_stderr = sys.stderr + sys.stdout = new_stdout + sys.stderr = new_stderr + # run the command + os.chdir(BE_DIR) + try: + self.ret = libbe.cmdutil.execute(self.command, self.args, + manipulate_encodings=False) + except libbe.cmdutil.GetHelp: + print libbe.cmdutil.help(command) + except libbe.cmdutil.GetCompletions: + self.err = InvalidOption(self.msg, self.command, u"--complete") + except libbe.cmdutil.UsageError, e: + self.err = InvalidCommand(self.msg, self, + "%s\n%s" % (type(e), unicode(e))) + except libbe.cmdutil.UserError, e: + self.err = InvalidCommand(self.msg, self, + "%s\n%s" % (type(e), unicode(e))) + # restore stdin, stdout, and stderr + if self.stdin != None: + sys.__stdin__ = new_stdin + sys.__stdin__ = orig___stdin + sys.stdin = orig_stdin + sys.stdout.flush() + sys.stderr.flush() + sys.stdout = orig_stdout + sys.stderr = orig_stderr + self.stdout = codecs.decode(new_stdout.getvalue(), ENCODING) + self.stderr = codecs.decode(new_stderr.getvalue(), ENCODING) + if self.err != None: + raise self.err + return (self.ret, self.stdout, self.stderr) + def response_msg(self): + response_body = [u"Results of running: (exit code %d)" % self.ret, + u" %s %s" % (self.command, u" ".join(self.args))] + if self.stdout != None and len(self.stdout) > 0: + response_body.extend([u"", u"stdout:", u"", self.stdout]) + if self.stderr != None and len(self.stderr) > 0: + response_body.extend([u"", u"stderr:", u"", self.stderr]) + response_body.append(u"") # trailing endline + response_generator = \ + send_pgp_mime.PGPMimeMessageFactory(u"\n".join(response_body)) + return response_generator.plain() + +class Message (object): + def __init__(self, email_text): + self.text = email_text + p=email.Parser.Parser() + self.msg=p.parsestr(self.text) + if LOGFILE != None: + LOGFILE.write(u"handling %s\n" % self.author_addr()) + LOGFILE.write(u"\n%s\n\n" % self.text) + def author_tuple(self): + """ + Extract and normalize the sender's email address. Returns a + (name, email) tuple. + """ + if not hasattr(self, "author_tuple_cache"): + self.author_tuple_cache = \ + send_pgp_mime.source_email(self.msg, return_realname=True) + return self.author_tuple_cache + def author_addr(self): + return email.utils.formataddr(self.author_tuple()) + def author_name(self): + return self.author_tuple()[0] + def author_email(self): + return self.author_tuple()[1] + def default_msg_attribute_access(self, attr_name, default=None): + if attr_name in self.msg: + return self.msg[attr_name] + return default + def message_id(self, default=None): + return self.default_msg_attribute_access("message-id", default=default) + def subject(self): + if "subject" not in self.msg: + raise InvalidSubject(self, u"Email must contain a subject") + return self.msg["subject"] + def _split_subject(self): + """ + Returns (tag, subject), with missing values replaced by None. + """ + if hasattr(self, "_split_subject_cache"): + return self._split_subject_cache + args = self.subject().split(u"]",1) + if len(args) < 1: + self._split_subject_cache = (None, None) + elif len(args) < 2: + self._split_subject_cache = (args[0]+u"]", None) + else: + self._split_subject_cache = (args[0]+u"]", args[1].strip()) + return self._split_subject_cache + def _subject_tag_type(self): + """ + Parse subject tag, return (type, value), where type is one of + None, "new", "comment", or "control"; and value is None except + in the case of "comment", in which case it's the bug + ID/shortname. + """ + tag,subject = self._split_subject() + type = None + value = None + if tag == SUBJECT_TAG_NEW: + type = u"new" + elif tag == SUBJECT_TAG_CONTROL: + type = u"control" + else: + match = SUBJECT_TAG_COMMENT.match(tag) + if len(match.groups()) == 1: + type = u"comment" + value = match.group(1) + return (type, value) + def validate_subject(self): + """ + Validate the subject line. + """ + tag,subject = self._split_subject() + if not tag.startswith(SUBJECT_TAG_START): + raise InvalidSubject( + self, u"Subject must start with '%s'" % SUBJECT_TAG_START) + tag_type,value = self._subject_tag_type() + if tag_type == None: + raise InvalidSubject(self, u"Invalid tag '%s'" % tag) + elif tag_type == u"new" and len(subject) == 0: + raise InvalidSubject(self, u"Cannot create a bug with blank title") + elif tag_type == u"comment" and len(value) == 0: + raise InvalidSubject(self, u"Must specify a bug ID to comment") + def _get_bodies_and_mime_types(self): + """ + Traverse the email message returning (body, mime_type) for + each non-mulitpart portion of the message. + """ + for part in self.msg.walk(): + if part.is_multipart(): + continue + body,mime_type=(part.get_payload(decode=1),part.get_content_type()) + yield (body, mime_type) + def _parse_body_pseudoheaders(self, body, required, optional, + dictionary=None): + """ + Grab any pseudo-headers from the beginning of body. Raise + InvalidPseudoHeader on errors. Returns the body text after + the pseudo-header and a dictionary of set options. If you + like, you can initialize the dictionary with some defaults + and pass your initialized dict in as dictionary. + """ + if dictionary == None: + dictionary = {} + body_lines = body.splitlines() + all = required+optional + for i,line in enumerate(body_lines): + line = line.strip() + if len(line) == 0: + break + key,value = line.split(":", 1) + value = value.strip() + if key not in all: + raise InvalidPseudoHeader(self, key) + if len(value) == 0: + raise InvalidEmail( + self, u"Blank value for: %s" % key) + dictionary[key] = value + missing = [] + for key in required: + if key not in dictionary: + missing.append(key) + if len(missing) > 0: + raise InvalidPseudoHeader(self, + u"Missing required pseudo-headers:\n%s" + % u", ".join(missing)) + remaining_body = u"\n".join(body_lines[i:]).strip() + return (remaining_body, dictionary) + def _strip_footer(self, body): + body_lines = body.splitlines() + for i,line in enumerate(body_lines): + if line.startswith(BREAK): + break + return u"\n".join(body_lines[:i]).strip() + def parse(self): + """ + Parse the commands given in the email. Raises assorted + subclasses of InvalidEmail in the case of invalid messages, + otherwise returns a list of suggested commands to run. + """ + self.validate_subject() + tag_type,value = self._subject_tag_type() + commands = [] + if tag_type == u"new": + command = u"new" + tag,subject = self._split_subject() + summary = subject + options = {u"Reporter": self.author_addr()} + body,mime_type = list(self._get_bodies_and_mime_types())[0] + comment_body,options = \ + self._parse_body_pseudoheaders(body, + NEW_REQUIRED_PSEUDOHEADERS, + NEW_OPTIONAL_PSEUDOHEADERS, + options) + args = [u"--reporter", options[u"Reporter"]] + args.append(summary) + commands.append(Command(self, command, args)) + comment_body = self._strip_footer(comment_body) + id = ID(commands[0]) + if len(comment_body) > 0: + command = u"comment" + comment = u"Version: %s\n\n"%options[u"Version"] + comment_body + args = [u"--author", self.author_addr(), + u"--alt-id", self.message_id(), + u"--content-type", mime_type] + args.append(id) + args.append(u"-") + commands.append(Command(self, u"comment", args, stdin=comment)) + for key,value in options.items(): + if key in [u"Version", u"Reporter"]: + continue # we've already handled this option + command = key.lower() + args = [id, value] + commands.append(Command(self, command, args)) + elif tag_type == u"comment": + command = u"comment" + bug_id = value + author = self.author_addr() + alt_id = self.message_id() + body,mime_type = list(self._get_bodies_and_mime_types())[0] + if mime_type == "text/plain": + body = self._strip_footer(body) + content_type = mime_type + args = [u"--author", author, u"--alt-id", alt_id, + u"--content-type", content_type, bug_id, u"-"] + commands.append(Command(self, command, args, stdin=body)) + elif tag_type == u"control": + body,mime_type = list(self._get_bodies_and_mime_types())[0] + for line in body.splitlines(): + line = line.strip() + if line.startswith(CONTROL_COMMENT) or len(line) == 0: + continue + if line.startswith(BREAK): + break + fields = shlex.split(line) + command,args = (fields[0], fields[1:]) + commands.append(Command(self, command, args)) + if len(commands) == 0: + raise InvalidEmail(self, u"No commands in control email.") + else: + raise Exception, u"Unrecognized tag type '%s'" % tag_type + return commands + def run(self): + self._begin_response() + commands = self.parse() + try: + for command in commands: + command.run() + self._add_response(command.response_msg()) + finally: + if AUTOCOMMIT == True: + tag,subject = self._split_subject() + command = Command(self, "commit", [subject]) + command.run() + if LOGFILE != None: + LOGFILE.write("Autocommit:\n%s\n\n" % + send_pgp_mime.flatten(command.response_msg(), + to_unicode=True)) + def _begin_response(self): + tag,subject = self._split_subject() + response_header = [u"From: %s" % HANDLER_ADDRESS, + u"To: %s" % self.author_addr(), + u"Date: %s" % libbe.utility.time_to_str(time.time()), + u"Subject: %s Re: %s"%(SUBJECT_TAG_RESPONSE,subject) + ] + if self.message_id() != None: + response_header.append(u"In-reply-to: %s" % self.message_id()) + self.response_header = \ + send_pgp_mime.header_from_text(text=u"\n".join(response_header)) + self._response_messages = [] + def _add_response(self, response_message): + self._response_messages.append(response_message) + def response_email(self): + assert len(self._response_messages) > 0 + if len(self._response_messages) == 1: + response_body = self._response_messages[0] + else: + response_body = MIMEMultipart() + for message in self._response_messages: + response_body.attach(message) + return send_pgp_mime.attach_root(self.response_header, response_body) + +def generate_global_tags(tag_base=u"be-bug"): + """ + Generate a series of tags from a base tag string. + """ + global SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, \ + SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL + SUBJECT_TAG_BASE = tag_base + SUBJECT_TAG_START = u"[%s" % tag_base + SUBJECT_TAG_RESPONSE = u"[%s]" % tag_base + SUBJECT_TAG_NEW = u"[%s:submit]" % tag_base + SUBJECT_TAG_COMMENT = re.compile(u"\[%s:([\-0-9a-z]*)]" % tag_base) + SUBJECT_TAG_CONTROL = SUBJECT_TAG_RESPONSE + +def open_logfile(logpath=None): + """ + If logpath=None, default to global LOGPATH. + Special logpath strings: + "-" set LOGFILE to sys.stderr + "none" disable logging + Relative logpaths are expanded relative to _THIS_DIR + """ + global LOGPATH, LOGFILE + if logpath != None: + if logpath == u"-": + LOGPATH = u"stderr" + LOGFILE = sys.stderr + elif logpath == u"none": + LOGPATH = u"none" + LOGFILE = None + elif os.path.isabs(logpath): + LOGPATH = logpath + else: + LOGPATH = os.path.join(_THIS_DIR, logpath) + if LOGFILE == None and LOGPATH != u"none": + LOGFILE = codecs.open(LOGPATH, u"a+", ENCODING) + LOGFILE.write(u"Default encoding: %s\n" % ENCODING) + +def close_logfile(): + if LOGFILE != None and LOGPATH not in [u"stderr", u"none"]: + LOGFILE.close() + +def test(): + unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) + result = unittest.TextTestRunner(verbosity=2).run(suite) + num_errors = len(result.errors) + num_failures = len(result.failures) + num_bad = num_errors + num_failures + return num_bad + +def main(): + from optparse import OptionParser + global AUTOCOMMIT, BE_DIR + + usage="be-handle-mail [options]\n\n%s" % (__doc__) + parser = OptionParser(usage=usage) + parser.add_option('-b', '--be-dir', dest='be_dir', default=BE_DIR, + metavar="DIR", + help='Select the BE directory to serve (%default).') + parser.add_option('-t', '--tag-base', dest='tag_base', + default=SUBJECT_TAG_BASE, metavar="TAG", + help='Set the subject tag base (%default).') + parser.add_option('-o', '--output', dest='output', action='store_true', + help="Don't mail the generated message, print it to stdout instead. Useful for testing be-handle-mail functionality without the whole mail transfer agent and procmail setup.") + parser.add_option('-l', '--logfile', dest='logfile', metavar='LOGFILE', + help='Set the logfile to LOGFILE. Relative paths are relative to the location of this be-handle-mail file (%s). The special value of "-" directs the log output to stderr, and "none" disables logging.' % _THIS_DIR) + parser.add_option('-a', '--disable-autocommit', dest='autocommit', + default=True, action='store_false', + help='Disable the autocommit after parsing the email.') + parser.add_option('--test', dest='test', action='store_true', + help='Run internal unit-tests and exit.') + + options,args = parser.parse_args() + + if options.test == True: + num_bad = test() + if num_bad > 126: + num_bad = 1 + sys.exit(num_bad) + + BE_DIR = options.be_dir + AUTOCOMMIT = options.autocommit + generate_global_tags(options.tag_base) + + msg_text = sys.stdin.read() + libbe.encoding.set_IO_stream_encodings(ENCODING) # _after_ reading message + open_logfile(options.logfile) + if len(msg_text.strip()) == 0: # blank email!? + if LOGFILE != None: + LOGFILE.write(u"Blank email!\n") + close_logfile() + sys.exit(1) + try: + m = Message(msg_text) + m.run() + except InvalidEmail, e: + response = e.response() + except Exception, e: + if LOGFILE != None: + LOGFILE.write(u"Uncaught exception:\n%s\n" % (e,)) + traceback.print_tb(sys.exc_traceback, file=LOGFILE) + close_logfile() + sys.exit(1) + else: + response = m.response_email() + if options.output == True: + print send_pgp_mime.flatten(response, to_unicode=True) + else: + if LOGFILE != None: + LOGFILE.write(u"sending response to %s\n" % m.author_addr()) + LOGFILE.write(u"\n%s\n\n" % send_pgp_mime.flatten(response, + to_unicode=True)) + send_pgp_mime.mail(response, send_pgp_mime.sendmail) + close_logfile() + +class GenerateGlobalTagsTestCase (unittest.TestCase): + def setUp(self): + super(GenerateGlobalTagsTestCase, self).setUp() + self.save_global_tags() + def tearDown(self): + self.restore_global_tags() + super(GenerateGlobalTagsTestCase, self).tearDown() + def save_global_tags(self): + self.saved_globals = [SUBJECT_TAG_BASE, SUBJECT_TAG_START, + SUBJECT_TAG_RESPONSE, SUBJECT_TAG_NEW, + SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL] + def restore_global_tags(self): + global SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, \ + SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL + SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, \ + SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL = \ + self.saved_globals + def test_restore_global_tags(self): + "Test global tag restoration by teardown function." + global SUBJECT_TAG_BASE + self.failUnlessEqual(SUBJECT_TAG_BASE, u"be-bug") + SUBJECT_TAG_BASE = "projectX-bug" + self.failUnlessEqual(SUBJECT_TAG_BASE, u"projectX-bug") + self.restore_global_tags() + self.failUnlessEqual(SUBJECT_TAG_BASE, u"be-bug") + def test_subject_tag_base(self): + "Should set SUBJECT_TAG_BASE global correctly" + generate_global_tags(u"projectX-bug") + self.failUnlessEqual(SUBJECT_TAG_BASE, u"projectX-bug") + def test_subject_tag_start(self): + "Should set SUBJECT_TAG_START global correctly" + generate_global_tags(u"projectX-bug") + self.failUnlessEqual(SUBJECT_TAG_START, u"[projectX-bug") + def test_subject_tag_response(self): + "Should set SUBJECT_TAG_RESPONSE global correctly" + generate_global_tags(u"projectX-bug") + self.failUnlessEqual(SUBJECT_TAG_RESPONSE, u"[projectX-bug]") + def test_subject_tag_new(self): + "Should set SUBJECT_TAG_NEW global correctly" + generate_global_tags(u"projectX-bug") + self.failUnlessEqual(SUBJECT_TAG_NEW, u"[projectX-bug:submit]") + def test_subject_tag_control(self): + "Should set SUBJECT_TAG_CONTROL global correctly" + generate_global_tags(u"projectX-bug") + self.failUnlessEqual(SUBJECT_TAG_CONTROL, u"[projectX-bug]") + def test_subject_tag_comment(self): + "Should set SUBJECT_TAG_COMMENT global correctly" + generate_global_tags(u"projectX-bug") + m = SUBJECT_TAG_COMMENT.match("[projectX-bug:xyz-123]") + self.failUnlessEqual(len(m.groups()), 1) + self.failUnlessEqual(m.group(1), u"xyz-123") + +if __name__ == "__main__": + main() |