diff options
author | Chris Ball <cjb@laptop.org> | 2010-06-20 19:19:06 -0400 |
---|---|---|
committer | Chris Ball <cjb@laptop.org> | 2010-06-20 19:19:06 -0400 |
commit | 0df4bd7ae194bb07f36a2a69a0549037de01cb52 (patch) | |
tree | ea9128bbbedd8df9b1d6c737f704260874680a6b /libbe/storage/util | |
parent | 429e33fb4c7be8daa791fb744a14024ef27a72c2 (diff) | |
parent | a2a51929a848ffa6db92ec7218994461ecccb50a (diff) | |
download | bugseverywhere-0df4bd7ae194bb07f36a2a69a0549037de01cb52.tar.gz |
Merge with Trevor.
Diffstat (limited to 'libbe/storage/util')
-rw-r--r-- | libbe/storage/util/__init__.py | 0 | ||||
-rw-r--r-- | libbe/storage/util/config.py | 114 | ||||
-rw-r--r-- | libbe/storage/util/mapfile.py | 146 | ||||
-rw-r--r-- | libbe/storage/util/properties.py | 666 | ||||
-rw-r--r-- | libbe/storage/util/settings_object.py | 617 | ||||
-rw-r--r-- | libbe/storage/util/upgrade.py | 331 |
6 files changed, 1874 insertions, 0 deletions
diff --git a/libbe/storage/util/__init__.py b/libbe/storage/util/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/libbe/storage/util/__init__.py diff --git a/libbe/storage/util/config.py b/libbe/storage/util/config.py new file mode 100644 index 0000000..724d2d3 --- /dev/null +++ b/libbe/storage/util/config.py @@ -0,0 +1,114 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Create, save, and load the per-user config file at :func:`path`. +""" + +import ConfigParser +import codecs +import os.path + +import libbe +import libbe.util.encoding +if libbe.TESTING == True: + import doctest + + +default_encoding = libbe.util.encoding.get_filesystem_encoding() +"""Default filesystem encoding. + +Initialized with :func:`libbe.util.encoding.get_filesystem_encoding`. +""" + +def path(): + """Return the path to the per-user config file. + """ + return os.path.expanduser("~/.bugs_everywhere") + +def set_val(name, value, section="DEFAULT", encoding=None): + """Set a value in the per-user config file. + + Parameters + ---------- + name : str + The name of the value to set. + value : str or None + The new value to set (or None to delete the value). + section : str + The section to store the name/value in. + encoding : str + The config file's encoding, defaults to :data:`default_encoding`. + """ + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + if os.path.exists(path()) == False: # touch file or config + open(path(), 'w').close() # read chokes on missing file + f = codecs.open(path(), 'r', encoding) + config.readfp(f, path()) + f.close() + if value is not None: + config.set(section, name, value) + else: + config.remove_option(section, name) + f = codecs.open(path(), 'w', encoding) + config.write(f) + f.close() + +def get_val(name, section="DEFAULT", default=None, encoding=None): + """Get a value from the per-user config file + + Parameters + ---------- + name : str + The name of the value to set. + section : str + The section to store the name/value in. + default : + The value to return if `name` is not set. + encoding : str + The config file's encoding, defaults to :data:`default_encoding`. + + Examples + -------- + + >>> get_val("junk") is None + True + >>> set_val("junk", "random") + >>> get_val("junk") + u'random' + >>> set_val("junk", None) + >>> get_val("junk") is None + True + """ + if os.path.exists(path()): + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + f = codecs.open(path(), 'r', encoding) + config.readfp(f, path()) + f.close() + try: + return config.get(section, name) + except ConfigParser.NoOptionError: + return default + else: + return default + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/storage/util/mapfile.py b/libbe/storage/util/mapfile.py new file mode 100644 index 0000000..55863d7 --- /dev/null +++ b/libbe/storage/util/mapfile.py @@ -0,0 +1,146 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Serializing and deserializing dictionaries of parameters. + +The serialized "mapfiles" should be clear, flat-text strings, and allow +easy merging of independent/conflicting changes. +""" + +import errno +import os.path +import types +import yaml + +import libbe +if libbe.TESTING == True: + import doctest + + +class IllegalKey(Exception): + def __init__(self, key): + Exception.__init__(self, 'Illegal key "%s"' % key) + self.key = key + +class IllegalValue(Exception): + def __init__(self, value): + Exception.__init__(self, 'Illegal value "%s"' % value) + self.value = value + +class InvalidMapfileContents(Exception): + def __init__(self, contents): + Exception.__init__(self, 'Invalid YAML contents') + self.contents = contents + +def generate(map): + """Generate a YAML mapfile content string. + + Examples + -------- + + >>> generate({'q':'p'}) + 'q: p\\n\\n' + >>> generate({'q':u'Fran\u00e7ais'}) + 'q: Fran\\xc3\\xa7ais\\n\\n' + >>> generate({'q':u'hello'}) + 'q: hello\\n\\n' + >>> generate({'q=':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q=" + >>> generate({'q:':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q:" + >>> generate({'q\\n':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "q\\n" + >>> generate({'':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key "" + >>> generate({'>q':'p'}) + Traceback (most recent call last): + IllegalKey: Illegal key ">q" + >>> generate({'q':'p\\n'}) + Traceback (most recent call last): + IllegalValue: Illegal value "p\\n" + + See Also + -------- + parse : inverse + """ + keys = map.keys() + keys.sort() + for key in keys: + try: + assert not key.startswith('>') + assert('\n' not in key) + assert('=' not in key) + assert(':' not in key) + assert(len(key) > 0) + except AssertionError: + raise IllegalKey(unicode(key).encode('unicode_escape')) + if '\n' in map[key]: + raise IllegalValue(unicode(map[key]).encode('unicode_escape')) + + lines = [] + for key in keys: + lines.append(yaml.safe_dump({key: map[key]}, + default_flow_style=False, + allow_unicode=True)) + lines.append('') + return '\n'.join(lines) + +def parse(contents): + """Parse a YAML mapfile string. + + Examples + -------- + + >>> parse('q: p\\n\\n')['q'] + 'p' + >>> parse('q: \\'p\\'\\n\\n')['q'] + 'p' + >>> contents = generate({'a':'b', 'c':'d', 'e':'f'}) + >>> dict = parse(contents) + >>> dict['a'] + 'b' + >>> dict['c'] + 'd' + >>> dict['e'] + 'f' + >>> contents = generate({'q':u'Fran\u00e7ais'}) + >>> dict = parse(contents) + >>> dict['q'] + u'Fran\\xe7ais' + >>> dict = parse('a!') + Traceback (most recent call last): + ... + InvalidMapfileContents: Invalid YAML contents + + See Also + -------- + generate : inverse + + """ + c = yaml.load(contents) + if type(c) == types.StringType: + raise InvalidMapfileContents( + 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents) + return c or {} + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py new file mode 100644 index 0000000..b5681b1 --- /dev/null +++ b/libbe/storage/util/properties.py @@ -0,0 +1,666 @@ +# Bugs Everywhere - a distributed bugtracker +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Provides a series of useful decorators for defining various types +of properties. + +For example usage, consider the unittests at the end of the module. + +Notes +----- + +See `PEP 318` and Michele Simionato's `decorator documentation` for +more information on decorators. + +.. _PEP 318: http://www.python.org/dev/peps/pep-0318/ +.. _decorator documentation: http://www.phyast.pitt.edu/~micheles/python/documentation.html + +See Also +-------- +:mod:`libbe.storage.util.settings_object` : bundle properties into a convenient package + +""" + +import copy +import types + +import libbe +if libbe.TESTING == True: + import unittest + + +class ValueCheckError (ValueError): + def __init__(self, name, value, allowed): + action = "in" # some list of allowed values + if type(allowed) == types.FunctionType: + action = "allowed by" # some allowed-value check function + msg = "%s not %s %s for %s" % (value, action, allowed, name) + ValueError.__init__(self, msg) + self.name = name + self.value = value + self.allowed = allowed + +def Property(funcs): + """ + End a chain of property decorators, returning a property. + """ + args = {} + args["fget"] = funcs.get("fget", None) + args["fset"] = funcs.get("fset", None) + args["fdel"] = funcs.get("fdel", None) + args["doc"] = funcs.get("doc", None) + + #print "Creating a property with" + #for key, val in args.items(): print key, value + return property(**args) + +def doc_property(doc=None): + """ + Add a docstring to a chain of property decorators. + """ + def decorator(funcs=None): + """ + Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...} + or a function fn() returning such a dict. + """ + if hasattr(funcs, "__call__"): + funcs = funcs() # convert from function-arg to dict + funcs["doc"] = doc + return funcs + return decorator + +def local_property(name, null=None, mutable_null=False): + """ + Define get/set access to per-parent-instance local storage. Uses + ._<name>_value to store the value for a particular owner instance. + If the ._<name>_value attribute does not exist, returns null. + + If mutable_null == True, we only release deepcopies of the null to + the outside world. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + if mutable_null == True: + ret_null = copy.deepcopy(null) + else: + ret_null = null + value = getattr(self, "_%s_value" % name, ret_null) + return value + def _fset(self, value): + setattr(self, "_%s_value" % name, value) + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + +def settings_property(name, null=None): + """ + Similar to local_property, except where local_property stores the + value in instance._<name>_value, settings_property stores the + value in instance.settings[name]. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + value = self.settings.get(name, null) + return value + def _fset(self, value): + self.settings[name] = value + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + + +# Allow comparison and caching with _original_ values for mutables, +# since +# +# >>> a = [] +# >>> b = a +# >>> b.append(1) +# >>> a +# [1] +# >>> a==b +# True +def _hash_mutable_value(value): + return repr(value) +def _init_mutable_property_cache(self): + if not hasattr(self, "_mutable_property_cache_hash"): + # first call to _fget for any mutable property + self._mutable_property_cache_hash = {} + self._mutable_property_cache_copy = {} +def _set_cached_mutable_property(self, cacher_name, property_name, value): + _init_mutable_property_cache(self) + self._mutable_property_cache_hash[(cacher_name, property_name)] = \ + _hash_mutable_value(value) + self._mutable_property_cache_copy[(cacher_name, property_name)] = \ + copy.deepcopy(value) +def _get_cached_mutable_property(self, cacher_name, property_name, default=None): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_copy: + return default + return self._mutable_property_cache_copy[(cacher_name, property_name)] +def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_hash: + _set_cached_mutable_property(self, cacher_name, property_name, default) + old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)] + return cmp(_hash_mutable_value(value), old_hash) + + +def defaulting_property(default=None, null=None, + mutable_default=False): + """ + Define a default value for get access to a property. + If the stored value is null, then default is returned. + + If mutable_default == True, we only release deepcopies of the + default to the outside world. + + null should never escape to the outside world, so don't worry + about it being a mutable. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value == null: + if mutable_default == True: + return copy.deepcopy(default) + else: + return default + return value + def _fset(self, value): + if value == default: + value = null + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def fn_checked_property(value_allowed_fn): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + return value + def _fset(self, value): + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def checked_property(allowed=[]): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self): + value = fget(self) + if value not in allowed: + raise ValueCheckError(name, value, allowed) + return value + def _fset(self, value): + if value not in allowed: + raise ValueCheckError(name, value, allowed) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def cached_property(generator, initVal=None, mutable=False): + """ + Allow caching of values generated by generator(instance), where + instance is the instance to which this property belongs. Uses + ._<name>_cache to store a cache flag for a particular owner + instance. + + When the cache flag is True or missing and the stored value is + initVal, the first fget call triggers the generator function, + whose output is stored in _<name>_cached_value. That and + subsequent calls to fget will return this cached value. + + If the input value is no longer initVal (e.g. a value has been + loaded from disk or set with fset), that value overrides any + cached value, and this property has no effect. + + When the cache flag is False and the stored value is initVal, the + generator is not cached, but is called on every fget. + + The cache flag is missing on initialization. Particular instances + may override by setting their own flag. + + In the case that mutable == True, all caching is disabled and the + generator is called whenever the cached value would otherwise be + used. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "<unknown>") + def _fget(self): + cache = getattr(self, "_%s_cache" % name, True) + value = fget(self) + if value == initVal: + if cache == True and mutable == False: + if hasattr(self, "_%s_cached_value" % name): + value = getattr(self, "_%s_cached_value" % name) + else: + value = generator(self) + setattr(self, "_%s_cached_value" % name, value) + else: + value = generator(self) + return value + funcs["fget"] = _fget + return funcs + return decorator + +def primed_property(primer, initVal=None, unprimeableVal=None): + """ + Just like a cached_property, except that instead of returning a + new value and running fset to cache it, the primer attempts some + background manipulation (e.g. loads data into instance.settings) + such that a _second_ pass through fget succeeds. If the second + pass doesn't succeed (e.g. no readable storage), we give up and + return unprimeableVal. + + The 'cache' flag becomes a 'prime' flag, with priming taking place + whenever ._<name>_prime is True, or is False or missing and + value == initVal. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "<unknown>") + def _fget(self): + prime = getattr(self, "_%s_prime" % name, False) + if prime == False: + value = fget(self) + if prime == True or (prime == False and value == initVal): + primer(self) + value = fget(self) + if prime == False and value == initVal: + return unprimeableVal + return value + funcs["fget"] = _fget + return funcs + return decorator + +def change_hook_property(hook, mutable=False, default=None): + """Call the function `hook` whenever a value different from the + current value is set. + + This is useful for saving changes to disk, etc. This function is + called *after* the new value has been stored, allowing you to + change the stored value if you want. + + In the case of mutables, things are slightly trickier. Because + the property-owning class has no way of knowing when the value + changes. We work around this by caching a private deepcopy of the + mutable value, and checking for changes whenever the property is + set (obviously) or retrieved (to check for external changes). So + long as you're conscientious about accessing the property after + making external modifications, mutability won't be a problem:: + + t.x.append(5) # external modification + t.x # dummy access notices change and triggers hook + + See :class:`testChangeHookMutableProperty` for an example of the + expected behavior. + + Parameters + ---------- + hook : fn + `hook(instance, old_value, new_value)`, where `instance` is a + reference to the class instance to which this property belongs. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "<unknown>") + def _fget(self, new_value=None, from_fset=False): # only used if mutable == True + if from_fset == True: + value = new_value # compare new value with cached + else: + value = fget(self) # compare current value with cached + if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0: + # there has been a change, cache new value + old_value = _get_cached_mutable_property(self, "change hook property", name, default) + _set_cached_mutable_property(self, "change hook property", name, value) + if from_fset == True: # return previously cached value + value = old_value + else: # the value changed while we weren't looking + hook(self, old_value, value) + return value + def _fset(self, value): + if mutable == True: # get cached previous value + old_value = _fget(self, new_value=value, from_fset=True) + else: + old_value = fget(self) + fset(self, value) + if value != old_value: + hook(self, old_value, value) + if mutable == True: + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +if libbe.TESTING == True: + class DecoratorTests(unittest.TestCase): + def testLocalDoc(self): + class Test(object): + @Property + @doc_property("A fancy property") + def x(): + return {} + self.failUnless(Test.x.__doc__ == "A fancy property", + Test.x.__doc__) + def testLocalProperty(self): + class Test(object): + @Property + @local_property(name="LOCAL") + def x(): + return {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("_LOCAL_value" in dir(t), dir(t)) + self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value) + def testSettingsProperty(self): + class Test(object): + @Property + @settings_property(name="attr") + def x(): + return {} + def __init__(self): + self.settings = {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("attr" in t.settings, t.settings) + self.failUnless(t.settings["attr"] == 'z', t.settings["attr"]) + def testDefaultingLocalProperty(self): + class Test(object): + @Property + @defaulting_property(default='y', null='x') + @local_property(name="DEFAULT", null=5) + def x(): return {} + t = Test() + self.failUnless(t.x == 5, str(t.x)) + t.x = 'x' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'y' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'z' + self.failUnless(t.x == 'z', str(t.x)) + t.x = 5 + self.failUnless(t.x == 5, str(t.x)) + def testCheckedLocalProperty(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testTwoCheckedLocalProperties(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="X") + def x(): return {} + + @Property + @checked_property(allowed=['a', 'b', 'c']) + @local_property(name="A") + def a(): return {} + def __init__(self): + self._A_value = 'a' + self._X_value = 'x' + t = Test() + try: + t.x = 'a' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.x = 'x' + t.x = 'y' + t.x = 'z' + try: + t.a = 'x' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.a = 'a' + t.a = 'b' + t.a = 'c' + def testFnCheckedLocalProperty(self): + class Test(object): + @Property + @fn_checked_property(lambda v : v in ['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testCachedLocalProperty(self): + class Gen(object): + def __init__(self): + self.i = 0 + def __call__(self, owner): + self.i += 1 + return self.i + class Test(object): + @Property + @cached_property(generator=Gen(), initVal=None) + @local_property(name="CACHED") + def x(): return {} + t = Test() + self.failIf("_CACHED_cache" in dir(t), + getattr(t, "_CACHED_cache", None)) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + t.x = 8 + self.failUnless(t.x == 8, t.x) + self.failUnless(t.x == 8, t.x) + t._CACHED_cache = False # Caching is off, but the stored value + val = t.x # is 8, not the initVal (None), so we + self.failUnless(val == 8, val) # get 8. + t._CACHED_value = None # Now we've set the stored value to None + val = t.x # so future calls to fget (like this) + self.failUnless(val == 2, val) # will call the generator every time... + val = t.x + self.failUnless(val == 3, val) + val = t.x + self.failUnless(val == 4, val) + t._CACHED_cache = True # We turn caching back on, and get + self.failUnless(t.x == 1, str(t.x)) # the original cached value. + del t._CACHED_cached_value # Removing that value forces a + self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call + self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which + self.failUnless(t.x == 5, str(t.x)) # we get the new cached value. + def testPrimedLocalProperty(self): + class Test(object): + def prime(self): + self.settings["PRIMED"] = self.primeVal + @Property + @primed_property(primer=prime, initVal=None, unprimeableVal=2) + @settings_property(name="PRIMED") + def x(): return {} + def __init__(self): + self.settings={} + self.primeVal = "initialized" + t = Test() + self.failIf("_PRIMED_prime" in dir(t), + getattr(t, "_PRIMED_prime", None)) + self.failUnless(t.x == "initialized", t.x) + t.x = 1 + self.failUnless(t.x == 1, t.x) + t.x = None + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = True + t.x = 3 + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = False + t.x = 3 + self.failUnless(t.x == 3, t.x) + # test unprimableVal + t.x = None + t.primeVal = None + self.failUnless(t.x == 2, t.x) + def testChangeHookLocalProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + + @Property + @change_hook_property(_hook) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 2 + self.failUnless(t.old == 1, t.old) + self.failUnless(t.new == 2, t.new) + def testChangeHookMutableProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + self.hook_calls += 1 + + @Property + @change_hook_property(_hook, mutable=True) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.hook_calls = 0 + t.x = [] + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 1, t.hook_calls) + a = t.x + a.append(5) + t.x = a + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 2, t.hook_calls) + t.x = [] + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # now append without reassigning. this doesn't trigger the + # change, since we don't ever set t.x, only get it and mess + # with it. It does, however, update our t.new, since t.new = + # t.x and is not a static copy. + t.x.append(5) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # however, the next t.x get _will_ notice the change... + a = t.x + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + t.x.append(6) # this append(6) is not noticed yet + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5,6], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + # this append(7) is not noticed, but the t.x get causes the + # append(6) to be noticed + t.x.append(7) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 5, t.hook_calls) + a = t.x # now the append(7) is noticed + self.failUnless(t.old == [5,6], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 6, t.hook_calls) + + suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests) diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py new file mode 100644 index 0000000..6e4da55 --- /dev/null +++ b/libbe/storage/util/settings_object.py @@ -0,0 +1,617 @@ +# Bugs Everywhere - a distributed bugtracker +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Provides :class:`SavedSettingsObject` implementing settings-dict +based property storage. + +See Also +-------- +:mod:`libbe.storage.util.properties` : underlying property definitions +""" + +import libbe +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, fn_checked_property, \ + cached_property, primed_property, change_hook_property, \ + settings_property +if libbe.TESTING == True: + import doctest + import unittest + +class _Token (object): + """`Control' value class for properties. + + We want values that only mean something to the `settings_object` + module. + """ + pass + +class UNPRIMED (_Token): + "Property has not been primed (loaded)." + pass + +class EMPTY (_Token): + """Property has been primed but has no user-set value, so use + default/generator value. + """ + pass + + +def prop_save_settings(self, old, new): + """The default action undertaken when a property changes. + """ + if self.storage != None and self.storage.is_writeable(): + self.save_settings() + +def prop_load_settings(self): + """The default action undertaken when an UNPRIMED property is + accessed. + + Attempt to run `.load_settings()`, which calls + `._setup_saved_settings()` internally. If `.storage` is + inaccessible, don't do anything. + """ + if self.storage != None and self.storage.is_readable(): + self.load_settings() + +# Some name-mangling routines for pretty printing setting names +def setting_name_to_attr_name(self, name): + """Convert keys to the `.settings` dict into their associated + SavedSettingsObject attribute names. + + Examples + -------- + + >>> print setting_name_to_attr_name(None,"User-id") + user_id + + See Also + -------- + attr_name_to_setting_name : inverse + """ + return name.lower().replace('-', '_') + +def attr_name_to_setting_name(self, name): + """Convert SavedSettingsObject attribute names to `.settings` dict + keys. + + Examples: + + >>> print attr_name_to_setting_name(None, "user_id") + User-id + + See Also + -------- + setting_name_to_attr_name : inverse + """ + return name.capitalize().replace('_', '-') + + +def versioned_property(name, doc, + default=None, generator=None, + change_hook=prop_save_settings, + mutable=False, + primer=prop_load_settings, + allowed=None, check_fn=None, + settings_properties=[], + required_saved_properties=[], + require_save=False): + """Combine the common decorators in a single function. + + Use zero or one (but not both) of default or generator, since a + working default will keep the generator from functioning. Use the + default if you know what you want the default value to be at + 'coding time'. Use the generator if you can write a function to + determine a valid default at run time. If both default and + generator are None, then the property will be a defaulting + property which defaults to None. + + allowed and check_fn have a similar relationship, although you can + use both of these if you want. allowed compares the proposed + value against a list determined at 'coding time' and check_fn + allows more flexible comparisons to take place at run time. + + Set require_save to True if you want to save the default/generated + value for a property, to protect against future changes. E.g., we + currently expect all comments to be 'text/plain' but in the future + we may want to default to 'text/html'. If we don't want the old + comments to be interpreted as 'text/html', we would require that + the content type be saved. + + change_hook, primer, settings_properties, and + required_saved_properties are only options to get their defaults + into our local scope. Don't mess with them. + + Set mutable=True if: + + * default is a mutable + * your generator function may return mutables + * you set change_hook and might have mutable property values + + See the docstrings in `libbe.properties` for details on how each of + these cases are handled. + + The value stored in `.settings[name]` will be + + * no value (or UNPRIMED) if the property has been neither set, + nor loaded as blank. + * EMPTY if the value has been loaded as blank. + * some value if the property has been either loaded or set. + """ + settings_properties.append(name) + if require_save == True: + required_saved_properties.append(name) + def decorator(funcs): + fulldoc = doc + if default != None or generator == None: + defaulting = defaulting_property(default=default, null=EMPTY, + mutable_default=mutable) + fulldoc += "\n\nThis property defaults to %s." % default + if generator != None: + cached = cached_property(generator=generator, initVal=EMPTY, + mutable=mutable) + fulldoc += "\n\nThis property is generated with %s." % generator + if check_fn != None: + fn_checked = fn_checked_property(value_allowed_fn=check_fn) + fulldoc += "\n\nThis property is checked with %s." % check_fn + if allowed != None: + checked = checked_property(allowed=allowed) + fulldoc += "\n\nThe allowed values for this property are: %s." \ + % (', '.join(allowed)) + hooked = change_hook_property(hook=change_hook, mutable=mutable, + default=EMPTY) + primed = primed_property(primer=primer, initVal=UNPRIMED, + unprimeableVal=EMPTY) + settings = settings_property(name=name, null=UNPRIMED) + docp = doc_property(doc=fulldoc) + deco = hooked(primed(settings(docp(funcs)))) + if default != None or generator == None: + deco = defaulting(deco) + if generator != None: + deco = cached(deco) + if check_fn != None: + deco = fn_checked(deco) + if allowed != None: + deco = checked(deco) + return Property(deco) + return decorator + +class SavedSettingsObject(object): + """Setup a framework for lazy saving and loading of `.settings` + properties. + + This is useful for BE objects with saved properties + (e.g. :class:`~libbe.bugdir.BugDir`, :class:`~libbe.bug.Bug`, + :class:`~libbe.comment.Comment`). For example usage, consider the + unittests at the end of the module. + + See Also + -------- + versioned_property, prop_save_settings, prop_load_settings + setting_name_to_attr_name, attr_name_to_setting_name + """ + # Keep a list of properties that may be stored in the .settings dict. + #settings_properties = [] + + # A list of properties that we save to disk, even if they were + # never set (in which case we save the default value). This + # protects against future changes in default values. + #required_saved_properties = [] + + _setting_name_to_attr_name = setting_name_to_attr_name + _attr_name_to_setting_name = attr_name_to_setting_name + + def __init__(self): + self.storage = None + self.settings = {} + + def load_settings(self): + """Load the settings from disk.""" + # Override. Must call ._setup_saved_settings({}) with + # from-storage settings. + self._setup_saved_settings({}) + + def _setup_saved_settings(self, settings=None): + """ + Sets up a settings dict loaded from storage. Fills in + all missing settings entries with EMPTY. + """ + if settings == None: + settings = {} + for property in self.settings_properties: + if property not in self.settings \ + or self.settings[property] == UNPRIMED: + if property in settings: + self.settings[property] = settings[property] + else: + self.settings[property] = EMPTY + + def save_settings(self): + """Save the settings to disk.""" + # Override. Should save the dict output of ._get_saved_settings() + settings = self._get_saved_settings() + pass # write settings to disk.... + + def _get_saved_settings(self): + """ + In order to avoid overwriting unread on-disk data, make sure + we've loaded anything sitting on the disk. In the current + implementation, all the settings are stored in a single file, + so we need to load _all_ the saved settings. Another approach + would be per-setting saves, in which case you could skip this + step, since any setting changes would have forced that setting + load already. + """ + settings = {} + for k in self.settings_properties: # force full load + if not k in self.settings or self.settings[k] == UNPRIMED: + value = getattr( + self, self._setting_name_to_attr_name(k)) + for k in self.settings_properties: + if k in self.settings and self.settings[k] != EMPTY: + settings[k] = self.settings[k] + elif k in self.required_saved_properties: + settings[k] = getattr( + self, self._setting_name_to_attr_name(k)) + return settings + + def clear_cached_setting(self, setting=None): + "If setting=None, clear *all* cached settings" + if setting != None: + if hasattr(self, "_%s_cached_value" % setting): + delattr(self, "_%s_cached_value" % setting) + else: + for setting in settings_properties: + self.clear_cached_setting(setting) + + +if libbe.TESTING == True: + import copy + + class TestStorage (list): + def __init__(self): + list.__init__(self) + self.readable = True + self.writeable = True + def is_readable(self): + return self.readable + def is_writeable(self): + return self.writeable + + class TestObject (SavedSettingsObject): + def load_settings(self): + self.load_count += 1 + if len(self.storage) == 0: + settings = {} + else: + settings = copy.deepcopy(self.storage[-1]) + self._setup_saved_settings(settings) + def save_settings(self): + settings = self._get_saved_settings() + self.storage.append(copy.deepcopy(settings)) + def __init__(self): + SavedSettingsObject.__init__(self) + self.load_count = 0 + self.storage = TestStorage() + + class SavedSettingsObjectTests(unittest.TestCase): + def testSimplePropertyDoc(self): + """Testing a minimal versioned property docstring""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + expected = "A test property\n\nThis property defaults to None." + self.failUnless(Test.content_type.__doc__ == expected, + Test.content_type.__doc__) + def testSimplePropertyFromMemory(self): + """Testing a minimal versioned property from memory""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + t = Test() + self.failUnless(len(t.settings) == 0, len(t.settings)) + # accessing t.content_type triggers the priming, but + # t.storage.is_readable() == False, so nothing happens. + t.storage.readable = False + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(t.settings == {}, t.settings) + self.failUnless(len(t.settings) == 0, len(t.settings)) + self.failUnless(t.content_type == None, t.content_type) + # accessing t.content_type triggers the priming again, and + # now that t.storage.is_readable() == True, this fills out + # t.settings with EMPTY data. At this point there should + # be one load and no saves. + t.storage.readable = True + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + # an explicit call to load settings forces a reload, + # but nothing else changes. + t.load_settings() + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + # now we set a value + t.content_type = 5 + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':5}], t.storage) + # getting its value changes nothing + self.failUnless(t.content_type == 5, t.content_type) + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':5}], t.storage) + # now we set another value + t.content_type = "text/plain" + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings["Content-type"] == "text/plain", + t.settings["Content-type"]) + self.failUnless(t.load_count == 2, t.load_count) + self.failUnless(len(t.storage) == 2, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':5}, + {'Content-type':'text/plain'}], + t.storage) + # t._get_saved_settings() returns a dict of required or + # non-default values. + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + # now we clear to the post-primed value + t.content_type = EMPTY + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings() == {}, + t._get_saved_settings()) + self.failUnless(t.storage == [{'Content-type':5}, + {'Content-type':'text/plain'}, + {}], + t.storage) + def testSimplePropertyFromStorage(self): + """Testing a minimal versioned property from storage""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="prop-a", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_a(): return {} + @versioned_property( + name="prop-b", + doc="Another test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_b(): return {} + t = Test() + t.storage.append({'prop-a':'saved'}) + # setting prop-b forces a load (to check for changes), + # which also pulls in prop-a. + t.prop_b = 'new-b' + settings = {'prop-b':'new-b', 'prop-a':'saved'} + self.failUnless(t.settings == settings, t.settings) + self.failUnless(t._get_saved_settings() == settings, + t._get_saved_settings()) + # test that _get_saved_settings() works even when settings + # were _not_ loaded beforehand + t = Test() + t.storage.append({'prop-a':'saved'}) + settings ={'prop-a':'saved'} + self.failUnless(t.settings == {}, t.settings) + self.failUnless(t._get_saved_settings() == settings, + t._get_saved_settings()) + def testSimplePropertySetStorageSave(self): + """Set a property, then attach storage and save""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="prop-a", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_a(): return {} + @versioned_property( + name="prop-b", + doc="Another test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def prop_b(): return {} + t = Test() + storage = t.storage + t.storage = None + t.prop_a = 'text/html' + t.storage = storage + t.save_settings() + self.failUnless(t.prop_a == 'text/html', t.prop_a) + self.failUnless(t.settings == {'prop-a':'text/html', + 'prop-b':EMPTY}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'prop-a':'text/html'}], + t.storage) + def testDefaultingProperty(self): + """Testing a defaulting versioned property""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + t = Test() + self.failUnless(t.settings == {}, t.settings) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings == {"Content-type":EMPTY}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t._get_saved_settings() == {}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t.content_type == "text/html", + t.content_type) + self.failUnless(t.settings == {"Content-type":"text/html"}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':'text/html'}], + t.storage) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + def testRequiredDefaultingProperty(self): + """Testing a required defaulting versioned property""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + require_save=True) + def content_type(): return {} + t = Test() + self.failUnless(t.settings == {}, t.settings) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings == {"Content-type":EMPTY}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t.content_type == "text/html", + t.content_type) + self.failUnless(t.settings == {"Content-type":"text/html"}, + t.settings) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':'text/html'}], + t.storage) + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + def testClassVersionedPropertyDefinition(self): + """Testing a class-specific _versioned property decorator""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + def _versioned_property( + settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"] = \ + required_saved_properties + return versioned_property(**kwargs) + @_versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + require_save=True) + def content_type(): return {} + t = Test() + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/plain"}, + t._get_saved_settings()) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 0, len(t.storage)) + t.content_type = "text/html" + self.failUnless(t._get_saved_settings() == \ + {"Content-type":"text/html"}, + t._get_saved_settings()) + self.failUnless(t.load_count == 1, t.load_count) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'Content-type':'text/html'}], + t.storage) + def testMutableChangeHookedProperty(self): + """Testing a mutable change-hooked property""" + class Test (TestObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property( + name="List-type", + doc="A test property", + mutable=True, + change_hook=prop_save_settings, + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def list_type(): return {} + t = Test() + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t.list_type == None, t.list_type) + self.failUnless(len(t.storage) == 0, len(t.storage)) + self.failUnless(t.settings["List-type"]==EMPTY, + t.settings["List-type"]) + t.list_type = [] + self.failUnless(t.settings["List-type"] == [], + t.settings["List-type"]) + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'List-type':[]}], + t.storage) + t.list_type.append(5) # external modification not detected yet + self.failUnless(len(t.storage) == 1, len(t.storage)) + self.failUnless(t.storage == [{'List-type':[]}], + t.storage) + self.failUnless(t.settings["List-type"] == [5], + t.settings["List-type"]) + self.failUnless(t.list_type == [5], t.list_type)# get triggers save + self.failUnless(len(t.storage) == 2, len(t.storage)) + self.failUnless(t.storage == [{'List-type':[]}, + {'List-type':[5]}], + t.storage) + + unitsuite = unittest.TestLoader().loadTestsFromTestCase( \ + SavedSettingsObjectTests) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/storage/util/upgrade.py b/libbe/storage/util/upgrade.py new file mode 100644 index 0000000..f3c4912 --- /dev/null +++ b/libbe/storage/util/upgrade.py @@ -0,0 +1,331 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Handle conversion between the various BE storage formats. +""" + +import codecs +import os, os.path +import sys + +import libbe +import libbe.bug +import libbe.storage.util.mapfile as mapfile +from libbe.storage import STORAGE_VERSIONS, STORAGE_VERSION +#import libbe.storage.vcs # delay import to avoid cyclic dependency +import libbe.ui.util.editor +import libbe.util +import libbe.util.encoding as encoding +import libbe.util.id + + +class Upgrader (object): + "Class for converting between different on-disk BE storage formats." + initial_version = None + final_version = None + def __init__(self, repo): + import libbe.storage.vcs + + self.repo = repo + vcs_name = self._get_vcs_name() + if vcs_name == None: + vcs_name = 'None' + self.vcs = libbe.storage.vcs.vcs_by_name(vcs_name) + self.vcs.repo = self.repo + self.vcs.root() + + def get_path(self, *args): + """ + Return the absolute path using args relative to .be. + """ + dir = os.path.join(self.repo, '.be') + if len(args) == 0: + return dir + return os.path.join(dir, *args) + + def _get_vcs_name(self): + return None + + def check_initial_version(self): + path = self.get_path('version') + version = encoding.get_file_contents(path, decode=True).rstrip('\n') + assert version == self.initial_version, '%s: %s' % (path, version) + + def set_version(self): + path = self.get_path('version') + encoding.set_file_contents(path, self.final_version+'\n') + self.vcs._vcs_update(path) + + def upgrade(self): + print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \ + % (self.initial_version, self.final_version) + self.check_initial_version() + self.set_version() + self._upgrade() + + def _upgrade(self): + raise NotImplementedError + + +class Upgrade_1_0_to_1_1 (Upgrader): + initial_version = "Bugs Everywhere Tree 1 0" + final_version = "Bugs Everywhere Directory v1.1" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = encoding.get_file_contents(path) + for line in settings.splitlines(False): + fields = line.split('=') + if len(fields) == 2 and fields[0] == 'rcs_name': + return fields[1] + return None + + def _upgrade_mapfile(self, path): + contents = encoding.get_file_contents(path, decode=True) + old_format = False + for line in contents.splitlines(): + if len(line.split('=')) == 2: + old_format = True + break + if old_format == True: + # translate to YAML. + newlines = [] + for line in contents.splitlines(): + line = line.rstrip('\n') + if len(line) == 0: + continue + fields = line.split("=") + if len(fields) == 2: + key,value = fields + newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) + else: + newlines.append(line) + contents = '\n'.join(newlines) + # load the YAML and save + map = mapfile.parse(contents) + contents = mapfile.generate(map) + encoding.set_file_contents(path, contents) + self.vcs._vcs_update(path) + + def _upgrade(self): + """ + Comment value field "From" -> "Author". + Homegrown mapfile -> YAML. + """ + path = self.get_path('settings') + self._upgrade_mapfile(path) + for bug_uuid in os.listdir(self.get_path('bugs')): + path = self.get_path('bugs', bug_uuid, 'values') + self._upgrade_mapfile(path) + c_path = ['bugs', bug_uuid, 'comments'] + if not os.path.exists(self.get_path(*c_path)): + continue # no comments for this bug + for comment_uuid in os.listdir(self.get_path(*c_path)): + path_list = c_path + [comment_uuid, 'values'] + path = self.get_path(*path_list) + self._upgrade_mapfile(path) + settings = mapfile.parse( + encoding.get_file_contents(path)) + if 'From' in settings: + settings['Author'] = settings.pop('From') + encoding.set_file_contents( + path, mapfile.generate(settings)) + self.vcs._vcs_update(path) + + +class Upgrade_1_1_to_1_2 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.1" + final_version = "Bugs Everywhere Directory v1.2" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'rcs_name' in settings: + return settings['rcs_name'] + return None + + def _upgrade(self): + """ + BugDir settings field "rcs_name" -> "vcs_name". + """ + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'rcs_name' in settings: + settings['vcs_name'] = settings.pop('rcs_name') + encoding.set_file_contents(path, mapfile.generate(settings)) + self.vcs._vcs_update(path) + +class Upgrade_1_2_to_1_3 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.2" + final_version = "Bugs Everywhere Directory v1.3" + def __init__(self, *args, **kwargs): + Upgrader.__init__(self, *args, **kwargs) + self._targets = {} # key: target text,value: new target bug + + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'vcs_name' in settings: + return settings['vcs_name'] + return None + + def _save_bug_settings(self, bug): + # The target bugs don't have comments + path = self.get_path('bugs', bug.uuid, 'values') + if not os.path.exists(path): + self.vcs._add_path(path, directory=False) + path = self.get_path('bugs', bug.uuid, 'values') + mf = mapfile.generate(bug._get_saved_settings()) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _target_bug(self, target_text): + if target_text not in self._targets: + bug = libbe.bug.Bug(summary=target_text) + bug.severity = 'target' + self._targets[target_text] = bug + return self._targets[target_text] + + def _upgrade_bugdir_mapfile(self): + path = self.get_path('settings') + mf = encoding.get_file_contents(path) + if mf == libbe.util.InvalidObject: + return # settings file does not exist + settings = mapfile.parse(mf) + if 'target' in settings: + settings['target'] = self._target_bug(settings['target']).uuid + mf = mapfile.generate(settings) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _upgrade_bug_mapfile(self, bug_uuid): + import libbe.command.depend as dep + path = self.get_path('bugs', bug_uuid, 'values') + mf = encoding.get_file_contents(path) + if mf == libbe.util.InvalidObject: + return # settings file does not exist + settings = mapfile.parse(mf) + if 'target' in settings: + target_bug = self._target_bug(settings['target']) + + blocked_by_string = '%s%s' % (dep.BLOCKED_BY_TAG, bug_uuid) + dep._add_remove_extra_string(target_bug, blocked_by_string, add=True) + blocks_string = dep._generate_blocks_string(target_bug) + estrs = settings.get('extra_strings', []) + estrs.append(blocks_string) + settings['extra_strings'] = sorted(estrs) + + settings.pop('target') + mf = mapfile.generate(settings) + encoding.set_file_contents(path, mf) + self.vcs._vcs_update(path) + + def _upgrade(self): + """ + Bug value field "target" -> target bugs. + Bugdir value field "target" -> pointer to current target bug. + """ + for bug_uuid in os.listdir(self.get_path('bugs')): + self._upgrade_bug_mapfile(bug_uuid) + self._upgrade_bugdir_mapfile() + for bug in self._targets.values(): + self._save_bug_settings(bug) + +class Upgrade_1_3_to_1_4 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.3" + final_version = "Bugs Everywhere Directory v1.4" + def _get_vcs_name(self): + path = self.get_path('settings') + settings = mapfile.parse(encoding.get_file_contents(path)) + if 'vcs_name' in settings: + return settings['vcs_name'] + return None + + def _upgrade(self): + """ + add new directory "./be/BUGDIR-UUID" + "./be/bugs" -> "./be/BUGDIR-UUID/bugs" + "./be/settings" -> "./be/BUGDIR-UUID/settings" + """ + self.repo = os.path.abspath(self.repo) + basenames = [p for p in os.listdir(self.get_path())] + if not 'bugs' in basenames and not 'settings' in basenames \ + and len([p for p in basenames if len(p)==36]) == 1: + return # the user has upgraded the directory. + basenames = [p for p in basenames if p in ['bugs','settings']] + uuid = libbe.util.id.uuid_gen() + add = [self.get_path(uuid)] + move = [(self.get_path(p), self.get_path(uuid, p)) for p in basenames] + msg = ['Upgrading BE directory version v1.3 to v1.4', + '', + "Because BE's VCS drivers don't support 'move',", + 'please make the following changes with your VCS', + 'and re-run BE. Note that you can choose a different', + 'bugdir UUID to preserve uniformity across branches', + 'of a distributed repository.' + '', + 'add', + ' ' + '\n '.join(add), + 'move', + ' ' + '\n '.join(['%s %s' % (a,b) for a,b in move]), + ] + self.vcs._cached_path_id.destroy() + raise Exception('Need user assistance\n%s' % '\n'.join(msg)) + + +upgraders = [Upgrade_1_0_to_1_1, + Upgrade_1_1_to_1_2, + Upgrade_1_2_to_1_3, + Upgrade_1_3_to_1_4] +upgrade_classes = {} +for upgrader in upgraders: + upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader + +def upgrade(path, current_version, + target_version=STORAGE_VERSION): + """ + Call the appropriate upgrade function to convert current_version + to target_version. If a direct conversion function does not exist, + use consecutive conversion functions. + """ + if current_version not in STORAGE_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % current_version + if target_version not in STORAGE_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % current_version + + if (current_version, target_version) in upgrade_classes: + # direct conversion + upgrade_class = upgrade_classes[(current_version, target_version)] + u = upgrade_class(path) + u.upgrade() + else: + # consecutive single-step conversion + i = STORAGE_VERSIONS.index(current_version) + while True: + version_a = STORAGE_VERSIONS[i] + version_b = STORAGE_VERSIONS[i+1] + try: + upgrade_class = upgrade_classes[(version_a, version_b)] + except KeyError: + raise NotImplementedError, \ + "Cannot convert version '%s' to '%s' yet." \ + % (version_a, version_b) + u = upgrade_class(path) + u.upgrade() + if version_b == target_version: + break + i += 1 |