# daemon.py
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
# Upstream message tables taken from MSGTYPES.  Both are indexed by message
# type name and the entries are called with the message fields as keyword
# arguments (e.g. RPC_MSG['list'](**update)) to build the object handed to
# sendmsg(): RPC_MSG for the RPC reply API, PUB_MSG for the PUB API.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
21

22 23
# Module-wide logger; every method of Daemon reports through it.
logger = logging.getLogger('nrm')

24 25 26

class Daemon(object):
    def __init__(self):
27
        self.target = 100.0
28

29 30 31 32 33 34 35 36 37 38 39 40 41
    def do_downstream_receive(self, parts):
        """Handle one message received on the downstream SUB socket.

        parts is the list of frames of the message.  Exactly one frame is
        expected, holding a JSON object whose 'type' is 'application' and
        whose 'event' is one of 'start', 'threads', 'progress',
        'phase_context' or 'exit'.

        'start' registers a new application in the container named by the
        'container' field; the other events are routed to the application
        identified by the 'uuid' field and ignored when it is not
        registered.

        The content comes from whatever is connected to the downstream
        socket, so anything malformed (bad JSON, wrong shape, missing
        fields, unknown container) is logged and dropped instead of
        raising out of the callback.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except (TypeError, ValueError):
            # ValueError is what the json module raises for invalid
            # documents; TypeError covers a frame type it cannot decode.
            logger.error("invalid JSON, dropping it: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            containers = self.container_manager.containers
            try:
                container = containers[msg['container']]
            except (KeyError, TypeError):
                # field missing, container unknown, or an unhashable value
                # (JSON list/object) given as container id
                logger.error("start event for unknown container: %r", msg)
                return
            self.application_manager.register(msg, container)
            return

        if event not in ('threads', 'progress', 'phase_context', 'exit'):
            logger.error("unknown event: %r", event)
            return

        # Every remaining event targets an already registered application.
        try:
            app = self.application_manager.applications[msg['uuid']]
        except (KeyError, TypeError):
            logger.debug("%r event for unknown application: %r", event, msg)
            return

        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        elif event == 'phase_context':
            try:
                container = self.container_manager.containers[app.cid]
            except KeyError:
                logger.error("application %r is in unknown container %r",
                             msg['uuid'], app.cid)
                return
            # phase contexts are only consumed when a power policy is set
            if container.power['policy']:
                app.update_phase_context(msg)
        else:
            # event == 'exit'
            self.application_manager.delete(msg['uuid'])
69

70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
    def do_upstream_receive(self, msg, client):
        """Serve one request received by the upstream RPC server.

        msg is the request object; its 'type' attribute selects the action
        ('setpower', 'run', 'kill' or 'list').  client is handed back to
        the RPC server with every reply.  Any other type is logged as an
        error and gets no reply.
        """
        request = msg.type

        if request == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            reply = {
                'api': 'up_rpc_rep',
                'type': 'getpower',
                'limit': str(self.target),
            }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**reply),
                                             client)
            return

        if request == 'run':
            pid, container = self.container_manager.create({
                'manifest': msg.manifest,
                'file': msg.path,
                'args': msg.args,
                'uuid': msg.container_uuid,
                'environ': msg.environ,
                'clientid': client,
            })
            container_uuid = container.uuid

            if len(container.processes) == 1:
                # Only one process in the container: set up the power
                # policy/profile and announce the container start.
                if container.power['policy']:
                    container.power['manager'] = PowerPolicyManager(
                        container.resources['cpus'],
                        container.power['policy'],
                        float(container.power['damper']),
                        float(container.power['slowdown']))
                if container.power['profile']:
                    profile = container.power['profile']
                    profile['start'] = self.machine_info['energy']['energy']
                    profile['start']['time'] = self.machine_info['time']
                started = {
                    'api': 'up_pub',
                    'type': 'container_start',
                    'container_uuid': container_uuid,
                    'errno': 0 if container else -1,
                    'power': container.power['policy'] or dict(),
                }
                self.upstream_pub_server.sendmsg(
                    PUB_MSG['container_start'](**started))

            # now deal with the process itself
            reply = {
                'api': 'up_rpc_rep',
                'type': 'process_start',
                'container_uuid': container_uuid,
                'pid': pid,
            }
            self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_start'](**reply), client)

            # setup io callbacks; the same callable is given twice to
            # read_until_close() for each stream
            on_stdout = partial(self.do_children_io, client, container_uuid,
                                'stdout')
            on_stderr = partial(self.do_children_io, client, container_uuid,
                                'stderr')
            process = container.processes[pid]
            process.stdout.read_until_close(on_stdout, on_stdout)
            process.stderr.read_until_close(on_stderr, on_stderr)
            return

        if request == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg.container_uuid)
            return

        if request == 'list':
            logger.info("asked for container list: %r", msg)
            listing = self.container_manager.list()
            reply = {
                'api': 'up_rpc_rep',
                'type': 'list',
                'payload': listing,
            }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**reply),
                                             client)
            return

        logger.error("invalid command: %r", request)
141

142
    def do_children_io(self, client, container_uuid, io, data):
        """Forward data read from a child's output stream to its client.

        Meant to be pre-bound with functools.partial on a per-child basis:
        client, container_uuid and io ('stdout' or 'stderr') are fixed when
        the callback is created and only data is supplied at call time.
        Falsy data (e.g. an empty read) is reported as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        payload = data if data else 'eof'
        reply = {
            'api': 'up_rpc_rep',
            'type': io,
            'container_uuid': container_uuid,
            'payload': payload,
        }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**reply), client)
153

154
    def do_sensor(self):
        """Refresh the machine state and publish the current power reading.

        Stores the result of the sensor update in self.machine_info, then
        publishes a 'power' message carrying the total power and the
        current target.
        """
        info = self.sensor_manager.do_update()
        self.machine_info = info
        logger.info("current state: %r", info)

        reading = {
            'api': 'up_pub',
            'type': 'power',
            'total': info['energy']['power']['total'],
            'limit': self.target,
        }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**reading))
        logger.info("sending sensor message: %r", reading)
165 166

    def do_control(self):
167 168
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
169
        if action:
170 171
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
172 173 174
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)
175 176

    def do_signal(self, signum, frame):
        """Signal handler: hand the real work over to the IO loop.

        SIGINT schedules do_shutdown() and SIGCHLD schedules do_children()
        through add_callback_from_signal(); any other signal number is
        logged as an error.  frame is unused.
        """
        handlers = {
            signal.SIGINT: self.do_shutdown,
            signal.SIGCHLD: self.do_children,
        }
        handler = handlers.get(signum)
        if handler is None:
            logger.error("wrong signal: %d", signum)
            return
        ioloop.IOLoop.current().add_callback_from_signal(handler)
183 184 185 186 187

    def do_children(self):
        """Reap terminated children and report their exit upstream.

        Scheduled on the IO loop by do_signal() when SIGCHLD is received.
        Children are collected with non-blocking wait3() calls until none
        is pending.  For every collected pid known to the container
        manager that exited or was killed by a signal, a 'process_exit'
        reply is sent to the client that started it and the pid is
        forgotten; when that was the last process of its container, the
        container itself is torn down (see _do_container_exit).
        """
        while True:
            try:
                pid, status, _rusage = os.wait3(os.WNOHANG)
                if pid == 0 and status == 0:
                    # children exist but none has changed state
                    break
            except OSError:
                # wait3() failed (e.g. no child left to wait for)
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only exits matter, not other state changes
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            clientid = container.clientids[pid]

            # first, send a process_exit
            msg = {'api': 'up_rpc_rep',
                   'type': 'process_exit',
                   'status': str(status),
                   'container_uuid': container.uuid,
                   }
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['process_exit'](**msg), clientid)
            # Remove the pid of process that is finished
            container.processes.pop(pid, None)
            self.container_manager.pids.pop(pid, None)
            logger.info("Process %s in Container %s has finished.",
                        pid, container.uuid)

            # if this is the last process in the container,
            # kill everything
            if not container.processes:
                self._do_container_exit(container)

    def _do_container_exit(self, container):
        """Tear down a container with no process left and publish its exit.

        Resets the power policy manager when a policy is set, computes the
        profile data when profiling is enabled, deletes the container and
        finally publishes a 'container_exit' message.
        """
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        pp = container.power
        if pp['policy']:
            pp['manager'].reset_all()
        if pp['profile']:
            # take a fresh reading to serve as the end point of the profile
            self.machine_info = self.sensor_manager.do_update()
            end = self.machine_info['energy']['energy']
            end['time'] = self.machine_info['time']
            start = pp['profile']['start']
            # Calculate difference between the values
            diff = self.sensor_manager.calc_difference(start, end)
            # Get final package temperature.  Built as a real list: on
            # Python 3 map() returns a lazy iterator, which would be logged
            # and published as a map object instead of the values.
            temp = self.machine_info['temperature']
            diff['temp'] = [temp[k]['pkg'] for k in temp]
            logger.info("Container %r profile data: %r",
                        container.uuid, diff)
            msg['profile_data'] = diff
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_exit'](**msg))
248 249

    def do_shutdown(self):
        """Stop the sensor manager, then stop the current IO loop.

        Scheduled on the IO loop by do_signal() when SIGINT is received.
        """
        self.sensor_manager.stop()
        loop = ioloop.IOLoop.current()
        loop.stop()

    def main(self):
        """Wire up the daemon and run its IO loop.

        Creates the downstream (ipc) sockets and the upstream (tcp)
        servers, the managers and the controller, takes a first sensor
        reading, schedules the periodic sensor and control callbacks,
        installs the SIGINT/SIGCHLD handlers and finally starts the IO
        loop.
        """
        # Address and ports used to build the upstream tcp endpoints
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # Empty prefix: subscribe to everything.  It has to be a bytes
        # object: pyzmq's setsockopt() refuses text strings on Python 3,
        # and b"" is the very same value as "" on Python 2.
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        # First reading, so machine_info exists before any callback or
        # request handler reads it.
        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        # setup periodic control steps, on the same period
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the zmq IO loop, set up logging, run the daemon.

    Logging is configured at DEBUG level before the Daemon is created and
    its main() is called.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()