aboutsummaryrefslogtreecommitdiffstats
path: root/interfaces/email/interactive/be-handle-mail
diff options
context:
space:
mode:
Diffstat (limited to 'interfaces/email/interactive/be-handle-mail')
-rwxr-xr-xinterfaces/email/interactive/be-handle-mail335
1 files changed, 205 insertions, 130 deletions
diff --git a/interfaces/email/interactive/be-handle-mail b/interfaces/email/interactive/be-handle-mail
index c5b87b1..d2805d5 100755
--- a/interfaces/email/interactive/be-handle-mail
+++ b/interfaces/email/interactive/be-handle-mail
@@ -50,9 +50,8 @@ executed, with the email's post-tag subject as the commit message.
"""
import codecs
-import StringIO as StringIO
+import doctest
import email
-from email.mime.multipart import MIMEMultipart
import email.utils
import os
import os.path
@@ -62,15 +61,15 @@ import sys
import time
import traceback
import types
-import doctest
import unittest
+from email.mime.multipart import MIMEMultipart
-import libbe.bugdir
import libbe.bug
-import libbe.comment
-import libbe.diff
+import libbe.bugdir
import libbe.command
import libbe.command.subscribe as subscribe
+import libbe.comment
+import libbe.diff
import libbe.storage
import libbe.ui.command_line
import libbe.util.encoding
@@ -105,12 +104,14 @@ ALLOWED_COMMANDS = [u'assign', u'comment', u'commit', u'depend', u'diff',
AUTOCOMMIT = True
ENCODING = u'utf-8'
-libbe.util.encoding.ENCODING = ENCODING # force default encoding
+libbe.util.encoding.ENCODING = ENCODING # force default encoding
+
-class InvalidEmail (ValueError):
+class InvalidEmail(ValueError):
def __init__(self, msg, message):
ValueError.__init__(self, message)
self.msg = msg
+
def response(self):
header = self.msg.response_header
body = [u'Error processing email:\n',
@@ -122,58 +123,70 @@ class InvalidEmail (ValueError):
response.attach(self.msg.msg)
ret = send_pgp_mime.attach_root(header, response)
return ret
+
def response_body(self):
- err_text = [unicode(self)]
+ err_text = [str(self)]
return u'\n'.join(err_text)
-class InvalidSubject (InvalidEmail):
+
+class InvalidSubject(InvalidEmail):
def __init__(self, msg, message=None):
- if message == None:
+ if message is None:
message = u'Invalid subject'
InvalidEmail.__init__(self, msg, message)
+
def response_body(self):
- err_text = u'\n'.join([unicode(self), u'',
+ err_text = u'\n'.join([str(self), u'',
u'full subject was:',
self.msg.subject()])
return err_text
-class InvalidPseudoHeader (InvalidEmail):
+
+class InvalidPseudoHeader(InvalidEmail):
def response_body(self):
err_text = [u'Invalid pseudo-header:\n',
- unicode(self)]
+ str(self)]
return u'\n'.join(err_text)
-class InvalidCommand (InvalidEmail):
+
+class InvalidCommand(InvalidEmail):
def __init__(self, msg, command, message=None):
bigmessage = u'Invalid execution command "%s"' % command
- if message != None:
+ if message is not None:
bigmessage += u'\n%s' % message
InvalidEmail.__init__(self, msg, bigmessage)
self.command = command
-class InvalidOption (InvalidCommand):
+
+class InvalidOption(InvalidCommand):
def __init__(self, msg, option, message=None):
- bigmessage = u'Invalid option "%s"' % (option)
- if message != None:
+ bigmessage = u'Invalid option "%s"' % option
+ info = ""
+ command = ""
+ if message is not None:
bigmessage += u'\n%s' % message
InvalidCommand.__init__(self, msg, info, command, bigmessage)
self.option = option
-class NotificationFailed (Exception):
+
+class NotificationFailed(Exception):
def __init__(self, msg):
bigmessage = 'Notification failed: %s' % msg
Exception.__init__(self, bigmessage)
self.short_msg = msg
-class ID (object):
+
+class ID(object):
"""
Sometimes you want to reference the output of a command that
hasn't been executed yet. ID is there for situations like
> a = Command(msg, "new", ["create a bug"])
> b = Command(msg, "comment", [ID(a), "and comment on it"])
"""
+
def __init__(self, command):
self.command = command
+
def extract_id(self):
if hasattr(self, 'cached_id'):
return self._cached_id
@@ -181,17 +194,19 @@ class ID (object):
if self.command.command.name == u'new':
regexp = re.compile(u'Created bug with ID (.*)')
else:
- raise NotImplementedError, self.command.command
+ raise NotImplementedError(self.command.command)
match = regexp.match(self.command.stdout)
assert len(match.groups()) == 1, str(match.groups())
self._cached_id = match.group(1)
return self._cached_id
+
def __str__(self):
if self.command.ret != 0:
return '<id for %s>' % repr(self.command)
return '<id %s>' % self.extract_id()
-class Command (object):
+
+class Command(object):
"""
A libbe.command.Command handler.
@@ -203,54 +218,60 @@ class Command (object):
args: list of arguments to pass to the command
stdin: if non-null, a string to pipe into the command's stdin
"""
+
def __init__(self, msg, command, args=None, stdin=None):
self.msg = msg
- if args == None:
+ if args is None:
self.args = []
else:
self.args = args
self.command = libbe.command.get_command_class(command_name=command)()
- self.command._setup_io = lambda i_enc,o_enc : None
+ self.command._setup_io = lambda i_enc, o_enc: None
self.ret = None
self.stdin = stdin
self.stdout = None
+
def __str__(self):
return '<command: %s %s>' % (self.command, ' '.join([str(s) for s in self.args]))
+
def normalize_args(self):
"""
Expand any ID placeholders in self.args.
"""
- for i,arg in enumerate(self.args):
+ for i, arg in enumerate(self.args):
if isinstance(arg, ID):
self.args[i] = arg.extract_id()
+
def run(self):
"""
Attempt to execute the command whose info is given in the dictionary
info. Returns the exit code, stdout, and stderr produced by the
command.
"""
- if self.command.name in [None, u'']: # don't accept blank commands
+ if self.command.name in [None, u'']: # don't accept blank commands
raise InvalidCommand(self.msg, self, 'Blank')
elif self.command.name not in ALLOWED_COMMANDS:
raise InvalidCommand(self.msg, self, 'Not allowed')
- assert self.ret == None, u'running %s twice!' % unicode(self)
+ assert self.ret is None, u'running %s twice!' % str(self)
self.normalize_args()
UI.io.set_stdin(self.stdin)
self.ret = libbe.ui.command_line.dispatch(UI, self.command, self.args)
self.stdout = UI.io.get_stdout()
return (self.ret, self.stdout)
+
def response_msg(self):
- if self.ret == None: self.ret = -1
+ if self.ret is None: self.ret = -1
response_body = [u'Results of running: (exit code %d)' % self.ret,
- u' %s %s' % (self.command.name,u' '.join(self.args))]
- if self.stdout != None and len(self.stdout) > 0:
+ u' %s %s' % (self.command.name, u' '.join(self.args))]
+ if self.stdout is not None and len(self.stdout) > 0:
response_body.extend([u'', u'output:', u'', self.stdout])
- response_body.append(u'') # trailing endline
+ response_body.append(u'') # trailing endline
response_generator = \
send_pgp_mime.PGPMimeMessageFactory(u'\n'.join(response_body))
return response_generator.plain()
-class DiffTree (libbe.diff.DiffTree):
+
+class DiffTree(libbe.diff.DiffTree):
"""
In order to avoid tons of tiny MIMEText attachments, bug-level
nodes set .add_child_text=True (in .join()), which is propogated
@@ -275,66 +296,80 @@ class DiffTree (libbe.diff.DiffTree):
bugdir/bugs/mod/a/comments/rem
bugdir/bugs/mod/a/comments/mod
"""
+
def report_or_none(self):
report = self.report()
- if report == None:
+ if report is None:
return None
payload = report.get_payload()
- if payload == None or len(payload) == 0:
+ if payload is None or len(payload) == 0:
return None
return report
+
def report_string(self):
report = self.report_or_none()
- if report == None:
+ if report is None:
return 'No changes'
else:
return send_pgp_mime.flatten(report, to_unicode=True)
+
def make_root(self):
return MIMEMultipart()
+
def join(self, root, parent, data_part):
if hasattr(parent, 'attach_child_text'):
self.attach_child_text = True
- if data_part != None:
+ if data_part is not None:
send_pgp_mime.append_text(parent.data_mime_part, u'\n\n%s' % (data_part))
self.data_mime_part = parent.data_mime_part
else:
self.data_mime_part = None
- if data_part != None:
+ if data_part is not None:
self.data_mime_part = send_pgp_mime.encodedMIMEText(data_part)
- if parent != None and parent.name in [u'new', u'rem', u'mod']:
+ if parent is not None and parent.name in [u'new', u'rem', u'mod']:
self.attach_child_text = True
- if data_part == None: # make blank data_mime_part for children's appends
+ if data_part is None: # make blank data_mime_part for children's appends
self.data_mime_part = send_pgp_mime.encodedMIMEText(u'')
- if self.data_mime_part != None:
+ if self.data_mime_part is not None:
self.data_mime_part[u'Content-Description'] = self.name
root.attach(self.data_mime_part)
+
def data_part(self, depth, indent=False):
return libbe.diff.DiffTree.data_part(self, depth, indent=indent)
-class Diff (libbe.diff.Diff):
- def bug_add_string(self, bug):
+
+class Diff(libbe.diff.Diff):
+ @staticmethod
+ def bug_add_string(bug):
return bug.string(show_comments=True)
- def _comment_summary_string(self, comment):
+
+ @staticmethod
+ def _comment_summary_string(comment):
return comment.string()
+
def comment_add_string(self, comment):
return self._comment_summary_string(comment)
+
def comment_rem_string(self, comment):
return self._comment_summary_string(comment)
-class Message (object):
+
+class Message(object):
def __init__(self, email_text=None, disable_parsing=False):
if disable_parsing == False:
self.text = email_text
- p=email.Parser.Parser()
- self.msg=p.parsestr(self.text)
- if LOGFILE != None:
+ p = email.Parser.Parser()
+ self.msg = p.parsestr(self.text)
+ if LOGFILE is not None:
LOGFILE.write(u'handling %s\n' % self.author_addr())
LOGFILE.write(u'\n%s\n\n' % self.text)
- self.confirm = True # enable/disable confirmation email
+ self.confirm = True # enable/disable confirmation email
+
def _yes_no(self, boolean):
if boolean == True:
return 'yes'
return 'no'
+
def author_tuple(self):
"""
Extract and normalize the sender's email address. Returns a
@@ -344,36 +379,44 @@ class Message (object):
self._author_tuple_cache = \
send_pgp_mime.source_email(self.msg, return_realname=True)
return self._author_tuple_cache
+
def author_addr(self):
return email.utils.formataddr(self.author_tuple())
+
def author_name(self):
return self.author_tuple()[0]
+
def author_email(self):
return self.author_tuple()[1]
+
def default_msg_attribute_access(self, attr_name, default=None):
if attr_name in self.msg:
return self.msg[attr_name]
return default
+
def message_id(self, default=None):
return self.default_msg_attribute_access('message-id', default=default)
+
def subject(self):
if 'subject' not in self.msg:
raise InvalidSubject(self, u'Email must contain a subject')
return self.msg['subject']
+
def _split_subject(self):
"""
Returns (tag, subject), with missing values replaced by None.
"""
if hasattr(self, '_split_subject_cache'):
return self._split_subject_cache
- args = self.subject().split(u']',1)
+ args = self.subject().split(u']', 1)
if len(args) < 1:
self._split_subject_cache = (None, None)
elif len(args) < 2:
- self._split_subject_cache = (args[0]+u']', None)
+ self._split_subject_cache = (args[0] + u']', None)
else:
- self._split_subject_cache = (args[0]+u']', args[1].strip())
+ self._split_subject_cache = (args[0] + u']', args[1].strip())
return self._split_subject_cache
+
def _subject_tag_type(self):
"""
Parse subject tag, return (type, value), where type is one of
@@ -381,7 +424,7 @@ class Message (object):
in the case of "comment", in which case it's the bug
ID/shortname.
"""
- tag,subject = self._split_subject()
+ tag, subject = self._split_subject()
type = None
value = None
if tag == SUBJECT_TAG_NEW:
@@ -394,21 +437,23 @@ class Message (object):
type = u'comment'
value = match.group(1)
return (type, value)
+
def validate_subject(self):
"""
Validate the subject line.
"""
- tag,subject = self._split_subject()
+ tag, subject = self._split_subject()
if not tag.startswith(SUBJECT_TAG_START):
raise InvalidSubject(
self, u'Subject must start with "%s"' % SUBJECT_TAG_START)
- tag_type,value = self._subject_tag_type()
- if tag_type == None:
+ tag_type, value = self._subject_tag_type()
+ if tag_type is None:
raise InvalidSubject(self, u'Invalid tag "%s"' % tag)
elif tag_type == u'new' and len(subject) == 0:
raise InvalidSubject(self, u'Cannot create a bug with blank title')
elif tag_type == u'comment' and len(value) == 0:
raise InvalidSubject(self, u'Must specify a bug ID to comment')
+
def _get_bodies_and_mime_types(self):
"""
Traverse the email message returning (body, mime_type) for
@@ -418,11 +463,12 @@ class Message (object):
for part in self.msg.walk():
if part.is_multipart():
continue
- body,mime_type=(part.get_payload(decode=True),part.get_content_type())
+ body, mime_type = (part.get_payload(decode=True), part.get_content_type())
charset = part.get_content_charset(msg_charset).lower()
if mime_type.startswith('text/'):
- body = unicode(body, charset) # convert text types to unicode
+ body = str(body, charset) # convert text types to unicode
yield (body, mime_type)
+
def _parse_body_pseudoheaders(self, body, required, optional,
dictionary=None):
"""
@@ -432,17 +478,17 @@ class Message (object):
like, you can initialize the dictionary with some defaults
and pass your initialized dict in as dictionary.
"""
- if dictionary == None:
+ if dictionary is None:
dictionary = {}
body_lines = body.splitlines()
- all = required+optional
- for i,line in enumerate(body_lines):
+ all = required + optional
+ for i, line in enumerate(body_lines):
line = line.strip()
if len(line) == 0:
break
if ':' not in line:
- raise InvalidPseudoheader(self, line)
- key,value = line.split(':', 1)
+ raise InvalidPseudoHeader(self, line)
+ key, value = line.split(':', 1)
value = value.strip()
if key not in all:
raise InvalidPseudoHeader(self, key)
@@ -460,13 +506,15 @@ class Message (object):
% u', '.join(missing))
remaining_body = u'\n'.join(body_lines[i:]).strip()
return (remaining_body, dictionary)
+
def _strip_footer(self, body):
body_lines = body.splitlines()
- for i,line in enumerate(body_lines):
+ for i, line in enumerate(body_lines):
if line.startswith(BREAK):
break
- i += 1 # increment past the current valid line.
+ i += 1 # increment past the current valid line.
return u'\n'.join(body_lines[:i]).strip()
+
def parse(self):
"""
Parse the commands given in the email. Raises assorted
@@ -474,7 +522,7 @@ class Message (object):
otherwise returns a list of suggested commands to run.
"""
self.validate_subject()
- tag_type,value = self._subject_tag_type()
+ tag_type, value = self._subject_tag_type()
if tag_type == u'new':
commands = self.parse_new()
elif tag_type == u'comment':
@@ -482,18 +530,19 @@ class Message (object):
elif tag_type == u'control':
commands = self.parse_control()
else:
- raise Exception, u'Unrecognized tag type "%s"' % tag_type
+ raise Exception(u'Unrecognized tag type "%s"' % tag_type)
return commands
+
def parse_new(self):
command = u'new'
- tag,subject = self._split_subject()
+ tag, subject = self._split_subject()
summary = subject
options = {u'Reporter': self.author_addr(),
u'Confirm': self._yes_no(self.confirm),
u'Subscribe': 'no',
}
- body,mime_type = list(self._get_bodies_and_mime_types())[0]
- comment_body,options = \
+ body, mime_type = list(self._get_bodies_and_mime_types())[0]
+ comment_body, options = \
self._parse_body_pseudoheaders(body,
NEW_REQUIRED_PSEUDOHEADERS,
NEW_OPTIONAL_PSEUDOHEADERS,
@@ -512,16 +561,16 @@ class Message (object):
comment_body = self._strip_footer(comment_body)
if len(comment_body) > 0:
command = u'comment'
- comment = u'Version: %s\n\n'%options[u'Version'] + comment_body
+ comment = u'Version: %s\n\n' % options[u'Version'] + comment_body
args = [u'--author', self.author_addr(),
u'--alt-id', self.message_id(),
u'--content-type', mime_type]
args.append(id)
args.append(u'-')
commands.append(Command(self, u'comment', args, stdin=comment))
- for key,value in options.items():
+ for key, value in options.items():
if key in [u'Version', u'Reporter', u'Confirm']:
- continue # we've already handled these options
+ continue # we've already handled these options
command = key.lower()
if key in [u'Depend', u'Tag', u'Target', u'Subscribe']:
args = [id, value]
@@ -533,23 +582,25 @@ class Message (object):
args = ['--subscriber', self.author_addr(), id]
commands.append(Command(self, command, args))
return commands
+
def parse_comment(self, bug_uuid):
command = u'comment'
bug_id = bug_uuid
author = self.author_addr()
alt_id = self.message_id()
- body,mime_type = list(self._get_bodies_and_mime_types())[0]
+ body, mime_type = list(self._get_bodies_and_mime_types())[0]
if mime_type == 'text/plain':
body = self._strip_footer(body)
content_type = mime_type
args = [u'--author', author]
- if alt_id != None:
+ if alt_id is not None:
args.extend([u'--alt-id', alt_id])
args.extend([u'--content-type', content_type, bug_id, u'-'])
commands = [Command(self, command, args, stdin=body)]
return commands
+
def parse_control(self):
- body,mime_type = list(self._get_bodies_and_mime_types())[0]
+ body, mime_type = list(self._get_bodies_and_mime_types())[0]
commands = []
for line in body.splitlines():
line = line.strip()
@@ -564,42 +615,46 @@ class Message (object):
if type(line) == types.UnicodeType:
# work around http://bugs.python.org/issue1170
for field in fields:
- field = unicode(field, 'unicode escape')
- command,args = (fields[0], fields[1:])
+ field = str(field, 'unicode escape')
+ command, args = (fields[0], fields[1:])
commands.append(Command(self, command, args))
if len(commands) == 0:
raise InvalidEmail(self, u'No commands in control email.')
return commands
+
def run(self, repo='.'):
self._begin_response()
commands = self.parse()
try:
- for i,command in enumerate(commands):
+ for i, command in enumerate(commands):
command.run()
self._add_response(command.response_msg())
finally:
if AUTOCOMMIT == True:
- tag,subject = self._split_subject()
+ tag, subject = self._split_subject()
self.commit_command = Command(self, 'commit', [subject])
self.commit_command.run()
- if LOGFILE != None:
+ if LOGFILE is not None:
LOGFILE.write(u'Autocommit:\n%s\n\n' %
- send_pgp_mime.flatten(self.commit_command.response_msg(),
- to_unicode=True))
+ send_pgp_mime.flatten(self.commit_command.response_msg(),
+ to_unicode=True))
+
def _begin_response(self):
- tag,subject = self._split_subject()
+ tag, subject = self._split_subject()
response_header = [u'From: %s' % THIS_ADDRESS,
u'To: %s' % self.author_addr(),
u'Date: %s' % libbe.util.utility.time_to_str(time.time()),
- u'Subject: %s Re: %s'%(SUBJECT_TAG_RESPONSE,subject)
+ u'Subject: %s Re: %s' % (SUBJECT_TAG_RESPONSE, subject)
]
- if self.message_id() != None:
+ if self.message_id() is not None:
response_header.append(u'In-reply-to: %s' % self.message_id())
self.response_header = \
send_pgp_mime.header_from_text(text=u'\n'.join(response_header))
self._response_messages = []
+
def _add_response(self, response_message):
self._response_messages.append(response_message)
+
def response_email(self):
assert len(self._response_messages) > 0
if len(self._response_messages) == 1:
@@ -609,9 +664,10 @@ class Message (object):
for message in self._response_messages:
response_body.attach(message)
return send_pgp_mime.attach_root(self.response_header, response_body)
+
def subscriber_emails(self, previous_revision=None):
- if previous_revision == None:
- if AUTOCOMMIT != True: # no way to tell what's changed
+ if previous_revision is None:
+ if AUTOCOMMIT != True: # no way to tell what's changed
raise NotificationFailed('Autocommit dissabled')
if len(self._response_messages) == 0:
raise NotificationFailed('Initial email failed.')
@@ -622,7 +678,7 @@ class Message (object):
bd = UI.storage_callbacks.get_bugdir()
writeable = bd.storage.writeable
bd.storage.writeable = False
- if bd.storage.versioned == False: # no way to tell what's changed
+ if bd.storage.versioned == False: # no way to tell what's changed
bd.storage.writeable = writeable
raise NotificationFailed('Not versioned')
@@ -631,12 +687,12 @@ class Message (object):
if len(subscribers) == 0:
bd.storage.writeable = writeable
return []
- for subscriber,subscriptions in subscribers.items():
+ for subscriber, subscriptions in subscribers.items():
subscribers[subscriber] = []
- for id,types in subscriptions.items():
+ for id, types in subscriptions.items():
for type in types:
subscribers[subscriber].append(
- libbe.diff.Subscription(id,type))
+ libbe.diff.Subscription(id, type))
before_bd, after_bd = self._get_before_and_after_bugdirs(bd, previous_revision)
diff = Diff(before_bd, after_bd)
@@ -644,41 +700,43 @@ class Message (object):
header = self._subscriber_header(bd, previous_revision)
emails = []
- for subscriber,subscriptions in subscribers.items():
+ for subscriber, subscriptions in subscribers.items():
header.replace_header('to', subscriber)
report = diff.report_tree(subscriptions, diff_tree=DiffTree)
root = report.report_or_none()
- if root != None:
+ if root is not None:
emails.append(send_pgp_mime.attach_root(header, root))
- if LOGFILE != None:
+ if LOGFILE is not None:
LOGFILE.write(u'Preparing to notify %s of changes\n' % subscriber)
bd.storage.writeable = writeable
return emails
+
def _get_before_and_after_bugdirs(self, bd, previous_revision=None):
- if previous_revision == None:
+ if previous_revision is None:
commit_msg = self.commit_command.stdout
assert commit_msg.startswith('Committed '), commit_msg
after_revision = commit_msg[len('Committed '):]
before_revision = bd.storage.revision_id(-2)
else:
before_revision = previous_revision
- if before_revision == None:
+ if before_revision is None:
# this commit was the initial commit
before_bd = libbe.bugdir.BugDir(from_disk=False,
manipulate_encodings=False)
else:
before_bd = libbe.bugdir.RevisionedBugDir(bd, before_revision)
- #after_bd = bd.duplicate_bugdir(after_revision)
- after_bd = bd # assume no changes since commit a few cycles ago
+ # after_bd = bd.duplicate_bugdir(after_revision)
+ after_bd = bd # assume no changes since commit a few cycles ago
return (before_bd, after_bd)
+
def _subscriber_header(self, bd, previous_revision=None):
root_dir = os.path.basename(bd.storage.repo)
- if previous_revision == None:
+ if previous_revision is None:
subject = 'Changes to %s on %s by %s' \
- % (root_dir, THIS_SERVER, self.author_addr())
+ % (root_dir, THIS_SERVER, self.author_addr())
else:
subject = 'Changes to %s on %s since revision %s' \
- % (root_dir, THIS_SERVER, previous_revision)
+ % (root_dir, THIS_SERVER, previous_revision)
header = [u'From: %s' % THIS_ADDRESS,
u'To: %s' % u'DUMMY-AUTHOR',
u'Date: %s' % libbe.util.utility.time_to_str(time.time()),
@@ -686,6 +744,7 @@ class Message (object):
]
return send_pgp_mime.header_from_text(text=u'\n'.join(header))
+
def generate_global_tags(tag_base=u'be-bug'):
"""
Generate a series of tags from a base tag string.
@@ -699,6 +758,7 @@ def generate_global_tags(tag_base=u'be-bug'):
SUBJECT_TAG_COMMENT = re.compile(u'\[%s:([\-0-9a-z/]*)]' % tag_base)
SUBJECT_TAG_CONTROL = SUBJECT_TAG_RESPONSE
+
def open_logfile(logpath=None):
"""
If logpath=None, default to global LOGPATH.
@@ -708,7 +768,7 @@ def open_logfile(logpath=None):
Relative logpaths are expanded relative to _THIS_DIR
"""
global LOGPATH, LOGFILE
- if logpath != None:
+ if logpath is not None:
if logpath == u'-':
LOGPATH = u'stderr'
LOGFILE = sys.stderr
@@ -719,14 +779,16 @@ def open_logfile(logpath=None):
LOGPATH = logpath
else:
LOGPATH = os.path.join(_THIS_DIR, logpath)
- if LOGFILE == None and LOGPATH != u'none':
+ if LOGFILE is None and LOGPATH != u'none':
LOGFILE = codecs.open(LOGPATH, u'a+',
- libbe.util.encoding.get_text_file_encoding())
+ libbe.util.encoding.get_text_file_encoding())
+
def close_logfile():
- if LOGFILE != None and LOGPATH not in [u'stderr', u'none']:
+ if LOGFILE is not None and LOGPATH not in [u'stderr', u'none']:
LOGFILE.close()
+
def test():
result = unittest.TextTestRunner(verbosity=2).run(suite)
num_errors = len(result.errors)
@@ -734,11 +796,12 @@ def test():
num_bad = num_errors + num_failures
return num_bad
+
def main(args):
from optparse import OptionParser
global AUTOCOMMIT, UI
- usage='be-handle-mail [options]\n\n%s' % (__doc__)
+ usage = 'be-handle-mail [options]\n\n%s' % (__doc__)
parser = OptionParser(usage=usage)
parser.add_option('-r', '--repo', dest='repo', default=_THIS_DIR,
metavar='REPO',
@@ -762,17 +825,17 @@ def main(args):
help='Run internal unit-tests and exit.')
pargs = args
- options,args = parser.parse_args(args[1:])
+ options, args = parser.parse_args(args[1:])
if options.test == True:
num_bad = test()
if num_bad > 126:
num_bad = 1
sys.exit(num_bad)
-
+
AUTOCOMMIT = options.autocommit
- if options.notify_since == None:
+ if options.notify_since is None:
msg_text = sys.stdin.read()
open_logfile(options.logfile)
@@ -781,29 +844,29 @@ def main(args):
io = libbe.command.StringInputOutput()
UI = libbe.command.UserInterface(io, location=options.repo)
- if options.notify_since != None:
+ if options.notify_since is not None:
if options.subscribers == True:
- if LOGFILE != None:
+ if LOGFILE is not None:
LOGFILE.write(u'Checking for subscribers to notify since revision %s\n'
% options.notify_since)
try:
m = Message(disable_parsing=True)
emails = m.subscriber_emails(options.notify_since)
- except NotificationFailed, e:
- if LOGFILE != None:
- LOGFILE.write(unicode(e) + u'\n')
+ except NotificationFailed as e:
+ if LOGFILE is not None:
+ LOGFILE.write(str(e) + u'\n')
else:
for msg in emails:
if options.output == True:
- print send_pgp_mime.flatten(msg, to_unicode=True)
+ print(send_pgp_mime.flatten(msg, to_unicode=True))
else:
send_pgp_mime.mail(msg, send_pgp_mime.sendmail)
close_logfile()
UI.cleanup()
sys.exit(0)
- if len(msg_text.strip()) == 0: # blank email!?
- if LOGFILE != None:
+ if len(msg_text.strip()) == 0: # blank email!?
+ if LOGFILE is not None:
LOGFILE.write(u'Blank email!\n')
close_logfile()
UI.cleanup()
@@ -811,10 +874,10 @@ def main(args):
try:
m = Message(msg_text)
m.run()
- except InvalidEmail, e:
+ except InvalidEmail as e:
response = e.response()
- except Exception, e:
- if LOGFILE != None:
+ except Exception as e:
+ if LOGFILE is not None:
LOGFILE.write(u'Uncaught exception:\n%s\n' % (e,))
traceback.print_tb(sys.exc_traceback, file=LOGFILE)
close_logfile()
@@ -824,28 +887,28 @@ def main(args):
else:
response = m.response_email()
if options.output == True:
- print send_pgp_mime.flatten(response, to_unicode=True)
+ print(send_pgp_mime.flatten(response, to_unicode=True))
elif m.confirm == True:
- if LOGFILE != None:
+ if LOGFILE is not None:
LOGFILE.write(u'Sending response to %s\n' % m.author_addr())
LOGFILE.write(u'\n%s\n\n' % send_pgp_mime.flatten(response,
to_unicode=True))
send_pgp_mime.mail(response, send_pgp_mime.sendmail)
else:
- if LOGFILE != None:
+ if LOGFILE is not None:
LOGFILE.write(u'Response declined by %s\n' % m.author_addr())
if options.subscribers == True:
- if LOGFILE != None:
+ if LOGFILE is not None:
LOGFILE.write(u'Checking for subscribers\n')
try:
emails = m.subscriber_emails()
- except NotificationFailed, e:
- if LOGFILE != None:
- LOGFILE.write(unicode(e) + u'\n')
+ except NotificationFailed as e:
+ if LOGFILE is not None:
+ LOGFILE.write(str(e) + u'\n')
else:
for msg in emails:
if options.output == True:
- print send_pgp_mime.flatten(msg, to_unicode=True)
+ print(send_pgp_mime.flatten(msg, to_unicode=True))
else:
send_pgp_mime.mail(msg, send_pgp_mime.sendmail)
@@ -853,23 +916,28 @@ def main(args):
m.commit_command.cleanup()
UI.cleanup()
-class GenerateGlobalTagsTestCase (unittest.TestCase):
+
+class GenerateGlobalTagsTestCase(unittest.TestCase):
def setUp(self):
super(GenerateGlobalTagsTestCase, self).setUp()
self.save_global_tags()
+
def tearDown(self):
self.restore_global_tags()
super(GenerateGlobalTagsTestCase, self).tearDown()
+
def save_global_tags(self):
self.saved_globals = [SUBJECT_TAG_BASE, SUBJECT_TAG_START,
SUBJECT_TAG_RESPONSE, SUBJECT_TAG_NEW,
SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL]
+
def restore_global_tags(self):
global SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, \
SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL
SUBJECT_TAG_BASE, SUBJECT_TAG_START, SUBJECT_TAG_RESPONSE, \
SUBJECT_TAG_NEW, SUBJECT_TAG_COMMENT, SUBJECT_TAG_CONTROL = \
self.saved_globals
+
def test_restore_global_tags(self):
"Test global tag restoration by teardown function."
global SUBJECT_TAG_BASE
@@ -878,26 +946,32 @@ class GenerateGlobalTagsTestCase (unittest.TestCase):
self.failUnlessEqual(SUBJECT_TAG_BASE, u'projectX-bug')
self.restore_global_tags()
self.failUnlessEqual(SUBJECT_TAG_BASE, u'be-bug')
+
def test_subject_tag_base(self):
"Should set SUBJECT_TAG_BASE global correctly"
generate_global_tags(u'projectX-bug')
self.failUnlessEqual(SUBJECT_TAG_BASE, u'projectX-bug')
+
def test_subject_tag_start(self):
"Should set SUBJECT_TAG_START global correctly"
generate_global_tags(u'projectX-bug')
self.failUnlessEqual(SUBJECT_TAG_START, u'[projectX-bug')
+
def test_subject_tag_response(self):
"Should set SUBJECT_TAG_RESPONSE global correctly"
generate_global_tags(u'projectX-bug')
self.failUnlessEqual(SUBJECT_TAG_RESPONSE, u'[projectX-bug]')
+
def test_subject_tag_new(self):
"Should set SUBJECT_TAG_NEW global correctly"
generate_global_tags(u'projectX-bug')
self.failUnlessEqual(SUBJECT_TAG_NEW, u'[projectX-bug:submit]')
+
def test_subject_tag_control(self):
"Should set SUBJECT_TAG_CONTROL global correctly"
generate_global_tags(u'projectX-bug')
self.failUnlessEqual(SUBJECT_TAG_CONTROL, u'[projectX-bug]')
+
def test_subject_tag_comment(self):
"Should set SUBJECT_TAG_COMMENT global correctly"
generate_global_tags(u'projectX-bug')
@@ -905,6 +979,7 @@ class GenerateGlobalTagsTestCase (unittest.TestCase):
self.failUnlessEqual(len(m.groups()), 1)
self.failUnlessEqual(m.group(1), u'abc/xyz-123')
+
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])