containers.py 11.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

11 12 13
from __future__ import print_function

from aci import ImageManifest
14
from json import load
15
from collections import namedtuple
16
import logging
17
from subprograms import ChrtClient, NodeOSClient, resources, SingularityClient
18
import operator
19

20
logger = logging.getLogger('nrm')

# Record for one managed container, as built by
# ContainerManager._get_container_tuple:
#   uuid       -- container name (the request's 'uuid')
#   manifest   -- the ImageManifest the container was created from
#   resources  -- allocation returned by the resource manager's schedule()
#   power      -- dict with 'profile', 'policy', 'damper', 'slowdown',
#                 'manager' entries (None when unset)
#   processes  -- dict pid -> process launched in the container
#   clientids  -- dict pid -> client id that requested the launch
#   hwbindings -- dict, holding 'distrib' when hardware binding is enabled
Container = namedtuple('Container',
                       'uuid manifest resources power processes '
                       'clientids hwbindings')
24

25 26 27 28 29 30

class ContainerManager(object):

    """Manages the creation, listing and deletion of containers, using a
    container runtime underneath.

    Two indexes are maintained: ``containers`` maps a container uuid to its
    ``Container`` tuple, and ``pids`` maps the pid of every process launched
    through ``create`` to the ``Container`` it was launched in."""

    def __init__(self, container_runtime, rm,
                 perfwrapper="nrm-perfwrapper",
                 linuxperf="perf",
                 pmpi_lib="/usr/lib/libnrm-pmpi.so",
                 downstream_event_uri="ipc:///tmp/nrm-downstream-event"):
        """Set up the manager.

        container_runtime -- runtime used to create, execute in and delete
                             containers.
        rm -- resource manager; its ``hwloc`` attribute and its
              ``schedule``/``update`` methods are used.
        perfwrapper -- command prepended when the 'perfwrapper' feature is
                       enabled in a manifest.
        linuxperf -- value exported as PERF to launched processes.
        pmpi_lib -- library LD_PRELOADed when a power policy is set.
        downstream_event_uri -- URI given to the runtime at container
                                creation and exported to processes.
        """
        self.linuxperf = linuxperf
        self.perfwrapper = perfwrapper
        self.runtime = container_runtime
        self.containers = dict()
        self.pids = dict()
        self.resourcemanager = rm
        self.hwloc = rm.hwloc
        self.chrt = ChrtClient()
        self.pmpi_lib = pmpi_lib
        self.downstream_event_uri = downstream_event_uri

    def _get_container_tuple(self, container_name, manifest):
        """Retrieve a container tuple if the container exists, otherwise use
        the manifest to create a new one.

        A new tuple gets resources scheduled by the resource manager, power
        settings from the manifest's 'power' section and, when the 'hwbind'
        feature is enabled, hwloc distribution bindings.

        Returns (bool, container_tuple), the first field telling if a container
        needs to be created."""
        if container_name in self.containers:
            return (False, self.containers[container_name])

        # ask the resource manager for resources
        ncpus = manifest.app['container']['cpus']
        nmems = manifest.app['container']['mems']
        req = resources(ncpus, nmems)
        allocated = self.resourcemanager.schedule(container_name, req)
        logger.info("create: allocation: %r", allocated)

        # Container power settings: every entry defaults to None so callers
        # can test them (e.g. with .get()) whether or not power is enabled.
        container_power = dict.fromkeys(
            ['profile', 'policy', 'damper', 'slowdown', 'manager'])

        if manifest.is_feature_enabled('power'):
            pp = manifest.app['power']
            if pp['profile'] is True:
                container_power['profile'] = {'start': dict(), 'end': dict()}
            if pp['policy'] != "NONE":
                container_power['policy'] = pp['policy']
                container_power['damper'] = pp['damper']
                container_power['slowdown'] = pp['slowdown']

        # Compute hardware bindings, kept sorted by their ``cpus`` attribute.
        hwbindings = dict()
        if manifest.is_feature_enabled('hwbind'):
            hwbindings['distrib'] = sorted(
                self.hwloc.distrib(ncpus, allocated),
                key=operator.attrgetter('cpus'))
        return (True, Container(container_name, manifest, allocated,
                                container_power, {}, {}, hwbindings))

    def create(self, request):
        """Create a container if needed and launch a process inside it.

        ``request`` is a dict read for the keys 'manifest' (path to a JSON
        image manifest), 'file' (the command), 'args', 'environ', 'uuid'
        (the container name) and 'clientid'. The 'environ' dict is updated
        in place and used as the process environment.

        Returns a (pid, container) tuple. Errors while loading the manifest
        are logged and re-raised."""
        manifestfile = request['manifest']
        command = request['file']
        args = request['args']
        environ = request['environ']
        container_name = request['uuid']
        logger.info("create: manifest file:  %s", manifestfile)
        logger.info("create: command:        %s", command)
        logger.info("create: args:           %r", args)
        logger.info("create: container name: %s", container_name)

        try:
            with open(manifestfile) as f:
                manifest = ImageManifest(load(f))
        except Exception:
            logger.error("error occured in manifest loading:")
            # A bare raise keeps the original traceback (``raise(e)`` drops
            # it on Python 2).
            raise

        creation_needed, container = self._get_container_tuple(container_name,
                                                               manifest)
        if creation_needed:
            logger.info("Creating container %s", container_name)
            self.runtime.create(container, self.downstream_event_uri)
            self.containers[container_name] = container

        self._build_environ(container, manifest, container_name, environ)
        argv = self._build_argv(container, manifest, command, args)

        # run my command
        process = self.runtime.execute(container_name, argv, environ)

        # register the process
        container.processes[process.pid] = process
        container.clientids[process.pid] = request['clientid']
        self.pids[process.pid] = container
        logger.info("Created process %s in container %s", process.pid,
                    container_name)
        return process.pid, container

    def _build_environ(self, container, manifest, container_name, environ):
        """Add the NRM/Argo variables for one launch to ``environ`` in place."""
        environ['ARGO_CONTAINER_UUID'] = container_name
        environ['PERF'] = self.linuxperf
        environ['AC_APP_NAME'] = manifest['name']
        environ['AC_METADATA_URL'] = "localhost"

        # power profiling uses LD_PRELOAD; .get() avoids a KeyError if the
        # power settings carry no policy entry.
        power_policy = container.power.get('policy')
        if power_policy:
            environ['LD_PRELOAD'] = self.pmpi_lib
            environ['NRM_TRANSMIT'] = '1'
            environ['NRM_DAMPER'] = container.power['damper']

        # monitoring section involves libnrm
        monitoring = manifest.is_feature_enabled('monitoring')
        if monitoring:
            environ['ARGO_NRM_RATELIMIT'] = \
                manifest.app['monitoring']['ratelimit']

        # both the pmpi library and libnrm report to the downstream URI
        if power_policy or monitoring:
            environ['ARGO_NRM_DOWNSTREAM_EVENT_URI'] = \
                self.downstream_event_uri

    def _build_argv(self, container, manifest, command, args):
        """Return the command line: optional chrt, hwloc-bind and perf-wrapper
        prefixes, followed by ``command`` and ``args``."""
        argv = []
        if manifest.is_feature_enabled('scheduler'):
            sched = manifest.app['scheduler']
            # copy, so appending below never mutates what the client returned
            argv = list(self.chrt.getwrappedcmd(sched))

        # Use hwloc-bind to launch each process in the container by
        # prepending it to the command line, using the bindings computed with
        # hwloc-distrib. hwloc-bind's --single option is not used. An empty
        # distribution means there is nothing to bind to (and would otherwise
        # divide by zero below).
        distrib = container.hwbindings.get('distrib')
        if distrib:
            # round robin over the cpu bindings available
            binding = distrib[len(container.processes) % len(distrib)]
            cpumask = binding.cpus[0]
            memmask = binding.mems[0]
            logger.info('create: binding to: %s, %s', cpumask, memmask)
            argv.extend(['hwloc-bind', "core:{}".format(cpumask),
                         '--membind', "numa:{}".format(memmask)])

        # It would've been better if argo-perf-wrapper wrapped around
        # argo-nodeos-config and not the final command -- that way it would
        # be running outside of the container.  However, because
        # argo-nodeos-config is suid root, perf can't monitor it.
        if manifest.is_feature_enabled('perfwrapper'):
            argv.append(self.perfwrapper)

        argv.append(command)
        argv.extend(args)
        return argv

    def delete(self, uuid):
        """Delete a container and kill all related processes.

        The runtime deletes the container (killing its processes), the
        resource manager is updated, and the container and the pids of its
        processes are removed from this manager's indexes. Raises KeyError
        if ``uuid`` is unknown (after the runtime and resource manager have
        been called)."""
        self.runtime.delete(uuid, kill=True)
        self.resourcemanager.update(uuid)
        c = self.containers.pop(uuid)
        # Explicit loop: map() is lazy on Python 3, so a map() called only
        # for its side effect never ran.
        for process in c.processes.values():
            self.pids.pop(process.pid, None)

    def kill(self, uuid):
        """Kill all the processes of a container.

        Unknown uuids are ignored; an OSError while terminating one process
        is logged and the remaining processes are still terminated."""
        if uuid not in self.containers:
            return
        c = self.containers[uuid]
        logger.debug("killing %r:", c)
        for p in c.processes.values():
            try:
                p.proc.terminate()
            except OSError:
                logger.error("OS error: could not terminate process.")

    def list(self):
        """List the containers in the system.

        Returns a list of dicts, one per container, holding its 'uuid' and a
        'pid' list with the pids of the processes launched in it."""
        # ``list`` here is the builtin: function bodies do not see the class
        # namespace. A real list (not a Python 3 dict view) keeps the result
        # JSON-serializable.
        return [{'uuid': c.uuid, 'pid': list(c.processes)}
                for c in self.containers.values()]
213 214 215 216 217 218 219 220 221 222


class ContainerRuntime(object):

    """Interface every container runtime implements: creating a container,
    running commands inside it, and deleting it.

    The base class provides no behavior: each operation raises
    NotImplementedError and is meant to be overridden."""

    def __init__(self):
        """The interface itself keeps no state."""

    def create(self, container, downstream_uri):
        """Bring into existence on the system the container described by the
        ``container`` namedtuple."""
        raise NotImplementedError

    def execute(self, container_uuid, args, environ):
        """Run ``args`` with environment ``environ`` inside the container,
        through an interface resembling popen.

        Returns a tornado.process.Subprocess"""
        raise NotImplementedError

    def delete(self, container_uuid, kill=False):
        """Remove the container; when ``kill`` is set its processes are
        killed as well."""
        raise NotImplementedError


class NodeOSRuntime(ContainerRuntime):

    """Container runtime backed by the nodeos configuration subprogram,
    driven through a NodeOSClient."""

    def __init__(self, path="argo_nodeos_config"):
        """Build the NodeOSClient; ``path`` selects the nodeos command to
        invoke."""
        self.client = NodeOSClient(argo_nodeos_config=path)

    def create(self, container, downstream_uri):
        """Ask nodeos for a container named after ``container.uuid`` using
        its resource allocation. ``downstream_uri`` is not used here."""
        uuid, allocation = container.uuid, container.resources
        self.client.create(uuid, allocation)

    def execute(self, container_uuid, args, environ):
        """Run a command in the container and hand back the client's
        result."""
        client = self.client
        return client.execute(container_uuid, args, environ)

    def delete(self, container_uuid, kill=False):
        """Tear the container down, forwarding ``kill`` to the client."""
        client = self.client
        client.delete(container_uuid, kill)
