daemon.py 7.06 KB
Newer Older
1
2
3
4
5
6
7
8
from __future__ import print_function

import logging
import random
import re
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
9
import sensor
10

11
12
13
14
15
16
17
18
# Per-application state machine, laid out as {state: {event: next_state}}.
#
# Events 'i' and 'd' are the requests Daemon.do_control sends to a client
# ('i' when measured power is below the target, 'd' when it is above).
# Events 'done', 'min' and 'max' are tokens produced by
# Application.get_messages from what the client sends back.  An event that is
# not listed for the current state is ignored by Application.do_transition.
application_fsm_table = {
    # idle: either request may be sent
    'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
    # an 'i' request is outstanding
    's_ask_i': {'done': 'stable', 'max': 'max'},
    # a 'd' request is outstanding
    's_ask_d': {'done': 'stable', 'min': 'min'},
    # client reported 'max': only 'd' may be requested
    'max': {'d': 'max_ask_d'},
    # client reported 'min': only 'i' may be requested
    'min': {'i': 'min_ask_i'},
    # a 'd' request is outstanding after 'max'
    'max_ask_d': {'done': 'stable', 'min': 'nop'},
    # an 'i' request is outstanding after 'min'
    'min_ask_i': {'done': 'stable', 'max': 'nop'},
    # terminal: no event leaves this state
    'nop': {},
}
19
20


21
class Application(object):
    """Protocol state kept by the daemon for one connected client.

    Holds the client's identity, the data received from it that has not been
    parsed yet, and its current state in ``application_fsm_table``.
    """

    # Compiled once and matched at an offset into the receive buffer.
    _DONE_RE = re.compile(r"done \((\d+)\)")
    _NUMBER_RE = re.compile(r"\d+")

    def __init__(self, identity):
        """Start tracking the client *identity*.

        A new application has an empty buffer and is in the 'stable' state.
        """
        self.identity = identity
        self.buf = ''
        self.state = 'stable'

    def append_buffer(self, msg):
        """Append newly received data *msg* to the unparsed buffer."""
        self.buf = self.buf + msg

    def do_transition(self, msg):
        """Advance the state machine on event *msg*.

        An event that has no transition from the current state leaves the
        state unchanged.
        """
        self.state = application_fsm_table[self.state].get(msg, self.state)

    def get_allowed_requests(self):
        """Return the events that have a transition from the current state."""
        return application_fsm_table[self.state].keys()

    def get_messages(self):
        """Yield one token per complete message in the buffer, oldest first.

        Wire form and the token yielded for it:

        * ``min``             -> 'min'
        * ``max``             -> 'max'
        * ``done (<digits>)`` -> 'done'
        * ``<digits>``        -> 'ok'

        Parsing stops at the first position that does not hold a complete,
        recognised message.  The unparsed remainder is stored back in
        ``self.buf`` so that it can be completed by a later
        ``append_buffer`` call; in particular a ``done (`` message that has
        only partially arrived is kept rather than raising.  The buffer is
        only trimmed once the generator has been run to the end.

        NOTE(review): a bare number carries no terminator, so a number that
        is split across two reads is reported as two 'ok' tokens.
        """
        buf = self.buf
        begin = 0
        while begin < len(buf):
            if buf.startswith('min', begin):
                ret, end = 'min', begin + len('min')
            elif buf.startswith('max', begin):
                ret, end = 'max', begin + len('max')
            elif buf.startswith('done (', begin):
                m = self._DONE_RE.match(buf, begin)
                if m is None:
                    # incomplete (or malformed) 'done (N)': wait for more data
                    break
                ret, end = 'done', m.end()
            else:
                m = self._NUMBER_RE.match(buf, begin)
                if m is None:
                    break
                ret, end = 'ok', m.end()
            begin = end
            yield ret
        self.buf = buf[begin:]


class Daemon(object):
    """Event-loop daemon between downstream applications and an upstream peer.

    ``main`` binds a STREAM socket for applications and a PUB socket for
    upstream, and connects a SUB socket to upstream.  Once per second it
    publishes the measured total power and, depending on how that power
    compares to ``self.target``, sends 'i' or 'd' requests to the connected
    applications.  The target is updated from messages received on the SUB
    socket.
    """

    def __init__(self):
        """Initialise the state that does not need sockets or sensors."""
        # client identity -> Application
        self.applications = {}
        self.buf = ''
        self.logger = logging.getLogger(__name__)
        # value the measured total power is compared against in do_control
        self.target = 1.0

    @staticmethod
    def _to_text(frame):
        """Return *frame* as ``str``, decoding it when it is not one already.

        Socket frames are ``bytes``; on Python 2 that is ``str`` and the
        frame is returned untouched.
        """
        if isinstance(frame, str):
            return frame
        return frame.decode('utf-8', 'replace')

    def do_application_receive(self, parts):
        """Handle ``[identity, data]`` received on the downstream socket.

        An empty *data* frame toggles the connection state of *identity*:
        an unknown identity is registered, a known one is removed.  Any other
        data is appended to the application's buffer and every complete
        message in it is fed to the application's state machine.
        """
        self.logger.info("receiving application stream: %r", parts)
        if len(parts) < 2:
            self.logger.warning("dropping malformed application message: %r",
                                parts)
            return
        identity, payload = parts[0], parts[1]

        if not payload:
            # empty frame, indicate connect/disconnect
            if identity in self.applications:
                self.logger.info("known client disconnected")
                del self.applications[identity]
            else:
                self.logger.info("new client: %r", identity)
                self.applications[identity] = Application(identity)
            return

        application = self.applications.get(identity)
        if application is None:
            self.logger.warning("ignoring data from unknown client: %r",
                                identity)
            return

        # we need to unpack the stream into application messages
        # messages can be: min, max, done (%d), %d
        application.append_buffer(self._to_text(payload))
        for m in application.get_messages():
            application.do_transition(m)
            self.logger.info("application now in state: %s",
                             application.state)

    def do_upstream_receive(self, msg):
        """Update ``self.target`` from an upstream message.

        *msg* is the list of frames handed over by ``ZMQStream.on_recv``; a
        single string is accepted as well.  The frames are joined with a
        space and the second whitespace-separated field is parsed as the new
        target.  A message without a numeric second field is logged and
        dropped, leaving the target unchanged.
        """
        self.logger.info("receiving upstream message: %r", msg)
        frames = msg if isinstance(msg, (list, tuple)) else [msg]
        fields = " ".join(self._to_text(f) for f in frames).split()
        try:
            target = float(fields[1])
        except (IndexError, ValueError):
            self.logger.warning("dropping malformed upstream message: %r",
                                msg)
            return
        self.target = target
        self.logger.info("target measure: %g", self.target)

    def do_sensor(self):
        """Refresh ``self.machine_info`` and publish the total power upstream.

        The published message is "23.45 " followed by the total power.
        NOTE(review): assumes the sensor update is a mapping with
        ['energy']['power']['total'] -- confirm against the sensor module.
        """
        self.machine_info = self.sensor.do_update()
        self.logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = "23.45 %g" % (total_power)
        self.upstream_pub.send_string(msg)
        self.logger.info("Sending power values: %r", msg)

    def do_control(self):
        """Ask applications to move towards the target.

        Compares the last measured total power with ``self.target``: below
        the target every application that currently allows it is sent 'i',
        above the target 'd'; when they are equal nothing is sent.  The
        application's state machine is advanced for each request sent.
        """
        total_power = self.machine_info['energy']['power']['total']
        if total_power < self.target:
            request = 'i'
        elif total_power > self.target:
            request = 'd'
        else:
            request = None

        for identity, application in list(self.applications.items()):
            if (request is not None and
                    request in application.get_allowed_requests()):
                # socket frames must be bytes
                self.downstream.send_multipart(
                    [identity, request.encode('ascii')])
                application.do_transition(request)
            self.logger.info("application now in state: %s",
                             application.state)

    def do_signal(self, signum, frame):
        """Signal handler: schedule ``do_shutdown`` on the IO loop."""
        ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)

    def do_shutdown(self):
        """Stop the sensor manager, then stop the IO loop."""
        self.sensor.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Create the sockets, sensors and timers, then run the IO loop.

        Does not return until the loop is stopped (see ``do_shutdown``).
        """
        # Bind port for downstream clients
        bind_port = 1234
        # Bind address for downstream clients
        bind_address = '*'
        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_socket = context.socket(zmq.STREAM)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port)
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)

        downstream_socket.bind(downstream_bind_param)
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.connect(upstream_sub_param)
        # the subscription prefix has to be bytes
        upstream_sub_filter = b"34.56 "
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

        self.logger.info("downstream socket bound to: %s",
                         downstream_bind_param)
        self.logger.info("upstream pub socket bound to: %s",
                         upstream_pub_param)
        self.logger.info("upstream sub socket connected to: %s",
                         upstream_sub_param)

        # register socket triggers
        self.downstream = zmqstream.ZMQStream(downstream_socket)
        self.downstream.on_recv(self.do_application_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)

        # create sensor manager and make first measurement
        self.sensor = sensor.SensorManager()
        self.sensor.start()
        self.machine_info = self.sensor.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the ioloop, log at DEBUG level, run a Daemon.

    Returns once ``Daemon.main`` returns.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()