...
 
Commits (20)
...@@ -9,6 +9,7 @@ six = "==1.11.0" ...@@ -9,6 +9,7 @@ six = "==1.11.0"
pyzmq = "==16.0.4" pyzmq = "==16.0.4"
tornado = "==4.5.3" tornado = "==4.5.3"
numpy = "*" numpy = "*"
argparse = "*"
[dev-packages] [dev-packages]
pytest = "*" pytest = "*"
......
...@@ -18,6 +18,14 @@ And entering the resulting virtual environment with `pipenv shell`. ...@@ -18,6 +18,14 @@ And entering the resulting virtual environment with `pipenv shell`.
The NRM code only supports _argo-containers_ for now, so you need to install The NRM code only supports _argo-containers_ for now, so you need to install
our container piece on the system. our container piece on the system.
### Alternative - Nix usage.
These dependencies can be obtained using the nix package manager.
```
nix-shell https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz -A nodelevel.nrm
```
## Basic Usage ## Basic Usage
Launch the `daemon`, and use `cmd` to interact with it. Launch the `daemon`, and use `cmd` to interact with it.
......
...@@ -12,7 +12,6 @@ import uuid ...@@ -12,7 +12,6 @@ import uuid
logger = logging.getLogger('perf-wrapper') logger = logging.getLogger('perf-wrapper')
class PerfWrapper(object): class PerfWrapper(object):
"""Implements middleware between the Linux perf and """Implements middleware between the Linux perf and
...@@ -30,9 +29,10 @@ class PerfWrapper(object): ...@@ -30,9 +29,10 @@ class PerfWrapper(object):
def progress_report(self, progress): def progress_report(self, progress):
update = {'type': 'application', update = {'type': 'application',
'event': 'progress', 'event': 'hardware-progress',
'payload': progress, 'payload': progress,
'uuid': self.app_uuid, 'uuid': self.app_uuid,
'container': self.container_uuid,
} }
self.downstream_pub_socket.send_json(update) self.downstream_pub_socket.send_json(update)
...@@ -49,6 +49,8 @@ class PerfWrapper(object): ...@@ -49,6 +49,8 @@ class PerfWrapper(object):
# retrieve our container uuid # retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID') self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
# logger.error("container uuid:",str(self.container_uuid))
# logger.error("environ:",dict(os.environ))
if self.container_uuid is None: if self.container_uuid is None:
logger.error("missing container uuid") logger.error("missing container uuid")
exit(1) exit(1)
...@@ -73,6 +75,9 @@ class PerfWrapper(object): ...@@ -73,6 +75,9 @@ class PerfWrapper(object):
parser.add_argument("-f", "--frequency", parser.add_argument("-f", "--frequency",
help="sampling frequency in ms", help="sampling frequency in ms",
type=int, default=1000) type=int, default=1000)
parser.add_argument("-l", "--logfile",
help="perf-wrapper log file",
type=str, default="/tmp/argo-perf-wrapper.log")
parser.add_argument("cmd", help="command and arguments", parser.add_argument("cmd", help="command and arguments",
nargs=argparse.REMAINDER) nargs=argparse.REMAINDER)
args = parser.parse_args() args = parser.parse_args()
...@@ -80,6 +85,10 @@ class PerfWrapper(object): ...@@ -80,6 +85,10 @@ class PerfWrapper(object):
if args.verbose: if args.verbose:
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
if args.logfile:
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler(args.logfile))
logger.info("cmd: %r", args.cmd) logger.info("cmd: %r", args.cmd)
self.setup() self.setup()
...@@ -141,6 +150,6 @@ class PerfWrapper(object): ...@@ -141,6 +150,6 @@ class PerfWrapper(object):
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING) # logging.basicConfig(level=logging.INFO)
wrapper = PerfWrapper() wrapper = PerfWrapper()
wrapper.main() wrapper.main()
...@@ -6,6 +6,7 @@ import logging ...@@ -6,6 +6,7 @@ import logging
import signal import signal
import os import os
import nrm.messaging import nrm.messaging
import uuid
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req'] RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
...@@ -49,7 +50,7 @@ class CommandLineInterface(object): ...@@ -49,7 +50,7 @@ class CommandLineInterface(object):
'path': argv.command, 'path': argv.command,
'args': argv.args, 'args': argv.args,
'environ': dict(environ), 'environ': dict(environ),
'container_uuid': str(argv.ucontainername), 'container_uuid': argv.ucontainername or str(uuid.uuid4()),
} }
msg = RPC_MSG['run'](**command) msg = RPC_MSG['run'](**command)
# command fsm # command fsm
...@@ -58,28 +59,23 @@ class CommandLineInterface(object): ...@@ -58,28 +59,23 @@ class CommandLineInterface(object):
erreof = False erreof = False
exitmsg = None exitmsg = None
self.client.sendmsg(msg) self.client.sendmsg(msg)
# the first message tells us if we started a container or not
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'process_start']
new_container = False
if msg.type == 'start':
new_container = True
msg = self.client.recvmsg()
assert msg.type == 'process_start'
state = 'started'
while(True): while(True):
msg = self.client.recvmsg() msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep' assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'stdout', 'stderr', 'exit', assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
'process_start', 'process_exit']
if msg.type == 'start': if msg.type == 'stdout':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'process_start':
if state == 'init':
state = 'started'
logger.info("process started in existing "
"container: %r""", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'stdout':
logger.info("container msg: %r", msg) logger.info("container msg: %r", msg)
if msg.payload == 'eof': if msg.payload == 'eof':
outeof = True outeof = True
...@@ -89,7 +85,8 @@ class CommandLineInterface(object): ...@@ -89,7 +85,8 @@ class CommandLineInterface(object):
erreof = True erreof = True
elif msg.type == 'process_exit': elif msg.type == 'process_exit':
logger.info("process ended: %r", msg) logger.info("process ended: %r", msg)
break if not new_container:
state = 'exiting'
elif msg.type == 'exit': elif msg.type == 'exit':
if state == 'started': if state == 'started':
state = 'exiting' state = 'exiting'
......
#!/usr/bin/env python2 #!/usr/bin/env python2
import argparse
import json
import sys
import nrm import nrm
import nrm.daemon import nrm.daemon
def main(argv=None):
    """Parse the daemon command line and start the NRM daemon.

    Option values are resolved in three layers, lowest priority first:
    the built-in defaults, the JSON file named by -c/--configuration, and
    the options given explicitly on the command line.

    Args:
        argv: full argument vector, program name in position 0.
            Defaults to sys.argv.

    Returns:
        0, either right after printing the defaults (-d/--print_defaults)
        or once nrm.daemon.runner has returned.
    """
    if argv is None:
        argv = sys.argv

    # First pass: only recognise the options that change how the remaining
    # ones are interpreted. add_help is disabled so that -h/--help falls
    # through to the full parser built further down.
    conf_parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        add_help=False,
    )
    conf_parser.add_argument(
        "-c", "--configuration",
        help="Specify a config json-formatted config file that use any "
             "CLI option. Any actually provided command-line options "
             "will silently override a corresponding value from the "
             "configuration file, if it exists.",
        metavar="FILE")
    conf_parser.add_argument(
        "-d", "--print_defaults", action='store_true',
        help="Print the default configuration file.")
    # Parse the vector we were handed (minus the program name) instead of
    # implicitly falling back to sys.argv, so the argv parameter is honoured.
    args, remaining_argv = conf_parser.parse_known_args(argv[1:])

    defaults = {
        "log": "/tmp/argo_daemon.log",
        "log_power": "/tmp/argo_daemon_power.log",
        "log_progress": "/tmp/argo_daemon_progress.log",
        "log_hardwareprogress": "/tmp/argo_daemon_hardwareprogress.log",
        "powerstrategy": "0",
    }

    if args.print_defaults:
        # Emit real JSON so the output can be saved and passed back through
        # -c/--configuration; a dict repr is not loadable by json.load.
        print(json.dumps(defaults, indent=2, sort_keys=True))
        return 0

    if args.configuration:
        # Close the configuration file deterministically after reading it.
        with open(args.configuration) as config_file:
            defaults.update(json.load(config_file))

    # Second pass: the full parser. Values from the configuration file are
    # installed as defaults, so explicit command-line options still win.
    parser = argparse.ArgumentParser(parents=[conf_parser])
    parser.set_defaults(**defaults)
    parser.add_argument("--log", help="Main log file.", metavar="FILE")
    parser.add_argument("--log_power",
                        help="Power data log file (High throughput).",
                        metavar="FILE")
    parser.add_argument("--log_progress", metavar="FILE")
    parser.add_argument("--log_hardwareprogress", metavar="FILE")
    parser.add_argument("--powerstrategy",
                        help="Power management strategy.",
                        metavar="STRATEGY")
    args = parser.parse_args(remaining_argv)

    nrm.daemon.runner(config=args)
    return 0
if __name__ == "__main__": if __name__ == "__main__":
nrm.daemon.runner() sys.exit(main())
{
"log":"/tmp/daemon_log.log",
"log_power":"/tmp/daemon_power.log",
"log_progress":"/tmp/daemon_progress.log",
"log_hardwareprogress":"/tmp/daemon_hardwareprogress.log"
}
from __future__ import print_function from __future__ import print_function
import logging import logging
import time
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
logger_progress = logging.getLogger('progress')
logger_hardwareprogress = logging.getLogger('hardwareprogress')
class Application(object): class Application(object):
...@@ -18,46 +20,26 @@ class Application(object): ...@@ -18,46 +20,26 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'}, 'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}} 'noop': {}}
def __init__(self, uuid, container, progress, threads, phase_contexts): def __init__(self, uuid, container, progress, hardwareprogress, phase_contexts):
self.uuid = uuid self.uuid = uuid
self.container_uuid = container self.container_uuid = container
self.progress = progress self.progress = progress
self.threads = threads self.hardwareprogress = hardwareprogress
self.thread_state = 'stable'
self.phase_contexts = phase_contexts self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
transitions = self.thread_fsm_table[self.thread_state]
if event in transitions:
self.thread_state = transitions[event]
def get_allowed_thread_requests(self):
return self.thread_fsm_table[self.thread_state].keys()
def get_thread_request_impact(self, command):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
return speed
else:
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
if newth == curth:
self.do_thread_transition('noop')
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
def update_progress(self, msg): def update_progress(self, msg):
"""Update the progress tracking.""" """Update the progress tracking."""
assert self.progress assert self.progress
logger.info("received progress message: "+str(msg))
logger_progress.info("%s %s" % (time.time(),msg['payload']))
def update_hardwareprogress(self, msg):
    """Log a hardware progress report and mark the application as reporting.

    The whole message goes to the main log; a "<timestamp> <payload>" line
    goes to the dedicated hardware progress log. On the first report the
    hardwareprogress flag is switched on.
    """
    logger.info("received progress message: %s" % (msg,))
    received_at = time.time()
    logger_hardwareprogress.info("%s %s" % (received_at, msg['payload']))
    if self.hardwareprogress:
        # Already flagged by an earlier report; nothing more to do.
        return
    logger.debug("Starting to log hardware progress.")
    self.hardwareprogress = True
def update_phase_context(self, msg): def update_phase_context(self, msg):
"""Update the phase contextual information.""" """Update the phase contextual information."""
...@@ -80,7 +62,7 @@ class ApplicationManager(object): ...@@ -80,7 +62,7 @@ class ApplicationManager(object):
uuid = msg['uuid'] uuid = msg['uuid']
container_uuid = msg['container'] container_uuid = msg['container']
progress = msg['progress'] progress = msg['progress']
threads = msg['threads'] hardwareprogress = None
phase_contexts = dict() phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute', phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier'] 'startbarrier', 'endbarrier']
...@@ -91,8 +73,7 @@ class ApplicationManager(object): ...@@ -91,8 +73,7 @@ class ApplicationManager(object):
phase_contexts[id]['set'] = False phase_contexts[id]['set'] = False
else: else:
phase_contexts = None phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress, self.applications[uuid] = Application(uuid, container_uuid, progress, hardwareprogress, phase_contexts)
threads, phase_contexts)
def delete(self, uuid): def delete(self, uuid):
"""Delete an application from the register.""" """Delete an application from the register."""
......
from __future__ import print_function from __future__ import print_function
import logging import logging
import time
import math
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
logger_power = logging.getLogger('power')
class Action(object): class Action(object):
...@@ -15,43 +18,6 @@ class Action(object): ...@@ -15,43 +18,6 @@ class Action(object):
self.delta = delta self.delta = delta
class ApplicationActuator(object):
"""Actuator in charge of application thread control."""
def __init__(self, am, pubstream):
self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target):
ret = []
for identity, application in \
self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta))
return ret
def execute(self, action):
target_threads = action.target.threads
update = {'type': 'application',
'command': 'threads',
'uuid': action.target.uuid,
'event': 'threads',
}
if action.command == 'i':
payload = target_threads['cur'] + 1
elif action.command == 'd':
payload = target_threads['cur'] - 1
else:
assert False, "impossible command"
update['payload'] = payload
self.pubstream.send_json(update)
def update(self, action):
action.target.do_thread_transition(action.command)
class PowerActuator(object): class PowerActuator(object):
"""Actuator in charge of power control.""" """Actuator in charge of power control."""
...@@ -74,8 +40,44 @@ class PowerActuator(object): ...@@ -74,8 +40,44 @@ class PowerActuator(object):
return actions return actions
def execute(self, action): def execute(self, action):
logger.info("changing power limit: %r, %r", action.command, logger.info("changing power limit. command: %r, delta: %r, target: %r",
action.delta) action.command, action.delta, action.target)
# sensor_manager is a SensorManager, which is not only about sensing
# but also about setting power limits.
self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
pass
class DiscretizedPowerActuator(object):
"""Actuator in charge of power control via discretization."""
def __init__(self, sm, lowerboundwatts, k):
    """Store the sensor manager and the discretization parameters.

    Args:
        sm: sensor manager; the other methods of this class call its
            get_powerlimits() and set_powerlimit().
        lowerboundwatts: lowest CPU power limit, in watts, to offer.
        k: number of discrete power levels ("arms") to generate.
    """
    self.sensor_manager = sm
    self.lowerboundwatts = lowerboundwatts # the minimal cpu wattage
    self.k = k # the number of arms
def available_actions(self):
    """Build one Action per discretized power limit.

    The span between self.lowerboundwatts and the 'maxW' value of the
    first entry returned by the sensor manager is cut into self.k equal
    steps; each step's upper end, floored to an integer, becomes the
    command of an Action targeting that first entry.

    Returns:
        A list of self.k Action objects, or an empty list when the sensor
        manager reports no power limits at all.
    """
    pl = self.sensor_manager.get_powerlimits()
    logger.info("BanditPowerActuator: power limits %r", pl)
    if not pl:
        # Nothing to act upon; avoid indexing into an empty mapping.
        logger.error("BanditPowerActuator: no power limits available.")
        return []

    # Only the first reported entry is used, both as the source of the
    # maximum wattage and as the target of every generated action.
    target = next(iter(pl))
    maxW = int(pl[target]['maxW'])
    if maxW < self.lowerboundwatts:
        # NOTE(review): only logged; the arms below are still generated
        # from a negative range, as before. Confirm whether this should
        # abort instead.
        logger.error("BanditPowerActuator: The provided power lowerbound "
                     "is higher than the available maximum CPU wattage.")

    rangeW = maxW - self.lowerboundwatts
    arms = [self.lowerboundwatts + (float(a)*rangeW/float(self.k))
            for a in range(1, self.k+1)]
    logger.info("BanditPowerActuator: discretized power limits: %r:", arms)
    return [Action(target, int(math.floor(a)), 0) for a in arms]
def execute(self, action):
logger.info("changing power limit: %r, %r",
action.command, action.delta)
self.sensor_manager.set_powerlimit(action.target, action.command) self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action): def update(self, action):
...@@ -86,29 +88,18 @@ class Controller(object): ...@@ -86,29 +88,18 @@ class Controller(object):
"""Implements a control loop for resource management.""" """Implements a control loop for resource management."""
def __init__(self, actuators): def __init__(self, actuators, strategy):
self.actuators = actuators self.actuators = actuators
def planify(self, target, machineinfo): def planify(self, target, machineinfo):
"""Plan the next action for the control loop.""" """Plan the next action for the control loop."""
total_power = machineinfo['energy']['power']['total'] # current_e = float(machineinfo['energy']['energy']
direction = None # ['cumulative']['package-0'])/(1000*1000) # in joules
if total_power < target: # In joules:
direction = 'i' current_p = float(machineinfo['energy']['power']['p0'])/(1000*1000)
elif total_power > target: current_p = float(machineinfo['energy']['power']['p1'])/(1000*1000)
direction = 'd' logger_power.info("%s %s %s" % (time.time(), current_p, current_p))
return (None, None)
if direction:
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
def execute(self, action, actuator): def execute(self, action, actuator):
"""Build the action for the appropriate manager.""" """Build the action for the appropriate manager."""
......
This diff is collapsed.
...@@ -31,7 +31,6 @@ MSGFORMATS['up_rpc_req'] = {'list': {}, ...@@ -31,7 +31,6 @@ MSGFORMATS['up_rpc_req'] = {'list': {},
} }
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring, MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'errno': int, 'errno': int,
'pid': int,
'power': dict}, 'power': dict},
'list': {'payload': list}, 'list': {'payload': list},
'stdout': {'container_uuid': basestring, 'stdout': {'container_uuid': basestring,
...@@ -39,9 +38,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring, ...@@ -39,9 +38,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'stderr': {'container_uuid': basestring, 'stderr': {'container_uuid': basestring,
'payload': basestring}, 'payload': basestring},
'exit': {'container_uuid': basestring, 'exit': {'container_uuid': basestring,
'status': basestring,
'profile_data': dict}, 'profile_data': dict},
'process_start': {'container_uuid': basestring}, 'process_start': {'container_uuid': basestring,
'pid': int},
'process_exit': {'container_uuid': basestring, 'process_exit': {'container_uuid': basestring,
'status': basestring}, 'status': basestring},
'getpower': {'limit': basestring}, 'getpower': {'limit': basestring},
......
...@@ -4,6 +4,7 @@ import logging ...@@ -4,6 +4,7 @@ import logging
import xml.etree.ElementTree import xml.etree.ElementTree
import tornado.process as process import tornado.process as process
import subprocess import subprocess
import os
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
resources = collections.namedtuple("Resources", ["cpus", "mems"]) resources = collections.namedtuple("Resources", ["cpus", "mems"])
...@@ -42,7 +43,14 @@ class NodeOSClient(object): ...@@ -42,7 +43,14 @@ class NodeOSClient(object):
def __init__(self): def __init__(self):
"""Load client configuration.""" """Load client configuration."""
self.prefix = "argo_nodeos_config" if 'ARGO_NODEOS_CONFIG' in os.environ:
logger.info("NodeOSClient: bypassing argo_nodeos_config with %s\n"
% os.environ['ARGO_NODEOS_CONFIG'])
self.prefix = os.environ['ARGO_NODEOS_CONFIG']
else:
logger.info("NodeOSClient: using argo_nodeos_config from path")
self.prefix = "argo_nodeos_config"
def getavailable(self): def getavailable(self):
"""Gather available resources.""" """Gather available resources."""
...@@ -118,6 +126,7 @@ class NodeOSClient(object): ...@@ -118,6 +126,7 @@ class NodeOSClient(object):
for envname, envval in environ.items()] for envname, envval in environ.items()]
cmd += " env:'"+" ".join(env)+"'" cmd += " env:'"+" ".join(env)+"'"
args.append(cmd) args.append(cmd)
logger.debug(args)
return process.Subprocess(args, stdin=process.Subprocess.STREAM, return process.Subprocess(args, stdin=process.Subprocess.STREAM,
stdout=process.Subprocess.STREAM, stdout=process.Subprocess.STREAM,
stderr=process.Subprocess.STREAM) stderr=process.Subprocess.STREAM)
......