#!/usr/bin/env python2

###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

from __future__ import print_function

import argparse
import collections
import logging
import os
import signal
import sys
import time
import uuid

import nrm.messaging

# Logger used throughout this CLI; the `-v` flag lowers its level to DEBUG.
logger = logging.getLogger('nrm')
# Upstream RPC request constructors, looked up by request type
# (e.g. RPC_MSG['run'](**fields)).
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
# Argument record for a container kill request: a single `uuid` field.
# The namedtuple typename must match the name it is bound to; pickle locates
# the class through its __name__, so a mismatch makes instances unpicklable.
KillArgs = collections.namedtuple("KillArgs", ["uuid"])

class CommandLineInterface(object):

    """Implements a command line interface to the NRM.

    Each ``do_*`` method implements one sub-command. Requests go through the
    upstream RPC client created in :meth:`setup`; ``listen`` additionally
    opens a pub/sub client.
    """

    def __init__(self):
        pass

    def do_signal(self, uuid, signum, stackframe):
        """Handle a signal delivered to the CLI, then exit with status 130.

        :param uuid: container uuid to kill before exiting; a falsy value
            means "just exit" without contacting the NRM.
        :param signum: number of the received signal.
        :param stackframe: current stack frame (unused; part of the
            ``signal`` handler signature).
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid
                       }
            msg = RPC_MSG['kill'](**command)
            self.client.sendmsg(msg)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # 130 is the shell convention for SIGINT (128 + 2), the only signal
        # this handler is installed for. Use sys.exit rather than the
        # site-provided exit() builtin, which is missing under `python -S`.
        sys.exit(130)

    def setup(self):
        """Create and connect the upstream RPC client.

        Also installs a SIGINT handler that exits without contacting the NRM;
        ``do_run`` replaces it once a container has been started.
        """
        # upstream RPC port
        upstream_client_port = 3456
        upstream_client_param = "tcp://localhost:%d" % (upstream_client_port)
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)

        # take care of signals
        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)

        self.client.connect()

    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages.

        Loops forever. Without ``--filter`` every message is printed as
        ``"<type>, <timestamp>"``. With ``--filter TYPE`` only messages of
        that type are printed, as ``"<type>, <timestamp>, <value>"``, where
        the value attribute depends on the type (see ``fields``); a filtered
        type with no known value attribute prints nothing.
        """
        upstream_pub_port = 2345
        upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.connect()

        # message type -> attribute pretty-printed for it under --filter
        fields = {'performance': 'payload',
                  'progress': 'payload',
                  'power': 'total',
                  'container_exit': 'profile_data',
                  }

        # NOTE(review): the --uuid option is parsed but not applied here; the
        # per-container filtering that used it was disabled in this code.
        while True:
            msg = self.pub_client.recvmsg()
            logger.debug("pub message: %s", msg)

            if not argv.filter:
                line = "%s, %s" % (msg.type, time.time())
            elif argv.filter == msg.type and msg.type in fields:
                line = "%s, %s, %s" % (msg.type, time.time(),
                                       getattr(msg, fields[msg.type]))
            else:
                continue
            print(line)
            # flush per message so output is usable when piped
            sys.stdout.flush()

    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info.

        After the ``process_start`` reply, the command's stdout/stderr
        payloads are relayed to this process's stdout/stderr until both
        streams have reported ``eof`` and a ``process_exit`` message has
        arrived; the CLI then exits with the process's exit status. Exits
        with status 1 if the manifest file does not exist.
        """
        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())

        # simple check + error msg + non-zero return code
        def sanitize_manifest(path):
            if os.path.isfile(path):
                return os.path.abspath(path)
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)

        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': sanitize_manifest(argv.manifest),
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }
        msg = RPC_MSG['run'](**command)
        # command fsm state
        outeof = False
        erreof = False
        exitmsg = None
        self.client.sendmsg(msg)

        # the first message tells us if we started a container or not
        start_msg = self.client.recvmsg()
        assert start_msg.api == 'up_rpc_rep'
        assert start_msg.type == 'process_start'

        # The handler must keep the process_start reply: `msg` is rebound on
        # every loop iteration below, so reading `msg.container_uuid` at
        # signal time would use whatever message happened to arrive last.
        def handler(signum, frame):
            self.do_signal(start_msg.container_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        state = 'started'
        while True:
            msg = self.client.recvmsg()
            assert msg.api == 'up_rpc_rep'
            assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']

            if msg.type == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload)
            elif msg.type == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                state = 'exiting'
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

            if outeof and erreof and state == 'exiting':
                logger.info("command ended: %r", exitmsg)
                # sys.exit raises SystemExit, which also ends the loop
                sys.exit(int(exitmsg.status))

    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us with one message listing all
        containers. The response is only logged at INFO level."""

        command = {'api': 'up_rpc_req',
                   'type': 'list'}
        msg = RPC_MSG['list'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'list'
        logger.info("list response: %r", msg)

    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us with a message containing the exit status
        of the top process of the container."""

        command = {'api': 'up_rpc_req',
                   'type': 'kill',
                   'container_uuid': argv.uuid
                   }
        msg = RPC_MSG['kill'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'exit'
        logger.info("container exit: %r", msg)

    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer with an acknowledgment (a ``getpower`` reply).
        """

        # build the command as a JSON dict giving enough info. This is an
        # idempotent command, so we will repeat the command if we don't get a
        # timely answer.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        # NOTE(review): the --follow flag is parsed but not consulted here.
        command = {'api': 'up_rpc_req',
                   'type': 'setpower',
                   'limit': str(argv.limit),
                   }
        msg = RPC_MSG['setpower'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'getpower'
        logger.info("command received by the daemon: %r", msg)

    def main(self):
        """Parse the command line, connect to the NRM and run a sub-command.

        Exits with argparse's usage error (status 2) when no sub-command is
        given.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
        parser_run.add_argument("-u", "--ucontainername",
                                help="user-specified name for container used"
                                     " to attach processes",
                                nargs='?', const=None, default=None)
        parser_run.set_defaults(func=self.do_run)

        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.add_argument("-f", "--filter",
                                   help="type of message to filter and"
                                        " prettyprint, in {power,performance,"
                                        "progress,container_exit}",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
        parser_setpower.add_argument("limit",
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)

        args = parser.parse_args()
        if args.verbose:
            logger.setLevel(logging.DEBUG)

        # On Python 3 sub-commands are optional by default, so a bare
        # invocation would otherwise crash with AttributeError on args.func.
        if getattr(args, 'func', None) is None:
            parser.error("a sub-command is required")

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # Build the CLI, send log records at INFO and above to stderr (`-v` later
    # lowers the 'nrm' logger to DEBUG), then parse argv and dispatch.
    cli = CommandLineInterface()
    logging.basicConfig(level=logging.INFO)
    cli.main()