# Copyright (C) 2009-2012 Chris Ball # Gianluca Montecchi # W. Trevor King # # This file is part of Bugs Everywhere. # # Bugs Everywhere is free software: you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the Free # Software Foundation, either version 2 of the License, or (at your option) any # later version. # # Bugs Everywhere is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # Bugs Everywhere. If not, see . """ Handle conversion between the various BE storage formats. """ import codecs import json import os, os.path import sys import types import yaml import libbe import libbe.bug import libbe.storage.util.mapfile as mapfile from libbe.storage import STORAGE_VERSIONS, STORAGE_VERSION #import libbe.storage.vcs # delay import to avoid cyclic dependency import libbe.ui.util.editor import libbe.util import libbe.util.encoding as encoding import libbe.util.id def generate_yaml_mapfile(map): """From v1.1 to v1.5, BE dirs used YAML mapfiles >>> generate_yaml_mapfile({'q':'p'}) 'q: p\\n\\n' >>> generate_yaml_mapfile({'q':u'Fran\u00e7ais'}) 'q: Fran\\xc3\\xa7ais\\n\\n' >>> generate_yaml_mapfile({'q':u'hello'}) 'q: hello\\n\\n' """ keys = map.keys() keys.sort() for key in keys: try: assert not key.startswith('>') assert('\n' not in key) assert('=' not in key) assert(':' not in key) assert(len(key) > 0) except AssertionError: raise ValueError(unicode(key).encode('unicode_escape')) if '\n' in map[key]: raise ValueError(unicode(map[key]).encode('unicode_escape')) lines = [] for key in keys: lines.append(yaml.safe_dump({key: map[key]}, default_flow_style=False, allow_unicode=True)) lines.append('') return '\n'.join(lines) def parse_yaml_mapfile(contents): """From v1.1 to v1.5, BE dirs used YAML mapfiles >>> parse_yaml_mapfile('q: p\\n\\n')['q'] 'p' >>> parse_yaml_mapfile('q: \\'p\\'\\n\\n')['q'] 'p' >>> contents = generate_yaml_mapfile({'a':'b', 'c':'d', 'e':'f'}) >>> dict = parse_yaml_mapfile(contents) >>> dict['a'] 'b' >>> dict['c'] 'd' >>> dict['e'] 'f' >>> contents = generate_yaml_mapfile({'q':u'Fran\u00e7ais'}) >>> dict = parse_yaml_mapfile(contents) >>> dict['q'] u'Fran\\xe7ais' """ c = yaml.safe_load(contents) if type(c) == types.StringType: raise mapfile.InvalidMapfileContents( 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents) return c or {} class Upgrader (object): "Class for converting between different on-disk BE storage formats." initial_version = None final_version = None def __init__(self, repo): import libbe.storage.vcs self.repo = repo vcs_name = self._get_vcs_name() if vcs_name == None: vcs_name = 'None' self.vcs = libbe.storage.vcs.vcs_by_name(vcs_name) self.vcs.repo = self.repo self.vcs.root() def get_path(self, *args): """ Return the absolute path using args relative to .be. """ dir = os.path.join(self.repo, '.be') if len(args) == 0: return dir return os.path.join(dir, *args) def _get_vcs_name(self): return None def check_initial_version(self): path = self.get_path('version') version = encoding.get_file_contents(path, decode=True).rstrip('\n') assert version == self.initial_version, '%s: %s' % (path, version) def set_version(self): path = self.get_path('version') encoding.set_file_contents(path, self.final_version+'\n') self.vcs._vcs_update(path) def upgrade(self): print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \ % (self.initial_version, self.final_version) self.check_initial_version() self.set_version() self._upgrade() def _upgrade(self): raise NotImplementedError class Upgrade_1_0_to_1_1 (Upgrader): initial_version = "Bugs Everywhere Tree 1 0" final_version = "Bugs Everywhere Directory v1.1" def _get_vcs_name(self): path = self.get_path('settings') settings = encoding.get_file_contents(path) for line in settings.splitlines(False): fields = line.split('=') if len(fields) == 2 and fields[0] == 'rcs_name': return fields[1] return None def _upgrade_mapfile(self, path): contents = encoding.get_file_contents(path, decode=True) old_format = False for line in contents.splitlines(): if len(line.split('=')) == 2: old_format = True break if old_format == True: # translate to YAML. newlines = [] for line in contents.splitlines(): line = line.rstrip('\n') if len(line) == 0: continue fields = line.split("=") if len(fields) == 2: key,value = fields newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) else: newlines.append(line) contents = '\n'.join(newlines) # load the YAML and save map = parse_yaml_mapfile(contents) if type(map) == types.StringType: raise ValueError((path, contents)) contents = generate_yaml_mapfile(map) encoding.set_file_contents(path, contents) self.vcs._vcs_update(path) def _upgrade(self): """ Comment value field "From" -> "Author". Homegrown mapfile -> YAML. """ path = self.get_path('settings') self._upgrade_mapfile(path) for bug_uuid in os.listdir(self.get_path('bugs')): path = self.get_path('bugs', bug_uuid, 'values') self._upgrade_mapfile(path) c_path = ['bugs', bug_uuid, 'comments'] if not os.path.exists(self.get_path(*c_path)): continue # no comments for this bug for comment_uuid in os.listdir(self.get_path(*c_path)): path_list = c_path + [comment_uuid, 'values'] path = self.get_path(*path_list) self._upgrade_mapfile(path) settings = mapfile.parse( encoding.get_file_contents(path)) if 'From' in settings: settings['Author'] = settings.pop('From') encoding.set_file_contents( path, generate_yaml_mapfile(settings)) self.vcs._vcs_update(path) class Upgrade_1_1_to_1_2 (Upgrader): initial_version = "Bugs Everywhere Directory v1.1" final_version = "Bugs Everywhere Directory v1.2" def _get_vcs_name(self): path = self.get_path('settings') settings = parse_yaml_mapfile(encoding.get_file_contents(path)) if 'rcs_name' in settings: return settings['rcs_name'] return None def _upgrade(self): """ BugDir settings field "rcs_name" -> "vcs_name". """ path = self.get_path('settings') settings = parse_yaml_mapfile(encoding.get_file_contents(path)) if 'rcs_name' in settings: settings['vcs_name'] = settings.pop('rcs_name') encoding.set_file_contents(path, generate_yaml_mapfile(settings)) self.vcs._vcs_update(path) class Upgrade_1_2_to_1_3 (Upgrader): initial_version = "Bugs Everywhere Directory v1.2" final_version = "Bugs Everywhere Directory v1.3" def __init__(self, *args, **kwargs): Upgrader.__init__(self, *args, **kwargs) self._targets = {} # key: target text,value: new target bug def _get_vcs_name(self): path = self.get_path('settings') settings = parse_yaml_mapfile(encoding.get_file_contents(path)) if 'vcs_name' in settings: return settings['vcs_name'] return None def _save_bug_settings(self, bug): # The target bugs don't have comments path = self.get_path('bugs', bug.uuid, 'values') if not os.path.exists(path): self.vcs._add_path(path, directory=False) path = self.get_path('bugs', bug.uuid, 'values') mf = generate_yaml_mapfile(bug._get_saved_settings()) encoding.set_file_contents(path, mf) self.vcs._vcs_update(path) def _target_bug(self, target_text): if target_text not in self._targets: bug = libbe.bug.Bug(summary=target_text) bug.severity = 'target' self._targets[target_text] = bug return self._targets[target_text] def _upgrade_bugdir_mapfile(self): path = self.get_path('settings') mf = encoding.get_file_contents(path) if mf == libbe.util.InvalidObject: return # settings file does not exist settings = parse_yaml_mapfile(mf) if 'target' in settings: settings['target'] = self._target_bug(settings['target']).uuid mf = generate_yaml_mapfile(settings) encoding.set_file_contents(path, mf) self.vcs._vcs_update(path) def _upgrade_bug_mapfile(self, bug_uuid): import libbe.command.depend as dep path = self.get_path('bugs', bug_uuid, 'values') mf = encoding.get_file_contents(path) if mf == libbe.util.InvalidObject: return # settings file does not exist settings = parse_yaml_mapfile(mf) if 'target' in settings: target_bug = self._target_bug(settings['target']) blocked_by_string = '%s%s' % (dep.BLOCKED_BY_TAG, bug_uuid) dep._add_remove_extra_string(target_bug, blocked_by_string, add=True) blocks_string = dep._generate_blocks_string(target_bug) estrs = settings.get('extra_strings', []) estrs.append(blocks_string) settings['extra_strings'] = sorted(estrs) settings.pop('target') mf = generate_yaml_mapfile(settings) encoding.set_file_contents(path, mf) self.vcs._vcs_update(path) def _upgrade(self): """ Bug value field "target" -> target bugs. Bugdir value field "target" -> pointer to current target bug. """ for bug_uuid in os.listdir(self.get_path('bugs')): self._upgrade_bug_mapfile(bug_uuid) self._upgrade_bugdir_mapfile() for bug in self._targets.values(): self._save_bug_settings(bug) class Upgrade_1_3_to_1_4 (Upgrader): initial_version = "Bugs Everywhere Directory v1.3" final_version = "Bugs Everywhere Directory v1.4" def _get_vcs_name(self): path = self.get_path('settings') settings = parse_yaml_mapfile(encoding.get_file_contents(path)) if 'vcs_name' in settings: return settings['vcs_name'] return None def _upgrade(self): """ add new directory "./be/BUGDIR-UUID" "./be/bugs" -> "./be/BUGDIR-UUID/bugs" "./be/settings" -> "./be/BUGDIR-UUID/settings" """ self.repo = os.path.abspath(self.repo) basenames = [p for p in os.listdir(self.get_path())] if not 'bugs' in basenames and not 'settings' in basenames \ and len([p for p in basenames if len(p)==36]) == 1: return # the user has upgraded the directory. basenames = [p for p in basenames if p in ['bugs','settings']] uuid = libbe.util.id.uuid_gen() add = [self.get_path(uuid)] move = [(self.get_path(p), self.get_path(uuid, p)) for p in basenames] msg = ['Upgrading BE directory version v1.3 to v1.4', '', "Because BE's VCS drivers don't support 'move',", 'please make the following changes with your VCS', 'and re-run BE. Note that you can choose a different', 'bugdir UUID to preserve uniformity across branches', 'of a distributed repository.' '', 'add', ' ' + '\n '.join(add), 'move', ' ' + '\n '.join(['%s %s' % (a,b) for a,b in move]), ] self.vcs._cached_path_id.destroy() raise Exception('Need user assistance\n%s' % '\n'.join(msg)) class Upgrade_1_4_to_1_5 (Upgrader): initial_version = "Bugs Everywhere Directory v1.4" final_version = "Bugs Everywhere Directory v1.5" def _get_vcs_name(self): path = self.get_path('settings') for p in os.listdir(self.get_path()): # check each bugdir's settings path = os.path.join(self.get_path(), p) if os.path.isdir(path): settings_path = os.path.join(path, 'settings') if os.path.isfile(settings_path): settings = parse_yaml_mapfile(encoding.get_file_contents( settings_path)) if 'vcs_name' in settings: return settings['vcs_name'] # first entry we found return None def _upgrade(self): """ convert YAML settings to JSON (much faster parsing) "./be/BUGDIR-UUID/settings" "./be/BUGDIR-UUID/bugs/BUG-UUID/values" "./be/BUGDIR-UUID/bugs/BUG-UUID/comments/COMMENT-UUID/values" """ self.repo = os.path.abspath(self.repo) basenames = [p for p in os.listdir(self.get_path())] for dirpath,dirnames,filenames in os.walk(self.get_path()): for filename in filenames: if filename in ['settings', 'values']: self._upgrade_mapfile(os.path.join(dirpath, filename)) def _upgrade_mapfile(self, path): contents = encoding.get_file_contents(path) data = parse_yaml_mapfile(contents) contents = mapfile.generate(data) encoding.set_file_contents(path, contents) self.vcs._vcs_update(path) upgraders = [Upgrade_1_0_to_1_1, Upgrade_1_1_to_1_2, Upgrade_1_2_to_1_3, Upgrade_1_3_to_1_4, Upgrade_1_4_to_1_5] upgrade_classes = {} for upgrader in upgraders: upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader def upgrade(path, current_version, target_version=STORAGE_VERSION): """ Call the appropriate upgrade function to convert current_version to target_version. If a direct conversion function does not exist, use consecutive conversion functions. """ if current_version not in STORAGE_VERSIONS: raise NotImplementedError, \ "Cannot handle version '%s' yet." % current_version if target_version not in STORAGE_VERSIONS: raise NotImplementedError, \ "Cannot handle version '%s' yet." % current_version if (current_version, target_version) in upgrade_classes: # direct conversion upgrade_class = upgrade_classes[(current_version, target_version)] u = upgrade_class(path) u.upgrade() else: # consecutive single-step conversion i = STORAGE_VERSIONS.index(current_version) while True: version_a = STORAGE_VERSIONS[i] version_b = STORAGE_VERSIONS[i+1] try: upgrade_class = upgrade_classes[(version_a, version_b)] except KeyError: raise NotImplementedError, \ "Cannot convert version '%s' to '%s' yet." \ % (version_a, version_b) u = upgrade_class(path) u.upgrade() if version_b == target_version: break i += 1