aboutsummaryrefslogtreecommitdiffstats
path: root/sos/policies/runtimes/__init__.py
blob: 20492e208dd1fa8fe09daba7fdf28b398510dd93 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# Copyright (C) 2020 Red Hat, Inc., Jake Hunsaker <jhunsake@redhat.com>

# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.

import re

from shlex import quote
from sos.utilities import sos_get_command_output, is_executable


class ContainerRuntime():
    """Encapsulates a container runtime that provides the ability to plugins to
    check runtime status, check for the presence of specific containers, and
    to format commands to run in those containers

    :param policy: The loaded policy for the system
    :type policy: ``Policy()``

    :cvar name: The name of the container runtime, e.g. 'podman'
    :vartype name: ``str``

    :cvar containers: A list of containers known to the runtime
    :vartype containers: ``list``

    :cvar images: A list of images known to the runtime
    :vartype images: ``list``

    :cvar volumes: A list of volumes known to the runtime
    :vartype volumes: ``list``

    :cvar binary: The binary command to run for the runtime, must exist within
                  $PATH
    :vartype binary: ``str``

    :cvar active: Whether the runtime was found usable; set to ``True`` by
                  ``check_is_active()``
    :vartype active: ``bool``
    """

    name = 'Undefined'
    # These class-level lists are only placeholders: load_container_info()
    # rebinds them on the instance, nothing in this class mutates them in
    # place.
    containers = []
    images = []
    volumes = []
    binary = ''
    active = False

    def __init__(self, policy=None):
        """Store the policy and build the 'exec' command prefix

        :param policy: The loaded policy for the system
        :type policy: ``Policy()``
        """
        self.policy = policy
        # note the trailing space, which the command formatters rely on
        self.run_cmd = "%s exec " % self.binary

    @staticmethod
    def _parse_rows(output, skip_header=False, min_fields=1):
        """Split command output into whitespace-separated fields per line,
        dropping any line that is blank or has too few fields to be usable.

        :param output: The raw text output of a runtime command
        :type output: ``str``

        :param skip_header: If set, discard the first line of `output`
        :type skip_header: ``bool``

        :param min_fields: Minimum number of fields a line must have to be
                           kept
        :type min_fields: ``int``

        :returns: One list of fields for every line that was kept
        :rtype: ``list`` of ``list``
        """
        lines = output.splitlines()
        if skip_header:
            lines = lines[1:]
        rows = (line.split() for line in lines)
        return [fields for fields in rows if len(fields) >= min_fields]

    def load_container_info(self):
        """If this runtime is found to be active, attempt to load information
        on the objects existing in the runtime.
        """
        self.containers = self.get_containers()
        self.images = self.get_images()
        self.volumes = self.get_volumes()

    def check_is_active(self):
        """Check to see if the container runtime is both present AND active.

        Active in this sense means that the runtime can be used to glean
        information about the runtime itself and containers that are running.

        :returns: ``True`` if the runtime is active, else ``False``
        :rtype: ``bool``
        """
        if is_executable(self.binary, self.policy.sysroot):
            self.active = True
            return True
        return False

    def check_can_copy(self):
        """Check if the runtime supports copying files out of containers and
        onto the host filesystem

        :returns: Always ``True`` for this base implementation
        :rtype: ``bool``
        """
        return True

    def get_containers(self, get_all=False):
        """Get a list of containers present on the system.

        :param get_all: If set, include stopped containers as well
        :type get_all: ``bool``

        :returns: A list of 2-tuples containing (container_id,
                  container_name); empty if the runtime is not active or the
                  command fails
        :rtype: ``list``
        """
        if not self.active:
            return []
        _cmd = "%s ps %s" % (self.binary, '-a' if get_all else '')
        out = sos_get_command_output(_cmd, chroot=self.policy.sysroot)
        if out['status'] != 0:
            return []
        # the first line of the listing is skipped as the header row; blank
        # lines are dropped so they cannot raise an IndexError below
        rows = self._parse_rows(out['output'], skip_header=True)
        # takes the form (container_id, container_name)
        return [(ent[0], ent[-1]) for ent in rows]

    def get_container_by_name(self, name):
        """Get the container ID for the container matching the provided
        name

        :param name: The name of the container, note this can be a regex.
                     Matching uses ``re.match``, so it is anchored at the
                     start of the container name
        :type name: ``str``

        :returns: The id of the first container to match `name`, else ``None``
        :rtype: ``str``
        """
        if not self.active or name is None:
            return None
        for c in self.containers:
            if re.match(name, c[1]):
                return c[0]
        return None

    def get_images(self):
        """Get a list of images present on the system

        :returns: A list of 2-tuples containing (image_name, image_id); empty
                  if the runtime is not active or the command fails
        :rtype: ``list``
        """
        if not self.active:
            return []
        fmt = '{{lower .Repository}}:{{lower .Tag}} {{lower .ID}}'
        out = sos_get_command_output(
            "%s images --format '%s'" % (self.binary, fmt),
            chroot=self.policy.sysroot
        )
        if out['status'] != 0:
            return []
        # both the name and the id are needed, so lines with fewer than two
        # fields are skipped rather than raising an IndexError
        rows = self._parse_rows(out['output'], min_fields=2)
        # takes the form (image_name, image_id)
        return [(ent[0], ent[1]) for ent in rows]

    def get_volumes(self):
        """Get a list of container volumes present on the system

        :returns: A list of volume IDs on the system; empty if the runtime is
                  not active or the command fails
        :rtype: ``list``
        """
        if not self.active:
            return []
        out = sos_get_command_output(
            "%s volume ls" % self.binary,
            chroot=self.policy.sysroot
        )
        if out['status'] != 0:
            return []
        # the first line of the listing is skipped as the header row; the
        # last field of every remaining non-blank line is collected
        rows = self._parse_rows(out['output'], skip_header=True)
        return [ent[-1] for ent in rows]

    def container_exists(self, container):
        """Check if a given container ID or name exists on the system from the
        perspective of the container runtime.

        Note that this will only check _running_ containers

        :param container:       The name or ID of the container
        :type container:        ``str``

        :returns:               True if the container exists, else False
        :rtype:                 ``bool``
        """
        # each entry is an (id, name) tuple, so this matches on either one
        return any(container in _contup for _contup in self.containers)

    def fmt_container_cmd(self, container, cmd, quotecmd):
        """Format a command to run inside a container using the runtime

        :param container: The name or ID of the container in which to run
        :type container: ``str``

        :param cmd: The command to run inside `container`
        :type cmd: ``str``

        :param quotecmd: Whether the cmd should be quoted.
        :type quotecmd: ``bool``

        :returns: Formatted string to run `cmd` inside `container`
        :rtype: ``str``
        """
        quoted_cmd = quote(cmd) if quotecmd else cmd
        # run_cmd already ends in a space, so the result has two spaces ahead
        # of the container; kept as-is so the emitted string is unchanged
        return "%s %s %s" % (self.run_cmd, container, quoted_cmd)

    def fmt_registry_credentials(self, username, password):
        """Format a string to pass to the 'run' command of the runtime to
        enable authorization for pulling the image during `sos collect`, if
        needed using username and optional password creds

        :param username:    The name of the registry user
        :type username:     ``str``

        :param password:    The password of the registry user
        :type password:     ``str`` or ``None``

        :returns:  The string to use to enable a run command to pull the image
        :rtype:    ``str``
        """
        # only append the ':password' suffix when a password was provided
        suffix = (':' + password) if password else ''
        return "--creds=%s%s" % (username, suffix)

    def fmt_registry_authfile(self, authfile):
        """Format a string to pass to the 'run' command of the runtime to
        enable authorization for pulling the image during `sos collect`, if
        needed using an authfile.

        :param authfile:    The path of the authfile to use, if any
        :type authfile:     ``str`` or ``None``

        :returns:  The authfile option string, or an empty string if no
                   authfile was given
        :rtype:    ``str``
        """
        if authfile:
            return "--authfile %s" % authfile
        return ''

    def get_logs_command(self, container):
        """Get the command string used to dump container logs from the
        runtime

        :param container: The name or ID of the container to get logs for
        :type container: ``str``

        :returns: Formatted runtime command to get logs from `container`
        :rtype: ``str``
        """
        return "%s logs -t %s" % (self.binary, container)

    def get_copy_command(self, container, path, dest, sizelimit=None):
        """Generate the command string used to copy a file out of a container
        by way of the runtime.

        :param container:   The name or ID of the container
        :type container:    ``str``

        :param path:        The path to copy from the container. Note that at
                            this time, no supported runtime supports globbing
        :type path:         ``str``

        :param dest:        The destination on the *host* filesystem to write
                            the file to
        :type dest:         ``str``

        :param sizelimit:   Limit the collection to the last X bytes of the
                            file at PATH
        :type sizelimit:    ``int``

        :returns:   Formatted runtime command to copy a file from a container
        :rtype:     ``str``
        """
        if sizelimit:
            # with a size limit the file is not copied; instead the command
            # tails it inside the container, and `dest` is not used
            return "%s %s tail -c %s %s" % (self.run_cmd, container, sizelimit,
                                            path)
        return "%s cp %s:%s %s" % (self.binary, container, path, dest)

# vim: set et ts=4 sw=4 :