Commit 5469a4e4 authored by Michael Salim's avatar Michael Salim
Browse files

MessageInterface is a now a generic interface, which is implemented by

PikaMessageInterface and NoMessageInterface.  The BalsamJobReceiver and
BalsamStatusSender interact with their respective MessageInterfaces
through a few public methods that encapsulate all the protocol-specific
methods.  This should make it easier to add new job sources like HTTP
parent 1fa45631
...@@ -2,7 +2,7 @@ import logging,sys,os ...@@ -2,7 +2,7 @@ import logging,sys,os
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from common import MessageReceiver,db_tools from common import MessageReceiver,db_tools
from balsam import models from balsam import models, BalsamStatusSender
from django.conf import settings from django.conf import settings
from django.db import utils,connections,DEFAULT_DB_ALIAS from django.db import utils,connections,DEFAULT_DB_ALIAS
...@@ -10,54 +10,29 @@ from django.db import utils,connections,DEFAULT_DB_ALIAS ...@@ -10,54 +10,29 @@ from django.db import utils,connections,DEFAULT_DB_ALIAS
class BalsamJobReceiver(MessageReceiver.MessageReceiver): class BalsamJobReceiver(MessageReceiver.MessageReceiver):
''' subscribes to the input user job queue and adds jobs to the database ''' ''' subscribes to the input user job queue and adds jobs to the database '''
def __init__(self): def __init__(self, receiver_settings):
MessageReceiver.MessageReceiver.__init__(self, MessageReceiver.MessageReceiver.__init__(self, receiver_settings)
settings.BALSAM_SITE,
settings.BALSAM_SITE,
settings.RABBITMQ_SERVER_NAME,
settings.RABBITMQ_SERVER_PORT,
settings.RABBITMQ_BALSAM_EXCHANGE_NAME,
settings.RABBITMQ_SSL_CERT,
settings.RABBITMQ_SSL_KEY,
settings.RABBITMQ_SSL_CA_CERTS
)
# This is where the real processing of incoming messages happens # This is where the real processing of incoming messages happens
def consume_msg(self,channel,method_frame,header_frame,body): # It is invoked by the parent MessageReceiver's protocol-specific
logger.debug('in consume_msg' ) # message consumer
if body is not None: def handle_msg(self, msg_body):
logger.debug('in handle_msg' )
logger.debug(' received message: ' + body ) try:
try:
job = models.BalsamJob() job = models.BalsamJob()
job.deserialize(body) job.deserialize(body)
except Exception as e: except Exception as e:
logger.exception('error deserializing incoming job. body = ' + body + ' not conitnuing with this job.') logger.exception('error deserializing incoming job. body = ' + body + ' not conitnuing with this job.')
channel.basic_ack(method_frame.delivery_tag) raise Exception("Deserialize failed")
return try:
# should be some failure notice to argo
# create unique DB connection string
try:
db_connection_id = db_tools.get_db_connection_id(os.getpid()) db_connection_id = db_tools.get_db_connection_id(os.getpid())
db_backend = utils.load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE']) db_backend = utils.load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE'])
db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id) db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id)
connections[db_connection_id] = db_conn connections[db_connection_id] = db_conn
except Exception as e: except Exception as e:
logger.exception(' received exception while creating DB connection, exception message: ') raise Exception("received exception while creating DB connection")
# acknoledge message
channel.basic_ack(method_frame.delivery_tag)
return
job.save()
models.send_status_message(job)
else:
logger.error(' consume_msg called, but body is None ')
# should be some failure notice to argo
# acknowledge receipt of message
channel.basic_ack(method_frame.delivery_tag)
# delete DB connection
del connections[db_connection_id]
job.save()
del connections[db_connection_id]
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job)
import logging
logger = logging.getLogger(__name__)
from balsam import BalsamJobStatus
from common import PikaMessageInterface, NoMessageInterface
SENDER_MAP = {
'pika' : PikaMessageInterface.PikaMessageInterface,
'no_message' : NoMessageInterface.NoMessageInterface
}
class BalsamStatusSender(object):
    '''Use an instance of this class to send status messages out from Balsam.

    The constructor picks a concrete MessageInterface implementation
    (keyed by settings['mode'] in SENDER_MAP) and passes the
    protocol-specific details on to it.'''

    def __init__(self, settings):
        # settings['mode'] selects the transport: 'pika' or 'no_message'
        sender_mode = settings['mode']
        MessageClass = SENDER_MAP[sender_mode]
        self.messageInterface = MessageClass(settings)

    # BUGFIX: original signature was send_status(job, message='') — it lacked
    # 'self' while the body used self.messageInterface, so every call of
    # status_sender.send_status(job, msg) raised NameError.
    def send_status(self, job, message=''):
        '''Send a status message describing a job state.

        Failures are logged (with traceback) but deliberately not
        re-raised: status reporting is best-effort and must not kill
        the calling transition.'''
        p = self.messageInterface
        try:
            p.setup_send()
            statmsg = BalsamJobStatus.BalsamJobStatus(job, message)
            p.send_msg(statmsg.serialize())
            p.close()
        except Exception as e:
            logger.exception(
                'job(pk='+str(job.pk)+',id='+str(job.job_id)+
                '): Failed to send BalsamJobStatus message, received exception'
            )
