1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
|
# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.
from avocado.core.exceptions import TestSkipError
from avocado import Test
from avocado.utils import archive, process
from fnmatch import fnmatch
import glob
import json
import os
import pickle
import socket
import re
SOS_TEST_DIR = os.path.dirname(os.path.realpath(__file__))
SOS_BIN = os.path.realpath(os.path.join(SOS_TEST_DIR, '../bin/sos'))
def skipIf(cond, message=None):
def decorator(function):
def wrapper(self, *args, **kwargs):
if callable(cond):
if cond(self):
raise TestSkipError(message)
elif cond:
raise TestSkipError(message)
return wrapper
return decorator
class BaseSoSTest(Test):
"""Base class for all our test classes to build off of.
Subclasses avocado.Test and then adds wrappers and helper methods that are
needed across sos components. Component specific test classes should in
turn subclass ``BaseSoSTest`` rather than ``avocado.Test`` directly
"""
_klass_name = None
_tmpdir = None
sos_cmd = ''
@property
def klass_name(self):
if not self._klass_name:
self._klass_name = os.path.basename(__file__) + '.' + self.__class__.__name__
return self._klass_name
@property
def tmpdir(self):
if not self._tmpdir:
self._tmpdir = os.getenv('AVOCADO_TESTS_COMMON_TMPDIR') + self.klass_name
return self._tmpdir
def generate_sysinfo(self):
"""Collects some basic information about the system for later reference
in individual tests
"""
sysinfo = {}
# get kernel modules
mods = []
_out = process.run('lsmod').stdout.decode()
for line in _out.splitlines()[1:]:
mods.append(line.split()[0])
# this particular kmod is both innocuous and unpredictable in terms of
# pre-loading even within the same distribution. For now, turn a blind
# eye to it with regards to the "no new kmods loaded" perspective
if 'binfmt_misc' in mods:
mods.remove('binfmt_misc')
sysinfo['modules'] = sorted(mods, key=str.lower)
# get networking info
hostname = socket.gethostname()
ip_addr = socket.gethostbyname(hostname)
sysinfo['networking'] = {}
sysinfo['networking']['hostname'] = hostname
sysinfo['networking']['ip_addr'] = ip_addr
return sysinfo
def _write_file_to_tmpdir(self, fname, content):
"""Write the given content to fname within the test's tmpdir
"""
fname = os.path.join(self.tmpdir, fname)
if isinstance(content, bytes):
content = content.decode()
with open(fname, 'w') as wfile:
wfile.write(content)
def read_file_from_tmpdir(self, fname):
fname = os.path.join(self.tmpdir, fname)
with open(fname, 'r') as tfile:
return tfile.read()
return ''
def _write_sysinfo(self, fname):
"""Get the current state of sysinfo and write it into our shared
tempdir so it can be loaded in setUp() later
:param fname: The name of the file to be written in the tempdir
:type fname: ``str``
"""
sysinfo = self.generate_sysinfo()
self._write_file_to_tmpdir(fname, json.dumps(sysinfo))
def _read_sysinfo(self, fname):
sysinfo = {}
content = self.read_file_from_tmpdir(fname)
if content:
sysinfo = json.loads(content)
return sysinfo
def set_pre_sysinfo(self):
self._write_sysinfo('pre_sysinfo')
def get_pre_sysinfo(self):
return self._read_sysinfo('pre_sysinfo')
def set_post_sysinfo(self):
self._write_sysinfo('post_sysinfo')
def get_post_sysinfo(self):
return self._read_sysinfo('post_sysinfo')
def get_sysinfo(self):
sinfo = {
'pre': self.get_pre_sysinfo(),
'post': self.get_post_sysinfo()
}
return sinfo
def assertFileExists(self, fname):
"""Asserts that fname exists on the filesystem"""
assert os.path.exists(fname), "%s does not exist" % fname
def assertFileNotExists(self, fname):
"""Asserts that fname does not exist on the filesystem"""
assert not os.path.exists(fname), "%s exists" % fname
class BaseSoSReportTest(BaseSoSTest):
"""This is the class to use for building sos report tests with.
An instance of this test is expected to set at minimum a ``sos_cmd`` class
attribute that represets the options handed to a specific execution of an
sos command. This should be anything following ``sos report --batch``.
"""
archive = None
_manifest = None
@property
def manifest(self):
if self._manifest is None:
try:
content = self.read_file_from_tmpdir(self.get_name_in_archive('sos_reports/manifest.json'))
self._manifest = json.loads(content)
except Exception:
self._manifest = ''
self.warn('Could not load manifest for test')
return self._manifest
def _extract_archive(self, arc_path):
"""Extract an archive to the temp directory
"""
_extract_path = self._get_extracted_tarball_path()
try:
archive.extract(arc_path, _extract_path)
self.archive_path = self._get_archive_path()
except Exception as err:
self.cancel("Could not extract archive: %s" % err)
def _get_extracted_tarball_path(self):
"""Based on the klass id setup earlier, provide a name to extract the
archive to within the tmpdir
"""
return os.path.join(self.tmpdir, "sosreport-%s" % self.__class__.__name__)
def _execute_sos_cmd(self):
"""Run the sos command for this test case, and extract it
"""
_cmd = '%s report --batch --tmp-dir %s %s'
exec_cmd = _cmd % (SOS_BIN, self.tmpdir, self.sos_cmd)
self.cmd_output = process.run(exec_cmd, timeout=300)
with open(os.path.join(self.tmpdir, 'output'), 'wb') as pfile:
pickle.dump(self.cmd_output, pfile)
self.cmd_output.stdout = self.cmd_output.stdout.decode()
self.cmd_output.stderr = self.cmd_output.stderr.decode()
self.archive = re.findall('/.*sosreport-.*tar.*', self.cmd_output.stdout)[-1]
if self.archive:
self._extract_archive(self.archive)
def _setup_tmpdir(self):
if not os.path.isdir(self.tmpdir):
os.mkdir(self.tmpdir)
def _get_archive_path(self):
return glob.glob(self._get_extracted_tarball_path() + '/sosreport*')[0]
def setup_mocking(self):
"""Since we need to use setUp() in our overrides of avocado.Test,
provide an alternate method for test cases that subclass BaseSoSTest
to use.
"""
pass
def setUp(self):
"""Execute and extract the sos report to our temporary location, then
call sos_setup() for individual test case setup and/or mocking.
"""
# check to prevent multiple setUp() runs
if not os.path.isdir(self.tmpdir):
# setup our class-shared tmpdir
self._setup_tmpdir()
# do our mocking called for in sos_setup
self.setup_mocking()
# gather some pre-execution information
self.set_pre_sysinfo()
# run the sos command for this test case
self._execute_sos_cmd()
self.set_post_sysinfo()
else:
with open(os.path.join(self.tmpdir, 'output'), 'rb') as pfile:
self.cmd_output = pickle.load(pfile)
if isinstance(self.cmd_output.stdout, bytes):
self.cmd_output.stdout = self.cmd_output.stdout.decode()
self.cmd_output.stderr = self.cmd_output.stderr.decode()
for f in os.listdir(self.tmpdir):
if fnmatch(f, 'sosreport*.tar.??'):
self.archive = os.path.join(self.tmpdir, f)
break
self.sysinfo = self.get_sysinfo()
self.archive_path = self._get_archive_path()
def get_name_in_archive(self, fname):
"""Get the full path to fname as it (would) exist in the archive
"""
return os.path.join(self.archive_path, fname.lstrip('/'))
def get_file_content(self, fname):
"""Reads the content of fname from within the archive and returns it
:param fname: The name of the file
:type fname: ``str``
:returns: Content of fname
:rtype: ``str``
"""
content = ''
with open(self.get_name_in_archive(fname), 'r') as gfile:
content = gfile.read()
return content
def assertFileCollected(self, fname):
"""Ensure that a given fname is in the extracted archive if it exists
on the host system
:param fname: The name of the file within the archive
:type fname: ``str``
"""
if os.path.exists(fname):
self.assertFileExists(self.get_name_in_archive(fname))
assert True
def assertFileNotCollected(self, fname):
"""Ensure that a given fname is NOT in the extracted archive
:param fname: The name of the file within the archive
:type fname: ``str``
"""
self.assertFileNotExists(self.get_name_in_archive(fname))
def assertFileGlobInArchive(self, fname):
"""Ensure that at least one file in the archive matches a given fname
glob
:param fname: The glob to match filenames of
:type fname: ``str``
"""
files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/')))
assert files, "No files matching %s found" % fname
def assertFileGlobNotInArchive(self, fname):
"""Ensure that there are NO files in the archive matching a given fname
glob
:param fname: The glob to match filename(s) of
:type fname: ``str``
"""
files = glob.glob(os.path.join(self.tmpdir, fname.lstrip('/')))
self.log.debug(files)
assert not files, "Found files in archive matching %s: %s" % (fname, files)
def assertFileHasContent(self, fname, content):
"""Ensure that the given file fname contains the given content
:param fname: The name of the file
:type fname: ``str``
:param content: The content to match
:type content: ``str``
"""
matched = False
fname = self.get_name_in_archive(fname)
self.assertFileExists(fname)
with open(fname, 'r') as lfile:
_contents = lfile.read()
for line in _contents.splitlines():
if re.match(".*%s.*" % content, line, re.I):
matched = True
break
assert matched, "Content '%s' does not appear in %s\n%s" % (content, fname, _contents)
def assertFileNotHasContent(self, fname, content):
"""Ensure that the file file fname does NOT contain the given content
:param fname: The name of the file
:type fname: ``str``
:param content: The content to (not) match
:type content: ``str``
"""
matched = False
fname = self.get_name_in_archive(fname)
with open(fname, 'r') as mfile:
for line in mfile.read().splitlines():
if re.match(".*%s.*" % content, line, re.I):
matched = True
break
assert not matched, "Content '%s' appears in file %s" % (content, fname)
def assertSosLogContains(self, content):
"""Ensure that the given content string exists in sos.log
"""
self.assertFileHasContent('sos_logs/sos.log', content)
def assertSosLogNotContains(self, content):
"""Ensure that the given content string does NOT exist in sos.log
"""
self.assertFileNotHasContent('sos_logs/sos.log', content)
def assertOutputContains(self, content):
"""Ensure that stdout did contain the given content string
:param content: The string that should not be in stdout
:type content: ``str``
"""
assert content in self.cmd_output.stdout, 'Content string not in output'
def assertOutputNotContains(self, content):
"""Ensure that stdout did NOT contain the given content string
:param content: The string that should not be in stdout
:type content: ``str``
"""
assert not re.match(".*%s.*" % content, self.cmd_output.stdout), "String '%s' present in stdout" % content
def assertPluginIncluded(self, plugin):
"""Ensure that the specified plugin did run for the sos execution
Note that this relies on manifest.json being successfully created
:param plugin: The name of the plugin
:type plugin: `` str``
"""
if not self.manifest:
self.error("No manifest found, cannot check for %s execution" % plugin)
assert plugin in self.manifest['components']['report']['plugins'].keys(), 'Plugin not recorded in manifest'
def assertPluginNotIncluded(self, plugin):
"""Ensure that the specified plugin did NOT run for the sos execution
Note that this relies on manifest.json being successfully created
:param plugin: The name of the plugin
:type plugin: `` str``
"""
if not self.manifest:
self.error("No manifest found, cannot check for %s execution" % plugin)
assert plugin not in self.manifest['components']['report']['plugins'].keys(), 'Plugin is recorded in manifest'
def assertOnlyPluginsIncluded(self, plugins):
"""Ensure that only the specified plugins are in the manifest
:param plugins: The plugin names
:type plugins: ``str`` or ``list`` of strings
"""
if not self.manifest:
self.error("No manifest found, cannot check for %s execution" % plugins)
if isinstance(plugins, str):
plugins = [plugins]
_executed = self.manifest['components']['report']['plugins'].keys()
# test that all requested plugins did run
for i in plugins:
assert i in _executed, "Requested plugin '%s' did not run" % i
# test that no unrequested plugins ran
for j in _executed:
assert j in plugins, "Unrequested plugin '%s' ran as well" % j
class StageOneReportTest(BaseSoSReportTest):
"""This is the test class to subclass for all Stage One (no mocking) tests
within the sos test suite.
In addition to any test_* methods defined in the test cases that subclass
this, the methods defined here will ALSO run, to ensure basic consistency
across test cases
NOTE: You MUST replace the following line in the docstring of your own
test cases, as otherwise the test will be disabled. This line is here to
prevent this base class from being treated as a valid test case. Also, if
you add any tests to this base class, make sure to add a line such as
':avocado: tags=stageone' to ensure the base tests run with new test cases
:avocado: disable
:avocado: tags=stageone
"""
sos_cmd = ''
def test_archive_created(self):
"""Ensure that the archive tarball was created and has the right owner
:avocado: tags=stageone
"""
self.assertFileExists(self.archive)
self.assertTrue(os.stat(self.archive).st_uid == 0)
def test_no_new_kmods_loaded(self):
"""Ensure that no additional kernel modules have been loaded during an
execution of a test
:avocado: tags=stageone
"""
self.assertCountEqual(self.sysinfo['pre']['modules'],
self.sysinfo['post']['modules'])
def test_archive_has_sos_dirs(self):
"""Ensure that we have the expected directory layout with in the
archive
:avocado: tags=stageone
"""
self.assertFileCollected('sos_commands')
self.assertFileCollected('sos_logs')
def test_manifest_created(self):
"""
:avocado: tags=stageone
"""
self.assertFileCollected('sos_reports/manifest.json')
@skipIf(lambda x: '--no-report' in x.sos_cmd, '--no-report used in command')
def test_html_reports_created(self):
"""
:avocado: tags=stageone
"""
self.assertFileCollected('sos_reports/sos.html')
def test_no_exceptions_during_execution(self):
"""
:avocado: tags=stageone
"""
self.assertSosLogNotContains('caught exception in plugin')
self.assertFileGlobNotInArchive('sos_logs/*-plugin-errors.txt')
def test_no_ip_changes(self):
"""
:avocado: tags=stageone
"""
# I.E. make sure we didn't cause any NIC flaps that for some reason
# resulted in a new primary IP address. TODO: build this out to make
# sure this IP is still bound to the same NIC
self.assertEqual(self.sysinfo['pre']['networking']['ip_addr'],
self.sysinfo['post']['networking']['ip_addr'])
|