daemon.py 10.6 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from functools import partial
6
import json
7
import logging
8
import os
9
from resources import ResourceManager
10
import sensor
11 12 13
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
14

15

16 17
# Module-wide logger: every message emitted by the daemon goes through the
# 'nrm' logger (configured by logging.basicConfig in runner()).
logger = logging.getLogger('nrm')

18 19 20

class Daemon(object):
    """NRM daemon event hub.

    Relays messages between upstream clients (TCP PUB/SUB sockets),
    downstream applications (IPC PUB/SUB sockets) and the containers it
    launches, and runs a periodic control step that adjusts application
    thread counts against a power target. Everything runs as callbacks of
    the IOLoop started in main().
    """

    def __init__(self):
        # Power target compared against the measured total power in
        # do_control; updated by the upstream 'setpower' command. All other
        # state (sockets, managers, sensor) is created in main().
        self.target = 1.0

    # ------------------------------------------------------------------
    # message helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _decode(frame):
        """Parse one JSON frame into a dict.

        Returns None (after logging) when the frame is not valid JSON or
        does not hold a JSON object, so a malformed message cannot raise
        out of a stream callback.
        """
        try:
            msg = json.loads(frame)
        except ValueError:
            logger.error("invalid JSON, dropping it: %r", frame)
            return None
        if not isinstance(msg, dict):
            logger.error("unexpected msg type, dropping it: %r", msg)
            return None
        return msg

    @staticmethod
    def _get_uuid(msg):
        """Return msg['uuid'], or None (after logging) when it is absent."""
        uuid = msg.get('uuid')
        if uuid is None:
            logger.error("missing uuid in message: %r", msg)
        return uuid

    # ------------------------------------------------------------------
    # downstream (applications)
    # ------------------------------------------------------------------

    def do_downstream_receive(self, parts):
        """Handle one message received from an application.

        `parts` is the list of frames delivered by the downstream SUB
        stream; exactly one JSON frame holding an object with
        'type' == 'application' and an 'event' key is expected. Known
        events are 'start', 'threads', 'progress' and 'exit'; anything else
        is logged and dropped.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        msg = self._decode(parts[0])
        if msg is None:
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            self.application_manager.register(msg)
            return
        if event not in ('threads', 'progress', 'exit'):
            logger.error("unknown event: %r", event)
            return

        uuid = self._get_uuid(msg)
        if uuid is None:
            return
        if event == 'exit':
            self.application_manager.delete(uuid)
            return

        applications = self.application_manager.applications
        if uuid not in applications:
            # updates for applications we never registered are ignored
            return
        app = applications[uuid]
        if event == 'threads':
            app.update_threads(msg)
        else:
            app.update_progress(msg)

    # ------------------------------------------------------------------
    # upstream (clients)
    # ------------------------------------------------------------------

    def do_upstream_receive(self, parts):
        """Handle one command received from an upstream client.

        Exactly one JSON frame holding an object with a 'command' key is
        expected. Supported commands: 'setpower', 'run', 'kill', 'list'.
        Malformed messages and unknown commands are logged and dropped.
        """
        logger.info("receiving upstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        msg = self._decode(parts[0])
        if msg is None:
            return
        command = msg.get('command')
        if command is None:
            logger.error("missing command in message: %r", msg)
            return

        handlers = {'setpower': self._do_setpower,
                    'run': self._do_run,
                    'kill': self._do_kill,
                    'list': self._do_list,
                    }
        try:
            handler = handlers[command]
        except (KeyError, TypeError):
            # TypeError: an unhashable command value (e.g. a JSON list)
            logger.error("invalid command: %r", command)
            return
        handler(msg)

    def _do_setpower(self, msg):
        """'setpower': set the power target from msg['limit']."""
        try:
            self.target = float(msg['limit'])
        except (KeyError, TypeError, ValueError):
            logger.error("invalid setpower message: %r", msg)
            return
        logger.info("new target measure: %g", self.target)

    def _do_run(self, msg):
        """'run': create a container, report it upstream, forward its I/O."""
        container_uuid = self._get_uuid(msg)
        if container_uuid is None:
            return
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0 if container else -1,
                  # a failed creation has no process, hence no pid
                  'pid': container.process.pid if container else None,
                  }
        self.upstream_pub.send_json(update)
        if not container:
            return

        # forward the container's stdout/stderr upstream via do_children_io
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def _do_kill(self, msg):
        """'kill': ask the container manager to kill a container."""
        logger.info("asked to kill container: %r", msg)
        container_uuid = self._get_uuid(msg)
        if container_uuid is None:
            return
        # no update here: the child's exit is reported by do_children
        self.container_manager.kill(container_uuid)

    def _do_list(self, msg):
        """'list': send the container manager's listing upstream."""
        logger.info("asked for container list: %r", msg)
        update = {'type': 'container',
                  'event': 'list',
                  'payload': self.container_manager.list(),
                  }
        self.upstream_pub.send_json(update)

    # ------------------------------------------------------------------
    # children, sensor and control
    # ------------------------------------------------------------------

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it up the pipe.

        Meant to be partially applied per container and per stream
        ('stdout'/'stderr'); an empty or None `data` is reported as 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine_info from the sensor and publish a power update."""
        self.machine_info = self.sensor.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Nudge each application's thread count toward the power target.

        Below the target, applications that allow an increase ('i') are
        told to use one more thread; above it, those allowing a decrease
        ('d') are told to use one fewer. At exactly the target nothing is
        sent.
        """
        total_power = self.machine_info['energy']['power']['total']
        if total_power < self.target:
            transition, delta = 'i', 1
        elif total_power > self.target:
            transition, delta = 'd', -1
        else:
            return

        # items() instead of iteritems(): valid on both Python 2 and 3
        applications = self.application_manager.applications
        for identity, application in applications.items():
            if transition in application.get_allowed_thread_requests():
                update = {'type': 'application',
                          'command': 'threads',
                          'uuid': identity,
                          'event': 'threads',
                          'payload': application.threads['cur'] + delta,
                          }
                self.downstream_pub.send_json(update)
                application.do_thread_transition(transition)
            logger.info("application now in state: %s",
                        application.thread_state)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD handling to the IOLoop."""
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap children with pending status and report container exits.

        Loops over os.wait3(WNOHANG) until no status is pending or the call
        fails; for each pid owned by a container that exited or was killed
        by a signal, deletes the container and sends an 'exit' update
        upstream.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. ECHILD once there are no children left
                break
            if pid == 0:
                # WNOHANG: children exist but none has a pending status
                break

            logger.info("child update %d: %r", pid, status)
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                container = self.container_manager.pids[pid]
                self.container_manager.delete(container.uuid)
                msg = {'type': 'container',
                       'event': 'exit',
                       'status': status,
                       'uuid': container.uuid,
                       }
                self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop periodic work and the sensor, then stop the IOLoop."""
        # stop the periodic callbacks first so neither can fire against a
        # stopped sensor before the loop actually exits
        self.sensor_cb.stop()
        self.control.stop()
        self.sensor.stop()
        ioloop.IOLoop.current().stop()

    # ------------------------------------------------------------------
    # setup
    # ------------------------------------------------------------------

    def main(self):
        """Set up sockets, managers, sensor and callbacks, then run the loop.

        Downstream (applications): PUB on ipc:///tmp/nrm-downstream-out,
        SUB on ipc:///tmp/nrm-downstream-in. Upstream (clients): PUB on
        tcp://*:2345, SUB on tcp://*:3456. Both SUB sockets subscribe to
        every message. Returns when do_shutdown stops the IOLoop.
        """
        # Bind address for upstream (TCP) clients
        bind_address = '*'
        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup listening sockets
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # empty filter = receive everything; SUBSCRIBE needs bytes (a
        # unicode "" is rejected with TypeError under Python 3)
        sub_filter = b""
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()

        # create sensor manager and make first measurement
        self.sensor = sensor.SensorManager()
        self.sensor.start()
        self.machine_info = self.sensor.do_update()

        # setup periodic sensor updates and control steps (every 1000 ms)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: run an NRM Daemon with DEBUG-level logging.

    pyzmq's ioloop is installed first, before any loop is created, so that
    the IOLoop driven by Daemon.main() is the zmq-aware one. Returns once
    the daemon's main loop has been stopped.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()