diff options
Diffstat (limited to 'libbe')
-rw-r--r-- | libbe/arch.py | 67 | ||||
-rw-r--r-- | libbe/beuuid.py | 2 | ||||
-rw-r--r-- | libbe/bug.py | 117 | ||||
-rw-r--r-- | libbe/bugdir.py | 476 | ||||
-rw-r--r-- | libbe/bzr.py | 52 | ||||
-rw-r--r-- | libbe/cmdutil.py | 19 | ||||
-rw-r--r-- | libbe/comment.py | 306 | ||||
-rw-r--r-- | libbe/config.py | 6 | ||||
-rw-r--r-- | libbe/darcs.py | 93 | ||||
-rw-r--r-- | libbe/diff.py | 486 | ||||
-rw-r--r-- | libbe/editor.py | 9 | ||||
-rw-r--r-- | libbe/encoding.py | 10 | ||||
-rw-r--r-- | libbe/git.py | 78 | ||||
-rw-r--r-- | libbe/hg.py | 63 | ||||
-rw-r--r-- | libbe/mapfile.py | 39 | ||||
-rw-r--r-- | libbe/plugin.py | 6 | ||||
-rw-r--r-- | libbe/properties.py | 10 | ||||
-rw-r--r-- | libbe/rcs.py | 876 | ||||
-rw-r--r-- | libbe/settings_object.py | 11 | ||||
-rw-r--r-- | libbe/tree.py | 4 | ||||
-rw-r--r-- | libbe/upgrade.py | 187 | ||||
-rw-r--r-- | libbe/utility.py | 5 | ||||
-rw-r--r-- | libbe/vcs.py | 938 | ||||
-rw-r--r-- | libbe/version.py | 50 |
24 files changed, 2449 insertions, 1461 deletions
diff --git a/libbe/arch.py b/libbe/arch.py index 2f45aa9..ab55172 100644 --- a/libbe/arch.py +++ b/libbe/arch.py @@ -17,6 +17,10 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +GNU Arch (tla) backend. +""" + import codecs import os import re @@ -26,10 +30,11 @@ import time import unittest import doctest -import config from beuuid import uuid_gen -import rcs -from rcs import RCS +import config +import vcs + + DEFAULT_CLIENT = "tla" @@ -38,7 +43,7 @@ client = config.get_val("arch_client", default=DEFAULT_CLIENT) def new(): return Arch() -class Arch(RCS): +class Arch(vcs.VCS): name = "Arch" client = client versioned = True @@ -48,21 +53,25 @@ class Arch(RCS): _project_name = None _tmp_project = False _arch_paramdir = os.path.expanduser("~/.arch-params") - def _rcs_help(self): + def _vcs_help(self): status,output,error = self._u_invoke_client("--help") return output - def _rcs_detect(self, path): + def _vcs_detect(self, path): """Detect whether a directory is revision-controlled using Arch""" if self._u_search_parent_directories(path, "{arch}") != None : config.set_val("arch_client", client) return True return False - def _rcs_init(self, path): + def _vcs_init(self, path): self._create_archive(path) self._create_project(path) self._add_project_code(path) def _create_archive(self, path): - # Create a new archive + """ + Create a temporary Arch archive in the directory PATH. This + archive will be removed by + __del__->cleanup->_vcs_cleanup->_remove_archive + """ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive assert self._archive_name == None id = self.get_user_id() @@ -103,7 +112,7 @@ class Arch(RCS): """ Create a temporary Arch project in the directory PATH. This project will be removed by - __del__->cleanup->_rcs_cleanup->_remove_project + __del__->cleanup->_vcs_cleanup->_remove_project """ # http://mwolson.org/projects/GettingStartedWithArch.html # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project @@ -159,13 +168,13 @@ class Arch(RCS): self._adjust_naming_conventions(path) self._invoke_client("import", "--summary", "Began versioning", directory=path) - def _rcs_cleanup(self): + def _vcs_cleanup(self): if self._tmp_project == True: self._remove_project() if self._tmp_archive == True: self._remove_archive() - def _rcs_root(self, path): + def _vcs_root(self, path): if not os.path.isdir(path): dirname = os.path.dirname(path) else: @@ -176,7 +185,6 @@ class Arch(RCS): self._get_archive_project_name(root) return root - def _get_archive_name(self, root): status,output,error = self._u_invoke_client("archives") lines = output.split('\n') @@ -188,7 +196,6 @@ class Arch(RCS): if os.path.realpath(location) == os.path.realpath(root): self._archive_name = archive assert self._archive_name != None - def _get_archive_project_name(self, root): # get project names status,output,error = self._u_invoke_client("tree-version", directory=root) @@ -197,7 +204,7 @@ class Arch(RCS): archive_name,project_name = output.rstrip('\n').split('/') self._archive_name = archive_name self._project_name = project_name - def _rcs_get_user_id(self): + def _vcs_get_user_id(self): try: status,output,error = self._u_invoke_client('my-id') return output.rstrip('\n') @@ -206,9 +213,9 @@ class Arch(RCS): return None else: raise - def _rcs_set_user_id(self, value): + def _vcs_set_user_id(self, value): self._u_invoke_client('my-id', value) - def _rcs_add(self, path): + def _vcs_add(self, path): self._u_invoke_client("add-id", path) realpath = os.path.realpath(self._u_abspath(path)) pathAdded = realpath in self._list_added(self.rootdir) @@ -237,14 +244,14 @@ class Arch(RCS): self._add_dir_rule(rule, os.path.dirname(path), self.rootdir) if os.path.realpath(path) not in self._list_added(self.rootdir): raise CantAddFile(path) - def _rcs_remove(self, path): + def _vcs_remove(self, path): if not '.arch-ids' in path: self._u_invoke_client("delete-id", path) - def _rcs_update(self, path): + def _vcs_update(self, path): pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): + def _vcs_get_file_contents(self, path, revision=None, binary=False): if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary) else: status,output,error = \ self._invoke_client("file-find", path, revision) @@ -254,18 +261,18 @@ class Arch(RCS): contents = f.read() f.close() return contents - def _rcs_duplicate_repo(self, directory, revision=None): + def _vcs_duplicate_repo(self, directory, revision=None): if revision == None: - RCS._rcs_duplicate_repo(self, directory, revision) + vcs.VCS._vcs_duplicate_repo(self, directory, revision) else: status,output,error = \ self._u_invoke_client("get", revision,directory) - def _rcs_commit(self, commitfile, allow_empty=False): + def _vcs_commit(self, commitfile, allow_empty=False): if allow_empty == False: # arch applies empty commits without complaining, so check first status,output,error = self._u_invoke_client("changes",expect=(0,1)) if status == 0: - raise rcs.EmptyCommit() + raise vcs.EmptyCommit() summary,body = self._u_parse_commitfile(commitfile) args = ["commit", "--summary", summary] if body != None: @@ -281,6 +288,16 @@ class Arch(RCS): assert revpath.startswith(self._archive_project_name()+'--') revision = revpath[len(self._archive_project_name()+'--'):] return revpath + def _vcs_revision_id(self, index): + status,output,error = self._u_invoke_client("logs") + logs = output.splitlines() + first_log = logs.pop(0) + assert first_log == "base-0", first_log + try: + log = logs[index] + except IndexError: + return None + return "%s--%s" % (self._archive_project_name(), log) class CantAddFile(Exception): def __init__(self, file): @@ -289,7 +306,7 @@ class CantAddFile(Exception): -rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__]) +vcs.make_vcs_testcase_subclasses(Arch, sys.modules[__name__]) unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/beuuid.py b/libbe/beuuid.py index bc47208..490ed62 100644 --- a/libbe/beuuid.py +++ b/libbe/beuuid.py @@ -13,6 +13,7 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + """ Backwards compatibility support for Python 2.4. Once people give up on 2.4 ;), the uuid call should be merged into bugdir.py @@ -20,6 +21,7 @@ on 2.4 ;), the uuid call should be merged into bugdir.py import unittest + try: from uuid import uuid4 # Python >= 2.5 def uuid_gen(): diff --git a/libbe/bug.py b/libbe/bug.py index c1e5481..fd30ff7 100644 --- a/libbe/bug.py +++ b/libbe/bug.py @@ -15,6 +15,11 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define the Bug class for representing bugs. +""" + import os import os.path import errno @@ -33,6 +38,11 @@ import comment import utility +class DiskAccessRequired (Exception): + def __init__(self, goal): + msg = "Cannot %s without accessing the disk" % goal + Exception.__init__(self, msg) + ### Define and describe valid bug categories # Use a tuple of (category, description) tuples since we don't have # ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/ @@ -216,15 +226,15 @@ class Bug(settings_object.SavedSettingsObject): @doc_property(doc="The trunk of the comment tree") def comment_root(): return {} - def _get_rcs(self): - if hasattr(self.bugdir, "rcs"): - return self.bugdir.rcs + def _get_vcs(self): + if hasattr(self.bugdir, "vcs"): + return self.bugdir.vcs @Property - @cached_property(generator=_get_rcs) - @local_property("rcs") + @cached_property(generator=_get_vcs) + @local_property("vcs") @doc_property(doc="A revision control system instance.") - def rcs(): return {} + def vcs(): return {} def __init__(self, bugdir=None, uuid=None, from_disk=False, load_comments=False, summary=None): @@ -238,17 +248,20 @@ class Bug(settings_object.SavedSettingsObject): if uuid == None: self.uuid = uuid_gen() self.time = int(time.time()) # only save to second precision - if self.rcs != None: - self.creator = self.rcs.get_user_id() + if self.vcs != None: + self.creator = self.vcs.get_user_id() self.summary = summary def __repr__(self): return "Bug(uuid=%r)" % self.uuid - def set_sync_with_disk(self, value): - self.sync_with_disk = value - for comment in self.comments(): - comment.set_sync_with_disk(value) + def __str__(self): + return self.string(shortlist=True) + + def __cmp__(self, other): + return cmp_full(self, other) + + # serializing methods def _setting_attr_string(self, setting): value = getattr(self, setting) @@ -331,43 +344,34 @@ class Bug(settings_object.SavedSettingsObject): output = bugout return output - def __str__(self): - return self.string(shortlist=True) + # methods for saving/loading/acessing settings and properties. - def __cmp__(self, other): - return cmp_full(self, other) + def get_path(self, *args): + dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid) + if len(args) == 0: + return dir + assert args[0] in ["values", "comments"], str(args) + return os.path.join(dir, *args) - def get_path(self, name=None): - my_dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid) - if name is None: - return my_dir - assert name in ["values", "comments"] - return os.path.join(my_dir, name) + def set_sync_with_disk(self, value): + self.sync_with_disk = value + for comment in self.comments(): + comment.set_sync_with_disk(value) def load_settings(self): - self.settings = mapfile.map_load(self.rcs, self.get_path("values")) + if self.sync_with_disk == False: + raise DiskAccessRequired("load settings") + self.settings = mapfile.map_load(self.vcs, self.get_path("values")) self._setup_saved_settings() - def load_comments(self, load_full=True): - if load_full == True: - # Force a complete load of the whole comment tree - self.comment_root = self._get_comment_root(load_full=True) - else: - # Setup for fresh lazy-loading. Clear _comment_root, so - # _get_comment_root returns a fresh version. Turn of - # syncing temporarily so we don't write our blank comment - # tree to disk. - self.sync_with_disk = False - self.comment_root = None - self.sync_with_disk = True - def save_settings(self): + if self.sync_with_disk == False: + raise DiskAccessRequired("save settings") assert self.summary != None, "Can't save blank bug" - - self.rcs.mkdir(self.get_path()) + self.vcs.mkdir(self.get_path()) path = self.get_path("values") - mapfile.map_save(self.rcs, path, self._get_saved_settings()) - + mapfile.map_save(self.vcs, path, self._get_saved_settings()) + def save(self): """ Save any loaded contents to disk. Because of lazy loading of @@ -378,15 +382,39 @@ class Bug(settings_object.SavedSettingsObject): calling this method will just waste time (unless something else has been messing with your on-disk files). """ + sync_with_disk = self.sync_with_disk + if sync_with_disk == False: + self.set_sync_with_disk(True) self.save_settings() if len(self.comment_root) > 0: comment.saveComments(self) + if sync_with_disk == False: + self.set_sync_with_disk(False) + + def load_comments(self, load_full=True): + if self.sync_with_disk == False: + raise DiskAccessRequired("load comments") + if load_full == True: + # Force a complete load of the whole comment tree + self.comment_root = self._get_comment_root(load_full=True) + else: + # Setup for fresh lazy-loading. Clear _comment_root, so + # _get_comment_root returns a fresh version. Turn of + # syncing temporarily so we don't write our blank comment + # tree to disk. + self.sync_with_disk = False + self.comment_root = None + self.sync_with_disk = True def remove(self): + if self.sync_with_disk == False: + raise DiskAccessRequired("remove") self.comment_root.remove() path = self.get_path() - self.rcs.recursive_remove(path) + self.vcs.recursive_remove(path) + # methods for managing comments + def comments(self): for comment in self.comment_root.traverse(): yield comment @@ -489,8 +517,12 @@ def cmp_attr(bug_1, bug_2, attr, invert=False): return cmp(val_1, val_2) # alphabetical rankings (a < z) +cmp_uuid = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "uuid") cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator") cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned") +cmp_target = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "target") +cmp_reporter = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "reporter") +cmp_summary = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "summary") # chronological rankings (newer < older) cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True) @@ -512,7 +544,8 @@ def cmp_comments(bug_1, bug_2): return 0 DEFAULT_CMP_FULL_CMP_LIST = \ - (cmp_status,cmp_severity,cmp_assigned,cmp_time,cmp_creator,cmp_comments) + (cmp_status, cmp_severity, cmp_assigned, cmp_time, cmp_creator, + cmp_reporter, cmp_target, cmp_comments, cmp_summary, cmp_uuid) class BugCompoundComparator (object): def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST): diff --git a/libbe/bugdir.py b/libbe/bugdir.py index 6e020ee..c4f0f91 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -17,23 +17,30 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define the BugDir class for representing bug comments. +""" + +import copy +import errno import os import os.path -import errno +import sys import time -import copy import unittest import doctest +import bug +import encoding from properties import Property, doc_property, local_property, \ defaulting_property, checked_property, fn_checked_property, \ cached_property, primed_property, change_hook_property, \ settings_property -import settings_object import mapfile -import bug -import rcs -import encoding +import vcs +import settings_object +import upgrade import utility @@ -62,8 +69,16 @@ class MultipleBugMatches(ValueError): self.shortname = shortname self.matches = matches +class NoBugMatches(KeyError): + def __init__(self, shortname): + msg = "No bug matches %s" % shortname + KeyError.__init__(self, msg) + self.shortname = shortname -TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" +class DiskAccessRequired (Exception): + def __init__(self, goal): + msg = "Cannot %s without accessing the disk" % goal + Exception.__init__(self, msg) class BugDir (list, settings_object.SavedSettingsObject): @@ -99,7 +114,8 @@ class BugDir (list, settings_object.SavedSettingsObject): all bugs/comments/etc. that have been loaded into memory. If you've been living in memory and want to move to .sync_with_disk==True, but you're not sure if anything has been - changed in memoryy, a call to save() is a safe move. + changed in memory, a call to save() immediately before the + .set_sync_with_disk(True) call is a safe move. Regardless of .sync_with_disk, a call to .save() will write out all the contents that the BugDir instance has loaded into memory. @@ -107,15 +123,14 @@ class BugDir (list, settings_object.SavedSettingsObject): changes, this .save() call will be a waste of time. The BugDir will only load information from the file system when it - loads new bugs/comments that it doesn't already have in memory, or - when it explicitly asked to do so (e.g. .load() or - __init__(from_disk=True)). + loads new settings/bugs/comments that it doesn't already have in + memory and .sync_with_disk == True. - Allow RCS initialization + Allow VCS initialization ======================== This one is for testing purposes. Setting it to True allows the - BugDir to search for an installed RCS backend and initialize it in + BugDir to search for an installed VCS backend and initialize it in the root directory. This is a convenience option for supporting tests of versioning functionality (e.g. .duplicate_bugdir). @@ -172,9 +187,9 @@ class BugDir (list, settings_object.SavedSettingsObject): def encoding(): return {} def _setup_user_id(self, user_id): - self.rcs.user_id = user_id + self.vcs.user_id = user_id def _guess_user_id(self): - return self.rcs.get_user_id() + return self.vcs.get_user_id() def _set_user_id(self, old_user_id, new_user_id): self._setup_user_id(new_user_id) self._prop_save_settings(old_user_id, new_user_id) @@ -182,7 +197,7 @@ class BugDir (list, settings_object.SavedSettingsObject): @_versioned_property(name="user_id", doc= """The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note -that the Arch RCS backend *enforces* ids with this format.""", +that the Arch VCS backend *enforces* ids with this format.""", change_hook=_set_user_id, generator=_guess_user_id) def user_id(): return {} @@ -192,32 +207,32 @@ that the Arch RCS backend *enforces* ids with this format.""", """The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""") def default_assignee(): return {} - @_versioned_property(name="rcs_name", - doc="""The name of the current RCS. Kept seperate to make saving/loading -settings easy. Don't set this attribute. Set .rcs instead, and -.rcs_name will be automatically adjusted.""", + @_versioned_property(name="vcs_name", + doc="""The name of the current VCS. Kept seperate to make saving/loading +settings easy. Don't set this attribute. Set .vcs instead, and +.vcs_name will be automatically adjusted.""", default="None", allowed=["None", "Arch", "bzr", "darcs", "git", "hg"]) - def rcs_name(): return {} + def vcs_name(): return {} - def _get_rcs(self, rcs_name=None): + def _get_vcs(self, vcs_name=None): """Get and root a new revision control system""" - if rcs_name == None: - rcs_name = self.rcs_name - new_rcs = rcs.rcs_by_name(rcs_name) - self._change_rcs(None, new_rcs) - return new_rcs - def _change_rcs(self, old_rcs, new_rcs): - new_rcs.encoding = self.encoding - new_rcs.root(self.root) - self.rcs_name = new_rcs.name + if vcs_name == None: + vcs_name = self.vcs_name + new_vcs = vcs.vcs_by_name(vcs_name) + self._change_vcs(None, new_vcs) + return new_vcs + def _change_vcs(self, old_vcs, new_vcs): + new_vcs.encoding = self.encoding + new_vcs.root(self.root) + self.vcs_name = new_vcs.name @Property - @change_hook_property(hook=_change_rcs) - @cached_property(generator=_get_rcs) - @local_property("rcs") + @change_hook_property(hook=_change_vcs) + @cached_property(generator=_get_vcs) + @local_property("vcs") @doc_property(doc="A revision control system instance.") - def rcs(): return {} + def vcs(): return {} def _bug_map_gen(self): map = {} @@ -279,9 +294,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and def __init__(self, root=None, sink_to_existing_root=True, - assert_new_BugDir=False, allow_rcs_init=False, - manipulate_encodings=True, - from_disk=False, rcs=None): + assert_new_BugDir=False, allow_vcs_init=False, + manipulate_encodings=True, from_disk=False, vcs=None): list.__init__(self) settings_object.SavedSettingsObject.__init__(self) self._manipulate_encodings = manipulate_encodings @@ -293,9 +307,9 @@ settings easy. Don't set this attribute. Set .rcs instead, and if not os.path.exists(root): raise NoRootEntry(root) self.root = root - # get a temporary rcs until we've loaded settings + # get a temporary vcs until we've loaded settings self.sync_with_disk = False - self.rcs = self._guess_rcs() + self.vcs = self._guess_vcs() if from_disk == True: self.sync_with_disk = True @@ -305,20 +319,24 @@ settings easy. Don't set this attribute. Set .rcs instead, and if assert_new_BugDir == True: if os.path.exists(self.get_path()): raise AlreadyInitialized, self.get_path() - if rcs == None: - rcs = self._guess_rcs(allow_rcs_init) - self.rcs = rcs + if vcs == None: + vcs = self._guess_vcs(allow_vcs_init) + self.vcs = vcs self._setup_user_id(self.user_id) - def set_sync_with_disk(self, value): - self.sync_with_disk = value - for bug in self: - bug.set_sync_with_disk(value) + def __del__(self): + self.cleanup() + + def cleanup(self): + self.vcs.cleanup() + + # methods for getting the BugDir situated in the filesystem def _find_root(self, path): """ Search for an existing bug database dir and it's ancestors and - return a BugDir rooted there. + return a BugDir rooted there. Only called by __init__, and + then only if sink_to_existing_root == True. """ if not os.path.exists(path): raise NoRootEntry(path) @@ -334,136 +352,212 @@ settings easy. Don't set this attribute. Set .rcs instead, and raise NoBugDir(path) return beroot - def get_version(self, path=None, use_none_rcs=False): - if use_none_rcs == True: - RCS = rcs.rcs_by_name("None") - RCS.root(self.root) - RCS.encoding = encoding.get_encoding() + def _guess_vcs(self, allow_vcs_init=False): + """ + Only called by __init__. + """ + deepdir = self.get_path() + if not os.path.exists(deepdir): + deepdir = os.path.dirname(deepdir) + new_vcs = vcs.detect_vcs(deepdir) + install = False + if new_vcs.name == "None": + if allow_vcs_init == True: + new_vcs = vcs.installed_vcs() + new_vcs.init(self.root) + return new_vcs + + # methods for saving/loading/accessing settings and properties. + + def get_path(self, *args): + """ + Return a path relative to .root. + """ + dir = os.path.join(self.root, ".be") + if len(args) == 0: + return dir + assert args[0] in ["version", "settings", "bugs"], str(args) + return os.path.join(dir, *args) + + def _get_settings(self, settings_path, for_duplicate_bugdir=False): + allow_no_vcs = not self.vcs.path_in_root(settings_path) + if allow_no_vcs == True: + assert for_duplicate_bugdir == True + if self.sync_with_disk == False and for_duplicate_bugdir == False: + # duplicates can ignore this bugdir's .sync_with_disk status + raise DiskAccessRequired("_get settings") + try: + settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs) + except vcs.NoSuchFile: + settings = {"vcs_name": "None"} + return settings + + def _save_settings(self, settings_path, settings, + for_duplicate_bugdir=False): + allow_no_vcs = not self.vcs.path_in_root(settings_path) + if allow_no_vcs == True: + assert for_duplicate_bugdir == True + if self.sync_with_disk == False and for_duplicate_bugdir == False: + # duplicates can ignore this bugdir's .sync_with_disk status + raise DiskAccessRequired("_save settings") + self.vcs.mkdir(self.get_path(), allow_no_vcs) + mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs) + + def load_settings(self): + self.settings = self._get_settings(self.get_path("settings")) + self._setup_saved_settings() + self._setup_user_id(self.user_id) + self._setup_encoding(self.encoding) + self._setup_severities(self.severities) + self._setup_status(self.active_status, self.inactive_status) + self.vcs = vcs.vcs_by_name(self.vcs_name) + self._setup_user_id(self.user_id) + + def save_settings(self): + settings = self._get_saved_settings() + self._save_settings(self.get_path("settings"), settings) + + def get_version(self, path=None, use_none_vcs=False, + for_duplicate_bugdir=False): + """ + Requires disk access. + """ + if self.sync_with_disk == False: + raise DiskAccessRequired("get version") + if use_none_vcs == True: + VCS = vcs.vcs_by_name("None") + VCS.root(self.root) + VCS.encoding = encoding.get_encoding() else: - RCS = self.rcs + VCS = self.vcs if path == None: path = self.get_path("version") - tree_version = RCS.get_file_contents(path) - return tree_version + allow_no_vcs = not VCS.path_in_root(path) + if allow_no_vcs == True: + assert for_duplicate_bugdir == True + version = VCS.get_file_contents( + path, allow_no_vcs=allow_no_vcs).rstrip("\n") + return version def set_version(self): - self.rcs.mkdir(self.get_path()) - self.rcs.set_file_contents(self.get_path("version"), - TREE_VERSION_STRING) + """ + Requires disk access. + """ + if self.sync_with_disk == False: + raise DiskAccessRequired("set version") + self.vcs.mkdir(self.get_path()) + self.vcs.set_file_contents(self.get_path("version"), + upgrade.BUGDIR_DISK_VERSION+"\n") - def get_path(self, *args): - my_dir = os.path.join(self.root, ".be") - if len(args) == 0: - return my_dir - assert args[0] in ["version", "settings", "bugs"], str(args) - return os.path.join(my_dir, *args) + # methods controlling disk access - def _guess_rcs(self, allow_rcs_init=False): - deepdir = self.get_path() - if not os.path.exists(deepdir): - deepdir = os.path.dirname(deepdir) - new_rcs = rcs.detect_rcs(deepdir) - install = False - if new_rcs.name == "None": - if allow_rcs_init == True: - new_rcs = rcs.installed_rcs() - new_rcs.init(self.root) - return new_rcs + def set_sync_with_disk(self, value): + """ + Adjust .sync_with_disk for the BugDir and all it's children. + See the BugDir docstring for a description of the role of + .sync_with_disk. + """ + self.sync_with_disk = value + for bug in self: + bug.set_sync_with_disk(value) def load(self): - version = self.get_version(use_none_rcs=True) - if version != TREE_VERSION_STRING: - raise NotImplementedError, \ - "BugDir cannot handle version '%s' yet." % version + """ + Reqires disk access + """ + version = self.get_version(use_none_vcs=True) + if version != upgrade.BUGDIR_DISK_VERSION: + upgrade.upgrade(self.root, version) else: if not os.path.exists(self.get_path()): raise NoBugDir(self.get_path()) self.load_settings() - self.rcs = rcs.rcs_by_name(self.rcs_name) - self._setup_user_id(self.user_id) - def load_all_bugs(self): - "Warning: this could take a while." + """ + Requires disk access. + Warning: this could take a while. + """ + if self.sync_with_disk == False: + raise DiskAccessRequired("load all bugs") self._clear_bugs() for uuid in self.list_uuids(): self._load_bug(uuid) def save(self): """ + Note that this command writes to disk _regardless_ of the + status of .sync_with_disk. + Save any loaded contents to disk. Because of lazy loading of bugs and comments, this is actually not too inefficient. - However, if self.sync_with_disk = True, then any changes are + However, if .sync_with_disk = True, then any changes are automatically written to disk as soon as they happen, so calling this method will just waste time (unless something else has been messing with your on-disk files). + + Requires disk access. """ + sync_with_disk = self.sync_with_disk + if sync_with_disk == False: + self.set_sync_with_disk(True) self.set_version() self.save_settings() for bug in self: bug.save() + if sync_with_disk == False: + self.set_sync_with_disk(sync_with_disk) - def load_settings(self): - self.settings = self._get_settings(self.get_path("settings")) - self._setup_saved_settings() - self._setup_user_id(self.user_id) - self._setup_encoding(self.encoding) - self._setup_severities(self.severities) - self._setup_status(self.active_status, self.inactive_status) + # methods for managing duplicate BugDirs - def _get_settings(self, settings_path): - allow_no_rcs = not self.rcs.path_in_root(settings_path) - # allow_no_rcs=True should only be for the special case of - # configuring duplicate bugdir settings + def duplicate_bugdir(self, revision): + duplicate_path = self.vcs.duplicate_repo(revision) + duplicate_version_path = os.path.join(duplicate_path, ".be", "version") try: - settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs) - except rcs.NoSuchFile: - settings = {"rcs_name": "None"} - return settings - - def save_settings(self): - settings = self._get_saved_settings() - self._save_settings(self.get_path("settings"), settings) - - def _save_settings(self, settings_path, settings): - allow_no_rcs = not self.rcs.path_in_root(settings_path) - # allow_no_rcs=True should only be for the special case of - # configuring duplicate bugdir settings - self.rcs.mkdir(self.get_path(), allow_no_rcs) - mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs) - - def duplicate_bugdir(self, revision): - duplicate_path = self.rcs.duplicate_repo(revision) + version = self.get_version(duplicate_version_path, + for_duplicate_bugdir=True) + except DiskAccessRequired: + self.sync_with_disk = True # temporarily allow access + version = self.get_version(duplicate_version_path, + for_duplicate_bugdir=True) + self.sync_with_disk = False + if version != upgrade.BUGDIR_DISK_VERSION: + upgrade.upgrade(duplicate_path, version) - # setup revision RCS as None, since the duplicate may not be + # setup revision VCS as None, since the duplicate may not be # initialized for versioning duplicate_settings_path = os.path.join(duplicate_path, ".be", "settings") - duplicate_settings = self._get_settings(duplicate_settings_path) - if "rcs_name" in duplicate_settings: - duplicate_settings["rcs_name"] = "None" + duplicate_settings = self._get_settings(duplicate_settings_path, + for_duplicate_bugdir=True) + if "vcs_name" in duplicate_settings: + duplicate_settings["vcs_name"] = "None" duplicate_settings["user_id"] = self.user_id if "disabled" in bug.status_values: # Hack to support old versions of BE bugs duplicate_settings["inactive_status"] = self.inactive_status - self._save_settings(duplicate_settings_path, duplicate_settings) + self._save_settings(duplicate_settings_path, duplicate_settings, + for_duplicate_bugdir=True) return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings) def remove_duplicate_bugdir(self): - self.rcs.remove_duplicate_repo() + self.vcs.remove_duplicate_repo() + + # methods for managing bugs def list_uuids(self): uuids = [] - if os.path.exists(self.get_path()): + if self.sync_with_disk == True and os.path.exists(self.get_path()): # list the uuids on disk - for uuid in os.listdir(self.get_path("bugs")): - if not (uuid.startswith('.')): - uuids.append(uuid) - yield uuid + if os.path.exists(self.get_path("bugs")): + for uuid in os.listdir(self.get_path("bugs")): + if not (uuid.startswith('.')): + uuids.append(uuid) + yield uuid # and the ones that are still just in memory for bug in self: if bug.uuid not in uuids: @@ -476,6 +570,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and self._bug_map_gen() def _load_bug(self, uuid): + if self.sync_with_disk == False: + raise DiskAccessRequired("_load bug") bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True) self.append(bg) self._bug_map_gen() @@ -492,7 +588,8 @@ settings easy. Don't set this attribute. Set .rcs instead, and def remove_bug(self, bug): self.remove(bug) - bug.remove() + if bug.sync_with_disk == True: + bug.remove() def bug_shortname(self, bug): """ @@ -514,12 +611,13 @@ settings easy. Don't set this attribute. Set .rcs instead, and def bug_from_shortname(self, shortname): """ - >>> bd = simple_bug_dir() + >>> bd = SimpleBugDir(sync_with_disk=False) >>> bug_a = bd.bug_from_shortname('a') >>> print type(bug_a) <class 'libbe.bug.Bug'> >>> print bug_a a:om: Bug A + >>> bd.cleanup() """ matches = [] self._bug_map_gen() @@ -530,7 +628,7 @@ settings easy. Don't set this attribute. Set .rcs instead, and raise MultipleBugMatches(shortname, matches) if len(matches) == 1: return self.bug_from_uuid(matches[0]) - raise KeyError("No bug matches %s" % shortname) + raise NoBugMatches(shortname) def bug_from_uuid(self, uuid): if not self.has_bug(uuid): @@ -548,41 +646,56 @@ settings easy. Don't set this attribute. Set .rcs instead, and return True -def simple_bug_dir(): +class SimpleBugDir (BugDir): """ - For testing - >>> bugdir = simple_bug_dir() - >>> ls = list(bugdir.list_uuids()) - >>> ls.sort() - >>> print ls + For testing. Set sync_with_disk==False for a memory-only bugdir. + >>> bugdir = SimpleBugDir() + >>> uuids = list(bugdir.list_uuids()) + >>> uuids.sort() + >>> print uuids ['a', 'b'] + >>> bugdir.cleanup() """ - dir = utility.Dir() - assert os.path.exists(dir.path) - bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True, + def __init__(self, sync_with_disk=True): + if sync_with_disk == True: + dir = utility.Dir() + assert os.path.exists(dir.path) + root = dir.path + assert_new_BugDir = True + vcs_init = True + else: + root = "/" + assert_new_BugDir = False + vcs_init = False + BugDir.__init__(self, root, sink_to_existing_root=False, + assert_new_BugDir=assert_new_BugDir, + allow_vcs_init=vcs_init, manipulate_encodings=False) - bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir. - bug_a = bugdir.new_bug("a", summary="Bug A") - bug_a.creator = "John Doe <jdoe@example.com>" - bug_a.time = 0 - bug_b = bugdir.new_bug("b", summary="Bug B") - bug_b.creator = "Jane Doe <jdoe@example.com>" - bug_b.time = 0 - bug_b.status = "closed" - bugdir.save() - return bugdir - + if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir. + self._dir_ref = dir + bug_a = self.new_bug("a", summary="Bug A") + bug_a.creator = "John Doe <jdoe@example.com>" + bug_a.time = 0 + bug_b = self.new_bug("b", summary="Bug B") + bug_b.creator = "Jane Doe <jdoe@example.com>" + bug_b.time = 0 + bug_b.status = "closed" + if sync_with_disk == True: + self.save() + self.set_sync_with_disk(True) + def cleanup(self): + if hasattr(self, "_dir_ref"): + self._dir_ref.cleanup() + BugDir.cleanup(self) class BugDirTestCase(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) def setUp(self): self.dir = utility.Dir() self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, - allow_rcs_init=True) - self.rcs = self.bugdir.rcs + allow_vcs_init=True) + self.vcs = self.bugdir.vcs def tearDown(self): - self.rcs.cleanup() + self.bugdir.cleanup() self.dir.cleanup() def fullPath(self, path): return os.path.join(self.dir.path, path) @@ -593,13 +706,13 @@ class BugDirTestCase(unittest.TestCase): self.assertRaises(AlreadyInitialized, BugDir, self.dir.path, assertNewBugDir=True) def versionTest(self): - if self.rcs.versioned == False: + if self.vcs.versioned == False: return - original = self.bugdir.rcs.commit("Began versioning") + original = self.bugdir.vcs.commit("Began versioning") bugA = self.bugdir.bug_from_uuid("a") bugA.status = "fixed" self.bugdir.save() - new = self.rcs.commit("Fixed bug a") + new = self.vcs.commit("Fixed bug a") dupdir = self.bugdir.duplicate_bugdir(original) self.failUnless(dupdir.root != self.bugdir.root, "%s, %s" % (dupdir.root, self.bugdir.root)) @@ -645,17 +758,19 @@ class BugDirTestCase(unittest.TestCase): rep.new_reply("And they have six legs.") if sync_with_disk == False: self.bugdir.save() + self.bugdir.set_sync_with_disk(True) self.bugdir._clear_bugs() bug = self.bugdir.bug_from_uuid("a") bug.load_comments() + if sync_with_disk == False: + self.bugdir.set_sync_with_disk(False) self.failUnless(len(bug.comment_root)==1, len(bug.comment_root)) for index,comment in enumerate(bug.comments()): if index == 0: repLoaded = comment self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid) - self.failUnless(comment.sync_with_disk == True, + self.failUnless(comment.sync_with_disk == sync_with_disk, comment.sync_with_disk) - #load_settings() self.failUnless(comment.content_type == "text/plain", comment.content_type) self.failUnless(repLoaded.settings["Content-type"]=="text/plain", @@ -672,5 +787,46 @@ class BugDirTestCase(unittest.TestCase): def testSyncedComments(self): self.testComments(sync_with_disk=True) -unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) -suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()]) +class SimpleBugDirTestCase (unittest.TestCase): + def setUp(self): + # create a pre-existing bugdir in a temporary directory + self.dir = utility.Dir() + self.original_working_dir = os.getcwd() + os.chdir(self.dir.path) + self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, + allow_vcs_init=True) + self.bugdir.new_bug("preexisting", summary="Hopefully not imported") + self.bugdir.save() + def tearDown(self): + os.chdir(self.original_working_dir) + self.bugdir.cleanup() + self.dir.cleanup() + def testOnDiskCleanLoad(self): + """SimpleBugDir(sync_with_disk==True) should not import preexisting bugs.""" + bugdir = SimpleBugDir(sync_with_disk=True) + self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk) + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + bugdir._clear_bugs() + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == [], uuids) + bugdir.load_all_bugs() + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + bugdir.cleanup() + def testInMemoryCleanLoad(self): + """SimpleBugDir(sync_with_disk==False) should not import preexisting bugs.""" + bugdir = SimpleBugDir(sync_with_disk=False) + self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk) + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs) + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == ['a', 'b'], uuids) + bugdir._clear_bugs() + uuids = sorted([bug.uuid for bug in bugdir]) + self.failUnless(uuids == [], uuids) + bugdir.cleanup() + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/bzr.py b/libbe/bzr.py index d7cd1e5..e9e0649 100644 --- a/libbe/bzr.py +++ b/libbe/bzr.py @@ -17,61 +17,65 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Bazaar (bzr) backend. +""" + import os import re import sys import unittest import doctest -import rcs -from rcs import RCS +import vcs + def new(): return Bzr() -class Bzr(RCS): +class Bzr(vcs.VCS): name = "bzr" client = "bzr" versioned = True - def _rcs_help(self): + def _vcs_help(self): status,output,error = self._u_invoke_client("--help") return output - def _rcs_detect(self, path): + def _vcs_detect(self, path): if self._u_search_parent_directories(path, ".bzr") != None : return True return False - def _rcs_root(self, path): + def _vcs_root(self, path): """Find the root of the deepest repository containing path.""" status,output,error = self._u_invoke_client("root", path) return output.rstrip('\n') - def _rcs_init(self, path): + def _vcs_init(self, path): self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): + def _vcs_get_user_id(self): status,output,error = self._u_invoke_client("whoami") return output.rstrip('\n') - def _rcs_set_user_id(self, value): + def _vcs_set_user_id(self, value): self._u_invoke_client("whoami", value) - def _rcs_add(self, path): + def _vcs_add(self, path): self._u_invoke_client("add", path) - def _rcs_remove(self, path): + def _vcs_remove(self, path): # --force to also remove unversioned files. self._u_invoke_client("remove", "--force", path) - def _rcs_update(self, path): + def _vcs_update(self, path): pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): + def _vcs_get_file_contents(self, path, revision=None, binary=False): if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary) else: status,output,error = \ self._u_invoke_client("cat","-r",revision,path) return output - def _rcs_duplicate_repo(self, directory, revision=None): + def _vcs_duplicate_repo(self, directory, revision=None): if revision == None: - RCS._rcs_duplicate_repo(self, directory, revision) + vcs.VCS._vcs_duplicate_repo(self, directory, revision) else: self._u_invoke_client("branch", "--revision", revision, ".", directory) - def _rcs_commit(self, commitfile, allow_empty=False): + def _vcs_commit(self, commitfile, allow_empty=False): args = ["commit", "--file", commitfile] if allow_empty == True: args.append("--unchanged") @@ -83,9 +87,9 @@ class Bzr(RCS): strings = ["ERROR: no changes to commit.", # bzr 1.3.1 "ERROR: No changes to commit."] # bzr 1.15.1 if self._u_any_in_string(strings, error) == True: - raise rcs.EmptyCommit() + raise vcs.EmptyCommit() else: - raise rcs.CommandError(args, status, error) + raise vcs.CommandError(args, status, stdout="", stderr=error) revision = None revline = re.compile("Committed revision (.*)[.]") match = revline.search(error) @@ -93,9 +97,17 @@ class Bzr(RCS): assert len(match.groups()) == 1 revision = match.groups()[0] return revision + def _vcs_revision_id(self, index): + status,output,error = self._u_invoke_client("revno") + current_revision = int(output) + if index >= current_revision or index < -current_revision: + return None + if index >= 0: + return str(index+1) # bzr commit 0 is the empty tree. + return str(current_revision+index+1) -rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__]) +vcs.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__]) unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py index 853a75a..9b64142 100644 --- a/libbe/cmdutil.py +++ b/libbe/cmdutil.py @@ -15,6 +15,11 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define assorted utilities to make command-line handling easier. +""" + import glob import optparse import os @@ -70,10 +75,11 @@ def get_command(command_name): return cmd -def execute(cmd, args): +def execute(cmd, args, manipulate_encodings=True): enc = encoding.get_encoding() cmd = get_command(cmd) - ret = cmd.execute([a.decode(enc) for a in args]) + ret = cmd.execute([a.decode(enc) for a in args], + manipulate_encodings=manipulate_encodings) if ret == None: ret = 0 return ret @@ -206,6 +212,15 @@ def underlined(instring): return "%s\n%s" % (instring, "="*len(instring)) +def bug_from_shortname(bdir, shortname): + """ + Exception translation for the command-line interface. + """ + try: + bug = bdir.bug_from_shortname(shortname) + except (bugdir.MultipleBugMatches, bugdir.NoBugMatches), e: + raise UserError(e.message) + return bug def _test(): import doctest diff --git a/libbe/comment.py b/libbe/comment.py index 3249e8b..41bc7e6 100644 --- a/libbe/comment.py +++ b/libbe/comment.py @@ -16,6 +16,11 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define the Comment class for representing bug comments. +""" + import base64 import os import os.path @@ -61,6 +66,11 @@ class MissingReference(ValueError): self.reference = comment.in_reply_to self.comment = comment +class DiskAccessRequired (Exception): + def __init__(self, goal): + msg = "Cannot %s without accessing the disk" % goal + Exception.__init__(self, msg) + INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!" def list_to_root(comments, bug, root=None, @@ -115,8 +125,10 @@ def loadComments(bug, load_full=False): Set load_full=True when you want to load the comment completely from disk *now*, rather than waiting and lazy loading as required. """ + if bug.sync_with_disk == False: + raise DiskAccessRequired("load comments") path = bug.get_path("comments") - if not os.path.isdir(path): + if not os.path.exists(path): return Comment(bug, uuid=INVALID_UUID) comments = [] for uuid in os.listdir(path): @@ -131,6 +143,8 @@ def loadComments(bug, load_full=False): return list_to_root(comments, bug) def saveComments(bug): + if bug.sync_with_disk == False: + raise DiskAccessRequired("save comments") for comment in bug.comment_root.traverse(): comment.save() @@ -162,9 +176,9 @@ class Comment(Tree, settings_object.SavedSettingsObject): doc="Alternate ID for linking imported comments. Internally comments are linked (via In-reply-to) to the parent's UUID. However, these UUIDs are generated internally, so Alt-id is provided as a user-controlled linking target.") def alt_id(): return {} - @_versioned_property(name="From", + @_versioned_property(name="Author", doc="The author of the comment") - def From(): return {} + def author(): return {} @_versioned_property(name="In-reply-to", doc="UUID for parent comment or bug") @@ -178,28 +192,28 @@ class Comment(Tree, settings_object.SavedSettingsObject): @_versioned_property(name="Date", doc="An RFC 2822 timestamp for comment creation") - def time_string(): return {} + def date(): return {} def _get_time(self): - if self.time_string == None: + if self.date == None: return None - return utility.str_to_time(self.time_string) + return utility.str_to_time(self.date) def _set_time(self, value): - self.time_string = utility.time_to_str(value) + self.date = utility.time_to_str(value) time = property(fget=_get_time, fset=_set_time, - doc="An integer version of .time_string") + doc="An integer version of .date") def _get_comment_body(self): - if self.rcs != None and self.sync_with_disk == True: - import rcs + if self.vcs != None and self.sync_with_disk == True: + import vcs binary = not self.content_type.startswith("text/") - return self.rcs.get_file_contents(self.get_path("body"), binary=binary) + return self.vcs.get_file_contents(self.get_path("body"), binary=binary) def _set_comment_body(self, old=None, new=None, force=False): - if (self.rcs != None and self.sync_with_disk == True) or force==True: + if (self.vcs != None and self.sync_with_disk == True) or force==True: assert new != None, "Can't save empty comment" binary = not self.content_type.startswith("text/") - self.rcs.set_file_contents(self.get_path("body"), new, binary=binary) + self.vcs.set_file_contents(self.get_path("body"), new, binary=binary) @Property @change_hook_property(hook=_set_comment_body) @@ -208,15 +222,15 @@ class Comment(Tree, settings_object.SavedSettingsObject): @doc_property(doc="The meat of the comment") def body(): return {} - def _get_rcs(self): - if hasattr(self.bug, "rcs"): - return self.bug.rcs + def _get_vcs(self): + if hasattr(self.bug, "vcs"): + return self.bug.vcs @Property - @cached_property(generator=_get_rcs) - @local_property("rcs") + @cached_property(generator=_get_vcs) + @local_property("vcs") @doc_property(doc="A revision control system instance.") - def rcs(): return {} + def vcs(): return {} def _extra_strings_check_fn(value): return utility.iterable_full_of_strings(value, \ @@ -257,13 +271,29 @@ class Comment(Tree, settings_object.SavedSettingsObject): if uuid == None: self.uuid = uuid_gen() self.time = int(time.time()) # only save to second precision - if self.rcs != None: - self.From = self.rcs.get_user_id() + if self.vcs != None: + self.author = self.vcs.get_user_id() self.in_reply_to = in_reply_to self.body = body - def set_sync_with_disk(self, value): - self.sync_with_disk = True + def __cmp__(self, other): + return cmp_full(self, other) + + def __str__(self): + """ + >>> comm = Comment(bug=None, body="Some insightful remarks") + >>> comm.uuid = "com-1" + >>> comm.date = "Thu, 20 Nov 2008 15:55:11 +0000" + >>> comm.author = "Jane Doe <jdoe@example.com>" + >>> print comm + --------- Comment --------- + Name: com-1 + From: Jane Doe <jdoe@example.com> + Date: Thu, 20 Nov 2008 15:55:11 +0000 + <BLANKLINE> + Some insightful remarks + """ + return self.string() def traverse(self, *args, **kwargs): """Avoid working with the possible dummy root comment""" @@ -272,6 +302,8 @@ class Comment(Tree, settings_object.SavedSettingsObject): continue yield comment + # serializing methods + def _setting_attr_string(self, setting): value = getattr(self, setting) if value == None: @@ -282,12 +314,12 @@ class Comment(Tree, settings_object.SavedSettingsObject): """ >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") >>> comm.uuid = "0123" - >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" + >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000" >>> print comm.xml(indent=2, shortname="com-1") <comment> <uuid>0123</uuid> <short-name>com-1</short-name> - <from></from> + <author></author> <date>Thu, 01 Jan 1970 00:00:00 +0000</date> <content-type>text/plain</content-type> <body>Some @@ -309,8 +341,8 @@ class Comment(Tree, settings_object.SavedSettingsObject): ("alt-id", self.alt_id), ("short-name", shortname), ("in-reply-to", self.in_reply_to), - ("from", self._setting_attr_string("From")), - ("date", self.time_string), + ("author", self._setting_attr_string("author")), + ("date", self.date), ("content-type", self.content_type), ("body", body)] lines = ["<comment>"] @@ -328,11 +360,11 @@ class Comment(Tree, settings_object.SavedSettingsObject): <alt-id> fields. >>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") >>> commA.uuid = "0123" - >>> commA.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" + >>> commA.date = "Thu, 01 Jan 1970 00:00:00 +0000" >>> xml = commA.xml(shortname="com-1") >>> commB = Comment() >>> commB.from_xml(xml) - >>> attrs=['uuid','alt_id','in_reply_to','From','time_string','content_type','body'] + >>> attrs=['uuid','alt_id','in_reply_to','author','date','content_type','body'] >>> for attr in attrs: # doctest: +ELLIPSIS ... if getattr(commB, attr) != getattr(commA, attr): ... estr = "Mismatch on %s: '%s' should be '%s'" @@ -342,15 +374,15 @@ class Comment(Tree, settings_object.SavedSettingsObject): Mismatch on alt_id: '0123' should be 'None' >>> print commB.alt_id 0123 - >>> commA.From - >>> commB.From + >>> commA.author + >>> commB.author """ if type(xml_string) == types.UnicodeType: xml_string = xml_string.strip().encode("unicode_escape") comment = ElementTree.XML(xml_string) if comment.tag != "comment": raise InvalidXML(comment, "root element must be <comment>") - tags=['uuid','alt-id','in-reply-to','from','date','content-type','body'] + tags=['uuid','alt-id','in-reply-to','author','date','content-type','body'] uuid = None body = None for child in comment.getchildren(): @@ -368,10 +400,6 @@ class Comment(Tree, settings_object.SavedSettingsObject): if child.tag == "body": body = text continue # don't set the bug's body yet. - elif child.tag == 'from': - attr_name = "From" - elif child.tag == 'date': - attr_name = 'time_string' else: attr_name = child.tag.replace('-','_') setattr(self, attr_name, text) @@ -389,7 +417,7 @@ class Comment(Tree, settings_object.SavedSettingsObject): def string(self, indent=0, shortname=None): """ >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") - >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" + >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000" >>> print comm.string(indent=2, shortname="com-1") --------- Comment --------- Name: com-1 @@ -405,8 +433,8 @@ class Comment(Tree, settings_object.SavedSettingsObject): lines = [] lines.append("--------- Comment ---------") lines.append("Name: %s" % shortname) - lines.append("From: %s" % (self._setting_attr_string("From"))) - lines.append("Date: %s" % self.time_string) + lines.append("From: %s" % (self._setting_attr_string("author"))) + lines.append("Date: %s" % self.date) lines.append("") if self.content_type.startswith("text/"): lines.extend((self.body or "").splitlines()) @@ -417,78 +445,6 @@ class Comment(Tree, settings_object.SavedSettingsObject): sep = '\n' + istring return istring + sep.join(lines).rstrip('\n') - def __str__(self): - """ - >>> comm = Comment(bug=None, body="Some insightful remarks") - >>> comm.uuid = "com-1" - >>> comm.time_string = "Thu, 20 Nov 2008 15:55:11 +0000" - >>> comm.From = "Jane Doe <jdoe@example.com>" - >>> print comm - --------- Comment --------- - Name: com-1 - From: Jane Doe <jdoe@example.com> - Date: Thu, 20 Nov 2008 15:55:11 +0000 - <BLANKLINE> - Some insightful remarks - """ - return self.string() - - def get_path(self, name=None): - my_dir = os.path.join(self.bug.get_path("comments"), self.uuid) - if name is None: - return my_dir - assert name in ["values", "body"] - return os.path.join(my_dir, name) - - def load_settings(self): - self.settings = mapfile.map_load(self.rcs, self.get_path("values")) - self._setup_saved_settings() - - def save_settings(self): - self.rcs.mkdir(self.get_path()) - path = self.get_path("values") - mapfile.map_save(self.rcs, path, self._get_saved_settings()) - - def save(self): - """ - Save any loaded contents to disk. - - However, if self.sync_with_disk = True, then any changes are - automatically written to disk as soon as they happen, so - calling this method will just waste time (unless something - else has been messing with your on-disk files). - """ - assert self.body != None, "Can't save blank comment" - self.save_settings() - self._set_comment_body(new=self.body, force=True) - - def remove(self): - for comment in self.traverse(): - path = comment.get_path() - self.rcs.recursive_remove(path) - - def add_reply(self, reply, allow_time_inversion=False): - if self.uuid != INVALID_UUID: - reply.in_reply_to = self.uuid - self.append(reply) - #raise Exception, "adding reply \n%s\n%s" % (self, reply) - - def new_reply(self, body=None): - """ - >>> comm = Comment(bug=None, body="Some insightful remarks") - >>> repA = comm.new_reply("Critique original comment") - >>> repB = repA.new_reply("Begin flamewar :p") - >>> repB.in_reply_to == repA.uuid - True - """ - reply = Comment(self.bug, body=body) - if self.bug != None: - reply.set_sync_with_disk(self.bug.sync_with_disk) - if reply.sync_with_disk == True: - reply.save() - self.add_reply(reply) - return reply - def string_thread(self, string_method_name="string", name_map={}, indent=0, flatten=True, auto_name_map=False, bug_shortname=None): @@ -506,7 +462,7 @@ class Comment(Tree, settings_object.SavedSettingsObject): name_map = {} for shortname,comment in comm.comment_shortnames(bug_shortname): name_map[comment.uuid] = shortname - comm.sort(key=lambda c : c.From) # your sort + comm.sort(key=lambda c : c.author) # your sort comm.string_thread(name_map=name_map) >>> a = Comment(bug=None, uuid="a", body="Insightful remarks") @@ -593,6 +549,77 @@ class Comment(Tree, settings_object.SavedSettingsObject): indent=indent, auto_name_map=auto_name_map, bug_shortname=bug_shortname) + # methods for saving/loading/acessing settings and properties. + + def get_path(self, *args): + dir = os.path.join(self.bug.get_path("comments"), self.uuid) + if len(args) == 0: + return dir + assert args[0] in ["values", "body"], str(args) + return os.path.join(dir, *args) + + def set_sync_with_disk(self, value): + self.sync_with_disk = value + + def load_settings(self): + if self.sync_with_disk == False: + raise DiskAccessRequired("load settings") + self.settings = mapfile.map_load(self.vcs, self.get_path("values")) + self._setup_saved_settings() + + def save_settings(self): + if self.sync_with_disk == False: + raise DiskAccessRequired("save settings") + self.vcs.mkdir(self.get_path()) + path = self.get_path("values") + mapfile.map_save(self.vcs, path, self._get_saved_settings()) + + def save(self): + """ + Save any loaded contents to disk. + + However, if self.sync_with_disk = True, then any changes are + automatically written to disk as soon as they happen, so + calling this method will just waste time (unless something + else has been messing with your on-disk files). + """ + sync_with_disk = self.sync_with_disk + if sync_with_disk == False: + self.set_sync_with_disk(True) + assert self.body != None, "Can't save blank comment" + self.save_settings() + self._set_comment_body(new=self.body, force=True) + if sync_with_disk == False: + self.set_sync_with_disk(False) + + def remove(self): + if self.sync_with_disk == False and self.uuid != INVALID_UUID: + raise DiskAccessRequired("remove") + for comment in self.traverse(): + path = comment.get_path() + self.vcs.recursive_remove(path) + + def add_reply(self, reply, allow_time_inversion=False): + if self.uuid != INVALID_UUID: + reply.in_reply_to = self.uuid + self.append(reply) + + def new_reply(self, body=None): + """ + >>> comm = Comment(bug=None, body="Some insightful remarks") + >>> repA = comm.new_reply("Critique original comment") + >>> repB = repA.new_reply("Begin flamewar :p") + >>> repB.in_reply_to == repA.uuid + True + """ + reply = Comment(self.bug, body=body) + if self.bug != None: + reply.set_sync_with_disk(self.bug.sync_with_disk) + if reply.sync_with_disk == True: + reply.save() + self.add_reply(reply) + return reply + def comment_shortnames(self, bug_shortname=None): """ Iterate through (id, comment) pairs, in time order. @@ -659,4 +686,59 @@ class Comment(Tree, settings_object.SavedSettingsObject): return comment raise KeyError(uuid) +def cmp_attr(comment_1, comment_2, attr, invert=False): + """ + Compare a general attribute between two comments using the conventional + comparison rule for that attribute type. If invert == True, sort + *against* that convention. + >>> attr="author" + >>> commentA = Comment() + >>> commentB = Comment() + >>> commentA.author = "John Doe" + >>> commentB.author = "Jane Doe" + >>> cmp_attr(commentA, commentB, attr) > 0 + True + >>> cmp_attr(commentA, commentB, attr, invert=True) < 0 + True + >>> commentB.author = "John Doe" + >>> cmp_attr(commentA, commentB, attr) == 0 + True + """ + if not hasattr(comment_2, attr) : + return 1 + val_1 = getattr(comment_1, attr) + val_2 = getattr(comment_2, attr) + if val_1 == None: val_1 = None + if val_2 == None: val_2 = None + + if invert == True : + return -cmp(val_1, val_2) + else : + return cmp(val_1, val_2) + +# alphabetical rankings (a < z) +cmp_uuid = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "uuid") +cmp_author = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "author") +cmp_in_reply_to = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "in_reply_to") +cmp_content_type = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "content_type") +cmp_body = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "body") +# chronological rankings (newer < older) +cmp_time = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "time", invert=True) + +DEFAULT_CMP_FULL_CMP_LIST = \ + (cmp_time, cmp_author, cmp_content_type, cmp_body, cmp_in_reply_to, + cmp_uuid) + +class CommentCompoundComparator (object): + def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST): + self.cmp_list = cmp_list + def __call__(self, comment_1, comment_2): + for comparison in self.cmp_list : + val = comparison(comment_1, comment_2) + if val != 0 : + return val + return 0 + +cmp_full = CommentCompoundComparator() + suite = doctest.DocTestSuite() diff --git a/libbe/config.py b/libbe/config.py index 5e343b9..fb5a028 100644 --- a/libbe/config.py +++ b/libbe/config.py @@ -14,6 +14,11 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Create, save, and load the per-user config file at path(). +""" + import ConfigParser import codecs import locale @@ -21,6 +26,7 @@ import os.path import sys import doctest + default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding() def path(): diff --git a/libbe/darcs.py b/libbe/darcs.py index e7132c0..16005f2 100644 --- a/libbe/darcs.py +++ b/libbe/darcs.py @@ -14,31 +14,40 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Darcs backend. +""" + import codecs import os import re import sys -import unittest +try: # import core module, Python >= 2.5 + from xml.etree import ElementTree +except ImportError: # look for non-core module + from elementtree import ElementTree +from xml.sax.saxutils import unescape import doctest +import unittest + +import vcs -import rcs -from rcs import RCS def new(): return Darcs() -class Darcs(RCS): +class Darcs(vcs.VCS): name="darcs" client="darcs" versioned=True - def _rcs_help(self): + def _vcs_help(self): status,output,error = self._u_invoke_client("--help") return output - def _rcs_detect(self, path): + def _vcs_detect(self, path): if self._u_search_parent_directories(path, "_darcs") != None : return True return False - def _rcs_root(self, path): + def _vcs_root(self, path): """Find the root of the deepest repository containing path.""" # Assume that nothing funny is going on; in particular, that we aren't # dealing with a bare repo. @@ -48,9 +57,9 @@ class Darcs(RCS): if darcs_dir == None: return None return os.path.dirname(darcs_dir) - def _rcs_init(self, path): + def _vcs_init(self, path): self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): + def _vcs_get_user_id(self): # following http://darcs.net/manual/node4.html#SECTION00410030000000000000 # as of June 29th, 2009 if self.rootdir == None: @@ -65,32 +74,32 @@ class Darcs(RCS): if env_variable in os.environ: return os.environ[env_variable] return None - def _rcs_set_user_id(self, value): + def _vcs_set_user_id(self, value): if self.rootdir == None: self.root(".") if self.rootdir == None: - raise rcs.SettingIDnotSupported + raise vcs.SettingIDnotSupported author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author") f = codecs.open(author_path, "w", self.encoding) f.write(value) f.close() - def _rcs_add(self, path): + def _vcs_add(self, path): if os.path.isdir(path): return self._u_invoke_client("add", path) - def _rcs_remove(self, path): + def _vcs_remove(self, path): if not os.path.isdir(self._u_abspath(path)): os.remove(os.path.join(self.rootdir, path)) # darcs notices removal - def _rcs_update(self, path): + def _vcs_update(self, path): pass # darcs notices changes - def _rcs_get_file_contents(self, path, revision=None, binary=False): + def _vcs_get_file_contents(self, path, revision=None, binary=False): if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, + return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary) else: try: return self._u_invoke_client("show", "contents", "--patch", revision, path) - except rcs.CommandError: + except vcs.CommandError: # Darcs versions < 2.0.0pre2 lack the "show contents" command status,output,error = self._u_invoke_client("diff", "--unified", @@ -113,7 +122,7 @@ class Darcs(RCS): status,output,error = self._u_invoke(args, stdin=target_patch) if os.path.exists(os.path.join(self.rootdir, path)) == True: - contents = RCS._rcs_get_file_contents(self, path, + contents = vcs.VCS._vcs_get_file_contents(self, path, binary=binary) else: contents = "" @@ -123,41 +132,53 @@ class Darcs(RCS): status,output,error = self._u_invoke(args, stdin=target_patch) args=["patch", path] status,output,error = self._u_invoke(args, stdin=major_patch) - current_contents = RCS._rcs_get_file_contents(self, path, + current_contents = vcs.VCS._vcs_get_file_contents(self, path, binary=binary) return contents - def _rcs_duplicate_repo(self, directory, revision=None): + def _vcs_duplicate_repo(self, directory, revision=None): if revision==None: - RCS._rcs_duplicate_repo(self, directory, revision) + vcs.VCS._vcs_duplicate_repo(self, directory, revision) else: self._u_invoke_client("put", "--to-patch", revision, directory) - def _rcs_commit(self, commitfile, allow_empty=False): + def _vcs_commit(self, commitfile, allow_empty=False): id = self.get_user_id() if '@' not in id: id = "%s <%s@invalid.com>" % (id, id) args = ['record', '--all', '--author', id, '--logfile', commitfile] status,output,error = self._u_invoke_client(*args) empty_strings = ["No changes!"] - revision = None if self._u_any_in_string(empty_strings, output) == True: if allow_empty == False: - raise rcs.EmptyCommit() - else: # we need a extra call to get the current revision - args = ["changes", "--last=1", "--xml"] - status,output,error = self._u_invoke_client(*args) - revline = re.compile("[ \t]*<name>(.*)</name>") - # note that darcs does _not_ make an empty revision. - # this returns the last non-empty revision id... + raise vcs.EmptyCommit() + # note that darcs does _not_ make an empty revision. + # this returns the last non-empty revision id... + revision = self._vcs_revision_id(-1) else: revline = re.compile("Finished recording patch '(.*)'") - match = revline.search(output) - assert match != None, output+error - assert len(match.groups()) == 1 - revision = match.groups()[0] + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] return revision - + def _vcs_revision_id(self, index): + status,output,error = self._u_invoke_client("changes", "--xml") + revisions = [] + xml_str = output.encode("unicode_escape").replace(r"\n", "\n") + element = ElementTree.XML(xml_str) + assert element.tag == "changelog", element.tag + for patch in element.getchildren(): + assert patch.tag == "patch", patch.tag + for child in patch.getchildren(): + if child.tag == "name": + text = unescape(unicode(child.text).decode("unicode_escape").strip()) + revisions.append(text) + revisions.reverse() + try: + return revisions[index] + except IndexError: + return None -rcs.make_rcs_testcase_subclasses(Darcs, sys.modules[__name__]) +vcs.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__]) unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/diff.py b/libbe/diff.py index ba48efc..9253a23 100644 --- a/libbe/diff.py +++ b/libbe/diff.py @@ -14,112 +14,406 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -"""Compare two bug trees""" -from libbe import cmdutil, bugdir, bug -from libbe.utility import time_to_str + +"""Compare two bug trees.""" + +import difflib import doctest -def bug_diffs(old_bugdir, new_bugdir): - added = [] - removed = [] - modified = [] - for uuid in old_bugdir.list_uuids(): - old_bug = old_bugdir.bug_from_uuid(uuid) - try: - new_bug = new_bugdir.bug_from_uuid(uuid) - old_bug.load_comments() - new_bug.load_comments() - if old_bug != new_bug: - modified.append((old_bug, new_bug)) - except KeyError: - removed.append(old_bug) - for uuid in new_bugdir.list_uuids(): - if not old_bugdir.has_bug(uuid): - new_bug = new_bugdir.bug_from_uuid(uuid) - added.append(new_bug) - return (removed, modified, added) +from libbe import bugdir, bug, settings_object, tree +from libbe.utility import time_to_str -def diff_report(bug_diffs_data, old_bugdir, new_bugdir): - bugs_removed,bugs_modified,bugs_added = bug_diffs_data - def modified_cmp(left, right): - return bug.cmp_severity(left[1], right[1]) - bugs_added.sort(bug.cmp_severity) - bugs_removed.sort(bug.cmp_severity) - bugs_modified.sort(modified_cmp) - lines = [] - - if old_bugdir.settings != new_bugdir.settings: - bugdir_settings = sorted(new_bugdir.settings_properties) - bugdir_settings.remove("rcs_name") # tweaked by bugdir.duplicate_bugdir - change_list = change_lines(old_bugdir, new_bugdir, bugdir_settings) - if len(change_list) > 0: - lines.append("Modified bug directory:") - change_strings = ["%s: %s -> %s" % f for f in change_list] - lines.extend(change_strings) - lines.append("") - if len(bugs_added) > 0: - lines.append("New bug reports:") - for bg in bugs_added: - lines.extend(bg.string(shortlist=True).splitlines()) - lines.append("") - if len(bugs_modified) > 0: - printed = False - for old_bug, new_bug in bugs_modified: - change_str = bug_changes(old_bug, new_bug) - if change_str is None: - continue - if not printed: - printed = True - lines.append("Modified bug reports:") - lines.extend(change_str.splitlines()) - if printed == True: - lines.append("") - if len(bugs_removed) > 0: - lines.append("Removed bug reports:") - for bg in bugs_removed: - lines.extend(bg.string(shortlist=True).splitlines()) - lines.append("") - - return "\n".join(lines).rstrip("\n") +class DiffTree (tree.Tree): + """ + A tree holding difference data for easy report generation. + >>> bugdir = DiffTree("bugdir") + >>> bdsettings = DiffTree("settings", data="target: None -> 1.0") + >>> bugdir.append(bdsettings) + >>> bugs = DiffTree("bugs", "bug-count: 5 -> 6") + >>> bugdir.append(bugs) + >>> new = DiffTree("new", "new bugs: ABC, DEF") + >>> bugs.append(new) + >>> rem = DiffTree("rem", "removed bugs: RST, UVW") + >>> bugs.append(rem) + >>> print bugdir.report_string() + target: None -> 1.0 + bug-count: 5 -> 6 + new bugs: ABC, DEF + removed bugs: RST, UVW + >>> print "\\n".join(bugdir.paths()) + bugdir + bugdir/settings + bugdir/bugs + bugdir/bugs/new + bugdir/bugs/rem + >>> bugdir.child_by_path("/") == bugdir + True + >>> bugdir.child_by_path("/bugs") == bugs + True + >>> bugdir.child_by_path("/bugs/rem") == rem + True + >>> bugdir.child_by_path("bugdir") == bugdir + True + >>> bugdir.child_by_path("bugdir/") == bugdir + True + >>> bugdir.child_by_path("bugdir/bugs") == bugs + True + >>> bugdir.child_by_path("/bugs").masked = True + >>> print bugdir.report_string() + target: None -> 1.0 + """ + def __init__(self, name, data=None, data_part_fn=str, + requires_children=False, masked=False): + tree.Tree.__init__(self) + self.name = name + self.data = data + self.data_part_fn = data_part_fn + self.requires_children = requires_children + self.masked = masked + def paths(self, parent_path=None): + paths = [] + if parent_path == None: + path = self.name + else: + path = "%s/%s" % (parent_path, self.name) + paths.append(path) + for child in self: + paths.extend(child.paths(path)) + return paths + def child_by_path(self, path): + if hasattr(path, "split"): # convert string path to a list of names + names = path.split("/") + if names[0] == "": + names[0] = self.name # replace root with self + if len(names) > 1 and names[-1] == "": + names = names[:-1] # strip empty tail + else: # it was already an array + names = path + assert len(names) > 0, path + if names[0] == self.name: + if len(names) == 1: + return self + for child in self: + if names[1] == child.name: + return child.child_by_path(names[1:]) + if len(names) == 1: + raise KeyError, "%s doesn't match '%s'" % (names, self.name) + raise KeyError, "%s points to child not in %s" % (names, [c.name for c in self]) + def report_string(self): + return "\n".join(self.report()) + def report(self, root=None, parent=None, depth=0): + if root == None: + root = self.make_root() + if self.masked == True: + return None + data_part = self.data_part(depth) + if self.requires_children == True and len(self) == 0: + pass + else: + self.join(root, parent, data_part) + if data_part != None: + depth += 1 + for child in self: + child.report(root, self, depth) + return root + def make_root(self): + return [] + def join(self, root, parent, data_part): + if data_part != None: + root.append(data_part) + def data_part(self, depth, indent=True): + if self.data == None: + return None + if hasattr(self, "_cached_data_part"): + return self._cached_data_part + data_part = self.data_part_fn(self.data) + if indent == True: + data_part_lines = data_part.splitlines() + indent = " "*(depth) + line_sep = "\n"+indent + data_part = indent+line_sep.join(data_part_lines) + self._cached_data_part = data_part + return data_part -def change_lines(old, new, attributes): - change_list = [] - for attr in attributes: - old_attr = getattr(old, attr) - new_attr = getattr(new, attr) - if old_attr != new_attr: - change_list.append((attr, old_attr, new_attr)) - if len(change_list) >= 0: - return change_list - else: +class Diff (object): + """ + Difference tree generator for BugDirs. + >>> import copy + >>> bd = bugdir.SimpleBugDir(sync_with_disk=False) + >>> bd.user_id = "John Doe <j@doe.com>" + >>> bd_new = copy.deepcopy(bd) + >>> bd_new.target = "1.0" + >>> a = bd_new.bug_from_uuid("a") + >>> rep = a.comment_root.new_reply("I'm closing this bug") + >>> rep.uuid = "acom" + >>> rep.date = "Thu, 01 Jan 1970 00:00:00 +0000" + >>> a.status = "closed" + >>> b = bd_new.bug_from_uuid("b") + >>> bd_new.remove_bug(b) + >>> c = bd_new.new_bug("c", "Bug C") + >>> d = Diff(bd, bd_new) + >>> r = d.report_tree() + >>> print "\\n".join(r.paths()) + bugdir + bugdir/settings + bugdir/bugs + bugdir/bugs/new + bugdir/bugs/new/c + bugdir/bugs/rem + bugdir/bugs/rem/b + bugdir/bugs/mod + bugdir/bugs/mod/a + bugdir/bugs/mod/a/settings + bugdir/bugs/mod/a/comments + bugdir/bugs/mod/a/comments/new + bugdir/bugs/mod/a/comments/new/acom + bugdir/bugs/mod/a/comments/rem + bugdir/bugs/mod/a/comments/mod + >>> print r.report_string() + Changed bug directory settings: + target: None -> 1.0 + New bugs: + c:om: Bug C + Removed bugs: + b:cm: Bug B + Modified bugs: + a:cm: Bug A + Changed bug settings: + status: open -> closed + New comments: + from John Doe <j@doe.com> on Thu, 01 Jan 1970 00:00:00 +0000 + I'm closing this bug... + >>> bd.cleanup() + """ + def __init__(self, old_bugdir, new_bugdir): + self.old_bugdir = old_bugdir + self.new_bugdir = new_bugdir + + # data assembly methods + + def _changed_bugs(self): + """ + Search for differences in all bugs between .old_bugdir and + .new_bugdir. Returns + (added_bugs, modified_bugs, removed_bugs) + where added_bugs and removed_bugs are lists of added and + removed bugs respectively. modified_bugs is a list of + (old_bug,new_bug) pairs. + """ + if hasattr(self, "__changed_bugs"): + return self.__changed_bugs + added = [] + removed = [] + modified = [] + for uuid in self.new_bugdir.list_uuids(): + new_bug = self.new_bugdir.bug_from_uuid(uuid) + try: + old_bug = self.old_bugdir.bug_from_uuid(uuid) + except KeyError: + added.append(new_bug) + else: + if old_bug.sync_with_disk == True: + old_bug.load_comments() + if new_bug.sync_with_disk == True: + new_bug.load_comments() + if old_bug != new_bug: + modified.append((old_bug, new_bug)) + for uuid in self.old_bugdir.list_uuids(): + if not self.new_bugdir.has_bug(uuid): + old_bug = self.old_bugdir.bug_from_uuid(uuid) + removed.append(old_bug) + added.sort() + removed.sort() + modified.sort(self._bug_modified_cmp) + self.__changed_bugs = (added, modified, removed) + return self.__changed_bugs + def _bug_modified_cmp(self, left, right): + return cmp(left[1], right[1]) + def _changed_comments(self, old, new): + """ + Search for differences in all loaded comments between the bugs + old and new. Returns + (added_comments, modified_comments, removed_comments) + analogous to ._changed_bugs. + """ + if hasattr(self, "__changed_comments"): + if new.uuid in self.__changed_comments: + return self.__changed_comments[new.uuid] + else: + self.__changed_comments = {} + added = [] + removed = [] + modified = [] + old.comment_root.sort(key=lambda comm : comm.time) + new.comment_root.sort(key=lambda comm : comm.time) + old_comment_ids = [c.uuid for c in old.comments()] + new_comment_ids = [c.uuid for c in new.comments()] + for uuid in new_comment_ids: + new_comment = new.comment_from_uuid(uuid) + try: + old_comment = old.comment_from_uuid(uuid) + except KeyError: + added.append(new_comment) + else: + if old_comment != new_comment: + modified.append((old_comment, new_comment)) + for uuid in old_comment_ids: + if uuid not in new_comment_ids: + new_comment = new.comment_from_uuid(uuid) + removed.append(new_comment) + self.__changed_comments[new.uuid] = (added, modified, removed) + return self.__changed_comments[new.uuid] + def _attribute_changes(self, old, new, attributes): + """ + Take two objects old and new, and compare the value of *.attr + for attr in the list attribute names. Returns a list of + (attr_name, old_value, new_value) + tuples. + """ + change_list = [] + for attr in attributes: + old_value = getattr(old, attr) + new_value = getattr(new, attr) + if old_value != new_value: + change_list.append((attr, old_value, new_value)) + if len(change_list) >= 0: + return change_list return None + def _settings_properties_attribute_changes(self, old, new, + hidden_properties=[]): + properties = sorted(new.settings_properties) + for p in hidden_properties: + properties.remove(p) + attributes = [settings_object.setting_name_to_attr_name(None, p) + for p in properties] + return self._attribute_changes(old, new, attributes) + def _bugdir_attribute_changes(self): + return self._settings_properties_attribute_changes( \ + self.old_bugdir, self.new_bugdir, + ["vcs_name"]) # tweaked by bugdir.duplicate_bugdir + def _bug_attribute_changes(self, old, new): + return self._settings_properties_attribute_changes(old, new) + def _comment_attribute_changes(self, old, new): + return self._settings_properties_attribute_changes(old, new) -def bug_changes(old, new): - bug_settings = sorted(new.settings_properties) - change_list = change_lines(old, new, bug_settings) - change_strings = ["%s: %s -> %s" % f for f in change_list] + # report generation methods - old_comment_ids = [c.uuid for c in old.comments()] - new_comment_ids = [c.uuid for c in new.comments()] - for comment_id in new_comment_ids: - if comment_id not in old_comment_ids: - summary = comment_summary(new.comment_from_uuid(comment_id), "new") - change_strings.append(summary) - for comment_id in old_comment_ids: - if comment_id not in new_comment_ids: - summary = comment_summary(new.comment_from_uuid(comment_id), - "removed") - change_strings.append(summary) + def report_tree(self, diff_tree=DiffTree): + """ + Pretty bare to make it easy to adjust to specific cases. You + can pass in a DiffTree subclass via diff_tree to override the + default report assembly process. + """ + if hasattr(self, "__report_tree"): + return self.__report_tree + bugdir_settings = sorted(self.new_bugdir.settings_properties) + bugdir_settings.remove("vcs_name") # tweaked by bugdir.duplicate_bugdir + root = diff_tree("bugdir") + bugdir_attribute_changes = self._bugdir_attribute_changes() + if len(bugdir_attribute_changes) > 0: + bugdir = diff_tree("settings", bugdir_attribute_changes, + self.bugdir_attribute_change_string) + root.append(bugdir) + bug_root = diff_tree("bugs") + root.append(bug_root) + add,mod,rem = self._changed_bugs() + bnew = diff_tree("new", "New bugs:", requires_children=True) + bug_root.append(bnew) + for bug in add: + b = diff_tree(bug.uuid, bug, self.bug_add_string) + bnew.append(b) + brem = diff_tree("rem", "Removed bugs:", requires_children=True) + bug_root.append(brem) + for bug in rem: + b = diff_tree(bug.uuid, bug, self.bug_rem_string) + brem.append(b) + bmod = diff_tree("mod", "Modified bugs:", requires_children=True) + bug_root.append(bmod) + for old,new in mod: + b = diff_tree(new.uuid, (old,new), self.bug_mod_string) + bmod.append(b) + bug_attribute_changes = self._bug_attribute_changes(old, new) + if len(bug_attribute_changes) > 0: + bset = diff_tree("settings", bug_attribute_changes, + self.bug_attribute_change_string) + b.append(bset) + if old.summary != new.summary: + data = (old.summary, new.summary) + bsum = diff_tree("summary", data, self.bug_summary_change_string) + b.append(bsum) + cr = diff_tree("comments") + b.append(cr) + a,m,d = self._changed_comments(old, new) + cnew = diff_tree("new", "New comments:", requires_children=True) + for comment in a: + c = diff_tree(comment.uuid, comment, self.comment_add_string) + cnew.append(c) + crem = diff_tree("rem", "Removed comments:",requires_children=True) + for comment in d: + c = diff_tree(comment.uuid, comment, self.comment_rem_string) + crem.append(c) + cmod = diff_tree("mod","Modified comments:",requires_children=True) + for o,n in m: + c = diff_tree(n.uuid, (o,n), self.comment_mod_string) + cmod.append(c) + comm_attribute_changes = self._comment_attribute_changes(o, n) + if len(comm_attribute_changes) > 0: + cset = diff_tree("settings", comm_attribute_changes, + self.comment_attribute_change_string) + if o.body != n.body: + data = (o.body, n.body) + cbody = diff_tree("cbody", data, + self.comment_body_change_string) + c.append(cbody) + cr.extend([cnew, crem, cmod]) + self.__report_tree = root + return self.__report_tree - if len(change_strings) == 0: - return None - return "%s\n %s" % (new.string(shortlist=True), - " \n".join(change_strings)) + # change data -> string methods. + # Feel free to play with these in subclasses. + def attribute_change_string(self, attribute_changes, indent=0): + indent_string = " "*indent + change_strings = [u"%s: %s -> %s" % f for f in attribute_changes] + for i,change_string in enumerate(change_strings): + change_strings[i] = indent_string+change_string + return u"\n".join(change_strings) + def bugdir_attribute_change_string(self, attribute_changes): + return "Changed bug directory settings:\n%s" % \ + self.attribute_change_string(attribute_changes, indent=1) + def bug_attribute_change_string(self, attribute_changes): + return "Changed bug settings:\n%s" % \ + self.attribute_change_string(attribute_changes, indent=1) + def comment_attribute_change_string(self, attribute_changes): + return "Changed comment settings:\n%s" % \ + self.attribute_change_string(attribute_changes, indent=1) + def bug_add_string(self, bug): + return bug.string(shortlist=True) + def bug_rem_string(self, bug): + return bug.string(shortlist=True) + def bug_mod_string(self, bugs): + old_bug,new_bug = bugs + return new_bug.string(shortlist=True) + def bug_summary_change_string(self, summaries): + old_summary,new_summary = summaries + return "summary changed:\n %s\n %s" % (old_summary, new_summary) + def _comment_summary_string(self, comment): + return "from %s on %s" % (comment.author, time_to_str(comment.time)) + def comment_add_string(self, comment): + summary = self._comment_summary_string(comment) + first_line = comment.body.splitlines()[0] + return "%s\n %s..." % (summary, first_line) + def comment_rem_string(self, comment): + summary = self._comment_summary_string(comment) + first_line = comment.body.splitlines()[0] + return "%s\n %s..." % (summary, first_line) + def comment_mod_string(self, comments): + old_comment,new_comment = comments + return self._comment_summary_string(new_comment) + def comment_body_change_string(self, bodies): + old_body,new_body = bodies + return difflib.unified_diff(old_body, new_body) -def comment_summary(comment, status): - return "%8s comment from %s on %s" % (status, comment.From, - time_to_str(comment.time)) suite = doctest.DocTestSuite() diff --git a/libbe/editor.py b/libbe/editor.py index 93144b8..ec41006 100644 --- a/libbe/editor.py +++ b/libbe/editor.py @@ -15,6 +15,11 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Define editor_string(), a function that invokes an editor to accept +user-produced text as a string. +""" + import codecs import locale import os @@ -22,6 +27,7 @@ import sys import tempfile import doctest + default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding() comment_marker = u"== Anything below this line will be ignored\n" @@ -62,7 +68,8 @@ def editor_string(comment=None, encoding=None): fhandle, fname = tempfile.mkstemp() try: if comment is not None: - os.write(fhandle, '\n'+comment_string(comment)) + cstring = u'\n'+comment_string(comment) + os.write(fhandle, cstring.encode(encoding)) os.close(fhandle) oldmtime = os.path.getmtime(fname) os.system("%s %s" % (editor, fname)) diff --git a/libbe/encoding.py b/libbe/encoding.py index d603602..fd513b5 100644 --- a/libbe/encoding.py +++ b/libbe/encoding.py @@ -14,16 +14,26 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Support input/output/filesystem encodings (e.g. UTF-8). +""" + import codecs import locale import sys import doctest + +ENCODING = None # override get_encoding() output by setting this + def get_encoding(): """ Guess a useful input/output/filesystem encoding... Maybe we need seperate encodings for input/output and filesystem? Hmm... """ + if ENCODING != None: + return ENCODING encoding = locale.getpreferredencoding() or sys.getdefaultencoding() if sys.platform != 'win32' or sys.version_info[:2] > (2, 3): encoding = locale.getlocale(locale.LC_TIME)[1] or encoding diff --git a/libbe/git.py b/libbe/git.py index 2f9ffa9..3abe3b8 100644 --- a/libbe/git.py +++ b/libbe/git.py @@ -16,30 +16,34 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Git backend. +""" + import os import re import sys import unittest import doctest -import rcs -from rcs import RCS +import vcs + def new(): return Git() -class Git(RCS): +class Git(vcs.VCS): name="git" client="git" versioned=True - def _rcs_help(self): + def _vcs_help(self): status,output,error = self._u_invoke_client("--help") return output - def _rcs_detect(self, path): + def _vcs_detect(self, path): if self._u_search_parent_directories(path, ".git") != None : return True return False - def _rcs_root(self, path): + def _vcs_root(self, path): """Find the root of the deepest repository containing path.""" # Assume that nothing funny is going on; in particular, that we aren't # dealing with a bare repo. @@ -50,13 +54,21 @@ class Git(RCS): gitdir = os.path.join(path, output.rstrip('\n')) dirname = os.path.abspath(os.path.dirname(gitdir)) return dirname - def _rcs_init(self, path): + def _vcs_init(self, path): self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): - status,output,error = self._u_invoke_client("config", "user.name") - name = output.rstrip('\n') - status,output,error = self._u_invoke_client("config", "user.email") - email = output.rstrip('\n') + def _vcs_get_user_id(self): + status,output,error = \ + self._u_invoke_client("config", "user.name", expect=(0,1)) + if status == 0: + name = output.rstrip('\n') + else: + name = "" + status,output,error = \ + self._u_invoke_client("config", "user.email", expect=(0,1)) + if status == 0: + email = output.rstrip('\n') + else: + email = "" if name != "" or email != "": # got something! # guess missing info, if necessary if name == "": @@ -65,35 +77,35 @@ class Git(RCS): email = self._u_get_fallback_email() return self._u_create_id(name, email) return None # Git has no infomation - def _rcs_set_user_id(self, value): + def _vcs_set_user_id(self, value): name,email = self._u_parse_id(value) if email != None: self._u_invoke_client("config", "user.email", email) self._u_invoke_client("config", "user.name", name) - def _rcs_add(self, path): + def _vcs_add(self, path): if os.path.isdir(path): return self._u_invoke_client("add", path) - def _rcs_remove(self, path): + def _vcs_remove(self, path): if not os.path.isdir(self._u_abspath(path)): self._u_invoke_client("rm", "-f", path) - def _rcs_update(self, path): - self._rcs_add(path) - def _rcs_get_file_contents(self, path, revision=None, binary=False): + def _vcs_update(self, path): + self._vcs_add(path) + def _vcs_get_file_contents(self, path, revision=None, binary=False): if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary) else: arg = "%s:%s" % (revision,path) status,output,error = self._u_invoke_client("show", arg) return output - def _rcs_duplicate_repo(self, directory, revision=None): + def _vcs_duplicate_repo(self, directory, revision=None): if revision==None: - RCS._rcs_duplicate_repo(self, directory, revision) + vcs.VCS._vcs_duplicate_repo(self, directory, revision) else: #self._u_invoke_client("archive", revision, directory) # makes tarball self._u_invoke_client("clone", "--no-checkout",".",directory) self._u_invoke_client("checkout", revision, directory=directory) - def _rcs_commit(self, commitfile, allow_empty=False): + def _vcs_commit(self, commitfile, allow_empty=False): args = ['commit', '--all', '--file', commitfile] if allow_empty == True: args.append("--allow-empty") @@ -104,17 +116,33 @@ class Git(RCS): strings = ["nothing to commit", "nothing added to commit"] if self._u_any_in_string(strings, output) == True: - raise rcs.EmptyCommit() + raise vcs.EmptyCommit() revision = None revline = re.compile("(.*) (.*)[:\]] (.*)") match = revline.search(output) assert match != None, output+error assert len(match.groups()) == 3 revision = match.groups()[1] - return revision + full_revision = self._vcs_revision_id(-1) + assert full_revision.startswith(revision), \ + "Mismatched revisions:\n%s\n%s" % (revision, full_revision) + return full_revision + def _vcs_revision_id(self, index): + args = ["rev-list", "--first-parent", "--reverse", "HEAD"] + kwargs = {"expect":(0,128)} + status,output,error = self._u_invoke_client(*args, **kwargs) + if status == 128: + if error.startswith("fatal: ambiguous argument 'HEAD': unknown "): + return None + raise vcs.CommandError(args, status, stdout="", stderr=error) + commits = output.splitlines() + try: + return commits[index] + except IndexError: + return None -rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__]) +vcs.make_vcs_testcase_subclasses(Git, sys.modules[__name__]) unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/hg.py b/libbe/hg.py index a20eeb5..f8f8121 100644 --- a/libbe/hg.py +++ b/libbe/hg.py @@ -16,81 +16,88 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Mercurial (hg) backend. +""" + import os import re import sys import unittest import doctest -import rcs -from rcs import RCS +import vcs + def new(): return Hg() -class Hg(RCS): +class Hg(vcs.VCS): name="hg" client="hg" versioned=True - def _rcs_help(self): + def _vcs_help(self): status,output,error = self._u_invoke_client("--help") return output - def _rcs_detect(self, path): + def _vcs_detect(self, path): """Detect whether a directory is revision-controlled using Mercurial""" if self._u_search_parent_directories(path, ".hg") != None: return True return False - def _rcs_root(self, path): + def _vcs_root(self, path): status,output,error = self._u_invoke_client("root", directory=path) return output.rstrip('\n') - def _rcs_init(self, path): + def _vcs_init(self, path): self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): + def _vcs_get_user_id(self): status,output,error = self._u_invoke_client("showconfig","ui.username") return output.rstrip('\n') - def _rcs_set_user_id(self, value): + def _vcs_set_user_id(self, value): """ Supported by the Config Extension, but that is not part of standard Mercurial. http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension """ - raise rcs.SettingIDnotSupported - def _rcs_add(self, path): + raise vcs.SettingIDnotSupported + def _vcs_add(self, path): self._u_invoke_client("add", path) - def _rcs_remove(self, path): + def _vcs_remove(self, path): self._u_invoke_client("rm", "--force", path) - def _rcs_update(self, path): + def _vcs_update(self, path): pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): + def _vcs_get_file_contents(self, path, revision=None, binary=False): if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary) else: status,output,error = \ self._u_invoke_client("cat","-r",revision,path) return output - def _rcs_duplicate_repo(self, directory, revision=None): + def _vcs_duplicate_repo(self, directory, revision=None): if revision == None: - return RCS._rcs_duplicate_repo(self, directory, revision) + return vcs.VCS._vcs_duplicate_repo(self, directory, revision) else: self._u_invoke_client("archive", "--rev", revision, directory) - def _rcs_commit(self, commitfile, allow_empty=False): + def _vcs_commit(self, commitfile, allow_empty=False): args = ['commit', '--logfile', commitfile] status,output,error = self._u_invoke_client(*args) if allow_empty == False: strings = ["nothing changed"] if self._u_any_in_string(strings, output) == True: - raise rcs.EmptyCommit() - status,output,error = self._u_invoke_client('identify') - revision = None - revline = re.compile("(.*) tip") - match = revline.search(output) - assert match != None, output+error - assert len(match.groups()) == 1 - revision = match.groups()[0] - return revision + raise vcs.EmptyCommit() + return self._vcs_revision_id(-1) + def _vcs_revision_id(self, index, style="id"): + args = ["identify", "--rev", str(int(index)), "--%s" % style] + kwargs = {"expect": (0,255)} + status,output,error = self._u_invoke_client(*args, **kwargs) + if status == 0: + id = output.strip() + if id == '000000000000': + return None # before initial commit. + return id + return None -rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__]) +vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__]) unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/mapfile.py b/libbe/mapfile.py index b959d76..4d69601 100644 --- a/libbe/mapfile.py +++ b/libbe/mapfile.py @@ -14,12 +14,19 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import yaml -import os.path + +""" +Provide a means of saving and loading dictionaries of parameters. The +saved "mapfiles" should be clear, flat-text files, and allow easy merging of +independent/conflicting changes. +""" + import errno -import utility +import os.path +import yaml import doctest + class IllegalKey(Exception): def __init__(self, key): Exception.__init__(self, 'Illegal key "%s"' % key) @@ -95,33 +102,15 @@ def parse(contents): >>> dict["e"] 'f' """ - old_format = False - for line in contents.splitlines(): - if len(line.split("=")) == 2: - old_format = True - break - if old_format: # translate to YAML. Hack to deal with old BE bugs. - newlines = [] - for line in contents.splitlines(): - line = line.rstrip('\n') - if len(line) == 0: - continue - fields = line.split("=") - if len(fields) == 2: - key,value = fields - newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) - else: - newlines.append(line) - contents = '\n'.join(newlines) return yaml.load(contents) or {} -def map_save(rcs, path, map, allow_no_rcs=False): +def map_save(vcs, path, map, allow_no_vcs=False): """Save the map as a mapfile to the specified path""" contents = generate(map) - rcs.set_file_contents(path, contents, allow_no_rcs) + vcs.set_file_contents(path, contents, allow_no_vcs) -def map_load(rcs, path, allow_no_rcs=False): - contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs) +def map_load(vcs, path, allow_no_vcs=False): + contents = vcs.get_file_contents(path, allow_no_vcs=allow_no_vcs) return parse(contents) suite = doctest.DocTestSuite() diff --git a/libbe/plugin.py b/libbe/plugin.py index 0545fd7..d593d69 100644 --- a/libbe/plugin.py +++ b/libbe/plugin.py @@ -15,6 +15,12 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Allow simple listing and loading of the various becommands and libbe +submodules (i.e. "plugins"). +""" + import os import os.path import sys diff --git a/libbe/properties.py b/libbe/properties.py index 144220b..09dd20e 100644 --- a/libbe/properties.py +++ b/libbe/properties.py @@ -160,10 +160,10 @@ def _get_cached_mutable_property(self, cacher_name, property_name, default=None) if (cacher_name, property_name) not in self._mutable_property_cache_copy: return default return self._mutable_property_cache_copy[(cacher_name, property_name)] -def _cmp_cached_mutable_property(self, cacher_name, property_name, value): +def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None): _init_mutable_property_cache(self) if (cacher_name, property_name) not in self._mutable_property_cache_hash: - return 1 # any value > non-existant old hash + _set_cached_mutable_property(self, cacher_name, property_name, default) old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)] return cmp(_hash_mutable_value(value), old_hash) @@ -327,7 +327,7 @@ def primed_property(primer, initVal=None): return funcs return decorator -def change_hook_property(hook, mutable=False): +def change_hook_property(hook, mutable=False, default=None): """ Call the function hook(instance, old_value, new_value) whenever a value different from the current value is set (instance is a a @@ -359,9 +359,9 @@ def change_hook_property(hook, mutable=False): value = new_value # compare new value with cached else: value = fget(self) # compare current value with cached - if _cmp_cached_mutable_property(self, "change hook property", name, value) != 0: + if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0: # there has been a change, cache new value - old_value = _get_cached_mutable_property(self, "change hook property", name) + old_value = _get_cached_mutable_property(self, "change hook property", name, default) _set_cached_mutable_property(self, "change hook property", name, value) if from_fset == True: # return previously cached value value = old_value diff --git a/libbe/rcs.py b/libbe/rcs.py index 294b8e0..e69de29 100644 --- a/libbe/rcs.py +++ b/libbe/rcs.py @@ -1,876 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Alexander Belchenko <bialix@ukr.net> -# Ben Finney <ben+python@benfinney.id.au> -# Chris Ball <cjb@laptop.org> -# W. Trevor King <wking@drexel.edu> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -from subprocess import Popen, PIPE -import codecs -import os -import os.path -import re -from socket import gethostname -import shutil -import sys -import tempfile -import unittest -import doctest - -from utility import Dir, search_parent_directories - - -def _get_matching_rcs(matchfn): - """Return the first module for which matchfn(RCS_instance) is true""" - import arch - import bzr - import darcs - import git - import hg - for module in [arch, bzr, darcs, git, hg]: - rcs = module.new() - if matchfn(rcs) == True: - return rcs - del(rcs) - return RCS() - -def rcs_by_name(rcs_name): - """Return the module for the RCS with the given name""" - return _get_matching_rcs(lambda rcs: rcs.name == rcs_name) - -def detect_rcs(dir): - """Return an RCS instance for the rcs being used in this directory""" - return _get_matching_rcs(lambda rcs: rcs.detect(dir)) - -def installed_rcs(): - """Return an instance of an installed RCS""" - return _get_matching_rcs(lambda rcs: rcs.installed()) - - -class CommandError(Exception): - def __init__(self, command, status, err_str): - strerror = ["Command failed (%d):\n %s\n" % (status, err_str), - "while executing\n %s" % command] - Exception.__init__(self, "\n".join(strerror)) - self.command = command - self.status = status - self.err_str = err_str - -class SettingIDnotSupported(NotImplementedError): - pass - -class RCSnotRooted(Exception): - def __init__(self): - msg = "RCS not rooted" - Exception.__init__(self, msg) - -class PathNotInRoot(Exception): - def __init__(self, path, root): - msg = "Path '%s' not in root '%s'" % (path, root) - Exception.__init__(self, msg) - self.path = path - self.root = root - -class NoSuchFile(Exception): - def __init__(self, pathname, root="."): - path = os.path.abspath(os.path.join(root, pathname)) - Exception.__init__(self, "No such file: %s" % path) - -class EmptyCommit(Exception): - def __init__(self): - Exception.__init__(self, "No changes to commit") - - -def new(): - return RCS() - -class RCS(object): - """ - This class implements a 'no-rcs' interface. - - Support for other RCSs can be added by subclassing this class, and - overriding methods _rcs_*() with code appropriate for your RCS. - - The methods _u_*() are utility methods available to the _rcs_*() - methods. - """ - name = "None" - client = "" # command-line tool for _u_invoke_client - versioned = False - def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()): - self.paranoid = paranoid - self.verboseInvoke = False - self.rootdir = None - self._duplicateBasedir = None - self._duplicateDirname = None - self.encoding = encoding - def __del__(self): - self.cleanup() - - def _rcs_help(self): - """ - Return the command help string. - (Allows a simple test to see if the client is installed.) - """ - pass - def _rcs_detect(self, path=None): - """ - Detect whether a directory is revision controlled with this RCS. - """ - return True - def _rcs_root(self, path): - """ - Get the RCS root. This is the default working directory for - future invocations. You would normally set this to the root - directory for your RCS. - """ - if os.path.isdir(path)==False: - path = os.path.dirname(path) - if path == "": - path = os.path.abspath(".") - return path - def _rcs_init(self, path): - """ - Begin versioning the tree based at path. - """ - pass - def _rcs_cleanup(self): - """ - Remove any cruft that _rcs_init() created outside of the - versioned tree. - """ - pass - def _rcs_get_user_id(self): - """ - Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). - If the RCS has not been configured with a username, return None. - """ - return None - def _rcs_set_user_id(self, value): - """ - Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). - This is run if the RCS has not been configured with a usename, so - that commits will have a reasonable FROM value. - """ - raise SettingIDnotSupported - def _rcs_add(self, path): - """ - Add the already created file at path to version control. - """ - pass - def _rcs_remove(self, path): - """ - Remove the file at path from version control. Optionally - remove the file from the filesystem as well. - """ - pass - def _rcs_update(self, path): - """ - Notify the versioning system of changes to the versioned file - at path. - """ - pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): - """ - Get the file contents as they were in a given revision. - Revision==None specifies the current revision. - """ - assert revision == None, \ - "The %s RCS does not support revision specifiers" % self.name - if binary == False: - f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding) - else: - f = open(os.path.join(self.rootdir, path), "rb") - contents = f.read() - f.close() - return contents - def _rcs_duplicate_repo(self, directory, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - dir specifies a directory to create the duplicate in. - """ - shutil.copytree(self.rootdir, directory, True) - def _rcs_commit(self, commitfile, allow_empty=False): - """ - Commit the current working directory, using the contents of - commitfile as the comment. Return the name of the old - revision (or None if commits are not supported). - - If allow_empty == False, raise EmptyCommit if there are no - changes to commit. - """ - return None - def installed(self): - try: - self._rcs_help() - return True - except OSError, e: - if e.errno == errno.ENOENT: - return False - except CommandError: - return False - def detect(self, path="."): - """ - Detect whether a directory is revision controlled with this RCS. - """ - return self._rcs_detect(path) - def root(self, path): - """ - Set the root directory to the path's RCS root. This is the - default working directory for future invocations. - """ - self.rootdir = self._rcs_root(path) - def init(self, path): - """ - Begin versioning the tree based at path. - Also roots the rcs at path. - """ - if os.path.isdir(path)==False: - path = os.path.dirname(path) - self._rcs_init(path) - self.root(path) - def cleanup(self): - self._rcs_cleanup() - def get_user_id(self): - """ - Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). - If the RCS has not been configured with a username, return the user's - id. You can override the automatic lookup procedure by setting the - RCS.user_id attribute to a string of your choice. - """ - if hasattr(self, "user_id"): - if self.user_id != None: - return self.user_id - id = self._rcs_get_user_id() - if id == None: - name = self._u_get_fallback_username() - email = self._u_get_fallback_email() - id = self._u_create_id(name, email) - print >> sys.stderr, "Guessing id '%s'" % id - try: - self.set_user_id(id) - except SettingIDnotSupported: - pass - return id - def set_user_id(self, value): - """ - Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). - This is run if the RCS has not been configured with a usename, so - that commits will have a reasonable FROM value. - """ - self._rcs_set_user_id(value) - def add(self, path): - """ - Add the already created file at path to version control. - """ - self._rcs_add(self._u_rel_path(path)) - def remove(self, path): - """ - Remove a file from both version control and the filesystem. - """ - self._rcs_remove(self._u_rel_path(path)) - if os.path.exists(path): - os.remove(path) - def recursive_remove(self, dirname): - """ - Remove a file/directory and all its decendents from both - version control and the filesystem. - """ - if not os.path.exists(dirname): - raise NoSuchFile(dirname) - for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): - filenames.extend(dirnames) - for path in filenames: - fullpath = os.path.join(dirpath, path) - if os.path.exists(fullpath) == False: - continue - self._rcs_remove(self._u_rel_path(fullpath)) - if os.path.exists(dirname): - shutil.rmtree(dirname) - def update(self, path): - """ - Notify the versioning system of changes to the versioned file - at path. - """ - self._rcs_update(self._u_rel_path(path)) - def get_file_contents(self, path, revision=None, allow_no_rcs=False, binary=False): - """ - Get the file as it was in a given revision. - Revision==None specifies the current revision. - """ - if not os.path.exists(path): - raise NoSuchFile(path) - if self._use_rcs(path, allow_no_rcs): - relpath = self._u_rel_path(path) - contents = self._rcs_get_file_contents(relpath,revision,binary=binary) - else: - f = codecs.open(path, "r", self.encoding) - contents = f.read() - f.close() - return contents - def set_file_contents(self, path, contents, allow_no_rcs=False, binary=False): - """ - Set the file contents under version control. - """ - add = not os.path.exists(path) - if binary == False: - f = codecs.open(path, "w", self.encoding) - else: - f = open(path, "wb") - f.write(contents) - f.close() - - if self._use_rcs(path, allow_no_rcs): - if add: - self.add(path) - else: - self.update(path) - def mkdir(self, path, allow_no_rcs=False, check_parents=True): - """ - Create (if neccessary) a directory at path under version - control. - """ - if check_parents == True: - parent = os.path.dirname(path) - if not os.path.exists(parent): # recurse through parents - self.mkdir(parent, allow_no_rcs, check_parents) - if not os.path.exists(path): - os.mkdir(path) - if self._use_rcs(path, allow_no_rcs): - self.add(path) - else: - assert os.path.isdir(path) - if self._use_rcs(path, allow_no_rcs): - #self.update(path)# Don't update directories. Changing files - pass # underneath them should be sufficient. - - def duplicate_repo(self, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - Return the path to the arbitrary directory at the base of the new repo. - """ - # Dirname in Baseir to protect against simlink attacks. - if self._duplicateBasedir == None: - self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs') - self._duplicateDirname = \ - os.path.join(self._duplicateBasedir, "duplicate") - self._rcs_duplicate_repo(directory=self._duplicateDirname, - revision=revision) - return self._duplicateDirname - def remove_duplicate_repo(self): - """ - Clean up a duplicate repo created with duplicate_repo(). - """ - if self._duplicateBasedir != None: - shutil.rmtree(self._duplicateBasedir) - self._duplicateBasedir = None - self._duplicateDirname = None - def commit(self, summary, body=None, allow_empty=False): - """ - Commit the current working directory, with a commit message - string summary and body. Return the name of the old revision - (or None if versioning is not supported). - - If allow_empty == False (the default), raise EmptyCommit if - there are no changes to commit. - """ - summary = summary.strip()+'\n' - if body is not None: - summary += '\n' + body.strip() + '\n' - descriptor, filename = tempfile.mkstemp() - revision = None - try: - temp_file = os.fdopen(descriptor, 'wb') - temp_file.write(summary) - temp_file.flush() - self.precommit() - revision = self._rcs_commit(filename, allow_empty=allow_empty) - temp_file.close() - self.postcommit() - finally: - os.remove(filename) - return revision - def precommit(self): - """ - Executed before all attempted commits. - """ - pass - def postcommit(self): - """ - Only executed after successful commits. - """ - pass - def _u_any_in_string(self, list, string): - """ - Return True if any of the strings in list are in string. - Otherwise return False. - """ - for list_string in list: - if list_string in string: - return True - return False - def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None): - """ - expect should be a tuple of allowed exit codes. cwd should be - the directory from which the command will be executed. - """ - if cwd == None: - cwd = self.rootdir - if self.verboseInvoke == True: - print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args)) - try : - if sys.platform != "win32": - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) - else: - # win32 don't have os.execvp() so have to run command in a shell - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, - shell=True, cwd=cwd) - except OSError, e : - raise CommandError(args, e.args[0], e) - output, error = q.communicate(input=stdin) - status = q.wait() - if self.verboseInvoke == True: - print >> sys.stderr, "%d\n%s%s" % (status, output, error) - if status not in expect: - raise CommandError(args, status, error) - return status, output, error - def _u_invoke_client(self, *args, **kwargs): - directory = kwargs.get('directory',None) - expect = kwargs.get('expect', (0,)) - stdin = kwargs.get('stdin', None) - cl_args = [self.client] - cl_args.extend(args) - return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory) - def _u_search_parent_directories(self, path, filename): - """ - Find the file (or directory) named filename in path or in any - of path's parents. - - e.g. - search_parent_directories("/a/b/c", ".be") - will return the path to the first existing file from - /a/b/c/.be - /a/b/.be - /a/.be - /.be - or None if none of those files exist. - """ - return search_parent_directories(path, filename) - def _use_rcs(self, path, allow_no_rcs): - """ - Try and decide if _rcs_add/update/mkdir/etc calls will - succeed. Returns True is we think the rcs_call would - succeeed, and False otherwise. - """ - use_rcs = True - exception = None - if self.rootdir != None: - if self.path_in_root(path) == False: - use_rcs = False - exception = PathNotInRoot(path, self.rootdir) - else: - use_rcs = False - exception = RCSnotRooted - if use_rcs == False and allow_no_rcs==False: - raise exception - return use_rcs - def path_in_root(self, path, root=None): - """ - Return the relative path to path from root. - >>> rcs = new() - >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c") - True - >>> rcs.path_in_root("/a.b/.be", "/a.b/c") - False - """ - if root == None: - if self.rootdir == None: - raise RCSnotRooted - root = self.rootdir - path = os.path.abspath(path) - absRoot = os.path.abspath(root) - absRootSlashedDir = os.path.join(absRoot,"") - if not path.startswith(absRootSlashedDir): - return False - return True - def _u_rel_path(self, path, root=None): - """ - Return the relative path to path from root. - >>> rcs = new() - >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c") - '.be' - """ - if root == None: - if self.rootdir == None: - raise RCSnotRooted - root = self.rootdir - path = os.path.abspath(path) - absRoot = os.path.abspath(root) - absRootSlashedDir = os.path.join(absRoot,"") - if not path.startswith(absRootSlashedDir): - raise PathNotInRoot(path, absRootSlashedDir) - assert path != absRootSlashedDir, \ - "file %s == root directory %s" % (path, absRootSlashedDir) - relpath = path[len(absRootSlashedDir):] - return relpath - def _u_abspath(self, path, root=None): - """ - Return the absolute path from a path realtive to root. - >>> rcs = new() - >>> rcs._u_abspath(".be", "/a.b/c") - '/a.b/c/.be' - """ - if root == None: - assert self.rootdir != None, "RCS not rooted" - root = self.rootdir - return os.path.abspath(os.path.join(root, path)) - def _u_create_id(self, name, email=None): - """ - >>> rcs = new() - >>> rcs._u_create_id("John Doe", "jdoe@example.com") - 'John Doe <jdoe@example.com>' - >>> rcs._u_create_id("John Doe") - 'John Doe' - """ - assert len(name) > 0 - if email == None or len(email) == 0: - return name - else: - return "%s <%s>" % (name, email) - def _u_parse_id(self, value): - """ - >>> rcs = new() - >>> rcs._u_parse_id("John Doe <jdoe@example.com>") - ('John Doe', 'jdoe@example.com') - >>> rcs._u_parse_id("John Doe") - ('John Doe', None) - >>> try: - ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>") - ... except AssertionError: - ... print "Invalid match" - Invalid match - """ - emailexp = re.compile("(.*) <([^>]*)>(.*)") - match = emailexp.search(value) - if match == None: - email = None - name = value - else: - assert len(match.groups()) == 3 - assert match.groups()[2] == "", match.groups() - email = match.groups()[1] - name = match.groups()[0] - assert name != None - assert len(name) > 0 - return (name, email) - def _u_get_fallback_username(self): - name = None - for envariable in ["LOGNAME", "USERNAME"]: - if os.environ.has_key(envariable): - name = os.environ[envariable] - break - assert name != None - return name - def _u_get_fallback_email(self): - hostname = gethostname() - name = self._u_get_fallback_username() - return "%s@%s" % (name, hostname) - def _u_parse_commitfile(self, commitfile): - """ - Split the commitfile created in self.commit() back into - summary and header lines. - """ - f = codecs.open(commitfile, "r", self.encoding) - summary = f.readline() - body = f.read() - body.lstrip('\n') - if len(body) == 0: - body = None - f.close() - return (summary, body) - - -def setup_rcs_test_fixtures(testcase): - """Set up test fixtures for RCS test case.""" - testcase.rcs = testcase.Class() - testcase.dir = Dir() - testcase.dirname = testcase.dir.path - - rcs_not_supporting_uninitialized_user_id = [] - rcs_not_supporting_set_user_id = ["None", "hg"] - testcase.rcs_supports_uninitialized_user_id = ( - testcase.rcs.name not in rcs_not_supporting_uninitialized_user_id) - testcase.rcs_supports_set_user_id = ( - testcase.rcs.name not in rcs_not_supporting_set_user_id) - - if not testcase.rcs.installed(): - testcase.fail( - "%(name)s RCS not found" % vars(testcase.Class)) - - if testcase.Class.name != "None": - testcase.failIf( - testcase.rcs.detect(testcase.dirname), - "Detected %(name)s RCS before initialising" - % vars(testcase.Class)) - - testcase.rcs.init(testcase.dirname) - - -class RCSTestCase(unittest.TestCase): - """Test cases for base RCS class.""" - - Class = RCS - - def __init__(self, *args, **kwargs): - super(RCSTestCase, self).__init__(*args, **kwargs) - self.dirname = None - - def setUp(self): - super(RCSTestCase, self).setUp() - setup_rcs_test_fixtures(self) - - def tearDown(self): - del(self.rcs) - super(RCSTestCase, self).tearDown() - - def full_path(self, rel_path): - return os.path.join(self.dirname, rel_path) - - -class RCS_init_TestCase(RCSTestCase): - """Test cases for RCS.init method.""" - - def test_detect_should_succeed_after_init(self): - """Should detect RCS in directory after initialization.""" - self.failUnless( - self.rcs.detect(self.dirname), - "Did not detect %(name)s RCS after initialising" - % vars(self.Class)) - - def test_rcs_rootdir_in_specified_root_path(self): - """RCS root directory should be in specified root path.""" - rp = os.path.realpath(self.rcs.rootdir) - dp = os.path.realpath(self.dirname) - rcs_name = self.Class.name - self.failUnless( - dp == rp or rp == None, - "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars()) - - -class RCS_get_user_id_TestCase(RCSTestCase): - """Test cases for RCS.get_user_id method.""" - - def test_gets_existing_user_id(self): - """Should get the existing user ID.""" - if not self.rcs_supports_uninitialized_user_id: - return - - user_id = self.rcs.get_user_id() - self.failUnless( - user_id is not None, - "unable to get a user id") - - -class RCS_set_user_id_TestCase(RCSTestCase): - """Test cases for RCS.set_user_id method.""" - - def setUp(self): - super(RCS_set_user_id_TestCase, self).setUp() - - if self.rcs_supports_uninitialized_user_id: - self.prev_user_id = self.rcs.get_user_id() - else: - self.prev_user_id = "Uninitialized identity <bogus@example.org>" - - if self.rcs_supports_set_user_id: - self.test_new_user_id = "John Doe <jdoe@example.com>" - self.rcs.set_user_id(self.test_new_user_id) - - def tearDown(self): - if self.rcs_supports_set_user_id: - self.rcs.set_user_id(self.prev_user_id) - super(RCS_set_user_id_TestCase, self).tearDown() - - def test_raises_error_in_unsupported_vcs(self): - """Should raise an error in a VCS that doesn't support it.""" - if self.rcs_supports_set_user_id: - return - self.assertRaises( - SettingIDnotSupported, - self.rcs.set_user_id, "foo") - - def test_updates_user_id_in_supporting_rcs(self): - """Should update the user ID in an RCS that supports it.""" - if not self.rcs_supports_set_user_id: - return - user_id = self.rcs.get_user_id() - self.failUnlessEqual( - self.test_new_user_id, user_id, - "user id not set correctly (expected %s, got %s)" - % (self.test_new_user_id, user_id)) - - -def setup_rcs_revision_test_fixtures(testcase): - """Set up revision test fixtures for RCS test case.""" - testcase.test_dirs = ['a', 'a/b', 'c'] - for path in testcase.test_dirs: - testcase.rcs.mkdir(testcase.full_path(path)) - - testcase.test_files = ['a/text', 'a/b/text'] - - testcase.test_contents = { - 'rev_1': "Lorem ipsum", - 'uncommitted': "dolor sit amet", - } - - -class RCS_mkdir_TestCase(RCSTestCase): - """Test cases for RCS.mkdir method.""" - - def setUp(self): - super(RCS_mkdir_TestCase, self).setUp() - setup_rcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.rcs.recursive_remove(self.full_path(path)) - super(RCS_mkdir_TestCase, self).tearDown() - - def test_mkdir_creates_directory(self): - """Should create specified directory in filesystem.""" - for path in self.test_dirs: - full_path = self.full_path(path) - self.failUnless( - os.path.exists(full_path), - "path %(full_path)s does not exist" % vars()) - - -class RCS_commit_TestCase(RCSTestCase): - """Test cases for RCS.commit method.""" - - def setUp(self): - super(RCS_commit_TestCase, self).setUp() - setup_rcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.rcs.recursive_remove(self.full_path(path)) - super(RCS_commit_TestCase, self).tearDown() - - def test_file_contents_as_specified(self): - """Should set file contents as specified.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents(full_path, test_contents) - current_contents = self.rcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_committed(self): - """Should have file contents as specified after commit.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents(full_path, test_contents) - revision = self.rcs.commit("Initial file contents.") - current_contents = self.rcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_set_when_uncommitted(self): - """Should set file contents as specified after commit.""" - if not self.rcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.rcs.commit("Initial file contents.") - self.rcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - current_contents = self.rcs.get_file_contents(full_path) - self.failUnlessEqual( - self.test_contents['uncommitted'], current_contents) - - def test_revision_file_contents_as_committed(self): - """Should get file contents as committed to specified revision.""" - if not self.rcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.rcs.commit("Initial file contents.") - self.rcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - committed_contents = self.rcs.get_file_contents( - full_path, revision) - self.failUnlessEqual( - self.test_contents['rev_1'], committed_contents) - - -class RCS_duplicate_repo_TestCase(RCSTestCase): - """Test cases for RCS.duplicate_repo method.""" - - def setUp(self): - super(RCS_duplicate_repo_TestCase, self).setUp() - setup_rcs_revision_test_fixtures(self) - - def tearDown(self): - self.rcs.remove_duplicate_repo() - for path in reversed(sorted(self.test_dirs)): - self.rcs.recursive_remove(self.full_path(path)) - super(RCS_duplicate_repo_TestCase, self).tearDown() - - def test_revision_file_contents_as_committed(self): - """Should match file contents as committed to specified revision.""" - if not self.rcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.rcs.commit("Commit current status") - self.rcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - dup_repo_path = self.rcs.duplicate_repo(revision) - dup_file_path = os.path.join(dup_repo_path, path) - dup_file_contents = file(dup_file_path, 'rb').read() - self.failUnlessEqual( - self.test_contents['rev_1'], dup_file_contents) - self.rcs.remove_duplicate_repo() - - -def make_rcs_testcase_subclasses(rcs_class, namespace): - """Make RCSTestCase subclasses for rcs_class in the namespace.""" - rcs_testcase_classes = [ - c for c in ( - ob for ob in globals().values() if isinstance(ob, type)) - if issubclass(c, RCSTestCase)] - - for base_class in rcs_testcase_classes: - testcase_class_name = rcs_class.__name__ + base_class.__name__ - testcase_class_bases = (base_class,) - testcase_class_dict = dict(base_class.__dict__) - testcase_class_dict['Class'] = rcs_class - testcase_class = type( - testcase_class_name, testcase_class_bases, testcase_class_dict) - setattr(namespace, testcase_class_name, testcase_class) - - -unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/settings_object.py b/libbe/settings_object.py index dde247f..ceea9d5 100644 --- a/libbe/settings_object.py +++ b/libbe/settings_object.py @@ -148,7 +148,8 @@ def versioned_property(name, doc, checked = checked_property(allowed=allowed) fulldoc += "\n\nThe allowed values for this property are: %s." \ % (', '.join(allowed)) - hooked = change_hook_property(hook=change_hook, mutable=mutable) + hooked = change_hook_property(hook=change_hook, mutable=mutable, + default=EMPTY) primed = primed_property(primer=primer, initVal=UNPRIMED) settings = settings_property(name=name, null=UNPRIMED) docp = doc_property(doc=fulldoc) @@ -385,30 +386,24 @@ class SavedSettingsObjectTests(unittest.TestCase): self.failUnless(SAVES == [], SAVES) self.failUnless(t._settings_loaded == True, t._settings_loaded) self.failUnless(t.list_type == None, t.list_type) - self.failUnless(SAVES == [ - "'None' -> '<class 'libbe.settings_object.EMPTY'>'" - ], SAVES) + self.failUnless(SAVES == [], SAVES) self.failUnless(t.settings["List-type"]==EMPTY,t.settings["List-type"]) t.list_type = [] self.failUnless(t.settings["List-type"] == [], t.settings["List-type"]) self.failUnless(SAVES == [ - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", "'<class 'libbe.settings_object.EMPTY'>' -> '[]'" ], SAVES) t.list_type.append(5) self.failUnless(SAVES == [ - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", ], SAVES) self.failUnless(t.settings["List-type"] == [5],t.settings["List-type"]) self.failUnless(SAVES == [ # the append(5) has not yet been saved - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", ], SAVES) self.failUnless(t.list_type == [5], t.list_type) # <-get triggers saved self.failUnless(SAVES == [ # now the append(5) has been saved. - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", "'[]' -> '[5]'" ], SAVES) diff --git a/libbe/tree.py b/libbe/tree.py index 45ae085..06d09e5 100644 --- a/libbe/tree.py +++ b/libbe/tree.py @@ -15,6 +15,10 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Define a traversable tree structure. +""" + import doctest class Tree(list): diff --git a/libbe/upgrade.py b/libbe/upgrade.py new file mode 100644 index 0000000..4123c72 --- /dev/null +++ b/libbe/upgrade.py @@ -0,0 +1,187 @@ +# Copyright (C) 2009 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Handle conversion between the various on-disk images. +""" + +import os, os.path +import sys +import doctest + +import encoding +import mapfile +import vcs + +# a list of all past versions +BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0", + "Bugs Everywhere Directory v1.1", + "Bugs Everywhere Directory v1.2"] + +# the current version +BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1] + +class Upgrader (object): + "Class for converting " + initial_version = None + final_version = None + def __init__(self, root): + self.root = root + # use the "None" VCS to ensure proper encoding/decoding and + # simplify path construction. + self.vcs = vcs.vcs_by_name("None") + self.vcs.root(self.root) + self.vcs.encoding = encoding.get_encoding() + + def get_path(self, *args): + """ + Return a path relative to .root. + """ + dir = os.path.join(self.root, ".be") + if len(args) == 0: + return dir + assert args[0] in ["version", "settings", "bugs"], str(args) + return os.path.join(dir, *args) + + def check_initial_version(self): + path = self.get_path("version") + version = self.vcs.get_file_contents(path).rstrip("\n") + assert version == self.initial_version, version + + def set_version(self): + path = self.get_path("version") + self.vcs.set_file_contents(path, self.final_version+"\n") + + def upgrade(self): + print >> sys.stderr, "upgrading bugdir from '%s' to '%s'" \ + % (self.initial_version, self.final_version) + self.check_initial_version() + self.set_version() + self._upgrade() + + def _upgrade(self): + raise NotImplementedError + + +class Upgrade_1_0_to_1_1 (Upgrader): + initial_version = "Bugs Everywhere Tree 1 0" + final_version = "Bugs Everywhere Directory v1.1" + def _upgrade_mapfile(self, path): + contents = self.vcs.get_file_contents(path) + old_format = False + for line in contents.splitlines(): + if len(line.split("=")) == 2: + old_format = True + break + if old_format == True: + # translate to YAML. + newlines = [] + for line in contents.splitlines(): + line = line.rstrip('\n') + if len(line) == 0: + continue + fields = line.split("=") + if len(fields) == 2: + key,value = fields + newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) + else: + newlines.append(line) + contents = '\n'.join(newlines) + # load the YAML and save + map = mapfile.parse(contents) + mapfile.map_save(self.vcs, path, map) + + def _upgrade(self): + """ + Comment value field "From" -> "Author". + Homegrown mapfile -> YAML. + """ + path = self.get_path("settings") + self._upgrade_mapfile(path) + for bug_uuid in os.listdir(self.get_path("bugs")): + path = self.get_path("bugs", bug_uuid, "values") + self._upgrade_mapfile(path) + c_path = ["bugs", bug_uuid, "comments"] + if not os.path.exists(self.get_path(*c_path)): + continue # no comments for this bug + for comment_uuid in os.listdir(self.get_path(*c_path)): + path_list = c_path + [comment_uuid, "values"] + path = self.get_path(*path_list) + self._upgrade_mapfile(path) + settings = mapfile.map_load(self.vcs, path) + if "From" in settings: + settings["Author"] = settings.pop("From") + mapfile.map_save(self.vcs, path, settings) + + +class Upgrade_1_1_to_1_2 (Upgrader): + initial_version = "Bugs Everywhere Directory v1.1" + final_version = "Bugs Everywhere Directory v1.2" + def _upgrade(self): + """ + BugDir settings field "rcs_name" -> "vcs_name". + """ + path = self.get_path("settings") + settings = mapfile.map_load(self.vcs, path) + if "rcs_name" in settings: + settings["vcs_name"] = settings.pop("rcs_name") + mapfile.map_save(self.vcs, path, settings) + + +upgraders = [Upgrade_1_0_to_1_1, + Upgrade_1_1_to_1_2] +upgrade_classes = {} +for upgrader in upgraders: + upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader + +def upgrade(path, current_version, + target_version=BUGDIR_DISK_VERSION): + """ + Call the appropriate upgrade function to convert current_version + to target_version. If a direct conversion function does not exist, + use consecutive conversion functions. + """ + if current_version not in BUGDIR_DISK_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % version + if target_version not in BUGDIR_DISK_VERSIONS: + raise NotImplementedError, \ + "Cannot handle version '%s' yet." % version + + if (current_version, target_version) in upgrade_classes: + # direct conversion + upgrade_class = upgrade_classes[(current_version, target_version)] + u = upgrade_class(path) + u.upgrade() + else: + # consecutive single-step conversion + i = BUGDIR_DISK_VERSIONS.index(current_version) + while True: + version_a = BUGDIR_DISK_VERSIONS[i] + version_b = BUGDIR_DISK_VERSIONS[i+1] + try: + upgrade_class = upgrade_classes[(version_a, version_b)] + except KeyError: + raise NotImplementedError, \ + "Cannot convert version '%s' to '%s' yet." \ + % (version_a, version_b) + u = upgrade_class(path) + u.upgrade() + if version_b == target_version: + break + i += 1 + +suite = doctest.DocTestSuite() diff --git a/libbe/utility.py b/libbe/utility.py index 3df06b4..aafbf8d 100644 --- a/libbe/utility.py +++ b/libbe/utility.py @@ -14,6 +14,11 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Assorted utility functions that don't fit in anywhere else. +""" + import calendar import codecs import os diff --git a/libbe/vcs.py b/libbe/vcs.py new file mode 100644 index 0000000..a1d3022 --- /dev/null +++ b/libbe/vcs.py @@ -0,0 +1,938 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Alexander Belchenko <bialix@ukr.net> +# Ben Finney <ben+python@benfinney.id.au> +# Chris Ball <cjb@laptop.org> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define the base VCS (Version Control System) class, which should be +subclassed by other Version Control System backends. The base class +implements a "do not version" VCS. +""" + +from subprocess import Popen, PIPE +import codecs +import os +import os.path +import re +from socket import gethostname +import shutil +import sys +import tempfile +import unittest +import doctest + +from utility import Dir, search_parent_directories + + +def _get_matching_vcs(matchfn): + """Return the first module for which matchfn(VCS_instance) is true""" + import arch + import bzr + import darcs + import git + import hg + for module in [arch, bzr, darcs, git, hg]: + vcs = module.new() + if matchfn(vcs) == True: + return vcs + del(vcs) + return VCS() + +def vcs_by_name(vcs_name): + """Return the module for the VCS with the given name""" + return _get_matching_vcs(lambda vcs: vcs.name == vcs_name) + +def detect_vcs(dir): + """Return an VCS instance for the vcs being used in this directory""" + return _get_matching_vcs(lambda vcs: vcs.detect(dir)) + +def installed_vcs(): + """Return an instance of an installed VCS""" + return _get_matching_vcs(lambda vcs: vcs.installed()) + + +class CommandError(Exception): + def __init__(self, command, status, stdout, stderr): + strerror = ["Command failed (%d):\n %s\n" % (status, stderr), + "while executing\n %s" % command] + Exception.__init__(self, "\n".join(strerror)) + self.command = command + self.status = status + self.stdout = stdout + self.stderr = stderr + +class SettingIDnotSupported(NotImplementedError): + pass + +class VCSnotRooted(Exception): + def __init__(self): + msg = "VCS not rooted" + Exception.__init__(self, msg) + +class PathNotInRoot(Exception): + def __init__(self, path, root): + msg = "Path '%s' not in root '%s'" % (path, root) + Exception.__init__(self, msg) + self.path = path + self.root = root + +class NoSuchFile(Exception): + def __init__(self, pathname, root="."): + path = os.path.abspath(os.path.join(root, pathname)) + Exception.__init__(self, "No such file: %s" % path) + +class EmptyCommit(Exception): + def __init__(self): + Exception.__init__(self, "No changes to commit") + + +def new(): + return VCS() + +class VCS(object): + """ + This class implements a 'no-vcs' interface. + + Support for other VCSs can be added by subclassing this class, and + overriding methods _vcs_*() with code appropriate for your VCS. + + The methods _u_*() are utility methods available to the _vcs_*() + methods. + """ + name = "None" + client = "" # command-line tool for _u_invoke_client + versioned = False + def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()): + self.paranoid = paranoid + self.verboseInvoke = False + self.rootdir = None + self._duplicateBasedir = None + self._duplicateDirname = None + self.encoding = encoding + def __del__(self): + self.cleanup() + + def _vcs_help(self): + """ + Return the command help string. + (Allows a simple test to see if the client is installed.) + """ + pass + def _vcs_detect(self, path=None): + """ + Detect whether a directory is revision controlled with this VCS. + """ + return True + def _vcs_root(self, path): + """ + Get the VCS root. This is the default working directory for + future invocations. You would normally set this to the root + directory for your VCS. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + if path == "": + path = os.path.abspath(".") + return path + def _vcs_init(self, path): + """ + Begin versioning the tree based at path. + """ + pass + def _vcs_cleanup(self): + """ + Remove any cruft that _vcs_init() created outside of the + versioned tree. + """ + pass + def _vcs_get_user_id(self): + """ + Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the VCS has not been configured with a username, return None. + """ + return None + def _vcs_set_user_id(self, value): + """ + Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>"). + This is run if the VCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + raise SettingIDnotSupported + def _vcs_add(self, path): + """ + Add the already created file at path to version control. + """ + pass + def _vcs_remove(self, path): + """ + Remove the file at path from version control. Optionally + remove the file from the filesystem as well. + """ + pass + def _vcs_update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + pass + def _vcs_get_file_contents(self, path, revision=None, binary=False): + """ + Get the file contents as they were in a given revision. + Revision==None specifies the current revision. + """ + assert revision == None, \ + "The %s VCS does not support revision specifiers" % self.name + if binary == False: + f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding) + else: + f = open(os.path.join(self.rootdir, path), "rb") + contents = f.read() + f.close() + return contents + def _vcs_duplicate_repo(self, directory, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + dir specifies a directory to create the duplicate in. + """ + shutil.copytree(self.rootdir, directory, True) + def _vcs_commit(self, commitfile, allow_empty=False): + """ + Commit the current working directory, using the contents of + commitfile as the comment. Return the name of the old + revision (or None if commits are not supported). + + If allow_empty == False, raise EmptyCommit if there are no + changes to commit. + """ + return None + def _vcs_revision_id(self, index): + """ + Return the name of the <index>th revision. Index will be an + integer (possibly <= 0). The choice of which branch to follow + when crossing branches/merges is not defined. + + Return None if revision IDs are not supported, or if the + specified revision does not exist. + """ + return None + def installed(self): + try: + self._vcs_help() + return True + except OSError, e: + if e.errno == errno.ENOENT: + return False + except CommandError: + return False + def detect(self, path="."): + """ + Detect whether a directory is revision controlled with this VCS. + """ + return self._vcs_detect(path) + def root(self, path): + """ + Set the root directory to the path's VCS root. This is the + default working directory for future invocations. + """ + self.rootdir = self._vcs_root(path) + def init(self, path): + """ + Begin versioning the tree based at path. + Also roots the vcs at path. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + self._vcs_init(path) + self.root(path) + def cleanup(self): + self._vcs_cleanup() + def get_user_id(self): + """ + Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the VCS has not been configured with a username, return the user's + id. You can override the automatic lookup procedure by setting the + VCS.user_id attribute to a string of your choice. + """ + if hasattr(self, "user_id"): + if self.user_id != None: + return self.user_id + id = self._vcs_get_user_id() + if id == None: + name = self._u_get_fallback_username() + email = self._u_get_fallback_email() + id = self._u_create_id(name, email) + print >> sys.stderr, "Guessing id '%s'" % id + try: + self.set_user_id(id) + except SettingIDnotSupported: + pass + return id + def set_user_id(self, value): + """ + Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>"). + This is run if the VCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + self._vcs_set_user_id(value) + def add(self, path): + """ + Add the already created file at path to version control. + """ + self._vcs_add(self._u_rel_path(path)) + def remove(self, path): + """ + Remove a file from both version control and the filesystem. + """ + self._vcs_remove(self._u_rel_path(path)) + if os.path.exists(path): + os.remove(path) + def recursive_remove(self, dirname): + """ + Remove a file/directory and all its decendents from both + version control and the filesystem. + """ + if not os.path.exists(dirname): + raise NoSuchFile(dirname) + for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): + filenames.extend(dirnames) + for path in filenames: + fullpath = os.path.join(dirpath, path) + if os.path.exists(fullpath) == False: + continue + self._vcs_remove(self._u_rel_path(fullpath)) + if os.path.exists(dirname): + shutil.rmtree(dirname) + def update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + self._vcs_update(self._u_rel_path(path)) + def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False): + """ + Get the file as it was in a given revision. + Revision==None specifies the current revision. + """ + if not os.path.exists(path): + raise NoSuchFile(path) + if self._use_vcs(path, allow_no_vcs): + relpath = self._u_rel_path(path) + contents = self._vcs_get_file_contents(relpath,revision,binary=binary) + else: + f = codecs.open(path, "r", self.encoding) + contents = f.read() + f.close() + return contents + def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False): + """ + Set the file contents under version control. + """ + add = not os.path.exists(path) + if binary == False: + f = codecs.open(path, "w", self.encoding) + else: + f = open(path, "wb") + f.write(contents) + f.close() + + if self._use_vcs(path, allow_no_vcs): + if add: + self.add(path) + else: + self.update(path) + def mkdir(self, path, allow_no_vcs=False, check_parents=True): + """ + Create (if neccessary) a directory at path under version + control. + """ + if check_parents == True: + parent = os.path.dirname(path) + if not os.path.exists(parent): # recurse through parents + self.mkdir(parent, allow_no_vcs, check_parents) + if not os.path.exists(path): + os.mkdir(path) + if self._use_vcs(path, allow_no_vcs): + self.add(path) + else: + assert os.path.isdir(path) + if self._use_vcs(path, allow_no_vcs): + #self.update(path)# Don't update directories. Changing files + pass # underneath them should be sufficient. + + def duplicate_repo(self, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + Return the path to the arbitrary directory at the base of the new repo. + """ + # Dirname in Baseir to protect against simlink attacks. + if self._duplicateBasedir == None: + self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs') + self._duplicateDirname = \ + os.path.join(self._duplicateBasedir, "duplicate") + self._vcs_duplicate_repo(directory=self._duplicateDirname, + revision=revision) + return self._duplicateDirname + def remove_duplicate_repo(self): + """ + Clean up a duplicate repo created with duplicate_repo(). + """ + if self._duplicateBasedir != None: + shutil.rmtree(self._duplicateBasedir) + self._duplicateBasedir = None + self._duplicateDirname = None + def commit(self, summary, body=None, allow_empty=False): + """ + Commit the current working directory, with a commit message + string summary and body. Return the name of the old revision + (or None if versioning is not supported). + + If allow_empty == False (the default), raise EmptyCommit if + there are no changes to commit. + """ + summary = summary.strip()+'\n' + if body is not None: + summary += '\n' + body.strip() + '\n' + descriptor, filename = tempfile.mkstemp() + revision = None + try: + temp_file = os.fdopen(descriptor, 'wb') + temp_file.write(summary) + temp_file.flush() + self.precommit() + revision = self._vcs_commit(filename, allow_empty=allow_empty) + temp_file.close() + self.postcommit() + finally: + os.remove(filename) + return revision + def precommit(self): + """ + Executed before all attempted commits. + """ + pass + def postcommit(self): + """ + Only executed after successful commits. + """ + pass + def revision_id(self, index=None): + """ + Return the name of the <index>th revision. The choice of + which branch to follow when crossing branches/merges is not + defined. + + Return None if index==None, revision IDs are not supported, or + if the specified revision does not exist. + """ + if index == None: + return None + return self._vcs_revision_id(index) + def _u_any_in_string(self, list, string): + """ + Return True if any of the strings in list are in string. + Otherwise return False. + """ + for list_string in list: + if list_string in string: + return True + return False + def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None): + """ + expect should be a tuple of allowed exit codes. cwd should be + the directory from which the command will be executed. + """ + if cwd == None: + cwd = self.rootdir + if self.verboseInvoke == True: + print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args)) + try : + if sys.platform != "win32": + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) + else: + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + shell=True, cwd=cwd) + except OSError, e : + raise CommandError(args, status=e.args[0], stdout="", stderr=e) + output,error = q.communicate(input=stdin) + status = q.wait() + if self.verboseInvoke == True: + print >> sys.stderr, "%d\n%s%s" % (status, output, error) + if status not in expect: + raise CommandError(args, status, output, error) + return status, output, error + def _u_invoke_client(self, *args, **kwargs): + directory = kwargs.get('directory',None) + expect = kwargs.get('expect', (0,)) + stdin = kwargs.get('stdin', None) + cl_args = [self.client] + cl_args.extend(args) + return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory) + def _u_search_parent_directories(self, path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + return search_parent_directories(path, filename) + def _use_vcs(self, path, allow_no_vcs): + """ + Try and decide if _vcs_add/update/mkdir/etc calls will + succeed. Returns True is we think the vcs_call would + succeeed, and False otherwise. + """ + use_vcs = True + exception = None + if self.rootdir != None: + if self.path_in_root(path) == False: + use_vcs = False + exception = PathNotInRoot(path, self.rootdir) + else: + use_vcs = False + exception = VCSnotRooted + if use_vcs == False and allow_no_vcs==False: + raise exception + return use_vcs + def path_in_root(self, path, root=None): + """ + Return the relative path to path from root. + >>> vcs = new() + >>> vcs.path_in_root("/a.b/c/.be", "/a.b/c") + True + >>> vcs.path_in_root("/a.b/.be", "/a.b/c") + False + """ + if root == None: + if self.rootdir == None: + raise VCSnotRooted + root = self.rootdir + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if not path.startswith(absRootSlashedDir): + return False + return True + def _u_rel_path(self, path, root=None): + """ + Return the relative path to path from root. + >>> vcs = new() + >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c") + '.be' + """ + if root == None: + if self.rootdir == None: + raise VCSnotRooted + root = self.rootdir + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if not path.startswith(absRootSlashedDir): + raise PathNotInRoot(path, absRootSlashedDir) + assert path != absRootSlashedDir, \ + "file %s == root directory %s" % (path, absRootSlashedDir) + relpath = path[len(absRootSlashedDir):] + return relpath + def _u_abspath(self, path, root=None): + """ + Return the absolute path from a path realtive to root. + >>> vcs = new() + >>> vcs._u_abspath(".be", "/a.b/c") + '/a.b/c/.be' + """ + if root == None: + assert self.rootdir != None, "VCS not rooted" + root = self.rootdir + return os.path.abspath(os.path.join(root, path)) + def _u_create_id(self, name, email=None): + """ + >>> vcs = new() + >>> vcs._u_create_id("John Doe", "jdoe@example.com") + 'John Doe <jdoe@example.com>' + >>> vcs._u_create_id("John Doe") + 'John Doe' + """ + assert len(name) > 0 + if email == None or len(email) == 0: + return name + else: + return "%s <%s>" % (name, email) + def _u_parse_id(self, value): + """ + >>> vcs = new() + >>> vcs._u_parse_id("John Doe <jdoe@example.com>") + ('John Doe', 'jdoe@example.com') + >>> vcs._u_parse_id("John Doe") + ('John Doe', None) + >>> try: + ... vcs._u_parse_id("John Doe <jdoe@example.com><what?>") + ... except AssertionError: + ... print "Invalid match" + Invalid match + """ + emailexp = re.compile("(.*) <([^>]*)>(.*)") + match = emailexp.search(value) + if match == None: + email = None + name = value + else: + assert len(match.groups()) == 3 + assert match.groups()[2] == "", match.groups() + email = match.groups()[1] + name = match.groups()[0] + assert name != None + assert len(name) > 0 + return (name, email) + def _u_get_fallback_username(self): + name = None + for envariable in ["LOGNAME", "USERNAME"]: + if os.environ.has_key(envariable): + name = os.environ[envariable] + break + assert name != None + return name + def _u_get_fallback_email(self): + hostname = gethostname() + name = self._u_get_fallback_username() + return "%s@%s" % (name, hostname) + def _u_parse_commitfile(self, commitfile): + """ + Split the commitfile created in self.commit() back into + summary and header lines. + """ + f = codecs.open(commitfile, "r", self.encoding) + summary = f.readline() + body = f.read() + body.lstrip('\n') + if len(body) == 0: + body = None + f.close() + return (summary, body) + + +def setup_vcs_test_fixtures(testcase): + """Set up test fixtures for VCS test case.""" + testcase.vcs = testcase.Class() + testcase.dir = Dir() + testcase.dirname = testcase.dir.path + + vcs_not_supporting_uninitialized_user_id = [] + vcs_not_supporting_set_user_id = ["None", "hg"] + testcase.vcs_supports_uninitialized_user_id = ( + testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id) + testcase.vcs_supports_set_user_id = ( + testcase.vcs.name not in vcs_not_supporting_set_user_id) + + if not testcase.vcs.installed(): + testcase.fail( + "%(name)s VCS not found" % vars(testcase.Class)) + + if testcase.Class.name != "None": + testcase.failIf( + testcase.vcs.detect(testcase.dirname), + "Detected %(name)s VCS before initialising" + % vars(testcase.Class)) + + testcase.vcs.init(testcase.dirname) + + +class VCSTestCase(unittest.TestCase): + """Test cases for base VCS class.""" + + Class = VCS + + def __init__(self, *args, **kwargs): + super(VCSTestCase, self).__init__(*args, **kwargs) + self.dirname = None + + def setUp(self): + super(VCSTestCase, self).setUp() + setup_vcs_test_fixtures(self) + + def tearDown(self): + del(self.vcs) + super(VCSTestCase, self).tearDown() + + def full_path(self, rel_path): + return os.path.join(self.dirname, rel_path) + + +class VCS_init_TestCase(VCSTestCase): + """Test cases for VCS.init method.""" + + def test_detect_should_succeed_after_init(self): + """Should detect VCS in directory after initialization.""" + self.failUnless( + self.vcs.detect(self.dirname), + "Did not detect %(name)s VCS after initialising" + % vars(self.Class)) + + def test_vcs_rootdir_in_specified_root_path(self): + """VCS root directory should be in specified root path.""" + rp = os.path.realpath(self.vcs.rootdir) + dp = os.path.realpath(self.dirname) + vcs_name = self.Class.name + self.failUnless( + dp == rp or rp == None, + "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars()) + + +class VCS_get_user_id_TestCase(VCSTestCase): + """Test cases for VCS.get_user_id method.""" + + def test_gets_existing_user_id(self): + """Should get the existing user ID.""" + if not self.vcs_supports_uninitialized_user_id: + return + + user_id = self.vcs.get_user_id() + self.failUnless( + user_id is not None, + "unable to get a user id") + + +class VCS_set_user_id_TestCase(VCSTestCase): + """Test cases for VCS.set_user_id method.""" + + def setUp(self): + super(VCS_set_user_id_TestCase, self).setUp() + + if self.vcs_supports_uninitialized_user_id: + self.prev_user_id = self.vcs.get_user_id() + else: + self.prev_user_id = "Uninitialized identity <bogus@example.org>" + + if self.vcs_supports_set_user_id: + self.test_new_user_id = "John Doe <jdoe@example.com>" + self.vcs.set_user_id(self.test_new_user_id) + + def tearDown(self): + if self.vcs_supports_set_user_id: + self.vcs.set_user_id(self.prev_user_id) + super(VCS_set_user_id_TestCase, self).tearDown() + + def test_raises_error_in_unsupported_vcs(self): + """Should raise an error in a VCS that doesn't support it.""" + if self.vcs_supports_set_user_id: + return + self.assertRaises( + SettingIDnotSupported, + self.vcs.set_user_id, "foo") + + def test_updates_user_id_in_supporting_vcs(self): + """Should update the user ID in an VCS that supports it.""" + if not self.vcs_supports_set_user_id: + return + user_id = self.vcs.get_user_id() + self.failUnlessEqual( + self.test_new_user_id, user_id, + "user id not set correctly (expected %s, got %s)" + % (self.test_new_user_id, user_id)) + + +def setup_vcs_revision_test_fixtures(testcase): + """Set up revision test fixtures for VCS test case.""" + testcase.test_dirs = ['a', 'a/b', 'c'] + for path in testcase.test_dirs: + testcase.vcs.mkdir(testcase.full_path(path)) + + testcase.test_files = ['a/text', 'a/b/text'] + + testcase.test_contents = { + 'rev_1': "Lorem ipsum", + 'uncommitted': "dolor sit amet", + } + + +class VCS_mkdir_TestCase(VCSTestCase): + """Test cases for VCS.mkdir method.""" + + def setUp(self): + super(VCS_mkdir_TestCase, self).setUp() + setup_vcs_revision_test_fixtures(self) + + def tearDown(self): + for path in reversed(sorted(self.test_dirs)): + self.vcs.recursive_remove(self.full_path(path)) + super(VCS_mkdir_TestCase, self).tearDown() + + def test_mkdir_creates_directory(self): + """Should create specified directory in filesystem.""" + for path in self.test_dirs: + full_path = self.full_path(path) + self.failUnless( + os.path.exists(full_path), + "path %(full_path)s does not exist" % vars()) + + +class VCS_commit_TestCase(VCSTestCase): + """Test cases for VCS.commit method.""" + + def setUp(self): + super(VCS_commit_TestCase, self).setUp() + setup_vcs_revision_test_fixtures(self) + + def tearDown(self): + for path in reversed(sorted(self.test_dirs)): + self.vcs.recursive_remove(self.full_path(path)) + super(VCS_commit_TestCase, self).tearDown() + + def test_file_contents_as_specified(self): + """Should set file contents as specified.""" + test_contents = self.test_contents['rev_1'] + for path in self.test_files: + full_path = self.full_path(path) + self.vcs.set_file_contents(full_path, test_contents) + current_contents = self.vcs.get_file_contents(full_path) + self.failUnlessEqual(test_contents, current_contents) + + def test_file_contents_as_committed(self): + """Should have file contents as specified after commit.""" + test_contents = self.test_contents['rev_1'] + for path in self.test_files: + full_path = self.full_path(path) + self.vcs.set_file_contents(full_path, test_contents) + revision = self.vcs.commit("Initial file contents.") + current_contents = self.vcs.get_file_contents(full_path) + self.failUnlessEqual(test_contents, current_contents) + + def test_file_contents_as_set_when_uncommitted(self): + """Should set file contents as specified after commit.""" + if not self.vcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.vcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.vcs.commit("Initial file contents.") + self.vcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + current_contents = self.vcs.get_file_contents(full_path) + self.failUnlessEqual( + self.test_contents['uncommitted'], current_contents) + + def test_revision_file_contents_as_committed(self): + """Should get file contents as committed to specified revision.""" + if not self.vcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.vcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.vcs.commit("Initial file contents.") + self.vcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + committed_contents = self.vcs.get_file_contents( + full_path, revision) + self.failUnlessEqual( + self.test_contents['rev_1'], committed_contents) + + def test_revision_id_as_committed(self): + """Check for compatibility between .commit() and .revision_id()""" + if not self.vcs.versioned: + self.failUnlessEqual(self.vcs.revision_id(5), None) + return + committed_revisions = [] + for path in self.test_files: + full_path = self.full_path(path) + self.vcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.vcs.commit("Initial %s contents." % path) + committed_revisions.append(revision) + self.vcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + revision = self.vcs.commit("Altered %s contents." % path) + committed_revisions.append(revision) + for i,revision in enumerate(committed_revisions): + self.failUnlessEqual(self.vcs.revision_id(i), revision) + i += -len(committed_revisions) # check negative indices + self.failUnlessEqual(self.vcs.revision_id(i), revision) + i = len(committed_revisions) + self.failUnlessEqual(self.vcs.revision_id(i), None) + self.failUnlessEqual(self.vcs.revision_id(-i-1), None) + + def test_revision_id_as_committed(self): + """Check revision id before first commit""" + if not self.vcs.versioned: + self.failUnlessEqual(self.vcs.revision_id(5), None) + return + committed_revisions = [] + for path in self.test_files: + self.failUnlessEqual(self.vcs.revision_id(0), None) + + +class VCS_duplicate_repo_TestCase(VCSTestCase): + """Test cases for VCS.duplicate_repo method.""" + + def setUp(self): + super(VCS_duplicate_repo_TestCase, self).setUp() + setup_vcs_revision_test_fixtures(self) + + def tearDown(self): + self.vcs.remove_duplicate_repo() + for path in reversed(sorted(self.test_dirs)): + self.vcs.recursive_remove(self.full_path(path)) + super(VCS_duplicate_repo_TestCase, self).tearDown() + + def test_revision_file_contents_as_committed(self): + """Should match file contents as committed to specified revision.""" + if not self.vcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.vcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.vcs.commit("Commit current status") + self.vcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + dup_repo_path = self.vcs.duplicate_repo(revision) + dup_file_path = os.path.join(dup_repo_path, path) + dup_file_contents = file(dup_file_path, 'rb').read() + self.failUnlessEqual( + self.test_contents['rev_1'], dup_file_contents) + self.vcs.remove_duplicate_repo() + + +def make_vcs_testcase_subclasses(vcs_class, namespace): + """Make VCSTestCase subclasses for vcs_class in the namespace.""" + vcs_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, VCSTestCase)] + + for base_class in vcs_testcase_classes: + testcase_class_name = vcs_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = vcs_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/version.py b/libbe/version.py new file mode 100644 index 0000000..f8eebbd --- /dev/null +++ b/libbe/version.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# Copyright (C) 2009 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Store version info for this BE installation. By default, use the +bzr-generated information in _version.py, but allow manual overriding +by setting _VERSION. This allows support of both the "I don't want to +be bothered setting version strings" and the "I want complete control +over the version strings" workflows. +""" + +import libbe._version as _version + +# Manually set a version string (optional, defaults to bzr revision id) +#_VERSION = "1.2.3" + +def version(verbose=False): + """ + Returns the version string for this BE installation. If + verbose==True, the string will include extra lines with more + detail (e.g. bzr branch nickname, etc.). + """ + if "_VERSION" in globals(): + string = _VERSION + else: + string = _version.version_info["revision_id"] + if verbose == True: + string += ("\n" + "revision: %(revno)d\n" + "nick: %(branch_nick)s\n" + "revision id: %(revision_id)s" + % _version.version_info) + return string + +if __name__ == "__main__": + print version(verbose=True) |