diff options
Diffstat (limited to 'libbe/arch.py')
-rw-r--r-- | libbe/arch.py | 425 |
1 files changed, 247 insertions, 178 deletions
diff --git a/libbe/arch.py b/libbe/arch.py index 038325a..fd953a4 100644 --- a/libbe/arch.py +++ b/libbe/arch.py @@ -14,196 +14,265 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -from subprocess import Popen, PIPE import os +import shutil +import time +import re +import unittest +import doctest + import config -import errno +from beuuid import uuid_gen +from rcs import RCS, RCStestCase, CommandError + client = config.get_val("arch_client") if client is None: client = "tla" config.set_val("arch_client", client) -def invoke(args): - try : - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) - except OSError, e : - strerror = "%s\nwhile executing %s" % (e.args[1], args) - raise Exception("Command failed: %s" % strerror) - output = q.stdout.read() - error = q.stderr.read() - status = q.wait() - if status >= 0: - return status, output, error - raise Exception("Command failed: %s" % error) - - -def invoke_client(*args, **kwargs): - cl_args = [client] - cl_args.extend(args) - status,output,error = invoke(cl_args) - if status not in (0,): - raise Exception("Command failed: %s" % error) - return output - -def get_user_id(): - try: - return invoke_client('my-id') - except Exception, e: - if 'no arch user id set' in e.args[0]: - return None +def new(): + return Arch() + +class Arch(RCS): + name = "Arch" + client = client + versioned = True + _archive_name = None + _archive_dir = None + _tmp_archive = False + _project_name = None + _tmp_project = False + _arch_paramdir = os.path.expanduser("~/.arch-params") + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + """Detect whether a directory is revision-controlled using Arch""" + if self._u_search_parent_directories(path, "{arch}") != None : + return True + return False + def _rcs_init(self, path): + self._create_archive(path) + self._create_project(path) + self._add_project_code(path) + def _create_archive(self, path): + # Create a new archive + # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive + assert self._archive_name == None + id = self.get_user_id() + name, email = self._u_parse_id(id) + if email == None: + email = "%s@example.com" % name + trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8]) + self._archive_name = "%s--%s" % (email, trailer) + self._archive_dir = "/tmp/%s" % trailer + self._tmp_archive = True + self._u_invoke_client("make-archive", self._archive_name, + self._archive_dir, directory=path) + def _invoke_client(self, *args, **kwargs): + """ + Invoke the client on our archive. + """ + assert self._archive_name != None + command = args[0] + if len(args) > 1: + tailargs = args[1:] else: - raise - - -def set_user_id(value): - invoke_client('my-id', value) - - -def ensure_user_id(): - if get_user_id() is None: - set_user_id('nobody <nobody@example.com>') - - -def write_tree_settings(contents, path): - file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents) - -def init_tree(path): - invoke_client("init-tree", "-d", path) - -def temp_arch_tree(type="easy"): - import tempfile - ensure_user_id() - path = tempfile.mkdtemp() - init_tree(path) - if type=="easy": - write_tree_settings("source ^.*$\n", path) - elif type=="tricky": - write_tree_settings("source ^$\n", path) - else: - assert (type=="impossible") - add_dir_rule("precious ^\.boo$", path, path) - return path - -def list_added(root): - assert os.path.exists(root) - assert os.access(root, os.X_OK) - root = os.path.realpath(root) - inv_str = invoke_client("inventory", "--source", '--both', '--all', root) - return [os.path.join(root, p) for p in inv_str.split('\n')] - -def tree_root(filename): - assert os.path.exists(filename) - if not os.path.isdir(filename): - dirname = os.path.dirname(filename) - else: - dirname = filename - return invoke_client("tree-root", dirname).rstrip('\n') - -def rel_filename(filename, root): - filename = os.path.realpath(filename) - root = os.path.realpath(root) - assert(filename.startswith(root)) - return filename[len(root)+1:] + tailargs = [] + arglist = [command, "-A", self._archive_name] + arglist.extend(tailargs) + args = tuple(arglist) + return self._u_invoke_client(*args, **kwargs) + def _remove_archive(self): + assert self._tmp_archive == True + assert self._archive_dir != None + assert self._archive_name != None + os.remove(os.path.join(self._arch_paramdir, + "=locations", self._archive_name)) + shutil.rmtree(self._archive_dir) + self._tmp_archive = False + self._archive_dir = False + self._archive_name = False + def _create_project(self, path): + """ + Create a temporary Arch project in the directory PATH. This + project will be removed by + __del__->cleanup->_rcs_cleanup->_remove_project + """ + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project + category = "bugs-everywhere" + branch = "mainline" + version = "0.1" + self._project_name = "%s--%s--%s" % (category, branch, version) + self._invoke_client("archive-setup", self._project_name, + directory=path) + self._tmp_project = True + def _remove_project(self): + assert self._tmp_project == True + assert self._project_name != None + assert self._archive_dir != None + shutil.rmtree(os.path.join(self._archive_dir, self._project_name)) + self._tmp_project = False + self._project_name = False + def _archive_project_name(self): + assert self._archive_name != None + assert self._project_name != None + return "%s/%s" % (self._archive_name, self._project_name) + def _adjust_naming_conventions(self, path): + """ + By default, Arch restricts source code filenames to + ^[_=a-zA-Z0-9].*$ + See + http://regexps.srparish.net/tutorial-tla/naming-conventions.html + Since our bug directory '.be' doesn't satisfy these conventions, + we need to adjust them. + + The conventions are specified in + project-root/{arch}/=tagging-method + """ + tagpath = os.path.join(path, "{arch}", "=tagging-method") + lines_out = [] + for line in file(tagpath, "rb"): + line.decode("utf-8") + if line.startswith("source "): + lines_out.append("source ^[._=a-zA-X0-9].*$\n") + else: + lines_out.append(line) + file(tagpath, "wb").write("".join(lines_out).encode("utf-8")) + + def _add_project_code(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-source.html + # http://regexps.srparish.net/tutorial-tla/importing-first.html + self._invoke_client("init-tree", self._project_name, + directory=path) + self._adjust_naming_conventions(path) + self._invoke_client("import", "--summary", "Began versioning", + directory=path) + def _rcs_cleanup(self): + if self._tmp_project == True: + self._remove_project() + if self._tmp_archive == True: + self._remove_archive() + + def _rcs_root(self, path): + if not os.path.isdir(path): + dirname = os.path.dirname(path) + else: + dirname = path + status,output,error = self._u_invoke_client("tree-root", dirname) + root = output.rstrip('\n') + + self._get_archive_project_name(root) + + return root + + def _get_archive_name(self, root): + status,output,error = self._u_invoke_client("archives") + lines = output.split('\n') + # e.g. output: + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52 + # /tmp/BEtestXXXXXX/rootdir + # (+ repeats) + for archive,location in zip(lines[::2], lines[1::2]): + if os.path.realpath(location) == os.path.realpath(root): + self._archive_name = archive + assert self._archive_name != None + + def _get_archive_project_name(self, root): + # get project names + status,output,error = self._u_invoke_client("tree-version", directory=root) + # e.g output + # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1 + archive_name,project_name = output.rstrip('\n').split('/') + self._archive_name = archive_name + self._project_name = project_name + def _rcs_get_user_id(self): + try: + status,output,error = self._u_invoke_client('my-id') + return output.rstrip('\n') + except Exception, e: + if 'no arch user id set' in e.args[0]: + return None + else: + raise + def _rcs_set_user_id(self, value): + self._u_invoke_client('my-id', value) + def _rcs_add(self, path): + self._u_invoke_client("add-id", path) + realpath = os.path.realpath(self._u_abspath(path)) + pathAdded = realpath in self._list_added(self.rootdir) + if self.paranoid and not pathAdded: + self._force_source(path) + def _list_added(self, root): + assert os.path.exists(root) + assert os.access(root, os.X_OK) + root = os.path.realpath(root) + status,output,error = self._u_invoke_client("inventory", "--source", + "--both", "--all", root) + inv_str = output.rstrip('\n') + return [os.path.join(root, p) for p in inv_str.split('\n')] + def _add_dir_rule(self, rule, dirname, root): + inv_path = os.path.join(dirname, '.arch-inventory') + file(inv_path, "ab").write(rule) + if os.path.realpath(inv_path) not in self._list_added(root): + paranoid = self.paranoid + self.paranoid = False + self.add(inv_path) + self.paranoid = paranoid + def _force_source(self, path): + rule = "source %s\n" % self._u_rel_path(path) + self._add_dir_rule(rule, os.path.dirname(path), self.rootdir) + if os.path.realpath(path) not in self._list_added(self.rootdir): + raise CantAddFile(path) + def _rcs_remove(self, path): + if not '.arch-ids' in path: + self._u_invoke_client("delete-id", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None): + if revision == None: + return file(self._u_abspath(path), "rb").read() + else: + status,output,error = \ + self._invoke_client("file-find", path, revision) + path = output.rstrip('\n') + return file(self._u_abspath(path), "rb").read() + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + status,output,error = \ + self._u_invoke_client("get", revision,directory) + def _rcs_commit(self, commitfile): + summary,body = self._u_parse_commitfile(commitfile) + #status,output,error = self._invoke_client("make-log") + if body == None: + status,output,error \ + = self._u_invoke_client("commit","--summary",summary) + else: + status,output,error \ + = self._u_invoke_client("commit","--summary",summary, + "--log-message",body) + revision = None + revline = re.compile("[*] committed (.*)") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revpath = match.groups()[0] + assert not " " in revpath, revpath + assert revpath.startswith(self._archive_project_name()+'--') + revision = revpath[len(self._archive_project_name()+'--'):] + return revpath class CantAddFile(Exception): def __init__(self, file): self.file = file Exception.__init__(self, "Can't automatically add file %s" % file) +class ArchTestCase(RCStestCase): + Class = Arch -def add_dir_rule(rule, dirname, root): - inv_filename = os.path.join(dirname, '.arch-inventory') - file(inv_filename, "ab").write(rule) - if os.path.realpath(inv_filename) not in list_added(root): - add_id(inv_filename, paranoid=False) - -def force_source(filename, root): - rule = "source %s\n" % rel_filename(filename, root) - add_dir_rule(rule, os.path.dirname(filename), root) - if os.path.realpath(filename) not in list_added(root): - raise CantAddFile(filename) - -def add_id(filename, paranoid=False): - invoke_client("add-id", filename) - root = tree_root(filename) - if paranoid and os.path.realpath(filename) not in list_added(root): - force_source(filename, root) - - -def delete_id(filename): - invoke_client("delete-id", filename) - -def test_helper(type): - t = temp_arch_tree(type) - dirname = os.path.join(t, ".boo") - return dirname, t - -def mkdir(path, paranoid=False): - """ - >>> import shutil - >>> dirname,t = test_helper("easy") - >>> mkdir(dirname, paranoid=False) - >>> assert os.path.realpath(dirname) in list_added(t) - >>> assert not os.path.exists(os.path.join(t, ".arch-inventory")) - >>> shutil.rmtree(t) - >>> dirname,t = test_helper("tricky") - >>> mkdir(dirname, paranoid=True) - >>> assert os.path.realpath(dirname) in list_added(t) - >>> assert os.path.exists(os.path.join(t, ".arch-inventory")) - >>> shutil.rmtree(t) - >>> dirname,t = test_helper("impossible") - >>> try: - ... mkdir(dirname, paranoid=True) - ... except CantAddFile, e: - ... print "Can't add file" - Can't add file - >>> shutil.rmtree(t) - """ - os.mkdir(path) - add_id(path, paranoid=paranoid) - -def set_file_contents(path, contents): - add = not os.path.exists(path) - file(path, "wb").write(contents) - if add: - add_id(path) - - -def path_in_reference(bug_dir, spec): - if spec is not None: - return invoke_client("file-find", bug_dir, spec).rstrip('\n') - return invoke_client("file-find", bug_dir).rstrip('\n') - - -def unlink(path): - try: - os.unlink(path) - delete_id(path) - except OSError, e: - if e.errno != 2: - raise - - -def detect(path): - """Detect whether a directory is revision-controlled using Arch""" - path = os.path.realpath(path) - old_path = None - while True: - if os.path.exists(os.path.join(path, "{arch}")): - return True - if path == old_path: - return False - old_path = path - path = os.path.join('..', path) - -def precommit(directory): - pass - -def commit(directory, summary, body=None): - pass - -def postcommit(directory): - pass - - -name = "Arch" +unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |