Commit 26e9c239 authored by Swann Perarnau's avatar Swann Perarnau
Browse files

[feature] Add PowerActuator and update control

This patch adds a poweractuator based on rapl settings available through
the sensor manager. Adding this actuator forces us to use a list of
actuators in the controller, changing a bit the structure of the code.
parent 57840dc9
from __future__ import print_function from __future__ import print_function
import logging import logging
from operator import attrgetter
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
...@@ -20,8 +19,9 @@ class ApplicationActuator(object): ...@@ -20,8 +19,9 @@ class ApplicationActuator(object):
"""Actuator in charge of application thread control.""" """Actuator in charge of application thread control."""
def __init__(self, am): def __init__(self, am, pubstream):
self.application_manager = am self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target): def available_actions(self, target):
ret = [] ret = []
...@@ -32,16 +32,55 @@ class ApplicationActuator(object): ...@@ -32,16 +32,55 @@ class ApplicationActuator(object):
ret.append(Action(application, target, delta)) ret.append(Action(application, target, delta))
return ret return ret
def execute(self, action):
target_threads = action.target.threads
update = {'type': 'application',
'command': 'threads',
'uuid': action.target.uuid,
'event': 'threads',
}
if action.command == 'i':
payload = target_threads['cur'] + 1
elif action.command == 'd':
payload = target_threads['cur'] - 1
else:
assert False, "impossible command"
update['payload'] = payload
self.pubstream.send_json(update)
def update(self, action):
action.target.do_thread_transition(action.command)
class PowerActuator(object):
"""Actuator in charge of power control."""
def __init__(self, sm):
self.sensor_manager = sm
def available_actions(self, target):
actions = []
pl = self.sensor_manager.get_powerlimits()
logger.info("power limits: %r:", pl)
for k in pl:
r = range(int(pl[k]['curW']), int(pl[k]['maxW']))
actions.extend([Action(k, s, s - r[0]) for s in r])
return actions
def execute(self, action):
self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
pass
class Controller(object): class Controller(object):
"""Implements a control loop for resource management.""" """Implements a control loop for resource management."""
def __init__(self, am, cm, rm): def __init__(self, actuators):
self.application_manager = am self.actuators = actuators
self.container_manager = cm
self.resource_manager = rm
self.app_actuator = ApplicationActuator(am)
def planify(self, target, machineinfo): def planify(self, target, machineinfo):
"""Plan the next action for the control loop.""" """Plan the next action for the control loop."""
...@@ -53,33 +92,21 @@ class Controller(object): ...@@ -53,33 +92,21 @@ class Controller(object):
direction = 'd' direction = 'd'
if direction: if direction:
actions = self.app_actuator.available_actions(direction) actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions: if actions:
# TODO: better choice # TODO: better choice
actions.sort(key=attrgetter('delta')) actions.sort(key=lambda x: x[0].delta)
return actions.pop() return actions.pop()
else: else:
return None return (None, None)
def execute(self, action): def execute(self, action, actuator):
"""Build the action for the appropriate manager.""" """Build the action for the appropriate manager."""
assert action actuator.execute(action)
target_threads = action.target.threads
update = {'type': 'application',
'command': 'threads',
'uuid': action.target.uuid,
'event': 'threads',
}
if action.command == 'i':
payload = target_threads['cur'] + 1
elif action.command == 'd':
payload = target_threads['cur'] - 1
else:
assert False, "impossible command"
update['payload'] = payload
return update
def update(self, action, request): def update(self, action, actuator):
"""Update tracking across the board to reflect the last action.""" """Update tracking across the board to reflect the last action."""
assert action actuator.update(action)
action.target.do_thread_transition(action.command)
...@@ -2,13 +2,13 @@ from __future__ import print_function ...@@ -2,13 +2,13 @@ from __future__ import print_function
from applications import ApplicationManager from applications import ApplicationManager
from containers import ContainerManager from containers import ContainerManager
from controller import Controller from controller import Controller, ApplicationActuator, PowerActuator
from functools import partial from functools import partial
import json import json
import logging import logging
import os import os
from resources import ResourceManager from resources import ResourceManager
import sensor from sensor import SensorManager
import signal import signal
import zmq import zmq
from zmq.eventloop import ioloop, zmqstream from zmq.eventloop import ioloop, zmqstream
...@@ -116,7 +116,7 @@ class Daemon(object): ...@@ -116,7 +116,7 @@ class Daemon(object):
self.upstream_pub.send_json(update) self.upstream_pub.send_json(update)
def do_sensor(self): def do_sensor(self):
self.machine_info = self.sensor.do_update() self.machine_info = self.sensor_manager.do_update()
logger.info("current state: %r", self.machine_info) logger.info("current state: %r", self.machine_info)
total_power = self.machine_info['energy']['power']['total'] total_power = self.machine_info['energy']['power']['total']
msg = {'type': 'power', msg = {'type': 'power',
...@@ -127,11 +127,11 @@ class Daemon(object): ...@@ -127,11 +127,11 @@ class Daemon(object):
logger.info("sending sensor message: %r", msg) logger.info("sending sensor message: %r", msg)
def do_control(self): def do_control(self):
action = self.controller.planify(self.target, self.machine_info) plan = self.controller.planify(self.target, self.machine_info)
action, actuator = plan
if action: if action:
msg = self.controller.execute(action) self.controller.execute(action, actuator)
self.downstream_pub.send_json(msg) self.controller.update(action, actuator)
self.controller.update(action, msg)
def do_signal(self, signum, frame): def do_signal(self, signum, frame):
if signum == signal.SIGINT: if signum == signal.SIGINT:
...@@ -169,7 +169,7 @@ class Daemon(object): ...@@ -169,7 +169,7 @@ class Daemon(object):
pass pass
def do_shutdown(self): def do_shutdown(self):
self.sensor.stop() self.sensor_manager.stop()
ioloop.IOLoop.current().stop() ioloop.IOLoop.current().stop()
def main(self): def main(self):
...@@ -220,14 +220,13 @@ class Daemon(object): ...@@ -220,14 +220,13 @@ class Daemon(object):
self.resource_manager = ResourceManager() self.resource_manager = ResourceManager()
self.container_manager = ContainerManager(self.resource_manager) self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager() self.application_manager = ApplicationManager()
self.controller = Controller(self.application_manager, self.sensor_manager = SensorManager()
self.container_manager, aa = ApplicationActuator(self.application_manager, self.downstream_pub)
self.resource_manager) pa = PowerActuator(self.sensor_manager)
self.controller = Controller([aa, pa])
# create sensor manager and make first measurement
self.sensor = sensor.SensorManager() self.sensor_manager.start()
self.sensor.start() self.machine_info = self.sensor_manager.do_update()
self.machine_info = self.sensor.do_update()
# setup periodic sensor updates # setup periodic sensor updates
self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000) self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
......
...@@ -34,3 +34,11 @@ class SensorManager: ...@@ -34,3 +34,11 @@ class SensorManager:
machine_info['energy'] = self.rapl.sample(accflag=True) machine_info['energy'] = self.rapl.sample(accflag=True)
machine_info['temperature'] = self.coretemp.sample() machine_info['temperature'] = self.coretemp.sample()
return machine_info return machine_info
def get_powerlimits(self):
pl = self.rapl.get_powerlimits()
# only return enabled domains
return {k: pl[k] for k in pl if pl[k]['enabled']}
def set_powerlimits(self, domain, value):
self.rapl.set_powerlimit(value, domain)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment