diff options
author | pcarrier <pcarrier@ef72aa8b-4018-0410-8976-d6e080ef94d8> | 2010-11-27 16:48:26 +0000 |
---|---|---|
committer | pcarrier <pcarrier@ef72aa8b-4018-0410-8976-d6e080ef94d8> | 2010-11-27 16:48:26 +0000 |
commit | f48426d52581a064282bdbc3f7920dfda4b3739d (patch) | |
tree | c9aeb086148d63704f19a41a0333e2f89f0a9430 /worker | |
parent | 74aece56209ee534f89d11c0c8cee00e747359a3 (diff) | |
download | sos-f48426d52581a064282bdbc3f7920dfda4b3739d.tar.gz |
[worker] Changes in specs, first draft implementation
specs:
- s/SIGTERM/SIGINT/
- s/noop/ping/
- new noop command
- introduced UNKNOWN COMMAND and INTERRUPTED on stderr
- (stdout|stderr) and their lengths are now interleaved in 'exec' output
implementation:
- Here's a first design
git-svn-id: svn+ssh://svn.fedorahosted.org/svn/sos/trunk@1033 ef72aa8b-4018-0410-8976-d6e080ef94d8
Diffstat (limited to 'worker')
-rw-r--r-- | worker/specs | 30 | ||||
-rw-r--r-- | worker/worker.py | 159 |
2 files changed, 178 insertions, 11 deletions
diff --git a/worker/specs b/worker/specs index b709fde9..73ed80db 100644 --- a/worker/specs +++ b/worker/specs @@ -22,18 +22,20 @@ When gatherer is expected to write in a file, its parent directory should exist and it will be opened with the "w" flag (unless stated otherwise). No timeout handling should be implemented in gatherer. -SIGTERM should be intercepted and: -- stop the current request and continue the normal flow of execution when a - request is being handled, +SIGINT should be handled and: +- stop the current request, write INTERRUPTED to stderr then continue the normal + flow of execution when a request is being handled, - be treated as an 'exit' request is a command was being read - be treated as a 'reset' if a command was already read, but the request is still being read. The current request should be dropped and a new request should be read, the request counter is not incremented as no request was processed. -stderr is only used: -- by Python, for example to display exceptions. -- to print "ALIVE" when SIGUSR1 is received or after a "noop" request. +stderr is used: +- by Python to display exceptions; +- to print "ALIVE" when SIGUSR1 is received or after a "ping" request; +- to print "UNKNOWN COMMAND" when a command is unknown; +- to print "INTERRUPTED" when a command is interrupted with SIGINT. Requests -------- @@ -42,6 +44,14 @@ Command: "noop" Params: - None Action: +- None +Returns: +- Nothing + +Command: "ping" +Params: +- None +Action: - Prints "ALIVE" on stderr Returns: - Nothing @@ -69,12 +79,10 @@ Params: Action: - Executes the command in a shell using subprocess.Popen Returns: -- On the first line, the return code -- On the second line, the length of its stdout output as return by len(), in - decimal. -- On the third line, the length of its stderr output as return by len(), in - decimal. +- The return code of the command +- The length of its stdout output as return by len(), in decimal. - Its stdout output, followed by a trailing '\n' +- The length of its stderr output as return by len(), in decimal. - Its stderr output, followed by a trailing '\n' Command: "exec2file" diff --git a/worker/worker.py b/worker/worker.py new file mode 100644 index 00000000..335bcbf8 --- /dev/null +++ b/worker/worker.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python + +class Request: + Commands = {} + + @staticmethod + def Register(name): + def decorate(cls): + Request.Commands[name] = cls + return cls + return decorate + + @staticmethod + def ReadCommand(shell): + cmd = shell.read_line().strip() + request = Request.Commands[cmd](shell) + return request + + def __init__(self, shell): + self.shell = shell + + def read_params(self): + pass + + def execute(self): + raise NotImplementedError() + + +@Request.Register("noop") +class NoopRequest(Request): + + def execute(self): + pass + +@Request.Register("ping") +class PingRequest(Request): + + def execute(self): + self.shell.status("ALIVE") + + +@Request.Register("exit") +class ExitRequest(Request): + + def execute(self): + self.shell.exit() + + +@Request.Register("glob") +class GlobRequest(Request): + + def read_params(self): + self.pattern = self.shell.read_line() + + def execute(self): + from glob import glob + results = glob(self.pattern) + self.shell.write(len(results)+"\n") + for result in results: + self.shell.write(result+"\n") + + +@Request.Register("exec") +class ExecRequest(Request): + + def read_params(self): + self.cmd = self.shell.read_line() + + def execute(self): + from subprocess import Popen, PIPE + proc = Popen(self.cmd, shell=True, stdout=PIPE, stderr=PIPE, bufsize=-1) + stdout, stderr = proc.communicate() + self.shell.write("%i\n" % proc.returncode) + self.shell.write_blob(stdout) + self.shell.write_blob(stderr) + +class Shell: + def __init__(self, + input_stream = None, + output_stream = None, + status_stream = None, + bork_action = None): + self.__input_stream__ = input_stream + self.__output_stream__ = output_stream + self.__status_stream__ = status_stream + if bork_action: + self.__bork_action__ = bork_action + else: + self.__bork_action__ = self.exit + self.__exit__ = False + self.__cmd_number__ = 0 + + def loop(self): + while self.__exit__ == False: + self.show_prompt() + try: request = Request.ReadCommand(self) + except KeyboardInterrupt: + self.exit() + except KeyError: + self.status("UNKNOWN COMMAND"); + self.bork() + else: + try: request.read_params() + except KeyboardInterrupt: + pass + else: + self.__cmd_number__ += 1 + try: + request.execute() + except KeyboardInterrupt: + self.status("INTERRUPTED"); + + def exit(self): + self.__exit__ = True + + def bork(self): + self.__bork_action__() + + def show_prompt(self): + self.write("#%i#\n" % self.__cmd_number__) + + def status(self, str): + print >> self.__status_stream__, str + + def write(self, msg): + self.__output_stream__.write(msg) + + def write_blob(self, blob): + self.write("%i\n" % len(blob)) + self.write(str(blob)+"\n") + + def read_line(self): + while True: + try: + return self.__input_stream__.readline().strip() + except IOError: + pass + + def read_blob(self, length): + try: + blob = self.__input_stream__.read(length) + assert self.__input_stream__.read(1) == "\n" + except: + raise IOError() + else: + return blob + +if __name__ == "__main__": + from sys import stdin, stdout, stderr, exit + from signal import signal, SIGUSR1 + def handler(signum, frame): + print >> stderr, "ALIVE" + signal(SIGUSR1, handler) + def bork(): + exit(-1) + Shell(input_stream = stdin, + output_stream = stdout, + status_stream = stderr, + bork_action = bork).loop() |