# daemon.py -- NRM daemon: downstream/upstream message handling, periodic
# sensor updates and the power control loop.
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, PowerActuator, DiscretizedPowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
# Message constructors for the two upstream APIs, indexed by message type:
# RPC replies addressed to one client, and published messages.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']

# Module-wide loggers: the general daemon log and the 'power' log.
logger = logging.getLogger('nrm')
logger_power = logging.getLogger('power')
24 25

class Daemon(object):
26

27
    def __init__(self, config):
28
        self.target = 100.0
29
        self.config = config
30
        self.container_owner = dict()
31

32 33 34 35 36 37 38 39 40 41 42 43 44
    def do_downstream_receive(self, parts):
        """Handle one message received on the downstream (application) socket.

        parts -- list of message frames; exactly one frame containing a JSON
                 object is expected.

        The object must have 'type' == 'application' and an 'event' among
        'start', 'progress', 'hardware-progress', 'phase_context' and 'exit'.
        Anything malformed is logged and dropped instead of raising out of
        the receive callback; messages about unknown applications are
        ignored.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except (TypeError, ValueError):
            logger.error("undecodable message, dropping it: %r", parts)
            return
        if not isinstance(msg, dict):
            logger.error("wrong message format: %r", msg)
            return

        event = msg.get('event')
        if msg.get('type') != 'application' or event is None:
            logger.error("wrong message format: %r", msg)
            return

        applications = self.application_manager.applications
        containers = self.container_manager.containers

        if event == 'start':
            cid = msg.get('container')
            if cid not in containers:
                logger.error("unknown container, dropping message: %r", msg)
                return
            self.application_manager.register(msg, containers[cid])
        elif event == 'progress':
            uuid = msg.get('uuid')
            if uuid in applications:
                applications[uuid].update_progress(msg)
        elif event == 'hardware-progress':
            cid = msg.get('container')
            if cid is None:
                logger.error("wrong message format: %r", msg)
                return
            # the report is per container: forward it to every application
            # registered in that container
            for app in applications.values():
                if app.container_uuid == cid:
                    app.update_hardwareprogress(msg)
        elif event == 'phase_context':
            uuid = msg.get('uuid')
            if uuid in applications:
                app = applications[uuid]
                # NOTE(review): uses the same attribute as the
                # 'hardware-progress' branch to find the application's
                # container -- confirm against the Application class.
                cid = app.container_uuid
                # phase contexts are only consumed when the container has
                # a power policy
                if cid in containers and containers[cid].power['policy']:
                    app.update_phase_context(msg)
        elif event == 'exit':
            uuid = msg.get('uuid')
            if uuid in applications:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown event: %r", event)
73

74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
    def do_upstream_receive(self, msg, client):
        """Handle one RPC request received from an upstream client.

        msg    -- decoded request object; msg.type selects the command
                  ('setpower', 'run', 'kill' or 'list')
        client -- identity of the requesting client; replies are sent to it

        Unknown commands are logged and otherwise ignored.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            # acknowledge with a 'getpower' reply carrying the new target
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target)
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            params = {'manifest': msg.manifest,
                      'file': msg.path,
                      'args': msg.args,
                      'uuid': msg.container_uuid,
                      'environ': msg.environ,
                      'clientid': client,
                      }
            pid, container = self.container_manager.create(params)
            # from here on, use the uuid of the container that was returned
            container_uuid = container.uuid
            if len(container.processes) == 1:
                # the container holds a single process: treat this request
                # as the one that started the container
                if container.power['policy']:
                    container.power['manager'] = PowerPolicyManager(
                            container.resources['cpus'],
                            container.power['policy'],
                            float(container.power['damper']),
                            float(container.power['slowdown']))
                if container.power['profile']:
                    # record the starting point of the profile; do_children()
                    # computes the difference when the container exits
                    p = container.power['profile']
                    p['start'] = self.machine_info['energy']['energy']
                    p['start']['time'] = self.machine_info['time']
                update = {'api': 'up_rpc_rep',
                          'type': 'start',
                          'container_uuid': container_uuid,
                          'errno': 0 if container else -1,
                          'power': container.power['policy'] or dict()
                          }
                self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
                                                 client)
                # remember which client owns the container
                self.container_owner[container.uuid] = client

            # now deal with the process itself
            update = {'api': 'up_rpc_rep',
                      'type': 'process_start',
                      'container_uuid': container_uuid,
                      'pid': pid,
                      }
            self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_start'](**update), client)
            # setup io callbacks: the same callback is registered twice per
            # stream, as both arguments of read_until_close
            outcb = partial(self.do_children_io, client, container_uuid,
                            'stdout')
            errcb = partial(self.do_children_io, client, container_uuid,
                            'stderr')
            container.processes[pid].stdout.read_until_close(outcb, outcb)
            container.processes[pid].stderr.read_until_close(errcb, errcb)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg.container_uuid)
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)
146

147
    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis.

        client         -- upstream client the data is forwarded to
        container_uuid -- uuid of the container the child runs in
        io             -- stream name ('stdout' or 'stderr' as bound by
                          do_upstream_receive); also used as the reply type
        data           -- what was read from the stream; an empty value is
                          forwarded as the string 'eof'
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)
158

159
    def do_sensor(self):
        """Refresh the machine state and publish the total power upstream.

        Stores the result of the sensor update in self.machine_info, then
        publishes a 'power' message with the measured total and the current
        target.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        total_power = self.machine_info['energy']['power']['total']
        msg = {'api': 'up_pub',
               'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
        logger.info("sending sensor message: %r", msg)
170 171

    def do_control(self):
        """Run one iteration of the control loop.

        Asks the controller for a plan given the current target and machine
        state, applies the planned action if there is one, then runs the
        per-container policy when at least one container exists.
        """
        logger.info(
                "Asking controller to plan for target %s using machine info %s",
                self.target, self.machine_info)
        plan = self.controller.planify(self.target, self.machine_info)
        logger.info("Controller chose plan %s", plan)
        action, actuator = plan
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)
184 185

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the event loop.

        signum -- number of the received signal
        frame  -- interrupted stack frame (unused)

        SIGINT and SIGTERM schedule do_shutdown, SIGCHLD schedules
        do_children; any other signal is logged as an error.
        """
        callbacks = {
            signal.SIGINT: self.do_shutdown,
            signal.SIGTERM: self.do_shutdown,
            signal.SIGCHLD: self.do_children,
        }
        callback = callbacks.get(signum)
        if callback is None:
            logger.error("wrong signal: %d", signum)
            return
        # nothing is done in the handler itself: the callback runs later,
        # from the loop
        ioloop.IOLoop.current().add_callback_from_signal(callback)
194 195 196 197 198

    def do_children(self):
        """Reap terminated children and notify the upstream clients.

        Collects every child whose state changed, without blocking. For each
        tracked pid that exited or was killed by a signal, a 'process_exit'
        reply is sent to the client that started it and the pid is
        forgotten. If that client is also the owner of the container, the
        container is deleted and an 'exit' reply is sent, carrying the
        profile data when profiling was enabled for the container.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # wait3 failed (e.g. no children at all): nothing to reap
                break
            if pid == 0 and status == 0:
                # no child has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            clientid = container.clientids[pid]

            # first, send a process_exit
            msg = {'api': 'up_rpc_rep',
                   'type': 'process_exit',
                   'status': str(status),
                   'container_uuid': container.uuid,
                   }
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['process_exit'](**msg), clientid)
            # Remove the pid of process that is finished
            container.processes.pop(pid, None)
            self.container_manager.pids.pop(pid, None)
            logger.info("Process %s in Container %s has finised.",
                        pid, container.uuid)

            # if the client of this process owns the container, kill
            # everything; a container without a recorded owner is left alone
            if container.uuid not in self.container_owner:
                continue
            if self.container_owner[container.uuid] != clientid:
                continue

            # deal with container exit
            msg = {'api': 'up_rpc_rep',
                   'type': 'exit',
                   'container_uuid': container.uuid,
                   'profile_data': dict(),
                   }
            pp = container.power
            if pp['policy']:
                pp['manager'].reset_all()
            if pp['profile']:
                # take a fresh measurement as the end point of the profile
                self.machine_info = self.sensor_manager.do_update()
                end = self.machine_info['energy']['energy']
                end['time'] = self.machine_info['time']
                start = pp['profile']['start']
                # Calculate difference between the values
                diff = self.sensor_manager.calc_difference(start, end)
                # Get final package temperature; built as a real list so
                # that the result does not depend on what map() returns
                temp = self.machine_info['temperature']
                diff['temp'] = [temp[k]['pkg'] for k in temp]
                logger.info("Container %r profile data: %r",
                            container.uuid, diff)
                msg['profile_data'] = diff
            self.container_manager.delete(container.uuid)
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['exit'](**msg), clientid)
            self.container_owner.pop(container.uuid, None)
260 261

    def do_shutdown(self):
        """Stop the sensor manager, then stop the current event loop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, timers and signals, then run the loop.

        Binds the downstream (application) PUB/SUB sockets and the upstream
        PUB/RPC servers, creates the managers and the controller, schedules
        do_sensor and do_control once per second, installs the signal
        handlers and finally starts the event loop. The call returns once
        the loop is stopped.
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # empty filter: subscribe to every message. It must be a byte
        # string, as socket options do not accept text strings.
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager()
        self.container_manager = ContainerManager(self.resource_manager)
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        pa = PowerActuator(self.sensor_manager,
                           strategy=self.config['powerlevel'])
        self.controller = Controller([pa])

        # take a first measurement so that machine_info exists before any
        # callback reads it
        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates (period in milliseconds)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        # setup the periodic control loop
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGTERM, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


329
def _log_messages_to_file(name, path):
    """Make logger *name* write bare messages, at DEBUG level, to *path*."""
    data_logger = logging.getLogger(name)
    data_logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(path)
    # message only: no timestamp, level or logger name
    handler.setFormatter(logging.Formatter('%(message)s'))
    data_logger.addHandler(handler)


def runner(config):
    """Configure logging from *config*, then create and run the daemon.

    config -- configuration object; its log, log_progress,
              log_hardwareprogress and log_power attributes, when set, are
              the paths of the files the corresponding loggers write to.
              The object is also handed to Daemon.

    The call returns when Daemon.main() returns.
    """
    ioloop.install()

    if config.log:
        print("Logging nrm to %s" % config.log)
        logger.setLevel(logging.DEBUG)
        logger.addHandler(logging.FileHandler(config.log))

    if config.log_progress:
        print("Logging progress data to %s" % config.log_progress)
        _log_messages_to_file('progress', config.log_progress)

    if config.log_hardwareprogress:
        print("Logging hardware progress data to %s"
              % config.log_hardwareprogress)
        _log_messages_to_file('hardwareprogress',
                              config.log_hardwareprogress)

    if config.log_power:
        print("Logging power data to %s" % config.log_power)
        _log_messages_to_file('power', config.log_power)

    daemon = Daemon(config)
    daemon.main()