daemon.py 10.7 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16

17

18 19
# Logger shared by the whole daemon module, published under the 'nrm' name.
logger = logging.getLogger('nrm')

20 21 22

class Daemon(object):
    """NRM daemon: bridges local applications and upstream clients.

    Applications ("downstream") talk to the daemon over IPC PUB/SUB sockets,
    upstream clients over TCP PUB/SUB sockets.  All work (message handling,
    periodic sensing/control, child reaping) is driven by the IOLoop that
    :meth:`main` starts.
    """

    def __init__(self):
        # Target value set by the upstream 'setpower' command; reported in
        # every sensor message and handed to the controller when planning.
        self.target = 100.0

    def _decode_message(self, parts, direction):
        """Decode a single-frame JSON message received on a SUB stream.

        :param parts: list of frames delivered by the ZMQStream.
        :param direction: 'downstream' or 'upstream', used in log messages.
        :returns: the decoded dict, or None (after logging an error) when the
            message has the wrong number of frames, is not valid JSON, or is
            not a JSON object.
        """
        logger.info("receiving %s message: %r", direction, parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return None
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # Covers JSON decode errors (and undecodable bytes); a bad message
            # from a client must not propagate into the IOLoop.
            logger.error("invalid JSON, dropping it: %r", parts)
            return None
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return None
        return msg

    def _find_application(self, msg):
        """Return the registered application named by ``msg['uuid']``.

        Returns None when the message has no uuid or the uuid is unknown.
        """
        uuid = msg.get('uuid')
        applications = self.application_manager.applications
        if uuid in applications:
            return applications[uuid]
        return None

    def do_downstream_receive(self, parts):
        """Handle an application event received on the downstream socket.

        Expects one frame holding a JSON object whose ``type`` is
        ``'application'`` and whose ``event`` is one of ``start``,
        ``threads``, ``progress``, ``phase_context`` or ``exit``.  Malformed
        messages and unknown events are logged and dropped; events for
        unknown applications are ignored.
        """
        msg = self._decode_message(parts, 'downstream')
        if msg is None:
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            self.application_manager.register(msg)
        elif event == 'threads':
            app = self._find_application(msg)
            if app is not None:
                app.update_threads(msg)
        elif event == 'progress':
            app = self._find_application(msg)
            if app is not None:
                app.update_progress(msg)
        elif event == 'phase_context':
            # TODO: Take appropriate action on the application's phase context
            pass
        elif event == 'exit':
            self.application_manager.delete(msg['uuid'])
        else:
            logger.error("unknown event: %r", event)

    def do_upstream_receive(self, parts):
        """Handle a command received from an upstream client.

        Expects one frame holding a JSON object with a ``command`` key;
        supported commands are ``setpower``, ``run``, ``kill`` and ``list``.
        Malformed messages and unknown commands are logged and dropped.
        """
        msg = self._decode_message(parts, 'upstream')
        if msg is None:
            return
        command = msg.get('command')
        if command is None:
            logger.error("missing command in message: %r", msg)
            return

        handlers = {
            'setpower': self._upstream_setpower,
            'run': self._upstream_run,
            'kill': self._upstream_kill,
            'list': self._upstream_list,
        }
        try:
            handler = handlers.get(command)
        except TypeError:
            # Unhashable command value (e.g. a JSON list): treat as invalid.
            handler = None
        if handler is None:
            logger.error("invalid command: %r", command)
            return
        handler(msg)

    def _upstream_setpower(self, msg):
        """'setpower' command: set the target from ``msg['limit']``."""
        self.target = float(msg['limit'])
        logger.info("new target measure: %g", self.target)

    def _upstream_run(self, msg):
        """'run' command: create the container described by ``msg``.

        Publishes a ``container``/``start`` event upstream (``errno`` 0 on
        success, -1 when the container manager failed to create it), then
        forwards the container's stdout/stderr upstream via
        :meth:`do_children_io`.
        """
        container_uuid = msg['uuid']
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        if not container:
            # create() may fail with a falsy result: report it upstream
            # instead of dereferencing a missing container.
            logger.error("failed to create container: %r", container_uuid)
            self.upstream_pub.send_json({'type': 'container',
                                         'event': 'start',
                                         'uuid': container_uuid,
                                         'errno': -1,
                                         'pid': None,
                                         'powerpolicy': None,
                                         })
            return

        if container.powerpolicy['policy']:
            container.powerpolicy['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    container.powerpolicy['policy'],
                    container.powerpolicy['damper'],
                    container.powerpolicy['slowdown'])

        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0,
                  'pid': container.process.pid,
                  'powerpolicy': container.powerpolicy['policy'],
                  }
        self.upstream_pub.send_json(update)

        # setup io callbacks: the same partial is passed as both callbacks
        # of read_until_close, so every chunk and the final close are
        # forwarded upstream.
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def _upstream_kill(self, msg):
        """'kill' command: kill the container named by ``msg['uuid']``.

        No update is published here: the resulting child exit is reported
        by :meth:`do_children`.
        """
        logger.info("asked to kill container: %r", msg)
        self.container_manager.kill(msg['uuid'])

    def _upstream_list(self, msg):
        """'list' command: publish the container manager's list upstream."""
        logger.info("asked for container list: %r", msg)
        update = {'type': 'container',
                  'event': 'list',
                  'payload': self.container_manager.list(),
                  }
        self.upstream_pub.send_json(update)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis.  Empty/None data
        is reported with the payload ``'eof'``.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine_info and publish a 'power' message upstream.

        The message carries the total power reading and the current target.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Plan against the current target and apply the action, if any.

        When the controller proposes an action, it is executed and then
        passed to ``controller.update``.
        """
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD handling to the IOLoop.

        Only add_callback_from_signal is used here, as it is the IOLoop
        call meant to be used from a signal handler.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report container exits upstream.

        Repeatedly calls ``os.wait3(os.WNOHANG)`` until no further child
        status is available.  Exits (normal or by signal) of container
        processes are published as ``container``/``exit`` events.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. no child processes left to wait for
                break
            if pid == 0:
                # WNOHANG: children exist but none has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # only pids belonging to containers are of interest
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # ignore state changes other than termination
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            self.container_manager.delete(container.uuid)
            msg = {'type': 'container',
                   'event': 'exit',
                   'status': status,
                   'uuid': container.uuid,
                   }
            self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensor manager, then stop the IOLoop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers and callbacks, then run the IOLoop.

        Blocks until :meth:`do_shutdown` stops the loop (triggered by SIGINT).
        """
        # Bind address for upstream (TCP) clients
        bind_address = '*'
        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application (downstream) and client (upstream) sockets
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # An empty SUBSCRIBE filter accepts every message.  The filter must
        # be bytes: pyzmq rejects text for this option on Python 3, and b""
        # is the same empty str on Python 2.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # initial reading so do_control has machine_info before do_sensor runs
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates and control decisions (every 1000 ms)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install pyzmq's IOLoop, log at DEBUG, run a Daemon.

    Returns only once the daemon's IOLoop has been stopped.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()