daemon.py 14.6 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
21

22
logger = logging.getLogger('nrm')
23
logger_power = logging.getLogger('power')
24 25

class Daemon(object):
26
    def __init__(self, config):
27
        self.target = 100.0
28
        self.config = config
29

30 31 32 33 34 35 36 37 38 39 40 41 42
    def do_downstream_receive(self, parts):
        """Handle one message received on the downstream SUB stream.

        parts is the list of frames of the message.  Exactly one frame
        holding a JSON object is expected; the object must carry
        'type' == 'application' and an 'event' among 'start', 'threads',
        'progress', 'phase_context' and 'exit'.

        This method is registered as a stream callback, so malformed input
        is logged and dropped rather than raised.  Events that reference an
        application uuid unknown to the application manager are ignored
        silently.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # Covers both undecodable bytes and syntactically invalid JSON.
            logger.error("invalid json, dropping it: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        if event == 'start':
            containers = self.container_manager.containers
            try:
                container = containers[msg['container']]
            except (KeyError, TypeError):
                # Missing 'container' field, unknown id, or unhashable id.
                logger.error("start event without a known container: %r",
                             msg)
                return
            self.application_manager.register(msg, container)
            return

        if event not in ('threads', 'progress', 'phase_context', 'exit'):
            logger.error("unknown event: %r", event)
            return

        # Every remaining event addresses an already registered application.
        uuid = msg.get('uuid')
        if uuid is None:
            logger.error("wrong message format: %r", msg)
            return
        applications = self.application_manager.applications
        if uuid not in applications:
            return

        if event == 'exit':
            self.application_manager.delete(uuid)
            return

        app = applications[uuid]
        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        else:
            # 'phase_context' is only forwarded when the owning container
            # has a power policy configured.
            container = self.container_manager.containers[app.cid]
            if container.power['policy']:
                app.update_phase_context(msg)
70

71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    def do_upstream_receive(self, msg, client):
        """Handle one request received by the upstream RPC server.

        msg is the decoded request and is dispatched on msg.type:

        - 'setpower': store float(msg.limit) as the new target and reply
          with a 'getpower' message carrying the stored value.
        - 'run': ask the container manager to create/start a process, reply
          with 'start' (container holds exactly one process) or
          'process_start' (it holds more), then forward the process
          stdout/stderr to the client through do_children_io().
        - 'kill': ask the container manager to kill the container; no reply
          is sent here.
        - 'list': reply with the container manager's listing.

        client identifies the requester and is handed back to sendmsg().
        Any other type is logged as an error and ignored.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target)
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            params = {'manifest': msg.manifest,
                      'file': msg.path,
                      'args': msg.args,
                      'uuid': msg.container_uuid,
                      'environ': msg.environ,
                      'clientid': client,
                      }
            pid, container = self.container_manager.create(params)
            # The manager's uuid is authoritative from here on.
            container_uuid = container.uuid
            if len(container.processes) == 1:
                # Single process in the container: set up the per-container
                # power management state before announcing the start.
                power = container.power
                if power['policy']:
                    power['manager'] = PowerPolicyManager(
                            container.resources['cpus'],
                            power['policy'],
                            float(power['damper']),
                            float(power['slowdown']))
                if power['profile']:
                    # NOTE(review): 'start' aliases the energy dict held in
                    # self.machine_info, so 'time' is written into it too.
                    profile = power['profile']
                    profile['start'] = self.machine_info['energy']['energy']
                    profile['start']['time'] = self.machine_info['time']
                update = {'api': 'up_rpc_rep',
                          'type': 'start',
                          'container_uuid': container_uuid,
                          'errno': 0 if container else -1,
                          'pid': pid,
                          'power': power['policy'] or {}
                          }
                reply = RPC_MSG['start'](**update)
            else:
                update = {'api': 'up_rpc_rep',
                          'type': 'process_start',
                          'container_uuid': container_uuid,
                          }
                reply = RPC_MSG['process_start'](**update)
            self.upstream_rpc_server.sendmsg(reply, client)

            # setup io callbacks: the same callable is used both for
            # streamed chunks and for the final close notification.
            outcb = partial(self.do_children_io, client,
                            container_uuid, 'stdout')
            errcb = partial(self.do_children_io, client,
                            container_uuid, 'stderr')
            process = container.processes[pid]
            process.stdout.read_until_close(outcb, outcb)
            process.stderr.read_until_close(errcb, errcb)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            self.container_manager.kill(msg.container_uuid)
            # no update here, as it will trigger child exit
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)
149

150
    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially applied on a per-child basis: client,
        container_uuid and io are bound with functools.partial when the
        process is started, and the stream reader supplies data.

        io names both the reply type and the RPC_MSG constructor to use
        (the callers in this file pass 'stdout' or 'stderr').  Falsy data is
        reported to the client as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)
161

162
    def do_sensor(self):
        """Refresh the sensor readings and publish the current power.

        Stores the result of sensor_manager.do_update() in
        self.machine_info, then publishes a 'power' message on the upstream
        PUB server carrying the total power reading and the current target.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'api': 'up_pub',
               'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
        logger.info("sending sensor message: %r", msg)
173 174

    def do_control(self):
175 176
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
177
        if action:
178 179
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
180 181 182
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)
183 184

    def do_signal(self, signum, frame):
        """Signal handler: hand the real work over to the IO loop.

        SIGINT schedules do_shutdown() and SIGCHLD schedules do_children()
        through add_callback_from_signal(), so neither runs inside the
        signal handler itself.  Any other signal is logged as an error.
        frame is unused.
        """
        if signum == signal.SIGINT:
            callback = self.do_shutdown
        elif signum == signal.SIGCHLD:
            callback = self.do_children
        else:
            logger.error("wrong signal: %d", signum)
            return
        ioloop.IOLoop.current().add_callback_from_signal(callback)
191 192 193 194 195

    def do_children(self):
        """Reap terminated children and notify the owning upstream clients.

        Children are collected without blocking until none is left to
        report.  For every reaped pid known to the container manager that
        exited or was killed by a signal:

        - if other processes remain in its container, the pid is forgotten
          and a 'process_exit' message is sent;
        - otherwise the container's power policy manager is reset (when a
          policy is set), profile data is computed (when profiling is on),
          the container is deleted and an 'exit' message is sent.

        Pids unknown to the container manager are only logged.
        """
        # find out if children have terminated
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # Presumably ECHILD: there is no child left to wait for.
                break
            if pid == 0 and status == 0:
                # WNOHANG: children exist but none has a status to report.
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            clientid = container.clientids[pid]
            msg = {'api': 'up_rpc_rep',
                   'status': str(status),
                   'container_uuid': container.uuid,
                   }

            if any(p != pid for p in container.processes):
                msg['type'] = 'process_exit'
                # Remove the pid of process that is finished
                container.processes.pop(pid, None)
                self.container_manager.pids.pop(pid, None)
                logger.info("Process %s in Container %s has finised.",
                            pid, container.uuid)
                self.upstream_rpc_server.sendmsg(
                        RPC_MSG['process_exit'](**msg), clientid)
                continue

            # Last process of the container: tear the container down.
            msg['type'] = 'exit'
            msg['profile_data'] = dict()
            pp = container.power
            if pp['policy']:
                pp['manager'].reset_all()
            if pp['profile']:
                # Take a fresh reading to serve as the end of the profile.
                self.machine_info = self.sensor_manager.do_update()
                end = self.machine_info['energy']['energy']
                end['time'] = self.machine_info['time']
                start = pp['profile']['start']
                # Calculate difference between the values
                diff = self.sensor_manager.calc_difference(start, end)
                # Get final package temperature; a real list is built so
                # the value is the same on Python 2 and Python 3.
                temp = self.machine_info['temperature']
                diff['temp'] = [temp[k]['pkg'] for k in temp]
                logger.info("Container %r profile data: %r",
                            container.uuid, diff)
                msg['profile_data'] = diff
            self.container_manager.delete(container.uuid)
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['exit'](**msg), clientid)
252 253

    def do_shutdown(self):
        """Stop the sensor manager, then stop the current IO loop.

        Scheduled from do_signal() when SIGINT is received.
        """
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, timers and signals, then run the loop.

        Binds the downstream PUB/SUB ipc sockets, creates the upstream PUB
        and RPC servers, instantiates the managers and the controller,
        takes an initial sensor reading, schedules do_sensor() and
        do_control() every 1000 ms, installs the SIGINT/SIGCHLD handlers and
        finally starts the IO loop.  The call returns only once the loop has
        been stopped (see do_shutdown()).
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # Empty filter subscribes to every message.  It must be a bytes
        # object: setsockopt() rejects text strings on Python 3.
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # Initial reading so machine_info exists before the first callback.
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


320
def runner(config):
    """Configure logging from config and run the daemon until it stops.

    config must expose 'log' and 'log_power' attributes.  When truthy, each
    is used as the path of a log file attached to the 'nrm' and 'power'
    loggers respectively; the power log records the bare message only.
    Both loggers are set to DEBUG level.
    """
    ioloop.install()

    logger.setLevel(logging.DEBUG)
    logger_power.setLevel(logging.DEBUG)

    if config.log:
        print("Logging to %s" % config.log)
        logger.addHandler(logging.FileHandler(config.log))

    if config.log_power:
        print("Logging power data to %s" % config.log_power)
        formatter = logging.Formatter('%(message)s')
        handler = logging.FileHandler(config.log_power)
        handler.setFormatter(formatter)
        logger_power.addHandler(handler)

    daemon = Daemon(config)
    daemon.main()