Commit 0a3dc20a authored by jtchilders
Browse files

Change balsam_job_id to job_id; implement self-serializing functions.

parent e286f589
......@@ -2,7 +2,7 @@
# ------------- BalsamJob Transitions -------------------
import multiprocessing,os,time,logging,sys
import multiprocessing,os,time,logging,sys,datetime
logger = logging.getLogger(__name__)
from django.db import utils,connections,DEFAULT_DB_ALIAS
......@@ -11,7 +11,7 @@ from django.conf import settings
from balsam import BalsamJobStatus
from common import transfer,MessageInterface,run_subprocess
from common import log_uncaught_exceptions,db_tools
from common import log_uncaught_exceptions,db_tools,Serializer
from balsam import scheduler,BalsamJobMessage
from schedulers import exceptions
......@@ -133,7 +133,7 @@ def submit(job):
logger.error(message)
job.state = SUBMIT_DISABLED.name
except Exception,e:
message = 'received exception while calling scheduler submit for job ' + str(job.balsam_job_id) + ', exception: ' + str(e)
message = 'received exception while calling scheduler submit for job ' + str(job.job_id) + ', exception: ' + str(e)
logger.exception(message)
job.state = SUBMIT_FAILED.name
......@@ -212,7 +212,7 @@ def send_status_message(job,message=''):
p.send_msg(statmsg.serialize(), settings.RABBITMQ_BALSAM_JOB_STATUS_ROUTING_KEY)
p.close()
except Exception,e:
logger.exception('job(pk='+str(job.pk)+',id='+str(job.balsam_job_id)+
logger.exception('job(pk='+str(job.pk)+',id='+str(job.job_id)+
'): Failed to send BalsamJobStatus message, received exception')
# -------- Job States ------------------------
......@@ -292,13 +292,13 @@ class BalsamJob(models.Model):
''' A DB representation of a Balsam Job '''
# a unique job id
balsam_job_id = models.BigIntegerField('Balsam Job ID',help_text='A unique id used by the Balsam service.',default=0)
site = models.TextField('Balsam Site Name',help_text='The name of the computer system, supercomputer, or location where Balsam is running.',default='')
job_id = models.BigIntegerField('Job ID',help_text='A unique id for this job.',default=0)
site = models.TextField('Site Name',help_text='The name of the computer system, supercomputer, or location where Balsam is running.',default='')
# an arbitrary name, this is here for the user
name = models.TextField('Job Name',help_text='A name for the job given by the user.',default='')
description = models.TextField('Job Description',help_text='A description of the job.',default='')
origin_id = models.BigIntegerField('Origin ID',help_text='The ID provided to balsam for this job.',default=0)
argo_job_id = models.BigIntegerField('Origin ID',help_text='The ID of the Argo job to which this subjob belongs. Can be set to 0 if there is no Argo job set.',default=0)
# scheduler specific attributes
queue = models.TextField('Scheduler Queue',help_text='The local scheduler queue to which to submit jobs.',default=settings.BALSAM_DEFAULT_QUEUE)
......@@ -326,12 +326,43 @@ class BalsamJob(models.Model):
time_job_started = models.DateTimeField('Job Run Start Time',help_text='The time when the job started running.',null=True)
time_job_finished = models.DateTimeField('Job Finish Time',help_text='The time at which the job stopped running.',null=True)
# Names of the BalsamJob attributes that serialize() copies into the
# dictionary handed to Serializer.serialize().
SERIAL_FIELDS = [
'job_id',
'site',
'name',
'description',
'argo_job_id',
'queue',
'project',
'wall_time_minutes',
'num_nodes',
'processes_per_node',
'scheduler_config',
'scheduler_id',
'application',
'config_file',
'state',
'working_directory',
'input_url',
'output_url',
'time_created',
'time_modified',
'time_job_started',
'time_job_finished',
]
# Subset of the fields above whose non-None values deserialize() parses
# back into datetime objects instead of storing them as received.
DATETIME_FIELDS = [
'time_created',
'time_modified',
'time_job_started',
'time_job_finished',
]
def __str__(self):
s = 'BalsamJob: ' + str(self.balsam_job_id) + '\n'
s = 'BalsamJob: ' + str(self.job_id) + '\n'
s += ' site: ' + self.site + '\n'
s += ' name: ' + self.name + '\n'
s += ' description: ' + self.description + '\n'
s += ' origin_id: ' + str(self.origin_id) + '\n'
s += ' argo_job_id: ' + str(self.argo_job_id) + '\n'
s += ' queue: ' + self.queue + '\n'
s += ' project: ' + self.project + '\n'
s += ' wall_time_minutes: ' + str(self.wall_time_minutes) + '\n'
......@@ -354,7 +385,7 @@ class BalsamJob(models.Model):
def get_line_string(self):
    ''' Return this job formatted as one fixed-width table row.

        Columns, in order: pk, job_id, argo_job_id, state, site, num_nodes,
        processes_per_node, scheduler_id, queue, project, application.
        The column widths are the same as those used by get_header().
    '''
    # named row_format so the builtin format() is not shadowed
    row_format = ' %7i | %18i | %18i | %15s | %20s | %9i | %8i | %10s | %10s | %10s | %15s '
    return row_format % (self.pk,self.job_id,self.argo_job_id,self.state,self.site,
                         self.num_nodes,self.processes_per_node,self.scheduler_id,self.queue,
                         self.project,self.application)
......@@ -362,7 +393,7 @@ class BalsamJob(models.Model):
@staticmethod
def get_header():
    ''' Return the fixed-width header row for the table whose data rows
        are produced by get_line_string().
    '''
    # named header_format so the builtin format() is not shadowed
    header_format = ' %7s | %18s | %18s | %15s | %20s | %9s | %8s | %10s | %10s | %10s | %15s '
    return header_format % ('pk','job_id','argo_job_id','state','site',
                            'num_nodes','procs','sched_id','queue','project','application')
......@@ -379,27 +410,27 @@ class BalsamJob(models.Model):
def generate_job_id():
    ''' Return a job_id that no stored BalsamJob had when it was checked.

        The candidate id is the current time in microseconds. If a job
        with that id already exists, a fresh timestamp is taken and the
        check is repeated.
    '''
    # time.time() is a float with units of seconds,
    # so scale it to grab the number of microseconds
    job_id = int(time.time()*1e6)
    # make sure no jobs have the same job_id; exists() only asks the
    # database whether a match is present instead of fetching the rows
    while BalsamJob.objects.filter(job_id=job_id).exists():
        job_id = int(time.time()*1e6)
    return job_id
@staticmethod
def create_working_path(job_id):
    ''' Create the working directory for a job and return its path.

        The directory is settings.BALSAM_WORK_DIRECTORY joined with
        str(job_id). If creating it fails, the exception is logged with
        its traceback and re-raised to the caller.
    '''
    path = os.path.join(settings.BALSAM_WORK_DIRECTORY,str(job_id))
    try:
        os.makedirs(path)
    except Exception:
        # logger.exception records the active traceback along with the message
        logger.exception(' Received exception while making job working directory: ')
        raise
    return path
def get_balsam_job_message(self):
msg = BalsamJobMessage.BalsamJobMessage()
msg.origin_id = self.subjob_id
msg.argo_job_id = self.subjob_id
msg.site = self.site
msg.name = self.name
msg.description = self.description
......@@ -414,6 +445,21 @@ class BalsamJob(models.Model):
msg.input_url = self.input_url
msg.output_url = self.output_url
return msg
def serialize(self):
    ''' Return Serializer.serialize() applied to a dictionary holding this
        job's value for every name in BalsamJob.SERIAL_FIELDS.
    '''
    # values are read straight from the instance dictionary
    stored = self.__dict__
    payload = dict((name,stored[name]) for name in BalsamJob.SERIAL_FIELDS)
    return Serializer.serialize(payload)
def deserialize(self,serial_data):
    ''' Set this job's attributes from serialized data.

        serial_data is passed to Serializer.deserialize(), whose result is
        iterated as a mapping of field name to value. Each value is stored
        in the instance dictionary under its field name. Non-None values
        for names in BalsamJob.DATETIME_FIELDS are first parsed from the
        text form "%Y-%m-%d %H:%M:%S %z" into datetime objects.

        Raises ValueError (from strptime) if a datetime value does not
        match that format.
    '''
    d = Serializer.deserialize(serial_data)
    # items() is available on dictionaries in both Python 2 and Python 3
    for field,value in d.items():
        # class attributes are not visible as bare names inside a method,
        # so DATETIME_FIELDS has to be reached through the class
        if field in BalsamJob.DATETIME_FIELDS and value is not None:
            # NOTE(review): Python 2's strptime rejects the %z directive;
            # confirm the interpreter version and the text format that
            # Serializer emits for datetimes.
            self.__dict__[field] = datetime.datetime.strptime(value,"%Y-%m-%d %H:%M:%S %z")
        else:
            self.__dict__[field] = value
class ApplicationDefinition(models.Model):
''' application definition, each DB entry is a task that can be run
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment