daemon.py 14.5 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager, NodeOSRuntime
5
from controller import Controller, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import logging
9
import os
10
from resources import ResourceManager
11
from sensor import SensorManager
12
import signal
13
from zmq.eventloop import ioloop
14
from nrm.messaging import MSGTYPES
15 16
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
        DownstreamEventServer
17

18 19
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
20

21 22
logger = logging.getLogger('nrm')

23 24

class Daemon(object):
    """Node Resource Manager (NRM) daemon.

    Owns the downstream event socket (application messages), the upstream
    PUB socket (telemetry broadcast) and the upstream RPC socket (client
    commands), and drives the resource, container, application, sensor and
    controller managers from the single IO loop started in :meth:`main`.
    """

    def __init__(self, config):
        """Record the configuration; sockets and managers are built in main().

        :param config: configuration object whose ``hwloc``,
            ``argo_nodeos_config``, ``argo_perf_wrapper``, ``perf`` and
            ``pmpi_lib`` attributes are read by :meth:`main`.
        """
        # Power target published upstream; replaced by 'setpower' requests.
        self.target = 100.0
        self.config = config

    def do_downstream_receive(self, msg, client):
        """Handle one message received on the downstream event socket.

        Handled ``msg.type`` values are 'application_start', 'progress',
        'performance', 'phase_context' and 'application_exit'; any other
        type is logged as an error and ignored.

        :param msg: decoded downstream message.
        :param client: sender identity passed by the socket callback
            (not used here).
        """
        logger.info("receiving downstream message: %r", msg)
        if msg.type == 'application_start':
            cid = msg.container_uuid
            container = self.container_manager.containers[cid]
            self.application_manager.register(msg, container)
        elif msg.type == 'progress':
            # Progress is only recorded and forwarded upstream for
            # registered applications.
            if msg.application_uuid in self.application_manager.applications:
                app = self.application_manager.applications[
                        msg.application_uuid]
                app.update_performance(msg)
                pub = {'api': 'up_pub',
                       'type': 'progress',
                       'payload': msg.payload,
                       'application_uuid': msg.application_uuid}
                self.upstream_pub_server.sendmsg(
                        PUB_MSG['progress'](**pub))
        elif msg.type == 'performance':
            if msg.application_uuid in self.application_manager.applications:
                app = self.application_manager.applications[
                        msg.application_uuid]
                app.update_performance(msg)
            # Unlike 'progress', performance is published (per container)
            # even when the application is not registered.
            pub = {'api': 'up_pub',
                   'type': 'performance',
                   'payload': msg.payload,
                   'container_uuid': msg.container_uuid}
            self.upstream_pub_server.sendmsg(
                    PUB_MSG['performance'](**pub))
        elif msg.type == 'phase_context':
            uuid = msg.application_uuid
            if uuid in self.application_manager.applications:
                app = self.application_manager.applications[uuid]
                if bool(self.container_manager.containers):
                    cid = app.container_uuid
                    c = self.container_manager.containers[cid]
                    # Phase contexts only matter for containers started
                    # with a power policy.
                    if c.power['policy']:
                        app.update_phase_context(msg)
                        # Run container policy
                        self.controller.run_policy_container(c, app)
        elif msg.type == 'application_exit':
            uuid = msg.application_uuid
            if uuid in self.application_manager.applications:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown msg: %r", msg)
            return

    def do_upstream_receive(self, msg, client):
        """Handle one request received on the upstream RPC socket.

        Handled ``msg.type`` values:

        - 'setpower': store a new power target and reply with 'getpower'.
        - 'run': start a command through the container manager, reply with
          'process_start' and stream the process' stdout/stderr to
          *client*.
        - 'kill': kill a container; no reply is sent here, the child exit
          is reported later by :meth:`do_children`.
        - 'list': reply with the container manager's list.

        Any other type is logged as an error.

        :param msg: decoded upstream RPC request.
        :param client: identity of the requesting client, used to address
            the replies.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target)
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            logger.info("asked to run a command in a container: %r", msg)
            params = {'manifest': msg.manifest,
                      'file': msg.path,
                      'args': msg.args,
                      'uuid': msg.container_uuid,
                      'environ': msg.environ,
                      'clientid': client,
                      }
            pid, container = self.container_manager.create(params)
            container_uuid = container.uuid
            # First process of this container: set up its power management
            # and announce the container upstream.
            if len(container.processes) == 1:
                if container.power['policy']:
                    container.power['manager'] = PowerPolicyManager(
                            container.resources.cpus,
                            container.power['policy'],
                            float(container.power['damper']),
                            float(container.power['slowdown']))
                if container.power['profile']:
                    p = container.power['profile']
                    # Copy the energy sample so that adding 'time' does not
                    # mutate self.machine_info, and later sensor updates
                    # cannot alter the stored start point.
                    p['start'] = dict(self.machine_info['energy']['energy'])
                    p['start']['time'] = self.machine_info['time']
                update = {'api': 'up_pub',
                          'type': 'container_start',
                          'container_uuid': container_uuid,
                          'errno': 0 if container else -1,
                          'power': container.power['policy'] or str(None)
                          }
                self.upstream_pub_server.sendmsg(
                        PUB_MSG['container_start'](**update))

            # now deal with the process itself
            update = {'api': 'up_rpc_rep',
                      'type': 'process_start',
                      'container_uuid': container_uuid,
                      'pid': pid,
                      }
            self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_start'](**update), client)
            # setup io callbacks; the same callback handles data and close
            outcb = partial(self.do_children_io, client, container_uuid,
                            'stdout')
            errcb = partial(self.do_children_io, client, container_uuid,
                            'stderr')
            container.processes[pid].stdout.read_until_close(outcb, outcb)
            container.processes[pid].stderr.read_until_close(errcb, errcb)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            self.container_manager.kill(msg.container_uuid)
            # no update here, as it will trigger child exit
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis: *client*,
        *container_uuid* and *io* ('stdout' or 'stderr') are bound with
        ``functools.partial`` in :meth:`do_upstream_receive`. Empty *data*
        is forwarded as the payload 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh sensor readings and publish the total power upstream.

        Run periodically from the IO loop (see :meth:`main`). When the
        power reading is missing or malformed an error is logged and
        nothing is published.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (TypeError, KeyError):
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
        else:
            msg = {'api': 'up_pub',
                   'type': 'power',
                   'total': total_power,
                   'limit': self.target
                   }
            self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
            logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Ask the controller for a plan toward self.target and apply it.

        Run periodically from the IO loop (see :meth:`main`). Per-container
        power policies are driven by 'phase_context' messages in
        :meth:`do_downstream_receive`, not here.
        """
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD work to the IO loop.

        ``add_callback_from_signal`` is used so that the real work runs in
        the IO loop rather than inside the signal handler.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report process/container exits.

        Drains pending child statuses with a non-blocking ``os.wait3``. For
        an exited process tracked by the container manager, a
        'process_exit' reply is sent to the client that started it; when it
        was the container's last process the container is torn down (see
        :meth:`_do_container_exit`).
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
                # With WNOHANG, (0, 0) means no child changed state.
                if pid == 0 and status == 0:
                    break
            except OSError:
                # e.g. ECHILD once there are no children left.
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # Only exits (normal or by signal) are handled.
            if not (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
                continue

            container = self.container_manager.pids[pid]
            clientid = container.clientids[pid]

            # first, send a process_exit
            msg = {'api': 'up_rpc_rep',
                   'type': 'process_exit',
                   'status': str(status),
                   'container_uuid': container.uuid,
                   }
            self.upstream_rpc_server.sendmsg(
                    RPC_MSG['process_exit'](**msg), clientid)
            # Remove the pid of process that is finished
            container.processes.pop(pid, None)
            self.container_manager.pids.pop(pid, None)
            logger.info("Process %s in Container %s has finished.",
                        pid, container.uuid)

            # if this is the last process in the container, kill everything
            if len(container.processes) == 0:
                self._do_container_exit(container)

    def _do_container_exit(self, container):
        """Tear down a container whose last process has exited.

        Resets its power policy manager (if any), collects its energy
        profile (if profiling was requested), deletes it from the container
        manager and publishes 'container_exit'.
        """
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        p = container.power
        if p['policy']:
            p['manager'].reset_all()
        if p['profile']:
            msg['profile_data'] = self._container_profile_data(container)
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(PUB_MSG['container_exit'](**msg))

    def _container_profile_data(self, container):
        """Take a final sensor sample and return the container's profile.

        The sample is stored as the profile's 'end' entry. The returned
        dict is the sensor manager's start/end difference, annotated with
        the package temperatures, the power policy (plus damper and
        slowdown when a policy is set) and the node name.
        """
        p = container.power
        self.machine_info = self.sensor_manager.do_update()
        # Copy so the stored end point is not aliased to self.machine_info.
        end = dict(self.machine_info['energy']['energy'])
        end['time'] = self.machine_info['time']
        p['profile']['end'] = end
        start = p['profile']['start']
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(start, end)
        # Get final package temperature
        diff['temp'] = self._package_temperatures(
                self.machine_info['temperature'])
        diff['policy'] = p['policy']
        if p['policy']:
            # NOTE(review): scaled down by 1e9, presumably ns -> s; confirm.
            diff['damper'] = float(p['damper'])/1000000000
            diff['slowdown'] = p['slowdown']
        diff['nodename'] = self.sensor_manager.nodename
        logger.info("Container %r profile data: %r", container.uuid, diff)
        return diff

    @staticmethod
    def _package_temperatures(temperature):
        """Return the 'pkg' reading of every entry of *temperature*.

        Built as a list, not a lazy ``map``, so the value can be reused and
        serialized on both Python 2 and Python 3.
        """
        return [temperature[k]['pkg'] for k in temperature]

    def do_shutdown(self):
        """Stop the sensor manager and the IO loop (triggered by SIGINT)."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, timers and signals, then run the loop.

        Blocks in ``IOLoop.start()`` until :meth:`do_shutdown` stops the
        loop.
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        downstream_event_param = "ipc:///tmp/nrm-downstream-event"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        self.downstream_event = DownstreamEventServer(downstream_event_param)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream event socket bound to: %s",
                    downstream_event_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket bound to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_event.setup_recv_callback(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        container_runtime = NodeOSRuntime(self.config.argo_nodeos_config)
        self.container_manager = ContainerManager(
                container_runtime,
                self.resource_manager,
                perfwrapper=self.config.argo_perf_wrapper,
                linuxperf=self.config.perf,
                pmpi_lib=self.config.pmpi_lib,
                )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([pa])

        self.sensor_manager.start()
        # Initial sample so machine_info exists before the first timer tick.
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates (every 1000 ms)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner(config):
    """Configure logging and run an NRM daemon until its IO loop stops.

    :param config: configuration object; ``nrm_log`` (optional path)
        enables logging to that file, the rest is consumed by
        :class:`Daemon`.
    """
    # Install pyzmq's IOLoop before the daemon creates or uses a loop.
    ioloop.install()

    logger.setLevel(logging.DEBUG)

    if config.nrm_log:
        print("Logging to %s" % config.nrm_log)
        logger.addHandler(logging.FileHandler(config.nrm_log))

    daemon = Daemon(config)
    daemon.main()