aboutsummaryrefslogtreecommitdiffstats
path: root/xml
diff options
context:
space:
mode:
Diffstat (limited to 'xml')
-rwxr-xr-xxml/be-mbox-to-xml104
-rwxr-xr-xxml/be-xml-to-mbox157
2 files changed, 185 insertions, 76 deletions
diff --git a/xml/be-mbox-to-xml b/xml/be-mbox-to-xml
new file mode 100755
index 0000000..9054cfd
--- /dev/null
+++ b/xml/be-mbox-to-xml
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+Convert an mbox into xml suitable for imput into be.
+ $ cat mbox | be-mbox-to-xml | be comment --xml <ID> -
+mbox is a flat-file format, consisting of a series of messages.
+Messages begin with a a From_ line, followed by RFC 822 email,
+followed by a blank line.
+"""
+
+import base64
+import email.utils
+from libbe.encoding import get_encoding, set_IO_stream_encodings
+from mailbox import mbox, Message # the mailbox people really want an on-disk copy
+from time import asctime, gmtime
+import types
+from xml.sax import make_parser
+from xml.sax.handler import ContentHandler
+from xml.sax.saxutils import escape
+
+DEFAULT_ENCODING = get_encoding()
+set_IO_stream_encodings(DEFAULT_ENCODING)
+
+def comment_message_to_xml(message, fields=None):
+ if fields == None:
+ fields = {}
+ new_fields = {}
+ new_fields[u'alt-id'] = message[u'message-id']
+ new_fields[u'in-reply-to'] = message[u'in-reply-to']
+ new_fields[u'from'] = message[u'from']
+ new_fields[u'date'] = message[u'date']
+ new_fields[u'content-type'] = message.get_content_type()
+ for k,v in new_fields.items():
+ if v != None and type(v) != types.UnicodeType:
+ fields[k] = unicode(v, encoding=DEFAULT_ENCODING)
+ elif v == None and k in fields:
+ new_fields[k] = fields[k]
+ for k,v in fields.items():
+ if k not in new_fields:
+ new_fields.k = fields[k]
+ fields = new_fields
+
+ if message.is_multipart():
+ ret = []
+ alt_id = fields[u'alt-id']
+ from_str = fields[u'from']
+ date = fields[u'date']
+ for m in message.walk():
+ if m == message:
+ continue
+ fields[u'from'] = from_str
+ fields[u'date'] = date
+ if len(ret) >= 0:
+ fields.pop(u'alt-id')
+ fields[u'in-reply-to'] = alt_id
+ ret.append(comment_message_to_xml(m, fields))
+ return u'\n'.join(ret)
+
+ charset = message.get_content_charset(DEFAULT_ENCODING).lower()
+ #assert charset == DEFAULT_ENCODING.lower(), \
+ # u"Unknown charset: %s" % charset
+
+ encoding = message[u'content-transfer-encoding'].lower()
+ body = message.get_payload(decode=True) # attempt to decode
+ assert body != None, "Unable to decode?"
+ if fields[u'content-type'].startswith(u"text/"):
+ body = unicode(body, encoding=charset).rstrip(u'\n')
+ else:
+ body = base64.encode(body)
+ fields[u'body'] = body
+ lines = [u"<comment>"]
+ for tag,body in fields.items():
+ if body != None:
+ ebody = escape(body)
+ lines.append(u" <%s>%s</%s>" % (tag, ebody, tag))
+ lines.append(u"</comment>")
+ return u'\n'.join(lines)
+
+def main(mbox_filename):
+ mb = mbox(mbox_filename)
+ print u'<?xml version="1.0" encoding="%s" ?>' % DEFAULT_ENCODING
+ print u"<comment-list>"
+ for message in mb:
+ print comment_message_to_xml(message)
+ print u"</comment-list>"
+
+
+if __name__ == "__main__":
+ import sys
+ main(sys.argv[1])
diff --git a/xml/be-xml-to-mbox b/xml/be-xml-to-mbox
index 80db634..b74a33d 100755
--- a/xml/be-xml-to-mbox
+++ b/xml/be-xml-to-mbox
@@ -26,14 +26,16 @@ followed by a blank line.
"""
#from mailbox import mbox, Message # the mailbox people really want an on-disk copy
+import codecs
import email.utils
-import types
-
from libbe.encoding import get_encoding, set_IO_stream_encodings
from libbe.utility import str_to_time as rfc2822_to_gmtime_integer
from time import asctime, gmtime
-from xml.sax import make_parser
-from xml.sax.handler import ContentHandler
+import types
+try: # import core module, Python >= 2.5
+ from xml.etree import ElementTree
+except ImportError: # look for non-core module
+ from elementtree import ElementTree
from xml.sax.saxutils import unescape
@@ -83,7 +85,7 @@ class Bug (LimitedAttrDict):
u"created",
u"summary",
u"comments",
- u"extra_strings"]
+ u"extra-strings"]
def print_to_mbox(self):
name,addr = email.utils.parseaddr(self["creator"])
print "From %s %s" % (addr, rfc2822_to_asctime(self["created"]))
@@ -96,28 +98,62 @@ class Bug (LimitedAttrDict):
print ""
print self["summary"]
print ""
- if len(self["extra_strings"]) > 0:
+ if "extra-strings" in self:
print "extra strings:\n ",
print '\n '.join(self["extra_strings"])
print ""
- for comment in self["comments"]:
- comment.print_to_mbox(self)
+ if "comments" in self:
+ for comment in self["comments"]:
+ comment.print_to_mbox(self)
+ def init_from_etree(self, element):
+ assert element.tag == "bug", element.tag
+ for field in element.getchildren():
+ text = unescape(unicode(field.text).decode("unicode_escape").strip())
+ if field.tag == "comment":
+ comm = Comment()
+ comm.init_from_etree(field)
+ if "comments" in self:
+ self["comments"].append(comm)
+ else:
+ self["comments"] = [comm]
+ elif field.tag == "extra-string":
+ if "extra-strings" in self:
+ self["extra-strings"].append(text)
+ else:
+ self["extra-strings"] = [text]
+ else:
+ self[field.tag] = text
class Comment (LimitedAttrDict):
_attrs = [u"uuid",
+ u"alt-id",
u"short-name",
u"in-reply-to",
u"from",
u"date",
u"content-type",
u"body"]
- def print_to_mbox(self, bug):
+ def print_to_mbox(self, bug=None):
+ if bug == None:
+ bug = Bug()
+ bug[u"uuid"] = u"no-uuid"
name,addr = email.utils.parseaddr(self["from"])
print "From %s %s" % (addr, rfc2822_to_asctime(self["date"]))
- print "Message-ID: <%s@%s>" % (self["uuid"], DEFAULT_DOMAIN)
+ if "uuid" in self: id = self["uuid"]
+ elif "alt-id" in self: id = self["alt-id"]
+ else: id = None
+ if id != None:
+ print "Message-ID: <%s@%s>" % (id, DEFAULT_DOMAIN)
print "Date: %s" % self["date"]
print "From: %s" % self["from"]
- print "Subject: %s: %s" % (self["short-name"], bug["summary"])
+ subject = ""
+ if "short-name" in self:
+ subject += self["short-name"]+u": "
+ if "summary" in bug:
+ subject += bug["summary"]
+ else:
+ subject += u"no-subject"
+ print "Subject: %s" % subject
if "in-reply-to" not in self.keys():
self["in-reply-to"] = bug["uuid"]
print "In-Reply-To: <%s@%s>" % (self["in-reply-to"], DEFAULT_DOMAIN)
@@ -129,72 +165,41 @@ class Comment (LimitedAttrDict):
else: # content type and transfer encoding already in XML MIME output
print self["body"]
print ""
+ def init_from_etree(self, element):
+ assert element.tag == "comment", element.tag
+ for field in element.getchildren():
+ text = unescape(unicode(field.text).decode("unicode_escape").strip())
+ if field.tag == "body":
+ text+="\n"
+ self[field.tag] = text
-class BE_list_handler (ContentHandler):
- def __init__(self):
- self.reset()
-
- def reset(self):
- self.bug = None
- self.comment = None
- self.extra_strings = None
- self.text_field = None
-
- def startElement(self, name, attributes):
- if name == "bug":
- assert self.bug == None, "Nested bugs?!"
- assert self.comment == None
- assert self.text_field == None
- self.bug = Bug(comments=[], extra_strings=[])
- elif name == "comment":
- assert self.bug != None, "<comment> not in <bug>?"
- assert self.comment == None, "Nested comments?!"
- assert self.text_field == None, "<comment> in text field %s?" % self.text_field
- self.comment = Comment()
- elif self.bug != None and self.comment == None:
- # parse bug text field
- self.text_field = name
- self.text_data = ""
- elif self.bug != None and self.comment != None:
- # parse comment text field
- self.text_field = name
- self.text_data = ""
-
- def endElement(self, name):
- if name == "bug":
- assert self.bug != None, "Invalid XML?"
- assert self.comment == None, "Invalid XML?"
- assert self.text_field == None, "Invalid XML?"
- self.bug.print_to_mbox()
- self.bug = None
- elif name == "comment":
- assert self.bug != None, "<comment> not in <bug>?"
- assert self.comment != None, "Invalid XML?"
- assert self.text_field == None, "<comment> in text field %s?" % self.text_field
- self.bug["comments"].append(self.comment)
- # comments printed by bug.print_to_mbox()
- self.comment = None
- elif self.bug != None and self.comment == None:
- # parse bug text field
- if self.text_field == "extra-string":
- self.bug["extra_strings"].append(unescape(self.text_data.strip()))
- else:
- self.bug[self.text_field] = unescape(self.text_data.strip())
- self.text_field = None
- self.text_data = None
- elif self.bug != None and self.comment != None:
- # parse comment text field
- self.comment[self.text_field] = unescape(self.text_data.strip())
- self.text_field = None
- self.text_data = None
-
- def characters(self, data):
- if self.text_field != None:
- self.text_data += data
+def print_to_mbox(element):
+ if element.tag == "bug":
+ b = Bug()
+ b.init_from_etree(element)
+ b.print_to_mbox()
+ elif element.tag == "comment":
+ c = Comment()
+ c.init_from_etree(element)
+ c.print_to_mbox()
+ elif element.tag in ["bugs", "bug-list"]:
+ for b_elt in element.getchildren():
+ b = Bug()
+ b.init_from_etree(b_elt)
+ b.print_to_mbox()
+ elif element.tag in ["comments", "comment-list"]:
+ for c_elt in element.getchildren():
+ c = Comment()
+ c.init_from_etree(c_elt)
+ c.print_to_mbox()
if __name__ == "__main__":
import sys
-
- parser = make_parser()
- parser.setContentHandler(BE_list_handler())
- parser.parse(sys.stdin)
+
+ if len(sys.argv) == 1: # no filename given, use stdin
+ xml_unicode = sys.stdin.read()
+ else:
+ xml_unicode = codecs.open(sys.argv[1], "r", DEFAULT_ENCODING).read()
+ xml_str = xml_unicode.encode("unicode_escape").replace(r"\n", "\n")
+ elist = ElementTree.XML(xml_str)
+ print_to_mbox(elist)