Commit 7bf55d23 authored by Sridutt Bhalachandra's avatar Sridutt Bhalachandra
Browse files

Added changes in NRM to respond to phase_context event

Added changes in NRM to respond to phase_context event from [previously called
power_policy event (874a6a4d)] from the application. The NRM can now store the
informaton received on the event and call DDCM power policy through interfaces
developed (Issue #11) in the control loop

See Issue #10
parent fec59aff
Pipeline #3718 canceled with stages
......@@ -18,12 +18,13 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads):
def __init__(self, uuid, container, progress, threads, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
......@@ -58,6 +59,15 @@ class Application(object):
"""Update the progress tracking."""
assert self.progress
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = msg['cpu']
self.phase_contexts[id]['startcompute'] = msg['startcompute']
self.phase_contexts[id]['endcompute'] = msg['endcompute']
self.phase_contexts[id]['startbarrier'] = msg['startbarrier']
self.phase_contexts[id]['endbarrier'] = msg['endbarrier']
self.phase_contexts[id]['set'] = 1
class ApplicationManager(object):
......@@ -66,15 +76,25 @@ class ApplicationManager(object):
def __init__(self):
self.applications = dict()
def register(self, msg):
def register(self, msg, container):
"""Register a new downstream application."""
uuid = msg['uuid']
container = msg['container']
container_uuid = msg['container']
progress = msg['progress']
threads = msg['threads']
self.applications[uuid] = Application(uuid, container, progress,
phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier']
if (container.powerpolicy['policy'] != "NONE"):
ids = container.resources['cpus']
for id in ids:
phase_contexts[id] = dict.fromkeys(phase_context_keys)
phase_contexts[id]['set'] = 0
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
def delete(self, uuid):
"""Delete an application from the register."""
......@@ -117,3 +117,21 @@ class Controller(object):
def update(self, action, actuator):
"""Update tracking across the board to reflect the last action."""
def run_policy(self, containers):
"""Run policies on containers with policies set."""
for container in containers:
pp = containers[container].powerpolicy
if pp['policy'] != "NONE":
apps = self.actuators[0].application_manager.applications
if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid
== container)
ids = containers[container].resources['cpus']
# Run policy only if all phase contexts have been received
if not filter(lambda i: app.phase_contexts[i]['set'] == 0,
if filter(lambda i: app.phase_contexts[i]['set'] == 1,
logger.debug("Phase context not reset %r", app)
......@@ -35,7 +35,9 @@ class Daemon(object):
logger.error("wrong message format: %r", msg)
if event == 'start':
container_uuid = msg['container']
container = self.container_manager.containers[container_uuid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
......@@ -49,8 +51,10 @@ class Daemon(object):
elif event == 'phase_context':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
container = self.container_manager.containers[uuid]
if container.powerpolicy['policy'] != "NONE":
app = self.application_manager.applications[uuid]
# TODO: Take appropriate action
app.update_phase_context(self, msg)
elif event == 'exit':
......@@ -85,8 +89,8 @@ class Daemon(object):
container.powerpolicy['manager'] = PowerPolicyManager(
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'event': 'start',
......@@ -145,6 +149,9 @@ class Daemon(object):
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
# Call policy only if there are containers
if self.container_manager.containers:
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -170,6 +177,8 @@ class Daemon(object):
# check if this is an exit
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
if container.powerpolicy['policy'] != "NONE":
msg = {'type': 'container',
'event': 'exit',
......@@ -58,20 +58,24 @@ class PowerPolicyManager:
self.slowdownexits = 0
self.prevtolalphasetime = 10000.0 # Any large value
def run_policy(self, cpu, startcompute, endcompute, startbarrier,
if cpu not in self.cpus:"Attempt to change power of cpu not in container : %r",
def run_policy(self, phase_contexts):
for id in phase_contexts:
if id not in self.cpus:"""Attempt to change power of cpu not in container
: %r""", id)
# Select and invoke appropriate power policy
# TODO: Need to add a better policy selection logic in addition to user
# specified using manifest file
ret, value = self.invoke_policy(cpu, self.policy, self.dclevel[cpu],
self.freqlevel[cpu], startcompute,
endcompute, startbarrier, endbarrier)
# TODO: Need to add a better policy selection logic in addition to
# user specified using manifest file
ret, value = self.invoke_policy(id, self.policy, self.dclevel[id],
if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
self.dclevel[cpu] = value
self.dclevel[id] = value
phase_contexts[id]['set'] = 0
def invoke_policy(self, cpu, policy, dclevel, freqlevel, startcompute,
endcompute, startbarrier, endbarrier):
......@@ -133,3 +137,8 @@ class PowerPolicyManager:
def power_check(self, cpu):
# Check status of all power controls
return self.ddcmpolicy.dc.check(cpu)
def reset_all(self):
# Reset all cpus
for cpu in self.cpus:
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment