Commit 167bdfc1 authored by Valentin Reis's avatar Valentin Reis

[refactor] moves the codebase to python3 and applies code quality tools

This large commit moves the codebase to Python 3.7 and configures and
applies flake8, black, mypy, and pytype. As a result, various bugs have
been caught statically. The code now passes pytype inference and is
ready for type annotations. The same tools are added to the CI.
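For context, here is a minimal sketch (not taken from the repository) of the
kind of Python 2 idiom that the new toolchain rejects statically; the real
instance fixed in this commit is the `print defaults` statement in the daemon
launcher further down in this diff:
```
defaults = {"nrm_log": "/tmp/nrm.log", "container_runtime": "nodeos"}

# Python 2 only -- a SyntaxError under Python 3.7, reported by flake8 (E999)
# before the code ever runs:
#     print defaults

# Python 3 form, as used throughout this commit:
print(defaults)
```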
parent 6e11fe7e
Pipeline #7871 failed with stages
in 39 minutes and 2 seconds
[flake8]
ignore = E203, E266, E501, W503, F403, F401, W605
max-line-length = 79
max-complexity = 18
select = B,C,E,F,W,T4,B9
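In this configuration, `select` enables the pycodestyle (E/W), pyflakes (F),
McCabe complexity (C), and flake8-bugbear (B, B9) checks, while E203, W503,
and E501 are ignored so that `black`'s formatting decisions take precedence.
As a hedged illustration (the functions below are hypothetical, not from this
repository), bugbear's B006 check flags mutable default arguments:
```
def record_event(event, history=[]):  # B006: mutable default argument
    history.append(event)
    return history


def record_event_fixed(event, history=None):
    # A fresh list is created per call instead of one shared across calls.
    if history is None:
        history = []
    history.append(event)
    return history
```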
......@@ -55,3 +55,8 @@ _output
# documentation
docs/_build
docs/manifest.rst
.pytype
.mypy_cache
out
......@@ -18,10 +18,3 @@ include:
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/integration.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml
flake8:
stage: style
script:
- nix run -f "$ARGOPKGS" pythonPackages.flake8 --command flake8 nrm/*.py bin/*
tags:
- integration
......@@ -50,6 +50,44 @@ Origin](https://developercertificate.org/) for copyright and license
management. We ask that you sign-off all of your commits as certification that
you have the rights to submit this work under the license of the project (in
the `LICENSE` file) and that you agree to the DCO.
Additionally, all source files must contain the license information, and authors
must be listed in the AUTHORS file.
To sign off commits, use `git commit --signoff`.
To sign off a branch after the fact, use `git rebase --signoff`.
## Development environment:
Get Nix through your package manager, or install it with `curl https://nixos.org/nix/install | sh`.
Set up our local package repository:
```
$ nix-channel --add https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz argopkgs
$ nix-channel --update argopkgs
```
Enter a `nix-shell` inside a local clone of the source repository:
```
$ cd /path/to/nrm
$ nix-shell
```
## Code quality tools:
This repository uses `flake8`, `black`, `mypy`, and `pytype`. All of them are
provided by `nix-shell`, so you can run them directly:
```
$ nix-shell
nix-shell$ flake8 nrm bin/nrm bin/nrmd bin/nrm-perfwrapper
nix-shell$ black nrm --check
nix-shell$ black bin/nrm --check
nix-shell$ black bin/nrmd --check
nix-shell$ black bin/nrm-perfwrapper --check
nix-shell$ mypy -p nrm
nix-shell$ pytype nrm -V 3.7
```
`black` is, of course, most convenient when integrated into your text editor.
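The commit message notes that the code now passes pytype inference and is
ready for type annotations. Here is a minimal sketch (not part of this commit;
the helper name is illustrative) of the incremental annotations that
`mypy -p nrm` and `pytype` can then check, using the downstream message shape
visible elsewhere in this diff:
```
from typing import Any, Dict


def build_exit_update(app_uuid: str) -> Dict[str, Any]:
    """Build the 'exit' event sent on the downstream pub socket."""
    return {"type": "application", "event": "exit", "uuid": app_uuid}
```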
PYTHON:= $(shell which python2)
PYTHON:= $(shell which python3)
source:
$(PYTHON) setup.py sdist
......
# Argo Node Resource Manager
# Node Resource Manager
Resource management daemon using communication with clients to control
power usage of application.
This is a python rewrite of the original code developed for the Argo project
two years ago.
The Node Resource Manager (NRM) is a node-local userspace client-server daemon
for managing your scientific applications. It runs the various tasks that
compose an application in resource-constrained slices, monitors performance,
power use, and application progress, and arbitrates resources at the node
level, including CPU cores, NUMA nodes, and power budgets.
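For a concrete picture of how applications report progress to the daemon, here
is a minimal sketch of the JSON message shape used on the downstream API; the
field names mirror `bin/nrm-dummy-application` later in this commit, and the
values are illustrative only.
```
progress_update = {
    "type": "application",
    "event": "progress",
    "payload": 12.5,  # progress value computed by the application
    "uuid": "example-client-uuid",  # per-client identifier
}
```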
## Requirements
......@@ -15,8 +15,8 @@ running simply with:
And entering the resulting virtual environment with `pipenv shell`.
The NRM code only supports _argo-containers_ for now, so you need to install
the our container piece on the system for now.
The NRM code supports _argo-containers_ and _singularity_ for now, so you need
to either install our container piece on the system or have Singularity set up.
## Basic Usage
......@@ -27,3 +27,8 @@ Launch `nrmd` and use `nrm` to interact with it.
| **Systemwide Power Management with Argo**
| Dan Ellsworth, Tapasya Patki, Swann Perarnau, Pete Beckman *et al*
| In *High-Performance, Power-Aware Computing (HPPAC)*, 2016.
## Hacking
Please see CONTRIBUTING.markdown for information on the contribution model and
technical details.
#!/usr/bin/env python2
#!/usr/bin/env python3
from __future__ import print_function
import argparse
......@@ -11,7 +11,7 @@ import uuid
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-dummy-application')
logger = logging.getLogger("nrm-dummy-application")
class DownstreamApplication(object):
......@@ -25,10 +25,11 @@ class DownstreamApplication(object):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def do_shutdown(self):
update = {'type': 'application',
'event': 'exit',
'uuid': self.app_uuid,
}
update = {
"type": "application",
"event": "exit",
"uuid": self.app_uuid,
}
self.downstream_pub.send_json(update)
ioloop.IOLoop.current().stop()
......@@ -39,25 +40,26 @@ class DownstreamApplication(object):
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
uuid = msg['uuid']
uuid = msg["uuid"]
if uuid != self.app_uuid:
return
command = msg.get('command')
command = msg.get("command")
if command is None:
logger.error("missing command in message")
return
elif command == 'threads':
newth = msg['payload']
elif command == "threads":
newth = msg["payload"]
if newth >= 1 and newth <= self.max:
self.nt = newth
update = {'type': 'application',
'event': 'threads',
'payload': self.nt,
'uuid': self.app_uuid,
}
update = {
"type": "application",
"event": "threads",
"payload": self.nt,
"uuid": self.app_uuid,
}
self.downstream_pub.send_json(update)
elif command == 'exit':
elif command == "exit":
self.do_shutdown()
else:
logger.error("bad command")
......@@ -66,13 +68,14 @@ class DownstreamApplication(object):
def do_progress_report(self):
now = time.time()
seconds = now - self.last_update
ratio = float(self.nt)/float(self.max)
progress = seconds*ratio*42
update = {'type': 'application',
'event': 'progress',
'payload': progress,
'uuid': self.app_uuid,
}
ratio = float(self.nt) / float(self.max)
progress = seconds * ratio * 42
update = {
"type": "application",
"event": "progress",
"payload": progress,
"uuid": self.app_uuid,
}
self.downstream_pub.send_json(update)
self.last_update = now
......@@ -90,10 +93,12 @@ class DownstreamApplication(object):
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
logger.info("downstream pub socket connected to: %s",
downstream_pub_param)
logger.info("downstream sub socket connected to: %s",
downstream_sub_param)
logger.info(
"downstream pub socket connected to: %s", downstream_pub_param
)
logger.info(
"downstream sub socket connected to: %s", downstream_sub_param
)
# link sockets to events
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
......@@ -108,7 +113,7 @@ class DownstreamApplication(object):
self.progress.start()
# retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
self.container_uuid = os.environ.get("ARGO_CONTAINER_UUID")
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
......@@ -116,24 +121,30 @@ class DownstreamApplication(object):
logger.info("client uuid: %r", self.app_uuid)
# send a hello to the daemon
update = {'type': 'application',
'event': 'start',
'container': self.container_uuid,
'uuid': self.app_uuid,
'progress': True,
'threads': {'min': 1, 'cur': self.nt, 'max': self.max},
}
update = {
"type": "application",
"event": "start",
"container": self.container_uuid,
"uuid": self.app_uuid,
"progress": True,
"threads": {"min": 1, "cur": self.nt, "max": self.max},
}
self.downstream_pub.send_json(update)
def main(self):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
parser.add_argument(
"-v",
"--verbose",
help="verbose logging information",
action="store_true",
)
parser.add_argument(
"threads", help="starting number of threads", type=int, default=16
)
parser.add_argument(
"maxthreads", help="max number of threads", type=int, default=32
)
args = parser.parse_args()
# deal with logging
......
This diff is collapsed.
#!/usr/bin/env python2
#!/usr/bin/env python3
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
......@@ -20,7 +20,7 @@ import subprocess
import uuid
from nrm import messaging
logger = logging.getLogger('perf-wrapper')
logger = logging.getLogger("perf-wrapper")
class PerfWrapper(object):
......@@ -36,10 +36,11 @@ class PerfWrapper(object):
def performance_report(self, performance):
self.downstream_event.send(
tag="performance",
payload=performance,
container_uuid=self.container_uuid,
application_uuid=self.app_uuid)
tag="performance",
payload=performance,
container_uuid=self.container_uuid,
application_uuid=self.app_uuid,
)
def setup(self):
downstream_url = "ipc:///tmp/nrm-downstream-event"
......@@ -49,7 +50,7 @@ class PerfWrapper(object):
logger.info("downstream pub socket connected to: %s", downstream_url)
# retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
self.container_uuid = os.environ.get("ARGO_CONTAINER_UUID")
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
......@@ -57,20 +58,29 @@ class PerfWrapper(object):
logger.info("client uuid: %r", self.app_uuid)
# send a hello to the daemon
self.downstream_event.send(
tag="start",
container_uuid=self.container_uuid,
application_uuid=self.app_uuid)
tag="start",
container_uuid=self.container_uuid,
application_uuid=self.app_uuid,
)
def main(self):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("-f", "--frequency",
help="sampling frequency in ms",
type=int, default=1000)
parser.add_argument("cmd", help="command and arguments",
nargs=argparse.REMAINDER)
parser.add_argument(
"-v",
"--verbose",
help="verbose logging information",
action="store_true",
)
parser.add_argument(
"-f",
"--frequency",
help="sampling frequency in ms",
type=int,
default=1000,
)
parser.add_argument(
"cmd", help="command and arguments", nargs=argparse.REMAINDER
)
args = parser.parse_args()
if args.verbose:
......@@ -84,13 +94,24 @@ class PerfWrapper(object):
# There is no mkstemp for FIFOs but we can securely create a temporary
# directory and then create a FIFO inside of it.
tmpdir = tempfile.mkdtemp()
fifoname = os.path.join(tmpdir, 'perf-fifo')
fifoname = os.path.join(tmpdir, "perf-fifo")
logger.info("fifoname: %r", fifoname)
os.mkfifo(fifoname, 0o600)
perf_tool_path = os.environ.get('PERF', 'perf')
argv = [perf_tool_path, 'stat', '-e', 'instructions', '-x', ',',
'-I', str(args.frequency), '-o', fifoname, '--']
perf_tool_path = os.environ.get("PERF", "perf")
argv = [
perf_tool_path,
"stat",
"-e",
"instructions",
"-x",
",",
"-I",
str(args.frequency),
"-o",
fifoname,
"--",
]
argv.extend(args.cmd)
logger.info("argv: %r", argv)
......@@ -99,7 +120,7 @@ class PerfWrapper(object):
# This blocks until the other end opens as well so we need to invoke
# it after Popen.
# FIXME: will deadlock if Popen fails (say, no perf).
fifo = open(fifoname, 'r')
fifo = open(fifoname, "r")
last_time = 0.0
# "for line in fifo" idiom didn't work for me here -- Python was
......@@ -110,14 +131,14 @@ class PerfWrapper(object):
break
line = line.strip()
if len(line) == 0 or line[0] == '#':
if len(line) == 0 or line[0] == "#":
continue
tokens = line.split(',')
tokens = line.split(",")
logger.info("tokens: %r", tokens)
time = float(tokens[0])
if tokens[1] == '<not counted>':
if tokens[1] == "<not counted>":
instructions = 0
else:
instructions = int(tokens[1])
......
#!/usr/bin/env python2
#!/usr/bin/env python3
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
......@@ -23,103 +23,117 @@ def main(argv=None):
argv = sys.argv
conf_parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
add_help=False
)
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
add_help=False,
)
conf_parser.add_argument(
"-c",
"--configuration",
help="Specify a config json-formatted config file to override "
"any of the available CLI options. If an option is "
"actually provided on the command-line, it overrides "
"its corresponding value from the configuration file.",
metavar="FILE")
conf_parser.add_argument("-d", "--print_defaults", action='store_true',
help="Print the default configuration file.")
"-c",
"--configuration",
help="Specify a config json-formatted config file to override "
"any of the available CLI options. If an option is "
"actually provided on the command-line, it overrides "
"its corresponding value from the configuration file.",
metavar="FILE",
)
conf_parser.add_argument(
"-d",
"--print_defaults",
action="store_true",
help="Print the default configuration file.",
)
args, remaining_argv = conf_parser.parse_known_args()
defaults = {"nrm_log": "/tmp/nrm.log",
"hwloc": "hwloc",
"perf": "perf",
"argo_perf_wrapper": "nrm-perfwrapper",
"argo_nodeos_config": "argo_nodeos_config",
"pmpi_lib": "/usr/lib/libnrm-pmpi.so",
"singularity": "singularity",
"container_runtime": "nodeos",
}
defaults = {
"nrm_log": "/tmp/nrm.log",
"hwloc": "hwloc",
"perf": "perf",
"argo_perf_wrapper": "nrm-perfwrapper",
"argo_nodeos_config": "argo_nodeos_config",
"pmpi_lib": "/usr/lib/libnrm-pmpi.so",
"singularity": "singularity",
"container_runtime": "nodeos",
}
if args.print_defaults:
print defaults
return(0)
print(defaults)
return 0
if args.configuration:
defaults.update(json.load(open(args.configuration)))
parser = argparse.ArgumentParser(parents=[conf_parser])
parser.set_defaults(**defaults)
parser.add_argument("-v", "--verbose", help="increase output verbosity",
action="store_true")
parser.add_argument(
"--nrm_log",
help="Main log file. Override default with the NRM_LOG "
"environment variable",
default=os.environ.get('NRM_LOG',
'/tmp/nrm.log'))
"-v",
"--verbose",
help="increase output verbosity",
action="store_true",
)
parser.add_argument(
"--nrm_log",
help="Main log file. Override default with the NRM_LOG "
"environment variable",
default=os.environ.get("NRM_LOG", "/tmp/nrm.log"),
)
parser.add_argument(
'--hwloc',
help="Path to the hwloc to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the HWLOC environment "
"variable.",
default=os.environ.get('HWLOC',
'hwloc'))
"--hwloc",
help="Path to the hwloc to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the HWLOC environment "
"variable.",
default=os.environ.get("HWLOC", "hwloc"),
)
parser.add_argument(
'--argo_nodeos_config',
help="Path to the argo_nodeos_config to use. This path "
"can be relative and makes uses of the $PATH if "
"necessary. Override default with the "
"ARGO_NODEOS_CONFIG environment variable.",
default=os.environ.get('ARGO_NODEOS_CONFIG',
'argo_nodeos_config'))
"--argo_nodeos_config",
help="Path to the argo_nodeos_config to use. This path "
"can be relative and makes uses of the $PATH if "
"necessary. Override default with the "
"ARGO_NODEOS_CONFIG environment variable.",
default=os.environ.get("ARGO_NODEOS_CONFIG", "argo_nodeos_config"),
)
parser.add_argument(
'--perf',
help="Path to the linux perf tool to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the PERF environment "
"variable.",
default=os.environ.get('PERF',
'perf'))
"--perf",
help="Path to the linux perf tool to use. This path can be "
"relative and makes uses of the $PATH if necessary. "
"Override default with the PERF environment "
"variable.",
default=os.environ.get("PERF", "perf"),
)
parser.add_argument(
'--pmpi_lib',
help="Path to the libnrm PMPI library used for the power policy. "
"Override default with the PMPI environment variable.",
default=os.environ.get('PMPI', defaults['pmpi_lib']))
"--pmpi_lib",
help="Path to the libnrm PMPI library used for the power policy. "
"Override default with the PMPI environment variable.",
default=os.environ.get("PMPI", defaults["pmpi_lib"]),
)
parser.add_argument(
'--argo_perf_wrapper',
help="Path to the linux perf tool to use. This path can "
"be relative and makes uses of the $PATH if "
"necessary. Override default with the PERFWRAPPER "
"environment variable.",
default=os.environ.get('ARGO_PERF_WRAPPER',
'nrm-perfwrapper'))
"--argo_perf_wrapper",
help="Path to the linux perf tool to use. This path can "
"be relative and makes uses of the $PATH if "
"necessary. Override default with the PERFWRAPPER "
"environment variable.",
default=os.environ.get("ARGO_PERF_WRAPPER", "nrm-perfwrapper"),
)
parser.add_argument(
'--singularity',
help="Path to the singularity command. "
"Override default with the SINGULARITY environment variable.",
default=os.environ.get('SINGULARITY', defaults['singularity']))
"--singularity",
help="Path to the singularity command. "
"Override default with the SINGULARITY environment variable.",
default=os.environ.get("SINGULARITY", defaults["singularity"]),
)
parser.add_argument(
'--container-runtime',
help="Choice of container runtime. "
"Override default with the ARGO_CONTAINER_RUNTIME "
"environment variable.",
choices=['nodeos', 'singularity'],
default=os.environ.get('ARGO_CONTAINER_RUNTIME',
defaults['container_runtime']))
"--container-runtime",
help="Choice of container runtime. "
"Override default with the ARGO_CONTAINER_RUNTIME "
"environment variable.",
choices=["nodeos", "singularity"],
default=os.environ.get(
"ARGO_CONTAINER_RUNTIME", defaults["container_runtime"]
),
)
args = parser.parse_args(remaining_argv)
nrm.daemon.runner(config=args)
return(0)
return 0
if __name__ == "__main__":
......
......@@ -5,7 +5,11 @@ rec {
nrm = pkgs.nrm;
hack = nrm.overrideAttrs (old:{
buildInputs = old.buildInputs ++ [
pkgs.pythonPackages.flake8
pkgs.pythonPackages.sphinx ];
pkgs.flake8
pkgs.black
pkgs.mypy
pkgs.pytype
pkgs.sphinx
];
});
}
......@@ -10,13 +10,13 @@
"""Parse and Represent the APPC ACI specification."""
import logging
from schema import loadschema
from nrm.schema import loadschema
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
def has(self, f):
return(f in self.app.keys())
return f in self.app.keys()
ImageManifest = loadschema("yml", "manifest")
......
......@@ -12,28 +12,30 @@ from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
logger = logging.getLogger("nrm")
class Application(object):
"""Information about a downstream API user."""
thread_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'noop': 'max'},
's_ask_d': {'done': 'stable', 'noop': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'noop': 'noop'},
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
thread_fsm_table = {
"stable": {"i": "s_ask_i", "d": "s_ask_d"},
"s_ask_i": {"done": "stable", "noop": "max"},
"s_ask_d": {"done": "stable", "noop": "min"},
"max": {"d": "max_ask_d"},
"min": {"i": "min_ask_i"},
"max_ask_d": {"done": "stable", "noop": "noop"},
"min_ask_i": {"done": "stable", "noop": "noop"},
"noop": {},
}
def __init__(self, uuid, container, progress, threads, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.thread_state = "stable"
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
......@@ -49,21 +51,21 @@ class Application(object):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
speed = float(self.progress) / float(self.threads["cur"])
if command == "i":
return speed
else:
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
newth = msg["payload"]
curth = self.threads["cur"]
if newth == curth:
self.do_thread_transition('noop')
self.do_thread_transition("noop")
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
self.do_thread_transition("done")
self.threads["cur"] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
......@@ -75,10 +77,11 @@ class Application(object):
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = int(msg.cpu)
self.phase_contexts[id] = {k: getattr(msg, k) for k in
('aggregation', 'computetime',
'totaltime')}
self.phase_contexts[id]['set'] = True
self.phase_contexts[id] = {
k: getattr(msg, k)
for k in ("aggregation", "computetime", "totaltime")