aboutsummaryrefslogtreecommitdiffstats
path: root/interfaces/email/interactive/libbe/bugdir.py
diff options
context:
space:
mode:
authorGianluca Montecchi <gian@grys.it>2009-10-02 23:46:24 +0200
committerGianluca Montecchi <gian@grys.it>2009-10-02 23:46:24 +0200
commitb0b3c9473e3a4b728ea72a2876e39fe41284a9ed (patch)
tree533a389e877b4b1a9c4099bb419eb221b2f12ada /interfaces/email/interactive/libbe/bugdir.py
parent071fef7c351c4fc23696aa6db693411b78da2edb (diff)
downloadbugseverywhere-b0b3c9473e3a4b728ea72a2876e39fe41284a9ed.tar.gz
Merged with Trevor's -rr branch
Diffstat (limited to 'interfaces/email/interactive/libbe/bugdir.py')
-rw-r--r--interfaces/email/interactive/libbe/bugdir.py832
1 files changed, 832 insertions, 0 deletions
diff --git a/interfaces/email/interactive/libbe/bugdir.py b/interfaces/email/interactive/libbe/bugdir.py
new file mode 100644
index 0000000..c4f0f91
--- /dev/null
+++ b/interfaces/email/interactive/libbe/bugdir.py
@@ -0,0 +1,832 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Alexander Belchenko <bialix@ukr.net>
+# Chris Ball <cjb@laptop.org>
+# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the BugDir class for representing bug comments.
+"""
+
+import copy
+import errno
+import os
+import os.path
+import sys
+import time
+import unittest
+import doctest
+
+import bug
+import encoding
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+import mapfile
+import vcs
+import settings_object
+import upgrade
+import utility
+
+
+class NoBugDir(Exception):
+ def __init__(self, path):
+ msg = "The directory \"%s\" has no bug directory." % path
+ Exception.__init__(self, msg)
+ self.path = path
+
+class NoRootEntry(Exception):
+ def __init__(self, path):
+ self.path = path
+ Exception.__init__(self, "Specified root does not exist: %s" % path)
+
+class AlreadyInitialized(Exception):
+ def __init__(self, path):
+ self.path = path
+ Exception.__init__(self,
+ "Specified root is already initialized: %s" % path)
+
+class MultipleBugMatches(ValueError):
+ def __init__(self, shortname, matches):
+ msg = ("More than one bug matches %s. "
+ "Please be more specific.\n%s" % (shortname, matches))
+ ValueError.__init__(self, msg)
+ self.shortname = shortname
+ self.matches = matches
+
+class NoBugMatches(KeyError):
+ def __init__(self, shortname):
+ msg = "No bug matches %s" % shortname
+ KeyError.__init__(self, msg)
+ self.shortname = shortname
+
+class DiskAccessRequired (Exception):
+ def __init__(self, goal):
+ msg = "Cannot %s without accessing the disk" % goal
+ Exception.__init__(self, msg)
+
+
+class BugDir (list, settings_object.SavedSettingsObject):
+ """
+ Sink to existing root
+ ======================
+
+ Consider the following usage case:
+ You have a bug directory rooted in
+ /path/to/source
+ by which I mean the '.be' directory is at
+ /path/to/source/.be
+ However, you're of in some subdirectory like
+ /path/to/source/GUI/testing
+ and you want to comment on a bug. Setting sink_to_root=True wen
+ you initialize your BugDir will cause it to search for the '.be'
+ file in the ancestors of the path you passed in as 'root'.
+ /path/to/source/GUI/testing/.be miss
+ /path/to/source/GUI/.be miss
+ /path/to/source/.be hit!
+ So it still roots itself appropriately without much work for you.
+
+ File-system access
+ ==================
+
+ BugDirs live completely in memory when .sync_with_disk is False.
+ This is the default configuration setup by BugDir(from_disk=False).
+ If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
+ any changes to the BugDir will be immediately written to disk.
+
+ If you want to change .sync_with_disk, we suggest you use
+ .set_sync_with_disk(), which propogates the new setting through to
+ all bugs/comments/etc. that have been loaded into memory. If
+ you've been living in memory and want to move to
+ .sync_with_disk==True, but you're not sure if anything has been
+ changed in memory, a call to save() immediately before the
+ .set_sync_with_disk(True) call is a safe move.
+
+ Regardless of .sync_with_disk, a call to .save() will write out
+ all the contents that the BugDir instance has loaded into memory.
+ If sync_with_disk has been True over the course of all interesting
+ changes, this .save() call will be a waste of time.
+
+ The BugDir will only load information from the file system when it
+ loads new settings/bugs/comments that it doesn't already have in
+ memory and .sync_with_disk == True.
+
+ Allow VCS initialization
+ ========================
+
+ This one is for testing purposes. Setting it to True allows the
+ BugDir to search for an installed VCS backend and initialize it in
+ the root directory. This is a convenience option for supporting
+ tests of versioning functionality (e.g. .duplicate_bugdir).
+
+ Disable encoding manipulation
+ =============================
+
+ This one is for testing purposed. You might have non-ASCII
+ Unicode in your bugs, comments, files, etc. BugDir instances try
+ and support your preferred encoding scheme (e.g. "utf-8") when
+ dealing with stream and file input/output. For stream output,
+ this involves replacing sys.stdout and sys.stderr
+ (libbe.encode.set_IO_stream_encodings). However this messes up
+ doctest's output catching. In order to support doctest tests
+ using BugDirs, set manipulate_encodings=False, and stick to ASCII
+ in your tests.
+ """
+
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+
+ @_versioned_property(name="target",
+ doc="The current project development target.")
+ def target(): return {}
+
+ def _guess_encoding(self):
+ return encoding.get_encoding()
+ def _check_encoding(value):
+ if value != None:
+ return encoding.known_encoding(value)
+ def _setup_encoding(self, new_encoding):
+ # change hook called before generator.
+ if new_encoding not in [None, settings_object.EMPTY]:
+ if self._manipulate_encodings == True:
+ encoding.set_IO_stream_encodings(new_encoding)
+ def _set_encoding(self, old_encoding, new_encoding):
+ self._setup_encoding(new_encoding)
+ self._prop_save_settings(old_encoding, new_encoding)
+
+ @_versioned_property(name="encoding",
+ doc="""The default input/output encoding to use (e.g. "utf-8").""",
+ change_hook=_set_encoding,
+ generator=_guess_encoding,
+ check_fn=_check_encoding)
+ def encoding(): return {}
+
+ def _setup_user_id(self, user_id):
+ self.vcs.user_id = user_id
+ def _guess_user_id(self):
+ return self.vcs.get_user_id()
+ def _set_user_id(self, old_user_id, new_user_id):
+ self._setup_user_id(new_user_id)
+ self._prop_save_settings(old_user_id, new_user_id)
+
+ @_versioned_property(name="user_id",
+ doc=
+"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
+that the Arch VCS backend *enforces* ids with this format.""",
+ change_hook=_set_user_id,
+ generator=_guess_user_id)
+ def user_id(): return {}
+
+ @_versioned_property(name="default_assignee",
+ doc=
+"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
+ def default_assignee(): return {}
+
+ @_versioned_property(name="vcs_name",
+ doc="""The name of the current VCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .vcs instead, and
+.vcs_name will be automatically adjusted.""",
+ default="None",
+ allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
+ def vcs_name(): return {}
+
+ def _get_vcs(self, vcs_name=None):
+ """Get and root a new revision control system"""
+ if vcs_name == None:
+ vcs_name = self.vcs_name
+ new_vcs = vcs.vcs_by_name(vcs_name)
+ self._change_vcs(None, new_vcs)
+ return new_vcs
+ def _change_vcs(self, old_vcs, new_vcs):
+ new_vcs.encoding = self.encoding
+ new_vcs.root(self.root)
+ self.vcs_name = new_vcs.name
+
+ @Property
+ @change_hook_property(hook=_change_vcs)
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
+ @doc_property(doc="A revision control system instance.")
+ def vcs(): return {}
+
+ def _bug_map_gen(self):
+ map = {}
+ for bug in self:
+ map[bug.uuid] = bug
+ for uuid in self.list_uuids():
+ if uuid not in map:
+ map[uuid] = None
+ self._bug_map_value = map # ._bug_map_value used by @local_property
+
+ def _extra_strings_check_fn(value):
+ return utility.iterable_full_of_strings(value, \
+ alternative=settings_object.EMPTY)
+ def _extra_strings_change_hook(self, old, new):
+ self.extra_strings.sort() # to make merging easier
+ self._prop_save_settings(old, new)
+ @_versioned_property(name="extra_strings",
+ doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
+ default=[],
+ check_fn=_extra_strings_check_fn,
+ change_hook=_extra_strings_change_hook,
+ mutable=True)
+ def extra_strings(): return {}
+
+ @Property
+ @primed_property(primer=_bug_map_gen)
+ @local_property("bug_map")
+ @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
+ def _bug_map(): return {}
+
+ def _setup_severities(self, severities):
+ if severities not in [None, settings_object.EMPTY]:
+ bug.load_severities(severities)
+ def _set_severities(self, old_severities, new_severities):
+ self._setup_severities(new_severities)
+ self._prop_save_settings(old_severities, new_severities)
+ @_versioned_property(name="severities",
+ doc="The allowed bug severities and their descriptions.",
+ change_hook=_set_severities)
+ def severities(): return {}
+
+ def _setup_status(self, active_status, inactive_status):
+ bug.load_status(active_status, inactive_status)
+ def _set_active_status(self, old_active_status, new_active_status):
+ self._setup_status(new_active_status, self.inactive_status)
+ self._prop_save_settings(old_active_status, new_active_status)
+ @_versioned_property(name="active_status",
+ doc="The allowed active bug states and their descriptions.",
+ change_hook=_set_active_status)
+ def active_status(): return {}
+
+ def _set_inactive_status(self, old_inactive_status, new_inactive_status):
+ self._setup_status(self.active_status, new_inactive_status)
+ self._prop_save_settings(old_inactive_status, new_inactive_status)
+ @_versioned_property(name="inactive_status",
+ doc="The allowed inactive bug states and their descriptions.",
+ change_hook=_set_inactive_status)
+ def inactive_status(): return {}
+
+
+ def __init__(self, root=None, sink_to_existing_root=True,
+ assert_new_BugDir=False, allow_vcs_init=False,
+ manipulate_encodings=True, from_disk=False, vcs=None):
+ list.__init__(self)
+ settings_object.SavedSettingsObject.__init__(self)
+ self._manipulate_encodings = manipulate_encodings
+ if root == None:
+ root = os.getcwd()
+ if sink_to_existing_root == True:
+ self.root = self._find_root(root)
+ else:
+ if not os.path.exists(root):
+ raise NoRootEntry(root)
+ self.root = root
+ # get a temporary vcs until we've loaded settings
+ self.sync_with_disk = False
+ self.vcs = self._guess_vcs()
+
+ if from_disk == True:
+ self.sync_with_disk = True
+ self.load()
+ else:
+ self.sync_with_disk = False
+ if assert_new_BugDir == True:
+ if os.path.exists(self.get_path()):
+ raise AlreadyInitialized, self.get_path()
+ if vcs == None:
+ vcs = self._guess_vcs(allow_vcs_init)
+ self.vcs = vcs
+ self._setup_user_id(self.user_id)
+
+ def __del__(self):
+ self.cleanup()
+
+ def cleanup(self):
+ self.vcs.cleanup()
+
+ # methods for getting the BugDir situated in the filesystem
+
+ def _find_root(self, path):
+ """
+ Search for an existing bug database dir and it's ancestors and
+ return a BugDir rooted there. Only called by __init__, and
+ then only if sink_to_existing_root == True.
+ """
+ if not os.path.exists(path):
+ raise NoRootEntry(path)
+ versionfile=utility.search_parent_directories(path,
+ os.path.join(".be", "version"))
+ if versionfile != None:
+ beroot = os.path.dirname(versionfile)
+ root = os.path.dirname(beroot)
+ return root
+ else:
+ beroot = utility.search_parent_directories(path, ".be")
+ if beroot == None:
+ raise NoBugDir(path)
+ return beroot
+
+ def _guess_vcs(self, allow_vcs_init=False):
+ """
+ Only called by __init__.
+ """
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_vcs = vcs.detect_vcs(deepdir)
+ install = False
+ if new_vcs.name == "None":
+ if allow_vcs_init == True:
+ new_vcs = vcs.installed_vcs()
+ new_vcs.init(self.root)
+ return new_vcs
+
+ # methods for saving/loading/accessing settings and properties.
+
+ def get_path(self, *args):
+ """
+ Return a path relative to .root.
+ """
+ dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(dir, *args)
+
+ def _get_settings(self, settings_path, for_duplicate_bugdir=False):
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_get settings")
+ try:
+ settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs)
+ except vcs.NoSuchFile:
+ settings = {"vcs_name": "None"}
+ return settings
+
+ def _save_settings(self, settings_path, settings,
+ for_duplicate_bugdir=False):
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_save settings")
+ self.vcs.mkdir(self.get_path(), allow_no_vcs)
+ mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs)
+
+ def load_settings(self):
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._setup_saved_settings()
+ self._setup_user_id(self.user_id)
+ self._setup_encoding(self.encoding)
+ self._setup_severities(self.severities)
+ self._setup_status(self.active_status, self.inactive_status)
+ self.vcs = vcs.vcs_by_name(self.vcs_name)
+ self._setup_user_id(self.user_id)
+
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self._save_settings(self.get_path("settings"), settings)
+
+ def get_version(self, path=None, use_none_vcs=False,
+ for_duplicate_bugdir=False):
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("get version")
+ if use_none_vcs == True:
+ VCS = vcs.vcs_by_name("None")
+ VCS.root(self.root)
+ VCS.encoding = encoding.get_encoding()
+ else:
+ VCS = self.vcs
+
+ if path == None:
+ path = self.get_path("version")
+ allow_no_vcs = not VCS.path_in_root(path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ version = VCS.get_file_contents(
+ path, allow_no_vcs=allow_no_vcs).rstrip("\n")
+ return version
+
+ def set_version(self):
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("set version")
+ self.vcs.mkdir(self.get_path())
+ self.vcs.set_file_contents(self.get_path("version"),
+ upgrade.BUGDIR_DISK_VERSION+"\n")
+
+ # methods controlling disk access
+
+ def set_sync_with_disk(self, value):
+ """
+ Adjust .sync_with_disk for the BugDir and all it's children.
+ See the BugDir docstring for a description of the role of
+ .sync_with_disk.
+ """
+ self.sync_with_disk = value
+ for bug in self:
+ bug.set_sync_with_disk(value)
+
+ def load(self):
+ """
+ Reqires disk access
+ """
+ version = self.get_version(use_none_vcs=True)
+ if version != upgrade.BUGDIR_DISK_VERSION:
+ upgrade.upgrade(self.root, version)
+ else:
+ if not os.path.exists(self.get_path()):
+ raise NoBugDir(self.get_path())
+ self.load_settings()
+
+ def load_all_bugs(self):
+ """
+ Requires disk access.
+ Warning: this could take a while.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load all bugs")
+ self._clear_bugs()
+ for uuid in self.list_uuids():
+ self._load_bug(uuid)
+
+ def save(self):
+ """
+ Note that this command writes to disk _regardless_ of the
+ status of .sync_with_disk.
+
+ Save any loaded contents to disk. Because of lazy loading of
+ bugs and comments, this is actually not too inefficient.
+
+ However, if .sync_with_disk = True, then any changes are
+ automatically written to disk as soon as they happen, so
+ calling this method will just waste time (unless something
+ else has been messing with your on-disk files).
+
+ Requires disk access.
+ """
+ sync_with_disk = self.sync_with_disk
+ if sync_with_disk == False:
+ self.set_sync_with_disk(True)
+ self.set_version()
+ self.save_settings()
+ for bug in self:
+ bug.save()
+ if sync_with_disk == False:
+ self.set_sync_with_disk(sync_with_disk)
+
+ # methods for managing duplicate BugDirs
+
+ def duplicate_bugdir(self, revision):
+ duplicate_path = self.vcs.duplicate_repo(revision)
+
+ duplicate_version_path = os.path.join(duplicate_path, ".be", "version")
+ try:
+ version = self.get_version(duplicate_version_path,
+ for_duplicate_bugdir=True)
+ except DiskAccessRequired:
+ self.sync_with_disk = True # temporarily allow access
+ version = self.get_version(duplicate_version_path,
+ for_duplicate_bugdir=True)
+ self.sync_with_disk = False
+ if version != upgrade.BUGDIR_DISK_VERSION:
+ upgrade.upgrade(duplicate_path, version)
+
+ # setup revision VCS as None, since the duplicate may not be
+ # initialized for versioning
+ duplicate_settings_path = os.path.join(duplicate_path,
+ ".be", "settings")
+ duplicate_settings = self._get_settings(duplicate_settings_path,
+ for_duplicate_bugdir=True)
+ if "vcs_name" in duplicate_settings:
+ duplicate_settings["vcs_name"] = "None"
+ duplicate_settings["user_id"] = self.user_id
+ if "disabled" in bug.status_values:
+ # Hack to support old versions of BE bugs
+ duplicate_settings["inactive_status"] = self.inactive_status
+ self._save_settings(duplicate_settings_path, duplicate_settings,
+ for_duplicate_bugdir=True)
+
+ return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
+
+ def remove_duplicate_bugdir(self):
+ self.vcs.remove_duplicate_repo()
+
+ # methods for managing bugs
+
+ def list_uuids(self):
+ uuids = []
+ if self.sync_with_disk == True and os.path.exists(self.get_path()):
+ # list the uuids on disk
+ if os.path.exists(self.get_path("bugs")):
+ for uuid in os.listdir(self.get_path("bugs")):
+ if not (uuid.startswith('.')):
+ uuids.append(uuid)
+ yield uuid
+ # and the ones that are still just in memory
+ for bug in self:
+ if bug.uuid not in uuids:
+ uuids.append(bug.uuid)
+ yield bug.uuid
+
+ def _clear_bugs(self):
+ while len(self) > 0:
+ self.pop()
+ self._bug_map_gen()
+
+ def _load_bug(self, uuid):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("_load bug")
+ bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+
+ def new_bug(self, uuid=None, summary=None):
+ bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
+ bg.set_sync_with_disk(self.sync_with_disk)
+ if bg.sync_with_disk == True:
+ bg.save()
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+
+ def remove_bug(self, bug):
+ self.remove(bug)
+ if bug.sync_with_disk == True:
+ bug.remove()
+
+ def bug_shortname(self, bug):
+ """
+ Generate short names from uuids. Picks the minimum number of
+ characters (>=3) from the beginning of the uuid such that the
+ short names are unique.
+
+ Obviously, as the number of bugs in the database grows, these
+ short names will cease to be unique. The complete uuid should be
+ used for long term reference.
+ """
+ chars = 3
+ for uuid in self._bug_map.keys():
+ if bug.uuid == uuid:
+ continue
+ while (bug.uuid[:chars] == uuid[:chars]):
+ chars+=1
+ return bug.uuid[:chars]
+
+ def bug_from_shortname(self, shortname):
+ """
+ >>> bd = SimpleBugDir(sync_with_disk=False)
+ >>> bug_a = bd.bug_from_shortname('a')
+ >>> print type(bug_a)
+ <class 'libbe.bug.Bug'>
+ >>> print bug_a
+ a:om: Bug A
+ >>> bd.cleanup()
+ """
+ matches = []
+ self._bug_map_gen()
+ for uuid in self._bug_map.keys():
+ if uuid.startswith(shortname):
+ matches.append(uuid)
+ if len(matches) > 1:
+ raise MultipleBugMatches(shortname, matches)
+ if len(matches) == 1:
+ return self.bug_from_uuid(matches[0])
+ raise NoBugMatches(shortname)
+
+ def bug_from_uuid(self, uuid):
+ if not self.has_bug(uuid):
+ raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \
+ % (uuid, self._bug_map, self.root))
+ if self._bug_map[uuid] == None:
+ self._load_bug(uuid)
+ return self._bug_map[uuid]
+
+ def has_bug(self, bug_uuid):
+ if bug_uuid not in self._bug_map:
+ self._bug_map_gen()
+ if bug_uuid not in self._bug_map:
+ return False
+ return True
+
+
+class SimpleBugDir (BugDir):
+ """
+ For testing. Set sync_with_disk==False for a memory-only bugdir.
+ >>> bugdir = SimpleBugDir()
+ >>> uuids = list(bugdir.list_uuids())
+ >>> uuids.sort()
+ >>> print uuids
+ ['a', 'b']
+ >>> bugdir.cleanup()
+ """
+ def __init__(self, sync_with_disk=True):
+ if sync_with_disk == True:
+ dir = utility.Dir()
+ assert os.path.exists(dir.path)
+ root = dir.path
+ assert_new_BugDir = True
+ vcs_init = True
+ else:
+ root = "/"
+ assert_new_BugDir = False
+ vcs_init = False
+ BugDir.__init__(self, root, sink_to_existing_root=False,
+ assert_new_BugDir=assert_new_BugDir,
+ allow_vcs_init=vcs_init,
+ manipulate_encodings=False)
+ if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir.
+ self._dir_ref = dir
+ bug_a = self.new_bug("a", summary="Bug A")
+ bug_a.creator = "John Doe <jdoe@example.com>"
+ bug_a.time = 0
+ bug_b = self.new_bug("b", summary="Bug B")
+ bug_b.creator = "Jane Doe <jdoe@example.com>"
+ bug_b.time = 0
+ bug_b.status = "closed"
+ if sync_with_disk == True:
+ self.save()
+ self.set_sync_with_disk(True)
+ def cleanup(self):
+ if hasattr(self, "_dir_ref"):
+ self._dir_ref.cleanup()
+ BugDir.cleanup(self)
+
+class BugDirTestCase(unittest.TestCase):
+ def setUp(self):
+ self.dir = utility.Dir()
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_vcs_init=True)
+ self.vcs = self.bugdir.vcs
+ def tearDown(self):
+ self.bugdir.cleanup()
+ self.dir.cleanup()
+ def fullPath(self, path):
+ return os.path.join(self.dir.path, path)
+ def assertPathExists(self, path):
+ fullpath = self.fullPath(path)
+ self.failUnless(os.path.exists(fullpath)==True,
+ "path %s does not exist" % fullpath)
+ self.assertRaises(AlreadyInitialized, BugDir,
+ self.dir.path, assertNewBugDir=True)
+ def versionTest(self):
+ if self.vcs.versioned == False:
+ return
+ original = self.bugdir.vcs.commit("Began versioning")
+ bugA = self.bugdir.bug_from_uuid("a")
+ bugA.status = "fixed"
+ self.bugdir.save()
+ new = self.vcs.commit("Fixed bug a")
+ dupdir = self.bugdir.duplicate_bugdir(original)
+ self.failUnless(dupdir.root != self.bugdir.root,
+ "%s, %s" % (dupdir.root, self.bugdir.root))
+ bugAorig = dupdir.bug_from_uuid("a")
+ self.failUnless(bugA != bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ bugAorig.status = "fixed"
+ self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.status, bugAorig.status))
+ self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.severity, bugAorig.severity))
+ self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.assigned, bugAorig.assigned))
+ self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.time, bugAorig.time))
+ self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.creator, bugAorig.creator))
+ self.failUnless(bugA == bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ self.bugdir.remove_duplicate_bugdir()
+ self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root))
+ def testRun(self):
+ self.bugdir.new_bug(uuid="a", summary="Ant")
+ self.bugdir.new_bug(uuid="b", summary="Cockroach")
+ self.bugdir.new_bug(uuid="c", summary="Praying mantis")
+ length = len(self.bugdir)
+ self.failUnless(length == 3, "%d != 3 bugs" % length)
+ uuids = list(self.bugdir.list_uuids())
+ self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
+ self.failUnless(uuids == ["a","b","c"], str(uuids))
+ bugA = self.bugdir.bug_from_uuid("a")
+ bugAprime = self.bugdir.bug_from_shortname("a")
+ self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
+ self.bugdir.save()
+ self.versionTest()
+ def testComments(self, sync_with_disk=False):
+ if sync_with_disk == True:
+ self.bugdir.set_sync_with_disk(True)
+ self.bugdir.new_bug(uuid="a", summary="Ant")
+ bug = self.bugdir.bug_from_uuid("a")
+ comm = bug.comment_root
+ rep = comm.new_reply("Ants are small.")
+ rep.new_reply("And they have six legs.")
+ if sync_with_disk == False:
+ self.bugdir.save()
+ self.bugdir.set_sync_with_disk(True)
+ self.bugdir._clear_bugs()
+ bug = self.bugdir.bug_from_uuid("a")
+ bug.load_comments()
+ if sync_with_disk == False:
+ self.bugdir.set_sync_with_disk(False)
+ self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
+ for index,comment in enumerate(bug.comments()):
+ if index == 0:
+ repLoaded = comment
+ self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
+ self.failUnless(comment.sync_with_disk == sync_with_disk,
+ comment.sync_with_disk)
+ self.failUnless(comment.content_type == "text/plain",
+ comment.content_type)
+ self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
+ repLoaded.settings)
+ self.failUnless(repLoaded.body == "Ants are small.",
+ repLoaded.body)
+ elif index == 1:
+ self.failUnless(comment.in_reply_to == repLoaded.uuid,
+ repLoaded.uuid)
+ self.failUnless(comment.body == "And they have six legs.",
+ comment.body)
+ else:
+ self.failIf(True, "Invalid comment: %d\n%s" % (index, comment))
+ def testSyncedComments(self):
+ self.testComments(sync_with_disk=True)
+
+class SimpleBugDirTestCase (unittest.TestCase):
+ def setUp(self):
+ # create a pre-existing bugdir in a temporary directory
+ self.dir = utility.Dir()
+ self.original_working_dir = os.getcwd()
+ os.chdir(self.dir.path)
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_vcs_init=True)
+ self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
+ self.bugdir.save()
+ def tearDown(self):
+ os.chdir(self.original_working_dir)
+ self.bugdir.cleanup()
+ self.dir.cleanup()
+ def testOnDiskCleanLoad(self):
+ """SimpleBugDir(sync_with_disk==True) should not import preexisting bugs."""
+ bugdir = SimpleBugDir(sync_with_disk=True)
+ self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.load_all_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir.cleanup()
+ def testInMemoryCleanLoad(self):
+ """SimpleBugDir(sync_with_disk==False) should not import preexisting bugs."""
+ bugdir = SimpleBugDir(sync_with_disk=False)
+ self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.cleanup()
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])