#!/usr/bin/env python2
"""Command line client for the NRM daemon.

Talks to the daemon through ``nrm.messaging``: an upstream RPC client is
used for the ``run``, ``kill``, ``list`` and ``setpower`` sub-commands, and
an upstream pub client is used for ``listen``.
"""

from __future__ import print_function

import argparse
import collections
import logging
import os
import signal
import sys
import time
import uuid

import nrm.messaging

RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm')
KillArgs = collections.namedtuple("Kill", ["uuid"])


def _expect_reply(msg, *types):
    """Check that `msg` is an upstream RPC reply of one of `types`.

    Raises AssertionError otherwise. This is an explicit raise rather than
    an `assert` statement so that the check is not stripped under `-O`.
    """
    # NOTE: wrap msg in a 1-tuple so %-formatting never unpacks it, should
    # the message object itself be a tuple.
    if msg.api != 'up_rpc_rep':
        raise AssertionError("unexpected reply api: %r" % (msg,))
    if msg.type not in types:
        raise AssertionError("unexpected reply type: %r" % (msg,))


class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
        pass

    def do_signal(self, uuid, signum, stackframe):
        """Handle a signal: optionally kill a container, then exit.

        uuid: container uuid to kill through the RPC client, or a falsy
            value when there is no container to kill.
        signum: number of the received signal (only used for logging).
        stackframe: frame passed by the signal machinery (unused).

        Always terminates the process with exit status 130.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid,
                       }
            self.client.sendmsg(RPC_MSG['kill'](**command))
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # sys.exit rather than the `exit` builtin, which is only injected
        # by the `site` module and is not guaranteed to exist.
        sys.exit(130)

    def setup(self):
        """Create the upstream RPC client and wait for its connection.

        Also installs a SIGINT handler that exits without killing any
        container (none is known at this point).
        """
        # upstream RPC port
        upstream_client_port = 3456
        upstream_client_param = "tcp://localhost:%d" % (upstream_client_port)
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)

        # take care of signals
        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)

        self.client.wait_connected()

    @staticmethod
    def _print_pub_message(msg, msg_filter):
        """Print one pub/sub message to stdout, honoring `msg_filter`.

        Without a filter, every message is printed as "type, timestamp".
        With a filter, only messages whose type equals the filter are
        printed, followed by their payload ("performance"/"progress") or
        their total ("power"). stdout is flushed after printing so that
        consumers reading through a pipe see messages promptly.
        """
        if not msg_filter:
            print("%s, %s" % (msg.type, time.time()))
            sys.stdout.flush()
            return
        if msg_filter != msg.type:
            return
        if msg.type == "performance" or msg.type == "progress":
            print("%s, %s, %s" % (msg.type, time.time(), msg.payload))
        if msg.type == "power":
            print("%s, %s, %s" % (msg.type, time.time(), msg.total))
        sys.stdout.flush()

    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages.

        Loops forever, printing each received message according to
        `argv.filter` (see `_print_pub_message`).
        """
        upstream_pub_port = 2345
        upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.wait_connected()

        while True:
            msg = self.pub_client.recvmsg()
            logger.debug("pub message: %s", msg)
            self._print_pub_message(msg, argv.filter)
            # Filtering on argv.uuid is currently disabled:
            # if argv.uuid:
            #     uuid = getattr(msg, 'container_uuid', None)
            #     if argv.uuid == uuid or msg.type == "power":
            #         print_if_filter()
            # else:
            #     print_if_filter()

    @staticmethod
    def _sanitize_manifest(path):
        """Return the absolute path of the manifest file at `path`.

        Logs an error and exits with status 1 if `path` is not a file.
        """
        if not os.path.isfile(path):
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)
        return os.path.abspath(path)

    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info. The command's stdout and
        stderr payloads are relayed to this process' stdout and stderr, and
        the process exits with the status carried by the 'process_exit'
        message once both streams have reported 'eof'.
        """
        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        environ = os.environ
        container_uuid = argv.ucontainername or str(uuid.uuid4())

        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': self._sanitize_manifest(argv.manifest),
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(environ),
                   'container_uuid': container_uuid,
                   }
        msg = RPC_MSG['run'](**command)

        # command fsm
        state = 'init'
        outeof = False
        erreof = False
        exitmsg = None
        self.client.sendmsg(msg)

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        _expect_reply(msg, 'process_start')

        # Capture the uuid now: `msg` is rebound by the loop below, so a
        # handler reading `msg.container_uuid` lazily would look at whatever
        # message happened to be received last.
        started_uuid = msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        state = 'started'
        while True:
            msg = self.client.recvmsg()
            _expect_reply(msg, 'stdout', 'stderr', 'exit', 'process_exit')

            if msg.type == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload)
            elif msg.type == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                state = 'exiting'
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

            # Only leave once the process has exited AND both output
            # streams are drained, so no trailing output is lost.
            if outeof and erreof and state == 'exiting':
                state = 'exit'
                logger.info("command ended: %r", exitmsg)
                sys.exit(int(exitmsg.status))

    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us with one message listing all
        containers. The reply is logged at INFO level.
        """
        command = {'api': 'up_rpc_req',
                   'type': 'list'}
        msg = RPC_MSG['list'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        _expect_reply(msg, 'list')
        logger.info("list response: %r", msg)

    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us with a message containing the exit
        status of the top process of the container. The reply is logged at
        INFO level.
        """
        command = {'api': 'up_rpc_req',
                   'type': 'kill',
                   'container_uuid': argv.uuid,
                   }
        msg = RPC_MSG['kill'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        _expect_reply(msg, 'exit')
        logger.info("container exit: %r", msg)

    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer with an acknowledgment, which is logged at
        INFO level.
        """
        # build the command as a JSON dict giving enough info. This is an
        # idempotent command, so we will repeat the command if we don't get a
        # timely answer.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        command = {'api': 'up_rpc_req',
                   'type': 'setpower',
                   'limit': str(argv.limit),
                   }
        msg = RPC_MSG['setpower'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        # the acknowledgment to 'setpower' is expected with type 'getpower'
        _expect_reply(msg, 'getpower')
        logger.info("command received by the daemon: %r", msg)

    def main(self):
        """Parse the command line, connect to the NRM and run the
        requested sub-command (run, kill, list, listen or setpower)."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
        parser_run.add_argument("-u", "--ucontainername",
                                help="user-specified name for container "
                                     "used to attach proceses",
                                nargs='?', const=None, default=None)
        parser_run.set_defaults(func=self.do_run)

        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        # Adjacent string literals instead of a backslash continuation
        # inside the literal, which leaked a stray backslash into the help.
        parser_listen.add_argument("-f", "--filter",
                                   help="type of message to filter and "
                                        "prettyprint, in {power,performance}",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
        parser_setpower.add_argument("limit",
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)

        args = parser.parse_args()
        # Some argparse versions accept a missing sub-command, in which
        # case no `func` default is set; report that as a usage error
        # instead of failing with AttributeError.
        if not hasattr(args, 'func'):
            parser.error("too few arguments")
        if args.verbose:
            logger.setLevel(logging.DEBUG)

        self.setup()
        args.func(args)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    cli = CommandLineInterface()
    cli.main()