diff options
author | Jake Hunsaker <jhunsake@redhat.com> | 2020-03-25 13:54:25 -0400 |
---|---|---|
committer | Jake Hunsaker <jhunsake@redhat.com> | 2020-04-08 09:27:16 -0400 |
commit | b8338c6ebd4d82baeea5ed40b0d9133d5c66907e (patch) | |
tree | 73f15ebbefbd849d5ca3dec5bb1a404bddba4f1f | |
parent | afb4211d57415267a46da1bc830fe2d08406c2cf (diff) | |
download | sos-b8338c6ebd4d82baeea5ed40b0d9133d5c66907e.tar.gz |
[options] Split SoSOptions into a new module
Moves `SoSOptions()` and the related bits to a new sos/options.py
location. This should allow for easier maintenance and ongoing
development as our option handling approach will be shared across
components.
Signed-off-by: Jake Hunsaker <jhunsake@redhat.com>
-rw-r--r-- | sos/__init__.py | 322 | ||||
-rw-r--r-- | sos/options.py | 341 |
2 files changed, 342 insertions, 321 deletions
diff --git a/sos/__init__.py b/sos/__init__.py index 9da2c107..6b1be16f 100644 --- a/sos/__init__.py +++ b/sos/__init__.py @@ -26,327 +26,7 @@ else: from ConfigParser import ConfigParser, ParsingError, Error from sos.report import SoSReport - -# Global option definitions -# These must be in the module itself in order to be available to both -# the sosreport and policy module (and to avoid recursive import errors). -# -# FIXME: these definitions make our main module a bit more bulky: the -# alternative is to place these in a new sos.options module. This may -# prove to be the best route long-term (as it could also contain an -# exported parsing routine, and all the command-line definitions). - -#: Names of all arguments -_arg_names = [ - 'add_preset', 'alloptions', 'allow_system_changes', 'all_logs', 'batch', - 'build', 'case_id', 'chroot', 'compression_type', 'config_file', 'desc', - 'debug', 'del_preset', 'dry_run', 'enableplugins', 'encrypt_key', - 'encrypt_pass', 'experimental', 'label', 'list_plugins', 'list_presets', - 'list_profiles', 'log_size', 'noplugins', 'noreport', 'no_env_vars', - 'no_postproc', 'note', 'onlyplugins', 'plugin_timeout', 'plugopts', - 'preset', 'profiles', 'quiet', 'since', 'sysroot', 'threads', 'tmp_dir', - 'upload', 'upload_url', 'upload_directory', 'upload_user', 'upload_pass', - 'verbosity', 'verify' -] - -#: Arguments with non-zero default values -_arg_defaults = { - "chroot": "auto", - "compression_type": "auto", - "log_size": 25, - "preset": "auto", - # Verbosity has an explicit zero default since the ArgumentParser - # count action default is None. - "verbosity": 0 -} - - -def _is_seq(val): - """Return true if val is an instance of a known sequence type. - """ - val_type = type(val) - return val_type is list or val_type is tuple - - -class SoSOptions(object): - - def _merge_opt(self, opt, src, is_default): - def _unset(val): - return (val == "" or val is None) - - if hasattr(src, opt): - newvalue = getattr(src, opt) - oldvalue = getattr(self, opt) - # overwrite value iff: - # - we replace unset option by a real value - # - new default is set, or - # - non-sequential variable keeps its default value - if (_unset(oldvalue) and not _unset(newvalue)) or \ - is_default or \ - ((opt not in self._nondefault) and (not _is_seq(newvalue))): - # Overwrite atomic values - setattr(self, opt, newvalue) - if is_default: - self._nondefault.discard(opt) - else: - self._nondefault.add(opt) - elif _is_seq(newvalue): - # Concatenate sequence types - setattr(self, opt, newvalue + oldvalue) - - def _merge_opts(self, src, is_default): - for arg in _arg_names: - self._merge_opt(arg, src, is_default) - - def __str(self, quote=False, sep=" ", prefix="", suffix=""): - """Format a SoSOptions object as a human or machine readable string. - - :param quote: quote option values - :param sep: list separator string - :param prefix: arbitrary prefix string - :param suffix: arbitrary suffix string - :param literal: print values as Python literals - """ - args = prefix - arg_fmt = "=%s" - for arg in _arg_names: - args += arg + arg_fmt + sep - args.strip(sep) - - vals = [getattr(self, arg) for arg in _arg_names] - if not quote: - # Convert Python source notation for sequences into plain strings - vals = [",".join(v) if _is_seq(v) else v for v in vals] - else: - def is_string(val): - return isinstance(val, six.string_types) - # Only quote strings if quote=False - vals = ["'%s'" % v if is_string(v) else v for v in vals] - - return (args % tuple(vals)).strip(sep) + suffix - - def __str__(self): - return self.__str() - - def __repr__(self): - return self.__str(quote=True, sep=", ", prefix="SoSOptions(", - suffix=")") - - def __init__(self, **kwargs): - """Initialise a new ``SoSOptions`` object from keyword arguments. - - Initialises the new object with values taken from keyword - arguments matching the names of ``SoSOptions`` attributes. - - A ``ValueError`` is raised is any of the supplied keyword - arguments does not correspond to a known ``SoSOptions` - attribute name. - - :param *kwargs: a list of ``SoSOptions`` keyword args. - :returns: the new ``SoSOptions`` object. - """ - self.add_preset = "" - self.alloptions = False - self.all_logs = False - self.since = None - self.batch = False - self.build = False - self.case_id = "" - self.chroot = _arg_defaults["chroot"] - self.compression_type = _arg_defaults["compression_type"] - self.config_file = "" - self.debug = False - self.del_preset = "" - self.desc = "" - self.dry_run = False - self.enableplugins = [] - self.encrypt_key = None - self.encrypt_pass = None - self.experimental = False - self.label = "" - self.list_plugins = False - self.list_presets = False - self.list_profiles = False - self.log_size = _arg_defaults["log_size"] - self.noplugins = [] - self.noreport = False - self.allow_system_changes = False - self.no_env_vars = False - self.no_postproc = False - self.note = "" - self.onlyplugins = [] - self.plugin_timeout = None - self.plugopts = [] - self.preset = _arg_defaults["preset"] - self.profiles = [] - self.quiet = False - self.sysroot = None - self.threads = 4 - self.tmp_dir = "" - self.upload = False - self.upload_url = "" - self.upload_directory = "" - self.upload_user = "" - self.upload_pass = "" - self.verbosity = _arg_defaults["verbosity"] - self.verify = False - self._nondefault = set() - for arg in kwargs.keys(): - if arg not in _arg_names: - raise ValueError("Unknown SoSOptions attribute: %s" % arg) - setattr(self, arg, kwargs[arg]) - - @classmethod - def from_args(cls, args): - """Initialise a new SoSOptions object from a ``Namespace`` - obtained by parsing command line arguments. - - :param args: parsed command line arguments - :returns: an initialised SoSOptions object - :returntype: SoSOptions - """ - opts = SoSOptions() - opts._merge_opts(args, True) - return opts - - @classmethod - def _opt_to_args(cls, opt, val): - """Convert a named option and optional value to command line - argument notation, correctly handling options that take - no value or that have special representations (e.g. verify - and verbose). - """ - no_value = ( - "alloptions", "allow-system-changes", "all-logs", "batch", "build", - "debug", "experimental", "list-plugins", "list-presets", - "list-profiles", "no-report", "no-env-vars", "quiet", "verify" - ) - count = ("verbose",) - if opt in no_value: - return ["--%s" % opt] - if opt in count: - return ["--%s" % opt for d in range(0, int(val))] - return ["--" + opt + "=" + val] - - @classmethod - def from_file(cls, argparser, config_file, is_default=True): - opts = SoSOptions() - config = ConfigParser() - try: - try: - with open(config_file) as f: - config.readfp(f) - except (ParsingError, Error) as e: - raise exit('Failed to parse configuration ' - 'file %s' % config_file) - except (OSError, IOError) as e: - raise exit('Unable to read configuration file %s ' - ': %s' % (config_file, e.args[1])) - - if config.has_section("general"): - optlist = [] - for opt, val in config.items("general"): - optlist.extend(SoSOptions._opt_to_args(opt, val)) - opts._merge_opts(argparser.parse_args(optlist), is_default) - - opts.noplugins = [] - if config.has_option("plugins", "disable"): - opts.noplugins.extend([plugin.strip() for plugin in - config.get("plugins", "disable").split(',')]) - - if config.has_option("plugins", "enable"): - opts.enableplugins = [] - opts.enableplugins.extend( - [plugin.strip() for plugin in - config.get("plugins", "enable").split(',')]) - - if config.has_section("tunables"): - opts.plugopts = [] - for opt, val in config.items("tunables"): - if not opt.split('.')[0] in opts.noplugins: - opts.plugopts.append(opt + "=" + val) - - return opts - - def merge(self, src, skip_default=True): - """Merge another set of ``SoSOptions`` into this object. - - Merge two ``SoSOptions`` objects by setting unset or default - values to their value in the ``src`` object. - - :param src: the ``SoSOptions`` object to copy from - :param is_default: ``True`` if new default values are to be set. - """ - for arg in _arg_names: - if not hasattr(src, arg): - continue - if getattr(src, arg) is not None or not skip_default: - self._merge_opt(arg, src, False) - - def dict(self): - """Return this ``SoSOptions`` option values as a dictionary of - argument name to value mappings. - - :returns: a name:value dictionary of option values. - """ - odict = {} - for arg in _arg_names: - value = getattr(self, arg) - # Do not attempt to store preset option values in presets - if arg in ('add_preset', 'del_preset', 'desc', 'note'): - value = None - odict[arg] = value - return odict - - def to_args(self): - """Return command arguments for this object. - - Return a list of the non-default options of this ``SoSOptions`` - object in ``sosreport`` command line argument notation: - - ``["--all-logs", "-vvv"]`` - - """ - def has_value(name, value): - """ Test for non-null option values. - """ - null_values = ("False", "None", "[]", '""', "''", "0") - if not value or value in null_values: - return False - if name in _arg_defaults: - if str(value) == str(_arg_defaults[name]): - return False - return True - - def filter_opt(name, value): - """ Filter out preset and null-valued options. - """ - if name in ("add_preset", "del_preset", "desc", "note"): - return False - return has_value(name, value) - - def argify(name, value): - """ Convert sos option notation to command line arguments. - """ - # Handle --verbosity specially - if name.startswith("verbosity"): - arg = "-" + int(value) * "v" - return arg - - name = name.replace("_", "-") - - value = ",".join(value) if _is_seq(value) else value - - if value is not True: - opt = "%s %s" % (name, value) - else: - opt = name - - arg = "--" + opt if len(opt) > 1 else "-" + opt - return arg - - opt_items = sorted(self.dict().items(), key=lambda x: x[0]) - return [argify(n, v) for (n, v) in opt_items if filter_opt(n, v)] +from sos.options import SoSListOption class SoSComponent(): diff --git a/sos/options.py b/sos/options.py new file mode 100644 index 00000000..38140a29 --- /dev/null +++ b/sos/options.py @@ -0,0 +1,341 @@ +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + +from argparse import ArgumentParser, Action + +# Global option definitions +# These must be in the module itself in order to be available to both +# the sosreport and policy module (and to avoid recursive import errors). +# +# FIXME: these definitions make our main module a bit more bulky: the +# alternative is to place these in a new sos.options module. This may +# prove to be the best route long-term (as it could also contain an +# exported parsing routine, and all the command-line definitions). + +#: Names of all arguments +_arg_names = [ + 'add_preset', 'alloptions', 'allow_system_changes', 'all_logs', 'batch', + 'build', 'case_id', 'chroot', 'compression_type', 'config_file', 'desc', + 'debug', 'del_preset', 'dry_run', 'enableplugins', 'encrypt_key', + 'encrypt_pass', 'experimental', 'label', 'list_plugins', 'list_presets', + 'list_profiles', 'log_size', 'noplugins', 'noreport', 'no_env_vars', + 'no_postproc', 'note', 'onlyplugins', 'plugin_timeout', 'plugopts', + 'preset', 'profiles', 'quiet', 'since', 'sysroot', 'threads', 'tmp_dir', + 'upload', 'upload_url', 'upload_directory', 'upload_user', 'upload_pass', + 'verbosity', 'verify' +] + +#: Arguments with non-zero default values +_arg_defaults = { + "chroot": "auto", + "compression_type": "auto", + "log_size": 25, + "preset": "auto", + # Verbosity has an explicit zero default since the ArgumentParser + # count action default is None. + "verbosity": 0 +} + + +def _is_seq(val): + """Return true if val is an instance of a known sequence type. + """ + val_type = type(val) + return val_type is list or val_type is tuple + + +class SoSOptions(object): + + def _merge_opt(self, opt, src, is_default): + def _unset(val): + return (val == "" or val is None) + + if hasattr(src, opt): + newvalue = getattr(src, opt) + oldvalue = getattr(self, opt) + # overwrite value iff: + # - we replace unset option by a real value + # - new default is set, or + # - non-sequential variable keeps its default value + if (_unset(oldvalue) and not _unset(newvalue)) or \ + is_default or \ + ((opt not in self._nondefault) and (not _is_seq(newvalue))): + # Overwrite atomic values + setattr(self, opt, newvalue) + if is_default: + self._nondefault.discard(opt) + else: + self._nondefault.add(opt) + elif _is_seq(newvalue): + # Concatenate sequence types + setattr(self, opt, newvalue + oldvalue) + + def _merge_opts(self, src, is_default): + for arg in _arg_names: + self._merge_opt(arg, src, is_default) + + def __str(self, quote=False, sep=" ", prefix="", suffix=""): + """Format a SoSOptions object as a human or machine readable string. + + :param quote: quote option values + :param sep: list separator string + :param prefix: arbitrary prefix string + :param suffix: arbitrary suffix string + :param literal: print values as Python literals + """ + args = prefix + arg_fmt = "=%s" + for arg in _arg_names: + args += arg + arg_fmt + sep + args.strip(sep) + + vals = [getattr(self, arg) for arg in _arg_names] + if not quote: + # Convert Python source notation for sequences into plain strings + vals = [",".join(v) if _is_seq(v) else v for v in vals] + else: + def is_string(val): + return isinstance(val, six.string_types) + # Only quote strings if quote=False + vals = ["'%s'" % v if is_string(v) else v for v in vals] + + return (args % tuple(vals)).strip(sep) + suffix + + def __str__(self): + return self.__str() + + def __repr__(self): + return self.__str(quote=True, sep=", ", prefix="SoSOptions(", + suffix=")") + + def __init__(self, **kwargs): + """Initialise a new ``SoSOptions`` object from keyword arguments. + + Initialises the new object with values taken from keyword + arguments matching the names of ``SoSOptions`` attributes. + + A ``ValueError`` is raised is any of the supplied keyword + arguments does not correspond to a known ``SoSOptions` + attribute name. + + :param *kwargs: a list of ``SoSOptions`` keyword args. + :returns: the new ``SoSOptions`` object. + """ + self.add_preset = "" + self.alloptions = False + self.all_logs = False + self.since = None + self.batch = False + self.build = False + self.case_id = "" + self.chroot = _arg_defaults["chroot"] + self.compression_type = _arg_defaults["compression_type"] + self.config_file = "" + self.debug = False + self.del_preset = "" + self.desc = "" + self.dry_run = False + self.enableplugins = [] + self.encrypt_key = None + self.encrypt_pass = None + self.experimental = False + self.label = "" + self.list_plugins = False + self.list_presets = False + self.list_profiles = False + self.log_size = _arg_defaults["log_size"] + self.noplugins = [] + self.noreport = False + self.allow_system_changes = False + self.no_env_vars = False + self.no_postproc = False + self.note = "" + self.onlyplugins = [] + self.plugin_timeout = None + self.plugopts = [] + self.preset = _arg_defaults["preset"] + self.profiles = [] + self.quiet = False + self.sysroot = None + self.threads = 4 + self.tmp_dir = "" + self.upload = False + self.upload_url = "" + self.upload_directory = "" + self.upload_user = "" + self.upload_pass = "" + self.verbosity = _arg_defaults["verbosity"] + self.verify = False + self._nondefault = set() + for arg in kwargs.keys(): + if arg not in _arg_names: + raise ValueError("Unknown SoSOptions attribute: %s" % arg) + setattr(self, arg, kwargs[arg]) + + @classmethod + def from_args(cls, args): + """Initialise a new SoSOptions object from a ``Namespace`` + obtained by parsing command line arguments. + + :param args: parsed command line arguments + :returns: an initialised SoSOptions object + :returntype: SoSOptions + """ + opts = SoSOptions() + opts._merge_opts(args, True) + return opts + + @classmethod + def _opt_to_args(cls, opt, val): + """Convert a named option and optional value to command line + argument notation, correctly handling options that take + no value or that have special representations (e.g. verify + and verbose). + """ + no_value = ( + "alloptions", "allow-system-changes", "all-logs", "batch", "build", + "debug", "experimental", "list-plugins", "list-presets", + "list-profiles", "no-report", "no-env-vars", "quiet", "verify" + ) + count = ("verbose",) + if opt in no_value: + return ["--%s" % opt] + if opt in count: + return ["--%s" % opt for d in range(0, int(val))] + return ["--" + opt + "=" + val] + + @classmethod + def from_file(cls, argparser, config_file, is_default=True): + opts = SoSOptions() + config = ConfigParser() + try: + try: + with open(config_file) as f: + config.readfp(f) + except (ParsingError, Error) as e: + raise exit('Failed to parse configuration ' + 'file %s' % config_file) + except (OSError, IOError) as e: + raise exit('Unable to read configuration file %s ' + ': %s' % (config_file, e.args[1])) + + if config.has_section("general"): + optlist = [] + for opt, val in config.items("general"): + optlist.extend(SoSOptions._opt_to_args(opt, val)) + opts._merge_opts(argparser.parse_args(optlist), is_default) + + opts.noplugins = [] + if config.has_option("plugins", "disable"): + opts.noplugins.extend([plugin.strip() for plugin in + config.get("plugins", "disable").split(',')]) + + if config.has_option("plugins", "enable"): + opts.enableplugins = [] + opts.enableplugins.extend( + [plugin.strip() for plugin in + config.get("plugins", "enable").split(',')]) + + if config.has_section("tunables"): + opts.plugopts = [] + for opt, val in config.items("tunables"): + if not opt.split('.')[0] in opts.noplugins: + opts.plugopts.append(opt + "=" + val) + + return opts + + def merge(self, src, skip_default=True): + """Merge another set of ``SoSOptions`` into this object. + + Merge two ``SoSOptions`` objects by setting unset or default + values to their value in the ``src`` object. + + :param src: the ``SoSOptions`` object to copy from + :param is_default: ``True`` if new default values are to be set. + """ + for arg in _arg_names: + if not hasattr(src, arg): + continue + if getattr(src, arg) is not None or not skip_default: + self._merge_opt(arg, src, False) + + def dict(self): + """Return this ``SoSOptions`` option values as a dictionary of + argument name to value mappings. + + :returns: a name:value dictionary of option values. + """ + odict = {} + for arg in _arg_names: + value = getattr(self, arg) + # Do not attempt to store preset option values in presets + if arg in ('add_preset', 'del_preset', 'desc', 'note'): + value = None + odict[arg] = value + return odict + + def to_args(self): + """Return command arguments for this object. + + Return a list of the non-default options of this ``SoSOptions`` + object in ``sosreport`` command line argument notation: + + ``["--all-logs", "-vvv"]`` + + """ + def has_value(name, value): + """ Test for non-null option values. + """ + null_values = ("False", "None", "[]", '""', "''", "0") + if not value or value in null_values: + return False + if name in _arg_defaults: + if str(value) == str(_arg_defaults[name]): + return False + return True + + def filter_opt(name, value): + """ Filter out preset and null-valued options. + """ + if name in ("add_preset", "del_preset", "desc", "note"): + return False + return has_value(name, value) + + def argify(name, value): + """ Convert sos option notation to command line arguments. + """ + # Handle --verbosity specially + if name.startswith("verbosity"): + arg = "-" + int(value) * "v" + return arg + + name = name.replace("_", "-") + + value = ",".join(value) if _is_seq(value) else value + + if value is not True: + opt = "%s %s" % (name, value) + else: + opt = name + + arg = "--" + opt if len(opt) > 1 else "-" + opt + return arg + + opt_items = sorted(self.dict().items(), key=lambda x: x[0]) + return [argify(n, v) for (n, v) in opt_items if filter_opt(n, v)] + + +class SosListOption(Action): + + """Allow to specify comma delimited list of plugins""" + + def __call__(self, parser, namespace, values, option_string=None): + items = [opt for opt in values.split(',')] + if getattr(namespace, self.dest): + items += getattr(namespace, self.dest) + setattr(namespace, self.dest, items) |