diff options
Diffstat (limited to 'libbe/bugdir.py')
-rw-r--r-- | libbe/bugdir.py | 375 |
1 files changed, 253 insertions, 122 deletions
diff --git a/libbe/bugdir.py b/libbe/bugdir.py index 7e4cf3e..7885224 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -22,10 +22,16 @@ import copy import unittest import doctest +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, fn_checked_property, \ + cached_property, primed_property, change_hook_property, \ + settings_property +import settings_object import mapfile import bug -import utility import rcs +import encoding +import utility class NoBugDir(Exception): @@ -64,28 +70,29 @@ class MultipleBugMatches(ValueError): TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" -def setting_property(name, valid=None, doc=None): - def getter(self): - value = self.settings.get(name) - if valid is not None: - if value not in valid and value != None: - raise InvalidValue(name, value) - return value +class BugDir (list, settings_object.SavedSettingsObject): + """ + Sink to existing root + ====================== - def setter(self, value): - if value != getter(self): - if value is None: - del self.settings[name] - else: - self.settings[name] = value - self._save_settings(self.get_path("settings"), self.settings) + Consider the following usage case: + You have a bug directory rooted in + /path/to/source + by which I mean the '.be' directory is at + /path/to/source/.be + However, you're of in some subdirectory like + /path/to/source/GUI/testing + and you want to comment on a bug. Setting sink_to_root=True wen + you initialize your BugDir will cause it to search for the '.be' + file in the ancestors of the path you passed in as 'root'. + /path/to/source/GUI/testing/.be miss + /path/to/source/GUI/.be miss + /path/to/source/.be hit! + So it still roots itself appropriately without much work for you. + + File-system access + ================== - return property(getter, setter, doc=doc) - - -class BugDir (list): - """ - File-system access: When rooted in non-bugdir directory, BugDirs live completely in memory until the first call to .save(). This creates a '.be' sub-directory containing configurations options, bugs, comments, @@ -95,13 +102,166 @@ class BugDir (list): will only load information from the file system when it loads new bugs/comments that it doesn't already have in memory, or when it explicitly asked to do so (e.g. .load() or __init__(from_disk=True)). + + Allow RCS initialization + ======================== + + This one is for testing purposes. Setting it to True allows the + BugDir to search for an installed RCS backend and initialize it in + the root directory. This is a convenience option for supporting + tests of versioning functionality (e.g. .duplicate_bugdir). + + Disable encoding manipulation + ============================= + + This one is for testing purposed. You might have non-ASCII + Unicode in your bugs, comments, files, etc. BugDir instances try + and support your preferred encoding scheme (e.g. "utf-8") when + dealing with stream and file input/output. For stream output, + this involves replacing sys.stdout and sys.stderr + (libbe.encode.set_IO_stream_encodings). However this messes up + doctest's output catching. In order to support doctest tests + using BugDirs, set manipulate_encodings=False, and stick to ASCII + in your tests. """ + + settings_properties = [] + required_saved_properties = [] + _prop_save_settings = settings_object.prop_save_settings + _prop_load_settings = settings_object.prop_load_settings + def _versioned_property(settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"]=required_saved_properties + return settings_object.versioned_property(**kwargs) + + @_versioned_property(name="target", + doc="The current project development target") + def target(): return {} + + def _guess_encoding(self): + return encoding.get_encoding() + def _check_encoding(value): + if value != None and value != settings_object.EMPTY: + return encoding.known_encoding(value) + def _setup_encoding(self, new_encoding): + if new_encoding != None and new_encoding != settings_object.EMPTY: + if self._manipulate_encodings == True: + encoding.set_IO_stream_encodings(new_encoding) + def _set_encoding(self, old_encoding, new_encoding): + self._setup_encoding(new_encoding) + self._prop_save_settings(old_encoding, new_encoding) + + @_versioned_property(name="encoding", + doc="""The default input/output encoding to use (e.g. "utf-8").""", + change_hook=_set_encoding, + generator=_guess_encoding, + check_fn=_check_encoding) + def encoding(): return {} + + def _setup_user_id(self, user_id): + self.rcs.user_id = user_id + def _guess_user_id(self): + return self.rcs.get_user_id() + def _set_user_id(self, old_user_id, new_user_id): + self._setup_user_id(new_user_id) + self._prop_save_settings(old_user_id, new_user_id) + + @_versioned_property(name="user_id", + doc= +"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note +that the Arch RCS backend *enforces* ids with this format.""", + change_hook=_set_user_id, + generator=_guess_user_id) + def user_id(): return {} + + @_versioned_property(name="default_assignee", + doc= +"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""") + def default_assignee(): return {} + + @_versioned_property(name="rcs_name", + doc="""The name of the current RCS. Kept seperate to make saving/loading +settings easy. Don't set this attribute. Set .rcs instead, and +.rcs_name will be automatically adjusted.""", + default="None", + allowed=["None", "Arch", "bzr", "git", "hg"]) + def rcs_name(): return {} + + def _get_rcs(self, rcs_name=None): + """Get and root a new revision control system""" + if rcs_name == None: + rcs_name = self.rcs_name + new_rcs = rcs.rcs_by_name(rcs_name) + self._change_rcs(None, new_rcs) + return new_rcs + def _change_rcs(self, old_rcs, new_rcs): + new_rcs.encoding = self.encoding + new_rcs.root(self.root) + self.rcs_name = new_rcs.name + + @Property + @change_hook_property(hook=_change_rcs) + @cached_property(generator=_get_rcs) + @local_property("rcs") + @doc_property(doc="A revision control system instance.") + def rcs(): return {} + + def _bug_map_gen(self): + map = {} + for bug in self: + map[bug.uuid] = bug + for uuid in self.list_uuids(): + if uuid not in map: + map[uuid] = None + self._bug_map_value = map # ._bug_map_value used by @local_property + + @Property + @primed_property(primer=_bug_map_gen) + @local_property("bug_map") + @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.") + def _bug_map(): return {} + + def _setup_severities(self, severities): + if severities != None and severities != settings_object.EMPTY: + bug.load_severities(severities) + def _set_severities(self, old_severities, new_severities): + self._setup_severities(new_severities) + self._prop_save_settings(old_severities, new_severities) + @_versioned_property(name="severities", + doc="The allowed bug severities and their descriptions.", + change_hook=_set_severities) + def severities(): return {} + + def _setup_status(self, active_status, inactive_status): + bug.load_status(active_status, inactive_status) + def _set_active_status(self, old_active_status, new_active_status): + self._setup_status(new_active_status, self.inactive_status) + self._prop_save_settings(old_active_status, new_active_status) + @_versioned_property(name="active_status", + doc="The allowed active bug states and their descriptions.", + change_hook=_set_active_status) + def active_status(): return {} + + def _set_inactive_status(self, old_inactive_status, new_inactive_status): + self._setup_status(self.active_status, new_inactive_status) + self._prop_save_settings(old_inactive_status, new_inactive_status) + @_versioned_property(name="inactive_status", + doc="The allowed inactive bug states and their descriptions.", + change_hook=_set_inactive_status) + def inactive_status(): return {} + + def __init__(self, root=None, sink_to_existing_root=True, assert_new_BugDir=False, allow_rcs_init=False, + manipulate_encodings=True, from_disk=False, rcs=None): list.__init__(self) - self._save_user_id = False - self.settings = {} + settings_object.SavedSettingsObject.__init__(self) + self._manipulate_encodings = manipulate_encodings if root == None: root = os.getcwd() if sink_to_existing_root == True: @@ -110,16 +270,22 @@ class BugDir (list): if not os.path.exists(root): raise NoRootEntry(root) self.root = root + # get a temporary rcs until we've loaded settings + self.sync_with_disk = False + self.rcs = self._guess_rcs() + if from_disk == True: + self.sync_with_disk = True self.load() else: + self.sync_with_disk = False if assert_new_BugDir == True: if os.path.exists(self.get_path()): raise AlreadyInitialized, self.get_path() if rcs == None: rcs = self._guess_rcs(allow_rcs_init) self.rcs = rcs - user_id = self.rcs.get_user_id() + self._setup_user_id(self.user_id) def _find_root(self, path): """ @@ -128,7 +294,8 @@ class BugDir (list): """ if not os.path.exists(path): raise NoRootEntry(path) - versionfile = utility.search_parent_directories(path, os.path.join(".be", "version")) + versionfile=utility.search_parent_directories(path, + os.path.join(".be", "version")) if versionfile != None: beroot = os.path.dirname(versionfile) root = os.path.dirname(beroot) @@ -139,11 +306,11 @@ class BugDir (list): raise NoBugDir(path) return beroot - def get_version(self, path=None): - if self.rcs_name == None: - # Use a temporary RCS to check the version for the first time + def get_version(self, path=None, use_none_rcs=False): + if use_none_rcs == True: RCS = rcs.rcs_by_name("None") RCS.root(self.root) + RCS.encoding = encoding.get_encoding() else: RCS = self.rcs @@ -156,57 +323,6 @@ class BugDir (list): self.rcs.set_file_contents(self.get_path("version"), TREE_VERSION_STRING) - rcs_name = setting_property("rcs_name", - ("None", "bzr", "git", "Arch", "hg"), - doc= -"""The name of the current RCS. Kept seperate to make saving/loading -settings easy. Don't set this attribute. Set .rcs instead, and -.rcs_name will be automatically adjusted.""") - - _rcs = None - - def _get_rcs(self): - return self._rcs - - def _set_rcs(self, new_rcs): - if new_rcs == None: - new_rcs = rcs.rcs_by_name("None") - self._rcs = new_rcs - new_rcs.root(self.root) - self.rcs_name = new_rcs.name - - rcs = property(_get_rcs, _set_rcs, - doc="A revision control system (RCS) instance") - - _user_id = setting_property("user-id", doc= -"""The user's prefered name. Kept seperate to make saving/loading -settings easy. Don't set this attribute. Set .user_id instead, -and ._user_id will be automatically adjusted. This setting is -only saved if ._save_user_id == True""") - - def _get_user_id(self): - if self._user_id == None and self.rcs != None: - self._user_id = self.rcs.get_user_id() - return self._user_id - - def _set_user_id(self, user_id): - if self.rcs != None: - self.rcs.user_id = user_id - self._user_id = user_id - - user_id = property(_get_user_id, _set_user_id, doc= -"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note -that the Arch RCS backend *enforces* ids with this format.""") - - target = setting_property("target", - doc="The current project development target") - - def save_user_id(self, user_id=None): - if user_id == None: - user_id = self.user_id - self._save_user_id = True - self.user_id = user_id - def get_path(self, *args): my_dir = os.path.join(self.root, ".be") if len(args) == 0: @@ -224,24 +340,20 @@ that the Arch RCS backend *enforces* ids with this format.""") if allow_rcs_init == True: new_rcs = rcs.installed_rcs() new_rcs.init(self.root) - self.rcs = new_rcs return new_rcs def load(self): - version = self.get_version() + version = self.get_version(use_none_rcs=True) if version != TREE_VERSION_STRING: raise NotImplementedError, \ "BugDir cannot handle version '%s' yet." % version else: if not os.path.exists(self.get_path()): raise NoBugDir(self.get_path()) - self.settings = self._get_settings(self.get_path("settings")) + self.load_settings() self.rcs = rcs.rcs_by_name(self.rcs_name) - if self._user_id != None: # was a user name in the settings file - self.save_user_id() - - self._bug_map_gen() + self._setup_user_id(self.user_id) def load_all_bugs(self): "Warning: this could take a while." @@ -252,43 +364,35 @@ that the Arch RCS backend *enforces* ids with this format.""") def save(self): self.rcs.mkdir(self.get_path()) self.set_version() - self._save_settings(self.get_path("settings"), self.settings) + self.save_settings() self.rcs.mkdir(self.get_path("bugs")) for bug in self: bug.save() + def load_settings(self): + self.settings = self._get_settings(self.get_path("settings")) + self._setup_saved_settings() + self._setup_user_id(self.user_id) + self._setup_encoding(self.encoding) + self._setup_severities(self.severities) + self._setup_status(self.active_status, self.inactive_status) + def _get_settings(self, settings_path): - if self.rcs_name == None: - # Use a temporary RCS to loading settings the first time - RCS = rcs.rcs_by_name("None") - RCS.root(self.root) - else: - RCS = self.rcs - - allow_no_rcs = not RCS.path_in_root(settings_path) + allow_no_rcs = not self.rcs.path_in_root(settings_path) # allow_no_rcs=True should only be for the special case of # configuring duplicate bugdir settings try: - settings = mapfile.map_load(RCS, settings_path, allow_no_rcs) + settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs) except rcs.NoSuchFile: settings = {"rcs_name": "None"} return settings + def save_settings(self): + settings = self._get_saved_settings() + self._save_settings(self.get_path("settings"), settings) + def _save_settings(self, settings_path, settings): - this_dir_path = os.path.realpath(self.get_path("settings")) - if os.path.realpath(settings_path) == this_dir_path: - if not os.path.exists(self.get_path()): - # don't save settings until the bug directory has been - # initialized. this initialization happens the first time - # a bug directory is saved (BugDir.save()). If the user - # is just working with a BugDir in memory, we don't want - # to go cluttering up his file system with settings files. - return - if self._save_user_id == False: - if "user-id" in settings: - settings = copy.copy(settings) - del settings["user-id"] allow_no_rcs = not self.rcs.path_in_root(settings_path) # allow_no_rcs=True should only be for the special case of # configuring duplicate bugdir settings @@ -304,23 +408,17 @@ that the Arch RCS backend *enforces* ids with this format.""") duplicate_settings = self._get_settings(duplicate_settings_path) if "rcs_name" in duplicate_settings: duplicate_settings["rcs_name"] = "None" - duplicate_settings["user-id"] = self.user_id - self._save_settings(duplicate_settings_path, duplicate_settings) + duplicate_settings["user_id"] = self.user_id + if "disabled" in bug.status_values: + # Hack to support old versions of BE bugs + duplicate_settings["inactive_status"] = self.inactive_status + self._save_settings(duplicate_settings_path, duplicate_settings) - return BugDir(duplicate_path, from_disk=True) + return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings) def remove_duplicate_bugdir(self): self.rcs.remove_duplicate_repo() - def _bug_map_gen(self): - map = {} - for bug in self: - map[bug.uuid] = bug - for uuid in self.list_uuids(): - if uuid not in map: - map[uuid] = None - self._bug_map = map - def list_uuids(self): uuids = [] if os.path.exists(self.get_path()): @@ -338,6 +436,7 @@ that the Arch RCS backend *enforces* ids with this format.""") def _clear_bugs(self): while len(self) > 0: self.pop() + self._bug_map_gen() def _load_bug(self, uuid): bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True) @@ -420,7 +519,8 @@ def simple_bug_dir(): """ dir = utility.Dir() assert os.path.exists(dir.path) - bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True) + bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True, + manipulate_encodings=False) bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir. bug_a = bugdir.new_bug("a", summary="Bug A") bug_a.creator = "John Doe <jdoe@example.com>" @@ -495,6 +595,37 @@ class BugDirTestCase(unittest.TestCase): self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime)) self.bugdir.save() self.versionTest() + def testComments(self): + self.bugdir.new_bug(uuid="a", summary="Ant") + bug = self.bugdir.bug_from_uuid("a") + comm = bug.comment_root + rep = comm.new_reply("Ants are small.") + rep.new_reply("And they have six legs.") + self.bugdir.save() + self.bugdir._clear_bugs() + bug = self.bugdir.bug_from_uuid("a") + bug.load_comments() + self.failUnless(len(bug.comment_root)==1, len(bug.comment_root)) + for index,comment in enumerate(bug.comments()): + if index == 0: + repLoaded = comment + self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid) + self.failUnless(comment.sync_with_disk == True, + comment.sync_with_disk) + #load_settings() + self.failUnless(comment.content_type == "text/plain", + comment.content_type) + self.failUnless(repLoaded.settings["Content-type"]=="text/plain", + repLoaded.settings) + self.failUnless(repLoaded.body == "Ants are small.", + repLoaded.body) + elif index == 1: + self.failUnless(comment.in_reply_to == repLoaded.uuid, + repLoaded.uuid) + self.failUnless(comment.body == "And they have six legs.", + comment.body) + else: + self.failIf(True, "Invalid comment: %d\n%s" % (index, comment)) unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()]) |