Commit 96d0e610 authored by Michael Salim's avatar Michael Salim

ZMQ Message Interface

parent e7164b98
......@@ -35,4 +35,4 @@ class BalsamJobReceiver(MessageReceiver.MessageReceiver):
job.save()
del connections[db_connection_id]
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job)
status_sender.send_status(job, message='Receiver has added job to database')
......@@ -303,7 +303,6 @@ class BalsamJob(models.Model):
scheduler_id = models.IntegerField('Scheduler ID',help_text='The ID assigned this job after being submitted to the queue.',default=0)
# task attributes
# task_id specifies the task to run
application = models.TextField('Application to Run',help_text='This is the name of an application that lives in the database as an ApplicationDefinition.',default='')
config_file = models.TextField('Configuration File',help_text='This is the input file provided by the users which is used to configure the application. This may be options that are typically included on the command line. It depends on the applications.',default='')
......
......@@ -5,7 +5,8 @@ logger = logging.getLogger(__name__)
RECEIVER_MAP = {
'pika' : PikaMessageInterface.PikaMessageInterface,
'no_message' : NoMessageInterface.NoMessageInterface
'no_message' : NoMessageInterface.NoMessageInterface,
'zmq' : ZMQMessageInterface.ZMQMessageInterface
}
class MessageReceiver(multiprocessing.Process):
......@@ -39,15 +40,27 @@ class MessageReceiver(multiprocessing.Process):
logger.exception('failed to handle_msg. not continuing with this job')
channel.basic_ack(method_frame.delivery_tag)
return
else:
logger.error(' consume_msg called, but body is None ')
# should be some failure notice to argo
# acknowledge receipt of message
channel.basic_ack(method_frame.delivery_tag)
else:
logger.error(' consume_msg called, but body is None ')
# should be some failure notice to argo
# acknowledge receipt of message
channel.basic_ack(method_frame.delivery_tag)
def no_message_consume_msg(self):
pass
def zmq_consume_msg(self, body):
logger.debug(' in zmq_message_consume_msg')
if body:
logger.debug(' received ZMQmessage: ' + body)
try:
self.handle_msg(body)
except Exception as e:
logger.exception('failed to handle_msg. not continuing with this job')
return
else:
logger.error(' consume_msg called, but body is empty or None'))
def shutdown(self):
logger.debug(' stopping message consumer ')
self.messageInterface.stop_receive_loop()
import logging
logger = logging.getLogger(__name__)
import asyncio
import zmq
import time
from zmq.asyncio import Context
class ZMQMessageInterface(MessageInterface.MessageInterface):
def __init__(self, settings):
zmq.asyncio.install()
self.ctx = zmq.asyncio.Context()
self.sock_sub = None
self.sock_pub = None
self.default_routing_key = b''
self.host = 'tcp://127.0.0.1'
self.port = 5555
def setup_send(self):
self.sock_pub = self.ctx.socket(zmq.PUB)
self.sock_pub.bind('%s:%d' % (self.host, self.port))
time.sleep(1)
def setup_receive(self, consume_msg=None, routing_key=None):
if routing_key is None:
self.routing_key = self.default_routing_key
if consume_msg is not None:
self.consume_msg = consume_msg
self.sock_sub = self.ctx.socket(zmq.SUB)
self.sock_sub.connect('%s:%d' % (self.host, self.port))
self.sock_sub.subscribe(self.routing_key)
time.sleep(1)
def send_msg(self, message_body, routing_key=None):
if routing_key is None:
routing_key = self.default_routing_key
if isinstance(message_body, str):
message_body = message_body.encode('utf-8')
self.sock_pub.send(message_body)
def receive_msg(self):
msg = self.sock_sub.recv_multipart()
body = ''.join(s.decode('utf-8') for s in msg)
return body
def start_receive_loop(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._recv_loop())
loop.close()
async def _recv_loop(self):
while True:
msg = await self.sock_sub.recv_multipart()
body = ''.join(s.decode('utf-8') for s in msg)
self.consume_msg(body)
def stop_receive_loop(self):
pass
def close(self):
if self.sock_sub is not None:
self.sock_sub.close()
self.sock_sub = None
if self.sock_pub is not None:
self.sock_pub.close()
self.sock_pub = None
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment