# daemon.py
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
# Message tables taken from nrm.messaging.MSGTYPES.  Each table is indexed by
# a message type name and the entry is called with the message fields as
# keyword arguments, e.g. RPC_MSG['list'](**update) or PUB_MSG['power'](**msg).
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
21

22
# Main daemon logger; runner() sets it to DEBUG and, when config.log is set,
# attaches a file handler writing to that path.
logger = logging.getLogger('nrm')
23
# Separate 'power' logger; runner() sets it to DEBUG and, when
# config.log_power is set, attaches a message-only file handler to it.
logger_power = logging.getLogger('power')
24 25

class Daemon(object):
26

27
    def __init__(self, config):
28
        self.target = 100.0
29
        self.config = config
30

31 32 33 34 35 36 37 38 39 40 41 42 43
    def do_downstream_receive(self, parts):
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        msg = json.loads(parts[0])
        if isinstance(msg, dict):
            msgtype = msg.get('type')
            event = msg.get('event')
            if msgtype is None or msgtype != 'application' or event is None:
                logger.error("wrong message format: %r", msg)
                return
            if event == 'start':
44 45
                cid = msg['container']
                container = self.container_manager.containers[cid]
46
                self.application_manager.register(msg, container)
47
            elif event == 'threads':
48 49 50 51
                uuid = msg['uuid']
                if uuid in self.application_manager.applications:
                    app = self.application_manager.applications[uuid]
                    app.update_threads(msg)
52
            elif event == 'progress':
53 54 55 56
                uuid = msg['uuid']
                if uuid in self.application_manager.applications:
                    app = self.application_manager.applications[uuid]
                    app.update_progress(msg)
57
            elif event == 'phase_context':
58 59 60
                uuid = msg['uuid']
                if uuid in self.application_manager.applications:
                    app = self.application_manager.applications[uuid]
61
                    c = self.container_manager.containers[app.cid]
62
                    if c.power['policy']:
63
                        app.update_phase_context(msg)
64
            elif event == 'exit':
65 66 67
                uuid = msg['uuid']
                if uuid in self.application_manager.applications:
                    self.application_manager.delete(msg['uuid'])
68 69 70
            else:
                logger.error("unknown event: %r", event)
                return
71

72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
    def do_upstream_receive(self, msg, client):
        """Dispatch one upstream RPC request and answer ``client``.

        Handled request types (``msg.type``):

        * ``setpower``: store ``msg.limit`` as the new target and reply with
          a ``getpower`` message carrying it.
        * ``run``: ask the container manager to create/extend a container,
          reply with ``start`` (first process of the container) or
          ``process_start`` (additional process), then stream the child's
          stdout/stderr back through do_children_io().
        * ``kill``: ask the container manager to kill the container; no
          reply is sent here, the child exit produces it.
        * ``list``: reply with the container manager's listing.

        Any other type is logged as an invalid command.

        Args:
            msg: decoded upstream request; fields are read as attributes.
            client: identity of the requester, passed back to sendmsg().
        """
        request = msg.type

        if request == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            reply = RPC_MSG['getpower'](api='up_rpc_rep',
                                        type='getpower',
                                        limit=str(self.target))
            self.upstream_rpc_server.sendmsg(reply, client)
            return

        if request == 'run':
            params = {'manifest': msg.manifest,
                      'file': msg.path,
                      'args': msg.args,
                      'uuid': msg.container_uuid,
                      'environ': msg.environ,
                      'clientid': client,
                      }
            pid, container = self.container_manager.create(params)
            container_uuid = container.uuid

            if len(container.processes) == 1:
                # first process of this container: set up power handling
                power = container.power
                if power['policy']:
                    power['manager'] = PowerPolicyManager(
                            container.resources['cpus'],
                            power['policy'],
                            float(power['damper']),
                            float(power['slowdown']))
                if power['profile']:
                    # remember the readings at container start
                    profile = power['profile']
                    profile['start'] = self.machine_info['energy']['energy']
                    profile['start']['time'] = self.machine_info['time']
                reply = RPC_MSG['start'](
                        api='up_rpc_rep',
                        type='start',
                        container_uuid=container_uuid,
                        errno=0 if container else -1,
                        pid=pid,
                        power=container.power['policy'] or dict())
            else:
                # the container already existed, only a process was added
                reply = RPC_MSG['process_start'](
                        api='up_rpc_rep',
                        type='process_start',
                        container_uuid=container_uuid)
            self.upstream_rpc_server.sendmsg(reply, client)

            # setup io callbacks: the same callable receives both the
            # streamed chunks and the final data
            on_stdout = partial(self.do_children_io, client,
                                container_uuid, 'stdout')
            on_stderr = partial(self.do_children_io, client,
                                container_uuid, 'stderr')
            process = container.processes[pid]
            process.stdout.read_until_close(on_stdout, on_stdout)
            process.stderr.read_until_close(on_stderr, on_stderr)
            return

        if request == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg.container_uuid)
            return

        if request == 'list':
            logger.info("asked for container list: %r", msg)
            listing = self.container_manager.list()
            reply = RPC_MSG['list'](api='up_rpc_rep',
                                    type='list',
                                    payload=listing)
            self.upstream_rpc_server.sendmsg(reply, client)
            return

        logger.error("invalid command: %r", msg.type)
150

151
    def do_children_io(self, client, container_uuid, io, data):
        """Forward output read from a child process to its client.

        Intended to be partially applied per child (client, container and
        stream name bound up front) so that only ``data`` is supplied by
        the reader.

        Args:
            client: identity of the client to send the message to.
            container_uuid: uuid of the container the child belongs to.
            io: stream name, also used as the message type
                (called with 'stdout' or 'stderr' in this file).
            data: the data read; a falsy value is reported as 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        payload = data if data else 'eof'
        reply = RPC_MSG[io](api='up_rpc_rep',
                            type=io,
                            container_uuid=container_uuid,
                            payload=payload)
        self.upstream_rpc_server.sendmsg(reply, client)
162

163
    def do_sensor(self):
        """Refresh ``self.machine_info`` and publish the power reading.

        The published 'power' message carries the total power taken from
        the fresh sensor data and the current target as 'limit'.
        """
        info = self.sensor_manager.do_update()
        self.machine_info = info
        logger.info("current state: %r", info)

        msg = {'api': 'up_pub',
               'type': 'power',
               'total': info['energy']['power']['total'],
               'limit': self.target,
               }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
        logger.info("sending sensor message: %r", msg)
174 175

    def do_control(self):
176 177
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
178
        if action:
179 180
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
181 182 183
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)
184 185

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the IOLoop.

        SIGINT schedules do_shutdown() and SIGCHLD schedules do_children()
        on the current IOLoop; any other signal is logged as an error.

        Args:
            signum: number of the signal received.
            frame: current stack frame (unused).
        """
        if signum == signal.SIGINT:
            callback = self.do_shutdown
        elif signum == signal.SIGCHLD:
            callback = self.do_children
        else:
            logger.error("wrong signal: %d", signum)
            return
        ioloop.IOLoop.current().add_callback_from_signal(callback)
192 193 194 195 196

    def do_children(self):
        """Reap terminated children and notify the clients that own them.

        Collects every pending child status without blocking.  For a pid
        known to the container manager that exited or was killed by a
        signal:

        * if it was the last process of its container, the power policy
          manager is reset (when a policy is set), profile data is computed
          (when profiling is enabled), the container is deleted and an
          'exit' message is sent to the client;
        * otherwise the pid is removed from the container and a
          'process_exit' message is sent.

        Status changes of unknown pids are logged and ignored.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. there is no child left to wait for
                break
            if pid == 0 and status == 0:
                # WNOHANG: no child has a status available right now
                break

            logger.info("child update %d: %r", pid, status)

            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            clientid = container.clientids[pid]
            remaining_pids = [p for p in container.processes if p != pid]
            msg = {'api': 'up_rpc_rep',
                   'status': str(status),
                   'container_uuid': container.uuid,
                   }

            if remaining_pids:
                msg['type'] = 'process_exit'
                # Remove the pid of process that is finished
                container.processes.pop(pid, None)
                self.container_manager.pids.pop(pid, None)
                logger.info("Process %s in Container %s has finished.",
                            pid, container.uuid)
                self.upstream_rpc_server.sendmsg(
                        RPC_MSG['process_exit'](**msg), clientid)
                continue

            # last process of the container: the container itself is done
            msg['type'] = 'exit'
            msg['profile_data'] = dict()
            pp = container.power
            if pp['policy']:
                pp['manager'].reset_all()
            if pp['profile']:
                # take a fresh reading to use as the end of the profile
                self.machine_info = self.sensor_manager.do_update()
                end = self.machine_info['energy']['energy']
                end['time'] = self.machine_info['time']
                start = pp['profile']['start']
                # Calculate difference between the values
                diff = self.sensor_manager.calc_difference(start, end)
                # Get final package temperature; a real list is built
                # because map() returns a lazy iterator on Python 3
                temp = self.machine_info['temperature']
                diff['temp'] = [temp[k]['pkg'] for k in temp]
                logger.info("Container %r profile data: %r",
                            container.uuid, diff)
                msg['profile_data'] = diff
            self.container_manager.delete(container.uuid)
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['exit'](**msg), clientid)
253 254

    def do_shutdown(self):
        """Stop the sensor manager, then stop the current IOLoop."""
        self.sensor_manager.stop()
        loop = ioloop.IOLoop.current()
        loop.stop()

    def main(self):
        """Set up sockets, managers and periodic callbacks, then run the loop.

        Binds the downstream PUB/SUB ipc sockets, creates the upstream
        PUB and RPC servers, instantiates the managers and the controller,
        starts the sensors, schedules do_sensor() and do_control() every
        1000 ms, installs the SIGINT/SIGCHLD handlers and finally starts
        the IOLoop.  The call returns only once the loop is stopped (see
        do_shutdown()).
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # Empty filter: subscribe to every message.  The option value must
        # be bytes: pyzmq rejects text strings here on Python 3, while b""
        # is the very same object type as "" on Python 2.
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # initial reading, so machine_info exists before the first callbacks
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        # setup periodic control steps
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


321
def runner(config):
    """Configure logging from ``config`` and run the daemon.

    Both loggers are set to DEBUG.  When ``config.log`` is set the main
    logger also writes to that file; when ``config.log_power`` is set the
    power logger writes bare messages to that file.  A Daemon is then
    created and its main() is run.

    Args:
        config: object exposing ``log`` and ``log_power`` attributes; it is
            also handed to the Daemon.
    """
    ioloop.install()

    for log in (logger, logger_power):
        log.setLevel(logging.DEBUG)

    log_path = config.log
    if log_path:
        print("Logging to %s" % log_path)
        logger.addHandler(logging.FileHandler(log_path))

    power_path = config.log_power
    if power_path:
        print("Logging power data to %s" % power_path)
        message_only = logging.Formatter('%(message)s')
        power_handler = logging.FileHandler(power_path)
        power_handler.setFormatter(message_only)
        logger_power.addHandler(power_handler)

    Daemon(config).main()