Commit e714b27f authored by jtchilders

first commit

.*
!.gitignore
*.pyc
exe
log
argojobs
balsamjobs
argobalsam_env
db.sqlite3
argo/migrations
balsam/migrations
** INSTALLATION
# install virtualenv
https://pypi.python.org/pypi/virtualenv
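# e.g., assuming pip is already available on the system:
pip install virtualenv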
# set up an installation directory
export BASEDIR=/path/to/installation
# set up virtual env
mkdir $BASEDIR
cd $BASEDIR
virtualenv balsam_env
. balsam_env/bin/activate
# install prerequisites
pip install django
pip install south
pip install pika
# install balsam_core
# (extract the tar file in some directory other than $BASEDIR)
tar xzf balsam_core.tgz
cd balsam_core
python setup.py install --prefix=$BASEDIR/balsam_env
cd $BASEDIR
django-admin.py startproject balsam_deploy
#cp settings.py $BASEDIR/balsam_deploy/balsam_deploy
# edit $BASEDIR/balsam_deploy/balsam_deploy/settings.py
# - set the database filename
# - add balsam_core and south to INSTALLED_APPS
# - add 'from argo_settings import *' at end
# edit argo_settings.py and change the following (a sketch of the resulting
# settings follows this list):
# - in the DATABASES section, set the NAME field to the full path to an sqlite3 file, preferably in $BASEDIR/balsam_deploy
# - set BALSAM_DEPLOYMENT_DIRECTORY to $BASEDIR/balsam_deploy
# - set BALSAM_WORK_DIRECTORY; a work directory is created here for each job, and the job runs in that directory
# - set BALSAM_SCHEDULER_SUBMIT_EXE and BALSAM_SCHEDULER_STATUS_EXE appropriately
# - set BALSAM_GLOBUS_URL_COPY_EXE and BALSAM_GRID_PROXY_INIT_EXE appropriately
# - set BALSAM_ALLOWED_EXECUTABLE_DIRECTORY; only executables from this directory will be executed
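# for orientation, the edited settings might end up looking like this sketch
# (all values below are illustrative placeholders, not required values):
#   DATABASES['default']['NAME'] = '/path/to/installation/balsam_deploy/db.sqlite3'
#   BALSAM_DEPLOYMENT_DIRECTORY = '/path/to/installation/balsam_deploy'
#   BALSAM_WORK_DIRECTORY = '/path/to/installation/balsam_deploy/work'
#   BALSAM_SCHEDULER_SUBMIT_EXE = '/usr/bin/qsub'
#   BALSAM_SCHEDULER_STATUS_EXE = '/usr/bin/qstat'
#   BALSAM_GLOBUS_URL_COPY_EXE = '/usr/bin/globus-url-copy'
#   BALSAM_GRID_PROXY_INIT_EXE = '/usr/bin/grid-proxy-init'
#   BALSAM_ALLOWED_EXECUTABLE_DIRECTORY = '/path/to/installation/balsam_deploy/exe'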
cd balsam_deploy
python manage.py syncdb --noinput
** START BALSAM SERVICE
The service fetches jobs from the message queues, adds them to the database, and periodically updates the status of those jobs in the database. It operates on a period defined by BALSAM_FETCH_DELAY in settings.py.
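# conceptually, the service's main loop is just a poll cycle, roughly like this
# sketch (illustrative only; fetch_new_jobs/update_job_statuses are placeholder names):
#   while True:
#       fetch_new_jobs()         # pull jobs off the message queues into the DB
#       update_job_statuses()    # refresh status of jobs already in the DB
#       time.sleep(settings.BALSAM_FETCH_DELAY)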
. balsam_env/bin/activate
cd $BASEDIR/balsam_deploy
python manage.py balsam_service
** START BALSAM DAEMON
The daemon queries the local database for jobs to be run and manages them over their lifetime. It operates on a period defined by BALSAM_EXECUTION_DELAY in settings.py.
. balsam_env/bin/activate
cd $BASEDIR/balsam_deploy
python manage.py balsam_daemon
** ADD TEST JOB TO MESSAGE QUEUE
. balsam_env/bin/activate
cd $BASEDIR/rabbitmq
./newjob testjob
** INTEGRATION POINTS
- in settings.py, BALSAM_SCHEDULER_SUBMIT_EXE, BALSAM_SCHEDULER_STATUS_EXE identify the qsub and qstat executables, respectively
- in settings.py, BALSAM_GLOBUS_URL_COPY_EXE, BALSAM_GRID_PROXY_INIT_EXE identify the globus-url-copy and grid-proxy-init executables, respectively
- balsam_env/lib/python2.6/site-packages/balsam_core/scheduler.py is where qsub and qstat are called
- balsam_env/lib/python2.6/site-packages/balsam_core/management/commands/balsam_service.py is where qstat output is parsed for updating job status
- the queue for job submission is hard-coded in scheduler.py
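# for illustration, the submit call presumably reduces to something like this
# sketch (placeholder names; the real call lives in scheduler.py):
#   import subprocess
#   from django.conf import settings
#   def submit(script_path):
#       # run the configured qsub-like executable; its output typically
#       # contains the scheduler job id
#       p = subprocess.Popen([settings.BALSAM_SCHEDULER_SUBMIT_EXE, script_path],
#                            stdout=subprocess.PIPE)
#       out,err = p.communicate()
#       return out.strip()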
import common.Serializer as Serializer

class ArgoJobStatus:
    def __init__(self):
        self.state = None
        self.job_id = None
        self.message = None

    def get_serialized_message(self):
        return Serializer.serialize(self.__dict__)

    @staticmethod
    def get_from_message(message):
        tmp = ArgoJobStatus()
        tmp.__dict__ = Serializer.deserialize(message)
        return tmp
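# Example round trip (a sketch, assuming Serializer round-trips a dict, e.g. via JSON):
#   status = ArgoJobStatus()
#   status.state = 'RUNNING'
#   status.job_id = 1234
#   status.message = 'job started'
#   text = status.get_serialized_message()        # send this over the queue
#   copy = ArgoJobStatus.get_from_message(text)   # reconstruct on the other side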
import logging,sys,multiprocessing,time,os
logger = logging.getLogger(__name__)
from django.db import connections,DEFAULT_DB_ALIAS
from django.db.utils import load_backend
from django.conf import settings
from common import MessageReceiver
from argo import QueueMessage
from argo.models import ArgoJob,ArgoSubJob,BALSAM_JOB_TO_SUBJOB_STATE_MAP
from balsam import BalsamJobStatus

class JobStatusReceiver(MessageReceiver.MessageReceiver):
    ''' subscribes to the balsam job status queue and updates a job state '''

    def __init__(self,process_queue):
        super(JobStatusReceiver,self).__init__(
            settings.RABBITMQ_BALSAM_JOB_STATUS_QUEUE,
            settings.RABBITMQ_BALSAM_JOB_STATUS_ROUTING_KEY,
            settings.RABBITMQ_SERVER_NAME,
            settings.RABBITMQ_SERVER_PORT,
            settings.RABBITMQ_BALSAM_EXCHANGE_NAME,
            settings.RABBITMQ_SSL_CERT,
            settings.RABBITMQ_SSL_KEY,
            settings.RABBITMQ_SSL_CA_CERTS
        )
        self.process_queue = process_queue
    # This is where the real processing of incoming messages happens
    def consume_msg(self,channel,method_frame,header_frame,body):
        logger.debug(' in consume_msg ')
        try:
            if body is not None:
                # convert body text to a BalsamJobStatus message
                statusMsg = BalsamJobStatus.BalsamJobStatus()
                statusMsg.deserialize(body)
                logger.info(' received status message for job ' + str(statusMsg.origin_id) + ', message: ' + str(statusMsg.message))

                # create a unique DB connection for this message
                db_connection_id = 'db_con_%08i' % statusMsg.origin_id
                db_backend = load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE'])
                db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id)
                connections[db_connection_id] = db_conn

                # get the subjob for this message
                try:
                    subjob = ArgoSubJob.objects.get(balsam_job_id=statusMsg.id)
                except Exception as e:
                    logger.error(' exception received while retrieving ArgoSubJob with id = ' + str(statusMsg.id) + ': ' + str(e))
                    # acknowledge message
                    channel.basic_ack(method_frame.delivery_tag)
                    del connections[db_connection_id]
                    # send message to balsam_service about completion
                    self.process_queue.put(QueueMessage.QueueMessage(statusMsg.id,
                        QueueMessage.JobStatusReceiverRetrieveArgoSubJobFailed))
                    return

                # get the argo job for this subjob
                try:
                    argojob = ArgoJob.objects.get(argo_job_id=subjob.origin_id)
                except Exception as e:
                    logger.error(' exception received while retrieving ArgoJob with id = ' + str(subjob.origin_id) + ': ' + str(e))
                    # acknowledge message
                    channel.basic_ack(method_frame.delivery_tag)
                    del connections[db_connection_id]
                    # send message to balsam_service about completion
                    self.process_queue.put(QueueMessage.QueueMessage(subjob.origin_id,
                        QueueMessage.JobStatusReceiverRetrieveArgoJobFailed))
                    return

                # get the deserialized balsam job
                try:
                    balsam_job = statusMsg.get_job()
                    logger.debug('balsam_job = ' + str(balsam_job))
                except BalsamJobStatus.DeserializeFailed as e:
                    logger.error('Failed to deserialize BalsamJob for BalsamJobStatus message for argojob: ' + str(argojob.argo_job_id) + ' subjob_id: ' + str(subjob.balsam_job_id))
                    # acknowledge message
                    channel.basic_ack(method_frame.delivery_tag)
                    del connections[db_connection_id]
                    # send message to balsam_service about completion
                    self.process_queue.put(QueueMessage.QueueMessage(subjob.origin_id,
                        QueueMessage.JobStatusReceiverRetrieveArgoJobFailed))
                    return

                # parse balsam_job into subjob and argojob
                if balsam_job is not None:
                    # copy scheduler id and current job state to the subjob, then save
                    subjob.scheduler_id = balsam_job.scheduler_id
                    subjob.state = balsam_job.state
                    subjob.save(update_fields=['state','scheduler_id'],using=db_connection_id)
                    # map subjob state to argo job state
                    try:
                        argojob.state = BALSAM_JOB_TO_SUBJOB_STATE_MAP[balsam_job.state].name
                        logger.debug(' received subjob state = ' + subjob.state + ' setting argo job state to ' + argojob.state)
                    except KeyError as e:
                        logger.error(' could not map balsam_job state: ' + str(balsam_job.state) + ' to an ArgoJob state for job id: ' + str(argojob.argo_job_id))
                        # acknowledge message
                        channel.basic_ack(method_frame.delivery_tag)
                        del connections[db_connection_id]
                        # send message to balsam_service about completion
                        self.process_queue.put(QueueMessage.QueueMessage(argojob.argo_job_id,
                            QueueMessage.JobStatusReceiverBalsamStateMapFailure))
                        return
                    # save argojob
                    argojob.save(update_fields=['state'],using=db_connection_id)
                else:
                    logger.error('received no balsam_job from BalsamJobStatus')

                self.process_queue.put(QueueMessage.QueueMessage(argojob.argo_job_id,
                    QueueMessage.JobStatusReceiverCompleted))
                # acknowledge message
                channel.basic_ack(method_frame.delivery_tag)
                del connections[db_connection_id]
            else:
                logger.debug(' consume_msg called, but body is None ')
                # no job pk is available when the message has no body
                self.process_queue.put(QueueMessage.QueueMessage(0,
                    QueueMessage.JobStatusReceiverMessageNoBody))
        except Exception as e:
            logger.exception('Error consuming status message: ' + str(e))
            self.process_queue.put(QueueMessage.QueueMessage(0,
                QueueMessage.JobStatusReceiverFailed))
        logger.debug(' leaving consume_msg ')
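# Example wiring (a sketch, not from this commit): the balsam_service command is
# expected to construct the receiver with a multiprocessing queue, roughly:
#   import multiprocessing
#   status_queue = multiprocessing.Queue()
#   receiver = JobStatusReceiver(status_queue)
#   receiver.start()            # assumes MessageReceiver runs its consume loop as a process
#   qmsg = status_queue.get()   # QueueMessage instances arrive here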
# errors from Transition function
TransitionComplete = 10
TransitionDbConnectionFailed = 11
TransitionDbRetrieveFailed = 12
TransitionFunctionException = 13
# errors from JobStatusReceiver
JobStatusReceiverRetrieveArgoSubJobFailed = 21
JobStatusReceiverRetrieveArgoJobFailed = 22
JobStatusReceiverBalsamStateMapFailure = 23
JobStatusReceiverCompleted = 24
JobStatusReceiverMessageNoBody = 25
JobStatusReceiverFailed = 26
msg_codes = {
0:'NoMessageCode',
TransitionComplete:'TransitionComplete',
TransitionDbConnectionFailed:'TransitionDbConnectionFailed',
TransitionDbRetrieveFailed:'TransitionDbRetrieveFailed',
TransitionFunctionException:'TransitionFunctionException',
JobStatusReceiverRetrieveArgoSubJobFailed:'JobStatusReceiverRetrieveArgoSubJobFailed',
JobStatusReceiverRetrieveArgoJobFailed:'JobStatusReceiverRetrieveArgoJobFailed',
JobStatusReceiverBalsamStateMapFailure:'JobStatusReceiverBalsamStateMapFailure',
JobStatusReceiverCompleted:'JobStatusReceiverCompleted',
JobStatusReceiverMessageNoBody:'JobStatusReceiverMessageNoBody',
JobStatusReceiverFailed:'JobStatusReceiverFailed',
}
class QueueMessage:
    ''' a message used to communicate with the balsam_service main loop '''
    def __init__(self,pk=0,code=0,message=''):
        self.pk = pk
        self.code = code
        self.message = message
    def __str__(self):
        # msg_codes is the module-level code-to-name map defined above
        return '%i:%s:%s' % (self.pk,msg_codes[self.code],self.message)
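# Example (sketch): a receiver reporting success for the job with pk 42 would push
#   QueueMessage(42, JobStatusReceiverCompleted)
# onto the process queue; str() of that message yields '42:JobStatusReceiverCompleted:'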
import logging,sys,multiprocessing,time,os,pwd,grp
logger = logging.getLogger(__name__)
from django.db import connections,DEFAULT_DB_ALIAS
from django.db.utils import load_backend
from django.conf import settings
from argo import models
from common import MessageReceiver,Serializer,ArgoUserJob

def CreateWorkingPath(job_id):
    path = os.path.join(settings.ARGO_WORK_DIRECTORY,str(job_id))
    os.makedirs(path)
    return path

class UserJobReceiver(MessageReceiver.MessageReceiver):
    ''' subscribes to the input user job queue and adds jobs to the database '''

    def __init__(self,process_queue = None):
        super(UserJobReceiver,self).__init__(
            settings.RABBITMQ_USER_JOB_QUEUE_NAME,
            settings.RABBITMQ_USER_JOB_ROUTING_KEY,
            settings.RABBITMQ_SERVER_NAME,
            settings.RABBITMQ_SERVER_PORT,
            settings.RABBITMQ_USER_EXCHANGE_NAME,
            settings.RABBITMQ_SSL_CERT,
            settings.RABBITMQ_SSL_KEY,
            settings.RABBITMQ_SSL_CA_CERTS,
        )
        self.process_queue = process_queue
    # This is where the real processing of incoming messages happens
    def consume_msg(self,channel,method_frame,header_frame,body):
        logger.debug('in consume_msg')
        if body is not None:
            logger.debug(' received message: ' + body )

            # convert body text to ArgoUserJob
            try:
                userjob = ArgoUserJob.deserialize(body)
            except Exception as e:
                logger.error(' received exception while deserializing message to create ArgoUserJob, \nexception message: ' + str(e) + '\n message body: \n' + body + ' \n cannot continue with this job, ignoring it and moving on.')
                # acknowledge message
                channel.basic_ack(method_frame.delivery_tag)
                return

            # create a unique DB connection for this message
            try:
                db_connection_id = models.get_db_connection_id(os.getpid())
                db_backend = load_backend(connections.databases[DEFAULT_DB_ALIAS]['ENGINE'])
                db_conn = db_backend.DatabaseWrapper(connections.databases[DEFAULT_DB_ALIAS], db_connection_id)
                connections[db_connection_id] = db_conn
            except Exception as e:
                logger.error(' received exception while creating DB connection, exception message: ' + str(e) + ' \n job id: ' + str(userjob.job_id) + ' job user: ' + userjob.username + ' job description: ' + userjob.job_description + '\n cannot continue with this job, moving on.')
                # acknowledge message
                channel.basic_ack(method_frame.delivery_tag)
                return

            # create the ArgoJob and initialize it
            try:
                argojob = models.ArgoJob()
                argojob.argo_job_id = models.ArgoJob.generate_job_id()
                logger.debug(' created ArgoJob with id: ' + str(argojob.argo_job_id) + ' (pk=' + str(argojob.pk) +')')
                argojob.working_directory = CreateWorkingPath(argojob.argo_job_id)
                argojob.user_id = userjob.job_id
                argojob.job_name = userjob.job_name
                argojob.job_description = userjob.job_description
                argojob.group_identifier = userjob.group_identifier
                argojob.username = userjob.username
                argojob.email = userjob.email
                argojob.input_url = userjob.input_url
                argojob.output_url = userjob.output_url
                argojob.job_status_routing_key = userjob.job_status_routing_key

                # if there are no subjobs, there isn't anything to do
                if len(userjob.subjobs) == 0:
                    logger.error(' Job received with no subjobs, failing job and moving on.')
                    argojob.state_id = models.REJECTED.id
                    argojob.save()
                    message = 'Job rejected because there are no subjobs.'
                    models.send_status_message(argojob,message)
                    # acknowledge message
                    channel.basic_ack(method_frame.delivery_tag)
                    del connections[db_connection_id]
                    return

                # initialize subjobs
                subjobs = []
                for usersubjob in userjob.subjobs:
                    argosubjob = models.ArgoSubJob()
                    argosubjob.site = usersubjob.site
                    argosubjob.balsam_job_id = models.ArgoJob.generate_job_id()
                    argosubjob.name = usersubjob.job_name
                    argosubjob.description = usersubjob.job_description
                    argosubjob.origin_id = argojob.argo_job_id
                    argosubjob.queue = usersubjob.queue
                    argosubjob.project = usersubjob.project
                    argosubjob.wall_time_minutes = usersubjob.wall_time_minutes
                    argosubjob.num_nodes = usersubjob.num_nodes
                    argosubjob.processes_per_node = usersubjob.processes_per_node
                    argosubjob.scheduler_config = usersubjob.scheduler_config
                    argosubjob.application = usersubjob.application
                    argosubjob.config_file = usersubjob.task_input_file
                    argosubjob.input_url = (
                        settings.GRIDFTP_PROTOCOL +
                        settings.GRIDFTP_SERVER +
                        argojob.working_directory
                    )
                    argosubjob.output_url = (
                        settings.GRIDFTP_PROTOCOL +
                        settings.GRIDFTP_SERVER +
                        argojob.working_directory
                    )
                    argosubjob.save()
                    subjobs.append(argosubjob.pk)
                argojob.subjob_pk_list = Serializer.serialize(subjobs)
                argojob.save()
            except Exception as e:
                message = 'received an exception while parsing the incoming user job. Exception: ' + str(e) + '; userjob id = ' + str(userjob.job_id) + '; argo_job_id = ' + str(argojob.argo_job_id) + '; job_name = ' + userjob.job_name
                logger.error(message)

            # delete DB connection
            del connections[db_connection_id]
        else:
            logger.error('received user job message with no body')

        # acknowledge message
        channel.basic_ack(method_frame.delivery_tag)
from django.contrib import admin
# Register your models here.
from __future__ import unicode_literals
from django.apps import AppConfig
class ArgoCoreConfig(AppConfig):
    name = 'argo'
from __future__ import absolute_import, unicode_literals
import copy
from itertools import chain
import warnings
#from django.conf import settings
#from django.forms.util import flatatt, to_current_timezone
#from django.utils.datastructures import MultiValueDict, MergeDict
#from django.utils.html import conditional_escape, format_html
#from django.utils.translation import ugettext_lazy
#from django.utils.encoding import force_text, python_2_unicode_compatible
#from django.utils.safestring import mark_safe
#from django.utils import datetime_safe, formats, six
#from django.utils.six.moves.urllib.parse import urljoin
from django import forms
from django.forms.widgets import CheckboxInput
from argo import models
import logging
logger = logging.getLogger(__name__)
list_of_tuples = []
for state in models.STATES:
    list_of_tuples.append( (state.name,state.name) )
ALL_STATES = tuple(list_of_tuples)
class JobDisplayForm(forms.Form):
    state = forms.ChoiceField(required=True,widget=forms.Select,choices=ALL_STATES)
    group_identifier = forms.CharField(required=True,widget=forms.TextInput(attrs={'size':150}))
    email = forms.CharField(required=True,widget=forms.TextInput(attrs={'size':150}))
    confirm_delete = forms.BooleanField(required=False)

    def __init__(self, current_subjob_choices=[('-1','no choices')], *args, **kwargs):
        super(JobDisplayForm, self).__init__(*args, **kwargs)
        self.fields['current_subjob'] = forms.ChoiceField(choices=current_subjob_choices)
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from argo import models,UserJobReceiver
from common import Serializer
import os,sys,time,logging
logger = logging.getLogger('console')
class Command(BaseCommand):
    help = 'Adds a test job to the Argo DB'
    logger.debug('Adds a test job to the Argo DB')

    def add_arguments(self,parser):
        parser.add_argument('-s',type=str,help="The name of the Balsam site to submit a job.",default='argo_cluster_dev')
        parser.add_argument('-n',type=int,help='The number of subjobs to include',default=1)
    def handle(self, *args, **options):
        n_subjobs = options['n']
        site = options['s']
        argojob = models.ArgoJob()
        argojob.argo_job_id = models.ArgoJob.generate_job_id()
        argojob.working_directory = UserJobReceiver.CreateWorkingPath(argojob.argo_job_id)
        argojob.user_id = 0
        argojob.job_name = 'test job'
        argojob.job_description = 'A job to test ARGO/Balsam.'
        argojob.group_identifier = 'argo_add_test_job'
        argojob.username = os.environ['USER']
        #argojob.email = userjob.email
        #argojob.input_url = userjob.input_url
        #argojob.output_url = userjob.output_url
        #argojob.job_status_routing_key = userjob.job_status_routing_key
        subjobs = []
        for i in range(n_subjobs):
            argosubjob = models.ArgoSubJob()
            argosubjob.site = site
            argosubjob.balsam_job_id = models.ArgoJob.generate_job_id()
            argosubjob.name = 'ARGO Test subjob %i' % i
            argosubjob.description = 'A subjob of an ARGO test job'
            argosubjob.origin_id = argojob.argo_job_id
            #argosubjob.queue = usersubjob.queue
            #argosubjob.project = usersubjob.project
            argosubjob.wall_time_minutes = 60
            argosubjob.num_nodes = 1
            argosubjob.processes_per_node = 2
            #argosubjob.scheduler_config_id = usersubjob.scheduler_config_id
            argosubjob.application = 'test'
            argosubjob.config_file = create_test_input(os.path.join(argojob.working_directory,'test_config_'+str(i)+'.txt'))
            argosubjob.input_url = (
                settings.GRIDFTP_PROTOCOL +
                settings.GRIDFTP_SERVER +
                argojob.working_directory
            )
            argosubjob.output_url = (
                settings.GRIDFTP_PROTOCOL +
                settings.GRIDFTP_SERVER +
                argojob.working_directory
            )
            argosubjob.save()
            subjobs.append(argosubjob.pk)
        argojob.subjob_pk_list = Serializer.serialize(subjobs)
        argojob.save()
        logger.info(' created ArgoJob with id: ' + str(argojob.argo_job_id) + ' (pk=' + str(argojob.pk) +') with ' + str(len(subjobs)) + ' balsam jobs.')
def create_test_input(filename):
    # write a trivial config file for the test application
    filecontent = 'NAME=hello_test\nFILENAME=%s' % filename
    with open(filename,'w') as f:
        f.write(filecontent)
    return filename
from django.core.management.base import BaseCommand, CommandError
from argo import models
try:
    input = raw_input  # python 2/3 compatibility
except NameError:
    pass
import os,logging
logger = logging.getLogger('console')
fields_to_skip = [
'time_finished',
'current_subjob_pk_index',
'time_created',
'state',
'subjob_pk_list',
'working_directory',
'time_modified',
'id',
]
class Command(BaseCommand):
    help = 'Dump a python class file which a User can use to submit an ARGO job.'

    def add_arguments(self,parser):
        parser.add_argument('-o','--output-file',dest='output_filename',help='The filename to which to write the output.',default='ArgoUserJob.py')

    def handle(self, *args, **options):
        logger.info('Dump an ArgoUserJob python class.')
        bj = models.ArgoJob()
        output_filename = options['output_filename']
        if os.path.exists(output_filename):
            raise Exception(' File already exists: ' + output_filename)
        with open(output_filename,'w') as outfile:
            outfile.write('''
class ArgoUserJob:
   def __init__(self):
''')
            for var,val in bj.__dict__.iteritems():
                if var[0] == '_': continue
                if var in fields_to_skip: continue
                if isinstance(val,str):
                    val = "'" + val + "'"
                outfile.write('      self.' + var + ' = ' + str(val) + '\n')
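# For reference, the generated ArgoUserJob.py should look roughly like this
# (hypothetical; the exact fields and defaults come from the ArgoJob model):
#
#   class ArgoUserJob:
#      def __init__(self):
#         self.job_name = ''
#         self.job_description = ''
#         self.username = ''
#         self.email = ''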
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from argo.UserJobReceiver import UserJobReceiver
from argo.models import ArgoJob
import os,sys,time,multiprocessing
import logging
logger = logging.getLogger('console')
class Command(BaseCommand):
    help = 'Lists jobs stored in the DB'
    logger.debug('Listing jobs in the ARGO Service DB.')

    def add_arguments(self,parser):
        parser.add_argument('-pk', nargs='+', type=int, help="If given, only specified pks will be reported.")

    def handle(self, *args, **options):
        jobs = []
        if options['pk'] is None:
            jobs = ArgoJob.objects.all()
        else:
            jobs = ArgoJob.objects.filter(pk__in=options['pk'])
        logger.info(str(len(jobs)) + ' jobs in the DB')
        if len(jobs) > 0:
            listing = '\n\n'
            listing += ArgoJob.get_header() + '\n'
            listing += '---------------------------------------------------------------------------------------------------------------------------------------------------------------------\n'
            for job in jobs:
                listing += job.get_line_string() + '\n'
            listing += '\n\n'
            logger.info(listing)
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from argo.models import ArgoJob,ArgoSubJob
import os,sys,time,multiprocessing
import logging
logger = logging.getLogger('console')
class Command(BaseCommand):
    help = 'Lists subjobs stored in the DB'
    logger.debug('Listing subjobs in the ARGO Service DB.')

    def add_arguments(self,parser):
        parser.add_argument('-pk', nargs='+', type=int, help="If given, only specified pks will be reported.")

    def handle(self, *args, **options):
        subjobs = []
        if options['pk'] is None:
            subjobs = ArgoSubJob.objects.all()
        else:
            subjobs = ArgoSubJob.objects.filter(pk__in=options['pk'])
        logger.info(str(len(subjobs)) + ' subjobs in the DB')
        if len(subjobs) > 0:
            listing = '\n\n'