Commit fb396383 authored by Swann Perarnau

Merge branch 'refactor-container-management' into 'master'

Code refactoring in Container management

See merge request !54
Parents: 08badd4f e15360e4
Pipeline #5096 passed with stages in 5 minutes and 51 seconds
@@ -58,6 +58,7 @@ class Scheduler(SpecField):
     fields = {"policy": spec(unicode, True),
               "priority": spec(unicode, False),
+              "enabled": spec(unicode, False),
               }

     def __init__(self):

@@ -78,6 +79,10 @@ class Scheduler(SpecField):
            logger.warning("scheduler priority forced as 0 " +
                           "for non default policies")
            self.priority = "0"
+        if getattr(self, "enabled", "1") not in ["0", "False", "1", "True"]:
+            logger.error("Invalid value for scheduler enabled: %s",
+                         self.enabled)
+            return False
         return True
@@ -142,7 +147,7 @@ class PerfWrapper(SpecField):
         ret = super(PerfWrapper, self).load(data)
         if not ret:
             return ret
-        if self.enabled not in ["0", "False", "1", "True"]:
+        if getattr(self, "enabled", "1") not in ["0", "False", "1", "True"]:
             logger.error("Invalid value for perfwrapper enabled: %s",
                          self.enabled)
             return False
@@ -253,7 +258,9 @@ class IsolatorList(SpecField):
             setattr(self, name.lstrip("argo/"), v)
         for k in self.types:
             if self.types[k].required:
-                assert name.lstrip("argo/") in self.__dict__
+                if not hasattr(self, k.lstrip("argo/")):
+                    logger.error("Missing mandatory isolator: %s", k)
+                    return False
         return True
@@ -294,3 +301,25 @@ class ImageManifest(SpecField):
         with open(filename, 'r') as f:
             data = json.load(f)
         return super(ImageManifest, self).load(data)
+
+    def load_dict(self, data):
+        """Load a manifest in dictionary form."""
+        return super(ImageManifest, self).load(data)
+
+    def is_feature_enabled(self, feature, true_values=["1", "True"]):
+        """Check if a specific feature is enabled.
+
+        Since the enabled field itself is optional, an isolator present in
+        the manifest counts as enabled unless its enabled field is set to a
+        false value."""
+        typename = "argo/{}".format(feature)
+        assert typename in IsolatorList.types, \
+            "{} is not a valid feature".format(feature)
+        logger.debug(repr(self))
+        if hasattr(self.app.isolators, feature):
+            isolator = getattr(self.app.isolators, feature)
+            if hasattr(isolator, 'enabled'):
+                if isolator.enabled not in true_values:
+                    return False
+            return True
+        else:
+            return False
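The new is_feature_enabled helper centralizes the hasattr/enabled checks that create() used to do inline. A minimal usage sketch, assuming a manifest file of the caller's choosing (the "app.json" path below is only an illustration):

from nrm.aci import ImageManifest

manifest = ImageManifest()
# "app.json" is a placeholder path; load() reads and parses the JSON manifest.
if manifest.load("app.json"):
    # True when the isolator is listed and not explicitly disabled.
    if manifest.is_feature_enabled("perfwrapper"):
        print("perfwrapper will be prepended to the container command")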
@@ -88,7 +88,7 @@ class ApplicationManager(object):
         phase_contexts = dict()
         phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
         if container.power['policy']:
-            ids = container.resources['cpus']
+            ids = container.resources.cpus
             for id in ids:
                 phase_contexts[id] = dict.fromkeys(phase_context_keys)
                 phase_contexts[id]['set'] = False
...
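The container.resources['cpus'] to container.resources.cpus change here (and in controller.py and daemon.py below) implies the allocation is now a structured object rather than a plain dict. A sketch of the assumed shape, modeled on the resources(ncpus, nmems) constructor used by _get_container_tuple; the field names and list values are assumptions, only the attribute-style access is confirmed by the diff:

from collections import namedtuple

# Assumed definition of the resources type.
resources = namedtuple("resources", ["cpus", "mems"])

alloc = resources(cpus=[0, 1, 2, 3], mems=[0])
ids = alloc.cpus   # new attribute access used by the managers
# the old alloc['cpus'] indexing does not work on a namedtuple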
@@ -17,14 +17,13 @@ class ContainerManager(object):
     """Manages the creation, listing and deletion of containers, using a
     container runtime underneath."""

-    def __init__(self, rm,
+    def __init__(self, container_runtime, rm,
                  perfwrapper="argo-perf-wrapper",
                  linuxperf="perf",
-                 argo_nodeos_config="argo_nodeos_config",
                  pmpi_lib="/usr/lib/libnrm-pmpi.so"):
         self.linuxperf = linuxperf
         self.perfwrapper = perfwrapper
-        self.nodeos = NodeOSClient(argo_nodeos_config=argo_nodeos_config)
+        self.runtime = container_runtime
         self.containers = dict()
         self.pids = dict()
         self.resourcemanager = rm
@@ -32,108 +31,77 @@ class ContainerManager(object):
         self.chrt = ChrtClient()
         self.pmpi_lib = pmpi_lib

+    def _get_container_tuple(self, container_name, manifest):
+        """Retrieve a container tuple if the container exists, otherwise use
+        the manifest to create a new one.
+
+        Returns (bool, container_tuple), the first field telling if a container
+        needs to be created."""
+        if container_name in self.containers:
+            return (False, self.containers[container_name])
+        # ask the resource manager for resources
+        ncpus = int(manifest.app.isolators.container.cpus.value)
+        nmems = int(manifest.app.isolators.container.mems.value)
+        req = resources(ncpus, nmems)
+        allocated = self.resourcemanager.schedule(container_name, req)
+        logger.info("create: allocation: %r", allocated)
+        # Container power settings
+        container_power = dict()
+        container_power['profile'] = None
+        container_power['policy'] = None
+        container_power['damper'] = None
+        container_power['slowdown'] = None
+        container_power['manager'] = None
+        if manifest.is_feature_enabled('power'):
+            pp = manifest.app.isolators.power
+            if pp.profile in ["1", "True"]:
+                container_power['profile'] = dict()
+                container_power['profile']['start'] = dict()
+                container_power['profile']['end'] = dict()
+            if pp.policy != "NONE":
+                container_power['policy'] = pp.policy
+                container_power['damper'] = pp.damper
+                container_power['slowdown'] = pp.slowdown
+        # Compute hardware bindings
+        hwbindings = dict()
+        if manifest.is_feature_enabled('hwbind'):
+            hwbindings['distrib'] = sorted(self.hwloc.distrib(
+                                           ncpus, allocated), key=operator.
+                                           attrgetter('cpus'))
+        return (True, Container(container_name, manifest, allocated,
+                                container_power, {}, {}, hwbindings))

     def create(self, request):
         """Create a container according to the request.

         Returns the pid of the container or a negative number for errors."""
-        container = None
-        containerexistsflag = False
-        processes = None
-        clientids = None
-        pp = None
-        hwbindings = None
-        bind_index = 0
         manifestfile = request['manifest']
         command = request['file']
         args = request['args']
         environ = request['environ']
         container_name = request['uuid']
-        logger.info("run: manifest file: %s", manifestfile)
-        logger.info("run: command: %s", command)
-        logger.info("run: args: %r", args)
-        logger.info("run: container name: %s", container_name)
-        # TODO: Application library to load must be set during configuration
-        apppreloadlibrary = self.pmpi_lib
+        logger.info("create: manifest file: %s", manifestfile)
+        logger.info("create: command: %s", command)
+        logger.info("create: args: %r", args)
+        logger.info("create: container name: %s", container_name)
         manifest = ImageManifest()
         if not manifest.load(manifestfile):
             logger.error("Manifest is invalid")
             return None
-        if hasattr(manifest.app.isolators, 'scheduler'):
-            sched = manifest.app.isolators.scheduler
-            argv = self.chrt.getwrappedcmd(sched)
-        else:
-            argv = []
-
-        # Check if container exists else create it
-        if container_name in self.containers:
-            container = self.containers[container_name]
-            containerexistsflag = True
-            processes = container.processes
-            clientids = container.clientids
-            hwbindings = container.hwbindings
-            bind_index = len(processes)
-        else:
-            processes = dict()
-            clientids = dict()
-            hwbindings = dict()
-            # ask the resource manager for resources
-            ncpus = int(manifest.app.isolators.container.cpus.value)
-            nmems = int(manifest.app.isolators.container.mems.value)
-            req = resources(ncpus, nmems)
-            alloc = self.resourcemanager.schedule(container_name, req)
-            logger.info("run: allocation: %r", alloc)
-            # create container
-            logger.info("creating container %s", container_name)
-            self.nodeos.create(container_name, alloc)
-            container_resources = dict()
-            container_resources['cpus'], container_resources['mems'] = alloc
-            # Container power settings
-            container_power = dict()
-            container_power['profile'] = None
-            container_power['policy'] = None
-            container_power['damper'] = None
-            container_power['slowdown'] = None
-            container_power['manager'] = None
-            # It would've been better if argo-perf-wrapper wrapped around
-            # argo-nodeos-config and not the final command -- that way it would
-            # be running outside of the container. However, because
-            # argo-nodeos-config is suid root, perf can't monitor it.
-            if hasattr(manifest.app.isolators, 'perfwrapper'):
-                manifest_perfwrapper = manifest.app.isolators.perfwrapper
-                if hasattr(manifest_perfwrapper, 'enabled'):
-                    if manifest_perfwrapper.enabled in ["1", "True"]:
-                        argv.append(self.perfwrapper)
-            if hasattr(manifest.app.isolators, 'power'):
-                if hasattr(manifest.app.isolators.power, 'enabled'):
-                    pp = manifest.app.isolators.power
-                    if pp.enabled in ["1", "True"]:
-                        if pp.profile in ["1", "True"]:
-                            container_power['profile'] = dict()
-                            container_power['profile']['start'] = dict()
-                            container_power['profile']['end'] = dict()
-                        if pp.policy != "NONE":
-                            container_power['policy'] = pp.policy
-                            container_power['damper'] = pp.damper
-                            container_power['slowdown'] = pp.slowdown
-            # Compute hardware bindings
-            if hasattr(manifest.app.isolators, 'hwbind'):
-                manifest_hwbind = manifest.app.isolators.hwbind
-                if hasattr(manifest_hwbind, 'enabled'):
-                    if manifest_hwbind.enabled in ["1", "True"]:
-                        hwbindings['enabled'] = True
-                        hwbindings['distrib'] = sorted(self.hwloc.distrib(
-                                                       ncpus, alloc), key=operator.
-                                                       attrgetter('cpus'))
+        creation_needed, container = self._get_container_tuple(container_name,
                                                                manifest)
+        if creation_needed:
+            logger.info("Creating container %s", container_name)
+            self.runtime.create(container)
+            self.containers[container_name] = container

         # build context to execute
         # environ['PATH'] = ("/usr/local/sbin:"
@@ -142,53 +110,61 @@ class ContainerManager(object):
         environ['PERF'] = self.linuxperf
         environ['AC_APP_NAME'] = manifest.name
         environ['AC_METADATA_URL'] = "localhost"
-        if (containerexistsflag and container.power['policy'] is not None) or (
-                pp is not None and pp.policy != "NONE"):
-            environ['LD_PRELOAD'] = apppreloadlibrary
+
+        # power profiling uses LD_PRELOAD, we use get to ensure that it
+        # doesn't crash if the policy doesn't exist.
+        if container.power.get('policy'):
+            environ['LD_PRELOAD'] = self.pmpi_lib
             environ['NRM_TRANSMIT'] = '1'
-            if containerexistsflag:
-                environ['NRM_DAMPER'] = container.power['damper']
-            else:
-                environ['NRM_DAMPER'] = pp.damper
+            environ['NRM_DAMPER'] = container.power['damper']
+
+        # build prefix to the entire command based on enabled features
+        argv = []
+        if manifest.is_feature_enabled('scheduler'):
+            sched = manifest.app.isolators.scheduler
+            argv = self.chrt.getwrappedcmd(sched)

         # Use hwloc-bind to launch each process in the container by prepending
         # it as an argument to the command line, if enabled in manifest.
         # The hardware binding computed using hwloc-distrib is used here
         # --single
-        if bool(hwbindings) and hwbindings['enabled']:
+        if container.hwbindings:
+            # round robin over the cpu bindings available
+            bind_index = len(container.processes) % \
+                len(container.hwbindings['distrib'])
             argv.append('hwloc-bind')
             # argv.append('--single')
-            argv.append('core:'+str(hwbindings['distrib'][bind_index].cpus[0]))
+            cpumask = container.hwbindings['distrib'][bind_index].cpus[0]
+            memmask = container.hwbindings['distrib'][bind_index].mems[0]
+            logging.info('create: binding to: %s, %s', cpumask, memmask)
+            argv.append("core:{}".format(cpumask))
             argv.append('--membind')
-            argv.append('numa:'+str(hwbindings['distrib'][bind_index].mems[0]))
+            argv.append("numa:{}".format(memmask))
+
+        # It would've been better if argo-perf-wrapper wrapped around
+        # argo-nodeos-config and not the final command -- that way it would
+        # be running outside of the container. However, because
+        # argo-nodeos-config is suid root, perf can't monitor it.
+        if manifest.is_feature_enabled('perfwrapper'):
+            argv.append(self.perfwrapper)
+
         argv.append(command)
         argv.extend(args)

         # run my command
-        process = self.nodeos.execute(container_name, argv, environ)
-        processes[process.pid] = process
-        clientids[process.pid] = request['clientid']
-        if containerexistsflag:
-            container.processes[process.pid] = process
-            self.pids[process.pid] = container
-            logger.info("Created process %s in container %s", process.pid,
-                        container_name)
-        else:
-            container = Container(container_name, manifest,
-                                  container_resources, container_power,
-                                  processes, clientids, hwbindings)
-            self.pids[process.pid] = container
-            self.containers[container_name] = container
-            logger.info("Container %s created and running : %r",
-                        container_name, container)
+        process = self.runtime.execute(container_name, argv, environ)
+
+        # register the process
+        container.processes[process.pid] = process
+        container.clientids[process.pid] = request['clientid']
+        self.pids[process.pid] = container
+        logger.info("Created process %s in container %s", process.pid,
+                    container_name)
+
         return process.pid, container

     def delete(self, uuid):
         """Delete a container and kill all related processes."""
-        self.nodeos.delete(uuid, kill=True)
+        self.runtime.delete(uuid, kill=True)
         self.resourcemanager.update(uuid)
         c = self.containers[uuid]
         del self.containers[uuid]
@@ -209,3 +185,73 @@ class ContainerManager(object):
         """List the containers in the system."""
         return [{'uuid': c.uuid, 'pid': c.processes.keys()}
                 for c in self.containers.values()]
+
+
+class ContainerRuntime(object):
+    """Implements the creation and deletion of containers, and the spawning
+    of commands, for a container runtime."""
+
+    def __init__(self):
+        pass
+
+    def create(self, container):
+        """Create the container defined by the container namedtuple on the
+        system."""
+        raise NotImplementedError
+
+    def execute(self, container_uuid, args, environ):
+        """Execute a command inside a container, using a similar interface to
+        popen.
+
+        Returns a tornado.process.Subprocess"""
+        raise NotImplementedError
+
+    def delete(self, container_uuid, kill=False):
+        """Delete a container, possibly killing all the processes inside."""
+        raise NotImplementedError
+
+
+class NodeOSRuntime(ContainerRuntime):
+    """Implements the container runtime interface using the nodeos
+    subprogram."""
+
+    def __init__(self, path="argo_nodeos_config"):
+        """Creates the client for nodeos, with an optional custom
+        path/command."""
+        self.client = NodeOSClient(argo_nodeos_config=path)
+
+    def create(self, container):
+        """Uses the container resource allocation to create a container."""
+        self.client.create(container.uuid, container.resources)
+
+    def execute(self, container_uuid, args, environ):
+        """Launches a command in the container."""
+        return self.client.execute(container_uuid, args, environ)
+
+    def delete(self, container_uuid, kill=False):
+        """Delete the container."""
+        self.client.delete(container_uuid, kill)
+
+
+class DummyRuntime(ContainerRuntime):
+    """Implements a dummy runtime that doesn't create any container, but still
+    launches commands."""
+
+    def __init__(self):
+        pass
+
+    def create(self, container):
+        pass
+
+    def execute(self, container_uuid, args, environ):
+        import tornado.process as process
+        return process.Subprocess(args, stdin=process.Subprocess.STREAM,
+                                  stdout=process.Subprocess.STREAM,
+                                  stderr=process.Subprocess.STREAM,
+                                  env=environ)
+
+    def delete(self, container_uuid, kill=False):
+        pass
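The ContainerRuntime split turns the runtime into an injected dependency instead of a NodeOSClient hard-wired inside ContainerManager. A minimal wiring sketch under that assumption (the resource_manager variable is a stand-in, not the daemon's exact configuration):

from containers import ContainerManager, NodeOSRuntime, DummyRuntime

# Production-style wiring, mirroring the daemon.py change further down.
runtime = NodeOSRuntime(path="argo_nodeos_config")
manager = ContainerManager(runtime, resource_manager)

# Tests or dry runs can reuse the same manager logic while skipping real
# container creation by swapping in the dummy runtime.
dry_manager = ContainerManager(DummyRuntime(), resource_manager)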
@@ -126,7 +126,7 @@ class Controller(object):
     def run_policy_container(self, container, application):
         """Run policies on a container."""
-        ids = container.resources['cpus']
+        ids = container.resources.cpus
         pcs = application.phase_contexts
         # Run policy only if all phase contexts have been received
         if not filter(lambda i: not pcs[i]['set'], ids):
...
 from __future__ import print_function
 from applications import ApplicationManager
-from containers import ContainerManager
+from containers import ContainerManager, NodeOSRuntime
 from controller import Controller, PowerActuator
 from powerpolicy import PowerPolicyManager
 from functools import partial

@@ -85,6 +85,7 @@ class Daemon(object):
             self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
                                              client)
         elif msg.type == 'run':
+            logger.info("asked to run a command in a container: %r", msg)
             container_uuid = msg.container_uuid
             params = {'manifest': msg.manifest,
                       'file': msg.path,

@@ -98,7 +99,7 @@ class Daemon(object):
             if len(container.processes) == 1:
                 if container.power['policy']:
                     container.power['manager'] = PowerPolicyManager(
-                        container.resources['cpus'],
+                        container.resources.cpus,
                         container.power['policy'],
                         float(container.power['damper']),
                         float(container.power['slowdown']))

@@ -296,12 +297,14 @@ class Daemon(object):
         # create managers
         self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
+        container_runtime = \
+            NodeOSRuntime(self.config.argo_nodeos_config)
         self.container_manager = ContainerManager(
+            container_runtime,
             self.resource_manager,
             perfwrapper=self.config.argo_perf_wrapper,
             linuxperf=self.config.perf,
-            argo_nodeos_config=self.config.argo_nodeos_config,
             pmpi_lib=self.config.pmpi_lib,
             )
         self.application_manager = ApplicationManager()
         self.sensor_manager = SensorManager()
...
"""Tests for the ACI Manifest module."""
import nrm
import nrm.aci
import pytest
import json
@pytest.fixture
def manifest_base_data():
data = '''{
"acKind": "ImageManifest",
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
{
"name": "argo/container",
"value": {
"cpus": "1",
"mems": "1"
}
}
]
}
}'''
return json.loads(data)
def test_manifest_disabled_perfwrapper(manifest_base_data):
"""Ensure we can check if a feature is disabled."""
manifest = nrm.aci.ImageManifest()
isolator_text = '''{
"name": "argo/perfwrapper",
"value": {
"enabled": "0"
}
}'''
isolator = json.loads(isolator_text)
data = manifest_base_data
data["app"]["isolators"].append(isolator)
assert manifest.load_dict(data)
assert not manifest.is_feature_enabled("perfwrapper")
def test_enabled_feature(manifest_base_data):
"""Ensure we can check if a feature is enabled without enabled in it."""
manifest = nrm.aci.ImageManifest()
isolator_text = '''{
"name": "argo/perfwrapper",
"value": {}
}'''
isolator = json.loads(isolator_text)
data = manifest_base_data
data["app"]["isolators"].append(isolator)
assert manifest.load_dict(data)
assert manifest.is_feature_enabled("perfwrapper")
def test_missing_disabled(manifest_base_data):
"""Ensure that a missing feature doesn't appear enabled."""
manifest = nrm.aci.ImageManifest()
assert manifest.load_dict(manifest_base_data)
assert not manifest.is_feature_enabled("perfwrapper")
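The tests above only exercise the perfwrapper paths; the other validation added in this merge request (the scheduler enabled check and the missing-mandatory-isolator error) could be covered the same way. A hedged sketch that would sit in this same test file, assuming argo/container is the required isolator, that "SCHED_OTHER" is an accepted policy string, and that a validation failure in one isolator propagates up through load_dict; the test names are hypothetical:

def test_invalid_scheduler_enabled(manifest_base_data):
    """An invalid scheduler enabled value should make the manifest fail to load."""
    isolator = json.loads('''{
        "name": "argo/scheduler",
        "value": {
            "policy": "SCHED_OTHER",
            "enabled": "maybe"
        }
    }''')
    data = manifest_base_data
    data["app"]["isolators"].append(isolator)
    manifest = nrm.aci.ImageManifest()
    assert not manifest.load_dict(data)


def test_missing_mandatory_isolator(manifest_base_data):
    """Removing argo/container should trigger the missing-isolator error."""
    data = manifest_base_data
    data["app"]["isolators"] = []
    manifest = nrm.aci.ImageManifest()
    assert not manifest.load_dict(data)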