Commit 1391a197 authored by Swann Perarnau's avatar Swann Perarnau

[refactor] daemon should always bind on sockets

The way 0MQ works on PUB/SUB sockets, publishers might drop
messages if subscribers are not detected faster enough. One way to fix
it is to have the "server" always bind sockets, and the "client" use
connect. This way, the handshake is initiated properly, and the client
can publish as soon as the connection is done.

This patch makes the daemon bind on the upstream API and the CLI connect,
fixing in the process the message dropping we were experiencing before.

Long term, we might have a think of using 2 types of sockets for the
upstream API: pub/sub for actual events published from the daemon, and
a REQ/REP or ROUTER/DEALER pair for "commands".
parent 6f53bbbd
......@@ -31,10 +31,10 @@ class CommandLineInterface(object):
self.upstream_pub_socket = self.context.socket(zmq.PUB)
self.upstream_sub_socket = self.context.socket(zmq.SUB)
upstream_pub_param = "tcp://*:%d" % (upstream_pub_port)
upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)
self.upstream_pub_socket.bind(upstream_pub_param)
self.upstream_pub_socket.connect(upstream_pub_param)
self.upstream_sub_socket.connect(upstream_sub_param)
# we want to receive everything for now
upstream_sub_filter = ""
......@@ -98,8 +98,6 @@ class CommandLineInterface(object):
exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
if state == 'init':
self.upstream_pub_socket.send_json(command)
if outeof and erreof and state == 'exiting':
state = 'exit'
logger.info("container ended: %r", exitmsg)
......@@ -115,8 +113,8 @@ class CommandLineInterface(object):
command = {'command': 'list',
}
self.upstream_pub_socket.send_json(command)
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
......@@ -135,8 +133,8 @@ class CommandLineInterface(object):
'uuid': argv.uuid
}
self.upstream_pub_socket.send_json(command)
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
......@@ -159,8 +157,8 @@ class CommandLineInterface(object):
'limit': argv.limit,
}
self.upstream_pub_socket.send_json(command)
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
......
......@@ -250,11 +250,11 @@ class Daemon(object):
downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port)
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)
upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)
downstream_socket.bind(downstream_bind_param)
upstream_pub_socket.bind(upstream_pub_param)
upstream_sub_socket.connect(upstream_sub_param)
upstream_sub_socket.bind(upstream_sub_param)
upstream_sub_filter = ""
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment