diff options
Diffstat (limited to 'libbe/bugdir.py')
-rw-r--r-- | libbe/bugdir.py | 290 |
1 files changed, 153 insertions, 137 deletions
diff --git a/libbe/bugdir.py b/libbe/bugdir.py index 6bb6a43..f93576f 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -22,6 +22,11 @@ import copy import unittest import doctest +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, fn_checked_property, \ + cached_property, primed_property, change_hook_property, \ + settings_property +import settings_object import mapfile import bug import rcs @@ -65,31 +70,7 @@ class MultipleBugMatches(ValueError): TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" -def setting_property(name, valid=None, default=None, doc=None): - if default != None: - raise NotImplementedError - def getter(self): - value = self.settings.get(name) - if valid is not None: - if value not in valid and value != None: - raise InvalidValue(name, value) - return value - - def setter(self, value): - if value != getter(self): - if valid is not None: - if value not in valid and value != None: - raise InvalidValue(name, value) - if value is None: - del self.settings[name] - else: - self.settings[name] = value - self._save_settings(self.get_path("settings"), self.settings) - - return property(getter, setter, doc=doc) - - -class BugDir (list): +class BugDir (list, settings_object.SavedSettingsObject): """ Sink to existing root ====================== @@ -143,14 +124,108 @@ class BugDir (list): using BugDirs, set manipulate_encodings=False, and stick to ASCII in your tests. """ + + settings_properties = [] + required_saved_properties = [] + _prop_save_settings = settings_object.prop_save_settings + _prop_load_settings = settings_object.prop_load_settings + def _versioned_property(settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"]=required_saved_properties + return settings_object.versioned_property(**kwargs) + + @_versioned_property(name="target", + doc="The current project development target") + def target(): return {} + + def _guess_encoding(self): + return encoding.get_encoding() + def _check_encoding(value): + if value != None and value != settings_object.EMPTY: + return encoding.known_encoding(value) + def _setup_encoding(self, new_encoding): + if new_encoding != None and new_encoding != settings_object.EMPTY: + if self._manipulate_encodings == True: + encoding.set_IO_stream_encodings(new_encoding) + def _set_encoding(self, old_encoding, new_encoding): + self._setup_encoding(new_encoding) + self._prop_save_settings(old_encoding, new_encoding) + + @_versioned_property(name="encoding", + doc="""The default input/output encoding to use (e.g. "utf-8").""", + change_hook=_set_encoding, + generator=_guess_encoding, + check_fn=_check_encoding) + def encoding(): return {} + + def _guess_user_id(self): + return self.rcs.get_user_id() + def _set_user_id(self, old_user_id, new_user_id): + self.rcs.user_id = new_user_id + self._prop_save_settings(old_user_id, new_user_id) + + @_versioned_property(name="user_id", + doc= +"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note +that the Arch RCS backend *enforces* ids with this format.""", + change_hook=_set_user_id, + generator=_guess_user_id) + def user_id(): return {} + + @_versioned_property(name="rcs_name", + doc="""The name of the current RCS. Kept seperate to make saving/loading +settings easy. Don't set this attribute. Set .rcs instead, and +.rcs_name will be automatically adjusted.""", + default="None", + allowed=["None", "Arch", "bzr", "git", "hg"]) + def rcs_name(): return {} + + def _get_rcs(self, rcs_name=None): + """Get and root a new revision control system""" + if rcs_name == None: + rcs_name = self.rcs_name + new_rcs = rcs.rcs_by_name(rcs_name) + self._change_rcs(None, new_rcs) + return new_rcs + def _change_rcs(self, old_rcs, new_rcs): + new_rcs.encoding = self.encoding + new_rcs.root(self.root) + self.rcs_name = new_rcs.name + + @Property + @change_hook_property(hook=_change_rcs) + @cached_property(generator=_get_rcs) + @local_property("rcs") + @doc_property(doc="A revision control system instance.") + def rcs(): return {} + + def _bug_map_gen(self): + map = {} + for bug in self: + map[bug.uuid] = bug + for uuid in self.list_uuids(): + if uuid not in map: + map[uuid] = None + self._bug_map_value = map # ._bug_map_value used by @local_property + + @Property + @primed_property(primer=_bug_map_gen) + @local_property("bug_map") + @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.") + def _bug_map(): return {} + + def __init__(self, root=None, sink_to_existing_root=True, assert_new_BugDir=False, allow_rcs_init=False, manipulate_encodings=True, from_disk=False, rcs=None): list.__init__(self) - self._save_user_id = False + settings_object.SavedSettingsObject.__init__(self) self._manipulate_encodings = manipulate_encodings - self.settings = {} if root == None: root = os.getcwd() if sink_to_existing_root == True: @@ -159,9 +234,15 @@ class BugDir (list): if not os.path.exists(root): raise NoRootEntry(root) self.root = root + # get a temporary rcs until we've loaded settings + self.sync_with_disk = False + self.rcs = self._guess_rcs() + if from_disk == True: + self.sync_with_disk = True self.load() else: + self.sync_with_disk = False if assert_new_BugDir == True: if os.path.exists(self.get_path()): raise AlreadyInitialized, self.get_path() @@ -206,75 +287,6 @@ class BugDir (list): self.rcs.set_file_contents(self.get_path("version"), TREE_VERSION_STRING) - def _get_encoding(self): - if self._encoding == None: - return encoding.get_encoding() - else: - return self._encoding - def _set_encoding(self, new_encoding): - if new_encoding != None: - if encoding.known_encoding(new_encoding) == False: - raise InvalidValue("encoding", new_encoding) - self._encoding = new_encoding - if self._manipulate_encodings == True: - encoding.set_IO_stream_encodings(self.encoding) - if hasattr(self, "rcs"): - if self.rcs != None: - self.rcs.encoding = self.encoding - _encoding = setting_property("encoding", - doc= -"""The default input/output encoding to use (e.g. "utf-8"). -Dont' set this attribute, set .encoding instead.""") - encoding = property(_get_encoding, _set_encoding, doc= -"""The default input/output encoding to use (e.g. "utf-8").""") - - def _get_rcs(self): - return self._rcs - def _set_rcs(self, new_rcs): - if new_rcs == None: - new_rcs = rcs.rcs_by_name("None") - new_rcs.encoding = self.encoding - self._rcs = new_rcs - new_rcs.root(self.root) - self.rcs_name = new_rcs.name - _rcs = None - rcs = property(_get_rcs, _set_rcs, - doc="A revision control system (RCS) instance") - rcs_name = setting_property("rcs_name", - ("None", "bzr", "git", "Arch", "hg"), - doc= -"""The name of the current RCS. Kept seperate to make saving/loading -settings easy. Don't set this attribute. Set .rcs instead, and -.rcs_name will be automatically adjusted.""") - - - def _get_user_id(self): - if self._user_id == None and self.rcs != None: - self._user_id = self.rcs.get_user_id() - return self._user_id - def _set_user_id(self, user_id): - if self.rcs != None: - self.rcs.user_id = user_id - self._user_id = user_id - user_id = property(_get_user_id, _set_user_id, doc= -"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note -that the Arch RCS backend *enforces* ids with this format.""") - _user_id = setting_property("user_id", doc= -"""The user's prefered name. Kept seperate to make saving/loading -settings easy. Don't set this attribute. Set .user_id instead, -and ._user_id will be automatically adjusted. This setting is -only saved if ._save_user_id == True""") - - - target = setting_property("target", - doc="The current project development target") - - def save_user_id(self, user_id=None): - if user_id == None: - user_id = self.user_id - self._save_user_id = True - self.user_id = user_id - def get_path(self, *args): my_dir = os.path.join(self.root, ".be") if len(args) == 0: @@ -292,7 +304,6 @@ only saved if ._save_user_id == True""") if allow_rcs_init == True: new_rcs = rcs.installed_rcs() new_rcs.init(self.root) - self.rcs = new_rcs return new_rcs def load(self): @@ -303,14 +314,10 @@ only saved if ._save_user_id == True""") else: if not os.path.exists(self.get_path()): raise NoBugDir(self.get_path()) - self.settings = self._get_settings(self.get_path("settings")) + self.load_settings() self.rcs = rcs.rcs_by_name(self.rcs_name) - self.encoding = self.encoding # setup encoding, IO_stream_encoding... - if self.settings.get("user_id") != None: - self.save_user_id() # was a user name in the settings file - - self._bug_map_gen() + self._setup_encoding(self.encoding) def load_all_bugs(self): "Warning: this could take a while." @@ -321,45 +328,31 @@ only saved if ._save_user_id == True""") def save(self): self.rcs.mkdir(self.get_path()) self.set_version() - self._save_settings(self.get_path("settings"), self.settings) + self.save_settings() self.rcs.mkdir(self.get_path("bugs")) for bug in self: bug.save() + def load_settings(self): + self.settings = self._get_settings(self.get_path("settings")) + self._setup_saved_settings() + def _get_settings(self, settings_path): - if self.rcs_name == None: - # Use a temporary RCS to loading settings the first time - RCS = rcs.rcs_by_name("None") - RCS.root(self.root) - else: - RCS = self.rcs - - allow_no_rcs = not RCS.path_in_root(settings_path) + allow_no_rcs = not self.rcs.path_in_root(settings_path) # allow_no_rcs=True should only be for the special case of # configuring duplicate bugdir settings try: - settings = mapfile.map_load(RCS, settings_path, allow_no_rcs) + settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs) except rcs.NoSuchFile: settings = {"rcs_name": "None"} return settings + def save_settings(self): + settings = self._get_saved_settings() + self._save_settings(self.get_path("settings"), settings) + def _save_settings(self, settings_path, settings): - this_dir_path = os.path.realpath(self.get_path("settings")) - if os.path.realpath(settings_path) == this_dir_path: - if not os.path.exists(self.get_path()): - # don't save settings until the bug directory has been - # initialized. this initialization happens the first time - # a bug directory is saved (BugDir.save()). If the user - # is just working with a BugDir in memory, we don't want - # to go cluttering up his file system with settings files. - return - if self._save_user_id == False: - if "user_id" in settings: - settings = copy.copy(settings) - del settings["user_id"] - if settings.get("encoding") == encoding.get_encoding(): - del settings["encoding"] # don't duplicate system default allow_no_rcs = not self.rcs.path_in_root(settings_path) # allow_no_rcs=True should only be for the special case of # configuring duplicate bugdir settings @@ -383,15 +376,6 @@ only saved if ._save_user_id == True""") def remove_duplicate_bugdir(self): self.rcs.remove_duplicate_repo() - def _bug_map_gen(self): - map = {} - for bug in self: - map[bug.uuid] = bug - for uuid in self.list_uuids(): - if uuid not in map: - map[uuid] = None - self._bug_map = map - def list_uuids(self): uuids = [] if os.path.exists(self.get_path()): @@ -409,6 +393,7 @@ only saved if ._save_user_id == True""") def _clear_bugs(self): while len(self) > 0: self.pop() + self._bug_map_gen() def _load_bug(self, uuid): bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True) @@ -567,6 +552,37 @@ class BugDirTestCase(unittest.TestCase): self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime)) self.bugdir.save() self.versionTest() + def testComments(self): + self.bugdir.new_bug(uuid="a", summary="Ant") + bug = self.bugdir.bug_from_uuid("a") + comm = bug.comment_root + rep = comm.new_reply("Ants are small.") + rep.new_reply("And they have six legs.") + self.bugdir.save() + self.bugdir._clear_bugs() + bug = self.bugdir.bug_from_uuid("a") + bug.load_comments() + self.failUnless(len(bug.comment_root)==1, len(bug.comment_root)) + for index,comment in enumerate(bug.comments()): + if index == 0: + repLoaded = comment + self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid) + self.failUnless(comment.sync_with_disk == True, + comment.sync_with_disk) + #load_settings() + self.failUnless(comment.content_type == "text/plain", + comment.content_type) + self.failUnless(repLoaded.settings["Content-type"]=="text/plain", + repLoaded.settings) + self.failUnless(repLoaded.body == "Ants are small.", + repLoaded.body) + elif index == 1: + self.failUnless(comment.in_reply_to == repLoaded.uuid, + repLoaded.uuid) + self.failUnless(comment.body == "And they have six legs.", + comment.body) + else: + self.failIf(True, "Invalid comment: %d\n%s" % (index, comment)) unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()]) |