Commit af621b32 authored by Valentin Reis's avatar Valentin Reis

Merge branch 'downstream-refactor' into 'master'

Downstream refactor

See merge request !41
parents e43c491a 37f55942
Pipeline #4759 passed with stages
in 2 minutes and 10 seconds
...@@ -23,7 +23,7 @@ flake8: ...@@ -23,7 +23,7 @@ flake8:
- /^wip\/.*/ - /^wip\/.*/
- /^WIP\/.*/ - /^WIP\/.*/
integration.test: helloworld.integration.test:
stage: test stage: test
script: script:
- nix-shell .integration.nix --run "argotk.hs helloworld" - nix-shell .integration.nix --run "argotk.hs helloworld"
...@@ -34,7 +34,25 @@ integration.test: ...@@ -34,7 +34,25 @@ integration.test:
- _output/daemon_out.log - _output/daemon_out.log
- _output/daemon_out.log - _output/daemon_out.log
- _output/nrm.log - _output/nrm.log
- _output/time.log - _output/.argo_nodeos_config_exit_message
expire_in: 1 week
except:
- /^wip\/.*/
- /^WIP\/.*/
tags:
- integration
perfwrapper.integration.test:
stage: test
script:
- nix-shell .integration.nix --run "argotk.hs perfwrapper"
artifacts:
paths:
- _output/cmd_err.log
- _output/cmd_out.log
- _output/daemon_out.log
- _output/daemon_out.log
- _output/nrm.log
- _output/.argo_nodeos_config_exit_message - _output/.argo_nodeos_config_exit_message
expire_in: 1 week expire_in: 1 week
except: except:
......
{ argotest ? (builtins.fetchGit { { argotest ? (builtins.fetchGit {
url = https://xgitlab.cels.anl.gov/argo/argotest.git; url = https://xgitlab.cels.anl.gov/argo/argotest.git;
ref="refactor-argotk"; ref="master";
rev="a20358e5b72f267eb8e2a9152e62c9ebbb3b2d4a"; }) rev="646d42f7b64f56cdb3ff54a7b4a59e0dfad3209c";
})
}: }:
(import argotest { (import argotest {
nrm-src = ./.; nrm-src = ./.;
libnrm-src = builtins.fetchGit {
url = https://xgitlab.cels.anl.gov/argo/libnrm.git;
ref="downstream-refactor"; };
}).test }).test
...@@ -4,12 +4,13 @@ from __future__ import print_function ...@@ -4,12 +4,13 @@ from __future__ import print_function
import argparse import argparse
import logging import logging
import zmq
import os import os
import tempfile import tempfile
import subprocess import subprocess
import uuid import uuid
from nrm import messaging
PUB_MSG = messaging.MSGTYPES['down_event']
logger = logging.getLogger('perf-wrapper') logger = logging.getLogger('perf-wrapper')
...@@ -22,30 +23,26 @@ class PerfWrapper(object): ...@@ -22,30 +23,26 @@ class PerfWrapper(object):
pass pass
def shutdown(self): def shutdown(self):
update = {'type': 'application', update = {'api': 'down_event',
'event': 'exit', 'type': 'application_exit',
'uuid': self.app_uuid, 'application_uuid': self.app_uuid,
} }
self.downstream_pub_socket.send_json(update) msg = PUB_MSG['application_exit'](**update)
self.downstream_event.sendmsg(msg)
def progress_report(self, progress): def progress_report(self, progress):
update = {'type': 'application', update = {'api': 'down_event',
'event': 'progress', 'type': 'progress',
'payload': progress, 'payload': progress,
'uuid': self.app_uuid,
} }
self.downstream_pub_socket.send_json(update) msg = PUB_MSG['progress'](**update)
self.downstream_event.sendmsg(msg)
def setup(self): def setup(self):
context = zmq.Context() downstream_url = "ipc:///tmp/nrm-downstream-event"
self.downstream_pub_socket = context.socket(zmq.PUB) self.downstream_event = messaging.DownstreamEventClient(downstream_url)
self.downstream_event.wait_connected()
downstream_pub_param = "ipc:///tmp/nrm-downstream-in" logger.info("downstream pub socket connected to: %s", downstream_url)
self.downstream_pub_socket.connect(downstream_pub_param)
logger.info("downstream pub socket connected to: %s",
downstream_pub_param)
# retrieve our container uuid # retrieve our container uuid
self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID') self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
...@@ -54,16 +51,14 @@ class PerfWrapper(object): ...@@ -54,16 +51,14 @@ class PerfWrapper(object):
exit(1) exit(1)
self.app_uuid = str(uuid.uuid4()) self.app_uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.app_uuid) logger.info("client uuid: %r", self.app_uuid)
# send an hello to the demon # send an hello to the demon
update = {'type': 'application', update = {'api': 'down_event',
'event': 'start', 'type': 'application_start',
'container': self.container_uuid, 'container_uuid': self.container_uuid,
'uuid': self.app_uuid, 'application_uuid': self.app_uuid,
'progress': True,
'threads': {'min': 1, 'cur': 1, 'max': 1},
} }
self.downstream_pub_socket.send_json(update) msg = PUB_MSG['application_start'](**update)
self.downstream_event.sendmsg(msg)
def main(self): def main(self):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
......
...@@ -33,7 +33,9 @@ def main(argv=None): ...@@ -33,7 +33,9 @@ def main(argv=None):
"hwloc": "hwloc", "hwloc": "hwloc",
"perf": "perf", "perf": "perf",
"argo_perf_wrapper": "argo-perf-wrapper", "argo_perf_wrapper": "argo-perf-wrapper",
"argo_nodeos_config": "argo_nodeos_config"} "argo_nodeos_config": "argo_nodeos_config",
"pmpi_lib": "/usr/lib/libnrm-pmpi.so",
}
if args.print_defaults: if args.print_defaults:
print defaults print defaults
...@@ -74,6 +76,11 @@ def main(argv=None): ...@@ -74,6 +76,11 @@ def main(argv=None):
"variable.", "variable.",
default=os.environ.get('PERF', default=os.environ.get('PERF',
'perf')) 'perf'))
parser.add_argument(
'--pmpi_lib',
help="Path to the libnrm PMPI library used for the power policy"
"Override default with the PMPI environment variable.",
default=os.environ.get('PMPI', defaults['pmpi_lib']))
parser.add_argument( parser.add_argument(
'--argo_perf_wrapper', '--argo_perf_wrapper',
help="Path to the linux perf tool to use. This path can" help="Path to the linux perf tool to use. This path can"
......
...@@ -61,8 +61,8 @@ class Application(object): ...@@ -61,8 +61,8 @@ class Application(object):
def update_phase_context(self, msg): def update_phase_context(self, msg):
"""Update the phase contextual information.""" """Update the phase contextual information."""
id = int(msg['cpu']) id = msg.cpu
self.phase_contexts[id] = {k: int(msg[k]) for k in ('startcompute', self.phase_contexts[id] = {k: getattr(msg, k) for k in ('startcompute',
'endcompute', 'startbarrier', 'endbarrier')} 'endcompute', 'startbarrier', 'endbarrier')}
self.phase_contexts[id]['set'] = True self.phase_contexts[id]['set'] = True
...@@ -77,10 +77,10 @@ class ApplicationManager(object): ...@@ -77,10 +77,10 @@ class ApplicationManager(object):
def register(self, msg, container): def register(self, msg, container):
"""Register a new downstream application.""" """Register a new downstream application."""
uuid = msg['uuid'] uuid = msg.application_uuid
container_uuid = msg['container'] container_uuid = msg.container_uuid
progress = msg['progress'] progress = 0
threads = msg['threads'] threads = False
phase_contexts = dict() phase_contexts = dict()
phase_context_keys = ['set', 'startcompute', 'endcompute', phase_context_keys = ['set', 'startcompute', 'endcompute',
'startbarrier', 'endbarrier'] 'startbarrier', 'endbarrier']
......
...@@ -19,7 +19,8 @@ class ContainerManager(object): ...@@ -19,7 +19,8 @@ class ContainerManager(object):
def __init__(self, rm, def __init__(self, rm,
perfwrapper="argo-perf-wrapper", perfwrapper="argo-perf-wrapper",
linuxperf="perf", linuxperf="perf",
argo_nodeos_config="argo_nodeos_config"): argo_nodeos_config="argo_nodeos_config",
pmpi_lib="/usr/lib/libnrm-pmpi.so"):
self.linuxperf = linuxperf self.linuxperf = linuxperf
self.perfwrapper = perfwrapper self.perfwrapper = perfwrapper
self.nodeos = NodeOSClient(argo_nodeos_config=argo_nodeos_config) self.nodeos = NodeOSClient(argo_nodeos_config=argo_nodeos_config)
...@@ -27,6 +28,7 @@ class ContainerManager(object): ...@@ -27,6 +28,7 @@ class ContainerManager(object):
self.pids = dict() self.pids = dict()
self.resourcemanager = rm self.resourcemanager = rm
self.chrt = ChrtClient() self.chrt = ChrtClient()
self.pmpi_lib = pmpi_lib
def create(self, request): def create(self, request):
"""Create a container according to the request. """Create a container according to the request.
...@@ -49,7 +51,7 @@ class ContainerManager(object): ...@@ -49,7 +51,7 @@ class ContainerManager(object):
logger.info("run: ucontainername: %s", ucontainername) logger.info("run: ucontainername: %s", ucontainername)
# TODO: Application library to load must be set during configuration # TODO: Application library to load must be set during configuration
apppreloadlibrary = '' apppreloadlibrary = self.pmpi_lib
manifest = ImageManifest() manifest = ImageManifest()
if not manifest.load(manifestfile): if not manifest.load(manifestfile):
......
...@@ -2,19 +2,18 @@ from __future__ import print_function ...@@ -2,19 +2,18 @@ from __future__ import print_function
from applications import ApplicationManager from applications import ApplicationManager
from containers import ContainerManager from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator from controller import Controller, PowerActuator
from powerpolicy import PowerPolicyManager from powerpolicy import PowerPolicyManager
from functools import partial from functools import partial
import json
import logging import logging
import os import os
from resources import ResourceManager from resources import ResourceManager
from sensor import SensorManager from sensor import SensorManager
import signal import signal
import zmq from zmq.eventloop import ioloop
from zmq.eventloop import ioloop, zmqstream
from nrm.messaging import MSGTYPES from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer from nrm.messaging import UpstreamRPCServer, UpstreamPubServer, \
DownstreamEventServer
RPC_MSG = MSGTYPES['up_rpc_rep'] RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub'] PUB_MSG = MSGTYPES['up_pub']
...@@ -28,46 +27,31 @@ class Daemon(object): ...@@ -28,46 +27,31 @@ class Daemon(object):
self.target = 100.0 self.target = 100.0
self.config = config self.config = config
def do_downstream_receive(self, parts): def do_downstream_receive(self, msg, client):
logger.info("receiving downstream message: %r", parts) logger.info("receiving downstream message: %r", msg)
if len(parts) != 1: if msg.type == 'application_start':
logger.error("unexpected msg length, dropping it: %r", parts) cid = msg.container_uuid
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif msg.type == 'progress':
uuid = msg.container_uuid
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif msg.type == 'phase_context':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
c = self.container_manager.containers[app.cid]
if c.power['policy']:
app.update_phase_context(msg)
elif msg.type == 'application_exit':
uuid = msg.application_uuid
if uuid in self.application_manager.applications:
self.application_manager.delete(uuid)
else:
logger.error("unknown msg: %r", msg)
return return
msg = json.loads(parts[0])
if isinstance(msg, dict):
msgtype = msg.get('type')
event = msg.get('event')
if msgtype is None or msgtype != 'application' or event is None:
logger.error("wrong message format: %r", msg)
return
if event == 'start':
cid = msg['container']
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_threads(msg)
elif event == 'progress':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'phase_context':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
c = self.container_manager.containers[app.cid]
if c.power['policy']:
app.update_phase_context(msg)
elif event == 'exit':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
self.application_manager.delete(msg['uuid'])
else:
logger.error("unknown event: %r", event)
return
def do_upstream_receive(self, msg, client): def do_upstream_receive(self, msg, client):
if msg.type == 'setpower': if msg.type == 'setpower':
...@@ -267,32 +251,22 @@ class Daemon(object): ...@@ -267,32 +251,22 @@ class Daemon(object):
upstream_rpc_port = 3456 upstream_rpc_port = 3456
# setup application listening socket # setup application listening socket
context = zmq.Context() downstream_event_param = "ipc:///tmp/nrm-downstream-event"
downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB)
downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port) upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port) upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)
downstream_pub_socket.bind(downstream_pub_param) self.downstream_event = DownstreamEventServer(downstream_event_param)
downstream_sub_socket.bind(downstream_sub_param)
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
self.upstream_pub_server = UpstreamPubServer(upstream_pub_param) self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param) self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)
logger.info("downstream pub socket bound to: %s", downstream_pub_param) logger.info("downstream event socket bound to: %s",
logger.info("downstream sub socket bound to: %s", downstream_sub_param) downstream_event_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param) logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream rpc socket connected to: %s", upstream_rpc_param) logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)
# register socket triggers # register socket triggers
self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket) self.downstream_event.setup_recv_callback(self.do_downstream_receive)
self.downstream_sub.on_recv(self.do_downstream_receive)
self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive) self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
# create a stream to let ioloop deal with blocking calls on HWM
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
# create managers # create managers
self.resource_manager = ResourceManager(hwloc=self.config.hwloc) self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
...@@ -300,13 +274,13 @@ class Daemon(object): ...@@ -300,13 +274,13 @@ class Daemon(object):
self.resource_manager, self.resource_manager,
perfwrapper=self.config.argo_perf_wrapper, perfwrapper=self.config.argo_perf_wrapper,
linuxperf=self.config.perf, linuxperf=self.config.perf,
argo_nodeos_config=self.config.argo_nodeos_config argo_nodeos_config=self.config.argo_nodeos_config,
pmpi_lib=self.config.pmpi_lib,
) )
self.application_manager = ApplicationManager() self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager() self.sensor_manager = SensorManager()
aa = ApplicationActuator(self.application_manager, self.downstream_pub)
pa = PowerActuator(self.sensor_manager) pa = PowerActuator(self.sensor_manager)
self.controller = Controller([aa, pa]) self.controller = Controller([pa])
self.sensor_manager.start() self.sensor_manager.start()
self.machine_info = self.sensor_manager.do_update() self.machine_info = self.sensor_manager.do_update()
......
...@@ -18,7 +18,7 @@ logger = logging.getLogger('nrm') ...@@ -18,7 +18,7 @@ logger = logging.getLogger('nrm')
# list of APIs supported by this messaging layer. Each message is # list of APIs supported by this messaging layer. Each message is
# indexed by its intended api user and the type of the message, along with # indexed by its intended api user and the type of the message, along with
# basic field type information. # basic field type information.
APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub'] APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub', 'down_event']
MSGFORMATS = {k: {} for k in APIS} MSGFORMATS = {k: {} for k in APIS}
MSGFORMATS['up_rpc_req'] = {'list': {}, MSGFORMATS['up_rpc_req'] = {'list': {},
'run': {'manifest': basestring, 'run': {'manifest': basestring,
...@@ -47,6 +47,18 @@ MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float}, ...@@ -47,6 +47,18 @@ MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float},
'container_exit': {'container_uuid': basestring, 'container_exit': {'container_uuid': basestring,
'profile_data': dict}, 'profile_data': dict},
} }
MSGFORMATS['down_event'] = {'application_start':
{'container_uuid': basestring,
'application_uuid': basestring},
'application_exit':
{'application_uuid': basestring},
'progress': {'payload': int},
'phase_context': {'cpu': int,
'startcompute': int,
'endcompute': int,
'startbarrier': int,
'endbarrier': int},
}
# Mirror of the message formats, using namedtuples as the actual transport # Mirror of the message formats, using namedtuples as the actual transport
# for users of this messaging layer. # for users of this messaging layer.
...@@ -223,3 +235,20 @@ class UpstreamPubClient(object): ...@@ -223,3 +235,20 @@ class UpstreamPubClient(object):
self.stream = zmqstream.ZMQStream(self.socket) self.stream = zmqstream.ZMQStream(self.socket)
self.callback = callback self.callback = callback
self.stream.on_recv(self.do_recv_callback) self.stream.on_recv(self.do_recv_callback)
class DownstreamEventServer(UpstreamRPCServer):
"""Implements the message layer server for the downstream event API."""
def sendmsg(self, msg, client_uuid):
assert False, "Cannot send message from this side of the event stream."
class DownstreamEventClient(UpstreamRPCClient):
"""Implements the message layer client for the downstream event API."""
def recvmsg(self):
assert False, \
"Cannot receive messages from this side of the event stream."
...@@ -28,6 +28,18 @@ def upstream_pub_client(): ...@@ -28,6 +28,18 @@ def upstream_pub_client():
return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub") return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub")
@pytest.fixture
def downstream_event_server():
"""Fixture for a server handle on the downstream event API"""
return nrm.messaging.DownstreamEventServer("ipc:///tmp/nrm-pytest-down")
@pytest.fixture
def downstream_event_client():
"""Fixture for a client handle on the downstream event API"""
return nrm.messaging.DownstreamEventClient("ipc:///tmp/nrm-pytest-down")
@pytest.fixture @pytest.fixture
def dummy_msg(): def dummy_msg():
"""Fixture for a dummy valid message.""" """Fixture for a dummy valid message."""
...@@ -89,3 +101,26 @@ def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg): ...@@ -89,3 +101,26 @@ def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg):
upstream_pub_server.sendmsg(dummy_msg) upstream_pub_server.sendmsg(dummy_msg)
msg = upstream_pub_client.recvmsg() msg = upstream_pub_client.recvmsg()
assert msg == dummy_msg assert msg == dummy_msg
def test_down_connection(downstream_event_client, downstream_event_server):
downstream_event_client.wait_connected()
def test_down_event_send_recv(downstream_event_client, downstream_event_server,
dummy_msg):
downstream_event_client.sendmsg(dummy_msg)
msg, client = downstream_event_server.recvmsg()
assert msg == dummy_msg
assert client == downstream_event_client.uuid
def test_down_event_server_callback(downstream_event_client,
downstream_event_server,
dummy_msg, dummy_daemon):
downstream_event_server.setup_recv_callback(dummy_daemon.callback)
frames = [downstream_event_client.uuid, nrm.messaging.msg2wire(dummy_msg)]
downstream_event_server.do_recv_callback(frames)
assert dummy_daemon.called
assert dummy_daemon.msg == dummy_msg
assert dummy_daemon.client == downstream_event_client.uuid
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment