...
 
Commits (20)
......@@ -9,6 +9,7 @@ six = "==1.11.0"
pyzmq = "==16.0.4"
tornado = "==4.5.3"
numpy = "*"
argparse = "*"
[dev-packages]
pytest = "*"
......
......@@ -18,6 +18,14 @@ And entering the resulting virtual environment with `pipenv shell`.
The NRM code only supports _argo-containers_ for now, so you need to install
our container piece on the system.
### Alternative - Nix usage.
These dependencies can be obtained using the nix package manager.
```
nix-shell https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz -A nodelevel.nrm
```
## Basic Usage
Launch the `daemon`, and use `cmd` to interact with it.
......
......@@ -12,7 +12,6 @@ import uuid
logger = logging.getLogger('perf-wrapper')
class PerfWrapper(object):
"""Implements middleware between the Linux perf and
......@@ -30,9 +29,10 @@ class PerfWrapper(object):
def progress_report(self, progress):
    """Publish a hardware progress sample on the downstream socket.

    The message is sent as JSON and carries this wrapper's application
    and container uuids; `progress` is forwarded verbatim as the payload.
    """
    # 'event' must appear only once: with a duplicated key the dict literal
    # silently keeps the last value, so the effective event name is
    # 'hardware-progress'.
    update = {'type': 'application',
              'event': 'hardware-progress',
              'payload': progress,
              'uuid': self.app_uuid,
              'container': self.container_uuid,
              }
    self.downstream_pub_socket.send_json(update)
......@@ -49,6 +49,8 @@ class PerfWrapper(object):
# retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
# logger.error("container uuid:",str(self.container_uuid))
# logger.error("environ:",dict(os.environ))
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
......@@ -73,6 +75,9 @@ class PerfWrapper(object):
parser.add_argument("-f", "--frequency",
help="sampling frequency in ms",
type=int, default=1000)
parser.add_argument("-l", "--logfile",
help="perf-wrapper log file",
type=str, default="/tmp/argo-perf-wrapper.log")
parser.add_argument("cmd", help="command and arguments",
nargs=argparse.REMAINDER)
args = parser.parse_args()
......@@ -80,6 +85,10 @@ class PerfWrapper(object):
if args.verbose:
logger.setLevel(logging.DEBUG)
if args.logfile:
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler(args.logfile))
logger.info("cmd: %r", args.cmd)
self.setup()
......@@ -141,6 +150,6 @@ class PerfWrapper(object):
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING)
# logging.basicConfig(level=logging.INFO)
wrapper = PerfWrapper()
wrapper.main()
......@@ -6,6 +6,7 @@ import logging
import signal
import os
import nrm.messaging
import uuid
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm')
......@@ -49,7 +50,7 @@ class CommandLineInterface(object):
'path': argv.command,
'args': argv.args,
'environ': dict(environ),
'container_uuid': str(argv.ucontainername),
'container_uuid': argv.ucontainername or str(uuid.uuid4()),
}
msg = RPC_MSG['run'](**command)
# command fsm
......@@ -58,28 +59,23 @@ class CommandLineInterface(object):
erreof = False
exitmsg = None
self.client.sendmsg(msg)
# the first message tells us if we started a container or not
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'process_start']
new_container = False
if msg.type == 'start':
new_container = True
msg = self.client.recvmsg()
assert msg.type == 'process_start'
state = 'started'
while(True):
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'stdout', 'stderr', 'exit',
'process_start', 'process_exit']
assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
if msg.type == 'start':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'process_start':
if state == 'init':
state = 'started'
logger.info("process started in existing "
"container: %r""", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'stdout':
if msg.type == 'stdout':
logger.info("container msg: %r", msg)
if msg.payload == 'eof':
outeof = True
......@@ -89,7 +85,8 @@ class CommandLineInterface(object):
erreof = True
elif msg.type == 'process_exit':
logger.info("process ended: %r", msg)
break
if not new_container:
state = 'exiting'
elif msg.type == 'exit':
if state == 'started':
state = 'exiting'
......
#!/usr/bin/env python2
import argparse
import json
import sys
import nrm
import nrm.daemon
def main(argv=None):
    """Parse the daemon configuration and start the NRM daemon.

    Settings are resolved in three layers, each overriding the previous
    one: the built-in defaults, the JSON file given with
    -c/--configuration, and the options given explicitly on the command
    line.

    `argv` follows the sys.argv convention (argv[0] is the program name)
    and defaults to sys.argv. Returns 0, suitable for sys.exit().
    """
    if argv is None:
        argv = sys.argv

    # First pass: only pick up the options that influence the defaults and
    # leave everything else untouched for the full parser below.
    conf_parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        add_help=False
    )
    conf_parser.add_argument(
        "-c", "--configuration",
        help="Specify a config json-formatted config file that use any "
             "CLI option. Any actually provided command-line options "
             "will silently override a corresponding value from the "
             "configuration file, if it exists.",
        metavar="FILE")
    conf_parser.add_argument("-d", "--print_defaults", action='store_true',
                             help="Print the default configuration file.")
    # Parse the argv we were given rather than implicitly re-reading
    # sys.argv, so that main() can be driven programmatically.
    args, remaining_argv = conf_parser.parse_known_args(argv[1:])

    defaults = {"log": "/tmp/argo_daemon.log",
                "log_power": "/tmp/argo_daemon_power.log",
                "log_progress": "/tmp/argo_daemon_progress.log",
                "log_hardwareprogress": "/tmp/argo_daemon_hardwareprogress.log",
                "powerstrategy": "0"}

    if args.print_defaults:
        # Emit real JSON so the output can be saved and passed back through
        # --configuration (a dict repr is not valid JSON).
        print(json.dumps(defaults, indent=4, sort_keys=True))
        return 0

    if args.configuration:
        # Close the configuration file deterministically.
        with open(args.configuration) as config_file:
            defaults.update(json.load(config_file))

    # Second pass: the full parser inherits -c/-d from conf_parser and uses
    # the merged defaults for every option not given on the command line.
    parser = argparse.ArgumentParser(parents=[conf_parser])
    parser.set_defaults(**defaults)
    parser.add_argument("--log", help="Main log file.", metavar="FILE")
    parser.add_argument("--log_power",
                        help="Power data log file (High throughput).",
                        metavar="FILE")
    parser.add_argument("--log_progress", metavar="FILE")
    parser.add_argument("--log_hardwareprogress", metavar="FILE")
    parser.add_argument("--powerstrategy",
                        help="Power management strategy.",
                        metavar="STRATEGY")
    args = parser.parse_args(remaining_argv)
    nrm.daemon.runner(config=args)
    return 0
if __name__ == "__main__":
    # Always go through main() so the daemon starts with a parsed
    # configuration; calling nrm.daemon.runner() directly here would start
    # it without one.
    sys.exit(main())
{
"log":"/tmp/daemon_log.log",
"log_power":"/tmp/daemon_power.log",
"log_progress":"/tmp/daemon_progress.log",
"log_hardwareprogress":"/tmp/daemon_hardwareprogress.log"
}
from __future__ import print_function
import logging
import time
logger = logging.getLogger('nrm')
logger_progress = logging.getLogger('progress')
logger_hardwareprogress = logging.getLogger('hardwareprogress')
class Application(object):
......@@ -18,46 +20,26 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads, phase_contexts):
def __init__(self, uuid, container, progress, hardwareprogress, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.hardwareprogress = hardwareprogress
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
transitions = self.thread_fsm_table[self.thread_state]
if event in transitions:
self.thread_state = transitions[event]
def get_allowed_thread_requests(self):
return self.thread_fsm_table[self.thread_state].keys()
def get_thread_request_impact(self, command):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
return speed
else:
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
if newth == curth:
self.do_thread_transition('noop')
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
assert self.progress
logger.info("received progress message: "+str(msg))
logger_progress.info("%s %s" % (time.time(),msg['payload']))
def update_hardwareprogress(self, msg):
"""Update the progress tracking."""
logger.info("received progress message: "+str(msg))
logger_hardwareprogress.info("%s %s" % (time.time(),msg['payload']))
if not self.hardwareprogress:
logger.debug("Starting to log hardware progress.")
self.hardwareprogress = True
def update_phase_context(self, msg):
"""Update the phase contextual information."""
......@@ -80,7 +62,7 @@ class ApplicationManager(object):
uuid = msg['uuid']
container_uuid = msg['container']
progress = msg['progress']
threads = msg['threads']
hardwareprogress = None
phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier']
......@@ -91,8 +73,7 @@ class ApplicationManager(object):
phase_contexts[id]['set'] = False
else:
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
self.applications[uuid] = Application(uuid, container_uuid, progress, hardwareprogress, phase_contexts)
def delete(self, uuid):
"""Delete an application from the register."""
......
from __future__ import print_function
import logging
import time
import math
logger = logging.getLogger('nrm')
logger_power = logging.getLogger('power')
class Action(object):
......@@ -15,43 +18,6 @@ class Action(object):
self.delta = delta
class ApplicationActuator(object):

    """Actuator that drives the thread count of registered applications."""

    def __init__(self, am, pubstream):
        """Keep the application manager and the stream used to publish."""
        self.pubstream = pubstream
        self.application_manager = am

    def available_actions(self, target):
        """Return one Action per application that accepts `target`.

        Each action carries the impact the application reports for that
        thread request.
        """
        registered = self.application_manager.applications
        return [Action(app, target, app.get_thread_request_impact(target))
                for _, app in registered.iteritems()
                if target in app.get_allowed_thread_requests()]

    def execute(self, action):
        """Publish the thread-count request described by `action`.

        'i' asks for one thread more than the target currently has, 'd'
        for one fewer; any other command trips an assertion.
        """
        current = action.target.threads
        message = {'type': 'application',
                   'command': 'threads',
                   'uuid': action.target.uuid,
                   'event': 'threads',
                   }
        if action.command == 'i':
            step = 1
        elif action.command == 'd':
            step = -1
        else:
            assert False, "impossible command"
        message['payload'] = current['cur'] + step
        self.pubstream.send_json(message)

    def update(self, action):
        """Advance the target's thread state machine with the command."""
        target = action.target
        target.do_thread_transition(action.command)
class PowerActuator(object):
"""Actuator in charge of power control."""
......@@ -74,8 +40,44 @@ class PowerActuator(object):
return actions
def execute(self, action):
    """Apply the power limit carried by `action`.

    action.target and action.command are forwarded to the sensor
    manager's set_powerlimit().
    """
    # Log the change once, with the target included, instead of emitting
    # two near-identical records for the same event.
    logger.info("changing power limit. command: %r, delta: %r, target: %r",
                action.command, action.delta, action.target)
    # sensor_manager is a SensorManager, which is not only about sensing
    # but also about setting power limits.
    self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
pass
class DiscretizedPowerActuator(object):
"""Actuator in charge of power control via discretization."""
def __init__(self, sm, lowerboundwatts, k):
self.sensor_manager = sm
self.lowerboundwatts = lowerboundwatts # the minimal cpu wattage
self.k = k # the number of arms
def available_actions(self):
    """Return one Action per discretized power limit ("arm").

    The interval between self.lowerboundwatts and the 'maxW' value of the
    first entry returned by get_powerlimits() is split into self.k equal
    steps. The upper bound of each step, floored to an integer, becomes
    the command of an Action targeting that entry (with a delta of 0).
    Returns an empty list when no power limit is reported.
    """
    pl = self.sensor_manager.get_powerlimits()
    logger.info("BanditPowerActuator: power limits %r", pl)
    if not pl:
        # Nothing to discretize; avoid failing on an empty mapping.
        return []
    # Only the first reported entry is controlled: look its key up once
    # instead of building a throwaway key list for every use.
    target = next(iter(pl))
    maxW = int(pl[target]['maxW'])
    if maxW < self.lowerboundwatts:
        # Reported but not fatal: the arms are still computed below.
        logger.error("BanditPowerActuator: The provided power lowerbound "
                     "is higher than the available maximum CPU wattage.")
    rangeW = maxW - self.lowerboundwatts
    arms = [self.lowerboundwatts + (float(a)*rangeW/float(self.k))
            for a in range(1, self.k+1)]
    logger.info("BanditPowerActuator: discretized power limits: %r:", arms)
    return [Action(target, int(math.floor(a)), 0) for a in arms]
def execute(self, action):
logger.info("changing power limit: %r, %r",
action.command, action.delta)
self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
......@@ -86,29 +88,18 @@ class Controller(object):
"""Implements a control loop for resource management."""
def __init__(self, actuators):
def __init__(self, actuators, strategy):
self.actuators = actuators
def planify(self, target, machineinfo):
"""Plan the next action for the control loop."""
# Compare the measured total power with the target to pick a direction:
# 'i' when below the target, 'd' when above, None when they are equal.
total_power = machineinfo['energy']['power']['total']
direction = None
if total_power < target:
direction = 'i'
elif total_power > target:
direction = 'd'
if direction:
# Gather the candidate actions of every actuator for that direction,
# remembering which actuator offered each one.
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
# Candidates are ordered by ascending delta and the first one is returned
# as an (action, actuator) pair.
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
# current_e = float(machineinfo['energy']['energy']
# ['cumulative']['package-0'])/(1000*1000) # in joules
# In joules:
# NOTE(review): both reads below bind the same name, so the 'p0' value is
# overwritten by the 'p1' value and the log record reports 'p1' twice --
# confirm whether two separate variables were intended.
current_p = float(machineinfo['energy']['power']['p0'])/(1000*1000)
current_p = float(machineinfo['energy']['power']['p1'])/(1000*1000)
logger_power.info("%s %s %s" % (time.time(), current_p, current_p))
return (None, None)
def execute(self, action, actuator):
"""Build the action for the appropriate manager."""
......
This diff is collapsed.
......@@ -31,7 +31,6 @@ MSGFORMATS['up_rpc_req'] = {'list': {},
}
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'errno': int,
'pid': int,
'power': dict},
'list': {'payload': list},
'stdout': {'container_uuid': basestring,
......@@ -39,9 +38,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'stderr': {'container_uuid': basestring,
'payload': basestring},
'exit': {'container_uuid': basestring,
'status': basestring,
'profile_data': dict},
'process_start': {'container_uuid': basestring},
'process_start': {'container_uuid': basestring,
'pid': int},
'process_exit': {'container_uuid': basestring,
'status': basestring},
'getpower': {'limit': basestring},
......
......@@ -4,6 +4,7 @@ import logging
import xml.etree.ElementTree
import tornado.process as process
import subprocess
import os
logger = logging.getLogger('nrm')
resources = collections.namedtuple("Resources", ["cpus", "mems"])
......@@ -42,7 +43,14 @@ class NodeOSClient(object):
def __init__(self):
    """Load client configuration.

    self.prefix is the command used to invoke argo_nodeos_config: the
    value of the ARGO_NODEOS_CONFIG environment variable when it is set,
    the bare command name otherwise.
    """
    # Single environment lookup; the prefix is assigned exactly once on
    # each branch instead of being preset and then overwritten.
    override = os.environ.get('ARGO_NODEOS_CONFIG')
    if override is not None:
        logger.info("NodeOSClient: bypassing argo_nodeos_config with %s\n",
                    override)
        self.prefix = override
    else:
        logger.info("NodeOSClient: using argo_nodeos_config from path")
        self.prefix = "argo_nodeos_config"
def getavailable(self):
"""Gather available resources."""
......@@ -118,6 +126,7 @@ class NodeOSClient(object):
for envname, envval in environ.items()]
cmd += " env:'"+" ".join(env)+"'"
args.append(cmd)
logger.debug(args)
return process.Subprocess(args, stdin=process.Subprocess.STREAM,
stdout=process.Subprocess.STREAM,
stderr=process.Subprocess.STREAM)
......