...
 
Commits (24)
......@@ -46,3 +46,6 @@ venv/
*.log
*.nav
*.out
#nix
result
......@@ -11,3 +11,8 @@ two years ago.
| **Systemwide Power Management with Argo**
| Dan Ellsworth, Tapasya Patki, Swann Perarnau, Pete Beckman *et al*
| In *High-Performance, Power-Aware Computing (HPPAC)*, 2016.
## Packaging
building:
nix-build -A nrm
......@@ -30,7 +30,7 @@ class PerfWrapper(object):
def progress_report(self, progress):
update = {'type': 'application',
'event': 'progress',
'event': 'hardwareprogress',
'payload': progress,
'uuid': self.app_uuid,
}
......
#!/usr/bin/env python2
"""NRM Daemon.
Usage:
daemon bandit
[-k <k> | --discretization=<k>] [-l <watts> | --lowerboundwatts=<watts>]
[-e <eps> | --epsilon=<eps>] [-o <k> | --log_power=<log>]
[-p <period> | --period=<period>]
daemon enforce <enforce>
[-k <k> | --discretization <k>] [-l <watts> | --lowerboundwatts <watts>]
[-o <n> | --log_power=<log>]
[-p <period> | --period=<period>]
Arguments:
<enforce> The powercontrol policy to enforce. [default: None]
Options:
-h --help Show this screen.
-k <k> --discretization=<k> The discretization count to use in powercontrol.
[default: 4]
-l <watts> --lowerboundwatts=<watts> The minimum cpu wattage allowed. [default: 100]
-e <eps> --epsilon=<eps> The exploration constant for bandit powercontrol.
[default: 0.1]
-o <log> --log_power=<log> The log file for power use.
-p <period> --period=<period> The period for power control.
Try: daemon bandit -k 4 -l 100 -e 0.1
daemon bandit -k 4 -l 100 -e 1 #random bandit
daemon enforce 0 -k 2 #enforce the lower bound
daemon enforce 1 -k 2 #enforce the maximum
"""
from docopt import docopt
import nrm
import nrm.daemon
if __name__ == "__main__":
nrm.daemon.runner()
arguments = docopt(__doc__, version='Naval Fate 2.0')
print(arguments)
if arguments["enforce"]:
arguments["<enforce>"]=int(arguments["<enforce>"])
if arguments["--period"] is not None:
arguments["--period"]=int(arguments["--period"])
nrm.daemon.runner(power_discretization=int(arguments["--discretization"]),
enforce_powerpolicy=arguments["<enforce>"],
exploration_constant=float(arguments["--epsilon"]),
lowerboundwatts=int(arguments["--lowerboundwatts"]),
log_power=arguments["--log_power"],
period=arguments["--period"])
{
pkgs ? import ( fetchTarball "https://github.com/NixOS/nixpkgs/archive/17.09.tar.gz") {},
}:
let
callPackage = pkgs.lib.callPackageWith (pkgs // pkgs.xlibs // self);
self = rec {
# Freeze python version to 3.5
pythonPackages = pkgs.python27Packages;
python = pkgs.python27;
nrm = callPackage ./nrm.nix { inherit pythonPackages; };
inherit pkgs;
};
in
self
{ stdenv, pythonPackages, hwloc, nrm-containers }:
pythonPackages.buildPythonPackage {
name = "nrm";
src = ./.;
propagatedBuildInputs = with pythonPackages;[ six numpy tornado pyzmq hwloc docopt nrm-containers];
buildInputs = with pythonPackages;[ pytest];
testPhase = '' pytest '';
}
from __future__ import print_function
import logging
import time
logger = logging.getLogger('nrm')
......@@ -21,7 +22,8 @@ class Application(object):
def __init__(self, uuid, container, progress, threads):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.progress = 0
self.hardwareprogress = 0
self.threads = threads
self.thread_state = 'stable'
......@@ -56,8 +58,15 @@ class Application(object):
def update_progress(self, msg):
"""Update the progress tracking."""
assert self.progress
if msg['event']=='progress':
self.progress = self.progress + float(msg['payload'])
elif msg['event']=='hardwareprogress':
self.hardwareprogress = self.hardwareprogress + float(msg['payload'])/10000.
def reset_progress(self):
"""Update the progress tracking."""
self.progress = 0
self.hardwareprogress = 0
class ApplicationManager(object):
......
from __future__ import print_function
import logging
import itertools
import numpy
import math
logger = logging.getLogger('nrm')
class Action(object):
"""Information about a control action."""
def __init__(self, target, command, delta):
self.target = target
self.command = command
self.delta = delta
class ApplicationActuator(object):
"""Actuator in charge of application thread control."""
......@@ -23,14 +23,17 @@ class ApplicationActuator(object):
self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target):
ret = []
for identity, application in \
self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta))
return ret
def available_actions(self):
pass
# TODO:return all the possible thread commands.
# ret = []
# for identity, application in \
# self.application_manager.applications.iteritems():
# if target in application.get_allowed_thread_requests():
# delta = application.get_thread_request_impact(target)
# ret.append(Action(application, target, delta))
# return ret
def execute(self, action):
target_threads = action.target.threads
......@@ -51,27 +54,28 @@ class ApplicationActuator(object):
def update(self, action):
action.target.do_thread_transition(action.command)
class DiscretizedPowerActuator(object):
class PowerActuator(object):
"""Actuator in charge of power control."""
"""Actuator in charge of power control via discretization."""
def __init__(self, sm):
def __init__(self, sm, lowerboundwatts, k):
self.sensor_manager = sm
self.lowerboundwatts = lowerboundwatts # the minimal cpu wattage
self.k = k # the number of arms
def available_actions(self, target):
def available_actions(self):
actions = []
pl = self.sensor_manager.get_powerlimits()
logger.info("power limits: %r:", pl)
if target == 'i':
for k in pl:
r = range(int(pl[k]['curW'])+1, int(pl[k]['maxW']))
actions.extend([Action(k, s, s - r[0]) for s in r])
elif target == 'd':
for k in pl:
r = range(1, int(pl[k]['curW']))
actions.extend([Action(k, s, r[-1] - s) for s in r])
return actions
logger.info("BanditPowerActuator: power limits %r", pl)
maxW = int(pl[[k for k,i in pl.items()][0]]['maxW'])
if maxW < self.lowerboundwatts:
logger.error( "BanditPowerActuator: The provided power lowerbound is"\
"higher than the available maximum CPU wattage.")
rangeW=maxW-self.lowerboundwatts
arms = [self.lowerboundwatts + (float(a)*rangeW/float(self.k)) for a in range(1,self.k+1)]
logger.info("BanditPowerActuator: discretized power limits: %r:", arms)
actions = [Action([k for k,i in pl.items()][0],int(math.floor(a)),0) for a in arms]
return(actions)
def execute(self, action):
logger.info("changing power limit: %r, %r", action.command, action.delta)
......@@ -80,39 +84,125 @@ class PowerActuator(object):
def update(self, action):
pass
class BasicPowerLoss(object):
def __init__(self, alpha, power_max=0, progress_max=0):
self.alpha = alpha
self.power_max = power_max
self.progress_max = progress_max
def loss(self,progress,power):
if power>self.power_max: self.power_max = power
if progress>self.progress_max: self.progress_max = progress
power_n=power/max(0.001, self.power_max)
progress_n=progress/max(0.001, self.progress_max)
return(1. + 0.5 * (self.alpha* power_n + (self.alpha-1)*progress_n))
class EpsGreedyBandit(object):
"""Epsilon greedy bandit. Actions in O,..,k-1."""
def __init__(self, epsilon, k):
assert(k>=1)
assert(0<=epsilon)
assert(epsilon<=1)
self.losses = [0 for a in range(0,k)]
self.plays = [0 for a in range(0,k)]
self.a=None
self.n=0
self.k=k
self.epsilon=epsilon
def next(self, loss):
assert(loss >= 0)
if self.a is not None:
self.losses[self.a]=self.losses[self.a]+loss
self.plays[self.a]=self.plays[self.a]+1
self.n=self.n+1
logging.info("Bandit: the total plays are:%s" %str(self.plays))
logging.info("Bandit: the estimated losses are:%s" %str([l/(max(1,p)) for l,p in zip(self.losses,self.plays)]))
if self.n <= self.k:
self.a = self.n-1
else:
if numpy.random.binomial(1,self.epsilon) == 1:
self.a=numpy.random.randint(0,self.k)
else:
self.a=numpy.argmin([l/float(n) for l,n in zip(self.losses,self.plays)])
return(self.a)
class Controller(object):
"""Implements a control loop for resource management."""
class BanditController(object):
"""Implements a bandit control loop for resource management."""
def __init__(self, actuators):
def __init__(self, actuators, initialization_rounds=None,
exploration=0.2, enforce=None, log_power=None):
self.actuators = actuators
def planify(self, target, machineinfo):
self.actions = [a for a in itertools.product(*[act.available_actions() for act in actuators])]
self.initialization_rounds = len(self.actions)*2
self.loss = BasicPowerLoss(0.5)
self.exploration=exploration
self.bandit = EpsGreedyBandit(exploration,len(self.actions))
self.last_e=0
self.n=0
if enforce is not None:
assert(enforce>=0)
assert(enforce<len(self.actions))
self.enforce=enforce
self.log_power=log_power
if self.log_power is not None:
self.log_power.write("hardwareprogress progress power loss a desc p_inst\n")
self.log_power.flush()
def planify(self, target, machineinfo, applications):
"""Plan the next action for the control loop."""
total_power = machineinfo['energy']['power']['total']
direction = None
if total_power < target:
direction = 'i'
elif total_power > target:
direction = 'd'
if direction:
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
def execute(self, action, actuator):
current_e = float(machineinfo['energy']['energy']['cumulative']['package-0'])/(1000*1000) # in joules
current_p = float(machineinfo['energy']['power']['p0'])/(1000*1000) # in joules
if self.last_e==0:
self.last_e=current_e
return([],[])
else:
total_power = current_e - self.last_e
self.last_e = current_e
logger.info("Controller: Reading machineinfo %s." %(str(machineinfo)))
if len(applications)==0:
self.bandit = EpsGreedyBandit(self.exploration,len(self.actions))
self.n=0
if self.log_power is not None:
self.log_power.write("new application\n")
self.log_power.flush()
return([],[])
self.n=self.n+1
total_progress = sum([a.progress for a in applications.values()])
total_hardwareprogress = sum([a.hardwareprogress for a in applications.values()])
for a in applications.values():
a.reset_progress()
logger.info("Controller: applications %r" %applications.values())
logger.info("Controller: Reading progress %s and power %s."
%(total_progress,total_power))
loss = self.loss.loss(progress=total_progress,power=total_power)
logger.info("Controller: Incurring loss %s." %loss)
if self.enforce is not None:
logger.info("Controller: enforced action.")
a=self.enforce
elif self.n>self.initialization_rounds:
logger.info("Controller: playing bandit.")
a=self.bandit.next(loss)
else:
logger.info("Controller: estimating max power/max progress ranges.")
a=self.n % len(self.actions)
action = self.actions[a]
logger.info("Controller: playing arm id %s (powercap '%s')."
%(str(a),str([act.command for act in list(action)])))
if self.log_power is not None:
self.log_power.write("%s %s %s %s %s %s %s\n"
%(str(total_hardwareprogress),str(total_progress),str(total_power),str(loss),
str(a),str([act.command for act in list(action)]),current_p))
self.log_power.flush()
return(list(action),self.actuators)
def execute(self, actions, actuators):
"""Build the action for the appropriate manager."""
actuator.execute(action)
for action, actuator in zip(actions,actuators):
actuator.execute(action)
def update(self, action, actuator):
def update(self, actions, actuators):
"""Update tracking across the board to reflect the last action."""
actuator.update(action)
for action, actuator in zip(actions,actuators):
actuator.update(action)
......@@ -233,7 +233,8 @@ class rapl_reader:
ret['energy'] = dict()
for k in sorted(e.keys()):
if k != 'time':
ret['energy'][self.shortenkey(k)] = e[k]
ret['energy'][self.shortenkey(k)] = de[k]
ret['energy']['cumulative'] = self.totalenergy
ret['power'] = dict()
totalpower = 0.0
......
......@@ -2,7 +2,7 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from controller import BanditController, ApplicationActuator, DiscretizedPowerActuator
from functools import partial
import json
import logging
......@@ -18,8 +18,19 @@ logger = logging.getLogger('nrm')
class Daemon(object):
def __init__(self):
def __init__(self,power_discretization=4,
enforce_powerpolicy=False,
lowerboundwatts=100,
exploration_constant=0.1,
log_power=None,
period=5000):
self.target = 100.0
self.log_power = log_power
self.period = period
self.k = power_discretization
self.eps = exploration_constant
self.enforce = enforce_powerpolicy
self.lowerboundwatts = lowerboundwatts
def do_downstream_receive(self, parts):
logger.info("receiving downstream message: %r", parts)
......@@ -45,6 +56,11 @@ class Daemon(object):
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'hardwareprogress':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'exit':
self.application_manager.delete(msg['uuid'])
else:
......@@ -127,11 +143,10 @@ class Daemon(object):
logger.info("sending sensor message: %r", msg)
def do_control(self):
plan = self.controller.planify(self.target, self.machine_info)
action, actuator = plan
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
self.do_sensor()
actions, actuators = self.controller.planify(self.target, self.machine_info, self.application_manager.applications)
self.controller.execute(actions, actuators)
self.controller.update(actions, actuators)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -221,18 +236,19 @@ class Daemon(object):
self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager()
aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = PowerActuator(self.sensor_manager)
self.controller = Controller([aa, pa])
# aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = DiscretizedPowerActuator(self.sensor_manager,lowerboundwatts=self.lowerboundwatts,k=self.k)
self.controller = BanditController([pa],enforce=self.enforce,
exploration=self.eps,log_power=self.log_power)
self.sensor_manager.start()
self.machine_info = self.sensor_manager.do_update()
# setup periodic sensor updates
self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000)
self.sensor_cb.start()
# self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, self.period / 5)
# self.sensor_cb.start()
self.control = ioloop.PeriodicCallback(self.do_control, 1000)
self.control = ioloop.PeriodicCallback(self.do_control, self.period)
self.control.start()
# take care of signals
......@@ -242,8 +258,24 @@ class Daemon(object):
ioloop.IOLoop.current().start()
def runner():
def runner(power_discretization=4,
enforce_powerpolicy=False,
lowerboundwatts=100,
exploration_constant=0.1,
log_power=None,
period=5000):
ioloop.install()
logging.basicConfig(level=logging.DEBUG)
daemon = Daemon()
daemon.main()
if log_power is not None: log_power=open(log_power,'w',0)
try:
daemon = Daemon(power_discretization=power_discretization,
enforce_powerpolicy=enforce_powerpolicy,
lowerboundwatts=lowerboundwatts,
exploration_constant=exploration_constant,
log_power=log_power)
daemon.main()
if log_power!=None:
log_power.close()
except:
if log_power!=None:
log_power.close()
"""Various clients for system utilities."""
import collections
import logging
import os
import xml.etree.ElementTree
import tornado.process as process
import subprocess
......@@ -42,7 +43,12 @@ class NodeOSClient(object):
def __init__(self):
"""Load client configuration."""
self.prefix = "argo_nodeos_config"
if 'ARGO_NODEOS_CONFIG' in os.environ:
logger.warning("NodeOSClient: bypassing argo_nodeos_config with %s\n" %os.environ['ARGO_NODEOS_CONFIG'])
self.prefix=os.environ['ARGO_NODEOS_CONFIG']
else:
logger.warning("NodeOSClient: using argo_nodeos_config from path")
self.prefix = "argo_nodeos_config"
def getavailable(self):
"""Gather available resources."""
......
......@@ -20,6 +20,6 @@ setup(
],
packages=find_packages(),
install_requires=['six', 'pyzmq', 'tornado', 'numpy'],
install_requires=['six', 'pyzmq', 'tornado', 'numpy', 'docopt'],
scripts=['bin/daemon', 'bin/app', 'bin/cmd', 'bin/argo-perf-wrapper']
)