# containers.py
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

from __future__ import print_function

from collections import namedtuple
import logging
import operator

from aci import ImageManifest
from subprograms import ChrtClient, NodeOSClient, resources

# Module logger, registered under the 'nrm' name.
logger = logging.getLogger('nrm')

# Record describing a container tracked by ContainerManager:
#  - uuid:       the container name/identifier
#  - manifest:   the ImageManifest the container was created from
#  - resources:  the allocation returned by the resource manager
#  - power:      dict of power settings (profile, policy, damper, slowdown,
#                manager)
#  - processes:  dict mapping pid -> process object
#  - clientids:  dict mapping pid -> id of the client that requested it
#  - hwbindings: dict holding a 'distrib' list when hwbind is enabled
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
                                     'power', 'processes', 'clientids',
                                     'hwbindings'])


class ContainerManager(object):

    """Manages the creation, listing and deletion of containers, using a
    container runtime underneath."""

    def __init__(self, container_runtime, rm,
                 perfwrapper="argo-perf-wrapper",
                 linuxperf="perf",
                 pmpi_lib="/usr/lib/libnrm-pmpi.so"):
        """Build a manager on top of a container runtime.

        Args:
            container_runtime: object implementing the ContainerRuntime
                interface (create / execute / delete).
            rm: resource manager; must provide ``schedule``, ``update`` and a
                ``hwloc`` attribute.
            perfwrapper: command prepended to the launched command when the
                'perfwrapper' feature is enabled in the manifest.
            linuxperf: value exported as PERF in the process environment.
            pmpi_lib: library injected through LD_PRELOAD when a power
                policy is set.
        """
        self.linuxperf = linuxperf
        self.perfwrapper = perfwrapper
        self.runtime = container_runtime
        # container uuid -> Container namedtuple
        self.containers = dict()
        # pid -> Container owning that process
        self.pids = dict()
        self.resourcemanager = rm
        self.hwloc = rm.hwloc
        self.chrt = ChrtClient()
        self.pmpi_lib = pmpi_lib

    def _get_container_tuple(self, container_name, manifest):
        """Retrieve a container tuple if the container exists, otherwise use
        the manifest to create a new one.

        Returns (bool, container_tuple), the first field telling if a container
        needs to be created."""
        if container_name in self.containers:
            return (False, self.containers[container_name])

        # ask the resource manager for resources
        ncpus = int(manifest.app.isolators.container.cpus.value)
        nmems = int(manifest.app.isolators.container.mems.value)
        req = resources(ncpus, nmems)
        allocated = self.resourcemanager.schedule(container_name, req)
        logger.info("create: allocation: %r", allocated)

        # Container power settings
        container_power = {
            'profile': None,
            'policy': None,
            'damper': None,
            'slowdown': None,
            'manager': None,
        }
        if manifest.is_feature_enabled('power'):
            pp = manifest.app.isolators.power
            if pp.profile in ["1", "True"]:
                container_power['profile'] = {'start': dict(), 'end': dict()}
            if pp.policy != "NONE":
                container_power['policy'] = pp.policy
                container_power['damper'] = pp.damper
                container_power['slowdown'] = pp.slowdown

        # Compute hardware bindings, ordered by cpu set
        hwbindings = dict()
        if manifest.is_feature_enabled('hwbind'):
            hwbindings['distrib'] = sorted(
                self.hwloc.distrib(ncpus, allocated),
                key=operator.attrgetter('cpus'))

        return (True, Container(container_name, manifest, allocated,
                                container_power, {}, {}, hwbindings))

    def _setup_environ(self, environ, container_name, manifest, container):
        """Add the NRM-specific variables to ``environ`` (modified in place)."""
        environ['ARGO_CONTAINER_UUID'] = container_name
        environ['PERF'] = self.linuxperf
        environ['AC_APP_NAME'] = manifest.name
        environ['AC_METADATA_URL'] = "localhost"

        # power policies rely on the PMPI library injected through
        # LD_PRELOAD; use get() so a missing 'policy' key doesn't crash.
        if container.power.get('policy'):
            environ['LD_PRELOAD'] = self.pmpi_lib
            environ['NRM_TRANSMIT'] = '1'
            # environment values must be strings; the damper comes straight
            # from the manifest, so coerce it.
            environ['NRM_DAMPER'] = str(container.power['damper'])

    def _build_argv(self, manifest, container, command, args):
        """Build the full command line: optional wrappers followed by the
        command and its arguments."""
        argv = []
        if manifest.is_feature_enabled('scheduler'):
            sched = manifest.app.isolators.scheduler
            # copy, so appending below never mutates what the client returned
            argv = list(self.chrt.getwrappedcmd(sched))

        # Use hwloc-bind to launch each process in the container by
        # prepending it to the command line, using the bindings computed by
        # hwloc-distrib. Skipped when no binding is available (this also
        # avoids a modulo by zero on an empty distribution).
        distrib = container.hwbindings.get('distrib')
        if distrib:
            # round robin over the cpu bindings available
            binding = distrib[len(container.processes) % len(distrib)]
            cpumask = binding.cpus[0]
            memmask = binding.mems[0]
            logger.info('create: binding to: %s, %s', cpumask, memmask)
            argv.extend(['hwloc-bind',
                         "core:{}".format(cpumask),
                         '--membind',
                         "numa:{}".format(memmask)])

        # It would've been better if argo-perf-wrapper wrapped around
        # argo-nodeos-config and not the final command -- that way it would
        # be running outside of the container.  However, because
        # argo-nodeos-config is suid root, perf can't monitor it.
        if manifest.is_feature_enabled('perfwrapper'):
            argv.append(self.perfwrapper)

        argv.append(command)
        argv.extend(args)
        return argv

    def create(self, request):
        """Create a container according to the request (if it does not exist
        yet) and launch the requested command inside it.

        ``request`` must provide the keys 'manifest', 'file', 'args',
        'environ', 'uuid' and 'clientid'; its 'environ' dict is updated in
        place.

        Returns a ``(pid, container)`` tuple on success, or None if the
        manifest could not be loaded."""
        manifestfile = request['manifest']
        command = request['file']
        args = request['args']
        environ = request['environ']
        container_name = request['uuid']
        logger.info("create: manifest file:  %s", manifestfile)
        logger.info("create: command:        %s", command)
        logger.info("create: args:           %r", args)
        logger.info("create: container name: %s", container_name)

        manifest = ImageManifest()
        if not manifest.load(manifestfile):
            logger.error("Manifest is invalid")
            return None

        creation_needed, container = self._get_container_tuple(container_name,
                                                               manifest)
        if creation_needed:
            logger.info("Creating container %s", container_name)
            self.runtime.create(container)
            self.containers[container_name] = container

        # build context to execute
        self._setup_environ(environ, container_name, manifest, container)
        argv = self._build_argv(manifest, container, command, args)

        # run my command
        process = self.runtime.execute(container_name, argv, environ)

        # register the process
        container.processes[process.pid] = process
        container.clientids[process.pid] = request['clientid']
        self.pids[process.pid] = container
        logger.info("Created process %s in container %s", process.pid,
                    container_name)
        return process.pid, container

    def delete(self, uuid):
        """Delete a container and kill all related processes.

        Raises KeyError if ``uuid`` is not a known container (after the
        runtime and resource manager have been notified)."""
        self.runtime.delete(uuid, kill=True)
        self.resourcemanager.update(uuid)
        container = self.containers.pop(uuid)
        # explicit loop: a map() used for its side effects is lazy on
        # Python 3 and would never run.
        for process in container.processes.values():
            self.pids.pop(process.pid, None)

    def kill(self, uuid):
        """Kill all the processes of a container; unknown uuids are ignored."""
        if uuid not in self.containers:
            return
        container = self.containers[uuid]
        logger.debug("killing %r:", container)
        for process in container.processes.values():
            try:
                process.proc.terminate()
            except OSError:
                logger.error("OS error: could not terminate process.")

    def list(self):
        """List the containers in the system.

        Returns a list of dicts with the container 'uuid' and a list of the
        'pid's of its processes."""
        return [{'uuid': c.uuid, 'pid': list(c.processes)}
                for c in self.containers.values()]


class ContainerRuntime(object):

    """Abstract interface of a container runtime.

    Concrete runtimes implement creating and deleting containers, and
    spawning commands inside them. Every operation of this base class raises
    NotImplementedError."""

    def __init__(self):
        """Nothing to initialize in the base interface."""

    def create(self, container):
        """Bring up, on the system, the container described by the
        ``container`` namedtuple."""
        raise NotImplementedError()

    def execute(self, container_uuid, args, environ):
        """Run the ``args`` command line inside container ``container_uuid``
        with environment ``environ``, through a popen-like interface.

        Implementations return a tornado.process.Subprocess."""
        raise NotImplementedError()

    def delete(self, container_uuid, kill=False):
        """Tear down a container; when ``kill`` is true, the processes inside
        it may be killed as well."""
        raise NotImplementedError()


class NodeOSRuntime(ContainerRuntime):

    """Implements the container runtime interface using the nodeos
    subprogram."""

    def __init__(self, path="argo_nodeos_config"):
        """Create the NodeOS client.

        Args:
            path: name or path of the argo_nodeos_config command to use.
        """
        self.client = NodeOSClient(argo_nodeos_config=path)

    def create(self, container):
        """Create the container from its uuid and resource allocation."""
        self.client.create(container.uuid, container.resources)

    def execute(self, container_uuid, args, environ):
        """Launch ``args`` in the container and return the client's result."""
        return self.client.execute(container_uuid, args, environ)

    def delete(self, container_uuid, kill=False):
        """Delete the container, forwarding ``kill`` to the client."""
        self.client.delete(container_uuid, kill)


class DummyRuntime(ContainerRuntime):

    """Implements a dummy runtime that doesn't create any container, but still
    launches commands as plain subprocesses."""

    def __init__(self):
        pass

    def create(self, container):
        """No-op: there is no container to create."""
        pass

    def execute(self, container_uuid, args, environ):
        """Spawn ``args`` as a tornado subprocess (``container_uuid`` is
        ignored), with stdout and stderr exposed as streams."""
        # imported here, so tornado is only loaded when execute() runs
        import tornado.process as process
        return process.Subprocess(args,
                                  stdout=process.Subprocess.STREAM,
                                  stderr=process.Subprocess.STREAM,
                                  env=environ)

    def delete(self, container_uuid, kill=False):
        """No-op: there is no container to delete."""
        pass