# Copyright (C) 2008-2012 Ben Finney
#                         Chris Ball
#                         Gianluca Montecchi
#                         Robert Lehmann
#                         W. Trevor King
#
# This file is part of Bugs Everywhere.
#
# Bugs Everywhere is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option) any
# later version.
#
# Bugs Everywhere is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.

"""Git_ backend.

Two implementations are provided: :py:class:`PygitGit`, which talks to
the repository through :py:mod:`pygit2`, and :py:class:`ExecGit`, which
shells out to the ``git`` command-line client.  Use :py:func:`new` to
get whichever one is usable.

.. _Git: http://git-scm.com/
"""

import os
import os.path
import re
import shutil
import unittest

try:
    import pygit2 as _pygit2
except ImportError as error:
    _pygit2 = None
    _pygit2_import_error = error
else:
    # A pygit2 without `__version__` is treated the same as 0.17.3,
    # which is rejected.
    if getattr(_pygit2, '__version__', '0.17.3') == '0.17.3':
        _pygit2 = None
        _pygit2_import_error = NotImplementedError(
            'pygit2 <= 0.17.3 not supported')

import libbe
from ...ui.util import user as _user
from ...util import encoding as _encoding
from ..base import EmptyCommit as _EmptyCommit
from . import base

if libbe.TESTING == True:
    import doctest
    import sys


def new():
    """Return a Git VCS instance.

    A :py:class:`PygitGit` is returned when a supported
    :py:mod:`pygit2` was imported, otherwise an :py:class:`ExecGit`.
    """
    if _pygit2:
        return PygitGit()
    else:
        return ExecGit()


class PygitGit(base.VCS):
    """:py:class:`base.VCS` implementation for Git.

    Using :py:mod:`pygit2` for the Git activity.
    """
    name = 'pygit2'
    # Hex id that pygit2 reports for a missing blob in a diff hunk.
    _null_hex = '0' * 40

    def __init__(self, *args, **kwargs):
        base.VCS.__init__(self, *args, **kwargs)
        self.versioned = True
        # Set by `_vcs_root` / `_vcs_init` once a repository is known.
        self._pygit_repository = None

    def __getstate__(self):
        """Return picklable state.

        `pygit2.Repository` objects don't seem to pickle well, so the
        repository is replaced by its path.
        """
        attrs = dict(self.__dict__)
        if self._pygit_repository is not None:
            attrs['_pygit_repository'] = self._pygit_repository.path
        return attrs

    def __setstate__(self, state):
        """Restore pickled state.

        `pygit2.Repository` objects don't seem to pickle well, so the
        repository is rebuilt from the path stored by `__getstate__`.
        """
        self.__dict__.update(state)
        if self._pygit_repository is not None:
            # At this point the attribute holds the pickled path.
            gitdir = self._pygit_repository
            self._pygit_repository = _pygit2.Repository(gitdir)

    def _vcs_version(self):
        """Return the pygit2 version string.

        Returns '?' if pygit2 has no `__version__`, and None if pygit2
        is not available.
        """
        if _pygit2:
            # Fixed: the attribute name was misspelled '__verison__',
            # so the lookup always fell back to '?'.
            return getattr(_pygit2, '__version__', '?')
        return None

    def _vcs_get_user_id(self):
        """Return a user id built from Git's `user.name`/`user.email`.

        Returns None if neither setting is configured.  If only one is
        configured, the other is filled in from the `_user` fallbacks.

        Raises ValueError if the resulting email contains no '@'.
        """
        try:
            name = self._pygit_repository.config['user.name']
        except KeyError:
            name = ''
        try:
            email = self._pygit_repository.config['user.email']
        except KeyError:
            email = ''
        if name != '' or email != '':  # got something!
            # guess missing info, if necessary
            if name == '':
                name = _user.get_fallback_fullname()
            if email == '':
                email = _user.get_fallback_email()
            if '@' not in email:
                raise ValueError((name, email))
            return _user.create_user_id(name, email)
        return None  # Git has no information

    def _vcs_detect(self, path):
        """Return True if pygit2 discovers a repository from `path`."""
        try:
            _pygit2.discover_repository(path)
        except KeyError:
            return False
        return True

    def _vcs_root(self, path):
        """Find the root of the deepest repository containing path.

        As a side effect, opens the repository and stores it on
        `self._pygit_repository`.
        """
        # Assume that nothing funny is going on; in particular, that we aren't
        # dealing with a bare repo.
        gitdir = _pygit2.discover_repository(path)
        self._pygit_repository = _pygit2.Repository(gitdir)
        dirname, tip = os.path.split(gitdir)
        if tip == '':  # split('x/y/z/.git/') == ('x/y/z/.git', '')
            dirname, tip = os.path.split(dirname)
        assert tip == '.git', tip
        return dirname

    def _vcs_init(self, path):
        """Create a new non-bare repository at `path`."""
        bare = False
        self._pygit_repository = _pygit2.init_repository(path, bare)

    def _vcs_destroy(self):
        """Remove the `.git` directory under `self.repo`, if present."""
        vcs_dir = os.path.join(self.repo, '.git')
        if os.path.exists(vcs_dir):
            shutil.rmtree(vcs_dir)

    def _vcs_add(self, path):
        """Add the file at `path` to the index.  Directories are skipped."""
        abspath = self._u_abspath(path)
        if os.path.isdir(abspath):
            return
        self._pygit_repository.index.read()
        self._pygit_repository.index.add(path)
        self._pygit_repository.index.write()

    def _vcs_remove(self, path):
        """Remove the file at `path` from the index and from disk.

        Directories are skipped.
        """
        abspath = self._u_abspath(path)
        if not os.path.isdir(self._u_abspath(abspath)):
            self._pygit_repository.index.read()
            del self._pygit_repository.index[path]
            self._pygit_repository.index.write()
            os.remove(os.path.join(self.repo, path))

    def _vcs_update(self, path):
        """Record changes to `path`; identical to adding it."""
        self._vcs_add(path)

    def _git_get_commit(self, revision):
        """Return the commit object that `revision` resolves to.

        `revision` may be text or ASCII-encoded bytes.
        """
        # Fixed: this used to call `str(revision, 'ascii')` on a `str`,
        # which raises TypeError on Python 3.  Only bytes need decoding.
        if isinstance(revision, bytes):
            revision = revision.decode('ascii')
        commit = self._pygit_repository.revparse_single(revision)
        assert commit.type == _pygit2.GIT_OBJ_COMMIT, commit
        return commit

    def _git_get_object(self, path, revision):
        """Return the Git object at `path` in the tree of `revision`.

        Returns None if the final path component is not found.

        Raises ValueError if an intermediate path component is missing
        or is not a tree.
        """
        commit = self._git_get_commit(revision=revision)
        tree = commit.tree
        sections = path.split(os.path.sep)
        for section in sections[:-1]:  # traverse trees
            child_tree = None
            for entry in tree:
                if entry.name == section:
                    eobj = entry.to_object()
                    if eobj.type == _pygit2.GIT_OBJ_TREE:
                        child_tree = eobj
                        break
                    else:
                        raise ValueError(path)  # not a directory
            if child_tree is None:
                raise ValueError(
                    (path, sections, section, [e.name for e in tree]))
            tree = child_tree
        eobj = None
        for entry in tree:
            if entry.name == sections[-1]:
                eobj = entry.to_object()
        return eobj

    def _vcs_get_file_contents(self, path, revision=None):
        """Return the contents of `path` at `revision`.

        With `revision=None` this defers to the `base.VCS`
        implementation.  Otherwise the raw blob data is returned.

        Raises ValueError if `path` is not a blob in that revision.
        """
        if revision is None:
            return base.VCS._vcs_get_file_contents(self, path, revision)
        else:
            blob = self._git_get_object(path=path, revision=revision)
            if blob.type != _pygit2.GIT_OBJ_BLOB:
                raise ValueError(path)  # not a file
            return blob.read_raw()

    def _vcs_path(self, id, revision):
        """Return the path for `id` at `revision` via `_u_find_id`."""
        return self._u_find_id(id, revision)

    def _vcs_isdir(self, path, revision):
        """Return True if `path` is a tree in `revision`."""
        obj = self._git_get_object(path=path, revision=revision)
        return obj.type == _pygit2.GIT_OBJ_TREE

    def _vcs_listdir(self, path, revision):
        """Return the entry names of the tree at `path` in `revision`."""
        tree = self._git_get_object(path=path, revision=revision)
        assert tree.type == _pygit2.GIT_OBJ_TREE, tree
        return [e.name for e in tree]

    def _vcs_commit(self, commitfile, allow_empty=False):
        """Commit the current index and return the new commit's hex id.

        The commit message is read from `commitfile`.

        Raises `EmptyCommit` when `allow_empty` is false and the commit
        would not change anything.
        """
        self._pygit_repository.index.read()
        tree_oid = self._pygit_repository.index.write_tree()
        try:
            self._pygit_repository.head
        except _pygit2.GitError:  # no head; this is the first commit
            parents = []
            tree = self._pygit_repository[tree_oid]
            if not allow_empty and len(tree) == 0:
                raise _EmptyCommit()
        else:
            parents = [self._pygit_repository.head.oid]
            if (not allow_empty and
                    tree_oid == self._pygit_repository.head.tree.oid):
                raise _EmptyCommit()
        update_ref = 'HEAD'
        user_id = self.get_user_id()
        name, email = _user.parse_user_id(user_id)
        # using default times is recent, see
        #   https://github.com/libgit2/pygit2/pull/129
        author = _pygit2.Signature(name, email)
        committer = author
        message = _encoding.get_file_contents(commitfile, decode=False)
        encoding = _encoding.get_text_file_encoding()
        commit_oid = self._pygit_repository.create_commit(
            update_ref, author, committer, message, tree_oid, parents,
            encoding)
        commit = self._pygit_repository[commit_oid]
        return commit.hex

    def _vcs_revision_id(self, index):
        """Return the hex id of the commit selected by `index`.

        Positive indices count from the oldest commit (1 is the first
        commit); negative indices count back from the newest (-1 is
        the latest).  Returns None if `index` is out of range.

        Raises NotImplementedError for `index == 0`.
        """
        walker = self._pygit_repository.walk(
            self._pygit_repository.head.oid, _pygit2.GIT_SORT_TIME)
        if index < 0:
            target_i = -1 - index  # -1: 0, -2: 1, ...
            for i, commit in enumerate(walker):
                if i == target_i:
                    return commit.hex
        elif index > 0:
            revisions = [commit.hex for commit in walker]
            # revisions is [newest, older, ..., oldest]
            if index > len(revisions):
                return None
            return revisions[len(revisions) - index]
        else:
            raise NotImplementedError('initial revision')
        return None

    def _vcs_changed(self, revision):
        """Return `(new, modified, removed)` file lists.

        The lists come from diffing the tree of `revision` against the
        tree of the current head.
        """
        commit = self._git_get_commit(revision=revision)
        diff = commit.tree.diff(self._pygit_repository.head.tree)
        new = set()
        modified = set()
        removed = set()
        for hunk in diff.changes['hunks']:
            if hunk.old_oid == self._null_hex:  # pygit2 uses hex in hunk.*_oid
                new.add(hunk.new_file)
            elif hunk.new_oid == self._null_hex:
                removed.add(hunk.old_file)
            else:
                modified.add(hunk.new_file)
        return (list(new), list(modified), list(removed))


class ExecGit (PygitGit):
    """:py:class:`base.VCS` implementation for Git.

    Using the ``git`` command-line client (via `_u_invoke_client`)
    for the Git activity.
    """
    name = 'git'
    client = 'git'

    def _vcs_version(self):
        """Return the stripped output of `git --version`.

        Returns None if invoking the client raises `CommandError`.
        """
        try:
            status, output, error = self._u_invoke_client('--version')
        except base.CommandError:  # command not found?
            return None
        return output.strip()

    def _vcs_get_user_id(self):
        """Return a user id built from `git config` name and email.

        Returns None if neither setting is configured.  If only one is
        configured, the other is filled in from the `_user` fallbacks.
        """
        status, output, error = self._u_invoke_client(
            'config', 'user.name', expect=(0, 1))
        if status == 0:
            name = output.rstrip('\n')
        else:
            name = ''
        status, output, error = self._u_invoke_client(
            'config', 'user.email', expect=(0, 1))
        if status == 0:
            email = output.rstrip('\n')
        else:
            email = ''
        if name != '' or email != '':  # got something!
            # guess missing info, if necessary
            if name == '':
                name = _user.get_fallback_fullname()
            if email == '':
                email = _user.get_fallback_email()
            return _user.create_user_id(name, email)
        return None  # Git has no information

    def _vcs_detect(self, path):
        """Return True if a `.git` is found at or above `path`."""
        if self._u_search_parent_directories(path, '.git') is not None:
            return True
        return False

    def _vcs_root(self, path):
        """Find the root of the deepest repository containing path."""
        # Assume that nothing funny is going on; in particular, that we aren't
        # dealing with a bare repo.
        if os.path.isdir(path) != True:
            path = os.path.dirname(path)
        status, output, error = self._u_invoke_client(
            'rev-parse', '--git-dir', cwd=path)
        gitdir = os.path.join(path, output.rstrip('\n'))
        dirname = os.path.abspath(os.path.dirname(gitdir))
        return dirname

    def _vcs_init(self, path):
        """Run `git init` in `path`."""
        self._u_invoke_client('init', cwd=path)

    def _vcs_add(self, path):
        """Run `git add` on `path`.  Directories are skipped."""
        if os.path.isdir(path):
            return
        self._u_invoke_client('add', path)

    def _vcs_remove(self, path):
        """Run `git rm -f` on `path`.  Directories are skipped."""
        if not os.path.isdir(self._u_abspath(path)):
            self._u_invoke_client('rm', '-f', path)

    def _vcs_get_file_contents(self, path, revision=None):
        """Return the contents of `path` at `revision`.

        With `revision=None` this defers to the `base.VCS`
        implementation.  Otherwise the output of
        `git show <revision>:<path>` is returned.
        """
        if revision is None:
            return base.VCS._vcs_get_file_contents(self, path, revision)
        else:
            arg = '%s:%s' % (revision, path)
            status, output, error = self._u_invoke_client('show', arg)
            return output

    def _vcs_isdir(self, path, revision):
        """Return True if `git ls-tree` accepts `path` as a tree.

        Returns False when Git reports 'not a tree object'.

        Raises `CommandError` for any other failure.
        """
        arg = '%s:%s' % (revision, path)
        args = ['ls-tree', arg]
        kwargs = {'expect': (0, 128)}
        status, output, error = self._u_invoke_client(*args, **kwargs)
        if status != 0:
            if 'not a tree object' in error:
                return False
            raise base.CommandError(args, status, stderr=error)
        return True

    def _vcs_listdir(self, path, revision):
        """Return the names `git ls-tree --name-only` lists for `path`."""
        arg = '%s:%s' % (revision, path)
        status, output, error = self._u_invoke_client(
            'ls-tree', '--name-only', arg)
        return output.rstrip('\n').splitlines()

    def _vcs_commit(self, commitfile, allow_empty=False):
        """Run `git commit --file commitfile`; return the new revision id.

        Raises `EmptyCommit` when `allow_empty` is false and Git
        reports that there is nothing to commit.
        """
        args = ['commit', '--file', commitfile]
        if allow_empty == True:
            args.append('--allow-empty')
            status, output, error = self._u_invoke_client(*args)
        else:
            # Exit status 1 is tolerated so the output can be inspected
            # for the "nothing to commit" messages below.
            kwargs = {'expect': (0, 1)}
            status, output, error = self._u_invoke_client(*args, **kwargs)
            strings = ['nothing to commit',
                       'nothing added to commit']
            if self._u_any_in_string(strings, output) == True:
                raise base.EmptyCommit()
        full_revision = self._vcs_revision_id(-1)
        assert full_revision[:7] in output, \
            'Mismatched revisions:\n%s\n%s' % (full_revision, output)
        return full_revision

    def _vcs_revision_id(self, index):
        """Return the revision id selected by `index`.

        Revisions come from `git rev-list --first-parent --reverse HEAD`
        (oldest first).  Positive indices are 1-based from the oldest;
        negative indices count back from the newest.  Returns None for
        index 0, for an out-of-range index, or when HEAD is unknown.

        Raises `CommandError` for any other exit-status-128 failure.
        """
        args = ['rev-list', '--first-parent', '--reverse', 'HEAD']
        kwargs = {'expect': (0, 128)}
        status, output, error = self._u_invoke_client(*args, **kwargs)
        if status == 128:
            if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
                return None
            raise base.CommandError(args, status, stderr=error)
        revisions = output.splitlines()
        try:
            if index > 0:
                return revisions[index-1]
            elif index < 0:
                return revisions[index]
            else:
                return None
        except IndexError:
            return None

    def _diff(self, revision):
        """Return the output of `git diff <revision>`."""
        status, output, error = self._u_invoke_client('diff', revision)
        return output

    def _parse_diff(self, diff_text):
        """_parse_diff(diff_text) -> (new,modified,removed)

        `new`, `modified`, and `removed` are lists of files.

        Each file is classified by the line that follows its ``diff``
        header: ``new ...``, ``index ...`` or ``deleted ...``.

        Example diff text::

          diff --git a/dir/changed b/dir/changed
          index 6c3ea8c..2f2f7c7 100644
          --- a/dir/changed
          +++ b/dir/changed
          @@ -1,3 +1,3 @@
           hi
          -there
          +everyone
           and
           joe
          diff --git a/dir/deleted b/dir/deleted
          deleted file mode 100644
          index 225ec04..0000000
          --- a/dir/deleted
          +++ /dev/null
          @@ -1,3 +0,0 @@
          -in
          -the
          -beginning
          diff --git a/dir/moved b/dir/moved
          deleted file mode 100644
          index 5ef102f..0000000
          --- a/dir/moved
          +++ /dev/null
          @@ -1,4 +0,0 @@
          -the
          -ants
          -go
          -marching
          diff --git a/dir/moved2 b/dir/moved2
          new file mode 100644
          index 0000000..5ef102f
          --- /dev/null
          +++ b/dir/moved2
          @@ -0,0 +1,4 @@
          +the
          +ants
          +go
          +marching
          diff --git a/dir/new b/dir/new
          new file mode 100644
          index 0000000..94954ab
          --- /dev/null
          +++ b/dir/new
          @@ -0,0 +1,2 @@
          +hello
          +world
        """
        new = []
        modified = []
        removed = []
        lines = diff_text.splitlines()
        for i, line in enumerate(lines):
            if not line.startswith('diff '):
                continue
            file_a, file_b = line.split()[-2:]
            assert file_a.startswith('a/'), \
                'missformed file_a %s' % file_a
            assert file_b.startswith('b/'), \
                'missformed file_b %s' % file_b
            file = file_a[2:]
            assert file_b[2:] == file, \
                'diff file missmatch %s != %s' % (file_a, file_b)
            if lines[i+1].startswith('new '):
                new.append(file)
            elif lines[i+1].startswith('index '):
                modified.append(file)
            elif lines[i+1].startswith('deleted '):
                removed.append(file)
        return (new, modified, removed)

    def _vcs_changed(self, revision):
        """Return `(new, modified, removed)` parsed from `git diff`."""
        return self._parse_diff(self._diff(revision))


if libbe.TESTING == True:
    base.make_vcs_testcase_subclasses(PygitGit, sys.modules[__name__])
    base.make_vcs_testcase_subclasses(ExecGit, sys.modules[__name__])

    unitsuite = unittest.TestLoader().loadTestsFromModule(
        sys.modules[__name__])
    suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])