aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins_overview.py2
-rw-r--r--sos/__init__.py8
-rw-r--r--sos/archive.py106
-rw-r--r--sos/cleaner/__init__.py96
-rw-r--r--sos/cleaner/archives/__init__.py31
-rw-r--r--sos/cleaner/mappings/hostname_map.py4
-rw-r--r--sos/cleaner/mappings/ip_map.py10
-rw-r--r--sos/cleaner/mappings/keyword_map.py2
-rw-r--r--sos/cleaner/mappings/username_map.py2
-rw-r--r--sos/collector/__init__.py139
-rw-r--r--sos/collector/clusters/__init__.py27
-rw-r--r--sos/collector/clusters/kubernetes.py2
-rw-r--r--sos/collector/clusters/ocp.py28
-rw-r--r--sos/collector/clusters/openstack.py4
-rw-r--r--sos/collector/clusters/ovirt.py20
-rw-r--r--sos/collector/clusters/pacemaker.py14
-rw-r--r--sos/collector/exceptions.py12
-rw-r--r--sos/collector/sosnode.py187
-rw-r--r--sos/collector/transports/__init__.py73
-rw-r--r--sos/collector/transports/control_persist.py38
-rw-r--r--sos/collector/transports/local.py4
-rw-r--r--sos/collector/transports/oc.py42
-rw-r--r--sos/component.py10
-rw-r--r--sos/help/__init__.py22
-rw-r--r--sos/missing.py6
-rw-r--r--sos/options.py31
-rw-r--r--sos/policies/__init__.py13
-rw-r--r--sos/policies/auth/__init__.py9
-rw-r--r--sos/policies/distros/__init__.py79
-rw-r--r--sos/policies/distros/redhat.py10
-rw-r--r--sos/policies/init_systems/__init__.py6
-rw-r--r--sos/policies/runtimes/__init__.py21
-rw-r--r--sos/policies/runtimes/crio.py8
-rw-r--r--sos/presets/__init__.py8
-rw-r--r--sos/report/__init__.py215
-rw-r--r--sos/utilities.py21
36 files changed, 630 insertions, 680 deletions
diff --git a/plugins_overview.py b/plugins_overview.py
index fcd0d9d6..520cf300 100644
--- a/plugins_overview.py
+++ b/plugins_overview.py
@@ -45,7 +45,7 @@ def add_valid_item(dest, item):
# method to find in `plugcontent` all items of given method (a_c_s/a_c_o/..)
# split by comma; add each valid item to the `dest` list
def add_all_items(method, dest, wrapopen=r'\(', wrapclose=r'\)'):
- regexp = "%s%s(.*?)%s" % (method, wrapopen, wrapclose)
+ regexp = f"{method}{wrapopen}(.*?){wrapclose}"
for match in re.findall(regexp, plugcontent, flags=re.MULTILINE | re.DOTALL):
# tuple of distros ended by either (class|from|import)
if isinstance(match, tuple):
diff --git a/sos/__init__.py b/sos/__init__.py
index 42912f7f..412a8906 100644
--- a/sos/__init__.py
+++ b/sos/__init__.py
@@ -111,9 +111,9 @@ class SoS():
_com_subparser = self.subparsers.add_parser(
comp,
aliases=self._components[comp][1],
- prog="sos %s" % comp
+ prog=f"sos {comp}"
)
- _com_subparser.usage = "sos %s [options]" % comp
+ _com_subparser.usage = f"sos {comp} [options]"
_com_subparser.register('action', 'extend', SosListOption)
self._add_common_options(_com_subparser)
self._components[comp][0].add_parser_options(parser=_com_subparser)
@@ -174,7 +174,7 @@ class SoS():
"""
_com = self.args.component
if _com not in self._components.keys():
- print("Unknown subcommand '%s' specified" % _com)
+ print(f"Unknown subcommand '{_com}' specified")
try:
_to_load = self._components[_com][0]
if _to_load.root_required and not os.getuid() == 0:
@@ -182,7 +182,7 @@ class SoS():
self._component = _to_load(self.parser, self.args, self.cmdline)
except Exception as err:
- print("Could not initialize '%s': %s" % (_com, err))
+ print(f"Could not initialize '{_com}': {err}")
if self.args.debug:
raise err
sys.exit(1)
diff --git a/sos/archive.py b/sos/archive.py
index 5b038cfa..2ec60f51 100644
--- a/sos/archive.py
+++ b/sos/archive.py
@@ -54,7 +54,7 @@ class Archive(object):
_path_lock = Lock()
def _format_msg(self, msg):
- return "[archive:%s] %s" % (self.archive_type(), msg)
+ return f"[archive:{self.archive_type()}] {msg}"
def set_debug(self, debug):
self._debug = debug
@@ -151,8 +151,8 @@ class FileCacheArchive(Archive):
self._archive_root = os.path.join(tmpdir, name)
with self._path_lock:
os.makedirs(self._archive_root, 0o700)
- self.log_info("initialised empty FileCacheArchive at '%s'" %
- (self._archive_root,))
+ self.log_info("initialised empty FileCacheArchive at "
+ f"'{self._archive_root}'")
def dest_path(self, name):
if os.path.isabs(name):
@@ -187,7 +187,7 @@ class FileCacheArchive(Archive):
or more symbolic links in intermediate components
of the path have altered the path destination.
"""
- self.log_debug("Making leading paths for %s" % src)
+ self.log_debug(f"Making leading paths for {src}")
root = self._archive_root
dest = src
@@ -227,7 +227,7 @@ class FileCacheArchive(Archive):
src_path = os.path.join(src_path, comp)
if not os.path.exists(abs_path):
- self.log_debug("Making path %s" % abs_path)
+ self.log_debug(f"Making path {abs_path}")
if os.path.islink(src_path) and os.path.isdir(src_path):
target = os.readlink(src_path)
@@ -248,11 +248,11 @@ class FileCacheArchive(Archive):
if os.path.isabs(target):
target = os.path.relpath(target, target_dir)
- self.log_debug("Making symlink '%s' -> '%s'" %
- (abs_path, target))
+ self.log_debug(f"Making symlink '{abs_path}' -> "
+ f"'{target}'")
os.symlink(target, abs_path)
else:
- self.log_debug("Making directory %s" % abs_path)
+ self.log_debug(f"Making directory {abs_path}")
os.mkdir(abs_path, mode)
dest = src_path
@@ -299,8 +299,8 @@ class FileCacheArchive(Archive):
# Check containing directory presence and path type
if os.path.exists(dest_dir) and not os.path.isdir(dest_dir):
- raise ValueError("path '%s' exists and is not a directory" %
- dest_dir)
+ raise ValueError(f"path '{dest_dir}' exists and is not a "
+ "directory")
elif not os.path.exists(dest_dir):
src_dir = src if path_type == P_DIR else os.path.split(src)[0]
self._make_leading_paths(src_dir)
@@ -344,8 +344,7 @@ class FileCacheArchive(Archive):
shutil.copystat(src, dest)
os.chown(dest, stat.st_uid, stat.st_gid)
except Exception as e:
- self.log_debug("caught '%s' setting attributes of '%s'"
- % (e, dest))
+ self.log_debug(f"caught '{e}' setting attributes of '{dest}'")
def add_file(self, src, dest=None, force=False):
with self._path_lock:
@@ -367,10 +366,10 @@ class FileCacheArchive(Archive):
if src.startswith("/sys/") or src.startswith("/proc/"):
pass
else:
- self.log_info("File %s not collected: '%s'" % (src, e))
+ self.log_info(f"File {src} not collected: '{e}'")
self._copy_attributes(src, dest)
- file_name = "'%s'" % src
+ file_name = f"'{src}'"
else:
# Open file case: first rewind the file to obtain
# everything written to it.
@@ -380,8 +379,8 @@ class FileCacheArchive(Archive):
f.write(line)
file_name = "open file"
- self.log_debug("added %s to FileCacheArchive '%s'" %
- (file_name, self._archive_root))
+ self.log_debug(f"added {file_name} to FileCacheArchive "
+ f"'{self._archive_root}'")
def add_string(self, content, dest, mode='w'):
with self._path_lock:
@@ -399,8 +398,8 @@ class FileCacheArchive(Archive):
f.write(content)
if os.path.exists(src):
self._copy_attributes(src, dest)
- self.log_debug("added string at '%s' to FileCacheArchive '%s'"
- % (src, self._archive_root))
+ self.log_debug(f"added string at '{src}' to FileCacheArchive "
+ f"'{self._archive_root}'")
def add_binary(self, content, dest):
with self._path_lock:
@@ -410,11 +409,11 @@ class FileCacheArchive(Archive):
with codecs.open(dest, 'wb', encoding=None) as f:
f.write(content)
- self.log_debug("added binary content at '%s' to archive '%s'"
- % (dest, self._archive_root))
+ self.log_debug(f"added binary content at '{dest}' to archive "
+ f"'{self._archive_root}'")
def add_link(self, source, link_name):
- self.log_debug("adding symlink at '%s' -> '%s'" % (link_name, source))
+ self.log_debug(f"adding symlink at '{link_name}' -> '{source}'")
with self._path_lock:
dest = self.check_path(link_name, P_LINK)
if not dest:
@@ -422,14 +421,14 @@ class FileCacheArchive(Archive):
if not os.path.lexists(dest):
os.symlink(source, dest)
- self.log_debug("added symlink at '%s' to '%s' in archive '%s'"
- % (dest, source, self._archive_root))
+ self.log_debug(f"added symlink at '{dest}' to '{source}' in "
+ f"archive '{self._archive_root}'")
# Follow-up must be outside the path lock: we recurse into
# other monitor methods that will attempt to reacquire it.
- self.log_debug("Link follow up: source=%s link_name=%s dest=%s" %
- (source, link_name, dest))
+ self.log_debug(f"Link follow up: source={source} link_name={link_name}"
+ f" dest={dest}")
source_dir = os.path.dirname(link_name)
host_path_name = os.path.realpath(os.path.join(source_dir, source))
@@ -468,21 +467,21 @@ class FileCacheArchive(Archive):
source = os.path.join(dest_dir, os.readlink(host_path_name))
source = os.path.relpath(source, dest_dir)
if is_loop(link_name, source):
- self.log_debug("Link '%s' - '%s' loops: skipping..." %
- (link_name, source))
+ self.log_debug(f"Link '{link_name}' - '{source}' loops: "
+ "skipping...")
return
- self.log_debug("Adding link %s -> %s for link follow up" %
- (link_name, source))
+ self.log_debug(f"Adding link {link_name} -> {source} for link "
+ "follow up")
self.add_link(source, link_name)
elif os.path.isdir(host_path_name):
- self.log_debug("Adding dir %s for link follow up" % source)
+ self.log_debug(f"Adding dir {source} for link follow up")
self.add_dir(host_path_name)
elif os.path.isfile(host_path_name):
- self.log_debug("Adding file %s for link follow up" % source)
+ self.log_debug(f"Adding file {source} for link follow up")
self.add_file(host_path_name)
else:
- self.log_debug("No link follow up: source=%s link_name=%s" %
- (source, link_name))
+ self.log_debug(f"No link follow up: source={source} "
+ f"link_name={link_name}")
def add_dir(self, path):
"""Create a directory in the archive.
@@ -504,7 +503,7 @@ class FileCacheArchive(Archive):
except OSError as e:
if e.errno == errno.EPERM:
msg = "Operation not permitted"
- self.log_info("add_node: %s - mknod '%s'" % (msg, dest))
+ self.log_info(f"add_node: {msg} - mknod '{dest}'")
return
raise e
self._copy_attributes(path, dest)
@@ -528,8 +527,8 @@ class FileCacheArchive(Archive):
Used by sos.sosreport to set up sos_* directories.
"""
os.makedirs(os.path.join(self._archive_root, path), mode=mode)
- self.log_debug("created directory at '%s' in FileCacheArchive '%s'"
- % (path, self._archive_root))
+ self.log_debug(f"created directory at '{path}' in FileCacheArchive "
+ f"'{self._archive_root}'")
def open_file(self, path):
path = self.dest_path(path)
@@ -600,25 +599,24 @@ class FileCacheArchive(Archive):
return replacements
def finalize(self, method):
- self.log_info("finalizing archive '%s' using method '%s'"
- % (self._archive_root, method))
+ self.log_info(f"finalizing archive '{self._archive_root}' using method"
+ f" '{method}'")
try:
res = self._build_archive(method)
except Exception as err:
- self.log_error("An error occurred compressing the archive: %s"
- % err)
+ self.log_error(f"An error occurred compressing the archive: {err}")
return self.name()
self.cleanup()
- self.log_info("built archive at '%s' (size=%d)" % (self._archive_name,
- os.stat(self._archive_name).st_size))
+ self.log_info(f"built archive at '{self._archive_name}' "
+ f"(size={os.stat(self._archive_name).st_size})")
if self.enc_opts['encrypt']:
try:
return self._encrypt(res)
except Exception as e:
exp_msg = "An error occurred encrypting the archive:"
- self.log_error("%s %s" % (exp_msg, e))
+ self.log_error(f"{exp_msg} {e}")
return res
else:
return res
@@ -637,20 +635,20 @@ class FileCacheArchive(Archive):
"""
arc_name = archive.replace("sosreport-", "secured-sosreport-")
arc_name += ".gpg"
- enc_cmd = "gpg --batch -o %s " % arc_name
+ enc_cmd = f"gpg --batch -o {arc_name} "
env = None
if self.enc_opts["key"]:
# need to assume a trusted key here to be able to encrypt the
# archive non-interactively
- enc_cmd += "--trust-model always -e -r %s " % self.enc_opts["key"]
+ enc_cmd += f"--trust-model always -e -r {self.enc_opts['key']} "
enc_cmd += archive
if self.enc_opts["password"]:
# prevent change of gpg options using a long password, but also
# prevent the addition of quote characters to the passphrase
- passwd = "%s" % self.enc_opts["password"].replace('\'"', '')
+ passwd = self.enc_opts['password'].replace('\'"', '')
env = {"sos_gpg": passwd}
enc_cmd += "-c --passphrase-fd 0 "
- enc_cmd = "/bin/bash -c \"echo $sos_gpg | %s\"" % enc_cmd
+ enc_cmd = f"/bin/bash -c \"echo $sos_gpg | {enc_cmd}\""
enc_cmd += archive
r = sos_get_command_output(enc_cmd, timeout=0, env=env)
if r["status"] == 0:
@@ -663,7 +661,7 @@ class FileCacheArchive(Archive):
else:
# TODO: report the actual error from gpg. Currently, we cannot as
# sos_get_command_output() does not capture stderr
- msg = "gpg exited with code %s" % r["status"]
+ msg = f"gpg exited with code {r['status']}"
raise Exception(msg)
@@ -684,8 +682,8 @@ class TarFileArchive(FileCacheArchive):
def set_tarinfo_from_stat(self, tar_info, fstat, mode=None):
tar_info.mtime = fstat.st_mtime
- tar_info.pax_headers['atime'] = "%.9f" % fstat.st_atime
- tar_info.pax_headers['ctime'] = "%.9f" % fstat.st_ctime
+ tar_info.pax_headers['atime'] = f"{fstat.st_atime:.9f}"
+ tar_info.pax_headers['ctime'] = f"{fstat.st_ctime:.9f}"
if mode:
tar_info.mode = mode
else:
@@ -721,7 +719,7 @@ class TarFileArchive(FileCacheArchive):
return None
def name(self):
- return "%s.%s" % (self._archive_root, self._suffix)
+ return f"{self._archive_root}.{self._suffix}"
def name_max(self):
# GNU Tar format supports unlimited file name length. Just return
@@ -732,14 +730,14 @@ class TarFileArchive(FileCacheArchive):
if method == 'auto':
method = 'xz' if find_spec('lzma') is not None else 'gzip'
_comp_mode = method.strip('ip')
- self._archive_name = self._archive_name + ".%s" % _comp_mode
+ self._archive_name = f"{self._archive_name}.{_comp_mode}"
# tarfile does not currently have a consistent way to define comnpress
# level for both xz and gzip ('preset' for xz, 'compresslevel' for gz)
if method == 'gzip':
kwargs = {'compresslevel': 6}
else:
kwargs = {'preset': 3}
- tar = tarfile.open(self._archive_name, mode="w:%s" % _comp_mode,
+ tar = tarfile.open(self._archive_name, mode=f"w:{_comp_mode}",
**kwargs)
# add commonly reviewed files first, so that they can be more easily
# read from memory without needing to extract the whole archive
@@ -755,7 +753,7 @@ class TarFileArchive(FileCacheArchive):
tar.add(self._archive_root, arcname=self._name,
filter=self.copy_permissions_filter)
tar.close()
- self._suffix += ".%s" % _comp_mode
+ self._suffix += f".{_comp_mode}"
return self.name()
diff --git a/sos/cleaner/__init__.py b/sos/cleaner/__init__.py
index c4fd53af..1379abf3 100644
--- a/sos/cleaner/__init__.py
+++ b/sos/cleaner/__init__.py
@@ -141,11 +141,10 @@ class SoSCleaner(SoSComponent):
for _loaded in self.parsers:
_loaded_name = _loaded.name.lower().split('parser')[0].strip()
if _parser.lower().strip() == _loaded_name:
- self.log_info("Disabling parser: %s" % _loaded_name)
+ self.log_info(f"Disabling parser: {_loaded_name}")
self.ui_log.warning(
- "Disabling the '%s' parser. Be aware that this may "
- "leave sensitive plain-text data in the archive."
- % _parser
+ f"Disabling the '{_parser}' parser. Be aware that this"
+ " may leave sensitive plain-text data in the archive."
)
self.parsers.remove(_loaded)
@@ -161,11 +160,11 @@ class SoSCleaner(SoSComponent):
]
self.nested_archive = None
- self.log_info("Cleaner initialized. From cmdline: %s"
- % self.from_cmdline)
+ self.log_info(
+ f"Cleaner initialized. From cmdline: {self.from_cmdline}")
def _fmt_log_msg(self, msg, caller=None):
- return "[cleaner%s] %s" % (":%s" % caller if caller else '', msg)
+ return f"[cleaner{f':{caller}' if caller else ''}] {msg}"
def log_debug(self, msg, caller=None):
self.soslog.debug(self._fmt_log_msg(msg, caller))
@@ -197,13 +196,13 @@ class SoSCleaner(SoSComponent):
_conf = {}
default_map = '/etc/sos/cleaner/default_mapping'
if os.path.isdir(self.opts.map_file):
- raise Exception("Requested map file %s is a directory"
- % self.opts.map_file)
+ raise Exception(f"Requested map file {self.opts.map_file} is a "
+ "directory")
if not os.path.exists(self.opts.map_file):
if self.opts.map_file != default_map:
self.log_error(
- "ERROR: map file %s does not exist, will not load any "
- "obfuscation matches" % self.opts.map_file)
+ f"ERROR: map file {self.opts.map_file} does not exist, "
+ "will not load any obfuscation matches")
else:
with open(self.opts.map_file, 'r') as mf:
try:
@@ -212,8 +211,8 @@ class SoSCleaner(SoSComponent):
self.log_error("ERROR: Unable to parse map file, json is "
"malformed. Will not load any mappings.")
except Exception as err:
- self.log_error("ERROR: Could not load '%s': %s"
- % (self.opts.map_file, err))
+ self.log_error("ERROR: Could not load "
+ f"'{self.opts.map_file}': {err}")
return _conf
def print_disclaimer(self):
@@ -234,7 +233,7 @@ Users should review any resulting data and/or archives generated or processed \
by this utility for remaining sensitive content before being passed to a \
third party.
""")
- self.ui_log.info("\nsos clean (version %s)\n" % __version__)
+ self.ui_log.info(f"\nsos clean (version {__version__})\n")
self.ui_log.info(msg)
if not self.opts.batch:
try:
@@ -362,8 +361,8 @@ third party.
self.print_disclaimer()
self.report_paths = []
if not os.path.exists(self.opts.target):
- self.ui_log.error("Invalid target: no such file or directory %s"
- % self.opts.target)
+ self.ui_log.error("Invalid target: no such file or directory "
+ f"{self.opts.target}")
self._exit(1)
self.inspect_target_archive()
@@ -389,8 +388,8 @@ third party.
self.ui_log.info("No reports obfuscated, aborting...\n")
self._exit(1)
- self.ui_log.info("\nSuccessfully obfuscated %s report(s)\n"
- % len(self.completed_reports))
+ self.ui_log.info("\nSuccessfully obfuscated "
+ f"{len(self.completed_reports)} report(s)\n")
_map = self.compile_mapping_dict()
map_path = self.write_map_for_archive(_map)
@@ -410,7 +409,7 @@ third party.
checksum = self.get_new_checksum(arc.final_archive_path)
if checksum is not None:
chksum_name = self.obfuscate_string(
- "%s.%s" % (arc_path.split('/')[-1], self.hash_name)
+ f"{arc_path.split('/')[-1]}.{self.hash_name}"
)
with open(os.path.join(self.sys_tmp, chksum_name), 'w') as cf:
cf.write(checksum)
@@ -452,7 +451,7 @@ third party.
arc_dest = archive.final_archive_path.split('/')[-1]
checksum = self.get_new_checksum(archive.final_archive_path)
if checksum is not None:
- dname = "checksums/%s.%s" % (arc_dest, self.hash_name)
+ dname = f"checksums/{arc_dest}.{self.hash_name}"
self.archive.add_string(checksum, dest=dname)
for dirn, dirs, files in os.walk(self.nested_archive.extracted_path):
for filename in files:
@@ -490,11 +489,11 @@ third party.
try:
map_path = os.path.join(
self.sys_tmp,
- self.obfuscate_string("%s-private_map" % self.arc_name)
+ self.obfuscate_string(f"{self.arc_name}-private_map")
)
return self.write_map_to_file(_map, map_path)
except Exception as err:
- self.log_error("Could not write private map file: %s" % err)
+ self.log_error(f"Could not write private map file: {err}")
return None
def write_map_for_config(self, _map):
@@ -509,10 +508,9 @@ third party.
try:
os.makedirs(cleaner_dir, exist_ok=True)
self.write_map_to_file(_map, self.opts.map_file)
- self.log_debug("Wrote mapping to %s" % self.opts.map_file)
+ self.log_debug(f"Wrote mapping to {self.opts.map_file}")
except Exception as err:
- self.log_error("Could not update mapping config file: %s"
- % err)
+ self.log_error(f"Could not update mapping config file: {err}")
def write_cleaner_log(self, archive=False):
"""When invoked via the command line, the logging from SoSCleaner will
@@ -520,7 +518,7 @@ third party.
separately to disk
"""
log_name = os.path.join(
- self.sys_tmp, "%s-obfuscation.log" % self.arc_name
+ self.sys_tmp, f"{self.arc_name}-obfuscation.log"
)
with open(log_name, 'w') as logfile:
self.sos_log_file.seek(0)
@@ -546,7 +544,7 @@ third party.
digest.update(hashdata)
return digest.hexdigest() + '\n'
except Exception as err:
- self.log_debug("Could not generate new checksum: %s" % err)
+ self.log_debug(f"Could not generate new checksum: {err}")
return None
def obfuscate_report_paths(self):
@@ -558,8 +556,8 @@ third party.
"""
try:
msg = (
- "Found %s total reports to obfuscate, processing up to %s "
- "concurrently\n" % (len(self.report_paths), self.opts.jobs)
+ f"Found {len(self.report_paths)} total reports to obfuscate, "
+ f"processing up to {self.opts.jobs} concurrently\n"
)
self.ui_log.info(msg)
if self.opts.keep_binary_files:
@@ -698,19 +696,18 @@ third party.
if count:
archive.update_sub_count(short_name, count)
except Exception as err:
- self.log_debug("Unable to parse file %s: %s"
- % (short_name, err))
+ self.log_debug(f"Unable to parse file {short_name}: {err}")
try:
self.obfuscate_directory_names(archive)
except Exception as err:
- self.log_info("Failed to obfuscate directories: %s" % err,
+ self.log_info(f"Failed to obfuscate directories: {err}",
caller=archive.archive_name)
try:
self.obfuscate_symlinks(archive)
except Exception as err:
- self.log_info("Failed to obfuscate symlinks: %s" % err,
+ self.log_info(f"Failed to obfuscate symlinks: {err}",
caller=archive.archive_name)
# if the archive was already a tarball, repack it
@@ -724,10 +721,10 @@ third party.
)
archive.compress(method)
except Exception as err:
- self.log_debug("Archive %s failed to compress: %s"
- % (archive.archive_name, err))
- archive.report_msg("Failed to re-compress archive: %s"
- % err)
+ self.log_debug(f"Archive {archive.archive_name} failed"
+ f" to compress: {err}")
+ archive.report_msg(
+ f"Failed to re-compress archive: {err}")
return
self.completed_reports.append(archive)
@@ -740,11 +737,11 @@ third party.
if archive.removed_file_count:
rmsg = " [removed %s unprocessable files]"
rmsg = rmsg % archive.removed_file_count
- archive.report_msg("Obfuscation completed%s" % rmsg)
+ archive.report_msg(f"Obfuscation completed{rmsg}")
except Exception as err:
- self.ui_log.info("Exception while processing %s: %s"
- % (archive.archive_name, err))
+ self.ui_log.info("Exception while processing "
+ f"{archive.archive_name}: {err}")
def obfuscate_file(self, filename, short_name=None, arc_name=None):
"""Obfuscate and individual file, line by line.
@@ -769,7 +766,7 @@ third party.
if not os.path.islink(filename):
# don't run the obfuscation on the link, but on the actual file
# at some other point.
- self.log_debug("Obfuscating %s" % short_name or filename,
+ self.log_debug(f"Obfuscating {short_name or filename}",
caller=arc_name)
tfile = tempfile.NamedTemporaryFile(mode='w', dir=self.tmpdir)
_parsers = [
@@ -785,8 +782,8 @@ third party.
subs += count
tfile.write(line)
except Exception as err:
- self.log_debug("Unable to obfuscate %s: %s"
- % (short_name, err), caller=arc_name)
+ self.log_debug(f"Unable to obfuscate {short_name}: "
+ f"{err}", caller=arc_name)
tfile.seek(0)
if subs:
shutil.copyfile(tfile.name, filename)
@@ -831,7 +828,7 @@ third party.
try:
# relative name of the symlink in the archive
_sym = symlink.split(archive.extracted_path)[1].lstrip('/')
- self.log_debug("Obfuscating symlink %s" % _sym,
+ self.log_debug(f"Obfuscating symlink {_sym}",
caller=archive.archive_name)
# current target of symlink, again relative to the archive
_target = os.readlink(symlink)
@@ -848,15 +845,14 @@ third party.
os.remove(symlink)
os.symlink(_ob_target, _ob_sym_name)
except Exception as err:
- self.log_info("Error obfuscating symlink '%s': %s"
- % (symlink, err))
+ self.log_info(f"Error obfuscating symlink '{symlink}': {err}")
def obfuscate_directory_names(self, archive):
"""For all directories that exist within the archive, obfuscate the
directory name if it contains sensitive strings found during execution
"""
- self.log_info("Obfuscating directory names in archive %s"
- % archive.archive_name)
+ self.log_info("Obfuscating directory names in archive "
+ f"{archive.archive_name}")
for dirpath in sorted(archive.get_directory_list(), reverse=True):
for _name in os.listdir(dirpath):
_dirname = os.path.join(dirpath, _name)
@@ -877,7 +873,7 @@ third party.
try:
string_data = parser.parse_string_for_keys(string_data)
except Exception as err:
- self.log_info("Error obfuscating string data: %s" % err)
+ self.log_info(f"Error obfuscating string data: {err}")
return string_data
def obfuscate_line(self, line, parsers=None):
@@ -905,7 +901,7 @@ third party.
line, _count = parser.parse_line(line)
count += _count
except Exception as err:
- self.log_debug("failed to parse line: %s" % err, parser.name)
+ self.log_debug(f"failed to parse line: {err}", parser.name)
return line, count
def write_stats_to_manifest(self):
diff --git a/sos/cleaner/archives/__init__.py b/sos/cleaner/archives/__init__.py
index 0fa1ef43..35dc7b8e 100644
--- a/sos/cleaner/archives/__init__.py
+++ b/sos/cleaner/archives/__init__.py
@@ -65,8 +65,7 @@ class SoSObfuscationArchive():
self._load_self()
self.archive_root = ''
self.log_info(
- "Loaded %s as type %s"
- % (self.archive_path, self.description)
+ f"Loaded {self.archive_path} as type {self.description}"
)
@classmethod
@@ -112,7 +111,7 @@ class SoSObfuscationArchive():
self.ui_log.info(f"{self.ui_name + ' :':<50} {msg}")
def _fmt_log_msg(self, msg):
- return "[cleaner:%s] %s" % (self.archive_name, msg)
+ return f"[cleaner:{self.archive_name}] {msg}"
def log_debug(self, msg):
self.soslog.debug(self._fmt_log_msg(msg))
@@ -148,7 +147,7 @@ class SoSObfuscationArchive():
full_fname = self.get_file_path(fname)
# don't call a blank remove() here
if full_fname:
- self.log_info("Removing binary file '%s' from archive" % fname)
+ self.log_info(f"Removing binary file '{fname}' from archive")
os.remove(full_fname)
self.removed_file_count += 1
@@ -175,7 +174,7 @@ class SoSObfuscationArchive():
return self.tarobj.extractfile(filename).read().decode('utf-8')
except KeyError:
self.log_debug(
- "Unable to retrieve %s: no such file in archive" % fname
+ f"Unable to retrieve {fname}: no such file in archive"
)
return ''
else:
@@ -215,13 +214,13 @@ class SoSObfuscationArchive():
if (not os.access(fname, os.R_OK) or not
os.access(fname, os.W_OK)):
self.log_debug(
- "Adding owner rw permissions to %s"
- % fname.split(self.archive_path)[-1]
+ "Adding owner rw permissions to "
+ f"{fname.split(self.archive_path)[-1]}"
)
os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR)
except Exception as err:
- self.log_debug("Error while trying to set perms: %s" % err)
- self.log_debug("Extracted path is %s" % self.extracted_path)
+ self.log_debug(f"Error while trying to set perms: {err}")
+ self.log_debug(f"Extracted path is {self.extracted_path}")
def rename_top_dir(self, new_name):
"""Rename the top-level directory to new_name, which should be an
@@ -251,13 +250,13 @@ class SoSObfuscationArchive():
tarpath = self.extracted_path + '-obfuscated.tar'
compr_args = {}
if method:
- mode += ":%s" % method
- tarpath += ".%s" % method
+ mode += f":{method}"
+ tarpath += f".{method}"
if method == 'xz':
compr_args = {'preset': 3}
else:
compr_args = {'compresslevel': 6}
- self.log_debug("Building tar file %s" % tarpath)
+ self.log_debug(f"Building tar file {tarpath}")
tar = tarfile.open(tarpath, mode=mode, **compr_args)
tar.add(self.extracted_path,
arcname=os.path.split(self.archive_name)[1])
@@ -271,13 +270,13 @@ class SoSObfuscationArchive():
try:
self.final_archive_path = self.build_tar_file(method)
except Exception as err:
- self.log_debug("Exception while re-compressing archive: %s" % err)
+ self.log_debug(f"Exception while re-compressing archive: {err}")
raise
- self.log_debug("Compressed to %s" % self.final_archive_path)
+ self.log_debug(f"Compressed to {self.final_archive_path}")
try:
self.remove_extracted_path()
except Exception as err:
- self.log_debug("Failed to remove extraction directory: %s" % err)
+ self.log_debug(f"Failed to remove extraction directory: {err}")
self.report_msg('Failed to remove temporary extraction directory')
def remove_extracted_path(self):
@@ -291,7 +290,7 @@ class SoSObfuscationArchive():
os.remove(name)
else:
shutil.rmtree(name)
- self.log_debug("Removing %s" % self.extracted_path)
+ self.log_debug(f"Removing {self.extracted_path}")
# pylint: disable-next=deprecated-argument
shutil.rmtree(self.extracted_path, onerror=force_delete_file)
diff --git a/sos/cleaner/mappings/hostname_map.py b/sos/cleaner/mappings/hostname_map.py
index dbb97d98..659ba4af 100644
--- a/sos/cleaner/mappings/hostname_map.py
+++ b/sos/cleaner/mappings/hostname_map.py
@@ -226,7 +226,7 @@ class SoSHostnameMap(SoSMap):
if not hostname or hostname in self.skip_keys:
return hostname
if hostname not in self.dataset:
- ob_host = "host%s" % self.host_count
+ ob_host = f"host{self.host_count}"
self.hosts[hostname] = ob_host
self.host_count += 1
self.dataset[hostname] = ob_host
@@ -252,6 +252,6 @@ class SoSHostnameMap(SoSMap):
"""Generate an obfuscated domain for each subdomain name given
"""
if dname not in self._domains:
- self._domains[dname] = "obfuscateddomain%s" % self.domain_count
+ self._domains[dname] = f"obfuscateddomain{self.domain_count}"
self.domain_count += 1
return self._domains[dname]
diff --git a/sos/cleaner/mappings/ip_map.py b/sos/cleaner/mappings/ip_map.py
index 55a841a5..11d779b5 100644
--- a/sos/cleaner/mappings/ip_map.py
+++ b/sos/cleaner/mappings/ip_map.py
@@ -156,7 +156,7 @@ class SoSIPMap(SoSMap):
if not self.ip_in_dataset(_ip):
# the ipaddress module does not assign the network's
# netmask to hosts in the hosts() generator for some reason
- return "%s/%s" % (str(_ip), _obf_network.prefixlen)
+ return f"{str(_ip)}/{_obf_network.prefixlen}"
# ip is a single ip address without the netmask
return self._new_obfuscated_single_address()
@@ -166,7 +166,7 @@ class SoSIPMap(SoSMap):
_octets = []
for i in range(0, 4):
_octets.append(random.randint(11, 99))
- return "%s.%s.%s.%s" % tuple(_octets)
+ return f"{_octets[0]}.{_octets[1]}.{_octets[2]}.{_octets[3]}"
_addr = _gen_address()
if _addr in self.dataset.values():
@@ -187,11 +187,9 @@ class SoSIPMap(SoSMap):
if isinstance(network, ipaddress.IPv4Network):
if self.network_first_octet in self.skip_network_octets:
self.network_first_octet += 1
- _obf_address = "%s.0.0.0" % self.network_first_octet
+ _obf_address = f"{self.network_first_octet}.0.0.0"
_obf_mask = network.with_netmask.split('/')[1]
- _obf_network = ipaddress.IPv4Network(
- "%s/%s" % (_obf_address, _obf_mask)
- )
+ _obf_network = ipaddress.IPv4Network(f"{_obf_address}/{_obf_mask}")
self.network_first_octet += 1
if isinstance(network, ipaddress.IPv6Network):
diff --git a/sos/cleaner/mappings/keyword_map.py b/sos/cleaner/mappings/keyword_map.py
index 1fd1a73a..cbeb0c4e 100644
--- a/sos/cleaner/mappings/keyword_map.py
+++ b/sos/cleaner/mappings/keyword_map.py
@@ -25,7 +25,7 @@ class SoSKeywordMap(SoSMap):
word_count = 0
def sanitize_item(self, item):
- _ob_item = "obfuscatedword%s" % self.word_count
+ _ob_item = f"obfuscatedword{self.word_count}"
self.word_count += 1
if _ob_item in self.dataset.values():
return self.sanitize_item(item)
diff --git a/sos/cleaner/mappings/username_map.py b/sos/cleaner/mappings/username_map.py
index f6eedb34..f03dd510 100644
--- a/sos/cleaner/mappings/username_map.py
+++ b/sos/cleaner/mappings/username_map.py
@@ -27,7 +27,7 @@ class SoSUsernameMap(SoSMap):
def sanitize_item(self, username):
"""Obfuscate a new username not currently found in the map
"""
- ob_name = "obfuscateduser%s" % self.name_count
+ ob_name = f"obfuscateduser{self.name_count}"
self.name_count += 1
if ob_name in self.dataset.values():
return self.sanitize_item(username.lower())
diff --git a/sos/collector/__init__.py b/sos/collector/__init__.py
index 023389d1..7d63e5e9 100644
--- a/sos/collector/__init__.py
+++ b/sos/collector/__init__.py
@@ -186,9 +186,9 @@ class SoSCollector(SoSComponent):
try:
self.parse_node_strings()
self.parse_cluster_options()
- self.log_debug('Executing %s' % ' '.join(s for s in sys.argv))
- self.log_debug("Found cluster profiles: %s"
- % self.clusters.keys())
+ self.log_debug(f'Executing {" ".join(s for s in sys.argv)}')
+ self.log_debug(
+ f"Found cluster profiles: {self.clusters.keys()}")
self.verify_cluster_options()
except KeyboardInterrupt:
@@ -236,7 +236,7 @@ class SoSCollector(SoSComponent):
if '__' in pyfile:
continue
fname, ext = os.path.splitext(pyfile)
- modname = 'sos.collector.%s.%s' % (modulename, fname)
+ modname = f'sos.collector.{modulename}.{fname}'
modules.extend(cls._import_modules(modname))
return modules
@@ -601,8 +601,8 @@ class SoSCollector(SoSComponent):
opt.value = self._validate_option(option, opt)
break
if not match:
- self.exit('Unknown cluster option provided: %s.%s'
- % (opt.cluster, opt.name), 1)
+ self.exit('Unknown cluster option provided: '
+ f'{opt.cluster}.{opt.name}', 1)
def _validate_option(self, default, cli):
"""Checks to make sure that the option given on the CLI is valid.
@@ -645,7 +645,7 @@ class SoSCollector(SoSComponent):
def log_debug(self, msg):
"""Log debug message to both console and log file"""
caller = inspect.stack()[1][3]
- msg = '[sos_collector:%s] %s' % (caller, msg)
+ msg = f'[sos_collector:{caller}] {msg}'
self.soslog.debug(msg)
def list_options(self):
@@ -696,9 +696,9 @@ class SoSCollector(SoSComponent):
"""Generates a name for the tarball archive"""
nstr = 'sos-collector'
if self.opts.label:
- nstr += '-%s' % self.opts.label
+ nstr += f'-{self.opts.label}'
if self.opts.case_id:
- nstr += '-%s' % self.opts.case_id
+ nstr += f'-{self.opts.case_id}'
dt = datetime.strftime(datetime.now(), '%Y-%m-%d')
try:
@@ -707,7 +707,7 @@ class SoSCollector(SoSComponent):
self.log_debug(f"Could not cast to ascii_lowercase: {err}")
rand = ''.join(random.choice(string.lowercase) for x in range(5))
- return '%s-%s-%s' % (nstr, dt, rand)
+ return f'{nstr}-{dt}-{rand}'
def _get_archive_path(self):
"""Returns the path, including filename, of the tarball we build
@@ -737,7 +737,7 @@ class SoSCollector(SoSComponent):
grp = self.opts.group
paths = [
grp,
- os.path.join(Path.home(), '.config/sos/groups.d/%s' % grp),
+ os.path.join(Path.home(), f'.config/sos/groups.d/{grp}'),
os.path.join(COLLECTOR_CONFIG_DIR, grp)
]
@@ -747,19 +747,19 @@ class SoSCollector(SoSComponent):
fname = path
break
if fname is None:
- raise OSError("no group definition for %s" % grp)
+ raise OSError(f"no group definition for {grp}")
- self.log_debug("Loading host group %s" % fname)
+ self.log_debug(f"Loading host group {fname}")
with open(fname, 'r') as hf:
_group = json.load(hf)
for key in ['primary', 'cluster_type']:
if _group[key]:
- self.log_debug("Setting option '%s' to '%s' per host group"
- % (key, _group[key]))
+ self.log_debug(f"Setting option '{key}' to '{_group[key]}'"
+ "per host group")
setattr(self.opts, key, _group[key])
if _group['nodes']:
- self.log_debug("Adding %s to node list" % _group['nodes'])
+ self.log_debug(f"Adding {_group['nodes']} to node list")
self.opts.nodes.extend(_group['nodes'])
def write_host_group(self):
@@ -802,8 +802,8 @@ class SoSCollector(SoSComponent):
self.opts.primary))
and not self.opts.batch):
self.log_debug('password specified, not using SSH keys')
- msg = ('Provide the SSH password for user %s: '
- % self.opts.ssh_user)
+ msg = ('Provide the SSH password for user '
+ f'{self.opts.ssh_user}: ')
self.opts.password = getpass(prompt=msg)
if ((self.commons['need_sudo'] and not self.opts.nopasswd_sudo)
@@ -812,8 +812,8 @@ class SoSCollector(SoSComponent):
self.log_debug('non-root user specified, will request '
'sudo password')
msg = ('A non-root user has been provided. Provide sudo '
- 'password for %s on remote nodes: '
- % self.opts.ssh_user)
+ f'password for {self.opts.ssh_user} on remote '
+ 'nodes: ')
self.opts.sudo_pw = getpass(prompt=msg)
else:
if not self.opts.nopasswd_sudo:
@@ -829,8 +829,8 @@ class SoSCollector(SoSComponent):
"to become root remotely.")
self.exit(msg, 1)
self.log_debug('non-root user asking to become root remotely')
- msg = ('User %s will attempt to become root. '
- 'Provide root password: ' % self.opts.ssh_user)
+ msg = (f'User {self.opts.ssh_user} will attempt to become '
+ 'root. Provide root password: ')
self.opts.root_password = getpass(prompt=msg)
self.commons['need_sudo'] = False
else:
@@ -842,8 +842,8 @@ class SoSCollector(SoSComponent):
try:
self._load_group_config()
except Exception as err:
- msg = ("Could not load specified group %s: %s"
- % (self.opts.group, err))
+ msg = (f"Could not load specified group {self.opts.group}: "
+ f"{err}")
self.exit(msg, 1)
try:
@@ -883,8 +883,8 @@ class SoSCollector(SoSComponent):
local_sudo=local_sudo,
load_facts=can_run_local)
except Exception as err:
- self.log_debug("Unable to determine local installation: %s" %
- err)
+ self.log_debug("Unable to determine local installation: "
+ f"{err}")
self.exit('Unable to determine local installation. Use the '
'--no-local option if localhost should not be '
'included.\nAborting...\n', 1)
@@ -923,8 +923,9 @@ class SoSCollector(SoSComponent):
self.cluster.setup()
if self.cluster.cluster_ssh_key:
if not self.opts.ssh_key:
- self.log_debug("Updating SSH key to %s per cluster"
- % self.cluster.cluster_ssh_key)
+ self.log_debug(
+ f"Updating SSH key to {self.cluster.cluster_ssh_key} "
+ "per cluster")
self.opts.ssh_key = self.cluster.cluster_ssh_key
self.get_nodes()
@@ -932,9 +933,9 @@ class SoSCollector(SoSComponent):
gname = self.opts.save_group
try:
fname = self.write_host_group()
- self.log_info("Wrote group '%s' to %s" % (gname, fname))
+ self.log_info(f"Wrote group '{gname}' to {fname}")
except Exception as err:
- self.log_error("Could not save group %s: %s" % (gname, err))
+ self.log_error(f"Could not save group {gname}: {err}")
def display_nodes(self):
"""Prints a list of nodes to collect from, if available. If no nodes
@@ -950,11 +951,12 @@ class SoSCollector(SoSComponent):
if self.primary.connected and self.primary.hostname is not None:
if not ((self.primary.local and self.opts.no_local)
or self.cluster.strict_node_list):
- self.ui_log.info('\t%-*s' % (self.commons['hostlen'],
- self.primary.hostname))
+ self.ui_log.info(
+ f"\t{self.primary.hostname:<{self.commons['hostlen']}}"
+ )
for node in sorted(self.node_list):
- self.ui_log.info("\t%-*s" % (self.commons['hostlen'], node))
+ self.ui_log.info(f"\t{node:<{self.commons['hostlen']}}")
self.ui_log.info('')
if not self.opts.batch:
@@ -1004,10 +1006,10 @@ class SoSCollector(SoSComponent):
"""
try:
self.primary = SosNode(self.opts.primary, self.commons)
- self.ui_log.info('Connected to %s, determining cluster type...'
- % self.opts.primary)
+ self.ui_log.info(f'Connected to {self.opts.primary}, determining '
+ 'cluster type...')
except Exception as e:
- self.log_debug('Failed to connect to primary node: %s' % e)
+ self.log_debug(f'Failed to connect to primary node: {e}')
self.exit('Could not connect to primary node. Aborting...', 1)
def determine_cluster(self):
@@ -1025,34 +1027,33 @@ class SoSCollector(SoSComponent):
cluster.primary = self.primary
if cluster.check_enabled():
cname = cluster.__class__.__name__
- self.log_debug("Installation matches %s, checking for layered "
- "profiles" % cname)
+ self.log_debug(f"Installation matches {cname}, checking for "
+ "layered profiles")
for remaining in checks:
if issubclass(remaining.__class__, cluster.__class__):
rname = remaining.__class__.__name__
- self.log_debug("Layered profile %s found. "
- "Checking installation"
- % rname)
+ self.log_debug(f"Layered profile {rname} found. "
+ "Checking installation")
remaining.primary = self.primary
if remaining.check_enabled():
self.log_debug("Installation matches both layered "
- "profile %s and base profile %s, "
- "setting cluster type to layered "
- "profile" % (rname, cname))
+ f"profile {rname} and base profile "
+ f"{cname}, setting cluster type to "
+ "layered profile")
cluster = remaining
break
self.cluster = cluster
self.cluster_type = cluster.name()
self.commons['cluster'] = self.cluster
self.ui_log.info(
- 'Cluster type set to %s' % self.cluster_type)
+ f'Cluster type set to {self.cluster_type}')
break
def get_nodes_from_cluster(self):
"""Collects the list of nodes from the determined cluster cluster"""
if self.cluster_type:
nodes = self.cluster._get_nodes()
- self.log_debug('Node list: %s' % nodes)
+ self.log_debug(f'Node list: {nodes}')
return nodes
return []
@@ -1072,7 +1073,7 @@ class SoSCollector(SoSComponent):
if n == self.primary.hostname or n == self.opts.primary:
self.node_list.remove(n)
self.node_list = list(set(n for n in self.node_list if n))
- self.log_debug('Node list reduced to %s' % self.node_list)
+ self.log_debug(f'Node list reduced to {self.node_list}')
self.collect_md.add_list('node_list', self.node_list)
def compare_node_to_regex(self, node):
@@ -1105,7 +1106,7 @@ class SoSCollector(SoSComponent):
else:
self.node_list = nodes
except Exception as e:
- self.log_debug("Error parsing node list: %s" % e)
+ self.log_debug(f"Error parsing node list: {e}")
self.log_debug('Setting node list to --nodes option')
self.node_list = self.opts.nodes
for node in self.node_list:
@@ -1118,7 +1119,7 @@ class SoSCollector(SoSComponent):
if any(i in node for i in '*\\?()/[]'):
continue
if node not in self.node_list:
- self.log_debug("Force adding %s to node list" % node)
+ self.log_debug(f"Force adding {node} to node list")
self.node_list.append(node)
if not self.primary:
@@ -1177,7 +1178,7 @@ organization before being passed to any third party.
No configuration changes will be made to the system running \
this utility or remote systems that it connects to.
""")
- self.ui_log.info("\nsos-collector (version %s)\n" % __version__)
+ self.ui_log.info(f"\nsos-collector (version {__version__})\n")
intro_msg = self._fmt_msg(disclaimer % self.tmpdir)
self.ui_log.info(intro_msg)
@@ -1230,8 +1231,8 @@ this utility or remote systems that it connects to.
if self.opts.password_per_node:
_nodes = []
for node in nodes:
- msg = ("Please enter the password for %s@%s: "
- % (self.opts.ssh_user, node[0]))
+ msg = (f"Please enter the password for {self.opts.ssh_user}@"
+ f"{node[0]}: ")
node_pwd = getpass(msg)
_nodes.append((node[0], node_pwd))
nodes = _nodes
@@ -1259,10 +1260,9 @@ this utility or remote systems that it connects to.
"Aborting...", 1
)
- self.ui_log.info("\nBeginning collection of sosreports from %s "
- "nodes, collecting a maximum of %s "
- "concurrently\n"
- % (self.report_num, self.opts.jobs))
+ self.ui_log.info("\nBeginning collection of sosreports from "
+ f"{self.report_num} nodes, collecting a maximum "
+ f"of {self.opts.jobs} concurrently\n")
npool = ThreadPoolExecutor(self.opts.jobs)
npool.map(self._finalize_sos_cmd, self.client_list, chunksize=1)
@@ -1274,7 +1274,7 @@ this utility or remote systems that it connects to.
except KeyboardInterrupt:
self.exit("Exiting on user cancel\n", 130, force=True)
except Exception as err:
- msg = 'Could not connect to nodes: %s' % err
+ msg = f'Could not connect to nodes: {err}'
self.exit(msg, 1, force=True)
if hasattr(self.cluster, 'run_extra_cmd'):
@@ -1297,7 +1297,7 @@ this utility or remote systems that it connects to.
self.policy.upload_archive(arc_name)
self.ui_log.info("Uploaded archive successfully")
except Exception as err:
- self.ui_log.error("Upload attempt failed: %s" % err)
+ self.ui_log.error(f"Upload attempt failed: {err}")
def _finalize_sos_cmd(self, client):
"""Calls finalize_sos_cmd() on each node so that we have the final
@@ -1306,8 +1306,8 @@ this utility or remote systems that it connects to.
try:
client.finalize_sos_cmd()
except Exception as err:
- self.log_error("Could not finalize sos command for %s: %s"
- % (client.address, err))
+ self.log_error("Could not finalize sos command for "
+ f"{client.address}: {err}")
def _collect(self, client):
"""Runs sosreport on each node"""
@@ -1320,13 +1320,13 @@ this utility or remote systems that it connects to.
if client.retrieved:
self.retrieved += 1
except Exception as err:
- self.log_error("Error running sosreport: %s" % err)
+ self.log_error(f"Error running sosreport: {err}")
def close_all_connections(self):
"""Close all sessions for nodes"""
for client in self.client_list:
if client.connected:
- self.log_debug('Closing connection to %s' % client.address)
+ self.log_debug(f'Closing connection to {client.address}')
client.disconnect()
def create_cluster_archive(self):
@@ -1355,8 +1355,7 @@ this utility or remote systems that it connects to.
map_file, arc_paths = cleaner.execute()
do_clean = True
except Exception as err:
- self.ui_log.error("ERROR: unable to obfuscate reports: %s"
- % err)
+ self.ui_log.error(f"ERROR: unable to obfuscate reports: {err}")
try:
self.log_info('Creating archive of sosreports...')
@@ -1411,18 +1410,18 @@ this utility or remote systems that it connects to.
# the temp dir it was constructed in
map_name = cleaner.obfuscate_string(
os.path.join(self.sys_tmp,
- "%s_private_map" % self.archive_name)
+ f"{self.archive_name}_private_map")
)
os.rename(map_file, map_name)
self.ui_log.info("A mapping of obfuscated elements is "
- "available at\n\t%s" % map_name)
+ f"available at\n\t{map_name}")
- self.soslog.info('Archive created as %s' % final_name)
+ self.soslog.info(f'Archive created as {final_name}')
self.ui_log.info('\nThe following archive has been created. '
'Please provide it to your support team.')
- self.ui_log.info('\t%s\n' % final_name)
+ self.ui_log.info(f'\t{final_name}\n')
return final_name
except Exception as err:
- msg = ("Could not finalize archive: %s\n\nData may still be "
- "available uncompressed at %s" % (err, self.archive_path))
+ msg = (f"Could not finalize archive: {err}\n\nData may still be "
+ f"available uncompressed at {self.archive_path}")
self.exit(msg, 2)
diff --git a/sos/collector/clusters/__init__.py b/sos/collector/clusters/__init__.py
index 5ee2baa6..5a993d85 100644
--- a/sos/collector/clusters/__init__.py
+++ b/sos/collector/clusters/__init__.py
@@ -95,8 +95,7 @@ class Cluster():
if cls is Cluster:
cls.display_self_help(section)
return
- section.set_title("%s Cluster Profile Detailed Help"
- % cls.cluster_name)
+ section.set_title(f"{cls.cluster_name} Cluster Profile Detailed Help")
if cls.__doc__ and cls.__doc__ is not Cluster.__doc__:
section.add_text(cls.__doc__)
# [1] here is the actual cluster profile
@@ -109,14 +108,14 @@ class Cluster():
if cls.packages:
section.add_text(
- "Enabled by the following packages: %s"
- % ', '.join(p for p in cls.packages),
+ "Enabled by the following packages: "
+ f"{', '.join(p for p in cls.packages)}",
newline=False
)
if cls.sos_preset:
section.add_text(
- "Uses the following sos preset: %s" % cls.sos_preset,
+ f"Uses the following sos preset: {cls.sos_preset}",
newline=False
)
@@ -126,24 +125,24 @@ class Cluster():
if cls.sos_plugins:
section.add_text(
- "Enables the following plugins: %s"
- % ', '.join(plug for plug in cls.sos_plugins),
+ "Enables the following plugins: "
+ f"{', '.join(plug for plug in cls.sos_plugins)}",
newline=False
)
if cls.sos_plugin_options:
_opts = cls.sos_plugin_options
- opts = ', '.join("%s=%s" % (opt, _opts[opt]) for opt in _opts)
+ opts = ', '.join(f"{opt}={_opts[opt]}" for opt in _opts)
section.add_text(
- "Sets the following plugin options: %s" % opts,
+ f"Sets the following plugin options: {opts}",
newline=False
)
if cls.option_list:
optsec = section.add_section("Available cluster options")
optsec.add_text(
- "These options may be toggled or changed using '%s'"
- % bold("-c %s.$option=$value" % cls.__name__)
+ "These options may be toggled or changed using "
+ f"'{bold(f'-c {cls.__name__}.$option=$value')}'"
)
optsec.add_text(
bold(
@@ -189,7 +188,7 @@ class Cluster():
newline=False
)
for cluster in clusters:
- _sec = bold("collect.clusters.%s" % cluster[0])
+ _sec = bold(f"collect.clusters.{cluster[0]}")
section.add_text(
f"{' ':>8}{_sec:<40}{cluster[1].cluster_name:<30}",
newline=False
@@ -204,7 +203,7 @@ class Cluster():
self.options.append(option)
def _fmt_msg(self, msg):
- return '[%s] %s' % (self.cluster_type[0], msg)
+ return f'[{self.cluster_type[0]}] {msg}'
def log_info(self, msg):
"""Used to print info messages"""
@@ -371,7 +370,7 @@ class Cluster():
try:
return self.format_node_list()
except Exception as e:
- self.log_debug('Failed to get node list: %s' % e)
+ self.log_debug(f'Failed to get node list: {e}')
return []
def get_node_label(self, node):
diff --git a/sos/collector/clusters/kubernetes.py b/sos/collector/clusters/kubernetes.py
index 0108f974..50f7fa42 100644
--- a/sos/collector/clusters/kubernetes.py
+++ b/sos/collector/clusters/kubernetes.py
@@ -34,7 +34,7 @@ class kubernetes(Cluster):
def get_nodes(self):
self.cmd += ' get nodes'
if self.get_option('label'):
- self.cmd += ' -l %s ' % quote(self.get_option('label'))
+ self.cmd += f' -l {quote(self.get_option("label"))} '
res = self.exec_primary_cmd(self.cmd)
if res['status'] == 0:
nodes = []
diff --git a/sos/collector/clusters/ocp.py b/sos/collector/clusters/ocp.py
index b1ab95c3..cc878770 100644
--- a/sos/collector/clusters/ocp.py
+++ b/sos/collector/clusters/ocp.py
@@ -91,19 +91,19 @@ class ocp(Cluster):
"Unable to to determine PATH for 'oc' command, "
"node enumeration may fail."
)
- self.log_debug("Locating 'oc' failed: %s"
- % _oc_path['output'])
+ self.log_debug(
+ f"Locating 'oc' failed: {_oc_path['output']}")
if self.get_option('kubeconfig'):
self._oc_cmd += " --kubeconfig " \
f"{self.get_option('kubeconfig')}"
- self.log_debug("oc base command set to %s" % self._oc_cmd)
+ self.log_debug(f"oc base command set to {self._oc_cmd}")
return self._oc_cmd
def fmt_oc_cmd(self, cmd):
"""Format the oc command to optionall include the kubeconfig file if
one is specified
"""
- return "%s %s" % (self.oc_cmd, cmd)
+ return f"{self.oc_cmd} {cmd}"
def _attempt_oc_login(self):
"""Attempt to login to the API using the oc command using a provided
@@ -140,15 +140,15 @@ class ocp(Cluster):
raise Exception("Insufficient permissions to create temporary "
"collection project.\nAborting...")
- self.log_info("Creating new temporary project '%s'" % self.project)
+ self.log_info(f"Creating new temporary project '{self.project}'")
ret = self.exec_primary_cmd(
- self.fmt_oc_cmd("new-project %s" % self.project)
+ self.fmt_oc_cmd(f"new-project {self.project}")
)
if ret['status'] == 0:
self._label_sos_project()
return True
- self.log_debug("Failed to create project: %s" % ret['output'])
+ self.log_debug(f"Failed to create project: {ret['output']}")
raise Exception("Failed to create temporary project for collection. "
"\nAborting...")
@@ -257,7 +257,7 @@ class ocp(Cluster):
cmd = 'get nodes -o wide'
if self.get_option('label'):
labels = ','.join(self.get_option('label').split(':'))
- cmd += " -l %s" % quote(labels)
+ cmd += f" -l {quote(labels)}"
res = self.exec_primary_cmd(self.fmt_oc_cmd(cmd))
if res['status'] == 0:
if self.get_option('role') == 'master':
@@ -318,7 +318,7 @@ class ocp(Cluster):
else:
_opt = 'no-oc'
_val = 'off' if use_api else 'on'
- node.plugopts.append("openshift.%s=%s" % (_opt, _val))
+ node.plugopts.append(f"openshift.{_opt}={_val}")
def set_primary_options(self, node):
@@ -351,7 +351,7 @@ class ocp(Cluster):
# cannot do remotely
if node.file_exists('/root/.kube/config', need_root=True):
_oc_cmd += ' --kubeconfig /host/root/.kube/config'
- can_oc = node.run_command("%s whoami" % _oc_cmd,
+ can_oc = node.run_command(f"{_oc_cmd} whoami",
use_container=node.host.containerized,
# container is available only to root
# and if rhel, need to run sos as root
@@ -370,14 +370,14 @@ class ocp(Cluster):
# if the with-api option is turned on
if not _kubeconfig == master_kube:
node.plugopts.append(
- "openshift.kubeconfig=%s" % _kubeconfig
+ f"openshift.kubeconfig={_kubeconfig}"
)
self._toggle_api_opt(node, True)
self.api_collect_enabled = True
if self.api_collect_enabled:
- msg = ("API collections will be performed on %s\nNote: API "
- "collections may extend runtime by 10s of minutes\n"
- % node.address)
+ msg = (f"API collections will be performed on {node.address}\n"
+ "Note: API collections may extend runtime by 10s of "
+ "minutes\n")
self.soslog.info(msg)
self.ui_log.info(msg)
diff --git a/sos/collector/clusters/openstack.py b/sos/collector/clusters/openstack.py
index a5e0aad9..c20ec069 100644
--- a/sos/collector/clusters/openstack.py
+++ b/sos/collector/clusters/openstack.py
@@ -55,7 +55,7 @@ class rhosp(Cluster):
try:
_inv = yaml.safe_load(self.primary.read_file(INVENTORY))
except Exception as err:
- self.log_info("Error parsing yaml: %s" % err)
+ self.log_info(f"Error parsing yaml: {err}")
raise Exception("Could not parse yaml for node addresses")
try:
for _t in ['Controller', 'Compute']:
@@ -64,5 +64,5 @@ class rhosp(Cluster):
for host in _inv[_t]['hosts'].keys():
_nodes.append(_inv[_t]['hosts'][host][_addr_field])
except Exception as err:
- self.log_error("Error getting %s host addresses: %s" % (_t, err))
+ self.log_error(f"Error getting {_t} host addresses: {err}")
return _nodes
diff --git a/sos/collector/clusters/ovirt.py b/sos/collector/clusters/ovirt.py
index d7a1b92b..1d358a9c 100644
--- a/sos/collector/clusters/ovirt.py
+++ b/sos/collector/clusters/ovirt.py
@@ -62,7 +62,7 @@ class ovirt(Cluster):
Wrapper for running DB queries on the manager. Any scrubbing of the
query should be done _before_ passing the query to this method.
'''
- cmd = "%s %s" % (self.db_exec, quote(query))
+ cmd = f"{self.db_exec} {quote(query)}"
return self.exec_primary_cmd(cmd, need_root=True)
def _sql_scrub(self, val):
@@ -75,8 +75,8 @@ class ovirt(Cluster):
invalid_chars = ['\x00', '\\', '\n', '\r', '\032', '"', '\'']
if any(x in invalid_chars for x in val):
- self.log_warn("WARNING: Cluster option \'%s\' contains invalid "
- "characters. Using '%%' instead." % val)
+ self.log_warn(f"WARNING: Cluster option \'{val}\' contains invalid"
+ " characters. Using '%%' instead.")
return '%'
return val
@@ -109,16 +109,16 @@ class ovirt(Cluster):
cluster = self._sql_scrub(self.get_option('cluster'))
datacenter = self._sql_scrub(self.get_option('datacenter'))
self.dbquery = ("SELECT host_name from vds where cluster_id in "
- "(select cluster_id FROM cluster WHERE name like '%s'"
- " and storage_pool_id in (SELECT id FROM storage_pool "
- "WHERE name like '%s'))" % (cluster, datacenter))
+ "(select cluster_id FROM cluster WHERE name like "
+ f"'{cluster}' and storage_pool_id in (SELECT id FROM "
+ f"storage_pool WHERE name like '{datacenter}'))")
if self.get_option('spm-only'):
# spm_status is an integer with the following meanings
# 0 - Normal (not SPM)
# 1 - Contending (SPM election in progress, but is not SPM)
# 2 - SPM
self.dbquery += ' AND spm_status = 2'
- self.log_debug('Query command for ovirt DB set to: %s' % self.dbquery)
+ self.log_debug(f'Query command for ovirt DB set to: {self.dbquery}')
def get_nodes(self):
if self.get_option('no-hypervisors'):
@@ -128,8 +128,8 @@ class ovirt(Cluster):
nodes = res['output'].splitlines()[2:-1]
return [n.split('(')[0].strip() for n in nodes]
else:
- raise Exception('database query failed, return code: %s'
- % res['status'])
+ raise Exception('database query failed, return code: '
+ f'{res["status"]}')
def run_extra_cmd(self):
if not self.get_option('no-database') and self.conf:
@@ -139,7 +139,7 @@ class ovirt(Cluster):
def parse_db_conf(self):
conf = {}
engconf = '/etc/ovirt-engine/engine.conf.d/10-setup-database.conf'
- res = self.exec_primary_cmd('cat %s' % engconf, need_root=True)
+ res = self.exec_primary_cmd(f'cat {engconf}', need_root=True)
if res['status'] == 0:
config = res['output'].splitlines()
for line in config:
diff --git a/sos/collector/clusters/pacemaker.py b/sos/collector/clusters/pacemaker.py
index bd3a832b..3c25fcaa 100644
--- a/sos/collector/clusters/pacemaker.py
+++ b/sos/collector/clusters/pacemaker.py
@@ -36,21 +36,21 @@ class pacemaker(Cluster):
self.get_nodes_from_crm()
except Exception as err:
self.log_warn("Falling back to sourcing corosync.conf. "
- "Could not parse crm_mon output: %s" % err)
+ f"Could not parse crm_mon output: {err}")
if not self.nodes:
# fallback to corosync.conf, in case the node we're inspecting
# is offline from the cluster
self.get_nodes_from_corosync()
except Exception as err:
- self.log_error("Could not determine nodes from cluster: %s" % err)
+ self.log_error(f"Could not determine nodes from cluster: {err}")
_shorts = [n for n in self.nodes if '.' not in n]
if _shorts:
self.log_warn(
- "WARNING: Node addresses '%s' may not resolve locally if you "
- "are not running on a node in the cluster. Try using option "
- "'-c pacemaker.only-corosync' if these connections fail."
- % ','.join(_shorts)
+ f"WARNING: Node addresses '{','.join(_shorts)}' may not "
+ "resolve locally if you are not running on a node in the "
+ "cluster. Try using option '-c pacemaker.only-corosync' if "
+ "these connections fail."
)
return self.nodes
@@ -68,7 +68,7 @@ class pacemaker(Cluster):
else:
return
_out = self.exec_primary_cmd(
- "crm_mon --one-shot --inactive %s" % xmlopt,
+ f"crm_mon --one-shot --inactive {xmlopt}",
need_root=True
)
if _out['status'] == 0:
diff --git a/sos/collector/exceptions.py b/sos/collector/exceptions.py
index e9edc249..cb1c2314 100644
--- a/sos/collector/exceptions.py
+++ b/sos/collector/exceptions.py
@@ -47,8 +47,8 @@ class ConnectionException(Exception):
"""Raised when an attempt to connect fails"""
def __init__(self, address='', port=''):
- message = ("Could not connect to host %s on specified port %s"
- % (address, port))
+ message = (f"Could not connect to host {address} on specified port "
+ f"{port}")
super(ConnectionException, self).__init__(message)
@@ -58,7 +58,7 @@ class CommandTimeoutException(Exception):
def __init__(self, command=None):
message = 'Timeout expired'
if command:
- message += " executing %s" % command
+ message += f" executing {command}"
super(CommandTimeoutException, self).__init__(message)
@@ -74,7 +74,7 @@ class ControlSocketMissingException(Exception):
"""Raised when the SSH control socket is missing"""
def __init__(self, path=''):
- message = "SSH control socket %s does not exist" % path
+ message = f"SSH control socket {path} does not exist"
super(ControlSocketMissingException, self).__init__(message)
@@ -99,8 +99,8 @@ class InvalidTransportException(Exception):
not supported locally"""
def __init__(self, transport=None):
- message = ("Connection failed: unknown or unsupported transport %s"
- % transport if transport else '')
+ message = ("Connection failed: unknown or unsupported transport "
+ f"{transport if transport else ''}")
super(InvalidTransportException, self).__init__(message)
diff --git a/sos/collector/sosnode.py b/sos/collector/sosnode.py
index fad0c378..1efd6e5b 100644
--- a/sos/collector/sosnode.py
+++ b/sos/collector/sosnode.py
@@ -84,7 +84,7 @@ class SosNode():
try:
self._transport.connect(self._password)
except Exception as err:
- self.log_error('Unable to open remote session: %s' % err)
+ self.log_error(f'Unable to open remote session: {err}')
raise
# load the host policy now, even if we don't want to load further
# host information. This is necessary if we're running locally on the
@@ -128,8 +128,8 @@ class SosNode():
return TRANSPORTS[self.opts.transport](self.address, commons)
elif self.opts.transport != 'auto':
self.log_error(
- "Connection failed: unknown or unsupported transport %s"
- % self.opts.transport
+ "Connection failed: unknown or unsupported transport "
+ f"{self.opts.transport}"
)
raise InvalidTransportException(self.opts.transport)
return SSHControlPersist(self.address, commons)
@@ -202,16 +202,16 @@ class SosNode():
ret = self.run_command(self.host.restart_sos_container(),
need_root=True)
if ret['status'] == 0:
- self.log_info("Temporary container %s created"
- % self.host.sos_container_name)
+ self.log_info("Temporary container "
+ f"{self.host.sos_container_name} created")
return True
else:
- self.log_error("Could not start container after create: %s"
- % ret['output'])
+ self.log_error("Could not start container after create: "
+ f"{ret['output']}")
raise Exception
else:
- self.log_error("Could not create container on host: %s"
- % res['output'])
+ self.log_error("Could not create container on host: "
+ f"{res['output']}")
raise Exception
return False
@@ -232,7 +232,7 @@ class SosNode():
def file_exists(self, fname, need_root=False):
"""Checks for the presence of fname on the remote node"""
try:
- res = self.run_command("stat %s" % fname, need_root=need_root)
+ res = self.run_command(f"stat {fname}", need_root=need_root)
return res['status'] == 0
except Exception:
return False
@@ -256,20 +256,20 @@ class SosNode():
def log_info(self, msg):
"""Used to print and log info messages"""
caller = inspect.stack()[1][3]
- lmsg = '[%s:%s] %s' % (self._hostname, caller, msg)
+ lmsg = f'[{self._hostname}:{caller}] {msg}'
self.soslog.info(lmsg)
def log_error(self, msg):
"""Used to print and log error messages"""
caller = inspect.stack()[1][3]
- lmsg = '[%s:%s] %s' % (self._hostname, caller, msg)
+ lmsg = f'[{self._hostname}:{caller}] {msg}'
self.soslog.error(lmsg)
def log_debug(self, msg):
"""Used to print and log debug messages"""
msg = self._sanitize_log_msg(msg)
caller = inspect.stack()[1][3]
- msg = '[%s:%s] %s' % (self._hostname, caller, msg)
+ msg = f'[{self._hostname}:{caller}] {msg}'
self.soslog.debug(msg)
def _format_cmd(self, cmd):
@@ -277,9 +277,9 @@ class SosNode():
here we prefix the command with the correct bits
"""
if self.opts.become_root:
- return "su -c %s" % quote(cmd)
+ return f"su -c {quote(cmd)}"
if self.need_sudo:
- return "sudo -S %s" % cmd
+ return f"sudo -S {cmd}"
return cmd
def _load_sos_info(self):
@@ -307,14 +307,14 @@ class SosNode():
# comparison by parse_version
ver += '.0'
try:
- ver += '-%s' % rel.split('.')[0]
+ ver += f'-{rel.split(".")[0]}'
except Exception as err:
- self.log_debug("Unable to fully parse sos release: %s" % err)
+ self.log_debug(f"Unable to fully parse sos release: {err}")
self.sos_info['version'] = ver
if self.sos_info['version']:
- self.log_info('sos version is %s' % self.sos_info['version'])
+ self.log_info(f'sos version is {self.sos_info["version"]}')
else:
if not self.address == self.opts.primary:
# in the case where the 'primary' enumerates nodes but is not
@@ -326,7 +326,7 @@ class SosNode():
# sos-4.0 changes the binary
if self.check_sos_version('4.0'):
self.sos_bin = 'sos report'
- cmd = "%s -l" % self.sos_bin
+ cmd = f"{self.sos_bin} -l"
sosinfo = self.run_command(cmd, use_container=True, need_root=True)
if sosinfo['status'] == 0:
self._load_sos_plugins(sosinfo['output'])
@@ -335,7 +335,7 @@ class SosNode():
return None
def _load_sos_presets(self):
- cmd = '%s --list-presets' % self.sos_bin
+ cmd = f'{self.sos_bin} --list-presets'
res = self.run_command(cmd, use_container=True, need_root=True)
if res['status'] == 0:
for line in res['output'].splitlines():
@@ -377,10 +377,10 @@ class SosNode():
def read_file(self, to_read):
"""Reads the specified file and returns the contents"""
try:
- self.log_info("Reading file %s" % to_read)
+ self.log_info(f"Reading file {to_read}")
return self._transport.read_file(to_read)
except Exception as err:
- self.log_error("Exception while reading %s: %s" % (to_read, err))
+ self.log_error(f"Exception while reading {to_read}: {err}")
return ''
def determine_host_policy(self):
@@ -388,15 +388,15 @@ class SosNode():
distributions
"""
if self.local:
- self.log_info("using local policy %s"
- % self.commons['policy'].distro)
+ self.log_info(
+ f"using local policy {self.commons['policy'].distro}")
return self.commons['policy']
host = load(cache={}, sysroot=self.opts.sysroot, init=InitSystem(),
probe_runtime=True,
remote_exec=self._transport.run_command,
remote_check=self.read_file('/etc/os-release'))
if host:
- self.log_info("loaded policy %s for host" % host.distro)
+ self.log_info(f"loaded policy {host.distro} for host")
return host
self.log_error('Unable to determine host installation. Ignoring node')
raise UnsupportedHostException
@@ -416,7 +416,7 @@ class SosNode():
_node_ver = self.sos_info['version']
return sos_parse_version(_node_ver) >= sos_parse_version(ver)
except Exception as err:
- self.log_error("Error checking sos version: %s" % err)
+ self.log_error(f"Error checking sos version: {err}")
return False
def is_installed(self, pkg):
@@ -453,7 +453,7 @@ class SosNode():
self.log_debug('Failed to reconnect to node')
raise ConnectionException
except Exception as err:
- self.log_debug("Error while trying to reconnect: %s" % err)
+ self.log_debug(f"Error while trying to reconnect: {err}")
raise
if use_container and self.host.containerized:
cmd = self.host.format_container_command(cmd)
@@ -536,9 +536,9 @@ class SosNode():
if not self.preset:
self.preset = self.cluster.sos_preset
else:
- self.log_info('Cluster specified preset %s but user has also '
- 'defined a preset. Using user specification.'
- % self.cluster.sos_preset)
+ self.log_info('Cluster specified preset '
+ f'{self.cluster.sos_preset} but user has also '
+ 'defined a preset. Using user specification.')
if self.cluster.sos_plugins:
for plug in self.cluster.sos_plugins:
if plug not in self.enable_plugins:
@@ -553,8 +553,7 @@ class SosNode():
if self.cluster.sos_plugin_options:
for opt in self.cluster.sos_plugin_options:
if not any(opt in o for o in self.plugopts):
- option = '%s=%s' % (opt,
- self.cluster.sos_plugin_options[opt])
+ option = f'{opt}={self.cluster.sos_plugin_options[opt]}'
self.plugopts.append(option)
# set primary-only options
@@ -582,7 +581,7 @@ class SosNode():
sos_cmd = self.sos_info['sos_cmd']
label = self.determine_sos_label()
if label:
- sos_cmd = '%s %s ' % (sos_cmd, quote(label))
+ sos_cmd = f'{sos_cmd} {quote(label)} '
sos_opts = []
@@ -590,13 +589,13 @@ class SosNode():
if self.check_sos_version('3.6'):
# 4 threads is the project's default
if self.opts.threads != 4:
- sos_opts.append('--threads=%s' % quote(str(self.opts.threads)))
+ sos_opts.append(f'--threads={quote(str(self.opts.threads))}')
# sos-3.7 added options
if self.check_sos_version('3.7'):
if self.opts.plugin_timeout:
- sos_opts.append('--plugin-timeout=%s'
- % quote(str(self.opts.plugin_timeout)))
+ sos_opts.append(
+ f'--plugin-timeout={quote(str(self.opts.plugin_timeout))}')
# sos-3.8 added options
if self.check_sos_version('3.8'):
@@ -607,33 +606,32 @@ class SosNode():
sos_opts.append('--no-env-vars')
if self.opts.since:
- sos_opts.append('--since=%s' % quote(self.opts.since))
+ sos_opts.append(f'--since={quote(self.opts.since)}')
if self.check_sos_version('4.1'):
if self.opts.skip_commands:
- sos_opts.append(
- '--skip-commands=%s' % (
- quote(','.join(self.opts.skip_commands)))
- )
+ sos_opts.append('--skip-commands='
+ f'{quote(",".join(self.opts.skip_commands))}')
if self.opts.skip_files:
sos_opts.append(
- '--skip-files=%s' % (quote(','.join(self.opts.skip_files)))
+ f'--skip-files={quote(",".join(self.opts.skip_files))}'
)
if self.check_sos_version('4.2'):
if self.opts.cmd_timeout:
- sos_opts.append('--cmd-timeout=%s'
- % quote(str(self.opts.cmd_timeout)))
+ sos_opts.append(
+ f'--cmd-timeout={quote(str(self.opts.cmd_timeout))}'
+ )
# handle downstream versions that backported this option
if self.check_sos_version('4.3') or self.check_sos_version('4.2-13'):
if self.opts.container_runtime != 'auto':
sos_opts.append(
- "--container-runtime=%s" % self.opts.container_runtime
+ f"--container-runtime={self.opts.container_runtime}"
)
if self.opts.namespaces:
sos_opts.append(
- "--namespaces=%s" % self.opts.namespaces
+ f"--namespaces={self.opts.namespaces}"
)
if self.check_sos_version('4.5.2'):
@@ -658,26 +656,26 @@ class SosNode():
if self._plugin_exists(o.split('.')[0])
and self._plugin_option_exists(o.split('=')[0])]
if opts:
- sos_opts.append('-k %s' % quote(','.join(o for o in opts)))
+ sos_opts.append(f'-k {quote(",".join(o for o in opts))}')
if self.preset:
if self._preset_exists(self.preset):
- sos_opts.append('--preset=%s' % quote(self.preset))
+ sos_opts.append(f'--preset={quote(self.preset)}')
else:
- self.log_debug('Requested to enable preset %s but preset does '
- 'not exist on node' % self.preset)
+ self.log_debug(f'Requested to enable preset {self.preset} but '
+ 'preset does not exist on node')
if self.only_plugins:
plugs = [o for o in self.only_plugins if self._plugin_exists(o)]
if len(plugs) != len(self.only_plugins):
not_only = list(set(self.only_plugins) - set(plugs))
- self.log_debug('Requested plugins %s were requested to be '
- 'enabled but do not exist' % not_only)
+ self.log_debug(f'Requested plugins {not_only} were requested '
+ 'to be enabled but do not exist')
only = self._fmt_sos_opt_list(self.only_plugins)
if only:
- sos_opts.append('--only-plugins=%s' % quote(only))
- self.sos_cmd = "%s %s" % (sos_cmd, ' '.join(sos_opts))
- self.log_info('Final sos command set to %s' % self.sos_cmd)
+ sos_opts.append(f'--only-plugins={quote(only)}')
+ self.sos_cmd = f"{sos_cmd} {' '.join(sos_opts)}"
+ self.log_info(f'Final sos command set to {self.sos_cmd}')
self.manifest.add_field('final_sos_command', self.sos_cmd)
return
@@ -686,11 +684,11 @@ class SosNode():
skip = [o for o in self.skip_plugins if self._check_enabled(o)]
if len(skip) != len(self.skip_plugins):
not_skip = list(set(self.skip_plugins) - set(skip))
- self.log_debug('Requested to skip plugins %s, but plugins are '
- 'already not enabled' % not_skip)
+ self.log_debug(f'Requested to skip plugins {not_skip}, but '
+ 'plugins are already not enabled')
skipln = self._fmt_sos_opt_list(skip)
if skipln:
- sos_opts.append('--skip-plugins=%s' % quote(skipln))
+ sos_opts.append(f'--skip-plugins={quote(skipln)}')
if self.enable_plugins:
# only run enable for plugins that are disabled
@@ -699,14 +697,14 @@ class SosNode():
and self._check_disabled(o) and self._plugin_exists(o)]
if len(opts) != len(self.enable_plugins):
not_on = list(set(self.enable_plugins) - set(opts))
- self.log_debug('Requested to enable plugins %s, but plugins '
- 'are already enabled or do not exist' % not_on)
+ self.log_debug(f'Requested to enable plugins {not_on}, but '
+ 'plugins are already enabled or do not exist')
enable = self._fmt_sos_opt_list(opts)
if enable:
- sos_opts.append('--enable-plugins=%s' % quote(enable))
+ sos_opts.append(f'--enable-plugins={quote(enable)}')
- self.sos_cmd = "%s %s" % (sos_cmd, ' '.join(sos_opts))
- self.log_info('Final sos command set to %s' % self.sos_cmd)
+ self.sos_cmd = f"{sos_cmd} {' '.join(sos_opts)}"
+ self.log_info(f'Final sos command set to {self.sos_cmd}')
self.manifest.add_field('final_sos_command', self.sos_cmd)
def determine_sos_label(self):
@@ -715,19 +713,19 @@ class SosNode():
label += self.cluster.get_node_label(self)
if self.opts.label:
- label += ('%s' % self.opts.label if not label
- else '-%s' % self.opts.label)
+ label += (f'{self.opts.label}' if not label
+ else f'-{self.opts.label}')
if not label:
return None
- self.log_debug('Label for sos report set to %s' % label)
+ self.log_debug(f'Label for sos report set to {label}')
if self.check_sos_version('3.6'):
lcmd = '--label'
else:
lcmd = '--name'
- label = '%s-%s' % (self.address.split('.')[0], label)
- return '%s=%s' % (lcmd, label)
+ label = f'{self.address.split(".")[0]}-{label}'
+ return f'{lcmd}={label}'
def finalize_sos_path(self, path):
"""Use host facts to determine if we need to change the sos path
@@ -736,7 +734,7 @@ class SosNode():
if pstrip:
path = path.replace(pstrip, '')
path = path.split()[0]
- self.log_info('Final sos path: %s' % path)
+ self.log_info(f'Final sos path: {path}')
self.sos_path = path
self.archive = path.split('/')[-1]
self.manifest.add_field('collected_archive', self.archive)
@@ -752,7 +750,7 @@ class SosNode():
if len(stdout) > 0:
return stdout.split('\n')[0:1]
else:
- return 'sos exited with code %s' % rc
+ return f'sos exited with code {rc}'
def execute_sos_command(self):
"""Run sos report and capture the resulting file path"""
@@ -787,8 +785,8 @@ class SosNode():
self.manifest.add_field('checksum_type', 'unknown')
else:
err = self.determine_sos_error(res['status'], res['output'])
- self.log_debug("Error running sos report. rc = %s msg = %s"
- % (res['status'], res['output']))
+ self.log_debug("Error running sos report. rc = "
+ f"{res['status']} msg = {res['output']}")
raise Exception(err)
return path
except CommandTimeoutException:
@@ -805,15 +803,14 @@ class SosNode():
dest = os.path.join(destdir, path.split('/')[-1])
try:
if self.file_exists(path):
- self.log_info("Copying remote %s to local %s" %
- (path, destdir))
+ self.log_info(f"Copying remote {path} to local {destdir}")
return self._transport.retrieve_file(path, dest)
else:
- self.log_debug("Attempting to copy remote file %s, but it "
- "does not exist on filesystem" % path)
+ self.log_debug(f"Attempting to copy remote file {path}, but it"
+ " does not exist on filesystem")
return False
except Exception as err:
- self.log_debug("Failed to retrieve %s: %s" % (path, err))
+ self.log_debug(f"Failed to retrieve {path}: {err}")
return False
def remove_file(self, path):
@@ -823,20 +820,20 @@ class SosNode():
path = ''.join(path.split())
try:
if len(path.split('/')) <= 2: # ensure we have a non '/' path
- self.log_debug("Refusing to remove path %s: appears to be "
- "incorrect and possibly dangerous" % path)
+ self.log_debug(f"Refusing to remove path {path}: appears to "
+ "be incorrect and possibly dangerous")
return False
if self.file_exists(path):
- self.log_info("Removing file %s" % path)
- cmd = "rm -f %s" % path
+ self.log_info(f"Removing file {path}")
+ cmd = f"rm -f {path}"
res = self.run_command(cmd, need_root=True)
return res['status'] == 0
else:
- self.log_debug("Attempting to remove remote file %s, but it "
- "does not exist on filesystem" % path)
+ self.log_debug(f"Attempting to remove remote file {path}, but "
+ "it does not exist on filesystem")
return False
except Exception as e:
- self.log_debug('Failed to remove %s: %s' % (path, e))
+ self.log_debug(f'Failed to remove {path}: {e}')
return False
def retrieve_sosreport(self):
@@ -848,7 +845,7 @@ class SosNode():
except Exception:
self.log_error('Failed to make archive readable')
return False
- self.log_info('Retrieving sos report from %s' % self.address)
+ self.log_info(f'Retrieving sos report from {self.address}')
self.ui_msg('Retrieving sos report...')
try:
ret = self.retrieve_file(self.sos_path)
@@ -869,8 +866,8 @@ class SosNode():
else:
e = [x.strip() for x in self.stdout.readlines() if x.strip][-1]
self.soslog.error(
- 'Failed to run sos report on %s: %s' % (self.address, e))
- self.log_error('Failed to run sos report. %s' % e)
+ f'Failed to run sos report on {self.address}: {e}')
+ self.log_error(f'Failed to run sos report. {e}')
return False
def remove_sos_archive(self):
@@ -881,8 +878,8 @@ class SosNode():
# is no archive at the original location to remove
return
if 'sosreport' not in self.sos_path:
- self.log_debug("Node sos report path %s looks incorrect. Not "
- "attempting to remove path" % self.sos_path)
+ self.log_debug(f"Node sos report path {self.sos_path} looks "
+ "incorrect. Not attempting to remove path")
return
removed = self.remove_file(self.sos_path)
if not removed:
@@ -907,18 +904,18 @@ class SosNode():
try:
self.make_archive_readable(filename)
except Exception as err:
- self.log_error("Unable to retrieve file %s" % filename)
- self.log_debug("Failed to make file %s readable: %s"
- % (filename, err))
+ self.log_error(f"Unable to retrieve file {filename}")
+ self.log_debug(f"Failed to make file {filename} "
+ f"readable: {err}")
continue
ret = self.retrieve_file(filename)
if ret:
self.file_list.append(filename.split('/')[-1])
self.remove_file(filename)
else:
- self.log_error("Unable to retrieve file %s" % filename)
+ self.log_error(f"Unable to retrieve file {filename}")
except Exception as e:
- msg = 'Error collecting additional data from primary: %s' % e
+ msg = f'Error collecting additional data from primary: {e}'
self.log_error(msg)
def make_archive_readable(self, filepath):
@@ -927,7 +924,7 @@ class SosNode():
This is only used when we're not connecting as root.
"""
- cmd = 'chmod o+r %s' % filepath
+ cmd = f'chmod o+r {filepath}'
res = self.run_command(cmd, timeout=10, need_root=True)
if res['status'] == 0:
return True
diff --git a/sos/collector/transports/__init__.py b/sos/collector/transports/__init__.py
index dbabee4b..92bd9195 100644
--- a/sos/collector/transports/__init__.py
+++ b/sos/collector/transports/__init__.py
@@ -49,20 +49,20 @@ class RemoteTransport():
def log_info(self, msg):
"""Used to print and log info messages"""
caller = inspect.stack()[1][3]
- lmsg = '[%s:%s] %s' % (self.hostname, caller, msg)
+ lmsg = f'[{self.hostname}:{caller}] {msg}'
self.soslog.info(lmsg)
def log_error(self, msg):
"""Used to print and log error messages"""
caller = inspect.stack()[1][3]
- lmsg = '[%s:%s] %s' % (self.hostname, caller, msg)
+ lmsg = f'[{self.hostname}:{caller}] {msg}'
self.soslog.error(lmsg)
def log_debug(self, msg):
"""Used to print and log debug messages"""
msg = self._sanitize_log_msg(msg)
caller = inspect.stack()[1][3]
- msg = '[%s:%s] %s' % (self.hostname, caller, msg)
+ msg = f'[{self.hostname}:{caller}] {msg}'
self.soslog.debug(msg)
@property
@@ -94,8 +94,8 @@ class RemoteTransport():
def display_help(cls, section):
if cls is RemoteTransport:
return cls.display_self_help(section)
- section.set_title("%s Transport Detailed Help"
- % cls.name.title().replace('_', ' '))
+ section.set_title(f"{cls.name.title().replace('_', ' ')} "
+ "Transport Detailed Help")
if cls.__doc__ and cls.__doc__ is not RemoteTransport.__doc__:
section.add_text(cls.__doc__)
else:
@@ -109,18 +109,17 @@ class RemoteTransport():
section.set_title('SoS Remote Transport Help')
section.add_text(
"\nTransports define how SoS connects to nodes and executes "
- "commands on them for the purposes of an %s run. Generally, "
- "this means transports define how commands are wrapped locally "
- "so that they are executed on the remote node(s) instead."
- % bold('sos collect')
+ f"commands on them for the purposes of an {bold('sos collect')} "
+ "run. Generally, this means transports define how commands are "
+ "wrapped locally so that they are executed on the remote node(s) "
+ "instead."
)
section.add_text(
"Transports are generally selected by the cluster profile loaded "
"for a given execution, however users may explicitly set one "
- "using '%s'. Note that not all transports will function for all "
- "cluster/node types."
- % bold('--transport=$transport_name')
+ f"using '{bold('--transport=$transport_name')}'. Note that not all"
+ " transports will function for all cluster/node types."
)
section.add_text(
@@ -131,8 +130,8 @@ class RemoteTransport():
from sos.collector.sosnode import TRANSPORTS
for transport in TRANSPORTS:
- _sec = bold("collect.transports.%s" % transport)
- _desc = "The '%s' transport" % transport.lower()
+ _sec = bold(f"collect.transports.{transport}")
+ _desc = f"The '{transport.lower()}' transport"
section.add_text(
f"{' ':>8}{_sec:<45}{_desc:<30}",
newline=False
@@ -153,8 +152,8 @@ class RemoteTransport():
"""Actually perform the connection requirements. Should be overridden
by specific transports that subclass RemoteTransport
"""
- raise NotImplementedError("Transport %s does not define connect"
- % self.name)
+ raise NotImplementedError(
+ f"Transport {self.name} does not define connect")
def reconnect(self, password):
"""Attempts to reconnect to the node using the standard connect()
@@ -164,18 +163,17 @@ class RemoteTransport():
attempts = 1
last_err = 'unknown'
while attempts < 5:
- self.log_debug("Attempting reconnect (#%s) to node" % attempts)
+ self.log_debug(f"Attempting reconnect (#{attempts}) to node")
try:
if self.connect(password):
return True
except Exception as err:
- self.log_debug("Attempt #%s exception: %s" % (attempts, err))
+ self.log_debug(f"Attempt #{attempts} exception: {err}")
last_err = err
attempts += 1
self.log_error("Unable to reconnect to node after 5 attempts, "
"aborting.")
- raise ConnectionException("last exception from transport: %s"
- % last_err)
+ raise ConnectionException(f"last exception from transport: {last_err}")
def disconnect(self):
"""Perform whatever steps are necessary, if any, to terminate any
@@ -188,11 +186,11 @@ class RemoteTransport():
self.log_error("Unable to successfully disconnect, see log for"
" more details")
except Exception as err:
- self.log_error("Failed to disconnect: %s" % err)
+ self.log_error(f"Failed to disconnect: {err}")
def _disconnect(self):
- raise NotImplementedError("Transport %s does not define disconnect"
- % self.name)
+ raise NotImplementedError(
+ f"Transport {self.name} does not define disconnect")
@property
def _need_shell(self):
@@ -227,10 +225,10 @@ class RemoteTransport():
:returns: Output of ``cmd`` and the exit code
:rtype: ``dict`` with keys ``output`` and ``status``
"""
- self.log_debug('Running command %s' % cmd)
+ self.log_debug(f'Running command {cmd}')
if (use_shell is True or
(self._need_shell if use_shell == 'auto' else False)):
- cmd = "/bin/bash -c %s" % quote(cmd)
+ cmd = f"/bin/bash -c {quote(cmd)}"
self.log_debug(f"Shell requested, command is now {cmd}")
# currently we only use/support the use of pexpect for handling the
# execution of these commands, as opposed to directly invoking
@@ -252,7 +250,7 @@ class RemoteTransport():
transport
:rtype: ``str``
"""
- cmd = "%s %s" % (self.remote_exec, quote(cmd))
+ cmd = f"{self.remote_exec} {quote(cmd)}"
cmd = cmd.lstrip()
return cmd
@@ -350,7 +348,7 @@ class RemoteTransport():
if not self._hostname:
self._hostname = self.address
- self.log_info("Hostname set to %s" % self._hostname)
+ self.log_info(f"Hostname set to {self._hostname}")
return self._hostname
def retrieve_file(self, fname, dest):
@@ -372,17 +370,17 @@ class RemoteTransport():
ret = self._retrieve_file(fname, dest)
if ret:
return True
- self.log_info("File retrieval attempt %s failed" % attempts)
+ self.log_info(f"File retrieval attempt {attempts} failed")
self.log_info("File retrieval failed after 5 attempts")
return False
except Exception as err:
- self.log_error("Exception encountered during retrieval attempt %s "
- "for %s: %s" % (attempts, fname, err))
+ self.log_error("Exception encountered during retrieval attempt "
+ f"{attempts} for {fname}: {err}")
raise err
def _retrieve_file(self, fname, dest):
- raise NotImplementedError("Transport %s does not support file copying"
- % self.name)
+ raise NotImplementedError(
+ f"Transport {self.name} does not support file copying")
def read_file(self, fname):
"""Read the given file fname and return its contents
@@ -393,20 +391,19 @@ class RemoteTransport():
:returns: The content of the file
:rtype: ``str``
"""
- self.log_debug("Reading file %s" % fname)
+ self.log_debug(f"Reading file {fname}")
return self._read_file(fname)
def _read_file(self, fname):
- res = self.run_command("cat %s" % fname, timeout=10)
+ res = self.run_command(f"cat {fname}", timeout=10)
if res['status'] == 0:
return res['output']
else:
if 'No such file' in res['output']:
- self.log_debug("File %s does not exist on node"
- % fname)
+ self.log_debug(f"File {fname} does not exist on node")
else:
- self.log_error("Error reading %s: %s" %
- (fname, res['output'].split(':')[1:]))
+ self.log_error(f"Error reading {fname}: "
+ f"{res['output'].split(':')[1:]}")
return ''
# vim: set et ts=4 sw=4 :
diff --git a/sos/collector/transports/control_persist.py b/sos/collector/transports/control_persist.py
index 1cffef76..b8a979bf 100644
--- a/sos/collector/transports/control_persist.py
+++ b/sos/collector/transports/control_persist.py
@@ -95,24 +95,20 @@ class SSHControlPersist(RemoteTransport):
"Please update your OpenSSH installation.")
raise
self.log_info('Opening SSH session to create control socket')
- self.control_path = ("%s/.sos-collector-%s" % (self.tmpdir,
- self.address))
+ self.control_path = (f"{self.tmpdir}/.sos-collector-{self.address}")
self.ssh_cmd = ''
connected = False
ssh_key = ''
ssh_port = ''
if self.opts.ssh_port != 22:
- ssh_port = "-p%s " % self.opts.ssh_port
+ ssh_port = f"-p{self.opts.ssh_port} "
if self.opts.ssh_key:
- ssh_key = "-i%s" % self.opts.ssh_key
-
- cmd = ("ssh %s %s -oControlPersist=600 -oControlMaster=auto "
- "-oStrictHostKeyChecking=no -oControlPath=%s %s@%s "
- "\"echo Connected\"" % (ssh_key,
- ssh_port,
- self.control_path,
- self.opts.ssh_user,
- self.address))
+ ssh_key = f"-i{self.opts.ssh_key}"
+
+ cmd = (f"ssh {ssh_key} {ssh_port} -oControlPersist=600 "
+ "-oControlMaster=auto -oStrictHostKeyChecking=no "
+ f"-oControlPath={self.control_path} {self.opts.ssh_user}@"
+ f"{self.address} \"echo Connected\"")
res = pexpect.spawn(cmd, encoding='utf-8')
connect_expects = [
@@ -157,12 +153,12 @@ class SSHControlPersist(RemoteTransport):
elif index == 5:
raise ConnectionTimeoutException
else:
- raise Exception("Unknown error, client returned %s" % res.before)
+ raise Exception(f"Unknown error, client returned {res.before}")
if connected:
if not os.path.exists(self.control_path):
raise ControlSocketMissingException
- self.log_debug("Successfully created control socket at %s"
- % self.control_path)
+ self.log_debug("Successfully created control socket at "
+ f"{self.control_path}")
return True
return False
@@ -172,7 +168,7 @@ class SSHControlPersist(RemoteTransport):
os.remove(self.control_path)
return True
except Exception as err:
- self.log_debug("Could not disconnect properly: %s" % err)
+ self.log_debug(f"Could not disconnect properly: {err}")
return False
self.log_debug("Control socket not present when attempting to "
"terminate session")
@@ -193,15 +189,13 @@ class SSHControlPersist(RemoteTransport):
@property
def remote_exec(self):
if not self.ssh_cmd:
- self.ssh_cmd = "ssh -oControlPath=%s %s@%s" % (
- self.control_path, self.opts.ssh_user, self.address
- )
+ self.ssh_cmd = (f"ssh -oControlPath={self.control_path} "
+ f"{self.opts.ssh_user}@{self.address}")
return self.ssh_cmd
def _retrieve_file(self, fname, dest):
- cmd = "/usr/bin/scp -oControlPath=%s %s@%s:%s %s" % (
- self.control_path, self.opts.ssh_user, self.address, fname, dest
- )
+ cmd = (f"/usr/bin/scp -oControlPath={self.control_path} "
+ f"{self.opts.ssh_user}@{self.address}:{fname} {dest}")
res = sos_get_command_output(cmd)
return res['status'] == 0
diff --git a/sos/collector/transports/local.py b/sos/collector/transports/local.py
index dd72053c..52cd0d15 100644
--- a/sos/collector/transports/local.py
+++ b/sos/collector/transports/local.py
@@ -34,7 +34,7 @@ class LocalTransport(RemoteTransport):
return True
def _retrieve_file(self, fname, dest):
- self.log_debug("Moving %s to %s" % (fname, dest))
+ self.log_debug(f"Moving {fname} to {dest}")
shutil.copy(fname, dest)
return True
@@ -45,7 +45,7 @@ class LocalTransport(RemoteTransport):
if os.path.exists(fname):
with open(fname, 'r') as rfile:
return rfile.read()
- self.log_debug("No such file: %s" % fname)
+ self.log_debug(f"No such file: {fname}")
return ''
# vim: set et ts=4 sw=4 :
diff --git a/sos/collector/transports/oc.py b/sos/collector/transports/oc.py
index ae228c5c..e011b0f6 100644
--- a/sos/collector/transports/oc.py
+++ b/sos/collector/transports/oc.py
@@ -51,14 +51,14 @@ class OCTransport(RemoteTransport):
execution
"""
return sos_get_command_output(
- "oc -n %s %s" % (self.project, cmd),
+ f"oc -n {self.project} {cmd}",
**kwargs
)
@property
def connected(self):
up = self.run_oc(
- "wait --timeout=0s --for=condition=ready pod/%s" % self.pod_name
+ f"wait --timeout=0s --for=condition=ready pod/{self.pod_name}"
)
return up['status'] == 0
@@ -71,7 +71,7 @@ class OCTransport(RemoteTransport):
"kind": "Pod",
"apiVersion": "v1",
"metadata": {
- "name": "%s-sos-collector" % self.address.split('.')[0],
+ "name": f"{self.address.split('.')[0]}-sos-collector",
"namespace": self.project
},
"priorityClassName": "system-cluster-critical",
@@ -169,23 +169,23 @@ class OCTransport(RemoteTransport):
fd, self.pod_tmp_conf = tempfile.mkstemp(dir=self.tmpdir)
with open(fd, 'w') as cfile:
json.dump(podconf, cfile)
- self.log_debug("Starting sos collector container '%s'" % self.pod_name)
+ self.log_debug(f"Starting sos collector container '{self.pod_name}'")
# this specifically does not need to run with a project definition
out = sos_get_command_output(
- "oc create -f %s" % self.pod_tmp_conf
+ f"oc create -f {self.pod_tmp_conf}"
)
- if (out['status'] != 0 or "pod/%s created" % self.pod_name not in
+ if (out['status'] != 0 or f"pod/{self.pod_name} created" not in
out['output']):
self.log_error("Unable to deploy sos collect pod")
- self.log_debug("Debug pod deployment failed: %s" % out['output'])
+ self.log_debug(f"Debug pod deployment failed: {out['output']}")
return False
- self.log_debug("Pod '%s' successfully deployed, waiting for pod to "
- "enter ready state" % self.pod_name)
+ self.log_debug(f"Pod '{self.pod_name}' successfully deployed, waiting "
+ "for pod to enter ready state")
# wait for the pod to report as running
try:
- up = self.run_oc("wait --for=condition=Ready pod/%s --timeout=30s"
- % self.pod_name,
+ up = self.run_oc(f"wait --for=condition=Ready pod/{self.pod_name} "
+ "--timeout=30s",
# timeout is for local safety, not oc
timeout=40)
if not up['status'] == 0:
@@ -195,16 +195,15 @@ class OCTransport(RemoteTransport):
self.log_error("Timeout while polling for pod readiness")
return False
except Exception as err:
- self.log_error("Error while waiting for pod to be ready: %s"
- % err)
+ self.log_error(f"Error while waiting for pod to be ready: {err}")
return False
return True
def _format_cmd_for_exec(self, cmd):
if cmd.startswith('oc'):
- return ("oc -n %s exec --request-timeout=0 %s -- chroot /host %s"
- % (self.project, self.pod_name, cmd))
+ return (f"oc -n {self.project} exec --request-timeout=0 "
+ f"{self.pod_name} -- chroot /host {cmd}")
return super(OCTransport, self)._format_cmd_for_exec(cmd)
def run_command(self, cmd, timeout=180, need_root=False, env=None,
@@ -221,22 +220,21 @@ class OCTransport(RemoteTransport):
def _disconnect(self):
if os.path.exists(self.pod_tmp_conf):
os.unlink(self.pod_tmp_conf)
- removed = self.run_oc("delete pod %s" % self.pod_name)
+ removed = self.run_oc(f"delete pod {self.pod_name}")
if "deleted" not in removed['output']:
- self.log_debug("Calling delete on pod '%s' failed: %s"
- % (self.pod_name, removed))
+ self.log_debug(f"Calling delete on pod '{self.pod_name}' failed: "
+ f"{removed}")
return False
return True
@property
def remote_exec(self):
- return ("oc -n %s exec --request-timeout=0 %s -- /bin/bash -c"
- % (self.project, self.pod_name))
+ return (f"oc -n {self.project} exec --request-timeout=0 "
+ f"{self.pod_name} -- /bin/bash -c")
def _retrieve_file(self, fname, dest):
# check if --retries flag is available for given version of oc
result = self.run_oc("cp --retries", stderr=True)
flags = '' if "unknown flag" in result["output"] else '--retries=5'
- cmd = self.run_oc("cp %s %s:%s %s"
- % (flags, self.pod_name, fname, dest))
+ cmd = self.run_oc(f"cp {flags} {self.pod_name}:{fname} {dest}")
return cmd['status'] == 0
diff --git a/sos/component.py b/sos/component.py
index 9a6b490a..68ff2b6a 100644
--- a/sos/component.py
+++ b/sos/component.py
@@ -109,7 +109,7 @@ class SoSComponent():
if not os.path.isdir(tmpdir) \
or not os.access(tmpdir, os.W_OK):
- msg = "temporary directory %s " % tmpdir
+ msg = f"temporary directory {tmpdir} "
msg += "does not exist or is not writable\n"
# write directly to stderr as logging is not initialised yet
sys.stderr.write(msg)
@@ -175,7 +175,7 @@ class SoSComponent():
# no standard library method exists for this, so call out to stat to
# avoid bringing in a dependency on psutil
self.tmpfstype = shell_out(
- "stat --file-system --format=%s %s" % ("%T", tmpdir)
+ f"stat --file-system --format=%T {tmpdir}"
).strip()
if self.tmpfstype == 'tmpfs':
@@ -281,7 +281,7 @@ class SoSComponent():
if opts.preset != self._arg_defaults["preset"]:
self.preset = self.policy.find_preset(opts.preset)
if not self.preset:
- sys.stderr.write("Unknown preset: '%s'\n" % opts.preset)
+ sys.stderr.write(f"Unknown preset: '{opts.preset}'\n")
self.preset = self.policy.probe_preset()
opts.list_presets = True
@@ -310,8 +310,8 @@ class SoSComponent():
if self.tmpdir:
rmtree(self.tmpdir)
except Exception as err:
- print("Failed to finish cleanup: %s\nContents may remain in %s"
- % (err, self.tmpdir))
+ print(f"Failed to finish cleanup: {err}\nContents may remain in "
+ f"{self.tmpdir}")
def _set_encrypt_from_env_vars(self):
msg = ('No encryption environment variables set, archive will not be '
diff --git a/sos/help/__init__.py b/sos/help/__init__.py
index a96f3474..c3625087 100644
--- a/sos/help/__init__.py
+++ b/sos/help/__init__.py
@@ -76,7 +76,7 @@ class SoSHelper(SoSComponent):
try:
klass = self.get_obj_for_topic()
except Exception as err:
- print("Could not load help for '%s': %s" % (self.opts.topic, err))
+ print(f"Could not load help for '{self.opts.topic}': {err}")
sys.exit(1)
if klass:
@@ -85,9 +85,9 @@ class SoSHelper(SoSComponent):
klass.display_help(ht)
ht.display()
except Exception as err:
- print("Error loading help: %s" % err)
+ print(f"Error loading help: {err}")
else:
- print("No help section found for '%s'" % self.opts.topic)
+ print(f"No help section found for '{self.opts.topic}'")
def get_obj_for_topic(self):
"""Based on the help topic we're after, try to smartly decide which
@@ -176,22 +176,22 @@ class SoSHelper(SoSComponent):
'SoS - officially pronounced "ess-oh-ess" - is a diagnostic and '
'supportability utility used by several Linux distributions as an '
'easy-to-use tool for standardized data collection. The most known'
- ' component of which is %s (formerly sosreport) which is used to '
- 'collect troubleshooting information into an archive for review '
- 'by sysadmins or technical support teams.'
- % bold('sos report')
+ f' component of which is {bold("sos report")} (formerly sosreport)'
+ ' which is used to collect troubleshooting information into an '
+ 'archive for review by sysadmins or technical support teams.'
)
subsect = self_help.add_section('How to search using sos help')
usage = bold('$component.$topic.$subtopic')
subsect.add_text(
- 'To get more information on a given topic, use the form \'%s\'.'
- % usage
+ 'To get more information on a given topic, use the form '
+ f'\'{usage}\'.'
)
rep_ex = bold('sos help report.plugins.kernel')
- subsect.add_text("For example '%s' will provide more information on "
- "the kernel plugin for the report function." % rep_ex)
+ subsect.add_text(f"For example '{rep_ex}' will provide more "
+ "information on the kernel plugin for the report "
+ "function.")
avail_help = self_help.add_section('Available Help Sections')
avail_help.add_text(
diff --git a/sos/missing.py b/sos/missing.py
index 30c5b89c..7a903b0a 100644
--- a/sos/missing.py
+++ b/sos/missing.py
@@ -39,10 +39,8 @@ class MissingCollect(SoSComponent):
"""Set the --help output for collect to a message that shows that
the functionality is unavailable
"""
- msg = "%s %s" % (
- 'WARNING: `collect` is not available with this installation!',
- cls.missing_msg
- )
+ msg = ("WARNING: `collect` is not available with this installation! "
+ f"{cls.missing_msg}")
parser.epilog = msg
return parser
diff --git a/sos/options.py b/sos/options.py
index deb65f65..6ec1e4a6 100644
--- a/sos/options.py
+++ b/sos/options.py
@@ -81,7 +81,7 @@ class SoSOptions():
vals = [",".join(v) if _is_seq(v) else v for v in vals]
else:
# Only quote strings if quote=False
- vals = ["'%s'" % v if isinstance(v, str) else v for v in vals]
+ vals = [f"'{v}'" if isinstance(v, str) else v for v in vals]
return (args % tuple(vals)).strip(sep) + suffix
@@ -143,10 +143,10 @@ class SoSOptions():
)
count = ("verbose",)
if opt in no_value:
- return ["--%s" % opt]
+ return [f"--{opt}"]
if opt in count:
- return ["--%s" % opt for d in range(0, int(val))]
- return ["--" + opt + "=" + val]
+ return [f"--{opt}" for d in range(0, int(val))]
+ return [f"--{opt}={val}"]
def _convert_to_type(self, key, val, conf):
"""Ensure that the value read from a config file is the proper type
@@ -166,16 +166,15 @@ class SoSOptions():
val = str_to_bool(val)
if val is None:
raise Exception(
- "Value of '%s' in %s must be True or False or analagous"
- % (key, conf))
+ f"Value of '{key}' in {conf} must be True or False or "
+ "analagous")
else:
return val
if isinstance(self.arg_defaults[key], int):
try:
return int(val)
except ValueError:
- raise Exception("Value of '%s' in %s must be integer"
- % (key, conf))
+ raise Exception(f"Value of '{key}' in {conf} must be integer")
return val
def update_from_conf(self, config_file, component):
@@ -216,8 +215,7 @@ class SoSOptions():
if key not in self.arg_defaults:
# read an option that is not loaded by the current
# SoSComponent
- print("Unknown option '%s' in section '%s'"
- % (key, section))
+ print(f"Unknown option '{key}' in section '{section}'")
continue
val = self._convert_to_type(key, val, config_file)
setattr(self, key, val)
@@ -228,15 +226,14 @@ class SoSOptions():
with open(config_file) as f:
config.read_file(f, config_file)
except DuplicateOptionError as err:
- raise exit("Duplicate option '%s' in section '%s' in file %s"
- % (err.option, err.section, config_file))
+ raise exit(f"Duplicate option '{err.option}' in section "
+ f"'{err.section}' in file {config_file}")
except (ParsingError, Error):
- raise exit('Failed to parse configuration file %s'
- % config_file)
+ raise exit(f'Failed to parse configuration file {config_file}')
except (OSError, IOError) as e:
print(
- 'WARNING: Unable to read configuration file %s : %s'
- % (config_file, e.args[1])
+ f'WARNING: Unable to read configuration file {config_file} : '
+ f'{e.args[1]}'
)
_update_from_section("global", config)
@@ -323,7 +320,7 @@ class SoSOptions():
value = ",".join(value) if _is_seq(value) else value
if value is not True:
- opt = "%s %s" % (name, value)
+ opt = f"{name} {value}"
else:
opt = name
diff --git a/sos/policies/__init__.py b/sos/policies/__init__.py
index a13bf209..e13b0ff0 100644
--- a/sos/policies/__init__.py
+++ b/sos/policies/__init__.py
@@ -21,7 +21,7 @@ from textwrap import fill
def import_policy(name):
- policy_fqname = "sos.policies.distros.%s" % name
+ policy_fqname = f"sos.policies.distros.{name}"
try:
return import_module(policy_fqname, Policy)
except ImportError:
@@ -369,7 +369,8 @@ any third party.
)
section.add_text(
- "When SoS intializes most functions, for example %s and %s, one "
+ "When SoS intializes most functions, for example "
+ f"{bold('sos report')} and {bold('sos collect')}, one "
"of the first operations is to determine the correct policy to "
"load for the local system. Policies will determine the proper "
"package manager to use, any applicable container runtime(s), and "
@@ -377,7 +378,6 @@ any third party.
" for collections. Generally speaking a single policy will map to"
" a single distribution; for example there are separate policies "
"for Debian, Ubuntu, RHEL, and Fedora."
- % (bold('sos report'), bold('sos collect'))
)
section.add_text(
@@ -580,7 +580,7 @@ any third party.
raise ValueError("Preset name cannot be empty")
if name in self.presets.keys():
- raise ValueError("A preset with name '%s' already exists" % name)
+ raise ValueError(f"A preset with name '{name}' already exists")
preset = PresetDefaults(name=name, desc=desc, note=note, opts=opts)
preset.builtin = False
@@ -589,13 +589,12 @@ any third party.
def del_preset(self, name=""):
if not name or name not in self.presets.keys():
- raise ValueError("Unknown profile: '%s'" % name)
+ raise ValueError(f"Unknown profile: '{name}'")
preset = self.presets[name]
if preset.builtin:
- raise ValueError("Cannot delete built-in preset '%s'" %
- preset.name)
+ raise ValueError(f"Cannot delete built-in preset '{preset.name}'")
preset.delete(self.presets_path)
self.presets.pop(name)
diff --git a/sos/policies/auth/__init__.py b/sos/policies/auth/__init__.py
index 5b62a495..9530f647 100644
--- a/sos/policies/auth/__init__.py
+++ b/sos/policies/auth/__init__.py
@@ -58,7 +58,7 @@ class DeviceAuthorizationClass:
requesting a new device code.
"""
- data = "client_id={}".format(DEVICE_AUTH_CLIENT_ID)
+ data = f"client_id={DEVICE_AUTH_CLIENT_ID}"
headers = {'content-type': 'application/x-www-form-urlencoded'}
if not REQUESTS_LOADED:
raise Exception("python3-requests is not installed and is required"
@@ -197,10 +197,9 @@ class DeviceAuthorizationClass:
elif refresh_token_res.status_code == 400 and 'invalid' in\
refresh_token_res.json()['error']:
logger.warning("Problem while fetching the new tokens from refresh"
- " token grant - {} {}."
- " New Device code will be requested !".format
- (refresh_token_res.status_code,
- refresh_token_res.json()['error']))
+ f" token grant - {refresh_token_res.status_code} "
+ f"{refresh_token_res.json()['error']}."
+ " New Device code will be requested !")
self._use_device_code_grant()
else:
raise Exception(
diff --git a/sos/policies/distros/__init__.py b/sos/policies/distros/__init__.py
index b5f5baee..6f832bf5 100644
--- a/sos/policies/distros/__init__.py
+++ b/sos/policies/distros/__init__.py
@@ -161,7 +161,7 @@ class LinuxPolicy(Policy):
if cls == LinuxPolicy:
cls.display_self_help(section)
else:
- section.set_title("%s Distribution Policy" % cls.distro)
+ section.set_title(f"{cls.distro} Distribution Policy")
cls.display_distro_help(section)
@classmethod
@@ -186,14 +186,14 @@ class LinuxPolicy(Policy):
# information like $PATH and loaded presets
_pol = cls(None, None, False)
section.add_text(
- "Default --upload location: %s" % _pol._upload_url
+ f"Default --upload location: {_pol._upload_url}"
)
section.add_text(
- "Default container runtime: %s" % _pol.default_container_runtime,
+ f"Default container runtime: {_pol.default_container_runtime}",
newline=False
)
section.add_text(
- "$PATH used when running report: %s" % _pol.PATH,
+ f"$PATH used when running report: {_pol.PATH}",
newline=False
)
@@ -249,7 +249,7 @@ class LinuxPolicy(Policy):
# next, include kernel builtins
builtins = self.join_sysroot(
- "/usr/lib/modules/%s/modules.builtin" % release
+ f"/usr/lib/modules/{release}/modules.builtin"
)
try:
with open(builtins, "r") as mfile:
@@ -267,7 +267,7 @@ class LinuxPolicy(Policy):
'dm_mod': 'CONFIG_BLK_DEV_DM'
}
- booted_config = self.join_sysroot("/boot/config-%s" % release)
+ booted_config = self.join_sysroot(f"/boot/config-{release}")
kconfigs = []
try:
with open(booted_config, "r") as kfile:
@@ -436,7 +436,7 @@ class LinuxPolicy(Policy):
be provided or not
"""
if not self.get_upload_user():
- msg = "Please provide upload user for %s: " % self.get_upload_url()
+ msg = f"Please provide upload user for {self.get_upload_url()}: "
self.upload_user = input(_(msg))
def prompt_for_upload_password(self):
@@ -445,8 +445,8 @@ class LinuxPolicy(Policy):
"""
if not self.get_upload_password() and (self.get_upload_user() !=
self._upload_user):
- msg = ("Please provide the upload password for %s: "
- % self.get_upload_user())
+ msg = ("Please provide the upload password for "
+ f"{self.get_upload_user()}: ")
self.upload_password = getpass(msg)
def upload_archive(self, archive):
@@ -524,7 +524,7 @@ class LinuxPolicy(Policy):
raise Exception("Must provide protocol in upload URL")
prot, url = self.upload_url.split('://')
if prot not in prots.keys():
- raise Exception("Unsupported or unrecognized protocol: %s" % prot)
+ raise Exception(f"Unsupported or unrecognized protocol: {prot}")
return prots[prot]
def get_upload_https_auth(self, user=None, password=None):
@@ -693,7 +693,7 @@ class LinuxPolicy(Policy):
# need to strip the protocol prefix here
sftp_url = self.get_upload_url().replace('sftp://', '')
- sftp_cmd = "sftp -oStrictHostKeyChecking=no %s@%s" % (user, sftp_url)
+ sftp_cmd = f"sftp -oStrictHostKeyChecking=no {user}@{sftp_url}"
ret = pexpect.spawn(sftp_cmd, encoding='utf-8')
sftp_expects = [
@@ -719,25 +719,25 @@ class LinuxPolicy(Policy):
sftp_connected = ret.expect(pass_expects, timeout=10) == 0
if not sftp_connected:
ret.close()
- raise Exception("Incorrect username or password for %s"
- % self.get_upload_url_string())
+ raise Exception("Incorrect username or password for "
+ f"{self.get_upload_url_string()}")
elif idx == 2:
- raise Exception("Connection refused by %s. Incorrect port?"
- % self.get_upload_url_string())
+ raise Exception("Connection refused by "
+ f"{self.get_upload_url_string()}. Incorrect port?")
elif idx == 3:
- raise Exception("Timeout hit trying to connect to %s"
- % self.get_upload_url_string())
+ raise Exception("Timeout hit trying to connect to "
+ f"{self.get_upload_url_string()}")
elif idx == 4:
- raise Exception("Unexpected error trying to connect to sftp: %s"
- % ret.before)
+ raise Exception("Unexpected error trying to connect to sftp: "
+ f"{ret.before}")
if not sftp_connected:
ret.close()
- raise Exception("Unable to connect via SFTP to %s"
- % self.get_upload_url_string())
+ raise Exception("Unable to connect via SFTP to "
+ f"{self.get_upload_url_string()}")
- put_cmd = 'put %s %s' % (self.upload_archive_name,
- self._get_sftp_upload_name())
+ put_cmd = (f'put {self.upload_archive_name} '
+ f'{self._get_sftp_upload_name()}')
ret.sendline(put_cmd)
put_expects = [
@@ -755,11 +755,11 @@ class LinuxPolicy(Policy):
elif put_success == 1:
raise Exception("Timeout expired while uploading")
elif put_success == 2:
- raise Exception("Unknown error during upload: %s" % ret.before)
+ raise Exception(f"Unknown error during upload: {ret.before}")
elif put_success == 3:
raise Exception("Unable to write archive to destination")
else:
- raise Exception("Unexpected response from server: %s" % ret.before)
+ raise Exception(f"Unexpected response from server: {ret.before}")
def _get_sftp_upload_name(self):
"""If a specific file name pattern is required by the SFTP server,
@@ -832,8 +832,8 @@ class LinuxPolicy(Policy):
raise Exception(
"Authentication failed: invalid user credentials"
)
- raise Exception("POST request returned %s: %s"
- % (r.status_code, r.reason))
+ raise Exception(f"POST request returned {r.status_code}: "
+ f"{r.reason}")
return True
def upload_ftp(self, url=None, directory=None, user=None, password=None):
@@ -889,25 +889,24 @@ class LinuxPolicy(Policy):
"password?")
session.cwd(directory)
except socket.timeout:
- raise Exception("timeout hit while connecting to %s" % url)
+ raise Exception(f"timeout hit while connecting to {url}")
except socket.gaierror:
- raise Exception("unable to connect to %s" % url)
+ raise Exception(f"unable to connect to {url}")
except ftplib.error_perm as err:
errno = str(err).split()[0]
if errno == '503':
- raise Exception("could not login as '%s'" % user)
+ raise Exception(f"could not login as '{user}'")
if errno == '530':
- raise Exception("invalid password for user '%s'" % user)
+ raise Exception(f"invalid password for user '{user}'")
if errno == '550':
- raise Exception("could not set upload directory to %s"
- % directory)
- raise Exception("error trying to establish session: %s"
- % str(err))
+ raise Exception("could not set upload directory to "
+ f"{directory}")
+ raise Exception(f"error trying to establish session: {str(err)}")
try:
with open(self.upload_archive_name, 'rb') as _arcfile:
session.storbinary(
- "STOR %s" % self.upload_archive_name.split('/')[-1],
+ f"STOR {self.upload_archive_name.split('/')[-1]}",
_arcfile
)
session.quit()
@@ -1027,8 +1026,7 @@ class LinuxPolicy(Policy):
default to opening a bash shell in the container to keep it running,
thus allowing us to exec into it again.
"""
- return "%s start %s" % (self.container_runtime,
- self.sos_container_name)
+ return f"{self.container_runtime} start {self.sos_container_name}"
def format_container_command(self, cmd):
"""Returns the command that allows us to exec into the created
@@ -1041,9 +1039,8 @@ class LinuxPolicy(Policy):
:rtype: ``str``
"""
if self.container_runtime:
- return '%s exec %s %s' % (self.container_runtime,
- self.sos_container_name,
- cmd)
+ return (f'{self.container_runtime} exec {self.sos_container_name} '
+ f'{cmd}')
else:
return cmd
diff --git a/sos/policies/distros/redhat.py b/sos/policies/distros/redhat.py
index 5658516b..a72fe6d0 100644
--- a/sos/policies/distros/redhat.py
+++ b/sos/policies/distros/redhat.py
@@ -120,7 +120,7 @@ class RedHatPolicy(LinuxPolicy):
}
for subc in subs:
- subln = bold("policies.%s" % subc)
+ subln = bold(f"policies.{subc}")
section.add_text(
f"{' ':>8}{subln:<35}{subs[subc].distro:<30}",
newline=False
@@ -303,7 +303,7 @@ support representative.
return RH_API_HOST + rh_case_api % self.case_id
def _get_upload_https_auth(self):
- str_auth = "Bearer {}".format(self._device_token)
+ str_auth = f"Bearer {self._device_token}"
return {'Authorization': str_auth}
def _upload_https_post(self, archive, verify=True):
@@ -349,7 +349,7 @@ support representative.
"""
fname = self.upload_archive_name.split('/')[-1]
if self.case_id:
- fname = "%s_%s" % (self.case_id, fname)
+ fname = f"{self.case_id}_{fname}"
if self.upload_directory:
fname = os.path.join(self.upload_directory, fname)
return fname
@@ -468,7 +468,7 @@ support representative.
def probe_preset(self):
# Emergency or rescue mode?
for target in ["rescue", "emergency"]:
- if self.init_system.is_running("%s.target" % target, False):
+ if self.init_system.is_running(f"{target}.target", False):
return self.find_preset(CB)
# Package based checks
if self.pkg_by_name("satellite-common") is not None:
@@ -582,7 +582,7 @@ support representative.
)
def set_cleanup_cmd(self):
- return 'podman rm --force %s' % self.sos_container_name
+ return f'podman rm --force {self.sos_container_name}'
class FedoraPolicy(RedHatPolicy):
diff --git a/sos/policies/init_systems/__init__.py b/sos/policies/init_systems/__init__.py
index 97d65a01..88ca3b14 100644
--- a/sos/policies/init_systems/__init__.py
+++ b/sos/policies/init_systems/__init__.py
@@ -42,8 +42,8 @@ class InitSystem():
self.services = {}
self.init_cmd = init_cmd
- self.list_cmd = "%s %s" % (self.init_cmd, list_cmd) or None
- self.query_cmd = "%s %s" % (self.init_cmd, query_cmd) or None
+ self.list_cmd = f"{self.init_cmd} {list_cmd}" or None
+ self.query_cmd = f"{self.init_cmd} {query_cmd}" or None
self.chroot = chroot
def is_enabled(self, name):
@@ -118,7 +118,7 @@ class InitSystem():
if self.query_cmd:
try:
return sos_get_command_output(
- "%s %s" % (self.query_cmd, name),
+ f"{self.query_cmd} {name}",
chroot=self.chroot
)
except Exception:
diff --git a/sos/policies/runtimes/__init__.py b/sos/policies/runtimes/__init__.py
index 20492e20..010b4d1d 100644
--- a/sos/policies/runtimes/__init__.py
+++ b/sos/policies/runtimes/__init__.py
@@ -45,7 +45,7 @@ class ContainerRuntime():
def __init__(self, policy=None):
self.policy = policy
- self.run_cmd = "%s exec " % self.binary
+ self.run_cmd = f"{self.binary} exec "
def load_container_info(self):
"""If this runtime is found to be active, attempt to load information
@@ -82,7 +82,7 @@ class ContainerRuntime():
:type get_all: ``bool``
"""
containers = []
- _cmd = "%s ps %s" % (self.binary, '-a' if get_all else '')
+ _cmd = f"{self.binary} ps {'-a' if get_all else ''}"
if self.active:
out = sos_get_command_output(_cmd, chroot=self.policy.sysroot)
if out['status'] == 0:
@@ -119,7 +119,7 @@ class ContainerRuntime():
fmt = '{{lower .Repository}}:{{lower .Tag}} {{lower .ID}}'
if self.active:
out = sos_get_command_output(
- "%s images --format '%s'" % (self.binary, fmt),
+ f"{self.binary} images --format '{fmt}'",
chroot=self.policy.sysroot
)
if out['status'] == 0:
@@ -138,7 +138,7 @@ class ContainerRuntime():
vols = []
if self.active:
out = sos_get_command_output(
- "%s volume ls" % self.binary,
+ f"{self.binary} volume ls",
chroot=self.policy.sysroot
)
if out['status'] == 0:
@@ -183,7 +183,7 @@ class ContainerRuntime():
quoted_cmd = quote(cmd)
else:
quoted_cmd = cmd
- return "%s %s %s" % (self.run_cmd, container, quoted_cmd)
+ return f"{self.run_cmd} {container} {quoted_cmd}"
def fmt_registry_credentials(self, username, password):
"""Format a string to pass to the 'run' command of the runtime to
@@ -199,7 +199,7 @@ class ContainerRuntime():
:returns: The string to use to enable a run command to pull the image
:rtype: ``str``
"""
- return "--creds=%s%s" % (username, ':' + password if password else '')
+ return f"--creds={username}{':' + password if password else ''}"
def fmt_registry_authfile(self, authfile):
"""Format a string to pass to the 'run' command of the runtime to
@@ -207,7 +207,7 @@ class ContainerRuntime():
needed using an authfile.
"""
if authfile:
- return "--authfile %s" % authfile
+ return f"--authfile {authfile}"
return ''
def get_logs_command(self, container):
@@ -220,7 +220,7 @@ class ContainerRuntime():
:returns: Formatted runtime command to get logs from `container`
:type: ``str``
"""
- return "%s logs -t %s" % (self.binary, container)
+ return f"{self.binary} logs -t {container}"
def get_copy_command(self, container, path, dest, sizelimit=None):
"""Generate the command string used to copy a file out of a container
@@ -245,8 +245,7 @@ class ContainerRuntime():
:rtype: ``str``
"""
if sizelimit:
- return "%s %s tail -c %s %s" % (self.run_cmd, container, sizelimit,
- path)
- return "%s cp %s:%s %s" % (self.binary, container, path, dest)
+ return f"{self.run_cmd} {container} tail -c {sizelimit} {path}"
+ return f"{self.binary} cp {container}:{path} {dest}"
# vim: set et ts=4 sw=4 :
diff --git a/sos/policies/runtimes/crio.py b/sos/policies/runtimes/crio.py
index c586866b..d7726f8b 100644
--- a/sos/policies/runtimes/crio.py
+++ b/sos/policies/runtimes/crio.py
@@ -30,7 +30,7 @@ class CrioContainerRuntime(ContainerRuntime):
:type get_all: ``bool``
"""
containers = []
- _cmd = "%s ps %s -o json" % (self.binary, '-a' if get_all else '')
+ _cmd = f"{self.binary} ps {'-a' if get_all else ''} -o json"
if self.active:
out = sos_get_command_output(_cmd, chroot=self.policy.sysroot)
if out["status"] == 0:
@@ -49,7 +49,7 @@ class CrioContainerRuntime(ContainerRuntime):
"""
images = []
if self.active:
- out = sos_get_command_output("%s images -o json" % self.binary,
+ out = sos_get_command_output(f"{self.binary} images -o json",
chroot=self.policy.sysroot)
if out['status'] == 0:
out_json = json.loads(out["output"])
@@ -86,7 +86,7 @@ class CrioContainerRuntime(ContainerRuntime):
else:
quoted_cmd = cmd
container_id = self.get_container_by_name(container)
- return "%s %s %s" % (self.run_cmd, container_id,
- quoted_cmd) if container_id is not None else ''
+ return (f"{self.run_cmd} {container_id} {quoted_cmd}"
+ if container_id is not None else '')
# vim: set et ts=4 sw=4 :
diff --git a/sos/presets/__init__.py b/sos/presets/__init__.py
index e99a32a1..4764d926 100644
--- a/sos/presets/__init__.py
+++ b/sos/presets/__init__.py
@@ -53,15 +53,15 @@ class PresetDefaults():
"""Return a human readable string representation of this
``PresetDefaults`` object.
"""
- return ("name=%s desc=%s note=%s opts=(%s)" %
- (self.name, self.desc, self.note, str(self.opts)))
+ return (f"name={self.name} desc={self.desc} note={self.note} "
+ f"opts=({str(self.opts)})")
def __repr__(self):
"""Return a machine readable string representation of this
``PresetDefaults`` object.
"""
- return ("PresetDefaults(name='%s' desc='%s' note='%s' opts=(%s)" %
- (self.name, self.desc, self.note, repr(self.opts)))
+ return (f"PresetDefaults(name='{self.name}' desc='{self.desc}' "
+ f"note='{self.note}' opts=({repr(self.opts)})")
def __init__(self, name="", desc="", note=None, opts=SoSOptions()):
"""Initialise a new ``PresetDefaults`` object with the specified
diff --git a/sos/report/__init__.py b/sos/report/__init__.py
index ede21ef5..dde6ffd6 100644
--- a/sos/report/__init__.py
+++ b/sos/report/__init__.py
@@ -168,10 +168,10 @@ class SoSReport(SoSComponent):
msg = "cmdline"
elif self.policy.in_container() and self.sysroot != os.sep:
msg = "policy"
- self.soslog.debug("set sysroot to '%s' (%s)" % (self.sysroot, msg))
+ self.soslog.debug(f"set sysroot to '{self.sysroot}' ({msg})")
if self.opts.chroot not in chroot_modes:
- self.soslog.error("invalid chroot mode: %s" % self.opts.chroot)
+ self.soslog.error(f"invalid chroot mode: {self.opts.chroot}")
logging.shutdown()
self.tempfile_util.clean()
self._exit(1)
@@ -426,11 +426,11 @@ class SoSReport(SoSComponent):
ssec = section.add_section(title='See Also')
ssec.add_text(
- "For information on available options for report, see %s and %s"
- % (bold('sos report --help'), bold('man sos-report'))
+ "For information on available options for report, see "
+ f"{bold('sos report --help')} and {bold('man sos-report')}"
)
- ssec.add_text("The following %s sections may be of interest:\n"
- % bold('sos help'))
+ ssec.add_text(f"The following {bold('sos help')} sections may be of "
+ "interest:\n")
help_lines = {
'report.plugins': 'Information on the plugin design of sos',
'report.plugins.$plugin': 'Information on a specific $plugin',
@@ -442,7 +442,7 @@ class SoSReport(SoSComponent):
ssec.add_text(helpln)
def print_header(self):
- print("\n%s\n" % _("sosreport (version %s)" % (__version__,)))
+ print(f"\n{_(f'sosreport (version {__version__})')}\n")
def _get_hardware_devices(self):
self.devices = {
@@ -471,19 +471,19 @@ class SoSReport(SoSComponent):
)
elif not self.policy.runtimes:
msg = ("WARNING: No container runtimes are active, ignoring "
- "option to set default runtime to '%s'\n" % crun)
+ f"option to set default runtime to '{crun}'\n")
self.soslog.warning(msg)
elif crun not in self.policy.runtimes.keys():
valid = ', '.join(p for p in self.policy.runtimes.keys()
if p != 'default')
- raise Exception("Cannot use container runtime '%s': no such "
- "runtime detected. Available runtimes: %s"
- % (crun, valid))
+ raise Exception(f"Cannot use container runtime '{crun}': no "
+ "such runtime detected. Available runtimes: "
+ f"{valid}")
else:
self.policy.runtimes['default'] = self.policy.runtimes[crun]
self.soslog.info(
- "Set default container runtime to '%s'"
- % self.policy.runtimes['default'].name
+ "Set default container runtime to "
+ f"'{self.policy.runtimes['default'].name}'"
)
def _get_fibre_devs(self):
@@ -501,11 +501,11 @@ class SoSReport(SoSComponent):
'fc_vports'
]
for devdir in devdirs:
- if os.path.isdir("/sys/class/%s" % devdir):
- devs.extend(glob.glob("/sys/class/%s/*" % devdir))
+ if os.path.isdir(f"/sys/class/{devdir}"):
+ devs.extend(glob.glob(f"/sys/class/{devdir}/*"))
return devs
except Exception as err:
- self.soslog.error("Could not get fibre device list: %s" % err)
+ self.soslog.error(f"Could not get fibre device list: {err}")
return []
def _get_block_devs(self):
@@ -515,7 +515,7 @@ class SoSReport(SoSComponent):
These devices are used by add_device_cmd() in the Plugin class.
"""
try:
- device_list = ["/dev/%s" % d for d in os.listdir('/sys/block')]
+ device_list = [f"/dev/{d}" for d in os.listdir('/sys/block')]
loop_devices = sos_get_command_output('losetup --all --noheadings')
real_loop_devices = []
if loop_devices['status'] == 0:
@@ -528,7 +528,7 @@ class SoSReport(SoSComponent):
dev_list = list(set(device_list) - set(ghost_loop_devs))
return dev_list
except Exception as err:
- self.soslog.error("Could not get block device list: %s" % err)
+ self.soslog.error(f"Could not get block device list: {err}")
return []
def _get_namespaces(self):
@@ -595,7 +595,7 @@ class SoSReport(SoSComponent):
_devs['ethernet'].append(dname)
_devs['ethernet'] = list(set(_devs['ethernet']))
except Exception as err:
- self.soslog.debug("Could not parse nmcli devices: %s" % err)
+ self.soslog.debug(f"Could not parse nmcli devices: {err}")
return _devs
def _get_eth_devs(self, namespace=None):
@@ -631,7 +631,7 @@ class SoSReport(SoSComponent):
)
else:
try:
- _nscmd = "ip netns exec %s ls /sys/class/net" % namespace
+ _nscmd = f"ip netns exec {namespace} ls /sys/class/net"
_nsout = sos_get_command_output(_nscmd)
if _nsout['status'] == 0:
for _nseth in _nsout['output'].split():
@@ -639,8 +639,8 @@ class SoSReport(SoSComponent):
_eth_devs.append(_nseth)
except Exception as err:
self.soslog.warning(
- "Could not determine network namespace '%s' devices: %s"
- % (namespace, err)
+ f"Could not determine network namespace '{namespace}' "
+ f"devices: {err}"
)
return {
'ethernet': _eth_devs,
@@ -659,15 +659,14 @@ class SoSReport(SoSComponent):
try:
_bout = sos_get_command_output('brctl show', timeout=15)
except Exception as err:
- self.soslog.warning("Unable to enumerate bridge devices: %s" % err)
+ self.soslog.warning(f"Unable to enumerate bridge devices: {err}")
if _bout['status'] == 0:
for _bline in _bout['output'].splitlines()[1:]:
try:
_bridges.append(_bline.split()[0])
except Exception as err:
self.soslog.info(
- "Could not parse device from line '%s': %s"
- % (_bline, err)
+ f"Could not parse device from line '{_bline}': {err}"
)
return _bridges
@@ -941,22 +940,21 @@ class SoSReport(SoSComponent):
if plugname in opts:
for opt in opts[plugname]:
if opt not in plug.options:
- self.soslog.error('no such option "%s" for plugin '
- '(%s)' % (opt, plugname))
+ self.soslog.error(f'no such option "{opt}" for '
+ f'plugin ({plugname})')
self._exit(1)
try:
plug.options[opt].set_value(opts[plugname][opt])
self.soslog.debug(
- "Set %s plugin option to %s"
- % (plugname, plug.options[opt])
- )
+ f"Set {plugname} plugin option to "
+ f"{plug.options[opt]}")
except Exception as err:
self.soslog.error(err)
self._exit(1)
del opts[plugname]
for plugname in opts.keys():
self.soslog.error('WARNING: unable to set option for disabled '
- 'or non-existing plugin (%s).' % (plugname))
+ f'or non-existing plugin ({plugname}).')
# in case we printed warnings above, visually intend them from
# subsequent header text
if opts.keys():
@@ -968,13 +966,13 @@ class SoSReport(SoSComponent):
self.opts.enable_plugins):
plugin_name = plugin.split(".")[0]
if plugin_name not in self.plugin_names:
- self.soslog.fatal('a non-existing plugin (%s) was specified '
- 'in the command line.' % (plugin_name))
+ self.soslog.fatal(f'a non-existing plugin ({plugin_name}) was '
+ 'specified in the command line.')
self._exit(1)
for plugin in self.opts.skip_plugins:
if plugin not in self.plugin_names:
self.soslog.warning(
- "Requested to skip non-existing plugin '%s'." % plugin
+ f"Requested to skip non-existing plugin '{plugin}'."
)
def _set_plugin_options(self):
@@ -988,7 +986,7 @@ class SoSReport(SoSComponent):
msg = "\nEstimate-only mode enabled"
ext_msg = []
if self.opts.threads > 1:
- ext_msg += ["--threads=%s overriden to 1" % self.opts.threads, ]
+ ext_msg += [f"--threads={self.opts.threads} overriden to 1", ]
self.opts.threads = 1
if not self.opts.build:
ext_msg += ["--build enabled", ]
@@ -1012,11 +1010,11 @@ class SoSReport(SoSComponent):
def _report_profiles_and_plugins(self):
self.ui_log.info("")
if len(self.loaded_plugins):
- self.ui_log.info(" %d profiles, %d plugins"
- % (len(self.profiles), len(self.loaded_plugins)))
+ self.ui_log.info(f" {len(self.profiles)} profiles, "
+ f"{len(self.loaded_plugins)} plugins")
else:
# no valid plugins for this profile
- self.ui_log.info(" %d profiles" % len(self.profiles))
+ self.ui_log.info(f" {len(self.profiles)} profiles")
self.ui_log.info("")
def list_plugins(self):
@@ -1028,8 +1026,7 @@ class SoSReport(SoSComponent):
self.ui_log.info(_("The following plugins are currently enabled:"))
self.ui_log.info("")
for (plugname, plug) in self.loaded_plugins:
- self.ui_log.info(" %-20s %s" % (plugname,
- plug.get_description()))
+ self.ui_log.info(f"{plugname:<20} {plug.get_description()}")
else:
self.ui_log.info(_("No plugin enabled."))
self.ui_log.info("")
@@ -1039,10 +1036,9 @@ class SoSReport(SoSComponent):
"disabled:"))
self.ui_log.info("")
for (plugname, plugclass, reason) in self.skipped_plugins:
- self.ui_log.info(" %-20s %-14s %s" % (
- plugname,
- reason,
- plugclass.get_description()))
+ self.ui_log.info(f"{plugname:<20} {reason:<14} "
+ f"{plugclass.get_description()}")
+
self.ui_log.info("")
if self.all_options:
@@ -1061,7 +1057,7 @@ class SoSReport(SoSComponent):
val = TIMEOUT_DEFAULT
if opt.name == 'postproc':
val = not self.opts.no_postproc
- self.ui_log.info(" %-25s %-15s %s" % (opt.name, val, opt.desc))
+ self.ui_log.info(f"{opt.name:<25} {val:<15} {opt.desc}")
self.ui_log.info("")
self.ui_log.info(_("The following plugin options are available:"))
@@ -1081,8 +1077,8 @@ class SoSReport(SoSComponent):
if tmpopt is None:
tmpopt = 0
- self.ui_log.info(" %-25s %-15s %s" % (
- opt.plugin + "." + opt.name, tmpopt, opt.desc))
+ self.ui_log.info(f" {f'{opt.plugin}.{opt.name}':<25} "
+ f"{tmpopt:<15} {opt.desc}")
else:
self.ui_log.info(_("No plugin options available."))
@@ -1091,7 +1087,7 @@ class SoSReport(SoSComponent):
profiles.sort()
lines = _format_list("Profiles: ", profiles, indent=True)
for line in lines:
- self.ui_log.info(" %s" % line)
+ self.ui_log.info(f" {line}")
self._report_profiles_and_plugins()
def list_profiles(self):
@@ -1111,9 +1107,9 @@ class SoSReport(SoSComponent):
for name, plugin in self.loaded_plugins:
if _has_prof(plugin) and profile in plugin.profiles:
plugins.append(name)
- lines = _format_list("%-15s " % profile, plugins, indent=True)
+ lines = _format_list(f"{profile:<15}", plugins, indent=True)
for line in lines:
- self.ui_log.info(" %s" % line)
+ self.ui_log.info(f" {line}")
self._report_profiles_and_plugins()
def list_presets(self):
@@ -1127,14 +1123,14 @@ class SoSReport(SoSComponent):
if not preset:
continue
preset = self.policy.find_preset(preset)
- self.ui_log.info("%14s %s" % ("name:", preset.name))
- self.ui_log.info("%14s %s" % ("description:", preset.desc))
+ self.ui_log.info(f"name: {preset.name:>14}")
+ self.ui_log.info(f"description: {preset.desc:>14}")
if preset.note:
- self.ui_log.info("%14s %s" % ("note:", preset.note))
+ self.ui_log.info(f"note: {preset.note:>14}")
if self.opts.verbosity > 0:
args = preset.opts.to_args()
- options_str = "%14s " % "options:"
+ options_str = f"{'options:':>14}"
lines = _format_list(options_str, args, indent=True, sep=' ')
for line in lines:
self.ui_log.info(line)
@@ -1149,7 +1145,7 @@ class SoSReport(SoSComponent):
"""
policy = self.policy
if policy.find_preset(name):
- self.ui_log.error("A preset named '%s' already exists" % name)
+ self.ui_log.error(f"A preset named '{name}' already exists")
return False
desc = desc or self.opts.desc
@@ -1158,15 +1154,15 @@ class SoSReport(SoSComponent):
try:
policy.add_preset(name=name, desc=desc, note=note, opts=self.opts)
except Exception as e:
- self.ui_log.error("Could not add preset: %s" % e)
+ self.ui_log.error(f"Could not add preset: {e}")
return False
# Filter --add-preset <name> from arguments list
arg_index = self.cmdline.index("--add-preset")
args = self.cmdline[0:arg_index] + self.cmdline[arg_index + 2:]
- self.ui_log.info("Added preset '%s' with options %s\n" %
- (name, " ".join(args)))
+ self.ui_log.info(
+ f"Added preset '{name}' with options {' '.join(args)}\n")
return True
def del_preset(self, name):
@@ -1177,7 +1173,7 @@ class SoSReport(SoSComponent):
"""
policy = self.policy
if not policy.find_preset(name):
- self.ui_log.error("Preset '%s' not found" % name)
+ self.ui_log.error(f"Preset '{name}' not found")
return False
try:
@@ -1186,7 +1182,7 @@ class SoSReport(SoSComponent):
self.ui_log.error(str(e) + "\n")
return False
- self.ui_log.info("Deleted preset '%s'\n" % name)
+ self.ui_log.info(f"Deleted preset '{name}'\n")
return True
def batch(self):
@@ -1208,11 +1204,11 @@ class SoSReport(SoSComponent):
def _log_plugin_exception(self, plugin, method):
trace = traceback.format_exc()
msg = "caught exception in plugin method"
- plugin_err_log = "%s-plugin-errors.txt" % plugin
+ plugin_err_log = f"{plugin}-plugin-errors.txt"
logpath = os.path.join(self.logdir, plugin_err_log)
- self.soslog.error('%s "%s.%s()"' % (msg, plugin, method))
- self.soslog.error('writing traceback to %s' % logpath)
- self.archive.add_string("%s\n" % trace, logpath, mode='a')
+ self.soslog.error(f'{msg} "{plugin}.{method}()"')
+ self.soslog.error(f'writing traceback to {logpath}')
+ self.archive.add_string(f"{trace}\n", logpath, mode='a')
def prework(self):
self.policy.pre_work()
@@ -1227,10 +1223,10 @@ class SoSReport(SoSComponent):
# file system containing our temporary log files).
if e.errno in fatal_fs_errors:
print("")
- print(" %s while setting up archive" % e.strerror)
+ print(f" {e.strerror} while setting up archive")
print("")
else:
- print("Error setting up archive: %s" % e)
+ print(f"Error setting up archive: {e}")
raise
except Exception as e:
self.ui_log.error("")
@@ -1262,8 +1258,8 @@ class SoSReport(SoSComponent):
except (OSError, IOError) as e:
if e.errno in fatal_fs_errors:
self.ui_log.error("")
- self.ui_log.error(" %s while setting up plugins"
- % e.strerror)
+ self.ui_log.error(
+ f" {e.strerror} while setting up plugins")
self.ui_log.error("")
self._exit(1)
self.handle_exception(plugname, "setup")
@@ -1275,7 +1271,7 @@ class SoSReport(SoSComponent):
version file"""
versions = []
- versions.append("sosreport: %s" % __version__)
+ versions.append(f"sosreport: {__version__}")
self.archive.add_string(content="\n".join(versions),
dest='version.txt')
@@ -1297,8 +1293,7 @@ class SoSReport(SoSComponent):
list(self.pluglist))
for res in results:
if not res:
- self.soslog.debug("Unexpected plugin task result: %s" %
- res)
+ self.soslog.debug(f"Unexpected plugin task result: {res}")
self.ui_log.info("")
except KeyboardInterrupt:
# We may not be at a newline when the user issues Ctrl-C
@@ -1322,11 +1317,11 @@ class SoSReport(SoSComponent):
_plug.manifest.add_field('end_time', end)
_plug.manifest.add_field('run_time', end - start)
except TimeoutError:
- msg = "Plugin %s timed out" % plugin[1]
+ msg = f"Plugin {plugin[1]} timed out"
# log to ui_log.error to show the user, log to soslog.info
# so that someone investigating the sos execution has it all
# in one place, but without double notifying the user.
- self.ui_log.error("\n %s\n" % msg)
+ self.ui_log.error(f"\n {msg}\n")
self.soslog.info(msg)
self.running_plugs.remove(plugin[1])
self.loaded_plugins[plugin[0]-1][1].set_timeout_hit()
@@ -1337,7 +1332,7 @@ class SoSReport(SoSComponent):
# data collected by the plugin - if the command fails, count with 0
tmpdir = self.archive.get_tmp_dir()
try:
- du = sos_get_command_output('du -sB1 %s' % tmpdir)
+ du = sos_get_command_output(f'du -sB1 {tmpdir}')
self.estimated_plugsizes[plugin[1]] = \
int(du['output'].split()[0])
except Exception:
@@ -1362,11 +1357,8 @@ class SoSReport(SoSComponent):
except Exception:
return False
numplugs = len(self.loaded_plugins)
- status_line = " Starting %-5s %-15s %s" % (
- "%d/%d" % (count, numplugs),
- plugname,
- "[Running: %s]" % ' '.join(p for p in self.running_plugs)
- )
+ status_line = (f" Starting {f'{count}/{numplugs}':<5} {plugname:<15} "
+ f"[Running: {' '.join(p for p in self.running_plugs)}]")
self.ui_progress(status_line)
try:
plug.collect_plugin()
@@ -1389,10 +1381,8 @@ class SoSReport(SoSComponent):
status = ''
if (len(self.pluglist) <= int(self.opts.threads) and
self.running_plugs):
- status = " Finishing plugins %-12s %s" % (
- " ",
- "[Running: %s]" % (' '.join(p for p in self.running_plugs))
- )
+ status = (f" Finishing plugins {' ':<12} [Running: "
+ f"{' '.join(p for p in self.running_plugs)}]")
if not self.running_plugs and not self.pluglist:
status = "\n Finished running plugins"
if status:
@@ -1403,10 +1393,10 @@ class SoSReport(SoSComponent):
pass
except (OSError, IOError) as e:
if e.errno in fatal_fs_errors:
- self.ui_log.error("\n %s while collecting plugin data"
- % e.strerror)
- self.ui_log.error(" Data collected still available at %s\n"
- % self.tmpdir)
+ self.ui_log.error(
+ f"\n {e.strerror} while collecting plugin data")
+ self.ui_log.error(
+ f" Data collected still available at {self.tmpdir}\n")
os._exit(1)
self.handle_exception(plugname, "collect")
except Exception:
@@ -1415,9 +1405,9 @@ class SoSReport(SoSComponent):
def ui_progress(self, status_line):
if self.opts.verbosity == 0 and not self.opts.batch:
- status_line = "\r%s" % status_line.ljust(90)
+ status_line = f"\r{status_line.ljust(90)}"
else:
- status_line = "%s\n" % status_line
+ status_line = f"{status_line}\n"
if not self.opts.quiet:
sys.stdout.write(status_line)
sys.stdout.flush()
@@ -1426,8 +1416,8 @@ class SoSReport(SoSComponent):
if not self.env_vars:
return
env = '\n'.join([
- "%s=%s" % (name, val) for (name, val) in
- [(name, '%s' % os.environ.get(name)) for name in self.env_vars if
+ f"{name}={val}" for (name, val) in
+ [(name, f'{os.environ.get(name)}') for name in self.env_vars if
os.environ.get(name) is not None]
]) + '\n'
self.archive.add_string(env, 'environment')
@@ -1482,8 +1472,8 @@ class SoSReport(SoSComponent):
except (OSError, IOError) as e:
if e.errno in fatal_fs_errors:
self.ui_log.error("")
- self.ui_log.error(" %s while writing %s report"
- % (e.strerror, type_))
+ self.ui_log.error(
+ f" {e.strerror} while writing {type_} report")
self.ui_log.error("")
self._exit(1)
@@ -1493,13 +1483,13 @@ class SoSReport(SoSComponent):
if plug.get_option('postproc'):
plug.postproc()
else:
- self.soslog.info("Skipping postproc for plugin %s"
- % plugname)
+ self.soslog.info(
+ f"Skipping postproc for plugin {plugname}")
except (OSError, IOError) as e:
if e.errno in fatal_fs_errors:
self.ui_log.error("")
- self.ui_log.error(" %s while post-processing plugin data"
- % e.strerror)
+ self.ui_log.error(
+ f" {e.strerror} while post-processing plugin data")
self.ui_log.error("")
self._exit(1)
self.handle_exception(plugname, "postproc")
@@ -1556,7 +1546,7 @@ class SoSReport(SoSComponent):
map_file, _paths = cleaner.execute()
do_clean = True
except Exception as err:
- print(_("ERROR: Unable to obfuscate report: %s" % err))
+ print(_(f"ERROR: Unable to obfuscate report: {err}"))
self._add_sos_logs()
if self.manifest is not None:
@@ -1594,13 +1584,12 @@ class SoSReport(SoSComponent):
_sum = get_human_readable(sum(self.estimated_plugsizes.values()))
self.ui_log.info("Estimated disk space requirement for whole "
- "uncompressed sos report directory: %s" % _sum)
+ f"uncompressed sos report directory: {_sum}")
bigplugins = sorted(self.estimated_plugsizes.items(),
key=lambda x: x[1], reverse=True)[:5]
- bp_out = ", ".join("%s: %s" %
- (p, get_human_readable(v, precision=0))
+ bp_out = ", ".join(f"{p}: {get_human_readable(v, precision=0)}"
for p, v in bigplugins)
- self.ui_log.info("Five biggest plugins: %s" % bp_out)
+ self.ui_log.info(f"Five biggest plugins: {bp_out}")
self.ui_log.info("")
self.ui_log.info("Please note the estimation is relevant to the "
"current options.")
@@ -1622,8 +1611,8 @@ class SoSReport(SoSComponent):
self.opts.compression_type)
except (OSError, IOError) as e:
print("")
- print(_(" %s while finalizing archive %s" %
- (e.strerror, self.archive.get_archive_path())))
+ print(_(f" {e.strerror} while finalizing archive "
+ f"{self.archive.get_archive_path()}"))
print("")
if e.errno in fatal_fs_errors:
self._exit(1)
@@ -1649,7 +1638,7 @@ class SoSReport(SoSComponent):
os.rename(directory, final_dir)
directory = final_dir
except (OSError, IOError):
- print(_("Error moving directory: %s" % directory))
+ print(_(f"Error moving directory: {directory}"))
return False
checksum = None
@@ -1671,7 +1660,7 @@ class SoSReport(SoSComponent):
try:
self._write_checksum(archive, hash_name, checksum)
except (OSError, IOError):
- print(_("Error writing checksum for file: %s" % archive))
+ print(_(f"Error writing checksum for file: {archive}"))
# output filename is in the private tmpdir - move it to the
# containing directory.
@@ -1692,7 +1681,7 @@ class SoSReport(SoSComponent):
os.rename(archive, final_name)
archive = final_name
except (OSError, IOError):
- print(_("Error moving archive file: %s" % archive))
+ print(_(f"Error moving archive file: {archive}"))
return False
# There is a race in the creation of the final checksum file:
@@ -1710,7 +1699,7 @@ class SoSReport(SoSComponent):
try:
os.rename(archive_hash, final_hash)
except (OSError, IOError):
- print(_("Error moving checksum file: %s" % archive_hash))
+ print(_(f"Error moving checksum file: {archive_hash}"))
self.policy.display_results(archive, directory, checksum,
archivestat, map_file=map_file)
@@ -1725,7 +1714,7 @@ class SoSReport(SoSComponent):
self.policy.upload_archive(archive)
self.ui_log.info(_("Uploaded archive successfully"))
except Exception as err:
- self.ui_log.error("Upload attempt failed: %s" % err)
+ self.ui_log.error(f"Upload attempt failed: {err}")
else:
msg = ("Unable to upload archive when using --build as no "
"archive is created.")
@@ -1810,13 +1799,13 @@ class SoSReport(SoSComponent):
# Log active preset defaults
preset_args = self.preset.opts.to_args()
- msg = ("[%s:%s] using '%s' preset defaults (%s)" %
- (__name__, "setup", self.preset.name, " ".join(preset_args)))
+ msg = (f"[{__name__}:setup] using '{self.preset.name}' preset defaults"
+ f" ({' '.join(preset_args)})")
self.soslog.info(msg)
# Log effective options after applying preset defaults
- self.soslog.info("[%s:%s] effective options now: %s" %
- (__name__, "setup", " ".join(self.opts.to_args())))
+ self.soslog.info(f"[{__name__}:setup] effective options now: "
+ f"{' '.join(self.opts.to_args())}")
def execute(self):
try:
diff --git a/sos/utilities.py b/sos/utilities.py
index 84419e4e..8f165ab4 100644
--- a/sos/utilities.py
+++ b/sos/utilities.py
@@ -114,7 +114,7 @@ def fileobj(path_or_file, mode='r'):
return open(path_or_file, mode)
except IOError:
log = logging.getLogger('sos')
- log.debug("fileobj: %s could not be opened" % path_or_file)
+ log.debug(f"fileobj: {path_or_file} could not be opened")
return closing(io.StringIO())
else:
return closing(path_or_file)
@@ -124,15 +124,15 @@ def convert_bytes(bytes_, K=1 << 10, M=1 << 20, G=1 << 30, T=1 << 40):
"""Converts a number of bytes to a shorter, more human friendly format"""
fn = float(bytes_)
if bytes_ >= T:
- return '%.1fT' % (fn / T)
+ return f'{(fn / T):.1fT}'
elif bytes_ >= G:
- return '%.1fG' % (fn / G)
+ return f'{(fn / G):.1fG}'
elif bytes_ >= M:
- return '%.1fM' % (fn / M)
+ return f'{(fn / M):.1fM}'
elif bytes_ >= K:
- return '%.1fK' % (fn / K)
+ return f'{(fn / K):.1fK}'
else:
- return '%d' % bytes_
+ return f'{bytes_}'
def file_is_binary(fname):
@@ -250,11 +250,8 @@ def sos_get_command_output(command, timeout=TIMEOUT_DEFAULT, stderr=False,
cmd_env.pop(key, None)
# use /usr/bin/timeout to implement a timeout
if timeout and is_executable("timeout"):
- command = "timeout %s %ds %s" % (
- '--foreground' if foreground else '',
- timeout,
- command
- )
+ command = (f"timeout {'--foreground' if foreground else ''} {timeout}s"
+ f" {command}")
args = shlex.split(command)
# Expand arguments that are wildcard root paths.
@@ -364,7 +361,7 @@ def get_human_readable(size, precision=2):
while size > 1024 and suffixindex < 4:
suffixindex += 1
size = size/1024.0
- return "%.*f%s" % (precision, size, suffixes[suffixindex])
+ return f"{size:.{precision}f}{suffixes[suffixindex]}"
def _os_wrapper(path, sysroot, method, module=os.path):