diff options
Diffstat (limited to 'becommands')
-rw-r--r-- | becommands/import_xml.py | 215 |
1 files changed, 188 insertions, 27 deletions
diff --git a/becommands/import_xml.py b/becommands/import_xml.py index a74d329..892a09e 100644 --- a/becommands/import_xml.py +++ b/becommands/import_xml.py @@ -16,12 +16,15 @@ """Import comments and bugs from XML""" from libbe import cmdutil, bugdir, bug, comment, utility from becommands.comment import complete +import copy import os import sys try: # import core module, Python >= 2.5 from xml.etree import ElementTree except ImportError: # look for non-core module from elementtree import ElementTree +import doctest +import unittest __desc__ = __doc__ def execute(args, manipulate_encodings=True, restrict_file_access=False): @@ -63,6 +66,22 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False): if options.comment_root != None: croot_bug,croot_comment = \ cmdutil.bug_comment_from_id(bd, options.comment_root) + croot_bug.load_comments(load_full=True) + croot_bug.set_sync_with_disk(False) + if croot_comment.uuid == comment.INVALID_UUID: + croot_comment = croot_bug.comment_root + else: + croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid) + new_croot_bug = bug.Bug(bugdir=bd, uuid=croot_bug.uuid) + new_croot_bug.explicit_attrs = [] + new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root) + if croot_comment.uuid == comment.INVALID_UUID: + new_croot_comment = new_croot_bug.comment_root + else: + new_croot_comment = \ + new_croot_bug.comment_from_uuid(croot_comment.uuid) + for new in new_croot_bug.comments(): + new.explicit_attrs = [] else: croot_bug,croot_comment = (None, None) @@ -87,7 +106,7 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False): for child in be_xml.getchildren(): if child.tag == 'bug': new = bug.Bug(bugdir=bd) - new.from_xml(unicode(ElementTree.tostring(child)).decode('unicode_escape')) + new.from_xml(unicode(ElementTree.tostring(child)).decode("unicode_escape")) root_bugs.append(new) elif child.tag == 'comment': new = comment.Comment(croot_bug) @@ -107,35 +126,47 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False): % (child.tag, comment_list.tag) # merge the new root_comments + if options.add_only == True: + accept_changes = False + accept_extra_strings = False + else: + accept_changes = True + accept_extra_strings = True + accept_comments = True if len(root_comments) > 0: if croot_bug == None: raise UserError( '--comment-root option is required for your root comments:\n%s' % '\n\n'.join([c.string() for c in root_comments])) - croot_cids = [] - for c in croot_bug.comment_root.traverse(): - croot_cids.append(c.uuid) - if c.alt_id != None: - croot_cids.append(c.alt_id) - for new in root_comments: - if new.alt_id in croot_cids: - raise cmdutil.UserError( - 'clashing comment alt_id: %s' % new.alt_id) - croot_cids.append(new.uuid) - if new.alt_id != None: - croot_cids.append(new.alt_id) - if new.in_reply_to == None: - new.in_reply_to = croot_comment.uuid try: # link new comments - comment.list_to_root(root_comments,croot_bug,root=croot_comment, - ignore_missing_references= \ - options.ignore_missing_references) + new_croot_bug.add_comments(root_comments, + default_parent=new_croot_comment, + ignore_missing_references= \ + options.ignore_missing_references) except comment.MissingReference, e: raise cmdutil.UserError(e) + croot_bug.merge(new_croot_bug, accept_changes=accept_changes, + accept_extra_strings=accept_extra_strings, + accept_comments=accept_comments) + # merge the new croot_bugs + merged_bugs = [] + old_bugs = [] for new in root_bugs: - bd.append(new) + try: + old = bd.bug_from_uuid(new.alt_id) + except KeyError: + old = None + if old == None: + bd.append(new) + else: + old.load_comments(load_full=True) + old.merge(new, accept_changes=accept_changes, + accept_extra_strings=accept_extra_strings, + accept_comments=accept_comments) + merged_bugs.append(new) + old_bugs.append(old) # protect against programmer error causing data loss: if croot_bug != None: @@ -144,14 +175,18 @@ def execute(args, manipulate_encodings=True, restrict_file_access=False): assert new.uuid in comms, \ "comment %s wasn't added to %s" % (new.uuid, croot_comment.uuid) for new in root_bugs: - assert bd.has_bug(new.uuid), \ - "bug %s wasn't added" % (new.uuid) + if not new in merged_bugs: + assert bd.has_bug(new.uuid), \ + "bug %s wasn't added" % (new.uuid) # save new information - for new in root_comments: - new.save() + if croot_bug != None: + croot_bug.save() for new in root_bugs: - new.save() + if not new in merged_bugs: + new.save() + for old in old_bugs: + old.save() def get_parser(): parser = cmdutil.CmdOptionParser("be import-xml XMLFILE") @@ -213,7 +248,7 @@ repeats. Here's an example of import activity: Repository - bug (uuid=B, author=John, status=open) + bug (uuid=B, creator=John, status=open) estr (don't forget your towel) estr (helps with space travel) com (uuid=C1, author=Jane, body=Hello) @@ -225,7 +260,7 @@ Here's an example of import activity: com (uuid=C1, body=So long) com (uuid=C3, author=Jed, body=And thanks) Result - bug (uuid=B, author=John, status=fixed) + bug (uuid=B, creator=John, status=fixed) estr (don't forget your towel) estr (helps with space travel) estr (watch out for flying dolphins) @@ -233,7 +268,7 @@ Here's an example of import activity: com (uuid=C2, author=Jess, body=World) com (uuid=C4, alt-id=C3, author=Jed, body=And thanks) Result, with --add-only - bug (uuid=B, author=John, status=open) + bug (uuid=B, creator=John, status=open) estr (don't forget your towel) estr (helps with space travel) com (uuid=C1, author=Jane, body=Hello) @@ -265,3 +300,129 @@ Devs recieve email, and save it's contents as demux-bug.xml def help(): return get_parser().help_str() + longhelp + + +class LonghelpTestCase (unittest.TestCase): + """ + Test import scenarios given in longhelp. + """ + def setUp(self): + self.bugdir = bugdir.SimpleBugDir() + self.original_working_dir = os.getcwd() + os.chdir(self.bugdir.root) + bugA = self.bugdir.bug_from_uuid('a') + self.bugdir.remove_bug(bugA) + self.bugdir.set_sync_with_disk(False) + bugB = self.bugdir.bug_from_uuid('b') + bugB.creator = 'John' + bugB.status = 'open' + bugB.extra_strings += ["don't forget your towel"] + bugB.extra_strings += ['helps with space travel'] + comm1 = bugB.comment_root.new_reply(body='Hello\n') + comm1.uuid = 'c1' + comm1.author = 'Jane' + comm2 = bugB.comment_root.new_reply(body='World\n') + comm2.uuid = 'c2' + comm2.author = 'Jess' + bugB.save() + self.bugdir.set_sync_with_disk(True) + self.xml = """ + <be-xml> + <bug> + <uuid>b</uuid> + <status>fixed</status> + <summary>a test bug</summary> + <extra-string>don't forget your towel</extra-string> + <extra-string>watch out for flying dolphins</extra-string> + <comment> + <uuid>c1</uuid> + <body>So long</body> + </comment> + <comment> + <uuid>c3</uuid> + <author>Jed</author> + <body>And thanks</body> + </comment> + </bug> + </be-xml> + """ + def tearDown(self): + os.chdir(self.original_working_dir) + self.bugdir.cleanup() + def _execute(self, *args): + import StringIO + orig_stdin = sys.stdin + sys.stdin = StringIO.StringIO(self.xml) + execute(list(args)+["-"], manipulate_encodings=False, + restrict_file_access=True) + sys.stdin = orig_stdin + self.bugdir._clear_bugs() + def testCleanBugdir(self): + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + def testNotAddOnly(self): + self._execute() + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + bugB = self.bugdir.bug_from_uuid('b') + self.failUnless(bugB.uuid == 'b', bugB.uuid) + self.failUnless(bugB.creator == 'John', bugB.creator) + self.failUnless(bugB.status == 'fixed', bugB.status) + estrs = ["don't forget your towel", + 'helps with space travel', + 'watch out for flying dolphins'] + self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings) + comments = list(bugB.comments()) + self.failUnless(len(comments) == 3, + ['%s (%s, %s)' % (c.uuid, c.alt_id, c.body) for c in comments]) + c1 = bugB.comment_from_uuid('c1') + comments.remove(c1) + self.failUnless(c1.uuid == 'c1', c1.uuid) + self.failUnless(c1.alt_id == None, c1.alt_id) + self.failUnless(c1.author == 'Jane', c1.author) + self.failUnless(c1.body == 'So long\n', c1.body) + c2 = bugB.comment_from_uuid('c2') + comments.remove(c2) + self.failUnless(c2.uuid == 'c2', c2.uuid) + self.failUnless(c2.alt_id == None, c2.alt_id) + self.failUnless(c2.author == 'Jess', c2.author) + self.failUnless(c2.body == 'World\n', c2.body) + c4 = comments[0] + self.failUnless(len(c4.uuid) == 36, c4.uuid) + self.failUnless(c4.alt_id == 'c3', c4.alt_id) + self.failUnless(c4.author == 'Jed', c4.author) + self.failUnless(c4.body == 'And thanks\n', c4.body) + def testAddOnly(self): + self._execute('--add-only') + uuids = list(self.bugdir.uuids()) + self.failUnless(uuids == ['b'], uuids) + bugB = self.bugdir.bug_from_uuid('b') + self.failUnless(bugB.uuid == 'b', bugB.uuid) + self.failUnless(bugB.creator == 'John', bugB.creator) + self.failUnless(bugB.status == 'open', bugB.status) + estrs = ["don't forget your towel", + 'helps with space travel'] + self.failUnless(bugB.extra_strings == estrs, bugB.extra_strings) + comments = list(bugB.comments()) + self.failUnless(len(comments) == 3, + ['%s (%s)' % (c.uuid, c.alt_id) for c in comments]) + c1 = bugB.comment_from_uuid('c1') + comments.remove(c1) + self.failUnless(c1.uuid == 'c1', c1.uuid) + self.failUnless(c1.alt_id == None, c1.alt_id) + self.failUnless(c1.author == 'Jane', c1.author) + self.failUnless(c1.body == 'Hello\n', c1.body) + c2 = bugB.comment_from_uuid('c2') + comments.remove(c2) + self.failUnless(c2.uuid == 'c2', c2.uuid) + self.failUnless(c2.alt_id == None, c2.alt_id) + self.failUnless(c2.author == 'Jess', c2.author) + self.failUnless(c2.body == 'World\n', c2.body) + c4 = comments[0] + self.failUnless(len(c4.uuid) == 36, c4.uuid) + self.failUnless(c4.alt_id == 'c3', c4.alt_id) + self.failUnless(c4.author == 'Jed', c4.author) + self.failUnless(c4.body == 'And thanks\n', c4.body) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |