Commit f0d1f9ea authored by Valentin Reis

wip

parent 25b64e79
Pipeline #7874 failed with stages
in 3 minutes and 57 seconds
......@@ -179,7 +179,7 @@ class ContainerManager(object):
argv = []
if manifest.is_feature_enabled("scheduler"):
sched = manifest.app["scheduler"]
argv = self.chrt.getwrappedcmd(sched)
argv = self.chrt.getwrappedcmd(sched.policy, sched.priority)
# Use hwloc-bind to launch each process in the container by prepending
# it as an argument to the command line, if enabled in manifest.
......
......@@ -17,10 +17,20 @@ import re
import time
import getopt
from nrm.coolr.clr_misc import readuptime
from nrm.types import PowerSample
from typing import Any, Dict, Union, Pattern
class rapl_reader:
dryrun = False
dirs: Dict[str, str]
lastpower: dict
max_energy_range_uj_d: Dict[str, int]
prev_e: Any
rapldir: str
re_domain: Pattern[str]
start_time_e: float
stop_time: float
totalenergy: dict
rapldir = "/sys/devices/virtual/powercap/intel-rapl"
......@@ -58,13 +68,11 @@ class rapl_reader:
self.dirs = {}
self.max_energy_range_uj_d = {}
if self.dryrun:
return
self.init = False
if not os.path.exists(self.rapldir):
return
self.init = True
raise Exception(
"RAPL configuration error. Path %s does not exist."
% self.rapldir
)
for d1 in os.listdir(self.rapldir):
dn = "%s/%s" % (self.rapldir, d1)
......@@ -98,9 +106,6 @@ class rapl_reader:
self.start_energy_counter()
def initialized(self):
return self.init
def shortenkey(self, str):
return str.replace("package-", "p")
......@@ -108,14 +113,8 @@ class rapl_reader:
# print k, self.max_energy_range_uj_d[k]
def readenergy(self):
if not self.init:
return {}
ret = {}
ret["time"] = time.time()
if self.dryrun:
ret["package-0"] = readuptime() * 1000.0 * 1000.0
return ret
for k in sorted(self.dirs.keys()):
fn = self.dirs[k] + "/energy_uj"
ret[k] = self.readint(fn)
......@@ -126,14 +125,8 @@ class rapl_reader:
# from a slow path. Returns a dict with long domain names as keys;
# each value is a dict with 'curW', 'maxW', 'enabled'
def readpowerlimitall(self):
if not self.init:
return {}
def get_powerlimits(self) -> Dict[str, Dict[str, Union[bool, float]]]:
ret = {}
if self.dryrun:
ret["package-0"] = 100.0
return ret
for k in sorted(self.dirs.keys()):
dvals = {}
v = self.readint(self.dirs[k] + "/constraint_0_power_limit_uw")
......@@ -166,11 +159,6 @@ class rapl_reader:
ret = {}
delta = e2["time"] - e1["time"] # assume 'time' never wrap around
ret["delta"] = delta
if self.dryrun:
k = "package-0"
ret[k] = e2[k] - e1[k]
ret[k] /= 1000.0 * 1000.0 # conv. [uW] to [W]
return ret
for k in self.max_energy_range_uj_d:
if e2[k] >= e1[k]:
......@@ -183,9 +171,6 @@ class rapl_reader:
# this should be renamed to reset_...
def start_energy_counter(self):
if not self.initialized():
return
self.start_time_e = time.time()
self.totalenergy = {}
self.lastpower = {}
......@@ -198,10 +183,7 @@ class rapl_reader:
self.prev_e = e
# XXX: fix the total energy tracking later
def read_energy_acc(self):
if not self.initialized():
return
def read_energy_acc(self) -> Dict[str, Any]:
e = self.readenergy()
de = self.diffenergy(self.prev_e, e)
......@@ -215,18 +197,11 @@ class rapl_reader:
return e
def stop_energy_counter(self):
if not self.initialized():
return
self.stop_time = time.time()
def sample(self, accflag=False):
if not self.initialized():
return
def sample(self, accflag=False) -> PowerSample:
e = self.readenergy()
de = self.diffenergy(self.prev_e, e)
for k in sorted(e.keys()):
if k != "time":
if accflag:
......@@ -234,34 +209,27 @@ class rapl_reader:
self.lastpower[k] = de[k] / de["time"] / 1000.0 / 1000.0
self.prev_e = e
ret = dict()
ret["energy"] = dict()
for k in sorted(e.keys()):
if k != "time":
ret["energy"][self.shortenkey(k)] = e[k]
ret["power"] = dict()
totalpower = 0.0
for k in sorted(self.lastpower.keys()):
if k != "time":
ret["power"][self.shortenkey(k)] = self.lastpower[k]
# this is a bit ad hoc way to calculate the total.
# needs to be fixed later
if k.find("core") == -1:
totalpower += self.lastpower[k]
ret["power"]["total"] = totalpower
ret["powercap"] = dict()
rlimit = self.readpowerlimitall()
for k in sorted(rlimit.keys()):
ret["powercap"][self.shortenkey(k)] = rlimit[k]["curW"]
return ret
return PowerSample(
energy={
self.shortenkey(k): v
for k, v in sorted(e.keys())
if k != "time"
},
power={
self.shortenkey(k): v
for k, v in sorted(self.lastpower.keys())
if k != "time"
},
power_total=sum(
[k for k in self.lastpower.keys() if (k.find("core") == -1)]
),
powercap={
self.shortenkey(k): v["curW"]
for k, v in sorted(self.get_powerlimits())
},
)
def total_energy_json(self):
if not self.initialized():
return ""
dt = self.stop_time - self.start_time_e
# constructing a json output
e = self.totalenergy
......@@ -284,16 +252,6 @@ class rapl_reader:
sn += subname
return sn
#
# APIs for power capping
#
def get_powerdomains(self):
return self.readpowerlimitall().keys
def get_powerlimits(self):
return self.readpowerlimitall()
def _set_powerlimit(self, rrdir, newval, id=0):
fn = rrdir + "/constraint_%d_power_limit_uw" % id
uw = newval * 1e6
......@@ -309,50 +267,7 @@ class rapl_reader:
self._set_powerlimit(self.dirs[dom], newval)
def set_powerlimit_pkg(self, newval):
rlims = self.readpowerlimitall()
rlims = self.get_powerlimits()
for k in rlims.keys():
if re.findall("package-[0-9]$", k):
self._set_powerlimit(self.dirs[k], newval)
# Implement the following method
# is_enabled_rapl(), enable_rapl(), disable_rapl()
# convshort2long
# pkgid2cpuids, cpuid2pkgid
# def usage():
# print("clr_rapl.py [options]")
# print("")
# print("--show [pkgid]: show the current setting")
# print("--limitp val: set the limit to all packages")
# print(" [pkgid:]powerval e.g., 140, 1:120")
# print("")
# print("If no option is specified, run the test pattern.")
# print("")
# def test_conv():
# for s in ["package-1", "package-3/dram", "package-2/core"]:
# rr.conv_long2short(s)
# def report_powerlimits():
# l = rr.get_powerlimits()
# for k in l.keys():
# if l[k]["enabled"]:
# print(k, "curW:", l[k]["curW"], "maxW:", l[k]["maxW"])
# def run_powercap_testbench():
# # hard-coded for Haswell E5-2699v2 dual socket
# print(rr.get_powerdomains())
# report_powerlimits()
# w = 10
# rr.set_powerlimit_pkg(120)
# time.sleep(w)
# rr.set_powerlimit_pkg(80)
# time.sleep(w)
# rr.set_powerlimit(130, "package-1")
# time.sleep(w)
# rr.set_powerlimit_pkg(145)
......@@ -24,11 +24,21 @@ import nrm.coolr.clr_nodeinfo
import nrm.coolr.clr_cpufreq
import nrm.coolr.clr_misc
from typing import Any, Dict, NamedTuple
from nrm.types import MachineInfo
class SensorManager:
"""Performs sensor reading and basic data aggregation."""
def __init__(self):
coretemp: nrm.coolr.clr_hwmon.coretemp_reader
cputopology: nrm.coolr.clr_nodeinfo.cputopology
nodeconfig: nrm.coolr.clr_nodeinfo.nodeconfig
nodename: str
rapl: nrm.coolr.clr_rapl.rapl_reader
def __init__(self) -> None:
self.nodeconfig = nrm.coolr.clr_nodeinfo.nodeconfig()
self.nodename = self.nodeconfig.nodename
self.cputopology = nrm.coolr.clr_nodeinfo.cputopology()
......@@ -41,12 +51,10 @@ class SensorManager:
def stop(self):
self.rapl.stop_energy_counter()
def do_update(self):
machine_info = dict()
machine_info["energy"] = self.rapl.sample(accflag=True)
machine_info["temperature"] = self.coretemp.sample()
machine_info["time"] = time.time()
return machine_info
def do_update(self) -> MachineInfo:
return MachineInfo(
self.rapl.sample(accflag=True), self.coretemp.sample(), time.time()
)
def get_powerlimits(self):
pl = self.rapl.get_powerlimits()
......@@ -56,7 +64,7 @@ class SensorManager:
def set_powerlimit(self, domain, value):
self.rapl.set_powerlimit(value, domain)
def calc_difference(self, start, end):
def calc_difference(self, start, end) -> Dict[str, Any]:
diff = dict()
for k in start.keys():
if k not in ["time"]:
......
......@@ -15,18 +15,7 @@ import xml.etree.ElementTree
import tornado.process as process # type: ignore
import subprocess
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Sized,
Tuple,
Type,
TypeVar,
NamedTuple,
)
from typing import Any, List, Optional, NamedTuple, Dict
logger = logging.getLogger("nrm")
......@@ -36,7 +25,7 @@ class Resources(NamedTuple):
mems: List[int]
def logpopen(p, args, stdout, stderr) -> None:
def logpopen(p, args: List[str], stdout, stderr) -> None:
"""log popen cmd."""
logger.debug("popen cmd: %r", args)
logger.debug("popen return code: %s", p.returncode)
......@@ -64,14 +53,15 @@ def list2bitmask(l) -> str:
class NodeOSClient(object):
prefix: str
"""Client to argo_nodeos_config."""
def __init__(self, argo_nodeos_config="argo_nodeos_config"):
def __init__(self, argo_nodeos_config: str = "argo_nodeos_config"):
"""Load client configuration."""
self.prefix = argo_nodeos_config
def getavailable(self):
def getavailable(self) -> Resources:
"""Gather available resources."""
args = [self.prefix, "--show_available_resources=shared:false"]
p = subprocess.Popen(
......@@ -93,7 +83,7 @@ class NodeOSClient(object):
mems.extend(l.split())
return Resources([int(x) for x in cpus], [int(x) for x in mems])
def create(self, name, params):
def create(self, name: str, params: Resources) -> None:
"""Create container, according to params."""
args = [self.prefix]
cmd = "--create_container="
......@@ -115,7 +105,7 @@ class NodeOSClient(object):
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def attach(self, name, pid):
def attach(self, name: str, pid) -> None:
"""Attach a pid to a container."""
args = [self.prefix]
cmd = "--attach_to_container="
......@@ -128,7 +118,7 @@ class NodeOSClient(object):
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def delete(self, name, kill=False):
def delete(self, name: str, kill: bool = False) -> None:
"""Destroy container."""
# destroy container
args = [self.prefix]
......@@ -143,7 +133,7 @@ class NodeOSClient(object):
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def execute(self, name, argv, environ):
def execute(self, name: str, argv, environ) -> Any:
"""Execute argv inside container."""
args = [self.prefix]
cmd = "--exec="
......@@ -172,11 +162,15 @@ class SingularityClient(object):
"""Client to singularity."""
def __init__(self, singularity_path="singularity"):
prefix: str
def __init__(self, singularity_path: str = "singularity"):
"""Load client configuration."""
self.prefix = singularity_path
def instance_start(self, instance_name, container_image, bind_list=[]):
def instance_start(
self, instance_name: str, container_image: str, bind_list=[]
) -> None:
"""Start a named instance of a container image.
Note that singularity will also start the startscript if
......@@ -193,7 +187,7 @@ class SingularityClient(object):
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def execute(self, instance_name, argv, environ):
def execute(self, instance_name: str, argv: List[str], environ) -> Any:
"""Execute argv inside container.
singularity exec instance://instance_name <command>"""
......@@ -210,7 +204,7 @@ class SingularityClient(object):
cwd=environ["PWD"],
)
def instance_stop(self, instance_name, kill=False):
def instance_stop(self, instance_name: str, kill: bool = False) -> None:
"""Stop an instance and kill everything in it."""
args = [self.prefix]
......@@ -244,11 +238,11 @@ class ChrtClient(object):
"""Load configuration."""
self.prefix = "chrt"
def getwrappedcmd(self, params) -> List[str]:
def getwrappedcmd(self, policy: str, priority: str) -> List[str]:
"""Return a list of args to prepend to a popen call."""
args = [self.prefix]
args.append(self.flags[params.policy])
args.append(params.priority)
args.append(self.flags[policy])
args.append(priority)
return args
......@@ -261,7 +255,7 @@ class HwlocClient(object):
"""Load configuration."""
self.prefix = hwloc
def info(self):
def info(self) -> Resources:
"""Return list of all cpus and mems."""
cmd = self.prefix + "-ls"
args = [cmd, "--whole-system", "--output-format", "xml"]
......@@ -282,7 +276,7 @@ class HwlocClient(object):
ret.mems.append(0)
return ret
def all2fake(self, resources) -> str:
def all2fake(self, resources: Resources) -> str:
"""Convert resource description of the system into fake topology.
We need that because hwloc barfs on fake numa nodes.
......@@ -295,8 +289,11 @@ class HwlocClient(object):
return "numa: %s pu:%s".format(mems, pu)
def distrib(
self, numprocs, restrict=None, fake=None
) -> dict_values[Resources]:
self,
numprocs: int,
restrict: Optional[Resources] = None,
fake: Optional[Resources] = None,
):
"""Distribute numprocs across the hierarchy."""
# The original command only reports back cpusets. We do better, by
# reporting the mems that go with it. This requires some magic, using
......
from typing import Any, Dict, Union, NamedTuple
class PowerSample(NamedTuple):
    """One power/energy reading, as built by ``rapl_reader.sample``."""

    # Energy readings keyed by shortened domain name.
    energy: Dict[str, Any]
    # Last computed power per domain, keyed by shortened domain name.
    power: Dict[str, Any]
    # Aggregate power over the domains whose name does not contain "core".
    power_total: float
    # Current power limit ('curW') per shortened domain name.
    powercap: Dict[str, Union[bool, float]]
class MachineInfo(NamedTuple):
    """Snapshot returned by ``SensorManager.do_update``."""

    # RAPL power/energy sample taken for this update.
    energy: PowerSample
    # Result of the coretemp reader's sample() call.
    # NOTE(review): the reader is not visible here; confirm it really
    # returns a single float rather than a per-sensor mapping.
    temperature: float
    # Wall-clock timestamp from time.time(), which is a float number of
    # seconds since the epoch (previously annotated as int).
    time: float
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment