GitLab maintenance scheduled for Today, 2019-12-05, from 17:00 to 18:00 CT - Services will be unavailable during this time.

Commit 6e0c1e7a authored by Swann Perarnau's avatar Swann Perarnau

[refactor/fix] always send process events for run

Current code sends start/exit events when a container is created and
process_start/process_exit when its already there. Instead, have the
container start/exit only care about container stuff, and always sends
the process start/exit events around. That makes the cmd run fsm easier
to work out.

Changes the message format a tiny bit.
Fixes some missing stdout/stderr issues we had before.
parent 2344824c
......@@ -59,28 +59,23 @@ class CommandLineInterface(object):
erreof = False
exitmsg = None
self.client.sendmsg(msg)
# the first message tells us if we started a container or not
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'process_start']
new_container = False
if msg.type == 'start':
new_container = True
msg = self.client.recvmsg()
assert msg.type == 'process_start'
state = 'started'
while(True):
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'stdout', 'stderr', 'exit',
'process_start', 'process_exit']
assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
if msg.type == 'start':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'process_start':
if state == 'init':
state = 'started'
logger.info("process started in existing "
"container: %r""", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'stdout':
if msg.type == 'stdout':
logger.info("container msg: %r", msg)
if msg.payload == 'eof':
outeof = True
......@@ -90,7 +85,8 @@ class CommandLineInterface(object):
erreof = True
elif msg.type == 'process_exit':
logger.info("process ended: %r", msg)
break
if not new_container:
state = 'exiting'
elif msg.type == 'exit':
if state == 'started':
state = 'exiting'
......
......@@ -103,33 +103,26 @@ class Daemon(object):
'type': 'start',
'container_uuid': container_uuid,
'errno': 0 if container else -1,
'pid': pid,
'power': container.power['policy'] or dict()
}
self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
else:
update = {'api': 'up_rpc_rep',
'type': 'process_start',
'container_uuid': container_uuid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
# now deal with the process itself
update = {'api': 'up_rpc_rep',
'type': 'process_start',
'container_uuid': container_uuid,
'pid': pid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client)
# setup io callbacks
outcb = partial(self.do_children_io, client, container_uuid,
'stdout')
errcb = partial(self.do_children_io, client, container_uuid,
'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
elif msg.type == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg.container_uuid)
......@@ -205,16 +198,28 @@ class Daemon(object):
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
clientid = container.clientids[pid]
remaining_pids = [p for p in container.processes.keys()
if p != pid]
# first, send a process_exit
msg = {'api': 'up_rpc_rep',
'type': 'process_exit',
'status': str(status),
'container_uuid': container.uuid,
}
if not remaining_pids:
msg['type'] = 'exit'
msg['profile_data'] = dict()
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
if not container.processes:
# deal with container exit
msg = {'api': 'up_rpc_rep',
'type': 'exit',
'container_uuid': container.uuid,
'profile_data': dict(),
}
pp = container.power
if pp['policy']:
pp['manager'].reset_all()
......@@ -235,16 +240,6 @@ class Daemon(object):
self.container_manager.delete(container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['exit'](**msg), clientid)
else:
msg['type'] = 'process_exit'
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
else:
logger.debug("child update ignored")
pass
......
......@@ -31,7 +31,6 @@ MSGFORMATS['up_rpc_req'] = {'list': {},
}
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'errno': int,
'pid': int,
'power': dict},
'list': {'payload': list},
'stdout': {'container_uuid': basestring,
......@@ -39,9 +38,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'stderr': {'container_uuid': basestring,
'payload': basestring},
'exit': {'container_uuid': basestring,
'status': basestring,
'profile_data': dict},
'process_start': {'container_uuid': basestring},
'process_start': {'container_uuid': basestring,
'pid': int},
'process_exit': {'container_uuid': basestring,
'status': basestring},
'getpower': {'limit': basestring},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment