GitLab maintenance scheduled for Tomorrow, 2020-01-30, from 17:00 to 18:00 CT - Services will be unavailable during this time.

...
 
Commits (24)
...@@ -46,3 +46,6 @@ venv/ ...@@ -46,3 +46,6 @@ venv/
*.log *.log
*.nav *.nav
*.out *.out
#nix
result
...@@ -11,3 +11,8 @@ two years ago. ...@@ -11,3 +11,8 @@ two years ago.
| **Systemwide Power Management with Argo** | **Systemwide Power Management with Argo**
| Dan Ellsworth, Tapasya Patki, Swann Perarnau, Pete Beckman *et al* | Dan Ellsworth, Tapasya Patki, Swann Perarnau, Pete Beckman *et al*
| In *High-Performance, Power-Aware Computing (HPPAC)*, 2016. | In *High-Performance, Power-Aware Computing (HPPAC)*, 2016.
## Packaging
building:
nix-build -A nrm
...@@ -30,7 +30,7 @@ class PerfWrapper(object): ...@@ -30,7 +30,7 @@ class PerfWrapper(object):
def progress_report(self, progress): def progress_report(self, progress):
update = {'type': 'application', update = {'type': 'application',
'event': 'progress', 'event': 'hardwareprogress',
'payload': progress, 'payload': progress,
'uuid': self.app_uuid, 'uuid': self.app_uuid,
} }
......
#!/usr/bin/env python2 #!/usr/bin/env python2
"""NRM Daemon.
Usage:
daemon bandit
[-k <k> | --discretization=<k>] [-l <watts> | --lowerboundwatts=<watts>]
[-e <eps> | --epsilon=<eps>] [-o <k> | --log_power=<log>]
[-p <period> | --period=<period>]
daemon enforce <enforce>
[-k <k> | --discretization <k>] [-l <watts> | --lowerboundwatts <watts>]
[-o <n> | --log_power=<log>]
[-p <period> | --period=<period>]
Arguments:
<enforce> The powercontrol policy to enforce. [default: None]
Options:
-h --help Show this screen.
-k <k> --discretization=<k> The discretization count to use in powercontrol.
[default: 4]
-l <watts> --lowerboundwatts=<watts> The minimum cpu wattage allowed. [default: 100]
-e <eps> --epsilon=<eps> The exploration constant for bandit powercontrol.
[default: 0.1]
-o <log> --log_power=<log> The log file for power use.
-p <period> --period=<period> The period for power control.
Try: daemon bandit -k 4 -l 100 -e 0.1
daemon bandit -k 4 -l 100 -e 1 #random bandit
daemon enforce 0 -k 2 #enforce the lower bound
daemon enforce 1 -k 2 #enforce the maximum
"""
from docopt import docopt
import nrm import nrm
import nrm.daemon import nrm.daemon
if __name__ == "__main__": if __name__ == "__main__":
nrm.daemon.runner() arguments = docopt(__doc__, version='Naval Fate 2.0')
print(arguments)
if arguments["enforce"]:
arguments["<enforce>"]=int(arguments["<enforce>"])
if arguments["--period"] is not None:
arguments["--period"]=int(arguments["--period"])
nrm.daemon.runner(power_discretization=int(arguments["--discretization"]),
enforce_powerpolicy=arguments["<enforce>"],
exploration_constant=float(arguments["--epsilon"]),
lowerboundwatts=int(arguments["--lowerboundwatts"]),
log_power=arguments["--log_power"],
period=arguments["--period"])
{
pkgs ? import ( fetchTarball "https://github.com/NixOS/nixpkgs/archive/17.09.tar.gz") {},
}:
let
callPackage = pkgs.lib.callPackageWith (pkgs // pkgs.xlibs // self);
self = rec {
# Freeze python version to 3.5
pythonPackages = pkgs.python27Packages;
python = pkgs.python27;
nrm = callPackage ./nrm.nix { inherit pythonPackages; };
inherit pkgs;
};
in
self
{ stdenv, pythonPackages, hwloc, nrm-containers }:
pythonPackages.buildPythonPackage {
name = "nrm";
src = ./.;
propagatedBuildInputs = with pythonPackages;[ six numpy tornado pyzmq hwloc docopt nrm-containers];
buildInputs = with pythonPackages;[ pytest];
testPhase = '' pytest '';
}
from __future__ import print_function from __future__ import print_function
import logging import logging
import time
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
...@@ -21,7 +22,8 @@ class Application(object): ...@@ -21,7 +22,8 @@ class Application(object):
def __init__(self, uuid, container, progress, threads): def __init__(self, uuid, container, progress, threads):
self.uuid = uuid self.uuid = uuid
self.container_uuid = container self.container_uuid = container
self.progress = progress self.progress = 0
self.hardwareprogress = 0
self.threads = threads self.threads = threads
self.thread_state = 'stable' self.thread_state = 'stable'
...@@ -56,8 +58,15 @@ class Application(object): ...@@ -56,8 +58,15 @@ class Application(object):
def update_progress(self, msg): def update_progress(self, msg):
"""Update the progress tracking.""" """Update the progress tracking."""
assert self.progress if msg['event']=='progress':
self.progress = self.progress + float(msg['payload'])
elif msg['event']=='hardwareprogress':
self.hardwareprogress = self.hardwareprogress + float(msg['payload'])/10000.
def reset_progress(self):
"""Update the progress tracking."""
self.progress = 0
self.hardwareprogress = 0
class ApplicationManager(object): class ApplicationManager(object):
......
from __future__ import print_function from __future__ import print_function
import logging import logging
import itertools
import numpy
import math
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
class Action(object): class Action(object):
"""Information about a control action.""" """Information about a control action."""
def __init__(self, target, command, delta): def __init__(self, target, command, delta):
self.target = target self.target = target
self.command = command self.command = command
self.delta = delta self.delta = delta
class ApplicationActuator(object): class ApplicationActuator(object):
"""Actuator in charge of application thread control.""" """Actuator in charge of application thread control."""
...@@ -23,14 +23,17 @@ class ApplicationActuator(object): ...@@ -23,14 +23,17 @@ class ApplicationActuator(object):
self.application_manager = am self.application_manager = am
self.pubstream = pubstream self.pubstream = pubstream
def available_actions(self, target): def available_actions(self):
ret = [] pass
for identity, application in \ # TODO:return all the possible thread commands.
self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests(): # ret = []
delta = application.get_thread_request_impact(target) # for identity, application in \
ret.append(Action(application, target, delta)) # self.application_manager.applications.iteritems():
return ret # if target in application.get_allowed_thread_requests():
# delta = application.get_thread_request_impact(target)
# ret.append(Action(application, target, delta))
# return ret
def execute(self, action): def execute(self, action):
target_threads = action.target.threads target_threads = action.target.threads
...@@ -51,27 +54,28 @@ class ApplicationActuator(object): ...@@ -51,27 +54,28 @@ class ApplicationActuator(object):
def update(self, action): def update(self, action):
action.target.do_thread_transition(action.command) action.target.do_thread_transition(action.command)
class DiscretizedPowerActuator(object):
class PowerActuator(object): """Actuator in charge of power control via discretization."""
"""Actuator in charge of power control."""
def __init__(self, sm): def __init__(self, sm, lowerboundwatts, k):
self.sensor_manager = sm self.sensor_manager = sm
self.lowerboundwatts = lowerboundwatts # the minimal cpu wattage
self.k = k # the number of arms
def available_actions(self, target): def available_actions(self):
actions = [] actions = []
pl = self.sensor_manager.get_powerlimits() pl = self.sensor_manager.get_powerlimits()
logger.info("power limits: %r:", pl) logger.info("BanditPowerActuator: power limits %r", pl)
if target == 'i': maxW = int(pl[[k for k,i in pl.items()][0]]['maxW'])
for k in pl: if maxW < self.lowerboundwatts:
r = range(int(pl[k]['curW'])+1, int(pl[k]['maxW'])) logger.error( "BanditPowerActuator: The provided power lowerbound is"\
actions.extend([Action(k, s, s - r[0]) for s in r]) "higher than the available maximum CPU wattage.")
elif target == 'd': rangeW=maxW-self.lowerboundwatts
for k in pl: arms = [self.lowerboundwatts + (float(a)*rangeW/float(self.k)) for a in range(1,self.k+1)]
r = range(1, int(pl[k]['curW'])) logger.info("BanditPowerActuator: discretized power limits: %r:", arms)
actions.extend([Action(k, s, r[-1] - s) for s in r]) actions = [Action([k for k,i in pl.items()][0],int(math.floor(a)),0) for a in arms]
return actions return(actions)
def execute(self, action): def execute(self, action):
logger.info("changing power limit: %r, %r", action.command, action.delta) logger.info("changing power limit: %r, %r", action.command, action.delta)
...@@ -80,39 +84,125 @@ class PowerActuator(object): ...@@ -80,39 +84,125 @@ class PowerActuator(object):
def update(self, action): def update(self, action):
pass pass
class BasicPowerLoss(object):
def __init__(self, alpha, power_max=0, progress_max=0):
self.alpha = alpha
self.power_max = power_max
self.progress_max = progress_max
def loss(self,progress,power):
if power>self.power_max: self.power_max = power
if progress>self.progress_max: self.progress_max = progress
power_n=power/max(0.001, self.power_max)
progress_n=progress/max(0.001, self.progress_max)
return(1. + 0.5 * (self.alpha* power_n + (self.alpha-1)*progress_n))
class EpsGreedyBandit(object):
"""Epsilon greedy bandit. Actions in O,..,k-1."""
def __init__(self, epsilon, k):
assert(k>=1)
assert(0<=epsilon)
assert(epsilon<=1)
self.losses = [0 for a in range(0,k)]
self.plays = [0 for a in range(0,k)]
self.a=None
self.n=0
self.k=k
self.epsilon=epsilon
def next(self, loss):
assert(loss >= 0)
if self.a is not None:
self.losses[self.a]=self.losses[self.a]+loss
self.plays[self.a]=self.plays[self.a]+1
self.n=self.n+1
logging.info("Bandit: the total plays are:%s" %str(self.plays))
logging.info("Bandit: the estimated losses are:%s" %str([l/(max(1,p)) for l,p in zip(self.losses,self.plays)]))
if self.n <= self.k:
self.a = self.n-1
else:
if numpy.random.binomial(1,self.epsilon) == 1:
self.a=numpy.random.randint(0,self.k)
else:
self.a=numpy.argmin([l/float(n) for l,n in zip(self.losses,self.plays)])
return(self.a)
class Controller(object): class BanditController(object):
"""Implements a bandit control loop for resource management."""
"""Implements a control loop for resource management."""
def __init__(self, actuators): def __init__(self, actuators, initialization_rounds=None,
exploration=0.2, enforce=None, log_power=None):
self.actuators = actuators self.actuators = actuators
self.actions = [a for a in itertools.product(*[act.available_actions() for act in actuators])]
def planify(self, target, machineinfo): self.initialization_rounds = len(self.actions)*2
self.loss = BasicPowerLoss(0.5)
self.exploration=exploration
self.bandit = EpsGreedyBandit(exploration,len(self.actions))
self.last_e=0
self.n=0
if enforce is not None:
assert(enforce>=0)
assert(enforce<len(self.actions))
self.enforce=enforce
self.log_power=log_power
if self.log_power is not None:
self.log_power.write("hardwareprogress progress power loss a desc p_inst\n")
self.log_power.flush()
def planify(self, target, machineinfo, applications):
"""Plan the next action for the control loop.""" """Plan the next action for the control loop."""
total_power = machineinfo['energy']['power']['total'] current_e = float(machineinfo['energy']['energy']['cumulative']['package-0'])/(1000*1000) # in joules
direction = None current_p = float(machineinfo['energy']['power']['p0'])/(1000*1000) # in joules
if total_power < target: if self.last_e==0:
direction = 'i' self.last_e=current_e
elif total_power > target: return([],[])
direction = 'd' else:
total_power = current_e - self.last_e
if direction: self.last_e = current_e
actions = [] logger.info("Controller: Reading machineinfo %s." %(str(machineinfo)))
for act in self.actuators: if len(applications)==0:
newactions = act.available_actions(direction) self.bandit = EpsGreedyBandit(self.exploration,len(self.actions))
actions.extend([(a, act) for a in newactions]) self.n=0
if actions: if self.log_power is not None:
# TODO: better choice self.log_power.write("new application\n")
actions.sort(key=lambda x: x[0].delta) self.log_power.flush()
return actions.pop(0) return([],[])
else: self.n=self.n+1
return (None, None) total_progress = sum([a.progress for a in applications.values()])
total_hardwareprogress = sum([a.hardwareprogress for a in applications.values()])
def execute(self, action, actuator): for a in applications.values():
a.reset_progress()
logger.info("Controller: applications %r" %applications.values())
logger.info("Controller: Reading progress %s and power %s."
%(total_progress,total_power))
loss = self.loss.loss(progress=total_progress,power=total_power)
logger.info("Controller: Incurring loss %s." %loss)
if self.enforce is not None:
logger.info("Controller: enforced action.")
a=self.enforce
elif self.n>self.initialization_rounds:
logger.info("Controller: playing bandit.")
a=self.bandit.next(loss)
else:
logger.info("Controller: estimating max power/max progress ranges.")
a=self.n % len(self.actions)
action = self.actions[a]
logger.info("Controller: playing arm id %s (powercap '%s')."
%(str(a),str([act.command for act in list(action)])))
if self.log_power is not None:
self.log_power.write("%s %s %s %s %s %s %s\n"
%(str(total_hardwareprogress),str(total_progress),str(total_power),str(loss),
str(a),str([act.command for act in list(action)]),current_p))
self.log_power.flush()
return(list(action),self.actuators)
def execute(self, actions, actuators):
"""Build the action for the appropriate manager.""" """Build the action for the appropriate manager."""
actuator.execute(action) for action, actuator in zip(actions,actuators):
actuator.execute(action)
def update(self, action, actuator): def update(self, actions, actuators):
"""Update tracking across the board to reflect the last action.""" """Update tracking across the board to reflect the last action."""
actuator.update(action) for action, actuator in zip(actions,actuators):
actuator.update(action)
...@@ -233,7 +233,8 @@ class rapl_reader: ...@@ -233,7 +233,8 @@ class rapl_reader:
ret['energy'] = dict() ret['energy'] = dict()
for k in sorted(e.keys()): for k in sorted(e.keys()):
if k != 'time': if k != 'time':
ret['energy'][self.shortenkey(k)] = e[k] ret['energy'][self.shortenkey(k)] = de[k]
ret['energy']['cumulative'] = self.totalenergy
ret['power'] = dict() ret['power'] = dict()
totalpower = 0.0 totalpower = 0.0
......
...@@ -2,7 +2,7 @@ from __future__ import print_function ...@@ -2,7 +2,7 @@ from __future__ import print_function
from applications import ApplicationManager from applications import ApplicationManager
from containers import ContainerManager from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator from controller import BanditController, ApplicationActuator, DiscretizedPowerActuator
from functools import partial from functools import partial
import json import json
import logging import logging
...@@ -18,8 +18,19 @@ logger = logging.getLogger('nrm') ...@@ -18,8 +18,19 @@ logger = logging.getLogger('nrm')
class Daemon(object): class Daemon(object):
def __init__(self): def __init__(self,power_discretization=4,
enforce_powerpolicy=False,
lowerboundwatts=100,
exploration_constant=0.1,
log_power=None,
period=5000):
self.target = 100.0 self.target = 100.0
self.log_power = log_power
self.period = period
self.k = power_discretization
self.eps = exploration_constant
self.enforce = enforce_powerpolicy
self.lowerboundwatts = lowerboundwatts
def do_downstream_receive(self, parts): def do_downstream_receive(self, parts):
logger.info("receiving downstream message: %r", parts) logger.info("receiving downstream message: %r", parts)
...@@ -45,6 +56,11 @@ class Daemon(object): ...@@ -45,6 +56,11 @@ class Daemon(object):
if uuid in self.application_manager.applications: if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid] app = self.application_manager.applications[uuid]
app.update_progress(msg) app.update_progress(msg)
elif event == 'hardwareprogress':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'exit': elif event == 'exit':
self.application_manager.delete(msg['uuid']) self.application_manager.delete(msg['uuid'])
else: else:
...@@ -127,11 +143,10 @@ class Daemon(object): ...@@ -127,11 +143,10 @@ class Daemon(object):
logger.info("sending sensor message: %r", msg) logger.info("sending sensor message: %r", msg)
def do_control(self): def do_control(self):
plan = self.controller.planify(self.target, self.machine_info) self.do_sensor()
action, actuator = plan actions, actuators = self.controller.planify(self.target, self.machine_info, self.application_manager.applications)
if action: self.controller.execute(actions, actuators)
self.controller.execute(action, actuator) self.controller.update(actions, actuators)
self.controller.update(action, actuator)
def do_signal(self, signum, frame): def do_signal(self, signum, frame):
if signum == signal.SIGINT: if signum == signal.SIGINT:
...@@ -221,18 +236,19 @@ class Daemon(object): ...@@ -221,18 +236,19 @@ class Daemon(object):
self.container_manager = ContainerManager(self.resource_manager) self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager() self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager() self.sensor_manager = SensorManager()
aa = ApplicationActuator(self.application_manager, self.downstream_pub) # aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = PowerActuator(self.sensor_manager) pa = DiscretizedPowerActuator(self.sensor_manager,lowerboundwatts=self.lowerboundwatts,k=self.k)
self.controller = Controller([aa, pa]) self.controller = BanditController([pa],enforce=self.enforce,
exploration=self.eps,log_power=self.log_power)
self.sensor_manager.start() self.sensor_manager.start()
self.machine_info = self.sensor_manager.do_update() self.machine_info = self.sensor_manager.do_update()
# setup periodic sensor updates # setup periodic sensor updates
self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, 1000) # self.sensor_cb = ioloop.PeriodicCallback(self.do_sensor, self.period / 5)
self.sensor_cb.start() # self.sensor_cb.start()
self.control = ioloop.PeriodicCallback(self.do_control, 1000) self.control = ioloop.PeriodicCallback(self.do_control, self.period)
self.control.start() self.control.start()
# take care of signals # take care of signals
...@@ -242,8 +258,24 @@ class Daemon(object): ...@@ -242,8 +258,24 @@ class Daemon(object):
ioloop.IOLoop.current().start() ioloop.IOLoop.current().start()
def runner(): def runner(power_discretization=4,
enforce_powerpolicy=False,
lowerboundwatts=100,
exploration_constant=0.1,
log_power=None,
period=5000):
ioloop.install() ioloop.install()
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
daemon = Daemon() if log_power is not None: log_power=open(log_power,'w',0)
daemon.main() try:
daemon = Daemon(power_discretization=power_discretization,
enforce_powerpolicy=enforce_powerpolicy,
lowerboundwatts=lowerboundwatts,
exploration_constant=exploration_constant,
log_power=log_power)
daemon.main()
if log_power!=None:
log_power.close()
except:
if log_power!=None:
log_power.close()
"""Various clients for system utilities.""" """Various clients for system utilities."""
import collections import collections
import logging import logging
import os
import xml.etree.ElementTree import xml.etree.ElementTree
import tornado.process as process import tornado.process as process
import subprocess import subprocess
...@@ -42,7 +43,12 @@ class NodeOSClient(object): ...@@ -42,7 +43,12 @@ class NodeOSClient(object):
def __init__(self): def __init__(self):
"""Load client configuration.""" """Load client configuration."""
self.prefix = "argo_nodeos_config" if 'ARGO_NODEOS_CONFIG' in os.environ:
logger.warning("NodeOSClient: bypassing argo_nodeos_config with %s\n" %os.environ['ARGO_NODEOS_CONFIG'])
self.prefix=os.environ['ARGO_NODEOS_CONFIG']
else:
logger.warning("NodeOSClient: using argo_nodeos_config from path")
self.prefix = "argo_nodeos_config"
def getavailable(self): def getavailable(self):
"""Gather available resources.""" """Gather available resources."""
......
...@@ -20,6 +20,6 @@ setup( ...@@ -20,6 +20,6 @@ setup(
], ],
packages=find_packages(), packages=find_packages(),
install_requires=['six', 'pyzmq', 'tornado', 'numpy'], install_requires=['six', 'pyzmq', 'tornado', 'numpy', 'docopt'],
scripts=['bin/daemon', 'bin/app', 'bin/cmd', 'bin/argo-perf-wrapper'] scripts=['bin/daemon', 'bin/app', 'bin/cmd', 'bin/argo-perf-wrapper']
) )