diff --git a/nrm/daemon.py b/nrm/daemon.py index e32591e3c13795b114ffc74cbcb60c74f71bf8f1..53f5f061a1e1118dd5232f9b913c330cde1d1430 100644 --- a/nrm/daemon.py +++ b/nrm/daemon.py @@ -96,32 +96,30 @@ class Daemon(object): self.logger.info("application now in state: " + application.state) + def do_upstream_receive(self, msg): + self.logger.info("receiving upstream message: %r", msg) + self.target = int(msg.split()[1]) + self.logger.info("target measure: %g", self.target) + def do_sensor(self): self.machine_info = self.sensor.do_update() - self.logger.info("current state: %r" % self.machine_info) + self.logger.info("current state: %r", self.machine_info) + total_power = self.machine_info['energy']['power']['total'] + msg = "23.45 %g" % (total_power) + self.upstream_pub.send_string(msg) + self.logger.info("Sending power values: %r", msg) def do_control(self): total_power = self.machine_info['energy']['power']['total'] - # Publish the power values so that GRM can pick it up - nrm_topic = "23.45" - self.nrm_publish_sock.send("%s %s" % (nrm_topic, str(total_power))) - self.logger.info("Sending power values: " + "%s %s" % (nrm_topic, str(total_power))) - - # Subcribe to the topic for new power allocations published by the GRM - # self.target = random.randrange(0, 34) - string = self.grm_publish_sock.recv() - topic, self.target = string.split() - self.logger.info("target measure: " + str(self.target)) - for identity, application in self.applications.iteritems(): if total_power < self.target: if 'i' in application.get_allowed_requests(): - self.stream.send_multipart([identity, 'i']) + self.downstream.send_multipart([identity, 'i']) application.do_transition('i') elif total_power > self.target: if 'd' in application.get_allowed_requests(): - self.stream.send_multipart([identity, 'd']) + self.downstream.send_multipart([identity, 'd']) application.do_transition('d') else: pass @@ -135,37 +133,45 @@ class Daemon(object): ioloop.IOLoop.current().stop() def main(self): - # read config for the port numbers + # Bind port for downstream clients bind_port = 1234 - # NRM publish port is the port to which NRM publishes node power info - nrm_publish_port = 2345 - # GRM publish port is the port to which GRM publishes the new power allocation - grm_publish_port = 3456 - + # Bind address for downstream clients bind_address = '*' + # PUB port for upstream clients + upstream_pub_port = 2345 + # SUB port for upstream clients + upstream_sub_port = 3456 # setup application listening socket context = zmq.Context() - socket = context.socket(zmq.STREAM) - self.nrm_publish_sock = context.socket(zmq.PUB) - self.grm_publish_sock = context.socket(zmq.SUB) - - bind_param = "tcp://%s:%d" % (bind_address, bind_port) - nrm_bind_param = "tcp://%s:%d" % (bind_address, nrm_publish_port) - grm_bind_param = "tcp://localhost:%d" % (grm_publish_port) - - socket.bind(bind_param) - self.nrm_publish_sock.bind(nrm_bind_param) - self.grm_publish_sock.connect(grm_bind_param) - grm_filter = "34.56 " - self.grm_publish_sock.setsockopt(zmq.SUBSCRIBE, grm_filter) - - self.logger.info("socket bound to: " + bind_param) - self.logger.info("NRM publish socket bound to: " + nrm_bind_param) - self.logger.info("GRM publish socket connected to: " + grm_bind_param) - - self.stream = zmqstream.ZMQStream(socket) - self.stream.on_recv(self.do_application_receive) + downstream_socket = context.socket(zmq.STREAM) + upstream_pub_socket = context.socket(zmq.PUB) + upstream_sub_socket = context.socket(zmq.SUB) + + downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port) + upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port) + upstream_sub_param = "tcp://localhost:%d" % (upstream_sub_port) + + downstream_socket.bind(downstream_bind_param) + upstream_pub_socket.bind(upstream_pub_param) + upstream_sub_socket.connect(upstream_sub_param) + upstream_sub_filter = "34.56 " + upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter) + + self.logger.info("downstream socket bound to: %s", + downstream_bind_param) + self.logger.info("upstream pub socket bound to: %s", + upstream_pub_param) + self.logger.info("upstream sub socket connected to: %s", + upstream_sub_param) + + # register socket triggers + self.downstream = zmqstream.ZMQStream(downstream_socket) + self.downstream.on_recv(self.do_application_receive) + self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket) + self.upstream_sub.on_recv(self.do_upstream_receive) + # create a stream to let ioloop deal with blocking calls on HWM + self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket) # create sensor manager and make first measurement self.sensor = sensor.SensorManager()