aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'libbe')
-rw-r--r--libbe/arch.py67
-rw-r--r--libbe/beuuid.py2
-rw-r--r--libbe/bug.py117
-rw-r--r--libbe/bugdir.py476
-rw-r--r--libbe/bzr.py52
-rw-r--r--libbe/cmdutil.py19
-rw-r--r--libbe/comment.py306
-rw-r--r--libbe/config.py6
-rw-r--r--libbe/darcs.py93
-rw-r--r--libbe/diff.py486
-rw-r--r--libbe/editor.py9
-rw-r--r--libbe/encoding.py10
-rw-r--r--libbe/git.py78
-rw-r--r--libbe/hg.py63
-rw-r--r--libbe/mapfile.py39
-rw-r--r--libbe/plugin.py6
-rw-r--r--libbe/properties.py10
-rw-r--r--libbe/rcs.py876
-rw-r--r--libbe/settings_object.py11
-rw-r--r--libbe/tree.py4
-rw-r--r--libbe/upgrade.py187
-rw-r--r--libbe/utility.py5
-rw-r--r--libbe/vcs.py938
-rw-r--r--libbe/version.py50
24 files changed, 2449 insertions, 1461 deletions
diff --git a/libbe/arch.py b/libbe/arch.py
index 2f45aa9..ab55172 100644
--- a/libbe/arch.py
+++ b/libbe/arch.py
@@ -17,6 +17,10 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+GNU Arch (tla) backend.
+"""
+
import codecs
import os
import re
@@ -26,10 +30,11 @@ import time
import unittest
import doctest
-import config
from beuuid import uuid_gen
-import rcs
-from rcs import RCS
+import config
+import vcs
+
+
DEFAULT_CLIENT = "tla"
@@ -38,7 +43,7 @@ client = config.get_val("arch_client", default=DEFAULT_CLIENT)
def new():
return Arch()
-class Arch(RCS):
+class Arch(vcs.VCS):
name = "Arch"
client = client
versioned = True
@@ -48,21 +53,25 @@ class Arch(RCS):
_project_name = None
_tmp_project = False
_arch_paramdir = os.path.expanduser("~/.arch-params")
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
"""Detect whether a directory is revision-controlled using Arch"""
if self._u_search_parent_directories(path, "{arch}") != None :
config.set_val("arch_client", client)
return True
return False
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._create_archive(path)
self._create_project(path)
self._add_project_code(path)
def _create_archive(self, path):
- # Create a new archive
+ """
+ Create a temporary Arch archive in the directory PATH. This
+ archive will be removed by
+ __del__->cleanup->_vcs_cleanup->_remove_archive
+ """
# http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
assert self._archive_name == None
id = self.get_user_id()
@@ -103,7 +112,7 @@ class Arch(RCS):
"""
Create a temporary Arch project in the directory PATH. This
project will be removed by
- __del__->cleanup->_rcs_cleanup->_remove_project
+ __del__->cleanup->_vcs_cleanup->_remove_project
"""
# http://mwolson.org/projects/GettingStartedWithArch.html
# http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
@@ -159,13 +168,13 @@ class Arch(RCS):
self._adjust_naming_conventions(path)
self._invoke_client("import", "--summary", "Began versioning",
directory=path)
- def _rcs_cleanup(self):
+ def _vcs_cleanup(self):
if self._tmp_project == True:
self._remove_project()
if self._tmp_archive == True:
self._remove_archive()
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
if not os.path.isdir(path):
dirname = os.path.dirname(path)
else:
@@ -176,7 +185,6 @@ class Arch(RCS):
self._get_archive_project_name(root)
return root
-
def _get_archive_name(self, root):
status,output,error = self._u_invoke_client("archives")
lines = output.split('\n')
@@ -188,7 +196,6 @@ class Arch(RCS):
if os.path.realpath(location) == os.path.realpath(root):
self._archive_name = archive
assert self._archive_name != None
-
def _get_archive_project_name(self, root):
# get project names
status,output,error = self._u_invoke_client("tree-version", directory=root)
@@ -197,7 +204,7 @@ class Arch(RCS):
archive_name,project_name = output.rstrip('\n').split('/')
self._archive_name = archive_name
self._project_name = project_name
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
try:
status,output,error = self._u_invoke_client('my-id')
return output.rstrip('\n')
@@ -206,9 +213,9 @@ class Arch(RCS):
return None
else:
raise
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
self._u_invoke_client('my-id', value)
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
self._u_invoke_client("add-id", path)
realpath = os.path.realpath(self._u_abspath(path))
pathAdded = realpath in self._list_added(self.rootdir)
@@ -237,14 +244,14 @@ class Arch(RCS):
self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
if os.path.realpath(path) not in self._list_added(self.rootdir):
raise CantAddFile(path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
if not '.arch-ids' in path:
self._u_invoke_client("delete-id", path)
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
status,output,error = \
self._invoke_client("file-find", path, revision)
@@ -254,18 +261,18 @@ class Arch(RCS):
contents = f.read()
f.close()
return contents
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision == None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
else:
status,output,error = \
self._u_invoke_client("get", revision,directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
if allow_empty == False:
# arch applies empty commits without complaining, so check first
status,output,error = self._u_invoke_client("changes",expect=(0,1))
if status == 0:
- raise rcs.EmptyCommit()
+ raise vcs.EmptyCommit()
summary,body = self._u_parse_commitfile(commitfile)
args = ["commit", "--summary", summary]
if body != None:
@@ -281,6 +288,16 @@ class Arch(RCS):
assert revpath.startswith(self._archive_project_name()+'--')
revision = revpath[len(self._archive_project_name()+'--'):]
return revpath
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client("logs")
+ logs = output.splitlines()
+ first_log = logs.pop(0)
+ assert first_log == "base-0", first_log
+ try:
+ log = logs[index]
+ except IndexError:
+ return None
+ return "%s--%s" % (self._archive_project_name(), log)
class CantAddFile(Exception):
def __init__(self, file):
@@ -289,7 +306,7 @@ class CantAddFile(Exception):
-rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Arch, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/beuuid.py b/libbe/beuuid.py
index bc47208..490ed62 100644
--- a/libbe/beuuid.py
+++ b/libbe/beuuid.py
@@ -13,6 +13,7 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
"""
Backwards compatibility support for Python 2.4. Once people give up
on 2.4 ;), the uuid call should be merged into bugdir.py
@@ -20,6 +21,7 @@ on 2.4 ;), the uuid call should be merged into bugdir.py
import unittest
+
try:
from uuid import uuid4 # Python >= 2.5
def uuid_gen():
diff --git a/libbe/bug.py b/libbe/bug.py
index c1e5481..fd30ff7 100644
--- a/libbe/bug.py
+++ b/libbe/bug.py
@@ -15,6 +15,11 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the Bug class for representing bugs.
+"""
+
import os
import os.path
import errno
@@ -33,6 +38,11 @@ import comment
import utility
+class DiskAccessRequired (Exception):
+ def __init__(self, goal):
+ msg = "Cannot %s without accessing the disk" % goal
+ Exception.__init__(self, msg)
+
### Define and describe valid bug categories
# Use a tuple of (category, description) tuples since we don't have
# ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/
@@ -216,15 +226,15 @@ class Bug(settings_object.SavedSettingsObject):
@doc_property(doc="The trunk of the comment tree")
def comment_root(): return {}
- def _get_rcs(self):
- if hasattr(self.bugdir, "rcs"):
- return self.bugdir.rcs
+ def _get_vcs(self):
+ if hasattr(self.bugdir, "vcs"):
+ return self.bugdir.vcs
@Property
- @cached_property(generator=_get_rcs)
- @local_property("rcs")
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
@doc_property(doc="A revision control system instance.")
- def rcs(): return {}
+ def vcs(): return {}
def __init__(self, bugdir=None, uuid=None, from_disk=False,
load_comments=False, summary=None):
@@ -238,17 +248,20 @@ class Bug(settings_object.SavedSettingsObject):
if uuid == None:
self.uuid = uuid_gen()
self.time = int(time.time()) # only save to second precision
- if self.rcs != None:
- self.creator = self.rcs.get_user_id()
+ if self.vcs != None:
+ self.creator = self.vcs.get_user_id()
self.summary = summary
def __repr__(self):
return "Bug(uuid=%r)" % self.uuid
- def set_sync_with_disk(self, value):
- self.sync_with_disk = value
- for comment in self.comments():
- comment.set_sync_with_disk(value)
+ def __str__(self):
+ return self.string(shortlist=True)
+
+ def __cmp__(self, other):
+ return cmp_full(self, other)
+
+ # serializing methods
def _setting_attr_string(self, setting):
value = getattr(self, setting)
@@ -331,43 +344,34 @@ class Bug(settings_object.SavedSettingsObject):
output = bugout
return output
- def __str__(self):
- return self.string(shortlist=True)
+ # methods for saving/loading/acessing settings and properties.
- def __cmp__(self, other):
- return cmp_full(self, other)
+ def get_path(self, *args):
+ dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid)
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["values", "comments"], str(args)
+ return os.path.join(dir, *args)
- def get_path(self, name=None):
- my_dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid)
- if name is None:
- return my_dir
- assert name in ["values", "comments"]
- return os.path.join(my_dir, name)
+ def set_sync_with_disk(self, value):
+ self.sync_with_disk = value
+ for comment in self.comments():
+ comment.set_sync_with_disk(value)
def load_settings(self):
- self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load settings")
+ self.settings = mapfile.map_load(self.vcs, self.get_path("values"))
self._setup_saved_settings()
- def load_comments(self, load_full=True):
- if load_full == True:
- # Force a complete load of the whole comment tree
- self.comment_root = self._get_comment_root(load_full=True)
- else:
- # Setup for fresh lazy-loading. Clear _comment_root, so
- # _get_comment_root returns a fresh version. Turn of
- # syncing temporarily so we don't write our blank comment
- # tree to disk.
- self.sync_with_disk = False
- self.comment_root = None
- self.sync_with_disk = True
-
def save_settings(self):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("save settings")
assert self.summary != None, "Can't save blank bug"
-
- self.rcs.mkdir(self.get_path())
+ self.vcs.mkdir(self.get_path())
path = self.get_path("values")
- mapfile.map_save(self.rcs, path, self._get_saved_settings())
-
+ mapfile.map_save(self.vcs, path, self._get_saved_settings())
+
def save(self):
"""
Save any loaded contents to disk. Because of lazy loading of
@@ -378,15 +382,39 @@ class Bug(settings_object.SavedSettingsObject):
calling this method will just waste time (unless something
else has been messing with your on-disk files).
"""
+ sync_with_disk = self.sync_with_disk
+ if sync_with_disk == False:
+ self.set_sync_with_disk(True)
self.save_settings()
if len(self.comment_root) > 0:
comment.saveComments(self)
+ if sync_with_disk == False:
+ self.set_sync_with_disk(False)
+
+ def load_comments(self, load_full=True):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load comments")
+ if load_full == True:
+ # Force a complete load of the whole comment tree
+ self.comment_root = self._get_comment_root(load_full=True)
+ else:
+ # Setup for fresh lazy-loading. Clear _comment_root, so
+ # _get_comment_root returns a fresh version. Turn of
+ # syncing temporarily so we don't write our blank comment
+ # tree to disk.
+ self.sync_with_disk = False
+ self.comment_root = None
+ self.sync_with_disk = True
def remove(self):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("remove")
self.comment_root.remove()
path = self.get_path()
- self.rcs.recursive_remove(path)
+ self.vcs.recursive_remove(path)
+ # methods for managing comments
+
def comments(self):
for comment in self.comment_root.traverse():
yield comment
@@ -489,8 +517,12 @@ def cmp_attr(bug_1, bug_2, attr, invert=False):
return cmp(val_1, val_2)
# alphabetical rankings (a < z)
+cmp_uuid = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "uuid")
cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator")
cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned")
+cmp_target = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "target")
+cmp_reporter = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "reporter")
+cmp_summary = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "summary")
# chronological rankings (newer < older)
cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True)
@@ -512,7 +544,8 @@ def cmp_comments(bug_1, bug_2):
return 0
DEFAULT_CMP_FULL_CMP_LIST = \
- (cmp_status,cmp_severity,cmp_assigned,cmp_time,cmp_creator,cmp_comments)
+ (cmp_status, cmp_severity, cmp_assigned, cmp_time, cmp_creator,
+ cmp_reporter, cmp_target, cmp_comments, cmp_summary, cmp_uuid)
class BugCompoundComparator (object):
def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST):
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 6e020ee..c4f0f91 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -17,23 +17,30 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the BugDir class for representing bug comments.
+"""
+
+import copy
+import errno
import os
import os.path
-import errno
+import sys
import time
-import copy
import unittest
import doctest
+import bug
+import encoding
from properties import Property, doc_property, local_property, \
defaulting_property, checked_property, fn_checked_property, \
cached_property, primed_property, change_hook_property, \
settings_property
-import settings_object
import mapfile
-import bug
-import rcs
-import encoding
+import vcs
+import settings_object
+import upgrade
import utility
@@ -62,8 +69,16 @@ class MultipleBugMatches(ValueError):
self.shortname = shortname
self.matches = matches
+class NoBugMatches(KeyError):
+ def __init__(self, shortname):
+ msg = "No bug matches %s" % shortname
+ KeyError.__init__(self, msg)
+ self.shortname = shortname
-TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
+class DiskAccessRequired (Exception):
+ def __init__(self, goal):
+ msg = "Cannot %s without accessing the disk" % goal
+ Exception.__init__(self, msg)
class BugDir (list, settings_object.SavedSettingsObject):
@@ -99,7 +114,8 @@ class BugDir (list, settings_object.SavedSettingsObject):
all bugs/comments/etc. that have been loaded into memory. If
you've been living in memory and want to move to
.sync_with_disk==True, but you're not sure if anything has been
- changed in memoryy, a call to save() is a safe move.
+ changed in memory, a call to save() immediately before the
+ .set_sync_with_disk(True) call is a safe move.
Regardless of .sync_with_disk, a call to .save() will write out
all the contents that the BugDir instance has loaded into memory.
@@ -107,15 +123,14 @@ class BugDir (list, settings_object.SavedSettingsObject):
changes, this .save() call will be a waste of time.
The BugDir will only load information from the file system when it
- loads new bugs/comments that it doesn't already have in memory, or
- when it explicitly asked to do so (e.g. .load() or
- __init__(from_disk=True)).
+ loads new settings/bugs/comments that it doesn't already have in
+ memory and .sync_with_disk == True.
- Allow RCS initialization
+ Allow VCS initialization
========================
This one is for testing purposes. Setting it to True allows the
- BugDir to search for an installed RCS backend and initialize it in
+ BugDir to search for an installed VCS backend and initialize it in
the root directory. This is a convenience option for supporting
tests of versioning functionality (e.g. .duplicate_bugdir).
@@ -172,9 +187,9 @@ class BugDir (list, settings_object.SavedSettingsObject):
def encoding(): return {}
def _setup_user_id(self, user_id):
- self.rcs.user_id = user_id
+ self.vcs.user_id = user_id
def _guess_user_id(self):
- return self.rcs.get_user_id()
+ return self.vcs.get_user_id()
def _set_user_id(self, old_user_id, new_user_id):
self._setup_user_id(new_user_id)
self._prop_save_settings(old_user_id, new_user_id)
@@ -182,7 +197,7 @@ class BugDir (list, settings_object.SavedSettingsObject):
@_versioned_property(name="user_id",
doc=
"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
-that the Arch RCS backend *enforces* ids with this format.""",
+that the Arch VCS backend *enforces* ids with this format.""",
change_hook=_set_user_id,
generator=_guess_user_id)
def user_id(): return {}
@@ -192,32 +207,32 @@ that the Arch RCS backend *enforces* ids with this format.""",
"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
def default_assignee(): return {}
- @_versioned_property(name="rcs_name",
- doc="""The name of the current RCS. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .rcs instead, and
-.rcs_name will be automatically adjusted.""",
+ @_versioned_property(name="vcs_name",
+ doc="""The name of the current VCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .vcs instead, and
+.vcs_name will be automatically adjusted.""",
default="None",
allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
- def rcs_name(): return {}
+ def vcs_name(): return {}
- def _get_rcs(self, rcs_name=None):
+ def _get_vcs(self, vcs_name=None):
"""Get and root a new revision control system"""
- if rcs_name == None:
- rcs_name = self.rcs_name
- new_rcs = rcs.rcs_by_name(rcs_name)
- self._change_rcs(None, new_rcs)
- return new_rcs
- def _change_rcs(self, old_rcs, new_rcs):
- new_rcs.encoding = self.encoding
- new_rcs.root(self.root)
- self.rcs_name = new_rcs.name
+ if vcs_name == None:
+ vcs_name = self.vcs_name
+ new_vcs = vcs.vcs_by_name(vcs_name)
+ self._change_vcs(None, new_vcs)
+ return new_vcs
+ def _change_vcs(self, old_vcs, new_vcs):
+ new_vcs.encoding = self.encoding
+ new_vcs.root(self.root)
+ self.vcs_name = new_vcs.name
@Property
- @change_hook_property(hook=_change_rcs)
- @cached_property(generator=_get_rcs)
- @local_property("rcs")
+ @change_hook_property(hook=_change_vcs)
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
@doc_property(doc="A revision control system instance.")
- def rcs(): return {}
+ def vcs(): return {}
def _bug_map_gen(self):
map = {}
@@ -279,9 +294,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and
def __init__(self, root=None, sink_to_existing_root=True,
- assert_new_BugDir=False, allow_rcs_init=False,
- manipulate_encodings=True,
- from_disk=False, rcs=None):
+ assert_new_BugDir=False, allow_vcs_init=False,
+ manipulate_encodings=True, from_disk=False, vcs=None):
list.__init__(self)
settings_object.SavedSettingsObject.__init__(self)
self._manipulate_encodings = manipulate_encodings
@@ -293,9 +307,9 @@ settings easy. Don't set this attribute. Set .rcs instead, and
if not os.path.exists(root):
raise NoRootEntry(root)
self.root = root
- # get a temporary rcs until we've loaded settings
+ # get a temporary vcs until we've loaded settings
self.sync_with_disk = False
- self.rcs = self._guess_rcs()
+ self.vcs = self._guess_vcs()
if from_disk == True:
self.sync_with_disk = True
@@ -305,20 +319,24 @@ settings easy. Don't set this attribute. Set .rcs instead, and
if assert_new_BugDir == True:
if os.path.exists(self.get_path()):
raise AlreadyInitialized, self.get_path()
- if rcs == None:
- rcs = self._guess_rcs(allow_rcs_init)
- self.rcs = rcs
+ if vcs == None:
+ vcs = self._guess_vcs(allow_vcs_init)
+ self.vcs = vcs
self._setup_user_id(self.user_id)
- def set_sync_with_disk(self, value):
- self.sync_with_disk = value
- for bug in self:
- bug.set_sync_with_disk(value)
+ def __del__(self):
+ self.cleanup()
+
+ def cleanup(self):
+ self.vcs.cleanup()
+
+ # methods for getting the BugDir situated in the filesystem
def _find_root(self, path):
"""
Search for an existing bug database dir and it's ancestors and
- return a BugDir rooted there.
+ return a BugDir rooted there. Only called by __init__, and
+ then only if sink_to_existing_root == True.
"""
if not os.path.exists(path):
raise NoRootEntry(path)
@@ -334,136 +352,212 @@ settings easy. Don't set this attribute. Set .rcs instead, and
raise NoBugDir(path)
return beroot
- def get_version(self, path=None, use_none_rcs=False):
- if use_none_rcs == True:
- RCS = rcs.rcs_by_name("None")
- RCS.root(self.root)
- RCS.encoding = encoding.get_encoding()
+ def _guess_vcs(self, allow_vcs_init=False):
+ """
+ Only called by __init__.
+ """
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_vcs = vcs.detect_vcs(deepdir)
+ install = False
+ if new_vcs.name == "None":
+ if allow_vcs_init == True:
+ new_vcs = vcs.installed_vcs()
+ new_vcs.init(self.root)
+ return new_vcs
+
+ # methods for saving/loading/accessing settings and properties.
+
+ def get_path(self, *args):
+ """
+ Return a path relative to .root.
+ """
+ dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(dir, *args)
+
+ def _get_settings(self, settings_path, for_duplicate_bugdir=False):
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_get settings")
+ try:
+ settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs)
+ except vcs.NoSuchFile:
+ settings = {"vcs_name": "None"}
+ return settings
+
+ def _save_settings(self, settings_path, settings,
+ for_duplicate_bugdir=False):
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ if self.sync_with_disk == False and for_duplicate_bugdir == False:
+ # duplicates can ignore this bugdir's .sync_with_disk status
+ raise DiskAccessRequired("_save settings")
+ self.vcs.mkdir(self.get_path(), allow_no_vcs)
+ mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs)
+
+ def load_settings(self):
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._setup_saved_settings()
+ self._setup_user_id(self.user_id)
+ self._setup_encoding(self.encoding)
+ self._setup_severities(self.severities)
+ self._setup_status(self.active_status, self.inactive_status)
+ self.vcs = vcs.vcs_by_name(self.vcs_name)
+ self._setup_user_id(self.user_id)
+
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self._save_settings(self.get_path("settings"), settings)
+
+ def get_version(self, path=None, use_none_vcs=False,
+ for_duplicate_bugdir=False):
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("get version")
+ if use_none_vcs == True:
+ VCS = vcs.vcs_by_name("None")
+ VCS.root(self.root)
+ VCS.encoding = encoding.get_encoding()
else:
- RCS = self.rcs
+ VCS = self.vcs
if path == None:
path = self.get_path("version")
- tree_version = RCS.get_file_contents(path)
- return tree_version
+ allow_no_vcs = not VCS.path_in_root(path)
+ if allow_no_vcs == True:
+ assert for_duplicate_bugdir == True
+ version = VCS.get_file_contents(
+ path, allow_no_vcs=allow_no_vcs).rstrip("\n")
+ return version
def set_version(self):
- self.rcs.mkdir(self.get_path())
- self.rcs.set_file_contents(self.get_path("version"),
- TREE_VERSION_STRING)
+ """
+ Requires disk access.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("set version")
+ self.vcs.mkdir(self.get_path())
+ self.vcs.set_file_contents(self.get_path("version"),
+ upgrade.BUGDIR_DISK_VERSION+"\n")
- def get_path(self, *args):
- my_dir = os.path.join(self.root, ".be")
- if len(args) == 0:
- return my_dir
- assert args[0] in ["version", "settings", "bugs"], str(args)
- return os.path.join(my_dir, *args)
+ # methods controlling disk access
- def _guess_rcs(self, allow_rcs_init=False):
- deepdir = self.get_path()
- if not os.path.exists(deepdir):
- deepdir = os.path.dirname(deepdir)
- new_rcs = rcs.detect_rcs(deepdir)
- install = False
- if new_rcs.name == "None":
- if allow_rcs_init == True:
- new_rcs = rcs.installed_rcs()
- new_rcs.init(self.root)
- return new_rcs
+ def set_sync_with_disk(self, value):
+ """
+ Adjust .sync_with_disk for the BugDir and all it's children.
+ See the BugDir docstring for a description of the role of
+ .sync_with_disk.
+ """
+ self.sync_with_disk = value
+ for bug in self:
+ bug.set_sync_with_disk(value)
def load(self):
- version = self.get_version(use_none_rcs=True)
- if version != TREE_VERSION_STRING:
- raise NotImplementedError, \
- "BugDir cannot handle version '%s' yet." % version
+ """
+ Reqires disk access
+ """
+ version = self.get_version(use_none_vcs=True)
+ if version != upgrade.BUGDIR_DISK_VERSION:
+ upgrade.upgrade(self.root, version)
else:
if not os.path.exists(self.get_path()):
raise NoBugDir(self.get_path())
self.load_settings()
- self.rcs = rcs.rcs_by_name(self.rcs_name)
- self._setup_user_id(self.user_id)
-
def load_all_bugs(self):
- "Warning: this could take a while."
+ """
+ Requires disk access.
+ Warning: this could take a while.
+ """
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load all bugs")
self._clear_bugs()
for uuid in self.list_uuids():
self._load_bug(uuid)
def save(self):
"""
+ Note that this command writes to disk _regardless_ of the
+ status of .sync_with_disk.
+
Save any loaded contents to disk. Because of lazy loading of
bugs and comments, this is actually not too inefficient.
- However, if self.sync_with_disk = True, then any changes are
+ However, if .sync_with_disk = True, then any changes are
automatically written to disk as soon as they happen, so
calling this method will just waste time (unless something
else has been messing with your on-disk files).
+
+ Requires disk access.
"""
+ sync_with_disk = self.sync_with_disk
+ if sync_with_disk == False:
+ self.set_sync_with_disk(True)
self.set_version()
self.save_settings()
for bug in self:
bug.save()
+ if sync_with_disk == False:
+ self.set_sync_with_disk(sync_with_disk)
- def load_settings(self):
- self.settings = self._get_settings(self.get_path("settings"))
- self._setup_saved_settings()
- self._setup_user_id(self.user_id)
- self._setup_encoding(self.encoding)
- self._setup_severities(self.severities)
- self._setup_status(self.active_status, self.inactive_status)
+ # methods for managing duplicate BugDirs
- def _get_settings(self, settings_path):
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- # allow_no_rcs=True should only be for the special case of
- # configuring duplicate bugdir settings
+ def duplicate_bugdir(self, revision):
+ duplicate_path = self.vcs.duplicate_repo(revision)
+ duplicate_version_path = os.path.join(duplicate_path, ".be", "version")
try:
- settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
- except rcs.NoSuchFile:
- settings = {"rcs_name": "None"}
- return settings
-
- def save_settings(self):
- settings = self._get_saved_settings()
- self._save_settings(self.get_path("settings"), settings)
-
- def _save_settings(self, settings_path, settings):
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- # allow_no_rcs=True should only be for the special case of
- # configuring duplicate bugdir settings
- self.rcs.mkdir(self.get_path(), allow_no_rcs)
- mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
-
- def duplicate_bugdir(self, revision):
- duplicate_path = self.rcs.duplicate_repo(revision)
+ version = self.get_version(duplicate_version_path,
+ for_duplicate_bugdir=True)
+ except DiskAccessRequired:
+ self.sync_with_disk = True # temporarily allow access
+ version = self.get_version(duplicate_version_path,
+ for_duplicate_bugdir=True)
+ self.sync_with_disk = False
+ if version != upgrade.BUGDIR_DISK_VERSION:
+ upgrade.upgrade(duplicate_path, version)
- # setup revision RCS as None, since the duplicate may not be
+ # setup revision VCS as None, since the duplicate may not be
# initialized for versioning
duplicate_settings_path = os.path.join(duplicate_path,
".be", "settings")
- duplicate_settings = self._get_settings(duplicate_settings_path)
- if "rcs_name" in duplicate_settings:
- duplicate_settings["rcs_name"] = "None"
+ duplicate_settings = self._get_settings(duplicate_settings_path,
+ for_duplicate_bugdir=True)
+ if "vcs_name" in duplicate_settings:
+ duplicate_settings["vcs_name"] = "None"
duplicate_settings["user_id"] = self.user_id
if "disabled" in bug.status_values:
# Hack to support old versions of BE bugs
duplicate_settings["inactive_status"] = self.inactive_status
- self._save_settings(duplicate_settings_path, duplicate_settings)
+ self._save_settings(duplicate_settings_path, duplicate_settings,
+ for_duplicate_bugdir=True)
return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
def remove_duplicate_bugdir(self):
- self.rcs.remove_duplicate_repo()
+ self.vcs.remove_duplicate_repo()
+
+ # methods for managing bugs
def list_uuids(self):
uuids = []
- if os.path.exists(self.get_path()):
+ if self.sync_with_disk == True and os.path.exists(self.get_path()):
# list the uuids on disk
- for uuid in os.listdir(self.get_path("bugs")):
- if not (uuid.startswith('.')):
- uuids.append(uuid)
- yield uuid
+ if os.path.exists(self.get_path("bugs")):
+ for uuid in os.listdir(self.get_path("bugs")):
+ if not (uuid.startswith('.')):
+ uuids.append(uuid)
+ yield uuid
# and the ones that are still just in memory
for bug in self:
if bug.uuid not in uuids:
@@ -476,6 +570,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and
self._bug_map_gen()
def _load_bug(self, uuid):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("_load bug")
bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
self.append(bg)
self._bug_map_gen()
@@ -492,7 +588,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and
def remove_bug(self, bug):
self.remove(bug)
- bug.remove()
+ if bug.sync_with_disk == True:
+ bug.remove()
def bug_shortname(self, bug):
"""
@@ -514,12 +611,13 @@ settings easy. Don't set this attribute. Set .rcs instead, and
def bug_from_shortname(self, shortname):
"""
- >>> bd = simple_bug_dir()
+ >>> bd = SimpleBugDir(sync_with_disk=False)
>>> bug_a = bd.bug_from_shortname('a')
>>> print type(bug_a)
<class 'libbe.bug.Bug'>
>>> print bug_a
a:om: Bug A
+ >>> bd.cleanup()
"""
matches = []
self._bug_map_gen()
@@ -530,7 +628,7 @@ settings easy. Don't set this attribute. Set .rcs instead, and
raise MultipleBugMatches(shortname, matches)
if len(matches) == 1:
return self.bug_from_uuid(matches[0])
- raise KeyError("No bug matches %s" % shortname)
+ raise NoBugMatches(shortname)
def bug_from_uuid(self, uuid):
if not self.has_bug(uuid):
@@ -548,41 +646,56 @@ settings easy. Don't set this attribute. Set .rcs instead, and
return True
-def simple_bug_dir():
+class SimpleBugDir (BugDir):
"""
- For testing
- >>> bugdir = simple_bug_dir()
- >>> ls = list(bugdir.list_uuids())
- >>> ls.sort()
- >>> print ls
+ For testing. Set sync_with_disk==False for a memory-only bugdir.
+ >>> bugdir = SimpleBugDir()
+ >>> uuids = list(bugdir.list_uuids())
+ >>> uuids.sort()
+ >>> print uuids
['a', 'b']
+ >>> bugdir.cleanup()
"""
- dir = utility.Dir()
- assert os.path.exists(dir.path)
- bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True,
+ def __init__(self, sync_with_disk=True):
+ if sync_with_disk == True:
+ dir = utility.Dir()
+ assert os.path.exists(dir.path)
+ root = dir.path
+ assert_new_BugDir = True
+ vcs_init = True
+ else:
+ root = "/"
+ assert_new_BugDir = False
+ vcs_init = False
+ BugDir.__init__(self, root, sink_to_existing_root=False,
+ assert_new_BugDir=assert_new_BugDir,
+ allow_vcs_init=vcs_init,
manipulate_encodings=False)
- bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
- bug_a = bugdir.new_bug("a", summary="Bug A")
- bug_a.creator = "John Doe <jdoe@example.com>"
- bug_a.time = 0
- bug_b = bugdir.new_bug("b", summary="Bug B")
- bug_b.creator = "Jane Doe <jdoe@example.com>"
- bug_b.time = 0
- bug_b.status = "closed"
- bugdir.save()
- return bugdir
-
+ if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir.
+ self._dir_ref = dir
+ bug_a = self.new_bug("a", summary="Bug A")
+ bug_a.creator = "John Doe <jdoe@example.com>"
+ bug_a.time = 0
+ bug_b = self.new_bug("b", summary="Bug B")
+ bug_b.creator = "Jane Doe <jdoe@example.com>"
+ bug_b.time = 0
+ bug_b.status = "closed"
+ if sync_with_disk == True:
+ self.save()
+ self.set_sync_with_disk(True)
+ def cleanup(self):
+ if hasattr(self, "_dir_ref"):
+ self._dir_ref.cleanup()
+ BugDir.cleanup(self)
class BugDirTestCase(unittest.TestCase):
- def __init__(self, *args, **kwargs):
- unittest.TestCase.__init__(self, *args, **kwargs)
def setUp(self):
self.dir = utility.Dir()
self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_rcs_init=True)
- self.rcs = self.bugdir.rcs
+ allow_vcs_init=True)
+ self.vcs = self.bugdir.vcs
def tearDown(self):
- self.rcs.cleanup()
+ self.bugdir.cleanup()
self.dir.cleanup()
def fullPath(self, path):
return os.path.join(self.dir.path, path)
@@ -593,13 +706,13 @@ class BugDirTestCase(unittest.TestCase):
self.assertRaises(AlreadyInitialized, BugDir,
self.dir.path, assertNewBugDir=True)
def versionTest(self):
- if self.rcs.versioned == False:
+ if self.vcs.versioned == False:
return
- original = self.bugdir.rcs.commit("Began versioning")
+ original = self.bugdir.vcs.commit("Began versioning")
bugA = self.bugdir.bug_from_uuid("a")
bugA.status = "fixed"
self.bugdir.save()
- new = self.rcs.commit("Fixed bug a")
+ new = self.vcs.commit("Fixed bug a")
dupdir = self.bugdir.duplicate_bugdir(original)
self.failUnless(dupdir.root != self.bugdir.root,
"%s, %s" % (dupdir.root, self.bugdir.root))
@@ -645,17 +758,19 @@ class BugDirTestCase(unittest.TestCase):
rep.new_reply("And they have six legs.")
if sync_with_disk == False:
self.bugdir.save()
+ self.bugdir.set_sync_with_disk(True)
self.bugdir._clear_bugs()
bug = self.bugdir.bug_from_uuid("a")
bug.load_comments()
+ if sync_with_disk == False:
+ self.bugdir.set_sync_with_disk(False)
self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
for index,comment in enumerate(bug.comments()):
if index == 0:
repLoaded = comment
self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
- self.failUnless(comment.sync_with_disk == True,
+ self.failUnless(comment.sync_with_disk == sync_with_disk,
comment.sync_with_disk)
- #load_settings()
self.failUnless(comment.content_type == "text/plain",
comment.content_type)
self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
@@ -672,5 +787,46 @@ class BugDirTestCase(unittest.TestCase):
def testSyncedComments(self):
self.testComments(sync_with_disk=True)
-unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
-suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()])
+class SimpleBugDirTestCase (unittest.TestCase):
+ def setUp(self):
+ # create a pre-existing bugdir in a temporary directory
+ self.dir = utility.Dir()
+ self.original_working_dir = os.getcwd()
+ os.chdir(self.dir.path)
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_vcs_init=True)
+ self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
+ self.bugdir.save()
+ def tearDown(self):
+ os.chdir(self.original_working_dir)
+ self.bugdir.cleanup()
+ self.dir.cleanup()
+ def testOnDiskCleanLoad(self):
+ """SimpleBugDir(sync_with_disk==True) should not import preexisting bugs."""
+ bugdir = SimpleBugDir(sync_with_disk=True)
+ self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.load_all_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir.cleanup()
+ def testInMemoryCleanLoad(self):
+ """SimpleBugDir(sync_with_disk==False) should not import preexisting bugs."""
+ bugdir = SimpleBugDir(sync_with_disk=False)
+ self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == ['a', 'b'], uuids)
+ bugdir._clear_bugs()
+ uuids = sorted([bug.uuid for bug in bugdir])
+ self.failUnless(uuids == [], uuids)
+ bugdir.cleanup()
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/bzr.py b/libbe/bzr.py
index d7cd1e5..e9e0649 100644
--- a/libbe/bzr.py
+++ b/libbe/bzr.py
@@ -17,61 +17,65 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+Bazaar (bzr) backend.
+"""
+
import os
import re
import sys
import unittest
import doctest
-import rcs
-from rcs import RCS
+import vcs
+
def new():
return Bzr()
-class Bzr(RCS):
+class Bzr(vcs.VCS):
name = "bzr"
client = "bzr"
versioned = True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
if self._u_search_parent_directories(path, ".bzr") != None :
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
status,output,error = self._u_invoke_client("root", path)
return output.rstrip('\n')
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
status,output,error = self._u_invoke_client("whoami")
return output.rstrip('\n')
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
self._u_invoke_client("whoami", value)
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
# --force to also remove unversioned files.
self._u_invoke_client("remove", "--force", path)
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
status,output,error = \
self._u_invoke_client("cat","-r",revision,path)
return output
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision == None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
else:
self._u_invoke_client("branch", "--revision", revision,
".", directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
args = ["commit", "--file", commitfile]
if allow_empty == True:
args.append("--unchanged")
@@ -83,9 +87,9 @@ class Bzr(RCS):
strings = ["ERROR: no changes to commit.", # bzr 1.3.1
"ERROR: No changes to commit."] # bzr 1.15.1
if self._u_any_in_string(strings, error) == True:
- raise rcs.EmptyCommit()
+ raise vcs.EmptyCommit()
else:
- raise rcs.CommandError(args, status, error)
+ raise vcs.CommandError(args, status, stdout="", stderr=error)
revision = None
revline = re.compile("Committed revision (.*)[.]")
match = revline.search(error)
@@ -93,9 +97,17 @@ class Bzr(RCS):
assert len(match.groups()) == 1
revision = match.groups()[0]
return revision
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client("revno")
+ current_revision = int(output)
+ if index >= current_revision or index < -current_revision:
+ return None
+ if index >= 0:
+ return str(index+1) # bzr commit 0 is the empty tree.
+ return str(current_revision+index+1)
-rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py
index 853a75a..9b64142 100644
--- a/libbe/cmdutil.py
+++ b/libbe/cmdutil.py
@@ -15,6 +15,11 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define assorted utilities to make command-line handling easier.
+"""
+
import glob
import optparse
import os
@@ -70,10 +75,11 @@ def get_command(command_name):
return cmd
-def execute(cmd, args):
+def execute(cmd, args, manipulate_encodings=True):
enc = encoding.get_encoding()
cmd = get_command(cmd)
- ret = cmd.execute([a.decode(enc) for a in args])
+ ret = cmd.execute([a.decode(enc) for a in args],
+ manipulate_encodings=manipulate_encodings)
if ret == None:
ret = 0
return ret
@@ -206,6 +212,15 @@ def underlined(instring):
return "%s\n%s" % (instring, "="*len(instring))
+def bug_from_shortname(bdir, shortname):
+ """
+ Exception translation for the command-line interface.
+ """
+ try:
+ bug = bdir.bug_from_shortname(shortname)
+ except (bugdir.MultipleBugMatches, bugdir.NoBugMatches), e:
+ raise UserError(e.message)
+ return bug
def _test():
import doctest
diff --git a/libbe/comment.py b/libbe/comment.py
index 3249e8b..41bc7e6 100644
--- a/libbe/comment.py
+++ b/libbe/comment.py
@@ -16,6 +16,11 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the Comment class for representing bug comments.
+"""
+
import base64
import os
import os.path
@@ -61,6 +66,11 @@ class MissingReference(ValueError):
self.reference = comment.in_reply_to
self.comment = comment
+class DiskAccessRequired (Exception):
+ def __init__(self, goal):
+ msg = "Cannot %s without accessing the disk" % goal
+ Exception.__init__(self, msg)
+
INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!"
def list_to_root(comments, bug, root=None,
@@ -115,8 +125,10 @@ def loadComments(bug, load_full=False):
Set load_full=True when you want to load the comment completely
from disk *now*, rather than waiting and lazy loading as required.
"""
+ if bug.sync_with_disk == False:
+ raise DiskAccessRequired("load comments")
path = bug.get_path("comments")
- if not os.path.isdir(path):
+ if not os.path.exists(path):
return Comment(bug, uuid=INVALID_UUID)
comments = []
for uuid in os.listdir(path):
@@ -131,6 +143,8 @@ def loadComments(bug, load_full=False):
return list_to_root(comments, bug)
def saveComments(bug):
+ if bug.sync_with_disk == False:
+ raise DiskAccessRequired("save comments")
for comment in bug.comment_root.traverse():
comment.save()
@@ -162,9 +176,9 @@ class Comment(Tree, settings_object.SavedSettingsObject):
doc="Alternate ID for linking imported comments. Internally comments are linked (via In-reply-to) to the parent's UUID. However, these UUIDs are generated internally, so Alt-id is provided as a user-controlled linking target.")
def alt_id(): return {}
- @_versioned_property(name="From",
+ @_versioned_property(name="Author",
doc="The author of the comment")
- def From(): return {}
+ def author(): return {}
@_versioned_property(name="In-reply-to",
doc="UUID for parent comment or bug")
@@ -178,28 +192,28 @@ class Comment(Tree, settings_object.SavedSettingsObject):
@_versioned_property(name="Date",
doc="An RFC 2822 timestamp for comment creation")
- def time_string(): return {}
+ def date(): return {}
def _get_time(self):
- if self.time_string == None:
+ if self.date == None:
return None
- return utility.str_to_time(self.time_string)
+ return utility.str_to_time(self.date)
def _set_time(self, value):
- self.time_string = utility.time_to_str(value)
+ self.date = utility.time_to_str(value)
time = property(fget=_get_time,
fset=_set_time,
- doc="An integer version of .time_string")
+ doc="An integer version of .date")
def _get_comment_body(self):
- if self.rcs != None and self.sync_with_disk == True:
- import rcs
+ if self.vcs != None and self.sync_with_disk == True:
+ import vcs
binary = not self.content_type.startswith("text/")
- return self.rcs.get_file_contents(self.get_path("body"), binary=binary)
+ return self.vcs.get_file_contents(self.get_path("body"), binary=binary)
def _set_comment_body(self, old=None, new=None, force=False):
- if (self.rcs != None and self.sync_with_disk == True) or force==True:
+ if (self.vcs != None and self.sync_with_disk == True) or force==True:
assert new != None, "Can't save empty comment"
binary = not self.content_type.startswith("text/")
- self.rcs.set_file_contents(self.get_path("body"), new, binary=binary)
+ self.vcs.set_file_contents(self.get_path("body"), new, binary=binary)
@Property
@change_hook_property(hook=_set_comment_body)
@@ -208,15 +222,15 @@ class Comment(Tree, settings_object.SavedSettingsObject):
@doc_property(doc="The meat of the comment")
def body(): return {}
- def _get_rcs(self):
- if hasattr(self.bug, "rcs"):
- return self.bug.rcs
+ def _get_vcs(self):
+ if hasattr(self.bug, "vcs"):
+ return self.bug.vcs
@Property
- @cached_property(generator=_get_rcs)
- @local_property("rcs")
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
@doc_property(doc="A revision control system instance.")
- def rcs(): return {}
+ def vcs(): return {}
def _extra_strings_check_fn(value):
return utility.iterable_full_of_strings(value, \
@@ -257,13 +271,29 @@ class Comment(Tree, settings_object.SavedSettingsObject):
if uuid == None:
self.uuid = uuid_gen()
self.time = int(time.time()) # only save to second precision
- if self.rcs != None:
- self.From = self.rcs.get_user_id()
+ if self.vcs != None:
+ self.author = self.vcs.get_user_id()
self.in_reply_to = in_reply_to
self.body = body
- def set_sync_with_disk(self, value):
- self.sync_with_disk = True
+ def __cmp__(self, other):
+ return cmp_full(self, other)
+
+ def __str__(self):
+ """
+ >>> comm = Comment(bug=None, body="Some insightful remarks")
+ >>> comm.uuid = "com-1"
+ >>> comm.date = "Thu, 20 Nov 2008 15:55:11 +0000"
+ >>> comm.author = "Jane Doe <jdoe@example.com>"
+ >>> print comm
+ --------- Comment ---------
+ Name: com-1
+ From: Jane Doe <jdoe@example.com>
+ Date: Thu, 20 Nov 2008 15:55:11 +0000
+ <BLANKLINE>
+ Some insightful remarks
+ """
+ return self.string()
def traverse(self, *args, **kwargs):
"""Avoid working with the possible dummy root comment"""
@@ -272,6 +302,8 @@ class Comment(Tree, settings_object.SavedSettingsObject):
continue
yield comment
+ # serializing methods
+
def _setting_attr_string(self, setting):
value = getattr(self, setting)
if value == None:
@@ -282,12 +314,12 @@ class Comment(Tree, settings_object.SavedSettingsObject):
"""
>>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
>>> comm.uuid = "0123"
- >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000"
+ >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000"
>>> print comm.xml(indent=2, shortname="com-1")
<comment>
<uuid>0123</uuid>
<short-name>com-1</short-name>
- <from></from>
+ <author></author>
<date>Thu, 01 Jan 1970 00:00:00 +0000</date>
<content-type>text/plain</content-type>
<body>Some
@@ -309,8 +341,8 @@ class Comment(Tree, settings_object.SavedSettingsObject):
("alt-id", self.alt_id),
("short-name", shortname),
("in-reply-to", self.in_reply_to),
- ("from", self._setting_attr_string("From")),
- ("date", self.time_string),
+ ("author", self._setting_attr_string("author")),
+ ("date", self.date),
("content-type", self.content_type),
("body", body)]
lines = ["<comment>"]
@@ -328,11 +360,11 @@ class Comment(Tree, settings_object.SavedSettingsObject):
<alt-id> fields.
>>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
>>> commA.uuid = "0123"
- >>> commA.time_string = "Thu, 01 Jan 1970 00:00:00 +0000"
+ >>> commA.date = "Thu, 01 Jan 1970 00:00:00 +0000"
>>> xml = commA.xml(shortname="com-1")
>>> commB = Comment()
>>> commB.from_xml(xml)
- >>> attrs=['uuid','alt_id','in_reply_to','From','time_string','content_type','body']
+ >>> attrs=['uuid','alt_id','in_reply_to','author','date','content_type','body']
>>> for attr in attrs: # doctest: +ELLIPSIS
... if getattr(commB, attr) != getattr(commA, attr):
... estr = "Mismatch on %s: '%s' should be '%s'"
@@ -342,15 +374,15 @@ class Comment(Tree, settings_object.SavedSettingsObject):
Mismatch on alt_id: '0123' should be 'None'
>>> print commB.alt_id
0123
- >>> commA.From
- >>> commB.From
+ >>> commA.author
+ >>> commB.author
"""
if type(xml_string) == types.UnicodeType:
xml_string = xml_string.strip().encode("unicode_escape")
comment = ElementTree.XML(xml_string)
if comment.tag != "comment":
raise InvalidXML(comment, "root element must be <comment>")
- tags=['uuid','alt-id','in-reply-to','from','date','content-type','body']
+ tags=['uuid','alt-id','in-reply-to','author','date','content-type','body']
uuid = None
body = None
for child in comment.getchildren():
@@ -368,10 +400,6 @@ class Comment(Tree, settings_object.SavedSettingsObject):
if child.tag == "body":
body = text
continue # don't set the bug's body yet.
- elif child.tag == 'from':
- attr_name = "From"
- elif child.tag == 'date':
- attr_name = 'time_string'
else:
attr_name = child.tag.replace('-','_')
setattr(self, attr_name, text)
@@ -389,7 +417,7 @@ class Comment(Tree, settings_object.SavedSettingsObject):
def string(self, indent=0, shortname=None):
"""
>>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
- >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000"
+ >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000"
>>> print comm.string(indent=2, shortname="com-1")
--------- Comment ---------
Name: com-1
@@ -405,8 +433,8 @@ class Comment(Tree, settings_object.SavedSettingsObject):
lines = []
lines.append("--------- Comment ---------")
lines.append("Name: %s" % shortname)
- lines.append("From: %s" % (self._setting_attr_string("From")))
- lines.append("Date: %s" % self.time_string)
+ lines.append("From: %s" % (self._setting_attr_string("author")))
+ lines.append("Date: %s" % self.date)
lines.append("")
if self.content_type.startswith("text/"):
lines.extend((self.body or "").splitlines())
@@ -417,78 +445,6 @@ class Comment(Tree, settings_object.SavedSettingsObject):
sep = '\n' + istring
return istring + sep.join(lines).rstrip('\n')
- def __str__(self):
- """
- >>> comm = Comment(bug=None, body="Some insightful remarks")
- >>> comm.uuid = "com-1"
- >>> comm.time_string = "Thu, 20 Nov 2008 15:55:11 +0000"
- >>> comm.From = "Jane Doe <jdoe@example.com>"
- >>> print comm
- --------- Comment ---------
- Name: com-1
- From: Jane Doe <jdoe@example.com>
- Date: Thu, 20 Nov 2008 15:55:11 +0000
- <BLANKLINE>
- Some insightful remarks
- """
- return self.string()
-
- def get_path(self, name=None):
- my_dir = os.path.join(self.bug.get_path("comments"), self.uuid)
- if name is None:
- return my_dir
- assert name in ["values", "body"]
- return os.path.join(my_dir, name)
-
- def load_settings(self):
- self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
- self._setup_saved_settings()
-
- def save_settings(self):
- self.rcs.mkdir(self.get_path())
- path = self.get_path("values")
- mapfile.map_save(self.rcs, path, self._get_saved_settings())
-
- def save(self):
- """
- Save any loaded contents to disk.
-
- However, if self.sync_with_disk = True, then any changes are
- automatically written to disk as soon as they happen, so
- calling this method will just waste time (unless something
- else has been messing with your on-disk files).
- """
- assert self.body != None, "Can't save blank comment"
- self.save_settings()
- self._set_comment_body(new=self.body, force=True)
-
- def remove(self):
- for comment in self.traverse():
- path = comment.get_path()
- self.rcs.recursive_remove(path)
-
- def add_reply(self, reply, allow_time_inversion=False):
- if self.uuid != INVALID_UUID:
- reply.in_reply_to = self.uuid
- self.append(reply)
- #raise Exception, "adding reply \n%s\n%s" % (self, reply)
-
- def new_reply(self, body=None):
- """
- >>> comm = Comment(bug=None, body="Some insightful remarks")
- >>> repA = comm.new_reply("Critique original comment")
- >>> repB = repA.new_reply("Begin flamewar :p")
- >>> repB.in_reply_to == repA.uuid
- True
- """
- reply = Comment(self.bug, body=body)
- if self.bug != None:
- reply.set_sync_with_disk(self.bug.sync_with_disk)
- if reply.sync_with_disk == True:
- reply.save()
- self.add_reply(reply)
- return reply
-
def string_thread(self, string_method_name="string", name_map={},
indent=0, flatten=True,
auto_name_map=False, bug_shortname=None):
@@ -506,7 +462,7 @@ class Comment(Tree, settings_object.SavedSettingsObject):
name_map = {}
for shortname,comment in comm.comment_shortnames(bug_shortname):
name_map[comment.uuid] = shortname
- comm.sort(key=lambda c : c.From) # your sort
+ comm.sort(key=lambda c : c.author) # your sort
comm.string_thread(name_map=name_map)
>>> a = Comment(bug=None, uuid="a", body="Insightful remarks")
@@ -593,6 +549,77 @@ class Comment(Tree, settings_object.SavedSettingsObject):
indent=indent, auto_name_map=auto_name_map,
bug_shortname=bug_shortname)
+ # methods for saving/loading/acessing settings and properties.
+
+ def get_path(self, *args):
+ dir = os.path.join(self.bug.get_path("comments"), self.uuid)
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["values", "body"], str(args)
+ return os.path.join(dir, *args)
+
+ def set_sync_with_disk(self, value):
+ self.sync_with_disk = value
+
+ def load_settings(self):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("load settings")
+ self.settings = mapfile.map_load(self.vcs, self.get_path("values"))
+ self._setup_saved_settings()
+
+ def save_settings(self):
+ if self.sync_with_disk == False:
+ raise DiskAccessRequired("save settings")
+ self.vcs.mkdir(self.get_path())
+ path = self.get_path("values")
+ mapfile.map_save(self.vcs, path, self._get_saved_settings())
+
+ def save(self):
+ """
+ Save any loaded contents to disk.
+
+ However, if self.sync_with_disk = True, then any changes are
+ automatically written to disk as soon as they happen, so
+ calling this method will just waste time (unless something
+ else has been messing with your on-disk files).
+ """
+ sync_with_disk = self.sync_with_disk
+ if sync_with_disk == False:
+ self.set_sync_with_disk(True)
+ assert self.body != None, "Can't save blank comment"
+ self.save_settings()
+ self._set_comment_body(new=self.body, force=True)
+ if sync_with_disk == False:
+ self.set_sync_with_disk(False)
+
+ def remove(self):
+ if self.sync_with_disk == False and self.uuid != INVALID_UUID:
+ raise DiskAccessRequired("remove")
+ for comment in self.traverse():
+ path = comment.get_path()
+ self.vcs.recursive_remove(path)
+
+ def add_reply(self, reply, allow_time_inversion=False):
+ if self.uuid != INVALID_UUID:
+ reply.in_reply_to = self.uuid
+ self.append(reply)
+
+ def new_reply(self, body=None):
+ """
+ >>> comm = Comment(bug=None, body="Some insightful remarks")
+ >>> repA = comm.new_reply("Critique original comment")
+ >>> repB = repA.new_reply("Begin flamewar :p")
+ >>> repB.in_reply_to == repA.uuid
+ True
+ """
+ reply = Comment(self.bug, body=body)
+ if self.bug != None:
+ reply.set_sync_with_disk(self.bug.sync_with_disk)
+ if reply.sync_with_disk == True:
+ reply.save()
+ self.add_reply(reply)
+ return reply
+
def comment_shortnames(self, bug_shortname=None):
"""
Iterate through (id, comment) pairs, in time order.
@@ -659,4 +686,59 @@ class Comment(Tree, settings_object.SavedSettingsObject):
return comment
raise KeyError(uuid)
+def cmp_attr(comment_1, comment_2, attr, invert=False):
+ """
+ Compare a general attribute between two comments using the conventional
+ comparison rule for that attribute type. If invert == True, sort
+ *against* that convention.
+ >>> attr="author"
+ >>> commentA = Comment()
+ >>> commentB = Comment()
+ >>> commentA.author = "John Doe"
+ >>> commentB.author = "Jane Doe"
+ >>> cmp_attr(commentA, commentB, attr) > 0
+ True
+ >>> cmp_attr(commentA, commentB, attr, invert=True) < 0
+ True
+ >>> commentB.author = "John Doe"
+ >>> cmp_attr(commentA, commentB, attr) == 0
+ True
+ """
+ if not hasattr(comment_2, attr) :
+ return 1
+ val_1 = getattr(comment_1, attr)
+ val_2 = getattr(comment_2, attr)
+ if val_1 == None: val_1 = None
+ if val_2 == None: val_2 = None
+
+ if invert == True :
+ return -cmp(val_1, val_2)
+ else :
+ return cmp(val_1, val_2)
+
+# alphabetical rankings (a < z)
+cmp_uuid = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "uuid")
+cmp_author = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "author")
+cmp_in_reply_to = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "in_reply_to")
+cmp_content_type = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "content_type")
+cmp_body = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "body")
+# chronological rankings (newer < older)
+cmp_time = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "time", invert=True)
+
+DEFAULT_CMP_FULL_CMP_LIST = \
+ (cmp_time, cmp_author, cmp_content_type, cmp_body, cmp_in_reply_to,
+ cmp_uuid)
+
+class CommentCompoundComparator (object):
+ def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST):
+ self.cmp_list = cmp_list
+ def __call__(self, comment_1, comment_2):
+ for comparison in self.cmp_list :
+ val = comparison(comment_1, comment_2)
+ if val != 0 :
+ return val
+ return 0
+
+cmp_full = CommentCompoundComparator()
+
suite = doctest.DocTestSuite()
diff --git a/libbe/config.py b/libbe/config.py
index 5e343b9..fb5a028 100644
--- a/libbe/config.py
+++ b/libbe/config.py
@@ -14,6 +14,11 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Create, save, and load the per-user config file at path().
+"""
+
import ConfigParser
import codecs
import locale
@@ -21,6 +26,7 @@ import os.path
import sys
import doctest
+
default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
def path():
diff --git a/libbe/darcs.py b/libbe/darcs.py
index e7132c0..16005f2 100644
--- a/libbe/darcs.py
+++ b/libbe/darcs.py
@@ -14,31 +14,40 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+Darcs backend.
+"""
+
import codecs
import os
import re
import sys
-import unittest
+try: # import core module, Python >= 2.5
+ from xml.etree import ElementTree
+except ImportError: # look for non-core module
+ from elementtree import ElementTree
+from xml.sax.saxutils import unescape
import doctest
+import unittest
+
+import vcs
-import rcs
-from rcs import RCS
def new():
return Darcs()
-class Darcs(RCS):
+class Darcs(vcs.VCS):
name="darcs"
client="darcs"
versioned=True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
if self._u_search_parent_directories(path, "_darcs") != None :
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
# Assume that nothing funny is going on; in particular, that we aren't
# dealing with a bare repo.
@@ -48,9 +57,9 @@ class Darcs(RCS):
if darcs_dir == None:
return None
return os.path.dirname(darcs_dir)
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
# following http://darcs.net/manual/node4.html#SECTION00410030000000000000
# as of June 29th, 2009
if self.rootdir == None:
@@ -65,32 +74,32 @@ class Darcs(RCS):
if env_variable in os.environ:
return os.environ[env_variable]
return None
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
if self.rootdir == None:
self.root(".")
if self.rootdir == None:
- raise rcs.SettingIDnotSupported
+ raise vcs.SettingIDnotSupported
author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author")
f = codecs.open(author_path, "w", self.encoding)
f.write(value)
f.close()
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
if os.path.isdir(path):
return
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
if not os.path.isdir(self._u_abspath(path)):
os.remove(os.path.join(self.rootdir, path)) # darcs notices removal
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass # darcs notices changes
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision,
+ return vcs.VCS._vcs_get_file_contents(self, path, revision,
binary=binary)
else:
try:
return self._u_invoke_client("show", "contents", "--patch", revision, path)
- except rcs.CommandError:
+ except vcs.CommandError:
# Darcs versions < 2.0.0pre2 lack the "show contents" command
status,output,error = self._u_invoke_client("diff", "--unified",
@@ -113,7 +122,7 @@ class Darcs(RCS):
status,output,error = self._u_invoke(args, stdin=target_patch)
if os.path.exists(os.path.join(self.rootdir, path)) == True:
- contents = RCS._rcs_get_file_contents(self, path,
+ contents = vcs.VCS._vcs_get_file_contents(self, path,
binary=binary)
else:
contents = ""
@@ -123,41 +132,53 @@ class Darcs(RCS):
status,output,error = self._u_invoke(args, stdin=target_patch)
args=["patch", path]
status,output,error = self._u_invoke(args, stdin=major_patch)
- current_contents = RCS._rcs_get_file_contents(self, path,
+ current_contents = vcs.VCS._vcs_get_file_contents(self, path,
binary=binary)
return contents
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision==None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
else:
self._u_invoke_client("put", "--to-patch", revision, directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
id = self.get_user_id()
if '@' not in id:
id = "%s <%s@invalid.com>" % (id, id)
args = ['record', '--all', '--author', id, '--logfile', commitfile]
status,output,error = self._u_invoke_client(*args)
empty_strings = ["No changes!"]
- revision = None
if self._u_any_in_string(empty_strings, output) == True:
if allow_empty == False:
- raise rcs.EmptyCommit()
- else: # we need a extra call to get the current revision
- args = ["changes", "--last=1", "--xml"]
- status,output,error = self._u_invoke_client(*args)
- revline = re.compile("[ \t]*<name>(.*)</name>")
- # note that darcs does _not_ make an empty revision.
- # this returns the last non-empty revision id...
+ raise vcs.EmptyCommit()
+ # note that darcs does _not_ make an empty revision.
+ # this returns the last non-empty revision id...
+ revision = self._vcs_revision_id(-1)
else:
revline = re.compile("Finished recording patch '(.*)'")
- match = revline.search(output)
- assert match != None, output+error
- assert len(match.groups()) == 1
- revision = match.groups()[0]
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
return revision
-
+ def _vcs_revision_id(self, index):
+ status,output,error = self._u_invoke_client("changes", "--xml")
+ revisions = []
+ xml_str = output.encode("unicode_escape").replace(r"\n", "\n")
+ element = ElementTree.XML(xml_str)
+ assert element.tag == "changelog", element.tag
+ for patch in element.getchildren():
+ assert patch.tag == "patch", patch.tag
+ for child in patch.getchildren():
+ if child.tag == "name":
+ text = unescape(unicode(child.text).decode("unicode_escape").strip())
+ revisions.append(text)
+ revisions.reverse()
+ try:
+ return revisions[index]
+ except IndexError:
+ return None
-rcs.make_rcs_testcase_subclasses(Darcs, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/diff.py b/libbe/diff.py
index ba48efc..9253a23 100644
--- a/libbe/diff.py
+++ b/libbe/diff.py
@@ -14,112 +14,406 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""Compare two bug trees"""
-from libbe import cmdutil, bugdir, bug
-from libbe.utility import time_to_str
+
+"""Compare two bug trees."""
+
+import difflib
import doctest
-def bug_diffs(old_bugdir, new_bugdir):
- added = []
- removed = []
- modified = []
- for uuid in old_bugdir.list_uuids():
- old_bug = old_bugdir.bug_from_uuid(uuid)
- try:
- new_bug = new_bugdir.bug_from_uuid(uuid)
- old_bug.load_comments()
- new_bug.load_comments()
- if old_bug != new_bug:
- modified.append((old_bug, new_bug))
- except KeyError:
- removed.append(old_bug)
- for uuid in new_bugdir.list_uuids():
- if not old_bugdir.has_bug(uuid):
- new_bug = new_bugdir.bug_from_uuid(uuid)
- added.append(new_bug)
- return (removed, modified, added)
+from libbe import bugdir, bug, settings_object, tree
+from libbe.utility import time_to_str
-def diff_report(bug_diffs_data, old_bugdir, new_bugdir):
- bugs_removed,bugs_modified,bugs_added = bug_diffs_data
- def modified_cmp(left, right):
- return bug.cmp_severity(left[1], right[1])
- bugs_added.sort(bug.cmp_severity)
- bugs_removed.sort(bug.cmp_severity)
- bugs_modified.sort(modified_cmp)
- lines = []
-
- if old_bugdir.settings != new_bugdir.settings:
- bugdir_settings = sorted(new_bugdir.settings_properties)
- bugdir_settings.remove("rcs_name") # tweaked by bugdir.duplicate_bugdir
- change_list = change_lines(old_bugdir, new_bugdir, bugdir_settings)
- if len(change_list) > 0:
- lines.append("Modified bug directory:")
- change_strings = ["%s: %s -> %s" % f for f in change_list]
- lines.extend(change_strings)
- lines.append("")
- if len(bugs_added) > 0:
- lines.append("New bug reports:")
- for bg in bugs_added:
- lines.extend(bg.string(shortlist=True).splitlines())
- lines.append("")
- if len(bugs_modified) > 0:
- printed = False
- for old_bug, new_bug in bugs_modified:
- change_str = bug_changes(old_bug, new_bug)
- if change_str is None:
- continue
- if not printed:
- printed = True
- lines.append("Modified bug reports:")
- lines.extend(change_str.splitlines())
- if printed == True:
- lines.append("")
- if len(bugs_removed) > 0:
- lines.append("Removed bug reports:")
- for bg in bugs_removed:
- lines.extend(bg.string(shortlist=True).splitlines())
- lines.append("")
-
- return "\n".join(lines).rstrip("\n")
+class DiffTree (tree.Tree):
+ """
+ A tree holding difference data for easy report generation.
+ >>> bugdir = DiffTree("bugdir")
+ >>> bdsettings = DiffTree("settings", data="target: None -> 1.0")
+ >>> bugdir.append(bdsettings)
+ >>> bugs = DiffTree("bugs", "bug-count: 5 -> 6")
+ >>> bugdir.append(bugs)
+ >>> new = DiffTree("new", "new bugs: ABC, DEF")
+ >>> bugs.append(new)
+ >>> rem = DiffTree("rem", "removed bugs: RST, UVW")
+ >>> bugs.append(rem)
+ >>> print bugdir.report_string()
+ target: None -> 1.0
+ bug-count: 5 -> 6
+ new bugs: ABC, DEF
+ removed bugs: RST, UVW
+ >>> print "\\n".join(bugdir.paths())
+ bugdir
+ bugdir/settings
+ bugdir/bugs
+ bugdir/bugs/new
+ bugdir/bugs/rem
+ >>> bugdir.child_by_path("/") == bugdir
+ True
+ >>> bugdir.child_by_path("/bugs") == bugs
+ True
+ >>> bugdir.child_by_path("/bugs/rem") == rem
+ True
+ >>> bugdir.child_by_path("bugdir") == bugdir
+ True
+ >>> bugdir.child_by_path("bugdir/") == bugdir
+ True
+ >>> bugdir.child_by_path("bugdir/bugs") == bugs
+ True
+ >>> bugdir.child_by_path("/bugs").masked = True
+ >>> print bugdir.report_string()
+ target: None -> 1.0
+ """
+ def __init__(self, name, data=None, data_part_fn=str,
+ requires_children=False, masked=False):
+ tree.Tree.__init__(self)
+ self.name = name
+ self.data = data
+ self.data_part_fn = data_part_fn
+ self.requires_children = requires_children
+ self.masked = masked
+ def paths(self, parent_path=None):
+ paths = []
+ if parent_path == None:
+ path = self.name
+ else:
+ path = "%s/%s" % (parent_path, self.name)
+ paths.append(path)
+ for child in self:
+ paths.extend(child.paths(path))
+ return paths
+ def child_by_path(self, path):
+ if hasattr(path, "split"): # convert string path to a list of names
+ names = path.split("/")
+ if names[0] == "":
+ names[0] = self.name # replace root with self
+ if len(names) > 1 and names[-1] == "":
+ names = names[:-1] # strip empty tail
+ else: # it was already an array
+ names = path
+ assert len(names) > 0, path
+ if names[0] == self.name:
+ if len(names) == 1:
+ return self
+ for child in self:
+ if names[1] == child.name:
+ return child.child_by_path(names[1:])
+ if len(names) == 1:
+ raise KeyError, "%s doesn't match '%s'" % (names, self.name)
+ raise KeyError, "%s points to child not in %s" % (names, [c.name for c in self])
+ def report_string(self):
+ return "\n".join(self.report())
+ def report(self, root=None, parent=None, depth=0):
+ if root == None:
+ root = self.make_root()
+ if self.masked == True:
+ return None
+ data_part = self.data_part(depth)
+ if self.requires_children == True and len(self) == 0:
+ pass
+ else:
+ self.join(root, parent, data_part)
+ if data_part != None:
+ depth += 1
+ for child in self:
+ child.report(root, self, depth)
+ return root
+ def make_root(self):
+ return []
+ def join(self, root, parent, data_part):
+ if data_part != None:
+ root.append(data_part)
+ def data_part(self, depth, indent=True):
+ if self.data == None:
+ return None
+ if hasattr(self, "_cached_data_part"):
+ return self._cached_data_part
+ data_part = self.data_part_fn(self.data)
+ if indent == True:
+ data_part_lines = data_part.splitlines()
+ indent = " "*(depth)
+ line_sep = "\n"+indent
+ data_part = indent+line_sep.join(data_part_lines)
+ self._cached_data_part = data_part
+ return data_part
-def change_lines(old, new, attributes):
- change_list = []
- for attr in attributes:
- old_attr = getattr(old, attr)
- new_attr = getattr(new, attr)
- if old_attr != new_attr:
- change_list.append((attr, old_attr, new_attr))
- if len(change_list) >= 0:
- return change_list
- else:
+class Diff (object):
+ """
+ Difference tree generator for BugDirs.
+ >>> import copy
+ >>> bd = bugdir.SimpleBugDir(sync_with_disk=False)
+ >>> bd.user_id = "John Doe <j@doe.com>"
+ >>> bd_new = copy.deepcopy(bd)
+ >>> bd_new.target = "1.0"
+ >>> a = bd_new.bug_from_uuid("a")
+ >>> rep = a.comment_root.new_reply("I'm closing this bug")
+ >>> rep.uuid = "acom"
+ >>> rep.date = "Thu, 01 Jan 1970 00:00:00 +0000"
+ >>> a.status = "closed"
+ >>> b = bd_new.bug_from_uuid("b")
+ >>> bd_new.remove_bug(b)
+ >>> c = bd_new.new_bug("c", "Bug C")
+ >>> d = Diff(bd, bd_new)
+ >>> r = d.report_tree()
+ >>> print "\\n".join(r.paths())
+ bugdir
+ bugdir/settings
+ bugdir/bugs
+ bugdir/bugs/new
+ bugdir/bugs/new/c
+ bugdir/bugs/rem
+ bugdir/bugs/rem/b
+ bugdir/bugs/mod
+ bugdir/bugs/mod/a
+ bugdir/bugs/mod/a/settings
+ bugdir/bugs/mod/a/comments
+ bugdir/bugs/mod/a/comments/new
+ bugdir/bugs/mod/a/comments/new/acom
+ bugdir/bugs/mod/a/comments/rem
+ bugdir/bugs/mod/a/comments/mod
+ >>> print r.report_string()
+ Changed bug directory settings:
+ target: None -> 1.0
+ New bugs:
+ c:om: Bug C
+ Removed bugs:
+ b:cm: Bug B
+ Modified bugs:
+ a:cm: Bug A
+ Changed bug settings:
+ status: open -> closed
+ New comments:
+ from John Doe <j@doe.com> on Thu, 01 Jan 1970 00:00:00 +0000
+ I'm closing this bug...
+ >>> bd.cleanup()
+ """
+ def __init__(self, old_bugdir, new_bugdir):
+ self.old_bugdir = old_bugdir
+ self.new_bugdir = new_bugdir
+
+ # data assembly methods
+
+ def _changed_bugs(self):
+ """
+ Search for differences in all bugs between .old_bugdir and
+ .new_bugdir. Returns
+ (added_bugs, modified_bugs, removed_bugs)
+ where added_bugs and removed_bugs are lists of added and
+ removed bugs respectively. modified_bugs is a list of
+ (old_bug,new_bug) pairs.
+ """
+ if hasattr(self, "__changed_bugs"):
+ return self.__changed_bugs
+ added = []
+ removed = []
+ modified = []
+ for uuid in self.new_bugdir.list_uuids():
+ new_bug = self.new_bugdir.bug_from_uuid(uuid)
+ try:
+ old_bug = self.old_bugdir.bug_from_uuid(uuid)
+ except KeyError:
+ added.append(new_bug)
+ else:
+ if old_bug.sync_with_disk == True:
+ old_bug.load_comments()
+ if new_bug.sync_with_disk == True:
+ new_bug.load_comments()
+ if old_bug != new_bug:
+ modified.append((old_bug, new_bug))
+ for uuid in self.old_bugdir.list_uuids():
+ if not self.new_bugdir.has_bug(uuid):
+ old_bug = self.old_bugdir.bug_from_uuid(uuid)
+ removed.append(old_bug)
+ added.sort()
+ removed.sort()
+ modified.sort(self._bug_modified_cmp)
+ self.__changed_bugs = (added, modified, removed)
+ return self.__changed_bugs
+ def _bug_modified_cmp(self, left, right):
+ return cmp(left[1], right[1])
+ def _changed_comments(self, old, new):
+ """
+ Search for differences in all loaded comments between the bugs
+ old and new. Returns
+ (added_comments, modified_comments, removed_comments)
+ analogous to ._changed_bugs.
+ """
+ if hasattr(self, "__changed_comments"):
+ if new.uuid in self.__changed_comments:
+ return self.__changed_comments[new.uuid]
+ else:
+ self.__changed_comments = {}
+ added = []
+ removed = []
+ modified = []
+ old.comment_root.sort(key=lambda comm : comm.time)
+ new.comment_root.sort(key=lambda comm : comm.time)
+ old_comment_ids = [c.uuid for c in old.comments()]
+ new_comment_ids = [c.uuid for c in new.comments()]
+ for uuid in new_comment_ids:
+ new_comment = new.comment_from_uuid(uuid)
+ try:
+ old_comment = old.comment_from_uuid(uuid)
+ except KeyError:
+ added.append(new_comment)
+ else:
+ if old_comment != new_comment:
+ modified.append((old_comment, new_comment))
+ for uuid in old_comment_ids:
+ if uuid not in new_comment_ids:
+ new_comment = new.comment_from_uuid(uuid)
+ removed.append(new_comment)
+ self.__changed_comments[new.uuid] = (added, modified, removed)
+ return self.__changed_comments[new.uuid]
+ def _attribute_changes(self, old, new, attributes):
+ """
+ Take two objects old and new, and compare the value of *.attr
+ for attr in the list attribute names. Returns a list of
+ (attr_name, old_value, new_value)
+ tuples.
+ """
+ change_list = []
+ for attr in attributes:
+ old_value = getattr(old, attr)
+ new_value = getattr(new, attr)
+ if old_value != new_value:
+ change_list.append((attr, old_value, new_value))
+ if len(change_list) >= 0:
+ return change_list
return None
+ def _settings_properties_attribute_changes(self, old, new,
+ hidden_properties=[]):
+ properties = sorted(new.settings_properties)
+ for p in hidden_properties:
+ properties.remove(p)
+ attributes = [settings_object.setting_name_to_attr_name(None, p)
+ for p in properties]
+ return self._attribute_changes(old, new, attributes)
+ def _bugdir_attribute_changes(self):
+ return self._settings_properties_attribute_changes( \
+ self.old_bugdir, self.new_bugdir,
+ ["vcs_name"]) # tweaked by bugdir.duplicate_bugdir
+ def _bug_attribute_changes(self, old, new):
+ return self._settings_properties_attribute_changes(old, new)
+ def _comment_attribute_changes(self, old, new):
+ return self._settings_properties_attribute_changes(old, new)
-def bug_changes(old, new):
- bug_settings = sorted(new.settings_properties)
- change_list = change_lines(old, new, bug_settings)
- change_strings = ["%s: %s -> %s" % f for f in change_list]
+ # report generation methods
- old_comment_ids = [c.uuid for c in old.comments()]
- new_comment_ids = [c.uuid for c in new.comments()]
- for comment_id in new_comment_ids:
- if comment_id not in old_comment_ids:
- summary = comment_summary(new.comment_from_uuid(comment_id), "new")
- change_strings.append(summary)
- for comment_id in old_comment_ids:
- if comment_id not in new_comment_ids:
- summary = comment_summary(new.comment_from_uuid(comment_id),
- "removed")
- change_strings.append(summary)
+ def report_tree(self, diff_tree=DiffTree):
+ """
+ Pretty bare to make it easy to adjust to specific cases. You
+ can pass in a DiffTree subclass via diff_tree to override the
+ default report assembly process.
+ """
+ if hasattr(self, "__report_tree"):
+ return self.__report_tree
+ bugdir_settings = sorted(self.new_bugdir.settings_properties)
+ bugdir_settings.remove("vcs_name") # tweaked by bugdir.duplicate_bugdir
+ root = diff_tree("bugdir")
+ bugdir_attribute_changes = self._bugdir_attribute_changes()
+ if len(bugdir_attribute_changes) > 0:
+ bugdir = diff_tree("settings", bugdir_attribute_changes,
+ self.bugdir_attribute_change_string)
+ root.append(bugdir)
+ bug_root = diff_tree("bugs")
+ root.append(bug_root)
+ add,mod,rem = self._changed_bugs()
+ bnew = diff_tree("new", "New bugs:", requires_children=True)
+ bug_root.append(bnew)
+ for bug in add:
+ b = diff_tree(bug.uuid, bug, self.bug_add_string)
+ bnew.append(b)
+ brem = diff_tree("rem", "Removed bugs:", requires_children=True)
+ bug_root.append(brem)
+ for bug in rem:
+ b = diff_tree(bug.uuid, bug, self.bug_rem_string)
+ brem.append(b)
+ bmod = diff_tree("mod", "Modified bugs:", requires_children=True)
+ bug_root.append(bmod)
+ for old,new in mod:
+ b = diff_tree(new.uuid, (old,new), self.bug_mod_string)
+ bmod.append(b)
+ bug_attribute_changes = self._bug_attribute_changes(old, new)
+ if len(bug_attribute_changes) > 0:
+ bset = diff_tree("settings", bug_attribute_changes,
+ self.bug_attribute_change_string)
+ b.append(bset)
+ if old.summary != new.summary:
+ data = (old.summary, new.summary)
+ bsum = diff_tree("summary", data, self.bug_summary_change_string)
+ b.append(bsum)
+ cr = diff_tree("comments")
+ b.append(cr)
+ a,m,d = self._changed_comments(old, new)
+ cnew = diff_tree("new", "New comments:", requires_children=True)
+ for comment in a:
+ c = diff_tree(comment.uuid, comment, self.comment_add_string)
+ cnew.append(c)
+ crem = diff_tree("rem", "Removed comments:",requires_children=True)
+ for comment in d:
+ c = diff_tree(comment.uuid, comment, self.comment_rem_string)
+ crem.append(c)
+ cmod = diff_tree("mod","Modified comments:",requires_children=True)
+ for o,n in m:
+ c = diff_tree(n.uuid, (o,n), self.comment_mod_string)
+ cmod.append(c)
+ comm_attribute_changes = self._comment_attribute_changes(o, n)
+ if len(comm_attribute_changes) > 0:
+ cset = diff_tree("settings", comm_attribute_changes,
+ self.comment_attribute_change_string)
+ if o.body != n.body:
+ data = (o.body, n.body)
+ cbody = diff_tree("cbody", data,
+ self.comment_body_change_string)
+ c.append(cbody)
+ cr.extend([cnew, crem, cmod])
+ self.__report_tree = root
+ return self.__report_tree
- if len(change_strings) == 0:
- return None
- return "%s\n %s" % (new.string(shortlist=True),
- " \n".join(change_strings))
+ # change data -> string methods.
+ # Feel free to play with these in subclasses.
+ def attribute_change_string(self, attribute_changes, indent=0):
+ indent_string = " "*indent
+ change_strings = [u"%s: %s -> %s" % f for f in attribute_changes]
+ for i,change_string in enumerate(change_strings):
+ change_strings[i] = indent_string+change_string
+ return u"\n".join(change_strings)
+ def bugdir_attribute_change_string(self, attribute_changes):
+ return "Changed bug directory settings:\n%s" % \
+ self.attribute_change_string(attribute_changes, indent=1)
+ def bug_attribute_change_string(self, attribute_changes):
+ return "Changed bug settings:\n%s" % \
+ self.attribute_change_string(attribute_changes, indent=1)
+ def comment_attribute_change_string(self, attribute_changes):
+ return "Changed comment settings:\n%s" % \
+ self.attribute_change_string(attribute_changes, indent=1)
+ def bug_add_string(self, bug):
+ return bug.string(shortlist=True)
+ def bug_rem_string(self, bug):
+ return bug.string(shortlist=True)
+ def bug_mod_string(self, bugs):
+ old_bug,new_bug = bugs
+ return new_bug.string(shortlist=True)
+ def bug_summary_change_string(self, summaries):
+ old_summary,new_summary = summaries
+ return "summary changed:\n %s\n %s" % (old_summary, new_summary)
+ def _comment_summary_string(self, comment):
+ return "from %s on %s" % (comment.author, time_to_str(comment.time))
+ def comment_add_string(self, comment):
+ summary = self._comment_summary_string(comment)
+ first_line = comment.body.splitlines()[0]
+ return "%s\n %s..." % (summary, first_line)
+ def comment_rem_string(self, comment):
+ summary = self._comment_summary_string(comment)
+ first_line = comment.body.splitlines()[0]
+ return "%s\n %s..." % (summary, first_line)
+ def comment_mod_string(self, comments):
+ old_comment,new_comment = comments
+ return self._comment_summary_string(new_comment)
+ def comment_body_change_string(self, bodies):
+ old_body,new_body = bodies
+ return difflib.unified_diff(old_body, new_body)
-def comment_summary(comment, status):
- return "%8s comment from %s on %s" % (status, comment.From,
- time_to_str(comment.time))
suite = doctest.DocTestSuite()
diff --git a/libbe/editor.py b/libbe/editor.py
index 93144b8..ec41006 100644
--- a/libbe/editor.py
+++ b/libbe/editor.py
@@ -15,6 +15,11 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+Define editor_string(), a function that invokes an editor to accept
+user-produced text as a string.
+"""
+
import codecs
import locale
import os
@@ -22,6 +27,7 @@ import sys
import tempfile
import doctest
+
default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
comment_marker = u"== Anything below this line will be ignored\n"
@@ -62,7 +68,8 @@ def editor_string(comment=None, encoding=None):
fhandle, fname = tempfile.mkstemp()
try:
if comment is not None:
- os.write(fhandle, '\n'+comment_string(comment))
+ cstring = u'\n'+comment_string(comment)
+ os.write(fhandle, cstring.encode(encoding))
os.close(fhandle)
oldmtime = os.path.getmtime(fname)
os.system("%s %s" % (editor, fname))
diff --git a/libbe/encoding.py b/libbe/encoding.py
index d603602..fd513b5 100644
--- a/libbe/encoding.py
+++ b/libbe/encoding.py
@@ -14,16 +14,26 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Support input/output/filesystem encodings (e.g. UTF-8).
+"""
+
import codecs
import locale
import sys
import doctest
+
+ENCODING = None # override get_encoding() output by setting this
+
def get_encoding():
"""
Guess a useful input/output/filesystem encoding... Maybe we need
seperate encodings for input/output and filesystem? Hmm...
"""
+ if ENCODING != None:
+ return ENCODING
encoding = locale.getpreferredencoding() or sys.getdefaultencoding()
if sys.platform != 'win32' or sys.version_info[:2] > (2, 3):
encoding = locale.getlocale(locale.LC_TIME)[1] or encoding
diff --git a/libbe/git.py b/libbe/git.py
index 2f9ffa9..3abe3b8 100644
--- a/libbe/git.py
+++ b/libbe/git.py
@@ -16,30 +16,34 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+Git backend.
+"""
+
import os
import re
import sys
import unittest
import doctest
-import rcs
-from rcs import RCS
+import vcs
+
def new():
return Git()
-class Git(RCS):
+class Git(vcs.VCS):
name="git"
client="git"
versioned=True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
if self._u_search_parent_directories(path, ".git") != None :
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
# Assume that nothing funny is going on; in particular, that we aren't
# dealing with a bare repo.
@@ -50,13 +54,21 @@ class Git(RCS):
gitdir = os.path.join(path, output.rstrip('\n'))
dirname = os.path.abspath(os.path.dirname(gitdir))
return dirname
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
- status,output,error = self._u_invoke_client("config", "user.name")
- name = output.rstrip('\n')
- status,output,error = self._u_invoke_client("config", "user.email")
- email = output.rstrip('\n')
+ def _vcs_get_user_id(self):
+ status,output,error = \
+ self._u_invoke_client("config", "user.name", expect=(0,1))
+ if status == 0:
+ name = output.rstrip('\n')
+ else:
+ name = ""
+ status,output,error = \
+ self._u_invoke_client("config", "user.email", expect=(0,1))
+ if status == 0:
+ email = output.rstrip('\n')
+ else:
+ email = ""
if name != "" or email != "": # got something!
# guess missing info, if necessary
if name == "":
@@ -65,35 +77,35 @@ class Git(RCS):
email = self._u_get_fallback_email()
return self._u_create_id(name, email)
return None # Git has no infomation
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
name,email = self._u_parse_id(value)
if email != None:
self._u_invoke_client("config", "user.email", email)
self._u_invoke_client("config", "user.name", name)
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
if os.path.isdir(path):
return
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
if not os.path.isdir(self._u_abspath(path)):
self._u_invoke_client("rm", "-f", path)
- def _rcs_update(self, path):
- self._rcs_add(path)
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_update(self, path):
+ self._vcs_add(path)
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
arg = "%s:%s" % (revision,path)
status,output,error = self._u_invoke_client("show", arg)
return output
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision==None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ vcs.VCS._vcs_duplicate_repo(self, directory, revision)
else:
#self._u_invoke_client("archive", revision, directory) # makes tarball
self._u_invoke_client("clone", "--no-checkout",".",directory)
self._u_invoke_client("checkout", revision, directory=directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
args = ['commit', '--all', '--file', commitfile]
if allow_empty == True:
args.append("--allow-empty")
@@ -104,17 +116,33 @@ class Git(RCS):
strings = ["nothing to commit",
"nothing added to commit"]
if self._u_any_in_string(strings, output) == True:
- raise rcs.EmptyCommit()
+ raise vcs.EmptyCommit()
revision = None
revline = re.compile("(.*) (.*)[:\]] (.*)")
match = revline.search(output)
assert match != None, output+error
assert len(match.groups()) == 3
revision = match.groups()[1]
- return revision
+ full_revision = self._vcs_revision_id(-1)
+ assert full_revision.startswith(revision), \
+ "Mismatched revisions:\n%s\n%s" % (revision, full_revision)
+ return full_revision
+ def _vcs_revision_id(self, index):
+ args = ["rev-list", "--first-parent", "--reverse", "HEAD"]
+ kwargs = {"expect":(0,128)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status == 128:
+ if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
+ return None
+ raise vcs.CommandError(args, status, stdout="", stderr=error)
+ commits = output.splitlines()
+ try:
+ return commits[index]
+ except IndexError:
+ return None
-rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Git, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/hg.py b/libbe/hg.py
index a20eeb5..f8f8121 100644
--- a/libbe/hg.py
+++ b/libbe/hg.py
@@ -16,81 +16,88 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+Mercurial (hg) backend.
+"""
+
import os
import re
import sys
import unittest
import doctest
-import rcs
-from rcs import RCS
+import vcs
+
def new():
return Hg()
-class Hg(RCS):
+class Hg(vcs.VCS):
name="hg"
client="hg"
versioned=True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
"""Detect whether a directory is revision-controlled using Mercurial"""
if self._u_search_parent_directories(path, ".hg") != None:
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
status,output,error = self._u_invoke_client("root", directory=path)
return output.rstrip('\n')
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
status,output,error = self._u_invoke_client("showconfig","ui.username")
return output.rstrip('\n')
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
"""
Supported by the Config Extension, but that is not part of
standard Mercurial.
http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
"""
- raise rcs.SettingIDnotSupported
- def _rcs_add(self, path):
+ raise vcs.SettingIDnotSupported
+ def _vcs_add(self, path):
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
self._u_invoke_client("rm", "--force", path)
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
status,output,error = \
self._u_invoke_client("cat","-r",revision,path)
return output
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision == None:
- return RCS._rcs_duplicate_repo(self, directory, revision)
+ return vcs.VCS._vcs_duplicate_repo(self, directory, revision)
else:
self._u_invoke_client("archive", "--rev", revision, directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
args = ['commit', '--logfile', commitfile]
status,output,error = self._u_invoke_client(*args)
if allow_empty == False:
strings = ["nothing changed"]
if self._u_any_in_string(strings, output) == True:
- raise rcs.EmptyCommit()
- status,output,error = self._u_invoke_client('identify')
- revision = None
- revline = re.compile("(.*) tip")
- match = revline.search(output)
- assert match != None, output+error
- assert len(match.groups()) == 1
- revision = match.groups()[0]
- return revision
+ raise vcs.EmptyCommit()
+ return self._vcs_revision_id(-1)
+ def _vcs_revision_id(self, index, style="id"):
+ args = ["identify", "--rev", str(int(index)), "--%s" % style]
+ kwargs = {"expect": (0,255)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status == 0:
+ id = output.strip()
+ if id == '000000000000':
+ return None # before initial commit.
+ return id
+ return None
-rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/mapfile.py b/libbe/mapfile.py
index b959d76..4d69601 100644
--- a/libbe/mapfile.py
+++ b/libbe/mapfile.py
@@ -14,12 +14,19 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import yaml
-import os.path
+
+"""
+Provide a means of saving and loading dictionaries of parameters. The
+saved "mapfiles" should be clear, flat-text files, and allow easy merging of
+independent/conflicting changes.
+"""
+
import errno
-import utility
+import os.path
+import yaml
import doctest
+
class IllegalKey(Exception):
def __init__(self, key):
Exception.__init__(self, 'Illegal key "%s"' % key)
@@ -95,33 +102,15 @@ def parse(contents):
>>> dict["e"]
'f'
"""
- old_format = False
- for line in contents.splitlines():
- if len(line.split("=")) == 2:
- old_format = True
- break
- if old_format: # translate to YAML. Hack to deal with old BE bugs.
- newlines = []
- for line in contents.splitlines():
- line = line.rstrip('\n')
- if len(line) == 0:
- continue
- fields = line.split("=")
- if len(fields) == 2:
- key,value = fields
- newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
- else:
- newlines.append(line)
- contents = '\n'.join(newlines)
return yaml.load(contents) or {}
-def map_save(rcs, path, map, allow_no_rcs=False):
+def map_save(vcs, path, map, allow_no_vcs=False):
"""Save the map as a mapfile to the specified path"""
contents = generate(map)
- rcs.set_file_contents(path, contents, allow_no_rcs)
+ vcs.set_file_contents(path, contents, allow_no_vcs)
-def map_load(rcs, path, allow_no_rcs=False):
- contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs)
+def map_load(vcs, path, allow_no_vcs=False):
+ contents = vcs.get_file_contents(path, allow_no_vcs=allow_no_vcs)
return parse(contents)
suite = doctest.DocTestSuite()
diff --git a/libbe/plugin.py b/libbe/plugin.py
index 0545fd7..d593d69 100644
--- a/libbe/plugin.py
+++ b/libbe/plugin.py
@@ -15,6 +15,12 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Allow simple listing and loading of the various becommands and libbe
+submodules (i.e. "plugins").
+"""
+
import os
import os.path
import sys
diff --git a/libbe/properties.py b/libbe/properties.py
index 144220b..09dd20e 100644
--- a/libbe/properties.py
+++ b/libbe/properties.py
@@ -160,10 +160,10 @@ def _get_cached_mutable_property(self, cacher_name, property_name, default=None)
if (cacher_name, property_name) not in self._mutable_property_cache_copy:
return default
return self._mutable_property_cache_copy[(cacher_name, property_name)]
-def _cmp_cached_mutable_property(self, cacher_name, property_name, value):
+def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None):
_init_mutable_property_cache(self)
if (cacher_name, property_name) not in self._mutable_property_cache_hash:
- return 1 # any value > non-existant old hash
+ _set_cached_mutable_property(self, cacher_name, property_name, default)
old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)]
return cmp(_hash_mutable_value(value), old_hash)
@@ -327,7 +327,7 @@ def primed_property(primer, initVal=None):
return funcs
return decorator
-def change_hook_property(hook, mutable=False):
+def change_hook_property(hook, mutable=False, default=None):
"""
Call the function hook(instance, old_value, new_value) whenever a
value different from the current value is set (instance is a a
@@ -359,9 +359,9 @@ def change_hook_property(hook, mutable=False):
value = new_value # compare new value with cached
else:
value = fget(self) # compare current value with cached
- if _cmp_cached_mutable_property(self, "change hook property", name, value) != 0:
+ if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0:
# there has been a change, cache new value
- old_value = _get_cached_mutable_property(self, "change hook property", name)
+ old_value = _get_cached_mutable_property(self, "change hook property", name, default)
_set_cached_mutable_property(self, "change hook property", name, value)
if from_fset == True: # return previously cached value
value = old_value
diff --git a/libbe/rcs.py b/libbe/rcs.py
index 294b8e0..e69de29 100644
--- a/libbe/rcs.py
+++ b/libbe/rcs.py
@@ -1,876 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# Alexander Belchenko <bialix@ukr.net>
-# Ben Finney <ben+python@benfinney.id.au>
-# Chris Ball <cjb@laptop.org>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-from subprocess import Popen, PIPE
-import codecs
-import os
-import os.path
-import re
-from socket import gethostname
-import shutil
-import sys
-import tempfile
-import unittest
-import doctest
-
-from utility import Dir, search_parent_directories
-
-
-def _get_matching_rcs(matchfn):
- """Return the first module for which matchfn(RCS_instance) is true"""
- import arch
- import bzr
- import darcs
- import git
- import hg
- for module in [arch, bzr, darcs, git, hg]:
- rcs = module.new()
- if matchfn(rcs) == True:
- return rcs
- del(rcs)
- return RCS()
-
-def rcs_by_name(rcs_name):
- """Return the module for the RCS with the given name"""
- return _get_matching_rcs(lambda rcs: rcs.name == rcs_name)
-
-def detect_rcs(dir):
- """Return an RCS instance for the rcs being used in this directory"""
- return _get_matching_rcs(lambda rcs: rcs.detect(dir))
-
-def installed_rcs():
- """Return an instance of an installed RCS"""
- return _get_matching_rcs(lambda rcs: rcs.installed())
-
-
-class CommandError(Exception):
- def __init__(self, command, status, err_str):
- strerror = ["Command failed (%d):\n %s\n" % (status, err_str),
- "while executing\n %s" % command]
- Exception.__init__(self, "\n".join(strerror))
- self.command = command
- self.status = status
- self.err_str = err_str
-
-class SettingIDnotSupported(NotImplementedError):
- pass
-
-class RCSnotRooted(Exception):
- def __init__(self):
- msg = "RCS not rooted"
- Exception.__init__(self, msg)
-
-class PathNotInRoot(Exception):
- def __init__(self, path, root):
- msg = "Path '%s' not in root '%s'" % (path, root)
- Exception.__init__(self, msg)
- self.path = path
- self.root = root
-
-class NoSuchFile(Exception):
- def __init__(self, pathname, root="."):
- path = os.path.abspath(os.path.join(root, pathname))
- Exception.__init__(self, "No such file: %s" % path)
-
-class EmptyCommit(Exception):
- def __init__(self):
- Exception.__init__(self, "No changes to commit")
-
-
-def new():
- return RCS()
-
-class RCS(object):
- """
- This class implements a 'no-rcs' interface.
-
- Support for other RCSs can be added by subclassing this class, and
- overriding methods _rcs_*() with code appropriate for your RCS.
-
- The methods _u_*() are utility methods available to the _rcs_*()
- methods.
- """
- name = "None"
- client = "" # command-line tool for _u_invoke_client
- versioned = False
- def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()):
- self.paranoid = paranoid
- self.verboseInvoke = False
- self.rootdir = None
- self._duplicateBasedir = None
- self._duplicateDirname = None
- self.encoding = encoding
- def __del__(self):
- self.cleanup()
-
- def _rcs_help(self):
- """
- Return the command help string.
- (Allows a simple test to see if the client is installed.)
- """
- pass
- def _rcs_detect(self, path=None):
- """
- Detect whether a directory is revision controlled with this RCS.
- """
- return True
- def _rcs_root(self, path):
- """
- Get the RCS root. This is the default working directory for
- future invocations. You would normally set this to the root
- directory for your RCS.
- """
- if os.path.isdir(path)==False:
- path = os.path.dirname(path)
- if path == "":
- path = os.path.abspath(".")
- return path
- def _rcs_init(self, path):
- """
- Begin versioning the tree based at path.
- """
- pass
- def _rcs_cleanup(self):
- """
- Remove any cruft that _rcs_init() created outside of the
- versioned tree.
- """
- pass
- def _rcs_get_user_id(self):
- """
- Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the RCS has not been configured with a username, return None.
- """
- return None
- def _rcs_set_user_id(self, value):
- """
- Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the RCS has not been configured with a usename, so
- that commits will have a reasonable FROM value.
- """
- raise SettingIDnotSupported
- def _rcs_add(self, path):
- """
- Add the already created file at path to version control.
- """
- pass
- def _rcs_remove(self, path):
- """
- Remove the file at path from version control. Optionally
- remove the file from the filesystem as well.
- """
- pass
- def _rcs_update(self, path):
- """
- Notify the versioning system of changes to the versioned file
- at path.
- """
- pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
- """
- Get the file contents as they were in a given revision.
- Revision==None specifies the current revision.
- """
- assert revision == None, \
- "The %s RCS does not support revision specifiers" % self.name
- if binary == False:
- f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
- else:
- f = open(os.path.join(self.rootdir, path), "rb")
- contents = f.read()
- f.close()
- return contents
- def _rcs_duplicate_repo(self, directory, revision=None):
- """
- Get the repository as it was in a given revision.
- revision==None specifies the current revision.
- dir specifies a directory to create the duplicate in.
- """
- shutil.copytree(self.rootdir, directory, True)
- def _rcs_commit(self, commitfile, allow_empty=False):
- """
- Commit the current working directory, using the contents of
- commitfile as the comment. Return the name of the old
- revision (or None if commits are not supported).
-
- If allow_empty == False, raise EmptyCommit if there are no
- changes to commit.
- """
- return None
- def installed(self):
- try:
- self._rcs_help()
- return True
- except OSError, e:
- if e.errno == errno.ENOENT:
- return False
- except CommandError:
- return False
- def detect(self, path="."):
- """
- Detect whether a directory is revision controlled with this RCS.
- """
- return self._rcs_detect(path)
- def root(self, path):
- """
- Set the root directory to the path's RCS root. This is the
- default working directory for future invocations.
- """
- self.rootdir = self._rcs_root(path)
- def init(self, path):
- """
- Begin versioning the tree based at path.
- Also roots the rcs at path.
- """
- if os.path.isdir(path)==False:
- path = os.path.dirname(path)
- self._rcs_init(path)
- self.root(path)
- def cleanup(self):
- self._rcs_cleanup()
- def get_user_id(self):
- """
- Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the RCS has not been configured with a username, return the user's
- id. You can override the automatic lookup procedure by setting the
- RCS.user_id attribute to a string of your choice.
- """
- if hasattr(self, "user_id"):
- if self.user_id != None:
- return self.user_id
- id = self._rcs_get_user_id()
- if id == None:
- name = self._u_get_fallback_username()
- email = self._u_get_fallback_email()
- id = self._u_create_id(name, email)
- print >> sys.stderr, "Guessing id '%s'" % id
- try:
- self.set_user_id(id)
- except SettingIDnotSupported:
- pass
- return id
- def set_user_id(self, value):
- """
- Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the RCS has not been configured with a usename, so
- that commits will have a reasonable FROM value.
- """
- self._rcs_set_user_id(value)
- def add(self, path):
- """
- Add the already created file at path to version control.
- """
- self._rcs_add(self._u_rel_path(path))
- def remove(self, path):
- """
- Remove a file from both version control and the filesystem.
- """
- self._rcs_remove(self._u_rel_path(path))
- if os.path.exists(path):
- os.remove(path)
- def recursive_remove(self, dirname):
- """
- Remove a file/directory and all its decendents from both
- version control and the filesystem.
- """
- if not os.path.exists(dirname):
- raise NoSuchFile(dirname)
- for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
- filenames.extend(dirnames)
- for path in filenames:
- fullpath = os.path.join(dirpath, path)
- if os.path.exists(fullpath) == False:
- continue
- self._rcs_remove(self._u_rel_path(fullpath))
- if os.path.exists(dirname):
- shutil.rmtree(dirname)
- def update(self, path):
- """
- Notify the versioning system of changes to the versioned file
- at path.
- """
- self._rcs_update(self._u_rel_path(path))
- def get_file_contents(self, path, revision=None, allow_no_rcs=False, binary=False):
- """
- Get the file as it was in a given revision.
- Revision==None specifies the current revision.
- """
- if not os.path.exists(path):
- raise NoSuchFile(path)
- if self._use_rcs(path, allow_no_rcs):
- relpath = self._u_rel_path(path)
- contents = self._rcs_get_file_contents(relpath,revision,binary=binary)
- else:
- f = codecs.open(path, "r", self.encoding)
- contents = f.read()
- f.close()
- return contents
- def set_file_contents(self, path, contents, allow_no_rcs=False, binary=False):
- """
- Set the file contents under version control.
- """
- add = not os.path.exists(path)
- if binary == False:
- f = codecs.open(path, "w", self.encoding)
- else:
- f = open(path, "wb")
- f.write(contents)
- f.close()
-
- if self._use_rcs(path, allow_no_rcs):
- if add:
- self.add(path)
- else:
- self.update(path)
- def mkdir(self, path, allow_no_rcs=False, check_parents=True):
- """
- Create (if neccessary) a directory at path under version
- control.
- """
- if check_parents == True:
- parent = os.path.dirname(path)
- if not os.path.exists(parent): # recurse through parents
- self.mkdir(parent, allow_no_rcs, check_parents)
- if not os.path.exists(path):
- os.mkdir(path)
- if self._use_rcs(path, allow_no_rcs):
- self.add(path)
- else:
- assert os.path.isdir(path)
- if self._use_rcs(path, allow_no_rcs):
- #self.update(path)# Don't update directories. Changing files
- pass # underneath them should be sufficient.
-
- def duplicate_repo(self, revision=None):
- """
- Get the repository as it was in a given revision.
- revision==None specifies the current revision.
- Return the path to the arbitrary directory at the base of the new repo.
- """
- # Dirname in Baseir to protect against simlink attacks.
- if self._duplicateBasedir == None:
- self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs')
- self._duplicateDirname = \
- os.path.join(self._duplicateBasedir, "duplicate")
- self._rcs_duplicate_repo(directory=self._duplicateDirname,
- revision=revision)
- return self._duplicateDirname
- def remove_duplicate_repo(self):
- """
- Clean up a duplicate repo created with duplicate_repo().
- """
- if self._duplicateBasedir != None:
- shutil.rmtree(self._duplicateBasedir)
- self._duplicateBasedir = None
- self._duplicateDirname = None
- def commit(self, summary, body=None, allow_empty=False):
- """
- Commit the current working directory, with a commit message
- string summary and body. Return the name of the old revision
- (or None if versioning is not supported).
-
- If allow_empty == False (the default), raise EmptyCommit if
- there are no changes to commit.
- """
- summary = summary.strip()+'\n'
- if body is not None:
- summary += '\n' + body.strip() + '\n'
- descriptor, filename = tempfile.mkstemp()
- revision = None
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.flush()
- self.precommit()
- revision = self._rcs_commit(filename, allow_empty=allow_empty)
- temp_file.close()
- self.postcommit()
- finally:
- os.remove(filename)
- return revision
- def precommit(self):
- """
- Executed before all attempted commits.
- """
- pass
- def postcommit(self):
- """
- Only executed after successful commits.
- """
- pass
- def _u_any_in_string(self, list, string):
- """
- Return True if any of the strings in list are in string.
- Otherwise return False.
- """
- for list_string in list:
- if list_string in string:
- return True
- return False
- def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None):
- """
- expect should be a tuple of allowed exit codes. cwd should be
- the directory from which the command will be executed.
- """
- if cwd == None:
- cwd = self.rootdir
- if self.verboseInvoke == True:
- print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args))
- try :
- if sys.platform != "win32":
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
- else:
- # win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
- shell=True, cwd=cwd)
- except OSError, e :
- raise CommandError(args, e.args[0], e)
- output, error = q.communicate(input=stdin)
- status = q.wait()
- if self.verboseInvoke == True:
- print >> sys.stderr, "%d\n%s%s" % (status, output, error)
- if status not in expect:
- raise CommandError(args, status, error)
- return status, output, error
- def _u_invoke_client(self, *args, **kwargs):
- directory = kwargs.get('directory',None)
- expect = kwargs.get('expect', (0,))
- stdin = kwargs.get('stdin', None)
- cl_args = [self.client]
- cl_args.extend(args)
- return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory)
- def _u_search_parent_directories(self, path, filename):
- """
- Find the file (or directory) named filename in path or in any
- of path's parents.
-
- e.g.
- search_parent_directories("/a/b/c", ".be")
- will return the path to the first existing file from
- /a/b/c/.be
- /a/b/.be
- /a/.be
- /.be
- or None if none of those files exist.
- """
- return search_parent_directories(path, filename)
- def _use_rcs(self, path, allow_no_rcs):
- """
- Try and decide if _rcs_add/update/mkdir/etc calls will
- succeed. Returns True is we think the rcs_call would
- succeeed, and False otherwise.
- """
- use_rcs = True
- exception = None
- if self.rootdir != None:
- if self.path_in_root(path) == False:
- use_rcs = False
- exception = PathNotInRoot(path, self.rootdir)
- else:
- use_rcs = False
- exception = RCSnotRooted
- if use_rcs == False and allow_no_rcs==False:
- raise exception
- return use_rcs
- def path_in_root(self, path, root=None):
- """
- Return the relative path to path from root.
- >>> rcs = new()
- >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c")
- True
- >>> rcs.path_in_root("/a.b/.be", "/a.b/c")
- False
- """
- if root == None:
- if self.rootdir == None:
- raise RCSnotRooted
- root = self.rootdir
- path = os.path.abspath(path)
- absRoot = os.path.abspath(root)
- absRootSlashedDir = os.path.join(absRoot,"")
- if not path.startswith(absRootSlashedDir):
- return False
- return True
- def _u_rel_path(self, path, root=None):
- """
- Return the relative path to path from root.
- >>> rcs = new()
- >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c")
- '.be'
- """
- if root == None:
- if self.rootdir == None:
- raise RCSnotRooted
- root = self.rootdir
- path = os.path.abspath(path)
- absRoot = os.path.abspath(root)
- absRootSlashedDir = os.path.join(absRoot,"")
- if not path.startswith(absRootSlashedDir):
- raise PathNotInRoot(path, absRootSlashedDir)
- assert path != absRootSlashedDir, \
- "file %s == root directory %s" % (path, absRootSlashedDir)
- relpath = path[len(absRootSlashedDir):]
- return relpath
- def _u_abspath(self, path, root=None):
- """
- Return the absolute path from a path realtive to root.
- >>> rcs = new()
- >>> rcs._u_abspath(".be", "/a.b/c")
- '/a.b/c/.be'
- """
- if root == None:
- assert self.rootdir != None, "RCS not rooted"
- root = self.rootdir
- return os.path.abspath(os.path.join(root, path))
- def _u_create_id(self, name, email=None):
- """
- >>> rcs = new()
- >>> rcs._u_create_id("John Doe", "jdoe@example.com")
- 'John Doe <jdoe@example.com>'
- >>> rcs._u_create_id("John Doe")
- 'John Doe'
- """
- assert len(name) > 0
- if email == None or len(email) == 0:
- return name
- else:
- return "%s <%s>" % (name, email)
- def _u_parse_id(self, value):
- """
- >>> rcs = new()
- >>> rcs._u_parse_id("John Doe <jdoe@example.com>")
- ('John Doe', 'jdoe@example.com')
- >>> rcs._u_parse_id("John Doe")
- ('John Doe', None)
- >>> try:
- ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>")
- ... except AssertionError:
- ... print "Invalid match"
- Invalid match
- """
- emailexp = re.compile("(.*) <([^>]*)>(.*)")
- match = emailexp.search(value)
- if match == None:
- email = None
- name = value
- else:
- assert len(match.groups()) == 3
- assert match.groups()[2] == "", match.groups()
- email = match.groups()[1]
- name = match.groups()[0]
- assert name != None
- assert len(name) > 0
- return (name, email)
- def _u_get_fallback_username(self):
- name = None
- for envariable in ["LOGNAME", "USERNAME"]:
- if os.environ.has_key(envariable):
- name = os.environ[envariable]
- break
- assert name != None
- return name
- def _u_get_fallback_email(self):
- hostname = gethostname()
- name = self._u_get_fallback_username()
- return "%s@%s" % (name, hostname)
- def _u_parse_commitfile(self, commitfile):
- """
- Split the commitfile created in self.commit() back into
- summary and header lines.
- """
- f = codecs.open(commitfile, "r", self.encoding)
- summary = f.readline()
- body = f.read()
- body.lstrip('\n')
- if len(body) == 0:
- body = None
- f.close()
- return (summary, body)
-
-
-def setup_rcs_test_fixtures(testcase):
- """Set up test fixtures for RCS test case."""
- testcase.rcs = testcase.Class()
- testcase.dir = Dir()
- testcase.dirname = testcase.dir.path
-
- rcs_not_supporting_uninitialized_user_id = []
- rcs_not_supporting_set_user_id = ["None", "hg"]
- testcase.rcs_supports_uninitialized_user_id = (
- testcase.rcs.name not in rcs_not_supporting_uninitialized_user_id)
- testcase.rcs_supports_set_user_id = (
- testcase.rcs.name not in rcs_not_supporting_set_user_id)
-
- if not testcase.rcs.installed():
- testcase.fail(
- "%(name)s RCS not found" % vars(testcase.Class))
-
- if testcase.Class.name != "None":
- testcase.failIf(
- testcase.rcs.detect(testcase.dirname),
- "Detected %(name)s RCS before initialising"
- % vars(testcase.Class))
-
- testcase.rcs.init(testcase.dirname)
-
-
-class RCSTestCase(unittest.TestCase):
- """Test cases for base RCS class."""
-
- Class = RCS
-
- def __init__(self, *args, **kwargs):
- super(RCSTestCase, self).__init__(*args, **kwargs)
- self.dirname = None
-
- def setUp(self):
- super(RCSTestCase, self).setUp()
- setup_rcs_test_fixtures(self)
-
- def tearDown(self):
- del(self.rcs)
- super(RCSTestCase, self).tearDown()
-
- def full_path(self, rel_path):
- return os.path.join(self.dirname, rel_path)
-
-
-class RCS_init_TestCase(RCSTestCase):
- """Test cases for RCS.init method."""
-
- def test_detect_should_succeed_after_init(self):
- """Should detect RCS in directory after initialization."""
- self.failUnless(
- self.rcs.detect(self.dirname),
- "Did not detect %(name)s RCS after initialising"
- % vars(self.Class))
-
- def test_rcs_rootdir_in_specified_root_path(self):
- """RCS root directory should be in specified root path."""
- rp = os.path.realpath(self.rcs.rootdir)
- dp = os.path.realpath(self.dirname)
- rcs_name = self.Class.name
- self.failUnless(
- dp == rp or rp == None,
- "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars())
-
-
-class RCS_get_user_id_TestCase(RCSTestCase):
- """Test cases for RCS.get_user_id method."""
-
- def test_gets_existing_user_id(self):
- """Should get the existing user ID."""
- if not self.rcs_supports_uninitialized_user_id:
- return
-
- user_id = self.rcs.get_user_id()
- self.failUnless(
- user_id is not None,
- "unable to get a user id")
-
-
-class RCS_set_user_id_TestCase(RCSTestCase):
- """Test cases for RCS.set_user_id method."""
-
- def setUp(self):
- super(RCS_set_user_id_TestCase, self).setUp()
-
- if self.rcs_supports_uninitialized_user_id:
- self.prev_user_id = self.rcs.get_user_id()
- else:
- self.prev_user_id = "Uninitialized identity <bogus@example.org>"
-
- if self.rcs_supports_set_user_id:
- self.test_new_user_id = "John Doe <jdoe@example.com>"
- self.rcs.set_user_id(self.test_new_user_id)
-
- def tearDown(self):
- if self.rcs_supports_set_user_id:
- self.rcs.set_user_id(self.prev_user_id)
- super(RCS_set_user_id_TestCase, self).tearDown()
-
- def test_raises_error_in_unsupported_vcs(self):
- """Should raise an error in a VCS that doesn't support it."""
- if self.rcs_supports_set_user_id:
- return
- self.assertRaises(
- SettingIDnotSupported,
- self.rcs.set_user_id, "foo")
-
- def test_updates_user_id_in_supporting_rcs(self):
- """Should update the user ID in an RCS that supports it."""
- if not self.rcs_supports_set_user_id:
- return
- user_id = self.rcs.get_user_id()
- self.failUnlessEqual(
- self.test_new_user_id, user_id,
- "user id not set correctly (expected %s, got %s)"
- % (self.test_new_user_id, user_id))
-
-
-def setup_rcs_revision_test_fixtures(testcase):
- """Set up revision test fixtures for RCS test case."""
- testcase.test_dirs = ['a', 'a/b', 'c']
- for path in testcase.test_dirs:
- testcase.rcs.mkdir(testcase.full_path(path))
-
- testcase.test_files = ['a/text', 'a/b/text']
-
- testcase.test_contents = {
- 'rev_1': "Lorem ipsum",
- 'uncommitted': "dolor sit amet",
- }
-
-
-class RCS_mkdir_TestCase(RCSTestCase):
- """Test cases for RCS.mkdir method."""
-
- def setUp(self):
- super(RCS_mkdir_TestCase, self).setUp()
- setup_rcs_revision_test_fixtures(self)
-
- def tearDown(self):
- for path in reversed(sorted(self.test_dirs)):
- self.rcs.recursive_remove(self.full_path(path))
- super(RCS_mkdir_TestCase, self).tearDown()
-
- def test_mkdir_creates_directory(self):
- """Should create specified directory in filesystem."""
- for path in self.test_dirs:
- full_path = self.full_path(path)
- self.failUnless(
- os.path.exists(full_path),
- "path %(full_path)s does not exist" % vars())
-
-
-class RCS_commit_TestCase(RCSTestCase):
- """Test cases for RCS.commit method."""
-
- def setUp(self):
- super(RCS_commit_TestCase, self).setUp()
- setup_rcs_revision_test_fixtures(self)
-
- def tearDown(self):
- for path in reversed(sorted(self.test_dirs)):
- self.rcs.recursive_remove(self.full_path(path))
- super(RCS_commit_TestCase, self).tearDown()
-
- def test_file_contents_as_specified(self):
- """Should set file contents as specified."""
- test_contents = self.test_contents['rev_1']
- for path in self.test_files:
- full_path = self.full_path(path)
- self.rcs.set_file_contents(full_path, test_contents)
- current_contents = self.rcs.get_file_contents(full_path)
- self.failUnlessEqual(test_contents, current_contents)
-
- def test_file_contents_as_committed(self):
- """Should have file contents as specified after commit."""
- test_contents = self.test_contents['rev_1']
- for path in self.test_files:
- full_path = self.full_path(path)
- self.rcs.set_file_contents(full_path, test_contents)
- revision = self.rcs.commit("Initial file contents.")
- current_contents = self.rcs.get_file_contents(full_path)
- self.failUnlessEqual(test_contents, current_contents)
-
- def test_file_contents_as_set_when_uncommitted(self):
- """Should set file contents as specified after commit."""
- if not self.rcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.rcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.rcs.commit("Initial file contents.")
- self.rcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- current_contents = self.rcs.get_file_contents(full_path)
- self.failUnlessEqual(
- self.test_contents['uncommitted'], current_contents)
-
- def test_revision_file_contents_as_committed(self):
- """Should get file contents as committed to specified revision."""
- if not self.rcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.rcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.rcs.commit("Initial file contents.")
- self.rcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- committed_contents = self.rcs.get_file_contents(
- full_path, revision)
- self.failUnlessEqual(
- self.test_contents['rev_1'], committed_contents)
-
-
-class RCS_duplicate_repo_TestCase(RCSTestCase):
- """Test cases for RCS.duplicate_repo method."""
-
- def setUp(self):
- super(RCS_duplicate_repo_TestCase, self).setUp()
- setup_rcs_revision_test_fixtures(self)
-
- def tearDown(self):
- self.rcs.remove_duplicate_repo()
- for path in reversed(sorted(self.test_dirs)):
- self.rcs.recursive_remove(self.full_path(path))
- super(RCS_duplicate_repo_TestCase, self).tearDown()
-
- def test_revision_file_contents_as_committed(self):
- """Should match file contents as committed to specified revision."""
- if not self.rcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.rcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.rcs.commit("Commit current status")
- self.rcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- dup_repo_path = self.rcs.duplicate_repo(revision)
- dup_file_path = os.path.join(dup_repo_path, path)
- dup_file_contents = file(dup_file_path, 'rb').read()
- self.failUnlessEqual(
- self.test_contents['rev_1'], dup_file_contents)
- self.rcs.remove_duplicate_repo()
-
-
-def make_rcs_testcase_subclasses(rcs_class, namespace):
- """Make RCSTestCase subclasses for rcs_class in the namespace."""
- rcs_testcase_classes = [
- c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
- if issubclass(c, RCSTestCase)]
-
- for base_class in rcs_testcase_classes:
- testcase_class_name = rcs_class.__name__ + base_class.__name__
- testcase_class_bases = (base_class,)
- testcase_class_dict = dict(base_class.__dict__)
- testcase_class_dict['Class'] = rcs_class
- testcase_class = type(
- testcase_class_name, testcase_class_bases, testcase_class_dict)
- setattr(namespace, testcase_class_name, testcase_class)
-
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/settings_object.py b/libbe/settings_object.py
index dde247f..ceea9d5 100644
--- a/libbe/settings_object.py
+++ b/libbe/settings_object.py
@@ -148,7 +148,8 @@ def versioned_property(name, doc,
checked = checked_property(allowed=allowed)
fulldoc += "\n\nThe allowed values for this property are: %s." \
% (', '.join(allowed))
- hooked = change_hook_property(hook=change_hook, mutable=mutable)
+ hooked = change_hook_property(hook=change_hook, mutable=mutable,
+ default=EMPTY)
primed = primed_property(primer=primer, initVal=UNPRIMED)
settings = settings_property(name=name, null=UNPRIMED)
docp = doc_property(doc=fulldoc)
@@ -385,30 +386,24 @@ class SavedSettingsObjectTests(unittest.TestCase):
self.failUnless(SAVES == [], SAVES)
self.failUnless(t._settings_loaded == True, t._settings_loaded)
self.failUnless(t.list_type == None, t.list_type)
- self.failUnless(SAVES == [
- "'None' -> '<class 'libbe.settings_object.EMPTY'>'"
- ], SAVES)
+ self.failUnless(SAVES == [], SAVES)
self.failUnless(t.settings["List-type"]==EMPTY,t.settings["List-type"])
t.list_type = []
self.failUnless(t.settings["List-type"] == [], t.settings["List-type"])
self.failUnless(SAVES == [
- "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
"'<class 'libbe.settings_object.EMPTY'>' -> '[]'"
], SAVES)
t.list_type.append(5)
self.failUnless(SAVES == [
- "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
"'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
], SAVES)
self.failUnless(t.settings["List-type"] == [5],t.settings["List-type"])
self.failUnless(SAVES == [ # the append(5) has not yet been saved
- "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
"'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
], SAVES)
self.failUnless(t.list_type == [5], t.list_type) # <-get triggers saved
self.failUnless(SAVES == [ # now the append(5) has been saved.
- "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
"'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
"'[]' -> '[5]'"
], SAVES)
diff --git a/libbe/tree.py b/libbe/tree.py
index 45ae085..06d09e5 100644
--- a/libbe/tree.py
+++ b/libbe/tree.py
@@ -15,6 +15,10 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+Define a traversable tree structure.
+"""
+
import doctest
class Tree(list):
diff --git a/libbe/upgrade.py b/libbe/upgrade.py
new file mode 100644
index 0000000..4123c72
--- /dev/null
+++ b/libbe/upgrade.py
@@ -0,0 +1,187 @@
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Handle conversion between the various on-disk images.
+"""
+
+import os, os.path
+import sys
+import doctest
+
+import encoding
+import mapfile
+import vcs
+
+# a list of all past versions
+BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0",
+ "Bugs Everywhere Directory v1.1",
+ "Bugs Everywhere Directory v1.2"]
+
+# the current version
+BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1]
+
+class Upgrader (object):
+ "Class for converting "
+ initial_version = None
+ final_version = None
+ def __init__(self, root):
+ self.root = root
+ # use the "None" VCS to ensure proper encoding/decoding and
+ # simplify path construction.
+ self.vcs = vcs.vcs_by_name("None")
+ self.vcs.root(self.root)
+ self.vcs.encoding = encoding.get_encoding()
+
+ def get_path(self, *args):
+ """
+ Return a path relative to .root.
+ """
+ dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(dir, *args)
+
+ def check_initial_version(self):
+ path = self.get_path("version")
+ version = self.vcs.get_file_contents(path).rstrip("\n")
+ assert version == self.initial_version, version
+
+ def set_version(self):
+ path = self.get_path("version")
+ self.vcs.set_file_contents(path, self.final_version+"\n")
+
+ def upgrade(self):
+ print >> sys.stderr, "upgrading bugdir from '%s' to '%s'" \
+ % (self.initial_version, self.final_version)
+ self.check_initial_version()
+ self.set_version()
+ self._upgrade()
+
+ def _upgrade(self):
+ raise NotImplementedError
+
+
+class Upgrade_1_0_to_1_1 (Upgrader):
+ initial_version = "Bugs Everywhere Tree 1 0"
+ final_version = "Bugs Everywhere Directory v1.1"
+ def _upgrade_mapfile(self, path):
+ contents = self.vcs.get_file_contents(path)
+ old_format = False
+ for line in contents.splitlines():
+ if len(line.split("=")) == 2:
+ old_format = True
+ break
+ if old_format == True:
+ # translate to YAML.
+ newlines = []
+ for line in contents.splitlines():
+ line = line.rstrip('\n')
+ if len(line) == 0:
+ continue
+ fields = line.split("=")
+ if len(fields) == 2:
+ key,value = fields
+ newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
+ else:
+ newlines.append(line)
+ contents = '\n'.join(newlines)
+ # load the YAML and save
+ map = mapfile.parse(contents)
+ mapfile.map_save(self.vcs, path, map)
+
+ def _upgrade(self):
+ """
+ Comment value field "From" -> "Author".
+ Homegrown mapfile -> YAML.
+ """
+ path = self.get_path("settings")
+ self._upgrade_mapfile(path)
+ for bug_uuid in os.listdir(self.get_path("bugs")):
+ path = self.get_path("bugs", bug_uuid, "values")
+ self._upgrade_mapfile(path)
+ c_path = ["bugs", bug_uuid, "comments"]
+ if not os.path.exists(self.get_path(*c_path)):
+ continue # no comments for this bug
+ for comment_uuid in os.listdir(self.get_path(*c_path)):
+ path_list = c_path + [comment_uuid, "values"]
+ path = self.get_path(*path_list)
+ self._upgrade_mapfile(path)
+ settings = mapfile.map_load(self.vcs, path)
+ if "From" in settings:
+ settings["Author"] = settings.pop("From")
+ mapfile.map_save(self.vcs, path, settings)
+
+
+class Upgrade_1_1_to_1_2 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.1"
+ final_version = "Bugs Everywhere Directory v1.2"
+ def _upgrade(self):
+ """
+ BugDir settings field "rcs_name" -> "vcs_name".
+ """
+ path = self.get_path("settings")
+ settings = mapfile.map_load(self.vcs, path)
+ if "rcs_name" in settings:
+ settings["vcs_name"] = settings.pop("rcs_name")
+ mapfile.map_save(self.vcs, path, settings)
+
+
+upgraders = [Upgrade_1_0_to_1_1,
+ Upgrade_1_1_to_1_2]
+upgrade_classes = {}
+for upgrader in upgraders:
+ upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
+
+def upgrade(path, current_version,
+ target_version=BUGDIR_DISK_VERSION):
+ """
+ Call the appropriate upgrade function to convert current_version
+ to target_version. If a direct conversion function does not exist,
+ use consecutive conversion functions.
+ """
+ if current_version not in BUGDIR_DISK_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % version
+ if target_version not in BUGDIR_DISK_VERSIONS:
+ raise NotImplementedError, \
+ "Cannot handle version '%s' yet." % version
+
+ if (current_version, target_version) in upgrade_classes:
+ # direct conversion
+ upgrade_class = upgrade_classes[(current_version, target_version)]
+ u = upgrade_class(path)
+ u.upgrade()
+ else:
+ # consecutive single-step conversion
+ i = BUGDIR_DISK_VERSIONS.index(current_version)
+ while True:
+ version_a = BUGDIR_DISK_VERSIONS[i]
+ version_b = BUGDIR_DISK_VERSIONS[i+1]
+ try:
+ upgrade_class = upgrade_classes[(version_a, version_b)]
+ except KeyError:
+ raise NotImplementedError, \
+ "Cannot convert version '%s' to '%s' yet." \
+ % (version_a, version_b)
+ u = upgrade_class(path)
+ u.upgrade()
+ if version_b == target_version:
+ break
+ i += 1
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/utility.py b/libbe/utility.py
index 3df06b4..aafbf8d 100644
--- a/libbe/utility.py
+++ b/libbe/utility.py
@@ -14,6 +14,11 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Assorted utility functions that don't fit in anywhere else.
+"""
+
import calendar
import codecs
import os
diff --git a/libbe/vcs.py b/libbe/vcs.py
new file mode 100644
index 0000000..a1d3022
--- /dev/null
+++ b/libbe/vcs.py
@@ -0,0 +1,938 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Alexander Belchenko <bialix@ukr.net>
+# Ben Finney <ben+python@benfinney.id.au>
+# Chris Ball <cjb@laptop.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Define the base VCS (Version Control System) class, which should be
+subclassed by other Version Control System backends. The base class
+implements a "do not version" VCS.
+"""
+
+from subprocess import Popen, PIPE
+import codecs
+import os
+import os.path
+import re
+from socket import gethostname
+import shutil
+import sys
+import tempfile
+import unittest
+import doctest
+
+from utility import Dir, search_parent_directories
+
+
+def _get_matching_vcs(matchfn):
+ """Return the first module for which matchfn(VCS_instance) is true"""
+ import arch
+ import bzr
+ import darcs
+ import git
+ import hg
+ for module in [arch, bzr, darcs, git, hg]:
+ vcs = module.new()
+ if matchfn(vcs) == True:
+ return vcs
+ del(vcs)
+ return VCS()
+
+def vcs_by_name(vcs_name):
+ """Return the module for the VCS with the given name"""
+ return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
+
+def detect_vcs(dir):
+ """Return an VCS instance for the vcs being used in this directory"""
+ return _get_matching_vcs(lambda vcs: vcs.detect(dir))
+
+def installed_vcs():
+ """Return an instance of an installed VCS"""
+ return _get_matching_vcs(lambda vcs: vcs.installed())
+
+
+class CommandError(Exception):
+ def __init__(self, command, status, stdout, stderr):
+ strerror = ["Command failed (%d):\n %s\n" % (status, stderr),
+ "while executing\n %s" % command]
+ Exception.__init__(self, "\n".join(strerror))
+ self.command = command
+ self.status = status
+ self.stdout = stdout
+ self.stderr = stderr
+
+class SettingIDnotSupported(NotImplementedError):
+ pass
+
+class VCSnotRooted(Exception):
+ def __init__(self):
+ msg = "VCS not rooted"
+ Exception.__init__(self, msg)
+
+class PathNotInRoot(Exception):
+ def __init__(self, path, root):
+ msg = "Path '%s' not in root '%s'" % (path, root)
+ Exception.__init__(self, msg)
+ self.path = path
+ self.root = root
+
+class NoSuchFile(Exception):
+ def __init__(self, pathname, root="."):
+ path = os.path.abspath(os.path.join(root, pathname))
+ Exception.__init__(self, "No such file: %s" % path)
+
+class EmptyCommit(Exception):
+ def __init__(self):
+ Exception.__init__(self, "No changes to commit")
+
+
+def new():
+ return VCS()
+
+class VCS(object):
+ """
+ This class implements a 'no-vcs' interface.
+
+ Support for other VCSs can be added by subclassing this class, and
+ overriding methods _vcs_*() with code appropriate for your VCS.
+
+ The methods _u_*() are utility methods available to the _vcs_*()
+ methods.
+ """
+ name = "None"
+ client = "" # command-line tool for _u_invoke_client
+ versioned = False
+ def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()):
+ self.paranoid = paranoid
+ self.verboseInvoke = False
+ self.rootdir = None
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ self.encoding = encoding
+ def __del__(self):
+ self.cleanup()
+
+ def _vcs_help(self):
+ """
+ Return the command help string.
+ (Allows a simple test to see if the client is installed.)
+ """
+ pass
+ def _vcs_detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return True
+ def _vcs_root(self, path):
+ """
+ Get the VCS root. This is the default working directory for
+ future invocations. You would normally set this to the root
+ directory for your VCS.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ if path == "":
+ path = os.path.abspath(".")
+ return path
+ def _vcs_init(self, path):
+ """
+ Begin versioning the tree based at path.
+ """
+ pass
+ def _vcs_cleanup(self):
+ """
+ Remove any cruft that _vcs_init() created outside of the
+ versioned tree.
+ """
+ pass
+ def _vcs_get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
+ """
+ return None
+ def _vcs_set_user_id(self, value):
+ """
+ Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the VCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ raise SettingIDnotSupported
+ def _vcs_add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ pass
+ def _vcs_remove(self, path):
+ """
+ Remove the file at path from version control. Optionally
+ remove the file from the filesystem as well.
+ """
+ pass
+ def _vcs_update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ pass
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
+ """
+ Get the file contents as they were in a given revision.
+ Revision==None specifies the current revision.
+ """
+ assert revision == None, \
+ "The %s VCS does not support revision specifiers" % self.name
+ if binary == False:
+ f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
+ else:
+ f = open(os.path.join(self.rootdir, path), "rb")
+ contents = f.read()
+ f.close()
+ return contents
+ def _vcs_duplicate_repo(self, directory, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ dir specifies a directory to create the duplicate in.
+ """
+ shutil.copytree(self.rootdir, directory, True)
+ def _vcs_commit(self, commitfile, allow_empty=False):
+ """
+ Commit the current working directory, using the contents of
+ commitfile as the comment. Return the name of the old
+ revision (or None if commits are not supported).
+
+ If allow_empty == False, raise EmptyCommit if there are no
+ changes to commit.
+ """
+ return None
+ def _vcs_revision_id(self, index):
+ """
+ Return the name of the <index>th revision. Index will be an
+ integer (possibly <= 0). The choice of which branch to follow
+ when crossing branches/merges is not defined.
+
+ Return None if revision IDs are not supported, or if the
+ specified revision does not exist.
+ """
+ return None
+ def installed(self):
+ try:
+ self._vcs_help()
+ return True
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ return False
+ except CommandError:
+ return False
+ def detect(self, path="."):
+ """
+ Detect whether a directory is revision controlled with this VCS.
+ """
+ return self._vcs_detect(path)
+ def root(self, path):
+ """
+ Set the root directory to the path's VCS root. This is the
+ default working directory for future invocations.
+ """
+ self.rootdir = self._vcs_root(path)
+ def init(self, path):
+ """
+ Begin versioning the tree based at path.
+ Also roots the vcs at path.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ self._vcs_init(path)
+ self.root(path)
+ def cleanup(self):
+ self._vcs_cleanup()
+ def get_user_id(self):
+ """
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return the user's
+ id. You can override the automatic lookup procedure by setting the
+ VCS.user_id attribute to a string of your choice.
+ """
+ if hasattr(self, "user_id"):
+ if self.user_id != None:
+ return self.user_id
+ id = self._vcs_get_user_id()
+ if id == None:
+ name = self._u_get_fallback_username()
+ email = self._u_get_fallback_email()
+ id = self._u_create_id(name, email)
+ print >> sys.stderr, "Guessing id '%s'" % id
+ try:
+ self.set_user_id(id)
+ except SettingIDnotSupported:
+ pass
+ return id
+ def set_user_id(self, value):
+ """
+ Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the VCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ self._vcs_set_user_id(value)
+ def add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ self._vcs_add(self._u_rel_path(path))
+ def remove(self, path):
+ """
+ Remove a file from both version control and the filesystem.
+ """
+ self._vcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ os.remove(path)
+ def recursive_remove(self, dirname):
+ """
+ Remove a file/directory and all its decendents from both
+ version control and the filesystem.
+ """
+ if not os.path.exists(dirname):
+ raise NoSuchFile(dirname)
+ for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
+ filenames.extend(dirnames)
+ for path in filenames:
+ fullpath = os.path.join(dirpath, path)
+ if os.path.exists(fullpath) == False:
+ continue
+ self._vcs_remove(self._u_rel_path(fullpath))
+ if os.path.exists(dirname):
+ shutil.rmtree(dirname)
+ def update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ self._vcs_update(self._u_rel_path(path))
+ def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False):
+ """
+ Get the file as it was in a given revision.
+ Revision==None specifies the current revision.
+ """
+ if not os.path.exists(path):
+ raise NoSuchFile(path)
+ if self._use_vcs(path, allow_no_vcs):
+ relpath = self._u_rel_path(path)
+ contents = self._vcs_get_file_contents(relpath,revision,binary=binary)
+ else:
+ f = codecs.open(path, "r", self.encoding)
+ contents = f.read()
+ f.close()
+ return contents
+ def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False):
+ """
+ Set the file contents under version control.
+ """
+ add = not os.path.exists(path)
+ if binary == False:
+ f = codecs.open(path, "w", self.encoding)
+ else:
+ f = open(path, "wb")
+ f.write(contents)
+ f.close()
+
+ if self._use_vcs(path, allow_no_vcs):
+ if add:
+ self.add(path)
+ else:
+ self.update(path)
+ def mkdir(self, path, allow_no_vcs=False, check_parents=True):
+ """
+ Create (if neccessary) a directory at path under version
+ control.
+ """
+ if check_parents == True:
+ parent = os.path.dirname(path)
+ if not os.path.exists(parent): # recurse through parents
+ self.mkdir(parent, allow_no_vcs, check_parents)
+ if not os.path.exists(path):
+ os.mkdir(path)
+ if self._use_vcs(path, allow_no_vcs):
+ self.add(path)
+ else:
+ assert os.path.isdir(path)
+ if self._use_vcs(path, allow_no_vcs):
+ #self.update(path)# Don't update directories. Changing files
+ pass # underneath them should be sufficient.
+
+ def duplicate_repo(self, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ Return the path to the arbitrary directory at the base of the new repo.
+ """
+ # Dirname in Baseir to protect against simlink attacks.
+ if self._duplicateBasedir == None:
+ self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs')
+ self._duplicateDirname = \
+ os.path.join(self._duplicateBasedir, "duplicate")
+ self._vcs_duplicate_repo(directory=self._duplicateDirname,
+ revision=revision)
+ return self._duplicateDirname
+ def remove_duplicate_repo(self):
+ """
+ Clean up a duplicate repo created with duplicate_repo().
+ """
+ if self._duplicateBasedir != None:
+ shutil.rmtree(self._duplicateBasedir)
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ def commit(self, summary, body=None, allow_empty=False):
+ """
+ Commit the current working directory, with a commit message
+ string summary and body. Return the name of the old revision
+ (or None if versioning is not supported).
+
+ If allow_empty == False (the default), raise EmptyCommit if
+ there are no changes to commit.
+ """
+ summary = summary.strip()+'\n'
+ if body is not None:
+ summary += '\n' + body.strip() + '\n'
+ descriptor, filename = tempfile.mkstemp()
+ revision = None
+ try:
+ temp_file = os.fdopen(descriptor, 'wb')
+ temp_file.write(summary)
+ temp_file.flush()
+ self.precommit()
+ revision = self._vcs_commit(filename, allow_empty=allow_empty)
+ temp_file.close()
+ self.postcommit()
+ finally:
+ os.remove(filename)
+ return revision
+ def precommit(self):
+ """
+ Executed before all attempted commits.
+ """
+ pass
+ def postcommit(self):
+ """
+ Only executed after successful commits.
+ """
+ pass
+ def revision_id(self, index=None):
+ """
+ Return the name of the <index>th revision. The choice of
+ which branch to follow when crossing branches/merges is not
+ defined.
+
+ Return None if index==None, revision IDs are not supported, or
+ if the specified revision does not exist.
+ """
+ if index == None:
+ return None
+ return self._vcs_revision_id(index)
+ def _u_any_in_string(self, list, string):
+ """
+ Return True if any of the strings in list are in string.
+ Otherwise return False.
+ """
+ for list_string in list:
+ if list_string in string:
+ return True
+ return False
+ def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None):
+ """
+ expect should be a tuple of allowed exit codes. cwd should be
+ the directory from which the command will be executed.
+ """
+ if cwd == None:
+ cwd = self.rootdir
+ if self.verboseInvoke == True:
+ print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args))
+ try :
+ if sys.platform != "win32":
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
+ else:
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ shell=True, cwd=cwd)
+ except OSError, e :
+ raise CommandError(args, status=e.args[0], stdout="", stderr=e)
+ output,error = q.communicate(input=stdin)
+ status = q.wait()
+ if self.verboseInvoke == True:
+ print >> sys.stderr, "%d\n%s%s" % (status, output, error)
+ if status not in expect:
+ raise CommandError(args, status, output, error)
+ return status, output, error
+ def _u_invoke_client(self, *args, **kwargs):
+ directory = kwargs.get('directory',None)
+ expect = kwargs.get('expect', (0,))
+ stdin = kwargs.get('stdin', None)
+ cl_args = [self.client]
+ cl_args.extend(args)
+ return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory)
+ def _u_search_parent_directories(self, path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ return search_parent_directories(path, filename)
+ def _use_vcs(self, path, allow_no_vcs):
+ """
+ Try and decide if _vcs_add/update/mkdir/etc calls will
+ succeed. Returns True is we think the vcs_call would
+ succeeed, and False otherwise.
+ """
+ use_vcs = True
+ exception = None
+ if self.rootdir != None:
+ if self.path_in_root(path) == False:
+ use_vcs = False
+ exception = PathNotInRoot(path, self.rootdir)
+ else:
+ use_vcs = False
+ exception = VCSnotRooted
+ if use_vcs == False and allow_no_vcs==False:
+ raise exception
+ return use_vcs
+ def path_in_root(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> vcs = new()
+ >>> vcs.path_in_root("/a.b/c/.be", "/a.b/c")
+ True
+ >>> vcs.path_in_root("/a.b/.be", "/a.b/c")
+ False
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise VCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ return False
+ return True
+ def _u_rel_path(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> vcs = new()
+ >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ '.be'
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise VCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ raise PathNotInRoot(path, absRootSlashedDir)
+ assert path != absRootSlashedDir, \
+ "file %s == root directory %s" % (path, absRootSlashedDir)
+ relpath = path[len(absRootSlashedDir):]
+ return relpath
+ def _u_abspath(self, path, root=None):
+ """
+ Return the absolute path from a path realtive to root.
+ >>> vcs = new()
+ >>> vcs._u_abspath(".be", "/a.b/c")
+ '/a.b/c/.be'
+ """
+ if root == None:
+ assert self.rootdir != None, "VCS not rooted"
+ root = self.rootdir
+ return os.path.abspath(os.path.join(root, path))
+ def _u_create_id(self, name, email=None):
+ """
+ >>> vcs = new()
+ >>> vcs._u_create_id("John Doe", "jdoe@example.com")
+ 'John Doe <jdoe@example.com>'
+ >>> vcs._u_create_id("John Doe")
+ 'John Doe'
+ """
+ assert len(name) > 0
+ if email == None or len(email) == 0:
+ return name
+ else:
+ return "%s <%s>" % (name, email)
+ def _u_parse_id(self, value):
+ """
+ >>> vcs = new()
+ >>> vcs._u_parse_id("John Doe <jdoe@example.com>")
+ ('John Doe', 'jdoe@example.com')
+ >>> vcs._u_parse_id("John Doe")
+ ('John Doe', None)
+ >>> try:
+ ... vcs._u_parse_id("John Doe <jdoe@example.com><what?>")
+ ... except AssertionError:
+ ... print "Invalid match"
+ Invalid match
+ """
+ emailexp = re.compile("(.*) <([^>]*)>(.*)")
+ match = emailexp.search(value)
+ if match == None:
+ email = None
+ name = value
+ else:
+ assert len(match.groups()) == 3
+ assert match.groups()[2] == "", match.groups()
+ email = match.groups()[1]
+ name = match.groups()[0]
+ assert name != None
+ assert len(name) > 0
+ return (name, email)
+ def _u_get_fallback_username(self):
+ name = None
+ for envariable in ["LOGNAME", "USERNAME"]:
+ if os.environ.has_key(envariable):
+ name = os.environ[envariable]
+ break
+ assert name != None
+ return name
+ def _u_get_fallback_email(self):
+ hostname = gethostname()
+ name = self._u_get_fallback_username()
+ return "%s@%s" % (name, hostname)
+ def _u_parse_commitfile(self, commitfile):
+ """
+ Split the commitfile created in self.commit() back into
+ summary and header lines.
+ """
+ f = codecs.open(commitfile, "r", self.encoding)
+ summary = f.readline()
+ body = f.read()
+ body.lstrip('\n')
+ if len(body) == 0:
+ body = None
+ f.close()
+ return (summary, body)
+
+
+def setup_vcs_test_fixtures(testcase):
+ """Set up test fixtures for VCS test case."""
+ testcase.vcs = testcase.Class()
+ testcase.dir = Dir()
+ testcase.dirname = testcase.dir.path
+
+ vcs_not_supporting_uninitialized_user_id = []
+ vcs_not_supporting_set_user_id = ["None", "hg"]
+ testcase.vcs_supports_uninitialized_user_id = (
+ testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id)
+ testcase.vcs_supports_set_user_id = (
+ testcase.vcs.name not in vcs_not_supporting_set_user_id)
+
+ if not testcase.vcs.installed():
+ testcase.fail(
+ "%(name)s VCS not found" % vars(testcase.Class))
+
+ if testcase.Class.name != "None":
+ testcase.failIf(
+ testcase.vcs.detect(testcase.dirname),
+ "Detected %(name)s VCS before initialising"
+ % vars(testcase.Class))
+
+ testcase.vcs.init(testcase.dirname)
+
+
+class VCSTestCase(unittest.TestCase):
+ """Test cases for base VCS class."""
+
+ Class = VCS
+
+ def __init__(self, *args, **kwargs):
+ super(VCSTestCase, self).__init__(*args, **kwargs)
+ self.dirname = None
+
+ def setUp(self):
+ super(VCSTestCase, self).setUp()
+ setup_vcs_test_fixtures(self)
+
+ def tearDown(self):
+ del(self.vcs)
+ super(VCSTestCase, self).tearDown()
+
+ def full_path(self, rel_path):
+ return os.path.join(self.dirname, rel_path)
+
+
+class VCS_init_TestCase(VCSTestCase):
+ """Test cases for VCS.init method."""
+
+ def test_detect_should_succeed_after_init(self):
+ """Should detect VCS in directory after initialization."""
+ self.failUnless(
+ self.vcs.detect(self.dirname),
+ "Did not detect %(name)s VCS after initialising"
+ % vars(self.Class))
+
+ def test_vcs_rootdir_in_specified_root_path(self):
+ """VCS root directory should be in specified root path."""
+ rp = os.path.realpath(self.vcs.rootdir)
+ dp = os.path.realpath(self.dirname)
+ vcs_name = self.Class.name
+ self.failUnless(
+ dp == rp or rp == None,
+ "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
+
+
+class VCS_get_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.get_user_id method."""
+
+ def test_gets_existing_user_id(self):
+ """Should get the existing user ID."""
+ if not self.vcs_supports_uninitialized_user_id:
+ return
+
+ user_id = self.vcs.get_user_id()
+ self.failUnless(
+ user_id is not None,
+ "unable to get a user id")
+
+
+class VCS_set_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.set_user_id method."""
+
+ def setUp(self):
+ super(VCS_set_user_id_TestCase, self).setUp()
+
+ if self.vcs_supports_uninitialized_user_id:
+ self.prev_user_id = self.vcs.get_user_id()
+ else:
+ self.prev_user_id = "Uninitialized identity <bogus@example.org>"
+
+ if self.vcs_supports_set_user_id:
+ self.test_new_user_id = "John Doe <jdoe@example.com>"
+ self.vcs.set_user_id(self.test_new_user_id)
+
+ def tearDown(self):
+ if self.vcs_supports_set_user_id:
+ self.vcs.set_user_id(self.prev_user_id)
+ super(VCS_set_user_id_TestCase, self).tearDown()
+
+ def test_raises_error_in_unsupported_vcs(self):
+ """Should raise an error in a VCS that doesn't support it."""
+ if self.vcs_supports_set_user_id:
+ return
+ self.assertRaises(
+ SettingIDnotSupported,
+ self.vcs.set_user_id, "foo")
+
+ def test_updates_user_id_in_supporting_vcs(self):
+ """Should update the user ID in an VCS that supports it."""
+ if not self.vcs_supports_set_user_id:
+ return
+ user_id = self.vcs.get_user_id()
+ self.failUnlessEqual(
+ self.test_new_user_id, user_id,
+ "user id not set correctly (expected %s, got %s)"
+ % (self.test_new_user_id, user_id))
+
+
+def setup_vcs_revision_test_fixtures(testcase):
+ """Set up revision test fixtures for VCS test case."""
+ testcase.test_dirs = ['a', 'a/b', 'c']
+ for path in testcase.test_dirs:
+ testcase.vcs.mkdir(testcase.full_path(path))
+
+ testcase.test_files = ['a/text', 'a/b/text']
+
+ testcase.test_contents = {
+ 'rev_1': "Lorem ipsum",
+ 'uncommitted': "dolor sit amet",
+ }
+
+
+class VCS_mkdir_TestCase(VCSTestCase):
+ """Test cases for VCS.mkdir method."""
+
+ def setUp(self):
+ super(VCS_mkdir_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_mkdir_TestCase, self).tearDown()
+
+ def test_mkdir_creates_directory(self):
+ """Should create specified directory in filesystem."""
+ for path in self.test_dirs:
+ full_path = self.full_path(path)
+ self.failUnless(
+ os.path.exists(full_path),
+ "path %(full_path)s does not exist" % vars())
+
+
+class VCS_commit_TestCase(VCSTestCase):
+ """Test cases for VCS.commit method."""
+
+ def setUp(self):
+ super(VCS_commit_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_commit_TestCase, self).tearDown()
+
+ def test_file_contents_as_specified(self):
+ """Should set file contents as specified."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(full_path, test_contents)
+ current_contents = self.vcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+
+ def test_file_contents_as_committed(self):
+ """Should have file contents as specified after commit."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(full_path, test_contents)
+ revision = self.vcs.commit("Initial file contents.")
+ current_contents = self.vcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+
+ def test_file_contents_as_set_when_uncommitted(self):
+ """Should set file contents as specified after commit."""
+ if not self.vcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Initial file contents.")
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ current_contents = self.vcs.get_file_contents(full_path)
+ self.failUnlessEqual(
+ self.test_contents['uncommitted'], current_contents)
+
+ def test_revision_file_contents_as_committed(self):
+ """Should get file contents as committed to specified revision."""
+ if not self.vcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Initial file contents.")
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ committed_contents = self.vcs.get_file_contents(
+ full_path, revision)
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], committed_contents)
+
+ def test_revision_id_as_committed(self):
+ """Check for compatibility between .commit() and .revision_id()"""
+ if not self.vcs.versioned:
+ self.failUnlessEqual(self.vcs.revision_id(5), None)
+ return
+ committed_revisions = []
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Initial %s contents." % path)
+ committed_revisions.append(revision)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ revision = self.vcs.commit("Altered %s contents." % path)
+ committed_revisions.append(revision)
+ for i,revision in enumerate(committed_revisions):
+ self.failUnlessEqual(self.vcs.revision_id(i), revision)
+ i += -len(committed_revisions) # check negative indices
+ self.failUnlessEqual(self.vcs.revision_id(i), revision)
+ i = len(committed_revisions)
+ self.failUnlessEqual(self.vcs.revision_id(i), None)
+ self.failUnlessEqual(self.vcs.revision_id(-i-1), None)
+
+ def test_revision_id_as_committed(self):
+ """Check revision id before first commit"""
+ if not self.vcs.versioned:
+ self.failUnlessEqual(self.vcs.revision_id(5), None)
+ return
+ committed_revisions = []
+ for path in self.test_files:
+ self.failUnlessEqual(self.vcs.revision_id(0), None)
+
+
+class VCS_duplicate_repo_TestCase(VCSTestCase):
+ """Test cases for VCS.duplicate_repo method."""
+
+ def setUp(self):
+ super(VCS_duplicate_repo_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
+
+ def tearDown(self):
+ self.vcs.remove_duplicate_repo()
+ for path in reversed(sorted(self.test_dirs)):
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_duplicate_repo_TestCase, self).tearDown()
+
+ def test_revision_file_contents_as_committed(self):
+ """Should match file contents as committed to specified revision."""
+ if not self.vcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.vcs.commit("Commit current status")
+ self.vcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ dup_repo_path = self.vcs.duplicate_repo(revision)
+ dup_file_path = os.path.join(dup_repo_path, path)
+ dup_file_contents = file(dup_file_path, 'rb').read()
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], dup_file_contents)
+ self.vcs.remove_duplicate_repo()
+
+
+def make_vcs_testcase_subclasses(vcs_class, namespace):
+ """Make VCSTestCase subclasses for vcs_class in the namespace."""
+ vcs_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, VCSTestCase)]
+
+ for base_class in vcs_testcase_classes:
+ testcase_class_name = vcs_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = vcs_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+
+
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/version.py b/libbe/version.py
new file mode 100644
index 0000000..f8eebbd
--- /dev/null
+++ b/libbe/version.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Store version info for this BE installation. By default, use the
+bzr-generated information in _version.py, but allow manual overriding
+by setting _VERSION. This allows support of both the "I don't want to
+be bothered setting version strings" and the "I want complete control
+over the version strings" workflows.
+"""
+
+import libbe._version as _version
+
+# Manually set a version string (optional, defaults to bzr revision id)
+#_VERSION = "1.2.3"
+
+def version(verbose=False):
+ """
+ Returns the version string for this BE installation. If
+ verbose==True, the string will include extra lines with more
+ detail (e.g. bzr branch nickname, etc.).
+ """
+ if "_VERSION" in globals():
+ string = _VERSION
+ else:
+ string = _version.version_info["revision_id"]
+ if verbose == True:
+ string += ("\n"
+ "revision: %(revno)d\n"
+ "nick: %(branch_nick)s\n"
+ "revision id: %(revision_id)s"
+ % _version.version_info)
+ return string
+
+if __name__ == "__main__":
+ print version(verbose=True)