aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage/util
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/storage/util')
-rw-r--r--libbe/storage/util/__init__.py0
-rw-r--r--libbe/storage/util/config.py114
-rw-r--r--libbe/storage/util/mapfile.py146
-rw-r--r--libbe/storage/util/properties.py666
-rw-r--r--libbe/storage/util/settings_object.py617
-rw-r--r--libbe/storage/util/upgrade.py331
6 files changed, 1874 insertions, 0 deletions
diff --git a/libbe/storage/util/__init__.py b/libbe/storage/util/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/libbe/storage/util/__init__.py
diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py
new file mode 100644
index 0000000..724d2d3
--- /dev/null
+++ b/libbe/storage/util/config.py
@@ -0,0 +1,114 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Create, save, and load the per-user config file at :func:`path`.
+"""
+
+import ConfigParser
+import codecs
+import os.path
+
+import libbe
+import libbe.util.encoding
+if libbe.TESTING == True:
+ import doctest
+
+
+default_encoding = libbe.util.encoding.get_filesystem_encoding()
+"""Default filesystem encoding.
+
+Initialized with :func:`libbe.util.encoding.get_filesystem_encoding`.
+"""
+
+def path():
+ """Return the path to the per-user config file.
+ """
+ return os.path.expanduser("~/.bugs_everywhere")
+
+def set_val(name, value, section="DEFAULT", encoding=None):
+ """Set a value in the per-user config file.
+
+ Parameters
+ ----------
+ name : str
+ The name of the value to set.
+ value : str or None
+ The new value to set (or None to delete the value).
+ section : str
+ The section to store the name/value in.
+ encoding : str
+ The config file's encoding, defaults to :data:`default_encoding`.
+ """
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ if os.path.exists(path()) == False: # touch file or config
+ open(path(), 'w').close() # read chokes on missing file
+ f = codecs.open(path(), 'r', encoding)
+ config.readfp(f, path())
+ f.close()
+ if value is not None:
+ config.set(section, name, value)
+ else:
+ config.remove_option(section, name)
+ f = codecs.open(path(), 'w', encoding)
+ config.write(f)
+ f.close()
+
+def get_val(name, section="DEFAULT", default=None, encoding=None):
+ """Get a value from the per-user config file
+
+ Parameters
+ ----------
+ name : str
+ The name of the value to set.
+ section : str
+ The section to store the name/value in.
+ default :
+ The value to return if `name` is not set.
+ encoding : str
+ The config file's encoding, defaults to :data:`default_encoding`.
+
+ Examples
+ --------
+
+ >>> get_val("junk") is None
+ True
+ >>> set_val("junk", "random")
+ >>> get_val("junk")
+ u'random'
+ >>> set_val("junk", None)
+ >>> get_val("junk") is None
+ True
+ """
+ if os.path.exists(path()):
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ f = codecs.open(path(), 'r', encoding)
+ config.readfp(f, path())
+ f.close()
+ try:
+ return config.get(section, name)
+ except ConfigParser.NoOptionError:
+ return default
+ else:
+ return default
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py
new file mode 100644
index 0000000..55863d7
--- /dev/null
+++ b/libbe/storage/util/mapfile.py
@@ -0,0 +1,146 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Serializing and deserializing dictionaries of parameters.
+
+The serialized "mapfiles" should be clear, flat-text strings, and allow
+easy merging of independent/conflicting changes.
+"""
+
+import errno
+import os.path
+import types
+import yaml
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+
+class IllegalKey(Exception):
+ def __init__(self, key):
+ Exception.__init__(self, 'Illegal key "%s"' % key)
+ self.key = key
+
+class IllegalValue(Exception):
+ def __init__(self, value):
+ Exception.__init__(self, 'Illegal value "%s"' % value)
+ self.value = value
+
+class InvalidMapfileContents(Exception):
+ def __init__(self, contents):
+ Exception.__init__(self, 'Invalid YAML contents')
+ self.contents = contents
+
+def generate(map):
+ """Generate a YAML mapfile content string.
+
+ Examples
+ --------
+
+ >>> generate({'q':'p'})
+ 'q: p\\n\\n'
+ >>> generate({'q':u'Fran\u00e7ais'})
+ 'q: Fran\\xc3\\xa7ais\\n\\n'
+ >>> generate({'q':u'hello'})
+ 'q: hello\\n\\n'
+ >>> generate({'q=':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q="
+ >>> generate({'q:':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q:"
+ >>> generate({'q\\n':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q\\n"
+ >>> generate({'':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ""
+ >>> generate({'>q':'p'})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ">q"
+ >>> generate({'q':'p\\n'})
+ Traceback (most recent call last):
+ IllegalValue: Illegal value "p\\n"
+
+ See Also
+ --------
+ parse : inverse
+ """
+ keys = map.keys()
+ keys.sort()
+ for key in keys:
+ try:
+ assert not key.startswith('>')
+ assert('\n' not in key)
+ assert('=' not in key)
+ assert(':' not in key)
+ assert(len(key) > 0)
+ except AssertionError:
+ raise IllegalKey(unicode(key).encode('unicode_escape'))
+ if '\n' in map[key]:
+ raise IllegalValue(unicode(map[key]).encode('unicode_escape'))
+
+ lines = []
+ for key in keys:
+ lines.append(yaml.safe_dump({key: map[key]},
+ default_flow_style=False,
+ allow_unicode=True))
+ lines.append('')
+ return '\n'.join(lines)
+
+def parse(contents):
+ """Parse a YAML mapfile string.
+
+ Examples
+ --------
+
+ >>> parse('q: p\\n\\n')['q']
+ 'p'
+ >>> parse('q: \\'p\\'\\n\\n')['q']
+ 'p'
+ >>> contents = generate({'a':'b', 'c':'d', 'e':'f'})
+ >>> dict = parse(contents)
+ >>> dict['a']
+ 'b'
+ >>> dict['c']
+ 'd'
+ >>> dict['e']
+ 'f'
+ >>> contents = generate({'q':u'Fran\u00e7ais'})
+ >>> dict = parse(contents)
+ >>> dict['q']
+ u'Fran\\xe7ais'
+ >>> dict = parse('a!')
+ Traceback (most recent call last):
+ ...
+ InvalidMapfileContents: Invalid YAML contents
+
+ See Also
+ --------
+ generate : inverse
+
+ """
+ c = yaml.load(contents)
+ if type(c) == types.StringType:
+ raise InvalidMapfileContents(
+ 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents)
+ return c or {}
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py
new file mode 100644
index 0000000..b5681b1
--- /dev/null
+++ b/libbe/storage/util/properties.py
@@ -0,0 +1,666 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Provides a series of useful decorators for defining various types
+of properties.
+
+For example usage, consider the unittests at the end of the module.
+
+Notes
+-----
+
+See `PEP 318` and Michele Simionato's `decorator documentation` for
+more information on decorators.
+
+.. _PEP 318: http://www.python.org/dev/peps/pep-0318/
+.. _decorator documentation: http://www.phyast.pitt.edu/~micheles/python/documentation.html
+
+See Also
+--------
+:mod:`libbe.storage.util.settings_object` : bundle properties into a convenient package
+
+"""
+
+import copy
+import types
+
+import libbe
+if libbe.TESTING == True:
+ import unittest
+
+
+class ValueCheckError (ValueError):
+ def __init__(self, name, value, allowed):
+ action = "in" # some list of allowed values
+ if type(allowed) == types.FunctionType:
+ action = "allowed by" # some allowed-value check function
+ msg = "%s not %s %s for %s" % (value, action, allowed, name)
+ ValueError.__init__(self, msg)
+ self.name = name
+ self.value = value
+ self.allowed = allowed
+
+def Property(funcs):
+ """
+ End a chain of property decorators, returning a property.
+ """
+ args = {}
+ args["fget"] = funcs.get("fget", None)
+ args["fset"] = funcs.get("fset", None)
+ args["fdel"] = funcs.get("fdel", None)
+ args["doc"] = funcs.get("doc", None)
+
+ #print "Creating a property with"
+ #for key, val in args.items(): print key, value
+ return property(**args)
+
+def doc_property(doc=None):
+ """
+ Add a docstring to a chain of property decorators.
+ """
+ def decorator(funcs=None):
+ """
+ Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...}
+ or a function fn() returning such a dict.
+ """
+ if hasattr(funcs, "__call__"):
+ funcs = funcs() # convert from function-arg to dict
+ funcs["doc"] = doc
+ return funcs
+ return decorator
+
+def local_property(name, null=None, mutable_null=False):
+ """
+ Define get/set access to per-parent-instance local storage. Uses
+ ._<name>_value to store the value for a particular owner instance.
+ If the ._<name>_value attribute does not exist, returns null.
+
+ If mutable_null == True, we only release deepcopies of the null to
+ the outside world.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ if mutable_null == True:
+ ret_null = copy.deepcopy(null)
+ else:
+ ret_null = null
+ value = getattr(self, "_%s_value" % name, ret_null)
+ return value
+ def _fset(self, value):
+ setattr(self, "_%s_value" % name, value)
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+def settings_property(name, null=None):
+ """
+ Similar to local_property, except where local_property stores the
+ value in instance._<name>_value, settings_property stores the
+ value in instance.settings[name].
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = self.settings.get(name, null)
+ return value
+ def _fset(self, value):
+ self.settings[name] = value
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+
+# Allow comparison and caching with _original_ values for mutables,
+# since
+#
+# >>> a = []
+# >>> b = a
+# >>> b.append(1)
+# >>> a
+# [1]
+# >>> a==b
+# True
+def _hash_mutable_value(value):
+ return repr(value)
+def _init_mutable_property_cache(self):
+ if not hasattr(self, "_mutable_property_cache_hash"):
+ # first call to _fget for any mutable property
+ self._mutable_property_cache_hash = {}
+ self._mutable_property_cache_copy = {}
+def _set_cached_mutable_property(self, cacher_name, property_name, value):
+ _init_mutable_property_cache(self)
+ self._mutable_property_cache_hash[(cacher_name, property_name)] = \
+ _hash_mutable_value(value)
+ self._mutable_property_cache_copy[(cacher_name, property_name)] = \
+ copy.deepcopy(value)
+def _get_cached_mutable_property(self, cacher_name, property_name, default=None):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_copy:
+ return default
+ return self._mutable_property_cache_copy[(cacher_name, property_name)]
+def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_hash:
+ _set_cached_mutable_property(self, cacher_name, property_name, default)
+ old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)]
+ return cmp(_hash_mutable_value(value), old_hash)
+
+
+def defaulting_property(default=None, null=None,
+ mutable_default=False):
+ """
+ Define a default value for get access to a property.
+ If the stored value is null, then default is returned.
+
+ If mutable_default == True, we only release deepcopies of the
+ default to the outside world.
+
+ null should never escape to the outside world, so don't worry
+ about it being a mutable.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value == null:
+ if mutable_default == True:
+ return copy.deepcopy(default)
+ else:
+ return default
+ return value
+ def _fset(self, value):
+ if value == default:
+ value = null
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def fn_checked_property(value_allowed_fn):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ return value
+ def _fset(self, value):
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def checked_property(allowed=[]):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ return value
+ def _fset(self, value):
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def cached_property(generator, initVal=None, mutable=False):
+ """
+ Allow caching of values generated by generator(instance), where
+ instance is the instance to which this property belongs. Uses
+ ._<name>_cache to store a cache flag for a particular owner
+ instance.
+
+ When the cache flag is True or missing and the stored value is
+ initVal, the first fget call triggers the generator function,
+ whose output is stored in _<name>_cached_value. That and
+ subsequent calls to fget will return this cached value.
+
+ If the input value is no longer initVal (e.g. a value has been
+ loaded from disk or set with fset), that value overrides any
+ cached value, and this property has no effect.
+
+ When the cache flag is False and the stored value is initVal, the
+ generator is not cached, but is called on every fget.
+
+ The cache flag is missing on initialization. Particular instances
+ may override by setting their own flag.
+
+ In the case that mutable == True, all caching is disabled and the
+ generator is called whenever the cached value would otherwise be
+ used.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ cache = getattr(self, "_%s_cache" % name, True)
+ value = fget(self)
+ if value == initVal:
+ if cache == True and mutable == False:
+ if hasattr(self, "_%s_cached_value" % name):
+ value = getattr(self, "_%s_cached_value" % name)
+ else:
+ value = generator(self)
+ setattr(self, "_%s_cached_value" % name, value)
+ else:
+ value = generator(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def primed_property(primer, initVal=None, unprimeableVal=None):
+ """
+ Just like a cached_property, except that instead of returning a
+ new value and running fset to cache it, the primer attempts some
+ background manipulation (e.g. loads data into instance.settings)
+ such that a _second_ pass through fget succeeds. If the second
+ pass doesn't succeed (e.g. no readable storage), we give up and
+ return unprimeableVal.
+
+ The 'cache' flag becomes a 'prime' flag, with priming taking place
+ whenever ._<name>_prime is True, or is False or missing and
+ value == initVal.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ prime = getattr(self, "_%s_prime" % name, False)
+ if prime == False:
+ value = fget(self)
+ if prime == True or (prime == False and value == initVal):
+ primer(self)
+ value = fget(self)
+ if prime == False and value == initVal:
+ return unprimeableVal
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def change_hook_property(hook, mutable=False, default=None):
+ """Call the function `hook` whenever a value different from the
+ current value is set.
+
+ This is useful for saving changes to disk, etc. This function is
+ called *after* the new value has been stored, allowing you to
+ change the stored value if you want.
+
+ In the case of mutables, things are slightly trickier. Because
+ the property-owning class has no way of knowing when the value
+ changes. We work around this by caching a private deepcopy of the
+ mutable value, and checking for changes whenever the property is
+ set (obviously) or retrieved (to check for external changes). So
+ long as you're conscientious about accessing the property after
+ making external modifications, mutability won't be a problem::
+
+ t.x.append(5) # external modification
+ t.x # dummy access notices change and triggers hook
+
+ See :class:`testChangeHookMutableProperty` for an example of the
+ expected behavior.
+
+ Parameters
+ ----------
+ hook : fn
+ `hook(instance, old_value, new_value)`, where `instance` is a
+ reference to the class instance to which this property belongs.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self, new_value=None, from_fset=False): # only used if mutable == True
+ if from_fset == True:
+ value = new_value # compare new value with cached
+ else:
+ value = fget(self) # compare current value with cached
+ if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0:
+ # there has been a change, cache new value
+ old_value = _get_cached_mutable_property(self, "change hook property", name, default)
+ _set_cached_mutable_property(self, "change hook property", name, value)
+ if from_fset == True: # return previously cached value
+ value = old_value
+ else: # the value changed while we weren't looking
+ hook(self, old_value, value)
+ return value
+ def _fset(self, value):
+ if mutable == True: # get cached previous value
+ old_value = _fget(self, new_value=value, from_fset=True)
+ else:
+ old_value = fget(self)
+ fset(self, value)
+ if value != old_value:
+ hook(self, old_value, value)
+ if mutable == True:
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+if libbe.TESTING == True:
+ class DecoratorTests(unittest.TestCase):
+ def testLocalDoc(self):
+ class Test(object):
+ @Property
+ @doc_property("A fancy property")
+ def x():
+ return {}
+ self.failUnless(Test.x.__doc__ == "A fancy property",
+ Test.x.__doc__)
+ def testLocalProperty(self):
+ class Test(object):
+ @Property
+ @local_property(name="LOCAL")
+ def x():
+ return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("_LOCAL_value" in dir(t), dir(t))
+ self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ def testSettingsProperty(self):
+ class Test(object):
+ @Property
+ @settings_property(name="attr")
+ def x():
+ return {}
+ def __init__(self):
+ self.settings = {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("attr" in t.settings, t.settings)
+ self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ def testDefaultingLocalProperty(self):
+ class Test(object):
+ @Property
+ @defaulting_property(default='y', null='x')
+ @local_property(name="DEFAULT", null=5)
+ def x(): return {}
+ t = Test()
+ self.failUnless(t.x == 5, str(t.x))
+ t.x = 'x'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'y'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'z'
+ self.failUnless(t.x == 'z', str(t.x))
+ t.x = 5
+ self.failUnless(t.x == 5, str(t.x))
+ def testCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testTwoCheckedLocalProperties(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="X")
+ def x(): return {}
+
+ @Property
+ @checked_property(allowed=['a', 'b', 'c'])
+ @local_property(name="A")
+ def a(): return {}
+ def __init__(self):
+ self._A_value = 'a'
+ self._X_value = 'x'
+ t = Test()
+ try:
+ t.x = 'a'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.x = 'x'
+ t.x = 'y'
+ t.x = 'z'
+ try:
+ t.a = 'x'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.a = 'a'
+ t.a = 'b'
+ t.a = 'c'
+ def testFnCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @fn_checked_property(lambda v : v in ['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testCachedLocalProperty(self):
+ class Gen(object):
+ def __init__(self):
+ self.i = 0
+ def __call__(self, owner):
+ self.i += 1
+ return self.i
+ class Test(object):
+ @Property
+ @cached_property(generator=Gen(), initVal=None)
+ @local_property(name="CACHED")
+ def x(): return {}
+ t = Test()
+ self.failIf("_CACHED_cache" in dir(t),
+ getattr(t, "_CACHED_cache", None))
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ t.x = 8
+ self.failUnless(t.x == 8, t.x)
+ self.failUnless(t.x == 8, t.x)
+ t._CACHED_cache = False # Caching is off, but the stored value
+ val = t.x # is 8, not the initVal (None), so we
+ self.failUnless(val == 8, val) # get 8.
+ t._CACHED_value = None # Now we've set the stored value to None
+ val = t.x # so future calls to fget (like this)
+ self.failUnless(val == 2, val) # will call the generator every time...
+ val = t.x
+ self.failUnless(val == 3, val)
+ val = t.x
+ self.failUnless(val == 4, val)
+ t._CACHED_cache = True # We turn caching back on, and get
+ self.failUnless(t.x == 1, str(t.x)) # the original cached value.
+ del t._CACHED_cached_value # Removing that value forces a
+ self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
+ self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
+ self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
+ def testPrimedLocalProperty(self):
+ class Test(object):
+ def prime(self):
+ self.settings["PRIMED"] = self.primeVal
+ @Property
+ @primed_property(primer=prime, initVal=None, unprimeableVal=2)
+ @settings_property(name="PRIMED")
+ def x(): return {}
+ def __init__(self):
+ self.settings={}
+ self.primeVal = "initialized"
+ t = Test()
+ self.failIf("_PRIMED_prime" in dir(t),
+ getattr(t, "_PRIMED_prime", None))
+ self.failUnless(t.x == "initialized", t.x)
+ t.x = 1
+ self.failUnless(t.x == 1, t.x)
+ t.x = None
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = True
+ t.x = 3
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = False
+ t.x = 3
+ self.failUnless(t.x == 3, t.x)
+ # test unprimableVal
+ t.x = None
+ t.primeVal = None
+ self.failUnless(t.x == 2, t.x)
+ def testChangeHookLocalProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+
+ @Property
+ @change_hook_property(_hook)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 2
+ self.failUnless(t.old == 1, t.old)
+ self.failUnless(t.new == 2, t.new)
+ def testChangeHookMutableProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+ self.hook_calls += 1
+
+ @Property
+ @change_hook_property(_hook, mutable=True)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.hook_calls = 0
+ t.x = []
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 1, t.hook_calls)
+ a = t.x
+ a.append(5)
+ t.x = a
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 2, t.hook_calls)
+ t.x = []
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # now append without reassigning. this doesn't trigger the
+ # change, since we don't ever set t.x, only get it and mess
+ # with it. It does, however, update our t.new, since t.new =
+ # t.x and is not a static copy.
+ t.x.append(5)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # however, the next t.x get _will_ notice the change...
+ a = t.x
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ t.x.append(6) # this append(6) is not noticed yet
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5,6], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ # this append(7) is not noticed, but the t.x get causes the
+ # append(6) to be noticed
+ t.x.append(7)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 5, t.hook_calls)
+ a = t.x # now the append(7) is noticed
+ self.failUnless(t.old == [5,6], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 6, t.hook_calls)
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py
new file mode 100644
index 0000000..6e4da55
--- /dev/null
+++ b/libbe/storage/util/settings_object.py
@@ -0,0 +1,617 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Provides :class:`SavedSettingsObject` implementing settings-dict
+based property storage.
+
+See Also
+--------
+:mod:`libbe.storage.util.properties` : underlying property definitions
+"""
+
+import libbe
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+class _Token (object):
+ """`Control' value class for properties.
+
+ We want values that only mean something to the `settings_object`
+ module.
+ """
+ pass
+
+class UNPRIMED (_Token):
+ "Property has not been primed (loaded)."
+ pass
+
+class EMPTY (_Token):
+ """Property has been primed but has no user-set value, so use
+ default/generator value.
+ """
+ pass
+
+
+def prop_save_settings(self, old, new):
+ """The default action undertaken when a property changes.
+ """
+ if self.storage != None and self.storage.is_writeable():
+ self.save_settings()
+
+def prop_load_settings(self):
+ """The default action undertaken when an UNPRIMED property is
+ accessed.
+
+ Attempt to run `.load_settings()`, which calls
+ `._setup_saved_settings()` internally. If `.storage` is
+ inaccessible, don't do anything.
+ """
+ if self.storage != None and self.storage.is_readable():
+ self.load_settings()
+
+# Some name-mangling routines for pretty printing setting names
+def setting_name_to_attr_name(self, name):
+ """Convert keys to the `.settings` dict into their associated
+ SavedSettingsObject attribute names.
+
+ Examples
+ --------
+
+ >>> print setting_name_to_attr_name(None,"User-id")
+ user_id
+
+ See Also
+ --------
+ attr_name_to_setting_name : inverse
+ """
+ return name.lower().replace('-', '_')
+
+def attr_name_to_setting_name(self, name):
+ """Convert SavedSettingsObject attribute names to `.settings` dict
+ keys.
+
+ Examples:
+
+ >>> print attr_name_to_setting_name(None, "user_id")
+ User-id
+
+ See Also
+ --------
+ setting_name_to_attr_name : inverse
+ """
+ return name.capitalize().replace('_', '-')
+
+
+def versioned_property(name, doc,
+ default=None, generator=None,
+ change_hook=prop_save_settings,
+ mutable=False,
+ primer=prop_load_settings,
+ allowed=None, check_fn=None,
+ settings_properties=[],
+ required_saved_properties=[],
+ require_save=False):
+ """Combine the common decorators in a single function.
+
+ Use zero or one (but not both) of default or generator, since a
+ working default will keep the generator from functioning. Use the
+ default if you know what you want the default value to be at
+ 'coding time'. Use the generator if you can write a function to
+ determine a valid default at run time. If both default and
+ generator are None, then the property will be a defaulting
+ property which defaults to None.
+
+ allowed and check_fn have a similar relationship, although you can
+ use both of these if you want. allowed compares the proposed
+ value against a list determined at 'coding time' and check_fn
+ allows more flexible comparisons to take place at run time.
+
+ Set require_save to True if you want to save the default/generated
+ value for a property, to protect against future changes. E.g., we
+ currently expect all comments to be 'text/plain' but in the future
+ we may want to default to 'text/html'. If we don't want the old
+ comments to be interpreted as 'text/html', we would require that
+ the content type be saved.
+
+ change_hook, primer, settings_properties, and
+ required_saved_properties are only options to get their defaults
+ into our local scope. Don't mess with them.
+
+ Set mutable=True if:
+
+ * default is a mutable
+ * your generator function may return mutables
+ * you set change_hook and might have mutable property values
+
+ See the docstrings in `libbe.properties` for details on how each of
+ these cases are handled.
+
+ The value stored in `.settings[name]` will be
+
+ * no value (or UNPRIMED) if the property has been neither set,
+ nor loaded as blank.
+ * EMPTY if the value has been loaded as blank.
+ * some value if the property has been either loaded or set.
+ """
+ settings_properties.append(name)
+ if require_save == True:
+ required_saved_properties.append(name)
+ def decorator(funcs):
+ fulldoc = doc
+ if default != None or generator == None:
+ defaulting = defaulting_property(default=default, null=EMPTY,
+ mutable_default=mutable)
+ fulldoc += "\n\nThis property defaults to %s." % default
+ if generator != None:
+ cached = cached_property(generator=generator, initVal=EMPTY,
+ mutable=mutable)
+ fulldoc += "\n\nThis property is generated with %s." % generator
+ if check_fn != None:
+ fn_checked = fn_checked_property(value_allowed_fn=check_fn)
+ fulldoc += "\n\nThis property is checked with %s." % check_fn
+ if allowed != None:
+ checked = checked_property(allowed=allowed)
+ fulldoc += "\n\nThe allowed values for this property are: %s." \
+ % (', '.join(allowed))
+ hooked = change_hook_property(hook=change_hook, mutable=mutable,
+ default=EMPTY)
+ primed = primed_property(primer=primer, initVal=UNPRIMED,
+ unprimeableVal=EMPTY)
+ settings = settings_property(name=name, null=UNPRIMED)
+ docp = doc_property(doc=fulldoc)
+ deco = hooked(primed(settings(docp(funcs))))
+ if default != None or generator == None:
+ deco = defaulting(deco)
+ if generator != None:
+ deco = cached(deco)
+ if check_fn != None:
+ deco = fn_checked(deco)
+ if allowed != None:
+ deco = checked(deco)
+ return Property(deco)
+ return decorator
+
+class SavedSettingsObject(object):
+ """Setup a framework for lazy saving and loading of `.settings`
+ properties.
+
+ This is useful for BE objects with saved properties
+ (e.g. :class:`~libbe.bugdir.BugDir`, :class:`~libbe.bug.Bug`,
+ :class:`~libbe.comment.Comment`). For example usage, consider the
+ unittests at the end of the module.
+
+ See Also
+ --------
+ versioned_property, prop_save_settings, prop_load_settings
+ setting_name_to_attr_name, attr_name_to_setting_name
+ """
+ # Keep a list of properties that may be stored in the .settings dict.
+ #settings_properties = []
+
+ # A list of properties that we save to disk, even if they were
+ # never set (in which case we save the default value). This
+ # protects against future changes in default values.
+ #required_saved_properties = []
+
+ _setting_name_to_attr_name = setting_name_to_attr_name
+ _attr_name_to_setting_name = attr_name_to_setting_name
+
+ def __init__(self):
+ self.storage = None
+ self.settings = {}
+
+ def load_settings(self):
+ """Load the settings from disk."""
+ # Override. Must call ._setup_saved_settings({}) with
+ # from-storage settings.
+ self._setup_saved_settings({})
+
+ def _setup_saved_settings(self, settings=None):
+ """
+ Sets up a settings dict loaded from storage. Fills in
+ all missing settings entries with EMPTY.
+ """
+ if settings == None:
+ settings = {}
+ for property in self.settings_properties:
+ if property not in self.settings \
+ or self.settings[property] == UNPRIMED:
+ if property in settings:
+ self.settings[property] = settings[property]
+ else:
+ self.settings[property] = EMPTY
+
+ def save_settings(self):
+ """Save the settings to disk."""
+ # Override. Should save the dict output of ._get_saved_settings()
+ settings = self._get_saved_settings()
+ pass # write settings to disk....
+
+ def _get_saved_settings(self):
+ """
+ In order to avoid overwriting unread on-disk data, make sure
+ we've loaded anything sitting on the disk. In the current
+ implementation, all the settings are stored in a single file,
+ so we need to load _all_ the saved settings. Another approach
+ would be per-setting saves, in which case you could skip this
+ step, since any setting changes would have forced that setting
+ load already.
+ """
+ settings = {}
+ for k in self.settings_properties: # force full load
+ if not k in self.settings or self.settings[k] == UNPRIMED:
+ value = getattr(
+ self, self._setting_name_to_attr_name(k))
+ for k in self.settings_properties:
+ if k in self.settings and self.settings[k] != EMPTY:
+ settings[k] = self.settings[k]
+ elif k in self.required_saved_properties:
+ settings[k] = getattr(
+ self, self._setting_name_to_attr_name(k))
+ return settings
+
+ def clear_cached_setting(self, setting=None):
+ "If setting=None, clear *all* cached settings"
+ if setting != None:
+ if hasattr(self, "_%s_cached_value" % setting):
+ delattr(self, "_%s_cached_value" % setting)
+ else:
+ for setting in settings_properties:
+ self.clear_cached_setting(setting)
+
+
+if libbe.TESTING == True:
+ import copy
+
+ class TestStorage (list):
+ def __init__(self):
+ list.__init__(self)
+ self.readable = True
+ self.writeable = True
+ def is_readable(self):
+ return self.readable
+ def is_writeable(self):
+ return self.writeable
+
+ class TestObject (SavedSettingsObject):
+ def load_settings(self):
+ self.load_count += 1
+ if len(self.storage) == 0:
+ settings = {}
+ else:
+ settings = copy.deepcopy(self.storage[-1])
+ self._setup_saved_settings(settings)
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self.storage.append(copy.deepcopy(settings))
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ self.load_count = 0
+ self.storage = TestStorage()
+
+ class SavedSettingsObjectTests(unittest.TestCase):
+ def testSimplePropertyDoc(self):
+ """Testing a minimal versioned property docstring"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def content_type(): return {}
+ expected = "A test property\n\nThis property defaults to None."
+ self.failUnless(Test.content_type.__doc__ == expected,
+ Test.content_type.__doc__)
+ def testSimplePropertyFromMemory(self):
+ """Testing a minimal versioned property from memory"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def content_type(): return {}
+ t = Test()
+ self.failUnless(len(t.settings) == 0, len(t.settings))
+ # accessing t.content_type triggers the priming, but
+ # t.storage.is_readable() == False, so nothing happens.
+ t.storage.readable = False
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(t.settings == {}, t.settings)
+ self.failUnless(len(t.settings) == 0, len(t.settings))
+ self.failUnless(t.content_type == None, t.content_type)
+ # accessing t.content_type triggers the priming again, and
+ # now that t.storage.is_readable() == True, this fills out
+ # t.settings with EMPTY data. At this point there should
+ # be one load and no saves.
+ t.storage.readable = True
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ # an explicit call to load settings forces a reload,
+ # but nothing else changes.
+ t.load_settings()
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ # now we set a value
+ t.content_type = 5
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':5}], t.storage)
+ # getting its value changes nothing
+ self.failUnless(t.content_type == 5, t.content_type)
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':5}], t.storage)
+ # now we set another value
+ t.content_type = "text/plain"
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/plain",
+ t.settings["Content-type"])
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 2, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':5},
+ {'Content-type':'text/plain'}],
+ t.storage)
+ # t._get_saved_settings() returns a dict of required or
+ # non-default values.
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ # now we clear to the post-primed value
+ t.content_type = EMPTY
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == {},
+ t._get_saved_settings())
+ self.failUnless(t.storage == [{'Content-type':5},
+ {'Content-type':'text/plain'},
+ {}],
+ t.storage)
+ def testSimplePropertyFromStorage(self):
+ """Testing a minimal versioned property from storage"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="prop-a",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_a(): return {}
+ @versioned_property(
+ name="prop-b",
+ doc="Another test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_b(): return {}
+ t = Test()
+ t.storage.append({'prop-a':'saved'})
+ # setting prop-b forces a load (to check for changes),
+ # which also pulls in prop-a.
+ t.prop_b = 'new-b'
+ settings = {'prop-b':'new-b', 'prop-a':'saved'}
+ self.failUnless(t.settings == settings, t.settings)
+ self.failUnless(t._get_saved_settings() == settings,
+ t._get_saved_settings())
+ # test that _get_saved_settings() works even when settings
+ # were _not_ loaded beforehand
+ t = Test()
+ t.storage.append({'prop-a':'saved'})
+ settings ={'prop-a':'saved'}
+ self.failUnless(t.settings == {}, t.settings)
+ self.failUnless(t._get_saved_settings() == settings,
+ t._get_saved_settings())
+ def testSimplePropertySetStorageSave(self):
+ """Set a property, then attach storage and save"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="prop-a",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_a(): return {}
+ @versioned_property(
+ name="prop-b",
+ doc="Another test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_b(): return {}
+ t = Test()
+ storage = t.storage
+ t.storage = None
+ t.prop_a = 'text/html'
+ t.storage = storage
+ t.save_settings()
+ self.failUnless(t.prop_a == 'text/html', t.prop_a)
+ self.failUnless(t.settings == {'prop-a':'text/html',
+ 'prop-b':EMPTY},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'prop-a':'text/html'}],
+ t.storage)
+ def testDefaultingProperty(self):
+ """Testing a defaulting versioned property"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def content_type(): return {}
+ t = Test()
+ self.failUnless(t.settings == {}, t.settings)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings == {"Content-type":EMPTY},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.failUnless(t._get_saved_settings() == {},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t.content_type == "text/html",
+ t.content_type)
+ self.failUnless(t.settings == {"Content-type":"text/html"},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ t.storage)
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testRequiredDefaultingProperty(self):
+ """Testing a required defaulting versioned property"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ require_save=True)
+ def content_type(): return {}
+ t = Test()
+ self.failUnless(t.settings == {}, t.settings)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings == {"Content-type":EMPTY},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t.content_type == "text/html",
+ t.content_type)
+ self.failUnless(t.settings == {"Content-type":"text/html"},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ t.storage)
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testClassVersionedPropertyDefinition(self):
+ """Testing a class-specific _versioned property decorator"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ def _versioned_property(
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"] = \
+ required_saved_properties
+ return versioned_property(**kwargs)
+ @_versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ require_save=True)
+ def content_type(): return {}
+ t = Test()
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ t.storage)
+ def testMutableChangeHookedProperty(self):
+ """Testing a mutable change-hooked property"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="List-type",
+ doc="A test property",
+ mutable=True,
+ change_hook=prop_save_settings,
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def list_type(): return {}
+ t = Test()
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.failUnless(t.list_type == None, t.list_type)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.failUnless(t.settings["List-type"]==EMPTY,
+ t.settings["List-type"])
+ t.list_type = []
+ self.failUnless(t.settings["List-type"] == [],
+ t.settings["List-type"])
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'List-type':[]}],
+ t.storage)
+ t.list_type.append(5) # external modification not detected yet
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'List-type':[]}],
+ t.storage)
+ self.failUnless(t.settings["List-type"] == [5],
+ t.settings["List-type"])
+ self.failUnless(t.list_type == [5], t.list_type)# get triggers save
+ self.failUnless(len(t.storage) == 2, len(t.storage))
+ self.failUnless(t.storage == [{'List-type':[]},
+ {'List-type':[5]}],
+ t.storage)
+
+ unitsuite = unittest.TestLoader().loadTestsFromTestCase( \
+ SavedSettingsObjectTests)
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py
new file mode 100644
index 0000000..f3c4912
--- /dev/null
+++ b/libbe/storage/util/upgrade.py
@@ -0,0 +1,331 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Handle conversion between the various BE storage formats.
+"""
+
+import codecs
+import os, os.path
+import sys
+
+import libbe
+import libbe.bug
+import libbe.storage.util.mapfile as mapfile
+from libbe.storage import STORAGE_VERSIONS, STORAGE_VERSION
+#import libbe.storage.vcs # delay import to avoid cyclic dependency
+import libbe.ui.util.editor
+import libbe.util
+import libbe.util.encoding as encoding
+import libbe.util.id
+
+
+class Upgrader (object):
+ "Class for converting between different on-disk BE storage formats."
+ initial_version = None
+ final_version = None
+ def __init__(self, repo):
+ import libbe.storage.vcs
+
+ self.repo = repo
+ vcs_name = self._get_vcs_name()
+ if vcs_name == None:
+ vcs_name = 'None'
+ self.vcs = libbe.storage.vcs.vcs_by_name(vcs_name)
+ self.vcs.repo = self.repo
+ self.vcs.root()
+
+ def get_path(self, *args):
+ """
+ Return the absolute path using args relative to .be.
+ """
+ dir = os.path.join(self.repo, '.be')
+ if len(args) == 0:
+ return dir
+ return os.path.join(dir, *args)
+
+ def _get_vcs_name(self):
+ return None
+
+ def check_initial_version(self):
+ path = self.get_path('version')
+ version = encoding.get_file_contents(path, decode=True).rstrip('\n')
+ assert version == self.initial_version, '%s: %s' % (path, version)
+
+ def set_version(self):
+ path = self.get_path('version')
+ encoding.set_file_contents(path, self.final_version+'\n')
+ self.vcs._vcs_update(path)
+
+ def upgrade(self):
+ print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \
+ % (self.initial_version, self.final_version)
+ self.check_initial_version()
+ self.set_version()
+ self._upgrade()
+
+ def _upgrade(self):
+ raise NotImplementedError
+
+
+class Upgrade_1_0_to_1_1 (Upgrader):
+ initial_version = "Bugs Everywhere Tree 1 0"
+ final_version = "Bugs Everywhere Directory v1.1"
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = encoding.get_file_contents(path)
+ for line in settings.splitlines(False):
+ fields = line.split('=')
+ if len(fields) == 2 and fields[0] == 'rcs_name':
+ return fields[1]
+ return None
+
+ def _upgrade_mapfile(self, path):
+ contents = encoding.get_file_contents(path, decode=True)
+ old_format = False
+ for line in contents.splitlines():
+ if len(line.split('=')) == 2:
+ old_format = True
+ break
+ if old_format == True:
+ # translate to YAML.
+ newlines = []
+ for line in contents.splitlines():
+ line = line.rstrip('\n')
+ if len(line) == 0:
+ continue
+ fields = line.split("=")
+ if len(fields) == 2:
+ key,value = fields
+ newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
+ else:
+ newlines.append(line)
+ contents = '\n'.join(newlines)
+ # load the YAML and save
+ map = mapfile.parse(contents)
+ contents = mapfile.generate(map)
+ encoding.set_file_contents(path, contents)
+ self.vcs._vcs_update(path)
+
+ def _upgrade(self):
+ """
+ Comment value field "From" -> "Author".
+ Homegrown mapfile -> YAML.
+ """
+ path = self.get_path('settings')
+ self._upgrade_mapfile(path)
+ for bug_uuid in os.listdir(self.get_path('bugs')):
+ path = self.get_path('bugs', bug_uuid, 'values')
+ self._upgrade_mapfile(path)
+ c_path = ['bugs', bug_uuid, 'comments']
+ if not os.path.exists(self.get_path(*c_path)):
+ continue # no comments for this bug
+ for comment_uuid in os.listdir(self.get_path(*c_path)):
+ path_list = c_path + [comment_uuid, 'values']
+ path = self.get_path(*path_list)
+ self._upgrade_mapfile(path)
+ settings = mapfile.parse(
+ encoding.get_file_contents(path))
+ if 'From' in settings:
+ settings['Author'] = settings.pop('From')
+ encoding.set_file_contents(
+ path, mapfile.generate(settings))
+ self.vcs._vcs_update(path)
+
+
+class Upgrade_1_1_to_1_2 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.1"
+ final_version = "Bugs Everywhere Directory v1.2"
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'rcs_name' in settings:
+ return settings['rcs_name']
+ return None
+
+ def _upgrade(self):
+ """
+ BugDir settings field "rcs_name" -> "vcs_name".
+ """
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'rcs_name' in settings:
+ settings['vcs_name'] = settings.pop('rcs_name')
+ encoding.set_file_contents(path, mapfile.generate(settings))
+ self.vcs._vcs_update(path)
+
+class Upgrade_1_2_to_1_3 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.2"
+ final_version = "Bugs Everywhere Directory v1.3"
+ def __init__(self, *args, **kwargs):
+ Upgrader.__init__(self, *args, **kwargs)
+ self._targets = {} # key: target text,value: new target bug
+
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'vcs_name' in settings:
+ return settings['vcs_name']
+ return None
+
+ def _save_bug_settings(self, bug):
+ # The target bugs don't have comments
+ path = self.get_path('bugs', bug.uuid, 'values')
+ if not os.path.exists(path):
+ self.vcs._add_path(path, directory=False)
+ path = self.get_path('bugs', bug.uuid, 'values')
+ mf = mapfile.generate(bug._get_saved_settings())
+ encoding.set_file_contents(path, mf)
+ self.vcs._vcs_update(path)
+
+ def _target_bug(self, target_text):
+ if target_text not in self._targets:
+ bug = libbe.bug.Bug(summary=target_text)
+ bug.severity = 'target'
+ self._targets[target_text] = bug
+ return self._targets[target_text]
+
+ def _upgrade_bugdir_mapfile(self):
+ path = self.get_path('settings')
+ mf = encoding.get_file_contents(path)
+ if mf == libbe.util.InvalidObject:
+ return # settings file does not exist
+ settings = mapfile.parse(mf)
+ if 'target' in settings:
+ settings['target'] = self._target_bug(settings['target']).uuid
+ mf = mapfile.generate(settings)
+ encoding.set_file_contents(path, mf)
+ self.vcs._vcs_update(path)
+
+ def _upgrade_bug_mapfile(self, bug_uuid):
+ import libbe.command.depend as dep
+ path = self.get_path('bugs', bug_uuid, 'values')
+ mf = encoding.get_file_contents(path)
+ if mf == libbe.util.InvalidObject:
+ return # settings file does not exist
+ settings = mapfile.parse(mf)
+ if 'target' in settings:
+ target_bug = self._target_bug(settings['target'])
+
+ blocked_by_string = '%s%s' % (dep.BLOCKED_BY_TAG, bug_uuid)
+ dep._add_remove_extra_string(target_bug, blocked_by_string, add=True)
+ blocks_string = dep._generate_blocks_string(target_bug)
+ estrs = settings.get('extra_strings', [])
+ estrs.append(blocks_string)
+ settings['extra_strings'] = sorted(estrs)
+
+ settings.pop('target')
+ mf = mapfile.generate(settings)
+ encoding.set_file_contents(path, mf)
+ self.vcs._vcs_update(path)
+
+ def _upgrade(self):
+ """
+ Bug value field "target" -> target bugs.
+ Bugdir value field "target" -> pointer to current target bug.
+ """
+ for bug_uuid in os.listdir(self.get_path('bugs')):
+ self._upgrade_bug_mapfile(bug_uuid)
+ self._upgrade_bugdir_mapfile()
+ for bug in self._targets.values():
+ self._save_bug_settings(bug)
+
+class Upgrade_1_3_to_1_4 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.3"
+ final_version = "Bugs Everywhere Directory v1.4"
+ def _get_vcs_name(self):
+ path = self.get_path('settings')
+ settings = mapfile.parse(encoding.get_file_contents(path))
+ if 'vcs_name' in settings:
+ return settings['vcs_name']
+ return None
+
+ def _upgrade(self):
+ """
+ add new directory "./be/BUGDIR-UUID"
+ "./be/bugs" -> "./be/BUGDIR-UUID/bugs"
+ "./be/settings" -> "./be/BUGDIR-UUID/settings"
+ """
+ self.repo = os.path.abspath(self.repo)
+ basenames = [p for p in os.listdir(self.get_path())]
+ if not 'bugs' in basenames and not 'settings' in basenames \
+ and len([p for p in basenames if len(p)==36]) == 1:
+ return # the user has upgraded the directory.
+ basenames = [p for p in basenames if p in ['bugs','settings']]
+ uuid = libbe.util.id.uuid_gen()
+ add = [self.get_path(uuid)]
+ move = [(self.get_path(p), self.get_path(uuid, p)) for p in basenames]
+ msg = ['Upgrading BE directory version v1.3 to v1.4',
+ '',
+ "Because BE's VCS drivers don't support 'move',",
+ 'please make the following changes with your VCS',
+ 'and re-run BE. Note that you can choose a different',
+ 'bugdir UUID to preserve uniformity across branches',
+ 'of a distributed repository.'
+ '',
+ 'add',
+ ' ' + '\n '.join(add),
+ 'move',
+ ' ' + '\n '.join(['%s %s' % (a,b) for a,b in move]),
+ ]
+ self.vcs._cached_path_id.destroy()
+ raise Exception('Need user assistance\n%s' % '\n'.join(msg))
+
+
+upgraders = [Upgrade_1_0_to_1_1,
+ Upgrade_1_1_to_1_2,
+ Upgrade_1_2_to_1_3,
+ Upgrade_1_3_to_1_4]
+upgrade_classes = {}
+for upgrader in upgraders:
+ upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
+
+def upgrade(path, current_version,
+ target_version=STORAGE_VERSION):
+ """
+ Call the appropriate upgrade function to convert current_version
+ to target_version. If a direct conversion function does not exist,
+ use consecutive conversion functions.
+ """
+ if current_version not in STORAGE_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % current_version
+ if target_version not in STORAGE_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % current_version
+
+ if (current_version, target_version) in upgrade_classes:
+ # direct conversion
+ upgrade_class = upgrade_classes[(current_version, target_version)]
+ u = upgrade_class(path)
+ u.upgrade()
+ else:
+ # consecutive single-step conversion
+ i = STORAGE_VERSIONS.index(current_version)
+ while True:
+ version_a = STORAGE_VERSIONS[i]
+ version_b = STORAGE_VERSIONS[i+1]
+ try:
+ upgrade_class = upgrade_classes[(version_a, version_b)]
+ except KeyError:
+ raise NotImplementedError, \
+ "Cannot convert version '%s' to '%s' yet." \
+ % (version_a, version_b)
+ u = upgrade_class(path)
+ u.upgrade()
+ if version_b == target_version:
+ break
+ i += 1