diff --git a/bin/argo-perf-wrapper b/bin/argo-perf-wrapper index d1409f496e49150086507193672ed16a157f34f3..6f4534fb652f7febd85d8853c472b2302f2a5ece 100755 --- a/bin/argo-perf-wrapper +++ b/bin/argo-perf-wrapper @@ -29,9 +29,10 @@ class PerfWrapper(object): def progress_report(self, progress): update = {'type': 'application', - 'event': 'progress', + 'event': 'hardware-progress', 'payload': progress, 'uuid': self.app_uuid, + 'container': self.container_uuid, } self.downstream_pub_socket.send_json(update) diff --git a/nrm/applications.py b/nrm/applications.py index 17ab21fa1cc6089fb18411de251293760d812b4e..a8e1912baede59b133bdd13a34e6a7d31c4dee82 100644 --- a/nrm/applications.py +++ b/nrm/applications.py @@ -18,10 +18,11 @@ class Application(object): 'min_ask_i': {'done': 'stable', 'noop': 'noop'}, 'noop': {}} - def __init__(self, uuid, container, progress, phase_contexts): + def __init__(self, uuid, container, progress, hardwareprogress, phase_contexts): self.uuid = uuid self.container_uuid = container self.progress = progress + self.hardwareprogress = hardwareprogress self.phase_contexts = phase_contexts def update_progress(self, msg): @@ -29,6 +30,13 @@ class Application(object): assert self.progress logger.info("received progress message: "+str(msg)) + def update_hardwareprogress(self, msg): + """Update the progress tracking.""" + logger.info("received progress message: "+str(msg)) + if not self.hardwareprogress: + logger.debug("Starting to log hardware progress.") + self.hardwareprogress = True + def update_phase_context(self, msg): """Update the phase contextual information.""" id = int(msg['cpu']) @@ -50,6 +58,7 @@ class ApplicationManager(object): uuid = msg['uuid'] container_uuid = msg['container'] progress = msg['progress'] + hardwareprogress = None phase_contexts = dict() phase_context_keys = ['set', 'startcompute', 'endcompute', 'startbarrier', 'endbarrier'] @@ -60,7 +69,7 @@ class ApplicationManager(object): phase_contexts[id]['set'] = False else: phase_contexts = None - self.applications[uuid] = Application(uuid, container_uuid, progress, phase_contexts) + self.applications[uuid] = Application(uuid, container_uuid, progress, hardwareprogress, phase_contexts) def delete(self, uuid): """Delete an application from the register.""" diff --git a/nrm/daemon.py b/nrm/daemon.py index 26230feb7b081110dd283c9b80098dba7e38801f..9330cc2c6d7aeabf3680e134e5e5811165df9e33 100644 --- a/nrm/daemon.py +++ b/nrm/daemon.py @@ -50,6 +50,12 @@ class Daemon(object): if uuid in self.application_manager.applications: app = self.application_manager.applications[uuid] app.update_progress(msg) + elif event == 'hardware-progress': + cid = msg['container'] + for app_uuid in self.application_manager.applications: + app = self.application_manager.applications[app_uuid] + if app.container_uuid == cid: + app.update_hardwareprogress(msg) elif event == 'phase_context': uuid = msg['uuid'] if uuid in self.application_manager.applications: @@ -253,6 +259,7 @@ class Daemon(object): def do_shutdown(self): self.sensor_manager.stop() ioloop.IOLoop.current().stop() + context.term() def main(self): # Bind address for downstream clients @@ -315,6 +322,7 @@ class Daemon(object): signal.signal(signal.SIGCHLD, self.do_signal) ioloop.IOLoop.current().start() + context.term() def runner(config):