Commit 0bae924d authored by Swann Perarnau's avatar Swann Perarnau

[feature] add downstream rpc to messaging layer

Add downstream RPC client/server classes that are the same as the
upstream ones.

This is part of a series of changes to downstream to allow for more
reliable communications between the daemon and applications. At this
time, the daemon never replies, so the RPC_REQ is basically used as a
way to publish events to the daemon.
parent 168247bb
...@@ -18,7 +18,7 @@ logger = logging.getLogger('nrm') ...@@ -18,7 +18,7 @@ logger = logging.getLogger('nrm')
# list of APIs supported by this messaging layer. Each message is # list of APIs supported by this messaging layer. Each message is
# indexed by its intended api user and the type of the message, along with # indexed by its intended api user and the type of the message, along with
# basic field type information. # basic field type information.
APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub'] APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub', 'down_event']
MSGFORMATS = {k: {} for k in APIS} MSGFORMATS = {k: {} for k in APIS}
MSGFORMATS['up_rpc_req'] = {'list': {}, MSGFORMATS['up_rpc_req'] = {'list': {},
'run': {'manifest': basestring, 'run': {'manifest': basestring,
...@@ -47,6 +47,13 @@ MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float}, ...@@ -47,6 +47,13 @@ MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float},
'container_exit': {'container_uuid': basestring, 'container_exit': {'container_uuid': basestring,
'profile_data': dict}, 'profile_data': dict},
} }
MSGFORMATS['down_event'] = {'application_start':
{'container_uuid': basestring,
'application_uuid': basestring},
'application_exit':
{'application_uuid': basestring},
'progress': {'payload': int},
}
# Mirror of the message formats, using namedtuples as the actual transport # Mirror of the message formats, using namedtuples as the actual transport
# for users of this messaging layer. # for users of this messaging layer.
...@@ -223,3 +230,20 @@ class UpstreamPubClient(object): ...@@ -223,3 +230,20 @@ class UpstreamPubClient(object):
self.stream = zmqstream.ZMQStream(self.socket) self.stream = zmqstream.ZMQStream(self.socket)
self.callback = callback self.callback = callback
self.stream.on_recv(self.do_recv_callback) self.stream.on_recv(self.do_recv_callback)
class DownstreamEventServer(UpstreamRPCServer):
"""Implements the message layer server for the downstream event API."""
def sendmsg(self, msg, client_uuid):
assert False, "Cannot send message from this side of the event stream."
class DownstreamEventClient(UpstreamRPCClient):
"""Implements the message layer client for the downstream event API."""
def recvmsg(self):
assert False, \
"Cannot receive messages from this side of the event stream."
...@@ -28,6 +28,18 @@ def upstream_pub_client(): ...@@ -28,6 +28,18 @@ def upstream_pub_client():
return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub") return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub")
@pytest.fixture
def downstream_event_server():
"""Fixture for a server handle on the downstream event API"""
return nrm.messaging.DownstreamEventServer("ipc:///tmp/nrm-pytest-down")
@pytest.fixture
def downstream_event_client():
"""Fixture for a client handle on the downstream event API"""
return nrm.messaging.DownstreamEventClient("ipc:///tmp/nrm-pytest-down")
@pytest.fixture @pytest.fixture
def dummy_msg(): def dummy_msg():
"""Fixture for a dummy valid message.""" """Fixture for a dummy valid message."""
...@@ -89,3 +101,26 @@ def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg): ...@@ -89,3 +101,26 @@ def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg):
upstream_pub_server.sendmsg(dummy_msg) upstream_pub_server.sendmsg(dummy_msg)
msg = upstream_pub_client.recvmsg() msg = upstream_pub_client.recvmsg()
assert msg == dummy_msg assert msg == dummy_msg
def test_down_connection(downstream_event_client, downstream_event_server):
downstream_event_client.wait_connected()
def test_down_event_send_recv(downstream_event_client, downstream_event_server,
dummy_msg):
downstream_event_client.sendmsg(dummy_msg)
msg, client = downstream_event_server.recvmsg()
assert msg == dummy_msg
assert client == downstream_event_client.uuid
def test_down_event_server_callback(downstream_event_client,
downstream_event_server,
dummy_msg, dummy_daemon):
downstream_event_server.setup_recv_callback(dummy_daemon.callback)
frames = [downstream_event_client.uuid, nrm.messaging.msg2wire(dummy_msg)]
downstream_event_server.do_recv_callback(frames)
assert dummy_daemon.called
assert dummy_daemon.msg == dummy_msg
assert dummy_daemon.client == downstream_event_client.uuid
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment