Commit 539f99ac authored by Swann Perarnau's avatar Swann Perarnau

[fix] create monitor socket before connect

Change the `wait_connected` method to a `connect` method with an
optional wait. Ensure that the actual connect call on the socket is done
after the monitor is created, otherwise the monitor might miss the
connection event.
parent e01ce640
......@@ -53,7 +53,7 @@ class PerfWrapper(object):
def setup(self):
downstream_url = "ipc:///tmp/nrm-downstream-event"
self.downstream_event = messaging.DownstreamEventClient(downstream_url)
self.downstream_event.wait_connected()
self.downstream_event.connect()
logger.info("downstream pub socket connected to: %s", downstream_url)
# retrieve our container uuid
......
......@@ -59,14 +59,14 @@ class CommandLineInterface(object):
self.do_signal(None, signum, frame)
signal.signal(signal.SIGINT, handler)
self.client.wait_connected()
self.client.connect()
def do_listen(self, argv):
""" Connect to the NRM and listen for pub/sub messages."""
upstream_pub_port = 2345
upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
self.pub_client.wait_connected()
self.pub_client.connect()
while(True):
msg = self.pub_client.recvmsg()
......
......@@ -175,12 +175,12 @@ class UpstreamRPCClient(object):
self.zmq_context = zmq.Context.instance()
self.socket = self.zmq_context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.uuid)
self.socket.connect(address)
def wait_connected(self):
"""Creates a monitor socket and wait for the connect event."""
def connect(self, wait=True):
"""Connect, and wait for the socket to be connected."""
monitor = self.socket.get_monitor_socket()
while True:
self.socket.connect(self.address)
while wait:
msg = zmq.utils.monitor.recv_monitor_message(monitor)
logger.debug("monitor message: %r", msg)
if int(msg['event']) == zmq.EVENT_CONNECTED:
......@@ -264,12 +264,12 @@ class UpstreamPubClient(object):
self.zmq_context = zmq.Context.instance()
self.socket = self.zmq_context.socket(zmq.SUB)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.connect(address)
def wait_connected(self):
def connect(self, wait=True):
"""Creates a monitor socket and wait for the connect event."""
monitor = self.socket.get_monitor_socket()
while True:
self.socket.connect(self.address)
while wait:
msg = zmq.utils.monitor.recv_monitor_message(monitor)
logger.debug("monitor message: %r", msg)
if int(msg['event']) == zmq.EVENT_CONNECTED:
......
......@@ -76,7 +76,7 @@ def test_msg_convertion(dummy_msg):
def test_rpc_connection(upstream_rpc_client, upstream_rpc_server):
upstream_rpc_client.wait_connected()
upstream_rpc_client.connect()
def test_rpc_send_recv(upstream_rpc_client, upstream_rpc_server, dummy_msg):
......@@ -104,7 +104,7 @@ def test_pub_server_send(upstream_pub_server, dummy_msg):
def test_pub_connection(upstream_pub_client, upstream_pub_server):
upstream_pub_client.wait_connected()
upstream_pub_client.connect()
def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg):
......@@ -114,7 +114,7 @@ def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg):
def test_down_connection(downstream_event_client, downstream_event_server):
downstream_event_client.wait_connected()
downstream_event_client.connect()
def test_down_event_send_recv(downstream_event_client, downstream_event_server,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment