# controller.py
from __future__ import print_function

import logging

# Module-level logger used by the classes in this file; it is registered
# under the name 'nrm'.
logger = logging.getLogger('nrm')


class Action(object):

    """Information about a control action.

    The three constructor arguments are stored unchanged as attributes:

    target  -- what the action applies to.  In this file that is an
               application object (ApplicationActuator) or a power-limit
               key (PowerActuator).
    command -- what to do to the target.  In this file that is the request
               code ('i' or 'd') for application actions, or the candidate
               power-limit value for power actions.
    delta   -- ranking value; Controller.planify selects the action with
               the smallest delta.
    """

    def __init__(self, target, command, delta):
        self.target = target
        self.command = command
        self.delta = delta


class ApplicationActuator(object):

    """Actuator in charge of application thread control."""

    def __init__(self, am, pubstream):
        """Keep references to the collaborators used by this actuator.

        am        -- object exposing an `applications` mapping whose values
                     are application objects.
        pubstream -- object exposing `send_json(message)`.
        """
        self.application_manager = am
        self.pubstream = pubstream

    def available_actions(self, target):
        """Return one Action per application that allows request `target`.

        target -- thread request code (execute() understands 'i' and 'd').

        For each application whose get_allowed_thread_requests() contains
        `target`, an Action(application, target, delta) is built, where
        delta is that application's get_thread_request_impact(target).
        Returns a (possibly empty) list.
        """
        # .values() works on both Python 2 and 3 (iteritems() is Python 2
        # only) and the mapping keys were never used here.
        return [
            Action(application, target,
                   application.get_thread_request_impact(target))
            for application in self.application_manager.applications.values()
            if target in application.get_allowed_thread_requests()
        ]

    def execute(self, action):
        """Publish the thread-count change requested by `action`.

        The new count is the target's current thread count
        (`action.target.threads['cur']`) plus one for command 'i' or minus
        one for command 'd'.  It is sent through the pubstream as the
        'payload' of a 'threads' message.

        Raises AssertionError for any other command; nothing is sent in
        that case.
        """
        threads = action.target.threads
        if action.command == 'i':
            payload = threads['cur'] + 1
        elif action.command == 'd':
            payload = threads['cur'] - 1
        else:
            # Raised explicitly rather than via `assert`: assert statements
            # are removed under `python -O`, which would leave `payload`
            # unbound instead of reporting the bad command.
            raise AssertionError("impossible command")
        self.pubstream.send_json({
            'type': 'application',
            'command': 'threads',
            'uuid': action.target.uuid,
            'event': 'threads',
            'payload': payload,
        })

    def update(self, action):
        """Record the executed action on its target application."""
        action.target.do_thread_transition(action.command)


class PowerActuator(object):

    """Actuator in charge of power control."""

    def __init__(self, sm):
        """Keep a reference to the sensor manager.

        sm -- object exposing get_powerlimits() and set_powerlimit().
        """
        self.sensor_manager = sm

    def available_actions(self, target):
        """List the power-limit changes possible in direction `target`.

        target -- 'i' to raise limits, 'd' to lower them; any other value
                  yields an empty list.

        The limits come from sensor_manager.get_powerlimits(); each entry
        is read through its 'curW' and 'maxW' items, truncated with int().
        One Action(key, value, delta) is produced per candidate value:

        'i' -- values int(curW)+1 .. int(maxW)-1
        'd' -- values 1 .. int(curW)-1

        delta is the distance from the candidate closest to the current
        limit, so the smallest change has delta 0.
        """
        actions = []
        pl = self.sensor_manager.get_powerlimits()
        logger.info("power limits: %r:", pl)
        if target == 'i':
            for k in pl:
                limits = pl[k]
                # Smallest value above the current limit; deltas are
                # measured from here.
                lowest = int(limits['curW']) + 1
                # NOTE(review): range() excludes its stop value, so
                # int(maxW) itself is never offered -- confirm intended.
                actions.extend(Action(k, s, s - lowest)
                               for s in range(lowest, int(limits['maxW'])))
        elif target == 'd':
            for k in pl:
                # Largest value below the current limit; deltas are
                # measured from here.
                highest = int(pl[k]['curW']) - 1
                actions.extend(Action(k, s, highest - s)
                               for s in range(1, highest + 1))
        return actions

    def execute(self, action):
        """Apply `action`: set limit `action.target` to `action.command`."""
        logger.info("changing power limit. command: %r, delta: %r, target: %r",
                    action.command, action.delta, action.target)
        # sensor_manager is a SensorManager, which is not only about sensing
        # but also about setting power limits.
        self.sensor_manager.set_powerlimit(action.target, action.command)

    def update(self, action):
        """Do nothing; this actuator keeps no tracking state in this class."""
        pass

class Controller(object):

    """Implements a control loop for resource management."""

    def __init__(self, actuators):
        """Store the sequence of actuators the controller may use.

        run_policy() reads `application_manager` from the first actuator.
        """
        self.actuators = actuators

    def planify(self, target, machineinfo):
        """Plan the next action for the control loop.

        target      -- power value to reach.
        machineinfo -- mapping; the measured power is read from
                       machineinfo['energy']['power']['total'].

        When the measured power is below the target every actuator is asked
        for its 'i' actions, when above for its 'd' actions.  The candidate
        with the smallest `delta` wins (the first one on ties).

        Returns an (action, actuator) pair.  The pair is (None, None) when
        the power already equals the target or no actuator offers an
        action, so callers can always unpack the result.
        """
        total_power = machineinfo['energy']['power']['total']
        if total_power < target:
            direction = 'i'
        elif total_power > target:
            direction = 'd'
        else:
            # Nothing to do.  Returning a pair here (instead of falling off
            # the end and returning a bare None) keeps the return shape
            # consistent with the "no actions" case below.
            return (None, None)

        actions = []
        for act in self.actuators:
            actions.extend((a, act) for a in act.available_actions(direction))
        if not actions:
            return (None, None)
        # TODO: better choice
        return min(actions, key=lambda x: x[0].delta)

    def execute(self, action, actuator):
        """Build the action for the appropriate manager."""
        actuator.execute(action)

    def update(self, action, actuator):
        """Update tracking across the board to reflect the last action."""
        actuator.update(action)

    def run_policy(self, containers):
        """Run policies on containers with policies set.

        containers -- mapping of container id to container object.  Each
                      container is read through `.power` (items 'policy'
                      and 'manager') and `.resources['cpus']`.

        A container is skipped when its policy is not set, when there are
        no applications, or when no application has a matching
        `container_uuid`.  Otherwise the policy manager runs once every
        listed cpu has its phase context flagged 'set'.
        """
        for container in containers:
            pp = containers[container].power
            if not pp['policy']:
                continue
            apps = self.actuators[0].application_manager.applications
            if not apps:
                continue
            # Default of None avoids a StopIteration escaping when no
            # application belongs to this container.
            app = next((apps[a] for a in apps
                        if apps[a].container_uuid == container), None)
            if app is None:
                continue
            ids = containers[container].resources['cpus']
            # Run policy only if all phase contexts have been received.
            # all()/any() are used because filter() returns an iterator on
            # Python 3, which is truthy even when it yields nothing.
            if all(app.phase_contexts[i]['set'] for i in ids):
                pp['manager'].run_policy(app.phase_contexts)
                if any(app.phase_contexts[i]['set'] for i in ids):
                    logger.debug("Phase context not reset %r", app)