daemon.py 11.2 KB
Newer Older
1 2
from __future__ import print_function

3 4
from containers import ContainerManager
from resources import ResourceManager
5
from functools import partial
6
import json
7
import logging
8
import os
9
import re
10
import sensor
11 12 13
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
14

15

16 17 18 19 20 21 22 23
# Per-application state machine: state -> {event: next_state}.
# Events 'i' and 'd' are the requests Daemon.do_control sends downstream;
# 'done', 'min' and 'max' are replies parsed by Application.get_messages.
# Events with no entry for the current state are ignored by
# Application.do_transition ('ok' has no entry anywhere), and 'nop' has no
# outgoing transitions, so an application that reaches it stays there.
application_fsm_table = {
    'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
    's_ask_i': {'done': 'stable', 'max': 'max'},
    's_ask_d': {'done': 'stable', 'min': 'min'},
    'max': {'d': 'max_ask_d'},
    'min': {'i': 'min_ask_i'},
    'max_ask_d': {'done': 'stable', 'min': 'nop'},
    'min_ask_i': {'done': 'stable', 'max': 'nop'},
    'nop': {},
}
24

25 26
# Shared logger for the daemon; all messages go to the 'nrm' logger.
logger = logging.getLogger(name='nrm')

27

28
class Application(object):
    """State for one application connected on the downstream socket.

    Buffers the raw data received from the client, splits it into
    messages, and tracks the client's position in application_fsm_table.
    """

    # 'done (<digits>)' message; matched at an explicit position, so it is
    # anchored there rather than searched for later in the buffer.
    _DONE_RE = re.compile(r"done \((\d+)\)")
    # Bare integer message.
    _NUM_RE = re.compile(r"\d+")

    def __init__(self, identity):
        # Identity frame the downstream socket attached to this client.
        self.identity = identity
        # Received data not yet parsed into complete messages.
        self.buf = ''
        # Current state; always a key of application_fsm_table.
        self.state = 'stable'

    def append_buffer(self, msg):
        """Append raw data received from the client to the parse buffer."""
        self.buf = self.buf + msg

    def do_transition(self, msg):
        """Move to the next state if *msg* is a valid event from this state.

        Events with no transition from the current state are ignored.
        """
        transitions = application_fsm_table[self.state]
        if msg in transitions:
            self.state = transitions[msg]

    def get_allowed_requests(self):
        """Return the events accepted from the current state."""
        return application_fsm_table[self.state].keys()

    def get_messages(self):
        """Yield the complete messages at the front of the buffer.

        Recognized messages are 'min', 'max', 'done (<n>)' (yielded as
        'done') and a bare integer (yielded as 'ok'). Parsing stops at the
        first position that does not begin a complete message. The
        unparsed remainder is kept in self.buf, so a 'min', 'max' or
        'done (<n>)' split across several receives is completed by later
        data. The buffer is updated even if the caller stops iterating early.

        NOTE(review): a bare integer split across receives (e.g. '12' then
        '3') is yielded as two 'ok' messages; the protocol has no delimiter
        to detect this.
        """
        buf = self.buf
        pos = 0
        try:
            while pos < len(buf):
                if buf.startswith('min', pos):
                    ret = 'min'
                    pos += len('min')
                elif buf.startswith('max', pos):
                    ret = 'max'
                    pos += len('max')
                elif buf.startswith('done (', pos):
                    match = self._DONE_RE.match(buf, pos)
                    if match is None:
                        # incomplete (or malformed) 'done' message: wait
                        # for more data instead of raising IndexError
                        break
                    ret = 'done'
                    pos = match.end()
                else:
                    match = self._NUM_RE.match(buf, pos)
                    if match is None:
                        break
                    ret = 'ok'
                    pos = match.end()
                # pos already points past this message, so an abandoned
                # generator does not re-deliver it
                yield ret
        finally:
            self.buf = buf[pos:]


class Daemon(object):
    """Event-driven daemon linking upstream controllers, containers and apps.

    Receives JSON commands on an upstream SUB socket, launches and kills
    containers through the container manager, publishes sensor readings and
    container events on an upstream PUB socket, and sends 'i'/'d' requests
    to applications on a downstream STREAM socket so that the measured
    total power moves towards self.target.
    """

    def __init__(self):
        # Connected applications, keyed by downstream identity frame.
        self.applications = {}
        # pid of each launched container process -> container uuid.
        self.containerpids = {}
        # NOTE(review): not read anywhere in this class; kept in case
        # external code relies on the attribute.
        self.buf = ''
        # Power target do_control steers towards; updated by 'setpower'.
        self.target = 1.0

    def do_application_receive(self, parts):
        """Handle a multipart message from the downstream STREAM socket.

        parts[0] is the client identity. An empty parts[1] signals a
        connection (unknown identity) or a disconnection (known identity).
        Otherwise parts[1] carries raw application data: it is buffered,
        split into messages (min, max, done (%d), %d) and each message is
        fed to the application's state machine.
        """
        logger.info("receiving application stream: %r", parts)
        identity = parts[0]
        data = parts[1]

        if not data:
            # empty frame, indicate connect/disconnect
            if identity in self.applications:
                logger.info("known client disconnected")
                del self.applications[identity]
            else:
                logger.info("new client: %r", identity)
                self.applications[identity] = Application(identity)
            return

        application = self.applications.get(identity)
        if application is None:
            logger.warning("data from unknown client %r, dropping it",
                           identity)
            return
        application.append_buffer(data)
        for message in application.get_messages():
            application.do_transition(message)
            logger.info("application now in state: %s", application.state)

    def do_upstream_receive(self, parts):
        """Handle a JSON command received on the upstream SUB socket.

        The single frame must decode to a JSON object whose 'command' is one
        of 'setpower', 'run', 'kill' or 'list'. Malformed messages are
        logged and dropped instead of raising into the IOLoop.
        """
        logger.info("receiving upstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            logger.error("invalid json, dropping it: %r", parts[0])
            return
        if not isinstance(msg, dict):
            logger.error("unexpected msg type, dropping it: %r", msg)
            return
        command = msg.get('command')
        if command is None:
            logger.error("missing command in message: %r", msg)
            return
        handlers = {'setpower': self._cmd_setpower,
                    'run': self._cmd_run,
                    'kill': self._cmd_kill,
                    'list': self._cmd_list,
                    }
        try:
            handler = handlers[command]
        except (KeyError, TypeError):
            # TypeError: command decoded to an unhashable JSON value
            logger.error("invalid command: %r", command)
            return
        handler(msg)

    def _cmd_setpower(self, msg):
        """Set the power target from msg['limit']."""
        try:
            target = float(msg['limit'])
        except (KeyError, TypeError, ValueError):
            logger.error("invalid limit in setpower message: %r", msg)
            return
        self.target = target
        logger.info("new target measure: %g", self.target)

    def _cmd_run(self, msg):
        """Create a container, announce it, and forward its output upstream."""
        logger.info("new container required: %r", msg)
        uuid = msg.get('uuid')
        if uuid is None:
            # validate before create() so no untracked container is started
            logger.error("missing uuid in run message: %r", msg)
            return
        process = self.container_manager.create(msg)
        self.containerpids[process.pid] = uuid
        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': uuid,
                  'errno': 0,
                  'pid': process.pid,
                  }
        self.upstream_pub.send_json(update)
        # setup io callbacks
        process.stdout.read_until_close(partial(self.do_children_io,
                                                uuid, 'stdout'))
        process.stderr.read_until_close(partial(self.do_children_io,
                                                uuid, 'stderr'))

    def _cmd_kill(self, msg):
        """Ask the container manager to kill the container msg['uuid'].

        No update is published here: the resulting child exit is reported
        by do_children.
        """
        logger.info("asked to kill container: %r", msg)
        if 'uuid' not in msg:
            logger.error("missing uuid in kill message: %r", msg)
            return
        self.container_manager.kill(msg['uuid'])

    def _cmd_list(self, msg):
        """Publish the container manager's container list upstream."""
        logger.info("asked for container list: %r", msg)
        response = self.container_manager.list()
        update = {'type': 'container',
                  'event': 'list',
                  'payload': response,
                  }
        self.upstream_pub.send_json(update)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis: uuid and io
        ('stdout' or 'stderr') are bound with functools.partial. Empty data
        is published as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine_info and publish total power and target upstream."""
        self.machine_info = self.sensor.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Ask each application to move power towards the target.

        Sends 'i' when the last measured total power is below self.target
        and 'd' when it is above, but only to applications whose current
        state accepts that request. Nothing is sent at exactly the target.
        """
        total_power = self.machine_info['energy']['power']['total']
        if total_power < self.target:
            request = 'i'
        elif total_power > self.target:
            request = 'd'
        else:
            request = None

        # items() works on both Python 2 and 3, unlike iteritems()
        for identity, application in self.applications.items():
            if (request is not None and
                    request in application.get_allowed_requests()):
                self.downstream.send_multipart([identity, request])
                application.do_transition(request)
            logger.info("application now in state: %s", application.state)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD work to the IOLoop."""
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report container exits upstream.

        Loops on non-blocking os.wait3() until no child is left to report.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. ECHILD when there are no children to wait for
                break
            if pid == 0 and status == 0:
                # children exist but none has a status to report
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.containerpids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                # forget the pid so the mapping does not grow forever
                uuid = self.containerpids.pop(pid)
                self.container_manager.delete(uuid)
                msg = {'type': 'container',
                       'event': 'exit',
                       'status': status,
                       'uuid': uuid,
                       }
                self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensor manager and the IOLoop."""
        self.sensor.stop()
        ioloop.IOLoop.current().stop()

    def main(self, bind_address='*', bind_port=1234,
             upstream_pub_port=2345, upstream_sub_port=3456):
        """Set up sockets, managers, timers and signals, then run the IOLoop.

        bind_address: address the downstream and upstream PUB sockets bind on.
        bind_port: TCP port of the downstream (application) STREAM socket.
        upstream_pub_port: TCP port the upstream PUB socket binds on.
        upstream_sub_port: TCP port on localhost the SUB socket connects to.
        """
        # setup application listening socket
        context = zmq.Context()
        downstream_socket = context.socket(zmq.STREAM)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port)
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)

        downstream_socket.bind(downstream_bind_param)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.connect(upstream_sub_param)
        # empty filter: subscribe to everything (bytes on both Py2 and Py3)
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, b"")

        logger.info("downstream socket bound to: %s", downstream_bind_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket connected to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream = zmqstream.ZMQStream(downstream_socket)
        self.downstream.on_recv(self.do_application_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)

        # create resource and container manager
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)

        # create sensor manager and make first measurement, so do_control
        # always has machine_info to read
        self.sensor = sensor.SensorManager()
        self.sensor.start()
        self.machine_info = self.sensor.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the ZMQ IOLoop, enable debug logging, run a Daemon."""
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()