daemon.py 9.68 KB
Newer Older
1 2
from __future__ import print_function

3
import containers
4
import json
5
import logging
6
import os
7
import re
8
import sensor
9 10 11
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
12

13

14 15 16 17 18 19 20 21
# State machine followed by every Application.
#
# Outer keys are state names; each inner dict maps an event to the state
# reached when that event occurs.  Events 'i' and 'd' are the requests the
# daemon sends to an application (see Daemon.do_control); 'done', 'min' and
# 'max' are messages decoded from the application stream (see
# Application.get_messages).  An event that is not listed for the current
# state leaves the state unchanged (see Application.do_transition), so the
# 'nop' state, which lists nothing, can never be left.
application_fsm_table = {
    'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
    's_ask_i': {'done': 'stable', 'max': 'max'},
    's_ask_d': {'done': 'stable', 'min': 'min'},
    'max': {'d': 'max_ask_d'},
    'min': {'i': 'min_ask_i'},
    'max_ask_d': {'done': 'stable', 'min': 'nop'},
    'min_ask_i': {'done': 'stable', 'max': 'nop'},
    'nop': {},
}
22 23


24
class Application(object):
    """Protocol state kept by the daemon for one connected application.

    Holds the client identity, the not-yet-decoded part of its byte
    stream and its current state in ``application_fsm_table``.
    """

    # Patterns for the two variable-length messages.  They are always
    # applied with ``match(buf, pos)`` so they can only match at the
    # current decoding position, never further down the buffer.
    _DONE_RE = re.compile(r"done \((\d+)\)")
    _NUMBER_RE = re.compile(r"\d+")

    def __init__(self, identity):
        """Create the state for the client known as *identity*."""
        self.identity = identity
        self.buf = ''
        self.state = 'stable'

    def append_buffer(self, msg):
        """Append freshly received stream data to the pending buffer."""
        self.buf = self.buf + msg

    def do_transition(self, msg):
        """Advance the state machine on event *msg*.

        Events that are not listed for the current state are ignored and
        leave the state untouched.
        """
        transitions = application_fsm_table[self.state]
        if msg in transitions:
            self.state = transitions[msg]

    def get_allowed_requests(self):
        """Return the events accepted in the current state."""
        return application_fsm_table[self.state].keys()

    def get_messages(self):
        """Decode and yield the complete messages at the front of the buffer.

        Recognised wire messages are ``min``, ``max``, ``done (<digits>)``
        and a bare run of digits; they are yielded as ``'min'``, ``'max'``,
        ``'done'`` and ``'ok'`` respectively.

        Decoding stops at the first position that does not hold a complete
        message.  This covers a ``done (`` whose number or closing
        parenthesis has not arrived yet: it stays in the buffer until more
        data is appended instead of raising.  Unrecognised data also stops
        decoding and stays in the buffer.

        The buffer is only trimmed once the generator is exhausted, so
        callers must iterate it to the end.
        """
        buf = self.buf
        begin = 0
        while begin < len(buf):
            if buf.startswith('min', begin):
                ret = 'min'
                off = len(ret)
            elif buf.startswith('max', begin):
                ret = 'max'
                off = len(ret)
            elif buf.startswith('done (', begin):
                m = self._DONE_RE.match(buf, begin)
                if m is None:
                    # incomplete (or malformed) 'done' message: wait for
                    # more data rather than failing on a partial read
                    break
                ret = 'done'
                off = m.end() - begin
            else:
                m = self._NUMBER_RE.match(buf, begin)
                if m is None:
                    break
                ret = 'ok'
                off = m.end() - begin
            begin = begin + off
            yield ret
        self.buf = buf[begin:]


class Daemon(object):
    """Event-loop driven daemon sitting between applications and upstream.

    Applications connect to a ZMQ STREAM socket (``self.downstream``).
    JSON commands are received from upstream on a SUB socket and JSON
    updates are published on a PUB socket (``self.upstream_pub``).  A
    sensor manager is polled periodically for the machine's total power,
    which is compared against ``self.target`` to decide which request
    ('i' or 'd') is sent to the applications.
    """

    def __init__(self):
        """Initialise bookkeeping; sockets and managers are set up in main()."""
        # client identity (first frame of a STREAM message) -> Application
        self.applications = {}
        self.buf = ''
        # pid -> uuid of the containers started by the 'run' command
        self.children = {}
        self.logger = logging.getLogger(__name__)
        # power target compared with the measured total in do_control()
        self.target = 1.0

    def do_application_receive(self, parts):
        """Handle one multipart message from the downstream STREAM socket.

        ``parts[0]`` is the client identity and ``parts[1]`` the payload.
        An empty payload signals a connect (unknown identity) or a
        disconnect (known identity); anything else is stream data that is
        decoded into messages and fed to the client's state machine.
        """
        self.logger.info("receiving application stream: %r", parts)
        identity = parts[0]
        payload = parts[1]

        if len(payload) == 0:
            # empty frame, indicate connect/disconnect
            if identity in self.applications:
                self.logger.info("known client disconnected")
                del self.applications[identity]
            else:
                self.logger.info("new client: %r", identity)
                self.applications[identity] = Application(identity)
            return

        application = self.applications.get(identity)
        if application is None:
            self.logger.warning("data from unknown client %r, dropping it",
                                identity)
            return

        # we need to unpack the stream into application messages
        # messages can be: min, max, done (%d), %d
        application.append_buffer(payload)
        for m in application.get_messages():
            application.do_transition(m)
            self.logger.info("application now in state: %s",
                             application.state)

    def do_upstream_receive(self, parts):
        """Handle one message from the upstream SUB socket.

        The message must be a single frame holding a JSON object with a
        ``command`` key.  Supported commands are ``setpower`` and ``run``.
        Malformed messages are logged and dropped so that a bad message
        cannot raise out of the IO loop callback.
        """
        self.logger.info("receiving upstream message: %r", parts)
        if len(parts) != 1:
            self.logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            self.logger.error("invalid JSON, dropping it: %r", parts)
            return
        if not isinstance(msg, dict):
            self.logger.error("message is not an object, dropping it: %r",
                              msg)
            return

        command = msg.get('command')
        if command is None:
            self.logger.error("missing command in message: %r", msg)
        elif command == 'setpower':
            self._do_setpower(msg)
        elif command == 'run':
            self._do_run(msg)
        else:
            self.logger.error("invalid command: %r", command)

    def _do_setpower(self, msg):
        """Set the power target from the ``limit`` field of *msg*."""
        try:
            target = float(msg['limit'])
        except (KeyError, TypeError, ValueError):
            self.logger.error("missing or invalid limit in message: %r", msg)
            return
        self.target = target
        self.logger.info("new target measure: %g", self.target)

    def _do_run(self, msg):
        """Start a container for *msg* and publish the outcome upstream."""
        if 'uuid' not in msg:
            # every update below needs the uuid; reject before creating
            # a container that could not be reported or tracked
            self.logger.error("missing uuid in message: %r", msg)
            return
        self.logger.info("new container required: %r", msg)
        pid = self.container_manager.create(msg)
        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'uuid': msg['uuid'],
                  }
        if pid > 0:
            self.children[pid] = msg['uuid']
            update['errno'] = 0
            update['pid'] = pid
        else:
            # a non-positive value is forwarded as the errno of the failure
            update['errno'] = pid
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh the machine information and publish the power reading."""
        self.machine_info = self.sensor.do_update()
        self.logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        self.logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Send 'i' or 'd' to every application that accepts it.

        'i' is sent when the last measured total power is below the target
        and 'd' when it is above (presumably "increase"/"decrease" --
        confirm against the application side).  Nothing is sent when the
        two are equal or when the application's state does not allow the
        request.
        """
        total_power = self.machine_info['energy']['power']['total']
        if total_power < self.target:
            request = 'i'
        elif total_power > self.target:
            request = 'd'
        else:
            request = None

        # items() rather than iteritems() so this runs on Python 2 and 3
        for identity, application in self.applications.items():
            if (request is not None
                    and request in application.get_allowed_requests()):
                self.downstream.send_multipart([identity, request])
                application.do_transition(request)
            self.logger.info("application now in state: %s", application.state)

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the IO loop."""
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            self.logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Collect terminated children and report exits upstream."""
        # find out if children have terminated
        while True:
            try:
                # wait3() returns (pid, status, rusage)
                pid, status, _rusage = os.wait3(os.WNOHANG)
            except OSError:
                # typically ECHILD: there is no child left to wait for
                break
            if pid == 0:
                # children exist but none has changed state
                break

            self.logger.info("child update: %d, %r", pid, status)
            # the child has been collected: stop tracking its pid
            uuid = self.children.pop(pid, None)
            # check if this is an exit
            if os.WIFEXITED(status):
                msg = {'type': 'container',
                       'event': 'exit',
                       'status': status,
                       'uuid': uuid,
                       }
                self.upstream_pub.send_json(msg)
            else:
                # ignore on purpose
                pass

    def do_shutdown(self):
        """Stop the sensor manager and the IO loop."""
        self.sensor.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Create sockets, managers and timers, then run the IO loop.

        Does not return until the loop is stopped (see do_shutdown()).
        """
        # Bind port for downstream clients
        bind_port = 1234
        # Bind address for downstream clients
        bind_address = '*'
        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_socket = context.socket(zmq.STREAM)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port)
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)

        downstream_socket.bind(downstream_bind_param)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.connect(upstream_sub_param)
        # an empty filter subscribes to every upstream message
        upstream_sub_filter = ""
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        self.logger.info("downstream socket bound to: %s",
                         downstream_bind_param)
        self.logger.info("upstream pub socket bound to: %s",
                         upstream_pub_param)
        self.logger.info("upstream sub socket connected to: %s",
                         upstream_sub_param)

        # register socket triggers
        self.downstream = zmqstream.ZMQStream(downstream_socket)
        self.downstream.on_recv(self.do_application_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)

        # create container manager
        self.container_manager = containers.ContainerManager()

        # create sensor manager and make first measurement
        self.sensor = sensor.SensorManager()
        self.sensor.start()
        self.machine_info = self.sensor.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the ZMQ IO loop, set up logging, run a Daemon."""
    # the ZMQ-aware loop is installed before any stream is created
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()