# daemon.py
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller
6
from functools import partial
7
import json
8
import logging
9
import os
10
from resources import ResourceManager
11
import sensor
12 13 14
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
15

16

17 18
# Module-level logger named 'nrm', used by the Daemon methods below.
logger = logging.getLogger('nrm')

19 20 21

class Daemon(object):
    def __init__(self):
22
        self.target = 1.0
23

24 25 26 27 28 29 30 31 32 33 34 35 36
    def do_downstream_receive(self, parts):
        """Handle one message received from a downstream application.

        parts is the list of frames of the message. Exactly one frame is
        expected, holding a JSON object whose 'type' is 'application' and
        whose 'event' is one of:

        - 'start': register the application with the application manager.
        - 'threads': forward the message to the application's
          update_threads(), if the uuid is known.
        - 'progress': forward the message to the application's
          update_progress(), if the uuid is known.
        - 'exit': delete the application from the application manager.

        Malformed messages are logged and dropped instead of raising.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # covers both undecodable bytes and invalid JSON
            logger.error("invalid json, dropping it: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            self.application_manager.register(msg)
            return
        if event not in ('threads', 'progress', 'exit'):
            logger.error("unknown event: %r", event)
            return

        # every remaining event targets an existing application
        uuid = msg.get('uuid')
        if uuid is None:
            logger.error("missing uuid in message: %r", msg)
            return
        if event == 'exit':
            self.application_manager.delete(uuid)
            return

        # updates for applications we do not know about are ignored
        if uuid in self.application_manager.applications:
            app = self.application_manager.applications[uuid]
            if event == 'threads':
                app.update_threads(msg)
            else:
                app.update_progress(msg)

    def do_upstream_receive(self, parts):
        """Handle one command message received from an upstream client.

        parts is the list of frames of the message. Exactly one frame is
        expected, holding a JSON object with a 'command' key:

        - 'setpower': store float(msg['limit']) as the new target.
        - 'run': create the container msg['uuid'] unless it already exists,
          publish a 'start' update and forward its stdout/stderr upstream.
        - 'kill': ask the container manager to kill msg['uuid'].
        - 'list': publish the container manager's list upstream.

        Malformed messages are logged and dropped instead of raising.
        """
        logger.info("receiving upstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # covers both undecodable bytes and invalid JSON
            logger.error("invalid json, dropping it: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        command = msg.get('command')
        # TODO: switch to a dispatch dictionary
        if command is None:
            logger.error("missing command in message: %r", msg)
            return

        if command == 'setpower':
            try:
                self.target = float(msg['limit'])
            except (KeyError, TypeError, ValueError):
                logger.error("invalid power limit in message: %r", msg)
                return
            logger.info("new target measure: %g", self.target)
        elif command == 'run':
            container_uuid = msg.get('uuid')
            if container_uuid is None:
                logger.error("missing uuid in message: %r", msg)
                return
            if container_uuid in self.container_manager.containers:
                logger.info("container already created: %r",
                            container_uuid)
                return

            logger.info("new container required: %r", msg)
            container = self.container_manager.create(msg)
            # TODO: obviously we need to send more info than that
            update = {'type': 'container',
                      'event': 'start',
                      'uuid': container_uuid,
                      'errno': 0 if container else -1,
                      # no process to report when the creation failed
                      'pid': container.process.pid if container else None,
                      }
            self.upstream_pub.send_json(update)
            if not container:
                logger.error("container creation failed: %r", container_uuid)
                return

            # setup io callbacks
            outcb = partial(self.do_children_io, container_uuid, 'stdout')
            errcb = partial(self.do_children_io, container_uuid, 'stderr')
            # NOTE(review): the same callback is passed twice, presumably as
            # both the final and the streaming callback of
            # read_until_close -- confirm against the stream API in use.
            container.process.stdout.read_until_close(outcb, outcb)
            container.process.stderr.read_until_close(errcb, errcb)
        elif command == 'kill':
            logger.info("asked to kill container: %r", msg)
            container_uuid = msg.get('uuid')
            if container_uuid is None:
                logger.error("missing uuid in message: %r", msg)
                return
            self.container_manager.kill(container_uuid)
            # no update here, as it will trigger child exit
        elif command == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'type': 'container',
                      'event': 'list',
                      'payload': response,
                      }
            self.upstream_pub.send_json(update)
        else:
            logger.error("invalid command: %r", command)

    def do_children_io(self, uuid, io, data):
        """Publish upstream a piece of output produced by a container.

        uuid identifies the container and io names the stream the data
        came from; both are meant to be bound ahead of time with
        functools.partial so that only data is left to the caller. When
        data is empty, the payload sent is 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        payload = data if data else 'eof'
        self.upstream_pub.send_json({
            'type': 'container',
            'event': io,
            'uuid': uuid,
            'payload': payload,
        })

118
    def do_sensor(self):
        """Take a new sensor measurement and publish the power upstream.

        The measurement is kept in self.machine_info, which do_control()
        reads. The message sent carries the total power taken from
        machine_info['energy']['power']['total'] and the current target
        as 'limit'.
        """
        self.machine_info = self.sensor.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
130 131 132 133 134
        action = self.controller.planify(self.target, self.machine_info)
        if action:
            msg = self.controller.execute(action)
            self.downstream_pub.send_json(msg)
            self.controller.update(action, msg)
135 136

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the ioloop.

        SIGINT schedules do_shutdown() and SIGCHLD schedules do_children()
        through add_callback_from_signal(); any other signal is logged as
        an error. frame is part of the signal handler signature and is
        not used.
        """
        if signum == signal.SIGINT:
            callback = self.do_shutdown
        elif signum == signal.SIGCHLD:
            callback = self.do_children
        else:
            logger.error("wrong signal: %d", signum)
            return
        ioloop.IOLoop.current().add_callback_from_signal(callback)

    def do_children(self):
        """Reap children that changed state and report container exits.

        Children are collected without blocking until none is left to
        report. For each pid known to the container manager that exited
        or was killed by a signal, the container is deleted and an 'exit'
        update carrying the raw wait status is published upstream. Other
        pids are ignored.
        """
        # find out if children have terminated
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # presumably raised when there is no child left to wait for
                break
            if pid == 0:
                # children exist, but none has a status to report
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                container = self.container_manager.pids[pid]
                self.container_manager.delete(container.uuid)
                msg = {'type': 'container',
                       'event': 'exit',
                       'status': status,
                       'uuid': container.uuid,
                       }
                self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensor, then stop the current ioloop.

        Scheduled by do_signal() when SIGINT is received.
        """
        self.sensor.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, sensor, timers and signals, then run.

        Binds the downstream (ipc) and upstream (tcp) PUB/SUB sockets,
        wraps them in ZMQStreams, creates the managers and the controller,
        takes a first sensor measurement, starts the periodic sensor and
        control callbacks, installs the SIGINT/SIGCHLD handlers and
        finally starts the ioloop, which runs until it is stopped.
        """
        # Bind address for downstream clients
        bind_address = '*'

        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # The subscription filters are empty so that no message is
        # filtered out. They are given as bytes: same value as "" on
        # Python 2, and the type pyzmq requires for setsockopt on Python 3.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        # this socket is bound above, not connected
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.controller = Controller(self.application_manager,
                                     self.container_manager,
                                     self.resource_manager)

        # create sensor manager and make first measurement
        self.sensor = sensor.SensorManager()
        self.sensor.start()
        self.machine_info = self.sensor.do_update()

        # setup periodic sensor updates (period given in milliseconds)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the zmq ioloop, enable debug logging, run a Daemon."""
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()