aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'libbe')
-rw-r--r--libbe/arch.py425
-rw-r--r--libbe/beuuid.py62
-rw-r--r--libbe/bug.py351
-rw-r--r--libbe/bugdir.py811
-rw-r--r--libbe/bzr.py186
-rw-r--r--libbe/cmdutil.py161
-rw-r--r--libbe/comment.py386
-rw-r--r--libbe/config.py4
-rw-r--r--libbe/diff.py63
-rw-r--r--libbe/git.py211
-rw-r--r--libbe/hg.py167
-rw-r--r--libbe/mapfile.py136
-rw-r--r--libbe/names.py37
-rw-r--r--libbe/no_rcs.py51
-rw-r--r--libbe/plugin.py7
-rw-r--r--libbe/rcs.py653
-rw-r--r--libbe/restconvert.py4
-rw-r--r--libbe/template48
-rw-r--r--libbe/tests.py55
-rw-r--r--libbe/tree.py158
-rw-r--r--libbe/utility.py95
21 files changed, 2631 insertions, 1440 deletions
diff --git a/libbe/arch.py b/libbe/arch.py
index 038325a..fd953a4 100644
--- a/libbe/arch.py
+++ b/libbe/arch.py
@@ -14,196 +14,265 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-from subprocess import Popen, PIPE
import os
+import shutil
+import time
+import re
+import unittest
+import doctest
+
import config
-import errno
+from beuuid import uuid_gen
+from rcs import RCS, RCStestCase, CommandError
+
client = config.get_val("arch_client")
if client is None:
client = "tla"
config.set_val("arch_client", client)
-def invoke(args):
- try :
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- except OSError, e :
- strerror = "%s\nwhile executing %s" % (e.args[1], args)
- raise Exception("Command failed: %s" % strerror)
- output = q.stdout.read()
- error = q.stderr.read()
- status = q.wait()
- if status >= 0:
- return status, output, error
- raise Exception("Command failed: %s" % error)
-
-
-def invoke_client(*args, **kwargs):
- cl_args = [client]
- cl_args.extend(args)
- status,output,error = invoke(cl_args)
- if status not in (0,):
- raise Exception("Command failed: %s" % error)
- return output
-
-def get_user_id():
- try:
- return invoke_client('my-id')
- except Exception, e:
- if 'no arch user id set' in e.args[0]:
- return None
+def new():
+ return Arch()
+
+class Arch(RCS):
+ name = "Arch"
+ client = client
+ versioned = True
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser("~/.arch-params")
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, "{arch}") != None :
+ return True
+ return False
+ def _rcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+ def _create_archive(self, path):
+ # Create a new archive
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = self._u_parse_id(id)
+ if email == None:
+ email = "%s@example.com" % name
+ trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8])
+ self._archive_name = "%s--%s" % (email, trailer)
+ self._archive_dir = "/tmp/%s" % trailer
+ self._tmp_archive = True
+ self._u_invoke_client("make-archive", self._archive_name,
+ self._archive_dir, directory=path)
+ def _invoke_client(self, *args, **kwargs):
+ """
+ Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
else:
- raise
-
-
-def set_user_id(value):
- invoke_client('my-id', value)
-
-
-def ensure_user_id():
- if get_user_id() is None:
- set_user_id('nobody <nobody@example.com>')
-
-
-def write_tree_settings(contents, path):
- file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents)
-
-def init_tree(path):
- invoke_client("init-tree", "-d", path)
-
-def temp_arch_tree(type="easy"):
- import tempfile
- ensure_user_id()
- path = tempfile.mkdtemp()
- init_tree(path)
- if type=="easy":
- write_tree_settings("source ^.*$\n", path)
- elif type=="tricky":
- write_tree_settings("source ^$\n", path)
- else:
- assert (type=="impossible")
- add_dir_rule("precious ^\.boo$", path, path)
- return path
-
-def list_added(root):
- assert os.path.exists(root)
- assert os.access(root, os.X_OK)
- root = os.path.realpath(root)
- inv_str = invoke_client("inventory", "--source", '--both', '--all', root)
- return [os.path.join(root, p) for p in inv_str.split('\n')]
-
-def tree_root(filename):
- assert os.path.exists(filename)
- if not os.path.isdir(filename):
- dirname = os.path.dirname(filename)
- else:
- dirname = filename
- return invoke_client("tree-root", dirname).rstrip('\n')
-
-def rel_filename(filename, root):
- filename = os.path.realpath(filename)
- root = os.path.realpath(root)
- assert(filename.startswith(root))
- return filename[len(root)+1:]
+ tailargs = []
+ arglist = [command, "-A", self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ "=locations", self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+ def _create_project(self, path):
+ """
+ Create a temporary Arch project in the directory PATH. This
+ project will be removed by
+ __del__->cleanup->_rcs_cleanup->_remove_project
+ """
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = "bugs-everywhere"
+ branch = "mainline"
+ version = "0.1"
+ self._project_name = "%s--%s--%s" % (category, branch, version)
+ self._invoke_client("archive-setup", self._project_name,
+ directory=path)
+ self._tmp_project = True
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return "%s/%s" % (self._archive_name, self._project_name)
+ def _adjust_naming_conventions(self, path):
+ """
+ By default, Arch restricts source code filenames to
+ ^[_=a-zA-Z0-9].*$
+ See
+ http://regexps.srparish.net/tutorial-tla/naming-conventions.html
+ Since our bug directory '.be' doesn't satisfy these conventions,
+ we need to adjust them.
+
+ The conventions are specified in
+ project-root/{arch}/=tagging-method
+ """
+ tagpath = os.path.join(path, "{arch}", "=tagging-method")
+ lines_out = []
+ for line in file(tagpath, "rb"):
+ line.decode("utf-8")
+ if line.startswith("source "):
+ lines_out.append("source ^[._=a-zA-X0-9].*$\n")
+ else:
+ lines_out.append(line)
+ file(tagpath, "wb").write("".join(lines_out).encode("utf-8"))
+
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-source.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html
+ self._invoke_client("init-tree", self._project_name,
+ directory=path)
+ self._adjust_naming_conventions(path)
+ self._invoke_client("import", "--summary", "Began versioning",
+ directory=path)
+ def _rcs_cleanup(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+
+ def _rcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
+ else:
+ dirname = path
+ status,output,error = self._u_invoke_client("tree-root", dirname)
+ root = output.rstrip('\n')
+
+ self._get_archive_project_name(root)
+
+ return root
+
+ def _get_archive_name(self, root):
+ status,output,error = self._u_invoke_client("archives")
+ lines = output.split('\n')
+ # e.g. output:
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
+ # /tmp/BEtestXXXXXX/rootdir
+ # (+ repeats)
+ for archive,location in zip(lines[::2], lines[1::2]):
+ if os.path.realpath(location) == os.path.realpath(root):
+ self._archive_name = archive
+ assert self._archive_name != None
+
+ def _get_archive_project_name(self, root):
+ # get project names
+ status,output,error = self._u_invoke_client("tree-version", directory=root)
+ # e.g output
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
+ archive_name,project_name = output.rstrip('\n').split('/')
+ self._archive_name = archive_name
+ self._project_name = project_name
+ def _rcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client('my-id', value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add-id", path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.rootdir)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client("inventory", "--source",
+ "--both", "--all", root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ file(inv_path, "ab").write(rule)
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+ def _force_source(self, path):
+ rule = "source %s\n" % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
+ if os.path.realpath(path) not in self._list_added(self.rootdir):
+ raise CantAddFile(path)
+ def _rcs_remove(self, path):
+ if not '.arch-ids' in path:
+ self._u_invoke_client("delete-id", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(self._u_abspath(path), "rb").read()
+ else:
+ status,output,error = \
+ self._invoke_client("file-find", path, revision)
+ path = output.rstrip('\n')
+ return file(self._u_abspath(path), "rb").read()
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ status,output,error = \
+ self._u_invoke_client("get", revision,directory)
+ def _rcs_commit(self, commitfile):
+ summary,body = self._u_parse_commitfile(commitfile)
+ #status,output,error = self._invoke_client("make-log")
+ if body == None:
+ status,output,error \
+ = self._u_invoke_client("commit","--summary",summary)
+ else:
+ status,output,error \
+ = self._u_invoke_client("commit","--summary",summary,
+ "--log-message",body)
+ revision = None
+ revline = re.compile("[*] committed (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
class CantAddFile(Exception):
def __init__(self, file):
self.file = file
Exception.__init__(self, "Can't automatically add file %s" % file)
+class ArchTestCase(RCStestCase):
+ Class = Arch
-def add_dir_rule(rule, dirname, root):
- inv_filename = os.path.join(dirname, '.arch-inventory')
- file(inv_filename, "ab").write(rule)
- if os.path.realpath(inv_filename) not in list_added(root):
- add_id(inv_filename, paranoid=False)
-
-def force_source(filename, root):
- rule = "source %s\n" % rel_filename(filename, root)
- add_dir_rule(rule, os.path.dirname(filename), root)
- if os.path.realpath(filename) not in list_added(root):
- raise CantAddFile(filename)
-
-def add_id(filename, paranoid=False):
- invoke_client("add-id", filename)
- root = tree_root(filename)
- if paranoid and os.path.realpath(filename) not in list_added(root):
- force_source(filename, root)
-
-
-def delete_id(filename):
- invoke_client("delete-id", filename)
-
-def test_helper(type):
- t = temp_arch_tree(type)
- dirname = os.path.join(t, ".boo")
- return dirname, t
-
-def mkdir(path, paranoid=False):
- """
- >>> import shutil
- >>> dirname,t = test_helper("easy")
- >>> mkdir(dirname, paranoid=False)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert not os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("tricky")
- >>> mkdir(dirname, paranoid=True)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("impossible")
- >>> try:
- ... mkdir(dirname, paranoid=True)
- ... except CantAddFile, e:
- ... print "Can't add file"
- Can't add file
- >>> shutil.rmtree(t)
- """
- os.mkdir(path)
- add_id(path, paranoid=paranoid)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-
-def path_in_reference(bug_dir, spec):
- if spec is not None:
- return invoke_client("file-find", bug_dir, spec).rstrip('\n')
- return invoke_client("file-find", bug_dir).rstrip('\n')
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using Arch"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, "{arch}")):
- return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.join('..', path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- pass
-
-def postcommit(directory):
- pass
-
-
-name = "Arch"
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/beuuid.py b/libbe/beuuid.py
new file mode 100644
index 0000000..e2435ea
--- /dev/null
+++ b/libbe/beuuid.py
@@ -0,0 +1,62 @@
+# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
+# <abentley@panoramicfeedback.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+Backwards compatibility support for Python 2.4. Once people give up
+on 2.4 ;), the uuid call should be merged into bugdir.py
+"""
+
+import unittest
+
+try:
+ from uuid import uuid4 # Python >= 2.5
+ def uuid_gen():
+ id = uuid4()
+ idstr = id.urn
+ start = "urn:uuid:"
+ assert idstr.startswith(start)
+ return idstr[len(start):]
+except ImportError:
+ import os
+ import sys
+ from subprocess import Popen, PIPE
+
+ def uuid_gen():
+ # Shell-out to system uuidgen
+ args = ['uuidgen', 'r']
+ try:
+ if sys.platform != "win32":
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ else:
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ shell=True, cwd=cwd)
+ except OSError, e :
+ strerror = "%s\nwhile executing %s" % (e.args[1], args)
+ raise OSError, strerror
+ output, error = q.communicate()
+ status = q.wait()
+ if status != 0:
+ strerror = "%s\nwhile executing %s" % (status, args)
+ raise Exception, strerror
+ return output.rstrip('\n')
+
+class UUIDtestCase(unittest.TestCase):
+ def testUUID_gen(self):
+ id = uuid_gen()
+ self.failUnless(len(id) == 36, "invalid UUID '%s'" % id)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase)
diff --git a/libbe/bug.py b/libbe/bug.py
new file mode 100644
index 0000000..c75c968
--- /dev/null
+++ b/libbe/bug.py
@@ -0,0 +1,351 @@
+# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
+# <abentley@panoramicfeedback.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import os
+import os.path
+import errno
+import time
+import doctest
+
+from beuuid import uuid_gen
+import mapfile
+import comment
+import utility
+
+
+### Define and describe valid bug categories
+# Use a tuple of (category, description) tuples since we don't have
+# ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/
+
+# in order of increasing severity
+severity_level_def = (
+ ("wishlist","A feature that could improve usefullness, but not a bug."),
+ ("minor","The standard bug level."),
+ ("serious","A bug that requires workarounds."),
+ ("critical","A bug that prevents some features from working at all."),
+ ("fatal","A bug that makes the package unusable."))
+
+# in order of increasing resolution
+# roughly following http://www.bugzilla.org/docs/3.2/en/html/lifecycle.html
+active_status_def = (
+ ("unconfirmed","A possible bug which lacks independent existance confirmation."),
+ ("open","A working bug that has not been assigned to a developer."),
+ ("assigned","A working bug that has been assigned to a developer."),
+ ("test","The code has been adjusted, but the fix is still being tested."))
+inactive_status_def = (
+ ("closed", "The bug is no longer relevant."),
+ ("fixed", "The bug should no longer occur."),
+ ("wontfix","It's not a bug, it's a feature."),
+ ("disabled", "?"))
+
+
+### Convert the description tuples to more useful formats
+
+severity_values = tuple([val for val,description in severity_level_def])
+severity_description = dict(severity_level_def)
+severity_index = {}
+for i in range(len(severity_values)):
+ severity_index[severity_values[i]] = i
+
+active_status_values = tuple(val for val,description in active_status_def)
+inactive_status_values = tuple(val for val,description in inactive_status_def)
+status_values = active_status_values + inactive_status_values
+status_description = dict(active_status_def+inactive_status_def)
+status_index = {}
+for i in range(len(status_values)):
+ status_index[status_values[i]] = i
+
+
+def checked_property(name, valid):
+ """
+ Provide access to an attribute name, testing for valid values.
+ """
+ def getter(self):
+ value = getattr(self, "_"+name)
+ if value not in valid:
+ raise InvalidValue(name, value)
+ return value
+
+ def setter(self, value):
+ if value not in valid:
+ raise InvalidValue(name, value)
+ return setattr(self, "_"+name, value)
+ return property(getter, setter)
+
+
+class Bug(object):
+ severity = checked_property("severity", severity_values)
+ status = checked_property("status", status_values)
+
+ def _get_active(self):
+ return self.status in active_status_values
+
+ active = property(_get_active)
+
+ def __init__(self, bugdir=None, uuid=None, from_disk=False,
+ load_comments=False, summary=None):
+ self.bugdir = bugdir
+ if bugdir != None:
+ self.rcs = bugdir.rcs
+ else:
+ self.rcs = None
+ if from_disk == True:
+ self._comments_loaded = False
+ self.uuid = uuid
+ self.load(load_comments=load_comments)
+ else:
+ # Note: defaults should match those in Bug.load()
+ self._comments_loaded = True
+ if uuid != None:
+ self.uuid = uuid
+ else:
+ self.uuid = uuid_gen()
+ self.summary = summary
+ if self.rcs != None:
+ self.creator = self.rcs.get_user_id()
+ else:
+ self.creator = None
+ self.target = None
+ self.status = "open"
+ self.severity = "minor"
+ self.assigned = None
+ self.time = int(time.time()) # only save to second precision
+ self.comment_root = comment.Comment(self, uuid=comment.INVALID_UUID)
+
+ def __repr__(self):
+ return "Bug(uuid=%r)" % self.uuid
+
+ def string(self, shortlist=False, show_comments=False):
+ if self.bugdir == None:
+ shortname = self.uuid
+ else:
+ shortname = self.bugdir.bug_shortname(self)
+ if shortlist == False:
+ if self.time == None:
+ timestring = ""
+ else:
+ htime = utility.handy_time(self.time)
+ ftime = utility.time_to_str(self.time)
+ timestring = "%s (%s)" % (htime, ftime)
+ info = [("ID", self.uuid),
+ ("Short name", shortname),
+ ("Severity", self.severity),
+ ("Status", self.status),
+ ("Assigned", self.assigned),
+ ("Target", self.target),
+ ("Creator", self.creator),
+ ("Created", timestring)]
+ newinfo = []
+ for k,v in info:
+ if v == None:
+ newinfo.append((k,""))
+ else:
+ newinfo.append((k,v))
+ info = newinfo
+ longest_key_len = max([len(k) for k,v in info])
+ infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info]
+ bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n')
+ else:
+ statuschar = self.status[0]
+ severitychar = self.severity[0]
+ chars = "%c%c" % (statuschar, severitychar)
+ bugout = "%s:%s: %s" % (shortname, chars, self.summary.rstrip('\n'))
+
+ if show_comments == True:
+ if self._comments_loaded == False:
+ self.load_comments()
+ comout = self.comment_root.string_thread(auto_name_map=True,
+ bug_shortname=shortname)
+ output = bugout + '\n' + comout.rstrip('\n')
+ else :
+ output = bugout
+ return output
+
+ def __str__(self):
+ return self.string(shortlist=True)
+
+ def __cmp__(self, other):
+ return cmp_full(self, other)
+
+ def get_path(self, name=None):
+ my_dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid)
+ if name is None:
+ return my_dir
+ assert name in ["values", "comments"]
+ return os.path.join(my_dir, name)
+
+ def load(self, load_comments=False):
+ map = mapfile.map_load(self.rcs, self.get_path("values"))
+ self.summary = map.get("summary")
+ self.creator = map.get("creator")
+ self.target = map.get("target")
+ self.status = map.get("status", "open")
+ self.severity = map.get("severity", "minor")
+ self.assigned = map.get("assigned")
+ self.time = map.get("time")
+ if self.time is not None:
+ self.time = utility.str_to_time(self.time)
+
+ if load_comments == True:
+ self.load_comments()
+
+ def load_comments(self):
+ self.comment_root = comment.loadComments(self)
+ self._comments_loaded = True
+
+ def comments(self):
+ if self._comments_loaded == False:
+ self.load_comments()
+ for comment in self.comment_root.traverse():
+ yield comment
+
+ def _add_attr(self, map, name):
+ value = getattr(self, name)
+ if value is not None:
+ map[name] = value
+
+ def save(self):
+ assert self.summary != None, "Can't save blank bug"
+ map = {}
+ self._add_attr(map, "assigned")
+ self._add_attr(map, "summary")
+ self._add_attr(map, "creator")
+ self._add_attr(map, "target")
+ self._add_attr(map, "status")
+ self._add_attr(map, "severity")
+ if self.time is not None:
+ map["time"] = utility.time_to_str(self.time)
+
+ self.rcs.mkdir(self.get_path())
+ path = self.get_path("values")
+ mapfile.map_save(self.rcs, path, map)
+
+ if self._comments_loaded:
+ if len(self.comment_root) > 0:
+ self.rcs.mkdir(self.get_path("comments"))
+ comment.saveComments(self)
+
+ def remove(self):
+ self.load_comments()
+ self.comment_root.remove()
+ path = self.get_path()
+ self.rcs.recursive_remove(path)
+
+ def new_comment(self, body=None):
+ comm = comment.comment_root.new_reply(body=body)
+ return comm
+
+ def comment_from_shortname(self, shortname, *args, **kwargs):
+ return self.comment_root.comment_from_shortname(shortname, *args, **kwargs)
+
+ def comment_from_uuid(self, uuid):
+ return self.comment_root.comment_from_uuid(uuid)
+
+
+# the general rule for bug sorting is that "more important" bugs are
+# less than "less important" bugs. This way sorting a list of bugs
+# will put the most important bugs first in the list. When relative
+# importance is unclear, the sorting follows some arbitrary convention
+# (i.e. dictionary order).
+
+def cmp_severity(bug_1, bug_2):
+ """
+ Compare the severity levels of two bugs, with more severe bugs
+ comparing as less.
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.severity = bugB.severity = "wishlist"
+ >>> cmp_severity(bugA, bugB) == 0
+ True
+ >>> bugB.severity = "minor"
+ >>> cmp_severity(bugA, bugB) > 0
+ True
+ >>> bugA.severity = "critical"
+ >>> cmp_severity(bugA, bugB) < 0
+ True
+ """
+ if not hasattr(bug_2, "severity") :
+ return 1
+ return -cmp(severity_index[bug_1.severity], severity_index[bug_2.severity])
+
+def cmp_status(bug_1, bug_2):
+ """
+ Compare the status levels of two bugs, with more 'open' bugs
+ comparing as less.
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.status = bugB.status = "open"
+ >>> cmp_status(bugA, bugB) == 0
+ True
+ >>> bugB.status = "closed"
+ >>> cmp_status(bugA, bugB) < 0
+ True
+ >>> bugA.status = "fixed"
+ >>> cmp_status(bugA, bugB) > 0
+ True
+ """
+ if not hasattr(bug_2, "status") :
+ return 1
+ val_2 = status_index[bug_2.status]
+ return cmp(status_index[bug_1.status], status_index[bug_2.status])
+
+def cmp_attr(bug_1, bug_2, attr, invert=False):
+ """
+ Compare a general attribute between two bugs using the conventional
+ comparison rule for that attribute type. If invert == True, sort
+ *against* that convention.
+ >>> attr="severity"
+ >>> bugA = Bug()
+ >>> bugB = Bug()
+ >>> bugA.severity = "critical"
+ >>> bugB.severity = "wishlist"
+ >>> cmp_attr(bugA, bugB, attr) < 0
+ True
+ >>> cmp_attr(bugA, bugB, attr, invert=True) > 0
+ True
+ >>> bugB.severity = "critical"
+ >>> cmp_attr(bugA, bugB, attr) == 0
+ True
+ """
+ if not hasattr(bug_2, attr) :
+ return 1
+ if invert == True :
+ return -cmp(getattr(bug_1, attr), getattr(bug_2, attr))
+ else :
+ return cmp(getattr(bug_1, attr), getattr(bug_2, attr))
+
+# alphabetical rankings (a < z)
+cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator")
+cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned")
+# chronological rankings (newer < older)
+cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True)
+
+def cmp_full(bug_1, bug_2, cmp_list=(cmp_status,cmp_severity,cmp_assigned,
+ cmp_time,cmp_creator)):
+ for comparison in cmp_list :
+ val = comparison(bug_1, bug_2)
+ if val != 0 :
+ return val
+ return 0
+
+class InvalidValue(ValueError):
+ def __init__(self, name, value):
+ msg = "Cannot assign value %s to %s" % (value, name)
+ Exception.__init__(self, msg)
+ self.name = name
+ self.value = value
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index 427ed38..7e4cf3e 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -16,13 +16,17 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
import os.path
-import cmdutil
import errno
-import names
-import mapfile
import time
+import copy
+import unittest
+import doctest
+
+import mapfile
+import bug
import utility
-from rcs import rcs_by_name
+import rcs
+
class NoBugDir(Exception):
def __init__(self, path):
@@ -30,48 +34,6 @@ class NoBugDir(Exception):
Exception.__init__(self, msg)
self.path = path
-
-def iter_parent_dirs(cur_dir):
- cur_dir = os.path.realpath(cur_dir)
- old_dir = None
- while True:
- yield cur_dir
- old_dir = cur_dir
- cur_dir = os.path.normpath(os.path.join(cur_dir, '..'))
- if old_dir == cur_dir:
- break;
-
-
-def tree_root(dir, old_version=False):
- for rootdir in iter_parent_dirs(dir):
- versionfile=os.path.join(rootdir, ".be", "version")
- if os.path.exists(versionfile):
- if not old_version:
- test_version(versionfile)
- return BugDir(os.path.join(rootdir, ".be"))
- elif not os.path.exists(rootdir):
- raise NoRootEntry(rootdir)
- old_rootdir = rootdir
- rootdir=os.path.join('..', rootdir)
-
- raise NoBugDir(dir)
-
-class BadTreeVersion(Exception):
- def __init__(self, version):
- Exception.__init__(self, "Unsupported tree version: %s" % version)
- self.version = version
-
-def test_version(path):
- tree_version = file(path, "rb").read()
- if tree_version != TREE_VERSION_STRING:
- raise BadTreeVersion(tree_version)
-
-def set_version(path, rcs):
- rcs.set_file_contents(os.path.join(path, "version"), TREE_VERSION_STRING)
-
-
-TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
-
class NoRootEntry(Exception):
def __init__(self, path):
self.path = path
@@ -83,363 +45,456 @@ class AlreadyInitialized(Exception):
Exception.__init__(self,
"Specified root is already initialized: %s" % path)
-def create_bug_dir(path, rcs):
- """
- >>> import no_rcs, tests
- >>> create_bug_dir('/highly-unlikely-to-exist', no_rcs)
- Traceback (most recent call last):
- NoRootEntry: Specified root does not exist: /highly-unlikely-to-exist
- >>> test_dir = os.path.dirname(tests.bug_arch_dir().dir)
- >>> try:
- ... create_bug_dir(test_dir, no_rcs)
- ... except AlreadyInitialized, e:
- ... print "Already Initialized"
- Already Initialized
- """
- root = os.path.join(path, ".be")
- try:
- rcs.mkdir(root, paranoid=True)
- except OSError, e:
- if e.errno == errno.ENOENT:
- raise NoRootEntry(path)
- elif e.errno == errno.EEXIST:
- raise AlreadyInitialized(path)
- else:
- raise
- rcs.mkdir(os.path.join(root, "bugs"))
- set_version(root, rcs)
- map_save(rcs, os.path.join(root, "settings"), {"rcs_name": rcs.name})
- return BugDir(os.path.join(path, ".be"))
-
-
-def setting_property(name, valid=None):
- def getter(self):
- value = self.settings.get(name)
- if valid is not None:
- if value not in valid:
- raise InvalidValue(name, value)
- return value
-
- def setter(self, value):
- if valid is not None:
- if value not in valid and value is not None:
- raise InvalidValue(name, value)
- if value is None:
- del self.settings[name]
- else:
- self.settings[name] = value
- self.save_settings()
- return property(getter, setter)
-
-
-class BugDir:
- def __init__(self, dir):
- self.dir = dir
- self.bugs_path = os.path.join(self.dir, "bugs")
- try:
- self.settings = map_load(os.path.join(self.dir, "settings"))
- except NoSuchFile:
- self.settings = {"rcs_name": "None"}
-
- rcs_name = setting_property("rcs_name", ("None", "bzr", "git", "Arch", "hg"))
- _rcs = None
-
- target = setting_property("target")
-
- def save_settings(self):
- map_save(self.rcs, os.path.join(self.dir, "settings"), self.settings)
-
- def get_rcs(self):
- if self._rcs is not None and self.rcs_name == self._rcs.name:
- return self._rcs
- self._rcs = rcs_by_name(self.rcs_name)
- return self._rcs
-
- rcs = property(get_rcs)
-
- def get_reference_bugdir(self, spec):
- return BugDir(self.rcs.path_in_reference(self.dir, spec))
-
- def list(self):
- for uuid in self.list_uuids():
- yield self.get_bug(uuid)
-
- def bug_map(self):
- bugs = {}
- for bug in self.list():
- bugs[bug.uuid] = bug
- return bugs
-
- def get_bug(self, uuid):
- return Bug(self.bugs_path, uuid, self.rcs_name)
-
- def list_uuids(self):
- for uuid in os.listdir(self.bugs_path):
- if (uuid.startswith('.')):
- continue
- yield uuid
-
- def new_bug(self, uuid=None):
- if uuid is None:
- uuid = names.uuid()
- path = os.path.join(self.bugs_path, uuid)
- self.rcs.mkdir(path)
- bug = Bug(self.bugs_path, None, self.rcs_name)
- bug.uuid = uuid
- return bug
-
-class InvalidValue(Exception):
+class InvalidValue(ValueError):
def __init__(self, name, value):
msg = "Cannot assign value %s to %s" % (value, name)
Exception.__init__(self, msg)
self.name = name
self.value = value
+class MultipleBugMatches(ValueError):
+ def __init__(self, shortname, matches):
+ msg = ("More than one bug matches %s. "
+ "Please be more specific.\n%s" % shortname, matches)
+ ValueError.__init__(self, msg)
+ self.shortname = shortnamename
+ self.matches = matches
-def checked_property(name, valid):
- def getter(self):
- value = self.__getattribute__("_"+name)
- if value not in valid:
- raise InvalidValue(name, value)
- return value
- def setter(self, value):
- if value not in valid:
- raise InvalidValue(name, value)
- return self.__setattr__("_"+name, value)
- return property(getter, setter)
+TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n"
-severity_levels = ("wishlist", "minor", "serious", "critical", "fatal")
-active_status = ("open", "in-progress", "waiting", "new", "verified")
-inactive_status = ("closed", "disabled", "fixed", "wontfix", "waiting")
-severity_value = {}
-for i in range(len(severity_levels)):
- severity_value[severity_levels[i]] = i
+def setting_property(name, valid=None, doc=None):
+ def getter(self):
+ value = self.settings.get(name)
+ if valid is not None:
+ if value not in valid and value != None:
+ raise InvalidValue(name, value)
+ return value
+
+ def setter(self, value):
+ if value != getter(self):
+ if value is None:
+ del self.settings[name]
+ else:
+ self.settings[name] = value
+ self._save_settings(self.get_path("settings"), self.settings)
+
+ return property(getter, setter, doc=doc)
-class Bug(object):
- status = checked_property("status", (None,)+active_status+inactive_status)
- severity = checked_property("severity", (None, "wishlist", "minor",
- "serious", "critical", "fatal"))
- def __init__(self, path, uuid, rcs_name):
- self.path = path
- self.uuid = uuid
- if uuid is not None:
- dict = map_load(self.get_path("values"))
+class BugDir (list):
+ """
+ File-system access:
+ When rooted in non-bugdir directory, BugDirs live completely in
+ memory until the first call to .save(). This creates a '.be'
+ sub-directory containing configurations options, bugs, comments,
+ etc. Once this sub-directory has been created (possibly by
+ another BugDir instance) any changes to the BugDir in memory will
+ be flushed to the file system automatically. However, the BugDir
+ will only load information from the file system when it loads new
+ bugs/comments that it doesn't already have in memory, or when it
+ explicitly asked to do so (e.g. .load() or __init__(from_disk=True)).
+ """
+ def __init__(self, root=None, sink_to_existing_root=True,
+ assert_new_BugDir=False, allow_rcs_init=False,
+ from_disk=False, rcs=None):
+ list.__init__(self)
+ self._save_user_id = False
+ self.settings = {}
+ if root == None:
+ root = os.getcwd()
+ if sink_to_existing_root == True:
+ self.root = self._find_root(root)
else:
- dict = {}
-
- self.rcs_name = rcs_name
-
- self.summary = dict.get("summary")
- self.creator = dict.get("creator")
- self.target = dict.get("target")
- self.status = dict.get("status")
- self.severity = dict.get("severity")
- self.assigned = dict.get("assigned")
- self.time = dict.get("time")
- if self.time is not None:
- self.time = utility.str_to_time(self.time)
-
- def __repr__(self):
- return "Bug(uuid=%r)" % self.uuid
-
- def get_path(self, file):
- return os.path.join(self.path, self.uuid, file)
+ if not os.path.exists(root):
+ raise NoRootEntry(root)
+ self.root = root
+ if from_disk == True:
+ self.load()
+ else:
+ if assert_new_BugDir == True:
+ if os.path.exists(self.get_path()):
+ raise AlreadyInitialized, self.get_path()
+ if rcs == None:
+ rcs = self._guess_rcs(allow_rcs_init)
+ self.rcs = rcs
+ user_id = self.rcs.get_user_id()
+
+ def _find_root(self, path):
+ """
+ Search for an existing bug database dir and it's ancestors and
+ return a BugDir rooted there.
+ """
+ if not os.path.exists(path):
+ raise NoRootEntry(path)
+ versionfile = utility.search_parent_directories(path, os.path.join(".be", "version"))
+ if versionfile != None:
+ beroot = os.path.dirname(versionfile)
+ root = os.path.dirname(beroot)
+ return root
+ else:
+ beroot = utility.search_parent_directories(path, ".be")
+ if beroot == None:
+ raise NoBugDir(path)
+ return beroot
+
+ def get_version(self, path=None):
+ if self.rcs_name == None:
+ # Use a temporary RCS to check the version for the first time
+ RCS = rcs.rcs_by_name("None")
+ RCS.root(self.root)
+ else:
+ RCS = self.rcs
- def _get_active(self):
- return self.status in active_status
+ if path == None:
+ path = self.get_path("version")
+ tree_version = RCS.get_file_contents(path)
+ return tree_version
- active = property(_get_active)
+ def set_version(self):
+ self.rcs.set_file_contents(self.get_path("version"),
+ TREE_VERSION_STRING)
- def add_attr(self, map, name):
- value = self.__getattribute__(name)
- if value is not None:
- map[name] = value
+ rcs_name = setting_property("rcs_name",
+ ("None", "bzr", "git", "Arch", "hg"),
+ doc=
+"""The name of the current RCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .rcs instead, and
+.rcs_name will be automatically adjusted.""")
- def save(self):
- map = {}
- self.add_attr(map, "assigned")
- self.add_attr(map, "summary")
- self.add_attr(map, "creator")
- self.add_attr(map, "target")
- self.add_attr(map, "status")
- self.add_attr(map, "severity")
- if self.time is not None:
- map["time"] = utility.time_to_str(self.time)
- path = self.get_path("values")
- map_save(rcs_by_name(self.rcs_name), path, map)
+ _rcs = None
def _get_rcs(self):
- return rcs_by_name(self.rcs_name)
-
- rcs = property(_get_rcs)
-
- def new_comment(self):
- if not os.path.exists(self.get_path("comments")):
- self.rcs.mkdir(self.get_path("comments"))
- comm = Comment(None, self)
- comm.uuid = names.uuid()
- return comm
-
- def get_comment(self, uuid):
- return Comment(uuid, self)
-
- def iter_comment_ids(self):
- path = self.get_path("comments")
- if not os.path.isdir(path):
- return
- try:
- for uuid in os.listdir(path):
- if (uuid.startswith('.')):
- continue
- yield uuid
- except OSError, e:
- if e.errno != errno.ENOENT:
- raise
- return
+ return self._rcs
- def list_comments(self):
- comments = [Comment(id, self) for id in self.iter_comment_ids()]
- comments.sort(cmp_date)
- return comments
-
-def cmp_date(comm1, comm2):
- return cmp(comm1.date, comm2.date)
-
-def new_bug(dir, uuid=None):
- bug = dir.new_bug(uuid)
- bug.creator = names.creator()
- bug.severity = "minor"
- bug.status = "open"
- bug.time = time.time()
- return bug
-
-def new_comment(bug, body=None):
- comm = bug.new_comment()
- comm.From = names.creator()
- comm.date = time.time()
- comm.body = body
- return comm
-
-def add_headers(obj, map, names):
- map_names = {}
- for name in names:
- map_names[name] = pyname_to_header(name)
- add_attrs(obj, map, names, map_names)
-
-def add_attrs(obj, map, names, map_names=None):
- if map_names is None:
- map_names = {}
- for name in names:
- map_names[name] = name
-
- for name in names:
- value = obj.__getattribute__(name)
- if value is not None:
- map[map_names[name]] = value
-
-
-class Comment(object):
- def __init__(self, uuid, bug):
- object.__init__(self)
- self.uuid = uuid
- self.bug = bug
- if self.uuid is not None and self.bug is not None:
- mapfile = map_load(self.get_path("values"))
- self.date = utility.str_to_time(mapfile["Date"])
- self.From = mapfile["From"]
- self.in_reply_to = mapfile.get("In-reply-to")
- self.content_type = mapfile.get("Content-type", "text/plain")
- self.body = file(self.get_path("body")).read().decode("utf-8")
+ def _set_rcs(self, new_rcs):
+ if new_rcs == None:
+ new_rcs = rcs.rcs_by_name("None")
+ self._rcs = new_rcs
+ new_rcs.root(self.root)
+ self.rcs_name = new_rcs.name
+
+ rcs = property(_get_rcs, _set_rcs,
+ doc="A revision control system (RCS) instance")
+
+ _user_id = setting_property("user-id", doc=
+"""The user's prefered name. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .user_id instead,
+and ._user_id will be automatically adjusted. This setting is
+only saved if ._save_user_id == True""")
+
+ def _get_user_id(self):
+ if self._user_id == None and self.rcs != None:
+ self._user_id = self.rcs.get_user_id()
+ return self._user_id
+
+ def _set_user_id(self, user_id):
+ if self.rcs != None:
+ self.rcs.user_id = user_id
+ self._user_id = user_id
+
+ user_id = property(_get_user_id, _set_user_id, doc=
+"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note
+that the Arch RCS backend *enforces* ids with this format.""")
+
+ target = setting_property("target",
+ doc="The current project development target")
+
+ def save_user_id(self, user_id=None):
+ if user_id == None:
+ user_id = self.user_id
+ self._save_user_id = True
+ self.user_id = user_id
+
+ def get_path(self, *args):
+ my_dir = os.path.join(self.root, ".be")
+ if len(args) == 0:
+ return my_dir
+ assert args[0] in ["version", "settings", "bugs"], str(args)
+ return os.path.join(my_dir, *args)
+
+ def _guess_rcs(self, allow_rcs_init=False):
+ deepdir = self.get_path()
+ if not os.path.exists(deepdir):
+ deepdir = os.path.dirname(deepdir)
+ new_rcs = rcs.detect_rcs(deepdir)
+ install = False
+ if new_rcs.name == "None":
+ if allow_rcs_init == True:
+ new_rcs = rcs.installed_rcs()
+ new_rcs.init(self.root)
+ self.rcs = new_rcs
+ return new_rcs
+
+ def load(self):
+ version = self.get_version()
+ if version != TREE_VERSION_STRING:
+ raise NotImplementedError, \
+ "BugDir cannot handle version '%s' yet." % version
else:
- self.date = None
- self.From = None
- self.in_reply_to = None
- self.content_type = "text/plain"
- self.body = None
-
- def save(self):
- map_file = {"Date": utility.time_to_str(self.date)}
- add_headers(self, map_file, ("From", "in_reply_to", "content_type"))
- if not os.path.exists(self.get_path(None)):
- self.bug.rcs.mkdir(self.get_path(None))
- map_save(self.bug.rcs, self.get_path("values"), map_file)
- self.bug.rcs.set_file_contents(self.get_path("body"),
- self.body.encode('utf-8'))
+ if not os.path.exists(self.get_path()):
+ raise NoBugDir(self.get_path())
+ self.settings = self._get_settings(self.get_path("settings"))
+ self.rcs = rcs.rcs_by_name(self.rcs_name)
+ if self._user_id != None: # was a user name in the settings file
+ self.save_user_id()
+
+ self._bug_map_gen()
- def get_path(self, name):
- my_dir = os.path.join(self.bug.get_path("comments"), self.uuid)
- if name is None:
- return my_dir
- return os.path.join(my_dir, name)
-
-
-def thread_comments(comments):
- child_map = {}
- top_comments = []
- for comment in comments:
- child_map[comment.uuid] = []
- for comment in comments:
- if comment.in_reply_to is None or comment.in_reply_to not in child_map:
- top_comments.append(comment)
- continue
- child_map[comment.in_reply_to].append(comment)
-
- def recurse_children(comment):
- child_list = []
- for child in child_map[comment.uuid]:
- child_list.append(recurse_children(child))
- return (comment, child_list)
- return [recurse_children(c) for c in top_comments]
-
-
-def pyname_to_header(name):
- return name.capitalize().replace('_', '-')
-
-
-def map_save(rcs, path, map):
- """Save the map as a mapfile to the specified path"""
- add = not os.path.exists(path)
- output = file(path, "wb")
- mapfile.generate(output, map)
- if add:
- rcs.add_id(path)
-
-class NoSuchFile(Exception):
- def __init__(self, pathname):
- Exception.__init__(self, "No such file: %s" % pathname)
-
-
-def map_load(path):
- try:
- return mapfile.parse(file(path, "rb"))
- except IOError, e:
- if e.errno != errno.ENOENT:
- raise e
- raise NoSuchFile(path)
+ def load_all_bugs(self):
+ "Warning: this could take a while."
+ self._clear_bugs()
+ for uuid in self.list_uuids():
+ self._load_bug(uuid)
+ def save(self):
+ self.rcs.mkdir(self.get_path())
+ self.set_version()
+ self._save_settings(self.get_path("settings"), self.settings)
+ self.rcs.mkdir(self.get_path("bugs"))
+ for bug in self:
+ bug.save()
+
+ def _get_settings(self, settings_path):
+ if self.rcs_name == None:
+ # Use a temporary RCS to loading settings the first time
+ RCS = rcs.rcs_by_name("None")
+ RCS.root(self.root)
+ else:
+ RCS = self.rcs
+
+ allow_no_rcs = not RCS.path_in_root(settings_path)
+ # allow_no_rcs=True should only be for the special case of
+ # configuring duplicate bugdir settings
+
+ try:
+ settings = mapfile.map_load(RCS, settings_path, allow_no_rcs)
+ except rcs.NoSuchFile:
+ settings = {"rcs_name": "None"}
+ return settings
+
+ def _save_settings(self, settings_path, settings):
+ this_dir_path = os.path.realpath(self.get_path("settings"))
+ if os.path.realpath(settings_path) == this_dir_path:
+ if not os.path.exists(self.get_path()):
+ # don't save settings until the bug directory has been
+ # initialized. this initialization happens the first time
+ # a bug directory is saved (BugDir.save()). If the user
+ # is just working with a BugDir in memory, we don't want
+ # to go cluttering up his file system with settings files.
+ return
+ if self._save_user_id == False:
+ if "user-id" in settings:
+ settings = copy.copy(settings)
+ del settings["user-id"]
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
+ # allow_no_rcs=True should only be for the special case of
+ # configuring duplicate bugdir settings
+ mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
+
+ def duplicate_bugdir(self, revision):
+ duplicate_path = self.rcs.duplicate_repo(revision)
+
+ # setup revision RCS as None, since the duplicate may not be
+ # initialized for versioning
+ duplicate_settings_path = os.path.join(duplicate_path,
+ ".be", "settings")
+ duplicate_settings = self._get_settings(duplicate_settings_path)
+ if "rcs_name" in duplicate_settings:
+ duplicate_settings["rcs_name"] = "None"
+ duplicate_settings["user-id"] = self.user_id
+ self._save_settings(duplicate_settings_path, duplicate_settings)
+
+ return BugDir(duplicate_path, from_disk=True)
+
+ def remove_duplicate_bugdir(self):
+ self.rcs.remove_duplicate_repo()
+
+ def _bug_map_gen(self):
+ map = {}
+ for bug in self:
+ map[bug.uuid] = bug
+ for uuid in self.list_uuids():
+ if uuid not in map:
+ map[uuid] = None
+ self._bug_map = map
-class MockBug:
- def __init__(self, severity):
- self.severity = severity
+ def list_uuids(self):
+ uuids = []
+ if os.path.exists(self.get_path()):
+ # list the uuids on disk
+ for uuid in os.listdir(self.get_path("bugs")):
+ if not (uuid.startswith('.')):
+ uuids.append(uuid)
+ yield uuid
+ # and the ones that are still just in memory
+ for bug in self:
+ if bug.uuid not in uuids:
+ uuids.append(bug.uuid)
+ yield bug.uuid
+
+ def _clear_bugs(self):
+ while len(self) > 0:
+ self.pop()
+
+ def _load_bug(self, uuid):
+ bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True)
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+
+ def new_bug(self, uuid=None, summary=None):
+ bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary)
+ self.append(bg)
+ self._bug_map_gen()
+ return bg
+
+ def remove_bug(self, bug):
+ self.remove(bug)
+ bug.remove()
+
+ def bug_shortname(self, bug):
+ """
+ Generate short names from uuids. Picks the minimum number of
+ characters (>=3) from the beginning of the uuid such that the
+ short names are unique.
+
+ Obviously, as the number of bugs in the database grows, these
+ short names will cease to be unique. The complete uuid should be
+ used for long term reference.
+ """
+ chars = 3
+ for uuid in self._bug_map.keys():
+ if bug.uuid == uuid:
+ continue
+ while (bug.uuid[:chars] == uuid[:chars]):
+ chars+=1
+ return bug.uuid[:chars]
+
+ def bug_from_shortname(self, shortname):
+ """
+ >>> bd = simple_bug_dir()
+ >>> bug_a = bd.bug_from_shortname('a')
+ >>> print type(bug_a)
+ <class 'libbe.bug.Bug'>
+ >>> print bug_a
+ a:om: Bug A
+ """
+ matches = []
+ self._bug_map_gen()
+ for uuid in self._bug_map.keys():
+ if uuid.startswith(shortname):
+ matches.append(uuid)
+ if len(matches) > 1:
+ raise MultipleBugMatches(shortname, matches)
+ if len(matches) == 1:
+ return self.bug_from_uuid(matches[0])
+ raise KeyError("No bug matches %s" % shortname)
+
+ def bug_from_uuid(self, uuid):
+ if not self.has_bug(uuid):
+ raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \
+ % (uuid, self._bug_map, self.root))
+ if self._bug_map[uuid] == None:
+ self._load_bug(uuid)
+ return self._bug_map[uuid]
+
+ def has_bug(self, bug_uuid):
+ if bug_uuid not in self._bug_map:
+ self._bug_map_gen()
+ if bug_uuid not in self._bug_map:
+ return False
+ return True
+
-def cmp_severity(bug_1, bug_2):
+def simple_bug_dir():
"""
- Compare the severity levels of two bugs, with more sever bugs comparing
- as less.
-
- >>> cmp_severity(MockBug(None), MockBug(None))
- 0
- >>> cmp_severity(MockBug("wishlist"), MockBug(None)) < 0
- True
- >>> cmp_severity(MockBug(None), MockBug("wishlist")) > 0
- True
- >>> cmp_severity(MockBug("critical"), MockBug("wishlist")) < 0
- True
+ For testing
+ >>> bugdir = simple_bug_dir()
+ >>> ls = list(bugdir.list_uuids())
+ >>> ls.sort()
+ >>> print ls
+ ['a', 'b']
"""
- val_1 = severity_value.get(bug_1.severity)
- val_2 = severity_value.get(bug_2.severity)
- return -cmp(val_1, val_2)
+ dir = utility.Dir()
+ assert os.path.exists(dir.path)
+ bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True)
+ bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
+ bug_a = bugdir.new_bug("a", summary="Bug A")
+ bug_a.creator = "John Doe <jdoe@example.com>"
+ bug_a.time = 0
+ bug_b = bugdir.new_bug("b", summary="Bug B")
+ bug_b.creator = "Jane Doe <jdoe@example.com>"
+ bug_b.time = 0
+ bug_b.status = "closed"
+ bugdir.save()
+ return bugdir
+
+
+class BugDirTestCase(unittest.TestCase):
+ def __init__(self, *args, **kwargs):
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ def setUp(self):
+ self.dir = utility.Dir()
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_rcs_init=True)
+ self.rcs = self.bugdir.rcs
+ def tearDown(self):
+ self.rcs.cleanup()
+ self.dir.cleanup()
+ def fullPath(self, path):
+ return os.path.join(self.dir.path, path)
+ def assertPathExists(self, path):
+ fullpath = self.fullPath(path)
+ self.failUnless(os.path.exists(fullpath)==True,
+ "path %s does not exist" % fullpath)
+ self.assertRaises(AlreadyInitialized, BugDir,
+ self.dir.path, assertNewBugDir=True)
+ def versionTest(self):
+ if self.rcs.versioned == False:
+ return
+ original = self.bugdir.rcs.commit("Began versioning")
+ bugA = self.bugdir.bug_from_uuid("a")
+ bugA.status = "fixed"
+ self.bugdir.save()
+ new = self.rcs.commit("Fixed bug a")
+ dupdir = self.bugdir.duplicate_bugdir(original)
+ self.failUnless(dupdir.root != self.bugdir.root,
+ "%s, %s" % (dupdir.root, self.bugdir.root))
+ bugAorig = dupdir.bug_from_uuid("a")
+ self.failUnless(bugA != bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ bugAorig.status = "fixed"
+ self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.status, bugAorig.status))
+ self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.severity, bugAorig.severity))
+ self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.assigned, bugAorig.assigned))
+ self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.time, bugAorig.time))
+ self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.creator, bugAorig.creator))
+ self.failUnless(bugA == bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ self.bugdir.remove_duplicate_bugdir()
+ self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root))
+ def testRun(self):
+ self.bugdir.new_bug(uuid="a", summary="Ant")
+ self.bugdir.new_bug(uuid="b", summary="Cockroach")
+ self.bugdir.new_bug(uuid="c", summary="Praying mantis")
+ length = len(self.bugdir)
+ self.failUnless(length == 3, "%d != 3 bugs" % length)
+ uuids = list(self.bugdir.list_uuids())
+ self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids))
+ self.failUnless(uuids == ["a","b","c"], str(uuids))
+ bugA = self.bugdir.bug_from_uuid("a")
+ bugAprime = self.bugdir.bug_from_shortname("a")
+ self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime))
+ self.bugdir.save()
+ self.versionTest()
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
+suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()])
diff --git a/libbe/bzr.py b/libbe/bzr.py
index ddda334..a0ae715 100644
--- a/libbe/bzr.py
+++ b/libbe/bzr.py
@@ -15,114 +15,84 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import tempfile
-
-import config
-from rcs import invoke, CommandError
-
-def invoke_client(*args, **kwargs):
- directory = kwargs['directory']
- expect = kwargs.get('expect', (0, 1))
- cl_args = ["bzr"]
- cl_args.extend(args)
- status,output,error = invoke(cl_args, expect, cwd=directory)
- return status, output
-
-def add_id(filename, paranoid=False):
- invoke_client("add", filename, directory='.')
-
-def delete_id(filename):
- invoke_client("remove", filename, directory='.')
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
- add_id(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def lookup_revision(revno, directory):
- return invoke_client("lookup-revision", str(revno),
- directory=directory)[1].rstrip('\n')
-
-def export(revno, directory, revision_dir):
- invoke_client("export", "-r", str(revno), revision_dir, directory=directory)
-
-def find_or_make_export(revno, directory):
- revision_id = lookup_revision(revno, directory)
- home = os.path.expanduser("~")
- revision_root = os.path.join(home, ".bzrrevs")
- if not os.path.exists(revision_root):
- os.mkdir(revision_root)
- revision_dir = os.path.join(revision_root, revision_id)
- if not os.path.exists(revision_dir):
- export(revno, directory, revision_dir)
- return revision_dir
-
-def bzr_root(path):
- return invoke_client("root", path, directory=None)[1].rstrip('\r')
-
-def path_in_reference(bug_dir, spec):
- if spec is None:
- spec = int(invoke_client("revno", directory=bug_dir)[1])
- rel_bug_dir = bug_dir[len(bzr_root(bug_dir)):]
- export_root = find_or_make_export(spec, directory=bug_dir)
- return os.path.join(export_root, rel_bug_dir)
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using bzr"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, ".bzr")):
+import re
+import unittest
+import doctest
+
+from rcs import RCS, RCStestCase, CommandError
+
+def new():
+ return Bzr()
+
+class Bzr(RCS):
+ name = "bzr"
+ client = "bzr"
+ versioned = True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".bzr") != None :
return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.dirname(path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- if body is not None:
- summary += '\n' + body
- descriptor, filename = tempfile.mkstemp()
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.close()
- invoke_client('commit', '--unchanged', '--file', filename,
- directory=directory)
- finally:
- os.unlink(filename)
-
-def postcommit(directory):
- try:
- invoke_client('merge', directory=directory)
- except CommandError, e:
- if ('No merge branch known or specified' in e.err_str or
- 'No merge location known or specified' in e.err_str):
- pass
+ return False
+ def _rcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ status,output,error = self._u_invoke_client("root", path)
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("whoami")
+ return output.rstrip('\n')
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client("whoami", value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ # --force to also remove unversioned files.
+ self._u_invoke_client("remove", "--force", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(os.path.join(self.rootdir, path), "rb").read()
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
else:
- status = invoke_client('revert', '--no-backup',
+ self._u_invoke_client("branch", "--revision", revision,
+ ".", directory)
+ def _rcs_commit(self, commitfile):
+ status,output,error = self._u_invoke_client("commit", "--unchanged",
+ "--file", commitfile)
+ revision = None
+ revline = re.compile("Committed revision (.*)[.]")
+ match = revline.search(error)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+ def postcommit(self):
+ try:
+ self._u_invoke_client('merge')
+ except CommandError, e:
+ if ('No merge branch known or specified' in e.err_str or
+ 'No merge location known or specified' in e.err_str):
+ pass
+ else:
+ self._u_invoke_client('revert', '--no-backup',
directory=directory)
- status = invoke_client('resolve', '--all', directory=directory)
- raise
- if len(invoke_client('status', directory=directory)[1]) > 0:
- commit(directory, 'Merge from upstream')
-
-name = "bzr"
+ self._u_invoke_client('resolve', '--all', directory=directory)
+ raise
+ if len(self._u_invoke_client('status', directory=directory)[1]) > 0:
+ self.commit('Merge from upstream')
+
+class BzrTestCase(RCStestCase):
+ Class = Bzr
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(BzrTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py
index 079601e..6d7ab01 100644
--- a/libbe/cmdutil.py
+++ b/libbe/cmdutil.py
@@ -14,25 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import bugdir
-import plugin
-import locale
-import os
import optparse
+import os
+import locale
from textwrap import TextWrapper
from StringIO import StringIO
-import utility
+import doctest
-def unique_name(bug, bugs):
- chars = 1
- for some_bug in bugs:
- if bug.uuid == some_bug.uuid:
- continue
- while (bug.uuid[:chars] == some_bug.uuid[:chars]):
- chars+=1
- if chars < 3:
- chars = 3
- return bug.uuid[:chars]
+import bugdir
+import plugin
+import utility
class UserError(Exception):
def __init__(self, msg):
@@ -43,45 +34,6 @@ class UserErrorWrap(UserError):
UserError.__init__(self, str(exception))
self.exception = exception
-def get_bug(spec, bug_dir=None):
- matches = []
- try:
- if bug_dir is None:
- bug_dir = bugdir.tree_root('.')
- except bugdir.NoBugDir, e:
- raise UserErrorWrap(e)
- bugs = list(bug_dir.list())
- for bug in bugs:
- if bug.uuid.startswith(spec):
- matches.append(bug)
- if len(matches) > 1:
- raise UserError("More than one bug matches %s. Please be more"
- " specific." % spec)
- if len(matches) == 1:
- return matches[0]
-
- matches = []
- if len(matches) == 0:
- raise UserError("No bug matches %s" % spec)
- return matches[0]
-
-def bug_summary(bug, bugs, no_target=False, shortlist=False):
- target = bug.target
- if target is None or no_target:
- target = ""
- else:
- target = " Target: %s" % target
- if bug.assigned is None:
- assigned = ""
- else:
- assigned = " Assigned: %s" % bug.assigned
- if shortlist == False:
- return " ID: %s\n Severity: %s\n%s%s\n Creator: %s \n%s\n" % \
- (unique_name(bug, bugs), bug.severity, assigned, target,
- bug.creator, bug.summary)
- else:
- return "%4s: %s\n" % (unique_name(bug, bugs), bug.summary)
-
def iter_commands():
for name, module in plugin.iter_plugins("becommands"):
yield name.replace("_", "-"), module
@@ -104,9 +56,20 @@ def execute(cmd, args):
encoding = locale.getpreferredencoding() or 'ascii'
return get_command(cmd).execute([a.decode(encoding) for a in args])
-def help(cmd):
- return get_command(cmd).help()
-
+def help(cmd=None):
+ if cmd != None:
+ return get_command(cmd).help()
+ else:
+ cmdlist = []
+ for name, module in iter_commands():
+ cmdlist.append((name, module.__desc__))
+ longest_cmd_len = max([len(name) for name,desc in cmdlist])
+ ret = ["Bugs Everywhere - Distributed bug tracking\n",
+ "Supported commands"]
+ for name, desc in cmdlist:
+ numExtraSpaces = longest_cmd_len-len(name)
+ ret.append("be %s%*s %s" % (name, numExtraSpaces, "", desc))
+ return "\n".join(ret)
class GetHelp(Exception):
pass
@@ -119,34 +82,6 @@ class UsageError(Exception):
def raise_get_help(option, opt, value, parser):
raise GetHelp
-
-def iter_comment_name(bug, unique_name):
- """Iterate through id, comment pairs, in date order.
- (This is a user-friendly id, not the comment uuid)
- """
- def key(comment):
- return comment.date
- for num, comment in enumerate(sorted(bug.list_comments(), key=key)):
- yield ("%s:%d" % (unique_name, num+1), comment)
-
-
-def comment_from_name(bug, unique_name, name):
- """Use a comment name to look up a comment"""
- for cur_name, comment in iter_comment_name(bug, unique_name):
- if name == cur_name:
- return comment
- raise KeyError(name)
-
-
-def get_bug_and_comment(identifier, bug_dir=None):
- ids = identifier.split(':')
- bug = get_bug(ids[0], bug_dir)
- if len(ids) == 2:
- comment = comment_from_name(bug, ids[0], identifier)
- else:
- comment = None
- return bug, comment
-
class CmdOptionParser(optparse.OptionParser):
def __init__(self, usage):
@@ -163,10 +98,9 @@ class CmdOptionParser(optparse.OptionParser):
self._long_opt.iterkeys()])
def help_str(self):
- fs = utility.FileString()
- self.print_help(fs)
- return fs.str
-
+ f = StringIO()
+ self.print_help(f)
+ return f.getvalue()
def underlined(instring):
"""Produces a version of a string that is underlined with '='
@@ -178,53 +112,6 @@ def underlined(instring):
return "%s\n%s" % (instring, "="*len(instring))
-def print_threaded_comments(comments, name_map, indent=""):
- """Print a threaded display of comments"""
- tw = TextWrapper(initial_indent = indent, subsequent_indent = indent,
- width=80)
- for comment, children in comments:
- s = StringIO()
- print >> s, "--------- Comment ---------"
- print >> s, "Name: %s" % name_map[comment.uuid]
- print >> s, "From: %s" % comment.From
- print >> s, "Date: %s\n" % utility.time_to_str(comment.date)
- print >> s, comment.body.rstrip('\n')
-
- s.seek(0)
- for line in s:
- print tw.fill(line).rstrip('\n')
- print_threaded_comments(children, name_map, indent=indent+" ")
-
-
-def bug_tree(dir=None):
- """Retrieve the bug tree specified by the user. If no directory is
- specified, the current working directory is used.
-
- :param dir: The directory to search for the bug tree in.
-
- >>> bug_tree() is not None
- True
- >>> bug_tree("/")
- Traceback (most recent call last):
- UserErrorWrap: The directory "/" has no bug directory.
- """
- if dir is None:
- dir = os.getcwd()
- try:
- return bugdir.tree_root(dir)
- except bugdir.NoBugDir, e:
- raise UserErrorWrap(e)
-
-def print_command_list():
- cmdlist = []
- print """Bugs Everywhere - Distributed bug tracking
-
-Supported commands"""
- for name, module in iter_commands():
- cmdlist.append((name, module.__doc__))
- for name, desc in cmdlist:
- print "be %s\n %s" % (name, desc)
-
def _test():
import doctest
import sys
@@ -232,3 +119,5 @@ def _test():
if __name__ == "__main__":
_test()
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/comment.py b/libbe/comment.py
new file mode 100644
index 0000000..c89fd9d
--- /dev/null
+++ b/libbe/comment.py
@@ -0,0 +1,386 @@
+# Bugs Everywhere, a distributed bugtracker
+# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
+# <abentley@panoramicfeedback.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA
+import os
+import os.path
+import time
+import textwrap
+import doctest
+
+from beuuid import uuid_gen
+import mapfile
+from tree import Tree
+import utility
+
+INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!"
+
+def _list_to_root(comments, bug):
+ """
+ Convert a raw list of comments to single (dummy) root comment. We
+ use a dummy root comment, because there can be several comment
+ threads rooted on the same parent bug. To simplify comment
+ interaction, we condense these threads into a single thread with a
+ Comment dummy root.
+
+ No Comment method should use the dummy comment.
+ """
+ root_comments = []
+ uuid_map = {}
+ for comment in comments:
+ assert comment.uuid != None
+ uuid_map[comment.uuid] = comment
+ for comm in comments:
+ if comm.in_reply_to == None:
+ root_comments.append(comm)
+ else:
+ parentUUID = comm.in_reply_to
+ parent = uuid_map[parentUUID]
+ parent.add_reply(comm)
+ dummy_root = Comment(bug, uuid=INVALID_UUID)
+ dummy_root.extend(root_comments)
+ return dummy_root
+
+def loadComments(bug):
+ path = bug.get_path("comments")
+ if not os.path.isdir(path):
+ return Comment(bug, uuid=INVALID_UUID)
+ comments = []
+ for uuid in os.listdir(path):
+ if uuid.startswith('.'):
+ continue
+ comm = Comment(bug, uuid, from_disk=True)
+ comments.append(comm)
+ return _list_to_root(comments, bug)
+
+def saveComments(bug):
+ path = bug.get_path("comments")
+ bug.rcs.mkdir(path)
+ for comment in bug.comment_root.traverse():
+ comment.save()
+
+class Comment(Tree):
+ def __init__(self, bug=None, uuid=None, from_disk=False,
+ in_reply_to=None, body=None):
+ """
+ Set from_disk=True to load an old bug.
+ Set from_disk=False to create a new bug.
+
+ The uuid option is required when from_disk==True.
+
+ The in_reply_to and body options are only used if
+ from_disk==False (the default). When from_disk==True, they are
+ loaded from the bug database.
+
+ in_reply_to should be the uuid string of the parent comment.
+ """
+ Tree.__init__(self)
+ self.bug = bug
+ if bug != None:
+ self.rcs = bug.rcs
+ else:
+ self.rcs = None
+ if from_disk == True:
+ self.uuid = uuid
+ self.load()
+ else:
+ if uuid != None:
+ self.uuid = uuid
+ else:
+ self.uuid = uuid_gen()
+ self.time = time.time()
+ if self.rcs != None:
+ self.From = self.rcs.get_user_id()
+ else:
+ self.From = None
+ self.in_reply_to = in_reply_to
+ self.content_type = "text/plain"
+ self.body = body
+
+ def traverse(self, *args, **kwargs):
+ """Avoid working with the possible dummy root comment"""
+ for comment in Tree.traverse(self, *args, **kwargs):
+ if comment.uuid == INVALID_UUID:
+ continue
+ yield comment
+
+ def _clean_string(self, value):
+ """
+ >>> comm = Comment()
+ >>> comm._clean_string(None)
+ ''
+ >>> comm._clean_string("abc")
+ 'abc'
+ """
+ if value == None:
+ return ""
+ return value
+
+ def string(self, indent=0, shortname=None):
+ """
+ >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n")
+ >>> comm.time = utility.str_to_time("Thu, 20 Nov 2008 15:55:11 +0000")
+ >>> print comm.string(indent=2, shortname="com-1")
+ --------- Comment ---------
+ Name: com-1
+ From:
+ Date: Thu, 20 Nov 2008 15:55:11 +0000
+ <BLANKLINE>
+ Some
+ insightful
+ remarks
+ """
+ if shortname == None:
+ shortname = self.uuid
+ lines = []
+ lines.append("--------- Comment ---------")
+ lines.append("Name: %s" % shortname)
+ lines.append("From: %s" % self._clean_string(self.From))
+ lines.append("Date: %s" % utility.time_to_str(self.time))
+ lines.append("")
+ #lines.append(textwrap.fill(self._clean_string(self.body),
+ # width=(79-indent)))
+ lines.extend(self._clean_string(self.body).splitlines())
+ # some comments shouldn't be wrapped...
+
+ istring = ' '*indent
+ sep = '\n' + istring
+ return istring + sep.join(lines).rstrip('\n')
+
+ def __str__(self):
+ """
+ >>> comm = Comment(bug=None, body="Some insightful remarks")
+ >>> comm.uuid = "com-1"
+ >>> comm.time = utility.str_to_time("Thu, 20 Nov 2008 15:55:11 +0000")
+ >>> comm.From = "Jane Doe <jdoe@example.com>"
+ >>> print comm
+ --------- Comment ---------
+ Name: com-1
+ From: Jane Doe <jdoe@example.com>
+ Date: Thu, 20 Nov 2008 15:55:11 +0000
+ <BLANKLINE>
+ Some insightful remarks
+ """
+ return self.string()
+
+ def get_path(self, name=None):
+ my_dir = os.path.join(self.bug.get_path("comments"), self.uuid)
+ if name is None:
+ return my_dir
+ assert name in ["values", "body"]
+ return os.path.join(my_dir, name)
+
+ def load(self):
+ map = mapfile.map_load(self.rcs, self.get_path("values"))
+ self.time = utility.str_to_time(map["Date"])
+ self.From = map["From"]
+ self.in_reply_to = map.get("In-reply-to")
+ self.content_type = map.get("Content-type", "text/plain")
+ self.body = self.rcs.get_file_contents(self.get_path("body"))
+
+ def save(self):
+ assert self.rcs != None
+ map_file = {"Date": utility.time_to_str(self.time)}
+ self._add_headers(map_file, ("From", "in_reply_to", "content_type"))
+ self.rcs.mkdir(self.get_path())
+ mapfile.map_save(self.rcs, self.get_path("values"), map_file)
+ self.rcs.set_file_contents(self.get_path("body"), self.body)
+
+ def _add_headers(self, map, names):
+ map_names = {}
+ for name in names:
+ map_names[name] = self._pyname_to_header(name)
+ self._add_attrs(map, map_names)
+
+ def _pyname_to_header(self, name):
+ return name.capitalize().replace('_', '-')
+
+ def _add_attrs(self, map, map_names):
+ for name in map_names.keys():
+ value = getattr(self, name)
+ if value is not None:
+ map[map_names[name]] = value
+
+ def remove(self):
+ for comment in self.traverse():
+ path = comment.get_path()
+ self.rcs.recursive_remove(path)
+
+ def add_reply(self, reply):
+ if reply.time != None and self.time != None:
+ assert reply.time >= self.time
+ if self.uuid != INVALID_UUID:
+ reply.in_reply_to = self.uuid
+ self.append(reply)
+
+ def new_reply(self, body=None):
+ """
+ >>> comm = Comment(bug=None, body="Some insightful remarks")
+ >>> repA = comm.new_reply("Critique original comment")
+ >>> repB = repA.new_reply("Begin flamewar :p")
+ """
+ reply = Comment(self.bug, body=body)
+ self.add_reply(reply)
+ return reply
+
+ def string_thread(self, name_map={}, indent=0,
+ auto_name_map=False, bug_shortname=None):
+ """
+ Return a sting displaying a thread of comments.
+ bug_shortname is only used if auto_name_map == True.
+
+ >>> a = Comment(bug=None, uuid="a", body="Insightful remarks")
+ >>> a.time = utility.str_to_time("Thu, 20 Nov 2008 01:00:00 +0000")
+ >>> b = a.new_reply("Critique original comment")
+ >>> b.uuid = "b"
+ >>> b.time = utility.str_to_time("Thu, 20 Nov 2008 02:00:00 +0000")
+ >>> c = b.new_reply("Begin flamewar :p")
+ >>> c.uuid = "c"
+ >>> c.time = utility.str_to_time("Thu, 20 Nov 2008 03:00:00 +0000")
+ >>> d = a.new_reply("Useful examples")
+ >>> d.uuid = "d"
+ >>> d.time = utility.str_to_time("Thu, 20 Nov 2008 04:00:00 +0000")
+ >>> a.sort(key=lambda comm : comm.time)
+ >>> print a.string_thread()
+ --------- Comment ---------
+ Name: a
+ From:
+ Date: Thu, 20 Nov 2008 01:00:00 +0000
+ <BLANKLINE>
+ Insightful remarks
+ --------- Comment ---------
+ Name: b
+ From:
+ Date: Thu, 20 Nov 2008 02:00:00 +0000
+ <BLANKLINE>
+ Critique original comment
+ --------- Comment ---------
+ Name: c
+ From:
+ Date: Thu, 20 Nov 2008 03:00:00 +0000
+ <BLANKLINE>
+ Begin flamewar :p
+ --------- Comment ---------
+ Name: d
+ From:
+ Date: Thu, 20 Nov 2008 04:00:00 +0000
+ <BLANKLINE>
+ Useful examples
+ >>> print a.string_thread(auto_name_map=True, bug_shortname="bug-1")
+ --------- Comment ---------
+ Name: bug-1:1
+ From:
+ Date: Thu, 20 Nov 2008 01:00:00 +0000
+ <BLANKLINE>
+ Insightful remarks
+ --------- Comment ---------
+ Name: bug-1:2
+ From:
+ Date: Thu, 20 Nov 2008 02:00:00 +0000
+ <BLANKLINE>
+ Critique original comment
+ --------- Comment ---------
+ Name: bug-1:3
+ From:
+ Date: Thu, 20 Nov 2008 03:00:00 +0000
+ <BLANKLINE>
+ Begin flamewar :p
+ --------- Comment ---------
+ Name: bug-1:4
+ From:
+ Date: Thu, 20 Nov 2008 04:00:00 +0000
+ <BLANKLINE>
+ Useful examples
+ """
+ if auto_name_map == True:
+ name_map = {}
+ for shortname,comment in self.comment_shortnames(bug_shortname):
+ name_map[comment.uuid] = shortname
+ stringlist = []
+ for depth,comment in self.thread(flatten=True):
+ ind = 2*depth+indent
+ if comment.uuid in name_map:
+ sname = name_map[comment.uuid]
+ else:
+ sname = None
+ stringlist.append(comment.string(indent=ind, shortname=sname))
+ return '\n'.join(stringlist)
+
+ def comment_shortnames(self, bug_shortname=""):
+ """
+ Iterate through (id, comment) pairs, in time order.
+ (This is a user-friendly id, not the comment uuid).
+
+ SIDE-EFFECT : will sort the comment tree by comment.time
+
+ >>> a = Comment(bug=None, uuid="a")
+ >>> b = a.new_reply()
+ >>> b.uuid = "b"
+ >>> c = b.new_reply()
+ >>> c.uuid = "c"
+ >>> d = a.new_reply()
+ >>> d.uuid = "d"
+ >>> for id,name in a.comment_shortnames("bug-1"):
+ ... print id, name.uuid
+ bug-1:1 a
+ bug-1:2 b
+ bug-1:3 c
+ bug-1:4 d
+ """
+ self.sort(key=lambda comm : comm.time)
+ for num,comment in enumerate(self.traverse()):
+ yield ("%s:%d" % (bug_shortname, num+1), comment)
+
+ def comment_from_shortname(self, comment_shortname, *args, **kwargs):
+ """
+ Use a comment shortname to look up a comment.
+ >>> a = Comment(bug=None, uuid="a")
+ >>> b = a.new_reply()
+ >>> b.uuid = "b"
+ >>> c = b.new_reply()
+ >>> c.uuid = "c"
+ >>> d = a.new_reply()
+ >>> d.uuid = "d"
+ >>> comm = a.comment_from_shortname("bug-1:3", bug_shortname="bug-1")
+ >>> id(comm) == id(c)
+ True
+ """
+ for cur_name, comment in self.comment_shortnames(*args, **kwargs):
+ if comment_shortname == cur_name:
+ return comment
+ raise KeyError(comment_shortname)
+
+ def comment_from_uuid(self, uuid):
+ """
+ Use a comment shortname to look up a comment.
+ >>> a = Comment(bug=None, uuid="a")
+ >>> b = a.new_reply()
+ >>> b.uuid = "b"
+ >>> c = b.new_reply()
+ >>> c.uuid = "c"
+ >>> d = a.new_reply()
+ >>> d.uuid = "d"
+ >>> comm = a.comment_from_uuid("d")
+ >>> id(comm) == id(d)
+ True
+ """
+ for comment in self.traverse():
+ if comment.uuid == uuid:
+ return comment
+ raise KeyError(uuid)
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/config.py b/libbe/config.py
index ecc40ce..79c0d6f 100644
--- a/libbe/config.py
+++ b/libbe/config.py
@@ -16,6 +16,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import ConfigParser
import os.path
+import doctest
+
def path():
"""Return the path to the per-user config file"""
return os.path.expanduser("~/.bugs_everywhere")
@@ -58,3 +60,5 @@ def get_val(name, section="DEFAULT"):
return config.get(section, name)
except ConfigParser.NoOptionError:
return None
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/diff.py b/libbe/diff.py
index c1dc429..86a91ca 100644
--- a/libbe/diff.py
+++ b/libbe/diff.py
@@ -17,60 +17,60 @@
"""Compare two bug trees"""
from libbe import cmdutil, bugdir
from libbe.utility import time_to_str
+from libbe.bug import cmp_severity
+import doctest
-def diff(old_tree, new_tree):
- old_bug_map = old_tree.bug_map()
- new_bug_map = new_tree.bug_map()
+def diff(old_bugdir, new_bugdir):
added = []
removed = []
modified = []
- for old_bug in old_bug_map.itervalues():
- new_bug = new_bug_map.get(old_bug.uuid)
- if new_bug is None :
- removed.append(old_bug)
- else:
+ for uuid in old_bugdir.list_uuids():
+ old_bug = old_bugdir.bug_from_uuid(uuid)
+ try:
+ new_bug = new_bugdir.bug_from_uuid(uuid)
if old_bug != new_bug:
modified.append((old_bug, new_bug))
- for new_bug in new_bug_map.itervalues():
- if not old_bug_map.has_key(new_bug.uuid):
+ except KeyError:
+ removed.append(old_bug)
+ for uuid in new_bugdir.list_uuids():
+ if not old_bugdir.has_bug(uuid):
+ new_bug = new_bugdir.bug_from_uuid(uuid)
added.append(new_bug)
return (removed, modified, added)
-
-def reference_diff(bugdir, spec=None):
- return diff(bugdir.get_reference_bugdir(spec), bugdir)
-
def diff_report(diff_data, bug_dir):
(removed, modified, added) = diff_data
- bugs = list(bug_dir.list())
def modified_cmp(left, right):
- return bugdir.cmp_severity(left[1], right[1])
+ return cmp_severity(left[1], right[1])
- added.sort(bugdir.cmp_severity)
- removed.sort(bugdir.cmp_severity)
+ added.sort(cmp_severity)
+ removed.sort(cmp_severity)
modified.sort(modified_cmp)
- if len(added) > 0:
+ if len(added) > 0:
print "New bug reports:"
for bug in added:
- print cmdutil.bug_summary(bug, bugs, no_target=True)
+ print bug.string(shortlist=True)
+ print ""
if len(modified) > 0:
printed = False
for old_bug, new_bug in modified:
- change_str = bug_changes(old_bug, new_bug, bugs)
+ change_str = bug_changes(old_bug, new_bug, bug_dir)
if change_str is None:
continue
if not printed:
printed = True
print "Modified bug reports:"
print change_str
+ print ""
if len(removed) > 0:
print "Removed bug reports:"
for bug in removed:
- print cmdutil.bug_summary(bug, bugs, no_target=True)
-
+ print bug.string(shortlist=True)
+ print ""
+
def change_lines(old, new, attributes):
change_list = []
for attr in attributes:
@@ -87,24 +87,27 @@ def bug_changes(old, new, bugs):
change_list = change_lines(old, new, ("time", "creator", "severity",
"target", "summary", "status", "assigned"))
- old_comment_ids = list(old.iter_comment_ids())
- new_comment_ids = list(new.iter_comment_ids())
+ old_comment_ids = [c.uuid for c in old.comments()]
+ new_comment_ids = [c.uuid for c in new.comments()]
change_strings = ["%s: %s -> %s" % f for f in change_list]
for comment_id in new_comment_ids:
if comment_id not in old_comment_ids:
- summary = comment_summary(new.get_comment(comment_id), "new")
+ summary = comment_summary(new.comment_from_uuid(comment_id), "new")
change_strings.append(summary)
for comment_id in old_comment_ids:
if comment_id not in new_comment_ids:
- summary = comment_summary(new.get_comment(comment_id), "removed")
+ summary = comment_summary(new.comment_from_uuid(comment_id),
+ "removed")
change_strings.append(summary)
if len(change_strings) == 0:
return None
- return "%s%s\n" % (cmdutil.bug_summary(new, bugs, shortlist=True),
- "\n".join(change_strings))
+ return "%s\n %s" % (new.string(shortlist=True),
+ " \n".join(change_strings))
def comment_summary(comment, status):
return "%8s comment from %s on %s" % (status, comment.From,
- time_to_str(comment.date))
+ time_to_str(comment.time))
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/git.py b/libbe/git.py
index 398585f..046e72e 100644
--- a/libbe/git.py
+++ b/libbe/git.py
@@ -14,133 +14,86 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import tempfile
-
-from rcs import invoke
-
-def strip_git(filename):
- # Find the base path of the GIT tree, in order to strip that leading
- # path from arguments to git -- it doesn't like absolute paths.
- if os.path.isabs(filename):
- filename = filename[len(git_repo_for_path('.'))+1:]
- return filename
-
-def invoke_client(*args, **kwargs):
- directory = kwargs['directory']
- expect = kwargs.get('expect', (0, 1))
- cl_args = ["git"]
- cl_args.extend(args)
- status,output,error = invoke(cl_args, expect, cwd=directory)
- return status, output
-
-def add_id(filename, paranoid=False):
- filename = strip_git(filename)
- invoke_client("add", filename, directory=git_repo_for_path('.'))
-
-def delete_id(filename):
- filename = strip_git(filename)
- invoke_client("rm", filename, directory=git_repo_for_path('.'))
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def detect(path):
- """Detect whether a directory is revision-controlled using GIT"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, ".git")):
+import re
+import unittest
+import doctest
+
+from rcs import RCS, RCStestCase, CommandError
+
+def new():
+ return Git()
+
+class Git(RCS):
+ name="git"
+ client="git"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".git") != None :
return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.dirname(path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- if body is not None:
- summary += '\n' + body
- descriptor, filename = tempfile.mkstemp()
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.close()
- invoke_client('commit', '-a', '-F', filename, directory=directory)
- finally:
- os.unlink(filename)
-
-def postcommit(directory):
- pass
-
-
-# In order to diff the bug database, you need a way to check out arbitrary
-# previous revisions and a mechanism for locating the bug_dir in the revision
-# you've checked out.
-#
-# Copying the Mercurial implementation, this feature is implemented by four
-# functions:
-#
-# git_dir_for_path : find '.git' for a git tree.
-#
-# export : check out a commit 'spec' from git-repo 'bug_dir' into a dir
-# 'revision_dir'
-#
-# find_or_make_export : check out a commit 'spec' from git repo 'directory' to
-# any location you please and return the path to the checkout
-#
-# path_in_reference : return a path to the bug_dir of the commit 'spec'
-
-def git_repo_for_path(path):
- """Find the root of the deepest repository containing path."""
- # Assume that nothing funny is going on; in particular, that we aren't
- # dealing with a bare repo.
- return os.path.dirname(git_dir_for_path(path))
-
-def git_dir_for_path(path):
- """Find the git-dir of the deepest repo containing path."""
- return invoke_client("rev-parse", "--git-dir", directory=path)[1].rstrip()
-
-def export(spec, bug_dir, revision_dir):
- """Check out commit 'spec' from the git repo containing bug_dir into
- 'revision_dir'."""
- if not os.path.exists(revision_dir):
- os.makedirs(revision_dir)
- invoke_client("init", directory=revision_dir)
- invoke_client("pull", git_dir_for_path(bug_dir), directory=revision_dir)
- invoke_client("checkout", '-f', spec, directory=revision_dir)
-
-def find_or_make_export(spec, directory):
- """Checkout 'spec' from the repo at 'directory' by hook or by crook and
- return the path to the working copy."""
- home = os.path.expanduser("~")
- revision_root = os.path.join(home, ".be_revs")
- if not os.path.exists(revision_root):
- os.mkdir(revision_root)
- revision_dir = os.path.join(revision_root, spec)
- if not os.path.exists(revision_dir):
- export(spec, directory, revision_dir)
- return revision_dir
-
-def path_in_reference(bug_dir, spec):
- """Check out 'spec' and return the path to its bug_dir."""
- spec = spec or 'HEAD'
- spec = invoke_client('rev-parse', spec, directory=bug_dir)[1].rstrip()
- # This is a really hairy computation.
- # The theory is that we can't possibly be working out of a bare repo;
- # hence, we get the rel_bug_dir by chopping off dirname(git_dir_for_path(bug_dir))
- # + '/'.
- rel_bug_dir = strip_git(bug_dir)
- export_root = find_or_make_export(spec, directory=bug_dir)
- return os.path.join(export_root, rel_bug_dir)
-
-
-name = "git"
-
+ return False
+ def _rcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ status,output,error = self._u_invoke_client("rev-parse", "--git-dir",
+ directory=path)
+ gitdir = os.path.join(path, output.rstrip('\n'))
+ dirname = os.path.abspath(os.path.dirname(gitdir))
+ return dirname
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("config", "user.name")
+ name = output.rstrip('\n')
+ status,output,error = self._u_invoke_client("config", "user.email")
+ email = output.rstrip('\n')
+ return self._u_create_id(name, email)
+ def _rcs_set_user_id(self, value):
+ name,email = self._u_parse_id(value)
+ if email != None:
+ self._u_invoke_client("config", "user.email", email)
+ self._u_invoke_client("config", "user.name", name)
+ def _rcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ self._u_invoke_client("rm", "-f", path)
+ def _rcs_update(self, path):
+ self._rcs_add(path)
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(self._u_abspath(path), "rb").read()
+ else:
+ arg = "%s:%s" % (revision,path)
+ status,output,error = self._u_invoke_client("show", arg)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision==None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ #self._u_invoke_client("archive", revision, directory) # makes tarball
+ self._u_invoke_client("clone", "--no-checkout",".",directory)
+ self._u_invoke_client("checkout", revision, directory=directory)
+ def _rcs_commit(self, commitfile):
+ status,output,error = self._u_invoke_client('commit', '-a',
+ '-F', commitfile)
+ revision = None
+ revline = re.compile("Created (.*)commit (.*):(.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 3
+ revision = match.groups()[1]
+ return revision
+
+class GitTestCase(RCStestCase):
+ Class = Git
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(GitTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/hg.py b/libbe/hg.py
index 35de8e0..27cbb79 100644
--- a/libbe/hg.py
+++ b/libbe/hg.py
@@ -14,102 +14,73 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import tempfile
-
-import config
-from rcs import invoke, CommandError
-
-def invoke_client(*args, **kwargs):
- directory = kwargs['directory']
- expect = kwargs.get('expect', (0, 1))
- cl_args = ["hg"]
- cl_args.extend(args)
- status,output,error = invoke(cl_args, expect, cwd=directory)
- return status, output
-
-def add_id(filename, paranoid=False):
- invoke_client("add", filename, directory='.')
-
-def delete_id(filename):
- invoke_client("rm", filename, directory='.')
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def lookup_revision(revno, directory):
- return invoke_client('log', '--rev', str(revno), '--template={node}',
- directory=directory)[1].rstrip('\n')
-
-def export(revno, directory, revision_dir):
- invoke_client("archive", "--rev", str(revno), revision_dir,
- directory=directory)
-
-def find_or_make_export(revno, directory):
- revision_id = lookup_revision(revno, directory)
- home = os.path.expanduser("~")
- revision_root = os.path.join(home, ".be_revs")
- if not os.path.exists(revision_root):
- os.mkdir(revision_root)
- revision_dir = os.path.join(revision_root, revision_id)
- if not os.path.exists(revision_dir):
- export(revno, directory, revision_dir)
- return revision_dir
-
-def hg_root(path):
- return invoke_client("root", "-R", path, directory=None)[1].rstrip('\r')
-
-def path_in_reference(bug_dir, spec):
- if spec is None:
- spec = int(invoke_client('tip', '--template="{rev}"',
- directory=bug_dir)[1])
- rel_bug_dir = bug_dir[len(hg_root(bug_dir)):]
- export_root = find_or_make_export(spec, directory=bug_dir)
- return os.path.join(export_root, rel_bug_dir)
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using Mercurial"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, ".hg")):
+import re
+import unittest
+import doctest
+
+from rcs import RCS, RCStestCase, CommandError, SettingIDnotSupported
+
+def new():
+ return Hg()
+
+class Hg(RCS):
+ name="hg"
+ client="hg"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Mercurial"""
+ if self._u_search_parent_directories(path, ".hg") != None:
return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.dirname(path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- if body is not None:
- summary += '\n' + body
- descriptor, filename = tempfile.mkstemp()
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.close()
- invoke_client('commit', '--logfile', filename, directory=directory)
- finally:
- os.unlink(filename)
-
-def postcommit(directory):
- pass
-
-name = "hg"
+ return False
+ def _rcs_root(self, path):
+ status,output,error = self._u_invoke_client("root", directory=path)
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("showconfig","ui.username")
+ return output.rstrip('\n')
+ def _rcs_set_user_id(self, value):
+ """
+ Supported by the Config Extension, but that is not part of
+ standard Mercurial.
+ http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
+ """
+ raise SettingIDnotSupported
+ def _rcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ self._u_invoke_client("rm", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(os.path.join(self.rootdir, path), "rb").read()
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("archive", "--rev", revision, directory)
+ def _rcs_commit(self, commitfile):
+ self._u_invoke_client('commit', '--logfile', commitfile)
+ status,output,error = self._u_invoke_client('identify')
+ revision = None
+ revline = re.compile("(.*) tip")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+
+class HgTestCase(RCStestCase):
+ Class = Hg
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(HgTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/mapfile.py b/libbe/mapfile.py
index 6a304fd..559d713 100644
--- a/libbe/mapfile.py
+++ b/libbe/mapfile.py
@@ -14,7 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import os.path
+import errno
import utility
+import doctest
+
class IllegalKey(Exception):
def __init__(self, key):
Exception.__init__(self, 'Illegal key "%s"' % key)
@@ -25,28 +29,27 @@ class IllegalValue(Exception):
Exception.__init__(self, 'Illegal value "%s"' % value)
self.value = value
-def generate(f, map, context=3):
- """Generate a format-2 mapfile. This is a simpler format, but should merge
- better, because there's no chance of confusion for appends, and lines
- are unique for both key and value.
+def generate(map, context=3):
+ """Generate a format-2 mapfile content string. This is a simpler
+ format, but should merge better, because there's no chance of
+ confusion for appends, and lines are unique for both key and
+ value.
- >>> f = utility.FileString()
- >>> generate(f, {"q":"p"})
- >>> f.str
+ >>> generate({"q":"p"})
'\\n\\n\\nq=p\\n\\n\\n\\n'
- >>> generate(f, {"q=":"p"})
+ >>> generate({"q=":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key "q="
- >>> generate(f, {"q\\n":"p"})
+ >>> generate({"q\\n":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key "q\\n"
- >>> generate(f, {"":"p"})
+ >>> generate({"":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key ""
- >>> generate(f, {">q":"p"})
+ >>> generate({">q":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key ">q"
- >>> generate(f, {"q":"p\\n"})
+ >>> generate({"q":"p\\n"})
Traceback (most recent call last):
IllegalValue: Illegal value "p\\n"
"""
@@ -64,107 +67,48 @@ def generate(f, map, context=3):
if "\n" in map[key]:
raise IllegalValue(map[key].encode('string_escape'))
+ lines = []
for key in keys:
for i in range(context):
- f.write("\n")
- f.write("%s=%s\n" % (key.encode("utf-8"), map[key].encode("utf-8")))
+ lines.append("")
+ lines.append("%s=%s" % (key, map[key]))
for i in range(context):
- f.write("\n")
+ lines.append("")
+ return '\n'.join(lines) + '\n'
-def parse(f):
+def parse(contents):
"""
- Parse a format-2 mapfile.
+ Parse a format-2 mapfile string.
>>> parse('\\n\\n\\nq=p\\n\\n\\n\\n')['q']
- u'p'
+ 'p'
>>> parse('\\n\\nq=\\'p\\'\\n\\n\\n\\n')['q']
- u"\'p\'"
- >>> f = utility.FileString()
- >>> generate(f, {"a":"b", "c":"d", "e":"f"})
- >>> dict = parse(f)
+ "\'p\'"
+ >>> contents = generate({"a":"b", "c":"d", "e":"f"})
+ >>> dict = parse(contents)
>>> dict["a"]
- u'b'
+ 'b'
>>> dict["c"]
- u'd'
+ 'd'
>>> dict["e"]
- u'f'
+ 'f'
"""
- f = utility.get_file(f)
result = {}
- for line in f:
+ for line in contents.splitlines():
line = line.rstrip('\n')
if len(line) == 0:
continue
- name,value = [f.decode('utf-8') for f in line.split('=', 1)]
- assert not result.has_key('name')
+ name,value = [field for field in line.split('=', 1)]
+ assert not result.has_key(name)
result[name] = value
return result
+def map_save(rcs, path, map, allow_no_rcs=False):
+ """Save the map as a mapfile to the specified path"""
+ contents = generate(map)
+ rcs.set_file_contents(path, contents, allow_no_rcs)
-def split_diff3(this, other, f):
- """Split a file or string with diff3 conflicts into two files.
-
- :param this: The THIS file to write. May be a utility.FileString
- :param other: The OTHER file to write. May be a utility.FileString
- :param f: The file or string to split.
- :return: True if there were conflicts
-
- >>> split_diff3(utility.FileString(), utility.FileString(),
- ... "a\\nb\\nc\\nd\\n")
- False
- >>> this = utility.FileString()
- >>> other = utility.FileString()
- >>> split_diff3(this, other, "<<<<<<< values1\\nstatus=closed\\n=======\\nstatus=closedd\\n>>>>>>> values2\\n")
- True
- >>> this.str
- 'status=closed\\n'
- >>> other.str
- 'status=closedd\\n'
- """
- f = utility.get_file(f)
- this_active = True
- other_active = True
- conflicts = False
- for line in f:
- if line.startswith("<<<<<<<"):
- conflicts = True
- this_active = True
- other_active = False
- elif line.startswith("======="):
- this_active = False
- other_active = True
- elif line.startswith(">>>>>>>"):
- this_active = True
- other_active = True
- else:
- if this_active:
- this.write(line)
- if other_active:
- other.write(line)
- return conflicts
-
-def split_diff3_str(f):
- """Split a file/string with diff3 conflicts into two strings. If there
- were no conflicts, one string is returned.
+def map_load(rcs, path, allow_no_rcs=False):
+ contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs)
+ return parse(contents)
- >>> result = split_diff3_str("<<<<<<< values1\\nstatus=closed\\n=======\\nstatus=closedd\\n>>>>>>> values2\\n")
- >>> len(result)
- 2
- >>> result[0] != result[1]
- True
- >>> result = split_diff3_str("<<<<<<< values1\\nstatus=closed\\n=======\\nstatus=closed\\n>>>>>>> values2\\n")
- >>> len(result)
- 2
- >>> result[0] == result[1]
- True
- >>> result = split_diff3_str("a\\nb\\nc\\nd\\n")
- >>> len(result)
- 1
- >>> result[0]
- 'a\\nb\\nc\\nd\\n'
- """
- this = utility.FileString()
- other = utility.FileString()
- if split_diff3(this, other, f):
- return (this.str, other.str)
- else:
- return (this.str,)
+suite = doctest.DocTestSuite()
diff --git a/libbe/names.py b/libbe/names.py
deleted file mode 100644
index d2e077a..0000000
--- a/libbe/names.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
-# <abentley@panoramicfeedback.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-import os
-import sys
-
-
-def uuid():
- # this code borrowed from standard commands module
- # but adapted to win32
- pipe = os.popen('uuidgen', 'r')
- text = pipe.read()
- sts = pipe.close()
- if sts not in (0, None):
- raise "Failed to run uuidgen"
- if text[-1:] == '\n': text = text[:-1]
- return text
-
-def creator():
- if sys.platform != "win32":
- return os.environ["LOGNAME"]
- else:
- return os.environ["USERNAME"]
diff --git a/libbe/no_rcs.py b/libbe/no_rcs.py
deleted file mode 100644
index 1b3b005..0000000
--- a/libbe/no_rcs.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
-# <abentley@panoramicfeedback.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import os
-import config
-from os import unlink
-
-def add_id(filename, paranoid=False):
- """Compatibility function"""
- pass
-
-def delete_id(filename):
- """Compatibility function"""
- pass
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def detect(path):
- """Compatibility function"""
- return True
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- pass
-
-def postcommit(directory):
- pass
-
-name = "None"
diff --git a/libbe/plugin.py b/libbe/plugin.py
index 4016ca1..0964fba 100644
--- a/libbe/plugin.py
+++ b/libbe/plugin.py
@@ -17,6 +17,8 @@
import os
import os.path
import sys
+import doctest
+
def my_import(mod_name):
module = __import__(mod_name)
components = mod_name.split('.')
@@ -34,6 +36,8 @@ def iter_plugins(prefix):
modfiles = os.listdir(os.path.join(plugin_path, prefix))
modfiles.sort()
for modfile in modfiles:
+ if modfile.startswith('.'):
+ continue # the occasional emacs temporary file
if modfile.endswith(".py") and modfile != "__init__.py":
yield modfile[:-3], my_import(prefix+"."+modfile[:-3])
@@ -55,6 +59,9 @@ def get_plugin(prefix, name):
plugin_path = os.path.realpath(os.path.dirname(os.path.dirname(__file__)))
if plugin_path not in sys.path:
sys.path.append(plugin_path)
+
+suite = doctest.DocTestSuite()
+
def _test():
import doctest
doctest.testmod()
diff --git a/libbe/rcs.py b/libbe/rcs.py
index 4487fba..3519c3d 100644
--- a/libbe/rcs.py
+++ b/libbe/rcs.py
@@ -15,42 +15,45 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from subprocess import Popen, PIPE
+import os
+import os.path
+from socket import gethostname
+import re
import sys
+import tempfile
+import shutil
+import unittest
+import doctest
-def rcs_by_name(rcs_name):
- """Return the module for the RCS with the given name"""
- if rcs_name == "Arch":
- import arch
- return arch
- elif rcs_name == "bzr":
- import bzr
- return bzr
- elif rcs_name == "hg":
- import hg
- return hg
- elif rcs_name == "git":
- import git
- return git
- elif rcs_name == "None":
- import no_rcs
- return no_rcs
-
-def detect(dir):
- """Return the module for the rcs being used in this directory"""
+from utility import Dir, search_parent_directories
+
+
+def _get_matching_rcs(matchfn):
+ """Return the first module for which matchfn(RCS_instance) is true"""
import arch
import bzr
import hg
import git
- if arch.detect(dir):
- return arch
- elif bzr.detect(dir):
- return bzr
- elif hg.detect(dir):
- return hg
- elif git.detect(dir):
- return git
- import no_rcs
- return no_rcs
+ for module in [arch, bzr, hg, git]:
+ rcs = module.new()
+ if matchfn(rcs) == True:
+ return rcs
+ else:
+ del(rcs)
+ return RCS()
+
+def rcs_by_name(rcs_name):
+ """Return the module for the RCS with the given name"""
+ return _get_matching_rcs(lambda rcs: rcs.name == rcs_name)
+
+def detect_rcs(dir):
+ """Return an RCS instance for the rcs being used in this directory"""
+ return _get_matching_rcs(lambda rcs: rcs.detect(dir))
+
+def installed_rcs():
+ """Return an instance of an installed RCS"""
+ return _get_matching_rcs(lambda rcs: rcs.installed())
+
class CommandError(Exception):
def __init__(self, err_str, status):
@@ -58,19 +61,579 @@ class CommandError(Exception):
self.err_str = err_str
self.status = status
-def invoke(args, expect=(0,), cwd=None):
- try :
- if sys.platform != "win32":
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
+class SettingIDnotSupported(NotImplementedError):
+ pass
+
+class RCSnotRooted(Exception):
+ def __init__(self):
+ msg = "RCS not rooted"
+ Exception.__init__(self, msg)
+
+class PathNotInRoot(Exception):
+ def __init__(self, path, root):
+ msg = "Path '%s' not in root '%s'" % (path, root)
+ Exception.__init__(self, msg)
+ self.path = path
+ self.root = root
+
+class NoSuchFile(Exception):
+ def __init__(self, pathname):
+ Exception.__init__(self, "No such file: %s" % pathname)
+
+
+def new():
+ return RCS()
+
+class RCS(object):
+ """
+ This class implements a 'no-rcs' interface.
+
+ Support for other RCSs can be added by subclassing this class, and
+ overriding methods _rcs_*() with code appropriate for your RCS.
+
+ The methods _u_*() are utility methods available to the _rcs_*()
+ methods.
+ """
+ name = "None"
+ client = "" # command-line tool for _u_invoke_client
+ versioned = False
+ def __init__(self, paranoid=False):
+ self.paranoid = paranoid
+ self.verboseInvoke = False
+ self.rootdir = None
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ def __del__(self):
+ self.cleanup()
+
+ def _rcs_help(self):
+ """
+ Return the command help string.
+ (Allows a simple test to see if the client is installed.)
+ """
+ pass
+ def _rcs_detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this RCS.
+ """
+ return True
+ def _rcs_root(self, path):
+ """
+ Get the RCS root. This is the default working directory for
+ future invocations. You would normally set this to the root
+ directory for your RCS.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ if path == "":
+ path = os.path.abspath(".")
+ return path
+ def _rcs_init(self, path):
+ """
+ Begin versioning the tree based at path.
+ """
+ pass
+ def _rcs_cleanup(self):
+ """
+ Remove any cruft that _rcs_init() created outside of the
+ versioned tree.
+ """
+ pass
+ def _rcs_get_user_id(self):
+ """
+ Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the RCS has not been configured with a username, return None.
+ """
+ return None
+ def _rcs_set_user_id(self, value):
+ """
+ Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the RCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ raise SettingIDnotSupported
+ def _rcs_add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ pass
+ def _rcs_remove(self, path):
+ """
+ Remove the file at path from version control. Optionally
+ remove the file from the filesystem as well.
+ """
+ pass
+ def _rcs_update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ """
+ Get the file contents as they were in a given revision. Don't
+ worry about decoding the contents, the RCS.get_file_contents()
+ method will handle that.
+
+ Revision==None specifies the current revision.
+ """
+ assert revision == None, \
+ "The %s RCS does not support revision specifiers" % self.name
+ return file(os.path.join(self.rootdir, path), "rb").read()
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ dir specifies a directory to create the duplicate in.
+ """
+ shutil.copytree(self.rootdir, directory, True)
+ def _rcs_commit(self, commitfile):
+ """
+ Commit the current working directory, using the contents of
+ commitfile as the comment. Return the name of the old
+ revision.
+ """
+ return None
+ def installed(self):
+ try:
+ self._rcs_help()
+ return True
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ return False
+ raise e
+ def detect(self, path="."):
+ """
+ Detect whether a directory is revision controlled with this RCS.
+ """
+ return self._rcs_detect(path)
+ def root(self, path):
+ """
+ Set the root directory to the path's RCS root. This is the
+ default working directory for future invocations.
+ """
+ self.rootdir = self._rcs_root(path)
+ def init(self, path):
+ """
+ Begin versioning the tree based at path.
+ Also roots the rcs at path.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ self._rcs_init(path)
+ self.root(path)
+ def cleanup(self):
+ self._rcs_cleanup()
+ def get_user_id(self):
+ """
+ Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the RCS has not been configured with a username, return the user's
+ id. You can override the automatic lookup procedure by setting the
+ RCS.user_id attribute to a string of your choice.
+ """
+ if hasattr(self, "user_id"):
+ if self.user_id != None:
+ return self.user_id
+ id = self._rcs_get_user_id()
+ if id == None:
+ name = self._u_get_fallback_username()
+ email = self._u_get_fallback_email()
+ id = self._u_create_id(name, email)
+ print >> sys.stderr, "Guessing id '%s'" % id
+ try:
+ self.set_user_id(id)
+ except SettingIDnotSupported:
+ pass
+ return id
+ def set_user_id(self, value):
+ """
+ Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the RCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ self._rcs_set_user_id(value)
+ def add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ self._rcs_add(self._u_rel_path(path))
+ def remove(self, path):
+ """
+ Remove a file from both version control and the filesystem.
+ """
+ self._rcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ os.remove(path)
+ def recursive_remove(self, dirname):
+ """
+ Remove a file/directory and all its decendents from both
+ version control and the filesystem.
+ """
+ if not os.path.exists(dirname):
+ raise NoSuchFile(dirname)
+ for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
+ filenames.extend(dirnames)
+ for path in filenames:
+ fullpath = os.path.join(dirpath, path)
+ if os.path.exists(fullpath) == False:
+ continue
+ self._rcs_remove(self._u_rel_path(fullpath))
+ if os.path.exists(dirname):
+ shutil.rmtree(dirname)
+ def update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ self._rcs_update(self._u_rel_path(path))
+ def get_file_contents(self, path, revision=None, allow_no_rcs=False):
+ """
+ Get the file as it was in a given revision.
+ Revision==None specifies the current revision.
+ """
+ if not os.path.exists(path):
+ raise NoSuchFile(path)
+ if self._use_rcs(path, allow_no_rcs):
+ relpath = self._u_rel_path(path)
+ contents = self._rcs_get_file_contents(relpath,revision)
+ else:
+ contents = file(path, "rb").read()
+ return contents.decode("utf-8")
+ def set_file_contents(self, path, contents, allow_no_rcs=False):
+ """
+ Set the file contents under version control.
+ """
+ add = not os.path.exists(path)
+ file(path, "wb").write(contents.encode("utf-8"))
+
+ if self._use_rcs(path, allow_no_rcs):
+ if add:
+ self.add(path)
+ else:
+ self.update(path)
+ def mkdir(self, path, allow_no_rcs=False):
+ """
+ Create (if neccessary) a directory at path under version
+ control.
+ """
+ if not os.path.exists(path):
+ os.mkdir(path)
+ if self._use_rcs(path, allow_no_rcs):
+ self.add(path)
else:
- # win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True,
- cwd=cwd)
- except OSError, e :
- strerror = "%s\nwhile executing %s" % (e.args[1], args)
- raise CommandError(strerror, e.args[0])
- output, error = q.communicate()
- status = q.wait()
- if status not in expect:
- raise CommandError(error, status)
- return status, output, error
+ assert os.path.isdir(path)
+ if self._use_rcs(path, allow_no_rcs):
+ self.update(path)
+ def duplicate_repo(self, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ Return the path to the arbitrary directory at the base of the new repo.
+ """
+ # Dirname in Baseir to protect against simlink attacks.
+ if self._duplicateBasedir == None:
+ self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs')
+ self._duplicateDirname = \
+ os.path.join(self._duplicateBasedir, "duplicate")
+ self._rcs_duplicate_repo(directory=self._duplicateDirname,
+ revision=revision)
+ return self._duplicateDirname
+ def remove_duplicate_repo(self):
+ """
+ Clean up a duplicate repo created with duplicate_repo().
+ """
+ if self._duplicateBasedir != None:
+ shutil.rmtree(self._duplicateBasedir)
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ def commit(self, summary, body=None):
+ """
+ Commit the current working directory, with a commit message
+ string summary and body. Return the name of the old revision
+ (or None if versioning is not supported).
+ """
+ if body is not None:
+ summary += '\n' + body
+ descriptor, filename = tempfile.mkstemp()
+ revision = None
+ try:
+ temp_file = os.fdopen(descriptor, 'wb')
+ temp_file.write(summary)
+ temp_file.flush()
+ revision = self._rcs_commit(filename)
+ temp_file.close()
+ finally:
+ os.remove(filename)
+ return revision
+ def precommit(self, directory):
+ pass
+ def postcommit(self, directory):
+ pass
+ def _u_invoke(self, args, expect=(0,), cwd=None):
+ if cwd == None:
+ cwd = self.rootdir
+ if self.verboseInvoke == True:
+ print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args))
+ try :
+ if sys.platform != "win32":
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
+ else:
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ shell=True, cwd=cwd)
+ except OSError, e :
+ strerror = "%s\nwhile executing %s" % (e.args[1], args)
+ raise CommandError(strerror, e.args[0])
+ output, error = q.communicate()
+ status = q.wait()
+ if self.verboseInvoke == True:
+ print >> sys.stderr, "%d\n%s%s" % (status, output, error)
+ if status not in expect:
+ strerror = "%s\nwhile executing %s\n%s" % (args[1], args, error)
+ raise CommandError(strerror, status)
+ return status, output, error
+ def _u_invoke_client(self, *args, **kwargs):
+ directory = kwargs.get('directory',None)
+ expect = kwargs.get('expect', (0,))
+ cl_args = [self.client]
+ cl_args.extend(args)
+ return self._u_invoke(cl_args, expect, cwd=directory)
+ def _u_search_parent_directories(self, path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ return search_parent_directories(path, filename)
+ def _use_rcs(self, path, allow_no_rcs):
+ """
+ Try and decide if _rcs_add/update/mkdir/etc calls will
+ succeed. Returns True is we think the rcs_call would
+ succeeed, and False otherwise.
+ """
+ use_rcs = True
+ exception = None
+ if self.rootdir != None:
+ if self.path_in_root(path) == False:
+ use_rcs = False
+ exception = PathNotInRoot(path, self.rootdir)
+ else:
+ use_rcs = False
+ exception = RCSnotRooted
+ if use_rcs == False and allow_no_rcs==False:
+ raise exception
+ return use_rcs
+ def path_in_root(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> rcs = new()
+ >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c")
+ True
+ >>> rcs.path_in_root("/a.b/.be", "/a.b/c")
+ False
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise RCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ return False
+ return True
+ def _u_rel_path(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> rcs = new()
+ >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ '.be'
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise RCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ raise PathNotInRoot(path, absRootSlashedDir)
+ assert path != absRootSlashedDir, \
+ "file %s == root directory %s" % (path, absRootSlashedDir)
+ relpath = path[len(absRootSlashedDir):]
+ return relpath
+ def _u_abspath(self, path, root=None):
+ """
+ Return the absolute path from a path realtive to root.
+ >>> rcs = new()
+ >>> rcs._u_abspath(".be", "/a.b/c")
+ '/a.b/c/.be'
+ """
+ if root == None:
+ assert self.rootdir != None, "RCS not rooted"
+ root = self.rootdir
+ return os.path.abspath(os.path.join(root, path))
+ def _u_create_id(self, name, email=None):
+ """
+ >>> rcs = new()
+ >>> rcs._u_create_id("John Doe", "jdoe@example.com")
+ 'John Doe <jdoe@example.com>'
+ >>> rcs._u_create_id("John Doe")
+ 'John Doe'
+ """
+ assert len(name) > 0
+ if email == None or len(email) == 0:
+ return name
+ else:
+ return "%s <%s>" % (name, email)
+ def _u_parse_id(self, value):
+ """
+ >>> rcs = new()
+ >>> rcs._u_parse_id("John Doe <jdoe@example.com>")
+ ('John Doe', 'jdoe@example.com')
+ >>> rcs._u_parse_id("John Doe")
+ ('John Doe', None)
+ >>> try:
+ ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>")
+ ... except AssertionError:
+ ... print "Invalid match"
+ Invalid match
+ """
+ emailexp = re.compile("(.*) <([^>]*)>(.*)")
+ match = emailexp.search(value)
+ if match == None:
+ email = None
+ name = value
+ else:
+ assert len(match.groups()) == 3
+ assert match.groups()[2] == "", match.groups()
+ email = match.groups()[1]
+ name = match.groups()[0]
+ assert name != None
+ assert len(name) > 0
+ return (name, email)
+ def _u_get_fallback_username(self):
+ name = None
+ for envariable in ["LOGNAME", "USERNAME"]:
+ if os.environ.has_key(envariable):
+ name = os.environ[envariable]
+ break
+ assert name != None
+ return name
+ def _u_get_fallback_email(self):
+ hostname = gethostname()
+ name = self._u_get_fallback_username()
+ return "%s@%s" % (name, hostname)
+ def _u_parse_commitfile(self, commitfile):
+ """
+ Split the commitfile created in self.commit() back into
+ summary and header lines.
+ """
+ f = file(commitfile, "rb")
+ summary = f.readline()
+ body = f.read()
+ body.lstrip('\n')
+ if len(body) == 0:
+ body = None
+ f.close
+ return (summary, body)
+
+
+class RCStestCase(unittest.TestCase):
+ Class = RCS
+ def __init__(self, *args, **kwargs):
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ self.dirname = None
+ def instantiateRCS(self):
+ return self.Class()
+ def setUp(self):
+ self.dir = Dir()
+ self.dirname = self.dir.path
+ self.rcs = self.instantiateRCS()
+ def tearDown(self):
+ del(self.rcs)
+ del(self.dirname)
+ def fullPath(self, path):
+ return os.path.join(self.dirname, path)
+ def assertPathExists(self, path):
+ fullpath = self.fullPath(path)
+ self.failUnless(os.path.exists(fullpath)==True,
+ "path %s does not exist" % fullpath)
+ def uidTest(self):
+ user_id = self.rcs.get_user_id()
+ self.failUnless(user_id != None,
+ "unable to get a user id")
+ user_idB = "John Doe <jdoe@example.com>"
+ if self.rcs.name in ["None", "hg"]:
+ self.assertRaises(SettingIDnotSupported, self.rcs.set_user_id,
+ user_idB)
+ else:
+ self.rcs.set_user_id(user_idB)
+ self.failUnless(self.rcs.get_user_id() == user_idB,
+ "user id not set correctly (was %s, is %s)" \
+ % (user_id, self.rcs.get_user_id()))
+ self.failUnless(self.rcs.set_user_id(user_id) == None,
+ "unable to restore user id %s" % user_id)
+ self.failUnless(self.rcs.get_user_id() == user_id,
+ "unable to restore user id %s" % user_id)
+ def versionTest(self, path):
+ origpath = path
+ path = self.fullPath(path)
+ contentsA = "Lorem ipsum"
+ contentsB = "dolor sit amet"
+ self.rcs.set_file_contents(path,contentsA)
+ self.failUnless(self.rcs.get_file_contents(path)==contentsA,
+ "File contents not set or read correctly")
+ revision = self.rcs.commit("Commit current status")
+ self.failUnless(self.rcs.get_file_contents(path)==contentsA,
+ "Committing File contents not set or read correctly")
+ if self.rcs.versioned == True:
+ self.rcs.set_file_contents(path,contentsB)
+ self.failUnless(self.rcs.get_file_contents(path)==contentsB,
+ "File contents not set correctly after commit")
+ contentsArev = self.rcs.get_file_contents(path, revision)
+ self.failUnless(contentsArev==contentsA, \
+ "Original file contents not saved in revision %s\n%s\n%s\n" \
+ % (revision, contentsA, contentsArev))
+ dup = self.rcs.duplicate_repo(revision)
+ duppath = os.path.join(dup, origpath)
+ dupcont = file(duppath, "rb").read()
+ self.failUnless(dupcont == contentsA)
+ self.rcs.remove_duplicate_repo()
+ def testRun(self):
+ self.failUnless(self.rcs.installed() == True,
+ "%s RCS not found" % self.Class.name)
+ if self.Class.name != "None":
+ self.failUnless(self.rcs.detect(self.dirname)==False,
+ "Detected %s RCS before initializing" \
+ % self.Class.name)
+ self.rcs.init(self.dirname)
+ self.failUnless(self.rcs.detect(self.dirname)==True,
+ "Did not detect %s RCS after initializing" \
+ % self.Class.name)
+ rp = os.path.realpath(self.rcs.rootdir)
+ dp = os.path.realpath(self.dirname)
+ self.failUnless(dp == rp or rp == None,
+ "%s RCS root in wrong dir (%s %s)" \
+ % (self.Class.name, dp, rp))
+ self.uidTest()
+ self.rcs.mkdir(self.fullPath('a'))
+ self.rcs.mkdir(self.fullPath('a/b'))
+ self.rcs.mkdir(self.fullPath('c'))
+ self.assertPathExists('a')
+ self.assertPathExists('a/b')
+ self.assertPathExists('c')
+ self.versionTest('a/text')
+ self.versionTest('a/b/text')
+ self.rcs.recursive_remove(self.fullPath('a'))
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(RCStestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/restconvert.py b/libbe/restconvert.py
index cc7f866..57148e4 100644
--- a/libbe/restconvert.py
+++ b/libbe/restconvert.py
@@ -27,7 +27,7 @@ try :
from xml.etree import ElementTree # Python 2.5 (and greater?)
except ImportError :
from elementtree import ElementTree
-
+import doctest
def rest_xml(rest):
warnings = StringIO()
@@ -126,3 +126,5 @@ def foldout(name, arguments, options, content, lineno, content_offset,
foldout += foldout_body
foldout.set_class('foldout')
return [foldout]
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/template b/libbe/template
deleted file mode 100644
index 467eee4..0000000
--- a/libbe/template
+++ /dev/null
@@ -1,48 +0,0 @@
-"""Compare two bug trees"""
-from bugdir import cmdutil
-
-def diff(old_tree, new_tree):
- old_bug_map = old_tree.bug_map()
- new_bug_map = new_tree.bug_map()
- added = []
- removed = []
- modified = []
- for old_bug in old_bug_map.itervalues():
- new_bug = new_bug_map.get(bug.uuid)
- if new_bug is None :
- removed.append(old_bug)
- else:
- if old_bug != new_bug:
- modified.append((old_bug, new_bug))
- for new_bug in new_bug_map.itervalues():
- if not old_bug_map.haskey(new_bug.id):
- added.append(new_bug)
- return (removed, modified, added)
-
-
-def reference_diff(bugdir, spec=None):
- return diff(bugdir.reference_bugdir(), bugdir)
-
-def diff_report(diff_data, bugdir)
- (removed, modified, added) = diff_data
- def modified_cmp(left, right):
- return cmp_severity(left[1], right[1])
-
- added.sort(bugdir.cmp_severity)
- removed.sort(bugdir.cmp_severity)
- modified.sort(modified_cmp)
-
- print "New bug reports:"
- for bug in added:
- cmdutil.bug_summary(bug, bugdir, no_target=True)
-
- print "modified bug reports:"
- for old_bug, new_bug in modified:
- cmdutil.bug_summary(new_bug, bugdir, no_target=True)
-
- print "Removed bug reports:"
- for bug in removed:
- cmdutil.bug_summary(bug, bugdir, no_target=True)
-
-
-
diff --git a/libbe/tests.py b/libbe/tests.py
deleted file mode 100644
index a7d925d..0000000
--- a/libbe/tests.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
-# <abentley@panoramicfeedback.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import tempfile
-import shutil
-import os
-import os.path
-from libbe import bugdir, arch
-cleanable = []
-def clean_up():
- global cleanable
- tmp = cleanable
- tmp.reverse()
- for obj in tmp:
- obj.clean_up()
- cleanable = []
-
-class Dir:
- def __init__(self):
- self.name = tempfile.mkdtemp(prefix="testdir")
- cleanable.append(self)
- def clean_up(self):
- shutil.rmtree(self.name)
-
-def arch_dir():
- arch.ensure_user_id()
- dir = Dir()
- arch.init_tree(dir.name)
- return dir
-
-def bug_arch_dir():
- dir = arch_dir()
- return bugdir.create_bug_dir(dir.name, arch)
-
-def simple_bug_dir():
- dir = bug_arch_dir()
- bug_a = bugdir.new_bug(dir, "a")
- bug_b = bugdir.new_bug(dir, "b")
- bug_b.status = "closed"
- bug_a.save()
- bug_b.save()
- return dir
diff --git a/libbe/tree.py b/libbe/tree.py
new file mode 100644
index 0000000..e6f144e
--- /dev/null
+++ b/libbe/tree.py
@@ -0,0 +1,158 @@
+# Bugs Everywhere, a distributed bugtracker
+# Copyright (C) 2008 W. Trevor King
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA
+
+import doctest
+
+class Tree(list):
+ """
+ Construct
+ +-b---d-g
+ a-+ +-e
+ +-c-+-f-h-i
+ with
+ >>> i = Tree(); i.n = "i"
+ >>> h = Tree([i]); h.n = "h"
+ >>> f = Tree([h]); f.n = "f"
+ >>> e = Tree(); e.n = "e"
+ >>> c = Tree([f,e]); c.n = "c"
+ >>> g = Tree(); g.n = "g"
+ >>> d = Tree([g]); d.n = "d"
+ >>> b = Tree([d]); b.n = "b"
+ >>> a = Tree(); a.n = "a"
+ >>> a.append(c)
+ >>> a.append(b)
+
+ >>> a.branch_len()
+ 5
+ >>> a.sort(key=lambda node : node.branch_len())
+ >>> "".join([node.n for node in a.traverse()])
+ 'abdgcefhi'
+ >>> "".join([node.n for node in a.traverse(depthFirst=False)])
+ 'abcdefghi'
+ >>> for depth,node in a.thread():
+ ... print "%*s" % (2*depth+1, node.n)
+ a
+ b
+ d
+ g
+ c
+ e
+ f
+ h
+ i
+ >>> for depth,node in a.thread(flatten=True):
+ ... print "%*s" % (2*depth+1, node.n)
+ a
+ b
+ d
+ g
+ c
+ e
+ f
+ h
+ i
+ """
+ def branch_len(self):
+ """
+ Exhaustive search every time == SLOW.
+
+ Use only on small trees, or reimplement by overriding
+ child-addition methods to allow accurate caching.
+
+ For the tree
+ +-b---d-g
+ a-+ +-e
+ +-c-+-f-h-i
+ this method returns 5.
+ """
+ if len(self) == 0:
+ return 1
+ else:
+ return 1 + max([child.branch_len() for child in self])
+
+ def sort(self, *args, **kwargs):
+ """
+ This method can be slow, e.g. on a branch_len() sort, since a
+ node at depth N from the root has it's branch_len() method
+ called N times.
+ """
+ list.sort(self, *args, **kwargs)
+ for child in self:
+ child.sort()
+
+ def traverse(self, depthFirst=True):
+ """
+ Note: you might want to sort() your tree first.
+ """
+ if depthFirst == True:
+ yield self
+ for child in self:
+ for descendant in child.traverse():
+ yield descendant
+ else: # breadth first, Wikipedia algorithm
+ # http://en.wikipedia.org/wiki/Breadth-first_search
+ queue = [self]
+ while len(queue) > 0:
+ node = queue.pop(0)
+ yield node
+ queue.extend(node)
+
+ def thread(self, flatten=False):
+ """
+ When flatten==False, the depth of any node is one greater than
+ the depth of its parent. That way the inheritance is
+ explicit, but you can end up with highly indented threads.
+
+ When flatten==True, the depth of any node is only greater than
+ the depth of its parent when there is a branch, and the node
+ is not the last child. This can lead to ancestry ambiguity,
+ but keeps the total indentation down. E.g.
+ +-b +-b-c
+ a-+-c and a-+
+ +-d-e-f +-d-e-f
+ would both produce (after sorting by branch_len())
+ (0, a)
+ (1, b)
+ (1, c)
+ (0, d)
+ (0, e)
+ (0, f)
+ """
+ stack = [] # ancestry of the current node
+ if flatten == True:
+ depthDict = {}
+
+ for node in self.traverse(depthFirst=True):
+ while len(stack) > 0 \
+ and id(node) not in [id(c) for c in stack[-1]]:
+ stack.pop(-1)
+ if flatten == False:
+ depth = len(stack)
+ else:
+ if len(stack) == 0:
+ depth = 0
+ else:
+ parent = stack[-1]
+ depth = depthDict[id(parent)]
+ if len(parent) > 1 and node != parent[-1]:
+ depth += 1
+ depthDict[id(node)] = depth
+ yield (depth,node)
+ stack.append(node)
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/utility.py b/libbe/utility.py
index 1fd83da..2c77fcf 100644
--- a/libbe/utility.py
+++ b/libbe/utility.py
@@ -18,57 +18,50 @@ import calendar
import time
import os
import tempfile
+import shutil
+import doctest
-class FileString(object):
- """Bare-bones pseudo-file class
-
- >>> f = FileString("me\\nyou")
- >>> len(list(f))
- 2
- >>> len(list(f))
- 0
- >>> f = FileString()
- >>> f.write("hello\\nthere")
- >>> "".join(list(f))
- 'hello\\nthere'
- """
- def __init__(self, str=""):
- object.__init__(self)
- self.str = str
- self._iter = None
-
- def __iter__(self):
- if self._iter is None:
- self._iter = self._get_iter()
- return self._iter
- def _get_iter(self):
- for line in self.str.splitlines(True):
- yield line
-
- def write(self, line):
- self.str += line
-
-
-def get_file(f):
+def search_parent_directories(path, filename):
"""
- Return a file-like object from input. This is a helper for functions that
- can take either file or string parameters.
-
- :param f: file or string
- :return: a FileString if input is a string, otherwise return the imput
- object.
-
- >>> isinstance(get_file(file("/dev/null")), file)
- True
- >>> isinstance(get_file("f"), FileString)
- True
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
"""
- if isinstance(f, basestring):
- return FileString(f)
- else:
- return f
-
+ path = os.path.realpath(path)
+ assert os.path.exists(path)
+ old_path = None
+ while True:
+ check_path = os.path.join(path, filename)
+ if os.path.exists(check_path):
+ return check_path
+ if path == old_path:
+ return None
+ old_path = path
+ path = os.path.dirname(path)
+
+class Dir (object):
+ "A temporary directory for testing use"
+ def __init__(self):
+ self.path = tempfile.mkdtemp(prefix="BEtest")
+ self.rmtree = shutil.rmtree # save local reference for __del__
+ self.removed = False
+ def __del__(self):
+ self.cleanup()
+ def cleanup(self):
+ if self.removed == False:
+ self.rmtree(self.path)
+ self.removed = True
+ def __call__(self):
+ return self.path
RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000"
@@ -111,10 +104,10 @@ def editor_string(comment=None):
CantFindEditor: Can't find editor to get string from
>>> os.environ["EDITOR"] = "echo bar > "
>>> editor_string()
- 'bar\\n'
+ u'bar\\n'
>>> os.environ["VISUAL"] = "echo baz > "
>>> editor_string()
- 'baz\\n'
+ u'baz\\n'
>>> del os.environ["EDITOR"]
>>> del os.environ["VISUAL"]
"""
@@ -133,7 +126,7 @@ def editor_string(comment=None):
os.close(fhandle)
oldmtime = os.path.getmtime(fname)
os.system("%s %s" % (editor, fname))
- output = trimmed_string(file(fname, "rb").read())
+ output = trimmed_string(file(fname, "rb").read().decode("utf-8"))
if output.rstrip('\n') == "":
output = None
finally:
@@ -162,3 +155,5 @@ def trimmed_string(instring):
break
out.append(line)
return ''.join(out)
+
+suite = doctest.DocTestSuite()