# daemon.py
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from functools import partial
7
import json
8
import logging
9
import os
10
from resources import ResourceManager
11
from sensor import SensorManager
12 13 14
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
# Logger shared by every Daemon handler; its output is configured by the
# logging.basicConfig call in runner().
logger = logging.getLogger(name='nrm')


class Daemon(object):
    """NRM daemon: bridges applications, upstream clients and the controller.

    main() binds four ZeroMQ sockets, all driven by the ioloop:

    * downstream (ipc): applications publish JSON events, handled by
      do_downstream_receive; the downstream PUB stream is handed to the
      ApplicationActuator.
    * upstream (tcp): clients send JSON commands, handled by
      do_upstream_receive; container, sensor and child-exit updates are
      published back on the upstream PUB stream.

    Every 1000 ms do_sensor refreshes machine_info and do_control lets the
    controller act on it. SIGINT stops the loop; SIGCHLD reaps children.
    """

    def __init__(self):
        # Control target: replaced by the 'setpower' command and published
        # as 'limit' in power messages. Presumably watts -- TODO confirm.
        self.target = 100.0

    def do_downstream_receive(self, parts):
        """Handle one message published by an application.

        ``parts`` is the list of frames received on the downstream SUB
        socket. Exactly one frame holding a JSON object is expected, with
        ``type == 'application'`` and an ``event`` field. Malformed messages
        are logged and dropped.
        """
        logger.info("receiving downstream message: %r", parts)
        try:
            msg = self._parse_message(parts)
        except ValueError as err:
            logger.error("%s", err)
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return
        if event == 'start':
            self.application_manager.register(msg)
        elif event == 'exit':
            self.application_manager.delete(msg['uuid'])
        elif event in ('threads', 'progress', 'power_policy'):
            uuid = msg['uuid']
            # updates for applications that were never registered are ignored
            if uuid not in self.application_manager.applications:
                return
            app = self.application_manager.applications[uuid]
            if event == 'threads':
                app.update_threads(msg)
            elif event == 'progress':
                app.update_progress(msg)
            # TODO: for 'power_policy', invoke the appropriate power policy
        else:
            logger.error("unknown event: %r", event)

    def do_upstream_receive(self, parts):
        """Handle one command sent by an upstream client.

        ``parts`` is the list of frames received on the upstream SUB socket.
        Exactly one frame holding a JSON object with a ``command`` field is
        expected. Known commands are dispatched to the matching ``_do_*``
        handler; anything else is logged and dropped.
        """
        logger.info("receiving upstream message: %r", parts)
        try:
            msg = self._parse_message(parts)
        except ValueError as err:
            logger.error("%s", err)
            return
        command = msg.get('command')
        if command is None:
            logger.error("missing command in message: %r", msg)
            return
        handlers = {'setpower': self._do_setpower,
                    'run': self._do_run,
                    'kill': self._do_kill,
                    'list': self._do_list,
                    }
        try:
            handler = handlers.get(command)
        except TypeError:
            # unhashable command (e.g. a JSON list): cannot be a known one
            handler = None
        if handler is None:
            logger.error("invalid command: %r", command)
            return
        handler(msg)

    @staticmethod
    def _parse_message(parts):
        """Decode a single-frame JSON object message and return it as a dict.

        Raises ValueError, with a message suitable for logging, when there is
        not exactly one frame, the frame is not valid JSON, or the decoded
        value is not a JSON object.
        """
        if len(parts) != 1:
            raise ValueError("unexpected msg length, dropping it: %r"
                             % (parts,))
        try:
            msg = json.loads(parts[0])
        except ValueError:
            raise ValueError("invalid JSON, dropping it: %r" % (parts,))
        if not isinstance(msg, dict):
            raise ValueError("unexpected msg format, dropping it: %r"
                             % (msg,))
        return msg

    def _do_setpower(self, msg):
        """'setpower' command: replace the control target with msg['limit']."""
        self.target = float(msg['limit'])
        logger.info("new target measure: %g", self.target)

    def _do_run(self, msg):
        """'run' command: create container msg['uuid'] and relay its output.

        Publishes a container 'start' update upstream. On success the update
        carries errno 0, the child pid and its power policy, and the child's
        stdout/stderr are forwarded through do_children_io. If the container
        manager fails to create the container, only errno -1 is reported.
        """
        container_uuid = msg['uuid']
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        if not container:
            # creation failed: report it instead of dereferencing
            # container.process on a missing container
            self.upstream_pub.send_json({'type': 'container',
                                         'event': 'start',
                                         'uuid': container_uuid,
                                         'errno': -1,
                                         })
            return

        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0,
                  'pid': container.process.pid,
                  'powerpolicy': container.powerpolicy['policy'],
                  }
        self.upstream_pub.send_json(update)

        # setup io callbacks: the same callback receives streamed chunks and
        # the final data at close
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def _do_kill(self, msg):
        """'kill' command: ask the container manager to kill msg['uuid']."""
        logger.info("asked to kill container: %r", msg)
        self.container_manager.kill(msg['uuid'])
        # no update here: the child exit is reported by do_children

    def _do_list(self, msg):
        """'list' command: publish the container manager's list upstream."""
        logger.info("asked for container list: %r", msg)
        update = {'type': 'container',
                  'event': 'list',
                  'payload': self.container_manager.list(),
                  }
        self.upstream_pub.send_json(update)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it up the pipe.

        Meant to be partially applied per child (uuid, io) in _do_run.
        ``io`` is 'stdout' or 'stderr'; falsy ``data`` is published as the
        payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine_info and publish the total power and the target."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Ask the controller for a plan and apply it when it has an action."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD work to the ioloop."""
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap children that changed state and report container exits.

        Polls os.wait3 with WNOHANG until no child status is pending (or
        wait3 fails, e.g. because there are no children left).
        """
        while True:
            try:
                pid, status, _rusage = os.wait3(os.WNOHANG)
            except OSError:
                break
            if pid == 0 and status == 0:
                # WNOHANG: no child has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # only pids owned by the container manager are of interest
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only report actual terminations (normal exit or signal)
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                container = self.container_manager.pids[pid]
                self.container_manager.delete(container.uuid)
                msg = {'type': 'container',
                       'event': 'exit',
                       'status': status,
                       'uuid': container.uuid,
                       }
                self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensor manager, then stop the ioloop (ends main())."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, timers and signal handlers, then run.

        Blocks in the ioloop until do_shutdown stops it.
        """
        # Bind address for the upstream TCP sockets
        bind_address = '*'

        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # SUBSCRIBE filters must be bytes: pyzmq rejects str on Python 3.
        # The empty filter subscribes to every message.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        # take a first reading so do_control has machine_info from the start
        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install pyzmq's ioloop, enable DEBUG logging, run a Daemon.

    Does not return until Daemon.main()'s ioloop is stopped.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()