cmd 8.8 KB
Newer Older
1 2 3 4 5 6 7 8
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import logging
import uuid
import signal
import zmq
9
import os
10

11 12
logger = logging.getLogger('nrm-cmd')

13 14 15 16 17 18

class CommandLineInterface(object):

    """Implements a command line interface to the NRM."""

    def __init__(self):
19
        pass
20

21
    def do_signal(self, signum, stackframe):
22
        logger.info("received signal %d, exiting", signum)
23
        exit(1)
24 25 26 27 28 29 30 31 32 33 34

    def setup(self):
        # SUB port to the upstream API (connected to its PUB port)
        upstream_sub_port = 2345
        # PUB port to the upstream API (connected to its SUB port)
        upstream_pub_port = 3456

        self.context = zmq.Context()
        self.upstream_pub_socket = self.context.socket(zmq.PUB)
        self.upstream_sub_socket = self.context.socket(zmq.SUB)

35
        upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
36 37
        upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)

38
        self.upstream_pub_socket.connect(upstream_pub_param)
39 40 41 42 43
        self.upstream_sub_socket.connect(upstream_sub_param)
        # we want to receive everything for now
        upstream_sub_filter = ""
        self.upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)

44 45
        logger.info("upstream pub socket bound to: %s", upstream_pub_param)
        logger.info("upstream sub socket connected to: %s", upstream_sub_param)
46 47 48 49 50 51

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)

        # create a uuid for this client instance
        self.uuid = str(uuid.uuid4())
52
        logger.info("client uuid: %r", self.uuid)
53 54

    def do_run(self, argv):
55 56 57 58 59 60 61 62 63
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should notify us on the pub socket of the container
        creation."""

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
64 65 66 67
        environ = os.environ
        command = {'clientid': self.uuid,
                   'ucontainername': argv.ucontainername,
                   'command': 'run',
68 69 70
                   'manifest': argv.manifest,
                   'file': argv.command,
                   'args': argv.args,
71
                   'environ': dict(environ),
72
                   }
73 74 75 76 77 78
        # command fsm
        state = 'init'
        outeof = False
        erreof = False
        exitmsg = None
        self.upstream_pub_socket.send_json(command)
79 80 81
        while(True):
            msg = self.upstream_sub_socket.recv_json()
            if isinstance(msg, dict) and msg.get('type') == 'container':
82
                if msg['clientid'] == self.uuid:
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
                    if msg['event'] == 'start':
                        if state == 'init':
                            state = 'started'
                            logger.info("container started: %r", msg)
                        else:
                            logger.info("unexpected start message: %r", state)
                            exit(1)
                    elif msg['event'] == 'stdout':
                        logger.info("container msg: %r", msg)
                        if msg['payload'] == 'eof':
                            outeof = True
                    elif msg['event'] == 'stderr':
                        logger.info("container msg: %r", msg)
                        if msg['payload'] == 'eof':
                            erreof = True
                    elif msg['event'] == 'exit':
                        if state == 'started':
                            state = 'exiting'
                            exitmsg = msg
                        else:
                            logger.info("unexpected exit message: %r", msg)
104 105 106 107 108 109 110 111 112 113 114
                    elif msg['event'] == 'process_start':
                        if state == 'init':
                            state = 'started'
                            logger.info("process started in existing "
                                        "container: %r""", msg)
                        else:
                            logger.info("unexpected start message: %r", state)
                            exit(1)
                    elif msg['event'] == 'process_exit':
                            logger.info("process ended: %r", msg)
                            break
115 116 117 118
            if outeof and erreof and state == 'exiting':
                state = 'exit'
                logger.info("container ended: %r", exitmsg)
                break
119

120 121 122 123 124 125 126 127 128 129
    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us on the pub socket with one message listing
        all containers."""

        command = {'command': 'list',
                   }

130
        self.upstream_pub_socket.send_json(command)
131 132
        while(True):
            msg = self.upstream_sub_socket.recv_json()
133
            logger.info("new message: %r", msg)
134 135 136
            # ignore other messages
            if isinstance(msg, dict) and msg.get('type') == 'container':
                if msg['event'] == 'list':
137
                    logger.info("list response: %r", msg)
138 139
                    break

140 141 142 143 144 145 146 147 148 149
    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us on the pub socket with a message
        containing the exit status of the top process of the container."""

        command = {'command': 'kill',
                   'uuid': argv.uuid
                   }

150
        self.upstream_pub_socket.send_json(command)
151 152
        while(True):
            msg = self.upstream_sub_socket.recv_json()
153
            logger.info("new message: %r", msg)
154 155 156
            # ignore other messages
            if isinstance(msg, dict) and msg.get('type') == 'container':
                if msg['event'] == 'exit' and msg['uuid'] == argv.uuid:
157
                    logger.info("container exit: %r", msg)
158 159
                    break

160 161 162 163 164
    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer on the pub socket with an acknowledgment."""

165 166 167 168 169 170 171 172
        # build the command as a JSON dict giving enough info. This is an
        # idempotent command, so we will repeat the command if we don't get a
        # timely answer.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        command = {'command': 'setpower',
                   'limit': argv.limit,
                   }
173

174
        self.upstream_pub_socket.send_json(command)
175
        while(True):
176
            msg = self.upstream_sub_socket.recv_json()
177
            logger.info("new message: %r", msg)
178 179 180
            # ignore other messages
            if isinstance(msg, dict) and msg.get('type') == 'power':
                if msg['limit'] == argv.limit:
181
                    logger.info("command received by the daemon")
182
                    break
183 184 185 186 187 188 189 190 191 192

    def main(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        subparsers = parser.add_subparsers()

        # run container
        parser_run = subparsers.add_parser("run")
193 194 195 196
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
197 198 199
        parser_run.add_argument("-u", "--ucontainername", help="""user-specified
                                name for container used to attach proceses""",
                                nargs='?', const=None, default=None)
200 201
        parser_run.set_defaults(func=self.do_run)

202 203 204 205 206
        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

207 208 209 210
        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

211 212 213 214 215
        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
216
        parser_setpower.add_argument("limit",
217 218 219
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)
220

221 222
        args = parser.parse_args()
        if args.verbose:
223
            logger.setLevel(logging.DEBUG)
224 225 226 227 228 229 230 231 232

        self.setup()
        args.func(args)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    cli = CommandLineInterface()
    cli.main()