#-----------  ArgoJob Transitions ---------------

import multiprocessing,sys,logging
logger = logging.getLogger(__name__)

from django.db import utils,connections,DEFAULT_DB_ALIAS,models
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from django.core.validators import validate_comma_separated_integer_list

12 13 14 15 16
from balsam.argo import QueueMessage,ArgoJobStatus
from balsam.common import log_uncaught_exceptions,MessageInterface
from balsam.common import Serializer,transfer,Mail,db_tools
from balsam.service.models import BalsamJob
from balsam.service.models import STATES_BY_NAME as BALSAM_STATES_BY_NAME
jtchilders's avatar
jtchilders committed
17 18 19 20 21 22

# Install the project's handler as the interpreter-wide hook for uncaught
# exceptions. This is a module-level side effect: it takes effect as soon as
# this module is imported.
sys.excepthook = log_uncaught_exceptions.log_uncaught_exceptions

    
def submit_subjob(job):
   ''' Send the job's current subjob to its site over the Balsam message queue.

   On success the job moves to SUBJOB_SUBMITTED. If the job has no subjob left
   (get_current_subjob raises SubJobIndexOutOfRange) it moves to
   SUBJOBS_COMPLETED. Any other error is logged and the job moves to
   SUBJOB_SUBMIT_FAILED. In every case the new state is saved and a status
   message is sent to the user.
   '''
   logger.debug('in submit_subjob pk=' + str(job.pk) + ' job_id='+str(job.job_id))
   message = 'Subjob submitted'
   # Kept outside the try block so the error report below can name the site
   # even when the failure happened before the subjob could be retrieved.
   site = 'unknown site'
   try:
      # get the current subjob
      subjob = job.get_current_subjob()
      site = subjob.site

      logger.info('Submitting Subjob ' + str(subjob.job_id) + ' from ArgoJob '
            + str(job.job_id) + ' (pk=' + str(job.pk) + ') to ' + subjob.site )

      # create and configure message interface
      msgInt = MessageInterface.MessageInterface(
                host          = settings.RABBITMQ_SERVER_NAME,
                port          = settings.RABBITMQ_SERVER_PORT,
                exchange_name = settings.RABBITMQ_BALSAM_EXCHANGE_NAME,
                ssl_cert      = settings.RABBITMQ_SSL_CERT,
                ssl_key       = settings.RABBITMQ_SSL_KEY,
                ssl_ca_certs  = settings.RABBITMQ_SSL_CA_CERTS,
               )
      msgInt.open_blocking_connection()
      try:
         # create message queue for site in case not already done
         msgInt.create_queue(subjob.site,subjob.site)

         # serialize subjob for message
         body = subjob.serialize()
         logger.debug('sending job message: \n' + body)

         # submit job
         msgInt.send_msg(body,subjob.site)
      finally:
         # release the connection whether or not the send succeeded
         msgInt.close()

      job.state = SUBJOB_SUBMITTED.name
   except SubJobIndexOutOfRange:
      message = 'All Subjobs Completed'
      job.state = SUBJOBS_COMPLETED.name
   except Exception as e:
      message = ('Exception received while submitting subjob to '
         + str(site) + ' for job pk=' + str(job.pk) + ' argo_id='
         + str(job.job_id) + ': ' + str(e))
      logger.exception(message)
      job.state = SUBJOB_SUBMIT_FAILED.name

   job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))
   send_status_message(job,message)

def increment_subjob(job):
   ''' move the job on to its next subjob and save the new index and state '''
   logger.debug('in increment subjob pk='+str(job.pk))

   # step to the next entry of the job's subjob list
   job.current_subjob_pk_index = job.current_subjob_pk_index + 1
   logger.debug(' setting current_subjob_pk_index = ' + str(job.current_subjob_pk_index))

   job.state = SUBJOB_INCREMENTED.name
   changed_fields = ['state','current_subjob_pk_index']
   job.save(update_fields=changed_fields,
            using=db_tools.get_db_connection_id(job.pk))


def stage_in(job):
   ''' copy the user's input data into the job's working directory

   Nothing is transferred when input_url is empty. The job ends up in
   STAGED_IN, or in STAGE_IN_FAILED if the transfer raised, and the new
   state is saved.
   '''
   logger.debug('in stage_in pk=' + str(job.pk))
   if job.input_url == '':
      # no input url specified so stage in is complete
      job.state = STAGED_IN.name
   else:
      source_url = job.input_url + '/'
      destination = job.working_directory + '/'
      try:
         transfer.stage_in(source_url,destination)
         job.state = STAGED_IN.name
      except Exception as e:
         error_text = 'Exception received during stage_in: ' + str(e)
         logger.exception(error_text)
         job.state = STAGE_IN_FAILED.name

   job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))


def stage_out(job):
   ''' copy the job's working directory to the user's output location

   Nothing is transferred when output_url is empty. The job ends up in
   STAGED_OUT, or in STAGE_OUT_FAILED if the transfer raised, and the new
   state is saved.
   '''
   logger.debug('in stage_out pk=' + str(job.pk))
   if job.output_url == '':
      # no output url specified so stage out is complete
      job.state = STAGED_OUT.name
   else:
      try:
         source = str(job.working_directory) + '/'
         destination_url = str(job.output_url) + '/'
         transfer.stage_out(source, destination_url)
         job.state = STAGED_OUT.name
      except Exception as e:
         error_text = 'Exception received during stage_out: ' + str(e)
         logger.exception(error_text)
         job.state = STAGE_OUT_FAILED.name

   job.save(update_fields=['state'],using=db_tools.get_db_connection_id(job.pk))


def make_history(job):
   ''' put the job into the HISTORY state and save that state '''
   logger.debug('job ' + str(job.pk) + ' in make_history ')
   job.state = HISTORY.name
   db_alias = db_tools.get_db_connection_id(job.pk)
   job.save(update_fields=['state'],using=db_alias)
      
def send_status_message(job,message=None):
   ''' this function sends status messages back to the users via email and message queue

   job     -- the ArgoJob whose current state is reported
   message -- optional extra text included in both notifications

   The two channels are independent: a missing email address or a failed
   email does not prevent the message-queue notification. Failures on either
   channel are logged and never raised to the caller.
   '''
   logger.debug('in send_status_message pk=' + str(job.pk) + ' job_id='+str(job.job_id))
   _send_status_email(job,message)
   _send_status_queue_message(job,message)


def _send_status_email(job,message):
   ''' email the job state, job data and subjob data to job.email, if one is set '''
   try:
      if not job.email or '@' not in job.email:
         logger.warning(' no email address specified, not sending mail, email=' + str(job.email))
         return

      # imported here because the module-level imports do not provide it
      from django.core import serializers as django_serializers

      # construct body of email
      separator = '------------------------------------------------------------------- \n'
      parts = [' Your job has reached state ' + job.state + '\n']
      if message is not None:
         # end the line so the separator that follows starts on its own line
         parts.append('    with the message: ' + str(message) + '\n')
      parts.append(separator)
      parts.append('Job Data: \n')
      parts.append(django_serializers.serialize('json',[job]))
      parts.append(separator)
      parts.append('Subjob Data: \n')
      subjobs = ArgoSubJob.objects.filter(pk__in=Serializer.deserialize(job.subjob_pk_list))
      parts.append(django_serializers.serialize('json',subjobs))

      # send notification email to user
      Mail.send_mail(
                sender    = settings.ARGO_JOB_STATUS_EMAIL_SENDER,
                receiver  = job.email,
                subject   = 'ARGO Job Status Report',
                body      = ''.join(parts),
               )
   except Exception as e:
      logger.exception('exception received while trying to send status email. Exception: ' + str(e))


def _send_status_queue_message(job,message):
   ''' publish the job state on the user exchange using the job's status routing key, if one is set '''
   if not job.job_status_routing_key:
      return

   logger.info('sending job status message with routing key: ' + job.job_status_routing_key)
   try:
      msg = ArgoJobStatus.ArgoJobStatus()
      msg.state = job.state
      msg.message = message
      msg.job_id = job.job_id

      mi                = MessageInterface.MessageInterface()
      mi.host           = settings.RABBITMQ_SERVER_NAME
      mi.port           = settings.RABBITMQ_SERVER_PORT
      mi.exchange_name  = settings.RABBITMQ_USER_EXCHANGE_NAME

      mi.ssl_cert       = settings.RABBITMQ_SSL_CERT
      mi.ssl_key        = settings.RABBITMQ_SSL_KEY
      mi.ssl_ca_certs   = settings.RABBITMQ_SSL_CA_CERTS
      logger.debug( ' open blocking connection to send status message ' )
      mi.open_blocking_connection()
      try:
         mi.send_msg(msg.get_serialized_message(),job.job_status_routing_key)
      finally:
         # release the connection whether or not the send succeeded
         mi.close()
   except Exception:
      logger.exception('Exception while sending status message to user job queue')



# ------------  Job States ----------------------------

188
from balsam.common.JobState import JobState
jtchilders's avatar
jtchilders committed
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295

# Job States
#
# Throughout this table JobState is called positionally as
#    JobState(name[, failed state, transition function])
# and only states that end up with a transition function are collected into
# TRANSITIONABLE_STATES further down.
CREATE_FAILED              = JobState('CREATE_FAILED')
CREATED                    = JobState('CREATED',CREATE_FAILED,stage_in)
STAGE_IN_FAILED            = JobState('STAGE_IN_FAILED')
STAGED_IN                  = JobState('STAGED_IN',STAGE_IN_FAILED,submit_subjob)

SUBJOB_SUBMITTED           = JobState('SUBJOB_SUBMITTED')
SUBJOB_SUBMIT_FAILED       = JobState('SUBJOB_SUBMIT_FAILED')
SUBJOB_IN_PREPROCESS       = JobState('SUBJOB_IN_PREPROCESS')
SUBJOB_PREPROCESS_FAILED   = JobState('SUBJOB_PREPROCESS_FAILED')
SUBJOB_QUEUED              = JobState('SUBJOB_QUEUED')
SUBJOB_RUNNING             = JobState('SUBJOB_RUNNING')
SUBJOB_RUN_FINISHED        = JobState('SUBJOB_RUN_FINISHED')
SUBJOB_RUN_FAILED          = JobState('SUBJOB_RUN_FAILED')
SUBJOB_IN_POSTPROCESS      = JobState('SUBJOB_IN_POSTPROCESS')
SUBJOB_POSTPROCESS_FAILED  = JobState('SUBJOB_POSTPROCESS_FAILED')

SUBJOB_COMPLETE_FAILED     = JobState('SUBJOB_COMPLETE_FAILED')
SUBJOB_COMPLETED           = JobState('SUBJOB_COMPLETED',SUBJOB_COMPLETE_FAILED,increment_subjob)
SUBJOB_REJECTED            = JobState('SUBJOB_REJECTED')

SUBJOB_INCREMENT_FAILED    = JobState('SUBJOB_INCREMENT_FAILED')
SUBJOB_INCREMENTED         = JobState('SUBJOB_INCREMENTED',SUBJOB_INCREMENT_FAILED,submit_subjob)

# stage_out must go in the transition-function position. Passing it as the
# second argument put it in the failed-state slot, which left this state with
# no transition and so stage_out was never scheduled. This state has no
# dedicated failure state, hence None.
# NOTE(review): assumes JobState accepts None as its failed state -- confirm
# against balsam.common.JobState.
SUBJOBS_COMPLETED          = JobState('SUBJOBS_COMPLETED',None,stage_out)

STAGE_OUT_FAILED           = JobState('STAGE_OUT_FAILED')
STAGED_OUT                 = JobState('STAGED_OUT',STAGE_OUT_FAILED,make_history)
HISTORY                    = JobState('HISTORY')
FAILED                     = JobState('FAILED')
REJECTED                   = JobState('REJECTED')


# every state an ArgoJob can be in, grouped by phase
STATES  = [
   # creation and stage in
   CREATED, CREATE_FAILED,
   STAGED_IN, STAGE_IN_FAILED,

   # subjob life cycle
   SUBJOB_SUBMITTED, SUBJOB_SUBMIT_FAILED,
   SUBJOB_IN_PREPROCESS, SUBJOB_PREPROCESS_FAILED,
   SUBJOB_QUEUED,
   SUBJOB_RUNNING,
   SUBJOB_RUN_FINISHED, SUBJOB_RUN_FAILED,
   SUBJOB_IN_POSTPROCESS, SUBJOB_POSTPROCESS_FAILED,
   SUBJOB_COMPLETED, SUBJOB_COMPLETE_FAILED,
   SUBJOB_REJECTED,
   SUBJOB_INCREMENTED, SUBJOB_INCREMENT_FAILED,
   SUBJOBS_COMPLETED,

   # stage out and final states
   STAGED_OUT, STAGE_OUT_FAILED,
   HISTORY,
   FAILED,
   REJECTED,
]

# names of the states that have a transition function attached
TRANSITIONABLE_STATES = []
for state in STATES:
   if state.transition_function is None:
      continue
   TRANSITIONABLE_STATES.append(state.name)

# look up a state object by its name
STATES_BY_NAME = dict((x.name,x) for x in STATES)

# Maps the name of each Balsam job state to the ARGO subjob state used for it.
# Keys are read back through BALSAM_STATES_BY_NAME, so a Balsam state name
# that does not exist raises KeyError when this module is imported.
BALSAM_JOB_TO_SUBJOB_STATE_MAP = {
   BALSAM_STATES_BY_NAME[balsam_name].name:subjob_state
   for balsam_name,subjob_state in (
      ('CREATED',            SUBJOB_IN_PREPROCESS),
      ('CREATE_FAILED',      SUBJOB_PREPROCESS_FAILED),
      ('STAGED_IN',          SUBJOB_IN_PREPROCESS),
      ('STAGE_IN_FAILED',    SUBJOB_PREPROCESS_FAILED),
      ('PREPROCESSED',       SUBJOB_IN_PREPROCESS),
      ('PREPROCESS_FAILED',  SUBJOB_PREPROCESS_FAILED),
      ('SUBMITTED',          SUBJOB_IN_PREPROCESS),
      ('SUBMIT_FAILED',      SUBJOB_PREPROCESS_FAILED),
      ('SUBMIT_DISABLED',    SUBJOB_COMPLETED),

      ('QUEUED',             SUBJOB_QUEUED),
      ('RUNNING',            SUBJOB_RUNNING),
      ('EXECUTION_FINISHED', SUBJOB_RUN_FINISHED),
      ('EXECUTION_FAILED',   SUBJOB_RUN_FAILED),

      ('POSTPROCESSED',      SUBJOB_IN_POSTPROCESS),
      ('POSTPROCESS_FAILED', SUBJOB_POSTPROCESS_FAILED),
      ('STAGED_OUT',         SUBJOB_IN_POSTPROCESS),
      ('STAGE_OUT_FAILED',   SUBJOB_POSTPROCESS_FAILED),

      ('JOB_FINISHED',       SUBJOB_COMPLETED),
      ('JOB_FAILED',         SUBJOB_COMPLETE_FAILED),
      ('JOB_REJECTED',       SUBJOB_REJECTED),
   )
}


# -------------   ArgoJob DB Object ----------------------

import time,os,shutil
#from django.db import models

class SubJobIndexOutOfRange(Exception):
   ''' raised by ArgoJob.get_current_subjob when the current subjob index is past the end of the subjob list '''
class ArgoJob(models.Model):
   ''' DB record of one ARGO job and the ordered list of subjobs it runs.

   subjob_pk_list stores the serialized primary keys of the job's ArgoSubJob
   rows (written and read through Serializer); current_subjob_pk_index is the
   position in that list of the subjob currently being worked on.
   '''

   # ARGO DB table columns
   job_id                  = models.BigIntegerField(default=0)
   user_id                 = models.BigIntegerField(default=0)
   name                    = models.TextField(default='')
   description             = models.TextField(default='')
   group_identifier        = models.TextField(default='')

   working_directory       = models.TextField(default='')
   time_created            = models.DateTimeField(auto_now_add=True)
   time_modified           = models.DateTimeField(auto_now=True)
   time_finished           = models.DateTimeField(null=True)
   state                   = models.TextField(default=CREATED.name)
   username                = models.TextField(default='')
   email                   = models.TextField(default='')

   input_url               = models.TextField(default='')
   output_url              = models.TextField(default='')

   subjob_pk_list          = models.TextField(default='',validators=[validate_comma_separated_integer_list])
   current_subjob_pk_index = models.IntegerField(default=0)

   job_status_routing_key  = models.TextField(default='')

   def get_current_subjob(self):
      ''' return the ArgoSubJob selected by current_subjob_pk_index

      Raises SubJobIndexOutOfRange when the index is at or beyond the end of
      the subjob list, i.e. when there is no subjob left to run.
      '''
      subjob_list = self.get_subjob_pk_list()
      if self.current_subjob_pk_index >= len(subjob_list):
         logger.debug('current_subjob_pk_index=' + str(self.current_subjob_pk_index) + ' number of subjobs = ' + str(len(subjob_list)) + ' subjobs = ' + str(subjob_list))
         raise SubJobIndexOutOfRange

      logger.debug('getting subjob index ' + str(self.current_subjob_pk_index) + ' of ' + str(len(subjob_list)))
      return ArgoSubJob.objects.get(pk=subjob_list[self.current_subjob_pk_index])

   def add_subjob(self,subjob):
      ''' append the subjob's pk to subjob_pk_list (in memory only, the caller must save) '''
      subjob_list = self.get_subjob_pk_list()
      subjob_list.append(subjob.pk)
      self.subjob_pk_list = Serializer.serialize(subjob_list)

   def get_subjob_pk_list(self):
      ''' return the deserialized content of subjob_pk_list '''
      return Serializer.deserialize(self.subjob_pk_list)

   def get_line_string(self):
      ''' return this job formatted as one table row matching get_header() '''
      line_format = " %10i | %20i | %20s | %35s | %15s | %20s "
      return line_format % (self.pk,self.job_id,self.state,str(self.time_modified),self.username,self.subjob_pk_list)

   @staticmethod
   def get_header():
      ''' return the column titles for the rows produced by get_line_string() '''
      header_format = " %10s | %20s | %20s | %35s | %15s | %20s "
      return header_format % ('pk','job_id','state','time_modified','username','subjob_pk_list')

   @staticmethod
   def generate_job_id():
      ''' return a job_id derived from the current time that no stored ArgoJob uses yet '''
      # time.time() is a double with units seconds
      # so grabbing the number of microseconds
      job_id = int(time.time()*1e6)
      # make sure no jobs with the same job_id; exists() asks the DB only
      # whether a match is present instead of loading the matching rows
      while ArgoJob.objects.filter(job_id=job_id).exists():
         job_id = int(time.time()*1e6)
      return job_id

   def delete(self,delete_subjobs=True,**kwargs):
      ''' remove the job's working directory, optionally its subjobs, and its DB row

      delete_subjobs -- when True, also delete every ArgoSubJob listed in subjob_pk_list
      kwargs         -- forwarded unchanged to the base class delete()

      Errors while removing the directory or deleting the row are logged and
      not raised.
      '''
      # delete local argo job path
      if os.path.exists(self.working_directory):
         try:
            shutil.rmtree(self.working_directory)
            logger.info('removed job path: ' + str(self.working_directory))
         except Exception as e:
            logger.error('Error trying to remove argo job path: ' + str(self.working_directory) + ' Exception: ' + str(e))

      # delete subjobs
      if delete_subjobs:
         for subjob in ArgoSubJob.objects.filter(pk__in=self.get_subjob_pk_list()):
            subjob.delete()

      # call base class delete function
      try:
         super(ArgoJob,self).delete(**kwargs)
      except Exception as e:
         logger.error('pk='+str(self.pk) + ' Received exception during "delete": ' + str(e))

# Declared as a subclass with an empty body: this is needed to force django to
# create a DB table for ARGO independent of the one created for Balsam.
class ArgoSubJob(BalsamJob):
   pass

# Earlier standalone ArgoSubJob model, disabled by keeping it inside a string
# literal; the active ArgoSubJob is the BalsamJob subclass defined above.
'''
class ArgoSubJob(models.Model):

   # ArgoSubJob DB table columns
   site                    = models.TextField(default='')
   state                   = models.TextField(default='PRESUBMIT')

   name                    = models.TextField(default='')
   description             = models.TextField(default='')
   subjob_id               = models.BigIntegerField(default=0)
   job_id             = models.BigIntegerField(default=0)

   queue                   = models.TextField(default=settings.BALSAM_DEFAULT_QUEUE)
   project                 = models.TextField(default=settings.BALSAM_DEFAULT_PROJECT)
   wall_time_minutes       = models.IntegerField(default=0)
   num_nodes               = models.IntegerField(default=0)
   processes_per_node      = models.IntegerField(default=0)
   scheduler_config        = models.TextField(default='None')
   scheduler_id            = models.IntegerField(default=0)
   
   application             = models.TextField(default='')
   config_file             = models.TextField(default='')

   input_url               = models.TextField(default='')
   output_url              = models.TextField(default='')

   def get_balsam_job_message(self):
      msg = BalsamJobMessage.BalsamJobMessage()
      msg.origin_id           = self.subjob_id
      msg.site                = self.site
      msg.name                = self.name
      msg.description         = self.description
      msg.queue               = self.queue
      msg.project             = self.project
      msg.wall_time_minutes   = self.wall_time_minutes
      msg.num_nodes           = self.num_nodes
      msg.processes_per_node  = self.processes_per_node
      msg.scheduler_config    = self.scheduler_config
      msg.application         = self.application
      msg.config_file         = self.config_file
      msg.input_url           = self.input_url
      msg.output_url          = self.output_url
      return msg

   def get_line_string(self):
      format = ' %10i | %20i | %20i | %10s | %20s | %10i | %10i | %10s | %10s | %10s | %15s '
      output = format % (self.pk,self.subjob_id,self.job_id,self.state,self.site,
            self.num_nodes,self.processes_per_node,self.scheduler_id,self.queue,
            self.project,self.application)
      return output

   @staticmethod
   def get_header():
      format = ' %10s | %20s | %20s | %10s | %20s | %10s | %10s | %10s | %10s | %10s | %15s '
      output = format % ('pk','subjob_id','job_id','state','site',
            'num_nodes','procs','sched_id','queue','project','application')
      return output
'''