Commit 0c07c4dd authored by Sridutt Bhalachandra's avatar Sridutt Bhalachandra
Browse files

[Feature] NRM response to phase_context event

Made changes in NRM to respond to phase_context event from [previously
called power_policy event (874a6a4d)] from the application. The NRM can now
store the informaton received on the event and call DDCM power policy through
interfaces developed (Issue #11) in the control loop

See Issue #10
parent 40f4361c
......@@ -18,12 +18,13 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads):
def __init__(self, uuid, container, progress, threads, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
......@@ -58,6 +59,13 @@ class Application(object):
"""Update the progress tracking."""
assert self.progress
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = int(msg['cpu'])
self.phase_contexts[id] = {k: int(msg[k]) for k in ('startcompute',
'endcompute', 'startbarrier', 'endbarrier')}
self.phase_contexts[id]['set'] = True
class ApplicationManager(object):
......@@ -66,15 +74,25 @@ class ApplicationManager(object):
def __init__(self):
self.applications = dict()
def register(self, msg):
def register(self, msg, container):
"""Register a new downstream application."""
uuid = msg['uuid']
container = msg['container']
container_uuid = msg['container']
progress = msg['progress']
threads = msg['threads']
self.applications[uuid] = Application(uuid, container, progress,
phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier']
if container.powerpolicy['policy']:
ids = container.resources['cpus']
for id in ids:
phase_contexts[id] = dict.fromkeys(phase_context_keys)
phase_contexts[id]['set'] = False
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
def delete(self, uuid):
"""Delete an application from the register."""
......@@ -117,3 +117,20 @@ class Controller(object):
def update(self, action, actuator):
"""Update tracking across the board to reflect the last action."""
def run_policy(self, containers):
"""Run policies on containers with policies set."""
for container in containers:
pp = containers[container].powerpolicy
if pp['policy']:
apps = self.actuators[0].application_manager.applications
if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid
== container)
ids = containers[container].resources['cpus']
# Run policy only if all phase contexts have been received
if not filter(lambda i: not app.phase_contexts[i]['set'],
if filter(lambda i: app.phase_contexts[i]['set'], ids):
logger.debug("Phase context not reset %r", app)
......@@ -35,7 +35,9 @@ class Daemon(object):
logger.error("wrong message format: %r", msg)
if event == 'start':
container_uuid = msg['container']
container = self.container_manager.containers[container_uuid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
......@@ -50,9 +52,13 @@ class Daemon(object):
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
# TODO: Take appropriate action
c = self.container_manager.containers[app.container_uuid]
if c.powerpolicy['policy']:
elif event == 'exit':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
logger.error("unknown event: %r", event)
......@@ -85,8 +91,8 @@ class Daemon(object):
container.powerpolicy['manager'] = PowerPolicyManager(
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'event': 'start',
......@@ -145,6 +151,9 @@ class Daemon(object):
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
# Call policy only if there are containers
if self.container_manager.containers:
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -170,6 +179,8 @@ class Daemon(object):
# check if this is an exit
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
if container.powerpolicy['policy']:
msg = {'type': 'container',
'event': 'exit',
......@@ -58,31 +58,27 @@ class PowerPolicyManager:
self.slowdownexits = 0
self.prevtolalphasetime = 10000.0 # Any large value
def run_policy(self, cpu, startcompute, endcompute, startbarrier,
def run_policy(self, phase_contexts):
# Run only if policy is specified
if self.policy:
if cpu not in self.cpus:"""Attempt to change power of cpu not in container
: %r""", cpu)
# Select and invoke appropriate power policy
# TODO: Need to add a better policy selection logic in addition to
# user specified using manifest file
ret, value = self.invoke_policy(cpu, self.policy,
self.freqlevel[cpu], startcompute,
endcompute, startbarrier,
if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
self.dclevel[cpu] = value
def invoke_policy(self, cpu, policy, dclevel, freqlevel, startcompute,
endcompute, startbarrier, endbarrier):
for id in phase_contexts:
if id not in self.cpus:"""Attempt to change power of cpu not in container
: %r""", id)
# Select and invoke appropriate power policy
# TODO: Need to add a better policy selection logic in addition
# to user specified using manifest file
ret, value = self.invoke_policy(id, **phase_contexts[id])
if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
self.dclevel[id] = value
phase_contexts[id]['set'] = False
def invoke_policy(self, cpu, **kwargs):
# Calculate time spent in computation, barrier in current phase along
# with total phase time
computetime = endcompute - startcompute
barriertime = endbarrier - startbarrier
computetime = kwargs['endcompute'] - kwargs['startcompute']
barriertime = kwargs['endbarrier'] - kwargs['startbarrier']
totalphasetime = computetime + barriertime
# If the current phase length is less than the damper value, then do
......@@ -98,7 +94,7 @@ class PowerPolicyManager:
# If the current phase has slowed down beyond the threshold set, then
# reset power. This helps correct error in policy application or acts
# as a rudimentary way to detect phase change
if(dclevel < self.ddcmpolicy.maxdclevel and totalphasetime >
if(self.dclevel[cpu] < self.ddcmpolicy.maxdclevel and totalphasetime >
self.slowdown * self.prevtolalphasetime):
newdclevel = self.ddcmpolicy.maxdclevel
......@@ -106,9 +102,9 @@ class PowerPolicyManager:
return 'SLOWDOWN', newdclevel
# Invoke the correct policy based on operation module
if policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, dclevel, computetime,
if self.policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu],
computetime, totalphasetime)
# TODO: Add DVFS and Combined policies
......@@ -135,3 +131,8 @@ class PowerPolicyManager:
def power_check(self, cpu):
# Check status of all power controls
return self.ddcmpolicy.dc.check(cpu)
def reset_all(self):
# Reset all cpus
for cpu in self.cpus:
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment