# daemon.py
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
# Per-API message constructors, looked up by message type and called with
# the message fields, e.g. RPC_MSG['getpower'](**fields).
RPC_MSG = MSGTYPES['up_rpc_rep']  # replies sent on the upstream RPC server
PUB_MSG = MSGTYPES['up_pub']      # events published on the upstream PUB server

logger = logging.getLogger('nrm')


class Daemon(object):
    """NRM daemon: event-loop glue between clients, applications, children.

    Upstream clients reach it through an RPC server (requests and replies)
    and a PUB server (broadcast events); downstream applications publish
    JSON messages on an IPC SUB socket; processes it launches are reaped on
    SIGCHLD. Sockets, managers and periodic callbacks are created in
    :meth:`main`, not in ``__init__``.
    """

    def __init__(self):
        # Set by upstream 'setpower' requests; reported as the 'limit' of
        # power messages and passed to the controller's planner.
        self.target = 100.0

    def do_downstream_receive(self, parts):
        """Handle a message published by a downstream application.

        :param parts: frames delivered by the downstream ZMQStream; exactly
            one frame containing a JSON object is expected.

        Only objects with ``type == 'application'`` and an ``event`` field
        are acted upon; anything else is logged and dropped.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # Undecodable payloads (bad JSON or bad encoding) from one
            # application must not raise out of the stream callback.
            logger.error("wrong message format: %r", parts[0])
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            cid = msg['container']
            containers = self.container_manager.containers
            if cid not in containers:
                logger.error("unknown container %r, dropping: %r", cid, msg)
                return
            self.application_manager.register(msg, containers[cid])
        elif event == 'threads':
            app = self._registered_app(msg)
            if app is not None:
                app.update_threads(msg)
        elif event == 'progress':
            app = self._registered_app(msg)
            if app is not None:
                app.update_progress(msg)
        elif event == 'phase_context':
            app = self._registered_app(msg)
            if app is not None:
                c = self.container_manager.containers[app.cid]
                # Phase contexts only matter when a power policy is active.
                if c.power['policy']:
                    app.update_phase_context(msg)
        elif event == 'exit':
            if msg['uuid'] in self.application_manager.applications:
                self.application_manager.delete(msg['uuid'])
        else:
            logger.error("unknown event: %r", event)
            return

    def _registered_app(self, msg):
        """Return the application named by ``msg['uuid']``, or None."""
        uuid = msg['uuid']
        applications = self.application_manager.applications
        if uuid in applications:
            return applications[uuid]
        return None

    def do_upstream_receive(self, msg, client):
        """Dispatch one upstream RPC request.

        :param msg: decoded request; ``msg.type`` selects the command
            ('setpower', 'run', 'kill' or 'list').
        :param client: identity of the requesting client, used to address
            replies on the RPC server.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            self._run_process(msg, client)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            self.container_manager.kill(msg.container_uuid)
            # no update here, as it will trigger child exit
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def _run_process(self, msg, client):
        """Handle a 'run' request: launch the process, wire up its output."""
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        container_uuid = container.uuid

        # The first process of a container triggers its container_start.
        if len(container.processes) == 1:
            self._start_container(container)

        # now deal with the process itself
        update = {'api': 'up_rpc_rep',
                  'type': 'process_start',
                  'container_uuid': container_uuid,
                  'pid': pid,
                  }
        self.upstream_rpc_server.sendmsg(
            RPC_MSG['process_start'](**update), client)
        # Forward the child's stdout/stderr to the client until they close.
        outcb = partial(self.do_children_io, client, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, client, container_uuid, 'stderr')
        process = container.processes[pid]
        process.stdout.read_until_close(outcb, outcb)
        process.stderr.read_until_close(errcb, errcb)

    def _start_container(self, container):
        """Initialise power handling for a new container and announce it."""
        power = container.power
        if power['policy']:
            power['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    power['policy'],
                    float(power['damper']),
                    float(power['slowdown']))
        if power['profile']:
            # Copy the sample so the 'time' key is not written back into
            # self.machine_info, which the controller also reads.
            start = dict(self.machine_info['energy']['energy'])
            start['time'] = self.machine_info['time']
            power['profile']['start'] = start

        update = {'api': 'up_pub',
                  'type': 'container_start',
                  'container_uuid': container.uuid,
                  # NOTE(review): container was dereferenced above, so this
                  # is effectively always 0 -- confirm how create() reports
                  # failures.
                  'errno': 0 if container else -1,
                  'power': power['policy'] or dict(),
                  }
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_start'](**update))

    def do_children_io(self, client, container_uuid, io, data):
        """Forward a chunk of a child's output to the client that ran it.

        Bound per child with ``functools.partial`` so that ``client``,
        ``container_uuid`` and ``io`` ('stdout' or 'stderr') are fixed;
        ``data`` is the chunk read. An empty chunk is sent as 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh machine_info and publish total power and the target."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (KeyError, TypeError):
            # Missing keys or a non-mapping at any level both mean the
            # sensor output does not have the expected shape.
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
        else:
            msg = {'api': 'up_pub',
                   'type': 'power',
                   'total': total_power,
                   'limit': self.target
                   }
            self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
            logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Plan against the target, act on the plan, then run policies."""
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler for SIGINT and SIGCHLD (installed in main).

        The real work is deferred to the IOLoop through
        ``add_callback_from_signal`` instead of running in the handler.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap children whose state changed and report those that ended.

        Scheduled by :meth:`do_signal` on SIGCHLD. Polls ``os.wait3`` with
        ``WNOHANG`` until no further child has a pending state change.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # Typically ECHILD: there is no child left to wait for.
                break
            if pid == 0:
                # WNOHANG and no child changed state.
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # Only exits (normal or by signal) are handled.
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._on_process_exit(pid, status)

    def _on_process_exit(self, pid, status):
        """Report a finished process and tear down its container if empty."""
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        msg = {'api': 'up_rpc_rep',
               'type': 'process_exit',
               'status': str(status),
               'container_uuid': container.uuid,
               }
        self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_exit'](**msg), clientid)
        # Remove the pid of process that is finished
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finised.",
                    pid, container.uuid)

        # if this is the last process in the container, kill everything
        if len(container.processes) == 0:
            self._on_container_exit(container)

    def _on_container_exit(self, container):
        """Reset power policy, collect profile data, delete and announce."""
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            msg['profile_data'] = self._profile_data(container)
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(PUB_MSG['container_exit'](**msg))

    def _profile_data(self, container):
        """Return the energy/temperature profile of an exiting container.

        Takes a fresh sensor sample, stores it as the profile's 'end' entry
        and returns ``sensor_manager.calc_difference(start, end)`` with an
        added 'temp' list: one 'pkg' temperature per key of
        ``machine_info['temperature']``.
        """
        profile = container.power['profile']
        self.machine_info = self.sensor_manager.do_update()
        # Copy so the 'time' key is not written into self.machine_info.
        end = dict(self.machine_info['energy']['energy'])
        end['time'] = self.machine_info['time']
        profile['end'] = end
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(profile['start'], end)
        # Final package temperatures. Build a list: on Python 3 map() is a
        # lazy single-use iterator, unsuitable as a message payload.
        temp = self.machine_info['temperature']
        diff['temp'] = [temp[k]['pkg'] for k in temp]
        logger.info("Container %r profile data: %r", container.uuid, diff)
        return diff

    def do_shutdown(self):
        """Stop the sensor manager and the IOLoop (ends :meth:`main`)."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers and timers, then run the event loop.

        Blocks in ``IOLoop.start()`` until :meth:`do_shutdown` stops the
        loop (scheduled on SIGINT).
        """
        # Bind address for the upstream TCP servers
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # Empty prefix subscribes to every message; the option value must
        # be bytes for pyzmq on Python 3 (b"" is also "" on Python 2).
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates (period in milliseconds)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Run the NRM daemon in the foreground until it is shut down.

    Installs the pyzmq IOLoop and enables DEBUG-level logging, then hands
    control to ``Daemon.main``, which blocks while the event loop runs.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()