aboutsummaryrefslogtreecommitdiffstats
path: root/interfaces/email/interactive/libbe/bug.py
diff options
context:
space:
mode:
Diffstat (limited to 'interfaces/email/interactive/libbe/bug.py')
-rw-r--r--interfaces/email/interactive/libbe/bug.py580
1 files changed, 580 insertions, 0 deletions
diff --git a/interfaces/email/interactive/libbe/bug.py b/interfaces/email/interactive/libbe/bug.py
new file mode 100644
index 0000000..fd30ff7
--- /dev/null
+++ b/interfaces/email/interactive/libbe/bug.py
@@ -0,0 +1,580 @@
+# Copyright (C) 2008-2009 Chris Ball <cjb@laptop.org>
+# Thomas Habets <thomas@habets.pp.se>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the Bug class for representing bugs.
+"""
+
+import os
+import os.path
+import errno
+import time
+import types
+import xml.sax.saxutils
+import doctest
+
+from beuuid import uuid_gen
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, cached_property, \
+ primed_property, change_hook_property, settings_property
+import settings_object
+import mapfile
+import comment
+import utility
+
+
+class DiskAccessRequired (Exception):
+ def __init__(self, goal):
+ msg = "Cannot %s without accessing the disk" % goal
+ Exception.__init__(self, msg)
+
+### Define and describe valid bug categories
+# Use a tuple of (category, description) tuples since we don't have
+# ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/
+
+# in order of increasing severity. (name, description) pairs
+severity_def = (
+ ("wishlist","A feature that could improve usefulness, but not a bug."),
+ ("minor","The standard bug level."),
+ ("serious","A bug that requires workarounds."),
+ ("critical","A bug that prevents some features from working at all."),
+ ("fatal","A bug that makes the package unusable."))
+
+# in order of increasing resolution
+# roughly following http://www.bugzilla.org/docs/3.2/en/html/lifecycle.html
+active_status_def = (
+ ("unconfirmed","A possible bug which lacks independent existance confirmation."),
+ ("open","A working bug that has not been assigned to a developer."),
+ ("assigned","A working bug that has been assigned to a developer."),
+ ("test","The code has been adjusted, but the fix is still being tested."))
+inactive_status_def = (
+ ("closed", "The bug is no longer relevant."),
+ ("fixed", "The bug should no longer occur."),
+ ("wontfix","It's not a bug, it's a feature."))
+
+
+### Convert the description tuples to more useful formats
+
+severity_values = ()
+severity_description = {}
+severity_index = {}
+def load_severities(severity_def):
+ global severity_values
+ global severity_description
+ global severity_index
+ if severity_def == None:
+ return
+ severity_values = tuple([val for val,description in severity_def])
+ severity_description = dict(severity_def)
+ severity_index = {}
+ for i,severity in enumerate(severity_values):
+ severity_index[severity] = i
+load_severities(severity_def)
+
+active_status_values = []
+inactive_status_values = []
+status_values = []
+status_description = {}
+status_index = {}
+def load_status(active_status_def, inactive_status_def):
+ global active_status_values
+ global inactive_status_values
+ global status_values
+ global status_description
+ global status_index
+ if active_status_def == None:
+ active_status_def = globals()["active_status_def"]
+ if inactive_status_def == None:
+ inactive_status_def = globals()["inactive_status_def"]
+ active_status_values = tuple([val for val,description in active_status_def])
+ inactive_status_values = tuple([val for val,description in inactive_status_def])
+ status_values = active_status_values + inactive_status_values
+ status_description = dict(tuple(active_status_def) + tuple(inactive_status_def))
+ status_index = {}
+ for i,status in enumerate(status_values):
+ status_index[status] = i
+load_status(active_status_def, inactive_status_def)
+
+
+class Bug(settings_object.SavedSettingsObject):
+ """
+ >>> b = Bug()
+ >>> print b.status
+ open
+ >>> print b.severity
+ minor
+
+ There are two formats for time, int and string. Setting either
+ one will adjust the other appropriately. The string form is the
+ one stored in the bug's settings file on disk.
+ >>> print type(b.time)
+ <type 'int'>
+ >>> print type(b.time_string)
+ <type 'str'>
+ >>> b.time = 0
+ >>> print b.time_string
+ Thu, 01 Jan 1970 00:00:00 +0000
+ >>> b.time_string="Thu, 01 Jan 1970 00:01:00 +0000"
+ >>> b.time
+ 60
+ >>> print b.settings["time"]
+ Thu, 01 Jan 1970 00:01:00 +0000
+ """
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+
+ @_versioned_property(name="severity",
+ doc="A measure of the bug's importance",
+ default="minor",
+ check_fn=lambda s: s in severity_values,
+ require_save=True)
+ def severity(): return {}
+
+ @_versioned_property(name="status",
+ doc="The bug's current status",
+ default="open",
+ check_fn=lambda s: s in status_values,
+ require_save=True)
+ def status(): return {}
+
+ @property
+ def active(self):
+ return self.status in active_status_values
+
+ @_versioned_property(name="target",
+ doc="The deadline for fixing this bug")
+ def target(): return {}
+
+ @_versioned_property(name="creator",
+ doc="The user who entered the bug into the system")
+ def creator(): return {}
+
+ @_versioned_property(name="reporter",
+ doc="The user who reported the bug")
+ def reporter(): return {}
+
+ @_versioned_property(name="assigned",
+ doc="The developer in charge of the bug")
+ def assigned(): return {}
+
+ @_versioned_property(name="time",
+ doc="An RFC 2822 timestamp for bug creation")
+ def time_string(): return {}
+
+ def _get_time(self):
+ if self.time_string == None:
+ return None
+ return utility.str_to_time(self.time_string)
+ def _set_time(self, value):
+ self.time_string = utility.time_to_str(value)
+ time = property(fget=_get_time,
+ fset=_set_time,
+ doc="An integer version of .time_string")
+
+ def _extra_strings_check_fn(value):
+ return utility.iterable_full_of_strings(value, \
+ alternative=settings_object.EMPTY)
+ def _extra_strings_change_hook(self, old, new):
+ self.extra_strings.sort() # to make merging easier
+ self._prop_save_settings(old, new)
+ @_versioned_property(name="extra_strings",
+ doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
+ default=[],
+ check_fn=_extra_strings_check_fn,
+ change_hook=_extra_strings_change_hook,
+ mutable=True)
+ def extra_strings(): return {}
+
+ @_versioned_property(name="summary",
+ doc="A one-line bug description")
+ def summary(): return {}
+
+ def _get_comment_root(self, load_full=False):
+ if self.sync_with_disk:
+ return comment.loadComments(self, load_full=load_full)
+ else:
+ return comment.Comment(self, uuid=comment.INVALID_UUID)
+
+ @Property
+ @cached_property(generator=_get_comment_root)
+ @local_property("comment_root")
+ @doc_property(doc="The trunk of the comment tree")
+ def comment_root(): return {}
+
+ def _get_vcs(self):
+ if hasattr(self.bugdir, "vcs"):
+ return self.bugdir.vcs
+
+ @Property
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
+ @doc_property(doc="A revision control system instance.")
+ def vcs(): return {}
+
+ def __init__(self, bugdir=None, uuid=None, from_disk=False,
+ load_comments=False, summary=None):
+ settings_object.SavedSettingsObject.__init__(self)
+ self.bugdir = bugdir
+ self.uuid = uuid
+ if from_disk == True:
+ self.sync_with_disk = True
+ else:
+ self.sync_with_disk = False
+ if uuid == None:
+ self.uuid = uuid_gen()
+ self.time = int(time.time()) # only save to second precision
+ if self.vcs != None:
+ self.creator = self.vcs.get_user_id()
+ self.summary = summary
+
+ def __repr__(self):
+ return "Bug(uuid=%r)" % self.uuid
+
+ def __str__(self):
+ return self.string(shortlist=True)
+
+ def __cmp__(self, other):
+ return cmp_full(self, other)
+
+ # serializing methods
+
+ def _setting_attr_string(self, setting):
+ value = getattr(self, setting)
+ if value == None:
+ return ""
+ return str(value)
+
+ def xml(self, show_comments=False):
+ if self.bugdir == None:
+ shortname = self.uuid
+ else:
+ shortname = self.bugdir.bug_shortname(self)
+
+ if self.time == None:
+ timestring = ""
+ else:
+ timestring = utility.time_to_str(self.time)
+
+ info = [("uuid", self.uuid),
+ ("short-name", shortname),
+ ("severity", self.severity),
+ ("status", self.status),
+ ("assigned", self.assigned),
+ ("target", self.target),
+ ("reporter", self.reporter),
+ ("creator", self.creator),
+ ("created", timestring),
+ ("summary", self.summary)]
+ ret = '<bug>\n'
+ for (k,v) in info:
+ if v is not None:
+ ret += ' <%s>%s</%s>\n' % (k,xml.sax.saxutils.escape(v),k)
+ for estr in self.extra_strings:
+ ret += ' <extra-string>%s</extra-string>\n' % estr
+ if show_comments == True:
+ comout = self.comment_root.xml_thread(auto_name_map=True,
+ bug_shortname=shortname)
+ if len(comout) > 0:
+ ret += comout+'\n'
+ ret += '</bug>'
+ return ret
+
+ def string(self, shortlist=False, show_comments=False):
+ if self.bugdir == None:
+ shortname = self.uuid
+ else:
+ shortname = self.bugdir.bug_shortname(self)
+ if shortlist == False:
+ if self.time == None:
+ timestring = ""
+ else:
+ htime = utility.handy_time(self.time)
+ timestring = "%s (%s)" % (htime, self.time_string)
+ info = [("ID", self.uuid),
+ ("Short name", shortname),
+ ("Severity", self.severity),
+ ("Status", self.status),
+ ("Assigned", self._setting_attr_string("assigned")),
+ ("Target", self._setting_attr_string("target")),
+ ("Reporter", self._setting_attr_string("reporter")),
+ ("Creator", self._setting_attr_string("creator")),
+ ("Created", timestring)]
+ longest_key_len = max([len(k) for k,v in info])
+ infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info]
+ bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n')
+ else:
+ statuschar = self.status[0]
+ severitychar = self.severity[0]
+ chars = "%c%c" % (statuschar, severitychar)
+ bugout = "%s:%s: %s" % (shortname,chars,self.summary.rstrip('\n'))
+
+ if show_comments == True:
+ # take advantage of the string_thread(auto_name_map=True)
+ # SIDE-EFFECT of sorting by comment time.
+ comout = self.comment_root.string_thread(flatten=False,
+ auto_name_map=True,
+ bug_shortname=shortname)
+ output = bugout + '\n' + comout.rstrip('\n')
+ else :
+ output = bugout
+ return output
+
+ # methods for saving/loading/acessing settings and properties.
+
+ def get_path(self, *args):
+ dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid)
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["values", "comments"], str(args)
+ return os.path.join(dir, *args)
+
+ def set_sync_with_disk(self, value):
+ self.sync_with_disk = value
+ for comment in self.comments():
+ comment.set_sync_with_disk(value)
+
+ def load_settings(self):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load settings")
+ self.settings = mapfile.map_load(self.vcs, self.get_path("values"))
+ self._setup_saved_settings()
+
+ def save_settings(self):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("save settings")
+ assert self.summary != None, "Can't save blank bug"
+ self.vcs.mkdir(self.get_path())
+ path = self.get_path("values")
+ mapfile.map_save(self.vcs, path, self._get_saved_settings())
+
+ def save(self):
+ """
+ Save any loaded contents to disk. Because of lazy loading of
+ comments, this is actually not too inefficient.
+
+ However, if self.sync_with_disk = True, then any changes are
+ automatically written to disk as soon as they happen, so
+ calling this method will just waste time (unless something
+ else has been messing with your on-disk files).
+ """
+ sync_with_disk = self.sync_with_disk
+ if sync_with_disk == False:
+ self.set_sync_with_disk(True)
+ self.save_settings()
+ if len(self.comment_root) > 0:
+ comment.saveComments(self)
+ if sync_with_disk == False:
+ self.set_sync_with_disk(False)
+
+ def load_comments(self, load_full=True):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load comments")
+ if load_full == True:
+ # Force a complete load of the whole comment tree
+ self.comment_root = self._get_comment_root(load_full=True)
+ else:
+ # Setup for fresh lazy-loading. Clear _comment_root, so
+ # _get_comment_root returns a fresh version. Turn of
+ # syncing temporarily so we don't write our blank comment
+ # tree to disk.
+ self.sync_with_disk = False
+ self.comment_root = None
+ self.sync_with_disk = True
+
+ def remove(self):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("remove")
+ self.comment_root.remove()
+ path = self.get_path()
+ self.vcs.recursive_remove(path)
+
+ # methods for managing comments
+
+ def comments(self):
+ for comment in self.comment_root.traverse():
+ yield comment
+
+ def new_comment(self, body=None):
+ comm = self.comment_root.new_reply(body=body)
+ return comm
+
+ def comment_from_shortname(self, shortname, *args, **kwargs):
+ return self.comment_root.comment_from_shortname(shortname,
+ *args, **kwargs)
+
+ def comment_from_uuid(self, uuid):
+ return self.comment_root.comment_from_uuid(uuid)
+
+ def comment_shortnames(self, shortname=None):
+ """
+ SIDE-EFFECT : Comment.comment_shortnames will sort the comment
+ tree by comment.time
+ """
+ for id, comment in self.comment_root.comment_shortnames(shortname):
+ yield (id, comment)
+
+
+# The general rule for bug sorting is that "more important" bugs are
+# less than "less important" bugs. This way sorting a list of bugs
+# will put the most important bugs first in the list. When relative
+# importance is unclear, the sorting follows some arbitrary convention
+# (i.e. dictionary order).
+
+def cmp_severity(bug_1, bug_2):
+ """
+ Compare the severity levels of two bugs, with more severe bugs
+ comparing as less.
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.severity = bugB.severity = "wishlist"
+ >>> cmp_severity(bugA, bugB) == 0
+ True
+ >>> bugB.severity = "minor"
+ >>> cmp_severity(bugA, bugB) > 0
+ True
+ >>> bugA.severity = "critical"
+ >>> cmp_severity(bugA, bugB) < 0
+ True
+ """
+ if not hasattr(bug_2, "severity") :
+ return 1
+ return -cmp(severity_index[bug_1.severity], severity_index[bug_2.severity])
+
+def cmp_status(bug_1, bug_2):
+ """
+ Compare the status levels of two bugs, with more 'open' bugs
+ comparing as less.
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.status = bugB.status = "open"
+ >>> cmp_status(bugA, bugB) == 0
+ True
+ >>> bugB.status = "closed"
+ >>> cmp_status(bugA, bugB) < 0
+ True
+ >>> bugA.status = "fixed"
+ >>> cmp_status(bugA, bugB) > 0
+ True
+ """
+ if not hasattr(bug_2, "status") :
+ return 1
+ val_2 = status_index[bug_2.status]
+ return cmp(status_index[bug_1.status], status_index[bug_2.status])
+
+def cmp_attr(bug_1, bug_2, attr, invert=False):
+ """
+ Compare a general attribute between two bugs using the conventional
+ comparison rule for that attribute type. If invert == True, sort
+ *against* that convention.
+ >>> attr="severity"
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.severity = "critical"
+ >>> bugB.severity = "wishlist"
+ >>> cmp_attr(bugA, bugB, attr) < 0
+ True
+ >>> cmp_attr(bugA, bugB, attr, invert=True) > 0
+ True
+ >>> bugB.severity = "critical"
+ >>> cmp_attr(bugA, bugB, attr) == 0
+ True
+ """
+ if not hasattr(bug_2, attr) :
+ return 1
+ val_1 = getattr(bug_1, attr)
+ val_2 = getattr(bug_2, attr)
+ if val_1 == None: val_1 = None
+ if val_2 == None: val_2 = None
+
+ if invert == True :
+ return -cmp(val_1, val_2)
+ else :
+ return cmp(val_1, val_2)
+
+# alphabetical rankings (a < z)
+cmp_uuid = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "uuid")
+cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator")
+cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned")
+cmp_target = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "target")
+cmp_reporter = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "reporter")
+cmp_summary = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "summary")
+# chronological rankings (newer < older)
+cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True)
+
+def cmp_comments(bug_1, bug_2):
+ """
+ Compare two bugs' comments lists. Doesn't load any new comments,
+ so you should call each bug's .load_comments() first if you want a
+ full comparison.
+ """
+ comms_1 = sorted(bug_1.comments(), key = lambda comm : comm.uuid)
+ comms_2 = sorted(bug_2.comments(), key = lambda comm : comm.uuid)
+ result = cmp(len(comms_1), len(comms_2))
+ if result != 0:
+ return result
+ for c_1,c_2 in zip(comms_1, comms_2):
+ result = cmp(c_1, c_2)
+ if result != 0:
+ return result
+ return 0
+
+DEFAULT_CMP_FULL_CMP_LIST = \
+ (cmp_status, cmp_severity, cmp_assigned, cmp_time, cmp_creator,
+ cmp_reporter, cmp_target, cmp_comments, cmp_summary, cmp_uuid)
+
+class BugCompoundComparator (object):
+ def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST):
+ self.cmp_list = cmp_list
+ def __call__(self, bug_1, bug_2):
+ for comparison in self.cmp_list :
+ val = comparison(bug_1, bug_2)
+ if val != 0 :
+ return val
+ return 0
+
+cmp_full = BugCompoundComparator()
+
+
+# define some bonus cmp_* functions
+def cmp_last_modified(bug_1, bug_2):
+ """
+ Like cmp_time(), but use most recent comment instead of bug
+ creation for the timestamp.
+ """
+ def last_modified(bug):
+ time = bug.time
+ for comment in bug.comment_root.traverse():
+ if comment.time > time:
+ time = comment.time
+ return time
+ val_1 = last_modified(bug_1)
+ val_2 = last_modified(bug_2)
+ return -cmp(val_1, val_2)
+
+
+suite = doctest.DocTestSuite()