From b0b3c9473e3a4b728ea72a2876e39fe41284a9ed Mon Sep 17 00:00:00 2001 From: Gianluca Montecchi Date: Fri, 2 Oct 2009 23:46:24 +0200 Subject: Merged with Trevor's -rr branch --- interfaces/web/Bugs-Everywhere-Web/libbe/bugdir.py | 476 ++++++++++++++------- 1 file changed, 316 insertions(+), 160 deletions(-) (limited to 'interfaces/web/Bugs-Everywhere-Web/libbe/bugdir.py') diff --git a/interfaces/web/Bugs-Everywhere-Web/libbe/bugdir.py b/interfaces/web/Bugs-Everywhere-Web/libbe/bugdir.py index 6e020ee..c4f0f91 100644 --- a/interfaces/web/Bugs-Everywhere-Web/libbe/bugdir.py +++ b/interfaces/web/Bugs-Everywhere-Web/libbe/bugdir.py @@ -17,23 +17,30 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define the BugDir class for representing bug comments. +""" + +import copy +import errno import os import os.path -import errno +import sys import time -import copy import unittest import doctest +import bug +import encoding from properties import Property, doc_property, local_property, \ defaulting_property, checked_property, fn_checked_property, \ cached_property, primed_property, change_hook_property, \ settings_property -import settings_object import mapfile -import bug -import rcs -import encoding +import vcs +import settings_object +import upgrade import utility @@ -62,8 +69,16 @@ class MultipleBugMatches(ValueError): self.shortname = shortname self.matches = matches +class NoBugMatches(KeyError): + def __init__(self, shortname): + msg = "No bug matches %s" % shortname + KeyError.__init__(self, msg) + self.shortname = shortname -TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" +class DiskAccessRequired (Exception): + def __init__(self, goal): + msg = "Cannot %s without accessing the disk" % goal + Exception.__init__(self, msg) class BugDir (list, settings_object.SavedSettingsObject): @@ -99,7 +114,8 @@ class BugDir (list, settings_object.SavedSettingsObject): all bugs/comments/etc. that have been loaded into memory. If you've been living in memory and want to move to .sync_with_disk==True, but you're not sure if anything has been - changed in memoryy, a call to save() is a safe move. + changed in memory, a call to save() immediately before the + .set_sync_with_disk(True) call is a safe move. Regardless of .sync_with_disk, a call to .save() will write out all the contents that the BugDir instance has loaded into memory. @@ -107,15 +123,14 @@ class BugDir (list, settings_object.SavedSettingsObject): changes, this .save() call will be a waste of time. The BugDir will only load information from the file system when it - loads new bugs/comments that it doesn't already have in memory, or - when it explicitly asked to do so (e.g. .load() or - __init__(from_disk=True)). + loads new settings/bugs/comments that it doesn't already have in + memory and .sync_with_disk == True. - Allow RCS initialization + Allow VCS initialization ======================== This one is for testing purposes. Setting it to True allows the - BugDir to search for an installed RCS backend and initialize it in + BugDir to search for an installed VCS backend and initialize it in the root directory. This is a convenience option for supporting tests of versioning functionality (e.g. .duplicate_bugdir). @@ -172,9 +187,9 @@ class BugDir (list, settings_object.SavedSettingsObject): def encoding(): return {} def _setup_user_id(self, user_id): - self.rcs.user_id = user_id + self.vcs.user_id = user_id def _guess_user_id(self): - return self.rcs.get_user_id() + return self.vcs.get_user_id() def _set_user_id(self, old_user_id, new_user_id): self._setup_user_id(new_user_id) self._prop_save_settings(old_user_id, new_user_id) @@ -182,7 +197,7 @@ class BugDir (list, settings_object.SavedSettingsObject): @_versioned_property(name="user_id", doc= """The user's prefered name, e.g. 'John Doe '. Note -that the Arch RCS backend *enforces* ids with this format.""", +that the Arch VCS backend *enforces* ids with this format.""", change_hook=_set_user_id, generator=_guess_user_id) def user_id(): return {} @@ -192,32 +207,32 @@ that the Arch RCS backend *enforces* ids with this format.""", """The default assignee for new bugs e.g. 'John Doe '.""") def default_assignee(): return {} - @_versioned_property(name="rcs_name", - doc="""The name of the current RCS. Kept seperate to make saving/loading -settings easy. Don't set this attribute. Set .rcs instead, and -.rcs_name will be automatically adjusted.""", + @_versioned_property(name="vcs_name", + doc="""The name of the current VCS. Kept seperate to make saving/loading +settings easy. Don't set this attribute. Set .vcs instead, and +.vcs_name will be automatically adjusted.""", default="None", allowed=["None", "Arch", "bzr", "darcs", "git", "hg"]) - def rcs_name(): return {} + def vcs_name(): return {} - def _get_rcs(self, rcs_name=None): + def _get_vcs(self, vcs_name=None): """Get and root a new revision control system""" - if rcs_name == None: - rcs_name = self.rcs_name - new_rcs = rcs.rcs_by_name(rcs_name) - self._change_rcs(None, new_rcs) - return new_rcs - def _change_rcs(self, old_rcs, new_rcs): - new_rcs.encoding = self.encoding - new_rcs.root(self.root) - self.rcs_name = new_rcs.name + if vcs_name == None: + vcs_name = self.vcs_name + new_vcs = vcs.vcs_by_name(vcs_name) + self._change_vcs(None, new_vcs) + return new_vcs + def _change_vcs(self, old_vcs, new_vcs): + new_vcs.encoding = self.encoding + new_vcs.root(self.root) + self.vcs_name = new_vcs.name @Property - @change_hook_property(hook=_change_rcs) - @cached_property(generator=_get_rcs) - @local_property("rcs") + @change_hook_property(hook=_change_vcs) + @cached_property(generator=_get_vcs) + @local_property("vcs") @doc_property(doc="A revision control system instance.") - def rcs(): return {} + def vcs(): return {} def _bug_map_gen(self): map = {} @@ -279,9 +294,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and def __init__(self, root=None, sink_to_existing_root=True, - assert_new_BugDir=False, allow_rcs_init=False, - manipulate_encodings=True, - from_disk=False, rcs=None): + assert_new_BugDir=False, allow_vcs_init=False, + manipulate_encodings=True, from_disk=False, vcs=None): list.__init__(self) settings_object.SavedSettingsObject.__init__(self) self._manipulate_encodings = manipulate_encodings @@ -293,9 +307,9 @@ settings easy. Don't set this attribute. Set .rcs instead, and if not os.path.exists(root): raise NoRootEntry(root) self.root = root - # get a temporary rcs until we've loaded settings + # get a temporary vcs until we've loaded settings self.sync_with_disk = False - self.rcs = self._guess_rcs() + self.vcs = self._guess_vcs() if from_disk == True: self.sync_with_disk = True @@ -305,20 +319,24 @@ settings easy. Don't set this attribute. Set .rcs instead, and if assert_new_BugDir == True: if os.path.exists(self.get_path()): raise AlreadyInitialized, self.get_path() - if rcs == None: - rcs = self._guess_rcs(allow_rcs_init) - self.rcs = rcs + if vcs == None: + vcs = self._guess_vcs(allow_vcs_init) + self.vcs = vcs self._setup_user_id(self.user_id) - def set_sync_with_disk(self, value): - self.sync_with_disk = value - for bug in self: - bug.set_sync_with_disk(value) + def __del__(self): + self.cleanup() + + def cleanup(self): + self.vcs.cleanup() + + # methods for getting the BugDir situated in the filesystem def _find_root(self, path): """ Search for an existing bug database dir and it's ancestors and - return a BugDir rooted there. + return a BugDir rooted there. Only called by __init__, and + then only if sink_to_existing_root == True. """ if not os.path.exists(path): raise NoRootEntry(path) @@ -334,136 +352,212 @@ settings easy. Don't set this attribute. Set .rcs instead, and raise NoBugDir(path) return beroot - def get_version(self, path=None, use_none_rcs=False): - if use_none_rcs == True: - RCS = rcs.rcs_by_name("None") - RCS.root(self.root) - RCS.encoding = encoding.get_encoding() + def _guess_vcs(self, allow_vcs_init=False): + """ + Only called by __init__. + """ + deepdir = self.get_path() + if not os.path.exists(deepdir): + deepdir = os.path.dirname(deepdir) + new_vcs = vcs.detect_vcs(deepdir) + install = False + if new_vcs.name == "None": + if allow_vcs_init == True: + new_vcs = vcs.installed_vcs() + new_vcs.init(self.root) + return new_vcs + + # methods for saving/loading/accessing settings and properties. + + def get_path(self, *args): + """ + Return a path relative to .root. + """ + dir = os.path.join(self.root, ".be") + if len(args) == 0: + return dir + assert args[0] in ["version", "settings", "bugs"], str(args) + return os.path.join(dir, *args) + + def _get_settings(self, settings_path, for_duplicate_bugdir=False): + allow_no_vcs = not self.vcs.path_in_root(settings_path) + if allow_no_vcs == True: + assert for_duplicate_bugdir == True + if self.sync_with_disk == False and for_duplicate_bugdir == False: + # duplicates can ignore this bugdir's .sync_with_disk status + raise DiskAccessRequired("_get settings") + try: + settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs) + except vcs.NoSuchFile: + settings = {"vcs_name": "None"} + return settings + + def _save_settings(self, settings_path, settings, + for_duplicate_bugdir=False): + allow_no_vcs = not self.vcs.path_in_root(settings_path) + if allow_no_vcs == True: + assert for_duplicate_bugdir == True + if self.sync_with_disk == False and for_duplicate_bugdir == False: + # duplicates can ignore this bugdir's .sync_with_disk status + raise DiskAccessRequired("_save settings") + self.vcs.mkdir(self.get_path(), allow_no_vcs) + mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs) + + def load_settings(self): + self.settings = self._get_settings(self.get_path("settings")) + self._setup_saved_settings() + self._setup_user_id(self.user_id) + self._setup_encoding(self.encoding) + self._setup_severities(self.severities) + self._setup_status(self.active_status, self.inactive_status) + self.vcs = vcs.vcs_by_name(self.vcs_name) + self._setup_user_id(self.user_id) + + def save_settings(self): + settings = self._get_saved_settings() + self._save_settings(self.get_path("settings"), settings) + + def get_version(self, path=None, use_none_vcs=False, + for_duplicate_bugdir=False): + """ + Requires disk access. + """ + if self.sync_with_disk == False: + raise DiskAccessRequired("get version") + if use_none_vcs == True: + VCS = vcs.vcs_by_name("None") + VCS.root(self.root) + VCS.encoding = encoding.get_encoding() else: - RCS = self.rcs + VCS = self.vcs if path == None: path = self.get_path("version") - tree_version = RCS.get_file_contents(path) - return tree_version + allow_no_vcs = not VCS.path_in_root(path) + if allow_no_vcs == True: + assert for_duplicate_bugdir == True + version = VCS.get_file_contents( + path, allow_no_vcs=allow_no_vcs).rstrip("\n") + return version def set_version(self): - self.rcs.mkdir(self.get_path()) - self.rcs.set_file_contents(self.get_path("version"), - TREE_VERSION_STRING) + """ + Requires disk access. + """ + if self.sync_with_disk == False: + raise DiskAccessRequired("set version") + self.vcs.mkdir(self.get_path()) + self.vcs.set_file_contents(self.get_path("version"), + upgrade.BUGDIR_DISK_VERSION+"\n") - def get_path(self, *args): - my_dir = os.path.join(self.root, ".be") - if len(args) == 0: - return my_dir - assert args[0] in ["version", "settings", "bugs"], str(args) - return os.path.join(my_dir, *args) + # methods controlling disk access - def _guess_rcs(self, allow_rcs_init=False): - deepdir = self.get_path() - if not os.path.exists(deepdir): - deepdir = os.path.dirname(deepdir) - new_rcs = rcs.detect_rcs(deepdir) - install = False - if new_rcs.name == "None": - if allow_rcs_init == True: - new_rcs = rcs.installed_rcs() - new_rcs.init(self.root) - return new_rcs + def set_sync_with_disk(self, value): + """ + Adjust .sync_with_disk for the BugDir and all it's children. + See the BugDir docstring for a description of the role of + .sync_with_disk. + """ + self.sync_with_disk = value + for bug in self: + bug.set_sync_with_disk(value) def load(self): - version = self.get_version(use_none_rcs=True) - if version != TREE_VERSION_STRING: - raise NotImplementedError, \ - "BugDir cannot handle version '%s' yet." % version + """ + Reqires disk access + """ + version = self.get_version(use_none_vcs=True) + if version != upgrade.BUGDIR_DISK_VERSION: + upgrade.upgrade(self.root, version) else: if not os.path.exists(self.get_path()): raise NoBugDir(self.get_path()) self.load_settings() - self.rcs = rcs.rcs_by_name(self.rcs_name) - self._setup_user_id(self.user_id) - def load_all_bugs(self): - "Warning: this could take a while." + """ + Requires disk access. + Warning: this could take a while. + """ + if self.sync_with_disk == False: + raise DiskAccessRequired("load all bugs") self._clear_bugs() for uuid in self.list_uuids(): self._load_bug(uuid) def save(self): """ + Note that this command writes to disk _regardless_ of the + status of .sync_with_disk. + Save any loaded contents to disk. Because of lazy loading of bugs and comments, this is actually not too inefficient. - However, if self.sync_with_disk = True, then any changes are + However, if .sync_with_disk = True, then any changes are automatically written to disk as soon as they happen, so calling this method will just waste time (unless something else has been messing with your on-disk files). + + Requires disk access. """ + sync_with_disk = self.sync_with_disk + if sync_with_disk == False: + self.set_sync_with_disk(True) self.set_version() self.save_settings() for bug in self: bug.save() + if sync_with_disk == False: + self.set_sync_with_disk(sync_with_disk) - def load_settings(self): - self.settings = self._get_settings(self.get_path("settings")) - self._setup_saved_settings() - self._setup_user_id(self.user_id) - self._setup_encoding(self.encoding) - self._setup_severities(self.severities) - self._setup_status(self.active_status, self.inactive_status) + # methods for managing duplicate BugDirs - def _get_settings(self, settings_path): - allow_no_rcs = not self.rcs.path_in_root(settings_path) - # allow_no_rcs=True should only be for the special case of - # configuring duplicate bugdir settings + def duplicate_bugdir(self, revision): + duplicate_path = self.vcs.duplicate_repo(revision) + duplicate_version_path = os.path.join(duplicate_path, ".be", "version") try: - settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs) - except rcs.NoSuchFile: - settings = {"rcs_name": "None"} - return settings - - def save_settings(self): - settings = self._get_saved_settings() - self._save_settings(self.get_path("settings"), settings) - - def _save_settings(self, settings_path, settings): - allow_no_rcs = not self.rcs.path_in_root(settings_path) - # allow_no_rcs=True should only be for the special case of - # configuring duplicate bugdir settings - self.rcs.mkdir(self.get_path(), allow_no_rcs) - mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs) - - def duplicate_bugdir(self, revision): - duplicate_path = self.rcs.duplicate_repo(revision) + version = self.get_version(duplicate_version_path, + for_duplicate_bugdir=True) + except DiskAccessRequired: + self.sync_with_disk = True # temporarily allow access + version = self.get_version(duplicate_version_path, + for_duplicate_bugdir=True) + self.sync_with_disk = False + if version != upgrade.BUGDIR_DISK_VERSION: + upgrade.upgrade(duplicate_path, version) - # setup revision RCS as None, since the duplicate may not be + # setup revision VCS as None, since the duplicate may not be # initialized for versioning duplicate_settings_path = os.path.join(duplicate_path, ".be", "settings") - duplicate_settings = self._get_settings(duplicate_settings_path) - if "rcs_name" in duplicate_settings: - duplicate_settings["rcs_name"] = "None" + duplicate_settings = self._get_settings(duplicate_settings_path, + for_duplicate_bugdir=True) + if "vcs_name" in duplicate_settings: + duplicate_settings["vcs_name"] = "None" duplicate_settings["user_id"] = self.user_id if "disabled" in bug.status_values: # Hack to support old versions of BE bugs duplicate_settings["inactive_status"] = self.inactive_status - self._save_settings(duplicate_settings_path, duplicate_settings) + self._save_settings(duplicate_settings_path, duplicate_settings, + for_duplicate_bugdir=True) return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings) def remove_duplicate_bugdir(self): - self.rcs.remove_duplicate_repo() + self.vcs.remove_duplicate_repo() + + # methods for managing bugs def list_uuids(self): uuids = [] - if os.path.exists(self.get_path()): + if self.sync_with_disk == True and os.path.exists(self.get_path()): # list the uuids on disk - for uuid in os.listdir(self.get_path("bugs")): - if not (uuid.startswith('.')): - uuids.append(uuid) - yield uuid + if os.path.exists(self.get_path("bugs")): + for uuid in os.listdir(self.get_path("bugs")): + if not (uuid.startswith('.')): + uuids.append(uuid) + yield uuid # and the ones that are still just in memory for bug in self: if bug.uuid not in uuids: @@ -476,6 +570,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and self._bug_map_gen() def _load_bug(self, uuid): + if self.sync_with_disk == False: + raise DiskAccessRequired("_load bug") bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True) self.append(bg) self._bug_map_gen() @@ -492,7 +588,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and def remove_bug(self, bug): self.remove(bug) - bug.remove() + if bug.sync_with_disk == True: + bug.remove() def bug_shortname(self, bug): """ @@ -514,12 +611,13 @@ settings easy. Don't set this attribute. Set .rcs instead, and def bug_from_shortname(self, shortname): """ - >>> bd = simple_bug_dir() + >>> bd = SimpleBugDir(sync_with_disk=False) >>> bug_a = bd.bug_from_shortname('a') >>> print type(bug_a) >>> print bug_a a:om: Bug A + >>> bd.cleanup() """ matches = [] self._bug_map_gen() @@ -530,7 +628,7 @@ settings easy. Don't set this attribute. Set .rcs instead, and raise MultipleBugMatches(shortname, matches) if len(matches) == 1: return self.bug_from_uuid(matches[0]) - raise KeyError("No bug matches %s" % shortname) + raise NoBugMatches(shortname) def bug_from_uuid(self, uuid): if not self.has_bug(uuid): @@ -548,41 +646,56 @@ settings easy. Don't set this attribute. Set .rcs instead, and return True -def simple_bug_dir(): +class SimpleBugDir (BugDir): """ - For testing - >>> bugdir = simple_bug_dir() - >>> ls = list(bugdir.list_uuids()) - >>> ls.sort() - >>> print ls + For testing. Set sync_with_disk==False for a memory-only bugdir. + >>> bugdir = SimpleBugDir() + >>> uuids = list(bugdir.list_uuids()) + >>> uuids.sort() + >>> print uuids ['a', 'b'] + >>> bugdir.cleanup() """ - dir = utility.Dir() - assert os.path.exists(dir.path) - bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True, + def __init__(self, sync_with_disk=True): + if sync_with_disk == True: + dir = utility.Dir() + assert os.path.exists(dir.path) + root = dir.path + assert_new_BugDir = True + vcs_init = True + else: + root = "/" + assert_new_BugDir = False + vcs_init = False + BugDir.__init__(self, root, sink_to_existing_root=False, + assert_new_BugDir=assert_new_BugDir, + allow_vcs_init=vcs_init, manipulate_encodings=False) - bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir. - bug_a = bugdir.new_bug("a", summary="Bug A") - bug_a.creator = "John Doe " - bug_a.time = 0 - bug_b = bugdir.new_bug("b", summary="Bug B") - bug_b.creator = "Jane Doe " - bug_b.time = 0 - bug_b.status = "closed" - bugdir.save() - return bugdir - + if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir. + self._dir_ref = dir + bug_a = self.new_bug("a", summary="Bug A") + bug_a.creator = "John Doe " + bug_a.time = 0 + bug_b = self.new_bug("b", summary="Bug B") + bug_b.creator = "Jane Doe " + bug_b.time = 0 + bug_b.status = "closed" + if sync_with_disk == True: + self.save() + self.set_sync_with_disk(True) + def cleanup(self): + if hasattr(self, "_dir_ref"): + self._dir_ref.cleanup() + BugDir.cleanup(self) class BugDirTestCase(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) def setUp(self): self.dir = utility.Dir() self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, - allow_rcs_init=True) - self.rcs = self.bugdir.rcs + allow_vcs_init=True) + self.vcs = self.bugdir.vcs def tearDown(self): - self.rcs.cleanup() + self.bugdir.cleanup() self.dir.cleanup() def fullPath(self, path): return os.path.join(self.dir.path, path) @@ -593,13 +706,13 @@ class BugDirTestCase(unittest.TestCase): self.assertRaises(AlreadyInitialized, BugDir, self.dir.path, assertNewBugDir=True) def versionTest(self): - if self.rcs.versioned == False: + if self.vcs.versioned == False: return - original = self.bugdir.rcs.commit("Began versioning") + original = self.bugdir.vcs.commit("Began versioning") bugA = self.bugdir.bug_from_uuid("a") bugA.status = "fixed" self.bugdir.save() - new = self.rcs.commit("Fixed bug a") + new = self.vcs.commit("Fixed bug a") dupdir = self.bugdir.duplicate_bugdir(original) self.failUnless(dupdir.root != self.bugdir.root, "%s, %s" % (dupdir.root, self.bugdir.root)) @@ -645,17 +758,19 @@ class BugDirTestCase(unittest.TestCase): rep.new_reply("And they have six legs.") if sync_with_disk == False: self.bugdir.save() + self.bugdir.set_sync_with_disk(True) self.bugdir._clear_bugs() bug = self.bugdir.bug_from_uuid("a") bug.load_comments() + if sync_with_disk == False: + self.bugdir.set_sync_with_disk(False) self.failUnless(len(bug.comment_root)==1, len(bug.comment_root)) for index,comment in enumerate(bug.comments()): if index == 0: repLoaded = comment self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid) - self.failUnless(comment.sync_with_disk == True, + self.failUnless(comment.sync_with_disk == sync_with_disk, comment.sync_with_disk) - #load_settings() self.failUnless(comment.content_type == "text/plain", comment.content_type) self.failUnless(repLoaded.settings["Content-type"]=="text/plain", @@ -672,5 +787,46 @@ class BugDirTestCase(unittest.TestCase): def testSyncedComments(self): self.testComments(sync_with_disk=True) -unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) -suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()]) +class SimpleBugDirTestCase (unittest.TestCase): + def setUp(self): + # create a pre-existing bugdir in a temporary directory + self.dir = utility.Dir() + self.original_working_dir = os.getcwd() + os.chdir(self.dir.path) + self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, + allow_vcs_init=True) + self.bugdir.new_bug("preexisting", summary="Hopefully not imported") + self.bugdir.save() + def tearDown(self): + os.chdir(self.original_working_dir) + self.bugdir.cleanup() + self.dir.cleanup() + def testOnDiskCleanLoad(self): + """SimpleBugDir(sync_with_disk==True) should not import preexisting bugs.""" + bugdir = SimpleBugDir(sync_with_disk=True) + self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk) + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + bugdir._clear_bugs() + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == [], uuids) + bugdir.load_all_bugs() + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + bugdir.cleanup() + def testInMemoryCleanLoad(self): + """SimpleBugDir(sync_with_disk==False) should not import preexisting bugs.""" + bugdir = SimpleBugDir(sync_with_disk=False) + self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk) + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs) + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + bugdir._clear_bugs() + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == [], uuids) + bugdir.cleanup() + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) -- cgit