From 539f99ac4cb3a6b6764b5903c4c2178ed4a5d528 Mon Sep 17 00:00:00 2001 From: Swann Perarnau Date: Wed, 27 Feb 2019 09:21:28 -0600 Subject: [PATCH] [fix] create monitor socket before connect Change the `wait_connected` method to a `connect` method with an optional wait. Ensure that the actual connect call on the socket is done after the monitor is created, otherwise the monitor might miss the connection event. --- bin/argo-perf-wrapper | 2 +- bin/cmd | 4 ++-- nrm/messaging.py | 14 +++++++------- test/test_messaging.py | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/bin/argo-perf-wrapper b/bin/argo-perf-wrapper index 00cd81c..67e860a 100755 --- a/bin/argo-perf-wrapper +++ b/bin/argo-perf-wrapper @@ -53,7 +53,7 @@ class PerfWrapper(object): def setup(self): downstream_url = "ipc:///tmp/nrm-downstream-event" self.downstream_event = messaging.DownstreamEventClient(downstream_url) - self.downstream_event.wait_connected() + self.downstream_event.connect() logger.info("downstream pub socket connected to: %s", downstream_url) # retrieve our container uuid diff --git a/bin/cmd b/bin/cmd index 3f8e1ac..1caf64f 100755 --- a/bin/cmd +++ b/bin/cmd @@ -59,14 +59,14 @@ class CommandLineInterface(object): self.do_signal(None, signum, frame) signal.signal(signal.SIGINT, handler) - self.client.wait_connected() + self.client.connect() def do_listen(self, argv): """ Connect to the NRM and listen for pub/sub messages.""" upstream_pub_port = 2345 upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port) self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param) - self.pub_client.wait_connected() + self.pub_client.connect() while(True): msg = self.pub_client.recvmsg() diff --git a/nrm/messaging.py b/nrm/messaging.py index 838b9c7..15a6347 100644 --- a/nrm/messaging.py +++ b/nrm/messaging.py @@ -175,12 +175,12 @@ class UpstreamRPCClient(object): self.zmq_context = zmq.Context.instance() self.socket = self.zmq_context.socket(zmq.DEALER) self.socket.setsockopt(zmq.IDENTITY, self.uuid) - self.socket.connect(address) - def wait_connected(self): - """Creates a monitor socket and wait for the connect event.""" + def connect(self, wait=True): + """Connect, and wait for the socket to be connected.""" monitor = self.socket.get_monitor_socket() - while True: + self.socket.connect(self.address) + while wait: msg = zmq.utils.monitor.recv_monitor_message(monitor) logger.debug("monitor message: %r", msg) if int(msg['event']) == zmq.EVENT_CONNECTED: @@ -264,12 +264,12 @@ class UpstreamPubClient(object): self.zmq_context = zmq.Context.instance() self.socket = self.zmq_context.socket(zmq.SUB) self.socket.setsockopt(zmq.SUBSCRIBE, '') - self.socket.connect(address) - def wait_connected(self): + def connect(self, wait=True): """Creates a monitor socket and wait for the connect event.""" monitor = self.socket.get_monitor_socket() - while True: + self.socket.connect(self.address) + while wait: msg = zmq.utils.monitor.recv_monitor_message(monitor) logger.debug("monitor message: %r", msg) if int(msg['event']) == zmq.EVENT_CONNECTED: diff --git a/test/test_messaging.py b/test/test_messaging.py index 23eeaa7..1ab04e5 100644 --- a/test/test_messaging.py +++ b/test/test_messaging.py @@ -76,7 +76,7 @@ def test_msg_convertion(dummy_msg): def test_rpc_connection(upstream_rpc_client, upstream_rpc_server): - upstream_rpc_client.wait_connected() + upstream_rpc_client.connect() def test_rpc_send_recv(upstream_rpc_client, upstream_rpc_server, dummy_msg): @@ -104,7 +104,7 @@ def test_pub_server_send(upstream_pub_server, dummy_msg): def test_pub_connection(upstream_pub_client, upstream_pub_server): - upstream_pub_client.wait_connected() + upstream_pub_client.connect() def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg): @@ -114,7 +114,7 @@ def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg): def test_down_connection(downstream_event_client, downstream_event_server): - downstream_event_client.wait_connected() + downstream_event_client.connect() def test_down_event_send_recv(downstream_event_client, downstream_event_server, -- 2.26.2