daemon.py 11.3 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16

17

18 19
# Module-level logger shared by every function and method in this file;
# all messages are emitted under the 'nrm' logger name.
logger = logging.getLogger('nrm')

20 21 22

class Daemon(object):
    def __init__(self):
23
        self.target = 100.0
24

25 26 27 28 29 30 31 32 33 34 35 36 37
    def do_downstream_receive(self, parts):
        """Handle one message received on the downstream SUB stream.

        parts is the list of frames of the message. Exactly one frame is
        expected, holding a JSON object with 'type' equal to 'application'
        and an 'event' key. Anything else is logged and dropped, so that a
        bad message never raises out of the stream callback.

        Events handled:
          'start'         register the application with its container
          'threads'       forward the message to app.update_threads()
          'progress'      forward the message to app.update_progress()
          'phase_context' forward the message to app.update_phase_context()
                          when the container has a power policy
          'exit'          delete the application
        Messages about an application uuid that is not registered are
        silently ignored for 'threads', 'progress' and 'phase_context'.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except (TypeError, ValueError):
            # Malformed payload: drop it instead of failing the callback.
            logger.error("invalid JSON, dropping message: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            containers = self.container_manager.containers
            container_uuid = msg.get('container')
            if container_uuid not in containers:
                logger.error("unknown container in message: %r", msg)
                return
            self.application_manager.register(msg, containers[container_uuid])
        elif event in ('threads', 'progress', 'phase_context'):
            applications = self.application_manager.applications
            uuid = msg.get('uuid')
            if uuid not in applications:
                return
            app = applications[uuid]
            if event == 'threads':
                app.update_threads(msg)
            elif event == 'progress':
                app.update_progress(msg)
            else:
                # NOTE(review): the container is looked up with the
                # application uuid; confirm that applications and
                # containers really share identifiers.
                containers = self.container_manager.containers
                if uuid not in containers:
                    logger.error("no container for application: %r", uuid)
                    return
                if containers[uuid].powerpolicy['policy'] != "NONE":
                    app.update_phase_context(self, msg)
        elif event == 'exit':
            if 'uuid' not in msg:
                logger.error("wrong message format: %r", msg)
                return
            self.application_manager.delete(msg['uuid'])
        else:
            logger.error("unknown event: %r", event)
            return
63

Swann Perarnau's avatar
Swann Perarnau committed
64
    def do_upstream_receive(self, parts):
        """Handle one command message received on the upstream SUB stream.

        parts is the list of frames of the message. Exactly one frame is
        expected, holding a JSON object with a 'command' key. Malformed
        messages are logged and dropped instead of raising out of the
        stream callback.

        Commands handled:
          'setpower'  store float(msg['limit']) as the new target
          'run'       create the container msg['uuid'] unless it already
                      exists, publish a 'start' event upstream and hook
                      its stdout/stderr to do_children_io()
          'kill'      ask the container manager to kill msg['uuid']
          'list'      publish the container list upstream
        """
        logger.info("receiving upstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except (TypeError, ValueError):
            # Malformed payload: drop it instead of failing the callback.
            logger.error("invalid JSON, dropping message: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        command = msg.get('command')
        # TODO: switch to a dispatch dictionary
        if command is None:
            logger.error("missing command in message: %r", msg)
            return
        if command in ('run', 'kill') and 'uuid' not in msg:
            logger.error("missing uuid in message: %r", msg)
            return

        if command == 'setpower':
            try:
                self.target = float(msg['limit'])
            except (KeyError, TypeError, ValueError):
                logger.error("invalid power limit in message: %r", msg)
                return
            logger.info("new target measure: %g", self.target)
        elif command == 'run':
            container_uuid = msg['uuid']
            if container_uuid in self.container_manager.containers:
                logger.info("container already created: %r",
                            container_uuid)
                return

            logger.info("new container required: %r", msg)
            container = self.container_manager.create(msg)
            if container.powerpolicy['policy'] != "NONE":
                container.powerpolicy['manager'] = PowerPolicyManager(
                        container.resources['cpus'],
                        container.powerpolicy['policy'],
                        float(container.powerpolicy['damper']),
                        float(container.powerpolicy['slowdown']))
            # TODO: obviously we need to send more info than that
            # NOTE(review): container has already been dereferenced at
            # this point, so a None result from create() fails before
            # 'errno' is computed; confirm what create() returns on
            # failure.
            update = {'type': 'container',
                      'event': 'start',
                      'uuid': container_uuid,
                      'errno': 0 if container else -1,
                      'pid': container.process.pid,
                      'powerpolicy': container.powerpolicy['policy']
                      }
            self.upstream_pub.send_json(update)
            # setup io callbacks: the same partial is passed for both
            # callback arguments of read_until_close()
            outcb = partial(self.do_children_io, container_uuid, 'stdout')
            errcb = partial(self.do_children_io, container_uuid, 'stderr')
            container.process.stdout.read_until_close(outcb, outcb)
            container.process.stderr.read_until_close(errcb, errcb)
        elif command == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg['uuid'])
        elif command == 'list':
            logger.info("asked for container list: %r", msg)
            update = {'type': 'container',
                      'event': 'list',
                      'payload': self.container_manager.list(),
                      }
            self.upstream_pub.send_json(update)
        else:
            logger.error("invalid command: %r", command)
122

123 124 125 126 127 128 129 130 131 132 133 134
    def do_children_io(self, uuid, io, data):
        """Publish upstream a piece of output read from a child.

        uuid and io (the stream name) are meant to be pre-bound with
        functools.partial for each child, leaving data as the only
        argument supplied by the reader. Empty data is reported with the
        payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        payload = data if data else 'eof'
        self.upstream_pub.send_json({
            'type': 'container',
            'event': io,
            'uuid': uuid,
            'payload': payload,
        })

