Commit 75df2004 authored by Swann Perarnau's avatar Swann Perarnau

[refactor] use downstream messaging layer

Replace the downstream API handling by the new messaging layer. Not that
we don't have a clean way to deal with dynamic concurrency control using
this API, so we disable the handling of it for now.
parent 0bae924d
...@@ -61,8 +61,8 @@ class Application(object): ...@@ -61,8 +61,8 @@ class Application(object):
def update_phase_context(self, msg): def update_phase_context(self, msg):
"""Update the phase contextual information.""" """Update the phase contextual information."""
id = int(msg['cpu']) id = msg.cpu
self.phase_contexts[id] = {k: int(msg[k]) for k in ('startcompute', self.phase_contexts[id] = {k: getattr(msg, k) for k in ('startcompute',
'endcompute', 'startbarrier', 'endbarrier')} 'endcompute', 'startbarrier', 'endbarrier')}
self.phase_contexts[id]['set'] = True self.phase_contexts[id]['set'] = True
...@@ -77,10 +77,10 @@ class ApplicationManager(object): ...@@ -77,10 +77,10 @@ class ApplicationManager(object):
def register(self, msg, container): def register(self, msg, container):
"""Register a new downstream application.""" """Register a new downstream application."""
uuid = msg['uuid'] uuid = msg.application_uuid
container_uuid = msg['container'] container_uuid = msg.container_uuid
progress = msg['progress'] progress = 0
threads = msg['threads'] threads = False
phase_contexts = dict() phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute', phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier'] 'startbarrier', 'endbarrier']
......
...@@ -5,16 +5,15 @@ from containers import ContainerManager ...@@ -5,16 +5,15 @@ from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator from controller import Controller, ApplicationActuator, PowerActuator
from powerpolicy import PowerPolicyManager from powerpolicy import PowerPolicyManager
from functools import partial from functools import partial
import json
import logging import logging
import os import os
from resources import ResourceManager from resources import ResourceManager
from sensor import SensorManager from sensor import SensorManager
import signal import signal
import zmq from zmq.eventloop import ioloop
from zmq.eventloop import ioloop, zmqstream
from nrm.messaging import MSGTYPES from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
DownstreamEventServer
RPC_MSG = MSGTYPES['up_rpc_rep'] RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub'] PUB_MSG = MSGTYPES['up_pub']
...@@ -28,46 +27,31 @@ class Daemon(object): ...@@ -28,46 +27,31 @@ class Daemon(object):
self.target = 100.0 self.target = 100.0
self.config = config self.config = config
def do_downstream_receive(self, parts): def do_downstream_receive(self, msg, client):
logger.info("receiving downstream message: %r", parts) logger.info("receiving downstream message: %r", msg)
if len(parts) != 1: if msg.type == 'application_start':
logger.error("unexpected msg length, dropping it: %r", parts) cid = msg.container_uuid
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif msg.type == 'progress':
uuid = msg.container_uuid
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif msg.type == 'phase_context':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
c = self.container_manager.containers[app.cid]
if c.power['policy']:
app.update_phase_context(msg)
elif msg.type == 'application_exit':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
self.application_manager.delete(uuid)
else:
logger.error("unknown msg: %r", msg)
return return
msg = json.loads(parts[0])
if isinstance(msg, dict):
msgtype = msg.get('type')
event = msg.get('event')
if msgtype is None or msgtype != 'application' or event is None:
logger.error("wrong message format: %r", msg)
return
if event == 'start':
cid = msg['container']
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_threads(msg)
elif event == 'progress':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'phase_context':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
c = self.container_manager.containers[app.cid]
if c.power['policy']:
app.update_phase_context(msg)
elif event == 'exit':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
self.application_manager.delete(msg['uuid'])
else:
logger.error("unknown event: %r", event)
return
def do_upstream_receive(self, msg, client): def do_upstream_receive(self, msg, client):
if msg.type == 'setpower': if msg.type == 'setpower':
...@@ -267,32 +251,22 @@ class Daemon(object): ...@@ -267,32 +251,22 @@ class Daemon(object):
upstream_rpc_port = 3456 upstream_rpc_port = 3456
# setup application listening socket # setup application listening socket
context = zmq.Context() downstream_event_param = "ipc:///tmp/nrm-downstream-event"
downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB)
downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port) upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port) upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)
downstream_pub_socket.bind(downstream_pub_param) self.downstream_event = DownstreamEventServer(downstream_event_param)
downstream_sub_socket.bind(downstream_sub_param)
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
self.upstream_pub_server = UpstreamPubServer(upstream_pub_param) self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param) self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)
logger.info("downstream pub socket bound to: %s", downstream_pub_param) logger.info("downstream event socket bound to: %s",
logger.info("downstream sub socket bound to: %s", downstream_sub_param) downstream_event_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param) logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream rpc socket connected to: %s", upstream_rpc_param) logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)
# register socket triggers # register socket triggers
self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket) self.downstream_event.setup_recv_callback(self.do_downstream_receive)
self.downstream_sub.on_recv(self.do_downstream_receive)
self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive) self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
# create a stream to let ioloop deal with blocking calls on HWM
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
# create managers # create managers
self.resource_manager = ResourceManager(hwloc=self.config.hwloc) self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
......
...@@ -53,6 +53,11 @@ MSGFORMATS['down_event'] = {'application_start': ...@@ -53,6 +53,11 @@ MSGFORMATS['down_event'] = {'application_start':
'application_exit': 'application_exit':
{'application_uuid': basestring}, {'application_uuid': basestring},
'progress': {'payload': int}, 'progress': {'payload': int},
'phase_context': {'cpu': int,
'startcompute': int,
'endcompute': int,
'startbarrier': int,
'endbarrier': int},
} }
# Mirror of the message formats, using namedtuples as the actual transport # Mirror of the message formats, using namedtuples as the actual transport
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment