GitLab maintenance is scheduled for tomorrow, 2020-08-11, from 17:00 to 18:00 CT; services will be unavailable during this time.

daemon.py 10.2 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from functools import partial
7
import json
8
import logging
9
import os
10
from resources import ResourceManager
11
from sensor import SensorManager
12 13 14
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
15

16

17 18
logger = logging.getLogger('nrm')

19 20 21

class Daemon(object):
    """NRM daemon.

    Receives application messages on a downstream IPC SUB socket and client
    commands on an upstream TCP SUB socket, publishes container and power
    updates on the upstream PUB socket, and periodically samples the sensors
    and runs the controller.  Sockets, managers and periodic callbacks are
    all created by main(); __init__ only sets the initial power target.
    """

    def __init__(self):
        # Power limit: updated by the upstream 'setpower' command, passed to
        # the controller and echoed as 'limit' in every 'power' message.
        self.target = 100.0

    def do_downstream_receive(self, parts):
        """Handle one message received from an application (downstream).

        :param parts: list of ZeroMQ frames.  Exactly one frame holding a
            JSON object with 'type' == 'application' and an 'event' key is
            expected; anything else is logged and dropped.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # Never let a malformed payload escape into the IOLoop callback.
            logger.error("invalid JSON, dropping it: %r", parts[0])
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return
        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            self.application_manager.register(msg)
            return
        if event not in ('threads', 'progress', 'power_policy', 'exit'):
            logger.error("unknown event: %r", event)
            return

        uuid = msg.get('uuid')
        if uuid is None:
            logger.error("missing uuid in message: %r", msg)
            return
        if event == 'exit':
            self.application_manager.delete(uuid)
            return

        # The remaining events only apply to already-registered applications.
        if uuid not in self.application_manager.applications:
            logger.debug("event %r for unknown application: %r", event, uuid)
            return
        app = self.application_manager.applications[uuid]
        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        else:
            # event == 'power_policy'
            # TODO: Invoke appropriate power policy on `app`.
            pass

    def do_upstream_receive(self, parts):
        """Handle one command received from an upstream client.

        :param parts: list of ZeroMQ frames.  Exactly one frame holding a
            JSON object with a 'command' key is expected.  Supported
            commands: 'setpower' (reads 'limit'), 'run' and 'kill' (read
            'uuid'; 'run' passes the whole message to the container
            manager) and 'list'.  Replies, if any, are published on the
            upstream PUB stream.  Malformed commands are logged and dropped.
        """
        logger.info("receiving upstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # Never let a malformed payload escape into the IOLoop callback.
            logger.error("invalid JSON, dropping it: %r", parts[0])
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return
        command = msg.get('command')
        if command is None:
            logger.error("missing command in message: %r", msg)
            return
        if command in ('run', 'kill') and 'uuid' not in msg:
            logger.error("missing uuid in message: %r", msg)
            return

        # TODO: switch to a dispatch dictionary
        if command == 'setpower':
            try:
                self.target = float(msg['limit'])
            except (KeyError, TypeError, ValueError):
                logger.error("invalid power limit in message: %r", msg)
                return
            logger.info("new target measure: %g", self.target)
        elif command == 'run':
            self._run_container(msg)
        elif command == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit (see do_children)
            self.container_manager.kill(msg['uuid'])
        elif command == 'list':
            logger.info("asked for container list: %r", msg)
            update = {'type': 'container',
                      'event': 'list',
                      'payload': self.container_manager.list(),
                      }
            self.upstream_pub.send_json(update)
        else:
            logger.error("invalid command: %r", command)

    def _run_container(self, msg):
        """Create the container requested by an upstream 'run' command.

        Publishes a 'start' update upstream (errno 0 and the process pid on
        success, errno -1 and pid None when creation fails) and, on success,
        forwards the container's stdout/stderr upstream via do_children_io.

        :param msg: decoded 'run' command; must contain 'uuid'.
        """
        container_uuid = msg['uuid']
        if container_uuid in self.container_manager.containers:
            logger.info("container already created: %r", container_uuid)
            return

        logger.info("new container required: %r", msg)
        container = self.container_manager.create(msg)
        # TODO: obviously we need to send more info than that
        update = {'type': 'container',
                  'event': 'start',
                  'uuid': container_uuid,
                  'errno': 0 if container else -1,
                  # A failed create() has no process to report.
                  'pid': container.process.pid if container else None,
                  }
        self.upstream_pub.send_json(update)
        if not container:
            logger.error("container creation failed: %r", container_uuid)
            return

        # setup io callbacks
        outcb = partial(self.do_children_io, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, container_uuid, 'stderr')
        container.process.stdout.read_until_close(outcb, outcb)
        container.process.stderr.read_until_close(errcb, errcb)

    def do_children_io(self, uuid, io, data):
        """Receive data from one of the children, and send it up the pipe.

        Meant to be partially applied per container (functools.partial).

        :param uuid: uuid of the container that produced the data.
        :param io: stream name ('stdout' or 'stderr' in this class); it is
            used as the event name of the published update.
        :param data: data read from the stream; empty/None data is published
            with the payload 'eof'.
        """
        logger.info("%r received %r data: %r", uuid, io, data)
        update = {'type': 'container',
                  'event': io,
                  'uuid': uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_pub.send_json(update)

    def do_sensor(self):
        """Refresh machine_info from the sensors and publish a 'power' update
        carrying the measured total power and the current target."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'type': 'power',
               'total': total_power,
               'limit': self.target,
               }
        self.upstream_pub.send_json(msg)
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Ask the controller for a plan towards self.target given the latest
        machine_info, and execute it when it contains an action."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)

    def do_signal(self, signum, frame):
        """Signal handler: defer the actual work to the IOLoop.

        SIGINT schedules do_shutdown and SIGCHLD schedules do_children, both
        through IOLoop.add_callback_from_signal.  Any other signal is logged.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report container exits upstream.

        Several children may have exited for a single SIGCHLD, so wait3() is
        polled without blocking until it reports no pending status (pid 0)
        or raises OSError (e.g. no children left).
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                break
            # With WNOHANG, pid 0 means no child has a status to report.
            if pid == 0:
                break

            logger.info("child update %d: %r", pid, status)
            # check if it's a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only act on terminations (normal exit or killed by a signal)
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue
            container = self.container_manager.pids[pid]
            self.container_manager.delete(container.uuid)
            msg = {'type': 'container',
                   'event': 'exit',
                   'status': status,
                   'uuid': container.uuid,
                   }
            self.upstream_pub.send_json(msg)

    def do_shutdown(self):
        """Stop the sensors and the IOLoop, which makes main() return."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, periodic callbacks and signal handlers,
        then run the IOLoop until do_shutdown() stops it."""
        # Bind address for the upstream (TCP) sockets; downstream uses IPC.
        bind_address = '*'
        # PUB port for upstream clients
        upstream_pub_port = 2345
        # SUB port for upstream clients
        upstream_sub_port = 3456

        # setup application (downstream) and client (upstream) sockets
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        upstream_pub_socket = context.socket(zmq.PUB)
        upstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)

        # SUBSCRIBE takes a bytes prefix; the empty prefix subscribes to
        # every message.  pyzmq rejects a str value here on Python 3.
        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
        upstream_pub_socket.bind(upstream_pub_param)
        upstream_sub_socket.bind(upstream_sub_param)
        upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, b"")

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket bound to: %s", upstream_sub_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
        self.upstream_sub.on_recv(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        # do_control reads machine_info, so take a first sample right away.
        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates (period in milliseconds)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner():
    """Program entry point.

    Installs pyzmq's IOLoop, turns on DEBUG-level logging, then builds a
    Daemon and blocks in its main() until the event loop is stopped.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)
    Daemon().main()