From 19b153b9a86377a2b30cc80fa3f475fed892e2fe Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Tue, 18 Nov 2008 20:42:50 -0500 Subject: Major rewrite of RCS backends. RCS now represented as a class. Lots of changes and just one commit. This started with bug dac91856-cb6a-4f69-8c03-38ff0b29aab2, when I noticed that new bugs were not being added appropriately with the Git backend. I'd been working with Git trouble before with bug 0cad2ac6-76ef-4a88-abdf-b2e02de76f5c, and decided things would be better off if I just scrapped the current RCS architecture and went to a more object oriented setup. So I did. It's not clear how to add support for an RCS backend: * Create a new module that - defines an inheritor of rsc.RCS, overriding the _rcs_*() methods - provide a new() function for instantizating the new class - defines an inheritor of rcs.RCStestCase, overiding the Class attribute - defines 'suite' a unittest.TestSuite testing the module * Add your new module to the rest in rcs._get_matching_rcs() * Add your new module to the rest in libbe/tests.py Although I'm not sure libbe/tests.py is still usefull. The new framework clears out a bunch of hackery that used to be involved with supporting becommands/diff.py. There's still room for progress though. While implementing the new verision, I moved the testing framework over from doctest to a doctest/unittest combination. Longer tests that don't demonstrate a function's usage should be moved to unittests at the end of the module, since unittest has better support for setup/teardown, etc. The new framework also revealed some underimplented backends, most notably arch. These backends have now been fixed. I also tweaked the test_usage.sh script to run through all the backends if it is called with no arguments. The fix for the dac bug turned out to be an unflushed file write :p. --- libbe/arch.py | 357 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 191 insertions(+), 166 deletions(-) (limited to 'libbe/arch.py') diff --git a/libbe/arch.py b/libbe/arch.py index 001f852..8e7390d 100644 --- a/libbe/arch.py +++ b/libbe/arch.py @@ -15,184 +15,209 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os -import config -import errno +import shutil +import time +import re +import unittest +import doctest -from rcs import invoke +import config +from rcs import RCS, RCStestCase, CommandError client = config.get_val("arch_client") if client is None: client = "tla" config.set_val("arch_client", client) - -def invoke_client(*args, **kwargs): - cl_args = [client] - cl_args.extend(args) - status,output,error = invoke(cl_args) - if status not in (0,): - raise Exception("Command failed: %s" % error) - return output - -def get_user_id(): - try: - return invoke_client('my-id') - except Exception, e: - if 'no arch user id set' in e.args[0]: - return None +def new(): + return Arch() + +class Arch(RCS): + name = "Arch" + client = client + versioned = True + _archive_name = None + _archive_dir = None + _tmp_archive = False + _project_name = None + _tmp_project = False + _arch_paramdir = os.path.expanduser("~/.arch-params") + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + """Detect whether a directory is revision-controlled using Arch""" + if self._u_search_parent_directories(path, "{arch}") != None : + return True + return False + def _rcs_root(self, path): + if not os.path.isdir(path): + dirname = os.path.dirname(path) else: - raise - - -def set_user_id(value): - invoke_client('my-id', value) - - -def ensure_user_id(): - if get_user_id() is None: - set_user_id('nobody ') - - -def write_tree_settings(contents, path): - file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents) - -def init_tree(path): - invoke_client("init-tree", "-d", path) - -def temp_arch_tree(type="easy"): - import tempfile - ensure_user_id() - path = tempfile.mkdtemp() - init_tree(path) - if type=="easy": - write_tree_settings("source ^.*$\n", path) - elif type=="tricky": - write_tree_settings("source ^$\n", path) - else: - assert (type=="impossible") - add_dir_rule("precious ^\.boo$", path, path) - return path - -def list_added(root): - assert os.path.exists(root) - assert os.access(root, os.X_OK) - root = os.path.realpath(root) - inv_str = invoke_client("inventory", "--source", '--both', '--all', root) - return [os.path.join(root, p) for p in inv_str.split('\n')] - -def tree_root(filename): - assert os.path.exists(filename) - if not os.path.isdir(filename): - dirname = os.path.dirname(filename) - else: - dirname = filename - return invoke_client("tree-root", dirname).rstrip('\n') - -def rel_filename(filename, root): - filename = os.path.realpath(filename) - root = os.path.realpath(root) - assert(filename.startswith(root)) - return filename[len(root)+1:] + dirname = path + status,output,error = self._u_invoke_client("tree-root", dirname) + # get archive name... + return output.rstrip('\n') + def _rcs_init(self, path): + self._create_archive(path) + self._create_project(path) + self._add_project_code(path) + def _create_archive(self, path): + # Create a new archive + # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive + assert self._archive_name == None + id = self.get_user_id() + name, email = self._u_parse_id(id) + if email == None: + email = "%s@example.com" % name + trailer = "%s-%s" % ("bugs-everywhere-auto", + time.strftime("%Y.%H.%M.%S")) + self._archive_name = "%s--%s" % (email, trailer) + self._archive_dir = "/tmp/%s" % trailer + self._tmp_archive = True + self._u_invoke_client("make-archive", self._archive_name, + self._archive_dir, directory=path) + def _invoke_client(self, *args, **kwargs): + """ + Invoke the client on our archive. + """ + assert self._archive_name != None + command = args[0] + if len(args) > 1: + tailargs = args[1:] + else: + tailargs = [] + arglist = [command, "-A", self._archive_name] + arglist.extend(tailargs) + args = tuple(arglist) + return self._u_invoke_client(*args, **kwargs) + def _remove_archive(self): + assert self._tmp_archive == True + assert self._archive_dir != None + assert self._archive_name != None + os.remove(os.path.join(self._arch_paramdir, + "=locations", self._archive_name)) + shutil.rmtree(self._archive_dir) + self._tmp_archive = False + self._archive_dir = False + self._archive_name = False + def _create_project(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project + category = "bugs-everywhere" + branch = "mainline" + version = "0.1" + self._project_name = "%s--%s--%s" % (category, branch, version) + self._invoke_client("archive-setup", self._project_name, + directory=path) + def _remove_project(self): + assert self._tmp_project == True + assert self._project_name != None + assert self._archive_dir != None + shutil.rmtree(os.path.join(self._archive_dir, self._project_name)) + self._tmp_project = False + self._project_name = False + def _archive_project_name(self): + assert self._archive_name != None + assert self._project_name != None + return "%s/%s" % (self._archive_name, self._project_name) + def _add_project_code(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/importing-first.html#Importing_the_First_Revision + self._u_invoke_client("init-tree", self._archive_project_name(), + directory=path) + self._invoke_client("import", "--summary", "Began versioning", + directory=path) + def _rcs_cleanup(self): + if self._tmp_project == True: + self._remove_project() + if self._tmp_archive == True: + self._remove_archive() + def _rcs_get_user_id(self): + try: + status,output,error = self._u_invoke_client('my-id') + return output.rstrip('\n') + except Exception, e: + if 'no arch user id set' in e.args[0]: + return None + else: + raise + def _rcs_set_user_id(self, value): + self._u_invoke_client('my-id', value) + def _rcs_add(self, path): + self._u_invoke_client("add-id", path) + realpath = os.path.realpath(self._u_abspath(path)) + pathAdded = realpath in self._list_added(self.rootdir) + if self.paranoid and not pathAdded: + self._force_source(path) + def _list_added(self, root): + assert os.path.exists(root) + assert os.access(root, os.X_OK) + root = os.path.realpath(root) + status,output,error = self._u_invoke_client("inventory", "--source", + "--both", "--all", root) + inv_str = output.rstrip('\n') + return [os.path.join(root, p) for p in inv_str.split('\n')] + def _add_dir_rule(self, rule, dirname, root): + inv_path = os.path.join(dirname, '.arch-inventory') + file(inv_path, "ab").write(rule) + if os.path.realpath(inv_path) not in self._list_added(root): + paranoid = self.paranoid + self.paranoid = False + self.add(inv_path) + self.paranoid = paranoid + def _force_source(self, path): + rule = "source %s\n" % self._u_rel_path(path) + self._add_dir_rule(rule, os.path.dirname(path), self.rootdir) + if os.path.realpath(path) not in self._list_added(self.rootdir): + raise CantAddFile(path) + def _rcs_remove(self, path): + if not '.arch-ids' in path: + self._u_invoke_client("delete-id", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None): + if revision == None: + return file(self._u_abspath(path), "rb").read() + else: + status,output,error = \ + self._invoke_client("file-find", path, revision) + path = output.rstrip('\n') + return file(self._u_abspath(path), "rb").read() + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + status,output,error = \ + self._u_invoke_client("get", revision,directory) + def _rcs_commit(self, commitfile): + summary,body = self._u_parse_commitfile(commitfile) + #status,output,error = self._invoke_client("make-log") + if body == None: + status,output,error \ + = self._invoke_client("commit","--summary",summary) + else: + status,output,error \ + = self._invoke_client("commit","--summary",summary, + "--log-message",body) + revision = None + revline = re.compile("[*] committed (.*)") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revpath = match.groups()[0] + assert not " " in revpath, revpath + assert revpath.startswith(self._archive_project_name()+'--') + revision = revpath[len(self._archive_project_name()+'--'):] + return revpath class CantAddFile(Exception): def __init__(self, file): self.file = file Exception.__init__(self, "Can't automatically add file %s" % file) +class ArchTestCase(RCStestCase): + Class = Arch -def add_dir_rule(rule, dirname, root): - inv_filename = os.path.join(dirname, '.arch-inventory') - file(inv_filename, "ab").write(rule) - if os.path.realpath(inv_filename) not in list_added(root): - add_id(inv_filename, paranoid=False) - -def force_source(filename, root): - rule = "source %s\n" % rel_filename(filename, root) - add_dir_rule(rule, os.path.dirname(filename), root) - if os.path.realpath(filename) not in list_added(root): - raise CantAddFile(filename) - -def add_id(filename, paranoid=False): - invoke_client("add-id", filename) - root = tree_root(filename) - if paranoid and os.path.realpath(filename) not in list_added(root): - force_source(filename, root) - - -def delete_id(filename): - invoke_client("delete-id", filename) - -def test_helper(type): - t = temp_arch_tree(type) - dirname = os.path.join(t, ".boo") - return dirname, t - -def mkdir(path, paranoid=False): - """ - >>> import shutil - >>> dirname,t = test_helper("easy") - >>> mkdir(dirname, paranoid=False) - >>> assert os.path.realpath(dirname) in list_added(t) - >>> assert not os.path.exists(os.path.join(t, ".arch-inventory")) - >>> shutil.rmtree(t) - >>> dirname,t = test_helper("tricky") - >>> mkdir(dirname, paranoid=True) - >>> assert os.path.realpath(dirname) in list_added(t) - >>> assert os.path.exists(os.path.join(t, ".arch-inventory")) - >>> shutil.rmtree(t) - >>> dirname,t = test_helper("impossible") - >>> try: - ... mkdir(dirname, paranoid=True) - ... except CantAddFile, e: - ... print "Can't add file" - Can't add file - >>> shutil.rmtree(t) - """ - os.mkdir(path) - add_id(path, paranoid=paranoid) - -def set_file_contents(path, contents): - add = not os.path.exists(path) - file(path, "wb").write(contents) - if add: - add_id(path) - - -def path_in_reference(bug_dir, spec): - if spec is not None: - return invoke_client("file-find", bug_dir, spec).rstrip('\n') - return invoke_client("file-find", bug_dir).rstrip('\n') - - -def unlink(path): - try: - os.unlink(path) - delete_id(path) - except OSError, e: - if e.errno != 2: - raise - - -def detect(path): - """Detect whether a directory is revision-controlled using Arch""" - path = os.path.realpath(path) - old_path = None - while True: - if os.path.exists(os.path.join(path, "{arch}")): - return True - if path == old_path: - return False - old_path = path - path = os.path.join('..', path) - -def precommit(directory): - pass - -def commit(directory, summary, body=None): - pass - -def postcommit(directory): - pass - - -name = "Arch" +unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) -- cgit