aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/storage')
-rw-r--r--libbe/storage/base.py55
-rw-r--r--libbe/storage/util/properties.py19
-rw-r--r--libbe/storage/util/settings_object.py388
-rw-r--r--libbe/storage/vcs/base.py62
-rw-r--r--libbe/storage/vcs/bzr.py9
5 files changed, 367 insertions, 166 deletions
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
index 202305b..64ae3e7 100644
--- a/libbe/storage/base.py
+++ b/libbe/storage/base.py
@@ -84,9 +84,12 @@ class EmptyCommit(Exception):
def __init__(self):
Exception.__init__(self, 'No changes to commit')
+class _EMPTY (object):
+ """Entry has been added but has no user-set value."""
+ pass
class Entry (Tree):
- def __init__(self, id, value=None, parent=None, directory=False,
+ def __init__(self, id, value=_EMPTY, parent=None, directory=False,
children=None):
if children == None:
Tree.__init__(self)
@@ -241,10 +244,7 @@ class Storage (object):
"""Add an entry"""
if self.is_writeable() == False:
raise NotWriteable('Cannot add entry to unwriteable storage.')
- try: # Maybe we've already added that id?
- self.get(id)
- pass # yup, no need to add another
- except InvalidID:
+ if not self.exists(id):
self._add(id, *args, **kwargs)
def _add(self, id, parent=None, directory=False):
@@ -253,6 +253,15 @@ class Storage (object):
p = self._data[parent]
self._data[id] = Entry(id, parent=p, directory=directory)
+ def exists(self, *args, **kwargs):
+ """Check an entry's existence"""
+ if self.is_readable() == False:
+ raise NotReadable('Cannot check entry existence in unreadable storage.')
+ return self._exists(*args, **kwargs)
+
+ def _exists(self, id, revision=None):
+ return id in self._data
+
def remove(self, *args, **kwargs):
"""Remove an entry."""
if self.is_writeable() == False:
@@ -332,7 +341,7 @@ class Storage (object):
return value
def _get(self, id, default=InvalidObject, revision=None):
- if id in self._data:
+ if id in self._data and self._data[id].value != _EMPTY:
return self._data[id].value
elif default == InvalidObject:
raise InvalidID(id)
@@ -402,6 +411,13 @@ class VersionedStorage (Storage):
p = self._data[-1][parent]
self._data[-1][id] = Entry(id, parent=p, directory=directory)
+ def _exists(self, id, revision=None):
+ if revision == None:
+ revision = -1
+ else:
+ revision = int(revision)
+ return id in self._data[revision]
+
def _remove(self, id):
if self._data[-1][id].directory == True \
and len(self.children(id)) > 0:
@@ -446,7 +462,8 @@ class VersionedStorage (Storage):
revision = -1
else:
revision = int(revision)
- if id in self._data[revision]:
+ if id in self._data[revision] \
+ and self._data[revision][id].value != _EMPTY:
return self._data[revision][id].value
elif default == InvalidObject:
raise InvalidID(id)
@@ -760,13 +777,14 @@ if TESTING == True:
pass
def test_get_initial_value(self):
- """Data value should be None before any value has been set.
+ """Data value should be default before any value has been set.
"""
self.s.add(self.id, directory=False)
- ret = self.s.get(self.id)
- self.failUnless(ret == None,
- "%s.get() returned %s not None"
- % (vars(self.Class)['name'], ret))
+ val = 'UNLIKELY DEFAULT'
+ ret = self.s.get(self.id, default=val)
+ self.failUnless(ret == val,
+ "%s.get() returned %s not %s"
+ % (vars(self.Class)['name'], ret, val))
def test_set_exception(self):
"""Set should raise exception if id not in Storage.
@@ -830,6 +848,19 @@ if TESTING == True:
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], ret, self.val))
+ def test_empty_get_set_persistence(self):
+ """After empty set, get may return either an empty string or default.
+ """
+ self.s.add(self.id, directory=False)
+ self.s.set(self.id, '')
+ self.s.disconnect()
+ self.s.connect()
+ default = 'UNLIKELY DEFAULT'
+ ret = self.s.get(self.id, default=default)
+ self.failUnless(ret in ['', default],
+ "%s.get() returned %s not in %s"
+ % (vars(self.Class)['name'], ret, ['', default]))
+
def test_add_nonrooted_persistence(self):
"""Adding entries should increase the number of children after reconnect.
"""
diff --git a/libbe/storage/util/properties.py b/libbe/storage/util/properties.py
index f48cfa0..55bac85 100644
--- a/libbe/storage/util/properties.py
+++ b/libbe/storage/util/properties.py
@@ -303,12 +303,14 @@ def cached_property(generator, initVal=None, mutable=False):
return funcs
return decorator
-def primed_property(primer, initVal=None):
+def primed_property(primer, initVal=None, unprimeableVal=None):
"""
Just like a cached_property, except that instead of returning a
- new value and running fset to cache it, the primer performs some
+ new value and running fset to cache it, the primer attempts some
background manipulation (e.g. loads data into instance.settings)
- such that a _second_ pass through fget succeeds.
+ such that a _second_ pass through fget succeeds. If the second
+ pass doesn't succeed (e.g. no readable storage), we give up and
+ return unprimeableVal.
The 'cache' flag becomes a 'prime' flag, with priming taking place
whenever ._<name>_prime is True, or is False or missing and
@@ -326,6 +328,8 @@ def primed_property(primer, initVal=None):
if prime == True or (prime == False and value == initVal):
primer(self)
value = fget(self)
+ if prime == False and value == initVal:
+ return unprimeableVal
return value
funcs["fget"] = _fget
return funcs
@@ -543,13 +547,14 @@ if libbe.TESTING == True:
def testPrimedLocalProperty(self):
class Test(object):
def prime(self):
- self.settings["PRIMED"] = "initialized"
+ self.settings["PRIMED"] = self.primeVal
@Property
- @primed_property(primer=prime, initVal=None)
+ @primed_property(primer=prime, initVal=None, unprimeableVal=2)
@settings_property(name="PRIMED")
def x(): return {}
def __init__(self):
self.settings={}
+ self.primeVal = "initialized"
t = Test()
self.failIf("_PRIMED_prime" in dir(t),
getattr(t, "_PRIMED_prime", None))
@@ -564,6 +569,10 @@ if libbe.TESTING == True:
t._PRIMED_prime = False
t.x = 3
self.failUnless(t.x == 3, t.x)
+ # test unprimableVal
+ t.x = None
+ t.primeVal = None
+ self.failUnless(t.x == 2, t.x)
def testChangeHookLocalProperty(self):
class Test(object):
def _hook(self, old, new):
diff --git a/libbe/storage/util/settings_object.py b/libbe/storage/util/settings_object.py
index ca94f23..8434952 100644
--- a/libbe/storage/util/settings_object.py
+++ b/libbe/storage/util/settings_object.py
@@ -40,7 +40,7 @@ class _Token (object):
pass
class UNPRIMED (_Token):
- "Property has not been primed."
+ "Property has not been primed (loaded)."
pass
class EMPTY (_Token):
@@ -60,13 +60,13 @@ def prop_save_settings(self, old, new):
def prop_load_settings(self):
"""
- The default action undertaken when an UNPRIMED property is accessed.
+ The default action undertaken when an UNPRIMED property is
+ accessed. Attempt to run .load_settings(), which calls
+ ._setup_saved_settings() internally. If .storage is inaccessible,
+ don't do anything.
"""
- if self.storage != None and self.storage.is_readable() \
- and self._settings_loaded==False:
+ if self.storage != None and self.storage.is_readable():
self.load_settings()
- else:
- self._setup_saved_settings(flag_as_loaded=False)
# Some name-mangling routines for pretty printing setting names
def setting_name_to_attr_name(self, name):
@@ -129,6 +129,12 @@ def versioned_property(name, doc,
* you set change_hook and might have mutable property values
See the docstrings in libbe.properties for details on how each of
these cases are handled.
+
+ The value stored in .settings[name] will be
+ * no value (or UNPRIMED) if the property has been neither set,
+ nor loaded as blank.
+ * EMPTY if the value has been loaded as blank.
+ * some value if the property has been either loaded or set.
"""
settings_properties.append(name)
if require_save == True:
@@ -152,7 +158,8 @@ def versioned_property(name, doc,
% (', '.join(allowed))
hooked = change_hook_property(hook=change_hook, mutable=mutable,
default=EMPTY)
- primed = primed_property(primer=primer, initVal=UNPRIMED)
+ primed = primed_property(primer=primer, initVal=UNPRIMED,
+ unprimeableVal=EMPTY)
settings = settings_property(name=name, null=UNPRIMED)
docp = doc_property(doc=fulldoc)
deco = hooked(primed(settings(docp(funcs))))
@@ -181,27 +188,29 @@ class SavedSettingsObject(object):
_attr_name_to_setting_name = attr_name_to_setting_name
def __init__(self):
- self._settings_loaded = False
self.storage = None
self.settings = {}
def load_settings(self):
"""Load the settings from disk."""
- # Override. Must call ._setup_saved_settings() after loading.
- self.settings = {}
- self._setup_saved_settings()
+ # Override. Must call ._setup_saved_settings({}) with
+ # from-storage settings.
+ self._setup_saved_settings({})
- def _setup_saved_settings(self, flag_as_loaded=True):
+ def _setup_saved_settings(self, settings=None):
"""
- To be run after setting self.settings up from disk. Marks all
- settings as primed.
+ Sets up a settings dict loaded from storage. Fills in
+ all missing settings entries with EMPTY.
"""
+ if settings == None:
+ settings = {}
for property in self.settings_properties:
if property not in self.settings \
or self.settings[property] == UNPRIMED:
- self.settings[property] = EMPTY
- if flag_as_loaded == True:
- self._settings_loaded = True
+ if property in settings:
+ self.settings[property] = settings[property]
+ else:
+ self.settings[property] = EMPTY
def save_settings(self):
"""Save the settings to disk."""
@@ -220,15 +229,16 @@ class SavedSettingsObject(object):
load already.
"""
settings = {}
+ for k in self.settings_properties: # force full load
+ if not k in self.settings or self.settings[k] == UNPRIMED:
+ value = getattr(
+ self, self._setting_name_to_attr_name(k))
for k in self.settings_properties:
- if k in self.settings and \
- not self.settings[k] in [None, EMPTY]:
+ if k in self.settings and self.settings[k] != EMPTY:
settings[k] = self.settings[k]
- else:
- value = getattr(
+ elif k in self.required_saved_properties:
+ settings[k] = getattr(
self, self._setting_name_to_attr_name(k))
- if value not in [None, EMPTY, []]:
- settings[k] = value
return settings
def clear_cached_setting(self, setting=None):
@@ -242,134 +252,275 @@ class SavedSettingsObject(object):
if libbe.TESTING == True:
+ import copy
+
+ class TestStorage (list):
+ def __init__(self):
+ list.__init__(self)
+ self.readable = True
+ self.writeable = True
+ def is_readable(self):
+ return self.readable
+ def is_writeable(self):
+ return self.writeable
+
+ class TestObject (SavedSettingsObject):
+ def load_settings(self):
+ self.load_count += 1
+ if len(self.storage) == 0:
+ settings = {}
+ else:
+ settings = copy.deepcopy(self.storage[-1])
+ self._setup_saved_settings(settings)
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self.storage.append(copy.deepcopy(settings))
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ self.load_count = 0
+ self.storage = TestStorage()
+
class SavedSettingsObjectTests(unittest.TestCase):
- def testSimpleProperty(self):
- """Testing a minimal versioned property"""
- class Test(SavedSettingsObject):
+ def testSimplePropertyDoc(self):
+ """Testing a minimal versioned property docstring"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def content_type(): return {}
+ expected = "A test property\n\nThis property defaults to None."
+ self.failUnless(Test.content_type.__doc__ == expected,
+ Test.content_type.__doc__)
+ def testSimplePropertyFromMemory(self):
+ """Testing a minimal versioned property from memory"""
+ class Test (TestObject):
settings_properties = []
required_saved_properties = []
- @versioned_property(name="Content-type",
- doc="A test property",
- settings_properties=settings_properties,
- required_saved_properties= \
- required_saved_properties)
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
t = Test()
- # access missing setting
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
self.failUnless(len(t.settings) == 0, len(t.settings))
+ # accessing t.content_type triggers the priming, but
+ # t.storage.is_readable() == False, so nothing happens.
+ t.storage.readable = False
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(t.settings == {}, t.settings)
+ self.failUnless(len(t.settings) == 0, len(t.settings))
+ self.failUnless(t.content_type == None, t.content_type)
+ # accessing t.content_type triggers the priming again, and
+ # now that t.storage.is_readable() == True, this fills out
+ # t.settings with EMPTY data. At this point there should
+ # be one load and no saves.
+ t.storage.readable = True
self.failUnless(t.content_type == None, t.content_type)
- # accessing t.content_type triggers the priming, which runs
- # t._setup_saved_settings, which fills out t.settings with
- # EMPTY data. t._settings_loaded is still false though, since
- # the default priming does not do any of the `official' loading
- # that occurs in t.load_settings.
self.failUnless(len(t.settings) == 1, len(t.settings))
self.failUnless(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- # load settings creates an EMPTY value in the settings array
- t.load_settings()
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ # an explicit call to load settings forces a reload,
+ # but nothing else changes.
+ t.load_settings()
self.failUnless(len(t.settings) == 1, len(t.settings))
self.failUnless(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
# now we set a value
t.content_type = 5
self.failUnless(t.settings["Content-type"] == 5,
t.settings["Content-type"])
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':5}], t.storage)
+ # getting its value changes nothing
self.failUnless(t.content_type == 5, t.content_type)
self.failUnless(t.settings["Content-type"] == 5,
t.settings["Content-type"])
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':5}], t.storage)
# now we set another value
t.content_type = "text/plain"
self.failUnless(t.content_type == "text/plain", t.content_type)
self.failUnless(t.settings["Content-type"] == "text/plain",
t.settings["Content-type"])
+ self.failUnless(t.load_count == 2, t.load_count)
+ self.failUnless(len(t.storage) == 2, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':5},
+ {'Content-type':'text/plain'}],
+ t.storage)
+ # t._get_saved_settings() returns a dict of required or
+ # non-default values.
self.failUnless(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
# now we clear to the post-primed value
t.content_type = EMPTY
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
self.failUnless(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
self.failUnless(t.content_type == None, t.content_type)
self.failUnless(len(t.settings) == 1, len(t.settings))
self.failUnless(t.settings["Content-type"] == EMPTY,
t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == {},
+ t._get_saved_settings())
+ self.failUnless(t.storage == [{'Content-type':5},
+ {'Content-type':'text/plain'},
+ {}],
+ t.storage)
+ def testSimplePropertyFromStorage(self):
+ """Testing a minimal versioned property from storage"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="prop-a",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_a(): return {}
+ @versioned_property(
+ name="prop-b",
+ doc="Another test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_b(): return {}
+ t = Test()
+ t.storage.append({'prop-a':'saved'})
+ # setting prop-b forces a load (to check for changes),
+ # which also pulls in prop-a.
+ t.prop_b = 'new-b'
+ settings = {'prop-b':'new-b', 'prop-a':'saved'}
+ self.failUnless(t.settings == settings, t.settings)
+ self.failUnless(t._get_saved_settings() == settings,
+ t._get_saved_settings())
+ # test that _get_saved_settings() works even when settings
+ # were _not_ loaded beforehand
+ t = Test()
+ t.storage.append({'prop-a':'saved'})
+ settings ={'prop-a':'saved'}
+ self.failUnless(t.settings == {}, t.settings)
+ self.failUnless(t._get_saved_settings() == settings,
+ t._get_saved_settings())
+ def testSimplePropertySetStorageSave(self):
+ """Set a property, then attach storage and save"""
+ class Test (TestObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(
+ name="prop-a",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_a(): return {}
+ @versioned_property(
+ name="prop-b",
+ doc="Another test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def prop_b(): return {}
+ t = Test()
+ storage = t.storage
+ t.storage = None
+ t.prop_a = 'text/html'
+ t.storage = storage
+ t.save_settings()
+ self.failUnless(t.prop_a == 'text/html', t.prop_a)
+ self.failUnless(t.settings == {'prop-a':'text/html',
+ 'prop-b':EMPTY},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'prop-a':'text/html'}],
+ t.storage)
def testDefaultingProperty(self):
"""Testing a defaulting versioned property"""
- class Test(SavedSettingsObject):
+ class Test (TestObject):
settings_properties = []
required_saved_properties = []
- @versioned_property(name="Content-type",
- doc="A test property",
- default="text/plain",
- settings_properties=settings_properties,
- required_saved_properties= \
- required_saved_properties)
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
t = Test()
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(t.settings == {}, t.settings)
self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- t.load_settings()
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
- self.failUnless(t._get_saved_settings() ==
- {"Content-type":"text/plain"},
+ self.failUnless(t.settings == {"Content-type":EMPTY},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
+ self.failUnless(t._get_saved_settings() == {},
t._get_saved_settings())
t.content_type = "text/html"
self.failUnless(t.content_type == "text/html",
t.content_type)
- self.failUnless(t.settings["Content-type"] == "text/html",
- t.settings["Content-type"])
+ self.failUnless(t.settings == {"Content-type":"text/html"},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ t.storage)
self.failUnless(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
def testRequiredDefaultingProperty(self):
"""Testing a required defaulting versioned property"""
- class Test(SavedSettingsObject):
+ class Test (TestObject):
settings_properties = []
required_saved_properties = []
- @versioned_property(name="Content-type",
- doc="A test property",
- default="text/plain",
- settings_properties=settings_properties,
- required_saved_properties= \
- required_saved_properties,
- require_save=True)
+ @versioned_property(
+ name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ require_save=True)
def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
t = Test()
+ self.failUnless(t.settings == {}, t.settings)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings == {"Content-type":EMPTY},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
self.failUnless(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
t.content_type = "text/html"
+ self.failUnless(t.content_type == "text/html",
+ t.content_type)
+ self.failUnless(t.settings == {"Content-type":"text/html"},
+ t.settings)
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ t.storage)
self.failUnless(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
def testClassVersionedPropertyDefinition(self):
"""Testing a class-specific _versioned property decorator"""
- class Test(SavedSettingsObject):
+ class Test (TestObject):
settings_properties = []
required_saved_properties = []
- def _versioned_property(settings_properties= \
- settings_properties,
- required_saved_properties= \
- required_saved_properties,
- **kwargs):
+ def _versioned_property(
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
if "settings_properties" not in kwargs:
kwargs["settings_properties"] = settings_properties
if "required_saved_properties" not in kwargs:
@@ -377,69 +528,60 @@ if libbe.TESTING == True:
required_saved_properties
return versioned_property(**kwargs)
@_versioned_property(name="Content-type",
- doc="A test property",
- default="text/plain",
- require_save=True)
+ doc="A test property",
+ default="text/plain",
+ require_save=True)
def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
t = Test()
self.failUnless(t._get_saved_settings() == \
{"Content-type":"text/plain"},
t._get_saved_settings())
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
t.content_type = "text/html"
self.failUnless(t._get_saved_settings() == \
{"Content-type":"text/html"},
t._get_saved_settings())
+ self.failUnless(t.load_count == 1, t.load_count)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'Content-type':'text/html'}],
+ t.storage)
def testMutableChangeHookedProperty(self):
"""Testing a mutable change-hooked property"""
- SAVES = []
- def prop_log_save_settings(self, old, new, saves=SAVES):
- saves.append("'%s' -> '%s'" % (str(old), str(new)))
- prop_save_settings(self, old, new)
- class Test(SavedSettingsObject):
+ class Test (TestObject):
settings_properties = []
required_saved_properties = []
- @versioned_property(name="List-type",
- doc="A test property",
- mutable=True,
- change_hook=prop_log_save_settings,
- settings_properties=settings_properties,
- required_saved_properties= \
- required_saved_properties)
+ @versioned_property(
+ name="List-type",
+ doc="A test property",
+ mutable=True,
+ change_hook=prop_save_settings,
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
def list_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
t = Test()
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- t.load_settings()
- self.failUnless(SAVES == [], SAVES)
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
self.failUnless(t.list_type == None, t.list_type)
- self.failUnless(SAVES == [], SAVES)
+ self.failUnless(len(t.storage) == 0, len(t.storage))
self.failUnless(t.settings["List-type"]==EMPTY,
t.settings["List-type"])
t.list_type = []
self.failUnless(t.settings["List-type"] == [],
t.settings["List-type"])
- self.failUnless(SAVES == [
- "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'"
- ], SAVES)
- t.list_type.append(5)
- self.failUnless(SAVES == [
- "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'",
- ], SAVES)
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'List-type':[]}],
+ t.storage)
+ t.list_type.append(5) # external modification not detected yet
+ self.failUnless(len(t.storage) == 1, len(t.storage))
+ self.failUnless(t.storage == [{'List-type':[]}],
+ t.storage)
self.failUnless(t.settings["List-type"] == [5],
t.settings["List-type"])
- self.failUnless(SAVES == [ # the append(5) has not yet been saved
- "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'",
- ], SAVES)
- self.failUnless(t.list_type == [5], t.list_type)#get triggers saved
-
- self.failUnless(SAVES == [ # now the append(5) has been saved.
- "'<class 'libbe.storage.util.settings_object.EMPTY'>' -> '[]'",
- "'[]' -> '[5]'"
- ], SAVES)
+ self.failUnless(t.list_type == [5], t.list_type)# get triggers save
+ self.failUnless(len(t.storage) == 2, len(t.storage))
+ self.failUnless(t.storage == [{'List-type':[]},
+ {'List-type':[5]}],
+ t.storage)
unitsuite = unittest.TestLoader().loadTestsFromTestCase( \
SavedSettingsObjectTests)
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
index 9fc43c1..7d4383f 100644
--- a/libbe/storage/vcs/base.py
+++ b/libbe/storage/vcs/base.py
@@ -504,6 +504,12 @@ os.listdir(self.get_path("bugs")):
"""
pass
+ def _vcs_exists(self, path, revision=None):
+ """
+ Does the path exist in a given revision? (True/False)
+ """
+ raise NotImplementedError
+
def _vcs_remove(self, path):
"""
Remove the file at path from version control. Optionally
@@ -550,7 +556,7 @@ os.listdir(self.get_path("bugs")):
def _vcs_path(self, id, revision):
"""
- Return the path to object id as of revision.
+ Return the relative path to object id as of revision.
Revision will not be None.
"""
@@ -694,6 +700,17 @@ os.listdir(self.get_path("bugs")):
def _disconnect(self):
self._cached_path_id.disconnect()
+ def path(self, id, revision=None, relpath=True):
+ if revision == None:
+ path = self._cached_path_id.path(id)
+ if relpath == True:
+ return self._u_rel_path(path)
+ return path
+ path = self._vcs_path(id, revision)
+ if relpath == True:
+ return path
+ return os.path.join(self.repo, path)
+
def _add_path(self, path, directory=False):
relpath = self._u_rel_path(path)
reldirs = relpath.split(os.path.sep)
@@ -716,6 +733,16 @@ os.listdir(self.get_path("bugs")):
path = self._cached_path_id.add_id(id, parent)
self._add_path(path, **kwargs)
+ def _exists(self, id, revision=None):
+ if revision == None:
+ try:
+ path = self.path(id, revision, relpath=False)
+ except InvalidID, e:
+ return False
+ return os.path.exists(path)
+ path = self.path(id, revision, relpath=True)
+ return self._vcs_exists(relpath, revision)
+
def _remove(self, id):
path = self._cached_path_id.path(id)
if os.path.exists(path):
@@ -746,15 +773,10 @@ os.listdir(self.get_path("bugs")):
self._cached_path_id.remove_id(id)
def _ancestors(self, id=None, revision=None):
- if revision == None:
- id_to_path = self._cached_path_id.path
- else:
- id_to_path = lambda id : os.path.join(
- self.repo, self._vcs_path(id, revision))
if id==None:
path = self.be_dir
else:
- path = id_to_path(id)
+ path = self.path(id, revision, relpath=False)
ancestors = []
while True:
if not path.startswith(self.repo + os.path.sep):
@@ -769,12 +791,9 @@ os.listdir(self.get_path("bugs")):
def _children(self, id=None, revision=None):
if revision == None:
- id_to_path = self._cached_path_id.path
isdir = os.path.isdir
listdir = os.listdir
else:
- id_to_path = lambda id : os.path.join(
- self.repo, self._vcs_path(id, revision))
isdir = lambda path : self._vcs_isdir(
self._u_rel_path(path), revision)
listdir = lambda path : self._vcs_listdir(
@@ -782,7 +801,7 @@ os.listdir(self.get_path("bugs")):
if id==None:
path = self.be_dir
else:
- path = id_to_path(id)
+ path = self.path(id, revision, relpath=False)
if isdir(path) == False:
return []
children = listdir(path)
@@ -810,25 +829,18 @@ os.listdir(self.get_path("bugs")):
def _get(self, id, default=libbe.util.InvalidObject, revision=None):
try:
- path = self._cached_path_id.path(id)
+ relpath = self.path(id, revision, relpath=True)
+ contents = self._vcs_get_file_contents(relpath, revision)
except InvalidID, e:
if default == libbe.util.InvalidObject:
raise e
return default
- relpath = self._u_rel_path(path)
- try:
- contents = self._vcs_get_file_contents(relpath, revision)
- except InvalidID, e:
- if e.id == None:
- e.id = id
- if e.revision == None:
- e.revision = revision
- raise
if contents in [libbe.storage.base.InvalidDirectory,
- libbe.util.InvalidObject]:
- raise InvalidID(id, revision)
- elif len(contents) == 0:
- return None
+ libbe.util.InvalidObject] \
+ or len(contents) == 0:
+ if default == libbe.util.InvalidObject:
+ raise InvalidID(id, revision)
+ return default
return contents
def _set(self, id, value):
diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py
index e1cd2e5..1db50f8 100644
--- a/libbe/storage/vcs/bzr.py
+++ b/libbe/storage/vcs/bzr.py
@@ -98,6 +98,13 @@ class Bzr(base.VCS):
cmd.outf = StringIO.StringIO()
cmd.run(file_list=[path], file_ids_from=self.repo)
+ def _vcs_exists(self, path, revision=None):
+ manifest = self._vcs_listdir(
+ self.repo, revision=revision, recursive=True)
+ if path in manifest:
+ return True
+ return False
+
def _vcs_remove(self, path):
# --force to also remove unversioned files.
path = os.path.join(self.repo, path)
@@ -131,7 +138,7 @@ class Bzr(base.VCS):
if 'not present in revision' in str(e):
raise base.InvalidPath(path, root=self.repo, revision=revision)
raise
- return cmd.outf.getvalue()
+ return cmd.outf.getvalue()
def _vcs_path(self, id, revision):
manifest = self._vcs_listdir(