MessageReceiver.py 2.41 KB
Newer Older
1
from balsam.common import PikaMessageInterface, NoMessageInterface
jtchilders's avatar
jtchilders committed
2 3 4 5
from django.conf import settings
import logging,sys,multiprocessing,time,os
logger = logging.getLogger(__name__)

6 7
RECEIVER_MAP = {
    'pika' : PikaMessageInterface.PikaMessageInterface,
Michael Salim's avatar
Michael Salim committed
8 9
    'no_message' : NoMessageInterface.NoMessageInterface,
    'zmq' : ZMQMessageInterface.ZMQMessageInterface
10
}
jtchilders's avatar
jtchilders committed
11 12

class MessageReceiver(multiprocessing.Process):
13 14 15 16 17 18 19 20 21 22 23 24 25 26
    ''' subscribes to a queue and executes the given callback'''
    
    def __init__(self, settings):
        # execute multiprocessing.Process superconstructor
        super(MessageReceiver,self).__init__()

        receiver_mode = settings['mode']
        MessageClass = RECEIVER_MAP[receiver_mode]
        self.messageInterface = MessageClass(settings)
        self.consume_msg = getattr(self, '%s_consume_msg' % receiver_mode)

    def handle_msg(self, msg_body):
        '''This handles the message in a protocol-independent way'''
        raise NotImplementedError
jtchilders's avatar
jtchilders committed
27
   
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
    def run(self):
       logger.debug(' in run ')
       self.messageInterface.setup_receive(self.consume_msg)
       self.messageInterface.start_receive_loop()

    def pika_consume_msg(self,channel,method_frame,header_frame,body):
       logger.debug('in pika_consume_msg' )
       if body is not None:
           logger.debug(' received message: ' + body )
           try:
               self.handle_msg(body)
           except Exception as e:
               logger.exception('failed to handle_msg. not continuing with this job')
               channel.basic_ack(method_frame.delivery_tag)
               return
Michael Salim's avatar
Michael Salim committed
43 44 45 46 47
       else:
           logger.error(' consume_msg called, but body is None ')
           # should be some failure notice to argo
           # acknowledge receipt of message
           channel.basic_ack(method_frame.delivery_tag)
48 49 50 51

    def no_message_consume_msg(self):
       pass

Michael Salim's avatar
Michael Salim committed
52 53 54 55 56 57 58 59 60 61 62 63
   def zmq_consume_msg(self, body):
       logger.debug(' in zmq_message_consume_msg')
       if body:
           logger.debug(' received ZMQmessage: ' + body)
           try:
               self.handle_msg(body)
            except Exception as e:
                logger.exception('failed to handle_msg. not continuing with this job')
                return
        else:
            logger.error(' consume_msg called, but body is empty or None'))

64 65 66
    def shutdown(self):
       logger.debug(' stopping message consumer ')
       self.messageInterface.stop_receive_loop()