daemon.py 14 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
21

22 23
logger = logging.getLogger('nrm')

24 25 26

class Daemon(object):
    """Event-driven daemon tying managers, controller and messaging together.

    The daemon reacts to four kinds of events, all dispatched by the ioloop:

    - downstream messages sent by applications (do_downstream_receive),
    - upstream RPC requests sent by clients (do_upstream_receive),
    - periodic timers (do_sensor, do_control),
    - signals (do_signal, then do_shutdown or do_children).

    The managers, the upstream servers, the controller and ``machine_info``
    are created by main(); none of the callbacks may run before main() has
    set them up.
    """

    # downstream events that refer to an already registered application
    _APPLICATION_EVENTS = ('threads', 'progress', 'phase_context', 'exit')

    def __init__(self):
        # target handed to the controller and published as 'limit';
        # replaced by upstream 'setpower' requests
        self.target = 100.0
        # container uuid -> id of the client whose 'run' request produced
        # the first process of that container
        self.container_owner = dict()

    def _find_container(self, cid):
        """Return the container known under cid, or None after logging."""
        try:
            return self.container_manager.containers[cid]
        except (KeyError, TypeError):
            logger.error("unknown container: %r", cid)
            return None

    def do_downstream_receive(self, parts):
        """Handle one multipart message received from an application.

        A valid message is a single frame holding a JSON object whose
        'type' is 'application' and whose 'event' is one of 'start',
        'threads', 'progress', 'phase_context' or 'exit'. Every event but
        'start' is ignored when its 'uuid' is not a registered application.

        Malformed messages (bad frame count, invalid JSON, missing fields,
        unknown container) are logged and dropped instead of raising, so a
        misbehaving application cannot make the ioloop callback fail.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except (TypeError, ValueError):
            logger.error("invalid json, dropping it: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            container = self._find_container(msg.get('container'))
            if container is not None:
                self.application_manager.register(msg, container)
            return

        if event not in self._APPLICATION_EVENTS:
            logger.error("unknown event: %r", event)
            return

        uuid = msg.get('uuid')
        if uuid is None:
            logger.error("wrong message format: %r", msg)
            return
        applications = self.application_manager.applications
        if uuid not in applications:
            # event for an application we never registered: nothing to do
            return

        if event == 'exit':
            self.application_manager.delete(uuid)
            return

        app = applications[uuid]
        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        else:
            # phase_context: only relevant when the application's
            # container has a power policy
            container = self._find_container(app.cid)
            if container is not None and container.power['policy']:
                app.update_phase_context(msg)

    def do_upstream_receive(self, msg, client):
        """Dispatch an upstream RPC request to the matching handler.

        msg is a message object exposing at least a ``type`` attribute;
        client identifies the sender and is used to address the replies.
        Unknown request types are logged and otherwise ignored.
        """
        if msg.type == 'setpower':
            self._handle_setpower(msg, client)
        elif msg.type == 'run':
            self._handle_run(msg, client)
        elif msg.type == 'kill':
            self._handle_kill(msg, client)
        elif msg.type == 'list':
            self._handle_list(msg, client)
        else:
            logger.error("invalid command: %r", msg.type)

    def _handle_setpower(self, msg, client):
        """Store the new target and echo it back as a 'getpower' reply."""
        self.target = float(msg.limit)
        logger.info("new target measure: %g", self.target)
        update = {'api': 'up_rpc_rep',
                  'type': 'getpower',
                  'limit': str(self.target)
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                         client)

    def _handle_run(self, msg, client):
        """Start a process through the container manager and report it.

        When the new process is the only one in its container, the
        container is set up and announced first (see _announce_container).
        A 'process_start' reply is then sent to client and the process
        output streams are forwarded through do_children_io.
        """
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        # always use the uuid of the container actually returned
        container_uuid = container.uuid
        if len(container.processes) == 1:
            self._announce_container(container, client)

        # now deal with the process itself
        update = {'api': 'up_rpc_rep',
                  'type': 'process_start',
                  'container_uuid': container_uuid,
                  'pid': pid,
                  }
        self.upstream_rpc_server.sendmsg(
            RPC_MSG['process_start'](**update), client)

        # setup io callbacks: the same callback is registered twice per
        # stream, as in both positional arguments of read_until_close
        process = container.processes[pid]
        outcb = partial(self.do_children_io, client, container_uuid,
                        'stdout')
        errcb = partial(self.do_children_io, client, container_uuid,
                        'stderr')
        process.stdout.read_until_close(outcb, outcb)
        process.stderr.read_until_close(errcb, errcb)

    def _announce_container(self, container, client):
        """Set up power management for a container and publish its start.

        Creates the power policy manager and records the profile start
        values when the container asks for them, publishes a
        'container_start' message and records client as container owner.
        """
        power = container.power
        if power['policy']:
            power['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    power['policy'],
                    float(power['damper']),
                    float(power['slowdown']))
        if power['profile']:
            profile = power['profile']
            profile['start'] = self.machine_info['energy']['energy']
            profile['start']['time'] = self.machine_info['time']
        update = {'api': 'up_pub',
                  'type': 'container_start',
                  'container_uuid': container.uuid,
                  'errno': 0 if container else -1,
                  'power': power['policy'] or dict()
                  }
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_start'](**update))
        self.container_owner[container.uuid] = client

    def _handle_kill(self, msg, client):
        """Ask the container manager to kill the requested container."""
        logger.info("asked to kill container: %r", msg)
        self.container_manager.kill(msg.container_uuid)
        # no update here, as it will trigger child exit

    def _handle_list(self, msg, client):
        """Reply to client with the container manager's container list."""
        logger.info("asked for container list: %r", msg)
        update = {'api': 'up_rpc_rep',
                  'type': 'list',
                  'payload': self.container_manager.list(),
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                         client)

    def do_children_io(self, client, container_uuid, io, data):
        """Forward output received from a child process to its client.

        Meant to be used through functools.partial, with client,
        container_uuid and io ('stdout' or 'stderr') bound per process, so
        that the stream only has to supply data. Empty data is reported
        with the payload 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh machine_info and publish total power and the target."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'api': 'up_pub',
               'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control step, then the container power policies."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler: defer the actual work to the ioloop.

        SIGINT schedules do_shutdown and SIGCHLD schedules do_children;
        any other signal is logged as an error.
        """
        loop = ioloop.IOLoop.current()
        if signum == signal.SIGINT:
            loop.add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            loop.add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Collect every child that changed state and process its update."""
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # wait3 failed (e.g. no child process): nothing to collect
                break
            if pid == 0 and status == 0:
                # WNOHANG: no child has a pending status change
                break
            logger.info("child update %d: %r", pid, status)
            self._handle_child_update(pid, status)

    def _handle_child_update(self, pid, status):
        """Report the termination of a tracked process.

        Updates for unknown pids, or that are not a termination, are
        ignored. Otherwise a 'process_exit' reply is sent to the client
        that started the process and the pid is forgotten; when that client
        owns the container, the whole container is torn down as well.
        """
        # check if its a pid we care about
        if pid not in self.container_manager.pids:
            logger.debug("child update ignored")
            return
        # check if this is an exit
        if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
            return

        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        msg = {'api': 'up_rpc_rep',
               'type': 'process_exit',
               'status': str(status),
               'container_uuid': container.uuid,
               }
        self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_exit'](**msg), clientid)
        # Remove the pid of process that is finished
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finised.",
                    pid, container.uuid)

        # if this process was owner of the container, kill everything.
        # The ownership entry may already be gone when several processes
        # of the same container exit, hence the membership test.
        if (container.uuid in self.container_owner and
                self.container_owner[container.uuid] == clientid):
            self._finish_container(container)

    def _finish_container(self, container):
        """Tear down a container and publish its 'container_exit' message.

        Resets the power policy manager if there is one, attaches profile
        data when profiling was requested, deletes the container and drops
        its ownership record.
        """
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            diff = self._profile_data(power['profile'])
            logger.info("Container %r profile data: %r",
                        container.uuid, diff)
            msg['profile_data'] = diff
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_exit'](**msg))
        self.container_owner.pop(container.uuid, None)

    def _profile_data(self, profile):
        """Return the difference between now and the profile start values.

        Refreshes machine_info, lets the sensor manager compute the
        difference and adds the current 'pkg' temperatures under 'temp'.
        """
        self.machine_info = self.sensor_manager.do_update()
        end = self.machine_info['energy']['energy']
        end['time'] = self.machine_info['time']
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(profile['start'], end)
        # Get final package temperature; a real list (not a lazy map
        # object) so the result is the same on Python 2 and 3
        temp = self.machine_info['temperature']
        diff['temp'] = [temp[k]['pkg'] for k in temp]
        return diff

    def do_shutdown(self):
        """Stop the sensors, then the ioloop (which makes main() return)."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Create sockets, managers and timers, then run the ioloop.

        Does not return until the ioloop is stopped (see do_shutdown).
        """
        # address used to build the upstream tcp endpoints
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # empty filter: subscribe to everything. Bytes, so that the call
        # is valid on both Python 2 and Python 3.
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # first reading, so that machine_info exists before any callback
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates (period in milliseconds)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point of the daemon.

    Installs the zmq ioloop, configures logging at DEBUG level, then builds
    a Daemon and runs its main().
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()