Commit f81b95e0 authored by Swann Perarnau's avatar Swann Perarnau
Browse files

[refactor] move container start/exit to up_pub

Move the container start/exit events to the upstream pub/sub event
stream. As these are more of a global event now that we support multiple
commands in the same container, it makes sense to move them to the more
general event stream.

This patch also remove the code in cmd waiting for container start or
exit, making (temporarily) the cmd unable to report power metrics. We
will fix that in a later commit.

This patch fixes complicated issues we had with how a job running
multiple commands in the container might not all wait for the end of the
container: now none of them do.
parent c4e50535
...@@ -44,13 +44,14 @@ class CommandLineInterface(object): ...@@ -44,13 +44,14 @@ class CommandLineInterface(object):
# the command a container uuid as a way to make sure that we can make # the command a container uuid as a way to make sure that we can make
# the command idempotent. # the command idempotent.
environ = os.environ environ = os.environ
container_uuid = argv.ucontainername or str(uuid.uuid4())
command = {'api': 'up_rpc_req', command = {'api': 'up_rpc_req',
'type': 'run', 'type': 'run',
'manifest': argv.manifest, 'manifest': argv.manifest,
'path': argv.command, 'path': argv.command,
'args': argv.args, 'args': argv.args,
'environ': dict(environ), 'environ': dict(environ),
'container_uuid': argv.ucontainername or str(uuid.uuid4()), 'container_uuid': container_uuid,
} }
msg = RPC_MSG['run'](**command) msg = RPC_MSG['run'](**command)
# command fsm # command fsm
...@@ -63,11 +64,6 @@ class CommandLineInterface(object): ...@@ -63,11 +64,6 @@ class CommandLineInterface(object):
# the first message tells us if we started a container or not # the first message tells us if we started a container or not
msg = self.client.recvmsg() msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep' assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'process_start']
new_container = False
if msg.type == 'start':
new_container = True
msg = self.client.recvmsg()
assert msg.type == 'process_start' assert msg.type == 'process_start'
state = 'started' state = 'started'
while(True): while(True):
...@@ -85,17 +81,13 @@ class CommandLineInterface(object): ...@@ -85,17 +81,13 @@ class CommandLineInterface(object):
erreof = True erreof = True
elif msg.type == 'process_exit': elif msg.type == 'process_exit':
logger.info("process ended: %r", msg) logger.info("process ended: %r", msg)
if not new_container:
state = 'exiting'
elif msg.type == 'exit':
if state == 'started':
state = 'exiting' state = 'exiting'
exitmsg = msg exitmsg = msg
else: else:
logger.info("unexpected exit message: %r", msg) logger.error("unexpected message: %r", msg)
if outeof and erreof and state == 'exiting': if outeof and erreof and state == 'exiting':
state = 'exit' state = 'exit'
logger.info("container ended: %r", exitmsg) logger.info("command ended: %r", exitmsg)
break break
def do_list(self, argv): def do_list(self, argv):
......
...@@ -100,14 +100,14 @@ class Daemon(object): ...@@ -100,14 +100,14 @@ class Daemon(object):
p = container.power['profile'] p = container.power['profile']
p['start'] = self.machine_info['energy']['energy'] p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time'] p['start']['time'] = self.machine_info['time']
update = {'api': 'up_rpc_rep', update = {'api': 'up_pub',
'type': 'start', 'type': 'container_start',
'container_uuid': container_uuid, 'container_uuid': container_uuid,
'errno': 0 if container else -1, 'errno': 0 if container else -1,
'power': container.power['policy'] or dict() 'power': container.power['policy'] or dict()
} }
self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update), self.upstream_pub_server.sendmsg(
client) PUB_MSG['container_start'](**update))
self.container_owner[container.uuid] = client self.container_owner[container.uuid] = client
# now deal with the process itself # now deal with the process itself
...@@ -219,8 +219,8 @@ class Daemon(object): ...@@ -219,8 +219,8 @@ class Daemon(object):
# kill everything # kill everything
if self.container_owner[container.uuid] == clientid: if self.container_owner[container.uuid] == clientid:
# deal with container exit # deal with container exit
msg = {'api': 'up_rpc_rep', msg = {'api': 'up_pub',
'type': 'exit', 'type': 'container_exit',
'container_uuid': container.uuid, 'container_uuid': container.uuid,
'profile_data': dict(), 'profile_data': dict(),
} }
...@@ -242,8 +242,8 @@ class Daemon(object): ...@@ -242,8 +242,8 @@ class Daemon(object):
container.uuid, diff) container.uuid, diff)
msg['profile_data'] = diff msg['profile_data'] = diff
self.container_manager.delete(container.uuid) self.container_manager.delete(container.uuid)
self.upstream_rpc_server.sendmsg( self.upstream_pub_server.sendmsg(
RPC_MSG['exit'](**msg), clientid) PUB_MSG['container_exit'](**msg))
del self.container_owner[container.uuid] del self.container_owner[container.uuid]
else: else:
logger.debug("child update ignored") logger.debug("child update ignored")
......
...@@ -29,23 +29,24 @@ MSGFORMATS['up_rpc_req'] = {'list': {}, ...@@ -29,23 +29,24 @@ MSGFORMATS['up_rpc_req'] = {'list': {},
'kill': {'container_uuid': basestring}, 'kill': {'container_uuid': basestring},
'setpower': {'limit': basestring}, 'setpower': {'limit': basestring},
} }
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring, MSGFORMATS['up_rpc_rep'] = {'list': {'payload': list},
'errno': int,
'power': dict},
'list': {'payload': list},
'stdout': {'container_uuid': basestring, 'stdout': {'container_uuid': basestring,
'payload': basestring}, 'payload': basestring},
'stderr': {'container_uuid': basestring, 'stderr': {'container_uuid': basestring,
'payload': basestring}, 'payload': basestring},
'exit': {'container_uuid': basestring,
'profile_data': dict},
'process_start': {'container_uuid': basestring, 'process_start': {'container_uuid': basestring,
'pid': int}, 'pid': int},
'process_exit': {'container_uuid': basestring, 'process_exit': {'container_uuid': basestring,
'status': basestring}, 'status': basestring},
'getpower': {'limit': basestring}, 'getpower': {'limit': basestring},
} }
MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float}} MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float},
'container_start': {'container_uuid': basestring,
'errno': int,
'power': dict},
'container_exit': {'container_uuid': basestring,
'profile_data': dict},
}
# Mirror of the message formats, using namedtuples as the actual transport # Mirror of the message formats, using namedtuples as the actual transport
# for users of this messaging layer. # for users of this messaging layer.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment