aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'libbe')
-rw-r--r--libbe/bug.py277
-rw-r--r--libbe/bugdir.py290
-rw-r--r--libbe/comment.py200
-rw-r--r--libbe/properties.py477
-rw-r--r--libbe/settings_object.py267
5 files changed, 1193 insertions, 318 deletions
diff --git a/libbe/bug.py b/libbe/bug.py
index f47bcba..cba06d6 100644
--- a/libbe/bug.py
+++ b/libbe/bug.py
@@ -21,6 +21,10 @@ import time
import doctest
from beuuid import uuid_gen
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, cached_property, \
+ primed_property, change_hook_property, settings_property
+import settings_object
import mapfile
import comment
import utility
@@ -69,107 +73,162 @@ for i in range(len(status_values)):
status_index[status_values[i]] = i
-def checked_property(name, valid):
+class Bug(settings_object.SavedSettingsObject):
"""
- Provide access to an attribute name, testing for valid values.
+ >>> b = Bug()
+ >>> print b.status
+ open
+ >>> print b.severity
+ minor
+
+ There are two formats for time, int and string. Setting either
+ one will adjust the other appropriately. The string form is the
+ one stored in the bug's settings file on disk.
+ >>> print type(b.time)
+ <type 'int'>
+ >>> print type(b.time_string)
+ <type 'str'>
+ >>> b.time = 0
+ >>> print b.time_string
+ Thu, 01 Jan 1970 00:00:00 +0000
+ >>> b.time_string="Thu, 01 Jan 1970 00:01:00 +0000"
+ >>> b.time
+ 60
+ >>> print b.settings["time"]
+ Thu, 01 Jan 1970 00:01:00 +0000
"""
- def getter(self):
- value = getattr(self, "_"+name)
- if value not in valid:
- raise InvalidValue(name, value)
- return value
-
- def setter(self, value):
- if value not in valid:
- raise InvalidValue(name, value)
- return setattr(self, "_"+name, value)
- return property(getter, setter)
-
-
-class Bug(object):
- severity = checked_property("severity", severity_values)
- status = checked_property("status", status_values)
-
- def _get_active(self):
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+
+ @_versioned_property(name="severity",
+ doc="A measure of the bug's importance",
+ default="minor",
+ allowed=severity_values,
+ require_save=True)
+ def severity(): return {}
+
+ @_versioned_property(name="status",
+ doc="The bug's current status",
+ default="open",
+ allowed=status_values,
+ require_save=True)
+ def status(): return {}
+
+ @property
+ def active(self):
return self.status in active_status_values
- active = property(_get_active)
+ @_versioned_property(name="target",
+ doc="The deadline for fixing this bug")
+ def target(): return {}
+
+ @_versioned_property(name="creator",
+ doc="The user who entered the bug into the system")
+ def creator(): return {}
+
+ @_versioned_property(name="reporter",
+ doc="The user who reported the bug")
+ def reporter(): return {}
+
+ @_versioned_property(name="assigned",
+ doc="The developer in charge of the bug")
+ def assigned(): return {}
+
+ @_versioned_property(name="time",
+ doc="An RFC 2822 timestamp for bug creation")
+ def time_string(): return {}
+
+ def _get_time(self):
+ if self.time_string == None or self.time_string == settings_object.EMPTY:
+ return None
+ return utility.str_to_time(self.time_string)
+ def _set_time(self, value):
+ self.time_string = utility.time_to_str(value)
+ time = property(fget=_get_time,
+ fset=_set_time,
+ doc="An integer version of .time_string")
+
+ @_versioned_property(name="summary",
+ doc="A one-line bug description")
+ def summary(): return {}
+
+ def _get_comment_root(self, load_full=False):
+ if self.sync_with_disk:
+ return comment.loadComments(self, load_full=load_full)
+ else:
+ return comment.Comment(self, uuid=comment.INVALID_UUID)
- def _get_comment_root(self):
- if self._comment_root == None:
- if self._comments_loaded == True:
- self._comment_root = comment.loadComments(self)
- else:
- self._comment_root = comment.Comment(self,
- uuid=comment.INVALID_UUID)
- return self._comment_root
+ @Property
+ @cached_property(generator=_get_comment_root)
+ @local_property("comment_root")
+ @doc_property(doc="The trunk of the comment tree")
+ def comment_root(): return {}
- def _set_comment_root(self, comment_root):
- self._comment_root = comment_root
+ def _get_rcs(self):
+ if hasattr(self.bugdir, "rcs"):
+ return self.bugdir.rcs
- _comment_root = None
- comment_root = property(_get_comment_root, _set_comment_root,
- doc="The trunk of the comment tree")
+ @Property
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
def __init__(self, bugdir=None, uuid=None, from_disk=False,
load_comments=False, summary=None):
+ settings_object.SavedSettingsObject.__init__(self)
self.bugdir = bugdir
- if bugdir != None:
- self.rcs = bugdir.rcs
- else:
- self.rcs = None
+ self.uuid = uuid
if from_disk == True:
- self._comments_loaded = False
- self.uuid = uuid
- self.load(load_comments=load_comments)
+ self.sync_with_disk = True
else:
- # Note: defaults should match those in Bug.load()
- self._comments_loaded = True
- if uuid != None:
- self.uuid = uuid
- else:
+ self.sync_with_disk = False
+ if uuid == None:
self.uuid = uuid_gen()
- self.summary = summary
+ self.time = int(time.time()) # only save to second precision
if self.rcs != None:
self.creator = self.rcs.get_user_id()
- else:
- self.creator = None
- self.target = None
- self.status = "open"
- self.severity = "minor"
- self.assigned = None
- self.time = int(time.time()) # only save to second precision
+ self.summary = summary
def __repr__(self):
return "Bug(uuid=%r)" % self.uuid
+ def _setting_attr_string(self, setting):
+ value = getattr(self, setting)
+ if value == settings_object.EMPTY:
+ return ""
+ else:
+ return str(value)
+
def string(self, shortlist=False, show_comments=False):
if self.bugdir == None:
shortname = self.uuid
else:
shortname = self.bugdir.bug_shortname(self)
if shortlist == False:
- if self.time == None:
- timestring = ""
+ if self.time_string == "":
+ timestring = self.time_string
else:
htime = utility.handy_time(self.time)
- ftime = utility.time_to_str(self.time)
- timestring = "%s (%s)" % (htime, ftime)
+ timestring = "%s (%s)" % (htime, self.time_string)
info = [("ID", self.uuid),
("Short name", shortname),
("Severity", self.severity),
("Status", self.status),
- ("Assigned", self.assigned),
- ("Target", self.target),
- ("Creator", self.creator),
+ ("Assigned", self._setting_attr_string("assigned")),
+ ("Target", self._setting_attr_string("target")),
+ ("Creator", self._setting_attr_string("creator")),
("Created", timestring)]
- newinfo = []
- for k,v in info:
- if v == None:
- newinfo.append((k,""))
- else:
- newinfo.append((k,v))
- info = newinfo
longest_key_len = max([len(k) for k,v in info])
infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info]
bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n')
@@ -180,8 +239,6 @@ class Bug(object):
bugout = "%s:%s: %s" % (shortname,chars,self.summary.rstrip('\n'))
if show_comments == True:
- if self._comments_loaded == False:
- self.load_comments()
# take advantage of the string_thread(auto_name_map=True)
# SIDE-EFFECT of sorting by comment time.
comout = self.comment_root.string_thread(flatten=False,
@@ -205,52 +262,32 @@ class Bug(object):
assert name in ["values", "comments"]
return os.path.join(my_dir, name)
- def load(self, load_comments=False):
- map = mapfile.map_load(self.rcs, self.get_path("values"))
- self.summary = map.get("summary")
- self.creator = map.get("creator")
- self.target = map.get("target")
- self.status = map.get("status", "open")
- self.severity = map.get("severity", "minor")
- self.assigned = map.get("assigned")
- self.time = map.get("time")
- if self.time is not None:
- self.time = utility.str_to_time(self.time)
-
- if load_comments == True:
- self.load_comments()
-
- def load_comments(self):
- # clear _comment_root, so _get_comment_root returns a fresh version
- self._comment_root = None
- self._comments_loaded = True
-
- def comments(self):
- if self._comments_loaded == False:
- self.load_comments()
- for comment in self.comment_root.traverse():
- yield comment
-
- def _add_attr(self, map, name):
- value = getattr(self, name)
- if value is not None:
- map[name] = value
+ def load_settings(self):
+ self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ self._setup_saved_settings()
- def save(self):
+ def load_comments(self, load_full=True):
+ if load_full == True:
+ # Force a complete load of the whole comment tree
+ self.comment_root = self._get_comment_root(load_full=True)
+ else:
+ # Setup for fresh lazy-loading. Clear _comment_root, so
+ # _get_comment_root returns a fresh version. Turn of
+ # syncing temporarily so we don't write our blank comment
+ # tree to disk.
+ self.sync_with_disk = False
+ self.comment_root = None
+ self.sync_with_disk = True
+
+ def save_settings(self):
assert self.summary != None, "Can't save blank bug"
- map = {}
- self._add_attr(map, "assigned")
- self._add_attr(map, "summary")
- self._add_attr(map, "creator")
- self._add_attr(map, "target")
- self._add_attr(map, "status")
- self._add_attr(map, "severity")
- if self.time is not None:
- map["time"] = utility.time_to_str(self.time)
-
+
self.rcs.mkdir(self.get_path())
path = self.get_path("values")
- mapfile.map_save(self.rcs, path, map)
+ mapfile.map_save(self.rcs, path, self._get_saved_settings())
+
+ def save(self):
+ self.save_settings()
if len(self.comment_root) > 0:
self.rcs.mkdir(self.get_path("comments"))
@@ -261,6 +298,10 @@ class Bug(object):
path = self.get_path()
self.rcs.recursive_remove(path)
+ def comments(self):
+ for comment in self.comment_root.traverse():
+ yield comment
+
def new_comment(self, body=None):
comm = self.comment_root.new_reply(body=body)
return comm
@@ -280,7 +321,8 @@ class Bug(object):
for id, comment in self.comment_root.comment_shortnames(shortname):
yield (id, comment)
-# the general rule for bug sorting is that "more important" bugs are
+
+# The general rule for bug sorting is that "more important" bugs are
# less than "less important" bugs. This way sorting a list of bugs
# will put the most important bugs first in the list. When relative
# importance is unclear, the sorting follows some arbitrary convention
@@ -347,10 +389,15 @@ def cmp_attr(bug_1, bug_2, attr, invert=False):
"""
if not hasattr(bug_2, attr) :
return 1
+ val_1 = getattr(bug_1, attr)
+ val_2 = getattr(bug_2, attr)
+ if val_1 == settings_object.EMPTY: val_1 = None
+ if val_2 == settings_object.EMPTY: val_2 = None
+
if invert == True :
- return -cmp(getattr(bug_1, attr), getattr(bug_2, attr))
+ return -cmp(val_1, val_2)
else :
- return cmp(getattr(bug_1, attr), getattr(bug_2, attr))
+ return cmp(val_1, val_2)
# alphabetical rankings (a < z)
cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator")
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 6bb6a43..f93576f 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -22,6 +22,11 @@ import copy
import unittest
import doctest
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+import settings_object
import mapfile
import bug
import rcs
@@ -65,31 +70,7 @@ class MultipleBugMatches(ValueError):
TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
-def setting_property(name, valid=None, default=None, doc=None):
- if default != None:
- raise NotImplementedError
- def getter(self):
- value = self.settings.get(name)
- if valid is not None:
- if value not in valid and value != None:
- raise InvalidValue(name, value)
- return value
-
- def setter(self, value):
- if value != getter(self):
- if valid is not None:
- if value not in valid and value != None:
- raise InvalidValue(name, value)
- if value is None:
- del self.settings[name]
- else:
- self.settings[name] = value
- self._save_settings(self.get_path("settings"), self.settings)
-
- return property(getter, setter, doc=doc)
-
-
-class BugDir (list):
+class BugDir (list, settings_object.SavedSettingsObject):
"""
Sink to existing root
======================
@@ -143,14 +124,108 @@ class BugDir (list):
using BugDirs, set manipulate_encodings=False, and stick to ASCII
in your tests.
"""
+
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+
+ @_versioned_property(name="target",
+ doc="The current project development target")
+ def target(): return {}
+
+ def _guess_encoding(self):
+ return encoding.get_encoding()
+ def _check_encoding(value):
+ if value != None and value != settings_object.EMPTY:
+ return encoding.known_encoding(value)
+ def _setup_encoding(self, new_encoding):
+ if new_encoding != None and new_encoding != settings_object.EMPTY:
+ if self._manipulate_encodings == True:
+ encoding.set_IO_stream_encodings(new_encoding)
+ def _set_encoding(self, old_encoding, new_encoding):
+ self._setup_encoding(new_encoding)
+ self._prop_save_settings(old_encoding, new_encoding)
+
+ @_versioned_property(name="encoding",
+ doc="""The default input/output encoding to use (e.g. "utf-8").""",
+ change_hook=_set_encoding,
+ generator=_guess_encoding,
+ check_fn=_check_encoding)
+ def encoding(): return {}
+
+ def _guess_user_id(self):
+ return self.rcs.get_user_id()
+ def _set_user_id(self, old_user_id, new_user_id):
+ self.rcs.user_id = new_user_id
+ self._prop_save_settings(old_user_id, new_user_id)
+
+ @_versioned_property(name="user_id",
+ doc=
+"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note
+that the Arch RCS backend *enforces* ids with this format.""",
+ change_hook=_set_user_id,
+ generator=_guess_user_id)
+ def user_id(): return {}
+
+ @_versioned_property(name="rcs_name",
+ doc="""The name of the current RCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .rcs instead, and
+.rcs_name will be automatically adjusted.""",
+ default="None",
+ allowed=["None", "Arch", "bzr", "git", "hg"])
+ def rcs_name(): return {}
+
+ def _get_rcs(self, rcs_name=None):
+ """Get and root a new revision control system"""
+ if rcs_name == None:
+ rcs_name = self.rcs_name
+ new_rcs = rcs.rcs_by_name(rcs_name)
+ self._change_rcs(None, new_rcs)
+ return new_rcs
+ def _change_rcs(self, old_rcs, new_rcs):
+ new_rcs.encoding = self.encoding
+ new_rcs.root(self.root)
+ self.rcs_name = new_rcs.name
+
+ @Property
+ @change_hook_property(hook=_change_rcs)
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
+
+ def _bug_map_gen(self):
+ map = {}
+ for bug in self:
+ map[bug.uuid] = bug
+ for uuid in self.list_uuids():
+ if uuid not in map:
+ map[uuid] = None
+ self._bug_map_value = map # ._bug_map_value used by @local_property
+
+ @Property
+ @primed_property(primer=_bug_map_gen)
+ @local_property("bug_map")
+ @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
+ def _bug_map(): return {}
+
+
def __init__(self, root=None, sink_to_existing_root=True,
assert_new_BugDir=False, allow_rcs_init=False,
manipulate_encodings=True,
from_disk=False, rcs=None):
list.__init__(self)
- self._save_user_id = False
+ settings_object.SavedSettingsObject.__init__(self)
self._manipulate_encodings = manipulate_encodings
- self.settings = {}
if root == None:
root = os.getcwd()
if sink_to_existing_root == True:
@@ -159,9 +234,15 @@ class BugDir (list):
if not os.path.exists(root):
raise NoRootEntry(root)
self.root = root
+ # get a temporary rcs until we've loaded settings
+ self.sync_with_disk = False
+ self.rcs = self._guess_rcs()
+
if from_disk == True:
+ self.sync_with_disk = True
self.load()
else:
+ self.sync_with_disk = False
if assert_new_BugDir == True:
if os.path.exists(self.get_path()):
raise AlreadyInitialized, self.get_path()
@@ -206,75 +287,6 @@ class BugDir (list):
self.rcs.set_file_contents(self.get_path("version"),
TREE_VERSION_STRING)
- def _get_encoding(self):
- if self._encoding == None:
- return encoding.get_encoding()
- else:
- return self._encoding
- def _set_encoding(self, new_encoding):
- if new_encoding != None:
- if encoding.known_encoding(new_encoding) == False:
- raise InvalidValue("encoding", new_encoding)
- self._encoding = new_encoding
- if self._manipulate_encodings == True:
- encoding.set_IO_stream_encodings(self.encoding)
- if hasattr(self, "rcs"):
- if self.rcs != None:
- self.rcs.encoding = self.encoding
- _encoding = setting_property("encoding",
- doc=
-"""The default input/output encoding to use (e.g. "utf-8").
-Dont' set this attribute, set .encoding instead.""")
- encoding = property(_get_encoding, _set_encoding, doc=
-"""The default input/output encoding to use (e.g. "utf-8").""")
-
- def _get_rcs(self):
- return self._rcs
- def _set_rcs(self, new_rcs):
- if new_rcs == None:
- new_rcs = rcs.rcs_by_name("None")
- new_rcs.encoding = self.encoding
- self._rcs = new_rcs
- new_rcs.root(self.root)
- self.rcs_name = new_rcs.name
- _rcs = None
- rcs = property(_get_rcs, _set_rcs,
- doc="A revision control system (RCS) instance")
- rcs_name = setting_property("rcs_name",
- ("None", "bzr", "git", "Arch", "hg"),
- doc=
-"""The name of the current RCS. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .rcs instead, and
-.rcs_name will be automatically adjusted.""")
-
-
- def _get_user_id(self):
- if self._user_id == None and self.rcs != None:
- self._user_id = self.rcs.get_user_id()
- return self._user_id
- def _set_user_id(self, user_id):
- if self.rcs != None:
- self.rcs.user_id = user_id
- self._user_id = user_id
- user_id = property(_get_user_id, _set_user_id, doc=
-"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note
-that the Arch RCS backend *enforces* ids with this format.""")
- _user_id = setting_property("user_id", doc=
-"""The user's prefered name. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .user_id instead,
-and ._user_id will be automatically adjusted. This setting is
-only saved if ._save_user_id == True""")
-
-
- target = setting_property("target",
- doc="The current project development target")
-
- def save_user_id(self, user_id=None):
- if user_id == None:
- user_id = self.user_id
- self._save_user_id = True
- self.user_id = user_id
-
def get_path(self, *args):
my_dir = os.path.join(self.root, ".be")
if len(args) == 0:
@@ -292,7 +304,6 @@ only saved if ._save_user_id == True""")
if allow_rcs_init == True:
new_rcs = rcs.installed_rcs()
new_rcs.init(self.root)
- self.rcs = new_rcs
return new_rcs
def load(self):
@@ -303,14 +314,10 @@ only saved if ._save_user_id == True""")
else:
if not os.path.exists(self.get_path()):
raise NoBugDir(self.get_path())
- self.settings = self._get_settings(self.get_path("settings"))
+ self.load_settings()
self.rcs = rcs.rcs_by_name(self.rcs_name)
- self.encoding = self.encoding # setup encoding, IO_stream_encoding...
- if self.settings.get("user_id") != None:
- self.save_user_id() # was a user name in the settings file
-
- self._bug_map_gen()
+ self._setup_encoding(self.encoding)
def load_all_bugs(self):
"Warning: this could take a while."
@@ -321,45 +328,31 @@ only saved if ._save_user_id == True""")
def save(self):
self.rcs.mkdir(self.get_path())
self.set_version()
- self._save_settings(self.get_path("settings"), self.settings)
+ self.save_settings()
self.rcs.mkdir(self.get_path("bugs"))
for bug in self:
bug.save()
+ def load_settings(self):
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._setup_saved_settings()
+
def _get_settings(self, settings_path):
- if self.rcs_name == None:
- # Use a temporary RCS to loading settings the first time
- RCS = rcs.rcs_by_name("None")
- RCS.root(self.root)
- else:
- RCS = self.rcs
-
- allow_no_rcs = not RCS.path_in_root(settings_path)
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
# allow_no_rcs=True should only be for the special case of
# configuring duplicate bugdir settings
try:
- settings = mapfile.map_load(RCS, settings_path, allow_no_rcs)
+ settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
except rcs.NoSuchFile:
settings = {"rcs_name": "None"}
return settings
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self._save_settings(self.get_path("settings"), settings)
+
def _save_settings(self, settings_path, settings):
- this_dir_path = os.path.realpath(self.get_path("settings"))
- if os.path.realpath(settings_path) == this_dir_path:
- if not os.path.exists(self.get_path()):
- # don't save settings until the bug directory has been
- # initialized. this initialization happens the first time
- # a bug directory is saved (BugDir.save()). If the user
- # is just working with a BugDir in memory, we don't want
- # to go cluttering up his file system with settings files.
- return
- if self._save_user_id == False:
- if "user_id" in settings:
- settings = copy.copy(settings)
- del settings["user_id"]
- if settings.get("encoding") == encoding.get_encoding():
- del settings["encoding"] # don't duplicate system default
allow_no_rcs = not self.rcs.path_in_root(settings_path)
# allow_no_rcs=True should only be for the special case of
# configuring duplicate bugdir settings
@@ -383,15 +376,6 @@ only saved if ._save_user_id == True""")
def remove_duplicate_bugdir(self):
self.rcs.remove_duplicate_repo()
- def _bug_map_gen(self):
- map = {}
- for bug in self:
- map[bug.uuid] = bug
- for uuid in self.list_uuids():
- if uuid not in map:
- map[uuid] = None
- self._bug_map = map
-
def list_uuids(self):
uuids = []
if os.path.exists(self.get_path()):
@@ -409,6 +393,7 @@ only saved if ._save_user_id == True""")
def _clear_bugs(self):
while len(self) > 0:
self.pop()
+ self._bug_map_gen()
def _load_bug(self, uuid):
bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
@@ -567,6 +552,37 @@ class BugDirTestCase(unittest.TestCase):
self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
self.bugdir.save()
self.versionTest()
+ def testComments(self):
+ self.bugdir.new_bug(uuid="a", summary="Ant")
+ bug = self.bugdir.bug_from_uuid("a")
+ comm = bug.comment_root
+ rep = comm.new_reply("Ants are small.")
+ rep.new_reply("And they have six legs.")
+ self.bugdir.save()
+ self.bugdir._clear_bugs()
+ bug = self.bugdir.bug_from_uuid("a")
+ bug.load_comments()
+ self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
+ for index,comment in enumerate(bug.comments()):
+ if index == 0:
+ repLoaded = comment
+ self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
+ self.failUnless(comment.sync_with_disk == True,
+ comment.sync_with_disk)
+ #load_settings()
+ self.failUnless(comment.content_type == "text/plain",
+ comment.content_type)
+ self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
+ repLoaded.settings)
+ self.failUnless(repLoaded.body == "Ants are small.",
+ repLoaded.body)
+ elif index == 1:
+ self.failUnless(comment.in_reply_to == repLoaded.uuid,
+ repLoaded.uuid)
+ self.failUnless(comment.body == "And they have six legs.",
+ comment.body)
+ else:
+ self.failIf(True, "Invalid comment: %d\n%s" % (index, comment))
unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()])
diff --git a/libbe/comment.py b/libbe/comment.py
index ab0973d..6c0e5c0 100644
--- a/libbe/comment.py
+++ b/libbe/comment.py
@@ -23,10 +23,23 @@ import textwrap
import doctest
from beuuid import uuid_gen
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, cached_property, \
+ primed_property, change_hook_property, settings_property
+import settings_object
import mapfile
from tree import Tree
import utility
+
+class InvalidShortname(KeyError):
+ def __init__(self, shortname, shortnames):
+ msg = "Invalid shortname %s\n%s" % (shortname, shortnames)
+ KeyError.__init__(self, msg)
+ self.shortname = shortname
+ self.shortnames = shortnames
+
+
INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!"
def _list_to_root(comments, bug):
@@ -45,7 +58,8 @@ def _list_to_root(comments, bug):
assert comment.uuid != None
uuid_map[comment.uuid] = comment
for comm in comments:
- if comm.in_reply_to == None:
+ rep = comm.in_reply_to
+ if rep == None or rep == settings_object.EMPTY or rep == bug.uuid:
root_comments.append(comm)
else:
parentUUID = comm.in_reply_to
@@ -55,7 +69,11 @@ def _list_to_root(comments, bug):
dummy_root.extend(root_comments)
return dummy_root
-def loadComments(bug):
+def loadComments(bug, load_full=False):
+ """
+ Set load_full=True when you want to load the comment completely
+ from disk *now*, rather than waiting and lazy loading as required.
+ """
path = bug.get_path("comments")
if not os.path.isdir(path):
return Comment(bug, uuid=INVALID_UUID)
@@ -64,6 +82,9 @@ def loadComments(bug):
if uuid.startswith('.'):
continue
comm = Comment(bug, uuid, from_disk=True)
+ if load_full == True:
+ comm.load_settings()
+ dummy = comm.body # force the body to load
comments.append(comm)
return _list_to_root(comments, bug)
@@ -73,15 +94,84 @@ def saveComments(bug):
for comment in bug.comment_root.traverse():
comment.save()
-class InvalidShortname(KeyError):
- def __init__(self, shortname, shortnames):
- msg = "Invalid shortname %s\n%s" % (shortname, shortnames)
- KeyError.__init__(self, msg)
- self.shortname = shortname
- self.shortnames = shortnames
+class Comment(Tree, settings_object.SavedSettingsObject):
+ """
+ >>> c = Comment()
+ >>> c.uuid != None
+ True
+ >>> c.uuid = "some-UUID"
+ >>> print c.content_type
+ text/plain
+ """
+
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+
+ @_versioned_property(name="From",
+ doc="The author of the comment")
+ def From(): return {}
+
+ @_versioned_property(name="In-reply-to",
+ doc="UUID for parent comment or bug")
+ def in_reply_to(): return {}
+
+ @_versioned_property(name="Content-type",
+ doc="Mime type for comment body",
+ default="text/plain",
+ require_save=True)
+ def content_type(): return {}
+
+ @_versioned_property(name="Date",
+ doc="An RFC 2822 timestamp for comment creation")
+ def time_string(): return {}
+
+ def _get_time(self):
+ if self.time_string == None:
+ return None
+ return utility.str_to_time(self.time_string)
+ def _set_time(self, value):
+ self.time_string = utility.time_to_str(value)
+ time = property(fget=_get_time,
+ fset=_set_time,
+ doc="An integer version of .time_string")
+
+ def _get_comment_body(self):
+ if self.rcs != None and self.sync_with_disk == True:
+ import rcs
+ return self.rcs.get_file_contents(self.get_path("body"))
+ def _set_comment_body(self, value, force=False):
+ if (self.rcs != None and self.sync_with_disk == True) or force==True:
+ assert value != None, "Can't save empty comment"
+ self.rcs.set_file_contents(self.get_path("body"), value)
+
+ @Property
+ @change_hook_property(hook=_set_comment_body)
+ @cached_property(generator=_get_comment_body)
+ @local_property("body")
+ @doc_property(doc="The meat of the comment")
+ def body(): return {}
+
+ def _get_rcs(self):
+ if hasattr(self.bug, "rcs"):
+ return self.bug.rcs
+
+ @Property
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
-class Comment(Tree):
def __init__(self, bug=None, uuid=None, from_disk=False,
in_reply_to=None, body=None):
"""
@@ -97,26 +187,19 @@ class Comment(Tree):
in_reply_to should be the uuid string of the parent comment.
"""
Tree.__init__(self)
+ settings_object.SavedSettingsObject.__init__(self)
self.bug = bug
- if bug != None:
- self.rcs = bug.rcs
- else:
- self.rcs = None
+ self.uuid = uuid
if from_disk == True:
- self.uuid = uuid
- self.load()
+ self.sync_with_disk = True
else:
- if uuid != None:
- self.uuid = uuid
- else:
+ self.sync_with_disk = False
+ if uuid == None:
self.uuid = uuid_gen()
- self.time = time.time()
+ self.time = int(time.time()) # only save to second precision
if self.rcs != None:
self.From = self.rcs.get_user_id()
- else:
- self.From = None
self.in_reply_to = in_reply_to
- self.content_type = "text/plain"
self.body = body
def traverse(self, *args, **kwargs):
@@ -126,22 +209,17 @@ class Comment(Tree):
continue
yield comment
- def _clean_string(self, value):
- """
- >>> comm = Comment()
- >>> comm._clean_string(None)
- ''
- >>> comm._clean_string("abc")
- 'abc'
- """
- if value == None:
+ def _setting_attr_string(self, setting):
+ value = getattr(self, setting)
+ if value == settings_object.EMPTY:
return ""
- return value
+ else:
+ return str(value)
def string(self, indent=0, shortname=None):
"""
>>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
- >>> comm.time = utility.str_to_time("Thu, 01 Jan 1970 00:00:00 +0000")
+ >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000"
>>> print comm.string(indent=2, shortname="com-1")
--------- Comment ---------
Name: com-1
@@ -157,12 +235,12 @@ class Comment(Tree):
lines = []
lines.append("--------- Comment ---------")
lines.append("Name: %s" % shortname)
- lines.append("From: %s" % self._clean_string(self.From))
- lines.append("Date: %s" % utility.time_to_str(self.time))
+ lines.append("From: %s" % (self._setting_attr_string("From")))
+ lines.append("Date: %s" % self.time_string)
lines.append("")
- #lines.append(textwrap.fill(self._clean_string(self.body),
+ #lines.append(textwrap.fill(self.body or "",
# width=(79-indent)))
- lines.extend(self._clean_string(self.body).splitlines())
+ lines.extend((self.body or "").splitlines())
# some comments shouldn't be wrapped...
istring = ' '*indent
@@ -173,7 +251,7 @@ class Comment(Tree):
"""
>>> comm = Comment(bug=None, body="Some insightful remarks")
>>> comm.uuid = "com-1"
- >>> comm.time = utility.str_to_time("Thu, 20 Nov 2008 15:55:11 +0000")
+ >>> comm.time_string = "Thu, 20 Nov 2008 15:55:11 +0000"
>>> comm.From = "Jane Doe <jdoe@example.com>"
>>> print comm
--------- Comment ---------
@@ -192,36 +270,22 @@ class Comment(Tree):
assert name in ["values", "body"]
return os.path.join(my_dir, name)
- def load(self):
- map = mapfile.map_load(self.rcs, self.get_path("values"))
- self.time = utility.str_to_time(map["Date"])
- self.From = map["From"]
- self.in_reply_to = map.get("In-reply-to")
- self.content_type = map.get("Content-type", "text/plain")
- self.body = self.rcs.get_file_contents(self.get_path("body"))
+ def load_settings(self):
+ self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ self._setup_saved_settings()
- def save(self):
- assert self.rcs != None
- map_file = {"Date": utility.time_to_str(self.time)}
- self._add_headers(map_file, ("From", "in_reply_to", "content_type"))
+ def save_settings(self):
self.rcs.mkdir(self.get_path())
- mapfile.map_save(self.rcs, self.get_path("values"), map_file)
- self.rcs.set_file_contents(self.get_path("body"), self.body)
-
- def _add_headers(self, map, names):
- map_names = {}
- for name in names:
- map_names[name] = self._pyname_to_header(name)
- self._add_attrs(map, map_names)
-
- def _pyname_to_header(self, name):
- return name.capitalize().replace('_', '-')
-
- def _add_attrs(self, map, map_names):
- for name in map_names.keys():
- value = getattr(self, name)
- if value is not None:
- map[map_names[name]] = value
+ path = self.get_path("values")
+ mapfile.map_save(self.rcs, path, self._get_saved_settings())
+
+ def save(self):
+ assert self.body != None, "Can't save blank comment"
+ #if self.in_reply_to == None:
+ # raise Exception, str(self)+'\n'+str(self.settings)+'\n'+str(self._settings_loaded)
+ #assert self.in_reply_to != None, "Comment must be a reply to something"
+ self.save_settings()
+ self._set_comment_body(self.body, force=True)
def remove(self):
for comment in self.traverse():
@@ -232,15 +296,19 @@ class Comment(Tree):
if self.uuid != INVALID_UUID:
reply.in_reply_to = self.uuid
self.append(reply)
+ #raise Exception, "adding reply \n%s\n%s" % (self, reply)
def new_reply(self, body=None):
"""
>>> comm = Comment(bug=None, body="Some insightful remarks")
>>> repA = comm.new_reply("Critique original comment")
>>> repB = repA.new_reply("Begin flamewar :p")
+ >>> repB.in_reply_to == repA.uuid
+ True
"""
reply = Comment(self.bug, body=body)
self.add_reply(reply)
+ #raise Exception, "new reply added (%s),\n%s\n%s\n\t--%s--" % (body, self, reply, reply.in_reply_to)
return reply
def string_thread(self, name_map={}, indent=0, flatten=True,
diff --git a/libbe/properties.py b/libbe/properties.py
new file mode 100644
index 0000000..176e898
--- /dev/null
+++ b/libbe/properties.py
@@ -0,0 +1,477 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+This module provides a series of useful decorators for defining
+various types of properties. For example usage, consider the
+unittests at the end of the module.
+
+See
+ http://www.python.org/dev/peps/pep-0318/
+and
+ http://www.phyast.pitt.edu/~micheles/python/documentation.html
+for more information on decorators.
+"""
+
+import unittest
+
+class ValueCheckError (ValueError):
+ def __init__(self, name, value, allowed):
+ msg = "%s not in %s for %s" % (value, allowed, name)
+ ValueError.__init__(self, msg)
+ self.name = name
+ self.value = value
+ self.allowed = allowed
+
+def Property(funcs):
+ """
+ End a chain of property decorators, returning a property.
+ """
+ args = {}
+ args["fget"] = funcs.get("fget", None)
+ args["fset"] = funcs.get("fset", None)
+ args["fdel"] = funcs.get("fdel", None)
+ args["doc"] = funcs.get("doc", None)
+
+ #print "Creating a property with"
+ #for key, val in args.items(): print key, value
+ return property(**args)
+
+def doc_property(doc=None):
+ """
+ Add a docstring to a chain of property decorators.
+ """
+ def decorator(funcs=None):
+ """
+ Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...}
+ or a function fn() returning such a dict.
+ """
+ if hasattr(funcs, "__call__"):
+ funcs = funcs() # convert from function-arg to dict
+ funcs["doc"] = doc
+ return funcs
+ return decorator
+
+def local_property(name):
+ """
+ Define get/set access to per-parent-instance local storage. Uses
+ ._<name>_value to store the value for a particular owner instance.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = getattr(self, "_%s_value" % name, None)
+ return value
+ def _fset(self, value):
+ setattr(self, "_%s_value" % name, value)
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+def settings_property(name):
+ """
+ Similar to local_property, except where local_property stores the
+ value in instance._<name>_value, settings_property stores the
+ value in instance.settings[name].
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = self.settings.get(name, None)
+ return value
+ def _fset(self, value):
+ self.settings[name] = value
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+def defaulting_property(default=None, null=None):
+ """
+ Define a default value for get access to a property.
+ If the stored value is null, then default is returned.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ def _fget(self):
+ value = fget(self)
+ if value == null:
+ return default
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def fn_checked_property(value_allowed_fn):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ return value
+ def _fset(self, value):
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def checked_property(allowed=[]):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ return value
+ def _fset(self, value):
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def cached_property(generator, initVal=None):
+ """
+ Allow caching of values generated by generator(instance), where
+ instance is the instance to which this property belongs. Uses
+ ._<name>_cache to store a cache flag for a particular owner
+ instance.
+
+ When the cache flag is True or missing and the stored value is
+ initVal, the first fget call triggers the generator function,
+ whiose output is stored in _<name>_cached_value. That and
+ subsequent calls to fget will return this cached value.
+
+ If the input value is no longer initVal (e.g. a value has been
+ loaded from disk or set with fset), that value overrides any
+ cached value, and this property has no effect.
+
+ When the cache flag is False and the stored value is initVal, the
+ generator is not cached, but is called on every fget.
+
+ The cache flag is missing on initialization. Particular instances
+ may override by setting their own flag.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ cache = getattr(self, "_%s_cache" % name, True)
+ value = fget(self)
+ if cache == True:
+ if value == initVal:
+ if hasattr(self, "_%s_cached_value" % name):
+ value = getattr(self, "_%s_cached_value" % name)
+ else:
+ value = generator(self)
+ setattr(self, "_%s_cached_value" % name, value)
+ else:
+ if value == initVal:
+ value = generator(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def primed_property(primer, initVal=None):
+ """
+ Just like a generator_property, except that instead of returning a
+ new value and running fset to cache it, the primer performs some
+ background manipulation (e.g. loads data into instance.settings)
+ such that a _second_ pass through fget succeeds.
+
+ The 'cache' flag becomes a 'prime' flag, with priming taking place
+ whenever ._<name>_prime is True, or is False or missing and
+ value == initVal.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ prime = getattr(self, "_%s_prime" % name, False)
+ if prime == False:
+ value = fget(self)
+ if prime == True or (prime == False and value == initVal):
+ primer(self)
+ value = fget(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def change_hook_property(hook):
+ """
+ Call the function hook(instance, old_value, new_value) whenever a
+ value different from the current value is set (instance is a a
+ reference to the class instance to which this property belongs).
+ This is useful for saving changes to disk, etc.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fset(self, value):
+ old_value = fget(self)
+ if value != old_value:
+ hook(self, old_value, value)
+ fset(self, value)
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+
+class DecoratorTests(unittest.TestCase):
+ def testLocalDoc(self):
+ class Test(object):
+ @Property
+ @doc_property("A fancy property")
+ def x():
+ return {}
+ self.failUnless(Test.x.__doc__ == "A fancy property",
+ Test.x.__doc__)
+ def testLocalProperty(self):
+ class Test(object):
+ @Property
+ @local_property(name="LOCAL")
+ def x():
+ return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("_LOCAL_value" in dir(t), dir(t))
+ self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ def testSettingsProperty(self):
+ class Test(object):
+ @Property
+ @settings_property(name="attr")
+ def x():
+ return {}
+ def __init__(self):
+ self.settings = {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("attr" in t.settings, t.settings)
+ self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ def testDefaultingLocalProperty(self):
+ class Test(object):
+ @Property
+ @defaulting_property(default='y', null='x')
+ @local_property(name="DEFAULT")
+ def x(): return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'x'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'y'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'z'
+ self.failUnless(t.x == 'z', str(t.x))
+ def testCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testTwoCheckedLocalProperties(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="X")
+ def x(): return {}
+
+ @Property
+ @checked_property(allowed=['a', 'b', 'c'])
+ @local_property(name="A")
+ def a(): return {}
+ def __init__(self):
+ self._A_value = 'a'
+ self._X_value = 'x'
+ t = Test()
+ try:
+ t.x = 'a'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.x = 'x'
+ t.x = 'y'
+ t.x = 'z'
+ try:
+ t.a = 'x'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.a = 'a'
+ t.a = 'b'
+ t.a = 'c'
+ def testFnCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @fn_checked_property(lambda v : v in ['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testCachedLocalProperty(self):
+ class Gen(object):
+ def __init__(self):
+ self.i = 0
+ def __call__(self, owner):
+ self.i += 1
+ return self.i
+ class Test(object):
+ @Property
+ @cached_property(generator=Gen(), initVal=None)
+ @local_property(name="CACHED")
+ def x(): return {}
+ t = Test()
+ self.failIf("_CACHED_cache" in dir(t), getattr(t, "_CACHED_cache", None))
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ t.x = 8
+ self.failUnless(t.x == 8, t.x)
+ self.failUnless(t.x == 8, t.x)
+ t._CACHED_cache = False # Caching is off, but the stored value
+ val = t.x # is 8, not the initVal (None), so we
+ self.failUnless(val == 8, val) # get 8.
+ t._CACHED_value = None # Now we've set the stored value to None
+ val = t.x # so future calls to fget (like this)
+ self.failUnless(val == 2, val) # will call the generator every time...
+ val = t.x
+ self.failUnless(val == 3, val)
+ val = t.x
+ self.failUnless(val == 4, val)
+ t._CACHED_cache = True # We turn caching back on, and get
+ self.failUnless(t.x == 1, str(t.x)) # the original cached value.
+ del t._CACHED_cached_value # Removing that value forces a
+ self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
+ self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
+ self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
+ def testPrimedLocalProperty(self):
+ class Test(object):
+ def prime(self):
+ self.settings["PRIMED"] = "initialized"
+ @Property
+ @primed_property(primer=prime, initVal=None)
+ @settings_property(name="PRIMED")
+ def x(): return {}
+ def __init__(self):
+ self.settings={}
+ t = Test()
+ self.failIf("_PRIMED_prime" in dir(t), getattr(t, "_PRIMED_prime", None))
+ self.failUnless(t.x == "initialized", t.x)
+ t.x = 1
+ self.failUnless(t.x == 1, t.x)
+ t.x = None
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = True
+ t.x = 3
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = False
+ t.x = 3
+ self.failUnless(t.x == 3, t.x)
+ def testChangeHookLocalProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+
+ @Property
+ @change_hook_property(_hook)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 2
+ self.failUnless(t.old == 1, t.old)
+ self.failUnless(t.new == 2, t.new)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
+
diff --git a/libbe/settings_object.py b/libbe/settings_object.py
new file mode 100644
index 0000000..8b0ff47
--- /dev/null
+++ b/libbe/settings_object.py
@@ -0,0 +1,267 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+This module provides a base class implementing settings-dict based
+property storage useful for BE objects with saved properties
+(e.g. BugDir, Bug, Comment). For example usage, consider the
+unittests at the end of the module.
+"""
+
+import doctest
+import unittest
+
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+
+# Define an invalid value for our properties, distinct from None,
+# which shows that a property has been initialized but has no value.
+EMPTY = -1
+
+
+def prop_save_settings(self, old, new):
+ if self.sync_with_disk==True:
+ self.save_settings()
+def prop_load_settings(self):
+ if self.sync_with_disk==True and self._settings_loaded==False:
+ self.load_settings()
+ else:
+ self._setup_saved_settings(flag_as_loaded=False)
+
+def setting_name_to_attr_name(self, name):
+ """
+ Convert keys to the .settings dict into their associated
+ SavedSettingsObject attribute names.
+ >>> print setting_name_to_attr_name(None,"User-id")
+ user_id
+ """
+ return name.lower().replace('-', '_')
+
+def attr_name_to_setting_name(self, name):
+ """
+ The inverse of setting_name_to_attr_name.
+ >>> print attr_name_to_setting_name(None, "user_id")
+ User-id
+ """
+ return name.capitalize().replace('_', '-')
+
+def versioned_property(name, doc,
+ default=None, generator=None,
+ change_hook=prop_save_settings,
+ primer=prop_load_settings,
+ allowed=None, check_fn=None,
+ settings_properties=[],
+ required_saved_properties=[],
+ require_save=False):
+ """
+ Combine the common decorators in a single function.
+
+ Use zero or one (but not both) of default or generator, since a
+ working default will keep the generator from functioning. Use the
+ default if you know what you want the default value to be at
+ 'coding time'. Use the generator if you can write a function to
+ determine a valid default at run time.
+
+ allowed and check_fn have a similar relationship, although you can
+ use both of these if you want. allowed compares the proposed
+ value against a list determined at 'coding time' and check_fn
+ allows more flexible comparisons to take place at run time.
+
+ Set require_save to True if you want to save the default/generated
+ value for a property, to protect against future changes. E.g., we
+ currently expect all comments to be 'text/plain' but in the future
+ we may want to default to 'text/html'. If we don't want the old
+ comments to be interpreted as 'text/html', we would require that
+ the content type be saved.
+
+ change_hook, primer, settings_properties, and
+ required_saved_properties are only options to get their defaults
+ into our local scope. Don't mess with them.
+ """
+ settings_properties.append(name)
+ if require_save == True:
+ required_saved_properties.append(name)
+ def decorator(funcs):
+ fulldoc = doc
+ if default != None:
+ defaulting = defaulting_property(default=default, null=EMPTY)
+ fulldoc += "\n\nThis property defaults to %s" % default
+ if generator != None:
+ cached = cached_property(generator=generator, initVal=EMPTY)
+ fulldoc += "\n\nThis property is generated with %s" % generator
+ if check_fn != None:
+ fn_checked = fn_checked_property(value_allowed_fn=check_fn)
+ fulldoc += "\n\nThis property is checked with %s" % check_fn
+ if allowed != None:
+ checked = checked_property(allowed=allowed)
+ fulldoc += "\n\nThe allowed values for this property are: %s." \
+ % (', '.join(allowed))
+ hooked = change_hook_property(hook=change_hook)
+ primed = primed_property(primer=primer)
+ settings = settings_property(name=name)
+ docp = doc_property(doc=fulldoc)
+ deco = hooked(primed(settings(docp(funcs))))
+ if default != None:
+ deco = defaulting(deco)
+ if generator != None:
+ deco = cached(deco)
+ if default != None:
+ deco = defaulting(deco)
+ if allowed != None:
+ deco = checked(deco)
+ if check_fn != None:
+ deco = fn_checked(deco)
+ return Property(deco)
+ return decorator
+
+class SavedSettingsObject(object):
+
+ # Keep a list of properties that may be stored in the .settings dict.
+ #settings_properties = []
+
+ # A list of properties that we save to disk, even if they were
+ # never set (in which case we save the default value). This
+ # protects against future changes in default values.
+ #required_saved_properties = []
+
+ _setting_name_to_attr_name = setting_name_to_attr_name
+ _attr_name_to_setting_name = attr_name_to_setting_name
+
+ def __init__(self):
+ self._settings_loaded = False
+ self.sync_with_disk = False
+ self.settings = {}
+
+ def load_settings(self):
+ """Load the settings from disk."""
+ # Override. Must call ._setup_saved_settings() after loading.
+ self.settings = {}
+ self._setup_saved_settings()
+
+ def _setup_saved_settings(self, flag_as_loaded=True):
+ """To be run after setting self.settings up from disk."""
+ for property in self.settings_properties:
+ if property not in self.settings:
+ self.settings[property] = EMPTY
+ elif self.settings[property] == None:
+ self.settings[property] = EMPTY
+ if flag_as_loaded == True:
+ self._settings_loaded = True
+
+ def save_settings(self):
+ """Load the settings from disk."""
+ # Override. Should save the dict output of ._get_saved_settings()
+ settings = self._get_saved_settings()
+ pass # write settings to disk....
+
+ def _get_saved_settings(self):
+ settings = {}
+ for k,v in self.settings.items():
+ if v != None and v != EMPTY:
+ settings[k] = v
+ for k in self.required_saved_properties:
+ settings[k] = getattr(self, self._setting_name_to_attr_name(k))
+ return settings
+
+ def clear_cached_setting(self, setting=None):
+ "If setting=None, clear *all* cached settings"
+ if setting != None:
+ if hasattr(self, "_%s_cached_value" % setting):
+ delattr(self, "_%s_cached_value" % setting)
+ else:
+ for setting in settings_properties:
+ self.clear_cached_setting(setting)
+
+
+class SavedSettingsObjectTests(unittest.TestCase):
+ def testDefaultingProperty(self):
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ t.load_settings()
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == {}, t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t.content_type == "text/html",
+ t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/html",
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testRequiredDefaultingProperty(self):
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testClassVersionedPropertyDefinition(self):
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return versioned_property(**kwargs)
+ @_versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
+ t._get_saved_settings())
+
+unitsuite=unittest.TestLoader().loadTestsFromTestCase(SavedSettingsObjectTests)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])