# balsam_service.py (8.59 KB)
# blame: line 1 committed by Michael Salim
import os,sys,logging,multiprocessing,queue,traceback
# blame: lines 2-32 committed by jtchilders
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

from django.core.management.base import BaseCommand, CommandError
from django.conf import settings

from balsam import models,BalsamJobReceiver,QueueMessage
from common import DirCleaner,log_uncaught_exceptions,TransitionJob
from balsam import scheduler
from balsam.schedulers import exceptions,jobstates

# Hand any exception that escapes to the top level to
# log_uncaught_exceptions.log_uncaught_exceptions (presumably so it is written
# to the log rather than only to stderr -- confirm in common/). Note this
# replaces the process-wide hook as a side effect of importing this module.
sys.excepthook = log_uncaught_exceptions.log_uncaught_exceptions

class Command(BaseCommand):
    """Django management command that runs the Balsam service loop.

    ``handle`` starts the BalsamJobReceiver and then loops forever until
    interrupted with Ctrl-C (``KeyboardInterrupt``). Each iteration:

    1. asks the scheduler for the status of every job whose state is in
       ``models.CHECK_STATUS_STATES`` and records any state change;
    2. forgets ``TransitionJob`` processes that have exited, logging
       non-zero exit codes;
    3. starts a ``TransitionJob`` for each job in
       ``models.TRANSITIONABLE_STATES`` that is not already transitioning,
       up to ``settings.BALSAM_MAX_CONCURRENT_TRANSITIONS`` at once;
    4. cleans the work directory when ``settings.BALSAM_DELETE_OLD_WORK``
       is set;
    5. logs any long-lived subprocess that has stopped;
    6. waits up to ``settings.BALSAM_SERVICE_PERIOD`` seconds for a message
       on the service queue, and marks the job failed when a transition
       reports an error.
    """

    help = 'Start Balsam Service, which monitors the message queue for new jobs and submits them to the local batch system.'

    def handle(self, *args, **options):
        """Run the service loop until interrupted with Ctrl-C.

        ``args`` and ``options`` are accepted for the Django command
        interface but are not used.
        """
        # Logged here rather than in the class body: a class-body statement
        # runs when the module is imported, not when the service starts.
        logger.info(
            '\n   >>>>>      Starting Balsam Service                 <<<<<  '
            '\n   >>>>>      pid: %s       <<<<<\n      ',
            os.getpid(),
        )
        try:
            subprocesses = self._start_subprocesses()

            # setup timer for cleaning the work folder of old files
            logger.debug('creating DirCleaner')
            work_dir_cleaner = DirCleaner.DirCleaner(
                settings.BALSAM_WORK_DIRECTORY,
                settings.BALSAM_DELETE_OLD_WORK_PERIOD,
                settings.BALSAM_DELETE_OLD_WORK_AGE,
            )

            # Subprocesses use this queue to report back to the service. A
            # message arriving on it also wakes the loop before the period ends.
            logger.debug('creating balsam_service_queue')
            balsam_service_queue = multiprocessing.Queue()
            # job pk -> TransitionJob process currently working on that job
            jobs_in_transition_by_id = {}

            while True:
                logger.debug('begin service loop ')
                self._update_job_statuses()
                self._reap_finished_transitions(jobs_in_transition_by_id)
                self._start_transitions(jobs_in_transition_by_id, balsam_service_queue)
                if settings.BALSAM_DELETE_OLD_WORK:
                    work_dir_cleaner.clean()
                self._report_stopped_subprocesses(subprocesses)
                self._process_queue_message(balsam_service_queue)
        except KeyboardInterrupt:
            logger.info('Balsam Service Exiting')
            return

    def _start_subprocesses(self):
        """Start the long-lived helper processes.

        Returns a dict mapping a display name to each started process. A
        helper that fails to start is logged and omitted, so the service
        keeps running without it.
        """
        logger.debug('starting BalsamJobReceiver')
        subprocesses = {}
        try:
            receiver = BalsamJobReceiver.BalsamJobReceiver()
            receiver.start()
            subprocesses['BalsamJobReceiver'] = receiver
        except Exception as e:
            logger.exception(' Received Exception while trying to start job receiver: ' + str(e))
        return subprocesses

    def _update_job_statuses(self):
        """Poll the scheduler for every job in ``models.CHECK_STATUS_STATES``.

        This also picks up jobs that were submitted but are no longer queued
        or running, which may mean they have finished or exited.
        """
        logger.debug(' checking for active jobs ')
        active_jobs = models.BalsamJob.objects.filter(state__in=models.CHECK_STATUS_STATES)
        num_active = len(active_jobs)
        if num_active > 0:
            logger.info('monitoring ' + str(num_active) + ' active jobs')
        else:
            logger.debug(' no active jobs')
        for job in active_jobs:
            self._update_job_status(job)

    def _update_job_status(self, job):
        """Translate the scheduler's state for ``job`` into a Balsam state.

        The job is saved, and a status message sent, only when its state
        actually changes. Any failure is logged and reported through
        ``models.send_status_message`` but does not stop the service.
        """
        try:
            jobstate = scheduler.get_job_status(job)
            if jobstate == jobstates.JOB_RUNNING:
                new_state = models.RUNNING.name
            elif jobstate == jobstates.JOB_QUEUED:
                new_state = models.QUEUED.name
            elif jobstate == jobstates.JOB_FINISHED:
                new_state = models.EXECUTION_FINISHED.name
                # TODO: check whether scheduler.postprocess(job) belongs here
            else:
                new_state = None
            if new_state is None or new_state == job.state:
                logger.debug('job pk=' + str(job.pk) + ' remains in state ' + str(jobstate))
                return
            job.state = new_state
            job.save(update_fields=['state'])
            models.send_status_message(job, 'Job entered ' + job.state + ' state')
        except exceptions.JobStatusFailed as e:
            message = 'get_job_status failed for pk=' + str(job.pk) + ': ' + str(e)
            logger.error(message)
            # TODO: Should I fail the job?
            models.send_status_message(job, message)
        except Exception as e:
            message = 'failed to get status for pk=' + str(job.pk) + ', exception: ' + str(e)
            logger.error(message)
            # TODO: Should I fail the job?
            models.send_status_message(job, message)

    def _reap_finished_transitions(self, jobs_in_transition_by_id):
        """Drop entries for transition processes that are no longer alive.

        Non-zero exit codes are logged. ``jobs_in_transition_by_id`` is
        modified in place.
        """
        # Iterate over a snapshot: deleting from a dict while iterating over
        # it (or its keys view) raises RuntimeError in Python 3.
        for pk, proc in list(jobs_in_transition_by_id.items()):
            if proc.is_alive():
                continue
            if proc.exitcode != 0:
                logger.error('transition subprocess for  pk=' + str(pk)
                             + ' returned exit code ' + str(proc.exitcode))
                # TODO: recover from the error (e.g. fail or retry the job)?
            del jobs_in_transition_by_id[pk]

    def _start_transitions(self, jobs_in_transition_by_id, balsam_service_queue):
        """Start a TransitionJob for each job that is ready to transition.

        Jobs already in ``jobs_in_transition_by_id`` are skipped, and no more
        than ``settings.BALSAM_MAX_CONCURRENT_TRANSITIONS`` transitions run at
        once. Newly started processes are added to ``jobs_in_transition_by_id``.
        """
        transitionable_jobs = models.BalsamJob.objects.filter(
            state__in=models.TRANSITIONABLE_STATES
        ).exclude(pk__in=list(jobs_in_transition_by_id))
        logger.debug(' found ' + str(len(transitionable_jobs)) + ' in states that need to be transitioned ')
        for job in transitionable_jobs:
            # limit the number of concurrent transitions to avoid overloading the CPU
            if len(jobs_in_transition_by_id) >= settings.BALSAM_MAX_CONCURRENT_TRANSITIONS:
                logger.debug(' too many jobs currently transitioning '
                             + str(len(jobs_in_transition_by_id)) + ' and max is '
                             + str(settings.BALSAM_MAX_CONCURRENT_TRANSITIONS))
                # Entries are only added inside this loop, so every remaining
                # job would be skipped as well.
                break
            logger.debug(' creating job transition ')
            proc = TransitionJob.TransitionJob(
                job.pk,
                balsam_service_queue,
                models.BalsamJob,
                models.STATES_BY_NAME[job.state].transition_function,
            )
            logger.debug(' start ')
            proc.start()
            jobs_in_transition_by_id[job.pk] = proc

    def _report_stopped_subprocesses(self, subprocesses):
        """Log every long-lived helper process that is no longer alive."""
        for name, proc in subprocesses.items():
            if not proc.is_alive():
                logger.info(' subprocess ' + name + ' has stopped with returncode ' + str(proc.exitcode))

    def _process_queue_message(self, balsam_service_queue):
        """Wait for one message from a subprocess and act on it.

        Blocks for at most ``settings.BALSAM_SERVICE_PERIOD`` seconds. A
        message reporting a failed transition moves its job to the failed
        state paired with the job's current state.
        """
        logger.debug('getting message from queue, blocking for '
                     + str(settings.BALSAM_SERVICE_PERIOD) + ' seconds')
        try:
            qmsg = balsam_service_queue.get(block=True, timeout=settings.BALSAM_SERVICE_PERIOD)
        except queue.Empty:
            logger.debug('no messages on queue')
            return

        # Guarded lookup: an unknown code must reach the 'No recognized'
        # branch below instead of crashing the service here.
        try:
            code_name = QueueMessage.msg_codes[qmsg.code]
        except (KeyError, IndexError, TypeError):
            code_name = qmsg.code
        logger.debug('Received queue message code: ' + str(code_name))
        logger.debug('Received queue message: ' + str(qmsg.message))

        if qmsg.code == QueueMessage.TransitionComplete:
            logger.debug('Transition Succeeded')
            return
        if qmsg.code == QueueMessage.TransitionDbConnectionFailed:
            reason = 'Transition DB connection failed: '
        elif qmsg.code == QueueMessage.TransitionDbRetrieveFailed:
            reason = 'Transition failed to retrieve job from DB: '
        elif qmsg.code == QueueMessage.TransitionFunctionException:
            reason = 'Exception received while running transition function: '
        else:
            logger.error('No recognized QueueMessage code')
            return
        logger.error(reason + str(qmsg.message))
        self._fail_job(qmsg.pk)

    def _fail_job(self, pk):
        """Move job ``pk`` to the failed state paired with its current state.

        Errors (for example, the job no longer exists or the database is
        unreachable) are logged rather than raised, so the loop keeps running.
        """
        try:
            job = models.BalsamJob.objects.get(pk=pk)
            job.state = models.STATES_BY_NAME[job.state].failed_state
            job.save(update_fields=['state'])
        except Exception:
            logger.exception('failed to mark job pk=' + str(pk) + ' as failed')