# Copyright (C) 2009-2012 Chris Ball <cjb@laptop.org>
# Gianluca Montecchi <gian@grys.it>
# W. Trevor King <wking@tremily.us>
#
# This file is part of Bugs Everywhere.
#
# Bugs Everywhere is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option) any
# later version.
#
# Bugs Everywhere is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
"""
Handle conversion between the various BE storage formats.
"""
import codecs
import json
import os, os.path
import sys
import types
import yaml
import libbe
import libbe.bug
import libbe.storage.util.mapfile as mapfile
from libbe.storage import STORAGE_VERSIONS, STORAGE_VERSION
#import libbe.storage.vcs # delay import to avoid cyclic dependency
import libbe.ui.util.editor
import libbe.util
import libbe.util.encoding as encoding
import libbe.util.id
def generate_yaml_mapfile(map):
"""From v1.1 to v1.5, BE dirs used YAML mapfiles
>>> generate_yaml_mapfile({'q':'p'})
'q: p\\n\\n'
>>> generate_yaml_mapfile({'q':u'Fran\u00e7ais'})
'q: Fran\\xc3\\xa7ais\\n\\n'
>>> generate_yaml_mapfile({'q':u'hello'})
'q: hello\\n\\n'
"""
keys = map.keys()
keys.sort()
for key in keys:
try:
assert not key.startswith('>')
assert('\n' not in key)
assert('=' not in key)
assert(':' not in key)
assert(len(key) > 0)
except AssertionError:
raise ValueError(unicode(key).encode('unicode_escape'))
if '\n' in map[key]:
raise ValueError(unicode(map[key]).encode('unicode_escape'))
lines = []
for key in keys:
lines.append(yaml.safe_dump({key: map[key]},
default_flow_style=False,
allow_unicode=True))
lines.append('')
return '\n'.join(lines)
def parse_yaml_mapfile(contents):
"""From v1.1 to v1.5, BE dirs used YAML mapfiles
>>> parse_yaml_mapfile('q: p\\n\\n')['q']
'p'
>>> parse_yaml_mapfile('q: \\'p\\'\\n\\n')['q']
'p'
>>> contents = generate_yaml_mapfile({'a':'b', 'c':'d', 'e':'f'})
>>> dict = parse_yaml_mapfile(contents)
>>> dict['a']
'b'
>>> dict['c']
'd'
>>> dict['e']
'f'
>>> contents = generate_yaml_mapfile({'q':u'Fran\u00e7ais'})
>>> dict = parse_yaml_mapfile(contents)
>>> dict['q']
u'Fran\\xe7ais'
"""
c = yaml.safe_load(contents)
if type(c) == types.StringType:
raise mapfile.InvalidMapfileContents(
'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents)
return c or {}
class Upgrader (object):
"Class for converting between different on-disk BE storage formats."
initial_version = None
final_version = None
def __init__(self, repo):
import libbe.storage.vcs
self.repo = repo
vcs_name = self._get_vcs_name()
if vcs_name == None:
vcs_name = 'None'
self.vcs = libbe.storage.vcs.vcs_by_name(vcs_name)
self.vcs.repo = self.repo
self.vcs.root()
def get_path(self, *args):
"""
Return the absolute path using args relative to .be.
"""
dir = os.path.join(self.repo, '.be')
if len(args) == 0:
return dir
return os.path.join(dir, *args)
def _get_vcs_name(self):
return None
def check_initial_version(self):
path = self.get_path('version')
version = encoding.get_file_contents(path, decode=True).rstrip('\n')
assert version == self.initial_version, '%s: %s' % (path, version)
def set_version(self):
path = self.get_path('version')
encoding.set_file_contents(path, self.final_version+'\n')
self.vcs._vcs_update(path)
def upgrade(self):
print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \
% (self.initial_version, self.final_version)
self.check_initial_version()
self.set_version()
self._upgrade()
def _upgrade(self):
raise NotImplementedError
class Upgrade_1_0_to_1_1 (Upgrader):
initial_version = "Bugs Everywhere Tree 1 0"
final_version = "Bugs Everywhere Directory v1.1"
def _get_vcs_name(self):
path = self.get_path('settings')
settings = encoding.get_file_contents(path)
for line in settings.splitlines(False):
fields = line.split('=')
if len(fields) == 2 and fields[0] == 'rcs_name':
return fields[1]
return None
def _upgrade_mapfile(self, path):
contents = encoding.get_file_contents(path, decode=True)
old_format = False
for line in contents.splitlines():
if len(line.split('=')) == 2:
old_format = True
break
if old_format == True:
# translate to YAML.
newlines = []
for line in contents.splitlines():
line = line.rstrip('\n')
if len(line) == 0:
continue
fields = line.split("=")
if len(fields) == 2:
key,value = fields
newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
else:
newlines.append(line)
contents = '\n'.join(newlines)
# load the YAML and save
map = parse_yaml_mapfile(contents)
if type(map) == types.StringType:
raise ValueError((path, contents))
contents = generate_yaml_mapfile(map)
encoding.set_file_contents(path, contents)
self.vcs._vcs_update(path)
def _upgrade(self):
"""
Comment value field "From" -> "Author".
Homegrown mapfile -> YAML.
"""
path = self.get_path('settings')
self._upgrade_mapfile(path)
for bug_uuid in os.listdir(self.get_path('bugs')):
path = self.get_path('bugs', bug_uuid, 'values')
self._upgrade_mapfile(path)
c_path = ['bugs', bug_uuid, 'comments']
if not os.path.exists(self.get_path(*c_path)):
continue # no comments for this bug
for comment_uuid in os.listdir(self.get_path(*c_path)):
path_list = c_path + [comment_uuid, 'values']
path = self.get_path(*path_list)
self._upgrade_mapfile(path)
settings = mapfile.parse(
encoding.get_file_contents(path))
if 'From' in settings:
settings['Author'] = settings.pop('From')
encoding.set_file_contents(
path, generate_yaml_mapfile(settings))
self.vcs._vcs_update(path)
class Upgrade_1_1_to_1_2 (Upgrader):
initial_version = "Bugs Everywhere Directory v1.1"
final_version = "Bugs Everywhere Directory v1.2"
def _get_vcs_name(self):
path = self.get_path('settings')
settings = parse_yaml_mapfile(encoding.get_file_contents(path))
if 'rcs_name' in settings:
return settings['rcs_name']
return None
def _upgrade(self):
"""
BugDir settings field "rcs_name" -> "vcs_name".
"""
path = self.get_path('settings')
settings = parse_yaml_mapfile(encoding.get_file_contents(path))
if 'rcs_name' in settings:
settings['vcs_name'] = settings.pop('rcs_name')
encoding.set_file_contents(path, generate_yaml_mapfile(settings))
self.vcs._vcs_update(path)
class Upgrade_1_2_to_1_3 (Upgrader):
initial_version = "Bugs Everywhere Directory v1.2"
final_version = "Bugs Everywhere Directory v1.3"
def __init__(self, *args, **kwargs):
Upgrader.__init__(self, *args, **kwargs)
self._targets = {} # key: target text,value: new target bug
def _get_vcs_name(self):
path = self.get_path('settings')
settings = parse_yaml_mapfile(encoding.get_file_contents(path))
if 'vcs_name' in settings:
return settings['vcs_name']
return None
def _save_bug_settings(self, bug):
# The target bugs don't have comments
path = self.get_path('bugs', bug.uuid, 'values')
if not os.path.exists(path):
self.vcs._add_path(path, directory=False)
path = self.get_path('bugs', bug.uuid, 'values')
mf = generate_yaml_mapfile(bug._get_saved_settings())
encoding.set_file_contents(path, mf)
self.vcs._vcs_update(path)
def _target_bug(self, target_text):
if target_text not in self._targets:
bug = libbe.bug.Bug(summary=target_text)
bug.severity = 'target'
self._targets[target_text] = bug
return self._targets[target_text]
def _upgrade_bugdir_mapfile(self):
path = self.get_path('settings')
mf = encoding.get_file_contents(path)
if mf == libbe.util.InvalidObject:
return # settings file does not exist
settings = parse_yaml_mapfile(mf)
if 'target' in settings:
settings['target'] = self._target_bug(settings['target']).uuid
mf = generate_yaml_mapfile(settings)
encoding.set_file_contents(path, mf)
self.vcs._vcs_update(path)
def _upgrade_bug_mapfile(self, bug_uuid):
import libbe.command.depend as dep
path = self.get_path('bugs', bug_uuid, 'values')
mf = encoding.get_file_contents(path)
if mf == libbe.util.InvalidObject:
return # settings file does not exist
settings = parse_yaml_mapfile(mf)
if 'target' in settings:
target_bug = self._target_bug(settings['target'])
blocked_by_string = '%s%s' % (dep.BLOCKED_BY_TAG, bug_uuid)
dep._add_remove_extra_string(target_bug, blocked_by_string, add=True)
blocks_string = dep._generate_blocks_string(target_bug)
estrs = settings.get('extra_strings', [])
estrs.append(blocks_string)
settings['extra_strings'] = sorted(estrs)
settings.pop('target')
mf = generate_yaml_mapfile(settings)
encoding.set_file_contents(path, mf)
self.vcs._vcs_update(path)
def _upgrade(self):
"""
Bug value field "target" -> target bugs.
Bugdir value field "target" -> pointer to current target bug.
"""
for bug_uuid in os.listdir(self.get_path('bugs')):
self._upgrade_bug_mapfile(bug_uuid)
self._upgrade_bugdir_mapfile()
for bug in self._targets.values():
self._save_bug_settings(bug)
class Upgrade_1_3_to_1_4 (Upgrader):
initial_version = "Bugs Everywhere Directory v1.3"
final_version = "Bugs Everywhere Directory v1.4"
def _get_vcs_name(self):
path = self.get_path('settings')
settings = parse_yaml_mapfile(encoding.get_file_contents(path))
if 'vcs_name' in settings:
return settings['vcs_name']
return None
def _upgrade(self):
"""
add new directory "./be/BUGDIR-UUID"
"./be/bugs" -> "./be/BUGDIR-UUID/bugs"
"./be/settings" -> "./be/BUGDIR-UUID/settings"
"""
self.repo = os.path.abspath(self.repo)
basenames = [p for p in os.listdir(self.get_path())]
if not 'bugs' in basenames and not 'settings' in basenames \
and len([p for p in basenames if len(p)==36]) == 1:
return # the user has upgraded the directory.
basenames = [p for p in basenames if p in ['bugs','settings']]
uuid = libbe.util.id.uuid_gen()
add = [self.get_path(uuid)]
move = [(self.get_path(p), self.get_path(uuid, p)) for p in basenames]
msg = ['Upgrading BE directory version v1.3 to v1.4',
'',
"Because BE's VCS drivers don't support 'move',",
'please make the following changes with your VCS',
'and re-run BE. Note that you can choose a different',
'bugdir UUID to preserve uniformity across branches',
'of a distributed repository.'
'',
'add',
' ' + '\n '.join(add),
'move',
' ' + '\n '.join(['%s %s' % (a,b) for a,b in move]),
]
self.vcs._cached_path_id.destroy()
raise Exception('Need user assistance\n%s' % '\n'.join(msg))
class Upgrade_1_4_to_1_5 (Upgrader):
initial_version = "Bugs Everywhere Directory v1.4"
final_version = "Bugs Everywhere Directory v1.5"
def _get_vcs_name(self):
path = self.get_path('settings')
for p in os.listdir(self.get_path()): # check each bugdir's settings
path = os.path.join(self.get_path(), p)
if os.path.isdir(path):
settings_path = os.path.join(path, 'settings')
if os.path.isfile(settings_path):
settings = parse_yaml_mapfile(encoding.get_file_contents(
settings_path))
if 'vcs_name' in settings:
return settings['vcs_name'] # first entry we found
return None
def _upgrade(self):
"""
convert YAML settings to JSON (much faster parsing)
"./be/BUGDIR-UUID/settings"
"./be/BUGDIR-UUID/bugs/BUG-UUID/values"
"./be/BUGDIR-UUID/bugs/BUG-UUID/comments/COMMENT-UUID/values"
"""
self.repo = os.path.abspath(self.repo)
basenames = [p for p in os.listdir(self.get_path())]
for dirpath,dirnames,filenames in os.walk(self.get_path()):
for filename in filenames:
if filename in ['settings', 'values']:
self._upgrade_mapfile(os.path.join(dirpath, filename))
def _upgrade_mapfile(self, path):
contents = encoding.get_file_contents(path)
data = parse_yaml_mapfile(contents)
contents = mapfile.generate(data)
encoding.set_file_contents(path, contents)
self.vcs._vcs_update(path)
upgraders = [Upgrade_1_0_to_1_1,
Upgrade_1_1_to_1_2,
Upgrade_1_2_to_1_3,
Upgrade_1_3_to_1_4,
Upgrade_1_4_to_1_5]
upgrade_classes = {}
for upgrader in upgraders:
upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
def upgrade(path, current_version,
target_version=STORAGE_VERSION):
"""
Call the appropriate upgrade function to convert current_version
to target_version. If a direct conversion function does not exist,
use consecutive conversion functions.
"""
if current_version not in STORAGE_VERSIONS:
raise NotImplementedError, \
"Cannot handle version '%s' yet." % current_version
if target_version not in STORAGE_VERSIONS:
raise NotImplementedError, \
"Cannot handle version '%s' yet." % current_version
if (current_version, target_version) in upgrade_classes:
# direct conversion
upgrade_class = upgrade_classes[(current_version, target_version)]
u = upgrade_class(path)
u.upgrade()
else:
# consecutive single-step conversion
i = STORAGE_VERSIONS.index(current_version)
while True:
version_a = STORAGE_VERSIONS[i]
version_b = STORAGE_VERSIONS[i+1]
try:
upgrade_class = upgrade_classes[(version_a, version_b)]
except KeyError:
raise NotImplementedError, \
"Cannot convert version '%s' to '%s' yet." \
% (version_a, version_b)
u = upgrade_class(path)
u.upgrade()
if version_b == target_version:
break
i += 1