Commit d88d127f authored by jtchilders's avatar jtchilders
Browse files

changed argo_job_id to job_id in ArgoJob, changed origin_id to argo_job_id in BalsamJob/ArgoSubJob

parent 0a3dd01f
......@@ -8,7 +8,7 @@ from django.conf import settings
from common import MessageReceiver
from argo import QueueMessage
from argo.models import ArgoJob,ArgoSubJob,BALSAM_JOB_TO_SUBJOB_STATE_MAP
from balsam import BalsamJobStatus
from balsam import BalsamJobStatus,models
class JobStatusReceiver(MessageReceiver.MessageReceiver):
''' subscribes to the balsam job status queue and updates a job state '''
......@@ -35,51 +35,52 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
# convert body text to BalsamJobStatusMessage
statusMsg = BalsamJobStatus.BalsamJobStatus()
statusMsg.deserialize(body)
logger.info(' received status message for job ' + str(statusMsg.origin_id) + ', message: ' + str(statusMsg.message))
logger.info(' received status message for job ' + str(statusMsg.job_id) + ', message: ' + str(statusMsg.message))
# create unique DB connection string
db_connection_id = 'db_con_%08i' % statusMsg.origin_id
db_connection_id = 'db_con_%08i' % statusMsg.job_id
db_backend = load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE'])
db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id)
connections[db_connection_id] = db_conn
# get the subjob for this message
try:
subjob = ArgoSubJob.objects.get(balsam_job_id=statusMsg.id)
subjob = ArgoSubJob.objects.get(job_id=statusMsg.job_id)
except Exception,e:
logger.error(' exception received while retreiving ArgoSubJob with id = ' + str(statusMsg.id) + ': ' + str(e))
logger.error(' exception received while retreiving ArgoSubJob with id = ' + str(statusMsg.job_id) + ': ' + str(e))
# acknoledge message
channel.basic_ack(method_frame.delivery_tag)
del connections[db_connection_id]
# send message to balsam_service about completion
self.process_queue.put(QueueMessage.QueueMessage(statusMsg.id,
self.process_queue.put(QueueMessage.QueueMessage(statusMsg.job_id,
QueueMessage.JobStatusReceiverRetrieveArgoSubJobFailed))
return
# get the argo job for this subjob
try:
argojob = ArgoJob.objects.get(argo_job_id=subjob.origin_id)
argojob = ArgoJob.objects.get(job_id=subjob.job_id)
except Exception,e:
logger.error(' exception received while retrieving ArgoJob with id = ' + str(subjob.origin_id + ': ' + str(e)))
logger.error(' exception received while retrieving ArgoJob with id = ' + str(subjob.job_id + ': ' + str(e)))
# acknoledge message
channel.basic_ack(method_frame.delivery_tag)
del connections[db_connection_id]
# send message to balsam_service about completion
self.process_queue.put(QueueMessage.QueueMessage(subjob.origin_id,
self.process_queue.put(QueueMessage.QueueMessage(subjob.job_id,
QueueMessage.JobStatusReceiverRetrieveArgoJobFailed))
return
# get the deserialized balsam job
try:
balsam_job = statusMsg.get_job()
balsam_job = models.BalsamJob()
statusMsg.get_job(balsam_job)
logger.debug('balsam_job = ' + str(balsam_job))
except BalsamJobStatus.DeserializeFailed,e:
logger.error('Failed to deserialize BalsamJob for BalsamJobStatus message for argojob: ' + str(argojob.argo_job_id) + ' subjob_id: ' + str(subjob.balsam_job_id))
logger.error('Failed to deserialize BalsamJob for BalsamJobStatus message for argojob: ' + str(argojob.job_id) + ' subjob_id: ' + str(subjob.job_id))
# acknoledge message
channel.basic_ack(method_frame.delivery_tag)
del connections[db_connection_id]
# send message to balsam_service about completion
self.process_queue.put(QueueMessage.QueueMessage(subjob.origin_id,
self.process_queue.put(QueueMessage.QueueMessage(subjob.job_id,
QueueMessage.JobStatusReceiverRetrieveArgoJobFailed))
return
......@@ -98,12 +99,12 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
argojob.state = BALSAM_JOB_TO_SUBJOB_STATE_MAP[balsam_job.state].name
logger.debug(' receieved subjob state = ' + subjob.state + ' setting argo job state to ' + argojob.state)
except KeyError,e:
logger.error(' could not map balsam_job state: ' + str(balsam_job.state) + ' to an ArgoJob state for job id: ' + str(argojob.argo_job_id))
logger.error(' could not map balsam_job state: ' + str(balsam_job.state) + ' to an ArgoJob state for job id: ' + str(argojob.job_id))
# acknoledge message
channel.basic_ack(method_frame.delivery_tag)
del connections[db_connection_id]
# send message to balsam_service about completion
self.process_queue.put(QueueMessage.QueueMessage(argojob.argo_job_id,
self.process_queue.put(QueueMessage.QueueMessage(argojob.job_id,
QueueMessage.JobStatusReceiverBalsamStateMapFailure))
return
......@@ -112,7 +113,7 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
else:
logger.error('received no balsam_job from BalsamJobStatus')
self.process_queue.put(QueueMessage.QueueMessage(argojob.argo_job_id,
self.process_queue.put(QueueMessage.QueueMessage(argojob.job_id,
QueueMessage.JobStatusReceiverCompleted))
# acknoledge message
channel.basic_ack(method_frame.delivery_tag)
......@@ -123,7 +124,7 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
else:
logger.debug(' consume_msg called, but body is None ')
self.process_queue.put(QueueMessage.QueueMessage(argojob.argo_job_id,
self.process_queue.put(QueueMessage.QueueMessage(argojob.job_id,
QueueMessage.JobStatusReceiverMessageNoBody))
except Exception,e:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment