Commit ee5b31e0 authored by Swann Perarnau's avatar Swann Perarnau
Browse files

Merge branch 'big-controller-work' into 'master'

Implement control loop

See merge request !12
parents 8fa7a4e1 36206879
......@@ -34,6 +34,16 @@ class Application(object):
def get_allowed_thread_requests(self):
return self.thread_fsm_table[self.thread_state].keys()
def get_thread_request_impact(self, command):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
return speed
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
class Action(object):
"""Information about a control action."""
def __init__(self, target, command, delta): = target
self.command = command = delta
class ApplicationActuator(object):
"""Actuator in charge of application thread control."""
def __init__(self, am, pubstream):
self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target):
ret = []
for identity, application in \
if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta))
return ret
def execute(self, action):
target_threads =
update = {'type': 'application',
'command': 'threads',
'event': 'threads',
if action.command == 'i':
payload = target_threads['cur'] + 1
elif action.command == 'd':
payload = target_threads['cur'] - 1
assert False, "impossible command"
update['payload'] = payload
def update(self, action):
class PowerActuator(object):
"""Actuator in charge of power control."""
def __init__(self, sm):
self.sensor_manager = sm
def available_actions(self, target):
actions = []
pl = self.sensor_manager.get_powerlimits()"power limits: %r:", pl)
if target == 'i':
for k in pl:
r = range(int(pl[k]['curW']), int(pl[k]['maxW']))
actions.extend([Action(k, s, s - r[0]) for s in r])
elif target == 'd':
for k in pl:
r = range(0, int(pl[k]['curW']))
actions.extend([Action(k, s, s - r[-1]) for s in r])
return actions
def execute(self, action):"changing power limit: %r", action)
self.sensor_manager.set_powerlimit(, action.command)
def update(self, action):
class Controller(object):
"""Implements a control loop for resource management."""
def __init__(self, actuators):
self.actuators = actuators
def planify(self, target, machineinfo):
"""Plan the next action for the control loop."""
total_power = machineinfo['energy']['power']['total']
direction = None
if total_power < target:
direction = 'i'
elif total_power > target:
direction = 'd'
if direction:
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
actions.sort(key=lambda x: x[0].delta)
return actions.pop()
return (None, None)
def execute(self, action, actuator):
"""Build the action for the appropriate manager."""
def update(self, action, actuator):
"""Update tracking across the board to reflect the last action."""
......@@ -2,12 +2,13 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from functools import partial
import json
import logging
import os
from resources import ResourceManager
import sensor
from sensor import SensorManager
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
......@@ -115,7 +116,7 @@ class Daemon(object):
def do_sensor(self):
self.machine_info = self.sensor.do_update()
self.machine_info = self.sensor_manager.do_update()"current state: %r", self.machine_info)
total_power = self.machine_info['energy']['power']['total']
msg = {'type': 'power',
......@@ -126,29 +127,11 @@ class Daemon(object):"sending sensor message: %r", msg)
def do_control(self):
total_power = self.machine_info['energy']['power']['total']
for identity, application in \
update = {'type': 'application',
'command': 'threads',
'uuid': identity,
'event': 'threads',
if total_power <
if 'i' in application.get_allowed_thread_requests():
update['payload'] = application.threads['cur'] + 1
elif total_power >
if 'd' in application.get_allowed_thread_requests():
update['payload'] = application.threads['cur'] - 1
continue"application now in state: %s",
plan = self.controller.planify(, self.machine_info)
action, actuator = plan
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -186,7 +169,7 @@ class Daemon(object):
def do_shutdown(self):
def main(self):
......@@ -237,11 +220,13 @@ class Daemon(object):
self.resource_manager = ResourceManager()
self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager()
aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = PowerActuator(self.sensor_manager)
self.controller = Controller([aa, pa])
# create sensor manager and make first measurement
self.sensor = sensor.SensorManager()
self.machine_info = self.sensor.do_update()
self.machine_info = self.sensor_manager.do_update()
# setup periodic sensor updates
self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
......@@ -34,3 +34,11 @@ class SensorManager:
machine_info['energy'] = self.rapl.sample(accflag=True)
machine_info['temperature'] = self.coretemp.sample()
return machine_info
def get_powerlimits(self):
pl = self.rapl.get_powerlimits()
# only return enabled domains
return {k: pl[k] for k in pl if pl[k]['enabled']}
def set_powerlimit(self, domain, value):
self.rapl.set_powerlimit(value, domain)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment