#!/usr/bin/env python2

###############################################################################
# Copyright 2019 UChicago Argonne, LLC.
# (c.f. AUTHORS, LICENSE)
#
# This file is part of the NRM project.
# For more info, see https://xgitlab.cels.anl.gov/argo/nrm
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################

13 14 15 16
from __future__ import print_function
import argparse
import logging
import signal
17
import os
18
import nrm.messaging
19
import uuid
20
import sys
21
import time
22

23
# Logger shared by the whole CLI; main() raises its level to DEBUG on -v.
logger = logging.getLogger('nrm')
24

25 26 27 28 29 30

class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    # Local TCP ports of the NRM daemon's RPC (request/reply) and pub/sub
    # endpoints.
    UPSTREAM_RPC_PORT = 3456
    UPSTREAM_PUB_PORT = 2345

    # For `listen --filter TAG`: which attribute of a pub message of that tag
    # is printed next to the tag and timestamp.
    _PUB_FIELDS = {
        'performance': 'payload',
        'progress': 'payload',
        'power': 'total',
        'exit': 'profile_data',
    }

    def __init__(self):
        # Both clients are created lazily (setup() / do_listen()).
        self.client = None
        self.pub_client = None

    def do_signal(self, uuid, signum, stackframe):
        """Handle an interrupt: kill container *uuid* if any, then exit.

        :param uuid: uuid of the container to kill, or None when no container
                     has been started by this CLI.
        :param signum: number of the received signal.
        :param stackframe: interrupted stack frame (unused).
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            # Same request as `nrm kill <uuid>` (see do_kill).
            self.client.send(tag="kill", container_uuid=uuid)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # sys.exit, not the `site`-provided exit() builtin, which may be
        # missing (e.g. under `python -S` or in frozen executables).
        sys.exit(130)

    def setup(self):
        """Create and connect the RPC client; install default handlers.

        Until a container is started, SIGINT and SIGTERM simply exit.
        """
        upstream_client_param = "tcp://localhost:%d" % self.UPSTREAM_RPC_PORT
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)

        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

        self.client.connect()

    @staticmethod
    def _format_pub_message(msg, tag_filter, timestamp):
        """Return the line `listen` prints for *msg*, or None to print nothing.

        Without a filter, every message is rendered as ``tag, timestamp``.
        With a filter, only messages whose tag equals it and that appear in
        ``_PUB_FIELDS`` are rendered, as ``tag, timestamp, value``.
        """
        if not tag_filter:
            return "%s, %s" % (msg.tag, timestamp)
        if msg.tag != tag_filter:
            return None
        field = CommandLineInterface._PUB_FIELDS.get(msg.tag)
        if field is None:
            return None
        return "%s, %s, %s" % (msg.tag, timestamp, getattr(msg, field))

    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages.

        Loops forever (until a signal ends the process), printing one line
        per received message as rendered by _format_pub_message.
        """
        upstream_pub_param = "tcp://localhost:%d" % self.UPSTREAM_PUB_PORT
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.connect()

        # NOTE(review): --uuid is accepted by the parser but not used to
        # filter messages here.
        while True:
            msg = self.pub_client.recv()
            logger.debug("pub message: %s", msg)
            line = self._format_pub_message(msg, argv.filter, time.time())
            if line is not None:
                print(line)
                sys.stdout.flush()

    @staticmethod
    def _sanitize_manifest(path):
        """Return the absolute path of manifest *path*, or exit(1) if absent."""
        if os.path.isfile(path):
            return os.path.abspath(path)
        logger.error("Manifest file not found: %s", path)
        sys.exit(1)

    @staticmethod
    def _check_tag(msg, expected):
        """Exit with status 1 unless daemon reply *msg* carries tag *expected*.

        Explicit check rather than `assert`, which `python -O` strips.
        """
        if msg.tag != expected:
            logger.error("unexpected reply from daemon (expected %r): %r",
                         expected, msg)
            sys.exit(1)

    @staticmethod
    def _decode_wait_status(status):
        """Decode a POSIX wait status.

        :returns: ``('signaled', signal_number)``, ``('exited', exit_code)``
                  or ``(None, None)`` when the status is neither.
        """
        if os.WIFSIGNALED(status):
            return 'signaled', os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # WEXITSTATUS, not WTERMSIG: the latter is always 0 for a normal
            # exit, which used to turn every failing command into success.
            return 'exited', os.WEXITSTATUS(status)
        return None, None

    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The request carries the manifest's absolute path, the command, its
        arguments, a copy of this process's environment and a container uuid
        (from --ucontainername, or freshly generated). The daemon's first
        reply must be tagged 'start'. Afterwards, container stdout/stderr are
        relayed locally until both streams reported 'eof' and an 'exit'
        message was received. The CLI then exits with the command's exit
        code, or 1 if it died from a signal or the status is not decodable.
        """
        container_uuid = argv.ucontainername or str(uuid.uuid4())
        self.client.send(
                tag="run",
                manifest=self._sanitize_manifest(argv.manifest),
                path=argv.command,
                args=argv.args,
                environ=dict(os.environ),
                container_uuid=container_uuid)

        # the first message tells us if we started a container or not
        msg = self.client.recv()
        self._check_tag(msg, 'start')

        # Bind the uuid now: `msg` is rebound by the loop below, and a
        # closure over it would see whatever message arrived last.
        started_uuid = msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        # From now on any interrupt must also kill the started container.
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

        outeof = False
        erreof = False
        exitmsg = None
        while not (outeof and erreof and exitmsg is not None):
            msg = self.client.recv()
            if msg.tag == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload, file=sys.stdout)
                    sys.stdout.flush()
            elif msg.tag == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
                    sys.stderr.flush()
            elif msg.tag == 'exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

        logger.info("command ended: %r", exitmsg)
        kind, value = self._decode_wait_status(int(exitmsg.status))
        if kind == 'signaled':
            logger.error("command ended due to signal %s", value)
            sys.exit(1)
        if kind == 'exited':
            if value > 0:
                logger.debug("command ended with exit code %s", value)
            sys.exit(value)
        logger.error("non-compliant exit code received from daemon: %s",
                     exitmsg.status)
        sys.exit(1)

    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us with one message listing all
        containers."""
        self.client.send(tag="list")
        msg = self.client.recv()
        self._check_tag(msg, 'list')
        logger.info("list response: %r", msg)

    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us with a message containing the exit status
        of the top process of the container."""
        self.client.send(tag="kill", container_uuid=argv.uuid)
        msg = self.client.recv()
        self._check_tag(msg, 'exit')
        logger.info("container exit: %r", msg)

    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer with a message tagged 'getPower'."""
        # NOTE(review): the request is sent exactly once; no resend on
        # timeout is implemented, and --follow is parsed but not honored.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        self.client.send(tag="setPower", limit=str(argv.limit))
        msg = self.client.recv()
        self._check_tag(msg, 'getPower')
        logger.info("command received by the daemon: %r", msg)

    def _build_parser(self):
        """Return the argument parser; each subcommand sets ``func``."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        # Python 3 makes subcommands optional by default, which would leave
        # `func` unset; require one explicitly. `dest` is needed for a
        # readable error and must not clash with run's "command" positional.
        subparsers = parser.add_subparsers(dest="subcommand")
        subparsers.required = True

        # run container
        parser_run = subparsers.add_parser("run")
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
        parser_run.add_argument("-u", "--ucontainername",
                                help="user-specified name for container used"
                                     " to attach processes",
                                nargs='?', const=None, default=None)
        parser_run.set_defaults(func=self.do_run)

        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.add_argument("-f", "--filter",
                                   help="type of message to filter and"
                                        " prettyprint, in"
                                        " {power,performance,progress,exit}",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
        parser_setpower.add_argument("limit",
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)
        return parser

    def main(self):
        """Parse the command line, connect to the NRM, run the subcommand."""
        args = self._build_parser().parse_args()
        if args.verbose:
            logger.setLevel(logging.DEBUG)

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # Root logging at INFO; the -v flag lowers the 'nrm' logger to DEBUG.
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()