Commit 66e4c85d authored by Swann Perarnau's avatar Swann Perarnau

[feature] Replace client with dummy application

This patch replace the client code (bin/client and nrm/client) by a new
application code that integrates progress reports and uses the new
downstream API.

While git is reporting that both codes are different, the app code is
basically a refactoring and adaptation of the client code.

This is directly related to issue #2.
parent f43a38d3
#!/usr/bin/env python2
from __future__ import print_function
import argparse
import json
import logging
import os
import signal
import time
import uuid
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-dummy-application')
class DownstreamApplication(object):
"""Implements a downstream client."""
def __init__(self):
pass
def do_signal(self, signum, frame):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def do_shutdown(self):
update = {'type': 'application',
'event': 'exit',
'uuid': self.app_uuid,
}
self.downstream_pub.send_json(update)
ioloop.IOLoop.current().stop()
def do_downstream_receive(self, parts):
logger.info("receiving message from downstream: %r", parts)
if len(parts) != 1:
logger.error("unexpected msg length, dropping it")
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
uuid = msg['uuid']
if uuid != self.app_uuid:
return
command = msg.get('command')
if command is None:
logger.error("missing command in message")
return
elif command == 'threads':
newth = msg['payload']
if newth >= 1 and newth <= self.max:
self.nt = newth
update = {'type': 'application',
'event': 'threads',
'payload': self.nt,
'uuid': self.app_uuid,
}
self.downstream_pub.send_json(update)
elif command == 'exit':
self.do_shutdown()
else:
logger.error("bad command")
return
def do_progress_report(self):
now = time.time()
seconds = now - self.last_update
ratio = float(self.nt)/float(self.max)
progress = seconds*ratio*42
update = {'type': 'application',
'event': 'progress',
'payload': progress,
'uuid': self.app_uuid,
}
self.downstream_pub.send_json(update)
self.last_update = now
def setup(self):
context = zmq.Context()
downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB)
downstream_pub_param = "ipc:///tmp/nrm-downstream-in"
downstream_sub_param = "ipc:///tmp/nrm-downstream-out"
downstream_pub_socket.connect(downstream_pub_param)
downstream_sub_socket.connect(downstream_sub_param)
# we want to receive everything for now
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
logger.info("downstream pub socket connected to: %s",
downstream_pub_param)
logger.info("downstream sub socket connected to: %s",
downstream_sub_param)
# link sockets to events
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
self.downstream_sub.on_recv(self.do_downstream_receive)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
# periodic update on progress
self.progress = ioloop.PeriodicCallback(self.do_progress_report, 1000)
self.progress.start()
# retrieve our container uuid
self.container_uuid = os.environ.get('container')
if self.container_uuid is None:
logger.error("missing container uuid")
exit(1)
self.app_uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.app_uuid)
# send an hello to the demon
update = {'type': 'application',
'event': 'start',
'container': self.container_uuid,
'uuid': self.app_uuid,
'progress': True,
'threads': {'min': 1, 'cur': self.nt, 'max': self.max},
}
self.downstream_pub.send_json(update)
def main(self):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
args = parser.parse_args()
# deal with logging
if args.verbose:
logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
self.setup()
self.last_update = time.time()
ioloop.IOLoop.current().start()
if __name__ == "__main__":
ioloop.install()
logging.basicConfig(level=logging.INFO)
app = DownstreamApplication()
app.main()
#!/usr/bin/env python2
from __future__ import print_function
import nrm
import nrm.client
if __name__ == "__main__":
nrm.client.runner()
from __future__ import print_function
import argparse
import logging
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-client')
class Client(object):
def __init__(self):
self.buf = ''
self.nt = 16
self.max = 32
self.server = None
def setup_shutdown(self):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def get_server_message(self):
buf = self.buf
begin = 0
ret = ''
while begin < len(buf):
if buf[begin] in ['d', 'i', 'n', 'q']:
ret = buf[begin]
off = 1
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
def do_receive(self, parts):
logger.info("receive stream: " + repr(parts))
if len(parts[1]) == 0:
if self.server:
# server disconnect, lets quit
self.setup_shutdown()
return
else:
self.server = parts[0]
self.buf = self.buf + parts[1]
for m in self.get_server_message():
logger.info(m)
if m == 'd':
if self.nt == 1:
ret = "min"
else:
self.nt -= 1
ret = "done (%d)" % self.nt
elif m == 'i':
if self.nt == self.max:
ret = "max"
else:
self.nt += 1
ret = "done (%d)" % self.nt
elif m == 'n':
ret = "%d" % self.nt
elif m == 'q':
ret = ''
self.setup_shutdown()
self.stream.send(self.server, zmq.SNDMORE)
self.stream.send(ret)
def do_signal(self, signum, frame):
logger.critical("received signal: " + repr(signum))
self.setup_shutdown()
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# command line options
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
args = parser.parse_args()
# deal with logging
if args.verbose:
logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
# read env variables for connection
connect_addr = "localhost"
connect_port = 1234
connect_param = "tcp://%s:%d" % (connect_addr, connect_port)
# create connection
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect(connect_param)
logger.info("connected to: " + connect_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_receive)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
def runner():
ioloop.install()
logging.basicConfig(level=logging.INFO)
client = Client()
client.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment