...
  View open merge request
Commits (1)
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
from __future__ import print_function
import logging
import os
import uuid
from time import time
import nrm.messaging as msg
logger = logging.getLogger('nrm')
class Progress(object):
"""Implements functions communicating with the NRM downstream interface."""
def __init__(self, app_uuid):
self.timestamp = None
self.app_uuid = app_uuid
self.progress_acc = 0
def shutdown(self):
self.downstream_event.send(tag="exit", application_uuid=self.app_uuid)
def progress_report(self, progress):
current_time = time()
timediff = current_time - self.timestamp
timediff = timediff *1e6
self.progress_acc += progress
if timediff > self.ratelimit_threshold:
self.downstream_event.send(
tag="progress",
payload=self.progress_acc,
container_uuid=self.container_uuid,
application_uuid=self.app_uuid)
self.progress_acc = 0
self.timestamp = current_time
def setup(self):
downstream_url = os.environ.get('ARGO_NRM_DOWNSTREAM_EVENT_URI',
"ipc:///tmp/nrm-downstream-event")
self.downstream_event = msg.DownstreamEventClient(downstream_url)
self.downstream_event.connect()
logger.info("downstream pub socket connected to: %s", downstream_url)
# retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
# retrieve our ratelimiting
self.ratelimit_threshold = int(os.environ.get('ARGO_NRM_RATELIMIT',
"10000000"))
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
self.app_uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.app_uuid)
# send an hello to the demon
self.downstream_event.send(
tag="start",
container_uuid=self.container_uuid,
application_uuid=self.app_uuid)
self.timestamp = time()