daemon.py 13.7 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
# Message constructors for the two upstream APIs, looked up by API name in
# MSGTYPES.  Both are indexed by message type (e.g. RPC_MSG['list']) and the
# result is called with keyword fields to build the message that is sent.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
21

22 23
# Module-wide logger; every message of this file goes through the 'nrm' logger.
logger = logging.getLogger('nrm')

24 25 26

class Daemon(object):
    def __init__(self):
27
        self.target = 100.0
28

29 30 31 32 33 34 35 36 37 38 39 40 41
    def do_downstream_receive(self, parts):
        """Handle one multipart message received on the downstream socket.

        The message must consist of exactly one frame holding a JSON object
        with ``type == 'application'`` and an ``event`` field.  Supported
        events are 'start', 'threads', 'progress', 'phase_context' and
        'exit'.  Anything malformed is logged and dropped: this runs as an
        event-loop callback, so a bad message from one application must not
        raise out of it.

        Args:
            parts: list of frames of the received message.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # json.loads signals undecodable input with ValueError (or a
            # subclass of it).
            logger.error("invalid JSON, dropping it: %r", parts[0])
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            try:
                container = self.container_manager.containers[msg['container']]
            except (KeyError, TypeError):
                # either the field is missing or it names no known container
                logger.error("missing or unknown container: %r", msg)
                return
            self.application_manager.register(msg, container)
        elif event == 'threads':
            app = self._find_application(msg)
            if app is not None:
                app.update_threads(msg)
        elif event == 'progress':
            app = self._find_application(msg)
            if app is not None:
                app.update_progress(msg)
        elif event == 'phase_context':
            app = self._find_application(msg)
            if app is not None:
                c = self.container_manager.containers[app.cid]
                # phase contexts are only recorded for containers that have
                # a power policy set
                if c.power['policy']:
                    app.update_phase_context(msg)
        elif event == 'exit':
            app = self._find_application(msg)
            if app is not None:
                self.application_manager.delete(msg['uuid'])
        else:
            logger.error("unknown event: %r", event)
            return

    def _find_application(self, msg):
        """Return the registered application named by msg['uuid'], or None.

        A message without a uuid is logged as an error; a uuid that is not
        registered is ignored silently.
        """
        uuid = msg.get('uuid')
        if uuid is None:
            logger.error("missing uuid in message: %r", msg)
            return None
        applications = self.application_manager.applications
        if uuid not in applications:
            return None
        return applications[uuid]
69

70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
    def do_upstream_receive(self, msg, client):
        """Dispatch one upstream RPC request to the handler for its type.

        Known types are 'setpower', 'run', 'kill' and 'list'; any other type
        is logged as an error and ignored.

        Args:
            msg: the decoded request; its ``type`` attribute selects the
                handler, which reads the remaining attributes.
            client: identity of the requester, passed back to
                ``upstream_rpc_server.sendmsg`` to address the replies.
        """
        if msg.type == 'setpower':
            self._handle_setpower(msg, client)
        elif msg.type == 'run':
            self._handle_run(msg, client)
        elif msg.type == 'kill':
            self._handle_kill(msg)
        elif msg.type == 'list':
            self._handle_list(msg, client)
        else:
            logger.error("invalid command: %r", msg.type)

    def _handle_setpower(self, msg, client):
        """Store the requested limit as the new target and echo it back."""
        self.target = float(msg.limit)
        logger.info("new target measure: %g", self.target)
        # the acknowledgement is sent as a 'getpower' message
        update = {'api': 'up_rpc_rep',
                  'type': 'getpower',
                  'limit': str(self.target)
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                         client)

    def _handle_run(self, msg, client):
        """Launch the requested command in a container and forward its I/O."""
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        # use the uuid of the container actually returned by the manager
        container_uuid = container.uuid
        # Container-level setup and the 'start' reply only happen when this
        # is the container's sole process.
        if len(container.processes) == 1:
            if container.power['policy']:
                container.power['manager'] = PowerPolicyManager(
                        container.resources['cpus'],
                        container.power['policy'],
                        float(container.power['damper']),
                        float(container.power['slowdown']))
            if container.power['profile']:
                p = container.power['profile']
                # NOTE(review): 'start' aliases the dict held in
                # self.machine_info, so 'time' is written there as well --
                # confirm this is intended.
                p['start'] = self.machine_info['energy']['energy']
                p['start']['time'] = self.machine_info['time']
            update = {'api': 'up_rpc_rep',
                      'type': 'start',
                      'container_uuid': container_uuid,
                      'errno': 0 if container else -1,
                      'power': container.power['policy'] or dict()
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
                                             client)

        # now deal with the process itself
        update = {'api': 'up_rpc_rep',
                  'type': 'process_start',
                  'container_uuid': container_uuid,
                  'pid': pid,
                  }
        self.upstream_rpc_server.sendmsg(
            RPC_MSG['process_start'](**update), client)
        # setup io callbacks; each stream gets do_children_io pre-bound with
        # the client, the container and the stream name, and the same
        # callable is registered for both callback slots of read_until_close
        outcb = partial(self.do_children_io, client, container_uuid,
                        'stdout')
        errcb = partial(self.do_children_io, client, container_uuid,
                        'stderr')
        container.processes[pid].stdout.read_until_close(outcb, outcb)
        container.processes[pid].stderr.read_until_close(errcb, errcb)

    def _handle_kill(self, msg):
        """Ask the container manager to kill the named container."""
        logger.info("asked to kill container: %r", msg)
        # no update here, as it will trigger child exit
        self.container_manager.kill(msg.container_uuid)

    def _handle_list(self, msg, client):
        """Reply with the container manager's current container list."""
        logger.info("asked for container list: %r", msg)
        response = self.container_manager.list()
        update = {'api': 'up_rpc_rep',
                  'type': 'list',
                  'payload': response,
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                         client)
141

142
    def do_children_io(self, client, container_uuid, io, data):
        """Relay a chunk of output from a child process to its client.

        Designed to be pre-bound per child stream with functools.partial, so
        that only ``data`` remains to be supplied by the stream callback.

        Args:
            client: identity of the client the message is sent to.
            container_uuid: uuid of the container the child belongs to.
            io: stream name; used both as message type and as RPC_MSG key.
            data: the bytes read; an empty value is reported as 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        payload = data if data else 'eof'
        make_msg = RPC_MSG[io]
        reply = make_msg(api='up_rpc_rep',
                         type=io,
                         container_uuid=container_uuid,
                         payload=payload)
        self.upstream_rpc_server.sendmsg(reply, client)
153

154
    def do_sensor(self):
        """Refresh ``machine_info`` and publish the total power reading.

        The published 'power' message carries the measured total together
        with the current target as 'limit'.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)

        power = self.machine_info['energy']['power']
        msg = {
            'api': 'up_pub',
            'type': 'power',
            'total': power['total'],
            'limit': self.target,
        }
        packet = PUB_MSG['power'](**msg)
        self.upstream_pub_server.sendmsg(packet)
        logger.info("sending sensor message: %r", msg)
165 166

    def do_control(self):
167 168
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
169
        if action:
170 171
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
172 173 174
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)
175 176

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the IOLoop.

        SIGINT schedules ``do_shutdown`` and SIGCHLD schedules
        ``do_children``; any other signal is logged as an error.

        Args:
            signum: number of the signal received.
            frame: current stack frame (unused).
        """
        if signum == signal.SIGINT:
            deferred = self.do_shutdown
        elif signum == signal.SIGCHLD:
            deferred = self.do_children
        else:
            logger.error("wrong signal: %d", signum)
            return
        ioloop.IOLoop.current().add_callback_from_signal(deferred)
183 184 185 186 187

    def do_children(self):
        """Reap terminated children and notify the clients that own them.

        Children are collected with a non-blocking ``os.wait3`` until none
        is left to report.  For each reaped pid known to the container
        manager that exited or was killed by a signal, a 'process_exit'
        message is sent; when it was the last process of its container, the
        container is torn down as well.
        """
        # find out if children have terminated
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. no child process left to wait for
                break
            if pid == 0 and status == 0:
                # children exist, but none has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._handle_process_exit(pid, status)

    def _handle_process_exit(self, pid, status):
        """Report the exit of a tracked process and forget about its pid."""
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        msg = {'api': 'up_rpc_rep',
               'type': 'process_exit',
               'status': str(status),
               'container_uuid': container.uuid,
               }
        self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_exit'](**msg), clientid)
        # Remove the pid of process that is finished
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finised.",
                    pid, container.uuid)

        if not container.processes:
            self._handle_container_exit(container, clientid)

    def _handle_container_exit(self, container, clientid):
        """Tear down a container without processes and send its 'exit'."""
        msg = {'api': 'up_rpc_rep',
               'type': 'exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        pp = container.power
        if pp['policy']:
            pp['manager'].reset_all()
        if pp['profile']:
            # take a fresh reading to serve as the end point of the profile
            self.machine_info = self.sensor_manager.do_update()
            e = self.machine_info['energy']['energy']
            e['time'] = self.machine_info['time']
            s = pp['profile']['start']
            # Calculate difference between the values
            diff = self.sensor_manager.calc_difference(s, e)
            # Get final package temperature; built as a real list so the
            # value is the same on Python 2 and Python 3 (map() is lazy on
            # the latter)
            temp = self.machine_info['temperature']
            diff['temp'] = [temp[k]['pkg'] for k in temp]
            logger.info("Container %r profile data: %r",
                        container.uuid, diff)
            msg['profile_data'] = diff
        self.container_manager.delete(container.uuid)
        self.upstream_rpc_server.sendmsg(
                RPC_MSG['exit'](**msg), clientid)
246 247

    def do_shutdown(self):
        """Stop the sensor manager first, then stop the current IOLoop."""
        self.sensor_manager.stop()
        loop = ioloop.IOLoop.current()
        loop.stop()

    def main(self):
        """Set up sockets, managers, timers and signals, then run the loop.

        In order: binds the downstream PUB/SUB ipc sockets, creates the
        upstream PUB and RPC servers, registers the receive callbacks,
        instantiates the managers and the controller, starts the sensors,
        schedules ``do_sensor`` and ``do_control`` every 1000 ms, installs
        the SIGINT/SIGCHLD handlers and finally starts the IOLoop.  The call
        returns only once the loop has been stopped.
        """
        # Bind address of the upstream TCP endpoints ('*' is the zmq
        # wildcard address)
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # Empty filter: subscribe to every message.  Given as bytes because
        # pyzmq rejects text option values on Python 3; on Python 2 b"" is
        # the same object type as "".
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        # take a first reading so machine_info exists before any callback
        # that reads it can fire
        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        # setup periodic control steps
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the zmq ioloop, enable logging, run a Daemon.

    Logging is configured at DEBUG level before the daemon is created.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()