app 5.07 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
#!/usr/bin/env python2

from __future__ import print_function
import argparse
import json
import logging
import os
import signal
import time
import uuid
import zmq
from zmq.eventloop import ioloop, zmqstream

logger = logging.getLogger('nrm-dummy-application')


class DownstreamApplication(object):

    """Implements a downstream client."""

    def __init__(self):
        pass

    def do_signal(self, signum, frame):
        ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)

    def do_shutdown(self):
        update = {'type': 'application',
                  'event': 'exit',
                  'uuid': self.app_uuid,
                  }
        self.downstream_pub.send_json(update)
        ioloop.IOLoop.current().stop()

    def do_downstream_receive(self, parts):
        logger.info("receiving message from downstream: %r", parts)
        if len(parts) != 1:
            logger.error("unexpected msg length, dropping it")
            return
        msg = json.loads(parts[0])
        if isinstance(msg, dict):
            uuid = msg['uuid']
            if uuid != self.app_uuid:
                return

            command = msg.get('command')
            if command is None:
                logger.error("missing command in message")
                return
            elif command == 'threads':
                newth = msg['payload']
                if newth >= 1 and newth <= self.max:
                    self.nt = newth
                update = {'type': 'application',
                          'event': 'threads',
                          'payload': self.nt,
                          'uuid': self.app_uuid,
                          }
                self.downstream_pub.send_json(update)
            elif command == 'exit':
                self.do_shutdown()
            else:
                logger.error("bad command")
                return

    def do_progress_report(self):
        now = time.time()
        seconds = now - self.last_update
        ratio = float(self.nt)/float(self.max)
        progress = seconds*ratio*42
        update = {'type': 'application',
                  'event': 'progress',
                  'payload': progress,
                  'uuid': self.app_uuid,
                  }
        self.downstream_pub.send_json(update)
        self.last_update = now

    def setup(self):
        context = zmq.Context()
        downstream_pub_socket = context.socket(zmq.PUB)
        downstream_sub_socket = context.socket(zmq.SUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-in"
        downstream_sub_param = "ipc:///tmp/nrm-downstream-out"

        downstream_pub_socket.connect(downstream_pub_param)
        downstream_sub_socket.connect(downstream_sub_param)
        # we want to receive everything for now
        downstream_sub_filter = ""
        downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)

        logger.info("downstream pub socket connected to: %s",
                    downstream_pub_param)
        logger.info("downstream sub socket connected to: %s",
                    downstream_sub_param)

        # link sockets to events
        self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
        self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
        self.downstream_sub.on_recv(self.do_downstream_receive)

        # take care of signals
        signal.signal(signal.SIGINT, self.do_signal)

        # periodic update on progress
        self.progress = ioloop.PeriodicCallback(self.do_progress_report, 1000)
        self.progress.start()

        # retrieve our container uuid
111
        self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
        if self.container_uuid is None:
            logger.error("missing container uuid")
            exit(1)
        self.app_uuid = str(uuid.uuid4())
        logger.info("client uuid: %r", self.app_uuid)

        # send an hello to the demon
        update = {'type': 'application',
                  'event': 'start',
                  'container': self.container_uuid,
                  'uuid': self.app_uuid,
                  'progress': True,
                  'threads': {'min': 1, 'cur': self.nt, 'max': self.max},
                  }
        self.downstream_pub.send_json(update)

    def main(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        parser.add_argument("threads", help="starting number of threads",
                            type=int, default=16)
        parser.add_argument("maxthreads", help="max number of threads",
                            type=int, default=32)
        args = parser.parse_args()

        # deal with logging
        if args.verbose:
            logger.setLevel(logging.DEBUG)
        self.nt = args.threads
        self.max = args.maxthreads

        self.setup()
        self.last_update = time.time()
        ioloop.IOLoop.current().start()


if __name__ == "__main__":
    ioloop.install()
    logging.basicConfig(level=logging.INFO)
    app = DownstreamApplication()
    app.main()