GitLab maintenance scheduled form Friday, 2021-06-18 5:00pm to Satursday, 2021-06-19 10:00pm CT - Services will be unavailable during this time.

Commit 1a8707f2 authored by Swann Perarnau's avatar Swann Perarnau

[refactor] Rename clients to applications

Giving the deadlines we have, it's probably a good idea to keep things
simple and keep the application facing protocol exactly as it is, and
restrict it to just applications. This way we can keep using the argobots
tests as valid benchmarks, and at the same start building a decent
communication protocol on a different socket, with a better interface.

To clarify that, the daemon now use the word application to refer to
clients connecting on the "legacy" interface.

We'll add a different socket and start building a real protocol in
future commits.
parent 7532b063
...@@ -7,17 +7,17 @@ import signal ...@@ -7,17 +7,17 @@ import signal
import zmq import zmq
from zmq.eventloop import ioloop, zmqstream from zmq.eventloop import ioloop, zmqstream
client_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'}, application_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'}, 's_ask_i': {'done': 'stable', 'max': 'max'},
's_ask_d': {'done': 'stable', 'min': 'min'}, 's_ask_d': {'done': 'stable', 'min': 'min'},
'max': {'d': 'max_ask_d'}, 'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'}, 'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'min': 'nop'}, 'max_ask_d': {'done': 'stable', 'min': 'nop'},
'min_ask_i': {'done': 'stable', 'max': 'nop'}, 'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}} 'nop': {}}
class Client(object): class Application(object):
def __init__(self, identity): def __init__(self, identity):
self.identity = identity self.identity = identity
self.buf = '' self.buf = ''
...@@ -27,14 +27,14 @@ class Client(object): ...@@ -27,14 +27,14 @@ class Client(object):
self.buf = self.buf + msg self.buf = self.buf + msg
def do_transition(self, msg): def do_transition(self, msg):
transitions = client_fsm_table[self.state] transitions = application_fsm_table[self.state]
if msg in transitions: if msg in transitions:
self.state = transitions[msg] self.state = transitions[msg]
else: else:
pass pass
def get_allowed_requests(self): def get_allowed_requests(self):
return client_fsm_table[self.state].keys() return application_fsm_table[self.state].keys()
def get_messages(self): def get_messages(self):
buf = self.buf buf = self.buf
...@@ -67,33 +67,34 @@ class Client(object): ...@@ -67,33 +67,34 @@ class Client(object):
class Daemon(object): class Daemon(object):
def __init__(self): def __init__(self):
self.clients = {} self.applications = {}
self.buf = '' self.buf = ''
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.current = 1 self.current = 1
self.target = 1 self.target = 1
def do_client_receive(self, parts): def do_application_receive(self, parts):
self.logger.info("receiving client stream: " + repr(parts)) self.logger.info("receiving application stream: " + repr(parts))
identity = parts[0] identity = parts[0]
if len(parts[1]) == 0: if len(parts[1]) == 0:
# empty frame, indicate connect/disconnect # empty frame, indicate connect/disconnect
if identity in self.clients: if identity in self.applications:
self.logger.info("known client disconnected") self.logger.info("known client disconnected")
del self.clients[identity] del self.applications[identity]
else: else:
self.logger.info("new client: " + repr(identity)) self.logger.info("new client: " + repr(identity))
self.clients[identity] = Client(identity) self.applications[identity] = Application(identity)
else: else:
if identity in self.clients: if identity in self.applications:
client = self.clients[identity] application = self.applications[identity]
# we need to unpack the stream into client messages # we need to unpack the stream into application messages
# messages can be: min, max, done (%d), %d # messages can be: min, max, done (%d), %d
client.append_buffer(parts[1]) application.append_buffer(parts[1])
for m in client.get_messages(): for m in application.get_messages():
client.do_transition(m) application.do_transition(m)
self.logger.info("client now in state: " + client.state) self.logger.info("application now in state: " +
application.state)
def do_sensor(self): def do_sensor(self):
self.current = random.randrange(0, 34) self.current = random.randrange(0, 34)
...@@ -103,18 +104,18 @@ class Daemon(object): ...@@ -103,18 +104,18 @@ class Daemon(object):
self.target = random.randrange(0, 34) self.target = random.randrange(0, 34)
self.logger.info("target measure: " + str(self.target)) self.logger.info("target measure: " + str(self.target))
for identity, client in self.clients.iteritems(): for identity, application in self.applications.iteritems():
if self.current < self.target: if self.current < self.target:
if 'i' in client.get_allowed_requests(): if 'i' in application.get_allowed_requests():
self.stream.send_multipart([identity, 'i']) self.stream.send_multipart([identity, 'i'])
client.do_transition('i') application.do_transition('i')
elif self.current > self.target: elif self.current > self.target:
if 'd' in client.get_allowed_requests(): if 'd' in application.get_allowed_requests():
self.stream.send_multipart([identity, 'd']) self.stream.send_multipart([identity, 'd'])
client.do_transition('d') application.do_transition('d')
else: else:
pass pass
self.logger.info("client now in state: " + client.state) self.logger.info("application now in state: " + application.state)
def do_signal(self, signum, frame): def do_signal(self, signum, frame):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown) ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
...@@ -127,15 +128,14 @@ class Daemon(object): ...@@ -127,15 +128,14 @@ class Daemon(object):
bind_port = 1234 bind_port = 1234
bind_address = '*' bind_address = '*'
# setup listening socket # setup application listening socket
context = zmq.Context() context = zmq.Context()
socket = context.socket(zmq.STREAM) socket = context.socket(zmq.STREAM)
bind_param = "tcp://%s:%d" % (bind_address, bind_port) bind_param = "tcp://%s:%d" % (bind_address, bind_port)
socket.bind(bind_param) socket.bind(bind_param)
self.logger.info("socket bound to: " + bind_param) self.logger.info("socket bound to: " + bind_param)
self.stream = zmqstream.ZMQStream(socket) self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_client_receive) self.stream.on_recv(self.do_application_receive)
self.sensor = ioloop.PeriodicCallback(self.do_sensor, 1000) self.sensor = ioloop.PeriodicCallback(self.do_sensor, 1000)
self.sensor.start() self.sensor.start()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment