cmd 9.54 KB
Newer Older
Swann Perarnau's avatar
Swann Perarnau committed
1 2 3 4 5 6
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import logging
import signal
7
import os
8
import nrm.messaging
9
import uuid
Valentin Reis's avatar
Valentin Reis committed
10
import sys
11
import time
Valentin Reis's avatar
Valentin Reis committed
12
import collections
Swann Perarnau's avatar
Swann Perarnau committed
13

14 15
# Constructors for upstream RPC request messages, indexed by request type
# (this file uses the 'run', 'kill', 'list' and 'setpower' entries).
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
# Logger used by every command of the CLI; main() raises it to DEBUG when
# --verbose is given.
logger = logging.getLogger('nrm')
Valentin Reis's avatar
Valentin Reis committed
16
# Single-field record carrying a container uuid.
# NOTE(review): nothing in the visible part of this file references KillArgs,
# and its typename ("Kill") differs from the variable name -- confirm it is
# still needed.
KillArgs = collections.namedtuple("Kill", ["uuid"])
17

Swann Perarnau's avatar
Swann Perarnau committed
18 19 20 21 22 23

class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
24
        pass
Swann Perarnau's avatar
Swann Perarnau committed
25

Valentin Reis's avatar
Valentin Reis committed
26 27 28 29
    def do_signal(self, uuid, signum, stackframe):
        """Handle a signal: kill the container if one is known, then exit.

        When ``uuid`` is truthy a 'kill' request for that container is sent
        through the upstream RPC client; the reply is not waited for.  In
        every case the process then terminates with exit status 130.

        uuid -- uuid of the container to kill, or None when no container has
                been started yet.  Note that inside this method the name
                shadows the ``uuid`` module.
        signum -- number of the received signal, only used for logging.
        stackframe -- interrupted stack frame, unused.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid
                       }
            msg = RPC_MSG['kill'](**command)
            self.client.sendmsg(msg)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # Use sys.exit rather than the bare exit() helper: the latter is
        # injected by the site module for interactive use and is missing
        # when the interpreter runs with -S or from a frozen build.
        # 130 is the conventional status for termination by SIGINT (128 + 2).
        sys.exit(130)
Swann Perarnau's avatar
Swann Perarnau committed
40 41

    def setup(self):
        """Open the upstream RPC connection and install the SIGINT handler.

        The RPC client is stored in ``self.client``.  The handler installed
        here passes None as the container uuid, so an interrupt only makes
        the CLI exit.  The method returns once the client reports that it
        is connected.
        """
        # upstream RPC endpoint (port 3456 on the local host)
        rpc_endpoint = "tcp://localhost:%d" % (3456)
        self.client = nrm.messaging.UpstreamRPCClient(rpc_endpoint)

        # take care of signals
        signal.signal(
            signal.SIGINT,
            lambda signum, frame: self.do_signal(None, signum, frame))

        self.client.wait_connected()
Swann Perarnau's avatar
Swann Perarnau committed
53

54 55 56 57 58 59 60 61 62
    def do_listen(self, argv):
        """ Connect to the NRM and print the pub/sub messages it publishes.

        Runs forever.  ``argv.uuid``, when set, restricts the output to
        messages whose ``container_uuid`` attribute equals it.
        ``argv.filter``, when set, restricts the output to messages of that
        type and adds the payload to each printed line; otherwise only the
        message type and the current time are printed.
        """
        pub_endpoint = "tcp://localhost:%d" % (2345)
        self.pub_client = nrm.messaging.UpstreamPubClient(pub_endpoint)
        self.pub_client.wait_connected()

        while True:
            msg = self.pub_client.recvmsg()
            logger.debug("pub message: %s", msg)

            # container selection: skip messages about other containers
            # (messages without a container_uuid never match a given uuid)
            if argv.uuid:
                if not (argv.uuid == getattr(msg, 'container_uuid', None)):
                    continue

            # type selection and formatting
            if not argv.filter:
                line = "%s, %s" % (msg.type, time.time())
            elif argv.filter == msg.type:
                line = "%s, %s, %s" % (msg.type, time.time(),
                                       msg.payload)
            else:
                continue

            print(line)
            sys.stdout.flush()
81

Swann Perarnau's avatar
Swann Perarnau committed
82
    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info.  After the 'process_start'
        reply, 'stdout' and 'stderr' payloads are relayed to the matching
        local stream until both streams have sent 'eof' and a 'process_exit'
        message has been received; the CLI then exits with the status
        carried by that message.

        argv -- parsed arguments providing ``manifest``, ``command``,
                ``args`` and ``ucontainername``.

        Exits with status 1 when the manifest file does not exist.
        """

        # simple check + error msg + non-zero return code
        def sanitize_manifest(path):
            if os.path.isfile(path):
                return os.path.abspath(path)
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())
        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': sanitize_manifest(argv.manifest),
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }
        msg = RPC_MSG['run'](**command)
        self.client.sendmsg(msg)

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'process_start'

        # Bind the uuid now.  `msg` is rebound by every recvmsg() below, so a
        # handler reading msg.container_uuid lazily would use whatever
        # message happened to arrive last instead of the start reply.
        started_uuid = msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        outeof = False
        erreof = False
        exitmsg = None  # set once 'process_exit' has been received
        while True:
            msg = self.client.recvmsg()
            assert msg.api == 'up_rpc_rep'
            assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']

            if msg.type == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload)
            elif msg.type == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                # 'exit' is accepted by the assertion above but has no
                # dedicated handling here.
                logger.error("unexpected message: %r", msg)

            # leave only once both streams are drained and the exit status
            # is known, so no trailing output is lost
            if outeof and erreof and exitmsg is not None:
                logger.info("command ended: %r", exitmsg)
                sys.exit(int(exitmsg.status))
Swann Perarnau's avatar
Swann Perarnau committed
156

157 158 159 160
    def do_list(self, argv):
        """Ask the NRM for the containers present on the system.

        A single 'list' request is sent and a single 'list' reply is
        expected; the reply is logged at INFO level.  ``argv`` is unused.
        """
        request = RPC_MSG['list'](api='up_rpc_req', type='list')
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'list'
        logger.info("list response: %r", reply)
172

Swann Perarnau's avatar
Swann Perarnau committed
173 174 175
    def do_kill(self, argv):
        """Ask the NRM to kill the container identified by ``argv.uuid``.

        A single 'kill' request is sent and an 'exit' reply is expected;
        the reply is logged at INFO level.
        """
        request = RPC_MSG['kill'](api='up_rpc_req',
                                  type='kill',
                                  container_uuid=argv.uuid)
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'exit'
        logger.info("container exit: %r", reply)
Swann Perarnau's avatar
Swann Perarnau committed
189

Swann Perarnau's avatar
Swann Perarnau committed
190 191 192
    def do_setpower(self, argv):
        """ Ask the NRM to change the power limit to ``argv.limit``.

        The limit is sent as a string in a single 'setpower' request and a
        'getpower' reply is expected as acknowledgment; the reply is logged
        at INFO level.
        """
        # TODO: check that the level makes a little bit of sense in the first
        # place
        request = RPC_MSG['setpower'](api='up_rpc_req',
                                      type='setpower',
                                      limit=str(argv.limit))
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'getpower'
        logger.info("command received by the daemon: %r", reply)
Swann Perarnau's avatar
Swann Perarnau committed
210 211 212 213 214 215 216 217 218 219

    def main(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
220 221 222 223
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
224 225 226
        parser_run.add_argument("-u", "--ucontainername", help="""user-specified
                                name for container used to attach proceses""",
                                nargs='?', const=None, default=None)
Swann Perarnau's avatar
Swann Perarnau committed
227 228
        parser_run.set_defaults(func=self.do_run)

Swann Perarnau's avatar
Swann Perarnau committed
229 230 231 232 233
        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

234 235 236 237
        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

238 239 240 241 242
        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
243 244 245 246
        parser_listen.add_argument("-f", "--filter",
                                   help="type of message to filter and \
                                        \ prettyprint, in {power,performance}",
                                   default=None)
247 248
        parser_listen.set_defaults(func=self.do_listen)

Swann Perarnau's avatar
Swann Perarnau committed
249 250 251 252 253
        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
254
        parser_setpower.add_argument("limit",
Swann Perarnau's avatar
Swann Perarnau committed
255 256 257
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)
258

Swann Perarnau's avatar
Swann Perarnau committed
259 260
        args = parser.parse_args()
        if args.verbose:
261
            logger.setLevel(logging.DEBUG)
Swann Perarnau's avatar
Swann Perarnau committed
262 263 264 265 266 267 268 269 270

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # Script entry point: show INFO-level messages by default, then hand
    # control to the command line interface.
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()