diff options
Diffstat (limited to 'libbe/storage/util/properties.py')
-rw-r--r-- | libbe/storage/util/properties.py | 193 |
1 files changed, 141 insertions, 52 deletions
diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py index 6a2b964..4489a44 100644 --- a/libbe/storage/util/properties.py +++ b/libbe/storage/util/properties.py @@ -42,50 +42,53 @@ import copy import types import libbe + if libbe.TESTING: import unittest -class ValueCheckError (ValueError): +class ValueCheckError(ValueError): def __init__(self, name, value, allowed): - action = "in" # some list of allowed values - if type(allowed) == types.FunctionType: - action = "allowed by" # some allowed-value check function + action = "in" # some list of allowed values + if isinstance(allowed, types.FunctionType): + action = "allowed by" # some allowed-value check function msg = "%s not %s %s for %s" % (value, action, allowed, name) ValueError.__init__(self, msg) self.name = name self.value = value self.allowed = allowed + def Property(funcs): """ End a chain of property decorators, returning a property. """ - args = {} - args["fget"] = funcs.get("fget", None) - args["fset"] = funcs.get("fset", None) - args["fdel"] = funcs.get("fdel", None) - args["doc"] = funcs.get("doc", None) - - #print("Creating a property with") - #for key, val in args.items(): print(key, value) + args = {"fget": funcs.get("fget", None), "fset": funcs.get("fset", None), "fdel": funcs.get("fdel", None), + "doc": funcs.get("doc", None)} + + # print("Creating a property with") + # for key, val in args.items(): print(key, value) return property(**args) + def doc_property(doc=None): """ Add a docstring to a chain of property decorators. """ + def decorator(funcs=None): """ Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...} or a function fn() returning such a dict. """ if hasattr(funcs, "__call__"): - funcs = funcs() # convert from function-arg to dict + funcs = funcs() # convert from function-arg to dict funcs["doc"] = doc return funcs + return decorator + def local_property(name, null=None, mutable_null=False): """ Define get/set access to per-parent-instance local storage. Uses @@ -95,11 +98,13 @@ def local_property(name, null=None, mutable_null=False): If mutable_null == True, we only release deepcopies of the null to the outside world. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget", None) fset = funcs.get("fset", None) + def _fget(self): if fget is not None: fget(self) @@ -109,40 +114,49 @@ def local_property(name, null=None, mutable_null=False): ret_null = null value = getattr(self, "_%s_value" % name, ret_null) return value + def _fset(self, value): setattr(self, "_%s_value" % name, value) if fset is not None: fset(self, value) + funcs["fget"] = _fget funcs["fset"] = _fset funcs["name"] = name return funcs + return decorator + def settings_property(name, null=None): """ Similar to local_property, except where local_property stores the value in instance._<name>_value, settings_property stores the value in instance.settings[name]. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget", None) fset = funcs.get("fset", None) + def _fget(self): if fget is not None: fget(self) value = self.settings.get(name, null) return value + def _fset(self, value): self.settings[name] = value if fset is not None: fset(self, value) + funcs["fget"] = _fget funcs["fset"] = _fset funcs["name"] = name return funcs + return decorator @@ -158,22 +172,30 @@ def settings_property(name, null=None): # True def _hash_mutable_value(value): return repr(value) + + def _init_mutable_property_cache(self): if not hasattr(self, "_mutable_property_cache_hash"): # first call to _fget for any mutable property self._mutable_property_cache_hash = {} self._mutable_property_cache_copy = {} + + def _set_cached_mutable_property(self, cacher_name, property_name, value): _init_mutable_property_cache(self) self._mutable_property_cache_hash[(cacher_name, property_name)] = \ _hash_mutable_value(value) self._mutable_property_cache_copy[(cacher_name, property_name)] = \ copy.deepcopy(value) + + def _get_cached_mutable_property(self, cacher_name, property_name, default=None): _init_mutable_property_cache(self) if (cacher_name, property_name) not in self._mutable_property_cache_copy: return default return self._mutable_property_cache_copy[(cacher_name, property_name)] + + def _eq_cached_mutable_property(self, cacher_name, property_name, value, default=None): _init_mutable_property_cache(self) if (cacher_name, property_name) not in self._mutable_property_cache_hash: @@ -194,12 +216,14 @@ def defaulting_property(default=None, null=None, null should never escape to the outside world, so don't worry about it being a mutable. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget") fset = funcs.get("fset") name = funcs.get("name", "<unknown>") + def _fget(self): value = fget(self) if value == null: @@ -208,63 +232,81 @@ def defaulting_property(default=None, null=None, else: return default return value + def _fset(self, value): if value == default: value = null fset(self, value) + funcs["fget"] = _fget funcs["fset"] = _fset return funcs + return decorator + def fn_checked_property(value_allowed_fn): """ Define allowed values for get/set access to a property. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget") fset = funcs.get("fset") name = funcs.get("name", "<unknown>") + def _fget(self): value = fget(self) - if value_allowed_fn(value) != True: + if not value_allowed_fn(value): raise ValueCheckError(name, value, value_allowed_fn) return value + def _fset(self, value): - if value_allowed_fn(value) != True: + if not value_allowed_fn(value): raise ValueCheckError(name, value, value_allowed_fn) fset(self, value) + funcs["fget"] = _fget funcs["fset"] = _fset return funcs + return decorator -def checked_property(allowed=[]): + +def checked_property(allowed=None): """ Define allowed values for get/set access to a property. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget") fset = funcs.get("fset") name = funcs.get("name", "<unknown>") + def _fget(self): value = fget(self) if value not in allowed: raise ValueCheckError(name, value, allowed) return value + def _fset(self, value): if value not in allowed: raise ValueCheckError(name, value, allowed) fset(self, value) + funcs["fget"] = _fget funcs["fset"] = _fset return funcs + if allowed is None: + allowed = [] + return decorator + def cached_property(generator, initVal=None, mutable=False): """ Allow caching of values generated by generator(instance), where @@ -291,11 +333,13 @@ def cached_property(generator, initVal=None, mutable=False): generator is called whenever the cached value would otherwise be used. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget") name = funcs.get("name", "<unknown>") + def _fget(self): cache = getattr(self, "_%s_cache" % name, True) value = fget(self) @@ -309,10 +353,13 @@ def cached_property(generator, initVal=None, mutable=False): else: value = generator(self) return value + funcs["fget"] = _fget return funcs + return decorator + def primed_property(primer, initVal=None, unprimeableVal=None): """ Just like a cached_property, except that instead of returning a @@ -326,14 +373,17 @@ def primed_property(primer, initVal=None, unprimeableVal=None): whenever ._<name>_prime is True, or is False or missing and value == initVal. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget") name = funcs.get("name", "<unknown>") + def _fget(self): prime = getattr(self, "_%s_prime" % name, False) - if prime == False: + value = None + if not prime: value = fget(self) if prime == True or (prime == False and value == initVal): primer(self) @@ -341,10 +391,13 @@ def primed_property(primer, initVal=None, unprimeableVal=None): if prime == False and value == initVal: return unprimeableVal return value + funcs["fget"] = _fget return funcs + return decorator + def change_hook_property(hook, mutable=False, default=None): """Call the function `hook` whenever a value different from the current value is set. @@ -373,40 +426,46 @@ def change_hook_property(hook, mutable=False, default=None): `hook(instance, old_value, new_value)`, where `instance` is a reference to the class instance to which this property belongs. """ + def decorator(funcs): if hasattr(funcs, "__call__"): funcs = funcs() fget = funcs.get("fget") fset = funcs.get("fset") name = funcs.get("name", "<unknown>") - def _fget(self, new_value=None, from_fset=False): # only used if mutable == True + + def _fget(self, new_value=None, from_fset=False): # only used if mutable == True if from_fset: - value = new_value # compare new value with cached + value = new_value # compare new value with cached else: - value = fget(self) # compare current value with cached + value = fget(self) # compare current value with cached if _eq_cached_mutable_property(self, "change hook property", name, value, default) != 0: # there has been a change, cache new value old_value = _get_cached_mutable_property(self, "change hook property", name, default) _set_cached_mutable_property(self, "change hook property", name, value) - if from_fset: # return previously cached value + if from_fset: # return previously cached value value = old_value - else: # the value changed while we weren't looking + else: # the value changed while we weren't looking hook(self, old_value, value) return value + def _fset(self, value): - if mutable: # get cached previous value + if mutable: # get cached previous value old_value = _fget(self, new_value=value, from_fset=True) else: old_value = fget(self) fset(self, value) if value != old_value: hook(self, old_value, value) + if mutable: funcs["fget"] = _fget funcs["fset"] = _fset return funcs + return decorator + if libbe.TESTING: class DecoratorTests(unittest.TestCase): def testLocalDoc(self): @@ -415,40 +474,48 @@ if libbe.TESTING: @doc_property("A fancy property") def x(): return {} + self.assertTrue(Test.x.__doc__ == "A fancy property", Test.x.__doc__) + def testLocalProperty(self): class Test(object): @Property @local_property(name="LOCAL") def x(): return {} + t = Test() - self.assertTrue(t.x == None, str(t.x)) - t.x = 'z' # the first set initializes ._LOCAL_value + self.assertTrue(t.x is None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value self.assertTrue(t.x == 'z', str(t.x)) self.assertTrue("_LOCAL_value" in dir(t), dir(t)) self.assertTrue(t._LOCAL_value == 'z', t._LOCAL_value) + def testSettingsProperty(self): class Test(object): @Property @settings_property(name="attr") def x(): return {} + def __init__(self): self.settings = {} + t = Test() - self.assertTrue(t.x == None, str(t.x)) - t.x = 'z' # the first set initializes ._LOCAL_value + self.assertTrue(t.x is None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value self.assertTrue(t.x == 'z', str(t.x)) self.assertTrue("attr" in t.settings, t.settings) self.assertTrue(t.settings["attr"] == 'z', t.settings["attr"]) + def testDefaultingLocalProperty(self): class Test(object): @Property @defaulting_property(default='y', null='x') @local_property(name="DEFAULT", null=5) def x(): return {} + t = Test() self.assertTrue(t.x == 5, str(t.x)) t.x = 'x' @@ -459,14 +526,17 @@ if libbe.TESTING: self.assertTrue(t.x == 'z', str(t.x)) t.x = 5 self.assertTrue(t.x == 5, str(t.x)) + def testCheckedLocalProperty(self): class Test(object): @Property @checked_property(allowed=['x', 'y', 'z']) @local_property(name="CHECKED") def x(): return {} + def __init__(self): self._CHECKED_value = 'x' + t = Test() self.assertTrue(t.x == 'x', str(t.x)) try: @@ -475,6 +545,7 @@ if libbe.TESTING: except ValueCheckError as e: pass self.assertTrue(type(e) == ValueCheckError, type(e)) + def testTwoCheckedLocalProperties(self): class Test(object): @Property @@ -486,9 +557,11 @@ if libbe.TESTING: @checked_property(allowed=['a', 'b', 'c']) @local_property(name="A") def a(): return {} + def __init__(self): self._A_value = 'a' self._X_value = 'x' + t = Test() try: t.x = 'a' @@ -508,14 +581,17 @@ if libbe.TESTING: t.a = 'a' t.a = 'b' t.a = 'c' + def testFnCheckedLocalProperty(self): class Test(object): @Property - @fn_checked_property(lambda v : v in ['x', 'y', 'z']) + @fn_checked_property(lambda v: v in ['x', 'y', 'z']) @local_property(name="CHECKED") def x(): return {} + def __init__(self): self._CHECKED_value = 'x' + t = Test() self.assertTrue(t.x == 'x', str(t.x)) try: @@ -524,57 +600,65 @@ if libbe.TESTING: except ValueCheckError as e: pass self.assertTrue(type(e) == ValueCheckError, type(e)) + def testCachedLocalProperty(self): class Gen(object): def __init__(self): self.i = 0 + def __call__(self, owner): self.i += 1 return self.i + class Test(object): @Property @cached_property(generator=Gen(), initVal=None) @local_property(name="CACHED") def x(): return {} + t = Test() self.assertFalse("_CACHED_cache" in dir(t), - getattr(t, "_CACHED_cache", None)) + getattr(t, "_CACHED_cache", None)) self.assertTrue(t.x == 1, t.x) self.assertTrue(t.x == 1, t.x) self.assertTrue(t.x == 1, t.x) t.x = 8 self.assertTrue(t.x == 8, t.x) self.assertTrue(t.x == 8, t.x) - t._CACHED_cache = False # Caching is off, but the stored value - val = t.x # is 8, not the initVal (None), so we - self.assertTrue(val == 8, val) # get 8. - t._CACHED_value = None # Now we've set the stored value to None - val = t.x # so future calls to fget (like this) - self.assertTrue(val == 2, val) # will call the generator every time... + t._CACHED_cache = False # Caching is off, but the stored value + val = t.x # is 8, not the initVal (None), so we + self.assertTrue(val == 8, val) # get 8. + t._CACHED_value = None # Now we've set the stored value to None + val = t.x # so future calls to fget (like this) + self.assertTrue(val == 2, val) # will call the generator every time... val = t.x self.assertTrue(val == 3, val) val = t.x self.assertTrue(val == 4, val) - t._CACHED_cache = True # We turn caching back on, and get - self.assertTrue(t.x == 1, str(t.x)) # the original cached value. - del t._CACHED_cached_value # Removing that value forces a - self.assertTrue(t.x == 5, str(t.x)) # single cache-regenerating call - self.assertTrue(t.x == 5, str(t.x)) # to the genenerator, after which - self.assertTrue(t.x == 5, str(t.x)) # we get the new cached value. + t._CACHED_cache = True # We turn caching back on, and get + self.assertTrue(t.x == 1, str(t.x)) # the original cached value. + del t._CACHED_cached_value # Removing that value forces a + self.assertTrue(t.x == 5, str(t.x)) # single cache-regenerating call + self.assertTrue(t.x == 5, str(t.x)) # to the genenerator, after which + self.assertTrue(t.x == 5, str(t.x)) # we get the new cached value. + def testPrimedLocalProperty(self): class Test(object): def prime(self): self.settings["PRIMED"] = self.primeVal + @Property @primed_property(primer=prime, initVal=None, unprimeableVal=2) @settings_property(name="PRIMED") def x(): return {} + def __init__(self): - self.settings={} + self.settings = {} self.primeVal = "initialized" + t = Test() self.assertFalse("_PRIMED_prime" in dir(t), - getattr(t, "_PRIMED_prime", None)) + getattr(t, "_PRIMED_prime", None)) self.assertTrue(t.x == "initialized", t.x) t.x = 1 self.assertTrue(t.x == 1, t.x) @@ -590,6 +674,7 @@ if libbe.TESTING: t.x = None t.primeVal = None self.assertTrue(t.x == 2, t.x) + def testChangeHookLocalProperty(self): class Test(object): def _hook(self, old, new): @@ -600,16 +685,18 @@ if libbe.TESTING: @change_hook_property(_hook) @local_property(name="HOOKED") def x(): return {} + t = Test() t.x = 1 - self.assertTrue(t.old == None, t.old) + self.assertTrue(t.old is None, t.old) self.assertTrue(t.new == 1, t.new) t.x = 1 - self.assertTrue(t.old == None, t.old) + self.assertTrue(t.old is None, t.old) self.assertTrue(t.new == 1, t.new) t.x = 2 self.assertTrue(t.old == 1, t.old) self.assertTrue(t.new == 2, t.new) + def testChangeHookMutableProperty(self): class Test(object): def _hook(self, old, new): @@ -621,10 +708,11 @@ if libbe.TESTING: @change_hook_property(_hook, mutable=True) @local_property(name="HOOKED") def x(): return {} + t = Test() t.hook_calls = 0 t.x = [] - self.assertTrue(t.old == None, t.old) + self.assertTrue(t.old is None, t.old) self.assertTrue(t.new == [], t.new) self.assertTrue(t.hook_calls == 1, t.hook_calls) a = t.x @@ -650,19 +738,20 @@ if libbe.TESTING: self.assertTrue(t.old == [], t.old) self.assertTrue(t.new == [5], t.new) self.assertTrue(t.hook_calls == 4, t.hook_calls) - t.x.append(6) # this append(6) is not noticed yet + t.x.append(6) # this append(6) is not noticed yet self.assertTrue(t.old == [], t.old) - self.assertTrue(t.new == [5,6], t.new) + self.assertTrue(t.new == [5, 6], t.new) self.assertTrue(t.hook_calls == 4, t.hook_calls) # this append(7) is not noticed, but the t.x get causes the # append(6) to be noticed t.x.append(7) self.assertTrue(t.old == [5], t.old) - self.assertTrue(t.new == [5,6,7], t.new) + self.assertTrue(t.new == [5, 6, 7], t.new) self.assertTrue(t.hook_calls == 5, t.hook_calls) - a = t.x # now the append(7) is noticed - self.assertTrue(t.old == [5,6], t.old) - self.assertTrue(t.new == [5,6,7], t.new) + a = t.x # now the append(7) is noticed + self.assertTrue(t.old == [5, 6], t.old) + self.assertTrue(t.new == [5, 6, 7], t.new) self.assertTrue(t.hook_calls == 6, t.hook_calls) + suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests) |