daemon.py 13.1 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import logging
9
import os
10
from resources import ResourceManager
11
from sensor import SensorManager
12
import signal
13
from zmq.eventloop import ioloop
14
from nrm.messaging import MSGTYPES
15 16
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
        DownstreamEventServer
17

18 19
# Message constructors, looked up by message type in this file, for the
# upstream RPC replies and the upstream publications respectively.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
20

21 22
# Logger used throughout this module; runner() sets its level and,
# optionally, a file handler.
logger = logging.getLogger('nrm')

23 24

class Daemon(object):
25 26

    def __init__(self, config):
27
        self.target = 100.0
28
        self.config = config
29

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
    def do_downstream_receive(self, msg, client):
        """Handle one event message sent by a downstream application.

        msg: decoded message; its `type` attribute selects the handling and
        the other attributes read here depend on that type.
        client: identity of the sender; not used by this handler.

        Messages that refer to an unknown container are logged and dropped
        instead of raising KeyError out of the socket callback.
        """
        logger.info("receiving downstream message: %r", msg)
        applications = self.application_manager.applications
        containers = self.container_manager.containers

        if msg.type == 'application_start':
            try:
                container = containers[msg.container_uuid]
            except KeyError:
                logger.error("application start for unknown container: %r",
                             msg)
                return
            self.application_manager.register(msg, container)
        elif msg.type == 'progress':
            # The application table is keyed by application uuid in the
            # 'phase_context' and 'application_exit' branches, so prefer
            # that field here as well.  Fall back to the container uuid
            # (the field originally used) when the message has no
            # application uuid.
            # NOTE(review): confirm the 'progress' message schema.
            uuid = getattr(msg, 'application_uuid', None)
            if uuid is None:
                uuid = msg.container_uuid
            if uuid in applications:
                applications[uuid].update_progress(msg)
        elif msg.type == 'phase_context':
            uuid = msg.application_uuid
            if uuid in applications:
                app = applications[uuid]
                try:
                    container = containers[app.cid]
                except KeyError:
                    logger.error("phase context for unknown container: %r",
                                 msg)
                    return
                # Phase contexts are only recorded for containers that have
                # a power policy.
                if container.power['policy']:
                    app.update_phase_context(msg)
        elif msg.type == 'application_exit':
            uuid = msg.application_uuid
            if uuid in applications:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown msg: %r", msg)

    def do_upstream_receive(self, msg, client):
        """Dispatch one RPC request received from an upstream client.

        msg: decoded request; `msg.type` selects the command ('setpower',
        'run', 'kill' or 'list').  Any other type is logged as an error.
        client: identity of the requester, used to address the replies.
        """
        handlers = {'setpower': self._upstream_setpower,
                    'run': self._upstream_run,
                    'kill': self._upstream_kill,
                    'list': self._upstream_list,
                    }
        handler = handlers.get(msg.type)
        if handler is None:
            logger.error("invalid command: %r", msg.type)
            return
        handler(msg, client)

    def _upstream_setpower(self, msg, client):
        """Adopt the requested limit as the new target and echo it back."""
        self.target = float(msg.limit)
        logger.info("new target measure: %g", self.target)
        update = {'api': 'up_rpc_rep',
                  'type': 'getpower',
                  'limit': str(self.target)
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                         client)

    def _upstream_run(self, msg, client):
        """Start a process in a container and forward its output to client."""
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        container_uuid = container.uuid

        # Container-level setup and the container_start publication only
        # happen for the first process of a container.
        if len(container.processes) == 1:
            self._upstream_container_start(container)

        # now deal with the process itself
        update = {'api': 'up_rpc_rep',
                  'type': 'process_start',
                  'container_uuid': container_uuid,
                  'pid': pid,
                  }
        self.upstream_rpc_server.sendmsg(
            RPC_MSG['process_start'](**update), client)

        # setup io callbacks
        outcb = partial(self.do_children_io, client, container_uuid,
                        'stdout')
        errcb = partial(self.do_children_io, client, container_uuid,
                        'stderr')
        process = container.processes[pid]
        # The same callable is deliberately passed for both arguments.
        # NOTE(review): presumably the final and the streaming callback of
        # a tornado stream -- confirm against the container manager.
        process.stdout.read_until_close(outcb, outcb)
        process.stderr.read_until_close(errcb, errcb)

    def _upstream_container_start(self, container):
        """Set up power management for a container and publish its start."""
        power = container.power
        if power['policy']:
            power['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    power['policy'],
                    float(power['damper']),
                    float(power['slowdown']))
        if power['profile']:
            # Copy the energy reading so that stamping the time on the
            # snapshot does not modify the dict held in self.machine_info.
            start = dict(self.machine_info['energy']['energy'])
            start['time'] = self.machine_info['time']
            power['profile']['start'] = start
        update = {'api': 'up_pub',
                  'type': 'container_start',
                  'container_uuid': container.uuid,
                  'errno': 0 if container else -1,
                  'power': power['policy'] or dict()
                  }
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_start'](**update))

    def _upstream_kill(self, msg, client):
        """Ask the container manager to kill the requested container."""
        logger.info("asked to kill container: %r", msg)
        # no update here, as it will trigger child exit
        self.container_manager.kill(msg.container_uuid)

    def _upstream_list(self, msg, client):
        """Reply to the client with the container manager's listing."""
        logger.info("asked for container list: %r", msg)
        update = {'api': 'up_rpc_rep',
                  'type': 'list',
                  'payload': self.container_manager.list(),
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                         client)

    def do_children_io(self, client, container_uuid, io, data):
        """Forward a chunk of output from a child process to its client.

        Designed to be wrapped with functools.partial, one wrapper per
        child stream, so that only `data` remains to be supplied.  `io` is
        used both as the message type and as the key selecting the reply
        constructor.  Empty data is reported as the string 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        payload = data if data else 'eof'
        reply = RPC_MSG[io](api='up_rpc_rep',
                            type=io,
                            container_uuid=container_uuid,
                            payload=payload)
        self.upstream_rpc_server.sendmsg(reply, client)

    def do_sensor(self):
        """Refresh the machine state and publish the total power upstream.

        The fresh sensor reading is stored in self.machine_info.  When the
        reading does not have the energy/power/total layout read below, an
        error is logged and nothing is published.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (TypeError, KeyError):
            # TypeError: one of the levels is not subscriptable (e.g. None).
            # KeyError: one of the levels is a dict without the expected
            # key.  Both mean the reading cannot be reported.
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
            return
        msg = {'api': 'up_pub',
               'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
158 159
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
160
        if action:
161 162
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
163 164 165
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)
166 167

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the event loop.

        SIGINT schedules do_shutdown and SIGCHLD schedules do_children on
        the current loop; any other signal number is logged as an error.
        `frame` is not used.
        """
        if signum == signal.SIGINT:
            callback = self.do_shutdown
        elif signum == signal.SIGCHLD:
            callback = self.do_children
        else:
            logger.error("wrong signal: %d", signum)
            return
        ioloop.IOLoop.current().add_callback_from_signal(callback)

    def do_children(self):
        """Collect terminated children and report their exit upstream.

        Loops over os.wait3(WNOHANG) until no more child status is
        available.  For every exited or signaled pid known to the container
        manager, a process_exit reply is sent to the client recorded for
        that pid and the pid is forgotten; when that leaves the container
        without processes, the container itself is torn down.
        """
        while True:
            try:
                pid, status, _rusage = os.wait3(os.WNOHANG)
            except OSError:
                # wait3 fails e.g. when there is no child left to wait for
                break
            if pid == 0 and status == 0:
                # no further child status available right now
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            clientid = container.clientids[pid]

            # first, send a process_exit
            msg = {'api': 'up_rpc_rep',
                   'type': 'process_exit',
                   'status': str(status),
                   'container_uuid': container.uuid,
                   }
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['process_exit'](**msg), clientid)
            # Remove the pid of process that is finished
            container.processes.pop(pid, None)
            self.container_manager.pids.pop(pid, None)
            logger.info("Process %s in Container %s has finised.",
                        pid, container.uuid)

            # if this is the last process in the container,
            # kill everything
            if not container.processes:
                self._container_exit(container)

    def _container_exit(self, container):
        """Tear down a container without processes and publish its exit."""
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        pp = container.power
        if pp['policy']:
            pp['manager'].reset_all()
        if pp['profile']:
            self.machine_info = self.sensor_manager.do_update()
            # Work on a copy of the energy reading so that stamping the
            # time (and whatever calc_difference does with its arguments)
            # leaves self.machine_info untouched.
            end = dict(self.machine_info['energy']['energy'])
            end['time'] = self.machine_info['time']
            start = pp['profile']['start']
            # Calculate difference between the values
            diff = self.sensor_manager.calc_difference(start, end)
            # Get final package temperature; a real list (not a lazy map
            # object) so the value is the same on Python 2 and Python 3.
            temp = self.machine_info['temperature']
            diff['temp'] = [temp[k]['pkg'] for k in temp]
            logger.info("Container %r profile data: %r",
                        container.uuid, diff)
            msg['profile_data'] = diff
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_exit'](**msg))

    def do_shutdown(self):
        """Stop the sensor manager, then stop the current event loop."""
        self.sensor_manager.stop()
        loop = ioloop.IOLoop.current()
        loop.stop()

    def main(self):
        """Create the sockets and managers, then run the event loop.

        Binds the downstream event socket and the upstream pub/rpc sockets,
        registers the receive callbacks, builds the resource, container,
        application and sensor managers and the controller, schedules the
        periodic sensor and control callbacks (every 1000 ms), installs the
        SIGINT/SIGCHLD handlers and finally starts the loop.  Returns once
        the loop has been stopped.
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        downstream_event_param = "ipc:///tmp/nrm-downstream-event"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        self.downstream_event = DownstreamEventServer(downstream_event_param)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream event socket bound to: %s",
                    downstream_event_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_event.setup_recv_callback(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        self.container_manager = ContainerManager(
           self.resource_manager,
           perfwrapper=self.config.argo_perf_wrapper,
           linuxperf=self.config.perf,
           argo_nodeos_config=self.config.argo_nodeos_config,
           pmpi_lib=self.config.pmpi_lib,
           )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        # self.downstream_pub is never assigned by this class, so reading
        # it directly raises AttributeError on startup.  Use it when it has
        # been provided from outside, otherwise fall back to the downstream
        # event server, the only downstream channel created here.
        # NOTE(review): confirm ApplicationActuator works with this object.
        downstream = getattr(self, 'downstream_pub', self.downstream_event)
        aa = ApplicationActuator(self.application_manager, downstream)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # Take a first reading so that machine_info exists before the
        # periodic callbacks and the request handlers use it.
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


303
def runner(config):
    """Install the zmq event loop, set up logging and run the daemon.

    config: settings object; `config.nrm_log`, when truthy, is the path of
    a file that additionally receives the log output.  The object is then
    handed to Daemon.
    """
    ioloop.install()

    logger.setLevel(logging.DEBUG)

    logfile = config.nrm_log
    if logfile:
        print("Logging to %s" % logfile)
        logger.addHandler(logging.FileHandler(logfile))

    Daemon(config).main()