aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/arch.py
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/arch.py')
-rw-r--r--libbe/arch.py425
1 files changed, 247 insertions, 178 deletions
diff --git a/libbe/arch.py b/libbe/arch.py
index 038325a..fd953a4 100644
--- a/libbe/arch.py
+++ b/libbe/arch.py
@@ -14,196 +14,265 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-from subprocess import Popen, PIPE
import os
+import shutil
+import time
+import re
+import unittest
+import doctest
+
import config
-import errno
+from beuuid import uuid_gen
+from rcs import RCS, RCStestCase, CommandError
+
client = config.get_val("arch_client")
if client is None:
client = "tla"
config.set_val("arch_client", client)
-def invoke(args):
- try :
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- except OSError, e :
- strerror = "%s\nwhile executing %s" % (e.args[1], args)
- raise Exception("Command failed: %s" % strerror)
- output = q.stdout.read()
- error = q.stderr.read()
- status = q.wait()
- if status >= 0:
- return status, output, error
- raise Exception("Command failed: %s" % error)
-
-
-def invoke_client(*args, **kwargs):
- cl_args = [client]
- cl_args.extend(args)
- status,output,error = invoke(cl_args)
- if status not in (0,):
- raise Exception("Command failed: %s" % error)
- return output
-
-def get_user_id():
- try:
- return invoke_client('my-id')
- except Exception, e:
- if 'no arch user id set' in e.args[0]:
- return None
+def new():
+ return Arch()
+
+class Arch(RCS):
+ name = "Arch"
+ client = client
+ versioned = True
+ _archive_name = None
+ _archive_dir = None
+ _tmp_archive = False
+ _project_name = None
+ _tmp_project = False
+ _arch_paramdir = os.path.expanduser("~/.arch-params")
+ def _rcs_help(self):
+ status,output,error = self._u_invoke_client("--help")
+ return output
+ def _rcs_detect(self, path):
+ """Detect whether a directory is revision-controlled using Arch"""
+ if self._u_search_parent_directories(path, "{arch}") != None :
+ return True
+ return False
+ def _rcs_init(self, path):
+ self._create_archive(path)
+ self._create_project(path)
+ self._add_project_code(path)
+ def _create_archive(self, path):
+ # Create a new archive
+ # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
+ assert self._archive_name == None
+ id = self.get_user_id()
+ name, email = self._u_parse_id(id)
+ if email == None:
+ email = "%s@example.com" % name
+ trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8])
+ self._archive_name = "%s--%s" % (email, trailer)
+ self._archive_dir = "/tmp/%s" % trailer
+ self._tmp_archive = True
+ self._u_invoke_client("make-archive", self._archive_name,
+ self._archive_dir, directory=path)
+ def _invoke_client(self, *args, **kwargs):
+ """
+ Invoke the client on our archive.
+ """
+ assert self._archive_name != None
+ command = args[0]
+ if len(args) > 1:
+ tailargs = args[1:]
else:
- raise
-
-
-def set_user_id(value):
- invoke_client('my-id', value)
-
-
-def ensure_user_id():
- if get_user_id() is None:
- set_user_id('nobody <nobody@example.com>')
-
-
-def write_tree_settings(contents, path):
- file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents)
-
-def init_tree(path):
- invoke_client("init-tree", "-d", path)
-
-def temp_arch_tree(type="easy"):
- import tempfile
- ensure_user_id()
- path = tempfile.mkdtemp()
- init_tree(path)
- if type=="easy":
- write_tree_settings("source ^.*$\n", path)
- elif type=="tricky":
- write_tree_settings("source ^$\n", path)
- else:
- assert (type=="impossible")
- add_dir_rule("precious ^\.boo$", path, path)
- return path
-
-def list_added(root):
- assert os.path.exists(root)
- assert os.access(root, os.X_OK)
- root = os.path.realpath(root)
- inv_str = invoke_client("inventory", "--source", '--both', '--all', root)
- return [os.path.join(root, p) for p in inv_str.split('\n')]
-
-def tree_root(filename):
- assert os.path.exists(filename)
- if not os.path.isdir(filename):
- dirname = os.path.dirname(filename)
- else:
- dirname = filename
- return invoke_client("tree-root", dirname).rstrip('\n')
-
-def rel_filename(filename, root):
- filename = os.path.realpath(filename)
- root = os.path.realpath(root)
- assert(filename.startswith(root))
- return filename[len(root)+1:]
+ tailargs = []
+ arglist = [command, "-A", self._archive_name]
+ arglist.extend(tailargs)
+ args = tuple(arglist)
+ return self._u_invoke_client(*args, **kwargs)
+ def _remove_archive(self):
+ assert self._tmp_archive == True
+ assert self._archive_dir != None
+ assert self._archive_name != None
+ os.remove(os.path.join(self._arch_paramdir,
+ "=locations", self._archive_name))
+ shutil.rmtree(self._archive_dir)
+ self._tmp_archive = False
+ self._archive_dir = False
+ self._archive_name = False
+ def _create_project(self, path):
+ """
+ Create a temporary Arch project in the directory PATH. This
+ project will be removed by
+ __del__->cleanup->_rcs_cleanup->_remove_project
+ """
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
+ category = "bugs-everywhere"
+ branch = "mainline"
+ version = "0.1"
+ self._project_name = "%s--%s--%s" % (category, branch, version)
+ self._invoke_client("archive-setup", self._project_name,
+ directory=path)
+ self._tmp_project = True
+ def _remove_project(self):
+ assert self._tmp_project == True
+ assert self._project_name != None
+ assert self._archive_dir != None
+ shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
+ self._tmp_project = False
+ self._project_name = False
+ def _archive_project_name(self):
+ assert self._archive_name != None
+ assert self._project_name != None
+ return "%s/%s" % (self._archive_name, self._project_name)
+ def _adjust_naming_conventions(self, path):
+ """
+ By default, Arch restricts source code filenames to
+ ^[_=a-zA-Z0-9].*$
+ See
+ http://regexps.srparish.net/tutorial-tla/naming-conventions.html
+ Since our bug directory '.be' doesn't satisfy these conventions,
+ we need to adjust them.
+
+ The conventions are specified in
+ project-root/{arch}/=tagging-method
+ """
+ tagpath = os.path.join(path, "{arch}", "=tagging-method")
+ lines_out = []
+ for line in file(tagpath, "rb"):
+ line.decode("utf-8")
+ if line.startswith("source "):
+ lines_out.append("source ^[._=a-zA-X0-9].*$\n")
+ else:
+ lines_out.append(line)
+ file(tagpath, "wb").write("".join(lines_out).encode("utf-8"))
+
+ def _add_project_code(self, path):
+ # http://mwolson.org/projects/GettingStartedWithArch.html
+ # http://regexps.srparish.net/tutorial-tla/new-source.html
+ # http://regexps.srparish.net/tutorial-tla/importing-first.html
+ self._invoke_client("init-tree", self._project_name,
+ directory=path)
+ self._adjust_naming_conventions(path)
+ self._invoke_client("import", "--summary", "Began versioning",
+ directory=path)
+ def _rcs_cleanup(self):
+ if self._tmp_project == True:
+ self._remove_project()
+ if self._tmp_archive == True:
+ self._remove_archive()
+
+ def _rcs_root(self, path):
+ if not os.path.isdir(path):
+ dirname = os.path.dirname(path)
+ else:
+ dirname = path
+ status,output,error = self._u_invoke_client("tree-root", dirname)
+ root = output.rstrip('\n')
+
+ self._get_archive_project_name(root)
+
+ return root
+
+ def _get_archive_name(self, root):
+ status,output,error = self._u_invoke_client("archives")
+ lines = output.split('\n')
+ # e.g. output:
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
+ # /tmp/BEtestXXXXXX/rootdir
+ # (+ repeats)
+ for archive,location in zip(lines[::2], lines[1::2]):
+ if os.path.realpath(location) == os.path.realpath(root):
+ self._archive_name = archive
+ assert self._archive_name != None
+
+ def _get_archive_project_name(self, root):
+ # get project names
+ status,output,error = self._u_invoke_client("tree-version", directory=root)
+ # e.g output
+ # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
+ archive_name,project_name = output.rstrip('\n').split('/')
+ self._archive_name = archive_name
+ self._project_name = project_name
+ def _rcs_get_user_id(self):
+ try:
+ status,output,error = self._u_invoke_client('my-id')
+ return output.rstrip('\n')
+ except Exception, e:
+ if 'no arch user id set' in e.args[0]:
+ return None
+ else:
+ raise
+ def _rcs_set_user_id(self, value):
+ self._u_invoke_client('my-id', value)
+ def _rcs_add(self, path):
+ self._u_invoke_client("add-id", path)
+ realpath = os.path.realpath(self._u_abspath(path))
+ pathAdded = realpath in self._list_added(self.rootdir)
+ if self.paranoid and not pathAdded:
+ self._force_source(path)
+ def _list_added(self, root):
+ assert os.path.exists(root)
+ assert os.access(root, os.X_OK)
+ root = os.path.realpath(root)
+ status,output,error = self._u_invoke_client("inventory", "--source",
+ "--both", "--all", root)
+ inv_str = output.rstrip('\n')
+ return [os.path.join(root, p) for p in inv_str.split('\n')]
+ def _add_dir_rule(self, rule, dirname, root):
+ inv_path = os.path.join(dirname, '.arch-inventory')
+ file(inv_path, "ab").write(rule)
+ if os.path.realpath(inv_path) not in self._list_added(root):
+ paranoid = self.paranoid
+ self.paranoid = False
+ self.add(inv_path)
+ self.paranoid = paranoid
+ def _force_source(self, path):
+ rule = "source %s\n" % self._u_rel_path(path)
+ self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
+ if os.path.realpath(path) not in self._list_added(self.rootdir):
+ raise CantAddFile(path)
+ def _rcs_remove(self, path):
+ if not '.arch-ids' in path:
+ self._u_invoke_client("delete-id", path)
+ def _rcs_update(self, path):
+ pass
+ def _rcs_get_file_contents(self, path, revision=None):
+ if revision == None:
+ return file(self._u_abspath(path), "rb").read()
+ else:
+ status,output,error = \
+ self._invoke_client("file-find", path, revision)
+ path = output.rstrip('\n')
+ return file(self._u_abspath(path), "rb").read()
+ def _rcs_duplicate_repo(self, directory, revision=None):
+ if revision == None:
+ RCS._rcs_duplicate_repo(self, directory, revision)
+ else:
+ status,output,error = \
+ self._u_invoke_client("get", revision,directory)
+ def _rcs_commit(self, commitfile):
+ summary,body = self._u_parse_commitfile(commitfile)
+ #status,output,error = self._invoke_client("make-log")
+ if body == None:
+ status,output,error \
+ = self._u_invoke_client("commit","--summary",summary)
+ else:
+ status,output,error \
+ = self._u_invoke_client("commit","--summary",summary,
+ "--log-message",body)
+ revision = None
+ revline = re.compile("[*] committed (.*)")
+ match = revline.search(output)
+ assert match != None, output+error
+ assert len(match.groups()) == 1
+ revpath = match.groups()[0]
+ assert not " " in revpath, revpath
+ assert revpath.startswith(self._archive_project_name()+'--')
+ revision = revpath[len(self._archive_project_name()+'--'):]
+ return revpath
class CantAddFile(Exception):
def __init__(self, file):
self.file = file
Exception.__init__(self, "Can't automatically add file %s" % file)
+class ArchTestCase(RCStestCase):
+ Class = Arch
-def add_dir_rule(rule, dirname, root):
- inv_filename = os.path.join(dirname, '.arch-inventory')
- file(inv_filename, "ab").write(rule)
- if os.path.realpath(inv_filename) not in list_added(root):
- add_id(inv_filename, paranoid=False)
-
-def force_source(filename, root):
- rule = "source %s\n" % rel_filename(filename, root)
- add_dir_rule(rule, os.path.dirname(filename), root)
- if os.path.realpath(filename) not in list_added(root):
- raise CantAddFile(filename)
-
-def add_id(filename, paranoid=False):
- invoke_client("add-id", filename)
- root = tree_root(filename)
- if paranoid and os.path.realpath(filename) not in list_added(root):
- force_source(filename, root)
-
-
-def delete_id(filename):
- invoke_client("delete-id", filename)
-
-def test_helper(type):
- t = temp_arch_tree(type)
- dirname = os.path.join(t, ".boo")
- return dirname, t
-
-def mkdir(path, paranoid=False):
- """
- >>> import shutil
- >>> dirname,t = test_helper("easy")
- >>> mkdir(dirname, paranoid=False)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert not os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("tricky")
- >>> mkdir(dirname, paranoid=True)
- >>> assert os.path.realpath(dirname) in list_added(t)
- >>> assert os.path.exists(os.path.join(t, ".arch-inventory"))
- >>> shutil.rmtree(t)
- >>> dirname,t = test_helper("impossible")
- >>> try:
- ... mkdir(dirname, paranoid=True)
- ... except CantAddFile, e:
- ... print "Can't add file"
- Can't add file
- >>> shutil.rmtree(t)
- """
- os.mkdir(path)
- add_id(path, paranoid=paranoid)
-
-def set_file_contents(path, contents):
- add = not os.path.exists(path)
- file(path, "wb").write(contents)
- if add:
- add_id(path)
-
-
-def path_in_reference(bug_dir, spec):
- if spec is not None:
- return invoke_client("file-find", bug_dir, spec).rstrip('\n')
- return invoke_client("file-find", bug_dir).rstrip('\n')
-
-
-def unlink(path):
- try:
- os.unlink(path)
- delete_id(path)
- except OSError, e:
- if e.errno != 2:
- raise
-
-
-def detect(path):
- """Detect whether a directory is revision-controlled using Arch"""
- path = os.path.realpath(path)
- old_path = None
- while True:
- if os.path.exists(os.path.join(path, "{arch}")):
- return True
- if path == old_path:
- return False
- old_path = path
- path = os.path.join('..', path)
-
-def precommit(directory):
- pass
-
-def commit(directory, summary, body=None):
- pass
-
-def postcommit(directory):
- pass
-
-
-name = "Arch"
+unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase)
+suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])