containers.py 10.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

11 12 13
from __future__ import print_function

from aci import ImageManifest
14
from collections import namedtuple
15
import logging
16
from subprograms import ChrtClient, NodeOSClient, resources
17
import operator
18

19
logger = logging.getLogger('nrm')

# Record describing one managed container, as built by
# ContainerManager._get_container_tuple:
#   uuid        -- container name (the request 'uuid')
#   manifest    -- the ImageManifest the container was created from
#   resources   -- allocation returned by the resource manager's schedule()
#   power       -- dict of power settings ('profile', 'policy', 'damper',
#                  'slowdown', 'manager')
#   processes   -- dict mapping pid -> process launched in the container
#   clientids   -- dict mapping pid -> 'clientid' of the requesting client
#   hwbindings  -- dict of hardware bindings ('distrib' when hwbind is on)
Container = namedtuple('Container', ('uuid manifest resources power '
                                     'processes clientids hwbindings'))
23

24 25 26 27 28 29

class ContainerManager(object):

    """Manages the creation, listing and deletion of containers, using a
    container runtime underneath.

    Two indexes are maintained: ``containers`` maps a container uuid to its
    ``Container`` tuple, and ``pids`` maps the pid of every process launched
    through :meth:`create` back to the ``Container`` it runs in."""

    def __init__(self, container_runtime, rm,
                 perfwrapper="nrm-perfwrapper",
                 linuxperf="perf",
                 pmpi_lib="/usr/lib/libnrm-pmpi.so"):
        """Build a manager on top of a runtime and a resource manager.

        container_runtime: object used to create containers, execute commands
            in them and delete them (create/execute/delete).
        rm: resource manager; its ``schedule``/``update`` methods and its
            ``hwloc`` attribute are used.
        perfwrapper: command prepended to the launched command when the
            'perfwrapper' feature is enabled in the manifest.
        linuxperf: value exported to launched processes as ``PERF``.
        pmpi_lib: library exported as ``LD_PRELOAD`` when a power policy is
            set for the container.
        """
        self.linuxperf = linuxperf
        self.perfwrapper = perfwrapper
        self.runtime = container_runtime
        self.containers = dict()
        self.pids = dict()
        self.resourcemanager = rm
        self.hwloc = rm.hwloc
        self.chrt = ChrtClient()
        self.pmpi_lib = pmpi_lib

    def _get_container_tuple(self, container_name, manifest):
        """Retrieve a container tuple if the container exists, otherwise use
        the manifest to create a new one.

        A new tuple gets a resource allocation from the resource manager,
        power settings from the 'power' isolator and, if 'hwbind' is enabled,
        a hwloc distribution of the allocated resources.

        Returns (bool, container_tuple), the first field telling if a container
        needs to be created."""
        if container_name in self.containers:
            return (False, self.containers[container_name])

        # ask the resource manager for resources
        ncpus = int(manifest.app.isolators.container.cpus.value)
        nmems = int(manifest.app.isolators.container.mems.value)
        req = resources(ncpus, nmems)
        allocated = self.resourcemanager.schedule(container_name, req)
        logger.info("create: allocation: %r", allocated)

        # Container power settings; every key exists even when unused.
        container_power = dict(profile=None, policy=None, damper=None,
                               slowdown=None, manager=None)
        if manifest.is_feature_enabled('power'):
            pp = manifest.app.isolators.power
            if pp.profile in ["1", "True"]:
                container_power['profile'] = {'start': dict(), 'end': dict()}
            if pp.policy != "NONE":
                container_power['policy'] = pp.policy
                container_power['damper'] = pp.damper
                container_power['slowdown'] = pp.slowdown

        # Compute hardware bindings, sorted by their 'cpus' attribute.
        hwbindings = dict()
        if manifest.is_feature_enabled('hwbind'):
            hwbindings['distrib'] = sorted(
                self.hwloc.distrib(ncpus, allocated),
                key=operator.attrgetter('cpus'))

        return (True, Container(container_name, manifest, allocated,
                                container_power, {}, {}, hwbindings))

    def create(self, request):
        """Create (if needed) a container and launch a process inside it.

        ``request`` is a dict read for the keys 'manifest' (manifest file),
        'file' (command), 'args', 'environ', 'uuid' (container name) and
        'clientid'. The 'environ' dict is updated in place before being
        handed to the runtime.

        Returns a ``(pid, container)`` tuple for the launched process, or
        None if the manifest cannot be loaded."""
        manifestfile = request['manifest']
        command = request['file']
        args = request['args']
        environ = request['environ']
        container_name = request['uuid']
        logger.info("create: manifest file:  %s", manifestfile)
        logger.info("create: command:        %s", command)
        logger.info("create: args:           %r", args)
        logger.info("create: container name: %s", container_name)

        manifest = ImageManifest()
        if not manifest.load(manifestfile):
            logger.error("Manifest is invalid")
            return None

        creation_needed, container = self._get_container_tuple(container_name,
                                                               manifest)
        if creation_needed:
            logger.info("Creating container %s", container_name)
            self.runtime.create(container)
            self.containers[container_name] = container

        # build context to execute
        environ['ARGO_CONTAINER_UUID'] = container_name
        environ['PERF'] = self.linuxperf
        environ['AC_APP_NAME'] = manifest.name
        environ['AC_METADATA_URL'] = "localhost"

        # Power policies preload self.pmpi_lib via LD_PRELOAD. get() is used
        # so a power dict without a 'policy' entry does not raise.
        if container.power.get('policy'):
            environ['LD_PRELOAD'] = self.pmpi_lib
            environ['NRM_TRANSMIT'] = '1'
            environ['NRM_DAMPER'] = container.power['damper']

        # monitoring section involves libnrm
        if manifest.is_feature_enabled('monitoring'):
            environ['ARGO_NRM_RATELIMIT'] = \
                manifest.app.isolators.monitoring.ratelimit

        # build prefix to the entire command based on enabled features
        argv = []
        if manifest.is_feature_enabled('scheduler'):
            sched = manifest.app.isolators.scheduler
            # Copy so the appends below never mutate a list owned by chrt.
            argv = list(self.chrt.getwrappedcmd(sched))

        # Use hwloc-bind to launch each process in the container by prepending
        # it as an argument to the command line, if enabled in manifest.
        # The hardware binding computed using hwloc-distrib is used here;
        # hwloc-bind's --single option is currently not passed.
        distrib = None
        if container.hwbindings:
            distrib = container.hwbindings.get('distrib')
        # An empty distribution would make the modulo below divide by zero.
        if distrib:
            # round robin over the cpu bindings available
            binding = distrib[len(container.processes) % len(distrib)]
            cpumask = binding.cpus[0]
            memmask = binding.mems[0]
            logger.info('create: binding to: %s, %s', cpumask, memmask)
            argv.extend(['hwloc-bind',
                         "core:{}".format(cpumask),
                         '--membind',
                         "numa:{}".format(memmask)])

        # It would've been better if argo-perf-wrapper wrapped around
        # argo-nodeos-config and not the final command -- that way it would
        # be running outside of the container.  However, because
        # argo-nodeos-config is suid root, perf can't monitor it.
        if manifest.is_feature_enabled('perfwrapper'):
            argv.append(self.perfwrapper)

        argv.append(command)
        argv.extend(args)

        # run my command
        process = self.runtime.execute(container_name, argv, environ)

        # register the process
        container.processes[process.pid] = process
        container.clientids[process.pid] = request['clientid']
        self.pids[process.pid] = container
        logger.info("Created process %s in container %s", process.pid,
                    container_name)
        return process.pid, container

    def delete(self, uuid):
        """Delete a container and kill all related processes.

        Asks the runtime to delete (and kill) the container, notifies the
        resource manager, then drops the container and its pids from the
        indexes. Raises KeyError if ``uuid`` is not a known container."""
        self.runtime.delete(uuid, kill=True)
        self.resourcemanager.update(uuid)
        c = self.containers.pop(uuid)
        # Explicit loop rather than map(): on Python 3 map() is lazy, so the
        # pids would never actually be removed.
        for proc in c.processes.values():
            self.pids.pop(proc.pid, None)

    def kill(self, uuid):
        """Kill all the processes of a container.

        Calls terminate() on every process of container ``uuid``; unknown
        uuids are ignored. An OSError on one process is logged and does not
        prevent the remaining ones from being terminated."""
        if uuid not in self.containers:
            return
        c = self.containers[uuid]
        logger.debug("killing %r:", c)
        for p in c.processes.values():
            try:
                p.proc.terminate()
            except OSError:
                logger.error("OS error: could not terminate process.")

    def list(self):
        """List the containers in the system.

        Returns a list of dicts holding the container 'uuid' and a list of
        the 'pid's launched in it."""
        return [{'uuid': c.uuid, 'pid': list(c.processes)}
                for c in self.containers.values()]
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250


class ContainerRuntime(object):

    """Interface that concrete container runtimes implement.

    It covers the three operations a container manager relies on: creating
    a container, running a command inside it, and deleting it. Each of
    those operations raises NotImplementedError here."""

    def __init__(self):
        """The base interface keeps no state."""

    def create(self, container):
        """Instantiate on the system the container described by the
        ``container`` namedtuple."""
        raise NotImplementedError()

    def execute(self, container_uuid, args, environ):
        """Run ``args`` inside container ``container_uuid`` with environment
        ``environ``, mirroring a popen-style interface.

        Returns a tornado.process.Subprocess"""
        raise NotImplementedError()

    def delete(self, container_uuid, kill=False):
        """Remove the container; when ``kill`` is true, the processes it
        still holds are killed as well."""
        raise NotImplementedError()


class NodeOSRuntime(ContainerRuntime):

    """Container runtime backed by the argo nodeos subprogram, driven
    through a NodeOSClient."""

    def __init__(self, path="argo_nodeos_config"):
        """Build the NodeOSClient; ``path`` overrides the nodeos command used
        by the client."""
        nodeos = NodeOSClient(argo_nodeos_config=path)
        self.client = nodeos

    def create(self, container):
        """Create the container from its uuid and its resource allocation."""
        uuid, allocation = container.uuid, container.resources
        self.client.create(uuid, allocation)

    def execute(self, container_uuid, args, environ):
        """Run a command in the container and return whatever the client's
        execute() returns for it."""
        launched = self.client.execute(container_uuid, args, environ)
        return launched

    def delete(self, container_uuid, kill=False):
        """Delete the container, forwarding the ``kill`` flag to the
        client."""
        self.client.delete(container_uuid, kill)
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265


class DummyRuntime(ContainerRuntime):

    """Runtime that performs no containerization: create and delete do
    nothing, and commands are spawned directly as ordinary processes."""

    def __init__(self):
        """No state to set up."""

    def create(self, container):
        """No container is created; the request is ignored."""

    def execute(self, container_uuid, args, environ):
        """Spawn ``args`` with ``environ``, ignoring ``container_uuid``;
        stdout and stderr are exposed as tornado streams.

        Returns a tornado.process.Subprocess."""
        from tornado import process as tornado_process
        stream = tornado_process.Subprocess.STREAM
        return tornado_process.Subprocess(args, stdout=stream, stderr=stream,
                                          env=environ)

    def delete(self, container_uuid, kill=False):
        """Nothing to delete; ``kill`` is ignored, so this method does not
        terminate processes started by execute()."""