# daemon.py (5.14 KB)
from __future__ import print_function

import logging
import random
import re
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream

10 11 12 13 14 15 16 17
# Per-application protocol state machine.
#
# Outer keys are state names; each inner dict maps a message to the state
# it leads to.  'i' and 'd' are the requests the daemon sends to an
# application, while 'done', 'min' and 'max' are replies parsed from the
# application's stream.  A message with no entry for the current state
# leaves the state unchanged, and 'nop' has no outgoing transitions.
application_fsm_table = {
    'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
    's_ask_i': {'done': 'stable', 'max': 'max'},
    's_ask_d': {'done': 'stable', 'min': 'min'},
    'max': {'d': 'max_ask_d'},
    'min': {'i': 'min_ask_i'},
    'max_ask_d': {'done': 'stable', 'min': 'nop'},
    'min_ask_i': {'done': 'stable', 'max': 'nop'},
    'nop': {},
}
18 19


20
class Application(object):
    """Protocol state of one connected application.

    Holds the not-yet-parsed part of the application's byte stream
    (``buf``) and the current state in ``application_fsm_table``
    (``state``, initially ``'stable'``).
    """

    # Both patterns are applied with ``pattern.match(buf, pos)`` so they
    # only ever match a message that starts exactly at the scan position.
    _DONE_RE = re.compile(r"done \((\d+)\)")
    _NUMBER_RE = re.compile(r"\d+")

    def __init__(self, identity):
        """Create the state for the peer identified by *identity*."""
        self.identity = identity
        self.buf = ''
        self.state = 'stable'

    def append_buffer(self, msg):
        """Append newly received stream data *msg* to the parse buffer."""
        self.buf = self.buf + msg

    def do_transition(self, msg):
        """Advance the state machine on *msg*.

        Messages that have no transition from the current state are
        ignored and leave ``state`` untouched.
        """
        transitions = application_fsm_table[self.state]
        if msg in transitions:
            self.state = transitions[msg]

    def get_allowed_requests(self):
        """Return the messages that have a transition from the current state."""
        return application_fsm_table[self.state].keys()

    def get_messages(self):
        """Yield the messages found at the front of the buffer, in order.

        Recognised wire forms and the token yielded for each:

        * ``min``       -> ``'min'``
        * ``max``       -> ``'max'``
        * ``done (N)``  -> ``'done'`` (N is one or more digits)
        * ``N``         -> ``'ok'``   (one or more digits)

        Scanning stops at the first position that does not start a
        complete message; everything from there on stays in ``self.buf``
        so that a message split across several receives is completed by
        a later ``append_buffer`` call.  In particular a truncated
        ``done (12`` is kept for later instead of raising.

        ``self.buf`` is only rewritten once the generator has been run to
        exhaustion.
        """
        buf = self.buf
        begin = 0
        while begin < len(buf):
            if buf.startswith('min', begin):
                ret = 'min'
                end = begin + len(ret)
            elif buf.startswith('max', begin):
                ret = 'max'
                end = begin + len(ret)
            elif buf.startswith('done (', begin):
                m = self._DONE_RE.match(buf, begin)
                if m is None:
                    # Incomplete (or malformed) 'done (N)': wait for more data.
                    break
                ret = 'done'
                end = m.end()
            else:
                m = self._NUMBER_RE.match(buf, begin)
                if m is None:
                    break
                ret = 'ok'
                end = m.end()
            begin = end
            yield ret
        self.buf = buf[begin:]


class Daemon(object):
    """Daemon that talks to applications over a ZeroMQ STREAM socket.

    It keeps one ``Application`` per connected peer, periodically draws a
    random "current" and "target" measure, and asks every application to
    go up ('i') or down ('d') when its state machine allows that request.

    NOTE(review): the request strings and the ``Application`` buffer are
    native ``str``; this looks written for Python 2, where socket frames
    are ``str`` as well -- confirm the target interpreter.
    """

    def __init__(self):
        # identity frame -> Application
        self.applications = {}
        self.buf = ''
        self.logger = logging.getLogger(__name__)
        self.current = 1
        self.target = 1
        # Created by main(); present here so the attributes always exist.
        self.stream = None
        self.sensor = None
        self.control = None

    def do_application_receive(self, parts):
        """Handle one multipart message from the STREAM socket.

        ``parts[0]`` is the peer identity and ``parts[1]`` the payload.
        An empty payload signals a connect (unknown identity) or a
        disconnect (known identity).  Any other payload is appended to
        the peer's buffer and every complete message in it is fed to the
        peer's state machine.
        """
        self.logger.info("receiving application stream: %r", parts)
        identity = parts[0]
        payload = parts[1]

        if len(payload) == 0:
            # empty frame, indicate connect/disconnect
            if identity in self.applications:
                self.logger.info("known client disconnected")
                del self.applications[identity]
            else:
                self.logger.info("new client: %r", identity)
                self.applications[identity] = Application(identity)
            return

        application = self.applications.get(identity)
        if application is None:
            # Data without a preceding connect frame: nothing to attach
            # it to, so drop it, but leave a trace in the log.
            self.logger.warning("dropping data from unknown client: %r",
                                identity)
            return

        # we need to unpack the stream into application messages
        # messages can be: min, max, done (%d), %d
        application.append_buffer(payload)
        for m in application.get_messages():
            application.do_transition(m)
            self.logger.info("application now in state: %s",
                             application.state)

    def do_sensor(self):
        """Periodic callback: draw a new random current measure in [0, 34)."""
        self.current = random.randrange(0, 34)
        self.logger.info("current measure: %s", self.current)

    def do_control(self):
        """Periodic callback: draw a new target and steer the applications.

        With ``current < target`` every application whose state allows
        'i' is sent 'i'; with ``current > target`` the same is done for
        'd'.  The application's state machine is advanced right after the
        request is sent.
        """
        self.target = random.randrange(0, 34)
        self.logger.info("target measure: %s", self.target)

        # .items() instead of the Python-2-only .iteritems().
        for identity, application in self.applications.items():
            if self.current < self.target:
                if 'i' in application.get_allowed_requests():
                    self.stream.send_multipart([identity, 'i'])
                    application.do_transition('i')
            elif self.current > self.target:
                if 'd' in application.get_allowed_requests():
                    self.stream.send_multipart([identity, 'd'])
                    application.do_transition('d')
            self.logger.info("application now in state: %s",
                             application.state)

    def do_signal(self, signum, frame):
        """Signal handler: schedule the shutdown on the IOLoop."""
        ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)

    def do_shutdown(self):
        """Stop the IOLoop, which makes main() return."""
        ioloop.IOLoop.current().stop()

    def main(self):
        """Bind the socket, start the periodic callbacks and run the loop.

        Blocks until the IOLoop is stopped (SIGINT triggers that through
        do_signal/do_shutdown), then stops the callbacks and closes the
        stream.
        """
        # read config
        bind_port = 1234
        bind_address = '*'

        # setup application listening socket
        context = zmq.Context()
        socket = context.socket(zmq.STREAM)
        bind_param = "tcp://%s:%d" % (bind_address, bind_port)
        socket.bind(bind_param)
        self.logger.info("socket bound to: %s", bind_param)
        self.stream = zmqstream.ZMQStream(socket)
        self.stream.on_recv(self.do_application_receive)

        self.sensor = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)

        try:
            ioloop.IOLoop.current().start()
        finally:
            # Release what this method created once the loop is done.
            self.sensor.stop()
            self.control.stop()
            self.stream.close()


def runner():
    """Entry point: install the pyzmq IOLoop, enable DEBUG logging, run the daemon."""
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()