Commit 9b2d4452 authored by Valentin Reis's avatar Valentin Reis

Merge branch 'nrm-gen' into 'master'

Use json schemas for message formats

See merge request !90
parents cb3132e2 cd1d86b3
Pipeline #7316 passed with stages
in 7 minutes and 1 second
eval "$(lorri direnv)"
--- ---
variables: variables:
ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/master/argopkgs-master.tar.gz" ARGOPKGS: "https://xgitlab.cels.anl.gov/argo/argopkgs/-/archive/nrm-gen/argopkgs-nrm-gen.tar.gz"
EXTRA: "--nrm ./." EXTRA: "--nrm ./."
stages: stages:
...@@ -11,11 +11,11 @@ stages: ...@@ -11,11 +11,11 @@ stages:
- quality - quality
include: include:
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/argonix.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/argonix.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/components.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/components.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/integration.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/integration.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/applications.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/applications.yml
- https://xgitlab.cels.anl.gov/argo/argopkgs/raw/master/gitlab-ci/repoquality.yml - https://xgitlab.cels.anl.gov/argo/argopkgs/raw/nrm-gen/gitlab-ci/repoquality.yml
py.test: py.test:
stage: test stage: test
...@@ -28,6 +28,6 @@ py.test: ...@@ -28,6 +28,6 @@ py.test:
flake8: flake8:
stage: style stage: style
script: script:
- nix run -f "$ARGOPKGS" pythonPackages.flake8 --command flake8 nrm/* bin/* - nix run -f "$ARGOPKGS" pythonPackages.flake8 --command flake8 nrm/*.py bin/*
tags: tags:
- integration - integration
include Makefile include Makefile
include tox.ini include nrm/schemas/*.json
...@@ -9,6 +9,8 @@ six = "==1.11.0" ...@@ -9,6 +9,8 @@ six = "==1.11.0"
pyzmq = "*" pyzmq = "*"
tornado = "*" tornado = "*"
numpy = "*" numpy = "*"
warlock = "*"
scipy = "*"
argparse = "*" argparse = "*"
[dev-packages] [dev-packages]
......
This diff is collapsed.
...@@ -19,11 +19,8 @@ import nrm.messaging ...@@ -19,11 +19,8 @@ import nrm.messaging
import uuid import uuid
import sys import sys
import time import time
import collections
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
KillArgs = collections.namedtuple("Kill", ["uuid"])
class CommandLineInterface(object): class CommandLineInterface(object):
...@@ -37,12 +34,9 @@ class CommandLineInterface(object): ...@@ -37,12 +34,9 @@ class CommandLineInterface(object):
if uuid: if uuid:
logger.info("received signal %d, killing the application..", logger.info("received signal %d, killing the application..",
signum) signum)
command = {'api': 'up_rpc_req', self.client.send(
'type': 'kill', "Kill",
'container_uuid': uuid container_uuid=uuid)
}
msg = RPC_MSG['kill'](**command)
self.client.sendmsg(msg)
logger.info("killed the application, exiting.") logger.info("killed the application, exiting.")
else: else:
logger.info("received signal %d, exiting", signum) logger.info("received signal %d, exiting", signum)
...@@ -70,33 +64,26 @@ class CommandLineInterface(object): ...@@ -70,33 +64,26 @@ class CommandLineInterface(object):
self.pub_client.connect() self.pub_client.connect()
while(True): while(True):
msg = self.pub_client.recvmsg() msg = self.pub_client.recv()
logger.debug("pub message: %s", msg) logger.debug("pub message: %s", msg)
def print_if_filter(): def print_if_filter():
if argv.filter: if argv.filter:
if argv.filter == msg.type: if argv.filter == msg.tag:
if (msg.type == "performance" or if (msg.tag == "performance" or
msg.type == "progress"): msg.tag == "progress"):
print("%s, %s, %s" % (msg.type, time.time(), print("%s, %s, %s" % (msg.tag, time.time(),
msg.payload)) msg.payload))
if msg.type == "power": if msg.tag == "power":
print("%s, %s, %s" % (msg.type, time.time(), print("%s, %s, %s" % (msg.tag, time.time(),
msg.total)) msg.total))
if msg.type == "container_exit": if msg.tag == "exit":
print("%s, %s, %s" % (msg.type, time.time(), print("%s, %s, %s" % (msg.tag, time.time(),
msg.profile_data)) msg.profile_data))
else: else:
print("%s, %s" % (msg.type, time.time())) print("%s, %s" % (msg.tag, time.time()))
sys.stdout.flush() sys.stdout.flush()
print_if_filter() print_if_filter()
# if argv.uuid:
# uuid = getattr(msg, 'container_uuid', None)
# if argv.uuid == uuid or msg.type == "power":
# print_if_filter()
# else:
# print_if_filter()
def do_run(self, argv): def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command """ Connect to the NRM and ask to spawn a container and run a command
...@@ -108,6 +95,7 @@ class CommandLineInterface(object): ...@@ -108,6 +95,7 @@ class CommandLineInterface(object):
# the command a container uuid as a way to make sure that we can make # the command a container uuid as a way to make sure that we can make
# the command idempotent. # the command idempotent.
environ = os.environ environ = os.environ
# environ = []
container_uuid = argv.ucontainername or str(uuid.uuid4()) container_uuid = argv.ucontainername or str(uuid.uuid4())
# simple check + error msg + non-zero return code # simple check + error msg + non-zero return code
...@@ -118,26 +106,22 @@ class CommandLineInterface(object): ...@@ -118,26 +106,22 @@ class CommandLineInterface(object):
logger.error("Manifest file not found: %s", path) logger.error("Manifest file not found: %s", path)
sys.exit(1) sys.exit(1)
command = {'api': 'up_rpc_req',
'type': 'run',
'manifest': sanitize_manifest(argv.manifest),
'path': argv.command,
'args': argv.args,
'environ': dict(environ),
'container_uuid': container_uuid,
}
msg = RPC_MSG['run'](**command)
# command fsm # command fsm
state = 'init' state = 'init'
outeof = False outeof = False
erreof = False erreof = False
exitmsg = None exitmsg = None
self.client.sendmsg(msg) self.client.send(
tag="run",
manifest=sanitize_manifest(argv.manifest),
path=argv.command,
args=argv.args,
environ=dict(environ),
container_uuid=container_uuid)
# the first message tells us if we started a container or not # the first message tells us if we started a container or not
msg = self.client.recvmsg() msg = self.client.recv()
assert msg.api == 'up_rpc_rep' assert msg.tag == 'start'
assert msg.type == 'process_start'
def handler(signum, frame): def handler(signum, frame):
self.do_signal(msg.container_uuid, signum, frame) self.do_signal(msg.container_uuid, signum, frame)
...@@ -145,25 +129,24 @@ class CommandLineInterface(object): ...@@ -145,25 +129,24 @@ class CommandLineInterface(object):
state = 'started' state = 'started'
while(True): while(True):
msg = self.client.recvmsg() msg = self.client.recv()
assert msg.api == 'up_rpc_rep' assert msg.tag in ['stdout', 'stderr', 'exit']
assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
if msg.type == 'stdout': if msg.tag == 'stdout':
logger.debug("container msg: %r", msg) logger.debug("container msg: %r", msg)
if msg.payload == 'eof': if msg.payload == 'eof':
outeof = True outeof = True
else: else:
print(msg.payload) print(msg.payload, file=sys.stdout)
sys.stdout.flush() sys.stdout.flush()
elif msg.type == 'stderr': elif msg.tag == 'stderr':
logger.debug("container msg: %r", msg) logger.debug("container msg: %r", msg)
if msg.payload == 'eof': if msg.payload == 'eof':
erreof = True erreof = True
else: else:
print(msg.payload, file=sys.stderr) print(msg.payload, file=sys.stderr)
sys.stdout.flush() sys.stderr.flush()
elif msg.type == 'process_exit': elif msg.tag == 'exit':
logger.info("process ended: %r", msg) logger.info("process ended: %r", msg)
state = 'exiting' state = 'exiting'
exitmsg = msg exitmsg = msg
...@@ -197,13 +180,9 @@ class CommandLineInterface(object): ...@@ -197,13 +180,9 @@ class CommandLineInterface(object):
The NRM should respond to us with one message listing all The NRM should respond to us with one message listing all
containers.""" containers."""
command = {'api': 'up_rpc_req', self.client.send(tag="list")
'type': 'list'} msg = self.client.recv()
msg = RPC_MSG['list'](**command) assert msg.tag == 'list'
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'list'
logger.info("list response: %r", msg) logger.info("list response: %r", msg)
def do_kill(self, argv): def do_kill(self, argv):
...@@ -212,15 +191,9 @@ class CommandLineInterface(object): ...@@ -212,15 +191,9 @@ class CommandLineInterface(object):
The NRM should respond to us with a message containing the exit status The NRM should respond to us with a message containing the exit status
of the top process of the container.""" of the top process of the container."""
command = {'api': 'up_rpc_req', self.client.send(tag="kill", container_uuid=argv.uuid)
'type': 'kill', msg = self.client.recv()
'container_uuid': argv.uuid assert msg.tag == 'exit'
}
msg = RPC_MSG['kill'](**command)
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'exit'
logger.info("container exit: %r", msg) logger.info("container exit: %r", msg)
def do_setpower(self, argv): def do_setpower(self, argv):
...@@ -233,15 +206,9 @@ class CommandLineInterface(object): ...@@ -233,15 +206,9 @@ class CommandLineInterface(object):
# timely answer. # timely answer.
# TODO: check that the level makes a little bit of sense in the first # TODO: check that the level makes a little bit of sense in the first
# place # place
command = {'api': 'up_rpc_req', self.client.send(tag="setPower", limit=str(argv.limit))
'type': 'setpower', msg = self.client.recv()
'limit': str(argv.limit), assert msg.tag == 'getPower'
}
msg = RPC_MSG['setpower'](**command)
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'getpower'
logger.info("command received by the daemon: %r", msg) logger.info("command received by the daemon: %r", msg)
def main(self): def main(self):
......
...@@ -20,7 +20,6 @@ import subprocess ...@@ -20,7 +20,6 @@ import subprocess
import uuid import uuid
from nrm import messaging from nrm import messaging
PUB_MSG = messaging.MSGTYPES['down_event']
logger = logging.getLogger('perf-wrapper') logger = logging.getLogger('perf-wrapper')
...@@ -33,26 +32,19 @@ class PerfWrapper(object): ...@@ -33,26 +32,19 @@ class PerfWrapper(object):
pass pass
def shutdown(self): def shutdown(self):
update = {'api': 'down_event', self.downstream_event.send(tag="exit", application_uuid=self.app_uuid)
'type': 'application_exit',
'application_uuid': self.app_uuid,
}
msg = PUB_MSG['application_exit'](**update)
self.downstream_event.sendmsg(msg)
def performance_report(self, performance): def performance_report(self, performance):
update = {'api': 'down_event', self.downstream_event.send(
'type': 'performance', tag="performance",
'payload': performance, payload=performance,
'container_uuid': self.container_uuid, container_uuid=self.container_uuid,
'application_uuid': self.app_uuid, application_uuid=self.app_uuid)
}
msg = PUB_MSG['performance'](**update)
self.downstream_event.sendmsg(msg)
def setup(self): def setup(self):
downstream_url = "ipc:///tmp/nrm-downstream-event" downstream_url = "ipc:///tmp/nrm-downstream-event"
self.downstream_event = messaging.DownstreamEventClient(downstream_url) self.downstream_event = messaging.DownstreamEventClient(downstream_url)
logger.info("connecting downstream pub")
self.downstream_event.connect() self.downstream_event.connect()
logger.info("downstream pub socket connected to: %s", downstream_url) logger.info("downstream pub socket connected to: %s", downstream_url)
...@@ -64,13 +56,10 @@ class PerfWrapper(object): ...@@ -64,13 +56,10 @@ class PerfWrapper(object):
self.app_uuid = str(uuid.uuid4()) self.app_uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.app_uuid) logger.info("client uuid: %r", self.app_uuid)
# send an hello to the demon # send an hello to the demon
update = {'api': 'down_event', self.downstream_event.send(
'type': 'application_start', tag="start",
'container_uuid': self.container_uuid, container_uuid=self.container_uuid,
'application_uuid': self.app_uuid, application_uuid=self.app_uuid)
}
msg = PUB_MSG['application_start'](**update)
self.downstream_event.sendmsg(msg)
def main(self): def main(self):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
......
...@@ -91,8 +91,8 @@ class ApplicationManager(object): ...@@ -91,8 +91,8 @@ class ApplicationManager(object):
def register(self, msg, container): def register(self, msg, container):
"""Register a new downstream application.""" """Register a new downstream application."""
uuid = msg.application_uuid uuid = msg['application_uuid']
container_uuid = msg.container_uuid container_uuid = msg['container_uuid']
progress = 0 progress = 0
threads = False threads = False
phase_contexts = dict() phase_contexts = dict()
......
...@@ -25,41 +25,41 @@ class Action(object): ...@@ -25,41 +25,41 @@ class Action(object):
self.delta = delta self.delta = delta
class ApplicationActuator(object): # class ApplicationActuator(object):
#
"""Actuator in charge of application thread control.""" # """Actuator in charge of application thread control."""
#
def __init__(self, am, pubstream): # def __init__(self, am, pubstream):
self.application_manager = am # self.application_manager = am
self.pubstream = pubstream # self.pubstream = pubstream
#
def available_actions(self, target): # def available_actions(self, target):
ret = [] # ret = []
for identity, application in \ # for identity, application in \
self.application_manager.applications.iteritems(): # self.application_manager.applications.iteritems():
if target in application.get_allowed_thread_requests(): # if target in application.get_allowed_thread_requests():
delta = application.get_thread_request_impact(target) # delta = application.get_thread_request_impact(target)
ret.append(Action(application, target, delta)) # ret.append(Action(application, target, delta))
return ret # return ret
#
def execute(self, action): # def execute(self, action):
target_threads = action.target.threads # target_threads = action.target.threads
update = {'type': 'application', # update = {'type': 'application',
'command': 'threads', # 'command': 'threads',
'uuid': action.target.uuid, # 'uuid': action.target.uuid,
'event': 'threads', # 'event': 'threads',
} # }
if action.command == 'i': # if action.command == 'i':
payload = target_threads['cur'] + 1 # payload = target_threads['cur'] + 1
elif action.command == 'd': # elif action.command == 'd':
payload = target_threads['cur'] - 1 # payload = target_threads['cur'] - 1
else: # else:
assert False, "impossible command" # assert False, "impossible command"
update['payload'] = payload # update['payload'] = payload
self.pubstream.send_json(update) # self.pubstream.send_json()
#
def update(self, action): # def update(self, action):
action.target.do_thread_transition(action.command) # action.target.do_thread_transition(action.command)
class PowerActuator(object): class PowerActuator(object):
......
This diff is collapsed.
This diff is collapsed.
{
"oneOf": [
{
"required": [
"tag",
"container_uuid",
"application_uuid"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"start"
]
},
"container_uuid": {
"type": "string"
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"application_uuid"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"exit"
]
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"container_uuid",
"application_uuid",
"payload"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"performance"
]
},
"payload": {
"type": "number"
},
"container_uuid": {
"type": "string"
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"application_uuid",
"payload"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"progress"
]
},
"payload": {
"type": "number"
},
"application_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"cpu",
"startcompute",
"endcompute",
"startbarrier",
"endbarrier"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"phasecontext"
]
},
"endcompute": {
"type": "number"
},
"endbarrier": {
"type": "number"
},
"startbarrier": {
"type": "number"
},
"startcompute": {
"type": "number"
},
"cpu": {
"type": "number"
}
}
}
]
}
{
"oneOf": [
{
"required": [
"tag",
"total",
"limit"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"power"
]
},
"total": {
"type": "number"
},
"limit": {
"type": "number"
}
}
},
{
"required": [
"tag",
"container_uuid",
"errno",
"power"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"start"
]
},
"errno": {
"type": "number"
},
"power": {
"type": "string"
},
"container_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"container_uuid",
"profile_data"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"exit"
]
},
"profile_data": {
"additionalProperties": {
"type": "string"
},
"type": "object"
},
"container_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"container_uuid",
"payload"
],
"type": "object",
"properties": {
"tag": {
"type": "string",
"enum": [
"performance"
]
},
"payload": {
"type": "number"
},
"container_uuid": {
"type": "string"
}
}
},
{
"required": [
"tag",
"application_uuid",
"payload"