daemon.py 14 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

11 12
from __future__ import print_function

13
from applications import ApplicationManager
14
from containers import ContainerManager, NodeOSRuntime, SingularityUserRuntime
15
from controller import Controller, PowerActuator
16
from powerpolicy import PowerPolicyManager
17
from functools import partial
18
import logging
19
import os
20
from resources import ResourceManager
21
from sensor import SensorManager
22
import signal
23 24 25
from zmq.eventloop import ioloop
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
        DownstreamEventServer
26

27 28
# Module-wide logger; runner() sets its level and optional file handler.
logger = logging.getLogger('nrm')

29 30

class Daemon(object):

    """Node resource manager daemon.

    Connects the downstream (application) event socket and the upstream
    (client) RPC/PUB sockets to the container, application, sensor and
    control managers. All sockets and managers are created in main(),
    which then runs the ioloop until do_shutdown() stops it.
    """

    def __init__(self, config):
        """Remember the configuration and the initial power target.

        config -- object whose attributes (hwloc, container_runtime,
                  argo_nodeos_config, singularity, argo_perf_wrapper,
                  perf, pmpi_lib, msr_safe) are read by main().
        """
        self.target = 100.0
        self.config = config

    def do_downstream_receive(self, event, client):
        """Handle one event received on the downstream event socket.

        Dispatches on event.tag ('start', 'progress', 'performance',
        'phasecontext', 'exit'); any other tag is logged as an error.
        The client argument is part of the callback signature and is not
        used here.
        """
        logger.info("receiving downstream message: %r", event)
        tag = event.tag
        if tag == 'start':
            cid = event.container_uuid
            containers = self.container_manager.containers
            if cid not in containers:
                # Report an application announcing itself for a container
                # we do not manage instead of raising KeyError out of the
                # socket callback.
                logger.error("start event for unknown container: %r", cid)
                return
            self.application_manager.register(event, containers[cid])
        elif tag == 'progress':
            applications = self.application_manager.applications
            if event.application_uuid in applications:
                app = applications[event.application_uuid]
                app.update_performance(event)
                # self.upstream_pub_server.send(event) TODO try this.
                self.upstream_pub_server.send(
                        tag='progress',
                        payload=event.payload,
                        application_uuid=event.application_uuid)
        elif tag == 'performance':
            applications = self.application_manager.applications
            if event.application_uuid in applications:
                app = applications[event.application_uuid]
                app.update_performance(event)
            # Published whether or not the application is registered.
            self.upstream_pub_server.send(
                        tag='performance',
                        payload=event.payload,
                        container_uuid=event.container_uuid)
        elif tag == 'phasecontext':
            uuid = event.application_uuid
            applications = self.application_manager.applications
            if uuid in applications:
                app = applications[uuid]
                containers = self.container_manager.containers
                # Test membership, not mere non-emptiness: the container of
                # this application may be gone while others still exist.
                if containers and app.container_uuid in containers:
                    c = containers[app.container_uuid]
                    if c.power['policy']:
                        app.update_phase_context(event)
                        # Run container policy
                        self.controller.run_policy_container(c, app)
        elif tag == 'exit':
            uuid = event.application_uuid
            if uuid in self.application_manager.applications:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown msg: %r", event)

    def do_upstream_receive(self, req, client):
        """Handle one request received on the upstream RPC socket.

        Dispatches on req.tag: 'setPower' updates the power target and
        echoes it back, 'run' starts a command in a container, 'kill'
        kills a container, 'list' replies with the container list. Any
        other tag is logged as an error and gets no reply.
        """
        tag = req.tag
        if tag == 'setPower':
            self.target = float(req.limit)
            logger.info("new target measure: %g", self.target)
            self.upstream_rpc_server.send(
                    client,
                    tag='getPower',
                    limit=str(self.target))
        elif tag == 'run':
            self._run_in_container(req, client)
        elif tag == 'kill':
            logger.info("asked to kill container: %r", req)
            # no update here, as it will trigger child exit
            self.container_manager.kill(req.container_uuid)
        elif tag == 'list':
            logger.info("asked for container list: %r", req)
            self.upstream_rpc_server.send(
                    client,
                    tag="list",
                    containers=self.container_manager.list())
        else:
            logger.error("invalid command: %r", req.tag)

    def _run_in_container(self, req, client):
        """Serve a 'run' request: start the command and wire up its output."""
        logger.info("asked to run a command in a container: %r", req)
        params = {'manifest': req.manifest,
                  'file': req.path,
                  'args': req.args,
                  'uuid': req.container_uuid,
                  'environ': req.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        container_uuid = container.uuid

        # Exactly one process means the container was just created: set up
        # its power policy/profile and announce the container upstream.
        if len(container.processes) == 1:
            power = container.power
            if power['policy']:
                power['manager'] = PowerPolicyManager(
                        container.resources.cpus,
                        power['policy'],
                        float(power['damper']),
                        float(power['slowdown']))
            if power['profile']:
                profile = power['profile']
                profile['start'] = self.machine_info['energy']['energy']
                profile['start']['time'] = self.machine_info['time']
            self.upstream_pub_server.send(
                    tag='start',
                    container_uuid=container_uuid,
                    errno=0 if container else -1,
                    power=power['policy'] or str(None))

        # now deal with the process itself
        self.upstream_rpc_server.send(
                client,
                tag='start',
                pid=pid,
                container_uuid=container_uuid)

        # setup io callbacks: each callback is used both for streamed
        # chunks and for the final close notification
        outcb = partial(self.do_children_io, client, container_uuid,
                        'stdout')
        errcb = partial(self.do_children_io, client, container_uuid,
                        'stderr')
        process = container.processes[pid]
        process.stdout.read_until_close(outcb, outcb)
        process.stderr.read_until_close(errcb, errcb)

    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis: client,
        container_uuid and io ('stdout' or 'stderr') are bound with
        functools.partial, data is supplied by the stream. Empty data is
        forwarded as the string 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        self.upstream_rpc_server.send(
                client,
                tag=io,
                container_uuid=container_uuid,
                payload=data or 'eof')

    def do_sensor(self):
        """Refresh self.machine_info and publish the total power upstream.

        If the sensor data does not have the expected
        ['energy']['power']['total'] layout, an error is logged and
        nothing is published.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (TypeError, KeyError):
            # TypeError: a level is not subscriptable (e.g. None);
            # KeyError: a level is a dict without the expected key.
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
        else:
            self.upstream_pub_server.send(
                    tag="power",
                    total=total_power,
                    limit=self.target)

    def do_control(self):
        """Ask the controller for a plan and apply it if there is one."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # NOTE: the periodic per-container policy run is disabled here;
        # policies are run from 'phasecontext' downstream events instead.
        # if self.container_manager.containers:
            # self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the ioloop.

        SIGINT schedules do_shutdown, SIGCHLD schedules do_children; any
        other signal is logged as an error.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap every terminated child and report the exits we care about."""
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. there is no child left to wait for
                break
            if pid == 0 and status == 0:
                # nothing (more) to reap right now
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._process_exited(pid, status)

    def _process_exited(self, pid, status):
        """Report the exit of a managed process, and of its container if
        that was the last process in it."""
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        self.upstream_rpc_server.send(
                clientid,
                tag="exit",
                status=str(status),
                container_uuid=container.uuid)
        # Remove the pid of process that is finished
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finished.",
                    pid, container.uuid)

        if container.processes:
            return

        # this was the last process in the container: deal with
        # container exit
        diff = {}
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            diff = self._profile_diff(power)
            logger.info("Container %r profile data: %r",
                        container.uuid, diff)
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.send(
                tag="exit",
                container_uuid=container.uuid,
                profile_data=diff)

    def _profile_diff(self, power):
        """Return the profile data of a container that is exiting.

        power -- the container's power dict; power['profile']['start']
                 holds the reading taken when the container started.

        Takes a fresh sensor reading (also stored in self.machine_info)
        and returns sensor_manager.calc_difference(start, end), extended
        with 'temp', 'policy', 'nodename' and, when a policy is set,
        'damper' and 'slowdown'.
        """
        self.machine_info = self.sensor_manager.do_update()
        end = self.machine_info['energy']['energy']
        end['time'] = self.machine_info['time']
        start = power['profile']['start']
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(start, end)
        # Get final package temperature; built as a real list because
        # map() is a lazy iterator on Python 3.
        temp = self.machine_info['temperature']
        diff['temp'] = [temp[k]['pkg'] for k in temp]
        diff['policy'] = power['policy']
        if power['policy']:
            diff['damper'] = float(power['damper'])/1000000000
            diff['slowdown'] = power['slowdown']
        diff['nodename'] = self.sensor_manager.nodename
        return diff

    def do_shutdown(self):
        """Stop the sensors, then stop the ioloop so main() returns."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Create sockets and managers, install callbacks, run the ioloop.

        Blocks until the ioloop is stopped (see do_shutdown).
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        downstream_event_param = "ipc:///tmp/nrm-downstream-event"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        self.downstream_event = DownstreamEventServer(downstream_event_param)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream event socket bound to: %s",
                    downstream_event_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_event.setup_recv_callback(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        container_runtime = None
        if self.config.container_runtime == 'nodeos':
            container_runtime = \
                NodeOSRuntime(path=self.config.argo_nodeos_config)
        elif self.config.container_runtime == 'singularity':
            container_runtime = \
                SingularityUserRuntime(self.config.singularity)
        assert container_runtime is not None, \
            "unsupported container runtime: %r" % (
                self.config.container_runtime,)
        self.container_manager = ContainerManager(
                container_runtime,
                self.resource_manager,
                perfwrapper=self.config.argo_perf_wrapper,
                linuxperf=self.config.perf,
                pmpi_lib=self.config.pmpi_lib,
                downstream_event_uri=downstream_event_param,
        )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager(use_msr_safe=self.config.msr_safe)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([pa])

        self.sensor_manager.start()
        # first reading, so machine_info exists before any callback runs
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


322
def runner(config):
    """Configure the 'nrm' logger from config, then run the daemon.

    config.verbose switches the logger to DEBUG; a non-empty
    config.nrm_log adds a file handler writing to that path. The call
    returns when Daemon.main() returns.
    """
    if config.verbose:
        logger.setLevel(logging.DEBUG)

    log_path = config.nrm_log
    if log_path:
        print("Logging to %s" % log_path)
        file_handler = logging.FileHandler(log_path)
        logger.addHandler(file_handler)

    Daemon(config).main()