###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

from __future__ import print_function

import logging

logger = logging.getLogger('nrm')


class Action(object):

    """Information about a control action.

    Attributes:
        target: what the action applies to (PowerActuator uses a key of the
            sensor manager's power-limit mapping).
        command: the value to apply (PowerActuator passes it to
            ``set_powerlimit`` as the new limit).
        delta: ranking cost; Controller.planify picks the candidate with the
            smallest delta.
    """

    def __init__(self, target, command, delta):
        self.target = target
        self.command = command
        self.delta = delta

    def __repr__(self):
        # Readable form for log messages and debugging sessions.
        return "%s(target=%r, command=%r, delta=%r)" % (
            type(self).__name__, self.target, self.command, self.delta)


# NOTE(review): ApplicationActuator is disabled. If it is re-enabled, fix
# execute(): it builds `update` but never passes it to send_json().
# class ApplicationActuator(object):
#
#    """Actuator in charge of application thread control."""
#
#    def __init__(self, am, pubstream):
#        self.application_manager = am
#        self.pubstream = pubstream
#
#    def available_actions(self, target):
#        ret = []
#        for identity, application in \
#                self.application_manager.applications.iteritems():
#            if target in application.get_allowed_thread_requests():
#                delta = application.get_thread_request_impact(target)
#                ret.append(Action(application, target, delta))
#        return ret
#
#    def execute(self, action):
#        target_threads = action.target.threads
#        update = {'type': 'application',
#                  'command': 'threads',
#                  'uuid': action.target.uuid,
#                  'event': 'threads',
#                  }
#        if action.command == 'i':
#            payload = target_threads['cur'] + 1
#        elif action.command == 'd':
#            payload = target_threads['cur'] - 1
#        else:
#            assert False, "impossible command"
#        update['payload'] = payload
#        self.pubstream.send_json()
#
#    def update(self, action):
#        action.target.do_thread_transition(action.command)


class PowerActuator(object):

    """Actuator in charge of power control."""

    def __init__(self, sm):
        # sm must provide get_powerlimits() and set_powerlimit(target, value).
        self.sensor_manager = sm

    def available_actions(self, target):
        """Return the power-limit changes available in one direction.

        :param target: 'i' to raise limits, 'd' to lower them; any other
            value yields no actions.
        :return: a list of ``Action(k, new_limit, delta)``. ``k`` is a key
            of the mapping returned by ``get_powerlimits()``; each entry must
            carry 'curW' and 'maxW' (presumably watts). ``delta`` is the
            distance of ``new_limit`` from the nearest candidate, so the
            smallest possible step has delta 0.
        """
        actions = []
        pl = self.sensor_manager.get_powerlimits()
        logger.info("power limits: %r", pl)
        if target == 'i':
            for k in pl:
                cur = int(pl[k]['curW'])
                # range() excludes its stop value: +1 so that maxW itself is
                # a reachable limit.
                steps = range(cur + 1, int(pl[k]['maxW']) + 1)
                actions.extend(Action(k, s, s - (cur + 1)) for s in steps)
        elif target == 'd':
            for k in pl:
                cur = int(pl[k]['curW'])
                # Candidates go from curW - 1 down to 1 (never 0).
                steps = range(1, cur)
                actions.extend(Action(k, s, (cur - 1) - s) for s in steps)
        return actions

    def execute(self, action):
        """Apply ``action`` by setting the new power limit on its target."""
        logger.info("changing power limit: %r, %r", action.command,
                    action.delta)
        self.sensor_manager.set_powerlimit(action.target, action.command)

    def update(self, action):
        """No bookkeeping is needed after a power-limit change."""
        pass

class Controller(object):

    """Implements a control loop for resource management."""

    def __init__(self, actuators):
        # Objects exposing available_actions(direction), execute(action)
        # and update(action).
        self.actuators = actuators

    def planify(self, target, machineinfo):
        """Plan the next action for the control loop.

        Compares the total measured power with ``target`` and asks every
        actuator for actions that move power in the needed direction
        ('i' to increase, 'd' to decrease).

        :param target: desired total power.
        :param machineinfo: mapping holding
            ``machineinfo['energy']['power']['total']``.
        :return: an ``(action, actuator)`` pair, or ``(None, None)`` when
            machineinfo is malformed, power already equals the target, or no
            actuator offers an action.
        """
        try:
            total_power = machineinfo['energy']['power']['total']
        except (TypeError, KeyError):
            logger.error("\"machineinfo\" malformed. Can not run "
                         "control loop.")
            return (None, None)

        if total_power < target:
            direction = 'i'
        elif total_power > target:
            direction = 'd'
        else:
            # Already on target. Use the same (None, None) sentinel as the
            # other no-op paths so callers can always unpack the result.
            return (None, None)

        actions = [(a, act)
                   for act in self.actuators
                   for a in act.available_actions(direction)]
        if not actions:
            return (None, None)
        # TODO: better choice. Pick the smallest delta; min() returns the
        # first of several equal-delta candidates.
        return min(actions, key=lambda pair: pair[0].delta)

    def execute(self, action, actuator):
        """Build the action for the appropriate manager."""
        actuator.execute(action)

    def update(self, action, actuator):
        """Update tracking across the board to reflect the last action."""
        actuator.update(action)

    def run_policy_container(self, container, application):
        """Run policies on a container.

        The container's power manager runs its policy only when every CPU
        of the container has its phase context 'set' and all those contexts
        report the same 'aggregation'. When every context is set but the
        aggregations differ, the manager is reset (``reset_all()``) and
        every 'set' flag is cleared. A container with no CPUs is skipped.
        """
        ids = container.resources.cpus
        if not ids:
            return
        pcs = application.phase_contexts
        # Run policy only if all phase contexts have been received.
        # all()/any() are used because filter() is lazy and always truthy
        # on Python 3, so it cannot serve as an emptiness test.
        if not all(pcs[i]['set'] for i in ids):
            return
        # Only run policy if all phase contexts are an
        # aggregation of same number of phases.
        aggs = [pcs[i]['aggregation'] for i in ids]
        if aggs.count(aggs[0]) == len(aggs):
            container.power['manager'].run_policy(pcs)
            if any(pcs[i]['set'] for i in ids):
                logger.debug("Phase context not reset %r", application)
        else:
            container.power['manager'].reset_all()
            for i in ids:
                pcs[i]['set'] = False

    def run_policy(self, containers):
        """Run policies on containers with policies set.

        :param containers: mapping of container uuid -> container; only
            containers whose ``power['policy']`` is truthy are processed, and
            only when an application with a matching ``container_uuid``
            exists.
        """
        for uuid, container in containers.items():
            if not container.power['policy']:
                continue
            # NOTE(review): assumes self.actuators[0] exposes an
            # application_manager; PowerActuator in this module does not.
            # Confirm against the actuator list passed to the constructor.
            apps = self.actuators[0].application_manager.applications
            app = next((a for a in apps.values()
                        if a.container_uuid == uuid), None)
            if app is not None:
                self.run_policy_container(container, app)