aboutsummaryrefslogtreecommitdiffstats
path: root/interfaces/email/interactive/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'interfaces/email/interactive/libbe')
l---------interfaces/email/interactive/libbe1
-rw-r--r--interfaces/email/interactive/libbe/arch.py312
-rw-r--r--interfaces/email/interactive/libbe/beuuid.py63
-rw-r--r--interfaces/email/interactive/libbe/bug.py580
-rw-r--r--interfaces/email/interactive/libbe/bugdir.py832
-rw-r--r--interfaces/email/interactive/libbe/bzr.py113
-rw-r--r--interfaces/email/interactive/libbe/cmdutil.py233
-rw-r--r--interfaces/email/interactive/libbe/comment.py744
-rw-r--r--interfaces/email/interactive/libbe/config.py89
-rw-r--r--interfaces/email/interactive/libbe/darcs.py186
-rw-r--r--interfaces/email/interactive/libbe/diff.py419
-rw-r--r--interfaces/email/interactive/libbe/editor.py108
-rw-r--r--interfaces/email/interactive/libbe/encoding.py61
-rw-r--r--interfaces/email/interactive/libbe/git.py148
-rw-r--r--interfaces/email/interactive/libbe/hg.py103
-rw-r--r--interfaces/email/interactive/libbe/mapfile.py116
-rw-r--r--interfaces/email/interactive/libbe/plugin.py77
-rw-r--r--interfaces/email/interactive/libbe/properties.py638
-rw-r--r--interfaces/email/interactive/libbe/settings_object.py412
-rw-r--r--interfaces/email/interactive/libbe/tree.py183
-rw-r--r--interfaces/email/interactive/libbe/upgrade.py187
-rw-r--r--interfaces/email/interactive/libbe/utility.py131
-rw-r--r--interfaces/email/interactive/libbe/vcs.py942
-rw-r--r--interfaces/email/interactive/libbe/version.py50
24 files changed, 1 insertions, 6727 deletions
diff --git a/interfaces/email/interactive/libbe b/interfaces/email/interactive/libbe
new file mode 120000
index 0000000..7d18612
--- /dev/null
+++ b/interfaces/email/interactive/libbe
@@ -0,0 +1 @@
+../../../libbe \ No newline at end of file
diff --git a/interfaces/email/interactive/libbe/arch.py b/interfaces/email/interactive/libbe/arch.py
deleted file mode 100644
index daa8ac6..0000000
--- a/interfaces/email/interactive/libbe/arch.py
+++ /dev/null
@@ -1,312 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# Ben Finney <ben+python@benfinney.id.au>
-# James Rowe <jnrowe@ukfsn.org>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-GNU Arch (tla) backend.
-"""
-
-import codecs
-import os
-import re
-import shutil
-import sys
-import time
-import unittest
-import doctest
-
-from beuuid import uuid_gen
-import config
-import vcs
-
-
-
-DEFAULT_CLIENT = "tla"
-
-client = config.get_val("arch_client", default=DEFAULT_CLIENT)
-
-def new():
- return Arch()
-
-class Arch(vcs.VCS):
- name = "Arch"
- client = client
- versioned = True
- _archive_name = None
- _archive_dir = None
- _tmp_archive = False
- _project_name = None
- _tmp_project = False
- _arch_paramdir = os.path.expanduser("~/.arch-params")
- def _vcs_version(self):
- status,output,error = self._u_invoke_client("--version")
- return output
- def _vcs_detect(self, path):
- """Detect whether a directory is revision-controlled using Arch"""
- if self._u_search_parent_directories(path, "{arch}") != None :
- config.set_val("arch_client", client)
- return True
- return False
- def _vcs_init(self, path):
- self._create_archive(path)
- self._create_project(path)
- self._add_project_code(path)
- def _create_archive(self, path):
- """
- Create a temporary Arch archive in the directory PATH. This
- archive will be removed by
- cleanup->_vcs_cleanup->_remove_archive
- """
- # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
- assert self._archive_name == None
- id = self.get_user_id()
- name, email = self._u_parse_id(id)
- if email == None:
- email = "%s@example.com" % name
- trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8])
- self._archive_name = "%s--%s" % (email, trailer)
- self._archive_dir = "/tmp/%s" % trailer
- self._tmp_archive = True
- self._u_invoke_client("make-archive", self._archive_name,
- self._archive_dir, directory=path)
- def _invoke_client(self, *args, **kwargs):
- """
- Invoke the client on our archive.
- """
- assert self._archive_name != None
- command = args[0]
- if len(args) > 1:
- tailargs = args[1:]
- else:
- tailargs = []
- arglist = [command, "-A", self._archive_name]
- arglist.extend(tailargs)
- args = tuple(arglist)
- return self._u_invoke_client(*args, **kwargs)
- def _remove_archive(self):
- assert self._tmp_archive == True
- assert self._archive_dir != None
- assert self._archive_name != None
- os.remove(os.path.join(self._arch_paramdir,
- "=locations", self._archive_name))
- shutil.rmtree(self._archive_dir)
- self._tmp_archive = False
- self._archive_dir = False
- self._archive_name = False
- def _create_project(self, path):
- """
- Create a temporary Arch project in the directory PATH. This
- project will be removed by
- cleanup->_vcs_cleanup->_remove_project
- """
- # http://mwolson.org/projects/GettingStartedWithArch.html
- # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
- category = "bugs-everywhere"
- branch = "mainline"
- version = "0.1"
- self._project_name = "%s--%s--%s" % (category, branch, version)
- self._invoke_client("archive-setup", self._project_name,
- directory=path)
- self._tmp_project = True
- def _remove_project(self):
- assert self._tmp_project == True
- assert self._project_name != None
- assert self._archive_dir != None
- shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
- self._tmp_project = False
- self._project_name = False
- def _archive_project_name(self):
- assert self._archive_name != None
- assert self._project_name != None
- return "%s/%s" % (self._archive_name, self._project_name)
- def _adjust_naming_conventions(self, path):
- """
- By default, Arch restricts source code filenames to
- ^[_=a-zA-Z0-9].*$
- See
- http://regexps.srparish.net/tutorial-tla/naming-conventions.html
- Since our bug directory '.be' doesn't satisfy these conventions,
- we need to adjust them.
-
- The conventions are specified in
- project-root/{arch}/=tagging-method
- """
- tagpath = os.path.join(path, "{arch}", "=tagging-method")
- lines_out = []
- f = codecs.open(tagpath, "r", self.encoding)
- for line in f:
- if line.startswith("source "):
- lines_out.append("source ^[._=a-zA-X0-9].*$\n")
- else:
- lines_out.append(line)
- f.close()
- f = codecs.open(tagpath, "w", self.encoding)
- f.write("".join(lines_out))
- f.close()
-
- def _add_project_code(self, path):
- # http://mwolson.org/projects/GettingStartedWithArch.html
- # http://regexps.srparish.net/tutorial-tla/new-source.html
- # http://regexps.srparish.net/tutorial-tla/importing-first.html
- self._invoke_client("init-tree", self._project_name,
- directory=path)
- self._adjust_naming_conventions(path)
- self._invoke_client("import", "--summary", "Began versioning",
- directory=path)
- def _vcs_cleanup(self):
- if self._tmp_project == True:
- self._remove_project()
- if self._tmp_archive == True:
- self._remove_archive()
-
- def _vcs_root(self, path):
- if not os.path.isdir(path):
- dirname = os.path.dirname(path)
- else:
- dirname = path
- status,output,error = self._u_invoke_client("tree-root", dirname)
- root = output.rstrip('\n')
-
- self._get_archive_project_name(root)
-
- return root
- def _get_archive_name(self, root):
- status,output,error = self._u_invoke_client("archives")
- lines = output.split('\n')
- # e.g. output:
- # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
- # /tmp/BEtestXXXXXX/rootdir
- # (+ repeats)
- for archive,location in zip(lines[::2], lines[1::2]):
- if os.path.realpath(location) == os.path.realpath(root):
- self._archive_name = archive
- assert self._archive_name != None
- def _get_archive_project_name(self, root):
- # get project names
- status,output,error = self._u_invoke_client("tree-version", directory=root)
- # e.g output
- # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
- archive_name,project_name = output.rstrip('\n').split('/')
- self._archive_name = archive_name
- self._project_name = project_name
- def _vcs_get_user_id(self):
- try:
- status,output,error = self._u_invoke_client('my-id')
- return output.rstrip('\n')
- except Exception, e:
- if 'no arch user id set' in e.args[0]:
- return None
- else:
- raise
- def _vcs_set_user_id(self, value):
- self._u_invoke_client('my-id', value)
- def _vcs_add(self, path):
- self._u_invoke_client("add-id", path)
- realpath = os.path.realpath(self._u_abspath(path))
- pathAdded = realpath in self._list_added(self.rootdir)
- if self.paranoid and not pathAdded:
- self._force_source(path)
- def _list_added(self, root):
- assert os.path.exists(root)
- assert os.access(root, os.X_OK)
- root = os.path.realpath(root)
- status,output,error = self._u_invoke_client("inventory", "--source",
- "--both", "--all", root)
- inv_str = output.rstrip('\n')
- return [os.path.join(root, p) for p in inv_str.split('\n')]
- def _add_dir_rule(self, rule, dirname, root):
- inv_path = os.path.join(dirname, '.arch-inventory')
- f = codecs.open(inv_path, "a", self.encoding)
- f.write(rule)
- f.close()
- if os.path.realpath(inv_path) not in self._list_added(root):
- paranoid = self.paranoid
- self.paranoid = False
- self.add(inv_path)
- self.paranoid = paranoid
- def _force_source(self, path):
- rule = "source %s\n" % self._u_rel_path(path)
- self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
- if os.path.realpath(path) not in self._list_added(self.rootdir):
- raise CantAddFile(path)
- def _vcs_remove(self, path):
- if not '.arch-ids' in path:
- self._u_invoke_client("delete-id", path)
- def _vcs_update(self, path):
- pass
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
- if revision == None:
- return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
- else:
- status,output,error = \
- self._invoke_client("file-find", path, revision)
- relpath = output.rstrip('\n')
- abspath = os.path.join(self.rootdir, relpath)
- f = codecs.open(abspath, "r", self.encoding)
- contents = f.read()
- f.close()
- return contents
- def _vcs_duplicate_repo(self, directory, revision=None):
- if revision == None:
- vcs.VCS._vcs_duplicate_repo(self, directory, revision)
- else:
- status,output,error = \
- self._u_invoke_client("get", revision,directory)
- def _vcs_commit(self, commitfile, allow_empty=False):
- if allow_empty == False:
- # arch applies empty commits without complaining, so check first
- status,output,error = self._u_invoke_client("changes",expect=(0,1))
- if status == 0:
- raise vcs.EmptyCommit()
- summary,body = self._u_parse_commitfile(commitfile)
- args = ["commit", "--summary", summary]
- if body != None:
- args.extend(["--log-message",body])
- status,output,error = self._u_invoke_client(*args)
- revision = None
- revline = re.compile("[*] committed (.*)")
- match = revline.search(output)
- assert match != None, output+error
- assert len(match.groups()) == 1
- revpath = match.groups()[0]
- assert not " " in revpath, revpath
- assert revpath.startswith(self._archive_project_name()+'--')
- revision = revpath[len(self._archive_project_name()+'--'):]
- return revpath
- def _vcs_revision_id(self, index):
- status,output,error = self._u_invoke_client("logs")
- logs = output.splitlines()
- first_log = logs.pop(0)
- assert first_log == "base-0", first_log
- try:
- log = logs[index]
- except IndexError:
- return None
- return "%s--%s" % (self._archive_project_name(), log)
-
-class CantAddFile(Exception):
- def __init__(self, file):
- self.file = file
- Exception.__init__(self, "Can't automatically add file %s" % file)
-
-
-
-vcs.make_vcs_testcase_subclasses(Arch, sys.modules[__name__])
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/beuuid.py b/interfaces/email/interactive/libbe/beuuid.py
deleted file mode 100644
index 490ed62..0000000
--- a/interfaces/email/interactive/libbe/beuuid.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Backwards compatibility support for Python 2.4. Once people give up
-on 2.4 ;), the uuid call should be merged into bugdir.py
-"""
-
-import unittest
-
-
-try:
- from uuid import uuid4 # Python >= 2.5
- def uuid_gen():
- id = uuid4()
- idstr = id.urn
- start = "urn:uuid:"
- assert idstr.startswith(start)
- return idstr[len(start):]
-except ImportError:
- import os
- import sys
- from subprocess import Popen, PIPE
-
- def uuid_gen():
- # Shell-out to system uuidgen
- args = ['uuidgen', 'r']
- try:
- if sys.platform != "win32":
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- else:
- # win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
- shell=True, cwd=cwd)
- except OSError, e :
- strerror = "%s\nwhile executing %s" % (e.args[1], args)
- raise OSError, strerror
- output, error = q.communicate()
- status = q.wait()
- if status != 0:
- strerror = "%s\nwhile executing %s" % (status, args)
- raise Exception, strerror
- return output.rstrip('\n')
-
-class UUIDtestCase(unittest.TestCase):
- def testUUID_gen(self):
- id = uuid_gen()
- self.failUnless(len(id) == 36, "invalid UUID '%s'" % id)
-
-suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase)
diff --git a/interfaces/email/interactive/libbe/bug.py b/interfaces/email/interactive/libbe/bug.py
deleted file mode 100644
index fd30ff7..0000000
--- a/interfaces/email/interactive/libbe/bug.py
+++ /dev/null
@@ -1,580 +0,0 @@
-# Copyright (C) 2008-2009 Chris Ball <cjb@laptop.org>
-# Thomas Habets <thomas@habets.pp.se>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Define the Bug class for representing bugs.
-"""
-
-import os
-import os.path
-import errno
-import time
-import types
-import xml.sax.saxutils
-import doctest
-
-from beuuid import uuid_gen
-from properties import Property, doc_property, local_property, \
- defaulting_property, checked_property, cached_property, \
- primed_property, change_hook_property, settings_property
-import settings_object
-import mapfile
-import comment
-import utility
-
-
-class DiskAccessRequired (Exception):
- def __init__(self, goal):
- msg = "Cannot %s without accessing the disk" % goal
- Exception.__init__(self, msg)
-
-### Define and describe valid bug categories
-# Use a tuple of (category, description) tuples since we don't have
-# ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/
-
-# in order of increasing severity. (name, description) pairs
-severity_def = (
- ("wishlist","A feature that could improve usefulness, but not a bug."),
- ("minor","The standard bug level."),
- ("serious","A bug that requires workarounds."),
- ("critical","A bug that prevents some features from working at all."),
- ("fatal","A bug that makes the package unusable."))
-
-# in order of increasing resolution
-# roughly following http://www.bugzilla.org/docs/3.2/en/html/lifecycle.html
-active_status_def = (
- ("unconfirmed","A possible bug which lacks independent existance confirmation."),
- ("open","A working bug that has not been assigned to a developer."),
- ("assigned","A working bug that has been assigned to a developer."),
- ("test","The code has been adjusted, but the fix is still being tested."))
-inactive_status_def = (
- ("closed", "The bug is no longer relevant."),
- ("fixed", "The bug should no longer occur."),
- ("wontfix","It's not a bug, it's a feature."))
-
-
-### Convert the description tuples to more useful formats
-
-severity_values = ()
-severity_description = {}
-severity_index = {}
-def load_severities(severity_def):
- global severity_values
- global severity_description
- global severity_index
- if severity_def == None:
- return
- severity_values = tuple([val for val,description in severity_def])
- severity_description = dict(severity_def)
- severity_index = {}
- for i,severity in enumerate(severity_values):
- severity_index[severity] = i
-load_severities(severity_def)
-
-active_status_values = []
-inactive_status_values = []
-status_values = []
-status_description = {}
-status_index = {}
-def load_status(active_status_def, inactive_status_def):
- global active_status_values
- global inactive_status_values
- global status_values
- global status_description
- global status_index
- if active_status_def == None:
- active_status_def = globals()["active_status_def"]
- if inactive_status_def == None:
- inactive_status_def = globals()["inactive_status_def"]
- active_status_values = tuple([val for val,description in active_status_def])
- inactive_status_values = tuple([val for val,description in inactive_status_def])
- status_values = active_status_values + inactive_status_values
- status_description = dict(tuple(active_status_def) + tuple(inactive_status_def))
- status_index = {}
- for i,status in enumerate(status_values):
- status_index[status] = i
-load_status(active_status_def, inactive_status_def)
-
-
-class Bug(settings_object.SavedSettingsObject):
- """
- >>> b = Bug()
- >>> print b.status
- open
- >>> print b.severity
- minor
-
- There are two formats for time, int and string. Setting either
- one will adjust the other appropriately. The string form is the
- one stored in the bug's settings file on disk.
- >>> print type(b.time)
- <type 'int'>
- >>> print type(b.time_string)
- <type 'str'>
- >>> b.time = 0
- >>> print b.time_string
- Thu, 01 Jan 1970 00:00:00 +0000
- >>> b.time_string="Thu, 01 Jan 1970 00:01:00 +0000"
- >>> b.time
- 60
- >>> print b.settings["time"]
- Thu, 01 Jan 1970 00:01:00 +0000
- """
- settings_properties = []
- required_saved_properties = []
- _prop_save_settings = settings_object.prop_save_settings
- _prop_load_settings = settings_object.prop_load_settings
- def _versioned_property(settings_properties=settings_properties,
- required_saved_properties=required_saved_properties,
- **kwargs):
- if "settings_properties" not in kwargs:
- kwargs["settings_properties"] = settings_properties
- if "required_saved_properties" not in kwargs:
- kwargs["required_saved_properties"]=required_saved_properties
- return settings_object.versioned_property(**kwargs)
-
- @_versioned_property(name="severity",
- doc="A measure of the bug's importance",
- default="minor",
- check_fn=lambda s: s in severity_values,
- require_save=True)
- def severity(): return {}
-
- @_versioned_property(name="status",
- doc="The bug's current status",
- default="open",
- check_fn=lambda s: s in status_values,
- require_save=True)
- def status(): return {}
-
- @property
- def active(self):
- return self.status in active_status_values
-
- @_versioned_property(name="target",
- doc="The deadline for fixing this bug")
- def target(): return {}
-
- @_versioned_property(name="creator",
- doc="The user who entered the bug into the system")
- def creator(): return {}
-
- @_versioned_property(name="reporter",
- doc="The user who reported the bug")
- def reporter(): return {}
-
- @_versioned_property(name="assigned",
- doc="The developer in charge of the bug")
- def assigned(): return {}
-
- @_versioned_property(name="time",
- doc="An RFC 2822 timestamp for bug creation")
- def time_string(): return {}
-
- def _get_time(self):
- if self.time_string == None:
- return None
- return utility.str_to_time(self.time_string)
- def _set_time(self, value):
- self.time_string = utility.time_to_str(value)
- time = property(fget=_get_time,
- fset=_set_time,
- doc="An integer version of .time_string")
-
- def _extra_strings_check_fn(value):
- return utility.iterable_full_of_strings(value, \
- alternative=settings_object.EMPTY)
- def _extra_strings_change_hook(self, old, new):
- self.extra_strings.sort() # to make merging easier
- self._prop_save_settings(old, new)
- @_versioned_property(name="extra_strings",
- doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
- default=[],
- check_fn=_extra_strings_check_fn,
- change_hook=_extra_strings_change_hook,
- mutable=True)
- def extra_strings(): return {}
-
- @_versioned_property(name="summary",
- doc="A one-line bug description")
- def summary(): return {}
-
- def _get_comment_root(self, load_full=False):
- if self.sync_with_disk:
- return comment.loadComments(self, load_full=load_full)
- else:
- return comment.Comment(self, uuid=comment.INVALID_UUID)
-
- @Property
- @cached_property(generator=_get_comment_root)
- @local_property("comment_root")
- @doc_property(doc="The trunk of the comment tree")
- def comment_root(): return {}
-
- def _get_vcs(self):
- if hasattr(self.bugdir, "vcs"):
- return self.bugdir.vcs
-
- @Property
- @cached_property(generator=_get_vcs)
- @local_property("vcs")
- @doc_property(doc="A revision control system instance.")
- def vcs(): return {}
-
- def __init__(self, bugdir=None, uuid=None, from_disk=False,
- load_comments=False, summary=None):
- settings_object.SavedSettingsObject.__init__(self)
- self.bugdir = bugdir
- self.uuid = uuid
- if from_disk == True:
- self.sync_with_disk = True
- else:
- self.sync_with_disk = False
- if uuid == None:
- self.uuid = uuid_gen()
- self.time = int(time.time()) # only save to second precision
- if self.vcs != None:
- self.creator = self.vcs.get_user_id()
- self.summary = summary
-
- def __repr__(self):
- return "Bug(uuid=%r)" % self.uuid
-
- def __str__(self):
- return self.string(shortlist=True)
-
- def __cmp__(self, other):
- return cmp_full(self, other)
-
- # serializing methods
-
- def _setting_attr_string(self, setting):
- value = getattr(self, setting)
- if value == None:
- return ""
- return str(value)
-
- def xml(self, show_comments=False):
- if self.bugdir == None:
- shortname = self.uuid
- else:
- shortname = self.bugdir.bug_shortname(self)
-
- if self.time == None:
- timestring = ""
- else:
- timestring = utility.time_to_str(self.time)
-
- info = [("uuid", self.uuid),
- ("short-name", shortname),
- ("severity", self.severity),
- ("status", self.status),
- ("assigned", self.assigned),
- ("target", self.target),
- ("reporter", self.reporter),
- ("creator", self.creator),
- ("created", timestring),
- ("summary", self.summary)]
- ret = '<bug>\n'
- for (k,v) in info:
- if v is not None:
- ret += ' <%s>%s</%s>\n' % (k,xml.sax.saxutils.escape(v),k)
- for estr in self.extra_strings:
- ret += ' <extra-string>%s</extra-string>\n' % estr
- if show_comments == True:
- comout = self.comment_root.xml_thread(auto_name_map=True,
- bug_shortname=shortname)
- if len(comout) > 0:
- ret += comout+'\n'
- ret += '</bug>'
- return ret
-
- def string(self, shortlist=False, show_comments=False):
- if self.bugdir == None:
- shortname = self.uuid
- else:
- shortname = self.bugdir.bug_shortname(self)
- if shortlist == False:
- if self.time == None:
- timestring = ""
- else:
- htime = utility.handy_time(self.time)
- timestring = "%s (%s)" % (htime, self.time_string)
- info = [("ID", self.uuid),
- ("Short name", shortname),
- ("Severity", self.severity),
- ("Status", self.status),
- ("Assigned", self._setting_attr_string("assigned")),
- ("Target", self._setting_attr_string("target")),
- ("Reporter", self._setting_attr_string("reporter")),
- ("Creator", self._setting_attr_string("creator")),
- ("Created", timestring)]
- longest_key_len = max([len(k) for k,v in info])
- infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info]
- bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n')
- else:
- statuschar = self.status[0]
- severitychar = self.severity[0]
- chars = "%c%c" % (statuschar, severitychar)
- bugout = "%s:%s: %s" % (shortname,chars,self.summary.rstrip('\n'))
-
- if show_comments == True:
- # take advantage of the string_thread(auto_name_map=True)
- # SIDE-EFFECT of sorting by comment time.
- comout = self.comment_root.string_thread(flatten=False,
- auto_name_map=True,
- bug_shortname=shortname)
- output = bugout + '\n' + comout.rstrip('\n')
- else :
- output = bugout
- return output
-
- # methods for saving/loading/acessing settings and properties.
-
- def get_path(self, *args):
- dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid)
- if len(args) == 0:
- return dir
- assert args[0] in ["values", "comments"], str(args)
- return os.path.join(dir, *args)
-
- def set_sync_with_disk(self, value):
- self.sync_with_disk = value
- for comment in self.comments():
- comment.set_sync_with_disk(value)
-
- def load_settings(self):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("load settings")
- self.settings = mapfile.map_load(self.vcs, self.get_path("values"))
- self._setup_saved_settings()
-
- def save_settings(self):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("save settings")
- assert self.summary != None, "Can't save blank bug"
- self.vcs.mkdir(self.get_path())
- path = self.get_path("values")
- mapfile.map_save(self.vcs, path, self._get_saved_settings())
-
- def save(self):
- """
- Save any loaded contents to disk. Because of lazy loading of
- comments, this is actually not too inefficient.
-
- However, if self.sync_with_disk = True, then any changes are
- automatically written to disk as soon as they happen, so
- calling this method will just waste time (unless something
- else has been messing with your on-disk files).
- """
- sync_with_disk = self.sync_with_disk
- if sync_with_disk == False:
- self.set_sync_with_disk(True)
- self.save_settings()
- if len(self.comment_root) > 0:
- comment.saveComments(self)
- if sync_with_disk == False:
- self.set_sync_with_disk(False)
-
- def load_comments(self, load_full=True):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("load comments")
- if load_full == True:
- # Force a complete load of the whole comment tree
- self.comment_root = self._get_comment_root(load_full=True)
- else:
- # Setup for fresh lazy-loading. Clear _comment_root, so
- # _get_comment_root returns a fresh version. Turn of
- # syncing temporarily so we don't write our blank comment
- # tree to disk.
- self.sync_with_disk = False
- self.comment_root = None
- self.sync_with_disk = True
-
- def remove(self):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("remove")
- self.comment_root.remove()
- path = self.get_path()
- self.vcs.recursive_remove(path)
-
- # methods for managing comments
-
- def comments(self):
- for comment in self.comment_root.traverse():
- yield comment
-
- def new_comment(self, body=None):
- comm = self.comment_root.new_reply(body=body)
- return comm
-
- def comment_from_shortname(self, shortname, *args, **kwargs):
- return self.comment_root.comment_from_shortname(shortname,
- *args, **kwargs)
-
- def comment_from_uuid(self, uuid):
- return self.comment_root.comment_from_uuid(uuid)
-
- def comment_shortnames(self, shortname=None):
- """
- SIDE-EFFECT : Comment.comment_shortnames will sort the comment
- tree by comment.time
- """
- for id, comment in self.comment_root.comment_shortnames(shortname):
- yield (id, comment)
-
-
-# The general rule for bug sorting is that "more important" bugs are
-# less than "less important" bugs. This way sorting a list of bugs
-# will put the most important bugs first in the list. When relative
-# importance is unclear, the sorting follows some arbitrary convention
-# (i.e. dictionary order).
-
-def cmp_severity(bug_1, bug_2):
- """
- Compare the severity levels of two bugs, with more severe bugs
- comparing as less.
- >>> bugA = Bug()
- >>> bugB = Bug()
- >>> bugA.severity = bugB.severity = "wishlist"
- >>> cmp_severity(bugA, bugB) == 0
- True
- >>> bugB.severity = "minor"
- >>> cmp_severity(bugA, bugB) > 0
- True
- >>> bugA.severity = "critical"
- >>> cmp_severity(bugA, bugB) < 0
- True
- """
- if not hasattr(bug_2, "severity") :
- return 1
- return -cmp(severity_index[bug_1.severity], severity_index[bug_2.severity])
-
-def cmp_status(bug_1, bug_2):
- """
- Compare the status levels of two bugs, with more 'open' bugs
- comparing as less.
- >>> bugA = Bug()
- >>> bugB = Bug()
- >>> bugA.status = bugB.status = "open"
- >>> cmp_status(bugA, bugB) == 0
- True
- >>> bugB.status = "closed"
- >>> cmp_status(bugA, bugB) < 0
- True
- >>> bugA.status = "fixed"
- >>> cmp_status(bugA, bugB) > 0
- True
- """
- if not hasattr(bug_2, "status") :
- return 1
- val_2 = status_index[bug_2.status]
- return cmp(status_index[bug_1.status], status_index[bug_2.status])
-
-def cmp_attr(bug_1, bug_2, attr, invert=False):
- """
- Compare a general attribute between two bugs using the conventional
- comparison rule for that attribute type. If invert == True, sort
- *against* that convention.
- >>> attr="severity"
- >>> bugA = Bug()
- >>> bugB = Bug()
- >>> bugA.severity = "critical"
- >>> bugB.severity = "wishlist"
- >>> cmp_attr(bugA, bugB, attr) < 0
- True
- >>> cmp_attr(bugA, bugB, attr, invert=True) > 0
- True
- >>> bugB.severity = "critical"
- >>> cmp_attr(bugA, bugB, attr) == 0
- True
- """
- if not hasattr(bug_2, attr) :
- return 1
- val_1 = getattr(bug_1, attr)
- val_2 = getattr(bug_2, attr)
- if val_1 == None: val_1 = None
- if val_2 == None: val_2 = None
-
- if invert == True :
- return -cmp(val_1, val_2)
- else :
- return cmp(val_1, val_2)
-
-# alphabetical rankings (a < z)
-cmp_uuid = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "uuid")
-cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator")
-cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned")
-cmp_target = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "target")
-cmp_reporter = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "reporter")
-cmp_summary = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "summary")
-# chronological rankings (newer < older)
-cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True)
-
-def cmp_comments(bug_1, bug_2):
- """
- Compare two bugs' comments lists. Doesn't load any new comments,
- so you should call each bug's .load_comments() first if you want a
- full comparison.
- """
- comms_1 = sorted(bug_1.comments(), key = lambda comm : comm.uuid)
- comms_2 = sorted(bug_2.comments(), key = lambda comm : comm.uuid)
- result = cmp(len(comms_1), len(comms_2))
- if result != 0:
- return result
- for c_1,c_2 in zip(comms_1, comms_2):
- result = cmp(c_1, c_2)
- if result != 0:
- return result
- return 0
-
-DEFAULT_CMP_FULL_CMP_LIST = \
- (cmp_status, cmp_severity, cmp_assigned, cmp_time, cmp_creator,
- cmp_reporter, cmp_target, cmp_comments, cmp_summary, cmp_uuid)
-
-class BugCompoundComparator (object):
- def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST):
- self.cmp_list = cmp_list
- def __call__(self, bug_1, bug_2):
- for comparison in self.cmp_list :
- val = comparison(bug_1, bug_2)
- if val != 0 :
- return val
- return 0
-
-cmp_full = BugCompoundComparator()
-
-
-# define some bonus cmp_* functions
-def cmp_last_modified(bug_1, bug_2):
- """
- Like cmp_time(), but use most recent comment instead of bug
- creation for the timestamp.
- """
- def last_modified(bug):
- time = bug.time
- for comment in bug.comment_root.traverse():
- if comment.time > time:
- time = comment.time
- return time
- val_1 = last_modified(bug_1)
- val_2 = last_modified(bug_2)
- return -cmp(val_1, val_2)
-
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/bugdir.py b/interfaces/email/interactive/libbe/bugdir.py
deleted file mode 100644
index 5324163..0000000
--- a/interfaces/email/interactive/libbe/bugdir.py
+++ /dev/null
@@ -1,832 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# Alexander Belchenko <bialix@ukr.net>
-# Chris Ball <cjb@laptop.org>
-# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Define the BugDir class for representing bug comments.
-"""
-
-import copy
-import errno
-import os
-import os.path
-import sys
-import time
-import unittest
-import doctest
-
-import bug
-import encoding
-from properties import Property, doc_property, local_property, \
- defaulting_property, checked_property, fn_checked_property, \
- cached_property, primed_property, change_hook_property, \
- settings_property
-import mapfile
-import vcs
-import settings_object
-import upgrade
-import utility
-
-
-class NoBugDir(Exception):
- def __init__(self, path):
- msg = "The directory \"%s\" has no bug directory." % path
- Exception.__init__(self, msg)
- self.path = path
-
-class NoRootEntry(Exception):
- def __init__(self, path):
- self.path = path
- Exception.__init__(self, "Specified root does not exist: %s" % path)
-
-class AlreadyInitialized(Exception):
- def __init__(self, path):
- self.path = path
- Exception.__init__(self,
- "Specified root is already initialized: %s" % path)
-
-class MultipleBugMatches(ValueError):
- def __init__(self, shortname, matches):
- msg = ("More than one bug matches %s. "
- "Please be more specific.\n%s" % (shortname, matches))
- ValueError.__init__(self, msg)
- self.shortname = shortname
- self.matches = matches
-
-class NoBugMatches(KeyError):
- def __init__(self, shortname):
- msg = "No bug matches %s" % shortname
- KeyError.__init__(self, msg)
- self.shortname = shortname
-
-class DiskAccessRequired (Exception):
- def __init__(self, goal):
- msg = "Cannot %s without accessing the disk" % goal
- Exception.__init__(self, msg)
-
-
-class BugDir (list, settings_object.SavedSettingsObject):
- """
- Sink to existing root
- ======================
-
- Consider the following usage case:
- You have a bug directory rooted in
- /path/to/source
- by which I mean the '.be' directory is at
- /path/to/source/.be
- However, you're of in some subdirectory like
- /path/to/source/GUI/testing
- and you want to comment on a bug. Setting sink_to_root=True wen
- you initialize your BugDir will cause it to search for the '.be'
- file in the ancestors of the path you passed in as 'root'.
- /path/to/source/GUI/testing/.be miss
- /path/to/source/GUI/.be miss
- /path/to/source/.be hit!
- So it still roots itself appropriately without much work for you.
-
- File-system access
- ==================
-
- BugDirs live completely in memory when .sync_with_disk is False.
- This is the default configuration setup by BugDir(from_disk=False).
- If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
- any changes to the BugDir will be immediately written to disk.
-
- If you want to change .sync_with_disk, we suggest you use
- .set_sync_with_disk(), which propogates the new setting through to
- all bugs/comments/etc. that have been loaded into memory. If
- you've been living in memory and want to move to
- .sync_with_disk==True, but you're not sure if anything has been
- changed in memory, a call to save() immediately before the
- .set_sync_with_disk(True) call is a safe move.
-
- Regardless of .sync_with_disk, a call to .save() will write out
- all the contents that the BugDir instance has loaded into memory.
- If sync_with_disk has been True over the course of all interesting
- changes, this .save() call will be a waste of time.
-
- The BugDir will only load information from the file system when it
- loads new settings/bugs/comments that it doesn't already have in
- memory and .sync_with_disk == True.
-
- Allow VCS initialization
- ========================
-
- This one is for testing purposes. Setting it to True allows the
- BugDir to search for an installed VCS backend and initialize it in
- the root directory. This is a convenience option for supporting
- tests of versioning functionality (e.g. .duplicate_bugdir).
-
- Disable encoding manipulation
- =============================
-
- This one is for testing purposed. You might have non-ASCII
- Unicode in your bugs, comments, files, etc. BugDir instances try
- and support your preferred encoding scheme (e.g. "utf-8") when
- dealing with stream and file input/output. For stream output,
- this involves replacing sys.stdout and sys.stderr
- (libbe.encode.set_IO_stream_encodings). However this messes up
- doctest's output catching. In order to support doctest tests
- using BugDirs, set manipulate_encodings=False, and stick to ASCII
- in your tests.
- """
-
- settings_properties = []
- required_saved_properties = []
- _prop_save_settings = settings_object.prop_save_settings
- _prop_load_settings = settings_object.prop_load_settings
- def _versioned_property(settings_properties=settings_properties,
- required_saved_properties=required_saved_properties,
- **kwargs):
- if "settings_properties" not in kwargs:
- kwargs["settings_properties"] = settings_properties
- if "required_saved_properties" not in kwargs:
- kwargs["required_saved_properties"]=required_saved_properties
- return settings_object.versioned_property(**kwargs)
-
- @_versioned_property(name="target",
- doc="The current project development target.")
- def target(): return {}
-
- def _guess_encoding(self):
- return encoding.get_encoding()
- def _check_encoding(value):
- if value != None:
- return encoding.known_encoding(value)
- def _setup_encoding(self, new_encoding):
- # change hook called before generator.
- if new_encoding not in [None, settings_object.EMPTY]:
- if self._manipulate_encodings == True:
- encoding.set_IO_stream_encodings(new_encoding)
- def _set_encoding(self, old_encoding, new_encoding):
- self._setup_encoding(new_encoding)
- self._prop_save_settings(old_encoding, new_encoding)
-
- @_versioned_property(name="encoding",
- doc="""The default input/output encoding to use (e.g. "utf-8").""",
- change_hook=_set_encoding,
- generator=_guess_encoding,
- check_fn=_check_encoding)
- def encoding(): return {}
-
- def _setup_user_id(self, user_id):
- self.vcs.user_id = user_id
- def _guess_user_id(self):
- return self.vcs.get_user_id()
- def _set_user_id(self, old_user_id, new_user_id):
- self._setup_user_id(new_user_id)
- self._prop_save_settings(old_user_id, new_user_id)
-
- @_versioned_property(name="user_id",
- doc=
-"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
-that the Arch VCS backend *enforces* ids with this format.""",
- change_hook=_set_user_id,
- generator=_guess_user_id)
- def user_id(): return {}
-
- @_versioned_property(name="default_assignee",
- doc=
-"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
- def default_assignee(): return {}
-
- @_versioned_property(name="vcs_name",
- doc="""The name of the current VCS. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .vcs instead, and
-.vcs_name will be automatically adjusted.""",
- default="None",
- allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
- def vcs_name(): return {}
-
- def _get_vcs(self, vcs_name=None):
- """Get and root a new revision control system"""
- if vcs_name == None:
- vcs_name = self.vcs_name
- new_vcs = vcs.vcs_by_name(vcs_name)
- self._change_vcs(None, new_vcs)
- return new_vcs
- def _change_vcs(self, old_vcs, new_vcs):
- new_vcs.encoding = self.encoding
- new_vcs.root(self.root)
- self.vcs_name = new_vcs.name
-
- @Property
- @change_hook_property(hook=_change_vcs)
- @cached_property(generator=_get_vcs)
- @local_property("vcs")
- @doc_property(doc="A revision control system instance.")
- def vcs(): return {}
-
- def _bug_map_gen(self):
- map = {}
- for bug in self:
- map[bug.uuid] = bug
- for uuid in self.list_uuids():
- if uuid not in map:
- map[uuid] = None
- self._bug_map_value = map # ._bug_map_value used by @local_property
-
- def _extra_strings_check_fn(value):
- return utility.iterable_full_of_strings(value, \
- alternative=settings_object.EMPTY)
- def _extra_strings_change_hook(self, old, new):
- self.extra_strings.sort() # to make merging easier
- self._prop_save_settings(old, new)
- @_versioned_property(name="extra_strings",
- doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
- default=[],
- check_fn=_extra_strings_check_fn,
- change_hook=_extra_strings_change_hook,
- mutable=True)
- def extra_strings(): return {}
-
- @Property
- @primed_property(primer=_bug_map_gen)
- @local_property("bug_map")
- @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.")
- def _bug_map(): return {}
-
- def _setup_severities(self, severities):
- if severities not in [None, settings_object.EMPTY]:
- bug.load_severities(severities)
- def _set_severities(self, old_severities, new_severities):
- self._setup_severities(new_severities)
- self._prop_save_settings(old_severities, new_severities)
- @_versioned_property(name="severities",
- doc="The allowed bug severities and their descriptions.",
- change_hook=_set_severities)
- def severities(): return {}
-
- def _setup_status(self, active_status, inactive_status):
- bug.load_status(active_status, inactive_status)
- def _set_active_status(self, old_active_status, new_active_status):
- self._setup_status(new_active_status, self.inactive_status)
- self._prop_save_settings(old_active_status, new_active_status)
- @_versioned_property(name="active_status",
- doc="The allowed active bug states and their descriptions.",
- change_hook=_set_active_status)
- def active_status(): return {}
-
- def _set_inactive_status(self, old_inactive_status, new_inactive_status):
- self._setup_status(self.active_status, new_inactive_status)
- self._prop_save_settings(old_inactive_status, new_inactive_status)
- @_versioned_property(name="inactive_status",
- doc="The allowed inactive bug states and their descriptions.",
- change_hook=_set_inactive_status)
- def inactive_status(): return {}
-
-
- def __init__(self, root=None, sink_to_existing_root=True,
- assert_new_BugDir=False, allow_vcs_init=False,
- manipulate_encodings=True, from_disk=False, vcs=None):
- list.__init__(self)
- settings_object.SavedSettingsObject.__init__(self)
- self._manipulate_encodings = manipulate_encodings
- if root == None:
- root = os.getcwd()
- if sink_to_existing_root == True:
- self.root = self._find_root(root)
- else:
- if not os.path.exists(root):
- self.root = None
- raise NoRootEntry(root)
- self.root = root
- # get a temporary vcs until we've loaded settings
- self.sync_with_disk = False
- self.vcs = self._guess_vcs()
-
- if from_disk == True:
- self.sync_with_disk = True
- self.load()
- else:
- self.sync_with_disk = False
- if assert_new_BugDir == True:
- if os.path.exists(self.get_path()):
- raise AlreadyInitialized, self.get_path()
- if vcs == None:
- vcs = self._guess_vcs(allow_vcs_init)
- self.vcs = vcs
- self._setup_user_id(self.user_id)
-
- def cleanup(self):
- self.vcs.cleanup()
-
- # methods for getting the BugDir situated in the filesystem
-
- def _find_root(self, path):
- """
- Search for an existing bug database dir and it's ancestors and
- return a BugDir rooted there. Only called by __init__, and
- then only if sink_to_existing_root == True.
- """
- if not os.path.exists(path):
- self.root = None
- raise NoRootEntry(path)
- versionfile=utility.search_parent_directories(path,
- os.path.join(".be", "version"))
- if versionfile != None:
- beroot = os.path.dirname(versionfile)
- root = os.path.dirname(beroot)
- return root
- else:
- beroot = utility.search_parent_directories(path, ".be")
- if beroot == None:
- self.root = None
- raise NoBugDir(path)
- return beroot
-
- def _guess_vcs(self, allow_vcs_init=False):
- """
- Only called by __init__.
- """
- deepdir = self.get_path()
- if not os.path.exists(deepdir):
- deepdir = os.path.dirname(deepdir)
- new_vcs = vcs.detect_vcs(deepdir)
- install = False
- if new_vcs.name == "None":
- if allow_vcs_init == True:
- new_vcs = vcs.installed_vcs()
- new_vcs.init(self.root)
- return new_vcs
-
- # methods for saving/loading/accessing settings and properties.
-
- def get_path(self, *args):
- """
- Return a path relative to .root.
- """
- dir = os.path.join(self.root, ".be")
- if len(args) == 0:
- return dir
- assert args[0] in ["version", "settings", "bugs"], str(args)
- return os.path.join(dir, *args)
-
- def _get_settings(self, settings_path, for_duplicate_bugdir=False):
- allow_no_vcs = not self.vcs.path_in_root(settings_path)
- if allow_no_vcs == True:
- assert for_duplicate_bugdir == True
- if self.sync_with_disk == False and for_duplicate_bugdir == False:
- # duplicates can ignore this bugdir's .sync_with_disk status
- raise DiskAccessRequired("_get settings")
- try:
- settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs)
- except vcs.NoSuchFile:
- settings = {"vcs_name": "None"}
- return settings
-
- def _save_settings(self, settings_path, settings,
- for_duplicate_bugdir=False):
- allow_no_vcs = not self.vcs.path_in_root(settings_path)
- if allow_no_vcs == True:
- assert for_duplicate_bugdir == True
- if self.sync_with_disk == False and for_duplicate_bugdir == False:
- # duplicates can ignore this bugdir's .sync_with_disk status
- raise DiskAccessRequired("_save settings")
- self.vcs.mkdir(self.get_path(), allow_no_vcs)
- mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs)
-
- def load_settings(self):
- self.settings = self._get_settings(self.get_path("settings"))
- self._setup_saved_settings()
- self._setup_user_id(self.user_id)
- self._setup_encoding(self.encoding)
- self._setup_severities(self.severities)
- self._setup_status(self.active_status, self.inactive_status)
- self.vcs = vcs.vcs_by_name(self.vcs_name)
- self._setup_user_id(self.user_id)
-
- def save_settings(self):
- settings = self._get_saved_settings()
- self._save_settings(self.get_path("settings"), settings)
-
- def get_version(self, path=None, use_none_vcs=False,
- for_duplicate_bugdir=False):
- """
- Requires disk access.
- """
- if self.sync_with_disk == False:
- raise DiskAccessRequired("get version")
- if use_none_vcs == True:
- VCS = vcs.vcs_by_name("None")
- VCS.root(self.root)
- VCS.encoding = encoding.get_encoding()
- else:
- VCS = self.vcs
-
- if path == None:
- path = self.get_path("version")
- allow_no_vcs = not VCS.path_in_root(path)
- if allow_no_vcs == True:
- assert for_duplicate_bugdir == True
- version = VCS.get_file_contents(
- path, allow_no_vcs=allow_no_vcs).rstrip("\n")
- return version
-
- def set_version(self):
- """
- Requires disk access.
- """
- if self.sync_with_disk == False:
- raise DiskAccessRequired("set version")
- self.vcs.mkdir(self.get_path())
- self.vcs.set_file_contents(self.get_path("version"),
- upgrade.BUGDIR_DISK_VERSION+"\n")
-
- # methods controlling disk access
-
- def set_sync_with_disk(self, value):
- """
- Adjust .sync_with_disk for the BugDir and all it's children.
- See the BugDir docstring for a description of the role of
- .sync_with_disk.
- """
- self.sync_with_disk = value
- for bug in self:
- bug.set_sync_with_disk(value)
-
- def load(self):
- """
- Reqires disk access
- """
- version = self.get_version(use_none_vcs=True)
- if version != upgrade.BUGDIR_DISK_VERSION:
- upgrade.upgrade(self.root, version)
- else:
- if not os.path.exists(self.get_path()):
- raise NoBugDir(self.get_path())
- self.load_settings()
-
- def load_all_bugs(self):
- """
- Requires disk access.
- Warning: this could take a while.
- """
- if self.sync_with_disk == False:
- raise DiskAccessRequired("load all bugs")
- self._clear_bugs()
- for uuid in self.list_uuids():
- self._load_bug(uuid)
-
- def save(self):
- """
- Note that this command writes to disk _regardless_ of the
- status of .sync_with_disk.
-
- Save any loaded contents to disk. Because of lazy loading of
- bugs and comments, this is actually not too inefficient.
-
- However, if .sync_with_disk = True, then any changes are
- automatically written to disk as soon as they happen, so
- calling this method will just waste time (unless something
- else has been messing with your on-disk files).
-
- Requires disk access.
- """
- sync_with_disk = self.sync_with_disk
- if sync_with_disk == False:
- self.set_sync_with_disk(True)
- self.set_version()
- self.save_settings()
- for bug in self:
- bug.save()
- if sync_with_disk == False:
- self.set_sync_with_disk(sync_with_disk)
-
- # methods for managing duplicate BugDirs
-
- def duplicate_bugdir(self, revision):
- duplicate_path = self.vcs.duplicate_repo(revision)
-
- duplicate_version_path = os.path.join(duplicate_path, ".be", "version")
- try:
- version = self.get_version(duplicate_version_path,
- for_duplicate_bugdir=True)
- except DiskAccessRequired:
- self.sync_with_disk = True # temporarily allow access
- version = self.get_version(duplicate_version_path,
- for_duplicate_bugdir=True)
- self.sync_with_disk = False
- if version != upgrade.BUGDIR_DISK_VERSION:
- upgrade.upgrade(duplicate_path, version)
-
- # setup revision VCS as None, since the duplicate may not be
- # initialized for versioning
- duplicate_settings_path = os.path.join(duplicate_path,
- ".be", "settings")
- duplicate_settings = self._get_settings(duplicate_settings_path,
- for_duplicate_bugdir=True)
- if "vcs_name" in duplicate_settings:
- duplicate_settings["vcs_name"] = "None"
- duplicate_settings["user_id"] = self.user_id
- if "disabled" in bug.status_values:
- # Hack to support old versions of BE bugs
- duplicate_settings["inactive_status"] = self.inactive_status
- self._save_settings(duplicate_settings_path, duplicate_settings,
- for_duplicate_bugdir=True)
-
- return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
-
- def remove_duplicate_bugdir(self):
- self.vcs.remove_duplicate_repo()
-
- # methods for managing bugs
-
- def list_uuids(self):
- uuids = []
- if self.sync_with_disk == True and os.path.exists(self.get_path()):
- # list the uuids on disk
- if os.path.exists(self.get_path("bugs")):
- for uuid in os.listdir(self.get_path("bugs")):
- if not (uuid.startswith('.')):
- uuids.append(uuid)
- yield uuid
- # and the ones that are still just in memory
- for bug in self:
- if bug.uuid not in uuids:
- uuids.append(bug.uuid)
- yield bug.uuid
-
- def _clear_bugs(self):
- while len(self) > 0:
- self.pop()
- self._bug_map_gen()
-
- def _load_bug(self, uuid):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("_load bug")
- bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
- self.append(bg)
- self._bug_map_gen()
- return bg
-
- def new_bug(self, uuid=None, summary=None):
- bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
- bg.set_sync_with_disk(self.sync_with_disk)
- if bg.sync_with_disk == True:
- bg.save()
- self.append(bg)
- self._bug_map_gen()
- return bg
-
- def remove_bug(self, bug):
- self.remove(bug)
- if bug.sync_with_disk == True:
- bug.remove()
-
- def bug_shortname(self, bug):
- """
- Generate short names from uuids. Picks the minimum number of
- characters (>=3) from the beginning of the uuid such that the
- short names are unique.
-
- Obviously, as the number of bugs in the database grows, these
- short names will cease to be unique. The complete uuid should be
- used for long term reference.
- """
- chars = 3
- for uuid in self._bug_map.keys():
- if bug.uuid == uuid:
- continue
- while (bug.uuid[:chars] == uuid[:chars]):
- chars+=1
- return bug.uuid[:chars]
-
- def bug_from_shortname(self, shortname):
- """
- >>> bd = SimpleBugDir(sync_with_disk=False)
- >>> bug_a = bd.bug_from_shortname('a')
- >>> print type(bug_a)
- <class 'libbe.bug.Bug'>
- >>> print bug_a
- a:om: Bug A
- >>> bd.cleanup()
- """
- matches = []
- self._bug_map_gen()
- for uuid in self._bug_map.keys():
- if uuid.startswith(shortname):
- matches.append(uuid)
- if len(matches) > 1:
- raise MultipleBugMatches(shortname, matches)
- if len(matches) == 1:
- return self.bug_from_uuid(matches[0])
- raise NoBugMatches(shortname)
-
- def bug_from_uuid(self, uuid):
- if not self.has_bug(uuid):
- raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \
- % (uuid, self._bug_map, self.root))
- if self._bug_map[uuid] == None:
- self._load_bug(uuid)
- return self._bug_map[uuid]
-
- def has_bug(self, bug_uuid):
- if bug_uuid not in self._bug_map:
- self._bug_map_gen()
- if bug_uuid not in self._bug_map:
- return False
- return True
-
-
-class SimpleBugDir (BugDir):
- """
- For testing. Set sync_with_disk==False for a memory-only bugdir.
- >>> bugdir = SimpleBugDir()
- >>> uuids = list(bugdir.list_uuids())
- >>> uuids.sort()
- >>> print uuids
- ['a', 'b']
- >>> bugdir.cleanup()
- """
- def __init__(self, sync_with_disk=True):
- if sync_with_disk == True:
- dir = utility.Dir()
- assert os.path.exists(dir.path)
- root = dir.path
- assert_new_BugDir = True
- vcs_init = True
- else:
- root = "/"
- assert_new_BugDir = False
- vcs_init = False
- BugDir.__init__(self, root, sink_to_existing_root=False,
- assert_new_BugDir=assert_new_BugDir,
- allow_vcs_init=vcs_init,
- manipulate_encodings=False)
- if sync_with_disk == True: # postpone cleanup since dir.cleanup() removes dir.
- self._dir_ref = dir
- bug_a = self.new_bug("a", summary="Bug A")
- bug_a.creator = "John Doe <jdoe@example.com>"
- bug_a.time = 0
- bug_b = self.new_bug("b", summary="Bug B")
- bug_b.creator = "Jane Doe <jdoe@example.com>"
- bug_b.time = 0
- bug_b.status = "closed"
- if sync_with_disk == True:
- self.save()
- self.set_sync_with_disk(True)
- def cleanup(self):
- if hasattr(self, "_dir_ref"):
- self._dir_ref.cleanup()
- BugDir.cleanup(self)
-
-class BugDirTestCase(unittest.TestCase):
- def setUp(self):
- self.dir = utility.Dir()
- self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_vcs_init=True)
- self.vcs = self.bugdir.vcs
- def tearDown(self):
- self.bugdir.cleanup()
- self.dir.cleanup()
- def fullPath(self, path):
- return os.path.join(self.dir.path, path)
- def assertPathExists(self, path):
- fullpath = self.fullPath(path)
- self.failUnless(os.path.exists(fullpath)==True,
- "path %s does not exist" % fullpath)
- self.assertRaises(AlreadyInitialized, BugDir,
- self.dir.path, assertNewBugDir=True)
- def versionTest(self):
- if self.vcs.versioned == False:
- return
- original = self.bugdir.vcs.commit("Began versioning")
- bugA = self.bugdir.bug_from_uuid("a")
- bugA.status = "fixed"
- self.bugdir.save()
- new = self.vcs.commit("Fixed bug a")
- dupdir = self.bugdir.duplicate_bugdir(original)
- self.failUnless(dupdir.root != self.bugdir.root,
- "%s, %s" % (dupdir.root, self.bugdir.root))
- bugAorig = dupdir.bug_from_uuid("a")
- self.failUnless(bugA != bugAorig,
- "\n%s\n%s" % (bugA.string(), bugAorig.string()))
- bugAorig.status = "fixed"
- self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
- "%s, %s" % (bugA.status, bugAorig.status))
- self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
- "%s, %s" % (bugA.severity, bugAorig.severity))
- self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
- "%s, %s" % (bugA.assigned, bugAorig.assigned))
- self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
- "%s, %s" % (bugA.time, bugAorig.time))
- self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
- "%s, %s" % (bugA.creator, bugAorig.creator))
- self.failUnless(bugA == bugAorig,
- "\n%s\n%s" % (bugA.string(), bugAorig.string()))
- self.bugdir.remove_duplicate_bugdir()
- self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root))
- def testRun(self):
- self.bugdir.new_bug(uuid="a", summary="Ant")
- self.bugdir.new_bug(uuid="b", summary="Cockroach")
- self.bugdir.new_bug(uuid="c", summary="Praying mantis")
- length = len(self.bugdir)
- self.failUnless(length == 3, "%d != 3 bugs" % length)
- uuids = list(self.bugdir.list_uuids())
- self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
- self.failUnless(uuids == ["a","b","c"], str(uuids))
- bugA = self.bugdir.bug_from_uuid("a")
- bugAprime = self.bugdir.bug_from_shortname("a")
- self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
- self.bugdir.save()
- self.versionTest()
- def testComments(self, sync_with_disk=False):
- if sync_with_disk == True:
- self.bugdir.set_sync_with_disk(True)
- self.bugdir.new_bug(uuid="a", summary="Ant")
- bug = self.bugdir.bug_from_uuid("a")
- comm = bug.comment_root
- rep = comm.new_reply("Ants are small.")
- rep.new_reply("And they have six legs.")
- if sync_with_disk == False:
- self.bugdir.save()
- self.bugdir.set_sync_with_disk(True)
- self.bugdir._clear_bugs()
- bug = self.bugdir.bug_from_uuid("a")
- bug.load_comments()
- if sync_with_disk == False:
- self.bugdir.set_sync_with_disk(False)
- self.failUnless(len(bug.comment_root)==1, len(bug.comment_root))
- for index,comment in enumerate(bug.comments()):
- if index == 0:
- repLoaded = comment
- self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid)
- self.failUnless(comment.sync_with_disk == sync_with_disk,
- comment.sync_with_disk)
- self.failUnless(comment.content_type == "text/plain",
- comment.content_type)
- self.failUnless(repLoaded.settings["Content-type"]=="text/plain",
- repLoaded.settings)
- self.failUnless(repLoaded.body == "Ants are small.",
- repLoaded.body)
- elif index == 1:
- self.failUnless(comment.in_reply_to == repLoaded.uuid,
- repLoaded.uuid)
- self.failUnless(comment.body == "And they have six legs.",
- comment.body)
- else:
- self.failIf(True, "Invalid comment: %d\n%s" % (index, comment))
- def testSyncedComments(self):
- self.testComments(sync_with_disk=True)
-
-class SimpleBugDirTestCase (unittest.TestCase):
- def setUp(self):
- # create a pre-existing bugdir in a temporary directory
- self.dir = utility.Dir()
- self.original_working_dir = os.getcwd()
- os.chdir(self.dir.path)
- self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_vcs_init=True)
- self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
- self.bugdir.save()
- def tearDown(self):
- os.chdir(self.original_working_dir)
- self.bugdir.cleanup()
- self.dir.cleanup()
- def testOnDiskCleanLoad(self):
- """SimpleBugDir(sync_with_disk==True) should not import preexisting bugs."""
- bugdir = SimpleBugDir(sync_with_disk=True)
- self.failUnless(bugdir.sync_with_disk==True, bugdir.sync_with_disk)
- uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
- bugdir._clear_bugs()
- uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == [], uuids)
- bugdir.load_all_bugs()
- uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
- bugdir.cleanup()
- def testInMemoryCleanLoad(self):
- """SimpleBugDir(sync_with_disk==False) should not import preexisting bugs."""
- bugdir = SimpleBugDir(sync_with_disk=False)
- self.failUnless(bugdir.sync_with_disk==False, bugdir.sync_with_disk)
- uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
- self.failUnlessRaises(DiskAccessRequired, bugdir.load_all_bugs)
- uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == ['a', 'b'], uuids)
- bugdir._clear_bugs()
- uuids = sorted([bug.uuid for bug in bugdir])
- self.failUnless(uuids == [], uuids)
- bugdir.cleanup()
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/bzr.py b/interfaces/email/interactive/libbe/bzr.py
deleted file mode 100644
index ed9e032..0000000
--- a/interfaces/email/interactive/libbe/bzr.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# Ben Finney <ben+python@benfinney.id.au>
-# Marien Zwart <marienz@gentoo.org>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Bazaar (bzr) backend.
-"""
-
-import os
-import re
-import sys
-import unittest
-import doctest
-
-import vcs
-
-
-def new():
- return Bzr()
-
-class Bzr(vcs.VCS):
- name = "bzr"
- client = "bzr"
- versioned = True
- def _vcs_version(self):
- status,output,error = self._u_invoke_client("--version")
- return output
- def _vcs_detect(self, path):
- if self._u_search_parent_directories(path, ".bzr") != None :
- return True
- return False
- def _vcs_root(self, path):
- """Find the root of the deepest repository containing path."""
- status,output,error = self._u_invoke_client("root", path)
- return output.rstrip('\n')
- def _vcs_init(self, path):
- self._u_invoke_client("init", directory=path)
- def _vcs_get_user_id(self):
- status,output,error = self._u_invoke_client("whoami")
- return output.rstrip('\n')
- def _vcs_set_user_id(self, value):
- self._u_invoke_client("whoami", value)
- def _vcs_add(self, path):
- self._u_invoke_client("add", path)
- def _vcs_remove(self, path):
- # --force to also remove unversioned files.
- self._u_invoke_client("remove", "--force", path)
- def _vcs_update(self, path):
- pass
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
- if revision == None:
- return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
- else:
- status,output,error = \
- self._u_invoke_client("cat","-r",revision,path)
- return output
- def _vcs_duplicate_repo(self, directory, revision=None):
- if revision == None:
- vcs.VCS._vcs_duplicate_repo(self, directory, revision)
- else:
- self._u_invoke_client("branch", "--revision", revision,
- ".", directory)
- def _vcs_commit(self, commitfile, allow_empty=False):
- args = ["commit", "--file", commitfile]
- if allow_empty == True:
- args.append("--unchanged")
- status,output,error = self._u_invoke_client(*args)
- else:
- kwargs = {"expect":(0,3)}
- status,output,error = self._u_invoke_client(*args, **kwargs)
- if status != 0:
- strings = ["ERROR: no changes to commit.", # bzr 1.3.1
- "ERROR: No changes to commit."] # bzr 1.15.1
- if self._u_any_in_string(strings, error) == True:
- raise vcs.EmptyCommit()
- else:
- raise vcs.CommandError(args, status, stdout="", stderr=error)
- revision = None
- revline = re.compile("Committed revision (.*)[.]")
- match = revline.search(error)
- assert match != None, output+error
- assert len(match.groups()) == 1
- revision = match.groups()[0]
- return revision
- def _vcs_revision_id(self, index):
- status,output,error = self._u_invoke_client("revno")
- current_revision = int(output)
- if index >= current_revision or index < -current_revision:
- return None
- if index >= 0:
- return str(index+1) # bzr commit 0 is the empty tree.
- return str(current_revision+index+1)
-
-
-vcs.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/cmdutil.py b/interfaces/email/interactive/libbe/cmdutil.py
deleted file mode 100644
index 9b64142..0000000
--- a/interfaces/email/interactive/libbe/cmdutil.py
+++ /dev/null
@@ -1,233 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Define assorted utilities to make command-line handling easier.
-"""
-
-import glob
-import optparse
-import os
-from textwrap import TextWrapper
-from StringIO import StringIO
-import sys
-import doctest
-
-import bugdir
-import plugin
-import encoding
-
-
-class UserError(Exception):
- def __init__(self, msg):
- Exception.__init__(self, msg)
-
-class UnknownCommand(UserError):
- def __init__(self, cmd):
- Exception.__init__(self, "Unknown command '%s'" % cmd)
- self.cmd = cmd
-
-class UsageError(Exception):
- pass
-
-class GetHelp(Exception):
- pass
-
-class GetCompletions(Exception):
- def __init__(self, completions=[]):
- msg = "Get allowed completions"
- Exception.__init__(self, msg)
- self.completions = completions
-
-def iter_commands():
- for name, module in plugin.iter_plugins("becommands"):
- yield name.replace("_", "-"), module
-
-def get_command(command_name):
- """Retrieves the module for a user command
-
- >>> try:
- ... get_command("asdf")
- ... except UnknownCommand, e:
- ... print e
- Unknown command 'asdf'
- >>> repr(get_command("list")).startswith("<module 'becommands.list' from ")
- True
- """
- cmd = plugin.get_plugin("becommands", command_name.replace("-", "_"))
- if cmd is None:
- raise UnknownCommand(command_name)
- return cmd
-
-
-def execute(cmd, args, manipulate_encodings=True):
- enc = encoding.get_encoding()
- cmd = get_command(cmd)
- ret = cmd.execute([a.decode(enc) for a in args],
- manipulate_encodings=manipulate_encodings)
- if ret == None:
- ret = 0
- return ret
-
-def help(cmd=None, parser=None):
- if cmd != None:
- return get_command(cmd).help()
- else:
- cmdlist = []
- for name, module in iter_commands():
- cmdlist.append((name, module.__desc__))
- longest_cmd_len = max([len(name) for name,desc in cmdlist])
- ret = ["Bugs Everywhere - Distributed bug tracking",
- "", "Supported commands"]
- for name, desc in cmdlist:
- numExtraSpaces = longest_cmd_len-len(name)
- ret.append("be %s%*s %s" % (name, numExtraSpaces, "", desc))
- ret.extend(["", "Run", " be help [command]", "for more information."])
- longhelp = "\n".join(ret)
- if parser == None:
- return longhelp
- return parser.help_str() + "\n" + longhelp
-
-def completions(cmd):
- parser = get_command(cmd).get_parser()
- longopts = []
- for opt in parser.option_list:
- longopts.append(opt.get_opt_string())
- return longopts
-
-def raise_get_help(option, opt, value, parser):
- raise GetHelp
-
-def raise_get_completions(option, opt, value, parser):
- print "got completion arg"
- if hasattr(parser, "command") and parser.command == "be":
- comps = []
- for command, module in iter_commands():
- comps.append(command)
- for opt in parser.option_list:
- comps.append(opt.get_opt_string())
- raise GetCompletions(comps)
- raise GetCompletions(completions(sys.argv[1]))
-
-class CmdOptionParser(optparse.OptionParser):
- def __init__(self, usage):
- optparse.OptionParser.__init__(self, usage)
- self.disable_interspersed_args()
- self.remove_option("-h")
- self.add_option("-h", "--help", action="callback",
- callback=raise_get_help, help="Print a help message")
- self.add_option("--complete", action="callback",
- callback=raise_get_completions,
- help="Print a list of available completions")
-
- def error(self, message):
- raise UsageError(message)
-
- def iter_options(self):
- return iter_combine([self._short_opt.iterkeys(),
- self._long_opt.iterkeys()])
-
- def help_str(self):
- f = StringIO()
- self.print_help(f)
- return f.getvalue()
-
-def option_value_pairs(options, parser):
- """
- Iterate through OptionParser (option, value) pairs.
- """
- for option in [o.dest for o in parser.option_list if o.dest != None]:
- value = getattr(options, option)
- yield (option, value)
-
-def default_complete(options, args, parser, bugid_args={}):
- """
- A dud complete implementation for becommands so that the
- --complete argument doesn't cause any problems. Use this
- until you've set up a command-specific complete function.
-
- bugid_args is an optional dict where the keys are positional
- arguments taking bug shortnames and the values are functions for
- filtering, since that's a common enough operation.
- e.g. for "be open [options] BUGID"
- bugid_args = {0: lambda bug : bug.active == False}
- A positional argument of -1 specifies all remaining arguments
- (e.g in the case of "be show BUGID BUGID ...").
- """
- for option,value in option_value_pairs(options, parser):
- if value == "--complete":
- raise GetCompletions()
- if len(bugid_args.keys()) > 0:
- max_pos_arg = max(bugid_args.keys())
- else:
- max_pos_arg = -1
- for pos,value in enumerate(args):
- if value == "--complete":
- filter = None
- if pos in bugid_args:
- filter = bugid_args[pos]
- if pos > max_pos_arg and -1 in bugid_args:
- filter = bugid_args[-1]
- if filter != None:
- bugshortnames = []
- try:
- bd = bugdir.BugDir(from_disk=True,
- manipulate_encodings=False)
- bd.load_all_bugs()
- bugs = [bug for bug in bd if filter(bug) == True]
- bugshortnames = [bd.bug_shortname(bug) for bug in bugs]
- except bugdir.NoBugDir:
- pass
- raise GetCompletions(bugshortnames)
- raise GetCompletions()
-
-def complete_path(path):
- """List possible path completions for path."""
- comps = glob.glob(path+"*") + glob.glob(path+"/*")
- if len(comps) == 1 and os.path.isdir(comps[0]):
- comps.extend(glob.glob(comps[0]+"/*"))
- return comps
-
-def underlined(instring):
- """Produces a version of a string that is underlined with '='
-
- >>> underlined("Underlined String")
- 'Underlined String\\n================='
- """
-
- return "%s\n%s" % (instring, "="*len(instring))
-
-def bug_from_shortname(bdir, shortname):
- """
- Exception translation for the command-line interface.
- """
- try:
- bug = bdir.bug_from_shortname(shortname)
- except (bugdir.MultipleBugMatches, bugdir.NoBugMatches), e:
- raise UserError(e.message)
- return bug
-
-def _test():
- import doctest
- import sys
- doctest.testmod()
-
-if __name__ == "__main__":
- _test()
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/comment.py b/interfaces/email/interactive/libbe/comment.py
deleted file mode 100644
index 41bc7e6..0000000
--- a/interfaces/email/interactive/libbe/comment.py
+++ /dev/null
@@ -1,744 +0,0 @@
-# Bugs Everywhere, a distributed bugtracker
-# Copyright (C) 2008-2009 Chris Ball <cjb@laptop.org>
-# Thomas Habets <thomas@habets.pp.se>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Define the Comment class for representing bug comments.
-"""
-
-import base64
-import os
-import os.path
-import sys
-import time
-import types
-try: # import core module, Python >= 2.5
- from xml.etree import ElementTree
-except ImportError: # look for non-core module
- from elementtree import ElementTree
-import xml.sax.saxutils
-import doctest
-
-from beuuid import uuid_gen
-from properties import Property, doc_property, local_property, \
- defaulting_property, checked_property, cached_property, \
- primed_property, change_hook_property, settings_property
-import settings_object
-import mapfile
-from tree import Tree
-import utility
-
-
-class InvalidShortname(KeyError):
- def __init__(self, shortname, shortnames):
- msg = "Invalid shortname %s\n%s" % (shortname, shortnames)
- KeyError.__init__(self, msg)
- self.shortname = shortname
- self.shortnames = shortnames
-
-class InvalidXML(ValueError):
- def __init__(self, element, comment):
- msg = "Invalid comment xml: %s\n %s\n" \
- % (comment, ElementTree.tostring(element))
- ValueError.__init__(self, msg)
- self.element = element
- self.comment = comment
-
-class MissingReference(ValueError):
- def __init__(self, comment):
- msg = "Missing reference to %s" % (comment.in_reply_to)
- ValueError.__init__(self, msg)
- self.reference = comment.in_reply_to
- self.comment = comment
-
-class DiskAccessRequired (Exception):
- def __init__(self, goal):
- msg = "Cannot %s without accessing the disk" % goal
- Exception.__init__(self, msg)
-
-INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!"
-
-def list_to_root(comments, bug, root=None,
- ignore_missing_references=False):
- """
- Convert a raw list of comments to single root comment. We use a
- dummy root comment by default, because there can be several
- comment threads rooted on the same parent bug. To simplify
- comment interaction, we condense these threads into a single
- thread with a Comment dummy root. Can also be used to append
- a list of subcomments to a non-dummy root comment, so long as
- all the new comments are descendants of the root comment.
-
- No Comment method should use the dummy comment.
- """
- root_comments = []
- uuid_map = {}
- for comment in comments:
- assert comment.uuid != None
- uuid_map[comment.uuid] = comment
- for comment in comments:
- if comment.alt_id != None and comment.alt_id not in uuid_map:
- uuid_map[comment.alt_id] = comment
- if root == None:
- root = Comment(bug, uuid=INVALID_UUID)
- else:
- uuid_map[root.uuid] = root
- for comm in comments:
- if comm.in_reply_to == INVALID_UUID:
- comm.in_reply_to = None
- rep = comm.in_reply_to
- if rep == None or rep == bug.uuid:
- root_comments.append(comm)
- else:
- parentUUID = comm.in_reply_to
- try:
- parent = uuid_map[parentUUID]
- parent.add_reply(comm)
- except KeyError, e:
- if ignore_missing_references == True:
- print >> sys.stderr, \
- "Ignoring missing reference to %s" % parentUUID
- comm.in_reply_to = None
- root_comments.append(comm)
- else:
- raise MissingReference(comm)
- root.extend(root_comments)
- return root
-
-def loadComments(bug, load_full=False):
- """
- Set load_full=True when you want to load the comment completely
- from disk *now*, rather than waiting and lazy loading as required.
- """
- if bug.sync_with_disk == False:
- raise DiskAccessRequired("load comments")
- path = bug.get_path("comments")
- if not os.path.exists(path):
- return Comment(bug, uuid=INVALID_UUID)
- comments = []
- for uuid in os.listdir(path):
- if uuid.startswith('.'):
- continue
- comm = Comment(bug, uuid, from_disk=True)
- comm.set_sync_with_disk(bug.sync_with_disk)
- if load_full == True:
- comm.load_settings()
- dummy = comm.body # force the body to load
- comments.append(comm)
- return list_to_root(comments, bug)
-
-def saveComments(bug):
- if bug.sync_with_disk == False:
- raise DiskAccessRequired("save comments")
- for comment in bug.comment_root.traverse():
- comment.save()
-
-
-class Comment(Tree, settings_object.SavedSettingsObject):
- """
- >>> c = Comment()
- >>> c.uuid != None
- True
- >>> c.uuid = "some-UUID"
- >>> print c.content_type
- text/plain
- """
-
- settings_properties = []
- required_saved_properties = []
- _prop_save_settings = settings_object.prop_save_settings
- _prop_load_settings = settings_object.prop_load_settings
- def _versioned_property(settings_properties=settings_properties,
- required_saved_properties=required_saved_properties,
- **kwargs):
- if "settings_properties" not in kwargs:
- kwargs["settings_properties"] = settings_properties
- if "required_saved_properties" not in kwargs:
- kwargs["required_saved_properties"]=required_saved_properties
- return settings_object.versioned_property(**kwargs)
-
- @_versioned_property(name="Alt-id",
- doc="Alternate ID for linking imported comments. Internally comments are linked (via In-reply-to) to the parent's UUID. However, these UUIDs are generated internally, so Alt-id is provided as a user-controlled linking target.")
- def alt_id(): return {}
-
- @_versioned_property(name="Author",
- doc="The author of the comment")
- def author(): return {}
-
- @_versioned_property(name="In-reply-to",
- doc="UUID for parent comment or bug")
- def in_reply_to(): return {}
-
- @_versioned_property(name="Content-type",
- doc="Mime type for comment body",
- default="text/plain",
- require_save=True)
- def content_type(): return {}
-
- @_versioned_property(name="Date",
- doc="An RFC 2822 timestamp for comment creation")
- def date(): return {}
-
- def _get_time(self):
- if self.date == None:
- return None
- return utility.str_to_time(self.date)
- def _set_time(self, value):
- self.date = utility.time_to_str(value)
- time = property(fget=_get_time,
- fset=_set_time,
- doc="An integer version of .date")
-
- def _get_comment_body(self):
- if self.vcs != None and self.sync_with_disk == True:
- import vcs
- binary = not self.content_type.startswith("text/")
- return self.vcs.get_file_contents(self.get_path("body"), binary=binary)
- def _set_comment_body(self, old=None, new=None, force=False):
- if (self.vcs != None and self.sync_with_disk == True) or force==True:
- assert new != None, "Can't save empty comment"
- binary = not self.content_type.startswith("text/")
- self.vcs.set_file_contents(self.get_path("body"), new, binary=binary)
-
- @Property
- @change_hook_property(hook=_set_comment_body)
- @cached_property(generator=_get_comment_body)
- @local_property("body")
- @doc_property(doc="The meat of the comment")
- def body(): return {}
-
- def _get_vcs(self):
- if hasattr(self.bug, "vcs"):
- return self.bug.vcs
-
- @Property
- @cached_property(generator=_get_vcs)
- @local_property("vcs")
- @doc_property(doc="A revision control system instance.")
- def vcs(): return {}
-
- def _extra_strings_check_fn(value):
- return utility.iterable_full_of_strings(value, \
- alternative=settings_object.EMPTY)
- def _extra_strings_change_hook(self, old, new):
- self.extra_strings.sort() # to make merging easier
- self._prop_save_settings(old, new)
- @_versioned_property(name="extra_strings",
- doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.",
- default=[],
- check_fn=_extra_strings_check_fn,
- change_hook=_extra_strings_change_hook,
- mutable=True)
- def extra_strings(): return {}
-
- def __init__(self, bug=None, uuid=None, from_disk=False,
- in_reply_to=None, body=None):
- """
- Set from_disk=True to load an old comment.
- Set from_disk=False to create a new comment.
-
- The uuid option is required when from_disk==True.
-
- The in_reply_to and body options are only used if
- from_disk==False (the default). When from_disk==True, they are
- loaded from the bug database.
-
- in_reply_to should be the uuid string of the parent comment.
- """
- Tree.__init__(self)
- settings_object.SavedSettingsObject.__init__(self)
- self.bug = bug
- self.uuid = uuid
- if from_disk == True:
- self.sync_with_disk = True
- else:
- self.sync_with_disk = False
- if uuid == None:
- self.uuid = uuid_gen()
- self.time = int(time.time()) # only save to second precision
- if self.vcs != None:
- self.author = self.vcs.get_user_id()
- self.in_reply_to = in_reply_to
- self.body = body
-
- def __cmp__(self, other):
- return cmp_full(self, other)
-
- def __str__(self):
- """
- >>> comm = Comment(bug=None, body="Some insightful remarks")
- >>> comm.uuid = "com-1"
- >>> comm.date = "Thu, 20 Nov 2008 15:55:11 +0000"
- >>> comm.author = "Jane Doe <jdoe@example.com>"
- >>> print comm
- --------- Comment ---------
- Name: com-1
- From: Jane Doe <jdoe@example.com>
- Date: Thu, 20 Nov 2008 15:55:11 +0000
- <BLANKLINE>
- Some insightful remarks
- """
- return self.string()
-
- def traverse(self, *args, **kwargs):
- """Avoid working with the possible dummy root comment"""
- for comment in Tree.traverse(self, *args, **kwargs):
- if comment.uuid == INVALID_UUID:
- continue
- yield comment
-
- # serializing methods
-
- def _setting_attr_string(self, setting):
- value = getattr(self, setting)
- if value == None:
- return ""
- return str(value)
-
- def xml(self, indent=0, shortname=None):
- """
- >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
- >>> comm.uuid = "0123"
- >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000"
- >>> print comm.xml(indent=2, shortname="com-1")
- <comment>
- <uuid>0123</uuid>
- <short-name>com-1</short-name>
- <author></author>
- <date>Thu, 01 Jan 1970 00:00:00 +0000</date>
- <content-type>text/plain</content-type>
- <body>Some
- insightful
- remarks</body>
- </comment>
- """
- if shortname == None:
- shortname = self.uuid
- if self.content_type.startswith("text/"):
- body = (self.body or "").rstrip('\n')
- else:
- maintype,subtype = self.content_type.split('/',1)
- msg = email.mime.base.MIMEBase(maintype, subtype)
- msg.set_payload(self.body or "")
- email.encoders.encode_base64(msg)
- body = base64.encodestring(self.body or "")
- info = [("uuid", self.uuid),
- ("alt-id", self.alt_id),
- ("short-name", shortname),
- ("in-reply-to", self.in_reply_to),
- ("author", self._setting_attr_string("author")),
- ("date", self.date),
- ("content-type", self.content_type),
- ("body", body)]
- lines = ["<comment>"]
- for (k,v) in info:
- if v != None:
- lines.append(' <%s>%s</%s>' % (k,xml.sax.saxutils.escape(v),k))
- lines.append("</comment>")
- istring = ' '*indent
- sep = '\n' + istring
- return istring + sep.join(lines).rstrip('\n')
-
- def from_xml(self, xml_string, verbose=True):
- """
- Note: If alt-id is not given, translates any <uuid> fields to
- <alt-id> fields.
- >>> commA = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
- >>> commA.uuid = "0123"
- >>> commA.date = "Thu, 01 Jan 1970 00:00:00 +0000"
- >>> xml = commA.xml(shortname="com-1")
- >>> commB = Comment()
- >>> commB.from_xml(xml)
- >>> attrs=['uuid','alt_id','in_reply_to','author','date','content_type','body']
- >>> for attr in attrs: # doctest: +ELLIPSIS
- ... if getattr(commB, attr) != getattr(commA, attr):
- ... estr = "Mismatch on %s: '%s' should be '%s'"
- ... args = (attr, getattr(commB, attr), getattr(commA, attr))
- ... print estr % args
- Mismatch on uuid: '...' should be '0123'
- Mismatch on alt_id: '0123' should be 'None'
- >>> print commB.alt_id
- 0123
- >>> commA.author
- >>> commB.author
- """
- if type(xml_string) == types.UnicodeType:
- xml_string = xml_string.strip().encode("unicode_escape")
- comment = ElementTree.XML(xml_string)
- if comment.tag != "comment":
- raise InvalidXML(comment, "root element must be <comment>")
- tags=['uuid','alt-id','in-reply-to','author','date','content-type','body']
- uuid = None
- body = None
- for child in comment.getchildren():
- if child.tag == "short-name":
- pass
- elif child.tag in tags:
- if child.text == None or len(child.text) == 0:
- text = settings_object.EMPTY
- else:
- text = xml.sax.saxutils.unescape(child.text)
- text = unicode(text).decode("unicode_escape").strip()
- if child.tag == "uuid":
- uuid = text
- continue # don't set the bug's uuid tag.
- if child.tag == "body":
- body = text
- continue # don't set the bug's body yet.
- else:
- attr_name = child.tag.replace('-','_')
- setattr(self, attr_name, text)
- elif verbose == True:
- print >> sys.stderr, "Ignoring unknown tag %s in %s" \
- % (child.tag, comment.tag)
- if self.alt_id == None and uuid not in [None, self.uuid]:
- self.alt_id = uuid
- if body != None:
- if self.content_type.startswith("text/"):
- self.body = body+"\n" # restore trailing newline
- else:
- self.body = base64.decodestring(body)
-
- def string(self, indent=0, shortname=None):
- """
- >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
- >>> comm.date = "Thu, 01 Jan 1970 00:00:00 +0000"
- >>> print comm.string(indent=2, shortname="com-1")
- --------- Comment ---------
- Name: com-1
- From:
- Date: Thu, 01 Jan 1970 00:00:00 +0000
- <BLANKLINE>
- Some
- insightful
- remarks
- """
- if shortname == None:
- shortname = self.uuid
- lines = []
- lines.append("--------- Comment ---------")
- lines.append("Name: %s" % shortname)
- lines.append("From: %s" % (self._setting_attr_string("author")))
- lines.append("Date: %s" % self.date)
- lines.append("")
- if self.content_type.startswith("text/"):
- lines.extend((self.body or "").splitlines())
- else:
- lines.append("Content type %s not printable. Try XML output instead" % self.content_type)
-
- istring = ' '*indent
- sep = '\n' + istring
- return istring + sep.join(lines).rstrip('\n')
-
- def string_thread(self, string_method_name="string", name_map={},
- indent=0, flatten=True,
- auto_name_map=False, bug_shortname=None):
- """
- Return a string displaying a thread of comments.
- bug_shortname is only used if auto_name_map == True.
-
- string_method_name (defaults to "string") is the name of the
- Comment method used to generate the output string for each
- Comment in the thread. The method must take the arguments
- indent and shortname.
-
- SIDE-EFFECT: if auto_name_map==True, calls comment_shortnames()
- which will sort the tree by comment.time. Avoid by calling
- name_map = {}
- for shortname,comment in comm.comment_shortnames(bug_shortname):
- name_map[comment.uuid] = shortname
- comm.sort(key=lambda c : c.author) # your sort
- comm.string_thread(name_map=name_map)
-
- >>> a = Comment(bug=None, uuid="a", body="Insightful remarks")
- >>> a.time = utility.str_to_time("Thu, 20 Nov 2008 01:00:00 +0000")
- >>> b = a.new_reply("Critique original comment")
- >>> b.uuid = "b"
- >>> b.time = utility.str_to_time("Thu, 20 Nov 2008 02:00:00 +0000")
- >>> c = b.new_reply("Begin flamewar :p")
- >>> c.uuid = "c"
- >>> c.time = utility.str_to_time("Thu, 20 Nov 2008 03:00:00 +0000")
- >>> d = a.new_reply("Useful examples")
- >>> d.uuid = "d"
- >>> d.time = utility.str_to_time("Thu, 20 Nov 2008 04:00:00 +0000")
- >>> a.sort(key=lambda comm : comm.time)
- >>> print a.string_thread(flatten=True)
- --------- Comment ---------
- Name: a
- From:
- Date: Thu, 20 Nov 2008 01:00:00 +0000
- <BLANKLINE>
- Insightful remarks
- --------- Comment ---------
- Name: b
- From:
- Date: Thu, 20 Nov 2008 02:00:00 +0000
- <BLANKLINE>
- Critique original comment
- --------- Comment ---------
- Name: c
- From:
- Date: Thu, 20 Nov 2008 03:00:00 +0000
- <BLANKLINE>
- Begin flamewar :p
- --------- Comment ---------
- Name: d
- From:
- Date: Thu, 20 Nov 2008 04:00:00 +0000
- <BLANKLINE>
- Useful examples
- >>> print a.string_thread(auto_name_map=True, bug_shortname="bug-1")
- --------- Comment ---------
- Name: bug-1:1
- From:
- Date: Thu, 20 Nov 2008 01:00:00 +0000
- <BLANKLINE>
- Insightful remarks
- --------- Comment ---------
- Name: bug-1:2
- From:
- Date: Thu, 20 Nov 2008 02:00:00 +0000
- <BLANKLINE>
- Critique original comment
- --------- Comment ---------
- Name: bug-1:3
- From:
- Date: Thu, 20 Nov 2008 03:00:00 +0000
- <BLANKLINE>
- Begin flamewar :p
- --------- Comment ---------
- Name: bug-1:4
- From:
- Date: Thu, 20 Nov 2008 04:00:00 +0000
- <BLANKLINE>
- Useful examples
- """
- if auto_name_map == True:
- name_map = {}
- for shortname,comment in self.comment_shortnames(bug_shortname):
- name_map[comment.uuid] = shortname
- stringlist = []
- for depth,comment in self.thread(flatten=flatten):
- ind = 2*depth+indent
- if comment.uuid in name_map:
- sname = name_map[comment.uuid]
- else:
- sname = None
- string_fn = getattr(comment, string_method_name)
- stringlist.append(string_fn(indent=ind, shortname=sname))
- return '\n'.join(stringlist)
-
- def xml_thread(self, name_map={}, indent=0,
- auto_name_map=False, bug_shortname=None):
- return self.string_thread(string_method_name="xml", name_map=name_map,
- indent=indent, auto_name_map=auto_name_map,
- bug_shortname=bug_shortname)
-
- # methods for saving/loading/acessing settings and properties.
-
- def get_path(self, *args):
- dir = os.path.join(self.bug.get_path("comments"), self.uuid)
- if len(args) == 0:
- return dir
- assert args[0] in ["values", "body"], str(args)
- return os.path.join(dir, *args)
-
- def set_sync_with_disk(self, value):
- self.sync_with_disk = value
-
- def load_settings(self):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("load settings")
- self.settings = mapfile.map_load(self.vcs, self.get_path("values"))
- self._setup_saved_settings()
-
- def save_settings(self):
- if self.sync_with_disk == False:
- raise DiskAccessRequired("save settings")
- self.vcs.mkdir(self.get_path())
- path = self.get_path("values")
- mapfile.map_save(self.vcs, path, self._get_saved_settings())
-
- def save(self):
- """
- Save any loaded contents to disk.
-
- However, if self.sync_with_disk = True, then any changes are
- automatically written to disk as soon as they happen, so
- calling this method will just waste time (unless something
- else has been messing with your on-disk files).
- """
- sync_with_disk = self.sync_with_disk
- if sync_with_disk == False:
- self.set_sync_with_disk(True)
- assert self.body != None, "Can't save blank comment"
- self.save_settings()
- self._set_comment_body(new=self.body, force=True)
- if sync_with_disk == False:
- self.set_sync_with_disk(False)
-
- def remove(self):
- if self.sync_with_disk == False and self.uuid != INVALID_UUID:
- raise DiskAccessRequired("remove")
- for comment in self.traverse():
- path = comment.get_path()
- self.vcs.recursive_remove(path)
-
- def add_reply(self, reply, allow_time_inversion=False):
- if self.uuid != INVALID_UUID:
- reply.in_reply_to = self.uuid
- self.append(reply)
-
- def new_reply(self, body=None):
- """
- >>> comm = Comment(bug=None, body="Some insightful remarks")
- >>> repA = comm.new_reply("Critique original comment")
- >>> repB = repA.new_reply("Begin flamewar :p")
- >>> repB.in_reply_to == repA.uuid
- True
- """
- reply = Comment(self.bug, body=body)
- if self.bug != None:
- reply.set_sync_with_disk(self.bug.sync_with_disk)
- if reply.sync_with_disk == True:
- reply.save()
- self.add_reply(reply)
- return reply
-
- def comment_shortnames(self, bug_shortname=None):
- """
- Iterate through (id, comment) pairs, in time order.
- (This is a user-friendly id, not the comment uuid).
-
- SIDE-EFFECT : will sort the comment tree by comment.time
-
- >>> a = Comment(bug=None, uuid="a")
- >>> b = a.new_reply()
- >>> b.uuid = "b"
- >>> c = b.new_reply()
- >>> c.uuid = "c"
- >>> d = a.new_reply()
- >>> d.uuid = "d"
- >>> for id,name in a.comment_shortnames("bug-1"):
- ... print id, name.uuid
- bug-1:1 a
- bug-1:2 b
- bug-1:3 c
- bug-1:4 d
- """
- if bug_shortname == None:
- bug_shortname = ""
- self.sort(key=lambda comm : comm.time)
- for num,comment in enumerate(self.traverse()):
- yield ("%s:%d" % (bug_shortname, num+1), comment)
-
- def comment_from_shortname(self, comment_shortname, *args, **kwargs):
- """
- Use a comment shortname to look up a comment.
- >>> a = Comment(bug=None, uuid="a")
- >>> b = a.new_reply()
- >>> b.uuid = "b"
- >>> c = b.new_reply()
- >>> c.uuid = "c"
- >>> d = a.new_reply()
- >>> d.uuid = "d"
- >>> comm = a.comment_from_shortname("bug-1:3", bug_shortname="bug-1")
- >>> id(comm) == id(c)
- True
- """
- for cur_name, comment in self.comment_shortnames(*args, **kwargs):
- if comment_shortname == cur_name:
- return comment
- raise InvalidShortname(comment_shortname,
- list(self.comment_shortnames(*args, **kwargs)))
-
- def comment_from_uuid(self, uuid):
- """
- Use a comment shortname to look up a comment.
- >>> a = Comment(bug=None, uuid="a")
- >>> b = a.new_reply()
- >>> b.uuid = "b"
- >>> c = b.new_reply()
- >>> c.uuid = "c"
- >>> d = a.new_reply()
- >>> d.uuid = "d"
- >>> comm = a.comment_from_uuid("d")
- >>> id(comm) == id(d)
- True
- """
- for comment in self.traverse():
- if comment.uuid == uuid:
- return comment
- raise KeyError(uuid)
-
-def cmp_attr(comment_1, comment_2, attr, invert=False):
- """
- Compare a general attribute between two comments using the conventional
- comparison rule for that attribute type. If invert == True, sort
- *against* that convention.
- >>> attr="author"
- >>> commentA = Comment()
- >>> commentB = Comment()
- >>> commentA.author = "John Doe"
- >>> commentB.author = "Jane Doe"
- >>> cmp_attr(commentA, commentB, attr) > 0
- True
- >>> cmp_attr(commentA, commentB, attr, invert=True) < 0
- True
- >>> commentB.author = "John Doe"
- >>> cmp_attr(commentA, commentB, attr) == 0
- True
- """
- if not hasattr(comment_2, attr) :
- return 1
- val_1 = getattr(comment_1, attr)
- val_2 = getattr(comment_2, attr)
- if val_1 == None: val_1 = None
- if val_2 == None: val_2 = None
-
- if invert == True :
- return -cmp(val_1, val_2)
- else :
- return cmp(val_1, val_2)
-
-# alphabetical rankings (a < z)
-cmp_uuid = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "uuid")
-cmp_author = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "author")
-cmp_in_reply_to = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "in_reply_to")
-cmp_content_type = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "content_type")
-cmp_body = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "body")
-# chronological rankings (newer < older)
-cmp_time = lambda comment_1, comment_2 : cmp_attr(comment_1, comment_2, "time", invert=True)
-
-DEFAULT_CMP_FULL_CMP_LIST = \
- (cmp_time, cmp_author, cmp_content_type, cmp_body, cmp_in_reply_to,
- cmp_uuid)
-
-class CommentCompoundComparator (object):
- def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST):
- self.cmp_list = cmp_list
- def __call__(self, comment_1, comment_2):
- for comparison in self.cmp_list :
- val = comparison(comment_1, comment_2)
- if val != 0 :
- return val
- return 0
-
-cmp_full = CommentCompoundComparator()
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/config.py b/interfaces/email/interactive/libbe/config.py
deleted file mode 100644
index fb5a028..0000000
--- a/interfaces/email/interactive/libbe/config.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Create, save, and load the per-user config file at path().
-"""
-
-import ConfigParser
-import codecs
-import locale
-import os.path
-import sys
-import doctest
-
-
-default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
-
-def path():
- """Return the path to the per-user config file"""
- return os.path.expanduser("~/.bugs_everywhere")
-
-def set_val(name, value, section="DEFAULT", encoding=None):
- """Set a value in the per-user config file
-
- :param name: The name of the value to set
- :param value: The new value to set (or None to delete the value)
- :param section: The section to store the name/value in
- """
- if encoding == None:
- encoding = default_encoding
- config = ConfigParser.ConfigParser()
- if os.path.exists(path()) == False: # touch file or config
- open(path(), "w").close() # read chokes on missing file
- f = codecs.open(path(), "r", encoding)
- config.readfp(f, path())
- f.close()
- if value is not None:
- config.set(section, name, value)
- else:
- config.remove_option(section, name)
- f = codecs.open(path(), "w", encoding)
- config.write(f)
- f.close()
-
-def get_val(name, section="DEFAULT", default=None, encoding=None):
- """
- Get a value from the per-user config file
-
- :param name: The name of the value to get
- :section: The section that the name is in
- :return: The value, or None
- >>> get_val("junk") is None
- True
- >>> set_val("junk", "random")
- >>> get_val("junk")
- u'random'
- >>> set_val("junk", None)
- >>> get_val("junk") is None
- True
- """
- if os.path.exists(path()):
- if encoding == None:
- encoding = default_encoding
- config = ConfigParser.ConfigParser()
- f = codecs.open(path(), "r", encoding)
- config.readfp(f, path())
- f.close()
- try:
- return config.get(section, name)
- except ConfigParser.NoOptionError:
- return default
- else:
- return default
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/darcs.py b/interfaces/email/interactive/libbe/darcs.py
deleted file mode 100644
index 9115886..0000000
--- a/interfaces/email/interactive/libbe/darcs.py
+++ /dev/null
@@ -1,186 +0,0 @@
-# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Darcs backend.
-"""
-
-import codecs
-import os
-import re
-import sys
-try: # import core module, Python >= 2.5
- from xml.etree import ElementTree
-except ImportError: # look for non-core module
- from elementtree import ElementTree
-from xml.sax.saxutils import unescape
-import doctest
-import unittest
-
-import vcs
-
-
-def new():
- return Darcs()
-
-class Darcs(vcs.VCS):
- name="darcs"
- client="darcs"
- versioned=True
- def _vcs_version(self):
- status,output,error = self._u_invoke_client("--version")
- num_part = output.split(" ")[0]
- self.parsed_version = [int(i) for i in num_part.split(".")]
- return output
- def _vcs_detect(self, path):
- if self._u_search_parent_directories(path, "_darcs") != None :
- return True
- return False
- def _vcs_root(self, path):
- """Find the root of the deepest repository containing path."""
- # Assume that nothing funny is going on; in particular, that we aren't
- # dealing with a bare repo.
- if os.path.isdir(path) != True:
- path = os.path.dirname(path)
- darcs_dir = self._u_search_parent_directories(path, "_darcs")
- if darcs_dir == None:
- return None
- return os.path.dirname(darcs_dir)
- def _vcs_init(self, path):
- self._u_invoke_client("init", directory=path)
- def _vcs_get_user_id(self):
- # following http://darcs.net/manual/node4.html#SECTION00410030000000000000
- # as of June 29th, 2009
- if self.rootdir == None:
- return None
- darcs_dir = os.path.join(self.rootdir, "_darcs")
- if darcs_dir != None:
- for pref_file in ["author", "email"]:
- pref_path = os.path.join(darcs_dir, "prefs", pref_file)
- if os.path.exists(pref_path):
- return self.get_file_contents(pref_path)
- for env_variable in ["DARCS_EMAIL", "EMAIL"]:
- if env_variable in os.environ:
- return os.environ[env_variable]
- return None
- def _vcs_set_user_id(self, value):
- if self.rootdir == None:
- self.root(".")
- if self.rootdir == None:
- raise vcs.SettingIDnotSupported
- author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author")
- f = codecs.open(author_path, "w", self.encoding)
- f.write(value)
- f.close()
- def _vcs_add(self, path):
- if os.path.isdir(path):
- return
- self._u_invoke_client("add", path)
- def _vcs_remove(self, path):
- if not os.path.isdir(self._u_abspath(path)):
- os.remove(os.path.join(self.rootdir, path)) # darcs notices removal
- def _vcs_update(self, path):
- pass # darcs notices changes
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
- if revision == None:
- return vcs.VCS._vcs_get_file_contents(self, path, revision,
- binary=binary)
- else:
- if self.parsed_version[0] >= 2:
- status,output,error = self._u_invoke_client( \
- "show", "contents", "--patch", revision, path)
- return output
- else:
- # Darcs versions < 2.0.0pre2 lack the "show contents" command
-
- status,output,error = self._u_invoke_client( \
- "diff", "--unified", "--from-patch", revision, path)
- major_patch = output
- status,output,error = self._u_invoke_client( \
- "diff", "--unified", "--patch", revision, path)
- target_patch = output
-
- # "--output -" to be supported in GNU patch > 2.5.9
- # but that hasn't been released as of June 30th, 2009.
-
- # Rewrite path to status before the patch we want
- args=["patch", "--reverse", path]
- status,output,error = self._u_invoke(args, stdin=major_patch)
- # Now apply the patch we want
- args=["patch", path]
- status,output,error = self._u_invoke(args, stdin=target_patch)
-
- if os.path.exists(os.path.join(self.rootdir, path)) == True:
- contents = vcs.VCS._vcs_get_file_contents(self, path,
- binary=binary)
- else:
- contents = ""
-
- # Now restore path to it's current incarnation
- args=["patch", "--reverse", path]
- status,output,error = self._u_invoke(args, stdin=target_patch)
- args=["patch", path]
- status,output,error = self._u_invoke(args, stdin=major_patch)
- current_contents = vcs.VCS._vcs_get_file_contents(self, path,
- binary=binary)
- return contents
- def _vcs_duplicate_repo(self, directory, revision=None):
- if revision==None:
- vcs.VCS._vcs_duplicate_repo(self, directory, revision)
- else:
- self._u_invoke_client("put", "--to-patch", revision, directory)
- def _vcs_commit(self, commitfile, allow_empty=False):
- id = self.get_user_id()
- if '@' not in id:
- id = "%s <%s@invalid.com>" % (id, id)
- args = ['record', '--all', '--author', id, '--logfile', commitfile]
- status,output,error = self._u_invoke_client(*args)
- empty_strings = ["No changes!"]
- if self._u_any_in_string(empty_strings, output) == True:
- if allow_empty == False:
- raise vcs.EmptyCommit()
- # note that darcs does _not_ make an empty revision.
- # this returns the last non-empty revision id...
- revision = self._vcs_revision_id(-1)
- else:
- revline = re.compile("Finished recording patch '(.*)'")
- match = revline.search(output)
- assert match != None, output+error
- assert len(match.groups()) == 1
- revision = match.groups()[0]
- return revision
- def _vcs_revision_id(self, index):
- status,output,error = self._u_invoke_client("changes", "--xml")
- revisions = []
- xml_str = output.encode("unicode_escape").replace(r"\n", "\n")
- element = ElementTree.XML(xml_str)
- assert element.tag == "changelog", element.tag
- for patch in element.getchildren():
- assert patch.tag == "patch", patch.tag
- for child in patch.getchildren():
- if child.tag == "name":
- text = unescape(unicode(child.text).decode("unicode_escape").strip())
- revisions.append(text)
- revisions.reverse()
- try:
- return revisions[index]
- except IndexError:
- return None
-
-vcs.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__])
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/diff.py b/interfaces/email/interactive/libbe/diff.py
deleted file mode 100644
index 9253a23..0000000
--- a/interfaces/email/interactive/libbe/diff.py
+++ /dev/null
@@ -1,419 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""Compare two bug trees."""
-
-import difflib
-import doctest
-
-from libbe import bugdir, bug, settings_object, tree
-from libbe.utility import time_to_str
-
-
-class DiffTree (tree.Tree):
- """
- A tree holding difference data for easy report generation.
- >>> bugdir = DiffTree("bugdir")
- >>> bdsettings = DiffTree("settings", data="target: None -> 1.0")
- >>> bugdir.append(bdsettings)
- >>> bugs = DiffTree("bugs", "bug-count: 5 -> 6")
- >>> bugdir.append(bugs)
- >>> new = DiffTree("new", "new bugs: ABC, DEF")
- >>> bugs.append(new)
- >>> rem = DiffTree("rem", "removed bugs: RST, UVW")
- >>> bugs.append(rem)
- >>> print bugdir.report_string()
- target: None -> 1.0
- bug-count: 5 -> 6
- new bugs: ABC, DEF
- removed bugs: RST, UVW
- >>> print "\\n".join(bugdir.paths())
- bugdir
- bugdir/settings
- bugdir/bugs
- bugdir/bugs/new
- bugdir/bugs/rem
- >>> bugdir.child_by_path("/") == bugdir
- True
- >>> bugdir.child_by_path("/bugs") == bugs
- True
- >>> bugdir.child_by_path("/bugs/rem") == rem
- True
- >>> bugdir.child_by_path("bugdir") == bugdir
- True
- >>> bugdir.child_by_path("bugdir/") == bugdir
- True
- >>> bugdir.child_by_path("bugdir/bugs") == bugs
- True
- >>> bugdir.child_by_path("/bugs").masked = True
- >>> print bugdir.report_string()
- target: None -> 1.0
- """
- def __init__(self, name, data=None, data_part_fn=str,
- requires_children=False, masked=False):
- tree.Tree.__init__(self)
- self.name = name
- self.data = data
- self.data_part_fn = data_part_fn
- self.requires_children = requires_children
- self.masked = masked
- def paths(self, parent_path=None):
- paths = []
- if parent_path == None:
- path = self.name
- else:
- path = "%s/%s" % (parent_path, self.name)
- paths.append(path)
- for child in self:
- paths.extend(child.paths(path))
- return paths
- def child_by_path(self, path):
- if hasattr(path, "split"): # convert string path to a list of names
- names = path.split("/")
- if names[0] == "":
- names[0] = self.name # replace root with self
- if len(names) > 1 and names[-1] == "":
- names = names[:-1] # strip empty tail
- else: # it was already an array
- names = path
- assert len(names) > 0, path
- if names[0] == self.name:
- if len(names) == 1:
- return self
- for child in self:
- if names[1] == child.name:
- return child.child_by_path(names[1:])
- if len(names) == 1:
- raise KeyError, "%s doesn't match '%s'" % (names, self.name)
- raise KeyError, "%s points to child not in %s" % (names, [c.name for c in self])
- def report_string(self):
- return "\n".join(self.report())
- def report(self, root=None, parent=None, depth=0):
- if root == None:
- root = self.make_root()
- if self.masked == True:
- return None
- data_part = self.data_part(depth)
- if self.requires_children == True and len(self) == 0:
- pass
- else:
- self.join(root, parent, data_part)
- if data_part != None:
- depth += 1
- for child in self:
- child.report(root, self, depth)
- return root
- def make_root(self):
- return []
- def join(self, root, parent, data_part):
- if data_part != None:
- root.append(data_part)
- def data_part(self, depth, indent=True):
- if self.data == None:
- return None
- if hasattr(self, "_cached_data_part"):
- return self._cached_data_part
- data_part = self.data_part_fn(self.data)
- if indent == True:
- data_part_lines = data_part.splitlines()
- indent = " "*(depth)
- line_sep = "\n"+indent
- data_part = indent+line_sep.join(data_part_lines)
- self._cached_data_part = data_part
- return data_part
-
-class Diff (object):
- """
- Difference tree generator for BugDirs.
- >>> import copy
- >>> bd = bugdir.SimpleBugDir(sync_with_disk=False)
- >>> bd.user_id = "John Doe <j@doe.com>"
- >>> bd_new = copy.deepcopy(bd)
- >>> bd_new.target = "1.0"
- >>> a = bd_new.bug_from_uuid("a")
- >>> rep = a.comment_root.new_reply("I'm closing this bug")
- >>> rep.uuid = "acom"
- >>> rep.date = "Thu, 01 Jan 1970 00:00:00 +0000"
- >>> a.status = "closed"
- >>> b = bd_new.bug_from_uuid("b")
- >>> bd_new.remove_bug(b)
- >>> c = bd_new.new_bug("c", "Bug C")
- >>> d = Diff(bd, bd_new)
- >>> r = d.report_tree()
- >>> print "\\n".join(r.paths())
- bugdir
- bugdir/settings
- bugdir/bugs
- bugdir/bugs/new
- bugdir/bugs/new/c
- bugdir/bugs/rem
- bugdir/bugs/rem/b
- bugdir/bugs/mod
- bugdir/bugs/mod/a
- bugdir/bugs/mod/a/settings
- bugdir/bugs/mod/a/comments
- bugdir/bugs/mod/a/comments/new
- bugdir/bugs/mod/a/comments/new/acom
- bugdir/bugs/mod/a/comments/rem
- bugdir/bugs/mod/a/comments/mod
- >>> print r.report_string()
- Changed bug directory settings:
- target: None -> 1.0
- New bugs:
- c:om: Bug C
- Removed bugs:
- b:cm: Bug B
- Modified bugs:
- a:cm: Bug A
- Changed bug settings:
- status: open -> closed
- New comments:
- from John Doe <j@doe.com> on Thu, 01 Jan 1970 00:00:00 +0000
- I'm closing this bug...
- >>> bd.cleanup()
- """
- def __init__(self, old_bugdir, new_bugdir):
- self.old_bugdir = old_bugdir
- self.new_bugdir = new_bugdir
-
- # data assembly methods
-
- def _changed_bugs(self):
- """
- Search for differences in all bugs between .old_bugdir and
- .new_bugdir. Returns
- (added_bugs, modified_bugs, removed_bugs)
- where added_bugs and removed_bugs are lists of added and
- removed bugs respectively. modified_bugs is a list of
- (old_bug,new_bug) pairs.
- """
- if hasattr(self, "__changed_bugs"):
- return self.__changed_bugs
- added = []
- removed = []
- modified = []
- for uuid in self.new_bugdir.list_uuids():
- new_bug = self.new_bugdir.bug_from_uuid(uuid)
- try:
- old_bug = self.old_bugdir.bug_from_uuid(uuid)
- except KeyError:
- added.append(new_bug)
- else:
- if old_bug.sync_with_disk == True:
- old_bug.load_comments()
- if new_bug.sync_with_disk == True:
- new_bug.load_comments()
- if old_bug != new_bug:
- modified.append((old_bug, new_bug))
- for uuid in self.old_bugdir.list_uuids():
- if not self.new_bugdir.has_bug(uuid):
- old_bug = self.old_bugdir.bug_from_uuid(uuid)
- removed.append(old_bug)
- added.sort()
- removed.sort()
- modified.sort(self._bug_modified_cmp)
- self.__changed_bugs = (added, modified, removed)
- return self.__changed_bugs
- def _bug_modified_cmp(self, left, right):
- return cmp(left[1], right[1])
- def _changed_comments(self, old, new):
- """
- Search for differences in all loaded comments between the bugs
- old and new. Returns
- (added_comments, modified_comments, removed_comments)
- analogous to ._changed_bugs.
- """
- if hasattr(self, "__changed_comments"):
- if new.uuid in self.__changed_comments:
- return self.__changed_comments[new.uuid]
- else:
- self.__changed_comments = {}
- added = []
- removed = []
- modified = []
- old.comment_root.sort(key=lambda comm : comm.time)
- new.comment_root.sort(key=lambda comm : comm.time)
- old_comment_ids = [c.uuid for c in old.comments()]
- new_comment_ids = [c.uuid for c in new.comments()]
- for uuid in new_comment_ids:
- new_comment = new.comment_from_uuid(uuid)
- try:
- old_comment = old.comment_from_uuid(uuid)
- except KeyError:
- added.append(new_comment)
- else:
- if old_comment != new_comment:
- modified.append((old_comment, new_comment))
- for uuid in old_comment_ids:
- if uuid not in new_comment_ids:
- new_comment = new.comment_from_uuid(uuid)
- removed.append(new_comment)
- self.__changed_comments[new.uuid] = (added, modified, removed)
- return self.__changed_comments[new.uuid]
- def _attribute_changes(self, old, new, attributes):
- """
- Take two objects old and new, and compare the value of *.attr
- for attr in the list attribute names. Returns a list of
- (attr_name, old_value, new_value)
- tuples.
- """
- change_list = []
- for attr in attributes:
- old_value = getattr(old, attr)
- new_value = getattr(new, attr)
- if old_value != new_value:
- change_list.append((attr, old_value, new_value))
- if len(change_list) >= 0:
- return change_list
- return None
- def _settings_properties_attribute_changes(self, old, new,
- hidden_properties=[]):
- properties = sorted(new.settings_properties)
- for p in hidden_properties:
- properties.remove(p)
- attributes = [settings_object.setting_name_to_attr_name(None, p)
- for p in properties]
- return self._attribute_changes(old, new, attributes)
- def _bugdir_attribute_changes(self):
- return self._settings_properties_attribute_changes( \
- self.old_bugdir, self.new_bugdir,
- ["vcs_name"]) # tweaked by bugdir.duplicate_bugdir
- def _bug_attribute_changes(self, old, new):
- return self._settings_properties_attribute_changes(old, new)
- def _comment_attribute_changes(self, old, new):
- return self._settings_properties_attribute_changes(old, new)
-
- # report generation methods
-
- def report_tree(self, diff_tree=DiffTree):
- """
- Pretty bare to make it easy to adjust to specific cases. You
- can pass in a DiffTree subclass via diff_tree to override the
- default report assembly process.
- """
- if hasattr(self, "__report_tree"):
- return self.__report_tree
- bugdir_settings = sorted(self.new_bugdir.settings_properties)
- bugdir_settings.remove("vcs_name") # tweaked by bugdir.duplicate_bugdir
- root = diff_tree("bugdir")
- bugdir_attribute_changes = self._bugdir_attribute_changes()
- if len(bugdir_attribute_changes) > 0:
- bugdir = diff_tree("settings", bugdir_attribute_changes,
- self.bugdir_attribute_change_string)
- root.append(bugdir)
- bug_root = diff_tree("bugs")
- root.append(bug_root)
- add,mod,rem = self._changed_bugs()
- bnew = diff_tree("new", "New bugs:", requires_children=True)
- bug_root.append(bnew)
- for bug in add:
- b = diff_tree(bug.uuid, bug, self.bug_add_string)
- bnew.append(b)
- brem = diff_tree("rem", "Removed bugs:", requires_children=True)
- bug_root.append(brem)
- for bug in rem:
- b = diff_tree(bug.uuid, bug, self.bug_rem_string)
- brem.append(b)
- bmod = diff_tree("mod", "Modified bugs:", requires_children=True)
- bug_root.append(bmod)
- for old,new in mod:
- b = diff_tree(new.uuid, (old,new), self.bug_mod_string)
- bmod.append(b)
- bug_attribute_changes = self._bug_attribute_changes(old, new)
- if len(bug_attribute_changes) > 0:
- bset = diff_tree("settings", bug_attribute_changes,
- self.bug_attribute_change_string)
- b.append(bset)
- if old.summary != new.summary:
- data = (old.summary, new.summary)
- bsum = diff_tree("summary", data, self.bug_summary_change_string)
- b.append(bsum)
- cr = diff_tree("comments")
- b.append(cr)
- a,m,d = self._changed_comments(old, new)
- cnew = diff_tree("new", "New comments:", requires_children=True)
- for comment in a:
- c = diff_tree(comment.uuid, comment, self.comment_add_string)
- cnew.append(c)
- crem = diff_tree("rem", "Removed comments:",requires_children=True)
- for comment in d:
- c = diff_tree(comment.uuid, comment, self.comment_rem_string)
- crem.append(c)
- cmod = diff_tree("mod","Modified comments:",requires_children=True)
- for o,n in m:
- c = diff_tree(n.uuid, (o,n), self.comment_mod_string)
- cmod.append(c)
- comm_attribute_changes = self._comment_attribute_changes(o, n)
- if len(comm_attribute_changes) > 0:
- cset = diff_tree("settings", comm_attribute_changes,
- self.comment_attribute_change_string)
- if o.body != n.body:
- data = (o.body, n.body)
- cbody = diff_tree("cbody", data,
- self.comment_body_change_string)
- c.append(cbody)
- cr.extend([cnew, crem, cmod])
- self.__report_tree = root
- return self.__report_tree
-
- # change data -> string methods.
- # Feel free to play with these in subclasses.
-
- def attribute_change_string(self, attribute_changes, indent=0):
- indent_string = " "*indent
- change_strings = [u"%s: %s -> %s" % f for f in attribute_changes]
- for i,change_string in enumerate(change_strings):
- change_strings[i] = indent_string+change_string
- return u"\n".join(change_strings)
- def bugdir_attribute_change_string(self, attribute_changes):
- return "Changed bug directory settings:\n%s" % \
- self.attribute_change_string(attribute_changes, indent=1)
- def bug_attribute_change_string(self, attribute_changes):
- return "Changed bug settings:\n%s" % \
- self.attribute_change_string(attribute_changes, indent=1)
- def comment_attribute_change_string(self, attribute_changes):
- return "Changed comment settings:\n%s" % \
- self.attribute_change_string(attribute_changes, indent=1)
- def bug_add_string(self, bug):
- return bug.string(shortlist=True)
- def bug_rem_string(self, bug):
- return bug.string(shortlist=True)
- def bug_mod_string(self, bugs):
- old_bug,new_bug = bugs
- return new_bug.string(shortlist=True)
- def bug_summary_change_string(self, summaries):
- old_summary,new_summary = summaries
- return "summary changed:\n %s\n %s" % (old_summary, new_summary)
- def _comment_summary_string(self, comment):
- return "from %s on %s" % (comment.author, time_to_str(comment.time))
- def comment_add_string(self, comment):
- summary = self._comment_summary_string(comment)
- first_line = comment.body.splitlines()[0]
- return "%s\n %s..." % (summary, first_line)
- def comment_rem_string(self, comment):
- summary = self._comment_summary_string(comment)
- first_line = comment.body.splitlines()[0]
- return "%s\n %s..." % (summary, first_line)
- def comment_mod_string(self, comments):
- old_comment,new_comment = comments
- return self._comment_summary_string(new_comment)
- def comment_body_change_string(self, bodies):
- old_body,new_body = bodies
- return difflib.unified_diff(old_body, new_body)
-
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/editor.py b/interfaces/email/interactive/libbe/editor.py
deleted file mode 100644
index ec41006..0000000
--- a/interfaces/email/interactive/libbe/editor.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Bugs Everywhere, a distributed bugtracker
-# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Define editor_string(), a function that invokes an editor to accept
-user-produced text as a string.
-"""
-
-import codecs
-import locale
-import os
-import sys
-import tempfile
-import doctest
-
-
-default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
-
-comment_marker = u"== Anything below this line will be ignored\n"
-
-class CantFindEditor(Exception):
- def __init__(self):
- Exception.__init__(self, "Can't find editor to get string from")
-
-def editor_string(comment=None, encoding=None):
- """Invokes the editor, and returns the user-produced text as a string
-
- >>> if "EDITOR" in os.environ:
- ... del os.environ["EDITOR"]
- >>> if "VISUAL" in os.environ:
- ... del os.environ["VISUAL"]
- >>> editor_string()
- Traceback (most recent call last):
- CantFindEditor: Can't find editor to get string from
- >>> os.environ["EDITOR"] = "echo bar > "
- >>> editor_string()
- u'bar\\n'
- >>> os.environ["VISUAL"] = "echo baz > "
- >>> editor_string()
- u'baz\\n'
- >>> del os.environ["EDITOR"]
- >>> del os.environ["VISUAL"]
- """
- if encoding == None:
- encoding = default_encoding
- for name in ('VISUAL', 'EDITOR'):
- try:
- editor = os.environ[name]
- break
- except KeyError:
- pass
- else:
- raise CantFindEditor()
- fhandle, fname = tempfile.mkstemp()
- try:
- if comment is not None:
- cstring = u'\n'+comment_string(comment)
- os.write(fhandle, cstring.encode(encoding))
- os.close(fhandle)
- oldmtime = os.path.getmtime(fname)
- os.system("%s %s" % (editor, fname))
- f = codecs.open(fname, "r", encoding)
- output = trimmed_string(f.read())
- f.close()
- if output.rstrip('\n') == "":
- output = None
- finally:
- os.unlink(fname)
- return output
-
-
-def comment_string(comment):
- """
- >>> comment_string('hello') == comment_marker+"hello"
- True
- """
- return comment_marker + comment
-
-
-def trimmed_string(instring):
- """
- >>> trimmed_string("hello\\n"+comment_marker)
- u'hello\\n'
- >>> trimmed_string("hi!\\n" + comment_string('Booga'))
- u'hi!\\n'
- """
- out = []
- for line in instring.splitlines(True):
- if line.startswith(comment_marker):
- break
- out.append(line)
- return ''.join(out)
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/encoding.py b/interfaces/email/interactive/libbe/encoding.py
deleted file mode 100644
index fd513b5..0000000
--- a/interfaces/email/interactive/libbe/encoding.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Bugs Everywhere, a distributed bugtracker
-# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Support input/output/filesystem encodings (e.g. UTF-8).
-"""
-
-import codecs
-import locale
-import sys
-import doctest
-
-
-ENCODING = None # override get_encoding() output by setting this
-
-def get_encoding():
- """
- Guess a useful input/output/filesystem encoding... Maybe we need
- seperate encodings for input/output and filesystem? Hmm...
- """
- if ENCODING != None:
- return ENCODING
- encoding = locale.getpreferredencoding() or sys.getdefaultencoding()
- if sys.platform != 'win32' or sys.version_info[:2] > (2, 3):
- encoding = locale.getlocale(locale.LC_TIME)[1] or encoding
- # Python 2.3 on windows doesn't know about 'XYZ' alias for 'cpXYZ'
- return encoding
-
-def known_encoding(encoding):
- """
- >>> known_encoding("highly-unlikely-encoding")
- False
- >>> known_encoding(get_encoding())
- True
- """
- try:
- codecs.lookup(encoding)
- return True
- except LookupError:
- return False
-
-def set_IO_stream_encodings(encoding):
- sys.stdin = codecs.getreader(encoding)(sys.__stdin__)
- sys.stdout = codecs.getwriter(encoding)(sys.__stdout__)
- sys.stderr = codecs.getwriter(encoding)(sys.__stderr__)
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/git.py b/interfaces/email/interactive/libbe/git.py
deleted file mode 100644
index 628f9b9..0000000
--- a/interfaces/email/interactive/libbe/git.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Copyright (C) 2008-2009 Ben Finney <ben+python@benfinney.id.au>
-# Chris Ball <cjb@laptop.org>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Git backend.
-"""
-
-import os
-import re
-import sys
-import unittest
-import doctest
-
-import vcs
-
-
-def new():
- return Git()
-
-class Git(vcs.VCS):
- name="git"
- client="git"
- versioned=True
- def _vcs_version(self):
- status,output,error = self._u_invoke_client("--version")
- return output
- def _vcs_detect(self, path):
- if self._u_search_parent_directories(path, ".git") != None :
- return True
- return False
- def _vcs_root(self, path):
- """Find the root of the deepest repository containing path."""
- # Assume that nothing funny is going on; in particular, that we aren't
- # dealing with a bare repo.
- if os.path.isdir(path) != True:
- path = os.path.dirname(path)
- status,output,error = self._u_invoke_client("rev-parse", "--git-dir",
- directory=path)
- gitdir = os.path.join(path, output.rstrip('\n'))
- dirname = os.path.abspath(os.path.dirname(gitdir))
- return dirname
- def _vcs_init(self, path):
- self._u_invoke_client("init", directory=path)
- def _vcs_get_user_id(self):
- status,output,error = \
- self._u_invoke_client("config", "user.name", expect=(0,1))
- if status == 0:
- name = output.rstrip('\n')
- else:
- name = ""
- status,output,error = \
- self._u_invoke_client("config", "user.email", expect=(0,1))
- if status == 0:
- email = output.rstrip('\n')
- else:
- email = ""
- if name != "" or email != "": # got something!
- # guess missing info, if necessary
- if name == "":
- name = self._u_get_fallback_username()
- if email == "":
- email = self._u_get_fallback_email()
- return self._u_create_id(name, email)
- return None # Git has no infomation
- def _vcs_set_user_id(self, value):
- name,email = self._u_parse_id(value)
- if email != None:
- self._u_invoke_client("config", "user.email", email)
- self._u_invoke_client("config", "user.name", name)
- def _vcs_add(self, path):
- if os.path.isdir(path):
- return
- self._u_invoke_client("add", path)
- def _vcs_remove(self, path):
- if not os.path.isdir(self._u_abspath(path)):
- self._u_invoke_client("rm", "-f", path)
- def _vcs_update(self, path):
- self._vcs_add(path)
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
- if revision == None:
- return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
- else:
- arg = "%s:%s" % (revision,path)
- status,output,error = self._u_invoke_client("show", arg)
- return output
- def _vcs_duplicate_repo(self, directory, revision=None):
- if revision==None:
- vcs.VCS._vcs_duplicate_repo(self, directory, revision)
- else:
- #self._u_invoke_client("archive", revision, directory) # makes tarball
- self._u_invoke_client("clone", "--no-checkout",".",directory)
- self._u_invoke_client("checkout", revision, directory=directory)
- def _vcs_commit(self, commitfile, allow_empty=False):
- args = ['commit', '--all', '--file', commitfile]
- if allow_empty == True:
- args.append("--allow-empty")
- status,output,error = self._u_invoke_client(*args)
- else:
- kwargs = {"expect":(0,1)}
- status,output,error = self._u_invoke_client(*args, **kwargs)
- strings = ["nothing to commit",
- "nothing added to commit"]
- if self._u_any_in_string(strings, output) == True:
- raise vcs.EmptyCommit()
- revision = None
- revline = re.compile("(.*) (.*)[:\]] (.*)")
- match = revline.search(output)
- assert match != None, output+error
- assert len(match.groups()) == 3
- revision = match.groups()[1]
- full_revision = self._vcs_revision_id(-1)
- assert full_revision.startswith(revision), \
- "Mismatched revisions:\n%s\n%s" % (revision, full_revision)
- return full_revision
- def _vcs_revision_id(self, index):
- args = ["rev-list", "--first-parent", "--reverse", "HEAD"]
- kwargs = {"expect":(0,128)}
- status,output,error = self._u_invoke_client(*args, **kwargs)
- if status == 128:
- if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
- return None
- raise vcs.CommandError(args, status, stdout="", stderr=error)
- commits = output.splitlines()
- try:
- return commits[index]
- except IndexError:
- return None
-
-
-vcs.make_vcs_testcase_subclasses(Git, sys.modules[__name__])
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/hg.py b/interfaces/email/interactive/libbe/hg.py
deleted file mode 100644
index 7cd4c2f..0000000
--- a/interfaces/email/interactive/libbe/hg.py
+++ /dev/null
@@ -1,103 +0,0 @@
-# Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc.
-# Ben Finney <ben+python@benfinney.id.au>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Mercurial (hg) backend.
-"""
-
-import os
-import re
-import sys
-import unittest
-import doctest
-
-import vcs
-
-
-def new():
- return Hg()
-
-class Hg(vcs.VCS):
- name="hg"
- client="hg"
- versioned=True
- def _vcs_version(self):
- status,output,error = self._u_invoke_client("--version")
- return output
- def _vcs_detect(self, path):
- """Detect whether a directory is revision-controlled using Mercurial"""
- if self._u_search_parent_directories(path, ".hg") != None:
- return True
- return False
- def _vcs_root(self, path):
- status,output,error = self._u_invoke_client("root", directory=path)
- return output.rstrip('\n')
- def _vcs_init(self, path):
- self._u_invoke_client("init", directory=path)
- def _vcs_get_user_id(self):
- status,output,error = self._u_invoke_client("showconfig","ui.username")
- return output.rstrip('\n')
- def _vcs_set_user_id(self, value):
- """
- Supported by the Config Extension, but that is not part of
- standard Mercurial.
- http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
- """
- raise vcs.SettingIDnotSupported
- def _vcs_add(self, path):
- self._u_invoke_client("add", path)
- def _vcs_remove(self, path):
- self._u_invoke_client("rm", "--force", path)
- def _vcs_update(self, path):
- pass
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
- if revision == None:
- return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
- else:
- status,output,error = \
- self._u_invoke_client("cat","-r",revision,path)
- return output
- def _vcs_duplicate_repo(self, directory, revision=None):
- if revision == None:
- return vcs.VCS._vcs_duplicate_repo(self, directory, revision)
- else:
- self._u_invoke_client("archive", "--rev", revision, directory)
- def _vcs_commit(self, commitfile, allow_empty=False):
- args = ['commit', '--logfile', commitfile]
- status,output,error = self._u_invoke_client(*args)
- if allow_empty == False:
- strings = ["nothing changed"]
- if self._u_any_in_string(strings, output) == True:
- raise vcs.EmptyCommit()
- return self._vcs_revision_id(-1)
- def _vcs_revision_id(self, index, style="id"):
- args = ["identify", "--rev", str(int(index)), "--%s" % style]
- kwargs = {"expect": (0,255)}
- status,output,error = self._u_invoke_client(*args, **kwargs)
- if status == 0:
- id = output.strip()
- if id == '000000000000':
- return None # before initial commit.
- return id
- return None
-
-
-vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/mapfile.py b/interfaces/email/interactive/libbe/mapfile.py
deleted file mode 100644
index 4d69601..0000000
--- a/interfaces/email/interactive/libbe/mapfile.py
+++ /dev/null
@@ -1,116 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Provide a means of saving and loading dictionaries of parameters. The
-saved "mapfiles" should be clear, flat-text files, and allow easy merging of
-independent/conflicting changes.
-"""
-
-import errno
-import os.path
-import yaml
-import doctest
-
-
-class IllegalKey(Exception):
- def __init__(self, key):
- Exception.__init__(self, 'Illegal key "%s"' % key)
- self.key = key
-
-class IllegalValue(Exception):
- def __init__(self, value):
- Exception.__init__(self, 'Illegal value "%s"' % value)
- self.value = value
-
-def generate(map):
- """Generate a YAML mapfile content string.
- >>> generate({"q":"p"})
- 'q: p\\n\\n'
- >>> generate({"q":u"Fran\u00e7ais"})
- 'q: Fran\\xc3\\xa7ais\\n\\n'
- >>> generate({"q":u"hello"})
- 'q: hello\\n\\n'
- >>> generate({"q=":"p"})
- Traceback (most recent call last):
- IllegalKey: Illegal key "q="
- >>> generate({"q:":"p"})
- Traceback (most recent call last):
- IllegalKey: Illegal key "q:"
- >>> generate({"q\\n":"p"})
- Traceback (most recent call last):
- IllegalKey: Illegal key "q\\n"
- >>> generate({"":"p"})
- Traceback (most recent call last):
- IllegalKey: Illegal key ""
- >>> generate({">q":"p"})
- Traceback (most recent call last):
- IllegalKey: Illegal key ">q"
- >>> generate({"q":"p\\n"})
- Traceback (most recent call last):
- IllegalValue: Illegal value "p\\n"
- """
- keys = map.keys()
- keys.sort()
- for key in keys:
- try:
- assert not key.startswith('>')
- assert('\n' not in key)
- assert('=' not in key)
- assert(':' not in key)
- assert(len(key) > 0)
- except AssertionError:
- raise IllegalKey(unicode(key).encode('unicode_escape'))
- if "\n" in map[key]:
- raise IllegalValue(unicode(map[key]).encode('unicode_escape'))
-
- lines = []
- for key in keys:
- lines.append(yaml.safe_dump({key: map[key]},
- default_flow_style=False,
- allow_unicode=True))
- lines.append("")
- return '\n'.join(lines)
-
-def parse(contents):
- """
- Parse a YAML mapfile string.
- >>> parse('q: p\\n\\n')['q']
- 'p'
- >>> parse('q: \\'p\\'\\n\\n')['q']
- 'p'
- >>> contents = generate({"a":"b", "c":"d", "e":"f"})
- >>> dict = parse(contents)
- >>> dict["a"]
- 'b'
- >>> dict["c"]
- 'd'
- >>> dict["e"]
- 'f'
- """
- return yaml.load(contents) or {}
-
-def map_save(vcs, path, map, allow_no_vcs=False):
- """Save the map as a mapfile to the specified path"""
- contents = generate(map)
- vcs.set_file_contents(path, contents, allow_no_vcs)
-
-def map_load(vcs, path, allow_no_vcs=False):
- contents = vcs.get_file_contents(path, allow_no_vcs=allow_no_vcs)
- return parse(contents)
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/plugin.py b/interfaces/email/interactive/libbe/plugin.py
deleted file mode 100644
index d593d69..0000000
--- a/interfaces/email/interactive/libbe/plugin.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# Marien Zwart <marienz@gentoo.org>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Allow simple listing and loading of the various becommands and libbe
-submodules (i.e. "plugins").
-"""
-
-import os
-import os.path
-import sys
-import doctest
-
-def my_import(mod_name):
- module = __import__(mod_name)
- components = mod_name.split('.')
- for comp in components[1:]:
- module = getattr(module, comp)
- return module
-
-def iter_plugins(prefix):
- """
- >>> "list" in [n for n,m in iter_plugins("becommands")]
- True
- >>> "plugin" in [n for n,m in iter_plugins("libbe")]
- True
- """
- modfiles = os.listdir(os.path.join(plugin_path, prefix))
- modfiles.sort()
- for modfile in modfiles:
- if modfile.startswith('.'):
- continue # the occasional emacs temporary file
- if modfile.endswith(".py") and modfile != "__init__.py":
- yield modfile[:-3], my_import(prefix+"."+modfile[:-3])
-
-
-def get_plugin(prefix, name):
- """
- >>> get_plugin("becommands", "asdf") is None
- True
- >>> q = repr(get_plugin("becommands", "list"))
- >>> q.startswith("<module 'becommands.list' from ")
- True
- """
- dirprefix = os.path.join(*prefix.split('.'))
- command_path = os.path.join(plugin_path, dirprefix, name+".py")
- if os.path.isfile(command_path):
- return my_import(prefix + "." + name)
- return None
-
-plugin_path = os.path.realpath(os.path.dirname(os.path.dirname(__file__)))
-if plugin_path not in sys.path:
- sys.path.append(plugin_path)
-
-suite = doctest.DocTestSuite()
-
-def _test():
- import doctest
- doctest.testmod()
-
-if __name__ == "__main__":
- _test()
diff --git a/interfaces/email/interactive/libbe/properties.py b/interfaces/email/interactive/libbe/properties.py
deleted file mode 100644
index 09dd20e..0000000
--- a/interfaces/email/interactive/libbe/properties.py
+++ /dev/null
@@ -1,638 +0,0 @@
-# Bugs Everywhere - a distributed bugtracker
-# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-This module provides a series of useful decorators for defining
-various types of properties. For example usage, consider the
-unittests at the end of the module.
-
-See
- http://www.python.org/dev/peps/pep-0318/
-and
- http://www.phyast.pitt.edu/~micheles/python/documentation.html
-for more information on decorators.
-"""
-
-import copy
-import types
-import unittest
-
-
-class ValueCheckError (ValueError):
- def __init__(self, name, value, allowed):
- action = "in" # some list of allowed values
- if type(allowed) == types.FunctionType:
- action = "allowed by" # some allowed-value check function
- msg = "%s not %s %s for %s" % (value, action, allowed, name)
- ValueError.__init__(self, msg)
- self.name = name
- self.value = value
- self.allowed = allowed
-
-def Property(funcs):
- """
- End a chain of property decorators, returning a property.
- """
- args = {}
- args["fget"] = funcs.get("fget", None)
- args["fset"] = funcs.get("fset", None)
- args["fdel"] = funcs.get("fdel", None)
- args["doc"] = funcs.get("doc", None)
-
- #print "Creating a property with"
- #for key, val in args.items(): print key, value
- return property(**args)
-
-def doc_property(doc=None):
- """
- Add a docstring to a chain of property decorators.
- """
- def decorator(funcs=None):
- """
- Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...}
- or a function fn() returning such a dict.
- """
- if hasattr(funcs, "__call__"):
- funcs = funcs() # convert from function-arg to dict
- funcs["doc"] = doc
- return funcs
- return decorator
-
-def local_property(name, null=None, mutable_null=False):
- """
- Define get/set access to per-parent-instance local storage. Uses
- ._<name>_value to store the value for a particular owner instance.
- If the ._<name>_value attribute does not exist, returns null.
-
- If mutable_null == True, we only release deepcopies of the null to
- the outside world.
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget", None)
- fset = funcs.get("fset", None)
- def _fget(self):
- if fget is not None:
- fget(self)
- if mutable_null == True:
- ret_null = copy.deepcopy(null)
- else:
- ret_null = null
- value = getattr(self, "_%s_value" % name, ret_null)
- return value
- def _fset(self, value):
- setattr(self, "_%s_value" % name, value)
- if fset is not None:
- fset(self, value)
- funcs["fget"] = _fget
- funcs["fset"] = _fset
- funcs["name"] = name
- return funcs
- return decorator
-
-def settings_property(name, null=None):
- """
- Similar to local_property, except where local_property stores the
- value in instance._<name>_value, settings_property stores the
- value in instance.settings[name].
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget", None)
- fset = funcs.get("fset", None)
- def _fget(self):
- if fget is not None:
- fget(self)
- value = self.settings.get(name, null)
- return value
- def _fset(self, value):
- self.settings[name] = value
- if fset is not None:
- fset(self, value)
- funcs["fget"] = _fget
- funcs["fset"] = _fset
- funcs["name"] = name
- return funcs
- return decorator
-
-
-# Allow comparison and caching with _original_ values for mutables,
-# since
-#
-# >>> a = []
-# >>> b = a
-# >>> b.append(1)
-# >>> a
-# [1]
-# >>> a==b
-# True
-def _hash_mutable_value(value):
- return repr(value)
-def _init_mutable_property_cache(self):
- if not hasattr(self, "_mutable_property_cache_hash"):
- # first call to _fget for any mutable property
- self._mutable_property_cache_hash = {}
- self._mutable_property_cache_copy = {}
-def _set_cached_mutable_property(self, cacher_name, property_name, value):
- _init_mutable_property_cache(self)
- self._mutable_property_cache_hash[(cacher_name, property_name)] = \
- _hash_mutable_value(value)
- self._mutable_property_cache_copy[(cacher_name, property_name)] = \
- copy.deepcopy(value)
-def _get_cached_mutable_property(self, cacher_name, property_name, default=None):
- _init_mutable_property_cache(self)
- if (cacher_name, property_name) not in self._mutable_property_cache_copy:
- return default
- return self._mutable_property_cache_copy[(cacher_name, property_name)]
-def _cmp_cached_mutable_property(self, cacher_name, property_name, value, default=None):
- _init_mutable_property_cache(self)
- if (cacher_name, property_name) not in self._mutable_property_cache_hash:
- _set_cached_mutable_property(self, cacher_name, property_name, default)
- old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)]
- return cmp(_hash_mutable_value(value), old_hash)
-
-
-def defaulting_property(default=None, null=None,
- mutable_default=False):
- """
- Define a default value for get access to a property.
- If the stored value is null, then default is returned.
-
- If mutable_default == True, we only release deepcopies of the
- default to the outside world.
-
- null should never escape to the outside world, so don't worry
- about it being a mutable.
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget")
- fset = funcs.get("fset")
- name = funcs.get("name", "<unknown>")
- def _fget(self):
- value = fget(self)
- if value == null:
- if mutable_default == True:
- return copy.deepcopy(default)
- else:
- return default
- return value
- def _fset(self, value):
- if value == default:
- value = null
- fset(self, value)
- funcs["fget"] = _fget
- funcs["fset"] = _fset
- return funcs
- return decorator
-
-def fn_checked_property(value_allowed_fn):
- """
- Define allowed values for get/set access to a property.
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget")
- fset = funcs.get("fset")
- name = funcs.get("name", "<unknown>")
- def _fget(self):
- value = fget(self)
- if value_allowed_fn(value) != True:
- raise ValueCheckError(name, value, value_allowed_fn)
- return value
- def _fset(self, value):
- if value_allowed_fn(value) != True:
- raise ValueCheckError(name, value, value_allowed_fn)
- fset(self, value)
- funcs["fget"] = _fget
- funcs["fset"] = _fset
- return funcs
- return decorator
-
-def checked_property(allowed=[]):
- """
- Define allowed values for get/set access to a property.
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget")
- fset = funcs.get("fset")
- name = funcs.get("name", "<unknown>")
- def _fget(self):
- value = fget(self)
- if value not in allowed:
- raise ValueCheckError(name, value, allowed)
- return value
- def _fset(self, value):
- if value not in allowed:
- raise ValueCheckError(name, value, allowed)
- fset(self, value)
- funcs["fget"] = _fget
- funcs["fset"] = _fset
- return funcs
- return decorator
-
-def cached_property(generator, initVal=None, mutable=False):
- """
- Allow caching of values generated by generator(instance), where
- instance is the instance to which this property belongs. Uses
- ._<name>_cache to store a cache flag for a particular owner
- instance.
-
- When the cache flag is True or missing and the stored value is
- initVal, the first fget call triggers the generator function,
- whose output is stored in _<name>_cached_value. That and
- subsequent calls to fget will return this cached value.
-
- If the input value is no longer initVal (e.g. a value has been
- loaded from disk or set with fset), that value overrides any
- cached value, and this property has no effect.
-
- When the cache flag is False and the stored value is initVal, the
- generator is not cached, but is called on every fget.
-
- The cache flag is missing on initialization. Particular instances
- may override by setting their own flag.
-
- In the case that mutable == True, all caching is disabled and the
- generator is called whenever the cached value would otherwise be
- used.
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget")
- name = funcs.get("name", "<unknown>")
- def _fget(self):
- cache = getattr(self, "_%s_cache" % name, True)
- value = fget(self)
- if value == initVal:
- if cache == True and mutable == False:
- if hasattr(self, "_%s_cached_value" % name):
- value = getattr(self, "_%s_cached_value" % name)
- else:
- value = generator(self)
- setattr(self, "_%s_cached_value" % name, value)
- else:
- value = generator(self)
- return value
- funcs["fget"] = _fget
- return funcs
- return decorator
-
-def primed_property(primer, initVal=None):
- """
- Just like a cached_property, except that instead of returning a
- new value and running fset to cache it, the primer performs some
- background manipulation (e.g. loads data into instance.settings)
- such that a _second_ pass through fget succeeds.
-
- The 'cache' flag becomes a 'prime' flag, with priming taking place
- whenever ._<name>_prime is True, or is False or missing and
- value == initVal.
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget")
- name = funcs.get("name", "<unknown>")
- def _fget(self):
- prime = getattr(self, "_%s_prime" % name, False)
- if prime == False:
- value = fget(self)
- if prime == True or (prime == False and value == initVal):
- primer(self)
- value = fget(self)
- return value
- funcs["fget"] = _fget
- return funcs
- return decorator
-
-def change_hook_property(hook, mutable=False, default=None):
- """
- Call the function hook(instance, old_value, new_value) whenever a
- value different from the current value is set (instance is a a
- reference to the class instance to which this property belongs).
- This is useful for saving changes to disk, etc. This function is
- called _after_ the new value has been stored, allowing you to
- change the stored value if you want.
-
- In the case of mutables, things are slightly trickier. Because
- the property-owning class has no way of knowing when the value
- changes. We work around this by caching a private deepcopy of the
- mutable value, and checking for changes whenever the property is
- set (obviously) or retrieved (to check for external changes). So
- long as you're conscientious about accessing the property after
- making external modifications, mutability woln't be a problem.
- t.x.append(5) # external modification
- t.x # dummy access notices change and triggers hook
- See testChangeHookMutableProperty for an example of the expected
- behavior.
- """
- def decorator(funcs):
- if hasattr(funcs, "__call__"):
- funcs = funcs()
- fget = funcs.get("fget")
- fset = funcs.get("fset")
- name = funcs.get("name", "<unknown>")
- def _fget(self, new_value=None, from_fset=False): # only used if mutable == True
- if from_fset == True:
- value = new_value # compare new value with cached
- else:
- value = fget(self) # compare current value with cached
- if _cmp_cached_mutable_property(self, "change hook property", name, value, default) != 0:
- # there has been a change, cache new value
- old_value = _get_cached_mutable_property(self, "change hook property", name, default)
- _set_cached_mutable_property(self, "change hook property", name, value)
- if from_fset == True: # return previously cached value
- value = old_value
- else: # the value changed while we weren't looking
- hook(self, old_value, value)
- return value
- def _fset(self, value):
- if mutable == True: # get cached previous value
- old_value = _fget(self, new_value=value, from_fset=True)
- else:
- old_value = fget(self)
- fset(self, value)
- if value != old_value:
- hook(self, old_value, value)
- if mutable == True:
- funcs["fget"] = _fget
- funcs["fset"] = _fset
- return funcs
- return decorator
-
-
-class DecoratorTests(unittest.TestCase):
- def testLocalDoc(self):
- class Test(object):
- @Property
- @doc_property("A fancy property")
- def x():
- return {}
- self.failUnless(Test.x.__doc__ == "A fancy property",
- Test.x.__doc__)
- def testLocalProperty(self):
- class Test(object):
- @Property
- @local_property(name="LOCAL")
- def x():
- return {}
- t = Test()
- self.failUnless(t.x == None, str(t.x))
- t.x = 'z' # the first set initializes ._LOCAL_value
- self.failUnless(t.x == 'z', str(t.x))
- self.failUnless("_LOCAL_value" in dir(t), dir(t))
- self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value)
- def testSettingsProperty(self):
- class Test(object):
- @Property
- @settings_property(name="attr")
- def x():
- return {}
- def __init__(self):
- self.settings = {}
- t = Test()
- self.failUnless(t.x == None, str(t.x))
- t.x = 'z' # the first set initializes ._LOCAL_value
- self.failUnless(t.x == 'z', str(t.x))
- self.failUnless("attr" in t.settings, t.settings)
- self.failUnless(t.settings["attr"] == 'z', t.settings["attr"])
- def testDefaultingLocalProperty(self):
- class Test(object):
- @Property
- @defaulting_property(default='y', null='x')
- @local_property(name="DEFAULT", null=5)
- def x(): return {}
- t = Test()
- self.failUnless(t.x == 5, str(t.x))
- t.x = 'x'
- self.failUnless(t.x == 'y', str(t.x))
- t.x = 'y'
- self.failUnless(t.x == 'y', str(t.x))
- t.x = 'z'
- self.failUnless(t.x == 'z', str(t.x))
- t.x = 5
- self.failUnless(t.x == 5, str(t.x))
- def testCheckedLocalProperty(self):
- class Test(object):
- @Property
- @checked_property(allowed=['x', 'y', 'z'])
- @local_property(name="CHECKED")
- def x(): return {}
- def __init__(self):
- self._CHECKED_value = 'x'
- t = Test()
- self.failUnless(t.x == 'x', str(t.x))
- try:
- t.x = None
- e = None
- except ValueCheckError, e:
- pass
- self.failUnless(type(e) == ValueCheckError, type(e))
- def testTwoCheckedLocalProperties(self):
- class Test(object):
- @Property
- @checked_property(allowed=['x', 'y', 'z'])
- @local_property(name="X")
- def x(): return {}
-
- @Property
- @checked_property(allowed=['a', 'b', 'c'])
- @local_property(name="A")
- def a(): return {}
- def __init__(self):
- self._A_value = 'a'
- self._X_value = 'x'
- t = Test()
- try:
- t.x = 'a'
- e = None
- except ValueCheckError, e:
- pass
- self.failUnless(type(e) == ValueCheckError, type(e))
- t.x = 'x'
- t.x = 'y'
- t.x = 'z'
- try:
- t.a = 'x'
- e = None
- except ValueCheckError, e:
- pass
- self.failUnless(type(e) == ValueCheckError, type(e))
- t.a = 'a'
- t.a = 'b'
- t.a = 'c'
- def testFnCheckedLocalProperty(self):
- class Test(object):
- @Property
- @fn_checked_property(lambda v : v in ['x', 'y', 'z'])
- @local_property(name="CHECKED")
- def x(): return {}
- def __init__(self):
- self._CHECKED_value = 'x'
- t = Test()
- self.failUnless(t.x == 'x', str(t.x))
- try:
- t.x = None
- e = None
- except ValueCheckError, e:
- pass
- self.failUnless(type(e) == ValueCheckError, type(e))
- def testCachedLocalProperty(self):
- class Gen(object):
- def __init__(self):
- self.i = 0
- def __call__(self, owner):
- self.i += 1
- return self.i
- class Test(object):
- @Property
- @cached_property(generator=Gen(), initVal=None)
- @local_property(name="CACHED")
- def x(): return {}
- t = Test()
- self.failIf("_CACHED_cache" in dir(t), getattr(t, "_CACHED_cache", None))
- self.failUnless(t.x == 1, t.x)
- self.failUnless(t.x == 1, t.x)
- self.failUnless(t.x == 1, t.x)
- t.x = 8
- self.failUnless(t.x == 8, t.x)
- self.failUnless(t.x == 8, t.x)
- t._CACHED_cache = False # Caching is off, but the stored value
- val = t.x # is 8, not the initVal (None), so we
- self.failUnless(val == 8, val) # get 8.
- t._CACHED_value = None # Now we've set the stored value to None
- val = t.x # so future calls to fget (like this)
- self.failUnless(val == 2, val) # will call the generator every time...
- val = t.x
- self.failUnless(val == 3, val)
- val = t.x
- self.failUnless(val == 4, val)
- t._CACHED_cache = True # We turn caching back on, and get
- self.failUnless(t.x == 1, str(t.x)) # the original cached value.
- del t._CACHED_cached_value # Removing that value forces a
- self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call
- self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which
- self.failUnless(t.x == 5, str(t.x)) # we get the new cached value.
- def testPrimedLocalProperty(self):
- class Test(object):
- def prime(self):
- self.settings["PRIMED"] = "initialized"
- @Property
- @primed_property(primer=prime, initVal=None)
- @settings_property(name="PRIMED")
- def x(): return {}
- def __init__(self):
- self.settings={}
- t = Test()
- self.failIf("_PRIMED_prime" in dir(t), getattr(t, "_PRIMED_prime", None))
- self.failUnless(t.x == "initialized", t.x)
- t.x = 1
- self.failUnless(t.x == 1, t.x)
- t.x = None
- self.failUnless(t.x == "initialized", t.x)
- t._PRIMED_prime = True
- t.x = 3
- self.failUnless(t.x == "initialized", t.x)
- t._PRIMED_prime = False
- t.x = 3
- self.failUnless(t.x == 3, t.x)
- def testChangeHookLocalProperty(self):
- class Test(object):
- def _hook(self, old, new):
- self.old = old
- self.new = new
-
- @Property
- @change_hook_property(_hook)
- @local_property(name="HOOKED")
- def x(): return {}
- t = Test()
- t.x = 1
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == 1, t.new)
- t.x = 1
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == 1, t.new)
- t.x = 2
- self.failUnless(t.old == 1, t.old)
- self.failUnless(t.new == 2, t.new)
- def testChangeHookMutableProperty(self):
- class Test(object):
- def _hook(self, old, new):
- self.old = old
- self.new = new
- self.hook_calls += 1
-
- @Property
- @change_hook_property(_hook, mutable=True)
- @local_property(name="HOOKED")
- def x(): return {}
- t = Test()
- t.hook_calls = 0
- t.x = []
- self.failUnless(t.old == None, t.old)
- self.failUnless(t.new == [], t.new)
- self.failUnless(t.hook_calls == 1, t.hook_calls)
- a = t.x
- a.append(5)
- t.x = a
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 2, t.hook_calls)
- t.x = []
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [], t.new)
- self.failUnless(t.hook_calls == 3, t.hook_calls)
- # now append without reassigning. this doesn't trigger the
- # change, since we don't ever set t.x, only get it and mess
- # with it. It does, however, update our t.new, since t.new =
- # t.x and is not a static copy.
- t.x.append(5)
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 3, t.hook_calls)
- # however, the next t.x get _will_ notice the change...
- a = t.x
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5], t.new)
- self.failUnless(t.hook_calls == 4, t.hook_calls)
- t.x.append(6) # this append(6) is not noticed yet
- self.failUnless(t.old == [], t.old)
- self.failUnless(t.new == [5,6], t.new)
- self.failUnless(t.hook_calls == 4, t.hook_calls)
- # this append(7) is not noticed, but the t.x get causes the
- # append(6) to be noticed
- t.x.append(7)
- self.failUnless(t.old == [5], t.old)
- self.failUnless(t.new == [5,6,7], t.new)
- self.failUnless(t.hook_calls == 5, t.hook_calls)
- a = t.x # now the append(7) is noticed
- self.failUnless(t.old == [5,6], t.old)
- self.failUnless(t.new == [5,6,7], t.new)
- self.failUnless(t.hook_calls == 6, t.hook_calls)
-
-
-suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests)
-
diff --git a/interfaces/email/interactive/libbe/settings_object.py b/interfaces/email/interactive/libbe/settings_object.py
deleted file mode 100644
index ceea9d5..0000000
--- a/interfaces/email/interactive/libbe/settings_object.py
+++ /dev/null
@@ -1,412 +0,0 @@
-# Bugs Everywhere - a distributed bugtracker
-# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-This module provides a base class implementing settings-dict based
-property storage useful for BE objects with saved properties
-(e.g. BugDir, Bug, Comment). For example usage, consider the
-unittests at the end of the module.
-"""
-
-import doctest
-import unittest
-
-from properties import Property, doc_property, local_property, \
- defaulting_property, checked_property, fn_checked_property, \
- cached_property, primed_property, change_hook_property, \
- settings_property
-
-
-class _Token (object):
- """
- `Control' value class for properties. We want values that only
- mean something to the settings_object module.
- """
- pass
-
-class UNPRIMED (_Token):
- "Property has not been primed."
- pass
-
-class EMPTY (_Token):
- """
- Property has been primed but has no user-set value, so use
- default/generator value.
- """
- pass
-
-
-def prop_save_settings(self, old, new):
- """
- The default action undertaken when a property changes.
- """
- if self.sync_with_disk==True:
- self.save_settings()
-
-def prop_load_settings(self):
- """
- The default action undertaken when an UNPRIMED property is accessed.
- """
- if self.sync_with_disk==True and self._settings_loaded==False:
- self.load_settings()
- else:
- self._setup_saved_settings(flag_as_loaded=False)
-
-# Some name-mangling routines for pretty printing setting names
-def setting_name_to_attr_name(self, name):
- """
- Convert keys to the .settings dict into their associated
- SavedSettingsObject attribute names.
- >>> print setting_name_to_attr_name(None,"User-id")
- user_id
- """
- return name.lower().replace('-', '_')
-
-def attr_name_to_setting_name(self, name):
- """
- The inverse of setting_name_to_attr_name.
- >>> print attr_name_to_setting_name(None, "user_id")
- User-id
- """
- return name.capitalize().replace('_', '-')
-
-
-def versioned_property(name, doc,
- default=None, generator=None,
- change_hook=prop_save_settings,
- mutable=False,
- primer=prop_load_settings,
- allowed=None, check_fn=None,
- settings_properties=[],
- required_saved_properties=[],
- require_save=False):
- """
- Combine the common decorators in a single function.
-
- Use zero or one (but not both) of default or generator, since a
- working default will keep the generator from functioning. Use the
- default if you know what you want the default value to be at
- 'coding time'. Use the generator if you can write a function to
- determine a valid default at run time. If both default and
- generator are None, then the property will be a defaulting
- property which defaults to None.
-
- allowed and check_fn have a similar relationship, although you can
- use both of these if you want. allowed compares the proposed
- value against a list determined at 'coding time' and check_fn
- allows more flexible comparisons to take place at run time.
-
- Set require_save to True if you want to save the default/generated
- value for a property, to protect against future changes. E.g., we
- currently expect all comments to be 'text/plain' but in the future
- we may want to default to 'text/html'. If we don't want the old
- comments to be interpreted as 'text/html', we would require that
- the content type be saved.
-
- change_hook, primer, settings_properties, and
- required_saved_properties are only options to get their defaults
- into our local scope. Don't mess with them.
-
- Set mutable=True if:
- * default is a mutable
- * your generator function may return mutables
- * you set change_hook and might have mutable property values
- See the docstrings in libbe.properties for details on how each of
- these cases are handled.
- """
- settings_properties.append(name)
- if require_save == True:
- required_saved_properties.append(name)
- def decorator(funcs):
- fulldoc = doc
- if default != None or generator == None:
- defaulting = defaulting_property(default=default, null=EMPTY,
- mutable_default=mutable)
- fulldoc += "\n\nThis property defaults to %s." % default
- if generator != None:
- cached = cached_property(generator=generator, initVal=EMPTY,
- mutable=mutable)
- fulldoc += "\n\nThis property is generated with %s." % generator
- if check_fn != None:
- fn_checked = fn_checked_property(value_allowed_fn=check_fn)
- fulldoc += "\n\nThis property is checked with %s." % check_fn
- if allowed != None:
- checked = checked_property(allowed=allowed)
- fulldoc += "\n\nThe allowed values for this property are: %s." \
- % (', '.join(allowed))
- hooked = change_hook_property(hook=change_hook, mutable=mutable,
- default=EMPTY)
- primed = primed_property(primer=primer, initVal=UNPRIMED)
- settings = settings_property(name=name, null=UNPRIMED)
- docp = doc_property(doc=fulldoc)
- deco = hooked(primed(settings(docp(funcs))))
- if default != None or generator == None:
- deco = defaulting(deco)
- if generator != None:
- deco = cached(deco)
- if check_fn != None:
- deco = fn_checked(deco)
- if allowed != None:
- deco = checked(deco)
- return Property(deco)
- return decorator
-
-class SavedSettingsObject(object):
-
- # Keep a list of properties that may be stored in the .settings dict.
- #settings_properties = []
-
- # A list of properties that we save to disk, even if they were
- # never set (in which case we save the default value). This
- # protects against future changes in default values.
- #required_saved_properties = []
-
- _setting_name_to_attr_name = setting_name_to_attr_name
- _attr_name_to_setting_name = attr_name_to_setting_name
-
- def __init__(self):
- self._settings_loaded = False
- self.sync_with_disk = False
- self.settings = {}
-
- def load_settings(self):
- """Load the settings from disk."""
- # Override. Must call ._setup_saved_settings() after loading.
- self.settings = {}
- self._setup_saved_settings()
-
- def _setup_saved_settings(self, flag_as_loaded=True):
- """
- To be run after setting self.settings up from disk. Marks all
- settings as primed.
- """
- for property in self.settings_properties:
- if property not in self.settings:
- self.settings[property] = EMPTY
- elif self.settings[property] == UNPRIMED:
- self.settings[property] = EMPTY
- if flag_as_loaded == True:
- self._settings_loaded = True
-
- def save_settings(self):
- """Load the settings from disk."""
- # Override. Should save the dict output of ._get_saved_settings()
- settings = self._get_saved_settings()
- pass # write settings to disk....
-
- def _get_saved_settings(self):
- settings = {}
- for k,v in self.settings.items():
- if v != None and v != EMPTY:
- settings[k] = v
- for k in self.required_saved_properties:
- settings[k] = getattr(self, self._setting_name_to_attr_name(k))
- return settings
-
- def clear_cached_setting(self, setting=None):
- "If setting=None, clear *all* cached settings"
- if setting != None:
- if hasattr(self, "_%s_cached_value" % setting):
- delattr(self, "_%s_cached_value" % setting)
- else:
- for setting in settings_properties:
- self.clear_cached_setting(setting)
-
-
-class SavedSettingsObjectTests(unittest.TestCase):
- def testSimpleProperty(self):
- """Testing a minimal versioned property"""
- class Test(SavedSettingsObject):
- settings_properties = []
- required_saved_properties = []
- @versioned_property(name="Content-type",
- doc="A test property",
- settings_properties=settings_properties,
- required_saved_properties=required_saved_properties)
- def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
- t = Test()
- # access missing setting
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- self.failUnless(len(t.settings) == 0, len(t.settings))
- self.failUnless(t.content_type == None, t.content_type)
- # accessing t.content_type triggers the priming, which runs
- # t._setup_saved_settings, which fills out t.settings with
- # EMPTY data. t._settings_loaded is still false though, since
- # the default priming does not do any of the `official' loading
- # that occurs in t.load_settings.
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- # load settings creates an EMPTY value in the settings array
- t.load_settings()
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
- # now we set a value
- t.content_type = 5
- self.failUnless(t.settings["Content-type"] == 5,
- t.settings["Content-type"])
- self.failUnless(t.content_type == 5, t.content_type)
- self.failUnless(t.settings["Content-type"] == 5,
- t.settings["Content-type"])
- # now we set another value
- t.content_type = "text/plain"
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings["Content-type"] == "text/plain",
- t.settings["Content-type"])
- self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
- t._get_saved_settings())
- # now we clear to the post-primed value
- t.content_type = EMPTY
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
- self.failUnless(t.content_type == None, t.content_type)
- self.failUnless(len(t.settings) == 1, len(t.settings))
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
- def testDefaultingProperty(self):
- """Testing a defaulting versioned property"""
- class Test(SavedSettingsObject):
- settings_properties = []
- required_saved_properties = []
- @versioned_property(name="Content-type",
- doc="A test property",
- default="text/plain",
- settings_properties=settings_properties,
- required_saved_properties=required_saved_properties)
- def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
- t = Test()
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- t.load_settings()
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
- self.failUnless(t.content_type == "text/plain", t.content_type)
- self.failUnless(t.settings["Content-type"] == EMPTY,
- t.settings["Content-type"])
- self.failUnless(t._get_saved_settings() == {}, t._get_saved_settings())
- t.content_type = "text/html"
- self.failUnless(t.content_type == "text/html",
- t.content_type)
- self.failUnless(t.settings["Content-type"] == "text/html",
- t.settings["Content-type"])
- self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
- t._get_saved_settings())
- def testRequiredDefaultingProperty(self):
- """Testing a required defaulting versioned property"""
- class Test(SavedSettingsObject):
- settings_properties = []
- required_saved_properties = []
- @versioned_property(name="Content-type",
- doc="A test property",
- default="text/plain",
- settings_properties=settings_properties,
- required_saved_properties=required_saved_properties,
- require_save=True)
- def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
- t = Test()
- self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
- t._get_saved_settings())
- t.content_type = "text/html"
- self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
- t._get_saved_settings())
- def testClassVersionedPropertyDefinition(self):
- """Testing a class-specific _versioned property decorator"""
- class Test(SavedSettingsObject):
- settings_properties = []
- required_saved_properties = []
- def _versioned_property(settings_properties=settings_properties,
- required_saved_properties=required_saved_properties,
- **kwargs):
- if "settings_properties" not in kwargs:
- kwargs["settings_properties"] = settings_properties
- if "required_saved_properties" not in kwargs:
- kwargs["required_saved_properties"]=required_saved_properties
- return versioned_property(**kwargs)
- @_versioned_property(name="Content-type",
- doc="A test property",
- default="text/plain",
- require_save=True)
- def content_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
- t = Test()
- self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"},
- t._get_saved_settings())
- t.content_type = "text/html"
- self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"},
- t._get_saved_settings())
- def testMutableChangeHookedProperty(self):
- """Testing a mutable change-hooked property"""
- SAVES = []
- def prop_log_save_settings(self, old, new, saves=SAVES):
- saves.append("'%s' -> '%s'" % (str(old), str(new)))
- prop_save_settings(self, old, new)
- class Test(SavedSettingsObject):
- settings_properties = []
- required_saved_properties = []
- @versioned_property(name="List-type",
- doc="A test property",
- mutable=True,
- change_hook=prop_log_save_settings,
- settings_properties=settings_properties,
- required_saved_properties=required_saved_properties)
- def list_type(): return {}
- def __init__(self):
- SavedSettingsObject.__init__(self)
- t = Test()
- self.failUnless(t._settings_loaded == False, t._settings_loaded)
- t.load_settings()
- self.failUnless(SAVES == [], SAVES)
- self.failUnless(t._settings_loaded == True, t._settings_loaded)
- self.failUnless(t.list_type == None, t.list_type)
- self.failUnless(SAVES == [], SAVES)
- self.failUnless(t.settings["List-type"]==EMPTY,t.settings["List-type"])
- t.list_type = []
- self.failUnless(t.settings["List-type"] == [], t.settings["List-type"])
- self.failUnless(SAVES == [
- "'<class 'libbe.settings_object.EMPTY'>' -> '[]'"
- ], SAVES)
- t.list_type.append(5)
- self.failUnless(SAVES == [
- "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
- ], SAVES)
- self.failUnless(t.settings["List-type"] == [5],t.settings["List-type"])
- self.failUnless(SAVES == [ # the append(5) has not yet been saved
- "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
- ], SAVES)
- self.failUnless(t.list_type == [5], t.list_type) # <-get triggers saved
-
- self.failUnless(SAVES == [ # now the append(5) has been saved.
- "'<class 'libbe.settings_object.EMPTY'>' -> '[]'",
- "'[]' -> '[5]'"
- ], SAVES)
-
-unitsuite=unittest.TestLoader().loadTestsFromTestCase(SavedSettingsObjectTests)
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/tree.py b/interfaces/email/interactive/libbe/tree.py
deleted file mode 100644
index 06d09e5..0000000
--- a/interfaces/email/interactive/libbe/tree.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Bugs Everywhere, a distributed bugtracker
-# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Define a traversable tree structure.
-"""
-
-import doctest
-
-class Tree(list):
- """
- Construct
- +-b---d-g
- a-+ +-e
- +-c-+-f-h-i
- with
- >>> i = Tree(); i.n = "i"
- >>> h = Tree([i]); h.n = "h"
- >>> f = Tree([h]); f.n = "f"
- >>> e = Tree(); e.n = "e"
- >>> c = Tree([f,e]); c.n = "c"
- >>> g = Tree(); g.n = "g"
- >>> d = Tree([g]); d.n = "d"
- >>> b = Tree([d]); b.n = "b"
- >>> a = Tree(); a.n = "a"
- >>> a.append(c)
- >>> a.append(b)
-
- >>> a.branch_len()
- 5
- >>> a.sort(key=lambda node : -node.branch_len())
- >>> "".join([node.n for node in a.traverse()])
- 'acfhiebdg'
- >>> a.sort(key=lambda node : node.branch_len())
- >>> "".join([node.n for node in a.traverse()])
- 'abdgcefhi'
- >>> "".join([node.n for node in a.traverse(depth_first=False)])
- 'abcdefghi'
- >>> for depth,node in a.thread():
- ... print "%*s" % (2*depth+1, node.n)
- a
- b
- d
- g
- c
- e
- f
- h
- i
- >>> for depth,node in a.thread(flatten=True):
- ... print "%*s" % (2*depth+1, node.n)
- a
- b
- d
- g
- c
- e
- f
- h
- i
- >>> a.has_descendant(g)
- True
- >>> c.has_descendant(g)
- False
- >>> a.has_descendant(a)
- False
- >>> a.has_descendant(a, match_self=True)
- True
- """
- def __eq__(self, other):
- return id(self) == id(other)
-
- def branch_len(self):
- """
- Exhaustive search every time == SLOW.
-
- Use only on small trees, or reimplement by overriding
- child-addition methods to allow accurate caching.
-
- For the tree
- +-b---d-g
- a-+ +-e
- +-c-+-f-h-i
- this method returns 5.
- """
- if len(self) == 0:
- return 1
- else:
- return 1 + max([child.branch_len() for child in self])
-
- def sort(self, *args, **kwargs):
- """
- This method can be slow, e.g. on a branch_len() sort, since a
- node at depth N from the root has it's branch_len() method
- called N times.
- """
- list.sort(self, *args, **kwargs)
- for child in self:
- child.sort(*args, **kwargs)
-
- def traverse(self, depth_first=True):
- """
- Note: you might want to sort() your tree first.
- """
- if depth_first == True:
- yield self
- for child in self:
- for descendant in child.traverse():
- yield descendant
- else: # breadth first, Wikipedia algorithm
- # http://en.wikipedia.org/wiki/Breadth-first_search
- queue = [self]
- while len(queue) > 0:
- node = queue.pop(0)
- yield node
- queue.extend(node)
-
- def thread(self, flatten=False):
- """
- When flatten==False, the depth of any node is one greater than
- the depth of its parent. That way the inheritance is
- explicit, but you can end up with highly indented threads.
-
- When flatten==True, the depth of any node is only greater than
- the depth of its parent when there is a branch, and the node
- is not the last child. This can lead to ancestry ambiguity,
- but keeps the total indentation down. E.g.
- +-b +-b-c
- a-+-c and a-+
- +-d-e-f +-d-e-f
- would both produce (after sorting by branch_len())
- (0, a)
- (1, b)
- (1, c)
- (0, d)
- (0, e)
- (0, f)
- """
- stack = [] # ancestry of the current node
- if flatten == True:
- depthDict = {}
-
- for node in self.traverse(depth_first=True):
- while len(stack) > 0 \
- and id(node) not in [id(c) for c in stack[-1]]:
- stack.pop(-1)
- if flatten == False:
- depth = len(stack)
- else:
- if len(stack) == 0:
- depth = 0
- else:
- parent = stack[-1]
- depth = depthDict[id(parent)]
- if len(parent) > 1 and node != parent[-1]:
- depth += 1
- depthDict[id(node)] = depth
- yield (depth,node)
- stack.append(node)
-
- def has_descendant(self, descendant, depth_first=True, match_self=False):
- if descendant == self:
- return match_self
- for d in self.traverse(depth_first):
- if descendant == d:
- return True
- return False
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/upgrade.py b/interfaces/email/interactive/libbe/upgrade.py
deleted file mode 100644
index 4123c72..0000000
--- a/interfaces/email/interactive/libbe/upgrade.py
+++ /dev/null
@@ -1,187 +0,0 @@
-# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Handle conversion between the various on-disk images.
-"""
-
-import os, os.path
-import sys
-import doctest
-
-import encoding
-import mapfile
-import vcs
-
-# a list of all past versions
-BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0",
- "Bugs Everywhere Directory v1.1",
- "Bugs Everywhere Directory v1.2"]
-
-# the current version
-BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1]
-
-class Upgrader (object):
- "Class for converting "
- initial_version = None
- final_version = None
- def __init__(self, root):
- self.root = root
- # use the "None" VCS to ensure proper encoding/decoding and
- # simplify path construction.
- self.vcs = vcs.vcs_by_name("None")
- self.vcs.root(self.root)
- self.vcs.encoding = encoding.get_encoding()
-
- def get_path(self, *args):
- """
- Return a path relative to .root.
- """
- dir = os.path.join(self.root, ".be")
- if len(args) == 0:
- return dir
- assert args[0] in ["version", "settings", "bugs"], str(args)
- return os.path.join(dir, *args)
-
- def check_initial_version(self):
- path = self.get_path("version")
- version = self.vcs.get_file_contents(path).rstrip("\n")
- assert version == self.initial_version, version
-
- def set_version(self):
- path = self.get_path("version")
- self.vcs.set_file_contents(path, self.final_version+"\n")
-
- def upgrade(self):
- print >> sys.stderr, "upgrading bugdir from '%s' to '%s'" \
- % (self.initial_version, self.final_version)
- self.check_initial_version()
- self.set_version()
- self._upgrade()
-
- def _upgrade(self):
- raise NotImplementedError
-
-
-class Upgrade_1_0_to_1_1 (Upgrader):
- initial_version = "Bugs Everywhere Tree 1 0"
- final_version = "Bugs Everywhere Directory v1.1"
- def _upgrade_mapfile(self, path):
- contents = self.vcs.get_file_contents(path)
- old_format = False
- for line in contents.splitlines():
- if len(line.split("=")) == 2:
- old_format = True
- break
- if old_format == True:
- # translate to YAML.
- newlines = []
- for line in contents.splitlines():
- line = line.rstrip('\n')
- if len(line) == 0:
- continue
- fields = line.split("=")
- if len(fields) == 2:
- key,value = fields
- newlines.append('%s: "%s"' % (key, value.replace('"','\\"')))
- else:
- newlines.append(line)
- contents = '\n'.join(newlines)
- # load the YAML and save
- map = mapfile.parse(contents)
- mapfile.map_save(self.vcs, path, map)
-
- def _upgrade(self):
- """
- Comment value field "From" -> "Author".
- Homegrown mapfile -> YAML.
- """
- path = self.get_path("settings")
- self._upgrade_mapfile(path)
- for bug_uuid in os.listdir(self.get_path("bugs")):
- path = self.get_path("bugs", bug_uuid, "values")
- self._upgrade_mapfile(path)
- c_path = ["bugs", bug_uuid, "comments"]
- if not os.path.exists(self.get_path(*c_path)):
- continue # no comments for this bug
- for comment_uuid in os.listdir(self.get_path(*c_path)):
- path_list = c_path + [comment_uuid, "values"]
- path = self.get_path(*path_list)
- self._upgrade_mapfile(path)
- settings = mapfile.map_load(self.vcs, path)
- if "From" in settings:
- settings["Author"] = settings.pop("From")
- mapfile.map_save(self.vcs, path, settings)
-
-
-class Upgrade_1_1_to_1_2 (Upgrader):
- initial_version = "Bugs Everywhere Directory v1.1"
- final_version = "Bugs Everywhere Directory v1.2"
- def _upgrade(self):
- """
- BugDir settings field "rcs_name" -> "vcs_name".
- """
- path = self.get_path("settings")
- settings = mapfile.map_load(self.vcs, path)
- if "rcs_name" in settings:
- settings["vcs_name"] = settings.pop("rcs_name")
- mapfile.map_save(self.vcs, path, settings)
-
-
-upgraders = [Upgrade_1_0_to_1_1,
- Upgrade_1_1_to_1_2]
-upgrade_classes = {}
-for upgrader in upgraders:
- upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
-
-def upgrade(path, current_version,
- target_version=BUGDIR_DISK_VERSION):
- """
- Call the appropriate upgrade function to convert current_version
- to target_version. If a direct conversion function does not exist,
- use consecutive conversion functions.
- """
- if current_version not in BUGDIR_DISK_VERSIONS:
- raise NotImplementedError, \
- "Cannot handle version '%s' yet." % version
- if target_version not in BUGDIR_DISK_VERSIONS:
- raise NotImplementedError, \
- "Cannot handle version '%s' yet." % version
-
- if (current_version, target_version) in upgrade_classes:
- # direct conversion
- upgrade_class = upgrade_classes[(current_version, target_version)]
- u = upgrade_class(path)
- u.upgrade()
- else:
- # consecutive single-step conversion
- i = BUGDIR_DISK_VERSIONS.index(current_version)
- while True:
- version_a = BUGDIR_DISK_VERSIONS[i]
- version_b = BUGDIR_DISK_VERSIONS[i+1]
- try:
- upgrade_class = upgrade_classes[(version_a, version_b)]
- except KeyError:
- raise NotImplementedError, \
- "Cannot convert version '%s' to '%s' yet." \
- % (version_a, version_b)
- u = upgrade_class(path)
- u.upgrade()
- if version_b == target_version:
- break
- i += 1
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/utility.py b/interfaces/email/interactive/libbe/utility.py
deleted file mode 100644
index 1e43516..0000000
--- a/interfaces/email/interactive/libbe/utility.py
+++ /dev/null
@@ -1,131 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Assorted utility functions that don't fit in anywhere else.
-"""
-
-import calendar
-import codecs
-import os
-import shutil
-import tempfile
-import time
-import types
-import doctest
-
-def search_parent_directories(path, filename):
- """
- Find the file (or directory) named filename in path or in any
- of path's parents.
-
- e.g.
- search_parent_directories("/a/b/c", ".be")
- will return the path to the first existing file from
- /a/b/c/.be
- /a/b/.be
- /a/.be
- /.be
- or None if none of those files exist.
- """
- path = os.path.realpath(path)
- assert os.path.exists(path)
- old_path = None
- while True:
- check_path = os.path.join(path, filename)
- if os.path.exists(check_path):
- return check_path
- if path == old_path:
- return None
- old_path = path
- path = os.path.dirname(path)
-
-class Dir (object):
- "A temporary directory for testing use"
- def __init__(self):
- self.path = tempfile.mkdtemp(prefix="BEtest")
- self.removed = False
- def cleanup(self):
- if self.removed == False:
- shutil.rmtree(self.path)
- self.removed = True
- def __call__(self):
- return self.path
-
-RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000"
-
-
-def time_to_str(time_val):
- """Convert a time value into an RFC 2822-formatted string. This format
- lacks sub-second data.
- >>> time_to_str(0)
- 'Thu, 01 Jan 1970 00:00:00 +0000'
- """
- return time.strftime(RFC_2822_TIME_FMT, time.gmtime(time_val))
-
-def str_to_time(str_time):
- """Convert an RFC 2822-fomatted string into a time value.
- >>> str_to_time("Thu, 01 Jan 1970 00:00:00 +0000")
- 0
- >>> q = time.time()
- >>> str_to_time(time_to_str(q)) == int(q)
- True
- >>> str_to_time("Thu, 01 Jan 1970 00:00:00 -1000")
- 36000
- """
- timezone_str = str_time[-5:]
- if timezone_str != "+0000":
- str_time = str_time.replace(timezone_str, "+0000")
- time_val = calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT))
- timesign = -int(timezone_str[0]+"1") # "+" -> time_val ahead of GMT
- timezone_tuple = time.strptime(timezone_str[1:], "%H%M")
- timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60
- return time_val + timesign*timezone
-
-def handy_time(time_val):
- return time.strftime("%a, %d %b %Y %H:%M", time.localtime(time_val))
-
-def time_to_gmtime(str_time):
- """Convert an RFC 2822-fomatted string to a GMT string.
- >>> time_to_gmtime("Thu, 01 Jan 1970 00:00:00 -1000")
- 'Thu, 01 Jan 1970 10:00:00 +0000'
- """
- time_val = str_to_time(str_time)
- return time_to_str(time_val)
-
-def iterable_full_of_strings(value, alternative=None):
- """
- Require an iterable full of strings.
- >>> iterable_full_of_strings([])
- True
- >>> iterable_full_of_strings(["abc", "def", u"hij"])
- True
- >>> iterable_full_of_strings(["abc", None, u"hij"])
- False
- >>> iterable_full_of_strings(None, alternative=None)
- True
- """
- if value == alternative:
- return True
- elif not hasattr(value, "__iter__"):
- return False
- for x in value:
- if type(x) not in types.StringTypes:
- return False
- return True
-
-suite = doctest.DocTestSuite()
diff --git a/interfaces/email/interactive/libbe/vcs.py b/interfaces/email/interactive/libbe/vcs.py
deleted file mode 100644
index 7b506e8..0000000
--- a/interfaces/email/interactive/libbe/vcs.py
+++ /dev/null
@@ -1,942 +0,0 @@
-# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
-# Alexander Belchenko <bialix@ukr.net>
-# Ben Finney <ben+python@benfinney.id.au>
-# Chris Ball <cjb@laptop.org>
-# W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Define the base VCS (Version Control System) class, which should be
-subclassed by other Version Control System backends. The base class
-implements a "do not version" VCS.
-"""
-
-from subprocess import Popen, PIPE
-import codecs
-import os
-import os.path
-import re
-from socket import gethostname
-import shutil
-import sys
-import tempfile
-import unittest
-import doctest
-
-from utility import Dir, search_parent_directories
-
-
-def _get_matching_vcs(matchfn):
- """Return the first module for which matchfn(VCS_instance) is true"""
- import arch
- import bzr
- import darcs
- import git
- import hg
- for module in [arch, bzr, darcs, git, hg]:
- vcs = module.new()
- if matchfn(vcs) == True:
- return vcs
- vcs.cleanup()
- return VCS()
-
-def vcs_by_name(vcs_name):
- """Return the module for the VCS with the given name"""
- return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
-
-def detect_vcs(dir):
- """Return an VCS instance for the vcs being used in this directory"""
- return _get_matching_vcs(lambda vcs: vcs.detect(dir))
-
-def installed_vcs():
- """Return an instance of an installed VCS"""
- return _get_matching_vcs(lambda vcs: vcs.installed())
-
-
-class CommandError(Exception):
- def __init__(self, command, status, stdout, stderr):
- strerror = ["Command failed (%d):\n %s\n" % (status, stderr),
- "while executing\n %s" % command]
- Exception.__init__(self, "\n".join(strerror))
- self.command = command
- self.status = status
- self.stdout = stdout
- self.stderr = stderr
-
-class SettingIDnotSupported(NotImplementedError):
- pass
-
-class VCSnotRooted(Exception):
- def __init__(self):
- msg = "VCS not rooted"
- Exception.__init__(self, msg)
-
-class PathNotInRoot(Exception):
- def __init__(self, path, root):
- msg = "Path '%s' not in root '%s'" % (path, root)
- Exception.__init__(self, msg)
- self.path = path
- self.root = root
-
-class NoSuchFile(Exception):
- def __init__(self, pathname, root="."):
- path = os.path.abspath(os.path.join(root, pathname))
- Exception.__init__(self, "No such file: %s" % path)
-
-class EmptyCommit(Exception):
- def __init__(self):
- Exception.__init__(self, "No changes to commit")
-
-
-def new():
- return VCS()
-
-class VCS(object):
- """
- This class implements a 'no-vcs' interface.
-
- Support for other VCSs can be added by subclassing this class, and
- overriding methods _vcs_*() with code appropriate for your VCS.
-
- The methods _u_*() are utility methods available to the _vcs_*()
- methods.
- """
- name = "None"
- client = "" # command-line tool for _u_invoke_client
- versioned = False
- def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()):
- self.paranoid = paranoid
- self.verboseInvoke = False
- self.rootdir = None
- self._duplicateBasedir = None
- self._duplicateDirname = None
- self.encoding = encoding
- self.version = self._get_version()
- def _vcs_version(self):
- """
- Return the VCS version string.
- """
- return "0.0"
- def _vcs_detect(self, path=None):
- """
- Detect whether a directory is revision controlled with this VCS.
- """
- return True
- def _vcs_root(self, path):
- """
- Get the VCS root. This is the default working directory for
- future invocations. You would normally set this to the root
- directory for your VCS.
- """
- if os.path.isdir(path)==False:
- path = os.path.dirname(path)
- if path == "":
- path = os.path.abspath(".")
- return path
- def _vcs_init(self, path):
- """
- Begin versioning the tree based at path.
- """
- pass
- def _vcs_cleanup(self):
- """
- Remove any cruft that _vcs_init() created outside of the
- versioned tree.
- """
- pass
- def _vcs_get_user_id(self):
- """
- Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the VCS has not been configured with a username, return None.
- """
- return None
- def _vcs_set_user_id(self, value):
- """
- Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the VCS has not been configured with a usename, so
- that commits will have a reasonable FROM value.
- """
- raise SettingIDnotSupported
- def _vcs_add(self, path):
- """
- Add the already created file at path to version control.
- """
- pass
- def _vcs_remove(self, path):
- """
- Remove the file at path from version control. Optionally
- remove the file from the filesystem as well.
- """
- pass
- def _vcs_update(self, path):
- """
- Notify the versioning system of changes to the versioned file
- at path.
- """
- pass
- def _vcs_get_file_contents(self, path, revision=None, binary=False):
- """
- Get the file contents as they were in a given revision.
- Revision==None specifies the current revision.
- """
- assert revision == None, \
- "The %s VCS does not support revision specifiers" % self.name
- if binary == False:
- f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
- else:
- f = open(os.path.join(self.rootdir, path), "rb")
- contents = f.read()
- f.close()
- return contents
- def _vcs_duplicate_repo(self, directory, revision=None):
- """
- Get the repository as it was in a given revision.
- revision==None specifies the current revision.
- dir specifies a directory to create the duplicate in.
- """
- shutil.copytree(self.rootdir, directory, True)
- def _vcs_commit(self, commitfile, allow_empty=False):
- """
- Commit the current working directory, using the contents of
- commitfile as the comment. Return the name of the old
- revision (or None if commits are not supported).
-
- If allow_empty == False, raise EmptyCommit if there are no
- changes to commit.
- """
- return None
- def _vcs_revision_id(self, index):
- """
- Return the name of the <index>th revision. Index will be an
- integer (possibly <= 0). The choice of which branch to follow
- when crossing branches/merges is not defined.
-
- Return None if revision IDs are not supported, or if the
- specified revision does not exist.
- """
- return None
- def _get_version(self):
- try:
- ret = self._vcs_version()
- return ret
- except OSError, e:
- if e.errno == errno.ENOENT:
- return None
- else:
- raise OSError, e
- except CommandError:
- return None
- def installed(self):
- if self.version != None:
- return True
- return False
- def detect(self, path="."):
- """
- Detect whether a directory is revision controlled with this VCS.
- """
- return self._vcs_detect(path)
- def root(self, path):
- """
- Set the root directory to the path's VCS root. This is the
- default working directory for future invocations.
- """
- self.rootdir = self._vcs_root(path)
- def init(self, path):
- """
- Begin versioning the tree based at path.
- Also roots the vcs at path.
- """
- if os.path.isdir(path)==False:
- path = os.path.dirname(path)
- self._vcs_init(path)
- self.root(path)
- def cleanup(self):
- self._vcs_cleanup()
- def get_user_id(self):
- """
- Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the VCS has not been configured with a username, return the user's
- id. You can override the automatic lookup procedure by setting the
- VCS.user_id attribute to a string of your choice.
- """
- if hasattr(self, "user_id"):
- if self.user_id != None:
- return self.user_id
- id = self._vcs_get_user_id()
- if id == None:
- name = self._u_get_fallback_username()
- email = self._u_get_fallback_email()
- id = self._u_create_id(name, email)
- print >> sys.stderr, "Guessing id '%s'" % id
- try:
- self.set_user_id(id)
- except SettingIDnotSupported:
- pass
- return id
- def set_user_id(self, value):
- """
- Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the VCS has not been configured with a usename, so
- that commits will have a reasonable FROM value.
- """
- self._vcs_set_user_id(value)
- def add(self, path):
- """
- Add the already created file at path to version control.
- """
- self._vcs_add(self._u_rel_path(path))
- def remove(self, path):
- """
- Remove a file from both version control and the filesystem.
- """
- self._vcs_remove(self._u_rel_path(path))
- if os.path.exists(path):
- os.remove(path)
- def recursive_remove(self, dirname):
- """
- Remove a file/directory and all its decendents from both
- version control and the filesystem.
- """
- if not os.path.exists(dirname):
- raise NoSuchFile(dirname)
- for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
- filenames.extend(dirnames)
- for path in filenames:
- fullpath = os.path.join(dirpath, path)
- if os.path.exists(fullpath) == False:
- continue
- self._vcs_remove(self._u_rel_path(fullpath))
- if os.path.exists(dirname):
- shutil.rmtree(dirname)
- def update(self, path):
- """
- Notify the versioning system of changes to the versioned file
- at path.
- """
- self._vcs_update(self._u_rel_path(path))
- def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False):
- """
- Get the file as it was in a given revision.
- Revision==None specifies the current revision.
- """
- if not os.path.exists(path):
- raise NoSuchFile(path)
- if self._use_vcs(path, allow_no_vcs):
- relpath = self._u_rel_path(path)
- contents = self._vcs_get_file_contents(relpath,revision,binary=binary)
- else:
- f = codecs.open(path, "r", self.encoding)
- contents = f.read()
- f.close()
- return contents
- def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False):
- """
- Set the file contents under version control.
- """
- add = not os.path.exists(path)
- if binary == False:
- f = codecs.open(path, "w", self.encoding)
- else:
- f = open(path, "wb")
- f.write(contents)
- f.close()
-
- if self._use_vcs(path, allow_no_vcs):
- if add:
- self.add(path)
- else:
- self.update(path)
- def mkdir(self, path, allow_no_vcs=False, check_parents=True):
- """
- Create (if neccessary) a directory at path under version
- control.
- """
- if check_parents == True:
- parent = os.path.dirname(path)
- if not os.path.exists(parent): # recurse through parents
- self.mkdir(parent, allow_no_vcs, check_parents)
- if not os.path.exists(path):
- os.mkdir(path)
- if self._use_vcs(path, allow_no_vcs):
- self.add(path)
- else:
- assert os.path.isdir(path)
- if self._use_vcs(path, allow_no_vcs):
- #self.update(path)# Don't update directories. Changing files
- pass # underneath them should be sufficient.
-
- def duplicate_repo(self, revision=None):
- """
- Get the repository as it was in a given revision.
- revision==None specifies the current revision.
- Return the path to the arbitrary directory at the base of the new repo.
- """
- # Dirname in Baseir to protect against simlink attacks.
- if self._duplicateBasedir == None:
- self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs')
- self._duplicateDirname = \
- os.path.join(self._duplicateBasedir, "duplicate")
- self._vcs_duplicate_repo(directory=self._duplicateDirname,
- revision=revision)
- return self._duplicateDirname
- def remove_duplicate_repo(self):
- """
- Clean up a duplicate repo created with duplicate_repo().
- """
- if self._duplicateBasedir != None:
- shutil.rmtree(self._duplicateBasedir)
- self._duplicateBasedir = None
- self._duplicateDirname = None
- def commit(self, summary, body=None, allow_empty=False):
- """
- Commit the current working directory, with a commit message
- string summary and body. Return the name of the old revision
- (or None if versioning is not supported).
-
- If allow_empty == False (the default), raise EmptyCommit if
- there are no changes to commit.
- """
- summary = summary.strip()+'\n'
- if body is not None:
- summary += '\n' + body.strip() + '\n'
- descriptor, filename = tempfile.mkstemp()
- revision = None
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.flush()
- self.precommit()
- revision = self._vcs_commit(filename, allow_empty=allow_empty)
- temp_file.close()
- self.postcommit()
- finally:
- os.remove(filename)
- return revision
- def precommit(self):
- """
- Executed before all attempted commits.
- """
- pass
- def postcommit(self):
- """
- Only executed after successful commits.
- """
- pass
- def revision_id(self, index=None):
- """
- Return the name of the <index>th revision. The choice of
- which branch to follow when crossing branches/merges is not
- defined.
-
- Return None if index==None, revision IDs are not supported, or
- if the specified revision does not exist.
- """
- if index == None:
- return None
- return self._vcs_revision_id(index)
- def _u_any_in_string(self, list, string):
- """
- Return True if any of the strings in list are in string.
- Otherwise return False.
- """
- for list_string in list:
- if list_string in string:
- return True
- return False
- def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None):
- """
- expect should be a tuple of allowed exit codes. cwd should be
- the directory from which the command will be executed.
- """
- if cwd == None:
- cwd = self.rootdir
- if self.verboseInvoke == True:
- print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args))
- try :
- if sys.platform != "win32":
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
- else:
- # win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
- shell=True, cwd=cwd)
- except OSError, e :
- raise CommandError(args, status=e.args[0], stdout="", stderr=e)
- output,error = q.communicate(input=stdin)
- status = q.wait()
- if self.verboseInvoke == True:
- print >> sys.stderr, "%d\n%s%s" % (status, output, error)
- if status not in expect:
- raise CommandError(args, status, output, error)
- return status, output, error
- def _u_invoke_client(self, *args, **kwargs):
- directory = kwargs.get('directory',None)
- expect = kwargs.get('expect', (0,))
- stdin = kwargs.get('stdin', None)
- cl_args = [self.client]
- cl_args.extend(args)
- return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory)
- def _u_search_parent_directories(self, path, filename):
- """
- Find the file (or directory) named filename in path or in any
- of path's parents.
-
- e.g.
- search_parent_directories("/a/b/c", ".be")
- will return the path to the first existing file from
- /a/b/c/.be
- /a/b/.be
- /a/.be
- /.be
- or None if none of those files exist.
- """
- return search_parent_directories(path, filename)
- def _use_vcs(self, path, allow_no_vcs):
- """
- Try and decide if _vcs_add/update/mkdir/etc calls will
- succeed. Returns True is we think the vcs_call would
- succeeed, and False otherwise.
- """
- use_vcs = True
- exception = None
- if self.rootdir != None:
- if self.path_in_root(path) == False:
- use_vcs = False
- exception = PathNotInRoot(path, self.rootdir)
- else:
- use_vcs = False
- exception = VCSnotRooted
- if use_vcs == False and allow_no_vcs==False:
- raise exception
- return use_vcs
- def path_in_root(self, path, root=None):
- """
- Return the relative path to path from root.
- >>> vcs = new()
- >>> vcs.path_in_root("/a.b/c/.be", "/a.b/c")
- True
- >>> vcs.path_in_root("/a.b/.be", "/a.b/c")
- False
- """
- if root == None:
- if self.rootdir == None:
- raise VCSnotRooted
- root = self.rootdir
- path = os.path.abspath(path)
- absRoot = os.path.abspath(root)
- absRootSlashedDir = os.path.join(absRoot,"")
- if not path.startswith(absRootSlashedDir):
- return False
- return True
- def _u_rel_path(self, path, root=None):
- """
- Return the relative path to path from root.
- >>> vcs = new()
- >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
- '.be'
- """
- if root == None:
- if self.rootdir == None:
- raise VCSnotRooted
- root = self.rootdir
- path = os.path.abspath(path)
- absRoot = os.path.abspath(root)
- absRootSlashedDir = os.path.join(absRoot,"")
- if not path.startswith(absRootSlashedDir):
- raise PathNotInRoot(path, absRootSlashedDir)
- assert path != absRootSlashedDir, \
- "file %s == root directory %s" % (path, absRootSlashedDir)
- relpath = path[len(absRootSlashedDir):]
- return relpath
- def _u_abspath(self, path, root=None):
- """
- Return the absolute path from a path realtive to root.
- >>> vcs = new()
- >>> vcs._u_abspath(".be", "/a.b/c")
- '/a.b/c/.be'
- """
- if root == None:
- assert self.rootdir != None, "VCS not rooted"
- root = self.rootdir
- return os.path.abspath(os.path.join(root, path))
- def _u_create_id(self, name, email=None):
- """
- >>> vcs = new()
- >>> vcs._u_create_id("John Doe", "jdoe@example.com")
- 'John Doe <jdoe@example.com>'
- >>> vcs._u_create_id("John Doe")
- 'John Doe'
- """
- assert len(name) > 0
- if email == None or len(email) == 0:
- return name
- else:
- return "%s <%s>" % (name, email)
- def _u_parse_id(self, value):
- """
- >>> vcs = new()
- >>> vcs._u_parse_id("John Doe <jdoe@example.com>")
- ('John Doe', 'jdoe@example.com')
- >>> vcs._u_parse_id("John Doe")
- ('John Doe', None)
- >>> try:
- ... vcs._u_parse_id("John Doe <jdoe@example.com><what?>")
- ... except AssertionError:
- ... print "Invalid match"
- Invalid match
- """
- emailexp = re.compile("(.*) <([^>]*)>(.*)")
- match = emailexp.search(value)
- if match == None:
- email = None
- name = value
- else:
- assert len(match.groups()) == 3
- assert match.groups()[2] == "", match.groups()
- email = match.groups()[1]
- name = match.groups()[0]
- assert name != None
- assert len(name) > 0
- return (name, email)
- def _u_get_fallback_username(self):
- name = None
- for envariable in ["LOGNAME", "USERNAME"]:
- if os.environ.has_key(envariable):
- name = os.environ[envariable]
- break
- assert name != None
- return name
- def _u_get_fallback_email(self):
- hostname = gethostname()
- name = self._u_get_fallback_username()
- return "%s@%s" % (name, hostname)
- def _u_parse_commitfile(self, commitfile):
- """
- Split the commitfile created in self.commit() back into
- summary and header lines.
- """
- f = codecs.open(commitfile, "r", self.encoding)
- summary = f.readline()
- body = f.read()
- body.lstrip('\n')
- if len(body) == 0:
- body = None
- f.close()
- return (summary, body)
-
-
-def setup_vcs_test_fixtures(testcase):
- """Set up test fixtures for VCS test case."""
- testcase.vcs = testcase.Class()
- testcase.dir = Dir()
- testcase.dirname = testcase.dir.path
-
- vcs_not_supporting_uninitialized_user_id = []
- vcs_not_supporting_set_user_id = ["None", "hg"]
- testcase.vcs_supports_uninitialized_user_id = (
- testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id)
- testcase.vcs_supports_set_user_id = (
- testcase.vcs.name not in vcs_not_supporting_set_user_id)
-
- if not testcase.vcs.installed():
- testcase.fail(
- "%(name)s VCS not found" % vars(testcase.Class))
-
- if testcase.Class.name != "None":
- testcase.failIf(
- testcase.vcs.detect(testcase.dirname),
- "Detected %(name)s VCS before initialising"
- % vars(testcase.Class))
-
- testcase.vcs.init(testcase.dirname)
-
-
-class VCSTestCase(unittest.TestCase):
- """Test cases for base VCS class."""
-
- Class = VCS
-
- def __init__(self, *args, **kwargs):
- super(VCSTestCase, self).__init__(*args, **kwargs)
- self.dirname = None
-
- def setUp(self):
- super(VCSTestCase, self).setUp()
- setup_vcs_test_fixtures(self)
-
- def tearDown(self):
- self.vcs.cleanup()
- super(VCSTestCase, self).tearDown()
-
- def full_path(self, rel_path):
- return os.path.join(self.dirname, rel_path)
-
-
-class VCS_init_TestCase(VCSTestCase):
- """Test cases for VCS.init method."""
-
- def test_detect_should_succeed_after_init(self):
- """Should detect VCS in directory after initialization."""
- self.failUnless(
- self.vcs.detect(self.dirname),
- "Did not detect %(name)s VCS after initialising"
- % vars(self.Class))
-
- def test_vcs_rootdir_in_specified_root_path(self):
- """VCS root directory should be in specified root path."""
- rp = os.path.realpath(self.vcs.rootdir)
- dp = os.path.realpath(self.dirname)
- vcs_name = self.Class.name
- self.failUnless(
- dp == rp or rp == None,
- "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
-
-
-class VCS_get_user_id_TestCase(VCSTestCase):
- """Test cases for VCS.get_user_id method."""
-
- def test_gets_existing_user_id(self):
- """Should get the existing user ID."""
- if not self.vcs_supports_uninitialized_user_id:
- return
-
- user_id = self.vcs.get_user_id()
- self.failUnless(
- user_id is not None,
- "unable to get a user id")
-
-
-class VCS_set_user_id_TestCase(VCSTestCase):
- """Test cases for VCS.set_user_id method."""
-
- def setUp(self):
- super(VCS_set_user_id_TestCase, self).setUp()
-
- if self.vcs_supports_uninitialized_user_id:
- self.prev_user_id = self.vcs.get_user_id()
- else:
- self.prev_user_id = "Uninitialized identity <bogus@example.org>"
-
- if self.vcs_supports_set_user_id:
- self.test_new_user_id = "John Doe <jdoe@example.com>"
- self.vcs.set_user_id(self.test_new_user_id)
-
- def tearDown(self):
- if self.vcs_supports_set_user_id:
- self.vcs.set_user_id(self.prev_user_id)
- super(VCS_set_user_id_TestCase, self).tearDown()
-
- def test_raises_error_in_unsupported_vcs(self):
- """Should raise an error in a VCS that doesn't support it."""
- if self.vcs_supports_set_user_id:
- return
- self.assertRaises(
- SettingIDnotSupported,
- self.vcs.set_user_id, "foo")
-
- def test_updates_user_id_in_supporting_vcs(self):
- """Should update the user ID in an VCS that supports it."""
- if not self.vcs_supports_set_user_id:
- return
- user_id = self.vcs.get_user_id()
- self.failUnlessEqual(
- self.test_new_user_id, user_id,
- "user id not set correctly (expected %s, got %s)"
- % (self.test_new_user_id, user_id))
-
-
-def setup_vcs_revision_test_fixtures(testcase):
- """Set up revision test fixtures for VCS test case."""
- testcase.test_dirs = ['a', 'a/b', 'c']
- for path in testcase.test_dirs:
- testcase.vcs.mkdir(testcase.full_path(path))
-
- testcase.test_files = ['a/text', 'a/b/text']
-
- testcase.test_contents = {
- 'rev_1': "Lorem ipsum",
- 'uncommitted': "dolor sit amet",
- }
-
-
-class VCS_mkdir_TestCase(VCSTestCase):
- """Test cases for VCS.mkdir method."""
-
- def setUp(self):
- super(VCS_mkdir_TestCase, self).setUp()
- setup_vcs_revision_test_fixtures(self)
-
- def tearDown(self):
- for path in reversed(sorted(self.test_dirs)):
- self.vcs.recursive_remove(self.full_path(path))
- super(VCS_mkdir_TestCase, self).tearDown()
-
- def test_mkdir_creates_directory(self):
- """Should create specified directory in filesystem."""
- for path in self.test_dirs:
- full_path = self.full_path(path)
- self.failUnless(
- os.path.exists(full_path),
- "path %(full_path)s does not exist" % vars())
-
-
-class VCS_commit_TestCase(VCSTestCase):
- """Test cases for VCS.commit method."""
-
- def setUp(self):
- super(VCS_commit_TestCase, self).setUp()
- setup_vcs_revision_test_fixtures(self)
-
- def tearDown(self):
- for path in reversed(sorted(self.test_dirs)):
- self.vcs.recursive_remove(self.full_path(path))
- super(VCS_commit_TestCase, self).tearDown()
-
- def test_file_contents_as_specified(self):
- """Should set file contents as specified."""
- test_contents = self.test_contents['rev_1']
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(full_path, test_contents)
- current_contents = self.vcs.get_file_contents(full_path)
- self.failUnlessEqual(test_contents, current_contents)
-
- def test_file_contents_as_committed(self):
- """Should have file contents as specified after commit."""
- test_contents = self.test_contents['rev_1']
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(full_path, test_contents)
- revision = self.vcs.commit("Initial file contents.")
- current_contents = self.vcs.get_file_contents(full_path)
- self.failUnlessEqual(test_contents, current_contents)
-
- def test_file_contents_as_set_when_uncommitted(self):
- """Should set file contents as specified after commit."""
- if not self.vcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Initial file contents.")
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- current_contents = self.vcs.get_file_contents(full_path)
- self.failUnlessEqual(
- self.test_contents['uncommitted'], current_contents)
-
- def test_revision_file_contents_as_committed(self):
- """Should get file contents as committed to specified revision."""
- import sys
- if not self.vcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Initial file contents.")
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- committed_contents = self.vcs.get_file_contents(
- full_path, revision)
- self.failUnlessEqual(
- self.test_contents['rev_1'], committed_contents)
-
- def test_revision_id_as_committed(self):
- """Check for compatibility between .commit() and .revision_id()"""
- if not self.vcs.versioned:
- self.failUnlessEqual(self.vcs.revision_id(5), None)
- return
- committed_revisions = []
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Initial %s contents." % path)
- committed_revisions.append(revision)
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- revision = self.vcs.commit("Altered %s contents." % path)
- committed_revisions.append(revision)
- for i,revision in enumerate(committed_revisions):
- self.failUnlessEqual(self.vcs.revision_id(i), revision)
- i += -len(committed_revisions) # check negative indices
- self.failUnlessEqual(self.vcs.revision_id(i), revision)
- i = len(committed_revisions)
- self.failUnlessEqual(self.vcs.revision_id(i), None)
- self.failUnlessEqual(self.vcs.revision_id(-i-1), None)
-
- def test_revision_id_as_committed(self):
- """Check revision id before first commit"""
- if not self.vcs.versioned:
- self.failUnlessEqual(self.vcs.revision_id(5), None)
- return
- committed_revisions = []
- for path in self.test_files:
- self.failUnlessEqual(self.vcs.revision_id(0), None)
-
-
-class VCS_duplicate_repo_TestCase(VCSTestCase):
- """Test cases for VCS.duplicate_repo method."""
-
- def setUp(self):
- super(VCS_duplicate_repo_TestCase, self).setUp()
- setup_vcs_revision_test_fixtures(self)
-
- def tearDown(self):
- self.vcs.remove_duplicate_repo()
- for path in reversed(sorted(self.test_dirs)):
- self.vcs.recursive_remove(self.full_path(path))
- super(VCS_duplicate_repo_TestCase, self).tearDown()
-
- def test_revision_file_contents_as_committed(self):
- """Should match file contents as committed to specified revision."""
- if not self.vcs.versioned:
- return
- for path in self.test_files:
- full_path = self.full_path(path)
- self.vcs.set_file_contents(
- full_path, self.test_contents['rev_1'])
- revision = self.vcs.commit("Commit current status")
- self.vcs.set_file_contents(
- full_path, self.test_contents['uncommitted'])
- dup_repo_path = self.vcs.duplicate_repo(revision)
- dup_file_path = os.path.join(dup_repo_path, path)
- dup_file_contents = file(dup_file_path, 'rb').read()
- self.failUnlessEqual(
- self.test_contents['rev_1'], dup_file_contents)
- self.vcs.remove_duplicate_repo()
-
-
-def make_vcs_testcase_subclasses(vcs_class, namespace):
- """Make VCSTestCase subclasses for vcs_class in the namespace."""
- vcs_testcase_classes = [
- c for c in (
- ob for ob in globals().values() if isinstance(ob, type))
- if issubclass(c, VCSTestCase)]
-
- for base_class in vcs_testcase_classes:
- testcase_class_name = vcs_class.__name__ + base_class.__name__
- testcase_class_bases = (base_class,)
- testcase_class_dict = dict(base_class.__dict__)
- testcase_class_dict['Class'] = vcs_class
- testcase_class = type(
- testcase_class_name, testcase_class_bases, testcase_class_dict)
- setattr(namespace, testcase_class_name, testcase_class)
-
-
-unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
-suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/interfaces/email/interactive/libbe/version.py b/interfaces/email/interactive/libbe/version.py
deleted file mode 100644
index f8eebbd..0000000
--- a/interfaces/email/interactive/libbe/version.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Store version info for this BE installation. By default, use the
-bzr-generated information in _version.py, but allow manual overriding
-by setting _VERSION. This allows support of both the "I don't want to
-be bothered setting version strings" and the "I want complete control
-over the version strings" workflows.
-"""
-
-import libbe._version as _version
-
-# Manually set a version string (optional, defaults to bzr revision id)
-#_VERSION = "1.2.3"
-
-def version(verbose=False):
- """
- Returns the version string for this BE installation. If
- verbose==True, the string will include extra lines with more
- detail (e.g. bzr branch nickname, etc.).
- """
- if "_VERSION" in globals():
- string = _VERSION
- else:
- string = _version.version_info["revision_id"]
- if verbose == True:
- string += ("\n"
- "revision: %(revno)d\n"
- "nick: %(branch_nick)s\n"
- "revision id: %(revision_id)s"
- % _version.version_info)
- return string
-
-if __name__ == "__main__":
- print version(verbose=True)