UserJobReceiver.py 6.33 KB
Newer Older
jtchilders's avatar
jtchilders committed
1 2 3 4 5 6 7
import logging,sys,multiprocessing,time,os,pwd,grp
logger = logging.getLogger(__name__)

from django.db import connections,DEFAULT_DB_ALIAS
from django.db.utils import load_backend
from django.conf import settings

8 9 10
from balsam.argo import models,QueueMessage
from balsam.common import db_tools
from balsam.common import MessageReceiver,Serializer
jtchilders's avatar
jtchilders committed
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42

def CreateWorkingPath(job_id):
   ''' Create the working directory for a job and return its path.

   The directory is settings.ARGO_WORK_DIRECTORY/<job_id>. os.makedirs is
   called without exist_ok, so this raises OSError if the directory already
   exists (or cannot be created).
   '''
   base_dir = settings.ARGO_WORK_DIRECTORY
   job_dir_name = str(job_id)
   new_path = os.path.join(base_dir, job_dir_name)
   os.makedirs(new_path)
   return new_path

class UserJobReceiver(MessageReceiver.MessageReceiver):
   ''' subscribes to the input user job queue and adds jobs to the database

   Each message body is deserialized into a user job description. An ArgoJob
   (plus one ArgoSubJob per entry of its 'subjobs' list) is created and saved,
   and a QueueMessage carrying the new job's primary key is put on
   ``process_queue``. Every delivered message is acknowledged exactly once,
   whether or not the job could be created, so a bad job is logged and
   dropped rather than redelivered.
   '''

   def __init__(self,process_queue = None):
      ''' Connect to the user-job queue configured in the Django settings.

      process_queue: object with a ``put`` method that receives a
         QueueMessage for every job successfully created (presumably a
         multiprocessing queue -- confirm with the caller).
      '''
      super(UserJobReceiver,self).__init__(
            settings.RABBITMQ_USER_JOB_QUEUE_NAME,
            settings.RABBITMQ_USER_JOB_ROUTING_KEY,
            settings.RABBITMQ_SERVER_NAME,
            settings.RABBITMQ_SERVER_PORT,
            settings.RABBITMQ_USER_EXCHANGE_NAME,
            settings.RABBITMQ_SSL_CERT,
            settings.RABBITMQ_SSL_KEY,
            settings.RABBITMQ_SSL_CA_CERTS,
           )
      self.process_queue = process_queue

   # This is where the real processing of incoming messages happens
   def consume_msg(self,channel,method_frame,header_frame,body):
      ''' Handle one message delivered from the user-job queue.

      channel:      channel the message arrived on; used to acknowledge it.
      method_frame: delivery metadata; its ``delivery_tag`` identifies the message.
      header_frame: message properties (not used here).
      body:         the serialized user job, or None.

      The message is acknowledged in a ``finally`` so that it is acked exactly
      once on every path; processing errors are logged, not re-queued.
      '''
      logger.debug('in consume_msg')
      try:
         if body is None:
            logger.error('received user job message with no body')
            return
         # lazy %s formatting: '+' concatenation raises TypeError if body is bytes
         logger.debug(' received message: %s', body)
         self._process_message_body(body)
      finally:
         # acknowledge message
         channel.basic_ack(method_frame.delivery_tag)

   def _process_message_body(self,body):
      ''' Deserialize ``body`` and create the ArgoJob it describes while a
      dedicated DB connection is registered. Errors are logged, never raised. '''
      # convert body text to ArgoUserJob
      try:
         userjob = Serializer.deserialize(body)
      except Exception as e:
         logger.error(' received exception while deserializing message to create ArgoUserJob, \nexception message: %s\n message body: \n%s \n cannot continue with this job, ignoring it and moving on.', e, body)
         return

      # create unique DB connection string
      try:
         db_connection_id = self._open_db_connection()
      except Exception as e:
         # _job_field never raises, so this handler cannot itself fail on a
         # userjob that lacks one of these keys
         logger.error(' received exception while creating DB connection, exception message: ' + str(e)
               + ' \n job id: ' + self._job_field(userjob,'user_id')
               + ' job user: ' + self._job_field(userjob,'username')
               + ' job description: ' + self._job_field(userjob,'description')
               + '\n cannot continue with this job, moving on.')
         return

      try:
         self._create_argo_job(userjob)
      finally:
         # delete DB connection, even if job creation failed unexpectedly
         self._close_db_connection(db_connection_id)

   @staticmethod
   def _open_db_connection():
      ''' Register a new connection to the default database under an alias
      derived from the current process id (via db_tools); return that alias. '''
      db_connection_id = db_tools.get_db_connection_id(os.getpid())
      db_backend = load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE'])
      db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id)
      connections[db_connection_id] = db_conn
      return db_connection_id

   @staticmethod
   def _close_db_connection(db_connection_id):
      ''' Close and unregister the connection created by _open_db_connection. '''
      try:
         # release the underlying DB connection instead of leaving it to the GC
         connections[db_connection_id].close()
      except Exception as e:
         logger.warning(' received exception while closing DB connection ' + str(db_connection_id) + ': ' + str(e))
      del connections[db_connection_id]

   @staticmethod
   def _job_field(userjob,key):
      ''' Return str(userjob[key]) for use in log messages, or '<missing>' if
      the value cannot be read (so error handlers never raise themselves). '''
      try:
         return str(userjob[key])
      except Exception:
         return '<missing>'

   def _create_argo_job(self,userjob):
      ''' Build and save an ArgoJob plus its ArgoSubJobs from the deserialized
      user job, then put a 'new job received' QueueMessage on process_queue.

      A job with no subjobs is saved in the REJECTED state and a status
      message is sent instead. Any exception is logged (with traceback) and
      swallowed so the caller can move on to the next message.
      '''
      argojob = None
      try:
         # create ArgoJob and initialize it
         argojob = models.ArgoJob()
         argojob.job_id                   = models.ArgoJob.generate_job_id()
         logger.debug(' created ArgoJob with id: ' + str(argojob.job_id) )
         argojob.working_directory        = CreateWorkingPath(argojob.job_id)
         argojob.user_id                  = userjob['user_id']
         argojob.job_name                 = userjob['name']
         argojob.job_description          = userjob['description']
         argojob.group_identifier         = userjob['group_identifier']
         argojob.username                 = userjob['username']
         argojob.email                    = userjob['email']
         argojob.input_url                = userjob['input_url']
         argojob.output_url               = userjob['output_url']
         argojob.job_status_routing_key   = userjob['job_status_routing_key']

         # if there are no subjobs, there isn't anything to do
         if not userjob['subjobs']:
            logger.error(' Job received with no subjobs, failing job and moving on.')
            argojob.state_id = models.REJECTED.id
            argojob.save()
            message = 'Job rejected because there are no subjobs.'
            # previously passed the undefined name 'job', which raised NameError
            models.send_status_message(argojob,message)
            return

         # add subjobs
         subjob_pks = [self._create_argo_subjob(argojob,usersubjob)
                       for usersubjob in userjob['subjobs']]
         argojob.subjob_pk_list = Serializer.serialize(subjob_pks)
         argojob.save()
         self.process_queue.put(QueueMessage.QueueMessage(argojob.pk,0,'new job received'))
         logger.debug('added user job')
      except Exception as e:
         # argojob may still be None (ArgoJob() itself failed) and userjob may
         # lack the keys, so read them defensively
         message = ('received an exception while parsing the incomping user job. Exception: ' + str(e)
               + '; userjob id = ' + self._job_field(userjob,'user_id')
               + '; job_id = ' + str(getattr(argojob,'job_id',None))
               + '; job_name = ' + self._job_field(userjob,'name'))
         logger.exception(message)

   @staticmethod
   def _create_argo_subjob(argojob,usersubjob):
      ''' Create and save one ArgoSubJob of ``argojob`` from a user subjob
      description and return its primary key. Its input and output URLs both
      point at the parent job's working directory on the GridFTP server. '''
      argosubjob                       = models.ArgoSubJob()
      argosubjob.site                  = usersubjob['site']
      argosubjob.job_id                = models.ArgoJob.generate_job_id()
      argosubjob.name                  = usersubjob['name']
      argosubjob.description           = usersubjob['description']
      argosubjob.argo_job_id           = argojob.job_id
      argosubjob.queue                 = usersubjob['queue']
      argosubjob.project               = usersubjob['project']
      argosubjob.wall_time_minutes     = usersubjob['wall_time_minutes']
      argosubjob.num_nodes             = usersubjob['num_nodes']
      argosubjob.processes_per_node    = usersubjob['processes_per_node']
      argosubjob.scheduler_config      = usersubjob['scheduler_config']
      argosubjob.application           = usersubjob['application']
      argosubjob.config_file           = usersubjob['config_file']
      argosubjob.input_url =  (
            settings.GRIDFTP_PROTOCOL +
            settings.GRIDFTP_SERVER +
            argojob.working_directory
           )
      argosubjob.output_url            = argosubjob.input_url
      argosubjob.save()
      return argosubjob.pk