#!/usr/bin/env python2

3 4 5 6 7 8 9 10 11 12
###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

13 14 15 16
from __future__ import print_function
import argparse
import logging
import signal
17
import os
18
import nrm.messaging
19
import uuid
20
import sys
21
import time
22

23
# Logger shared by every command in this file; main() raises its level to
# DEBUG when --verbose is given.
logger = logging.getLogger('nrm')
24

25 26 27 28 29 30

class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
31
        pass
32

33 34 35 36
    def do_signal(self, uuid, signum, stackframe):
        """Handle a termination signal: kill the container if any, then exit.

        Args:
            uuid: uuid of the container to kill before exiting, or a falsy
                value (e.g. None) when there is no container to kill. Note
                that inside this method the name shadows the ``uuid`` module.
            signum: number of the received signal; only used for logging.
            stackframe: interrupted stack frame handed over by the signal
                machinery; unused.

        Raises:
            SystemExit: always, with exit code 130.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            # NOTE(review): the other requests in this file use a lowercase
            # ``tag=`` keyword (e.g. tag="kill"); this one passes "Kill"
            # positionally -- confirm against nrm.messaging which is expected.
            self.client.send(
                    "Kill",
                    container_uuid=uuid)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # Use sys.exit() rather than the bare exit(): the latter is injected
        # by the site module and is not available under ``python -S`` or in
        # frozen builds.
        sys.exit(130)
44 45

    def setup(self):
        """Create the upstream RPC client, install signal handlers, connect.

        The client targets tcp://localhost:3456 and is stored in
        ``self.client``. SIGINT and SIGTERM are routed to do_signal() with no
        container uuid, so at this stage a signal only logs and exits.
        """
        # upstream RPC endpoint
        rpc_port = 3456
        rpc_endpoint = "tcp://localhost:%d" % rpc_port
        self.client = nrm.messaging.UpstreamRPCClient(rpc_endpoint)

        # take care of signals: no container is known yet, hence uuid=None
        def _on_signal(signum, frame):
            self.do_signal(None, signum, frame)

        for signo in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signo, _on_signal)

        self.client.connect()
58

59 60 61 62 63
    def do_listen(self, argv):
        """Connect to the NRM pub/sub endpoint and print incoming messages.

        Without ``argv.filter`` every message is printed as "tag, timestamp".
        With a filter, only messages whose tag equals the filter are printed,
        together with one tag-specific attribute; matching tags outside the
        table below print nothing. Loops forever; stdout is flushed after
        each received message.

        Args:
            argv: parsed arguments; only ``argv.filter`` is read here.
        """
        pub_port = 2345
        pub_endpoint = "tcp://localhost:%d" % pub_port
        self.pub_client = nrm.messaging.UpstreamPubClient(pub_endpoint)
        self.pub_client.connect()

        # message attribute printed for each tag when that tag is the filter
        field_for_tag = {
            "performance": "payload",
            "progress": "payload",
            "power": "total",
            "exit": "profile_data",
        }

        while True:
            msg = self.pub_client.recv()
            logger.debug("pub message: %s", msg)

            wanted = argv.filter
            if not wanted:
                print("%s, %s" % (msg.tag, time.time()))
            elif wanted == msg.tag and wanted in field_for_tag:
                print("%s, %s, %s" % (msg.tag, time.time(),
                                      getattr(msg, field_for_tag[wanted])))
            sys.stdout.flush()
87

88
    def do_run(self, argv):
        """Ask the NRM to spawn a container, run a command in it and relay
        its output.

        A "run" request is sent and a "start" reply is expected. Afterwards
        "stdout"/"stderr" payloads are printed on the matching local stream
        until both streams have reported 'eof' and an "exit" message has been
        received; the process then exits with a code derived from the wait
        status carried by that "exit" message.

        Args:
            argv: parsed arguments providing ``manifest``, ``command``,
                ``args`` and ``ucontainername``.

        Raises:
            SystemExit: with code 1 if the manifest file does not exist, if
                the command was terminated by a signal, or if the reported
                status is neither a signal nor a normal exit; otherwise with
                the command's own exit code.
            AssertionError: if a reply carries an unexpected tag.
        """

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())

        # simple check + error msg + non-zero return code
        def sanitize_manifest(path):
            if os.path.isfile(path):
                return os.path.abspath(path)
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)

        outeof = False
        erreof = False
        exitmsg = None
        self.client.send(
                tag="run",
                manifest=sanitize_manifest(argv.manifest),
                path=argv.command,
                args=argv.args,
                environ=dict(os.environ),
                container_uuid=container_uuid)

        # the first message tells us if we started a container or not
        start_msg = self.client.recv()
        assert start_msg.tag == 'start'

        # The handler reads the uuid from the start message, held under its
        # own name: the receive loop below rebinds ``msg`` on every
        # iteration, so closing over ``msg`` would read the uuid from
        # whichever message happened to arrive last.
        # NOTE(review): only SIGINT is re-registered here; SIGTERM keeps the
        # handler installed by setup(), which does not kill the container.
        def handler(signum, frame):
            self.do_signal(start_msg.container_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        while True:
            msg = self.client.recv()
            assert msg.tag in ['stdout', 'stderr', 'exit']

            if msg.tag == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload, file=sys.stdout)
                    sys.stdout.flush()
            elif msg.tag == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
                    sys.stderr.flush()
            elif msg.tag == 'exit':
                exitmsg = msg
            else:
                # only reachable when asserts are disabled (python -O)
                logger.error("unexpected message: %r", msg)

            # wait for both streams to be drained and for the exit message
            if outeof and erreof and exitmsg is not None:
                istatus = int(exitmsg.status)
                logger.debug("command ended with istatus %r.", exitmsg)
                if os.WIFSIGNALED(istatus):
                    logger.error("command ended due to signal %s",
                                 os.WTERMSIG(istatus))
                    sys.exit(1)
                elif os.WIFEXITED(istatus):
                    # WEXITSTATUS, not WTERMSIG: the latter is always 0 for
                    # a normally-exited process, which would hide failures.
                    code = os.WEXITSTATUS(istatus)
                    if code > 0:
                        logger.error("command ended with exit code %s",
                                     code)
                    sys.exit(code)
                else:
                    logger.error(
                            "non-compliant exit code received from "
                            "daemon: %s", exitmsg.status)
                    sys.exit(1)
174

175 176 177 178
    def do_list(self, argv):
        """Ask the NRM for the containers present on the system.

        A single "list" request is sent; the reply must carry the 'list' tag
        and is logged at INFO level.

        Args:
            argv: parsed arguments; unused.
        """
        self.client.send(tag="list")
        reply = self.client.recv()
        assert reply.tag == 'list'
        logger.info("list response: %r", reply)
186

Swann Perarnau's avatar
Swann Perarnau committed
187 188 189
    def do_kill(self, argv):
        """Ask the NRM to kill a container identified by its uuid.

        A "kill" request is sent; the reply must carry the 'exit' tag and is
        logged at INFO level.

        Args:
            argv: parsed arguments; ``argv.uuid`` names the container.
        """
        self.client.send(tag="kill", container_uuid=argv.uuid)
        reply = self.client.recv()
        assert reply.tag == 'exit'
        logger.info("container exit: %r", reply)
Swann Perarnau's avatar
Swann Perarnau committed
197

198 199 200
    def do_setpower(self, argv):
        """Ask the NRM to change the power limit.

        A "setPower" request carrying the limit as a string is sent; the
        reply must carry the 'getPower' tag and is logged at INFO level.

        Args:
            argv: parsed arguments; ``argv.limit`` is the new power limit.
        """
        # TODO: check that the level makes a little bit of sense in the first
        # place
        requested_limit = str(argv.limit)
        self.client.send(tag="setPower", limit=requested_limit)
        reply = self.client.recv()
        assert reply.tag == 'getPower'
        logger.info("command received by the daemon: %r", reply)
212 213 214 215 216 217 218 219 220 221

    def main(self):
        """Parse the command line, connect to the NRM and run the subcommand.

        Subcommands are run, kill, list, listen and setpower; each one stores
        its handler method as ``func`` in the parsed arguments. After
        parsing, --verbose switches the logger to DEBUG, setup() is called,
        and the selected handler is invoked with the parsed arguments.
        """
        cli_parser = argparse.ArgumentParser()
        cli_parser.add_argument(
            "-v", "--verbose",
            action='store_true',
            help="verbose logging information")
        commands = cli_parser.add_subparsers()

        # run: execute a command inside a container
        run_cmd = commands.add_parser("run")
        run_cmd.add_argument("manifest", help="manifest file to apply")
        run_cmd.add_argument("command", help="command to execute")
        run_cmd.add_argument("args", nargs=argparse.REMAINDER,
                             help="command arguments")
        run_cmd.add_argument("-u", "--ucontainername", help="""user-specified
                                name for container used to attach proceses""",
                             nargs='?', const=None, default=None)
        run_cmd.set_defaults(func=self.do_run)

        # kill: terminate a container
        kill_cmd = commands.add_parser("kill")
        kill_cmd.add_argument("uuid", help="uuid of the container")
        kill_cmd.set_defaults(func=self.do_kill)

        # list: enumerate containers
        list_cmd = commands.add_parser("list")
        list_cmd.set_defaults(func=self.do_list)

        # listen: follow pub/sub messages
        listen_cmd = commands.add_parser("listen")
        listen_cmd.add_argument(
            "-u", "--uuid",
            default=None,
            help="container uuid to listen for")
        listen_cmd.add_argument(
            "-f", "--filter",
            default=None,
            help="type of message to filter and"
                 " prettyprint, in {power,performance}")
        listen_cmd.set_defaults(func=self.do_listen)

        # setpower: change the power limit
        setpower_cmd = commands.add_parser("setpower")
        setpower_cmd.add_argument(
            "-f", "--follow",
            action='store_true',
            help="listen for power changes")
        setpower_cmd.add_argument(
            "limit",
            type=float,
            help="set new power limit")
        setpower_cmd.set_defaults(func=self.do_setpower)

        options = cli_parser.parse_args()
        if options.verbose:
            logger.setLevel(logging.DEBUG)

        self.setup()
        options.func(options)


# Script entry point: configure root logging at INFO level, then hand control
# to the command line interface.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    cli = CommandLineInterface()
    cli.main()