aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/bugdir.py
diff options
context:
space:
mode:
authorGianluca Montecchi <gian@grys.it>2009-10-02 23:46:24 +0200
committerGianluca Montecchi <gian@grys.it>2009-10-02 23:46:24 +0200
commitb0b3c9473e3a4b728ea72a2876e39fe41284a9ed (patch)
tree533a389e877b4b1a9c4099bb419eb221b2f12ada /libbe/bugdir.py
parent071fef7c351c4fc23696aa6db693411b78da2edb (diff)
downloadbugseverywhere-b0b3c9473e3a4b728ea72a2876e39fe41284a9ed.tar.gz
Merged with Trevor's -rr branch
Diffstat (limited to 'libbe/bugdir.py')
-rw-r--r--libbe/bugdir.py476
1 files changed, 316 insertions, 160 deletions
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 6e020ee..c4f0f91 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -17,23 +17,30 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the BugDir class for representing bug comments.
+"""
+
+import copy
+import errno
import os
import os.path
-import errno
+import sys
import time
-import copy
import unittest
import doctest
+import bug
+import encoding
from properties import Property, doc_property, local_property, \
defaulting_property, checked_property, fn_checked_property, \
cached_property, primed_property, change_hook_property, \
settings_property
-import settings_object
import mapfile
-import bug
-import rcs
-import encoding
+import vcs
+import settings_object
+import upgrade
import utility
@@ -62,8 +69,16 @@ class MultipleBugMatches(ValueError):
self.shortname = shortname
self.matches = matches
+class NoBugMatches(KeyError):
+ def __init__(self, shortname):
+ msg = "No bug matches %s" % shortname
+ KeyError.__init__(self, msg)
+ self.shortname = shortname
-TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
+class DiskAccessRequired (Exception):
+ def __init__(self, goal):
+ msg = "Cannot %s without accessing the disk" % goal
+ Exception.__init__(self, msg)
class BugDir (list, settings_object.SavedSettingsObject):
@@ -99,7 +114,8 @@ class BugDir (list, settings_object.SavedSettingsObject):
all bugs/comments/etc. that have been loaded into memory. If
you've been living in memory and want to move to
.sync_with_disk==True, but you're not sure if anything has been
- changed in memoryy, a call to save() is a safe move.
+ changed in memory, a call to save() immediately before the
+ .set_sync_with_disk(True) call is a safe move.
Regardless of .sync_with_disk, a call to .save() will write out
all the contents that the BugDir instance has loaded into memory.
@@ -107,15 +123,14 @@ class BugDir (list, settings_object.SavedSettingsObject):
changes, this .save() call will be a waste of time.
The BugDir will only load information from the file system when it
- loads new bugs/comments that it doesn't already have in memory, or
- when it explicitly asked to do so (e.g. .load() or
- __init__(from_disk=True)).
+ loads new settings/bugs/comments that it doesn't already have in
+ memory and .sync_with_disk == True.
- Allow RCS initialization
+ Allow VCS initialization
========================
This one is for testing purposes. Setting it to True allows the
- BugDir to search for an installed RCS backend and initialize it in
+ BugDir to search for an installed VCS backend and initialize it in
the root directory. This is a convenience option for supporting
tests of versioning functionality (e.g. .duplicate_bugdir).
@@ -172,9 +187,9 @@ class BugDir (list, settings_object.SavedSettingsObject):
def encoding(): return {}
def _setup_user_id(self, user_id):
- self.rcs.user_id = user_id
+ self.vcs.user_id = user_id
def _guess_user_id(self):
- return self.rcs.get_user_id()
+ return self.vcs.get_user_id()
def _set_user_id(self, old_user_id, new_user_id):
self._setup_user_id(new_user_id)
self._prop_save_settings(old_user_id, new_user_id)
@@ -182,7 +197,7 @@ class BugDir (list, settings_object.SavedSettingsObject):
@_versioned_property(name="user_id",
doc=
"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
-that the Arch RCS backend *enforces* ids with this format.""",
+that the Arch VCS backend *enforces* ids with this format.""",
change_hook=_set_user_id,
generator=_guess_user_id)
def user_id(): return {}
@@ -192,32 +207,32 @@ that the Arch RCS backend *enforces* ids with this format.""",
"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
def default_assignee(): return {}
- @_versioned_property(name="rcs_name",
- doc="""The name of the current RCS. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .rcs instead, and
-.rcs_name will be automatically adjusted.""",
+ @_versioned_property(name="vcs_name",
+ doc="""The name of the current VCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .vcs instead, and
+.vcs_name will be automatically adjusted.""",
default="None",
allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
- def rcs_name(): return {}
+ def vcs_name(): return {}
- def _get_rcs(self, rcs_name=None):
+ def _get_vcs(self, vcs_name=None):
"""Get and root a new revision control system"""
- if rcs_name == None:
- rcs_name = self.rcs_name
- new_rcs = rcs.rcs_by_name(rcs_name)
- self._change_rcs(None, new_rcs)
- return new_rcs
- def _change_rcs(self, old_rcs, new_rcs):
- new_rcs.encoding = self.encoding
- new_rcs.root(self.root)
- self.rcs_name = new_rcs.name
+ if vcs_name == None:
+ vcs_name = self.vcs_name
+ new_vcs = vcs.vcs_by_name(vcs_name)
+ self._change_vcs(None, new_vcs)
+ return new_vcs
+ def _change_vcs(self, old_vcs, new_vcs):
+ new_vcs.encoding = self.encoding
+ new_vcs.root(self.root)
+ self.vcs_name = new_vcs.name
@Property
- @change_hook_property(hook=_change_rcs)
- @cached_property(generator=_get_rcs)
- @local_property("rcs")
+ @change_hook_property(hook=_change_vcs)
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
@doc_property(doc="A revision control system instance.")
- def rcs(): return {}
+ def vcs(): return {}
def _bug_map_gen(self):
map = {}
@@ -279,9 +294,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and
def __init__(self, root=None, sink_to_existing_root=True,
- assert_new_BugDir=False, allow_rcs_init=False,
- manipulate_encodings=True,
- from_disk=False, rcs=None):
+ assert_new_BugDir=False, allow_vcs_init=False,
+ manipulate_encodings=True, from_disk=False, vcs=None):
list.__init__(self)
settings_object.SavedSettingsObject.__init__(self)
self._manipulate_encodings = manipulate_encodings
@@ -293,9 +307,9 @@ settings easy. Don't set this attribute. Set .rcs instead, and
if not os.path.exists(root):
raise NoRootEntry(root)
self.root = root
- # get a temporary rcs until we've loaded settings
+ # get a temporary vcs until we've loaded settings
self.sync_with_disk = False
- self.rcs = self._guess_rcs()
+ self.vcs = self._guess_vcs()
if from_disk == True:
self.sync_with_disk = True
@@ -305,20 +319,24 @@ settings easy. Don't set this attribute. Set .rcs instead, and
if assert_new_BugDir == True:
if os.path.exists(self.get_path()):
raise AlreadyInitialized, self.get_path()
- if rcs == None:
- rcs = self._guess_rcs(allow_rcs_init)
- self.rcs = rcs
+ if vcs == None:
+ vcs = self._guess_vcs(allow_vcs_init)
+ self.vcs = vcs
self._setup_user_id(self.user_id)
- def set_sync_with_disk(self, value):
- self.sync_with_disk = value
- for bug in self:
- bug.set_sync_with_disk(value)
+ def __del__(self):
+ self.cleanup()
+
+ def cleanup(self):
+ self.vcs.cleanup()
+
+ # methods for getting the BugDir situated in the filesystem
def _find_root(self, path):
"""
Search for an existing bug database dir and it's ancestors and
- return a BugDir rooted there.
+ return a BugDir rooted there. Only called by __init__, and
+ then only if sink_to_existing_root == True.
"""
if not os.path.exists(path):
raise NoRootEntry(path)
@@ -334,136 +352,212 @@ settings easy. Don't set this attribute. Set .rcs instead, and
raise NoBugDir(path)
return beroot
- def get_version(self, path=None, use_none_rcs=False):
- if use_none_rcs == True:
- RCS = rcs.rcs_by_name("None")
- RCS.root(self.root)
- RCS.encoding = encoding.get_encoding()
+ def _guess_vcs(self, allow_vcs_init=False):
+ """
+ Only called by __init__.
+ """
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_vcs = vcs.detect_vcs(deepdir)
+ install = False
+ if new_vcs.name == "None":
+ if allow_vcs_init == True:
+ new_vcs = vcs.installed_vcs()
+ new_vcs.init(self.root)
+ return new_vcs
+
+ # methods for saving/loading/accessing settings and properties.
+
+ def get_path(self, *args):
+ """
+ Return a path relative to .root.
+ """
+ dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(dir, *args)
+
+ def _get_settings(self, settings_path, for_duplicate_bugdir=False):
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_get settings")
+ try:
+ settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs)
+ except vcs.NoSuchFile:
+ settings = {"vcs_name": "None"}
+ return settings
+
+ def _save_settings(self, settings_path, settings,
+ for_duplicate_bugdir=False):
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_save settings")
+ self.vcs.mkdir(self.get_path(), allow_no_vcs)
+ mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs)
+
+ def load_settings(self):
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._setup_saved_settings()
+ self._setup_user_id(self.user_id)
+ self._setup_encoding(self.encoding)
+ self._setup_severities(self.severities)
+ self._setup_status(self.active_status, self.inactive_status)
+ self.vcs = vcs.vcs_by_name(self.vcs_name)
+ self._setup_user_id(self.user_id)
+
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self._save_settings(self.get_path("settings"), settings)
+
+ def get_version(self, path=None, use_none_vcs=False,
+ for_duplicate_bugdir=False):
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("get version")
+ if use_none_vcs == True:
+ VCS = vcs.vcs_by_name("None")
+ VCS.root(self.root)
+ VCS.encoding = encoding.get_encoding()
else:
- RCS = self.rcs
+ VCS = self.vcs
if path == None:
path = self.get_path("version")
- tree_version = RCS.get_file_contents(path)
- return tree_version
+ allow_no_vcs = not VCS.path_in_root(path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ version = VCS.get_file_contents(
+ path, allow_no_vcs=allow_no_vcs).rstrip("\n")
+ return version
def set_version(self):
- self.rcs.mkdir(self.get_path())
- self.rcs.set_file_contents(self.get_path("version"),
- TREE_VERSION_STRING)
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("set version")
+ self.vcs.mkdir(self.get_path())
+ self.vcs.set_file_contents(self.get_path("version"),
+ upgrade.BUGDIR_DISK_VERSION+"\n")
- def get_path(self, *args):
- my_dir = os.path.join(self.root, ".be")
- if len(args) == 0:
- return my_dir
- assert args[0] in ["version", "settings", "bugs"], str(args)
- return os.path.join(my_dir, *args)
+ # methods controlling disk access
- def _guess_rcs(self, allow_rcs_init=False):
- deepdir = self.get_path()
- if not os.path.exists(deepdir):
- deepdir = os.path.dirname(deepdir)
- new_rcs = rcs.detect_rcs(deepdir)
- install = False
- if new_rcs.name == "None":
- if allow_rcs_init == True:
- new_rcs = rcs.installed_rcs()
- new_rcs.init(self.root)
- return new_rcs
+ def set_sync_with_disk(self, value):
+ """
+ Adjust .sync_with_disk for the BugDir and all it's children.
+ See the BugDir docstring for a description of the role of
+ .sync_with_disk.
+ """
+ self.sync_with_disk = value
+ for bug in self:
+ bug.set_sync_with_disk(value)
def load(self):
- version = self.get_version(use_none_rcs=True)
- if version != TREE_VERSION_STRING:
- raise NotImplementedError, \
- "BugDir cannot handle version '%s' yet." % version
+ """
+ Reqires disk access
+ """
+ version = self.get_version(use_none_vcs=True)
+ if version != upgrade.BUGDIR_DISK_VERSION:
+ upgrade.upgrade(self.root, version)
else:
if not os.path.exists(self.get_path()):
raise NoBugDir(self.get_path())
self.load_settings()
- self.rcs = rcs.rcs_by_name(self.rcs_name)
- self._setup_user_id(self.user_id)
-
def load_all_bugs(self):
- "Warning: this could take a while."
+ """
+ Requires disk access.
+ Warning: this could take a while.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load all bugs")
self._clear_bugs()
for uuid in self.list_uuids():
self._load_bug(uuid)
def save(self):
"""
+ Note that this command writes to disk _regardless_ of the
+ status of .sync_with_disk.
+
Save any loaded contents to disk. Because of lazy loading of
bugs and comments, this is actually not too inefficient.
- However, if self.sync_with_disk = True, then any changes are
+ However, if .sync_with_disk = True, then any changes are
automatically written to disk as soon as they happen, so
calling this method will just waste time (unless something
else has been messing with your on-disk files).
+
+ Requires disk access.
"""
+ sync_with_disk = self.sync_with_disk
+ if sync_with_disk == False:
+ self.set_sync_with_disk(True)
self.set_version()
self.save_settings()
for bug in self:
bug.save()
+ if sync_with_disk == False:
+ self.set_sync_with_disk(sync_with_disk)
- def load_settings(self):
- self.settings = self._get_settings(self.get_path("settings"))
- self._setup_saved_settings()
- self._setup_user_id(self.user_id)
- self._setup_encoding(self.encoding)
- self._setup_severities(self.severities)
- self._setup_status(self.active_status, self.inactive_status)
+ # methods for managing duplicate BugDirs
- def _get_settings(self, settings_path):
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- # allow_no_rcs=True should only be for the special case of
- # configuring duplicate bugdir settings
+ def duplicate_bugdir(self, revision):
+ duplicate_path = self.vcs.duplicate_repo(revision)
+ duplicate_version_path = os.path.join(duplicate_path, ".be", "version")
try:
- settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
- except rcs.NoSuchFile:
- settings = {"rcs_name": "None"}
- return settings
-
- def save_settings(self):
- settings = self._get_saved_settings()
- self._save_settings(self.get_path("settings"), settings)
-
- def _save_settings(self, settings_path, settings):
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- # allow_no_rcs=True should only be for the special case of
- # configuring duplicate bugdir settings
- self.rcs.mkdir(self.get_path(), allow_no_rcs)
- mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
-
- def duplicate_bugdir(self, revision):
- duplicate_path = self.rcs.duplicate_repo(revision)
+ version = self.get_version(duplicate_version_path,
+ for_duplicate_bugdir=True)
+ except DiskAccessRequired:
+ self.sync_with_disk = True # temporarily allow access
+ version = self.get_version(duplicate_version_path,
+ for_duplicate_bugdir=True)
+ self.sync_with_disk = False
+ if version != upgrade.BUGDIR_DISK_VERSION:
+ upgrade.upgrade(duplicate_path, version)
- # setup revision RCS as None, since the duplicate may not be
+ # setup revision VCS as None, since the duplicate may not be
# initialized for versioning
duplicate_settings_path = os.path.join(duplicate_path,
".be", "settings")
- duplicate_settings = self._get_settings(duplicate_settings_path)
- if "rcs_name" in duplicate_settings:
- duplicate_settings["rcs_name"] = "None"
+ duplicate_settings = self._get_settings(duplicate_settings_path,
+ for_duplicate_bugdir=True)
+ if "vcs_name" in duplicate_settings:
+ duplicate_settings["vcs_name"] = "None"
duplicate_settings["user_id"] = self.user_id
if "disabled" in bug.status_values:
# Hack to support old versions of BE bugs
duplicate_settings["inactive_status"] = self.inactive_status
- self._save_settings(duplicate_settings_path, duplicate_settings)
+ self._save_settings(duplicate_settings_path, duplicate_settings,
+ for_duplicate_bugdir=True)
return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
def remove_duplicate_bugdir(self):
- self.rcs.remove_duplicate_repo()
+ self.vcs.remove_duplicate_repo()
+
+ # methods for managing bugs
def list_uuids(self):
uuids = []
- if os.path.exists(self.get_path()):
+ if self.sync_with_disk == True and os.path.exists(self.get_path()):
# list the uuids on disk
- for uuid in os.listdir(self.get_path("bugs")):
- if not (uuid.startswith('.')):
- uuids.append(uuid)
- yield uuid
+ if os.path.exists(self.get_path("bugs")):
+ for uuid in os.listdir(self.get_path("bugs")):
+ if not (uuid.startswith('.')):
+ uuids.append(uuid)
+ yield uuid
# and the ones that are still just in memory
for bug in self:
if bug.uuid not in uuids:
@@ -476,6 +570,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and
self._bug_map_gen()
def _load_bug(self, uuid):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("_load bug")
bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
self.append(bg)
self._bug_map_gen()
@@ -492,7 +588,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and
def remove_bug(self, bug):
self.remove(bug)
- bug.remove()
+ if bug.sync_with_disk == True:
+ bug.remove()
def bug_shortname(self, bug):
"""
@@ -514,12 +611,13 @@ settings easy. Don't set this attribute. Set .rcs instead, and
def bug_from_shortname(self, shortname):
"""
- >>> bd = simple_bug_dir()
+ >>> bd = SimpleBugDir(sync_with_disk=False)
>>> bug_a = bd.bug_from_shortname('a')
>>> print type(bug_a)
<class 'libbe.bug.Bug'>
>>> print bug_a
a:om: Bug A
+ >>> bd.cleanup()
"""
matches = []
self._bug_map_gen()
@@ -530,7 +628,7 @@ settings easy. Don't set this attribute. Set .rcs instead, and
raise MultipleBugMatches(shortname, matches)
if len(matches) == 1:
return self.bug_from_uuid(matches[0])
- raise KeyError("No bug matches %s" % shortname)
+ raise NoBugMatches(shortname)
def bug_from_uuid(self, uuid):
if not self.has_bug(uuid):
@@ -548,41 +646,56 @@ settings easy. Don't set this attribute. Set .rcs instead, and
return True
-def simple_bug_dir():
+class SimpleBugDir (BugDir):
"""
- For testing
- >>> bugdir = simple_bug_dir()
- >>> ls = list(bugdir.list_uuids())
- >>> ls.sort()
- >>> print ls
+ For testing. Set sync_with_disk==False for a memory-only bugdir.
+ >>> bugdir = SimpleBugDir()
+ >>> uuids = list(bugdir.list_uuids())
+ >>> uuids.sort()
+ >>> print uuids
['a', 'b']
+ >>> bugdir.cleanup()
"""
- dir = utility.Dir()
- assert os.path.exists(dir.path)
- bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True,
+ def __init__(self, sync_with_disk=True):
+ if sync_with_disk == True:
+ dir = utility.Dir()
+ assert os.path.exists(dir.path)
+ root = dir.path
+ assert_new_BugDir = True
+ vcs_init = True
+ else:
+ root = "/"
+ assert_new_BugDir = False
+ vcs_init = False
+ BugDir.__init__(self, root, sink_to_existing_root=False,
+ assert_new_BugDir=assert_new_BugDir,
+ allow_vcs_init=vcs_init,
manipulate_encodings=False)
- bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
- bug_a = bugdir.new_bug("a", summary="Bug A")
- bug_a.creator = "John Doe <jdoe@example.com>"
- bug_a.time = 0
- bug_b = bugdir.new_bug("b", summary="Bug B")
- bug_b.creator = "Jane Doe <jdoe@example.com>"
- bug_b.time = 0
- bug_b.status = "closed"
- bugdir.save()
- return bugdir
-
+ if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir.
+ self._dir_ref = dir
+ bug_a = self.new_bug("a", summary="Bug A")
+ bug_a.creator = "John Doe <jdoe@example.com>"
+ bug_a.time = 0
+ bug_b = self.new_bug("b", summary="Bug B")
+ bug_b.creator = "Jane Doe <jdoe@example.com>"
+ bug_b.time = 0
+ bug_b.status = "closed"
+ if sync_with_disk == True:
+ self.save()
+ self.set_sync_with_disk(True)
+ def cleanup(self):
+ if hasattr(self, "_dir_ref"):
+ self._dir_ref.cleanup()
+ BugDir.cleanup(self)
class BugDirTestCase(unittest.TestCase):
- def __init__(self, *args, **kwargs):
- unittest.TestCase.__init__(self, *args, **kwargs)
def setUp(self):
self.dir = utility.Dir()
self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_rcs_init=True)
- self.rcs = self.bugdir.rcs
+ allow_vcs_init=True)
+ self.vcs = self.bugdir.vcs
def tearDown(self):
- self.rcs.cleanup()
+ self.bugdir.cleanup()
self.dir.cleanup()
def fullPath(self, path):
return os.path.join(self.dir.path, path)
@@ -593,13 +706,13 @@ class BugDirTestCase(unittest.TestCase):
self.assertRaises(AlreadyInitialized, BugDir,
self.dir.path, assertNewBugDir=True)
def versionTest(self):
- if self.rcs.versioned == False:
+ if self.vcs.versioned == False:
return
- original = self.bugdir.rcs.commit("Began versioning")
+ original = self.bugdir.vcs.commit("Began versioning")
bugA = self.bugdir.bug_from_uuid("a")
bugA.status = "fixed"
self.bugdir.save()
- new = self.rcs.commit("Fixed bug a")
+ new = self.vcs.commit("Fixed bug a")
dupdir = self.bugdir.duplicate_bugdir(original)
self.failUnless(dupdir.root != self.bugdir.root,
"%s, %s" % (dupdir.root, self.bugdir.root))
@@ -645,17 +758,19 @@ class BugDirTestCase(unittest.TestCase):
rep.new_reply("And they have six legs.")
if sync_with_disk == False:
self.bugdir.save()
+ self.bugdir.set_sync_with_disk(True)
self.bugdir._clear_bugs()
bug = self.bugdir.bug_from_uuid("a")
bug.load_comments()
+ if sync_with_disk == False:
+ self.bugdir.set_sync_with_disk(False)
self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
for index,comment in enumerate(bug.comments()):
if index == 0:
repLoaded = comment
self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
- self.failUnless(comment.sync_with_disk == True,
+ self.failUnless(comment.sync_with_disk == sync_with_disk,
comment.sync_with_disk)
- #load_settings()
self.failUnless(comment.content_type == "text/plain",
comment.content_type)
self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
@@ -672,5 +787,46 @@ class BugDirTestCase(unittest.TestCase):
def testSyncedComments(self):
self.testComments(sync_with_disk=True)
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
-suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()])
+class SimpleBugDirTestCase (unittest.TestCase):
+ def setUp(self):
+ # create a pre-existing bugdir in a temporary directory
+ self.dir = utility.Dir()
+ self.original_working_dir = os.getcwd()
+ os.chdir(self.dir.path)
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_vcs_init=True)
+ self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
+ self.bugdir.save()
+ def tearDown(self):
+ os.chdir(self.original_working_dir)
+ self.bugdir.cleanup()
+ self.dir.cleanup()
+ def testOnDiskCleanLoad(self):
+ """SimpleBugDir(sync_with_disk==True) should not import preexisting bugs."""
+ bugdir = SimpleBugDir(sync_with_disk=True)
+ self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.load_all_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir.cleanup()
+ def testInMemoryCleanLoad(self):
+ """SimpleBugDir(sync_with_disk==False) should not import preexisting bugs."""
+ bugdir = SimpleBugDir(sync_with_disk=False)
+ self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.cleanup()
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])