From c4e50535c9ea462c1994605b45f31157d5179d0b Mon Sep 17 00:00:00 2001 From: Swann Perarnau Date: Wed, 28 Nov 2018 14:30:46 -0600 Subject: [PATCH] [feature] add messaging class for pub client Add a upstream pub client, to be able to listen to messages coming from the daemon on the upstream pub/sub channel. Doesn't support any fancy filter, as that's not used by the daemon so far. --- nrm/messaging.py | 45 ++++++++++++++++++++++++++++++++++++++++++ test/test_messaging.py | 16 +++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/nrm/messaging.py b/nrm/messaging.py index d9f4c82..053bcd3 100644 --- a/nrm/messaging.py +++ b/nrm/messaging.py @@ -176,3 +176,48 @@ class UpstreamPubServer(object): """Sends a message.""" logger.debug("sending message: %r", msg) self.socket.send(msg2wire(msg)) + + +class UpstreamPubClient(object): + + """Implements the message layer client to the upstream Pub API.""" + + def __init__(self, address): + self.address = address + self.zmq_context = zmq.Context() + self.socket = self.zmq_context.socket(zmq.SUB) + self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.connect(address) + + def wait_connected(self): + """Creates a monitor socket and wait for the connect event.""" + monitor = self.socket.get_monitor_socket() + while True: + msg = zmq.utils.monitor.recv_monitor_message(monitor) + logger.debug("monitor message: %r", msg) + if int(msg['event']) == zmq.EVENT_CONNECTED: + logger.debug("socket connected") + break + self.socket.disable_monitor() + + def recvmsg(self): + """Receives a message and returns it.""" + frames = self.socket.recv_multipart() + logger.debug("received message: %r", frames) + assert len(frames) == 1 + return wire2msg(frames[0]) + + def do_recv_callback(self, frames): + """Receives a message from zmqstream.on_recv, passing it to a user + callback.""" + logger.info("receiving message: %r", frames) + assert len(frames) == 1 + msg = wire2msg(frames[0]) + assert self.callback + self.callback(msg) + + def setup_recv_callback(self, callback): + """Setup a ioloop-backed callback for receiving messages.""" + self.stream = zmqstream.ZMQStream(self.socket) + self.callback = callback + self.stream.on_recv(self.do_recv_callback) diff --git a/test/test_messaging.py b/test/test_messaging.py index d060005..7df4479 100644 --- a/test/test_messaging.py +++ b/test/test_messaging.py @@ -22,6 +22,12 @@ def upstream_pub_server(): return nrm.messaging.UpstreamPubServer("ipc:///tmp/nrm-pytest-pub") +@pytest.fixture +def upstream_pub_client(): + """Fixture for a server handle on the upstream PUB API""" + return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub") + + @pytest.fixture def dummy_msg(): """Fixture for a dummy valid message.""" @@ -73,3 +79,13 @@ def test_rpc_server_callback(upstream_rpc_client, upstream_rpc_server, def test_pub_server_send(upstream_pub_server, dummy_msg): upstream_pub_server.sendmsg(dummy_msg) + + +def test_pub_connection(upstream_pub_client, upstream_pub_server): + upstream_pub_client.wait_connected() + + +def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg): + upstream_pub_server.sendmsg(dummy_msg) + msg = upstream_pub_client.recvmsg() + assert msg == dummy_msg -- 2.26.2