daemon.py 14.3 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, ApplicationActuator, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import json
9
import logging
10
import os
11
from resources import ResourceManager
12
from sensor import SensorManager
13 14 15
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
16 17
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
18

19 20
# Message constructors for the two upstream APIs, indexed by message type.
# They are used as RPC_MSG['list'](**fields) for replies sent to one client
# and PUB_MSG['power'](**fields) for events published to all subscribers.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
21

22 23
# Module-wide logger; runner() sets its level and the optional file handler.
logger = logging.getLogger('nrm')

24 25

class Daemon(object):
    """Node resource manager daemon.

    Ties the container, application, resource and sensor managers and the
    controller to the downstream (application) and upstream (client)
    messaging sockets.  Every ``do_*`` method is a callback registered by
    :meth:`main` on the ioloop, a socket stream or a signal handler.

    Attributes set by ``__init__``:
        target: power target handed to the controller and reported
            upstream; replaced by upstream ``setpower`` requests.
        config: configuration object supplied by the caller.

    The managers, servers, streams and ``machine_info`` are created by
    :meth:`main` only, so the callbacks must not run before it.
    """

    def __init__(self, config):
        """Store ``config`` and set the default power target (100.0)."""
        self.target = 100.0
        self.config = config

    def do_downstream_receive(self, parts):
        """Handle one multipart message received from an application.

        ``parts`` is the list of frames of the message.  Exactly one frame
        is expected, holding a JSON object with ``type == 'application'``
        and an ``event`` among ``start``, ``threads``, ``progress``,
        ``phase_context`` and ``exit``.  Anything else is logged and
        dropped: a bad message from one application must not raise out of
        the socket callback.
        """
        logger.info("receiving downstream message: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it: %r", parts)
            return
        try:
            msg = json.loads(parts[0])
        except ValueError:
            # json raises ValueError (JSONDecodeError on Python 3) on
            # malformed input.
            logger.error("malformed message, dropping it: %r", parts)
            return
        if (not isinstance(msg, dict) or msg.get('type') != 'application'
                or msg.get('event') is None):
            logger.error("wrong message format: %r", msg)
            return

        event = msg['event']
        if event == 'start':
            try:
                container = self.container_manager.containers[msg['container']]
            except KeyError:
                # either the message has no 'container' field or it names
                # a container this daemon does not know about.
                logger.error("start event for unknown container: %r", msg)
                return
            self.application_manager.register(msg, container)
        elif event in ('threads', 'progress', 'phase_context', 'exit'):
            self._do_application_event(event, msg)
        else:
            logger.error("unknown event: %r", event)

    def _do_application_event(self, event, msg):
        """Apply a per-application ``event`` to the application it names.

        Messages without a ``uuid`` are logged; messages naming an
        application that is not registered are ignored silently.
        """
        app_uuid = msg.get('uuid')
        if app_uuid is None:
            logger.error("wrong message format: %r", msg)
            return
        applications = self.application_manager.applications
        if app_uuid not in applications:
            return
        if event == 'exit':
            self.application_manager.delete(app_uuid)
            return
        app = applications[app_uuid]
        if event == 'threads':
            app.update_threads(msg)
        elif event == 'progress':
            app.update_progress(msg)
        elif event == 'phase_context':
            # phase contexts are only tracked for containers that have a
            # power policy configured.
            container = self.container_manager.containers[app.cid]
            if container.power['policy']:
                app.update_phase_context(msg)

    def do_upstream_receive(self, msg, client):
        """Handle one request received on the upstream RPC socket.

        msg: decoded request; its ``type`` attribute selects the command
            (``setpower``, ``run``, ``kill`` or ``list``).
        client: opaque client handle, passed back to
            ``upstream_rpc_server.sendmsg`` so the reply reaches the
            requester.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target)
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            self._do_run(msg, client)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg.container_uuid)
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def _do_run(self, msg, client):
        """Start the process described by a ``run`` request.

        Publishes ``container_start`` when the process is the only one in
        its container, replies ``process_start`` to ``client`` and
        forwards the process's stdout/stderr to ``client``.
        """
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        container_uuid = container.uuid
        # Only one process so far: treat this as the container's start.
        if len(container.processes) == 1:
            self._do_container_start(container)

        # now deal with the process itself
        update = {'api': 'up_rpc_rep',
                  'type': 'process_start',
                  'container_uuid': container_uuid,
                  'pid': pid,
                  }
        self.upstream_rpc_server.sendmsg(
            RPC_MSG['process_start'](**update), client)
        # setup io callbacks; the same callable is passed for both
        # arguments of read_until_close.
        outcb = partial(self.do_children_io, client, container_uuid,
                        'stdout')
        errcb = partial(self.do_children_io, client, container_uuid,
                        'stderr')
        process = container.processes[pid]
        process.stdout.read_until_close(outcb, outcb)
        process.stderr.read_until_close(errcb, errcb)

    def _do_container_start(self, container):
        """Set up power management for ``container`` and publish its start."""
        power = container.power
        if power['policy']:
            power['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    power['policy'],
                    float(power['damper']),
                    float(power['slowdown']))
        if power['profile']:
            # NOTE(review): 'start' aliases the energy dict stored in
            # machine_info (the 'time' key is added to that same dict);
            # this relies on the sensor manager returning fresh dicts on
            # each update -- confirm.
            profile = power['profile']
            profile['start'] = self.machine_info['energy']['energy']
            profile['start']['time'] = self.machine_info['time']
        update = {'api': 'up_pub',
                  'type': 'container_start',
                  'container_uuid': container.uuid,
                  'errno': 0 if container else -1,
                  'power': power['policy'] or dict()
                  }
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_start'](**update))

    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis: ``client``,
        ``container_uuid`` and ``io`` (``'stdout'`` or ``'stderr'``) are
        bound with ``functools.partial`` and only ``data`` comes from the
        stream.  Empty ``data`` is reported as the payload ``'eof'``.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh ``machine_info`` and publish the total power upstream.

        When the sensor data lacks ``['energy']['power']['total']`` (wrong
        type or missing key) an error is logged and nothing is published.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (TypeError, KeyError):
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
            return
        msg = {'api': 'up_pub',
               'type': 'power',
               'total': total_power,
               'limit': self.target
               }
        self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
        logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control step: plan, apply the action, then run policies."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the ioloop.

        SIGINT schedules :meth:`do_shutdown`, SIGCHLD schedules
        :meth:`do_children`; any other signal is logged as an error.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report their exit upstream.

        Loops on a non-blocking ``os.wait3`` until no more child state is
        available.  For every exited or signaled child that belongs to a
        container, a ``process_exit`` reply is sent to the client that
        started it; when that was the container's last process the
        container exit is handled too.
        """
        while True:
            # find out if children have terminated
            try:
                pid, status, _rusage = os.wait3(os.WNOHANG)
            except OSError:
                # wait3 failed (typically: no child left to wait for)
                break
            if pid == 0 and status == 0:
                # children exist but none changed state
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            clientid = container.clientids[pid]

            # first, send a process_exit
            msg = {'api': 'up_rpc_rep',
                   'type': 'process_exit',
                   'status': str(status),
                   'container_uuid': container.uuid,
                   }
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['process_exit'](**msg), clientid)
            # Remove the pid of process that is finished
            container.processes.pop(pid, None)
            self.container_manager.pids.pop(pid, None)
            logger.info("Process %s in Container %s has finised.",
                        pid, container.uuid)

            # if this is the last process in the container,
            # kill everything
            if not container.processes:
                self._do_container_exit(container)

    def _do_container_exit(self, container):
        """Tear down ``container`` and publish its ``container_exit``.

        Resets the power policy manager when a policy is set and, when
        profiling is enabled, attaches the difference between the start
        and current sensor readings as ``profile_data``.
        """
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            self.machine_info = self.sensor_manager.do_update()
            end = self.machine_info['energy']['energy']
            end['time'] = self.machine_info['time']
            start = power['profile']['start']
            # Calculate difference between the values
            diff = self.sensor_manager.calc_difference(start, end)
            # Get final package temperature; built as a real list so the
            # value is the same on Python 2 and 3 (map() is lazy on 3).
            temp = self.machine_info['temperature']
            diff['temp'] = [temp[key]['pkg'] for key in temp]
            logger.info("Container %r profile data: %r",
                        container.uuid, diff)
            msg['profile_data'] = diff
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_exit'](**msg))

    def do_shutdown(self):
        """Stop the sensor manager, then stop the ioloop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Create sockets and managers, register callbacks, run the ioloop.

        Downstream sockets are bound on ipc endpoints under /tmp, upstream
        servers on tcp ports 2345 (pub) and 3456 (rpc) of every interface.
        Does not return until the ioloop is stopped (see
        :meth:`do_shutdown`).
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)
        downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        downstream_pub_socket.bind(downstream_pub_param)
        downstream_sub_socket.bind(downstream_sub_param)
        # Empty filter: subscribe to everything.  Must be bytes, pyzmq
        # rejects text strings for SUBSCRIBE on Python 3.
        downstream_sub_filter = b""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream pub socket bound to: %s", downstream_pub_param)
        logger.info("downstream sub socket bound to: %s", downstream_sub_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
        # create a stream to let ioloop deal with blocking calls on HWM
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        self.container_manager = ContainerManager(
           self.resource_manager,
           perfwrapper=self.config.argo_perf_wrapper,
           linuxperf=self.config.perf,
           argo_nodeos_config=self.config.argo_nodeos_config
           )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        aa = ApplicationActuator(self.application_manager, self.downstream_pub)
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([aa, pa])

        self.sensor_manager.start()
        # first reading, so machine_info exists before any callback runs
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


328
def runner(config):
    """Entry point: set up the ioloop and logging, then run the daemon.

    Installs the zmq ioloop, sets the 'nrm' logger to DEBUG, adds a file
    handler when ``config.nrm_log`` is set, and finally builds a
    :class:`Daemon` from ``config`` and runs its ``main()``.
    """
    ioloop.install()

    logger.setLevel(logging.DEBUG)

    log_path = config.nrm_log
    if log_path:
        print("Logging to %s" % log_path)
        file_handler = logging.FileHandler(log_path)
        logger.addHandler(file_handler)

    Daemon(config).main()