diff --git a/bin/cmd b/bin/cmd index 7c2992c65ee349ac7dbad30224b23f8d498a9941..8a3039a0e703672a30a9429eed7752ef59b18887 100755 --- a/bin/cmd +++ b/bin/cmd @@ -3,12 +3,12 @@ from __future__ import print_function import argparse import logging -import uuid import signal -import zmq import os +import nrm.messaging -logger = logging.getLogger('nrm-cmd') +RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req'] +logger = logging.getLogger('nrm') class CommandLineInterface(object): @@ -23,95 +23,79 @@ class CommandLineInterface(object): exit(1) def setup(self): - # SUB port to the upstream API (connected to its PUB port) - upstream_sub_port = 2345 - # PUB port to the upstream API (connected to its SUB port) - upstream_pub_port = 3456 - - self.context = zmq.Context() - self.upstream_pub_socket = self.context.socket(zmq.PUB) - self.upstream_sub_socket = self.context.socket(zmq.SUB) - - upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port) - upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port) - - self.upstream_pub_socket.connect(upstream_pub_param) - self.upstream_sub_socket.connect(upstream_sub_param) - # we want to receive everything for now - upstream_sub_filter = "" - self.upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter) - - logger.info("upstream pub socket bound to: %s", upstream_pub_param) - logger.info("upstream sub socket connected to: %s", upstream_sub_param) + # upstream RPC port + upstream_client_port = 3456 + upstream_client_param = "tcp://localhost:%d" % (upstream_client_port) + self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param) # take care of signals signal.signal(signal.SIGINT, self.do_signal) - # create a uuid for this client instance - self.uuid = str(uuid.uuid4()) - logger.info("client uuid: %r", self.uuid) + self.client.wait_connected() def do_run(self, argv): """ Connect to the NRM and ask to spawn a container and run a command in it. - The NRM should notify us on the pub socket of the container - creation.""" + The NRM should reply for container info.""" # build the command as a JSON dict containing enough info. We add to # the command a container uuid as a way to make sure that we can make # the command idempotent. environ = os.environ - command = {'clientid': self.uuid, - 'ucontainername': argv.ucontainername, - 'command': 'run', + command = {'api': 'up_rpc_req', + 'type': 'run', 'manifest': argv.manifest, - 'file': argv.command, + 'path': argv.command, 'args': argv.args, 'environ': dict(environ), + 'container_uuid': str(argv.ucontainername), } + msg = RPC_MSG['run'](**command) # command fsm state = 'init' outeof = False erreof = False exitmsg = None - self.upstream_pub_socket.send_json(command) + self.client.sendmsg(msg) while(True): - msg = self.upstream_sub_socket.recv_json() - if isinstance(msg, dict) and msg.get('type') == 'container': - if msg['clientid'] == self.uuid: - if msg['event'] == 'start': - if state == 'init': - state = 'started' - logger.info("container started: %r", msg) - else: - logger.info("unexpected start message: %r", state) - exit(1) - elif msg['event'] == 'stdout': - logger.info("container msg: %r", msg) - if msg['payload'] == 'eof': - outeof = True - elif msg['event'] == 'stderr': - logger.info("container msg: %r", msg) - if msg['payload'] == 'eof': - erreof = True - elif msg['event'] == 'exit': - if state == 'started': - state = 'exiting' - exitmsg = msg - else: - logger.info("unexpected exit message: %r", msg) - elif msg['event'] == 'process_start': - if state == 'init': - state = 'started' - logger.info("process started in existing " - "container: %r""", msg) - else: - logger.info("unexpected start message: %r", state) - exit(1) - elif msg['event'] == 'process_exit': - logger.info("process ended: %r", msg) - break + msg = self.client.recvmsg() + assert msg.api == 'up_rpc_rep' + assert msg.type in ['start', 'stdout', 'stderr', 'exit', + 'process_start', 'process_exit'] + + if msg.type == 'start': + if state == 'init': + state = 'started' + logger.info("container started: %r", msg) + else: + logger.info("unexpected start message: %r", state) + exit(1) + elif msg.type == 'process_start': + if state == 'init': + state = 'started' + logger.info("process started in existing " + "container: %r""", msg) + else: + logger.info("unexpected start message: %r", state) + exit(1) + elif msg.type == 'stdout': + logger.info("container msg: %r", msg) + if msg.payload == 'eof': + outeof = True + elif msg.type == 'stderr': + logger.info("container msg: %r", msg) + if msg.payload == 'eof': + erreof = True + elif msg.type == 'process_exit': + logger.info("process ended: %r", msg) + break + elif msg.type == 'exit': + if state == 'started': + state = 'exiting' + exitmsg = msg + else: + logger.info("unexpected exit message: %r", msg) if outeof and erreof and state == 'exiting': state = 'exit' logger.info("container ended: %r", exitmsg) @@ -121,65 +105,55 @@ class CommandLineInterface(object): """Connect to the NRM and ask to list the containers present on the system. - The NRM should respond to us on the pub socket with one message listing - all containers.""" - - command = {'command': 'list', - } + The NRM should respond to us with one message listing all + containers.""" - self.upstream_pub_socket.send_json(command) - while(True): - msg = self.upstream_sub_socket.recv_json() - logger.info("new message: %r", msg) - # ignore other messages - if isinstance(msg, dict) and msg.get('type') == 'container': - if msg['event'] == 'list': - logger.info("list response: %r", msg) - break + command = {'api': 'up_rpc_req', + 'type': 'list'} + msg = RPC_MSG['list'](**command) + self.client.sendmsg(msg) + msg = self.client.recvmsg() + assert msg.api == 'up_rpc_rep' + assert msg.type == 'list' + logger.info("list response: %r", msg) def do_kill(self, argv): """Connect to the NRM and ask to kill a container by uuid. - The NRM should respond to us on the pub socket with a message - containing the exit status of the top process of the container.""" + The NRM should respond to us with a message containing the exit status + of the top process of the container.""" - command = {'command': 'kill', - 'uuid': argv.uuid + command = {'api': 'up_rpc_req', + 'type': 'kill', + 'container_uuid': argv.uuid } - - self.upstream_pub_socket.send_json(command) - while(True): - msg = self.upstream_sub_socket.recv_json() - logger.info("new message: %r", msg) - # ignore other messages - if isinstance(msg, dict) and msg.get('type') == 'container': - if msg['event'] == 'exit' and msg['uuid'] == argv.uuid: - logger.info("container exit: %r", msg) - break + msg = RPC_MSG['kill'](**command) + self.client.sendmsg(msg) + msg = self.client.recvmsg() + assert msg.api == 'up_rpc_rep' + assert msg.type == 'exit' + logger.info("container exit: %r", msg) def do_setpower(self, argv): """ Connect to the NRM and ask to change the power limit. - The NRM should answer on the pub socket with an acknowledgment.""" + The NRM should answer with an acknowledgment.""" # build the command as a JSON dict giving enough info. This is an # idempotent command, so we will repeat the command if we don't get a # timely answer. # TODO: check that the level makes a little bit of sense in the first # place - command = {'command': 'setpower', - 'limit': argv.limit, + command = {'api': 'up_rpc_req', + 'type': 'setpower', + 'limit': str(argv.limit), } - - self.upstream_pub_socket.send_json(command) - while(True): - msg = self.upstream_sub_socket.recv_json() - logger.info("new message: %r", msg) - # ignore other messages - if isinstance(msg, dict) and msg.get('type') == 'power': - if msg['limit'] == argv.limit: - logger.info("command received by the daemon") - break + msg = RPC_MSG['setpower'](**command) + self.client.sendmsg(msg) + msg = self.client.recvmsg() + assert msg.api == 'up_rpc_rep' + assert msg.type == 'getpower' + logger.info("command received by the daemon: %r", msg) def main(self): parser = argparse.ArgumentParser() diff --git a/nrm/containers.py b/nrm/containers.py index 4a5cd484262fb0c8344c4cc809ac40e543efcf23..a05149f78c5619cc7aef359ef07e1543b8815dc1 100644 --- a/nrm/containers.py +++ b/nrm/containers.py @@ -37,7 +37,7 @@ class ContainerManager(object): command = request['file'] args = request['args'] environ = request['environ'] - ucontainername = request['ucontainername'] + ucontainername = request['uuid'] logger.info("run: manifest file: %s", manifestfile) logger.info("run: command: %s", command) logger.info("run: args: %r", args) diff --git a/nrm/daemon.py b/nrm/daemon.py index 354ea069c7af00bc31139732aff96be2f9dcf78b..03b7be5bdb66bb068aeb240e276932c853613690 100644 --- a/nrm/daemon.py +++ b/nrm/daemon.py @@ -13,7 +13,11 @@ from sensor import SensorManager import signal import zmq from zmq.eventloop import ioloop, zmqstream +from nrm.messaging import MSGTYPES +from nrm.messaging import UpstreamRPCServer, UpstreamPubServer +RPC_MSG = MSGTYPES['up_rpc_rep'] +PUB_MSG = MSGTYPES['up_pub'] logger = logging.getLogger('nrm') @@ -63,97 +67,107 @@ class Daemon(object): logger.error("unknown event: %r", event) return - def do_upstream_receive(self, parts): - logger.info("receiving upstream message: %r", parts) - if len(parts) != 1: - logger.error("unexpected msg length, dropping it: %r", parts) - return - msg = json.loads(parts[0]) - if isinstance(msg, dict): - command = msg.get('command') - # TODO: switch to a dispatch dictionary - if command is None: - logger.error("missing command in message: %r", msg) - return - if command == 'setpower': - self.target = float(msg['limit']) - logger.info("new target measure: %g", self.target) - elif command == 'run': - logger.info("new container will be created if it doesn't " - "exist: %r", msg) - pid, container = self.container_manager.create(msg) - cid = container.uuid - clientid = container.clientids[pid] - - # TODO: obviously we need to send more info than that - update = {'type': 'container', - 'uuid': cid, - 'clientid': clientid, + def do_upstream_receive(self, msg, client): + if msg.type == 'setpower': + self.target = float(msg.limit) + logger.info("new target measure: %g", self.target) + update = {'api': 'up_rpc_rep', + 'type': 'getpower', + 'limit': str(self.target) + } + self.upstream_rpc_server.sendmsg(RPC_MSG['getpower'](**update), + client) + elif msg.type == 'run': + container_uuid = msg.container_uuid + params = {'manifest': msg.manifest, + 'file': msg.path, + 'args': msg.args, + 'uuid': msg.container_uuid, + 'environ': msg.environ, + 'clientid': client, + } + pid, container = self.container_manager.create(params) + container_uuid = container.uuid + if len(container.processes.keys()) == 1: + if container.power['policy']: + container.power['manager'] = PowerPolicyManager( + container.resources['cpus'], + container.power['policy'], + float(container.power['damper']), + float(container.power['slowdown'])) + if container.power['profile']: + p = container.power['profile'] + p['start'] = self.machine_info['energy']['energy'] + p['start']['time'] = self.machine_info['time'] + update = {'api': 'up_rpc_rep', + 'type': 'start', + 'container_uuid': container_uuid, 'errno': 0 if container else -1, 'pid': pid, + 'power': container.power['policy'] or dict() } - - if len(container.processes.keys()) == 1: - update['event'] = 'start' - if container.power['policy']: - container.power['manager'] = PowerPolicyManager( - container.resources['cpus'], - container.power['policy'], - float(container.power['damper']), - float(container.power['slowdown'])) - if container.power['profile']: - p = container.power['profile'] - p['start'] = self.machine_info['energy']['energy'] - p['start']['time'] = self.machine_info['time'] - update['power'] = container.power['policy'] - - else: - update['event'] = 'process_start' - + self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update), + client) # setup io callbacks - outcb = partial(self.do_children_io, clientid, cid, 'stdout') - errcb = partial(self.do_children_io, clientid, cid, 'stderr') + outcb = partial(self.do_children_io, client, + container_uuid, 'stdout') + errcb = partial(self.do_children_io, client, + container_uuid, 'stderr') container.processes[pid].stdout.read_until_close(outcb, outcb) container.processes[pid].stderr.read_until_close(errcb, errcb) - - self.upstream_pub.send_json(update) - elif command == 'kill': - logger.info("asked to kill container: %r", msg) - response = self.container_manager.kill(msg['uuid']) - # no update here, as it will trigger child exit - elif command == 'list': - logger.info("asked for container list: %r", msg) - response = self.container_manager.list() - update = {'type': 'container', - 'event': 'list', - 'payload': response, - } - self.upstream_pub.send_json(update) else: - logger.error("invalid command: %r", command) + update = {'api': 'up_rpc_rep', + 'type': 'process_start', + 'container_uuid': container_uuid, + } + self.upstream_rpc_server.sendmsg( + RPC_MSG['process_start'](**update), client) + # setup io callbacks + outcb = partial(self.do_children_io, client, + container_uuid, 'stdout') + errcb = partial(self.do_children_io, client, + container_uuid, 'stderr') + container.processes[pid].stdout.read_until_close(outcb, outcb) + container.processes[pid].stderr.read_until_close(errcb, errcb) + + elif msg.type == 'kill': + logger.info("asked to kill container: %r", msg) + response = self.container_manager.kill(msg.container_uuid) + # no update here, as it will trigger child exit + elif msg.type == 'list': + logger.info("asked for container list: %r", msg) + response = self.container_manager.list() + update = {'api': 'up_rpc_rep', + 'type': 'list', + 'payload': response, + } + self.upstream_rpc_server.sendmsg(RPC_MSG['list'](**update), + client) + else: + logger.error("invalid command: %r", msg.type) - def do_children_io(self, clientid, uuid, io, data): + def do_children_io(self, client, container_uuid, io, data): """Receive data from one of the children, and send it down the pipe. Meant to be partially defined on a children basis.""" - logger.info("%r received %r data: %r", uuid, io, data) - update = {'type': 'container', - 'event': io, - 'uuid': uuid, - 'clientid': clientid, + logger.info("%r received %r data: %r", container_uuid, io, data) + update = {'api': 'up_rpc_rep', + 'type': io, + 'container_uuid': container_uuid, 'payload': data or 'eof', } - self.upstream_pub.send_json(update) + self.upstream_rpc_server.sendmsg(RPC_MSG[io](**update), client) def do_sensor(self): self.machine_info = self.sensor_manager.do_update() logger.info("current state: %r", self.machine_info) total_power = self.machine_info['energy']['power']['total'] - msg = {'type': 'power', + msg = {'api': 'up_pub', + 'type': 'power', 'total': total_power, 'limit': self.target } - self.upstream_pub.send_json(msg) + self.upstream_pub_server.sendmsg(PUB_MSG['power'](**msg)) logger.info("sending sensor message: %r", msg) def do_control(self): @@ -193,14 +207,13 @@ class Daemon(object): clientid = container.clientids[pid] remaining_pids = [p for p in container.processes.keys() if p != pid] - msg = {'type': 'container', - 'status': status, - 'uuid': container.uuid, - 'clientid': clientid, + msg = {'api': 'up_rpc_rep', + 'status': str(status), + 'container_uuid': container.uuid, } if not remaining_pids: - msg['event'] = 'exit' + msg['type'] = 'exit' pp = container.power if pp['policy']: pp['manager'].reset_all() @@ -219,15 +232,18 @@ class Daemon(object): container.uuid, diff) msg['profile_data'] = diff self.container_manager.delete(container.uuid) + self.upstream_rpc_server.sendmsg( + RPC_MSG['exit'](**msg), clientid) else: - msg['event'] = 'process_exit' + msg['type'] = 'process_exit' # Remove the pid of process that is finished container.processes.pop(pid, None) self.container_manager.pids.pop(pid, None) logger.info("Process %s in Container %s has finised.", pid, container.uuid) + self.upstream_rpc_server.sendmsg( + RPC_MSG['process_exit'](**msg), clientid) - self.upstream_pub.send_json(msg) else: logger.debug("child update ignored") pass @@ -240,44 +256,37 @@ class Daemon(object): # Bind address for downstream clients bind_address = '*' - # PUB port for upstream clients + # port for upstream PUB API upstream_pub_port = 2345 - # SUB port for upstream clients - upstream_sub_port = 3456 + # port for upstream RPC API + upstream_rpc_port = 3456 # setup application listening socket context = zmq.Context() downstream_pub_socket = context.socket(zmq.PUB) downstream_sub_socket = context.socket(zmq.SUB) - upstream_pub_socket = context.socket(zmq.PUB) - upstream_sub_socket = context.socket(zmq.SUB) - downstream_pub_param = "ipc:///tmp/nrm-downstream-out" downstream_sub_param = "ipc:///tmp/nrm-downstream-in" upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port) - upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port) + upstream_rpc_param = "tcp://%s:%d" % (bind_address, upstream_rpc_port) downstream_pub_socket.bind(downstream_pub_param) downstream_sub_socket.bind(downstream_sub_param) downstream_sub_filter = "" downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter) - upstream_pub_socket.bind(upstream_pub_param) - upstream_sub_socket.bind(upstream_sub_param) - upstream_sub_filter = "" - upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter) + self.upstream_pub_server = UpstreamPubServer(upstream_pub_param) + self.upstream_rpc_server = UpstreamRPCServer(upstream_rpc_param) logger.info("downstream pub socket bound to: %s", downstream_pub_param) logger.info("downstream sub socket bound to: %s", downstream_sub_param) logger.info("upstream pub socket bound to: %s", upstream_pub_param) - logger.info("upstream sub socket connected to: %s", upstream_sub_param) + logger.info("upstream rpc socket connected to: %s", upstream_rpc_param) # register socket triggers self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket) self.downstream_sub.on_recv(self.do_downstream_receive) - self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket) - self.upstream_sub.on_recv(self.do_upstream_receive) + self.upstream_rpc_server.setup_recv_callback(self.do_upstream_receive) # create a stream to let ioloop deal with blocking calls on HWM - self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket) self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket) # create managers diff --git a/nrm/messaging.py b/nrm/messaging.py index f726515d090d99e2a246f156c6b6e0eb2d90220d..021974daad3142999c5f77d866cccc506c9e1b51 100644 --- a/nrm/messaging.py +++ b/nrm/messaging.py @@ -24,7 +24,8 @@ MSGFORMATS['up_rpc_req'] = {'list': {}, 'run': {'manifest': basestring, 'path': basestring, 'args': list, - 'container_uuid': basestring}, + 'container_uuid': basestring, + 'environ': dict}, 'kill': {'container_uuid': basestring}, 'setpower': {'limit': basestring}, } @@ -40,6 +41,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring, 'exit': {'container_uuid': basestring, 'status': basestring, 'profile_data': dict}, + 'process_start': {'container_uuid': basestring}, + 'process_exit': {'container_uuid': basestring, + 'status': basestring}, 'getpower': {'limit': basestring}, } MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float}}