# containers.py
from __future__ import print_function

from aci import ImageManifest
4
from collections import namedtuple
5
import logging
6
from subprograms import ChrtClient, NodeOSClient, resources
7
import operator
8

9
# Logger registered under the 'nrm' name; used by everything in this module.
logger = logging.getLogger('nrm')

# Record describing one container, as built by ContainerManager:
#   uuid       -- name identifying the container
#   manifest   -- manifest object the container was created from
#   resources  -- allocation returned by the resource manager
#   power      -- dict of power settings ('profile', 'policy', 'damper',
#                 'slowdown', 'manager')
#   processes  -- dict mapping pid -> process launched in the container
#   clientids  -- dict mapping pid -> id of the client that requested it
#   hwbindings -- dict of hardware bindings (empty when binding is disabled)
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
                                     'power', 'processes', 'clientids',
                                     'hwbindings'])

class ContainerManager(object):

    """Manages the creation, listing and deletion of containers, using a
    container runtime underneath.

    State kept by the manager:
      containers -- dict mapping container uuid -> Container tuple
      pids       -- dict mapping process pid -> Container tuple owning it
    """

    def __init__(self, rm,
                 perfwrapper="argo-perf-wrapper",
                 linuxperf="perf",
                 argo_nodeos_config="argo_nodeos_config",
                 pmpi_lib="/usr/lib/libnrm-pmpi.so"):
        """Set up the manager.

        rm is the resource manager; it must provide `schedule`, `update`
        and an `hwloc` attribute. The remaining arguments are the names or
        paths of the external tools and of the library exported through
        LD_PRELOAD when a power policy is active.
        """
        self.linuxperf = linuxperf
        self.perfwrapper = perfwrapper
        self.nodeos = NodeOSClient(argo_nodeos_config=argo_nodeos_config)
        self.containers = dict()
        self.pids = dict()
        self.resourcemanager = rm
        self.hwloc = rm.hwloc
        self.chrt = ChrtClient()
        self.pmpi_lib = pmpi_lib

    def _get_container_tuple(self, container_name, manifest):
        """Retrieve a container tuple if the container exists, otherwise use
        the manifest to create a new one.

        Returns (bool, container_tuple), the first field telling if a container
        needs to be created. When a new tuple is built, resources are
        requested from the resource manager as a side effect; the tuple is
        NOT registered in self.containers here, that is left to the caller.
        """
        if container_name in self.containers:
            return (False, self.containers[container_name])

        # ask the resource manager for resources
        isolators = manifest.app.isolators
        ncpus = int(isolators.container.cpus.value)
        nmems = int(isolators.container.mems.value)
        req = resources(ncpus, nmems)
        allocated = self.resourcemanager.schedule(container_name, req)
        logger.info("create: allocation: %r", allocated)

        # Container power settings: everything is disabled (None) unless the
        # manifest enables the 'power' feature.
        container_power = {
            'profile': None,
            'policy': None,
            'damper': None,
            'slowdown': None,
            'manager': None,
        }
        if manifest.is_feature_enabled('power'):
            pp = isolators.power
            if pp.profile in ["1", "True"]:
                container_power['profile'] = {'start': dict(), 'end': dict()}
            if pp.policy != "NONE":
                container_power['policy'] = pp.policy
                container_power['damper'] = pp.damper
                container_power['slowdown'] = pp.slowdown

        # Compute hardware bindings, ordered by their 'cpus' attribute
        hwbindings = dict()
        if manifest.is_feature_enabled('hwbind'):
            hwbindings['distrib'] = sorted(
                self.hwloc.distrib(ncpus, allocated),
                key=operator.attrgetter('cpus'))

        return (True, Container(container_name, manifest, allocated,
                                container_power, {}, {}, hwbindings))

    def create(self, request):
        """Create a container if needed and launch a command inside it.

        request is a dict providing the keys 'manifest', 'file', 'args',
        'environ', 'uuid' and 'clientid'. The 'environ' dict is updated in
        place with the variables exported to the launched command.

        Returns a (pid, container) tuple on success, or None if the manifest
        file cannot be loaded. Raises KeyError if request lacks a key.
        """
        manifestfile = request['manifest']
        command = request['file']
        args = request['args']
        environ = request['environ']
        container_name = request['uuid']
        # Read up front so that a malformed request fails before anything
        # has been created or launched.
        clientid = request['clientid']
        logger.info("create: manifest file:  %s", manifestfile)
        logger.info("create: command:        %s", command)
        logger.info("create: args:           %r", args)
        logger.info("create: container name: %s", container_name)

        manifest = ImageManifest()
        if not manifest.load(manifestfile):
            logger.error("Manifest is invalid")
            return None

        creation_needed, container = self._get_container_tuple(container_name,
                                                               manifest)
        if creation_needed:
            logger.info("Creating container %s", container_name)
            self.nodeos.create(container_name, container.resources)
            self.containers[container_name] = container

        # build context to execute
        environ['ARGO_CONTAINER_UUID'] = container_name
        environ['PERF'] = self.linuxperf
        environ['AC_APP_NAME'] = manifest.name
        environ['AC_METADATA_URL'] = "localhost"

        # power profiling uses LD_PRELOAD, we use get to ensure that it
        # doesn't crash if the policy doesn't exist.
        if container.power.get('policy'):
            environ['LD_PRELOAD'] = self.pmpi_lib
            environ['NRM_TRANSMIT'] = '1'
            environ['NRM_DAMPER'] = container.power['damper']

        # build prefix to the entire command based on enabled features
        argv = []
        if manifest.is_feature_enabled('scheduler'):
            sched = manifest.app.isolators.scheduler
            # copy, so that extending argv never alters the client's list
            argv = list(self.chrt.getwrappedcmd(sched))

        # Use hwloc-bind to launch each process in the container by prepending
        # it as an argument to the command line, if enabled in manifest.
        # The hardware binding computed using hwloc-distrib is used here.
        distrib = container.hwbindings.get('distrib')
        if distrib:
            # round robin over the cpu bindings available; an empty
            # distribution is skipped instead of dividing by zero
            binding = distrib[len(container.processes) % len(distrib)]
            cpumask = binding.cpus[0]
            memmask = binding.mems[0]
            argv.append('hwloc-bind')
            argv.append("core:{}".format(cpumask))
            argv.append('--membind')
            argv.append("numa:{}".format(memmask))

        # It would've been better if argo-perf-wrapper wrapped around
        # argo-nodeos-config and not the final command -- that way it would
        # be running outside of the container.  However, because
        # argo-nodeos-config is suid root, perf can't monitor it.
        if manifest.is_feature_enabled('perfwrapper'):
            argv.append(self.perfwrapper)

        argv.append(command)
        argv.extend(args)

        # run my command
        process = self.nodeos.execute(container_name, argv, environ)

        # register the process
        container.processes[process.pid] = process
        container.clientids[process.pid] = clientid
        self.pids[process.pid] = container
        logger.info("Created process %s in container %s", process.pid,
                    container_name)
        return process.pid, container

    def delete(self, uuid):
        """Delete a container and kill all related processes.

        The container is removed from the runtime and its resources are
        released before the manager's own bookkeeping is updated. Raises
        KeyError if uuid is not a container known to this manager.
        """
        self.nodeos.delete(uuid, kill=True)
        self.resourcemanager.update(uuid)
        c = self.containers.pop(uuid)
        # Explicit loop: map() is lazy on Python 3 and would never run the
        # removal if it were only used for its side effects.
        for process in c.processes.values():
            self.pids.pop(process.pid, None)

    def kill(self, uuid):
        """Kill all the processes of a container.

        Unknown uuids are ignored. A process that cannot be terminated is
        logged and does not prevent the remaining ones from being handled.
        """
        if uuid not in self.containers:
            return
        c = self.containers[uuid]
        logger.debug("killing %r:", c)
        for p in c.processes.values():
            try:
                p.proc.terminate()
            except OSError:
                logger.error("OS error: could not terminate process.")

    def list(self):
        """List the containers in the system.

        Returns a list with one dict per container, holding its 'uuid' and,
        under 'pid', a list of the pids of its processes.
        """
        # list(...) so that a real list (not a dict view) is returned on
        # Python 3 as well as on Python 2.
        return [{'uuid': c.uuid, 'pid': list(c.processes)}
                for c in self.containers.values()]