Commit 57980d37 authored by Valentin Reis's avatar Valentin Reis

[feature] moves the manifest management to json schema.

parent c7771b4a
Pipeline #7443 passed with stages
in 9 minutes and 14 seconds
......@@ -17,14 +17,6 @@ include:
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml
py.test:
stage: test
script:
- pipenv install --dev
- pipenv run py.test --deselect=test/test_messaging.py
tags:
- rapl
flake8:
stage: style
script:
......
......@@ -147,7 +147,6 @@ class CommandLineInterface(object):
print(msg.payload, file=sys.stderr)
sys.stderr.flush()
elif msg.tag == 'exit':
logger.info("process ended: %r", msg)
state = 'exiting'
exitmsg = msg
else:
......@@ -155,7 +154,7 @@ class CommandLineInterface(object):
if outeof and erreof and state == 'exiting':
state = 'exit'
istatus = int(exitmsg.status)
logger.info("command ended: %r", exitmsg)
logger.debug("command ended with istatus %r.", exitmsg)
if os.WIFSIGNALED(istatus):
logger.error("command ended due to signal %s" %
str(os.WTERMSIG(istatus)))
......@@ -163,7 +162,7 @@ class CommandLineInterface(object):
elif os.WIFEXITED(istatus):
s = int(os.WTERMSIG(istatus))
if s > 0:
logger.debug("command ended with exit code %s" %
logger.error("command ended with exit code %s" %
str(s))
sys.exit(s)
else:
......
{
"acKind": "ImageManifest",
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
{
"name": "argo/scheduler",
"value": {
"policy": "SCHED_OTHER",
"priority": "0"
}
},
{
"name": "argo/container",
"value": {
"cpus": "24",
"mems": "2"
}
},
{
"name": "argo/perfwrapper",
"value": {
"enabled": "1"
}
},
{
"name": "argo/power",
"value": {
"enabled": "1",
"profile": "1",
"policy": "NONE",
"damper": "1e9",
"slowdown": "1.1"
}
},
{
"name": "argo/monitoring",
"value": {
"enabled": "1",
"ratelimit": "10000000"
}
},
{
"name": "argo/hwbind",
"value": {
"enabled": "1"
}
}
]
}
}
{
"acKind": "ImageManifest",
"acVersion": "0.6.0",
"name": "test",
"app": {
"isolators": [
{
"name": "argo/scheduler",
"value": {
"policy": "SCHED_OTHER",
"priority": "0"
}
},
{
"name": "argo/container",
"value": {
"cpus": "1",
"mems": "1"
}
},
{
"name": "argo/perfwrapper",
"value": {
"enabled": "0"
}
},
{
"name": "argo/power",
"value": {
"enabled": "1",
"profile": "1",
"policy": "NONE",
"damper": "1e9",
"slowdown": "1.1"
}
},
{
"name": "argo/monitoring",
"value": {
"enabled": "1",
"ratelimit": "10000000"
}
},
{
"name": "argo/hwbind",
"value": {
"enabled": "0"
}
}
]
},
"image": {
"path": "docker://ubuntu",
"type": "docker"
}
}
{
"name": "basic",
"version": "0.0.1",
"app": {
"container": {
"cpus": 2,
"mems": 1
}
}
}
{
"name": "basic",
"version": "0.0.1",
"app": {
"container": {
"cpus": 2,
"mems": 1
}
},
"image": {
"path": "docker://ubuntu",
"type": "docker",
"binds": [
"/nix"
]
}
}
......@@ -9,380 +9,15 @@
###############################################################################
"""Parse and Represent the APPC ACI specification."""
import collections
import logging
import json
from schema import loadschema
logger = logging.getLogger('nrm')
spec = collections.namedtuple('Field', ['cls', 'required'])
class SpecField(object):
def has(self, f):
return(f in self.app.keys())
"""Object part of the ACI Image Manifest fields."""
fields = {}
def __init__(self):
"""Create empty field."""
pass
def load(self, data):
"""Load fields."""
for key in self.fields:
spec = self.fields[key]
if key not in data:
if spec.required:
logger.error("Missing key from manifest: %s", key)
return False
else:
ok, v = self.loadfield(data[key], spec.cls)
if not ok:
logger.error("Error for key %s in %s", key, self.__class__)
return False
setattr(self, key, v)
return True
def loadfield(self, data, cls):
"""load data as if from a field of the provided cls.
Make sure the basic types are also respected.
"""
ret = cls()
if not hasattr(ret, 'load'):
if not isinstance(data, cls):
logger.error("Wrong data type %s, expected: %s", cls,
data.__class__)
return (False, None)
else:
return (True, data)
else:
return (ret.load(data), ret)
class Scheduler(SpecField):
"""Scheduler information for a container."""
classes = ['SCHED_FIFO', 'SCHED_HPC', 'SCHED_OTHER']
fields = {"policy": spec(unicode, True),
"priority": spec(unicode, False),
"enabled": spec(unicode, False),
}
def __init__(self):
"""Create scheduler object."""
pass
def load(self, data):
"""Load configuration from json text."""
ret = super(Scheduler, self).load(data)
if not ret:
return ret
# check scheduler class & prio
if self.policy not in self.classes:
logger.error("Wrong scheduling class %s, not any of %r", data,
Scheduler.classes)
return False
if self.policy != "SCHED_OTHER":
logger.warning("scheduler priority forced as 0 " +
"for non default policies")
self.priority = "0"
if getattr(self, "enabled", "1") not in ["0", "False", "1", "True"]:
logger.error("Invalid value for scheduler enabled: %s",
self.enabled)
return False
return True
class CPUSet(SpecField):
"""Represent a CPUSet field."""
def __init__(self):
"""Create an empty set."""
pass
def load(self, data):
"""Load from json object."""
self.value = data
return True
class MemSet(SpecField):
"""Represent a MemSet field."""
def __init__(self):
"""Create an empty set."""
pass
def load(self, data):
"""Load from json object."""
self.value = data
return True
class Container(SpecField):
"""Container Information."""
fields = {"cpus": spec(CPUSet, True),
"mems": spec(MemSet, True)
}
def __init__(self):
"""Create empty container."""
pass
def load(self, data):
"""Load container information."""
return super(Container, self).load(data)
class PerfWrapper(SpecField):
"""Information on whether to use perf for a container."""
fields = {"enabled": spec(unicode, False)
}
def __init__(self):
"""Create empty perf wrapper."""
pass
def load(self, data):
"""Load perf wrapper information."""
ret = super(PerfWrapper, self).load(data)
if not ret:
return ret
if getattr(self, "enabled", "1") not in ["0", "False", "1", "True"]:
logger.error("Invalid value for perfwrapper enabled: %s",
self.enabled)
return False
return True
class Power(SpecField):
"""Power settings for a container."""
policies = ['NONE', 'DDCM', 'DVFS', 'COMBINED']
fields = {"enabled": spec(unicode, False),
"profile": spec(unicode, False),
"policy": spec(unicode, False),
"damper": spec(unicode, False),
"slowdown": spec(unicode, False)
}
def __init__(self):
"""Create empty power settings object."""
pass
def load(self, data):
"""Load power settings."""
ret = super(Power, self).load(data)
if not ret:
return ret
if self.enabled not in ["0", "False", "1", "True"]:
logger.error("Invalid value for power enabled: %s",
self.enabled)
return False
if self.profile not in ["0", "False", "1", "True"]:
logger.error("Invalid value for power profile: %s",
self.enabled)
return False
if self.policy not in self.policies:
logger.error("Invalid value for power policy: %s",
self.policy)
return False
if self.damper < 0.0:
logger.error("Invalid value for power policy damper: %s",
self.policy)
return False
if self.slowdown < 1.0:
logger.error("Invalid value for power policy slowdown: %s",
self.policy)
return False
if self.damper < 0.0:
logger.error("Invalid value of powerpolicy damper: %s",
self.policy)
return False
if self.slowdown < 1.0:
logger.error("Invalid value of powerpolicy slowdown: %s",
self.policy)
return False
return True
class HwBind(SpecField):
"""Hardware bindings for a container."""
fields = {"enabled": spec(unicode, False),
}
def __init__(self):
"""Create empty hardware bindings settings object."""
pass
def load(self, data):
"""Load hardware bindings settings."""
ret = super(HwBind, self).load(data)
if not ret:
return ret
if self.enabled not in ["0", "False", "1", "True"]:
logger.error("Invalid value for hardware bindings enabled: %s",
self.enabled)
return False
return True
class Monitoring(SpecField):
"""Monitoring options (libnrm)."""
fields = {"enabled": spec(unicode, False),
"ratelimit": spec(unicode, False),
}
def __init__(self):
"""Create empty monitoring option object."""
pass
def load(self, data):
"""Load monitoring options."""
ret = super(Monitoring, self).load(data)
if not ret:
return ret
if self.enabled not in ["0", "False", "1", "True"]:
logger.error("Invalid value for monitoring options enabled: %s",
self.enabled)
return False
if self.ratelimit < 0:
logger.error("Invalid value for monitoring ratelimit: %s",
self.ratelimit)
return False
return True
class IsolatorList(SpecField):
"""Represent the list of isolator in a Manifest."""
types = {"argo/scheduler": spec(Scheduler, False),
"argo/container": spec(Container, True),
"argo/perfwrapper": spec(PerfWrapper, False),
"argo/power": spec(Power, False),
"argo/hwbind": spec(HwBind, False),
"argo/monitoring": spec(Monitoring, False),
}
def __init__(self):
"""Create empty list."""
pass
def load(self, data):
"""Load from json struct."""
for e in data:
name = e['name']
if name in self.types:
t = self.types[name]
ok, v = super(IsolatorList, self).loadfield(e['value'], t.cls)
if not ok:
logger.error("Error with %s in %s", name, self.__class__)
return False
setattr(self, name.lstrip("argo/"), v)
for k in self.types:
if self.types[k].required:
if not hasattr(self, k.lstrip("argo/")):
logger.error("Missing mandatory isolator: %s", k)
return False
return True
class App(SpecField):
"""Represent the App part of an Image Manifest."""
# attribute, subclass, required
fields = {"environment": spec(list, False),
"isolators": spec(IsolatorList, True),
}
def __init__(self):
"""Create empty container."""
pass
def load(self, data):
"""Load from json dict."""
return super(App, self).load(data)
class Image(SpecField):
"""Information on the container image to use."""
fields = {"path": spec(unicode, True),
"type": spec(unicode, True),
}
def __init__(self):
"""Create an empty image."""
pass
def load(self, data):
"""Load from json dict."""
ret = super(Image, self).load(data)
if not ret:
return ret
if self.type not in ['sif', 'docker']:
logger.error("Image type not recognized")
return False
return True
class ImageManifest(SpecField):
"""Represent an ACI Image Manifest."""
fields = {"acKind": spec(unicode, True),
"acVersion": spec(unicode, True),
"name": spec(unicode, True),
"app": spec(App, True),
"image": spec(Image, False),
}
def __init__(self):
"""Create empty manifest."""
pass
def load(self, filename):
"""Load a manifest from JSON file."""
with open(filename, 'r') as f:
data = json.load(f)
return super(ImageManifest, self).load(data)
def load_dict(self, data):
"""Load a manifest in dictionary form."""
return super(ImageManifest, self).load(data)
def is_feature_enabled(self, feature, true_values=["1", "True"]):
"""Check if a specific feature is enabled.
Since the enabled field itself is optional, we return true if an
isolator is present in a manifest or the enabled field is not true."""
typename = "argo/{}".format(feature)
assert typename in IsolatorList.types, \
"{} in not a valid feature".format(feature)
logger.debug(repr(self))
if hasattr(self.app.isolators, feature):
isolator = getattr(self.app.isolators, feature)
if hasattr(isolator, 'enabled'):
if isolator.enabled not in true_values:
return False
return True
else:
return False
ImageManifest = loadschema("manifest")
setattr(ImageManifest, "is_feature_enabled", has)
......@@ -11,6 +11,7 @@
from __future__ import print_function
from aci import ImageManifest
from json import load
from collections import namedtuple
import logging
from subprograms import ChrtClient, NodeOSClient, resources, SingularityClient
......@@ -54,8 +55,8 @@ class ContainerManager(object):
return (False, self.containers[container_name])
# ask the resource manager for resources
ncpus = int(manifest.app.isolators.container.cpus.value)
nmems = int(manifest.app.isolators.container.mems.value)
ncpus = manifest.app['container']['cpus']
nmems = manifest.app['container']['mems']
req = resources(ncpus, nmems)
allocated = self.resourcemanager.schedule(container_name, req)
logger.info("create: allocation: %r", allocated)
......@@ -69,15 +70,15 @@ class ContainerManager(object):
container_power['manager'] = None
if manifest.is_feature_enabled('power'):
pp = manifest.app.isolators.power
if pp.profile in ["1", "True"]:
pp = manifest.app['power']
if pp['profile'] is True:
container_power['profile'] = dict()
container_power['profile']['start'] = dict()
container_power['profile']['end'] = dict()
if pp.policy != "NONE":
container_power['policy'] = pp.policy
container_power['damper'] = pp.damper
container_power['slowdown'] = pp.slowdown
if pp['policy'] != "NONE":
container_power['policy'] = pp['policy']
container_power['damper'] = pp['damper']
container_power['slowdown'] = pp['slowdown']
# Compute hardware bindings
hwbindings = dict()
......@@ -103,10 +104,12 @@ class ContainerManager(object):
logger.info("create: args: %r", args)
logger.info("create: container name: %s", container_name)
manifest = ImageManifest()
if not manifest.load(manifestfile):
logger.error("Manifest is invalid")
return None
try:
with open(manifestfile) as f:
manifest = ImageManifest((load(f)))
except Exception as e:
logger.error("error occured in manifest loading:")
raise(e)
creation_needed, container = self._get_container_tuple(container_name,
manifest)
......@@ -120,7 +123,7 @@ class ContainerManager(object):
# "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
environ['ARGO_CONTAINER_UUID'] = container_name
environ['PERF'] = self.linuxperf
environ['AC_APP_NAME'] = manifest.name
environ['AC_APP_NAME'] = manifest['name']
environ['AC_METADATA_URL'] = "localhost"
# power profiling uses LD_PRELOAD, we use get to ensure that it
......@@ -133,7 +136,7 @@ class ContainerManager(object):
# monitoring section involves libnrm
if manifest.is_feature_enabled('monitoring'):
environ['ARGO_NRM_RATELIMIT'] = \
manifest.app.isolators.monitoring.ratelimit
manifest.app['monitoring']['ratelimit']
if container.power.get('policy') or \
manifest.is_feature_enabled('monitoring'):
......@@ -143,7 +146,7 @@ class ContainerManager(object):
# build prefix to the entire command based on enabled features
argv = []
if manifest.is_feature_enabled('scheduler'):
sched = manifest.app.isolators.scheduler
sched = manifest.app['scheduler']
argv = self.chrt.getwrappedcmd(sched)
# Use hwloc-bind to launch each process in the conatiner by prepending
......@@ -270,8 +273,8 @@ class SingularityUserRuntime(ContainerRuntime):
def create(self, container, downstream_uri):
"""Uses the container resource allocation to create a container."""
imageinfo = container.manifest.image
self.client.instance_start(container.uuid, imageinfo.path,
[downstream_uri])
self.client.instance_start(container.uuid, imageinfo['path'],
[downstream_uri]+imageinfo['binds'])
def execute(self, container_uuid, args, environ):
"""Launches a command in the container."""
......
......@@ -12,32 +12,20 @@ import json
import logging
import uuid
import zmq
import os
import zmq.utils
import zmq.utils.monitor
from zmq.eventloop import zmqstream
import warlock
from jsonschema import Draft4Validator
from schema import loadschema
_logger = logging.getLogger('nrm')
def _loadschema(api):
sourcedir = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(sourcedir, "schemas", api+".json")) as f:
s = json.load(f)
Draft4Validator.check_schema(s)
return(warlock.model_factory(s))
_UpstreamRep = _loadschema('upstreamRep')
_UpstreamPub = _loadschema('upstreamPub')
_UpstreamRep = loadschema('upstreamRep')
_UpstreamPub = loadschema('upstreamPub')
def send(apiname):
def wrap(cls):
model = _loadschema(apiname)
model = loadschema(apiname)
def send(self, *args, **kwargs):
self.socket.send(
......@@ -50,7 +38,7 @@ def send(apiname):
def recv_callback(apiname):
def wrap(cls):
model = _loadschema(apiname)
model = loadschema(apiname)
def recv(self):
"""Receives a response to a message."""
......@@ -63,7 +51,6 @@ def recv_callback(apiname):
callback."""
_logger.info("receiving message: %r", frames)
assert len(frames) == 2
print(frames)
msg = model(json.loads(frames[1]))
assert self.callback
self.callback(msg, str(frames[0]))
......
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
import warlock
import json
import os
from jsonschema import Draft4Validator
def loadschema(api):
sourcedir = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(sourcedir, "schemas", api+".json")) as f:
s = json.load(f)
Draft4Validator.check_schema(s)
return(warlock.model_factory(s))
{
"type": "object",
"required": [
"name",
"version",
"app"