aboutsummaryrefslogtreecommitdiffstats
path: root/tests/sos_tests.py
blob: a178ce4a0256e19ad1c02e5eaf63839b8bc6600f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.


from avocado.core.exceptions import TestSkipError
from avocado.core.output import LOG_UI
from avocado import Test
from avocado.utils import archive, process, distro, software_manager
from fnmatch import fnmatch

import glob
import json
import os
import pickle
import shutil
import socket
import re

# Directory containing this file, with any symlinks resolved
SOS_TEST_DIR = os.path.dirname(os.path.realpath(__file__))
# Parent directory of SOS_TEST_DIR
SOS_REPO_ROOT = os.path.realpath(os.path.join(SOS_TEST_DIR, '../'))
# Location of the report plugins, relative to SOS_REPO_ROOT
SOS_PLUGIN_DIR = os.path.realpath(os.path.join(SOS_REPO_ROOT, 'sos/report/plugins'))
# Directory named 'test_data' that sits next to this file
SOS_TEST_DATA_DIR = os.path.realpath(os.path.join(SOS_TEST_DIR, 'test_data'))
# The 'bin/sos' entry point found one level above SOS_TEST_DIR
SOS_TEST_BIN = os.path.realpath(os.path.join(SOS_TEST_DIR, '../bin/sos'))

# Names compared against ``distro.detect().name`` to decide whether the
# Red Hat-only or Ubuntu/Debian-only tests may run
RH_DIST = ['rhel', 'centos', 'fedora']
UBUNTU_DIST = ['Ubuntu', 'debian']

def skipIf(cond, message=None):
    """Decorator factory that skips a test method when ``cond`` holds.

    :param cond:  Either a plain value that is checked for truthiness, or a
                  callable that is handed the test instance and returns a
                  truthy value when the test should be skipped
    :type cond:  ``bool`` or ``callable``

    :param message:  The reason that is attached to the skip
    :type message:  ``str`` or ``None``

    :returns: A decorator to apply to a test method
    :raises: ``TestSkipError`` from the decorated method when ``cond`` holds
    """
    # imported here so that this helper is self-contained
    from functools import wraps

    def decorator(function):
        @wraps(function)
        def wrapper(self, *args, **kwargs):
            should_skip = cond(self) if callable(cond) else cond
            if should_skip:
                raise TestSkipError(message)
            # the skip condition did not hold, so the decorated test must
            # actually be executed - otherwise it would pass vacuously
            return function(self, *args, **kwargs)
        return wrapper
    return decorator

def redhat_only(tst):
    """Decorator that restricts a test method to the distributions listed
    in ``RH_DIST``.

    :param tst:  The test method being decorated
    :type tst:  ``callable``

    :returns: A wrapper that runs ``tst`` only on a matching distribution
    :raises: ``TestSkipError`` from the wrapper on any other distribution
    """
    # imported here so that this helper is self-contained
    from functools import wraps

    @wraps(tst)
    def wrapper(self, *args, **kwargs):
        if distro.detect().name not in RH_DIST:
            raise TestSkipError('Not running on a Red Hat distro')
        # we are on a supported distro, so run the decorated test rather
        # than silently dropping its body
        return tst(self, *args, **kwargs)
    return wrapper

def ubuntu_only(tst):
    """Decorator that restricts a test method to the distributions listed
    in ``UBUNTU_DIST``.

    :param tst:  The test method being decorated
    :type tst:  ``callable``

    :returns: A wrapper that runs ``tst`` only on a matching distribution
    :raises: ``TestSkipError`` from the wrapper on any other distribution
    """
    # imported here so that this helper is self-contained
    from functools import wraps

    @wraps(tst)
    def wrapper(self, *args, **kwargs):
        if distro.detect().name not in UBUNTU_DIST:
            raise TestSkipError('Not running on a Ubuntu or Debian distro')
        # we are on a supported distro, so run the decorated test rather
        # than silently dropping its body
        return tst(self, *args, **kwargs)
    return wrapper

class BaseSoSTest(Test):
    """Base class for all our test classes to build off of.

    Subclasses avocado.Test and then adds wrappers and helper methods that are
    needed across sos components. Component specific test classes should in
    turn subclass ``BaseSoSTest`` rather than ``avocado.Test`` directly
    """

    # lazily computed caches backing the klass_name and tmpdir properties
    _klass_name = None
    _tmpdir = None
    # when True, a failing sos command is recorded instead of raised
    _exception_expected = False
    # an sos binary found on PATH, falling back to the one in the checkout
    _local_sos_bin = shutil.which('sos') or SOS_TEST_BIN
    # the options handed to the sos command under test
    sos_cmd = ''
    # timeout handed to process.run() for the sos command
    sos_timeout = 600
    # restrict the whole test case to one distro family
    redhat_only = False
    ubuntu_only = False
    # set by the last test of a test case to trigger cleanup in tearDown()
    end_of_test_case = False

    @property
    def klass_name(self):
        """The name of this file joined with the name of the test class"""
        if not self._klass_name:
            self._klass_name = os.path.basename(__file__) + '.' + self.__class__.__name__
        return self._klass_name

    @property
    def tmpdir(self):
        """The directory shared by every test_* method of this test class"""
        if not self._tmpdir:
            # NOTE(review): this is a plain string concatenation rather than
            # os.path.join(), so the result depends on whether the variable
            # ends with a path separator - confirm this is intended
            self._tmpdir = os.getenv('AVOCADO_TESTS_COMMON_TMPDIR') + self.klass_name
        return self._tmpdir

    @property
    def sos_bin(self):
        """The sos binary to execute, depending on the TESTLOCAL parameter"""
        return self._local_sos_bin if self.params.get('TESTLOCAL') == 'true' else SOS_TEST_BIN

    def generate_sysinfo(self):
        """Collects some basic information about the system for later reference
        in individual tests

        :returns: The loaded kernel modules and the hostname/IP of the system
        :rtype: ``dict``
        """
        sysinfo = {}

        # get kernel modules, skipping the header line printed by lsmod
        _out = process.run('lsmod').stdout.decode()
        mods = [
            line.split()[0] for line in _out.splitlines()[1:] if line.strip()
        ]
        # this particular kmod is both innocuous and unpredictable in terms of
        # pre-loading even within the same distribution. For now, turn a blind
        # eye to it with regards to the "no new kmods loaded" perspective
        if 'binfmt_misc' in mods:
            mods.remove('binfmt_misc')
        sysinfo['modules'] = sorted(mods, key=str.lower)

        # get networking info
        hostname = socket.gethostname()
        sysinfo['networking'] = {
            'hostname': hostname,
            'ip_addr': socket.gethostbyname(hostname)
        }

        return sysinfo

    def _generate_sos_command(self):
        """Based on the specific test class that is subclassing BaseSoSTest,
        perform whatever logic is necessary to create the sos command that will
        be executed
        """
        raise NotImplementedError

    def _execute_sos_cmd(self):
        """Run the sos command for this test case, and save its result in the
        tmpdir so that later test_* methods can load it instead of re-running
        the command

        :raises: ``Exception`` if the command fails or times out and a failure
                 is not expected by the test case
        """
        exec_cmd = self._generate_sos_command()
        try:
            self.cmd_output = process.run(exec_cmd, timeout=self.sos_timeout,
                                          env={'SOS_TEST_LOGS': 'keep'})
        except Exception as err:
            if not hasattr(err, 'result'):
                # can't inspect the exception raised, just bail out
                raise
            if self._exception_expected:
                self.cmd_output = err.result
            else:
                msg = err.result.stderr.decode() or err.result.stdout.decode()
                # a little hacky, but using self.log methods here will not
                # print to console unless we ratchet up the verbosity for the
                # entire test suite, which will become very difficult to read
                LOG_UI.error('ERROR:\n' + msg[:8196])  # don't flood w/ super verbose logs
                if err.result.interrupted:
                    raise Exception("Timeout exceeded, see output above")
                # report the command without the leading path to the binary,
                # falling back to the whole command if there is no 'bin/'
                raise Exception("Command failed, see output above: '%s'"
                                % err.command.split('bin/', 1)[-1])
        # pickle the result before decoding, setUp() handles both forms
        with open(os.path.join(self.tmpdir, 'output'), 'wb') as pfile:
            pickle.dump(self.cmd_output, pfile)
        self.cmd_output.stdout = self.cmd_output.stdout.decode()
        self.cmd_output.stderr = self.cmd_output.stderr.decode()

    def _setup_tmpdir(self):
        """Create the tmpdir for this test class if it does not exist yet"""
        if not os.path.isdir(self.tmpdir):
            os.mkdir(self.tmpdir)

    def _write_file_to_tmpdir(self, fname, content):
        """Write the given content to fname within the test's tmpdir

        :param fname:  The name of the file, relative to the tmpdir
        :type fname:  ``str``

        :param content:  The content to write, bytes are decoded first
        :type content:  ``str`` or ``bytes``
        """
        fname = os.path.join(self.tmpdir, fname)
        if isinstance(content, bytes):
            content = content.decode()
        with open(fname, 'w') as wfile:
            wfile.write(content)

    def read_file_from_tmpdir(self, fname):
        """Read the content of fname from within the test's tmpdir

        :param fname:  The name of the file, relative to the tmpdir
        :type fname:  ``str``

        :returns: The content of the file, or an empty string if it could not
                  be read
        :rtype: ``str``
        """
        fname = os.path.join(self.tmpdir, fname)
        try:
            with open(fname, 'r') as tfile:
                return tfile.read()
        except Exception:
            # deliberately best-effort, callers treat '' as "nothing there"
            return ''

    def _write_sysinfo(self, fname):
        """Get the current state of sysinfo and write it into our shared
        tempdir so it can be loaded in setUp() later

        :param fname:  The name of the file to be written in the tempdir
        :type fname: ``str``
        """
        sysinfo = self.generate_sysinfo()
        self._write_file_to_tmpdir(fname, json.dumps(sysinfo))

    def _read_sysinfo(self, fname):
        """Load sysinfo previously saved in the tempdir

        :param fname:  The name of the file to read from the tempdir
        :type fname: ``str``

        :returns: The saved sysinfo, or an empty dict if nothing was read
        :rtype: ``dict``
        """
        content = self.read_file_from_tmpdir(fname)
        if content:
            return json.loads(content)
        return {}

    def set_pre_sysinfo(self):
        """Save the state of the system before the sos command is run"""
        self._write_sysinfo('pre_sysinfo')

    def get_pre_sysinfo(self):
        """Load the state of the system from before the sos command was run"""
        return self._read_sysinfo('pre_sysinfo')

    def set_post_sysinfo(self):
        """Save the state of the system after the sos command has run"""
        self._write_sysinfo('post_sysinfo')

    def get_post_sysinfo(self):
        """Load the state of the system from after the sos command was run"""
        return self._read_sysinfo('post_sysinfo')

    def get_sysinfo(self):
        """Get both saved system states, keyed by 'pre' and 'post'"""
        return {
            'pre': self.get_pre_sysinfo(),
            'post': self.get_post_sysinfo()
        }

    def check_distro_for_enablement(self):
        """Check if the test case is meant only for a specific distro family,
        and if it is and we are not running on that family, skip all the tests
        for that test case.

        This allows us to define distro-specific test classes much the same way
        we can define distro-specific tests _within_ a test class using the
        appropriate decorators. We can't use the decorators for the class however
        due to how avocado catches instantiation exceptions, so instead we need
        to raise the skip exception after instantiation is done.
        """
        if self.redhat_only:
            if self.local_distro not in RH_DIST:
                raise TestSkipError('Not running on a Red Hat distro')
        elif self.ubuntu_only:
            if self.local_distro not in UBUNTU_DIST:
                raise TestSkipError("Not running on a Ubuntu or Debian distro")

    def setUp(self):
        """Setup the tmpdir and any needed mocking for the test, then execute
        the defined sos command. Ensure that we only run the sos command once
        for every test case, instead of once for every test_* method defined.
        """
        self.local_distro = distro.detect().name
        self.check_distro_for_enablement()
        # check to prevent multiple setUp() runs
        if not os.path.isdir(self.tmpdir):
            # setup our class-shared tmpdir
            self._setup_tmpdir()

            # do mocking called for in stage 2+ tests
            self.setup_mocking()

            # do any pre-execution setup
            self.pre_sos_setup()

            # gather some pre-execution information
            self.set_pre_sysinfo()

            # run the sos command for this test case
            self._execute_sos_cmd()
            self.set_post_sysinfo()
        else:
            # the command already ran for an earlier test_* method, so load
            # the result that was saved by _execute_sos_cmd()
            with open(os.path.join(self.tmpdir, 'output'), 'rb') as pfile:
                self.cmd_output = pickle.load(pfile)
            if isinstance(self.cmd_output.stdout, bytes):
                self.cmd_output.stdout = self.cmd_output.stdout.decode()
                self.cmd_output.stderr = self.cmd_output.stderr.decode()
            for f in os.listdir(self.tmpdir):
                if fnmatch(f, '*sosreport*.tar.??'):
                    self.archive = os.path.join(self.tmpdir, f)
                    break
        self.sysinfo = self.get_sysinfo()

    def tearDown(self):
        """If a test being run is the last one defined for a test case, then
        we should remove the extracted tarball directory so that we can upload
        a reasonably sized artifact to the GCE storage bucket if any tests
        fail during the test execution.

        The use of `end_of_test_case` is a bit wonky because we don't have a
        different/more reliable way to identify that a test class has completed
        all tests - mainly because avocado re-initializes the entire class
        for each `test_*` method defined within it.
        """
        if not self.end_of_test_case:
            return
        self.post_test_tear_down()
        # neither attribute is defined by this class, and either may be unset
        # or None if no archive was produced, so look them up defensively
        tarball = getattr(self, 'archive', None)
        extracted = getattr(self, 'archive_path', None)
        # remove the extracted directory only if we have the tarball
        if tarball and os.path.exists(tarball):
            if extracted and os.path.exists(extracted):
                shutil.rmtree(extracted)

    def post_test_tear_down(self):
        """Called at the end of a test run to ensure that any needed per-test
        cleanup can be done.
        """
        pass

    def setup_mocking(self):
        """Since we need to use setUp() in our overrides of avocado.Test,
        provide an alternate method for test cases that subclass BaseSoSTest
        to use.
        """
        pass

    def pre_sos_setup(self):
        """Do any needed non-mocking setup prior to the sos execution that is
        called in setUp()
        """
        pass

    def assertFileExists(self, fname):
        """Asserts that fname exists on the filesystem"""
        assert os.path.exists(fname), "%s does not exist" % fname

    def assertFileNotExists(self, fname):
        """Asserts that fname does not exist on the filesystem"""
        assert not os.path.exists(fname), "%s exists" % fname

    def assertOutputContains(self, content):
        """Ensure that stdout or stderr did contain the given content string

        :param content:  The string that should be in the output, which is
                         used as a regular expression
        :type content:  ``str``
        """
        output = self.cmd_output.stdout + self.cmd_output.stderr
        # an unanchored search already matches anywhere in the output
        found = re.search(content, output)
        assert found, "Content string '%s' not in output" % content

    def assertOutputNotContains(self, content):
        """Ensure that stdout and stderr did NOT contain the given content
        string

        :param content:  The string that should not be in the output, which
                         is used as a regular expression
        :type content:  ``str``
        """
        output = self.cmd_output.stdout + self.cmd_output.stderr
        # an unanchored search already matches anywhere in the output
        found = re.search(content, output)
        assert not found, "String '%s' present in stdout" % content


class BaseSoSReportTest(BaseSoSTest):
    """This is the class to use for building sos report tests with.

    An instance of this test is expected to set at minimum a ``sos_cmd`` class
    attribute that represents the options handed to a specific execution of an
    sos command. This should be anything following ``sos report --batch``.

    """

    # path of the tarball reported by the sos command, once it has run
    archive = None
    # lazily loaded cache backing the manifest property
    _manifest = None
    _exception_expected = False
    # passphrase used to decrypt the archive when --encrypt is in sos_cmd
    encrypt_pass = None
    # the sos subcommand that is executed
    sos_component = 'report'

    @property
    def manifest(self):
        """The parsed manifest.json from the archive, or an empty string if
        it could not be loaded
        """
        if self._manifest is None:
            try:
                content = self.read_file_from_tmpdir(self.get_name_in_archive('sos_reports/manifest.json'))
                self._manifest = json.loads(content)
            except Exception:
                self._manifest = ''
                self.log.warning('Could not load manifest for test')
        return self._manifest

    @property
    def encrypted_path(self):
        """The path of the encrypted archive, see get_encrypted_path()"""
        return self.get_encrypted_path()

    def _decrypt_archive(self, archive):
        """Decrypt the given archive with gpg using ``encrypt_pass``

        :param archive:  The path of the encrypted archive
        :type archive:  ``str``

        :returns: The path of the decrypted archive
        :rtype: ``str``
        """
        # drop only the '.gpg' suffix. str.strip() would treat '.gpg' as a
        # set of characters to remove from both ends of the path
        suffix = '.gpg'
        if archive.endswith(suffix):
            _archive = archive[:-len(suffix)]
        else:
            _archive = archive
        cmd = ("gpg --batch --passphrase %s -o %s --decrypt %s"
               % (self.encrypt_pass, _archive, archive))
        try:
            process.run(cmd, timeout=10)
        except Exception as err:
            result = getattr(err, 'result', None)
            if result is None:
                # can't inspect the exception raised, just bail out
                raise
            if result.interrupted:
                self.error("Timeout while decrypting")
            if 'Bad session key' in result.stderr.decode():
                self.fail("Decryption with well-known passphrase failed")
            raise
        return _archive

    def grep_for_content(self, search):
        """Call out to grep for finding a specific string 'search' in any place
        in the archive

        :param search:  The string to search for
        :type search:  ``str``

        :returns: ``False`` if there are no matches, otherwise the matching
                  files with the tmpdir prefix removed
        :rtype: ``bool`` or ``list``
        """
        cmd = "grep -ril '%s' %s" % (search, self.archive_path)
        try:
            out = process.run(cmd)
            rc = out.exit_status
        except process.CmdError as err:
            out = err.result
            rc = err.result.exit_status

        if rc == 1:
            # grep will return an exit code of 1 if no matches are found,
            # which is what we want
            return False
        return [
            ln.split(self.tmpdir)[-1]
            for ln in out.stdout.decode('utf-8').splitlines()
        ]

    def get_encrypted_path(self):
        """Since avocado re-instantiates a new object for every test_ method,
        we need to be able to retrieve the original path for the encrypted
        archive and cannot rely on it being set by the _extract_archive()
        override

        :returns: The last encrypted archive path found in stdout, or ``None``
                  if there is none
        """
        try:
            return re.findall(r'/.*sosreport-.*tar.*\.gpg', self.cmd_output.stdout)[-1]
        except Exception:
            # no encrypted archive was reported, or there is no usable output
            return None

    def _extract_archive(self, arc_path):
        """Extract an archive to the temp directory, decrypting it first if
        the sos command used --encrypt

        :param arc_path:  The path of the archive to extract
        :type arc_path:  ``str``
        """
        if '--encrypt' in self.sos_cmd:
            arc_path = self._decrypt_archive(arc_path)
        _extract_path = self._get_extracted_tarball_path()
        try:
            archive.extract(arc_path, _extract_path)
            self.archive_path = self._get_archive_path()
        except Exception as err:
            self.cancel("Could not extract archive: %s" % err)

    def _get_extracted_tarball_path(self):
        """Based on the klass id setup earlier, provide a name to extract the
        archive to within the tmpdir
        """
        return os.path.join(self.tmpdir, "sosreport-%s" % self.__class__.__name__)

    def _generate_sos_command(self):
        """Build the full command line from the binary, the component, the
        tmpdir and the test case's ``sos_cmd``
        """
        return "%s %s -v --batch --tmp-dir %s %s" % (self.sos_bin, self.sos_component, self.tmpdir, self.sos_cmd)

    def _execute_sos_cmd(self):
        """Run the sos command, then locate and extract the archive that it
        reported in stdout
        """
        super(BaseSoSReportTest, self)._execute_sos_cmd()
        self.archive = re.findall('/.*sosreport-.*tar.*', self.cmd_output.stdout)
        if self.archive:
            # the last reported path is the one that gets extracted
            self.archive = self.archive[-1]
            self._extract_archive(self.archive)

    def _get_archive_path(self):
        """Get the path of the extracted archive's top level directory

        :returns: The first matching directory, or ``None`` if nothing has
                  been extracted
        """
        path = glob.glob(self._get_extracted_tarball_path() + '/sosreport*')
        if path:
            return path[0]
        return None

    def setUp(self):
        """Run the base setup, then locate the extracted archive"""
        super(BaseSoSReportTest, self).setUp()
        self.archive_path = self._get_archive_path()

    def get_name_in_archive(self, fname):
        """Get the full path to fname as it (would) exist in the archive

        :param fname:  The name of the file
        :type fname:  ``str``
        """
        return os.path.join(self.archive_path, fname.lstrip('/'))

    def get_file_content(self, fname):
        """Reads the content of fname from within the archive and returns it

        :param fname:  The name of the file
        :type fname:  ``str``

        :returns: Content of fname
        :rtype: ``str``
        """
        with open(self.get_name_in_archive(fname), 'r') as gfile:
            return gfile.read()

    def assertFileCollected(self, fname):
        """Ensure that a given fname is in the extracted archive if it exists
        on the host system

        :param fname:  The name of the file within the archive
        :type fname:  ``str``
        """
        # files created by sos itself are always checked, anything else can
        # only have been collected if it exists on the host
        if fname.startswith(('sos_', '/sos_')) or os.path.exists(fname):
            self.assertFileExists(self.get_name_in_archive(fname))

    def assertFileNotCollected(self, fname):
        """Ensure that a given fname is NOT in the extracted archive

        :param fname:  The name of the file within the archive
        :type fname:  ``str``
        """
        self.assertFileNotExists(self.get_name_in_archive(fname))

    def assertFileGlobInArchive(self, fname):
        """Ensure that at least one file in the archive matches a given fname
        glob, iff it exists on the host system

        :param fname:  The glob to match filenames of
        :type fname:  ``str``
        """
        if not fname.startswith(('sos_', '/sos_')) and not glob.glob(fname):
            # the test passes since the file glob could not have been
            # collected
            return
        files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/')))
        assert files, "No files matching %s found" % fname

    def assertFileGlobNotInArchive(self, fname):
        """Ensure that there are NO files in the archive matching a given fname
        glob

        :param fname:  The glob to match filename(s) of
        :type fname:  ``str``
        """
        # look inside the extracted archive, the same as
        # assertFileGlobInArchive(), rather than at the top of the tmpdir
        files = glob.glob(os.path.join(self.archive_path, fname.lstrip('/')))
        self.log.debug(files)
        assert not files, "Found files in archive matching %s: %s" % (fname, files)

    def assertFileHasContent(self, fname, content):
        """Ensure that the given file fname contains the given content

        :param fname:  The name of the file
        :type fname:  ``str``

        :param content:  The content to match, used as a case-insensitive
                         regular expression against each line
        :type content: ``str``
        """
        fname = self.get_name_in_archive(fname)
        self.assertFileExists(fname)
        with open(fname, 'r') as lfile:
            _contents = lfile.read()
        # build the pattern once rather than for every line
        pattern = re.compile(".*%s.*" % content, re.I)
        matched = any(pattern.match(line) for line in _contents.splitlines())
        assert matched, "Content '%s' does not appear in %s\n%s" % (content, fname, _contents)

    def assertFileNotHasContent(self, fname, content):
        """Ensure that the file fname does NOT contain the given content

        :param fname:  The name of the file
        :type fname:  ``str``

        :param content:  The content to (not) match, used as a
                         case-insensitive regular expression against each line
        :type content:   ``str``
        """
        fname = self.get_name_in_archive(fname)
        with open(fname, 'r') as mfile:
            _contents = mfile.read()
        # build the pattern once rather than for every line
        pattern = re.compile(".*%s.*" % content, re.I)
        matched = any(pattern.match(line) for line in _contents.splitlines())
        assert not matched, "Content '%s' appears in file %s" % (content, fname)

    def assertSosLogContains(self, content):
        """Ensure that the given content string exists in sos.log
        """
        self.assertFileHasContent('sos_logs/sos.log', content)

    def assertSosLogNotContains(self, content):
        """Ensure that the given content string does NOT exist in sos.log
        """
        self.assertFileNotHasContent('sos_logs/sos.log', content)

    def assertSosUILogContains(self, content):
        """Ensure that the given content string exists in ui.log
        """
        self.assertFileHasContent('sos_logs/ui.log', content)

    def assertSosUILogNotContains(self, content):
        """Ensure that the given content string does NOT exist in ui.log
        """
        self.assertFileNotHasContent('sos_logs/ui.log', content)

    def assertPluginIncluded(self, plugin):
        """Ensure that the specified plugin did run for the sos execution

        Note that this relies on manifest.json being successfully created

        :param plugin:  The name of the plugin, or several of them
        :type plugin:  ``str`` or ``list`` of strings
        """
        if not self.manifest:
            self.error("No manifest found, cannot check for %s execution" % plugin)
        if isinstance(plugin, str):
            plugin = [plugin]
        _executed = self.manifest['components']['report']['plugins']
        for plug in plugin:
            assert plug in _executed, "Plugin '%s' not recorded in manifest" % plug

    def assertPluginNotIncluded(self, plugin):
        """Ensure that the specified plugin did NOT run for the sos execution

        Note that this relies on manifest.json being successfully created

        :param plugin:  The name of the plugin, or several of them
        :type plugin:  ``str`` or ``list`` of strings
        """
        if not self.manifest:
            self.error("No manifest found, cannot check for %s execution" % plugin)
        if isinstance(plugin, str):
            plugin = [plugin]
        _executed = self.manifest['components']['report']['plugins']
        for plug in plugin:
            assert plug not in _executed, "Plugin '%s' is recorded in manifest" % plug

    def assertOnlyPluginsIncluded(self, plugins):
        """Ensure that only the specified plugins are in the manifest

        :param plugins:  The plugin names
        :type plugins:  ``str`` or ``list`` of strings
        """
        if not self.manifest:
            self.error("No manifest found, cannot check for %s execution" % plugins)
        if isinstance(plugins, str):
            plugins = [plugins]
        _executed = self.manifest['components']['report']['plugins']

        # test that all requested plugins did run
        for i in plugins:
            assert i in _executed, "Requested plugin '%s' did not run" % i

        # test that no unrequested plugins ran
        for j in _executed:
            assert j in plugins, "Unrequested plugin '%s' ran as well" % j

    def get_plugin_manifest(self, plugin):
        """Get the manifest data for the specified plugin

        :param plugin:  The name of the plugin
        :type plugin:   ``str``

        :returns: The section of the manifest for the plugin
        :rtype:   ``dict``

        :raises: ``Exception`` if the plugin has no data in the manifest
        """
        _plugins = self.manifest['components']['report']['plugins']
        # use get() so that a plugin that is missing entirely is reported
        # with the same message as one with an empty section
        if not _plugins.get(plugin):
            raise Exception("Manifest for %s not present" % plugin)
        return _plugins[plugin]


class StageOneReportTest(BaseSoSReportTest):
    """This is the test class to subclass for all Stage One (no mocking) tests
    within the sos test suite.

    In addition to any test_* methods defined in the test cases that subclass
    this, the methods defined here will ALSO run, to ensure basic consistency
    across test cases

    NOTE: You MUST replace the following line in the docstring of your own
    test cases, as otherwise the test will be disabled. This line is here to
    prevent this base class from being treated as a valid test case. Also, if
    you add any tests to this base class, make sure to add a line such as
    ':avocado: tags=stageone' to ensure the base tests run with new test cases

    :avocado: disable
    :avocado: tags=stageone,foreman
    """

    sos_cmd = ''

    def test_archive_created(self):
        """Ensure that the archive tarball was created and has the right owner
        """
        self.assertFileExists(self.archive)
        # the archive is expected to be owned by uid 0
        self.assertEqual(os.stat(self.archive).st_uid, 0)

    def test_checksum_is_valid(self):
        """Ensure that a checksum was generated, reported, and is correct
        """
        _reported = re.findall('sha256\t.*\n', self.cmd_output.stdout)
        # check before indexing, so that a missing checksum is reported as
        # such rather than as an IndexError
        assert _reported, "No checksum reported"
        _chk = _reported[0].split('sha256\t')[1].strip()
        assert _chk, "No checksum reported"
        # for encrypted archives the encrypted file is the one to checksum
        _target = self.encrypted_path or self.archive
        _found = process.run("sha256sum %s" % _target).stdout.decode().split()[0]
        self.assertEqual(_chk, _found)

    def test_no_new_kmods_loaded(self):
        """Ensure that no additional kernel modules have been loaded during an
        execution of a test
        """
        self.assertCountEqual(self.sysinfo['pre']['modules'],
                              self.sysinfo['post']['modules'])

    def test_archive_has_sos_dirs(self):
        """Ensure that we have the expected directory layout with in the
        archive
        """
        self.assertFileCollected('sos_commands')
        self.assertFileCollected('sos_logs')

    def test_manifest_created(self):
        """Ensure that the manifest was written into the archive"""
        self.assertFileCollected('sos_reports/manifest.json')

    @skipIf(lambda x: '--no-report' in x.sos_cmd, '--no-report used in command')
    def test_html_reports_created(self):
        """Ensure that the html report was written into the archive"""
        self.assertFileCollected('sos_reports/sos.html')

    def test_no_ip_changes(self):
        """Ensure that the IP address of the host is the same before and
        after the sos command was run
        """
        # I.E. make sure we didn't cause any NIC flaps that for some reason
        # resulted in a new primary IP address. TODO: build this out to make
        # sure this IP is still bound to the same NIC
        self.assertEqual(self.sysinfo['pre']['networking']['ip_addr'],
                         self.sysinfo['post']['networking']['ip_addr'])

    def test_no_exceptions_during_execution(self):
        """Ensure that no plugin exceptions were logged or written out"""
        # flag this as the last test, so that tearDown() does the cleanup
        self.end_of_test_case = True
        self.assertSosLogNotContains('caught exception in plugin')
        self.assertFileGlobNotInArchive('sos_logs/*-plugin-errors.txt')


class StageTwoReportTest(BaseSoSReportTest):
    """This is the testing class to subclass when light mocking is needed to
    perform the test.

    Light mocking for our uses is restricted to dropping files in well-known
    locations, temporarily replacing binaries, and installing packages.

    Note: Stage 2 tests should NOT be run on any system that is considered
    either production, or is a workstation that cannot be easily re-imaged or
    re-deployed. While efforts are taken to ensure that systems are left in
    their original state after mocking tests are done, the assumption is that
    these tests are being run on "throw-away" test systems where it does not
    matter if that original state is indeed attained or not.

    This kind of mocking is described in the class attributes as follows for
    each test case that is a Stage 2 test:

    files   -   a list containing the files to drop on the test system's real
                filesystem. Mocked files should be placed in the same locations
                under tests/test_data. If list items are tuples, then the tuple
                elements are (source_path, dest_path), which will allow the
                project to store multiple versions of files in the tree without
                interfering with other tests

    packages -  a dict where the keys are the distribution names (e.g. 'rhel',
                'ubuntu') and the values are the package names optionally with
                version

    install_plugins - a list containing the names of test plugins to be dropped
                      inside the test repo for testing specific use cases.
                      The list values are strings that match the test plugin's
                      filename, and test plugins should be placed under
                      tests/test_data/fake_plugins

    :avocado: disable
    :avocado: tags=stagetwo,foreman2
    """

    sos_cmd = ''
    files = []
    packages = {}
    install_plugins = []
    # class-level fallback only: setUp() binds a fresh list on every instance
    # so that recorded paths are never shared between test cases
    _created_files = []

    def setUp(self):
        """Prepare the package declarations for this test, then hand off to
        the parent class' setUp().

        Package values given as a single string are wrapped into a list, and
        a declaration for either 'fedora' or 'rhel' is mirrored to the other,
        with 'centos' always following 'rhel'.
        """
        self.end_of_test_case = False
        # do not accumulate mocked paths in the list shared on the class
        self._created_files = []
        # seems awkward, but check_installed() and remove() are not exposed
        # together with install_distro_packages()
        self.installer = software_manager
        self.sm = self.installer.SoftwareManager()

        # build a per-instance copy rather than rewriting the dict that is
        # declared on the (sub)class
        _packages = {
            dist: [pkgs] if isinstance(pkgs, str) else pkgs
            for dist, pkgs in self.packages.items()
        }
        # allow for single declaration of packages for the RH family
        # for our purposes centos == rhel here
        if 'fedora' in _packages and 'rhel' not in _packages:
            _packages['rhel'] = _packages['fedora']
        elif 'rhel' in _packages and 'fedora' not in _packages:
            _packages['fedora'] = _packages['rhel']
        if 'rhel' in _packages:
            _packages['centos'] = _packages['rhel']
        self.packages = _packages

        super(StageTwoReportTest, self).setUp()

    def tearDown(self):
        """Undo the mocking and run the parent tearDown(), but only once a
        test method has flagged that the test case is finished via
        ``end_of_test_case``
        """
        if self.end_of_test_case:
            self.teardown_mocking()
            super(StageTwoReportTest, self).tearDown()

    def teardown_mocking(self):
        """Undo any and all mocked setup that we did for tests
        """
        self.teardown_mocked_packages()
        self.teardown_mocked_files()
        self.teardown_mocked_plugins()

    def setup_mocking(self):
        """Main entrypoint for setting up our mocking for the test"""
        self.setup_mocked_packages()
        self.setup_mocked_files()
        self.setup_mocked_plugins()

    def setup_mocked_plugins(self):
        """Drop any plugins specified from tests/test_data/fake_plugins into
        the test repo root (as created by CirrusCI).

        Requested plugins that do not exist under fake_plugins are skipped.
        The paths of the plugins that were copied are saved to our tmpdir so
        that they can be removed later.
        """
        _installed = []
        for plug in self.install_plugins:
            if not plug.endswith('.py'):
                plug += '.py'
            fake_plug = os.path.join(SOS_TEST_DATA_DIR, 'fake_plugins', plug)
            if not os.path.exists(fake_plug):
                continue
            shutil.copy(fake_plug, SOS_PLUGIN_DIR)
            _installed.append(
                os.path.realpath(os.path.join(SOS_PLUGIN_DIR, plug))
            )
        self._write_file_to_tmpdir('mocked_plugins', json.dumps(_installed))

    def teardown_mocked_plugins(self):
        """Remove any test plugins dropped into the repo during setup
        """
        _plugins = self.read_file_from_tmpdir('mocked_plugins')
        if not _plugins:
            return
        for plug in json.loads(_plugins):
            os.remove(plug)

    def setup_mocked_packages(self):
        """Install any required packages using avocado's software manager
        abstraction

        :raises Exception: if the requested packages could not be installed
        """
        if self.local_distro not in self.packages:
            return
        # remove any packages already locally installed, as otherwise
        # our call to SoftwareManager will return False
        self._strip_installed_packages()
        _pkgs = self.packages[self.local_distro]
        if not _pkgs:
            return
        if not self.installer.install_distro_packages(self.packages):
            raise Exception(
                "Unable to install requested packages %s" % ', '.join(_pkgs)
            )
        # save installed package list to our tmpdir to be removed later
        self._write_file_to_tmpdir('mocked_packages', json.dumps(_pkgs))

    def _strip_installed_packages(self):
        """For the list of packages given for a test, if any of the packages
        already exist on the test system, remove them from the list of packages
        to be installed.
        """
        self.packages[self.local_distro] = [
            p for p in self.packages[self.local_distro]
            if not self.sm.check_installed(p)
        ]

    def _copy_test_file(self, src, dest=None):
        """Helper to copy files from tests/test_data to relevant locations on
        the test system. If ``dest`` is provided, use that as the destination
        filename instead of using the ``src`` name

        An already existing ``dest`` is first renamed with a '.sostesting'
        extension. Either the directory we had to create for ``dest``, or
        ``dest`` itself, is recorded in ``_created_files`` for later removal.

        :param src: path of the file to copy, relative to tests/test_data
        :param dest: path on the test system to copy the file to
        """
        if dest is None:
            dest = src
        dir_added = False
        if os.path.exists(dest):
            os.rename(dest, dest + '.sostesting')
        # the directory that must exist is the one the file is copied INTO,
        # which is not the directory of src when an explicit dest is given
        _dir = os.path.split(dest)[0]
        # an empty _dir means dest has no directory component to create
        if _dir and not os.path.exists(_dir):
            os.makedirs(_dir)
            self._created_files.append(_dir)
            dir_added = True
        _test_file = os.path.join(SOS_TEST_DIR, 'test_data', src.lstrip('/'))
        shutil.copy(_test_file, dest)
        # when we created the directory, removing it also removes the file
        if not dir_added:
            self._created_files.append(dest)

    def setup_mocked_files(self):
        """Place any requested files from under tests/test_data into "proper"
        locations on the test system's filesystem.

        If any of these files already exist, rename the existing copy with a
        '.sostesting' extension, so we can easily undo any changes after the
        test(s) have run.
        """
        for mfile in self.files:
            if isinstance(mfile, (tuple, list)):
                self._copy_test_file(mfile[0], mfile[1])
            else:
                self._copy_test_file(mfile)
        if self._created_files:
            self._write_file_to_tmpdir('mocked_files',
                                       json.dumps(self._created_files))

    def teardown_mocked_files(self):
        """Remove any mocked files from the test system's filesystem, and
        if applicable, restore previously moved files
        """
        _files = self.read_file_from_tmpdir('mocked_files')
        if not _files:
            return
        for mocked in json.loads(_files):
            if os.path.isdir(mocked):
                shutil.rmtree(mocked)
            elif os.path.lexists(mocked):
                os.remove(mocked)
            # otherwise the path is already gone, e.g. a file that was inside
            # of a mocked directory removed earlier in this loop
            _backup = mocked + '.sostesting'
            if os.path.exists(_backup):
                os.rename(_backup, mocked)

    def test_archive_created(self):
        """Ensure that the archive tarball was created and is owned by uid 0
        """
        # kind of a hack, but since avocado test order is predictable, we can
        # use this to avoid calling setUp() and tearDown() at each test_ method
        # for stagetwo like we use the tmpdir for stageone.
        # THIS TEST MUST ALWAYS BE DEFINED LAST IN THIS CLASS FOR THIS TO WORK
        self.end_of_test_case = True

        self.assertFileExists(self.archive)
        self.assertEqual(os.stat(self.archive).st_uid, 0)


class StageOneReportExceptionTest(BaseSoSReportTest):
    """This test class should be used when we expect to generate an exception
    with a given command invocation.

    By default, this class assumes no archive will be generated and the exit
    code from the sos command will be nonzero. If this is not the case for
    the specific test being run, e.g. testing plugin exception handling, then
    set the ``archive_still_expected`` class attr to ``True``

    :avocado: disable
    :avocado: tags=stageone
    """

    _exception_expected = True

    # set this to True if the exception generated is not expected to halt
    # archive generation
    archive_still_expected = False

    sos_cmd = ''

    @skipIf(lambda x: x.archive_still_expected, "0 exit code still expected")
    def test_nonzero_return_code(self):
        """Ensure that the sos command exited with a nonzero exit code
        """
        # assertNotEqual reports the offending exit code on failure, rather
        # than a bare "True is not false"
        self.assertNotEqual(self.cmd_output.exit_status, 0)

    @skipIf(lambda x: x.archive_still_expected, "Output expected in test")
    def test_no_archive_generated(self):
        """Ensure that no archive was recorded for the failed execution
        """
        self.assertIsNone(self.archive)


class StageOneOutputTest(BaseSoSTest):
    """This test class should be used for tests that are only checking or
    validating output from a specific command or option, such as --help or
    --list.

    :avocado: disable
    :avocado: tags=stageone
    """

    sos_cmd = ''

    def _generate_sos_command(self):
        """Build the command line to execute from the sos binary and the
        ``sos_cmd`` defined for the test

        :returns: the command string to run
        """
        return "%s %s" % (self.sos_bin, self.sos_cmd)

    @skipIf(lambda x: x._exception_expected, "Non-zero exit code expected")
    def test_help_output_successful(self):
        """Ensure that the command exited with 0, wrote to stdout, and wrote
        nothing to stderr
        """
        # assertEqual reports the actual exit code on failure
        self.assertEqual(self.cmd_output.exit_status, 0)
        assert self.cmd_output.stdout, "No stdout output generated"
        assert not self.cmd_output.stderr, (
            "stderr received, but not expected: %s" % self.cmd_output.stderr
        )

    @skipIf(lambda x: not x._exception_expected,
            "Not anticipating stderr output")
    def test_help_error_reported(self):
        """Ensure that the command exited nonzero, wrote nothing to stdout,
        and wrote to stderr
        """
        self.assertNotEqual(self.cmd_output.exit_status, 0)
        assert not self.cmd_output.stdout, (
            "stdout received, but not expected: %s" % self.cmd_output.stdout
        )
        assert self.cmd_output.stderr, "No stderr output generated"