Commit 92290b22 authored by Swann Perarnau's avatar Swann Perarnau

[feature] Pull the Argus code into the NRM

The Argus (globalos) launcher had prototype code to read a container
manifest, create a container using Judi's code, and map resources using

This patch brings that code, almost intact, into the NRM repo. This code
is quite ugly, and the resource mapping crashes if the kernel
configuration isn't right. But it's still a good starting point, and we
should be able to improve things little by little.

One part in particular needs attention: SIGCHLD handling. We should
think of using ioloop-provided facilities to avoid this mess.

The patch also contains the associated CLI changes.

Note: the messaging format is starting to be difficult to keep in check,
as there's conversions and field checks all over the code. See #3 for
a possible solution.
parent 5f6f9415
......@@ -15,8 +15,9 @@ class CommandLineInterface(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
def do_signal(self):
def do_signal(self, signum, stackframe):"received signal %d, exiting", signum)
def setup(self):
# SUB port to the upstream API (connected to its PUB port)
......@@ -74,7 +75,8 @@ class CommandLineInterface(object):
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['uuid'] == containerid:
pass"container response: %r", msg)
def do_setpower(self, argv):
""" Connect to the NRM and ask to change the power limit.
"acKind": "ImageManifest",
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
"name": "argo/scheduler",
"value": {
"policy": "SCHED_OTHER",
"priority": "0"
"name": "argo/container",
"value": {
"cpus": "4",
"mems": "1"
"""Parse and Represent the APPC ACI specification."""
import collections
import logging
import json
logger = logging.getLogger('argus')
spec = collections.namedtuple('Field', ['cls', 'required'])
class SpecField(object):
"""Object part of the ACI Image Manifest fields."""
fields = {}
def __init__(self):
"""Create empty field."""
def load(self, data):
"""Load fields."""
for key in self.fields:
spec = self.fields[key]
if key not in data:
if spec.required:
logger.error("Missing key from manifest: %s", key)
return False
ok, v = self.loadfield(data[key], spec.cls)
if not ok:
logger.error("Error for key %s in %s", key, self.__class__)
return False
setattr(self, key, v)
return True
def loadfield(self, data, cls):
"""load data as if from a field of the provided cls.
Make sure the basic types are also respected.
ret = cls()
if not hasattr(ret, 'load'):
if not isinstance(data, cls):
logger.error("Wrong data type %s, expected: %s", cls,
return (False, None)
return (True, data)
return (ret.load(data), ret)
class Scheduler(SpecField):
"""Scheduler information for a container."""
fields = {"policy": spec(unicode, True),
"priority": spec(unicode, False)
def __init__(self):
"""Create scheduler object."""
def load(self, data):
"""Load configuration from json text."""
ret = super(Scheduler, self).load(data)
if not ret:
return ret
# check scheduler class & prio
if self.policy not in self.classes:
logger.error("Wrong scheduling class %s, not any of %r", data,
return False
if self.policy != "SCHED_OTHER":
logger.warning("scheduler priority forced as 0 " +
"for non default policies")
self.priority = "0"
return True
class CPUSet(SpecField):
"""Represent a CPUSet field."""
def __init__(self):
"""Create an empty set."""
def load(self, data):
"""Load from json object."""
self.value = data
return True
class MemSet(SpecField):
"""Represent a MemSet field."""
def __init__(self):
"""Create an empty set."""
def load(self, data):
"""Load from json object."""
self.value = data
return True
class Container(SpecField):
"""Container Information."""
fields = {"cpus": spec(CPUSet, True),
"mems": spec(MemSet, True)
def __init__(self):
"""Create empty container."""
def load(self, data):
"""Load container information."""
return super(Container, self).load(data)
class IsolatorList(SpecField):
"""Represent the list of isolator in a Manifest."""
types = {"argo/scheduler": spec(Scheduler, False),
"argo/container": spec(Container, True)
def __init__(self):
"""Create empty list."""
def load(self, data):
"""Load from json struct."""
for e in data:
name = e['name']
if name in self.types:
t = self.types[name]
ok, v = super(IsolatorList, self).loadfield(e['value'], t.cls)
if not ok:
logger.error("Error with %s in %s", name, self.__class__)
return False
setattr(self, name.lstrip("argo/"), v)
for k in self.types:
if self.types[k].required:
assert name.lstrip("argo/") in self.__dict__
return True
class App(SpecField):
"""Represent the App part of an Image Manifest."""
# attribute, subclass, required
fields = {"environment": spec(list, False),
"isolators": spec(IsolatorList, True),
def __init__(self):
"""Create empty container."""
def load(self, data):
"""Load from json dict."""
return super(App, self).load(data)
class ImageManifest(SpecField):
"""Represent an ACI Image Manifest."""
fields = {"acKind": spec(unicode, True),
"acVersion": spec(unicode, True),
"name": spec(unicode, True),
"app": spec(App, True)
def __init__(self):
"""Create empty manifest."""
def load(self, filename):
"""Load a manifest from JSON file."""
with open(filename, 'r') as f:
data = json.load(f)
return super(ImageManifest, self).load(data)
from __future__ import print_function
from aci import ImageManifest
import logging
import os
from subprograms import ChrtClient, NodeOSClient, HwlocClient
import sys
class ContainerManager(object):
"""Manages the creation, listing and deletion of containers, using a
container runtime underneath."""
def __init__(self):
self.containers = dict()
self.logger = logging.getLogger(__name__)
def create(self, request):
"""Create a container according to the request.
Returns the pid of the container or a negative number for errors."""
manifestfile = request['manifest']
command = request['file']
args = request['args']"run: manifest file: %s", manifestfile)"run: command: %s", command)"run: args: %r", args)
manifest = ImageManifest()
if not manifest.load(manifestfile):
self.logger.error("Manifest is invalid")
return -1
chrt = ChrtClient()
nodeos = NodeOSClient()
hwloc = HwlocClient()
# build context to execute
environ = os.environ
environ['PATH'] = ("/usr/local/sbin:"
environ['AC_APP_NAME'] =
environ['AC_METADATA_URL'] = "localhost"
environ['container'] = 'argo'"run: environ: %r", environ)
# Resource Mapping: the container spec gives us the number of cpus
# wanted by container.. We compute the number of times we can allocate
# that inside the system, assuming the container spec to be a valid
# request. We then use hwloc-distrib to map exclusive sets of cpus for
# each, and find one set that isn't in use yet.
# This is not the right way to do it, but it will work for now.
numcpus = int(
allresources =
self.logger.debug("resource info: %r", allresources)
ncontainers = len(allresources.cpus) // numcpus
self.logger.debug("will support %s containers", ncontainers)
cur = nodeos.getavailable()
self.logger.debug("%r are available", cur)
sets = hwloc.distrib(ncontainers, restrict=cur, fake=allresources)"asking for %s cores", numcpus)
self.logger.debug("will search in one of these: %r", sets)
# find a free set
avail = set(cur.cpus)
for s in sets:
cpuset = set(s.cpus)
if cpuset.issubset(avail):
alloc = s
self.logger.error("no exclusive cpuset found among %r", avail)
return -2
# create container
container_name = request['uuid']"creating container %s", container_name)
nodeos.create(container_name, alloc)"created container %s", container_name)
newpid = os.fork()"forked: new pid: %s", newpid)
if newpid == 0:
# move myself to that container
mypid = os.getpid()
nodeos.attach(container_name, mypid)"child: attached to container %s", container_name)
# run my command
if hasattr(, 'scheduler'):
chrt = ChrtClient(self.config)
args = chrt.getwrappedcmd(
args = []
self.logger.debug("execvpe %r", args)
os.execvpe(args[0], args, environ)
# should never happen
return newpid
def delete(self, uuid):
"""Delete a container and kill all related processes."""
nodeos = NodeOSClient()
nodeos.delete(uuid, kill=True)
from __future__ import print_function
import containers
import json
import logging
import os
import re
import sensor
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
import sensor
application_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'},
......@@ -113,6 +116,22 @@ class Daemon(object):"new target measure: %g",
elif command == 'run':"new container required: %r", msg)
pid = self.container_manager.create(msg)
if pid > 0:
self.children[pid] = msg['uuid']
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'uuid': msg['uuid'],
'errno': 0,
'pid': pid,
update = {'type': 'container',
'uuid': msg['uuid'],
'errno': pid,
self.logger.error("invalid command: %r", command)
......@@ -144,7 +163,37 @@ class Daemon(object):"application now in state: %s", application.state)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
elif signum == signal.SIGCHLD:
self.logger.error("wrong signal: %d", signum)
def do_children(self):
# find out if children have terminated
while True:
ret = os.wait3(os.WNOHANG)
if ret == (0, 0):
except OSError:
pid, status = ret"child update: %d, %r", pid, status)
# check if this is an exit
if os.WIFEXITED(status):
# TODO: update container tracking
msg = {'type': 'container',
'event': 'exit',
'status': status,
'uuid': None,
# ignore on purpose
def do_shutdown(self):
......@@ -191,6 +240,9 @@ class Daemon(object):
# create a stream to let ioloop deal with blocking calls on HWM
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
# create container manager
self.container_manager = containers.ContainerManager()
# create sensor manager and make first measurement
self.sensor = sensor.SensorManager()
......@@ -205,6 +257,7 @@ class Daemon(object):
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
signal.signal(signal.SIGCHLD, self.do_signal)
"""Various clients for system utilities."""
import subprocess
import collections
import logging
resources = collections.namedtuple("Resources", ["cpus", "mems"])
def logpopen(p, args, stdout, stderr):
"""log popen cmd."""
logging.debug("popen cmd: %r", args)
logging.debug("popen return code: %s", p.returncode)
logging.debug("popen stdout: %r", stdout)
logging.debug("popen, stderr: %r", stderr)
def bitmask2list(mask):
"""Convert a bitmask to the list of power of 2 set to 1."""
i = int(mask or '0x0', base=16)
ret = []
for j in range(i.bit_length()):
m = 1 << j
if (i & m):
return ret
def list2bitmask(l):
"""Convert a list into a bitmask."""
m = 0
for e in l:
m |= 1 << e
return hex(m)
class NodeOSClient(object):
"""Client to argo_nodeos_config."""
def __init__(self):
"""Load client configuration."""
self.prefix = "argo_nodeos_config"
def getavailable(self):
"""Gather available resources."""
args = [self.prefix, "--show_available_resources=shared:false"]
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
# parse the format: first line is threads, then a list as multiline,
# then nodes, and the same
cpus = []
mems = []
lines = stdout.splitlines()
splitindex = lines.index('------------Memory nodes------------')
cpuslines = lines[1:splitindex]
memlines = lines[splitindex+1:]
for l in cpuslines:
for l in memlines:
return resources([int(x) for x in cpus], [int(x) for x in mems])
def create(self, name, params):
"""Create container, according to params."""
args = [self.prefix]
cmd = "--create_container="
cmd += 'name:{0}'.format(name)
cmd += ' cpus:[{0}]'.format(",".join([str(x) for x in params.cpus]))
cmd += ' mems:[{0}]'.format(",".join([str(x) for x in params.mems]))
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def attach(self, name, pid):
"""Attach a pid to a container."""
args = [self.prefix]
cmd = '--attach_to_container='
cmd += 'name:{0}'.format(name)
cmd += ' pids:[{0}]'.format(pid)
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def delete(self, name, kill=False):
"""Destroy container."""
# destroy container
args = [self.prefix]
cmd = '--delete_container='
cmd += 'name:{0}'.format(name)
if kill:
cmd += ' kill_content:true'
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
class ChrtClient(object):
"""Client to chrt command line wrapper."""
flags = {'SCHED_OTHER': '--other',
'SCHED_BATCH': '--batch',
'SCHED_FIFO': '--fifo',
'SCHED_IDLE': '--idle',
'SCHED_RR': '--rr',
'SCHED_HPC': '--hpc'
def __init__(self):
"""Load configuration."""
self.prefix = "chrt"
def getwrappedcmd(self, params):
"""Return a list of args to prepend to a popen call."""
args = [self.prefix]
return args
class HwlocClient(object):
"""Client to hwloc binaries."""
def __init__(self):
"""Load configuration."""
self.prefix = "hwloc"
def info(self):
"""Return list of all cpus and mems."""
cmd = self.prefix + "-info"
args = [cmd, '--whole-system']
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
lines = stdout.splitlines()
ret = resources([], [])
for l in lines:
numa = l.find('NUMANode')
pu = l.find('PU')
if numa != -1:
fields = l[:numa].split()
if pu != -1:
fields = l[:pu].split()
return ret
def all2fake(self, resources):
"""Convert resource description of the system into fake topology.
We need that because hwloc barfs on fake numa nodes.
# easy version: we have as many numa nodes as we have cores
mems = len(resources.mems)
cpus = len(resources.mems)
assert cpus % mems == 0
pu = cpus // mems
return "numa: %s pu:%s".format(mems, pu)
def distrib(self, numprocs, restrict=None, fake=None):
"""Distribute numprocs across the hierarchy."""
# The original command only reports back cpusets. We do better, by
# reporting the mems that go with it. This requires some magic, using
# hwloc-ls to find the numa node associated with a cpuset reported by
# distrib
allresources =
cmd = [self.prefix + "-distrib"]
if fake:
cmd.extend(['-i', self.all2fake(fake)])
args = cmd + ["--whole-system", "--taskset", str(numprocs)]
if restrict:
mask = list2bitmask(restrict.cpus)
args.extend(['--restrict', mask])
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
cpusets = stdout.splitlines()
dret = {}
for c in cpusets:
dret[c] = resources(bitmask2list(c), [])
# list all resources, and display cpusets too
# this will give us the memories associated with each cpuset.
cmd = [self.prefix + "-ls"]
if fake:
cmd.extend(['-i', self.all2fake(fake)])
args = cmd + ["--whole-system", "-c", "--taskset"]
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
lines = stdout.splitlines()
curmem = allresources.mems
for l in lines:
pos = l.find('cpuset=')
if pos != -1:
c = l[l.find('cpuset='):].lstrip('cpuset=')
numa = l.find('NUMANode')
cset = set(bitmask2list(c))
if numa != -1:
uid = int(l.split()[1].lstrip('L#'))
curmem = [uid]
for mask in dret:
cs = set(bitmask2list(mask))
if cset.issubset(cs):
# At this point, we have valid cpusets, but the mems associated are not
# restricted, and not necessarily the right amount. We need to:
# - remove memories for the restricted set
# - split each (cpuset, mems) that is too big into a list of memset
# choices of the right size
return dret.values()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment