diff options
author | W. Trevor King <wking@drexel.edu> | 2008-11-21 14:56:05 -0500 |
---|---|---|
committer | W. Trevor King <wking@drexel.edu> | 2008-11-21 14:56:05 -0500 |
commit | 23179f50092d91dbeab97ad2b88cdaadb79b615f (patch) | |
tree | 4a5579d686c573d6d438214aa0d2100f01083bef /libbe/bugdir.py | |
parent | a2bdbab9ccd9ca24ce470d2beeea86afb7ede2ae (diff) | |
download | bugseverywhere-23179f50092d91dbeab97ad2b88cdaadb79b615f.tar.gz |
Another major rewrite. Now BugDir, Bug, and Comment are more distinct.
I pushed a lot of the little helper functions into the main classes,
which makes it easier for me to keep track of what's going on. I'm
now at the point where I can run through `python test.py` with each of
the backends (by changing the search order in rcs.py
_get_matching_rcs) without any unexpected errors for each backend
(except Arch). I can also run `test_usage.sh` without non-Arch errors
either.
However, don't consider this a stable commit yet. The bzr backend is
*really*slow*, and the other's aren't blazingly fast either. I think
I'm rewriting the entire database every time I save it :p. Still, it
passes the checks. and I don't like it when zounds of changes build up.
Diffstat (limited to 'libbe/bugdir.py')
-rw-r--r-- | libbe/bugdir.py | 356 |
1 files changed, 222 insertions, 134 deletions
diff --git a/libbe/bugdir.py b/libbe/bugdir.py index 41f0fec..6152e3f 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -16,16 +16,17 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os import os.path -import cmdutil import errno +import time import unittest import doctest -import names + +from beuuid import uuid_gen import mapfile -import time +import bug +import cmdutil import utility -from rcs import rcs_by_name, installed_rcs -from bug import Bug +from rcs import rcs_by_name, detect_rcs, installed_rcs, PathNotInRoot class NoBugDir(Exception): def __init__(self, path): @@ -33,48 +34,6 @@ class NoBugDir(Exception): Exception.__init__(self, msg) self.path = path - -def iter_parent_dirs(cur_dir): - cur_dir = os.path.realpath(cur_dir) - old_dir = None - while True: - yield cur_dir - old_dir = cur_dir - cur_dir = os.path.normpath(os.path.join(cur_dir, '..')) - if old_dir == cur_dir: - break; - - -def tree_root(dir, old_version=False): - for rootdir in iter_parent_dirs(dir): - versionfile=os.path.join(rootdir, ".be", "version") - if os.path.exists(versionfile): - if not old_version: - test_version(versionfile) - return BugDir(os.path.join(rootdir, ".be")) - elif not os.path.exists(rootdir): - raise NoRootEntry(rootdir) - old_rootdir = rootdir - rootdir=os.path.join('..', rootdir) - - raise NoBugDir(dir) - -class BadTreeVersion(Exception): - def __init__(self, version): - Exception.__init__(self, "Unsupported tree version: %s" % version) - self.version = version - -def test_version(path): - tree_version = file(path, "rb").read() - if tree_version != TREE_VERSION_STRING: - raise BadTreeVersion(tree_version) - -def set_version(path, rcs): - rcs.set_file_contents(os.path.join(path, "version"), TREE_VERSION_STRING) - - -TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" - class NoRootEntry(Exception): def __init__(self, path): self.path = path @@ -86,32 +45,15 @@ class AlreadyInitialized(Exception): Exception.__init__(self, "Specified root is already initialized: %s" % path) -def bugdir_root(versioning_root): - return os.path.join(versioning_root, ".be") +class InvalidValue(ValueError): + def __init__(self, name, value): + msg = "Cannot assign value %s to %s" % (value, name) + Exception.__init__(self, msg) + self.name = name + self.value = value + -def create_bug_dir(path, rcs): - """ - >>> import tests - >>> rcs = rcs_by_name("None") - >>> create_bug_dir('/highly-unlikely-to-exist', rcs) - Traceback (most recent call last): - NoRootEntry: Specified root does not exist: /highly-unlikely-to-exist - """ - root = os.path.join(path, ".be") - try: - rcs.mkdir(root) - except OSError, e: - if e.errno == errno.ENOENT: - raise NoRootEntry(path) - elif e.errno == errno.EEXIST: - raise AlreadyInitialized(path) - else: - raise - rcs.mkdir(os.path.join(root, "bugs")) - set_version(root, rcs) - mapfile.map_save(rcs, - os.path.join(root, "settings"), {"rcs_name": rcs.name}) - return BugDir(bugdir_root(path)) +TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" def setting_property(name, valid=None): @@ -130,83 +72,232 @@ def setting_property(name, valid=None): del self.settings[name] else: self.settings[name] = value - self.save_settings() + self.save() return property(getter, setter) -class BugDir: - def __init__(self, dir): - self.dir = dir - self.bugs_path = os.path.join(self.dir, "bugs") +class BugDir (list): + def __init__(self, root=None, sink_to_existing_root=True, + assert_new_BugDir=False, allow_rcs_init=False, + loadNow=False, rcs=None): + list.__init__(self) + if root == None: + root = os.getcwd() + if sink_to_existing_root == True: + self.root = self.find_root(root) + else: + if not os.path.exists(root): + raise NoRootEntry(root) + self.root = root + if loadNow == True: + self.load() + else: + if assert_new_BugDir: + if os.path.exists(self.get_path()): + raise AlreadyInitialized, self.get_path() + if rcs == None: + rcs = self.guess_rcs(allow_rcs_init) + self.settings = {"rcs_name": self.rcs_name} + self.rcs_name = rcs.name + + def find_root(self, path): + """ + Search for an existing bug database dir and it's ancestors and + return a BugDir rooted there. + """ + if not os.path.exists(path): + raise NoRootEntry(path) + versionfile = utility.search_parent_directories(path, os.path.join(".be", "version")) + if versionfile != None: + beroot = os.path.dirname(versionfile) + root = os.path.dirname(beroot) + return root + else: + beroot = utility.search_parent_directories(path, ".be") + if beroot == None: + raise NoBugDir(path) + return beroot + + def get_version(self, path=None): + if path == None: + path = self.get_path("version") try: - self.settings = mapfile.map_load(os.path.join(self.dir,"settings")) - except mapfile.NoSuchFile: - self.settings = {"rcs_name": "None"} + tree_version = self.rcs.get_file_contents(path) + except AttributeError, e: + # haven't initialized rcs yet + tree_version = file(path, "rb").read().decode("utf-8") + return tree_version + + def set_version(self): + self.rcs.set_file_contents(self.get_path("version"), TREE_VERSION_STRING) rcs_name = setting_property("rcs_name", ("None", "bzr", "git", "Arch", "hg")) - _rcs = None - target = setting_property("target") - - def save_settings(self): - mapfile.map_save(self.rcs, - os.path.join(self.dir, "settings"), self.settings) + _rcs = None def _get_rcs(self): if self._rcs is not None: if self.rcs_name == self._rcs.name: return self._rcs self._rcs = rcs_by_name(self.rcs_name) - self._rcs.root(self.dir) + self._rcs.root(self.root) return self._rcs rcs = property(_get_rcs) + target = setting_property("target") + + def get_path(self, *args): + my_dir = os.path.join(self.root, ".be") + if len(args) == 0: + return my_dir + assert args[0] in ["version", "settings", "bugs"], str(args) + return os.path.join(my_dir, *args) + + def guess_rcs(self, allow_rcs_init=False): + deepdir = self.get_path() + if not os.path.exists(deepdir): + deepdir = os.path.dirname(deepdir) + rcs = detect_rcs(deepdir) + if rcs.name == "None": + if allow_rcs_init == True: + rcs = installed_rcs() + rcs.init(self.root) + self.settings = {"rcs_name": rcs.name} + self.rcs_name = rcs.name + return rcs + + def load(self): + version = self.get_version() + if version != TREE_VERSION_STRING: + raise NotImplementedError, "BugDir cannot handle version '%s' yet." % version + else: + if not os.path.exists(self.get_path()): + raise NoBugDir(self.get_path()) + self.settings = self._get_settings(self.get_path("settings")) + self._clear_bugs() + for uuid in self.list_uuids(): + self._load_bug(uuid) + + self._bug_map_gen() + + def save(self): + self.rcs.mkdir(self.get_path()) + self.set_version() + self._save_settings(self.get_path("settings"), self.settings) + self.rcs.mkdir(self.get_path("bugs")) + for bug in self: + bug.save() + + def _get_settings(self, settings_path): + try: + settings = mapfile.map_load(settings_path) + except mapfile.NoSuchFile: + settings = {"rcs_name": "None"} + return settings + + def _save_settings(self, settings_path, settings): + try: + mapfile.map_save(self.rcs, settings_path, settings) + except PathNotInRoot, e: + # Handling duplicate bugdir settings, special case + none_rcs = rcs_by_name("None") + none_rcs.root(settings_path) + mapfile.map_save(none_rcs, settings_path, settings) + def duplicate_bugdir(self, revision): - return BugDir(bugdir_root(self.rcs.duplicate_repo(revision))) + duplicate_path = self.rcs.duplicate_repo(revision) - def remove_duplicate_bugdir(self): - self.rcs.remove_duplicate_repo() + # setup revision RCS as None, since the duplicate may not be versioned + duplicate_settings_path = os.path.join(duplicate_path, ".be", "settings") + duplicate_settings = self._get_settings(duplicate_settings_path) + if "rcs_name" in duplicate_settings: + duplicate_settings["rcs_name"] = "None" + self._save_settings(duplicate_settings_path, duplicate_settings) - def list(self): - for uuid in self.list_uuids(): - yield self.get_bug(uuid) + return BugDir(duplicate_path, loadNow=True) - def bug_map(self): - bugs = {} - for bug in self.list(): - bugs[bug.uuid] = bug - return bugs + def remove_duplicate_bugdir(self): + self.rcs.remove_duplicate_repo() - def get_bug(self, uuid): - return Bug(self.bugs_path, uuid, self.rcs, self) + def _bug_map_gen(self): + map = {} + for bug in self: + map[bug.uuid] = bug + self.bug_map = map def list_uuids(self): - for uuid in os.listdir(self.bugs_path): + for uuid in os.listdir(self.get_path("bugs")): if (uuid.startswith('.')): continue yield uuid - def new_bug(self, uuid=None): - if uuid is None: - uuid = names.uuid() - path = os.path.join(self.bugs_path, uuid) - self.rcs.mkdir(path) - bug = Bug(self.bugs_path, None, self.rcs, self) - bug.uuid = uuid - bug.creator = self.rcs.get_user_id() - bug.severity = "minor" - bug.status = "open" - bug.time = time.time() - return bug + def _clear_bugs(self): + while len(self) > 0: + self.pop() + + def _load_bug(self, uuid): + bg = bug.Bug(bugdir=self, uuid=uuid, loadNow=True) + self.append(bg) + self._bug_map_gen() + return bg + + def new_bug(self, uuid=None, summary=None): + bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary) + self.append(bg) + self._bug_map_gen() + return bg + + def remove_bug(self, bug): + self.remove(bug) + bug.remove() + + def bug_shortname(self, bug): + """ + Generate short names from uuids. Picks the minimum number of + characters (>=3) from the beginning of the uuid such that the + short names are unique. + + Obviously, as the number of bugs in the database grows, these + short names will cease to be unique. The complete uuid should be + used for long term reference. + """ + chars = 3 + for uuid in self.bug_map.keys(): + if bug.uuid == uuid: + continue + while (bug.uuid[:chars] == uuid[:chars]): + chars+=1 + return bug.uuid[:chars] + + def bug_from_shortname(self, shortname): + """ + >>> bd = simple_bug_dir() + >>> bug_a = bd.bug_from_shortname('a') + >>> print type(bug_a) + <class 'libbe.bug.Bug'> + >>> print bug_a + a:om: Bug A + """ + matches = [] + for bug in self: + if bug.uuid.startswith(shortname): + matches.append(bug) + if len(matches) > 1: + raise cmdutil.UserError("More than one bug matches %s. Please be more" + " specific." % shortname) + if len(matches) == 1: + return matches[0] + raise KeyError("No bug matches %s" % shortname) + + def bug_from_uuid(self, uuid): + if uuid not in self.bug_map: + self._bug_map_gen() + if uuid not in self.bug_map: + raise KeyError("No bug matches %s" % uuid +str(self.bug_map)+str(self)) + return self.bug_map[uuid] -class InvalidValue(ValueError): - def __init__(self, name, value): - msg = "Cannot assign value %s to %s" % (value, name) - Exception.__init__(self, msg) - self.name = name - self.value = value def simple_bug_dir(): """ @@ -218,18 +309,17 @@ def simple_bug_dir(): ['a', 'b'] """ dir = utility.Dir() - rcs = installed_rcs() - rcs.init(dir.path) assert os.path.exists(dir.path) - bugdir = create_bug_dir(dir.path, rcs) + bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True) bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir. - bug_a = bugdir.new_bug("a") - bug_a.summary = "Bug A" - bug_a.save() - bug_b = bugdir.new_bug("b") + bug_a = bugdir.new_bug("a", summary="Bug A") + bug_a.creator = "John Doe <jdoe@example.com>" + bug_a.time = 0 + bug_b = bugdir.new_bug("b", summary="Bug B") + bug_b.creator = "Jane Doe <jdoe@example.com>" + bug_b.time = 0 bug_b.status = "closed" - bug_b.summary = "Bug B" - bug_b.save() + bugdir.save() return bugdir @@ -238,9 +328,8 @@ class BugDirTestCase(unittest.TestCase): unittest.TestCase.__init__(self, *args, **kwargs) def setUp(self): self.dir = utility.Dir() - self.rcs = installed_rcs() - self.rcs.init(self.dir.path) - self.bugdir = create_bug_dir(self.dir.path, self.rcs) + self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, allow_rcs_init=True) + self.rcs = self.bugdir.rcs def tearDown(self): del(self.rcs) del(self.dir) @@ -250,9 +339,8 @@ class BugDirTestCase(unittest.TestCase): fullpath = self.fullPath(path) self.failUnless(os.path.exists(fullpath)==True, "path %s does not exist" % fullpath) - def testBugDirDuplicate(self): - self.assertRaises(AlreadyInitialized, create_bug_dir, - self.dir.path, self.rcs) + self.assertRaises(AlreadyInitialized, BugDir, + self.dir.path, assertNewBugDir=True) unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |