Commit c142c589 authored by Swann Perarnau's avatar Swann Perarnau
Browse files

Merge branch 'messaging-layer' into 'master'

Improve Messaging layer

See merge request !28
parents d384712c 0b0ab966
Pipeline #4169 passed with stages
in 54 seconds
......@@ -3,12 +3,12 @@
from __future__ import print_function
import argparse
import logging
import uuid
import signal
import zmq
import os
import nrm.messaging
logger = logging.getLogger('nrm-cmd')
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm')
class CommandLineInterface(object):
......@@ -23,85 +23,55 @@ class CommandLineInterface(object):
exit(1)
def setup(self):
# SUB port to the upstream API (connected to its PUB port)
upstream_sub_port = 2345
# PUB port to the upstream API (connected to its SUB port)
upstream_pub_port = 3456
self.context = zmq.Context()
self.upstream_pub_socket = self.context.socket(zmq.PUB)
self.upstream_sub_socket = self.context.socket(zmq.SUB)
upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)
self.upstream_pub_socket.connect(upstream_pub_param)
self.upstream_sub_socket.connect(upstream_sub_param)
# we want to receive everything for now
upstream_sub_filter = ""
self.upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# upstream RPC port
upstream_client_port = 3456
upstream_client_param = "tcp://localhost:%d" % (upstream_client_port)
self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
# create a uuid for this client instance
self.uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.uuid)
self.client.wait_connected()
def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command
in it.
The NRM should notify us on the pub socket of the container
creation."""
The NRM should reply for container info."""
# build the command as a JSON dict containing enough info. We add to
# the command a container uuid as a way to make sure that we can make
# the command idempotent.
environ = os.environ
command = {'clientid': self.uuid,
'ucontainername': argv.ucontainername,
'command': 'run',
command = {'api': 'up_rpc_req',
'type': 'run',
'manifest': argv.manifest,
'file': argv.command,
'path': argv.command,
'args': argv.args,
'environ': dict(environ),
'container_uuid': str(argv.ucontainername),
}
msg = RPC_MSG['run'](**command)
# command fsm
state = 'init'
outeof = False
erreof = False
exitmsg = None
self.upstream_pub_socket.send_json(command)
self.client.sendmsg(msg)
while(True):
msg = self.upstream_sub_socket.recv_json()
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['clientid'] == self.uuid:
if msg['event'] == 'start':
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'stdout', 'stderr', 'exit',
'process_start', 'process_exit']
if msg.type == 'start':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg['event'] == 'stdout':
logger.info("container msg: %r", msg)
if msg['payload'] == 'eof':
outeof = True
elif msg['event'] == 'stderr':
logger.info("container msg: %r", msg)
if msg['payload'] == 'eof':
erreof = True
elif msg['event'] == 'exit':
if state == 'started':
state = 'exiting'
exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
elif msg['event'] == 'process_start':
elif msg.type == 'process_start':
if state == 'init':
state = 'started'
logger.info("process started in existing "
......@@ -109,9 +79,23 @@ class CommandLineInterface(object):
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg['event'] == 'process_exit':
elif msg.type == 'stdout':
logger.info("container msg: %r", msg)
if msg.payload == 'eof':
outeof = True
elif msg.type == 'stderr':
logger.info("container msg: %r", msg)
if msg.payload == 'eof':
erreof = True
elif msg.type == 'process_exit':
logger.info("process ended: %r", msg)
break
elif msg.type == 'exit':
if state == 'started':
state = 'exiting'
exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
if outeof and erreof and state == 'exiting':
state = 'exit'
logger.info("container ended: %r", exitmsg)
......@@ -121,65 +105,55 @@ class CommandLineInterface(object):
"""Connect to the NRM and ask to list the containers present on the
system.
The NRM should respond to us on the pub socket with one message listing
all containers."""
command = {'command': 'list',
}
The NRM should respond to us with one message listing all
containers."""
self.upstream_pub_socket.send_json(command)
while(True):
msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['event'] == 'list':
command = {'api': 'up_rpc_req',
'type': 'list'}
msg = RPC_MSG['list'](**command)
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'list'
logger.info("list response: %r", msg)
break
def do_kill(self, argv):
"""Connect to the NRM and ask to kill a container by uuid.
The NRM should respond to us on the pub socket with a message
containing the exit status of the top process of the container."""
The NRM should respond to us with a message containing the exit status
of the top process of the container."""
command = {'command': 'kill',
'uuid': argv.uuid
command = {'api': 'up_rpc_req',
'type': 'kill',
'container_uuid': argv.uuid
}
self.upstream_pub_socket.send_json(command)
while(True):
msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['event'] == 'exit' and msg['uuid'] == argv.uuid:
msg = RPC_MSG['kill'](**command)
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'exit'
logger.info("container exit: %r", msg)
break
def do_setpower(self, argv):
""" Connect to the NRM and ask to change the power limit.
The NRM should answer on the pub socket with an acknowledgment."""
The NRM should answer with an acknowledgment."""
# build the command as a JSON dict giving enough info. This is an
# idempotent command, so we will repeat the command if we don't get a
# timely answer.
# TODO: check that the level makes a little bit of sense in the first
# place
command = {'command': 'setpower',
'limit': argv.limit,
command = {'api': 'up_rpc_req',
'type': 'setpower',
'limit': str(argv.limit),
}
self.upstream_pub_socket.send_json(command)
while(True):
msg = self.upstream_sub_socket.recv_json()
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'power':
if msg['limit'] == argv.limit:
logger.info("command received by the daemon")
break
msg = RPC_MSG['setpower'](**command)
self.client.sendmsg(msg)
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type == 'getpower'
logger.info("command received by the daemon: %r", msg)
def main(self):
parser = argparse.ArgumentParser()
......
......@@ -37,7 +37,7 @@ class ContainerManager(object):
command = request['file']
args = request['args']
environ = request['environ']
ucontainername = request['ucontainername']
ucontainername = request['uuid']
logger.info("run: manifest file: %s", manifestfile)
logger.info("run: command: %s", command)
logger.info("run: args: %r", args)
......
......@@ -13,7 +13,11 @@ from sensor import SensorManager
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
logger = logging.getLogger('nrm')
......@@ -63,38 +67,28 @@ class Daemon(object):
logger.error("unknown event: %r", event)
return
def do_upstream_receive(self, parts):
logger.info("receiving upstream message: %r", parts)
if len(parts) != 1:
logger.error("unexpected msg length, dropping it: %r", parts)
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
command = msg.get('command')
# TODO: switch to a dispatch dictionary
if command is None:
logger.error("missing command in message: %r", msg)
return
if command == 'setpower':
self.target = float(msg['limit'])
def do_upstream_receive(self, msg, client):
if msg.type == 'setpower':
self.target = float(msg.limit)
logger.info("new target measure: %g", self.target)
elif command == 'run':
logger.info("new container will be created if it doesn't "
"exist: %r", msg)
pid, container = self.container_manager.create(msg)
cid = container.uuid
clientid = container.clientids[pid]
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'uuid': cid,
'clientid': clientid,
'errno': 0 if container else -1,
'pid': pid,
update = {'api': 'up_rpc_rep',
'type': 'getpower',
'limit': str(self.target)
}
self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
client)
elif msg.type == 'run':
container_uuid = msg.container_uuid
params = {'manifest': msg.manifest,
'file': msg.path,
'args': msg.args,
'uuid': msg.container_uuid,
'environ': msg.environ,
'clientid': client,
}
pid, container = self.container_manager.create(params)
container_uuid = container.uuid
if len(container.processes.keys()) == 1:
update['event'] = 'start'
if container.power['policy']:
container.power['manager'] = PowerPolicyManager(
container.resources['cpus'],
......@@ -105,55 +99,75 @@ class Daemon(object):
p = container.power['profile']
p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time']
update['power'] = container.power['policy']
update = {'api': 'up_rpc_rep',
'type': 'start',
'container_uuid': container_uuid,
'errno': 0 if container else -1,
'pid': pid,
'power': container.power['policy'] or dict()
}
self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
else:
update['event'] = 'process_start'
update = {'api': 'up_rpc_rep',
'type': 'process_start',
'container_uuid': container_uuid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client)
# setup io callbacks
outcb = partial(self.do_children_io, clientid, cid, 'stdout')
errcb = partial(self.do_children_io, clientid, cid, 'stderr')
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
self.upstream_pub.send_json(update)
elif command == 'kill':
elif msg.type == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg['uuid'])
response = self.container_manager.kill(msg.container_uuid)
# no update here, as it will trigger child exit
elif command == 'list':
elif msg.type == 'list':
logger.info("asked for container list: %r", msg)
response = self.container_manager.list()
update = {'type': 'container',
'event': 'list',
update = {'api': 'up_rpc_rep',
'type': 'list',
'payload': response,
}
self.upstream_pub.send_json(update)
self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
client)
else:
logger.error("invalid command: %r", command)
logger.error("invalid command: %r", msg.type)
def do_children_io(self, clientid, uuid, io, data):
def do_children_io(self, client, container_uuid, io, data):
"""Receive data from one of the children, and send it down the pipe.
Meant to be partially defined on a children basis."""
logger.info("%r received %r data: %r", uuid, io, data)
update = {'type': 'container',
'event': io,
'uuid': uuid,
'clientid': clientid,
logger.info("%r received %r data: %r", container_uuid, io, data)
update = {'api': 'up_rpc_rep',
'type': io,
'container_uuid': container_uuid,
'payload': data or 'eof',
}
self.upstream_pub.send_json(update)
self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)
def do_sensor(self):
self.machine_info = self.sensor_manager.do_update()
logger.info("current state: %r", self.machine_info)
total_power = self.machine_info['energy']['power']['total']
msg = {'type': 'power',
msg = {'api': 'up_pub',
'type': 'power',
'total': total_power,
'limit': self.target
}
self.upstream_pub.send_json(msg)
self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
logger.info("sending sensor message: %r", msg)
def do_control(self):
......@@ -193,14 +207,13 @@ class Daemon(object):
clientid = container.clientids[pid]
remaining_pids = [p for p in container.processes.keys()
if p != pid]
msg = {'type': 'container',
'status': status,
'uuid': container.uuid,
'clientid': clientid,
msg = {'api': 'up_rpc_rep',
'status': str(status),
'container_uuid': container.uuid,
}
if not remaining_pids:
msg['event'] = 'exit'
msg['type'] = 'exit'
pp = container.power
if pp['policy']:
pp['manager'].reset_all()
......@@ -219,15 +232,18 @@ class Daemon(object):
container.uuid, diff)
msg['profile_data'] = diff
self.container_manager.delete(container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['exit'](**msg), clientid)
else:
msg['event'] = 'process_exit'
msg['type'] = 'process_exit'
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
self.upstream_pub.send_json(msg)
else:
logger.debug("child update ignored")
pass
......@@ -240,44 +256,37 @@ class Daemon(object):
# Bind address for downstream clients
bind_address = '*'
# PUB port for upstream clients
# port for upstream PUB API
upstream_pub_port = 2345
# SUB port for upstream clients
upstream_sub_port = 3456
# port for upstream RPC API
upstream_rpc_port = 3456
# setup application listening socket
context = zmq.Context()
downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB)
upstream_pub_socket = context.socket(zmq.PUB)
upstream_sub_socket = context.socket(zmq.SUB)
downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)
upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)
downstream_pub_socket.bind(downstream_pub_param)
downstream_sub_socket.bind(downstream_sub_param)
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
upstream_pub_socket.bind(upstream_pub_param)
upstream_sub_socket.bind(upstream_sub_param)
upstream_sub_filter = ""
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)
logger.info("downstream pub socket bound to: %s", downstream_pub_param)
logger.info("downstream sub socket bound to: %s", downstream_sub_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
logger.info("upstream rpc socket connected to: %s", upstream_rpc_param)
# register socket triggers
self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
self.downstream_sub.on_recv(self.do_downstream_receive)
self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
self.upstream_sub.on_recv(self.do_upstream_receive)
self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive)
# create a stream to let ioloop deal with blocking calls on HWM
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
# create managers
......
from collections import namedtuple
import json
import logging
import uuid
import zmq
import zmq.utils
import zmq.utils.monitor
from zmq.eventloop import zmqstream
# basestring support
try:
basestring
except NameError:
basestring = str
logger = logging.getLogger('nrm')
# list of APIs supported by this messaging layer. Each message is
# indexed by its intended api user and the type of the message, along with
# basic field type information.
APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub']
MSGFORMATS = {k: {} for k in APIS}
MSGFORMATS['up_rpc_req'] = {'list': {},
'run': {'manifest': basestring,
'path': basestring,
'args': list,
'container_uuid': basestring,
'environ': dict},
'kill': {'container_uuid': basestring},
'setpower': {'limit': basestring},
}
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'errno': int,
'pid': int,
'power': dict},
'list': {'payload': list},
'stdout': {'container_uuid': basestring,
'payload': basestring},
'stderr': {'container_uuid': basestring,
'payload': basestring},
'exit': {'container_uuid': basestring,
'status': basestring,
'profile_data': dict},
'process_start': {'container_uuid': basestring},
'process_exit': {'container_uuid': basestring,
'status': basestring},
'getpower': {'limit': basestring},
}
MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float}}
# Mirror of the message formats, using namedtuples as the actual transport
# for users of this messaging layer.
MSGTYPES = {k: {} for k in APIS}
for api, types in MSGFORMATS.items():
tname = "msg_{}_".format(api)
MSGTYPES[api] = {k: namedtuple(tname+k, sorted(['api', 'type'] + v.keys()))
for k, v in types.items()}
def wire2msg(wire_msg):
"""Convert the wire format into a msg from the available MSGTYPES."""
fields = json.loads(wire_msg)
assert 'api' in fields
api = fields['api']
assert api in MSGFORMATS
valid_types = MSGFORMATS[api]
assert 'type' in fields
mtype = fields['type']
assert mtype in valid_types
# format check
fmt = valid_types[mtype]
for key in fields:
if key in ['api', 'type']:
continue
assert key in fmt, "%r missing from %r" % (key, fmt)
assert isinstance(fields[key], fmt[key]), \
"type mismatch for %r: %r != %r" % (key, fields[key], fmt[key])
for key in fmt:
assert key in fields, "%r missing from %r" % (key, fields)
assert isinstance(fields[key], fmt[key]), \
"type mismatch for %r: %r != %r" % (key, fields[key], fmt[key])
mtuple = MSGTYPES[api][mtype]
return mtuple(**fields)
def msg2wire(msg):
"""Convert a message to its wire format (dict)."""
fields = msg._asdict()
return json.dumps(fields)
class UpstreamRPCClient(object):
"""Implements the message layer client to the upstream RPC API."""
def __init__(self, address):
self.address = address
self.uuid = str(uuid.uuid4())
self.zmq_context = zmq.Context()
self.socket = self.zmq_context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.uuid)
self.socket.connect(address)
def wait_connected(self):
"""Creates a monitor socket and wait for the connect event."""
monitor = self.socket.get_monitor_socket()
while True:
msg = zmq.utils.monitor.recv_monitor_message(