...@@ -4,7 +4,7 @@ logger = logging.getLogger(__name__) ...@@ -4,7 +4,7 @@ logger = logging.getLogger(__name__)
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.conf import settings from django.conf import settings
from balsam import models,BalsamJobReceiver,QueueMessage from balsam import models,BalsamJobReceiver,QueueMessage,BalsamStatusSender
from common import DirCleaner,log_uncaught_exceptions,TransitionJob from common import DirCleaner,log_uncaught_exceptions,TransitionJob
from balsam import scheduler from balsam import scheduler
from balsam.schedulers import exceptions,jobstates from balsam.schedulers import exceptions,jobstates
...@@ -27,12 +27,15 @@ class Command(BaseCommand): ...@@ -27,12 +27,15 @@ class Command(BaseCommand):
subprocesses = {} subprocesses = {}
# start the balsam job receiver in separate thread # start the balsam job receiver in separate thread
try: try:
p = BalsamJobReceiver.BalsamJobReceiver() p = BalsamJobReceiver.BalsamJobReceiver(settings.RECEIVER_CONFIG)
p.start() p.start()
subprocesses['BalsamJobReceiver'] = p subprocesses['BalsamJobReceiver'] = p
except Exception as e: except Exception as e:
logger.exception(' Received Exception while trying to start job receiver: ' + str(e)) logger.exception(' Received Exception while trying to start job receiver: ' + str(e))
# Balsam status message sender
status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
# setup timer for cleaning the work folder of old files # setup timer for cleaning the work folder of old files
logger.debug('creating DirCleaner') logger.debug('creating DirCleaner')
workDirCleaner = DirCleaner.DirCleaner(settings.BALSAM_WORK_DIRECTORY, workDirCleaner = DirCleaner.DirCleaner(settings.BALSAM_WORK_DIRECTORY,
...@@ -76,17 +79,17 @@ class Command(BaseCommand): ...@@ -76,17 +79,17 @@ class Command(BaseCommand):
logger.debug('job pk=' + str(job.pk) + ' remains in state ' + str(jobstate)) logger.debug('job pk=' + str(job.pk) + ' remains in state ' + str(jobstate))
continue # jump to next job, skip remaining actions continue # jump to next job, skip remaining actions
job.save(update_fields=['state']) job.save(update_fields=['state'])
models.send_status_message(job,'Job entered ' + job.state + ' state') status_sender.send_status(job,'Job entered ' + job.state + ' state')
except exceptions.JobStatusFailed as e: except exceptions.JobStatusFailed as e:
message = 'get_job_status failed for pk='+str(job.pk)+': ' + str(e) message = 'get_job_status failed for pk='+str(job.pk)+': ' + str(e)
logger.error(message) logger.error(message)
# TODO: Should I fail the job? # TODO: Should I fail the job?
models.send_status_message(job,message) status_sender.send_status(job,message)
except Exception as e: except Exception as e:
message = 'failed to get status for pk='+str(job.pk)+', exception: ' + str(e) message = 'failed to get status for pk='+str(job.pk)+', exception: ' + str(e)
logger.error(message) logger.error(message)
# TODO: Should I fail the job? # TODO: Should I fail the job?
models.send_status_message(job,message) status_sender.send_status(job,message)
......
...@@ -9,7 +9,7 @@ from django.db import utils,connections,DEFAULT_DB_ALIAS ...@@ -9,7 +9,7 @@ from django.db import utils,connections,DEFAULT_DB_ALIAS
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings from django.conf import settings
from balsam import BalsamJobStatus from balsam import BalsamStatusSender
from common import transfer,MessageInterface,run_subprocess from common import transfer,MessageInterface,run_subprocess
from common import log_uncaught_exceptions,db_tools,Serializer from common import log_uncaught_exceptions,db_tools,Serializer
from balsam import scheduler,BalsamJobMessage from balsam import scheduler,BalsamJobMessage
...@@ -37,7 +37,8 @@ def stage_in(job): ...@@ -37,7 +37,8 @@ def stage_in(job):
job.state = STAGED_IN.name job.state = STAGED_IN.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk)) job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message) status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
# stage out files for a job # stage out files for a job
def stage_out(job): def stage_out(job):
...@@ -58,7 +59,8 @@ def stage_out(job): ...@@ -58,7 +59,8 @@ def stage_out(job):
job.state = STAGED_OUT.name job.state = STAGED_OUT.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk)) job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message) status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
# preprocess a job # preprocess a job
def preprocess(job): def preprocess(job):
...@@ -105,7 +107,8 @@ def preprocess(job): ...@@ -105,7 +107,8 @@ def preprocess(job):
job.state = PREPROCESS_FAILED.name job.state = PREPROCESS_FAILED.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk)) job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message) status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
# submit the job to the local scheduler # submit the job to the local scheduler
def submit(job): def submit(job):
...@@ -143,7 +146,8 @@ def submit(job): ...@@ -143,7 +146,8 @@ def submit(job):
job.save(update_fields=['state','scheduler_id'],using=db_tools.get_db_connection_id(job.pk)) job.save(update_fields=['state','scheduler_id'],using=db_tools.get_db_connection_id(job.pk))
logger.debug('sending status message') logger.debug('sending status message')
send_status_message(job,message) status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
logger.debug('submit done') logger.debug('submit done')
...@@ -192,38 +196,17 @@ def postprocess(job): ...@@ -192,38 +196,17 @@ def postprocess(job):
job.state = POSTPROCESS_FAILED.name job.state = POSTPROCESS_FAILED.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk)) job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,message) status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
def finish_job(job): def finish_job(job):
''' simply change state to Finished and send status to user ''' ''' simply change state to Finished and send status to user '''
job.state = JOB_FINISHED.name job.state = JOB_FINISHED.name
job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk)) job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
send_status_message(job,'Job finished') status_sender = BalsamStatusSender.BalsamStatusSender(settings.SENDER_CONFIG)
status_sender.send_status(job,message)
def send_status_message(job,message=''):
''' send a status message describing a job state '''
return
logger.debug('in send_status_message')
# setup message interface
try:
p = MessageInterface.MessageInterface(
host = settings.RABBITMQ_SERVER_NAME,
port = settings.RABBITMQ_SERVER_PORT,
exchange_name = settings.RABBITMQ_BALSAM_EXCHANGE_NAME,
ssl_cert = settings.RABBITMQ_SSL_CERT,
ssl_key = settings.RABBITMQ_SSL_KEY,
ssl_ca_certs = settings.RABBITMQ_SSL_CA_CERTS,
)
p.open_blocking_connection()
statmsg = BalsamJobStatus.BalsamJobStatus(job,message)
p.send_msg(statmsg.serialize(), settings.RABBITMQ_BALSAM_JOB_STATUS_ROUTING_KEY)
p.close()
except Exception as e:
logger.exception('job(pk='+str(job.pk)+',id='+str(job.job_id)+
'): Failed to send BalsamJobStatus message, received exception')
# -------- Job States ------------------------ # -------- Job States ------------------------
from common.JobState import JobState from common.JobState import JobState
......
import sys,os,ssl
import pika,time
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.WARNING)
#logging.getLogger('select_connection').setLevel(logging.DEBUG)
class MessageInterface: class MessageInterface:
'''These are the public methods to be implemented by MessageInterfaces like
PikaMessageInterface. All protocol-specfic methods should be hidden'''
def __init__(self, def __init__(self, settings):
username = '', raise NotImplementedError
password = '',
host = '', def setup_send(self):
port = -1, raise NotImplementedError
virtual_host = '/',
socket_timeout = 120, def setup_receive(self, consume_msg=None):
exchange_name = '', raise NotImplementedError
exchange_type = 'topic',
exchange_durable = True, def send_msg(self, message_body):
exchange_auto_delete = False, raise NotImplementedError
ssl_cert = '',
ssl_key = '', def receive_msg(self):
ssl_ca_certs = '', raise NotImplementedError
queue_is_durable = True,
queue_is_exclusive = False, def start_receive_loop(self):
queue_is_auto_delete = False, raise NotImplementedError
):
self.username = username def stop_receive_loop(self):
self.password = password raise NotImplementedError
self.host = host
self.port = port def close(self):
self.virtual_host = virtual_host raise NotImplementedError
self.socket_timeout = socket_timeout
self.exchange_name = exchange_name
self.exchange_type = exchange_type
self.exchange_durable = exchange_durable
self.exchange_auto_delete = exchange_auto_delete
self.queue_is_durable = queue_is_durable
self.queue_is_exclusive = queue_is_exclusive
self.queue_is_auto_delete = queue_is_auto_delete
self.ssl_cert = ssl_cert
self.ssl_key = ssl_key
self.ssl_ca_certs = ssl_ca_certs
self.credentials = None
self.parameters = None
self.connection = None
self.channel = None
def open_blocking_connection(self):
    '''Open a blocking pika connection, acquire a channel, and declare
    the configured exchange (idempotent if it already exists).
    Raises whatever pika raises on connection/channel failure, after
    logging the traceback.'''
    logger.debug("open blocking connection")
    self.create_connection_parameters()
    # open the connection and grab the channel
    try:
        self.connection = pika.BlockingConnection(self.parameters)
    except:
        logger.exception(' Exception received while trying to open blocking connection to message server')
        raise
    try:
        self.channel = self.connection.channel()
    except:
        logger.exception(' Exception received while trying to open a channel to the message server')
        raise
    logger.debug("create exchange, name = " + self.exchange_name)
    # make sure exchange exists (doesn't do anything if already created)
    self.channel.exchange_declare(
        exchange = self.exchange_name,
        exchange_type = self.exchange_type,
        durable = self.exchange_durable,
        auto_delete = self.exchange_auto_delete,
    )
def open_select_connection(self,
                           on_open_callback = None,
                           on_open_error_callback = None,
                           on_close_callback = None,
                           stop_ioloop_on_close = True,
                           ):
    '''Open an asynchronous (SelectConnection) pika connection.

    NOTE(review): if on_open_callback is None this silently does
    nothing — no connection is created and no error is reported;
    confirm callers always pass a callback.'''
    logger.debug("create select connection")
    self.create_connection_parameters()
    # open the connection
    if on_open_callback is not None:
        try:
            self.connection = pika.SelectConnection(self.parameters,
                                                    on_open_callback,
                                                    on_open_error_callback,
                                                    on_close_callback,
                                                    stop_ioloop_on_close,
                                                    )
        except:
            logger.error(' Exception received while trying to open select connection to message server: ' + str(sys.exc_info()))
            raise
def create_connection_parameters(self):
    '''Build self.credentials and self.parameters for an SSL-authenticated
    connection. Uses ExternalCredentials (client-certificate auth), so the
    username/password attributes are not used here; the server must
    require and verify the client cert (CERT_REQUIRED).'''
    logger.debug("create connection parameters, server = " + self.host + " port = " + str(self.port))
    # need to set credentials to login to the message server
    #self.credentials = pika.PlainCredentials(self.username,self.password)
    self.credentials = pika.credentials.ExternalCredentials()
    ssl_options_dict = {
        "certfile": self.ssl_cert,
        "keyfile": self.ssl_key,
        "ca_certs": self.ssl_ca_certs,
        "cert_reqs": ssl.CERT_REQUIRED,
    }
    #logger.debug(str(ssl_options_dict))
    # setup our connection parameters
    self.parameters = pika.ConnectionParameters(
        host = self.host,
        port = self.port,
        virtual_host = self.virtual_host,
        credentials = self.credentials,
        socket_timeout = self.socket_timeout,
        ssl = True,
        ssl_options = ssl_options_dict,
    )
def create_queue(self, name, routing_key):
    '''Declare a queue and bind it to this instance's exchange under
    routing_key. Requires an open channel (open_blocking_connection
    must have been called first).'''
    # declare a random queue which this job will use to receive messages
    # durable = survive reboots of the broker
    # exclusive = only current connection can access this queue
    # auto_delete = queue will be deleted after connection is closed
    self.channel.queue_declare(
        queue = str(name),
        durable = self.queue_is_durable,
        exclusive = self.queue_is_exclusive,
        auto_delete = self.queue_is_auto_delete
    )
    # now bind this queue to the exchange, using a routing key
    # any message submitted to the echange with the
    # routing key will appear on this queue
    self.channel.queue_bind(exchange=self.exchange_name,
                            queue=str(name),
                            routing_key=str(routing_key)
                            )
def close(self):
    '''Drop local references to the channel and connection.

    NOTE(review): the actual channel.close()/connection.close() calls
    are commented out, so the underlying sockets are only released when
    garbage-collected — confirm this is intentional.'''
    #self.channel.close()
    #self.connection.close()
    self.channel = None
    self.connection = None
def send_msg(self,
             message_body,
             routing_key,
             exchange_name = None,
             message_headers = {},
             priority = 0,       # make message persistent
             delivery_mode = 2,  # default
             ):
    '''Publish message_body to the exchange with routing_key.

    exchange_name defaults to this instance's exchange. delivery_mode=2
    marks the message persistent. On any failure, logs the traceback and
    re-raises as a generic Exception (the original exception text is
    appended to the new message).

    NOTE(review): message_headers uses a mutable default argument — the
    same dict is shared across calls; safe only while callers never
    mutate it.'''
    try:
        if exchange_name is None:
            exchange_name = self.exchange_name
        timestamp = time.time()
        # create the message properties
        properties = pika.BasicProperties(
            delivery_mode = delivery_mode,
            priority = priority,
            timestamp = timestamp,
            headers = message_headers,
        )
        logger.debug("sending message body:\n" + str(message_body))
        logger.debug('sending message to exchange: ' + self.exchange_name)
        logger.debug('sending message with routing key: ' + routing_key)
        self.channel.basic_publish(
            exchange = exchange_name,
            routing_key = routing_key,
            body = message_body,
            properties = properties,
        )
    except Exception as e:
        logger.exception('exception received while trying to send message')
        raise Exception('exception received while trying to send message' + str(e))
def receive_msg(self, queue_name):
    '''Fetch a single message from queue_name via basic_get.

    Returns the (method, properties, body) triple; body is None when
    the queue is empty. The caller is responsible for acking.'''
    # retrieve one message
    method, properties, body = self.channel.basic_get(queue=queue_name)
    return method,properties,body
def purge_queue(self, queue_name):
    '''Discard all messages currently sitting in queue_name.'''
    self.channel.queue_purge(queue = queue_name)
from common.MessageInterface import MessageInterface from common import PikaMessageInterface, NoMessageInterface
from django.conf import settings from django.conf import settings
import logging,sys,multiprocessing,time,os import logging,sys,multiprocessing,time,os
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
QUEUE_NAME = None RECEIVER_MAP = {
ROUTING_KEY = None 'pika' : PikaMessageInterface.PikaMessageInterface,
'no_message' : NoMessageInterface.NoMessageInterface
}
class MessageReceiver(multiprocessing.Process): class MessageReceiver(multiprocessing.Process):
''' subscribes to a queue and executes the given callback''' ''' subscribes to a queue and executes the given callback'''
# this method should be re-defined by the user via inheritance def __init__(self, settings):
def consume_msg(self,channel,method_frame,header_frame,body): # execute multiprocessing.Process superconstructor
pass super(MessageReceiver,self).__init__()
def __init__(self, receiver_mode = settings['mode']
msg_queue, MessageClass = RECEIVER_MAP[receiver_mode]
msg_routing_key, self.messageInterface = MessageClass(settings)
msg_host, self.consume_msg = getattr(self, '%s_consume_msg' % receiver_mode)
msg_port,
msg_exchange_name, def handle_msg(self, msg_body):
msg_ssl_cert, '''This handles the message in a protocol-independent way'''
msg_ssl_key, raise NotImplementedError
msg_ssl_ca_certs,
):
# execute super constructor
super(MessageReceiver,self).__init__()
#self.exit = multiprocessing.Event()
self.messageInterface = MessageInterface()
self.messageInterface.host = msg_host
self.messageInterface.port = msg_port
self.messageInterface.exchange_name = msg_exchange_name
self.messageInterface.ssl_cert = msg_ssl_cert
self.messageInterface.ssl_key = msg_ssl_key
self.messageInterface.ssl_ca_certs = msg_ssl_ca_certs
self.message_queue = msg_queue
self.message_routing_key = msg_routing_key
def run(self):
logger.debug(' in run ')
# setup receiving queue and exchange
logger.debug( ' open blocking connection to setup queue ' )
self.messageInterface.open_blocking_connection()
self.messageInt