Commit 957deb8d authored by Swann Perarnau's avatar Swann Perarnau
Browse files

[feature] Properly handle run events in order

This patch implements a small finite state machine on the cmd side to be
able to run a command, wait for all of its output, and then exit.

As the daemon can send those message in any order, we need to wait them
properly, in particular the closing of stdout/stderr before exiting.

This patch also fixes the read_until_close callback creation to ensure
that the stream EOF is handled as a distinct message.
parent f2bc8b80
...@@ -67,16 +67,42 @@ class CommandLineInterface(object): ...@@ -67,16 +67,42 @@ class CommandLineInterface(object):
'args': argv.args, 'args': argv.args,
'uuid': containerid, 'uuid': containerid,
} }
# command fsm
while(True): state = 'init'
outeof = False
erreof = False
exitmsg = None
self.upstream_pub_socket.send_json(command) self.upstream_pub_socket.send_json(command)
while(True):
msg = self.upstream_sub_socket.recv_json() msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container': if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['uuid'] == containerid: if msg['uuid'] == containerid:
if msg['event'] == 'start':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg['event'] == 'stdout':
logger.info("container msg: %r", msg) logger.info("container msg: %r", msg)
if msg['event'] == 'exit': if msg['payload'] == 'eof':
outeof = True
elif msg['event'] == 'stderr':
logger.info("container msg: %r", msg)
if msg['payload'] == 'eof':
erreof = True
elif msg['event'] == 'exit':
if state == 'started':
state = 'exiting'
exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
if state == 'init':
self.upstream_pub_socket.send_json(command)
if outeof and erreof and state == 'exiting':
state = 'exit'
logger.info("container ended: %r", exitmsg)
break break
def do_list(self, argv): def do_list(self, argv):
......
...@@ -137,8 +137,8 @@ class Daemon(object): ...@@ -137,8 +137,8 @@ class Daemon(object):
# setup io callbacks # setup io callbacks
outcb = partial(self.do_children_io, container_uuid, 'stdout') outcb = partial(self.do_children_io, container_uuid, 'stdout')
errcb = partial(self.do_children_io, container_uuid, 'stderr') errcb = partial(self.do_children_io, container_uuid, 'stderr')
container.process.stdout.read_until_close(outcb) container.process.stdout.read_until_close(outcb, outcb)
container.process.stderr.read_until_close(errcb) container.process.stderr.read_until_close(errcb, outcb)
elif command == 'kill': elif command == 'kill':
logger.info("asked to kill container: %r", msg) logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg['uuid']) response = self.container_manager.kill(msg['uuid'])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment