#!/usr/bin/env python2

###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

from __future__ import print_function
import argparse
import logging
import signal
import os
import nrm.messaging
import uuid
import sys
import time
import collections
logger = logging.getLogger('nrm')

# Constructors for upstream RPC request messages, indexed by request type
# (this file uses 'run', 'kill', 'list' and 'setpower').
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']

# NOTE(review): not referenced by the CLI code in this file; possibly unused.
KillArgs = collections.namedtuple("Kill", ("uuid",))

class CommandLineInterface(object):

    """Implements a command line interface to the NRM.

    Each ``do_*`` method implements one subcommand and receives the parsed
    argparse namespace; ``main`` wires them to the command line.
    """

    # Local TCP ports of the NRM daemon: upstream RPC (request/reply) socket
    # and upstream pub/sub socket.
    upstream_rpc_port = 3456
    upstream_pub_port = 2345

    def __init__(self):
        # ``client`` is created by setup(); ``pub_client`` by do_listen().
        self.client = None
        self.pub_client = None

    def do_signal(self, uuid, signum, stackframe):
        """Handle a termination signal received by the CLI.

        If ``uuid`` is set, a ``kill`` RPC for that container is sent to the
        NRM first. The process then exits with ``128 + signum`` (the shell
        convention, i.e. 130 for SIGINT). ``stackframe`` is unused and only
        present to match the signal handler calling convention.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid,
                       }
            self.client.sendmsg(RPC_MSG['kill'](**command))
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # sys.exit rather than the site-provided exit(), which may be absent.
        sys.exit(128 + signum)

    def setup(self):
        """Create and connect the upstream RPC client.

        Also installs SIGINT/SIGTERM handlers that exit without contacting
        the daemon; commands may replace them (see do_run).
        """
        upstream_client_param = "tcp://localhost:%d" % self.upstream_rpc_port
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)

        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

        self.client.connect()

    @staticmethod
    def _format_pub_message(msg, msg_filter, timestamp):
        """Return the line ``do_listen`` prints for ``msg``, or None.

        Without a filter every message is rendered as ``"type, timestamp"``.
        With a filter, only messages whose ``type`` equals it are rendered,
        as ``"type, timestamp, detail"`` where the detail is ``payload`` for
        performance/progress, ``total`` for power and ``profile_data`` for
        container_exit; other types yield None.
        """
        if not msg_filter:
            return "%s, %s" % (msg.type, timestamp)
        if msg_filter != msg.type:
            return None
        if msg.type in ("performance", "progress"):
            detail = msg.payload
        elif msg.type == "power":
            detail = msg.total
        elif msg.type == "container_exit":
            detail = msg.profile_data
        else:
            return None
        return "%s, %s, %s" % (msg.type, timestamp, detail)

    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages forever."""
        upstream_pub_param = "tcp://localhost:%d" % self.upstream_pub_port
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.connect()

        # NOTE(review): ``--uuid`` is parsed but not applied; a previous
        # per-container filter (match msg.container_uuid, always let "power"
        # through) was disabled. Restore deliberately if needed.
        while True:
            msg = self.pub_client.recvmsg()
            logger.debug("pub message: %s", msg)
            line = self._format_pub_message(msg, argv.filter, time.time())
            if line is not None:
                print(line)
                sys.stdout.flush()

    @staticmethod
    def _sanitize_manifest(path):
        """Return the absolute path of the manifest file ``path``.

        Logs an error and exits with status 1 if ``path`` is not a file.
        """
        if not os.path.isfile(path):
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)
        return os.path.abspath(path)

    @staticmethod
    def _decode_exit_status(status):
        """Decode the wait(2)-style status word reported by the daemon.

        Returns ``('signal', signum)`` if the process was killed by a signal,
        ``('exit', code)`` if it exited normally, ``('unknown', None)``
        otherwise. ``status`` may be an int or a numeric string.
        """
        istatus = int(status)
        if os.WIFSIGNALED(istatus):
            return 'signal', os.WTERMSIG(istatus)
        if os.WIFEXITED(istatus):
            # WEXITSTATUS, not WTERMSIG: the latter is 0 for normal exits.
            return 'exit', os.WEXITSTATUS(istatus)
        return 'unknown', None

    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info, then stream the command's
        stdout/stderr and finally its exit status. The CLI exits with the
        command's exit code, or 1 if it was killed by a signal or the status
        could not be decoded.
        """
        # The container uuid (user-provided or random) is part of the request
        # so that the command can be made idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())
        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': self._sanitize_manifest(argv.manifest),
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }
        self.client.sendmsg(RPC_MSG['run'](**command))

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'process_start'

        # Capture the uuid now: ``msg`` is rebound below, and the handler
        # must not depend on whichever message happens to be latest.
        started_uuid = msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        # Like setup(), treat SIGTERM the same as SIGINT.
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

        outeof = False
        erreof = False
        exitmsg = None
        # Stop only once both streams reached EOF AND the process exited, so
        # no trailing output is lost.
        while not (outeof and erreof and exitmsg is not None):
            msg = self.client.recvmsg()
            assert msg.api == 'up_rpc_rep'
            assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']

            if msg.type == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload)
                    sys.stdout.flush()
            elif msg.type == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
                    sys.stderr.flush()
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

        logger.info("command ended: %r", exitmsg)
        kind, value = self._decode_exit_status(exitmsg.status)
        if kind == 'signal':
            logger.error("command ended due to signal %s", value)
            sys.exit(1)
        if kind == 'exit':
            if value > 0:
                logger.debug("command ended with exit code %s", value)
            sys.exit(value)
        logger.error("non-compliant exit code received from daemon: %s",
                     exitmsg.status)
        sys.exit(1)

    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us with one message listing all
        containers."""
        command = {'api': 'up_rpc_req',
                   'type': 'list'}
        self.client.sendmsg(RPC_MSG['list'](**command))
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'list'
        logger.info("list response: %r", msg)

    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us with a message containing the exit status
        of the top process of the container."""
        command = {'api': 'up_rpc_req',
                   'type': 'kill',
                   'container_uuid': argv.uuid,
                   }
        self.client.sendmsg(RPC_MSG['kill'](**command))
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'exit'
        logger.info("container exit: %r", msg)

    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer with an acknowledgment (a 'getpower' reply)."""
        # TODO: check that the level makes a little bit of sense in the first
        # place.
        # NOTE(review): the request is sent once (no retry on timeout), and
        # ``--follow`` is parsed but not implemented here.
        command = {'api': 'up_rpc_req',
                   'type': 'setpower',
                   'limit': str(argv.limit),
                   }
        self.client.sendmsg(RPC_MSG['setpower'](**command))
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'getpower'
        logger.info("command received by the daemon: %r", msg)

    def main(self):
        """Parse the command line, connect to the NRM and run the command."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
        parser_run.add_argument("-u", "--ucontainername",
                                help="user-specified name for container "
                                     "used to attach processes",
                                nargs='?', const=None, default=None)
        parser_run.set_defaults(func=self.do_run)

        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.add_argument("-f", "--filter",
                                   help="type of message to filter and"
                                        " prettyprint, in {power,performance,"
                                        "progress,container_exit}",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
        parser_setpower.add_argument("limit",
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)

        args = parser.parse_args()
        if args.verbose:
            logger.setLevel(logging.DEBUG)

        # Python 3's argparse does not require a subcommand by default; give
        # a usage error instead of an AttributeError on ``args.func``.
        if not hasattr(args, 'func'):
            parser.error("a command is required")

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # Root logging at INFO; main() lowers the 'nrm' logger to DEBUG on -v.
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()