MessageInterface.py 8.54 KB
Newer Older
jtchilders's avatar
jtchilders committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
import sys,os,ssl
import pika,time

import logging
logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.WARNING)
#logging.getLogger('select_connection').setLevel(logging.DEBUG)

class MessageInterface:
   
   def __init__(self,
                username                  = '',
                password                  = '',
                host                      = '',
                port                      = -1,
                virtual_host              = '/',
                socket_timeout            = 120,
                exchange_name             = '',
                exchange_type             = 'topic',
                exchange_durable          = True,
                exchange_auto_delete      = False,
                ssl_cert                  = '',
                ssl_key                   = '',
                ssl_ca_certs              = '',
                queue_is_durable          = True,
                queue_is_exclusive        = False,
                queue_is_auto_delete      = False,
               ):
      self.username                 = username
      self.password                 = password
      self.host                     = host
      self.port                     = port
      self.virtual_host             = virtual_host
      self.socket_timeout           = socket_timeout
      self.exchange_name            = exchange_name
      self.exchange_type            = exchange_type
      self.exchange_durable         = exchange_durable
      self.exchange_auto_delete     = exchange_auto_delete
      self.queue_is_durable         = queue_is_durable
      self.queue_is_exclusive       = queue_is_exclusive
      self.queue_is_auto_delete     = queue_is_auto_delete

      self.ssl_cert                 = ssl_cert
      self.ssl_key                  = ssl_key
      self.ssl_ca_certs             = ssl_ca_certs

      self.credentials = None
      self.parameters = None
      self.connection = None
      self.channel = None


   def open_blocking_connection(self):
      
      logger.debug("open blocking connection")
      self.create_connection_parameters()

      # open the connection and grab the channel
      try:
         self.connection = pika.BlockingConnection(self.parameters)
      except:
         logger.exception(' Exception received while trying to open blocking connection to message server')
         raise

      try:
         self.channel   = self.connection.channel()
      except:
         logger.exception(' Exception received while trying to open a channel to the message server')
         raise
      
      logger.debug("create exchange, name = " + self.exchange_name) 
      # make sure exchange exists (doesn't do anything if already created)
      self.channel.exchange_declare(
                                    exchange       = self.exchange_name,
                                    exchange_type  = self.exchange_type,
                                    durable        = self.exchange_durable,
                                    auto_delete    = self.exchange_auto_delete,
                                   )

   def open_select_connection(self,
                              on_open_callback              = None,
                              on_open_error_callback        = None,
                              on_close_callback             = None,
                              stop_ioloop_on_close          = True,
                             ):
      logger.debug("create select connection")
      self.create_connection_parameters()
      # open the connection
      if on_open_callback is not None:
         try:
            self.connection = pika.SelectConnection(self.parameters,
                                                    on_open_callback,
                                                    on_open_error_callback,
                                                    on_close_callback,
                                                    stop_ioloop_on_close,
                                                   )
         except:
            logger.error(' Exception received while trying to open select connection to message server: ' + str(sys.exc_info()))
            raise

   def create_connection_parameters(self):
      logger.debug("create connection parameters, server = " + self.host + " port = " + str(self.port))
      # need to set credentials to login to the message server
      #self.credentials = pika.PlainCredentials(self.username,self.password)
      self.credentials = pika.credentials.ExternalCredentials()
      ssl_options_dict = {
                          "certfile":  self.ssl_cert,
                          "keyfile":   self.ssl_key,
                          "ca_certs":  self.ssl_ca_certs,
                          "cert_reqs": ssl.CERT_REQUIRED,
                         }

                           
      #logger.debug(str(ssl_options_dict))
      # setup our connection parameters
Michael Salim's avatar
Michael Salim committed
116 117 118 119 120 121 122 123 124 125
      self.parameters = pika.ConnectionParameters('localhost')
      #self.parameters = pika.ConnectionParameters(
      #                                            host               = self.host,
      #                                            port               = self.port,
      #                                            virtual_host       = self.virtual_host,
      #                                            credentials        = self.credentials,
      #                                            socket_timeout     = self.socket_timeout,
      #                                            ssl                = True,
      #                                            ssl_options        = ssl_options_dict,
      #                                           )
jtchilders's avatar
jtchilders committed
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184

   def create_queue(self,name,routing_key):
      # declare a random queue which this job will use to receive messages
      # durable = survive reboots of the broker
      # exclusive = only current connection can access this queue
      # auto_delete = queue will be deleted after connection is closed
      self.channel.queue_declare(
                                 queue       = str(name),
                                 durable     = self.queue_is_durable,
                                 exclusive   = self.queue_is_exclusive,
                                 auto_delete = self.queue_is_auto_delete
                                )

      # now bind this queue to the exchange, using a routing key
      # any message submitted to the echange with the
      # routing key will appear on this queue
      self.channel.queue_bind(exchange=self.exchange_name,
                              queue=str(name),
                              routing_key=str(routing_key)
                             )

   def close(self):
      #self.channel.close()
      #self.connection.close()
      self.channel = None
      self.connection = None

   def send_msg(self,
                  message_body,
                  routing_key,
                  exchange_name = None,
                  message_headers = {},
                  priority = 0, # make message persistent
                  delivery_mode = 2, # default
               ):
      try:
         if exchange_name is None:
            exchange_name = self.exchange_name
         
         timestamp = time.time()

         # create the message properties
         properties = pika.BasicProperties(
                                           delivery_mode = delivery_mode,
                                           priority      = priority,
                                           timestamp     = timestamp,
                                           headers       = message_headers,
                                          )

         logger.debug("sending message body:\n" +  str(message_body))
         logger.debug('sending message to exchange: ' + self.exchange_name)
         logger.debug('sending message with routing key: ' + routing_key)

         self.channel.basic_publish(
                                    exchange         = exchange_name,
                                    routing_key      = routing_key,
                                    body             = message_body,
                                    properties       = properties,
                                   )
Thomas Uram's avatar
Thomas Uram committed
185
      except Exception as e:
jtchilders's avatar
jtchilders committed
186 187 188 189 190 191 192 193 194 195 196
         logger.exception('exception received while trying to send message')
         raise Exception('exception received while trying to send message' + str(e))
   
   def receive_msg(self,queue_name):
      # retrieve one message
      method, properties, body = self.channel.basic_get(queue=queue_name)
      return method,properties,body

   def purge_queue(self,queue_name):
      self.channel.queue_purge(queue = queue_name)