Commit 64cf4e19 authored by Valentin Reis's avatar Valentin Reis

Merge branch 'cmd-listen-app_uuid' into 'master'

[refactor] messaging code style + cmd_listen application_uuid relaxing

See merge request !48
parents bfbddc9b 1120d6d5
Pipeline #5000 passed with stages
in 6 minutes and 13 seconds
...@@ -3,5 +3,5 @@ let argotest = ...@@ -3,5 +3,5 @@ let argotest =
"https://xgitlab.cels.anl.gov/argo/argotest/-/archive/master/argotest-master.tar.gz"; "https://xgitlab.cels.anl.gov/argo/argotest/-/archive/master/argotest-master.tar.gz";
in import "${argotest}/test.nix" { in import "${argotest}/test.nix" {
nrm-override = ./..; nrm-override = ./..;
testName = "refactored"; testName = "cmd-uuid";
} }
...@@ -65,19 +65,29 @@ class CommandLineInterface(object): ...@@ -65,19 +65,29 @@ class CommandLineInterface(object):
def print_if_filter(): def print_if_filter():
if argv.filter: if argv.filter:
if argv.filter == msg.type: if argv.filter == msg.type:
print("%s, %s, %s" % (msg.type, time.time(), if (msg.type == "performance" or
msg.payload)) msg.type == "progress"):
print("%s, %s, %s" % (msg.type, time.time(),
msg.payload))
if msg.type == "power":
print("%s, %s, %s" % (msg.type, time.time(),
msg.total))
sys.stdout.flush()
if msg.type == "container_exit":
print("%s, %s, %s" % (msg.type, time.time(),
msg.profile_data))
sys.stdout.flush() sys.stdout.flush()
else: else:
print("%s, %s" % (msg.type, time.time())) print("%s, %s" % (msg.type, time.time()))
sys.stdout.flush() sys.stdout.flush()
if argv.uuid: print_if_filter()
uuid = getattr(msg, 'container_uuid', None) # if argv.uuid:
if argv.uuid == uuid or msg.type == "power": # uuid = getattr(msg, 'container_uuid', None)
print_if_filter() # if argv.uuid == uuid or msg.type == "power":
else: # print_if_filter()
print_if_filter() # else:
# print_if_filter()
def do_run(self, argv): def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command """ Connect to the NRM and ask to spawn a container and run a command
......
...@@ -38,12 +38,12 @@ class Daemon(object): ...@@ -38,12 +38,12 @@ class Daemon(object):
app = self.application_manager.applications[ app = self.application_manager.applications[
msg.application_uuid] msg.application_uuid]
app.update_performance(msg) app.update_performance(msg)
pub = {'api': 'up_pub', pub = {'api': 'up_pub',
'type': 'progress', 'type': 'progress',
'payload': msg.payload, 'payload': msg.payload,
'container_uuid': msg.container_uuid} 'application_uuid': msg.application_uuid}
self.upstream_pub_server.sendmsg( self.upstream_pub_server.sendmsg(
PUB_MSG['progress'](**pub)) PUB_MSG['progress'](**pub))
elif msg.type == 'performance': elif msg.type == 'performance':
if msg.application_uuid in self.application_manager.applications: if msg.application_uuid in self.application_manager.applications:
app = self.application_manager.applications[ app = self.application_manager.applications[
......
...@@ -20,54 +20,99 @@ logger = logging.getLogger('nrm') ...@@ -20,54 +20,99 @@ logger = logging.getLogger('nrm')
# basic field type information. # basic field type information.
APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub', 'down_event'] APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub', 'down_event']
MSGFORMATS = {k: {} for k in APIS} MSGFORMATS = {k: {} for k in APIS}
MSGFORMATS['up_rpc_req'] = {'list': {},
'run': {'manifest': basestring, MSGFORMATS['up_rpc_req'] = {
'path': basestring, 'list': {},
'args': list, 'run': {
'container_uuid': basestring, 'manifest': basestring,
'environ': dict}, 'path': basestring,
'kill': {'container_uuid': basestring}, 'args': list,
'setpower': {'limit': basestring}, 'container_uuid': basestring,
} 'environ': dict
MSGFORMATS['up_rpc_rep'] = {'list': {'payload': list}, },
'stdout': {'container_uuid': basestring, 'kill': {
'payload': basestring}, 'container_uuid': basestring
'stderr': {'container_uuid': basestring, },
'payload': basestring}, 'setpower': {
'process_start': {'container_uuid': basestring, 'limit': basestring
'pid': int}, }
'process_exit': {'container_uuid': basestring, }
'status': basestring},
'getpower': {'limit': basestring}, MSGFORMATS['up_rpc_rep'] = {
} 'list': {
MSGFORMATS['up_pub'] = {'power': {'total': float, 'limit': float}, 'payload': list
'container_start': {'container_uuid': basestring, },
'errno': int, 'stdout': {
'power': basestring}, 'container_uuid': basestring,
'container_exit': {'container_uuid': basestring, 'payload': basestring
'profile_data': dict}, },
'performance': {'container_uuid': basestring, 'stderr': {
'payload': int}, 'container_uuid': basestring,
'progress': {'application_uuid': basestring, 'payload': basestring
'payload': int}, },
} 'process_start': {
MSGFORMATS['down_event'] = {'application_start': 'container_uuid': basestring,
{'container_uuid': basestring, 'pid': int
'application_uuid': basestring}, },
'application_exit': 'process_exit': {
{'application_uuid': basestring}, 'container_uuid': basestring,
'performance': {'payload': int, 'status': basestring
'application_uuid': basestring, },
'container_uuid': basestring}, 'getpower': {
'progress': {'payload': int, 'limit': basestring
'application_uuid': basestring, }
'container_uuid': basestring}, }
'phase_context': {'cpu': int,
'aggregation': int, MSGFORMATS['up_pub'] = {
'computetime': int, 'power': {
'totaltime': int, 'total': float,
'application_uuid': basestring}, 'limit': float
} },
'container_start': {
'container_uuid': basestring,
'errno': int,
'power': dict
},
'container_exit': {
'container_uuid': basestring,
'profile_data': dict
},
'performance': {
'container_uuid': basestring,
'payload': int
},
'progress': {
'application_uuid': basestring,
'payload': int
}
}
MSGFORMATS['down_event'] = {
'application_start': {
'container_uuid': basestring,
'application_uuid': basestring
},
'application_exit': {
'application_uuid': basestring
},
'performance': {
'container_uuid': basestring,
'application_uuid': basestring,
'payload': int,
},
'progress': {
'application_uuid': basestring,
'container_uuid': basestring,
'payload': int
},
'phase_context': {
'cpu': int,
'startcompute': int,
'endcompute': int,
'startbarrier': int,
'endbarrier': int
}
}
# Mirror of the message formats, using namedtuples as the actual transport # Mirror of the message formats, using namedtuples as the actual transport
# for users of this messaging layer. # for users of this messaging layer.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment