cmd 8.22 KB
Newer Older
Swann Perarnau's avatar
Swann Perarnau committed
1 2 3 4 5 6
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import logging
import signal
7
import os
8
import nrm.messaging
9
import uuid
Valentin Reis's avatar
Valentin Reis committed
10 11
import sys
import collections
Swann Perarnau's avatar
Swann Perarnau committed
12

13 14
# Constructors for the upstream RPC request messages, indexed by request type
# (e.g. RPC_MSG['run'], RPC_MSG['kill']).
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
# Logger used by every command of this command line interface.
logger = logging.getLogger('nrm')
Valentin Reis's avatar
Valentin Reis committed
15
# Minimal stand-in for the parsed arguments of the "kill" subcommand: it only
# carries the `uuid` attribute that do_kill reads, so the signal handling code
# can call do_kill without going through argparse.
KillArgs = collections.namedtuple("Kill", ["uuid"])
16

Swann Perarnau's avatar
Swann Perarnau committed
17 18 19 20 21 22

class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
23
        pass
Swann Perarnau's avatar
Swann Perarnau committed
24

Valentin Reis's avatar
Valentin Reis committed
25 26 27 28 29 30 31 32
    def do_signal(self, uuid, signum, stackframe):
        """Handle a signal: kill the running container, if any, then exit.

        Args:
            uuid: uuid of the container to kill before exiting, or a false
                value (e.g. None) when no container has been started yet.
            signum: number of the received signal, as given by the signal
                module to its handlers.
            stackframe: interrupted stack frame (unused).

        Raises:
            SystemExit: always, with exit status 1.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            self.do_kill(KillArgs(uuid))
            # No extra argument here: the message has no placeholder, and
            # passing one makes the logging module report a formatting error
            # instead of printing the message.
            logger.info("killed the application.")
        else:
            logger.info("received signal %d, exiting", signum)
        # sys.exit instead of the exit() helper, which is injected by the
        # site module and is not guaranteed to exist.
        sys.exit(1)
Swann Perarnau's avatar
Swann Perarnau committed
34 35

    def setup(self):
        """Open the upstream RPC connection and install the SIGINT handler.

        The client is stored in ``self.client``; the method returns once
        ``wait_connected()`` on that client returns.
        """
        # upstream RPC endpoint
        rpc_port = 3456
        rpc_endpoint = "tcp://localhost:%d" % (rpc_port,)
        self.client = nrm.messaging.UpstreamRPCClient(rpc_endpoint)

        # take care of signals: no container has been started at this point,
        # so do_signal is called without a uuid.
        def on_sigint(signum, frame):
            self.do_signal(None, signum, frame)

        signal.signal(signal.SIGINT, on_sigint)

        self.client.wait_connected()
Swann Perarnau's avatar
Swann Perarnau committed
47

48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
    def do_listen(self, argv):
        """Connect to the NRM and listen for pub/sub messages.

        Every received message is logged. When ``argv.uuid`` is set, only the
        messages whose ``container_uuid`` attribute equals it are logged.

        Args:
            argv: parsed command line arguments; ``argv.uuid`` is the
                optional container uuid to filter on.

        This method loops forever and never returns normally.
        """
        upstream_pub_port = 2345
        upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.wait_connected()

        while True:
            msg = self.pub_client.recvmsg()
            if argv.uuid:
                # The "listen" subcommand only defines --uuid, so the filter
                # must compare against argv.uuid (there is no argv.container).
                # Messages without a container_uuid never match.
                msg_uuid = getattr(msg, 'container_uuid', None)
                if argv.uuid != msg_uuid:
                    continue
            # The message needs a placeholder, otherwise logging reports a
            # formatting error instead of printing it.
            logger.info("pub message: %r", msg)

Swann Perarnau's avatar
Swann Perarnau committed
64
    def do_run(self, argv):
        """Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info. The method then logs the
        stdout/stderr messages of the command until the process has exited
        and both streams have reported 'eof'.

        Args:
            argv: parsed command line arguments providing ``manifest``,
                ``command``, ``args`` and ``ucontainername``.

        Raises:
            SystemExit: when the command has ended, with the status carried
                by the 'process_exit' message.
            AssertionError: if a reply has an unexpected api or type.
        """

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())
        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': argv.manifest,
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }
        msg = RPC_MSG['run'](**command)
        self.client.sendmsg(msg)

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'process_start'

        # Capture the uuid now: `msg` is rebound by the receive loop below, so
        # a handler reading msg.container_uuid at signal time would look at
        # whatever message came last, which need not carry that attribute.
        # Fall back to the uuid we requested if the reply does not echo it.
        started_uuid = getattr(msg, 'container_uuid', container_uuid)

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        # The command is over once all three events have been seen: eof on
        # stdout, eof on stderr, and the exit of the process.
        outeof = False
        erreof = False
        exitmsg = None
        while True:
            msg = self.client.recvmsg()
            assert msg.api == 'up_rpc_rep'
            assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']

            if msg.type == 'stdout':
                logger.info("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
            elif msg.type == 'stderr':
                logger.info("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

            if outeof and erreof and exitmsg is not None:
                logger.info("command ended: %r", exitmsg)
                # sys.exit raises SystemExit, which also ends the loop.
                sys.exit(int(exitmsg.status))
Swann Perarnau's avatar
Swann Perarnau committed
125

126 127 128 129
    def do_list(self, argv):
        """Ask the NRM for the containers present on the system.

        A single 'list' request is sent; the NRM is expected to answer with
        one 'list' reply describing all containers, which is logged.

        Args:
            argv: parsed command line arguments (unused).

        Raises:
            AssertionError: if the reply has an unexpected api or type.
        """
        request = RPC_MSG['list'](api='up_rpc_req', type='list')
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'list'
        logger.info("list response: %r", reply)
141

Swann Perarnau's avatar
Swann Perarnau committed
142 143 144
    def do_kill(self, argv):
        """Ask the NRM to kill a container identified by its uuid.

        The NRM is expected to answer with an 'exit' message containing the
        exit status of the top process of the container; it is logged.

        Args:
            argv: object whose ``uuid`` attribute names the container.

        Raises:
            AssertionError: if the reply has an unexpected api or type.
        """
        target = argv.uuid
        request = RPC_MSG['kill'](api='up_rpc_req',
                                  type='kill',
                                  container_uuid=target)
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'exit'
        logger.info("container exit: %r", reply)
Swann Perarnau's avatar
Swann Perarnau committed
158

Swann Perarnau's avatar
Swann Perarnau committed
159 160 161
    def do_setpower(self, argv):
        """Ask the NRM to change the power limit.

        The limit is sent as a string; the NRM is expected to acknowledge
        with a 'getpower' reply, which is logged.

        Args:
            argv: parsed command line arguments; ``argv.limit`` is the new
                power limit.

        Raises:
            AssertionError: if the reply has an unexpected api or type.
        """
        # This is an idempotent command, so it could be repeated if no timely
        # answer is received.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        limit = str(argv.limit)
        request = RPC_MSG['setpower'](api='up_rpc_req',
                                      type='setpower',
                                      limit=limit)
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'getpower'
        logger.info("command received by the daemon: %r", reply)
Swann Perarnau's avatar
Swann Perarnau committed
179 180 181 182 183 184 185 186 187 188

    def main(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
189 190 191 192
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
193 194 195
        parser_run.add_argument("-u", "--ucontainername", help="""user-specified
                                name for container used to attach proceses""",
                                nargs='?', const=None, default=None)
Swann Perarnau's avatar
Swann Perarnau committed
196 197
        parser_run.set_defaults(func=self.do_run)

Swann Perarnau's avatar
Swann Perarnau committed
198 199 200 201 202
        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

203 204 205 206
        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

207 208 209 210 211 212 213
        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

Swann Perarnau's avatar
Swann Perarnau committed
214 215 216 217 218
        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
219
        parser_setpower.add_argument("limit",
Swann Perarnau's avatar
Swann Perarnau committed
220 221 222
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)
223

Swann Perarnau's avatar
Swann Perarnau committed
224 225
        args = parser.parse_args()
        if args.verbose:
226
            logger.setLevel(logging.DEBUG)
Swann Perarnau's avatar
Swann Perarnau committed
227 228 229 230 231 232 233 234 235

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # The commands report NRM replies through logger.info, so the root
    # logger is configured at INFO level before anything runs.
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()