daemon.py 10.8 KB
Newer Older
1 2
from __future__ import print_function

3 4
from containers import ContainerManager
from resources import ResourceManager
5
from functools import partial
6
import json
7
import logging
8 9
import os
import sensor
10 11 12
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
13

14

15 16 17 18 19 20 21 22
# Application state machine, as a mapping: state -> {message: next state}.
# Messages are fed through Application.do_transition: 'i' and 'd' are the
# requests issued by Daemon.do_control, while the other messages ('done',
# 'max', 'min') arrive in the payload of an application's 'threads' event.
# 'nop' has no outgoing transitions, so it is a terminal state.
application_fsm_table = dict(
    stable=dict(i='s_ask_i', d='s_ask_d'),
    s_ask_i=dict(done='stable', max='max'),
    s_ask_d=dict(done='stable', min='min'),
    max=dict(d='max_ask_d'),
    min=dict(i='min_ask_i'),
    max_ask_d=dict(done='stable', min='nop'),
    min_ask_i=dict(done='stable', max='nop'),
    nop=dict(),
)

# Shared logger for the whole daemon.
logger = logging.getLogger('nrm')

26

27
class Application(object):
    """State machine tracking one downstream application.

    The state starts at 'stable' and moves according to
    application_fsm_table. Daemon.do_control sends 'i' when total power is
    below the target and 'd' when it is above; the application's answers
    come back through the 'threads' event payload.
    """

    def __init__(self, identity):
        # identity is the application uuid taken from its 'start' message
        self.identity = identity
        self.state = 'stable'

    def do_transition(self, msg):
        """Advance the state machine on ``msg``.

        Messages that are not valid in the current state are logged and
        ignored, leaving the state unchanged.

        Returns True if a transition was taken, False otherwise.
        """
        transitions = application_fsm_table[self.state]
        if msg not in transitions:
            # not an error per se (e.g. a late answer), but make it visible
            logger.debug("application %r: ignoring %r in state %r",
                         self.identity, msg, self.state)
            return False
        self.state = transitions[msg]
        return True

    def get_allowed_requests(self):
        """Return the list of messages accepted in the current state."""
        # list() keeps the Python 2 return type on Python 3 (no dict view)
        return list(application_fsm_table[self.state])
41 42 43 44


class Daemon(object):
    """Node resource manager daemon.

    Relays JSON messages between downstream applications (IPC PUB/SUB
    sockets) and upstream clients (TCP PUB/SUB sockets), manages containers
    through a ContainerManager, samples the machine through a SensorManager
    and periodically asks applications to move towards the power target.
    """

    def __init__(self):
        # application uuid -> Application, filled by downstream 'start' events
        self.applications = {}
        self.buf = ''
        # target compared against total power, set by upstream 'setpower'
        self.target = 1.0

    @staticmethod
    def _to_frame(part):
        """Return ``part`` as bytes usable as a zmq frame (utf-8 encoded)."""
        # json.loads yields unicode identities; pyzmq only accepts bytes
        if isinstance(part, bytes):
            return part
        return part.encode('utf-8')

    @staticmethod
    def _parse_message(parts, direction):
        """Decode a single-part JSON message, or return None (after logging).

        ``direction`` is only used in log messages ('upstream'/'downstream').
        """
        logger.info("receiving %s message: %r", direction, parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return None
        try:
            msg = json.loads(parts[0])
        except ValueError:
            logger.error("invalid json, dropping it: %r", parts[0])
            return None
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return None
        return msg

    def do_downstream_receive(self, parts):
        """Handle an application message received on the downstream socket.

        Expects a JSON object with 'type' == 'application' and an 'event'
        among 'start', 'threads' and 'progress'. Malformed messages or
        messages about unknown applications are logged and dropped.
        """
        msg = self._parse_message(parts, 'downstream')
        if msg is None:
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return
        if event == 'start':
            logger.info("new application: %r", msg)
            identity = msg.get('uuid')
            if identity is None:
                logger.error("missing uuid in message: %r", msg)
                return
            self.applications[identity] = Application(identity)
        elif event == 'threads':
            logger.info("change in threads")
            application = self.applications.get(msg.get('uuid'))
            if application is None:
                logger.error("unknown application: %r", msg.get('uuid'))
                return
            application.do_transition(msg.get('payload'))
        elif event == 'progress':
            logger.info("new progress")
        else:
            logger.error("unknown event: %r", event)

    def do_upstream_receive(self, parts):
        """Handle a command received from an upstream client.

        Supported commands: 'setpower', 'run', 'kill' and 'list'.
        """
        msg = self._parse_message(parts, 'upstream')
        if msg is None:
            return
        command = msg.get('command')
        # TODO: switch to a dispatch dictionary
        if command is None:
            logger.error("missing command in message: %r", msg)
            return
        if command == 'setpower':
            self.target = float(msg['limit'])
            logger.info("new target measure: %g", self.target)
        elif command == 'run':
            self._do_run(msg)
        elif command == 'kill':
            logger.info("asked to kill container: %r", msg)
            self.container_manager.kill(msg['uuid'])
            # no update here, as it will trigger child exit
        elif command == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'type': 'container',
                      'event': 'list',
                      'payload': response,
                      }
            self.upstream_pub.send_json(update)
        else:
            logger.error("invalid command: %r", command)

    def _do_run(self, msg):
        """Create the container requested by an upstream 'run' command.

        Publishes a container 'start' update (errno -1 and pid None when
        creation failed) and, on success, forwards the child's stdout and
        stderr upstream through do_children_io.
        """
        container_uuid = msg['uuid']
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0 if container else -1,
                  # no process to report when creation failed
                  'pid': container.process.pid if container else None,
                  }
        self.upstream_pub.send_json(update)
        if not container:
            logger.error("container creation failed: %r", container_uuid)
            return

        # setup io callbacks: each pipe streams its data under its own name
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis. Empty data is
        reported as 'eof'."""
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine info and publish total power and target upstream."""
        self.machine_info = self.sensor.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Ask every application to move towards the power target.

        Below the target, applications accepting 'i' are sent 'i'; above it,
        applications accepting 'd' are sent 'd'. Requests are published on
        the downstream socket as [identity, request] and the application
        state machine is advanced accordingly.
        """
        total_power = self.machine_info['energy']['power']['total']
        if total_power < self.target:
            request = 'i'
        elif total_power > self.target:
            request = 'd'
        else:
            request = None

        for identity, application in self.applications.items():
            if (request is not None and
                    request in application.get_allowed_requests()):
                self.downstream_pub.send_multipart(
                    [self._to_frame(identity), self._to_frame(request)])
                application.do_transition(request)
            logger.info("application now in state: %s", application.state)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD handling to the IOLoop."""
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report container exits upstream."""
        # find out if children have terminated
        while True:
            try:
                pid, status, rusage = os.wait3(os.WNOHANG)
                if pid == 0 and status == 0:
                    break
            except OSError:
                # no (more) children to wait for
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                container = self.container_manager.pids[pid]
                self.container_manager.delete(container.uuid)
                msg = {'type': 'container',
                       'event': 'exit',
                       'status': status,
                       'uuid': container.uuid,
                       }
                self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensor manager and the IOLoop."""
        self.sensor.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Bind sockets, create managers, schedule callbacks and run."""
        # Bind address for upstream (TCP) clients
        bind_address = '*'

        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # empty filter: subscribe to every message
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_filter = b""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create resource and container manager
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)

        # create sensor manager and make first measurement
        # (do_control reads self.machine_info, so it must exist beforehand)
        self.sensor = sensor.SensorManager()
        self.sensor.start()
        self.machine_info = self.sensor.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the zmq IOLoop, enable debug logging, run NRM."""
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()