# applications.py
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

from __future__ import print_function

import logging

# Module-level logger, obtained under the fixed name 'nrm'.
logger = logging.getLogger('nrm')


class Application(object):

    """Information about a downstream API user.

    Holds the application's identifiers, its progress value, its thread
    bookkeeping (together with a small state machine that tracks thread
    requests) and the per-cpu phase context records.
    """

    # Thread-request state machine: {current state: {event: next state}}.
    # Within this class the events 'done' and 'noop' are fired by
    # update_threads(); 'i' and 'd' are the commands understood by
    # get_thread_request_impact() (presumably "increase"/"decrease" --
    # confirm with the callers). The 'noop' state has no outgoing
    # transitions, so it is terminal.
    thread_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
                        's_ask_i': {'done': 'stable', 'noop': 'max'},
                        's_ask_d': {'done': 'stable', 'noop': 'min'},
                        'max': {'d': 'max_ask_d'},
                        'min': {'i': 'min_ask_i'},
                        'max_ask_d': {'done': 'stable', 'noop': 'noop'},
                        'min_ask_i': {'done': 'stable', 'noop': 'noop'},
                        'noop': {}}

    def __init__(self, uuid, container, progress, threads, phase_contexts):
        """Store the application's tracking data.

        uuid           -- identifier of the application.
        container      -- stored as ``container_uuid``.
        progress       -- initial progress value.
        threads        -- thread bookkeeping; the thread methods of this
                          class index it with the key 'cur'.
        phase_contexts -- per-cpu phase records, indexed by integer cpu
                          id in update_phase_context().

        NOTE(review): ApplicationManager.register() passes
        ``threads=False`` and may pass ``phase_contexts=None``; the
        methods that index those attributes would raise TypeError on
        such values.
        """
        self.uuid = uuid
        self.container_uuid = container
        self.progress = progress
        self.threads = threads
        # Every application starts in the 'stable' FSM state.
        self.thread_state = 'stable'
        self.phase_contexts = phase_contexts

    def do_thread_transition(self, event):
        """Update the thread fsm state.

        Events that are not valid in the current state are ignored.
        """
        transitions = self.thread_fsm_table[self.thread_state]
        if event in transitions:
            self.thread_state = transitions[event]

    def get_allowed_thread_requests(self):
        """Return the events accepted in the current thread fsm state."""
        return self.thread_fsm_table[self.thread_state].keys()

    def get_thread_request_impact(self, command):
        """Estimate the impact of a thread request.

        Returns 0.0 when `command` is not accepted in the current fsm
        state. Otherwise returns progress divided by the current thread
        count: positive for 'i', negated for any other accepted command.

        Raises ZeroDivisionError if ``self.threads['cur']`` is 0.
        """
        # TODO: not a real model
        if command not in self.thread_fsm_table[self.thread_state]:
            return 0.0
        speed = float(self.progress)/float(self.threads['cur'])
        if command == 'i':
            return speed
        return -speed

    def update_threads(self, msg):
        """Update the thread tracking.

        ``msg['payload']`` is taken as the new thread count. An unchanged
        count fires the 'noop' event, a changed one fires 'done'; the
        new count is then recorded as ``self.threads['cur']``.
        """
        newth = msg['payload']
        curth = self.threads['cur']
        if newth == curth:
            self.do_thread_transition('noop')
        else:
            self.do_thread_transition('done')
        self.threads['cur'] = newth

    def update_progress(self, msg):
        """Update the progress tracking.

        NOTE(review): `msg` is not used; the method only asserts that
        ``self.progress`` is truthy. ApplicationManager.register() starts
        applications with progress 0, for which this assertion fails
        (unless asserts are stripped with -O) -- confirm the intent.
        """
        assert self.progress

    def update_performance(self, msg):
        """Update the performance tracking (currently a no-op)."""

    def update_phase_context(self, msg):
        """Update the phase contextual information.

        Reads ``msg.cpu`` (converted to int) plus the attributes
        'aggregation', 'computetime' and 'totaltime' from `msg`, and
        replaces the record for that cpu with them, marked ``set=True``.
        """
        # Named cpu_id rather than `id` to avoid shadowing the builtin.
        cpu_id = int(msg.cpu)
        context = {key: getattr(msg, key) for key in
                   ('aggregation', 'computetime', 'totaltime')}
        context['set'] = True
        self.phase_contexts[cpu_id] = context

class ApplicationManager(object):

    """Manages the tracking of applications: users of the downstream API."""

    def __init__(self):
        """Start with an empty register."""
        # Maps application uuid -> Application instance.
        self.applications = {}

    def register(self, msg, container):
        """Register a new downstream application.

        msg       -- mapping providing 'application_uuid' and
                     'container_uuid'.
        container -- object exposing ``power['policy']`` and
                     ``resources.cpus``.

        The application starts with progress 0 and ``threads=False``.
        When the container's power policy is truthy, one phase context
        record is prepared per cpu of the container, with every field
        set to None except 'set', which is False; otherwise the
        application gets ``phase_contexts=None``. An existing entry with
        the same uuid is overwritten.
        """
        uuid = msg['application_uuid']
        container_uuid = msg['container_uuid']
        progress = 0
        threads = False
        phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
        if container.power['policy']:
            phase_contexts = {}
            # Named cpu_id rather than `id` to avoid shadowing the builtin.
            for cpu_id in container.resources.cpus:
                context = dict.fromkeys(phase_context_keys)
                context['set'] = False
                phase_contexts[cpu_id] = context
        else:
            phase_contexts = None
        self.applications[uuid] = Application(uuid, container_uuid, progress,
                                              threads, phase_contexts)

    def delete(self, uuid):
        """Delete an application from the register.

        Raises KeyError if `uuid` is not registered.
        """
        del self.applications[uuid]