containers.py 7.8 KB
Newer Older
1 2 3
from __future__ import print_function

from aci import ImageManifest
4
from collections import namedtuple
5
import logging
6
from subprograms import ChrtClient, NodeOSClient, resources
7
import operator
8

9
# Module-wide logger; every message from this file goes through the 'nrm'
# logger.
logger = logging.getLogger('nrm')

# Immutable record describing one container tracked by the manager:
#   uuid       -- name/identifier of the container
#   manifest   -- the manifest the container was created from
#   resources  -- dict with the allocated 'cpus' and 'mems'
#   power      -- dict of power settings ('profile', 'policy', 'damper',
#                 'slowdown', 'manager')
#   processes  -- dict mapping pid -> process launched in the container
#   clientids  -- dict mapping pid -> id of the client that requested it
#   hwbindings -- dict of hardware bindings ('distrib' when enabled)
# The tuple itself is immutable, but the dict fields are mutated in place.
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
                                     'power', 'processes', 'clientids',
                                     'hwbindings'])
13

14 15 16 17 18 19

class ContainerManager(object):

    """Manages the creation, listing and deletion of containers, using a
    container runtime underneath."""

    def __init__(self, rm,
                 perfwrapper="argo-perf-wrapper",
                 linuxperf="perf",
                 argo_nodeos_config="argo_nodeos_config",
                 pmpi_lib="/usr/lib/libnrm-pmpi.so"):
        """Set up the manager and its helper clients.

        rm -- resource manager; it must provide schedule(), update() and
              an 'hwloc' attribute.
        perfwrapper -- command prepended to the user command when the
                       manifest enables the 'perfwrapper' feature.
        linuxperf -- value exported to processes as the PERF variable.
        argo_nodeos_config -- forwarded to the NodeOSClient constructor.
        pmpi_lib -- value exported as LD_PRELOAD when a power policy is
                    active for the container.
        """
        self.linuxperf = linuxperf
        self.perfwrapper = perfwrapper
        self.nodeos = NodeOSClient(argo_nodeos_config=argo_nodeos_config)
        # container name -> Container tuple
        self.containers = dict()
        # pid -> Container tuple owning that process
        self.pids = dict()
        self.resourcemanager = rm
        self.hwloc = rm.hwloc
        self.chrt = ChrtClient()
        self.pmpi_lib = pmpi_lib

    def _get_container_tuple(self, container_name, manifest):
        """Retrieve a container tuple if the container exists, otherwise use
        the manifest to create a new one.

        Returns (bool, container_tuple), the first field telling if a container
        needs to be created."""

        if container_name in self.containers:
            return (False, self.containers[container_name])

        # ask the resource manager for resources
        ncpus = int(manifest.app.isolators.container.cpus.value)
        nmems = int(manifest.app.isolators.container.mems.value)
        req = resources(ncpus, nmems)
        alloc = self.resourcemanager.schedule(container_name, req)
        logger.info("create: allocation: %r", alloc)

        container_resources = dict()
        container_resources['cpus'], container_resources['mems'] = alloc

        # Container power settings: everything is disabled unless the
        # manifest enables the 'power' feature.
        container_power = {
            'profile': None,
            'policy': None,
            'damper': None,
            'slowdown': None,
            'manager': None,
        }

        if manifest.is_feature_enabled('power'):
            pp = manifest.app.isolators.power
            if pp.profile in ["1", "True"]:
                container_power['profile'] = {'start': dict(), 'end': dict()}
            if pp.policy != "NONE":
                container_power['policy'] = pp.policy
                container_power['damper'] = pp.damper
                container_power['slowdown'] = pp.slowdown

        # Compute hardware bindings, ordered by their 'cpus' attribute.
        hwbindings = dict()
        if manifest.is_feature_enabled('hwbind'):
            hwbindings['distrib'] = sorted(
                self.hwloc.distrib(ncpus, alloc),
                key=operator.attrgetter('cpus'))

        return (True, Container(container_name, manifest,
                                container_resources,
                                container_power, {}, {}, hwbindings))

    def create(self, request):
        """Create a container according to the request and run a command
        in it.

        The request is a mapping with the keys 'manifest', 'file', 'args',
        'environ', 'uuid' and 'clientid'. The container is only created if
        no container with that uuid exists yet; the command is launched in
        either case. Note that request['environ'] is modified in place.

        Returns a (pid, container) tuple for the launched process, or None
        if the manifest could not be loaded."""

        manifestfile = request['manifest']
        command = request['file']
        args = request['args']
        environ = request['environ']
        container_name = request['uuid']
        logger.info("create: manifest file:  %s", manifestfile)
        logger.info("create: command:        %s", command)
        logger.info("create: args:           %r", args)
        logger.info("create: container name: %s", container_name)

        manifest = ImageManifest()
        if not manifest.load(manifestfile):
            logger.error("Manifest is invalid")
            return None

        creation_needed, container = self._get_container_tuple(container_name,
                                                               manifest)
        if creation_needed:
            logger.info("Creating container %s", container_name)
            req = resources(container.resources['cpus'],
                            container.resources['mems'])
            self.nodeos.create(container_name, req)
            self.containers[container_name] = container

        # build context to execute
        # environ['PATH'] = ("/usr/local/sbin:"
        #                   "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
        environ['ARGO_CONTAINER_UUID'] = container_name
        environ['PERF'] = self.linuxperf
        environ['AC_APP_NAME'] = manifest.name
        environ['AC_METADATA_URL'] = "localhost"

        # power profiling uses LD_PRELOAD, we use get to ensure that it
        # doesn't crash if the policy doesn't exist.
        if container.power.get('policy'):
            environ['LD_PRELOAD'] = self.pmpi_lib
            environ['NRM_TRANSMIT'] = '1'
            environ['NRM_DAMPER'] = container.power['damper']

        # build prefix to the entire command based on enabled features
        argv = []
        if manifest.is_feature_enabled('scheduler'):
            sched = manifest.app.isolators.scheduler
            argv = self.chrt.getwrappedcmd(sched)

        # Use hwloc-bind to launch each process in the container by prepending
        # it as an argument to the command line, if enabled in manifest.
        # The hardware binding computed using hwloc-distrib is used here
        # --single
        if container.hwbindings:
            distrib = container.hwbindings['distrib']
            # round robin over the cpu bindings available
            bind_index = len(container.processes) % len(distrib)
            argv.append('hwloc-bind')
            # argv.append('--single')
            cpumask = distrib[bind_index].cpus[0]
            memmask = distrib[bind_index].mems[0]
            argv.append("core:{}".format(cpumask))
            argv.append('--membind')
            argv.append("numa:{}".format(memmask))

        # It would've been better if argo-perf-wrapper wrapped around
        # argo-nodeos-config and not the final command -- that way it would
        # be running outside of the container.  However, because
        # argo-nodeos-config is suid root, perf can't monitor it.
        if manifest.is_feature_enabled('perfwrapper'):
            argv.append(self.perfwrapper)

        argv.append(command)
        argv.extend(args)

        # run my command
        process = self.nodeos.execute(container_name, argv, environ)

        # register the process
        container.processes[process.pid] = process
        container.clientids[process.pid] = request['clientid']
        self.pids[process.pid] = container
        logger.info("Created process %s in container %s", process.pid,
                    container_name)
        return process.pid, container

    def delete(self, uuid):
        """Delete a container and kill all related processes.

        Raises KeyError if no container with this uuid is registered."""
        self.nodeos.delete(uuid, kill=True)
        self.resourcemanager.update(uuid)
        c = self.containers.pop(uuid)
        # Explicit loop on purpose: map() is lazy on Python 3, so using it
        # only for its side effect would leave stale entries in self.pids.
        for process in c.processes.values():
            self.pids.pop(process.pid, None)

    def kill(self, uuid):
        """Kill all the processes of a container.

        Unknown uuids are silently ignored."""
        if uuid in self.containers:
            c = self.containers[uuid]
            logger.debug("killing %r:", c)
            for p in c.processes.values():
                try:
                    p.proc.terminate()
                except OSError:
                    # Keep going so the remaining processes still get
                    # terminated; report through the module logger.
                    logger.error("OS error: could not terminate process.")

    def list(self):
        """List the containers in the system.

        Returns a list with one dict per container, holding its 'uuid' and
        the list of its process ids under 'pid'."""
        # list() gives a real list on Python 3 too, where keys() is a view.
        return [{'uuid': c.uuid, 'pid': list(c.processes)}
                for c in self.containers.values()]