Commit e7164b98 authored by Michael Salim

Merge branch 'message-refactor' into testing

parents c24d23cb 2257eab3
......@@ -2,7 +2,7 @@ import logging,sys,os
logger = logging.getLogger(__name__)
from common import MessageReceiver,db_tools
from balsam import models
from balsam import models, BalsamStatusSender
from django.conf import settings
from django.db import utils,connections,DEFAULT_DB_ALIAS
......@@ -10,54 +10,29 @@ from django.db import utils,connections,DEFAULT_DB_ALIAS
class BalsamJobReceiver(MessageReceiver.MessageReceiver):
    '''Subscribes to the input user job queue and adds incoming jobs to the
    Balsam database.

    All protocol-specific configuration is passed through to the parent
    MessageReceiver via the receiver_settings dict.'''

    def __init__(self, receiver_settings):
        MessageReceiver.MessageReceiver.__init__(self, receiver_settings)

    def handle_msg(self, msg_body):
        '''Handle one incoming message in a protocol-independent way.

        Invoked by the parent MessageReceiver's protocol-specific message
        consumer. Deserializes msg_body into a BalsamJob, creates a
        process-private DB connection, saves the job, and sends a status
        message. Raises on failure so the caller can react (e.g. ack/skip).
        '''
        logger.debug('in handle_msg')
        try:
            job = models.BalsamJob()
            job.deserialize(msg_body)
        except Exception:
            logger.exception('error deserializing incoming job. body = '
                             + msg_body + ' not continuing with this job.')
            raise Exception("Deserialize failed")
        # create a unique DB connection keyed on this process's pid; Django
        # connections must not be shared across processes
        try:
            db_connection_id = db_tools.get_db_connection_id(os.getpid())
            db_backend = utils.load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE'])
            db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id)
            connections[db_connection_id] = db_conn
        except Exception:
            logger.exception(' received exception while creating DB connection, exception message: ')
            raise Exception("received exception while creating DB connection")
        job.save()
        # drop the private DB connection now that the job is persisted
        del connections[db_connection_id]
        # notify listeners that a new job entered the database
        status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
        status_sender.send_status(job)
import logging
logger = logging.getLogger(__name__)
from balsam import BalsamJobStatus
from common import PikaMessageInterface, NoMessageInterface
# Maps the configured sender 'mode' string to the MessageInterface
# implementation class used to publish status messages.
SENDER_MAP = {
    'pika' : PikaMessageInterface.PikaMessageInterface,
    'no_message' : NoMessageInterface.NoMessageInterface
}
class BalsamStatusSender(object):
    '''Sends status messages out from Balsam.

    The constructor selects a MessageInterface implementation from
    SENDER_MAP according to settings['mode'] and hands it the protocol
    configuration.'''

    def __init__(self, settings):
        mode = settings['mode']
        interface_class = SENDER_MAP[mode]
        self.messageInterface = interface_class(settings)

    def send_status(self, job, message=''):
        '''Send a status message describing a job state.'''
        iface = self.messageInterface
        try:
            iface.setup_send()
            status = BalsamJobStatus.BalsamJobStatus(job, message)
            iface.send_msg(status.serialize())
            iface.close()
        except Exception:
            # best-effort: a failed status message must not kill the caller
            logger.exception(
                'job(pk='+str(job.pk)+',id='+str(job.job_id)+
                '): Failed to send BalsamJobStatus message, received exception'
            )
......@@ -4,7 +4,7 @@ logger = logging.getLogger(__name__)
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from balsam import models,BalsamJobReceiver,QueueMessage
from balsam import models,BalsamJobReceiver,QueueMessage,BalsamStatusSender
from common import DirCleaner,log_uncaught_exceptions,TransitionJob
from balsam import scheduler
from balsam.schedulers import exceptions,jobstates
......@@ -27,12 +27,15 @@ class Command(BaseCommand):
subprocesses = {}
# start the balsam job receiver in separate thread
try:
p = BalsamJobReceiver.BalsamJobReceiver()
p = BalsamJobReceiver.BalsamJobReceiver(settings.RECEIVER_CONFIG)
p.start()
subprocesses['BalsamJobReceiver'] = p
except Exception as e:
logger.exception(' Received Exception while trying to start job receiver: ' + str(e))
# Balsam status message sender
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
# setup timer for cleaning the work folder of old files
logger.debug('creating DirCleaner')
workDirCleaner = DirCleaner.DirCleaner(settings.BALSAM_WORK_DIRECTORY,
......@@ -76,17 +79,17 @@ class Command(BaseCommand):
logger.debug('job pk=' + str(job.pk) + ' remains in state ' + str(jobstate))
continue # jump to next job, skip remaining actions
job.save(update_fields=['state'])
models.send_status_message(job,'Job entered ' + job.state + ' state')
status_sender.send_status(job,'Job entered ' + job.state + ' state')
except exceptions.JobStatusFailed as e:
message = 'get_job_status failed for pk='+str(job.pk)+': ' + str(e)
logger.error(message)
# TODO: Should I fail the job?
models.send_status_message(job,message)
status_sender.send_status(job,message)
except Exception as e:
message = 'failed to get status for pk='+str(job.pk)+', exception: ' + str(e)
logger.error(message)
# TODO: Should I fail the job?
models.send_status_message(job,message)
status_sender.send_status(job,message)
......
......@@ -9,7 +9,7 @@ from django.db import utils,connections,DEFAULT_DB_ALIAS
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from balsam import BalsamJobStatus
from balsam import BalsamStatusSender
from common import transfer,MessageInterface,run_subprocess
from common import log_uncaught_exceptions,db_tools,Serializer
from balsam import scheduler,BalsamJobMessage
......@@ -37,7 +37,8 @@ def stage_in(job):
job.state = STAGED_IN.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message)
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
# stage out files for a job
def stage_out(job):
......@@ -58,7 +59,8 @@ def stage_out(job):
job.state = STAGED_OUT.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message)
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
# preprocess a job
def preprocess(job):
......@@ -105,7 +107,8 @@ def preprocess(job):
job.state = PREPROCESS_FAILED.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message)
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
# submit the job to the local scheduler
def submit(job):
......@@ -143,7 +146,8 @@ def submit(job):
job.save(update_fields=['state','scheduler_id'],using=db_tools.get_db_connection_id(job.pk))
logger.debug('sending status message')
send_status_message(job,message)
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
logger.debug('submit done')
......@@ -192,38 +196,18 @@ def postprocess(job):
job.state = POSTPROCESS_FAILED.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message)
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
def finish_job(job):
    '''Mark a job Finished in the DB and send a success status message.

    Merge cleanup: the old call to the removed send_status_message() helper
    duplicated the notification now handled by BalsamStatusSender.'''
    job.state = JOB_FINISHED.name
    job.save(update_fields=['state'], using=db_tools.get_db_connection_id(job.pk))
    message = "Success!"
    status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
    status_sender.send_status(job, message)
def send_status_message(job, message=''):
    '''Deprecated no-op kept for backward compatibility.

    Status messages are now sent through BalsamStatusSender; the old pika
    implementation that followed the unconditional return was unreachable
    dead code and has been removed. Always returns None.'''
    return None
# -------- Job States ------------------------
from common.JobState import JobState
......
import sys,os,ssl
import pika,time
import logging
logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.WARNING)
#logging.getLogger('select_connection').setLevel(logging.DEBUG)
class MessageInterface:
    '''These are the public methods to be implemented by MessageInterfaces like
    PikaMessageInterface. All protocol-specific methods should be hidden.

    Merge cleanup: the old concrete pika implementation was interleaved here
    with the new abstract stubs, leaving duplicate __init__/send_msg/
    receive_msg/close definitions in which the later stubs silently clobbered
    the concrete methods. Only the abstract interface belongs in this class;
    the pika implementation lives in PikaMessageInterface.'''

    def __init__(self, settings):
        '''settings: dict of protocol-specific configuration.'''
        raise NotImplementedError

    def setup_send(self):
        '''Prepare the interface for sending messages.'''
        raise NotImplementedError

    def setup_receive(self, consume_msg=None):
        '''Prepare to receive; consume_msg is the per-message callback.'''
        raise NotImplementedError

    def send_msg(self, message_body):
        '''Publish one message.'''
        raise NotImplementedError

    def receive_msg(self):
        '''Fetch one message.'''
        raise NotImplementedError

    def start_receive_loop(self):
        '''Block, consuming messages until stopped.'''
        raise NotImplementedError

    def stop_receive_loop(self):
        '''Stop a running receive loop.'''
        raise NotImplementedError

    def close(self):
        '''Release any connection resources.'''
        raise NotImplementedError
from common.MessageInterface import MessageInterface
from common import PikaMessageInterface, NoMessageInterface
from django.conf import settings
import logging,sys,multiprocessing,time,os
logger = logging.getLogger(__name__)
# NOTE(review): QUEUE_NAME/ROUTING_KEY are None here and never assigned in
# this view — confirm where (or whether) they are still used.
QUEUE_NAME = None
ROUTING_KEY = None
# Maps the configured receiver 'mode' string to the MessageInterface
# implementation class used to consume messages.
RECEIVER_MAP = {
    'pika' : PikaMessageInterface.PikaMessageInterface,
    'no_message' : NoMessageInterface.NoMessageInterface
}
class MessageReceiver(multiprocessing.Process):
    '''Subscribes to a queue in a child process and dispatches each incoming
    message to handle_msg().

    Merge cleanup: the old pika-specific implementation (duplicate __init__,
    run, shutdown and the on_connection_open/on_channel_open callbacks) was
    interleaved with the refactored version; only the protocol-independent
    version is kept. The concrete MessageInterface is chosen from
    RECEIVER_MAP via settings['mode'] and the matching <mode>_consume_msg
    method is bound as the consumer callback.'''

    def __init__(self, settings):
        # execute multiprocessing.Process superconstructor
        super(MessageReceiver, self).__init__()
        receiver_mode = settings['mode']
        MessageClass = RECEIVER_MAP[receiver_mode]
        self.messageInterface = MessageClass(settings)
        # bind e.g. pika_consume_msg or no_message_consume_msg
        self.consume_msg = getattr(self, '%s_consume_msg' % receiver_mode)

    def handle_msg(self, msg_body):
        '''This handles the message in a protocol-independent way.
        Subclasses must override.'''
        raise NotImplementedError

    def run(self):
        '''Process entry point: set up the receiver and block in its loop.'''
        logger.debug(' in run ')
        self.messageInterface.setup_receive(self.consume_msg)
        self.messageInterface.start_receive_loop()

    def pika_consume_msg(self, channel, method_frame, header_frame, body):
        '''pika consumer callback: delegate to handle_msg and always ack the
        message so a bad job cannot wedge the queue.'''
        logger.debug('in pika_consume_msg')
        if body is not None:
            logger.debug(' received message: ' + body)
            try:
                self.handle_msg(body)
            except Exception:
                logger.exception('failed to handle_msg. not continuing with this job')
                channel.basic_ack(method_frame.delivery_tag)
                return
        else:
            logger.error(' consume_msg called, but body is None ')
            # should be some failure notice to argo
        # acknowledge receipt of message
        channel.basic_ack(method_frame.delivery_tag)

    def no_message_consume_msg(self):
        '''No-op consumer for the 'no_message' mode.'''
        pass

    def shutdown(self):
        logger.debug(' stopping message consumer ')
        self.messageInterface.stop_receive_loop()
import logging
import time
import threading
logger = logging.getLogger(__name__)
from common import MessageInterface
class NoMessageInterface(MessageInterface.MessageInterface):
    '''Dummy MessageInterface: sending is a no-op and "receiving" just idles
    in a background thread until stopped.'''

    def __init__(self, settings):
        self.alive = True
        self.thread = None

    def setup_send(self):
        pass

    def setup_receive(self, consume_msg=None):
        # receiving is simulated by a thread that idles until stop is called
        self.thread = threading.Thread(target=self._fake_ioloop)

    def send_msg(self, message_body):
        pass

    def receive_msg(self):
        pass

    def start_receive_loop(self):
        try:
            logger.debug('starting dummy receiver ioloop')
            self.thread.start()
            self.thread.join()
        except Exception:
            logger.exception('failed to start dummy receiver ioloop')

    def stop_receive_loop(self):
        # clearing the flag lets _fake_ioloop exit, then wait for it
        self.alive = False
        self.thread.join()

    def close(self):
        pass

    def _fake_ioloop(self):
        '''Thread target: sleep in a loop while self.alive is set.'''
        logger.debug('Thread: inside fake_ioloop')
        try:
            while self.alive:
                logger.debug(' Thread: Idling Receiver')
                time.sleep(15)
        except Exception:
            logger.debug('The ioloop failed')
import sys,os,ssl
import pika,time
import logging
logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.WARNING)
from common import MessageInterface
class PikaMessageInterface(MessageInterface.MessageInterface):
def __init__(self, settings):
    '''Copy the pika connection/exchange/queue configuration out of the
    settings dict onto the instance; raises KeyError on a missing key.'''
    config_keys = (
        'username', 'password', 'host', 'port', 'virtual_host',
        'socket_timeout', 'exchange_name', 'exchange_type',
        'exchange_durable', 'exchange_auto_delete', 'queue_name',
        'queue_is_durable', 'queue_is_exclusive', 'queue_is_auto_delete',
        'default_routing_key', 'ssl_cert', 'ssl_key', 'ssl_ca_certs',
    )
    for key in config_keys:
        setattr(self, key, settings[key])
    # connection state is created lazily by the setup methods
    self.credentials = None
    self.parameters = None
    self.connection = None
    self.channel = None
    self.consume_msg = None
def send_msg(self, message_body, routing_key=None):
exchange_name = self.exchange_name
message_headers = {}
priority = 0
delivery_mode = 2
if routing_key is None: