cmd 9.89 KB
Newer Older
Swann Perarnau's avatar
Swann Perarnau committed
1 2 3 4 5 6
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import logging
import signal
7
import os
8
import nrm.messaging
9
import uuid
Valentin Reis's avatar
Valentin Reis committed
10
import sys
11
import time
Valentin Reis's avatar
Valentin Reis committed
12
import collections
Swann Perarnau's avatar
Swann Perarnau committed
13

14 15
# Table of upstream RPC request message constructors, indexed by request
# type; used below as RPC_MSG['kill'](**fields), RPC_MSG['run'](**fields)...
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
# Shared logger for the whole command line tool.
logger = logging.getLogger('nrm')
# NOTE(review): not referenced anywhere else in this file; presumably a
# lightweight argument record for kill requests -- confirm before removing.
KillArgs = collections.namedtuple("Kill", ["uuid"])
17

Swann Perarnau's avatar
Swann Perarnau committed
18 19 20 21 22 23

class CommandLineInterface(object):

    """Implements a command line interface to the NRM.

    main() parses the command line, setup() connects the upstream RPC
    client, and the selected do_* method then performs its exchange with
    the daemon (a single request/reply, or a receive loop for `run` and
    `listen`).
    """

    # Upstream RPC and pub/sub ports of the daemon on localhost.
    UPSTREAM_RPC_PORT = 3456
    UPSTREAM_PUB_PORT = 2345
    # Value of the `api` field expected on every RPC reply.
    REPLY_API = 'up_rpc_rep'

    def __init__(self):
        # `client` is created by setup() and `pub_client` by do_listen().
        self.client = None
        self.pub_client = None

    @staticmethod
    def _check_reply(msg, expected_types):
        """Verify that `msg` is an RPC reply of one of `expected_types`.

        Raises AssertionError on mismatch, like the bare `assert`
        statements this replaces, but it is not stripped when Python runs
        with -O and it reports the offending message.
        """
        if msg.api != CommandLineInterface.REPLY_API:
            raise AssertionError("unexpected reply api in %r" % (msg,))
        if msg.type not in expected_types:
            raise AssertionError("unexpected reply type in %r, expected one "
                                 "of %r" % (msg, list(expected_types)))

    def _rpc(self, command, expected_types):
        """Send `command` as an RPC request and return the checked reply.

        `command` is the dict of message fields; its 'type' entry selects
        the message constructor in RPC_MSG.
        """
        request = RPC_MSG[command['type']](**command)
        self.client.sendmsg(request)
        reply = self.client.recvmsg()
        self._check_reply(reply, expected_types)
        return reply

    def do_signal(self, uuid, signum, stackframe):
        """Signal handler body: kill the container, if any, then exit.

        `uuid` is the container to kill, or a false value when no
        container was started yet. Always terminates the process with
        status 130, the conventional status for an exit on SIGINT.
        """
        if uuid:
            logger.info("received signal %d, killing the application..",
                        signum)
            command = {'api': 'up_rpc_req',
                       'type': 'kill',
                       'container_uuid': uuid
                       }
            msg = RPC_MSG['kill'](**command)
            # Fire and forget: the reply to the kill request is not awaited.
            self.client.sendmsg(msg)
            logger.info("killed the application, exiting.")
        else:
            logger.info("received signal %d, exiting", signum)
        # sys.exit instead of the exit() builtin, which is only injected by
        # the `site` module and is meant for interactive use.
        sys.exit(130)

    def setup(self):
        """Connect the upstream RPC client and install the SIGINT handler."""
        # upstream RPC port
        upstream_client_param = "tcp://localhost:%d" % (self.UPSTREAM_RPC_PORT)
        self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)

        # take care of signals: no container exists yet, so just exit.
        def handler(signum, frame):
            self.do_signal(None, signum, frame)
        signal.signal(signal.SIGINT, handler)

        self.client.wait_connected()

    @staticmethod
    def _print_pub_message(msg, wanted_type):
        """Print one pub/sub message according to the --filter option.

        Without a filter every message is printed as "type, timestamp".
        With a filter only messages of that type are considered, and the
        payload (performance, progress) or total (power) is appended.
        """
        now = time.time()
        if not wanted_type:
            print("%s, %s" % (msg.type, now))
            sys.stdout.flush()
            return
        if wanted_type != msg.type:
            return
        if msg.type in ("performance", "progress"):
            print("%s, %s, %s" % (msg.type, now, msg.payload))
        if msg.type == "power":
            print("%s, %s, %s" % (msg.type, now, msg.total))
        sys.stdout.flush()

    def do_listen(self, argv):
        """ Connect to the NRM and listen for pub/sub messages.

        Loops forever; the process is normally ended by SIGINT.
        """
        upstream_pub_param = "tcp://localhost:%d" % (self.UPSTREAM_PUB_PORT)
        self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
        self.pub_client.wait_connected()

        # NOTE: argv.uuid (-u/--uuid) is accepted by the parser but is not
        # applied to the messages yet.
        while True:
            msg = self.pub_client.recvmsg()
            logger.debug("pub message: %s", msg)
            self._print_pub_message(msg, argv.filter)

    @staticmethod
    def _sanitize_manifest(path):
        """Return the absolute path of the manifest, or exit with status 1
        after logging an error when `path` is not an existing file."""
        if not os.path.isfile(path):
            logger.error("Manifest file not found: %s", path)
            sys.exit(1)
        return os.path.abspath(path)

    def do_run(self, argv):
        """ Connect to the NRM and ask to spawn a container and run a command
        in it.

        The NRM should reply for container info. The output of the command
        is then relayed to stdout/stderr until both streams reported 'eof'
        and the process exit was received; the CLI then exits with the
        status carried by the exit message.
        """

        # build the command as a JSON dict containing enough info. We add to
        # the command a container uuid as a way to make sure that we can make
        # the command idempotent.
        container_uuid = argv.ucontainername or str(uuid.uuid4())
        command = {'api': 'up_rpc_req',
                   'type': 'run',
                   'manifest': self._sanitize_manifest(argv.manifest),
                   'path': argv.command,
                   'args': argv.args,
                   'environ': dict(os.environ),
                   'container_uuid': container_uuid,
                   }

        # the first message tells us if we started a container or not
        start_msg = self._rpc(command, ['process_start'])

        # Bind the uuid now: the receive loop below keeps rebinding its
        # message variable, so a handler closing over that variable would
        # read the uuid from whatever message happened to arrive last.
        started_uuid = start_msg.container_uuid

        def handler(signum, frame):
            self.do_signal(started_uuid, signum, frame)
        signal.signal(signal.SIGINT, handler)

        outeof = False
        erreof = False
        exitmsg = None
        while True:
            msg = self.client.recvmsg()
            self._check_reply(msg,
                              ['stdout', 'stderr', 'exit', 'process_exit'])

            if msg.type == 'stdout':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    outeof = True
                else:
                    print(msg.payload)
            elif msg.type == 'stderr':
                logger.debug("container msg: %r", msg)
                if msg.payload == 'eof':
                    erreof = True
                else:
                    print(msg.payload, file=sys.stderr)
            elif msg.type == 'process_exit':
                logger.info("process ended: %r", msg)
                exitmsg = msg
            else:
                logger.error("unexpected message: %r", msg)

            # Only leave once both streams are drained and the exit status
            # is known, so no trailing output is lost.
            if outeof and erreof and exitmsg is not None:
                logger.info("command ended: %r", exitmsg)
                sys.exit(int(exitmsg.status))

    def do_list(self, argv):
        """Connect to the NRM and ask to list the containers present on the
        system.

        The NRM should respond to us with one message listing all
        containers."""

        command = {'api': 'up_rpc_req',
                   'type': 'list'}
        reply = self._rpc(command, ['list'])
        logger.info("list response: %r", reply)

    def do_kill(self, argv):
        """Connect to the NRM and ask to kill a container by uuid.

        The NRM should respond to us with a message containing the exit status
        of the top process of the container."""

        command = {'api': 'up_rpc_req',
                   'type': 'kill',
                   'container_uuid': argv.uuid
                   }
        reply = self._rpc(command, ['exit'])
        logger.info("container exit: %r", reply)

    def do_setpower(self, argv):
        """ Connect to the NRM and ask to change the power limit.

        The NRM should answer with an acknowledgment."""

        # build the command as a JSON dict giving enough info.
        # TODO: check that the level makes a little bit of sense in the first
        # place
        # NOTE: argv.follow (-f/--follow) is parsed but not acted upon here.
        command = {'api': 'up_rpc_req',
                   'type': 'setpower',
                   'limit': str(argv.limit),
                   }
        # The acknowledgment of a setpower request has type 'getpower'.
        reply = self._rpc(command, ['getpower'])
        logger.info("command received by the daemon: %r", reply)

    def _build_parser(self):
        """Build the argument parser, one sub-command per do_* method.

        Each sub-parser stores the bound method to call in `func`.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        # Python 3 no longer treats the sub-command as mandatory, which
        # would leave the namespace without `func`. Setting the attribute
        # (rather than passing required=True, unknown to Python 2) makes it
        # mandatory everywhere; `dest` gives the error message a name.
        subparsers = parser.add_subparsers(dest="subcommand")
        subparsers.required = True

        # run container
        parser_run = subparsers.add_parser("run")
        parser_run.add_argument("manifest", help="manifest file to apply")
        parser_run.add_argument("command", help="command to execute")
        parser_run.add_argument("args", help="command arguments",
                                nargs=argparse.REMAINDER)
        parser_run.add_argument("-u", "--ucontainername",
                                help="user-specified name for container "
                                     "used to attach processes",
                                nargs='?', const=None, default=None)
        parser_run.set_defaults(func=self.do_run)

        # kill container
        parser_kill = subparsers.add_parser("kill")
        parser_kill.add_argument("uuid", help="uuid of the container")
        parser_kill.set_defaults(func=self.do_kill)

        # list containers
        parser_list = subparsers.add_parser("list")
        parser_list.set_defaults(func=self.do_list)

        # listen
        parser_listen = subparsers.add_parser("listen")
        parser_listen.add_argument("-u", "--uuid",
                                   help="container uuid to listen for",
                                   default=None)
        parser_listen.add_argument("-f", "--filter",
                                   help="type of message to filter and "
                                        "prettyprint, in "
                                        "{power,performance,progress}",
                                   default=None)
        parser_listen.set_defaults(func=self.do_listen)

        # setpowerlimit
        parser_setpower = subparsers.add_parser("setpower")
        parser_setpower.add_argument("-f", "--follow",
                                     help="listen for power changes",
                                     action='store_true')
        parser_setpower.add_argument("limit",
                                     help="set new power limit",
                                     type=float)
        parser_setpower.set_defaults(func=self.do_setpower)

        return parser

    def main(self):
        """Parse sys.argv, connect to the daemon and run the sub-command."""
        args = self._build_parser().parse_args()
        if args.verbose:
            logger.setLevel(logging.DEBUG)

        self.setup()
        args.func(args)


# Script entry point: log at INFO level by default (main() raises the 'nrm'
# logger to DEBUG when -v is given), then hand control to the CLI.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    CommandLineInterface().main()