aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/bugdir.py
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/bugdir.py')
-rw-r--r--libbe/bugdir.py375
1 files changed, 253 insertions, 122 deletions
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 7e4cf3e..7885224 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -22,10 +22,16 @@ import copy
import unittest
import doctest
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+import settings_object
import mapfile
import bug
-import utility
import rcs
+import encoding
+import utility
class NoBugDir(Exception):
@@ -64,28 +70,29 @@ class MultipleBugMatches(ValueError):
TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
-def setting_property(name, valid=None, doc=None):
- def getter(self):
- value = self.settings.get(name)
- if valid is not None:
- if value not in valid and value != None:
- raise InvalidValue(name, value)
- return value
+class BugDir (list, settings_object.SavedSettingsObject):
+ """
+ Sink to existing root
+ ======================
- def setter(self, value):
- if value != getter(self):
- if value is None:
- del self.settings[name]
- else:
- self.settings[name] = value
- self._save_settings(self.get_path("settings"), self.settings)
+ Consider the following usage case:
+ You have a bug directory rooted in
+ /path/to/source
+ by which I mean the '.be' directory is at
+ /path/to/source/.be
+ However, you're of in some subdirectory like
+ /path/to/source/GUI/testing
+ and you want to comment on a bug. Setting sink_to_root=True wen
+ you initialize your BugDir will cause it to search for the '.be'
+ file in the ancestors of the path you passed in as 'root'.
+ /path/to/source/GUI/testing/.be miss
+ /path/to/source/GUI/.be miss
+ /path/to/source/.be hit!
+ So it still roots itself appropriately without much work for you.
+
+ File-system access
+ ==================
- return property(getter, setter, doc=doc)
-
-
-class BugDir (list):
- """
- File-system access:
When rooted in non-bugdir directory, BugDirs live completely in
memory until the first call to .save(). This creates a '.be'
sub-directory containing configurations options, bugs, comments,
@@ -95,13 +102,166 @@ class BugDir (list):
will only load information from the file system when it loads new
bugs/comments that it doesn't already have in memory, or when it
explicitly asked to do so (e.g. .load() or __init__(from_disk=True)).
+
+ Allow RCS initialization
+ ========================
+
+ This one is for testing purposes. Setting it to True allows the
+ BugDir to search for an installed RCS backend and initialize it in
+ the root directory. This is a convenience option for supporting
+ tests of versioning functionality (e.g. .duplicate_bugdir).
+
+ Disable encoding manipulation
+ =============================
+
+ This one is for testing purposed. You might have non-ASCII
+ Unicode in your bugs, comments, files, etc. BugDir instances try
+ and support your preferred encoding scheme (e.g. "utf-8") when
+ dealing with stream and file input/output. For stream output,
+ this involves replacing sys.stdout and sys.stderr
+ (libbe.encode.set_IO_stream_encodings). However this messes up
+ doctest's output catching. In order to support doctest tests
+ using BugDirs, set manipulate_encodings=False, and stick to ASCII
+ in your tests.
"""
+
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+
+ @_versioned_property(name="target",
+ doc="The current project development target")
+ def target(): return {}
+
+ def _guess_encoding(self):
+ return encoding.get_encoding()
+ def _check_encoding(value):
+ if value != None and value != settings_object.EMPTY:
+ return encoding.known_encoding(value)
+ def _setup_encoding(self, new_encoding):
+ if new_encoding != None and new_encoding != settings_object.EMPTY:
+ if self._manipulate_encodings == True:
+ encoding.set_IO_stream_encodings(new_encoding)
+ def _set_encoding(self, old_encoding, new_encoding):
+ self._setup_encoding(new_encoding)
+ self._prop_save_settings(old_encoding, new_encoding)
+
+ @_versioned_property(name="encoding",
+ doc="""The default input/output encoding to use (e.g. "utf-8").""",
+ change_hook=_set_encoding,
+ generator=_guess_encoding,
+ check_fn=_check_encoding)
+ def encoding(): return {}
+
+ def _setup_user_id(self, user_id):
+ self.rcs.user_id = user_id
+ def _guess_user_id(self):
+ return self.rcs.get_user_id()
+ def _set_user_id(self, old_user_id, new_user_id):
+ self._setup_user_id(new_user_id)
+ self._prop_save_settings(old_user_id, new_user_id)
+
+ @_versioned_property(name="user_id",
+ doc=
+"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
+that the Arch RCS backend *enforces* ids with this format.""",
+ change_hook=_set_user_id,
+ generator=_guess_user_id)
+ def user_id(): return {}
+
+ @_versioned_property(name="default_assignee",
+ doc=
+"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
+ def default_assignee(): return {}
+
+ @_versioned_property(name="rcs_name",
+ doc="""The name of the current RCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .rcs instead, and
+.rcs_name will be automatically adjusted.""",
+ default="None",
+ allowed=["None", "Arch", "bzr", "git", "hg"])
+ def rcs_name(): return {}
+
+ def _get_rcs(self, rcs_name=None):
+ """Get and root a new revision control system"""
+ if rcs_name == None:
+ rcs_name = self.rcs_name
+ new_rcs = rcs.rcs_by_name(rcs_name)
+ self._change_rcs(None, new_rcs)
+ return new_rcs
+ def _change_rcs(self, old_rcs, new_rcs):
+ new_rcs.encoding = self.encoding
+ new_rcs.root(self.root)
+ self.rcs_name = new_rcs.name
+
+ @Property
+ @change_hook_property(hook=_change_rcs)
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
+
+ def _bug_map_gen(self):
+ map = {}
+ for bug in self:
+ map[bug.uuid] = bug
+ for uuid in self.list_uuids():
+ if uuid not in map:
+ map[uuid] = None
+ self._bug_map_value = map # ._bug_map_value used by @local_property
+
+ @Property
+ @primed_property(primer=_bug_map_gen)
+ @local_property("bug_map")
+ @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
+ def _bug_map(): return {}
+
+ def _setup_severities(self, severities):
+ if severities != None and severities != settings_object.EMPTY:
+ bug.load_severities(severities)
+ def _set_severities(self, old_severities, new_severities):
+ self._setup_severities(new_severities)
+ self._prop_save_settings(old_severities, new_severities)
+ @_versioned_property(name="severities",
+ doc="The allowed bug severities and their descriptions.",
+ change_hook=_set_severities)
+ def severities(): return {}
+
+ def _setup_status(self, active_status, inactive_status):
+ bug.load_status(active_status, inactive_status)
+ def _set_active_status(self, old_active_status, new_active_status):
+ self._setup_status(new_active_status, self.inactive_status)
+ self._prop_save_settings(old_active_status, new_active_status)
+ @_versioned_property(name="active_status",
+ doc="The allowed active bug states and their descriptions.",
+ change_hook=_set_active_status)
+ def active_status(): return {}
+
+ def _set_inactive_status(self, old_inactive_status, new_inactive_status):
+ self._setup_status(self.active_status, new_inactive_status)
+ self._prop_save_settings(old_inactive_status, new_inactive_status)
+ @_versioned_property(name="inactive_status",
+ doc="The allowed inactive bug states and their descriptions.",
+ change_hook=_set_inactive_status)
+ def inactive_status(): return {}
+
+
def __init__(self, root=None, sink_to_existing_root=True,
assert_new_BugDir=False, allow_rcs_init=False,
+ manipulate_encodings=True,
from_disk=False, rcs=None):
list.__init__(self)
- self._save_user_id = False
- self.settings = {}
+ settings_object.SavedSettingsObject.__init__(self)
+ self._manipulate_encodings = manipulate_encodings
if root == None:
root = os.getcwd()
if sink_to_existing_root == True:
@@ -110,16 +270,22 @@ class BugDir (list):
if not os.path.exists(root):
raise NoRootEntry(root)
self.root = root
+ # get a temporary rcs until we've loaded settings
+ self.sync_with_disk = False
+ self.rcs = self._guess_rcs()
+
if from_disk == True:
+ self.sync_with_disk = True
self.load()
else:
+ self.sync_with_disk = False
if assert_new_BugDir == True:
if os.path.exists(self.get_path()):
raise AlreadyInitialized, self.get_path()
if rcs == None:
rcs = self._guess_rcs(allow_rcs_init)
self.rcs = rcs
- user_id = self.rcs.get_user_id()
+ self._setup_user_id(self.user_id)
def _find_root(self, path):
"""
@@ -128,7 +294,8 @@ class BugDir (list):
"""
if not os.path.exists(path):
raise NoRootEntry(path)
- versionfile = utility.search_parent_directories(path, os.path.join(".be", "version"))
+ versionfile=utility.search_parent_directories(path,
+ os.path.join(".be", "version"))
if versionfile != None:
beroot = os.path.dirname(versionfile)
root = os.path.dirname(beroot)
@@ -139,11 +306,11 @@ class BugDir (list):
raise NoBugDir(path)
return beroot
- def get_version(self, path=None):
- if self.rcs_name == None:
- # Use a temporary RCS to check the version for the first time
+ def get_version(self, path=None, use_none_rcs=False):
+ if use_none_rcs == True:
RCS = rcs.rcs_by_name("None")
RCS.root(self.root)
+ RCS.encoding = encoding.get_encoding()
else:
RCS = self.rcs
@@ -156,57 +323,6 @@ class BugDir (list):
self.rcs.set_file_contents(self.get_path("version"),
TREE_VERSION_STRING)
- rcs_name = setting_property("rcs_name",
- ("None", "bzr", "git", "Arch", "hg"),
- doc=
-"""The name of the current RCS. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .rcs instead, and
-.rcs_name will be automatically adjusted.""")
-
- _rcs = None
-
- def _get_rcs(self):
- return self._rcs
-
- def _set_rcs(self, new_rcs):
- if new_rcs == None:
- new_rcs = rcs.rcs_by_name("None")
- self._rcs = new_rcs
- new_rcs.root(self.root)
- self.rcs_name = new_rcs.name
-
- rcs = property(_get_rcs, _set_rcs,
- doc="A revision control system (RCS) instance")
-
- _user_id = setting_property("user-id", doc=
-"""The user's prefered name. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .user_id instead,
-and ._user_id will be automatically adjusted. This setting is
-only saved if ._save_user_id == True""")
-
- def _get_user_id(self):
- if self._user_id == None and self.rcs != None:
- self._user_id = self.rcs.get_user_id()
- return self._user_id
-
- def _set_user_id(self, user_id):
- if self.rcs != None:
- self.rcs.user_id = user_id
- self._user_id = user_id
-
- user_id = property(_get_user_id, _set_user_id, doc=
-"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note
-that the Arch RCS backend *enforces* ids with this format.""")
-
- target = setting_property("target",
- doc="The current project development target")
-
- def save_user_id(self, user_id=None):
- if user_id == None:
- user_id = self.user_id
- self._save_user_id = True
- self.user_id = user_id
-
def get_path(self, *args):
my_dir = os.path.join(self.root, ".be")
if len(args) == 0:
@@ -224,24 +340,20 @@ that the Arch RCS backend *enforces* ids with this format.""")
if allow_rcs_init == True:
new_rcs = rcs.installed_rcs()
new_rcs.init(self.root)
- self.rcs = new_rcs
return new_rcs
def load(self):
- version = self.get_version()
+ version = self.get_version(use_none_rcs=True)
if version != TREE_VERSION_STRING:
raise NotImplementedError, \
"BugDir cannot handle version '%s' yet." % version
else:
if not os.path.exists(self.get_path()):
raise NoBugDir(self.get_path())
- self.settings = self._get_settings(self.get_path("settings"))
+ self.load_settings()
self.rcs = rcs.rcs_by_name(self.rcs_name)
- if self._user_id != None: # was a user name in the settings file
- self.save_user_id()
-
- self._bug_map_gen()
+ self._setup_user_id(self.user_id)
def load_all_bugs(self):
"Warning: this could take a while."
@@ -252,43 +364,35 @@ that the Arch RCS backend *enforces* ids with this format.""")
def save(self):
self.rcs.mkdir(self.get_path())
self.set_version()
- self._save_settings(self.get_path("settings"), self.settings)
+ self.save_settings()
self.rcs.mkdir(self.get_path("bugs"))
for bug in self:
bug.save()
+ def load_settings(self):
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._setup_saved_settings()
+ self._setup_user_id(self.user_id)
+ self._setup_encoding(self.encoding)
+ self._setup_severities(self.severities)
+ self._setup_status(self.active_status, self.inactive_status)
+
def _get_settings(self, settings_path):
- if self.rcs_name == None:
- # Use a temporary RCS to loading settings the first time
- RCS = rcs.rcs_by_name("None")
- RCS.root(self.root)
- else:
- RCS = self.rcs
-
- allow_no_rcs = not RCS.path_in_root(settings_path)
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
# allow_no_rcs=True should only be for the special case of
# configuring duplicate bugdir settings
try:
- settings = mapfile.map_load(RCS, settings_path, allow_no_rcs)
+ settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
except rcs.NoSuchFile:
settings = {"rcs_name": "None"}
return settings
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self._save_settings(self.get_path("settings"), settings)
+
def _save_settings(self, settings_path, settings):
- this_dir_path = os.path.realpath(self.get_path("settings"))
- if os.path.realpath(settings_path) == this_dir_path:
- if not os.path.exists(self.get_path()):
- # don't save settings until the bug directory has been
- # initialized. this initialization happens the first time
- # a bug directory is saved (BugDir.save()). If the user
- # is just working with a BugDir in memory, we don't want
- # to go cluttering up his file system with settings files.
- return
- if self._save_user_id == False:
- if "user-id" in settings:
- settings = copy.copy(settings)
- del settings["user-id"]
allow_no_rcs = not self.rcs.path_in_root(settings_path)
# allow_no_rcs=True should only be for the special case of
# configuring duplicate bugdir settings
@@ -304,23 +408,17 @@ that the Arch RCS backend *enforces* ids with this format.""")
duplicate_settings = self._get_settings(duplicate_settings_path)
if "rcs_name" in duplicate_settings:
duplicate_settings["rcs_name"] = "None"
- duplicate_settings["user-id"] = self.user_id
- self._save_settings(duplicate_settings_path, duplicate_settings)
+ duplicate_settings["user_id"] = self.user_id
+ if "disabled" in bug.status_values:
+ # Hack to support old versions of BE bugs
+ duplicate_settings["inactive_status"] = self.inactive_status
+ self._save_settings(duplicate_settings_path, duplicate_settings)
- return BugDir(duplicate_path, from_disk=True)
+ return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
def remove_duplicate_bugdir(self):
self.rcs.remove_duplicate_repo()
- def _bug_map_gen(self):
- map = {}
- for bug in self:
- map[bug.uuid] = bug
- for uuid in self.list_uuids():
- if uuid not in map:
- map[uuid] = None
- self._bug_map = map
-
def list_uuids(self):
uuids = []
if os.path.exists(self.get_path()):
@@ -338,6 +436,7 @@ that the Arch RCS backend *enforces* ids with this format.""")
def _clear_bugs(self):
while len(self) > 0:
self.pop()
+ self._bug_map_gen()
def _load_bug(self, uuid):
bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
@@ -420,7 +519,8 @@ def simple_bug_dir():
"""
dir = utility.Dir()
assert os.path.exists(dir.path)
- bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True)
+ bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True,
+ manipulate_encodings=False)
bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
bug_a = bugdir.new_bug("a", summary="Bug A")
bug_a.creator = "John Doe <jdoe@example.com>"
@@ -495,6 +595,37 @@ class BugDirTestCase(unittest.TestCase):
self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
self.bugdir.save()
self.versionTest()
+ def testComments(self):
+ self.bugdir.new_bug(uuid="a", summary="Ant")
+ bug = self.bugdir.bug_from_uuid("a")
+ comm = bug.comment_root
+ rep = comm.new_reply("Ants are small.")
+ rep.new_reply("And they have six legs.")
+ self.bugdir.save()
+ self.bugdir._clear_bugs()
+ bug = self.bugdir.bug_from_uuid("a")
+ bug.load_comments()
+ self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
+ for index,comment in enumerate(bug.comments()):
+ if index == 0:
+ repLoaded = comment
+ self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
+ self.failUnless(comment.sync_with_disk == True,
+ comment.sync_with_disk)
+ #load_settings()
+ self.failUnless(comment.content_type == "text/plain",
+ comment.content_type)
+ self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
+ repLoaded.settings)
+ self.failUnless(repLoaded.body == "Ants are small.",
+ repLoaded.body)
+ elif index == 1:
+ self.failUnless(comment.in_reply_to == repLoaded.uuid,
+ repLoaded.uuid)
+ self.failUnless(comment.body == "And they have six legs.",
+ comment.body)
+ else:
+ self.failIf(True, "Invalid comment: %d\n%s" % (index, comment))
unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()])