Commit a5900178 authored by Valentin Reis's avatar Valentin Reis

Merge branch 'cmd-listen-fixes' into 'master'

Fixes for cmd listen on power

Closes #26

See merge request !42
parents af621b32 9b222f2a
Pipeline #4866 passed with stages
in 2 minutes and 22 seconds
stages:
- build
- test
- style
......@@ -13,7 +14,6 @@ py.test:
tags:
- rapl
flake8:
stage: style
script:
......@@ -23,29 +23,13 @@ flake8:
- /^wip\/.*/
- /^WIP\/.*/
helloworld.integration.test:
stage: test
script:
- nix-shell .integration.nix --run "argotk.hs helloworld"
artifacts:
paths:
- _output/cmd_err.log
- _output/cmd_out.log
- _output/daemon_out.log
- _output/daemon_out.log
- _output/nrm.log
- _output/.argo_nodeos_config_exit_message
expire_in: 1 week
except:
- /^wip\/.*/
- /^WIP\/.*/
.nix-build:
stage: build
tags:
- integration
perfwrapper.integration.test:
.nix-integration:
stage: test
script:
- nix-shell .integration.nix --run "argotk.hs perfwrapper"
artifacts:
paths:
- _output/cmd_err.log
......@@ -60,3 +44,45 @@ perfwrapper.integration.test:
- /^WIP\/.*/
tags:
- integration
containers.build:
extends: .nix-build
script:
- CACHE=$(mktemp -d --suffix=nixcache /tmp/deletable-nix-cache-XXXX)
- XDG_CACHE_HOME=$CACHE nix-build .nix -A containers
- rm -rf $CACHE
libnrm.build:
extends: .nix-build
script:
- CACHE=$(mktemp -d --suffix=nixcache /tmp/deletable-nix-cache-XXXX)
- XDG_CACHE_HOME=$CACHE nix-build .nix -A libnrm
- rm -rf $CACHE
nrm.build:
extends: .nix-build
script:
- CACHE=$(mktemp -d --suffix=nixcache /tmp/deletable-nix-cache-XXXX)
- XDG_CACHE_HOME=$CACHE nix-build .nix -A nrm
- rm -rf $CACHE
TestHello.test:
extends: .nix-integration
script:
- CACHE=$(mktemp -d --suffix=nixcache /tmp/deletable-nix-cache-XXXX)
- XDG_CACHE_HOME=$CACHE nix-shell .nix -A test --run "argotk.hs TestHello"
- rm -rf $CACHE
TestListen.test:
extends: .nix-integration
script:
- CACHE=$(mktemp -d --suffix=nixcache /tmp/deletable-nix-cache-XXXX)
- XDG_CACHE_HOME=$CACHE nix-shell .nix -A test --run "argotk.hs TestListen"
- rm -rf $CACHE
TestPerfwrapper.test:
extends: .nix-integration
script:
- CACHE=$(mktemp -d --suffix=nixcache /tmp/deletable-nix-cache-XXXX)
- XDG_CACHE_HOME=$CACHE nix-shell .nix -A test --run "argotk.hs TestPerfwrapper"
- rm -rf $CACHE
let argotest =
builtins.fetchTarball
"https://xgitlab.cels.anl.gov/argo/argotest/-/archive/master/argotest-master.tar.gz";
in import "${argotest}/test.nix" {
nrm-override = ./..;
testName = "base";
}
......@@ -30,12 +30,14 @@ class PerfWrapper(object):
msg = PUB_MSG['application_exit'](**update)
self.downstream_event.sendmsg(msg)
def progress_report(self, progress):
def performance_report(self, performance):
update = {'api': 'down_event',
'type': 'progress',
'payload': progress,
'type': 'performance',
'payload': performance,
'container_uuid' : self.container_uuid,
'application_uuid' : self.app_uuid,
}
msg = PUB_MSG['progress'](**update)
msg = PUB_MSG['performance'](**update)
self.downstream_event.sendmsg(msg)
def setup(self):
......@@ -123,7 +125,7 @@ class PerfWrapper(object):
ips = int(instructions / (time - last_time))
logger.info("instructions per second: %r", ips)
self.progress_report(ips)
self.performance_report(ips)
last_time = time
......
......@@ -61,10 +61,10 @@ class CommandLineInterface(object):
msg = self.pub_client.recvmsg()
if argv.uuid:
uuid = getattr(msg, 'container_uuid', None)
if argv.container == uuid:
logger.info("pub message", msg)
if argv.uuid == uuid:
logger.info("pub message: %s", msg)
else:
logger.info("pub message", msg)
logger.info("pub message: %s", msg)
def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command
......@@ -118,13 +118,17 @@ class CommandLineInterface(object):
assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
if msg.type == 'stdout':
logger.info("container msg: %r", msg)
logger.debug("container msg: %r", msg)
if msg.payload == 'eof':
outeof = True
else:
print(msg.payload)
elif msg.type == 'stderr':
logger.info("container msg: %r", msg)
logger.debug("container msg: %r", msg)
if msg.payload == 'eof':
erreof = True
else:
print(msg.payload, file=sys.stderr)
elif msg.type == 'process_exit':
logger.info("process ended: %r", msg)
state = 'exiting'
......
......@@ -59,6 +59,9 @@ class Application(object):
"""Update the progress tracking."""
assert self.progress
def update_performance(self, msg):
"""Update the progress tracking."""
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = msg.cpu
......
......@@ -34,10 +34,27 @@ class Daemon(object):
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif msg.type == 'progress':
uuid = msg.container_uuid
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
if msg.application_uuid in self.application_manager.applications:
app = self.application_manager.applications[
msg.application_uuid]
app.update_performance(msg)
pub = {'api': 'up_pub',
'type': 'progress',
'payload': msg.payload,
'container_uuid': msg.container_uuid}
self.upstream_pub_server.sendmsg(
PUB_MSG['progress'](**pub))
elif msg.type == 'performance':
if msg.application_uuid in self.application_manager.applications:
app = self.application_manager.applications[
msg.application_uuid]
app.update_performance(msg)
pub = {'api': 'up_pub',
'type': 'performance',
'payload': msg.payload,
'container_uuid': msg.container_uuid}
self.upstream_pub_server.sendmsg(
PUB_MSG['performance'](**pub))
elif msg.type == 'phase_context':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
......
......@@ -46,13 +46,22 @@ MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float},
'power': dict},
'container_exit': {'container_uuid': basestring,
'profile_data': dict},
'performance': {'container_uuid': basestring,
'payload': int},
'progress': {'application_uuid': basestring,
'payload': int},
}
MSGFORMATS['down_event'] = {'application_start':
{'container_uuid': basestring,
'application_uuid': basestring},
'application_exit':
{'application_uuid': basestring},
'progress': {'payload': int},
'performance': {'payload': int,
'application_uuid': basestring,
'container_uuid': basestring},
'progress': {'payload': int,
'application_uuid': basestring,
'container_uuid': basestring},
'phase_context': {'cpu': int,
'startcompute': int,
'endcompute': int,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment