GitLab maintenance scheduled for Today, 2019-12-05, from 17:00 to 18:00 CT - Services will be unavailable during this time.

Commit 7532b063 authored by Swann Perarnau's avatar Swann Perarnau

[refactor] Move client/daemon code into package

The previous code was entirely inside the bin directory, which is not a
good idea in the long term.

This patch move everything inside the nrm package, so that we can start
building a proper code.
parent 86409f88
#!/usr/bin/env python2
from __future__ import print_function
import argparse
import logging
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
class Client(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.buf = ''
self.nt = 16
self.max = 32
self.server = None
def setup_shutdown(self):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def get_server_message(self):
buf = self.buf
begin = 0
ret = ''
while begin < len(buf):
if buf[begin] in ['d', 'i', 'n', 'q']:
ret = buf[begin]
off = 1
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
def do_receive(self, parts):
self.logger.info("receive stream: " + repr(parts))
if len(parts[1]) == 0:
if self.server:
# server disconnect, lets quit
self.setup_shutdown()
return
else:
self.server = parts[0]
self.buf = self.buf + parts[1]
for m in self.get_server_message():
self.logger.info(m)
if m == 'd':
if self.nt == 1:
ret = "min"
else:
self.nt -= 1
ret = "done (%d)" % self.nt
elif m == 'i':
if self.nt == self.max:
ret = "max"
else:
self.nt += 1
ret = "done (%d)" % self.nt
elif m == 'n':
ret = "%d" % self.nt
elif m == 'q':
ret = ''
self.setup_shutdown()
self.stream.send(self.server, zmq.SNDMORE)
self.stream.send(ret)
def do_signal(self, signum, frame):
self.logger.critical("received signal: " + repr(signum))
self.setup_shutdown()
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# command line options
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
args = parser.parse_args()
# deal with logging
if args.verbose:
self.logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
# read env variables for connection
connect_addr = "localhost"
connect_port = 1234
connect_param = "tcp://%s:%d" % (connect_addr, connect_port)
# create connection
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect(connect_param)
self.logger.info("connected to: " + connect_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_receive)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
import nrm
import nrm.client
if __name__ == "__main__":
ioloop.install()
logging.basicConfig(level=logging.INFO)
client = Client()
client.main()
nrm.client.runner()
#!/usr/bin/env python2
from __future__ import print_function
import logging
import random
import re
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
client_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'},
's_ask_d': {'done': 'stable', 'min': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'min': 'nop'},
'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}}
class Client(object):
def __init__(self, identity):
self.identity = identity
self.buf = ''
self.state = 'stable'
def append_buffer(self, msg):
self.buf = self.buf + msg
def do_transition(self, msg):
transitions = client_fsm_table[self.state]
if msg in transitions:
self.state = transitions[msg]
else:
pass
def get_allowed_requests(self):
return client_fsm_table[self.state].keys()
def get_messages(self):
buf = self.buf
begin = 0
off = 0
ret = ''
while begin < len(buf):
if buf.startswith('min', begin):
ret = 'min'
off = len(ret)
elif buf.startswith('max', begin):
ret = 'max'
off = len(ret)
elif buf.startswith('done (', begin):
n = re.split("done \((\d+)\)", buf[begin:])[1]
ret = 'done'
off = len('done ()') + len(n)
else:
m = re.match("\d+", buf[begin:])
if m:
ret = 'ok'
off = m.end()
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
class Daemon(object):
def __init__(self):
self.clients = {}
self.buf = ''
self.logger = logging.getLogger(__name__)
self.current = 1
self.target = 1
def do_client_receive(self, parts):
self.logger.info("receiving client stream: " + repr(parts))
identity = parts[0]
if len(parts[1]) == 0:
# empty frame, indicate connect/disconnect
if identity in self.clients:
self.logger.info("known client disconnected")
del self.clients[identity]
else:
self.logger.info("new client: " + repr(identity))
self.clients[identity] = Client(identity)
else:
if identity in self.clients:
client = self.clients[identity]
# we need to unpack the stream into client messages
# messages can be: min, max, done (%d), %d
client.append_buffer(parts[1])
for m in client.get_messages():
client.do_transition(m)
self.logger.info("client now in state: " + client.state)
def do_sensor(self):
self.current = random.randrange(0, 34)
self.logger.info("current measure: " + str(self.current))
def do_control(self):
self.target = random.randrange(0, 34)
self.logger.info("target measure: " + str(self.target))
for identity, client in self.clients.iteritems():
if self.current < self.target:
if 'i' in client.get_allowed_requests():
self.stream.send_multipart([identity, 'i'])
client.do_transition('i')
elif self.current > self.target:
if 'd' in client.get_allowed_requests():
self.stream.send_multipart([identity, 'd'])
client.do_transition('d')
else:
pass
self.logger.info("client now in state: " + client.state)
def do_signal(self, signum, frame):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# read config
bind_port = 1234
bind_address = '*'
# setup listening socket
context = zmq.Context()
socket = context.socket(zmq.STREAM)
bind_param = "tcp://%s:%d" % (bind_address, bind_port)
socket.bind(bind_param)
self.logger.info("socket bound to: " + bind_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_client_receive)
self.sensor = ioloop.PeriodicCallback(self.do_sensor, 1000)
self.sensor.start()
self.control = ioloop.PeriodicCallback(self.do_control, 1000)
self.control.start()
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
import nrm
import nrm.daemon
if __name__ == "__main__":
ioloop.install()
logging.basicConfig(level=logging.DEBUG)
daemon = Daemon()
daemon.main()
nrm.daemon.runner()
from __future__ import print_function
import argparse
import logging
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
class Client(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.buf = ''
self.nt = 16
self.max = 32
self.server = None
def setup_shutdown(self):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def get_server_message(self):
buf = self.buf
begin = 0
ret = ''
while begin < len(buf):
if buf[begin] in ['d', 'i', 'n', 'q']:
ret = buf[begin]
off = 1
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
def do_receive(self, parts):
self.logger.info("receive stream: " + repr(parts))
if len(parts[1]) == 0:
if self.server:
# server disconnect, lets quit
self.setup_shutdown()
return
else:
self.server = parts[0]
self.buf = self.buf + parts[1]
for m in self.get_server_message():
self.logger.info(m)
if m == 'd':
if self.nt == 1:
ret = "min"
else:
self.nt -= 1
ret = "done (%d)" % self.nt
elif m == 'i':
if self.nt == self.max:
ret = "max"
else:
self.nt += 1
ret = "done (%d)" % self.nt
elif m == 'n':
ret = "%d" % self.nt
elif m == 'q':
ret = ''
self.setup_shutdown()
self.stream.send(self.server, zmq.SNDMORE)
self.stream.send(ret)
def do_signal(self, signum, frame):
self.logger.critical("received signal: " + repr(signum))
self.setup_shutdown()
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# command line options
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
args = parser.parse_args()
# deal with logging
if args.verbose:
self.logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
# read env variables for connection
connect_addr = "localhost"
connect_port = 1234
connect_param = "tcp://%s:%d" % (connect_addr, connect_port)
# create connection
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect(connect_param)
self.logger.info("connected to: " + connect_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_receive)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
def runner():
ioloop.install()
logging.basicConfig(level=logging.INFO)
client = Client()
client.main()
from __future__ import print_function
import logging
import random
import re
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
client_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'},
's_ask_d': {'done': 'stable', 'min': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'min': 'nop'},
'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}}
class Client(object):
def __init__(self, identity):
self.identity = identity
self.buf = ''
self.state = 'stable'
def append_buffer(self, msg):
self.buf = self.buf + msg
def do_transition(self, msg):
transitions = client_fsm_table[self.state]
if msg in transitions:
self.state = transitions[msg]
else:
pass
def get_allowed_requests(self):
return client_fsm_table[self.state].keys()
def get_messages(self):
buf = self.buf
begin = 0
off = 0
ret = ''
while begin < len(buf):
if buf.startswith('min', begin):
ret = 'min'
off = len(ret)
elif buf.startswith('max', begin):
ret = 'max'
off = len(ret)
elif buf.startswith('done (', begin):
n = re.split("done \((\d+)\)", buf[begin:])[1]
ret = 'done'
off = len('done ()') + len(n)
else:
m = re.match("\d+", buf[begin:])
if m:
ret = 'ok'
off = m.end()
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
class Daemon(object):
def __init__(self):
self.clients = {}
self.buf = ''
self.logger = logging.getLogger(__name__)
self.current = 1
self.target = 1
def do_client_receive(self, parts):
self.logger.info("receiving client stream: " + repr(parts))
identity = parts[0]
if len(parts[1]) == 0:
# empty frame, indicate connect/disconnect
if identity in self.clients:
self.logger.info("known client disconnected")
del self.clients[identity]
else:
self.logger.info("new client: " + repr(identity))
self.clients[identity] = Client(identity)
else:
if identity in self.clients:
client = self.clients[identity]
# we need to unpack the stream into client messages
# messages can be: min, max, done (%d), %d
client.append_buffer(parts[1])
for m in client.get_messages():
client.do_transition(m)
self.logger.info("client now in state: " + client.state)
def do_sensor(self):
self.current = random.randrange(0, 34)
self.logger.info("current measure: " + str(self.current))
def do_control(self):
self.target = random.randrange(0, 34)
self.logger.info("target measure: " + str(self.target))
for identity, client in self.clients.iteritems():
if self.current < self.target:
if 'i' in client.get_allowed_requests():
self.stream.send_multipart([identity, 'i'])
client.do_transition('i')
elif self.current > self.target:
if 'd' in client.get_allowed_requests():
self.stream.send_multipart([identity, 'd'])
client.do_transition('d')
else:
pass
self.logger.info("client now in state: " + client.state)
def do_signal(self, signum, frame):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# read config
bind_port = 1234
bind_address = '*'
# setup listening socket
context = zmq.Context()
socket = context.socket(zmq.STREAM)
bind_param = "tcp://%s:%d" % (bind_address, bind_port)
socket.bind(bind_param)
self.logger.info("socket bound to: " + bind_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_client_receive)
self.sensor = ioloop.PeriodicCallback(self.do_sensor, 1000)
self.sensor.start()
self.control = ioloop.PeriodicCallback(self.do_control, 1000)
self.control.start()
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
def runner():
ioloop.install()
logging.basicConfig(level=logging.DEBUG)
daemon = Daemon()
daemon.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment