aboutsummaryrefslogblamecommitdiffstats
path: root/libbe/storage/vcs/monotone.py
blob: 2bb5fd4ec8196ac76d0f5c7da7bbe95cc6ba4a48 (plain) (tree)

















































































































































































































































































































































































                                                                                                                 
# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
#
# This file is part of Bugs Everywhere.
#
# Bugs Everywhere is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 2 of the License, or (at your
# option) any later version.
#
# Bugs Everywhere is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.

"""Monotone_ backend.

.. _Monotone: http://www.monotone.ca/
"""

import os
import os.path
import random
import re
import shutil
import unittest

import libbe
import libbe.ui.util.user
from libbe.util.subproc import CommandError
import base

if libbe.TESTING == True:
    import doctest
    import sys


def new():
    return Monotone()

class Monotone (base.VCS):
    """:class:`base.VCS` implementation for Monotone.
    """
    name='monotone'
    client='mtn'

    def __init__(self, *args, **kwargs):
        base.VCS.__init__(self, *args, **kwargs)
        self.versioned = True
        self._db_path = None
        self._key_dir = None
        self._key = None

    def _vcs_version(self):
        status,output,error = self._u_invoke_client('automate', 'interface_version')
        return output.strip()

    def version_cmp(self, *args):
        """Compare the installed Monotone version `V_i` with another
        version `V_o` (given in `*args`).  Returns

           === ===============
            1  if `V_i > V_o`
            0  if `V_i == V_o`
           -1  if `V_i < V_o`
           === ===============

        Examples
        --------

        >>> m = Monotone(repo='.')
        >>> m._version = '7.1'
        >>> m.version_cmp(7, 1)
        0
        >>> m.version_cmp(7, 2)
        -1
        >>> m.version_cmp(7, 0)
        1
        >>> m.version_cmp(8, 0)
        -1
        """
        if not hasattr(self, '_parsed_version') \
                or self._parsed_version == None:
            self._parsed_version = [int(x) for x in self.version().split('.')]
        for current,other in zip(self._parsed_version, args):
            c = cmp(current,other)
            if c != 0:
                return c
        return 0

    def _require_version_ge(self, *args):
        """Require installed interface version >= `*args`.

        >>> m = Monotone(repo='.')
        >>> m._version = '7.1'
        >>> m._require_version_ge(6, 0)
        >>> m._require_version_ge(7, 1)
        >>> m._require_version_ge(7, 2)
        Traceback (most recent call last):
          ...
        NotImplementedError: Operation not supported for monotone automation interface version 7.1.  Requires 7.2
        """
        if self.version_cmp(*args) < 0:
            raise NotImplementedError(
                'Operation not supported for %s automation interface version'
                ' %s.  Requires %s' % (self.name, self.version(),
                                      '.'.join([str(x) for x in args])))

    def _vcs_get_user_id(self):
        status,output,error = self._u_invoke_client('list', 'keys')
        # output ~=
        # ...
        # [private keys]
        # f7791378b49dfb47a740e9588848b510de58f64f john@doe.com
        if '[private keys]' in output:
            private = False
            for line in output.splitlines():
                line = line.strip()
                if private == True:  # HACK.  Just pick the first key.
                    return line.split(' ', 1)[1]
                if line == '[private keys]':
                    private = True
        return None  # Monotone has no infomation

    def _vcs_detect(self, path):
        if self._u_search_parent_directories(path, '_MTN') != None :
            return True
        return False

    def _vcs_root(self, path):
        """Find the root of the deepest repository containing path."""
        if os.path.isdir(path):
            dirname = os.path.dirname(path)
        else:
            dirname = path
        if self.version_cmp(8, 0) >= 0:
            status,output,error = self._u_invoke_client(
                'automate', 'get_workspace_root', cwd=dirname)
        else:
            mtn_dir = self._u_search_parent_directories(path, '_MTN')
            if mtn_dir == None:
                return None
            return os.path.dirname(mtn_dir)
        return output.strip()

    def _invoke_client(self, *args, **kwargs):
        """Invoke the client on our branch.
        """
        arglist = []
        if self._db_path != None:
            arglist.extend(['--db', self._db_path])
        if self._key != None:
            arglist.extend(['--key', self._key])
        if self._key_dir != None:
            arglist.extend(['--keydir', self._key_dir])
        arglist.extend(args)
        args = tuple(arglist)
        return self._u_invoke_client(*args, **kwargs)

    def _vcs_init(self, path):
        self._require_version_ge(4, 0)
        self._db_path = os.path.abspath(os.path.join(path, 'bugseverywhere.db'))
        self._key_dir = os.path.abspath(os.path.join(path, '_monotone_keys'))
        self._branch_name = 'bugs-everywhere-test'
        self._key = 'bugseverywhere-%d@test.com' % random.randint(0,1e6)
        self._passphrase = ''
        self._u_invoke_client('db', 'init', '--db', self._db_path, cwd=path)
        os.mkdir(self._key_dir)
        self._u_invoke_client('automate',
                              '--db', self._db_path,
                              '--keydir', self._key_dir,
                              'genkey', self._key, self._passphrase)
        self._invoke_client('setup', '--db', self._db_path,
                                '--branch', self._branch_name, cwd=path)

    def _vcs_destroy(self):
        vcs_dir = os.path.join(self.repo, '_MTN')
        for dir in [vcs_dir, self._key_dir]:
            if os.path.exists(dir):
                shutil.rmtree(dir)
        if os.path.exists(self._db_path):
            os.remove(self._db_path)

    def _vcs_add(self, path):
        if os.path.isdir(path):
            return
        self._invoke_client('add', path)

    def _vcs_remove(self, path):
        if not os.path.isdir(self._u_abspath(path)):
            self._invoke_client('rm', path)

    def _vcs_update(self, path):
        pass

    def _vcs_get_file_contents(self, path, revision=None):
        if revision == None:
            return base.VCS._vcs_get_file_contents(self, path, revision)
        else:
            self._require_version_ge(4, 0)
            status,output,error = self._invoke_client(
                'automate', 'get_file_of', path, '--revision', revision)
            return output

    def _dirs_and_files(self, revision):
        self._require_version_ge(2, 0)
        status,output,error = self._invoke_client(
            'automate', 'get_manifest_of', revision)
        dirs = []
        files = []
        children_by_dir = {}
        for line in output.splitlines():
            fields = line.strip().split(' ', 1)
            if len(fields) != 2 or len(fields[1]) < 2:
                continue
            value = fields[1][1:-1]  # [1:-1] for '"XYZ"' -> 'XYZ'
            if value == '':
                value = '.'
            if fields[0] == 'dir':
                dirs.append(value)
                children_by_dir[value] = []
            elif fields[0] == 'file':
                files.append(value)
        for child in (dirs+files):
            if child == '.':
                continue
            parent = '.'
            for p in dirs:
                # Does Monotone use native path separators?
                start = p+os.path.sep
                if p != child and child.startswith(start):
                    rel = child[len(start):]
                    if rel.count(os.path.sep) == 0:
                        parent = p
                        break
            children_by_dir[parent].append(child)
        return (dirs, files, children_by_dir)

    def _vcs_path(self, id, revision):
        dirs,files,children_by_dir = self._dirs_and_files(revision)
        return self._u_find_id_from_manifest(id, dirs+files, revision=revision)

    def _vcs_isdir(self, path, revision):
        dirs,files,children_by_dir = self._dirs_and_files(revision)
        return path in dirs

    def _vcs_listdir(self, path, revision):
        dirs,files,children_by_dir = self._dirs_and_files(revision)
        children = [self._u_rel_path(c, path) for c in children_by_dir[path]]
        return children

    def _vcs_commit(self, commitfile, allow_empty=False):
        args = ['commit', '--key', self._key, '--message-file', commitfile]
        kwargs = {'expect': (0,1)}
        status,output,error = self._invoke_client(*args, **kwargs)
        strings = ['no changes to commit']
        current_rev = self._current_revision()
        if status == 1:
            if self._u_any_in_string(strings, error) == True:
                if allow_empty == False:
                    raise base.EmptyCommit()
                # note that Monotone does _not_ make an empty revision.
                # this returns the last non-empty revision id...
            else:
                raise CommandError(
                    [self.client] + args, status, output, error)
        else:  # successful commit
            assert current_rev in error, \
                'Mismatched revisions:\n%s\n%s' % (current_rev, error)
        return current_rev

    def _current_revision(self):
        self._require_version_ge(2, 0)
        status,output,error = self._invoke_client(
            'automate', 'get_base_revision_id')  # since 2.0
        return output.strip()

    def _vcs_revision_id(self, index):
        current_rev = self._current_revision()
        status,output,error = self._invoke_client(
            'automate', 'ancestors', current_rev)  # since 0.2, but output is alphebetized
        revs = output.splitlines() + [current_rev]
        status,output,error = self._invoke_client(
            'automate', 'toposort', *revs)
        revisions = output.splitlines()
        try:
            if index > 0:
                return revisions[index-1]
            elif index < 0:
                return revisions[index]
            else:
                return None
        except IndexError:
            return None

    def _diff(self, revision):
        status,output,error = self._invoke_client('-r', revision, 'diff')
        return output

    def _parse_diff(self, diff_text):
        """_parse_diff(diff_text) -> (new,modified,removed)

        `new`, `modified`, and `removed` are lists of files.

        Example diff text::

          #
          # old_revision [1ce9ac2cfe3166b8ad23a60555f8a70f37686c25]
          #
          # delete ".be/dir/bugs/moved"
          # 
          # delete ".be/dir/bugs/removed"
          # 
          # add_file ".be/dir/bugs/moved2"
          #  content [33e4510df9abef16dad7c65c0775e74602cc5005]
          # 
          # add_file ".be/dir/bugs/new"
          #  content [45c45b5630f7446f83b0e14ee1525e449a06131c]
          # 
          # patch ".be/dir/bugs/modified"
          #  from [809bf3b80423c361849386008a0ce01199d30929]
          #    to [f13d3ec08972e2b41afecd9a90d4bc71cdcea338]
          #
          ============================================================
          --- .be/dir/bugs/moved2 33e4510df9abef16dad7c65c0775e74602cc5005
          +++ .be/dir/bugs/moved2 33e4510df9abef16dad7c65c0775e74602cc5005
          @@ -0,0 +1 @@
          +this entry will be moved
          \ No newline at end of file
          ============================================================
          --- .be/dir/bugs/new    45c45b5630f7446f83b0e14ee1525e449a06131c
          +++ .be/dir/bugs/new    45c45b5630f7446f83b0e14ee1525e449a06131c
          @@ -0,0 +1 @@
          +this entry is new
          \ No newline at end of file
          ============================================================
          --- .be/dir/bugs/modified       809bf3b80423c361849386008a0ce01199d30929
          +++ .be/dir/bugs/modified       f13d3ec08972e2b41afecd9a90d4bc71cdcea338
          @@ -1 +1 @@
          -some value to be modified
          \ No newline at end of file
          +a new value
          \ No newline at end of file
        """
        new = []
        modified = []
        removed = []
        lines = diff_text.splitlines()
        for i,line in enumerate(lines):
            if line.startswith('# add_file "'):
                new.append(line[len('# add_file "'):-1])
            elif line.startswith('# patch "'):
                modified.append(line[len('# patch "'):-1])
            elif line.startswith('# delete "'):
                removed.append(line[len('# delete "'):-1])
            elif not line.startswith('#'):
                break
        return (new,modified,removed)

    def _vcs_changed(self, revision):
        return self._parse_diff(self._diff(revision))


if libbe.TESTING == True:
    base.make_vcs_testcase_subclasses(Monotone, sys.modules[__name__])

    unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
    suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])