Commit e47c3494 authored by Valentin Reis

testing black

parent 476517f2
Pipeline #7816 failed with stages in 2 minutes and 51 seconds
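
The hunks below are the mechanical output of running the Black autoformatter over the tree. Black's default style (88-column lines) normalizes string literals to double quotes and explodes long calls and literals one element per line; an illustrative before/after pair, not taken verbatim from this commit:

    # before
    logger = logging.getLogger('nrm')
    # after Black
    logger = logging.getLogger("nrm")
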
@@ -4,9 +4,8 @@
 rec {
   nrm = pkgs.nrm;
   hack = nrm.overrideAttrs (old:{
-    buildInputs = old.buildInputs ++ [
-      pkgs.python3Packages.flake8
-      pkgs.python3Packages.autopep8
-      pkgs.python3Packages.sphinx ];
+    propagatedBuildInputs = old.propagatedBuildInputs ++ [
+      pkgs.python37Packages.flake8
+      pkgs.python37Packages.black ];
   });
 }
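
Net effect of the hunk above: the lint/format tooling moves from buildInputs to propagatedBuildInputs, the packages come from python37Packages rather than python3Packages, and black replaces autopep8 and sphinx, presumably so that black is available inside the development shell built from the hack attribute.
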
@@ -12,11 +12,11 @@
 import logging
 from schema import loadschema

-logger = logging.getLogger('nrm')
+logger = logging.getLogger("nrm")


 def has(self, f):
-    return(f in self.app.keys())
+    return f in self.app.keys()


 ImageManifest = loadschema("yml", "manifest")
@@ -12,28 +12,30 @@ from __future__ import print_function
 import logging

-logger = logging.getLogger('nrm')
+logger = logging.getLogger("nrm")


 class Application(object):
     """Information about a downstream API user."""

-    thread_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
-                        's_ask_i': {'done': 'stable', 'noop': 'max'},
-                        's_ask_d': {'done': 'stable', 'noop': 'min'},
-                        'max': {'d': 'max_ask_d'},
-                        'min': {'i': 'min_ask_i'},
-                        'max_ask_d': {'done': 'stable', 'noop': 'noop'},
-                        'min_ask_i': {'done': 'stable', 'noop': 'noop'},
-                        'noop': {}}
+    thread_fsm_table = {
+        "stable": {"i": "s_ask_i", "d": "s_ask_d"},
+        "s_ask_i": {"done": "stable", "noop": "max"},
+        "s_ask_d": {"done": "stable", "noop": "min"},
+        "max": {"d": "max_ask_d"},
+        "min": {"i": "min_ask_i"},
+        "max_ask_d": {"done": "stable", "noop": "noop"},
+        "min_ask_i": {"done": "stable", "noop": "noop"},
+        "noop": {},
+    }
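
The transition helper that consumes this table is elided from this hunk; a minimal sketch of the lookup it presumably performs (hypothetical body, not the committed one):

    def do_thread_transition(self, event):
        # Follow the edge labeled `event` out of the current state,
        # if the table defines one; otherwise stay in place.
        transitions = self.thread_fsm_table[self.thread_state]
        if event in transitions:
            self.thread_state = transitions[event]
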

     def __init__(self, uuid, container, progress, threads, phase_contexts):
         self.uuid = uuid
         self.container_uuid = container
         self.progress = progress
         self.threads = threads
-        self.thread_state = 'stable'
+        self.thread_state = "stable"
         self.phase_contexts = phase_contexts

     def do_thread_transition(self, event):
@@ -49,21 +51,21 @@ class Application(object):
         # TODO: not a real model
         if command not in self.thread_fsm_table[self.thread_state]:
             return 0.0
-        speed = float(self.progress)/float(self.threads['cur'])
-        if command == 'i':
+        speed = float(self.progress) / float(self.threads["cur"])
+        if command == "i":
             return speed
         else:
             return -speed

     def update_threads(self, msg):
         """Update the thread tracking."""
-        newth = msg['payload']
-        curth = self.threads['cur']
+        newth = msg["payload"]
+        curth = self.threads["cur"]
         if newth == curth:
-            self.do_thread_transition('noop')
+            self.do_thread_transition("noop")
         else:
-            self.do_thread_transition('done')
-            self.threads['cur'] = newth
+            self.do_thread_transition("done")
+            self.threads["cur"] = newth

     def update_progress(self, msg):
         """Update the progress tracking."""
@@ -75,10 +77,11 @@ class Application(object):
     def update_phase_context(self, msg):
         """Update the phase contextual information."""
         id = int(msg.cpu)
-        self.phase_contexts[id] = {k: getattr(msg, k) for k in
-                                   ('aggregation', 'computetime',
-                                    'totaltime')}
-        self.phase_contexts[id]['set'] = True
+        self.phase_contexts[id] = {
+            k: getattr(msg, k)
+            for k in ("aggregation", "computetime", "totaltime")
+        }
+        self.phase_contexts[id]["set"] = True


 class ApplicationManager(object):
@@ -91,21 +94,22 @@ class ApplicationManager(object):
     def register(self, msg, container):
         """Register a new downstream application."""
-        uuid = msg['application_uuid']
-        container_uuid = msg['container_uuid']
+        uuid = msg["application_uuid"]
+        container_uuid = msg["container_uuid"]
         progress = 0
         threads = False
         phase_contexts = dict()
-        phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
-        if container.power['policy']:
+        phase_context_keys = ["set", "aggregation", "computetime", "totaltime"]
+        if container.power["policy"]:
             ids = container.resources.cpus
             for id in ids:
                 phase_contexts[id] = dict.fromkeys(phase_context_keys)
-                phase_contexts[id]['set'] = False
+                phase_contexts[id]["set"] = False
         else:
             phase_contexts = None
-        self.applications[uuid] = Application(uuid, container_uuid, progress,
-                                              threads, phase_contexts)
+        self.applications[uuid] = Application(
+            uuid, container_uuid, progress, threads, phase_contexts
+        )

     def delete(self, uuid):
         """Delete an application from the register."""
@@ -17,10 +17,19 @@ import logging
 from subprograms import ChrtClient, NodeOSClient, resources, SingularityClient
 import operator

-logger = logging.getLogger('nrm')
-Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
-                                     'power', 'processes', 'clientids',
-                                     'hwbindings'])
+logger = logging.getLogger("nrm")
+Container = namedtuple(
+    "Container",
+    [
+        "uuid",
+        "manifest",
+        "resources",
+        "power",
+        "processes",
+        "clientids",
+        "hwbindings",
+    ],
+)


 class ContainerManager(object):
@@ -28,11 +37,15 @@ class ContainerManager(object):
     """Manages the creation, listing and deletion of containers, using a
     container runtime underneath."""

-    def __init__(self, container_runtime, rm,
-                 perfwrapper="nrm-perfwrapper",
-                 linuxperf="perf",
-                 pmpi_lib="/usr/lib/libnrm-pmpi.so",
-                 downstream_event_uri="ipc:///tmp/nrm-downstream-event"):
+    def __init__(
+        self,
+        container_runtime,
+        rm,
+        perfwrapper="nrm-perfwrapper",
+        linuxperf="perf",
+        pmpi_lib="/usr/lib/libnrm-pmpi.so",
+        downstream_event_uri="ipc:///tmp/nrm-downstream-event",
+    ):
         self.linuxperf = linuxperf
         self.perfwrapper = perfwrapper
         self.runtime = container_runtime
@@ -55,50 +68,61 @@ class ContainerManager(object):
             return (False, self.containers[container_name])

         # ask the resource manager for resources
-        ncpus = manifest.app['slice']['cpus']
-        nmems = manifest.app['slice']['mems']
+        ncpus = manifest.app["slice"]["cpus"]
+        nmems = manifest.app["slice"]["mems"]
         req = resources(ncpus, nmems)
         allocated = self.resourcemanager.schedule(container_name, req)
         logger.info("create: allocation: %r", allocated)

         # Container power settings
         container_power = dict()
-        container_power['profile'] = None
-        container_power['policy'] = None
-        container_power['damper'] = None
-        container_power['slowdown'] = None
-        container_power['manager'] = None
-        if manifest.is_feature_enabled('power'):
-            pp = manifest.app['power']
-            if pp['profile'] is True:
-                container_power['profile'] = dict()
-                container_power['profile']['start'] = dict()
-                container_power['profile']['end'] = dict()
-            if pp['policy'] != "NONE":
-                container_power['policy'] = pp['policy']
-                container_power['damper'] = manifest.ratelimit
-                container_power['slowdown'] = pp['slowdown']
+        container_power["profile"] = None
+        container_power["policy"] = None
+        container_power["damper"] = None
+        container_power["slowdown"] = None
+        container_power["manager"] = None
+        if manifest.is_feature_enabled("power"):
+            pp = manifest.app["power"]
+            if pp["profile"] is True:
+                container_power["profile"] = dict()
+                container_power["profile"]["start"] = dict()
+                container_power["profile"]["end"] = dict()
+            if pp["policy"] != "NONE":
+                container_power["policy"] = pp["policy"]
+                container_power["damper"] = manifest.ratelimit
+                container_power["slowdown"] = pp["slowdown"]

         # Compute hardware bindings
         hwbindings = dict()
-        if manifest.is_feature_enabled('hwbind'):
-            hwbindings['distrib'] = sorted(self.hwloc.distrib(
-                                           ncpus, allocated), key=operator.
-                                           attrgetter('cpus'))
-        return (True, Container(container_name, manifest, allocated,
-                                container_power, {}, {}, hwbindings))
+        if manifest.is_feature_enabled("hwbind"):
+            hwbindings["distrib"] = sorted(
+                self.hwloc.distrib(ncpus, allocated),
+                key=operator.attrgetter("cpus"),
+            )
+        return (
+            True,
+            Container(
+                container_name,
+                manifest,
+                allocated,
+                container_power,
+                {},
+                {},
+                hwbindings,
+            ),
+        )

     def create(self, request):
         """Create a container according to the request.

         Returns the pid of the container or a negative number for errors."""
-        manifestfile = request['manifest']
-        command = request['file']
-        args = request['args']
-        environ = request['environ']
-        container_name = request['uuid']
+        manifestfile = request["manifest"]
+        command = request["file"]
+        args = request["args"]
+        environ = request["environ"]
+        container_name = request["uuid"]
         logger.info("create: manifest file: %s", manifestfile)
         logger.info("create: command: %s", command)
         logger.info("create: args: %r", args)
@@ -109,10 +133,11 @@ class ContainerManager(object):
                 manifest = ImageManifest((load(f)))
         except Exception as e:
             logger.error("error occurred in manifest loading:")
-            raise(e)
+            raise (e)

-        creation_needed, container = self._get_container_tuple(container_name,
-                                                               manifest)
+        creation_needed, container = self._get_container_tuple(
+            container_name, manifest
+        )
         if creation_needed:
             logger.info("Creating container %s", container_name)
             self.runtime.create(container, self.downstream_event_uri)
@@ -121,32 +146,35 @@ class ContainerManager(object):
         # build context to execute
         # environ['PATH'] = ("/usr/local/sbin:"
         #                    "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
-        environ['ARGO_CONTAINER_UUID'] = container_name
-        environ['PERF'] = self.linuxperf
-        environ['AC_APP_NAME'] = manifest['name']
-        environ['AC_METADATA_URL'] = "localhost"
+        environ["ARGO_CONTAINER_UUID"] = container_name
+        environ["PERF"] = self.linuxperf
+        environ["AC_APP_NAME"] = manifest["name"]
+        environ["AC_METADATA_URL"] = "localhost"

         # power profiling uses LD_PRELOAD; we use get to ensure that it
         # doesn't crash if the policy doesn't exist.
-        if container.power.get('policy'):
-            environ['LD_PRELOAD'] = self.pmpi_lib
-            environ['NRM_TRANSMIT'] = '1'
-            environ['ARGO_NRM_RATELIMIT'] = container.power['damper']
+        if container.power.get("policy"):
+            environ["LD_PRELOAD"] = self.pmpi_lib
+            environ["NRM_TRANSMIT"] = "1"
+            environ["ARGO_NRM_RATELIMIT"] = container.power["damper"]

         # monitoring section involves libnrm
-        if manifest.is_feature_enabled('monitoring'):
-            environ['ARGO_NRM_RATELIMIT'] = \
-                manifest.app['monitoring']['ratelimit']
-        if container.power.get('policy') or \
-                manifest.is_feature_enabled('monitoring'):
-            environ['ARGO_NRM_DOWNSTREAM_EVENT_URI'] = \
-                self.downstream_event_uri
+        if manifest.is_feature_enabled("monitoring"):
+            environ["ARGO_NRM_RATELIMIT"] = manifest.app["monitoring"][
+                "ratelimit"
+            ]
+        if container.power.get("policy") or manifest.is_feature_enabled(
+            "monitoring"
+        ):
+            environ[
+                "ARGO_NRM_DOWNSTREAM_EVENT_URI"
+            ] = self.downstream_event_uri

         # build prefix to the entire command based on enabled features
         argv = []
-        if manifest.is_feature_enabled('scheduler'):
-            sched = manifest.app['scheduler']
+        if manifest.is_feature_enabled("scheduler"):
+            sched = manifest.app["scheduler"]
             argv = self.chrt.getwrappedcmd(sched)

         # Use hwloc-bind to launch each process in the container by prepending
@@ -155,22 +183,23 @@ class ContainerManager(object):
         # --single
         if container.hwbindings:
             # round robin over the cpu bindings available
-            bind_index = len(container.processes) % \
-                len(container.hwbindings['distrib'])
-            argv.append('hwloc-bind')
+            bind_index = len(container.processes) % len(
+                container.hwbindings["distrib"]
+            )
+            argv.append("hwloc-bind")
             # argv.append('--single')
-            cpumask = container.hwbindings['distrib'][bind_index].cpus[0]
-            memmask = container.hwbindings['distrib'][bind_index].mems[0]
-            logging.info('create: binding to: %s, %s', cpumask, memmask)
+            cpumask = container.hwbindings["distrib"][bind_index].cpus[0]
+            memmask = container.hwbindings["distrib"][bind_index].mems[0]
+            logging.info("create: binding to: %s, %s", cpumask, memmask)
             argv.append("core:{}".format(cpumask))
-            argv.append('--membind')
+            argv.append("--membind")
             argv.append("numa:{}".format(memmask))

         # It would've been better if argo-perf-wrapper wrapped around
         # argo-nodeos-config and not the final command -- that way it would
         # be running outside of the container. However, because
         # argo-nodeos-config is suid root, perf can't monitor it.
-        if manifest.is_feature_enabled('perfwrapper'):
+        if manifest.is_feature_enabled("perfwrapper"):
             argv.append(self.perfwrapper)

         argv.append(command)
@@ -181,10 +210,11 @@ class ContainerManager(object):
         # register the process
         container.processes[process.pid] = process
-        container.clientids[process.pid] = request['clientid']
+        container.clientids[process.pid] = request["clientid"]
         self.pids[process.pid] = container
-        logger.info("Created process %s in container %s", process.pid,
-                    container_name)
+        logger.info(
+            "Created process %s in container %s", process.pid, container_name
+        )
         return process.pid, container

     def delete(self, uuid):
@@ -208,8 +238,10 @@ class ContainerManager(object):
     def list(self):
         """List the containers in the system."""
-        return [{'uuid': c.uuid, 'pid': c.processes.keys()}
-                for c in self.containers.values()]
+        return [
+            {"uuid": c.uuid, "pid": c.processes.keys()}
+            for c in self.containers.values()
+        ]


 class ContainerRuntime(object):
@@ -273,8 +305,11 @@ class SingularityUserRuntime(ContainerRuntime):
     def create(self, container, downstream_uri):
         """Uses the container resource allocation to create a container."""
         imageinfo = container.manifest.image
-        self.client.instance_start(container.uuid, imageinfo['path'],
-                                   [downstream_uri]+imageinfo['binds'])
+        self.client.instance_start(
+            container.uuid,
+            imageinfo["path"],
+            [downstream_uri] + imageinfo["binds"],
+        )

     def execute(self, container_uuid, args, environ):
         """Launches a command in the container."""
@@ -298,10 +333,13 @@ class DummyRuntime(ContainerRuntime):
     def execute(self, container_uuid, args, environ):
         import tornado.process as process

-        return process.Subprocess(args,
-                                  stdout=process.Subprocess.STREAM,
-                                  stderr=process.Subprocess.STREAM,
-                                  env=environ)
+        return process.Subprocess(
+            args,
+            stdout=process.Subprocess.STREAM,
+            stderr=process.Subprocess.STREAM,
+            env=environ,
+        )

     def delete(self, container_uuid, kill=False):
         pass
@@ -12,7 +12,7 @@ from __future__ import print_function
 import logging

-logger = logging.getLogger('nrm')
+logger = logging.getLogger("nrm")


 class Action(object):
@@ -73,19 +73,20 @@ class PowerActuator(object):
         actions = []
         pl = self.sensor_manager.get_powerlimits()
         logger.info("power limits: %r:", pl)
-        if target == 'i':
+        if target == "i":
             for k in pl:
-                r = range(int(pl[k]['curW'])+1, int(pl[k]['maxW']))
+                r = range(int(pl[k]["curW"]) + 1, int(pl[k]["maxW"]))
                 actions.extend([Action(k, s, s - r[0]) for s in r])
-        elif target == 'd':
+        elif target == "d":
             for k in pl:
-                r = range(1, int(pl[k]['curW']))
+                r = range(1, int(pl[k]["curW"]))
                 actions.extend([Action(k, s, r[-1] - s) for s in r])
         return actions
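
To make the enumeration concrete, a hypothetical power domain pl = {"pkg0": {"curW": 100, "maxW": 110}} queried with target "i" gives r = range(101, 110), i.e. candidate limits 101 through 109 with deltas counted from the bottom of the range:

    # [Action("pkg0", 101, 0), Action("pkg0", 102, 1), ..., Action("pkg0", 109, 8)]
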

     def execute(self, action):
-        logger.info("changing power limit: %r, %r", action.command,
-                    action.delta)
+        logger.info(
+            "changing power limit: %r, %r", action.command, action.delta
+        )
         self.sensor_manager.set_powerlimit(action.target, action.command)

     def update(self, action):
@@ -102,17 +103,18 @@ class Controller(object):
     def planify(self, target, machineinfo):
         """Plan the next action for the control loop."""
         try:
-            total_power = machineinfo['energy']['power']['total']
+            total_power = machineinfo["energy"]["power"]["total"]
         except TypeError:
-            logging.error("\"machineinfo\" malformed. Can not run "
-                          "control loop.")
+            logging.error(
+                '"machineinfo" malformed. Can not run ' "control loop."
+            )
             return (None, None)

         direction = None
         if total_power < target:
-            direction = 'i'
+            direction = "i"
         elif total_power > target:
-            direction = 'd'
+            direction = "d"

         if direction:
             actions = []
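
The plan therefore reduces to a sign comparison against the power target: with, say, a target of 150 W and a measured total of 120 W (hypothetical numbers), direction becomes "i" and the controller gathers power-increasing actions from its actuators; at 170 W it would gather decreasing ones.
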
@@ -139,26 +141,29 @@ class Controller(object):
         ids = container.resources.cpus
         pcs = application.phase_contexts
         # Run policy only if all phase contexts have been received
-        if not filter(lambda i: not pcs[i]['set'], ids):
+        if not filter(lambda i: not pcs[i]["set"], ids):
             # Only run policy if all phase contexts are an
             # aggregation of same number of phases
-            aggs = [pcs[i]['aggregation'] for i in ids]
+            aggs = [pcs[i]["aggregation"] for i in ids]
             if aggs.count(aggs[0]) == len(aggs):
-                container.power['manager'].run_policy(pcs)
-                if filter(lambda i: pcs[i]['set'], ids):
+                container.power["manager"].run_policy(pcs)
+                if filter(lambda i: pcs[i]["set"], ids):
                     logger.debug("Phase context not reset %r", application)
             else:
-                container.power['manager'].reset_all()
+                container.power["manager"].reset_all()
                 for i in ids:
-                    pcs[i]['set'] = False
+                    pcs[i]["set"] = False

     def run_policy(self, containers):
         """Run policies on containers with policies set."""
         for container in containers:
             p = containers[container].power
-            if p['policy']:
+            if p["policy"]:
                 apps = self.actuators[0].application_manager.applications
                 if apps:
-                    app = next(apps[a] for a in apps if apps[a].container_uuid
-                               == container)
+                    app = next(
+                        apps[a]
+                        for a in apps
+                        if apps[a].container_uuid == container
+                    )
                     self.run_policy_container(containers[container], app)
@@ -52,19 +52,19 @@ class cpustatvals:
     def parse(self):
         self.d = {}  # clear d's contents
-        self.d['time'] = time.time()
+        self.d["time"] = time.time()
         with open(self.cpustatfn(self.cpuid)) as f:
             while True:
                 l = f.readline()
                 if not l:
                     break
                 a = l.split()
-                if a[0] in ('id', 'aperf', 'mperf', 'pstate', 'tsc'):
+                if a[0] in ("id", "aperf", "mperf", "pstate", "tsc"):
                     self.d[a[0]] = int(a[1])

     def pr(self):
-        for k in ('id', 'aperf', 'mperf', 'pstate', 'tsc'):
-            print('%s=%d ' % (k, self.d[k]))
+        for k in ("id", "aperf", "mperf", "pstate", "tsc"):
+            print("%s=%d " % (k, self.d[k]))
         print()

     def diff_u64(self, v1, v2):  # v1 - v2
@@ -73,33 +73,32 @@ class cpustatvals:
             return (self.u64max - v2) + v1

     def calc_cpufreq(self, prev):  # prev is an object of cpustatvals
-        if not (prev.d.has_key('tsc') and self.d.has_key('tsc')):
+        if not (prev.d.has_key("tsc") and self.d.has_key("tsc")):
             return 0.0
         tmp = {}
-        for k in ('tsc', 'aperf', 'mperf'):
+        for k in ("tsc", "aperf", "mperf"):
             tmp[k] = float(self.diff_u64(self.d[k], prev.d[k]))
-        dt = self.d['time'] - prev.d['time']
-        freq = tmp['aperf'] / tmp['mperf']
-        freq *= tmp['tsc']
+        dt = self.d["time"] - prev.d["time"]
+        freq = tmp["aperf"] / tmp["mperf"]
+        freq *= tmp["tsc"]
         freq *= 1e-9  # convert it to GHz
         freq /= dt
         return freq
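
The arithmetic above computes the effective clock as the TSC (base) rate scaled by the aperf/mperf ratio. With hypothetical counter deltas, over dt = 1.0 s, a tsc delta of 2.0e9 (a 2 GHz base clock) and aperf/mperf = 1.5 (turbo) give:

    freq = 1.5 * 2.0e9 * 1e-9 / 1.0  # = 3.0 GHz
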

     def calc_aperf(self, prev):  # prev is an object of cpustatvals
-        if not (prev.d.has_key('tsc') and self.d.has_key('tsc')):
+        if not (prev.d.has_key("tsc") and self.d.has_key("tsc")):
             return 0.0
         tmp = {}
-        k = 'aperf'
+        k = "aperf"
         tmp[k] = float(self.diff_u64(self.d[k], prev.d[k]))
-        dt = self.d['time'] - prev.d['time']
-        return tmp['aperf'] * 1e-9 / dt
+        dt = self.d["time"] - prev.d["time"]
+        return tmp["aperf"] * 1e-9 / dt