Commit 68784add authored by Swann Perarnau's avatar Swann Perarnau

Merge branch 'switch-to-subprocess' into 'master'

Switch to subprocess

See merge request !6
parents ed33ef6d 957deb8d
......@@ -67,16 +67,43 @@ class CommandLineInterface(object):
'args': argv.args,
'uuid': containerid,
}
# command fsm
state = 'init'
outeof = False
erreof = False
exitmsg = None
self.upstream_pub_socket.send_json(command)
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['uuid'] == containerid:
logger.info("container response: %r", msg)
break
if msg['event'] == 'start':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg['event'] == 'stdout':
logger.info("container msg: %r", msg)
if msg['payload'] == 'eof':
outeof = True
elif msg['event'] == 'stderr':
logger.info("container msg: %r", msg)
if msg['payload'] == 'eof':
erreof = True
elif msg['event'] == 'exit':
if state == 'started':
state = 'exiting'
exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
if state == 'init':
self.upstream_pub_socket.send_json(command)
if outeof and erreof and state == 'exiting':
state = 'exit'
logger.info("container ended: %r", exitmsg)
break
def do_list(self, argv):
"""Connect to the NRM and ask to list the containers present on the
......
......@@ -4,12 +4,10 @@ from aci import ImageManifest
from collections import namedtuple
import logging
import os
import signal
from subprograms import ChrtClient, NodeOSClient, resources
import sys
logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'pid'])
Container = namedtuple('Container', ['uuid', 'manifest', 'process'])
class ContainerManager(object):
......@@ -37,7 +35,7 @@ class ContainerManager(object):
manifest = ImageManifest()
if not manifest.load(manifestfile):
logger.error("Manifest is invalid")
return -1
return None
# ask the resource manager for resources
req = resources(int(manifest.app.isolators.container.cpus.value),
......@@ -60,32 +58,20 @@ class ContainerManager(object):
self.nodeos.create(container_name, allocation)
logger.info("created container %s", container_name)
newpid = os.fork()
logger.info("forked: new pid: %s", newpid)
if newpid == 0:
# move myself to that container
mypid = os.getpid()
self.nodeos.attach(container_name, mypid)
logger.info("child: attached to container %s", container_name)
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
sched = manifest.app.isolators.scheduler
argv = self.chrt.getwrappedcmd(sched)
else:
argv = []
argv.append(command)
argv.extend(args)
logger.debug("execvpe %r", argv)
os.execvpe(argv[0], argv, environ)
# should never happen
sys.exit(1)
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
sched = manifest.app.isolators.scheduler
argv = self.chrt.getwrappedcmd(sched)
else:
c = Container(container_name, manifest, newpid)
self.pids[newpid] = c
self.containers[container_name] = c
return newpid
argv = []
argv.append(command)
argv.extend(args)
process = self.nodeos.execute(container_name, argv, environ)
c = Container(container_name, manifest, process)
self.pids[process.pid] = c
self.containers[container_name] = c
return c
def delete(self, uuid):
"""Delete a container and kill all related processes."""
......@@ -93,7 +79,7 @@ class ContainerManager(object):
self.resourcemanager.update(uuid)
c = self.containers[uuid]
del self.containers[uuid]
del self.pids[c.pid]
del self.pids[c.process.pid]
def kill(self, uuid):
"""Kill all the processes of a container."""
......@@ -101,12 +87,11 @@ class ContainerManager(object):
c = self.containers[uuid]
logger.debug("killing %r:", c)
try:
os.kill(c.pid, signal.SIGKILL)
c.process.proc.terminate()
except OSError:
pass
def list(self):
"""List the containers in the system."""
fields = ['uuid', 'pid']
ret = [c._asdict() for c in self.containers.values()]
return [{k: d[k] for k in fields} for d in ret]
return [{'uuid': c.uuid, 'pid': c.process.pid} for c in
self.containers.values()]
......@@ -2,6 +2,7 @@ from __future__ import print_function
from containers import ContainerManager
from resources import ResourceManager
from functools import partial
import json
import logging
import os
......@@ -75,7 +76,6 @@ class Application(object):
class Daemon(object):
def __init__(self):
self.applications = {}
self.containerpids = {}
self.buf = ''
self.target = 1.0
......@@ -118,25 +118,27 @@ class Daemon(object):
self.target = float(msg['limit'])
logger.info("new target measure: %g", self.target)
elif command == 'run':
container_uuid = msg['uuid']
if container_uuid in self.container_manager.containers:
logger.info("container already created: %r",
container_uuid)
return
logger.info("new container required: %r", msg)
pid = self.container_manager.create(msg)
if pid > 0:
self.containerpids[pid] = msg['uuid']
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'event': 'start',
'uuid': msg['uuid'],
'errno': 0,
'pid': pid,
}
self.upstream_pub.send_json(update)
else:
update = {'type': 'container',
'event': 'start',
'uuid': msg['uuid'],
'errno': pid,
}
self.upstream_pub.send_json(update)
container = self.container_manager.create(msg)
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'event': 'start',
'uuid': container_uuid,
'errno': 0 if container else -1,
'pid': container.process.pid,
}
self.upstream_pub.send_json(update)
# setup io callbacks
outcb = partial(self.do_children_io, container_uuid, 'stdout')
errcb = partial(self.do_children_io, container_uuid, 'stderr')
container.process.stdout.read_until_close(outcb, outcb)
container.process.stderr.read_until_close(errcb, outcb)
elif command == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg['uuid'])
......@@ -152,6 +154,18 @@ class Daemon(object):
else:
logger.error("invalid command: %r", command)
def do_children_io(self, uuid, io, data):
"""Receive data from one of the children, and send it down the pipe.
Meant to be partially defined on a children basis."""
logger.info("%r received %r data: %r", uuid, io, data)
update = {'type': 'container',
'event': io,
'uuid': uuid,
'payload': data or 'eof',
}
self.upstream_pub.send_json(update)
def do_sensor(self):
self.machine_info = self.sensor.do_update()
logger.info("current state: %r", self.machine_info)
......@@ -199,15 +213,15 @@ class Daemon(object):
logger.info("child update %d: %r", pid, status)
# check if its a pid we care about
if pid in self.containerpids:
if pid in self.container_manager.pids:
# check if this is an exit
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
uuid = self.containerpids[pid]
self.container_manager.delete(uuid)
container = self.container_manager.pids[pid]
self.container_manager.delete(container.uuid)
msg = {'type': 'container',
'event': 'exit',
'status': status,
'uuid': uuid,
'uuid': container.uuid,
}
self.upstream_pub.send_json(msg)
else:
......
"""Various clients for system utilities."""
import subprocess
import collections
import logging
import xml.etree.ElementTree
import tornado.process as process
import subprocess
logger = logging.getLogger('nrm')
resources = collections.namedtuple("Resources", ["cpus", "mems"])
......@@ -103,6 +104,22 @@ class NodeOSClient(object):
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def execute(self, name, argv, environ):
"""Execute argv inside container."""
args = [self.prefix]
cmd = '--exec='
cmd += 'name:{0}'.format(name)
# argo_nodeos_config takes argv as 'arg0 arg1 ...' so we need to merge
# the arguments into a single list, with single quotes. We also need
# to escape spaces from arguments before.
argv = [s.replace(' ', r'\ ') for s in argv]
cmd += " argv:'"+" ".join(argv)+"'"
args.append(cmd)
return process.Subprocess(args, stdin=process.Subprocess.STREAM,
stdout=process.Subprocess.STREAM,
stderr=process.Subprocess.STREAM,
env=environ)
class ChrtClient(object):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment