aboutsummaryrefslogblamecommitdiffstats
path: root/libbe/arch.py
blob: 038325a1cbf5c50632c922aeec51904eae41490e (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16















                                                                              
                                  
         
             
            



                                         

                 




                                                               

                            
                     

                                    
                                                 
 
 
                                   


                                         
                          

                                                     
 


















                                                  







                                                                               
                    


























                                                                             

                                         












                                                                          
                                            






                                                          
                                     
                                     
                              
                                                                       

                                    

                        
                                        
 




                                     
                                


                                       
                                      



                                                                     
                                     




                                                                 
                                         




                                  
                  
                                   






                                      






                                                                     











                                                                      
                   


                                                        
                            
                        

                                       
 








                                          

             
# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
# <abentley@panoramicfeedback.com>
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
from subprocess import Popen, PIPE
import os
import config
import errno
client = config.get_val("arch_client")
if client is None:
    client = "tla"
    config.set_val("arch_client", client)

def invoke(args):
    try :
        q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    except OSError, e :
        strerror = "%s\nwhile executing %s" % (e.args[1], args)
        raise Exception("Command failed: %s" % strerror)
    output = q.stdout.read()
    error = q.stderr.read()
    status = q.wait()
    if status >= 0:
        return status, output, error
    raise Exception("Command failed: %s" % error)


def invoke_client(*args, **kwargs):
    cl_args = [client]
    cl_args.extend(args)
    status,output,error = invoke(cl_args)
    if status not in (0,):
        raise Exception("Command failed: %s" % error)
    return output

def get_user_id():
    try:
        return invoke_client('my-id')
    except Exception, e:
        if 'no arch user id set' in e.args[0]:
            return None
        else:
            raise


def set_user_id(value):
    invoke_client('my-id', value)


def ensure_user_id():
    if get_user_id() is None:
        set_user_id('nobody <nobody@example.com>')
 

def write_tree_settings(contents, path):
    file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents)

def init_tree(path):
    invoke_client("init-tree", "-d", path)

def temp_arch_tree(type="easy"):
    import tempfile
    ensure_user_id()
    path = tempfile.mkdtemp()
    init_tree(path)
    if type=="easy":
        write_tree_settings("source ^.*$\n", path)
    elif type=="tricky":
        write_tree_settings("source ^$\n", path)
    else:
        assert (type=="impossible")
        add_dir_rule("precious ^\.boo$", path, path)
    return path

def list_added(root):
    assert os.path.exists(root)
    assert os.access(root, os.X_OK)
    root = os.path.realpath(root)
    inv_str = invoke_client("inventory", "--source", '--both', '--all', root)
    return [os.path.join(root, p) for p in inv_str.split('\n')]

def tree_root(filename):
    assert os.path.exists(filename)
    if not os.path.isdir(filename):
        dirname = os.path.dirname(filename)
    else:
        dirname = filename
    return invoke_client("tree-root", dirname).rstrip('\n')

def rel_filename(filename, root):
    filename = os.path.realpath(filename)
    root = os.path.realpath(root)
    assert(filename.startswith(root))
    return filename[len(root)+1:]

class CantAddFile(Exception):
    def __init__(self, file):
        self.file = file
        Exception.__init__(self, "Can't automatically add file %s" % file)
    

def add_dir_rule(rule, dirname, root):
    inv_filename = os.path.join(dirname, '.arch-inventory')
    file(inv_filename, "ab").write(rule)
    if os.path.realpath(inv_filename) not in list_added(root):
        add_id(inv_filename, paranoid=False)

def force_source(filename, root):
    rule = "source %s\n" % rel_filename(filename, root)
    add_dir_rule(rule, os.path.dirname(filename), root)
    if os.path.realpath(filename) not in list_added(root):
        raise CantAddFile(filename)

def add_id(filename, paranoid=False):
    invoke_client("add-id", filename)
    root = tree_root(filename)
    if paranoid and os.path.realpath(filename) not in list_added(root):
        force_source(filename, root)


def delete_id(filename):
    invoke_client("delete-id", filename)

def test_helper(type):
    t = temp_arch_tree(type)
    dirname = os.path.join(t, ".boo")
    return dirname, t

def mkdir(path, paranoid=False):
    """
    >>> import shutil
    >>> dirname,t = test_helper("easy")
    >>> mkdir(dirname, paranoid=False)
    >>> assert os.path.realpath(dirname) in list_added(t)
    >>> assert not os.path.exists(os.path.join(t, ".arch-inventory"))
    >>> shutil.rmtree(t)
    >>> dirname,t = test_helper("tricky")
    >>> mkdir(dirname, paranoid=True)
    >>> assert os.path.realpath(dirname) in list_added(t)
    >>> assert os.path.exists(os.path.join(t, ".arch-inventory"))
    >>> shutil.rmtree(t)
    >>> dirname,t = test_helper("impossible")
    >>> try:
    ...     mkdir(dirname, paranoid=True)
    ... except CantAddFile, e:
    ...     print "Can't add file"
    Can't add file
    >>> shutil.rmtree(t)
    """
    os.mkdir(path)
    add_id(path, paranoid=paranoid)

def set_file_contents(path, contents):
    add = not os.path.exists(path)
    file(path, "wb").write(contents)
    if add:
        add_id(path)


def path_in_reference(bug_dir, spec):
    if spec is not None:
        return invoke_client("file-find", bug_dir, spec).rstrip('\n')
    return invoke_client("file-find", bug_dir).rstrip('\n')


def unlink(path):
    try:
        os.unlink(path)
        delete_id(path)
    except OSError, e:
        if e.errno != 2:
            raise


def detect(path):
    """Detect whether a directory is revision-controlled using Arch"""
    path = os.path.realpath(path)
    old_path = None
    while True:
        if os.path.exists(os.path.join(path, "{arch}")):
            return True
        if path == old_path:
            return False
        old_path = path
        path = os.path.join('..', path)

def precommit(directory):
    pass

def commit(directory, summary, body=None):
    pass

def postcommit(directory):
    pass


name = "Arch"