###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

from __future__ import print_function

import logging
import os
import signal
from functools import partial

from zmq.eventloop import ioloop

from applications import ApplicationManager
from containers import ContainerManager, NodeOSRuntime
from controller import Controller, PowerActuator
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
        DownstreamEventServer
from powerpolicy import PowerPolicyManager
from resources import ResourceManager
from sensor import SensorManager


# Message constructors, indexed by message type, for upstream RPC replies
# (e.g. RPC_MSG['getpower']) and upstream PUB broadcasts (e.g.
# PUB_MSG['power']); each is called with the message fields as kwargs.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']

logger = logging.getLogger('nrm')


class Daemon(object):
    """NRM daemon: glue between applications, upstream clients and managers.

    Application events arrive on a downstream IPC socket, upstream requests
    on an RPC socket, and state changes are broadcast on a PUB socket.
    Sensor reads and control steps run as periodic IOLoop callbacks, and
    SIGINT/SIGCHLD handling is deferred onto the IOLoop (see do_signal).
    """

    def __init__(self, config):
        # Power target; set by upstream 'setpower' requests and published as
        # 'limit' alongside every power reading.
        self.target = 100.0
        self.config = config

    def do_downstream_receive(self, msg, client):
        """Handle one event message sent by an application.

        Known ``msg.type`` values are 'application_start', 'progress',
        'performance', 'phase_context' and 'application_exit'; anything else
        is logged as an error and ignored. ``client`` is unused here.
        """
        logger.info("receiving downstream message: %r", msg)
        if msg.type == 'application_start':
            cid = msg.container_uuid
            containers = self.container_manager.containers
            if cid not in containers:
                # Without this check the lookup below raises KeyError from
                # inside the socket callback.
                logger.error("application_start for unknown container: %r",
                             cid)
                return
            self.application_manager.register(msg, containers[cid])
        elif msg.type == 'progress':
            apps = self.application_manager.applications
            if msg.application_uuid in apps:
                apps[msg.application_uuid].update_performance(msg)
                pub = {'api': 'up_pub',
                       'type': 'progress',
                       'payload': msg.payload,
                       'application_uuid': msg.application_uuid}
                self.upstream_pub_server.sendmsg(PUB_MSG['progress'](**pub))
        elif msg.type == 'performance':
            apps = self.application_manager.applications
            if msg.application_uuid in apps:
                apps[msg.application_uuid].update_performance(msg)
            # published even when the application is not registered
            pub = {'api': 'up_pub',
                   'type': 'performance',
                   'payload': msg.payload,
                   'container_uuid': msg.container_uuid}
            self.upstream_pub_server.sendmsg(PUB_MSG['performance'](**pub))
        elif msg.type == 'phase_context':
            apps = self.application_manager.applications
            if msg.application_uuid in apps:
                app = apps[msg.application_uuid]
                containers = self.container_manager.containers
                # skip (instead of KeyError) if the container is already gone
                if app.container_uuid in containers:
                    c = containers[app.container_uuid]
                    # only containers running a power policy react to phases
                    if c.power['policy']:
                        app.update_phase_context(msg)
                        self.controller.run_policy_container(c, app)
        elif msg.type == 'application_exit':
            if msg.application_uuid in self.application_manager.applications:
                self.application_manager.delete(msg.application_uuid)
        else:
            logger.error("unknown msg: %r", msg)

    def do_upstream_receive(self, msg, client):
        """Handle one RPC request from an upstream client.

        Known ``msg.type`` values are 'setpower', 'run', 'kill' and 'list'.
        Replies (for requests that have one) go back to ``client`` over the
        upstream RPC socket.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            logger.info("asked to run a command in a container: %r", msg)
            params = {'manifest': msg.manifest,
                      'file': msg.path,
                      'args': msg.args,
                      'uuid': msg.container_uuid,
                      'environ': msg.environ,
                      'clientid': client,
                      }
            pid, container = self.container_manager.create(params)
            container_uuid = container.uuid

            if len(container.processes) == 1:
                # First process of this container: attach its power policy
                # manager, take the profiling start snapshot, announce it.
                power = container.power
                if power['policy']:
                    power['manager'] = PowerPolicyManager(
                            container.resources.cpus,
                            power['policy'],
                            float(power['damper']),
                            float(power['slowdown']))
                if power['profile']:
                    # Copy the counters so that tagging the snapshot with a
                    # timestamp does not write into self.machine_info.
                    start = dict(self.machine_info['energy']['energy'])
                    start['time'] = self.machine_info['time']
                    power['profile']['start'] = start

                update = {'api': 'up_pub',
                          'type': 'container_start',
                          'container_uuid': container_uuid,
                          'errno': 0 if container else -1,
                          'power': power['policy'] or str(None),
                          }
                self.upstream_pub_server.sendmsg(
                        PUB_MSG['container_start'](**update))

            # now deal with the process itself
            update = {'api': 'up_rpc_rep',
                      'type': 'process_start',
                      'container_uuid': container_uuid,
                      'pid': pid,
                      }
            self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_start'](**update), client)
            # Relay the child's stdout/stderr to the requesting client.
            outcb = partial(self.do_children_io, client, container_uuid,
                            'stdout')
            errcb = partial(self.do_children_io, client, container_uuid,
                            'stderr')
            container.processes[pid].stdout.read_until_close(outcb, outcb)
            container.processes[pid].stderr.read_until_close(errcb, errcb)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            # No reply here: the resulting child exit is reported by
            # do_children.
            self.container_manager.kill(msg.container_uuid)
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially applied per child (see the 'run' request):
        ``io`` is 'stdout' or 'stderr' and doubles as the reply type. Empty
        ``data`` is sent as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh self.machine_info and publish the total power upstream.

        The 'power' PUB message carries the measured total and the current
        target as 'limit'. A malformed sensor payload is logged, not raised.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (TypeError, KeyError):
            # TypeError: a level is None / not subscriptable;
            # KeyError: a level lacks the expected key.
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
        else:
            msg = {'api': 'up_pub',
                   'type': 'power',
                   'total': total_power,
                   'limit': self.target
                   }
            self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
            logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control step: plan against the target, then actuate."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # NOTE: the global controller.run_policy(containers) call is left
        # disabled here; container policies run on 'phase_context' events
        # (see do_downstream_receive).

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT / SIGCHLD handling to the IOLoop.

        Only schedules do_shutdown / do_children with
        add_callback_from_signal; any other signal is logged as an error.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap children whose state changed and report exits upstream.

        Polls ``os.wait3(os.WNOHANG)`` until it reports no pending change or
        fails (e.g. no children left). Only pids tracked by the container
        manager are handled; others are logged at debug level.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                break
            if pid == 0 and status == 0:
                # children exist but none has changed state
                break

            logger.info("child update %d: %r", pid, status)
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only exits (normal or by signal) are acted upon
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._handle_process_exit(pid, status)

    def _handle_process_exit(self, pid, status):
        """Report a finished process; tear down its container if now empty."""
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        msg = {'api': 'up_rpc_rep',
               'type': 'process_exit',
               'status': str(status),
               'container_uuid': container.uuid,
               }
        self.upstream_rpc_server.sendmsg(RPC_MSG['process_exit'](**msg),
                                         clientid)
        # forget the finished process
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finished.",
                    pid, container.uuid)

        # last process gone: the whole container exits
        if not container.processes:
            self._handle_container_exit(container)

    def _handle_container_exit(self, container):
        """Clean up an emptied container and publish container_exit."""
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            msg['profile_data'] = self._profile_data(container)
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(PUB_MSG['container_exit'](**msg))

    def _profile_data(self, container):
        """Build a container's profile report.

        Takes a fresh 'end' snapshot and diffs it against the 'start'
        snapshot with sensor_manager.calc_difference, then adds the final
        package temperatures, the power-policy settings and the node name.
        """
        power = container.power
        profile = power['profile']
        self.machine_info = self.sensor_manager.do_update()
        # Copy so that the timestamp is not written into self.machine_info;
        # kept in profile['end'] as the counterpart of profile['start'].
        end = dict(self.machine_info['energy']['energy'])
        end['time'] = self.machine_info['time']
        profile['end'] = end
        diff = self.sensor_manager.calc_difference(profile['start'], end)
        temp = self.machine_info['temperature']
        # A real list: on Python 3 map() is a lazy iterator, which presumably
        # would not survive serialization of the upstream message.
        diff['temp'] = [temp[k]['pkg'] for k in temp]
        diff['policy'] = power['policy']
        if power['policy']:
            # damper presumably given in nanoseconds, reported in seconds
            # (TODO confirm unit against PowerPolicyManager).
            diff['damper'] = float(power['damper'])/1000000000
            diff['slowdown'] = power['slowdown']
        diff['nodename'] = self.sensor_manager.nodename
        logger.info("Container %r profile data: %r", container.uuid, diff)
        return diff

    def do_shutdown(self):
        """Stop the sensor manager and the IOLoop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Create sockets and managers, start periodic tasks, run the loop.

        Does not return until the IOLoop is stopped (see do_shutdown).
        """
        # Address the upstream TCP sockets bind to ('*': all interfaces).
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # applications reach us over a local IPC socket, upstream over TCP
        downstream_event_param = "ipc:///tmp/nrm-downstream-event"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        self.downstream_event = DownstreamEventServer(downstream_event_param)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream event socket bound to: %s",
                    downstream_event_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_event.setup_recv_callback(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        container_runtime = NodeOSRuntime(self.config.argo_nodeos_config)
        self.container_manager = ContainerManager(
                container_runtime,
                self.resource_manager,
                perfwrapper=self.config.argo_perf_wrapper,
                linuxperf=self.config.perf,
                pmpi_lib=self.config.pmpi_lib,
                )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([pa])

        self.sensor_manager.start()
        # initial reading, so machine_info exists before the first periodic
        # sensor update
        self.machine_info = self.sensor_manager.do_update()

        # periodic sensor updates and control steps (period in ms)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner(config):
    """Configure the 'nrm' logger from ``config``, then run the daemon.

    ``config.verbose`` switches the logger to DEBUG; a truthy
    ``config.nrm_log`` adds a FileHandler writing to that path.
    """
    if config.verbose:
        logger.setLevel(logging.DEBUG)

    if config.nrm_log:
        print("Logging to %s" % config.nrm_log)
        logger.addHandler(logging.FileHandler(config.nrm_log))

    daemon = Daemon(config)
    daemon.main()