Commit b46ff505 authored by jtchilders's avatar jtchilders

various updates

parent 7ed4b1fa
......@@ -5,8 +5,9 @@ from django.db import connections,DEFAULT_DB_ALIAS
from django.db.utils import load_backend
from django.conf import settings
from argo import models
from common import MessageReceiver,Serializer,ArgoUserJob
from argo import models,QueueMessage
from common import db_tools
from common import MessageReceiver,Serializer
def CreateWorkingPath(job_id):
path = os.path.join(settings.ARGO_WORK_DIRECTORY,str(job_id))
......@@ -39,7 +40,7 @@ class UserJobReceiver(MessageReceiver.MessageReceiver):
# convert body text to ArgoUserJob
try:
userjob = ArgoUserJob.deserialize(body)
userjob = Serializer.deserialize(body)
except Exception,e:
logger.error(' received exception while deserializing message to create ArgoUserJob, \nexception message: ' + str(e) + '\n message body: \n' + body + ' \n cannot continue with this job, ignoring it and moving on.')
# acknowledge message
......@@ -48,12 +49,12 @@ class UserJobReceiver(MessageReceiver.MessageReceiver):
# create unique DB connection string
try:
db_connection_id = models.get_db_connection_id(os.getpid())
db_connection_id = db_tools.get_db_connection_id(os.getpid())
db_backend = load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE'])
db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id)
connections[db_connection_id] = db_conn
except Exception,e:
logger.error(' received exception while creating DB connection, exception message: ' + str(e) + ' \n job id: ' + str(userjob.id) + ' job user: ' + userjob.username + ' job description: ' + userjob.description + '\n cannot continue with this job, moving on.')
logger.error(' received exception while creating DB connection, exception message: ' + str(e) + ' \n job id: ' + str(userjob['user_id']) + ' job user: ' + userjob['username'] + ' job description: ' + userjob['description'] + '\n cannot continue with this job, moving on.')
# acknowledge message
channel.basic_ack(method_frame.delivery_tag)
return
......@@ -62,20 +63,20 @@ class UserJobReceiver(MessageReceiver.MessageReceiver):
try:
argojob = models.ArgoJob()
argojob.argo_job_id = models.ArgoJob.generate_job_id()
logger.debug(' created ArgoJob with id: ' + str(argojob.argo_job_id) + ' (pk=' + str(argojob.pk) +')')
logger.debug(' created ArgoJob with id: ' + str(argojob.argo_job_id) )
argojob.working_directory = CreateWorkingPath(argojob.argo_job_id)
argojob.user_id = userjob.job_id
argojob.job_name = userjob.job_name
argojob.job_description = userjob.job_description
argojob.group_identifier = userjob.group_identifier
argojob.username = userjob.username
argojob.email = userjob.email
argojob.input_url = userjob.input_url
argojob.output_url = userjob.output_url
argojob.job_status_routing_key = userjob.job_status_routing_key
argojob.user_id = userjob['user_id']
argojob.job_name = userjob['name']
argojob.job_description = userjob['description']
argojob.group_identifier = userjob['group_identifier']
argojob.username = userjob['username']
argojob.email = userjob['email']
argojob.input_url = userjob['input_url']
argojob.output_url = userjob['output_url']
argojob.job_status_routing_key = userjob['job_status_routing_key']
# if there are no subjobs, there isn't anything to do
if len(userjob.subjobs) == 0:
if len(userjob['subjobs']) == 0:
logger.error(' Job received with no subjobs, failing job and moving on.')
argojob.state_id = models.REJECTED.id
argojob.save()
......@@ -86,43 +87,41 @@ class UserJobReceiver(MessageReceiver.MessageReceiver):
del connections[db_connection_id]
return
# initialize subjobs
subjobs = []
for usersubjob in userjob.subjobs:
# add subjobs
subjob_pks = []
for usersubjob in userjob['subjobs']:
argosubjob = models.ArgoSubJob()
argosubjob.site = usersubjob.site
argosubjob.site = usersubjob['site']
argosubjob.subjob_id = models.ArgoJob.generate_job_id()
argosubjob.name = usersubjob.job_name
argosubjob.description = usersubjob.job_description
argosubjob.name = usersubjob['name']
argosubjob.description = usersubjob['description']
argosubjob.argo_job_id = argojob.argo_job_id
argosubjob.queue = usersubjob.queue
argosubjob.project = usersubjob.project
argosubjob.wall_time_minutes = usersubjob.wall_time_minutes
argosubjob.num_nodes = usersubjob.num_nodes
argosubjob.processes_per_node = usersubjob.processes_per_node
argosubjob.scheduler_config = usersubjob.scheduler_config
argosubjob.application = usersubjob.application
argosubjob.config_file = usersubjob.task_input_file
argosubjob.queue = usersubjob['queue']
argosubjob.project = usersubjob['project']
argosubjob.wall_time_minutes = usersubjob['wall_time_minutes']
argosubjob.num_nodes = usersubjob['num_nodes']
argosubjob.processes_per_node = usersubjob['processes_per_node']
argosubjob.scheduler_config = usersubjob['scheduler_config']
argosubjob.application = usersubjob['application']
argosubjob.config_file = usersubjob['config_file']
argosubjob.input_url = (
settings.GRIDFTP_PROTOCOL +
settings.GRIDFTP_SERVER +
argojob.working_directory
)
argosubjob.output_url = (
settings.GRIDFTP_PROTOCOL +
settings.GRIDFTP_SERVER +
argojob.working_directory
)
argosubjob.output_url = argosubjob.input_url
argosubjob.save()
subjobs.append(argosubjob.pk)
argojob.subjobs = Serializer.serialize(subjobs)
subjob_pks.append(argosubjob.pk)
argojob.subjob_pk_list = Serializer.serialize(subjob_pks)
argojob.save()
self.process_queue.put(QueueMessage.QueueMessage(argojob.pk,0,'new job received'))
except Exception,e:
message = 'received an exception while parsing the incomping user job. Exception: ' + str(e) + '; userjob id = ' + str(userjob.job_id) + '; argo_job_id = ' + str(argojob.argo_job_id) + '; job_name = ' + userjob.job_name
message = 'received an exception while parsing the incomping user job. Exception: ' + str(e) + '; userjob id = ' + str(userjob['user_id']) + '; argo_job_id = ' + str(argojob.argo_job_id) + '; job_name = ' + userjob['name']
logger.error(message)
# delete DB connection
del connections[db_connection_id]
logger.debug('added user job')
else:
logger.error('received user job message with no body')
# acknowledge message
......
......@@ -16,6 +16,7 @@ fields_to_skip = [
'working_directory',
'time_modified',
'id',
'argo_job_id',
]
class Command(BaseCommand):
......@@ -33,7 +34,8 @@ class Command(BaseCommand):
raise Exception(' File already exists: ' + output_filename)
with open(output_filename,'w') as outfile:
outfile.write('''
outfile.write('''import json
class ArgoUserJob:
def __init__(self):
''')
......@@ -43,5 +45,13 @@ class ArgoUserJob:
if isinstance(val,str):
val = "'" + val + "'"
outfile.write(' self.' + var + ' = ' + str(val) + '\n')
outfile.write(''' self.subjobs = []
def serialize(self):
return json.dumps(self.__dict__)
def add_subjob(self,subjob):
self.subjobs.append(subjob.__dict__)
''')
......@@ -7,10 +7,10 @@ logger = logging.getLogger(__name__)
from django.db import utils,connections,DEFAULT_DB_ALIAS,models
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from django.core import serializers
from django.core import serializers as django_serializers
from django.core.validators import validate_comma_separated_integer_list
from argo import QueueMessage
from argo import QueueMessage,ArgoJobStatus
from common import log_uncaught_exceptions,MessageInterface
from common import Serializer,transfer,Mail,db_tools
from balsam.models import BalsamJob
......@@ -46,8 +46,11 @@ def submit_subjob(job):
# opening blocking connection which will close at the end of this function
msgInt.open_blocking_connection()
# create message queue for site in case not already done
msgInt.create_queue(subjob.site,subjob.site)
# serialize subjob for message
body = serializers.serialize('json',[subjob])
body = django_serializers.serialize('json',[subjob])
# submit job
msgInt.send_msg(body,subjob.site)
......@@ -86,10 +89,10 @@ def stage_in(job):
if job.input_url != '':
try:
transfer.stage_in(job.input_url + '/',job.working_directory + '/')
self.state = STAGED_IN.name
job.state = STAGED_IN.name
except Exception,e:
message = 'Exception received during stage_in: ' + str(e)
logger.error(message)
logger.exception(message)
job.state = STAGE_IN_FAILED.name
else:
# no input url specified so stage in is complete
......@@ -108,7 +111,7 @@ def stage_out(job):
job.state = STAGED_OUT.name
except Exception,e:
message = 'Exception received during stage_out: ' + str(e)
logger.error(message)
logger.exception(message)
job.state = STAGE_OUT_FAILED.name
else:
# no output url specified so stage out is complete
......@@ -134,35 +137,35 @@ def send_status_message(job,message=None):
return
# construct body of email
body = ' Your job has reached state ' + STATES_BY_ID[job.state_id].name + '\n'
body = ' Your job has reached state ' + job.state + '\n'
if message is not None:
body += ' with the message: ' + str(message)
body += '------------------------------------------------------------------- \n'
body += 'Job Data: \n'
body += serializers.serialize('json',[job])
body += django_serializers.serialize('json',[job])
body += '------------------------------------------------------------------- \n'
body += 'Subjob Data: \n'
body += serializers.serialize('json',ArgoSubJob.objects.get(id==Serializer.deserialize(job.subjob_pk_list)))
body += django_serializers.serialize('json',ArgoSubJob.objects.filter(pk__in=Serializer.deserialize(job.subjob_pk_list)))
# send notification email to user
send_mail(
sender = sender,
Mail.send_mail(
sender = settings.ARGO_JOB_STATUS_EMAIL_SENDER,
receiver = receiver,
subject = 'ARGO Job Status Report',
body = body,
)
except Exception,e:
logger.error('exception received while trying to send status email. Exception: ' + str(e))
logger.exception('exception received while trying to send status email. Exception: ' + str(e))
# if job has an argo job status routing key, send a message there
if self.job_status_routing_key is not None and send_status_message:
logger.info('sending job status message with routing key: ' + self.job_status_routing_key)
if job.job_status_routing_key != '' and send_status_message:
logger.info('sending job status message with routing key: ' + job.job_status_routing_key)
try:
msg = ArgoJobStatus()
msg.state = STATES_BY_ID[job.state_id].name
msg = ArgoJobStatus.ArgoJobStatus()
msg.state = job.state
msg.message = message
msg.job_id = job.argo_job_id
mi = MessageInterface()
mi = MessageInterface.MessageInterface()
mi.host = settings.RABBITMQ_SERVER_NAME
mi.port = settings.RABBITMQ_SERVER_PORT
mi.exchange_name = settings.RABBITMQ_USER_EXCHANGE_NAME
......@@ -172,7 +175,7 @@ def send_status_message(job,message=None):
mi.ssl_ca_certs = settings.RABBITMQ_SSL_CA_CERTS
logger.debug( ' open blocking connection to send status message ' )
mi.open_blocking_connection()
mi.send_msg(msg.get_serialized_message(),self.job_status_routing_key)
mi.send_msg(msg.get_serialized_message(),job.job_status_routing_key)
mi.close()
except:
logger.exception('Exception while sending status message to user job queue')
......@@ -312,7 +315,7 @@ class ArgoJob(models.Model):
job_status_routing_key = models.TextField(default='')
def get_current_subjob(self):
subjob_list = Serializer.deserialize(self.subjob_pk_list)
subjob_list = self.get_subjob_pk_list()
if self.current_subjob_pk_index < len(subjob_list):
logger.debug('getting subjob index ' + str(self.current_subjob_pk_index) + ' of ' + str(len(subjob_list)))
return ArgoSubJob.objects.get(pk=subjob_list[self.current_subjob_pk_index])
......@@ -320,6 +323,11 @@ class ArgoJob(models.Model):
logger.debug('current_subjob_pk_index=' + str(self.current_subjob_pk_index) + ' number of subjobs = ' + str(len(subjob_list)) + ' subjobs = ' + str(subjob_list))
raise SubJobIndexOutOfRange
def add_subjob(self,subjob):
    """Record *subjob* on this job by appending its primary key to the
    serialized pk list stored in ``subjob_pk_list``.

    Note: mutates the field in memory only; caller must save() to persist.
    """
    pks = self.get_subjob_pk_list()
    pks.append(subjob.pk)
    self.subjob_pk_list = Serializer.serialize(pks)
def get_subjob_pk_list(self):
    """Deserialize ``subjob_pk_list`` and return it as a Python list of pks."""
    raw = self.subjob_pk_list
    return Serializer.deserialize(raw)
......
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from balsam import models
from builtins import input
import logging
try:
input = raw_input
except NameError:
pass
import logging,os
logger = logging.getLogger('console')
class Command(BaseCommand):
......@@ -14,8 +17,8 @@ class Command(BaseCommand):
parser.add_argument('-d','--description',dest='description',help='application description',required=True)
parser.add_argument('-e','--executable',dest='executable',help='application executable with full path',required=True)
parser.add_argument('-c','--config-script',dest='config_script',help='configuration script takes a single file and parses its content to output the command line, therefore allowing users to pass command line args in a safe way.',required=True)
parser.add_argument('-r','--preprocess',dest='preprocess',help='preprocessing script with full path that can be used to process data in the job working directory before the job is submitted to the local batch queue.',required=True)
parser.add_argument('-o','--postprocess',dest='postprocess',help='postprocessing script with full path that can be used to postprocess data in the job working directory after the job is submitted to the local batch queue.',required=True)
parser.add_argument('-r','--preprocess',dest='preprocess',help='preprocessing script with full path that can be used to process data in the job working directory before the job is submitted to the local batch queue.',default='')
parser.add_argument('-o','--postprocess',dest='postprocess',help='postprocessing script with full path that can be used to postprocess data in the job working directory after the job is submitted to the local batch queue.',default='')
parser.add_argument('-w','--no-checks',dest='no_checks',help='Typically an exception is thrown if one of the paths specified does not exist, but this flag disables that behavior.',action='store_true')
def handle(self, *args, **options):
......@@ -31,11 +34,11 @@ class Command(BaseCommand):
raise Exception('config-script not found: ' + str(options['config_script']))
logger.info(' preprocess = ' + options['preprocess'])
if not os.path.exists(options['preprocess']) or options['no_checks']:
if options['preprocess'] != '' and (not os.path.exists(options['preprocess']) or options['no_checks']):
raise Exception('preprocess not found: ' + str(options['preprocess']))
logger.info(' postprocess = ' + options['postprocess'])
if not os.path.exists(options['postprocess']) or options['no_checks']:
if options['postprocess'] != '' and (not os.path.exists(options['postprocess']) or options['no_checks']):
raise Exception('postprocess not found: ' + str(options['postprocess']))
answer = str(input(' Enter "yes" to continue:'))
......
......@@ -21,6 +21,8 @@ fields_to_skip = [
'scheduler_id',
'time_modified',
'balsam_job_id',
'output_url',
'input_url',
]
class Command(BaseCommand):
......@@ -52,4 +54,5 @@ class BalsamUserJob:
outfile.write(' self.' + var + ' = ' + str(val) + '\n')
from common_core.file_tools import delete_old_files_directories
from common.file_tools import delete_old_files_directories
import time
class DirCleaner:
......
from common_core.MessageInterface import MessageInterface
from common.MessageInterface import MessageInterface
from django.conf import settings
import logging,sys,multiprocessing,time,os
logger = logging.getLogger(__name__)
......
......@@ -2,8 +2,8 @@ import multiprocessing,logging
logger = logging.getLogger(__name__)
from django.db import utils,connections,DEFAULT_DB_ALIAS
from balsam_core import QueueMessage
from common_core import db_tools
from balsam import QueueMessage
from common import db_tools
class TransitionJob(multiprocessing.Process):
''' spawns subprocess which finds the DB entry for the given id
......@@ -64,4 +64,4 @@ class TransitionJob(multiprocessing.Process):
self.queue.put(QueueMessage.QueueMessage(self.entry_pk,QueueMessage.TransitionComplete))
# Transition Done.
\ No newline at end of file
# Transition Done.
......@@ -96,21 +96,29 @@ class LocalHandler:
def pre_stage_hook(self):
pass
def stage_in( self, source_url, destination_directory ):
parts = urlparse.urlparse( source_url )
command = 'cp -p -r %s/* %s' % (parts.path, destination_directory)
print 'transfer.stage_in: command=' + command
ret = os.system(command)
if ret:
raise Exception("Error in stage_in: %d" % ret)
def stage_in(self, source_url, destination_directory):
    """Local-filesystem stage-in: copy everything under *source_url* into
    *destination_directory* using ``cp -p -r``.

    The URL's netloc and path are rejoined into an absolute filesystem path
    (a ``file://``-style URL splits its leading directory into netloc).
    Raises on a non-zero ``cp`` exit status; any failure is logged with a
    traceback and re-raised to the caller.
    """
    try:
        url_parts = urlparse.urlparse( source_url )
        # rebuild an absolute path from netloc + path, then glob-copy its contents
        command = 'cp -p -r /%s%s* %s' % (url_parts.netloc, url_parts.path, destination_directory)
        logger.debug('transfer.stage_in: command=' + command )
        exit_code = os.system(command)
        if exit_code:
            raise Exception("Error in stage_in: %d" % exit_code)
    except Exception as e:
        logger.exception('Exception in stage_in: ' + str(e))
        raise
def stage_out( self, source_directory, destination_url ):
parts = urlparse.urlparse( destination_url )
command = 'cp -r %s/* %s' % (source_directory, parts.path)
print 'transfer.stage_out: command=' + command
ret = os.system(command)
if ret:
raise Exception("Error in stage_out: %d" % ret)
try:
parts = urlparse.urlparse( destination_url )
command = 'cp -r %s/* /%s/%s' % (source_directory,parts.netloc,parts.path)
logger.debug( 'transfer.stage_out: command=' + command )
ret = os.system(command)
if ret:
raise Exception("Error in stage_out: %d" % ret)
except Exception as e:
logger.exception('Exception in stage_out: ' + str(e))
raise
# - SCP implementation
......@@ -149,18 +157,22 @@ def get_handler(url):
handler = handler_class()
else:
raise Exception('Unknown transfer protocol: %s' % proto)
return handler
return handler
# def pre_stage_hook(url):
# handler = get_handler(url)
# handler.pre_stage_hook()
def stage_in( source_url, destination_directory ):
handler = get_handler(source_url)
logger.debug('pre-stage hook')
handler.pre_stage_hook()
logger.debug('stage-in')
handler.stage_in( source_url, destination_directory )
try:
handler = get_handler(source_url)
logger.debug('pre-stage hook')
handler.pre_stage_hook()
logger.debug('stage-in')
handler.stage_in( source_url, destination_directory )
except Exception as e:
logger.exception('Exception: ' + str(e))
raise
def stage_out( source_directory, destination_url ):
handler = get_handler(destination_url)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment