Commit 2f939fb0 authored by Valentin Reis's avatar Valentin Reis

Tentative removal of references to "threads".

parent 06c7232b
Pipeline #4556 failed with stages
in 2 minutes and 16 seconds
......@@ -18,45 +18,12 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads, phase_contexts):
def __init__(self, uuid, container, progress, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
transitions = self.thread_fsm_table[self.thread_state]
if event in transitions:
self.thread_state = transitions[event]
def get_allowed_thread_requests(self):
return self.thread_fsm_table[self.thread_state].keys()
def get_thread_request_impact(self, command):
# TODO: not a real model
if command not in self.thread_fsm_table[self.thread_state]:
return 0.0
logger.info("SELF.PROGRESS: %s" % self.progress)
logger.info("SELF.THREADS: %s" % self.threads)
speed = float(self.progress)/float(self.threads['cur'])
if command == 'i':
return speed
else:
return -speed
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
if newth == curth:
self.do_thread_transition('noop')
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
assert self.progress
......@@ -83,7 +50,6 @@ class ApplicationManager(object):
uuid = msg['uuid']
container_uuid = msg['container']
progress = msg['progress']
threads = msg['threads']
phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier']
......@@ -94,8 +60,7 @@ class ApplicationManager(object):
phase_contexts[id]['set'] = False
else:
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
self.applications[uuid] = Application(uuid, container_uuid, progress, phase_contexts)
def delete(self, uuid):
"""Delete an application from the register."""
......
......@@ -14,44 +14,6 @@ class Action(object):
self.command = command
self.delta = delta
class ApplicationActuator(object):
"""Actuator in charge of application thread control."""
def __init__(self, am, pubstream):
self.application_manager = am
self.pubstream = pubstream
def available_actions(self, target):
ret = []
for identity, application in \
self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta))
return ret
def execute(self, action):
target_threads = action.target.threads
update = {'type': 'application',
'command': 'threads',
'uuid': action.target.uuid,
'event': 'threads',
}
if action.command == 'i':
payload = target_threads['cur'] + 1
elif action.command == 'd':
payload = target_threads['cur'] - 1
else:
assert False, "impossible command"
update['payload'] = payload
self.pubstream.send_json(update)
def update(self, action):
action.target.do_thread_transition(action.command)
class PowerActuator(object):
"""Actuator in charge of power control."""
......
......@@ -2,7 +2,7 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from controller import Controller, PowerActuator
from powerpolicy import PowerPolicyManager
from functools import partial
import json
......@@ -45,11 +45,6 @@ class Daemon(object):
cid = msg['container']
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_threads(msg)
elif event == 'progress':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
......@@ -182,6 +177,8 @@ class Daemon(object):
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
elif signum == signal.SIGTERM:
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
elif signum == signal.SIGCHLD:
ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
else:
......@@ -299,9 +296,8 @@ class Daemon(object):
self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager()
aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = PowerActuator(self.sensor_manager)
self.controller = Controller([aa, pa])
self.controller = Controller([pa])
self.sensor_manager.start()
self.machine_info = self.sensor_manager.do_update()
......@@ -315,6 +311,7 @@ class Daemon(object):
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
signal.signal(signal.SIGTERM, self.do_signal)
signal.signal(signal.SIGCHLD, self.do_signal)
ioloop.IOLoop.current().start()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment