From f1ae4b7f3fea6fb78a54ee5073aee648a3d73179 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Sat, 25 Feb 2012 11:16:01 -0500 Subject: Remove util.subproc.Pipe, as we no longer use it in BE. This was at one point used by `update_copyright.py`. Now that that is an external package (and doesn't use `Pipe` anymore either), we can safely remove this code. As a side benifit, the Pipe doctests will no longer be there to fail on OS X and other systems sufficiently different from my development box. --- libbe/util/subproc.py | 139 -------------------------------------------------- 1 file changed, 139 deletions(-) diff --git a/libbe/util/subproc.py b/libbe/util/subproc.py index 3a66f49..0bda520 100644 --- a/libbe/util/subproc.py +++ b/libbe/util/subproc.py @@ -95,144 +95,5 @@ def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,), raise CommandError(list_args, status, stdout, stderr) return status, stdout, stderr -class Pipe (object): - """Simple interface for executing POSIX-style pipes. - - Based on the `subprocess` module. The only complication is the - adaptation of `subprocess.Popen._communicate` to listen to the - stderrs of all processes involved in the pipe, as well as the - terminal process' stdout. There are two implementations of - `Pipe._communicate`, one for MS Windows, and one for POSIX - systems. The MS Windows implementation is currently untested. - - >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']]) - >>> p.stdout - '/etc/ssh\\n' - >>> p.status - 1 - >>> p.statuses - [1, 0] - >>> p.stderrs # doctest: +ELLIPSIS - [...find: ...: Permission denied..., ''] - """ - def __init__(self, cmds, stdin=None): - # spawn processes - self._procs = [] - for cmd in cmds: - if len(self._procs) != 0: - stdin = self._procs[-1].stdout - self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE)) - - self.stdout,self.stderrs = self._communicate(input=None) - - # collect process statuses - self.statuses = [] - self.status = 0 - for proc in self._procs: - self.statuses.append(proc.wait()) - if self.statuses[-1] != 0: - self.status = self.statuses[-1] - - # Code excerpted from subprocess.Popen._communicate() - if _MSWINDOWS == True: - def _communicate(self, input=None): - assert input == None, 'stdin != None not yet supported' - # listen to each process' stderr - threads = [] - std_X_arrays = [] - for proc in self._procs: - stderr_array = [] - thread = Thread(target=proc._readerthread, - args=(proc.stderr, stderr_array)) - thread.setDaemon(True) - thread.start() - threads.append(thread) - std_X_arrays.append(stderr_array) - - # also listen to the last processes stdout - stdout_array = [] - thread = Thread(target=proc._readerthread, - args=(proc.stdout, stdout_array)) - thread.setDaemon(True) - thread.start() - threads.append(thread) - std_X_arrays.append(stdout_array) - - # join threads as they die - for thread in threads: - thread.join() - - # read output from reader threads - std_X_strings = [] - for std_X_array in std_X_arrays: - std_X_strings.append(std_X_array[0]) - - stdout = std_X_strings.pop(-1) - stderrs = std_X_strings - return (stdout, stderrs) - else: - assert _POSIX==True, 'invalid platform' - def _communicate(self, input=None): - read_set = [] - write_set = [] - read_arrays = [] - stdout = None # Return - stderr = None # Return - - if self._procs[0].stdin: - # Flush stdio buffer. This might block, if the user has - # been writing to .stdin in an uncontrolled fashion. - self._procs[0].stdin.flush() - if input: - write_set.append(self._procs[0].stdin) - else: - self._procs[0].stdin.close() - for proc in self._procs: - read_set.append(proc.stderr) - read_arrays.append([]) - read_set.append(self._procs[-1].stdout) - read_arrays.append([]) - - input_offset = 0 - while read_set or write_set: - try: - rlist, wlist, xlist = select.select(read_set, write_set, []) - except select.error, e: - if e.args[0] == errno.EINTR: - continue - raise - if self._procs[0].stdin in wlist: - # When select has indicated that the file is writable, - # we can write up to PIPE_BUF bytes without risk - # blocking. POSIX defines PIPE_BUF >= 512 - chunk = input[input_offset : input_offset + 512] - bytes_written = os.write(self.stdin.fileno(), chunk) - input_offset += bytes_written - if input_offset >= len(input): - self._procs[0].stdin.close() - write_set.remove(self._procs[0].stdin) - if self._procs[-1].stdout in rlist: - data = os.read(self._procs[-1].stdout.fileno(), 1024) - if data == '': - self._procs[-1].stdout.close() - read_set.remove(self._procs[-1].stdout) - read_arrays[-1].append(data) - for i,proc in enumerate(self._procs): - if proc.stderr in rlist: - data = os.read(proc.stderr.fileno(), 1024) - if data == '': - proc.stderr.close() - read_set.remove(proc.stderr) - read_arrays[i].append(data) - - # All data exchanged. Translate lists into strings. - read_strings = [] - for read_array in read_arrays: - read_strings.append(''.join(read_array)) - - stdout = read_strings.pop(-1) - stderrs = read_strings - return (stdout, stderrs) - if libbe.TESTING == True: suite = doctest.DocTestSuite() -- cgit