###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

from __future__ import print_function

from aci import ImageManifest
from collections import namedtuple
import logging
from subprograms import ChrtClient, NodeOSClient, resources, SingularityClient
import operator

logger = logging.getLogger('nrm')

Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
                                     'power', 'processes', 'clientids',
                                     'hwbindings'])
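# Container fields, as used in this module:
#   uuid       -- container name/identifier from the launch request
#   manifest   -- parsed ImageManifest describing the application
#   resources  -- resources allocated by the resource manager
#   power      -- power settings dict (profile, policy, damper, slowdown,
#                 manager)
#   processes  -- dict mapping pid to the running process handle
#   clientids  -- dict mapping pid to the client id of the launch request
#   hwbindings -- hardware bindings computed with hwloc-distrib
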

class ContainerManager(object):

    """Manages the creation, listing and deletion of containers, using a
    container runtime underneath."""

    def __init__(self, container_runtime, rm,
                 perfwrapper="nrm-perfwrapper",
                 linuxperf="perf",
                 pmpi_lib="/usr/lib/libnrm-pmpi.so",
                 downstream_event_uri="ipc:///tmp/nrm-downstream-event"):
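        """Initialize the manager.

        `container_runtime` is the ContainerRuntime used to create, execute
        into and delete containers.  `rm` is the resource manager; it must
        expose an `hwloc` client and the `schedule`/`update` methods used
        below.  The remaining arguments override the default perf wrapper,
        perf binary, PMPI preload library and downstream event URI."""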
        self.linuxperf = linuxperf
        self.perfwrapper = perfwrapper
        self.runtime = container_runtime
        self.containers = dict()
        self.pids = dict()
        self.resourcemanager = rm
        self.hwloc = rm.hwloc
        self.chrt = ChrtClient()
        self.pmpi_lib = pmpi_lib
        self.downstream_event_uri = downstream_event_uri

    def _get_container_tuple(self, container_name, manifest):
        """Retrieve a container tuple if the container exists, otherwise use
        the manifest to create a new one.

        Returns (bool, container_tuple), the first field telling if a container
        needs to be created."""

        if container_name in self.containers:
            return (False, self.containers[container_name])

        # ask the resource manager for resources
        ncpus = int(manifest.app.isolators.container.cpus.value)
        nmems = int(manifest.app.isolators.container.mems.value)
        req = resources(ncpus, nmems)
        allocated = self.resourcemanager.schedule(container_name, req)
        logger.info("create: allocation: %r", allocated)

        # Container power settings
        container_power = dict()
        container_power['profile'] = None
        container_power['policy'] = None
        container_power['damper'] = None
        container_power['slowdown'] = None
        container_power['manager'] = None

        if manifest.is_feature_enabled('power'):
            pp = manifest.app.isolators.power
            if pp.profile in ["1", "True"]:
                container_power['profile'] = dict()
                container_power['profile']['start'] = dict()
                container_power['profile']['end'] = dict()
            if pp.policy != "NONE":
                container_power['policy'] = pp.policy
                container_power['damper'] = pp.damper
                container_power['slowdown'] = pp.slowdown

        # Compute hardware bindings
        hwbindings = dict()
        if manifest.is_feature_enabled('hwbind'):
            hwbindings['distrib'] = sorted(
                self.hwloc.distrib(ncpus, allocated),
                key=operator.attrgetter('cpus'))
        return (True, Container(container_name, manifest, allocated,
                                container_power, {}, {}, hwbindings))

    def create(self, request):
        """Create a container according to the request.

        Returns a (pid, container) tuple on success, or None if the manifest
        is invalid."""

        manifestfile = request['manifest']
        command = request['file']
        args = request['args']
        environ = request['environ']
        container_name = request['uuid']
        logger.info("create: manifest file:  %s", manifestfile)
        logger.info("create: command:        %s", command)
        logger.info("create: args:           %r", args)
        logger.info("create: container name: %s", container_name)

        manifest = ImageManifest()
        if not manifest.load(manifestfile):
            logger.error("Manifest is invalid")
            return None

        creation_needed, container = self._get_container_tuple(container_name,
                                                               manifest)
        if creation_needed:
            logger.info("Creating container %s", container_name)
            self.runtime.create(container)
            self.containers[container_name] = container

        # build context to execute
        # environ['PATH'] = ("/usr/local/sbin:"
        #                   "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
        environ['ARGO_CONTAINER_UUID'] = container_name
        environ['PERF'] = self.linuxperf
        environ['AC_APP_NAME'] = manifest.name
        environ['AC_METADATA_URL'] = "localhost"

        # Power policies use LD_PRELOAD (libnrm-pmpi); we use .get() so this
        # doesn't crash if no policy was configured.
        if container.power.get('policy'):
            environ['LD_PRELOAD'] = self.pmpi_lib
            environ['NRM_TRANSMIT'] = '1'
            environ['NRM_DAMPER'] = container.power['damper']

        # monitoring section involves libnrm
        if manifest.is_feature_enabled('monitoring'):
            environ['ARGO_NRM_RATELIMIT'] = \
                    manifest.app.isolators.monitoring.ratelimit

        if container.power.get('policy') or \
                manifest.is_feature_enabled('monitoring'):
            environ['ARGO_NRM_DOWNSTREAM_EVENT_URI'] = \
                    self.downstream_event_uri

        # build the command-line prefix based on the enabled features
        argv = []
        if manifest.is_feature_enabled('scheduler'):
            sched = manifest.app.isolators.scheduler
            argv = self.chrt.getwrappedcmd(sched)
        # Use hwloc-bind to launch each process in the container by prepending
        # it as an argument to the command line, if enabled in the manifest.
        # The hardware bindings computed with hwloc-distrib are used here.
        if container.hwbindings:
            # round robin over the cpu bindings available
            bind_index = len(container.processes) % \
                    len(container.hwbindings['distrib'])
            argv.append('hwloc-bind')
            # argv.append('--single')
            cpumask = container.hwbindings['distrib'][bind_index].cpus[0]
            memmask = container.hwbindings['distrib'][bind_index].mems[0]
            logger.info('create: binding to: %s, %s', cpumask, memmask)
            argv.append("core:{}".format(cpumask))
            argv.append('--membind')
            argv.append("numa:{}".format(memmask))

        # It would've been better if argo-perf-wrapper wrapped around
        # argo-nodeos-config and not the final command -- that way it would
        # be running outside of the container.  However, because
        # argo-nodeos-config is suid root, perf can't monitor it.
        if manifest.is_feature_enabled('perfwrapper'):
            argv.append(self.perfwrapper)

        argv.append(command)
        argv.extend(args)
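        # For illustration only: with the scheduler, hwbind and perfwrapper
        # features all enabled, argv at this point looks roughly like
        # [<chrt prefix>..., 'hwloc-bind', 'core:<cpumask>', '--membind',
        #  'numa:<memmask>', 'nrm-perfwrapper', command, arg1, ...]; the
        # exact chrt prefix comes from ChrtClient.getwrappedcmd().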

        # run the command in the container
        process = self.runtime.execute(container_name, argv, environ)

        # register the process
        container.processes[process.pid] = process
        container.clientids[process.pid] = request['clientid']
        self.pids[process.pid] = container
        logger.info("Created process %s in container %s", process.pid,
                    container_name)
        return process.pid, container

    def delete(self, uuid):
        """Delete a container and kill all related processes."""
        self.runtime.delete(uuid, kill=True)
        self.resourcemanager.update(uuid)
        c = self.containers[uuid]
        del self.containers[uuid]
        # drop the pid bookkeeping for every process of the container
        for p in c.processes.values():
            self.pids.pop(p.pid, None)

    def kill(self, uuid):
        """Kill all the processes of a container."""
        if uuid in self.containers:
            c = self.containers[uuid]
            logger.debug("killing %r:", c)
            for p in c.processes.values():
                try:
                    p.proc.terminate()
                except OSError:
                    logger.error("OS error: could not terminate process.")

    def list(self):
        """List the containers in the system."""
        return [{'uuid': c.uuid, 'pid': list(c.processes.keys())}
                for c in self.containers.values()]


class ContainerRuntime(object):

    """Implements the creation, deleting and spawning of commands for a
    container runtime."""

    def __init__(self):
        pass

    def create(self, container):
        """Create the container defined by the container namedtuple on the
        system."""
        raise NotImplementedError

    def execute(self, container_uuid, args, environ):
        """Execute a command inside a container, using a similar interface to
        popen.

        Returns a tornado.process.Subprocess"""
        raise NotImplementedError

    def delete(self, container_uuid, kill=False):
        """Delete a container, possibly killing all the processes inside."""
        raise NotImplementedError
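

# Concrete runtimes are defined below: NodeOSRuntime drives the
# argo_nodeos_config helper, SingularityRootRuntime and SingularityUserRuntime
# drive singularity (through its OCI interface and directly, respectively),
# and DummyRuntime launches commands without any containment.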


class NodeOSRuntime(ContainerRuntime):

    """Implements the container runtime interface using the nodeos
    subprogram."""

    def __init__(self, path="argo_nodeos_config"):
        """Creates the client for nodeos, with an optional custom
        path/command."""
        self.client = NodeOSClient(argo_nodeos_config=path)

    def create(self, container):
        """Uses the container resource allocation to create a container."""
        self.client.create(container.uuid, container.resources)

    def execute(self, container_uuid, args, environ):
        """Launches a command in the container."""
        return self.client.execute(container_uuid, args, environ)

    def delete(self, container_uuid, kill=False):
        """Delete the container."""
        self.client.delete(container_uuid, kill)


class SingularityRootRuntime(ContainerRuntime):

    """Implements the container runtime interface using the singularity
    subprogram."""

    def __init__(self, path="argo_singularity_config"):
        """Creates the client for singularity, with an optional custom
        path/command."""
        self.client = SingularityClient(argo_singularity_config=path)

    def create(self, container):
        """Uses the container resource allocation to create a container."""
        self.client.oci_start(container.uuid, container.resources)

    def execute(self, container_uuid, args, environ):
        """Launches a command in the container."""
        return self.client.oci_execute(container_uuid, args, environ)

    def delete(self, container_uuid, kill=False):
        """Delete the container."""
        self.client.oci_delete(container_uuid, kill)


class SingularityUserRuntime(ContainerRuntime):

    """Implements the container runtime interface using the singularity
    subprogram."""

    def __init__(self, path="singularity"):
        """Creates the client for singularity, with an optional custom
        path/command."""
        self.client = SingularityClient(singularity_path=path)
        self.containers = dict()

    def create(self, container):
        """Uses the container resource allocation to create a container."""
        self.containers[container.uuid] = container

    def execute(self, container_uuid, args, environ):
        """Launches a command in the container."""
        imageinfo = self.containers[container_uuid].manifest.image
        # not checking image because singularity supports all types
        return self.client.execute(imageinfo.path, args, environ)

    def delete(self, container_uuid, kill=False):
        """Delete the container."""
        del self.containers[container_uuid]


class DummyRuntime(ContainerRuntime):

    """Implements a dummy runtime that doesn't create any container, but still
    launches commands."""

    def __init__(self):
        pass

    def create(self, container):
        pass

    def execute(self, container_uuid, args, environ):
        import tornado.process as process
        return process.Subprocess(args,
                                  stdout=process.Subprocess.STREAM,
                                  stderr=process.Subprocess.STREAM,
                                  env=environ)

    def delete(self, container_uuid, kill=False):
        pass
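

# Illustrative usage sketch (not part of the module): given a resource
# manager `rm` that exposes `hwloc`, `schedule` and `update`, a daemon would
# wire things up roughly like this; the request keys mirror what create()
# reads above, and the values are made up for the example.
#
#   manager = ContainerManager(NodeOSRuntime(), rm)
#   pid, container = manager.create({'manifest': '/path/to/manifest',
#                                    'file': '/bin/sleep',
#                                    'args': ['60'],
#                                    'environ': {},
#                                    'uuid': 'container-0',
#                                    'clientid': 'client-0'})
#   manager.list()
#   manager.delete('container-0')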