aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'libbe')
-rw-r--r--libbe/bug.py48
-rw-r--r--libbe/bugdir.py903
-rw-r--r--libbe/comment.py57
-rw-r--r--libbe/storage/base.py19
-rw-r--r--libbe/storage/properties.py2
-rw-r--r--libbe/storage/vcs/base.py156
-rw-r--r--libbe/util/encoding.py23
-rw-r--r--libbe/util/id.py35
8 files changed, 618 insertions, 625 deletions
diff --git a/libbe/bug.py b/libbe/bug.py
index 1aa34fd..7bb52bc 100644
--- a/libbe/bug.py
+++ b/libbe/bug.py
@@ -177,7 +177,7 @@ class Bug(settings_object.SavedSettingsObject):
def _get_user_id(self):
if self.bugdir != None:
- return self.bugdir.get_user_id()
+ return self.bugdir._get_user_id()
return None
@_versioned_property(name="creator",
@@ -227,7 +227,7 @@ class Bug(settings_object.SavedSettingsObject):
def _get_comment_root(self, load_full=False):
if self.storage != None and self.storage.is_readable():
- return comment.loadComments(self, load_full=load_full)
+ return comment.load_comments(self, load_full=load_full)
else:
return comment.Comment(self, uuid=comment.INVALID_UUID)
@@ -247,18 +247,26 @@ class Bug(settings_object.SavedSettingsObject):
@doc_property(doc="A revision control system instance.")
def storage(): return {}
- def __init__(self, bugdir=None, uuid=None, from_disk=False,
+ def __init__(self, bugdir=None, uuid=None, from_storage=False,
load_comments=False, summary=None):
settings_object.SavedSettingsObject.__init__(self)
self.bugdir = bugdir
self.uuid = uuid
- if from_disk == False:
+ if from_storage == False:
if uuid == None:
self.uuid = libbe.util.id.uuid_gen()
self.settings = {}
self._setup_saved_settings()
+ if self.storage != None and self.storage.is_writeable():
+ self.storage.writeable = False
+ set_writeable = True
+ else:
+ set_writeable = False
self.time = int(time.time()) # only save to second precision
self.summary = summary
+ if set_writeable == True:
+ self.storage.writeable = True
+ self.save()
def __repr__(self):
return "Bug(uuid=%r)" % self.uuid
@@ -462,8 +470,9 @@ class Bug(settings_object.SavedSettingsObject):
if c.alt_id != None:
uuid_map[c.alt_id] = c
uuid_map[None] = self.comment_root
+ uuid_map[comment.INVALID_UUID] = self.comment_root
if default_parent != self.comment_root:
- assert default_parent.uuid in uuid_map, default_parent
+ assert default_parent.uuid in uuid_map, default_parent.uuid
for c in comments:
if c.in_reply_to == None \
and default_parent.uuid != comment.INVALID_UUID:
@@ -647,12 +656,15 @@ class Bug(settings_object.SavedSettingsObject):
def id(self, *args):
assert len(args) <= 1, str(args)
- assert args[0] in ["values"], str(args)
- return libbe.util.id.comment_id(self, args)
-
- def load_settings(self):
- mf = self.storage.get(self.id("values"), default="\n")
- self.settings = mapfile.parse(mf)
+ if len(args) == 1:
+ assert args[0] in ["values"], str(args)
+ return libbe.util.id.bug_id(self, *args)
+
+ def load_settings(self, settings_mapfile=None):
+ if settings_mapfile == None:
+ settings_mapfile = \
+ self.storage.get(self.id("values"), default="\n")
+ self.settings = mapfile.parse(settings_mapfile)
self._setup_saved_settings()
def save_settings(self):
@@ -661,8 +673,8 @@ class Bug(settings_object.SavedSettingsObject):
def save(self):
"""
- Save any loaded contents to disk. Because of lazy loading of
- comments, this is actually not too inefficient.
+ Save any loaded contents to storage. Because of lazy loading
+ of comments, this is actually not too inefficient.
However, if self.storage.is_writeable() == True, then any
changes are automatically written to storage as soon as they
@@ -670,11 +682,15 @@ class Bug(settings_object.SavedSettingsObject):
something else has been messing with your stored files).
"""
assert self.storage != None, "Can't save without storage"
- self.storage.add(self.id())
- self.storage.add(self.id('values'))
+ if self.bugdir != None:
+ parent = self.bugdir.id()
+ else:
+ parent = None
+ self.storage.add(self.id(), parent=parent)
+ self.storage.add(self.id('values'), parent=self.id())
self.save_settings()
if len(self.comment_root) > 0:
- comment.saveComments(self)
+ comment.save_comments(self)
def load_comments(self, load_full=True):
if load_full == True:
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 7005181..cf42747 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -27,24 +27,26 @@ import copy
import errno
import os
import os.path
-import sys
import time
import libbe
-import bug
-import encoding
-from properties import Property, doc_property, local_property, \
+import libbe.util.encoding as encoding
+import libbe.storage as storage
+from libbe.storage.properties import Property, doc_property, local_property, \
defaulting_property, checked_property, fn_checked_property, \
cached_property, primed_property, change_hook_property, \
settings_property
-import mapfile
-import vcs
-import settings_object
-import upgrade
-import utility
+import libbe.storage.settings_object as settings_object
+import libbe.storage.util.mapfile as mapfile
+import libbe.bug as bug
+import libbe.util.utility as utility
+
if libbe.TESTING == True:
- import unittest
import doctest
+ import sys
+ import unittest
+
+ import libbe.storage.base
class NoBugDir(Exception):
@@ -86,69 +88,7 @@ class DiskAccessRequired (Exception):
class BugDir (list, settings_object.SavedSettingsObject):
"""
- Sink to existing root
- ======================
-
- Consider the following usage case:
- You have a bug directory rooted in
- /path/to/source
- by which I mean the '.be' directory is at
- /path/to/source/.be
- However, you're of in some subdirectory like
- /path/to/source/GUI/testing
- and you want to comment on a bug. Setting sink_to_root=True wen
- you initialize your BugDir will cause it to search for the '.be'
- file in the ancestors of the path you passed in as 'root'.
- /path/to/source/GUI/testing/.be miss
- /path/to/source/GUI/.be miss
- /path/to/source/.be hit!
- So it still roots itself appropriately without much work for you.
-
- File-system access
- ==================
-
- BugDirs live completely in memory when .sync_with_disk is False.
- This is the default configuration setup by BugDir(from_disk=False).
- If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
- any changes to the BugDir will be immediately written to disk.
-
- If you want to change .sync_with_disk, we suggest you use
- .set_sync_with_disk(), which propogates the new setting through to
- all bugs/comments/etc. that have been loaded into memory. If
- you've been living in memory and want to move to
- .sync_with_disk==True, but you're not sure if anything has been
- changed in memory, a call to .save() immediately before the
- .set_sync_with_disk(True) call is a safe move.
-
- Regardless of .sync_with_disk, a call to .save() will write out
- all the contents that the BugDir instance has loaded into memory.
- If sync_with_disk has been True over the course of all interesting
- changes, this .save() call will be a waste of time.
-
- The BugDir will only load information from the file system when it
- loads new settings/bugs/comments that it doesn't already have in
- memory and .sync_with_disk == True.
-
- Allow VCS initialization
- ========================
-
- This one is for testing purposes. Setting it to True allows the
- BugDir to search for an installed VCS backend and initialize it in
- the root directory. This is a convenience option for supporting
- tests of versioning functionality (e.g. .duplicate_bugdir).
-
- Disable encoding manipulation
- =============================
-
- This one is for testing purposed. You might have non-ASCII
- Unicode in your bugs, comments, files, etc. BugDir instances try
- and support your preferred encoding scheme (e.g. "utf-8") when
- dealing with stream and file input/output. For stream output,
- this involves replacing sys.stdout and sys.stderr
- (libbe.encode.set_IO_stream_encodings). However this messes up
- doctest's output catching. In order to support doctest tests
- using BugDirs, set manipulate_encodings=False, and stick to ASCII
- in your tests.
+ TODO: simple bugdir manipulation examples...
"""
settings_properties = []
@@ -168,104 +108,6 @@ class BugDir (list, settings_object.SavedSettingsObject):
doc="The current project development target.")
def target(): return {}
- def _guess_encoding(self):
- return encoding.get_encoding()
- def _check_encoding(value):
- if value != None:
- return encoding.known_encoding(value)
- def _setup_encoding(self, new_encoding):
- # change hook called before generator.
- if new_encoding not in [None, settings_object.EMPTY]:
- if self._manipulate_encodings == True:
- encoding.set_IO_stream_encodings(new_encoding)
- def _set_encoding(self, old_encoding, new_encoding):
- self._setup_encoding(new_encoding)
- self._prop_save_settings(old_encoding, new_encoding)
-
- @_versioned_property(name="encoding",
- doc="""The default input/output encoding to use (e.g. "utf-8").""",
- change_hook=_set_encoding,
- generator=_guess_encoding,
- check_fn=_check_encoding)
- def encoding(): return {}
-
- def _setup_user_id(self, user_id):
- self.vcs.user_id = user_id
- def _guess_user_id(self):
- return self.vcs.get_user_id()
- def _set_user_id(self, old_user_id, new_user_id):
- self._setup_user_id(new_user_id)
- self._prop_save_settings(old_user_id, new_user_id)
-
- @_versioned_property(name="user_id",
- doc=
-"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
-that the Arch VCS backend *enforces* ids with this format.""",
- change_hook=_set_user_id,
- generator=_guess_user_id)
- def user_id(): return {}
-
- @_versioned_property(name="default_assignee",
- doc=
-"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
- def default_assignee(): return {}
-
- @_versioned_property(name="vcs_name",
- doc="""The name of the current VCS. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .vcs instead, and
-.vcs_name will be automatically adjusted.""",
- default="None",
- allowed=["None"]+vcs.VCS_ORDER)
- def vcs_name(): return {}
-
- def _get_vcs(self, vcs_name=None):
- """Get and root a new revision control system"""
- if vcs_name == None:
- vcs_name = self.vcs_name
- new_vcs = vcs.vcs_by_name(vcs_name)
- self._change_vcs(None, new_vcs)
- return new_vcs
- def _change_vcs(self, old_vcs, new_vcs):
- new_vcs.encoding = self.encoding
- new_vcs.root(self.root)
- self.vcs_name = new_vcs.name
-
- @Property
- @change_hook_property(hook=_change_vcs)
- @cached_property(generator=_get_vcs)
- @local_property("vcs")
- @doc_property(doc="A revision control system instance.")
- def vcs(): return {}
-
- def _bug_map_gen(self):
- map = {}
- for bug in self:
- map[bug.uuid] = bug
- for uuid in self.uuids():
- if uuid not in map:
- map[uuid] = None
- self._bug_map_value = map # ._bug_map_value used by @local_property
-
- def _extra_strings_check_fn(value):
- return utility.iterable_full_of_strings(value, \
- alternative=settings_object.EMPTY)
- def _extra_strings_change_hook(self, old, new):
- self.extra_strings.sort() # to make merging easier
- self._prop_save_settings(old, new)
- @_versioned_property(name="extra_strings",
- doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
- default=[],
- check_fn=_extra_strings_check_fn,
- change_hook=_extra_strings_change_hook,
- mutable=True)
- def extra_strings(): return {}
-
- @Property
- @primed_property(primer=_bug_map_gen)
- @local_property("bug_map")
- @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
- def _bug_map(): return {}
-
def _setup_severities(self, severities):
if severities not in [None, settings_object.EMPTY]:
bug.load_severities(severities)
@@ -295,278 +137,114 @@ settings easy. Don't set this attribute. Set .vcs instead, and
change_hook=_set_inactive_status)
def inactive_status(): return {}
+ def _extra_strings_check_fn(value):
+ return utility.iterable_full_of_strings(value, \
+ alternative=settings_object.EMPTY)
+ def _extra_strings_change_hook(self, old, new):
+ self.extra_strings.sort() # to make merging easier
+ self._prop_save_settings(old, new)
+ @_versioned_property(name="extra_strings",
+ doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
+ default=[],
+ check_fn=_extra_strings_check_fn,
+ change_hook=_extra_strings_change_hook,
+ mutable=True)
+ def extra_strings(): return {}
- def __init__(self, root=None, sink_to_existing_root=True,
- assert_new_BugDir=False, allow_vcs_init=False,
- manipulate_encodings=True, from_disk=False, vcs=None):
- list.__init__(self)
- settings_object.SavedSettingsObject.__init__(self)
- self._manipulate_encodings = manipulate_encodings
- if root == None:
- root = os.getcwd()
- if sink_to_existing_root == True:
- self.root = self._find_root(root)
- else:
- if not os.path.exists(root):
- self.root = None
- raise NoRootEntry(root)
- self.root = root
- # get a temporary vcs until we've loaded settings
- self.sync_with_disk = False
- self.vcs = self._guess_vcs()
-
- if from_disk == True:
- self.sync_with_disk = True
- self.load()
- else:
- self.sync_with_disk = False
- if assert_new_BugDir == True:
- if os.path.exists(self.get_path()):
- raise AlreadyInitialized, self.get_path()
- if vcs == None:
- vcs = self._guess_vcs(allow_vcs_init)
- self.vcs = vcs
- self._setup_user_id(self.user_id)
+ def _bug_map_gen(self):
+ map = {}
+ for bug in self:
+ map[bug.uuid] = bug
+ for uuid in self.uuids():
+ if uuid not in map:
+ map[uuid] = None
+ self._bug_map_value = map # ._bug_map_value used by @local_property
- def cleanup(self):
- self.vcs.cleanup()
+ @Property
+ @primed_property(primer=_bug_map_gen)
+ @local_property("bug_map")
+ @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
+ def _bug_map(): return {}
- # methods for getting the BugDir situated in the filesystem
+ def _get_user_id(self):
+ return "X"
- def _find_root(self, path):
- """
- Search for an existing bug database dir and it's ancestors and
- return a BugDir rooted there. Only called by __init__, and
- then only if sink_to_existing_root == True.
- """
- if not os.path.exists(path):
- self.root = None
- raise NoRootEntry(path)
- versionfile=utility.search_parent_directories(path,
- os.path.join(".be", "version"))
- if versionfile != None:
- beroot = os.path.dirname(versionfile)
- root = os.path.dirname(beroot)
- return root
+ def __init__(self, storage, uuid=None, from_storage=False):
+ list.__init__(self)
+ settings_object.SavedSettingsObject.__init__(self)
+ self.storage = storage
+ if from_storage == True:
+ self.load_settings()
else:
- beroot = utility.search_parent_directories(path, ".be")
- if beroot == None:
- self.root = None
- raise NoBugDir(path)
- return beroot
-
- def _guess_vcs(self, allow_vcs_init=False):
- """
- Only called by __init__.
- """
- deepdir = self.get_path()
- if not os.path.exists(deepdir):
- deepdir = os.path.dirname(deepdir)
- new_vcs = vcs.detect_vcs(deepdir)
- install = False
- if new_vcs.name == "None":
- if allow_vcs_init == True:
- new_vcs = vcs.installed_vcs()
- new_vcs.init(self.root)
- return new_vcs
+ if uuid == None:
+ self.uuid = libbe.util.id.uuid_gen()
+ self.settings = {}
+ self._setup_saved_settings()
+ if self.storage != None and self.storage.is_writeable():
+ self.save()
# methods for saving/loading/accessing settings and properties.
- def get_path(self, *args):
- """
- Return a path relative to .root.
- """
- dir = os.path.join(self.root, ".be")
- if len(args) == 0:
- return dir
- assert args[0] in ["version", "settings", "bugs"], str(args)
- return os.path.join(dir, *args)
-
- def _get_settings(self, settings_path, for_duplicate_bugdir=False):
- allow_no_vcs = not self.vcs.path_in_root(settings_path)
- if allow_no_vcs == True:
- assert for_duplicate_bugdir == True
- if self.sync_with_disk == False and for_duplicate_bugdir == False:
- # duplicates can ignore this bugdir's .sync_with_disk status
- raise DiskAccessRequired("_get settings")
- try:
- settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs)
- except vcs.NoSuchFile:
- settings = {"vcs_name": "None"}
- return settings
-
- def _save_settings(self, settings_path, settings,
- for_duplicate_bugdir=False):
- allow_no_vcs = not self.vcs.path_in_root(settings_path)
- if allow_no_vcs == True:
- assert for_duplicate_bugdir == True
- if self.sync_with_disk == False and for_duplicate_bugdir == False:
- # duplicates can ignore this bugdir's .sync_with_disk status
- raise DiskAccessRequired("_save settings")
- self.vcs.mkdir(self.get_path(), allow_no_vcs)
- mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs)
-
- def load_settings(self):
- self.settings = self._get_settings(self.get_path("settings"))
+ def id(self, *args):
+ assert len(args) <= 1, str(args)
+ if len(args) == 1:
+ assert args[0] in ['settings'], str(args)
+ return libbe.util.id.bugdir_id(self, *args)
+
+ def load_settings(self, settings_mapfile=None):
+ if settings_mapfile == None:
+ settings_mapfile = \
+ self.storage.get(self.id('settings'), default='\n')
+ self.settings = mapfile.parse(settings_mapfile)
self._setup_saved_settings()
self._setup_user_id(self.user_id)
self._setup_encoding(self.encoding)
self._setup_severities(self.severities)
self._setup_status(self.active_status, self.inactive_status)
- if self.vcs_name != self.vcs.name:
- self.vcs = vcs.vcs_by_name(self.vcs_name)
- self._setup_user_id(self.user_id)
def save_settings(self):
- settings = self._get_saved_settings()
- self._save_settings(self.get_path("settings"), settings)
-
- def get_version(self, path=None, use_none_vcs=False,
- for_duplicate_bugdir=False):
- """
- Requires disk access.
- """
- if self.sync_with_disk == False:
- raise DiskAccessRequired("get version")
- if use_none_vcs == True:
- VCS = vcs.vcs_by_name("None")
- VCS.root(self.root)
- VCS.encoding = encoding.get_encoding()
- else:
- VCS = self.vcs
-
- if path == None:
- path = self.get_path("version")
- allow_no_vcs = not VCS.path_in_root(path)
- if allow_no_vcs == True:
- assert for_duplicate_bugdir == True
- version = VCS.get_file_contents(
- path, allow_no_vcs=allow_no_vcs).rstrip("\n")
- return version
-
- def set_version(self):
- """
- Requires disk access.
- """
- if self.sync_with_disk == False:
- raise DiskAccessRequired("set version")
- self.vcs.mkdir(self.get_path())
- self.vcs.set_file_contents(self.get_path("version"),
- upgrade.BUGDIR_DISK_VERSION+"\n")
-
- # methods controlling disk access
-
- def set_sync_with_disk(self, value):
- """
- Adjust .sync_with_disk for the BugDir and all it's children.
- See the BugDir docstring for a description of the role of
- .sync_with_disk.
- """
- self.sync_with_disk = value
- for bug in self:
- bug.set_sync_with_disk(value)
-
- def load(self):
- """
- Reqires disk access
- """
- version = self.get_version(use_none_vcs=True)
- if version != upgrade.BUGDIR_DISK_VERSION:
- upgrade.upgrade(self.root, version)
- else:
- if not os.path.exists(self.get_path()):
- raise NoBugDir(self.get_path())
- self.load_settings()
+ mf = mapfile.generate(self._get_saved_settings())
+ self.storage.set(self.id('settings'), mf)
def load_all_bugs(self):
"""
- Requires disk access.
Warning: this could take a while.
"""
- if self.sync_with_disk == False:
- raise DiskAccessRequired("load all bugs")
self._clear_bugs()
for uuid in self.uuids():
self._load_bug(uuid)
def save(self):
"""
- Note that this command writes to disk _regardless_ of the
- status of .sync_with_disk.
-
- Save any loaded contents to disk. Because of lazy loading of
- bugs and comments, this is actually not too inefficient.
-
- However, if .sync_with_disk = True, then any changes are
- automatically written to disk as soon as they happen, so
- calling this method will just waste time (unless something
- else has been messing with your on-disk files).
+ Save any loaded contents to storage. Because of lazy loading
+ of bugs and comments, this is actually not too inefficient.
- Requires disk access.
+ However, if self.storage.is_writeable() == True, then any
+ changes are automatically written to storage as soon as they
+ happen, so calling this method will just waste time (unless
+ something else has been messing with your stored files).
"""
- sync_with_disk = self.sync_with_disk
- if sync_with_disk == False:
- self.set_sync_with_disk(True)
- self.set_version()
+ self.uuid = 'BD'
+ self.storage.add(self.id())
+ self.storage.add(self.id('settings'), parent=self.id())
self.save_settings()
for bug in self:
bug.save()
- if sync_with_disk == False:
- self.set_sync_with_disk(sync_with_disk)
-
- # methods for managing duplicate BugDirs
-
- def duplicate_bugdir(self, revision):
- duplicate_path = self.vcs.duplicate_repo(revision)
-
- duplicate_version_path = os.path.join(duplicate_path, ".be", "version")
- try:
- version = self.get_version(duplicate_version_path,
- for_duplicate_bugdir=True)
- except DiskAccessRequired:
- self.sync_with_disk = True # temporarily allow access
- version = self.get_version(duplicate_version_path,
- for_duplicate_bugdir=True)
- self.sync_with_disk = False
- if version != upgrade.BUGDIR_DISK_VERSION:
- upgrade.upgrade(duplicate_path, version)
-
- # setup revision VCS as None, since the duplicate may not be
- # initialized for versioning
- duplicate_settings_path = os.path.join(duplicate_path,
- ".be", "settings")
- duplicate_settings = self._get_settings(duplicate_settings_path,
- for_duplicate_bugdir=True)
- if "vcs_name" in duplicate_settings:
- duplicate_settings["vcs_name"] = "None"
- duplicate_settings["user_id"] = self.user_id
- if "disabled" in bug.status_values:
- # Hack to support old versions of BE bugs
- duplicate_settings["inactive_status"] = self.inactive_status
- self._save_settings(duplicate_settings_path, duplicate_settings,
- for_duplicate_bugdir=True)
-
- return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
-
- def remove_duplicate_bugdir(self):
- self.vcs.remove_duplicate_repo()
# methods for managing bugs
def uuids(self):
uuids = []
- if self.sync_with_disk == True and os.path.exists(self.get_path()):
- # list the uuids on disk
- if os.path.exists(self.get_path("bugs")):
- for uuid in os.listdir(self.get_path("bugs")):
- if not (uuid.startswith('.')):
- uuids.append(uuid)
- yield uuid
- # and the ones that are still just in memory
+ # list the uuids in memory
for bug in self:
- if bug.uuid not in uuids:
- uuids.append(bug.uuid)
- yield bug.uuid
+ uuids.append(bug.uuid)
+ yield bug.uuid
+ if self.storage != None and self.storage.is_readable():
+ # and the ones that are still just in storage
+ for id in self.storage.children(self.id()):
+ parsed = libbe.util.id.parse_id(id)
+ if parsed['type'] == 'bug' and parsed['bug'] not in uuids:
+ yield parsed['bug']
def _clear_bugs(self):
while len(self) > 0:
@@ -574,25 +252,20 @@ settings easy. Don't set this attribute. Set .vcs instead, and
self._bug_map_gen()
def _load_bug(self, uuid):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("_load bug")
- bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
+ bg = bug.Bug(bugdir=self, uuid=uuid, from_storage=True)
self.append(bg)
self._bug_map_gen()
return bg
- def new_bug(self, uuid=None, summary=None):
- bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
- bg.set_sync_with_disk(self.sync_with_disk)
- if bg.sync_with_disk == True:
- bg.save()
+ def new_bug(self, summary=None, _uuid=None):
+ bg = bug.Bug(bugdir=self, uuid=_uuid, summary=summary)
self.append(bg)
self._bug_map_gen()
return bg
def remove_bug(self, bug):
self.remove(bug)
- if bug.sync_with_disk == True:
+ if self.storage.is_writeable():
bug.remove()
def bug_shortname(self, bug):
@@ -615,7 +288,7 @@ settings easy. Don't set this attribute. Set .vcs instead, and
def bug_from_shortname(self, shortname):
"""
- >>> bd = SimpleBugDir(sync_with_disk=False)
+ >>> bd = SimpleBugDir(memory=True)
>>> bug_a = bd.bug_from_shortname('a')
>>> print type(bug_a)
<class 'libbe.bug.Bug'>
@@ -649,175 +322,235 @@ settings easy. Don't set this attribute. Set .vcs instead, and
return False
return True
+ # methods for managing duplicate BugDirs
-class SimpleBugDir (BugDir):
- """
- For testing. Set sync_with_disk==False for a memory-only bugdir.
- >>> bugdir = SimpleBugDir()
- >>> uuids = list(bugdir.uuids())
- >>> uuids.sort()
- >>> print uuids
- ['a', 'b']
- >>> bugdir.cleanup()
- """
- def __init__(self, sync_with_disk=True):
- if sync_with_disk == True:
- dir = utility.Dir()
- assert os.path.exists(dir.path)
- root = dir.path
- assert_new_BugDir = True
- vcs_init = True
- else:
- root = "/"
- assert_new_BugDir = False
- vcs_init = False
- BugDir.__init__(self, root, sink_to_existing_root=False,
- assert_new_BugDir=assert_new_BugDir,
- allow_vcs_init=vcs_init,
- manipulate_encodings=False)
- if sync_with_disk == True: # postpone cleanup since dir.cleanup() removes dir.
- self._dir_ref = dir
- bug_a = self.new_bug("a", summary="Bug A")
- bug_a.creator = "John Doe <jdoe@example.com>"
- bug_a.time = 0
- bug_b = self.new_bug("b", summary="Bug B")
- bug_b.creator = "Jane Doe <jdoe@example.com>"
- bug_b.time = 0
- bug_b.status = "closed"
- if sync_with_disk == True:
- self.save()
- self.set_sync_with_disk(True)
- def cleanup(self):
- if hasattr(self, "_dir_ref"):
- self._dir_ref.cleanup()
- BugDir.cleanup(self)
+ def duplicate_bugdir(self, revision):
+ """
+ Duplicate bugdirs are read-only copies used for generating
+ diffs between revisions.
+ """
+ dbd = copy.copy(self)
+ dbd.storage = copy.copy(self.storage)
+ dbd._bug_map = copy.copy(self._bug_map)
+ dbd.storage.writeable = False
+ added,changed,removed = self.storage.changed_since(revision)
+ for id in added:
+
+ pass
+ for id in removed:
+ pass
+ for id in changed:
+ parsed = libbe.util.id.parse_id(id)
+ if parsed['type'] == 'bugdir':
+ assert parsed['remaining'] == ['settings'], parsed['remaining']
+ dbd._settings = copy.copy(self._settings)
+ mf = self.storage.get(self.id('settings'), default='\n',
+ revision=revision)
+ dbd.load_settings(mf)
+ else:
+ if parsed['bug'] not in self:
+ self._load_bug(parsed['bug'])
+ dbd._load_bug(parsed['bug'])
+ else:
+ bug = copy.copy(self._bug_map[parsed['bug']])
+ bug.settings = copy.copy(bug.settings)
+ dbd._bug_map[parsed['bug']] = bug
+ if parsed['type'] == 'bug':
+ assert parsed['remaining'] == ['values'], parsed['remaining']
+ mf = self.storage.get(self.id('values'), default='\n',
+ revision=revision)
+ bug.load_settings(mf)
+ elif parsed['type'] == 'comment':
+ assert parsed['remaining'] in [['values'], ['body']], \
+ parsed['remaining']
+ bug.comment_root = copy.deepcopy(bug.comment_root)
+ comment = bug.comment_from_uuid(parsed['comment'])
+ if parsed['remaining'] == ['values']:
+ mf = self.storage.get(self.id('values'), default='\n',
+ revision=revision)
+ comment.load_settings(mf)
+ else:
+ body = self.storage.get(self.id('body'), default='\n',
+ revision=revision)
+ comment.body = body
+ else:
+ assert 1==0, 'Unkown type "%s" for id "%s"' % (type, id)
+ dbd.storage.readable = False # so we won't read in added bugs, etc.
+ return dbd
if libbe.TESTING == True:
- class BugDirTestCase(unittest.TestCase):
- def setUp(self):
- self.dir = utility.Dir()
- self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_vcs_init=True)
- self.vcs = self.bugdir.vcs
- def tearDown(self):
- self.bugdir.cleanup()
- self.dir.cleanup()
- def fullPath(self, path):
- return os.path.join(self.dir.path, path)
- def assertPathExists(self, path):
- fullpath = self.fullPath(path)
- self.failUnless(os.path.exists(fullpath)==True,
- "path %s does not exist" % fullpath)
- self.assertRaises(AlreadyInitialized, BugDir,
- self.dir.path, assertNewBugDir=True)
- def versionTest(self):
- if self.vcs.versioned == False:
- return
- original = self.bugdir.vcs.commit("Began versioning")
- bugA = self.bugdir.bug_from_uuid("a")
- bugA.status = "fixed"
- self.bugdir.save()
- new = self.vcs.commit("Fixed bug a")
- dupdir = self.bugdir.duplicate_bugdir(original)
- self.failUnless(dupdir.root != self.bugdir.root,
- "%s, %s" % (dupdir.root, self.bugdir.root))
- bugAorig = dupdir.bug_from_uuid("a")
- self.failUnless(bugA != bugAorig,
- "\n%s\n%s" % (bugA.string(), bugAorig.string()))
- bugAorig.status = "fixed"
- self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
- "%s, %s" % (bugA.status, bugAorig.status))
- self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
- "%s, %s" % (bugA.severity, bugAorig.severity))
- self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
- "%s, %s" % (bugA.assigned, bugAorig.assigned))
- self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
- "%s, %s" % (bugA.time, bugAorig.time))
- self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
- "%s, %s" % (bugA.creator, bugAorig.creator))
- self.failUnless(bugA == bugAorig,
- "\n%s\n%s" % (bugA.string(), bugAorig.string()))
- self.bugdir.remove_duplicate_bugdir()
- self.failUnless(os.path.exists(dupdir.root)==False,
- str(dupdir.root))
- def testRun(self):
- self.bugdir.new_bug(uuid="a", summary="Ant")
- self.bugdir.new_bug(uuid="b", summary="Cockroach")
- self.bugdir.new_bug(uuid="c", summary="Praying mantis")
- length = len(self.bugdir)
- self.failUnless(length == 3, "%d != 3 bugs" % length)
- uuids = list(self.bugdir.uuids())
- self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
- self.failUnless(uuids == ["a","b","c"], str(uuids))
- bugA = self.bugdir.bug_from_uuid("a")
- bugAprime = self.bugdir.bug_from_shortname("a")
- self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
- self.bugdir.save()
- self.versionTest()
- def testComments(self, sync_with_disk=False):
- if sync_with_disk == True:
- self.bugdir.set_sync_with_disk(True)
- self.bugdir.new_bug(uuid="a", summary="Ant")
- bug = self.bugdir.bug_from_uuid("a")
- comm = bug.comment_root
- rep = comm.new_reply("Ants are small.")
- rep.new_reply("And they have six legs.")
- if sync_with_disk == False:
- self.bugdir.save()
- self.bugdir.set_sync_with_disk(True)
- self.bugdir._clear_bugs()
- bug = self.bugdir.bug_from_uuid("a")
- bug.load_comments()
- if sync_with_disk == False:
- self.bugdir.set_sync_with_disk(False)
- self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
- for index,comment in enumerate(bug.comments()):
- if index == 0:
- repLoaded = comment
- self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
- self.failUnless(comment.sync_with_disk == sync_with_disk,
- comment.sync_with_disk)
- self.failUnless(comment.content_type == "text/plain",
- comment.content_type)
- self.failUnless(repLoaded.settings["Content-type"] == \
- "text/plain",
- repLoaded.settings)
- self.failUnless(repLoaded.body == "Ants are small.",
- repLoaded.body)
- elif index == 1:
- self.failUnless(comment.in_reply_to == repLoaded.uuid,
- repLoaded.uuid)
- self.failUnless(comment.body == "And they have six legs.",
- comment.body)
- else:
- self.failIf(True,
- "Invalid comment: %d\n%s" % (index, comment))
- def testSyncedComments(self):
- self.testComments(sync_with_disk=True)
+ class SimpleBugDir (BugDir):
+ """
+ For testing. Set memory=True for a memory-only bugdir.
+ >>> bugdir = SimpleBugDir()
+ >>> uuids = list(bugdir.uuids())
+ >>> uuids.sort()
+ >>> print uuids
+ ['a', 'b']
+ >>> bugdir.cleanup()
+ """
+ def __init__(self, memory=True):
+ if memory == True:
+ storage = None
+ else:
+ dir = utility.Dir()
+ self._dir_ref = dir # postpone cleanup since dir.cleanup() removes dir.
+ storage = libbe.storage.base.Storage( \
+ os.path.join(dir.path, 'repo.pkl'))
+ storage.init()
+ storage.connect()
+ BugDir.__init__(self, storage=storage)
+ bug_a = self.new_bug(summary="Bug A", _uuid="a")
+ bug_a.creator = "John Doe <jdoe@example.com>"
+ bug_a.time = 0
+ bug_b = self.new_bug(summary="Bug B", _uuid="b")
+ bug_b.creator = "Jane Doe <jdoe@example.com>"
+ bug_b.time = 0
+ bug_b.status = "closed"
+ if self.storage != None:
+ self.storage.disconnect() # flush to storage
+ self.storage.connect()
+ def cleanup(self):
+ if self.storage != None:
+ self.storage.disconnect()
+ self.storage.destroy()
+ if hasattr(self, "_dir_ref"):
+ self._dir_ref.cleanup()
+
+# class BugDirTestCase(unittest.TestCase):
+# def setUp(self):
+# self.dir = utility.Dir()
+# self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+# allow_storage_init=True)
+# self.storage = self.bugdir.storage
+# def tearDown(self):
+# self.bugdir.cleanup()
+# self.dir.cleanup()
+# def fullPath(self, path):
+# return os.path.join(self.dir.path, path)
+# def assertPathExists(self, path):
+# fullpath = self.fullPath(path)
+# self.failUnless(os.path.exists(fullpath)==True,
+# "path %s does not exist" % fullpath)
+# self.assertRaises(AlreadyInitialized, BugDir,
+# self.dir.path, assertNewBugDir=True)
+# def versionTest(self):
+# if self.storage.versioned == False:
+# return
+# original = self.bugdir.storage.commit("Began versioning")
+# bugA = self.bugdir.bug_from_uuid("a")
+# bugA.status = "fixed"
+# self.bugdir.save()
+# new = self.storage.commit("Fixed bug a")
+# dupdir = self.bugdir.duplicate_bugdir(original)
+# self.failUnless(dupdir.root != self.bugdir.root,
+# "%s, %s" % (dupdir.root, self.bugdir.root))
+# bugAorig = dupdir.bug_from_uuid("a")
+# self.failUnless(bugA != bugAorig,
+# "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+# bugAorig.status = "fixed"
+# self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
+# "%s, %s" % (bugA.status, bugAorig.status))
+# self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
+# "%s, %s" % (bugA.severity, bugAorig.severity))
+# self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
+# "%s, %s" % (bugA.assigned, bugAorig.assigned))
+# self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
+# "%s, %s" % (bugA.time, bugAorig.time))
+# self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
+# "%s, %s" % (bugA.creator, bugAorig.creator))
+# self.failUnless(bugA == bugAorig,
+# "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+# self.bugdir.remove_duplicate_bugdir()
+# self.failUnless(os.path.exists(dupdir.root)==False,
+# str(dupdir.root))
+# def testRun(self):
+# self.bugdir.new_bug(uuid="a", summary="Ant")
+# self.bugdir.new_bug(uuid="b", summary="Cockroach")
+# self.bugdir.new_bug(uuid="c", summary="Praying mantis")
+# length = len(self.bugdir)
+# self.failUnless(length == 3, "%d != 3 bugs" % length)
+# uuids = list(self.bugdir.uuids())
+# self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
+# self.failUnless(uuids == ["a","b","c"], str(uuids))
+# bugA = self.bugdir.bug_from_uuid("a")
+# bugAprime = self.bugdir.bug_from_shortname("a")
+# self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
+# self.bugdir.save()
+# self.versionTest()
+# def testComments(self, sync_with_disk=False):
+# if sync_with_disk == True:
+# self.bugdir.set_sync_with_disk(True)
+# self.bugdir.new_bug(uuid="a", summary="Ant")
+# bug = self.bugdir.bug_from_uuid("a")
+# comm = bug.comment_root
+# rep = comm.new_reply("Ants are small.")
+# rep.new_reply("And they have six legs.")
+# if sync_with_disk == False:
+# self.bugdir.save()
+# self.bugdir.set_sync_with_disk(True)
+# self.bugdir._clear_bugs()
+# bug = self.bugdir.bug_from_uuid("a")
+# bug.load_comments()
+# if sync_with_disk == False:
+# self.bugdir.set_sync_with_disk(False)
+# self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
+# for index,comment in enumerate(bug.comments()):
+# if index == 0:
+# repLoaded = comment
+# self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
+# self.failUnless(comment.sync_with_disk == sync_with_disk,
+# comment.sync_with_disk)
+# self.failUnless(comment.content_type == "text/plain",
+# comment.content_type)
+# self.failUnless(repLoaded.settings["Content-type"] == \
+# "text/plain",
+# repLoaded.settings)
+# self.failUnless(repLoaded.body == "Ants are small.",
+# repLoaded.body)
+# elif index == 1:
+# self.failUnless(comment.in_reply_to == repLoaded.uuid,
+# repLoaded.uuid)
+# self.failUnless(comment.body == "And they have six legs.",
+# comment.body)
+# else:
+# self.failIf(True,
+# "Invalid comment: %d\n%s" % (index, comment))
+# def testSyncedComments(self):
+# self.testComments(sync_with_disk=True)
class SimpleBugDirTestCase (unittest.TestCase):
def setUp(self):
# create a pre-existing bugdir in a temporary directory
self.dir = utility.Dir()
- self.original_working_dir = os.getcwd()
- os.chdir(self.dir.path)
- self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_vcs_init=True)
- self.bugdir.new_bug("preexisting",summary="Hopefully not imported")
- self.bugdir.save()
+ self.storage = libbe.storage.base.Storage( \
+ os.path.join(self.dir.path, 'repo.pkl'))
+ self.storage.init()
+ self.storage.connect()
+ self.bugdir = BugDir(self.storage)
+ self.bugdir.new_bug(summary="Hopefully not imported",
+ _uuid="preexisting")
+ self.storage.disconnect()
+ self.storage.connect()
def tearDown(self):
- os.chdir(self.original_working_dir)
- self.bugdir.cleanup()
+ if self.storage != None:
+ self.storage.disconnect()
+ self.storage.destroy()
self.dir.cleanup()
def testOnDiskCleanLoad(self):
"""
- SimpleBugDir(sync_with_disk==True) should not import
+ SimpleBugDir(memory==False) should not import
preexisting bugs.
"""
- bugdir = SimpleBugDir(sync_with_disk=True)
- self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk)
+ bugdir = SimpleBugDir(memory=False)
+ self.failUnless(bugdir.storage.is_readable() == True,
+ bugdir.storage.is_readable())
+ self.failUnless(bugdir.storage.is_writeable() == True,
+ bugdir.storage.is_writeable())
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == ['a', 'b'], uuids)
+ self.storage.disconnect() # flush
+ self.storage.connect()
bugdir._clear_bugs()
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == [], uuids)
@@ -827,15 +560,13 @@ if libbe.TESTING == True:
bugdir.cleanup()
def testInMemoryCleanLoad(self):
"""
- SimpleBugDir(sync_with_disk==False) should not import
+ SimpleBugDir(memory==True) should not import
preexisting bugs.
"""
- bugdir = SimpleBugDir(sync_with_disk=False)
- self.failUnless(bugdir.sync_with_disk==False,
- bugdir.sync_with_disk)
+ bugdir = SimpleBugDir(memory=True)
+ self.failUnless(bugdir.storage == None, bugdir.storage)
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == ['a', 'b'], uuids)
- self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == ['a', 'b'], uuids)
bugdir._clear_bugs()
@@ -845,3 +576,27 @@ if libbe.TESTING == True:
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
+
+# def _get_settings(self, settings_path, for_duplicate_bugdir=False):
+# allow_no_storage = not self.storage.path_in_root(settings_path)
+# if allow_no_storage == True:
+# assert for_duplicate_bugdir == True
+# if self.sync_with_disk == False and for_duplicate_bugdir == False:
+# # duplicates can ignore this bugdir's .sync_with_disk status
+# raise DiskAccessRequired("_get settings")
+# try:
+# settings = mapfile.map_load(self.storage, settings_path, allow_no_storage)
+# except storage.NoSuchFile:
+# settings = {"storage_name": "None"}
+# return settings
+
+# def _save_settings(self, settings_path, settings,
+# for_duplicate_bugdir=False):
+# allow_no_storage = not self.storage.path_in_root(settings_path)
+# if allow_no_storage == True:
+# assert for_duplicate_bugdir == True
+# if self.sync_with_disk == False and for_duplicate_bugdir == False:
+# # duplicates can ignore this bugdir's .sync_with_disk status
+# raise DiskAccessRequired("_save settings")
+# self.storage.mkdir(self.get_path(), allow_no_storage)
+# mapfile.map_save(self.storage, settings_path, settings, allow_no_storage)
diff --git a/libbe/comment.py b/libbe/comment.py
index fc87c9d..e77235a 100644
--- a/libbe/comment.py
+++ b/libbe/comment.py
@@ -67,7 +67,7 @@ class DiskAccessRequired (Exception):
INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!"
-def loadComments(bug, load_full=False):
+def load_comments(bug, load_full=False):
"""
Set load_full=True when you want to load the comment completely
from disk *now*, rather than waiting and lazy loading as required.
@@ -88,7 +88,7 @@ def loadComments(bug, load_full=False):
bug.add_comments(comments)
return bug.comment_root
-def saveComments(bug):
+def save_comments(bug):
for comment in bug.comment_root.traverse():
comment.save()
@@ -155,10 +155,12 @@ class Comment(Tree, settings_object.SavedSettingsObject):
doc="An integer version of .date")
def _get_comment_body(self):
- if self.storage != None and self.storage.readable:
+ if self.storage != None and self.storage.is_readable() \
+ and self.uuid != INVALID_UUID:
return self.storage.get(self.id("body"),
decode=self.content_type.startswith("text/"))
def _set_comment_body(self, old=None, new=None, force=False):
+ assert self.uuid != INVALID_UUID, self
if (self.storage != None and self.storage.writeable == True) \
or force==True:
assert new != None, "Can't save empty comment"
@@ -195,17 +197,17 @@ class Comment(Tree, settings_object.SavedSettingsObject):
mutable=True)
def extra_strings(): return {}
- def __init__(self, bug=None, uuid=None, from_disk=False,
+ def __init__(self, bug=None, uuid=None, from_storage=False,
in_reply_to=None, body=None):
"""
- Set from_disk=True to load an old comment.
- Set from_disk=False to create a new comment.
+ Set from_storage=True to load an old comment.
+ Set from_storage=False to create a new comment.
- The uuid option is required when from_disk==True.
+ The uuid option is required when from_storage==True.
The in_reply_to and body options are only used if
- from_disk==False (the default). When from_disk==True, they are
- loaded from the bug database.
+ from_storage==False (the default). When from_storage==True,
+ they are loaded from the bug database.
in_reply_to should be the uuid string of the parent comment.
"""
@@ -213,14 +215,22 @@ class Comment(Tree, settings_object.SavedSettingsObject):
settings_object.SavedSettingsObject.__init__(self)
self.bug = bug
self.uuid = uuid
- if from_disk == False:
+ if from_storage == False:
if uuid == None:
self.uuid = libbe.util.id.uuid_gen()
self.settings = {}
self._setup_saved_settings()
+ if self.storage != None and self.storage.is_writeable():
+ self.storage.writeable = False
+ set_writeable = True
+ else:
+ set_writeable = False
self.time = int(time.time()) # only save to second precision
self.in_reply_to = in_reply_to
self.body = body
+ if set_writeable == True:
+ self.storage.writeable = True
+ self.save()
def __cmp__(self, other):
return cmp_full(self, other)
@@ -586,12 +596,15 @@ class Comment(Tree, settings_object.SavedSettingsObject):
def id(self, *args):
assert len(args) <= 1, str(args)
- assert args[0] in ["values", "body"], str(args)
- return libbe.util.id.comment_id(self, args)
-
- def load_settings(self):
- mf = self.storage.get(self.id("values"), default="\n")
- self.settings = mapfile.parse(mf)
+ if len(args) == 1:
+ assert args[0] in ["values", "body"], str(args)
+ return libbe.util.id.comment_id(self, *args)
+
+ def load_settings(self, settings_mapfile=None):
+ if settings_mapfile == None:
+ settings_mapfile = \
+ self.storage.get(self.id("values"), default="\n")
+ self.settings = mapfile.parse(settings_mapfile)
self._setup_saved_settings()
def save_settings(self):
@@ -607,11 +620,17 @@ class Comment(Tree, settings_object.SavedSettingsObject):
happen, so calling this method will just waste time (unless
something else has been messing with your stored files).
"""
+ if self.uuid == INVALID_UUID:
+ return
assert self.storage != None, "Can't save without storage"
assert self.body != None, "Can't save blank comment"
- self.storage.add(self.id())
- self.storage.add(self.id('values'))
- self.storage.add(self.id('body'))
+ if self.bug != None:
+ parent = self.bug.id()
+ else:
+ parent = None
+ self.storage.add(self.id(), parent=parent)
+ self.storage.add(self.id('values'), parent=self.id())
+ self.storage.add(self.id('body'), parent=self.id())
self.save_settings()
self._set_comment_body(new=self.body, force=True)
diff --git a/libbe/storage/base.py b/libbe/storage/base.py
index 3526462..eb2b94c 100644
--- a/libbe/storage/base.py
+++ b/libbe/storage/base.py
@@ -33,11 +33,11 @@ class InvalidRevision (KeyError):
class NotWriteable (NotSupported):
def __init__(self, msg):
- NotSupported.__init__('write', msg)
+ NotSupported.__init__(self, 'write', msg)
class NotReadable (NotSupported):
def __init__(self, msg):
- NotSupported.__init__('read', msg)
+ NotSupported.__init__(self, 'read', msg)
class EmptyCommit(Exception):
def __init__(self):
@@ -182,7 +182,11 @@ class Storage (object):
"""Add an entry"""
if self.is_writeable() == False:
raise NotWriteable('Cannot add entry to unwriteable storage.')
- self._add(*args, **kwargs)
+ try: # Maybe we've already added that id?
+ self.get(id)
+ pass # yup, no need to add another
+ except InvalidID:
+ self._add(*args, **kwargs)
def _add(self, id, parent=None):
if parent == None:
@@ -438,6 +442,15 @@ if TESTING == True:
def test_add_rooted(self):
"""
+ Adding entries with the same ID should not increase the number of children.
+ """
+ for i in range(10):
+ self.s.add('some id')
+ s = sorted(self.s.children())
+ self.failUnless(s == ['some id'], s)
+
+ def test_add_rooted(self):
+ """
Adding entries should increase the number of children (rooted).
"""
ids = []
diff --git a/libbe/storage/properties.py b/libbe/storage/properties.py
index f756ff0..ddd7b25 100644
--- a/libbe/storage/properties.py
+++ b/libbe/storage/properties.py
@@ -346,7 +346,7 @@ def change_hook_property(hook, mutable=False, default=None):
mutable value, and checking for changes whenever the property is
set (obviously) or retrieved (to check for external changes). So
long as you're conscientious about accessing the property after
- making external modifications, mutability woln't be a problem.
+ making external modifications, mutability won't be a problem.
t.x.append(5) # external modification
t.x # dummy access notices change and triggers hook
See testChangeHookMutableProperty for an example of the expected
diff --git a/libbe/storage/vcs/base.py b/libbe/storage/vcs/base.py
index 44643a4..abc7a01 100644
--- a/libbe/storage/vcs/base.py
+++ b/libbe/storage/vcs/base.py
@@ -116,6 +116,134 @@ class VCS(object):
The methods _u_*() are utility methods available to the _vcs_*()
methods.
+
+ Sink to existing root
+ ======================
+
+ Consider the following usage case:
+ You have a bug directory rooted in
+ /path/to/source
+ by which I mean the '.be' directory is at
+ /path/to/source/.be
+ However, you're of in some subdirectory like
+ /path/to/source/GUI/testing
+ and you want to comment on a bug. Setting sink_to_root=True wen
+ you initialize your BugDir will cause it to search for the '.be'
+ file in the ancestors of the path you passed in as 'root'.
+ /path/to/source/GUI/testing/.be miss
+ /path/to/source/GUI/.be miss
+ /path/to/source/.be hit!
+ So it still roots itself appropriately without much work for you.
+
+ File-system access
+ ==================
+
+ BugDirs live completely in memory when .sync_with_disk is False.
+ This is the default configuration setup by BugDir(from_disk=False).
+ If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
+ any changes to the BugDir will be immediately written to disk.
+
+ If you want to change .sync_with_disk, we suggest you use
+ .set_sync_with_disk(), which propogates the new setting through to
+ all bugs/comments/etc. that have been loaded into memory. If
+ you've been living in memory and want to move to
+ .sync_with_disk==True, but you're not sure if anything has been
+ changed in memory, a call to .save() immediately before the
+ .set_sync_with_disk(True) call is a safe move.
+
+ Regardless of .sync_with_disk, a call to .save() will write out
+ all the contents that the BugDir instance has loaded into memory.
+ If sync_with_disk has been True over the course of all interesting
+ changes, this .save() call will be a waste of time.
+
+ The BugDir will only load information from the file system when it
+ loads new settings/bugs/comments that it doesn't already have in
+ memory and .sync_with_disk == True.
+
+ Allow storage initialization
+ ========================
+
+ This one is for testing purposes. Setting it to True allows the
+ BugDir to search for an installed Storage backend and initialize
+ it in the root directory. This is a convenience option for
+ supporting tests of versioning functionality
+ (e.g. .duplicate_bugdir).
+
+ Disable encoding manipulation
+ =============================
+
+ This one is for testing purposed. You might have non-ASCII
+ Unicode in your bugs, comments, files, etc. BugDir instances try
+ and support your preferred encoding scheme (e.g. "utf-8") when
+ dealing with stream and file input/output. For stream output,
+ this involves replacing sys.stdout and sys.stderr
+ (libbe.encode.set_IO_stream_encodings). However this messes up
+ doctest's output catching. In order to support doctest tests
+ using BugDirs, set manipulate_encodings=False, and stick to ASCII
+ in your tests.
+
+ if root == None:
+ root = os.getcwd()
+ if sink_to_existing_root == True:
+ self.root = self._find_root(root)
+ else:
+ if not os.path.exists(root):
+ self.root = None
+ raise NoRootEntry(root)
+ self.root = root
+ # get a temporary storage until we've loaded settings
+ self.sync_with_disk = False
+ self.storage = self._guess_storage()
+
+ if assert_new_BugDir == True:
+ if os.path.exists(self.get_path()):
+ raise AlreadyInitialized, self.get_path()
+ if storage == None:
+ storage = self._guess_storage(allow_storage_init)
+ self.storage = storage
+ self._setup_user_id(self.user_id)
+
+
+ # methods for getting the BugDir situated in the filesystem
+
+ def _find_root(self, path):
+ """
+ Search for an existing bug database dir and it's ancestors and
+ return a BugDir rooted there. Only called by __init__, and
+ then only if sink_to_existing_root == True.
+ """
+ if not os.path.exists(path):
+ self.root = None
+ raise NoRootEntry(path)
+ versionfile=utility.search_parent_directories(path,
+ os.path.join(".be", "version"))
+ if versionfile != None:
+ beroot = os.path.dirname(versionfile)
+ root = os.path.dirname(beroot)
+ return root
+ else:
+ beroot = utility.search_parent_directories(path, ".be")
+ if beroot == None:
+ self.root = None
+ raise NoBugDir(path)
+ return beroot
+
+ def _guess_storage(self, allow_storage_init=False):
+ """
+ Only called by __init__.
+ """
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_storage = storage.detect_storage(deepdir)
+ install = False
+ if new_storage.name == "None":
+ if allow_storage_init == True:
+ new_storage = storage.installed_storage()
+ new_storage.init(self.root)
+ return new_storage
+
+os.listdir(self.get_path("bugs")):
"""
name = "None"
client = "" # command-line tool for _u_invoke_client
@@ -633,6 +761,34 @@ class VCS(object):
body = None
f.close()
return (summary, body)
+
+ def check_disk_version(self):
+ version = self.get_version()
+ if version != upgrade.BUGDIR_DISK_VERSION:
+ upgrade.upgrade(self.root, version)
+
+ def disk_version(self, path=None, use_none_vcs=False,
+ for_duplicate_bugdir=False):
+ """
+ Requires disk access.
+ """
+ if path == None:
+ path = self.get_path("version")
+ allow_no_vcs = not VCS.path_in_root(path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ return self.get(path, allow_no_vcs=allow_no_vcs).rstrip("\n")
+
+ def set_disk_version(self):
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("set version")
+ self.vcs.mkdir(self.get_path())
+ self.vcs.set_file_contents(self.get_path("version"),
+ upgrade.BUGDIR_DISK_VERSION+"\n")
+
if libbe.TESTING == True:
diff --git a/libbe/util/encoding.py b/libbe/util/encoding.py
index d09117f..21e40cf 100644
--- a/libbe/util/encoding.py
+++ b/libbe/util/encoding.py
@@ -1,4 +1,3 @@
-# Bugs Everywhere, a distributed bugtracker
# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it>
# W. Trevor King <wking@drexel.edu>
#
@@ -62,5 +61,27 @@ def set_IO_stream_encodings(encoding):
sys.stdout = codecs.getwriter(encoding)(sys.__stdout__)
sys.stderr = codecs.getwriter(encoding)(sys.__stderr__)
+
+ def _guess_encoding(self):
+ return encoding.get_encoding()
+ def _check_encoding(value):
+ if value != None:
+ return encoding.known_encoding(value)
+ def _setup_encoding(self, new_encoding):
+ # change hook called before generator.
+ if new_encoding not in [None, settings_object.EMPTY]:
+ if self._manipulate_encodings == True:
+ encoding.set_IO_stream_encodings(new_encoding)
+ def _set_encoding(self, old_encoding, new_encoding):
+ self._setup_encoding(new_encoding)
+ self._prop_save_settings(old_encoding, new_encoding)
+
+ @_versioned_property(name="encoding",
+ doc="""The default input/output encoding to use (e.g. "utf-8").""",
+ change_hook=_set_encoding,
+ generator=_guess_encoding,
+ check_fn=_check_encoding)
+ def encoding(): return {}
+
if libbe.TESTING == True:
suite = doctest.DocTestSuite()
diff --git a/libbe/util/id.py b/libbe/util/id.py
index 0f1576c..d57205f 100644
--- a/libbe/util/id.py
+++ b/libbe/util/id.py
@@ -19,6 +19,8 @@
Handle ID creation and parsing.
"""
+import os.path
+
import libbe
if libbe.TESTING == True:
@@ -59,6 +61,7 @@ except ImportError:
def _assemble(*args):
+ args = list(args)
for i,arg in enumerate(args):
if arg == None:
args[i] = ''
@@ -71,31 +74,41 @@ def _split(id):
args[i] = None
return args
+def _is_a_uuid(id):
+ if id.startswith('uuid:'):
+ return True
+ return False
+
+def _uuid_to_id(id):
+ return 'uuid:' + id
+
+def _id_to_uuid(id):
+ return id[len('uuid:'):]
def bugdir_id(bugdir, *args):
- return _assemble(bugdir.uuid, args)
+ return _assemble(_uuid_to_id(bugdir.uuid), *args)
def bug_id(bug, *args):
- if bug.bug == None:
- bugdir_id = None
+ if bug.bugdir == None:
+ bdid = None
else:
- bugdir_id = bugdir_id(bug.bugdir)
- return _assemble(bugdir_id, bug.uuid, args)
+ bdid = bugdir_id(bug.bugdir)
+ return _assemble(bdid, _uuid_to_id(bug.uuid), *args)
def comment_id(comment, *args):
if comment.bug == None:
- bug_id = None
+ bid = None
else:
- bug_id = bug_id(comment.bug)
- return _assemble(bug_id, comment.uuid, args)
+ bid = bug_id(comment.bug)
+ return _assemble(bid, _uuid_to_id(comment.uuid), *args)
def parse_id(id):
args = _split(id)
- ret = {'bugdir':args.pop(0)}
+ ret = {'bugdir':_id_to_uuid(args.pop(0))}
type = 'bugdir'
for child_name in ['bug', 'comment']:
- if len(args) > 0 and is_a_uuid(args[0]):
- ret[child_name] = args.pop(0)
+ if len(args) > 0 and _is_a_uuid(args[0]):
+ ret[child_name] = _id_to_uuid(args.pop(0))
type = child_name
ret['type'] = type
ret['remaining'] = os.path.join(args)