...
  View open merge request
Commits (7)
[flake8]
ignore = E203, E266, E501, W503, F403, F401, W605
max-line-length = 79
max-complexity = 18
select = B,C,E,F,W,T4,B9
......@@ -58,3 +58,7 @@ docs/manifest.rst
dist/
doc/
.pytype
.mypy_cache
out
---
variables:
ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz"
ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/py3/argopkgs-py3.tar.gz"
EXTRA: "--nrm ./."
stages:
......@@ -12,16 +12,9 @@ stages:
- docs
include:
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/argonix.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/nrm.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/components.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/integration.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml
flake8:
stage: style
script:
- nix run -f "$ARGOPKGS" pythonPackages.flake8 --command flake8 nrm/*.py bin/*
tags:
- integration
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/py3/gitlab-ci/argonix.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/py3/gitlab-ci/nrm.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/py3/gitlab-ci/components.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/py3/gitlab-ci/integration.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/py3/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/py3/gitlab-ci/repoquality.yml
......@@ -50,6 +50,44 @@ Origin](https://developercertificate.org/) for copyright and license
management. We ask that you sign-off all of your commits as certification that
you have the rights to submit this work under the license of the project (in
the `LICENSE` file) and that you agree to the DCO.
Additionally, all source files must contain the license information, and authors
must be listed in the AUTHORS file.
To sign off commits, use: `git commit --signoff`.
To sign off a branch after the fact, use: `git rebase --signoff`.
## Development environment:
get nix through your package manager or `curl https://nixos.org/nix/install | sh`
setup our local package repository:
```
$ nix-channel --add https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz argopkgs
$ nix-channel --update argopkgs
```
enter a `nix-shell` inside a local clone of the source repository.
```
$ cd /path/to/nrm
$ nix-shell
```
## Code quality tools:
This repository uses `flake8`, `black`, `mypy` and `pytype`. These are provided
by `nix-shell`, so you should just have to run them normally:
```
$ nix-shell
nix-shell$ flake8 nrm bin/nrm bin/nrmd bin/nrm-perfwrapper
nix-shell$ black nrm --check
nix-shell$ black bin/nrm --check
nix-shell$ black bin/nrmd --check
nix-shell$ black bin/nrm-perfwrapper --check
nix-shell$ mypy -p nrm
nix-shell$ pytype nrm -V 3.7
```
`black` is meant to be configured in your text editor, of course.
# Argo Node Resource Manager
# Node Resource Manager
Resource management daemon using communication with clients to control
power usage of applications.
This is a python rewrite of the original code developed for the Argo project
two years ago.
The Node Resource Manager (NRM) is a node-local userspace client-server daemon
for managing your scientific applications. It runs the various tasks that
compose an application in resource-constrained slices, monitors performance,
power use and application progress, and arbitrates resources at the node
level, among which CPU Cores, NUMA Nodes, and Power budgets.
## Requirements
......@@ -15,8 +15,8 @@ running simply with:
And entering the resulting virtual environment with `pipenv shell`.
The NRM code only supports _argo-containers_ for now, so you need to install
our container piece on the system.
The NRM code supports _argo-containers_ and _singularity_ for now, so you need
to either install our container piece on the system or have Singularity set up.
## Basic Usage
......@@ -27,3 +27,8 @@ Launch `nrmd` and use `nrm` to interact with it.
| **Systemwide Power Management with Argo**
| Dan Ellsworth, Tapasya Patki, Swann Perarnau, Pete Beckman *et al*
| In *High-Performance, Power-Aware Computing (HPPAC)*, 2016.
## Hacking
Please see CONTRIBUTING.markdown for information on the contribution model and
technical details.
#!/usr/bin/env python2
#!/usr/bin/env python3
from __future__ import print_function
import argparse
......@@ -11,7 +11,7 @@ import uuid
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-dummy-application')
logger = logging.getLogger("nrm-dummy-application")
class DownstreamApplication(object):
......@@ -25,10 +25,11 @@ class DownstreamApplication(object):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def do_shutdown(self):
update = {'type': 'application',
'event': 'exit',
'uuid': self.app_uuid,
}
update = {
"type": "application",
"event": "exit",
"uuid": self.app_uuid,
}
self.downstream_pub.send_json(update)
ioloop.IOLoop.current().stop()
......@@ -39,25 +40,26 @@ class DownstreamApplication(object):
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
uuid = msg['uuid']
uuid = msg["uuid"]
if uuid != self.app_uuid:
return
command = msg.get('command')
command = msg.get("command")
if command is None:
logger.error("missing command in message")
return
elif command == 'threads':
newth = msg['payload']
elif command == "threads":
newth = msg["payload"]
if newth >= 1 and newth <= self.max:
self.nt = newth
update = {'type': 'application',
'event': 'threads',
'payload': self.nt,
'uuid': self.app_uuid,
}
update = {
"type": "application",
"event": "threads",
"payload": self.nt,
"uuid": self.app_uuid,
}
self.downstream_pub.send_json(update)
elif command == 'exit':
elif command == "exit":
self.do_shutdown()
else:
logger.error("bad command")
......@@ -66,13 +68,14 @@ class DownstreamApplication(object):
def do_progress_report(self):
now = time.time()
seconds = now - self.last_update
ratio = float(self.nt)/float(self.max)
progress = seconds*ratio*42
update = {'type': 'application',
'event': 'progress',
'payload': progress,
'uuid': self.app_uuid,
}
ratio = float(self.nt) / float(self.max)
progress = seconds * ratio * 42
update = {
"type": "application",
"event": "progress",
"payload": progress,
"uuid": self.app_uuid,
}
self.downstream_pub.send_json(update)
self.last_update = now
......@@ -90,10 +93,12 @@ class DownstreamApplication(object):
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
logger.info("downstream pub socket connected to: %s",
downstream_pub_param)
logger.info("downstream sub socket connected to: %s",
downstream_sub_param)
logger.info(
"downstream pub socket connected to: %s", downstream_pub_param
)
logger.info(
"downstream sub socket connected to: %s", downstream_sub_param
)
# link sockets to events
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
......@@ -108,7 +113,7 @@ class DownstreamApplication(object):
self.progress.start()
# retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
self.container_uuid = os.environ.get("ARGO_CONTAINER_UUID")
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
......@@ -116,24 +121,30 @@ class DownstreamApplication(object):
logger.info("client uuid: %r", self.app_uuid)
# send a hello to the daemon
update = {'type': 'application',
'event': 'start',
'container': self.container_uuid,
'uuid': self.app_uuid,
'progress': True,
'threads': {'min': 1, 'cur': self.nt, 'max': self.max},
}
update = {
"type": "application",
"event": "start",
"container": self.container_uuid,
"uuid": self.app_uuid,
"progress": True,
"threads": {"min": 1, "cur": self.nt, "max": self.max},
}
self.downstream_pub.send_json(update)
def main(self):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
parser.add_argument(
"-v",
"--verbose",
help="verbose logging information",
action="store_true",
)
parser.add_argument(
"threads", help="starting number of threads", type=int, default=16
)
parser.add_argument(
"maxthreads", help="max number of threads", type=int, default=32
)
args = parser.parse_args()
# deal with logging
......
This diff is collapsed.
#!/usr/bin/env python2
#!/usr/bin/env python3
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
......@@ -20,7 +20,7 @@ import subprocess
import uuid
from nrm import messaging
logger = logging.getLogger('perf-wrapper')
logger = logging.getLogger("perf-wrapper")
class PerfWrapper(object):
......@@ -36,10 +36,11 @@ class PerfWrapper(object):
def performance_report(self, performance):
self.downstream_event.send(
tag="performance",
payload=performance,
container_uuid=self.container_uuid,
application_uuid=self.app_uuid)
tag="performance",
payload=performance,
container_uuid=self.container_uuid,
application_uuid=self.app_uuid,
)
def setup(self):
downstream_url = "ipc:///tmp/nrm-downstream-event"
......@@ -49,7 +50,7 @@ class PerfWrapper(object):
logger.info("downstream pub socket connected to: %s", downstream_url)
# retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
self.container_uuid = os.environ.get("ARGO_CONTAINER_UUID")
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
......@@ -57,20 +58,29 @@ class PerfWrapper(object):
logger.info("client uuid: %r", self.app_uuid)
# send a hello to the daemon
self.downstream_event.send(
tag="start",
container_uuid=self.container_uuid,
application_uuid=self.app_uuid)
tag="start",
container_uuid=self.container_uuid,
application_uuid=self.app_uuid,
)
def main(self):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("-f", "--frequency",
help="sampling frequency in ms",
type=int, default=1000)
parser.add_argument("cmd", help="command and arguments",
nargs=argparse.REMAINDER)
parser.add_argument(
"-v",
"--verbose",
help="verbose logging information",
action="store_true",
)
parser.add_argument(
"-f",
"--frequency",
help="sampling frequency in ms",
type=int,
default=1000,
)
parser.add_argument(
"cmd", help="command and arguments", nargs=argparse.REMAINDER
)
args = parser.parse_args()
if args.verbose:
......@@ -84,13 +94,24 @@ class PerfWrapper(object):
# There is no mkstemp for FIFOs but we can securely create a temporary
# directory and then create a FIFO inside of it.
tmpdir = tempfile.mkdtemp()
fifoname = os.path.join(tmpdir, 'perf-fifo')
fifoname = os.path.join(tmpdir, "perf-fifo")
logger.info("fifoname: %r", fifoname)
os.mkfifo(fifoname, 0o600)
perf_tool_path = os.environ.get('PERF', 'perf')
argv = [perf_tool_path, 'stat', '-e', 'instructions', '-x', ',',
'-I', str(args.frequency), '-o', fifoname, '--']
perf_tool_path = os.environ.get("PERF", "perf")
argv = [
perf_tool_path,
"stat",
"-e",
"instructions",
"-x",
",",
"-I",
str(args.frequency),
"-o",
fifoname,
"--",
]
argv.extend(args.cmd)
logger.info("argv: %r", argv)
......@@ -99,7 +120,7 @@ class PerfWrapper(object):
# This blocks until the other end opens as well so we need to invoke
# it after Popen.
# FIXME: will deadlock if Popen fails (say, no perf).
fifo = open(fifoname, 'r')
fifo = open(fifoname, "r")
last_time = 0.0
# "for line in fifo" idiom didn't work for me here -- Python was
......@@ -110,14 +131,14 @@ class PerfWrapper(object):
break
line = line.strip()
if len(line) == 0 or line[0] == '#':
if len(line) == 0 or line[0] == "#":
continue
tokens = line.split(',')
tokens = line.split(",")
logger.info("tokens: %r", tokens)
time = float(tokens[0])
if tokens[1] == '<not counted>':
if tokens[1] == "<not counted>":
instructions = 0
else:
instructions = int(tokens[1])
......
#!/usr/bin/env python2
#!/usr/bin/env python3
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
......@@ -23,107 +23,122 @@ def main(argv=None):
argv = sys.argv
conf_parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
add_help=False
)
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
add_help=False,
)
conf_parser.add_argument(
"-c",
"--configuration",
help="Specify a config json-formatted config file to override "
"any of the available CLI options. If an option is "
"actually provided on the command-line, it overrides "
"its corresponding value from the configuration file.",
metavar="FILE")
conf_parser.add_argument("-d", "--print_defaults", action='store_true',
help="Print the default configuration file.")
"-c",
"--configuration",
help="Specify a config json-formatted config file to override "
"any of the available CLI options. If an option is "
"actually provided on the command-line, it overrides "
"its corresponding value from the configuration file.",
metavar="FILE",
)
conf_parser.add_argument(
"-d",
"--print_defaults",
action="store_true",
help="Print the default configuration file.",
)
args, remaining_argv = conf_parser.parse_known_args()
defaults = {"nrm_log": "/tmp/nrm.log",
"hwloc": "hwloc",
"perf": "perf",
"argo_perf_wrapper": "nrm-perfwrapper",
"argo_nodeos_config": "argo_nodeos_config",
"pmpi_lib": "/usr/lib/libnrm-pmpi.so",
"singularity": "singularity",
"container_runtime": "nodeos",
}
defaults = {
"nrm_log": "/tmp/nrm.log",
"hwloc": "hwloc",
"perf": "perf",
"argo_perf_wrapper": "nrm-perfwrapper",
"argo_nodeos_config": "argo_nodeos_config",
"pmpi_lib": "/usr/lib/libnrm-pmpi.so",
"singularity": "singularity",
"container_runtime": "nodeos",
}
if args.print_defaults:
print(defaults)
return(0)
return 0
if args.configuration:
defaults.update(json.load(open(args.configuration)))
parser = argparse.ArgumentParser(parents=[conf_parser])
parser.set_defaults(**defaults)
parser.add_argument("-v", "--verbose", help="increase output verbosity",
action="store_true")
parser.add_argument(
"--nrm_log",
help="Main log file. Override default with the NRM_LOG "
"environment variable",
default=os.environ.get('NRM_LOG',
'/tmp/nrm.log'))
"-v",
"--verbose",
help="increase output verbosity",
action="store_true",
)
parser.add_argument(
"--nrm_log",
help="Main log file. Override default with the NRM_LOG "
"environment variable",
default=os.environ.get("NRM_LOG", "/tmp/nrm.log"),
)
parser.add_argument(
'--msr_safe',
help="Whether to use msr_safe instead of RAPL",
default=True)
"--msr_safe",
help="Whether to use msr_safe instead of RAPL",
default=True,
)
parser.add_argument(
'--hwloc',
help="Path to the hwloc to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the HWLOC environment "
"variable.",
default=os.environ.get('HWLOC',
'hwloc'))
"--hwloc",
help="Path to the hwloc to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the HWLOC environment "
"variable.",
default=os.environ.get("HWLOC", "hwloc"),
)
parser.add_argument(
'--argo_nodeos_config',
help="Path to the argo_nodeos_config to use. This path "
"can be relative and makes uses of the $PATH if "
"necessary. Override default with the "
"ARGO_NODEOS_CONFIG environment variable.",
default=os.environ.get('ARGO_NODEOS_CONFIG',
'argo_nodeos_config'))
"--argo_nodeos_config",
help="Path to the argo_nodeos_config to use. This path "
"can be relative and makes uses of the $PATH if "
"necessary. Override default with the "
"ARGO_NODEOS_CONFIG environment variable.",
default=os.environ.get("ARGO_NODEOS_CONFIG", "argo_nodeos_config"),
)
parser.add_argument(
'--perf',
help="Path to the linux perf tool to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the PERF environment "
"variable.",
default=os.environ.get('PERF',
'perf'))
"--perf",
help="Path to the linux perf tool to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the PERF environment "
"variable.",
default=os.environ.get("PERF", "perf"),
)
parser.add_argument(
'--pmpi_lib',
help="Path to the libnrm PMPI library used for the power policy. "
"Override default with the PMPI environment variable.",
default=os.environ.get('PMPI', defaults['pmpi_lib']))
"--pmpi_lib",
help="Path to the libnrm PMPI library used for the power policy. "
"Override default with the PMPI environment variable.",
default=os.environ.get("PMPI", defaults["pmpi_lib"]),
)
parser.add_argument(
'--argo_perf_wrapper',
help="Path to the linux perf tool to use. This path can "
"be relative and makes uses of the $PATH if "
"necessary. Override default with the PERFWRAPPER "
"environment variable.",
default=os.environ.get('ARGO_PERF_WRAPPER',
'nrm-perfwrapper'))
"--argo_perf_wrapper",
help="Path to the linux perf tool to use. This path can "
"be relative and makes uses of the $PATH if "
"necessary. Override default with the PERFWRAPPER "
"environment variable.",
default=os.environ.get("ARGO_PERF_WRAPPER", "nrm-perfwrapper"),
)
parser.add_argument(
'--singularity',
help="Path to the singularity command. "
"Override default with the SINGULARITY environment variable.",
default=os.environ.get('SINGULARITY', defaults['singularity']))
"--singularity",
help="Path to the singularity command. "
"Override default with the SINGULARITY environment variable.",
default=os.environ.get("SINGULARITY", defaults["singularity"]),
)
parser.add_argument(
'--container-runtime',
help="Choice of container runtime. "
"Override default with the ARGO_CONTAINER_RUNTIME "
"environment variable.",
choices=['nodeos', 'singularity'],
default=os.environ.get('ARGO_CONTAINER_RUNTIME',
defaults['container_runtime']))
"--container-runtime",
help="Choice of container runtime. "
"Override default with the ARGO_CONTAINER_RUNTIME "
"environment variable.",
choices=["nodeos", "singularity"],
default=os.environ.get(
"ARGO_CONTAINER_RUNTIME", defaults["container_runtime"]
),
)
args = parser.parse_args(remaining_argv)
nrm.daemon.runner(config=args)
return(0)
return 0
if __name__ == "__main__":
......
......@@ -6,7 +6,11 @@ rec {
nrm-dist = pkgs.nrm-dist;
hack = nrm.overrideAttrs (old:{
buildInputs = old.buildInputs ++ [
pkgs.pythonPackages.flake8
pkgs.pythonPackages.sphinx ];
pkgs.flake8
pkgs.black
pkgs.mypy
pkgs.pytype
pkgs.sphinx
];
});
}
......@@ -10,13 +10,13 @@
"""Parse and Represent the APPC ACI specification."""
import logging
from schema import loadschema
from nrm.schema import loadschema
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
def has(self, f):
return(f in self.app.keys())
return f in self.app.keys()
ImageManifest = loadschema("yml", "manifest")
......
......@@ -12,28 +12,30 @@ from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
class Application(object):
"""Information about a downstream API user."""
thread_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'noop': 'max'},
's_ask_d': {'done': 'stable', 'noop': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'noop': 'noop'},
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
thread_fsm_table = {
"stable": {"i": "s_ask_i", "d": "s_ask_d"},
"s_ask_i": {"done": "stable", "noop": "max"},
"s_ask_d": {"done": "stable", "noop": "min"},
"max": {"d": "max_ask_d"},
"min": {"i": "min_ask_i"},
"max_ask_d": {"done": "stable", "noop": "noop"},
"min_ask_i": {"done": "stable", "noop": "noop"},
"noop": {},
}
def __init__(self, uuid, container, progress, threads, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.thread_state = "stable"
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
......@@ -49,21 +51,21 @@ class Application(object):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
speed = float(self.progress) / float(self.threads["cur"])
if command == "i":
return speed
else:
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
newth = msg["payload"]
curth = self.threads["cur"]
if newth == curth:
self.do_thread_transition('noop')
self.do_thread_transition("noop")
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
self.do_thread_transition("done")
self.threads["cur"] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
......@@ -75,10 +77,11 @@ class Application(object):
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = int(msg.cpu)
self.phase_contexts[id] = {k: getattr(msg, k) for k in
('aggregation', 'computetime',
'totaltime')}
self.phase_contexts[id]['set'] = True
self.phase_contexts[id] = {
k: getattr(msg, k)
for k in ("aggregation", "computetime", "totaltime")
}
self.phase_contexts[id]["set"] = True
class ApplicationManager(object):
......@@ -91,21 +94,22 @@ class ApplicationManager(object):
def register(self, msg, container):
"""Register a new downstream application."""
uuid = msg['application_uuid']
container_uuid = msg['container_uuid']
uuid = msg["application_uuid"]
container_uuid = msg["container_uuid"]
progress = 0
threads = False
phase_contexts = dict()
phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
if container.power['policy']:
phase_context_keys = ["set", "aggregation", "computetime", "totaltime"]
if container.power["policy"]:
ids = container.resources.cpus
for id in ids:
phase_contexts[id] = dict.fromkeys(phase_context_keys)
phase_contexts[id]['set'] = False
phase_contexts[id]["set"] = False
else:
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
self.applications[uuid] = Application(
uuid, container_uuid, progress, threads, phase_contexts
)
def delete(self, uuid):
"""Delete an application from the register."""
......
This diff is collapsed.
......@@ -12,7 +12,7 @@ from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
class Action(object):
......@@ -25,43 +25,6 @@ class Action(object):
self.delta = delta
# class ApplicationActuator(object):
#
# """Actuator in charge of application thread control."""
#
# def __init__(self, am, pubstream):
# self.application_manager = am
# self.pubstream = pubstream
#
# def available_actions(self, target):
# ret = []
# for identity, application in \
# self.application_manager.applications.iteritems():
# if target in application.get_allowed_thread_requests():
# delta = application.get_thread_request_impact(target)
# ret.append(Action(application, target, delta))
# return ret
#
# def execute(self, action):
# target_threads = action.target.threads
# update = {'type': 'application',
# 'command': 'threads',
# 'uuid': action.target.uuid,
# 'event': 'threads',
# }
# if action.command == 'i':
# payload = target_threads['cur'] + 1
# elif action.command == 'd':
# payload = target_threads['cur'] - 1
# else:
# assert False, "impossible command"
# update['payload'] = payload
# self.pubstream.send_json()
#
# def update(self, action):
# action.target.do_thread_transition(action.command)
class PowerActuator(object):
"""Actuator in charge of power control."""
......@@ -73,19 +36,20 @@ class PowerActuator(object):
actions = []
pl = self.sensor_manager.get_powerlimits()
logger.info("power limits: %r:", pl)
if target == 'i':
if target == "i":
for k in pl:
r = range(int(pl[k]['curW'])+1, int(pl[k]['maxW']))
r = range(int(pl[k]["curW"]) + 1, int(pl[k]["maxW"]))
actions.extend([Action(k, s, s - r[0]) for s in r])
elif target == 'd':
elif target == "d":
for k in pl:
r = range(1, int(pl[k]['curW']))
r = range(1, int(pl[k]["curW"]))
actions.extend([Action(k, s, r[-1] - s) for s in r])
return actions
def execute(self, action):
logger.info("changing power limit: %r, %r", action.command,
action.delta)
logger.info(
"changing power limit: %r, %r", action.command, action.delta
)
self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
......@@ -102,30 +66,31 @@ class Controller(object):
def planify(self, target, machineinfo):
"""Plan the next action for the control loop."""
try:
total_power = machineinfo['energy']['power']['total']
total_power = machineinfo["energy"]["power"]["total"]
direction = None
if total_power < target:
direction = "i"
elif total_power > target:
direction = "d"
if direction:
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
return (None, None)
except TypeError:
logging.error("\"machineinfo\" malformed. Can not run "
"control loop.")
logging.error(
'"machineinfo" malformed. Can not run ' "control loop."
)
return (None, None)
direction = None
if total_power < target:
direction = 'i'
elif total_power > target:
direction = 'd'
if direction:
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
def execute(self, action, actuator):
"""Build the action for the appropriate manager."""
actuator.execute(action)
......@@ -139,26 +104,29 @@ class Controller(object):
ids = container.resources.cpus
pcs = application.phase_contexts
# Run policy only if all phase contexts have been received
if not filter(lambda i: not pcs[i]['set'], ids):
if not filter(lambda i: not pcs[i]["set"], ids):
# Only run policy if all phase contexts are an
# aggregation of same number of phases
aggs = [pcs[i]['aggregation'] for i in ids]
aggs = [pcs[i]["aggregation"] for i in ids]
if aggs.count(aggs[0]) == len(aggs):
container.power['manager'].run_policy(pcs)
if filter(lambda i: pcs[i]['set'], ids):
container.power["manager"].run_policy(pcs)
if filter(lambda i: pcs[i]["set"], ids):
logger.debug("Phase context not reset %r", application)
else:
container.power['manager'].reset_all()
container.power["manager"].reset_all()
for i in ids:
pcs[i]['set'] = False
pcs[i]["set"] = False
def run_policy(self, containers):
"""Run policies on containers with policies set."""
for container in containers:
p = containers[container].power
if p['policy']:
if p["policy"]:
apps = self.actuators[0].application_manager.applications
if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid
== container)
app = next(
apps[a]
for a in apps
if apps[a].container_uuid == container
)
self.run_policy_container(containers[container], app)
......@@ -8,8 +8,6 @@
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
#!/usr/bin/env python
#
# coolr cpufreq related codes
#
# There is no perfect way to read the CPU clock on x86. We need to
......@@ -18,26 +16,26 @@
# This code requires the cpustat driver
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
import os, sys, time
import struct
import copy
import clr_nodeinfo
import numpy as np
#an example the content of cpustat
#id 0
#aperf 4926926121023
#mperf 4582847073452
#perf_bias 8
#ucc 281462841145699
#urc 0
#perf_target 8448
#perf_status 8448
#pstate 33
#turbo_disengage 0
#tsc 1117245755950154
import os
import sys
import time
import nrm.coolr.clr_nodeinfo # type: ignore
import numpy as np # type: ignore
# an example the content of cpustat
# id 0
# aperf 4926926121023
# mperf 4582847073452
# perf_bias 8
# ucc 281462841145699
# urc 0
# perf_target 8448
# perf_status 8448
# pstate 33
# turbo_disengage 0
# tsc 1117245755950154
class cpustatvals:
def cpustatfn(self, cpuid):
......@@ -49,69 +47,69 @@ class cpustatvals:
self.cpuid = cpuid
def parse(self):
self.d = {} # clear d's contents
self.d['time'] = time.time()
self.d = {} # clear d's contents
self.d["time"] = time.time()
with open(self.cpustatfn(self.cpuid)) as f:
while True:
l = f.readline()
if not l:
li = f.readline()
if not li:
break
a = l.split()
if a[0] in ('id', 'aperf', 'mperf', 'pstate', 'tsc'):
a = li.split()
if a[0] in ("id", "aperf", "mperf", "pstate", "tsc"):
self.d[a[0]] = int(a[1])
def pr(self):
for k in ('id', 'aperf', 'mperf', 'pstate', 'tsc'):
print '%s=%d ' % (k, self.d[k]),
print
def diff_u64(self, v1, v2): # v1 - v2
for k in ("id", "aperf", "mperf", "pstate", "tsc"):
print("%s=%d " % (k, self.d[k]))
print()
def diff_u64(self, v1, v2): # v1 - v2
if v1 >= v2:
return v1 - v2
return (self.u64max -v2) + v1
return (self.u64max - v2) + v1
def calc_cpufreq(self,prev): # prev is an object of cpustatvals
if not (prev.d.has_key('tsc') and self.d.has_key('tsc')):
return 0.0
def calc_cpufreq(self, prev): # prev is an object of cpustatvals
if not ("tsc" in prev.d and "tsc" in self.d):
return 0.0
tmp = {}
for k in ('tsc', 'aperf', 'mperf'):
for k in ("tsc", "aperf", "mperf"):
tmp[k] = float(self.diff_u64(self.d[k], prev.d[k]))
dt = self.d['time'] - prev.d['time']
freq = tmp['aperf'] / tmp['mperf']
freq *= tmp['tsc']
dt = self.d["time"] - prev.d["time"] # pytype: disable=key-error
freq = tmp["aperf"] / tmp["mperf"]
freq *= tmp["tsc"]
freq *= 1e-9 # convert it to GHz
freq /= dt
return freq
def calc_aperf(self,prev): # prev is an object of cpustatvals
if not (prev.d.has_key('tsc') and self.d.has_key('tsc')):
return 0.0
def calc_aperf(self, prev): # prev is an object of cpustatvals
if not ("tsc" in prev.d and "tsc" in self.d):
return 0.0
tmp = {}
k = 'aperf'
tmp[k] = float(self.diff_u64(self.d[k], prev.d[k]))
k = "aperf"
tmp[k] = float(
self.diff_u64(self.d[k], prev.d[k]) # pytype: disable=key-error
)
dt = self.d['time'] - prev.d['time']
return tmp['aperf'] * 1e-9 / dt
dt = self.d["time"] - prev.d["time"] # pytype: disable=key-error
return tmp["aperf"] * 1e-9 / dt
class cpufreq_reader:
def __init__(self):
self.outputpercore(True)
# I don't know how to create an object in a singleton manner in python
# so simply instantiating an object of cputopology again here.
self.ct = clr_nodeinfo.cputopology()
self.ct = nrm.coolr.clr_nodeinfo.cputopology()
self.cpus = self.ct.onlinecpus # just for convenience
self.cpus = self.ct.onlinecpus # just for convenience
self.init = False
for cpuid in self.cpus:
tmp = cpustatvals(cpuid) # just for cpustatfn
tmp = cpustatvals(cpuid) # just for cpustatfn
statpath = tmp.cpustatfn(cpuid)
if not os.path.exists(statpath):
# print 'Not found', statpath
......@@ -121,7 +119,8 @@ class cpufreq_reader:
self.cnt = 0
self.samples = [
[cpustatvals(i) for i in self.cpus],
[cpustatvals(i) for i in self.cpus] ]
[cpustatvals(i) for i in self.cpus],
]
self.sample()
......@@ -134,7 +133,6 @@ class cpufreq_reader:
self.samples[idx][cpuid].parse()
self.cnt = self.cnt + 1
def pstate(self):
ret = [0.0 for i in self.cpus]
if not self.init:
......@@ -142,11 +140,11 @@ class cpufreq_reader:
if self.cnt == 0:
return ret
idx = 0 # if cnt is an odd number
idx = 0 # if cnt is an odd number
if self.cnt % 2 == 0:
idx = 1
for cpuid in self.cpus:
ret[cpuid] = self.samples[idx][cpuid].d['pstate']
ret[cpuid] = self.samples[idx][cpuid].d["pstate"]
return ret
......@@ -165,7 +163,8 @@ class cpufreq_reader:
for cpuid in self.cpus:
ret[cpuid] = self.samples[idxcur][cpuid].calc_cpufreq(
self.samples[idxprev][cpuid])
self.samples[idxprev][cpuid]
)
return ret
......@@ -184,16 +183,17 @@ class cpufreq_reader:
for cpuid in self.cpus:
ret[cpuid] = self.samples[idxcur][cpuid].calc_aperf(
self.samples[idxprev][cpuid])
self.samples[idxprev][cpuid]
)
return ret
def outputpercore(self,flag=True):
self.percore=flag
def outputpercore(self, flag=True):
self.percore = flag
def sample_and_json(self, node=""):
if not self.init:
return ''
return ""
self.sample()
f = self.aperf()
......@@ -207,55 +207,10 @@ class cpufreq_reader:
freqstd = np.std(tmp)
buf += ',"p%s":{' % p
buf += '"mean":%.3lf,"std":%.3lf' % (freqmean,freqstd)
buf += '"mean":%.3lf,"std":%.3lf' % (freqmean, freqstd)
if self.percore:
for c in self.ct.pkgcpus[p]:
buf += ',"c%d":%.3lf' % (c, f[c])
buf += '}'
buf += '}'
buf += "}"
buf += "}"
return buf
if __name__ == "__main__":
    # Ad-hoc smoke test: sample CPU frequencies for ~20 seconds and print
    # the JSON that sample_and_json() produces.
    # Fix: Python 2 `print` statements are a SyntaxError under Python 3,
    # which is the target of this file's migration.
    freq = cpufreq_reader()
    if not freq.init:
        print("Please check the cpustat module is installed")
        sys.exit(1)
    freq.outputpercore(False)

    for i in range(0, 20):
        j = freq.sample_and_json()
        print("[freq json]")
        print(j)
        time.sleep(1)
    sys.exit(0)

    # NOTE(review): everything below is unreachable (sys.exit(0) above).
    # Kept as disabled debugging output, converted to Python 3 syntax.
    for i in range(0, 20):
        freq.sample()
        print("[pstate]", end=" ")
        for p in freq.pstate():
            print(p, end=" ")
        print()
        print("[aperf]", end=" ")
        for f in freq.aperf():
            print("%.2lf " % f, end=" ")
        print()
        print("[freq]", end=" ")
        for f in freq.cpufreq():
            print("%.2lf " % f, end=" ")
        print()
        j = freq.sample_and_json()
        print("[freq json]")
        print(j)
        print()
        time.sleep(1)
......@@ -8,8 +8,6 @@
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
#!/usr/bin/env python
#
# coolr hwmon related codes
#
# This code requires the coretemp driver for temperature reading
......@@ -17,84 +15,88 @@
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
import re, os, sys
import numpy as np
from clr_nodeinfo import *
import re
import os
import sys
import numpy as np # type: ignore
import nrm.coolr.clr_nodeinfo
from nrm.coolr.clr_misc import readbuf
class coretemp_reader:
    """Reads per-core/per-package temperatures from the coretemp driver
    via the hwmon sysfs interface."""

    def parse_pkgtemp(self, fn):
        """Parse a coretemp tempN_label file for a package id.

        Returns the integer N from a first line of the form
        'Physical id N', or -1 when the file cannot be opened or the
        line does not match.
        """
        retval = -1
        try:
            f = open(fn, "r")
        except Exception:
            return retval
        # `with` guarantees the file is closed even if readline/search
        # raises; the original left the descriptor open in that case.
        with f:
            m = re.search("Physical id ([0-9]+)", f.readline())
            if m:
                retval = int(m.group(1))
        return retval
def parse_coretemp(self,fn):
def parse_coretemp(self, fn):
retval = -1
try:
f = open(fn , "r")
except:
f = open(fn, "r")
except Exception:
return retval
l = f.readline()
m = re.search('Core ([0-9]+)', l )
m = re.search("Core ([0-9]+)", f.readline())
if m:
retval=int(m.group(1))
retval = int(m.group(1))
f.close()
return retval
hwmondir = '/sys/class/hwmon/'
hwmondir = "/sys/class/hwmon/"
class coretempinfo:
def __init__(self):
self.dir = ''
self.coretempfns = {} # use coreid as key
self.pkgtempfn = ''
self.dir = ""
self.coretempfns = {} # use coreid as key
self.pkgtempfn = ""
def __init__ (self):
def __init__(self):
self.outputpercore(True)
self.coretemp = {} # use pkgid as key
self.coretemp = {} # use pkgid as key
for d1 in os.listdir(self.hwmondir):
# try to check see if 'name' contains 'coretemp'
tmpdir = "%s%s" % (self.hwmondir,d1)
tmpdir = "%s%s" % (self.hwmondir, d1)
drivername = readbuf("%s/name" % tmpdir).rstrip()
if not drivername == "coretemp":
continue
pkgid = -1
coretempfns = {}
pkgtempfn = ''
pkgtempfn = ""
# parse all temp*_label files
for d2 in os.listdir( tmpdir ):
m = re.search( 'temp([0-9]+)_label', d2 )
for d2 in os.listdir(tmpdir):
m = re.search("temp([0-9]+)_label", d2)
if m:
tempid=int(m.group(1))
tempid = int(m.group(1))
coreid = self.parse_coretemp("%s/%s" % (tmpdir, d2))
if coreid >= 0 :
coretempfns[coreid] = "%s/temp%d_input" % (tmpdir, tempid)
else: # possibly pkgid
if coreid >= 0:
coretempfns[coreid] = "%s/temp%d_input" % (
tmpdir,
tempid,
)
else: # possibly pkgid
pkgtempfn = "%s/temp%d_input" % (tmpdir, tempid)
pkgid = self.parse_pkgtemp("%s/%s" % (tmpdir, d2))
if pkgid < 0 :
print 'unlikely: ', pkgtempfn
if pkgid < 0:
print("unlikely: ", pkgtempfn)
cti = self.coretempinfo()
cti.dir = tmpdir
cti.coretempfns = coretempfns
cti.pkgtempfn = pkgtempfn
if pkgid < 0: # assume a single socket machine
if pkgid < 0: # assume a single socket machine
self.coretemp[0] = cti
else:
self.coretemp[pkgid] = cti
print(self.coretemp)
def readtempall(self):
ctemp = self.coretemp
......@@ -102,17 +104,17 @@ class coretemp_reader :
for pkgid in sorted(ctemp.keys()):
temps = {}
if os.access(ctemp[pkgid].pkgtempfn, os.R_OK):
val = int(readbuf(ctemp[pkgid].pkgtempfn))/1000
temps['pkg'] = val
val = int(readbuf(ctemp[pkgid].pkgtempfn)) / 1000
temps["pkg"] = val
for c in sorted(ctemp[pkgid].coretempfns.keys()):
if os.access(ctemp[pkgid].coretempfns[c], os.R_OK):
val = int(readbuf(ctemp[pkgid].coretempfns[c]))/1000
val = int(readbuf(ctemp[pkgid].coretempfns[c])) / 1000
temps[c] = val
ret[pkgid] = temps
return ret
def outputpercore(self,flag=True):
self.percore=flag
def outputpercore(self, flag=True):
self.percore = flag
def sample(self):
temp = self.readtempall()
......@@ -123,10 +125,10 @@ class coretemp_reader :
key = "p%d" % p
ret[key] = dict()
pstat = self.getpkgstats(temp, p)
ret[key]['mean'] = pstat[0]
ret[key]['std'] = pstat[1]
ret[key]['min'] = pstat[2]
ret[key]['max'] = pstat[3]
ret[key]["mean"] = pstat[0]
ret[key]["std"] = pstat[1]
ret[key]["min"] = pstat[2]
ret[key]["max"] = pstat[3]
if self.percore:
for c in sorted(temp[p].keys()):
ret[key][c] = temp[p][c]
......@@ -144,32 +146,32 @@ class coretemp_reader :
for c in temps[pkgid].keys():
vals.append(temps[pkgid][c])
return [np.mean(vals), np.std(vals), np.min(vals), np.max(vals)]
def readpkgtemp(self):
fn = "%s_input" % self.pkgtempfns[pkgid].pkgfn
f = open(fn)
v = int(f.readline())/1000.0
f.close()
return v
def readcoretemp(self,pkgid):
t = []
for fnbase in self.pkgtempfns[pkgid].corefns:
fn = "%s_input" % fnbase
if not os.access( fn, os.R_OK ):
continue # cpu may become offline
f = open(fn)
v = int(f.readline())/1000.0
f.close()
t.append(v)
return t
class acpi_power_meter_reader :
# def readpkgtemp(self, pkgid):
# fn = "%s_input" % self.pkgtempfns[pkgid].pkgfn
# f = open(fn)
# v = int(f.readline()) / 1000.0
# f.close()
# return v
# def readcoretemp(self, pkgid):
# t = []
# for fnbase in self.pkgtempfns[pkgid].corefns:
# fn = "%s_input" % fnbase
# if not os.access(fn, os.R_OK):
# continue # cpu may become offline
# f = open(fn)
# v = int(f.readline()) / 1000.0
# f.close()
# t.append(v)
# return t
class acpi_power_meter_reader:
    """Reader for the ACPI power meter exposed through hwmon sysfs."""

    # add a nicer detection routine later
    def __init__(self):
        # The reader is usable only when the power1_average file exists.
        fn = "/sys/class/hwmon/hwmon0/device/power1_average"
        self.init = os.path.exists(fn)
......@@ -179,17 +181,16 @@ class acpi_power_meter_reader :
def read(self):
if not self.init:
return -1
retval=-1
fn = '/sys/class/hwmon/hwmon0/device/power1_average'
retval = -1
fn = "/sys/class/hwmon/hwmon0/device/power1_average"
try:
f = open(fn , "r")
except:
f = open(fn, "r")
except Exception:
return retval
l = f.readline()
retval = int(l) * 1e-6 # uW to W
retval = int(f.readline()) * 1e-6 # uW to W
f.close()
return retval
......@@ -197,4 +198,4 @@ class acpi_power_meter_reader :
if not self.init:
return {}
return {'power': self.read()}
return {"power": self.read()}
......@@ -8,29 +8,29 @@
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
#!/usr/bin/env python
#
# misc. classes, functions
#
# Contact: Kazutomo Yoshii <ky@anl.gov>
#
import os, sys, re, time
import os
import sys
import re
import time
def readbuf(fn):