# daemon.py
from __future__ import print_function

from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from powerpolicy import PowerPolicyManager
from functools import partial
import json
import logging
import os
from resources import ResourceManager
from sensor import SensorManager
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
# Upstream message types, keyed by message name. Each entry is called with
# the message fields as keyword arguments, e.g. RPC_MSG['start'](**update)
# for RPC replies and PUB_MSG['power'](**msg) for published updates.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']

logger = logging.getLogger('nrm')


class Daemon(object):
    """NRM daemon: routes messages between upstream clients, downstream
    applications and the container, sensor and power-control managers.

    All work happens in callbacks on the ioloop started at the end of
    main().
    """

    def __init__(self):
        # Power target: sent to upstream clients as 'limit' and handed to
        # Controller.planify(); changed by the upstream 'setpower' command.
        self.target = 100.0

    def do_downstream_receive(self, parts):
        """Handle one message received on the downstream SUB socket.

        The message must be a single frame holding a JSON object whose
        'type' is 'application' and which carries an 'event' key. Malformed
        messages and unknown events are logged and dropped.

        Events:
        - 'start': register the application with the container named by
          msg['container'].
        - 'threads' / 'progress': update the application msg['uuid'].
        - 'phase_context': forwarded to the application only when its
          container has a power policy.
        - 'exit': forget the application.
        Events naming an application uuid that is not registered are
        ignored.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # json decode errors subclass ValueError; a garbled frame must
            # not raise out of the ioloop callback.
            logger.error("wrong message format: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            cid = msg['container']
            containers = self.container_manager.containers
            if cid not in containers:
                logger.error("unknown container: %r", cid)
                return
            self.application_manager.register(msg, containers[cid])
            return

        if event not in ('threads', 'progress', 'phase_context', 'exit'):
            logger.error("unknown event: %r", event)
            return

        uuid = msg['uuid']
        applications = self.application_manager.applications
        if uuid not in applications:
            return
        app = applications[uuid]
        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        elif event == 'phase_context':
            container = self.container_manager.containers[app.cid]
            if container.power['policy']:
                app.update_phase_context(msg)
        else:  # 'exit'
            self.application_manager.delete(uuid)

    def do_upstream_receive(self, msg, client):
        """Handle one request from an upstream RPC client.

        Supported request types are 'setpower', 'run', 'kill' and 'list';
        replies go back to *client* through the upstream RPC server. Any
        other type is logged as an invalid command.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            self._run_process(msg, client)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no reply here: the kill triggers a child exit, which
            # do_children reports to the client
            self.container_manager.kill(msg.container_uuid)
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': self.container_manager.list(),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def _run_process(self, msg, client):
        """Launch the process described by an upstream 'run' request.

        The first process of a container triggers a 'start' reply and, when
        configured, creates the container's PowerPolicyManager and records
        the energy-profiling baseline. Later processes in the same container
        get a 'process_start' reply. In both cases the process' stdout and
        stderr are forwarded to *client*. If the container manager returns
        no container, a 'start' reply with errno -1 is sent instead.
        """
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        if not container:
            # Report the failure instead of crashing on container.uuid.
            update = {'api': 'up_rpc_rep',
                      'type': 'start',
                      'container_uuid': msg.container_uuid,
                      'errno': -1,
                      'pid': pid,
                      'power': dict(),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
                                             client)
            return

        container_uuid = container.uuid
        if len(container.processes) == 1:
            power = container.power
            if power['policy']:
                power['manager'] = PowerPolicyManager(
                        container.resources['cpus'],
                        power['policy'],
                        float(power['damper']),
                        float(power['slowdown']))
            if power['profile']:
                # Baseline for the energy difference computed at exit.
                power['profile']['start'] = self._energy_snapshot(
                        self.machine_info)
            update = {'api': 'up_rpc_rep',
                      'type': 'start',
                      'container_uuid': container_uuid,
                      'errno': 0,
                      'pid': pid,
                      'power': power['policy'] or dict(),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
                                             client)
        else:
            update = {'api': 'up_rpc_rep',
                      'type': 'process_start',
                      'container_uuid': container_uuid,
                      }
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['process_start'](**update), client)
        self._forward_process_io(client, container_uuid,
                                 container.processes[pid])

    def _forward_process_io(self, client, container_uuid, process):
        """Relay *process* stdout/stderr to *client* via do_children_io."""
        outcb = partial(self.do_children_io, client, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, client, container_uuid, 'stderr')
        process.stdout.read_until_close(outcb, outcb)
        process.stderr.read_until_close(errcb, errcb)

    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially applied per child (see _forward_process_io).
        *io* is 'stdout' or 'stderr'; empty *data* is sent as 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh machine_info and publish total power and target upstream."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'api': 'up_pub',
               'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control step.

        Executes the controller's planned action, if any. Then runs the
        policy over the current containers when there are some.
        """
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler installed for SIGINT and SIGCHLD by main().

        The actual work is deferred to the ioloop with
        add_callback_from_signal instead of running inside the handler.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap every child that changed state and report tracked exits."""
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. ECHILD when there is no child left to wait for
                break
            if pid == 0 and status == 0:
                # WNOHANG: children exist but none has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # check if it's a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only terminations (normal exit or by signal) are reported
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._handle_child_exit(pid, status)

    def _handle_child_exit(self, pid, status):
        """Report the termination of tracked process *pid* to its client.

        If other processes remain in its container, a 'process_exit' reply
        is sent and the pid is forgotten. Otherwise the container is torn
        down: the power manager is reset, profile data is attached when
        profiling is enabled, the container is deleted and an 'exit' reply
        is sent.
        """
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]
        msg = {'api': 'up_rpc_rep',
               'status': str(status),
               'container_uuid': container.uuid,
               }

        if any(p != pid for p in container.processes):
            msg['type'] = 'process_exit'
            # Remove the pid of the process that has finished
            container.processes.pop(pid, None)
            self.container_manager.pids.pop(pid, None)
            logger.info("Process %s in Container %s has finished.",
                        pid, container.uuid)
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['process_exit'](**msg), clientid)
            return

        msg['type'] = 'exit'
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            msg['profile_data'] = self._profile_data(container)
        self.container_manager.delete(container.uuid)
        self.upstream_rpc_server.sendmsg(RPC_MSG['exit'](**msg), clientid)

    def _profile_data(self, container):
        """Compute the profiling summary of *container* at its exit.

        Takes a fresh sensor reading (also stored in machine_info). Returns
        calc_difference() between the start baseline and that reading, with
        the per-package temperatures added under 'temp'.
        """
        self.machine_info = self.sensor_manager.do_update()
        end = self._energy_snapshot(self.machine_info)
        start = container.power['profile']['start']
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(start, end)
        # Get final package temperature
        diff['temp'] = self._package_temperatures(
                self.machine_info['temperature'])
        logger.info("Container %r profile data: %r", container.uuid, diff)
        return diff

    @staticmethod
    def _energy_snapshot(machine_info):
        """Return a copy of machine_info['energy']['energy'] plus 'time'.

        A copy is taken so that adding the 'time' key does not modify the
        sensor data held in machine_info (which do_control also passes to
        the controller). Later in-place sensor updates also cannot alter
        a saved baseline.
        """
        snapshot = dict(machine_info['energy']['energy'])
        snapshot['time'] = machine_info['time']
        return snapshot

    @staticmethod
    def _package_temperatures(temperature):
        """Return the 'pkg' reading of every entry of *temperature* as a list.

        A concrete list is required: on Python 3 ``map`` yields a one-shot
        iterator, which is not JSON serializable and so cannot travel in
        the exit reply.
        """
        return [temperature[key]['pkg'] for key in temperature]

    def do_shutdown(self):
        """Stop the sensor manager, then stop the ioloop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, periodic callbacks and signal handlers,
        then run the ioloop until do_shutdown stops it."""
        # Bind address for the upstream TCP sockets
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # Empty prefix = subscribe to everything. SUBSCRIBE takes bytes: a
        # str value is rejected by pyzmq under Python 3.
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket bound to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Run the NRM daemon in the foreground.

    Installs the pyzmq ioloop and enables DEBUG-level logging. Then builds
    a Daemon and enters its main loop, which blocks until do_shutdown
    stops the ioloop.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    nrm_daemon = Daemon()
    nrm_daemon.main()