diff options
Diffstat (limited to 'interfaces/email/interactive/be-handle-mail')
-rwxr-xr-x | interfaces/email/interactive/be-handle-mail | 335 |
1 files changed, 205 insertions, 130 deletions
diff --git a/interfaces/email/interactive/be-handle-mail b/interfaces/email/interactive/be-handle-mail index c5b87b1..d2805d5 100755 --- a/interfaces/email/interactive/be-handle-mail +++ b/interfaces/email/interactive/be-handle-mail @@ -50,9 +50,8 @@ executed, with the email's post-tag subject as the commit message. """ import codecs -import StringIO as StringIO +import doctest import email -from email.mime.multipart import MIMEMultipart import email.utils import os import os.path @@ -62,15 +61,15 @@ import sys import time import traceback import types -import doctest import unittest +from email.mime.multipart import MIMEMultipart -import libbe.bugdir import libbe.bug -import libbe.comment -import libbe.diff +import libbe.bugdir import libbe.command import libbe.command.subscribe as subscribe +import libbe.comment +import libbe.diff import libbe.storage import libbe.ui.command_line import libbe.util.encoding @@ -105,12 +104,14 @@ ALLOWED_COMMANDS = [u'assign', u'comment', u'commit', u'depend', u'diff', AUTOCOMMIT = True ENCODING = u'utf-8' -libbe.util.encoding.ENCODING = ENCODING # force default encoding +libbe.util.encoding.ENCODING = ENCODING # force default encoding + -class InvalidEmail (ValueError): +class InvalidEmail(ValueError): def __init__(self, msg, message): ValueError.__init__(self, message) self.msg = msg + def response(self): header = self.msg.response_header body = [u'Error processing email:\n', @@ -122,58 +123,70 @@ class InvalidEmail (ValueError): response.attach(self.msg.msg) ret = send_pgp_mime.attach_root(header, response) return ret + def response_body(self): - err_text = [unicode(self)] + err_text = [str(self)] return u'\n'.join(err_text) -class InvalidSubject (InvalidEmail): + +class InvalidSubject(InvalidEmail): def __init__(self, msg, message=None): - if message == None: + if message is None: message = u'Invalid subject' InvalidEmail.__init__(self, msg, message) + def response_body(self): - err_text = u'\n'.join([unicode(self), u'', + err_text = u'\n'.join([str(self), u'', u'full subject was:', self.msg.subject()]) return err_text -class InvalidPseudoHeader (InvalidEmail): + +class InvalidPseudoHeader(InvalidEmail): def response_body(self): err_text = [u'Invalid pseudo-header:\n', - unicode(self)] + str(self)] return u'\n'.join(err_text) -class InvalidCommand (InvalidEmail): + +class InvalidCommand(InvalidEmail): def __init__(self, msg, command, message=None): bigmessage = u'Invalid execution command "%s"' % command - if message != None: + if message is not None: bigmessage += u'\n%s' % message InvalidEmail.__init__(self, msg, bigmessage) self.command = command -class InvalidOption (InvalidCommand): + +class InvalidOption(InvalidCommand): def __init__(self, msg, option, message=None): - bigmessage = u'Invalid option "%s"' % (option) - if message != None: + bigmessage = u'Invalid option "%s"' % option + info = "" + command = "" + if message is not None: bigmessage += u'\n%s' % message InvalidCommand.__init__(self, msg, info, command, bigmessage) self.option = option -class NotificationFailed (Exception): + +class NotificationFailed(Exception): def __init__(self, msg): bigmessage = 'Notification failed: %s' % msg Exception.__init__(self, bigmessage) self.short_msg = msg -class ID (object): + +class ID(object): """ Sometimes you want to reference the output of a command that hasn't been executed yet. ID is there for situations like > a = Command(msg, "new", ["create a bug"]) > b = Command(msg, "comment", [ID(a), "and comment on it"]) """ + def __init__(self, command): self.command = command + def extract_id(self): if hasattr(self, 'cached_id'): return self._cached_id @@ -181,17 +194,19 @@ class ID (object): if self.command.command.name == u'new': regexp = re.compile(u'Created bug with ID (.*)') else: - raise NotImplementedError, self.command.command + raise NotImplementedError(self.command.command) match = regexp.match(self.command.stdout) assert len(match.groups()) == 1, str(match.groups()) self._cached_id = match.group(1) return self._cached_id + def __str__(self): if self.command.ret != 0: return '<id for %s>' % repr(self.command) return '<id %s>' % self.extract_id() -class Command (object): + +class Command(object): """ A libbe.command.Command handler. @@ -203,54 +218,60 @@ class Command (object): args: list of arguments to pass to the command stdin: if non-null, a string to pipe into the command's stdin """ + def __init__(self, msg, command, args=None, stdin=None): self.msg = msg - if args == None: + if args is None: self.args = [] else: self.args = args self.command = libbe.command.get_command_class(command_name=command)() - self.command._setup_io = lambda i_enc,o_enc : None + self.command._setup_io = lambda i_enc, o_enc: None self.ret = None self.stdin = stdin self.stdout = None + def __str__(self): return '<command: %s %s>' % (self.command, ' '.join([str(s) for s in self.args])) + def normalize_args(self): """ Expand any ID placeholders in self.args. """ - for i,arg in enumerate(self.args): + for i, arg in enumerate(self.args): if isinstance(arg, ID): self.args[i] = arg.extract_id() + def run(self): """ Attempt to execute the command whose info is given in the dictionary info. Returns the exit code, stdout, and stderr produced by the command. """ - if self.command.name in [None, u'']: # don't accept blank commands + if self.command.name in [None, u'']: # don't accept blank commands raise InvalidCommand(self.msg, self, 'Blank') elif self.command.name not in ALLOWED_COMMANDS: raise InvalidCommand(self.msg, self, 'Not allowed') - assert self.ret == None, u'running %s twice!' % unicode(self) + assert self.ret is None, u'running %s twice!' % str(self) self.normalize_args() UI.io.set_stdin(self.stdin) self.ret = libbe.ui.command_line.dispatch(UI, self.command, self.args) self.stdout = UI.io.get_stdout() return (self.ret, self.stdout) + def response_msg(self): - if self.ret == None: self.ret = -1 + if self.ret is None: self.ret = -1 response_body = [u'Results of running: (exit code %d)' % self.ret, - u' %s %s' % (self.command.name,u' '.join(self.args))] - if self.stdout != None and len(self.stdout) > 0: + u' %s %s' % (self.command.name, u' '.join(self.args))] + if self.stdout is not None and len(self.stdout) > 0: response_body.extend([u'', u'output:', u'', self.stdout]) - response_body.append(u'') # trailing endline + response_body.append(u'') # trailing endline response_generator = \ send_pgp_mime.PGPMimeMessageFactory(u'\n'.join(response_body)) return response_generator.plain() -class DiffTree (libbe.diff.DiffTree): + +class DiffTree(libbe.diff.DiffTree): """ In order to avoid tons of tiny MIMEText attachments, bug-level nodes set .add_child_text=True (in .join()), which is propogated @@ -275,66 +296,80 @@ class DiffTree (libbe.diff.DiffTree): bugdir/bugs/mod/a/comments/rem bugdir/bugs/mod/a/comments/mod """ + def report_or_none(self): report = self.report() - if report == None: + if report is None: return None payload = report.get_payload() - if payload == None or len(payload) == 0: + if payload is None or len(payload) == 0: return None return report + def report_string(self): report = self.report_or_none() - if report == None: + if report is None: return 'No changes' else: return send_pgp_mime.flatten(report, to_unicode=True) + def make_root(self): return MIMEMultipart() + def join(self, root, parent, data_part): if hasattr(parent, 'attach_child_text'): self.attach_child_text = True - if data_part != None: + if data_part is not None: send_pgp_mime.append_text(parent.data_mime_part, u'\n\n%s' % (data_part)) self.data_mime_part = parent.data_mime_part else: self.data_mime_part = None - if data_part != None: + if data_part is not None: self.data_mime_part = send_pgp_mime.encodedMIMEText(data_part) - if parent != None and parent.name in [u'new', u'rem', u'mod']: + if parent is not None and parent.name in [u'new', u'rem', u'mod']: self.attach_child_text = True - if data_part == None: # make blank data_mime_part for children's appends + if data_part is None: # make blank data_mime_part for children's appends self.data_mime_part = send_pgp_mime.encodedMIMEText(u'') - if self.data_mime_part != None: + if self.data_mime_part is not None: self.data_mime_part[u'Content-Description'] = self.name root.attach(self.data_mime_part) + def data_part(self, depth, indent=False): return libbe.diff.DiffTree.data_part(self, depth, indent=indent) -class Diff (libbe.diff.Diff): - def bug_add_string(self, bug): + +class Diff(libbe.diff.Diff): + @staticmethod + def bug_add_string(bug): return bug.string(show_comments=True) - def _comment_summary_string(self, comment): + + @staticmethod + def _comment_summary_string(comment): return comment.string() + def comment_add_string(self, comment): return self._comment_summary_string(comment) + def comment_rem_string(self, comment): return self._comment_summary_string(comment) -class Message (object): + +class Message(object): def __init__(self, email_text=None, disable_parsing=False): if disable_parsing == False: self.text = email_text - p=email.Parser.Parser() - self.msg=p.parsestr(self.text) - if LOGFILE != None: + p = email.Parser.Parser() + self.msg = p.parsestr(self.text) + if LOGFILE is not None: LOGFILE.write(u'handling %s\n' % self.author_addr()) LOGFILE.write(u'\n%s\n\n' % self.text) - self.confirm = True # enable/disable confirmation email + self.confirm = True # enable/disable confirmation email + def _yes_no(self, boolean): if boolean == True: return 'yes' return 'no' + def author_tuple(self): """ Extract and normalize the sender's email address. Returns a @@ -344,36 +379,44 @@ class Message (object): self._author_tuple_cache = \ send_pgp_mime.source_email(self.msg, return_realname=True) return self._author_tuple_cache + def author_addr(self): return email.utils.formataddr(self.author_tuple()) + def author_name(self): return self.author_tuple()[0] + def author_email(self): return self.author_tuple()[1] + def default_msg_attribute_access(self, attr_name, default=None): if attr_name in self.msg: return self.msg[attr_name] return default + def message_id(self, default=None): return self.default_msg_attribute_access('message-id', default=default) + def subject(self): if 'subject' not in self.msg: raise InvalidSubject(self, u'Email must contain a subject') return self.msg['subject'] + def _split_subject(self): """ Returns (tag, subject), with missing values replaced by None. """ if hasattr(self, '_split_subject_cache'): return self._split_subject_cache - args = self.subject().split(u']',1) + args = self.subject().split(u']', 1) if len(args) < 1: self._split_subject_cache = (None, None) elif len(args) < 2: - self._split_subject_cache = (args[0]+u']', None) + self._split_subject_cache = (args[0] + u']', None) else: - self._split_subject_cache = (args[0]+u']', args[1].strip()) + self._split_subject_cache = (args[0] + u']', args[1].strip()) return self._split_subject_cache + def _subject_tag_type(self): """ Parse subject tag, return (type, value), where type is one of @@ -381,7 +424,7 @@ class Message (object): in the case of "comment", in which case it's the bug ID/shortname. """ - tag,subject = self._split_subject() + tag, subject = self._split_subject() type = None value = None if tag == SUBJECT_TAG_NEW: @@ -394,21 +437,23 @@ class Message (object): type = u'comment' value = match.group(1) return (type, value) + def validate_subject(self): """ Validate the subject line. """ - tag,subject = self._split_subject() + tag, subject = self._split_subject() if not tag.startswith(SUBJECT_TAG_START): raise InvalidSubject( self, u'Subject must start with "%s"' % SUBJECT_TAG_START) - tag_type,value = self._subject_tag_type() - if tag_type == None: + tag_type, value = self._subject_tag_type() + if tag_type is None: raise InvalidSubject(self, u'Invalid tag "%s"' % tag) elif tag_type == u'new' and len(subject) == 0: raise InvalidSubject(self, u'Cannot create a bug with blank title') elif tag_type == u'comment' and len(value) == 0: raise InvalidSubject(self, u'Must specify a bug ID to comment') + def _get_bodies_and_mime_types(self): """ Traverse the email message returning (body, mime_type) for @@ -418,11 +463,12 @@ class Message (object): for part in self.msg.walk(): if part.is_multipart(): continue - body,mime_type=(part.get_payload(decode=True),part.get_content_type()) + body, mime_type = (part.get_payload(decode=True), part.get_content_type()) charset = part.get_content_charset(msg_charset).lower() if mime_type.startswith('text/'): - body = unicode(body, charset) # convert text types to unicode + body = str(body, charset) # convert text types to unicode yield (body, mime_type) + def _parse_body_pseudoheaders(self, body, required, optional, dictionary=None): """ @@ -432,17 +478,17 @@ class Message (object): like, you can initialize the dictionary with some defaults and pass your initialized dict in as dictionary. """ - if dictionary == None: + if dictionary is None: dictionary = {} body_lines = body.splitlines() - all = required+optional - for i,line in enumerate(body_lines): + all = required + optional + for i, line in enumerate(body_lines): line = line.strip() if len(line) == 0: break if ':' not in line: - raise InvalidPseudoheader(self, line) - key,value = line.split(':', 1) + raise InvalidPseudoHeader(self, line) + key, value = line.split(':', 1) value = value.strip() if key not in all: raise InvalidPseudoHeader(self, key) @@ -460,13 +506,15 @@ class Message (object): % u', '.join(missing)) remaining_body = u'\n'.join(body_lines[i:]).strip() return (remaining_body, dictionary) + def _strip_footer(self, body): body_lines = body.splitlines() - for i,line in enumerate(body_lines): + for i, line in enumerate(body_lines): if line.startswith(BREAK): break - i += 1 # increment past the current valid line. + i += 1 # increment past the current valid line. return u'\n'.join(body_lines[:i]).strip() + def parse(self): """ Parse the commands given in the email. Raises assorted @@ -474,7 +522,7 @@ class Message (object): otherwise returns a list of suggested commands to run. """ self.validate_subject() - tag_type,value = self._subject_tag_type() + tag_type, value = self._subject_tag_type() if tag_type == u'new': commands = self.parse_new() elif tag_type == u'comment': @@ -482,18 +530,19 @@ class Message (object): elif tag_type == u'control': commands = self.parse_control() else: - raise Exception, u'Unrecognized tag type "%s"' % tag_type + raise Exception(u'Unrecognized tag type "%s"' % tag_type) return commands + def parse_new(self): command = u'new' - tag,subject = self._split_subject() + tag, subject = self._split_subject() summary = subject options = {u'Reporter': self.author_addr(), u'Confirm': self._yes_no(self.confirm), u'Subscribe': 'no', } - body,mime_type = list(self._get_bodies_and_mime_types())[0] - comment_body,options = \ + body, mime_type = list(self._get_bodies_and_mime_types())[0] + comment_body, options = \ self._parse_body_pseudoheaders(body, NEW_REQUIRED_PSEUDOHEADERS, NEW_OPTIONAL_PSEUDOHEADERS, @@ -512,16 +561,16 @@ class Message (object): comment_body = self._strip_footer(comment_body) if len(comment_body) > 0: command = u'comment' - comment = u'Version: %s\n\n'%options[u'Version'] + comment_body + comment = u'Version: %s\n\n' % options[u'Version'] + comment_body args = [u'--author', self.author_addr(), u'--alt-id', self.message_id(), u'--content-type', mime_type] args.append(id) args.append(u'-') commands.append(Command(self, u'comment', args, stdin=comment)) - for key,value in options.items(): + for key, value in options.items(): if key in [u'Version', u'Reporter', u'Confirm']: - continue # we've already handled these options + continue # we've already handled these options command = key.lower() if key in [u'Depend', u'Tag', u'Target', u'Subscribe']: args = [id, value] @@ -533,23 +582,25 @@ class Message (object): args = ['--subscriber', self.author_addr(), id] commands.append(Command(self, command, args)) return commands + def parse_comment(self, bug_uuid): command = u'comment' bug_id = bug_uuid author = self.author_addr() alt_id = self.message_id() - body,mime_type = list(self._get_bodies_and_mime_types())[0] + body, mime_type = list(self._get_bodies_and_mime_types())[0] if mime_type == 'text/plain': body = self._strip_footer(body) content_type = mime_type args = [u'--author', author] - if alt_id != None: + if alt_id is not None: args.extend([u'--alt-id', alt_id]) args.extend([u'--content-type', content_type, bug_id, u'-']) commands = [Command(self, command, args, stdin=body)] return commands + def parse_control(self): - body,mime_type = list(self._get_bodies_and_mime_types())[0] + body, mime_type = list(self._get_bodies_and_mime_types())[0] commands = [] for line in body.splitlines(): line = line.strip() @@ -564,42 +615,46 @@ class Message (object): if type(line) == types.UnicodeType: # work around http://bugs.python.org/issue1170 for field in fields: - field = unicode(field, 'unicode escape') - command,args = (fields[0], fields[1:]) + field = str(field, 'unicode escape') + command, args = (fields[0], fields[1:]) commands.append(Command(self, command, args)) if len(commands) == 0: raise InvalidEmail(self, u'No commands in control email.') return commands + def run(self, repo='.'): self._begin_response() commands = self.parse() try: - for i,command in enumerate(commands): + for i, command in enumerate(commands): command.run() self._add_response(command.response_msg()) finally: if AUTOCOMMIT == True: - tag,subject = self._split_subject() + tag, subject = self._split_subject() self.commit_command = Command(self, 'commit', [subject]) self.commit_command.run() - if LOGFILE != None: + if LOGFILE is not None: LOGFILE.write(u'Autocommit:\n%s\n\n' % - send_pgp_mime.flatten(self.commit_command.response_msg(), - to_unicode=True)) + send_pgp_mime.flatten(self.commit_command.response_msg(), + to_unicode=True)) + def _begin_response(self): - tag,subject = self._split_subject() + tag, subject = self._split_subject() response_header = [u'From: %s' % THIS_ADDRESS, u'To: %s' % self.author_addr(), u'Date: %s' % libbe.util.utility.time_to_str(time.time()), - u'Subject: %s Re: %s'%(SUBJECT_TAG_RESPONSE,subject) + u'Subject: %s Re: %s' % (SUBJECT_TAG_RESPONSE, subject) ] - if self.message_id() != None: + if self.message_id() is not None: response_header.append(u'In-reply-to: %s' % self.message_id()) self.response_header = \ send_pgp_mime.header_from_text(text=u'\n'.join(response_header)) self._response_messages = [] + def _add_response(self, response_message): self._response_messages.append(response_message) + def response_email(self): assert len(self._response_messages) > 0 if len(self._response_messages) == 1: @@ -609,9 +664,10 @@ class Message (object): for message in self._response_messages: response_body.attach(message) return send_pgp_mime.attach_root(self.response_header, response_body) + def subscriber_emails(self, previous_revision=None): - if previous_revision == None: - if AUTOCOMMIT != True: # no way to tell what's changed + if previous_revision is None: + if AUTOCOMMIT != True: # no way to tell what's changed raise NotificationFailed('Autocommit dissabled') if len(self._response_messages) == 0: raise NotificationFailed('Initial email failed.') @@ -622,7 +678,7 @@ class Message (object): bd = UI.storage_callbacks.get_bugdir() writeable = bd.storage.writeable bd.storage.writeable = False - if bd.storage.versioned == False: # no way to tell what's changed + if bd.storage.versioned == False: # no way to tell what's changed bd.storage.writeable = writeable raise NotificationFailed('Not versioned') @@ -631,12 +687,12 @@ class Message (object): if len(subscribers) == 0: bd.storage.writeable = writeable return [] - for subscriber,subscriptions in subscribers.items(): + for subscriber, subscriptions in subscribers.items(): subscribers[subscriber] = [] - for id,types in subscriptions.items(): + for id, types in subscriptions.items(): for type in types: subscribers[subscriber].append( - libbe.diff.Subscription(id,type)) + libbe.diff.Subscription(id, type)) before_bd, after_bd = self._get_before_and_after_bugdirs(bd, previous_revision) diff = Diff(before_bd, after_bd) @@ -644,41 +700,43 @@ class Message (object): header = self._subscriber_header(bd, previous_revision) emails = [] - for subscriber,subscriptions in subscribers.items(): + for subscriber, subscriptions in subscribers.items(): header.replace_header('to', subscriber) report = diff.report_tree(subscriptions, diff_tree=DiffTree) root = report.report_or_none() - if root != None: + if root is not None: emails.append(send_pgp_mime.attach_root(header, root)) - if LOGFILE != None: + if LOGFILE is not None: LOGFILE.write(u'Preparing to notify %s of changes\n' % subscriber) bd.storage.writeable = writeable return emails + def _get_before_and_after_bugdirs(self, bd, previous_revision=None): - if previous_revision == None: + if previous_revision is None: commit_msg = self.commit_command.stdout assert commit_msg.startswith('Committed '), commit_msg after_revision = commit_msg[len('Committed '):] before_revision = bd.storage.revision_id(-2) else: before_revision = previous_revision - if before_revision == None: + if before_revision is None: # this commit was the initial commit before_bd = libbe.bugdir.BugDir(from_disk=False, manipulate_encodings=False) else: before_bd = libbe.bugdir.RevisionedBugDir(bd, before_revision) - #after_bd = bd.duplicate_bugdir(after_revision) - after_bd = bd # assume no changes since commit a few cycles ago + # after_bd = bd.duplicate_bugdir(after_revision) + after_bd = bd # assume no changes since commit a few cycles ago return (before_bd, after_bd) + def _subscriber_header(self, bd, previous_revision=None): root_dir = os.path.basename(bd.storage.repo) - if previous_revision == None: + if previous_revision is None: subject = 'Changes to %s on %s by %s' \ - % (root_dir, THIS_SERVER, self.author_addr()) + % (root_dir, THIS_SERVER, self.author_addr()) else: subject = 'Changes to %s on %s since revision %s' \ - % (root_dir, THIS_SERVER, previous_revision) + % (root_dir, THIS_SERVER, previous_revision) header = [u'From: %s' % THIS_ADDRESS, u'To: %s' % u'DUMMY-AUTHOR', u'Date: %s' % libbe.util.utility.time_to_str(time.time()), @@ -686,6 +744,7 @@ class Message (object): ] return send_pgp_mime.header_from_text(text=u'\n'.join(header)) + def generate_global_tags(tag_base=u'be-bug'): """ Generate a series of tags from a base tag string. @@ -699,6 +758,7 @@ def generate_global_tags(tag_base=u'be-bug'): SUBJECT_TAG_COMMENT = re.compile(u'\[%s:([\-0-9a-z/]*)]' % tag_base) SUBJECT_TAG_CONTROL = SUBJECT_TAG_RESPONSE + def open_logfile(logpath=None): """ If logpath=None, default to global LOGPATH. @@ -708,7 +768,7 @@ def open_logfile(logpath=None): Relative logpaths are expanded relative to _THIS_DIR """ global LOGPATH, LOGFILE - if logpath != None: + if logpath is not None: if logpath == u'-': LOGPATH = u'stderr' LOGFILE = sys.stderr @@ -719,14 +779,16 @@ def open_logfile(logpath=None): LOGPATH = logpath else: LOGPATH = os.path.join(_THIS_DIR, logpath) - if LOGFILE == None and LOGPATH != u'none': + if LOGFILE is None and LOGPATH != u'none': LOGFILE = codecs.open(LOGPATH, u'a+', - libbe.util.encoding.get_text_file_encoding()) + libbe.util.encoding.get_text_file_encoding()) + def close_logfile(): - if LOGFILE != None and LOGPATH not in [u'stderr', u'none']: + if LOGFILE is not None and LOGPATH not in [u'stderr', u'none']: LOGFILE.close() + def test(): result = unittest.TextTestRunner(verbosity=2).run(suite) num_errors = len(result.errors) @@ -734,11 +796,12 @@ def test(): num_bad = num_errors + num_failures return num_bad + def main(args): from optparse import OptionParser global AUTOCOMMIT, UI - usage='be-handle-mail [options]\n\n%s' % (__doc__) + usage = 'be-handle-mail [options]\n\n%s' % (__doc__) parser = OptionParser(usage=usage) parser.add_option('-r', '--repo', dest='repo', default=_THIS_DIR, metavar='REPO', @@ -762,17 +825,17 @@ def main(args): help='Run internal unit-tests and exit.') pargs = args - options,args = parser.parse_args(args[1:]) + options, args = parser.parse_args(args[1:]) if options.test == True: num_bad = test() if num_bad > 126: num_bad = 1 sys.exit(num_bad) - + AUTOCOMMIT = options.autocommit - if options.notify_since == None: + if options.notify_since is None: msg_text = sys.stdin.read() open_logfile(options.logfile) @@ -781,29 +844,29 @@ def main(args): io = libbe.command.StringInputOutput() UI = libbe.command.UserInterface(io, location=options.repo) - if options.notify_since != None: + if options.notify_since is not None: if options.subscribers == True: - if LOGFILE != None: + if LOGFILE is not None: LOGFILE.write(u'Checking for subscribers to notify since revision %s\n' % options.notify_since) try: m = Message(disable_parsing=True) emails = m.subscriber_emails(options.notify_since) - except NotificationFailed, e: - if LOGFILE != None: - LOGFILE.write(unicode(e) + u'\n') + except NotificationFailed as e: + if LOGFILE is not None: + LOGFILE.write(str(e) + u'\n') else: for msg in emails: if options.output == True: - print send_pgp_mime.flatten(msg, to_unicode=True) + print(send_pgp_mime.flatten(msg, to_unicode=True)) else: send_pgp_mime.mail(msg, send_pgp_mime.sendmail) close_logfile() UI.cleanup() sys.exit(0) - if len(msg_text.strip()) == 0: # blank email!? - if LOGFILE != None: + if len(msg_text.strip()) == 0: # blank email!? + if LOGFILE is not None: LOGFILE.write(u'Blank email!\n') close_logfile() UI.cleanup() @@ -811,10 +874,10 @@ def main(args): try: m = Message(msg_text) m.run() - except InvalidEmail, e: + except InvalidEmail as e: response = e.response() - except Exception, e: - if LOGFILE != None: + except Exception as e: + if LOGFILE is not None: LOGFILE.write(u'Uncaught exception:\n%s\n' % (e,)) traceback.print_tb(sys.exc_traceback, file=LOGFILE) close_logfile() @@ -824,28 +887,28 @@ def main(args): else: response = m.response_email() if options.output == True: - print send_pgp_mime.flatten(response, to_unicode=True) + print(send_pgp_mime.flatten(response, to_unicode=True)) elif m.confirm == True: - if LOGFILE != None: + if LOGFILE is not None: LOGFILE.write(u'Sending response to %s\n' % m.author_addr()) LOGFILE.write(u'\n%s\n\n' % send_pgp_mime.flatten(response, to_unicode=True)) send_pgp_mime.mail(response, send_pgp_mime.sendmail) else: - if LOGFILE != None: + if LOGFILE is not None: LOGFILE.write(u'Response declined by %s\n' % m.author_addr()) if options.subscribers == True: - if LOGFILE != None: + if LOGFILE is not None: LOGFILE.write(u'Checking for subscribers\n') try: emails = m.subscriber_emails() - except NotificationFailed, e: - if LOGFILE != None: - LOGFILE.write(unicode(e) + u'\n') + except NotificationFailed as e: + if LOGFILE is not None: + LOGFILE.write(str(e) + u'\n') else: for msg in emails: if options.output == True: - print send_pgp_mime.flatten(msg, to_unicode=True) + print(send_pgp_mime.flatten(msg, to_unicode=True)) else: send_pgp_mime.mail(msg, send_pgp_mime.sendmail) @@ -853,23 +916,28 @@ def main(args): m.commit_command.cleanup() UI.cleanup() -class GenerateGlobalTagsTestCase (unittest.TestCase): + +class GenerateGlobalTagsTestCase(unittest.TestCase): def setUp(self): super(GenerateGlobalTagsTestCase, self).setUp() self.save_global_tags() + def tearDown(self): self.restore_global_tags() super(GenerateGlobalTagsTestCase, self).tearDown() + def save_global_tags(self): self.saved_globals = [SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL] + def restore_global_tags(self): global SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, \ SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, \ SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL = \ self.saved_globals + def test_restore_global_tags(self): "Test global tag restoration by teardown function." global SUBJECT_TAG_BASE @@ -878,26 +946,32 @@ class GenerateGlobalTagsTestCase (unittest.TestCase): self.failUnlessEqual(SUBJECT_TAG_BASE, u'projectX-bug') self.restore_global_tags() self.failUnlessEqual(SUBJECT_TAG_BASE, u'be-bug') + def test_subject_tag_base(self): "Should set SUBJECT_TAG_BASE global correctly" generate_global_tags(u'projectX-bug') self.failUnlessEqual(SUBJECT_TAG_BASE, u'projectX-bug') + def test_subject_tag_start(self): "Should set SUBJECT_TAG_START global correctly" generate_global_tags(u'projectX-bug') self.failUnlessEqual(SUBJECT_TAG_START, u'[projectX-bug') + def test_subject_tag_response(self): "Should set SUBJECT_TAG_RESPONSE global correctly" generate_global_tags(u'projectX-bug') self.failUnlessEqual(SUBJECT_TAG_RESPONSE, u'[projectX-bug]') + def test_subject_tag_new(self): "Should set SUBJECT_TAG_NEW global correctly" generate_global_tags(u'projectX-bug') self.failUnlessEqual(SUBJECT_TAG_NEW, u'[projectX-bug:submit]') + def test_subject_tag_control(self): "Should set SUBJECT_TAG_CONTROL global correctly" generate_global_tags(u'projectX-bug') self.failUnlessEqual(SUBJECT_TAG_CONTROL, u'[projectX-bug]') + def test_subject_tag_comment(self): "Should set SUBJECT_TAG_COMMENT global correctly" generate_global_tags(u'projectX-bug') @@ -905,6 +979,7 @@ class GenerateGlobalTagsTestCase (unittest.TestCase): self.failUnlessEqual(len(m.groups()), 1) self.failUnlessEqual(m.group(1), u'abc/xyz-123') + unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |