# Copyright (C) 2008-2012 Chris Ball
#                         Gianluca Montecchi
#                         Niall Douglas (s_sourceforge@nedprod.com)
#                         Thomas Habets
#                         W. Trevor King
#
# This file is part of Bugs Everywhere.
#
# Bugs Everywhere is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option) any
# later version.
#
# Bugs Everywhere is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.

"""Define :py:class:`Comment` for representing bug comments.
"""

import base64
import functools
import time
try:
    from email.mime.base import MIMEBase
    from email.encoders import encode_base64
except ImportError:
    # adjust to old python 2.4
    from email.MIMEBase import MIMEBase
    from email.Encoders import encode_base64
from xml.etree import ElementTree
import xml.sax.saxutils

import libbe
import libbe.util.id
from libbe.storage.util.properties import Property, doc_property, \
    local_property, cached_property, change_hook_property
import libbe.storage.util.settings_object as settings_object
import libbe.storage.util.mapfile as mapfile
from libbe.util.tree import Tree
import libbe.util.utility as utility

if libbe.TESTING:
    import doctest


class MissingReference(ValueError):
    """Raised for a comment whose ``in_reply_to`` target cannot be found.

    Attributes set on the exception:

    * ``reference``: the unresolved ``comment.in_reply_to`` value.
    * ``comment``: the comment object that carries that reference.
    """

    def __init__(self, comment):
        msg = "Missing reference to %s" % (comment.in_reply_to)
        super().__init__(msg)
        self.reference = comment.in_reply_to
        self.comment = comment


# UUID given to the dummy root comment created by load_comments().
INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!"
DEFAULT_IDX_FULL_IDX_LIST = None  # To be redefined later


def load_comments(bug, load_full=False):
    """Build ``bug.comment_root`` from the comments stored for ``bug``.

    One :py:class:`Comment` is created (``from_storage=True``) for every
    child uuid reported by the bug's storage, the comments are attached
    under a dummy root whose uuid is ``INVALID_UUID`` via
    ``bug.add_comments``, and that root is returned.

    Set load_full=True when you want to load the comment completely
    from disk *now*, rather than waiting and lazy loading as required.
    """
    uuids = list(libbe.util.id.child_uuids(
            bug.storage.children(bug.id.storage())))
    comments = []
    for uuid in uuids:
        comm = Comment(bug, uuid, from_storage=True)
        if load_full:
            comm.load_settings()
            _ = comm.body  # force the body to load
        comments.append(comm)
    bug.comment_root = Comment(bug, uuid=INVALID_UUID)
    bug.add_comments(comments, ignore_missing_references=True)
    return bug.comment_root


def save_comments(bug):
    """Save every comment below ``bug.comment_root``.

    Each comment is re-pointed at ``bug`` and ``bug.storage`` before its
    ``save()`` method is called.
    """
    for comment in bug.comment_root.traverse():
        comment.bug = bug
        comment.storage = bug.storage
        comment.save()


@functools.total_ordering
class Comment (Tree, settings_object.SavedSettingsObject):
    """Comments are notes that attach to :py:class:`~libbe.bug.Bug`\\s in
    threaded trees.  In mailing-list terms, a comment is analogous to
    a single part of an email.

    >>> c = Comment()
    >>> c.uuid is not None
    True
    >>> c.uuid = "some-UUID"
    >>> print(c.content_type)
    text/plain
    """

    settings_properties = []
    required_saved_properties = []
    _prop_save_settings = settings_object.prop_save_settings
    _prop_load_settings = settings_object.prop_load_settings

    def _versioned_property(
            settings_properties=settings_properties,
            required_saved_properties=required_saved_properties,
            **kwargs):
        # Class-body helper: fill in this class's property registries
        # unless the caller supplied their own.
        if "settings_properties" not in kwargs:
            kwargs["settings_properties"] = settings_properties
        if "required_saved_properties" not in kwargs:
            kwargs["required_saved_properties"] = required_saved_properties
        return settings_object.versioned_property(**kwargs)

    @_versioned_property(name="Alt-id",
                         doc="Alternate ID for linking imported comments. " +
                         "Internally comments are linked (via In-reply-to) " +
                         "to the parent's UUID. However, these UUIDs are " +
                         "generated internally, so Alt-id is provided as " +
                         "a user-controlled linking target.")
    def alt_id(): return {}

    @_versioned_property(name="Author",
                         doc="The author of the comment")
    def author(): return {}

    @_versioned_property(name="In-reply-to",
                         doc="UUID for parent comment or bug")
    def in_reply_to(): return {}

    @_versioned_property(name="Content-type",
                         doc="Mime type for comment body",
                         default="text/plain",
                         require_save=True)
    def content_type(): return {}

    @_versioned_property(name="Date",
                         doc="An RFC 2822 timestamp for comment creation")
    def date(): return {}

    def _get_time(self):
        if self.date is None:
            return None
        return utility.str_to_time(self.date)

    def _set_time(self, value):
        self.date = utility.time_to_str(value)

    time = property(fget=_get_time,
                    fset=_set_time,
                    doc="An integer version of .date")

    def _get_comment_body(self):
        """Read the body from storage.

        Returns ``None`` (implicitly) when there is no readable storage
        or when this is the dummy root comment.
        """
        if self.storage is not None and self.storage.is_readable() \
                and self.uuid != INVALID_UUID:
            return self.storage.get(
                self.id.storage("body"),
                decode=self.content_type.startswith("text/"))

    def _set_comment_body(self, old=None, new=None, force=False):
        """Change hook for ``body``: expand short ids and write to storage.

        Text bodies are passed through
        :py:func:`libbe.util.id.short_to_long_text` when the comment
        belongs to a bug with a bugdir.  The body is written when the
        storage is writeable, or unconditionally when ``force`` is set.
        """
        assert self.uuid != INVALID_UUID, self
        if self.content_type.startswith('text/') \
                and self.bug is not None and self.bug.bugdir is not None:
            new = libbe.util.id.short_to_long_text(
                {self.bug.bugdir.uuid: self.bug.bugdir}, new)
        if (self.storage is not None and self.storage.writeable) \
                or force:
            assert new is not None, "Can't save empty comment"
            self.storage.set(self.id.storage("body"), new)

    @Property
    @change_hook_property(hook=_set_comment_body)
    @cached_property(generator=_get_comment_body)
    @local_property("body")
    @doc_property(doc="The meat of the comment")
    def body(): return {}

    def _extra_strings_check_fn(value):
        return utility.iterable_full_of_strings(
            value, alternative=settings_object.EMPTY)

    def _extra_strings_change_hook(self, old, new):
        self.extra_strings.sort()  # to make merging easier
        self._prop_save_settings(old, new)

    @_versioned_property(name="extra_strings",
                         doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
                         default=[],
                         check_fn=_extra_strings_check_fn,
                         change_hook=_extra_strings_change_hook,
                         mutable=True)
    def extra_strings(): return {}

    def __init__(self, bug=None, uuid=None, from_storage=False,
                 in_reply_to=None, body=None, content_type=None):
        """
        Set ``from_storage=True`` to load an old comment.
        Set ``from_storage=False`` to create a new comment.

        The ``uuid`` option is required when ``from_storage==True``.

        The in_reply_to, body, and content_type options are only used
        if ``from_storage==False`` (the default).  When
        ``from_storage==True``, they are loaded from the bug database.
        ``content_type`` decides if the body should be run through
        :py:func:`util.id.short_to_long_text` before saving.  See
        :py:meth:`_set_comment_body` for details.

        ``in_reply_to`` should be the uuid string of the parent comment.
        """
        Tree.__init__(self)
        settings_object.SavedSettingsObject.__init__(self)
        self.bug = bug
        self.storage = None
        self.uuid = uuid
        self.id = libbe.util.id.ID(self, 'comment')
        if not from_storage:
            if uuid is None:
                self.uuid = libbe.util.id.uuid_gen()
            self.time = int(time.time())  # only save to second precision
            self.in_reply_to = in_reply_to
            if content_type is not None:
                self.content_type = content_type
            self.body = body
        if self.bug is not None:
            self.storage = self.bug.storage
        if not from_storage:
            if self.storage is not None and self.storage.is_writeable():
                self.save()

    def __eq__(self, other, idx_list=None):
        """Compare two comments on every index function in ``idx_list``.

        ``idx_list`` defaults to ``DEFAULT_IDX_FULL_IDX_LIST``.
        """
        if idx_list is None:
            idx_list = DEFAULT_IDX_FULL_IDX_LIST
        for idx in idx_list:
            if idx(self) != idx(other):
                return False
        return True

    def __lt__(self, other, idx_list=None):
        """Order comments by the first index function on which they differ.

        ``idx_list`` defaults to ``DEFAULT_IDX_FULL_IDX_LIST``.  Returns
        ``False`` when every index compares equal.
        """
        if idx_list is None:
            idx_list = DEFAULT_IDX_FULL_IDX_LIST
        for idx in idx_list:
            mine, theirs = idx(self), idx(other)
            if mine != theirs:
                if mine is None or theirs is None:
                    # Python 3 refuses to order None against str/int/list;
                    # rank an unset value first instead of raising.
                    return mine is None
                return mine < theirs
        return False

    def __str__(self):
        """
        >>> comm = Comment(bug=None, body="Some insightful remarks")
        >>> comm.uuid = "com-1"
        >>> comm.date = "Thu, 20 Nov 2008 15:55:11 +0000"
        >>> comm.author = "Jane Doe <jdoe@example.com>"
        >>> print(comm)
        --------- Comment ---------
        Name: //com
        From: Jane Doe <jdoe@example.com>
        Date: Thu, 20 Nov 2008 15:55:11 +0000
        <BLANKLINE>
        Some insightful remarks
        """
        return self.string()

    def traverse(self, *args, **kwargs):
        """Avoid working with the possible dummy root comment"""
        for comment in Tree.traverse(self, *args, **kwargs):
            if comment.uuid == INVALID_UUID:
                continue
            yield comment

    # serializing methods

    def _setting_attr_string(self, setting):
        """Return attribute ``setting`` as a string (``""`` for ``None``)."""
        value = getattr(self, setting)
        if value is None:
            return ""
        if not isinstance(value, str):
            return str(value)
        return value

    def safe_in_reply_to(self):
        """
        Return self.in_reply_to, except...

        * if no comment matches that id, in which case return None.
        * if that id matches another comments .alt_id, in which case
          return the matching comments .uuid.
        """
        if self.in_reply_to is None:
            return None
        try:
            irt_comment = self.bug.comment_from_uuid(
                self.in_reply_to, match_alt_id=True)
        except KeyError:
            return None
        return irt_comment.uuid

    def xml(self, indent=0):
        """Return the comment as a ``<comment>`` XML string.

        Text bodies are emitted verbatim (minus trailing newlines);
        any other content type is base64 encoded.  Every line the
        method generates is prefixed with ``indent`` spaces; line
        breaks inside the body itself are left untouched.

        >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
        >>> comm.uuid = "0123"
        >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000"
        >>> print(comm.xml(indent=2))
          <comment>
            <uuid>0123</uuid>
            <short-name>//012</short-name>
            <author></author>
            <date>Thu, 01 Jan 1970 00:00:00 +0000</date>
            <content-type>text/plain</content-type>
            <body>Some
        insightful
        remarks</body>
          </comment>
        >>> comm.content_type = 'image/png'
        >>> print(comm.xml())
        <comment>
          <uuid>0123</uuid>
          <short-name>//012</short-name>
          <author></author>
          <date>Thu, 01 Jan 1970 00:00:00 +0000</date>
          <content-type>image/png</content-type>
          <body>U29tZQppbnNpZ2h0ZnVsCnJlbWFya3MK
        </body>
        </comment>
        """
        if self.content_type.startswith('text/'):
            body = (self.body or '').rstrip('\n')
        else:
            raw = self.body or b''
            if isinstance(raw, str):
                raw = raw.encode()
            # encodebytes() returns bytes; escape() and the %-format
            # below need text.
            body = base64.encodebytes(raw).decode('ascii')
        info = [('uuid', self.uuid),
                ('alt-id', self.alt_id),
                ('short-name', self.id.user()),
                ('in-reply-to', self.safe_in_reply_to()),
                ('author', self._setting_attr_string('author')),
                ('date', self.date),
                ('content-type', self.content_type),
                ('body', body)]
        lines = ['<comment>']
        for (k, v) in info:
            if v is not None:
                lines.append(
                    '  <%s>%s</%s>' % (k, xml.sax.saxutils.escape(v), k))
        for estr in self.extra_strings:
            # escaped like every other field; from_xml() unescapes it
            lines.append('  <extra-string>%s</extra-string>'
                         % xml.sax.saxutils.escape(estr))
        lines.append('</comment>')
        istring = ' '*indent
        sep = '\n' + istring
        return istring + sep.join(lines).rstrip('\n')

    def from_xml(self, xml_string, preserve_uuids=False):
        """Load this comment's fields from a ``<comment>`` element.

        ``xml_string`` may be a string or an already parsed
        :py:class:`xml.etree.ElementTree.Element`.  The names of the
        attributes that were set are recorded in ``self.explicit_attrs``.
        Raises :py:class:`utility.InvalidXML` if the root element is not
        ``<comment>``.

        Note: If alt-id is not given, translates any <uuid> fields to
        <alt-id> fields.

        >>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
        >>> commA.uuid = "0123"
        >>> commA.date = "Thu, 01 Jan 1970 00:00:00 +0000"
        >>> commA.author = u'Fran\xe7ois'
        >>> commA.extra_strings += ['TAG: very helpful']
        >>> xml = commA.xml()
        >>> commB = Comment()
        >>> commB.from_xml(xml)
        >>> commB.explicit_attrs
        ['author', 'date', 'content_type', 'body', 'alt_id']
        >>> commB.xml() == xml
        False
        >>> commB.uuid = commB.alt_id
        >>> commB.alt_id = None
        >>> commB.xml() == xml
        True
        >>> commC = Comment()
        >>> commC.from_xml(xml, preserve_uuids=True)
        >>> commC.uuid == commA.uuid
        True
        """
        if isinstance(xml_string, str):
            # ElementTree parses text directly.  Escaping it first (as
            # the Python 2 code did) would leave literal backslash
            # sequences in every field on Python 3.
            xml_string = xml_string.strip()
        if isinstance(xml_string, ElementTree.Element):
            comment = xml_string
        else:
            comment = ElementTree.XML(xml_string)
        if comment.tag != 'comment':
            raise utility.InvalidXML(
                'comment', comment, 'root element must be <comment>')
        tags = ['uuid', 'alt-id', 'in-reply-to', 'author', 'date',
                'content-type', 'body', 'extra-string']
        self.explicit_attrs = []
        uuid = None
        body = None
        estrs = []
        for child in comment:
            if child.tag == 'short-name':
                pass
            elif child.tag in tags:
                if child.text is None or len(child.text) == 0:
                    text = settings_object.EMPTY
                else:
                    # NOTE(review): ElementTree has already decoded XML
                    # entities at this point; confirm the second
                    # unescape is intended.
                    text = xml.sax.saxutils.unescape(child.text)
                    text = text.strip()
                if child.tag == 'uuid' and not preserve_uuids:
                    uuid = text
                    continue  # don't set the comment's uuid tag.
                elif child.tag == 'body':
                    body = text
                    self.explicit_attrs.append(child.tag)
                    continue  # don't set the comment's body yet.
                elif child.tag == 'extra-string':
                    estrs.append(text)
                    continue  # don't set the comment's extra_string yet.
                attr_name = child.tag.replace('-', '_')
                self.explicit_attrs.append(attr_name)
                setattr(self, attr_name, text)
            else:
                libbe.LOG.warning(
                    'ignoring unknown tag {0} in {1}'.format(
                        child.tag, comment.tag))
        if uuid != self.uuid and self.alt_id is None:
            self.explicit_attrs.append('alt_id')
            self.alt_id = uuid
        if body is not None:
            if self.content_type.startswith('text/'):
                self.body = body+'\n'  # restore trailing newline
            else:
                # base64.decodestring() was removed in Python 3.9
                self.body = base64.decodebytes(body.encode('ascii'))
        self.extra_strings = estrs

    def merge(self, other, accept_changes=True,
              accept_extra_strings=True, change_exception=False):
        """
        Merge info from other into this comment.  Overrides any
        attributes in self that are listed in other.explicit_attrs.

        A differing attribute is copied when ``accept_changes`` is
        true; otherwise it raises ``ValueError`` when
        ``change_exception`` is true and is silently skipped when it is
        not.  ``accept_extra_strings`` plays the same role for extra
        strings missing from self.

        >>> commA = Comment(bug=None, body='Some insightful remarks')
        >>> commA.uuid = '0123'
        >>> commA.date = 'Thu, 01 Jan 1970 00:00:00 +0000'
        >>> commA.author = 'Frank'
        >>> commA.extra_strings += ['TAG: very helpful']
        >>> commA.extra_strings += ['TAG: favorite']
        >>> commB = Comment(bug=None, body='More insightful remarks')
        >>> commB.uuid = '3210'
        >>> commB.date = 'Fri, 02 Jan 1970 00:00:00 +0000'
        >>> commB.author = 'John'
        >>> commB.explicit_attrs = ['author', 'body']
        >>> commB.extra_strings += ['TAG: very helpful']
        >>> commB.extra_strings += ['TAG: useful']
        >>> commA.merge(commB, accept_changes=False,
        ...             accept_extra_strings=False, change_exception=False)
        >>> commA.merge(commB, accept_changes=False,
        ...             accept_extra_strings=False, change_exception=True)
        Traceback (most recent call last):
          ...
        ValueError: Merge would change author "Frank"->"John" for comment 0123
        >>> commA.merge(commB, accept_changes=True,
        ...             accept_extra_strings=False, change_exception=True)
        Traceback (most recent call last):
          ...
        ValueError: Merge would add extra string "TAG: useful" to comment 0123
        >>> print(commA.author)
        John
        >>> print(commA.extra_strings)
        ['TAG: favorite', 'TAG: very helpful']
        >>> commA.merge(commB, accept_changes=True,
        ...             accept_extra_strings=True, change_exception=True)
        >>> print(commA.extra_strings)
        ['TAG: favorite', 'TAG: useful', 'TAG: very helpful']
        >>> print(commA.xml())
        <comment>
          <uuid>0123</uuid>
          <short-name>//012</short-name>
          <author>John</author>
          <date>Thu, 01 Jan 1970 00:00:00 +0000</date>
          <content-type>text/plain</content-type>
          <body>More insightful remarks</body>
          <extra-string>TAG: favorite</extra-string>
          <extra-string>TAG: useful</extra-string>
          <extra-string>TAG: very helpful</extra-string>
        </comment>
        """
        if hasattr(other, 'explicit_attrs'):
            for attr in other.explicit_attrs:
                old = getattr(self, attr)
                new = getattr(other, attr)
                if old != new:
                    if accept_changes:
                        setattr(self, attr, new)
                    elif change_exception:
                        raise ValueError(
                            ('Merge would change {} "{}"->"{}" for comment {}'
                             ).format(attr, old, new, self.uuid))
        # an alt-id that merely repeats the uuid carries no information
        if self.alt_id == self.uuid:
            self.alt_id = None
        for estr in other.extra_strings:
            if estr not in self.extra_strings:
                if accept_extra_strings:
                    self.extra_strings.append(estr)
                elif change_exception:
                    raise ValueError(
                        'Merge would add extra string "%s" to comment %s'
                        % (estr, self.uuid))

    def string(self, indent=0):
        """Return a human-readable rendering of the comment.

        Every line is prefixed with ``indent`` spaces.  Bodies whose
        content type is not ``text/*`` are replaced by a short notice.

        >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
        >>> comm.uuid = 'abcdef'
        >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000"
        >>> print(comm.string(indent=2))  # doctest: +NORMALIZE_WHITESPACE
          --------- Comment ---------
          Name: //abc
          From:
          Date: Thu, 01 Jan 1970 00:00:00 +0000
        <BLANKLINE>
          Some
          insightful
          remarks
        """
        lines = []
        lines.append("--------- Comment ---------")
        lines.append("Name: %s" % self.id.user())
        lines.append("From: %s" % (self._setting_attr_string("author")))
        lines.append("Date: %s" % self.date)
        lines.append("")
        if self.content_type.startswith("text/"):
            body = (self.body or "")
            if self.bug is not None and self.bug.bugdir is not None:
                body = libbe.util.id.long_to_short_text(
                    {self.bug.bugdir.uuid: self.bug.bugdir}, body)
            lines.extend(body.splitlines())
        else:
            lines.append(("Content type %s not printable. " +
                          "Try XML output instead") % self.content_type)
        istring = ' '*indent
        sep = '\n' + istring
        return istring + sep.join(lines).rstrip('\n')

    def string_thread(self, string_method_name="string",
                      indent=0, flatten=True):
        """
        Return a string displaying a thread of comments.

        string_method_name (defaults to "string") is the name of the
        Comment method used to generate the output string for each
        Comment in the thread.  The method is called with a single
        ``indent`` keyword argument.

        Each comment is indented by ``indent`` plus two spaces per
        level of depth, where the depth is whatever
        ``self.thread(flatten=flatten)`` reports.  The examples below
        ignore whitespace differences, so that depth-dependent
        indentation is not shown.

        >>> a = Comment(bug=None, uuid="a", body="Insightful remarks")
        >>> a.time = utility.str_to_time("Thu, 20 Nov 2008 01:00:00 +0000")
        >>> b = a.new_reply("Critique original comment")
        >>> b.uuid = "b"
        >>> b.time = utility.str_to_time("Thu, 20 Nov 2008 02:00:00 +0000")
        >>> c = b.new_reply("Begin flamewar :p")
        >>> c.uuid = "c"
        >>> c.time = utility.str_to_time("Thu, 20 Nov 2008 03:00:00 +0000")
        >>> d = a.new_reply("Useful examples")
        >>> d.uuid = "d"
        >>> d.time = utility.str_to_time("Thu, 20 Nov 2008 04:00:00 +0000")
        >>> a.sort(key=lambda comm : comm.time)
        >>> print(a.string_thread(flatten=True))
        ... # doctest: +NORMALIZE_WHITESPACE
        --------- Comment ---------
        Name: //a
        From:
        Date: Thu, 20 Nov 2008 01:00:00 +0000
        <BLANKLINE>
        Insightful remarks
        --------- Comment ---------
        Name: //b
        From:
        Date: Thu, 20 Nov 2008 02:00:00 +0000
        <BLANKLINE>
        Critique original comment
        --------- Comment ---------
        Name: //c
        From:
        Date: Thu, 20 Nov 2008 03:00:00 +0000
        <BLANKLINE>
        Begin flamewar :p
        --------- Comment ---------
        Name: //d
        From:
        Date: Thu, 20 Nov 2008 04:00:00 +0000
        <BLANKLINE>
        Useful examples
        >>> print(a.string_thread())
        ... # doctest: +NORMALIZE_WHITESPACE
        --------- Comment ---------
        Name: //a
        From:
        Date: Thu, 20 Nov 2008 01:00:00 +0000
        <BLANKLINE>
        Insightful remarks
        --------- Comment ---------
        Name: //b
        From:
        Date: Thu, 20 Nov 2008 02:00:00 +0000
        <BLANKLINE>
        Critique original comment
        --------- Comment ---------
        Name: //c
        From:
        Date: Thu, 20 Nov 2008 03:00:00 +0000
        <BLANKLINE>
        Begin flamewar :p
        --------- Comment ---------
        Name: //d
        From:
        Date: Thu, 20 Nov 2008 04:00:00 +0000
        <BLANKLINE>
        Useful examples
        """
        stringlist = []
        for depth, comment in self.thread(flatten=flatten):
            ind = 2*depth+indent
            string_fn = getattr(comment, string_method_name)
            stringlist.append(string_fn(indent=ind))
        return '\n'.join(stringlist)

    def xml_thread(self, indent=0):
        """Return the whole thread rendered with :py:meth:`xml`."""
        return self.string_thread(string_method_name="xml", indent=indent)

    # methods for saving/loading/accessing settings and properties.

    def load_settings(self, settings_mapfile=None):
        """Load saved settings, reading them from storage if not given.

        Does nothing for the dummy root comment.  Raises ``Exception``
        when the mapfile cannot be parsed.
        """
        if self.uuid == INVALID_UUID:
            return
        if settings_mapfile is None:
            settings_mapfile = self.storage.get(
                self.id.storage('values'), '{}\n')
        try:
            settings = mapfile.parse(settings_mapfile)
        except mapfile.InvalidMapfileContents as err:
            raise Exception('Invalid settings file for comment %s\n'
                            '(BE version missmatch?)'
                            % self.id.user()) from err
        self._setup_saved_settings(settings)

    def save_settings(self):
        """Write the saved settings to storage (no-op for the dummy root)."""
        if self.uuid == INVALID_UUID:
            return
        mf = mapfile.generate(self._get_saved_settings())
        self.storage.set(self.id.storage("values"), mf)

    def save(self):
        """
        Save any loaded contents to storage.

        However, if ``self.storage.is_writeable() == True``, then any
        changes are automatically written to storage as soon as they
        happen, so calling this method will just waste time (unless
        something else has been messing with your stored files).
        """
        if self.uuid == INVALID_UUID:
            return
        assert self.storage is not None, "Can't save without storage"
        assert self.body is not None, "Can't save blank comment"
        if self.bug is not None:
            parent = self.bug.id.storage()
        else:
            parent = None
        self.storage.add(self.id.storage(), parent=parent, directory=True)
        self.storage.add(self.id.storage('values'), parent=self.id.storage(),
                         directory=False)
        self.storage.add(self.id.storage('body'), parent=self.id.storage(),
                         directory=False)
        self.save_settings()
        self._set_comment_body(new=self.body, force=True)

    def remove(self):
        """Remove this comment's replies, then the comment, from storage."""
        for comment in self:
            comment.remove()
        if self.uuid != INVALID_UUID:
            self.storage.recursive_remove(self.id.storage())

    def add_reply(self, reply, allow_time_inversion=False):
        """Append ``reply`` as a child of this comment.

        ``reply.in_reply_to`` is pointed at this comment unless this is
        the dummy root.  ``allow_time_inversion`` is accepted but not
        used by this method.
        """
        if self.uuid != INVALID_UUID:
            reply.in_reply_to = self.uuid
        self.append(reply)

    def new_reply(self, body=None, content_type=None):
        """
        >>> comm = Comment(bug=None, body="Some insightful remarks")
        >>> repA = comm.new_reply("Critique original comment")
        >>> repB = repA.new_reply("Begin flamewar :p")
        >>> repB.in_reply_to == repA.uuid
        True
        """
        reply = Comment(self.bug, body=body, content_type=content_type)
        self.add_reply(reply)
        return reply

    def comment_from_uuid(self, uuid, match_alt_id=True):
        """Use a uuid to look up a comment.

        Raises ``KeyError`` if no comment in this tree matches.

        >>> a = Comment(bug=None, uuid="a")
        >>> b = a.new_reply()
        >>> b.uuid = "b"
        >>> c = b.new_reply()
        >>> c.uuid = "c"
        >>> d = a.new_reply()
        >>> d.uuid = "d"
        >>> d.alt_id = "d-alt"
        >>> comm = a.comment_from_uuid("d")
        >>> id(comm) == id(d)
        True
        >>> comm = a.comment_from_uuid("d-alt")
        >>> id(comm) == id(d)
        True
        >>> comm = a.comment_from_uuid(None, match_alt_id=False)
        Traceback (most recent call last):
          ...
        KeyError: None
        """
        for comment in self.traverse():
            if comment.uuid == uuid:
                return comment
            if match_alt_id and uuid is not None \
                    and comment.alt_id == uuid:
                return comment
        raise KeyError(uuid)

    # methods for id generation

    def sibling_uuids(self):
        """Return ``self.bug.uuids()``, or ``[]`` when there is no bug."""
        if self.bug is not None:
            return self.bug.uuids()
        return []


def idx_attr(comment, attr):
    """
    Generate an index for sorting of comments based on the given attribute.

    Returns ``None`` when ``comment`` has no such attribute.

    >>> attr="author"
    >>> commentA = Comment()
    >>> commentA.author = "John Doe"
    >>> idx_attr(commentA, attr)
    'John Doe'
    >>> commentB = Comment()
    >>> commentB.author = "John Doe"
    >>> idx_attr(commentA, attr) == idx_attr(commentB, attr)
    True
    """
    if not hasattr(comment, attr):
        return None
    return getattr(comment, attr)


# alphabetical rankings (a < z)

def idx_uuid(comment):
    return idx_attr(comment, "uuid")


def idx_author(comment):
    return idx_attr(comment, "author")


def idx_in_reply_to(comment):
    return idx_attr(comment, "in_reply_to")


def idx_content_type(comment):
    return idx_attr(comment, "content_type")


def idx_body(comment):
    return idx_attr(comment, "body")


def idx_extra_strings(comment):
    return idx_attr(comment, "extra_strings")


# chronological ranking: compares the integer ``time`` values, so a
# smaller (earlier) timestamp sorts first

def idx_time(comment):
    return idx_attr(comment, "time")


DEFAULT_IDX_FULL_IDX_LIST = \
    (idx_uuid, idx_author, idx_in_reply_to, idx_content_type,
     idx_body, idx_extra_strings, idx_time)

if libbe.TESTING:
    suite = doctest.DocTestSuite()