aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2008-11-18 20:42:50 -0500
committerW. Trevor King <wking@drexel.edu>2008-11-18 20:42:50 -0500
commit19b153b9a86377a2b30cc80fa3f475fed892e2fe (patch)
tree8f5688707ab1b34ffec2bc4372d087580ff21709 /libbe
parente4018dfe8cfa553adbd20898c5b42c3462ca1733 (diff)
downloadbugseverywhere-19b153b9a86377a2b30cc80fa3f475fed892e2fe.tar.gz
Major rewrite of RCS backends. RCS now represented as a class.
Lots of changes and just one commit. This started with bug dac91856-cb6a-4f69-8c03-38ff0b29aab2, when I noticed that new bugs were not being added appropriately with the Git backend. I'd been working with Git trouble before with bug 0cad2ac6-76ef-4a88-abdf-b2e02de76f5c, and decided things would be better off if I just scrapped the current RCS architecture and went to a more object oriented setup. So I did. It's not clear how to add support for an RCS backend: * Create a new module that - defines an inheritor of rsc.RCS, overriding the _rcs_*() methods - provide a new() function for instantizating the new class - defines an inheritor of rcs.RCStestCase, overiding the Class attribute - defines 'suite' a unittest.TestSuite testing the module * Add your new module to the rest in rcs._get_matching_rcs() * Add your new module to the rest in libbe/tests.py Although I'm not sure libbe/tests.py is still usefull. The new framework clears out a bunch of hackery that used to be involved with supporting becommands/diff.py. There's still room for progress though. While implementing the new verision, I moved the testing framework over from doctest to a doctest/unittest combination. Longer tests that don't demonstrate a function's usage should be moved to unittests at the end of the module, since unittest has better support for setup/teardown, etc. The new framework also revealed some underimplented backends, most notably arch. These backends have now been fixed. I also tweaked the test_usage.sh script to run through all the backends if it is called with no arguments. The fix for the dac bug turned out to be an unflushed file write :p.
Diffstat (limited to 'libbe')
-rw-r--r--libbe/arch.py357
-rw-r--r--libbe/bug.py53
-rw-r--r--libbe/bugdir.py97
-rw-r--r--libbe/bzr.py186
-rw-r--r--libbe/cmdutil.py15
-rw-r--r--libbe/config.py4
-rw-r--r--libbe/diff.py11
-rw-r--r--libbe/git.py218
-rw-r--r--libbe/hg.py167
-rw-r--r--libbe/mapfile.py8
-rw-r--r--libbe/names.py4
-rw-r--r--libbe/no_rcs.py51
-rw-r--r--libbe/plugin.py4
-rw-r--r--libbe/rcs.py578
-rw-r--r--libbe/restconvert.py4
-rw-r--r--libbe/tests.py46
-rw-r--r--libbe/utility.py12
17 files changed, 1123 insertions, 692 deletions
diff --git a/libbe/arch.py b/libbe/arch.py
index 001f852..8e7390d 100644
--- a/libbe/arch.py
+++ b/libbe/arch.py
@@ -15,184 +15,209 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import config
-import errno
+import shutil
+import time
+import re
+import unittest
+import doctest
-from rcs import invoke
+import config
+from rcs import RCS, RCStestCase, CommandError
client = config.get_val("arch_client")
if client is None:
client = "tla"
config.set_val("arch_client", client)
-
-def invoke_client(*args, **kwargs):
- cl_args = [client]
- cl_args.extend(args)
- status,output,error = invoke(cl_args)
- if status not in (0,):
- raise Exception("Command failed: %s" % error)
- return output
-
-def get_user_id():
- try:
- return invoke_client('my-id')
- except Exception, e:
- if 'no arch user id set' in e.args[0]:
- return None
+def new():
+ return Arch()
+
+class Arch(RCS):
+ name = "Arch"
+ client = client
+ versioned = True
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser("~/.arch-params")
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, "{arch}") != None :
+ return True
+ return False
+ def _rcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
else:
- raise
-
-
-def set_user_id(value):
- invoke_client('my-id', value)
-
-
-def ensure_user_id():
- if get_user_id() is None:
- set_user_id('nobody <nobody@example.com>')
-
-
-def write_tree_settings(contents, path):
- file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents)
-
-def init_tree(path):
- invoke_client("init-tree", "-d", path)
-
-def temp_arch_tree(type="easy"):
- import tempfile
- ensure_user_id()
- path = tempfile.mkdtemp()
- init_tree(path)
- if type=="easy":
- write_tree_settings("source ^.*$\n", path)
- elif type=="tricky":
- write_tree_settings("source ^$\n", path)
- else:
- assert (type=="impossible")
- add_dir_rule("precious ^\.boo$", path, path)
- return path
-
-def list_added(root):
- assert os.path.exists(root)
- assert os.access(root, os.X_OK)
- root = os.path.realpath(root)
- inv_str = invoke_client("inventory", "--source", '--both', '--all', root)
- return [os.path.join(root, p) for p in inv_str.split('\n')]
-
-def tree_root(filename):
- assert os.path.exists(filename)
- if not os.path.isdir(filename):
- dirname = os.path.dirname(filename)
- else:
- dirname = filename
- return invoke_client("tree-root", dirname).rstrip('\n')
-
-def rel_filename(filename, root):
- filename = os.path.realpath(filename)
- root = os.path.realpath(root)
- assert(filename.startswith(root))
- return filename[len(root)+1:]
+ dirname = path
+ status,output,error = self._u_invoke_client("tree-root", dirname)
+ # get archive name...
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+ def _create_archive(self, path):
+ # Create a new archive
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = self._u_parse_id(id)
+ if email == None:
+ email = "%s@example.com" % name
+ trailer = "%s-%s" % ("bugs-everywhere-auto",
+ time.strftime("%Y.%H.%M.%S"))
+ self._archive_name = "%s--%s" % (email, trailer)
+ self._archive_dir = "/tmp/%s" % trailer
+ self._tmp_archive = True
+ self._u_invoke_client("make-archive", self._archive_name,
+ self._archive_dir, directory=path)
+ def _invoke_client(self, *args, **kwargs):
+ """
+ Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
+ else:
+ tailargs = []
+ arglist = [command, "-A", self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ "=locations", self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+ def _create_project(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = "bugs-everywhere"
+ branch = "mainline"
+ version = "0.1"
+ self._project_name = "%s--%s--%s" % (category, branch, version)
+ self._invoke_client("archive-setup", self._project_name,
+ directory=path)
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return "%s/%s" % (self._archive_name, self._project_name)
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html#Importing_the_First_Revision
+ self._u_invoke_client("init-tree", self._archive_project_name(),
+ directory=path)
+ self._invoke_client("import", "--summary", "Began versioning",
+ directory=path)
+ def _rcs_cleanup(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+ def _rcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client('my-id', value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add-id", path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.rootdir)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client("inventory", "--source",
+ "--both", "--all", root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ file(inv_path, "ab").write(rule)
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+ def _force_source(self, path):
+ rule = "source %s\n" % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
+ if os.path.realpath(path) not in self._list_added(self.rootdir):
+ raise CantAddFile(path)
+ def _rcs_remove(self, path):
+ if not '.arch-ids' in path:
+ self._u_invoke_client("delete-id", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(self._u_abspath(path), "rb").read()
+ else:
+ status,output,error = \
+ self._invoke_client("file-find", path, revision)
+ path = output.rstrip('\n')
+ return file(self._u_abspath(path), "rb").read()
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ status,output,error = \
+ self._u_invoke_client("get", revision,directory)
+ def _rcs_commit(self, commitfile):
+ summary,body = self._u_parse_commitfile(commitfile)
+ #status,output,error = self._invoke_client("make-log")
+ if body == None:
+ status,output,error \
+ = self._invoke_client("commit","--summary",summary)
+ else:
+ status,output,error \
+ = self._invoke_client("commit","--summary",summary,
+ "--log-message",body)
+ revision = None
+ revline = re.compile("[*] committed (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
class CantAddFile(Exception):
def __init__(self, file):
self.file = file
Exception.__init__(self, "Can't automatically add file %s" % file)
+class ArchTestCase(RCStestCase):
+ Class = Arch
-def add_dir_rule(rule, dirname, root):
- inv_filename = os.path.join(dirname, '.arch-inventory')
- file(inv_filename, "ab").write(rule)
- if os.path.realpath(inv_filename) not in list_added(root):
- add_id(inv_filename, paranoid=False)
-
-def force_source(filename, root):
- rule = "source %s\n" % rel_filename(filename, root)
- add_dir_rule(rule, os.path.dirname(filename), root)
- if os.path.realpath(filename) not in list_added(root):
- raise CantAddFile(filename)
-
-def add_id(filename, paranoid=False):
- invoke_client("add-id", filename)
- root = tree_root(filename)
- if paranoid and os.path.realpath(filename) not in list_added(root):
- force_source(filename, root)
-
-
-def delete_id(filename):
- invoke_client("delete-id", filename)
-
-def test_helper(type):
- t = temp_arch_tree(type)
- dirname = os.path.join(t, ".boo")
- return dirname, t
-
-def mkdir(path, paranoid=False):
- """
- >>> import shutil
- >>> dirname,t = test_helper("easy")
- >>> mkdir(dirname, paranoid=False)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert not os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("tricky")
- >>> mkdir(dirname, paranoid=True)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("impossible")
- >>> try:
- ... mkdir(dirname, paranoid=True)
- ... except CantAddFile, e:
- ... print "Can't add file"
- Can't add file
- >>> shutil.rmtree(t)
- """
- os.mkdir(path)
- add_id(path, paranoid=paranoid)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-
-def path_in_reference(bug_dir, spec):
- if spec is not None:
- return invoke_client("file-find", bug_dir, spec).rstrip('\n')
- return invoke_client("file-find", bug_dir).rstrip('\n')
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using Arch"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, "{arch}")):
- return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.join('..', path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- pass
-
-def postcommit(directory):
- pass
-
-
-name = "Arch"
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/bug.py b/libbe/bug.py
index f973cf0..a14f7fd 100644
--- a/libbe/bug.py
+++ b/libbe/bug.py
@@ -16,14 +16,12 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
import os.path
-import shutil
import errno
import names
import mapfile
import time
import utility
-from rcs import rcs_by_name
-
+import doctest
### Define and describe valid bug categories
# Use a tuple of (category, description) tuples since we don't have
@@ -89,7 +87,7 @@ class Bug(object):
severity = checked_property("severity", severity_values)
status = checked_property("status", status_values)
- def __init__(self, path, uuid, rcs_name, bugdir):
+ def __init__(self, path, uuid, rcs, bugdir):
self.path = path
self.uuid = uuid
if uuid is not None:
@@ -97,7 +95,7 @@ class Bug(object):
else:
dict = {}
- self.rcs_name = rcs_name
+ self.rcs = rcs
self.bugdir = bugdir
self.summary = dict.get("summary")
@@ -110,6 +108,17 @@ class Bug(object):
if self.time is not None:
self.time = utility.str_to_time(self.time)
+ def get_path(self, file=None):
+ if file == None:
+ return os.path.join(self.path, self.uuid)
+ else:
+ return os.path.join(self.path, self.uuid, file)
+
+ def _get_active(self):
+ return self.status in active_status_values
+
+ active = property(_get_active)
+
def __repr__(self):
return "Bug(uuid=%r)" % self.uuid
@@ -146,19 +155,13 @@ class Bug(object):
statuschar = self.status[0]
severitychar = self.severity[0]
chars = "%c%c" % (statuschar, severitychar)
- return "%s:%s: %s\n" % (short_name, chars, self.summary)
+ return "%s:%s: %s" % (short_name, chars, self.summary)
+
def __str__(self):
return self.string(shortlist=True)
- def get_path(self, file=None):
- if file == None:
- return os.path.join(self.path, self.uuid)
- else:
- return os.path.join(self.path, self.uuid, file)
- def _get_active(self):
- return self.status in active_status_values
-
- active = property(_get_active)
+ def __cmp__(self, other):
+ return cmp_full(self, other)
def add_attr(self, map, name):
value = getattr(self, name)
@@ -166,6 +169,7 @@ class Bug(object):
map[name] = value
def save(self):
+ assert self.summary != None, "Can't save blank bug"
map = {}
self.add_attr(map, "assigned")
self.add_attr(map, "summary")
@@ -176,22 +180,18 @@ class Bug(object):
if self.time is not None:
map["time"] = utility.time_to_str(self.time)
path = self.get_path("values")
- mapfile.map_save(rcs_by_name(self.rcs_name), path, map)
-
+ mapfile.map_save(self.rcs, path, map)
+
def remove(self):
path = self.get_path()
- shutil.rmtree(path)
+ self.rcs.recursive_remove(path)
- def _get_rcs(self):
- return rcs_by_name(self.rcs_name)
-
- rcs = property(_get_rcs)
-
def new_comment(self):
if not os.path.exists(self.get_path("comments")):
self.rcs.mkdir(self.get_path("comments"))
comm = Comment(None, self)
comm.uuid = names.uuid()
+ comm.rcs = self.rcs
return comm
def get_comment(self, uuid):
@@ -218,7 +218,7 @@ class Bug(object):
def new_bug(dir, uuid=None):
bug = dir.new_bug(uuid)
- bug.creator = names.creator()
+ bug.creator = bug.rcs.get_user_id()
bug.severity = "minor"
bug.status = "open"
bug.time = time.time()
@@ -226,7 +226,7 @@ def new_bug(dir, uuid=None):
def new_comment(bug, body=None):
comm = bug.new_comment()
- comm.From = names.creator()
+ comm.From = comm.rcs.get_user_id()
comm.time = time.time()
comm.body = body
return comm
@@ -276,7 +276,6 @@ class Comment(object):
mapfile.map_save(self.bug.rcs, self.get_path("values"), map_file)
self.bug.rcs.set_file_contents(self.get_path("body"),
self.body.encode('utf-8'))
-
def get_path(self, name=None):
my_dir = os.path.join(self.bug.get_path("comments"), self.uuid)
@@ -387,3 +386,5 @@ class InvalidValue(ValueError):
Exception.__init__(self, msg)
self.name = name
self.value = value
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index f8f45b8..cf8cba5 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -18,11 +18,13 @@ import os
import os.path
import cmdutil
import errno
+import unittest
+import doctest
import names
import mapfile
import time
import utility
-from rcs import rcs_by_name
+from rcs import rcs_by_name, installed_rcs
from bug import Bug
class NoBugDir(Exception):
@@ -84,22 +86,20 @@ class AlreadyInitialized(Exception):
Exception.__init__(self,
"Specified root is already initialized: %s" % path)
+def bugdir_root(versioning_root):
+ return os.path.join(versioning_root, ".be")
+
def create_bug_dir(path, rcs):
"""
- >>> import no_rcs, tests
- >>> create_bug_dir('/highly-unlikely-to-exist', no_rcs)
+ >>> import tests
+ >>> rcs = rcs_by_name("None")
+ >>> create_bug_dir('/highly-unlikely-to-exist', rcs)
Traceback (most recent call last):
NoRootEntry: Specified root does not exist: /highly-unlikely-to-exist
- >>> test_dir = os.path.dirname(tests.bug_arch_dir().dir)
- >>> try:
- ... create_bug_dir(test_dir, no_rcs)
- ... except AlreadyInitialized, e:
- ... print "Already Initialized"
- Already Initialized
"""
root = os.path.join(path, ".be")
try:
- rcs.mkdir(root, paranoid=True)
+ rcs.mkdir(root)
except OSError, e:
if e.errno == errno.ENOENT:
raise NoRootEntry(path)
@@ -111,7 +111,7 @@ def create_bug_dir(path, rcs):
set_version(root, rcs)
mapfile.map_save(rcs,
os.path.join(root, "settings"), {"rcs_name": rcs.name})
- return BugDir(os.path.join(path, ".be"))
+ return BugDir(bugdir_root(path))
def setting_property(name, valid=None):
@@ -139,11 +139,12 @@ class BugDir:
self.dir = dir
self.bugs_path = os.path.join(self.dir, "bugs")
try:
- self.settings = mapfile.map_load(os.path.join(self.dir, "settings"))
+ self.settings = mapfile.map_load(os.path.join(self.dir,"settings"))
except mapfile.NoSuchFile:
self.settings = {"rcs_name": "None"}
- rcs_name = setting_property("rcs_name", ("None", "bzr", "git", "Arch", "hg"))
+ rcs_name = setting_property("rcs_name",
+ ("None", "bzr", "git", "Arch", "hg"))
_rcs = None
target = setting_property("target")
@@ -152,16 +153,21 @@ class BugDir:
mapfile.map_save(self.rcs,
os.path.join(self.dir, "settings"), self.settings)
- def get_rcs(self):
- if self._rcs is not None and self.rcs_name == self._rcs.name:
- return self._rcs
+ def _get_rcs(self):
+ if self._rcs is not None:
+ if self.rcs_name == self._rcs.name:
+ return self._rcs
self._rcs = rcs_by_name(self.rcs_name)
+ self._rcs.root(self.dir)
return self._rcs
- rcs = property(get_rcs)
+ rcs = property(_get_rcs)
+
+ def duplicate_bugdir(self, revision):
+ return BugDir(bugdir_root(self.rcs.duplicate_repo(revision)))
- def get_reference_bugdir(self, spec):
- return BugDir(self.rcs.path_in_reference(self.dir, spec))
+ def remove_duplicate_bugdir(self):
+ self.rcs.remove_duplicate_repo()
def list(self):
for uuid in self.list_uuids():
@@ -174,7 +180,7 @@ class BugDir:
return bugs
def get_bug(self, uuid):
- return Bug(self.bugs_path, uuid, self.rcs_name, self)
+ return Bug(self.bugs_path, uuid, self.rcs, self)
def list_uuids(self):
for uuid in os.listdir(self.bugs_path):
@@ -187,7 +193,7 @@ class BugDir:
uuid = names.uuid()
path = os.path.join(self.bugs_path, uuid)
self.rcs.mkdir(path)
- bug = Bug(self.bugs_path, None, self.rcs_name, self)
+ bug = Bug(self.bugs_path, None, self.rcs, self)
bug.uuid = uuid
return bug
@@ -197,3 +203,52 @@ class InvalidValue(ValueError):
Exception.__init__(self, msg)
self.name = name
self.value = value
+
+def simple_bug_dir():
+ """
+ For testing
+ >>> bugdir = simple_bug_dir()
+ >>> ls = list(bugdir.list_uuids())
+ >>> ls.sort()
+ >>> print ls
+ ['a', 'b']
+ """
+ dir = utility.Dir()
+ rcs = installed_rcs()
+ rcs.init(dir.path)
+ assert os.path.exists(dir.path)
+ bugdir = create_bug_dir(dir.path, rcs)
+ bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir.
+ bug_a = bugdir.new_bug("a")
+ bug_a.summary = "Bug A"
+ bug_a.save()
+ bug_b = bugdir.new_bug("b")
+ bug_b.status = "closed"
+ bug_b.summary = "Bug B"
+ bug_b.save()
+ return bugdir
+
+
+class BugDirTestCase(unittest.TestCase):
+ def __init__(self, *args, **kwargs):
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ def setUp(self):
+ self.dir = utility.Dir()
+ self.rcs = installed_rcs()
+ self.rcs.init(self.dir.path)
+ self.bugdir = create_bug_dir(self.dir.path, self.rcs)
+ def tearDown(self):
+ del(self.rcs)
+ del(self.dir)
+ def fullPath(self, path):
+ return os.path.join(self.dir.path, path)
+ def assertPathExists(self, path):
+ fullpath = self.fullPath(path)
+ self.failUnless(os.path.exists(fullpath)==True,
+ "path %s does not exist" % fullpath)
+ def testBugDirDuplicate(self):
+ self.assertRaises(AlreadyInitialized, create_bug_dir,
+ self.dir.path, self.rcs)
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/bzr.py b/libbe/bzr.py
index ddda334..a0ae715 100644
--- a/libbe/bzr.py
+++ b/libbe/bzr.py
@@ -15,114 +15,84 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import tempfile
-
-import config
-from rcs import invoke, CommandError
-
-def invoke_client(*args, **kwargs):
- directory = kwargs['directory']
- expect = kwargs.get('expect', (0, 1))
- cl_args = ["bzr"]
- cl_args.extend(args)
- status,output,error = invoke(cl_args, expect, cwd=directory)
- return status, output
-
-def add_id(filename, paranoid=False):
- invoke_client("add", filename, directory='.')
-
-def delete_id(filename):
- invoke_client("remove", filename, directory='.')
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
- add_id(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def lookup_revision(revno, directory):
- return invoke_client("lookup-revision", str(revno),
- directory=directory)[1].rstrip('\n')
-
-def export(revno, directory, revision_dir):
- invoke_client("export", "-r", str(revno), revision_dir, directory=directory)
-
-def find_or_make_export(revno, directory):
- revision_id = lookup_revision(revno, directory)
- home = os.path.expanduser("~")
- revision_root = os.path.join(home, ".bzrrevs")
- if not os.path.exists(revision_root):
- os.mkdir(revision_root)
- revision_dir = os.path.join(revision_root, revision_id)
- if not os.path.exists(revision_dir):
- export(revno, directory, revision_dir)
- return revision_dir
-
-def bzr_root(path):
- return invoke_client("root", path, directory=None)[1].rstrip('\r')
-
-def path_in_reference(bug_dir, spec):
- if spec is None:
- spec = int(invoke_client("revno", directory=bug_dir)[1])
- rel_bug_dir = bug_dir[len(bzr_root(bug_dir)):]
- export_root = find_or_make_export(spec, directory=bug_dir)
- return os.path.join(export_root, rel_bug_dir)
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using bzr"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, ".bzr")):
+import re
+import unittest
+import doctest
+
+from rcs import RCS, RCStestCase, CommandError
+
+def new():
+ return Bzr()
+
+class Bzr(RCS):
+ name = "bzr"
+ client = "bzr"
+ versioned = True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".bzr") != None :
return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.dirname(path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- if body is not None:
- summary += '\n' + body
- descriptor, filename = tempfile.mkstemp()
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.close()
- invoke_client('commit', '--unchanged', '--file', filename,
- directory=directory)
- finally:
- os.unlink(filename)
-
-def postcommit(directory):
- try:
- invoke_client('merge', directory=directory)
- except CommandError, e:
- if ('No merge branch known or specified' in e.err_str or
- 'No merge location known or specified' in e.err_str):
- pass
+ return False
+ def _rcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ status,output,error = self._u_invoke_client("root", path)
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("whoami")
+ return output.rstrip('\n')
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client("whoami", value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ # --force to also remove unversioned files.
+ self._u_invoke_client("remove", "--force", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(os.path.join(self.rootdir, path), "rb").read()
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
else:
- status = invoke_client('revert', '--no-backup',
+ self._u_invoke_client("branch", "--revision", revision,
+ ".", directory)
+ def _rcs_commit(self, commitfile):
+ status,output,error = self._u_invoke_client("commit", "--unchanged",
+ "--file", commitfile)
+ revision = None
+ revline = re.compile("Committed revision (.*)[.]")
+ match = revline.search(error)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+ def postcommit(self):
+ try:
+ self._u_invoke_client('merge')
+ except CommandError, e:
+ if ('No merge branch known or specified' in e.err_str or
+ 'No merge location known or specified' in e.err_str):
+ pass
+ else:
+ self._u_invoke_client('revert', '--no-backup',
directory=directory)
- status = invoke_client('resolve', '--all', directory=directory)
- raise
- if len(invoke_client('status', directory=directory)[1]) > 0:
- commit(directory, 'Merge from upstream')
-
-name = "bzr"
+ self._u_invoke_client('resolve', '--all', directory=directory)
+ raise
+ if len(self._u_invoke_client('status', directory=directory)[1]) > 0:
+ self.commit('Merge from upstream')
+
+class BzrTestCase(RCStestCase):
+ Class = Bzr
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(BzrTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py
index ace2d81..62a0c7c 100644
--- a/libbe/cmdutil.py
+++ b/libbe/cmdutil.py
@@ -22,6 +22,7 @@ import optparse
from textwrap import TextWrapper
from StringIO import StringIO
import utility
+import doctest
class UserError(Exception):
def __init__(self, msg):
@@ -33,6 +34,18 @@ class UserErrorWrap(UserError):
self.exception = exception
def get_bug(spec, bug_dir=None):
+ """
+ >>> bd = bugdir.simple_bug_dir()
+ >>> bug_a = get_bug('a', bd)
+ >>> print type(bug_a)
+ <class 'libbe.bug.Bug'>
+ >>> print bug_a
+ a:om: Bug A
+ >>> print bd.get_bug('a')
+ a:om: Bug A
+ >>> bug_a == bd.get_bug('a')
+ True
+ """
matches = []
try:
if bug_dir is None:
@@ -206,3 +219,5 @@ def _test():
if __name__ == "__main__":
_test()
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/config.py b/libbe/config.py
index ecc40ce..79c0d6f 100644
--- a/libbe/config.py
+++ b/libbe/config.py
@@ -16,6 +16,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import ConfigParser
import os.path
+import doctest
+
def path():
"""Return the path to the per-user config file"""
return os.path.expanduser("~/.bugs_everywhere")
@@ -58,3 +60,5 @@ def get_val(name, section="DEFAULT"):
return config.get(section, name)
except ConfigParser.NoOptionError:
return None
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/diff.py b/libbe/diff.py
index 7a1dbcc..9fa3816 100644
--- a/libbe/diff.py
+++ b/libbe/diff.py
@@ -18,6 +18,7 @@
from libbe import cmdutil, bugdir
from libbe.utility import time_to_str
from libbe.bug import cmp_severity
+import doctest
def diff(old_tree, new_tree):
old_bug_map = old_tree.bug_map()
@@ -38,9 +39,11 @@ def diff(old_tree, new_tree):
return (removed, modified, added)
-def reference_diff(bugdir, spec=None):
- return diff(bugdir.get_reference_bugdir(spec), bugdir)
-
+def reference_diff(bugdir, revision=None):
+ d = diff(bugdir.duplicate_bugdir(revision), bugdir)
+ bugdir.remove_duplicate_bugdir()
+ return d
+
def diff_report(diff_data, bug_dir):
(removed, modified, added) = diff_data
bugs = list(bug_dir.list())
@@ -109,3 +112,5 @@ def bug_changes(old, new, bugs):
def comment_summary(comment, status):
return "%8s comment from %s on %s" % (status, comment.From,
time_to_str(comment.time))
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/git.py b/libbe/git.py
index e15d773..046e72e 100644
--- a/libbe/git.py
+++ b/libbe/git.py
@@ -14,140 +14,86 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import tempfile
-
-from rcs import invoke
-
-def strip_git(filename):
- # Find the base path of the GIT tree, in order to strip that leading
- # path from arguments to git -- it doesn't like absolute paths.
- if os.path.isabs(filename):
- absRepoDir = os.path.abspath(git_repo_for_path('.'))
- absRepoSlashedDir = os.path.join(absRepoDir,"")
- assert filename.startswith(absRepoSlashedDir), \
- "file %s not in git repo %s" % (filename, absRepoSlashedDir)
- filename = filename[len(absRepoSlashedDir):]
- return filename
-
-def invoke_client(*args, **kwargs):
- directory = kwargs['directory']
- expect = kwargs.get('expect', (0, 1))
- cl_args = ["git"]
- cl_args.extend(args)
- status,output,error = invoke(cl_args, expect, cwd=directory)
- return status, output
-
-def add_id(filename, paranoid=False):
- filename = strip_git(filename)
- invoke_client("add", filename, directory=git_repo_for_path('.'))
-
-def delete_id(filename):
- filename = strip_git(filename)
- invoke_client("rm", filename, directory=git_repo_for_path('.'))
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def detect(path):
- """Detect whether a directory is revision-controlled using GIT"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, ".git")):
+import re
+import unittest
+import doctest
+
+from rcs import RCS, RCStestCase, CommandError
+
+def new():
+ return Git()
+
+class Git(RCS):
+ name="git"
+ client="git"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ if self._u_search_parent_directories(path, ".git") != None :
return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.dirname(path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- if body is not None:
- summary += '\n' + body
- descriptor, filename = tempfile.mkstemp()
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.close()
- invoke_client('commit', '-a', '-F', filename, directory=directory)
- finally:
- os.unlink(filename)
-
-def postcommit(directory):
- pass
-
-
-# In order to diff the bug database, you need a way to check out arbitrary
-# previous revisions and a mechanism for locating the bug_dir in the revision
-# you've checked out.
-#
-# Copying the Mercurial implementation, this feature is implemented by four
-# functions:
-#
-# git_dir_for_path : find '.git' for a git tree.
-#
-# export : check out a commit 'spec' from git-repo 'bug_dir' into a dir
-# 'revision_dir'
-#
-# find_or_make_export : check out a commit 'spec' from git repo 'directory' to
-# any location you please and return the path to the checkout
-#
-# path_in_reference : return a path to the bug_dir of the commit 'spec'
-
-def git_repo_for_path(path):
- """Find the root of the deepest repository containing path."""
- # Assume that nothing funny is going on; in particular, that we aren't
- # dealing with a bare repo.
- dirname = os.path.dirname(git_dir_for_path(path))
- if dirname == '' : # os.path.dirname('filename') == ''
- dirname = '.'
- return dirname
-
-def git_dir_for_path(path):
- """Find the git-dir of the deepest repo containing path."""
- return invoke_client("rev-parse", "--git-dir", directory=path)[1].rstrip()
-
-def export(spec, bug_dir, revision_dir):
- """Check out commit 'spec' from the git repo containing bug_dir into
- 'revision_dir'."""
- if not os.path.exists(revision_dir):
- os.makedirs(revision_dir)
- invoke_client("init", directory=revision_dir)
- invoke_client("pull", git_dir_for_path(bug_dir), directory=revision_dir)
- invoke_client("checkout", '-f', spec, directory=revision_dir)
-
-def find_or_make_export(spec, directory):
- """Checkout 'spec' from the repo at 'directory' by hook or by crook and
- return the path to the working copy."""
- home = os.path.expanduser("~")
- revision_root = os.path.join(home, ".be_revs")
- if not os.path.exists(revision_root):
- os.mkdir(revision_root)
- revision_dir = os.path.join(revision_root, spec)
- if not os.path.exists(revision_dir):
- export(spec, directory, revision_dir)
- return revision_dir
-
-def path_in_reference(bug_dir, spec):
- """Check out 'spec' and return the path to its bug_dir."""
- spec = spec or 'HEAD'
- spec = invoke_client('rev-parse', spec, directory=bug_dir)[1].rstrip()
- # This is a really hairy computation.
- # The theory is that we can't possibly be working out of a bare repo;
- # hence, we get the rel_bug_dir by chopping off dirname(git_dir_for_path(bug_dir))
- # + '/'.
- rel_bug_dir = strip_git(bug_dir)
- export_root = find_or_make_export(spec, directory=bug_dir)
- return os.path.join(export_root, rel_bug_dir)
-
-
-name = "git"
-
+ return False
+ def _rcs_root(self, path):
+ """Find the root of the deepest repository containing path."""
+ # Assume that nothing funny is going on; in particular, that we aren't
+ # dealing with a bare repo.
+ if os.path.isdir(path) != True:
+ path = os.path.dirname(path)
+ status,output,error = self._u_invoke_client("rev-parse", "--git-dir",
+ directory=path)
+ gitdir = os.path.join(path, output.rstrip('\n'))
+ dirname = os.path.abspath(os.path.dirname(gitdir))
+ return dirname
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("config", "user.name")
+ name = output.rstrip('\n')
+ status,output,error = self._u_invoke_client("config", "user.email")
+ email = output.rstrip('\n')
+ return self._u_create_id(name, email)
+ def _rcs_set_user_id(self, value):
+ name,email = self._u_parse_id(value)
+ if email != None:
+ self._u_invoke_client("config", "user.email", email)
+ self._u_invoke_client("config", "user.name", name)
+ def _rcs_add(self, path):
+ if os.path.isdir(path):
+ return
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ if not os.path.isdir(self._u_abspath(path)):
+ self._u_invoke_client("rm", "-f", path)
+ def _rcs_update(self, path):
+ self._rcs_add(path)
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(self._u_abspath(path), "rb").read()
+ else:
+ arg = "%s:%s" % (revision,path)
+ status,output,error = self._u_invoke_client("show", arg)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision==None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ #self._u_invoke_client("archive", revision, directory) # makes tarball
+ self._u_invoke_client("clone", "--no-checkout",".",directory)
+ self._u_invoke_client("checkout", revision, directory=directory)
+ def _rcs_commit(self, commitfile):
+ status,output,error = self._u_invoke_client('commit', '-a',
+ '-F', commitfile)
+ revision = None
+ revline = re.compile("Created (.*)commit (.*):(.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 3
+ revision = match.groups()[1]
+ return revision
+
+class GitTestCase(RCStestCase):
+ Class = Git
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(GitTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/hg.py b/libbe/hg.py
index 35de8e0..27cbb79 100644
--- a/libbe/hg.py
+++ b/libbe/hg.py
@@ -14,102 +14,73 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import tempfile
-
-import config
-from rcs import invoke, CommandError
-
-def invoke_client(*args, **kwargs):
- directory = kwargs['directory']
- expect = kwargs.get('expect', (0, 1))
- cl_args = ["hg"]
- cl_args.extend(args)
- status,output,error = invoke(cl_args, expect, cwd=directory)
- return status, output
-
-def add_id(filename, paranoid=False):
- invoke_client("add", filename, directory='.')
-
-def delete_id(filename):
- invoke_client("rm", filename, directory='.')
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def lookup_revision(revno, directory):
- return invoke_client('log', '--rev', str(revno), '--template={node}',
- directory=directory)[1].rstrip('\n')
-
-def export(revno, directory, revision_dir):
- invoke_client("archive", "--rev", str(revno), revision_dir,
- directory=directory)
-
-def find_or_make_export(revno, directory):
- revision_id = lookup_revision(revno, directory)
- home = os.path.expanduser("~")
- revision_root = os.path.join(home, ".be_revs")
- if not os.path.exists(revision_root):
- os.mkdir(revision_root)
- revision_dir = os.path.join(revision_root, revision_id)
- if not os.path.exists(revision_dir):
- export(revno, directory, revision_dir)
- return revision_dir
-
-def hg_root(path):
- return invoke_client("root", "-R", path, directory=None)[1].rstrip('\r')
-
-def path_in_reference(bug_dir, spec):
- if spec is None:
- spec = int(invoke_client('tip', '--template="{rev}"',
- directory=bug_dir)[1])
- rel_bug_dir = bug_dir[len(hg_root(bug_dir)):]
- export_root = find_or_make_export(spec, directory=bug_dir)
- return os.path.join(export_root, rel_bug_dir)
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using Mercurial"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, ".hg")):
+import re
+import unittest
+import doctest
+
+from rcs import RCS, RCStestCase, CommandError, SettingIDnotSupported
+
+def new():
+ return Hg()
+
+class Hg(RCS):
+ name="hg"
+ client="hg"
+ versioned=True
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Mercurial"""
+ if self._u_search_parent_directories(path, ".hg") != None:
return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.dirname(path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- if body is not None:
- summary += '\n' + body
- descriptor, filename = tempfile.mkstemp()
- try:
- temp_file = os.fdopen(descriptor, 'wb')
- temp_file.write(summary)
- temp_file.close()
- invoke_client('commit', '--logfile', filename, directory=directory)
- finally:
- os.unlink(filename)
-
-def postcommit(directory):
- pass
-
-name = "hg"
+ return False
+ def _rcs_root(self, path):
+ status,output,error = self._u_invoke_client("root", directory=path)
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._u_invoke_client("init", directory=path)
+ def _rcs_get_user_id(self):
+ status,output,error = self._u_invoke_client("showconfig","ui.username")
+ return output.rstrip('\n')
+ def _rcs_set_user_id(self, value):
+ """
+ Supported by the Config Extension, but that is not part of
+ standard Mercurial.
+ http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
+ """
+ raise SettingIDnotSupported
+ def _rcs_add(self, path):
+ self._u_invoke_client("add", path)
+ def _rcs_remove(self, path):
+ self._u_invoke_client("rm", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(os.path.join(self.rootdir, path), "rb").read()
+ else:
+ status,output,error = \
+ self._u_invoke_client("cat","-r",revision,path)
+ return output
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ self._u_invoke_client("archive", "--rev", revision, directory)
+ def _rcs_commit(self, commitfile):
+ self._u_invoke_client('commit', '--logfile', commitfile)
+ status,output,error = self._u_invoke_client('identify')
+ revision = None
+ revline = re.compile("(.*) tip")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revision = match.groups()[0]
+ return revision
+
+class HgTestCase(RCStestCase):
+ Class = Hg
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(HgTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/mapfile.py b/libbe/mapfile.py
index 3f09edd..8f69554 100644
--- a/libbe/mapfile.py
+++ b/libbe/mapfile.py
@@ -17,6 +17,7 @@
import os.path
import errno
import utility
+import doctest
class IllegalKey(Exception):
def __init__(self, key):
@@ -107,8 +108,11 @@ def map_save(rcs, path, map):
add = not os.path.exists(path)
output = file(path, "wb")
generate(output, map)
+ output.close()
if add:
- rcs.add_id(path)
+ rcs.add(path)
+ else:
+ rcs.update(path)
class NoSuchFile(Exception):
def __init__(self, pathname):
@@ -122,3 +126,5 @@ def map_load(path):
if e.errno != errno.ENOENT:
raise e
raise NoSuchFile(path)
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/names.py b/libbe/names.py
index c86063d..b866f75 100644
--- a/libbe/names.py
+++ b/libbe/names.py
@@ -17,7 +17,7 @@
import os
import sys
-
+import doctest
def uuid():
# this code borrowed from standard commands module
@@ -53,3 +53,5 @@ def unique_name(bug, bugs):
while (bug.uuid[:chars] == some_bug.uuid[:chars]):
chars+=1
return bug.uuid[:chars]
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/no_rcs.py b/libbe/no_rcs.py
deleted file mode 100644
index 1b3b005..0000000
--- a/libbe/no_rcs.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
-# <abentley@panoramicfeedback.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import os
-import config
-from os import unlink
-
-def add_id(filename, paranoid=False):
- """Compatibility function"""
- pass
-
-def delete_id(filename):
- """Compatibility function"""
- pass
-
-def mkdir(path, paranoid=False):
- os.mkdir(path)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-def detect(path):
- """Compatibility function"""
- return True
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- pass
-
-def postcommit(directory):
- pass
-
-name = "None"
diff --git a/libbe/plugin.py b/libbe/plugin.py
index 9254986..05a4398 100644
--- a/libbe/plugin.py
+++ b/libbe/plugin.py
@@ -17,6 +17,8 @@
import os
import os.path
import sys
+import doctest
+
def my_import(mod_name):
module = __import__(mod_name)
components = mod_name.split('.')
@@ -56,6 +58,8 @@ plugin_path = os.path.realpath(os.path.dirname(os.path.dirname(__file__)))
if plugin_path not in sys.path:
sys.path.append(plugin_path)
+suite = doctest.DocTestSuite()
+
def _test():
import doctest
doctest.testmod()
diff --git a/libbe/rcs.py b/libbe/rcs.py
index 4487fba..2993a80 100644
--- a/libbe/rcs.py
+++ b/libbe/rcs.py
@@ -15,42 +15,43 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from subprocess import Popen, PIPE
+import os
+import os.path
+from socket import gethostname
+import re
import sys
+import tempfile
+import shutil
+import unittest
+import doctest
+from utility import Dir
-def rcs_by_name(rcs_name):
- """Return the module for the RCS with the given name"""
- if rcs_name == "Arch":
- import arch
- return arch
- elif rcs_name == "bzr":
- import bzr
- return bzr
- elif rcs_name == "hg":
- import hg
- return hg
- elif rcs_name == "git":
- import git
- return git
- elif rcs_name == "None":
- import no_rcs
- return no_rcs
-
-def detect(dir):
- """Return the module for the rcs being used in this directory"""
+def _get_matching_rcs(matchfn):
+ """Return the first module for which matchfn(RCS_instance) is true"""
import arch
import bzr
import hg
import git
- if arch.detect(dir):
- return arch
- elif bzr.detect(dir):
- return bzr
- elif hg.detect(dir):
- return hg
- elif git.detect(dir):
- return git
- import no_rcs
- return no_rcs
+ for module in [arch, bzr, hg, git]:
+ rcs = module.new()
+ if matchfn(rcs):
+ return rcs
+ else:
+ del(rcs)
+ return RCS()
+
+def rcs_by_name(rcs_name):
+ """Return the module for the RCS with the given name"""
+ return _get_matching_rcs(lambda rcs: rcs.name == rcs_name)
+
+def detect_rcs(dir):
+ """Return an RCS instance for the rcs being used in this directory"""
+ return _get_matching_rcs(lambda rcs: rcs.detect(dir))
+
+def installed_rcs():
+ """Return an instance of an installed RCS"""
+ return _get_matching_rcs(lambda rcs: rcs.installed())
+
class CommandError(Exception):
def __init__(self, err_str, status):
@@ -58,19 +59,506 @@ class CommandError(Exception):
self.err_str = err_str
self.status = status
-def invoke(args, expect=(0,), cwd=None):
- try :
- if sys.platform != "win32":
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
+class SettingIDnotSupported(NotImplementedError):
+ pass
+
+def new():
+ return RCS()
+
+class RCS(object):
+ """
+ Implement the 'no-rcs' interface.
+
+ Support for other RCSs can be added by subclassing this class, and
+ overriding methods _rcs_*() with code appropriate for your RCS.
+
+ The methods _u_*() are utility methods available to the _rcs_*()
+ methods.
+ """
+ name = "None"
+ client = "" # command-line tool for _u_invoke_client
+ versioned = False
+ def __init__(self, paranoid=False):
+ self.paranoid = paranoid
+ self.verboseInvoke = False
+ self.rootdir = None
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ def __del__(self):
+ self.cleanup()
+
+ def _rcs_help(self):
+ """
+ Return the command help string.
+ (Allows a simple test to see if the client is installed.)
+ """
+ pass
+ def _rcs_detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this RCS.
+ """
+ return True
+ def _rcs_root(self, path):
+ """
+ Get the RCS root. This is the default working directory for
+ future invocations. You would normally set this to the root
+ directory for your RCS.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ if path == "":
+ path = os.path.abspath(".")
+ return path
+ def _rcs_init(self, path):
+ """
+ Begin versioning the tree based at path.
+ """
+ pass
+ def _rcs_cleanup(self):
+ """
+ Remove any cruft that _rcs_init() created outside of the
+ versioned tree.
+ """
+ pass
+ def _rcs_get_user_id(self):
+ """
+ Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the RCS has not been configured with a username, return None.
+ """
+ return None
+ def _rcs_set_user_id(self, value):
+ """
+ Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the RCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ raise SettingIDnotSupported
+ def _rcs_add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ pass
+ def _rcs_remove(self, path):
+ """
+ Remove the file at path from version control. Optionally
+ remove the file from the filesystem as well.
+ """
+ pass
+ def _rcs_update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ """
+ Get the file as it was in a given revision.
+ Revision==None specifies the current revision.
+ """
+ assert revision == None, \
+ "The %s RCS does not support revision specifiers" % self.name
+ return file(os.path.join(self.rootdir, path), "rb").read()
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ dir specifies a directory to create the duplicate in.
+ """
+ shutil.copytree(self.rootdir, directory, True)
+ def _rcs_commit(self, commitfile):
+ """
+ Commit the current working directory, using the contents of
+ commitfile as the comment. Return the name of the old
+ revision.
+ """
+ return None
+ def installed(self):
+ try:
+ self._rcs_help()
+ return True
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ return False
+ raise e
+ def detect(self, path=None):
+ """
+ Detect whether a directory is revision controlled with this RCS.
+ """
+ return self._rcs_detect(path)
+ def root(self, path):
+ """
+ Set the root directory to the path's RCS root. This is the
+ default working directory for future invocations.
+ """
+ self.rootdir = self._rcs_root(path)
+ def init(self, path):
+ """
+ Begin versioning the tree based at path.
+ Also roots the rcs at path.
+ """
+ if os.path.isdir(path)==False:
+ path = os.path.dirname(path)
+ self._rcs_init(path)
+ self.root(path)
+ def cleanup(self):
+ self._rcs_cleanup()
+ def get_user_id(self):
+ """
+ Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the RCS has not been configured with a username, return the user's
+ id.
+ """
+ id = self._rcs_get_user_id()
+ if id == None:
+ name = self._u_get_fallback_username()
+ email = self._u_get_fallback_email()
+ id = self._u_create_id(name, email)
+ print >> sys.stderr, "Guessing id '%s'" % id
+ try:
+ self.set_user_id(id)
+ except SettingIDnotSupported:
+ pass
+ return id
+ def set_user_id(self, value):
+ """
+ Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the RCS has not been configured with a usename, so
+ that commits will have a reasonable FROM value.
+ """
+ self._rcs_set_user_id(value)
+ def add(self, path):
+ """
+ Add the already created file at path to version control.
+ """
+ self._rcs_add(self._u_rel_path(path))
+ def remove(self, path):
+ """
+ Remove a file from both version control and the filesystem.
+ """
+ self._rcs_remove(self._u_rel_path(path))
+ if os.path.exists(path):
+ os.remove(path)
+ def recursive_remove(self, dirname):
+ """
+ Remove a file/directory and all its decendents from both
+ version control and the filesystem.
+ """
+ for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
+ filenames.extend(dirnames)
+ for path in filenames:
+ fullpath = os.path.join(dirpath, path)
+ if os.path.exists(fullpath) == False:
+ continue
+ self._rcs_remove(self._u_rel_path(fullpath))
+ if os.path.exists(dirname):
+ shutil.rmtree(dirname)
+ def update(self, path):
+ """
+ Notify the versioning system of changes to the versioned file
+ at path.
+ """
+ self._rcs_update(self._u_rel_path(path))
+ def get_file_contents(self, path, revision=None):
+ """
+ Get the file as it was in a given revision.
+ Revision==None specifies the current revision.
+ """
+ relpath = self._u_rel_path(path)
+ return self._rcs_get_file_contents(relpath, revision)
+ def set_file_contents(self, path, contents):
+ """
+ Set the file contents under version control.
+ """
+ add = not os.path.exists(path)
+ file(path, "wb").write(contents)
+ if add:
+ self.add(path)
+ else:
+ self.update(path)
+ def mkdir(self, path):
+ """
+ Created directory at path under version control.
+ """
+ os.mkdir(path)
+ self.add(path)
+ def duplicate_repo(self, revision=None):
+ """
+ Get the repository as it was in a given revision.
+ revision==None specifies the current revision.
+ Return the path to the arbitrary directory at the base of the new repo.
+ """
+ # Dirname in Baseir to protect against simlink attacks.
+ if self._duplicateBasedir == None:
+ self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs')
+ self._duplicateDirname = \
+ os.path.join(self._duplicateBasedir, "duplicate")
+ self._rcs_duplicate_repo(directory=self._duplicateDirname,
+ revision=revision)
+ return self._duplicateDirname
+ def remove_duplicate_repo(self):
+ """
+ Clean up a duplicate repo created with duplicate_repo().
+ """
+ if self._duplicateBasedir != None:
+ shutil.rmtree(self._duplicateBasedir)
+ self._duplicateBasedir = None
+ self._duplicateDirname = None
+ def commit(self, summary, body=None):
+ """
+ Commit the current working directory, with a commit message
+ string summary and body. Return the name of the old revision
+ (or None if versioning is not supported).
+ """
+ if body is not None:
+ summary += '\n' + body
+ descriptor, filename = tempfile.mkstemp()
+ revision = None
+ try:
+ temp_file = os.fdopen(descriptor, 'wb')
+ temp_file.write(summary)
+ temp_file.flush()
+ revision = self._rcs_commit(filename)
+ temp_file.close()
+ finally:
+ os.remove(filename)
+ return revision
+ def precommit(self, directory):
+ pass
+ def postcommit(self, directory):
+ pass
+ def _u_invoke(self, args, expect=(0,), cwd=None):
+ if cwd == None:
+ cwd = self.rootdir
+ try :
+ if self.verboseInvoke == True:
+ print "%s$ %s" % (cwd, " ".join(args))
+ if sys.platform != "win32":
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
+ else:
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ shell=True, cwd=cwd)
+ except OSError, e :
+ strerror = "%s\nwhile executing %s" % (e.args[1], args)
+ raise CommandError(strerror, e.args[0])
+ output, error = q.communicate()
+ status = q.wait()
+ if status not in expect:
+ raise CommandError(error, status)
+ return status, output, error
+ def _u_invoke_client(self, *args, **kwargs):
+ directory = kwargs.get('directory',None)
+ expect = kwargs.get('expect', (0,))
+ cl_args = [self.client]
+ cl_args.extend(args)
+ return self._u_invoke(cl_args, expect, cwd=directory)
+ def _u_search_parent_directories(self, path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents.
+
+ e.g.
+ search_parent_directories("/a/b/c", ".be")
+ will return the path to the first existing file from
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+ or None if none of those files exist.
+ """
+ path = os.path.realpath(path)
+ assert os.path.exists(path)
+ old_path = None
+ while True:
+ if os.path.exists(os.path.join(path, filename)):
+ return os.path.join(path, filename)
+ if path == old_path:
+ return None
+ old_path = path
+ path = os.path.dirname(path)
+ def _u_rel_path(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> rcs = new()
+ >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ '.be'
+ """
+ if root == None:
+ assert self.rootdir != None, "RCS not rooted"
+ root = self.rootdir
+ if os.path.isabs(path):
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ assert path.startswith(absRootSlashedDir), \
+ "file %s not in root %s" % (path, absRootSlashedDir)
+ assert path != absRootSlashedDir, \
+ "file %s == root directory %s" % (path, absRootSlashedDir)
+ path = path[len(absRootSlashedDir):]
+ return path
+ def _u_abspath(self, path, root=None):
+ """
+ Return the absolute path from a path realtive to root.
+ >>> rcs = new()
+ >>> rcs._u_abspath(".be", "/a.b/c")
+ '/a.b/c/.be'
+ """
+ if root == None:
+ assert self.rootdir != None, "RCS not rooted"
+ root = self.rootdir
+ return os.path.abspath(os.path.join(root, path))
+ def _u_create_id(self, name, email=None):
+ """
+ >>> rcs = new()
+ >>> rcs._u_create_id("John Doe", "jdoe@example.com")
+ 'John Doe <jdoe@example.com>'
+ >>> rcs._u_create_id("John Doe")
+ 'John Doe'
+ """
+ assert len(name) > 0
+ if email == None or len(email) == 0:
+ return name
else:
- # win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True,
- cwd=cwd)
- except OSError, e :
- strerror = "%s\nwhile executing %s" % (e.args[1], args)
- raise CommandError(strerror, e.args[0])
- output, error = q.communicate()
- status = q.wait()
- if status not in expect:
- raise CommandError(error, status)
- return status, output, error
+ return "%s <%s>" % (name, email)
+ def _u_parse_id(self, value):
+ """
+ >>> rcs = new()
+ >>> rcs._u_parse_id("John Doe <jdoe@example.com>")
+ ('John Doe', 'jdoe@example.com')
+ >>> rcs._u_parse_id("John Doe")
+ ('John Doe', None)
+ >>> try:
+ ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>")
+ ... except AssertionError:
+ ... print "Invalid match"
+ Invalid match
+ """
+ emailexp = re.compile("(.*) <([^>]*)>(.*)")
+ match = emailexp.search(value)
+ if match == None:
+ email = None
+ name = value
+ else:
+ assert len(match.groups()) == 3
+ assert match.groups()[2] == "", match.groups()
+ email = match.groups()[1]
+ name = match.groups()[0]
+ assert name != None
+ assert len(name) > 0
+ return (name, email)
+ def _u_get_fallback_username(self):
+ name = None
+ for envariable in ["LOGNAME", "USERNAME"]:
+ if os.environ.has_key(envariable):
+ name = os.environ[envariable]
+ break
+ assert name != None
+ return name
+ def _u_get_fallback_email(self):
+ hostname = gethostname()
+ name = self._u_get_fallback_username()
+ return "%s@%s" % (name, hostname)
+ def _u_parse_commitfile(self, commitfile):
+ """
+ Split the commitfile created in self.commit() back into
+ summary and header lines.
+ """
+ f = file(commitfile, "rb")
+ summary = f.readline()
+ body = f.read()
+ body.lstrip('\n')
+ if len(body) == 0:
+ body = None
+ f.close
+ return (summary, body)
+
+
+class RCStestCase(unittest.TestCase):
+ Class = RCS
+ def __init__(self, *args, **kwargs):
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ self.dirname = None
+ def instantiateRCS(self):
+ return self.Class()
+ def setUp(self):
+ self.dir = Dir()
+ self.dirname = self.dir.path
+ self.rcs = self.instantiateRCS()
+ def tearDown(self):
+ del(self.rcs)
+ del(self.dirname)
+ def fullPath(self, path):
+ return os.path.join(self.dirname, path)
+ def assertPathExists(self, path):
+ fullpath = self.fullPath(path)
+ self.failUnless(os.path.exists(fullpath)==True,
+ "path %s does not exist" % fullpath)
+ def uidTest(self):
+ user_id = self.rcs.get_user_id()
+ self.failUnless(user_id != None,
+ "unable to get a user id")
+ user_idB = "John Doe <jdoe@example.com>"
+ if self.rcs.name in ["None", "hg"]:
+ self.assertRaises(SettingIDnotSupported, self.rcs.set_user_id,
+ user_idB)
+ else:
+ self.rcs.set_user_id(user_idB)
+ self.failUnless(self.rcs.get_user_id() == user_idB,
+ "user id not set correctly (was %s, is %s)" \
+ % (user_id, self.rcs.get_user_id()))
+ self.failUnless(self.rcs.set_user_id(user_id) == None,
+ "unable to restore user id %s" % user_id)
+ self.failUnless(self.rcs.get_user_id() == user_id,
+ "unable to restore user id %s" % user_id)
+ def versionTest(self, path):
+ origpath = path
+ path = self.fullPath(path)
+ contentsA = "Lorem ipsum"
+ contentsB = "dolor sit amet"
+ self.rcs.set_file_contents(path,contentsA)
+ self.failUnless(self.rcs.get_file_contents(path)==contentsA,
+ "File contents not set or read correctly")
+ revision = self.rcs.commit("Commit current status")
+ self.failUnless(self.rcs.get_file_contents(path)==contentsA,
+ "Committing File contents not set or read correctly")
+ if self.rcs.versioned == True:
+ self.rcs.set_file_contents(path,contentsB)
+ self.failUnless(self.rcs.get_file_contents(path)==contentsB,
+ "File contents not set correctly after commit")
+ contentsArev = self.rcs.get_file_contents(path, revision)
+ self.failUnless(contentsArev==contentsA, \
+ "Original file contents not saved in revision %s\n%s\n%s\n" \
+ % (revision, contentsA, contentsArev))
+ dup = self.rcs.duplicate_repo(revision)
+ duppath = os.path.join(dup, origpath)
+ dupcont = file(duppath, "rb").read()
+ self.failUnless(dupcont == contentsA)
+ self.rcs.remove_duplicate_repo()
+ def testRun(self):
+ self.failUnless(self.rcs.installed() == True,
+ "%s RCS not found" % self.Class.name)
+ if self.Class.name != "None":
+ self.failUnless(self.rcs.detect(self.dirname)==False,
+ "Detected %s RCS before initializing" \
+ % self.Class.name)
+ self.rcs.init(self.dirname)
+ self.failUnless(self.rcs.detect(self.dirname)==True,
+ "Did not detect %s RCS after initializing" \
+ % self.Class.name)
+ rp = os.path.realpath(self.rcs.rootdir)
+ dp = os.path.realpath(self.dirname)
+ self.failUnless(dp == rp or rp == None,
+ "%s RCS root in wrong dir (%s %s)" \
+ % (self.Class.name, dp, rp))
+ self.uidTest()
+ self.rcs.mkdir(self.fullPath('a'))
+ self.rcs.mkdir(self.fullPath('a/b'))
+ self.rcs.mkdir(self.fullPath('c'))
+ self.assertPathExists('a')
+ self.assertPathExists('a/b')
+ self.assertPathExists('c')
+ self.versionTest('a/text')
+ self.versionTest('a/b/text')
+ self.rcs.recursive_remove(self.fullPath('a'))
+
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(RCStestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/restconvert.py b/libbe/restconvert.py
index cc7f866..57148e4 100644
--- a/libbe/restconvert.py
+++ b/libbe/restconvert.py
@@ -27,7 +27,7 @@ try :
from xml.etree import ElementTree # Python 2.5 (and greater?)
except ImportError :
from elementtree import ElementTree
-
+import doctest
def rest_xml(rest):
warnings = StringIO()
@@ -126,3 +126,5 @@ def foldout(name, arguments, options, content, lineno, content_offset,
foldout += foldout_body
foldout.set_class('foldout')
return [foldout]
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/tests.py b/libbe/tests.py
index 461e6e8..18277d7 100644
--- a/libbe/tests.py
+++ b/libbe/tests.py
@@ -16,40 +16,16 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import tempfile
import shutil
-import os
-import os.path
-from libbe import bugdir, bug, arch
-cleanable = []
-def clean_up():
- global cleanable
- tmp = cleanable
- tmp.reverse()
- for obj in tmp:
- obj.clean_up()
- cleanable = []
+from libbe import utility, names, restconvert, mapfile, config, diff, rcs, \
+ arch, bzr, git, hg, bug, bugdir, plugin, cmdutil
+import unittest
-class Dir:
- def __init__(self):
- self.name = tempfile.mkdtemp(prefix="testdir")
- cleanable.append(self)
- def clean_up(self):
- shutil.rmtree(self.name)
+# can not use 'suite' or the base test.py file will include these suites twice.
+testsuite = unittest.TestSuite([utility.suite, names.suite, restconvert.suite,
+ mapfile.suite, config.suite, diff.suite,
+ rcs.suite, arch.suite, bzr.suite, git.suite,
+ hg.suite, bug.suite, bugdir.suite,
+ plugin.suite, cmdutil.suite])
-def arch_dir():
- arch.ensure_user_id()
- dir = Dir()
- arch.init_tree(dir.name)
- return dir
-
-def bug_arch_dir():
- dir = arch_dir()
- return bugdir.create_bug_dir(dir.name, arch)
-
-def simple_bug_dir():
- dir = bug_arch_dir()
- bug_a = bug.new_bug(dir, "a")
- bug_b = bug.new_bug(dir, "b")
- bug_b.status = "closed"
- bug_a.save()
- bug_b.save()
- return dir
+if __name__ == "__main__":
+ unittest.TextTestRunner(verbosity=2).run(testsuite)
diff --git a/libbe/utility.py b/libbe/utility.py
index 1fd83da..f595bdb 100644
--- a/libbe/utility.py
+++ b/libbe/utility.py
@@ -18,6 +18,8 @@ import calendar
import time
import os
import tempfile
+import shutil
+import doctest
class FileString(object):
"""Bare-bones pseudo-file class
@@ -69,6 +71,14 @@ def get_file(f):
else:
return f
+class Dir:
+ "A temporary directory for testing use"
+ def __init__(self):
+ self.path = tempfile.mkdtemp(prefix="BEtest")
+ def __del__(self):
+ shutil.rmtree(self.path)
+ def __call__(self):
+ return self.path
RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000"
@@ -162,3 +172,5 @@ def trimmed_string(instring):
break
out.append(line)
return ''.join(out)
+
+suite = doctest.DocTestSuite()