# daemon.py
from __future__ import print_function

# Standard library.
from functools import partial
import json
import logging
import os
import signal

# Third-party: ZeroMQ bindings and their tornado-based event loop.
import zmq
from zmq.eventloop import ioloop, zmqstream

# Project-local managers and controllers.
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from powerpolicy import PowerPolicyManager
from resources import ResourceManager
from sensor import SensorManager

# Module-wide logger shared by every handler below.
logger = logging.getLogger('nrm')


class Daemon(object):
    """Event-loop daemon bridging applications, containers and clients.

    ``main`` binds two pairs of ZeroMQ PUB/SUB sockets (a "downstream" pair
    over IPC and an "upstream" pair over TCP), creates the resource,
    container, application and sensor managers plus the controller, and then
    runs the IOLoop.  The ``do_*`` methods are the callbacks wired into that
    loop: socket receive handlers, periodic sensor/control ticks, child
    process I/O and signal handling.
    """

    def __init__(self):
        # Power target: reported upstream as 'limit', handed to the
        # controller on every control tick, and changed by 'setpower'.
        self.target = 100.0

    @staticmethod
    def _decode_message(parts):
        """Decode a single-frame JSON message.

        Returns the decoded object, or None when ``parts`` does not hold
        exactly one frame or that frame is not valid JSON.  Both cases are
        logged and the message is dropped instead of raising out of the
        event-loop callback.
        """
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return None
        try:
            return json.loads(parts[0])
        except ValueError:
            logger.error("invalid JSON message, dropping it: %r", parts)
            return None

    def _energy_snapshot(self):
        """Return a copy of the latest energy reading, stamped with its time.

        A shallow copy is taken so that adding the 'time' key does not
        modify ``self.machine_info`` and so that a stored snapshot does not
        alias the dict held by the live sensor state.
        """
        snapshot = dict(self.machine_info['energy']['energy'])
        snapshot['time'] = self.machine_info['time']
        return snapshot

    def do_downstream_receive(self, parts):
        """Handle one message received on the downstream SUB socket.

        Only dict messages with ``type == 'application'`` and an ``event``
        are processed.  Supported events: 'start', 'threads', 'progress',
        'phase_context' and 'exit'.  Malformed messages, unknown containers
        and unknown events are logged and dropped; messages for an unknown
        application uuid are ignored.
        """
        logger.info("receiving downstream message: %r", parts)
        msg = self._decode_message(parts)
        if not isinstance(msg, dict):
            return

        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            try:
                cid = msg['container']
                container = self.container_manager.containers[cid]
            except (KeyError, TypeError):
                logger.error("start event for unknown container: %r", msg)
                return
            self.application_manager.register(msg, container)
            return

        if event not in ('threads', 'progress', 'phase_context', 'exit'):
            logger.error("unknown event: %r", event)
            return

        # Every remaining event targets an already registered application.
        uuid = msg.get('uuid')
        applications = self.application_manager.applications
        if uuid not in applications:
            return
        app = applications[uuid]

        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        elif event == 'phase_context':
            try:
                c = self.container_manager.containers[app.cid]
            except KeyError:
                logger.error("phase context for unknown container: %r", msg)
                return
            # Phase contexts are only forwarded when a power policy is set.
            if c.power['policy']:
                app.update_phase_context(msg)
        else:  # 'exit'
            self.application_manager.delete(uuid)

    def do_upstream_receive(self, parts):
        """Handle one command received on the upstream SUB socket.

        Supported commands: 'setpower' (update the power target), 'run'
        (create a container / start a process), 'kill' and 'list'.
        Malformed or incomplete messages are logged and dropped.
        """
        logger.info("receiving upstream message: %r", parts)
        msg = self._decode_message(parts)
        if not isinstance(msg, dict):
            return

        command = msg.get('command')
        # TODO: switch to a dispatch dictionary
        if command is None:
            logger.error("missing command in message: %r", msg)
            return

        if command == 'setpower':
            try:
                limit = float(msg['limit'])
            except (KeyError, TypeError, ValueError):
                logger.error("invalid power limit in message: %r", msg)
                return
            self.target = limit
            logger.info("new target measure: %g", self.target)
        elif command == 'run':
            self._run_container(msg)
        elif command == 'kill':
            logger.info("asked to kill container: %r", msg)
            if 'uuid' not in msg:
                logger.error("missing uuid in kill message: %r", msg)
                return
            self.container_manager.kill(msg['uuid'])
            # no update here, as it will trigger child exit
        elif command == 'list':
            logger.info("asked for container list: %r", msg)
            update = {'type': 'container',
                      'event': 'list',
                      'payload': self.container_manager.list(),
                      }
            self.upstream_pub.send_json(update)
        else:
            logger.error("invalid command: %r", command)

    def _run_container(self, msg):
        """Serve a 'run' command: create the process and announce it upstream.

        Publishes a 'start' event when the new process is the only one in
        its container, 'process_start' otherwise, and hooks the child's
        stdout/stderr streams to ``do_children_io``.
        """
        logger.info("new container will be created if it doesn't "
                    "exist: %r", msg)
        pid, container = self.container_manager.create(msg)
        if container is None:
            # NOTE(review): the original errno expression suggests create()
            # may return no container on failure; avoid dereferencing it.
            logger.error("container creation failed: %r", msg)
            return
        cid = container.uuid
        clientid = container.clientids[pid]

        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'uuid': cid,
                  'clientid': clientid,
                  'errno': 0 if container else -1,
                  'pid': pid,
                  }

        if len(container.processes) == 1:
            # First process of the container: set up power management.
            update['event'] = 'start'
            power = container.power
            if power['policy']:
                power['manager'] = PowerPolicyManager(
                        container.resources['cpus'],
                        power['policy'],
                        float(power['damper']),
                        float(power['slowdown']))
            if power['profile']:
                # Baseline for the energy profile computed at container exit.
                power['profile']['start'] = self._energy_snapshot()
            update['power'] = power['policy']
        else:
            update['event'] = 'process_start'

        # setup io callbacks
        outcb = partial(self.do_children_io, clientid, cid, 'stdout')
        errcb = partial(self.do_children_io, clientid, cid, 'stderr')
        container.processes[pid].stdout.read_until_close(outcb, outcb)
        container.processes[pid].stderr.read_until_close(errcb, errcb)

        self.upstream_pub.send_json(update)

    def do_children_io(self, clientid, uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis.  Empty data is
        reported as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        # On Python 3 the stream hands over bytes, which JSON cannot encode.
        # On Python 2 bytes is str, so this branch is never taken.
        if isinstance(data, bytes) and not isinstance(data, str):
            data = data.decode('utf-8', 'replace')
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'clientid': clientid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Periodic tick: refresh sensor data and publish total power."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Periodic tick: let the controller plan, act and run policies."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the IOLoop.

        SIGINT schedules ``do_shutdown`` and SIGCHLD schedules
        ``do_children``; any other signal is logged as an error.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report exits of tracked processes."""
        # find out if children have terminated
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # Raised by wait3, e.g. when there is no child left.
                break
            if pid == 0 and status == 0:
                # Children exist but none has changed state.
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._handle_child_exit(pid, status)

    def _handle_child_exit(self, pid, status):
        """Publish the exit of tracked process ``pid`` and clean up its state.

        When it was the last process of its container, an 'exit' event is
        sent (with profile data if profiling is enabled) and the container
        is deleted; otherwise a 'process_exit' event is sent and only the
        process is forgotten.
        """
        container = self.container_manager.pids[pid]
        msg = {'type': 'container',
               'status': status,
               'uuid': container.uuid,
               'clientid': container.clientids[pid],
               }

        if any(p != pid for p in container.processes):
            msg['event'] = 'process_exit'
            # Remove the pid of process that is finished
            container.processes.pop(pid, None)
            self.container_manager.pids.pop(pid, None)
            logger.info("Process %s in Container %s has finised.",
                        pid, container.uuid)
        else:
            msg['event'] = 'exit'
            pp = container.power
            if pp['policy']:
                pp['manager'].reset_all()
            if pp['profile']:
                # Take a fresh reading so the profile covers the whole run.
                self.machine_info = self.sensor_manager.do_update()
                end = self._energy_snapshot()
                start = pp['profile']['start']
                # Calculate difference between the values
                diff = self.sensor_manager.calc_difference(start, end)
                # Get final package temperature
                temp = self.machine_info['temperature']
                # A list (not a lazy map object) so it can be JSON-encoded.
                diff['temp'] = [temp[k]['pkg'] for k in temp]
                logger.info("Container %r profile data: %r",
                            container.uuid, diff)
                msg['profile_data'] = diff
            self.container_manager.delete(container.uuid)

        self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensor manager and the IOLoop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self, bind_address='*', upstream_pub_port=2345,
             upstream_sub_port=3456):
        """Bind the sockets, create the managers and run the event loop.

        :param bind_address: address the upstream TCP sockets bind to.
        :param upstream_pub_port: PUB port for upstream clients.
        :param upstream_sub_port: SUB port for upstream clients.

        The defaults reproduce the previously hard-coded values.  This call
        blocks until the IOLoop is stopped (see ``do_shutdown``).
        """
        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # An empty filter subscribes to every message.  It must be a byte
        # string: pyzmq rejects text for this option on Python 3.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # Prime machine_info so the first control tick has data to work on.
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: configure the event loop and logging, then run a Daemon.

    Calls ``ioloop.install()`` before anything touches the IOLoop, turns on
    DEBUG-level logging, and hands control to ``Daemon.main``.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()