daemon.py 14.4 KB
Newer Older
1 2
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import logging
9
import os
10
from resources import ResourceManager
11
from sensor import SensorManager
12
import signal
13
from zmq.eventloop import ioloop
14
from nrm.messaging import MSGTYPES
15 16
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
        DownstreamEventServer
17

18 19
# Message constructors for the two upstream APIs, indexed by message type
# name (e.g. RPC_MSG['list'], PUB_MSG['power']); each entry is called with
# keyword arguments to build the message object handed to ``sendmsg``.
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']

# Logger shared by this module; ``runner`` sets its level and handlers.
logger = logging.getLogger('nrm')

23 24

class Daemon(object):
    """Node resource manager daemon: glue between sockets, sensors and control.

    ``main`` creates three messaging endpoints and the managers, then runs
    the event loop:

    * a downstream event server receiving application events
      (``do_downstream_receive``),
    * an upstream RPC server receiving commands (``do_upstream_receive``),
    * an upstream PUB server on which power, progress and container
      lifecycle messages are published.

    Two periodic callbacks (``do_sensor`` and ``do_control``) refresh the
    machine state and run the control loop; SIGINT and SIGCHLD are routed
    through ``do_signal``.

    Attributes created by ``main`` (not by ``__init__``):
    ``downstream_event``, ``upstream_pub_server``, ``upstream_rpc_server``,
    ``resource_manager``, ``container_manager``, ``application_manager``,
    ``sensor_manager``, ``controller``, ``machine_info``, ``sensor_cb`` and
    ``control``.
    """

    def __init__(self, config):
        """Store the configuration and the initial power target.

        Args:
            config: configuration object. ``main`` reads its ``hwloc``,
                ``argo_perf_wrapper``, ``perf``, ``argo_nodeos_config`` and
                ``pmpi_lib`` attributes.
        """
        # Power target reported upstream and handed to the controller;
        # replaced by 'setpower' commands.
        self.target = 100.0
        self.config = config

    def _find_application(self, uuid):
        """Return the registered application for ``uuid``, or None."""
        applications = self.application_manager.applications
        if uuid in applications:
            return applications[uuid]
        return None

    def do_downstream_receive(self, msg, client):
        """Handle one event received on the downstream (application) socket.

        Args:
            msg: message object; ``msg.type`` selects the handling and the
                other attributes read depend on that type.
            client: identity of the sender (unused here).

        Events referring to an unknown application or container are
        ignored (with an error log for 'application_start') instead of
        raising ``KeyError`` out of the socket callback.
        """
        logger.info("receiving downstream message: %r", msg)
        if msg.type == 'application_start':
            cid = msg.container_uuid
            containers = self.container_manager.containers
            if cid in containers:
                self.application_manager.register(msg, containers[cid])
            else:
                logger.error("application_start for unknown container: %r",
                             cid)
        elif msg.type == 'progress':
            app = self._find_application(msg.application_uuid)
            if app is not None:
                app.update_performance(msg)
                pub = {'api': 'up_pub',
                       'type': 'progress',
                       'payload': msg.payload,
                       'application_uuid': msg.application_uuid}
                self.upstream_pub_server.sendmsg(
                        PUB_MSG['progress'](**pub))
        elif msg.type == 'performance':
            app = self._find_application(msg.application_uuid)
            if app is not None:
                app.update_performance(msg)
            # Unlike 'progress', this is published even when the application
            # is not registered: the message is keyed by container.
            pub = {'api': 'up_pub',
                   'type': 'performance',
                   'payload': msg.payload,
                   'container_uuid': msg.container_uuid}
            self.upstream_pub_server.sendmsg(
                    PUB_MSG['performance'](**pub))
        elif msg.type == 'phase_context':
            app = self._find_application(msg.application_uuid)
            if app is not None:
                containers = self.container_manager.containers
                cid = app.container_uuid
                # Check for this specific container rather than for "any
                # container", so a stale uuid cannot raise KeyError.
                if cid in containers:
                    container = containers[cid]
                    if container.power['policy']:
                        app.update_phase_context(msg)
                        # Run container policy
                        self.controller.run_policy_container(container, app)
        elif msg.type == 'application_exit':
            uuid = msg.application_uuid
            if uuid in self.application_manager.applications:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown msg: %r", msg)

    def do_upstream_receive(self, msg, client):
        """Handle one command received on the upstream RPC socket.

        Args:
            msg: command object; ``msg.type`` is one of 'setpower', 'run',
                'kill' or 'list'. Anything else is logged as invalid.
            client: identity of the requester, used to address replies.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target)
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            self._run_process(msg, client)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg.container_uuid)
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            response = self.container_manager.list()
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': response,
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def _run_process(self, msg, client):
        """Serve a 'run' command: start a process and wire up its output."""
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        # Use the uuid of the container actually returned by the manager.
        container_uuid = container.uuid

        # A single process means the container has just been created.
        if len(container.processes) == 1:
            self._announce_container_start(container)

        # now deal with the process itself
        update = {'api': 'up_rpc_rep',
                  'type': 'process_start',
                  'container_uuid': container_uuid,
                  'pid': pid,
                  }
        self.upstream_rpc_server.sendmsg(
            RPC_MSG['process_start'](**update), client)

        # setup io callbacks
        outcb = partial(self.do_children_io, client, container_uuid,
                        'stdout')
        errcb = partial(self.do_children_io, client, container_uuid,
                        'stderr')
        # The same callable is passed for both arguments of
        # read_until_close; do_children_io reports empty data as 'eof'.
        process = container.processes[pid]
        process.stdout.read_until_close(outcb, outcb)
        process.stderr.read_until_close(errcb, errcb)

    def _announce_container_start(self, container):
        """Set up power policy/profiling for a new container and publish it."""
        power = container.power
        if power['policy']:
            power['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    power['policy'],
                    float(power['damper']),
                    float(power['slowdown']))
        if power['profile']:
            profile = power['profile']
            # NOTE(review): 'start' aliases the energy dict of the current
            # sensor sample, so adding 'time' also modifies
            # self.machine_info in place - confirm this is intended.
            profile['start'] = self.machine_info['energy']['energy']
            profile['start']['time'] = self.machine_info['time']
        update = {'api': 'up_pub',
                  'type': 'container_start',
                  'container_uuid': container.uuid,
                  'errno': 0 if container else -1,
                  'power': power['policy'] or str(None)
                  }
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_start'](**update))

    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially defined on a children basis.

        Args:
            client: identity of the client that started the process.
            container_uuid: uuid of the container the process belongs to.
            io: stream name, used as the reply message type
                ('stdout' or 'stderr' as bound in the 'run' handling).
            data: data read from the stream; empty data is sent as 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh ``machine_info`` and publish the total power upstream.

        If the sample does not have the expected nested structure, an error
        is logged and nothing is published.
        """
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (TypeError, KeyError):
            # TypeError: a level is not subscriptable (e.g. None);
            # KeyError: a level is a mapping lacking the expected key.
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
        else:
            msg = {'api': 'up_pub',
                   'type': 'power',
                   'total': total_power,
                   'limit': self.target
                   }
            self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
            logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control iteration against the current target.

        Asks the controller for an ``(action, actuator)`` plan and, when the
        action is truthy, executes it and lets the controller record it.
        """
        plan = self.controller.planify(self.target, self.machine_info)
        action, actuator = plan
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # NOTE: the per-container policy is not run from this loop; it is
        # run from the 'phase_context' handling in do_downstream_receive.

    def do_signal(self, signum, frame):
        """Signal handler: defer the real work to the event loop.

        Args:
            signum: received signal number; SIGINT schedules
                ``do_shutdown`` and SIGCHLD schedules ``do_children``.
            frame: current stack frame (unused).
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap terminated children and report exits upstream.

        Polls ``os.wait3`` without blocking until it reports no further
        status change or raises ``OSError``. Children whose pid is not
        tracked by the container manager are ignored.
        """
        # find out if children have terminated
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                break
            if pid == 0 and status == 0:
                break

            logger.info("child update %d: %r", pid, status)
            # check if its a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # check if this is an exit
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._handle_process_exit(pid, status)

    def _handle_process_exit(self, pid, status):
        """Report the exit of a tracked process and forget its pid."""
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        msg = {'api': 'up_rpc_rep',
               'type': 'process_exit',
               'status': str(status),
               'container_uuid': container.uuid,
               }
        self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_exit'](**msg), clientid)
        # Remove the pid of process that is finished
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finised.",
                    pid, container.uuid)

        # if this is the last process in the container,
        # kill everything
        if len(container.processes) == 0:
            self._handle_container_exit(container)

    def _handle_container_exit(self, container):
        """Tear down an empty container and publish 'container_exit'."""
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            msg['profile_data'] = self._collect_profile_data(container)
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(
                PUB_MSG['container_exit'](**msg))

    def _collect_profile_data(self, container):
        """Return the profile of ``container`` from its start to now."""
        power = container.power
        # Take a fresh sample to serve as the end point of the profile.
        self.machine_info = self.sensor_manager.do_update()
        end = self.machine_info['energy']['energy']
        end['time'] = self.machine_info['time']
        start = power['profile']['start']
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(start, end)
        # Get final package temperature. Built as a real list so that it
        # logs and serializes the same way on Python 2 and Python 3.
        temp = self.machine_info['temperature']
        diff['temp'] = [temp[key]['pkg'] for key in temp]
        diff['policy'] = power['policy']
        if power['policy']:
            # presumably converts nanoseconds to seconds - TODO confirm
            diff['damper'] = float(power['damper'])/1000000000
            diff['slowdown'] = power['slowdown']
        diff['nodename'] = self.sensor_manager.nodename
        logger.info("Container %r profile data: %r",
                    container.uuid, diff)
        return diff

    def do_shutdown(self):
        """Stop the sensor manager, then stop the event loop."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Create sockets and managers, install handlers, run the event loop.

        Does not return until the event loop is stopped (see
        ``do_shutdown``).
        """
        # Bind address for downstream clients
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        downstream_event_param = "ipc:///tmp/nrm-downstream-event"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        self.downstream_event = DownstreamEventServer(downstream_event_param)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream event socket bound to: %s",
                    downstream_event_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)

        # register socket triggers
        self.downstream_event.setup_recv_callback(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        self.container_manager = ContainerManager(
           self.resource_manager,
           perfwrapper=self.config.argo_perf_wrapper,
           linuxperf=self.config.perf,
           argo_nodeos_config=self.config.argo_nodeos_config,
           pmpi_lib=self.config.pmpi_lib,
           )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([pa])

        # Take an initial sample so machine_info exists before the first
        # periodic callback fires.
        self.sensor_manager.start()
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        # setup the periodic control loop
        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


328
def runner(config):
    """Install the zmq event loop, configure logging and run the daemon.

    Args:
        config: configuration object; ``config.nrm_log``, when truthy, is
            the path of a file that receives the log records. The object
            is also handed to ``Daemon``.
    """
    ioloop.install()

    logger.setLevel(logging.DEBUG)

    log_path = config.nrm_log
    if log_path:
        print("Logging to %s" % log_path)
        logger.addHandler(logging.FileHandler(log_path))

    Daemon(config).main()