aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/bugdir.py
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2008-11-21 14:56:05 -0500
committerW. Trevor King <wking@drexel.edu>2008-11-21 14:56:05 -0500
commit23179f50092d91dbeab97ad2b88cdaadb79b615f (patch)
tree4a5579d686c573d6d438214aa0d2100f01083bef /libbe/bugdir.py
parenta2bdbab9ccd9ca24ce470d2beeea86afb7ede2ae (diff)
downloadbugseverywhere-23179f50092d91dbeab97ad2b88cdaadb79b615f.tar.gz
Another major rewrite. Now BugDir, Bug, and Comment are more distinct.
I pushed a lot of the little helper functions into the main classes, which makes it easier for me to keep track of what's going on. I'm now at the point where I can run through `python test.py` with each of the backends (by changing the search order in rcs.py _get_matching_rcs) without any unexpected errors for each backend (except Arch). I can also run `test_usage.sh` without non-Arch errors either. However, don't consider this a stable commit yet. The bzr backend is *really*slow*, and the other's aren't blazingly fast either. I think I'm rewriting the entire database every time I save it :p. Still, it passes the checks. and I don't like it when zounds of changes build up.
Diffstat (limited to 'libbe/bugdir.py')
-rw-r--r--libbe/bugdir.py356
1 files changed, 222 insertions, 134 deletions
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 41f0fec..6152e3f 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -16,16 +16,17 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
import os.path
-import cmdutil
import errno
+import time
import unittest
import doctest
-import names
+
+from beuuid import uuid_gen
import mapfile
-import time
+import bug
+import cmdutil
import utility
-from rcs import rcs_by_name, installed_rcs
-from bug import Bug
+from rcs import rcs_by_name, detect_rcs, installed_rcs, PathNotInRoot
class NoBugDir(Exception):
def __init__(self, path):
@@ -33,48 +34,6 @@ class NoBugDir(Exception):
Exception.__init__(self, msg)
self.path = path
-
-def iter_parent_dirs(cur_dir):
- cur_dir = os.path.realpath(cur_dir)
- old_dir = None
- while True:
- yield cur_dir
- old_dir = cur_dir
- cur_dir = os.path.normpath(os.path.join(cur_dir, '..'))
- if old_dir == cur_dir:
- break;
-
-
-def tree_root(dir, old_version=False):
- for rootdir in iter_parent_dirs(dir):
- versionfile=os.path.join(rootdir, ".be", "version")
- if os.path.exists(versionfile):
- if not old_version:
- test_version(versionfile)
- return BugDir(os.path.join(rootdir, ".be"))
- elif not os.path.exists(rootdir):
- raise NoRootEntry(rootdir)
- old_rootdir = rootdir
- rootdir=os.path.join('..', rootdir)
-
- raise NoBugDir(dir)
-
-class BadTreeVersion(Exception):
- def __init__(self, version):
- Exception.__init__(self, "Unsupported tree version: %s" % version)
- self.version = version
-
-def test_version(path):
- tree_version = file(path, "rb").read()
- if tree_version != TREE_VERSION_STRING:
- raise BadTreeVersion(tree_version)
-
-def set_version(path, rcs):
- rcs.set_file_contents(os.path.join(path, "version"), TREE_VERSION_STRING)
-
-
-TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
-
class NoRootEntry(Exception):
def __init__(self, path):
self.path = path
@@ -86,32 +45,15 @@ class AlreadyInitialized(Exception):
Exception.__init__(self,
"Specified root is already initialized: %s" % path)
-def bugdir_root(versioning_root):
- return os.path.join(versioning_root, ".be")
+class InvalidValue(ValueError):
+ def __init__(self, name, value):
+ msg = "Cannot assign value %s to %s" % (value, name)
+ Exception.__init__(self, msg)
+ self.name = name
+ self.value = value
+
-def create_bug_dir(path, rcs):
- """
- >>> import tests
- >>> rcs = rcs_by_name("None")
- >>> create_bug_dir('/highly-unlikely-to-exist', rcs)
- Traceback (most recent call last):
- NoRootEntry: Specified root does not exist: /highly-unlikely-to-exist
- """
- root = os.path.join(path, ".be")
- try:
- rcs.mkdir(root)
- except OSError, e:
- if e.errno == errno.ENOENT:
- raise NoRootEntry(path)
- elif e.errno == errno.EEXIST:
- raise AlreadyInitialized(path)
- else:
- raise
- rcs.mkdir(os.path.join(root, "bugs"))
- set_version(root, rcs)
- mapfile.map_save(rcs,
- os.path.join(root, "settings"), {"rcs_name": rcs.name})
- return BugDir(bugdir_root(path))
+TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
def setting_property(name, valid=None):
@@ -130,83 +72,232 @@ def setting_property(name, valid=None):
del self.settings[name]
else:
self.settings[name] = value
- self.save_settings()
+ self.save()
return property(getter, setter)
-class BugDir:
- def __init__(self, dir):
- self.dir = dir
- self.bugs_path = os.path.join(self.dir, "bugs")
+class BugDir (list):
+ def __init__(self, root=None, sink_to_existing_root=True,
+ assert_new_BugDir=False, allow_rcs_init=False,
+ loadNow=False, rcs=None):
+ list.__init__(self)
+ if root == None:
+ root = os.getcwd()
+ if sink_to_existing_root == True:
+ self.root = self.find_root(root)
+ else:
+ if not os.path.exists(root):
+ raise NoRootEntry(root)
+ self.root = root
+ if loadNow == True:
+ self.load()
+ else:
+ if assert_new_BugDir:
+ if os.path.exists(self.get_path()):
+ raise AlreadyInitialized, self.get_path()
+ if rcs == None:
+ rcs = self.guess_rcs(allow_rcs_init)
+ self.settings = {"rcs_name": self.rcs_name}
+ self.rcs_name = rcs.name
+
+ def find_root(self, path):
+ """
+ Search for an existing bug database dir and it's ancestors and
+ return a BugDir rooted there.
+ """
+ if not os.path.exists(path):
+ raise NoRootEntry(path)
+ versionfile = utility.search_parent_directories(path, os.path.join(".be", "version"))
+ if versionfile != None:
+ beroot = os.path.dirname(versionfile)
+ root = os.path.dirname(beroot)
+ return root
+ else:
+ beroot = utility.search_parent_directories(path, ".be")
+ if beroot == None:
+ raise NoBugDir(path)
+ return beroot
+
+ def get_version(self, path=None):
+ if path == None:
+ path = self.get_path("version")
try:
- self.settings = mapfile.map_load(os.path.join(self.dir,"settings"))
- except mapfile.NoSuchFile:
- self.settings = {"rcs_name": "None"}
+ tree_version = self.rcs.get_file_contents(path)
+ except AttributeError, e:
+ # haven't initialized rcs yet
+ tree_version = file(path, "rb").read().decode("utf-8")
+ return tree_version
+
+ def set_version(self):
+ self.rcs.set_file_contents(self.get_path("version"), TREE_VERSION_STRING)
rcs_name = setting_property("rcs_name",
("None", "bzr", "git", "Arch", "hg"))
- _rcs = None
- target = setting_property("target")
-
- def save_settings(self):
- mapfile.map_save(self.rcs,
- os.path.join(self.dir, "settings"), self.settings)
+ _rcs = None
def _get_rcs(self):
if self._rcs is not None:
if self.rcs_name == self._rcs.name:
return self._rcs
self._rcs = rcs_by_name(self.rcs_name)
- self._rcs.root(self.dir)
+ self._rcs.root(self.root)
return self._rcs
rcs = property(_get_rcs)
+ target = setting_property("target")
+
+ def get_path(self, *args):
+ my_dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return my_dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(my_dir, *args)
+
+ def guess_rcs(self, allow_rcs_init=False):
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ rcs = detect_rcs(deepdir)
+ if rcs.name == "None":
+ if allow_rcs_init == True:
+ rcs = installed_rcs()
+ rcs.init(self.root)
+ self.settings = {"rcs_name": rcs.name}
+ self.rcs_name = rcs.name
+ return rcs
+
+ def load(self):
+ version = self.get_version()
+ if version != TREE_VERSION_STRING:
+ raise NotImplementedError, "BugDir cannot handle version '%s' yet." % version
+ else:
+ if not os.path.exists(self.get_path()):
+ raise NoBugDir(self.get_path())
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._clear_bugs()
+ for uuid in self.list_uuids():
+ self._load_bug(uuid)
+
+ self._bug_map_gen()
+
+ def save(self):
+ self.rcs.mkdir(self.get_path())
+ self.set_version()
+ self._save_settings(self.get_path("settings"), self.settings)
+ self.rcs.mkdir(self.get_path("bugs"))
+ for bug in self:
+ bug.save()
+
+ def _get_settings(self, settings_path):
+ try:
+ settings = mapfile.map_load(settings_path)
+ except mapfile.NoSuchFile:
+ settings = {"rcs_name": "None"}
+ return settings
+
+ def _save_settings(self, settings_path, settings):
+ try:
+ mapfile.map_save(self.rcs, settings_path, settings)
+ except PathNotInRoot, e:
+ # Handling duplicate bugdir settings, special case
+ none_rcs = rcs_by_name("None")
+ none_rcs.root(settings_path)
+ mapfile.map_save(none_rcs, settings_path, settings)
+
def duplicate_bugdir(self, revision):
- return BugDir(bugdir_root(self.rcs.duplicate_repo(revision)))
+ duplicate_path = self.rcs.duplicate_repo(revision)
- def remove_duplicate_bugdir(self):
- self.rcs.remove_duplicate_repo()
+ # setup revision RCS as None, since the duplicate may not be versioned
+ duplicate_settings_path = os.path.join(duplicate_path, ".be", "settings")
+ duplicate_settings = self._get_settings(duplicate_settings_path)
+ if "rcs_name" in duplicate_settings:
+ duplicate_settings["rcs_name"] = "None"
+ self._save_settings(duplicate_settings_path, duplicate_settings)
- def list(self):
- for uuid in self.list_uuids():
- yield self.get_bug(uuid)
+ return BugDir(duplicate_path, loadNow=True)
- def bug_map(self):
- bugs = {}
- for bug in self.list():
- bugs[bug.uuid] = bug
- return bugs
+ def remove_duplicate_bugdir(self):
+ self.rcs.remove_duplicate_repo()
- def get_bug(self, uuid):
- return Bug(self.bugs_path, uuid, self.rcs, self)
+ def _bug_map_gen(self):
+ map = {}
+ for bug in self:
+ map[bug.uuid] = bug
+ self.bug_map = map
def list_uuids(self):
- for uuid in os.listdir(self.bugs_path):
+ for uuid in os.listdir(self.get_path("bugs")):
if (uuid.startswith('.')):
continue
yield uuid
- def new_bug(self, uuid=None):
- if uuid is None:
- uuid = names.uuid()
- path = os.path.join(self.bugs_path, uuid)
- self.rcs.mkdir(path)
- bug = Bug(self.bugs_path, None, self.rcs, self)
- bug.uuid = uuid
- bug.creator = self.rcs.get_user_id()
- bug.severity = "minor"
- bug.status = "open"
- bug.time = time.time()
- return bug
+ def _clear_bugs(self):
+ while len(self) > 0:
+ self.pop()
+
+ def _load_bug(self, uuid):
+ bg = bug.Bug(bugdir=self, uuid=uuid, loadNow=True)
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+
+ def new_bug(self, uuid=None, summary=None):
+ bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+
+ def remove_bug(self, bug):
+ self.remove(bug)
+ bug.remove()
+
+ def bug_shortname(self, bug):
+ """
+ Generate short names from uuids. Picks the minimum number of
+ characters (>=3) from the beginning of the uuid such that the
+ short names are unique.
+
+ Obviously, as the number of bugs in the database grows, these
+ short names will cease to be unique. The complete uuid should be
+ used for long term reference.
+ """
+ chars = 3
+ for uuid in self.bug_map.keys():
+ if bug.uuid == uuid:
+ continue
+ while (bug.uuid[:chars] == uuid[:chars]):
+ chars+=1
+ return bug.uuid[:chars]
+
+ def bug_from_shortname(self, shortname):
+ """
+ >>> bd = simple_bug_dir()
+ >>> bug_a = bd.bug_from_shortname('a')
+ >>> print type(bug_a)
+ <class 'libbe.bug.Bug'>
+ >>> print bug_a
+ a:om: Bug A
+ """
+ matches = []
+ for bug in self:
+ if bug.uuid.startswith(shortname):
+ matches.append(bug)
+ if len(matches) > 1:
+ raise cmdutil.UserError("More than one bug matches %s. Please be more"
+ " specific." % shortname)
+ if len(matches) == 1:
+ return matches[0]
+ raise KeyError("No bug matches %s" % shortname)
+
+ def bug_from_uuid(self, uuid):
+ if uuid not in self.bug_map:
+ self._bug_map_gen()
+ if uuid not in self.bug_map:
+ raise KeyError("No bug matches %s" % uuid +str(self.bug_map)+str(self))
+ return self.bug_map[uuid]
-class InvalidValue(ValueError):
- def __init__(self, name, value):
- msg = "Cannot assign value %s to %s" % (value, name)
- Exception.__init__(self, msg)
- self.name = name
- self.value = value
def simple_bug_dir():
"""
@@ -218,18 +309,17 @@ def simple_bug_dir():
['a', 'b']
"""
dir = utility.Dir()
- rcs = installed_rcs()
- rcs.init(dir.path)
assert os.path.exists(dir.path)
- bugdir = create_bug_dir(dir.path, rcs)
+ bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True)
bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
- bug_a = bugdir.new_bug("a")
- bug_a.summary = "Bug A"
- bug_a.save()
- bug_b = bugdir.new_bug("b")
+ bug_a = bugdir.new_bug("a", summary="Bug A")
+ bug_a.creator = "John Doe <jdoe@example.com>"
+ bug_a.time = 0
+ bug_b = bugdir.new_bug("b", summary="Bug B")
+ bug_b.creator = "Jane Doe <jdoe@example.com>"
+ bug_b.time = 0
bug_b.status = "closed"
- bug_b.summary = "Bug B"
- bug_b.save()
+ bugdir.save()
return bugdir
@@ -238,9 +328,8 @@ class BugDirTestCase(unittest.TestCase):
unittest.TestCase.__init__(self, *args, **kwargs)
def setUp(self):
self.dir = utility.Dir()
- self.rcs = installed_rcs()
- self.rcs.init(self.dir.path)
- self.bugdir = create_bug_dir(self.dir.path, self.rcs)
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, allow_rcs_init=True)
+ self.rcs = self.bugdir.rcs
def tearDown(self):
del(self.rcs)
del(self.dir)
@@ -250,9 +339,8 @@ class BugDirTestCase(unittest.TestCase):
fullpath = self.fullPath(path)
self.failUnless(os.path.exists(fullpath)==True,
"path %s does not exist" % fullpath)
- def testBugDirDuplicate(self):
- self.assertRaises(AlreadyInitialized, create_bug_dir,
- self.dir.path, self.rcs)
+ self.assertRaises(AlreadyInitialized, BugDir,
+ self.dir.path, assertNewBugDir=True)
unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])