...
 
Commits (20)
......@@ -9,6 +9,7 @@ six = "==1.11.0"
pyzmq = "==16.0.4"
tornado = "==4.5.3"
numpy = "*"
argparse = "*"
[dev-packages]
pytest = "*"
......
......@@ -18,6 +18,14 @@ And entering the resulting virtual environment with `pipenv shell`.
The NRM code only supports _argo-containers_ for now, so you need to install
our container piece on the system.
### Alternative - Nix usage.
These dependencies can be obtained using the nix package manager.
```
nix-shell https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz -A nodelevel.nrm
```
## Basic Usage
Launch the `daemon`, and use `cmd` to interact with it.
......
......@@ -12,7 +12,6 @@ import uuid
logger = logging.getLogger('perf-wrapper')
class PerfWrapper(object):
"""Implements middleware between the Linux perf and
......@@ -30,9 +29,10 @@ class PerfWrapper(object):
def progress_report(self, progress):
    """Publish a hardware progress sample on the downstream socket.

    The message is tagged with the ``hardware-progress`` event and carries
    the application and container identifiers held by this wrapper.

    Args:
        progress: the sample to publish; sent unchanged as ``payload``.
    """
    # Exactly one 'event' entry: a repeated key in a dict literal is
    # silently overwritten by the last occurrence, so only the value that
    # actually reaches the wire is kept.
    update = {'type': 'application',
              'event': 'hardware-progress',
              'payload': progress,
              'uuid': self.app_uuid,
              'container': self.container_uuid,
              }
    self.downstream_pub_socket.send_json(update)
......@@ -49,6 +49,8 @@ class PerfWrapper(object):
# retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
# logger.error("container uuid:",str(self.container_uuid))
# logger.error("environ:",dict(os.environ))
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
......@@ -73,6 +75,9 @@ class PerfWrapper(object):
parser.add_argument("-f", "--frequency",
help="sampling frequency in ms",
type=int, default=1000)
parser.add_argument("-l", "--logfile",
help="perf-wrapper log file",
type=str, default="/tmp/argo-perf-wrapper.log")
parser.add_argument("cmd", help="command and arguments",
nargs=argparse.REMAINDER)
args = parser.parse_args()
......@@ -80,6 +85,10 @@ class PerfWrapper(object):
if args.verbose:
logger.setLevel(logging.DEBUG)
if args.logfile:
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler(args.logfile))
logger.info("cmd: %r", args.cmd)
self.setup()
......@@ -141,6 +150,6 @@ class PerfWrapper(object):
# Script entry point: configure logging, then build and run the wrapper.
if __name__ == "__main__":
# Only WARNING and above by default; the commented-out line below is the
# INFO-level alternative.
logging.basicConfig(level=logging.WARNING)
# logging.basicConfig(level=logging.INFO)
wrapper = PerfWrapper()
wrapper.main()
......@@ -6,6 +6,7 @@ import logging
import signal
import os
import nrm.messaging
import uuid
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm')
......@@ -49,7 +50,7 @@ class CommandLineInterface(object):
'path': argv.command,
'args': argv.args,
'environ': dict(environ),
'container_uuid': str(argv.ucontainername),
'container_uuid': argv.ucontainername or str(uuid.uuid4()),
}
msg = RPC_MSG['run'](**command)
# command fsm
......@@ -58,28 +59,23 @@ class CommandLineInterface(object):
erreof = False
exitmsg = None
self.client.sendmsg(msg)
# the first message tells us if we started a container or not
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'process_start']
new_container = False
if msg.type == 'start':
new_container = True
msg = self.client.recvmsg()
assert msg.type == 'process_start'
state = 'started'
while(True):
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'stdout', 'stderr', 'exit',
'process_start', 'process_exit']
assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
if msg.type == 'start':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'process_start':
if state == 'init':
state = 'started'
logger.info("process started in existing "
"container: %r""", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'stdout':
if msg.type == 'stdout':
logger.info("container msg: %r", msg)
if msg.payload == 'eof':
outeof = True
......@@ -89,7 +85,8 @@ class CommandLineInterface(object):
erreof = True
elif msg.type == 'process_exit':
logger.info("process ended: %r", msg)
break
if not new_container:
state = 'exiting'
elif msg.type == 'exit':
if state == 'started':
state = 'exiting'
......
#!/usr/bin/env python2
import argparse
import json
import sys
import nrm
import nrm.daemon
def main(argv=None):
    """Parse the daemon configuration and command line, then start it.

    Options are resolved in three layers: the built-in defaults, then the
    JSON file given with ``-c/--configuration``, then any option given
    explicitly on the command line.

    Args:
        argv: full argument vector, program name included; defaults to
            ``sys.argv``.

    Returns:
        0 after printing the defaults or after the daemon runner returns.
    """
    if argv is None:
        argv = sys.argv

    # First pass: only the options that decide where defaults come from.
    conf_parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        add_help=False
        )
    conf_parser.add_argument(
        "-c", "--configuration", metavar="FILE",
        help="Specify a config json-formatted config file that use any "
             "CLI option. Any actually provided command-line options "
             "will silently override a corresponding value from the "
             "configuration file, if it exists.")
    conf_parser.add_argument(
        "-d", "--print_defaults", action='store_true',
        help="Print the default configuration file.")
    # Parse the caller-supplied vector rather than implicitly sys.argv, so
    # the argv parameter is actually honoured.
    args, remaining_argv = conf_parser.parse_known_args(argv[1:])

    defaults = {
        "log": "/tmp/argo_daemon.log",
        "log_power": "/tmp/argo_daemon_power.log",
        "log_progress": "/tmp/argo_daemon_progress.log",
        "log_hardwareprogress": "/tmp/argo_daemon_hardwareprogress.log",
        "powerstrategy": "0",
    }

    if args.print_defaults:
        # Emit JSON so the output can be saved and passed back through -c.
        print(json.dumps(defaults, indent=4, sort_keys=True))
        return 0

    if args.configuration:
        # Close the configuration file deterministically.
        with open(args.configuration) as config_file:
            defaults.update(json.load(config_file))

    # Second pass: the real options, seeded with the merged defaults.
    parser = argparse.ArgumentParser(parents=[conf_parser])
    parser.set_defaults(**defaults)
    parser.add_argument("--log", help="Main log file.", metavar="FILE")
    parser.add_argument("--log_power",
                        help="Power data log file (High throughput).",
                        metavar="FILE")
    parser.add_argument("--log_progress", metavar="FILE")
    parser.add_argument("--log_hardwareprogress", metavar="FILE")
    parser.add_argument("--powerstrategy",
                        help="Power management strategy.",
                        metavar="STRATEGY")
    args = parser.parse_args(remaining_argv)
    nrm.daemon.runner(config=args)
    return 0
if __name__ == "__main__":
    # main() parses the configuration and starts the daemon runner itself;
    # the runner requires that configuration, so it is not invoked directly.
    sys.exit(main())
{
"log":"/tmp/daemon_log.log",
"log_power":"/tmp/daemon_power.log",
"log_progress":"/tmp/daemon_progress.log",
"log_hardwareprogress":"/tmp/daemon_hardwareprogress.log"
}
from __future__ import print_function
import logging
import time
logger = logging.getLogger('nrm')
logger_progress = logging.getLogger('progress')
logger_hardwareprogress = logging.getLogger('hardwareprogress')
class Application(object):
......@@ -18,46 +20,26 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads, phase_contexts):
def __init__(self, uuid, container, progress, hardwareprogress, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.hardwareprogress = hardwareprogress
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
transitions = self.thread_fsm_table[self.thread_state]
if event in transitions:
self.thread_state = transitions[event]
def get_allowed_thread_requests(self):
return self.thread_fsm_table[self.thread_state].keys()
def get_thread_request_impact(self, command):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
return speed
else:
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
if newth == curth:
self.do_thread_transition('noop')
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
def update_progress(self, msg):
    """Record a progress report received for this application.

    The whole message goes to the main log; a "<timestamp> <payload>" line
    goes to the progress log.
    """
    assert self.progress
    logger.info("received progress message: " + str(msg))
    timestamp, payload = time.time(), msg['payload']
    logger_progress.info("%s %s" % (timestamp, payload))
def update_hardwareprogress(self, msg):
    """Record a hardware progress report for this application.

    The whole message goes to the main log and a "<timestamp> <payload>"
    line goes to the hardware-progress log. The first report also flips
    ``self.hardwareprogress`` to True.

    Args:
        msg: decoded message; must contain a 'payload' entry.
    """
    # Worded differently from the application progress message so the two
    # kinds of report can be told apart in the main log.
    logger.info("received hardware progress message: " + str(msg))
    logger_hardwareprogress.info("%s %s" % (time.time(), msg['payload']))
    if not self.hardwareprogress:
        logger.debug("Starting to log hardware progress.")
        self.hardwareprogress = True
def update_phase_context(self, msg):
"""Update the phase contextual information."""
......@@ -80,7 +62,7 @@ class ApplicationManager(object):
uuid = msg['uuid']
container_uuid = msg['container']
progress = msg['progress']
threads = msg['threads']
hardwareprogress = None
phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier']
......@@ -91,8 +73,7 @@ class ApplicationManager(object):
phase_contexts[id]['set'] = False
else:
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
self.applications[uuid] = Application(uuid, container_uuid, progress, hardwareprogress, phase_contexts)
def delete(self, uuid):
"""Delete an application from the register."""
......
from __future__ import print_function
import logging
import time
import math
logger = logging.getLogger('nrm')
logger_power = logging.getLogger('power')
class Action(object):
......@@ -15,43 +18,6 @@ class Action(object):
self.delta = delta
class ApplicationActuator(object):
"""Actuator in charge of application thread control."""
def __init__(self, am, pubstream):
self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target):
ret = []
for identity, application in \
self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta))
return ret
def execute(self, action):
target_threads = action.target.threads
update = {'type': 'application',
'command': 'threads',
'uuid': action.target.uuid,
'event': 'threads',
}
if action.command == 'i':
payload = target_threads['cur'] + 1
elif action.command == 'd':
payload = target_threads['cur'] - 1
else:
assert False, "impossible command"
update['payload'] = payload
self.pubstream.send_json(update)
def update(self, action):
action.target.do_thread_transition(action.command)
class PowerActuator(object):
"""Actuator in charge of power control."""
......@@ -74,8 +40,44 @@ class PowerActuator(object):
return actions
def execute(self, action):
    """Apply a power-limit action through the sensor manager.

    Args:
        action: supplies ``target`` and ``command``, which are forwarded
            to ``set_powerlimit``; ``delta`` is only logged.
    """
    # A single, detailed log entry per action (command, delta and target)
    # instead of two overlapping entries for the same event.
    logger.info("changing power limit. command: %r, delta: %r, target: %r",
                action.command, action.delta, action.target)
    # sensor_manager is a SensorManager, which is not only about sensing
    # but also about setting power limits.
    self.sensor_manager.set_powerlimit(action.target, action.command)
def update(self, action):
    """Intentionally do nothing with ``action`` and return None."""
    return None
class DiscretizedPowerActuator(object):
"""Actuator in charge of power control via discretization."""
def __init__(self, sm, lowerboundwatts, k):
self.sensor_manager = sm
self.lowerboundwatts = lowerboundwatts # the minimal cpu wattage
self.k = k # the number of arms
def available_actions(self):
    """Return one Action per discretized power limit.

    The span between ``lowerboundwatts`` and the ``maxW`` of the first
    reported power limit is divided into ``k`` equal steps. The upper end
    of each step, floored to an int, becomes the command of an Action
    that targets that first entry with a delta of 0.

    Returns:
        A list of ``k`` Action objects, or an empty list when the sensor
        manager reports no power limits.
    """
    pl = self.sensor_manager.get_powerlimits()
    logger.info("BanditPowerActuator: power limits %r", pl)
    if not pl:
        # Nothing to discretize; avoid failing on the first-entry lookup.
        logger.error("BanditPowerActuator: no power limits available.")
        return []

    # Only the first reported entry is used; look its key up once.
    target = next(iter(pl))
    maxW = int(pl[target]['maxW'])
    if maxW < self.lowerboundwatts:
        # Adjacent literals instead of a backslash continuation, which
        # would splice the next line's leading whitespace into the message.
        logger.error("BanditPowerActuator: The provided power lowerbound "
                     "is higher than the available maximum CPU wattage.")

    rangeW = maxW - self.lowerboundwatts
    arms = [self.lowerboundwatts + (float(a)*rangeW/float(self.k))
            for a in range(1, self.k+1)]
    logger.info("BanditPowerActuator: discretized power limits: %r:", arms)
    return [Action(target, int(math.floor(a)), 0) for a in arms]
def execute(self, action):
    """Log the requested power limit, then pass it to the sensor manager."""
    command = action.command
    logger.info("changing power limit: %r, %r", command, action.delta)
    self.sensor_manager.set_powerlimit(action.target, command)
def update(self, action):
......@@ -86,29 +88,18 @@ class Controller(object):
"""Implements a control loop for resource management."""
def __init__(self, actuators):
def __init__(self, actuators, strategy):
self.actuators = actuators
def planify(self, target, machineinfo):
"""Plan the next action for the control loop."""
# NOTE(review): this span appears to interleave two revisions of the
# method: a direction-based search over the actuators, and a body that only
# logs power readings and returns (None, None). Confirm which one is live
# before changing any logic here.
total_power = machineinfo['energy']['power']['total']
# 'i' when total power is below the target, 'd' when it is above,
# None when they are equal.
direction = None
if total_power < target:
direction = 'i'
elif total_power > target:
direction = 'd'
if direction:
# Collect (action, actuator) pairs from every actuator.
actions = []
for act in self.actuators:
newactions = act.available_actions(direction)
actions.extend([(a, act) for a in newactions])
if actions:
# TODO: better choice
# Currently: the pair whose action has the smallest delta.
actions.sort(key=lambda x: x[0].delta)
return actions.pop(0)
else:
return (None, None)
# current_e = float(machineinfo['energy']['energy']
# ['cumulative']['package-0'])/(1000*1000) # in joules
# In joules:
# NOTE(review): current_p is assigned twice, so the 'p0' reading is
# discarded and the log line below writes the 'p1' value in both columns.
# Presumably two distinct variables were intended -- confirm.
current_p = float(machineinfo['energy']['power']['p0'])/(1000*1000)
current_p = float(machineinfo['energy']['power']['p1'])/(1000*1000)
logger_power.info("%s %s %s" % (time.time(), current_p, current_p))
return (None, None)
def execute(self, action, actuator):
"""Build the action for the appropriate manager."""
......
......@@ -2,7 +2,7 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from controller import Controller, PowerActuator, DiscretizedPowerActuator
from powerpolicy import PowerPolicyManager
from functools import partial
import json
......@@ -20,11 +20,14 @@ RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
logger = logging.getLogger('nrm')
logger_power = logging.getLogger('power')
class Daemon(object):
def __init__(self):
def __init__(self, config):
self.target = 100.0
self.config = config
self.container_owner = dict()
def do_downstream_receive(self, parts):
logger.info("receiving downstream message: %r", parts)
......@@ -42,16 +45,17 @@ class Daemon(object):
cid = msg['container']
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_threads(msg)
elif event == 'progress':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'hardware-progress':
cid = msg['container']
for app_uuid in self.application_manager.applications:
app = self.application_manager.applications[app_uuid]
if app.container_uuid == cid:
app.update_hardwareprogress(msg)
elif event == 'phase_context':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
......@@ -103,33 +107,27 @@ class Daemon(object):
'type': 'start',
'container_uuid': container_uuid,
'errno': 0 if container else -1,
'pid': pid,
'power': container.power['policy'] or dict()
}
self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
else:
update = {'api': 'up_rpc_rep',
'type': 'process_start',
'container_uuid': container_uuid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
self.container_owner[container.uuid] = client
# now deal with the process itself
update = {'api': 'up_rpc_rep',
'type': 'process_start',
'container_uuid': container_uuid,
'pid': pid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client)
# setup io callbacks
outcb = partial(self.do_children_io, client, container_uuid,
'stdout')
errcb = partial(self.do_children_io, client, container_uuid,
'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
elif msg.type == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg.container_uuid)
......@@ -171,7 +169,11 @@ class Daemon(object):
logger.info("sending sensor message: %r", msg)
def do_control(self):
logger.info(
"Asking controller to plan for target %s using machine info %s"
% (self.target, self.machine_info))
plan = self.controller.planify(self.target, self.machine_info)
logger.info("Controller chose plan " + str(plan))
action, actuator = plan
if action:
self.controller.execute(action, actuator)
......@@ -183,6 +185,8 @@ class Daemon(object):
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
elif signum == signal.SIGTERM:
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
elif signum == signal.SIGCHLD:
ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
else:
......@@ -205,16 +209,30 @@ class Daemon(object):
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
clientid = container.clientids[pid]
remaining_pids = [p for p in container.processes.keys()
if p != pid]
# first, send a process_exit
msg = {'api': 'up_rpc_rep',
'type': 'process_exit',
'status': str(status),
'container_uuid': container.uuid,
}
if not remaining_pids:
msg['type'] = 'exit'
msg['profile_data'] = dict()
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
# if this process was owner of the container,
# kill everything
if self.container_owner[container.uuid] == clientid:
# deal with container exit
msg = {'api': 'up_rpc_rep',
'type': 'exit',
'container_uuid': container.uuid,
'profile_data': dict(),
}
pp = container.power
if pp['policy']:
pp['manager'].reset_all()
......@@ -235,16 +253,7 @@ class Daemon(object):
self.container_manager.delete(container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['exit'](**msg), clientid)
else:
msg['type'] = 'process_exit'
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
del self.container_owner[container.uuid]
else:
logger.debug("child update ignored")
pass
......@@ -295,9 +304,9 @@ class Daemon(object):
self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager()
aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = PowerActuator(self.sensor_manager)
self.controller = Controller([aa, pa])
pa = PowerActuator(self.sensor_manager,
strategy=self.config['powerlevel'])
self.controller = Controller([pa])
self.sensor_manager.start()
self.machine_info = self.sensor_manager.do_update()
......@@ -311,13 +320,46 @@ class Daemon(object):
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
signal.signal(signal.SIGTERM, self.do_signal)
signal.signal(signal.SIGCHLD, self.do_signal)
ioloop.IOLoop.current().start()
def _add_file_logger(name, path):
    """Send DEBUG-and-above records of logger `name` to `path`, message only."""
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter('%(message)s'))
    data_logger = logging.getLogger(name)
    data_logger.setLevel(logging.DEBUG)
    data_logger.addHandler(handler)
    return data_logger


def runner(config):
    """Configure logging from `config`, then build and run the daemon.

    Args:
        config: object exposing ``log``, ``log_progress``,
            ``log_hardwareprogress`` and ``log_power`` attributes. Each is
            a file path, or a false value to skip that log. The same
            object is handed to the Daemon.
    """
    ioloop.install()
    logging.basicConfig(level=logging.DEBUG)

    if config.log:
        print("Logging nrm to %s" % config.log)
        logger.setLevel(logging.DEBUG)
        logger.addHandler(logging.FileHandler(config.log))

    # (logger name, label used in the console message, target file)
    data_logs = (
        ('progress', 'progress', config.log_progress),
        ('hardwareprogress', 'hardware progress',
         config.log_hardwareprogress),
        ('power', 'power', config.log_power),
    )
    for name, label, path in data_logs:
        if path:
            # Each message names its own data stream rather than always
            # announcing "progress data".
            print("Logging %s data to %s" % (label, path))
            _add_file_logger(name, path)

    # The daemon is built once, with the configuration.
    daemon = Daemon(config)
    daemon.main()
......@@ -31,7 +31,6 @@ MSGFORMATS['up_rpc_req'] = {'list': {},
}
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'errno': int,
'pid': int,
'power': dict},
'list': {'payload': list},
'stdout': {'container_uuid': basestring,
......@@ -39,9 +38,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'stderr': {'container_uuid': basestring,
'payload': basestring},
'exit': {'container_uuid': basestring,
'status': basestring,
'profile_data': dict},
'process_start': {'container_uuid': basestring},
'process_start': {'container_uuid': basestring,
'pid': int},
'process_exit': {'container_uuid': basestring,
'status': basestring},
'getpower': {'limit': basestring},
......
......@@ -4,6 +4,7 @@ import logging
import xml.etree.ElementTree
import tornado.process as process
import subprocess
import os
logger = logging.getLogger('nrm')
resources = collections.namedtuple("Resources", ["cpus", "mems"])
......@@ -42,7 +43,14 @@ class NodeOSClient(object):
def __init__(self):
    """Choose the command prefix used by this client.

    The ARGO_NODEOS_CONFIG environment variable, when present, replaces
    the default "argo_nodeos_config".
    """
    override = os.environ.get('ARGO_NODEOS_CONFIG')
    if override is None:
        logger.info("NodeOSClient: using argo_nodeos_config from path")
        self.prefix = "argo_nodeos_config"
        return
    logger.info("NodeOSClient: bypassing argo_nodeos_config with %s\n"
                % override)
    self.prefix = override
def getavailable(self):
"""Gather available resources."""
......@@ -118,6 +126,7 @@ class NodeOSClient(object):
for envname, envval in environ.items()]
cmd += " env:'"+" ".join(env)+"'"
args.append(cmd)
logger.debug(args)
return process.Subprocess(args, stdin=process.Subprocess.STREAM,
stdout=process.Subprocess.STREAM,
stderr=process.Subprocess.STREAM)
......