daemon.py 11.4 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16

17

18 19
logger = logging.getLogger('nrm')

20 21 22

class Daemon(object):
    """NRM daemon: routes messages between upstream clients, downstream
    applications and the resource/power managers.

    Upstream clients use a TCP PUB/SUB pair, applications use IPC sockets
    under /tmp. Everything runs from the zmq ioloop started in main():
    socket callbacks, periodic sensor/control callbacks and deferred
    signal handling.
    """

    def __init__(self):
        # Power limit published upstream and used as the controller target;
        # changed by the 'setpower' upstream command. Sockets and managers
        # are only created in main().
        self.target = 100.0

    def _decode_message(self, parts):
        """Decode a single-frame JSON message received on a zmq stream.

        :param parts: list of frames as delivered by ZMQStream.on_recv.
        :returns: the decoded dict, or None when the message is dropped
                  (wrong frame count, invalid JSON, not a JSON object);
                  the reason is logged in every case.
        """
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return None
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; a misbehaving peer must not raise into the ioloop.
            logger.error("invalid json, dropping it: %r", parts)
            return None
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return None
        return msg

    def do_downstream_receive(self, parts):
        """Handle an event sent by an application on the downstream socket.

        Expected messages are JSON objects with 'type' == 'application' and
        an 'event' among 'start', 'threads', 'progress', 'phase_context' and
        'exit'. Events about untracked applications are ignored; malformed
        messages are logged and dropped.
        """
        logger.info("receiving downstream message: %r", parts)
        msg = self._decode_message(parts)
        if msg is None:
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        apps = self.application_manager.applications
        containers = self.container_manager.containers
        if event == 'start':
            container_uuid = msg['container']
            if container_uuid not in containers:
                logger.error("unknown container: %r", container_uuid)
                return
            self.application_manager.register(msg, containers[container_uuid])
        elif event == 'threads':
            uuid = msg['uuid']
            if uuid in apps:
                apps[uuid].update_threads(msg)
        elif event == 'progress':
            uuid = msg['uuid']
            if uuid in apps:
                apps[uuid].update_progress(msg)
        elif event == 'phase_context':
            uuid = msg['uuid']
            if uuid in apps:
                app = apps[uuid]
                # do_children() deletes a container when its process exits,
                # which can happen while the application is still registered.
                if app.container_uuid not in containers:
                    return
                c = containers[app.container_uuid]
                if c.powerpolicy['policy'] != "NONE":
                    app.update_phase_context(msg)
        elif event == 'exit':
            uuid = msg['uuid']
            if uuid in apps:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown event: %r", event)

    def do_upstream_receive(self, parts):
        """Handle a command sent by an upstream client.

        Expected messages are JSON objects whose 'command' is one of
        'setpower', 'run', 'kill' or 'list', each handled by the matching
        _upstream_* method. Malformed messages are logged and dropped.
        """
        logger.info("receiving upstream message: %r", parts)
        msg = self._decode_message(parts)
        if msg is None:
            return
        command = msg.get('command')
        if command is None:
            logger.error("missing command in message: %r", msg)
            return
        handlers = {
            'setpower': self._upstream_setpower,
            'run': self._upstream_run,
            'kill': self._upstream_kill,
            'list': self._upstream_list,
        }
        try:
            handler = handlers.get(command)
        except TypeError:
            # unhashable command (e.g. a JSON list or object)
            handler = None
        if handler is None:
            logger.error("invalid command: %r", command)
            return
        handler(msg)

    def _upstream_setpower(self, msg):
        """Set the power target from msg['limit']."""
        try:
            self.target = float(msg['limit'])
        except (KeyError, TypeError, ValueError):
            logger.error("invalid power limit in message: %r", msg)
            return
        logger.info("new target measure: %g", self.target)

    def _upstream_run(self, msg):
        """Create the container described by msg, report the outcome
        upstream and forward the container's stdout/stderr upstream."""
        container_uuid = msg['uuid']
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        if not container:
            # Presumably create() signals failure with a falsy result (the
            # errno field implies it); report it before touching any
            # container attribute, which would otherwise raise.
            logger.error("failed to create container: %r", container_uuid)
            self.upstream_pub.send_json({'type': 'container',
                                         'event': 'start',
                                         'uuid': container_uuid,
                                         'errno': -1,
                                         })
            return

        if container.powerpolicy['policy'] != "NONE":
            container.powerpolicy['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    container.powerpolicy['policy'],
                    float(container.powerpolicy['damper']),
                    float(container.powerpolicy['slowdown']))

        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0,
                  'pid': container.process.pid,
                  'powerpolicy': container.powerpolicy['policy'],
                  }
        self.upstream_pub.send_json(update)

        # setup io callbacks: each partial is passed as both the final and
        # the streaming callback of read_until_close (tornado IOStream API)
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def _upstream_kill(self, msg):
        """Ask the container manager to kill container msg['uuid']."""
        logger.info("asked to kill container: %r", msg)
        self.container_manager.kill(msg['uuid'])
        # no update here, as it will trigger child exit

    def _upstream_list(self, msg):
        """Publish the container manager's list() result upstream."""
        logger.info("asked for container list: %r", msg)
        response = self.container_manager.list()
        update = {'type': 'container',
                  'event': 'list',
                  'payload': response,
                  }
        self.upstream_pub.send_json(update)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis: uuid and io
        ('stdout'/'stderr') are bound with functools.partial. Empty data is
        published as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Periodic callback: refresh machine_info from the sensors and
        publish the total power together with the current limit upstream."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Periodic callback: plan an (action, actuator) pair toward
        self.target, execute it if any, then run the power policies when
        containers exist."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler for SIGINT and SIGCHLD.

        Only schedules the real work on the ioloop, through the ioloop API
        intended to be called from signal handlers.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report container exits upstream.

        Loops because several child state changes can be coalesced into a
        single SIGCHLD delivery.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # typically ECHILD: no children left to wait for
                break
            if pid == 0:
                # children exist, but none has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only exits (normal or by signal) matter
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue
            container = self.container_manager.pids[pid]
            if container.powerpolicy['policy'] != "NONE":
                container.powerpolicy['manager'].reset_all()
            self.container_manager.delete(container.uuid)
            msg = {'type': 'container',
                   'event': 'exit',
                   'status': status,
                   'uuid': container.uuid,
                   }
            self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensors and the ioloop, which makes main() return."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Bind all sockets, create the managers and run the ioloop.

        Blocks until do_shutdown() stops the ioloop.
        """
        # Bind address for upstream clients (the tcp sockets below)
        bind_address = '*'

        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # SUBSCRIBE filters must be bytes (a str is rejected on Python 3);
        # the empty filter subscribes to every message.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # initial reading, so do_control has machine_info before the first
        # do_sensor tick
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates (period in milliseconds)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Command-line entry point.

    Installs the zmq ioloop, enables DEBUG logging, then builds a Daemon
    and runs it (blocking until the daemon shuts down).
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()