cmd 8.72 KB
Newer Older
Swann Perarnau's avatar
Swann Perarnau committed
1 2 3 4 5 6
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import logging
import signal
7
import os
8
import nrm.messaging
9
import uuid
Valentin Reis's avatar
Valentin Reis committed
10 11
import sys
import collections
Swann Perarnau's avatar
Swann Perarnau committed
12

13 14
# Upstream RPC request message types, indexed by request name and called with
# the request fields as keyword arguments (e.g. RPC_MSG['kill'](**command)).
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
# Logger named 'nrm', used by every command in this script.
logger = logging.getLogger('nrm')
Valentin Reis's avatar
Valentin Reis committed
15
# Named tuple type "Kill" with a single `uuid` field.
# NOTE(review): not referenced in the visible part of this file -- confirm it
# is still needed.
KillArgs = collections.namedtuple("Kill", ["uuid"])
16

Swann Perarnau's avatar
Swann Perarnau committed
17 18 19 20 21 22

class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
23
        pass
Swann Perarnau's avatar
Swann Perarnau committed
24

Valentin Reis's avatar
Valentin Reis committed
25 26 27 28
    def do_signal(self, uuid, signum, stackframe):
        """Handle a signal: kill the running container, if any, then exit.

        Args:
            uuid: uuid of the container to kill, or a falsy value when there
                is no container to kill. The name shadows the `uuid` module
                inside this method; it is kept for existing callers.
            signum: number of the signal that was received.
            stackframe: interrupted stack frame; unused.

        Raises:
            SystemExit: always, with status 130.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid
                       }
            msg = RPC_MSG['kill'](**command)
            # The kill request is sent without waiting for the reply.
            self.client.sendmsg(msg)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # Use sys.exit rather than the exit() builtin: the latter is injected
        # by the `site` module for interactive sessions and is not available
        # when the interpreter runs without it (e.g. `python -S`).
        sys.exit(130)
Swann Perarnau's avatar
Swann Perarnau committed
39 40

    def setup(self):
        """Open the upstream RPC client, install the SIGINT handler and wait
        for the connection to be established."""
        # upstream RPC endpoint
        rpc_port = 3456
        self.client = nrm.messaging.UpstreamRPCClient(
            "tcp://localhost:%d" % rpc_port)

        # No container has been started at this point, so the handler passes
        # None as the uuid and do_signal only logs and exits.
        signal.signal(
            signal.SIGINT,
            lambda signum, frame: self.do_signal(None, signum, frame))

        self.client.wait_connected()
Swann Perarnau's avatar
Swann Perarnau committed
52

53 54 55 56 57 58 59 60 61 62 63
    def do_listen(self, argv):
        """Connect to the NRM pub/sub endpoint and log received messages.

        When argv.uuid is set, only messages whose `container_uuid` attribute
        equals it are logged; otherwise every message is logged. This method
        loops forever.
        """
        pub_port = 2345
        endpoint = "tcp://localhost:%d" % pub_port
        self.pub_client = nrm.messaging.UpstreamPubClient(endpoint)
        self.pub_client.wait_connected()

        while True:
            event = self.pub_client.recvmsg()
            if argv.uuid:
                # Messages without a container_uuid never match a filter.
                sender = getattr(event, 'container_uuid', None)
                if argv.uuid != sender:
                    continue
            logger.info("pub message: %s", event)
68

Swann Perarnau's avatar
Swann Perarnau committed
69
    def do_run(self, argv):
        """Connect to the NRM and ask to spawn a container and run a command
        in it, then relay its messages until it ends.

        Args:
            argv: parsed arguments providing `manifest`, `command`, `args`
                and `ucontainername`.

        Raises:
            SystemExit: with status 1 when the manifest is not an existing
                file; otherwise with the status carried by the
                'process_exit' message, once that message and the 'eof'
                payloads of both stdout and stderr have been received.
            AssertionError: when a reply does not carry the expected api or
                type.
        """
        # We add to the command a container uuid as a way to make sure that
        # we can make the command idempotent. A user-supplied name wins over
        # a freshly generated uuid.
        container_uuid = argv.ucontainername or str(uuid.uuid4())

        # simple check + error msg + non-zero return code
        def sanitize_manifest(path):
            if os.path.isfile(path):
                return os.path.abspath(path)
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)

        # build the command as a dict containing enough info.
        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': sanitize_manifest(argv.manifest),
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }
        msg = RPC_MSG['run'](**command)
        self.client.sendmsg(msg)

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'process_start'

        # Capture the uuid now. `msg` is rebound on every iteration of the
        # loop below, so a handler closing over `msg` itself would read the
        # attribute from whichever message happened to be received last.
        started_uuid = msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        outeof = False
        erreof = False
        exitmsg = None
        while True:
            msg = self.client.recvmsg()
            assert msg.api == 'up_rpc_rep'
            assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']

            if msg.type == 'stdout':
                logger.info("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
            elif msg.type == 'stderr':
                logger.info("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

            # Only exit once both streams are drained and the process exit
            # was reported, so no trailing output is lost.
            if outeof and erreof and exitmsg is not None:
                logger.info("command ended: %r", exitmsg)
                sys.exit(int(exitmsg.status))
Swann Perarnau's avatar
Swann Perarnau committed
139

140 141 142 143
    def do_list(self, argv):
        """Ask the NRM to list containers and log its reply.

        A single reply of api 'up_rpc_rep' and type 'list' is expected and
        asserted. `argv` is not used by this command.
        """
        request = RPC_MSG['list'](api='up_rpc_req', type='list')
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'list'
        logger.info("list response: %r", reply)
155

Swann Perarnau's avatar
Swann Perarnau committed
156 157 158
    def do_kill(self, argv):
        """Ask the NRM to kill the container identified by argv.uuid.

        A single reply of api 'up_rpc_rep' and type 'exit' is expected and
        asserted, then logged.
        """
        fields = dict(api='up_rpc_req',
                      type='kill',
                      container_uuid=argv.uuid)
        request = RPC_MSG['kill'](**fields)
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'exit'
        logger.info("container exit: %r", reply)
Swann Perarnau's avatar
Swann Perarnau committed
172

Swann Perarnau's avatar
Swann Perarnau committed
173 174 175
    def do_setpower(self, argv):
        """Ask the NRM to change the power limit to argv.limit.

        The limit is sent as a string. The request is sent once and a single
        reply of api 'up_rpc_rep' and type 'getpower' is expected and
        asserted, then logged.
        """
        # TODO: check that the level makes a little bit of sense in the first
        # place
        fields = dict(api='up_rpc_req',
                      type='setpower',
                      limit=str(argv.limit))
        request = RPC_MSG['setpower'](**fields)
        self.client.sendmsg(request)

        reply = self.client.recvmsg()
        assert reply.api == 'up_rpc_rep'
        assert reply.type == 'getpower'
        logger.info("command received by the daemon: %r", reply)
Swann Perarnau's avatar
Swann Perarnau committed
193 194 195 196 197 198 199 200 201 202

    def main(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
203 204 205 206
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
207 208 209
        parser_run.add_argument("-u", "--ucontainername", help="""user-specified
                                name for container used to attach proceses""",
                                nargs='?', const=None, default=None)
Swann Perarnau's avatar
Swann Perarnau committed
210 211
        parser_run.set_defaults(func=self.do_run)

Swann Perarnau's avatar
Swann Perarnau committed
212 213 214 215 216
        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

217 218 219 220
        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

221 222 223 224 225 226 227
        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

Swann Perarnau's avatar
Swann Perarnau committed
228 229 230 231 232
        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
233
        parser_setpower.add_argument("limit",
Swann Perarnau's avatar
Swann Perarnau committed
234 235 236
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)
237

Swann Perarnau's avatar
Swann Perarnau committed
238 239
        args = parser.parse_args()
        if args.verbose:
240
            logger.setLevel(logging.DEBUG)
Swann Perarnau's avatar
Swann Perarnau committed
241 242 243 244 245 246 247 248 249

        self.setup()
        args.func(args)


if __name__ == "__main__":
    # INFO level so that the logger.info output of the commands is shown.
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()