daemon.py 12.4 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16

17

18 19
logger = logging.getLogger('nrm')

20 21 22

class Daemon(object):
    """NRM daemon bridging upstream clients, downstream applications and
    the container / power-management machinery over ZeroMQ.

    Socket messages, periodic timers and SIGCHLD/SIGINT follow-ups are all
    dispatched from the ioloop started by ``main``.
    """

    def __init__(self):
        # Power target reported upstream in 'power' messages and handed to
        # the controller; replaced by the upstream 'setpower' command.
        self.target = 100.0

    def _decode_message(self, parts):
        """Decode a single-frame JSON message received on a ZMQ stream.

        Returns the decoded dict, or None (after logging an error) when the
        message does not have exactly one frame, is not valid JSON, or does
        not decode to a JSON object.
        """
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return None
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both
            # ValueError subclasses on Python 3; Python 2 raises ValueError.
            logger.error("invalid json, dropping it: %r", parts)
            return None
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return None
        return msg

    def do_downstream_receive(self, parts):
        """Handle an application event received on the downstream socket.

        Expects one JSON frame with ``type == 'application'`` and an
        ``event`` key ('start', 'threads', 'progress', 'phase_context' or
        'exit'); the other keys used depend on the event.
        """
        logger.info("receiving downstream message: %r", parts)
        msg = self._decode_message(parts)
        if msg is None:
            return
        msgtype = msg.get('type')
        event = msg.get('event')
        if msgtype != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            container_uuid = msg.get('container')
            containers = self.container_manager.containers
            if container_uuid not in containers:
                logger.error("unknown container for application: %r", msg)
                return
            self.application_manager.register(msg, containers[container_uuid])
            return

        if event not in ('threads', 'progress', 'phase_context', 'exit'):
            logger.error("unknown event: %r", event)
            return

        # The remaining events all target an already-registered application.
        uuid = msg.get('uuid')
        applications = self.application_manager.applications
        if uuid not in applications:
            logger.debug("ignoring %r event for unknown application %r",
                         event, uuid)
            return
        app = applications[uuid]

        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        elif event == 'phase_context':
            container = self.container_manager.containers[app.container_uuid]
            # Phase contexts only matter when the container has a policy.
            if container.power['policy']:
                app.update_phase_context(msg)
        else:  # 'exit'
            self.application_manager.delete(uuid)

    def do_upstream_receive(self, parts):
        """Handle a command received from an upstream client.

        Supported commands: 'setpower', 'run', 'kill' and 'list'.
        """
        logger.info("receiving upstream message: %r", parts)
        msg = self._decode_message(parts)
        if msg is None:
            return
        command = msg.get('command')
        # TODO: switch to a dispatch dictionary
        if command is None:
            logger.error("missing command in message: %r", msg)
            return

        if command == 'setpower':
            try:
                self.target = float(msg['limit'])
            except (KeyError, TypeError, ValueError):
                logger.error("invalid power limit in message: %r", msg)
                return
            logger.info("new target measure: %g", self.target)
        elif command == 'run':
            self._run_container(msg)
        elif command == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg['uuid'])
        elif command == 'list':
            logger.info("asked for container list: %r", msg)
            update = {'type': 'container',
                      'event': 'list',
                      'payload': self.container_manager.list(),
                      }
            self.upstream_pub.send_json(update)
        else:
            logger.error("invalid command: %r", command)

    def _run_container(self, msg):
        """Create the container requested by a 'run' command.

        Publishes a container 'start' event upstream. On success, it also
        wires the child's stdout/stderr to ``do_children_io``.
        """
        container_uuid = msg['uuid']
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        if not container:
            # create() is treated as returning a falsy value on failure
            # (TODO confirm against ContainerManager); report errno -1
            # upstream instead of dereferencing it.
            logger.error("failed to create container: %r", container_uuid)
            self.upstream_pub.send_json({'type': 'container',
                                         'event': 'start',
                                         'uuid': container_uuid,
                                         'errno': -1,
                                         'pid': None,
                                         'power': None,
                                         })
            return

        power = container.power
        if power['policy']:
            power['manager'] = PowerPolicyManager(
                container.resources['cpus'],
                power['policy'],
                float(power['damper']),
                float(power['slowdown']))
        if power['profile']:
            # Copy the reading so the 'time' key is not written into the
            # sensor dict still referenced by self.machine_info.
            start = dict(self.machine_info['energy']['energy'])
            start['time'] = self.machine_info['time']
            power['profile']['start'] = start

        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0,
                  'pid': container.process.pid,
                  'power': power['policy'],
                  }
        self.upstream_pub.send_json(update)

        # setup io callbacks
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it upstream.

        Meant to be bound per child with functools.partial (uuid, io);
        an empty/None ``data`` is reported as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine_info and publish a 'power' message upstream."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control step: plan, apply any action, run policies."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD handling to the ioloop."""
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap children whose state changed and report container exits.

        Loops on a non-blocking os.wait3 until no pending child status
        remains (or waiting fails, typically with ECHILD).
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                break
            if pid == 0 and status == 0:
                # WNOHANG: no child has changed state.
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only exits (normal or by signal) are reported
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                container = self.container_manager.pids[pid]
                self._report_container_exit(container, status)

    def _report_container_exit(self, container, status):
        """Reset power state, delete *container* and publish its exit."""
        pp = container.power
        if pp['policy']:
            pp['manager'].reset_all()
        msg = {'type': 'container',
               'event': 'exit',
               'status': status,
               'uuid': container.uuid,
               }
        if pp['profile']:
            msg['profile_data'] = self._profile_data(container)
        self.container_manager.delete(container.uuid)
        self.upstream_pub.send_json(msg)

    def _profile_data(self, container):
        """Return the sensor difference since *container* started.

        Takes a fresh sensor reading and diffs it against the start
        snapshot taken in _run_container. Adds the final per-package
        temperatures under 'temp'.
        """
        self.machine_info = self.sensor_manager.do_update()
        # Copy so 'time' is not written into the live sensor dict.
        end = dict(self.machine_info['energy']['energy'])
        end['time'] = self.machine_info['time']
        start = container.power['profile']['start']
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(start, end)
        # Get final package temperature
        temperatures = self.machine_info['temperature']
        diff['temp'] = self._package_temperatures(temperatures)
        logger.info("Container %r profile data: %r", container.uuid, diff)
        return diff

    @staticmethod
    def _package_temperatures(temperatures):
        """Return the 'pkg' reading of each entry of *temperatures*.

        Returns a list (not a lazy ``map`` object) so the value stays JSON
        serializable on Python 3.
        """
        return [temperatures[key]['pkg'] for key in temperatures]

    def do_shutdown(self):
        """Stop the sensor manager and the ioloop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, timers and signal handlers, then run.

        Blocks in the ioloop until do_shutdown stops it.
        """
        # Bind address for the upstream TCP sockets
        bind_address = '*'

        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # SUBSCRIBE filters must be bytes: pyzmq rejects str on Python 3.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the ZMQ ioloop, enable debug logging, run a Daemon."""
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()