Commit 550b75ee authored by jtchilders's avatar jtchilders
Browse files

changed argo_job_id to job_id

parent 0a3dc20a
...@@ -7,7 +7,6 @@ logger = logging.getLogger(__name__) ...@@ -7,7 +7,6 @@ logger = logging.getLogger(__name__)
from django.db import utils,connections,DEFAULT_DB_ALIAS,models from django.db import utils,connections,DEFAULT_DB_ALIAS,models
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings from django.conf import settings
from django.core import serializers as django_serializers
from django.core.validators import validate_comma_separated_integer_list from django.core.validators import validate_comma_separated_integer_list
from argo import QueueMessage,ArgoJobStatus from argo import QueueMessage,ArgoJobStatus
...@@ -21,7 +20,7 @@ sys.excepthook = log_uncaught_exceptions.log_uncaught_exceptions ...@@ -21,7 +20,7 @@ sys.excepthook = log_uncaught_exceptions.log_uncaught_exceptions
def submit_subjob(job): def submit_subjob(job):
logger.debug('in submit_subjob pk=' + str(job.pk) + ' argo_job_id='+str(job.argo_job_id)) logger.debug('in submit_subjob pk=' + str(job.pk) + ' job_id='+str(job.job_id))
message = 'Subjob submitted' message = 'Subjob submitted'
try: try:
# get the current subjob # get the current subjob
...@@ -31,8 +30,8 @@ def submit_subjob(job): ...@@ -31,8 +30,8 @@ def submit_subjob(job):
#balsamJobMsg = subjob.get_balsam_job_message() #balsamJobMsg = subjob.get_balsam_job_message()
# determine site name # determine site name
logger.info('Submitting Subjob ' + str(subjob.balsam_job_id) + ' from ArgoJob ' logger.info('Submitting Subjob ' + str(subjob.job_id) + ' from ArgoJob '
+ str(job.argo_job_id) + ' (pk=' + str(job.pk) + ') to ' + subjob.site ) + str(subjob.job_id) + ' (pk=' + str(job.pk) + ') to ' + subjob.site )
# create and configure message interface # create and configure message interface
msgInt = MessageInterface.MessageInterface( msgInt = MessageInterface.MessageInterface(
...@@ -50,7 +49,9 @@ def submit_subjob(job): ...@@ -50,7 +49,9 @@ def submit_subjob(job):
msgInt.create_queue(subjob.site,subjob.site) msgInt.create_queue(subjob.site,subjob.site)
# serialize subjob for message # serialize subjob for message
body = django_serializers.serialize('json',[subjob]) body = subjob.serialize()
logger.debug('sending job message: \n' + body)
# submit job # submit job
msgInt.send_msg(body,subjob.site) msgInt.send_msg(body,subjob.site)
...@@ -64,7 +65,7 @@ def submit_subjob(job): ...@@ -64,7 +65,7 @@ def submit_subjob(job):
except Exception,e: except Exception,e:
message = ('Exception received while submitting subjob to ' message = ('Exception received while submitting subjob to '
+ subjob.site + ' for job pk=' + str(job.pk) + ' argo_id=' + subjob.site + ' for job pk=' + str(job.pk) + ' argo_id='
+ str(job.argo_job_id) + ': ' + str(e)) + str(job.job_id) + ': ' + str(e))
logger.exception(message) logger.exception(message)
job.state = SUBJOB_SUBMIT_FAILED.name job.state = SUBJOB_SUBMIT_FAILED.name
...@@ -127,7 +128,7 @@ def make_history(job): ...@@ -127,7 +128,7 @@ def make_history(job):
def send_status_message(job,message=None): def send_status_message(job,message=None):
''' this function sends status messages back to the users via email and message queue ''' ''' this function sends status messages back to the users via email and message queue '''
logger.debug('in send_status_message pk=' + str(job.pk) + ' argo_job_id='+str(job.argo_job_id)) logger.debug('in send_status_message pk=' + str(job.pk) + ' job_id='+str(job.job_id))
try: try:
receiver = '' receiver = ''
if len(job.email) > 0 and '@' in job.email: if len(job.email) > 0 and '@' in job.email:
...@@ -164,7 +165,7 @@ def send_status_message(job,message=None): ...@@ -164,7 +165,7 @@ def send_status_message(job,message=None):
msg = ArgoJobStatus.ArgoJobStatus() msg = ArgoJobStatus.ArgoJobStatus()
msg.state = job.state msg.state = job.state
msg.message = message msg.message = message
msg.job_id = job.argo_job_id msg.job_id = job.job_id
mi = MessageInterface.MessageInterface() mi = MessageInterface.MessageInterface()
mi.host = settings.RABBITMQ_SERVER_NAME mi.host = settings.RABBITMQ_SERVER_NAME
mi.port = settings.RABBITMQ_SERVER_PORT mi.port = settings.RABBITMQ_SERVER_PORT
...@@ -292,7 +293,7 @@ class SubJobIndexOutOfRange(Exception): pass ...@@ -292,7 +293,7 @@ class SubJobIndexOutOfRange(Exception): pass
class ArgoJob(models.Model): class ArgoJob(models.Model):
# ARGO DB table columns # ARGO DB table columns
argo_job_id = models.BigIntegerField(default=0) job_id = models.BigIntegerField(default=0)
user_id = models.BigIntegerField(default=0) user_id = models.BigIntegerField(default=0)
name = models.TextField(default='') name = models.TextField(default='')
description = models.TextField(default='') description = models.TextField(default='')
...@@ -333,13 +334,13 @@ class ArgoJob(models.Model): ...@@ -333,13 +334,13 @@ class ArgoJob(models.Model):
def get_line_string(self): def get_line_string(self):
format = " %10i | %20i | %20s | %35s | %15s | %20s " format = " %10i | %20i | %20s | %35s | %15s | %20s "
output = format % (self.pk,self.argo_job_id,self.state,str(self.time_modified),self.username,self.subjob_pk_list) output = format % (self.pk,self.job_id,self.state,str(self.time_modified),self.username,self.subjob_pk_list)
return output return output
@staticmethod @staticmethod
def get_header(): def get_header():
format = " %10s | %20s | %20s | %35s | %15s | %20s " format = " %10s | %20s | %20s | %35s | %15s | %20s "
output = format % ('pk','argo_job_id','state','time_modified','username','subjob_pk_list') output = format % ('pk','job_id','state','time_modified','username','subjob_pk_list')
return output return output
@staticmethod @staticmethod
...@@ -348,10 +349,10 @@ class ArgoJob(models.Model): ...@@ -348,10 +349,10 @@ class ArgoJob(models.Model):
# so grabing the number of microseconds # so grabing the number of microseconds
job_id = int(time.time()*1e6) job_id = int(time.time()*1e6)
# make sure no jobs with the same job_id # make sure no jobs with the same job_id
same_jobs = ArgoJob.objects.filter(argo_job_id=job_id) same_jobs = ArgoJob.objects.filter(job_id=job_id)
while len(same_jobs) > 0: while len(same_jobs) > 0:
job_id = int(time.time()*1e6) job_id = int(time.time()*1e6)
same_jobs = ArgoJob.objects.filter(argo_job_id=job_id) same_jobs = ArgoJob.objects.filter(job_id=job_id)
return job_id return job_id
def delete(self,delete_subjobs=True): def delete(self,delete_subjobs=True):
...@@ -388,7 +389,7 @@ class ArgoSubJob(models.Model): ...@@ -388,7 +389,7 @@ class ArgoSubJob(models.Model):
name = models.TextField(default='') name = models.TextField(default='')
description = models.TextField(default='') description = models.TextField(default='')
subjob_id = models.BigIntegerField(default=0) subjob_id = models.BigIntegerField(default=0)
argo_job_id = models.BigIntegerField(default=0) job_id = models.BigIntegerField(default=0)
queue = models.TextField(default=settings.BALSAM_DEFAULT_QUEUE) queue = models.TextField(default=settings.BALSAM_DEFAULT_QUEUE)
project = models.TextField(default=settings.BALSAM_DEFAULT_PROJECT) project = models.TextField(default=settings.BALSAM_DEFAULT_PROJECT)
...@@ -424,7 +425,7 @@ class ArgoSubJob(models.Model): ...@@ -424,7 +425,7 @@ class ArgoSubJob(models.Model):
def get_line_string(self): def get_line_string(self):
format = ' %10i | %20i | %20i | %10s | %20s | %10i | %10i | %10s | %10s | %10s | %15s ' format = ' %10i | %20i | %20i | %10s | %20s | %10i | %10i | %10s | %10s | %10s | %15s '
output = format % (self.pk,self.subjob_id,self.argo_job_id,self.state,self.site, output = format % (self.pk,self.subjob_id,self.job_id,self.state,self.site,
self.num_nodes,self.processes_per_node,self.scheduler_id,self.queue, self.num_nodes,self.processes_per_node,self.scheduler_id,self.queue,
self.project,self.application) self.project,self.application)
return output return output
...@@ -432,7 +433,7 @@ class ArgoSubJob(models.Model): ...@@ -432,7 +433,7 @@ class ArgoSubJob(models.Model):
@staticmethod @staticmethod
def get_header(): def get_header():
format = ' %10s | %20s | %20s | %10s | %20s | %10s | %10s | %10s | %10s | %10s | %15s ' format = ' %10s | %20s | %20s | %10s | %20s | %10s | %10s | %10s | %10s | %10s | %15s '
output = format % ('pk','subjob_id','argo_job_id','state','site', output = format % ('pk','subjob_id','job_id','state','site',
'num_nodes','procs','sched_id','queue','project','application') 'num_nodes','procs','sched_id','queue','project','application')
return output return output
''' '''
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment