# Copyright (C) 2009-2010 W. Trevor King # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Functions for running external commands in subprocesses. """ from subprocess import Popen, PIPE import sys import libbe from encoding import get_encoding if libbe.TESTING == True: import doctest _MSWINDOWS = sys.platform == 'win32' _POSIX = not _MSWINDOWS if _POSIX == True: import os import select class CommandError(Exception): def __init__(self, command, status, stdout=None, stderr=None): strerror = ['Command failed (%d):\n %s\n' % (status, stderr), 'while executing\n %s' % str(command)] Exception.__init__(self, '\n'.join(strerror)) self.command = command self.status = status self.stdout = stdout self.stderr = stderr def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,), cwd=None, unicode_output=True, verbose=False, encoding=None): """ expect should be a tuple of allowed exit codes. cwd should be the directory from which the command will be executed. When unicode_output == True, convert stdout and stdin strings to unicode before returing them. """ if cwd == None: cwd = '.' if verbose == True: print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args)) try : if _POSIX: q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd) else: assert _MSWINDOWS==True, 'invalid platform' # win32 don't have os.execvp() so have to run command in a shell q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, shell=True, cwd=cwd) except OSError, e: raise CommandError(args, status=e.args[0], stderr=e) stdout,stderr = q.communicate(input=stdin) status = q.wait() if unicode_output == True: if encoding == None: encoding = get_encoding() if stdout != None: stdout = unicode(stdout, encoding) if stderr != None: stderr = unicode(stderr, encoding) if verbose == True: print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr) if status not in expect: raise CommandError(args, status, stdout, stderr) return status, stdout, stderr class Pipe (object): """ Simple interface for executing POSIX-style pipes based on the subprocess module. The only complication is the adaptation of subprocess.Popen._comminucate to listen to the stderrs of all processes involved in the pipe, as well as the terminal process' stdout. There are two implementations of Pipe._communicate, one for MS Windows, and one for POSIX systems. The MS Windows implementation is currently untested. >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']]) >>> p.stdout '/etc/ssh\\n' >>> p.status 1 >>> p.statuses [1, 0] >>> p.stderrs # doctest: +ELLIPSIS [...find: ...: Permission denied..., ''] """ def __init__(self, cmds, stdin=None): # spawn processes self._procs = [] for cmd in cmds: if len(self._procs) != 0: stdin = self._procs[-1].stdout self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE)) self.stdout,self.stderrs = self._communicate(input=None) # collect process statuses self.statuses = [] self.status = 0 for proc in self._procs: self.statuses.append(proc.wait()) if self.statuses[-1] != 0: self.status = self.statuses[-1] # Code excerpted from subprocess.Popen._communicate() if _MSWINDOWS == True: def _communicate(self, input=None): assert input == None, 'stdin != None not yet supported' # listen to each process' stderr threads = [] std_X_arrays = [] for proc in self._procs: stderr_array = [] thread = Thread(target=proc._readerthread, args=(proc.stderr, stderr_array)) thread.setDaemon(True) thread.start() threads.append(thread) std_X_arrays.append(stderr_array) # also listen to the last processes stdout stdout_array = [] thread = Thread(target=proc._readerthread, args=(proc.stdout, stdout_array)) thread.setDaemon(True) thread.start() threads.append(thread) std_X_arrays.append(stdout_array) # join threads as they die for thread in threads: thread.join() # read output from reader threads std_X_strings = [] for std_X_array in std_X_arrays: std_X_strings.append(std_X_array[0]) stdout = std_X_strings.pop(-1) stderrs = std_X_strings return (stdout, stderrs) else: assert _POSIX==True, 'invalid platform' def _communicate(self, input=None): read_set = [] write_set = [] read_arrays = [] stdout = None # Return stderr = None # Return if self._procs[0].stdin: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. self._procs[0].stdin.flush() if input: write_set.append(self._procs[0].stdin) else: self._procs[0].stdin.close() for proc in self._procs: read_set.append(proc.stderr) read_arrays.append([]) read_set.append(self._procs[-1].stdout) read_arrays.append([]) input_offset = 0 while read_set or write_set: try: rlist, wlist, xlist = select.select(read_set, write_set, []) except select.error, e: if e.args[0] == errno.EINTR: continue raise if self._procs[0].stdin in wlist: # When select has indicated that the file is writable, # we can write up to PIPE_BUF bytes without risk # blocking. POSIX defines PIPE_BUF >= 512 chunk = input[input_offset : input_offset + 512] bytes_written = os.write(self.stdin.fileno(), chunk) input_offset += bytes_written if input_offset >= len(input): self._procs[0].stdin.close() write_set.remove(self._procs[0].stdin) if self._procs[-1].stdout in rlist: data = os.read(self._procs[-1].stdout.fileno(), 1024) if data == '': self._procs[-1].stdout.close() read_set.remove(self._procs[-1].stdout) read_arrays[-1].append(data) for i,proc in enumerate(self._procs): if proc.stderr in rlist: data = os.read(proc.stderr.fileno(), 1024) if data == '': proc.stderr.close() read_set.remove(proc.stderr) read_arrays[i].append(data) # All data exchanged. Translate lists into strings. read_strings = [] for read_array in read_arrays: read_strings.append(''.join(read_array)) stdout = read_strings.pop(-1) stderrs = read_strings return (stdout, stderrs) if libbe.TESTING == True: suite = doctest.DocTestSuite()