Commit ffe2d632 authored by Swann Perarnau's avatar Swann Perarnau

Merge branch 'container-singularity' into 'master'

Container singularity support

Closes #42 and #49

See merge request !85
parents f73b4415 b3450523
Pipeline #7121 passed with stages
in 13 minutes and 47 seconds
...@@ -4,3 +4,4 @@ Kazutomo Yoshii <kazutomo.yoshii@gmail.com> ...@@ -4,3 +4,4 @@ Kazutomo Yoshii <kazutomo.yoshii@gmail.com>
Sridutt Bhalachandra <sriduttb@anl.gov> Sridutt Bhalachandra <sriduttb@anl.gov>
Srinivasan Ramesh <ramesh2@llnl.gov> Srinivasan Ramesh <ramesh2@llnl.gov>
Valentin Reis <vreis@anl.gov> Valentin Reis <vreis@anl.gov>
Florence Monna <fmonna@anl.gov>
...@@ -45,6 +45,8 @@ def main(argv=None): ...@@ -45,6 +45,8 @@ def main(argv=None):
"argo_perf_wrapper": "nrm-perfwrapper", "argo_perf_wrapper": "nrm-perfwrapper",
"argo_nodeos_config": "argo_nodeos_config", "argo_nodeos_config": "argo_nodeos_config",
"pmpi_lib": "/usr/lib/libnrm-pmpi.so", "pmpi_lib": "/usr/lib/libnrm-pmpi.so",
"singularity": "singularity",
"container_runtime": "nodeos",
} }
if args.print_defaults: if args.print_defaults:
...@@ -60,15 +62,15 @@ def main(argv=None): ...@@ -60,15 +62,15 @@ def main(argv=None):
action="store_true") action="store_true")
parser.add_argument( parser.add_argument(
"--nrm_log", "--nrm_log",
help="Main log file. Override default with the NRM_LOG." help="Main log file. Override default with the NRM_LOG "
"environment variable", "environment variable",
default=os.environ.get('NRM_LOG', default=os.environ.get('NRM_LOG',
'/tmp/nrm.log')) '/tmp/nrm.log'))
parser.add_argument( parser.add_argument(
'--hwloc', '--hwloc',
help="Path to the hwloc to use. This path can be " help="Path to the hwloc to use. This path can be "
"relative and makes uses of the $PATH if necessary." "relative and makes uses of the $PATH if necessary. "
"Override default with the HWLOC environment" "Override default with the HWLOC environment "
"variable.", "variable.",
default=os.environ.get('HWLOC', default=os.environ.get('HWLOC',
'hwloc')) 'hwloc'))
...@@ -82,25 +84,39 @@ def main(argv=None): ...@@ -82,25 +84,39 @@ def main(argv=None):
'argo_nodeos_config')) 'argo_nodeos_config'))
parser.add_argument( parser.add_argument(
'--perf', '--perf',
help="Path to the linux perf tool to use. This path can be" help="Path to the linux perf tool to use. This path can be "
"relative and makes uses of the $PATH if necessary." "relative and makes uses of the $PATH if necessary. "
"Override default with the PERF environment" "Override default with the PERF environment "
"variable.", "variable.",
default=os.environ.get('PERF', default=os.environ.get('PERF',
'perf')) 'perf'))
parser.add_argument( parser.add_argument(
'--pmpi_lib', '--pmpi_lib',
help="Path to the libnrm PMPI library used for the power policy" help="Path to the libnrm PMPI library used for the power policy. "
"Override default with the PMPI environment variable.", "Override default with the PMPI environment variable.",
default=os.environ.get('PMPI', defaults['pmpi_lib'])) default=os.environ.get('PMPI', defaults['pmpi_lib']))
parser.add_argument( parser.add_argument(
'--argo_perf_wrapper', '--argo_perf_wrapper',
help="Path to the linux perf tool to use. This path can" help="Path to the linux perf tool to use. This path can "
"be relative and makes uses of the $PATH if " "be relative and makes uses of the $PATH if "
"necessary. Override default with the PERFWRAPPER " "necessary. Override default with the PERFWRAPPER "
"environment variable.", "environment variable.",
default=os.environ.get('ARGO_PERF_WRAPPER', default=os.environ.get('ARGO_PERF_WRAPPER',
'nrm-perfwrapper')) 'nrm-perfwrapper'))
parser.add_argument(
'--singularity',
help="Path to the singularity command. "
"Override default with the SINGULARITY environment variable.",
default=os.environ.get('SINGULARITY', defaults['singularity']))
parser.add_argument(
'--container-runtime',
help="Choice of container runtime. "
"Override default with the ARGO_CONTAINER_RUNTIME "
"environment variable.",
choices=['nodeos', 'singularity'],
default=os.environ.get('ARGO_CONTAINER_RUNTIME',
defaults['container_runtime']))
args = parser.parse_args(remaining_argv) args = parser.parse_args(remaining_argv)
nrm.daemon.runner(config=args) nrm.daemon.runner(config=args)
return(0) return(0)
......
...@@ -321,6 +321,29 @@ class App(SpecField): ...@@ -321,6 +321,29 @@ class App(SpecField):
return super(App, self).load(data) return super(App, self).load(data)
class Image(SpecField):
"""Information on the container image to use."""
fields = {"path": spec(unicode, True),
"type": spec(unicode, True),
}
def __init__(self):
"""Create an empty image."""
pass
def load(self, data):
"""Load from json dict."""
ret = super(Image, self).load(data)
if not ret:
return ret
if self.type not in ['sif', 'docker']:
logger.error("Image type not recognized")
return False
return True
class ImageManifest(SpecField): class ImageManifest(SpecField):
"""Represent an ACI Image Manifest.""" """Represent an ACI Image Manifest."""
...@@ -329,6 +352,7 @@ class ImageManifest(SpecField): ...@@ -329,6 +352,7 @@ class ImageManifest(SpecField):
"acVersion": spec(unicode, True), "acVersion": spec(unicode, True),
"name": spec(unicode, True), "name": spec(unicode, True),
"app": spec(App, True), "app": spec(App, True),
"image": spec(Image, False),
} }
def __init__(self): def __init__(self):
......
...@@ -13,7 +13,7 @@ from __future__ import print_function ...@@ -13,7 +13,7 @@ from __future__ import print_function
from aci import ImageManifest from aci import ImageManifest
from collections import namedtuple from collections import namedtuple
import logging import logging
from subprograms import ChrtClient, NodeOSClient, resources from subprograms import ChrtClient, NodeOSClient, resources, SingularityClient
import operator import operator
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
...@@ -30,7 +30,8 @@ class ContainerManager(object): ...@@ -30,7 +30,8 @@ class ContainerManager(object):
def __init__(self, container_runtime, rm, def __init__(self, container_runtime, rm,
perfwrapper="nrm-perfwrapper", perfwrapper="nrm-perfwrapper",
linuxperf="perf", linuxperf="perf",
pmpi_lib="/usr/lib/libnrm-pmpi.so"): pmpi_lib="/usr/lib/libnrm-pmpi.so",
downstream_event_uri="ipc:///tmp/nrm-downstream-event"):
self.linuxperf = linuxperf self.linuxperf = linuxperf
self.perfwrapper = perfwrapper self.perfwrapper = perfwrapper
self.runtime = container_runtime self.runtime = container_runtime
...@@ -40,6 +41,7 @@ class ContainerManager(object): ...@@ -40,6 +41,7 @@ class ContainerManager(object):
self.hwloc = rm.hwloc self.hwloc = rm.hwloc
self.chrt = ChrtClient() self.chrt = ChrtClient()
self.pmpi_lib = pmpi_lib self.pmpi_lib = pmpi_lib
self.downstream_event_uri = downstream_event_uri
def _get_container_tuple(self, container_name, manifest): def _get_container_tuple(self, container_name, manifest):
"""Retrieve a container tuple if the container exists, otherwise use """Retrieve a container tuple if the container exists, otherwise use
...@@ -110,7 +112,7 @@ class ContainerManager(object): ...@@ -110,7 +112,7 @@ class ContainerManager(object):
manifest) manifest)
if creation_needed: if creation_needed:
logger.info("Creating container %s", container_name) logger.info("Creating container %s", container_name)
self.runtime.create(container) self.runtime.create(container, self.downstream_event_uri)
self.containers[container_name] = container self.containers[container_name] = container
# build context to execute # build context to execute
...@@ -133,6 +135,11 @@ class ContainerManager(object): ...@@ -133,6 +135,11 @@ class ContainerManager(object):
environ['ARGO_NRM_RATELIMIT'] = \ environ['ARGO_NRM_RATELIMIT'] = \
manifest.app.isolators.monitoring.ratelimit manifest.app.isolators.monitoring.ratelimit
if container.power.get('policy') or \
manifest.is_feature_enabled('monitoring'):
environ['ARGO_NRM_DOWNSTREAM_EVENT_URI'] = \
self.downstream_event_uri
# build prefix to the entire command based on enabled features # build prefix to the entire command based on enabled features
argv = [] argv = []
if manifest.is_feature_enabled('scheduler'): if manifest.is_feature_enabled('scheduler'):
...@@ -210,7 +217,7 @@ class ContainerRuntime(object): ...@@ -210,7 +217,7 @@ class ContainerRuntime(object):
def __init__(self): def __init__(self):
pass pass
def create(self, container): def create(self, container, downstream_uri):
"""Create the container defined by the container namedtuple on the """Create the container defined by the container namedtuple on the
system.""" system."""
raise NotImplementedError raise NotImplementedError
...@@ -237,7 +244,7 @@ class NodeOSRuntime(ContainerRuntime): ...@@ -237,7 +244,7 @@ class NodeOSRuntime(ContainerRuntime):
path/command.""" path/command."""
self.client = NodeOSClient(argo_nodeos_config=path) self.client = NodeOSClient(argo_nodeos_config=path)
def create(self, container): def create(self, container, downstream_uri):
"""Uses the container resource allocation to create a container.""" """Uses the container resource allocation to create a container."""
self.client.create(container.uuid, container.resources) self.client.create(container.uuid, container.resources)
...@@ -250,6 +257,31 @@ class NodeOSRuntime(ContainerRuntime): ...@@ -250,6 +257,31 @@ class NodeOSRuntime(ContainerRuntime):
self.client.delete(container_uuid, kill) self.client.delete(container_uuid, kill)
class SingularityUserRuntime(ContainerRuntime):
"""Implements the container runtime interface using the singularity
subprogram."""
def __init__(self, path="singularity"):
"""Creates the client for singularity, with an optional custom
path/command."""
self.client = SingularityClient(singularity_path=path)
def create(self, container, downstream_uri):
"""Uses the container resource allocation to create a container."""
imageinfo = container.manifest.image
self.client.instance_start(container.uuid, imageinfo.path,
[downstream_uri])
def execute(self, container_uuid, args, environ):
"""Launches a command in the container."""
return self.client.execute(container_uuid, args, environ)
def delete(self, container_uuid, kill=False):
"""Delete the container."""
self.client.instance_stop(container_uuid, kill)
class DummyRuntime(ContainerRuntime): class DummyRuntime(ContainerRuntime):
"""Implements a dummy runtime that doesn't create any container, but still """Implements a dummy runtime that doesn't create any container, but still
...@@ -258,7 +290,7 @@ class DummyRuntime(ContainerRuntime): ...@@ -258,7 +290,7 @@ class DummyRuntime(ContainerRuntime):
def __init__(self): def __init__(self):
pass pass
def create(self, container): def create(self, container, downstream_uri):
pass pass
def execute(self, container_uuid, args, environ): def execute(self, container_uuid, args, environ):
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
from __future__ import print_function from __future__ import print_function
from applications import ApplicationManager from applications import ApplicationManager
from containers import ContainerManager, NodeOSRuntime from containers import ContainerManager, NodeOSRuntime, SingularityUserRuntime
from controller import Controller, PowerActuator from controller import Controller, PowerActuator
from powerpolicy import PowerPolicyManager from powerpolicy import PowerPolicyManager
from functools import partial from functools import partial
...@@ -307,15 +307,22 @@ class Daemon(object): ...@@ -307,15 +307,22 @@ class Daemon(object):
# create managers # create managers
self.resource_manager = ResourceManager(hwloc=self.config.hwloc) self.resource_manager = ResourceManager(hwloc=self.config.hwloc)
container_runtime = \ container_runtime = None
NodeOSRuntime(self.config.argo_nodeos_config) if self.config.container_runtime == 'nodeos':
container_runtime = \
NodeOSRuntime(path=self.config.argo_nodeos_config)
elif self.config.container_runtime == 'singularity':
container_runtime = \
SingularityUserRuntime(self.config.singularity)
assert(container_runtime is not None)
self.container_manager = ContainerManager( self.container_manager = ContainerManager(
container_runtime, container_runtime,
self.resource_manager, self.resource_manager,
perfwrapper=self.config.argo_perf_wrapper, perfwrapper=self.config.argo_perf_wrapper,
linuxperf=self.config.perf, linuxperf=self.config.perf,
pmpi_lib=self.config.pmpi_lib, pmpi_lib=self.config.pmpi_lib,
) downstream_event_uri=downstream_event_param,
)
self.application_manager = ApplicationManager() self.application_manager = ApplicationManager()
self.sensor_manager = SensorManager() self.sensor_manager = SensorManager()
pa = PowerActuator(self.sensor_manager) pa = PowerActuator(self.sensor_manager)
......
...@@ -141,6 +141,57 @@ class NodeOSClient(object): ...@@ -141,6 +141,57 @@ class NodeOSClient(object):
cwd=environ['PWD']) cwd=environ['PWD'])
class SingularityClient(object):
"""Client to singularity."""
def __init__(self, singularity_path="singularity"):
"""Load client configuration."""
self.prefix = singularity_path
def instance_start(self, instance_name, container_image, bind_list=[]):
"""Start a named instance of a container image.
Note that singularity will also start the startscript if
defined in the container image, which might be an issue."""
args = [self.prefix]
args.extend(['instance', 'start'])
if bind_list:
args.extend(['--bind', ','.join(bind_list)])
args.extend([container_image, instance_name])
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
def execute(self, instance_name, argv, environ):
"""Execute argv inside container.
singularity exec instance://instance_name <command>"""
args = [self.prefix] # singularity
container_name = "instance://" + instance_name
args.extend(['exec', container_name])
args.extend(argv)
return process.Subprocess(args, env=environ,
stdout=process.Subprocess.STREAM,
stderr=process.Subprocess.STREAM,
close_fds=True,
cwd=environ['PWD'])
def instance_stop(self, instance_name, kill=False):
"""Stop an instance and kill everything in it."""
args = [self.prefix]
args.extend(['instance', 'stop'])
if kill:
args.append("--force")
args.append(instance_name)
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
logpopen(p, args, stdout, stderr)
class ChrtClient(object): class ChrtClient(object):
"""Client to chrt command line wrapper.""" """Client to chrt command line wrapper."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment