cmd 8.42 KB
Newer Older
Swann Perarnau's avatar
Swann Perarnau committed
1 2 3 4 5 6
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import logging
import signal
7
import os
8
import nrm.messaging
9
import uuid
Valentin Reis's avatar
Valentin Reis committed
10 11
import sys
import collections
Swann Perarnau's avatar
Swann Perarnau committed
12

13 14
# Table of upstream RPC request message constructors. The methods below index
# it by request type ('run', 'kill', 'list', 'setpower') and call the result
# with the message fields as keyword arguments.
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
# Logger used by every command in this script; configured in the __main__
# guard and switched to DEBUG by the --verbose flag.
logger = logging.getLogger('nrm')
Valentin Reis's avatar
Valentin Reis committed
15
# Single-field record holding a container uuid.
# NOTE(review): not referenced anywhere else in the visible part of this file;
# presumably meant to stand in for parsed "kill" arguments -- confirm before
# removing.
KillArgs = collections.namedtuple("Kill", ["uuid"])
16

Swann Perarnau's avatar
Swann Perarnau committed
17 18 19 20 21 22

class CommandLineInterface(object):

    """Implements a command line interface to the NRM.

    Each ``do_*`` method implements one subcommand. ``main`` parses the
    command line, connects the upstream RPC client through ``setup`` and then
    dispatches to the method selected by the subcommand.
    """

    # Ports (on localhost) used to reach the daemon: one for the RPC
    # request/reply client and one for the publish/subscribe client.
    UPSTREAM_RPC_PORT = 3456
    UPSTREAM_PUB_PORT = 2345

    def __init__(self):
        # Declared here so the attributes always exist; the actual clients
        # are created by setup() (RPC) and do_listen() (pub/sub).
        self.client = None
        self.pub_client = None

    @staticmethod
    def _check_reply(msg, expected_types):
        """Validate the api and type of a reply received from the daemon.

        msg -- received message, must expose ``api`` and ``type`` attributes.
        expected_types -- collection of acceptable values for ``msg.type``.

        Raises AssertionError on mismatch. The check is raised explicitly
        (rather than written as an ``assert`` statement) so that it is still
        performed when Python runs with -O, while keeping the exception type
        the original assert statements produced.
        """
        if msg.api != 'up_rpc_rep':
            raise AssertionError("unexpected reply api: %r" % (msg.api,))
        if msg.type not in expected_types:
            raise AssertionError("unexpected reply type: %r (expected one "
                                 "of %r)" % (msg.type, list(expected_types)))

    def do_signal(self, uuid, signum, stackframe):
        """React to a signal: optionally kill a container, then exit.

        uuid -- container to kill before exiting, or a false value when
                there is nothing to kill.
        signum -- number of the received signal (only logged).
        stackframe -- frame passed by the signal machinery, unused.

        Always terminates the process with exit status 130.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid
                       }
            msg = RPC_MSG['kill'](**command)
            self.client.sendmsg(msg)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # sys.exit instead of the exit() builtin: the latter is injected by
        # the site module and is not guaranteed to exist.
        sys.exit(130)

    def setup(self):
        """Create the upstream RPC client, install the default SIGINT
        handler and wait until the client is connected."""
        # upstream RPC port
        upstream_client_param = "tcp://localhost:%d" % (
            self.UPSTREAM_RPC_PORT)
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)

        # take care of signals: no container is known yet, so just exit
        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)

        self.client.wait_connected()

    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages.

        When ``argv.uuid`` is set, only messages whose ``container_uuid``
        attribute equals it are logged; otherwise every message is logged.
        Loops until the process is interrupted.
        """
        upstream_pub_param = "tcp://localhost:%d" % (self.UPSTREAM_PUB_PORT)
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.wait_connected()

        # The listen subparser only defines ``uuid``; this is the filter.
        wanted = argv.uuid
        while True:
            msg = self.pub_client.recvmsg()
            if wanted and getattr(msg, 'container_uuid', None) != wanted:
                continue
            logger.info("pub message: %r", msg)

    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info. Output messages are then
        logged until both streams reported 'eof' and the process exit message
        was received, at which point this process exits with the status
        carried by that exit message.
        """

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())
        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': argv.manifest,
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }
        msg = RPC_MSG['run'](**command)
        self.client.sendmsg(msg)

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        self._check_reply(msg, ['process_start'])

        # Capture the uuid now: ``msg`` is rebound on every iteration of the
        # loop below, and later messages are not known to carry a
        # container_uuid, so the handler must not look it up lazily.
        started_uuid = msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        outeof = False
        erreof = False
        exitmsg = None
        while True:
            msg = self.client.recvmsg()
            self._check_reply(msg,
                              ['stdout', 'stderr', 'exit', 'process_exit'])

            if msg.type == 'stdout':
                logger.info("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
            elif msg.type == 'stderr':
                logger.info("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

            # Done only once both streams are closed and the exit status is
            # known, whatever the order these three events arrived in.
            if outeof and erreof and exitmsg is not None:
                logger.info("command ended: %r", exitmsg)
                sys.exit(int(exitmsg.status))

    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us with one message listing all
        containers."""

        command = {'api': 'up_rpc_req',
                   'type': 'list'}
        msg = RPC_MSG['list'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        self._check_reply(msg, ['list'])
        logger.info("list response: %r", msg)

    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us with a message containing the exit status
        of the top process of the container."""

        command = {'api': 'up_rpc_req',
                   'type': 'kill',
                   'container_uuid': argv.uuid
                   }
        msg = RPC_MSG['kill'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        self._check_reply(msg, ['exit'])
        logger.info("container exit: %r", msg)

    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer with an acknowledgment; the reply is expected
        to have type 'getpower'. The ``--follow`` flag is parsed by ``main``
        but is not used here.
        """

        # build the command as a JSON dict giving enough info. This is an
        # idempotent command, so we will repeat the command if we don't get a
        # timely answer.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        command = {'api': 'up_rpc_req',
                   'type': 'setpower',
                   'limit': str(argv.limit),
                   }
        msg = RPC_MSG['setpower'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        self._check_reply(msg, ['getpower'])
        logger.info("command received by the daemon: %r", msg)

    def _build_parser(self):
        """Return the argument parser, one subparser per subcommand.

        Every subparser stores the bound method implementing it as ``func``
        in the parsed namespace.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers(dest="subcommand")
        # Python 3 makes the subcommand optional by default, which would leave
        # the namespace without ``func``. Setting the attribute (instead of
        # passing a keyword) also works on Python 2.
        subparsers.required = True

        # run container
        parser_run = subparsers.add_parser("run")
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
        parser_run.add_argument("-u", "--ucontainername", help="""user-specified
                                name for container used to attach proceses""",
                                nargs='?', const=None, default=None)
        parser_run.set_defaults(func=self.do_run)

        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
        parser_setpower.add_argument("limit",
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)

        return parser

    def main(self):
        """Parse the process command line, connect to the daemon and run the
        selected subcommand."""
        args = self._build_parser().parse_args()
        if args.verbose:
            logger.setLevel(logging.DEBUG)

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # Script entry point: show INFO-level records by default (main() raises
    # the 'nrm' logger to DEBUG when --verbose is given), then hand control
    # to the command line interface.
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()