# daemon.py
from __future__ import print_function

3
from applications import ApplicationManager
4
from containers import ContainerManager
5
from controller import Controller, PowerActuator
6
from powerpolicy import PowerPolicyManager
7
from functools import partial
8
import logging
9
import os
10
from resources import ResourceManager
11
from sensor import SensorManager
12
import signal
13
from zmq.eventloop import ioloop
14
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
        DownstreamEventServer
17

# Message constructors for the upstream API, keyed by message type:
# RPC_MSG builds replies sent on the RPC socket, PUB_MSG builds events
# published on the PUB socket (e.g. RPC_MSG['getpower'](**fields)).
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']

logger = logging.getLogger('nrm')


class Daemon(object):
    """NRM daemon.

    Bridges three message channels: an upstream RPC socket (client
    requests and replies), an upstream PUB socket (events broadcast to
    clients) and a downstream event socket (reports from running
    applications). Periodic callbacks sample the sensors and drive the
    power controller; SIGCHLD triggers reaping of container processes.
    """

    def __init__(self, config):
        """Keep the configuration; sockets and managers are built by main().

        :param config: configuration object; main() reads its ``hwloc``,
            ``argo_perf_wrapper``, ``perf``, ``argo_nodeos_config`` and
            ``pmpi_lib`` attributes.
        """
        # Power target: handed to the controller each control step and
        # reported upstream; changed by the 'setpower' command.
        self.target = 100.0
        self.config = config

    def do_downstream_receive(self, msg, client):
        """Handle one message from an application on the downstream socket.

        Handled types: 'application_start', 'progress', 'performance',
        'phase_context' and 'application_exit'; anything else is logged as
        an error. Progress and performance reports are republished on the
        upstream PUB socket.

        :param msg: decoded downstream message; ``msg.type`` selects the
            handler.
        :param client: identity of the sender (not used here).
        """
        logger.info("receiving downstream message: %r", msg)
        if msg.type == 'application_start':
            cid = msg.container_uuid
            containers = self.container_manager.containers
            if cid not in containers:
                logger.error("application_start for unknown container: %r",
                             cid)
                return
            self.application_manager.register(msg, containers[cid])
        elif msg.type == 'progress':
            self._update_application_performance(msg)
            pub = {'api': 'up_pub',
                   'type': 'progress',
                   'payload': msg.payload,
                   'application_uuid': msg.application_uuid}
            self.upstream_pub_server.sendmsg(PUB_MSG['progress'](**pub))
        elif msg.type == 'performance':
            self._update_application_performance(msg)
            pub = {'api': 'up_pub',
                   'type': 'performance',
                   'payload': msg.payload,
                   'container_uuid': msg.container_uuid}
            self.upstream_pub_server.sendmsg(PUB_MSG['performance'](**pub))
        elif msg.type == 'phase_context':
            uuid = msg.application_uuid
            if uuid in self.application_manager.applications:
                app = self.application_manager.applications[uuid]
                containers = self.container_manager.containers
                # do_children deletes a container when its last process
                # exits, so the application's container may be gone already.
                if app.cid not in containers:
                    logger.error("phase_context for application %r in "
                                 "unknown container: %r", uuid, app.cid)
                elif containers[app.cid].power['policy']:
                    app.update_phase_context(msg)
        elif msg.type == 'application_exit':
            uuid = msg.application_uuid
            if uuid in self.application_manager.applications:
                self.application_manager.delete(uuid)
        else:
            logger.error("unknown msg: %r", msg)

    def _update_application_performance(self, msg):
        """Pass a progress/performance report to its application, if known."""
        apps = self.application_manager.applications
        if msg.application_uuid in apps:
            apps[msg.application_uuid].update_performance(msg)

    def do_upstream_receive(self, msg, client):
        """Handle one request from an upstream client on the RPC socket.

        Handled types:

        - 'setpower': set the power target, reply with 'getpower';
        - 'run': launch a process in a container (see _run_process);
        - 'kill': kill a container; no reply, the resulting child exits
          are reported by do_children;
        - 'list': reply with the container list.

        :param msg: decoded upstream request; ``msg.type`` selects the
            handler.
        :param client: identity of the requesting client, used to address
            replies.
        """
        if msg.type == 'setpower':
            self.target = float(msg.limit)
            logger.info("new target measure: %g", self.target)
            update = {'api': 'up_rpc_rep',
                      'type': 'getpower',
                      'limit': str(self.target),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                             client)
        elif msg.type == 'run':
            self._run_process(msg, client)
        elif msg.type == 'kill':
            logger.info("asked to kill container: %r", msg)
            # no update here, as it will trigger child exit
            self.container_manager.kill(msg.container_uuid)
        elif msg.type == 'list':
            logger.info("asked for container list: %r", msg)
            update = {'api': 'up_rpc_rep',
                      'type': 'list',
                      'payload': self.container_manager.list(),
                      }
            self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
                                             client)
        else:
            logger.error("invalid command: %r", msg.type)

    def _run_process(self, msg, client):
        """Launch the process described by a 'run' request.

        The first process of a container also triggers _start_container.
        The client then receives a 'process_start' reply, followed by the
        process's stdout/stderr through do_children_io.
        """
        params = {'manifest': msg.manifest,
                  'file': msg.path,
                  'args': msg.args,
                  'uuid': msg.container_uuid,
                  'environ': msg.environ,
                  'clientid': client,
                  }
        pid, container = self.container_manager.create(params)
        container_uuid = container.uuid

        if len(container.processes) == 1:
            self._start_container(container)

        # now deal with the process itself
        update = {'api': 'up_rpc_rep',
                  'type': 'process_start',
                  'container_uuid': container_uuid,
                  'pid': pid,
                  }
        self.upstream_rpc_server.sendmsg(
            RPC_MSG['process_start'](**update), client)
        # Forward the child's output to the client. The same partial is
        # passed as both callbacks of read_until_close (presumably the
        # final and streaming callbacks of the IOStream API).
        outcb = partial(self.do_children_io, client, container_uuid, 'stdout')
        errcb = partial(self.do_children_io, client, container_uuid, 'stderr')
        container.processes[pid].stdout.read_until_close(outcb, outcb)
        container.processes[pid].stderr.read_until_close(errcb, errcb)

    def _start_container(self, container):
        """Set up power management for a new container and announce it."""
        power = container.power
        if power['policy']:
            power['manager'] = PowerPolicyManager(
                    container.resources['cpus'],
                    power['policy'],
                    float(power['damper']),
                    float(power['slowdown']))
        if power['profile']:
            # Snapshot the energy counters so the container's consumption
            # can be computed when it exits. Copy the reading so adding
            # 'time' does not alter the sensor data in self.machine_info.
            start = dict(self.machine_info['energy']['energy'])
            start['time'] = self.machine_info['time']
            power['profile']['start'] = start

        update = {'api': 'up_pub',
                  'type': 'container_start',
                  'container_uuid': container.uuid,
                  # NOTE(review): container was already dereferenced, so
                  # this is effectively always 0 here.
                  'errno': 0 if container else -1,
                  'power': power['policy'] or dict(),
                  }
        self.upstream_pub_server.sendmsg(PUB_MSG['container_start'](**update))

    def do_children_io(self, client, container_uuid, io, data):
        """Receive data from one of the children, and send it down the pipe.

        Meant to be partially applied per child (functools.partial) with
        ``client``, ``container_uuid`` and ``io`` bound.

        :param client: client that launched the process; receives the data.
        :param container_uuid: container the process belongs to.
        :param io: 'stdout' or 'stderr'; also used as the message type.
        :param data: output chunk; a falsy chunk is sent as 'eof'.
        """
        logger.info("%r received %r data: %r", container_uuid, io, data)
        update = {'api': 'up_rpc_rep',
                  'type': io,
                  'container_uuid': container_uuid,
                  'payload': data or 'eof',
                  }
        self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)

    def do_sensor(self):
        """Refresh self.machine_info and publish the total power upstream."""
        self.machine_info = self.sensor_manager.do_update()
        logger.info("current state: %r", self.machine_info)
        try:
            total_power = self.machine_info['energy']['power']['total']
        except (KeyError, TypeError):
            # missing keys or non-dict entries: report instead of crashing
            # the periodic callback
            logger.error("power sensor format malformed, "
                         "can not report power upstream.")
        else:
            msg = {'api': 'up_pub',
                   'type': 'power',
                   'total': total_power,
                   'limit': self.target
                   }
            self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
            logger.info("sending sensor message: %r", msg)

    def do_control(self):
        """Run one control step: plan and apply an action, then policies."""
        action, actuator = self.controller.planify(self.target,
                                                   self.machine_info)
        if action:
            self.controller.execute(action, actuator)
            self.controller.update(action, actuator)
        # Call policy only if there are containers
        if self.container_manager.containers:
            self.controller.run_policy(self.container_manager.containers)

    def do_signal(self, signum, frame):
        """Signal handler: defer SIGINT/SIGCHLD handling to the IOLoop.

        The real work (do_shutdown / do_children) is scheduled with
        add_callback_from_signal so it runs on the loop, not inside the
        signal handler.
        """
        if signum == signal.SIGINT:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
        elif signum == signal.SIGCHLD:
            ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
        else:
            logger.error("wrong signal: %d", signum)

    def do_children(self):
        """Reap every child that changed state and report container exits.

        Loops on non-blocking wait3 until no child is pending. Exits (normal
        or by signal) of processes tracked in container_manager.pids are
        handled by _process_exit; other children are ignored.
        """
        while True:
            try:
                pid, status, _ = os.wait3(os.WNOHANG)
            except OSError:
                # e.g. ECHILD: no children left to wait for
                break
            if pid == 0 and status == 0:
                # no child has changed state
                break

            logger.info("child update %d: %r", pid, status)
            # check if it's a pid we care about
            if pid not in self.container_manager.pids:
                logger.debug("child update ignored")
                continue
            # only exits matter; other state changes are ignored
            if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                self._process_exit(pid, status)

    def _process_exit(self, pid, status):
        """Report the exit of container process `pid` and forget it.

        Sends 'process_exit' to the client that launched the process; if it
        was the container's last process, the container is torn down too.
        """
        container = self.container_manager.pids[pid]
        clientid = container.clientids[pid]

        # first, send a process_exit
        msg = {'api': 'up_rpc_rep',
               'type': 'process_exit',
               'status': str(status),
               'container_uuid': container.uuid,
               }
        self.upstream_rpc_server.sendmsg(
                RPC_MSG['process_exit'](**msg), clientid)
        # Remove the pid of process that is finished
        container.processes.pop(pid, None)
        self.container_manager.pids.pop(pid, None)
        logger.info("Process %s in Container %s has finished.",
                    pid, container.uuid)

        # if this is the last process in the container, kill everything
        if not container.processes:
            self._container_exit(container)

    def _container_exit(self, container):
        """Delete a container whose last process exited and publish it."""
        msg = {'api': 'up_pub',
               'type': 'container_exit',
               'container_uuid': container.uuid,
               'profile_data': dict(),
               }
        power = container.power
        if power['policy']:
            power['manager'].reset_all()
        if power['profile']:
            msg['profile_data'] = self._profile_data(container)
        self.container_manager.delete(container.uuid)
        self.upstream_pub_server.sendmsg(PUB_MSG['container_exit'](**msg))

    def _profile_data(self, container):
        """Return the container's energy/time deltas and final temperatures.

        Takes a fresh sensor reading and diffs it, via
        sensor_manager.calc_difference, against the start snapshot stored
        by _start_container.
        """
        self.machine_info = self.sensor_manager.do_update()
        # Copy so the 'time' key does not leak into self.machine_info,
        # which do_control hands to the controller.
        end = dict(self.machine_info['energy']['energy'])
        end['time'] = self.machine_info['time']
        start = container.power['profile']['start']
        # Calculate difference between the values
        diff = self.sensor_manager.calc_difference(start, end)
        # Final package temperature of each entry; a list rather than a
        # lazy map object so it can be sent on Python 3 as well.
        temp = self.machine_info['temperature']
        diff['temp'] = [temp[k]['pkg'] for k in temp]
        logger.info("Container %r profile data: %r", container.uuid, diff)
        return diff

    def do_shutdown(self):
        """Stop the sensor manager and the IOLoop, which ends main()."""
        self.sensor_manager.stop()
        ioloop.IOLoop.current().stop()

    def main(self):
        """Set up sockets, managers, callbacks and signals, then run the loop.

        Blocks in the IOLoop until do_shutdown() stops it.
        """
        # Bind address for the upstream TCP sockets ('*': all interfaces)
        bind_address = '*'

        # port for upstream PUB API
        upstream_pub_port = 2345
        # port for upstream RPC API
        upstream_rpc_port = 3456

        # setup application listening socket
        downstream_event_param = "ipc:///tmp/nrm-downstream-event"
        upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
        upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)

        self.downstream_event = DownstreamEventServer(downstream_event_param)
        self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
        self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)

        logger.info("downstream event socket bound to: %s",
                    downstream_event_param)
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream rpc socket bound to: %s", upstream_rpc_param)

        # register socket triggers (they only fire once the loop runs)
        self.downstream_event.setup_recv_callback(self.do_downstream_receive)
        self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)

        # create managers
        self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
        self.container_manager = ContainerManager(
           self.resource_manager,
           perfwrapper=self.config.argo_perf_wrapper,
           linuxperf=self.config.perf,
           argo_nodeos_config=self.config.argo_nodeos_config,
           pmpi_lib=self.config.pmpi_lib,
           )
        self.application_manager = ApplicationManager()
        self.sensor_manager = SensorManager()
        pa = PowerActuator(self.sensor_manager)
        self.controller = Controller([pa])

        self.sensor_manager.start()
        # initial reading, so machine_info exists before the first tick
        self.machine_info = self.sensor_manager.do_update()

        # setup periodic sensor updates (period in milliseconds)
        self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
        self.sensor_cb.start()

        self.control = ioloop.PeriodicCallback(self.do_control, 1000)
        self.control.start()

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)
        signal.signal(signal.SIGCHLD, self.do_signal)

        ioloop.IOLoop.current().start()


def runner(config):
    """Entry point: configure logging, then run a Daemon until it stops.

    Installs the pyzmq IOLoop and sets the 'nrm' logger to DEBUG. When
    ``config.nrm_log`` is set, log records also go to that file.

    :param config: configuration object handed to Daemon; ``nrm_log`` is
        an optional log-file path.
    """
    ioloop.install()

    logger.setLevel(logging.DEBUG)

    log_path = config.nrm_log
    if log_path:
        print("Logging to %s" % log_path)
        logger.addHandler(logging.FileHandler(log_path))

    Daemon(config).main()