Commit 0b0ab966 authored by Swann Perarnau
Browse files

[refactor] replace upstream comms with msg layer

Replace the fragile upstream communications with the new messaging
layer, improving the stability and performance of this API.

NOTE: this breaks previous clients
NOTE: this patch is missing client tracking, which is needed to handle children signals.
parent c29ed7ea
Pipeline #4168 passed with stages in 54 seconds
...@@ -3,12 +3,12 @@ ...@@ -3,12 +3,12 @@
from __future__ import print_function from __future__ import print_function
import argparse import argparse
import logging import logging
import uuid
import signal import signal
import zmq
import os import os
import nrm.messaging
logger = logging.getLogger('nrm-cmd') RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm')
class CommandLineInterface(object): class CommandLineInterface(object):
...@@ -23,95 +23,79 @@ class CommandLineInterface(object): ...@@ -23,95 +23,79 @@ class CommandLineInterface(object):
exit(1) exit(1)
def setup(self): def setup(self):
# SUB port to the upstream API (connected to its PUB port) # upstream RPC port
upstream_sub_port = 2345 upstream_client_port = 3456
# PUB port to the upstream API (connected to its SUB port) upstream_client_param = "tcp://localhost:%d" % (upstream_client_port)
upstream_pub_port = 3456 self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)
self.context = zmq.Context()
self.upstream_pub_socket = self.context.socket(zmq.PUB)
self.upstream_sub_socket = self.context.socket(zmq.SUB)
upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)
self.upstream_pub_socket.connect(upstream_pub_param)
self.upstream_sub_socket.connect(upstream_sub_param)
# we want to receive everything for now
upstream_sub_filter = ""
self.upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# take care of signals # take care of signals
signal.signal(signal.SIGINT, self.do_signal) signal.signal(signal.SIGINT, self.do_signal)
# create a uuid for this client instance self.client.wait_connected()
self.uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.uuid)
def do_run(self, argv): def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command """ Connect to the NRM and ask to spawn a container and run a command
in it. in it.
The NRM should notify us on the pub socket of the container The NRM should reply for container info."""
creation."""
# build the command as a JSON dict containing enough info. We add to # build the command as a JSON dict containing enough info. We add to
# the command a container uuid as a way to make sure that we can make # the command a container uuid as a way to make sure that we can make
# the command idempotent. # the command idempotent.
environ = os.environ environ = os.environ
command = {'clientid': self.uuid, command = {'api': 'up_rpc_req',
'ucontainername': argv.ucontainername, 'type': 'run',
'command': 'run',
'manifest': argv.manifest, 'manifest': argv.manifest,
'file': argv.command, 'path': argv.command,
'args': argv.args, 'args': argv.args,
'environ': dict(environ), 'environ': dict(environ),
'container_uuid': str(argv.ucontainername),
} }
msg = RPC_MSG['run'](**command)
# command fsm # command fsm
state = 'init' state = 'init'
outeof = False outeof = False
erreof = False erreof = False
exitmsg = None exitmsg = None
self.upstream_pub_socket.send_json(command) self.client.sendmsg(msg)
while(True): while(True):
msg = self.upstream_sub_socket.recv_json() msg = self.client.recvmsg()
if isinstance(msg, dict) and msg.get('type') == 'container': assert msg.api == 'up_rpc_rep'
if msg['clientid'] == self.uuid: assert msg.type in ['start', 'stdout', 'stderr', 'exit',
if msg['event'] == 'start': 'process_start', 'process_exit']
if state == 'init':
state = 'started' if msg.type == 'start':
logger.info("container started: %r", msg) if state == 'init':
else: state = 'started'
logger.info("unexpected start message: %r", state) logger.info("container started: %r", msg)
exit(1) else:
elif msg['event'] == 'stdout': logger.info("unexpected start message: %r", state)
logger.info("container msg: %r", msg) exit(1)
if msg['payload'] == 'eof': elif msg.type == 'process_start':
outeof = True if state == 'init':
elif msg['event'] == 'stderr': state = 'started'
logger.info("container msg: %r", msg) logger.info("process started in existing "
if msg['payload'] == 'eof': "container: %r""", msg)
erreof = True else:
elif msg['event'] == 'exit': logger.info("unexpected start message: %r", state)
if state == 'started': exit(1)
state = 'exiting' elif msg.type == 'stdout':
exitmsg = msg logger.info("container msg: %r", msg)
else: if msg.payload == 'eof':
logger.info("unexpected exit message: %r", msg) outeof = True
elif msg['event'] == 'process_start': elif msg.type == 'stderr':
if state == 'init': logger.info("container msg: %r", msg)
state = 'started' if msg.payload == 'eof':
logger.info("process started in existing " erreof = True
"container: %r""", msg) elif msg.type == 'process_exit':
else: logger.info("process ended: %r", msg)
logger.info("unexpected start message: %r", state) break
exit(1) elif msg.type == 'exit':
elif msg['event'] == 'process_exit': if state == 'started':
logger.info("process ended: %r", msg) state = 'exiting'
break exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
if outeof and erreof and state == 'exiting': if outeof and erreof and state == 'exiting':
state = 'exit' state = 'exit'
logger.info("container ended: %r", exitmsg) logger.info("container ended: %r", exitmsg)
...@@ -121,65 +105,55 @@ class CommandLineInterface(object): ...@@ -121,65 +105,55 @@ class CommandLineInterface(object):
"""Connect to the NRM and ask to list the containers present on the """Connect to the NRM and ask to list the containers present on the
system. system.
The NRM should respond to us on the pub socket with one message listing The NRM should respond to us with one message listing all
all containers.""" containers."""
command = {'command': 'list',
}
self.upstream_pub_socket.send_json(command) command = {'api': 'up_rpc_req',
while(True): 'type': 'list'}
msg = self.upstream_sub_socket.recv_json() msg = RPC_MSG['list'](**command)
logger.info("new message: %r", msg) self.client.sendmsg(msg)
# ignore other messages msg = self.client.recvmsg()
if isinstance(msg, dict) and msg.get('type') == 'container': assert msg.api == 'up_rpc_rep'
if msg['event'] == 'list': assert msg.type == 'list'
logger.info("list response: %r", msg) logger.info("list response: %r", msg)
break
def do_kill(self, argv): def do_kill(self, argv):
"""Connect to the NRM and ask to kill a container by uuid. """Connect to the NRM and ask to kill a container by uuid.
The NRM should respond to us on the pub socket with a message The NRM should respond to us with a message containing the exit status
containing the exit status of the top process of the container.""" of the top process of the container."""
command = {'command': 'kill', command = {'api': 'up_rpc_req',
'uuid': argv.uuid 'type': 'kill',
'container_uuid': argv.uuid
} }
msg = RPC_MSG['kill'](**command)
self.upstream_pub_socket.send_json(command) self.client.sendmsg(msg)
while(True): msg = self.client.recvmsg()
msg = self.upstream_sub_socket.recv_json() assert msg.api == 'up_rpc_rep'
logger.info("new message: %r", msg) assert msg.type == 'exit'
# ignore other messages logger.info("container exit: %r", msg)
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['event'] == 'exit' and msg['uuid'] == argv.uuid:
logger.info("container exit: %r", msg)
break
def do_setpower(self, argv): def do_setpower(self, argv):
""" Connect to the NRM and ask to change the power limit. """ Connect to the NRM and ask to change the power limit.
The NRM should answer on the pub socket with an acknowledgment.""" The NRM should answer with an acknowledgment."""
# build the command as a JSON dict giving enough info. This is an # build the command as a JSON dict giving enough info. This is an
# idempotent command, so we will repeat the command if we don't get a # idempotent command, so we will repeat the command if we don't get a
# timely answer. # timely answer.
# TODO: check that the level makes a little bit of sense in the first # TODO: check that the level makes a little bit of sense in the first
# place # place
command = {'command': 'setpower', command = {'api': 'up_rpc_req',
'limit': argv.limit, 'type': 'setpower',
'limit': str(argv.limit),
} }
msg = RPC_MSG['setpower'](**command)
self.upstream_pub_socket.send_json(command) self.client.sendmsg(msg)
while(True): msg = self.client.recvmsg()
msg = self.upstream_sub_socket.recv_json() assert msg.api == 'up_rpc_rep'
logger.info("new message: %r", msg) assert msg.type == 'getpower'
# ignore other messages logger.info("command received by the daemon: %r", msg)
if isinstance(msg, dict) and msg.get('type') == 'power':
if msg['limit'] == argv.limit:
logger.info("command received by the daemon")
break
def main(self): def main(self):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
......
...@@ -37,7 +37,7 @@ class ContainerManager(object): ...@@ -37,7 +37,7 @@ class ContainerManager(object):
command = request['file'] command = request['file']
args = request['args'] args = request['args']
environ = request['environ'] environ = request['environ']
ucontainername = request['ucontainername'] ucontainername = request['uuid']
logger.info("run: manifest file: %s", manifestfile) logger.info("run: manifest file: %s", manifestfile)
logger.info("run: command: %s", command) logger.info("run: command: %s", command)
logger.info("run: args: %r", args) logger.info("run: args: %r", args)
......
...@@ -13,7 +13,11 @@ from sensor import SensorManager ...@@ -13,7 +13,11 @@ from sensor import SensorManager
import signal import signal
import zmq import zmq
from zmq.eventloop import ioloop, zmqstream from zmq.eventloop import ioloop, zmqstream
from nrm.messaging import MSGTYPES
from nrm.messaging import UpstreamRPCServer, UpstreamPubServer
RPC_MSG = MSGTYPES['up_rpc_rep']
PUB_MSG = MSGTYPES['up_pub']
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
...@@ -63,97 +67,107 @@ class Daemon(object): ...@@ -63,97 +67,107 @@ class Daemon(object):
logger.error("unknown event: %r", event) logger.error("unknown event: %r", event)
return return
def do_upstream_receive(self, parts): def do_upstream_receive(self, msg, client):
logger.info("receiving upstream message: %r", parts) if msg.type == 'setpower':
if len(parts) != 1: self.target = float(msg.limit)
logger.error("unexpected msg length, dropping it: %r", parts) logger.info("new target measure: %g", self.target)
return update = {'api': 'up_rpc_rep',
msg = json.loads(parts[0]) 'type': 'getpower',
if isinstance(msg, dict): 'limit': str(self.target)
command = msg.get('command') }
# TODO: switch to a dispatch dictionary self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update),
if command is None: client)
logger.error("missing command in message: %r", msg) elif msg.type == 'run':
return container_uuid = msg.container_uuid
if command == 'setpower': params = {'manifest': msg.manifest,
self.target = float(msg['limit']) 'file': msg.path,
logger.info("new target measure: %g", self.target) 'args': msg.args,
elif command == 'run': 'uuid': msg.container_uuid,
logger.info("new container will be created if it doesn't " 'environ': msg.environ,
"exist: %r", msg) 'clientid': client,
pid, container = self.container_manager.create(msg) }
cid = container.uuid pid, container = self.container_manager.create(params)
clientid = container.clientids[pid] container_uuid = container.uuid
if len(container.processes.keys()) == 1:
# TODO: obviously we need to send more info than that if container.power['policy']:
update = {'type': 'container', container.power['manager'] = PowerPolicyManager(
'uuid': cid, container.resources['cpus'],
'clientid': clientid, container.power['policy'],
float(container.power['damper']),
float(container.power['slowdown']))
if container.power['profile']:
p = container.power['profile']
p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time']
update = {'api': 'up_rpc_rep',
'type': 'start',
'container_uuid': container_uuid,
'errno': 0 if container else -1, 'errno': 0 if container else -1,
'pid': pid, 'pid': pid,
'power': container.power['policy'] or dict()
} }
self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
if len(container.processes.keys()) == 1: client)
update['event'] = 'start'
if container.power['policy']:
container.power['manager'] = PowerPolicyManager(
container.resources['cpus'],
container.power['policy'],
float(container.power['damper']),
float(container.power['slowdown']))
if container.power['profile']:
p = container.power['profile']
p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time']
update['power'] = container.power['policy']
else:
update['event'] = 'process_start'
# setup io callbacks # setup io callbacks
outcb = partial(self.do_children_io, clientid, cid, 'stdout') outcb = partial(self.do_children_io, client,
errcb = partial(self.do_children_io, clientid, cid, 'stderr') container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb) container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb) container.processes[pid].stderr.read_until_close(errcb, errcb)
self.upstream_pub.send_json(update)
elif command == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg['uuid'])
# no update here, as it will trigger child exit
elif command == 'list':
logger.info("asked for container list: %r", msg)
response = self.container_manager.list()
update = {'type': 'container',
'event': 'list',
'payload': response,
}
self.upstream_pub.send_json(update)
else: else:
logger.error("invalid command: %r", command) update = {'api': 'up_rpc_rep',
'type': 'process_start',
'container_uuid': container_uuid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
elif msg.type == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg.container_uuid)
# no update here, as it will trigger child exit
elif msg.type == 'list':
logger.info("asked for container list: %r", msg)
response = self.container_manager.list()
update = {'api': 'up_rpc_rep',
'type': 'list',
'payload': response,
}
self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update),
client)
else:
logger.error("invalid command: %r", msg.type)
def do_children_io(self, clientid, uuid, io, data): def do_children_io(self, client, container_uuid, io, data):
"""Receive data from one of the children, and send it down the pipe. """Receive data from one of the children, and send it down the pipe.
Meant to be partially defined on a children basis.""" Meant to be partially defined on a children basis."""
logger.info("%r received %r data: %r", uuid, io, data) logger.info("%r received %r data: %r", container_uuid, io, data)
update = {'type': 'container', update = {'api': 'up_rpc_rep',
'event': io, 'type': io,
'uuid': uuid, 'container_uuid': container_uuid,
'clientid': clientid,
'payload': data or 'eof', 'payload': data or 'eof',
} }
self.upstream_pub.send_json(update) self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client)
def do_sensor(self): def do_sensor(self):
self.machine_info = self.sensor_manager.do_update() self.machine_info = self.sensor_manager.do_update()
logger.info("current state: %r", self.machine_info) logger.info("current state: %r", self.machine_info)
total_power = self.machine_info['energy']['power']['total'] total_power = self.machine_info['energy']['power']['total']
msg = {'type': 'power', msg = {'api': 'up_pub',
'type': 'power',
'total': total_power, 'total': total_power,
'limit': self.target 'limit': self.target
} }
self.upstream_pub.send_json(msg) self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg))
logger.info("sending sensor message: %r", msg) logger.info("sending sensor message: %r", msg)
def do_control(self): def do_control(self):
...@@ -193,14 +207,13 @@ class Daemon(object): ...@@ -193,14 +207,13 @@ class Daemon(object):
clientid = container.clientids[pid] clientid = container.clientids[pid]
remaining_pids = [p for p in container.processes.keys() remaining_pids = [p for p in container.processes.keys()
if p != pid] if p != pid]
msg = {'type': 'container', msg = {'api': 'up_rpc_rep',
'status': status, 'status': str(status),
'uuid': container.uuid, 'container_uuid': container.uuid,
'clientid': clientid,
} }
if not remaining_pids: if not remaining_pids:
msg['event'] = 'exit' msg['type'] = 'exit'
pp = container.power pp = container.power
if pp['policy']: if pp['policy']:
pp['manager'].reset_all() pp['manager'].reset_all()
...@@ -219,15 +232,18 @@ class Daemon(object): ...@@ -219,15 +232,18 @@ class Daemon(object):
container.uuid, diff) container.uuid, diff)
msg['profile_data'] = diff msg['profile_data'] = diff
self.container_manager.delete(container.uuid) self.container_manager.delete(container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['exit'](**msg), clientid)
else: else:
msg['event'] = 'process_exit' msg['type'] = 'process_exit'
# Remove the pid of process that is finished # Remove the pid of process that is finished
container.processes.pop(pid, None) container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None) self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.", logger.info("Process %s in Container %s has finised.",
pid, container.uuid) pid, container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
self.upstream_pub.send_json(msg)
else: else:
logger.debug("child update ignored") logger.debug("child update ignored")
pass pass
...@@ -240,44 +256,37 @@ class Daemon(object): ...@@ -240,44 +256,37 @@ class Daemon(object):
# Bind address for downstream clients # Bind address for downstream clients
bind_address = '*' bind_address = '*'
# PUB port for upstream clients # port for upstream PUB API
upstream_pub_port = 2345 upstream_pub_port = 2345
# SUB port for upstream clients # port for upstream RPC API
upstream_sub_port = 3456 upstream_rpc_port = 3456
# setup application listening socket # setup application listening socket
context = zmq.Context() context = zmq.Context()
downstream_pub_socket = context.socket(zmq.PUB) downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB) downstream_sub_socket = context.socket(zmq.SUB)
upstream_pub_socket = context.socket(zmq.PUB)
upstream_sub_socket = context.socket(zmq.SUB)
downstream_pub_param = "ipc:///tmp/nrm-downstream-out" downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
downstream_sub_param = "ipc:///tmp/nrm-downstream-in" downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port) upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port) upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port)
downstream_pub_socket.bind(downstream_pub_param) downstream_pub_socket.bind(downstream_pub_param)
downstream_sub_socket.bind(downstream_sub_param) downstream_sub_socket.bind(downstream_sub_param)
downstream_sub_filter = "" downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter) downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
upstream_pub_socket.bind(upstream_pub_param) self.upstream_pub_server = UpstreamPubServer(upstream_pub_param)
upstream_sub_socket.bind(upstream_sub_param) self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param)
upstream_sub_filter = ""
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)