From 071fef7c351c4fc23696aa6db693411b78da2edb Mon Sep 17 00:00:00 2001 From: Gianluca Montecchi Date: Fri, 2 Oct 2009 00:08:33 +0200 Subject: Fixed the merge --- libbe/arch.py | 295 ++++++++++++++++ libbe/beuuid.py | 61 ++++ libbe/bug.py | 547 +++++++++++++++++++++++++++++ libbe/bugdir.py | 676 ++++++++++++++++++++++++++++++++++++ libbe/bzr.py | 101 ++++++ libbe/cmdutil.py | 218 ++++++++++++ libbe/comment.py | 662 +++++++++++++++++++++++++++++++++++ libbe/config.py | 83 +++++ libbe/darcs.py | 163 +++++++++ libbe/diff.py | 125 +++++++ libbe/editor.py | 101 ++++++ libbe/encoding.py | 51 +++ libbe/git.py | 120 +++++++ libbe/hg.py | 96 ++++++ libbe/mapfile.py | 127 +++++++ libbe/plugin.py | 71 ++++ libbe/properties.py | 638 ++++++++++++++++++++++++++++++++++ libbe/rcs.py | 876 +++++++++++++++++++++++++++++++++++++++++++++++ libbe/settings_object.py | 417 ++++++++++++++++++++++ libbe/tree.py | 179 ++++++++++ libbe/utility.py | 129 +++++++ 21 files changed, 5736 insertions(+) diff --git a/libbe/arch.py b/libbe/arch.py index e69de29..2f45aa9 100644 --- a/libbe/arch.py +++ b/libbe/arch.py @@ -0,0 +1,295 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Ben Finney +# James Rowe +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import codecs +import os +import re +import shutil +import sys +import time +import unittest +import doctest + +import config +from beuuid import uuid_gen +import rcs +from rcs import RCS + +DEFAULT_CLIENT = "tla" + +client = config.get_val("arch_client", default=DEFAULT_CLIENT) + +def new(): + return Arch() + +class Arch(RCS): + name = "Arch" + client = client + versioned = True + _archive_name = None + _archive_dir = None + _tmp_archive = False + _project_name = None + _tmp_project = False + _arch_paramdir = os.path.expanduser("~/.arch-params") + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + """Detect whether a directory is revision-controlled using Arch""" + if self._u_search_parent_directories(path, "{arch}") != None : + config.set_val("arch_client", client) + return True + return False + def _rcs_init(self, path): + self._create_archive(path) + self._create_project(path) + self._add_project_code(path) + def _create_archive(self, path): + # Create a new archive + # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive + assert self._archive_name == None + id = self.get_user_id() + name, email = self._u_parse_id(id) + if email == None: + email = "%s@example.com" % name + trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8]) + self._archive_name = "%s--%s" % (email, trailer) + self._archive_dir = "/tmp/%s" % trailer + self._tmp_archive = True + self._u_invoke_client("make-archive", self._archive_name, + self._archive_dir, directory=path) + def _invoke_client(self, *args, **kwargs): + """ + Invoke the client on our archive. + """ + assert self._archive_name != None + command = args[0] + if len(args) > 1: + tailargs = args[1:] + else: + tailargs = [] + arglist = [command, "-A", self._archive_name] + arglist.extend(tailargs) + args = tuple(arglist) + return self._u_invoke_client(*args, **kwargs) + def _remove_archive(self): + assert self._tmp_archive == True + assert self._archive_dir != None + assert self._archive_name != None + os.remove(os.path.join(self._arch_paramdir, + "=locations", self._archive_name)) + shutil.rmtree(self._archive_dir) + self._tmp_archive = False + self._archive_dir = False + self._archive_name = False + def _create_project(self, path): + """ + Create a temporary Arch project in the directory PATH. This + project will be removed by + __del__->cleanup->_rcs_cleanup->_remove_project + """ + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project + category = "bugs-everywhere" + branch = "mainline" + version = "0.1" + self._project_name = "%s--%s--%s" % (category, branch, version) + self._invoke_client("archive-setup", self._project_name, + directory=path) + self._tmp_project = True + def _remove_project(self): + assert self._tmp_project == True + assert self._project_name != None + assert self._archive_dir != None + shutil.rmtree(os.path.join(self._archive_dir, self._project_name)) + self._tmp_project = False + self._project_name = False + def _archive_project_name(self): + assert self._archive_name != None + assert self._project_name != None + return "%s/%s" % (self._archive_name, self._project_name) + def _adjust_naming_conventions(self, path): + """ + By default, Arch restricts source code filenames to + ^[_=a-zA-Z0-9].*$ + See + http://regexps.srparish.net/tutorial-tla/naming-conventions.html + Since our bug directory '.be' doesn't satisfy these conventions, + we need to adjust them. + + The conventions are specified in + project-root/{arch}/=tagging-method + """ + tagpath = os.path.join(path, "{arch}", "=tagging-method") + lines_out = [] + f = codecs.open(tagpath, "r", self.encoding) + for line in f: + if line.startswith("source "): + lines_out.append("source ^[._=a-zA-X0-9].*$\n") + else: + lines_out.append(line) + f.close() + f = codecs.open(tagpath, "w", self.encoding) + f.write("".join(lines_out)) + f.close() + + def _add_project_code(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-source.html + # http://regexps.srparish.net/tutorial-tla/importing-first.html + self._invoke_client("init-tree", self._project_name, + directory=path) + self._adjust_naming_conventions(path) + self._invoke_client("import", "--summary", "Began versioning", + directory=path) + def _rcs_cleanup(self): + if self._tmp_project == True: + self._remove_project() + if self._tmp_archive == True: + self._remove_archive() + + def _rcs_root(self, path): + if not os.path.isdir(path): + dirname = os.path.dirname(path) + else: + dirname = path + status,output,error = self._u_invoke_client("tree-root", dirname) + root = output.rstrip('\n') + + self._get_archive_project_name(root) + + return root + + def _get_archive_name(self, root): + status,output,error = self._u_invoke_client("archives") + lines = output.split('\n') + # e.g. output: + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52 + # /tmp/BEtestXXXXXX/rootdir + # (+ repeats) + for archive,location in zip(lines[::2], lines[1::2]): + if os.path.realpath(location) == os.path.realpath(root): + self._archive_name = archive + assert self._archive_name != None + + def _get_archive_project_name(self, root): + # get project names + status,output,error = self._u_invoke_client("tree-version", directory=root) + # e.g output + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1 + archive_name,project_name = output.rstrip('\n').split('/') + self._archive_name = archive_name + self._project_name = project_name + def _rcs_get_user_id(self): + try: + status,output,error = self._u_invoke_client('my-id') + return output.rstrip('\n') + except Exception, e: + if 'no arch user id set' in e.args[0]: + return None + else: + raise + def _rcs_set_user_id(self, value): + self._u_invoke_client('my-id', value) + def _rcs_add(self, path): + self._u_invoke_client("add-id", path) + realpath = os.path.realpath(self._u_abspath(path)) + pathAdded = realpath in self._list_added(self.rootdir) + if self.paranoid and not pathAdded: + self._force_source(path) + def _list_added(self, root): + assert os.path.exists(root) + assert os.access(root, os.X_OK) + root = os.path.realpath(root) + status,output,error = self._u_invoke_client("inventory", "--source", + "--both", "--all", root) + inv_str = output.rstrip('\n') + return [os.path.join(root, p) for p in inv_str.split('\n')] + def _add_dir_rule(self, rule, dirname, root): + inv_path = os.path.join(dirname, '.arch-inventory') + f = codecs.open(inv_path, "a", self.encoding) + f.write(rule) + f.close() + if os.path.realpath(inv_path) not in self._list_added(root): + paranoid = self.paranoid + self.paranoid = False + self.add(inv_path) + self.paranoid = paranoid + def _force_source(self, path): + rule = "source %s\n" % self._u_rel_path(path) + self._add_dir_rule(rule, os.path.dirname(path), self.rootdir) + if os.path.realpath(path) not in self._list_added(self.rootdir): + raise CantAddFile(path) + def _rcs_remove(self, path): + if not '.arch-ids' in path: + self._u_invoke_client("delete-id", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None, binary=False): + if revision == None: + return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + else: + status,output,error = \ + self._invoke_client("file-find", path, revision) + relpath = output.rstrip('\n') + abspath = os.path.join(self.rootdir, relpath) + f = codecs.open(abspath, "r", self.encoding) + contents = f.read() + f.close() + return contents + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + status,output,error = \ + self._u_invoke_client("get", revision,directory) + def _rcs_commit(self, commitfile, allow_empty=False): + if allow_empty == False: + # arch applies empty commits without complaining, so check first + status,output,error = self._u_invoke_client("changes",expect=(0,1)) + if status == 0: + raise rcs.EmptyCommit() + summary,body = self._u_parse_commitfile(commitfile) + args = ["commit", "--summary", summary] + if body != None: + args.extend(["--log-message",body]) + status,output,error = self._u_invoke_client(*args) + revision = None + revline = re.compile("[*] committed (.*)") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revpath = match.groups()[0] + assert not " " in revpath, revpath + assert revpath.startswith(self._archive_project_name()+'--') + revision = revpath[len(self._archive_project_name()+'--'):] + return revpath + +class CantAddFile(Exception): + def __init__(self, file): + self.file = file + Exception.__init__(self, "Can't automatically add file %s" % file) + + + +rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__]) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/beuuid.py b/libbe/beuuid.py index e69de29..bc47208 100644 --- a/libbe/beuuid.py +++ b/libbe/beuuid.py @@ -0,0 +1,61 @@ +# Copyright (C) 2008-2009 W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +Backwards compatibility support for Python 2.4. Once people give up +on 2.4 ;), the uuid call should be merged into bugdir.py +""" + +import unittest + +try: + from uuid import uuid4 # Python >= 2.5 + def uuid_gen(): + id = uuid4() + idstr = id.urn + start = "urn:uuid:" + assert idstr.startswith(start) + return idstr[len(start):] +except ImportError: + import os + import sys + from subprocess import Popen, PIPE + + def uuid_gen(): + # Shell-out to system uuidgen + args = ['uuidgen', 'r'] + try: + if sys.platform != "win32": + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) + else: + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + shell=True, cwd=cwd) + except OSError, e : + strerror = "%s\nwhile executing %s" % (e.args[1], args) + raise OSError, strerror + output, error = q.communicate() + status = q.wait() + if status != 0: + strerror = "%s\nwhile executing %s" % (status, args) + raise Exception, strerror + return output.rstrip('\n') + +class UUIDtestCase(unittest.TestCase): + def testUUID_gen(self): + id = uuid_gen() + self.failUnless(len(id) == 36, "invalid UUID '%s'" % id) + +suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase) diff --git a/libbe/bug.py b/libbe/bug.py index e69de29..c1e5481 100644 --- a/libbe/bug.py +++ b/libbe/bug.py @@ -0,0 +1,547 @@ +# Copyright (C) 2008-2009 Chris Ball +# Thomas Habets +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import os +import os.path +import errno +import time +import types +import xml.sax.saxutils +import doctest + +from beuuid import uuid_gen +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, cached_property, \ + primed_property, change_hook_property, settings_property +import settings_object +import mapfile +import comment +import utility + + +### Define and describe valid bug categories +# Use a tuple of (category, description) tuples since we don't have +# ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/ + +# in order of increasing severity. (name, description) pairs +severity_def = ( + ("wishlist","A feature that could improve usefulness, but not a bug."), + ("minor","The standard bug level."), + ("serious","A bug that requires workarounds."), + ("critical","A bug that prevents some features from working at all."), + ("fatal","A bug that makes the package unusable.")) + +# in order of increasing resolution +# roughly following http://www.bugzilla.org/docs/3.2/en/html/lifecycle.html +active_status_def = ( + ("unconfirmed","A possible bug which lacks independent existance confirmation."), + ("open","A working bug that has not been assigned to a developer."), + ("assigned","A working bug that has been assigned to a developer."), + ("test","The code has been adjusted, but the fix is still being tested.")) +inactive_status_def = ( + ("closed", "The bug is no longer relevant."), + ("fixed", "The bug should no longer occur."), + ("wontfix","It's not a bug, it's a feature.")) + + +### Convert the description tuples to more useful formats + +severity_values = () +severity_description = {} +severity_index = {} +def load_severities(severity_def): + global severity_values + global severity_description + global severity_index + if severity_def == None: + return + severity_values = tuple([val for val,description in severity_def]) + severity_description = dict(severity_def) + severity_index = {} + for i,severity in enumerate(severity_values): + severity_index[severity] = i +load_severities(severity_def) + +active_status_values = [] +inactive_status_values = [] +status_values = [] +status_description = {} +status_index = {} +def load_status(active_status_def, inactive_status_def): + global active_status_values + global inactive_status_values + global status_values + global status_description + global status_index + if active_status_def == None: + active_status_def = globals()["active_status_def"] + if inactive_status_def == None: + inactive_status_def = globals()["inactive_status_def"] + active_status_values = tuple([val for val,description in active_status_def]) + inactive_status_values = tuple([val for val,description in inactive_status_def]) + status_values = active_status_values + inactive_status_values + status_description = dict(tuple(active_status_def) + tuple(inactive_status_def)) + status_index = {} + for i,status in enumerate(status_values): + status_index[status] = i +load_status(active_status_def, inactive_status_def) + + +class Bug(settings_object.SavedSettingsObject): + """ + >>> b = Bug() + >>> print b.status + open + >>> print b.severity + minor + + There are two formats for time, int and string. Setting either + one will adjust the other appropriately. The string form is the + one stored in the bug's settings file on disk. + >>> print type(b.time) + + >>> print type(b.time_string) + + >>> b.time = 0 + >>> print b.time_string + Thu, 01 Jan 1970 00:00:00 +0000 + >>> b.time_string="Thu, 01 Jan 1970 00:01:00 +0000" + >>> b.time + 60 + >>> print b.settings["time"] + Thu, 01 Jan 1970 00:01:00 +0000 + """ + settings_properties = [] + required_saved_properties = [] + _prop_save_settings = settings_object.prop_save_settings + _prop_load_settings = settings_object.prop_load_settings + def _versioned_property(settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"]=required_saved_properties + return settings_object.versioned_property(**kwargs) + + @_versioned_property(name="severity", + doc="A measure of the bug's importance", + default="minor", + check_fn=lambda s: s in severity_values, + require_save=True) + def severity(): return {} + + @_versioned_property(name="status", + doc="The bug's current status", + default="open", + check_fn=lambda s: s in status_values, + require_save=True) + def status(): return {} + + @property + def active(self): + return self.status in active_status_values + + @_versioned_property(name="target", + doc="The deadline for fixing this bug") + def target(): return {} + + @_versioned_property(name="creator", + doc="The user who entered the bug into the system") + def creator(): return {} + + @_versioned_property(name="reporter", + doc="The user who reported the bug") + def reporter(): return {} + + @_versioned_property(name="assigned", + doc="The developer in charge of the bug") + def assigned(): return {} + + @_versioned_property(name="time", + doc="An RFC 2822 timestamp for bug creation") + def time_string(): return {} + + def _get_time(self): + if self.time_string == None: + return None + return utility.str_to_time(self.time_string) + def _set_time(self, value): + self.time_string = utility.time_to_str(value) + time = property(fget=_get_time, + fset=_set_time, + doc="An integer version of .time_string") + + def _extra_strings_check_fn(value): + return utility.iterable_full_of_strings(value, \ + alternative=settings_object.EMPTY) + def _extra_strings_change_hook(self, old, new): + self.extra_strings.sort() # to make merging easier + self._prop_save_settings(old, new) + @_versioned_property(name="extra_strings", + doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/.py.", + default=[], + check_fn=_extra_strings_check_fn, + change_hook=_extra_strings_change_hook, + mutable=True) + def extra_strings(): return {} + + @_versioned_property(name="summary", + doc="A one-line bug description") + def summary(): return {} + + def _get_comment_root(self, load_full=False): + if self.sync_with_disk: + return comment.loadComments(self, load_full=load_full) + else: + return comment.Comment(self, uuid=comment.INVALID_UUID) + + @Property + @cached_property(generator=_get_comment_root) + @local_property("comment_root") + @doc_property(doc="The trunk of the comment tree") + def comment_root(): return {} + + def _get_rcs(self): + if hasattr(self.bugdir, "rcs"): + return self.bugdir.rcs + + @Property + @cached_property(generator=_get_rcs) + @local_property("rcs") + @doc_property(doc="A revision control system instance.") + def rcs(): return {} + + def __init__(self, bugdir=None, uuid=None, from_disk=False, + load_comments=False, summary=None): + settings_object.SavedSettingsObject.__init__(self) + self.bugdir = bugdir + self.uuid = uuid + if from_disk == True: + self.sync_with_disk = True + else: + self.sync_with_disk = False + if uuid == None: + self.uuid = uuid_gen() + self.time = int(time.time()) # only save to second precision + if self.rcs != None: + self.creator = self.rcs.get_user_id() + self.summary = summary + + def __repr__(self): + return "Bug(uuid=%r)" % self.uuid + + def set_sync_with_disk(self, value): + self.sync_with_disk = value + for comment in self.comments(): + comment.set_sync_with_disk(value) + + def _setting_attr_string(self, setting): + value = getattr(self, setting) + if value == None: + return "" + return str(value) + + def xml(self, show_comments=False): + if self.bugdir == None: + shortname = self.uuid + else: + shortname = self.bugdir.bug_shortname(self) + + if self.time == None: + timestring = "" + else: + timestring = utility.time_to_str(self.time) + + info = [("uuid", self.uuid), + ("short-name", shortname), + ("severity", self.severity), + ("status", self.status), + ("assigned", self.assigned), + ("target", self.target), + ("reporter", self.reporter), + ("creator", self.creator), + ("created", timestring), + ("summary", self.summary)] + ret = '\n' + for (k,v) in info: + if v is not None: + ret += ' <%s>%s\n' % (k,xml.sax.saxutils.escape(v),k) + for estr in self.extra_strings: + ret += ' %s\n' % estr + if show_comments == True: + comout = self.comment_root.xml_thread(auto_name_map=True, + bug_shortname=shortname) + if len(comout) > 0: + ret += comout+'\n' + ret += '' + return ret + + def string(self, shortlist=False, show_comments=False): + if self.bugdir == None: + shortname = self.uuid + else: + shortname = self.bugdir.bug_shortname(self) + if shortlist == False: + if self.time == None: + timestring = "" + else: + htime = utility.handy_time(self.time) + timestring = "%s (%s)" % (htime, self.time_string) + info = [("ID", self.uuid), + ("Short name", shortname), + ("Severity", self.severity), + ("Status", self.status), + ("Assigned", self._setting_attr_string("assigned")), + ("Target", self._setting_attr_string("target")), + ("Reporter", self._setting_attr_string("reporter")), + ("Creator", self._setting_attr_string("creator")), + ("Created", timestring)] + longest_key_len = max([len(k) for k,v in info]) + infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info] + bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n') + else: + statuschar = self.status[0] + severitychar = self.severity[0] + chars = "%c%c" % (statuschar, severitychar) + bugout = "%s:%s: %s" % (shortname,chars,self.summary.rstrip('\n')) + + if show_comments == True: + # take advantage of the string_thread(auto_name_map=True) + # SIDE-EFFECT of sorting by comment time. + comout = self.comment_root.string_thread(flatten=False, + auto_name_map=True, + bug_shortname=shortname) + output = bugout + '\n' + comout.rstrip('\n') + else : + output = bugout + return output + + def __str__(self): + return self.string(shortlist=True) + + def __cmp__(self, other): + return cmp_full(self, other) + + def get_path(self, name=None): + my_dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid) + if name is None: + return my_dir + assert name in ["values", "comments"] + return os.path.join(my_dir, name) + + def load_settings(self): + self.settings = mapfile.map_load(self.rcs, self.get_path("values")) + self._setup_saved_settings() + + def load_comments(self, load_full=True): + if load_full == True: + # Force a complete load of the whole comment tree + self.comment_root = self._get_comment_root(load_full=True) + else: + # Setup for fresh lazy-loading. Clear _comment_root, so + # _get_comment_root returns a fresh version. Turn of + # syncing temporarily so we don't write our blank comment + # tree to disk. + self.sync_with_disk = False + self.comment_root = None + self.sync_with_disk = True + + def save_settings(self): + assert self.summary != None, "Can't save blank bug" + + self.rcs.mkdir(self.get_path()) + path = self.get_path("values") + mapfile.map_save(self.rcs, path, self._get_saved_settings()) + + def save(self): + """ + Save any loaded contents to disk. Because of lazy loading of + comments, this is actually not too inefficient. + + However, if self.sync_with_disk = True, then any changes are + automatically written to disk as soon as they happen, so + calling this method will just waste time (unless something + else has been messing with your on-disk files). + """ + self.save_settings() + if len(self.comment_root) > 0: + comment.saveComments(self) + + def remove(self): + self.comment_root.remove() + path = self.get_path() + self.rcs.recursive_remove(path) + + def comments(self): + for comment in self.comment_root.traverse(): + yield comment + + def new_comment(self, body=None): + comm = self.comment_root.new_reply(body=body) + return comm + + def comment_from_shortname(self, shortname, *args, **kwargs): + return self.comment_root.comment_from_shortname(shortname, + *args, **kwargs) + + def comment_from_uuid(self, uuid): + return self.comment_root.comment_from_uuid(uuid) + + def comment_shortnames(self, shortname=None): + """ + SIDE-EFFECT : Comment.comment_shortnames will sort the comment + tree by comment.time + """ + for id, comment in self.comment_root.comment_shortnames(shortname): + yield (id, comment) + + +# The general rule for bug sorting is that "more important" bugs are +# less than "less important" bugs. This way sorting a list of bugs +# will put the most important bugs first in the list. When relative +# importance is unclear, the sorting follows some arbitrary convention +# (i.e. dictionary order). + +def cmp_severity(bug_1, bug_2): + """ + Compare the severity levels of two bugs, with more severe bugs + comparing as less. + >>> bugA = Bug() + >>> bugB = Bug() + >>> bugA.severity = bugB.severity = "wishlist" + >>> cmp_severity(bugA, bugB) == 0 + True + >>> bugB.severity = "minor" + >>> cmp_severity(bugA, bugB) > 0 + True + >>> bugA.severity = "critical" + >>> cmp_severity(bugA, bugB) < 0 + True + """ + if not hasattr(bug_2, "severity") : + return 1 + return -cmp(severity_index[bug_1.severity], severity_index[bug_2.severity]) + +def cmp_status(bug_1, bug_2): + """ + Compare the status levels of two bugs, with more 'open' bugs + comparing as less. + >>> bugA = Bug() + >>> bugB = Bug() + >>> bugA.status = bugB.status = "open" + >>> cmp_status(bugA, bugB) == 0 + True + >>> bugB.status = "closed" + >>> cmp_status(bugA, bugB) < 0 + True + >>> bugA.status = "fixed" + >>> cmp_status(bugA, bugB) > 0 + True + """ + if not hasattr(bug_2, "status") : + return 1 + val_2 = status_index[bug_2.status] + return cmp(status_index[bug_1.status], status_index[bug_2.status]) + +def cmp_attr(bug_1, bug_2, attr, invert=False): + """ + Compare a general attribute between two bugs using the conventional + comparison rule for that attribute type. If invert == True, sort + *against* that convention. + >>> attr="severity" + >>> bugA = Bug() + >>> bugB = Bug() + >>> bugA.severity = "critical" + >>> bugB.severity = "wishlist" + >>> cmp_attr(bugA, bugB, attr) < 0 + True + >>> cmp_attr(bugA, bugB, attr, invert=True) > 0 + True + >>> bugB.severity = "critical" + >>> cmp_attr(bugA, bugB, attr) == 0 + True + """ + if not hasattr(bug_2, attr) : + return 1 + val_1 = getattr(bug_1, attr) + val_2 = getattr(bug_2, attr) + if val_1 == None: val_1 = None + if val_2 == None: val_2 = None + + if invert == True : + return -cmp(val_1, val_2) + else : + return cmp(val_1, val_2) + +# alphabetical rankings (a < z) +cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator") +cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned") +# chronological rankings (newer < older) +cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True) + +def cmp_comments(bug_1, bug_2): + """ + Compare two bugs' comments lists. Doesn't load any new comments, + so you should call each bug's .load_comments() first if you want a + full comparison. + """ + comms_1 = sorted(bug_1.comments(), key = lambda comm : comm.uuid) + comms_2 = sorted(bug_2.comments(), key = lambda comm : comm.uuid) + result = cmp(len(comms_1), len(comms_2)) + if result != 0: + return result + for c_1,c_2 in zip(comms_1, comms_2): + result = cmp(c_1, c_2) + if result != 0: + return result + return 0 + +DEFAULT_CMP_FULL_CMP_LIST = \ + (cmp_status,cmp_severity,cmp_assigned,cmp_time,cmp_creator,cmp_comments) + +class BugCompoundComparator (object): + def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST): + self.cmp_list = cmp_list + def __call__(self, bug_1, bug_2): + for comparison in self.cmp_list : + val = comparison(bug_1, bug_2) + if val != 0 : + return val + return 0 + +cmp_full = BugCompoundComparator() + + +# define some bonus cmp_* functions +def cmp_last_modified(bug_1, bug_2): + """ + Like cmp_time(), but use most recent comment instead of bug + creation for the timestamp. + """ + def last_modified(bug): + time = bug.time + for comment in bug.comment_root.traverse(): + if comment.time > time: + time = comment.time + return time + val_1 = last_modified(bug_1) + val_2 = last_modified(bug_2) + return -cmp(val_1, val_2) + + +suite = doctest.DocTestSuite() diff --git a/libbe/bugdir.py b/libbe/bugdir.py index e69de29..6e020ee 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -0,0 +1,676 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Alexander Belchenko +# Chris Ball +# Oleg Romanyshyn +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import os +import os.path +import errno +import time +import copy +import unittest +import doctest + +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, fn_checked_property, \ + cached_property, primed_property, change_hook_property, \ + settings_property +import settings_object +import mapfile +import bug +import rcs +import encoding +import utility + + +class NoBugDir(Exception): + def __init__(self, path): + msg = "The directory \"%s\" has no bug directory." % path + Exception.__init__(self, msg) + self.path = path + +class NoRootEntry(Exception): + def __init__(self, path): + self.path = path + Exception.__init__(self, "Specified root does not exist: %s" % path) + +class AlreadyInitialized(Exception): + def __init__(self, path): + self.path = path + Exception.__init__(self, + "Specified root is already initialized: %s" % path) + +class MultipleBugMatches(ValueError): + def __init__(self, shortname, matches): + msg = ("More than one bug matches %s. " + "Please be more specific.\n%s" % (shortname, matches)) + ValueError.__init__(self, msg) + self.shortname = shortname + self.matches = matches + + +TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" + + +class BugDir (list, settings_object.SavedSettingsObject): + """ + Sink to existing root + ====================== + + Consider the following usage case: + You have a bug directory rooted in + /path/to/source + by which I mean the '.be' directory is at + /path/to/source/.be + However, you're of in some subdirectory like + /path/to/source/GUI/testing + and you want to comment on a bug. Setting sink_to_root=True wen + you initialize your BugDir will cause it to search for the '.be' + file in the ancestors of the path you passed in as 'root'. + /path/to/source/GUI/testing/.be miss + /path/to/source/GUI/.be miss + /path/to/source/.be hit! + So it still roots itself appropriately without much work for you. + + File-system access + ================== + + BugDirs live completely in memory when .sync_with_disk is False. + This is the default configuration setup by BugDir(from_disk=False). + If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then + any changes to the BugDir will be immediately written to disk. + + If you want to change .sync_with_disk, we suggest you use + .set_sync_with_disk(), which propogates the new setting through to + all bugs/comments/etc. that have been loaded into memory. If + you've been living in memory and want to move to + .sync_with_disk==True, but you're not sure if anything has been + changed in memoryy, a call to save() is a safe move. + + Regardless of .sync_with_disk, a call to .save() will write out + all the contents that the BugDir instance has loaded into memory. + If sync_with_disk has been True over the course of all interesting + changes, this .save() call will be a waste of time. + + The BugDir will only load information from the file system when it + loads new bugs/comments that it doesn't already have in memory, or + when it explicitly asked to do so (e.g. .load() or + __init__(from_disk=True)). + + Allow RCS initialization + ======================== + + This one is for testing purposes. Setting it to True allows the + BugDir to search for an installed RCS backend and initialize it in + the root directory. This is a convenience option for supporting + tests of versioning functionality (e.g. .duplicate_bugdir). + + Disable encoding manipulation + ============================= + + This one is for testing purposed. You might have non-ASCII + Unicode in your bugs, comments, files, etc. BugDir instances try + and support your preferred encoding scheme (e.g. "utf-8") when + dealing with stream and file input/output. For stream output, + this involves replacing sys.stdout and sys.stderr + (libbe.encode.set_IO_stream_encodings). However this messes up + doctest's output catching. In order to support doctest tests + using BugDirs, set manipulate_encodings=False, and stick to ASCII + in your tests. + """ + + settings_properties = [] + required_saved_properties = [] + _prop_save_settings = settings_object.prop_save_settings + _prop_load_settings = settings_object.prop_load_settings + def _versioned_property(settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"]=required_saved_properties + return settings_object.versioned_property(**kwargs) + + @_versioned_property(name="target", + doc="The current project development target.") + def target(): return {} + + def _guess_encoding(self): + return encoding.get_encoding() + def _check_encoding(value): + if value != None: + return encoding.known_encoding(value) + def _setup_encoding(self, new_encoding): + # change hook called before generator. + if new_encoding not in [None, settings_object.EMPTY]: + if self._manipulate_encodings == True: + encoding.set_IO_stream_encodings(new_encoding) + def _set_encoding(self, old_encoding, new_encoding): + self._setup_encoding(new_encoding) + self._prop_save_settings(old_encoding, new_encoding) + + @_versioned_property(name="encoding", + doc="""The default input/output encoding to use (e.g. "utf-8").""", + change_hook=_set_encoding, + generator=_guess_encoding, + check_fn=_check_encoding) + def encoding(): return {} + + def _setup_user_id(self, user_id): + self.rcs.user_id = user_id + def _guess_user_id(self): + return self.rcs.get_user_id() + def _set_user_id(self, old_user_id, new_user_id): + self._setup_user_id(new_user_id) + self._prop_save_settings(old_user_id, new_user_id) + + @_versioned_property(name="user_id", + doc= +"""The user's prefered name, e.g. 'John Doe '. Note +that the Arch RCS backend *enforces* ids with this format.""", + change_hook=_set_user_id, + generator=_guess_user_id) + def user_id(): return {} + + @_versioned_property(name="default_assignee", + doc= +"""The default assignee for new bugs e.g. 'John Doe '.""") + def default_assignee(): return {} + + @_versioned_property(name="rcs_name", + doc="""The name of the current RCS. Kept seperate to make saving/loading +settings easy. Don't set this attribute. Set .rcs instead, and +.rcs_name will be automatically adjusted.""", + default="None", + allowed=["None", "Arch", "bzr", "darcs", "git", "hg"]) + def rcs_name(): return {} + + def _get_rcs(self, rcs_name=None): + """Get and root a new revision control system""" + if rcs_name == None: + rcs_name = self.rcs_name + new_rcs = rcs.rcs_by_name(rcs_name) + self._change_rcs(None, new_rcs) + return new_rcs + def _change_rcs(self, old_rcs, new_rcs): + new_rcs.encoding = self.encoding + new_rcs.root(self.root) + self.rcs_name = new_rcs.name + + @Property + @change_hook_property(hook=_change_rcs) + @cached_property(generator=_get_rcs) + @local_property("rcs") + @doc_property(doc="A revision control system instance.") + def rcs(): return {} + + def _bug_map_gen(self): + map = {} + for bug in self: + map[bug.uuid] = bug + for uuid in self.list_uuids(): + if uuid not in map: + map[uuid] = None + self._bug_map_value = map # ._bug_map_value used by @local_property + + def _extra_strings_check_fn(value): + return utility.iterable_full_of_strings(value, \ + alternative=settings_object.EMPTY) + def _extra_strings_change_hook(self, old, new): + self.extra_strings.sort() # to make merging easier + self._prop_save_settings(old, new) + @_versioned_property(name="extra_strings", + doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/.py.", + default=[], + check_fn=_extra_strings_check_fn, + change_hook=_extra_strings_change_hook, + mutable=True) + def extra_strings(): return {} + + @Property + @primed_property(primer=_bug_map_gen) + @local_property("bug_map") + @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.") + def _bug_map(): return {} + + def _setup_severities(self, severities): + if severities not in [None, settings_object.EMPTY]: + bug.load_severities(severities) + def _set_severities(self, old_severities, new_severities): + self._setup_severities(new_severities) + self._prop_save_settings(old_severities, new_severities) + @_versioned_property(name="severities", + doc="The allowed bug severities and their descriptions.", + change_hook=_set_severities) + def severities(): return {} + + def _setup_status(self, active_status, inactive_status): + bug.load_status(active_status, inactive_status) + def _set_active_status(self, old_active_status, new_active_status): + self._setup_status(new_active_status, self.inactive_status) + self._prop_save_settings(old_active_status, new_active_status) + @_versioned_property(name="active_status", + doc="The allowed active bug states and their descriptions.", + change_hook=_set_active_status) + def active_status(): return {} + + def _set_inactive_status(self, old_inactive_status, new_inactive_status): + self._setup_status(self.active_status, new_inactive_status) + self._prop_save_settings(old_inactive_status, new_inactive_status) + @_versioned_property(name="inactive_status", + doc="The allowed inactive bug states and their descriptions.", + change_hook=_set_inactive_status) + def inactive_status(): return {} + + + def __init__(self, root=None, sink_to_existing_root=True, + assert_new_BugDir=False, allow_rcs_init=False, + manipulate_encodings=True, + from_disk=False, rcs=None): + list.__init__(self) + settings_object.SavedSettingsObject.__init__(self) + self._manipulate_encodings = manipulate_encodings + if root == None: + root = os.getcwd() + if sink_to_existing_root == True: + self.root = self._find_root(root) + else: + if not os.path.exists(root): + raise NoRootEntry(root) + self.root = root + # get a temporary rcs until we've loaded settings + self.sync_with_disk = False + self.rcs = self._guess_rcs() + + if from_disk == True: + self.sync_with_disk = True + self.load() + else: + self.sync_with_disk = False + if assert_new_BugDir == True: + if os.path.exists(self.get_path()): + raise AlreadyInitialized, self.get_path() + if rcs == None: + rcs = self._guess_rcs(allow_rcs_init) + self.rcs = rcs + self._setup_user_id(self.user_id) + + def set_sync_with_disk(self, value): + self.sync_with_disk = value + for bug in self: + bug.set_sync_with_disk(value) + + def _find_root(self, path): + """ + Search for an existing bug database dir and it's ancestors and + return a BugDir rooted there. + """ + if not os.path.exists(path): + raise NoRootEntry(path) + versionfile=utility.search_parent_directories(path, + os.path.join(".be", "version")) + if versionfile != None: + beroot = os.path.dirname(versionfile) + root = os.path.dirname(beroot) + return root + else: + beroot = utility.search_parent_directories(path, ".be") + if beroot == None: + raise NoBugDir(path) + return beroot + + def get_version(self, path=None, use_none_rcs=False): + if use_none_rcs == True: + RCS = rcs.rcs_by_name("None") + RCS.root(self.root) + RCS.encoding = encoding.get_encoding() + else: + RCS = self.rcs + + if path == None: + path = self.get_path("version") + tree_version = RCS.get_file_contents(path) + return tree_version + + def set_version(self): + self.rcs.mkdir(self.get_path()) + self.rcs.set_file_contents(self.get_path("version"), + TREE_VERSION_STRING) + + def get_path(self, *args): + my_dir = os.path.join(self.root, ".be") + if len(args) == 0: + return my_dir + assert args[0] in ["version", "settings", "bugs"], str(args) + return os.path.join(my_dir, *args) + + def _guess_rcs(self, allow_rcs_init=False): + deepdir = self.get_path() + if not os.path.exists(deepdir): + deepdir = os.path.dirname(deepdir) + new_rcs = rcs.detect_rcs(deepdir) + install = False + if new_rcs.name == "None": + if allow_rcs_init == True: + new_rcs = rcs.installed_rcs() + new_rcs.init(self.root) + return new_rcs + + def load(self): + version = self.get_version(use_none_rcs=True) + if version != TREE_VERSION_STRING: + raise NotImplementedError, \ + "BugDir cannot handle version '%s' yet." % version + else: + if not os.path.exists(self.get_path()): + raise NoBugDir(self.get_path()) + self.load_settings() + + self.rcs = rcs.rcs_by_name(self.rcs_name) + self._setup_user_id(self.user_id) + + def load_all_bugs(self): + "Warning: this could take a while." + self._clear_bugs() + for uuid in self.list_uuids(): + self._load_bug(uuid) + + def save(self): + """ + Save any loaded contents to disk. Because of lazy loading of + bugs and comments, this is actually not too inefficient. + + However, if self.sync_with_disk = True, then any changes are + automatically written to disk as soon as they happen, so + calling this method will just waste time (unless something + else has been messing with your on-disk files). + """ + self.set_version() + self.save_settings() + for bug in self: + bug.save() + + def load_settings(self): + self.settings = self._get_settings(self.get_path("settings")) + self._setup_saved_settings() + self._setup_user_id(self.user_id) + self._setup_encoding(self.encoding) + self._setup_severities(self.severities) + self._setup_status(self.active_status, self.inactive_status) + + def _get_settings(self, settings_path): + allow_no_rcs = not self.rcs.path_in_root(settings_path) + # allow_no_rcs=True should only be for the special case of + # configuring duplicate bugdir settings + + try: + settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs) + except rcs.NoSuchFile: + settings = {"rcs_name": "None"} + return settings + + def save_settings(self): + settings = self._get_saved_settings() + self._save_settings(self.get_path("settings"), settings) + + def _save_settings(self, settings_path, settings): + allow_no_rcs = not self.rcs.path_in_root(settings_path) + # allow_no_rcs=True should only be for the special case of + # configuring duplicate bugdir settings + self.rcs.mkdir(self.get_path(), allow_no_rcs) + mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs) + + def duplicate_bugdir(self, revision): + duplicate_path = self.rcs.duplicate_repo(revision) + + # setup revision RCS as None, since the duplicate may not be + # initialized for versioning + duplicate_settings_path = os.path.join(duplicate_path, + ".be", "settings") + duplicate_settings = self._get_settings(duplicate_settings_path) + if "rcs_name" in duplicate_settings: + duplicate_settings["rcs_name"] = "None" + duplicate_settings["user_id"] = self.user_id + if "disabled" in bug.status_values: + # Hack to support old versions of BE bugs + duplicate_settings["inactive_status"] = self.inactive_status + self._save_settings(duplicate_settings_path, duplicate_settings) + + return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings) + + def remove_duplicate_bugdir(self): + self.rcs.remove_duplicate_repo() + + def list_uuids(self): + uuids = [] + if os.path.exists(self.get_path()): + # list the uuids on disk + for uuid in os.listdir(self.get_path("bugs")): + if not (uuid.startswith('.')): + uuids.append(uuid) + yield uuid + # and the ones that are still just in memory + for bug in self: + if bug.uuid not in uuids: + uuids.append(bug.uuid) + yield bug.uuid + + def _clear_bugs(self): + while len(self) > 0: + self.pop() + self._bug_map_gen() + + def _load_bug(self, uuid): + bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True) + self.append(bg) + self._bug_map_gen() + return bg + + def new_bug(self, uuid=None, summary=None): + bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary) + bg.set_sync_with_disk(self.sync_with_disk) + if bg.sync_with_disk == True: + bg.save() + self.append(bg) + self._bug_map_gen() + return bg + + def remove_bug(self, bug): + self.remove(bug) + bug.remove() + + def bug_shortname(self, bug): + """ + Generate short names from uuids. Picks the minimum number of + characters (>=3) from the beginning of the uuid such that the + short names are unique. + + Obviously, as the number of bugs in the database grows, these + short names will cease to be unique. The complete uuid should be + used for long term reference. + """ + chars = 3 + for uuid in self._bug_map.keys(): + if bug.uuid == uuid: + continue + while (bug.uuid[:chars] == uuid[:chars]): + chars+=1 + return bug.uuid[:chars] + + def bug_from_shortname(self, shortname): + """ + >>> bd = simple_bug_dir() + >>> bug_a = bd.bug_from_shortname('a') + >>> print type(bug_a) + + >>> print bug_a + a:om: Bug A + """ + matches = [] + self._bug_map_gen() + for uuid in self._bug_map.keys(): + if uuid.startswith(shortname): + matches.append(uuid) + if len(matches) > 1: + raise MultipleBugMatches(shortname, matches) + if len(matches) == 1: + return self.bug_from_uuid(matches[0]) + raise KeyError("No bug matches %s" % shortname) + + def bug_from_uuid(self, uuid): + if not self.has_bug(uuid): + raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \ + % (uuid, self._bug_map, self.root)) + if self._bug_map[uuid] == None: + self._load_bug(uuid) + return self._bug_map[uuid] + + def has_bug(self, bug_uuid): + if bug_uuid not in self._bug_map: + self._bug_map_gen() + if bug_uuid not in self._bug_map: + return False + return True + + +def simple_bug_dir(): + """ + For testing + >>> bugdir = simple_bug_dir() + >>> ls = list(bugdir.list_uuids()) + >>> ls.sort() + >>> print ls + ['a', 'b'] + """ + dir = utility.Dir() + assert os.path.exists(dir.path) + bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True, + manipulate_encodings=False) + bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir. + bug_a = bugdir.new_bug("a", summary="Bug A") + bug_a.creator = "John Doe " + bug_a.time = 0 + bug_b = bugdir.new_bug("b", summary="Bug B") + bug_b.creator = "Jane Doe " + bug_b.time = 0 + bug_b.status = "closed" + bugdir.save() + return bugdir + + +class BugDirTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + def setUp(self): + self.dir = utility.Dir() + self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, + allow_rcs_init=True) + self.rcs = self.bugdir.rcs + def tearDown(self): + self.rcs.cleanup() + self.dir.cleanup() + def fullPath(self, path): + return os.path.join(self.dir.path, path) + def assertPathExists(self, path): + fullpath = self.fullPath(path) + self.failUnless(os.path.exists(fullpath)==True, + "path %s does not exist" % fullpath) + self.assertRaises(AlreadyInitialized, BugDir, + self.dir.path, assertNewBugDir=True) + def versionTest(self): + if self.rcs.versioned == False: + return + original = self.bugdir.rcs.commit("Began versioning") + bugA = self.bugdir.bug_from_uuid("a") + bugA.status = "fixed" + self.bugdir.save() + new = self.rcs.commit("Fixed bug a") + dupdir = self.bugdir.duplicate_bugdir(original) + self.failUnless(dupdir.root != self.bugdir.root, + "%s, %s" % (dupdir.root, self.bugdir.root)) + bugAorig = dupdir.bug_from_uuid("a") + self.failUnless(bugA != bugAorig, + "\n%s\n%s" % (bugA.string(), bugAorig.string())) + bugAorig.status = "fixed" + self.failUnless(bug.cmp_status(bugA, bugAorig)==0, + "%s, %s" % (bugA.status, bugAorig.status)) + self.failUnless(bug.cmp_severity(bugA, bugAorig)==0, + "%s, %s" % (bugA.severity, bugAorig.severity)) + self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0, + "%s, %s" % (bugA.assigned, bugAorig.assigned)) + self.failUnless(bug.cmp_time(bugA, bugAorig)==0, + "%s, %s" % (bugA.time, bugAorig.time)) + self.failUnless(bug.cmp_creator(bugA, bugAorig)==0, + "%s, %s" % (bugA.creator, bugAorig.creator)) + self.failUnless(bugA == bugAorig, + "\n%s\n%s" % (bugA.string(), bugAorig.string())) + self.bugdir.remove_duplicate_bugdir() + self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root)) + def testRun(self): + self.bugdir.new_bug(uuid="a", summary="Ant") + self.bugdir.new_bug(uuid="b", summary="Cockroach") + self.bugdir.new_bug(uuid="c", summary="Praying mantis") + length = len(self.bugdir) + self.failUnless(length == 3, "%d != 3 bugs" % length) + uuids = list(self.bugdir.list_uuids()) + self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids)) + self.failUnless(uuids == ["a","b","c"], str(uuids)) + bugA = self.bugdir.bug_from_uuid("a") + bugAprime = self.bugdir.bug_from_shortname("a") + self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime)) + self.bugdir.save() + self.versionTest() + def testComments(self, sync_with_disk=False): + if sync_with_disk == True: + self.bugdir.set_sync_with_disk(True) + self.bugdir.new_bug(uuid="a", summary="Ant") + bug = self.bugdir.bug_from_uuid("a") + comm = bug.comment_root + rep = comm.new_reply("Ants are small.") + rep.new_reply("And they have six legs.") + if sync_with_disk == False: + self.bugdir.save() + self.bugdir._clear_bugs() + bug = self.bugdir.bug_from_uuid("a") + bug.load_comments() + self.failUnless(len(bug.comment_root)==1, len(bug.comment_root)) + for index,comment in enumerate(bug.comments()): + if index == 0: + repLoaded = comment + self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid) + self.failUnless(comment.sync_with_disk == True, + comment.sync_with_disk) + #load_settings() + self.failUnless(comment.content_type == "text/plain", + comment.content_type) + self.failUnless(repLoaded.settings["Content-type"]=="text/plain", + repLoaded.settings) + self.failUnless(repLoaded.body == "Ants are small.", + repLoaded.body) + elif index == 1: + self.failUnless(comment.in_reply_to == repLoaded.uuid, + repLoaded.uuid) + self.failUnless(comment.body == "And they have six legs.", + comment.body) + else: + self.failIf(True, "Invalid comment: %d\n%s" % (index, comment)) + def testSyncedComments(self): + self.testComments(sync_with_disk=True) + +unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) +suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()]) diff --git a/libbe/bzr.py b/libbe/bzr.py index e69de29..d7cd1e5 100644 --- a/libbe/bzr.py +++ b/libbe/bzr.py @@ -0,0 +1,101 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Ben Finney +# Marien Zwart +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import os +import re +import sys +import unittest +import doctest + +import rcs +from rcs import RCS + +def new(): + return Bzr() + +class Bzr(RCS): + name = "bzr" + client = "bzr" + versioned = True + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + if self._u_search_parent_directories(path, ".bzr") != None : + return True + return False + def _rcs_root(self, path): + """Find the root of the deepest repository containing path.""" + status,output,error = self._u_invoke_client("root", path) + return output.rstrip('\n') + def _rcs_init(self, path): + self._u_invoke_client("init", directory=path) + def _rcs_get_user_id(self): + status,output,error = self._u_invoke_client("whoami") + return output.rstrip('\n') + def _rcs_set_user_id(self, value): + self._u_invoke_client("whoami", value) + def _rcs_add(self, path): + self._u_invoke_client("add", path) + def _rcs_remove(self, path): + # --force to also remove unversioned files. + self._u_invoke_client("remove", "--force", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None, binary=False): + if revision == None: + return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + else: + status,output,error = \ + self._u_invoke_client("cat","-r",revision,path) + return output + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + self._u_invoke_client("branch", "--revision", revision, + ".", directory) + def _rcs_commit(self, commitfile, allow_empty=False): + args = ["commit", "--file", commitfile] + if allow_empty == True: + args.append("--unchanged") + status,output,error = self._u_invoke_client(*args) + else: + kwargs = {"expect":(0,3)} + status,output,error = self._u_invoke_client(*args, **kwargs) + if status != 0: + strings = ["ERROR: no changes to commit.", # bzr 1.3.1 + "ERROR: No changes to commit."] # bzr 1.15.1 + if self._u_any_in_string(strings, error) == True: + raise rcs.EmptyCommit() + else: + raise rcs.CommandError(args, status, error) + revision = None + revline = re.compile("Committed revision (.*)[.]") + match = revline.search(error) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] + return revision + + +rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__]) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py index e69de29..853a75a 100644 --- a/libbe/cmdutil.py +++ b/libbe/cmdutil.py @@ -0,0 +1,218 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Oleg Romanyshyn +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import glob +import optparse +import os +from textwrap import TextWrapper +from StringIO import StringIO +import sys +import doctest + +import bugdir +import plugin +import encoding + + +class UserError(Exception): + def __init__(self, msg): + Exception.__init__(self, msg) + +class UnknownCommand(UserError): + def __init__(self, cmd): + Exception.__init__(self, "Unknown command '%s'" % cmd) + self.cmd = cmd + +class UsageError(Exception): + pass + +class GetHelp(Exception): + pass + +class GetCompletions(Exception): + def __init__(self, completions=[]): + msg = "Get allowed completions" + Exception.__init__(self, msg) + self.completions = completions + +def iter_commands(): + for name, module in plugin.iter_plugins("becommands"): + yield name.replace("_", "-"), module + +def get_command(command_name): + """Retrieves the module for a user command + + >>> try: + ... get_command("asdf") + ... except UnknownCommand, e: + ... print e + Unknown command 'asdf' + >>> repr(get_command("list")).startswith(" 0: + max_pos_arg = max(bugid_args.keys()) + else: + max_pos_arg = -1 + for pos,value in enumerate(args): + if value == "--complete": + filter = None + if pos in bugid_args: + filter = bugid_args[pos] + if pos > max_pos_arg and -1 in bugid_args: + filter = bugid_args[-1] + if filter != None: + bugshortnames = [] + try: + bd = bugdir.BugDir(from_disk=True, + manipulate_encodings=False) + bd.load_all_bugs() + bugs = [bug for bug in bd if filter(bug) == True] + bugshortnames = [bd.bug_shortname(bug) for bug in bugs] + except bugdir.NoBugDir: + pass + raise GetCompletions(bugshortnames) + raise GetCompletions() + +def complete_path(path): + """List possible path completions for path.""" + comps = glob.glob(path+"*") + glob.glob(path+"/*") + if len(comps) == 1 and os.path.isdir(comps[0]): + comps.extend(glob.glob(comps[0]+"/*")) + return comps + +def underlined(instring): + """Produces a version of a string that is underlined with '=' + + >>> underlined("Underlined String") + 'Underlined String\\n=================' + """ + + return "%s\n%s" % (instring, "="*len(instring)) + + +def _test(): + import doctest + import sys + doctest.testmod() + +if __name__ == "__main__": + _test() + +suite = doctest.DocTestSuite() diff --git a/libbe/comment.py b/libbe/comment.py index e69de29..3249e8b 100644 --- a/libbe/comment.py +++ b/libbe/comment.py @@ -0,0 +1,662 @@ +# Bugs Everywhere, a distributed bugtracker +# Copyright (C) 2008-2009 Chris Ball +# Thomas Habets +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import base64 +import os +import os.path +import sys +import time +import types +try: # import core module, Python >= 2.5 + from xml.etree import ElementTree +except ImportError: # look for non-core module + from elementtree import ElementTree +import xml.sax.saxutils +import doctest + +from beuuid import uuid_gen +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, cached_property, \ + primed_property, change_hook_property, settings_property +import settings_object +import mapfile +from tree import Tree +import utility + + +class InvalidShortname(KeyError): + def __init__(self, shortname, shortnames): + msg = "Invalid shortname %s\n%s" % (shortname, shortnames) + KeyError.__init__(self, msg) + self.shortname = shortname + self.shortnames = shortnames + +class InvalidXML(ValueError): + def __init__(self, element, comment): + msg = "Invalid comment xml: %s\n %s\n" \ + % (comment, ElementTree.tostring(element)) + ValueError.__init__(self, msg) + self.element = element + self.comment = comment + +class MissingReference(ValueError): + def __init__(self, comment): + msg = "Missing reference to %s" % (comment.in_reply_to) + ValueError.__init__(self, msg) + self.reference = comment.in_reply_to + self.comment = comment + +INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!" + +def list_to_root(comments, bug, root=None, + ignore_missing_references=False): + """ + Convert a raw list of comments to single root comment. We use a + dummy root comment by default, because there can be several + comment threads rooted on the same parent bug. To simplify + comment interaction, we condense these threads into a single + thread with a Comment dummy root. Can also be used to append + a list of subcomments to a non-dummy root comment, so long as + all the new comments are descendants of the root comment. + + No Comment method should use the dummy comment. + """ + root_comments = [] + uuid_map = {} + for comment in comments: + assert comment.uuid != None + uuid_map[comment.uuid] = comment + for comment in comments: + if comment.alt_id != None and comment.alt_id not in uuid_map: + uuid_map[comment.alt_id] = comment + if root == None: + root = Comment(bug, uuid=INVALID_UUID) + else: + uuid_map[root.uuid] = root + for comm in comments: + if comm.in_reply_to == INVALID_UUID: + comm.in_reply_to = None + rep = comm.in_reply_to + if rep == None or rep == bug.uuid: + root_comments.append(comm) + else: + parentUUID = comm.in_reply_to + try: + parent = uuid_map[parentUUID] + parent.add_reply(comm) + except KeyError, e: + if ignore_missing_references == True: + print >> sys.stderr, \ + "Ignoring missing reference to %s" % parentUUID + comm.in_reply_to = None + root_comments.append(comm) + else: + raise MissingReference(comm) + root.extend(root_comments) + return root + +def loadComments(bug, load_full=False): + """ + Set load_full=True when you want to load the comment completely + from disk *now*, rather than waiting and lazy loading as required. + """ + path = bug.get_path("comments") + if not os.path.isdir(path): + return Comment(bug, uuid=INVALID_UUID) + comments = [] + for uuid in os.listdir(path): + if uuid.startswith('.'): + continue + comm = Comment(bug, uuid, from_disk=True) + comm.set_sync_with_disk(bug.sync_with_disk) + if load_full == True: + comm.load_settings() + dummy = comm.body # force the body to load + comments.append(comm) + return list_to_root(comments, bug) + +def saveComments(bug): + for comment in bug.comment_root.traverse(): + comment.save() + + +class Comment(Tree, settings_object.SavedSettingsObject): + """ + >>> c = Comment() + >>> c.uuid != None + True + >>> c.uuid = "some-UUID" + >>> print c.content_type + text/plain + """ + + settings_properties = [] + required_saved_properties = [] + _prop_save_settings = settings_object.prop_save_settings + _prop_load_settings = settings_object.prop_load_settings + def _versioned_property(settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"]=required_saved_properties + return settings_object.versioned_property(**kwargs) + + @_versioned_property(name="Alt-id", + doc="Alternate ID for linking imported comments. Internally comments are linked (via In-reply-to) to the parent's UUID. However, these UUIDs are generated internally, so Alt-id is provided as a user-controlled linking target.") + def alt_id(): return {} + + @_versioned_property(name="From", + doc="The author of the comment") + def From(): return {} + + @_versioned_property(name="In-reply-to", + doc="UUID for parent comment or bug") + def in_reply_to(): return {} + + @_versioned_property(name="Content-type", + doc="Mime type for comment body", + default="text/plain", + require_save=True) + def content_type(): return {} + + @_versioned_property(name="Date", + doc="An RFC 2822 timestamp for comment creation") + def time_string(): return {} + + def _get_time(self): + if self.time_string == None: + return None + return utility.str_to_time(self.time_string) + def _set_time(self, value): + self.time_string = utility.time_to_str(value) + time = property(fget=_get_time, + fset=_set_time, + doc="An integer version of .time_string") + + def _get_comment_body(self): + if self.rcs != None and self.sync_with_disk == True: + import rcs + binary = not self.content_type.startswith("text/") + return self.rcs.get_file_contents(self.get_path("body"), binary=binary) + def _set_comment_body(self, old=None, new=None, force=False): + if (self.rcs != None and self.sync_with_disk == True) or force==True: + assert new != None, "Can't save empty comment" + binary = not self.content_type.startswith("text/") + self.rcs.set_file_contents(self.get_path("body"), new, binary=binary) + + @Property + @change_hook_property(hook=_set_comment_body) + @cached_property(generator=_get_comment_body) + @local_property("body") + @doc_property(doc="The meat of the comment") + def body(): return {} + + def _get_rcs(self): + if hasattr(self.bug, "rcs"): + return self.bug.rcs + + @Property + @cached_property(generator=_get_rcs) + @local_property("rcs") + @doc_property(doc="A revision control system instance.") + def rcs(): return {} + + def _extra_strings_check_fn(value): + return utility.iterable_full_of_strings(value, \ + alternative=settings_object.EMPTY) + def _extra_strings_change_hook(self, old, new): + self.extra_strings.sort() # to make merging easier + self._prop_save_settings(old, new) + @_versioned_property(name="extra_strings", + doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/.py.", + default=[], + check_fn=_extra_strings_check_fn, + change_hook=_extra_strings_change_hook, + mutable=True) + def extra_strings(): return {} + + def __init__(self, bug=None, uuid=None, from_disk=False, + in_reply_to=None, body=None): + """ + Set from_disk=True to load an old comment. + Set from_disk=False to create a new comment. + + The uuid option is required when from_disk==True. + + The in_reply_to and body options are only used if + from_disk==False (the default). When from_disk==True, they are + loaded from the bug database. + + in_reply_to should be the uuid string of the parent comment. + """ + Tree.__init__(self) + settings_object.SavedSettingsObject.__init__(self) + self.bug = bug + self.uuid = uuid + if from_disk == True: + self.sync_with_disk = True + else: + self.sync_with_disk = False + if uuid == None: + self.uuid = uuid_gen() + self.time = int(time.time()) # only save to second precision + if self.rcs != None: + self.From = self.rcs.get_user_id() + self.in_reply_to = in_reply_to + self.body = body + + def set_sync_with_disk(self, value): + self.sync_with_disk = True + + def traverse(self, *args, **kwargs): + """Avoid working with the possible dummy root comment""" + for comment in Tree.traverse(self, *args, **kwargs): + if comment.uuid == INVALID_UUID: + continue + yield comment + + def _setting_attr_string(self, setting): + value = getattr(self, setting) + if value == None: + return "" + return str(value) + + def xml(self, indent=0, shortname=None): + """ + >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") + >>> comm.uuid = "0123" + >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" + >>> print comm.xml(indent=2, shortname="com-1") + + 0123 + com-1 + + Thu, 01 Jan 1970 00:00:00 +0000 + text/plain + Some + insightful + remarks + + """ + if shortname == None: + shortname = self.uuid + if self.content_type.startswith("text/"): + body = (self.body or "").rstrip('\n') + else: + maintype,subtype = self.content_type.split('/',1) + msg = email.mime.base.MIMEBase(maintype, subtype) + msg.set_payload(self.body or "") + email.encoders.encode_base64(msg) + body = base64.encodestring(self.body or "") + info = [("uuid", self.uuid), + ("alt-id", self.alt_id), + ("short-name", shortname), + ("in-reply-to", self.in_reply_to), + ("from", self._setting_attr_string("From")), + ("date", self.time_string), + ("content-type", self.content_type), + ("body", body)] + lines = [""] + for (k,v) in info: + if v != None: + lines.append(' <%s>%s' % (k,xml.sax.saxutils.escape(v),k)) + lines.append("") + istring = ' '*indent + sep = '\n' + istring + return istring + sep.join(lines).rstrip('\n') + + def from_xml(self, xml_string, verbose=True): + """ + Note: If alt-id is not given, translates any fields to + fields. + >>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") + >>> commA.uuid = "0123" + >>> commA.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" + >>> xml = commA.xml(shortname="com-1") + >>> commB = Comment() + >>> commB.from_xml(xml) + >>> attrs=['uuid','alt_id','in_reply_to','From','time_string','content_type','body'] + >>> for attr in attrs: # doctest: +ELLIPSIS + ... if getattr(commB, attr) != getattr(commA, attr): + ... estr = "Mismatch on %s: '%s' should be '%s'" + ... args = (attr, getattr(commB, attr), getattr(commA, attr)) + ... print estr % args + Mismatch on uuid: '...' should be '0123' + Mismatch on alt_id: '0123' should be 'None' + >>> print commB.alt_id + 0123 + >>> commA.From + >>> commB.From + """ + if type(xml_string) == types.UnicodeType: + xml_string = xml_string.strip().encode("unicode_escape") + comment = ElementTree.XML(xml_string) + if comment.tag != "comment": + raise InvalidXML(comment, "root element must be ") + tags=['uuid','alt-id','in-reply-to','from','date','content-type','body'] + uuid = None + body = None + for child in comment.getchildren(): + if child.tag == "short-name": + pass + elif child.tag in tags: + if child.text == None or len(child.text) == 0: + text = settings_object.EMPTY + else: + text = xml.sax.saxutils.unescape(child.text) + text = unicode(text).decode("unicode_escape").strip() + if child.tag == "uuid": + uuid = text + continue # don't set the bug's uuid tag. + if child.tag == "body": + body = text + continue # don't set the bug's body yet. + elif child.tag == 'from': + attr_name = "From" + elif child.tag == 'date': + attr_name = 'time_string' + else: + attr_name = child.tag.replace('-','_') + setattr(self, attr_name, text) + elif verbose == True: + print >> sys.stderr, "Ignoring unknown tag %s in %s" \ + % (child.tag, comment.tag) + if self.alt_id == None and uuid not in [None, self.uuid]: + self.alt_id = uuid + if body != None: + if self.content_type.startswith("text/"): + self.body = body+"\n" # restore trailing newline + else: + self.body = base64.decodestring(body) + + def string(self, indent=0, shortname=None): + """ + >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") + >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" + >>> print comm.string(indent=2, shortname="com-1") + --------- Comment --------- + Name: com-1 + From: + Date: Thu, 01 Jan 1970 00:00:00 +0000 + + Some + insightful + remarks + """ + if shortname == None: + shortname = self.uuid + lines = [] + lines.append("--------- Comment ---------") + lines.append("Name: %s" % shortname) + lines.append("From: %s" % (self._setting_attr_string("From"))) + lines.append("Date: %s" % self.time_string) + lines.append("") + if self.content_type.startswith("text/"): + lines.extend((self.body or "").splitlines()) + else: + lines.append("Content type %s not printable. Try XML output instead" % self.content_type) + + istring = ' '*indent + sep = '\n' + istring + return istring + sep.join(lines).rstrip('\n') + + def __str__(self): + """ + >>> comm = Comment(bug=None, body="Some insightful remarks") + >>> comm.uuid = "com-1" + >>> comm.time_string = "Thu, 20 Nov 2008 15:55:11 +0000" + >>> comm.From = "Jane Doe " + >>> print comm + --------- Comment --------- + Name: com-1 + From: Jane Doe + Date: Thu, 20 Nov 2008 15:55:11 +0000 + + Some insightful remarks + """ + return self.string() + + def get_path(self, name=None): + my_dir = os.path.join(self.bug.get_path("comments"), self.uuid) + if name is None: + return my_dir + assert name in ["values", "body"] + return os.path.join(my_dir, name) + + def load_settings(self): + self.settings = mapfile.map_load(self.rcs, self.get_path("values")) + self._setup_saved_settings() + + def save_settings(self): + self.rcs.mkdir(self.get_path()) + path = self.get_path("values") + mapfile.map_save(self.rcs, path, self._get_saved_settings()) + + def save(self): + """ + Save any loaded contents to disk. + + However, if self.sync_with_disk = True, then any changes are + automatically written to disk as soon as they happen, so + calling this method will just waste time (unless something + else has been messing with your on-disk files). + """ + assert self.body != None, "Can't save blank comment" + self.save_settings() + self._set_comment_body(new=self.body, force=True) + + def remove(self): + for comment in self.traverse(): + path = comment.get_path() + self.rcs.recursive_remove(path) + + def add_reply(self, reply, allow_time_inversion=False): + if self.uuid != INVALID_UUID: + reply.in_reply_to = self.uuid + self.append(reply) + #raise Exception, "adding reply \n%s\n%s" % (self, reply) + + def new_reply(self, body=None): + """ + >>> comm = Comment(bug=None, body="Some insightful remarks") + >>> repA = comm.new_reply("Critique original comment") + >>> repB = repA.new_reply("Begin flamewar :p") + >>> repB.in_reply_to == repA.uuid + True + """ + reply = Comment(self.bug, body=body) + if self.bug != None: + reply.set_sync_with_disk(self.bug.sync_with_disk) + if reply.sync_with_disk == True: + reply.save() + self.add_reply(reply) + return reply + + def string_thread(self, string_method_name="string", name_map={}, + indent=0, flatten=True, + auto_name_map=False, bug_shortname=None): + """ + Return a string displaying a thread of comments. + bug_shortname is only used if auto_name_map == True. + + string_method_name (defaults to "string") is the name of the + Comment method used to generate the output string for each + Comment in the thread. The method must take the arguments + indent and shortname. + + SIDE-EFFECT: if auto_name_map==True, calls comment_shortnames() + which will sort the tree by comment.time. Avoid by calling + name_map = {} + for shortname,comment in comm.comment_shortnames(bug_shortname): + name_map[comment.uuid] = shortname + comm.sort(key=lambda c : c.From) # your sort + comm.string_thread(name_map=name_map) + + >>> a = Comment(bug=None, uuid="a", body="Insightful remarks") + >>> a.time = utility.str_to_time("Thu, 20 Nov 2008 01:00:00 +0000") + >>> b = a.new_reply("Critique original comment") + >>> b.uuid = "b" + >>> b.time = utility.str_to_time("Thu, 20 Nov 2008 02:00:00 +0000") + >>> c = b.new_reply("Begin flamewar :p") + >>> c.uuid = "c" + >>> c.time = utility.str_to_time("Thu, 20 Nov 2008 03:00:00 +0000") + >>> d = a.new_reply("Useful examples") + >>> d.uuid = "d" + >>> d.time = utility.str_to_time("Thu, 20 Nov 2008 04:00:00 +0000") + >>> a.sort(key=lambda comm : comm.time) + >>> print a.string_thread(flatten=True) + --------- Comment --------- + Name: a + From: + Date: Thu, 20 Nov 2008 01:00:00 +0000 + + Insightful remarks + --------- Comment --------- + Name: b + From: + Date: Thu, 20 Nov 2008 02:00:00 +0000 + + Critique original comment + --------- Comment --------- + Name: c + From: + Date: Thu, 20 Nov 2008 03:00:00 +0000 + + Begin flamewar :p + --------- Comment --------- + Name: d + From: + Date: Thu, 20 Nov 2008 04:00:00 +0000 + + Useful examples + >>> print a.string_thread(auto_name_map=True, bug_shortname="bug-1") + --------- Comment --------- + Name: bug-1:1 + From: + Date: Thu, 20 Nov 2008 01:00:00 +0000 + + Insightful remarks + --------- Comment --------- + Name: bug-1:2 + From: + Date: Thu, 20 Nov 2008 02:00:00 +0000 + + Critique original comment + --------- Comment --------- + Name: bug-1:3 + From: + Date: Thu, 20 Nov 2008 03:00:00 +0000 + + Begin flamewar :p + --------- Comment --------- + Name: bug-1:4 + From: + Date: Thu, 20 Nov 2008 04:00:00 +0000 + + Useful examples + """ + if auto_name_map == True: + name_map = {} + for shortname,comment in self.comment_shortnames(bug_shortname): + name_map[comment.uuid] = shortname + stringlist = [] + for depth,comment in self.thread(flatten=flatten): + ind = 2*depth+indent + if comment.uuid in name_map: + sname = name_map[comment.uuid] + else: + sname = None + string_fn = getattr(comment, string_method_name) + stringlist.append(string_fn(indent=ind, shortname=sname)) + return '\n'.join(stringlist) + + def xml_thread(self, name_map={}, indent=0, + auto_name_map=False, bug_shortname=None): + return self.string_thread(string_method_name="xml", name_map=name_map, + indent=indent, auto_name_map=auto_name_map, + bug_shortname=bug_shortname) + + def comment_shortnames(self, bug_shortname=None): + """ + Iterate through (id, comment) pairs, in time order. + (This is a user-friendly id, not the comment uuid). + + SIDE-EFFECT : will sort the comment tree by comment.time + + >>> a = Comment(bug=None, uuid="a") + >>> b = a.new_reply() + >>> b.uuid = "b" + >>> c = b.new_reply() + >>> c.uuid = "c" + >>> d = a.new_reply() + >>> d.uuid = "d" + >>> for id,name in a.comment_shortnames("bug-1"): + ... print id, name.uuid + bug-1:1 a + bug-1:2 b + bug-1:3 c + bug-1:4 d + """ + if bug_shortname == None: + bug_shortname = "" + self.sort(key=lambda comm : comm.time) + for num,comment in enumerate(self.traverse()): + yield ("%s:%d" % (bug_shortname, num+1), comment) + + def comment_from_shortname(self, comment_shortname, *args, **kwargs): + """ + Use a comment shortname to look up a comment. + >>> a = Comment(bug=None, uuid="a") + >>> b = a.new_reply() + >>> b.uuid = "b" + >>> c = b.new_reply() + >>> c.uuid = "c" + >>> d = a.new_reply() + >>> d.uuid = "d" + >>> comm = a.comment_from_shortname("bug-1:3", bug_shortname="bug-1") + >>> id(comm) == id(c) + True + """ + for cur_name, comment in self.comment_shortnames(*args, **kwargs): + if comment_shortname == cur_name: + return comment + raise InvalidShortname(comment_shortname, + list(self.comment_shortnames(*args, **kwargs))) + + def comment_from_uuid(self, uuid): + """ + Use a comment shortname to look up a comment. + >>> a = Comment(bug=None, uuid="a") + >>> b = a.new_reply() + >>> b.uuid = "b" + >>> c = b.new_reply() + >>> c.uuid = "c" + >>> d = a.new_reply() + >>> d.uuid = "d" + >>> comm = a.comment_from_uuid("d") + >>> id(comm) == id(d) + True + """ + for comment in self.traverse(): + if comment.uuid == uuid: + return comment + raise KeyError(uuid) + +suite = doctest.DocTestSuite() diff --git a/libbe/config.py b/libbe/config.py index e69de29..5e343b9 100644 --- a/libbe/config.py +++ b/libbe/config.py @@ -0,0 +1,83 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import ConfigParser +import codecs +import locale +import os.path +import sys +import doctest + +default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding() + +def path(): + """Return the path to the per-user config file""" + return os.path.expanduser("~/.bugs_everywhere") + +def set_val(name, value, section="DEFAULT", encoding=None): + """Set a value in the per-user config file + + :param name: The name of the value to set + :param value: The new value to set (or None to delete the value) + :param section: The section to store the name/value in + """ + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + if os.path.exists(path()) == False: # touch file or config + open(path(), "w").close() # read chokes on missing file + f = codecs.open(path(), "r", encoding) + config.readfp(f, path()) + f.close() + if value is not None: + config.set(section, name, value) + else: + config.remove_option(section, name) + f = codecs.open(path(), "w", encoding) + config.write(f) + f.close() + +def get_val(name, section="DEFAULT", default=None, encoding=None): + """ + Get a value from the per-user config file + + :param name: The name of the value to get + :section: The section that the name is in + :return: The value, or None + >>> get_val("junk") is None + True + >>> set_val("junk", "random") + >>> get_val("junk") + u'random' + >>> set_val("junk", None) + >>> get_val("junk") is None + True + """ + if os.path.exists(path()): + if encoding == None: + encoding = default_encoding + config = ConfigParser.ConfigParser() + f = codecs.open(path(), "r", encoding) + config.readfp(f, path()) + f.close() + try: + return config.get(section, name) + except ConfigParser.NoOptionError: + return default + else: + return default + +suite = doctest.DocTestSuite() diff --git a/libbe/darcs.py b/libbe/darcs.py index e69de29..e7132c0 100644 --- a/libbe/darcs.py +++ b/libbe/darcs.py @@ -0,0 +1,163 @@ +# Copyright (C) 2009 W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import codecs +import os +import re +import sys +import unittest +import doctest + +import rcs +from rcs import RCS + +def new(): + return Darcs() + +class Darcs(RCS): + name="darcs" + client="darcs" + versioned=True + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + if self._u_search_parent_directories(path, "_darcs") != None : + return True + return False + def _rcs_root(self, path): + """Find the root of the deepest repository containing path.""" + # Assume that nothing funny is going on; in particular, that we aren't + # dealing with a bare repo. + if os.path.isdir(path) != True: + path = os.path.dirname(path) + darcs_dir = self._u_search_parent_directories(path, "_darcs") + if darcs_dir == None: + return None + return os.path.dirname(darcs_dir) + def _rcs_init(self, path): + self._u_invoke_client("init", directory=path) + def _rcs_get_user_id(self): + # following http://darcs.net/manual/node4.html#SECTION00410030000000000000 + # as of June 29th, 2009 + if self.rootdir == None: + return None + darcs_dir = os.path.join(self.rootdir, "_darcs") + if darcs_dir != None: + for pref_file in ["author", "email"]: + pref_path = os.path.join(darcs_dir, "prefs", pref_file) + if os.path.exists(pref_path): + return self.get_file_contents(pref_path) + for env_variable in ["DARCS_EMAIL", "EMAIL"]: + if env_variable in os.environ: + return os.environ[env_variable] + return None + def _rcs_set_user_id(self, value): + if self.rootdir == None: + self.root(".") + if self.rootdir == None: + raise rcs.SettingIDnotSupported + author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author") + f = codecs.open(author_path, "w", self.encoding) + f.write(value) + f.close() + def _rcs_add(self, path): + if os.path.isdir(path): + return + self._u_invoke_client("add", path) + def _rcs_remove(self, path): + if not os.path.isdir(self._u_abspath(path)): + os.remove(os.path.join(self.rootdir, path)) # darcs notices removal + def _rcs_update(self, path): + pass # darcs notices changes + def _rcs_get_file_contents(self, path, revision=None, binary=False): + if revision == None: + return RCS._rcs_get_file_contents(self, path, revision, + binary=binary) + else: + try: + return self._u_invoke_client("show", "contents", "--patch", revision, path) + except rcs.CommandError: + # Darcs versions < 2.0.0pre2 lack the "show contents" command + + status,output,error = self._u_invoke_client("diff", "--unified", + "--from-patch", + revision, path) + major_patch = output + status,output,error = self._u_invoke_client("diff", "--unified", + "--patch", + revision, path) + target_patch = output + + # "--output -" to be supported in GNU patch > 2.5.9 + # but that hasn't been released as of June 30th, 2009. + + # Rewrite path to status before the patch we want + args=["patch", "--reverse", path] + status,output,error = self._u_invoke(args, stdin=major_patch) + # Now apply the patch we want + args=["patch", path] + status,output,error = self._u_invoke(args, stdin=target_patch) + + if os.path.exists(os.path.join(self.rootdir, path)) == True: + contents = RCS._rcs_get_file_contents(self, path, + binary=binary) + else: + contents = "" + + # Now restore path to it's current incarnation + args=["patch", "--reverse", path] + status,output,error = self._u_invoke(args, stdin=target_patch) + args=["patch", path] + status,output,error = self._u_invoke(args, stdin=major_patch) + current_contents = RCS._rcs_get_file_contents(self, path, + binary=binary) + return contents + def _rcs_duplicate_repo(self, directory, revision=None): + if revision==None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + self._u_invoke_client("put", "--to-patch", revision, directory) + def _rcs_commit(self, commitfile, allow_empty=False): + id = self.get_user_id() + if '@' not in id: + id = "%s <%s@invalid.com>" % (id, id) + args = ['record', '--all', '--author', id, '--logfile', commitfile] + status,output,error = self._u_invoke_client(*args) + empty_strings = ["No changes!"] + revision = None + if self._u_any_in_string(empty_strings, output) == True: + if allow_empty == False: + raise rcs.EmptyCommit() + else: # we need a extra call to get the current revision + args = ["changes", "--last=1", "--xml"] + status,output,error = self._u_invoke_client(*args) + revline = re.compile("[ \t]*(.*)") + # note that darcs does _not_ make an empty revision. + # this returns the last non-empty revision id... + else: + revline = re.compile("Finished recording patch '(.*)'") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] + return revision + + +rcs.make_rcs_testcase_subclasses(Darcs, sys.modules[__name__]) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/diff.py b/libbe/diff.py index e69de29..ba48efc 100644 --- a/libbe/diff.py +++ b/libbe/diff.py @@ -0,0 +1,125 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +"""Compare two bug trees""" +from libbe import cmdutil, bugdir, bug +from libbe.utility import time_to_str +import doctest + +def bug_diffs(old_bugdir, new_bugdir): + added = [] + removed = [] + modified = [] + for uuid in old_bugdir.list_uuids(): + old_bug = old_bugdir.bug_from_uuid(uuid) + try: + new_bug = new_bugdir.bug_from_uuid(uuid) + old_bug.load_comments() + new_bug.load_comments() + if old_bug != new_bug: + modified.append((old_bug, new_bug)) + except KeyError: + removed.append(old_bug) + for uuid in new_bugdir.list_uuids(): + if not old_bugdir.has_bug(uuid): + new_bug = new_bugdir.bug_from_uuid(uuid) + added.append(new_bug) + return (removed, modified, added) + +def diff_report(bug_diffs_data, old_bugdir, new_bugdir): + bugs_removed,bugs_modified,bugs_added = bug_diffs_data + def modified_cmp(left, right): + return bug.cmp_severity(left[1], right[1]) + + bugs_added.sort(bug.cmp_severity) + bugs_removed.sort(bug.cmp_severity) + bugs_modified.sort(modified_cmp) + lines = [] + + if old_bugdir.settings != new_bugdir.settings: + bugdir_settings = sorted(new_bugdir.settings_properties) + bugdir_settings.remove("rcs_name") # tweaked by bugdir.duplicate_bugdir + change_list = change_lines(old_bugdir, new_bugdir, bugdir_settings) + if len(change_list) > 0: + lines.append("Modified bug directory:") + change_strings = ["%s: %s -> %s" % f for f in change_list] + lines.extend(change_strings) + lines.append("") + if len(bugs_added) > 0: + lines.append("New bug reports:") + for bg in bugs_added: + lines.extend(bg.string(shortlist=True).splitlines()) + lines.append("") + if len(bugs_modified) > 0: + printed = False + for old_bug, new_bug in bugs_modified: + change_str = bug_changes(old_bug, new_bug) + if change_str is None: + continue + if not printed: + printed = True + lines.append("Modified bug reports:") + lines.extend(change_str.splitlines()) + if printed == True: + lines.append("") + if len(bugs_removed) > 0: + lines.append("Removed bug reports:") + for bg in bugs_removed: + lines.extend(bg.string(shortlist=True).splitlines()) + lines.append("") + + return "\n".join(lines).rstrip("\n") + +def change_lines(old, new, attributes): + change_list = [] + for attr in attributes: + old_attr = getattr(old, attr) + new_attr = getattr(new, attr) + if old_attr != new_attr: + change_list.append((attr, old_attr, new_attr)) + if len(change_list) >= 0: + return change_list + else: + return None + +def bug_changes(old, new): + bug_settings = sorted(new.settings_properties) + change_list = change_lines(old, new, bug_settings) + change_strings = ["%s: %s -> %s" % f for f in change_list] + + old_comment_ids = [c.uuid for c in old.comments()] + new_comment_ids = [c.uuid for c in new.comments()] + for comment_id in new_comment_ids: + if comment_id not in old_comment_ids: + summary = comment_summary(new.comment_from_uuid(comment_id), "new") + change_strings.append(summary) + for comment_id in old_comment_ids: + if comment_id not in new_comment_ids: + summary = comment_summary(new.comment_from_uuid(comment_id), + "removed") + change_strings.append(summary) + + if len(change_strings) == 0: + return None + return "%s\n %s" % (new.string(shortlist=True), + " \n".join(change_strings)) + + +def comment_summary(comment, status): + return "%8s comment from %s on %s" % (status, comment.From, + time_to_str(comment.time)) + +suite = doctest.DocTestSuite() diff --git a/libbe/editor.py b/libbe/editor.py index e69de29..93144b8 100644 --- a/libbe/editor.py +++ b/libbe/editor.py @@ -0,0 +1,101 @@ +# Bugs Everywhere, a distributed bugtracker +# Copyright (C) 2008-2009 W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import codecs +import locale +import os +import sys +import tempfile +import doctest + +default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding() + +comment_marker = u"== Anything below this line will be ignored\n" + +class CantFindEditor(Exception): + def __init__(self): + Exception.__init__(self, "Can't find editor to get string from") + +def editor_string(comment=None, encoding=None): + """Invokes the editor, and returns the user-produced text as a string + + >>> if "EDITOR" in os.environ: + ... del os.environ["EDITOR"] + >>> if "VISUAL" in os.environ: + ... del os.environ["VISUAL"] + >>> editor_string() + Traceback (most recent call last): + CantFindEditor: Can't find editor to get string from + >>> os.environ["EDITOR"] = "echo bar > " + >>> editor_string() + u'bar\\n' + >>> os.environ["VISUAL"] = "echo baz > " + >>> editor_string() + u'baz\\n' + >>> del os.environ["EDITOR"] + >>> del os.environ["VISUAL"] + """ + if encoding == None: + encoding = default_encoding + for name in ('VISUAL', 'EDITOR'): + try: + editor = os.environ[name] + break + except KeyError: + pass + else: + raise CantFindEditor() + fhandle, fname = tempfile.mkstemp() + try: + if comment is not None: + os.write(fhandle, '\n'+comment_string(comment)) + os.close(fhandle) + oldmtime = os.path.getmtime(fname) + os.system("%s %s" % (editor, fname)) + f = codecs.open(fname, "r", encoding) + output = trimmed_string(f.read()) + f.close() + if output.rstrip('\n') == "": + output = None + finally: + os.unlink(fname) + return output + + +def comment_string(comment): + """ + >>> comment_string('hello') == comment_marker+"hello" + True + """ + return comment_marker + comment + + +def trimmed_string(instring): + """ + >>> trimmed_string("hello\\n"+comment_marker) + u'hello\\n' + >>> trimmed_string("hi!\\n" + comment_string('Booga')) + u'hi!\\n' + """ + out = [] + for line in instring.splitlines(True): + if line.startswith(comment_marker): + break + out.append(line) + return ''.join(out) + +suite = doctest.DocTestSuite() diff --git a/libbe/encoding.py b/libbe/encoding.py index e69de29..d603602 100644 --- a/libbe/encoding.py +++ b/libbe/encoding.py @@ -0,0 +1,51 @@ +# Bugs Everywhere, a distributed bugtracker +# Copyright (C) 2008-2009 W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import codecs +import locale +import sys +import doctest + +def get_encoding(): + """ + Guess a useful input/output/filesystem encoding... Maybe we need + seperate encodings for input/output and filesystem? Hmm... + """ + encoding = locale.getpreferredencoding() or sys.getdefaultencoding() + if sys.platform != 'win32' or sys.version_info[:2] > (2, 3): + encoding = locale.getlocale(locale.LC_TIME)[1] or encoding + # Python 2.3 on windows doesn't know about 'XYZ' alias for 'cpXYZ' + return encoding + +def known_encoding(encoding): + """ + >>> known_encoding("highly-unlikely-encoding") + False + >>> known_encoding(get_encoding()) + True + """ + try: + codecs.lookup(encoding) + return True + except LookupError: + return False + +def set_IO_stream_encodings(encoding): + sys.stdin = codecs.getreader(encoding)(sys.__stdin__) + sys.stdout = codecs.getwriter(encoding)(sys.__stdout__) + sys.stderr = codecs.getwriter(encoding)(sys.__stderr__) + +suite = doctest.DocTestSuite() diff --git a/libbe/git.py b/libbe/git.py index e69de29..2f9ffa9 100644 --- a/libbe/git.py +++ b/libbe/git.py @@ -0,0 +1,120 @@ +# Copyright (C) 2008-2009 Ben Finney +# Chris Ball +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import os +import re +import sys +import unittest +import doctest + +import rcs +from rcs import RCS + +def new(): + return Git() + +class Git(RCS): + name="git" + client="git" + versioned=True + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + if self._u_search_parent_directories(path, ".git") != None : + return True + return False + def _rcs_root(self, path): + """Find the root of the deepest repository containing path.""" + # Assume that nothing funny is going on; in particular, that we aren't + # dealing with a bare repo. + if os.path.isdir(path) != True: + path = os.path.dirname(path) + status,output,error = self._u_invoke_client("rev-parse", "--git-dir", + directory=path) + gitdir = os.path.join(path, output.rstrip('\n')) + dirname = os.path.abspath(os.path.dirname(gitdir)) + return dirname + def _rcs_init(self, path): + self._u_invoke_client("init", directory=path) + def _rcs_get_user_id(self): + status,output,error = self._u_invoke_client("config", "user.name") + name = output.rstrip('\n') + status,output,error = self._u_invoke_client("config", "user.email") + email = output.rstrip('\n') + if name != "" or email != "": # got something! + # guess missing info, if necessary + if name == "": + name = self._u_get_fallback_username() + if email == "": + email = self._u_get_fallback_email() + return self._u_create_id(name, email) + return None # Git has no infomation + def _rcs_set_user_id(self, value): + name,email = self._u_parse_id(value) + if email != None: + self._u_invoke_client("config", "user.email", email) + self._u_invoke_client("config", "user.name", name) + def _rcs_add(self, path): + if os.path.isdir(path): + return + self._u_invoke_client("add", path) + def _rcs_remove(self, path): + if not os.path.isdir(self._u_abspath(path)): + self._u_invoke_client("rm", "-f", path) + def _rcs_update(self, path): + self._rcs_add(path) + def _rcs_get_file_contents(self, path, revision=None, binary=False): + if revision == None: + return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + else: + arg = "%s:%s" % (revision,path) + status,output,error = self._u_invoke_client("show", arg) + return output + def _rcs_duplicate_repo(self, directory, revision=None): + if revision==None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + #self._u_invoke_client("archive", revision, directory) # makes tarball + self._u_invoke_client("clone", "--no-checkout",".",directory) + self._u_invoke_client("checkout", revision, directory=directory) + def _rcs_commit(self, commitfile, allow_empty=False): + args = ['commit', '--all', '--file', commitfile] + if allow_empty == True: + args.append("--allow-empty") + status,output,error = self._u_invoke_client(*args) + else: + kwargs = {"expect":(0,1)} + status,output,error = self._u_invoke_client(*args, **kwargs) + strings = ["nothing to commit", + "nothing added to commit"] + if self._u_any_in_string(strings, output) == True: + raise rcs.EmptyCommit() + revision = None + revline = re.compile("(.*) (.*)[:\]] (.*)") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 3 + revision = match.groups()[1] + return revision + + +rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__]) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/hg.py b/libbe/hg.py index e69de29..a20eeb5 100644 --- a/libbe/hg.py +++ b/libbe/hg.py @@ -0,0 +1,96 @@ +# Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc. +# Ben Finney +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import os +import re +import sys +import unittest +import doctest + +import rcs +from rcs import RCS + +def new(): + return Hg() + +class Hg(RCS): + name="hg" + client="hg" + versioned=True + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + """Detect whether a directory is revision-controlled using Mercurial""" + if self._u_search_parent_directories(path, ".hg") != None: + return True + return False + def _rcs_root(self, path): + status,output,error = self._u_invoke_client("root", directory=path) + return output.rstrip('\n') + def _rcs_init(self, path): + self._u_invoke_client("init", directory=path) + def _rcs_get_user_id(self): + status,output,error = self._u_invoke_client("showconfig","ui.username") + return output.rstrip('\n') + def _rcs_set_user_id(self, value): + """ + Supported by the Config Extension, but that is not part of + standard Mercurial. + http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension + """ + raise rcs.SettingIDnotSupported + def _rcs_add(self, path): + self._u_invoke_client("add", path) + def _rcs_remove(self, path): + self._u_invoke_client("rm", "--force", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None, binary=False): + if revision == None: + return RCS._rcs_get_file_contents(self, path, revision, binary=binary) + else: + status,output,error = \ + self._u_invoke_client("cat","-r",revision,path) + return output + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + return RCS._rcs_duplicate_repo(self, directory, revision) + else: + self._u_invoke_client("archive", "--rev", revision, directory) + def _rcs_commit(self, commitfile, allow_empty=False): + args = ['commit', '--logfile', commitfile] + status,output,error = self._u_invoke_client(*args) + if allow_empty == False: + strings = ["nothing changed"] + if self._u_any_in_string(strings, output) == True: + raise rcs.EmptyCommit() + status,output,error = self._u_invoke_client('identify') + revision = None + revline = re.compile("(.*) tip") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] + return revision + + +rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__]) + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/mapfile.py b/libbe/mapfile.py index e69de29..b959d76 100644 --- a/libbe/mapfile.py +++ b/libbe/mapfile.py @@ -0,0 +1,127 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import yaml +import os.path +import errno +import utility +import doctest + +class IllegalKey(Exception): + def __init__(self, key): + Exception.__init__(self, 'Illegal key "%s"' % key) + self.key = key + +class IllegalValue(Exception): + def __init__(self, value): + Exception.__init__(self, 'Illegal value "%s"' % value) + self.value = value + +def generate(map): + """Generate a YAML mapfile content string. + >>> generate({"q":"p"}) + 'q: p\\n\\n' + >>> generate({"q":u"Fran\u00e7ais"}) + 'q: Fran\\xc3\\xa7ais\\n\\n' + >>> generate({"q":u"hello"}) + 'q: hello\\n\\n' + >>> generate({"q=":"p"}) + Traceback (most recent call last): + IllegalKey: Illegal key "q=" + >>> generate({"q:":"p"}) + Traceback (most recent call last): + IllegalKey: Illegal key "q:" + >>> generate({"q\\n":"p"}) + Traceback (most recent call last): + IllegalKey: Illegal key "q\\n" + >>> generate({"":"p"}) + Traceback (most recent call last): + IllegalKey: Illegal key "" + >>> generate({">q":"p"}) + Traceback (most recent call last): + IllegalKey: Illegal key ">q" + >>> generate({"q":"p\\n"}) + Traceback (most recent call last): + IllegalValue: Illegal value "p\\n" + """ + keys = map.keys() + keys.sort() + for key in keys: + try: + assert not key.startswith('>') + assert('\n' not in key) + assert('=' not in key) + assert(':' not in key) + assert(len(key) > 0) + except AssertionError: + raise IllegalKey(unicode(key).encode('unicode_escape')) + if "\n" in map[key]: + raise IllegalValue(unicode(map[key]).encode('unicode_escape')) + + lines = [] + for key in keys: + lines.append(yaml.safe_dump({key: map[key]}, + default_flow_style=False, + allow_unicode=True)) + lines.append("") + return '\n'.join(lines) + +def parse(contents): + """ + Parse a YAML mapfile string. + >>> parse('q: p\\n\\n')['q'] + 'p' + >>> parse('q: \\'p\\'\\n\\n')['q'] + 'p' + >>> contents = generate({"a":"b", "c":"d", "e":"f"}) + >>> dict = parse(contents) + >>> dict["a"] + 'b' + >>> dict["c"] + 'd' + >>> dict["e"] + 'f' + """ + old_format = False + for line in contents.splitlines(): + if len(line.split("=")) == 2: + old_format = True + break + if old_format: # translate to YAML. Hack to deal with old BE bugs. + newlines = [] + for line in contents.splitlines(): + line = line.rstrip('\n') + if len(line) == 0: + continue + fields = line.split("=") + if len(fields) == 2: + key,value = fields + newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) + else: + newlines.append(line) + contents = '\n'.join(newlines) + return yaml.load(contents) or {} + +def map_save(rcs, path, map, allow_no_rcs=False): + """Save the map as a mapfile to the specified path""" + contents = generate(map) + rcs.set_file_contents(path, contents, allow_no_rcs) + +def map_load(rcs, path, allow_no_rcs=False): + contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs) + return parse(contents) + +suite = doctest.DocTestSuite() diff --git a/libbe/plugin.py b/libbe/plugin.py index e69de29..0545fd7 100644 --- a/libbe/plugin.py +++ b/libbe/plugin.py @@ -0,0 +1,71 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Marien Zwart +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import os +import os.path +import sys +import doctest + +def my_import(mod_name): + module = __import__(mod_name) + components = mod_name.split('.') + for comp in components[1:]: + module = getattr(module, comp) + return module + +def iter_plugins(prefix): + """ + >>> "list" in [n for n,m in iter_plugins("becommands")] + True + >>> "plugin" in [n for n,m in iter_plugins("libbe")] + True + """ + modfiles = os.listdir(os.path.join(plugin_path, prefix)) + modfiles.sort() + for modfile in modfiles: + if modfile.startswith('.'): + continue # the occasional emacs temporary file + if modfile.endswith(".py") and modfile != "__init__.py": + yield modfile[:-3], my_import(prefix+"."+modfile[:-3]) + + +def get_plugin(prefix, name): + """ + >>> get_plugin("becommands", "asdf") is None + True + >>> q = repr(get_plugin("becommands", "list")) + >>> q.startswith(" +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +This module provides a series of useful decorators for defining +various types of properties. For example usage, consider the +unittests at the end of the module. + +See + http://www.python.org/dev/peps/pep-0318/ +and + http://www.phyast.pitt.edu/~micheles/python/documentation.html +for more information on decorators. +""" + +import copy +import types +import unittest + + +class ValueCheckError (ValueError): + def __init__(self, name, value, allowed): + action = "in" # some list of allowed values + if type(allowed) == types.FunctionType: + action = "allowed by" # some allowed-value check function + msg = "%s not %s %s for %s" % (value, action, allowed, name) + ValueError.__init__(self, msg) + self.name = name + self.value = value + self.allowed = allowed + +def Property(funcs): + """ + End a chain of property decorators, returning a property. + """ + args = {} + args["fget"] = funcs.get("fget", None) + args["fset"] = funcs.get("fset", None) + args["fdel"] = funcs.get("fdel", None) + args["doc"] = funcs.get("doc", None) + + #print "Creating a property with" + #for key, val in args.items(): print key, value + return property(**args) + +def doc_property(doc=None): + """ + Add a docstring to a chain of property decorators. + """ + def decorator(funcs=None): + """ + Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...} + or a function fn() returning such a dict. + """ + if hasattr(funcs, "__call__"): + funcs = funcs() # convert from function-arg to dict + funcs["doc"] = doc + return funcs + return decorator + +def local_property(name, null=None, mutable_null=False): + """ + Define get/set access to per-parent-instance local storage. Uses + .__value to store the value for a particular owner instance. + If the .__value attribute does not exist, returns null. + + If mutable_null == True, we only release deepcopies of the null to + the outside world. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + if mutable_null == True: + ret_null = copy.deepcopy(null) + else: + ret_null = null + value = getattr(self, "_%s_value" % name, ret_null) + return value + def _fset(self, value): + setattr(self, "_%s_value" % name, value) + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + +def settings_property(name, null=None): + """ + Similar to local_property, except where local_property stores the + value in instance.__value, settings_property stores the + value in instance.settings[name]. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget", None) + fset = funcs.get("fset", None) + def _fget(self): + if fget is not None: + fget(self) + value = self.settings.get(name, null) + return value + def _fset(self, value): + self.settings[name] = value + if fset is not None: + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + funcs["name"] = name + return funcs + return decorator + + +# Allow comparison and caching with _original_ values for mutables, +# since +# +# >>> a = [] +# >>> b = a +# >>> b.append(1) +# >>> a +# [1] +# >>> a==b +# True +def _hash_mutable_value(value): + return repr(value) +def _init_mutable_property_cache(self): + if not hasattr(self, "_mutable_property_cache_hash"): + # first call to _fget for any mutable property + self._mutable_property_cache_hash = {} + self._mutable_property_cache_copy = {} +def _set_cached_mutable_property(self, cacher_name, property_name, value): + _init_mutable_property_cache(self) + self._mutable_property_cache_hash[(cacher_name, property_name)] = \ + _hash_mutable_value(value) + self._mutable_property_cache_copy[(cacher_name, property_name)] = \ + copy.deepcopy(value) +def _get_cached_mutable_property(self, cacher_name, property_name, default=None): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_copy: + return default + return self._mutable_property_cache_copy[(cacher_name, property_name)] +def _cmp_cached_mutable_property(self, cacher_name, property_name, value): + _init_mutable_property_cache(self) + if (cacher_name, property_name) not in self._mutable_property_cache_hash: + return 1 # any value > non-existant old hash + old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)] + return cmp(_hash_mutable_value(value), old_hash) + + +def defaulting_property(default=None, null=None, + mutable_default=False): + """ + Define a default value for get access to a property. + If the stored value is null, then default is returned. + + If mutable_default == True, we only release deepcopies of the + default to the outside world. + + null should never escape to the outside world, so don't worry + about it being a mutable. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "") + def _fget(self): + value = fget(self) + if value == null: + if mutable_default == True: + return copy.deepcopy(default) + else: + return default + return value + def _fset(self, value): + if value == default: + value = null + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def fn_checked_property(value_allowed_fn): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "") + def _fget(self): + value = fget(self) + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + return value + def _fset(self, value): + if value_allowed_fn(value) != True: + raise ValueCheckError(name, value, value_allowed_fn) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def checked_property(allowed=[]): + """ + Define allowed values for get/set access to a property. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "") + def _fget(self): + value = fget(self) + if value not in allowed: + raise ValueCheckError(name, value, allowed) + return value + def _fset(self, value): + if value not in allowed: + raise ValueCheckError(name, value, allowed) + fset(self, value) + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + +def cached_property(generator, initVal=None, mutable=False): + """ + Allow caching of values generated by generator(instance), where + instance is the instance to which this property belongs. Uses + .__cache to store a cache flag for a particular owner + instance. + + When the cache flag is True or missing and the stored value is + initVal, the first fget call triggers the generator function, + whose output is stored in __cached_value. That and + subsequent calls to fget will return this cached value. + + If the input value is no longer initVal (e.g. a value has been + loaded from disk or set with fset), that value overrides any + cached value, and this property has no effect. + + When the cache flag is False and the stored value is initVal, the + generator is not cached, but is called on every fget. + + The cache flag is missing on initialization. Particular instances + may override by setting their own flag. + + In the case that mutable == True, all caching is disabled and the + generator is called whenever the cached value would otherwise be + used. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "") + def _fget(self): + cache = getattr(self, "_%s_cache" % name, True) + value = fget(self) + if value == initVal: + if cache == True and mutable == False: + if hasattr(self, "_%s_cached_value" % name): + value = getattr(self, "_%s_cached_value" % name) + else: + value = generator(self) + setattr(self, "_%s_cached_value" % name, value) + else: + value = generator(self) + return value + funcs["fget"] = _fget + return funcs + return decorator + +def primed_property(primer, initVal=None): + """ + Just like a cached_property, except that instead of returning a + new value and running fset to cache it, the primer performs some + background manipulation (e.g. loads data into instance.settings) + such that a _second_ pass through fget succeeds. + + The 'cache' flag becomes a 'prime' flag, with priming taking place + whenever .__prime is True, or is False or missing and + value == initVal. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + name = funcs.get("name", "") + def _fget(self): + prime = getattr(self, "_%s_prime" % name, False) + if prime == False: + value = fget(self) + if prime == True or (prime == False and value == initVal): + primer(self) + value = fget(self) + return value + funcs["fget"] = _fget + return funcs + return decorator + +def change_hook_property(hook, mutable=False): + """ + Call the function hook(instance, old_value, new_value) whenever a + value different from the current value is set (instance is a a + reference to the class instance to which this property belongs). + This is useful for saving changes to disk, etc. This function is + called _after_ the new value has been stored, allowing you to + change the stored value if you want. + + In the case of mutables, things are slightly trickier. Because + the property-owning class has no way of knowing when the value + changes. We work around this by caching a private deepcopy of the + mutable value, and checking for changes whenever the property is + set (obviously) or retrieved (to check for external changes). So + long as you're conscientious about accessing the property after + making external modifications, mutability woln't be a problem. + t.x.append(5) # external modification + t.x # dummy access notices change and triggers hook + See testChangeHookMutableProperty for an example of the expected + behavior. + """ + def decorator(funcs): + if hasattr(funcs, "__call__"): + funcs = funcs() + fget = funcs.get("fget") + fset = funcs.get("fset") + name = funcs.get("name", "") + def _fget(self, new_value=None, from_fset=False): # only used if mutable == True + if from_fset == True: + value = new_value # compare new value with cached + else: + value = fget(self) # compare current value with cached + if _cmp_cached_mutable_property(self, "change hook property", name, value) != 0: + # there has been a change, cache new value + old_value = _get_cached_mutable_property(self, "change hook property", name) + _set_cached_mutable_property(self, "change hook property", name, value) + if from_fset == True: # return previously cached value + value = old_value + else: # the value changed while we weren't looking + hook(self, old_value, value) + return value + def _fset(self, value): + if mutable == True: # get cached previous value + old_value = _fget(self, new_value=value, from_fset=True) + else: + old_value = fget(self) + fset(self, value) + if value != old_value: + hook(self, old_value, value) + if mutable == True: + funcs["fget"] = _fget + funcs["fset"] = _fset + return funcs + return decorator + + +class DecoratorTests(unittest.TestCase): + def testLocalDoc(self): + class Test(object): + @Property + @doc_property("A fancy property") + def x(): + return {} + self.failUnless(Test.x.__doc__ == "A fancy property", + Test.x.__doc__) + def testLocalProperty(self): + class Test(object): + @Property + @local_property(name="LOCAL") + def x(): + return {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("_LOCAL_value" in dir(t), dir(t)) + self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value) + def testSettingsProperty(self): + class Test(object): + @Property + @settings_property(name="attr") + def x(): + return {} + def __init__(self): + self.settings = {} + t = Test() + self.failUnless(t.x == None, str(t.x)) + t.x = 'z' # the first set initializes ._LOCAL_value + self.failUnless(t.x == 'z', str(t.x)) + self.failUnless("attr" in t.settings, t.settings) + self.failUnless(t.settings["attr"] == 'z', t.settings["attr"]) + def testDefaultingLocalProperty(self): + class Test(object): + @Property + @defaulting_property(default='y', null='x') + @local_property(name="DEFAULT", null=5) + def x(): return {} + t = Test() + self.failUnless(t.x == 5, str(t.x)) + t.x = 'x' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'y' + self.failUnless(t.x == 'y', str(t.x)) + t.x = 'z' + self.failUnless(t.x == 'z', str(t.x)) + t.x = 5 + self.failUnless(t.x == 5, str(t.x)) + def testCheckedLocalProperty(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testTwoCheckedLocalProperties(self): + class Test(object): + @Property + @checked_property(allowed=['x', 'y', 'z']) + @local_property(name="X") + def x(): return {} + + @Property + @checked_property(allowed=['a', 'b', 'c']) + @local_property(name="A") + def a(): return {} + def __init__(self): + self._A_value = 'a' + self._X_value = 'x' + t = Test() + try: + t.x = 'a' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.x = 'x' + t.x = 'y' + t.x = 'z' + try: + t.a = 'x' + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + t.a = 'a' + t.a = 'b' + t.a = 'c' + def testFnCheckedLocalProperty(self): + class Test(object): + @Property + @fn_checked_property(lambda v : v in ['x', 'y', 'z']) + @local_property(name="CHECKED") + def x(): return {} + def __init__(self): + self._CHECKED_value = 'x' + t = Test() + self.failUnless(t.x == 'x', str(t.x)) + try: + t.x = None + e = None + except ValueCheckError, e: + pass + self.failUnless(type(e) == ValueCheckError, type(e)) + def testCachedLocalProperty(self): + class Gen(object): + def __init__(self): + self.i = 0 + def __call__(self, owner): + self.i += 1 + return self.i + class Test(object): + @Property + @cached_property(generator=Gen(), initVal=None) + @local_property(name="CACHED") + def x(): return {} + t = Test() + self.failIf("_CACHED_cache" in dir(t), getattr(t, "_CACHED_cache", None)) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + self.failUnless(t.x == 1, t.x) + t.x = 8 + self.failUnless(t.x == 8, t.x) + self.failUnless(t.x == 8, t.x) + t._CACHED_cache = False # Caching is off, but the stored value + val = t.x # is 8, not the initVal (None), so we + self.failUnless(val == 8, val) # get 8. + t._CACHED_value = None # Now we've set the stored value to None + val = t.x # so future calls to fget (like this) + self.failUnless(val == 2, val) # will call the generator every time... + val = t.x + self.failUnless(val == 3, val) + val = t.x + self.failUnless(val == 4, val) + t._CACHED_cache = True # We turn caching back on, and get + self.failUnless(t.x == 1, str(t.x)) # the original cached value. + del t._CACHED_cached_value # Removing that value forces a + self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call + self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which + self.failUnless(t.x == 5, str(t.x)) # we get the new cached value. + def testPrimedLocalProperty(self): + class Test(object): + def prime(self): + self.settings["PRIMED"] = "initialized" + @Property + @primed_property(primer=prime, initVal=None) + @settings_property(name="PRIMED") + def x(): return {} + def __init__(self): + self.settings={} + t = Test() + self.failIf("_PRIMED_prime" in dir(t), getattr(t, "_PRIMED_prime", None)) + self.failUnless(t.x == "initialized", t.x) + t.x = 1 + self.failUnless(t.x == 1, t.x) + t.x = None + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = True + t.x = 3 + self.failUnless(t.x == "initialized", t.x) + t._PRIMED_prime = False + t.x = 3 + self.failUnless(t.x == 3, t.x) + def testChangeHookLocalProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + + @Property + @change_hook_property(_hook) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 1 + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == 1, t.new) + t.x = 2 + self.failUnless(t.old == 1, t.old) + self.failUnless(t.new == 2, t.new) + def testChangeHookMutableProperty(self): + class Test(object): + def _hook(self, old, new): + self.old = old + self.new = new + self.hook_calls += 1 + + @Property + @change_hook_property(_hook, mutable=True) + @local_property(name="HOOKED") + def x(): return {} + t = Test() + t.hook_calls = 0 + t.x = [] + self.failUnless(t.old == None, t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 1, t.hook_calls) + a = t.x + a.append(5) + t.x = a + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 2, t.hook_calls) + t.x = [] + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # now append without reassigning. this doesn't trigger the + # change, since we don't ever set t.x, only get it and mess + # with it. It does, however, update our t.new, since t.new = + # t.x and is not a static copy. + t.x.append(5) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 3, t.hook_calls) + # however, the next t.x get _will_ notice the change... + a = t.x + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + t.x.append(6) # this append(6) is not noticed yet + self.failUnless(t.old == [], t.old) + self.failUnless(t.new == [5,6], t.new) + self.failUnless(t.hook_calls == 4, t.hook_calls) + # this append(7) is not noticed, but the t.x get causes the + # append(6) to be noticed + t.x.append(7) + self.failUnless(t.old == [5], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 5, t.hook_calls) + a = t.x # now the append(7) is noticed + self.failUnless(t.old == [5,6], t.old) + self.failUnless(t.new == [5,6,7], t.new) + self.failUnless(t.hook_calls == 6, t.hook_calls) + + +suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests) + diff --git a/libbe/rcs.py b/libbe/rcs.py index e69de29..294b8e0 100644 --- a/libbe/rcs.py +++ b/libbe/rcs.py @@ -0,0 +1,876 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Alexander Belchenko +# Ben Finney +# Chris Ball +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from subprocess import Popen, PIPE +import codecs +import os +import os.path +import re +from socket import gethostname +import shutil +import sys +import tempfile +import unittest +import doctest + +from utility import Dir, search_parent_directories + + +def _get_matching_rcs(matchfn): + """Return the first module for which matchfn(RCS_instance) is true""" + import arch + import bzr + import darcs + import git + import hg + for module in [arch, bzr, darcs, git, hg]: + rcs = module.new() + if matchfn(rcs) == True: + return rcs + del(rcs) + return RCS() + +def rcs_by_name(rcs_name): + """Return the module for the RCS with the given name""" + return _get_matching_rcs(lambda rcs: rcs.name == rcs_name) + +def detect_rcs(dir): + """Return an RCS instance for the rcs being used in this directory""" + return _get_matching_rcs(lambda rcs: rcs.detect(dir)) + +def installed_rcs(): + """Return an instance of an installed RCS""" + return _get_matching_rcs(lambda rcs: rcs.installed()) + + +class CommandError(Exception): + def __init__(self, command, status, err_str): + strerror = ["Command failed (%d):\n %s\n" % (status, err_str), + "while executing\n %s" % command] + Exception.__init__(self, "\n".join(strerror)) + self.command = command + self.status = status + self.err_str = err_str + +class SettingIDnotSupported(NotImplementedError): + pass + +class RCSnotRooted(Exception): + def __init__(self): + msg = "RCS not rooted" + Exception.__init__(self, msg) + +class PathNotInRoot(Exception): + def __init__(self, path, root): + msg = "Path '%s' not in root '%s'" % (path, root) + Exception.__init__(self, msg) + self.path = path + self.root = root + +class NoSuchFile(Exception): + def __init__(self, pathname, root="."): + path = os.path.abspath(os.path.join(root, pathname)) + Exception.__init__(self, "No such file: %s" % path) + +class EmptyCommit(Exception): + def __init__(self): + Exception.__init__(self, "No changes to commit") + + +def new(): + return RCS() + +class RCS(object): + """ + This class implements a 'no-rcs' interface. + + Support for other RCSs can be added by subclassing this class, and + overriding methods _rcs_*() with code appropriate for your RCS. + + The methods _u_*() are utility methods available to the _rcs_*() + methods. + """ + name = "None" + client = "" # command-line tool for _u_invoke_client + versioned = False + def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()): + self.paranoid = paranoid + self.verboseInvoke = False + self.rootdir = None + self._duplicateBasedir = None + self._duplicateDirname = None + self.encoding = encoding + def __del__(self): + self.cleanup() + + def _rcs_help(self): + """ + Return the command help string. + (Allows a simple test to see if the client is installed.) + """ + pass + def _rcs_detect(self, path=None): + """ + Detect whether a directory is revision controlled with this RCS. + """ + return True + def _rcs_root(self, path): + """ + Get the RCS root. This is the default working directory for + future invocations. You would normally set this to the root + directory for your RCS. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + if path == "": + path = os.path.abspath(".") + return path + def _rcs_init(self, path): + """ + Begin versioning the tree based at path. + """ + pass + def _rcs_cleanup(self): + """ + Remove any cruft that _rcs_init() created outside of the + versioned tree. + """ + pass + def _rcs_get_user_id(self): + """ + Get the RCS's suggested user id (e.g. "John Doe "). + If the RCS has not been configured with a username, return None. + """ + return None + def _rcs_set_user_id(self, value): + """ + Set the RCS's suggested user id (e.g "John Doe "). + This is run if the RCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + raise SettingIDnotSupported + def _rcs_add(self, path): + """ + Add the already created file at path to version control. + """ + pass + def _rcs_remove(self, path): + """ + Remove the file at path from version control. Optionally + remove the file from the filesystem as well. + """ + pass + def _rcs_update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + pass + def _rcs_get_file_contents(self, path, revision=None, binary=False): + """ + Get the file contents as they were in a given revision. + Revision==None specifies the current revision. + """ + assert revision == None, \ + "The %s RCS does not support revision specifiers" % self.name + if binary == False: + f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding) + else: + f = open(os.path.join(self.rootdir, path), "rb") + contents = f.read() + f.close() + return contents + def _rcs_duplicate_repo(self, directory, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + dir specifies a directory to create the duplicate in. + """ + shutil.copytree(self.rootdir, directory, True) + def _rcs_commit(self, commitfile, allow_empty=False): + """ + Commit the current working directory, using the contents of + commitfile as the comment. Return the name of the old + revision (or None if commits are not supported). + + If allow_empty == False, raise EmptyCommit if there are no + changes to commit. + """ + return None + def installed(self): + try: + self._rcs_help() + return True + except OSError, e: + if e.errno == errno.ENOENT: + return False + except CommandError: + return False + def detect(self, path="."): + """ + Detect whether a directory is revision controlled with this RCS. + """ + return self._rcs_detect(path) + def root(self, path): + """ + Set the root directory to the path's RCS root. This is the + default working directory for future invocations. + """ + self.rootdir = self._rcs_root(path) + def init(self, path): + """ + Begin versioning the tree based at path. + Also roots the rcs at path. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + self._rcs_init(path) + self.root(path) + def cleanup(self): + self._rcs_cleanup() + def get_user_id(self): + """ + Get the RCS's suggested user id (e.g. "John Doe "). + If the RCS has not been configured with a username, return the user's + id. You can override the automatic lookup procedure by setting the + RCS.user_id attribute to a string of your choice. + """ + if hasattr(self, "user_id"): + if self.user_id != None: + return self.user_id + id = self._rcs_get_user_id() + if id == None: + name = self._u_get_fallback_username() + email = self._u_get_fallback_email() + id = self._u_create_id(name, email) + print >> sys.stderr, "Guessing id '%s'" % id + try: + self.set_user_id(id) + except SettingIDnotSupported: + pass + return id + def set_user_id(self, value): + """ + Set the RCS's suggested user id (e.g "John Doe "). + This is run if the RCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + self._rcs_set_user_id(value) + def add(self, path): + """ + Add the already created file at path to version control. + """ + self._rcs_add(self._u_rel_path(path)) + def remove(self, path): + """ + Remove a file from both version control and the filesystem. + """ + self._rcs_remove(self._u_rel_path(path)) + if os.path.exists(path): + os.remove(path) + def recursive_remove(self, dirname): + """ + Remove a file/directory and all its decendents from both + version control and the filesystem. + """ + if not os.path.exists(dirname): + raise NoSuchFile(dirname) + for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): + filenames.extend(dirnames) + for path in filenames: + fullpath = os.path.join(dirpath, path) + if os.path.exists(fullpath) == False: + continue + self._rcs_remove(self._u_rel_path(fullpath)) + if os.path.exists(dirname): + shutil.rmtree(dirname) + def update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + self._rcs_update(self._u_rel_path(path)) + def get_file_contents(self, path, revision=None, allow_no_rcs=False, binary=False): + """ + Get the file as it was in a given revision. + Revision==None specifies the current revision. + """ + if not os.path.exists(path): + raise NoSuchFile(path) + if self._use_rcs(path, allow_no_rcs): + relpath = self._u_rel_path(path) + contents = self._rcs_get_file_contents(relpath,revision,binary=binary) + else: + f = codecs.open(path, "r", self.encoding) + contents = f.read() + f.close() + return contents + def set_file_contents(self, path, contents, allow_no_rcs=False, binary=False): + """ + Set the file contents under version control. + """ + add = not os.path.exists(path) + if binary == False: + f = codecs.open(path, "w", self.encoding) + else: + f = open(path, "wb") + f.write(contents) + f.close() + + if self._use_rcs(path, allow_no_rcs): + if add: + self.add(path) + else: + self.update(path) + def mkdir(self, path, allow_no_rcs=False, check_parents=True): + """ + Create (if neccessary) a directory at path under version + control. + """ + if check_parents == True: + parent = os.path.dirname(path) + if not os.path.exists(parent): # recurse through parents + self.mkdir(parent, allow_no_rcs, check_parents) + if not os.path.exists(path): + os.mkdir(path) + if self._use_rcs(path, allow_no_rcs): + self.add(path) + else: + assert os.path.isdir(path) + if self._use_rcs(path, allow_no_rcs): + #self.update(path)# Don't update directories. Changing files + pass # underneath them should be sufficient. + + def duplicate_repo(self, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + Return the path to the arbitrary directory at the base of the new repo. + """ + # Dirname in Baseir to protect against simlink attacks. + if self._duplicateBasedir == None: + self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs') + self._duplicateDirname = \ + os.path.join(self._duplicateBasedir, "duplicate") + self._rcs_duplicate_repo(directory=self._duplicateDirname, + revision=revision) + return self._duplicateDirname + def remove_duplicate_repo(self): + """ + Clean up a duplicate repo created with duplicate_repo(). + """ + if self._duplicateBasedir != None: + shutil.rmtree(self._duplicateBasedir) + self._duplicateBasedir = None + self._duplicateDirname = None + def commit(self, summary, body=None, allow_empty=False): + """ + Commit the current working directory, with a commit message + string summary and body. Return the name of the old revision + (or None if versioning is not supported). + + If allow_empty == False (the default), raise EmptyCommit if + there are no changes to commit. + """ + summary = summary.strip()+'\n' + if body is not None: + summary += '\n' + body.strip() + '\n' + descriptor, filename = tempfile.mkstemp() + revision = None + try: + temp_file = os.fdopen(descriptor, 'wb') + temp_file.write(summary) + temp_file.flush() + self.precommit() + revision = self._rcs_commit(filename, allow_empty=allow_empty) + temp_file.close() + self.postcommit() + finally: + os.remove(filename) + return revision + def precommit(self): + """ + Executed before all attempted commits. + """ + pass + def postcommit(self): + """ + Only executed after successful commits. + """ + pass + def _u_any_in_string(self, list, string): + """ + Return True if any of the strings in list are in string. + Otherwise return False. + """ + for list_string in list: + if list_string in string: + return True + return False + def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None): + """ + expect should be a tuple of allowed exit codes. cwd should be + the directory from which the command will be executed. + """ + if cwd == None: + cwd = self.rootdir + if self.verboseInvoke == True: + print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args)) + try : + if sys.platform != "win32": + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) + else: + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + shell=True, cwd=cwd) + except OSError, e : + raise CommandError(args, e.args[0], e) + output, error = q.communicate(input=stdin) + status = q.wait() + if self.verboseInvoke == True: + print >> sys.stderr, "%d\n%s%s" % (status, output, error) + if status not in expect: + raise CommandError(args, status, error) + return status, output, error + def _u_invoke_client(self, *args, **kwargs): + directory = kwargs.get('directory',None) + expect = kwargs.get('expect', (0,)) + stdin = kwargs.get('stdin', None) + cl_args = [self.client] + cl_args.extend(args) + return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory) + def _u_search_parent_directories(self, path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + return search_parent_directories(path, filename) + def _use_rcs(self, path, allow_no_rcs): + """ + Try and decide if _rcs_add/update/mkdir/etc calls will + succeed. Returns True is we think the rcs_call would + succeeed, and False otherwise. + """ + use_rcs = True + exception = None + if self.rootdir != None: + if self.path_in_root(path) == False: + use_rcs = False + exception = PathNotInRoot(path, self.rootdir) + else: + use_rcs = False + exception = RCSnotRooted + if use_rcs == False and allow_no_rcs==False: + raise exception + return use_rcs + def path_in_root(self, path, root=None): + """ + Return the relative path to path from root. + >>> rcs = new() + >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c") + True + >>> rcs.path_in_root("/a.b/.be", "/a.b/c") + False + """ + if root == None: + if self.rootdir == None: + raise RCSnotRooted + root = self.rootdir + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if not path.startswith(absRootSlashedDir): + return False + return True + def _u_rel_path(self, path, root=None): + """ + Return the relative path to path from root. + >>> rcs = new() + >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c") + '.be' + """ + if root == None: + if self.rootdir == None: + raise RCSnotRooted + root = self.rootdir + path = os.path.abspath(path) + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + if not path.startswith(absRootSlashedDir): + raise PathNotInRoot(path, absRootSlashedDir) + assert path != absRootSlashedDir, \ + "file %s == root directory %s" % (path, absRootSlashedDir) + relpath = path[len(absRootSlashedDir):] + return relpath + def _u_abspath(self, path, root=None): + """ + Return the absolute path from a path realtive to root. + >>> rcs = new() + >>> rcs._u_abspath(".be", "/a.b/c") + '/a.b/c/.be' + """ + if root == None: + assert self.rootdir != None, "RCS not rooted" + root = self.rootdir + return os.path.abspath(os.path.join(root, path)) + def _u_create_id(self, name, email=None): + """ + >>> rcs = new() + >>> rcs._u_create_id("John Doe", "jdoe@example.com") + 'John Doe ' + >>> rcs._u_create_id("John Doe") + 'John Doe' + """ + assert len(name) > 0 + if email == None or len(email) == 0: + return name + else: + return "%s <%s>" % (name, email) + def _u_parse_id(self, value): + """ + >>> rcs = new() + >>> rcs._u_parse_id("John Doe ") + ('John Doe', 'jdoe@example.com') + >>> rcs._u_parse_id("John Doe") + ('John Doe', None) + >>> try: + ... rcs._u_parse_id("John Doe ") + ... except AssertionError: + ... print "Invalid match" + Invalid match + """ + emailexp = re.compile("(.*) <([^>]*)>(.*)") + match = emailexp.search(value) + if match == None: + email = None + name = value + else: + assert len(match.groups()) == 3 + assert match.groups()[2] == "", match.groups() + email = match.groups()[1] + name = match.groups()[0] + assert name != None + assert len(name) > 0 + return (name, email) + def _u_get_fallback_username(self): + name = None + for envariable in ["LOGNAME", "USERNAME"]: + if os.environ.has_key(envariable): + name = os.environ[envariable] + break + assert name != None + return name + def _u_get_fallback_email(self): + hostname = gethostname() + name = self._u_get_fallback_username() + return "%s@%s" % (name, hostname) + def _u_parse_commitfile(self, commitfile): + """ + Split the commitfile created in self.commit() back into + summary and header lines. + """ + f = codecs.open(commitfile, "r", self.encoding) + summary = f.readline() + body = f.read() + body.lstrip('\n') + if len(body) == 0: + body = None + f.close() + return (summary, body) + + +def setup_rcs_test_fixtures(testcase): + """Set up test fixtures for RCS test case.""" + testcase.rcs = testcase.Class() + testcase.dir = Dir() + testcase.dirname = testcase.dir.path + + rcs_not_supporting_uninitialized_user_id = [] + rcs_not_supporting_set_user_id = ["None", "hg"] + testcase.rcs_supports_uninitialized_user_id = ( + testcase.rcs.name not in rcs_not_supporting_uninitialized_user_id) + testcase.rcs_supports_set_user_id = ( + testcase.rcs.name not in rcs_not_supporting_set_user_id) + + if not testcase.rcs.installed(): + testcase.fail( + "%(name)s RCS not found" % vars(testcase.Class)) + + if testcase.Class.name != "None": + testcase.failIf( + testcase.rcs.detect(testcase.dirname), + "Detected %(name)s RCS before initialising" + % vars(testcase.Class)) + + testcase.rcs.init(testcase.dirname) + + +class RCSTestCase(unittest.TestCase): + """Test cases for base RCS class.""" + + Class = RCS + + def __init__(self, *args, **kwargs): + super(RCSTestCase, self).__init__(*args, **kwargs) + self.dirname = None + + def setUp(self): + super(RCSTestCase, self).setUp() + setup_rcs_test_fixtures(self) + + def tearDown(self): + del(self.rcs) + super(RCSTestCase, self).tearDown() + + def full_path(self, rel_path): + return os.path.join(self.dirname, rel_path) + + +class RCS_init_TestCase(RCSTestCase): + """Test cases for RCS.init method.""" + + def test_detect_should_succeed_after_init(self): + """Should detect RCS in directory after initialization.""" + self.failUnless( + self.rcs.detect(self.dirname), + "Did not detect %(name)s RCS after initialising" + % vars(self.Class)) + + def test_rcs_rootdir_in_specified_root_path(self): + """RCS root directory should be in specified root path.""" + rp = os.path.realpath(self.rcs.rootdir) + dp = os.path.realpath(self.dirname) + rcs_name = self.Class.name + self.failUnless( + dp == rp or rp == None, + "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars()) + + +class RCS_get_user_id_TestCase(RCSTestCase): + """Test cases for RCS.get_user_id method.""" + + def test_gets_existing_user_id(self): + """Should get the existing user ID.""" + if not self.rcs_supports_uninitialized_user_id: + return + + user_id = self.rcs.get_user_id() + self.failUnless( + user_id is not None, + "unable to get a user id") + + +class RCS_set_user_id_TestCase(RCSTestCase): + """Test cases for RCS.set_user_id method.""" + + def setUp(self): + super(RCS_set_user_id_TestCase, self).setUp() + + if self.rcs_supports_uninitialized_user_id: + self.prev_user_id = self.rcs.get_user_id() + else: + self.prev_user_id = "Uninitialized identity " + + if self.rcs_supports_set_user_id: + self.test_new_user_id = "John Doe " + self.rcs.set_user_id(self.test_new_user_id) + + def tearDown(self): + if self.rcs_supports_set_user_id: + self.rcs.set_user_id(self.prev_user_id) + super(RCS_set_user_id_TestCase, self).tearDown() + + def test_raises_error_in_unsupported_vcs(self): + """Should raise an error in a VCS that doesn't support it.""" + if self.rcs_supports_set_user_id: + return + self.assertRaises( + SettingIDnotSupported, + self.rcs.set_user_id, "foo") + + def test_updates_user_id_in_supporting_rcs(self): + """Should update the user ID in an RCS that supports it.""" + if not self.rcs_supports_set_user_id: + return + user_id = self.rcs.get_user_id() + self.failUnlessEqual( + self.test_new_user_id, user_id, + "user id not set correctly (expected %s, got %s)" + % (self.test_new_user_id, user_id)) + + +def setup_rcs_revision_test_fixtures(testcase): + """Set up revision test fixtures for RCS test case.""" + testcase.test_dirs = ['a', 'a/b', 'c'] + for path in testcase.test_dirs: + testcase.rcs.mkdir(testcase.full_path(path)) + + testcase.test_files = ['a/text', 'a/b/text'] + + testcase.test_contents = { + 'rev_1': "Lorem ipsum", + 'uncommitted': "dolor sit amet", + } + + +class RCS_mkdir_TestCase(RCSTestCase): + """Test cases for RCS.mkdir method.""" + + def setUp(self): + super(RCS_mkdir_TestCase, self).setUp() + setup_rcs_revision_test_fixtures(self) + + def tearDown(self): + for path in reversed(sorted(self.test_dirs)): + self.rcs.recursive_remove(self.full_path(path)) + super(RCS_mkdir_TestCase, self).tearDown() + + def test_mkdir_creates_directory(self): + """Should create specified directory in filesystem.""" + for path in self.test_dirs: + full_path = self.full_path(path) + self.failUnless( + os.path.exists(full_path), + "path %(full_path)s does not exist" % vars()) + + +class RCS_commit_TestCase(RCSTestCase): + """Test cases for RCS.commit method.""" + + def setUp(self): + super(RCS_commit_TestCase, self).setUp() + setup_rcs_revision_test_fixtures(self) + + def tearDown(self): + for path in reversed(sorted(self.test_dirs)): + self.rcs.recursive_remove(self.full_path(path)) + super(RCS_commit_TestCase, self).tearDown() + + def test_file_contents_as_specified(self): + """Should set file contents as specified.""" + test_contents = self.test_contents['rev_1'] + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents(full_path, test_contents) + current_contents = self.rcs.get_file_contents(full_path) + self.failUnlessEqual(test_contents, current_contents) + + def test_file_contents_as_committed(self): + """Should have file contents as specified after commit.""" + test_contents = self.test_contents['rev_1'] + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents(full_path, test_contents) + revision = self.rcs.commit("Initial file contents.") + current_contents = self.rcs.get_file_contents(full_path) + self.failUnlessEqual(test_contents, current_contents) + + def test_file_contents_as_set_when_uncommitted(self): + """Should set file contents as specified after commit.""" + if not self.rcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.rcs.commit("Initial file contents.") + self.rcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + current_contents = self.rcs.get_file_contents(full_path) + self.failUnlessEqual( + self.test_contents['uncommitted'], current_contents) + + def test_revision_file_contents_as_committed(self): + """Should get file contents as committed to specified revision.""" + if not self.rcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.rcs.commit("Initial file contents.") + self.rcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + committed_contents = self.rcs.get_file_contents( + full_path, revision) + self.failUnlessEqual( + self.test_contents['rev_1'], committed_contents) + + +class RCS_duplicate_repo_TestCase(RCSTestCase): + """Test cases for RCS.duplicate_repo method.""" + + def setUp(self): + super(RCS_duplicate_repo_TestCase, self).setUp() + setup_rcs_revision_test_fixtures(self) + + def tearDown(self): + self.rcs.remove_duplicate_repo() + for path in reversed(sorted(self.test_dirs)): + self.rcs.recursive_remove(self.full_path(path)) + super(RCS_duplicate_repo_TestCase, self).tearDown() + + def test_revision_file_contents_as_committed(self): + """Should match file contents as committed to specified revision.""" + if not self.rcs.versioned: + return + for path in self.test_files: + full_path = self.full_path(path) + self.rcs.set_file_contents( + full_path, self.test_contents['rev_1']) + revision = self.rcs.commit("Commit current status") + self.rcs.set_file_contents( + full_path, self.test_contents['uncommitted']) + dup_repo_path = self.rcs.duplicate_repo(revision) + dup_file_path = os.path.join(dup_repo_path, path) + dup_file_contents = file(dup_file_path, 'rb').read() + self.failUnlessEqual( + self.test_contents['rev_1'], dup_file_contents) + self.rcs.remove_duplicate_repo() + + +def make_rcs_testcase_subclasses(rcs_class, namespace): + """Make RCSTestCase subclasses for rcs_class in the namespace.""" + rcs_testcase_classes = [ + c for c in ( + ob for ob in globals().values() if isinstance(ob, type)) + if issubclass(c, RCSTestCase)] + + for base_class in rcs_testcase_classes: + testcase_class_name = rcs_class.__name__ + base_class.__name__ + testcase_class_bases = (base_class,) + testcase_class_dict = dict(base_class.__dict__) + testcase_class_dict['Class'] = rcs_class + testcase_class = type( + testcase_class_name, testcase_class_bases, testcase_class_dict) + setattr(namespace, testcase_class_name, testcase_class) + + +unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/settings_object.py b/libbe/settings_object.py index e69de29..dde247f 100644 --- a/libbe/settings_object.py +++ b/libbe/settings_object.py @@ -0,0 +1,417 @@ +# Bugs Everywhere - a distributed bugtracker +# Copyright (C) 2008-2009 W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +This module provides a base class implementing settings-dict based +property storage useful for BE objects with saved properties +(e.g. BugDir, Bug, Comment). For example usage, consider the +unittests at the end of the module. +""" + +import doctest +import unittest + +from properties import Property, doc_property, local_property, \ + defaulting_property, checked_property, fn_checked_property, \ + cached_property, primed_property, change_hook_property, \ + settings_property + + +class _Token (object): + """ + `Control' value class for properties. We want values that only + mean something to the settings_object module. + """ + pass + +class UNPRIMED (_Token): + "Property has not been primed." + pass + +class EMPTY (_Token): + """ + Property has been primed but has no user-set value, so use + default/generator value. + """ + pass + + +def prop_save_settings(self, old, new): + """ + The default action undertaken when a property changes. + """ + if self.sync_with_disk==True: + self.save_settings() + +def prop_load_settings(self): + """ + The default action undertaken when an UNPRIMED property is accessed. + """ + if self.sync_with_disk==True and self._settings_loaded==False: + self.load_settings() + else: + self._setup_saved_settings(flag_as_loaded=False) + +# Some name-mangling routines for pretty printing setting names +def setting_name_to_attr_name(self, name): + """ + Convert keys to the .settings dict into their associated + SavedSettingsObject attribute names. + >>> print setting_name_to_attr_name(None,"User-id") + user_id + """ + return name.lower().replace('-', '_') + +def attr_name_to_setting_name(self, name): + """ + The inverse of setting_name_to_attr_name. + >>> print attr_name_to_setting_name(None, "user_id") + User-id + """ + return name.capitalize().replace('_', '-') + + +def versioned_property(name, doc, + default=None, generator=None, + change_hook=prop_save_settings, + mutable=False, + primer=prop_load_settings, + allowed=None, check_fn=None, + settings_properties=[], + required_saved_properties=[], + require_save=False): + """ + Combine the common decorators in a single function. + + Use zero or one (but not both) of default or generator, since a + working default will keep the generator from functioning. Use the + default if you know what you want the default value to be at + 'coding time'. Use the generator if you can write a function to + determine a valid default at run time. If both default and + generator are None, then the property will be a defaulting + property which defaults to None. + + allowed and check_fn have a similar relationship, although you can + use both of these if you want. allowed compares the proposed + value against a list determined at 'coding time' and check_fn + allows more flexible comparisons to take place at run time. + + Set require_save to True if you want to save the default/generated + value for a property, to protect against future changes. E.g., we + currently expect all comments to be 'text/plain' but in the future + we may want to default to 'text/html'. If we don't want the old + comments to be interpreted as 'text/html', we would require that + the content type be saved. + + change_hook, primer, settings_properties, and + required_saved_properties are only options to get their defaults + into our local scope. Don't mess with them. + + Set mutable=True if: + * default is a mutable + * your generator function may return mutables + * you set change_hook and might have mutable property values + See the docstrings in libbe.properties for details on how each of + these cases are handled. + """ + settings_properties.append(name) + if require_save == True: + required_saved_properties.append(name) + def decorator(funcs): + fulldoc = doc + if default != None or generator == None: + defaulting = defaulting_property(default=default, null=EMPTY, + mutable_default=mutable) + fulldoc += "\n\nThis property defaults to %s." % default + if generator != None: + cached = cached_property(generator=generator, initVal=EMPTY, + mutable=mutable) + fulldoc += "\n\nThis property is generated with %s." % generator + if check_fn != None: + fn_checked = fn_checked_property(value_allowed_fn=check_fn) + fulldoc += "\n\nThis property is checked with %s." % check_fn + if allowed != None: + checked = checked_property(allowed=allowed) + fulldoc += "\n\nThe allowed values for this property are: %s." \ + % (', '.join(allowed)) + hooked = change_hook_property(hook=change_hook, mutable=mutable) + primed = primed_property(primer=primer, initVal=UNPRIMED) + settings = settings_property(name=name, null=UNPRIMED) + docp = doc_property(doc=fulldoc) + deco = hooked(primed(settings(docp(funcs)))) + if default != None or generator == None: + deco = defaulting(deco) + if generator != None: + deco = cached(deco) + if check_fn != None: + deco = fn_checked(deco) + if allowed != None: + deco = checked(deco) + return Property(deco) + return decorator + +class SavedSettingsObject(object): + + # Keep a list of properties that may be stored in the .settings dict. + #settings_properties = [] + + # A list of properties that we save to disk, even if they were + # never set (in which case we save the default value). This + # protects against future changes in default values. + #required_saved_properties = [] + + _setting_name_to_attr_name = setting_name_to_attr_name + _attr_name_to_setting_name = attr_name_to_setting_name + + def __init__(self): + self._settings_loaded = False + self.sync_with_disk = False + self.settings = {} + + def load_settings(self): + """Load the settings from disk.""" + # Override. Must call ._setup_saved_settings() after loading. + self.settings = {} + self._setup_saved_settings() + + def _setup_saved_settings(self, flag_as_loaded=True): + """ + To be run after setting self.settings up from disk. Marks all + settings as primed. + """ + for property in self.settings_properties: + if property not in self.settings: + self.settings[property] = EMPTY + elif self.settings[property] == UNPRIMED: + self.settings[property] = EMPTY + if flag_as_loaded == True: + self._settings_loaded = True + + def save_settings(self): + """Load the settings from disk.""" + # Override. Should save the dict output of ._get_saved_settings() + settings = self._get_saved_settings() + pass # write settings to disk.... + + def _get_saved_settings(self): + settings = {} + for k,v in self.settings.items(): + if v != None and v != EMPTY: + settings[k] = v + for k in self.required_saved_properties: + settings[k] = getattr(self, self._setting_name_to_attr_name(k)) + return settings + + def clear_cached_setting(self, setting=None): + "If setting=None, clear *all* cached settings" + if setting != None: + if hasattr(self, "_%s_cached_value" % setting): + delattr(self, "_%s_cached_value" % setting) + else: + for setting in settings_properties: + self.clear_cached_setting(setting) + + +class SavedSettingsObjectTests(unittest.TestCase): + def testSimpleProperty(self): + """Testing a minimal versioned property""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="Content-type", + doc="A test property", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + # access missing setting + self.failUnless(t._settings_loaded == False, t._settings_loaded) + self.failUnless(len(t.settings) == 0, len(t.settings)) + self.failUnless(t.content_type == None, t.content_type) + # accessing t.content_type triggers the priming, which runs + # t._setup_saved_settings, which fills out t.settings with + # EMPTY data. t._settings_loaded is still false though, since + # the default priming does not do any of the `official' loading + # that occurs in t.load_settings. + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t._settings_loaded == False, t._settings_loaded) + # load settings creates an EMPTY value in the settings array + t.load_settings() + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + # now we set a value + t.content_type = 5 + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + self.failUnless(t.content_type == 5, t.content_type) + self.failUnless(t.settings["Content-type"] == 5, + t.settings["Content-type"]) + # now we set another value + t.content_type = "text/plain" + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings["Content-type"] == "text/plain", + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"}, + t._get_saved_settings()) + # now we clear to the post-primed value + t.content_type = EMPTY + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t.content_type == None, t.content_type) + self.failUnless(len(t.settings) == 1, len(t.settings)) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + def testDefaultingProperty(self): + """Testing a defaulting versioned property""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._settings_loaded == False, t._settings_loaded) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t._settings_loaded == False, t._settings_loaded) + t.load_settings() + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.content_type == "text/plain", t.content_type) + self.failUnless(t.settings["Content-type"] == EMPTY, + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings() == {}, t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t.content_type == "text/html", + t.content_type) + self.failUnless(t.settings["Content-type"] == "text/html", + t.settings["Content-type"]) + self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"}, + t._get_saved_settings()) + def testRequiredDefaultingProperty(self): + """Testing a required defaulting versioned property""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + require_save=True) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"}, + t._get_saved_settings()) + def testClassVersionedPropertyDefinition(self): + """Testing a class-specific _versioned property decorator""" + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + def _versioned_property(settings_properties=settings_properties, + required_saved_properties=required_saved_properties, + **kwargs): + if "settings_properties" not in kwargs: + kwargs["settings_properties"] = settings_properties + if "required_saved_properties" not in kwargs: + kwargs["required_saved_properties"]=required_saved_properties + return versioned_property(**kwargs) + @_versioned_property(name="Content-type", + doc="A test property", + default="text/plain", + require_save=True) + def content_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"}, + t._get_saved_settings()) + t.content_type = "text/html" + self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"}, + t._get_saved_settings()) + def testMutableChangeHookedProperty(self): + """Testing a mutable change-hooked property""" + SAVES = [] + def prop_log_save_settings(self, old, new, saves=SAVES): + saves.append("'%s' -> '%s'" % (str(old), str(new))) + prop_save_settings(self, old, new) + class Test(SavedSettingsObject): + settings_properties = [] + required_saved_properties = [] + @versioned_property(name="List-type", + doc="A test property", + mutable=True, + change_hook=prop_log_save_settings, + settings_properties=settings_properties, + required_saved_properties=required_saved_properties) + def list_type(): return {} + def __init__(self): + SavedSettingsObject.__init__(self) + t = Test() + self.failUnless(t._settings_loaded == False, t._settings_loaded) + t.load_settings() + self.failUnless(SAVES == [], SAVES) + self.failUnless(t._settings_loaded == True, t._settings_loaded) + self.failUnless(t.list_type == None, t.list_type) + self.failUnless(SAVES == [ + "'None' -> ''" + ], SAVES) + self.failUnless(t.settings["List-type"]==EMPTY,t.settings["List-type"]) + t.list_type = [] + self.failUnless(t.settings["List-type"] == [], t.settings["List-type"]) + self.failUnless(SAVES == [ + "'None' -> ''", + "'' -> '[]'" + ], SAVES) + t.list_type.append(5) + self.failUnless(SAVES == [ + "'None' -> ''", + "'' -> '[]'", + ], SAVES) + self.failUnless(t.settings["List-type"] == [5],t.settings["List-type"]) + self.failUnless(SAVES == [ # the append(5) has not yet been saved + "'None' -> ''", + "'' -> '[]'", + ], SAVES) + self.failUnless(t.list_type == [5], t.list_type) # <-get triggers saved + + self.failUnless(SAVES == [ # now the append(5) has been saved. + "'None' -> ''", + "'' -> '[]'", + "'[]' -> '[5]'" + ], SAVES) + +unitsuite=unittest.TestLoader().loadTestsFromTestCase(SavedSettingsObjectTests) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/tree.py b/libbe/tree.py index e69de29..45ae085 100644 --- a/libbe/tree.py +++ b/libbe/tree.py @@ -0,0 +1,179 @@ +# Bugs Everywhere, a distributed bugtracker +# Copyright (C) 2008-2009 W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import doctest + +class Tree(list): + """ + Construct + +-b---d-g + a-+ +-e + +-c-+-f-h-i + with + >>> i = Tree(); i.n = "i" + >>> h = Tree([i]); h.n = "h" + >>> f = Tree([h]); f.n = "f" + >>> e = Tree(); e.n = "e" + >>> c = Tree([f,e]); c.n = "c" + >>> g = Tree(); g.n = "g" + >>> d = Tree([g]); d.n = "d" + >>> b = Tree([d]); b.n = "b" + >>> a = Tree(); a.n = "a" + >>> a.append(c) + >>> a.append(b) + + >>> a.branch_len() + 5 + >>> a.sort(key=lambda node : -node.branch_len()) + >>> "".join([node.n for node in a.traverse()]) + 'acfhiebdg' + >>> a.sort(key=lambda node : node.branch_len()) + >>> "".join([node.n for node in a.traverse()]) + 'abdgcefhi' + >>> "".join([node.n for node in a.traverse(depth_first=False)]) + 'abcdefghi' + >>> for depth,node in a.thread(): + ... print "%*s" % (2*depth+1, node.n) + a + b + d + g + c + e + f + h + i + >>> for depth,node in a.thread(flatten=True): + ... print "%*s" % (2*depth+1, node.n) + a + b + d + g + c + e + f + h + i + >>> a.has_descendant(g) + True + >>> c.has_descendant(g) + False + >>> a.has_descendant(a) + False + >>> a.has_descendant(a, match_self=True) + True + """ + def __eq__(self, other): + return id(self) == id(other) + + def branch_len(self): + """ + Exhaustive search every time == SLOW. + + Use only on small trees, or reimplement by overriding + child-addition methods to allow accurate caching. + + For the tree + +-b---d-g + a-+ +-e + +-c-+-f-h-i + this method returns 5. + """ + if len(self) == 0: + return 1 + else: + return 1 + max([child.branch_len() for child in self]) + + def sort(self, *args, **kwargs): + """ + This method can be slow, e.g. on a branch_len() sort, since a + node at depth N from the root has it's branch_len() method + called N times. + """ + list.sort(self, *args, **kwargs) + for child in self: + child.sort(*args, **kwargs) + + def traverse(self, depth_first=True): + """ + Note: you might want to sort() your tree first. + """ + if depth_first == True: + yield self + for child in self: + for descendant in child.traverse(): + yield descendant + else: # breadth first, Wikipedia algorithm + # http://en.wikipedia.org/wiki/Breadth-first_search + queue = [self] + while len(queue) > 0: + node = queue.pop(0) + yield node + queue.extend(node) + + def thread(self, flatten=False): + """ + When flatten==False, the depth of any node is one greater than + the depth of its parent. That way the inheritance is + explicit, but you can end up with highly indented threads. + + When flatten==True, the depth of any node is only greater than + the depth of its parent when there is a branch, and the node + is not the last child. This can lead to ancestry ambiguity, + but keeps the total indentation down. E.g. + +-b +-b-c + a-+-c and a-+ + +-d-e-f +-d-e-f + would both produce (after sorting by branch_len()) + (0, a) + (1, b) + (1, c) + (0, d) + (0, e) + (0, f) + """ + stack = [] # ancestry of the current node + if flatten == True: + depthDict = {} + + for node in self.traverse(depth_first=True): + while len(stack) > 0 \ + and id(node) not in [id(c) for c in stack[-1]]: + stack.pop(-1) + if flatten == False: + depth = len(stack) + else: + if len(stack) == 0: + depth = 0 + else: + parent = stack[-1] + depth = depthDict[id(parent)] + if len(parent) > 1 and node != parent[-1]: + depth += 1 + depthDict[id(node)] = depth + yield (depth,node) + stack.append(node) + + def has_descendant(self, descendant, depth_first=True, match_self=False): + if descendant == self: + return match_self + for d in self.traverse(depth_first): + if descendant == d: + return True + return False + +suite = doctest.DocTestSuite() diff --git a/libbe/utility.py b/libbe/utility.py index e69de29..3df06b4 100644 --- a/libbe/utility.py +++ b/libbe/utility.py @@ -0,0 +1,129 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# W. Trevor King +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import calendar +import codecs +import os +import shutil +import tempfile +import time +import types +import doctest + +def search_parent_directories(path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + path = os.path.realpath(path) + assert os.path.exists(path) + old_path = None + while True: + check_path = os.path.join(path, filename) + if os.path.exists(check_path): + return check_path + if path == old_path: + return None + old_path = path + path = os.path.dirname(path) + +class Dir (object): + "A temporary directory for testing use" + def __init__(self): + self.path = tempfile.mkdtemp(prefix="BEtest") + self.rmtree = shutil.rmtree # save local reference for __del__ + self.removed = False + def __del__(self): + self.cleanup() + def cleanup(self): + if self.removed == False: + self.rmtree(self.path) + self.removed = True + def __call__(self): + return self.path + +RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000" + + +def time_to_str(time_val): + """Convert a time value into an RFC 2822-formatted string. This format + lacks sub-second data. + >>> time_to_str(0) + 'Thu, 01 Jan 1970 00:00:00 +0000' + """ + return time.strftime(RFC_2822_TIME_FMT, time.gmtime(time_val)) + +def str_to_time(str_time): + """Convert an RFC 2822-fomatted string into a time value. + >>> str_to_time("Thu, 01 Jan 1970 00:00:00 +0000") + 0 + >>> q = time.time() + >>> str_to_time(time_to_str(q)) == int(q) + True + >>> str_to_time("Thu, 01 Jan 1970 00:00:00 -1000") + 36000 + """ + timezone_str = str_time[-5:] + if timezone_str != "+0000": + str_time = str_time.replace(timezone_str, "+0000") + time_val = calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT)) + timesign = -int(timezone_str[0]+"1") # "+" -> time_val ahead of GMT + timezone_tuple = time.strptime(timezone_str[1:], "%H%M") + timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60 + return time_val + timesign*timezone + +def handy_time(time_val): + return time.strftime("%a, %d %b %Y %H:%M", time.localtime(time_val)) + +def time_to_gmtime(str_time): + """Convert an RFC 2822-fomatted string to a GMT string. + >>> time_to_gmtime("Thu, 01 Jan 1970 00:00:00 -1000") + 'Thu, 01 Jan 1970 10:00:00 +0000' + """ + time_val = str_to_time(str_time) + return time_to_str(time_val) + +def iterable_full_of_strings(value, alternative=None): + """ + Require an iterable full of strings. + >>> iterable_full_of_strings([]) + True + >>> iterable_full_of_strings(["abc", "def", u"hij"]) + True + >>> iterable_full_of_strings(["abc", None, u"hij"]) + False + >>> iterable_full_of_strings(None, alternative=None) + True + """ + if value == alternative: + return True + elif not hasattr(value, "__iter__"): + return False + for x in value: + if type(x) not in types.StringTypes: + return False + return True + +suite = doctest.DocTestSuite() -- cgit