path: root/libbe
diff options
Diffstat (limited to 'libbe')
21 files changed, 5736 insertions, 0 deletions
diff --git a/libbe/arch.py b/libbe/arch.py
index e69de29..2f45aa9 100644
--- a/libbe/arch.py
+++ b/libbe/arch.py
@@ -0,0 +1,295 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <ben+python@benfinney.id.au>
+# James Rowe <jnrowe@ukfsn.org>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import codecs
+import os
+import re
+import shutil
+import sys
+import time
+import unittest
+import doctest
+import config
+from beuuid import uuid_gen
+import rcs
+from rcs import RCS
+client = config.get_val("arch_client", default=DEFAULT_CLIENT)
+def new():
+ return Arch()
+class Arch(RCS):
+ name = "Arch"
+ client = client
+ versioned = True
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser("~/.arch-params")
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, "{arch}") != None :
+ config.set_val("arch_client", client)
+ return True
+ return False
+ def _rcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+ def _create_archive(self, path):
+ # Create a new archive
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = self._u_parse_id(id)
+ if email == None:
+ email = "%s@example.com" % name
+ trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8])
+ self._archive_name = "%s--%s" % (email, trailer)
+ self._archive_dir = "/tmp/%s" % trailer
+ self._tmp_archive = True
+ self._u_invoke_client("make-archive", self._archive_name,
+ self._archive_dir, directory=path)
+ def _invoke_client(self, *args, **kwargs):
+ """
+ Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
+ else:
+ tailargs = []
+ arglist = [command, "-A", self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ "=locations", self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+ def _create_project(self, path):
+ """
+ Create a temporary Arch project in the directory PATH. This
+ project will be removed by
+ __del__->cleanup->_rcs_cleanup->_remove_project
+ """
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = "bugs-everywhere"
+ branch = "mainline"
+ version = "0.1"
+ self._project_name = "%s--%s--%s" % (category, branch, version)
+ self._invoke_client("archive-setup", self._project_name,
+ directory=path)
+ self._tmp_project = True
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return "%s/%s" % (self._archive_name, self._project_name)
+ def _adjust_naming_conventions(self, path):
+ """
+ By default, Arch restricts source code filenames to
+ ^[_=a-zA-Z0-9].*$
+ See
+ http://regexps.srparish.net/tutorial-tla/naming-conventions.html
+ Since our bug directory '.be' doesn't satisfy these conventions,
+ we need to adjust them.
+ The conventions are specified in
+ project-root/{arch}/=tagging-method
+ """
+ tagpath = os.path.join(path, "{arch}", "=tagging-method")
+ lines_out = []
+ f = codecs.open(tagpath, "r", self.encoding)
+ for line in f:
+ if line.startswith("source "):
+ lines_out.append("source ^[._=a-zA-X0-9].*$\n")
+ else:
+ lines_out.append(line)
+ f.close()
+ f = codecs.open(tagpath, "w", self.encoding)
+ f.write("".join(lines_out))
+ f.close()
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-source.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html
+ self._invoke_client("init-tree", self._project_name,
+ directory=path)
+ self._adjust_naming_conventions(path)
+ self._invoke_client("import", "--summary", "Began versioning",
+ directory=path)
+ def _rcs_cleanup(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+ def _rcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
+ else:
+ dirname = path
+ status,output,error = self._u_invoke_client("tree-root", dirname)
+ root = output.rstrip('\n')
+ self._get_archive_project_name(root)
+ return root
+ def _get_archive_name(self, root):
+ status,output,error = self._u_invoke_client("archives")
+ lines = output.split('\n')
+ # e.g. output:
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
+ # /tmp/BEtestXXXXXX/rootdir
+ # (+ repeats)
+ for archive,location in zip(lines[::2], lines[1::2]):
+ if os.path.realpath(location) == os.path.realpath(root):
+ self._archive_name = archive
+ assert self._archive_name != None
+ def _get_archive_project_name(self, root):
+ # get project names
+ status,output,error = self._u_invoke_client("tree-version", directory=root)
+ # e.g output
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
+ archive_name,project_name = output.rstrip('\n').split('/')
+ self._archive_name = archive_name
+ self._project_name = project_name
+ def _rcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client('my-id', value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add-id", path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.rootdir)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client("inventory", "--source",
+ "--both", "--all", root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ f = codecs.open(inv_path, "a", self.encoding)
+ f.write(rule)
+ f.close()
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+ def _force_source(self, path):
+ rule = "source %s\n" % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
+ if os.path.realpath(path) not in self._list_added(self.rootdir):
+ raise CantAddFile(path)
+ def _rcs_remove(self, path):
+ if not '.arch-ids' in path:
+ self._u_invoke_client("delete-id", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ status,output,error = \
+ self._invoke_client("file-find", path, revision)
+ relpath = output.rstrip('\n')
+ abspath = os.path.join(self.rootdir, relpath)
+ f = codecs.open(abspath, "r", self.encoding)
+ contents = f.read()
+ f.close()
+ return contents
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ status,output,error = \
+ self._u_invoke_client("get", revision,directory)
+ def _rcs_commit(self, commitfile, allow_empty=False):
+ if allow_empty == False:
+ # arch applies empty commits without complaining, so check first
+ status,output,error = self._u_invoke_client("changes",expect=(0,1))
+ if status == 0:
+ raise rcs.EmptyCommit()
+ summary,body = self._u_parse_commitfile(commitfile)
+ args = ["commit", "--summary", summary]
+ if body != None:
+ args.extend(["--log-message",body])
+ status,output,error = self._u_invoke_client(*args)
+ revision = None
+ revline = re.compile("[*] committed (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
+class CantAddFile(Exception):
+ def __init__(self, file):
+ self.file = file
+ Exception.__init__(self, "Can't automatically add file %s" % file)
+rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__])
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/beuuid.py b/libbe/beuuid.py
index e69de29..bc47208 100644
--- a/libbe/beuuid.py
+++ b/libbe/beuuid.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+Backwards compatibility support for Python 2.4. Once people give up
+on 2.4 ;), the uuid call should be merged into bugdir.py
+import unittest
+ from uuid import uuid4 # Python >= 2.5
+ def uuid_gen():
+ id = uuid4()
+ idstr = id.urn
+ start = "urn:uuid:"
+ assert idstr.startswith(start)
+ return idstr[len(start):]
+except ImportError:
+ import os
+ import sys
+ from subprocess import Popen, PIPE
+ def uuid_gen():
+ # Shell-out to system uuidgen
+ args = ['uuidgen', 'r']
+ try:
+ if sys.platform != "win32":
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ else:
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ shell=True, cwd=cwd)
+ except OSError, e :
+ strerror = "%s\nwhile executing %s" % (e.args[1], args)
+ raise OSError, strerror
+ output, error = q.communicate()
+ status = q.wait()
+ if status != 0:
+ strerror = "%s\nwhile executing %s" % (status, args)
+ raise Exception, strerror
+ return output.rstrip('\n')
+class UUIDtestCase(unittest.TestCase):
+ def testUUID_gen(self):
+ id = uuid_gen()
+ self.failUnless(len(id) == 36, "invalid UUID '%s'" % id)
+suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase)
diff --git a/libbe/bug.py b/libbe/bug.py
index e69de29..c1e5481 100644
--- a/libbe/bug.py
+++ b/libbe/bug.py
@@ -0,0 +1,547 @@
+# Copyright (C) 2008-2009 Chris Ball <cjb@laptop.org>
+# Thomas Habets <thomas@habets.pp.se>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import os
+import os.path
+import errno
+import time
+import types
+import xml.sax.saxutils
+import doctest
+from beuuid import uuid_gen
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, cached_property, \
+ primed_property, change_hook_property, settings_property
+import settings_object
+import mapfile
+import comment
+import utility
+### Define and describe valid bug categories
+# Use a tuple of (category, description) tuples since we don't have
+# ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/
+# in order of increasing severity. (name, description) pairs
+severity_def = (
+ ("wishlist","A feature that could improve usefulness, but not a bug."),
+ ("minor","The standard bug level."),
+ ("serious","A bug that requires workarounds."),
+ ("critical","A bug that prevents some features from working at all."),
+ ("fatal","A bug that makes the package unusable."))
+# in order of increasing resolution
+# roughly following http://www.bugzilla.org/docs/3.2/en/html/lifecycle.html
+active_status_def = (
+ ("unconfirmed","A possible bug which lacks independent existance confirmation."),
+ ("open","A working bug that has not been assigned to a developer."),
+ ("assigned","A working bug that has been assigned to a developer."),
+ ("test","The code has been adjusted, but the fix is still being tested."))
+inactive_status_def = (
+ ("closed", "The bug is no longer relevant."),
+ ("fixed", "The bug should no longer occur."),
+ ("wontfix","It's not a bug, it's a feature."))
+### Convert the description tuples to more useful formats
+severity_values = ()
+severity_description = {}
+severity_index = {}
+def load_severities(severity_def):
+ global severity_values
+ global severity_description
+ global severity_index
+ if severity_def == None:
+ return
+ severity_values = tuple([val for val,description in severity_def])
+ severity_description = dict(severity_def)
+ severity_index = {}
+ for i,severity in enumerate(severity_values):
+ severity_index[severity] = i
+active_status_values = []
+inactive_status_values = []
+status_values = []
+status_description = {}
+status_index = {}
+def load_status(active_status_def, inactive_status_def):
+ global active_status_values
+ global inactive_status_values
+ global status_values
+ global status_description
+ global status_index
+ if active_status_def == None:
+ active_status_def = globals()["active_status_def"]
+ if inactive_status_def == None:
+ inactive_status_def = globals()["inactive_status_def"]
+ active_status_values = tuple([val for val,description in active_status_def])
+ inactive_status_values = tuple([val for val,description in inactive_status_def])
+ status_values = active_status_values + inactive_status_values
+ status_description = dict(tuple(active_status_def) + tuple(inactive_status_def))
+ status_index = {}
+ for i,status in enumerate(status_values):
+ status_index[status] = i
+load_status(active_status_def, inactive_status_def)
+class Bug(settings_object.SavedSettingsObject):
+ """
+ >>> b = Bug()
+ >>> print b.status
+ open
+ >>> print b.severity
+ minor
+ There are two formats for time, int and string. Setting either
+ one will adjust the other appropriately. The string form is the
+ one stored in the bug's settings file on disk.
+ >>> print type(b.time)
+ <type 'int'>
+ >>> print type(b.time_string)
+ <type 'str'>
+ >>> b.time = 0
+ >>> print b.time_string
+ Thu, 01 Jan 1970 00:00:00 +0000
+ >>> b.time_string="Thu, 01 Jan 1970 00:01:00 +0000"
+ >>> b.time
+ 60
+ >>> print b.settings["time"]
+ Thu, 01 Jan 1970 00:01:00 +0000
+ """
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+ @_versioned_property(name="severity",
+ doc="A measure of the bug's importance",
+ default="minor",
+ check_fn=lambda s: s in severity_values,
+ require_save=True)
+ def severity(): return {}
+ @_versioned_property(name="status",
+ doc="The bug's current status",
+ default="open",
+ check_fn=lambda s: s in status_values,
+ require_save=True)
+ def status(): return {}
+ @property
+ def active(self):
+ return self.status in active_status_values
+ @_versioned_property(name="target",
+ doc="The deadline for fixing this bug")
+ def target(): return {}
+ @_versioned_property(name="creator",
+ doc="The user who entered the bug into the system")
+ def creator(): return {}
+ @_versioned_property(name="reporter",
+ doc="The user who reported the bug")
+ def reporter(): return {}
+ @_versioned_property(name="assigned",
+ doc="The developer in charge of the bug")
+ def assigned(): return {}
+ @_versioned_property(name="time",
+ doc="An RFC 2822 timestamp for bug creation")
+ def time_string(): return {}
+ def _get_time(self):
+ if self.time_string == None:
+ return None
+ return utility.str_to_time(self.time_string)
+ def _set_time(self, value):
+ self.time_string = utility.time_to_str(value)
+ time = property(fget=_get_time,
+ fset=_set_time,
+ doc="An integer version of .time_string")
+ def _extra_strings_check_fn(value):
+ return utility.iterable_full_of_strings(value, \
+ alternative=settings_object.EMPTY)
+ def _extra_strings_change_hook(self, old, new):
+ self.extra_strings.sort() # to make merging easier
+ self._prop_save_settings(old, new)
+ @_versioned_property(name="extra_strings",
+ doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
+ default=[],
+ check_fn=_extra_strings_check_fn,
+ change_hook=_extra_strings_change_hook,
+ mutable=True)
+ def extra_strings(): return {}
+ @_versioned_property(name="summary",
+ doc="A one-line bug description")
+ def summary(): return {}
+ def _get_comment_root(self, load_full=False):
+ if self.sync_with_disk:
+ return comment.loadComments(self, load_full=load_full)
+ else:
+ return comment.Comment(self, uuid=comment.INVALID_UUID)
+ @Property
+ @cached_property(generator=_get_comment_root)
+ @local_property("comment_root")
+ @doc_property(doc="The trunk of the comment tree")
+ def comment_root(): return {}
+ def _get_rcs(self):
+ if hasattr(self.bugdir, "rcs"):
+ return self.bugdir.rcs
+ @Property
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
+ def __init__(self, bugdir=None, uuid=None, from_disk=False,
+ load_comments=False, summary=None):
+ settings_object.SavedSettingsObject.__init__(self)
+ self.bugdir = bugdir
+ self.uuid = uuid
+ if from_disk == True:
+ self.sync_with_disk = True
+ else:
+ self.sync_with_disk = False
+ if uuid == None:
+ self.uuid = uuid_gen()
+ self.time = int(time.time()) # only save to second precision
+ if self.rcs != None:
+ self.creator = self.rcs.get_user_id()
+ self.summary = summary
+ def __repr__(self):
+ return "Bug(uuid=%r)" % self.uuid
+ def set_sync_with_disk(self, value):
+ self.sync_with_disk = value
+ for comment in self.comments():
+ comment.set_sync_with_disk(value)
+ def _setting_attr_string(self, setting):
+ value = getattr(self, setting)
+ if value == None:
+ return ""
+ return str(value)
+ def xml(self, show_comments=False):
+ if self.bugdir == None:
+ shortname = self.uuid
+ else:
+ shortname = self.bugdir.bug_shortname(self)
+ if self.time == None:
+ timestring = ""
+ else:
+ timestring = utility.time_to_str(self.time)
+ info = [("uuid", self.uuid),
+ ("short-name", shortname),
+ ("severity", self.severity),
+ ("status", self.status),
+ ("assigned", self.assigned),
+ ("target", self.target),
+ ("reporter", self.reporter),
+ ("creator", self.creator),
+ ("created", timestring),
+ ("summary", self.summary)]
+ ret = '<bug>\n'
+ for (k,v) in info:
+ if v is not None:
+ ret += ' <%s>%s</%s>\n' % (k,xml.sax.saxutils.escape(v),k)
+ for estr in self.extra_strings:
+ ret += ' <extra-string>%s</extra-string>\n' % estr
+ if show_comments == True:
+ comout = self.comment_root.xml_thread(auto_name_map=True,
+ bug_shortname=shortname)
+ if len(comout) > 0:
+ ret += comout+'\n'
+ ret += '</bug>'
+ return ret
+ def string(self, shortlist=False, show_comments=False):
+ if self.bugdir == None:
+ shortname = self.uuid
+ else:
+ shortname = self.bugdir.bug_shortname(self)
+ if shortlist == False:
+ if self.time == None:
+ timestring = ""
+ else:
+ htime = utility.handy_time(self.time)
+ timestring = "%s (%s)" % (htime, self.time_string)
+ info = [("ID", self.uuid),
+ ("Short name", shortname),
+ ("Severity", self.severity),
+ ("Status", self.status),
+ ("Assigned", self._setting_attr_string("assigned")),
+ ("Target", self._setting_attr_string("target")),
+ ("Reporter", self._setting_attr_string("reporter")),
+ ("Creator", self._setting_attr_string("creator")),
+ ("Created", timestring)]
+ longest_key_len = max([len(k) for k,v in info])
+ infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info]
+ bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n')
+ else:
+ statuschar = self.status[0]
+ severitychar = self.severity[0]
+ chars = "%c%c" % (statuschar, severitychar)
+ bugout = "%s:%s: %s" % (shortname,chars,self.summary.rstrip('\n'))
+ if show_comments == True:
+ # take advantage of the string_thread(auto_name_map=True)
+ # SIDE-EFFECT of sorting by comment time.
+ comout = self.comment_root.string_thread(flatten=False,
+ auto_name_map=True,
+ bug_shortname=shortname)
+ output = bugout + '\n' + comout.rstrip('\n')
+ else :
+ output = bugout
+ return output
+ def __str__(self):
+ return self.string(shortlist=True)
+ def __cmp__(self, other):
+ return cmp_full(self, other)
+ def get_path(self, name=None):
+ my_dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid)
+ if name is None:
+ return my_dir
+ assert name in ["values", "comments"]
+ return os.path.join(my_dir, name)
+ def load_settings(self):
+ self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ self._setup_saved_settings()
+ def load_comments(self, load_full=True):
+ if load_full == True:
+ # Force a complete load of the whole comment tree
+ self.comment_root = self._get_comment_root(load_full=True)
+ else:
+ # Setup for fresh lazy-loading. Clear _comment_root, so
+ # _get_comment_root returns a fresh version. Turn of
+ # syncing temporarily so we don't write our blank comment
+ # tree to disk.
+ self.sync_with_disk = False
+ self.comment_root = None
+ self.sync_with_disk = True
+ def save_settings(self):
+ assert self.summary != None, "Can't save blank bug"
+ self.rcs.mkdir(self.get_path())
+ path = self.get_path("values")
+ mapfile.map_save(self.rcs, path, self._get_saved_settings())
+ def save(self):
+ """
+ Save any loaded contents to disk. Because of lazy loading of
+ comments, this is actually not too inefficient.
+ However, if self.sync_with_disk = True, then any changes are
+ automatically written to disk as soon as they happen, so
+ calling this method will just waste time (unless something
+ else has been messing with your on-disk files).
+ """
+ self.save_settings()
+ if len(self.comment_root) > 0:
+ comment.saveComments(self)
+ def remove(self):
+ self.comment_root.remove()
+ path = self.get_path()
+ self.rcs.recursive_remove(path)
+ def comments(self):
+ for comment in self.comment_root.traverse():
+ yield comment
+ def new_comment(self, body=None):
+ comm = self.comment_root.new_reply(body=body)
+ return comm
+ def comment_from_shortname(self, shortname, *args, **kwargs):
+ return self.comment_root.comment_from_shortname(shortname,
+ *args, **kwargs)
+ def comment_from_uuid(self, uuid):
+ return self.comment_root.comment_from_uuid(uuid)
+ def comment_shortnames(self, shortname=None):
+ """
+ SIDE-EFFECT : Comment.comment_shortnames will sort the comment
+ tree by comment.time
+ """
+ for id, comment in self.comment_root.comment_shortnames(shortname):
+ yield (id, comment)
+# The general rule for bug sorting is that "more important" bugs are
+# less than "less important" bugs. This way sorting a list of bugs
+# will put the most important bugs first in the list. When relative
+# importance is unclear, the sorting follows some arbitrary convention
+# (i.e. dictionary order).
+def cmp_severity(bug_1, bug_2):
+ """
+ Compare the severity levels of two bugs, with more severe bugs
+ comparing as less.
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.severity = bugB.severity = "wishlist"
+ >>> cmp_severity(bugA, bugB) == 0
+ True
+ >>> bugB.severity = "minor"
+ >>> cmp_severity(bugA, bugB) > 0
+ True
+ >>> bugA.severity = "critical"
+ >>> cmp_severity(bugA, bugB) < 0
+ True
+ """
+ if not hasattr(bug_2, "severity") :
+ return 1
+ return -cmp(severity_index[bug_1.severity], severity_index[bug_2.severity])
+def cmp_status(bug_1, bug_2):
+ """
+ Compare the status levels of two bugs, with more 'open' bugs
+ comparing as less.
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.status = bugB.status = "open"
+ >>> cmp_status(bugA, bugB) == 0
+ True
+ >>> bugB.status = "closed"
+ >>> cmp_status(bugA, bugB) < 0
+ True
+ >>> bugA.status = "fixed"
+ >>> cmp_status(bugA, bugB) > 0
+ True
+ """
+ if not hasattr(bug_2, "status") :
+ return 1
+ val_2 = status_index[bug_2.status]
+ return cmp(status_index[bug_1.status], status_index[bug_2.status])
+def cmp_attr(bug_1, bug_2, attr, invert=False):
+ """
+ Compare a general attribute between two bugs using the conventional
+ comparison rule for that attribute type. If invert == True, sort
+ *against* that convention.
+ >>> attr="severity"
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.severity = "critical"
+ >>> bugB.severity = "wishlist"
+ >>> cmp_attr(bugA, bugB, attr) < 0
+ True
+ >>> cmp_attr(bugA, bugB, attr, invert=True) > 0
+ True
+ >>> bugB.severity = "critical"
+ >>> cmp_attr(bugA, bugB, attr) == 0
+ True
+ """
+ if not hasattr(bug_2, attr) :
+ return 1
+ val_1 = getattr(bug_1, attr)
+ val_2 = getattr(bug_2, attr)
+ if val_1 == None: val_1 = None
+ if val_2 == None: val_2 = None
+ if invert == True :
+ return -cmp(val_1, val_2)
+ else :
+ return cmp(val_1, val_2)
+# alphabetical rankings (a < z)
+cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator")
+cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned")
+# chronological rankings (newer < older)
+cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True)
+def cmp_comments(bug_1, bug_2):
+ """
+ Compare two bugs' comments lists. Doesn't load any new comments,
+ so you should call each bug's .load_comments() first if you want a
+ full comparison.
+ """
+ comms_1 = sorted(bug_1.comments(), key = lambda comm : comm.uuid)
+ comms_2 = sorted(bug_2.comments(), key = lambda comm : comm.uuid)
+ result = cmp(len(comms_1), len(comms_2))
+ if result != 0:
+ return result
+ for c_1,c_2 in zip(comms_1, comms_2):
+ result = cmp(c_1, c_2)
+ if result != 0:
+ return result
+ return 0
+ (cmp_status,cmp_severity,cmp_assigned,cmp_time,cmp_creator,cmp_comments)
+class BugCompoundComparator (object):
+ def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST):
+ self.cmp_list = cmp_list
+ def __call__(self, bug_1, bug_2):
+ for comparison in self.cmp_list :
+ val = comparison(bug_1, bug_2)
+ if val != 0 :
+ return val
+ return 0
+cmp_full = BugCompoundComparator()
+# define some bonus cmp_* functions
+def cmp_last_modified(bug_1, bug_2):
+ """
+ Like cmp_time(), but use most recent comment instead of bug
+ creation for the timestamp.
+ """
+ def last_modified(bug):
+ time = bug.time
+ for comment in bug.comment_root.traverse():
+ if comment.time > time:
+ time = comment.time
+ return time
+ val_1 = last_modified(bug_1)
+ val_2 = last_modified(bug_2)
+ return -cmp(val_1, val_2)
+suite = doctest.DocTestSuite()
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index e69de29..6e020ee 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -0,0 +1,676 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Alexander Belchenko <bialix@ukr.net>
+# Chris Ball <cjb@laptop.org>
+# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import os
+import os.path
+import errno
+import time
+import copy
+import unittest
+import doctest
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+import settings_object
+import mapfile
+import bug
+import rcs
+import encoding
+import utility
+class NoBugDir(Exception):
+ def __init__(self, path):
+ msg = "The directory \"%s\" has no bug directory." % path
+ Exception.__init__(self, msg)
+ self.path = path
+class NoRootEntry(Exception):
+ def __init__(self, path):
+ self.path = path
+ Exception.__init__(self, "Specified root does not exist: %s" % path)
+class AlreadyInitialized(Exception):
+ def __init__(self, path):
+ self.path = path
+ Exception.__init__(self,
+ "Specified root is already initialized: %s" % path)
+class MultipleBugMatches(ValueError):
+ def __init__(self, shortname, matches):
+ msg = ("More than one bug matches %s. "
+ "Please be more specific.\n%s" % (shortname, matches))
+ ValueError.__init__(self, msg)
+ self.shortname = shortname
+ self.matches = matches
+TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
+class BugDir (list, settings_object.SavedSettingsObject):
+ """
+ Sink to existing root
+ ======================
+ Consider the following usage case:
+ You have a bug directory rooted in
+ /path/to/source
+ by which I mean the '.be' directory is at
+ /path/to/source/.be
+ However, you're of in some subdirectory like
+ /path/to/source/GUI/testing
+ and you want to comment on a bug. Setting sink_to_root=True wen
+ you initialize your BugDir will cause it to search for the '.be'
+ file in the ancestors of the path you passed in as 'root'.
+ /path/to/source/GUI/testing/.be miss
+ /path/to/source/GUI/.be miss
+ /path/to/source/.be hit!
+ So it still roots itself appropriately without much work for you.
+ File-system access
+ ==================
+ BugDirs live completely in memory when .sync_with_disk is False.
+ This is the default configuration setup by BugDir(from_disk=False).
+ If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
+ any changes to the BugDir will be immediately written to disk.
+ If you want to change .sync_with_disk, we suggest you use
+ .set_sync_with_disk(), which propogates the new setting through to
+ all bugs/comments/etc. that have been loaded into memory. If
+ you've been living in memory and want to move to
+ .sync_with_disk==True, but you're not sure if anything has been
+ changed in memoryy, a call to save() is a safe move.
+ Regardless of .sync_with_disk, a call to .save() will write out
+ all the contents that the BugDir instance has loaded into memory.
+ If sync_with_disk has been True over the course of all interesting
+ changes, this .save() call will be a waste of time.
+ The BugDir will only load information from the file system when it
+ loads new bugs/comments that it doesn't already have in memory, or
+ when it explicitly asked to do so (e.g. .load() or
+ __init__(from_disk=True)).
+ Allow RCS initialization
+ ========================
+ This one is for testing purposes. Setting it to True allows the
+ BugDir to search for an installed RCS backend and initialize it in
+ the root directory. This is a convenience option for supporting
+ tests of versioning functionality (e.g. .duplicate_bugdir).
+ Disable encoding manipulation
+ =============================
+ This one is for testing purposed. You might have non-ASCII
+ Unicode in your bugs, comments, files, etc. BugDir instances try
+ and support your preferred encoding scheme (e.g. "utf-8") when
+ dealing with stream and file input/output. For stream output,
+ this involves replacing sys.stdout and sys.stderr
+ (libbe.encode.set_IO_stream_encodings). However this messes up
+ doctest's output catching. In order to support doctest tests
+ using BugDirs, set manipulate_encodings=False, and stick to ASCII
+ in your tests.
+ """
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+ @_versioned_property(name="target",
+ doc="The current project development target.")
+ def target(): return {}
+ def _guess_encoding(self):
+ return encoding.get_encoding()
+ def _check_encoding(value):
+ if value != None:
+ return encoding.known_encoding(value)
+ def _setup_encoding(self, new_encoding):
+ # change hook called before generator.
+ if new_encoding not in [None, settings_object.EMPTY]:
+ if self._manipulate_encodings == True:
+ encoding.set_IO_stream_encodings(new_encoding)
+ def _set_encoding(self, old_encoding, new_encoding):
+ self._setup_encoding(new_encoding)
+ self._prop_save_settings(old_encoding, new_encoding)
+ @_versioned_property(name="encoding",
+ doc="""The default input/output encoding to use (e.g. "utf-8").""",
+ change_hook=_set_encoding,
+ generator=_guess_encoding,
+ check_fn=_check_encoding)
+ def encoding(): return {}
+ def _setup_user_id(self, user_id):
+ self.rcs.user_id = user_id
+ def _guess_user_id(self):
+ return self.rcs.get_user_id()
+ def _set_user_id(self, old_user_id, new_user_id):
+ self._setup_user_id(new_user_id)
+ self._prop_save_settings(old_user_id, new_user_id)
+ @_versioned_property(name="user_id",
+ doc=
+"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
+that the Arch RCS backend *enforces* ids with this format.""",
+ change_hook=_set_user_id,
+ generator=_guess_user_id)
+ def user_id(): return {}
+ @_versioned_property(name="default_assignee",
+ doc=
+"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
+ def default_assignee(): return {}
+ @_versioned_property(name="rcs_name",
+ doc="""The name of the current RCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .rcs instead, and
+.rcs_name will be automatically adjusted.""",
+ default="None",
+ allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
+ def rcs_name(): return {}
+ def _get_rcs(self, rcs_name=None):
+ """Get and root a new revision control system"""
+ if rcs_name == None:
+ rcs_name = self.rcs_name
+ new_rcs = rcs.rcs_by_name(rcs_name)
+ self._change_rcs(None, new_rcs)
+ return new_rcs
+ def _change_rcs(self, old_rcs, new_rcs):
+ new_rcs.encoding = self.encoding
+ new_rcs.root(self.root)
+ self.rcs_name = new_rcs.name
+ @Property
+ @change_hook_property(hook=_change_rcs)
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
+ def _bug_map_gen(self):
+ map = {}
+ for bug in self:
+ map[bug.uuid] = bug
+ for uuid in self.list_uuids():
+ if uuid not in map:
+ map[uuid] = None
+ self._bug_map_value = map # ._bug_map_value used by @local_property
+ def _extra_strings_check_fn(value):
+ return utility.iterable_full_of_strings(value, \
+ alternative=settings_object.EMPTY)
+ def _extra_strings_change_hook(self, old, new):
+ self.extra_strings.sort() # to make merging easier
+ self._prop_save_settings(old, new)
+ @_versioned_property(name="extra_strings",
+ doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
+ default=[],
+ check_fn=_extra_strings_check_fn,
+ change_hook=_extra_strings_change_hook,
+ mutable=True)
+ def extra_strings(): return {}
+ @Property
+ @primed_property(primer=_bug_map_gen)
+ @local_property("bug_map")
+ @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
+ def _bug_map(): return {}
+ def _setup_severities(self, severities):
+ if severities not in [None, settings_object.EMPTY]:
+ bug.load_severities(severities)
+ def _set_severities(self, old_severities, new_severities):
+ self._setup_severities(new_severities)
+ self._prop_save_settings(old_severities, new_severities)
+ @_versioned_property(name="severities",
+ doc="The allowed bug severities and their descriptions.",
+ change_hook=_set_severities)
+ def severities(): return {}
+ def _setup_status(self, active_status, inactive_status):
+ bug.load_status(active_status, inactive_status)
+ def _set_active_status(self, old_active_status, new_active_status):
+ self._setup_status(new_active_status, self.inactive_status)
+ self._prop_save_settings(old_active_status, new_active_status)
+ @_versioned_property(name="active_status",
+ doc="The allowed active bug states and their descriptions.",
+ change_hook=_set_active_status)
+ def active_status(): return {}
+ def _set_inactive_status(self, old_inactive_status, new_inactive_status):
+ self._setup_status(self.active_status, new_inactive_status)
+ self._prop_save_settings(old_inactive_status, new_inactive_status)
+ @_versioned_property(name="inactive_status",
+ doc="The allowed inactive bug states and their descriptions.",
+ change_hook=_set_inactive_status)
+ def inactive_status(): return {}
+ def __init__(self, root=None, sink_to_existing_root=True,
+ assert_new_BugDir=False, allow_rcs_init=False,
+ manipulate_encodings=True,
+ from_disk=False, rcs=None):
+ list.__init__(self)
+ settings_object.SavedSettingsObject.__init__(self)
+ self._manipulate_encodings = manipulate_encodings
+ if root == None:
+ root = os.getcwd()
+ if sink_to_existing_root == True:
+ self.root = self._find_root(root)
+ else:
+ if not os.path.exists(root):
+ raise NoRootEntry(root)
+ self.root = root
+ # get a temporary rcs until we've loaded settings
+ self.sync_with_disk = False
+ self.rcs = self._guess_rcs()
+ if from_disk == True:
+ self.sync_with_disk = True
+ self.load()
+ else:
+ self.sync_with_disk = False
+ if assert_new_BugDir == True:
+ if os.path.exists(self.get_path()):
+ raise AlreadyInitialized, self.get_path()
+ if rcs == None:
+ rcs = self._guess_rcs(allow_rcs_init)
+ self.rcs = rcs
+ self._setup_user_id(self.user_id)
+ def set_sync_with_disk(self, value):
+ self.sync_with_disk = value
+ for bug in self:
+ bug.set_sync_with_disk(value)
+ def _find_root(self, path):
+ """
+ Search for an existing bug database dir and it's ancestors and
+ return a BugDir rooted there.
+ """
+ if not os.path.exists(path):
+ raise NoRootEntry(path)
+ versionfile=utility.search_parent_directories(path,
+ os.path.join(".be", "version"))
+ if versionfile != None:
+ beroot = os.path.dirname(versionfile)
+ root = os.path.dirname(beroot)
+ return root
+ else:
+ beroot = utility.search_parent_directories(path, ".be")
+ if beroot == None:
+ raise NoBugDir(path)
+ return beroot
+ def get_version(self, path=None, use_none_rcs=False):
+ if use_none_rcs == True:
+ RCS = rcs.rcs_by_name("None")
+ RCS.root(self.root)
+ RCS.encoding = encoding.get_encoding()
+ else:
+ RCS = self.rcs
+ if path == None:
+ path = self.get_path("version")
+ tree_version = RCS.get_file_contents(path)
+ return tree_version
+ def set_version(self):
+ self.rcs.mkdir(self.get_path())
+ self.rcs.set_file_contents(self.get_path("version"),
+ def get_path(self, *args):
+ my_dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return my_dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(my_dir, *args)
+ def _guess_rcs(self, allow_rcs_init=False):
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_rcs = rcs.detect_rcs(deepdir)
+ install = False
+ if new_rcs.name == "None":
+ if allow_rcs_init == True:
+ new_rcs = rcs.installed_rcs()
+ new_rcs.init(self.root)
+ return new_rcs
+ def load(self):
+ version = self.get_version(use_none_rcs=True)
+ if version != TREE_VERSION_STRING:
+ raise NotImplementedError, \
+ "BugDir cannot handle version '%s' yet." % version
+ else:
+ if not os.path.exists(self.get_path()):
+ raise NoBugDir(self.get_path())
+ self.load_settings()
+ self.rcs = rcs.rcs_by_name(self.rcs_name)
+ self._setup_user_id(self.user_id)
+ def load_all_bugs(self):
+ "Warning: this could take a while."
+ self._clear_bugs()
+ for uuid in self.list_uuids():
+ self._load_bug(uuid)
+ def save(self):
+ """
+ Save any loaded contents to disk. Because of lazy loading of
+ bugs and comments, this is actually not too inefficient.
+ However, if self.sync_with_disk = True, then any changes are
+ automatically written to disk as soon as they happen, so
+ calling this method will just waste time (unless something
+ else has been messing with your on-disk files).
+ """
+ self.set_version()
+ self.save_settings()
+ for bug in self:
+ bug.save()
+ def load_settings(self):
+ self.settings = self._get_settings(self.get_path("settings"))
+ self._setup_saved_settings()
+ self._setup_user_id(self.user_id)
+ self._setup_encoding(self.encoding)
+ self._setup_severities(self.severities)
+ self._setup_status(self.active_status, self.inactive_status)
+ def _get_settings(self, settings_path):
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
+ # allow_no_rcs=True should only be for the special case of
+ # configuring duplicate bugdir settings
+ try:
+ settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
+ except rcs.NoSuchFile:
+ settings = {"rcs_name": "None"}
+ return settings
+ def save_settings(self):
+ settings = self._get_saved_settings()
+ self._save_settings(self.get_path("settings"), settings)
+ def _save_settings(self, settings_path, settings):
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
+ # allow_no_rcs=True should only be for the special case of
+ # configuring duplicate bugdir settings
+ self.rcs.mkdir(self.get_path(), allow_no_rcs)
+ mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
+ def duplicate_bugdir(self, revision):
+ duplicate_path = self.rcs.duplicate_repo(revision)
+ # setup revision RCS as None, since the duplicate may not be
+ # initialized for versioning
+ duplicate_settings_path = os.path.join(duplicate_path,
+ ".be", "settings")
+ duplicate_settings = self._get_settings(duplicate_settings_path)
+ if "rcs_name" in duplicate_settings:
+ duplicate_settings["rcs_name"] = "None"
+ duplicate_settings["user_id"] = self.user_id
+ if "disabled" in bug.status_values:
+ # Hack to support old versions of BE bugs
+ duplicate_settings["inactive_status"] = self.inactive_status
+ self._save_settings(duplicate_settings_path, duplicate_settings)
+ return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
+ def remove_duplicate_bugdir(self):
+ self.rcs.remove_duplicate_repo()
+ def list_uuids(self):
+ uuids = []
+ if os.path.exists(self.get_path()):
+ # list the uuids on disk
+ for uuid in os.listdir(self.get_path("bugs")):
+ if not (uuid.startswith('.')):
+ uuids.append(uuid)
+ yield uuid
+ # and the ones that are still just in memory
+ for bug in self:
+ if bug.uuid not in uuids:
+ uuids.append(bug.uuid)
+ yield bug.uuid
+ def _clear_bugs(self):
+ while len(self) > 0:
+ self.pop()
+ self._bug_map_gen()
+ def _load_bug(self, uuid):
+ bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+ def new_bug(self, uuid=None, summary=None):
+ bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
+ bg.set_sync_with_disk(self.sync_with_disk)
+ if bg.sync_with_disk == True:
+ bg.save()
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+ def remove_bug(self, bug):
+ self.remove(bug)
+ bug.remove()
+ def bug_shortname(self, bug):
+ """
+ Generate short names from uuids. Picks the minimum number of
+ characters (>=3) from the beginning of the uuid such that the
+ short names are unique.
+ Obviously, as the number of bugs in the database grows, these
+ short names will cease to be unique. The complete uuid should be
+ used for long term reference.
+ """
+ chars = 3
+ for uuid in self._bug_map.keys():
+ if bug.uuid == uuid:
+ continue
+ while (bug.uuid[:chars] == uuid[:chars]):
+ chars+=1
+ return bug.uuid[:chars]
+ def bug_from_shortname(self, shortname):
+ """
+ >>> bd = simple_bug_dir()
+ >>> bug_a = bd.bug_from_shortname('a')
+ >>> print type(bug_a)
+ <class 'libbe.bug.Bug'>
+ >>> print bug_a
+ a:om: Bug A
+ """
+ matches = []
+ self._bug_map_gen()
+ for uuid in self._bug_map.keys():
+ if uuid.startswith(shortname):
+ matches.append(uuid)
+ if len(matches) > 1:
+ raise MultipleBugMatches(shortname, matches)
+ if len(matches) == 1:
+ return self.bug_from_uuid(matches[0])
+ raise KeyError("No bug matches %s" % shortname)
+ def bug_from_uuid(self, uuid):
+ if not self.has_bug(uuid):
+ raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \
+ % (uuid, self._bug_map, self.root))
+ if self._bug_map[uuid] == None:
+ self._load_bug(uuid)
+ return self._bug_map[uuid]
+ def has_bug(self, bug_uuid):
+ if bug_uuid not in self._bug_map:
+ self._bug_map_gen()
+ if bug_uuid not in self._bug_map:
+ return False
+ return True
+def simple_bug_dir():
+ """
+ For testing
+ >>> bugdir = simple_bug_dir()
+ >>> ls = list(bugdir.list_uuids())
+ >>> ls.sort()
+ >>> print ls
+ ['a', 'b']
+ """
+ dir = utility.Dir()
+ assert os.path.exists(dir.path)
+ bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True,
+ manipulate_encodings=False)
+ bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
+ bug_a = bugdir.new_bug("a", summary="Bug A")
+ bug_a.creator = "John Doe <jdoe@example.com>"
+ bug_a.time = 0
+ bug_b = bugdir.new_bug("b", summary="Bug B")
+ bug_b.creator = "Jane Doe <jdoe@example.com>"
+ bug_b.time = 0
+ bug_b.status = "closed"
+ bugdir.save()
+ return bugdir
+class BugDirTestCase(unittest.TestCase):
+ def __init__(self, *args, **kwargs):
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ def setUp(self):
+ self.dir = utility.Dir()
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_rcs_init=True)
+ self.rcs = self.bugdir.rcs
+ def tearDown(self):
+ self.rcs.cleanup()
+ self.dir.cleanup()
+ def fullPath(self, path):
+ return os.path.join(self.dir.path, path)
+ def assertPathExists(self, path):
+ fullpath = self.fullPath(path)
+ self.failUnless(os.path.exists(fullpath)==True,
+ "path %s does not exist" % fullpath)
+ self.assertRaises(AlreadyInitialized, BugDir,
+ self.dir.path, assertNewBugDir=True)
+ def versionTest(self):
+ if self.rcs.versioned == False:
+ return
+ original = self.bugdir.rcs.commit("Began versioning")
+ bugA = self.bugdir.bug_from_uuid("a")
+ bugA.status = "fixed"
+ self.bugdir.save()
+ new = self.rcs.commit("Fixed bug a")
+ dupdir = self.bugdir.duplicate_bugdir(original)
+ self.failUnless(dupdir.root != self.bugdir.root,
+ "%s, %s" % (dupdir.root, self.bugdir.root))
+ bugAorig = dupdir.bug_from_uuid("a")
+ self.failUnless(bugA != bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ bugAorig.status = "fixed"
+ self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.status, bugAorig.status))
+ self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.severity, bugAorig.severity))
+ self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.assigned, bugAorig.assigned))
+ self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.time, bugAorig.time))
+ self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.creator, bugAorig.creator))
+ self.failUnless(bugA == bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ self.bugdir.remove_duplicate_bugdir()
+ self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root))
+ def testRun(self):
+ self.bugdir.new_bug(uuid="a", summary="Ant")
+ self.bugdir.new_bug(uuid="b", summary="Cockroach")
+ self.bugdir.new_bug(uuid="c", summary="Praying mantis")
+ length = len(self.bugdir)
+ self.failUnless(length == 3, "%d != 3 bugs" % length)
+ uuids = list(self.bugdir.list_uuids())
+ self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
+ self.failUnless(uuids == ["a","b","c"], str(uuids))
+ bugA = self.bugdir.bug_from_uuid("a")
+ bugAprime = self.bugdir.bug_from_shortname("a")
+ self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
+ self.bugdir.save()
+ self.versionTest()
+ def testComments(self, sync_with_disk=False):
+ if sync_with_disk == True:
+ self.bugdir.set_sync_with_disk(True)
+ self.bugdir.new_bug(uuid="a", summary="Ant")
+ bug = self.bugdir.bug_from_uuid("a")
+ comm = bug.comment_root
+ rep = comm.new_reply("Ants are small.")
+ rep.new_reply("And they have six legs.")
+ if sync_with_disk == False:
+ self.bugdir.save()
+ self.bugdir._clear_bugs()
+ bug = self.bugdir.bug_from_uuid("a")
+ bug.load_comments()
+ self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
+ for index,comment in enumerate(bug.comments()):
+ if index == 0:
+ repLoaded = comment
+ self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
+ self.failUnless(comment.sync_with_disk == True,
+ comment.sync_with_disk)
+ #load_settings()
+ self.failUnless(comment.content_type == "text/plain",
+ comment.content_type)
+ self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
+ repLoaded.settings)
+ self.failUnless(repLoaded.body == "Ants are small.",
+ repLoaded.body)
+ elif index == 1:
+ self.failUnless(comment.in_reply_to == repLoaded.uuid,
+ repLoaded.uuid)
+ self.failUnless(comment.body == "And they have six legs.",
+ comment.body)
+ else:
+ self.failIf(True, "Invalid comment: %d\n%s" % (index, comment))
+ def testSyncedComments(self):
+ self.testComments(sync_with_disk=True)
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
+suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()])
diff --git a/libbe/bzr.py b/libbe/bzr.py
index e69de29..d7cd1e5 100644
--- a/libbe/bzr.py
+++ b/libbe/bzr.py
@@ -0,0 +1,101 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <ben+python@benfinney.id.au>
+# Marien Zwart <marienz@gentoo.org>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import os
+import re
+import sys
+import unittest
+import doctest
+import rcs
+from rcs import RCS
+def new():
+ return Bzr()
+class Bzr(RCS):
+ name = "bzr"
+ client = "bzr"
+ versioned = True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".bzr") != None :
+ return True
+ return False
+ def _rcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ status,output,error = self._u_invoke_client("root", path)
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("whoami")
+ return output.rstrip('\n')
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client("whoami", value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ # --force to also remove unversioned files.
+ self._u_invoke_client("remove", "--force", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("branch", "--revision", revision,
+ ".", directory)
+ def _rcs_commit(self, commitfile, allow_empty=False):
+ args = ["commit", "--file", commitfile]
+ if allow_empty == True:
+ args.append("--unchanged")
+ status,output,error = self._u_invoke_client(*args)
+ else:
+ kwargs = {"expect":(0,3)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ if status != 0:
+ strings = ["ERROR: no changes to commit.", # bzr 1.3.1
+ "ERROR: No changes to commit."] # bzr 1.15.1
+ if self._u_any_in_string(strings, error) == True:
+ raise rcs.EmptyCommit()
+ else:
+ raise rcs.CommandError(args, status, error)
+ revision = None
+ revline = re.compile("Committed revision (.*)[.]")
+ match = revline.search(error)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__])
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py
index e69de29..853a75a 100644
--- a/libbe/cmdutil.py
+++ b/libbe/cmdutil.py
@@ -0,0 +1,218 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import glob
+import optparse
+import os
+from textwrap import TextWrapper
+from StringIO import StringIO
+import sys
+import doctest
+import bugdir
+import plugin
+import encoding
+class UserError(Exception):
+ def __init__(self, msg):
+ Exception.__init__(self, msg)
+class UnknownCommand(UserError):
+ def __init__(self, cmd):
+ Exception.__init__(self, "Unknown command '%s'" % cmd)
+ self.cmd = cmd
+class UsageError(Exception):
+ pass
+class GetHelp(Exception):
+ pass
+class GetCompletions(Exception):
+ def __init__(self, completions=[]):
+ msg = "Get allowed completions"
+ Exception.__init__(self, msg)
+ self.completions = completions
+def iter_commands():
+ for name, module in plugin.iter_plugins("becommands"):
+ yield name.replace("_", "-"), module
+def get_command(command_name):
+ """Retrieves the module for a user command
+ >>> try:
+ ... get_command("asdf")
+ ... except UnknownCommand, e:
+ ... print e
+ Unknown command 'asdf'
+ >>> repr(get_command("list")).startswith("<module 'becommands.list' from ")
+ True
+ """
+ cmd = plugin.get_plugin("becommands", command_name.replace("-", "_"))
+ if cmd is None:
+ raise UnknownCommand(command_name)
+ return cmd
+def execute(cmd, args):
+ enc = encoding.get_encoding()
+ cmd = get_command(cmd)
+ ret = cmd.execute([a.decode(enc) for a in args])
+ if ret == None:
+ ret = 0
+ return ret
+def help(cmd=None, parser=None):
+ if cmd != None:
+ return get_command(cmd).help()
+ else:
+ cmdlist = []
+ for name, module in iter_commands():
+ cmdlist.append((name, module.__desc__))
+ longest_cmd_len = max([len(name) for name,desc in cmdlist])
+ ret = ["Bugs Everywhere - Distributed bug tracking",
+ "", "Supported commands"]
+ for name, desc in cmdlist:
+ numExtraSpaces = longest_cmd_len-len(name)
+ ret.append("be %s%*s %s" % (name, numExtraSpaces, "", desc))
+ ret.extend(["", "Run", " be help [command]", "for more information."])
+ longhelp = "\n".join(ret)
+ if parser == None:
+ return longhelp
+ return parser.help_str() + "\n" + longhelp
+def completions(cmd):
+ parser = get_command(cmd).get_parser()
+ longopts = []
+ for opt in parser.option_list:
+ longopts.append(opt.get_opt_string())
+ return longopts
+def raise_get_help(option, opt, value, parser):
+ raise GetHelp
+def raise_get_completions(option, opt, value, parser):
+ print "got completion arg"
+ if hasattr(parser, "command") and parser.command == "be":
+ comps = []
+ for command, module in iter_commands():
+ comps.append(command)
+ for opt in parser.option_list:
+ comps.append(opt.get_opt_string())
+ raise GetCompletions(comps)
+ raise GetCompletions(completions(sys.argv[1]))
+class CmdOptionParser(optparse.OptionParser):
+ def __init__(self, usage):
+ optparse.OptionParser.__init__(self, usage)
+ self.disable_interspersed_args()
+ self.remove_option("-h")
+ self.add_option("-h", "--help", action="callback",
+ callback=raise_get_help, help="Print a help message")
+ self.add_option("--complete", action="callback",
+ callback=raise_get_completions,
+ help="Print a list of available completions")
+ def error(self, message):
+ raise UsageError(message)
+ def iter_options(self):
+ return iter_combine([self._short_opt.iterkeys(),
+ self._long_opt.iterkeys()])
+ def help_str(self):
+ f = StringIO()
+ self.print_help(f)
+ return f.getvalue()
+def option_value_pairs(options, parser):
+ """
+ Iterate through OptionParser (option, value) pairs.
+ """
+ for option in [o.dest for o in parser.option_list if o.dest != None]:
+ value = getattr(options, option)
+ yield (option, value)
+def default_complete(options, args, parser, bugid_args={}):
+ """
+ A dud complete implementation for becommands so that the
+ --complete argument doesn't cause any problems. Use this
+ until you've set up a command-specific complete function.
+ bugid_args is an optional dict where the keys are positional
+ arguments taking bug shortnames and the values are functions for
+ filtering, since that's a common enough operation.
+ e.g. for "be open [options] BUGID"
+ bugid_args = {0: lambda bug : bug.active == False}
+ A positional argument of -1 specifies all remaining arguments
+ (e.g in the case of "be show BUGID BUGID ...").
+ """
+ for option,value in option_value_pairs(options, parser):
+ if value == "--complete":
+ raise GetCompletions()
+ if len(bugid_args.keys()) > 0:
+ max_pos_arg = max(bugid_args.keys())
+ else:
+ max_pos_arg = -1
+ for pos,value in enumerate(args):
+ if value == "--complete":
+ filter = None
+ if pos in bugid_args:
+ filter = bugid_args[pos]
+ if pos > max_pos_arg and -1 in bugid_args:
+ filter = bugid_args[-1]
+ if filter != None:
+ bugshortnames = []
+ try:
+ bd = bugdir.BugDir(from_disk=True,
+ manipulate_encodings=False)
+ bd.load_all_bugs()
+ bugs = [bug for bug in bd if filter(bug) == True]
+ bugshortnames = [bd.bug_shortname(bug) for bug in bugs]
+ except bugdir.NoBugDir:
+ pass
+ raise GetCompletions(bugshortnames)
+ raise GetCompletions()
+def complete_path(path):
+ """List possible path completions for path."""
+ comps = glob.glob(path+"*") + glob.glob(path+"/*")
+ if len(comps) == 1 and os.path.isdir(comps[0]):
+ comps.extend(glob.glob(comps[0]+"/*"))
+ return comps
+def underlined(instring):
+ """Produces a version of a string that is underlined with '='
+ >>> underlined("Underlined String")
+ 'Underlined String\\n================='
+ """
+ return "%s\n%s" % (instring, "="*len(instring))
+def _test():
+ import doctest
+ import sys
+ doctest.testmod()
+if __name__ == "__main__":
+ _test()
+suite = doctest.DocTestSuite()
diff --git a/libbe/comment.py b/libbe/comment.py
index e69de29..3249e8b 100644
--- a/libbe/comment.py
+++ b/libbe/comment.py
@@ -0,0 +1,662 @@
+# Bugs Everywhere, a distributed bugtracker
+# Copyright (C) 2008-2009 Chris Ball <cjb@laptop.org>
+# Thomas Habets <thomas@habets.pp.se>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import base64
+import os
+import os.path
+import sys
+import time
+import types
+try: # import core module, Python >= 2.5
+ from xml.etree import ElementTree
+except ImportError: # look for non-core module
+ from elementtree import ElementTree
+import xml.sax.saxutils
+import doctest
+from beuuid import uuid_gen
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, cached_property, \
+ primed_property, change_hook_property, settings_property
+import settings_object
+import mapfile
+from tree import Tree
+import utility
+class InvalidShortname(KeyError):
+ def __init__(self, shortname, shortnames):
+ msg = "Invalid shortname %s\n%s" % (shortname, shortnames)
+ KeyError.__init__(self, msg)
+ self.shortname = shortname
+ self.shortnames = shortnames
+class InvalidXML(ValueError):
+ def __init__(self, element, comment):
+ msg = "Invalid comment xml: %s\n %s\n" \
+ % (comment, ElementTree.tostring(element))
+ ValueError.__init__(self, msg)
+ self.element = element
+ self.comment = comment
+class MissingReference(ValueError):
+ def __init__(self, comment):
+ msg = "Missing reference to %s" % (comment.in_reply_to)
+ ValueError.__init__(self, msg)
+ self.reference = comment.in_reply_to
+ self.comment = comment
+INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!"
+def list_to_root(comments, bug, root=None,
+ ignore_missing_references=False):
+ """
+ Convert a raw list of comments to single root comment. We use a
+ dummy root comment by default, because there can be several
+ comment threads rooted on the same parent bug. To simplify
+ comment interaction, we condense these threads into a single
+ thread with a Comment dummy root. Can also be used to append
+ a list of subcomments to a non-dummy root comment, so long as
+ all the new comments are descendants of the root comment.
+ No Comment method should use the dummy comment.
+ """
+ root_comments = []
+ uuid_map = {}
+ for comment in comments:
+ assert comment.uuid != None
+ uuid_map[comment.uuid] = comment
+ for comment in comments:
+ if comment.alt_id != None and comment.alt_id not in uuid_map:
+ uuid_map[comment.alt_id] = comment
+ if root == None:
+ root = Comment(bug, uuid=INVALID_UUID)
+ else:
+ uuid_map[root.uuid] = root
+ for comm in comments:
+ if comm.in_reply_to == INVALID_UUID:
+ comm.in_reply_to = None
+ rep = comm.in_reply_to
+ if rep == None or rep == bug.uuid:
+ root_comments.append(comm)
+ else:
+ parentUUID = comm.in_reply_to
+ try:
+ parent = uuid_map[parentUUID]
+ parent.add_reply(comm)
+ except KeyError, e:
+ if ignore_missing_references == True:
+ print >> sys.stderr, \
+ "Ignoring missing reference to %s" % parentUUID
+ comm.in_reply_to = None
+ root_comments.append(comm)
+ else:
+ raise MissingReference(comm)
+ root.extend(root_comments)
+ return root
+def loadComments(bug, load_full=False):
+ """
+ Set load_full=True when you want to load the comment completely
+ from disk *now*, rather than waiting and lazy loading as required.
+ """
+ path = bug.get_path("comments")
+ if not os.path.isdir(path):
+ return Comment(bug, uuid=INVALID_UUID)
+ comments = []
+ for uuid in os.listdir(path):
+ if uuid.startswith('.'):
+ continue
+ comm = Comment(bug, uuid, from_disk=True)
+ comm.set_sync_with_disk(bug.sync_with_disk)
+ if load_full == True:
+ comm.load_settings()
+ dummy = comm.body # force the body to load
+ comments.append(comm)
+ return list_to_root(comments, bug)
+def saveComments(bug):
+ for comment in bug.comment_root.traverse():
+ comment.save()
+class Comment(Tree, settings_object.SavedSettingsObject):
+ """
+ >>> c = Comment()
+ >>> c.uuid != None
+ True
+ >>> c.uuid = "some-UUID"
+ >>> print c.content_type
+ text/plain
+ """
+ settings_properties = []
+ required_saved_properties = []
+ _prop_save_settings = settings_object.prop_save_settings
+ _prop_load_settings = settings_object.prop_load_settings
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return settings_object.versioned_property(**kwargs)
+ @_versioned_property(name="Alt-id",
+ doc="Alternate ID for linking imported comments. Internally comments are linked (via In-reply-to) to the parent's UUID. However, these UUIDs are generated internally, so Alt-id is provided as a user-controlled linking target.")
+ def alt_id(): return {}
+ @_versioned_property(name="From",
+ doc="The author of the comment")
+ def From(): return {}
+ @_versioned_property(name="In-reply-to",
+ doc="UUID for parent comment or bug")
+ def in_reply_to(): return {}
+ @_versioned_property(name="Content-type",
+ doc="Mime type for comment body",
+ default="text/plain",
+ require_save=True)
+ def content_type(): return {}
+ @_versioned_property(name="Date",
+ doc="An RFC 2822 timestamp for comment creation")
+ def time_string(): return {}
+ def _get_time(self):
+ if self.time_string == None:
+ return None
+ return utility.str_to_time(self.time_string)
+ def _set_time(self, value):
+ self.time_string = utility.time_to_str(value)
+ time = property(fget=_get_time,
+ fset=_set_time,
+ doc="An integer version of .time_string")
+ def _get_comment_body(self):
+ if self.rcs != None and self.sync_with_disk == True:
+ import rcs
+ binary = not self.content_type.startswith("text/")
+ return self.rcs.get_file_contents(self.get_path("body"), binary=binary)
+ def _set_comment_body(self, old=None, new=None, force=False):
+ if (self.rcs != None and self.sync_with_disk == True) or force==True:
+ assert new != None, "Can't save empty comment"
+ binary = not self.content_type.startswith("text/")
+ self.rcs.set_file_contents(self.get_path("body"), new, binary=binary)
+ @Property
+ @change_hook_property(hook=_set_comment_body)
+ @cached_property(generator=_get_comment_body)
+ @local_property("body")
+ @doc_property(doc="The meat of the comment")
+ def body(): return {}
+ def _get_rcs(self):
+ if hasattr(self.bug, "rcs"):
+ return self.bug.rcs
+ @Property
+ @cached_property(generator=_get_rcs)
+ @local_property("rcs")
+ @doc_property(doc="A revision control system instance.")
+ def rcs(): return {}
+ def _extra_strings_check_fn(value):
+ return utility.iterable_full_of_strings(value, \
+ alternative=settings_object.EMPTY)
+ def _extra_strings_change_hook(self, old, new):
+ self.extra_strings.sort() # to make merging easier
+ self._prop_save_settings(old, new)
+ @_versioned_property(name="extra_strings",
+ doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
+ default=[],
+ check_fn=_extra_strings_check_fn,
+ change_hook=_extra_strings_change_hook,
+ mutable=True)
+ def extra_strings(): return {}
+ def __init__(self, bug=None, uuid=None, from_disk=False,
+ in_reply_to=None, body=None):
+ """
+ Set from_disk=True to load an old comment.
+ Set from_disk=False to create a new comment.
+ The uuid option is required when from_disk==True.
+ The in_reply_to and body options are only used if
+ from_disk==False (the default). When from_disk==True, they are
+ loaded from the bug database.
+ in_reply_to should be the uuid string of the parent comment.
+ """
+ Tree.__init__(self)
+ settings_object.SavedSettingsObject.__init__(self)
+ self.bug = bug
+ self.uuid = uuid
+ if from_disk == True:
+ self.sync_with_disk = True
+ else:
+ self.sync_with_disk = False
+ if uuid == None:
+ self.uuid = uuid_gen()
+ self.time = int(time.time()) # only save to second precision
+ if self.rcs != None:
+ self.From = self.rcs.get_user_id()
+ self.in_reply_to = in_reply_to
+ self.body = body
+ def set_sync_with_disk(self, value):
+ self.sync_with_disk = True
+ def traverse(self, *args, **kwargs):
+ """Avoid working with the possible dummy root comment"""
+ for comment in Tree.traverse(self, *args, **kwargs):
+ if comment.uuid == INVALID_UUID:
+ continue
+ yield comment
+ def _setting_attr_string(self, setting):
+ value = getattr(self, setting)
+ if value == None:
+ return ""
+ return str(value)
+ def xml(self, indent=0, shortname=None):
+ """
+ >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
+ >>> comm.uuid = "0123"
+ >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000"
+ >>> print comm.xml(indent=2, shortname="com-1")
+ <comment>
+ <uuid>0123</uuid>
+ <short-name>com-1</short-name>
+ <from></from>
+ <date>Thu, 01 Jan 1970 00:00:00 +0000</date>
+ <content-type>text/plain</content-type>
+ <body>Some
+ insightful
+ remarks</body>
+ </comment>
+ """
+ if shortname == None:
+ shortname = self.uuid
+ if self.content_type.startswith("text/"):
+ body = (self.body or "").rstrip('\n')
+ else:
+ maintype,subtype = self.content_type.split('/',1)
+ msg = email.mime.base.MIMEBase(maintype, subtype)
+ msg.set_payload(self.body or "")
+ email.encoders.encode_base64(msg)
+ body = base64.encodestring(self.body or "")
+ info = [("uuid", self.uuid),
+ ("alt-id", self.alt_id),
+ ("short-name", shortname),
+ ("in-reply-to", self.in_reply_to),
+ ("from", self._setting_attr_string("From")),
+ ("date", self.time_string),
+ ("content-type", self.content_type),
+ ("body", body)]
+ lines = ["<comment>"]
+ for (k,v) in info:
+ if v != None:
+ lines.append(' <%s>%s</%s>' % (k,xml.sax.saxutils.escape(v),k))
+ lines.append("</comment>")
+ istring = ' '*indent
+ sep = '\n' + istring
+ return istring + sep.join(lines).rstrip('\n')
+ def from_xml(self, xml_string, verbose=True):
+ """
+ Note: If alt-id is not given, translates any <uuid> fields to
+ <alt-id> fields.
+ >>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
+ >>> commA.uuid = "0123"
+ >>> commA.time_string = "Thu, 01 Jan 1970 00:00:00 +0000"
+ >>> xml = commA.xml(shortname="com-1")
+ >>> commB = Comment()
+ >>> commB.from_xml(xml)
+ >>> attrs=['uuid','alt_id','in_reply_to','From','time_string','content_type','body']
+ >>> for attr in attrs: # doctest: +ELLIPSIS
+ ... if getattr(commB, attr) != getattr(commA, attr):
+ ... estr = "Mismatch on %s: '%s' should be '%s'"
+ ... args = (attr, getattr(commB, attr), getattr(commA, attr))
+ ... print estr % args
+ Mismatch on uuid: '...' should be '0123'
+ Mismatch on alt_id: '0123' should be 'None'
+ >>> print commB.alt_id
+ 0123
+ >>> commA.From
+ >>> commB.From
+ """
+ if type(xml_string) == types.UnicodeType:
+ xml_string = xml_string.strip().encode("unicode_escape")
+ comment = ElementTree.XML(xml_string)
+ if comment.tag != "comment":
+ raise InvalidXML(comment, "root element must be <comment>")
+ tags=['uuid','alt-id','in-reply-to','from','date','content-type','body']
+ uuid = None
+ body = None
+ for child in comment.getchildren():
+ if child.tag == "short-name":
+ pass
+ elif child.tag in tags:
+ if child.text == None or len(child.text) == 0:
+ text = settings_object.EMPTY
+ else:
+ text = xml.sax.saxutils.unescape(child.text)
+ text = unicode(text).decode("unicode_escape").strip()
+ if child.tag == "uuid":
+ uuid = text
+ continue # don't set the bug's uuid tag.
+ if child.tag == "body":
+ body = text
+ continue # don't set the bug's body yet.
+ elif child.tag == 'from':
+ attr_name = "From"
+ elif child.tag == 'date':
+ attr_name = 'time_string'
+ else:
+ attr_name = child.tag.replace('-','_')
+ setattr(self, attr_name, text)
+ elif verbose == True:
+ print >> sys.stderr, "Ignoring unknown tag %s in %s" \
+ % (child.tag, comment.tag)
+ if self.alt_id == None and uuid not in [None, self.uuid]:
+ self.alt_id = uuid
+ if body != None:
+ if self.content_type.startswith("text/"):
+ self.body = body+"\n" # restore trailing newline
+ else:
+ self.body = base64.decodestring(body)
+ def string(self, indent=0, shortname=None):
+ """
+ >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
+ >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000"
+ >>> print comm.string(indent=2, shortname="com-1")
+ --------- Comment ---------
+ Name: com-1
+ From:
+ Date: Thu, 01 Jan 1970 00:00:00 +0000
+ Some
+ insightful
+ remarks
+ """
+ if shortname == None:
+ shortname = self.uuid
+ lines = []
+ lines.append("--------- Comment ---------")
+ lines.append("Name: %s" % shortname)
+ lines.append("From: %s" % (self._setting_attr_string("From")))
+ lines.append("Date: %s" % self.time_string)
+ lines.append("")
+ if self.content_type.startswith("text/"):
+ lines.extend((self.body or "").splitlines())
+ else:
+ lines.append("Content type %s not printable. Try XML output instead" % self.content_type)
+ istring = ' '*indent
+ sep = '\n' + istring
+ return istring + sep.join(lines).rstrip('\n')
+ def __str__(self):
+ """
+ >>> comm = Comment(bug=None, body="Some insightful remarks")
+ >>> comm.uuid = "com-1"
+ >>> comm.time_string = "Thu, 20 Nov 2008 15:55:11 +0000"
+ >>> comm.From = "Jane Doe <jdoe@example.com>"
+ >>> print comm
+ --------- Comment ---------
+ Name: com-1
+ From: Jane Doe <jdoe@example.com>
+ Date: Thu, 20 Nov 2008 15:55:11 +0000
+ Some insightful remarks
+ """
+ return self.string()
+ def get_path(self, name=None):
+ my_dir = os.path.join(self.bug.get_path("comments"), self.uuid)
+ if name is None:
+ return my_dir
+ assert name in ["values", "body"]
+ return os.path.join(my_dir, name)
+ def load_settings(self):
+ self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ self._setup_saved_settings()
+ def save_settings(self):
+ self.rcs.mkdir(self.get_path())
+ path = self.get_path("values")
+ mapfile.map_save(self.rcs, path, self._get_saved_settings())
+ def save(self):
+ """
+ Save any loaded contents to disk.
+ However, if self.sync_with_disk = True, then any changes are
+ automatically written to disk as soon as they happen, so
+ calling this method will just waste time (unless something
+ else has been messing with your on-disk files).
+ """
+ assert self.body != None, "Can't save blank comment"
+ self.save_settings()
+ self._set_comment_body(new=self.body, force=True)
+ def remove(self):
+ for comment in self.traverse():
+ path = comment.get_path()
+ self.rcs.recursive_remove(path)
+ def add_reply(self, reply, allow_time_inversion=False):
+ if self.uuid != INVALID_UUID:
+ reply.in_reply_to = self.uuid
+ self.append(reply)
+ #raise Exception, "adding reply \n%s\n%s" % (self, reply)
+ def new_reply(self, body=None):
+ """
+ >>> comm = Comment(bug=None, body="Some insightful remarks")
+ >>> repA = comm.new_reply("Critique original comment")
+ >>> repB = repA.new_reply("Begin flamewar :p")
+ >>> repB.in_reply_to == repA.uuid
+ True
+ """
+ reply = Comment(self.bug, body=body)
+ if self.bug != None:
+ reply.set_sync_with_disk(self.bug.sync_with_disk)
+ if reply.sync_with_disk == True:
+ reply.save()
+ self.add_reply(reply)
+ return reply
+ def string_thread(self, string_method_name="string", name_map={},
+ indent=0, flatten=True,
+ auto_name_map=False, bug_shortname=None):
+ """
+ Return a string displaying a thread of comments.
+ bug_shortname is only used if auto_name_map == True.
+ string_method_name (defaults to "string") is the name of the
+ Comment method used to generate the output string for each
+ Comment in the thread. The method must take the arguments
+ indent and shortname.
+ SIDE-EFFECT: if auto_name_map==True, calls comment_shortnames()
+ which will sort the tree by comment.time. Avoid by calling
+ name_map = {}
+ for shortname,comment in comm.comment_shortnames(bug_shortname):
+ name_map[comment.uuid] = shortname
+ comm.sort(key=lambda c : c.From) # your sort
+ comm.string_thread(name_map=name_map)
+ >>> a = Comment(bug=None, uuid="a", body="Insightful remarks")
+ >>> a.time = utility.str_to_time("Thu, 20 Nov 2008 01:00:00 +0000")
+ >>> b = a.new_reply("Critique original comment")
+ >>> b.uuid = "b"
+ >>> b.time = utility.str_to_time("Thu, 20 Nov 2008 02:00:00 +0000")
+ >>> c = b.new_reply("Begin flamewar :p")
+ >>> c.uuid = "c"
+ >>> c.time = utility.str_to_time("Thu, 20 Nov 2008 03:00:00 +0000")
+ >>> d = a.new_reply("Useful examples")
+ >>> d.uuid = "d"
+ >>> d.time = utility.str_to_time("Thu, 20 Nov 2008 04:00:00 +0000")
+ >>> a.sort(key=lambda comm : comm.time)
+ >>> print a.string_thread(flatten=True)
+ --------- Comment ---------
+ Name: a
+ From:
+ Date: Thu, 20 Nov 2008 01:00:00 +0000
+ Insightful remarks
+ --------- Comment ---------
+ Name: b
+ From:
+ Date: Thu, 20 Nov 2008 02:00:00 +0000
+ Critique original comment
+ --------- Comment ---------
+ Name: c
+ From:
+ Date: Thu, 20 Nov 2008 03:00:00 +0000
+ Begin flamewar :p
+ --------- Comment ---------
+ Name: d
+ From:
+ Date: Thu, 20 Nov 2008 04:00:00 +0000
+ Useful examples
+ >>> print a.string_thread(auto_name_map=True, bug_shortname="bug-1")
+ --------- Comment ---------
+ Name: bug-1:1
+ From:
+ Date: Thu, 20 Nov 2008 01:00:00 +0000
+ Insightful remarks
+ --------- Comment ---------
+ Name: bug-1:2
+ From:
+ Date: Thu, 20 Nov 2008 02:00:00 +0000
+ Critique original comment
+ --------- Comment ---------
+ Name: bug-1:3
+ From:
+ Date: Thu, 20 Nov 2008 03:00:00 +0000
+ Begin flamewar :p
+ --------- Comment ---------
+ Name: bug-1:4
+ From:
+ Date: Thu, 20 Nov 2008 04:00:00 +0000
+ Useful examples
+ """
+ if auto_name_map == True:
+ name_map = {}
+ for shortname,comment in self.comment_shortnames(bug_shortname):
+ name_map[comment.uuid] = shortname
+ stringlist = []
+ for depth,comment in self.thread(flatten=flatten):
+ ind = 2*depth+indent
+ if comment.uuid in name_map:
+ sname = name_map[comment.uuid]
+ else:
+ sname = None
+ string_fn = getattr(comment, string_method_name)
+ stringlist.append(string_fn(indent=ind, shortname=sname))
+ return '\n'.join(stringlist)
+ def xml_thread(self, name_map={}, indent=0,
+ auto_name_map=False, bug_shortname=None):
+ return self.string_thread(string_method_name="xml", name_map=name_map,
+ indent=indent, auto_name_map=auto_name_map,
+ bug_shortname=bug_shortname)
+ def comment_shortnames(self, bug_shortname=None):
+ """
+ Iterate through (id, comment) pairs, in time order.
+ (This is a user-friendly id, not the comment uuid).
+ SIDE-EFFECT : will sort the comment tree by comment.time
+ >>> a = Comment(bug=None, uuid="a")
+ >>> b = a.new_reply()
+ >>> b.uuid = "b"
+ >>> c = b.new_reply()
+ >>> c.uuid = "c"
+ >>> d = a.new_reply()
+ >>> d.uuid = "d"
+ >>> for id,name in a.comment_shortnames("bug-1"):
+ ... print id, name.uuid
+ bug-1:1 a
+ bug-1:2 b
+ bug-1:3 c
+ bug-1:4 d
+ """
+ if bug_shortname == None:
+ bug_shortname = ""
+ self.sort(key=lambda comm : comm.time)
+ for num,comment in enumerate(self.traverse()):
+ yield ("%s:%d" % (bug_shortname, num+1), comment)
+ def comment_from_shortname(self, comment_shortname, *args, **kwargs):
+ """
+ Use a comment shortname to look up a comment.
+ >>> a = Comment(bug=None, uuid="a")
+ >>> b = a.new_reply()
+ >>> b.uuid = "b"
+ >>> c = b.new_reply()
+ >>> c.uuid = "c"
+ >>> d = a.new_reply()
+ >>> d.uuid = "d"
+ >>> comm = a.comment_from_shortname("bug-1:3", bug_shortname="bug-1")
+ >>> id(comm) == id(c)
+ True
+ """
+ for cur_name, comment in self.comment_shortnames(*args, **kwargs):
+ if comment_shortname == cur_name:
+ return comment
+ raise InvalidShortname(comment_shortname,
+ list(self.comment_shortnames(*args, **kwargs)))
+ def comment_from_uuid(self, uuid):
+ """
+ Use a comment shortname to look up a comment.
+ >>> a = Comment(bug=None, uuid="a")
+ >>> b = a.new_reply()
+ >>> b.uuid = "b"
+ >>> c = b.new_reply()
+ >>> c.uuid = "c"
+ >>> d = a.new_reply()
+ >>> d.uuid = "d"
+ >>> comm = a.comment_from_uuid("d")
+ >>> id(comm) == id(d)
+ True
+ """
+ for comment in self.traverse():
+ if comment.uuid == uuid:
+ return comment
+ raise KeyError(uuid)
+suite = doctest.DocTestSuite()
diff --git a/libbe/config.py b/libbe/config.py
index e69de29..5e343b9 100644
--- a/libbe/config.py
+++ b/libbe/config.py
@@ -0,0 +1,83 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import ConfigParser
+import codecs
+import locale
+import os.path
+import sys
+import doctest
+default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
+def path():
+ """Return the path to the per-user config file"""
+ return os.path.expanduser("~/.bugs_everywhere")
+def set_val(name, value, section="DEFAULT", encoding=None):
+ """Set a value in the per-user config file
+ :param name: The name of the value to set
+ :param value: The new value to set (or None to delete the value)
+ :param section: The section to store the name/value in
+ """
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ if os.path.exists(path()) == False: # touch file or config
+ open(path(), "w").close() # read chokes on missing file
+ f = codecs.open(path(), "r", encoding)
+ config.readfp(f, path())
+ f.close()
+ if value is not None:
+ config.set(section, name, value)
+ else:
+ config.remove_option(section, name)
+ f = codecs.open(path(), "w", encoding)
+ config.write(f)
+ f.close()
+def get_val(name, section="DEFAULT", default=None, encoding=None):
+ """
+ Get a value from the per-user config file
+ :param name: The name of the value to get
+ :section: The section that the name is in
+ :return: The value, or None
+ >>> get_val("junk") is None
+ True
+ >>> set_val("junk", "random")
+ >>> get_val("junk")
+ u'random'
+ >>> set_val("junk", None)
+ >>> get_val("junk") is None
+ True
+ """
+ if os.path.exists(path()):
+ if encoding == None:
+ encoding = default_encoding
+ config = ConfigParser.ConfigParser()
+ f = codecs.open(path(), "r", encoding)
+ config.readfp(f, path())
+ f.close()
+ try:
+ return config.get(section, name)
+ except ConfigParser.NoOptionError:
+ return default
+ else:
+ return default
+suite = doctest.DocTestSuite()
diff --git a/libbe/darcs.py b/libbe/darcs.py
index e69de29..e7132c0 100644
--- a/libbe/darcs.py
+++ b/libbe/darcs.py
@@ -0,0 +1,163 @@
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import codecs
+import os
+import re
+import sys
+import unittest
+import doctest
+import rcs
+from rcs import RCS
+def new():
+ return Darcs()
+class Darcs(RCS):
+ name="darcs"
+ client="darcs"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ if self._u_search_parent_directories(path, "_darcs") != None :
+ return True
+ return False
+ def _rcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ darcs_dir = self._u_search_parent_directories(path, "_darcs")
+ if darcs_dir == None:
+ return None
+ return os.path.dirname(darcs_dir)
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ # following http://darcs.net/manual/node4.html#SECTION00410030000000000000
+ # as of June 29th, 2009
+ if self.rootdir == None:
+ return None
+ darcs_dir = os.path.join(self.rootdir, "_darcs")
+ if darcs_dir != None:
+ for pref_file in ["author", "email"]:
+ pref_path = os.path.join(darcs_dir, "prefs", pref_file)
+ if os.path.exists(pref_path):
+ return self.get_file_contents(pref_path)
+ for env_variable in ["DARCS_EMAIL", "EMAIL"]:
+ if env_variable in os.environ:
+ return os.environ[env_variable]
+ return None
+ def _rcs_set_user_id(self, value):
+ if self.rootdir == None:
+ self.root(".")
+ if self.rootdir == None:
+ raise rcs.SettingIDnotSupported
+ author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author")
+ f = codecs.open(author_path, "w", self.encoding)
+ f.write(value)
+ f.close()
+ def _rcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ os.remove(os.path.join(self.rootdir, path)) # darcs notices removal
+ def _rcs_update(self, path):
+ pass # darcs notices changes
+ def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return RCS._rcs_get_file_contents(self, path, revision,
+ binary=binary)
+ else:
+ try:
+ return self._u_invoke_client("show", "contents", "--patch", revision, path)
+ except rcs.CommandError:
+ # Darcs versions < 2.0.0pre2 lack the "show contents" command
+ status,output,error = self._u_invoke_client("diff", "--unified",
+ "--from-patch",
+ revision, path)
+ major_patch = output
+ status,output,error = self._u_invoke_client("diff", "--unified",
+ "--patch",
+ revision, path)
+ target_patch = output
+ # "--output -" to be supported in GNU patch > 2.5.9
+ # but that hasn't been released as of June 30th, 2009.
+ # Rewrite path to status before the patch we want
+ args=["patch", "--reverse", path]
+ status,output,error = self._u_invoke(args, stdin=major_patch)
+ # Now apply the patch we want
+ args=["patch", path]
+ status,output,error = self._u_invoke(args, stdin=target_patch)
+ if os.path.exists(os.path.join(self.rootdir, path)) == True:
+ contents = RCS._rcs_get_file_contents(self, path,
+ binary=binary)
+ else:
+ contents = ""
+ # Now restore path to it's current incarnation
+ args=["patch", "--reverse", path]
+ status,output,error = self._u_invoke(args, stdin=target_patch)
+ args=["patch", path]
+ status,output,error = self._u_invoke(args, stdin=major_patch)
+ current_contents = RCS._rcs_get_file_contents(self, path,
+ binary=binary)
+ return contents
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision==None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("put", "--to-patch", revision, directory)
+ def _rcs_commit(self, commitfile, allow_empty=False):
+ id = self.get_user_id()
+ if '@' not in id:
+ id = "%s <%s@invalid.com>" % (id, id)
+ args = ['record', '--all', '--author', id, '--logfile', commitfile]
+ status,output,error = self._u_invoke_client(*args)
+ empty_strings = ["No changes!"]
+ revision = None
+ if self._u_any_in_string(empty_strings, output) == True:
+ if allow_empty == False:
+ raise rcs.EmptyCommit()
+ else: # we need a extra call to get the current revision
+ args = ["changes", "--last=1", "--xml"]
+ status,output,error = self._u_invoke_client(*args)
+ revline = re.compile("[ \t]*<name>(.*)</name>")
+ # note that darcs does _not_ make an empty revision.
+ # this returns the last non-empty revision id...
+ else:
+ revline = re.compile("Finished recording patch '(.*)'")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+rcs.make_rcs_testcase_subclasses(Darcs, sys.modules[__name__])
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/diff.py b/libbe/diff.py
index e69de29..ba48efc 100644
--- a/libbe/diff.py
+++ b/libbe/diff.py
@@ -0,0 +1,125 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""Compare two bug trees"""
+from libbe import cmdutil, bugdir, bug
+from libbe.utility import time_to_str
+import doctest
+def bug_diffs(old_bugdir, new_bugdir):
+ added = []
+ removed = []
+ modified = []
+ for uuid in old_bugdir.list_uuids():
+ old_bug = old_bugdir.bug_from_uuid(uuid)
+ try:
+ new_bug = new_bugdir.bug_from_uuid(uuid)
+ old_bug.load_comments()
+ new_bug.load_comments()
+ if old_bug != new_bug:
+ modified.append((old_bug, new_bug))
+ except KeyError:
+ removed.append(old_bug)
+ for uuid in new_bugdir.list_uuids():
+ if not old_bugdir.has_bug(uuid):
+ new_bug = new_bugdir.bug_from_uuid(uuid)
+ added.append(new_bug)
+ return (removed, modified, added)
+def diff_report(bug_diffs_data, old_bugdir, new_bugdir):
+ bugs_removed,bugs_modified,bugs_added = bug_diffs_data
+ def modified_cmp(left, right):
+ return bug.cmp_severity(left[1], right[1])
+ bugs_added.sort(bug.cmp_severity)
+ bugs_removed.sort(bug.cmp_severity)
+ bugs_modified.sort(modified_cmp)
+ lines = []
+ if old_bugdir.settings != new_bugdir.settings:
+ bugdir_settings = sorted(new_bugdir.settings_properties)
+ bugdir_settings.remove("rcs_name") # tweaked by bugdir.duplicate_bugdir
+ change_list = change_lines(old_bugdir, new_bugdir, bugdir_settings)
+ if len(change_list) > 0:
+ lines.append("Modified bug directory:")
+ change_strings = ["%s: %s -> %s" % f for f in change_list]
+ lines.extend(change_strings)
+ lines.append("")
+ if len(bugs_added) > 0:
+ lines.append("New bug reports:")
+ for bg in bugs_added:
+ lines.extend(bg.string(shortlist=True).splitlines())
+ lines.append("")
+ if len(bugs_modified) > 0:
+ printed = False
+ for old_bug, new_bug in bugs_modified:
+ change_str = bug_changes(old_bug, new_bug)
+ if change_str is None:
+ continue
+ if not printed:
+ printed = True
+ lines.append("Modified bug reports:")
+ lines.extend(change_str.splitlines())
+ if printed == True:
+ lines.append("")
+ if len(bugs_removed) > 0:
+ lines.append("Removed bug reports:")
+ for bg in bugs_removed:
+ lines.extend(bg.string(shortlist=True).splitlines())
+ lines.append("")
+ return "\n".join(lines).rstrip("\n")
+def change_lines(old, new, attributes):
+ change_list = []
+ for attr in attributes:
+ old_attr = getattr(old, attr)
+ new_attr = getattr(new, attr)
+ if old_attr != new_attr:
+ change_list.append((attr, old_attr, new_attr))
+ if len(change_list) >= 0:
+ return change_list
+ else:
+ return None
+def bug_changes(old, new):
+ bug_settings = sorted(new.settings_properties)
+ change_list = change_lines(old, new, bug_settings)
+ change_strings = ["%s: %s -> %s" % f for f in change_list]
+ old_comment_ids = [c.uuid for c in old.comments()]
+ new_comment_ids = [c.uuid for c in new.comments()]
+ for comment_id in new_comment_ids:
+ if comment_id not in old_comment_ids:
+ summary = comment_summary(new.comment_from_uuid(comment_id), "new")
+ change_strings.append(summary)
+ for comment_id in old_comment_ids:
+ if comment_id not in new_comment_ids:
+ summary = comment_summary(new.comment_from_uuid(comment_id),
+ "removed")
+ change_strings.append(summary)
+ if len(change_strings) == 0:
+ return None
+ return "%s\n %s" % (new.string(shortlist=True),
+ " \n".join(change_strings))
+def comment_summary(comment, status):
+ return "%8s comment from %s on %s" % (status, comment.From,
+ time_to_str(comment.time))
+suite = doctest.DocTestSuite()
diff --git a/libbe/editor.py b/libbe/editor.py
index e69de29..93144b8 100644
--- a/libbe/editor.py
+++ b/libbe/editor.py
@@ -0,0 +1,101 @@
+# Bugs Everywhere, a distributed bugtracker
+# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import codecs
+import locale
+import os
+import sys
+import tempfile
+import doctest
+default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
+comment_marker = u"== Anything below this line will be ignored\n"
+class CantFindEditor(Exception):
+ def __init__(self):
+ Exception.__init__(self, "Can't find editor to get string from")
+def editor_string(comment=None, encoding=None):
+ """Invokes the editor, and returns the user-produced text as a string
+ >>> if "EDITOR" in os.environ:
+ ... del os.environ["EDITOR"]
+ >>> if "VISUAL" in os.environ:
+ ... del os.environ["VISUAL"]
+ >>> editor_string()
+ Traceback (most recent call last):
+ CantFindEditor: Can't find editor to get string from
+ >>> os.environ["EDITOR"] = "echo bar > "
+ >>> editor_string()
+ u'bar\\n'
+ >>> os.environ["VISUAL"] = "echo baz > "
+ >>> editor_string()
+ u'baz\\n'
+ >>> del os.environ["EDITOR"]
+ >>> del os.environ["VISUAL"]
+ """
+ if encoding == None:
+ encoding = default_encoding
+ for name in ('VISUAL', 'EDITOR'):
+ try:
+ editor = os.environ[name]
+ break
+ except KeyError:
+ pass
+ else:
+ raise CantFindEditor()
+ fhandle, fname = tempfile.mkstemp()
+ try:
+ if comment is not None:
+ os.write(fhandle, '\n'+comment_string(comment))
+ os.close(fhandle)
+ oldmtime = os.path.getmtime(fname)
+ os.system("%s %s" % (editor, fname))
+ f = codecs.open(fname, "r", encoding)
+ output = trimmed_string(f.read())
+ f.close()
+ if output.rstrip('\n') == "":
+ output = None
+ finally:
+ os.unlink(fname)
+ return output
+def comment_string(comment):
+ """
+ >>> comment_string('hello') == comment_marker+"hello"
+ True
+ """
+ return comment_marker + comment
+def trimmed_string(instring):
+ """
+ >>> trimmed_string("hello\\n"+comment_marker)
+ u'hello\\n'
+ >>> trimmed_string("hi!\\n" + comment_string('Booga'))
+ u'hi!\\n'
+ """
+ out = []
+ for line in instring.splitlines(True):
+ if line.startswith(comment_marker):
+ break
+ out.append(line)
+ return ''.join(out)
+suite = doctest.DocTestSuite()
diff --git a/libbe/encoding.py b/libbe/encoding.py
index e69de29..d603602 100644
--- a/libbe/encoding.py
+++ b/libbe/encoding.py
@@ -0,0 +1,51 @@
+# Bugs Everywhere, a distributed bugtracker
+# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import codecs
+import locale
+import sys
+import doctest
+def get_encoding():
+ """
+ Guess a useful input/output/filesystem encoding... Maybe we need
+ seperate encodings for input/output and filesystem? Hmm...
+ """
+ encoding = locale.getpreferredencoding() or sys.getdefaultencoding()
+ if sys.platform != 'win32' or sys.version_info[:2] > (2, 3):
+ encoding = locale.getlocale(locale.LC_TIME)[1] or encoding
+ # Python 2.3 on windows doesn't know about 'XYZ' alias for 'cpXYZ'
+ return encoding
+def known_encoding(encoding):
+ """
+ >>> known_encoding("highly-unlikely-encoding")
+ False
+ >>> known_encoding(get_encoding())
+ True
+ """
+ try:
+ codecs.lookup(encoding)
+ return True
+ except LookupError:
+ return False
+def set_IO_stream_encodings(encoding):
+ sys.stdin = codecs.getreader(encoding)(sys.__stdin__)
+ sys.stdout = codecs.getwriter(encoding)(sys.__stdout__)
+ sys.stderr = codecs.getwriter(encoding)(sys.__stderr__)
+suite = doctest.DocTestSuite()
diff --git a/libbe/git.py b/libbe/git.py
index e69de29..2f9ffa9 100644
--- a/libbe/git.py
+++ b/libbe/git.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2008-2009 Ben Finney <ben+python@benfinney.id.au>
+# Chris Ball <cjb@laptop.org>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import os
+import re
+import sys
+import unittest
+import doctest
+import rcs
+from rcs import RCS
+def new():
+ return Git()
+class Git(RCS):
+ name="git"
+ client="git"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".git") != None :
+ return True
+ return False
+ def _rcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ status,output,error = self._u_invoke_client("rev-parse", "--git-dir",
+ directory=path)
+ gitdir = os.path.join(path, output.rstrip('\n'))
+ dirname = os.path.abspath(os.path.dirname(gitdir))
+ return dirname
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("config", "user.name")
+ name = output.rstrip('\n')
+ status,output,error = self._u_invoke_client("config", "user.email")
+ email = output.rstrip('\n')
+ if name != "" or email != "": # got something!
+ # guess missing info, if necessary
+ if name == "":
+ name = self._u_get_fallback_username()
+ if email == "":
+ email = self._u_get_fallback_email()
+ return self._u_create_id(name, email)
+ return None # Git has no infomation
+ def _rcs_set_user_id(self, value):
+ name,email = self._u_parse_id(value)
+ if email != None:
+ self._u_invoke_client("config", "user.email", email)
+ self._u_invoke_client("config", "user.name", name)
+ def _rcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ self._u_invoke_client("rm", "-f", path)
+ def _rcs_update(self, path):
+ self._rcs_add(path)
+ def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ arg = "%s:%s" % (revision,path)
+ status,output,error = self._u_invoke_client("show", arg)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision==None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ #self._u_invoke_client("archive", revision, directory) # makes tarball
+ self._u_invoke_client("clone", "--no-checkout",".",directory)
+ self._u_invoke_client("checkout", revision, directory=directory)
+ def _rcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--all', '--file', commitfile]
+ if allow_empty == True:
+ args.append("--allow-empty")
+ status,output,error = self._u_invoke_client(*args)
+ else:
+ kwargs = {"expect":(0,1)}
+ status,output,error = self._u_invoke_client(*args, **kwargs)
+ strings = ["nothing to commit",
+ "nothing added to commit"]
+ if self._u_any_in_string(strings, output) == True:
+ raise rcs.EmptyCommit()
+ revision = None
+ revline = re.compile("(.*) (.*)[:\]] (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 3
+ revision = match.groups()[1]
+ return revision
+rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__])
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/hg.py b/libbe/hg.py
index e69de29..a20eeb5 100644
--- a/libbe/hg.py
+++ b/libbe/hg.py
@@ -0,0 +1,96 @@
+# Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc.
+# Ben Finney <ben+python@benfinney.id.au>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import os
+import re
+import sys
+import unittest
+import doctest
+import rcs
+from rcs import RCS
+def new():
+ return Hg()
+class Hg(RCS):
+ name="hg"
+ client="hg"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Mercurial"""
+ if self._u_search_parent_directories(path, ".hg") != None:
+ return True
+ return False
+ def _rcs_root(self, path):
+ status,output,error = self._u_invoke_client("root", directory=path)
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("showconfig","ui.username")
+ return output.rstrip('\n')
+ def _rcs_set_user_id(self, value):
+ """
+ Supported by the Config Extension, but that is not part of
+ standard Mercurial.
+ http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
+ """
+ raise rcs.SettingIDnotSupported
+ def _rcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ self._u_invoke_client("rm", "--force", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ if revision == None:
+ return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ return RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("archive", "--rev", revision, directory)
+ def _rcs_commit(self, commitfile, allow_empty=False):
+ args = ['commit', '--logfile', commitfile]
+ status,output,error = self._u_invoke_client(*args)
+ if allow_empty == False:
+ strings = ["nothing changed"]
+ if self._u_any_in_string(strings, output) == True:
+ raise rcs.EmptyCommit()
+ status,output,error = self._u_invoke_client('identify')
+ revision = None
+ revline = re.compile("(.*) tip")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__])
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/mapfile.py b/libbe/mapfile.py
index e69de29..b959d76 100644
--- a/libbe/mapfile.py
+++ b/libbe/mapfile.py
@@ -0,0 +1,127 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import yaml
+import os.path
+import errno
+import utility
+import doctest
+class IllegalKey(Exception):
+ def __init__(self, key):
+ Exception.__init__(self, 'Illegal key "%s"' % key)
+ self.key = key
+class IllegalValue(Exception):
+ def __init__(self, value):
+ Exception.__init__(self, 'Illegal value "%s"' % value)
+ self.value = value
+def generate(map):
+ """Generate a YAML mapfile content string.
+ >>> generate({"q":"p"})
+ 'q: p\\n\\n'
+ >>> generate({"q":u"Fran\u00e7ais"})
+ 'q: Fran\\xc3\\xa7ais\\n\\n'
+ >>> generate({"q":u"hello"})
+ 'q: hello\\n\\n'
+ >>> generate({"q=":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q="
+ >>> generate({"q:":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q:"
+ >>> generate({"q\\n":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key "q\\n"
+ >>> generate({"":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ""
+ >>> generate({">q":"p"})
+ Traceback (most recent call last):
+ IllegalKey: Illegal key ">q"
+ >>> generate({"q":"p\\n"})
+ Traceback (most recent call last):
+ IllegalValue: Illegal value "p\\n"
+ """
+ keys = map.keys()
+ keys.sort()
+ for key in keys:
+ try:
+ assert not key.startswith('>')
+ assert('\n' not in key)
+ assert('=' not in key)
+ assert(':' not in key)
+ assert(len(key) > 0)
+ except AssertionError:
+ raise IllegalKey(unicode(key).encode('unicode_escape'))
+ if "\n" in map[key]:
+ raise IllegalValue(unicode(map[key]).encode('unicode_escape'))
+ lines = []
+ for key in keys:
+ lines.append(yaml.safe_dump({key: map[key]},
+ default_flow_style=False,
+ allow_unicode=True))
+ lines.append("")
+ return '\n'.join(lines)
+def parse(contents):
+ """
+ Parse a YAML mapfile string.
+ >>> parse('q: p\\n\\n')['q']
+ 'p'
+ >>> parse('q: \\'p\\'\\n\\n')['q']
+ 'p'
+ >>> contents = generate({"a":"b", "c":"d", "e":"f"})
+ >>> dict = parse(contents)
+ >>> dict["a"]
+ 'b'
+ >>> dict["c"]
+ 'd'
+ >>> dict["e"]
+ 'f'
+ """
+ old_format = False
+ for line in contents.splitlines():
+ if len(line.split("=")) == 2:
+ old_format = True
+ break
+ if old_format: # translate to YAML. Hack to deal with old BE bugs.
+ newlines = []
+ for line in contents.splitlines():
+ line = line.rstrip('\n')
+ if len(line) == 0:
+ continue
+ fields = line.split("=")
+ if len(fields) == 2:
+ key,value = fields
+ newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
+ else:
+ newlines.append(line)
+ contents = '\n'.join(newlines)
+ return yaml.load(contents) or {}
+def map_save(rcs, path, map, allow_no_rcs=False):
+ """Save the map as a mapfile to the specified path"""
+ contents = generate(map)
+ rcs.set_file_contents(path, contents, allow_no_rcs)
+def map_load(rcs, path, allow_no_rcs=False):
+ contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs)
+ return parse(contents)
+suite = doctest.DocTestSuite()
diff --git a/libbe/plugin.py b/libbe/plugin.py
index e69de29..0545fd7 100644
--- a/libbe/plugin.py
+++ b/libbe/plugin.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Marien Zwart <marienz@gentoo.org>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import os
+import os.path
+import sys
+import doctest
+def my_import(mod_name):
+ module = __import__(mod_name)
+ components = mod_name.split('.')
+ for comp in components[1:]:
+ module = getattr(module, comp)
+ return module
+def iter_plugins(prefix):
+ """
+ >>> "list" in [n for n,m in iter_plugins("becommands")]
+ True
+ >>> "plugin" in [n for n,m in iter_plugins("libbe")]
+ True
+ """
+ modfiles = os.listdir(os.path.join(plugin_path, prefix))
+ modfiles.sort()
+ for modfile in modfiles:
+ if modfile.startswith('.'):
+ continue # the occasional emacs temporary file
+ if modfile.endswith(".py") and modfile != "__init__.py":
+ yield modfile[:-3], my_import(prefix+"."+modfile[:-3])
+def get_plugin(prefix, name):
+ """
+ >>> get_plugin("becommands", "asdf") is None
+ True
+ >>> q = repr(get_plugin("becommands", "list"))
+ >>> q.startswith("<module 'becommands.list' from ")
+ True
+ """
+ dirprefix = os.path.join(*prefix.split('.'))
+ command_path = os.path.join(plugin_path, dirprefix, name+".py")
+ if os.path.isfile(command_path):
+ return my_import(prefix + "." + name)
+ return None
+plugin_path = os.path.realpath(os.path.dirname(os.path.dirname(__file__)))
+if plugin_path not in sys.path:
+ sys.path.append(plugin_path)
+suite = doctest.DocTestSuite()
+def _test():
+ import doctest
+ doctest.testmod()
+if __name__ == "__main__":
+ _test()
diff --git a/libbe/properties.py b/libbe/properties.py
index e69de29..144220b 100644
--- a/libbe/properties.py
+++ b/libbe/properties.py
@@ -0,0 +1,638 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+This module provides a series of useful decorators for defining
+various types of properties. For example usage, consider the
+unittests at the end of the module.
+ http://www.python.org/dev/peps/pep-0318/
+ http://www.phyast.pitt.edu/~micheles/python/documentation.html
+for more information on decorators.
+import copy
+import types
+import unittest
+class ValueCheckError (ValueError):
+ def __init__(self, name, value, allowed):
+ action = "in" # some list of allowed values
+ if type(allowed) == types.FunctionType:
+ action = "allowed by" # some allowed-value check function
+ msg = "%s not %s %s for %s" % (value, action, allowed, name)
+ ValueError.__init__(self, msg)
+ self.name = name
+ self.value = value
+ self.allowed = allowed
+def Property(funcs):
+ """
+ End a chain of property decorators, returning a property.
+ """
+ args = {}
+ args["fget"] = funcs.get("fget", None)
+ args["fset"] = funcs.get("fset", None)
+ args["fdel"] = funcs.get("fdel", None)
+ args["doc"] = funcs.get("doc", None)
+ #print "Creating a property with"
+ #for key, val in args.items(): print key, value
+ return property(**args)
+def doc_property(doc=None):
+ """
+ Add a docstring to a chain of property decorators.
+ """
+ def decorator(funcs=None):
+ """
+ Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...}
+ or a function fn() returning such a dict.
+ """
+ if hasattr(funcs, "__call__"):
+ funcs = funcs() # convert from function-arg to dict
+ funcs["doc"] = doc
+ return funcs
+ return decorator
+def local_property(name, null=None, mutable_null=False):
+ """
+ Define get/set access to per-parent-instance local storage. Uses
+ ._<name>_value to store the value for a particular owner instance.
+ If the ._<name>_value attribute does not exist, returns null.
+ If mutable_null == True, we only release deepcopies of the null to
+ the outside world.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ if mutable_null == True:
+ ret_null = copy.deepcopy(null)
+ else:
+ ret_null = null
+ value = getattr(self, "_%s_value" % name, ret_null)
+ return value
+ def _fset(self, value):
+ setattr(self, "_%s_value" % name, value)
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+def settings_property(name, null=None):
+ """
+ Similar to local_property, except where local_property stores the
+ value in instance._<name>_value, settings_property stores the
+ value in instance.settings[name].
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget", None)
+ fset = funcs.get("fset", None)
+ def _fget(self):
+ if fget is not None:
+ fget(self)
+ value = self.settings.get(name, null)
+ return value
+ def _fset(self, value):
+ self.settings[name] = value
+ if fset is not None:
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ funcs["name"] = name
+ return funcs
+ return decorator
+# Allow comparison and caching with _original_ values for mutables,
+# since
+# >>> a = []
+# >>> b = a
+# >>> b.append(1)
+# >>> a
+# [1]
+# >>> a==b
+# True
+def _hash_mutable_value(value):
+ return repr(value)
+def _init_mutable_property_cache(self):
+ if not hasattr(self, "_mutable_property_cache_hash"):
+ # first call to _fget for any mutable property
+ self._mutable_property_cache_hash = {}
+ self._mutable_property_cache_copy = {}
+def _set_cached_mutable_property(self, cacher_name, property_name, value):
+ _init_mutable_property_cache(self)
+ self._mutable_property_cache_hash[(cacher_name, property_name)] = \
+ _hash_mutable_value(value)
+ self._mutable_property_cache_copy[(cacher_name, property_name)] = \
+ copy.deepcopy(value)
+def _get_cached_mutable_property(self, cacher_name, property_name, default=None):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_copy:
+ return default
+ return self._mutable_property_cache_copy[(cacher_name, property_name)]
+def _cmp_cached_mutable_property(self, cacher_name, property_name, value):
+ _init_mutable_property_cache(self)
+ if (cacher_name, property_name) not in self._mutable_property_cache_hash:
+ return 1 # any value > non-existant old hash
+ old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)]
+ return cmp(_hash_mutable_value(value), old_hash)
+def defaulting_property(default=None, null=None,
+ mutable_default=False):
+ """
+ Define a default value for get access to a property.
+ If the stored value is null, then default is returned.
+ If mutable_default == True, we only release deepcopies of the
+ default to the outside world.
+ null should never escape to the outside world, so don't worry
+ about it being a mutable.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value == null:
+ if mutable_default == True:
+ return copy.deepcopy(default)
+ else:
+ return default
+ return value
+ def _fset(self, value):
+ if value == default:
+ value = null
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+def fn_checked_property(value_allowed_fn):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ return value
+ def _fset(self, value):
+ if value_allowed_fn(value) != True:
+ raise ValueCheckError(name, value, value_allowed_fn)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+def checked_property(allowed=[]):
+ """
+ Define allowed values for get/set access to a property.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ value = fget(self)
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ return value
+ def _fset(self, value):
+ if value not in allowed:
+ raise ValueCheckError(name, value, allowed)
+ fset(self, value)
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+def cached_property(generator, initVal=None, mutable=False):
+ """
+ Allow caching of values generated by generator(instance), where
+ instance is the instance to which this property belongs. Uses
+ ._<name>_cache to store a cache flag for a particular owner
+ instance.
+ When the cache flag is True or missing and the stored value is
+ initVal, the first fget call triggers the generator function,
+ whose output is stored in _<name>_cached_value. That and
+ subsequent calls to fget will return this cached value.
+ If the input value is no longer initVal (e.g. a value has been
+ loaded from disk or set with fset), that value overrides any
+ cached value, and this property has no effect.
+ When the cache flag is False and the stored value is initVal, the
+ generator is not cached, but is called on every fget.
+ The cache flag is missing on initialization. Particular instances
+ may override by setting their own flag.
+ In the case that mutable == True, all caching is disabled and the
+ generator is called whenever the cached value would otherwise be
+ used.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ cache = getattr(self, "_%s_cache" % name, True)
+ value = fget(self)
+ if value == initVal:
+ if cache == True and mutable == False:
+ if hasattr(self, "_%s_cached_value" % name):
+ value = getattr(self, "_%s_cached_value" % name)
+ else:
+ value = generator(self)
+ setattr(self, "_%s_cached_value" % name, value)
+ else:
+ value = generator(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+def primed_property(primer, initVal=None):
+ """
+ Just like a cached_property, except that instead of returning a
+ new value and running fset to cache it, the primer performs some
+ background manipulation (e.g. loads data into instance.settings)
+ such that a _second_ pass through fget succeeds.
+ The 'cache' flag becomes a 'prime' flag, with priming taking place
+ whenever ._<name>_prime is True, or is False or missing and
+ value == initVal.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self):
+ prime = getattr(self, "_%s_prime" % name, False)
+ if prime == False:
+ value = fget(self)
+ if prime == True or (prime == False and value == initVal):
+ primer(self)
+ value = fget(self)
+ return value
+ funcs["fget"] = _fget
+ return funcs
+ return decorator
+def change_hook_property(hook, mutable=False):
+ """
+ Call the function hook(instance, old_value, new_value) whenever a
+ value different from the current value is set (instance is a a
+ reference to the class instance to which this property belongs).
+ This is useful for saving changes to disk, etc. This function is
+ called _after_ the new value has been stored, allowing you to
+ change the stored value if you want.
+ In the case of mutables, things are slightly trickier. Because
+ the property-owning class has no way of knowing when the value
+ changes. We work around this by caching a private deepcopy of the
+ mutable value, and checking for changes whenever the property is
+ set (obviously) or retrieved (to check for external changes). So
+ long as you're conscientious about accessing the property after
+ making external modifications, mutability woln't be a problem.
+ t.x.append(5) # external modification
+ t.x # dummy access notices change and triggers hook
+ See testChangeHookMutableProperty for an example of the expected
+ behavior.
+ """
+ def decorator(funcs):
+ if hasattr(funcs, "__call__"):
+ funcs = funcs()
+ fget = funcs.get("fget")
+ fset = funcs.get("fset")
+ name = funcs.get("name", "<unknown>")
+ def _fget(self, new_value=None, from_fset=False): # only used if mutable == True
+ if from_fset == True:
+ value = new_value # compare new value with cached
+ else:
+ value = fget(self) # compare current value with cached
+ if _cmp_cached_mutable_property(self, "change hook property", name, value) != 0:
+ # there has been a change, cache new value
+ old_value = _get_cached_mutable_property(self, "change hook property", name)
+ _set_cached_mutable_property(self, "change hook property", name, value)
+ if from_fset == True: # return previously cached value
+ value = old_value
+ else: # the value changed while we weren't looking
+ hook(self, old_value, value)
+ return value
+ def _fset(self, value):
+ if mutable == True: # get cached previous value
+ old_value = _fget(self, new_value=value, from_fset=True)
+ else:
+ old_value = fget(self)
+ fset(self, value)
+ if value != old_value:
+ hook(self, old_value, value)
+ if mutable == True:
+ funcs["fget"] = _fget
+ funcs["fset"] = _fset
+ return funcs
+ return decorator
+class DecoratorTests(unittest.TestCase):
+ def testLocalDoc(self):
+ class Test(object):
+ @Property
+ @doc_property("A fancy property")
+ def x():
+ return {}
+ self.failUnless(Test.x.__doc__ == "A fancy property",
+ Test.x.__doc__)
+ def testLocalProperty(self):
+ class Test(object):
+ @Property
+ @local_property(name="LOCAL")
+ def x():
+ return {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("_LOCAL_value" in dir(t), dir(t))
+ self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
+ def testSettingsProperty(self):
+ class Test(object):
+ @Property
+ @settings_property(name="attr")
+ def x():
+ return {}
+ def __init__(self):
+ self.settings = {}
+ t = Test()
+ self.failUnless(t.x == None, str(t.x))
+ t.x = 'z' # the first set initializes ._LOCAL_value
+ self.failUnless(t.x == 'z', str(t.x))
+ self.failUnless("attr" in t.settings, t.settings)
+ self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
+ def testDefaultingLocalProperty(self):
+ class Test(object):
+ @Property
+ @defaulting_property(default='y', null='x')
+ @local_property(name="DEFAULT", null=5)
+ def x(): return {}
+ t = Test()
+ self.failUnless(t.x == 5, str(t.x))
+ t.x = 'x'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'y'
+ self.failUnless(t.x == 'y', str(t.x))
+ t.x = 'z'
+ self.failUnless(t.x == 'z', str(t.x))
+ t.x = 5
+ self.failUnless(t.x == 5, str(t.x))
+ def testCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testTwoCheckedLocalProperties(self):
+ class Test(object):
+ @Property
+ @checked_property(allowed=['x', 'y', 'z'])
+ @local_property(name="X")
+ def x(): return {}
+ @Property
+ @checked_property(allowed=['a', 'b', 'c'])
+ @local_property(name="A")
+ def a(): return {}
+ def __init__(self):
+ self._A_value = 'a'
+ self._X_value = 'x'
+ t = Test()
+ try:
+ t.x = 'a'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.x = 'x'
+ t.x = 'y'
+ t.x = 'z'
+ try:
+ t.a = 'x'
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ t.a = 'a'
+ t.a = 'b'
+ t.a = 'c'
+ def testFnCheckedLocalProperty(self):
+ class Test(object):
+ @Property
+ @fn_checked_property(lambda v : v in ['x', 'y', 'z'])
+ @local_property(name="CHECKED")
+ def x(): return {}
+ def __init__(self):
+ self._CHECKED_value = 'x'
+ t = Test()
+ self.failUnless(t.x == 'x', str(t.x))
+ try:
+ t.x = None
+ e = None
+ except ValueCheckError, e:
+ pass
+ self.failUnless(type(e) == ValueCheckError, type(e))
+ def testCachedLocalProperty(self):
+ class Gen(object):
+ def __init__(self):
+ self.i = 0
+ def __call__(self, owner):
+ self.i += 1
+ return self.i
+ class Test(object):
+ @Property
+ @cached_property(generator=Gen(), initVal=None)
+ @local_property(name="CACHED")
+ def x(): return {}
+ t = Test()
+ self.failIf("_CACHED_cache" in dir(t), getattr(t, "_CACHED_cache", None))
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ self.failUnless(t.x == 1, t.x)
+ t.x = 8
+ self.failUnless(t.x == 8, t.x)
+ self.failUnless(t.x == 8, t.x)
+ t._CACHED_cache = False # Caching is off, but the stored value
+ val = t.x # is 8, not the initVal (None), so we
+ self.failUnless(val == 8, val) # get 8.
+ t._CACHED_value = None # Now we've set the stored value to None
+ val = t.x # so future calls to fget (like this)
+ self.failUnless(val == 2, val) # will call the generator every time...
+ val = t.x
+ self.failUnless(val == 3, val)
+ val = t.x
+ self.failUnless(val == 4, val)
+ t._CACHED_cache = True # We turn caching back on, and get
+ self.failUnless(t.x == 1, str(t.x)) # the original cached value.
+ del t._CACHED_cached_value # Removing that value forces a
+ self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
+ self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
+ self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
+ def testPrimedLocalProperty(self):
+ class Test(object):
+ def prime(self):
+ self.settings["PRIMED"] = "initialized"
+ @Property
+ @primed_property(primer=prime, initVal=None)
+ @settings_property(name="PRIMED")
+ def x(): return {}
+ def __init__(self):
+ self.settings={}
+ t = Test()
+ self.failIf("_PRIMED_prime" in dir(t), getattr(t, "_PRIMED_prime", None))
+ self.failUnless(t.x == "initialized", t.x)
+ t.x = 1
+ self.failUnless(t.x == 1, t.x)
+ t.x = None
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = True
+ t.x = 3
+ self.failUnless(t.x == "initialized", t.x)
+ t._PRIMED_prime = False
+ t.x = 3
+ self.failUnless(t.x == 3, t.x)
+ def testChangeHookLocalProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+ @Property
+ @change_hook_property(_hook)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 1
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == 1, t.new)
+ t.x = 2
+ self.failUnless(t.old == 1, t.old)
+ self.failUnless(t.new == 2, t.new)
+ def testChangeHookMutableProperty(self):
+ class Test(object):
+ def _hook(self, old, new):
+ self.old = old
+ self.new = new
+ self.hook_calls += 1
+ @Property
+ @change_hook_property(_hook, mutable=True)
+ @local_property(name="HOOKED")
+ def x(): return {}
+ t = Test()
+ t.hook_calls = 0
+ t.x = []
+ self.failUnless(t.old == None, t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 1, t.hook_calls)
+ a = t.x
+ a.append(5)
+ t.x = a
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 2, t.hook_calls)
+ t.x = []
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # now append without reassigning. this doesn't trigger the
+ # change, since we don't ever set t.x, only get it and mess
+ # with it. It does, however, update our t.new, since t.new =
+ # t.x and is not a static copy.
+ t.x.append(5)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 3, t.hook_calls)
+ # however, the next t.x get _will_ notice the change...
+ a = t.x
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ t.x.append(6) # this append(6) is not noticed yet
+ self.failUnless(t.old == [], t.old)
+ self.failUnless(t.new == [5,6], t.new)
+ self.failUnless(t.hook_calls == 4, t.hook_calls)
+ # this append(7) is not noticed, but the t.x get causes the
+ # append(6) to be noticed
+ t.x.append(7)
+ self.failUnless(t.old == [5], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 5, t.hook_calls)
+ a = t.x # now the append(7) is noticed
+ self.failUnless(t.old == [5,6], t.old)
+ self.failUnless(t.new == [5,6,7], t.new)
+ self.failUnless(t.hook_calls == 6, t.hook_calls)
+suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
diff --git a/libbe/rcs.py b/libbe/rcs.py
index e69de29..294b8e0 100644
--- a/libbe/rcs.py
+++ b/libbe/rcs.py
@@ -0,0 +1,876 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# Alexander Belchenko <bialix@ukr.net>
+# Ben Finney <ben+python@benfinney.id.au>
+# Chris Ball <cjb@laptop.org>
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+from subprocess import Popen, PIPE
+import codecs
+import os
+import os.path
+import re
+from socket import gethostname
+import shutil
+import sys
+import tempfile
+import unittest
+import doctest
+from utility import Dir, search_parent_directories
+def _get_matching_rcs(matchfn):
+ """Return the first module for which matchfn(RCS_instance) is true"""
+ import arch
+ import bzr
+ import darcs
+ import git
+ import hg
+ for module in [arch, bzr, darcs, git, hg]:
+ rcs = module.new()
+ if matchfn(rcs) == True:
+ return rcs
+ del(rcs)
+ return RCS()
+def rcs_by_name(rcs_name):
+ """Return the module for the RCS with the given name"""
+ return _get_matching_rcs(lambda rcs: rcs.name == rcs_name)
+def detect_rcs(dir):
+ """Return an RCS instance for the rcs being used in this directory"""
+ return _get_matching_rcs(lambda rcs: rcs.detect(dir))
+def installed_rcs():
+ """Return an instance of an installed RCS"""
+ return _get_matching_rcs(lambda rcs: rcs.installed())
+class CommandError(Exception):
+ def __init__(self, command, status, err_str):
+ strerror = ["Command failed (%d):\n %s\n" % (status, err_str),
+ "while executing\n %s" % command]
+ Exception.__init__(self, "\n".join(strerror))
+ self.command = command
+ self.status = status
+ self.err_str = err_str
+class SettingIDnotSupported(NotImplementedError):
+ pass
+class RCSnotRooted(Exception):
+ def __init__(self):
+ msg = "RCS not rooted"
+ Exception.__init__(self, msg)
+class PathNotInRoot(Exception):
+ def __init__(self, path, root):
+ msg = "Path '%s' not in root '%s'" % (path, root)
+ Exception.__init__(self, msg)
+ self.path = path
+ self.root = root
+class NoSuchFile(Exception):
+ def __init__(self, pathname, root="."):
+ path = os.path.abspath(os.path.join(root, pathname))
+ Exception.__init__(self, "No such file: %s" % path)
+class EmptyCommit(Exception):
+ def __init__(self):
+ Exception.__init__(self, "No changes to commit")
+def new():
+ return RCS()
+class RCS(object):
+ """
+ This class implements a 'no-rcs' interface.
+ Support for other RCSs can be added by subclassing this class, and
+ overriding methods _rcs_*() with code appropriate for your RCS.
+ The methods _u_*() are utility methods available to the _rcs_*()
+ methods.
+ """
+ name = "None"
+ client = "" # command-line tool for _u_invoke_client
+ versioned = False
+ def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()):
+ self.paranoid = paranoid
+ self.verboseInvoke = False
+ self.rootdir = None
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ self.encoding = encoding
+ def __del__(self):
+ self.cleanup()
+ def _rcs_help(self):
+ """
+ Return the command help string.
+ (Allows a simple test to see if the client is installed.)
+ """
+ pass
+ def _rcs_detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this RCS.
+ """
+ return True
+ def _rcs_root(self, path):
+ """
+ Get the RCS root. This is the default working directory for
+ future invocations. You would normally set this to the root
+ directory for your RCS.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ if path == "":
+ path = os.path.abspath(".")
+ return path
+ def _rcs_init(self, path):
+ """
+ Begin versioning the tree based at path.
+ """
+ pass
+ def _rcs_cleanup(self):
+ """
+ Remove any cruft that _rcs_init() created outside of the
+ versioned tree.
+ """
+ pass
+ def _rcs_get_user_id(self):
+ """
+ Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the RCS has not been configured with a username, return None.
+ """
+ return None
+ def _rcs_set_user_id(self, value):
+ """
+ Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the RCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ raise SettingIDnotSupported
+ def _rcs_add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ pass
+ def _rcs_remove(self, path):
+ """
+ Remove the file at path from version control. Optionally
+ remove the file from the filesystem as well.
+ """
+ pass
+ def _rcs_update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ pass
+ def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ """
+ Get the file contents as they were in a given revision.
+ Revision==None specifies the current revision.
+ """
+ assert revision == None, \
+ "The %s RCS does not support revision specifiers" % self.name
+ if binary == False:
+ f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
+ else:
+ f = open(os.path.join(self.rootdir, path), "rb")
+ contents = f.read()
+ f.close()
+ return contents
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ dir specifies a directory to create the duplicate in.
+ """
+ shutil.copytree(self.rootdir, directory, True)
+ def _rcs_commit(self, commitfile, allow_empty=False):
+ """
+ Commit the current working directory, using the contents of
+ commitfile as the comment. Return the name of the old
+ revision (or None if commits are not supported).
+ If allow_empty == False, raise EmptyCommit if there are no
+ changes to commit.
+ """
+ return None
+ def installed(self):
+ try:
+ self._rcs_help()
+ return True
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ return False
+ except CommandError:
+ return False
+ def detect(self, path="."):
+ """
+ Detect whether a directory is revision controlled with this RCS.
+ """
+ return self._rcs_detect(path)
+ def root(self, path):
+ """
+ Set the root directory to the path's RCS root. This is the
+ default working directory for future invocations.
+ """
+ self.rootdir = self._rcs_root(path)
+ def init(self, path):
+ """
+ Begin versioning the tree based at path.
+ Also roots the rcs at path.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ self._rcs_init(path)
+ self.root(path)
+ def cleanup(self):
+ self._rcs_cleanup()
+ def get_user_id(self):
+ """
+ Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the RCS has not been configured with a username, return the user's
+ id. You can override the automatic lookup procedure by setting the
+ RCS.user_id attribute to a string of your choice.
+ """
+ if hasattr(self, "user_id"):
+ if self.user_id != None:
+ return self.user_id
+ id = self._rcs_get_user_id()
+ if id == None:
+ name = self._u_get_fallback_username()
+ email = self._u_get_fallback_email()
+ id = self._u_create_id(name, email)
+ print >> sys.stderr, "Guessing id '%s'" % id
+ try:
+ self.set_user_id(id)
+ except SettingIDnotSupported:
+ pass
+ return id
+ def set_user_id(self, value):
+ """
+ Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the RCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ self._rcs_set_user_id(value)
+ def add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ self._rcs_add(self._u_rel_path(path))
+ def remove(self, path):
+ """
+ Remove a file from both version control and the filesystem.
+ """
+ self._rcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ os.remove(path)
+ def recursive_remove(self, dirname):
+ """
+ Remove a file/directory and all its decendents from both
+ version control and the filesystem.
+ """
+ if not os.path.exists(dirname):
+ raise NoSuchFile(dirname)
+ for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
+ filenames.extend(dirnames)
+ for path in filenames:
+ fullpath = os.path.join(dirpath, path)
+ if os.path.exists(fullpath) == False:
+ continue
+ self._rcs_remove(self._u_rel_path(fullpath))
+ if os.path.exists(dirname):
+ shutil.rmtree(dirname)
+ def update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ self._rcs_update(self._u_rel_path(path))
+ def get_file_contents(self, path, revision=None, allow_no_rcs=False, binary=False):
+ """
+ Get the file as it was in a given revision.
+ Revision==None specifies the current revision.
+ """
+ if not os.path.exists(path):
+ raise NoSuchFile(path)
+ if self._use_rcs(path, allow_no_rcs):
+ relpath = self._u_rel_path(path)
+ contents = self._rcs_get_file_contents(relpath,revision,binary=binary)
+ else:
+ f = codecs.open(path, "r", self.encoding)
+ contents = f.read()
+ f.close()
+ return contents
+ def set_file_contents(self, path, contents, allow_no_rcs=False, binary=False):
+ """
+ Set the file contents under version control.
+ """
+ add = not os.path.exists(path)
+ if binary == False:
+ f = codecs.open(path, "w", self.encoding)
+ else:
+ f = open(path, "wb")
+ f.write(contents)
+ f.close()
+ if self._use_rcs(path, allow_no_rcs):
+ if add:
+ self.add(path)
+ else:
+ self.update(path)
+ def mkdir(self, path, allow_no_rcs=False, check_parents=True):
+ """
+ Create (if neccessary) a directory at path under version
+ control.
+ """
+ if check_parents == True:
+ parent = os.path.dirname(path)
+ if not os.path.exists(parent): # recurse through parents
+ self.mkdir(parent, allow_no_rcs, check_parents)
+ if not os.path.exists(path):
+ os.mkdir(path)
+ if self._use_rcs(path, allow_no_rcs):
+ self.add(path)
+ else:
+ assert os.path.isdir(path)
+ if self._use_rcs(path, allow_no_rcs):
+ #self.update(path)# Don't update directories. Changing files
+ pass # underneath them should be sufficient.
+ def duplicate_repo(self, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ Return the path to the arbitrary directory at the base of the new repo.
+ """
+ # Dirname in Baseir to protect against simlink attacks.
+ if self._duplicateBasedir == None:
+ self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs')
+ self._duplicateDirname = \
+ os.path.join(self._duplicateBasedir, "duplicate")
+ self._rcs_duplicate_repo(directory=self._duplicateDirname,
+ revision=revision)
+ return self._duplicateDirname
+ def remove_duplicate_repo(self):
+ """
+ Clean up a duplicate repo created with duplicate_repo().
+ """
+ if self._duplicateBasedir != None:
+ shutil.rmtree(self._duplicateBasedir)
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ def commit(self, summary, body=None, allow_empty=False):
+ """
+ Commit the current working directory, with a commit message
+ string summary and body. Return the name of the old revision
+ (or None if versioning is not supported).
+ If allow_empty == False (the default), raise EmptyCommit if
+ there are no changes to commit.
+ """
+ summary = summary.strip()+'\n'
+ if body is not None:
+ summary += '\n' + body.strip() + '\n'
+ descriptor, filename = tempfile.mkstemp()
+ revision = None
+ try:
+ temp_file = os.fdopen(descriptor, 'wb')
+ temp_file.write(summary)
+ temp_file.flush()
+ self.precommit()
+ revision = self._rcs_commit(filename, allow_empty=allow_empty)
+ temp_file.close()
+ self.postcommit()
+ finally:
+ os.remove(filename)
+ return revision
+ def precommit(self):
+ """
+ Executed before all attempted commits.
+ """
+ pass
+ def postcommit(self):
+ """
+ Only executed after successful commits.
+ """
+ pass
+ def _u_any_in_string(self, list, string):
+ """
+ Return True if any of the strings in list are in string.
+ Otherwise return False.
+ """
+ for list_string in list:
+ if list_string in string:
+ return True
+ return False
+ def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None):
+ """
+ expect should be a tuple of allowed exit codes. cwd should be
+ the directory from which the command will be executed.
+ """
+ if cwd == None:
+ cwd = self.rootdir
+ if self.verboseInvoke == True:
+ print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args))
+ try :
+ if sys.platform != "win32":
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
+ else:
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ shell=True, cwd=cwd)
+ except OSError, e :
+ raise CommandError(args, e.args[0], e)
+ output, error = q.communicate(input=stdin)
+ status = q.wait()
+ if self.verboseInvoke == True:
+ print >> sys.stderr, "%d\n%s%s" % (status, output, error)
+ if status not in expect:
+ raise CommandError(args, status, error)
+ return status, output, error
+ def _u_invoke_client(self, *args, **kwargs):
+ directory = kwargs.get('directory',None)
+ expect = kwargs.get('expect', (0,))
+ stdin = kwargs.get('stdin', None)
+ cl_args = [self.client]
+ cl_args.extend(args)
+ return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory)
+ def _u_search_parent_directories(self, path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ return search_parent_directories(path, filename)
+ def _use_rcs(self, path, allow_no_rcs):
+ """
+ Try and decide if _rcs_add/update/mkdir/etc calls will
+ succeed. Returns True is we think the rcs_call would
+ succeeed, and False otherwise.
+ """
+ use_rcs = True
+ exception = None
+ if self.rootdir != None:
+ if self.path_in_root(path) == False:
+ use_rcs = False
+ exception = PathNotInRoot(path, self.rootdir)
+ else:
+ use_rcs = False
+ exception = RCSnotRooted
+ if use_rcs == False and allow_no_rcs==False:
+ raise exception
+ return use_rcs
+ def path_in_root(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> rcs = new()
+ >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c")
+ True
+ >>> rcs.path_in_root("/a.b/.be", "/a.b/c")
+ False
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise RCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ return False
+ return True
+ def _u_rel_path(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> rcs = new()
+ >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ '.be'
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise RCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ raise PathNotInRoot(path, absRootSlashedDir)
+ assert path != absRootSlashedDir, \
+ "file %s == root directory %s" % (path, absRootSlashedDir)
+ relpath = path[len(absRootSlashedDir):]
+ return relpath
+ def _u_abspath(self, path, root=None):
+ """
+ Return the absolute path from a path realtive to root.
+ >>> rcs = new()
+ >>> rcs._u_abspath(".be", "/a.b/c")
+ '/a.b/c/.be'
+ """
+ if root == None:
+ assert self.rootdir != None, "RCS not rooted"
+ root = self.rootdir
+ return os.path.abspath(os.path.join(root, path))
+ def _u_create_id(self, name, email=None):
+ """
+ >>> rcs = new()
+ >>> rcs._u_create_id("John Doe", "jdoe@example.com")
+ 'John Doe <jdoe@example.com>'
+ >>> rcs._u_create_id("John Doe")
+ 'John Doe'
+ """
+ assert len(name) > 0
+ if email == None or len(email) == 0:
+ return name
+ else:
+ return "%s <%s>" % (name, email)
+ def _u_parse_id(self, value):
+ """
+ >>> rcs = new()
+ >>> rcs._u_parse_id("John Doe <jdoe@example.com>")
+ ('John Doe', 'jdoe@example.com')
+ >>> rcs._u_parse_id("John Doe")
+ ('John Doe', None)
+ >>> try:
+ ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>")
+ ... except AssertionError:
+ ... print "Invalid match"
+ Invalid match
+ """
+ emailexp = re.compile("(.*) <([^>]*)>(.*)")
+ match = emailexp.search(value)
+ if match == None:
+ email = None
+ name = value
+ else:
+ assert len(match.groups()) == 3
+ assert match.groups()[2] == "", match.groups()
+ email = match.groups()[1]
+ name = match.groups()[0]
+ assert name != None
+ assert len(name) > 0
+ return (name, email)
+ def _u_get_fallback_username(self):
+ name = None
+ for envariable in ["LOGNAME", "USERNAME"]:
+ if os.environ.has_key(envariable):
+ name = os.environ[envariable]
+ break
+ assert name != None
+ return name
+ def _u_get_fallback_email(self):
+ hostname = gethostname()
+ name = self._u_get_fallback_username()
+ return "%s@%s" % (name, hostname)
+ def _u_parse_commitfile(self, commitfile):
+ """
+ Split the commitfile created in self.commit() back into
+ summary and header lines.
+ """
+ f = codecs.open(commitfile, "r", self.encoding)
+ summary = f.readline()
+ body = f.read()
+ body.lstrip('\n')
+ if len(body) == 0:
+ body = None
+ f.close()
+ return (summary, body)
+def setup_rcs_test_fixtures(testcase):
+ """Set up test fixtures for RCS test case."""
+ testcase.rcs = testcase.Class()
+ testcase.dir = Dir()
+ testcase.dirname = testcase.dir.path
+ rcs_not_supporting_uninitialized_user_id = []
+ rcs_not_supporting_set_user_id = ["None", "hg"]
+ testcase.rcs_supports_uninitialized_user_id = (
+ testcase.rcs.name not in rcs_not_supporting_uninitialized_user_id)
+ testcase.rcs_supports_set_user_id = (
+ testcase.rcs.name not in rcs_not_supporting_set_user_id)
+ if not testcase.rcs.installed():
+ testcase.fail(
+ "%(name)s RCS not found" % vars(testcase.Class))
+ if testcase.Class.name != "None":
+ testcase.failIf(
+ testcase.rcs.detect(testcase.dirname),
+ "Detected %(name)s RCS before initialising"
+ % vars(testcase.Class))
+ testcase.rcs.init(testcase.dirname)
+class RCSTestCase(unittest.TestCase):
+ """Test cases for base RCS class."""
+ Class = RCS
+ def __init__(self, *args, **kwargs):
+ super(RCSTestCase, self).__init__(*args, **kwargs)
+ self.dirname = None
+ def setUp(self):
+ super(RCSTestCase, self).setUp()
+ setup_rcs_test_fixtures(self)
+ def tearDown(self):
+ del(self.rcs)
+ super(RCSTestCase, self).tearDown()
+ def full_path(self, rel_path):
+ return os.path.join(self.dirname, rel_path)
+class RCS_init_TestCase(RCSTestCase):
+ """Test cases for RCS.init method."""
+ def test_detect_should_succeed_after_init(self):
+ """Should detect RCS in directory after initialization."""
+ self.failUnless(
+ self.rcs.detect(self.dirname),
+ "Did not detect %(name)s RCS after initialising"
+ % vars(self.Class))
+ def test_rcs_rootdir_in_specified_root_path(self):
+ """RCS root directory should be in specified root path."""
+ rp = os.path.realpath(self.rcs.rootdir)
+ dp = os.path.realpath(self.dirname)
+ rcs_name = self.Class.name
+ self.failUnless(
+ dp == rp or rp == None,
+ "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars())
+class RCS_get_user_id_TestCase(RCSTestCase):
+ """Test cases for RCS.get_user_id method."""
+ def test_gets_existing_user_id(self):
+ """Should get the existing user ID."""
+ if not self.rcs_supports_uninitialized_user_id:
+ return
+ user_id = self.rcs.get_user_id()
+ self.failUnless(
+ user_id is not None,
+ "unable to get a user id")
+class RCS_set_user_id_TestCase(RCSTestCase):
+ """Test cases for RCS.set_user_id method."""
+ def setUp(self):
+ super(RCS_set_user_id_TestCase, self).setUp()
+ if self.rcs_supports_uninitialized_user_id:
+ self.prev_user_id = self.rcs.get_user_id()
+ else:
+ self.prev_user_id = "Uninitialized identity <bogus@example.org>"
+ if self.rcs_supports_set_user_id:
+ self.test_new_user_id = "John Doe <jdoe@example.com>"
+ self.rcs.set_user_id(self.test_new_user_id)
+ def tearDown(self):
+ if self.rcs_supports_set_user_id:
+ self.rcs.set_user_id(self.prev_user_id)
+ super(RCS_set_user_id_TestCase, self).tearDown()
+ def test_raises_error_in_unsupported_vcs(self):
+ """Should raise an error in a VCS that doesn't support it."""
+ if self.rcs_supports_set_user_id:
+ return
+ self.assertRaises(
+ SettingIDnotSupported,
+ self.rcs.set_user_id, "foo")
+ def test_updates_user_id_in_supporting_rcs(self):
+ """Should update the user ID in an RCS that supports it."""
+ if not self.rcs_supports_set_user_id:
+ return
+ user_id = self.rcs.get_user_id()
+ self.failUnlessEqual(
+ self.test_new_user_id, user_id,
+ "user id not set correctly (expected %s, got %s)"
+ % (self.test_new_user_id, user_id))
+def setup_rcs_revision_test_fixtures(testcase):
+ """Set up revision test fixtures for RCS test case."""
+ testcase.test_dirs = ['a', 'a/b', 'c']
+ for path in testcase.test_dirs:
+ testcase.rcs.mkdir(testcase.full_path(path))
+ testcase.test_files = ['a/text', 'a/b/text']
+ testcase.test_contents = {
+ 'rev_1': "Lorem ipsum",
+ 'uncommitted': "dolor sit amet",
+ }
+class RCS_mkdir_TestCase(RCSTestCase):
+ """Test cases for RCS.mkdir method."""
+ def setUp(self):
+ super(RCS_mkdir_TestCase, self).setUp()
+ setup_rcs_revision_test_fixtures(self)
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.rcs.recursive_remove(self.full_path(path))
+ super(RCS_mkdir_TestCase, self).tearDown()
+ def test_mkdir_creates_directory(self):
+ """Should create specified directory in filesystem."""
+ for path in self.test_dirs:
+ full_path = self.full_path(path)
+ self.failUnless(
+ os.path.exists(full_path),
+ "path %(full_path)s does not exist" % vars())
+class RCS_commit_TestCase(RCSTestCase):
+ """Test cases for RCS.commit method."""
+ def setUp(self):
+ super(RCS_commit_TestCase, self).setUp()
+ setup_rcs_revision_test_fixtures(self)
+ def tearDown(self):
+ for path in reversed(sorted(self.test_dirs)):
+ self.rcs.recursive_remove(self.full_path(path))
+ super(RCS_commit_TestCase, self).tearDown()
+ def test_file_contents_as_specified(self):
+ """Should set file contents as specified."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(full_path, test_contents)
+ current_contents = self.rcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+ def test_file_contents_as_committed(self):
+ """Should have file contents as specified after commit."""
+ test_contents = self.test_contents['rev_1']
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(full_path, test_contents)
+ revision = self.rcs.commit("Initial file contents.")
+ current_contents = self.rcs.get_file_contents(full_path)
+ self.failUnlessEqual(test_contents, current_contents)
+ def test_file_contents_as_set_when_uncommitted(self):
+ """Should set file contents as specified after commit."""
+ if not self.rcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.rcs.commit("Initial file contents.")
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ current_contents = self.rcs.get_file_contents(full_path)
+ self.failUnlessEqual(
+ self.test_contents['uncommitted'], current_contents)
+ def test_revision_file_contents_as_committed(self):
+ """Should get file contents as committed to specified revision."""
+ if not self.rcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.rcs.commit("Initial file contents.")
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ committed_contents = self.rcs.get_file_contents(
+ full_path, revision)
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], committed_contents)
+class RCS_duplicate_repo_TestCase(RCSTestCase):
+ """Test cases for RCS.duplicate_repo method."""
+ def setUp(self):
+ super(RCS_duplicate_repo_TestCase, self).setUp()
+ setup_rcs_revision_test_fixtures(self)
+ def tearDown(self):
+ self.rcs.remove_duplicate_repo()
+ for path in reversed(sorted(self.test_dirs)):
+ self.rcs.recursive_remove(self.full_path(path))
+ super(RCS_duplicate_repo_TestCase, self).tearDown()
+ def test_revision_file_contents_as_committed(self):
+ """Should match file contents as committed to specified revision."""
+ if not self.rcs.versioned:
+ return
+ for path in self.test_files:
+ full_path = self.full_path(path)
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['rev_1'])
+ revision = self.rcs.commit("Commit current status")
+ self.rcs.set_file_contents(
+ full_path, self.test_contents['uncommitted'])
+ dup_repo_path = self.rcs.duplicate_repo(revision)
+ dup_file_path = os.path.join(dup_repo_path, path)
+ dup_file_contents = file(dup_file_path, 'rb').read()
+ self.failUnlessEqual(
+ self.test_contents['rev_1'], dup_file_contents)
+ self.rcs.remove_duplicate_repo()
+def make_rcs_testcase_subclasses(rcs_class, namespace):
+ """Make RCSTestCase subclasses for rcs_class in the namespace."""
+ rcs_testcase_classes = [
+ c for c in (
+ ob for ob in globals().values() if isinstance(ob, type))
+ if issubclass(c, RCSTestCase)]
+ for base_class in rcs_testcase_classes:
+ testcase_class_name = rcs_class.__name__ + base_class.__name__
+ testcase_class_bases = (base_class,)
+ testcase_class_dict = dict(base_class.__dict__)
+ testcase_class_dict['Class'] = rcs_class
+ testcase_class = type(
+ testcase_class_name, testcase_class_bases, testcase_class_dict)
+ setattr(namespace, testcase_class_name, testcase_class)
+unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/settings_object.py b/libbe/settings_object.py
index e69de29..dde247f 100644
--- a/libbe/settings_object.py
+++ b/libbe/settings_object.py
@@ -0,0 +1,417 @@
+# Bugs Everywhere - a distributed bugtracker
+# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+This module provides a base class implementing settings-dict based
+property storage useful for BE objects with saved properties
+(e.g. BugDir, Bug, Comment). For example usage, consider the
+unittests at the end of the module.
+import doctest
+import unittest
+from properties import Property, doc_property, local_property, \
+ defaulting_property, checked_property, fn_checked_property, \
+ cached_property, primed_property, change_hook_property, \
+ settings_property
+class _Token (object):
+ """
+ `Control' value class for properties. We want values that only
+ mean something to the settings_object module.
+ """
+ pass
+class UNPRIMED (_Token):
+ "Property has not been primed."
+ pass
+class EMPTY (_Token):
+ """
+ Property has been primed but has no user-set value, so use
+ default/generator value.
+ """
+ pass
+def prop_save_settings(self, old, new):
+ """
+ The default action undertaken when a property changes.
+ """
+ if self.sync_with_disk==True:
+ self.save_settings()
+def prop_load_settings(self):
+ """
+ The default action undertaken when an UNPRIMED property is accessed.
+ """
+ if self.sync_with_disk==True and self._settings_loaded==False:
+ self.load_settings()
+ else:
+ self._setup_saved_settings(flag_as_loaded=False)
+# Some name-mangling routines for pretty printing setting names
+def setting_name_to_attr_name(self, name):
+ """
+ Convert keys to the .settings dict into their associated
+ SavedSettingsObject attribute names.
+ >>> print setting_name_to_attr_name(None,"User-id")
+ user_id
+ """
+ return name.lower().replace('-', '_')
+def attr_name_to_setting_name(self, name):
+ """
+ The inverse of setting_name_to_attr_name.
+ >>> print attr_name_to_setting_name(None, "user_id")
+ User-id
+ """
+ return name.capitalize().replace('_', '-')
+def versioned_property(name, doc,
+ default=None, generator=None,
+ change_hook=prop_save_settings,
+ mutable=False,
+ primer=prop_load_settings,
+ allowed=None, check_fn=None,
+ settings_properties=[],
+ required_saved_properties=[],
+ require_save=False):
+ """
+ Combine the common decorators in a single function.
+ Use zero or one (but not both) of default or generator, since a
+ working default will keep the generator from functioning. Use the
+ default if you know what you want the default value to be at
+ 'coding time'. Use the generator if you can write a function to
+ determine a valid default at run time. If both default and
+ generator are None, then the property will be a defaulting
+ property which defaults to None.
+ allowed and check_fn have a similar relationship, although you can
+ use both of these if you want. allowed compares the proposed
+ value against a list determined at 'coding time' and check_fn
+ allows more flexible comparisons to take place at run time.
+ Set require_save to True if you want to save the default/generated
+ value for a property, to protect against future changes. E.g., we
+ currently expect all comments to be 'text/plain' but in the future
+ we may want to default to 'text/html'. If we don't want the old
+ comments to be interpreted as 'text/html', we would require that
+ the content type be saved.
+ change_hook, primer, settings_properties, and
+ required_saved_properties are only options to get their defaults
+ into our local scope. Don't mess with them.
+ Set mutable=True if:
+ * default is a mutable
+ * your generator function may return mutables
+ * you set change_hook and might have mutable property values
+ See the docstrings in libbe.properties for details on how each of
+ these cases are handled.
+ """
+ settings_properties.append(name)
+ if require_save == True:
+ required_saved_properties.append(name)
+ def decorator(funcs):
+ fulldoc = doc
+ if default != None or generator == None:
+ defaulting = defaulting_property(default=default, null=EMPTY,
+ mutable_default=mutable)
+ fulldoc += "\n\nThis property defaults to %s." % default
+ if generator != None:
+ cached = cached_property(generator=generator, initVal=EMPTY,
+ mutable=mutable)
+ fulldoc += "\n\nThis property is generated with %s." % generator
+ if check_fn != None:
+ fn_checked = fn_checked_property(value_allowed_fn=check_fn)
+ fulldoc += "\n\nThis property is checked with %s." % check_fn
+ if allowed != None:
+ checked = checked_property(allowed=allowed)
+ fulldoc += "\n\nThe allowed values for this property are: %s." \
+ % (', '.join(allowed))
+ hooked = change_hook_property(hook=change_hook, mutable=mutable)
+ primed = primed_property(primer=primer, initVal=UNPRIMED)
+ settings = settings_property(name=name, null=UNPRIMED)
+ docp = doc_property(doc=fulldoc)
+ deco = hooked(primed(settings(docp(funcs))))
+ if default != None or generator == None:
+ deco = defaulting(deco)
+ if generator != None:
+ deco = cached(deco)
+ if check_fn != None:
+ deco = fn_checked(deco)
+ if allowed != None:
+ deco = checked(deco)
+ return Property(deco)
+ return decorator
+class SavedSettingsObject(object):
+ # Keep a list of properties that may be stored in the .settings dict.
+ #settings_properties = []
+ # A list of properties that we save to disk, even if they were
+ # never set (in which case we save the default value). This
+ # protects against future changes in default values.
+ #required_saved_properties = []
+ _setting_name_to_attr_name = setting_name_to_attr_name
+ _attr_name_to_setting_name = attr_name_to_setting_name
+ def __init__(self):
+ self._settings_loaded = False
+ self.sync_with_disk = False
+ self.settings = {}
+ def load_settings(self):
+ """Load the settings from disk."""
+ # Override. Must call ._setup_saved_settings() after loading.
+ self.settings = {}
+ self._setup_saved_settings()
+ def _setup_saved_settings(self, flag_as_loaded=True):
+ """
+ To be run after setting self.settings up from disk. Marks all
+ settings as primed.
+ """
+ for property in self.settings_properties:
+ if property not in self.settings:
+ self.settings[property] = EMPTY
+ elif self.settings[property] == UNPRIMED:
+ self.settings[property] = EMPTY
+ if flag_as_loaded == True:
+ self._settings_loaded = True
+ def save_settings(self):
+ """Load the settings from disk."""
+ # Override. Should save the dict output of ._get_saved_settings()
+ settings = self._get_saved_settings()
+ pass # write settings to disk....
+ def _get_saved_settings(self):
+ settings = {}
+ for k,v in self.settings.items():
+ if v != None and v != EMPTY:
+ settings[k] = v
+ for k in self.required_saved_properties:
+ settings[k] = getattr(self, self._setting_name_to_attr_name(k))
+ return settings
+ def clear_cached_setting(self, setting=None):
+ "If setting=None, clear *all* cached settings"
+ if setting != None:
+ if hasattr(self, "_%s_cached_value" % setting):
+ delattr(self, "_%s_cached_value" % setting)
+ else:
+ for setting in settings_properties:
+ self.clear_cached_setting(setting)
+class SavedSettingsObjectTests(unittest.TestCase):
+ def testSimpleProperty(self):
+ """Testing a minimal versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ # access missing setting
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(len(t.settings) == 0, len(t.settings))
+ self.failUnless(t.content_type == None, t.content_type)
+ # accessing t.content_type triggers the priming, which runs
+ # t._setup_saved_settings, which fills out t.settings with
+ # EMPTY data. t._settings_loaded is still false though, since
+ # the default priming does not do any of the `official' loading
+ # that occurs in t.load_settings.
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ # load settings creates an EMPTY value in the settings array
+ t.load_settings()
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ # now we set a value
+ t.content_type = 5
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == 5, t.content_type)
+ self.failUnless(t.settings["Content-type"] == 5,
+ t.settings["Content-type"])
+ # now we set another value
+ t.content_type = "text/plain"
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/plain",
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
+ t._get_saved_settings())
+ # now we clear to the post-primed value
+ t.content_type = EMPTY
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t.content_type == None, t.content_type)
+ self.failUnless(len(t.settings) == 1, len(t.settings))
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ def testDefaultingProperty(self):
+ """Testing a defaulting versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ t.load_settings()
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.content_type == "text/plain", t.content_type)
+ self.failUnless(t.settings["Content-type"] == EMPTY,
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings() == {}, t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t.content_type == "text/html",
+ t.content_type)
+ self.failUnless(t.settings["Content-type"] == "text/html",
+ t.settings["Content-type"])
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testRequiredDefaultingProperty(self):
+ """Testing a required defaulting versioned property"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testClassVersionedPropertyDefinition(self):
+ """Testing a class-specific _versioned property decorator"""
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ def _versioned_property(settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties,
+ **kwargs):
+ if "settings_properties" not in kwargs:
+ kwargs["settings_properties"] = settings_properties
+ if "required_saved_properties" not in kwargs:
+ kwargs["required_saved_properties"]=required_saved_properties
+ return versioned_property(**kwargs)
+ @_versioned_property(name="Content-type",
+ doc="A test property",
+ default="text/plain",
+ require_save=True)
+ def content_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
+ t._get_saved_settings())
+ t.content_type = "text/html"
+ self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
+ t._get_saved_settings())
+ def testMutableChangeHookedProperty(self):
+ """Testing a mutable change-hooked property"""
+ SAVES = []
+ def prop_log_save_settings(self, old, new, saves=SAVES):
+ saves.append("'%s' -> '%s'" % (str(old), str(new)))
+ prop_save_settings(self, old, new)
+ class Test(SavedSettingsObject):
+ settings_properties = []
+ required_saved_properties = []
+ @versioned_property(name="List-type",
+ doc="A test property",
+ mutable=True,
+ change_hook=prop_log_save_settings,
+ settings_properties=settings_properties,
+ required_saved_properties=required_saved_properties)
+ def list_type(): return {}
+ def __init__(self):
+ SavedSettingsObject.__init__(self)
+ t = Test()
+ self.failUnless(t._settings_loaded == False, t._settings_loaded)
+ t.load_settings()
+ self.failUnless(SAVES == [], SAVES)
+ self.failUnless(t._settings_loaded == True, t._settings_loaded)
+ self.failUnless(t.list_type == None, t.list_type)
+ self.failUnless(SAVES == [
+ "'None' -> '<class 'libbe.settings_object.EMPTY'>'"
+ ], SAVES)
+ self.failUnless(t.settings["List-type"]==EMPTY,t.settings["List-type"])
+ t.list_type = []
+ self.failUnless(t.settings["List-type"] == [], t.settings["List-type"])
+ self.failUnless(SAVES == [
+ "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'"
+ ], SAVES)
+ t.list_type.append(5)
+ self.failUnless(SAVES == [
+ "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
+ ], SAVES)
+ self.failUnless(t.settings["List-type"] == [5],t.settings["List-type"])
+ self.failUnless(SAVES == [ # the append(5) has not yet been saved
+ "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
+ ], SAVES)
+ self.failUnless(t.list_type == [5], t.list_type) # <-get triggers saved
+ self.failUnless(SAVES == [ # now the append(5) has been saved.
+ "'None' -> '<class 'libbe.settings_object.EMPTY'>'",
+ "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
+ "'[]' -> '[5]'"
+ ], SAVES)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/tree.py b/libbe/tree.py
index e69de29..45ae085 100644
--- a/libbe/tree.py
+++ b/libbe/tree.py
@@ -0,0 +1,179 @@
+# Bugs Everywhere, a distributed bugtracker
+# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import doctest
+class Tree(list):
+ """
+ Construct
+ +-b---d-g
+ a-+ +-e
+ +-c-+-f-h-i
+ with
+ >>> i = Tree(); i.n = "i"
+ >>> h = Tree([i]); h.n = "h"
+ >>> f = Tree([h]); f.n = "f"
+ >>> e = Tree(); e.n = "e"
+ >>> c = Tree([f,e]); c.n = "c"
+ >>> g = Tree(); g.n = "g"
+ >>> d = Tree([g]); d.n = "d"
+ >>> b = Tree([d]); b.n = "b"
+ >>> a = Tree(); a.n = "a"
+ >>> a.append(c)
+ >>> a.append(b)
+ >>> a.branch_len()
+ 5
+ >>> a.sort(key=lambda node : -node.branch_len())
+ >>> "".join([node.n for node in a.traverse()])
+ 'acfhiebdg'
+ >>> a.sort(key=lambda node : node.branch_len())
+ >>> "".join([node.n for node in a.traverse()])
+ 'abdgcefhi'
+ >>> "".join([node.n for node in a.traverse(depth_first=False)])
+ 'abcdefghi'
+ >>> for depth,node in a.thread():
+ ... print "%*s" % (2*depth+1, node.n)
+ a
+ b
+ d
+ g
+ c
+ e
+ f
+ h
+ i
+ >>> for depth,node in a.thread(flatten=True):
+ ... print "%*s" % (2*depth+1, node.n)
+ a
+ b
+ d
+ g
+ c
+ e
+ f
+ h
+ i
+ >>> a.has_descendant(g)
+ True
+ >>> c.has_descendant(g)
+ False
+ >>> a.has_descendant(a)
+ False
+ >>> a.has_descendant(a, match_self=True)
+ True
+ """
+ def __eq__(self, other):
+ return id(self) == id(other)
+ def branch_len(self):
+ """
+ Exhaustive search every time == SLOW.
+ Use only on small trees, or reimplement by overriding
+ child-addition methods to allow accurate caching.
+ For the tree
+ +-b---d-g
+ a-+ +-e
+ +-c-+-f-h-i
+ this method returns 5.
+ """
+ if len(self) == 0:
+ return 1
+ else:
+ return 1 + max([child.branch_len() for child in self])
+ def sort(self, *args, **kwargs):
+ """
+ This method can be slow, e.g. on a branch_len() sort, since a
+ node at depth N from the root has it's branch_len() method
+ called N times.
+ """
+ list.sort(self, *args, **kwargs)
+ for child in self:
+ child.sort(*args, **kwargs)
+ def traverse(self, depth_first=True):
+ """
+ Note: you might want to sort() your tree first.
+ """
+ if depth_first == True:
+ yield self
+ for child in self:
+ for descendant in child.traverse():
+ yield descendant
+ else: # breadth first, Wikipedia algorithm
+ # http://en.wikipedia.org/wiki/Breadth-first_search
+ queue = [self]
+ while len(queue) > 0:
+ node = queue.pop(0)
+ yield node
+ queue.extend(node)
+ def thread(self, flatten=False):
+ """
+ When flatten==False, the depth of any node is one greater than
+ the depth of its parent. That way the inheritance is
+ explicit, but you can end up with highly indented threads.
+ When flatten==True, the depth of any node is only greater than
+ the depth of its parent when there is a branch, and the node
+ is not the last child. This can lead to ancestry ambiguity,
+ but keeps the total indentation down. E.g.
+ +-b +-b-c
+ a-+-c and a-+
+ +-d-e-f +-d-e-f
+ would both produce (after sorting by branch_len())
+ (0, a)
+ (1, b)
+ (1, c)
+ (0, d)
+ (0, e)
+ (0, f)
+ """
+ stack = [] # ancestry of the current node
+ if flatten == True:
+ depthDict = {}
+ for node in self.traverse(depth_first=True):
+ while len(stack) > 0 \
+ and id(node) not in [id(c) for c in stack[-1]]:
+ stack.pop(-1)
+ if flatten == False:
+ depth = len(stack)
+ else:
+ if len(stack) == 0:
+ depth = 0
+ else:
+ parent = stack[-1]
+ depth = depthDict[id(parent)]
+ if len(parent) > 1 and node != parent[-1]:
+ depth += 1
+ depthDict[id(node)] = depth
+ yield (depth,node)
+ stack.append(node)
+ def has_descendant(self, descendant, depth_first=True, match_self=False):
+ if descendant == self:
+ return match_self
+ for d in self.traverse(depth_first):
+ if descendant == d:
+ return True
+ return False
+suite = doctest.DocTestSuite()
diff --git a/libbe/utility.py b/libbe/utility.py
index e69de29..3df06b4 100644
--- a/libbe/utility.py
+++ b/libbe/utility.py
@@ -0,0 +1,129 @@
+# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
+# W. Trevor King <wking@drexel.edu>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import calendar
+import codecs
+import os
+import shutil
+import tempfile
+import time
+import types
+import doctest
+def search_parent_directories(path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ path = os.path.realpath(path)
+ assert os.path.exists(path)
+ old_path = None
+ while True:
+ check_path = os.path.join(path, filename)
+ if os.path.exists(check_path):
+ return check_path
+ if path == old_path:
+ return None
+ old_path = path
+ path = os.path.dirname(path)
+class Dir (object):
+ "A temporary directory for testing use"
+ def __init__(self):
+ self.path = tempfile.mkdtemp(prefix="BEtest")
+ self.rmtree = shutil.rmtree # save local reference for __del__
+ self.removed = False
+ def __del__(self):
+ self.cleanup()
+ def cleanup(self):
+ if self.removed == False:
+ self.rmtree(self.path)
+ self.removed = True
+ def __call__(self):
+ return self.path
+RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000"
+def time_to_str(time_val):
+ """Convert a time value into an RFC 2822-formatted string. This format
+ lacks sub-second data.
+ >>> time_to_str(0)
+ 'Thu, 01 Jan 1970 00:00:00 +0000'
+ """
+ return time.strftime(RFC_2822_TIME_FMT, time.gmtime(time_val))
+def str_to_time(str_time):
+ """Convert an RFC 2822-fomatted string into a time value.
+ >>> str_to_time("Thu, 01 Jan 1970 00:00:00 +0000")
+ 0
+ >>> q = time.time()
+ >>> str_to_time(time_to_str(q)) == int(q)
+ True
+ >>> str_to_time("Thu, 01 Jan 1970 00:00:00 -1000")
+ 36000
+ """
+ timezone_str = str_time[-5:]
+ if timezone_str != "+0000":
+ str_time = str_time.replace(timezone_str, "+0000")
+ time_val = calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT))
+ timesign = -int(timezone_str[0]+"1") # "+" -> time_val ahead of GMT
+ timezone_tuple = time.strptime(timezone_str[1:], "%H%M")
+ timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60
+ return time_val + timesign*timezone
+def handy_time(time_val):
+ return time.strftime("%a, %d %b %Y %H:%M", time.localtime(time_val))
+def time_to_gmtime(str_time):
+ """Convert an RFC 2822-fomatted string to a GMT string.
+ >>> time_to_gmtime("Thu, 01 Jan 1970 00:00:00 -1000")
+ 'Thu, 01 Jan 1970 10:00:00 +0000'
+ """
+ time_val = str_to_time(str_time)
+ return time_to_str(time_val)
+def iterable_full_of_strings(value, alternative=None):
+ """
+ Require an iterable full of strings.
+ >>> iterable_full_of_strings([])
+ True
+ >>> iterable_full_of_strings(["abc", "def", u"hij"])
+ True
+ >>> iterable_full_of_strings(["abc", None, u"hij"])
+ False
+ >>> iterable_full_of_strings(None, alternative=None)
+ True
+ """
+ if value == alternative:
+ return True
+ elif not hasattr(value, "__iter__"):
+ return False
+ for x in value:
+ if type(x) not in types.StringTypes:
+ return False
+ return True
+suite = doctest.DocTestSuite()