aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2009-12-07 20:07:55 -0500
committerW. Trevor King <wking@drexel.edu>2009-12-07 20:07:55 -0500
commit49a7771336ce09f6d42c7699ef32aecea0e83182 (patch)
treef237c7413fb68e72b1d87b0ccc4c788944168f10 /libbe/storage
parentc3bcafe12034d35f5c46f76a7dab97ab08b84dfd (diff)
downloadbugseverywhere-49a7771336ce09f6d42c7699ef32aecea0e83182.tar.gz
Initial directory restructuring to clarify dependencies
Diffstat (limited to 'libbe/storage')
-rw-r--r--libbe/storage/properties.py642
-rw-r--r--libbe/storage/settings_object.py433
-rw-r--r--libbe/storage/vcs/arch.py315
-rw-r--r--libbe/storage/vcs/base.py941
-rw-r--r--libbe/storage/vcs/bzr.py117
-rw-r--r--libbe/storage/vcs/darcs.py192
-rw-r--r--libbe/storage/vcs/git.py151
-rw-r--r--libbe/storage/vcs/hg.py108
-rw-r--r--libbe/storage/vcs/util/config.py94
-rw-r--r--libbe/storage/vcs/util/mapfile.py126
-rw-r--r--libbe/storage/vcs/util/upgrade.py246
11 files changed, 3365 insertions, 0 deletions
diff --git a/libbe/storage/properties.py b/libbe/storage/properties.py
new file mode 100644
index 0000000..f756ff0
--- /dev/null
+++ b/libbe/storage/properties.py
@@ -0,0 +1,642 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+This module provides a series of useful decorators for defining
+various types of properties. For example usage, consider the
+unittests at the end of the module.
+
+See
+ http://www.python.org/dev/peps/pep-0318/
+and
+ http://www.phyast.pitt.edu/~micheles/python/documentation.html
+for more information on decorators.
+"""
+
+import copy
+import types
+
+import libbe
+if libbe.TESTING == True:
+ import unittest
+
+
+class ValueCheckError (ValueError):
+ def __init__(self, name, value, allowed):
+ action = "in" # some list of allowed values
+ if type(allowed) == types.FunctionType:
+ action = "allowed by" # some allowed-value check function
+ msg = "%s not %s %s for %s" % (value, action, allowed, name)
+ ValueError.__init__(self, msg)
+ self.name = name
+ self.value = value
+ self.allowed = allowed
+
+def Property(funcs):
+ """
+ End a chain of property decorators, returning a property.
+ """
+ args = {}
+ args["fget"] = funcs.get("fget", None)
+ args["fset"] = funcs.get("fset", None)
+ args["fdel"] = funcs.get("fdel", None)
+ args["doc"] = funcs.get("doc", None)
+
+ #print "Creating a property with"
+ #for key, val in args.items(): print key, value
+ return property(**args)
+
+def doc_property(doc=None):
+ """
+ Add a docstring to a chain of property decorators.
+ """
+ def decorator(funcs=None):
+ """
+ Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...}
+ or a function fn() returning such a dict.
+ """
+ if hasattr(funcs, "__call__"):
+ funcs = funcs() # convert from function-arg to dict
+ funcs["doc"] = doc
+ return funcs
+ return decorator
+
+def local_property(name, null=None, mutable_null=False):
+ """
+ Define get/set access to per-parent-instance local storage. Uses
+ ._<name>_value to store the value for a particular owner instance.
+ If the ._<name>_value attribute does not exist, returns null.
+
+ If mutable_null == True, we only release deepcopies of the null to
+ the outside world.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ if mutable_null == True:
+ ret_null = copy.deepcopy(null)
+ else:
+ ret_null = null
+ value = getattr(self, "_%s_value" % name, ret_null)
+ return value
+ def _fset(self, value):
+ setattr(self, "_%s_value" % name, value)
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+def settings_property(name, null=None):
+ """
+ Similar to local_property, except where local_property stores the
+ value in instance._<name>_value, settings_property stores the
+ value in instance.settings[name].
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = self.settings.get(name, null)
+ return value
+ def _fset(self, value):
+ self.settings[name] = value
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+
+
+# Allow comparison and caching with _original_ values for mutables,
+# since
+#
+# >>> a = []
+# >>> b = a
+# >>> b.append(1)
+# >>> a
+# [1]
+# >>> a==b
+# True
+def _hash_mutable_value(value):
+ return repr(value)
+def _init_mutable_property_cache(self):
+ if not hasattr(self, "_mutable_property_cache_hash"):
+ # first call to _fget for any mutable property
+ self._mutable_property_cache_hash = {}
+ self._mutable_property_cache_copy = {}
+def _set_cached_mutable_property(self, cacher_name, property_name, value):
+ _init_mutable_property_cache(self)
+ self._mutable_property_cache_hash[(cacher_name, property_name)] = \
+ _hash_mutable_value(value)
+ self._mutable_property_cache_copy[(cacher_name, property_name)] = \
+ copy.deepcopy(value)
+def _get_cached_mutable_property(self, cacher_name, property_name, default=None):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_copy:
+ return default
+ return self._mutable_property_cache_copy[(cacher_name, property_name)]
+def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_hash:
+ _set_cached_mutable_property(self, cacher_name, property_name, default)
+ old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)]
+ return cmp(_hash_mutable_value(value), old_hash)
+
+
+def defaulting_property(default=None, null=None,
+ mutable_default=False):
+ """
+ Define a default value for get access to a property.
+ If the stored value is null, then default is returned.
+
+ If mutable_default == True, we only release deepcopies of the
+ default to the outside world.
+
+ null should never escape to the outside world, so don't worry
+ about it being a mutable.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value == null:
+ if mutable_default == True:
+ return copy.deepcopy(default)
+ else:
+ return default
+ return value
+ def _fset(self, value):
+ if value == default:
+ value = null
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def fn_checked_property(value_allowed_fn):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ return value
+ def _fset(self, value):
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def checked_property(allowed=[]):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ return value
+ def _fset(self, value):
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+def cached_property(generator, initVal=None, mutable=False):
+ """
+ Allow caching of values generated by generator(instance), where
+ instance is the instance to which this property belongs. Uses
+ ._<name>_cache to store a cache flag for a particular owner
+ instance.
+
+ When the cache flag is True or missing and the stored value is
+ initVal, the first fget call triggers the generator function,
+ whose output is stored in _<name>_cached_value. That and
+ subsequent calls to fget will return this cached value.
+
+ If the input value is no longer initVal (e.g. a value has been
+ loaded from disk or set with fset), that value overrides any
+ cached value, and this property has no effect.
+
+ When the cache flag is False and the stored value is initVal, the
+ generator is not cached, but is called on every fget.
+
+ The cache flag is missing on initialization. Particular instances
+ may override by setting their own flag.
+
+ In the case that mutable == True, all caching is disabled and the
+ generator is called whenever the cached value would otherwise be
+ used.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ cache = getattr(self, "_%s_cache" % name, True)
+ value = fget(self)
+ if value == initVal:
+ if cache == True and mutable == False:
+ if hasattr(self, "_%s_cached_value" % name):
+ value = getattr(self, "_%s_cached_value" % name)
+ else:
+ value = generator(self)
+ setattr(self, "_%s_cached_value" % name, value)
+ else:
+ value = generator(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def primed_property(primer, initVal=None):
+ """
+ Just like a cached_property, except that instead of returning a
+ new value and running fset to cache it, the primer performs some
+ background manipulation (e.g. loads data into instance.settings)
+ such that a _second_ pass through fget succeeds.
+
+ The 'cache' flag becomes a 'prime' flag, with priming taking place
+ whenever ._<name>_prime is True, or is False or missing and
+ value == initVal.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ prime = getattr(self, "_%s_prime" % name, False)
+ if prime == False:
+ value = fget(self)
+ if prime == True or (prime == False and value == initVal):
+ primer(self)
+ value = fget(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+
+def change_hook_property(hook, mutable=False, default=None):
+ """
+ Call the function hook(instance, old_value, new_value) whenever a
+ value different from the current value is set (instance is a a
+ reference to the class instance to which this property belongs).
+ This is useful for saving changes to disk, etc. This function is
+ called _after_ the new value has been stored, allowing you to
+ change the stored value if you want.
+
+ In the case of mutables, things are slightly trickier. Because
+ the property-owning class has no way of knowing when the value
+ changes. We work around this by caching a private deepcopy of the
+ mutable value, and checking for changes whenever the property is
+ set (obviously) or retrieved (to check for external changes). So
+ long as you're conscientious about accessing the property after
+ making external modifications, mutability woln't be a problem.
+ t.x.append(5) # external modification
+ t.x # dummy access notices change and triggers hook
+ See testChangeHookMutableProperty for an example of the expected
+ behavior.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self, new_value=None, from_fset=False): # only used if mutable == True
+ if from_fset == True:
+ value = new_value # compare new value with cached
+ else:
+ value = fget(self) # compare current value with cached
+ if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0:
+ # there has been a change, cache new value
+ old_value = _get_cached_mutable_property(self, "change hook property", name, default)
+ _set_cached_mutable_property(self, "change hook property", name, value)
+ if from_fset == True: # return previously cached value
+ value = old_value
+ else: # the value changed while we weren't looking
+ hook(self, old_value, value)
+ return value
+ def _fset(self, value):
+ if mutable == True: # get cached previous value
+ old_value = _fget(self, new_value=value, from_fset=True)
+ else:
+ old_value = fget(self)
+ fset(self, value)
+ if value != old_value:
+ hook(self, old_value, value)
+ if mutable == True:
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+
+if libbe.TESTING == True:
+ class DecoratorTests(unittest.TestCase):
+ def testLocalDoc(self):
+ class Test(object):
+ @Property
+ @doc_property("A fancy property")
+ def x():
+ return {}
+ self.failUnless(Test.x.__doc__ == "A fancy property",
+ Test.x.__doc__)
+ def testLocalProperty(self):
+ class Test(object):
+ @Property
+ @local_property(name="LOCAL")
+ def x():
+ return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("_LOCAL_value" in dir(t), dir(t))
+ self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ def testSettingsProperty(self):
+ class Test(object):
+ @Property
+ @settings_property(name="attr")
+ def x():
+ return {}
+ def __init__(self):
+ self.settings = {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("attr" in t.settings, t.settings)
+ self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ def testDefaultingLocalProperty(self):
+ class Test(object):
+ @Property
+ @defaulting_property(default='y', null='x')
+ @local_property(name="DEFAULT", null=5)
+ def x(): return {}
+ t = Test()
+ self.failUnless(t.x == 5, str(t.x))
+ t.x = 'x'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'y'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'z'
+ self.failUnless(t.x == 'z', str(t.x))
+ t.x = 5
+ self.failUnless(t.x == 5, str(t.x))
+ def testCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testTwoCheckedLocalProperties(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="X")
+ def x(): return {}
+
+ @Property
+ @checked_property(allowed=['a', 'b', 'c'])
+ @local_property(name="A")
+ def a(): return {}
+ def __init__(self):
+ self._A_value = 'a'
+ self._X_value = 'x'
+ t = Test()
+ try:
+ t.x = 'a'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.x = 'x'
+ t.x = 'y'
+ t.x = 'z'
+ try:
+ t.a = 'x'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.a = 'a'
+ t.a = 'b'
+ t.a = 'c'
+ def testFnCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @fn_checked_property(lambda v : v in ['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testCachedLocalProperty(self):
+ class Gen(object):
+ def __init__(self):
+ self.i = 0
+ def __call__(self, owner):
+ self.i += 1
+ return self.i
+ class Test(object):
+ @Property
+ @cached_property(generator=Gen(), initVal=None)
+ @local_property(name="CACHED")
+ def x(): return {}
+ t = Test()
+ self.failIf("_CACHED_cache" in dir(t),
+ getattr(t, "_CACHED_cache", None))
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ t.x = 8
+ self.failUnless(t.x == 8, t.x)
+ self.failUnless(t.x == 8, t.x)
+ t._CACHED_cache = False # Caching is off, but the stored value
+ val = t.x # is 8, not the initVal (None), so we
+ self.failUnless(val == 8, val) # get 8.
+ t._CACHED_value = None # Now we've set the stored value to None
+ val = t.x # so future calls to fget (like this)
+ self.failUnless(val == 2, val) # will call the generator every time...
+ val = t.x
+ self.failUnless(val == 3, val)
+ val = t.x
+ self.failUnless(val == 4, val)
+ t._CACHED_cache = True # We turn caching back on, and get
+ self.failUnless(t.x == 1, str(t.x)) # the original cached value.
+ del t._CACHED_cached_value # Removing that value forces a
+ self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
+ self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
+ self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
+ def testPrimedLocalProperty(self):
+ class Test(object):
+ def prime(self):
+ self.settings["PRIMED"] = "initialized"
+ @Property
+ @primed_property(primer=prime, initVal=None)
+ @settings_property(name="PRIMED")
+ def x(): return {}
+ def __init__(self):
+ self.settings={}
+ t = Test()
+ self.failIf("_PRIMED_prime" in dir(t),
+ getattr(t, "_PRIMED_prime", None))
+ self.failUnless(t.x == "initialized", t.x)
+ t.x = 1
+ self.failUnless(t.x == 1, t.x)
+ t.x = None
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = True
+ t.x = 3
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = False
+ t.x = 3
+ self.failUnless(t.x == 3, t.x)
+ def testChangeHookLocalProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+
+ @Property
+ @change_hook_property(_hook)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 2
+ self.failUnless(t.old == 1, t.old)
+ self.failUnless(t.new == 2, t.new)
+ def testChangeHookMutableProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+ self.hook_calls += 1
+
+ @Property
+ @change_hook_property(_hook, mutable=True)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.hook_calls = 0
+ t.x = []
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 1, t.hook_calls)
+ a = t.x
+ a.append(5)
+ t.x = a
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 2, t.hook_calls)
+ t.x = []
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # now append without reassigning. this doesn't trigger the
+ # change, since we don't ever set t.x, only get it and mess
+ # with it. It does, however, update our t.new, since t.new =
+ # t.x and is not a static copy.
+ t.x.append(5)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # however, the next t.x get _will_ notice the change...
+ a = t.x
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ t.x.append(6) # this append(6) is not noticed yet
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5,6], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ # this append(7) is not noticed, but the t.x get causes the
+ # append(6) to be noticed
+ t.x.append(7)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 5, t.hook_calls)
+ a = t.x # now the append(7) is noticed
+ self.failUnless(t.old == [5,6], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 6, t.hook_calls)
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
diff --git a/libbe/storage/settings_object.py b/libbe/storage/settings_object.py
new file mode 100644
index 0000000..6a00ba9
--- /dev/null
+++ b/libbe/storage/settings_object.py
@@ -0,0 +1,433 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+This module provides a base class implementing settings-dict based
+property storage useful for BE objects with saved properties
+(e.g. BugDir, Bug, Comment). For example usage, consider the
+unittests at the end of the module.
+"""
+
+import libbe
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+
+class _Token (object):
+ """
+ `Control' value class for properties. We want values that only
+ mean something to the settings_object module.
+ """
+ pass
+
+class UNPRIMED (_Token):
+ "Property has not been primed."
+ pass
+
+class EMPTY (_Token):
+ """
+ Property has been primed but has no user-set value, so use
+ default/generator value.
+ """
+ pass
+
+
+def prop_save_settings(self, old, new):
+ """
+ The default action undertaken when a property changes.
+ """
+ if self.sync_with_disk==True:
+ self.save_settings()
+
+def prop_load_settings(self):
+ """
+ The default action undertaken when an UNPRIMED property is accessed.
+ """
+ if self.sync_with_disk==True and self._settings_loaded==False:
+ self.load_settings()
+ else:
+ self._setup_saved_settings(flag_as_loaded=False)
+
+# Some name-mangling routines for pretty printing setting names
+def setting_name_to_attr_name(self, name):
+ """
+ Convert keys to the .settings dict into their associated
+ SavedSettingsObject attribute names.
+ >>> print setting_name_to_attr_name(None,"User-id")
+ user_id
+ """
+ return name.lower().replace('-', '_')
+
+def attr_name_to_setting_name(self, name):
+ """
+ The inverse of setting_name_to_attr_name.
+ >>> print attr_name_to_setting_name(None, "user_id")
+ User-id
+ """
+ return name.capitalize().replace('_', '-')
+
+
+def versioned_property(name, doc,
+ default=None, generator=None,
+ change_hook=prop_save_settings,
+ mutable=False,
+ primer=prop_load_settings,
+ allowed=None, check_fn=None,
+ settings_properties=[],
+ required_saved_properties=[],
+ require_save=False):
+ """
+ Combine the common decorators in a single function.
+
+ Use zero or one (but not both) of default or generator, since a
+ working default will keep the generator from functioning. Use the
+ default if you know what you want the default value to be at
+ 'coding time'. Use the generator if you can write a function to
+ determine a valid default at run time. If both default and
+ generator are None, then the property will be a defaulting
+ property which defaults to None.
+
+ allowed and check_fn have a similar relationship, although you can
+ use both of these if you want. allowed compares the proposed
+ value against a list determined at 'coding time' and check_fn
+ allows more flexible comparisons to take place at run time.
+
+ Set require_save to True if you want to save the default/generated
+ value for a property, to protect against future changes. E.g., we
+ currently expect all comments to be 'text/plain' but in the future
+ we may want to default to 'text/html'. If we don't want the old
+ comments to be interpreted as 'text/html', we would require that
+ the content type be saved.
+
+ change_hook, primer, settings_properties, and
+ required_saved_properties are only options to get their defaults
+ into our local scope. Don't mess with them.
+
+ Set mutable=True if:
+ * default is a mutable
+ * your generator function may return mutables
+ * you set change_hook and might have mutable property values
+ See the docstrings in libbe.properties for details on how each of
+ these cases are handled.
+ """
+ settings_properties.append(name)
+ if require_save == True:
+ required_saved_properties.append(name)
+ def decorator(funcs):
+ fulldoc = doc
+ if default != None or generator == None:
+ defaulting = defaulting_property(default=default, null=EMPTY,
+ mutable_default=mutable)
+ fulldoc += "\n\nThis property defaults to %s." % default
+ if generator != None:
+ cached = cached_property(generator=generator, initVal=EMPTY,
+ mutable=mutable)
+ fulldoc += "\n\nThis property is generated with %s." % generator
+ if check_fn != None:
+ fn_checked = fn_checked_property(value_allowed_fn=check_fn)
+ fulldoc += "\n\nThis property is checked with %s." % check_fn
+ if allowed != None:
+ checked = checked_property(allowed=allowed)
+ fulldoc += "\n\nThe allowed values for this property are: %s." \
+ % (', '.join(allowed))
+ hooked = change_hook_property(hook=change_hook, mutable=mutable,
+ default=EMPTY)
+ primed = primed_property(primer=primer, initVal=UNPRIMED)
+ settings = settings_property(name=name, null=UNPRIMED)
+ docp = doc_property(doc=fulldoc)
+ deco = hooked(primed(settings(docp(funcs))))
+ if default != None or generator == None:
+ deco = defaulting(deco)
+ if generator != None:
+ deco = cached(deco)
+ if check_fn != None:
+ deco = fn_checked(deco)
+ if allowed != None:
+ deco = checked(deco)
+ return Property(deco)
+ return decorator
+
+class SavedSettingsObject(object):
+
+ # Keep a list of properties that may be stored in the .settings dict.
+ #settings_properties = []
+
+ # A list of properties that we save to disk, even if they were
+ # never set (in which case we save the default value). This
+ # protects against future changes in default values.
+ #required_saved_properties = []
+
+ _setting_name_to_attr_name = setting_name_to_attr_name
+ _attr_name_to_setting_name = attr_name_to_setting_name
+
+ def __init__(self):
+ self._settings_loaded = False
+ self.sync_with_disk = False
+ self.settings = {}
+
+ def load_settings(self):
+ """Load the settings from disk."""
+ # Override. Must call ._setup_saved_settings() after loading.
+ self.settings = {}
+ self._setup_saved_settings()
+
+ def _setup_saved_settings(self, flag_as_loaded=True):
+ """
+ To be run after setting self.settings up from disk. Marks all
+ settings as primed.
+ """
+ for property in self.settings_properties:
+ if property not in self.settings:
+ self.settings[property] = EMPTY
+ elif self.settings[property] == UNPRIMED:
+ self.settings[property] = EMPTY
+ if flag_as_loaded == True:
+ self._settings_loaded = True
+
+ def save_settings(self):
+ """Load the settings from disk."""
+ # Override. Should save the dict output of ._get_saved_settings()
+ settings = self._get_saved_settings()
+ pass # write settings to disk....
+
+ def _get_saved_settings(self):
+ settings = {}
+ for k,v in self.settings.items():
+ if v != None and v != EMPTY:
+ settings[k] = v
+ for k in self.required_saved_properties:
+ settings[k] = getattr(self, self._setting_name_to_attr_name(k))
+ return settings
+
+ def clear_cached_setting(self, setting=None):
+ "If setting=None, clear *all* cached settings"
+ if setting != None:
+ if hasattr(self, "_%s_cached_value" % setting):
+ delattr(self, "_%s_cached_value" % setting)
+ else:
+ for setting in settings_properties:
+ self.clear_cached_setting(setting)
+
+
+if libbe.TESTING == True:
+ class SavedSettingsObjectTests(unittest.TestCase):
+ def testSimpleProperty(self):
+ """Testing a minimal versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ # access missing setting
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(len(t.settings) == 0, len(t.settings))
+ self.failUnless(t.content_type == None, t.content_type)
+ # accessing t.content_type triggers the priming, which runs
+ # t._setup_saved_settings, which fills out t.settings with
+ # EMPTY data. t._settings_loaded is still false though, since
+ # the default priming does not do any of the `official' loading
+ # that occurs in t.load_settings.
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ # load settings creates an EMPTY value in the settings array
+ t.load_settings()
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ # now we set a value
+ t.content_type = 5
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == 5, t.content_type)
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ # now we set another value
+ t.content_type = "text/plain"
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/plain",
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ # now we clear to the post-primed value
+ t.content_type = EMPTY
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ def testDefaultingProperty(self):
+ """Testing a defaulting versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ t.load_settings()
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == {},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t.content_type == "text/html",
+ t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/html",
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testRequiredDefaultingProperty(self):
+ """Testing a required defaulting versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties,
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testClassVersionedPropertyDefinition(self):
+ """Testing a class-specific _versioned property decorator"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ def _versioned_property(settings_properties= \
+ settings_properties,
+ required_saved_properties= \
+ required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"] = \
+ required_saved_properties
+ return versioned_property(**kwargs)
+ @_versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings() == \
+ {"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testMutableChangeHookedProperty(self):
+ """Testing a mutable change-hooked property"""
+ SAVES = []
+ def prop_log_save_settings(self, old, new, saves=SAVES):
+ saves.append("'%s' -> '%s'" % (str(old), str(new)))
+ prop_save_settings(self, old, new)
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="List-type",
+ doc="A test property",
+ mutable=True,
+ change_hook=prop_log_save_settings,
+ settings_properties=settings_properties,
+ required_saved_properties= \
+ required_saved_properties)
+ def list_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ t.load_settings()
+ self.failUnless(SAVES == [], SAVES)
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.list_type == None, t.list_type)
+ self.failUnless(SAVES == [], SAVES)
+ self.failUnless(t.settings["List-type"]==EMPTY,
+ t.settings["List-type"])
+ t.list_type = []
+ self.failUnless(t.settings["List-type"] == [],
+ t.settings["List-type"])
+ self.failUnless(SAVES == [
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'"
+ ], SAVES)
+ t.list_type.append(5)
+ self.failUnless(SAVES == [
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
+ ], SAVES)
+ self.failUnless(t.settings["List-type"] == [5],
+ t.settings["List-type"])
+ self.failUnless(SAVES == [ # the append(5) has not yet been saved
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
+ ], SAVES)
+ self.failUnless(t.list_type == [5], t.list_type)#get triggers saved
+
+ self.failUnless(SAVES == [ # now the append(5) has been saved.
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
+ "'[]' -> '[5]'"
+ ], SAVES)
+
+ unitsuite = unittest.TestLoader().loadTestsFromTestCase( \
+ SavedSettingsObjectTests)
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/arch.py b/libbe/storage/vcs/arch.py
new file mode 100644
index 0000000..45a3284
--- /dev/null
+++ b/libbe/storage/vcs/arch.py
@@ -0,0 +1,315 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# James Rowe <jnrowe@ukfsn.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+GNU Arch (tla) backend.
+"""
+
+import codecs
+import os
+import re
+import shutil
+import sys
+import time
+
+import libbe
+from beuuid import uuid_gen
+import config
+import vcs
+if libbe.TESTING == True:
+ import unittest
+ import doctest
+
+
+DEFAULT_CLIENT = "tla"
+
+client = config.get_val("arch_client", default=DEFAULT_CLIENT)
+
+def new():
+ return Arch()
+
+class Arch(vcs.VCS):
+ name = "arch"
+ client = client
+ versioned = True
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser("~/.arch-params")
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client("--version")
+ return output
+ def _vcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, "{arch}") != None :
+ config.set_val("arch_client", client)
+ return True
+ return False
+ def _vcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+ def _create_archive(self, path):
+ """
+ Create a temporary Arch archive in the directory PATH. This
+ archive will be removed by
+ cleanup->_vcs_cleanup->_remove_archive
+ """
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = self._u_parse_id(id)
+ if email == None:
+ email = "%s@example.com" % name
+ trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8])
+ self._archive_name = "%s--%s" % (email, trailer)
+ self._archive_dir = "/tmp/%s" % trailer
+ self._tmp_archive = True
+ self._u_invoke_client("make-archive", self._archive_name,
+ self._archive_dir, cwd=path)
+ def _invoke_client(self, *args, **kwargs):
+ """
+ Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
+ else:
+ tailargs = []
+ arglist = [command, "-A", self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ "=locations", self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+ def _create_project(self, path):
+ """
+ Create a temporary Arch project in the directory PATH. This
+ project will be removed by
+ cleanup->_vcs_cleanup->_remove_project
+ """
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = "bugs-everywhere"
+ branch = "mainline"
+ version = "0.1"
+ self._project_name = "%s--%s--%s" % (category, branch, version)
+ self._invoke_client("archive-setup", self._project_name,
+ cwd=path)
+ self._tmp_project = True
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return "%s/%s" % (self._archive_name, self._project_name)
+ def _adjust_naming_conventions(self, path):
+ """
+ By default, Arch restricts source code filenames to
+ ^[_=a-zA-Z0-9].*$
+ See
+ http://regexps.srparish.net/tutorial-tla/naming-conventions.html
+ Since our bug directory '.be' doesn't satisfy these conventions,
+ we need to adjust them.
+
+ The conventions are specified in
+ project-root/{arch}/=tagging-method
+ """
+ tagpath = os.path.join(path, "{arch}", "=tagging-method")
+ lines_out = []
+ f = codecs.open(tagpath, "r", self.encoding)
+ for line in f:
+ if line.startswith("source "):
+ lines_out.append("source ^[._=a-zA-X0-9].*$\n")
+ else:
+ lines_out.append(line)
+ f.close()
+ f = codecs.open(tagpath, "w", self.encoding)
+ f.write("".join(lines_out))
+ f.close()
+
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-source.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html
+ self._invoke_client("init-tree", self._project_name,
+ cwd=path)
+ self._adjust_naming_conventions(path)
+ self._invoke_client("import", "--summary", "Began versioning",
+ cwd=path)
+ def _vcs_cleanup(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+
+ def _vcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
+ else:
+ dirname = path
+ status,output,error = self._u_invoke_client("tree-root", dirname)
+ root = output.rstrip('\n')
+
+ self._get_archive_project_name(root)
+
+ return root
+ def _get_archive_name(self, root):
+ status,output,error = self._u_invoke_client("archives")
+ lines = output.split('\n')
+ # e.g. output:
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
+ # /tmp/BEtestXXXXXX/rootdir
+ # (+ repeats)
+ for archive,location in zip(lines[::2], lines[1::2]):
+ if os.path.realpath(location) == os.path.realpath(root):
+ self._archive_name = archive
+ assert self._archive_name != None
+ def _get_archive_project_name(self, root):
+ # get project names
+ status,output,error = self._u_invoke_client("tree-version", cwd=root)
+ # e.g output
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
+ archive_name,project_name = output.rstrip('\n').split('/')
+ self._archive_name = archive_name
+ self._project_name = project_name
+ def _vcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+ def _vcs_set_user_id(self, value):
+ self._u_invoke_client('my-id', value)
+ def _vcs_add(self, path):
+ self._u_invoke_client("add-id", path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.rootdir)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client("inventory", "--source",
+ "--both", "--all", root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ f = codecs.open(inv_path, "a", self.encoding)
+ f.write(rule)
+ f.close()
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+ def _force_source(self, path):
+ rule = "source %s\n" % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
+ if os.path.realpath(path) not in self._list_added(self.rootdir):
+ raise CantAddFile(path)
+ def _vcs_remove(self, path):
+ if not '.arch-ids' in path:
+ self._u_invoke_client("delete-id", path)
+ def _vcs_update(self, path):
+ pass
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ status,output,error = \
+ self._invoke_client("file-find", path, revision)
+ relpath = output.rstrip('\n')
+ abspath = os.path.join(self.rootdir, relpath)
+ f = codecs.open(abspath, "r", self.encoding)
+ contents = f.read()
+ f.close()
+ return contents
+ def _vcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
+ else:
+ status,output,error = \
+ self._u_invoke_client("get", revision, directory)
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ if allow_empty == False:
+ # arch applies empty commits without complaining, so check first
+ status,output,error = self._u_invoke_client("changes",expect=(0,1))
+ if status == 0:
+ raise vcs.EmptyCommit()
+ summary,body = self._u_parse_commitfile(commitfile)
+ args = ["commit", "--summary", summary]
+ if body != None:
+ args.extend(["--log-message",body])
+ status,output,error = self._u_invoke_client(*args)
+ revision = None
+ revline = re.compile("[*] committed (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client("logs")
+ logs = output.splitlines()
+ first_log = logs.pop(0)
+ assert first_log == "base-0", first_log
+ try:
+ log = logs[index]
+ except IndexError:
+ return None
+ return "%s--%s" % (self._archive_project_name(), log)
+
+class CantAddFile(Exception):
+ def __init__(self, file):
+ self.file = file
+ Exception.__init__(self, "Can't automatically add file %s" % file)
+
+
+
+if libbe.TESTING == True:
+ vcs.make_vcs_testcase_subclasses(Arch, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
new file mode 100644
index 0000000..44643a4
--- /dev/null
+++ b/libbe/storage/vcs/base.py
@@ -0,0 +1,941 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Alexander Belchenko <bialix@ukr.net>
+# Ben Finney <benf@cybersource.com.au>
+# Chris Ball <cjb@laptop.org>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the base VCS (Version Control System) class, which should be
+subclassed by other Version Control System backends. The base class
+implements a "do not version" VCS.
+"""
+
+import codecs
+import os
+import os.path
+import re
+from socket import gethostname
+import shutil
+import sys
+import tempfile
+
+import libbe
+from utility import Dir, search_parent_directories
+from subproc import CommandError, invoke
+from plugin import get_plugin
+
+if libbe.TESTING == True:
+ import unittest
+ import doctest
+
+
+# List VCS modules in order of preference.
+# Don't list this module, it is implicitly last.
+VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
+
+def set_preferred_vcs(name):
+ global VCS_ORDER
+ assert name in VCS_ORDER, \
+ 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
+ VCS_ORDER.remove(name)
+ VCS_ORDER.insert(0, name)
+
+def _get_matching_vcs(matchfn):
+ """Return the first module for which matchfn(VCS_instance) is true"""
+ for submodname in VCS_ORDER:
+ module = get_plugin('libbe', submodname)
+ vcs = module.new()
+ if matchfn(vcs) == True:
+ return vcs
+ vcs.cleanup()
+ return VCS()
+
+def vcs_by_name(vcs_name):
+ """Return the module for the VCS with the given name"""
+ return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
+
+def detect_vcs(dir):
+ """Return an VCS instance for the vcs being used in this directory"""
+ return _get_matching_vcs(lambda vcs: vcs.detect(dir))
+
+def installed_vcs():
+ """Return an instance of an installed VCS"""
+ return _get_matching_vcs(lambda vcs: vcs.installed())
+
+
+
+class SettingIDnotSupported(NotImplementedError):
+ pass
+
+class VCSnotRooted(Exception):
+ def __init__(self):
+ msg = "VCS not rooted"
+ Exception.__init__(self, msg)
+
+class PathNotInRoot(Exception):
+ def __init__(self, path, root):
+ msg = "Path '%s' not in root '%s'" % (path, root)
+ Exception.__init__(self, msg)
+ self.path = path
+ self.root = root
+
+class NoSuchFile(Exception):
+ def __init__(self, pathname, root="."):
+ path = os.path.abspath(os.path.join(root, pathname))
+ Exception.__init__(self, "No such file: %s" % path)
+
+class EmptyCommit(Exception):
+ def __init__(self):
+ Exception.__init__(self, "No changes to commit")
+
+
+def new():
+ return VCS()
+
+class VCS(object):
+ """
+ This class implements a 'no-vcs' interface.
+
+ Support for other VCSs can be added by subclassing this class, and
+ overriding methods _vcs_*() with code appropriate for your VCS.
+
+ The methods _u_*() are utility methods available to the _vcs_*()
+ methods.
+ """
+ name = "None"
+ client = "" # command-line tool for _u_invoke_client
+ versioned = False
+ def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()):
+ self.paranoid = paranoid
+ self.verboseInvoke = False
+ self.rootdir = None
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ self.encoding = encoding
+ def __str__(self):
+ return "<%s %s>" % (self.__class__.__name__, id(self))
+ def __repr__(self):
+ return str(self)
+ def _vcs_version(self):
+ """
+ Return the VCS version string.
+ """
+ return "0.0"
+ def _vcs_detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return True
+ def _vcs_root(self, path):
+ """
+ Get the VCS root. This is the default working directory for
+ future invocations. You would normally set this to the root
+ directory for your VCS.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ if path == "":
+ path = os.path.abspath(".")
+ return path
+ def _vcs_init(self, path):
+ """
+ Begin versioning the tree based at path.
+ """
+ pass
+ def _vcs_cleanup(self):
+ """
+ Remove any cruft that _vcs_init() created outside of the
+ versioned tree.
+ """
+ pass
+ def _vcs_get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
+ """
+ return None
+ def _vcs_set_user_id(self, value):
+ """
+ Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the VCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ raise SettingIDnotSupported
+ def _vcs_add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ pass
+ def _vcs_remove(self, path):
+ """
+ Remove the file at path from version control. Optionally
+ remove the file from the filesystem as well.
+ """
+ pass
+ def _vcs_update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ pass
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
+ """
+ Get the file contents as they were in a given revision.
+ Revision==None specifies the current revision.
+ """
+ assert revision == None, \
+ "The %s VCS does not support revision specifiers" % self.name
+ if binary == False:
+ f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
+ else:
+ f = open(os.path.join(self.rootdir, path), "rb")
+ contents = f.read()
+ f.close()
+ return contents
+ def _vcs_duplicate_repo(self, directory, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ dir specifies a directory to create the duplicate in.
+ """
+ shutil.copytree(self.rootdir, directory, True)
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ """
+ Commit the current working directory, using the contents of
+ commitfile as the comment. Return the name of the old
+ revision (or None if commits are not supported).
+
+ If allow_empty == False, raise EmptyCommit if there are no
+ changes to commit.
+ """
+ return None
+ def _vcs_revision_id(self, index):
+ """
+ Return the name of the <index>th revision. Index will be an
+ integer (possibly <= 0). The choice of which branch to follow
+ when crossing branches/merges is not defined.
+
+ Return None if revision IDs are not supported, or if the
+ specified revision does not exist.
+ """
+ return None
+ def version(self):
+ """Cache version string for efficiency."""
+ if not hasattr(self, '_version'):
+ self._version = self._get_version()
+ return self._version
+ def _get_version(self):
+ try:
+ ret = self._vcs_version()
+ return ret
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ return None
+ else:
+ raise OSError, e
+ except CommandError:
+ return None
+ def installed(self):
+ if self.version() != None:
+ return True
+ return False
+ def detect(self, path="."):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return self._vcs_detect(path)
+ def root(self, path):
+ """
+ Set the root directory to the path's VCS root. This is the
+ default working directory for future invocations.
+ """
+ self.rootdir = self._vcs_root(path)
+ def init(self, path):
+ """
+ Begin versioning the tree based at path.
+ Also roots the vcs at path.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ self._vcs_init(path)
+ self.root(path)
+ def cleanup(self):
+ self._vcs_cleanup()
+ def get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return the user's
+ id. You can override the automatic lookup procedure by setting the
+ VCS.user_id attribute to a string of your choice.
+ """
+ if hasattr(self, "user_id"):
+ if self.user_id != None:
+ return self.user_id
+ id = self._vcs_get_user_id()
+ if id == None:
+ name = self._u_get_fallback_username()
+ email = self._u_get_fallback_email()
+ id = self._u_create_id(name, email)
+ print >> sys.stderr, "Guessing id '%s'" % id
+ try:
+ self.set_user_id(id)
+ except SettingIDnotSupported:
+ pass
+ return id
+ def set_user_id(self, value):
+ """
+ Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the VCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ self._vcs_set_user_id(value)
+ def add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ self._vcs_add(self._u_rel_path(path))
+ def remove(self, path):
+ """
+ Remove a file from both version control and the filesystem.
+ """
+ self._vcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ os.remove(path)
+ def recursive_remove(self, dirname):
+ """
+ Remove a file/directory and all its decendents from both
+ version control and the filesystem.
+ """
+ if not os.path.exists(dirname):
+ raise NoSuchFile(dirname)
+ for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
+ filenames.extend(dirnames)
+ for path in filenames:
+ fullpath = os.path.join(dirpath, path)
+ if os.path.exists(fullpath) == False:
+ continue
+ self._vcs_remove(self._u_rel_path(fullpath))
+ if os.path.exists(dirname):
+ shutil.rmtree(dirname)
+ def update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ self._vcs_update(self._u_rel_path(path))
+ def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False):
+ """
+ Get the file as it was in a given revision.
+ Revision==None specifies the current revision.
+
+ allow_no_vcs==True allows direct access to files through
+ codecs.open() or open() if the vcs decides it can't handle the
+ given path.
+ """
+ if not os.path.exists(path):
+ raise NoSuchFile(path)
+ if self._use_vcs(path, allow_no_vcs):
+ relpath = self._u_rel_path(path)
+ contents = self._vcs_get_file_contents(relpath,revision,binary=binary)
+ else:
+ if binary == True:
+ f = codecs.open(path, "r", self.encoding)
+ else:
+ f = open(path, "rb")
+ contents = f.read()
+ f.close()
+ return contents
+ def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False):
+ """
+ Set the file contents under version control.
+ """
+ add = not os.path.exists(path)
+ if binary == False:
+ f = codecs.open(path, "w", self.encoding)
+ else:
+ f = open(path, "wb")
+ f.write(contents)
+ f.close()
+
+ if self._use_vcs(path, allow_no_vcs):
+ if add:
+ self.add(path)
+ else:
+ self.update(path)
+ def mkdir(self, path, allow_no_vcs=False, check_parents=True):
+ """
+ Create (if neccessary) a directory at path under version
+ control.
+ """
+ if check_parents == True:
+ parent = os.path.dirname(path)
+ if not os.path.exists(parent): # recurse through parents
+ self.mkdir(parent, allow_no_vcs, check_parents)
+ if not os.path.exists(path):
+ os.mkdir(path)
+ if self._use_vcs(path, allow_no_vcs):
+ self.add(path)
+ else:
+ assert os.path.isdir(path)
+ if self._use_vcs(path, allow_no_vcs):
+ #self.update(path)# Don't update directories. Changing files
+ pass # underneath them should be sufficient.
+
+ def duplicate_repo(self, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ Return the path to the arbitrary directory at the base of the new repo.
+ """
+ # Dirname in Basedir to protect against simlink attacks.
+ if self._duplicateBasedir == None:
+ self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs')
+ self._duplicateDirname = \
+ os.path.join(self._duplicateBasedir, "duplicate")
+ self._vcs_duplicate_repo(directory=self._duplicateDirname,
+ revision=revision)
+ return self._duplicateDirname
+ def remove_duplicate_repo(self):
+ """
+ Clean up a duplicate repo created with duplicate_repo().
+ """
+ if self._duplicateBasedir != None:
+ shutil.rmtree(self._duplicateBasedir)
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ def commit(self, summary, body=None, allow_empty=False):
+ """
+ Commit the current working directory, with a commit message
+ string summary and body. Return the name of the old revision
+ (or None if versioning is not supported).
+
+ If allow_empty == False (the default), raise EmptyCommit if
+ there are no changes to commit.
+ """
+ summary = summary.strip()+'\n'
+ if body is not None:
+ summary += '\n' + body.strip() + '\n'
+ descriptor, filename = tempfile.mkstemp()
+ revision = None
+ try:
+ temp_file = os.fdopen(descriptor, 'wb')
+ temp_file.write(summary)
+ temp_file.flush()
+ self.precommit()
+ revision = self._vcs_commit(filename, allow_empty=allow_empty)
+ temp_file.close()
+ self.postcommit()
+ finally:
+ os.remove(filename)
+ return revision
+ def precommit(self):
+ """
+ Executed before all attempted commits.
+ """
+ pass
+ def postcommit(self):
+ """
+ Only executed after successful commits.
+ """
+ pass
+ def revision_id(self, index=None):
+ """
+ Return the name of the <index>th revision. The choice of
+ which branch to follow when crossing branches/merges is not
+ defined.
+
+ Return None if index==None, revision IDs are not supported, or
+ if the specified revision does not exist.
+ """
+ if index == None:
+ return None
+ return self._vcs_revision_id(index)
+ def _u_any_in_string(self, list, string):
+ """
+ Return True if any of the strings in list are in string.
+ Otherwise return False.
+ """
+ for list_string in list:
+ if list_string in string:
+ return True
+ return False
+ def _u_invoke(self, *args, **kwargs):
+ if 'cwd' not in kwargs:
+ kwargs['cwd'] = self.rootdir
+ if 'verbose' not in kwargs:
+ kwargs['verbose'] = self.verboseInvoke
+ if 'encoding' not in kwargs:
+ kwargs['encoding'] = self.encoding
+ return invoke(*args, **kwargs)
+ def _u_invoke_client(self, *args, **kwargs):
+ cl_args = [self.client]
+ cl_args.extend(args)
+ return self._u_invoke(cl_args, **kwargs)
+ def _u_search_parent_directories(self, path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ return search_parent_directories(path, filename)
+ def _use_vcs(self, path, allow_no_vcs):
+ """
+ Try and decide if _vcs_add/update/mkdir/etc calls will
+ succeed. Returns True is we think the vcs_call would
+ succeeed, and False otherwise.
+ """
+ use_vcs = True
+ exception = None
+ if self.rootdir != None:
+ if self.path_in_root(path) == False:
+ use_vcs = False
+ exception = PathNotInRoot(path, self.rootdir)
+ else:
+ use_vcs = False
+ exception = VCSnotRooted
+ if use_vcs == False and allow_no_vcs==False:
+ raise exception
+ return use_vcs
+ def path_in_root(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> vcs = new()
+ >>> vcs.path_in_root("/a.b/c/.be", "/a.b/c")
+ True
+ >>> vcs.path_in_root("/a.b/.be", "/a.b/c")
+ False
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise VCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ return False
+ return True
+ def _u_rel_path(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> vcs = new()
+ >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ '.be'
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise VCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ raise PathNotInRoot(path, absRootSlashedDir)
+ assert path != absRootSlashedDir, \
+ "file %s == root directory %s" % (path, absRootSlashedDir)
+ relpath = path[len(absRootSlashedDir):]
+ return relpath
+ def _u_abspath(self, path, root=None):
+ """
+ Return the absolute path from a path realtive to root.
+ >>> vcs = new()
+ >>> vcs._u_abspath(".be", "/a.b/c")
+ '/a.b/c/.be'
+ """
+ if root == None:
+ assert self.rootdir != None, "VCS not rooted"
+ root = self.rootdir
+ return os.path.abspath(os.path.join(root, path))
+ def _u_create_id(self, name, email=None):
+ """
+ >>> vcs = new()
+ >>> vcs._u_create_id("John Doe", "jdoe@example.com")
+ 'John Doe <jdoe@example.com>'
+ >>> vcs._u_create_id("John Doe")
+ 'John Doe'
+ """
+ assert len(name) > 0
+ if email == None or len(email) == 0:
+ return name
+ else:
+ return "%s <%s>" % (name, email)
+ def _u_parse_id(self, value):
+ """
+ >>> vcs = new()
+ >>> vcs._u_parse_id("John Doe <jdoe@example.com>")
+ ('John Doe', 'jdoe@example.com')
+ >>> vcs._u_parse_id("John Doe")
+ ('John Doe', None)
+ >>> try:
+ ... vcs._u_parse_id("John Doe <jdoe@example.com><what?>")
+ ... except AssertionError:
+ ... print "Invalid match"
+ Invalid match
+ """
+ emailexp = re.compile("(.*) <([^>]*)>(.*)")
+ match = emailexp.search(value)
+ if match == None:
+ email = None
+ name = value
+ else:
+ assert len(match.groups()) == 3
+ assert match.groups()[2] == "", match.groups()
+ email = match.groups()[1]
+ name = match.groups()[0]
+ assert name != None
+ assert len(name) > 0
+ return (name, email)
+ def _u_get_fallback_username(self):
+ name = None
+ for envariable in ["LOGNAME", "USERNAME"]:
+ if os.environ.has_key(envariable):
+ name = os.environ[envariable]
+ break
+ assert name != None
+ return name
+ def _u_get_fallback_email(self):
+ hostname = gethostname()
+ name = self._u_get_fallback_username()
+ return "%s@%s" % (name, hostname)
+ def _u_parse_commitfile(self, commitfile):
+ """
+ Split the commitfile created in self.commit() back into
+ summary and header lines.
+ """
+ f = codecs.open(commitfile, "r", self.encoding)
+ summary = f.readline()
+ body = f.read()
+ body.lstrip('\n')
+ if len(body) == 0:
+ body = None
+ f.close()
+ return (summary, body)
+
+
+if libbe.TESTING == True:
+ def setup_vcs_test_fixtures(testcase):
+ """Set up test fixtures for VCS test case."""
+ testcase.vcs = testcase.Class()
+ testcase.dir = Dir()
+ testcase.dirname = testcase.dir.path
+
+ vcs_not_supporting_uninitialized_user_id = []
+ vcs_not_supporting_set_user_id = ["None", "hg"]
+ testcase.vcs_supports_uninitialized_user_id = (
+ testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id)
+ testcase.vcs_supports_set_user_id = (
+ testcase.vcs.name not in vcs_not_supporting_set_user_id)
+
+ if not testcase.vcs.installed():
+ testcase.fail(
+ "%(name)s VCS not found" % vars(testcase.Class))
+
+ if testcase.Class.name != "None":
+ testcase.failIf(
+ testcase.vcs.detect(testcase.dirname),
+ "Detected %(name)s VCS before initialising"
+ % vars(testcase.Class))
+
+ testcase.vcs.init(testcase.dirname)
+
+ class VCSTestCase(unittest.TestCase):
+ """Test cases for base VCS class."""
+
+ Class = VCS
+
+ def __init__(self, *args, **kwargs):
+ super(VCSTestCase, self).__init__(*args, **kwargs)
+ self.dirname = None
+
+ def setUp(self):
+ super(VCSTestCase, self).setUp()
+ setup_vcs_test_fixtures(self)
+
+ def tearDown(self):
+ self.vcs.cleanup()
+ self.dir.cleanup()
+ super(VCSTestCase, self).tearDown()
+
+ def full_path(self, rel_path):
+ return os.path.join(self.dirname, rel_path)
+
+
+ class VCS_init_TestCase(VCSTestCase):
+ """Test cases for VCS.init method."""
+
+ def test_detect_should_succeed_after_init(self):
+ """Should detect VCS in directory after initialization."""
+ self.failUnless(
+ self.vcs.detect(self.dirname),
+ "Did not detect %(name)s VCS after initialising"
+ % vars(self.Class))
+
+ def test_vcs_rootdir_in_specified_root_path(self):
+ """VCS root directory should be in specified root path."""
+ rp = os.path.realpath(self.vcs.rootdir)
+ dp = os.path.realpath(self.dirname)
+ vcs_name = self.Class.name
+ self.failUnless(
+ dp == rp or rp == None,
+ "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
+
+
+ class VCS_get_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.get_user_id method."""
+
+ def test_gets_existing_user_id(self):
+ """Should get the existing user ID."""
+ if not self.vcs_supports_uninitialized_user_id:
+ return
+
+ user_id = self.vcs.get_user_id()
+ self.failUnless(
+ user_id is not None,
+ "unable to get a user id")
+
+
+ class VCS_set_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.set_user_id method."""
+
+ def setUp(self):
+ super(VCS_set_user_id_TestCase, self).setUp()
+
+ if self.vcs_supports_uninitialized_user_id:
+ self.prev_user_id = self.vcs.get_user_id()
+ else:
+ self.prev_user_id = "Uninitialized identity <bogus@example.org>"
+
+ if self.vcs_supports_set_user_id:
+ self.test_new_user_id = "John Doe <jdoe@example.com>"
+ self.vcs.set_user_id(self.test_new_user_id)
+
+ def tearDown(self):
+ if self.vcs_supports_set_user_id:
+ self.vcs.set_user_id(self.prev_user_id)
+ super(VCS_set_user_id_TestCase, self).tearDown()
+
+ def test_raises_error_in_unsupported_vcs(self):
+ """Should raise an error in a VCS that doesn't support it."""
+ if self.vcs_supports_set_user_id:
+ return
+ self.assertRaises(
+ SettingIDnotSupported,
+ self.vcs.set_user_id, "foo")
+
+ def test_updates_user_id_in_supporting_vcs(self):
+ """Should update the user ID in an VCS that supports it."""
+ if not self.vcs_supports_set_user_id:
+ return
+ user_id = self.vcs.get_user_id()
+ self.failUnlessEqual(
+ self.test_new_user_id, user_id,
+ "user id not set correctly (expected %s, got %s)"
+ % (self.test_new_user_id, user_id))
+
+
+ def setup_vcs_revision_test_fixtures(testcase):
+ """Set up revision test fixtures for VCS test case."""
+ testcase.test_dirs = ['a', 'a/b', 'c']
+ for path in testcase.test_dirs:
+ testcase.vcs.mkdir(testcase.full_path(path))
+
+ testcase.test_files = ['a/text', 'a/b/text']
+
+ testcase.test_contents = {
+ 'rev_1': "Lorem ipsum",
+ 'uncommitted': "dolor sit amet",
+ }
+
+
+ class VCS_mkdir_TestCase(VCSTestCase):
+ """Test cases for VCS.mkdir method."""
+
+ def setUp(self):
+ super(VCS_mkdir_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_mkdir_TestCase, self).tearDown()
+
+ def test_mkdir_creates_directory(self):
+ """Should create specified directory in filesystem."""
+ for path in self.test_dirs:
+ full_path = self.full_path(path)
+ self.failUnless(
+ os.path.exists(full_path),
+ "path %(full_path)s does not exist" % vars())
+
+
+ class VCS_commit_TestCase(VCSTestCase):
+ """Test cases for VCS.commit method."""
+
+ def setUp(self):
+ super(VCS_commit_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_commit_TestCase, self).tearDown()
+
+ def test_file_contents_as_specified(self):
+ """Should set file contents as specified."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(full_path, test_contents)
+ current_contents = self.vcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+
+ def test_file_contents_as_committed(self):
+ """Should have file contents as specified after commit."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(full_path, test_contents)
+ revision = self.vcs.commit("Initial file contents.")
+ current_contents = self.vcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+
+ def test_file_contents_as_set_when_uncommitted(self):
+ """Should set file contents as specified after commit."""
+ if not self.vcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Initial file contents.")
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ current_contents = self.vcs.get_file_contents(full_path)
+ self.failUnlessEqual(
+ self.test_contents['uncommitted'], current_contents)
+
+ def test_revision_file_contents_as_committed(self):
+ """Should get file contents as committed to specified revision."""
+ if not self.vcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Initial file contents.")
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ committed_contents = self.vcs.get_file_contents(
+ full_path, revision)
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], committed_contents)
+
+ def test_revision_id_as_committed(self):
+ """Check for compatibility between .commit() and .revision_id()"""
+ if not self.vcs.versioned:
+ self.failUnlessEqual(self.vcs.revision_id(5), None)
+ return
+ committed_revisions = []
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Initial %s contents." % path)
+ committed_revisions.append(revision)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ revision = self.vcs.commit("Altered %s contents." % path)
+ committed_revisions.append(revision)
+ for i,revision in enumerate(committed_revisions):
+ self.failUnlessEqual(self.vcs.revision_id(i), revision)
+ i += -len(committed_revisions) # check negative indices
+ self.failUnlessEqual(self.vcs.revision_id(i), revision)
+ i = len(committed_revisions)
+ self.failUnlessEqual(self.vcs.revision_id(i), None)
+ self.failUnlessEqual(self.vcs.revision_id(-i-1), None)
+
+ def test_revision_id_as_committed(self):
+ """Check revision id before first commit"""
+ if not self.vcs.versioned:
+ self.failUnlessEqual(self.vcs.revision_id(5), None)
+ return
+ committed_revisions = []
+ for path in self.test_files:
+ self.failUnlessEqual(self.vcs.revision_id(0), None)
+
+
+ class VCS_duplicate_repo_TestCase(VCSTestCase):
+ """Test cases for VCS.duplicate_repo method."""
+
+ def setUp(self):
+ super(VCS_duplicate_repo_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ self.vcs.remove_duplicate_repo()
+ for path in reversed(sorted(self.test_dirs)):
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_duplicate_repo_TestCase, self).tearDown()
+
+ def test_revision_file_contents_as_committed(self):
+ """Should match file contents as committed to specified revision.
+ """
+ if not self.vcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Commit current status")
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ dup_repo_path = self.vcs.duplicate_repo(revision)
+ dup_file_path = os.path.join(dup_repo_path, path)
+ dup_file_contents = file(dup_file_path, 'rb').read()
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], dup_file_contents)
+ self.vcs.remove_duplicate_repo()
+
+
+ def make_vcs_testcase_subclasses(vcs_class, namespace):
+ """Make VCSTestCase subclasses for vcs_class in the namespace."""
+ vcs_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, VCSTestCase)]
+
+ for base_class in vcs_testcase_classes:
+ testcase_class_name = vcs_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = vcs_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/bzr.py b/libbe/storage/vcs/bzr.py
new file mode 100644
index 0000000..62a9b11
--- /dev/null
+++ b/libbe/storage/vcs/bzr.py
@@ -0,0 +1,117 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Bazaar (bzr) backend.
+"""
+
+import os
+import re
+import sys
+import unittest
+
+import libbe
+import vcs
+if libbe.TESTING == True:
+ import doctest
+
+
+def new():
+ return Bzr()
+
+class Bzr(vcs.VCS):
+ name = "bzr"
+ client = "bzr"
+ versioned = True
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client("--version")
+ return output
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".bzr") != None :
+ return True
+ return False
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ status,output,error = self._u_invoke_client("root", path)
+ return output.rstrip('\n')
+ def _vcs_init(self, path):
+ self._u_invoke_client("init", cwd=path)
+ def _vcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("whoami")
+ return output.rstrip('\n')
+ def _vcs_set_user_id(self, value):
+ self._u_invoke_client("whoami", value)
+ def _vcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _vcs_remove(self, path):
+ # --force to also remove unversioned files.
+ self._u_invoke_client("remove", "--force", path)
+ def _vcs_update(self, path):
+ pass
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _vcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("branch", "--revision", revision,
+ ".", directory)
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ args = ["commit", "--file", commitfile]
+ if allow_empty == True:
+ args.append("--unchanged")
+ status,output,error = self._u_invoke_client(*args)
+ else:
+ kwargs = {"expect":(0,3)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status != 0:
+ strings = ["ERROR: no changes to commit.", # bzr 1.3.1
+ "ERROR: No changes to commit."] # bzr 1.15.1
+ if self._u_any_in_string(strings, error) == True:
+ raise vcs.EmptyCommit()
+ else:
+ raise vcs.CommandError(args, status, stderr=error)
+ revision = None
+ revline = re.compile("Committed revision (.*)[.]")
+ match = revline.search(error)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client("revno")
+ current_revision = int(output)
+ if index >= current_revision or index < -current_revision:
+ return None
+ if index >= 0:
+ return str(index+1) # bzr commit 0 is the empty tree.
+ return str(current_revision+index+1)
+
+
+if libbe.TESTING == True:
+ vcs.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/darcs.py b/libbe/storage/vcs/darcs.py
new file mode 100644
index 0000000..d94eaef
--- /dev/null
+++ b/libbe/storage/vcs/darcs.py
@@ -0,0 +1,192 @@
+# Copyright (C) 2009 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Darcs backend.
+"""
+
+import codecs
+import os
+import re
+import sys
+try: # import core module, Python >= 2.5
+ from xml.etree import ElementTree
+except ImportError: # look for non-core module
+ from elementtree import ElementTree
+from xml.sax.saxutils import unescape
+
+import libbe
+import vcs
+if libbe.TESTING == True:
+ import doctest
+ import unittest
+
+
+def new():
+ return Darcs()
+
+class Darcs(vcs.VCS):
+ name="darcs"
+ client="darcs"
+ versioned=True
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client("--version")
+ num_part = output.split(" ")[0]
+ self.parsed_version = [int(i) for i in num_part.split(".")]
+ return output
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, "_darcs") != None :
+ return True
+ return False
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ darcs_dir = self._u_search_parent_directories(path, "_darcs")
+ if darcs_dir == None:
+ return None
+ return os.path.dirname(darcs_dir)
+ def _vcs_init(self, path):
+ self._u_invoke_client("init", cwd=path)
+ def _vcs_get_user_id(self):
+ # following http://darcs.net/manual/node4.html#SECTION00410030000000000000
+ # as of June 29th, 2009
+ if self.rootdir == None:
+ return None
+ darcs_dir = os.path.join(self.rootdir, "_darcs")
+ if darcs_dir != None:
+ for pref_file in ["author", "email"]:
+ pref_path = os.path.join(darcs_dir, "prefs", pref_file)
+ if os.path.exists(pref_path):
+ return self.get_file_contents(pref_path)
+ for env_variable in ["DARCS_EMAIL", "EMAIL"]:
+ if env_variable in os.environ:
+ return os.environ[env_variable]
+ return None
+ def _vcs_set_user_id(self, value):
+ if self.rootdir == None:
+ self.root(".")
+ if self.rootdir == None:
+ raise vcs.SettingIDnotSupported
+ author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author")
+ f = codecs.open(author_path, "w", self.encoding)
+ f.write(value)
+ f.close()
+ def _vcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client("add", path)
+ def _vcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ os.remove(os.path.join(self.rootdir, path)) # darcs notices removal
+ def _vcs_update(self, path):
+ pass # darcs notices changes
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return vcs.VCS._vcs_get_file_contents(self, path, revision,
+ binary=binary)
+ else:
+ if self.parsed_version[0] >= 2:
+ status,output,error = self._u_invoke_client( \
+ "show", "contents", "--patch", revision, path)
+ return output
+ else:
+ # Darcs versions < 2.0.0pre2 lack the "show contents" command
+
+ status,output,error = self._u_invoke_client( \
+ "diff", "--unified", "--from-patch", revision, path,
+ unicode_output=False)
+ major_patch = output
+ status,output,error = self._u_invoke_client( \
+ "diff", "--unified", "--patch", revision, path,
+ unicode_output=False)
+ target_patch = output
+
+ # "--output -" to be supported in GNU patch > 2.5.9
+ # but that hasn't been released as of June 30th, 2009.
+
+ # Rewrite path to status before the patch we want
+ args=["patch", "--reverse", path]
+ status,output,error = self._u_invoke(args, stdin=major_patch)
+ # Now apply the patch we want
+ args=["patch", path]
+ status,output,error = self._u_invoke(args, stdin=target_patch)
+
+ if os.path.exists(os.path.join(self.rootdir, path)) == True:
+ contents = vcs.VCS._vcs_get_file_contents(self, path,
+ binary=binary)
+ else:
+ contents = ""
+
+ # Now restore path to it's current incarnation
+ args=["patch", "--reverse", path]
+ status,output,error = self._u_invoke(args, stdin=target_patch)
+ args=["patch", path]
+ status,output,error = self._u_invoke(args, stdin=major_patch)
+ current_contents = vcs.VCS._vcs_get_file_contents(self, path,
+ binary=binary)
+ return contents
+ def _vcs_duplicate_repo(self, directory, revision=None):
+ if revision==None:
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("put", "--to-patch", revision, directory)
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ id = self.get_user_id()
+ if '@' not in id:
+ id = "%s <%s@invalid.com>" % (id, id)
+ args = ['record', '--all', '--author', id, '--logfile', commitfile]
+ status,output,error = self._u_invoke_client(*args)
+ empty_strings = ["No changes!"]
+ if self._u_any_in_string(empty_strings, output) == True:
+ if allow_empty == False:
+ raise vcs.EmptyCommit()
+ # note that darcs does _not_ make an empty revision.
+ # this returns the last non-empty revision id...
+ revision = self._vcs_revision_id(-1)
+ else:
+ revline = re.compile("Finished recording patch '(.*)'")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client("changes", "--xml")
+ revisions = []
+ xml_str = output.encode("unicode_escape").replace(r"\n", "\n")
+ element = ElementTree.XML(xml_str)
+ assert element.tag == "changelog", element.tag
+ for patch in element.getchildren():
+ assert patch.tag == "patch", patch.tag
+ for child in patch.getchildren():
+ if child.tag == "name":
+ text = unescape(unicode(child.text).decode("unicode_escape").strip())
+ revisions.append(text)
+ revisions.reverse()
+ try:
+ return revisions[index]
+ except IndexError:
+ return None
+
+if libbe.TESTING == True:
+ vcs.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/git.py b/libbe/storage/vcs/git.py
new file mode 100644
index 0000000..7f6e53a
--- /dev/null
+++ b/libbe/storage/vcs/git.py
@@ -0,0 +1,151 @@
+# Copyright (C) 2008-2009 Ben Finney <benf@cybersource.com.au>
+# Chris Ball <cjb@laptop.org>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Git backend.
+"""
+
+import os
+import re
+import sys
+import unittest
+
+import libbe
+import vcs
+if libbe.TESTING == True:
+ import doctest
+
+
+def new():
+ return Git()
+
+class Git(vcs.VCS):
+ name="git"
+ client="git"
+ versioned=True
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client("--version")
+ return output
+ def _vcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".git") != None :
+ return True
+ return False
+ def _vcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ status,output,error = self._u_invoke_client("rev-parse", "--git-dir",
+ cwd=path)
+ gitdir = os.path.join(path, output.rstrip('\n'))
+ dirname = os.path.abspath(os.path.dirname(gitdir))
+ return dirname
+ def _vcs_init(self, path):
+ self._u_invoke_client("init", cwd=path)
+ def _vcs_get_user_id(self):
+ status,output,error = \
+ self._u_invoke_client("config", "user.name", expect=(0,1))
+ if status == 0:
+ name = output.rstrip('\n')
+ else:
+ name = ""
+ status,output,error = \
+ self._u_invoke_client("config", "user.email", expect=(0,1))
+ if status == 0:
+ email = output.rstrip('\n')
+ else:
+ email = ""
+ if name != "" or email != "": # got something!
+ # guess missing info, if necessary
+ if name == "":
+ name = self._u_get_fallback_username()
+ if email == "":
+ email = self._u_get_fallback_email()
+ return self._u_create_id(name, email)
+ return None # Git has no infomation
+ def _vcs_set_user_id(self, value):
+ name,email = self._u_parse_id(value)
+ if email != None:
+ self._u_invoke_client("config", "user.email", email)
+ self._u_invoke_client("config", "user.name", name)
+ def _vcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client("add", path)
+ def _vcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ self._u_invoke_client("rm", "-f", path)
+ def _vcs_update(self, path):
+ self._vcs_add(path)
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ arg = "%s:%s" % (revision,path)
+ status,output,error = self._u_invoke_client("show", arg)
+ return output
+ def _vcs_duplicate_repo(self, directory, revision=None):
+ if revision==None:
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("clone", "--no-checkout", ".", directory)
+ self._u_invoke_client("checkout", revision, cwd=directory)
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--all', '--file', commitfile]
+ if allow_empty == True:
+ args.append("--allow-empty")
+ status,output,error = self._u_invoke_client(*args)
+ else:
+ kwargs = {"expect":(0,1)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ strings = ["nothing to commit",
+ "nothing added to commit"]
+ if self._u_any_in_string(strings, output) == True:
+ raise vcs.EmptyCommit()
+ revision = None
+ revline = re.compile("(.*) (.*)[:\]] (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 3
+ revision = match.groups()[1]
+ full_revision = self._vcs_revision_id(-1)
+ assert full_revision.startswith(revision), \
+ "Mismatched revisions:\n%s\n%s" % (revision, full_revision)
+ return full_revision
+ def _vcs_revision_id(self, index):
+ args = ["rev-list", "--first-parent", "--reverse", "HEAD"]
+ kwargs = {"expect":(0,128)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status == 128:
+ if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
+ return None
+ raise vcs.CommandError(args, status, stderr=error)
+ commits = output.splitlines()
+ try:
+ return commits[index]
+ except IndexError:
+ return None
+
+
+if libbe.TESTING == True:
+ vcs.make_vcs_testcase_subclasses(Git, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/hg.py b/libbe/storage/vcs/hg.py
new file mode 100644
index 0000000..ed27717
--- /dev/null
+++ b/libbe/storage/vcs/hg.py
@@ -0,0 +1,108 @@
+# Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <benf@cybersource.com.au>
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Mercurial (hg) backend.
+"""
+
+import os
+import re
+import sys
+
+import libbe
+import vcs
+
+if libbe.TESTING == True:
+ import unittest
+ import doctest
+
+
+def new():
+ return Hg()
+
+class Hg(vcs.VCS):
+ name="hg"
+ client="hg"
+ versioned=True
+ def _vcs_version(self):
+ status,output,error = self._u_invoke_client("--version")
+ return output
+ def _vcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Mercurial"""
+ if self._u_search_parent_directories(path, ".hg") != None:
+ return True
+ return False
+ def _vcs_root(self, path):
+ status,output,error = self._u_invoke_client("root", cwd=path)
+ return output.rstrip('\n')
+ def _vcs_init(self, path):
+ self._u_invoke_client("init", cwd=path)
+ def _vcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("showconfig","ui.username")
+ return output.rstrip('\n')
+ def _vcs_set_user_id(self, value):
+ """
+ Supported by the Config Extension, but that is not part of
+ standard Mercurial.
+ http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
+ """
+ raise vcs.SettingIDnotSupported
+ def _vcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _vcs_remove(self, path):
+ self._u_invoke_client("rm", "--force", path)
+ def _vcs_update(self, path):
+ pass
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _vcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ return vcs.VCS._vcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("archive", "--rev", revision, directory)
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--logfile', commitfile]
+ status,output,error = self._u_invoke_client(*args)
+ if allow_empty == False:
+ strings = ["nothing changed"]
+ if self._u_any_in_string(strings, output) == True:
+ raise vcs.EmptyCommit()
+ return self._vcs_revision_id(-1)
+ def _vcs_revision_id(self, index, style="id"):
+ args = ["identify", "--rev", str(int(index)), "--%s" % style]
+ kwargs = {"expect": (0,255)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status == 0:
+ id = output.strip()
+ if id == '000000000000':
+ return None # before initial commit.
+ return id
+ return None
+
+
+if libbe.TESTING == True:
+ vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/storage/vcs/util/config.py b/libbe/storage/vcs/util/config.py
new file mode 100644
index 0000000..ccd236b
--- /dev/null
+++ b/libbe/storage/vcs/util/config.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Create, save, and load the per-user config file at path().
+"""
+
+import ConfigParser
+import codecs
+import locale
+import os.path
+import sys
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+
+default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
+
+def path():
+ """Return the path to the per-user config file"""
+ return os.path.expanduser("~/.bugs_everywhere")
+
+def set_val(name, value, section="DEFAULT", encoding=None):
+ """Set a value in the per-user config file
+
+ :param name: The name of the value to set
+ :param value: The new value to set (or None to delete the value)
+ :param section: The section to store the name/value in
+ """
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ if os.path.exists(path()) == False: # touch file or config
+ open(path(), "w").close() # read chokes on missing file
+ f = codecs.open(path(), "r", encoding)
+ config.readfp(f, path())
+ f.close()
+ if value is not None:
+ config.set(section, name, value)
+ else:
+ config.remove_option(section, name)
+ f = codecs.open(path(), "w", encoding)
+ config.write(f)
+ f.close()
+
+def get_val(name, section="DEFAULT", default=None, encoding=None):
+ """
+ Get a value from the per-user config file
+
+ :param name: The name of the value to get
+ :section: The section that the name is in
+ :return: The value, or None
+ >>> get_val("junk") is None
+ True
+ >>> set_val("junk", "random")
+ >>> get_val("junk")
+ u'random'
+ >>> set_val("junk", None)
+ >>> get_val("junk") is None
+ True
+ """
+ if os.path.exists(path()):
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ f = codecs.open(path(), "r", encoding)
+ config.readfp(f, path())
+ f.close()
+ try:
+ return config.get(section, name)
+ except ConfigParser.NoOptionError:
+ return default
+ else:
+ return default
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/vcs/util/mapfile.py b/libbe/storage/vcs/util/mapfile.py
new file mode 100644
index 0000000..8e1e279
--- /dev/null
+++ b/libbe/storage/vcs/util/mapfile.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Provide a means of saving and loading dictionaries of parameters. The
+saved "mapfiles" should be clear, flat-text files, and allow easy merging of
+independent/conflicting changes.
+"""
+
+import errno
+import os.path
+import yaml
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+
+class IllegalKey(Exception):
+ def __init__(self, key):
+ Exception.__init__(self, 'Illegal key "%s"' % key)
+ self.key = key
+
+class IllegalValue(Exception):
+ def __init__(self, value):
+ Exception.__init__(self, 'Illegal value "%s"' % value)
+ self.value = value
+
+def generate(map):
+ """Generate a YAML mapfile content string.
+ >>> generate({"q":"p"})
+ 'q: p\\n\\n'
+ >>> generate({"q":u"Fran\u00e7ais"})
+ 'q: Fran\\xc3\\xa7ais\\n\\n'
+ >>> generate({"q":u"hello"})
+ 'q: hello\\n\\n'
+ >>> generate({"q=":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q="
+ >>> generate({"q:":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q:"
+ >>> generate({"q\\n":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q\\n"
+ >>> generate({"":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ""
+ >>> generate({">q":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ">q"
+ >>> generate({"q":"p\\n"})
+ Traceback (most recent call last):
+ IllegalValue: Illegal value "p\\n"
+ """
+ keys = map.keys()
+ keys.sort()
+ for key in keys:
+ try:
+ assert not key.startswith('>')
+ assert('\n' not in key)
+ assert('=' not in key)
+ assert(':' not in key)
+ assert(len(key) > 0)
+ except AssertionError:
+ raise IllegalKey(unicode(key).encode('unicode_escape'))
+ if '\n' in map[key]:
+ raise IllegalValue(unicode(map[key]).encode('unicode_escape'))
+
+ lines = []
+ for key in keys:
+ lines.append(yaml.safe_dump({key: map[key]},
+ default_flow_style=False,
+ allow_unicode=True))
+ lines.append('')
+ return '\n'.join(lines)
+
+def parse(contents):
+ """
+ Parse a YAML mapfile string.
+ >>> parse('q: p\\n\\n')['q']
+ 'p'
+ >>> parse('q: \\'p\\'\\n\\n')['q']
+ 'p'
+ >>> contents = generate({"a":"b", "c":"d", "e":"f"})
+ >>> dict = parse(contents)
+ >>> dict["a"]
+ 'b'
+ >>> dict["c"]
+ 'd'
+ >>> dict["e"]
+ 'f'
+ >>> contents = generate({"q":u"Fran\u00e7ais"})
+ >>> dict = parse(contents)
+ >>> dict["q"]
+ u'Fran\\xe7ais'
+ """
+ return yaml.load(contents) or {}
+
+def map_save(vcs, path, map, allow_no_vcs=False):
+ """Save the map as a mapfile to the specified path"""
+ contents = generate(map)
+ vcs.set_file_contents(path, contents, allow_no_vcs, binary=True)
+
+def map_load(vcs, path, allow_no_vcs=False):
+ contents = vcs.get_file_contents(path, allow_no_vcs=allow_no_vcs,
+ binary=True)
+ return parse(contents)
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/vcs/util/upgrade.py b/libbe/storage/vcs/util/upgrade.py
new file mode 100644
index 0000000..dc9d54f
--- /dev/null
+++ b/libbe/storage/vcs/util/upgrade.py
@@ -0,0 +1,246 @@
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Handle conversion between the various on-disk images.
+"""
+
+import os, os.path
+import sys
+
+import libbe
+import bug
+import encoding
+import mapfile
+import vcs
+if libbe.TESTING == True:
+ import doctest
+
+# a list of all past versions
+BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0",
+ "Bugs Everywhere Directory v1.1",
+ "Bugs Everywhere Directory v1.2",
+ "Bugs Everywhere Directory v1.3"]
+
+# the current version
+BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1]
+
+class Upgrader (object):
+ "Class for converting "
+ initial_version = None
+ final_version = None
+ def __init__(self, root):
+ self.root = root
+ # use the "None" VCS to ensure proper encoding/decoding and
+ # simplify path construction.
+ self.vcs = vcs.vcs_by_name("None")
+ self.vcs.root(self.root)
+ self.vcs.encoding = encoding.get_encoding()
+
+ def get_path(self, *args):
+ """
+ Return a path relative to .root.
+ """
+ dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(dir, *args)
+
+ def check_initial_version(self):
+ path = self.get_path("version")
+ version = self.vcs.get_file_contents(path).rstrip("\n")
+ assert version == self.initial_version, version
+
+ def set_version(self):
+ path = self.get_path("version")
+ self.vcs.set_file_contents(path, self.final_version+"\n")
+
+ def upgrade(self):
+ print >> sys.stderr, "upgrading bugdir from '%s' to '%s'" \
+ % (self.initial_version, self.final_version)
+ self.check_initial_version()
+ self.set_version()
+ self._upgrade()
+
+ def _upgrade(self):
+ raise NotImplementedError
+
+
+class Upgrade_1_0_to_1_1 (Upgrader):
+ initial_version = "Bugs Everywhere Tree 1 0"
+ final_version = "Bugs Everywhere Directory v1.1"
+ def _upgrade_mapfile(self, path):
+ contents = self.vcs.get_file_contents(path)
+ old_format = False
+ for line in contents.splitlines():
+ if len(line.split("=")) == 2:
+ old_format = True
+ break
+ if old_format == True:
+ # translate to YAML.
+ newlines = []
+ for line in contents.splitlines():
+ line = line.rstrip('\n')
+ if len(line) == 0:
+ continue
+ fields = line.split("=")
+ if len(fields) == 2:
+ key,value = fields
+ newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
+ else:
+ newlines.append(line)
+ contents = '\n'.join(newlines)
+ # load the YAML and save
+ map = mapfile.parse(contents)
+ mapfile.map_save(self.vcs, path, map)
+
+ def _upgrade(self):
+ """
+ Comment value field "From" -> "Author".
+ Homegrown mapfile -> YAML.
+ """
+ path = self.get_path("settings")
+ self._upgrade_mapfile(path)
+ for bug_uuid in os.listdir(self.get_path("bugs")):
+ path = self.get_path("bugs", bug_uuid, "values")
+ self._upgrade_mapfile(path)
+ c_path = ["bugs", bug_uuid, "comments"]
+ if not os.path.exists(self.get_path(*c_path)):
+ continue # no comments for this bug
+ for comment_uuid in os.listdir(self.get_path(*c_path)):
+ path_list = c_path + [comment_uuid, "values"]
+ path = self.get_path(*path_list)
+ self._upgrade_mapfile(path)
+ settings = mapfile.map_load(self.vcs, path)
+ if "From" in settings:
+ settings["Author"] = settings.pop("From")
+ mapfile.map_save(self.vcs, path, settings)
+
+
+class Upgrade_1_1_to_1_2 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.1"
+ final_version = "Bugs Everywhere Directory v1.2"
+ def _upgrade(self):
+ """
+ BugDir settings field "rcs_name" -> "vcs_name".
+ """
+ path = self.get_path("settings")
+ settings = mapfile.map_load(self.vcs, path)
+ if "rcs_name" in settings:
+ settings["vcs_name"] = settings.pop("rcs_name")
+ mapfile.map_save(self.vcs, path, settings)
+
+class Upgrade_1_2_to_1_3 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.2"
+ final_version = "Bugs Everywhere Directory v1.3"
+ def __init__(self, *args, **kwargs):
+ Upgrader.__init__(self, *args, **kwargs)
+ self._targets = {} # key: target text,value: new target bug
+ path = self.get_path('settings')
+ settings = mapfile.map_load(self.vcs, path)
+ if 'vcs_name' in settings:
+ old_vcs = self.vcs
+ self.vcs = vcs.vcs_by_name(settings['vcs_name'])
+ self.vcs.root(self.root)
+ self.vcs.encoding = old_vcs.encoding
+
+ def _target_bug(self, target_text):
+ if target_text not in self._targets:
+ _bug = bug.Bug(bugdir=self, summary=target_text)
+ # note: we're not a bugdir, but all Bug.save() needs is
+ # .root, .vcs, and .get_path(), which we have.
+ _bug.severity = 'target'
+ self._targets[target_text] = _bug
+ return self._targets[target_text]
+
+ def _upgrade_bugdir_mapfile(self):
+ path = self.get_path('settings')
+ settings = mapfile.map_load(self.vcs, path)
+ if 'target' in settings:
+ settings['target'] = self._target_bug(settings['target']).uuid
+ mapfile.map_save(self.vcs, path, settings)
+
+ def _upgrade_bug_mapfile(self, bug_uuid):
+ import becommands.depend
+ path = self.get_path('bugs', bug_uuid, 'values')
+ settings = mapfile.map_load(self.vcs, path)
+ if 'target' in settings:
+ target_bug = self._target_bug(settings['target'])
+ _bug = bug.Bug(bugdir=self, uuid=bug_uuid, from_disk=True)
+ # note: we're not a bugdir, but all Bug.load_settings()
+ # needs is .root, .vcs, and .get_path(), which we have.
+ becommands.depend.add_block(target_bug, _bug)
+ _bug.settings.pop('target')
+ _bug.save()
+
+ def _upgrade(self):
+ """
+ Bug value field "target" -> target bugs.
+ Bugdir value field "target" -> pointer to current target bug.
+ """
+ for bug_uuid in os.listdir(self.get_path('bugs')):
+ self._upgrade_bug_mapfile(bug_uuid)
+ self._upgrade_bugdir_mapfile()
+ for _bug in self._targets.values():
+ _bug.save()
+
+upgraders = [Upgrade_1_0_to_1_1,
+ Upgrade_1_1_to_1_2,
+ Upgrade_1_2_to_1_3]
+upgrade_classes = {}
+for upgrader in upgraders:
+ upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
+
+def upgrade(path, current_version,
+ target_version=BUGDIR_DISK_VERSION):
+ """
+ Call the appropriate upgrade function to convert current_version
+ to target_version. If a direct conversion function does not exist,
+ use consecutive conversion functions.
+ """
+ if current_version not in BUGDIR_DISK_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % version
+ if target_version not in BUGDIR_DISK_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % version
+
+ if (current_version, target_version) in upgrade_classes:
+ # direct conversion
+ upgrade_class = upgrade_classes[(current_version, target_version)]
+ u = upgrade_class(path)
+ u.upgrade()
+ else:
+ # consecutive single-step conversion
+ i = BUGDIR_DISK_VERSIONS.index(current_version)
+ while True:
+ version_a = BUGDIR_DISK_VERSIONS[i]
+ version_b = BUGDIR_DISK_VERSIONS[i+1]
+ try:
+ upgrade_class = upgrade_classes[(version_a, version_b)]
+ except KeyError:
+ raise NotImplementedError, \
+ "Cannot convert version '%s' to '%s' yet." \
+ % (version_a, version_b)
+ u = upgrade_class(path)
+ u.upgrade()
+ if version_b == target_version:
+ break
+ i += 1
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()