aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/storage/vcs/util
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/storage/vcs/util')
-rw-r--r--libbe/storage/vcs/util/config.py94
-rw-r--r--libbe/storage/vcs/util/mapfile.py126
-rw-r--r--libbe/storage/vcs/util/upgrade.py246
3 files changed, 466 insertions, 0 deletions
diff --git a/libbe/storage/vcs/util/config.py b/libbe/storage/vcs/util/config.py
new file mode 100644
index 0000000..ccd236b
--- /dev/null
+++ b/libbe/storage/vcs/util/config.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Create, save, and load the per-user config file at path().
+"""
+
+import ConfigParser
+import codecs
+import locale
+import os.path
+import sys
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+
+default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
+
+def path():
+ """Return the path to the per-user config file"""
+ return os.path.expanduser("~/.bugs_everywhere")
+
+def set_val(name, value, section="DEFAULT", encoding=None):
+ """Set a value in the per-user config file
+
+ :param name: The name of the value to set
+ :param value: The new value to set (or None to delete the value)
+ :param section: The section to store the name/value in
+ """
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ if os.path.exists(path()) == False: # touch file or config
+ open(path(), "w").close() # read chokes on missing file
+ f = codecs.open(path(), "r", encoding)
+ config.readfp(f, path())
+ f.close()
+ if value is not None:
+ config.set(section, name, value)
+ else:
+ config.remove_option(section, name)
+ f = codecs.open(path(), "w", encoding)
+ config.write(f)
+ f.close()
+
+def get_val(name, section="DEFAULT", default=None, encoding=None):
+ """
+ Get a value from the per-user config file
+
+ :param name: The name of the value to get
+ :section: The section that the name is in
+ :return: The value, or None
+ >>> get_val("junk") is None
+ True
+ >>> set_val("junk", "random")
+ >>> get_val("junk")
+ u'random'
+ >>> set_val("junk", None)
+ >>> get_val("junk") is None
+ True
+ """
+ if os.path.exists(path()):
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ f = codecs.open(path(), "r", encoding)
+ config.readfp(f, path())
+ f.close()
+ try:
+ return config.get(section, name)
+ except ConfigParser.NoOptionError:
+ return default
+ else:
+ return default
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/vcs/util/mapfile.py b/libbe/storage/vcs/util/mapfile.py
new file mode 100644
index 0000000..8e1e279
--- /dev/null
+++ b/libbe/storage/vcs/util/mapfile.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Provide a means of saving and loading dictionaries of parameters. The
+saved "mapfiles" should be clear, flat-text files, and allow easy merging of
+independent/conflicting changes.
+"""
+
+import errno
+import os.path
+import yaml
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+
+class IllegalKey(Exception):
+ def __init__(self, key):
+ Exception.__init__(self, 'Illegal key "%s"' % key)
+ self.key = key
+
+class IllegalValue(Exception):
+ def __init__(self, value):
+ Exception.__init__(self, 'Illegal value "%s"' % value)
+ self.value = value
+
+def generate(map):
+ """Generate a YAML mapfile content string.
+ >>> generate({"q":"p"})
+ 'q: p\\n\\n'
+ >>> generate({"q":u"Fran\u00e7ais"})
+ 'q: Fran\\xc3\\xa7ais\\n\\n'
+ >>> generate({"q":u"hello"})
+ 'q: hello\\n\\n'
+ >>> generate({"q=":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q="
+ >>> generate({"q:":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q:"
+ >>> generate({"q\\n":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q\\n"
+ >>> generate({"":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ""
+ >>> generate({">q":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ">q"
+ >>> generate({"q":"p\\n"})
+ Traceback (most recent call last):
+ IllegalValue: Illegal value "p\\n"
+ """
+ keys = map.keys()
+ keys.sort()
+ for key in keys:
+ try:
+ assert not key.startswith('>')
+ assert('\n' not in key)
+ assert('=' not in key)
+ assert(':' not in key)
+ assert(len(key) > 0)
+ except AssertionError:
+ raise IllegalKey(unicode(key).encode('unicode_escape'))
+ if '\n' in map[key]:
+ raise IllegalValue(unicode(map[key]).encode('unicode_escape'))
+
+ lines = []
+ for key in keys:
+ lines.append(yaml.safe_dump({key: map[key]},
+ default_flow_style=False,
+ allow_unicode=True))
+ lines.append('')
+ return '\n'.join(lines)
+
+def parse(contents):
+ """
+ Parse a YAML mapfile string.
+ >>> parse('q: p\\n\\n')['q']
+ 'p'
+ >>> parse('q: \\'p\\'\\n\\n')['q']
+ 'p'
+ >>> contents = generate({"a":"b", "c":"d", "e":"f"})
+ >>> dict = parse(contents)
+ >>> dict["a"]
+ 'b'
+ >>> dict["c"]
+ 'd'
+ >>> dict["e"]
+ 'f'
+ >>> contents = generate({"q":u"Fran\u00e7ais"})
+ >>> dict = parse(contents)
+ >>> dict["q"]
+ u'Fran\\xe7ais'
+ """
+ return yaml.load(contents) or {}
+
+def map_save(vcs, path, map, allow_no_vcs=False):
+ """Save the map as a mapfile to the specified path"""
+ contents = generate(map)
+ vcs.set_file_contents(path, contents, allow_no_vcs, binary=True)
+
+def map_load(vcs, path, allow_no_vcs=False):
+ contents = vcs.get_file_contents(path, allow_no_vcs=allow_no_vcs,
+ binary=True)
+ return parse(contents)
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/storage/vcs/util/upgrade.py b/libbe/storage/vcs/util/upgrade.py
new file mode 100644
index 0000000..dc9d54f
--- /dev/null
+++ b/libbe/storage/vcs/util/upgrade.py
@@ -0,0 +1,246 @@
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Handle conversion between the various on-disk images.
+"""
+
+import os, os.path
+import sys
+
+import libbe
+import bug
+import encoding
+import mapfile
+import vcs
+if libbe.TESTING == True:
+ import doctest
+
+# a list of all past versions
+BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0",
+ "Bugs Everywhere Directory v1.1",
+ "Bugs Everywhere Directory v1.2",
+ "Bugs Everywhere Directory v1.3"]
+
+# the current version
+BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1]
+
+class Upgrader (object):
+ "Class for converting "
+ initial_version = None
+ final_version = None
+ def __init__(self, root):
+ self.root = root
+ # use the "None" VCS to ensure proper encoding/decoding and
+ # simplify path construction.
+ self.vcs = vcs.vcs_by_name("None")
+ self.vcs.root(self.root)
+ self.vcs.encoding = encoding.get_encoding()
+
+ def get_path(self, *args):
+ """
+ Return a path relative to .root.
+ """
+ dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(dir, *args)
+
+ def check_initial_version(self):
+ path = self.get_path("version")
+ version = self.vcs.get_file_contents(path).rstrip("\n")
+ assert version == self.initial_version, version
+
+ def set_version(self):
+ path = self.get_path("version")
+ self.vcs.set_file_contents(path, self.final_version+"\n")
+
+ def upgrade(self):
+ print >> sys.stderr, "upgrading bugdir from '%s' to '%s'" \
+ % (self.initial_version, self.final_version)
+ self.check_initial_version()
+ self.set_version()
+ self._upgrade()
+
+ def _upgrade(self):
+ raise NotImplementedError
+
+
+class Upgrade_1_0_to_1_1 (Upgrader):
+ initial_version = "Bugs Everywhere Tree 1 0"
+ final_version = "Bugs Everywhere Directory v1.1"
+ def _upgrade_mapfile(self, path):
+ contents = self.vcs.get_file_contents(path)
+ old_format = False
+ for line in contents.splitlines():
+ if len(line.split("=")) == 2:
+ old_format = True
+ break
+ if old_format == True:
+ # translate to YAML.
+ newlines = []
+ for line in contents.splitlines():
+ line = line.rstrip('\n')
+ if len(line) == 0:
+ continue
+ fields = line.split("=")
+ if len(fields) == 2:
+ key,value = fields
+ newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
+ else:
+ newlines.append(line)
+ contents = '\n'.join(newlines)
+ # load the YAML and save
+ map = mapfile.parse(contents)
+ mapfile.map_save(self.vcs, path, map)
+
+ def _upgrade(self):
+ """
+ Comment value field "From" -> "Author".
+ Homegrown mapfile -> YAML.
+ """
+ path = self.get_path("settings")
+ self._upgrade_mapfile(path)
+ for bug_uuid in os.listdir(self.get_path("bugs")):
+ path = self.get_path("bugs", bug_uuid, "values")
+ self._upgrade_mapfile(path)
+ c_path = ["bugs", bug_uuid, "comments"]
+ if not os.path.exists(self.get_path(*c_path)):
+ continue # no comments for this bug
+ for comment_uuid in os.listdir(self.get_path(*c_path)):
+ path_list = c_path + [comment_uuid, "values"]
+ path = self.get_path(*path_list)
+ self._upgrade_mapfile(path)
+ settings = mapfile.map_load(self.vcs, path)
+ if "From" in settings:
+ settings["Author"] = settings.pop("From")
+ mapfile.map_save(self.vcs, path, settings)
+
+
+class Upgrade_1_1_to_1_2 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.1"
+ final_version = "Bugs Everywhere Directory v1.2"
+ def _upgrade(self):
+ """
+ BugDir settings field "rcs_name" -> "vcs_name".
+ """
+ path = self.get_path("settings")
+ settings = mapfile.map_load(self.vcs, path)
+ if "rcs_name" in settings:
+ settings["vcs_name"] = settings.pop("rcs_name")
+ mapfile.map_save(self.vcs, path, settings)
+
+class Upgrade_1_2_to_1_3 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.2"
+ final_version = "Bugs Everywhere Directory v1.3"
+ def __init__(self, *args, **kwargs):
+ Upgrader.__init__(self, *args, **kwargs)
+ self._targets = {} # key: target text,value: new target bug
+ path = self.get_path('settings')
+ settings = mapfile.map_load(self.vcs, path)
+ if 'vcs_name' in settings:
+ old_vcs = self.vcs
+ self.vcs = vcs.vcs_by_name(settings['vcs_name'])
+ self.vcs.root(self.root)
+ self.vcs.encoding = old_vcs.encoding
+
+ def _target_bug(self, target_text):
+ if target_text not in self._targets:
+ _bug = bug.Bug(bugdir=self, summary=target_text)
+ # note: we're not a bugdir, but all Bug.save() needs is
+ # .root, .vcs, and .get_path(), which we have.
+ _bug.severity = 'target'
+ self._targets[target_text] = _bug
+ return self._targets[target_text]
+
+ def _upgrade_bugdir_mapfile(self):
+ path = self.get_path('settings')
+ settings = mapfile.map_load(self.vcs, path)
+ if 'target' in settings:
+ settings['target'] = self._target_bug(settings['target']).uuid
+ mapfile.map_save(self.vcs, path, settings)
+
+ def _upgrade_bug_mapfile(self, bug_uuid):
+ import becommands.depend
+ path = self.get_path('bugs', bug_uuid, 'values')
+ settings = mapfile.map_load(self.vcs, path)
+ if 'target' in settings:
+ target_bug = self._target_bug(settings['target'])
+ _bug = bug.Bug(bugdir=self, uuid=bug_uuid, from_disk=True)
+ # note: we're not a bugdir, but all Bug.load_settings()
+ # needs is .root, .vcs, and .get_path(), which we have.
+ becommands.depend.add_block(target_bug, _bug)
+ _bug.settings.pop('target')
+ _bug.save()
+
+ def _upgrade(self):
+ """
+ Bug value field "target" -> target bugs.
+ Bugdir value field "target" -> pointer to current target bug.
+ """
+ for bug_uuid in os.listdir(self.get_path('bugs')):
+ self._upgrade_bug_mapfile(bug_uuid)
+ self._upgrade_bugdir_mapfile()
+ for _bug in self._targets.values():
+ _bug.save()
+
+upgraders = [Upgrade_1_0_to_1_1,
+ Upgrade_1_1_to_1_2,
+ Upgrade_1_2_to_1_3]
+upgrade_classes = {}
+for upgrader in upgraders:
+ upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
+
+def upgrade(path, current_version,
+ target_version=BUGDIR_DISK_VERSION):
+ """
+ Call the appropriate upgrade function to convert current_version
+ to target_version. If a direct conversion function does not exist,
+ use consecutive conversion functions.
+ """
+ if current_version not in BUGDIR_DISK_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % version
+ if target_version not in BUGDIR_DISK_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % version
+
+ if (current_version, target_version) in upgrade_classes:
+ # direct conversion
+ upgrade_class = upgrade_classes[(current_version, target_version)]
+ u = upgrade_class(path)
+ u.upgrade()
+ else:
+ # consecutive single-step conversion
+ i = BUGDIR_DISK_VERSIONS.index(current_version)
+ while True:
+ version_a = BUGDIR_DISK_VERSIONS[i]
+ version_b = BUGDIR_DISK_VERSIONS[i+1]
+ try:
+ upgrade_class = upgrade_classes[(version_a, version_b)]
+ except KeyError:
+ raise NotImplementedError, \
+ "Cannot convert version '%s' to '%s' yet." \
+ % (version_a, version_b)
+ u = upgrade_class(path)
+ u.upgrade()
+ if version_b == target_version:
+ break
+ i += 1
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()