aboutsummaryrefslogtreecommitdiffstats
path: root/worker
diff options
context:
space:
mode:
authorpcarrier <pcarrier@ef72aa8b-4018-0410-8976-d6e080ef94d8>2010-11-27 16:48:26 +0000
committerpcarrier <pcarrier@ef72aa8b-4018-0410-8976-d6e080ef94d8>2010-11-27 16:48:26 +0000
commitf48426d52581a064282bdbc3f7920dfda4b3739d (patch)
treec9aeb086148d63704f19a41a0333e2f89f0a9430 /worker
parent74aece56209ee534f89d11c0c8cee00e747359a3 (diff)
downloadsos-f48426d52581a064282bdbc3f7920dfda4b3739d.tar.gz
[worker] Changes in specs, first draft implementation
specs: - s/SIGTERM/SIGINT/ - s/noop/ping/ - new noop command - introduced UNKNOWN COMMAND and INTERRUPTED on stderr - (stdout|stderr) and their lengths are now interleaved in 'exec' output implementation: - Here's a first design git-svn-id: svn+ssh://svn.fedorahosted.org/svn/sos/trunk@1033 ef72aa8b-4018-0410-8976-d6e080ef94d8
Diffstat (limited to 'worker')
-rw-r--r--worker/specs30
-rw-r--r--worker/worker.py159
2 files changed, 178 insertions, 11 deletions
diff --git a/worker/specs b/worker/specs
index b709fde9..73ed80db 100644
--- a/worker/specs
+++ b/worker/specs
@@ -22,18 +22,20 @@ When gatherer is expected to write in a file, its parent directory should exist
and it will be opened with the "w" flag (unless stated otherwise).
No timeout handling should be implemented in gatherer.
-SIGTERM should be intercepted and:
-- stop the current request and continue the normal flow of execution when a
- request is being handled,
+SIGINT should be handled and:
+- stop the current request, write INTERRUPTED to stderr then continue the normal
+ flow of execution when a request is being handled,
- be treated as an 'exit' request is a command was being read
- be treated as a 'reset' if a command was already read, but the request is
still being read. The current request should be dropped and a new request
should be read, the request counter is not incremented as no request was
processed.
-stderr is only used:
-- by Python, for example to display exceptions.
-- to print "ALIVE" when SIGUSR1 is received or after a "noop" request.
+stderr is used:
+- by Python to display exceptions;
+- to print "ALIVE" when SIGUSR1 is received or after a "ping" request;
+- to print "UNKNOWN COMMAND" when a command is unknown;
+- to print "INTERRUPTED" when a command is interrupted with SIGINT.
Requests
--------
@@ -42,6 +44,14 @@ Command: "noop"
Params:
- None
Action:
+- None
+Returns:
+- Nothing
+
+Command: "ping"
+Params:
+- None
+Action:
- Prints "ALIVE" on stderr
Returns:
- Nothing
@@ -69,12 +79,10 @@ Params:
Action:
- Executes the command in a shell using subprocess.Popen
Returns:
-- On the first line, the return code
-- On the second line, the length of its stdout output as return by len(), in
- decimal.
-- On the third line, the length of its stderr output as return by len(), in
- decimal.
+- The return code of the command
+- The length of its stdout output as return by len(), in decimal.
- Its stdout output, followed by a trailing '\n'
+- The length of its stderr output as return by len(), in decimal.
- Its stderr output, followed by a trailing '\n'
Command: "exec2file"
diff --git a/worker/worker.py b/worker/worker.py
new file mode 100644
index 00000000..335bcbf8
--- /dev/null
+++ b/worker/worker.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python
+
+class Request:
+ Commands = {}
+
+ @staticmethod
+ def Register(name):
+ def decorate(cls):
+ Request.Commands[name] = cls
+ return cls
+ return decorate
+
+ @staticmethod
+ def ReadCommand(shell):
+ cmd = shell.read_line().strip()
+ request = Request.Commands[cmd](shell)
+ return request
+
+ def __init__(self, shell):
+ self.shell = shell
+
+ def read_params(self):
+ pass
+
+ def execute(self):
+ raise NotImplementedError()
+
+
+@Request.Register("noop")
+class NoopRequest(Request):
+
+ def execute(self):
+ pass
+
+@Request.Register("ping")
+class PingRequest(Request):
+
+ def execute(self):
+ self.shell.status("ALIVE")
+
+
+@Request.Register("exit")
+class ExitRequest(Request):
+
+ def execute(self):
+ self.shell.exit()
+
+
+@Request.Register("glob")
+class GlobRequest(Request):
+
+ def read_params(self):
+ self.pattern = self.shell.read_line()
+
+ def execute(self):
+ from glob import glob
+ results = glob(self.pattern)
+ self.shell.write(len(results)+"\n")
+ for result in results:
+ self.shell.write(result+"\n")
+
+
+@Request.Register("exec")
+class ExecRequest(Request):
+
+ def read_params(self):
+ self.cmd = self.shell.read_line()
+
+ def execute(self):
+ from subprocess import Popen, PIPE
+ proc = Popen(self.cmd, shell=True, stdout=PIPE, stderr=PIPE, bufsize=-1)
+ stdout, stderr = proc.communicate()
+ self.shell.write("%i\n" % proc.returncode)
+ self.shell.write_blob(stdout)
+ self.shell.write_blob(stderr)
+
+class Shell:
+ def __init__(self,
+ input_stream = None,
+ output_stream = None,
+ status_stream = None,
+ bork_action = None):
+ self.__input_stream__ = input_stream
+ self.__output_stream__ = output_stream
+ self.__status_stream__ = status_stream
+ if bork_action:
+ self.__bork_action__ = bork_action
+ else:
+ self.__bork_action__ = self.exit
+ self.__exit__ = False
+ self.__cmd_number__ = 0
+
+ def loop(self):
+ while self.__exit__ == False:
+ self.show_prompt()
+ try: request = Request.ReadCommand(self)
+ except KeyboardInterrupt:
+ self.exit()
+ except KeyError:
+ self.status("UNKNOWN COMMAND");
+ self.bork()
+ else:
+ try: request.read_params()
+ except KeyboardInterrupt:
+ pass
+ else:
+ self.__cmd_number__ += 1
+ try:
+ request.execute()
+ except KeyboardInterrupt:
+ self.status("INTERRUPTED");
+
+ def exit(self):
+ self.__exit__ = True
+
+ def bork(self):
+ self.__bork_action__()
+
+ def show_prompt(self):
+ self.write("#%i#\n" % self.__cmd_number__)
+
+ def status(self, str):
+ print >> self.__status_stream__, str
+
+ def write(self, msg):
+ self.__output_stream__.write(msg)
+
+ def write_blob(self, blob):
+ self.write("%i\n" % len(blob))
+ self.write(str(blob)+"\n")
+
+ def read_line(self):
+ while True:
+ try:
+ return self.__input_stream__.readline().strip()
+ except IOError:
+ pass
+
+ def read_blob(self, length):
+ try:
+ blob = self.__input_stream__.read(length)
+ assert self.__input_stream__.read(1) == "\n"
+ except:
+ raise IOError()
+ else:
+ return blob
+
+if __name__ == "__main__":
+ from sys import stdin, stdout, stderr, exit
+ from signal import signal, SIGUSR1
+ def handler(signum, frame):
+ print >> stderr, "ALIVE"
+ signal(SIGUSR1, handler)
+ def bork():
+ exit(-1)
+ Shell(input_stream = stdin,
+ output_stream = stdout,
+ status_stream = stderr,
+ bork_action = bork).loop()