Commit 1c4645cc authored by Swann Perarnau's avatar Swann Perarnau

[feature] Implement simple RM for containers

This patch refactors the resource management and hwloc code into a
working, albeit very simple, scheduling policy. Indeed, the previous code
contained strong assumptions about the output of hwloc matching an Argo
NodeOS configuration used during the previous phase of the project, that
always contained enough CPUs and Mems to perform exclusive scheduling.

The current version is simpler, but should work on more regular systems.
The patch also improves code organization so that introducing more
complex scheduling algorithms will be simpler.

Testing this code revealed simple bugs in the daemon's
child-handling code, which should work now.
parent 92290b22
from __future__ import print_function from __future__ import print_function
from aci import ImageManifest from aci import ImageManifest
from collections import namedtuple
import logging import logging
import os import os
from subprograms import ChrtClient, NodeOSClient, HwlocClient from subprograms import ChrtClient, NodeOSClient, resources
import sys import sys
Container = namedtuple('Container', ['uuid', 'manifest', 'pid'])
class ContainerManager(object): class ContainerManager(object):
"""Manages the creation, listing and deletion of containers, using a """Manages the creation, listing and deletion of containers, using a
container runtime underneath.""" container runtime underneath."""
def __init__(self, rm):
    """Initialize the container manager.

    rm: resource manager used to allocate cpus/mems for new containers
        (queried via rm.schedule() at container creation time).
    """
    # containers indexed by uuid, plus a second index by child pid so
    # the daemon can map a SIGCHLD back to its container
    self.containers = dict()
    self.pids = dict()
    self.logger = logging.getLogger(__name__)
    self.resourcemanager = rm
    # long-lived subprogram clients, reused across create/delete calls
    self.nodeos = NodeOSClient()
    self.chrt = ChrtClient()
def create(self, request): def create(self, request):
"""Create a container according to the request. """Create a container according to the request.
...@@ -31,9 +38,11 @@ class ContainerManager(object): ...@@ -31,9 +38,11 @@ class ContainerManager(object):
self.logger.error("Manifest is invalid") self.logger.error("Manifest is invalid")
return -1 return -1
chrt = ChrtClient() # ask the resource manager for resources
nodeos = NodeOSClient() req = resources(int(manifest.app.isolators.container.cpus.value),
hwloc = HwlocClient() int(manifest.app.isolators.container.mems.value))
allocation = self.resourcemanager.schedule(request['uuid'], req)
self.logger.info("run: allocation: %r", allocation)
# build context to execute # build context to execute
environ = os.environ environ = os.environ
...@@ -44,38 +53,10 @@ class ContainerManager(object): ...@@ -44,38 +53,10 @@ class ContainerManager(object):
environ['container'] = 'argo' environ['container'] = 'argo'
self.logger.info("run: environ: %r", environ) self.logger.info("run: environ: %r", environ)
# Resource Mapping: the container spec gives us the number of cpus
# wanted by container.. We compute the number of times we can allocate
# that inside the system, assuming the container spec to be a valid
# request. We then use hwloc-distrib to map exclusive sets of cpus for
# each, and find one set that isn't in use yet.
# This is not the right way to do it, but it will work for now.
numcpus = int(manifest.app.isolators.container.cpus.value)
allresources = hwloc.info()
self.logger.debug("resource info: %r", allresources)
ncontainers = len(allresources.cpus) // numcpus
self.logger.debug("will support %s containers", ncontainers)
cur = nodeos.getavailable()
self.logger.debug("%r are available", cur)
sets = hwloc.distrib(ncontainers, restrict=cur, fake=allresources)
self.logger.info("asking for %s cores", numcpus)
self.logger.debug("will search in one of these: %r", sets)
# find a free set
avail = set(cur.cpus)
for s in sets:
cpuset = set(s.cpus)
if cpuset.issubset(avail):
alloc = s
break
else:
self.logger.error("no exclusive cpuset found among %r", avail)
return -2
# create container # create container
container_name = request['uuid'] container_name = request['uuid']
self.logger.info("creating container %s", container_name) self.logger.info("creating container %s", container_name)
nodeos.create(container_name, alloc) self.nodeos.create(container_name, allocation)
self.logger.info("created container %s", container_name) self.logger.info("created container %s", container_name)
newpid = os.fork() newpid = os.fork()
...@@ -83,26 +64,31 @@ class ContainerManager(object): ...@@ -83,26 +64,31 @@ class ContainerManager(object):
if newpid == 0: if newpid == 0:
# move myself to that container # move myself to that container
mypid = os.getpid() mypid = os.getpid()
nodeos.attach(container_name, mypid) self.nodeos.attach(container_name, mypid)
self.logger.info("child: attached to container %s", container_name) self.logger.info("child: attached to container %s", container_name)
# run my command # run my command
if hasattr(manifest.app.isolators, 'scheduler'): if hasattr(manifest.app.isolators, 'scheduler'):
chrt = ChrtClient(self.config) sched = manifest.app.isolators.scheduler
args = chrt.getwrappedcmd(manifest.app.isolators.scheduler) argv = self.chrt.getwrappedcmd(sched)
else: else:
args = [] argv = []
args.append(command) argv.append(command)
args.extend(args) argv.extend(args)
self.logger.debug("execvpe %r", args) self.logger.debug("execvpe %r", argv)
os.execvpe(args[0], args, environ) os.execvpe(argv[0], argv, environ)
# should never happen # should never happen
sys.exit(1) sys.exit(1)
else: else:
c = Container(container_name, manifest, newpid)
self.pids[newpid] = c
self.containers[container_name] = c
return newpid return newpid
def delete(self, uuid):
    """Delete a container and kill all related processes.

    uuid: identifier of the container to destroy; raises KeyError if the
    container is not tracked by this manager.
    """
    # ask the runtime to tear the container down, killing anything inside
    self.nodeos.delete(uuid, kill=True)
    # drop both bookkeeping entries (by uuid and by child pid)
    c = self.containers.pop(uuid)
    del self.pids[c.pid]
from __future__ import print_function from __future__ import print_function
import containers from containers import ContainerManager
from resources import ResourceManager
import json import json
import logging import logging
import os import os
...@@ -72,6 +73,7 @@ class Application(object): ...@@ -72,6 +73,7 @@ class Application(object):
class Daemon(object): class Daemon(object):
def __init__(self): def __init__(self):
self.applications = {} self.applications = {}
self.containerpids = {}
self.buf = '' self.buf = ''
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.target = 1.0 self.target = 1.0
...@@ -118,7 +120,7 @@ class Daemon(object): ...@@ -118,7 +120,7 @@ class Daemon(object):
self.logger.info("new container required: %r", msg) self.logger.info("new container required: %r", msg)
pid = self.container_manager.create(msg) pid = self.container_manager.create(msg)
if pid > 0: if pid > 0:
self.children[pid] = msg['uuid'] self.containerpids[pid] = msg['uuid']
# TODO: obviously we need to send more info than that # TODO: obviously we need to send more info than that
update = {'type': 'container', update = {'type': 'container',
'uuid': msg['uuid'], 'uuid': msg['uuid'],
...@@ -174,25 +176,27 @@ class Daemon(object): ...@@ -174,25 +176,27 @@ class Daemon(object):
# find out if children have terminated # find out if children have terminated
while True: while True:
try: try:
ret = os.wait3(os.WNOHANG) pid, status, rusage = os.wait3(os.WNOHANG)
if ret == (0, 0): if pid == 0 and status == 0:
break break
except OSError: except OSError:
break break
pid, status = ret self.logger.info("child update %d: %r", pid, status)
self.logger.info("child update: %d, %r", pid, status) # check if its a pid we care about
# check if this is an exit if pid in self.containerpids:
if os.WIFEXITED(status): # check if this is an exit
# TODO: update container tracking if os.WIFEXITED(status):
msg = {'type': 'container', uuid = self.containerpids[pid]
'event': 'exit', self.container_manager.delete(uuid)
'status': status, msg = {'type': 'container',
'uuid': None, 'event': 'exit',
} 'status': status,
self.upstream_pub.send_json(msg) 'uuid': None,
}
self.upstream_pub.send_json(msg)
else: else:
# ignore on purpose self.logger.debug("child update ignored")
pass pass
def do_shutdown(self): def do_shutdown(self):
...@@ -240,8 +244,9 @@ class Daemon(object): ...@@ -240,8 +244,9 @@ class Daemon(object):
# create a stream to let ioloop deal with blocking calls on HWM # create a stream to let ioloop deal with blocking calls on HWM
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket) self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
# create container manager # create resource and container manager
self.container_manager = containers.ContainerManager() self.resource_manager = ResourceManager()
self.container_manager = ContainerManager(self.resource_manager)
# create sensor manager and make first measurement # create sensor manager and make first measurement
self.sensor = sensor.SensorManager() self.sensor = sensor.SensorManager()
......
from __future__ import print_function
import logging
from subprograms import HwlocClient, resources
class ResourceManager(object):
    """Manages the query of node resources, the tracking of their use and
    the scheduling of new containers according to partitioning rules."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.hwloc = HwlocClient()
        # query the node topo, keep track of the critical resources
        self.allresources = self.hwloc.info()
        self.logger.debug("resource info: %r", self.allresources)
        self.available = self.allresources
        # allocations handed out, by uuid, so remove() can give them back
        self.allocations = {}

    def schedule(self, uuid, request):
        """Schedule a resource request on the available resources.

        uuid: identifier of the container asking for resources.
        request: a resources tuple with the number of cpus and mems wanted.

        Returns a resources tuple with the allocated ids; cpus is empty
        when the request cannot be satisfied.
        """
        # dumb scheduling, just give the first resources available:
        # - cpus are exclusive
        # - memories exclusive if more than one left
        if len(self.available.cpus) < request.cpus:
            # not enough cpus left: fail without consuming anything
            # (previously mems were still carved out on failure, leaking them)
            return resources([], [])
        retcpus = self.available.cpus[:request.cpus]
        availcpus = self.available.cpus[request.cpus:]
        if len(self.available.mems) > 1:
            retmems = self.available.mems[:request.mems]
            availmems = self.available.mems[request.mems:]
        else:
            # a single remaining memory node is shared, not consumed
            retmems = self.available.mems
            availmems = self.available.mems
        self.available = resources(availcpus, availmems)
        alloc = resources(retcpus, retmems)
        self.allocations[uuid] = alloc
        return alloc

    def remove(self, uuid):
        """Free the resources associated with request uuid."""
        alloc = self.allocations.pop(uuid, None)
        if alloc is None:
            # unknown or already-freed uuid: best-effort no-op
            return
        # return the allocation to the pool; the set union keeps shared
        # memory nodes (single-node case) from being duplicated
        cpus = sorted(set(self.available.cpus) | set(alloc.cpus))
        mems = sorted(set(self.available.mems) | set(alloc.mems))
        self.available = resources(cpus, mems)
# def oldcode(self):
# numcpus = int(manifest.app.isolators.container.cpus.value)
#
# allresources = hwloc.info()
# self.logger.debug("resource info: %r", allresources)
# ncontainers = len(allresources.cpus) // numcpus
# self.logger.debug("will support %s containers", ncontainers)
# cur = nodeos.getavailable()
# self.logger.debug("%r are available", cur)
# sets = hwloc.distrib(ncontainers, restrict=cur, fake=allresources)
# self.logger.info("asking for %s cores", numcpus)
# self.logger.debug("will search in one of these: %r", sets)
# # find a free set
# avail = set(cur.cpus)
# for s in sets:
# cpuset = set(s.cpus)
# if cpuset.issubset(avail):
# alloc = s
# break
# else:
# self.logger.error("no exclusive cpuset found among %r", avail)
# return -2
#
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
import subprocess import subprocess
import collections import collections
import logging import logging
import xml.etree.ElementTree
resources = collections.namedtuple("Resources", ["cpus", "mems"]) resources = collections.namedtuple("Resources", ["cpus", "mems"])
...@@ -136,23 +137,22 @@ class HwlocClient(object): ...@@ -136,23 +137,22 @@ class HwlocClient(object):
def info(self):
    """Return a resources tuple listing all cpu and mem os indexes.

    Runs `<prefix>-ls --whole-system --output-format xml` and walks the
    XML topology, collecting the os_index of every PU (cpu) and NUMANode
    (mem) object.
    """
    cmd = self.prefix + "-ls"
    args = [cmd, '--whole-system', '--output-format', 'xml']
    p = subprocess.Popen(args, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    logpopen(p, args, stdout, stderr)
    xmlroot = xml.etree.ElementTree.fromstring(stdout)
    ret = resources([], [])
    for obj in xmlroot.iter('object'):
        # NOTE(review): assumes every NUMANode/PU object carries an
        # os_index attribute in hwloc's XML output — confirm against the
        # hwloc versions deployed
        if obj.attrib['type'] == "NUMANode":
            ret.mems.append(int(obj.attrib['os_index']))
        if obj.attrib['type'] == "PU":
            ret.cpus.append(int(obj.attrib['os_index']))
    # if there's only one memory node, hwloc doesn't list it
    if not ret.mems:
        ret.mems.append(0)
    return ret
def all2fake(self, resources): def all2fake(self, resources):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment