cmd 8.86 KB
Newer Older
1 2 3 4 5 6
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import logging
import signal
7
import os
8
import nrm.messaging
9
import uuid
10 11
import sys
import collections
12

13 14
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm')
15
KillArgs = collections.namedtuple("Kill", ["uuid"])
16

17 18 19 20 21 22

class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
23
        pass
24

25 26 27 28
    def do_signal(self, uuid, signum, stackframe):
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
29 30 31 32 33 34 35
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid
                       }
            msg = RPC_MSG['kill'](**command)
            self.client.sendmsg(msg)
            logger.info("killed the application, exiting.")
36 37
        else:
            logger.info("received signal %d, exiting", signum)
38
        exit(130)
39 40

    def setup(self):
41 42 43 44
        # upstream RPC port
        upstream_client_port = 3456
        upstream_client_param = "tcp://localhost:%d" % (upstream_client_port)
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)
45 46

        # take care of signals
47 48 49
        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)
50

51
        self.client.wait_connected()
52

53 54 55 56 57 58 59 60 61 62 63
    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages."""
        upstream_pub_port = 2345
        upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.wait_connected()

        while(True):
            msg = self.pub_client.recvmsg()
            if argv.uuid:
                uuid = getattr(msg, 'container_uuid', None)
64
                if argv.uuid == uuid:
65
                    logger.info("pub message: %s", msg)
66
            else:
67
                logger.info("pub message: %s", msg)
68

69
    def do_run(self, argv):
70 71 72
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

73
        The NRM should reply for container info."""
74 75 76 77

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
78
        environ = os.environ
79
        container_uuid = argv.ucontainername or str(uuid.uuid4())
80 81 82 83 84 85 86 87 88

        # simple check + error msg + non-zero return code
        def sanitize_manifest(path):
            if os.path.isfile(path):
                return(os.path.abspath(path))
            else:
                logger.error("Manifest file not found: %s", path)
                sys.exit(1)

89 90
        command = {'api': 'up_rpc_req',
                   'type': 'run',
91
                   'manifest': sanitize_manifest(argv.manifest),
92
                   'path': argv.command,
93
                   'args': argv.args,
94
                   'environ': dict(environ),
95
                   'container_uuid': container_uuid,
96
                   }
97
        msg = RPC_MSG['run'](**command)
98 99 100 101 102
        # command fsm
        state = 'init'
        outeof = False
        erreof = False
        exitmsg = None
103
        self.client.sendmsg(msg)
104 105 106 107

        # the first message tells us if we started a container or not
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
108
        assert msg.type == 'process_start'
109 110 111 112 113

        def handler(signum, frame):
            self.do_signal(msg.container_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

114
        state = 'started'
115
        while(True):
116 117
            msg = self.client.recvmsg()
            assert msg.api == 'up_rpc_rep'
118
            assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
119

120
            if msg.type == 'stdout':
121
                logger.debug("container msg: %r", msg)
122 123
                if msg.payload == 'eof':
                    outeof = True
124 125
                else:
                    print(msg.payload)
126
            elif msg.type == 'stderr':
127
                logger.debug("container msg: %r", msg)
128 129
                if msg.payload == 'eof':
                    erreof = True
130 131
                else:
                    print(msg.payload, file=sys.stderr)
132 133
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
134 135 136 137
                state = 'exiting'
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)
138 139
            if outeof and erreof and state == 'exiting':
                state = 'exit'
140
                logger.info("command ended: %r", exitmsg)
141
                sys.exit(int(exitmsg.status))
142
                break
143

144 145 146 147
    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

148 149
        The NRM should respond to us with one message listing all
        containers."""
150

151 152 153 154 155 156 157 158
        command = {'api': 'up_rpc_req',
                   'type': 'list'}
        msg = RPC_MSG['list'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'list'
        logger.info("list response: %r", msg)
159

160 161 162
    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

163 164
        The NRM should respond to us with a message containing the exit status
        of the top process of the container."""
165

166 167 168
        command = {'api': 'up_rpc_req',
                   'type': 'kill',
                   'container_uuid': argv.uuid
169
                   }
170 171 172 173 174 175
        msg = RPC_MSG['kill'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'exit'
        logger.info("container exit: %r", msg)
176

177 178 179
    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

180
        The NRM should answer with an acknowledgment."""
181

182 183 184 185 186
        # build the command as a JSON dict giving enough info. This is an
        # idempotent command, so we will repeat the command if we don't get a
        # timely answer.
        # TODO: check that the level makes a little bit of sense in the first
        # place
187 188 189
        command = {'api': 'up_rpc_req',
                   'type': 'setpower',
                   'limit': str(argv.limit),
190
                   }
191 192 193 194 195 196
        msg = RPC_MSG['setpower'](**command)
        self.client.sendmsg(msg)
        msg = self.client.recvmsg()
        assert msg.api == 'up_rpc_rep'
        assert msg.type == 'getpower'
        logger.info("command received by the daemon: %r", msg)
197 198 199 200 201 202 203 204 205 206

    def main(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
207 208 209 210
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
211 212 213
        parser_run.add_argument("-u", "--ucontainername", help="""user-specified
                                name for container used to attach proceses""",
                                nargs='?', const=None, default=None)
214 215
        parser_run.set_defaults(func=self.do_run)

216 217 218 219 220
        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

221 222 223 224
        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

225 226 227 228 229 230 231
        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

232 233 234 235 236
        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
237
        parser_setpower.add_argument("limit",
238 239 240
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)
241

242 243
        args = parser.parse_args()
        if args.verbose:
244
            logger.setLevel(logging.DEBUG)
245 246 247 248 249 250 251 252 253

        self.setup()
        args.func(args)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    cli = CommandLineInterface()
    cli.main()