Commit f43a38d3 authored by Swann Perarnau's avatar Swann Perarnau
Browse files

[feature] Implement Application Manager

This patch moves the tracking of applications clients of the downstream
API into a ApplicationManager, that is able to track progress and thread
management.

This change is necessary in the long term to build a comprehensive
downstream API and centralize the management of application tracking.

Note that this tracking is currently independent of the container and
pid tracking, and that might be a problem in the long term.
parent 19c9eb54
from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
class Application(object):
"""Information about a downstream API user."""
thread_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'noop': 'max'},
's_ask_d': {'done': 'stable', 'noop': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'noop': 'noop'},
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
def do_thread_transition(self, event):
"""Update the thread fsm state."""
transitions = self.thread_fsm_table[self.thread_state]
if event in transitions:
self.thread_state = transitions[event]
def get_allowed_thread_requests(self):
return self.thread_fsm_table[self.thread_state].keys()
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
if newth == curth:
self.do_thread_transition('noop')
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
assert self.progress
class ApplicationManager(object):
"""Manages the tracking of applications: users of the downstream API."""
def __init__(self):
self.applications = dict()
def register(self, msg):
"""Register a new downstream application."""
uuid = msg['uuid']
container = msg['container']
progress = msg['progress']
threads = msg['threads']
self.applications[uuid] = Application(uuid, container, progress,
threads)
def delete(self, uuid):
"""Delete an application from the register."""
del self.applications[uuid]
from __future__ import print_function from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager from containers import ContainerManager
from resources import ResourceManager
from functools import partial from functools import partial
import json import json
import logging import logging
import os import os
from resources import ResourceManager
import sensor import sensor
import signal import signal
import zmq import zmq
from zmq.eventloop import ioloop, zmqstream from zmq.eventloop import ioloop, zmqstream
application_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'},
's_ask_d': {'done': 'stable', 'min': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'min': 'nop'},
'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}}
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
class Application(object):
def __init__(self, identity):
self.identity = identity
self.state = 'stable'
def do_transition(self, msg):
transitions = application_fsm_table[self.state]
if msg in transitions:
self.state = transitions[msg]
else:
pass
def get_allowed_requests(self):
return application_fsm_table[self.state].keys()
class Daemon(object): class Daemon(object):
def __init__(self): def __init__(self):
self.applications = {}
self.buf = ''
self.target = 1.0 self.target = 1.0
def do_downstream_receive(self, parts): def do_downstream_receive(self, parts):
...@@ -59,15 +33,19 @@ class Daemon(object): ...@@ -59,15 +33,19 @@ class Daemon(object):
logger.error("wrong message format: %r", msg) logger.error("wrong message format: %r", msg)
return return
if event == 'start': if event == 'start':
logger.info("new application: %r", msg) self.application_manager.register(msg)
identity = msg['uuid']
self.applications[identity] = Application(identity)
elif event == 'threads': elif event == 'threads':
logger.info("change in threads") uuid = msg['uuid']
application = self.applications[msg['uuid']] if uuid in self.application_manager.applications:
application.do_transition(msg['payload']) app = self.application_manager.applications[uuid]
app.update_threads(msg)
elif event == 'progress': elif event == 'progress':
logger.info("new progress") uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'exit':
self.application_manager.delete(msg['uuid'])
else: else:
logger.error("unknown event: %r", event) logger.error("unknown event: %r", event)
return return
...@@ -150,18 +128,27 @@ class Daemon(object): ...@@ -150,18 +128,27 @@ class Daemon(object):
def do_control(self): def do_control(self):
total_power = self.machine_info['energy']['power']['total'] total_power = self.machine_info['energy']['power']['total']
for identity, application in self.applications.iteritems(): for identity, application in \
self.application_manager.applications.iteritems():
update = {'type': 'application',
'command': 'threads',
'uuid': identity,
'event': 'threads',
}
if total_power < self.target: if total_power < self.target:
if 'i' in application.get_allowed_requests(): if 'i' in application.get_allowed_thread_requests():
self.downstream.send_multipart([identity, 'i']) update['payload'] = application.threads['cur'] + 1
application.do_transition('i') self.downstream_pub.send_json(update)
application.do_thread_transition('i')
elif total_power > self.target: elif total_power > self.target:
if 'd' in application.get_allowed_requests(): if 'd' in application.get_allowed_thread_requests():
self.downstream.send_multipart([identity, 'd']) update['payload'] = application.threads['cur'] - 1
application.do_transition('d') self.downstream_pub.send_json(update)
application.do_thread_transition('d')
else: else:
pass continue
logger.info("application now in state: %s", application.state) logger.info("application now in state: %s",
application.thread_state)
def do_signal(self, signum, frame): def do_signal(self, signum, frame):
if signum == signal.SIGINT: if signum == signal.SIGINT:
...@@ -246,9 +233,10 @@ class Daemon(object): ...@@ -246,9 +233,10 @@ class Daemon(object):
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket) self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket) self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
# create resource and container manager # create managers
self.resource_manager = ResourceManager() self.resource_manager = ResourceManager()
self.container_manager = ContainerManager(self.resource_manager) self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager()
# create sensor manager and make first measurement # create sensor manager and make first measurement
self.sensor = sensor.SensorManager() self.sensor = sensor.SensorManager()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment