From 2fcf5f09e0e809124aeb5e262eeecbe446824efc Mon Sep 17 00:00:00 2001 From: Jake Hunsaker Date: Fri, 6 Oct 2017 15:52:06 -0400 Subject: [utilities] Allow size limits for command output Previously, any command output generated from plugins from add_cmd_output and add_journal would be collected in full in memory. For example, if a journal was 4GB in size, then 4GB would be read into memory and subsequently written to the final sos archive. This lead to not only potentially large archives, but in some cases failure to collect data or produce an archive due to memory constraints on the system. This patch adds the ability to use a sizelimit option in both add_cmd_output and add_journal. This will limit the collected output from commands or journals to the given limit, both what is read into memory and what is written to the final archive. If not given, sizelimit will default to --log-size. For journal collection, if no sizelimit is given then the larger of either --log-size or 100mb is used. Resolves: #1120 Signed-off-by: Jake Hunsaker Signed-off-by: Bryn M. Reeves --- sos/plugins/__init__.py | 39 +++++++++++++++++++------------ sos/utilities.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 17 deletions(-) diff --git a/sos/plugins/__init__.py b/sos/plugins/__init__.py index 9f0995d8..5882c37f 100644 --- a/sos/plugins/__init__.py +++ b/sos/plugins/__init__.py @@ -600,7 +600,7 @@ class Plugin(object): def get_command_output(self, prog, timeout=300, stderr=True, chroot=True, runat=None, env=None, - binary=False): + binary=False, sizelimit=None): if chroot or self.commons['cmdlineopts'].chroot == 'always': root = self.sysroot else: @@ -608,7 +608,8 @@ class Plugin(object): result = sos_get_command_output(prog, timeout=timeout, stderr=stderr, chroot=root, chdir=runat, - env=env, binary=binary) + env=env, binary=binary, + sizelimit=sizelimit) if result['status'] == 124: self._log_warn("command '%s' timed out after %ds" @@ -646,14 +647,16 @@ class Plugin(object): def _add_cmd_output(self, cmd, suggest_filename=None, root_symlink=None, timeout=300, stderr=True, - chroot=True, runat=None, env=None, binary=False): + chroot=True, runat=None, env=None, binary=False, + sizelimit=None): """Internal helper to add a single command to the collection list.""" cmdt = ( cmd, suggest_filename, root_symlink, timeout, stderr, - chroot, runat, env, binary + chroot, runat, env, binary, sizelimit ) - _tuplefmt = "('%s', '%s', '%s', %s, '%s', '%s', '%s', '%s', '%s')" + _tuplefmt = ("('%s', '%s', '%s', %s, '%s', '%s', '%s', '%s', '%s', " + "'%s')") _logstr = "packed command tuple: " + _tuplefmt self._log_debug(_logstr % cmdt) self.collect_cmds.append(cmdt) @@ -661,7 +664,8 @@ class Plugin(object): def add_cmd_output(self, cmds, suggest_filename=None, root_symlink=None, timeout=300, stderr=True, - chroot=True, runat=None, env=None, binary=False): + chroot=True, runat=None, env=None, binary=False, + sizelimit=None): """Run a program or a list of programs and collect the output""" if isinstance(cmds, six.string_types): cmds = [cmds] @@ -670,7 +674,7 @@ class Plugin(object): for cmd in cmds: self._add_cmd_output(cmd, suggest_filename, root_symlink, timeout, stderr, - chroot, runat, env, binary) + chroot, runat, env, binary, sizelimit) def get_cmd_output_path(self, name=None, make=True): """Return a path into which this module should store collected @@ -725,14 +729,15 @@ class Plugin(object): def get_cmd_output_now(self, exe, suggest_filename=None, root_symlink=False, timeout=300, stderr=True, chroot=True, runat=None, env=None, - binary=False): + binary=False, sizelimit=None): """Execute a command and save the output to a file for inclusion in the report. """ start = time() result = self.get_command_output(exe, timeout=timeout, stderr=stderr, chroot=chroot, runat=runat, - env=env, binary=binary) + env=env, binary=binary, + sizelimit=sizelimit) self._log_debug("collected output of '%s' in %s" % (exe.split()[0], time() - start)) @@ -781,7 +786,7 @@ class Plugin(object): def add_journal(self, units=None, boot=None, since=None, until=None, lines=None, allfields=False, output=None, timeout=None, - identifier=None, catalog=None): + identifier=None, catalog=None, sizelimit=None): """ Collect journald logs from one of more units. Keyword arguments: @@ -803,6 +808,8 @@ class Plugin(object): identifier -- an optional message identifier. catalog -- If True, augment lines with descriptions from the system catalog. + sizelimit -- Limit to the size of output returned in MB. Defaults + to --log-size """ journal_cmd = "journalctl --no-pager " unit_opt = " --unit %s" @@ -850,7 +857,9 @@ class Plugin(object): journal_cmd += output_opt % output self._log_debug("collecting journal: %s" % journal_cmd) - self._add_cmd_output(journal_cmd, None, None, timeout) + self._add_cmd_output(journal_cmd, None, None, timeout, + sizelimit=sizelimit + ) def add_udev_info(self, device, attrs=False): """Collect udevadm info output for a given device @@ -887,16 +896,18 @@ class Plugin(object): timeout, stderr, chroot, runat, - env, binary + env, binary, + sizelimit ) = progs[0] self._log_debug(("unpacked command tuple: " + "('%s', '%s', '%s', %s, '%s', '%s', '%s', '%s'," + - "'%s')") % progs[0]) + "'%s %s')") % progs[0]) self._log_info("collecting output of '%s'" % prog) self.get_cmd_output_now(prog, suggest_filename=suggest_filename, root_symlink=root_symlink, timeout=timeout, stderr=stderr, chroot=chroot, runat=runat, - env=env, binary=binary) + env=env, binary=binary, + sizelimit=sizelimit) def _collect_strings(self): for string, file_name in self.copy_strings: diff --git a/sos/utilities.py b/sos/utilities.py index 2efcc35d..4899c58c 100644 --- a/sos/utilities.py +++ b/sos/utilities.py @@ -17,8 +17,10 @@ import fnmatch import errno import shlex import glob +import threading from contextlib import closing +from collections import deque # PYCOMPAT import six @@ -105,7 +107,7 @@ def is_executable(command): def sos_get_command_output(command, timeout=300, stderr=False, chroot=None, chdir=None, env=None, - binary=False): + binary=False, sizelimit=None): """Execute a command and return a dictionary of status and output, optionally changing root or current working directory before executing command. @@ -147,7 +149,11 @@ def sos_get_command_output(command, timeout=300, stderr=False, stderr=STDOUT if stderr else PIPE, bufsize=-1, env=cmd_env, close_fds=True, preexec_fn=_child_prep_fn) - stdout, stderr = p.communicate() + + reader = AsyncReader(p.stdout, sizelimit, binary) + stdout = reader.get_contents() + p.poll() + except OSError as e: if e.errno == errno.ENOENT: return {'status': 127, 'output': ""} @@ -159,7 +165,7 @@ def sos_get_command_output(command, timeout=300, stderr=False, return { 'status': p.returncode, - 'output': stdout if binary else stdout.decode('utf-8', 'ignore') + 'output': stdout } @@ -187,6 +193,55 @@ def shell_out(cmd, timeout=30, chroot=None, runat=None): chroot=chroot, chdir=runat)['output'] +class AsyncReader(threading.Thread): + '''Used to limit command output to a given size without deadlocking + sos. + + Takes a sizelimit value in MB, and will compile stdout from Popen into a + string that is limited to the given sizelimit. + ''' + + def __init__(self, channel, sizelimit, binary): + super(AsyncReader, self).__init__() + self.chan = channel + self.binary = binary + self.chunksize = 2048 + slots = None + if sizelimit: + sizelimit = sizelimit * 1048576 # convert to bytes + slots = sizelimit / self.chunksize + self.deque = deque(maxlen=slots) + self.start() + self.join() + + def run(self): + '''Reads from the channel (pipe) that is the output pipe for a + called Popen. As we are reading from the pipe, the output is added + to a deque. After the size of the deque exceeds the sizelimit + earlier (older) entries are removed. + + This means the returned output is chunksize-sensitive, but is not + really byte-sensitive. + ''' + try: + while True: + line = self.chan.read(self.chunksize) + if not line: + # Pipe can remain open after output has completed + break + self.deque.append(line) + except (ValueError, IOError): + # pipe has closed, meaning command output is done + pass + + def get_contents(self): + '''Returns the contents of the deque as a string''' + if not self.binary: + return ''.join(ln.decode('utf-8', 'ignore') for ln in self.deque) + else: + return b''.join(ln for ln in self.deque) + + class ImporterHelper(object): """Provides a list of modules that can be imported in a package. Importable modules are located along the module __path__ list and modules -- cgit