aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
Diffstat (limited to 'libbe')
-rw-r--r--libbe/bzr.py2
-rw-r--r--libbe/git.py2
-rw-r--r--libbe/subproc.py216
-rw-r--r--libbe/vcs.py50
4 files changed, 227 insertions, 43 deletions
diff --git a/libbe/bzr.py b/libbe/bzr.py
index 2cf1cba..281493d 100644
--- a/libbe/bzr.py
+++ b/libbe/bzr.py
@@ -90,7 +90,7 @@ class Bzr(vcs.VCS):
if self._u_any_in_string(strings, error) == True:
raise vcs.EmptyCommit()
else:
- raise vcs.CommandError(args, status, stdout="", stderr=error)
+ raise vcs.CommandError(args, status, stderr=error)
revision = None
revline = re.compile("Committed revision (.*)[.]")
match = revline.search(error)
diff --git a/libbe/git.py b/libbe/git.py
index cb4436a..55556de 100644
--- a/libbe/git.py
+++ b/libbe/git.py
@@ -134,7 +134,7 @@ class Git(vcs.VCS):
if status == 128:
if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
return None
- raise vcs.CommandError(args, status, stdout="", stderr=error)
+ raise vcs.CommandError(args, status, stderr=error)
commits = output.splitlines()
try:
return commits[index]
diff --git a/libbe/subproc.py b/libbe/subproc.py
new file mode 100644
index 0000000..13afcf8
--- /dev/null
+++ b/libbe/subproc.py
@@ -0,0 +1,216 @@
+# Copyright (C) 2009 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Functions for running external commands in subprocesses.
+"""
+
+from subprocess import Popen, PIPE
+import sys
+import doctest
+
+from encoding import get_encoding
+
+_MSWINDOWS = sys.platform == 'win32'
+_POSIX = not _MSWINDOWS
+
+class CommandError(Exception):
+ def __init__(self, command, status, stdout=None, stderr=None):
+ strerror = ['Command failed (%d):\n %s\n' % (status, stderr),
+ 'while executing\n %s' % command]
+ Exception.__init__(self, '\n'.join(strerror))
+ self.command = command
+ self.status = status
+ self.stdout = stdout
+ self.stderr = stderr
+
+def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
+ cwd=None, unicode_output=True, verbose=False, encoding=None):
+ """
+ expect should be a tuple of allowed exit codes. cwd should be
+ the directory from which the command will be executed. When
+ unicode_output == True, convert stdout and stdin strings to
+ unicode before returing them.
+ """
+ if cwd == None:
+ cwd = '.'
+ if verbose == True:
+ print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args))
+ try :
+ if _POSIX:
+ q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd)
+ else:
+ assert _MSWINDOWS==True, 'invalid platform'
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
+ shell=True, cwd=cwd)
+ except OSError, e:
+ raise CommandError(args, status=e.args[0], stderr=e)
+ stdout,stderr = q.communicate(input=stdin)
+ status = q.wait()
+ if unicode_output == True:
+ if encoding == None:
+ encoding = get_encoding()
+ if stdout != None:
+ stdout = unicode(stdout, encoding)
+ if stderr != None:
+ stderr = unicode(stderr, encoding)
+ if verbose == True:
+ print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr)
+ if status not in expect:
+ raise CommandError(args, status, stdout, stderr)
+ return status, stdout, stderr
+
+class Pipe (object):
+ """
+ Simple interface for executing POSIX-style pipes based on the
+ subprocess module. The only complication is the adaptation of
+ subprocess.Popen._comminucate to listen to the stderrs of all
+ processes involved in the pipe, as well as the terminal process'
+ stdout. There are two implementations of Pipe._communicate, one
+ for MS Windows, and one for POSIX systems. The MS Windows
+ implementation is currently untested.
+
+ >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
+ >>> p.stdout
+ '/etc/ssh\\n'
+ >>> p.status
+ 1
+ >>> p.statuses
+ [1, 0]
+ >>> p.stderrs # doctest: +ELLIPSIS
+ ["find: `...': Permission denied\\n...", '']
+ """
+ def __init__(self, cmds, stdin=None):
+ # spawn processes
+ self._procs = []
+ for cmd in cmds:
+ if len(self._procs) != 0:
+ stdin = self._procs[-1].stdout
+ self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
+
+ self.stdout,self.stderrs = self._communicate(input=None)
+
+ # collect process statuses
+ self.statuses = []
+ self.status = 0
+ for proc in self._procs:
+ self.statuses.append(proc.wait())
+ if self.statuses[-1] != 0:
+ self.status = self.statuses[-1]
+
+ # Code excerpted from subprocess.Popen._communicate()
+ if _MSWINDOWS == True:
+ def _communicate(self, input=None):
+ assert input == None, 'stdin != None not yet supported'
+ # listen to each process' stderr
+ threads = []
+ std_X_arrays = []
+ for proc in self._procs:
+ stderr_array = []
+ thread = Thread(target=proc._readerthread,
+ args=(proc.stderr, stderr_array))
+ thread.setDaemon(True)
+ thread.start()
+ threads.append(thread)
+ std_X_arrays.append(stderr_array)
+
+ # also listen to the last processes stdout
+ stdout_array = []
+ thread = Thread(target=proc._readerthread,
+ args=(proc.stdout, stdout_array))
+ thread.setDaemon(True)
+ thread.start()
+ threads.append(thread)
+ std_X_arrays.append(stdout_array)
+
+ # join threads as they die
+ for thread in threads:
+ thread.join()
+
+ # read output from reader threads
+ std_X_strings = []
+ for std_X_array in std_X_arrays:
+ std_X_strings.append(std_X_array[0])
+
+ stdout = std_X_strings.pop(-1)
+ stderrs = std_X_strings
+ return (stdout, stderrs)
+ else:
+ assert _POSIX==True, 'invalid platform'
+ def _communicate(self, input=None):
+ read_set = []
+ write_set = []
+ read_arrays = []
+ stdout = None # Return
+ stderr = None # Return
+
+ if self._procs[0].stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ self._procs[0].stdin.flush()
+ if input:
+ write_set.append(self._procs[0].stdin)
+ else:
+ self._procs[0].stdin.close()
+ for proc in self._procs:
+ read_set.append(proc.stderr)
+ read_arrays.append([])
+ read_set.append(self._procs[-1].stdout)
+ read_arrays.append([])
+
+ input_offset = 0
+ while read_set or write_set:
+ try:
+ rlist, wlist, xlist = select.select(read_set, write_set, [])
+ except select.error, e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+ if self._procs[0].stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ chunk = input[input_offset : input_offset + 512]
+ bytes_written = os.write(self.stdin.fileno(), chunk)
+ input_offset += bytes_written
+ if input_offset >= len(input):
+ self._procs[0].stdin.close()
+ write_set.remove(self._procs[0].stdin)
+ if self._procs[-1].stdout in rlist:
+ data = os.read(self._procs[-1].stdout.fileno(), 1024)
+ if data == '':
+ self._procs[-1].stdout.close()
+ read_set.remove(self._procs[-1].stdout)
+ read_arrays[-1].append(data)
+ for i,proc in enumerate(self._procs):
+ if proc.stderr in rlist:
+ data = os.read(proc.stderr.fileno(), 1024)
+ if data == '':
+ proc.stderr.close()
+ read_set.remove(proc.stderr)
+ read_arrays[i].append(data)
+
+ # All data exchanged. Translate lists into strings.
+ read_strings = []
+ for read_array in read_arrays:
+ read_strings.append(''.join(read_array))
+
+ stdout = read_strings.pop(-1)
+ stderrs = read_strings
+ return (stdout, stderrs)
+
+suite = doctest.DocTestSuite()
diff --git a/libbe/vcs.py b/libbe/vcs.py
index 1ac5dd9..be28846 100644
--- a/libbe/vcs.py
+++ b/libbe/vcs.py
@@ -25,7 +25,6 @@ subclassed by other Version Control System backends. The base class
implements a "do not version" VCS.
"""
-from subprocess import Popen, PIPE
import codecs
import os
import os.path
@@ -38,6 +37,7 @@ import unittest
import doctest
from utility import Dir, search_parent_directories
+from subproc import CommandError, invoke
def _get_matching_vcs(matchfn):
@@ -67,15 +67,6 @@ def installed_vcs():
return _get_matching_vcs(lambda vcs: vcs.installed())
-class CommandError(Exception):
- def __init__(self, command, status, stdout, stderr):
- strerror = ["Command failed (%d):\n %s\n" % (status, stderr),
- "while executing\n %s" % command]
- Exception.__init__(self, "\n".join(strerror))
- self.command = command
- self.status = status
- self.stdout = stdout
- self.stderr = stderr
class SettingIDnotSupported(NotImplementedError):
pass
@@ -457,37 +448,14 @@ class VCS(object):
if list_string in string:
return True
return False
- def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None,
- unicode_output=True):
- """
- expect should be a tuple of allowed exit codes. cwd should be
- the directory from which the command will be executed. When
- unicode_output == True, convert stdout and stdin strings to
- unicode before returing them.
- """
- if cwd == None:
- cwd = self.rootdir
- if self.verboseInvoke == True:
- print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args))
- try :
- if sys.platform != "win32":
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
- else:
- # win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
- shell=True, cwd=cwd)
- except OSError, e :
- raise CommandError(args, status=e.args[0], stdout="", stderr=e)
- stdout,stderr = q.communicate(input=stdin)
- status = q.wait()
- if unicode_output == True:
- stdout = unicode(stdout, self.encoding)
- stderr = unicode(stderr, self.encoding)
- if self.verboseInvoke == True:
- print >> sys.stderr, "%d\n%s%s" % (status, stdout, stderr)
- if status not in expect:
- raise CommandError(args, status, stdout, stderr)
- return status, stdout, stderr
+ def _u_invoke(self, *args, **kwargs):
+ if 'cwd' not in kwargs:
+ kwargs['cwd'] = self.rootdir
+ if 'verbose' not in kwargs:
+ kwargs['verbose'] = self.verboseInvoke
+ if 'encoding' not in kwargs:
+ kwargs['encoding'] = self.encoding
+ return invoke(*args, **kwargs)
def _u_invoke_client(self, *args, **kwargs):
cl_args = [self.client]
cl_args.extend(args)