135
    def do_sensor(self):
        """Refresh the machine state and publish the power reading.

        Stores the result of sensor_manager.do_update() in
        self.machine_info, then sends a 'power' message upstream carrying
        the total power read from it and the current target as 'limit'.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)
145 146

    def do_control(self):
147 148
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
149
        if action:
150 151
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
152 153 154
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)
155 156

    def do_signal(self, signum, frame):
        """Signal handler: hand the real work over to the IOLoop.

        SIGINT schedules do_shutdown() and SIGCHLD schedules
        do_children(); any other signal is logged as an error. frame is
        unused.
        """
        if signum == signal.SIGINT:
            callback = self.do_shutdown
        elif signum == signal.SIGCHLD:
            callback = self.do_children
        else:
            logger.error("wrong signal: %d", signum)
            return
        ioloop.IOLoop.current().add_callback_from_signal(callback)
163 164 165 166 167

    def do_children(self):
        """Reap terminated children and report container exits upstream.

        Polls os.wait3() without blocking until it reports nothing more
        or raises OSError. For each reaped pid that belongs to a container
        and has exited or been killed by a signal, the container's power
        policy manager is reset (when it has a policy), the container is
        deleted and an 'exit' event is published upstream.
        """
        # find out if children have terminated
        while True:
            try:
                pid, status, _rusage = os.wait3(os.WNOHANG)
            except OSError:
                break
            if pid == 0 and status == 0:
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            if container.powerpolicy['policy'] != "NONE":
                container.powerpolicy['manager'].reset_all()
            self.container_manager.delete(container.uuid)
            msg = {'type': 'container',
                   'event': 'exit',
                   'status': status,
                   'uuid': container.uuid,
                   }
            self.upstream_pub.send_json(msg)
192 193

    def do_shutdown(self):
        """Stop the sensor manager, then stop the current IOLoop.

        Scheduled by do_signal() on SIGINT; main() starts the IOLoop
        that is stopped here.
        """
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers and callbacks, then run the IOLoop.

        Binds a PUB/SUB socket pair for downstream clients (ipc) and one
        for upstream clients (tcp), wraps them in ZMQStreams, creates the
        managers and the controller, starts the sensors, registers the
        periodic sensor and control callbacks (every 1000 ms) and the
        SIGINT/SIGCHLD handlers, and finally starts the current IOLoop.
        """
        # Bind address for the upstream TCP sockets
        bind_address = '*'

        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # Empty subscription filters (receive everything). They are byte
        # strings because pyzmq rejects text strings for SUBSCRIBE on
        # Python 3; on Python 2 b"" and "" are the same object type.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        # take a first reading so that do_control() has a machine state
        # even if it runs before the first do_sensor()
        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the zmq IOLoop, log at DEBUG, run a Daemon."""
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()