From a501c976f487da91f9dc49d2101b219788d64ee2 Mon Sep 17 00:00:00 2001
From: Sridutt Bhalachandra
Date: Wed, 25 Jul 2018 08:57:56 -0500
Subject: [PATCH] [feature] Aggregative Downstream API integration

Adds support for aggregating the phase context information of an
application. The damper value (specified in nanoseconds in the manifest
file) sets the minimum phase length for which phase context information
is sent to the NRM (implemented in the 'libnrm' repo [See Issue 2]).
This limits the number of messages sent to the NRM.

See Issue #13
---
 nrm/aci.py          |  8 ++++++++
 nrm/applications.py |  9 ++++----
 nrm/controller.py   | 30 +++++++++++++++++++--------
 nrm/daemon.py       | 16 +++++++++------
 nrm/ddcmpolicy.py   | 21 +++++++++----------
 nrm/powerpolicy.py  | 50 ++++++++++++++++++++++++++------------------
 6 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/nrm/aci.py b/nrm/aci.py
index dca2af8..300e34b 100644
--- a/nrm/aci.py
+++ b/nrm/aci.py
@@ -191,6 +191,14 @@ class Power(SpecField):
             logger.error("Invalid value for power policy slowdown: %s",
                          self.policy)
             return False
+        if self.damper < 0.0:
+            logger.error("Invalid value for power policy damper: %s",
+                         self.damper)
+            return False
+        if self.slowdown < 1.0:
+            logger.error("Invalid value for power policy slowdown: %s",
+                         self.slowdown)
+            return False
         return True
 
 
diff --git a/nrm/applications.py b/nrm/applications.py
index efc1a76..19f72ce 100644
--- a/nrm/applications.py
+++ b/nrm/applications.py
@@ -64,9 +64,9 @@ class Application(object):
 
     def update_phase_context(self, msg):
         """Update the phase contextual information."""
-        id = msg.cpu
-        self.phase_contexts[id] = {k: getattr(msg, k) for k in ('startcompute',
-                                   'endcompute', 'startbarrier', 'endbarrier')}
+        id = int(msg['cpu'])
+        self.phase_contexts[id] = {k: int(msg[k]) for k in ('aggregation',
+                                   'computetime', 'totaltime')}
         self.phase_contexts[id]['set'] = True
 
 
@@ -85,8 +85,7 @@ class ApplicationManager(object):
         progress = 0
         threads = False
         phase_contexts = dict()
-        phase_context_keys = ['set', 'startcompute', 'endcompute',
-                              'startbarrier', 'endbarrier']
+        phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
         if container.power['policy']:
             ids = container.resources['cpus']
             for id in ids:
diff --git a/nrm/controller.py b/nrm/controller.py
index 08ee5db..3b516e0 100644
--- a/nrm/controller.py
+++ b/nrm/controller.py
@@ -124,19 +124,31 @@ class Controller(object):
         """Update tracking across the board to reflect the last action."""
         actuator.update(action)
 
+    def run_policy_container(self, container, application):
+        """Run policies on a container."""
+        ids = container.resources['cpus']
+        pcs = application.phase_contexts
+        # Run policy only if all phase contexts have been received
+        if not filter(lambda i: not pcs[i]['set'], ids):
+            # Only run the policy if all phase contexts aggregate the
+            # same number of phases
+            aggs = [pcs[i]['aggregation'] for i in ids]
+            if aggs.count(aggs[0]) == len(aggs):
+                container.power['manager'].run_policy(pcs)
+                if filter(lambda i: pcs[i]['set'], ids):
+                    logger.debug("Phase context not reset %r", application)
+            else:
+                container.power['manager'].reset_all()
+                for i in ids:
+                    pcs[i]['set'] = False
+
     def run_policy(self, containers):
         """Run policies on containers with policies set."""
         for container in containers:
-            pp = containers[container].power
-            if pp['policy']:
+            p = containers[container].power
+            if p['policy']:
                 apps = self.actuators[0].application_manager.applications
                 if apps:
                     app = next(apps[a] for a in apps
                                if apps[a].container_uuid == container)
-                    ids = containers[container].resources['cpus']
-                    # Run policy only if all phase contexts have been received
-                    if not filter(lambda i: not app.phase_contexts[i]['set'],
-                                  ids):
-                        pp['manager'].run_policy(app.phase_contexts)
-                        if filter(lambda i: app.phase_contexts[i]['set'], ids):
-                            logger.debug("Phase context not reset %r", app)
+                    self.run_policy_container(containers[container], app)
diff --git a/nrm/daemon.py b/nrm/daemon.py
index e0cc0bf..5d3e731 100644
--- a/nrm/daemon.py
+++ b/nrm/daemon.py
@@ -230,20 +230,24 @@ class Daemon(object):
                'container_uuid': container.uuid,
                'profile_data': dict(),
                }
-        pp = container.power
-        if pp['policy']:
-            pp['manager'].reset_all()
-        if pp['profile']:
-            e = pp['profile']['end']
+        p = container.power
+        if p['policy']:
+            p['manager'].reset_all()
+        if p['profile']:
+            e = p['profile']['end']
             self.machine_info = self.sensor_manager.do_update()
             e = self.machine_info['energy']['energy']
             e['time'] = self.machine_info['time']
-            s = pp['profile']['start']
+            s = p['profile']['start']
             # Calculate difference between the values
             diff = self.sensor_manager.calc_difference(s, e)
             # Get final package temperature
             temp = self.machine_info['temperature']
             diff['temp'] = map(lambda k: temp[k]['pkg'], temp)
+            diff['policy'] = p['policy']
+            if p['policy']:
+                diff['damper'] = float(p['damper'])/1000000000  # ns to s
+                diff['slowdown'] = p['slowdown']
             logger.info("Container %r profile data: %r",
                         container.uuid, diff)
             msg['profile_data'] = diff
diff --git a/nrm/ddcmpolicy.py b/nrm/ddcmpolicy.py
index 62bdc1f..b7d64cc 100644
--- a/nrm/ddcmpolicy.py
+++ b/nrm/ddcmpolicy.py
@@ -24,6 +24,7 @@ p. 3. ACM, 2015.
 
 """
 
+from __future__ import division
 import math
 import coolr
 import coolr.dutycycle
@@ -43,12 +44,15 @@ class DDCMPolicy:
         self.dc = coolr.dutycycle.DutyCycle()
 
     def print_stats(self, resetflag=False):
-        print('DDCM Policy: DDCMPolicySets %d DDCMPolicyResets %d' %
-              (self.ddcmpolicyset, self.ddcmpolicyreset))
+        ddcmstats = dict()
+        ddcmstats['DDCMPolicySets'] = self.ddcmpolicyset
+        ddcmstats['DDCMPolicyResets'] = self.ddcmpolicyreset
         if resetflag:
             self.ddcmpolicyset = 0
             self.ddcmpolicyreset = 0
 
+        return ddcmstats
+
     def execute(self, cpu, currentdclevel, computetime, totalphasetime):
         # Compute work done by cpu during current phase
         work = computetime / totalphasetime
@@ -79,10 +83,6 @@ class DDCMPolicy:
             # If reduction required is 0
             newdclevel = currentdclevel
 
-            # Check if new dc level computed is not less than whats permissible
-            if newdclevel < self.mindclevel:
-                newdclevel = self.maxdclevel
-
         # If there was a slowdown in the last phase, then increase the duty
         # cycle level corresponding to the slowdown
         else:
@@ -93,11 +93,10 @@ class DDCMPolicy:
 
             newdclevel = currentdclevel + dcincrease
 
-            # Check if new dc level computed is not greater than whats
-            # permissible
-            if newdclevel > self.maxdclevel:
-                newdclevel = self.maxdclevel
-
+        # Check if the new dc level is within the permissible range;
+        # reset it otherwise
+        if newdclevel < self.mindclevel or newdclevel > self.maxdclevel:
+            newdclevel = self.maxdclevel
         # Set the duty cycle of cpu to the new value computed
         self.dc.set(cpu, newdclevel)
 
diff --git a/nrm/powerpolicy.py b/nrm/powerpolicy.py
index f9ffc65..3884eda 100644
--- a/nrm/powerpolicy.py
+++ b/nrm/powerpolicy.py
@@ -37,7 +37,8 @@ logger = logging.getLogger('nrm')
 
 class PowerPolicyManager:
     """ Used for power policy application """
-    def __init__(self, cpus=None, policy=None, damper=0.1, slowdown=1.1):
+    def __init__(self, cpus=None, policy=None, damper=1000000000,
+                 slowdown=1.1):
         self.cpus = cpus
         self.policy = policy
         self.damper = damper
@@ -56,7 +57,7 @@ class PowerPolicyManager:
         # Book-keeping
         self.damperexits = 0
         self.slowdownexits = 0
-        self.prevtolalphasetime = 10000.0  # Any large value
+        self.prevtolalphasetime = dict.fromkeys(self.cpus, None)
 
     def run_policy(self, phase_contexts):
         # Run only if policy is specified
@@ -69,17 +70,19 @@ class PowerPolicyManager:
             # Select and invoke appropriate power policy
             # TODO: Need to add a better policy selection logic in addition
             # to user specified using manifest file
-            ret, value = self.invoke_policy(id, **phase_contexts[id])
-            if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
-                self.dclevel[id] = value
+            ret, value = self.execute(id, **phase_contexts[id])
+            if self.policy == 'DDCM':
+                if ret == 'DDCM':
+                    self.dclevel[id] = value
+                # In case of a slowdown experienced by even one process,
+                # reset all cpus
+                if ret == 'SLOWDOWN':
+                    self.reset_all()
             phase_contexts[id]['set'] = False
 
-    def invoke_policy(self, cpu, **kwargs):
-        # Calculate time spent in computation, barrier in current phase along
-        # with total phase time
-        computetime = kwargs['endcompute'] - kwargs['startcompute']
-        barriertime = kwargs['endbarrier'] - kwargs['startbarrier']
-        totalphasetime = computetime + barriertime
+    def execute(self, cpu, **kwargs):
+        computetime = kwargs['computetime']
+        totalphasetime = kwargs['totaltime']
 
         # If the current phase length is less than the damper value, then do
         # not use policy. This avoids use of policy during startup operation
@@ -88,23 +91,25 @@ class PowerPolicyManager:
             self.damperexits += 1
             return 'DAMPER', -1
 
-        # Reset value for next phase
-        self.prevtolalphasetime = totalphasetime
-
         # If the current phase has slowed down beyond the threshold set, then
         # reset power. This helps correct error in policy application or acts
         # as a rudimentary way to detect phase change
-        if(self.dclevel[cpu] < self.ddcmpolicy.maxdclevel and totalphasetime >
-           self.slowdown * self.prevtolalphasetime):
+        if(self.prevtolalphasetime[cpu] is not None and totalphasetime >
+           self.slowdown * self.prevtolalphasetime[cpu]):
             self.ddcmpolicy.dc.reset(cpu)
             newdclevel = self.ddcmpolicy.maxdclevel
+            # Reset value for next phase
+            self.prevtolalphasetime[cpu] = totalphasetime
+
             return 'SLOWDOWN', newdclevel
 
         # Invoke the correct policy based on operation module
         if self.policy == "DDCM":
             newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu],
                                                  computetime, totalphasetime)
+            # Reset value for next phase
+            self.prevtolalphasetime[cpu] = totalphasetime
 
         # TODO: Add DVFS and Combined policies
@@ -112,18 +117,21 @@ class PowerPolicyManager:
 
     def print_policy_stats(self, resetflag=False):
         # Get statistics for policy run
-        print('PowerPolicyManager: DamperExits %d SlowdownExits %d' %
-              (self.damperexits, self.slowdownexits))
-        self.ddcmpolicy.print_stats(resetflag)
-
+        ppstats = dict()
+        ppstats['PowerPolicyDamperExits'] = self.damperexits
+        ppstats['PowerPolicySlowdownExits'] = self.slowdownexits
+        ppstats.update(self.ddcmpolicy.print_stats(resetflag))
         if resetflag:
             self.damperexits = 0
             self.slowdownexits = 0
 
+        return ppstats
+
     def power_reset(self, cpu):
-        # Reset all power controls
+        # Reset power control
         self.ddcmpolicy.dc.reset(cpu)
+        # Reset value
         self.dclevel[cpu] = self.maxdclevel
 
     def power_check(self, cpu):
-- 
2.26.2
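
Reviewer note (illustration only, not part of the patch): the sketch
below mirrors the damper/slowdown gating that PowerPolicyManager.execute
implements above, with the DDCM duty-cycle actuation stubbed out. The
names DAMPER, SLOWDOWN, prev_total, and gate_policy are hypothetical and
the thresholds are assumed values; in the patch the damper comes from
the manifest in nanoseconds and the 'DDCM' case triggers a duty-cycle
update rather than returning a label.

# Standalone Python sketch of the phase-gating logic (assumed values).
DAMPER = 0.1     # assumed minimum phase length worth acting on (seconds)
SLOWDOWN = 1.1   # assumed slowdown threshold relative to the last phase

prev_total = {}  # per-cpu total time of the previous accepted phase


def gate_policy(cpu, computetime, totaltime):
    """Return the branch the policy would take for this phase."""
    # Phases shorter than the damper are ignored; this avoids acting on
    # startup noise and limits the number of messages sent to the NRM.
    if totaltime < DAMPER:
        return 'DAMPER'

    # A phase running more than SLOWDOWN times longer than the previous
    # one suggests a phase change or a policy error: reset power.
    prev = prev_total.get(cpu)
    prev_total[cpu] = totaltime
    if prev is not None and totaltime > SLOWDOWN * prev:
        return 'SLOWDOWN'

    # Otherwise the DDCM policy would run on computetime/totaltime.
    return 'DDCM'


print(gate_policy(0, 0.04, 0.05))  # DAMPER: phase shorter than 0.1 s
print(gate_policy(0, 0.90, 1.00))  # DDCM: first full-length phase
print(gate_policy(0, 1.00, 1.30))  # SLOWDOWN: 1.30 > 1.1 * 1.00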