Commit d2a10dcf authored by Swann Perarnau's avatar Swann Perarnau

Merge branch '10-integrating-power-policy-with-downstream-api-daemon-py' into 'master'

Resolve "Integration of power policy in to NRM"

Closes #10

See merge request !20
parents 65ec78a7 b07a4ff6
Pipeline #3763 canceled with stages
......@@ -28,7 +28,9 @@
"name": "argo/powerpolicy",
"value": {
"enabled": "0",
"policy": "NONE"
"policy": "NONE",
"damper": "0.1",
"slowdown": "1.1"
}
}
]
......
......@@ -3,7 +3,7 @@
* Description: This file contains the implementation of downstream API to
* transmit application context information to NRM.
*
* The application context information transmitted is used to monitor
* The application context information transmitted can be used to monitor
* application progress and/or invoke power policies to improve energy
* efficiency at the node level.
*/
......@@ -71,9 +71,9 @@ int nrm_send_progress(struct nrm_context *ctxt, unsigned long progress)
return 0;
}
int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
startCompute, double endCompute, double startBarrier, double
endBarrier)
int nrm_send_phase_context(struct nrm_context *ctxt, int cpu, unsigned long
long int startCompute, unsigned long long int endCompute, unsigned
long long int startBarrier, unsigned long long int endBarrier)
{
char buf[512];
struct timespec now;
......@@ -83,7 +83,7 @@ int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
if(timediff > NRM_RATELIMIT_THRESHOLD)
{
snprintf(buf, 512, NRM_POWER_POLICY_FORMAT, cpu, startCompute,
snprintf(buf, 512, NRM_PHASE_CONTEXT_FORMAT, cpu, startCompute,
endCompute, startBarrier, endBarrier, ctxt->app_uuid);
int err = zmq_send(ctxt->socket, buf, strnlen(buf, 512), 0);
assert(err > 0);
......
......@@ -28,15 +28,15 @@ struct nrm_context {
#define NRM_START_FORMAT "{\"type\":\"application\", \"event\":\"start\", \"container\": \"%s\", \"uuid\": \"%s\", \"progress\": true, \"threads\": null}"
#define NRM_PROGRESS_FORMAT "{\"type\":\"application\", \"event\":\"progress\", \"payload\": \"%lu\", \"uuid\": \"%s\"}"
#define NRM_POWER_POLICY_FORMAT "{\"type\":\"application\", \"event\":\"power_policy\", \"cpu\": \"%d\", \"startcompute\": \"%lf\", \"endcompute\": \"%lf\", \"startbarrier\": \"%lf\", \"endbarrier\": \"%lf\", \"uuid\": \"%s\"}"
#define NRM_PHASE_CONTEXT_FORMAT "{\"type\":\"application\", \"event\":\"phase_context\", \"cpu\": \"%d\", \"startcompute\": \"%llu\", \"endcompute\": \"%llu\", \"startbarrier\": \"%llu\", \"endbarrier\": \"%llu\", \"uuid\": \"%s\"}"
#define NRM_EXIT_FORMAT "{\"type\":\"application\", \"event\":\"exit\", \"uuid\": \"%s\"}"
int nrm_init(struct nrm_context *, const char *);
int nrm_fini(struct nrm_context *);
int nrm_send_progress(struct nrm_context *, unsigned long);
int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
startCompute, double endCompute, double startBarrier, double
endBarrier);
int nrm_send_phase_context(struct nrm_context *ctxt, int cpu, unsigned long
long int startCompute, unsigned long long int endCompute, unsigned
long long int startBarrier, unsigned long long int endBarrier);
#endif
......@@ -156,7 +156,9 @@ class PowerPolicy(SpecField):
policies = ['NONE', 'DDCM', 'DVFS', 'COMBINED']
fields = {"enabled": spec(unicode, False),
"policy": spec(unicode, False)
"policy": spec(unicode, False),
"damper": spec(unicode, False),
"slowdown": spec(unicode, False)
}
def __init__(self):
......@@ -176,6 +178,14 @@ class PowerPolicy(SpecField):
logger.error("Invalid value of powerpolicy policy: %s",
self.policy)
return False
if self.damper < 0.0:
logger.error("Invalid value of powerpolicy damper: %s",
self.policy)
return False
if self.slowdown < 1.0:
logger.error("Invalid value of powerpolicy slowdown: %s",
self.policy)
return False
return True
......
......@@ -18,12 +18,13 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads):
def __init__(self, uuid, container, progress, threads, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
......@@ -58,6 +59,13 @@ class Application(object):
"""Update the progress tracking."""
assert self.progress
def update_phase_context(self, msg):
    """Update the phase contextual information.

    Reads the 'cpu', 'startcompute', 'endcompute', 'startbarrier' and
    'endbarrier' entries of msg, converts each to int, and stores the four
    timestamps under self.phase_contexts[cpu] with 'set' marked True.
    """
    cpu = int(msg['cpu'])
    context = {k: int(msg[k]) for k in ('startcompute', 'endcompute',
                                        'startbarrier', 'endbarrier')}
    # Build the entry completely before publishing it, so it is never
    # visible without its 'set' flag.
    context['set'] = True
    self.phase_contexts[cpu] = context
class ApplicationManager(object):
......@@ -66,15 +74,25 @@ class ApplicationManager(object):
def __init__(self):
self.applications = dict()
def register(self, msg):
def register(self, msg, container):
"""Register a new downstream application."""
uuid = msg['uuid']
container = msg['container']
container_uuid = msg['container']
progress = msg['progress']
threads = msg['threads']
self.applications[uuid] = Application(uuid, container, progress,
threads)
phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier']
if container.powerpolicy['policy']:
ids = container.resources['cpus']
for id in ids:
phase_contexts[id] = dict.fromkeys(phase_context_keys)
phase_contexts[id]['set'] = False
else:
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
def delete(self, uuid):
"""Delete an application from the register."""
......
......@@ -7,7 +7,8 @@ import os
from subprograms import ChrtClient, NodeOSClient, resources
logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'process'])
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
'powerpolicy', 'process'])
class ContainerManager(object):
......@@ -50,15 +51,23 @@ class ContainerManager(object):
environ['AC_APP_NAME'] = manifest.name
environ['AC_METADATA_URL'] = "localhost"
logger.info("run: environ: %r", environ)
# TODO: Application library to load must be set during configuration
applicationpreloadlibrary = '.so'
# create container
container_name = request['uuid']
environ['ARGO_CONTAINER_UUID'] = container_name
logger.info("creating container %s", container_name)
self.nodeos.create(container_name, allocation)
logger.info("created container %s", container_name)
container_resources = dict()
container_resources['cpus'], container_resources['mems'] = allocation
# Container power policy information
container_powerpolicy = dict()
container_powerpolicy['policy'] = None
container_powerpolicy['damper'] = None
container_powerpolicy['slowdown'] = None
container_powerpolicy['manager'] = None
# TODO: Application library to load must be set during configuration
applicationpreloadlibrary = ''
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
......@@ -78,16 +87,22 @@ class ContainerManager(object):
if hasattr(manifest.app.isolators, 'powerpolicy'):
if hasattr(manifest.app.isolators.powerpolicy, 'enabled'):
if manifest.app.isolators.powerpolicy.enabled in ["1", "True"]:
if manifest.app.isolators.powerpolicy.policy != "NONE":
environ['LD_PRELOAD'] = applicationpreloadlibrary
pp = manifest.app.isolators.powerpolicy
if pp.enabled in ["1", "True"]:
if pp.policy != "NONE":
container_powerpolicy['policy'] = pp.policy
container_powerpolicy['damper'] = pp.damper
container_powerpolicy['slowdown'] = pp.slowdown
environ['LD_PRELOAD'] = applicationpreloadlibrary
argv.append(command)
argv.extend(args)
process = self.nodeos.execute(container_name, argv, environ)
c = Container(container_name, manifest, process)
c = Container(container_name, manifest, container_resources,
container_powerpolicy, process)
self.pids[process.pid] = c
self.containers[container_name] = c
logger.info("Container %s created and running : %r", container_name, c)
return c
def delete(self, uuid):
......
......@@ -117,3 +117,20 @@ class Controller(object):
def update(self, action, actuator):
"""Update tracking across the board to reflect the last action."""
actuator.update(action)
def run_policy(self, containers):
    """Run policies on containers with policies set.

    containers maps a container uuid to an object exposing `powerpolicy`
    and `resources` dictionaries. For every container whose
    powerpolicy['policy'] is set, the first registered application that
    belongs to it is looked up; once every cpu listed in
    resources['cpus'] has its phase context marked 'set', the container's
    powerpolicy['manager'] is run on those contexts.
    """
    for container in containers:
        pp = containers[container].powerpolicy
        if not pp['policy']:
            continue
        apps = self.actuators[0].application_manager.applications
        # Applications may be registered for other containers only, so a
        # missing match must not abort the whole control loop.
        app = next((apps[a] for a in apps
                    if apps[a].container_uuid == container), None)
        if app is None:
            continue
        ids = containers[container].resources['cpus']
        # Run policy only if all phase contexts have been received
        if all(app.phase_contexts[i]['set'] for i in ids):
            pp['manager'].run_policy(app.phase_contexts)
            # The manager is expected to clear every 'set' flag it handled
            if any(app.phase_contexts[i]['set'] for i in ids):
                logger.debug("Phase context not reset %r", app)
......@@ -3,6 +3,7 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from powerpolicy import PowerPolicyManager
from functools import partial
import json
import logging
......@@ -34,7 +35,9 @@ class Daemon(object):
logger.error("wrong message format: %r", msg)
return
if event == 'start':
self.application_manager.register(msg)
container_uuid = msg['container']
container = self.container_manager.containers[container_uuid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
......@@ -45,13 +48,17 @@ class Daemon(object):
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'power_policy':
elif event == 'phase_context':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
# TODO: Invoke appropriate power policy
c = self.container_manager.containers[app.container_uuid]
if c.powerpolicy['policy']:
app.update_phase_context(msg)
elif event == 'exit':
self.application_manager.delete(msg['uuid'])
uuid = msg['uuid']
if uuid in self.application_manager.applications:
self.application_manager.delete(msg['uuid'])
else:
logger.error("unknown event: %r", event)
return
......@@ -80,12 +87,19 @@ class Daemon(object):
logger.info("new container required: %r", msg)
container = self.container_manager.create(msg)
if container.powerpolicy['policy']:
container.powerpolicy['manager'] = PowerPolicyManager(
container.resources['cpus'],
container.powerpolicy['policy'],
float(container.powerpolicy['damper']),
float(container.powerpolicy['slowdown']))
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'event': 'start',
'uuid': container_uuid,
'errno': 0 if container else -1,
'pid': container.process.pid,
'powerpolicy': container.powerpolicy['policy']
}
self.upstream_pub.send_json(update)
# setup io callbacks
......@@ -137,6 +151,9 @@ class Daemon(object):
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
# Call policy only if there are containers
if self.container_manager.containers:
self.controller.run_policy(self.container_manager.containers)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -162,6 +179,8 @@ class Daemon(object):
# check if this is an exit
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
if container.powerpolicy['policy']:
container.powerpolicy['manager'].reset_all()
self.container_manager.delete(container.uuid)
msg = {'type': 'container',
'event': 'exit',
......
......@@ -28,19 +28,21 @@
for supported power controls and related information.
"""
import ddcmpolicy
import logging
logger = logging.getLogger('nrm')
class PowerPolicyManager:
""" Used for power policy application """
def __init__(self, ncpus=0, policy='NONE', damper=0.1, slowdown=1.1):
def __init__(self, cpus=None, policy=None, damper=0.1, slowdown=1.1):
self.cpus = cpus
self.policy = policy
self.damper = damper
self.slowdown = slowdown
# TODO: Need to set this based on container configuration
self.ncpus = ncpus
# Initialize all power interfaces
self.ddcmpolicy = ddcmpolicy.DDCMPolicy()
......@@ -48,36 +50,35 @@ class PowerPolicyManager:
self.maxdclevel = self.ddcmpolicy.maxdclevel
# TODO: Need to set this value when DVFS policies are added
self.maxfreqlevel = -1
# TODO: Need to only allow power changes to cpus in container
self.dclevel = dict.fromkeys(range(0, self.ncpus), self.maxdclevel)
self.freqlevel = dict.fromkeys(range(0, self.ncpus), self.maxfreqlevel)
self.dclevel = dict.fromkeys(self.cpus, self.maxdclevel)
self.freqlevel = dict.fromkeys(self.cpus, self.maxfreqlevel)
# Book-keeping
self.damperexits = 0
self.slowdownexits = 0
self.prevtolalphasetime = 10000.0 # Random large value
def run_policy(self, cpu, startcompute, endcompute, startbarrier,
endbarrier):
# Select and invoke appropriate power policy
# TODO: Need to add a better policy selection logic in addition to user
# specified
ret, value = self.invoke_policy(cpu, self.policy, self.dclevel[cpu],
self.freqlevel[cpu], startcompute,
endcompute, startbarrier, endbarrier)
if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
self.dclevel[cpu] = value
def invoke_policy(self, cpu, policy, dclevel, freqlevel, startcompute,
endcompute, startbarrier, endbarrier):
# Run with no policy
if policy == "NONE":
return 'NONE', -1
self.prevtolalphasetime = 10000.0 # Any large value
def run_policy(self, phase_contexts):
    """Apply the configured power policy to each cpu in phase_contexts.

    phase_contexts maps a cpu id to a dict that is passed as keyword
    arguments to invoke_policy(). After a cpu has been handled its 'set'
    flag is cleared. Processing stops at the first cpu id that is not part
    of self.cpus. Nothing happens when no policy is configured.
    """
    # Run only if policy is specified
    if not self.policy:
        return
    for cpu in phase_contexts:
        if cpu not in self.cpus:
            logger.info("Attempt to change power of cpu not in "
                        "container: %r", cpu)
            return
        # Select and invoke appropriate power policy
        # TODO: Need to add a better policy selection logic in addition
        # to user specified using manifest file
        ret, value = self.invoke_policy(cpu, **phase_contexts[cpu])
        if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
            self.dclevel[cpu] = value
        phase_contexts[cpu]['set'] = False
def invoke_policy(self, cpu, **kwargs):
# Calculate time spent in computation, barrier in current phase along
# with total phase time
computetime = endcompute - startcompute
barriertime = endbarrier - startbarrier
computetime = kwargs['endcompute'] - kwargs['startcompute']
barriertime = kwargs['endbarrier'] - kwargs['startbarrier']
totalphasetime = computetime + barriertime
# If the current phase length is less than the damper value, then do
......@@ -93,7 +94,7 @@ class PowerPolicyManager:
# If the current phase has slowed down beyond the threshold set, then
# reset power. This helps correct error in policy application or acts
# as a rudimentary way to detect phase change
if(dclevel < self.ddcmpolicy.maxdclevel and totalphasetime >
if(self.dclevel[cpu] < self.ddcmpolicy.maxdclevel and totalphasetime >
self.slowdown * self.prevtolalphasetime):
self.ddcmpolicy.dc.reset(cpu)
newdclevel = self.ddcmpolicy.maxdclevel
......@@ -101,9 +102,9 @@ class PowerPolicyManager:
return 'SLOWDOWN', newdclevel
# Invoke the correct policy based on operation module
if policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, dclevel, computetime,
totalphasetime)
if self.policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu],
computetime, totalphasetime)
# TODO: Add DVFS and Combined policies
......@@ -128,3 +129,8 @@ class PowerPolicyManager:
def power_check(self, cpu):
# Check status of power controls for the given cpu: returns whatever
# self.ddcmpolicy.dc.check(cpu) reports.
# NOTE(review): only the DDCM policy's dc control is queried here.
return self.ddcmpolicy.dc.check(cpu)
def reset_all(self):
# Reset all cpus: call power_reset() once for every entry in self.cpus
for cpu in self.cpus:
self.power_reset(cpu)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment