Commit abc03f50 authored by Valentin Reis's avatar Valentin Reis

Merge branch '52-support-adding-bind-mounts-to-container-for-runtimes-that-allow-it' into 'master'

Resolve "Support adding bind mounts to container for runtimes that allow it"

Closes #52

See merge request !92
parents c7771b4a 57980d37
Pipeline #7444 passed with stages
in 40 minutes and 51 seconds
...@@ -17,14 +17,6 @@ include: ...@@ -17,14 +17,6 @@ include:
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml
py.test:
stage: test
script:
- pipenv install --dev
- pipenv run py.test --deselect=test/test_messaging.py
tags:
- rapl
flake8: flake8:
stage: style stage: style
script: script:
......
...@@ -147,7 +147,6 @@ class CommandLineInterface(object): ...@@ -147,7 +147,6 @@ class CommandLineInterface(object):
print(msg.payload, file=sys.stderr) print(msg.payload, file=sys.stderr)
sys.stderr.flush() sys.stderr.flush()
elif msg.tag == 'exit': elif msg.tag == 'exit':
logger.info("process ended: %r", msg)
state = 'exiting' state = 'exiting'
exitmsg = msg exitmsg = msg
else: else:
...@@ -155,7 +154,7 @@ class CommandLineInterface(object): ...@@ -155,7 +154,7 @@ class CommandLineInterface(object):
if outeof and erreof and state == 'exiting': if outeof and erreof and state == 'exiting':
state = 'exit' state = 'exit'
istatus = int(exitmsg.status) istatus = int(exitmsg.status)
logger.info("command ended: %r", exitmsg) logger.debug("command ended with istatus %r.", exitmsg)
if os.WIFSIGNALED(istatus): if os.WIFSIGNALED(istatus):
logger.error("command ended due to signal %s" % logger.error("command ended due to signal %s" %
str(os.WTERMSIG(istatus))) str(os.WTERMSIG(istatus)))
...@@ -163,7 +162,7 @@ class CommandLineInterface(object): ...@@ -163,7 +162,7 @@ class CommandLineInterface(object):
elif os.WIFEXITED(istatus): elif os.WIFEXITED(istatus):
s = int(os.WTERMSIG(istatus)) s = int(os.WTERMSIG(istatus))
if s > 0: if s > 0:
logger.debug("command ended with exit code %s" % logger.error("command ended with exit code %s" %
str(s)) str(s))
sys.exit(s) sys.exit(s)
else: else:
......
{
"acKind": "ImageManifest",
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
{
"name": "argo/scheduler",
"value": {
"policy": "SCHED_OTHER",
"priority": "0"
}
},
{
"name": "argo/container",
"value": {
"cpus": "24",
"mems": "2"
}
},
{
"name": "argo/perfwrapper",
"value": {
"enabled": "1"
}
},
{
"name": "argo/power",
"value": {
"enabled": "1",
"profile": "1",
"policy": "NONE",
"damper": "1e9",
"slowdown": "1.1"
}
},
{
"name": "argo/monitoring",
"value": {
"enabled": "1",
"ratelimit": "10000000"
}
},
{
"name": "argo/hwbind",
"value": {
"enabled": "1"
}
}
]
}
}
{
"acKind": "ImageManifest",
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
{
"name": "argo/scheduler",
"value": {
"policy": "SCHED_OTHER",
"priority": "0"
}
},
{
"name": "argo/container",
"value": {
"cpus": "1",
"mems": "1"
}
},
{
"name": "argo/perfwrapper",
"value": {
"enabled": "0"
}
},
{
"name": "argo/power",
"value": {
"enabled": "1",
"profile": "1",
"policy": "NONE",
"damper": "1e9",
"slowdown": "1.1"
}
},
{
"name": "argo/monitoring",
"value": {
"enabled": "1",
"ratelimit": "10000000"
}
},
{
"name": "argo/hwbind",
"value": {
"enabled": "0"
}
}
]
},
"image": {
"path": "docker://ubuntu",
"type": "docker"
}
}
{
"name": "basic",
"version": "0.0.1",
"app": {
"container": {
"cpus": 2,
"mems": 1
}
}
}
{
"name": "basic",
"version": "0.0.1",
"app": {
"container": {
"cpus": 2,
"mems": 1
}
},
"image": {
"path": "docker://ubuntu",
"type": "docker",
"binds": [
"/nix"
]
}
}
This diff is collapsed.
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
from __future__ import print_function from __future__ import print_function
from aci import ImageManifest from aci import ImageManifest
from json import load
from collections import namedtuple from collections import namedtuple
import logging import logging
from subprograms import ChrtClient, NodeOSClient, resources, SingularityClient from subprograms import ChrtClient, NodeOSClient, resources, SingularityClient
...@@ -54,8 +55,8 @@ class ContainerManager(object): ...@@ -54,8 +55,8 @@ class ContainerManager(object):
return (False, self.containers[container_name]) return (False, self.containers[container_name])
# ask the resource manager for resources # ask the resource manager for resources
ncpus = int(manifest.app.isolators.container.cpus.value) ncpus = manifest.app['container']['cpus']
nmems = int(manifest.app.isolators.container.mems.value) nmems = manifest.app['container']['mems']
req = resources(ncpus, nmems) req = resources(ncpus, nmems)
allocated = self.resourcemanager.schedule(container_name, req) allocated = self.resourcemanager.schedule(container_name, req)
logger.info("create: allocation: %r", allocated) logger.info("create: allocation: %r", allocated)
...@@ -69,15 +70,15 @@ class ContainerManager(object): ...@@ -69,15 +70,15 @@ class ContainerManager(object):
container_power['manager'] = None container_power['manager'] = None
if manifest.is_feature_enabled('power'): if manifest.is_feature_enabled('power'):
pp = manifest.app.isolators.power pp = manifest.app['power']
if pp.profile in ["1", "True"]: if pp['profile'] is True:
container_power['profile'] = dict() container_power['profile'] = dict()
container_power['profile']['start'] = dict() container_power['profile']['start'] = dict()
container_power['profile']['end'] = dict() container_power['profile']['end'] = dict()
if pp.policy != "NONE": if pp['policy'] != "NONE":
container_power['policy'] = pp.policy container_power['policy'] = pp['policy']
container_power['damper'] = pp.damper container_power['damper'] = pp['damper']
container_power['slowdown'] = pp.slowdown container_power['slowdown'] = pp['slowdown']
# Compute hardware bindings # Compute hardware bindings
hwbindings = dict() hwbindings = dict()
...@@ -103,10 +104,12 @@ class ContainerManager(object): ...@@ -103,10 +104,12 @@ class ContainerManager(object):
logger.info("create: args: %r", args) logger.info("create: args: %r", args)
logger.info("create: container name: %s", container_name) logger.info("create: container name: %s", container_name)
manifest = ImageManifest() try:
if not manifest.load(manifestfile): with open(manifestfile) as f:
logger.error("Manifest is invalid") manifest = ImageManifest((load(f)))
return None except Exception as e:
logger.error("error occured in manifest loading:")
raise(e)
creation_needed, container = self._get_container_tuple(container_name, creation_needed, container = self._get_container_tuple(container_name,
manifest) manifest)
...@@ -120,7 +123,7 @@ class ContainerManager(object): ...@@ -120,7 +123,7 @@ class ContainerManager(object):
# "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin") # "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
environ['ARGO_CONTAINER_UUID'] = container_name environ['ARGO_CONTAINER_UUID'] = container_name
environ['PERF'] = self.linuxperf environ['PERF'] = self.linuxperf
environ['AC_APP_NAME'] = manifest.name environ['AC_APP_NAME'] = manifest['name']
environ['AC_METADATA_URL'] = "localhost" environ['AC_METADATA_URL'] = "localhost"
# power profiling uses LD_PRELOAD, we use get to ensure that it # power profiling uses LD_PRELOAD, we use get to ensure that it
...@@ -133,7 +136,7 @@ class ContainerManager(object): ...@@ -133,7 +136,7 @@ class ContainerManager(object):
# monitoring section involves libnrm # monitoring section involves libnrm
if manifest.is_feature_enabled('monitoring'): if manifest.is_feature_enabled('monitoring'):
environ['ARGO_NRM_RATELIMIT'] = \ environ['ARGO_NRM_RATELIMIT'] = \
manifest.app.isolators.monitoring.ratelimit manifest.app['monitoring']['ratelimit']
if container.power.get('policy') or \ if container.power.get('policy') or \
manifest.is_feature_enabled('monitoring'): manifest.is_feature_enabled('monitoring'):
...@@ -143,7 +146,7 @@ class ContainerManager(object): ...@@ -143,7 +146,7 @@ class ContainerManager(object):
# build prefix to the entire command based on enabled features # build prefix to the entire command based on enabled features
argv = [] argv = []
if manifest.is_feature_enabled('scheduler'): if manifest.is_feature_enabled('scheduler'):
sched = manifest.app.isolators.scheduler sched = manifest.app['scheduler']
argv = self.chrt.getwrappedcmd(sched) argv = self.chrt.getwrappedcmd(sched)
# Use hwloc-bind to launch each process in the conatiner by prepending # Use hwloc-bind to launch each process in the conatiner by prepending
...@@ -270,8 +273,8 @@ class SingularityUserRuntime(ContainerRuntime): ...@@ -270,8 +273,8 @@ class SingularityUserRuntime(ContainerRuntime):
def create(self, container, downstream_uri): def create(self, container, downstream_uri):
"""Uses the container resource allocation to create a container.""" """Uses the container resource allocation to create a container."""
imageinfo = container.manifest.image imageinfo = container.manifest.image
self.client.instance_start(container.uuid, imageinfo.path, self.client.instance_start(container.uuid, imageinfo['path'],
[downstream_uri]) [downstream_uri]+imageinfo['binds'])
def execute(self, container_uuid, args, environ): def execute(self, container_uuid, args, environ):
"""Launches a command in the container.""" """Launches a command in the container."""
......
...@@ -12,32 +12,20 @@ import json ...@@ -12,32 +12,20 @@ import json
import logging import logging
import uuid import uuid
import zmq import zmq
import os
import zmq.utils import zmq.utils
import zmq.utils.monitor import zmq.utils.monitor
from zmq.eventloop import zmqstream from zmq.eventloop import zmqstream
import warlock from schema import loadschema
from jsonschema import Draft4Validator
_logger = logging.getLogger('nrm') _logger = logging.getLogger('nrm')
_UpstreamRep = loadschema('upstreamRep')
_UpstreamPub = loadschema('upstreamPub')
def _loadschema(api):
sourcedir = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(sourcedir, "schemas", api+".json")) as f:
s = json.load(f)
Draft4Validator.check_schema(s)
return(warlock.model_factory(s))
_UpstreamRep = _loadschema('upstreamRep')
_UpstreamPub = _loadschema('upstreamPub')
def send(apiname): def send(apiname):
def wrap(cls): def wrap(cls):
model = _loadschema(apiname) model = loadschema(apiname)
def send(self, *args, **kwargs): def send(self, *args, **kwargs):
self.socket.send( self.socket.send(
...@@ -50,7 +38,7 @@ def send(apiname): ...@@ -50,7 +38,7 @@ def send(apiname):
def recv_callback(apiname): def recv_callback(apiname):
def wrap(cls): def wrap(cls):
model = _loadschema(apiname) model = loadschema(apiname)
def recv(self): def recv(self):
"""Receives a response to a message.""" """Receives a response to a message."""
...@@ -63,7 +51,6 @@ def recv_callback(apiname): ...@@ -63,7 +51,6 @@ def recv_callback(apiname):
callback.""" callback."""
_logger.info("receiving message: %r", frames) _logger.info("receiving message: %r", frames)
assert len(frames) == 2 assert len(frames) == 2
print(frames)
msg = model(json.loads(frames[1])) msg = model(json.loads(frames[1]))
assert self.callback assert self.callback
self.callback(msg, str(frames[0])) self.callback(msg, str(frames[0]))
......
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
import warlock
import json
import os
from jsonschema import Draft4Validator
def loadschema(api):
sourcedir = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(sourcedir, "schemas", api+".json")) as f:
s = json.load(f)
Draft4Validator.check_schema(s)
return(warlock.model_factory(s))
{
"type": "object",
"required": [
"name",
"version",
"app"
],
"properties": {
"version": {
"const": "0.0.1"
},
"name": {
"type": "string"
},
"app": {
"type": "object",
"required": [
"container"
],
"properties": {
"container": {
"type": "object",
"required": [
"cpus",
"mems"
],
"properties": {
"cpus": {
"type": "number"
},
"mems": {
"type": "number"
}
}
},
"scheduler": {
"oneOf": [
{
"type": "object",
"required": [
"policy",
"priority"
],
"properties": {
"policy": {
"const": "SCHED_OTHER"
},
"priority": {
"type": "number"
}
}
},
{
"type": "object",
"required": [
"policy",
"priority"
],
"properties": {
"policy": {
"type": "string",
"enum": [
"SCHED_FIFO",
"SCHED_HPC"
]
}
}
}
]
},
"perfwrapper": {
"const": true
},
"power": {
"type": "object",
"required": [
"profile",
"policy",
"damper",
"slowdown"
],
"properties": {
"policy": {
"type": "string",
"enum": [
"NONE",
"DDCM",
"DVFS",
"COMBINED"
]
},
"profile": {
"type": "boolean"
},
"damper": {
"type": "number",
"minimum": 0
},
"slowdown": {
"type": "number",
"maximum": 1,
"exclusiveMaximum": true
}
}
},
"monitoring": {
"type": "object",
"required": [
"ratelimit"
],
"properties": {
"ratelimit": {
"type": "number",
"minimum": 0
}
}
}
}
},
"hwbind": {
"const": "enabled"
},
"image": {
"type": "object",
"required": [
"path",
"type"
],
"properties": {
"path": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"sif",
"docker"
]
},
"binds": {
"type": "array",
"items": {
"type": "string"
}
}
}
}
}
}
...@@ -159,6 +159,7 @@ class SingularityClient(object): ...@@ -159,6 +159,7 @@ class SingularityClient(object):
if bind_list: if bind_list:
args.extend(['--bind', ','.join(bind_list)]) args.extend(['--bind', ','.join(bind_list)])
args.extend([container_image, instance_name]) args.extend([container_image, instance_name])
logger.error("launching singularity command: %s", args)
p = subprocess.Popen(args, stdout=subprocess.PIPE, p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
stdout, stderr = p.communicate() stdout, stderr = p.communicate()
......
...@@ -17,57 +17,19 @@ import json ...@@ -17,57 +17,19 @@ import json
@pytest.fixture @pytest.fixture
def manifest_base_data(): def manifest_base_data():
data = '''{ with open("examples/basic.json") as f:
"acKind": "ImageManifest", return json.load(f)
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
{
"name": "argo/container",
"value": {
"cpus": "1",
"mems": "1"
}
}
]
}
}'''
return json.loads(data)
def test_manifest_disabled_perfwrapper(manifest_base_data): def test_manifest_disabled_perfwrapper(manifest_base_data):
"""Ensure we can check if a feature is disabled.""" """Ensure we can check if a feature is disabled."""
manifest = nrm.aci.ImageManifest() manifest = nrm.aci.ImageManifest(manifest_base_data)
isolator_text = '''{
"name": "argo/perfwrapper",
"value": {
"enabled": "0"
}
}'''
isolator = json.loads(isolator_text)
data = manifest_base_data
data["app"]["isolators"].append(isolator)
assert manifest.load_dict(data)
assert not manifest.is_feature_enabled("perfwrapper") assert not manifest.is_feature_enabled("perfwrapper")
def test_enabled_feature(manifest_base_data): def test_enabled_feature(manifest_base_data):
"""Ensure we can check if a feature is enabled without enabled in it.""" """Ensure we can check if a feature is enabled without enabled in it."""
manifest = nrm.aci.ImageManifest() data = manifest_base_data.copy()
isolator_text = '''{ data["app"]["perfwrapper"] = "enabled"
"name": "argo/perfwrapper", manifest = nrm.aci.ImageManifest(data)
"value": {}
}'''
isolator = json.loads(isolator_text)
data = manifest_base_data
data["app"]["isolators"].append(isolator)
assert manifest.load_dict(data)
assert manifest.is_feature_enabled("perfwrapper") assert manifest.is_feature_enabled("perfwrapper")
def test_missing_disabled(manifest_base_data):
"""Ensure that a missing feature doesn't appear enabled."""
manifest = nrm.aci.ImageManifest()
assert manifest.load_dict(manifest_base_data)
assert not manifest.is_feature_enabled("perfwrapper")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment