aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/arch.py
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2008-11-18 20:42:50 -0500
committerW. Trevor King <wking@drexel.edu>2008-11-18 20:42:50 -0500
commit19b153b9a86377a2b30cc80fa3f475fed892e2fe (patch)
tree8f5688707ab1b34ffec2bc4372d087580ff21709 /libbe/arch.py
parente4018dfe8cfa553adbd20898c5b42c3462ca1733 (diff)
downloadbugseverywhere-19b153b9a86377a2b30cc80fa3f475fed892e2fe.tar.gz
Major rewrite of RCS backends. RCS now represented as a class.
Lots of changes and just one commit. This started with bug dac91856-cb6a-4f69-8c03-38ff0b29aab2, when I noticed that new bugs were not being added appropriately with the Git backend. I'd been working with Git trouble before with bug 0cad2ac6-76ef-4a88-abdf-b2e02de76f5c, and decided things would be better off if I just scrapped the current RCS architecture and went to a more object oriented setup. So I did. It's not clear how to add support for an RCS backend: * Create a new module that - defines an inheritor of rsc.RCS, overriding the _rcs_*() methods - provide a new() function for instantizating the new class - defines an inheritor of rcs.RCStestCase, overiding the Class attribute - defines 'suite' a unittest.TestSuite testing the module * Add your new module to the rest in rcs._get_matching_rcs() * Add your new module to the rest in libbe/tests.py Although I'm not sure libbe/tests.py is still usefull. The new framework clears out a bunch of hackery that used to be involved with supporting becommands/diff.py. There's still room for progress though. While implementing the new verision, I moved the testing framework over from doctest to a doctest/unittest combination. Longer tests that don't demonstrate a function's usage should be moved to unittests at the end of the module, since unittest has better support for setup/teardown, etc. The new framework also revealed some underimplented backends, most notably arch. These backends have now been fixed. I also tweaked the test_usage.sh script to run through all the backends if it is called with no arguments. The fix for the dac bug turned out to be an unflushed file write :p.
Diffstat (limited to 'libbe/arch.py')
-rw-r--r--libbe/arch.py357
1 files changed, 191 insertions, 166 deletions
diff --git a/libbe/arch.py b/libbe/arch.py
index 001f852..8e7390d 100644
--- a/libbe/arch.py
+++ b/libbe/arch.py
@@ -15,184 +15,209 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import config
-import errno
+import shutil
+import time
+import re
+import unittest
+import doctest
-from rcs import invoke
+import config
+from rcs import RCS, RCStestCase, CommandError
client = config.get_val("arch_client")
if client is None:
client = "tla"
config.set_val("arch_client", client)
-
-def invoke_client(*args, **kwargs):
- cl_args = [client]
- cl_args.extend(args)
- status,output,error = invoke(cl_args)
- if status not in (0,):
- raise Exception("Command failed: %s" % error)
- return output
-
-def get_user_id():
- try:
- return invoke_client('my-id')
- except Exception, e:
- if 'no arch user id set' in e.args[0]:
- return None
+def new():
+ return Arch()
+
+class Arch(RCS):
+ name = "Arch"
+ client = client
+ versioned = True
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser("~/.arch-params")
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, "{arch}") != None :
+ return True
+ return False
+ def _rcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
else:
- raise
-
-
-def set_user_id(value):
- invoke_client('my-id', value)
-
-
-def ensure_user_id():
- if get_user_id() is None:
- set_user_id('nobody <nobody@example.com>')
-
-
-def write_tree_settings(contents, path):
- file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents)
-
-def init_tree(path):
- invoke_client("init-tree", "-d", path)
-
-def temp_arch_tree(type="easy"):
- import tempfile
- ensure_user_id()
- path = tempfile.mkdtemp()
- init_tree(path)
- if type=="easy":
- write_tree_settings("source ^.*$\n", path)
- elif type=="tricky":
- write_tree_settings("source ^$\n", path)
- else:
- assert (type=="impossible")
- add_dir_rule("precious ^\.boo$", path, path)
- return path
-
-def list_added(root):
- assert os.path.exists(root)
- assert os.access(root, os.X_OK)
- root = os.path.realpath(root)
- inv_str = invoke_client("inventory", "--source", '--both', '--all', root)
- return [os.path.join(root, p) for p in inv_str.split('\n')]
-
-def tree_root(filename):
- assert os.path.exists(filename)
- if not os.path.isdir(filename):
- dirname = os.path.dirname(filename)
- else:
- dirname = filename
- return invoke_client("tree-root", dirname).rstrip('\n')
-
-def rel_filename(filename, root):
- filename = os.path.realpath(filename)
- root = os.path.realpath(root)
- assert(filename.startswith(root))
- return filename[len(root)+1:]
+ dirname = path
+ status,output,error = self._u_invoke_client("tree-root", dirname)
+ # get archive name...
+ return output.rstrip('\n')
+ def _rcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+ def _create_archive(self, path):
+ # Create a new archive
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = self._u_parse_id(id)
+ if email == None:
+ email = "%s@example.com" % name
+ trailer = "%s-%s" % ("bugs-everywhere-auto",
+ time.strftime("%Y.%H.%M.%S"))
+ self._archive_name = "%s--%s" % (email, trailer)
+ self._archive_dir = "/tmp/%s" % trailer
+ self._tmp_archive = True
+ self._u_invoke_client("make-archive", self._archive_name,
+ self._archive_dir, directory=path)
+ def _invoke_client(self, *args, **kwargs):
+ """
+ Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
+ else:
+ tailargs = []
+ arglist = [command, "-A", self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ "=locations", self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+ def _create_project(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = "bugs-everywhere"
+ branch = "mainline"
+ version = "0.1"
+ self._project_name = "%s--%s--%s" % (category, branch, version)
+ self._invoke_client("archive-setup", self._project_name,
+ directory=path)
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return "%s/%s" % (self._archive_name, self._project_name)
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html#Importing_the_First_Revision
+ self._u_invoke_client("init-tree", self._archive_project_name(),
+ directory=path)
+ self._invoke_client("import", "--summary", "Began versioning",
+ directory=path)
+ def _rcs_cleanup(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+ def _rcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client('my-id', value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add-id", path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.rootdir)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client("inventory", "--source",
+ "--both", "--all", root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ file(inv_path, "ab").write(rule)
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+ def _force_source(self, path):
+ rule = "source %s\n" % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
+ if os.path.realpath(path) not in self._list_added(self.rootdir):
+ raise CantAddFile(path)
+ def _rcs_remove(self, path):
+ if not '.arch-ids' in path:
+ self._u_invoke_client("delete-id", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(self._u_abspath(path), "rb").read()
+ else:
+ status,output,error = \
+ self._invoke_client("file-find", path, revision)
+ path = output.rstrip('\n')
+ return file(self._u_abspath(path), "rb").read()
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ status,output,error = \
+ self._u_invoke_client("get", revision,directory)
+ def _rcs_commit(self, commitfile):
+ summary,body = self._u_parse_commitfile(commitfile)
+ #status,output,error = self._invoke_client("make-log")
+ if body == None:
+ status,output,error \
+ = self._invoke_client("commit","--summary",summary)
+ else:
+ status,output,error \
+ = self._invoke_client("commit","--summary",summary,
+ "--log-message",body)
+ revision = None
+ revline = re.compile("[*] committed (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
class CantAddFile(Exception):
def __init__(self, file):
self.file = file
Exception.__init__(self, "Can't automatically add file %s" % file)
+class ArchTestCase(RCStestCase):
+ Class = Arch
-def add_dir_rule(rule, dirname, root):
- inv_filename = os.path.join(dirname, '.arch-inventory')
- file(inv_filename, "ab").write(rule)
- if os.path.realpath(inv_filename) not in list_added(root):
- add_id(inv_filename, paranoid=False)
-
-def force_source(filename, root):
- rule = "source %s\n" % rel_filename(filename, root)
- add_dir_rule(rule, os.path.dirname(filename), root)
- if os.path.realpath(filename) not in list_added(root):
- raise CantAddFile(filename)
-
-def add_id(filename, paranoid=False):
- invoke_client("add-id", filename)
- root = tree_root(filename)
- if paranoid and os.path.realpath(filename) not in list_added(root):
- force_source(filename, root)
-
-
-def delete_id(filename):
- invoke_client("delete-id", filename)
-
-def test_helper(type):
- t = temp_arch_tree(type)
- dirname = os.path.join(t, ".boo")
- return dirname, t
-
-def mkdir(path, paranoid=False):
- """
- >>> import shutil
- >>> dirname,t = test_helper("easy")
- >>> mkdir(dirname, paranoid=False)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert not os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("tricky")
- >>> mkdir(dirname, paranoid=True)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("impossible")
- >>> try:
- ... mkdir(dirname, paranoid=True)
- ... except CantAddFile, e:
- ... print "Can't add file"
- Can't add file
- >>> shutil.rmtree(t)
- """
- os.mkdir(path)
- add_id(path, paranoid=paranoid)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-
-def path_in_reference(bug_dir, spec):
- if spec is not None:
- return invoke_client("file-find", bug_dir, spec).rstrip('\n')
- return invoke_client("file-find", bug_dir).rstrip('\n')
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using Arch"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, "{arch}")):
- return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.join('..', path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- pass
-
-def postcommit(directory):
- pass
-
-
-name = "Arch"
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])