aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libbe/command/base.py23
-rw-r--r--libbe/command/comment.py1
-rw-r--r--libbe/command/import_xml.py364
-rwxr-xr-xlibbe/ui/command_line.py2
4 files changed, 205 insertions, 185 deletions
diff --git a/libbe/command/base.py b/libbe/command/base.py
index e28cf33..fe29908 100644
--- a/libbe/command/base.py
+++ b/libbe/command/base.py
@@ -37,6 +37,21 @@ def get_command(command_name):
raise UnknownCommand(command_name)
return cmd
+def get_command_class(module, command_name):
+ """Retrieves a command class from a module.
+
+ >>> import_xml_mod = get_command('import-xml')
+ >>> import_xml = get_command_class(import_xml_mod, 'import-xml')
+ >>> repr(import_xml)
+ "<class 'libbe.command.import_xml.Import_XML'>"
+ """
+ try:
+ cname = command_name.capitalize().replace('-', '_')
+ cmd = getattr(module, cname)
+ except ImportError, e:
+ raise UnknownCommand(command_name)
+ return cmd
+
def commands():
for modname in libbe.util.plugin.modnames('libbe.command'):
if modname not in ['base', 'util']:
@@ -76,13 +91,13 @@ class Option (CommandInput):
def validate(self):
if self.arg == None:
- assert self.callback != None
+ assert self.callback != None, self.name
return
- assert self.callback == None, self.callback
+ assert self.callback == None, '%s: %s' (self.name, self.callback)
assert self.arg.name == self.name, \
'Name missmatch: %s != %s' % (self.arg.name, self.name)
- assert self.arg.optional == False
- assert self.arg.repeatable == False
+ assert self.arg.optional == False, self.name
+ assert self.arg.repeatable == False, self.name
def __str__(self):
return '--%s' % self.name
diff --git a/libbe/command/comment.py b/libbe/command/comment.py
index 7a8f5f9..3ec4062 100644
--- a/libbe/command/comment.py
+++ b/libbe/command/comment.py
@@ -100,6 +100,7 @@ class Comment (libbe.command.Command):
optional=True,
completion_callback=libbe.command.util.complete_assigned),
])
+
def _run(self, storage, bugdir, **params):
bug,parent = \
libbe.command.util.bug_comment_from_user_id(bugdir, params['id'])
diff --git a/libbe/command/import_xml.py b/libbe/command/import_xml.py
index b985193..f06c741 100644
--- a/libbe/command/import_xml.py
+++ b/libbe/command/import_xml.py
@@ -13,10 +13,7 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""Import comments and bugs from XML"""
-import libbe
-from libbe import cmdutil, bugdir, bug, comment, utility
-from becommands.comment import complete
+
import copy
import os
import sys
@@ -24,187 +21,198 @@ try: # import core module, Python >= 2.5
from xml.etree import ElementTree
except ImportError: # look for non-core module
from elementtree import ElementTree
+
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.command.util
+import libbe.comment
+import libbe.util.encoding
+import libbe.util.utility
+
if libbe.TESTING == True:
import doctest
+ import StringIO
import unittest
-__desc__ = __doc__
-def execute(args, manipulate_encodings=True, restrict_file_access=False,
- dir="."):
- """
+ import libbe.bugdir
+
+class Import_XML (libbe.command.Command):
+ """Import comments and bugs from XML
+
>>> import time
>>> import StringIO
- >>> bd = bugdir.SimpleBugDir()
- >>> os.chdir(bd.root)
- >>> orig_stdin = sys.stdin
- >>> sys.stdin = StringIO.StringIO("<be-xml><comment><uuid>c</uuid><body>This is a comment about a</body></comment></be-xml>")
- >>> execute(["-c", "a", "-"], manipulate_encodings=False)
- >>> sys.stdin = orig_stdin
- >>> bd._clear_bugs()
- >>> bug = cmdutil.bug_from_id(bd, "a")
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> cmd = Import_XML()
+ >>> cmd._setup_io = lambda i_enc,o_enc : None
+ >>> cmd.stdout = sys.stdout
+
+ >>> cmd.stdin = StringIO.StringIO('<be-xml><comment><uuid>c</uuid><body>This is a comment about a</body></comment></be-xml>')
+ >>> cmd.stdin.encoding = 'ascii'
+ >>> ret = cmd.run(bd.storage, bd, {'comment-root':'/a'}, ['-'])
+ >>> bd.flush_reload()
+ >>> bug = bd.bug_from_uuid('a')
>>> bug.load_comments(load_full=False)
>>> comment = bug.comment_root[0]
>>> print comment.body
This is a comment about a
<BLANKLINE>
- >>> comment.author == bd.user_id
- True
>>> comment.time <= int(time.time())
True
>>> comment.in_reply_to is None
True
>>> bd.cleanup()
"""
- parser = get_parser()
- options, args = parser.parse_args(args)
- complete(options, args, parser)
- if len(args) < 1:
- raise cmdutil.UsageError("Please specify an XML file.")
- if len(args) > 1:
- raise cmdutil.UsageError("Too many arguments.")
- filename = args[0]
+ name = 'import-xml'
- bd = bugdir.BugDir(from_disk=True,
- manipulate_encodings=manipulate_encodings,
- root=dir)
- if options.comment_root != None:
- croot_bug,croot_comment = \
- cmdutil.bug_comment_from_id(bd, options.comment_root)
- croot_bug.load_comments(load_full=True)
- croot_bug.set_sync_with_disk(False)
- if croot_comment.uuid == comment.INVALID_UUID:
- croot_comment = croot_bug.comment_root
- else:
- croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid)
- new_croot_bug = bug.Bug(bugdir=bd, uuid=croot_bug.uuid)
- new_croot_bug.explicit_attrs = []
- new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root)
- if croot_comment.uuid == comment.INVALID_UUID:
- new_croot_comment = new_croot_bug.comment_root
- else:
- new_croot_comment = \
- new_croot_bug.comment_from_uuid(croot_comment.uuid)
- for new in new_croot_bug.comments():
- new.explicit_attrs = []
- else:
- croot_bug,croot_comment = (None, None)
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.requires_bugdir = True
+ self.options.extend([
+ libbe.command.Option(name='ignore-missing-references', short_name='i',
+ help="If any comment's <in-reply-to> refers to a non-existent comment, ignore it (instead of raising an exception)."),
+ libbe.command.Option(name='add-only', short_name='a',
+ help='If any bug or comment listed in the XML file already exists in the bug repository, do not alter the repository version.'),
+ libbe.command.Option(name='comment-root', short_name='c',
+ help='Supply a bug or comment ID as the root of any <comment> elements that are direct children of the <be-xml> element. If any such <comment> elements exist, you are required to set this option.',
+ arg=libbe.command.Argument(
+ name='comment-root', metavar='ID',
+ completion_callback=libbe.command.util.complete_bug_comment_id)),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='xml-file', metavar='XML-FILE'),
+ ])
- if filename == '-':
- xml = sys.stdin.read()
- else:
- if restrict_file_access == True:
- cmdutil.restrict_file_access(bd, options.body)
- xml = bd.vcs.get_file_contents(filename, allow_no_vcs=True)
- str_xml = xml.encode('unicode_escape').replace(r'\n', '\n')
- # unicode read + encode to string so we know the encoding,
- # which might not be given (?) in a binary string read?
-
- # parse the xml
- root_bugs = []
- root_comments = []
- version = {}
- be_xml = ElementTree.XML(str_xml)
- if be_xml.tag != 'be-xml':
- raise utility.InvalidXML(
- 'import-xml', be_xml, 'root element must be <be-xml>')
- for child in be_xml.getchildren():
- if child.tag == 'bug':
- new = bug.Bug(bugdir=bd)
- new.from_xml(unicode(ElementTree.tostring(child)).decode("unicode_escape"))
- root_bugs.append(new)
- elif child.tag == 'comment':
- new = comment.Comment(croot_bug)
- new.from_xml(unicode(ElementTree.tostring(child)).decode("unicode_escape"))
- root_comments.append(new)
- elif child.tag == 'version':
- for gchild in child.getchildren():
- if child.tag in ['tag', 'nick', 'revision', 'revision-id']:
- text = xml.sax.saxutils.unescape(child.text)
- text = text.decode('unicode_escape').strip()
- version[child.tag] = text
- else:
- print >> sys.stderr, 'ignoring unknown tag %s in %s' \
- % (gchild.tag, child.tag)
+ def _run(self, storage, bugdir, **params):
+ writeable = bugdir.storage.writeable
+ bugdir.storage.writeable = False
+ if params['comment-root'] != None:
+ croot_bug,croot_comment = \
+ libbe.command.util.bug_comment_from_user_id(
+ bugdir, params['comment-root'])
+ croot_bug.load_comments(load_full=True)
+ if croot_comment.uuid == libbe.comment.INVALID_UUID:
+ croot_comment = croot_bug.comment_root
+ else:
+ croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid)
+ new_croot_bug = libbe.bug.Bug(bugdir=bugdir, uuid=croot_bug.uuid)
+ new_croot_bug.explicit_attrs = []
+ new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root)
+ if croot_comment.uuid == libbe.comment.INVALID_UUID:
+ new_croot_comment = new_croot_bug.comment_root
+ else:
+ new_croot_comment = \
+ new_croot_bug.comment_from_uuid(croot_comment.uuid)
+ for new in new_croot_bug.comments():
+ new.explicit_attrs = []
else:
- print >> sys.stderr, 'ignoring unknown tag %s in %s' \
- % (child.tag, comment_list.tag)
-
- # merge the new root_comments
- if options.add_only == True:
- accept_changes = False
- accept_extra_strings = False
- else:
- accept_changes = True
- accept_extra_strings = True
- accept_comments = True
- if len(root_comments) > 0:
- if croot_bug == None:
- raise UserError(
- '--comment-root option is required for your root comments:\n%s'
- % '\n\n'.join([c.string() for c in root_comments]))
- try:
- # link new comments
- new_croot_bug.add_comments(root_comments,
- default_parent=new_croot_comment,
- ignore_missing_references= \
- options.ignore_missing_references)
- except comment.MissingReference, e:
- raise cmdutil.UserError(e)
- croot_bug.merge(new_croot_bug, accept_changes=accept_changes,
- accept_extra_strings=accept_extra_strings,
- accept_comments=accept_comments)
-
- # merge the new croot_bugs
- merged_bugs = []
- old_bugs = []
- for new in root_bugs:
- try:
- old = bd.bug_from_uuid(new.alt_id)
- except KeyError:
- old = None
- if old == None:
- bd.append(new)
+ croot_bug,croot_comment = (None, None)
+
+ if params['xml-file'] == '-':
+ xml = self.stdin.read().encode(self.stdin.encoding)
else:
- old.load_comments(load_full=True)
- old.merge(new, accept_changes=accept_changes,
- accept_extra_strings=accept_extra_strings,
- accept_comments=accept_comments)
- merged_bugs.append(new)
- old_bugs.append(old)
-
- # protect against programmer error causing data loss:
- if croot_bug != None:
- comms = [c.uuid for c in croot_comment.traverse()]
- for new in root_comments:
- assert new.uuid in comms, \
- "comment %s wasn't added to %s" % (new.uuid, croot_comment.uuid)
- for new in root_bugs:
- if not new in merged_bugs:
- assert bd.has_bug(new.uuid), \
- "bug %s wasn't added" % (new.uuid)
+ self.check_restricted_access(storage, params['xml-file'])
+ xml = libbe.util.encoding.get_file_contents(
+ params['xml-file'])
- # save new information
- if croot_bug != None:
- croot_bug.save()
- for new in root_bugs:
- if not new in merged_bugs:
- new.save()
- for old in old_bugs:
- old.save()
-
-def get_parser():
- parser = cmdutil.CmdOptionParser("be import-xml XMLFILE")
- parser.add_option("-i", "--ignore-missing-references", action="store_true",
- dest="ignore_missing_references", default=False,
- help="If any comment's <in-reply-to> refers to a non-existent comment, ignore it (instead of raising an exception).")
- parser.add_option("-a", "--add-only", action='store_true',
- dest="add_only", default=False,
- help="If any bug or comment listed in the XML file already exists in the bug repository, do not alter the repository version.")
- parser.add_option("-c", "--comment-root", dest="comment_root",
- help="Supply a bug or comment ID as the root of any <comment> elements that are direct children of the <be-xml> element. If any such <comment> elements exist, you are required to set this option.")
- return parser
+ # parse the xml
+ root_bugs = []
+ root_comments = []
+ version = {}
+ be_xml = ElementTree.XML(xml)
+ if be_xml.tag != 'be-xml':
+ raise libbe.util.utility.InvalidXML(
+ 'import-xml', be_xml, 'root element must be <be-xml>')
+ for child in be_xml.getchildren():
+ if child.tag == 'bug':
+ new = libbe.bug.Bug(bugdir=bugdir)
+ new.from_xml(child)
+ root_bugs.append(new)
+ elif child.tag == 'comment':
+ new = libbe.comment.Comment(croot_bug)
+ new.from_xml(child)
+ root_comments.append(new)
+ elif child.tag == 'version':
+ for gchild in child.getchildren():
+ if child.tag in ['tag', 'nick', 'revision', 'revision-id']:
+ text = xml.sax.saxutils.unescape(child.text)
+ text = text.decode('unicode_escape').strip()
+ version[child.tag] = text
+ else:
+ print >> sys.stderr, 'ignoring unknown tag %s in %s' \
+ % (gchild.tag, child.tag)
+ else:
+ print >> sys.stderr, 'ignoring unknown tag %s in %s' \
+ % (child.tag, comment_list.tag)
+
+ # merge the new root_comments
+ if params['add-only'] == True:
+ accept_changes = False
+ accept_extra_strings = False
+ else:
+ accept_changes = True
+ accept_extra_strings = True
+ accept_comments = True
+ if len(root_comments) > 0:
+ if croot_bug == None:
+ raise UserError(
+ '--comment-root option is required for your root comments:\n%s'
+ % '\n\n'.join([c.string() for c in root_comments]))
+ try:
+ # link new comments
+ new_croot_bug.add_comments(root_comments,
+ default_parent=new_croot_comment,
+ ignore_missing_references= \
+ params['ignore-missing-references'])
+ except libbe.comment.MissingReference, e:
+ raise libbe.command.UserError(e)
+ croot_bug.merge(new_croot_bug, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_comments=accept_comments)
+
+ # merge the new croot_bugs
+ merged_bugs = []
+ old_bugs = []
+ for new in root_bugs:
+ try:
+ old = bugdir.bug_from_uuid(new.alt_id)
+ except KeyError:
+ old = None
+ if old == None:
+ bd.append(new)
+ else:
+ old.load_comments(load_full=True)
+ old.merge(new, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_comments=accept_comments)
+ merged_bugs.append(new)
+ old_bugs.append(old)
+
+ # protect against programmer error causing data loss:
+ if croot_bug != None:
+ comms = [c.uuid for c in croot_comment.traverse()]
+ for new in root_comments:
+ assert new.uuid in comms, \
+ "comment %s wasn't added to %s" % (new.uuid, croot_comment.uuid)
+ for new in root_bugs:
+ if not new in merged_bugs:
+ assert bugdir.has_bug(new.uuid), \
+ "bug %s wasn't added" % (new.uuid)
+
+ # save new information
+ bugdir.storage.writeable = writeable
+ if croot_bug != None:
+ croot_bug.save()
+ for new in root_bugs:
+ if not new in merged_bugs:
+ new.save()
+ for old in old_bugs:
+ old.save()
-longhelp="""
+ def _long_help(self):
+ return """
Import comments and bugs from XMLFILE. If XMLFILE is '-', the file is
read from stdin.
@@ -302,9 +310,8 @@ Devs recieve email, and save it's contents as demux-bug.xml
dev$ cat demux-bug.xml | be import-xml -
"""
-def help():
- return get_parser().help_str() + longhelp
+Import_xml = Import_XML # alias for libbe.command.base.get_command_class()
if libbe.TESTING == True:
class LonghelpTestCase (unittest.TestCase):
@@ -312,12 +319,12 @@ if libbe.TESTING == True:
Test import scenarios given in longhelp.
"""
def setUp(self):
- self.bugdir = bugdir.SimpleBugDir()
- self.original_working_dir = os.getcwd()
- os.chdir(self.bugdir.root)
+ self.bugdir = libbe.bugdir.SimpleBugDir(memory=False)
+ self.cmd = Import_XML()
+ self.cmd._setup_io = lambda i_enc,o_enc : None
bugA = self.bugdir.bug_from_uuid('a')
self.bugdir.remove_bug(bugA)
- self.bugdir.set_sync_with_disk(False)
+ self.bugdir.storage.writeable = False
bugB = self.bugdir.bug_from_uuid('b')
bugB.creator = 'John'
bugB.status = 'open'
@@ -329,8 +336,8 @@ if libbe.TESTING == True:
comm2 = bugB.comment_root.new_reply(body='World\n')
comm2.uuid = 'c2'
comm2.author = 'Jess'
+ self.bugdir.storage.writeable = True
bugB.save()
- self.bugdir.set_sync_with_disk(True)
self.xml = """
<be-xml>
<bug>
@@ -351,22 +358,19 @@ if libbe.TESTING == True:
</bug>
</be-xml>
"""
+ self.cmd.stdin = StringIO.StringIO(self.xml)
+ self.cmd.stdin.encoding = 'ascii'
+ self.cmd.stdout = StringIO.StringIO()
def tearDown(self):
- os.chdir(self.original_working_dir)
self.bugdir.cleanup()
- def _execute(self, *args):
- import StringIO
- orig_stdin = sys.stdin
- sys.stdin = StringIO.StringIO(self.xml)
- execute(list(args)+["-"], manipulate_encodings=False,
- restrict_file_access=True)
- sys.stdin = orig_stdin
- self.bugdir._clear_bugs()
+ def _execute(self, params={}, args=[]):
+ self.cmd.run(self.bugdir.storage, self.bugdir, params, args)
+ self.bugdir.flush_reload()
def testCleanBugdir(self):
uuids = list(self.bugdir.uuids())
self.failUnless(uuids == ['b'], uuids)
def testNotAddOnly(self):
- self._execute()
+ self._execute({}, ['-'])
uuids = list(self.bugdir.uuids())
self.failUnless(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
@@ -399,7 +403,7 @@ if libbe.TESTING == True:
self.failUnless(c4.author == 'Jed', c4.author)
self.failUnless(c4.body == 'And thanks\n', c4.body)
def testAddOnly(self):
- self._execute('--add-only')
+ self._execute({'add-only':True}, ['-'])
uuids = list(self.bugdir.uuids())
self.failUnless(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
diff --git a/libbe/ui/command_line.py b/libbe/ui/command_line.py
index 4042123..feeccd4 100755
--- a/libbe/ui/command_line.py
+++ b/libbe/ui/command_line.py
@@ -222,7 +222,7 @@ class BE (libbe.command.Command):
cmdlist = []
for name in libbe.command.commands():
module = libbe.command.get_command(name)
- Class = getattr(module, name.capitalize())
+ Class = libbe.command.get_command_class(module, name)
cmdlist.append((name, Class.__doc__.splitlines()[0]))
cmdlist.sort()
longest_cmd_len = max([len(name) for name,desc in cmdlist])