#!/usr/bin/env python2 ############################################################################### # Copyright 2019 UChicago Argonne, LLC. # (c.f. AUTHORS, LICENSE) # # This file is part of the NRM project. # For more info, see https://xgitlab.cels.anl.gov/argo/nrm # # SPDX-License-Identifier: BSD-3-Clause ############################################################################### from __future__ import print_function import argparse import logging import signal import os import nrm.messaging import uuid import sys import time logger = logging.getLogger('nrm') class CommandLineInterface(object): """Implements a command line interface to the NRM.""" def __init__(self): pass def do_signal(self, uuid, signum, stackframe): if uuid: logger.info("received signal %d, killing the application..", signum) self.client.send( "Kill", container_uuid=uuid) logger.info("killed the application, exiting.") else: logger.info("received signal %d, exiting", signum) exit(130) def setup(self): # upstream RPC port upstream_client_port = 3456 upstream_client_param = "tcp://localhost:%d" % (upstream_client_port) self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param) # take care of signals def handler(signum, frame): self.do_signal(None, signum, frame) signal.signal(signal.SIGINT, handler) signal.signal(signal.SIGTERM, handler) self.client.connect() def do_listen(self, argv): """ Connect to the NRM and listen for pub/sub messages.""" upstream_pub_port = 2345 upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port) self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param) self.pub_client.connect() while(True): msg = self.pub_client.recv() logger.debug("pub message: %s", msg) def print_if_filter(): if argv.filter: if argv.filter == msg.tag: if (msg.tag == "performance" or msg.tag == "progress"): print("%s, %s, %s" % (msg.tag, time.time(), msg.payload)) if msg.tag == "power": print("%s, %s, %s" % (msg.tag, time.time(), msg.total)) if msg.tag == "exit": print("%s, %s, %s" % (msg.tag, time.time(), msg.profile_data)) else: print("%s, %s" % (msg.tag, time.time())) sys.stdout.flush() print_if_filter() def do_run(self, argv): """ Connect to the NRM and ask to spawn a container and run a command in it. The NRM should reply for container info.""" # build the command as a JSON dict containing enough info. We add to # the command a container uuid as a way to make sure that we can make # the command idempotent. environ = os.environ # environ = [] container_uuid = argv.ucontainername or str(uuid.uuid4()) # simple check + error msg + non-zero return code def sanitize_manifest(path): if os.path.isfile(path): return(os.path.abspath(path)) else: logger.error("Manifest file not found: %s", path) sys.exit(1) # command fsm state = 'init' outeof = False erreof = False exitmsg = None self.client.send( tag="run", manifest=sanitize_manifest(argv.manifest), path=argv.command, args=argv.args, environ=dict(environ), container_uuid=container_uuid) # the first message tells us if we started a container or not msg = self.client.recv() assert msg.tag == 'start' def handler(signum, frame): self.do_signal(msg.container_uuid, signum, frame) signal.signal(signal.SIGINT, handler) state = 'started' while(True): msg = self.client.recv() assert msg.tag in ['stdout', 'stderr', 'exit'] if msg.tag == 'stdout': logger.debug("container msg: %r", msg) if msg.payload == 'eof': outeof = True else: print(msg.payload, file=sys.stdout) sys.stdout.flush() elif msg.tag == 'stderr': logger.debug("container msg: %r", msg) if msg.payload == 'eof': erreof = True else: print(msg.payload, file=sys.stderr) sys.stderr.flush() elif msg.tag == 'exit': state = 'exiting' exitmsg = msg else: logger.error("unexpected message: %r", msg) if outeof and erreof and state == 'exiting': state = 'exit' istatus = int(exitmsg.status) logger.debug("command ended with istatus %r.", exitmsg) if os.WIFSIGNALED(istatus): logger.error("command ended due to signal %s" % str(os.WTERMSIG(istatus))) sys.exit(1) elif os.WIFEXITED(istatus): s = int(os.WTERMSIG(istatus)) if s > 0: logger.error("command ended with exit code %s" % str(s)) sys.exit(s) else: logger.error( "non-compliant exit code received from" "daemon: %s" % str(exitmsg.status)) sys.exit(1) break def do_list(self, argv): """Connect to the NRM and ask to list the containers present on the system. The NRM should respond to us with one message listing all containers.""" self.client.send(tag="list") msg = self.client.recv() assert msg.tag == 'list' logger.info("list response: %r", msg) def do_kill(self, argv): """Connect to the NRM and ask to kill a container by uuid. The NRM should respond to us with a message containing the exit status of the top process of the container.""" self.client.send(tag="kill", container_uuid=argv.uuid) msg = self.client.recv() assert msg.tag == 'exit' logger.info("container exit: %r", msg) def do_setpower(self, argv): """ Connect to the NRM and ask to change the power limit. The NRM should answer with an acknowledgment.""" # build the command as a JSON dict giving enough info. This is an # idempotent command, so we will repeat the command if we don't get a # timely answer. # TODO: check that the level makes a little bit of sense in the first # place self.client.send(tag="setPower", limit=str(argv.limit)) msg = self.client.recv() assert msg.tag == 'getPower' logger.info("command received by the daemon: %r", msg) def main(self): parser = argparse.ArgumentParser() parser.add_argument("-v", "--verbose", help="verbose logging information", action='store_true') subparsers = parser.add_subparsers() # run container parser_run = subparsers.add_parser("run") parser_run.add_argument("manifest", help="manifest file to apply") parser_run.add_argument("command", help="command to execute") parser_run.add_argument("args", help="command arguments", nargs=argparse.REMAINDER) parser_run.add_argument("-u", "--ucontainername", help="""user-specified name for container used to attach proceses""", nargs='?', const=None, default=None) parser_run.set_defaults(func=self.do_run) # kill container parser_kill = subparsers.add_parser("kill") parser_kill.add_argument("uuid", help="uuid of the container") parser_kill.set_defaults(func=self.do_kill) # list containers parser_list = subparsers.add_parser("list") parser_list.set_defaults(func=self.do_list) # listen parser_listen = subparsers.add_parser("listen") parser_listen.add_argument("-u", "--uuid", help="container uuid to listen for", default=None) parser_listen.add_argument("-f", "--filter", help="type of message to filter and" " prettyprint, in {power,performance}", default=None) parser_listen.set_defaults(func=self.do_listen) # setpowerlimit parser_setpower = subparsers.add_parser("setpower") parser_setpower.add_argument("-f", "--follow", help="listen for power changes", action='store_true') parser_setpower.add_argument("limit", help="set new power limit", type=float) parser_setpower.set_defaults(func=self.do_setpower) args = parser.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) self.setup() args.func(args) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) cli = CommandLineInterface() cli.main()