Commit edeb413b authored by Swann Perarnau's avatar Swann Perarnau

[feature] Use argo_nodeos_config --exec

Use the new argo_nodeos_config --exec feature in development.
Allow us to delegate fork+attach+exec to argo_nodeos_config, and
simplifying the create command as a result.

We use tornado.process to wrap this command, as we want to able to
stream stdout/stderr in the future.

This patch also misuse, the 'pid' field of the container namedtuple to
save the tornado.process.Subprocess object itself, so some functions
need to be adapted.
parent ed33ef6d
...@@ -6,7 +6,6 @@ import logging ...@@ -6,7 +6,6 @@ import logging
import os import os
import signal import signal
from subprograms import ChrtClient, NodeOSClient, resources from subprograms import ChrtClient, NodeOSClient, resources
import sys
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'pid']) Container = namedtuple('Container', ['uuid', 'manifest', 'pid'])
...@@ -60,32 +59,20 @@ class ContainerManager(object): ...@@ -60,32 +59,20 @@ class ContainerManager(object):
self.nodeos.create(container_name, allocation) self.nodeos.create(container_name, allocation)
logger.info("created container %s", container_name) logger.info("created container %s", container_name)
newpid = os.fork() # run my command
logger.info("forked: new pid: %s", newpid) if hasattr(manifest.app.isolators, 'scheduler'):
if newpid == 0: sched = manifest.app.isolators.scheduler
# move myself to that container argv = self.chrt.getwrappedcmd(sched)
mypid = os.getpid()
self.nodeos.attach(container_name, mypid)
logger.info("child: attached to container %s", container_name)
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
sched = manifest.app.isolators.scheduler
argv = self.chrt.getwrappedcmd(sched)
else:
argv = []
argv.append(command)
argv.extend(args)
logger.debug("execvpe %r", argv)
os.execvpe(argv[0], argv, environ)
# should never happen
sys.exit(1)
else: else:
c = Container(container_name, manifest, newpid) argv = []
self.pids[newpid] = c
self.containers[container_name] = c argv.append(command)
return newpid argv.extend(args)
process = self.nodeos.execute(container_name, argv, environ)
c = Container(container_name, manifest, process)
self.pids[process.pid] = c
self.containers[container_name] = c
return process.pid
def delete(self, uuid): def delete(self, uuid):
"""Delete a container and kill all related processes.""" """Delete a container and kill all related processes."""
...@@ -93,7 +80,7 @@ class ContainerManager(object): ...@@ -93,7 +80,7 @@ class ContainerManager(object):
self.resourcemanager.update(uuid) self.resourcemanager.update(uuid)
c = self.containers[uuid] c = self.containers[uuid]
del self.containers[uuid] del self.containers[uuid]
del self.pids[c.pid] del self.pids[c.pid.pid]
def kill(self, uuid): def kill(self, uuid):
"""Kill all the processes of a container.""" """Kill all the processes of a container."""
...@@ -101,12 +88,11 @@ class ContainerManager(object): ...@@ -101,12 +88,11 @@ class ContainerManager(object):
c = self.containers[uuid] c = self.containers[uuid]
logger.debug("killing %r:", c) logger.debug("killing %r:", c)
try: try:
os.kill(c.pid, signal.SIGKILL) c.pid.terminate()
except OSError: except OSError:
pass pass
def list(self): def list(self):
"""List the containers in the system.""" """List the containers in the system."""
fields = ['uuid', 'pid'] return [{'uuid': c.uuid, 'pid': c.pid.pid} for c in
ret = [c._asdict() for c in self.containers.values()] self.containers.values()]
return [{k: d[k] for k in fields} for d in ret]
"""Various clients for system utilities.""" """Various clients for system utilities."""
import subprocess
import collections import collections
import logging import logging
import xml.etree.ElementTree import xml.etree.ElementTree
import tornado.process as process
import subprocess
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
resources = collections.namedtuple("Resources", ["cpus", "mems"]) resources = collections.namedtuple("Resources", ["cpus", "mems"])
...@@ -103,6 +104,22 @@ class NodeOSClient(object): ...@@ -103,6 +104,22 @@ class NodeOSClient(object):
stdout, stderr = p.communicate() stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr) logpopen(p, args, stdout, stderr)
def execute(self, name, argv, environ):
"""Execute argv inside container."""
args = [self.prefix]
cmd = '--exec='
cmd += 'name:{0}'.format(name)
# argo_nodeos_config takes argv as 'arg0 arg1 ...' so we need to merge
# the arguments into a single list, with single quotes. We also need
# to escape spaces from arguments before.
argv = [s.replace(' ', r'\ ') for s in argv]
cmd += " argv:'"+" ".join(argv)+"'"
args.append(cmd)
return process.Subprocess(args, stdin=process.Subprocess.STREAM,
stdout=process.Subprocess.STREAM,
stderr=process.Subprocess.STREAM,
env=environ)
class ChrtClient(object): class ChrtClient(object):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment