#!/usr/bin/env python2
# Command line client for the NRM: module-level imports and constants.

from __future__ import print_function

import argparse
import collections
import logging
import os
import signal
import sys
import time
import uuid

import nrm.messaging

# Message constructors for upstream RPC requests, indexed by request type
# (used in this file as RPC_MSG['run'], RPC_MSG['kill'], RPC_MSG['list'] and
# RPC_MSG['setpower']).
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']

# All output of this tool goes through the shared 'nrm' logger.
logger = logging.getLogger('nrm')

# Record holding the uuid of a container to kill. NOTE(review): not referenced
# anywhere else in this file -- confirm external users before removing.
KillArgs = collections.namedtuple("Kill", ["uuid"])


class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
        pass

    def do_signal(self, uuid, signum, stackframe):
        """React to a signal: optionally kill a container, then exit.

        If `uuid` is truthy, a 'kill' request for that container is sent over
        the upstream RPC client before exiting. The process always exits with
        status 130. `stackframe` is accepted for signal-handler compatibility
        and is not used.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid
                       }
            msg = RPC_MSG['kill'](**command)
            self.client.sendmsg(msg)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        sys.exit(130)

    def setup(self):
        """Create the upstream RPC client and install a SIGINT handler.

        Blocks until the client reports that it is connected.
        """
        # upstream RPC port
        upstream_client_port = 3456
        upstream_client_param = "tcp://localhost:%d" % (upstream_client_port)
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)

        # take care of signals: no container is known yet, so just exit
        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)

        self.client.wait_connected()

    @staticmethod
    def _format_pub_message(msg, msg_filter, timestamp):
        """Return the line to print for a pub/sub message, or None.

        Without a filter every message is reported as "type, timestamp".
        With a filter, only messages whose type equals the filter are
        reported, and only for the types that have a known value field.
        """
        if not msg_filter:
            return "%s, %s" % (msg.type, timestamp)
        if msg_filter != msg.type:
            return None
        if msg.type in ("performance", "progress"):
            return "%s, %s, %s" % (msg.type, timestamp, msg.payload)
        if msg.type == "power":
            return "%s, %s, %s" % (msg.type, timestamp, msg.total)
        if msg.type == "container_exit":
            return "%s, %s, %s" % (msg.type, timestamp, msg.profile_data)
        return None

    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages.

        Loops forever, printing one line per reported message (see
        `_format_pub_message`) and flushing stdout after each line.
        """
        upstream_pub_port = 2345
        upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.wait_connected()

        # NOTE: argv.uuid is parsed but not applied here. An earlier version
        # only reported messages whose container_uuid matched (always letting
        # 'power' messages through); that filtering is currently disabled.
        while True:
            msg = self.pub_client.recvmsg()
            logger.debug("pub message: %s", msg)
            line = self._format_pub_message(msg, argv.filter, time.time())
            if line is not None:
                print(line)
                sys.stdout.flush()

    @staticmethod
    def _sanitize_manifest(path):
        """Return the absolute path of an existing manifest file.

        Logs an error and exits with status 1 if `path` is not a file.
        """
        if not os.path.isfile(path):
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)
        return os.path.abspath(path)

    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info. Output messages are relayed
        to stdout/stderr; once both streams reported 'eof' and the process
        exit message arrived, this exits with the reported status.
        """

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())
        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': self._sanitize_manifest(argv.manifest),
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }
        msg = RPC_MSG['run'](**command)
        self.client.sendmsg(msg)

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'process_start'

        # Bind the uuid now: `msg` is rebound by every iteration of the loop
        # below, so a handler reading `msg` lazily would use whatever message
        # happened to be received last when the signal fires.
        started_uuid = msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        outeof = False
        erreof = False
        exitmsg = None
        while True:
            msg = self.client.recvmsg()
            assert msg.api == 'up_rpc_rep'
            assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']

            if msg.type == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload)
            elif msg.type == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

            # only leave once both streams are drained and the exit arrived
            if outeof and erreof and exitmsg is not None:
                logger.info("command ended: %r", exitmsg)
                sys.exit(int(exitmsg.status))

    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us with one message listing all
        containers. The response is logged at INFO level.
        """
        command = {'api': 'up_rpc_req',
                   'type': 'list'}
        msg = RPC_MSG['list'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'list'
        logger.info("list response: %r", msg)

    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us with a message containing the exit status
        of the top process of the container. The response is logged at INFO
        level.
        """
        command = {'api': 'up_rpc_req',
                   'type': 'kill',
                   'container_uuid': argv.uuid
                   }
        msg = RPC_MSG['kill'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'exit'
        logger.info("container exit: %r", msg)

    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer with an acknowledgment; the reply is expected
        to be of type 'getpower' and is logged at INFO level.
        """
        # build the command as a JSON dict giving enough info. This is an
        # idempotent command, so we will repeat the command if we don't get a
        # timely answer.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        # NOTE: argv.follow is parsed but not used by this method.
        command = {'api': 'up_rpc_req',
                   'type': 'setpower',
                   'limit': str(argv.limit),
                   }
        msg = RPC_MSG['setpower'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'getpower'
        logger.info("command received by the daemon: %r", msg)

    def _build_parser(self):
        """Build the argument parser; each sub-command stores its handler
        method in the `func` attribute of the parsed namespace."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
        parser_run.add_argument("-u", "--ucontainername",
                                help="user-specified name for container "
                                     "used to attach processes",
                                nargs='?', const=None, default=None)
        parser_run.set_defaults(func=self.do_run)

        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.add_argument("-f", "--filter",
                                   help="type of message to filter and "
                                        "prettyprint, in {power,performance}",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
        parser_setpower.add_argument("limit",
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)
        return parser

    def main(self):
        """Parse the command line, connect to the NRM and run the selected
        sub-command."""
        parser = self._build_parser()
        args = parser.parse_args()

        # Python 3's argparse accepts a missing sub-command; reject it here,
        # before connecting, instead of failing later on `args.func`.
        if getattr(args, 'func', None) is None:
            parser.error("too few arguments")

        if args.verbose:
            logger.setLevel(logging.DEBUG)

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # Script entry point: log at INFO level by default (raised to DEBUG by
    # the -v flag inside main), then hand control to the CLI.
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()