GitLab maintenance scheduled form Friday, 2021-06-18 5:00pm to Satursday, 2021-06-19 10:00pm CT - Services will be unavailable during this time.

Commit 19c9eb54 authored by Swann Perarnau's avatar Swann Perarnau

[feature] Implement skeleton downstream API

This patch refactors the downstream API to use pub/sub socket pair, like
the upstream API. This is part of the effort to improve the downstream
API. See #2.

This patch doesn't touch the client module, which will be adapted in
future commits.
parent be059812
...@@ -6,7 +6,6 @@ from functools import partial ...@@ -6,7 +6,6 @@ from functools import partial
import json import json
import logging import logging
import os import os
import re
import sensor import sensor
import signal import signal
import zmq import zmq
...@@ -28,12 +27,8 @@ logger = logging.getLogger('nrm') ...@@ -28,12 +27,8 @@ logger = logging.getLogger('nrm')
class Application(object): class Application(object):
def __init__(self, identity): def __init__(self, identity):
self.identity = identity self.identity = identity
self.buf = ''
self.state = 'stable' self.state = 'stable'
def append_buffer(self, msg):
self.buf = self.buf + msg
def do_transition(self, msg): def do_transition(self, msg):
transitions = application_fsm_table[self.state] transitions = application_fsm_table[self.state]
if msg in transitions: if msg in transitions:
...@@ -44,34 +39,6 @@ class Application(object): ...@@ -44,34 +39,6 @@ class Application(object):
def get_allowed_requests(self): def get_allowed_requests(self):
return application_fsm_table[self.state].keys() return application_fsm_table[self.state].keys()
def get_messages(self):
buf = self.buf
begin = 0
off = 0
ret = ''
while begin < len(buf):
if buf.startswith('min', begin):
ret = 'min'
off = len(ret)
elif buf.startswith('max', begin):
ret = 'max'
off = len(ret)
elif buf.startswith('done (', begin):
n = re.split("done \((\d+)\)", buf[begin:])[1]
ret = 'done'
off = len('done ()') + len(n)
else:
m = re.match("\d+", buf[begin:])
if m:
ret = 'ok'
off = m.end()
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
class Daemon(object): class Daemon(object):
def __init__(self): def __init__(self):
...@@ -79,28 +46,31 @@ class Daemon(object): ...@@ -79,28 +46,31 @@ class Daemon(object):
self.buf = '' self.buf = ''
self.target = 1.0 self.target = 1.0
def do_application_receive(self, parts): def do_downstream_receive(self, parts):
logger.info("receiving application stream: %r", parts) logger.info("receiving downstream message: %r", parts)
identity = parts[0] if len(parts) != 1:
logger.error("unexpected msg length, dropping it: %r", parts)
if len(parts[1]) == 0: return
# empty frame, indicate connect/disconnect msg = json.loads(parts[0])
if identity in self.applications: if isinstance(msg, dict):
logger.info("known client disconnected") msgtype = msg.get('type')
del self.applications[identity] event = msg.get('event')
else: if msgtype is None or msgtype != 'application' or event is None:
logger.info("new client: " + repr(identity)) logger.error("wrong message format: %r", msg)
return
if event == 'start':
logger.info("new application: %r", msg)
identity = msg['uuid']
self.applications[identity] = Application(identity) self.applications[identity] = Application(identity)
elif event == 'threads':
logger.info("change in threads")
application = self.applications[msg['uuid']]
application.do_transition(msg['payload'])
elif event == 'progress':
logger.info("new progress")
else: else:
if identity in self.applications: logger.error("unknown event: %r", event)
application = self.applications[identity] return
# we need to unpack the stream into application messages
# messages can be: min, max, done (%d), %d
application.append_buffer(parts[1])
for m in application.get_messages():
application.do_transition(m)
logger.info("application now in state: %s",
application.state)
def do_upstream_receive(self, parts): def do_upstream_receive(self, parts):
logger.info("receiving upstream message: %r", parts) logger.info("receiving upstream message: %r", parts)
...@@ -233,10 +203,9 @@ class Daemon(object): ...@@ -233,10 +203,9 @@ class Daemon(object):
ioloop.IOLoop.current().stop() ioloop.IOLoop.current().stop()
def main(self): def main(self):
# Bind port for downstream clients
bind_port = 1234
# Bind address for downstream clients # Bind address for downstream clients
bind_address = '*' bind_address = '*'
# PUB port for upstream clients # PUB port for upstream clients
upstream_pub_port = 2345 upstream_pub_port = 2345
# SUB port for upstream clients # SUB port for upstream clients
...@@ -244,31 +213,38 @@ class Daemon(object): ...@@ -244,31 +213,38 @@ class Daemon(object):
# setup application listening socket # setup application listening socket
context = zmq.Context() context = zmq.Context()
downstream_socket = context.socket(zmq.STREAM) downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB)
upstream_pub_socket = context.socket(zmq.PUB) upstream_pub_socket = context.socket(zmq.PUB)
upstream_sub_socket = context.socket(zmq.SUB) upstream_sub_socket = context.socket(zmq.SUB)
downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port) downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port) upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port) upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)
downstream_socket.bind(downstream_bind_param) downstream_pub_socket.bind(downstream_pub_param)
downstream_sub_socket.bind(downstream_sub_param)
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
upstream_pub_socket.bind(upstream_pub_param) upstream_pub_socket.bind(upstream_pub_param)
upstream_sub_socket.bind(upstream_sub_param) upstream_sub_socket.bind(upstream_sub_param)
upstream_sub_filter = "" upstream_sub_filter = ""
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter) upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
logger.info("downstream socket bound to: %s", downstream_bind_param) logger.info("downstream pub socket bound to: %s", downstream_pub_param)
logger.info("downstream sub socket bound to: %s", downstream_sub_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param) logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param) logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# register socket triggers # register socket triggers
self.downstream = zmqstream.ZMQStream(downstream_socket) self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
self.downstream.on_recv(self.do_application_receive) self.downstream_sub.on_recv(self.do_downstream_receive)
self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket) self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
self.upstream_sub.on_recv(self.do_upstream_receive) self.upstream_sub.on_recv(self.do_upstream_receive)
# create a stream to let ioloop deal with blocking calls on HWM # create a stream to let ioloop deal with blocking calls on HWM
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket) self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
# create resource and container manager # create resource and container manager
self.resource_manager = ResourceManager() self.resource_manager = ResourceManager()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment