...
 
Commits (1)
eval "$(lorri direnv)"
......@@ -83,6 +83,14 @@ class CommandLineInterface(object):
if msg.type == "power":
print("%s, %s, %s" % (msg.type, time.time(),
msg.total))
if msg.type == "control":
print("%s, %s, %s, %s, %s, %s"
% (msg.type,
msg.powercap,
msg.power,
msg.performance,
msg.control_time,
msg.feedback_time))
if msg.type == "container_exit":
print("%s, %s, %s" % (msg.type, time.time(),
msg.profile_data))
......@@ -189,6 +197,7 @@ class CommandLineInterface(object):
"daemon: %s" % str(exitmsg.status))
sys.exit(1)
break
sys.stdout.flush()
def do_list(self, argv):
"""Connect to the NRM and ask to list the containers present on the
......
......@@ -58,6 +58,10 @@ def main(argv=None):
parser.set_defaults(**defaults)
parser.add_argument("-v", "--verbose", help="increase output verbosity",
action="store_true")
parser.add_argument(
"--powercap",
help="The powercap policy to use. Currently, a cap value in watts.",
default=120)
parser.add_argument(
"--nrm_log",
help="Main log file. Override default with the NRM_LOG."
......
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
class Action(object):
"""Information about a control action."""
def __init__(self, target, command, delta):
self.target = target
self.command = command
self.delta = delta
class ApplicationActuator(object):
"""Actuator in charge of application thread control."""
def __init__(self, am, pubstream):
self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target):
ret = []
for identity, application in \
self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta))
return ret
def execute(self, action):
target_threads = action.target.threads
update = {'type': 'application',
'command': 'threads',
'uuid': action.target.uuid,
'event': 'threads',
}
if action.command == 'i':
payload = target_threads['cur'] + 1
elif action.command == 'd':
payload = target_threads['cur'] - 1
else:
assert False, "impossible command"
update['payload'] = payload
self.pubstream.send_json(update)
def update(self, action):
action.target.do_thread_transition(action.command)
class PowerActuator(object):
"""Actuator in charge of power control."""
def __init__(self, sm):
self.sensor_manager = sm
def available_actions(self, target):
actions = []
pl = self.sensor_manager.get_powerlimits()
logger.info("power limits: %r:", pl)
if target == 'i':
for k in pl:
r = range(int(pl[k]['curW'])+1, int(pl[k]['maxW']))
actions.extend([Action(k, s, s - r[0]) for s in r])
elif target == 'd':
for k in pl:
r = range(1, int(pl[k]['curW']))
actions.extend([Action(k, s, r[-1] - s) for s in r])
return actions
def execute(self, action):
logger.info("changing power limit: %r, %r", action.command,
action.delta)
self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
pass
class Controller(object):
"""Implements a control loop for resource management."""
def __init__(self, actuators):
self.actuators = actuators
def planify(self, target, machineinfo):
"""Plan the next action for the control loop."""
try:
total_power = machineinfo['energy']['power']['total']
except TypeError:
logging.error("\"machineinfo\" malformed. Can not run "
"control loop.")
return (None, None)
direction = None
if total_power < target:
direction = 'i'
elif total_power > target:
direction = 'd'
if direction:
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
def execute(self, action, actuator):
"""Build the action for the appropriate manager."""
actuator.execute(action)
def update(self, action, actuator):
"""Update tracking across the board to reflect the last action."""
actuator.update(action)
def run_policy_container(self, container, application):
"""Run policies on a container."""
ids = container.resources.cpus
pcs = application.phase_contexts
# Run policy only if all phase contexts have been received
if not filter(lambda i: not pcs[i]['set'], ids):
# Only run policy if all phase contexts are an
# aggregation of same number of phases
aggs = [pcs[i]['aggregation'] for i in ids]
if aggs.count(aggs[0]) == len(aggs):
container.power['manager'].run_policy(pcs)
if filter(lambda i: pcs[i]['set'], ids):
logger.debug("Phase context not reset %r", application)
else:
container.power['manager'].reset_all()
for i in ids:
pcs[i]['set'] = False
def run_policy(self, containers):
"""Run policies on containers with policies set."""
for container in containers:
p = containers[container].power
if p['policy']:
apps = self.actuators[0].application_manager.applications
if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid
== container)
self.run_policy_container(containers[container], app)
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
from __future__ import print_function
import logging
import time
from scipy.integrate import trapz
from nrm.messaging import MSGTYPES
PUB_MSG = MSGTYPES['up_pub']
logger = logging.getLogger('nrm')
class DDCMController(object):
def __init__(self):
pass
def run_policy_container(self, container, application):
"""run policies on a container."""
ids = container.resources.cpus
pcs = application.phase_contexts
# Run policy only if all phase contexts have been received
if not filter(lambda i: not pcs[i]['set'], ids):
# Only run policy if all phase contexts are an
# aggregation of same number of phases
aggs = [pcs[i]['aggregation'] for i in ids]
if aggs.count(aggs[0]) == len(aggs):
container.power['manager'].run_policy(pcs)
if filter(lambda i: pcs[i]['set'], ids):
logger.debug("Phase context not reset %r", application)
else:
container.power['manager'].reset_all()
for i in ids:
pcs[i]['set'] = False
class NodePowerController(object):
"""Implements a control loop for power capping."""
def __init__(self,
upstream_pub_server,
powercap,
sensor_manager,
upstream_pub,
period):
self.upstream_pub_server = upstream_pub_server
self.upstream_pub = upstream_pub
self.sensor_manager = sensor_manager
self.period = period # control period length
self.power_ts = [] # power time series
self.perf_ts = [] # performance time series
self.last_action = powercap # initial powercap
self.last_time = time.time() # time of last action
def step_ready(self):
def ready(ts):
if len(ts) > 0:
return ((ts[-1][0] > self.last_time + self.period)
and len(ts) > 1)
else:
return False
return(ready(self.power_ts) and ready(self.perf_ts))
def integrate_and_drop(self, t_now):
def filter_ts(ts):
return([(t, x) for (t, x) in ts if t >= self.last_time])
def integrate(ts):
return(trapz([x[1] for x in ts],
[x[0] for x in ts]))
def spantime(ts): return(ts[-1][0] - ts[0][0])
power_ts = filter_ts(self.power_ts)
perf_ts = filter_ts(self.perf_ts)
perf = integrate(perf_ts)
power = integrate(power_ts)
t_power = spantime(power_ts)
t_perf = spantime(perf_ts)
self.power_ts = [self.power_ts[-1]]
self.perf_ts = [self.perf_ts[-1]]
return(perf, power, t_power, t_perf)
def feed_power(self, v):
self.power_ts.append((time.time(), v))
def feed_performance(self, v):
self.perf_ts.append((time.time(), v))
def step(self):
if self.step_ready():
logger.info("ready to control")
now = time.time()
perf, power, t_power, t_perf = self.integrate_and_drop(now)
perfvalue = float(perf)/float(t_perf)
powervalue = float(power)/float(t_power)
self.publish(self.last_action,
perfvalue,
powervalue,
self.last_time,
now)
else:
logger.info("wasn't ready to control")
logger.info(self.power_ts)
logger.info(self.perf_ts)
def command(self, cap):
domains = ['package-0', 'package-1']
logger.info("GET:")
logger.info(self.sensor_manager.rapl.get_powerlimits())
for domain in domains:
logger.info("Setting powercap on domain %s to %d", domain, cap)
self.sensor_manager.set_powerlimit(domain, cap)
def publish(self, cap, power, perf, time1, time2):
pub = {'api': 'up_pub',
'type': 'control',
'powercap': cap,
'power': power,
'performance': perf,
'control_time': time1,
'feedback_time': time2}
self.upstream_pub_server.sendmsg(PUB_MSG['control'](**pub))
......@@ -310,7 +310,12 @@ class rapl_reader:
print 'Failed to update:', fn, '(root privilege is required)'
return
f.write('%d' % uw)
f.close()
try:
f.close()
except:
print('Failed to close file %s' % fn)
print('make sure that the bios allows changing the package powercap.')
print('debug this with `dmesg`.')
def set_powerlimit(self, newval, dom):
l = self.dirs[dom]
......
......@@ -12,7 +12,7 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager, NodeOSRuntime
from controller import Controller, PowerActuator
from controllers import DDCMController, NodePowerController
from powerpolicy import PowerPolicyManager
from functools import partial
import logging
......@@ -65,6 +65,7 @@ class Daemon(object):
'container_uuid': msg.container_uuid}
self.upstream_pub_server.sendmsg(
PUB_MSG['performance'](**pub))
self.nodepowercontroller.feed_performance(msg.payload)
elif msg.type == 'phase_context':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
......@@ -75,7 +76,7 @@ class Daemon(object):
if c.power['policy']:
app.update_phase_context(msg)
# Run container policy
self.controller.run_policy_container(c, app)
self.ddcmcontroller.run_policy_container(c, app)
elif msg.type == 'application_exit':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
......@@ -178,6 +179,7 @@ class Daemon(object):
logger.error("power sensor format malformed, "
"can not report power upstream.")
else:
self.nodepowercontroller.feed_power(total_power)
msg = {'api': 'up_pub',
'type': 'power',
'total': total_power,
......@@ -187,14 +189,7 @@ class Daemon(object):
logger.info("sending sensor message: %r", msg)
def do_control(self):
plan = self.controller.planify(self.target, self.machine_info)
action, actuator = plan
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
# Call policy only if there are containers
# if self.container_manager.containers:
# self.controller.run_policy(self.container_manager.containers)
self.nodepowercontroller.step()
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -318,8 +313,15 @@ class Daemon(object):
)
self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager()
pa = PowerActuator(self.sensor_manager)
self.controller = Controller([pa])
self.ddcmcontroller = DDCMController()
self.nodepowercontroller = NodePowerController(
upstream_pub_server=self.upstream_pub_server,
powercap=self.config.powercap,
sensor_manager=self.sensor_manager,
upstream_pub=self.upstream_pub_server,
period=1
)
self.sensor_manager.start()
self.machine_info = self.sensor_manager.do_update()
......
......@@ -94,6 +94,13 @@ MSGFORMATS['up_pub'] = {
'progress': {
'application_uuid': basestring,
'payload': int
},
'control': {
'powercap': int,
'power': float,
'performance': float,
'control_time': float,
'feedback_time': float
}
}
......