aboutsummaryrefslogtreecommitdiffstats
path: root/becommands/import_xml.py
diff options
context:
space:
mode:
Diffstat (limited to 'becommands/import_xml.py')
-rw-r--r--becommands/import_xml.py215
1 files changed, 188 insertions, 27 deletions
diff --git a/becommands/import_xml.py b/becommands/import_xml.py
index a74d329..892a09e 100644
--- a/becommands/import_xml.py
+++ b/becommands/import_xml.py
@@ -16,12 +16,15 @@
"""Import comments and bugs from XML"""
from libbe import cmdutil, bugdir, bug, comment, utility
from becommands.comment import complete
+import copy
import os
import sys
try: # import core module, Python >= 2.5
from xml.etree import ElementTree
except ImportError: # look for non-core module
from elementtree import ElementTree
+import doctest
+import unittest
__desc__ = __doc__
def execute(args, manipulate_encodings=True, restrict_file_access=False):
@@ -63,6 +66,22 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
if options.comment_root != None:
croot_bug,croot_comment = \
cmdutil.bug_comment_from_id(bd, options.comment_root)
+ croot_bug.load_comments(load_full=True)
+ croot_bug.set_sync_with_disk(False)
+ if croot_comment.uuid == comment.INVALID_UUID:
+ croot_comment = croot_bug.comment_root
+ else:
+ croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid)
+ new_croot_bug = bug.Bug(bugdir=bd, uuid=croot_bug.uuid)
+ new_croot_bug.explicit_attrs = []
+ new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root)
+ if croot_comment.uuid == comment.INVALID_UUID:
+ new_croot_comment = new_croot_bug.comment_root
+ else:
+ new_croot_comment = \
+ new_croot_bug.comment_from_uuid(croot_comment.uuid)
+ for new in new_croot_bug.comments():
+ new.explicit_attrs = []
else:
croot_bug,croot_comment = (None, None)
@@ -87,7 +106,7 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
for child in be_xml.getchildren():
if child.tag == 'bug':
new = bug.Bug(bugdir=bd)
- new.from_xml(unicode(ElementTree.tostring(child)).decode('unicode_escape'))
+ new.from_xml(unicode(ElementTree.tostring(child)).decode("unicode_escape"))
root_bugs.append(new)
elif child.tag == 'comment':
new = comment.Comment(croot_bug)
@@ -107,35 +126,47 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
% (child.tag, comment_list.tag)
# merge the new root_comments
+ if options.add_only == True:
+ accept_changes = False
+ accept_extra_strings = False
+ else:
+ accept_changes = True
+ accept_extra_strings = True
+ accept_comments = True
if len(root_comments) > 0:
if croot_bug == None:
raise UserError(
'--comment-root option is required for your root comments:\n%s'
% '\n\n'.join([c.string() for c in root_comments]))
- croot_cids = []
- for c in croot_bug.comment_root.traverse():
- croot_cids.append(c.uuid)
- if c.alt_id != None:
- croot_cids.append(c.alt_id)
- for new in root_comments:
- if new.alt_id in croot_cids:
- raise cmdutil.UserError(
- 'clashing comment alt_id: %s' % new.alt_id)
- croot_cids.append(new.uuid)
- if new.alt_id != None:
- croot_cids.append(new.alt_id)
- if new.in_reply_to == None:
- new.in_reply_to = croot_comment.uuid
try:
# link new comments
- comment.list_to_root(root_comments,croot_bug,root=croot_comment,
- ignore_missing_references= \
- options.ignore_missing_references)
+ new_croot_bug.add_comments(root_comments,
+ default_parent=new_croot_comment,
+ ignore_missing_references= \
+ options.ignore_missing_references)
except comment.MissingReference, e:
raise cmdutil.UserError(e)
+ croot_bug.merge(new_croot_bug, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_comments=accept_comments)
+
# merge the new croot_bugs
+ merged_bugs = []
+ old_bugs = []
for new in root_bugs:
- bd.append(new)
+ try:
+ old = bd.bug_from_uuid(new.alt_id)
+ except KeyError:
+ old = None
+ if old == None:
+ bd.append(new)
+ else:
+ old.load_comments(load_full=True)
+ old.merge(new, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_comments=accept_comments)
+ merged_bugs.append(new)
+ old_bugs.append(old)
# protect against programmer error causing data loss:
if croot_bug != None:
@@ -144,14 +175,18 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False):
assert new.uuid in comms, \
"comment %s wasn't added to %s" % (new.uuid, croot_comment.uuid)
for new in root_bugs:
- assert bd.has_bug(new.uuid), \
- "bug %s wasn't added" % (new.uuid)
+ if not new in merged_bugs:
+ assert bd.has_bug(new.uuid), \
+ "bug %s wasn't added" % (new.uuid)
# save new information
- for new in root_comments:
- new.save()
+ if croot_bug != None:
+ croot_bug.save()
for new in root_bugs:
- new.save()
+ if not new in merged_bugs:
+ new.save()
+ for old in old_bugs:
+ old.save()
def get_parser():
parser = cmdutil.CmdOptionParser("be import-xml XMLFILE")
@@ -213,7 +248,7 @@ repeats.
Here's an example of import activity:
Repository
- bug (uuid=B, author=John, status=open)
+ bug (uuid=B, creator=John, status=open)
estr (don't forget your towel)
estr (helps with space travel)
com (uuid=C1, author=Jane, body=Hello)
@@ -225,7 +260,7 @@ Here's an example of import activity:
com (uuid=C1, body=So long)
com (uuid=C3, author=Jed, body=And thanks)
Result
- bug (uuid=B, author=John, status=fixed)
+ bug (uuid=B, creator=John, status=fixed)
estr (don't forget your towel)
estr (helps with space travel)
estr (watch out for flying dolphins)
@@ -233,7 +268,7 @@ Here's an example of import activity:
com (uuid=C2, author=Jess, body=World)
com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
Result, with --add-only
- bug (uuid=B, author=John, status=open)
+ bug (uuid=B, creator=John, status=open)
estr (don't forget your towel)
estr (helps with space travel)
com (uuid=C1, author=Jane, body=Hello)
@@ -265,3 +300,129 @@ Devs recieve email, and save it's contents as demux-bug.xml
def help():
return get_parser().help_str() + longhelp
+
+
+class LonghelpTestCase (unittest.TestCase):
+ """
+ Test import scenarios given in longhelp.
+ """
+ def setUp(self):
+ self.bugdir = bugdir.SimpleBugDir()
+ self.original_working_dir = os.getcwd()
+ os.chdir(self.bugdir.root)
+ bugA = self.bugdir.bug_from_uuid('a')
+ self.bugdir.remove_bug(bugA)
+ self.bugdir.set_sync_with_disk(False)
+ bugB = self.bugdir.bug_from_uuid('b')
+ bugB.creator = 'John'
+ bugB.status = 'open'
+ bugB.extra_strings += ["don't forget your towel"]
+ bugB.extra_strings += ['helps with space travel']
+ comm1 = bugB.comment_root.new_reply(body='Hello\n')
+ comm1.uuid = 'c1'
+ comm1.author = 'Jane'
+ comm2 = bugB.comment_root.new_reply(body='World\n')
+ comm2.uuid = 'c2'
+ comm2.author = 'Jess'
+ bugB.save()
+ self.bugdir.set_sync_with_disk(True)
+ self.xml = """
+ <be-xml>
+ <bug>
+ <uuid>b</uuid>
+ <status>fixed</status>
+ <summary>a test bug</summary>
+ <extra-string>don't forget your towel</extra-string>
+ <extra-string>watch out for flying dolphins</extra-string>
+ <comment>
+ <uuid>c1</uuid>
+ <body>So long</body>
+ </comment>
+ <comment>
+ <uuid>c3</uuid>
+ <author>Jed</author>
+ <body>And thanks</body>
+ </comment>
+ </bug>
+ </be-xml>
+ """
+ def tearDown(self):
+ os.chdir(self.original_working_dir)
+ self.bugdir.cleanup()
+ def _execute(self, *args):
+ import StringIO
+ orig_stdin = sys.stdin
+ sys.stdin = StringIO.StringIO(self.xml)
+ execute(list(args)+["-"], manipulate_encodings=False,
+ restrict_file_access=True)
+ sys.stdin = orig_stdin
+ self.bugdir._clear_bugs()
+ def testCleanBugdir(self):
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ def testNotAddOnly(self):
+ self._execute()
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ bugB = self.bugdir.bug_from_uuid('b')
+ self.failUnless(bugB.uuid == 'b', bugB.uuid)
+ self.failUnless(bugB.creator == 'John', bugB.creator)
+ self.failUnless(bugB.status == 'fixed', bugB.status)
+ estrs = ["don't forget your towel",
+ 'helps with space travel',
+ 'watch out for flying dolphins']
+ self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ comments = list(bugB.comments())
+ self.failUnless(len(comments) == 3,
+ ['%s (%s, %s)' % (c.uuid, c.alt_id, c.body) for c in comments])
+ c1 = bugB.comment_from_uuid('c1')
+ comments.remove(c1)
+ self.failUnless(c1.uuid == 'c1', c1.uuid)
+ self.failUnless(c1.alt_id == None, c1.alt_id)
+ self.failUnless(c1.author == 'Jane', c1.author)
+ self.failUnless(c1.body == 'So long\n', c1.body)
+ c2 = bugB.comment_from_uuid('c2')
+ comments.remove(c2)
+ self.failUnless(c2.uuid == 'c2', c2.uuid)
+ self.failUnless(c2.alt_id == None, c2.alt_id)
+ self.failUnless(c2.author == 'Jess', c2.author)
+ self.failUnless(c2.body == 'World\n', c2.body)
+ c4 = comments[0]
+ self.failUnless(len(c4.uuid) == 36, c4.uuid)
+ self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+ self.failUnless(c4.author == 'Jed', c4.author)
+ self.failUnless(c4.body == 'And thanks\n', c4.body)
+ def testAddOnly(self):
+ self._execute('--add-only')
+ uuids = list(self.bugdir.uuids())
+ self.failUnless(uuids == ['b'], uuids)
+ bugB = self.bugdir.bug_from_uuid('b')
+ self.failUnless(bugB.uuid == 'b', bugB.uuid)
+ self.failUnless(bugB.creator == 'John', bugB.creator)
+ self.failUnless(bugB.status == 'open', bugB.status)
+ estrs = ["don't forget your towel",
+ 'helps with space travel']
+ self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings)
+ comments = list(bugB.comments())
+ self.failUnless(len(comments) == 3,
+ ['%s (%s)' % (c.uuid, c.alt_id) for c in comments])
+ c1 = bugB.comment_from_uuid('c1')
+ comments.remove(c1)
+ self.failUnless(c1.uuid == 'c1', c1.uuid)
+ self.failUnless(c1.alt_id == None, c1.alt_id)
+ self.failUnless(c1.author == 'Jane', c1.author)
+ self.failUnless(c1.body == 'Hello\n', c1.body)
+ c2 = bugB.comment_from_uuid('c2')
+ comments.remove(c2)
+ self.failUnless(c2.uuid == 'c2', c2.uuid)
+ self.failUnless(c2.alt_id == None, c2.alt_id)
+ self.failUnless(c2.author == 'Jess', c2.author)
+ self.failUnless(c2.body == 'World\n', c2.body)
+ c4 = comments[0]
+ self.failUnless(len(c4.uuid) == 36, c4.uuid)
+ self.failUnless(c4.alt_id == 'c3', c4.alt_id)
+ self.failUnless(c4.author == 'Jed', c4.author)
+ self.failUnless(c4.body == 'And thanks\n', c4.body)
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])