Commit a501c976 authored by Sridutt Bhalachandra

[feature] Aggregative Downstream API integration

Adds support for aggregating the phase context information of an
application. The damper value (given in nanoseconds in the manifest
file) sets the minimum phase length for which phase context
information is sent to the NRM (implemented in the 'libnrm' repo
[see Issue #2]). This limits the number of messages sent to the NRM.

See Issue #13
parent 5d292f9f
...@@ -191,6 +191,14 @@ class Power(SpecField): ...@@ -191,6 +191,14 @@ class Power(SpecField):
logger.error("Invalid value for power policy slowdown: %s", logger.error("Invalid value for power policy slowdown: %s",
self.policy) self.policy)
return False return False
if self.damper < 0.0:
logger.error("Invalid value of powerpolicy damper: %s",
self.policy)
return False
if self.slowdown < 1.0:
logger.error("Invalid value of powerpolicy slowdown: %s",
self.policy)
return False
return True return True
......
...@@ -64,9 +64,9 @@ class Application(object): ...@@ -64,9 +64,9 @@ class Application(object):
def update_phase_context(self, msg): def update_phase_context(self, msg):
"""Update the phase contextual information.""" """Update the phase contextual information."""
id = msg.cpu id = int(msg['cpu'])
self.phase_contexts[id] = {k: getattr(msg, k) for k in ('startcompute', self.phase_contexts[id] = {k: int(msg[k]) for k in ('aggregation',
'endcompute', 'startbarrier', 'endbarrier')} 'computetime', 'totaltime')}
self.phase_contexts[id]['set'] = True self.phase_contexts[id]['set'] = True
...@@ -85,8 +85,7 @@ class ApplicationManager(object): ...@@ -85,8 +85,7 @@ class ApplicationManager(object):
progress = 0 progress = 0
threads = False threads = False
phase_contexts = dict() phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute', phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
'startbarrier', 'endbarrier']
if container.power['policy']: if container.power['policy']:
ids = container.resources['cpus'] ids = container.resources['cpus']
for id in ids: for id in ids:
......
...@@ -124,19 +124,31 @@ class Controller(object): ...@@ -124,19 +124,31 @@ class Controller(object):
"""Update tracking across the board to reflect the last action.""" """Update tracking across the board to reflect the last action."""
actuator.update(action) actuator.update(action)
def run_policy_container(self, container, application):
    """Run the power policy of one container against its application.

    The policy is invoked only once every cpu of the container has
    reported a phase context, and only if all of those contexts are an
    aggregation of the same number of phases. When the aggregation
    counts differ, the power manager is reset and every context is
    marked as consumed so that a fresh round of contexts is awaited.

    Args:
        container: object exposing ``resources['cpus']`` (the cpu ids)
            and ``power['manager']`` (providing ``run_policy`` and
            ``reset_all``).
        application: object exposing ``phase_contexts``, a mapping from
            cpu id to a dict holding at least 'set' and 'aggregation'.
    """
    ids = container.resources['cpus']
    # A container without cpus has nothing to evaluate; returning here
    # also avoids an IndexError on aggs[0] below.
    if not ids:
        return
    pcs = application.phase_contexts
    # Run policy only if all phase contexts have been received. all() is
    # used instead of testing the result of filter(), which is a lazy,
    # always-truthy iterator on Python 3.
    if not all(pcs[i]['set'] for i in ids):
        return
    # Only run policy if all phase contexts are an aggregation of the
    # same number of phases
    aggs = [pcs[i]['aggregation'] for i in ids]
    if aggs.count(aggs[0]) == len(aggs):
        container.power['manager'].run_policy(pcs)
        # The manager is expected to clear 'set' on every context it
        # consumed; report it if any flag is still raised.
        if any(pcs[i]['set'] for i in ids):
            logger.debug("Phase context not reset %r", application)
    else:
        # Mismatched aggregation counts: drop this round entirely.
        container.power['manager'].reset_all()
        for i in ids:
            pcs[i]['set'] = False
def run_policy(self, containers): def run_policy(self, containers):
"""Run policies on containers with policies set.""" """Run policies on containers with policies set."""
for container in containers: for container in containers:
pp = containers[container].power p = containers[container].power
if pp['policy']: if p['policy']:
apps = self.actuators[0].application_manager.applications apps = self.actuators[0].application_manager.applications
if apps: if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid app = next(apps[a] for a in apps if apps[a].container_uuid
== container) == container)
ids = containers[container].resources['cpus'] self.run_policy_container(containers[container], app)
# Run policy only if all phase contexts have been received
if not filter(lambda i: not app.phase_contexts[i]['set'],
ids):
pp['manager'].run_policy(app.phase_contexts)
if filter(lambda i: app.phase_contexts[i]['set'], ids):
logger.debug("Phase context not reset %r", app)
...@@ -230,20 +230,24 @@ class Daemon(object): ...@@ -230,20 +230,24 @@ class Daemon(object):
'container_uuid': container.uuid, 'container_uuid': container.uuid,
'profile_data': dict(), 'profile_data': dict(),
} }
pp = container.power p = container.power
if pp['policy']: if p['policy']:
pp['manager'].reset_all() p['manager'].reset_all()
if pp['profile']: if p['profile']:
e = pp['profile']['end'] e = p['profile']['end']
self.machine_info = self.sensor_manager.do_update() self.machine_info = self.sensor_manager.do_update()
e = self.machine_info['energy']['energy'] e = self.machine_info['energy']['energy']
e['time'] = self.machine_info['time'] e['time'] = self.machine_info['time']
s = pp['profile']['start'] s = p['profile']['start']
# Calculate difference between the values # Calculate difference between the values
diff = self.sensor_manager.calc_difference(s, e) diff = self.sensor_manager.calc_difference(s, e)
# Get final package temperature # Get final package temperature
temp = self.machine_info['temperature'] temp = self.machine_info['temperature']
diff['temp'] = map(lambda k: temp[k]['pkg'], temp) diff['temp'] = map(lambda k: temp[k]['pkg'], temp)
diff['policy'] = p['policy']
if p['policy']:
diff['damper'] = float(p['damper'])/1000000000
diff['slowdown'] = p['slowdown']
logger.info("Container %r profile data: %r", logger.info("Container %r profile data: %r",
container.uuid, diff) container.uuid, diff)
msg['profile_data'] = diff msg['profile_data'] = diff
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
p. 3. ACM, 2015. p. 3. ACM, 2015.
""" """
from __future__ import division
import math import math
import coolr import coolr
import coolr.dutycycle import coolr.dutycycle
...@@ -43,12 +44,15 @@ class DDCMPolicy: ...@@ -43,12 +44,15 @@ class DDCMPolicy:
self.dc = coolr.dutycycle.DutyCycle() self.dc = coolr.dutycycle.DutyCycle()
def print_stats(self, resetflag=False): def print_stats(self, resetflag=False):
print('DDCM Policy: DDCMPolicySets %d DDCMPolicyResets %d' % ddcmstats = dict()
(self.ddcmpolicyset, self.ddcmpolicyreset)) ddcmstats['DDCMPolicySets'] = self.ddcmpolicyset
ddcmstats['DDCMPolicyResets'] = self.ddcmpolicyreset
if resetflag: if resetflag:
self.ddcmpolicyset = 0 self.ddcmpolicyset = 0
self.ddcmpolicyreset = 0 self.ddcmpolicyreset = 0
return ddcmstats
def execute(self, cpu, currentdclevel, computetime, totalphasetime): def execute(self, cpu, currentdclevel, computetime, totalphasetime):
# Compute work done by cpu during current phase # Compute work done by cpu during current phase
work = computetime / totalphasetime work = computetime / totalphasetime
...@@ -79,10 +83,6 @@ class DDCMPolicy: ...@@ -79,10 +83,6 @@ class DDCMPolicy:
# If reduction required is 0 # If reduction required is 0
newdclevel = currentdclevel newdclevel = currentdclevel
# Check if new dc level computed is not less than whats permissible
if newdclevel < self.mindclevel:
newdclevel = self.maxdclevel
# If there was a slowdown in the last phase, then increase the duty # If there was a slowdown in the last phase, then increase the duty
# cycle level corresponding to the slowdown # cycle level corresponding to the slowdown
else: else:
...@@ -93,11 +93,10 @@ class DDCMPolicy: ...@@ -93,11 +93,10 @@ class DDCMPolicy:
newdclevel = currentdclevel + dcincrease newdclevel = currentdclevel + dcincrease
# Check if new dc level computed is not greater than whats # Check if new dc level computed is within permissible range, else
# permissible # reset
if newdclevel > self.maxdclevel: if newdclevel < self.mindclevel or newdclevel > self.maxdclevel:
newdclevel = self.maxdclevel newdclevel = self.maxdclevel
# Set the duty cycle of cpu to the new value computed # Set the duty cycle of cpu to the new value computed
self.dc.set(cpu, newdclevel) self.dc.set(cpu, newdclevel)
......
...@@ -37,7 +37,8 @@ logger = logging.getLogger('nrm') ...@@ -37,7 +37,8 @@ logger = logging.getLogger('nrm')
class PowerPolicyManager: class PowerPolicyManager:
""" Used for power policy application """ """ Used for power policy application """
def __init__(self, cpus=None, policy=None, damper=0.1, slowdown=1.1): def __init__(self, cpus=None, policy=None, damper=1000000000,
slowdown=1.1):
self.cpus = cpus self.cpus = cpus
self.policy = policy self.policy = policy
self.damper = damper self.damper = damper
...@@ -56,7 +57,7 @@ class PowerPolicyManager: ...@@ -56,7 +57,7 @@ class PowerPolicyManager:
# Book-keeping # Book-keeping
self.damperexits = 0 self.damperexits = 0
self.slowdownexits = 0 self.slowdownexits = 0
self.prevtolalphasetime = 10000.0 # Any large value self.prevtolalphasetime = dict.fromkeys(self.cpus, None)
def run_policy(self, phase_contexts): def run_policy(self, phase_contexts):
# Run only if policy is specified # Run only if policy is specified
...@@ -69,17 +70,19 @@ class PowerPolicyManager: ...@@ -69,17 +70,19 @@ class PowerPolicyManager:
# Select and invoke appropriate power policy # Select and invoke appropriate power policy
# TODO: Need to add a better policy selection logic in addition # TODO: Need to add a better policy selection logic in addition
# to user specified using manifest file # to user specified using manifest file
ret, value = self.invoke_policy(id, **phase_contexts[id]) ret, value = self.execute(id, **phase_contexts[id])
if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']: if self.policy == 'DDCM':
self.dclevel[id] = value if ret == 'DDCM':
self.dclevel[id] = value
# Incase of slowdown experienced by even process, reset all
# cpus
if ret == 'SLOWDOWN':
self.reset_all()
phase_contexts[id]['set'] = False phase_contexts[id]['set'] = False
def invoke_policy(self, cpu, **kwargs): def execute(self, cpu, **kwargs):
# Calculate time spent in computation, barrier in current phase along computetime = kwargs['computetime']
# with total phase time totalphasetime = kwargs['totaltime']
computetime = kwargs['endcompute'] - kwargs['startcompute']
barriertime = kwargs['endbarrier'] - kwargs['startbarrier']
totalphasetime = computetime + barriertime
# If the current phase length is less than the damper value, then do # If the current phase length is less than the damper value, then do
# not use policy. This avoids use of policy during startup operation # not use policy. This avoids use of policy during startup operation
...@@ -88,23 +91,25 @@ class PowerPolicyManager: ...@@ -88,23 +91,25 @@ class PowerPolicyManager:
self.damperexits += 1 self.damperexits += 1
return 'DAMPER', -1 return 'DAMPER', -1
# Reset value for next phase
self.prevtolalphasetime = totalphasetime
# If the current phase has slowed down beyond the threshold set, then # If the current phase has slowed down beyond the threshold set, then
# reset power. This helps correct error in policy application or acts # reset power. This helps correct error in policy application or acts
# as a rudimentary way to detect phase change # as a rudimentary way to detect phase change
if(self.dclevel[cpu] < self.ddcmpolicy.maxdclevel and totalphasetime > if(self.prevtolalphasetime[cpu] is not None and totalphasetime >
self.slowdown * self.prevtolalphasetime): self.slowdown * self.prevtolalphasetime[cpu]):
self.ddcmpolicy.dc.reset(cpu) self.ddcmpolicy.dc.reset(cpu)
newdclevel = self.ddcmpolicy.maxdclevel newdclevel = self.ddcmpolicy.maxdclevel
# Reset value for next phase
self.prevtolalphasetime[cpu] = totalphasetime
return 'SLOWDOWN', newdclevel return 'SLOWDOWN', newdclevel
# Invoke the correct policy based on operation module # Invoke the correct policy based on operation module
if self.policy == "DDCM": if self.policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu], newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu],
computetime, totalphasetime) computetime, totalphasetime)
# Reset value for next phase
self.prevtolalphasetime[cpu] = totalphasetime
# TODO: Add DVFS and Combined policies # TODO: Add DVFS and Combined policies
...@@ -112,18 +117,21 @@ class PowerPolicyManager: ...@@ -112,18 +117,21 @@ class PowerPolicyManager:
def print_policy_stats(self, resetflag=False): def print_policy_stats(self, resetflag=False):
# Get statistics for policy run # Get statistics for policy run
print('PowerPolicyManager: DamperExits %d SlowdownExits %d' % ppstats = dict()
(self.damperexits, self.slowdownexits)) ppstats['PowerPolicyDamperExits'] = self.damperexits
self.ddcmpolicy.print_stats(resetflag) ppstats['PowerPolicySlowdownExits'] = self.slowdownexits
ppstats.update(self.ddcmpolicy.print_stats(resetflag))
if resetflag: if resetflag:
self.damperexits = 0 self.damperexits = 0
self.slowdownexits = 0 self.slowdownexits = 0
return ppstats
def power_reset(self, cpu): def power_reset(self, cpu):
# Reset all power controls # Reset power control
self.ddcmpolicy.dc.reset(cpu) self.ddcmpolicy.dc.reset(cpu)
# Reset value
self.dclevel[cpu] = self.maxdclevel self.dclevel[cpu] = self.maxdclevel
def power_check(self, cpu): def power_check(self, cpu):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment