Commit 1c6ac937 authored by Swann Perarnau's avatar Swann Perarnau

Merge branch 'wip/aggregative-downstream_for_merge' into 'master'

Aggregative Downstream API integration and Support for Process/Task pinning

See merge request !47
parents 95188157 f3c53106
Pipeline #4962 failed with stages in 6 minutes and 36 seconds
......@@ -14,8 +14,8 @@
{
"name": "argo/container",
"value": {
"cpus": "4",
"mems": "1"
"cpus": "24",
"mems": "2"
}
},
{
......@@ -30,9 +30,15 @@
"enabled": "1",
"profile": "1",
"policy": "NONE",
"damper": "0.1",
"damper": "1e9",
"slowdown": "1.1"
}
},
{
"name": "argo/hwbind",
"value": {
"enabled": "1"
}
}
]
}
......
......@@ -191,6 +191,37 @@ class Power(SpecField):
logger.error("Invalid value for power policy slowdown: %s",
self.policy)
return False
if self.damper < 0.0:
logger.error("Invalid value for power policy damper: %s",
self.damper)
return False
if self.slowdown < 1.0:
logger.error("Invalid value for power policy slowdown: %s",
self.slowdown)
return False
return True
class HwBind(SpecField):
"""Hardware bindings for a container."""
fields = {"enabled": spec(unicode, False),
}
def __init__(self):
"""Create empty hardware bindings settings object."""
pass
def load(self, data):
"""Load hardware bindings settings."""
ret = super(HwBind, self).load(data)
if not ret:
return ret
if self.enabled not in ["0", "False", "1", "True"]:
logger.error("Invalid value for hardware bindings enabled: %s",
self.enabled)
return False
return True
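# For illustration: with the manifest fragment shown earlier ("argo/hwbind"
# with "enabled": "1"), load() accepts the value; anything outside
# "0"/"False"/"1"/"True" is rejected.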
......@@ -202,6 +233,7 @@ class IsolatorList(SpecField):
"argo/container": spec(Container, True),
"argo/perfwrapper": spec(PerfWrapper, False),
"argo/power": spec(Power, False),
"argo/hwbind": spec(HwBind, False),
}
def __init__(self):
......
......@@ -64,9 +64,10 @@ class Application(object):
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = msg.cpu
self.phase_contexts[id] = {k: getattr(msg, k) for k in ('startcompute',
'endcompute', 'startbarrier', 'endbarrier')}
id = int(msg.cpu)
self.phase_contexts[id] = {k: getattr(msg, k) for k in
('aggregation', 'computetime',
'totaltime')}
self.phase_contexts[id]['set'] = True
......@@ -85,8 +86,7 @@ class ApplicationManager(object):
progress = 0
threads = False
phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier']
phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
if container.power['policy']:
ids = container.resources['cpus']
for id in ids:
......
......@@ -4,11 +4,12 @@ from aci import ImageManifest
from collections import namedtuple
import logging
from subprograms import ChrtClient, NodeOSClient, resources
import uuid
import operator
logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
'power', 'processes', 'clientids'])
'power', 'processes', 'clientids',
'hwbindings'])
class ContainerManager(object):
......@@ -27,6 +28,7 @@ class ContainerManager(object):
self.containers = dict()
self.pids = dict()
self.resourcemanager = rm
self.hwloc = rm.hwloc
self.chrt = ChrtClient()
self.pmpi_lib = pmpi_lib
......@@ -35,20 +37,22 @@ class ContainerManager(object):
Returns the pid of the container or a negative number for errors."""
container = None
container_name = None
containerexistsflag = False
processes = None
clientids = None
pp = None
hwbindings = None
bind_index = 0
manifestfile = request['manifest']
command = request['file']
args = request['args']
environ = request['environ']
ucontainername = request['uuid']
container_name = request['uuid']
logger.info("run: manifest file: %s", manifestfile)
logger.info("run: command: %s", command)
logger.info("run: args: %r", args)
logger.info("run: ucontainername: %s", ucontainername)
logger.info("run: container name: %s", container_name)
# TODO: Application library to load must be set during configuration
apppreloadlibrary = self.pmpi_lib
......@@ -64,26 +68,23 @@ class ContainerManager(object):
else:
argv = []
# Check if user-specified container exists else create it
if ucontainername in self.containers:
container_name = ucontainername
container = self.containers[ucontainername]
# Check if container exists else create it
if container_name in self.containers:
container = self.containers[container_name]
containerexistsflag = True
processes = container.processes
clientids = container.clientids
hwbindings = container.hwbindings
bind_index = len(processes)
else:
processes = dict()
clientids = dict()
if ucontainername:
container_name = ucontainername
else:
# If no user-specified container name create one
container_name = str(uuid.uuid4())
hwbindings = dict()
# ask the resource manager for resources
req = resources(int(manifest.app.isolators.container.cpus.value),
int(manifest.app.isolators.container.mems.value))
ncpus = int(manifest.app.isolators.container.cpus.value)
nmems = int(manifest.app.isolators.container.mems.value)
req = resources(ncpus, nmems)
alloc = self.resourcemanager.schedule(container_name, req)
logger.info("run: allocation: %r", alloc)
......@@ -123,7 +124,16 @@ class ContainerManager(object):
container_power['policy'] = pp.policy
container_power['damper'] = pp.damper
container_power['slowdown'] = pp.slowdown
environ['LD_PRELOAD'] = apppreloadlibrary
# Compute hardware bindings
if hasattr(manifest.app.isolators, 'hwbind'):
manifest_hwbind = manifest.app.isolators.hwbind
if hasattr(manifest_hwbind, 'enabled'):
if manifest_hwbind.enabled in ["1", "True"]:
hwbindings['enabled'] = True
hwbindings['distrib'] = sorted(self.hwloc.distrib(
ncpus, alloc), key=operator.
attrgetter('cpus'))
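# hwloc-distrib spreads ncpus single-cpu bindings over the allocated
# resources; sorting by the 'cpus' attribute keeps the bind_index ->
# binding mapping deterministic across successive process launches.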
# build context to execute
# environ['PATH'] = ("/usr/local/sbin:"
......@@ -132,6 +142,25 @@ class ContainerManager(object):
environ['PERF'] = self.linuxperf
environ['AC_APP_NAME'] = manifest.name
environ['AC_METADATA_URL'] = "localhost"
if (containerexistsflag and container.power['policy'] is not None) or (
pp is not None and pp.policy != "NONE"):
environ['LD_PRELOAD'] = apppreloadlibrary
environ['NRM_TRANSMIT'] = '1'
if containerexistsflag:
environ['NRM_DAMPER'] = container.power['damper']
else:
environ['NRM_DAMPER'] = pp.damper
# Use hwloc-bind to launch each process in the container by prepending
# it as an argument to the command line, if enabled in the manifest.
# The hardware binding computed using hwloc-distrib is used here.
# --single (optional hwloc-bind flag, kept commented out below)
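# For illustration (hypothetical binding): if hwloc-distrib assigned
# cpus=[4] and mems=[0] to the process at bind_index 0, the launched
# command line becomes:
#   hwloc-bind core:4 --membind numa:0 <command> <args...>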
if bool(hwbindings) and hwbindings['enabled']:
argv.append('hwloc-bind')
# argv.append('--single')
argv.append('core:'+str(hwbindings['distrib'][bind_index].cpus[0]))
argv.append('--membind')
argv.append('numa:'+str(hwbindings['distrib'][bind_index].mems[0]))
argv.append(command)
argv.extend(args)
......@@ -149,7 +178,7 @@ class ContainerManager(object):
else:
container = Container(container_name, manifest,
container_resources, container_power,
processes, clientids)
processes, clientids, hwbindings)
self.pids[process.pid] = container
self.containers[container_name] = container
logger.info("Container %s created and running : %r",
......
......@@ -124,19 +124,31 @@ class Controller(object):
"""Update tracking across the board to reflect the last action."""
actuator.update(action)
def run_policy_container(self, container, application):
"""Run policies on a container."""
ids = container.resources['cpus']
pcs = application.phase_contexts
# Run policy only if all phase contexts have been received
if not filter(lambda i: not pcs[i]['set'], ids):
# Only run the policy if all phase contexts aggregate the same
# number of phases
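# e.g. if every cpu reports aggregation == 10, each phase context
# summarizes 10 phases and the policy can run; a mismatch means some
# downstream messages are still pending.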
aggs = [pcs[i]['aggregation'] for i in ids]
if aggs.count(aggs[0]) == len(aggs):
container.power['manager'].run_policy(pcs)
if filter(lambda i: pcs[i]['set'], ids):
logger.debug("Phase context not reset %r", application)
else:
container.power['manager'].reset_all()
for i in ids:
pcs[i]['set'] = False
def run_policy(self, containers):
"""Run policies on containers with policies set."""
for container in containers:
pp = containers[container].power
if pp['policy']:
p = containers[container].power
if p['policy']:
apps = self.actuators[0].application_manager.applications
if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid
== container)
ids = containers[container].resources['cpus']
# Run policy only if all phase contexts have been received
if not filter(lambda i: not app.phase_contexts[i]['set'],
ids):
pp['manager'].run_policy(app.phase_contexts)
if filter(lambda i: app.phase_contexts[i]['set'], ids):
logger.debug("Phase context not reset %r", app)
self.run_policy_container(containers[container], app)
......@@ -142,12 +142,10 @@ class rapl_reader:
ret[k] = dvals
return ret
def diffenergy(self,e1,e2,shortenFlag=False): # e1 is prev and e2 is not
def diffenergy(self,e1,e2): # e1 is prev and e2 is not
ret = {}
ret['time'] = e2['time'] - e1['time']
for k in self.max_energy_range_uj_d:
if shortenFlag:
k = self.shortenkey(k)
if e2[k]>=e1[k]:
ret[k] = e2[k] - e1[k]
else:
......@@ -157,7 +155,7 @@ class rapl_reader:
# calculate the average power from two energy values
# e1 and e2 are the value returned from readenergy()
# e1 should be sampled before e2
def calcpower(self,e1,e2,shortenFlag=False):
def calcpower(self,e1,e2):
ret = {}
delta = e2['time'] - e1['time'] # assume 'time' never wraps around
ret['delta'] = delta
......@@ -168,8 +166,6 @@ class rapl_reader:
return ret
for k in self.max_energy_range_uj_d:
if shortenFlag:
k = self.shortenkey(k)
if e2[k]>=e1[k]:
ret[k] = e2[k] - e1[k]
else:
......
......@@ -7,6 +7,7 @@ from powerpolicy import PowerPolicyManager
from functools import partial
import logging
import os
import socket
from resources import ResourceManager
from sensor import SensorManager
import signal
......@@ -59,9 +60,13 @@ class Daemon(object):
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
c = self.container_manager.containers[app.cid]
if bool(self.container_manager.containers):
cid = app.container_uuid
c = self.container_manager.containers[cid]
if c.power['policy']:
app.update_phase_context(msg)
# Run container policy
self.controller.run_policy_container(c, app)
elif msg.type == 'application_exit':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
......@@ -106,7 +111,7 @@ class Daemon(object):
'type': 'container_start',
'container_uuid': container_uuid,
'errno': 0 if container else -1,
'power': container.power['policy'] or dict()
'power': container.power['policy'] or str(None)
}
self.upstream_pub_server.sendmsg(
PUB_MSG['container_start'](**update))
......@@ -178,8 +183,8 @@ class Daemon(object):
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
# Call policy only if there are containers
if self.container_manager.containers:
self.controller.run_policy(self.container_manager.containers)
# if self.container_manager.containers:
# self.controller.run_policy(self.container_manager.containers)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -230,20 +235,25 @@ class Daemon(object):
'container_uuid': container.uuid,
'profile_data': dict(),
}
pp = container.power
if pp['policy']:
pp['manager'].reset_all()
if pp['profile']:
e = pp['profile']['end']
p = container.power
if p['policy']:
p['manager'].reset_all()
if p['profile']:
e = p['profile']['end']
self.machine_info = self.sensor_manager.do_update()
e = self.machine_info['energy']['energy']
e['time'] = self.machine_info['time']
s = pp['profile']['start']
s = p['profile']['start']
# Calculate difference between the values
diff = self.sensor_manager.calc_difference(s, e)
# Get final package temperature
temp = self.machine_info['temperature']
diff['temp'] = map(lambda k: temp[k]['pkg'], temp)
diff['policy'] = p['policy']
if p['policy']:
diff['damper'] = float(p['damper'])/1000000000
diff['slowdown'] = p['slowdown']
diff['nodename'] = self.sensor_manager.nodename
logger.info("Container %r profile data: %r",
container.uuid, diff)
msg['profile_data'] = diff
......
......@@ -24,6 +24,7 @@
p. 3. ACM, 2015.
"""
from __future__ import division
import math
import coolr
import coolr.dutycycle
......@@ -43,12 +44,15 @@ class DDCMPolicy:
self.dc = coolr.dutycycle.DutyCycle()
def print_stats(self, resetflag=False):
print('DDCM Policy: DDCMPolicySets %d DDCMPolicyResets %d' %
(self.ddcmpolicyset, self.ddcmpolicyreset))
ddcmstats = dict()
ddcmstats['DDCMPolicySets'] = self.ddcmpolicyset
ddcmstats['DDCMPolicyResets'] = self.ddcmpolicyreset
if resetflag:
self.ddcmpolicyset = 0
self.ddcmpolicyreset = 0
return ddcmstats
def execute(self, cpu, currentdclevel, computetime, totalphasetime):
# Compute work done by cpu during current phase
work = computetime / totalphasetime
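# work is the fraction of the phase spent in computation, e.g. 0.8 when
# computetime is 8e8 ns out of a 1e9 ns phase.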
......@@ -79,10 +83,6 @@ class DDCMPolicy:
# If reduction required is 0
newdclevel = currentdclevel
# Check if new dc level computed is not less than whats permissible
if newdclevel < self.mindclevel:
newdclevel = self.maxdclevel
# If there was a slowdown in the last phase, then increase the duty
# cycle level corresponding to the slowdown
else:
......@@ -93,11 +93,10 @@ class DDCMPolicy:
newdclevel = currentdclevel + dcincrease
# Check if new dc level computed is not greater than whats
# permissible
if newdclevel > self.maxdclevel:
# Check if new dc level computed is within permissible range, else
# reset
if newdclevel < self.mindclevel or newdclevel > self.maxdclevel:
newdclevel = self.maxdclevel
# Set the duty cycle of cpu to the new value computed
self.dc.set(cpu, newdclevel)
......
......@@ -43,7 +43,7 @@ MSGFORMATS['up_rpc_rep'] = {'list': {'payload': list},
MSGFORMATS['up_pub'] = {'power': {'total': float, 'limit': float},
'container_start': {'container_uuid': basestring,
'errno': int,
'power': dict},
'power': basestring},
'container_exit': {'container_uuid': basestring,
'profile_data': dict},
'performance': {'container_uuid': basestring,
......@@ -63,10 +63,10 @@ MSGFORMATS['down_event'] = {'application_start':
'application_uuid': basestring,
'container_uuid': basestring},
'phase_context': {'cpu': int,
'startcompute': int,
'endcompute': int,
'startbarrier': int,
'endbarrier': int},
'aggregation': int,
'computetime': int,
'totaltime': int,
'application_uuid': basestring},
}
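# For illustration (hypothetical values), a phase_context event in the new
# aggregative format carries:
#   {'type': 'phase_context', 'cpu': 3, 'aggregation': 10,
#    'computetime': 800000000, 'totaltime': 1000000000,
#    'application_uuid': '...'}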
# Mirror of the message formats, using namedtuples as the actual transport
......
......@@ -37,7 +37,8 @@ logger = logging.getLogger('nrm')
class PowerPolicyManager:
""" Used for power policy application """
def __init__(self, cpus=None, policy=None, damper=0.1, slowdown=1.1):
def __init__(self, cpus=None, policy=None, damper=1000000000,
slowdown=1.1):
self.cpus = cpus
self.policy = policy
self.damper = damper
......@@ -56,7 +57,7 @@ class PowerPolicyManager:
# Book-keeping
self.damperexits = 0
self.slowdownexits = 0
self.prevtolalphasetime = 10000.0 # Any large value
self.prevtolalphasetime = dict.fromkeys(self.cpus, None)
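# Track the previous total phase time per cpu; None marks cpus that have
# not completed a phase yet (see the slowdown check in execute()).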
def run_policy(self, phase_contexts):
# Run only if policy is specified
......@@ -69,17 +70,19 @@ class PowerPolicyManager:
# Select and invoke appropriate power policy
# TODO: Need to add better policy selection logic in addition to the
# user-specified policy from the manifest file
ret, value = self.invoke_policy(id, **phase_contexts[id])
if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
ret, value = self.execute(id, **phase_contexts[id])
if self.policy == 'DDCM':
if ret == 'DDCM':
self.dclevel[id] = value
# In case a slowdown is experienced by even one process, reset all
# cpus
if ret == 'SLOWDOWN':
self.reset_all()
phase_contexts[id]['set'] = False
def invoke_policy(self, cpu, **kwargs):
# Calculate time spent in computation, barrier in current phase along
# with total phase time
computetime = kwargs['endcompute'] - kwargs['startcompute']
barriertime = kwargs['endbarrier'] - kwargs['startbarrier']
totalphasetime = computetime + barriertime
def execute(self, cpu, **kwargs):
computetime = kwargs['computetime']
totalphasetime = kwargs['totaltime']
# If the current phase length is less than the damper value, then do
# not use policy. This avoids use of policy during startup operation
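# Note: the damper is expressed in nanoseconds (manifest value "1e9",
# default 1000000000, i.e. one second), so shorter phases skip the policy.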
......@@ -88,23 +91,25 @@ class PowerPolicyManager:
self.damperexits += 1
return 'DAMPER', -1
# Reset value for next phase
self.prevtolalphasetime = totalphasetime
# If the current phase has slowed down beyond the threshold set, then
# reset power. This helps correct errors in policy application or acts
# as a rudimentary way to detect a phase change
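# e.g. with slowdown=1.1, a phase taking more than 10% longer than the
# previous phase on this cpu triggers a reset to maxdclevel.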
if(self.dclevel[cpu] < self.ddcmpolicy.maxdclevel and totalphasetime >
self.slowdown * self.prevtolalphasetime):
if(self.prevtolalphasetime[cpu] is not None and totalphasetime >
self.slowdown * self.prevtolalphasetime[cpu]):
self.ddcmpolicy.dc.reset(cpu)
newdclevel = self.ddcmpolicy.maxdclevel
# Reset value for next phase
self.prevtolalphasetime[cpu] = totalphasetime
return 'SLOWDOWN', newdclevel
# Invoke the correct policy based on operation module
if self.policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu],
computetime, totalphasetime)
# Reset value for next phase
self.prevtolalphasetime[cpu] = totalphasetime
# TODO: Add DVFS and Combined policies
......@@ -112,18 +117,21 @@ class PowerPolicyManager:
def print_policy_stats(self, resetflag=False):
# Get statistics for policy run
print('PowerPolicyManager: DamperExits %d SlowdownExits %d' %
(self.damperexits, self.slowdownexits))
self.ddcmpolicy.print_stats(resetflag)
ppstats = dict()
ppstats['PowerPolicyDamperExits'] = self.damperexits
ppstats['PowerPolicySlowdownExits'] = self.slowdownexits
ppstats.update(self.ddcmpolicy.print_stats(resetflag))
if resetflag:
self.damperexits = 0
self.slowdownexits = 0
return ppstats
def power_reset(self, cpu):
# Reset all power controls
# Reset power control
self.ddcmpolicy.dc.reset(cpu)
# Reset value
self.dclevel[cpu] = self.maxdclevel
def power_check(self, cpu):
......
......@@ -28,11 +28,11 @@ class ResourceManager(object):
# - cpus are exclusive
# - memories exclusive if more than one left
if len(self.available.cpus) >= request.cpus:
retcpus = self.available.cpus[:request.cpus]
retcpus = sorted(self.available.cpus)[:request.cpus]
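# Sorting hands out the lowest-numbered cpus first and keeps the
# allocation deterministic.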
else:
retcpus = []
if len(self.available.mems) > 1:
retmems = self.available.mems[:request.mems]
retmems = sorted(self.available.mems)[:request.mems]
else:
retmems = self.available.mems
ret = resources(retcpus, retmems)
......
......@@ -20,6 +20,7 @@ class SensorManager:
def __init__(self):
self.nodeconfig = coolr.clr_nodeinfo.nodeconfig()
self.nodename = self.nodeconfig.nodename
self.cputopology = coolr.clr_nodeinfo.cputopology()
self.coretemp = coolr.clr_hwmon.coretemp_reader()
self.rapl = coolr.clr_rapl.rapl_reader()
......@@ -47,8 +48,15 @@ class SensorManager:
def calc_difference(self, start, end):
diff = dict()
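# Expand the shortened RAPL keys ('p0' -> 'package-0', ...) so that
# diffenergy/calcpower, which no longer take a shortenFlag, match the full
# key names; the 'time' entry is left untouched.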
for k in start.keys():
if k not in ['time']:
start[k.replace('p', 'package-')] = start[k]
start.pop(k)
end[k.replace('p', 'package-')] = end[k]
end.pop(k)
# Calculate energy difference
diff['energy'] = self.rapl.diffenergy(start, end, shortenFlag=True)
diff['energy'] = self.rapl.diffenergy(start, end)
# Update time elapsed
diff['time'] = diff['energy']['time']
# Remove 'time' field returned by function
......@@ -58,7 +66,7 @@ class SensorManager:
diff['energy']}
# Calculate power difference
diff['power'] = self.rapl.calcpower(start, end, shortenFlag=True)
diff['power'] = self.rapl.calcpower(start, end)
# Remove 'delta' field returned by function
diff['power'].pop('delta')
......
......@@ -32,7 +32,7 @@ def list2bitmask(l):
"""Convert a list into a bitmask."""
m = 0
for e in l:
m |= 1 << e
m |= 1 << int(e)
return hex(m)
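# Example: list2bitmask(['0', '2', '3']) -> '0xd' (bits 0, 2 and 3 set);
# the int() cast accepts cpu ids passed as strings.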
......