Commit 848a9755 authored by Swann Perarnau
Browse files

Merge branch 'containers-launch' into 'master'

Container launching implementation

See merge request !3
parents e9848601 1c4645cc
......@@ -15,8 +15,9 @@ class CommandLineInterface(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
def do_signal(self):
def do_signal(self, signum, stackframe):"received signal %d, exiting", signum)
def setup(self):
# SUB port to the upstream API (connected to its PUB port)
......@@ -50,7 +51,32 @@ class CommandLineInterface(object):"client uuid: %r", self.uuid)
def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command
in it.
The NRM should notify us on the pub socket of the container
# build the command as a JSON dict containing enough info. We add to
# the command a container uuid as a way to make sure that we can make
# the command idempotent.
containerid = str(uuid.uuid4())
command = {'command': 'run',
'manifest': argv.manifest,
'file': argv.command,
'args': argv.args,
'uuid': containerid,
msg = self.upstream_sub_socket.recv_json()"new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['uuid'] == containerid:"container response: %r", msg)
def do_setpower(self, argv):
""" Connect to the NRM and ask to change the power limit.
......@@ -85,7 +111,10 @@ class CommandLineInterface(object):
# run container
parser_run = subparsers.add_parser("run")
parser_run.add_argument("manifest", help="manifest file to apply")
parser_run.add_argument("command", help="command to execute")
parser_run.add_argument("args", help="command arguments",
# setpowerlimit
"acKind": "ImageManifest",
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
"name": "argo/scheduler",
"value": {
"policy": "SCHED_OTHER",
"priority": "0"
"name": "argo/container",
"value": {
"cpus": "4",
"mems": "1"
"""Parse and Represent the APPC ACI specification."""
import collections
import logging
import json
logger = logging.getLogger('argus')
spec = collections.namedtuple('Field', ['cls', 'required'])
class SpecField(object):
    """Base class for ACI Image Manifest fields.

    Subclasses declare a `fields` dict mapping a manifest key to a
    `spec(cls, required)` tuple; `load` walks that declaration and
    attaches each parsed value as an attribute on the instance.
    """

    # manifest key -> spec(cls, required); empty in the base class
    fields = {}

    def __init__(self):
        """Create empty field."""

    def load(self, data):
        """Load declared fields from *data* (a dict parsed from json).

        Returns True on success, False (after logging) on a missing
        required key or a malformed value.
        """
        for key in self.fields:
            spec = self.fields[key]
            if key not in data:
                if spec.required:
                    logger.error("Missing key from manifest: %s", key)
                    return False
                # optional key absent: nothing to load for it
                continue
            ok, v = self.loadfield(data[key], spec.cls)
            if not ok:
                logger.error("Error for key %s in %s", key, self.__class__)
                return False
            setattr(self, key, v)
        return True

    def loadfield(self, data, cls):
        """Load *data* as if from a field of the provided *cls*.

        Returns an (ok, value) tuple.  A cls without a `load` method is
        treated as a plain type and only validated with isinstance.
        """
        ret = cls()
        if not hasattr(ret, 'load'):
            if not isinstance(data, cls):
                logger.error("Wrong data type %s, expected: %s", cls,
                             type(data))
                return (False, None)
            return (True, data)
        return (ret.load(data), ret)
class Scheduler(SpecField):
    """Scheduler information for a container."""

    # Accepted scheduling policies.
    # NOTE(review): reconstructed — the original `classes` attribute is
    # missing from this view; confirm the exact list against the chrt
    # wrapper (ChrtClient) this value is fed to.
    classes = ["SCHED_OTHER", "SCHED_FIFO", "SCHED_RR"]

    # manifest keys: policy is mandatory, priority optional
    # (unicode: this file targets Python 2)
    fields = {"policy": spec(unicode, True),
              "priority": spec(unicode, False),
              }

    def __init__(self):
        """Create scheduler object."""

    def load(self, data):
        """Load configuration from json text.

        Returns True on success; False (after logging) when the policy
        is not one of the known scheduling classes.
        """
        ret = super(Scheduler, self).load(data)
        if not ret:
            return ret
        # check scheduler class & prio
        if self.policy not in self.classes:
            logger.error("Wrong scheduling class %s, not any of %r", data,
                         self.classes)
            return False
        # only the default policy honors a priority; force it otherwise.
        # NOTE(review): when policy == SCHED_OTHER and the optional
        # "priority" key is absent, self.priority stays unset — verify
        # downstream users (e.g. chrt wrapping) tolerate that.
        if self.policy != "SCHED_OTHER":
            logger.warning("scheduler priority forced as 0 " +
                           "for non default policies")
            self.priority = "0"
        return True
class CPUSet(SpecField):
    """ACI isolator field holding a cpu set."""

    def __init__(self):
        """Create an empty set."""

    def load(self, data):
        """Store the raw json value as-is; always succeeds."""
        self.value = data
        return True
class MemSet(SpecField):
    """ACI isolator field holding a memory-node set."""

    def __init__(self):
        """Create an empty set."""

    def load(self, data):
        """Store the raw json value as-is; always succeeds."""
        self.value = data
        return True
class Container(SpecField):
    """Container Information."""

    # both resource sets are mandatory in a container isolator
    # (the scrape dropped the closing brace of this dict; restored)
    fields = {"cpus": spec(CPUSet, True),
              "mems": spec(MemSet, True),
              }

    def __init__(self):
        """Create empty container."""

    def load(self, data):
        """Load container information via the declarative base loader."""
        return super(Container, self).load(data)
class IsolatorList(SpecField):
    """Represent the list of isolators in a Manifest."""

    # known isolator names -> spec(cls, required)
    types = {"argo/scheduler": spec(Scheduler, False),
             "argo/container": spec(Container, True),
             }

    def __init__(self):
        """Create empty list."""

    def load(self, data):
        """Load from a json struct (a list of {name, value} dicts).

        Unknown isolator names are silently ignored; known ones are
        parsed and attached as attributes named after the part of the
        name following "argo/".  Returns False (after logging) on a
        parse error or a missing required isolator.
        """
        for e in data:
            name = e['name']
            if name in self.types:
                t = self.types[name]
                ok, v = super(IsolatorList, self).loadfield(e['value'], t.cls)
                if not ok:
                    logger.error("Error with %s in %s", name, self.__class__)
                    return False
                # "argo/scheduler" -> attribute "scheduler".  The original
                # used name.lstrip("argo/"), which strips a *character set*
                # rather than a prefix and only worked by accident here.
                setattr(self, name[len("argo/"):], v)
        # every required isolator must have been seen; the original
        # asserted on the loop-leaked `name` instead of each key, and
        # asserts vanish under python -O, so report properly instead.
        for k in self.types:
            if self.types[k].required:
                if k[len("argo/"):] not in self.__dict__:
                    logger.error("Error with %s in %s", k, self.__class__)
                    return False
        return True
class App(SpecField):
    """Represent the App part of an Image Manifest."""

    # attribute, subclass, required
    # NOTE(review): the trailing comma in the scraped source suggests
    # additional fields may have been lost here — verify upstream.
    fields = {"environment": spec(list, False),
              "isolators": spec(IsolatorList, True),
              }

    def __init__(self):
        """Create empty app."""

    def load(self, data):
        """Load from json dict via the declarative base loader."""
        return super(App, self).load(data)
class ImageManifest(SpecField):
    """Represent an ACI Image Manifest."""

    # top-level manifest keys (unicode: this file targets Python 2)
    # (the scrape dropped the closing brace of this dict; restored)
    fields = {"acKind": spec(unicode, True),
              "acVersion": spec(unicode, True),
              "name": spec(unicode, True),
              "app": spec(App, True),
              }

    def __init__(self):
        """Create empty manifest."""

    def load(self, filename):
        """Load a manifest from a JSON file.

        Unlike the other SpecField subclasses, this entry point takes a
        *filename*, parses the file and dispatches to SpecField.load.
        """
        with open(filename, 'r') as f:
            data = json.load(f)
        return super(ImageManifest, self).load(data)
from __future__ import print_function
from aci import ImageManifest
from collections import namedtuple
import logging
import os
from subprograms import ChrtClient, NodeOSClient, resources
import sys
Container = namedtuple('Container', ['uuid', 'manifest', 'pid'])
class ContainerManager(object):
"""Manages the creation, listing and deletion of containers, using a
container runtime underneath."""
def __init__(self, rm):
self.containers = dict()
self.pids = dict()
self.logger = logging.getLogger(__name__)
self.resourcemanager = rm
self.nodeos = NodeOSClient()
self.chrt = ChrtClient()
def create(self, request):
"""Create a container according to the request.
Returns the pid of the container or a negative number for errors."""
manifestfile = request['manifest']
command = request['file']
args = request['args']"run: manifest file: %s", manifestfile)"run: command: %s", command)"run: args: %r", args)
manifest = ImageManifest()
if not manifest.load(manifestfile):
self.logger.error("Manifest is invalid")
return -1
# ask the resource manager for resources
req = resources(int(,
allocation = self.resourcemanager.schedule(request['uuid'], req)"run: allocation: %r", allocation)
# build context to execute
environ = os.environ
environ['PATH'] = ("/usr/local/sbin:"
environ['AC_APP_NAME'] =
environ['AC_METADATA_URL'] = "localhost"
environ['container'] = 'argo'"run: environ: %r", environ)
# create container
container_name = request['uuid']"creating container %s", container_name)
self.nodeos.create(container_name, allocation)"created container %s", container_name)
newpid = os.fork()"forked: new pid: %s", newpid)
if newpid == 0:
# move myself to that container
mypid = os.getpid()
self.nodeos.attach(container_name, mypid)"child: attached to container %s", container_name)
# run my command
if hasattr(, 'scheduler'):
sched =
argv = self.chrt.getwrappedcmd(sched)
argv = []
self.logger.debug("execvpe %r", argv)
os.execvpe(argv[0], argv, environ)
# should never happen
c = Container(container_name, manifest, newpid)
self.pids[newpid] = c
self.containers[container_name] = c
return newpid
def delete(self, uuid):
"""Delete a container and kill all related processes."""
self.nodeos.delete(uuid, kill=True)
c = self.containers[uuid]
del self.containers[uuid]
del self.pids[]
from __future__ import print_function
from containers import ContainerManager
from resources import ResourceManager
import json
import logging
import os
import re
import sensor
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
import sensor
application_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'},
......@@ -69,6 +73,7 @@ class Application(object):
class Daemon(object):
def __init__(self):
self.applications = {}
self.containerpids = {}
self.buf = ''
self.logger = logging.getLogger(__name__) = 1.0
......@@ -99,12 +104,38 @@ class Daemon(object):
def do_upstream_receive(self, parts):"receiving upstream message: %r", parts)
if len(parts) != 1:
self.logger.error("unexpected msg length, droping it: %r", parts)
self.logger.error("unexpected msg length, dropping it: %r", parts)
msg = json.loads(parts[0])
if isinstance(msg, dict) and msg.get('command') == 'setpower': = float(msg['limit'])"new target measure: %g",
if isinstance(msg, dict):
command = msg.get('command')
# TODO: switch to a dispatch dictionary
if command is None:
self.logger.error("missing command in message: %r", msg)
if command == 'setpower': = float(msg['limit'])"new target measure: %g",
elif command == 'run':"new container required: %r", msg)
pid = self.container_manager.create(msg)
if pid > 0:
self.containerpids[pid] = msg['uuid']
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'uuid': msg['uuid'],
'errno': 0,
'pid': pid,
update = {'type': 'container',
'uuid': msg['uuid'],
'errno': pid,
self.logger.error("invalid command: %r", command)
def do_sensor(self):
self.machine_info = self.sensor.do_update()
......@@ -134,7 +165,39 @@ class Daemon(object):"application now in state: %s", application.state)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
elif signum == signal.SIGCHLD:
self.logger.error("wrong signal: %d", signum)
def do_children(self):
# find out if children have terminated
while True:
pid, status, rusage = os.wait3(os.WNOHANG)
if pid == 0 and status == 0:
except OSError:
break"child update %d: %r", pid, status)
# check if its a pid we care about
if pid in self.containerpids:
# check if this is an exit
if os.WIFEXITED(status):
uuid = self.containerpids[pid]
msg = {'type': 'container',
'event': 'exit',
'status': status,
'uuid': None,
self.logger.debug("child update ignored")
def do_shutdown(self):
......@@ -181,6 +244,10 @@ class Daemon(object):
# create a stream to let ioloop deal with blocking calls on HWM
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
# create resource and container manager
self.resource_manager = ResourceManager()
self.container_manager = ContainerManager(self.resource_manager)
# create sensor manager and make first measurement
self.sensor = sensor.SensorManager()
......@@ -195,6 +262,7 @@ class Daemon(object):
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
signal.signal(signal.SIGCHLD, self.do_signal)
from __future__ import print_function
import logging
from subprograms import HwlocClient, resources
class ResourceManager(object):
"""Manages the query of node resources, the tracking of their use and
the scheduling of new containers according to partitioning rules."""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.hwloc = HwlocClient()
# query the node topo, keep track of the critical resources
self.allresources =
self.logger.debug("resource info: %r", self.allresources)
self.available = self.allresources
def schedule(self, uuid, request):
"""Schedule a resource request on the available resources.
Request is a dictionary of the resources asked for."""
# dumb scheduling, just give the first resources available:
# - cpus are exclusive
# - memories exclusive if more than one left
if len(self.available.cpus) >= request.cpus:
retcpus = self.available.cpus[:request.cpus]
availcpus = self.available.cpus[request.cpus:]
retcpus = []
availcpus = self.available.cpus
if len(self.available.mems) > 1:
retmems = self.available.mems[:request.mems]
availmems = self.available.mems[request.mems:]
retmems = self.available.mems
availmems = self.available.mems
self.available = resources(availcpus, availmems)
return resources(retcpus, retmems)
def remove(self, uuid):
"""Free the resources associated with request uuid."""
# def oldcode(self):
# numcpus = int(
# allresources =
# self.logger.debug("resource info: %r", allresources)
# ncontainers = len(allresources.cpus) // numcpus
# self.logger.debug("will support %s containers", ncontainers)
# cur = nodeos.getavailable()
# self.logger.debug("%r are available", cur)
# sets = hwloc.distrib(ncontainers, restrict=cur, fake=allresources)
#"asking for %s cores", numcpus)
# self.logger.debug("will search in one of these: %r", sets)
# # find a free set
# avail = set(cur.cpus)
# for s in sets:
# cpuset = set(s.cpus)
# if cpuset.issubset(avail):
# alloc = s
# break
# else:
# self.logger.error("no exclusive cpuset found among %r", avail)
# return -2
"""Various clients for system utilities."""
import subprocess
import collections
import logging
import xml.etree.ElementTree
resources = collections.namedtuple("Resources", ["cpus", "mems"])
def logpopen(p, args, stdout, stderr):
    """log popen cmd."""
    # emit one debug line per interesting piece of the finished command
    details = (("popen cmd: %r", args),
               ("popen return code: %s", p.returncode),
               ("popen stdout: %r", stdout),
               ("popen, stderr: %r", stderr))
    for fmt, value in details:
        logging.debug(fmt, value)
def bitmask2list(mask):
    """Convert a hex bitmask string to the sorted list of set bit indices.

    Accepts None or '' as an empty mask.  Inverse of list2bitmask, which
    shifts by list elements — so indices, not powers of two, are returned
    (the missing append line was reconstructed on that basis).
    """
    i = int(mask or '0x0', base=16)
    ret = []
    for j in range(i.bit_length()):
        m = 1 << j
        if i & m:
            ret.append(j)
    return ret
def list2bitmask(l):
    """Convert a list into a bitmask."""
    # OR together one bit per listed index; duplicates are harmless
    mask = 0
    for bit in l:
        mask = mask | (1 << bit)
    return hex(mask)
class NodeOSClient(object):
"""Client to argo_nodeos_config."""
    def __init__(self):
        """Load client configuration."""
        # name of the argo_nodeos_config binary; every command issued by
        # this client is built by prepending it to the argument list
        self.prefix = "argo_nodeos_config"
def getavailable(self):
"""Gather available resources."""
args = [self.prefix, "--show_available_resources=shared:false"]
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
# parse the format: first line is threads, then a list as multiline,
# then nodes, and the same
cpus = []
mems = []
lines = stdout.splitlines()
splitindex = lines.index('------------Memory nodes------------')