"""NRM daemon: bridges upstream clients and downstream applications.

Defines :class:`Daemon`, which serves the upstream RPC/PUB APIs and the
downstream event socket, manages containers and applications, and runs the
power controller on a zmq IOLoop, plus :func:`runner`, its entry point.
"""
from __future__ import print_function

from functools import partial
import logging
import os
import signal

from zmq.eventloop import ioloop

from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, PowerActuator
from nrm.messaging import (MSGTYPES, DownstreamEventServer,
                           UpstreamPubServer, UpstreamRPCServer)
from powerpolicy import PowerPolicyManager
from resources import ResourceManager
from sensor import SensorManager

# Message constructors for the upstream APIs, keyed by message type and
# called with the message fields as keyword arguments.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']

logger = logging.getLogger('nrm')

class Daemon(object):
    """Node resource manager daemon.

    Owns the upstream RPC/PUB servers, the downstream event server, the
    container/application/resource/sensor managers and the power controller,
    and drives them all from a single zmq ``IOLoop``. Everything is wired up
    in :meth:`main`, which blocks until :meth:`do_shutdown` stops the loop.
    """

    def __init__(self, config):
        """Store ``config`` and set the initial power target.

        ``config`` is only read in :meth:`main` (``hwloc``,
        ``argo_perf_wrapper``, ``perf``, ``argo_nodeos_config``,
        ``pmpi_lib``); servers and managers are created there, not here.
        """
        # Target handed to the controller until an upstream 'setpower'
        # request replaces it; also published as 'limit' by do_sensor.
        self.target = 100.0
        self.config = config

    def do_downstream_receive(self, msg, client):
        """Handle one message from an application on the downstream socket.

        * ``application_start``: register the application with its container.
        * ``progress`` / ``performance``: update the application's performance
          record (when the application is known) and republish the payload
          upstream under the same message type.
        * ``phase_context``: forward to the application only when its
          container runs a power policy.
        * ``application_exit``: unregister the application.

        Anything else is logged as an error.
        """
        logger.info("receiving downstream message: %r", msg)
        applications = self.application_manager.applications
        containers = self.container_manager.containers
        if msg.type == 'application_start':
            cid = msg.container_uuid
            if cid not in containers:
                # Indexing blindly would raise out of the event callback when
                # the container is unknown or already torn down.
                logger.error("application start for unknown container: %r",
                             cid)
                return
            self.application_manager.register(msg, containers[cid])
        elif msg.type in ('progress', 'performance'):
            # Both report kinds are processed identically.
            if msg.application_uuid in applications:
                applications[msg.application_uuid].update_performance(msg)
            pub = {'api': 'up_pub',
                   'type': msg.type,
                   'payload': msg.payload,
                   'container_uuid': msg.container_uuid}
            self.upstream_pub_server.sendmsg(PUB_MSG[msg.type](**pub))
        elif msg.type == 'phase_context':
            uuid = msg.application_uuid
            if uuid in applications:
                app = applications[uuid]
                # A container that is already gone runs no policy either.
                if app.cid in containers and \
                        containers[app.cid].power['policy']:
                    app.update_phase_context(msg)
        elif msg.type == 'application_exit':
            uuid = msg.application_uuid
            if uuid in applications:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown msg: %r", msg)

    def do_upstream_receive(self, msg, client):
        """Handle one request from upstream RPC ``client``.

        * ``setpower``: update the target and reply with ``getpower``.
        * ``run``: start a process in a (possibly already existing)
          container, reply with ``process_start`` and stream the process's
          stdout/stderr back to ``client``. The first process of a container
          also sets up its power management and publishes
          ``container_start``.
        * ``kill``: ask the container manager to kill a container. The
          upstream report comes from the resulting child exit
          (see :meth:`do_children`).
        * ``list``: reply with the container list.
        """
        if msg.type == 'setpower':
            try:
                self.target = float(msg.limit)
            except (TypeError, ValueError):
                # External input: reject it instead of raising out of the
                # event callback.
                logger.error("invalid power limit: %r", msg.limit)
                return
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target)
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            params = {'manifest': msg.manifest,
                      'file': msg.path,
                      'args': msg.args,
                      'uuid': msg.container_uuid,
                      'environ': msg.environ,
                      'clientid': client,
                      }
            pid, container = self.container_manager.create(params)
            container_uuid = container.uuid
            if len(container.processes) == 1:
                # First process in this container: set up power management
                # and announce the container upstream.
                power = container.power
                if power['policy']:
                    power['manager'] = PowerPolicyManager(
                            container.resources['cpus'],
                            power['policy'],
                            float(power['damper']),
                            float(power['slowdown']))
                if power['profile']:
                    # Energy snapshot diffed against a second one when the
                    # container exits. Copied so it neither aliases nor
                    # mutates the reading held in self.machine_info.
                    start = dict(self.machine_info['energy']['energy'])
                    start['time'] = self.machine_info['time']
                    power['profile']['start'] = start

                # NOTE(review): container was already dereferenced above, so
                # the -1 branch is unreachable; confirm how create() reports
                # failure.
                update = {'api': 'up_pub',
                          'type': 'container_start',
                          'container_uuid': container_uuid,
                          'errno': 0 if container else -1,
                          'power': container.power['policy'] or dict()
                          }
                self.upstream_pub_server.sendmsg(
                        PUB_MSG['container_start'](**update))

            # now deal with the process itself
            update = {'api': 'up_rpc_rep',
                      'type': 'process_start',
                      'container_uuid': container_uuid,
                      'pid': pid,
                      }
            self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_start'](**update), client)
            # setup io callbacks
            outcb = partial(self.do_children_io, client, container_uuid,
                            'stdout')
            errcb = partial(self.do_children_io, client, container_uuid,
                            'stderr')
            container.processes[pid].stdout.read_until_close(outcb, outcb)
            container.processes[pid].stderr.read_until_close(errcb, errcb)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            # No reply here: the kill triggers a child exit, which
            # do_children reports.
            self.container_manager.kill(msg.container_uuid)
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def do_children_io(self, client, container_uuid, io, data):
        """Forward a chunk of a child's ``io`` stream to ``client``.

        Bound per child stream with ``functools.partial`` in
        :meth:`do_upstream_receive`. ``io`` ('stdout' or 'stderr') is also
        the RPC message type. Falsy ``data`` is sent as the string 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh ``self.machine_info`` and publish total power upstream.

        Runs periodically from the IOLoop (see :meth:`main`). If the reading
        has no ``['energy']['power']['total']`` entry, an error is logged and
        nothing is published.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (KeyError, TypeError):
            # KeyError: a level is missing; TypeError: a level is not
            # subscriptable (e.g. None).
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
        else:
            msg = {'api': 'up_pub',
                   'type': 'power',
                   'total': total_power,
                   'limit': self.target
                   }
            self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
            logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control step.

        Asks the controller for an ``(action, actuator)`` plan against the
        current target and ``machine_info``. Any action is executed and then
        passed to ``controller.update``. Container power policies run only
        when at least one container exists.
        """
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD work to the IOLoop.

        SIGINT schedules :meth:`do_shutdown` and SIGCHLD schedules
        :meth:`do_children`, so the real work runs on the loop rather than
        inside the handler.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap every child that changed state and report tracked exits.

        Calls non-blocking ``os.wait3`` repeatedly until nothing more is
        pending (or waiting fails, e.g. no children are left). Exits of pids
        tracked by the container manager go to :meth:`_handle_process_exit`.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                break
            if pid == 0 and status == 0:
                # WNOHANG and no child has changed state.
                break

            logger.info("child update %d: %r", pid, status)
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # Only exits (normal or by signal) are acted upon.
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._handle_process_exit(pid, status)

    def _handle_process_exit(self, pid, status):
        """Report the exit of tracked process ``pid`` and forget it.

        Sends ``process_exit`` to the client that started the process. If it
        was the container's last process, the container is torn down: its
        power policy is reset, profile data is gathered when profiling is
        enabled, the container is deleted and ``container_exit`` is
        published.
        """
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        msg = {'api': 'up_rpc_rep',
               'type': 'process_exit',
               'status': str(status),
               'container_uuid': container.uuid,
               }
        self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_exit'](**msg), clientid)
        # Remove the pid of process that is finished
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finished.",
                    pid, container.uuid)

        if container.processes:
            return

        # Last process is gone: deal with container exit.
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            diff = self._profile_data(power)
            logger.info("Container %r profile data: %r",
                        container.uuid, diff)
            msg['profile_data'] = diff
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(PUB_MSG['container_exit'](**msg))

    def _profile_data(self, power):
        """Return profiling results for a container that just finished.

        Takes a fresh sensor reading (also stored in ``self.machine_info``)
        and saves an end snapshot in ``power['profile']['end']``. The result
        is ``sensor_manager.calc_difference(start, end)`` extended with:
        'temp' (a list of each entry's 'pkg' temperature) and 'policy'. When
        a policy is set, it also gets 'damper' (``power['damper']`` divided
        by 1e9, presumably ns -> s; TODO confirm) and 'slowdown'.
        """
        self.machine_info = self.sensor_manager.do_update()
        # Copy so the snapshot does not mutate the live sensor reading.
        end = dict(self.machine_info['energy']['energy'])
        end['time'] = self.machine_info['time']
        power['profile']['end'] = end

        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(
                power['profile']['start'], end)
        # Final package temperature. A list, not a lazy map(), so the result
        # can be logged and serialized on Python 3.
        temp = self.machine_info['temperature']
        diff['temp'] = [temp[k]['pkg'] for k in temp]
        diff['policy'] = power['policy']
        if power['policy']:
            diff['damper'] = float(power['damper']) / 1000000000
            diff['slowdown'] = power['slowdown']
        return diff

    def do_shutdown(self):
        """Stop the sensor manager and the IOLoop (ends :meth:`main`)."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, timers and signals, then run the loop.

        Blocks in ``IOLoop.start()`` until :meth:`do_shutdown` stops it.
        """
        # Interface the upstream TCP servers bind to ('*' = all interfaces)
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        downstream_event_param = "ipc:///tmp/nrm-downstream-event"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        self.downstream_event = DownstreamEventServer(downstream_event_param)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream event socket bound to: %s",
                    downstream_event_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket bound to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_event.setup_recv_callback(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        self.container_manager = ContainerManager(
           self.resource_manager,
           perfwrapper=self.config.argo_perf_wrapper,
           linuxperf=self.config.perf,
           argo_nodeos_config=self.config.argo_nodeos_config,
           pmpi_lib=self.config.pmpi_lib,
           )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([pa])

        self.sensor_manager.start()
        # Initial reading so handlers can use machine_info before the first
        # periodic sensor update.
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner(config):
    """Configure logging, then build and run a :class:`Daemon`.

    Installs the zmq IOLoop and sets the module logger to DEBUG. When
    ``config.nrm_log`` is set, it also adds a file handler writing there.
    It then calls ``Daemon(config).main()``, which blocks until the
    daemon's event loop is stopped.
    """
    ioloop.install()

    logger.setLevel(logging.DEBUG)

    if config.nrm_log:
        print("Logging to %s" % config.nrm_log)
        logger.addHandler(logging.FileHandler(config.nrm_log))

    daemon = Daemon(config)
    daemon.main()