Commit 8c544aef authored by Valentin Reis's avatar Valentin Reis

[feature] wip bandit controller

parent 7644538d
from __future__ import print_function
import logging
import time
logger = logging.getLogger('nrm')
......@@ -56,8 +57,11 @@ class Application(object):
def update_progress(self, msg):
"""Update the progress tracking."""
assert self.progress
payload = self.progress + float(msg['payload'])
def reset_progress(self, msg):
"""Update the progress tracking."""
self.progress = 0
class ApplicationManager(object):
......
from __future__ import print_function
import logging
import itertools
import numpy
logger = logging.getLogger('nrm')
class Action(object):
"""Information about a control action."""
def __init__(self, target, command, delta):
self.target = target
self.command = command
self.delta = delta
class ApplicationActuator(object):
"""Actuator in charge of application thread control."""
......@@ -23,14 +22,17 @@ class ApplicationActuator(object):
self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target):
ret = []
for identity, application in \
self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta))
return ret
def available_actions(self):
pass
# TODO:return all the possible thread commands.
# ret = []
# for identity, application in \
# self.application_manager.applications.iteritems():
# if target in application.get_allowed_thread_requests():
# delta = application.get_thread_request_impact(target)
# ret.append(Action(application, target, delta))
# return ret
def execute(self, action):
target_threads = action.target.threads
......@@ -51,27 +53,27 @@ class ApplicationActuator(object):
def update(self, action):
action.target.do_thread_transition(action.command)
class DiscretizedPowerActuator(object):
class PowerActuator(object):
"""Actuator in charge of power control."""
"""Actuator in charge of power control via discretization."""
def __init__(self, sm):
def __init__(self, sm, lowerboundwatts, n):
self.sensor_manager = sm
self.lowerboundwatts = lowerboundwatts # the minimal cpu wattage
self.n = n # the number of arms
def available_actions(self, target):
def available_actions(self):
actions = []
pl = self.sensor_manager.get_powerlimits()
logger.info("power limits: %r:", pl)
if target == 'i':
for k in pl:
r = range(int(pl[k]['curW'])+1, int(pl[k]['maxW']))
actions.extend([Action(k, s, s - r[0]) for s in r])
elif target == 'd':
for k in pl:
r = range(1, int(pl[k]['curW']))
actions.extend([Action(k, s, r[-1] - s) for s in r])
return actions
logger.info("BanditPowerActuator: power limits %r", pl)
maxW = int(pl[k]['maxW'])
if maxW > self.lowerboundwatts:
logger.error( "BanditPowerActuator: The provided power lowerbound is"\
"lower than the available maximum CPU wattage.")
arms = [self.lowerboundwatts + (float(a)*rangeW/float(n)) for a in range(1,n+1)]
logger.info("BanditPowerActuator: discretized power limits: %r:", arms)
actions = [Action(target,a,target-a) for a in arms]
return(actions)
def execute(self, action):
logger.info("changing power limit: %r, %r", action.command, action.delta)
......@@ -81,38 +83,94 @@ class PowerActuator(object):
pass
class Controller(object):
class BasicPowerLoss(object):
def __init__(self, a, b, power_min, power_max, progress_min, progress_max):
assert(a < b)
self.a = a
self.b = b
self.power_min = 100000000
self.power_max = 0
self.progress_min = 1000000000
self.progress_max = 0
def perf(self,progress,power):
if power>power_max: power_max = power
if power<power_min: power_min = power
if progress>progress_max: progress_max = progress
if progress<progress_min: progress_min = progress
return((self.a*(power-power_min)/(power_max-power_min)) +
(self.b*(progress-progress_min)(progress_max-progress_min)))
class EpsGreedyBandit(object):
"""Epsilon greedy bandit. Actions in O,..,k-1."""
def __init__(self, epsilon, k):
assert(k>=1)
assert(0<=epsilon)
assert(epsilon<=1)
self.losses = [0 for a in range(0,k)]
self.plays = [0 for a in range(0,k)]
self.a=None
self.n=0
self.k=k
self.eps=epsilon
def next(self, loss):
assert(loss >= 0)
if self.a:
self.losses[self.a]=self.losses[self.a]+loss
self.plays[self.a]=self.plays[self.a]+1
self.n=self.n+1
if self.n <= self.k:
self.a = self.n-1
else:
if numpy.random.binomial(1,self.epsilon) == 1:
self.a=numpy.random.randint(0,self.k)
else:
self.a=numpy.argmin([self.losses])
return(self.a)
"""Implements a control loop for resource management."""
class BanditController(object):
"""Implements a bandit control loop for resource management."""
def __init__(self, actuators):
def __init__(self, actuators, initialization_rounds=20, exploration=0.2, enforce=None):
self.actuators = actuators
def planify(self, target, machineinfo):
self.initialization_rounds = 20
self.actions = itertools.product(*[act.available_actions() for a in actuators])
self.loss = BasicPowerLoss(1,-1)
self.bandit = EpsGreedyBandit(exploration,len(self.actions))
self.n=0
if enforce:
assert(enforce>=0)
assert(enforce<len(self.actions))
self.enforce=enforce
def planify(self, target, machineinfo, applications):
"""Plan the next action for the control loop."""
total_power = machineinfo['energy']['power']['total']
direction = None
if total_power < target:
direction = 'i'
elif total_power > target:
direction = 'd'
if direction:
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
total_progress = sum([a.progress for a in applications.values()])
total_power = float(machineinfo['energy']['power']['total'])
logger.info("Controller: Reading progress %s and power %s." %(total_progress,total_power))
loss = self.loss(progress=total_progress,power=total_power)
logger.info("Controller: Computing loss %s." %loss)
if self.enforce:
logger.info("Controller: enforced action.")
a=self.enforce
if self.n>self.initialization_rounds:
logger.info("Controller: playing bandit.")
a=self.bandit.next(loss)
else:
logger.info("Controller: estimating max power/max progress ranges.")
a=self.n % k
action = self.actions[a]
logger.info("Controller: playing arm id %a (powercap '%r')." %(a,action.command))
return(list(action),self.actuators)
def execute(self, action, actuator):
def execute(self, actions, actuators):
"""Build the action for the appropriate manager."""
actuator.execute(action)
for action, actuator in zip(actions,actuators):
actuator.execute(action)
def update(self, action, actuator):
"""Update tracking across the board to reflect the last action."""
actuator.update(action)
for action, actuator in zip(actions,actuators):
actuator.update(action)
......@@ -2,7 +2,7 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from controller import BanditController, ApplicationActuator, DiscretizedPowerActuator
from functools import partial
import json
import logging
......@@ -127,11 +127,11 @@ class Daemon(object):
logger.info("sending sensor message: %r", msg)
def do_control(self):
plan = self.controller.planify(self.target, self.machine_info)
action, actuator = plan
plan = self.controller.planify(self.target, self.machine_info, self.application_manager.applications)
actions, actuators = plan
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
self.controller.execute(actions, actuators)
self.controller.update(actions, actuators)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -221,9 +221,9 @@ class Daemon(object):
self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager()
aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = PowerActuator(self.sensor_manager)
self.controller = Controller([aa, pa])
# aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = DiscretizedPowerActuator(self.sensor_manager,100,4)
self.controller = BanditController([pa])
self.sensor_manager.start()
self.machine_info = self.sensor_manager.do_update()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment