aboutsummaryrefslogtreecommitdiffstats
path: root/sos/policies/runtimes/lxd.py
blob: a6962782c707d5aa6b828c58cf6dce73c92cf8a2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# Copyright (C) 2023 Canonical Ltd., Arif Ali <arif.ali@canonical.com>

# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.

import json

from sos.policies.runtimes import ContainerRuntime
from sos.utilities import sos_get_command_output
from sos.utilities import is_executable


class LxdContainerRuntime(ContainerRuntime):
    """Runtime class to use for systems running LXD

    Information is gathered by running the ``lxc`` client binary with
    ``--format json`` and decoding the result. Any query that cannot be
    completed (runtime not active, non-zero exit status, output that is not
    the expected JSON) yields an empty result instead of raising, so that a
    misbehaving LXD installation does not abort the caller.
    """

    name = 'lxd'
    binary = 'lxc'

    def check_is_active(self):
        """Check whether LXD can be used on this system, and record the
        result in ``self.active`` when it can.

        The runtime is considered active when the client binary is
        executable, the ``lxd`` package is known to the package manager, and
        either the ``lxd`` or the ``snap.lxd.daemon`` service is running.

        :returns: ``True`` if the runtime is active, else ``False``
        :rtype: ``bool``
        """
        init_system = self.policy.init_system
        # the daemon must be running
        if (is_executable(self.binary, self.policy.sysroot) and
                self.policy.package_manager.pkg_by_name('lxd') and
                (init_system.is_running('lxd') or
                 init_system.is_running('snap.lxd.daemon'))):
            self.active = True
            return True
        return False

    def _get_json_output(self, subcmd):
        """Run ``<binary> <subcmd> --format json`` and decode the output.

        :param subcmd: The subcommand and arguments to pass to the binary,
                       without the ``--format json`` option
        :type subcmd: ``str``

        :returns: The decoded JSON objects, or an empty list if the runtime
                  is not active, the command failed, or the output is not a
                  JSON list
        :rtype: ``list`` of ``dict``
        """
        if not self.active:
            return []
        out = sos_get_command_output(
            f"{self.binary} {subcmd} --format json",
            chroot=self.policy.sysroot
        )
        if out['status'] != 0:
            return []
        try:
            data = json.loads(out['output'])
        except ValueError:
            # json.JSONDecodeError is a ValueError; the output may be empty
            # or carry non-JSON text, in which case there is nothing to use
            return []
        if not isinstance(data, list):
            return []
        # callers index every entry by key, so drop anything else
        return [ent for ent in data if isinstance(ent, dict)]

    def get_containers(self, get_all=False):
        """Get a list of containers present on the system.

        :param get_all: If set, include stopped containers as well
        :type get_all: ``bool``

        :returns: A list of 2-tuples containing (container_id,
                  container_name). The ID is the ``volatile.uuid`` value of
                  the container's expanded config; entries that carry no
                  such value use the container name as the ID instead.
        :rtype: ``list``
        """
        containers = []
        for container in self._get_json_output("list"):
            cname = container.get('name')
            if not cname:
                # nothing to identify the entry by
                continue
            if get_all or container.get('status') == 'Running':
                config = container.get('expanded_config') or {}
                # takes the form (container_id, container_name)
                containers.append((config.get('volatile.uuid', cname), cname))
        return containers

    def get_images(self):
        """Get a list of images present on the system

        Only images that report an ``update_source`` with an ``alias`` are
        listed, since that alias is what is used as the image name.

        :returns: A list of 2-tuples containing (image_name, image_id)
        :rtype: ``list``
        """
        images = []
        for ent in self._get_json_output("image list"):
            source = ent.get('update_source')
            if (isinstance(source, dict) and 'alias' in source
                    and 'fingerprint' in ent):
                # takes the form (image_name, image_id)
                images.append((source['alias'], ent['fingerprint']))
        return images

    def get_volumes(self):
        """Get a list of container volumes present on the system

        Only a single storage pool is inspected: the pool used by the root
        device of the ``default`` profile, or the pool named ``default`` if
        that cannot be determined.

        :returns: A list of volume names in that storage pool
        :rtype: ``list``
        """
        if not self.active:
            return []

        stg_pool = "default"
        # first get the default storage pool
        for profile in self._get_json_output("profile list"):
            if profile.get('name') != 'default':
                continue
            root_dev = (profile.get('devices') or {}).get('root') or {}
            pool = root_dev.get('pool')
            if pool:
                stg_pool = pool
                break

        return [
            ent['name']
            for ent in self._get_json_output(f"storage volume list {stg_pool}")
            if 'name' in ent
        ]

    def get_logs_command(self, container):
        """Get the command string used to dump container logs from the
        runtime

        :param container: The name or ID of the container to get logs for
        :type container: ``str``

        :returns: Formatted runtime command to get logs from `container`
        :rtype: ``str``
        """
        return f"{self.binary} info {container} --show-log"

    def get_copy_command(self, container, path, dest, sizelimit=None):
        """Generate the command string used to copy a file out of a container
        by way of the runtime.

        When `sizelimit` is set, the returned command runs ``tail -c`` inside
        the container and does not reference `dest`.

        :param container:   The name or ID of the container
        :type container:    ``str``

        :param path:        The path to copy from the container. Note that at
                            this time, no supported runtime supports globbing
        :type path:         ``str``

        :param dest:        The destination on the *host* filesystem to write
                            the file to
        :type dest:         ``str``

        :param sizelimit:   Limit the collection to the last X bytes of the
                            file at PATH
        :type sizelimit:    ``int``

        :returns:   Formatted runtime command to copy a file from a container
        :rtype:     ``str``
        """
        if sizelimit:
            # NOTE(review): run_cmd is not defined in this class; presumably
            # it is provided by ContainerRuntime - confirm against the base
            return f"{self.run_cmd} {container} tail -c {sizelimit} {path}"
        # the source is written as <container>/<path>, so make sure the two
        # parts stay separated even if path was given without a leading '/'
        if not path.startswith('/'):
            path = f"/{path}"
        return f"{self.binary} file pull {container}{path} {dest}"


# vim: set et ts=4 sw=4 :