daemon.py 4.92 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
from __future__ import print_function

import logging
import random
import re
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream

# Per-client protocol state machine: maps state -> {event -> next state}.
#
# Events 'i' and 'd' are the requests Daemon.do_control sends to a client;
# 'done', 'min' and 'max' are the replies Client.get_messages extracts from
# the client's byte stream.  An event that is not listed for the current
# state leaves the state unchanged (see Client.do_transition).
client_fsm_table = {
    # Idle: either request may be issued.
    'stable': {
        'i': 's_ask_i',
        'd': 's_ask_d',
    },
    # An 'i' request is outstanding.
    's_ask_i': {
        'done': 'stable',
        'max': 'max',
    },
    # A 'd' request is outstanding.
    's_ask_d': {
        'done': 'stable',
        'min': 'min',
    },
    # Client reported 'max': only 'd' may be requested from here.
    'max': {
        'd': 'max_ask_d',
    },
    # Client reported 'min': only 'i' may be requested from here.
    'min': {
        'i': 'min_ask_i',
    },
    # A 'd' request is outstanding after the client reported 'max'.
    'max_ask_d': {
        'done': 'stable',
        'min': 'nop',
    },
    # An 'i' request is outstanding after the client reported 'min'.
    'min_ask_i': {
        'done': 'stable',
        'max': 'nop',
    },
    # No outgoing transitions: no request is ever allowed again.
    'nop': {},
}


class Client(object):
    """Protocol state and receive buffer for one connected peer.

    Attributes:
        identity: the value passed to the constructor, kept as-is.
        buf: text received from the peer that has not been parsed yet.
        state: current key into ``client_fsm_table``; starts at 'stable'.
    """

    # Compiled once for the class; both are applied anchored at the parse
    # position via ``pattern.match(buf, pos)``.
    _DONE_RE = re.compile(r"done \((\d+)\)")
    _NUMBER_RE = re.compile(r"\d+")

    def __init__(self, identity):
        self.identity = identity
        self.buf = ''
        self.state = 'stable'

    def append_buffer(self, msg):
        """Append freshly received data to the unparsed buffer.

        ``msg`` may be a native string or, on Python 3, a ``bytes`` frame;
        bytes are decoded so that the buffer is always text.  On Python 2
        ``bytes`` is ``str`` and the data is appended untouched.
        """
        if isinstance(msg, bytes) and not isinstance(msg, str):
            msg = msg.decode('utf-8', 'replace')
        self.buf = self.buf + msg

    def do_transition(self, msg):
        """Advance the state machine on event ``msg``.

        Events that are not listed for the current state are ignored and
        leave the state unchanged.
        """
        self.state = client_fsm_table[self.state].get(msg, self.state)

    def get_allowed_requests(self):
        """Return the events that have a transition from the current state."""
        return client_fsm_table[self.state].keys()

    def get_messages(self):
        """Yield the messages found at the front of the buffer, in order.

        Recognised wire forms and the token yielded for each:
        ``min`` -> 'min', ``max`` -> 'max', ``done (<digits>)`` -> 'done',
        ``<digits>`` -> 'ok'.

        Parsing stops at the first position where no complete message
        starts, e.g. a ``done (12`` whose closing parenthesis has not
        arrived yet; that tail stays in the buffer for the next call.

        The buffer is only trimmed once the generator has been run to
        exhaustion, so callers should consume every item.
        """
        buf = self.buf
        begin = 0
        while begin < len(buf):
            if buf.startswith('min', begin):
                ret = 'min'
                end = begin + len(ret)
            elif buf.startswith('max', begin):
                ret = 'max'
                end = begin + len(ret)
            else:
                done = self._DONE_RE.match(buf, begin)
                if done is not None:
                    ret = 'done'
                    end = done.end()
                else:
                    number = self._NUMBER_RE.match(buf, begin)
                    if number is None:
                        # Incomplete or unknown data: keep it buffered
                        # instead of raising in the middle of the stream.
                        break
                    ret = 'ok'
                    end = number.end()
            begin = end
            yield ret
        self.buf = buf[begin:]


class Daemon(object):
    """Event-loop driven server that steers connected clients.

    Clients connect to a ZMQ STREAM socket.  A periodic "sensor" callback
    updates ``current``, a periodic "control" callback updates ``target``
    and, depending on how the two compare, sends an 'i' or 'd' request to
    every client whose state machine currently allows it.

    Attributes:
        clients: maps a peer identity frame to its ``Client`` object.
        buf: unused by the methods of this class; kept for compatibility.
        logger: module logger.
        current: last value produced by ``do_sensor``.
        target: last value produced by ``do_control``.
        stream, sensor, control: the ZMQStream and the two
            PeriodicCallbacks; ``None`` until ``main`` creates them.
    """

    def __init__(self):
        self.clients = {}
        self.buf = ''
        self.logger = logging.getLogger(__name__)
        self.current = 1
        self.target = 1
        # Populated by main(); declared here so the attributes always exist.
        self.stream = None
        self.sensor = None
        self.control = None

    def do_client_receive(self, parts):
        """Handle one multipart message from the stream socket.

        ``parts[0]`` is the peer identity and ``parts[1]`` the data frame.
        An empty data frame toggles the peer between connected and
        disconnected; a non-empty frame is fed to the peer's parser and
        every parsed message is applied to its state machine.
        """
        self.logger.info("receiving client stream: %r", parts)
        if len(parts) < 2:
            self.logger.warning("ignoring malformed client stream: %r", parts)
            return
        identity, payload = parts[0], parts[1]

        if not payload:
            # empty frame, indicate connect/disconnect
            if identity in self.clients:
                self.logger.info("known client disconnected")
                del self.clients[identity]
            else:
                self.logger.info("new client: %r", identity)
                self.clients[identity] = Client(identity)
            return

        client = self.clients.get(identity)
        if client is None:
            self.logger.warning("data from unknown client: %r", identity)
            return

        # we need to unpack the stream into client messages
        # messages can be: min, max, done (%d), %d
        client.append_buffer(payload)
        for message in client.get_messages():
            client.do_transition(message)
            self.logger.info("client now in state: %s", client.state)

    def do_sensor(self):
        """Take a new measurement (currently a random value in [0, 34))."""
        self.current = random.randrange(0, 34)
        self.logger.info("current measure: %s", self.current)

    def do_control(self):
        """Pick a new target and send the matching request to clients.

        'i' is requested when ``current`` is below ``target``, 'd' when it
        is above, nothing when they are equal.  A request is only sent to
        clients whose state machine allows it.
        """
        self.target = random.randrange(0, 34)
        self.logger.info("target measure: %s", self.target)

        # The wire frame is spelled as a bytes literal: identical to the
        # native string on Python 2 and the type pyzmq requires on Python 3.
        if self.current < self.target:
            request, frame = 'i', b'i'
        elif self.current > self.target:
            request, frame = 'd', b'd'
        else:
            request, frame = None, None

        # items() works on both Python 2 and 3 (iteritems() is 2-only).
        for identity, client in self.clients.items():
            if request is not None and request in client.get_allowed_requests():
                self.stream.send_multipart([identity, frame])
                client.do_transition(request)
            self.logger.info("client now in state: %s", client.state)

    def do_signal(self, signum, frame):
        """Signal handler: schedule the shutdown on the event loop."""
        ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)

    def do_shutdown(self):
        """Stop the event loop, which makes ``main`` return."""
        ioloop.IOLoop.current().stop()

    def main(self):
        """Bind the socket, start the periodic callbacks and run the loop.

        Blocks until the event loop is stopped (SIGINT triggers that via
        ``do_signal``), then releases the callbacks, stream and context.
        """
        # read config
        bind_port = 1234
        bind_address = '*'

        context = zmq.Context()
        try:
            # setup listening socket
            socket = context.socket(zmq.STREAM)
            bind_param = "tcp://%s:%d" % (bind_address, bind_port)
            socket.bind(bind_param)
            self.logger.info("socket bound to: %s", bind_param)

            self.stream = zmqstream.ZMQStream(socket)
            self.stream.on_recv(self.do_client_receive)

            self.sensor = ioloop.PeriodicCallback(self.do_sensor, 1000)
            self.sensor.start()

            self.control = ioloop.PeriodicCallback(self.do_control, 1000)
            self.control.start()

            # take care of signals
            signal.signal(signal.SIGINT, self.do_signal)

            ioloop.IOLoop.current().start()
        finally:
            # Release everything that was created, also when setup failed
            # part-way (e.g. the port was already in use).
            for callback in (self.sensor, self.control):
                if callback is not None:
                    callback.stop()
            if self.stream is not None:
                self.stream.close(linger=0)
            # linger=0 so pending outbound data cannot block termination.
            context.destroy(linger=0)


def runner():
    """Process entry point: set up the loop and logging, then run a Daemon.

    ``ioloop.install()`` is called first, before any event loop is
    obtained; logging is configured at DEBUG level.  The call blocks until
    ``Daemon.main`` returns.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()