daemon.py 10.3 KB
Newer Older
1
2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from functools import partial
7
import json
8
import logging
9
import os
10
from resources import ResourceManager
11
from sensor import SensorManager
12
13
14
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
15

16

17
18
# Module-wide logger (named 'nrm') used by the daemon's handlers and callbacks.
logger = logging.getLogger('nrm')

19
20
21

class Daemon(object):
    """NRM daemon bridging a downstream and an upstream ZeroMQ channel.

    The downstream channel (IPC sockets bound in ``main``) carries
    ``type == 'application'`` event messages.  The upstream channel (TCP
    sockets bound in ``main``) carries commands ('setpower', 'run', 'kill',
    'list') and receives JSON status updates.  Periodic IOLoop callbacks
    refresh sensor readings and drive the controller.
    """

    def __init__(self):
        # Power target; replaced by the upstream 'setpower' command and
        # reported as 'limit' in every sensor message.
        self.target = 100.0

    @staticmethod
    def _decode_message(parts):
        """Return the JSON dict carried by a single-frame message, or None.

        Malformed messages (wrong frame count, invalid JSON, non-object
        payload) are logged and dropped rather than raised, so one bad
        client cannot break the stream callback.
        """
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return None
        try:
            msg = json.loads(parts[0])
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            logger.error("invalid JSON, dropping it: %r", parts)
            return None
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return None
        return msg

    @staticmethod
    def _message_uuid(msg):
        """Return ``msg['uuid']``, logging an error and returning None if absent."""
        uuid = msg.get('uuid')
        if uuid is None:
            logger.error("missing uuid in message: %r", msg)
        return uuid

    def do_downstream_receive(self, parts):
        """Handle one application event received on the downstream socket.

        :param parts: multipart message; exactly one JSON frame encoding a
            dict with ``type == 'application'`` and an ``event`` name.  All
            events except 'start' also need a ``uuid``.
        """
        logger.info("receiving downstream message: %r", parts)
        msg = self._decode_message(parts)
        if msg is None:
            return
        msgtype = msg.get('type')
        event = msg.get('event')
        if msgtype != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            self.application_manager.register(msg)
            return
        if event not in ('threads', 'progress', 'power_policy', 'exit'):
            logger.error("unknown event: %r", event)
            return

        uuid = self._message_uuid(msg)
        if uuid is None:
            return
        if event == 'exit':
            self.application_manager.delete(uuid)
            return

        # Updates for applications we never registered are ignored.
        applications = self.application_manager.applications
        if uuid not in applications:
            return
        app = applications[uuid]
        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        else:
            # 'power_policy'
            # TODO: Invoke appropriate power policy
            pass

    def do_upstream_receive(self, parts):
        """Handle one command received on the upstream socket.

        :param parts: multipart message; exactly one JSON frame encoding a
            dict with a ``command`` key.  Each command is routed to its
            ``_upstream_*`` handler through a dispatch table.
        """
        logger.info("receiving upstream message: %r", parts)
        msg = self._decode_message(parts)
        if msg is None:
            return
        command = msg.get('command')
        if command is None:
            logger.error("missing command in message: %r", msg)
            return
        handlers = {
            'setpower': self._upstream_setpower,
            'run': self._upstream_run,
            'kill': self._upstream_kill,
            'list': self._upstream_list,
        }
        handler = None
        # JSON lists/objects are unhashable and cannot be dict keys.
        if not isinstance(command, (list, dict)):
            handler = handlers.get(command)
        if handler is None:
            logger.error("invalid command: %r", command)
            return
        handler(msg)

    def _upstream_setpower(self, msg):
        """Set the power target from ``msg['limit']``."""
        try:
            self.target = float(msg['limit'])
        except (KeyError, TypeError, ValueError):
            logger.error("invalid power limit in message: %r", msg)
            return
        logger.info("new target measure: %g", self.target)

    def _upstream_run(self, msg):
        """Create a container for ``msg`` and announce it upstream."""
        container_uuid = self._message_uuid(msg)
        if container_uuid is None:
            return
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        if not container:
            # Report the failure; there is no process/policy to describe.
            logger.error("container creation failed: %r", container_uuid)
            self.upstream_pub.send_json({'type': 'container',
                                         'event': 'start',
                                         'uuid': container_uuid,
                                         'errno': -1,
                                         })
            return

        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0,
                  'pid': container.process.pid,
                  'powerpolicy': container.powerpolicy['policy'],
                  }
        self.upstream_pub.send_json(update)
        # setup io callbacks: forward the child's output streams upstream
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def _upstream_kill(self, msg):
        """Ask the container manager to kill the container ``msg['uuid']``."""
        logger.info("asked to kill container: %r", msg)
        container_uuid = self._message_uuid(msg)
        if container_uuid is None:
            return
        self.container_manager.kill(container_uuid)
        # no update here, as it will trigger child exit

    def _upstream_list(self, msg):
        """Publish the container manager's container list upstream."""
        logger.info("asked for container list: %r", msg)
        response = self.container_manager.list()
        update = {'type': 'container',
                  'event': 'list',
                  'payload': response,
                  }
        self.upstream_pub.send_json(update)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis: ``uuid`` and
        ``io`` ('stdout' or 'stderr') are bound with ``functools.partial``.
        An empty/None ``data`` is published as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine info and publish total power and target upstream."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Ask the controller for a plan and apply it if it has an action."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD work to the IOLoop."""
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap changed children and publish exits of container processes."""
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # typically ECHILD: no child processes left to wait for
                break
            if pid == 0:
                # WNOHANG: children exist but none has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # check if it's a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only exits (normal or by signal) end a container
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                container = self.container_manager.pids[pid]
                self.container_manager.delete(container.uuid)
                msg = {'type': 'container',
                       'event': 'exit',
                       'status': status,
                       'uuid': container.uuid,
                       }
                self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensors and the IOLoop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Bind sockets, create managers and callbacks, then run the IOLoop.

        Blocks until the IOLoop is stopped (``do_shutdown`` on SIGINT).
        """
        # Bind address for upstream (TCP) clients: all interfaces
        bind_address = '*'
        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # setup application listening sockets
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        # An empty SUBSCRIBE filter receives every message; pyzmq requires
        # the filter as bytes on Python 3.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, b"")

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # prime machine_info so do_control has data before the first tick
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates and control (every 1000 ms)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Entry point: install the IOLoop, enable debug logging, run the daemon."""
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()