Commit df67000b authored by Swann Perarnau's avatar Swann Perarnau

[refactor] Integrate GRM message flow properly

The previous commit added pub/sub communications in the wrong places,
creating synchronizations in an asynchronous event loop.

This commit fixes those issues, adding the upstream (GRM/Flux) flow to
the event loop, and renaming objects here and there for clarity.
parent d019a890
......@@ -96,32 +96,30 @@ class Daemon(object):
self.logger.info("application now in state: " +
application.state)
def do_upstream_receive(self, msg):
self.logger.info("receiving upstream message: %r", msg)
self.target = int(msg.split()[1])
self.logger.info("target measure: %g", self.target)
def do_sensor(self):
self.machine_info = self.sensor.do_update()
self.logger.info("current state: %r" % self.machine_info)
self.logger.info("current state: %r", self.machine_info)
total_power = self.machine_info['energy']['power']['total']
msg = "23.45 %g" % (total_power)
self.upstream_pub.send_string(msg)
self.logger.info("Sending power values: %r", msg)
def do_control(self):
total_power = self.machine_info['energy']['power']['total']
# Publish the power values so that GRM can pick it up
nrm_topic = "23.45"
self.nrm_publish_sock.send("%s %s" % (nrm_topic, str(total_power)))
self.logger.info("Sending power values: " + "%s %s" % (nrm_topic, str(total_power)))
# Subcribe to the topic for new power allocations published by the GRM
# self.target = random.randrange(0, 34)
string = self.grm_publish_sock.recv()
topic, self.target = string.split()
self.logger.info("target measure: " + str(self.target))
for identity, application in self.applications.iteritems():
if total_power < self.target:
if 'i' in application.get_allowed_requests():
self.stream.send_multipart([identity, 'i'])
self.downstream.send_multipart([identity, 'i'])
application.do_transition('i')
elif total_power > self.target:
if 'd' in application.get_allowed_requests():
self.stream.send_multipart([identity, 'd'])
self.downstream.send_multipart([identity, 'd'])
application.do_transition('d')
else:
pass
......@@ -135,37 +133,45 @@ class Daemon(object):
ioloop.IOLoop.current().stop()
def main(self):
# read config for the port numbers
# Bind port for downstream clients
bind_port = 1234
# NRM publish port is the port to which NRM publishes node power info
nrm_publish_port = 2345
# GRM publish port is the port to which GRM publishes the new power allocation
grm_publish_port = 3456
# Bind address for downstream clients
bind_address = '*'
# PUB port for upstream clients
upstream_pub_port = 2345
# SUB port for upstream clients
upstream_sub_port = 3456
# setup application listening socket
context = zmq.Context()
socket = context.socket(zmq.STREAM)
self.nrm_publish_sock = context.socket(zmq.PUB)
self.grm_publish_sock = context.socket(zmq.SUB)
bind_param = "tcp://%s:%d" % (bind_address, bind_port)
nrm_bind_param = "tcp://%s:%d" % (bind_address, nrm_publish_port)
grm_bind_param = "tcp://localhost:%d" % (grm_publish_port)
socket.bind(bind_param)
self.nrm_publish_sock.bind(nrm_bind_param)
self.grm_publish_sock.connect(grm_bind_param)
grm_filter = "34.56 "
self.grm_publish_sock.setsockopt(zmq.SUBSCRIBE, grm_filter)
self.logger.info("socket bound to: " + bind_param)
self.logger.info("NRM publish socket bound to: " + nrm_bind_param)
self.logger.info("GRM publish socket connected to: " + grm_bind_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_application_receive)
downstream_socket = context.socket(zmq.STREAM)
upstream_pub_socket = context.socket(zmq.PUB)
upstream_sub_socket = context.socket(zmq.SUB)
downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port)
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port)
downstream_socket.bind(downstream_bind_param)
upstream_pub_socket.bind(upstream_pub_param)
upstream_sub_socket.connect(upstream_sub_param)
upstream_sub_filter = "34.56 "
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
self.logger.info("downstream socket bound to: %s",
downstream_bind_param)
self.logger.info("upstream pub socket bound to: %s",
upstream_pub_param)
self.logger.info("upstream sub socket connected to: %s",
upstream_sub_param)
# register socket triggers
self.downstream = zmqstream.ZMQStream(downstream_socket)
self.downstream.on_recv(self.do_application_receive)
self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
self.upstream_sub.on_recv(self.do_upstream_receive)
# create a stream to let ioloop deal with blocking calls on HWM
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
# create sensor manager and make first measurement
self.sensor = sensor.SensorManager()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment