aboutsummaryrefslogblamecommitdiffstats
path: root/libbe/storage/vcs/base.py
blob: abc7a017b490c5630b8ec82952f0560206d0ffc6 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11

                                                              
                                                              
                                                     
                                                           
                                                           
 



                                                                      
 



                                                                 
 


                                                                         
 





                                                                    
             

              
         

                              
          
               
 
            
                                                  
                                        
                             
 




                         









                                                              
 

                                                                         

                                                


                                
                     
                
    


                                                              
 


                                                                         
 


                                                         
 
 
 


                                                 
                              
                       
                              

                                     
                               






                                                         


                                                            
 



                                                        
 
          
                
 
                  
       
                                               
 

                                                                      
    
                                                                    
            































































































































                                                                                     



                                                        
                                                                          




                                     
                                



                                                              
                           
           
                                      
           
                    
                                     
           
                                                                        

                   
                              
           
                                                                    
                                                                    
                               





                                           
                              



                                                
                           
           
                                                                


                       
                               
           

                                                                             

                   
                                      
           

                                                                            


                                                       
                             



                                                                
                                




                                                                 
                                




                                                                     
                                                                        
           
                                                               


                                                      
                                                                         


                                                                                 
                                                            


                           
                                                            





                                                             
                                                         


                                                                   
                                                        


                                                                  

                   
                                      








                                                                      




                                                  
                           
            

                                     

                                       


                                
                            

                        
                                  

                       
                               
           
                                                                        
           
                                     

                         
                                                                   

                                                         
                                           


                                                
                                   


                                        
                            

                       
                           

                          

                                                                             
                                                                           
                                                         
           


                                    
                                    











                                                        

                                                                            

                                                       
                                    



                                                                
                                             



                                                                   
                                                






                                                                

                                       





                                                                          
                                                            






                                                                     

                                                                                       


                                                      



                                                                      
           

                                    
                                             
                                            
                                                                                  
             



                                                         


                               
                                                                                  



                                                    



                                                     

                         
        
                                             



                                 
                                                                  
           

                                                                
           


                                                                    
                                                               

                                    
                                                 
                              

                                      
                                                 


                                                                             





                                                                               
                                                                
                                          
                                                                     

                                                                 
                                                                      









                                                                
                                                            



                                                                     


                                                                   
           
                                      
                            
                                                 





                                                   
                            
                                                                          
                             
                             


                               



                                              
            



                                               
            










                                                                      
                                           








                                                                







                                                  
                                                

                               
                                                













                                                                     
                                                        
                                           
           

                                                              

                                      
                      


                                                
                               

                                                             


                                                    
                           
                      


                                                   

                                                    
            
                                                  



                                    
                                  






                                                    


                                                   

                                                   


                        
                                    
                                  
                               








                                                                      


                                                              

                                           


                        
                                                         



                                                        

                                                            
                                     
                                        




                                            
             


                                            

                                                          
                                        
                                       

                          
                                                                     

































                                                               
                                                       




                              
                 
                              



























                                                                     
        
 





                                                     
 





                                                                              
 


                                                                
 




                                                           
 
                                           
 

                                            
 
                   
 


                                                              
 


                                            
 



                                               
 

                                                       

 

                                             
 














                                                                              

 

                                                    
 



                                                           
 



                                            

 

                                                    
 

                                                         
 


























                                                                                
                                 















































































































































                                                                              
                                            










































                                                                               
# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
#                         Alexander Belchenko <bialix@ukr.net>
#                         Ben Finney <benf@cybersource.com.au>
#                         Chris Ball <cjb@laptop.org>
#                         Gianluca Montecchi <gian@grys.it>
#                         W. Trevor King <wking@drexel.edu>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""
Define the base VCS (Version Control System) class, which should be
subclassed by other Version Control System backends.  The base class
implements a "do not version" VCS.
"""

import codecs
import os
import os.path
import re
from socket import gethostname
import shutil
import sys
import tempfile

import libbe
from utility import Dir, search_parent_directories
from subproc import CommandError, invoke
from plugin import get_plugin

if libbe.TESTING == True:
    import unittest
    import doctest


# List VCS modules in order of preference.
# Don't list this module, it is implicitly last.
VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']

def set_preferred_vcs(name):
    global VCS_ORDER
    assert name in VCS_ORDER, \
        'unrecognized VCS %s not in\n  %s' % (name, VCS_ORDER)
    VCS_ORDER.remove(name)
    VCS_ORDER.insert(0, name)

def _get_matching_vcs(matchfn):
    """Return the first module for which matchfn(VCS_instance) is true"""
    for submodname in VCS_ORDER:
        module = get_plugin('libbe', submodname)
        vcs = module.new()
        if matchfn(vcs) == True:
            return vcs
        vcs.cleanup()
    return VCS()
    
def vcs_by_name(vcs_name):
    """Return the module for the VCS with the given name"""
    return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)

def detect_vcs(dir):
    """Return an VCS instance for the vcs being used in this directory"""
    return _get_matching_vcs(lambda vcs: vcs.detect(dir))

def installed_vcs():
    """Return an instance of an installed VCS"""
    return _get_matching_vcs(lambda vcs: vcs.installed())



class SettingIDnotSupported(NotImplementedError):
    pass

class VCSnotRooted(Exception):
    def __init__(self):
        msg = "VCS not rooted"
        Exception.__init__(self, msg)

class PathNotInRoot(Exception):
    def __init__(self, path, root):
        msg = "Path '%s' not in root '%s'" % (path, root)
        Exception.__init__(self, msg)
        self.path = path
        self.root = root

class NoSuchFile(Exception):
    def __init__(self, pathname, root="."):
        path = os.path.abspath(os.path.join(root, pathname))
        Exception.__init__(self, "No such file: %s" % path)

class EmptyCommit(Exception):
    def __init__(self):
        Exception.__init__(self, "No changes to commit")


def new():
    return VCS()

class VCS(object):
    """
    This class implements a 'no-vcs' interface.

    Support for other VCSs can be added by subclassing this class, and
    overriding methods _vcs_*() with code appropriate for your VCS.
    
    The methods _u_*() are utility methods available to the _vcs_*()
    methods.

    Sink to existing root
    ======================

    Consider the following usage case:
    You have a bug directory rooted in
      /path/to/source
    by which I mean the '.be' directory is at
      /path/to/source/.be
    However, you're of in some subdirectory like
      /path/to/source/GUI/testing
    and you want to comment on a bug.  Setting sink_to_root=True wen
    you initialize your BugDir will cause it to search for the '.be'
    file in the ancestors of the path you passed in as 'root'.
      /path/to/source/GUI/testing/.be     miss
      /path/to/source/GUI/.be             miss
      /path/to/source/.be                 hit!
    So it still roots itself appropriately without much work for you.

    File-system access
    ==================

    BugDirs live completely in memory when .sync_with_disk is False.
    This is the default configuration setup by BugDir(from_disk=False).
    If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
    any changes to the BugDir will be immediately written to disk.

    If you want to change .sync_with_disk, we suggest you use
    .set_sync_with_disk(), which propogates the new setting through to
    all bugs/comments/etc. that have been loaded into memory.  If
    you've been living in memory and want to move to
    .sync_with_disk==True, but you're not sure if anything has been
    changed in memory, a call to .save() immediately before the
    .set_sync_with_disk(True) call is a safe move.

    Regardless of .sync_with_disk, a call to .save() will write out
    all the contents that the BugDir instance has loaded into memory.
    If sync_with_disk has been True over the course of all interesting
    changes, this .save() call will be a waste of time.

    The BugDir will only load information from the file system when it
    loads new settings/bugs/comments that it doesn't already have in
    memory and .sync_with_disk == True.

    Allow storage initialization
    ========================

    This one is for testing purposes.  Setting it to True allows the
    BugDir to search for an installed Storage backend and initialize
    it in the root directory.  This is a convenience option for
    supporting tests of versioning functionality
    (e.g. .duplicate_bugdir).

    Disable encoding manipulation
    =============================

    This one is for testing purposed.  You might have non-ASCII
    Unicode in your bugs, comments, files, etc.  BugDir instances try
    and support your preferred encoding scheme (e.g. "utf-8") when
    dealing with stream and file input/output.  For stream output,
    this involves replacing sys.stdout and sys.stderr
    (libbe.encode.set_IO_stream_encodings).  However this messes up
    doctest's output catching.  In order to support doctest tests
    using BugDirs, set manipulate_encodings=False, and stick to ASCII
    in your tests.

        if root == None:
            root = os.getcwd()
        if sink_to_existing_root == True:
            self.root = self._find_root(root)
        else:
            if not os.path.exists(root):
                self.root = None
                raise NoRootEntry(root)
            self.root = root
        # get a temporary storage until we've loaded settings
        self.sync_with_disk = False
        self.storage = self._guess_storage()

            if assert_new_BugDir == True:
                if os.path.exists(self.get_path()):
                    raise AlreadyInitialized, self.get_path()
            if storage == None:
                storage = self._guess_storage(allow_storage_init)
            self.storage = storage
            self._setup_user_id(self.user_id)


    # methods for getting the BugDir situated in the filesystem

    def _find_root(self, path):
        """
        Search for an existing bug database dir and it's ancestors and
        return a BugDir rooted there.  Only called by __init__, and
        then only if sink_to_existing_root == True.
        """
        if not os.path.exists(path):
            self.root = None
            raise NoRootEntry(path)
        versionfile=utility.search_parent_directories(path,
                                                      os.path.join(".be", "version"))
        if versionfile != None:
            beroot = os.path.dirname(versionfile)
            root = os.path.dirname(beroot)
            return root
        else:
            beroot = utility.search_parent_directories(path, ".be")
            if beroot == None:
                self.root = None
                raise NoBugDir(path)
            return beroot

    def _guess_storage(self, allow_storage_init=False):
        """
        Only called by __init__.
        """
        deepdir = self.get_path()
        if not os.path.exists(deepdir):
            deepdir = os.path.dirname(deepdir)
        new_storage = storage.detect_storage(deepdir)
        install = False
        if new_storage.name == "None":
            if allow_storage_init == True:
                new_storage = storage.installed_storage()
                new_storage.init(self.root)
        return new_storage

os.listdir(self.get_path("bugs")):
    """
    name = "None"
    client = "" # command-line tool for _u_invoke_client
    versioned = False
    def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()):
        self.paranoid = paranoid
        self.verboseInvoke = False
        self.rootdir = None
        self._duplicateBasedir = None
        self._duplicateDirname = None
        self.encoding = encoding
    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, id(self))
    def __repr__(self):
        return str(self)
    def _vcs_version(self):
        """
        Return the VCS version string.
        """
        return "0.0"
    def _vcs_detect(self, path=None):
        """
        Detect whether a directory is revision controlled with this VCS.
        """
        return True
    def _vcs_root(self, path):
        """
        Get the VCS root.  This is the default working directory for
        future invocations.  You would normally set this to the root
        directory for your VCS.
        """
        if os.path.isdir(path)==False:
            path = os.path.dirname(path)
            if path == "":
                path = os.path.abspath(".")
        return path
    def _vcs_init(self, path):
        """
        Begin versioning the tree based at path.
        """
        pass
    def _vcs_cleanup(self):
        """
        Remove any cruft that _vcs_init() created outside of the
        versioned tree.
        """
        pass
    def _vcs_get_user_id(self):
        """
        Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
        If the VCS has not been configured with a username, return None.
        """
        return None
    def _vcs_set_user_id(self, value):
        """
        Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
        This is run if the VCS has not been configured with a usename, so
        that commits will have a reasonable FROM value.
        """
        raise SettingIDnotSupported
    def _vcs_add(self, path):
        """
        Add the already created file at path to version control.
        """
        pass
    def _vcs_remove(self, path):
        """
        Remove the file at path from version control.  Optionally
        remove the file from the filesystem as well.
        """
        pass
    def _vcs_update(self, path):
        """
        Notify the versioning system of changes to the versioned file
        at path.
        """
        pass
    def _vcs_get_file_contents(self, path, revision=None, binary=False):
        """
        Get the file contents as they were in a given revision.
        Revision==None specifies the current revision.
        """
        assert revision == None, \
            "The %s VCS does not support revision specifiers" % self.name
        if binary == False:
            f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
        else:
            f = open(os.path.join(self.rootdir, path), "rb")
        contents = f.read()
        f.close()
        return contents
    def _vcs_duplicate_repo(self, directory, revision=None):
        """
        Get the repository as it was in a given revision.
        revision==None specifies the current revision.
        dir specifies a directory to create the duplicate in.
        """
        shutil.copytree(self.rootdir, directory, True)
    def _vcs_commit(self, commitfile, allow_empty=False):
        """
        Commit the current working directory, using the contents of
        commitfile as the comment.  Return the name of the old
        revision (or None if commits are not supported).
        
        If allow_empty == False, raise EmptyCommit if there are no
        changes to commit.
        """
        return None
    def _vcs_revision_id(self, index):
        """
        Return the name of the <index>th revision.  Index will be an
        integer (possibly <= 0).  The choice of which branch to follow
        when crossing branches/merges is not defined.

        Return None if revision IDs are not supported, or if the
        specified revision does not exist.
        """
        return None
    def version(self):
        """Cache version string for efficiency."""
        if not hasattr(self, '_version'):
            self._version = self._get_version()
        return self._version
    def _get_version(self):
        try:
            ret = self._vcs_version()
            return ret
        except OSError, e:
            if e.errno == errno.ENOENT:
                return None
            else:
                raise OSError, e
        except CommandError:
            return None
    def installed(self):
        if self.version() != None:
            return True
        return False
    def detect(self, path="."):
        """
        Detect whether a directory is revision controlled with this VCS.
        """
        return self._vcs_detect(path)
    def root(self, path):
        """
        Set the root directory to the path's VCS root.  This is the
        default working directory for future invocations.
        """
        self.rootdir = self._vcs_root(path)
    def init(self, path):
        """
        Begin versioning the tree based at path.
        Also roots the vcs at path.
        """
        if os.path.isdir(path)==False:
            path = os.path.dirname(path)
        self._vcs_init(path)
        self.root(path)
    def cleanup(self):
        self._vcs_cleanup()
    def get_user_id(self):
        """
        Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
        If the VCS has not been configured with a username, return the user's
        id.  You can override the automatic lookup procedure by setting the
        VCS.user_id attribute to a string of your choice.
        """
        if hasattr(self, "user_id"):
            if self.user_id != None:
                return self.user_id
        id = self._vcs_get_user_id()
        if id == None:
            name = self._u_get_fallback_username()
            email = self._u_get_fallback_email()
            id = self._u_create_id(name, email)
            print >> sys.stderr, "Guessing id '%s'" % id
            try:
                self.set_user_id(id)
            except SettingIDnotSupported:
                pass
        return id
    def set_user_id(self, value):
        """
        Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
        This is run if the VCS has not been configured with a usename, so
        that commits will have a reasonable FROM value.
        """
        self._vcs_set_user_id(value)
    def add(self, path):
        """
        Add the already created file at path to version control.
        """
        self._vcs_add(self._u_rel_path(path))
    def remove(self, path):
        """
        Remove a file from both version control and the filesystem.
        """
        self._vcs_remove(self._u_rel_path(path))
        if os.path.exists(path):
            os.remove(path)
    def recursive_remove(self, dirname):
        """
        Remove a file/directory and all its decendents from both
        version control and the filesystem.
        """
        if not os.path.exists(dirname):
            raise NoSuchFile(dirname)
        for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
            filenames.extend(dirnames)
            for path in filenames:
                fullpath = os.path.join(dirpath, path)
                if os.path.exists(fullpath) == False:
                    continue
                self._vcs_remove(self._u_rel_path(fullpath))
        if os.path.exists(dirname):
            shutil.rmtree(dirname)
    def update(self, path):
        """
        Notify the versioning system of changes to the versioned file
        at path.
        """
        self._vcs_update(self._u_rel_path(path))
    def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False):
        """
        Get the file as it was in a given revision.
        Revision==None specifies the current revision.

        allow_no_vcs==True allows direct access to files through
        codecs.open() or open() if the vcs decides it can't handle the
        given path.
        """
        if not os.path.exists(path):
            raise NoSuchFile(path)
        if self._use_vcs(path, allow_no_vcs):
            relpath = self._u_rel_path(path)
            contents = self._vcs_get_file_contents(relpath,revision,binary=binary)
        else:
            if binary == True:
                f = codecs.open(path, "r", self.encoding)
            else:
                f = open(path, "rb")
            contents = f.read()
            f.close()
        return contents
    def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False):
        """
        Set the file contents under version control.
        """
        add = not os.path.exists(path)
        if binary == False:
            f = codecs.open(path, "w", self.encoding)
        else:
            f = open(path, "wb")
        f.write(contents)
        f.close()
        
        if self._use_vcs(path, allow_no_vcs):
            if add:
                self.add(path)
            else:
                self.update(path)
    def mkdir(self, path, allow_no_vcs=False, check_parents=True):
        """
        Create (if neccessary) a directory at path under version
        control.
        """
        if check_parents == True:
            parent = os.path.dirname(path)
            if not os.path.exists(parent): # recurse through parents
                self.mkdir(parent, allow_no_vcs, check_parents)
        if not os.path.exists(path):
            os.mkdir(path)
            if self._use_vcs(path, allow_no_vcs):
                self.add(path)
        else:
            assert os.path.isdir(path)
            if self._use_vcs(path, allow_no_vcs):
                #self.update(path)# Don't update directories.  Changing files
                pass              # underneath them should be sufficient.
                
    def duplicate_repo(self, revision=None):
        """
        Get the repository as it was in a given revision.
        revision==None specifies the current revision.
        Return the path to the arbitrary directory at the base of the new repo.
        """
        # Dirname in Basedir to protect against simlink attacks.
        if self._duplicateBasedir == None:
            self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs')
            self._duplicateDirname = \
                os.path.join(self._duplicateBasedir, "duplicate")
            self._vcs_duplicate_repo(directory=self._duplicateDirname,
                                     revision=revision)
        return self._duplicateDirname
    def remove_duplicate_repo(self):
        """
        Clean up a duplicate repo created with duplicate_repo().
        """
        if self._duplicateBasedir != None:
            shutil.rmtree(self._duplicateBasedir)
            self._duplicateBasedir = None
            self._duplicateDirname = None
    def commit(self, summary, body=None, allow_empty=False):
        """
        Commit the current working directory, with a commit message
        string summary and body.  Return the name of the old revision
        (or None if versioning is not supported).
        
        If allow_empty == False (the default), raise EmptyCommit if
        there are no changes to commit.
        """
        summary = summary.strip()+'\n'
        if body is not None:
            summary += '\n' + body.strip() + '\n'
        descriptor, filename = tempfile.mkstemp()
        revision = None
        try:
            temp_file = os.fdopen(descriptor, 'wb')
            temp_file.write(summary)
            temp_file.flush()
            self.precommit()
            revision = self._vcs_commit(filename, allow_empty=allow_empty)
            temp_file.close()
            self.postcommit()
        finally:
            os.remove(filename)
        return revision
    def precommit(self):
        """
        Executed before all attempted commits.
        """
        pass
    def postcommit(self):
        """
        Only executed after successful commits.
        """
        pass
    def revision_id(self, index=None):
        """
        Return the name of the <index>th revision.  The choice of
        which branch to follow when crossing branches/merges is not
        defined.

        Return None if index==None, revision IDs are not supported, or
        if the specified revision does not exist.
        """
        if index == None:
            return None
        return self._vcs_revision_id(index)
    def _u_any_in_string(self, list, string):
        """
        Return True if any of the strings in list are in string.
        Otherwise return False.
        """
        for list_string in list:
            if list_string in string:
                return True
        return False
    def _u_invoke(self, *args, **kwargs):
        if 'cwd' not in kwargs:
            kwargs['cwd'] = self.rootdir
        if 'verbose' not in kwargs:
            kwargs['verbose'] = self.verboseInvoke
        if 'encoding' not in kwargs:
            kwargs['encoding'] = self.encoding
        return invoke(*args, **kwargs)
    def _u_invoke_client(self, *args, **kwargs):
        cl_args = [self.client]
        cl_args.extend(args)
        return self._u_invoke(cl_args, **kwargs)
    def _u_search_parent_directories(self, path, filename):
        """
        Find the file (or directory) named filename in path or in any
        of path's parents.
        
        e.g.
          search_parent_directories("/a/b/c", ".be")
        will return the path to the first existing file from
          /a/b/c/.be
          /a/b/.be
          /a/.be
          /.be
        or None if none of those files exist.
        """
        return search_parent_directories(path, filename)
    def _use_vcs(self, path, allow_no_vcs):
        """
        Try and decide if _vcs_add/update/mkdir/etc calls will
        succeed.  Returns True is we think the vcs_call would
        succeeed, and False otherwise.
        """
        use_vcs = True
        exception = None
        if self.rootdir != None:
            if self.path_in_root(path) == False:
                use_vcs = False
                exception = PathNotInRoot(path, self.rootdir)
        else:
            use_vcs = False
            exception = VCSnotRooted
        if use_vcs == False and allow_no_vcs==False:
            raise exception
        return use_vcs
    def path_in_root(self, path, root=None):
        """
        Return the relative path to path from root.
        >>> vcs = new()
        >>> vcs.path_in_root("/a.b/c/.be", "/a.b/c")
        True
        >>> vcs.path_in_root("/a.b/.be", "/a.b/c")
        False
        """
        if root == None:
            if self.rootdir == None:
                raise VCSnotRooted
            root = self.rootdir
        path = os.path.abspath(path)
        absRoot = os.path.abspath(root)
        absRootSlashedDir = os.path.join(absRoot,"")
        if not path.startswith(absRootSlashedDir):
            return False
        return True
    def _u_rel_path(self, path, root=None):
        """
        Return the relative path to path from root.
        >>> vcs = new()
        >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
        '.be'
        """
        if root == None:
            if self.rootdir == None:
                raise VCSnotRooted
            root = self.rootdir
        path = os.path.abspath(path)
        absRoot = os.path.abspath(root)
        absRootSlashedDir = os.path.join(absRoot,"")
        if not path.startswith(absRootSlashedDir):
            raise PathNotInRoot(path, absRootSlashedDir)
        assert path != absRootSlashedDir, \
            "file %s == root directory %s" % (path, absRootSlashedDir)
        relpath = path[len(absRootSlashedDir):]
        return relpath
    def _u_abspath(self, path, root=None):
        """
        Return the absolute path from a path realtive to root.
        >>> vcs = new()
        >>> vcs._u_abspath(".be", "/a.b/c")
        '/a.b/c/.be'
        """
        if root == None:
            assert self.rootdir != None, "VCS not rooted"
            root = self.rootdir
        return os.path.abspath(os.path.join(root, path))
    def _u_create_id(self, name, email=None):
        """
        >>> vcs = new()
        >>> vcs._u_create_id("John Doe", "jdoe@example.com")
        'John Doe <jdoe@example.com>'
        >>> vcs._u_create_id("John Doe")
        'John Doe'
        """
        assert len(name) > 0
        if email == None or len(email) == 0:
            return name
        else:
            return "%s <%s>" % (name, email)
    def _u_parse_id(self, value):
        """
        >>> vcs = new()
        >>> vcs._u_parse_id("John Doe <jdoe@example.com>")
        ('John Doe', 'jdoe@example.com')
        >>> vcs._u_parse_id("John Doe")
        ('John Doe', None)
        >>> try:
        ...     vcs._u_parse_id("John Doe <jdoe@example.com><what?>")
        ... except AssertionError:
        ...     print "Invalid match"
        Invalid match
        """
        emailexp = re.compile("(.*) <([^>]*)>(.*)")
        match = emailexp.search(value)
        if match == None:
            email = None
            name = value
        else:
            assert len(match.groups()) == 3
            assert match.groups()[2] == "", match.groups()
            email = match.groups()[1]
            name = match.groups()[0]
        assert name != None
        assert len(name) > 0
        return (name, email)
    def _u_get_fallback_username(self):
        name = None
        for envariable in ["LOGNAME", "USERNAME"]:
            if os.environ.has_key(envariable):
                name = os.environ[envariable]
                break
        assert name != None
        return name
    def _u_get_fallback_email(self):
        hostname = gethostname()
        name = self._u_get_fallback_username()
        return "%s@%s" % (name, hostname)
    def _u_parse_commitfile(self, commitfile):
        """
        Split the commitfile created in self.commit() back into
        summary and header lines.
        """
        f = codecs.open(commitfile, "r", self.encoding)
        summary = f.readline()
        body = f.read()
        body.lstrip('\n')
        if len(body) == 0:
            body = None
        f.close()
        return (summary, body)

    def check_disk_version(self):
        version = self.get_version()
        if version != upgrade.BUGDIR_DISK_VERSION:
            upgrade.upgrade(self.root, version)

    def disk_version(self, path=None, use_none_vcs=False,
                     for_duplicate_bugdir=False):
        """
        Requires disk access.
        """
        if path == None:
            path = self.get_path("version")
        allow_no_vcs = not VCS.path_in_root(path)
        if allow_no_vcs == True:
            assert for_duplicate_bugdir == True
        return self.get(path, allow_no_vcs=allow_no_vcs).rstrip("\n")

    def set_disk_version(self):
        """
        Requires disk access.
        """
        if self.sync_with_disk == False:
            raise DiskAccessRequired("set version")
        self.vcs.mkdir(self.get_path())
        self.vcs.set_file_contents(self.get_path("version"),
                                   upgrade.BUGDIR_DISK_VERSION+"\n")

        

if libbe.TESTING == True:
    def setup_vcs_test_fixtures(testcase):
        """Set up test fixtures for VCS test case."""
        testcase.vcs = testcase.Class()
        testcase.dir = Dir()
        testcase.dirname = testcase.dir.path

        vcs_not_supporting_uninitialized_user_id = []
        vcs_not_supporting_set_user_id = ["None", "hg"]
        testcase.vcs_supports_uninitialized_user_id = (
            testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id)
        testcase.vcs_supports_set_user_id = (
            testcase.vcs.name not in vcs_not_supporting_set_user_id)

        if not testcase.vcs.installed():
            testcase.fail(
                "%(name)s VCS not found" % vars(testcase.Class))

        if testcase.Class.name != "None":
            testcase.failIf(
                testcase.vcs.detect(testcase.dirname),
                "Detected %(name)s VCS before initialising"
                    % vars(testcase.Class))

        testcase.vcs.init(testcase.dirname)

    class VCSTestCase(unittest.TestCase):
        """Test cases for base VCS class."""

        Class = VCS

        def __init__(self, *args, **kwargs):
            super(VCSTestCase, self).__init__(*args, **kwargs)
            self.dirname = None

        def setUp(self):
            super(VCSTestCase, self).setUp()
            setup_vcs_test_fixtures(self)

        def tearDown(self):
            self.vcs.cleanup()
            self.dir.cleanup()
            super(VCSTestCase, self).tearDown()

        def full_path(self, rel_path):
            return os.path.join(self.dirname, rel_path)


    class VCS_init_TestCase(VCSTestCase):
        """Test cases for VCS.init method."""

        def test_detect_should_succeed_after_init(self):
            """Should detect VCS in directory after initialization."""
            self.failUnless(
                self.vcs.detect(self.dirname),
                "Did not detect %(name)s VCS after initialising"
                    % vars(self.Class))

        def test_vcs_rootdir_in_specified_root_path(self):
            """VCS root directory should be in specified root path."""
            rp = os.path.realpath(self.vcs.rootdir)
            dp = os.path.realpath(self.dirname)
            vcs_name = self.Class.name
            self.failUnless(
                dp == rp or rp == None,
                "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())


    class VCS_get_user_id_TestCase(VCSTestCase):
        """Test cases for VCS.get_user_id method."""

        def test_gets_existing_user_id(self):
            """Should get the existing user ID."""
            if not self.vcs_supports_uninitialized_user_id:
                return

            user_id = self.vcs.get_user_id()
            self.failUnless(
                user_id is not None,
                "unable to get a user id")


    class VCS_set_user_id_TestCase(VCSTestCase):
        """Test cases for VCS.set_user_id method."""

        def setUp(self):
            super(VCS_set_user_id_TestCase, self).setUp()

            if self.vcs_supports_uninitialized_user_id:
                self.prev_user_id = self.vcs.get_user_id()
            else:
                self.prev_user_id = "Uninitialized identity <bogus@example.org>"

            if self.vcs_supports_set_user_id:
                self.test_new_user_id = "John Doe <jdoe@example.com>"
                self.vcs.set_user_id(self.test_new_user_id)

        def tearDown(self):
            if self.vcs_supports_set_user_id:
                self.vcs.set_user_id(self.prev_user_id)
            super(VCS_set_user_id_TestCase, self).tearDown()

        def test_raises_error_in_unsupported_vcs(self):
            """Should raise an error in a VCS that doesn't support it."""
            if self.vcs_supports_set_user_id:
                return
            self.assertRaises(
                SettingIDnotSupported,
                self.vcs.set_user_id, "foo")

        def test_updates_user_id_in_supporting_vcs(self):
            """Should update the user ID in an VCS that supports it."""
            if not self.vcs_supports_set_user_id:
                return
            user_id = self.vcs.get_user_id()
            self.failUnlessEqual(
                self.test_new_user_id, user_id,
                "user id not set correctly (expected %s, got %s)"
                    % (self.test_new_user_id, user_id))


    def setup_vcs_revision_test_fixtures(testcase):
        """Set up revision test fixtures for VCS test case."""
        testcase.test_dirs = ['a', 'a/b', 'c']
        for path in testcase.test_dirs:
            testcase.vcs.mkdir(testcase.full_path(path))

        testcase.test_files = ['a/text', 'a/b/text']

        testcase.test_contents = {
            'rev_1': "Lorem ipsum",
            'uncommitted': "dolor sit amet",
            }


    class VCS_mkdir_TestCase(VCSTestCase):
        """Test cases for VCS.mkdir method."""

        def setUp(self):
            super(VCS_mkdir_TestCase, self).setUp()
            setup_vcs_revision_test_fixtures(self)

        def tearDown(self):
            for path in reversed(sorted(self.test_dirs)):
                self.vcs.recursive_remove(self.full_path(path))
            super(VCS_mkdir_TestCase, self).tearDown()

        def test_mkdir_creates_directory(self):
            """Should create specified directory in filesystem."""
            for path in self.test_dirs:
                full_path = self.full_path(path)
                self.failUnless(
                    os.path.exists(full_path),
                    "path %(full_path)s does not exist" % vars())


    class VCS_commit_TestCase(VCSTestCase):
        """Test cases for VCS.commit method."""

        def setUp(self):
            super(VCS_commit_TestCase, self).setUp()
            setup_vcs_revision_test_fixtures(self)

        def tearDown(self):
            for path in reversed(sorted(self.test_dirs)):
                self.vcs.recursive_remove(self.full_path(path))
            super(VCS_commit_TestCase, self).tearDown()

        def test_file_contents_as_specified(self):
            """Should set file contents as specified."""
            test_contents = self.test_contents['rev_1']
            for path in self.test_files:
                full_path = self.full_path(path)
                self.vcs.set_file_contents(full_path, test_contents)
                current_contents = self.vcs.get_file_contents(full_path)
                self.failUnlessEqual(test_contents, current_contents)

        def test_file_contents_as_committed(self):
            """Should have file contents as specified after commit."""
            test_contents = self.test_contents['rev_1']
            for path in self.test_files:
                full_path = self.full_path(path)
                self.vcs.set_file_contents(full_path, test_contents)
                revision = self.vcs.commit("Initial file contents.")
                current_contents = self.vcs.get_file_contents(full_path)
                self.failUnlessEqual(test_contents, current_contents)

        def test_file_contents_as_set_when_uncommitted(self):
            """Should set file contents as specified after commit."""
            if not self.vcs.versioned:
                return
            for path in self.test_files:
                full_path = self.full_path(path)
                self.vcs.set_file_contents(
                    full_path, self.test_contents['rev_1'])
                revision = self.vcs.commit("Initial file contents.")
                self.vcs.set_file_contents(
                    full_path, self.test_contents['uncommitted'])
                current_contents = self.vcs.get_file_contents(full_path)
                self.failUnlessEqual(
                    self.test_contents['uncommitted'], current_contents)

        def test_revision_file_contents_as_committed(self):
            """Should get file contents as committed to specified revision."""
            if not self.vcs.versioned:
                return
            for path in self.test_files:
                full_path = self.full_path(path)
                self.vcs.set_file_contents(
                    full_path, self.test_contents['rev_1'])
                revision = self.vcs.commit("Initial file contents.")
                self.vcs.set_file_contents(
                    full_path, self.test_contents['uncommitted'])
                committed_contents = self.vcs.get_file_contents(
                    full_path, revision)
                self.failUnlessEqual(
                    self.test_contents['rev_1'], committed_contents)

        def test_revision_id_as_committed(self):
            """Check for compatibility between .commit() and .revision_id()"""
            if not self.vcs.versioned:
                self.failUnlessEqual(self.vcs.revision_id(5), None)
                return
            committed_revisions = []
            for path in self.test_files:
                full_path = self.full_path(path)
                self.vcs.set_file_contents(
                    full_path, self.test_contents['rev_1'])
                revision = self.vcs.commit("Initial %s contents." % path)
                committed_revisions.append(revision)
                self.vcs.set_file_contents(
                    full_path, self.test_contents['uncommitted'])
                revision = self.vcs.commit("Altered %s contents." % path)
                committed_revisions.append(revision)
            for i,revision in enumerate(committed_revisions):
                self.failUnlessEqual(self.vcs.revision_id(i), revision)
                i += -len(committed_revisions) # check negative indices
                self.failUnlessEqual(self.vcs.revision_id(i), revision)
            i = len(committed_revisions)
            self.failUnlessEqual(self.vcs.revision_id(i), None)
            self.failUnlessEqual(self.vcs.revision_id(-i-1), None)

        def test_revision_id_as_committed(self):
            """Check revision id before first commit"""
            if not self.vcs.versioned:
                self.failUnlessEqual(self.vcs.revision_id(5), None)
                return
            committed_revisions = []
            for path in self.test_files:
                self.failUnlessEqual(self.vcs.revision_id(0), None)


    class VCS_duplicate_repo_TestCase(VCSTestCase):
        """Test cases for VCS.duplicate_repo method."""

        def setUp(self):
            super(VCS_duplicate_repo_TestCase, self).setUp()
            setup_vcs_revision_test_fixtures(self)

        def tearDown(self):
            self.vcs.remove_duplicate_repo()
            for path in reversed(sorted(self.test_dirs)):
                self.vcs.recursive_remove(self.full_path(path))
            super(VCS_duplicate_repo_TestCase, self).tearDown()

        def test_revision_file_contents_as_committed(self):
            """Should match file contents as committed to specified revision.
            """
            if not self.vcs.versioned:
                return
            for path in self.test_files:
                full_path = self.full_path(path)
                self.vcs.set_file_contents(
                    full_path, self.test_contents['rev_1'])
                revision = self.vcs.commit("Commit current status")
                self.vcs.set_file_contents(
                    full_path, self.test_contents['uncommitted'])
                dup_repo_path = self.vcs.duplicate_repo(revision)
                dup_file_path = os.path.join(dup_repo_path, path)
                dup_file_contents = file(dup_file_path, 'rb').read()
                self.failUnlessEqual(
                    self.test_contents['rev_1'], dup_file_contents)
                self.vcs.remove_duplicate_repo()


    def make_vcs_testcase_subclasses(vcs_class, namespace):
        """Make VCSTestCase subclasses for vcs_class in the namespace."""
        vcs_testcase_classes = [
            c for c in (
                ob for ob in globals().values() if isinstance(ob, type))
            if issubclass(c, VCSTestCase)]

        for base_class in vcs_testcase_classes:
            testcase_class_name = vcs_class.__name__ + base_class.__name__
            testcase_class_bases = (base_class,)
            testcase_class_dict = dict(base_class.__dict__)
            testcase_class_dict['Class'] = vcs_class
            testcase_class = type(
                testcase_class_name, testcase_class_bases, testcase_class_dict)
            setattr(namespace, testcase_class_name, testcase_class)


    unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
    suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])