Commit cd1d86b3 authored by Valentin Reis's avatar Valentin Reis

[feature] moves the message formats to json schema.

Adds the nrm/schemas repository which defines the communication schemas
for the upstream and downstream APIs. The messaging.py file now uses
decorators and two added python dependencies (jsonschema and warlock).
This commits also adds the .envrc direnv configuration file for
nix-based development.
parent cb3132e2
Pipeline #7313 passed with stages
in 5 minutes and 25 seconds
eval "$(lorri direnv)"
--- ---
variables: variables:
ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz" ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/nrm-gen/argopkgs-nrm-gen.tar.gz"
EXTRA: "--nrm ./." EXTRA: "--nrm ./."
stages: stages:
...@@ -11,11 +11,11 @@ stages: ...@@ -11,11 +11,11 @@ stages:
- quality - quality
include: include:
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/argonix.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/argonix.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/components.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/components.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/integration.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/integration.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/repoquality.yml
py.test: py.test:
stage: test stage: test
...@@ -28,6 +28,6 @@ py.test: ...@@ -28,6 +28,6 @@ py.test:
flake8: flake8:
stage: style stage: style
script: script:
- nix run -f "$ARGOPKGS" pythonPackages.flake8 --command flake8 nrm/* bin/* - nix run -f "$ARGOPKGS" pythonPackages.flake8 --command flake8 nrm/*.py bin/*
tags: tags:
- integration - integration
include Makefile include Makefile
include tox.ini include nrm/schemas/*.json
...@@ -9,6 +9,8 @@ six = "==1.11.0" ...@@ -9,6 +9,8 @@ six = "==1.11.0"
pyzmq = "*" pyzmq = "*"
tornado = "*" tornado = "*"
numpy = "*" numpy = "*"
warlock = "*"
scipy = "*"
argparse = "*" argparse = "*"
[dev-packages] [dev-packages]
......
This diff is collapsed.
...@@ -19,11 +19,8 @@ import nrm.messaging ...@@ -19,11 +19,8 @@ import nrm.messaging
import uuid import uuid
import sys import sys
import time import time
import collections
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
KillArgs = collections.namedtuple("Kill", ["uuid"])
class CommandLineInterface(object): class CommandLineInterface(object):
...@@ -37,12 +34,9 @@ class CommandLineInterface(object): ...@@ -37,12 +34,9 @@ class CommandLineInterface(object):
if uuid: if uuid:
logger.info("received signal %d, killing the application..", logger.info("received signal %d, killing the application..",
signum) signum)
command = {'api': 'up_rpc_req', self.client.send(
'type': 'kill', "Kill",
'container_uuid': uuid container_uuid=uuid)
}
msg = RPC_MSG['kill'](**command)
self.client.sendmsg(msg)
logger.info("killed the application, exiting.") logger.info("killed the application, exiting.")
else: else:
logger.info("received signal %d, exiting", signum) logger.info("received signal %d, exiting", signum)
...@@ -70,33 +64,26 @@ class CommandLineInterface(object): ...@@ -70,33 +64,26 @@ class CommandLineInterface(object):
self.pub_client.connect() self.pub_client.connect()
while(True): while(True):
msg = self.pub_client.recvmsg() msg = self.pub_client.recv()
logger.debug("pub message: %s", msg) logger.debug("pub message: %s", msg)
def print_if_filter(): def print_if_filter():
if argv.filter: if argv.filter:
if argv.filter == msg.type: if argv.filter == msg.tag:
if (msg.type == "performance" or if (msg.tag == "performance" or
msg.type == "progress"): msg.tag == "progress"):
print("%s, %s, %s" % (msg.type, time.time(), print("%s, %s, %s" % (msg.tag, time.time(),
msg.payload)) msg.payload))
if msg.type == "power": if msg.tag == "power":
print("%s, %s, %s" % (msg.type, time.time(), print("%s, %s, %s" % (msg.tag, time.time(),
msg.total)) msg.total))
if msg.type == "container_exit": if msg.tag == "exit":
print("%s, %s, %s" % (msg.type, time.time(), print("%s, %s, %s" % (msg.tag, time.time(),
msg.profile_data)) msg.profile_data))
else: else:
print("%s, %s" % (msg.type, time.time())) print("%s, %s" % (msg.tag, time.time()))
sys.stdout.flush() sys.stdout.flush()
print_if_filter() print_if_filter()
# if argv.uuid:
# uuid = getattr(msg, 'container_uuid', None)
# if argv.uuid == uuid or msg.type == "power":
# print_if_filter()
# else:
# print_if_filter()
def do_run(self, argv): def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command """ Connect to the NRM and ask to spawn a container and run a command
...@@ -108,6 +95,7 @@ class CommandLineInterface(object): ...@@ -108,6 +95,7 @@ class CommandLineInterface(object):
# the command a container uuid as a way to make sure that we can make # the command a container uuid as a way to make sure that we can make
# the command idempotent. # the command idempotent.
environ = os.environ environ = os.environ
# environ = []
container_uuid = argv.ucontainername or str(uuid.uuid4()) container_uuid = argv.ucontainername or str(uuid.uuid4())
# simple check + error msg + non-zero return code # simple check + error msg + non-zero return code
...@@ -118,26 +106,22 @@ class CommandLineInterface(object): ...@@ -118,26 +106,22 @@ class CommandLineInterface(object):
logger.error("Manifest file not found: %s", path) logger.error("Manifest file not found: %s", path)
sys.exit(1) sys.exit(1)
command = {'api': 'up_rpc_req',
'type': 'run',
'manifest': sanitize_manifest(argv.manifest),
'path': argv.command,
'args': argv.args,
'environ': dict(environ),
'container_uuid': container_uuid,
}
msg = RPC_MSG['run'](**command)
# command fsm # command fsm
state = 'init' state = 'init'
outeof = False outeof = False
erreof = False erreof = False
exitmsg = None exitmsg = None
self.client.sendmsg(msg) self.client.send(
tag="run",
manifest=sanitize_manifest(argv.manifest),
path=argv.command,
args=argv.args,
environ=dict(environ),
container_uuid=container_uuid)
# the first message tells us if we started a container or not # the first message tells us if we started a container or not
msg = self.client.recvmsg() msg = self.client.recv()
assert msg.api == 'up_rpc_rep' assert msg.tag == 'start'
assert msg.type == 'process_start'
def handler(signum, frame): def handler(signum, frame):
self.do_signal(msg.container_uuid, signum, frame) self.do_signal(msg.container_uuid, signum, frame)
...@@ -145,25 +129,24 @@ class CommandLineInterface(object): ...@@ -145,25 +129,24 @@ class CommandLineInterface(object):
state = 'started' state = 'started'
while(True): while(True):
msg = self.client.recvmsg() msg = self.client.recv()
assert msg.api == 'up_rpc_rep' assert msg.tag in ['stdout', 'stderr', 'exit']
assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
if msg.type == 'stdout': if msg.tag == 'stdout':
logger.debug("container msg: %r", msg) logger.debug("container msg: %r", msg)
if msg.payload == 'eof': if msg.payload == 'eof':
outeof = True outeof = True
else: else:
print(msg.payload) print(msg.payload, file=sys.stdout)
sys.stdout.flush() sys.stdout.flush()
elif msg.type == 'stderr': elif msg.tag == 'stderr':
logger.debug("container msg: %r", msg) logger.debug("container msg: %r", msg)
if msg.payload == 'eof': if msg.payload == 'eof':
erreof = True erreof = True
else: else:
print(msg.payload, file=sys.stderr) print(msg.payload, file=sys.stderr)
sys.stdout.flush() sys.stderr.flush()
elif msg.type == 'process_exit': elif msg.tag == 'exit':
logger.info("process ended: %r", msg) logger.info("process ended: %r", msg)
state = 'exiting' state = 'exiting'
exitmsg = msg exitmsg = msg
...@@ -197,13 +180,9 @@ class CommandLineInterface(object): ...@@ -197,13 +180,9 @@ class CommandLineInterface(object):
The NRM should respond to us with one message listing all The NRM should respond to us with one message listing all
containers.""" containers."""
command = {'api': 'up_rpc_req', self.client.send(tag="list")
'type': 'list'} msg = self.client.recv()
msg = RPC_MSG['list'](**command) assert msg.tag == 'list'
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'list'
logger.info("list response: %r", msg) logger.info("list response: %r", msg)
def do_kill(self, argv): def do_kill(self, argv):
...@@ -212,15 +191,9 @@ class CommandLineInterface(object): ...@@ -212,15 +191,9 @@ class CommandLineInterface(object):
The NRM should respond to us with a message containing the exit status The NRM should respond to us with a message containing the exit status
of the top process of the container.""" of the top process of the container."""
command = {'api': 'up_rpc_req', self.client.send(tag="kill", container_uuid=argv.uuid)
'type': 'kill', msg = self.client.recv()
'container_uuid': argv.uuid assert msg.tag == 'exit'
}
msg = RPC_MSG['kill'](**command)
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'exit'
logger.info("container exit: %r", msg) logger.info("container exit: %r", msg)
def do_setpower(self, argv): def do_setpower(self, argv):
...@@ -233,15 +206,9 @@ class CommandLineInterface(object): ...@@ -233,15 +206,9 @@ class CommandLineInterface(object):
# timely answer. # timely answer.
# TODO: check that the level makes a little bit of sense in the first # TODO: check that the level makes a little bit of sense in the first
# place # place
command = {'api': 'up_rpc_req', self.client.send(tag="setPower", limit=str(argv.limit))
'type': 'setpower', msg = self.client.recv()
'limit': str(argv.limit), assert msg.tag == 'getPower'
}
msg = RPC_MSG['setpower'](**command)
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'getpower'
logger.info("command received by the daemon: %r", msg) logger.info("command received by the daemon: %r", msg)
def main(self): def main(self):
......
...@@ -20,7 +20,6 @@ import subprocess ...@@ -20,7 +20,6 @@ import subprocess
import uuid import uuid
from nrm import messaging from nrm import messaging
PUB_MSG = messaging.MSGTYPES['down_event']
logger = logging.getLogger('perf-wrapper') logger = logging.getLogger('perf-wrapper')
...@@ -33,26 +32,19 @@ class PerfWrapper(object): ...@@ -33,26 +32,19 @@ class PerfWrapper(object):
pass pass
def shutdown(self): def shutdown(self):
update = {'api': 'down_event', self.downstream_event.send(tag="exit", application_uuid=self.app_uuid)
'type': 'application_exit',
'application_uuid': self.app_uuid,
}
msg = PUB_MSG['application_exit'](**update)
self.downstream_event.sendmsg(msg)
def performance_report(self, performance): def performance_report(self, performance):
update = {'api': 'down_event', self.downstream_event.send(
'type': 'performance', tag="performance",
'payload': performance, payload=performance,
'container_uuid': self.container_uuid, container_uuid=self.container_uuid,
'application_uuid': self.app_uuid, application_uuid=self.app_uuid)
}
msg = PUB_MSG['performance'](**update)
self.downstream_event.sendmsg(msg)
def setup(self): def setup(self):
downstream_url = "ipc:///tmp/nrm-downstream-event" downstream_url = "ipc:///tmp/nrm-downstream-event"
self.downstream_event = messaging.DownstreamEventClient(downstream_url) self.downstream_event = messaging.DownstreamEventClient(downstream_url)
logger.info("connecting downstream pub")
self.downstream_event.connect() self.downstream_event.connect()
logger.info("downstream pub socket connected to: %s", downstream_url) logger.info("downstream pub socket connected to: %s", downstream_url)
...@@ -64,13 +56,10 @@ class PerfWrapper(object): ...@@ -64,13 +56,10 @@ class PerfWrapper(object):
self.app_uuid = str(uuid.uuid4()) self.app_uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.app_uuid) logger.info("client uuid: %r", self.app_uuid)
# send an hello to the demon # send an hello to the demon
update = {'api': 'down_event', self.downstream_event.send(
'type': 'application_start', tag="start",
'container_uuid': self.container_uuid, container_uuid=self.container_uuid,
'application_uuid': self.app_uuid, application_uuid=self.app_uuid)
}
msg = PUB_MSG['application_start'](**update)
self.downstream_event.sendmsg(msg)
def main(self): def main(self):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
......
...@@ -91,8 +91,8 @@ class ApplicationManager(object): ...@@ -91,8 +91,8 @@ class ApplicationManager(object):
def register(self, msg, container): def register(self, msg, container):
"""Register a new downstream application.""" """Register a new downstream application."""
uuid = msg.application_uuid uuid = msg['application_uuid']
container_uuid = msg.container_uuid container_uuid = msg['container_uuid']
progress = 0 progress = 0
threads = False threads = False
phase_contexts = dict() phase_contexts = dict()
......
...@@ -25,41 +25,41 @@ class Action(object): ...@@ -25,41 +25,41 @@ class Action(object):
self.delta = delta self.delta = delta
class ApplicationActuator(object): # class ApplicationActuator(object):
#
"""Actuator in charge of application thread control.""" # """Actuator in charge of application thread control."""
#
def __init__(self, am, pubstream): # def __init__(self, am, pubstream):
self.application_manager = am # self.application_manager = am
self.pubstream = pubstream # self.pubstream = pubstream
#
def available_actions(self, target): # def available_actions(self, target):
ret = [] # ret = []
for identity, application in \ # for identity, application in \
self.application_manager.applications.iteritems(): # self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests(): # if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target) # delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta)) # ret.append(Action(application, target, delta))
return ret # return ret
#
def execute(self, action): # def execute(self, action):
target_threads = action.target.threads # target_threads = action.target.threads
update = {'type': 'application', # update = {'type': 'application',
'command': 'threads', # 'command': 'threads',
'uuid': action.target.uuid, # 'uuid': action.target.uuid,
'event': 'threads', # 'event': 'threads',
} # }
if action.command == 'i': # if action.command == 'i':
payload = target_threads['cur'] + 1 # payload = target_threads['cur'] + 1
elif action.command == 'd': # elif action.command == 'd':
payload = target_threads['cur'] - 1 # payload = target_threads['cur'] - 1
else: # else:
assert False, "impossible command" # assert False, "impossible command"
update['payload'] = payload # update['payload'] = payload
self.pubstream.send_json(update) # self.pubstream.send_json()
#
def update(self, action): # def update(self, action):
action.target.do_thread_transition(action.command) # action.target.do_thread_transition(action.command)
class PowerActuator(object): class PowerActuator(object):
......
This diff is collapsed.
This diff is collapsed.
{
"oneOf": [
{
"required": [
"tag",
"container_uuid",
"application_uuid"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"start"
]
},
"container_uuid": {
"type": "string"
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"application_uuid"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"exit"
]
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"container_uuid",
"application_uuid",
"payload"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"performance"
]
},
"payload": {
"type": "number"
},
"container_uuid": {
"type": "string"
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"application_uuid",
"payload"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"progress"
]
},
"payload": {
"type": "number"
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"cpu",
"startcompute",
"endcompute",
"startbarrier",
"endbarrier"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"phasecontext"
]
},
"endcompute": {
"type": "number"
},
"endbarrier": {
"type": "number"
},
"startbarrier": {
"type": "number"
},
"startcompute": {
"type": "number"
},
"cpu": {
"type": "number"
}
}
}
]
}
{
"oneOf": [
{
"required": [
"tag",
"total",
"limit"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"power"
]
},
"total": {
"type": "number"
},
"limit": {
"type": "number"
}
}
},
{
"required": [
"tag",
"container_uuid",
"errno",
"power"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"start"
]
},
"errno": {
"type": "number"
},
"power": {
"type": "string"
},
"container_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"container_uuid",
"profile_data"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"exit"
]
},
"profile_data": {
"additionalProperties": {
"type": "string"
},
"type": "object"
},
"container_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"container_uuid",
"payload"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"performance"
]
},
"payload": {
"type": "number"
},
"container_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"application_uuid",
"payload"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"progress"
]
},
"payload": {
"type": "number"
},
"application_uuid": {
"type": "string"