# Copyright (C) 2008-2010 Gianluca Montecchi # Thomas Habets # W. Trevor King # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Define the Comment class for representing bug comments. """ import base64 import os import os.path import sys import time import types try: from email.mime.base import MIMEBase from email.encoders import encode_base64 except ImportError: # adjust to old python 2.4 from email.MIMEBase import MIMEBase from email.Encoders import encode_base64 try: # import core module, Python >= 2.5 from xml.etree import ElementTree except ImportError: # look for non-core module from elementtree import ElementTree import xml.sax.saxutils import libbe import libbe.util.id from libbe.storage.util.properties import Property, doc_property, \ local_property, defaulting_property, checked_property, cached_property, \ primed_property, change_hook_property, settings_property import libbe.storage.util.settings_object as settings_object import libbe.storage.util.mapfile as mapfile from libbe.util.tree import Tree import libbe.util.utility as utility if libbe.TESTING == True: import doctest class InvalidShortname(KeyError): def __init__(self, shortname, shortnames): msg = "Invalid shortname %s\n%s" % (shortname, shortnames) KeyError.__init__(self, msg) self.shortname = shortname self.shortnames = shortnames class MissingReference(ValueError): def __init__(self, comment): msg = "Missing reference to %s" % (comment.in_reply_to) ValueError.__init__(self, msg) self.reference = comment.in_reply_to self.comment = comment class DiskAccessRequired (Exception): def __init__(self, goal): msg = "Cannot %s without accessing the disk" % goal Exception.__init__(self, msg) INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!" def load_comments(bug, load_full=False): """ Set load_full=True when you want to load the comment completely from disk *now*, rather than waiting and lazy loading as required. """ uuids = [] for id in libbe.util.id.child_uuids( bug.storage.children( bug.id.storage())): uuids.append(id) comments = [] for uuid in uuids: comm = Comment(bug, uuid, from_storage=True) if load_full == True: comm.load_settings() dummy = comm.body # force the body to load comments.append(comm) bug.comment_root = Comment(bug, uuid=INVALID_UUID) bug.add_comments(comments, ignore_missing_references=True) return bug.comment_root def save_comments(bug): for comment in bug.comment_root.traverse(): comment.save() class Comment (Tree, settings_object.SavedSettingsObject): """ >>> c = Comment() >>> c.uuid != None True >>> c.uuid = "some-UUID" >>> print c.content_type text/plain """ settings_properties = [] required_saved_properties = [] _prop_save_settings = settings_object.prop_save_settings _prop_load_settings = settings_object.prop_load_settings def _versioned_property(settings_properties=settings_properties, required_saved_properties=required_saved_properties, **kwargs): if "settings_properties" not in kwargs: kwargs["settings_properties"] = settings_properties if "required_saved_properties" not in kwargs: kwargs["required_saved_properties"]=required_saved_properties return settings_object.versioned_property(**kwargs) @_versioned_property(name="Alt-id", doc="Alternate ID for linking imported comments. Internally comments are linked (via In-reply-to) to the parent's UUID. However, these UUIDs are generated internally, so Alt-id is provided as a user-controlled linking target.") def alt_id(): return {} @_versioned_property(name="Author", doc="The author of the comment") def author(): return {} @_versioned_property(name="In-reply-to", doc="UUID for parent comment or bug") def in_reply_to(): return {} @_versioned_property(name="Content-type", doc="Mime type for comment body", default="text/plain", require_save=True) def content_type(): return {} @_versioned_property(name="Date", doc="An RFC 2822 timestamp for comment creation") def date(): return {} def _get_time(self): if self.date == None: return None return utility.str_to_time(self.date) def _set_time(self, value): self.date = utility.time_to_str(value) time = property(fget=_get_time, fset=_set_time, doc="An integer version of .date") def _get_comment_body(self): if self.storage != None and self.storage.is_readable() \ and self.uuid != INVALID_UUID: return self.storage.get(self.id.storage("body"), decode=self.content_type.startswith("text/")) def _set_comment_body(self, old=None, new=None, force=False): assert self.uuid != INVALID_UUID, self if self.content_type.startswith('text/') \ and self.bug != None and self.bug.bugdir != None: new = libbe.util.id.short_to_long_text([self.bug.bugdir], new) if (self.storage != None and self.storage.writeable == True) \ or force==True: assert new != None, "Can't save empty comment" self.storage.set(self.id.storage("body"), new) @Property @change_hook_property(hook=_set_comment_body) @cached_property(generator=_get_comment_body) @local_property("body") @doc_property(doc="The meat of the comment") def body(): return {} def _extra_strings_check_fn(value): return utility.iterable_full_of_strings(value, \ alternative=settings_object.EMPTY) def _extra_strings_change_hook(self, old, new): self.extra_strings.sort() # to make merging easier self._prop_save_settings(old, new) @_versioned_property(name="extra_strings", doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/.py.", default=[], check_fn=_extra_strings_check_fn, change_hook=_extra_strings_change_hook, mutable=True) def extra_strings(): return {} def __init__(self, bug=None, uuid=None, from_storage=False, in_reply_to=None, body=None, content_type=None): """ Set from_storage=True to load an old comment. Set from_storage=False to create a new comment. The uuid option is required when from_storage==True. The in_reply_to, body, and content_type options are only used if from_storage==False (the default). When from_storage==True, they are loaded from the bug database. content_type decides if the body should be run through libbe.util.id.short_to_long_text() before saving. See ._set_comment_body() for details. in_reply_to should be the uuid string of the parent comment. """ Tree.__init__(self) settings_object.SavedSettingsObject.__init__(self) self.bug = bug self.storage = None self.uuid = uuid self.id = libbe.util.id.ID(self, 'comment') if from_storage == False: if uuid == None: self.uuid = libbe.util.id.uuid_gen() self.time = int(time.time()) # only save to second precision self.in_reply_to = in_reply_to if content_type != None: self.content_type = content_type self.body = body if self.bug != None: self.storage = self.bug.storage if from_storage == False: if self.storage != None and self.storage.is_writeable(): self.save() def __cmp__(self, other): return cmp_full(self, other) def __str__(self): """ >>> comm = Comment(bug=None, body="Some insightful remarks") >>> comm.uuid = "com-1" >>> comm.date = "Thu, 20 Nov 2008 15:55:11 +0000" >>> comm.author = "Jane Doe " >>> print comm --------- Comment --------- Name: //com From: Jane Doe Date: Thu, 20 Nov 2008 15:55:11 +0000 Some insightful remarks """ return self.string() def traverse(self, *args, **kwargs): """Avoid working with the possible dummy root comment""" for comment in Tree.traverse(self, *args, **kwargs): if comment.uuid == INVALID_UUID: continue yield comment # serializing methods def _setting_attr_string(self, setting): value = getattr(self, setting) if value == None: return "" if type(value) not in types.StringTypes: return str(value) return value def safe_in_reply_to(self): """ Return self.in_reply_to, except... * if no comment matches that id, in which case return None. * if that id matches another comments .alt_id, in which case return the matching comments .uuid. """ if self.in_reply_to == None: return None else: try: irt_comment = self.bug.comment_from_uuid( self.in_reply_to, match_alt_id=True) return irt_comment.uuid except KeyError: return None def xml(self, indent=0): """ >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") >>> comm.uuid = "0123" >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000" >>> print comm.xml(indent=2) 0123 //012 Thu, 01 Jan 1970 00:00:00 +0000 text/plain Some insightful remarks >>> comm.content_type = 'image/png' >>> print comm.xml() 0123 //012 Thu, 01 Jan 1970 00:00:00 +0000 image/png U29tZQppbnNpZ2h0ZnVsCnJlbWFya3MK """ if self.content_type.startswith('text/'): body = (self.body or '').rstrip('\n') else: maintype,subtype = self.content_type.split('/',1) msg = MIMEBase(maintype, subtype) msg.set_payload(self.body or '') encode_base64(msg) body = base64.encodestring(self.body or '') info = [('uuid', self.uuid), ('alt-id', self.alt_id), ('short-name', self.id.user()), ('in-reply-to', self.safe_in_reply_to()), ('author', self._setting_attr_string('author')), ('date', self.date), ('content-type', self.content_type), ('body', body)] lines = [''] for (k,v) in info: if v != None: lines.append(' <%s>%s' % (k,xml.sax.saxutils.escape(v),k)) for estr in self.extra_strings: lines.append(' %s' % estr) lines.append('') istring = ' '*indent sep = '\n' + istring return istring + sep.join(lines).rstrip('\n') def from_xml(self, xml_string, verbose=True): """ Note: If alt-id is not given, translates any fields to fields. >>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") >>> commA.uuid = "0123" >>> commA.date = "Thu, 01 Jan 1970 00:00:00 +0000" >>> commA.author = u'Fran\xe7ois' >>> commA.extra_strings += ['TAG: very helpful'] >>> xml = commA.xml() >>> commB = Comment() >>> commB.from_xml(xml, verbose=True) >>> commB.explicit_attrs ['author', 'date', 'content_type', 'body', 'alt_id'] >>> commB.xml() == xml False >>> commB.uuid = commB.alt_id >>> commB.alt_id = None >>> commB.xml() == xml True """ if type(xml_string) == types.UnicodeType: xml_string = xml_string.strip().encode('unicode_escape') if hasattr(xml_string, 'getchildren'): # already an ElementTree Element comment = xml_string else: comment = ElementTree.XML(xml_string) if comment.tag != 'comment': raise utility.InvalidXML( \ 'comment', comment, 'root element must be ') tags=['uuid','alt-id','in-reply-to','author','date','content-type', 'body','extra-string'] self.explicit_attrs = [] uuid = None body = None estrs = [] for child in comment.getchildren(): if child.tag == 'short-name': pass elif child.tag in tags: if child.text == None or len(child.text) == 0: text = settings_object.EMPTY else: text = xml.sax.saxutils.unescape(child.text) text = text.decode('unicode_escape').strip() if child.tag == 'uuid': uuid = text continue # don't set the comment's uuid tag. elif child.tag == 'body': body = text self.explicit_attrs.append(child.tag) continue # don't set the comment's body yet. elif child.tag == 'extra-string': estrs.append(text) continue # don't set the comment's extra_string yet. attr_name = child.tag.replace('-','_') self.explicit_attrs.append(attr_name) setattr(self, attr_name, text) elif verbose == True: print >> sys.stderr, 'Ignoring unknown tag %s in %s' \ % (child.tag, comment.tag) if uuid != self.uuid and self.alt_id == None: self.explicit_attrs.append('alt_id') self.alt_id = uuid if body != None: if self.content_type.startswith('text/'): self.body = body+'\n' # restore trailing newline else: self.body = base64.decodestring(body) self.extra_strings = estrs def merge(self, other, accept_changes=True, accept_extra_strings=True, change_exception=False): """ Merge info from other into this comment. Overrides any attributes in self that are listed in other.explicit_attrs. >>> commA = Comment(bug=None, body='Some insightful remarks') >>> commA.uuid = '0123' >>> commA.date = 'Thu, 01 Jan 1970 00:00:00 +0000' >>> commA.author = 'Frank' >>> commA.extra_strings += ['TAG: very helpful'] >>> commA.extra_strings += ['TAG: favorite'] >>> commB = Comment(bug=None, body='More insightful remarks') >>> commB.uuid = '3210' >>> commB.date = 'Fri, 02 Jan 1970 00:00:00 +0000' >>> commB.author = 'John' >>> commB.explicit_attrs = ['author', 'body'] >>> commB.extra_strings += ['TAG: very helpful'] >>> commB.extra_strings += ['TAG: useful'] >>> commA.merge(commB, accept_changes=False, ... accept_extra_strings=False, change_exception=False) >>> commA.merge(commB, accept_changes=False, ... accept_extra_strings=False, change_exception=True) Traceback (most recent call last): ... ValueError: Merge would change author "Frank"->"John" for comment 0123 >>> commA.merge(commB, accept_changes=True, ... accept_extra_strings=False, change_exception=True) Traceback (most recent call last): ... ValueError: Merge would add extra string "TAG: useful" to comment 0123 >>> print commA.author John >>> print commA.extra_strings ['TAG: favorite', 'TAG: very helpful'] >>> commA.merge(commB, accept_changes=True, ... accept_extra_strings=True, change_exception=True) >>> print commA.extra_strings ['TAG: favorite', 'TAG: useful', 'TAG: very helpful'] >>> print commA.xml() 0123 //012 John Thu, 01 Jan 1970 00:00:00 +0000 text/plain More insightful remarks TAG: favorite TAG: useful TAG: very helpful """ for attr in other.explicit_attrs: old = getattr(self, attr) new = getattr(other, attr) if old != new: if accept_changes == True: setattr(self, attr, new) elif change_exception == True: raise ValueError, \ 'Merge would change %s "%s"->"%s" for comment %s' \ % (attr, old, new, self.uuid) if self.alt_id == self.uuid: self.alt_id = None for estr in other.extra_strings: if not estr in self.extra_strings: if accept_extra_strings == True: self.extra_strings.append(estr) elif change_exception == True: raise ValueError, \ 'Merge would add extra string "%s" to comment %s' \ % (estr, self.uuid) def string(self, indent=0): """ >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") >>> comm.uuid = 'abcdef' >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000" >>> print comm.string(indent=2) --------- Comment --------- Name: //abc From: Date: Thu, 01 Jan 1970 00:00:00 +0000 Some insightful remarks """ lines = [] lines.append("--------- Comment ---------") lines.append("Name: %s" % self.id.user()) lines.append("From: %s" % (self._setting_attr_string("author"))) lines.append("Date: %s" % self.date) lines.append("") if self.content_type.startswith("text/"): body = (self.body or "") if self.bug != None and self.bug.bugdir != None: body = libbe.util.id.long_to_short_text([self.bug.bugdir], body) lines.extend(body.splitlines()) else: lines.append("Content type %s not printable. Try XML output instead" % self.content_type) istring = ' '*indent sep = '\n' + istring return istring + sep.join(lines).rstrip('\n') def string_thread(self, string_method_name="string", indent=0, flatten=True): """ Return a string displaying a thread of comments. bug_shortname is only used if auto_name_map == True. string_method_name (defaults to "string") is the name of the Comment method used to generate the output string for each Comment in the thread. The method must take the arguments indent and shortname. SIDE-EFFECT: if auto_name_map==True, calls comment_shortnames() which will sort the tree by comment.time. Avoid by calling name_map = {} for shortname,comment in comm.comment_shortnames(bug_shortname): name_map[comment.uuid] = shortname comm.sort(key=lambda c : c.author) # your sort comm.string_thread(name_map=name_map) >>> a = Comment(bug=None, uuid="a", body="Insightful remarks") >>> a.time = utility.str_to_time("Thu, 20 Nov 2008 01:00:00 +0000") >>> b = a.new_reply("Critique original comment") >>> b.uuid = "b" >>> b.time = utility.str_to_time("Thu, 20 Nov 2008 02:00:00 +0000") >>> c = b.new_reply("Begin flamewar :p") >>> c.uuid = "c" >>> c.time = utility.str_to_time("Thu, 20 Nov 2008 03:00:00 +0000") >>> d = a.new_reply("Useful examples") >>> d.uuid = "d" >>> d.time = utility.str_to_time("Thu, 20 Nov 2008 04:00:00 +0000") >>> a.sort(key=lambda comm : comm.time) >>> print a.string_thread(flatten=True) --------- Comment --------- Name: //a From: Date: Thu, 20 Nov 2008 01:00:00 +0000 Insightful remarks --------- Comment --------- Name: //b From: Date: Thu, 20 Nov 2008 02:00:00 +0000 Critique original comment --------- Comment --------- Name: //c From: Date: Thu, 20 Nov 2008 03:00:00 +0000 Begin flamewar :p --------- Comment --------- Name: //d From: Date: Thu, 20 Nov 2008 04:00:00 +0000 Useful examples >>> print a.string_thread() --------- Comment --------- Name: //a From: Date: Thu, 20 Nov 2008 01:00:00 +0000 Insightful remarks --------- Comment --------- Name: //b From: Date: Thu, 20 Nov 2008 02:00:00 +0000 Critique original comment --------- Comment --------- Name: //c From: Date: Thu, 20 Nov 2008 03:00:00 +0000 Begin flamewar :p --------- Comment --------- Name: //d From: Date: Thu, 20 Nov 2008 04:00:00 +0000 Useful examples """ stringlist = [] for depth,comment in self.thread(flatten=flatten): ind = 2*depth+indent string_fn = getattr(comment, string_method_name) stringlist.append(string_fn(indent=ind)) return '\n'.join(stringlist) def xml_thread(self, indent=0): return self.string_thread(string_method_name="xml", indent=indent) # methods for saving/loading/acessing settings and properties. def load_settings(self, settings_mapfile=None): if self.uuid == INVALID_UUID: return if settings_mapfile == None: settings_mapfile = \ self.storage.get(self.id.storage("values"), default="\n") try: settings = mapfile.parse(settings_mapfile) except mapfile.InvalidMapfileContents, e: raise Exception('Invalid settings file for comment %s\n' '(BE version missmatch?)' % self.id.user()) self._setup_saved_settings(settings) def save_settings(self): if self.uuid == INVALID_UUID: return mf = mapfile.generate(self._get_saved_settings()) self.storage.set(self.id.storage("values"), mf) def save(self): """ Save any loaded contents to storage. However, if self.storage.is_writeable() == True, then any changes are automatically written to storage as soon as they happen, so calling this method will just waste time (unless something else has been messing with your stored files). """ if self.uuid == INVALID_UUID: return assert self.storage != None, "Can't save without storage" assert self.body != None, "Can't save blank comment" if self.bug != None: parent = self.bug.id.storage() else: parent = None self.storage.add(self.id.storage(), parent=parent, directory=True) self.storage.add(self.id.storage('values'), parent=self.id.storage(), directory=False) self.storage.add(self.id.storage('body'), parent=self.id.storage(), directory=False) self.save_settings() self._set_comment_body(new=self.body, force=True) def remove(self): for comment in self: comment.remove() if self.uuid != INVALID_UUID: self.storage.recursive_remove(self.id.storage()) def add_reply(self, reply, allow_time_inversion=False): if self.uuid != INVALID_UUID: reply.in_reply_to = self.uuid self.append(reply) def new_reply(self, body=None, content_type=None): """ >>> comm = Comment(bug=None, body="Some insightful remarks") >>> repA = comm.new_reply("Critique original comment") >>> repB = repA.new_reply("Begin flamewar :p") >>> repB.in_reply_to == repA.uuid True """ reply = Comment(self.bug, body=body, content_type=content_type) self.add_reply(reply) return reply def comment_from_uuid(self, uuid, match_alt_id=True): """ Use a uuid to look up a comment. >>> a = Comment(bug=None, uuid="a") >>> b = a.new_reply() >>> b.uuid = "b" >>> c = b.new_reply() >>> c.uuid = "c" >>> d = a.new_reply() >>> d.uuid = "d" >>> d.alt_id = "d-alt" >>> comm = a.comment_from_uuid("d") >>> id(comm) == id(d) True >>> comm = a.comment_from_uuid("d-alt") >>> id(comm) == id(d) True >>> comm = a.comment_from_uuid(None, match_alt_id=False) Traceback (most recent call last): ... KeyError: None """ for comment in self.traverse(): if comment.uuid == uuid: return comment if match_alt_id == True and uuid != None \ and comment.alt_id == uuid: return comment raise KeyError(uuid) # methods for id generation def sibling_uuids(self): if self.bug != None: return self.bug.uuids() return [] def cmp_attr(comment_1, comment_2, attr, invert=False): """ Compare a general attribute between two comments using the conventional comparison rule for that attribute type. If invert == True, sort *against* that convention. >>> attr="author" >>> commentA = Comment() >>> commentB = Comment() >>> commentA.author = "John Doe" >>> commentB.author = "Jane Doe" >>> cmp_attr(commentA, commentB, attr) > 0 True >>> cmp_attr(commentA, commentB, attr, invert=True) < 0 True >>> commentB.author = "John Doe" >>> cmp_attr(commentA, commentB, attr) == 0 True """ if not hasattr(comment_2, attr) : return 1 val_1 = getattr(comment_1, attr) val_2 = getattr(comment_2, attr) if val_1 == None: val_1 = None if val_2 == None: val_2 = None if invert == True : return -cmp(val_1, val_2) else : return cmp(val_1, val_2) # alphabetical rankings (a < z) cmp_uuid = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "uuid") cmp_author = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "author") cmp_in_reply_to = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "in_reply_to") cmp_content_type = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "content_type") cmp_body = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "body") cmp_extra_strings = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "extra_strings") # chronological rankings (newer < older) cmp_time = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "time", invert=True) DEFAULT_CMP_FULL_CMP_LIST = \ (cmp_time, cmp_author, cmp_content_type, cmp_body, cmp_in_reply_to, cmp_uuid, cmp_extra_strings) class CommentCompoundComparator (object): def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST): self.cmp_list = cmp_list def __call__(self, comment_1, comment_2): for comparison in self.cmp_list : val = comparison(comment_1, comment_2) if val != 0 : return val return 0 cmp_full = CommentCompoundComparator() if libbe.TESTING == True: suite = doctest.DocTestSuite()