261 262


263 264 265 266 267 268 269 270 271 272
class SingularityUserRuntime(ContainerRuntime):

    """Container runtime backed by singularity instances, driven through a
    SingularityClient."""

    def __init__(self, path="singularity"):
        """Build the SingularityClient; ``path`` selects the singularity
        command to invoke."""
        self.client = SingularityClient(singularity_path=path)

    def create(self, container, downstream_uri):
        """Start a singularity instance named after the container uuid.

        The image path comes from the manifest's image section; the bind
        list is ``downstream_uri`` followed by that section's 'binds'."""
        image = container.manifest.image
        instance_name = container.uuid
        image_path = image['path']
        binds = [downstream_uri] + image['binds']
        self.client.instance_start(instance_name, image_path, binds)

    def execute(self, container_uuid, args, environ):
        """Run a command in the instance and hand back the client's
        result."""
        client = self.client
        return client.execute(container_uuid, args, environ)

    def delete(self, container_uuid, kill=False):
        """Stop the instance, forwarding ``kill`` to the client."""
        client = self.client
        client.instance_stop(container_uuid, kill)
286 287


288 289 290 291 292 293 294 295
class DummyRuntime(ContainerRuntime):

    """Runtime that never creates a container: create and delete do nothing,
    and commands are launched directly as host subprocesses."""

    def __init__(self):
        """No state is needed."""

    def create(self, container, downstream_uri):
        """No-op: there is no container to create."""

    def execute(self, container_uuid, args, environ):
        """Launch ``args`` as a tornado Subprocess with ``environ`` as its
        environment, exposing stdout and stderr as streams. The container
        uuid is ignored."""
        import tornado.process as process
        stream = process.Subprocess.STREAM
        return process.Subprocess(args, stdout=stream, stderr=stream,
                                  env=environ)

    def delete(self, container_uuid, kill=False):
        """No-op: there is no container to delete."""