# containers.py
from __future__ import print_function

from aci import ImageManifest
from collections import namedtuple
import logging
from subprograms import ChrtClient, NodeOSClient, resources
import operator

logger = logging.getLogger('nrm')

# Record describing one container handled by the container manager:
#   uuid       -- container name, as given in the create request
#   manifest   -- the ImageManifest the container was created from
#   resources  -- allocation returned by the resource manager's schedule()
#   power      -- dict with 'profile', 'policy', 'damper', 'slowdown' and
#                 'manager' entries (None when not configured)
#   processes  -- dict mapping pid -> process object launched in it
#   clientids  -- dict mapping pid -> client id that requested the process
#   hwbindings -- dict, holding 'distrib' (sorted hwloc-distrib bindings)
#                 when the 'hwbind' feature is enabled, empty otherwise
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
                                     'power', 'processes', 'clientids',
                                     'hwbindings'])


class ContainerManager(object):

    """Manages the creation, listing and deletion of containers, using a
    container runtime underneath.

    Bookkeeping:
      containers -- dict mapping container name (uuid) to its Container tuple
      pids       -- dict mapping a launched process pid to its Container
    """

    def __init__(self, container_runtime, rm,
                 perfwrapper="argo-perf-wrapper",
                 linuxperf="perf",
                 pmpi_lib="/usr/lib/libnrm-pmpi.so"):
        """Set up the manager.

        container_runtime -- object providing create/execute/delete, used to
                             run the containers.
        rm                -- resource manager; its ``schedule`` and ``update``
                             methods and its ``hwloc`` attribute are used.
        perfwrapper       -- command prepended to the launched command when
                             the 'perfwrapper' feature is enabled.
        linuxperf         -- value exported as PERF in the process environment.
        pmpi_lib          -- library set in LD_PRELOAD when a power policy is
                             configured.
        """
        self.linuxperf = linuxperf
        self.perfwrapper = perfwrapper
        self.runtime = container_runtime
        self.containers = dict()
        self.pids = dict()
        self.resourcemanager = rm
        self.hwloc = rm.hwloc
        self.chrt = ChrtClient()
        self.pmpi_lib = pmpi_lib

    def _get_container_tuple(self, container_name, manifest):
        """Retrieve a container tuple if the container exists, otherwise use
        the manifest to create a new one.

        Building a new tuple asks the resource manager to schedule the cpus
        and mems requested by the manifest.

        Returns (bool, container_tuple), the first field telling if a container
        needs to be created (the caller creates and registers it)."""
        if container_name in self.containers:
            return (False, self.containers[container_name])

        # ask the resource manager for resources
        ncpus = int(manifest.app.isolators.container.cpus.value)
        nmems = int(manifest.app.isolators.container.mems.value)
        req = resources(ncpus, nmems)
        allocated = self.resourcemanager.schedule(container_name, req)
        logger.info("create: allocation: %r", allocated)

        # Container power settings; every entry stays None unless the power
        # feature fills it in below ('manager' is never set here).
        container_power = {
            'profile': None,
            'policy': None,
            'damper': None,
            'slowdown': None,
            'manager': None,
        }
        if manifest.is_feature_enabled('power'):
            pp = manifest.app.isolators.power
            if pp.profile in ["1", "True"]:
                container_power['profile'] = {'start': dict(), 'end': dict()}
            if pp.policy != "NONE":
                container_power['policy'] = pp.policy
                container_power['damper'] = pp.damper
                container_power['slowdown'] = pp.slowdown

        # Compute hardware bindings, sorted by their cpus attribute.
        hwbindings = dict()
        if manifest.is_feature_enabled('hwbind'):
            hwbindings['distrib'] = sorted(
                self.hwloc.distrib(ncpus, allocated),
                key=operator.attrgetter('cpus'))

        return (True, Container(container_name, manifest, allocated,
                                container_power, {}, {}, hwbindings))

    def create(self, request):
        """Create the container if needed and launch a command inside it.

        ``request`` is a dict read for the keys 'manifest' (manifest file),
        'file' (command), 'args', 'environ', 'uuid' (container name) and
        'clientid'. The caller's 'environ' dict is not modified.

        Returns ``(pid, container)`` for the launched process, or ``None`` if
        the manifest could not be loaded."""
        manifestfile = request['manifest']
        command = request['file']
        args = request['args']
        # Work on a copy so the caller's environment dict is left untouched.
        environ = dict(request['environ'])
        container_name = request['uuid']
        logger.info("create: manifest file:  %s", manifestfile)
        logger.info("create: command:        %s", command)
        logger.info("create: args:           %r", args)
        logger.info("create: container name: %s", container_name)

        manifest = ImageManifest()
        if not manifest.load(manifestfile):
            logger.error("Manifest is invalid")
            return None

        creation_needed, container = self._get_container_tuple(container_name,
                                                               manifest)
        if creation_needed:
            logger.info("Creating container %s", container_name)
            self.runtime.create(container)
            self.containers[container_name] = container

        # build context to execute
        environ['ARGO_CONTAINER_UUID'] = container_name
        environ['PERF'] = self.linuxperf
        environ['AC_APP_NAME'] = manifest.name
        environ['AC_METADATA_URL'] = "localhost"

        # power profiling uses LD_PRELOAD; get() avoids a KeyError if the
        # policy entry doesn't exist.
        if container.power.get('policy'):
            environ['LD_PRELOAD'] = self.pmpi_lib
            environ['NRM_TRANSMIT'] = '1'
            # Environment values must be strings for the spawned process.
            environ['NRM_DAMPER'] = str(container.power['damper'])

        # build prefix to the entire command based on enabled features
        argv = []
        if manifest.is_feature_enabled('scheduler'):
            sched = manifest.app.isolators.scheduler
            argv = self.chrt.getwrappedcmd(sched)

        # Use hwloc-bind to launch each process in the container by prepending
        # it to the command line, if enabled in the manifest. The bindings
        # computed with hwloc-distrib are used round robin (--single is not
        # passed). An empty binding list skips binding instead of dividing
        # by zero.
        distrib = container.hwbindings.get('distrib')
        if distrib:
            binding = distrib[len(container.processes) % len(distrib)]
            argv.append('hwloc-bind')
            cpumask = binding.cpus[0]
            memmask = binding.mems[0]
            logger.info('create: binding to: %s, %s', cpumask, memmask)
            argv.append("core:{}".format(cpumask))
            argv.append('--membind')
            argv.append("numa:{}".format(memmask))

        # It would've been better if argo-perf-wrapper wrapped around
        # argo-nodeos-config and not the final command -- that way it would
        # be running outside of the container.  However, because
        # argo-nodeos-config is suid root, perf can't monitor it.
        if manifest.is_feature_enabled('perfwrapper'):
            argv.append(self.perfwrapper)

        argv.append(command)
        argv.extend(args)

        # run my command
        process = self.runtime.execute(container_name, argv, environ)

        # register the process
        container.processes[process.pid] = process
        container.clientids[process.pid] = request['clientid']
        self.pids[process.pid] = container
        logger.info("Created process %s in container %s", process.pid,
                    container_name)
        return process.pid, container

    def delete(self, uuid):
        """Delete a container and kill all related processes.

        The runtime and resource manager are notified first; raises KeyError
        afterwards if ``uuid`` is not a known container."""
        self.runtime.delete(uuid, kill=True)
        self.resourcemanager.update(uuid)
        c = self.containers.pop(uuid)
        # Explicit loop: map() is lazy on Python 3, so a map() used only for
        # its side effect would never remove the pids.
        for proc in c.processes.values():
            self.pids.pop(proc.pid, None)

    def kill(self, uuid):
        """Kill all the processes of a container.

        Unknown uuids are ignored; an OSError from terminating a process is
        logged and the remaining processes are still handled."""
        c = self.containers.get(uuid)
        if c is None:
            return
        logger.debug("killing %r:", c)
        for p in c.processes.values():
            try:
                p.proc.terminate()
            except OSError:
                logger.error("OS error: could not terminate process.")

    def list(self):
        """List the containers in the system.

        Returns a list of dicts with the container 'uuid' and a list of its
        process pids under 'pid'."""
        return [{'uuid': c.uuid, 'pid': list(c.processes)}
                for c in self.containers.values()]


class ContainerRuntime(object):

    """Interface of a container runtime: creating and deleting containers
    and spawning commands inside them.

    Each operation here raises NotImplementedError; concrete runtimes
    override them."""

    def __init__(self):
        """The base interface keeps no state."""

    def create(self, container):
        """Materialize on the system the container described by the
        ``container`` namedtuple."""
        raise NotImplementedError

    def execute(self, container_uuid, args, environ):
        """Run a command inside a container, with an interface resembling
        popen.

        Returns a tornado.process.Subprocess"""
        raise NotImplementedError

    def delete(self, container_uuid, kill=False):
        """Remove a container; when ``kill`` is true its processes are meant
        to be killed as well."""
        raise NotImplementedError


class NodeOSRuntime(ContainerRuntime):

    """Container runtime backed by the nodeos subprogram, driven through a
    NodeOSClient."""

    def __init__(self, path="argo_nodeos_config"):
        """Build the nodeos client; ``path`` selects the command it uses."""
        client = NodeOSClient(argo_nodeos_config=path)
        self.client = client

    def create(self, container):
        """Create the container from its uuid and resource allocation."""
        name, allocation = container.uuid, container.resources
        self.client.create(name, allocation)

    def execute(self, container_uuid, args, environ):
        """Launch ``args`` inside the container; returns the client's
        result."""
        launched = self.client.execute(container_uuid, args, environ)
        return launched

    def delete(self, container_uuid, kill=False):
        """Delete the container, forwarding ``kill`` to the client."""
        self.client.delete(container_uuid, kill)


class DummyRuntime(ContainerRuntime):

    """Runtime that sets up no real container: create and delete do nothing,
    and commands are spawned directly as local subprocesses."""

    def __init__(self):
        """No state is needed."""

    def create(self, container):
        """No-op: there is no container to set up."""

    def execute(self, container_uuid, args, environ):
        """Spawn ``args`` with tornado using ``environ`` as the environment,
        with stdin, stdout and stderr all set to Subprocess.STREAM.
        ``container_uuid`` is not used."""
        from tornado.process import Subprocess
        pipe = Subprocess.STREAM
        return Subprocess(args, stdin=pipe, stdout=pipe, stderr=pipe,
                          env=environ)

    def delete(self, container_uuid, kill=False):
        """No-op: nothing to delete, ``kill`` is ignored."""