Commit 95d68fc6 authored by Michael Salim's avatar Michael Salim

- 2-->3 and bugfixes

- Updated syntax for exception handling from 2 to 3
- Replaced "Queue" with "queue" module
- Broke ConnectionParameters -- changed to 'localhost' for testing
- Identified BUG: argojob = ArgoJob.objects.get(job_id=sub.job_id)
parent 9723a23d
...@@ -58,7 +58,7 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver): ...@@ -58,7 +58,7 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
# get the argo job for this subjob # get the argo job for this subjob
try: try:
argojob = ArgoJob.objects.get(job_id=subjob.job_id) argojob = ArgoJob.objects.get(job_id=subjob.job_id) # BUG !
except Exception as e: except Exception as e:
logger.error(' exception received while retrieving ArgoJob with id = ' + str(subjob.job_id + ': ' + str(e))) logger.error(' exception received while retrieving ArgoJob with id = ' + str(subjob.job_id + ': ' + str(e)))
# acknoledge message # acknoledge message
...@@ -72,9 +72,9 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver): ...@@ -72,9 +72,9 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
# get the deserialized balsam job # get the deserialized balsam job
try: try:
balsam_job = models.BalsamJob() balsam_job = models.BalsamJob()
statusMsg.get_job(balsam_job) statusMsg.get_job(balsam_job) # statusMsg.serialzed_job gets loaded into balsam_job
logger.debug('balsam_job = ' + str(balsam_job)) logger.debug('balsam_job = ' + str(balsam_job))
except BalsamJobStatus.DeserializeFailed,e: except BalsamJobStatus.DeserializeFailed as e:
logger.error('Failed to deserialize BalsamJob for BalsamJobStatus message for argojob: ' + str(argojob.job_id) + ' subjob_id: ' + str(subjob.job_id)) logger.error('Failed to deserialize BalsamJob for BalsamJobStatus message for argojob: ' + str(argojob.job_id) + ' subjob_id: ' + str(subjob.job_id))
# acknoledge message # acknoledge message
channel.basic_ack(method_frame.delivery_tag) channel.basic_ack(method_frame.delivery_tag)
...@@ -84,7 +84,8 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver): ...@@ -84,7 +84,8 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
QueueMessage.JobStatusReceiverRetrieveArgoJobFailed)) QueueMessage.JobStatusReceiverRetrieveArgoJobFailed))
return return
# parse balsam_job into subjob and argojob # parse balsam_job (just received from balsam, new status) into
# subjob and argojob (need to be synced)
if balsam_job is not None: if balsam_job is not None:
# copy scheduler id to subjob # copy scheduler id to subjob
...@@ -98,7 +99,7 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver): ...@@ -98,7 +99,7 @@ class JobStatusReceiver(MessageReceiver.MessageReceiver):
try: try:
argojob.state = BALSAM_JOB_TO_SUBJOB_STATE_MAP[balsam_job.state].name argojob.state = BALSAM_JOB_TO_SUBJOB_STATE_MAP[balsam_job.state].name
logger.debug(' receieved subjob state = ' + subjob.state + ' setting argo job state to ' + argojob.state) logger.debug(' receieved subjob state = ' + subjob.state + ' setting argo job state to ' + argojob.state)
except KeyError,e: except KeyError as e:
logger.error(' could not map balsam_job state: ' + str(balsam_job.state) + ' to an ArgoJob state for job id: ' + str(argojob.job_id)) logger.error(' could not map balsam_job state: ' + str(balsam_job.state) + ' to an ArgoJob state for job id: ' + str(argojob.job_id))
# acknoledge message # acknoledge message
channel.basic_ack(method_frame.delivery_tag) channel.basic_ack(method_frame.delivery_tag)
......
import os,sys,time,multiprocessing,Queue,logging import os,sys,time,multiprocessing,queue,logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
...@@ -30,7 +30,7 @@ class Command(BaseCommand): ...@@ -30,7 +30,7 @@ class Command(BaseCommand):
p = UserJobReceiver.UserJobReceiver(process_queue=argo_service_queue) p = UserJobReceiver.UserJobReceiver(process_queue=argo_service_queue)
p.start() p.start()
subprocesses['UserJobReceiver'] = p subprocesses['UserJobReceiver'] = p
except Exception,e: except Exception as e:
logger.exception(' Received Exception while trying to start job receiver: ' + str(e)) logger.exception(' Received Exception while trying to start job receiver: ' + str(e))
raise raise
logger.debug(' Launching balsam job status receiver ') logger.debug(' Launching balsam job status receiver ')
...@@ -38,7 +38,7 @@ class Command(BaseCommand): ...@@ -38,7 +38,7 @@ class Command(BaseCommand):
p = JobStatusReceiver.JobStatusReceiver(process_queue=argo_service_queue) p = JobStatusReceiver.JobStatusReceiver(process_queue=argo_service_queue)
p.start() p.start()
subprocesses['JobStatusReceiver'] = p subprocesses['JobStatusReceiver'] = p
except Exception,e: except Exception as e:
logger.exception(' Received exception while trying to start balsam job status receiver: ' + str(e)) logger.exception(' Received exception while trying to start balsam job status receiver: ' + str(e))
raise raise
...@@ -90,7 +90,7 @@ class Command(BaseCommand): ...@@ -90,7 +90,7 @@ class Command(BaseCommand):
+ str(settings.ARGO_MAX_CONCURRENT_TRANSITIONS)) + str(settings.ARGO_MAX_CONCURRENT_TRANSITIONS))
# loop over running process and check status # loop over running process and check status
for name,proc in subprocesses.iteritems(): for name,proc in subprocesses.items():
if not proc.is_alive(): if not proc.is_alive():
logger.info(' subprocess ' + name + ' has stopped with returncode ' + str(proc.exitcode) ) logger.info(' subprocess ' + name + ' has stopped with returncode ' + str(proc.exitcode) )
...@@ -119,7 +119,7 @@ class Command(BaseCommand): ...@@ -119,7 +119,7 @@ class Command(BaseCommand):
job.save(update_fields=['state']) job.save(update_fields=['state'])
else: else:
logger.error('Unrecognized QueueMessage code: ' + str(qmsg.code)) logger.error('Unrecognized QueueMessage code: ' + str(qmsg.code))
except Queue.Empty: except queue.Empty:
logger.debug(' no objects on queue ') logger.debug(' no objects on queue ')
...@@ -127,7 +127,7 @@ class Command(BaseCommand): ...@@ -127,7 +127,7 @@ class Command(BaseCommand):
if settings.ARGO_DELETE_OLD_WORK: if settings.ARGO_DELETE_OLD_WORK:
workDirCleaner.clean() workDirCleaner.clean()
for key,item in receivers.iteritems(): for key,item in receivers.items():
item.terminate() item.terminate()
item.join() item.join()
......
...@@ -8,6 +8,8 @@ class DeserializeFailed(Exception): pass ...@@ -8,6 +8,8 @@ class DeserializeFailed(Exception): pass
class BalsamJobStatus: class BalsamJobStatus:
def __init__(self,job=None,message=None): def __init__(self,job=None,message=None):
'''Constructed with a BalsamJob, but only contains simple id,
serialized_job, and message attributes'''
self.job_id = None self.job_id = None
self.serialized_job = None self.serialized_job = None
self.message = message self.message = message
......
import os,sys,logging,multiprocessing,Queue,traceback import os,sys,logging,multiprocessing,queue,traceback
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
...@@ -30,7 +30,7 @@ class Command(BaseCommand): ...@@ -30,7 +30,7 @@ class Command(BaseCommand):
p = BalsamJobReceiver.BalsamJobReceiver() p = BalsamJobReceiver.BalsamJobReceiver()
p.start() p.start()
subprocesses['BalsamJobReceiver'] = p subprocesses['BalsamJobReceiver'] = p
except Exception,e: except Exception as e:
logger.exception(' Received Exception while trying to start job receiver: ' + str(e)) logger.exception(' Received Exception while trying to start job receiver: ' + str(e))
# setup timer for cleaning the work folder of old files # setup timer for cleaning the work folder of old files
...@@ -77,12 +77,12 @@ class Command(BaseCommand): ...@@ -77,12 +77,12 @@ class Command(BaseCommand):
continue # jump to next job, skip remaining actions continue # jump to next job, skip remaining actions
job.save(update_fields=['state']) job.save(update_fields=['state'])
models.send_status_message(job,'Job entered ' + job.state + ' state') models.send_status_message(job,'Job entered ' + job.state + ' state')
except exceptions.JobStatusFailed,e: except exceptions.JobStatusFailed as e:
message = 'get_job_status failed for pk='+str(job.pk)+': ' + str(e) message = 'get_job_status failed for pk='+str(job.pk)+': ' + str(e)
logger.error(message) logger.error(message)
# TODO: Should I fail the job? # TODO: Should I fail the job?
models.send_status_message(job,message) models.send_status_message(job,message)
except Exception,e: except Exception as e:
message = 'failed to get status for pk='+str(job.pk)+', exception: ' + str(e) message = 'failed to get status for pk='+str(job.pk)+', exception: ' + str(e)
logger.error(message) logger.error(message)
# TODO: Should I fail the job? # TODO: Should I fail the job?
...@@ -129,7 +129,7 @@ class Command(BaseCommand): ...@@ -129,7 +129,7 @@ class Command(BaseCommand):
workDirCleaner.clean() workDirCleaner.clean()
# loop over running process and check status # loop over running process and check status
for name,proc in subprocesses.iteritems(): for name,proc in subprocesses.items():
if not proc.is_alive(): if not proc.is_alive():
logger.info(' subprocess ' + name + ' has stopped with returncode ' + str(proc.exitcode) ) logger.info(' subprocess ' + name + ' has stopped with returncode ' + str(proc.exitcode) )
...@@ -160,12 +160,12 @@ class Command(BaseCommand): ...@@ -160,12 +160,12 @@ class Command(BaseCommand):
job.save(update_fields=['state']) job.save(update_fields=['state'])
else: else:
logger.error('No recognized QueueMessage code') logger.error('No recognized QueueMessage code')
except Queue.Empty,e: except queue.Empty as e:
logger.debug('no messages on queue') logger.debug('no messages on queue')
logger.info(' Balsam Service Exiting ') logger.info(' Balsam Service Exiting ')
except KeyboardInterrupt,e: except KeyboardInterrupt as e:
logger.info('Balsam Service Exiting') logger.info('Balsam Service Exiting')
return return
...@@ -463,7 +463,7 @@ class BalsamJob(models.Model): ...@@ -463,7 +463,7 @@ class BalsamJob(models.Model):
def deserialize(self,serial_data): def deserialize(self,serial_data):
d = Serializer.deserialize(serial_data) d = Serializer.deserialize(serial_data)
for field,value in d.iteritems(): for field,value in d.items():
if field in DATETIME_FIELDS and value != None: if field in DATETIME_FIELDS and value != None:
self.__dict__[field] = datetime.datetime.strptime(value,"%Y-%m-%d %H:%M:%S %z") self.__dict__[field] = datetime.datetime.strptime(value,"%Y-%m-%d %H:%M:%S %z")
else: else:
......
...@@ -113,15 +113,16 @@ class MessageInterface: ...@@ -113,15 +113,16 @@ class MessageInterface:
#logger.debug(str(ssl_options_dict)) #logger.debug(str(ssl_options_dict))
# setup our connection parameters # setup our connection parameters
self.parameters = pika.ConnectionParameters( self.parameters = pika.ConnectionParameters('localhost')
host = self.host, #self.parameters = pika.ConnectionParameters(
port = self.port, # host = self.host,
virtual_host = self.virtual_host, # port = self.port,
credentials = self.credentials, # virtual_host = self.virtual_host,
socket_timeout = self.socket_timeout, # credentials = self.credentials,
ssl = True, # socket_timeout = self.socket_timeout,
ssl_options = ssl_options_dict, # ssl = True,
) # ssl_options = ssl_options_dict,
# )
def create_queue(self,name,routing_key): def create_queue(self,name,routing_key):
# declare a random queue which this job will use to receive messages # declare a random queue which this job will use to receive messages
......
#!/usr/bin/env bash #!/usr/bin/env bash
source argobalsam_env/bin/activate
#### export ARGOBALSAM_INSTALL_PATH=$(pwd)
# you can place additional environment needs here export ARGOBALSAM_DATA_PATH=$ARGOBALSAM_INSTALL_PATH/data
#########
#####
# set pathas for edge service
################################
export ARGOBALSAM_INSTALL_PATH=FIXME
export ARGOBALSAM_DATA_PATH=$ARGOBALSAM_INSTALL_PATH
export ARGOBALSAM_EXE_PATH=$ARGOBALSAM_INSTALL_PATH/exe export ARGOBALSAM_EXE_PATH=$ARGOBALSAM_INSTALL_PATH/exe
#####
# activate the virtualenv
################################
. $ARGOBALSAM_INSTALL_PATH/argobalsam_env/bin/activate
#####
# setup the certificate info
#######################################
. grid_setup.sh
...@@ -86,7 +86,7 @@ GRIDFTP_SERVER = 'atlasgridftp02.hep.anl.gov' ...@@ -86,7 +86,7 @@ GRIDFTP_SERVER = 'atlasgridftp02.hep.anl.gov'
#------------------------------ #------------------------------
# RABBITMQ/PIKA CONFIG # RABBITMQ/PIKA CONFIG
#------------------------------ #------------------------------
RABBITMQ_SERVER_NAME = 'atlasgridftp02.hep.anl.gov' RABBITMQ_SERVER_NAME = 'localhost'
RABBITMQ_SERVER_PORT = 5671 RABBITMQ_SERVER_PORT = 5671
try: try:
RABBITMQ_SSL_CERT = os.environ['X509_USER_CERT'] RABBITMQ_SSL_CERT = os.environ['X509_USER_CERT']
...@@ -94,6 +94,9 @@ try: ...@@ -94,6 +94,9 @@ try:
RABBITMQ_SSL_CA_CERTS = os.environ['X509_CACERTS'] RABBITMQ_SSL_CA_CERTS = os.environ['X509_CACERTS']
except KeyError as e: except KeyError as e:
logger.error('Environment variable undefined: ' + str(e)) logger.error('Environment variable undefined: ' + str(e))
RABBITMQ_SSL_CERT = ''
RABBITMQ_SSL_KEY = ''
RABBITMQ_SSL_CA_CERTS = ''
RABBITMQ_USER_EXCHANGE_NAME = 'argo_users_dev' RABBITMQ_USER_EXCHANGE_NAME = 'argo_users_dev'
RABBITMQ_USER_JOB_QUEUE_NAME = 'argo_service_dev' RABBITMQ_USER_JOB_QUEUE_NAME = 'argo_service_dev'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment