...
 
Commits (7)
---
variables:
ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz"
ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/nrm-py3/argopkgs-nrm-py3.tar.gz"
EXTRA: "--nrm ./."
stages:
......@@ -12,12 +12,12 @@ stages:
- docs
include:
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/argonix.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/nrm.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/components.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/integration.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-py3/gitlab-ci/argonix.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-py3/gitlab-ci/nrm.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-py3/gitlab-ci/components.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-py3/gitlab-ci/integration.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-py3/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-py3/gitlab-ci/repoquality.yml
flake8:
stage: style
......
PYTHON:= $(shell which python2)
PYTHON:= $(shell which python3)
source:
$(PYTHON) setup.py sdist
......
#!/usr/bin/env python2
#!/usr/bin/env python3
from __future__ import print_function
import argparse
......
#!/usr/bin/env python2
#!/usr/bin/env python3
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
......
#!/usr/bin/env python2
#!/usr/bin/env python3
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
......
#!/usr/bin/env python2
#!/usr/bin/env python3
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
......
......@@ -4,8 +4,8 @@
rec {
nrm = pkgs.nrm;
hack = nrm.overrideAttrs (old:{
buildInputs = old.buildInputs ++ [
pkgs.pythonPackages.flake8
pkgs.pythonPackages.sphinx ];
propagatedBuildInputs = old.propagatedBuildInputs ++ [
pkgs.python37Packages.flake8
pkgs.python37Packages.black ];
});
}
......@@ -12,11 +12,11 @@
import logging
from schema import loadschema
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
def has(self, f):
return(f in self.app.keys())
return f in self.app.keys()
ImageManifest = loadschema("yml", "manifest")
......
......@@ -12,28 +12,30 @@ from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
class Application(object):
"""Information about a downstream API user."""
thread_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'noop': 'max'},
's_ask_d': {'done': 'stable', 'noop': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'noop': 'noop'},
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
thread_fsm_table = {
"stable": {"i": "s_ask_i", "d": "s_ask_d"},
"s_ask_i": {"done": "stable", "noop": "max"},
"s_ask_d": {"done": "stable", "noop": "min"},
"max": {"d": "max_ask_d"},
"min": {"i": "min_ask_i"},
"max_ask_d": {"done": "stable", "noop": "noop"},
"min_ask_i": {"done": "stable", "noop": "noop"},
"noop": {},
}
def __init__(self, uuid, container, progress, threads, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.thread_state = "stable"
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
......@@ -49,21 +51,21 @@ class Application(object):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
speed = float(self.progress) / float(self.threads["cur"])
if command == "i":
return speed
else:
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
newth = msg["payload"]
curth = self.threads["cur"]
if newth == curth:
self.do_thread_transition('noop')
self.do_thread_transition("noop")
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
self.do_thread_transition("done")
self.threads["cur"] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
......@@ -75,10 +77,11 @@ class Application(object):
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = int(msg.cpu)
self.phase_contexts[id] = {k: getattr(msg, k) for k in
('aggregation', 'computetime',
'totaltime')}
self.phase_contexts[id]['set'] = True
self.phase_contexts[id] = {
k: getattr(msg, k)
for k in ("aggregation", "computetime", "totaltime")
}
self.phase_contexts[id]["set"] = True
class ApplicationManager(object):
......@@ -91,21 +94,22 @@ class ApplicationManager(object):
def register(self, msg, container):
"""Register a new downstream application."""
uuid = msg['application_uuid']
container_uuid = msg['container_uuid']
uuid = msg["application_uuid"]
container_uuid = msg["container_uuid"]
progress = 0
threads = False
phase_contexts = dict()
phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
if container.power['policy']:
phase_context_keys = ["set", "aggregation", "computetime", "totaltime"]
if container.power["policy"]:
ids = container.resources.cpus
for id in ids:
phase_contexts[id] = dict.fromkeys(phase_context_keys)
phase_contexts[id]['set'] = False
phase_contexts[id]["set"] = False
else:
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
self.applications[uuid] = Application(
uuid, container_uuid, progress, threads, phase_contexts
)
def delete(self, uuid):
"""Delete an application from the register."""
......
This diff is collapsed.
......@@ -12,7 +12,7 @@ from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
class Action(object):
......@@ -73,19 +73,20 @@ class PowerActuator(object):
actions = []
pl = self.sensor_manager.get_powerlimits()
logger.info("power limits: %r:", pl)
if target == 'i':
if target == "i":
for k in pl:
r = range(int(pl[k]['curW'])+1, int(pl[k]['maxW']))
r = range(int(pl[k]["curW"]) + 1, int(pl[k]["maxW"]))
actions.extend([Action(k, s, s - r[0]) for s in r])
elif target == 'd':
elif target == "d":
for k in pl:
r = range(1, int(pl[k]['curW']))
r = range(1, int(pl[k]["curW"]))
actions.extend([Action(k, s, r[-1] - s) for s in r])
return actions
def execute(self, action):
logger.info("changing power limit: %r, %r", action.command,
action.delta)
logger.info(
"changing power limit: %r, %r", action.command, action.delta
)
self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
......@@ -102,17 +103,18 @@ class Controller(object):
def planify(self, target, machineinfo):
"""Plan the next action for the control loop."""
try:
total_power = machineinfo['energy']['power']['total']
total_power = machineinfo["energy"]["power"]["total"]
except TypeError:
logging.error("\"machineinfo\" malformed. Can not run "
"control loop.")
logging.error(
'"machineinfo" malformed. Can not run ' "control loop."
)
return (None, None)
direction = None
if total_power < target:
direction = 'i'
direction = "i"
elif total_power > target:
direction = 'd'
direction = "d"
if direction:
actions = []
......@@ -139,26 +141,29 @@ class Controller(object):
ids = container.resources.cpus
pcs = application.phase_contexts
# Run policy only if all phase contexts have been received
if not filter(lambda i: not pcs[i]['set'], ids):
if not filter(lambda i: not pcs[i]["set"], ids):
# Only run policy if all phase contexts are an
# aggregation of same number of phases
aggs = [pcs[i]['aggregation'] for i in ids]
aggs = [pcs[i]["aggregation"] for i in ids]
if aggs.count(aggs[0]) == len(aggs):
container.power['manager'].run_policy(pcs)
if filter(lambda i: pcs[i]['set'], ids):
container.power["manager"].run_policy(pcs)
if filter(lambda i: pcs[i]["set"], ids):
logger.debug("Phase context not reset %r", application)
else:
container.power['manager'].reset_all()
container.power["manager"].reset_all()
for i in ids:
pcs[i]['set'] = False
pcs[i]["set"] = False
def run_policy(self, containers):
"""Run policies on containers with policies set."""
for container in containers:
p = containers[container].power
if p['policy']:
if p["policy"]:
apps = self.actuators[0].application_manager.applications
if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid
== container)
app = next(
apps[a]
for a in apps
if apps[a].container_uuid == container
)
self.run_policy_container(containers[container], app)
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
#!/usr/bin/env python
#
# coolr cpufreq related codes
......@@ -20,24 +10,36 @@
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
import os, sys, time
import struct
import copy
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
import os
import sys
import time
import clr_nodeinfo
import numpy as np
#an example the content of cpustat
#id 0
#aperf 4926926121023
#mperf 4582847073452
#perf_bias 8
#ucc 281462841145699
#urc 0
#perf_target 8448
#perf_status 8448
#pstate 33
#turbo_disengage 0
#tsc 1117245755950154
# an example the content of cpustat
# id 0
# aperf 4926926121023
# mperf 4582847073452
# perf_bias 8
# ucc 281462841145699
# urc 0
# perf_target 8448
# perf_status 8448
# pstate 33
# turbo_disengage 0
# tsc 1117245755950154
class cpustatvals:
def cpustatfn(self, cpuid):
......@@ -49,56 +51,54 @@ class cpustatvals:
self.cpuid = cpuid
def parse(self):
self.d = {} # clear d's contents
self.d['time'] = time.time()
self.d = {} # clear d's contents
self.d["time"] = time.time()
with open(self.cpustatfn(self.cpuid)) as f:
while True:
l = f.readline()
if not l:
break
a = l.split()
if a[0] in ('id', 'aperf', 'mperf', 'pstate', 'tsc'):
if a[0] in ("id", "aperf", "mperf", "pstate", "tsc"):
self.d[a[0]] = int(a[1])
def pr(self):
for k in ('id', 'aperf', 'mperf', 'pstate', 'tsc'):
print '%s=%d ' % (k, self.d[k]),
print
def diff_u64(self, v1, v2): # v1 - v2
for k in ("id", "aperf", "mperf", "pstate", "tsc"):
print("%s=%d " % (k, self.d[k]))
print()
def diff_u64(self, v1, v2): # v1 - v2
if v1 >= v2:
return v1 - v2
return (self.u64max -v2) + v1
return (self.u64max - v2) + v1
def calc_cpufreq(self,prev): # prev is an object of cpustatvals
if not (prev.d.has_key('tsc') and self.d.has_key('tsc')):
return 0.0
def calc_cpufreq(self, prev): # prev is an object of cpustatvals
if not (prev.d.has_key("tsc") and self.d.has_key("tsc")):
return 0.0
tmp = {}
for k in ('tsc', 'aperf', 'mperf'):
for k in ("tsc", "aperf", "mperf"):
tmp[k] = float(self.diff_u64(self.d[k], prev.d[k]))
dt = self.d['time'] - prev.d['time']
freq = tmp['aperf'] / tmp['mperf']
freq *= tmp['tsc']
dt = self.d["time"] - prev.d["time"]
freq = tmp["aperf"] / tmp["mperf"]
freq *= tmp["tsc"]
freq *= 1e-9 # covert it to GHz
freq /= dt
return freq
def calc_aperf(self,prev): # prev is an object of cpustatvals
if not (prev.d.has_key('tsc') and self.d.has_key('tsc')):
return 0.0
def calc_aperf(self, prev): # prev is an object of cpustatvals
if not (prev.d.has_key("tsc") and self.d.has_key("tsc")):
return 0.0
tmp = {}
k = 'aperf'
k = "aperf"
tmp[k] = float(self.diff_u64(self.d[k], prev.d[k]))
dt = self.d['time'] - prev.d['time']
return tmp['aperf'] * 1e-9 / dt
dt = self.d["time"] - prev.d["time"]
return tmp["aperf"] * 1e-9 / dt
class cpufreq_reader:
def __init__(self):
self.outputpercore(True)
......@@ -106,12 +106,12 @@ class cpufreq_reader:
# so simply instantiating an object of cputopology again here.
self.ct = clr_nodeinfo.cputopology()
self.cpus = self.ct.onlinecpus # just for convenience
self.cpus = self.ct.onlinecpus # just for convenience
self.init = False
for cpuid in self.cpus:
tmp = cpustatvals(cpuid) # just for cpustatfn
tmp = cpustatvals(cpuid) # just for cpustatfn
statpath = tmp.cpustatfn(cpuid)
if not os.path.exists(statpath):
# print 'Not found', statpath
......@@ -121,7 +121,8 @@ class cpufreq_reader:
self.cnt = 0
self.samples = [
[cpustatvals(i) for i in self.cpus],
[cpustatvals(i) for i in self.cpus] ]
[cpustatvals(i) for i in self.cpus],
]
self.sample()
......@@ -134,7 +135,6 @@ class cpufreq_reader:
self.samples[idx][cpuid].parse()
self.cnt = self.cnt + 1
def pstate(self):
ret = [0.0 for i in self.cpus]
if not self.init:
......@@ -142,11 +142,11 @@ class cpufreq_reader:
if self.cnt == 0:
return ret
idx = 0 # if cnt is an odd number
idx = 0 # if cnt is an odd number
if self.cnt % 2 == 0:
idx = 1
for cpuid in self.cpus:
ret[cpuid] = self.samples[idx][cpuid].d['pstate']
ret[cpuid] = self.samples[idx][cpuid].d["pstate"]
return ret
......@@ -165,7 +165,8 @@ class cpufreq_reader:
for cpuid in self.cpus:
ret[cpuid] = self.samples[idxcur][cpuid].calc_cpufreq(
self.samples[idxprev][cpuid])
self.samples[idxprev][cpuid]
)
return ret
......@@ -184,16 +185,17 @@ class cpufreq_reader:
for cpuid in self.cpus:
ret[cpuid] = self.samples[idxcur][cpuid].calc_aperf(
self.samples[idxprev][cpuid])
self.samples[idxprev][cpuid]
)
return ret
def outputpercore(self,flag=True):
self.percore=flag
def outputpercore(self, flag=True):
self.percore = flag
def sample_and_json(self, node=""):
if not self.init:
return ''
return ""
self.sample()
f = self.aperf()
......@@ -207,29 +209,28 @@ class cpufreq_reader:
freqstd = np.std(tmp)
buf += ',"p%s":{' % p
buf += '"mean":%.3lf,"std":%.3lf' % (freqmean,freqstd)
buf += '"mean":%.3lf,"std":%.3lf' % (freqmean, freqstd)
if self.percore:
for c in self.ct.pkgcpus[p]:
buf += ',"c%d":%.3lf' % (c, f[c])
buf += '}'
buf += '}'
buf += "}"
buf += "}"
return buf
if __name__ == '__main__':
if __name__ == "__main__":
freq = cpufreq_reader()
if not freq.init:
print 'Please check the cpustat module is installed'
print("Please check the cpustat module is installed")
sys.exit(1)
freq.outputpercore(False)
for i in range(0, 20):
j = freq.sample_and_json()
print '[freq json]'
print j
print("[freq json]")
print(j)
time.sleep(1)
sys.exit(0)
......@@ -237,25 +238,24 @@ if __name__ == '__main__':
for i in range(0, 20):
freq.sample()
print '[pstate]',
print("[pstate]")
for p in freq.pstate():
print p,
print
print(p)
print()
print '[aperf]',
print("[aperf]")
for f in freq.aperf():
print '%.2lf ' % f,
print
print("%.2lf " % f)
print()
print '[freq]',
print("[freq]")
for f in freq.cpufreq():
print '%.2lf ' % f,
print
print("%.2lf " % f)
print()
j = freq.sample_and_json()
print '[freq json]'
print j
print("[freq json]")
print(j)
print
print()
time.sleep(1)
#!/usr/bin/env python
#
# coolr hwmon related codes
#
# This code requires the coretemp driver for temperature reading
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
......@@ -8,90 +17,86 @@
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
#!/usr/bin/env python
#
# coolr hwmon related codes
#
# This code requires the coretemp driver for temperature reading
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
import re, os, sys
import re
import os
import sys
import numpy as np
from clr_nodeinfo import *
class coretemp_reader :
def parse_pkgtemp(self,fn):
class coretemp_reader:
def parse_pkgtemp(self, fn):
retval = -1
try:
f = open(fn , "r")
f = open(fn, "r")
except:
return retval
l = f.readline()
m = re.search('Physical id ([0-9]+)', l )
m = re.search("Physical id ([0-9]+)", l)
if m:
retval=int(m.group(1))
retval = int(m.group(1))
f.close()
return retval
def parse_coretemp(self,fn):
def parse_coretemp(self, fn):
retval = -1
try:
f = open(fn , "r")
f = open(fn, "r")
except:
return retval
l = f.readline()
m = re.search('Core ([0-9]+)', l )
m = re.search("Core ([0-9]+)", l)
if m:
retval=int(m.group(1))
retval = int(m.group(1))
f.close()
return retval
hwmondir = '/sys/class/hwmon/'
hwmondir = "/sys/class/hwmon/"
class coretempinfo:
def __init__(self):
self.dir = ''
self.coretempfns = {} # use coreid as key
self.pkgtempfn = ''
self.dir = ""
self.coretempfns = {} # use coreid as key
self.pkgtempfn = ""
def __init__ (self):
def __init__(self):
self.outputpercore(True)
self.coretemp = {} # use pkgid as key
self.coretemp = {} # use pkgid as key
for d1 in os.listdir(self.hwmondir):
# try to check see if 'name' contains 'coretemp'
tmpdir = "%s%s" % (self.hwmondir,d1)
tmpdir = "%s%s" % (self.hwmondir, d1)
drivername = readbuf("%s/name" % tmpdir).rstrip()
if not drivername == "coretemp":
continue
pkgid = -1
coretempfns = {}
pkgtempfn = ''
pkgtempfn = ""
# parse all temp*_label files
for d2 in os.listdir( tmpdir ):
m = re.search( 'temp([0-9]+)_label', d2 )
for d2 in os.listdir(tmpdir):
m = re.search("temp([0-9]+)_label", d2)
if m:
tempid=int(m.group(1))
tempid = int(m.group(1))
coreid = self.parse_coretemp("%s/%s" % (tmpdir, d2))
if coreid >= 0 :
coretempfns[coreid] = "%s/temp%d_input" % (tmpdir, tempid)
else: # possibly pkgid
if coreid >= 0:
coretempfns[coreid] = "%s/temp%d_input" % (
tmpdir,
tempid,
)
else: # possibly pkgid
pkgtempfn = "%s/temp%d_input" % (tmpdir, tempid)
pkgid = self.parse_pkgtemp("%s/%s" % (tmpdir, d2))
if pkgid < 0 :
print 'unlikely: ', pkgtempfn
if pkgid < 0:
print("unlikely: ", pkgtempfn)
cti = self.coretempinfo()
cti.dir = tmpdir
cti.coretempfns = coretempfns
cti.pkgtempfn = pkgtempfn
if pkgid < 0: # assume a single socket machine
if pkgid < 0: # assume a single socket machine
self.coretemp[0] = cti
else:
self.coretemp[pkgid] = cti
......@@ -102,17 +107,17 @@ class coretemp_reader :
for pkgid in sorted(ctemp.keys()):
temps = {}
if os.access(ctemp[pkgid].pkgtempfn, os.R_OK):
val = int(readbuf(ctemp[pkgid].pkgtempfn))/1000
temps['pkg'] = val
val = int(readbuf(ctemp[pkgid].pkgtempfn)) / 1000
temps["pkg"] = val
for c in sorted(ctemp[pkgid].coretempfns.keys()):
if os.access(ctemp[pkgid].coretempfns[c], os.R_OK):
val = int(readbuf(ctemp[pkgid].coretempfns[c]))/1000
val = int(readbuf(ctemp[pkgid].coretempfns[c])) / 1000
temps[c] = val
ret[pkgid] = temps
return ret
def outputpercore(self,flag=True):
self.percore=flag
def outputpercore(self, flag=True):
self.percore = flag
def sample(self):
temp = self.readtempall()
......@@ -123,10 +128,10 @@ class coretemp_reader :
key = "p%d" % p
ret[key] = dict()
pstat = self.getpkgstats(temp, p)
ret[key]['mean'] = pstat[0]
ret[key]['std'] = pstat[1]
ret[key]['min'] = pstat[2]
ret[key]['max'] = pstat[3]
ret[key]["mean"] = pstat[0]
ret[key]["std"] = pstat[1]
ret[key]["min"] = pstat[2]
ret[key]["max"] = pstat[3]
if self.percore:
for c in sorted(temp[p].keys()):
ret[key][c] = temp[p][c]
......@@ -144,32 +149,32 @@ class coretemp_reader :
for c in temps[pkgid].keys():
vals.append(temps[pkgid][c])
return [np.mean(vals), np.std(vals), np.min(vals), np.max(vals)]
def readpkgtemp(self):
fn = "%s_input" % self.pkgtempfns[pkgid].pkgfn
f = open(fn)
v = int(f.readline())/1000.0
f = open(fn)
v = int(f.readline()) / 1000.0
f.close()
return v
def readcoretemp(self,pkgid):
def readcoretemp(self, pkgid):
t = []
for fnbase in self.pkgtempfns[pkgid].corefns:
fn = "%s_input" % fnbase
if not os.access( fn, os.R_OK ):
if not os.access(fn, os.R_OK):
continue # cpu may become offline
f = open(fn)
v = int(f.readline())/1000.0
f = open(fn)
v = int(f.readline()) / 1000.0
f.close()
t.append(v)
return t
class acpi_power_meter_reader :
class acpi_power_meter_reader:
# add a nicer detection routine later
def __init__(self):
self.init = False
fn = '/sys/class/hwmon/hwmon0/device/power1_average'
fn = "/sys/class/hwmon/hwmon0/device/power1_average"
if os.path.exists(fn):
self.init = True
......@@ -179,17 +184,17 @@ class acpi_power_meter_reader :
def read(self):
if not self.init:
return -1
retval=-1
fn = '/sys/class/hwmon/hwmon0/device/power1_average'
retval = -1
fn = "/sys/class/hwmon/hwmon0/device/power1_average"
try:
f = open(fn , "r")
f = open(fn, "r")
except:
return retval
l = f.readline()
retval = int(l) * 1e-6 # uW to W
retval = int(l) * 1e-6 # uW to W
f.close()
return retval
......@@ -197,4 +202,4 @@ class acpi_power_meter_reader :
if not self.init:
return {}
return {'power': self.read()}
return {"power": self.read()}
#!/usr/bin/env python
#
# misc. classes, functions
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
......@@ -8,29 +15,28 @@
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
#!/usr/bin/env python
#
# misc. classes, functions
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
import os, sys, re, time
import os
import sys
import re
import time
def readbuf(fn):
for retry in range(0,10):
for retry in range(0, 10):
try:
f = open( fn )
f = open(fn)
l = f.readline()
f.close()
return l
except:
time.sleep(0.01)
continue
return ''
return ""
def readuptime():
f = open( '/proc/uptime' )
f = open("/proc/uptime")
l = f.readline()
v = l.split()
return float( v[0] )
return float(v[0])
#!/usr/bin/env python
#
# CPU related codes
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
......@@ -8,14 +15,10 @@
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
#!/usr/bin/env python
#
# CPU related codes
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
import os, sys, re, time, socket
import os
import sys
import re
import socket
# local
from clr_misc import *
......@@ -29,85 +32,90 @@ from clr_misc import *
#
# limitation: no support for runtime change
#
class cputopology:
cpubasedir = '/sys/devices/system/cpu/'
nodebasedir = '/sys/devices/system/node/'
cpubasedir = "/sys/devices/system/cpu/"
nodebasedir = "/sys/devices/system/node/"
def parserange(self,fn):
tmp = readbuf( fn )
def parserange(self, fn):
tmp = readbuf(fn)
ret = []
for t in tmp.split(','):
ab = re.findall('[0-9]+', t)
if len(ab) == 2 :
ret = ret + range( int(ab[0]), int(ab[1])+1 )
for t in tmp.split(","):
ab = re.findall("[0-9]+", t)
if len(ab) == 2:
ret = ret + range(int(ab[0]), int(ab[1]) + 1)
elif len(ab) == 1:
ret = ret + [int(ab[0])]
else:
print 'unlikely at cputoplogy.parserange():',ab
print("unlikely at cputoplogy.parserange():", ab)
sys.exit(1)
return ret
def parsemask(self,fn):
tmp = readbuf( fn )
def parsemask(self, fn):
tmp = readbuf(fn)
tmp = tmp.rstrip()
maskstrs = tmp.split(',')
maskstrs = tmp.split(",")
maskstrs.reverse()
shift=0
shift = 0
ret = []
for mstr in maskstrs:
bmint = long(mstr,16)
for i in range(0,32):
if (bmint&1) == 1:
ret.append(i+shift)
bmint = long(mstr, 16)
for i in range(0, 32):
if (bmint & 1) == 1:
ret.append(i + shift)
bmint = bmint >> 1
shift = shift + 32
return ret
def detect(self):
self.onlinecpus = self.parserange(self.cpubasedir + 'online')
self.onlinecpus = self.parserange(self.cpubasedir + "online")
self.pkgcpus = {}
for cpuid in self.onlinecpus:
pkgidfn = self.cpubasedir + "cpu%d/topology/physical_package_id" % (cpuid)
pkgidfn = (
self.cpubasedir
+ "cpu%d/topology/physical_package_id" % (cpuid)
)
pkgid = int(readbuf(pkgidfn))
if not self.pkgcpus.has_key(pkgid) :
if not self.pkgcpus.has_key(pkgid):
self.pkgcpus[pkgid] = []
self.pkgcpus[pkgid].append(cpuid)
self.cpu2coreid = {}
self.core2cpuid = {}
for pkgid in self.pkgcpus.keys() :
for pkgid in self.pkgcpus.keys():
for cpuid in self.pkgcpus[pkgid]:
coreidfn = self.cpubasedir + "cpu%d/topology/core_id" % (cpuid)
coreid = int(readbuf(coreidfn))
self.cpu2coreid[cpuid] = (pkgid, coreid)
self.core2cpuid[(pkgid, coreid)] = cpuid
self.onlinenodes = self.parserange(self.nodebasedir + 'online')
self.onlinenodes = self.parserange(self.nodebasedir + "online")
self.nodecpus = {}
for n in self.onlinenodes:
self.nodecpus[n] = self.parsemask(self.nodebasedir + "node%d/cpumap" % (n))
self.nodecpus[n] = self.parsemask(
self.nodebasedir + "node%d/cpumap" % (n)
)
def __init__(self):
self.detect()
class nodeconfig :
class nodeconfig:
def parse(self):
self.hostname = socket.gethostname()
# XXX: not sure this is unique
self.nodename = self.hostname.split('.')[0]
self.nodename = self.hostname.split(".")[0]
tmp = readbuf( '/proc/version' )
tmp = readbuf("/proc/version")
self.version = tmp.split()[2]
re_model = re.compile("^model\s+:\s+([0-9]+)")
self.cpumodel = -1
with open('/proc/cpuinfo') as f:
with open("/proc/cpuinfo") as f:
while True:
l = f.readline()
if not l:
......@@ -117,85 +125,81 @@ class nodeconfig :
self.cpumodel = int(m.group(1))
self.memoryKB = -1
with open('/proc/meminfo') as f:
with open("/proc/meminfo") as f:
l = f.readline()
self.memoryKB = int(l.split()[1])
# assume that all cpu have the same setting for this experiment
self.driver = ''
self.freqdriver = ''
d = '/sys/devices/system/cpu/cpu0/cpufreq'
self.driver = ""
self.freqdriver = ""
d = "/sys/devices/system/cpu/cpu0/cpufreq"
if os.path.exists(d):
self.freqdriver = 'acpi_cpufreq'
self.freqdriver = "acpi_cpufreq"
fn = d + "/scaling_driver"
self.driver = readbuf( fn ).rstrip()
self.driver = readbuf(fn).rstrip()
fn = d + "/scaling_governor"
self.governor = readbuf( fn ).rstrip()
self.governor = readbuf(fn).rstrip()
fn = d + "/scaling_cur_freq"
self.cur_freq = readbuf( fn ).rstrip()
self.cur_freq = readbuf(fn).rstrip()
d = "/sys/devices/system/cpu/intel_pstate"
if os.path.exists(d):
self.freqdriver = 'pstate'
k = 'max_perf_pct'
pmax = readbuf( "%s/%s" % (d,k) ).rstrip()
k = 'min_perf_pct'
pmin = readbuf( "%s/%s" % (d,k) ).rstrip()
k = 'no_turbo'
noturbo = readbuf( "%s/%s" % (d,k) ).rstrip()
self.pstate = "%s/%s/%s" % (pmax,pmin,noturbo)
self.freqdriver = "pstate"
k = "max_perf_pct"
pmax = readbuf("%s/%s" % (d, k)).rstrip()
k = "min_perf_pct"
pmin = readbuf("%s/%s" % (d, k)).rstrip()
k = "no_turbo"
noturbo = readbuf("%s/%s" % (d, k)).rstrip()
self.pstate = "%s/%s/%s" % (pmax, pmin, noturbo)
d = "/sys/devices/system/cpu/turbofreq"
if os.path.exists(d):
self.freqdriver = 'coolrfreq'
self.policy = d + '/pstate_policy'
self.freqdriver = "coolrfreq"
self.policy = d + "/pstate_policy"
def __init__ (self):
def __init__(self):
self.parse()
def testnodeconfig():
print '=== ', sys._getframe().f_code.co_name
def testnodeconfig():
print("=== ", sys._getframe().f_code.co_name)
nc = nodeconfig()
print 'node: ', nc.nodename
print 'version: ', nc.version
print 'cpumodel: ', nc.cpumodel
print 'memoryKB: ', nc.memoryKB
print 'freqdriver: ', nc.freqdriver
print
print("node: ", nc.nodename)
print("version: ", nc.version)
print("cpumodel: ", nc.cpumodel)
print("memoryKB: ", nc.memoryKB)
print("freqdriver: ", nc.freqdriver)
print()
def testcputopology():
print '=== ', sys._getframe().f_code.co_name
print("=== ", sys._getframe().f_code.co_name)
ct = cputopology()
print
print 'No. of online cpus: ', len(ct.onlinecpus)
print
print()
print("No. of online cpus: ", len(ct.onlinecpus))
print()
for p in sorted(ct.pkgcpus.keys()):
print 'pkg%d:' % p, len(ct.pkgcpus[p]), ct.pkgcpus[p]
print ' cpuid:',
print("pkg%d:" % p, len(ct.pkgcpus[p]), ct.pkgcpus[p])
print(" cpuid:")
for cpuid in ct.pkgcpus[p]:
print ct.cpu2coreid[cpuid],ct.cpu2coreid[cpuid][1],
print
print
print(ct.cpu2coreid[cpuid], ct.cpu2coreid[cpuid][1])
print()
print()
for n in sorted(ct.nodecpus.keys()):
print 'node%d:' % n, len(ct.nodecpus[n]), ct.nodecpus[n]
print("node%d:" % n, len(ct.nodecpus[n]), ct.nodecpus[n])
print ' cpuid:',
print(" cpuid:")
for cpuid in ct.nodecpus[n]:
print ct.cpu2coreid[cpuid],
print
print
print(ct.cpu2coreid[cpuid])
print()
print()
if __name__ == '__main__':
if __name__ == "__main__":
testnodeconfig()
testcputopology()
This diff is collapsed.
......@@ -31,7 +31,6 @@ import msr
class DutyCycle:
def __init__(self):
self.register = 0x19A
self.msr = msr.Msr()
......
......@@ -26,66 +26,66 @@ import struct
class Msr:
# get msr file name for the cpu
def get_file_name(self, cpu):
return '/dev/cpu/%d/msr_safe' % cpu
return "/dev/cpu/%d/msr_safe" % cpu
# open msr file with correct privileges
def file_open(self, filename, privilege):
try:
if privilege == 'r':
if privilege == "r":
fd = os.open(filename, os.O_RDONLY)
elif privilege == 'w':
elif privilege == "w":
fd = os.open(filename, os.O_WRONLY)
except OSError as e:
if e.errno == errno.ENXIO:
sys.exit('file_open: No such device or address ' + filename)
sys.exit("file_open: No such device or address " + filename)
elif e.errno == errno.EIO:
sys.exit('file_open: I/O error ' + filename)
sys.exit("file_open: I/O error " + filename)
elif e.errno == errno.EACCES:
sys.exit('file_open: Permission denied ' + filename)
sys.exit("file_open: Permission denied " + filename)
else:
sys.exit('file_open: Error ' + filename)
sys.exit("file_open: Error " + filename)
return fd
# read a msr
def read(self, cpu, register):
msrfile = self.get_file_name(cpu)
fd = self.file_open(msrfile, 'r')
fd = self.file_open(msrfile, "r")
try:
os.lseek(fd, int(register), os.SEEK_SET)
""" read and handle binary data from msr file """
value = struct.unpack('Q', os.read(fd, 8))[0]
value = struct.unpack("Q", os.read(fd, 8))[0]
os.close(fd)
except OSError as e:
os.close(fd)
if e.errno == errno.EIO:
sys.exit('read: I/O error ' + msrfile)
sys.exit("read: I/O error " + msrfile)
elif e.errno == errno.EACCES:
sys.exit('read: Permission denied ' + msrfile)
sys.exit("read: Permission denied " + msrfile)
else:
sys.exit('read: Error ' + msrfile)
sys.exit("read: Error " + msrfile)
return value
# write a msr
def write(self, cpu, register, value):
msrfile = self.get_file_name(cpu)
fd = self.file_open(msrfile, 'w')
fd = self.file_open(msrfile, "w")
try:
os.lseek(fd, int(register), os.SEEK_SET)
""" write binary data to msr file """
os.write(fd, struct.pack('Q', value))
os.write(fd, struct.pack("Q", value))
os.close(fd)
except OSError as e:
os.close(fd)
if e.errno == errno.EIO:
sys.exit('write: I/O error ' + msrfile)
sys.exit("write: I/O error " + msrfile)
elif e.errno == errno.EACCES:
sys.exit('write: Permission denied ' + msrfile)
sys.exit("write: Permission denied " + msrfile)
else:
sys.exit('write: Error ' + msrfile)
sys.exit("write: Error " + msrfile)
return value
This diff is collapsed.
......@@ -42,6 +42,7 @@ import coolr.dutycycle
class DDCMPolicy:
""" Contains cpu-specific DDCM based power policy """
def __init__(self, maxlevel=16, minlevel=1):
self.maxdclevel = maxlevel
self.mindclevel = minlevel
......@@ -55,8 +56,8 @@ class DDCMPolicy:
def print_stats(self, resetflag=False):
ddcmstats = dict()
ddcmstats['DDCMPolicySets'] = self.ddcmpolicyset
ddcmstats['DDCMPolicyResets'] = self.ddcmpolicyreset
ddcmstats["DDCMPolicySets"] = self.ddcmpolicyset
ddcmstats["DDCMPolicyResets"] = self.ddcmpolicyreset
if resetflag:
self.ddcmpolicyset = 0
self.ddcmpolicyreset = 0
......
......@@ -18,27 +18,28 @@ from zmq.eventloop import zmqstream
from schema import loadschema
_logger = logging.getLogger('nrm')
_UpstreamRep = loadschema('json', 'upstreamRep')
_UpstreamPub = loadschema('json', 'upstreamPub')
_logger = logging.getLogger("nrm")
_UpstreamRep = loadschema("json", "upstreamRep")
_UpstreamPub = loadschema("json", "upstreamPub")
def send(apiname):
def wrap(cls):
model = loadschema('json', apiname)
model = loadschema("json", apiname)
def send(self, *args, **kwargs):
self.socket.send(
json.dumps(model(dict(*args, **kwargs))))
self.socket.send(json.dumps(model(dict(*args, **kwargs))))
setattr(cls, "send", send)
return(cls)
return(wrap)
return cls
return wrap
def recv_callback(apiname):
def wrap(cls):
model = loadschema('json', apiname)
model = loadschema("json", apiname)
def recv(self):
"""Receives a response to a message."""
......@@ -65,8 +66,9 @@ def recv_callback(apiname):
setattr(cls, "do_recv_callback", do_recv_callback)
setattr(cls, "setup_recv_callback", setup_recv_callback)
return(cls)
return(wrap)
return cls
return wrap
class RPCClient(object):
......@@ -89,7 +91,7 @@ class RPCClient(object):
while wait:
msg = zmq.utils.monitor.recv_monitor_message(monitor)
_logger.debug("monitor message: %r", msg)
if int(msg['event']) == zmq.EVENT_CONNECTED:
if int(msg["event"]) == zmq.EVENT_CONNECTED:
_logger.debug("socket connected")
break
self.socket.disable_monitor()
......@@ -155,7 +157,7 @@ class UpstreamPubClient(object):
self.zmq_context = zmq.Context.instance()
self.socket = self.zmq_context.socket(zmq.SUB)
self.socket.setsockopt(zmq.RCVHWM, 0)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.SUBSCRIBE, "")
def connect(self, wait=True):
"""Creates a monitor socket and wait for the connect event."""
......@@ -164,7 +166,7 @@ class UpstreamPubClient(object):
while wait:
msg = zmq.utils.monitor.recv_monitor_message(monitor)
_logger.debug("monitor message: %r", msg)
if int(msg['event']) == zmq.EVENT_CONNECTED:
if int(msg["event"]) == zmq.EVENT_CONNECTED:
_logger.debug("socket connected")
break
self.socket.disable_monitor()
......
......@@ -41,14 +41,15 @@ import ddcmpolicy
import logging
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
class PowerPolicyManager:
""" Used for power policy application """
def __init__(self, cpus=None, policy=None, damper=1000000000,
slowdown=1.1):
def __init__(
self, cpus=None, policy=None, damper=1000000000, slowdown=1.1
):
self.cpus = cpus
self.policy = policy
self.damper = damper
......@@ -74,62 +75,68 @@ class PowerPolicyManager:
if self.policy:
for id in phase_contexts:
if id not in self.cpus:
logger.info("""Attempt to change power of cpu not in container
: %r""", id)
logger.info(
"""Attempt to change power of cpu not in container
: %r""",
id,
)
return
# Select and invoke appropriate power policy
# TODO: Need to add a better policy selection logic in addition
# to user specified using manifest file
ret, value = self.execute(id, **phase_contexts[id])
if self.policy == 'DDCM':
if ret == 'DDCM':
if self.policy == "DDCM":
if ret == "DDCM":
self.dclevel[id] = value
# Incase of slowdown experienced by even process, reset all
# cpus
if ret == 'SLOWDOWN':
if ret == "SLOWDOWN":
self.reset_all()
phase_contexts[id]['set'] = False
phase_contexts[id]["set"] = False
def execute(self, cpu, **kwargs):
computetime = kwargs['computetime']
totalphasetime = kwargs['totaltime']
computetime = kwargs["computetime"]
totalphasetime = kwargs["totaltime"]
# If the current phase length is less than the damper value, then do
# not use policy. This avoids use of policy during startup operation
# insignificant phases
if totalphasetime < self.damper:
self.damperexits += 1
return 'DAMPER', -1
return "DAMPER", -1
# If the current phase has slowed down beyond the threshold set, then
# reset power. This helps correct error in policy application or acts
# as a rudimentary way to detect phase change
if(self.prevtolalphasetime[cpu] is not None and totalphasetime >
self.slowdown * self.prevtolalphasetime[cpu]):
if (
self.prevtolalphasetime[cpu] is not None
and totalphasetime > self.slowdown * self.prevtolalphasetime[cpu]
):
self.ddcmpolicy.dc.reset(cpu)
newdclevel = self.ddcmpolicy.maxdclevel
# Reset value for next phase
self.prevtolalphasetime[cpu] = totalphasetime
return 'SLOWDOWN', newdclevel
return "SLOWDOWN", newdclevel
# Invoke the correct policy based on operation module
if self.policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu],
computetime