aboutsummaryrefslogblamecommitdiffstats
path: root/libbe/storage/vcs/git.py
blob: 851af19a4983a8717ff0edd2cf47e38ba68d1f99 (plain) (tree)
1
2
3
4
5
6
7
8
                                                              
                                                     
                                                           
                                                                
                                                           
 
                                       
 



                                                                               
 
                                                                        


                                                                               
 

                                                                              
 


                            

   
         
              
         
             
               
 




                                
     
                                                             

                                                   
                                             
 
            
                                    

                                              
                  
 

                         
              
 

          



                         
 




                                                   
       

                         



                                                
















































































































                                                                                   




























































































                                                                               
                                                   


                
 
                           

                                                                    
                                                       
                       
                             

                               

                                                    



                                      

                                                    






                                                      
                                                    
                           

                                                    

                                           
                                
                                                                    
                       
                    
 
                              




                                                                              
                                                                             
                                                             


                                                          
 
                              

                                               
                             

                               

                                          
                                
                                                    
                                                   
                                                          
                            
                                                                        
             

                                                                    
                         
 


                                         

                                                                    











                                                               
                                                         
                                               
                               
                                        

                                                              
                                     
                                                                        

                                                 
                                                              
                                        
                                                 

                                                                     
                            
 
                                      

                                                                  



                                                                              
                                                               
                                       
            
                         
                                         
                           
                                       

                           

                          
 




                                                                     



















































                                                            











                                               
                                               













                                                                 
 
                         

                                                                      
 

                                                                               
# Copyright (C) 2008-2012 Ben Finney <benf@cybersource.com.au>
#                         Chris Ball <cjb@laptop.org>
#                         Gianluca Montecchi <gian@grys.it>
#                         Robert Lehmann <mail@robertlehmann.de>
#                         W. Trevor King <wking@tremily.us>
#
# This file is part of Bugs Everywhere.
#
# Bugs Everywhere is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option) any
# later version.
#
# Bugs Everywhere is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.

"""Git_ backend.

.. _Git: http://git-scm.com/
"""

import os
import os.path
import re
import shutil
import unittest

# Try to load the optional pygit2 bindings.  When they are missing or too
# old, `_pygit2` is set to None (so `new()` falls back to the command-line
# backend) and `_pygit2_import_error` records the reason.
# `_pygit2_import_error` stays None when pygit2 is importable and supported.
_pygit2_import_error = None
try:
    import pygit2 as _pygit2
except ImportError as error:  # `as` form parses on Python 2.6+ and 3.x
    _pygit2 = None
    _pygit2_import_error = error
else:
    # A missing `__version__` attribute is treated the same as 0.17.3.
    if getattr(_pygit2, '__version__', '0.17.3') == '0.17.3':
        _pygit2 = None
        _pygit2_import_error = NotImplementedError(
            'pygit2 <= 0.17.3 not supported')

import libbe
from ...ui.util import user as _user
from ...util import encoding as _encoding
from ..base import EmptyCommit as _EmptyCommit
from . import base

if libbe.TESTING == True:
    import doctest
    import sys


def new():
    """Return a fresh Git VCS backend instance.

    A :py:class:`PygitGit` is created when a usable :py:mod:`pygit2`
    module was imported; otherwise an :py:class:`ExecGit` is created.
    """
    backend = PygitGit if _pygit2 else ExecGit
    return backend()


class PygitGit(base.VCS):
    """:py:class:`base.VCS` implementation for Git.

    Using :py:mod:`pygit2` for the Git activity.
    """
    name='pygit2'
    # Hex id made of forty zeros; compared against hunk ids in
    # `_vcs_changed` to spot added / removed files.
    _null_hex = u'0' * 40

    def __init__(self, *args, **kwargs):
        """Initialize the base VCS and mark this storage as versioned.

        The `pygit2.Repository` handle is created later, by `_vcs_root`
        or `_vcs_init`.
        """
        base.VCS.__init__(self, *args, **kwargs)
        self.versioned = True
        self._pygit_repository = None

    def __getstate__(self):
        r"""`pygit2.Repository`\s don't seem to pickle well.

        Return a copy of the instance dictionary in which the repository
        object (if any) is replaced by its `path` attribute.
        """
        attrs = dict(self.__dict__)
        if self._pygit_repository is not None:
            attrs['_pygit_repository'] = self._pygit_repository.path
        return attrs

    def __setstate__(self, state):
        r"""`pygit2.Repository`\s don't seem to pickle well.

        Restore the instance dictionary and re-open the repository from
        the path stored by `__getstate__`.
        """
        self.__dict__.update(state)
        if self._pygit_repository is not None:
            gitdir = self._pygit_repository
            self._pygit_repository = _pygit2.Repository(gitdir)

    def _vcs_version(self):
        """Return the pygit2 version string.

        Returns '?' when pygit2 does not expose `__version__`, and None
        when pygit2 is not available at all.
        """
        if _pygit2:
            # Was misspelt '__verison__', which always produced '?'.
            return getattr(_pygit2, '__version__', '?')
        return None

    def _vcs_get_user_id(self):
        """Build a user id from the repository's `user.name`/`user.email`.

        Returns None when neither setting is configured.  When only one
        of them is set, the other is filled in from the `_user` fallback
        helpers.  Raises ValueError if the resulting email lacks an '@'.
        """
        try:
            name = self._pygit_repository.config['user.name']
        except KeyError:
            name = ''
        try:
            email = self._pygit_repository.config['user.email']
        except KeyError:
            email = ''
        if name != '' or email != '': # got something!
            # guess missing info, if necessary
            if name == '':
                name = _user.get_fallback_fullname()
            if email == '':
                email = _user.get_fallback_email()
            if '@' not in email:
                raise ValueError((name, email))
            return _user.create_user_id(name, email)
        return None # Git has no information

    def _vcs_detect(self, path):
        """Return True if `path` lies inside a Git repository.

        A KeyError from `pygit2.discover_repository` is taken to mean
        that no repository was found.
        """
        try:
            _pygit2.discover_repository(path)
        except KeyError:
            return False
        return True

    def _vcs_root(self, path):
        """Find the root of the deepest repository containing path."""
        # Assume that nothing funny is going on; in particular, that we aren't
        # dealing with a bare repo.
        gitdir = _pygit2.discover_repository(path)
        self._pygit_repository = _pygit2.Repository(gitdir)
        dirname,tip = os.path.split(gitdir)
        if tip == '':  # split('x/y/z/.git/') == ('x/y/z/.git', '')
            dirname,tip = os.path.split(dirname)
        assert tip == '.git', tip
        return dirname

    def _vcs_init(self, path):
        """Create a new non-bare repository at `path` and keep a handle."""
        bare = False
        self._pygit_repository = _pygit2.init_repository(path, bare)

    def _vcs_destroy(self):
        """Delete the `.git` directory under `self.repo`, if it exists."""
        vcs_dir = os.path.join(self.repo, '.git')
        if os.path.exists(vcs_dir):
            shutil.rmtree(vcs_dir)

    def _vcs_add(self, path):
        """Stage `path` in the index; directories are silently skipped."""
        abspath = self._u_abspath(path)
        if os.path.isdir(abspath):
            return
        # Re-read the on-disk index first so the write below does not
        # clobber entries added since it was last loaded.
        self._pygit_repository.index.read()
        self._pygit_repository.index.add(path)
        self._pygit_repository.index.write()

    def _vcs_remove(self, path):
        """Drop `path` from the index and delete the file from disk.

        Directories are left alone.
        """
        abspath = self._u_abspath(path)
        # `abspath` is already absolute; no need to resolve it a second time.
        if not os.path.isdir(abspath):
            self._pygit_repository.index.read()
            del self._pygit_repository.index[path]
            self._pygit_repository.index.write()
            os.remove(os.path.join(self.repo, path))

    def _vcs_update(self, path):
        """Record a modification of `path`; identical to adding it."""
        self._vcs_add(path)

    def _git_get_commit(self, revision):
        """Resolve `revision` to a commit object.

        Byte strings are decoded as ASCII before being handed to
        `revparse_single`.  Asserts that the result is a commit.
        """
        if isinstance(revision, str):
            revision = unicode(revision, 'ascii')
        commit = self._pygit_repository.revparse_single(revision)
        assert commit.type == _pygit2.GIT_OBJ_COMMIT, commit
        return commit

    def _git_get_object(self, path, revision):
        """Return the Git object stored at `path` in `revision`.

        `path` is split on `os.path.sep` and walked down from the
        commit's root tree.  Raises ValueError if an intermediate
        component is missing or is not a tree.  Returns None if the
        final component is not present in its parent tree.
        """
        commit = self._git_get_commit(revision=revision)
        tree = commit.tree
        sections = path.split(os.path.sep)
        for section in sections[:-1]:  # traverse trees
            child_tree = None
            for entry in tree:
                if entry.name == section:
                    eobj = entry.to_object()
                    if eobj.type == _pygit2.GIT_OBJ_TREE:
                        child_tree = eobj
                        break
                    else:
                        raise ValueError(path)  # not a directory
            if child_tree is None:
                raise ValueError((path, sections, section, [e.name for e in tree]))
            tree = child_tree
        eobj = None
        for entry in tree:
            if entry.name == sections[-1]:
                eobj = entry.to_object()
                break  # entry names are unique within a tree
        return eobj

    def _vcs_get_file_contents(self, path, revision=None):
        """Return the contents of `path` at `revision`.

        With `revision` None the base-class implementation is used.
        Otherwise the blob is read from the repository; ValueError is
        raised if `path` names something other than a blob.
        """
        if revision is None:
            return base.VCS._vcs_get_file_contents(self, path, revision)
        else:
            # NOTE(review): `blob` is None when `path` is absent from the
            # revision, which surfaces as AttributeError here -- confirm
            # which exception callers expect before changing it.
            blob = self._git_get_object(path=path, revision=revision)
            if blob.type != _pygit2.GIT_OBJ_BLOB:
                raise ValueError(path)  # not a file
            return blob.read_raw()

    def _vcs_path(self, id, revision):
        """Return the path for `id` at `revision` via `_u_find_id`."""
        return self._u_find_id(id, revision)

    def _vcs_isdir(self, path, revision):
        """Return True if `path` is a tree (directory) in `revision`."""
        obj = self._git_get_object(path=path, revision=revision)
        return obj.type == _pygit2.GIT_OBJ_TREE

    def _vcs_listdir(self, path, revision):
        """Return the entry names of the tree at `path` in `revision`."""
        tree = self._git_get_object(path=path, revision=revision)
        assert tree.type == _pygit2.GIT_OBJ_TREE, tree
        return [e.name for e in tree]

    def _vcs_commit(self, commitfile, allow_empty=False):
        """Commit the current index and return the new commit's hex id.

        `commitfile` is the path of a file holding the commit message.
        Unless `allow_empty` is true, `_EmptyCommit` is raised when the
        tree is empty (first commit) or unchanged from HEAD's tree.
        """
        self._pygit_repository.index.read()
        tree_oid = self._pygit_repository.index.write_tree()
        try:
            self._pygit_repository.head
        except _pygit2.GitError:  # no head; this is the first commit
            parents = []
            tree = self._pygit_repository[tree_oid]
            if not allow_empty and len(tree) == 0:
                raise _EmptyCommit()
        else:
            parents = [self._pygit_repository.head.oid]
            if (not allow_empty and
                tree_oid == self._pygit_repository.head.tree.oid):
                raise _EmptyCommit()
        update_ref = 'HEAD'
        user_id = self.get_user_id()
        name,email = _user.parse_user_id(user_id)
        # using default times is recent, see
        #   https://github.com/libgit2/pygit2/pull/129
        author = _pygit2.Signature(name, email)
        committer = author
        message = _encoding.get_file_contents(commitfile, decode=False)
        encoding = _encoding.get_text_file_encoding()
        commit_oid = self._pygit_repository.create_commit(
            update_ref, author, committer, message, tree_oid, parents,
            encoding)
        commit = self._pygit_repository[commit_oid]
        return commit.hex

    def _vcs_revision_id(self, index):
        """Return the hex id of the commit selected by `index`.

        Negative indices count back from HEAD (-1 is HEAD itself);
        positive indices count forward from the oldest commit (1 is the
        oldest).  Returns None when the index is out of range and raises
        NotImplementedError for index 0.
        """
        walker = self._pygit_repository.walk(
            self._pygit_repository.head.oid, _pygit2.GIT_SORT_TIME)
        if index < 0:
            target_i = -1 - index  # -1: 0, -2: 1, ...
            for i,commit in enumerate(walker):
                if i == target_i:
                    return commit.hex
        elif index > 0:
            revisions = [commit.hex for commit in walker]
            # revisions is [newest, older, ..., oldest]
            if index > len(revisions):
                return None
            return revisions[len(revisions) - index]
        else:
            raise NotImplementedError('initial revision')
        return None

    def _vcs_changed(self, revision):
        """Return `(new, modified, removed)` path lists since `revision`.

        The tree of `revision` is diffed against HEAD's tree and each
        hunk is classified by which side carries the all-zero id.
        """
        commit = self._git_get_commit(revision=revision)
        diff = commit.tree.diff(self._pygit_repository.head.tree)
        new = set()
        modified = set()
        removed = set()
        for hunk in diff.changes['hunks']:
            if hunk.old_oid == self._null_hex:  # pygit2 uses hex in hunk.*_oid
                new.add(hunk.new_file)
            elif hunk.new_oid == self._null_hex:
                removed.add(hunk.old_file)
            else:
                modified.add(hunk.new_file)
        return (list(new), list(modified), list(removed))


class ExecGit (PygitGit):
    """:py:class:`base.VCS` implementation for Git.

    Drives the command-line ``git`` client (through `_u_invoke_client`)
    instead of :py:mod:`pygit2`.
    """
    name='git'
    client='git'

    def _vcs_version(self):
        """Return the stripped output of ``git --version``.

        Returns None if invoking the client raises `base.CommandError`.
        """
        try:
            status,output,error = self._u_invoke_client('--version')
        except base.CommandError:  # command not found?
            return None
        return output.strip()

    def _vcs_get_user_id(self):
        """Build a user id from ``git config user.name``/``user.email``.

        Returns None when neither setting is configured.  When only one
        of them is set, the other is filled in from the `_user` fallback
        helpers.
        """
        # Exit status 1 is tolerated; a non-zero status is treated below
        # as "setting not configured".
        status,output,error = self._u_invoke_client(
            'config', 'user.name', expect=(0,1))
        if status == 0:
            name = output.rstrip('\n')
        else:
            name = ''
        status,output,error = self._u_invoke_client(
            'config', 'user.email', expect=(0,1))
        if status == 0:
            email = output.rstrip('\n')
        else:
            email = ''
        if name != '' or email != '': # got something!
            # guess missing info, if necessary
            if name == '':
                name = _user.get_fallback_fullname()
            if email == '':
                email = _user.get_fallback_email()
            return _user.create_user_id(name, email)
        return None # Git has no information

    def _vcs_detect(self, path):
        """Return True if a `.git` entry is found at or above `path`."""
        return self._u_search_parent_directories(path, '.git') is not None

    def _vcs_root(self, path):
        """Find the root of the deepest repository containing path."""
        # Assume that nothing funny is going on; in particular, that we aren't
        # dealing with a bare repo.
        if not os.path.isdir(path):
            path = os.path.dirname(path)
        status,output,error = self._u_invoke_client('rev-parse', '--git-dir',
                                                    cwd=path)
        # The reported git dir may be relative to `path`; joining handles
        # both the relative and the absolute case.
        gitdir = os.path.join(path, output.rstrip('\n'))
        dirname = os.path.abspath(os.path.dirname(gitdir))
        return dirname

    def _vcs_init(self, path):
        """Run ``git init`` with `path` as the working directory."""
        self._u_invoke_client('init', cwd=path)

    def _vcs_add(self, path):
        """Run ``git add`` on `path`; directories are silently skipped."""
        # Resolve through `_u_abspath`, as `_vcs_remove` does, so the
        # directory test does not depend on the process's current
        # working directory.
        if os.path.isdir(self._u_abspath(path)):
            return
        self._u_invoke_client('add', path)

    def _vcs_remove(self, path):
        """Run ``git rm -f`` on `path`; directories are left alone."""
        if not os.path.isdir(self._u_abspath(path)):
            self._u_invoke_client('rm', '-f', path)

    def _vcs_get_file_contents(self, path, revision=None):
        """Return the contents of `path` at `revision`.

        With `revision` None the base-class implementation is used;
        otherwise the output of ``git show <revision>:<path>`` is
        returned.
        """
        if revision is None:
            return base.VCS._vcs_get_file_contents(self, path, revision)
        else:
            arg = '%s:%s' % (revision,path)
            status,output,error = self._u_invoke_client('show', arg)
            return output

    def _vcs_isdir(self, path, revision):
        """Return True if `path` is a tree (directory) in `revision`.

        Uses ``git ls-tree <revision>:<path>``.  A failure whose stderr
        mentions 'not a tree object' yields False; any other failure
        raises `base.CommandError`.
        """
        arg = '%s:%s' % (revision,path)
        args = ['ls-tree', arg]
        kwargs = {'expect':(0,128)}
        status,output,error = self._u_invoke_client(*args, **kwargs)
        if status != 0:
            if 'not a tree object' in error:
                return False
            raise base.CommandError(args, status, stderr=error)
        return True

    def _vcs_listdir(self, path, revision):
        """Return the entry names of the tree at `path` in `revision`."""
        arg = '%s:%s' % (revision,path)
        status,output,error = self._u_invoke_client(
            'ls-tree', '--name-only', arg)
        return output.rstrip('\n').splitlines()

    def _vcs_commit(self, commitfile, allow_empty=False):
        """Run ``git commit --file commitfile``; return the new revision.

        Unless `allow_empty` is true, `base.EmptyCommit` is raised when
        git's output reports that there was nothing to commit.  Asserts
        that the abbreviated id of the newest revision appears in git's
        output.
        """
        args = ['commit', '--file', commitfile]
        if allow_empty:
            args.append('--allow-empty')
            status,output,error = self._u_invoke_client(*args)
        else:
            kwargs = {'expect':(0,1)}
            status,output,error = self._u_invoke_client(*args, **kwargs)
            strings = ['nothing to commit',
                       'nothing added to commit']
            if self._u_any_in_string(strings, output):
                raise base.EmptyCommit()
        full_revision = self._vcs_revision_id(-1)
        assert full_revision[:7] in output, \
            'Mismatched revisions:\n%s\n%s' % (full_revision, output)
        return full_revision

    def _vcs_revision_id(self, index):
        """Return the full id of the commit selected by `index`.

        Revisions are listed oldest-first along the first-parent chain
        of HEAD.  Positive indices are 1-based from the oldest commit;
        negative indices count back from HEAD (-1 is HEAD).  Returns
        None for index 0, for out-of-range indices, and when HEAD does
        not exist yet.  Other status-128 failures raise
        `base.CommandError`.
        """
        args = ['rev-list', '--first-parent', '--reverse', 'HEAD']
        kwargs = {'expect':(0,128)}
        status,output,error = self._u_invoke_client(*args, **kwargs)
        if status == 128:
            if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
                return None
            raise base.CommandError(args, status, stderr=error)
        revisions = output.splitlines()
        try:
            if index > 0:
                return revisions[index-1]
            elif index < 0:
                return revisions[index]
            else:
                return None
        except IndexError:
            return None

    def _diff(self, revision):
        """Return the text output of ``git diff <revision>``."""
        status,output,error = self._u_invoke_client('diff', revision)
        return output

    def _parse_diff(self, diff_text):
        """_parse_diff(diff_text) -> (new,modified,removed)

        `new`, `modified`, and `removed` are lists of files.

        Each ``diff ...`` header line is classified by the line that
        follows it ('new ', 'index ', or 'deleted '); headers followed
        by anything else (or by nothing) are ignored.

        Example diff text::

          diff --git a/dir/changed b/dir/changed
          index 6c3ea8c..2f2f7c7 100644
          --- a/dir/changed
          +++ b/dir/changed
          @@ -1,3 +1,3 @@
           hi
          -there
          +everyone and
           joe
          diff --git a/dir/deleted b/dir/deleted
          deleted file mode 100644
          index 225ec04..0000000
          --- a/dir/deleted
          +++ /dev/null
          @@ -1,3 +0,0 @@
          -in
          -the
          -beginning
          diff --git a/dir/moved b/dir/moved
          deleted file mode 100644
          index 5ef102f..0000000
          --- a/dir/moved
          +++ /dev/null
          @@ -1,4 +0,0 @@
          -the
          -ants
          -go
          -marching
          diff --git a/dir/moved2 b/dir/moved2
          new file mode 100644
          index 0000000..5ef102f
          --- /dev/null
          +++ b/dir/moved2
          @@ -0,0 +1,4 @@
          +the
          +ants
          +go
          +marching
          diff --git a/dir/new b/dir/new
          new file mode 100644
          index 0000000..94954ab
          --- /dev/null
          +++ b/dir/new
          @@ -0,0 +1,2 @@
          +hello
          +world
        """
        new = []
        modified = []
        removed = []
        lines = diff_text.splitlines()
        for i,line in enumerate(lines):
            if not line.startswith('diff '):
                continue
            file_a,file_b = line.split()[-2:]
            assert file_a.startswith('a/'), \
                'missformed file_a %s' % file_a
            assert file_b.startswith('b/'), \
                'missformed file_b %s' % file_b
            filename = file_a[2:]
            assert file_b[2:] == filename, \
                'diff file missmatch %s != %s' % (file_a, file_b)
            # A header on the very last line has no follow-up line; treat
            # it as unclassifiable rather than raising IndexError.
            if i + 1 < len(lines):
                next_line = lines[i+1]
            else:
                next_line = ''
            if next_line.startswith('new '):
                new.append(filename)
            elif next_line.startswith('index '):
                modified.append(filename)
            elif next_line.startswith('deleted '):
                removed.append(filename)
        return (new,modified,removed)

    def _vcs_changed(self, revision):
        """Return `(new, modified, removed)` file lists since `revision`."""
        return self._parse_diff(self._diff(revision))


# Only build test scaffolding when the package-wide testing flag is set;
# `doctest` and `sys` are imported under the same condition near the top of
# this file.
if libbe.TESTING == True:
    # Generate the per-backend VCS test cases for both implementations.
    # NOTE(review): presumably the generated TestCase classes are attached
    # to the module passed as the second argument -- confirm in
    # base.make_vcs_testcase_subclasses.
    base.make_vcs_testcase_subclasses(PygitGit, sys.modules[__name__])
    base.make_vcs_testcase_subclasses(ExecGit, sys.modules[__name__])

    # `unitsuite` collects the unittest cases found in this module; `suite`
    # combines them with this module's doctests.
    unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
    suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])