GitLab maintenance scheduled for Tomorrow, 2019-04-24, from 12:00 to 13:00 CDT - Services will be unavailable during this time.

Commit c4e50535 authored by Swann Perarnau's avatar Swann Perarnau

[feature] add messaging class for pub client

Add a upstream pub client, to be able to listen to messages coming from
the daemon on the upstream pub/sub channel.

Doesn't support any fancy filter, as that's not used by the daemon so
far.
parent 93ae9144
......@@ -176,3 +176,48 @@ class UpstreamPubServer(object):
"""Sends a message."""
logger.debug("sending message: %r", msg)
self.socket.send(msg2wire(msg))
class UpstreamPubClient(object):
"""Implements the message layer client to the upstream Pub API."""
def __init__(self, address):
self.address = address
self.zmq_context = zmq.Context()
self.socket = self.zmq_context.socket(zmq.SUB)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.connect(address)
def wait_connected(self):
"""Creates a monitor socket and wait for the connect event."""
monitor = self.socket.get_monitor_socket()
while True:
msg = zmq.utils.monitor.recv_monitor_message(monitor)
logger.debug("monitor message: %r", msg)
if int(msg['event']) == zmq.EVENT_CONNECTED:
logger.debug("socket connected")
break
self.socket.disable_monitor()
def recvmsg(self):
"""Receives a message and returns it."""
frames = self.socket.recv_multipart()
logger.debug("received message: %r", frames)
assert len(frames) == 1
return wire2msg(frames[0])
def do_recv_callback(self, frames):
"""Receives a message from zmqstream.on_recv, passing it to a user
callback."""
logger.info("receiving message: %r", frames)
assert len(frames) == 1
msg = wire2msg(frames[0])
assert self.callback
self.callback(msg)
def setup_recv_callback(self, callback):
"""Setup a ioloop-backed callback for receiving messages."""
self.stream = zmqstream.ZMQStream(self.socket)
self.callback = callback
self.stream.on_recv(self.do_recv_callback)
......@@ -22,6 +22,12 @@ def upstream_pub_server():
return nrm.messaging.UpstreamPubServer("ipc:///tmp/nrm-pytest-pub")
@pytest.fixture
def upstream_pub_client():
"""Fixture for a server handle on the upstream PUB API"""
return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub")
@pytest.fixture
def dummy_msg():
"""Fixture for a dummy valid message."""
......@@ -73,3 +79,13 @@ def test_rpc_server_callback(upstream_rpc_client, upstream_rpc_server,
def test_pub_server_send(upstream_pub_server, dummy_msg):
upstream_pub_server.sendmsg(dummy_msg)
def test_pub_connection(upstream_pub_client, upstream_pub_server):
upstream_pub_client.wait_connected()
def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg):
upstream_pub_server.sendmsg(dummy_msg)
msg = upstream_pub_client.recvmsg()
assert msg == dummy_msg
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment