Commit 002696a6 authored by Michael Salim

Major changes to Balsam model; started writing Balsam launcher

parent 96d0e610
@@ -6,6 +6,7 @@ log
argojobs
balsamjobs
argobalsam_env
env
db.sqlite3
argo/migrations
balsam/migrations
@@ -21,7 +21,7 @@ from user_settings import *
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CONCURRENCY_ENABLED = True
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.9/howto/deployment/checklist/
@@ -39,7 +39,6 @@ ALLOWED_HOSTS = []
INSTALLED_APPS = [
'balsam.apps.BalsamCoreConfig',
'argo.apps.ArgoCoreConfig',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
@@ -17,6 +17,6 @@ from django.conf.urls import url,include
from django.contrib import admin
urlpatterns = [
url(r'^argo/',include('argo.urls')),
#url(r'^argo/',include('argo.urls')),
url(r'^admin/', admin.site.urls),
]
import logging,sys
logger = logging.getLogger(__name__)
from common import Serializer
from common_core.MessageInterface import MessageInterface
from balsam_core.BalsamJobMessage import BalsamJobMessage
from balsam_core.job_sources.StatusMessage import StatusMessage
import logging,time,sys
logger = logging.getLogger(__name__)
class NoMoreJobs(Exception): pass
class JobListener:
''' opens message interface and receives jobs '''
def __init__(self,site_name):
self.site_name = site_name
self.msgInt = MessageInterface()
# open connection that just stays open until class is destroyed
self.msgInt.open_blocking_connection()
# make sure machine queue exists
self.msgInt.create_queue(self.site_name,self.site_name)
def get_job_to_submit(self):
# create empty message
msg = BalsamJobMessage()
# request message
method,properties,body = self.msgInt.receive_msg(self.site_name)
if method is not None:
# acknowledge receipt of the message
self.msgInt.send_ack(method.delivery_tag)
            logger.debug('JobListener.get_job_to_submit: received message')
try:
msg.load(body)
            except Exception:
                logger.error('received exception while loading message body into BalsamJobMessage: ' + str(sys.exc_info()[1]))
raise
logger.debug(str(msg))
return msg
        else:
            logger.debug('No new job message received')
            raise NoMoreJobs('No jobs available')
    def get_jobs_to_submit(self):
        jobs = []
        # only get 10 jobs at a time so as not to overwhelm the system
        for _ in range(10):
            try:
                new_job = self.get_job_to_submit()
            except NoMoreJobs:
                logger.debug('Done retrieving jobs')
                break
            jobs.append(new_job)
        logger.debug('retrieved ' + str(len(jobs)) + ' jobs to process.')
        return jobs
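A minimal usage sketch of JobListener; the site name below is illustrative, not from this commit:

# Hypothetical usage; 'argo_cluster_dev' is a made-up site name.
listener = JobListener('argo_cluster_dev')
jobs = listener.get_jobs_to_submit()   # up to 10 BalsamJobMessage objects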
def send_job_failed(machine_name,job_id,message=None):
logger.debug(' sending job failed message ')
msg = StatusMessage.FAILED
if message is not None:
msg = (StatusMessage.FAILED | message)
send_job_status_msg(machine_name,operation = '',job_id=job_id,message=msg)
def send_job_finished(machine_name,job_id,message=None):
logger.debug(' sending job succeeded message ')
msg = StatusMessage.SUCCEEDED
if message is not None:
msg = (StatusMessage.SUCCEEDED | message)
send_job_status_msg(machine_name,operation='',job_id=job_id,message=msg)
def send_job_status_msg(machine_name,
operation,
job_id,
message = '',
                         priority = 0, # default priority
                         delivery_mode = 2, # persistent message
):
logger.debug('sending job status message: ' + str(message))
timestamp = time.time()
# create message interface
msgInt = MessageInterface()
msgInt.open_blocking_connection()
# create a header
headers = {
'hpc': machine_name,
'taskID': job_id,
'operation': operation,
'created': int(timestamp),
}
# send message
msgInt.send_msg(str(message),
str(job_id),
exchange_name = None,
message_headers = headers,
)
# close connection
msgInt.close()
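A short sketch of how the senders above might be called, assuming a site name and job id (both values illustrative):

# Hypothetical usage; site name and job id are made up.
send_job_finished('argo_cluster_dev', 42)
# OR extra StatusMessage flags into a failure report:
send_job_failed('argo_cluster_dev', 42, message=StatusMessage.INVALID_EXE)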
import logging
logger = logging.getLogger(__name__)
class StatusMessage:
NO_MESSAGE = 0x0
SUCCEEDED = 0x1 << 0
SUBMIT_DISABLED = 0x1 << 1
FAILED = 0x1 << 2
INVALID_EXE = 0x1 << 3
message_list = [
NO_MESSAGE,
SUCCEEDED,
SUBMIT_DISABLED,
FAILED,
INVALID_EXE,
]
message_text = {
NO_MESSAGE:'no message',
SUCCEEDED:'succeeded',
SUBMIT_DISABLED:'submission disabled',
FAILED:'failed',
INVALID_EXE:'invalid executable',
}
def __init__(self):
self.message = StatusMessage.NO_MESSAGE
def __str__(self):
out = []
for message in StatusMessage.message_list:
if message & self.message:
out.append(StatusMessage.message_text[message])
return str(out)
def contains(self,flag):
if flag in StatusMessage.message_list:
if self.message & flag:
return True
else:
logger.warning('Invalid Flag passed')
return False
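A tiny sketch of composing and decoding the bitmask above (values hypothetical):

# Hypothetical usage of the StatusMessage bitmask.
status = StatusMessage()
status.message = StatusMessage.FAILED | StatusMessage.INVALID_EXE
print(str(status))                               # ['failed', 'invalid executable']
print(status.contains(StatusMessage.SUCCEEDED))  # False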
import urllib2
def get_jobs():
job_source_url = 'http://www.mcs.anl.gov/~turam/balsam/jobs'
response = urllib2.urlopen(job_source_url)
html = response.read()
urls = html.split('\n')
urls = [u for u in urls if len(u) > 0]
return urls
if __name__ == '__main__':
    print get_jobs()
import pika
import sys
import time
import datetime
import json
import logging
from django.conf import settings
from balsam_core.MessageInterface import MessageInterface
class NoMoreMessages(Exception): pass
credentials = pika.PlainCredentials(settings.BALSAM_PIKA_USERNAME, settings.BALSAM_PIKA_PASSWORD)
parameters = pika.ConnectionParameters(
host=settings.BALSAM_PIKA_HOSTNAME,
port=settings.BALSAM_PIKA_PORT,
virtual_host='/',
credentials=credentials,
socket_timeout=120)
def get_job_to_estimate():
global credentials, parameters
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
queue_name = '%s.getjobs' % settings.BALSAM_SITE
method_frame, header_frame, body = channel.basic_get(queue=queue_name)
    if method_frame:
        channel.basic_ack(method_frame.delivery_tag)
    else:
        # close cleanly before signalling an empty queue
        channel.close()
        connection.close()
        raise NoMoreMessages('No message returned')
    # Cancel the consumer and return any pending messages
    requeued_messages = channel.cancel()
    # Close the channel and the connection
    channel.close()
    connection.close()
    # bodies are produced by send_message below; eval assumes trusted
    # producers (json.loads is the safer decode for JSON-encoded bodies)
    b = eval(body)
    return b
def get_jobs_to_estimate():
    jobs = []
    try:
        while True:
            jobs.append(get_job_to_estimate())
    except NoMoreMessages:
        pass
    return jobs
def get_job_to_submit():
global credentials, parameters
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
queue_name = '%s.submit' % settings.BALSAM_SITE
method_frame, header_frame, body = channel.basic_get(queue=queue_name)
    if method_frame:
        channel.basic_ack(method_frame.delivery_tag)
    else:
        # close cleanly before signalling an empty queue
        channel.close()
        connection.close()
        raise NoMoreMessages('No message returned')
    # Cancel the consumer and return any pending messages
    requeued_messages = channel.cancel()
    # Close the channel and the connection
    channel.close()
    connection.close()
    # bodies are produced by send_message below; eval assumes trusted
    # producers (json.loads is the safer decode for JSON-encoded bodies)
    b = eval(body)
    return b
def get_jobs_to_submit():
    jobs = []
    try:
        while True:
            jobs.append(get_job_to_submit())
    except NoMoreMessages:
        pass
    return jobs
def send_message(job_id, operation, message='dummy message'):
global credentials, parameters
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
exchange_name = 'hpc'
hpc_name = settings.BALSAM_SITE
taskid_str = str(job_id)
routing_key = '%s.%s.%s' % (settings.BALSAM_SITE, str(job_id), operation)
timestamp = time.time()
    headers = { # example of how headers can be used
'hpc': hpc_name,
'taskID':taskid_str,
'operation':operation,
'created': int(timestamp)
}
    data = { # example of how to transfer objects rather than strings, using json.dumps and json.loads
'hpc': hpc_name,
'taskID':taskid_str,
'operation':operation,
'created': int(timestamp),
'message': message
}
properties=pika.BasicProperties(
        delivery_mode=2, # make the message persistent
priority=0, # default priority
timestamp=timestamp, # timestamp of job creation
headers=headers )
channel.basic_publish(exchange=exchange_name,
routing_key=routing_key,
body=json.dumps(data), # must be a string
properties=properties)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
def send_job_estimate(job_id, estimate):
send_message(job_id, 'jobestimate', estimate)
def send_job_submit(job_id):
send_message(job_id, 'submit','http://atlasgridftp02.hep.anl.gov:40000/hpc/jobs/testjob.xml')
def send_job_failed(job_id, message):
send_message(job_id, 'finishedjob', 'Error: ' + message)
def send_job_finished(job_id):
send_message(job_id, 'finishedjob', 'Success')
if __name__ == '__main__':
print get_jobs_to_estimate()
print get_jobs_to_submit()
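Since send_message publishes json.dumps(data), a receiver can decode bodies with json.loads rather than eval; a minimal sketch (decode_body is a hypothetical helper, not part of this commit):

import json

def decode_body(body):
    '''Parse a message body produced by send_message (json.dumps output).'''
    return json.loads(body)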
/one/test/path /apps/super.py --doit 3 --now 3
fsdf
/that/crazy/job/dir /home/msalim/bin/nwchem.x in.nw
0 0
/Users/misha /home/msalim/bin/nwchem.x fksdj
'''The Launcher is invoked either by the user, who bypasses the Balsam
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler.'''
import argparse
import os
import signal
import time
import uuid
from django.conf import settings
import balsam.models
from balsam.models import BalsamJob, ApplicationDefinition
START_TIME = time.time() + 10.0
class BalsamLauncherException(Exception): pass
SIGTIMEOUT = 'TIMEOUT!'
SIGNALS = {
signal.SIGINT: 'SIG_INT',
signal.SIGTERM: 'SIG_TERM',
}
class JobRetriever:
'''Use the get_jobs method to pull valid jobs for this run'''
def __init__(self, config):
        self._job_pk_list = None
        self._job_file = config.job_file
        self._wf_name = config.wf_name
        self._host_type = config.host_type
    def get_jobs(self):
        if self._job_file:
            jobs = self._jobs_from_file()
        else:
            jobs = self._jobs_from_wf(wf=self._wf_name)
        return self._filter(jobs)
def _filter(self, jobs):
jobs = jobs.exclude(state__in=balsam.models.END_STATES)
jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
return jobs
def _jobs_from_file(self):
if self._job_pk_list is None:
try:
pk_strings = open(self._job_file).read().split()
except IOError as e:
raise BalsamLauncherException(f"Can't read {self._job_file}") from e
try:
self._job_pk_list = [uuid.UUID(pk) for pk in pk_strings]
            except ValueError as e:
                raise BalsamLauncherException(f"{self._job_file} contains bad UUID strings") from e
try:
                jobs = BalsamJob.objects.filter(job_id__in=self._job_pk_list)
except Exception as e:
raise BalsamLauncherException("Failed to query BalsamJobDB") from e
else:
return jobs
def _jobs_from_wf(self, wf=''):
objects = BalsamJob.objects
try:
jobs = objects.filter(workflow=wf) if wf else objects.all()
except Exception as e:
raise BalsamLauncherException(f"Failed to query BalsamJobDB for '{wf}'") from e
else:
self._job_pk_list = [job.pk for job in jobs]
return jobs
class LauncherConfig:
'''Set user- and environment-specific settings for this run'''
RECOGNIZED_HOSTS = {
'BGQ' : 'vesta cetus mira'.split(),
'CRAY' : 'theta'.split(),
}
def __init__(self, args):
self.hostname = None
self.host_type = None
self.scheduler_id = None
self.num_nodes = None
self.partition = None
self.walltime_seconds = None
self.job_file = args.job_file
self.wf_name = args.consume_wf
self.consume_all = args.consume_all
self.num_workers = args.num_workers
self.ranks_per_worker_serial = args.ppn_serial
self.walltime_minutes = args.time_limit_minutes
self.set_hostname_and_type()
self.query_scheduler()
if self.walltime_minutes is not None:
self.walltime_seconds = self.walltime_minutes * 60
def set_hostname_and_type(self):
from socket import gethostname
hostname = gethostname().lower()
self.hostname = hostname
        for host_type, known_names in self.RECOGNIZED_HOSTS.items():
            if any(name in hostname for name in known_names):
                self.host_type = host_type
                return
        self.host_type = None  # default
    def query_scheduler(self):
        # NOTE: the scheduler interface is still being written; 'scheduler'
        # below refers to the forthcoming balsam.schedulers hook and is not
        # yet importable in this commit
        if not scheduler.scheduler_class:
            return
env = scheduler.get_environ()
self.scheduler_id = env.id
self.num_nodes = env.num_nodes
self.partition = env.partition
info = scheduler.get_job_status(self.scheduler_id)
self.walltime_seconds = info['walltime_sec']
def elapsed_time_seconds(self):
return time.time() - START_TIME
def remaining_time_seconds(self):
if self.walltime_seconds:
            elapsed = self.elapsed_time_seconds()
            return self.walltime_seconds - elapsed
else:
return float("inf")
def sufficient_time(self, job):
return 60*job.wall_time_minutes < self.remaining_time_seconds()
    def check_timeout(self):
        # NOTE: active_runners is not yet populated anywhere in this
        # work-in-progress commit
if self.remaining_time_seconds() < 1.0:
for runner in self.active_runners:
runner.timeout(SIGTIMEOUT, None)
return True
return False
def main(args):
launcher_config = LauncherConfig(args)
job_retriever = JobRetriever(launcher_config)
runners.setup(launcher_config)
workers = WorkerGroup(launcher_config)
    # TODO: drive the worker group here; the statement was left unfinished
    # in this work-in-progress commit (runners and WorkerGroup are also not
    # yet defined)
# Initialize compute environment (any launcher actions that take place
# before jobs start): on BGQ, this means booting blocks
# Create workdirs for jobs: organized by workflow
# Job dirs named according to job.name (not pk)...use just enough of pk to
# resolve conflicts
# Query Balsam DB
# Run 10 multiprocessing "transitions": communicate via 2 queues
# Add jobs to transitions queue
# Maintain up to 50 active runners (1 runner tracks 1 subprocess-aprun)
# Handle timeout: invoke TIMEOUT in active runners; let transitions finish
# Stop adding new transitions to queue,
    # Add error-handling transitions for all the RUN_TIMEOUT jobs
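The comment block above sketches the planned transitions architecture; below is a minimal, hypothetical wiring of the two queues to a pool of transition workers (all names are illustrative; transitions.main later in this commit has the matching signature):

# Hypothetical sketch only -- not part of this commit's launcher.
import multiprocessing

def start_transitions(worker, num_procs=10):
    '''Spawn processes running worker(job_queue, status_queue), e.g. the
    transitions.main loop shown later in this commit.'''
    job_queue = multiprocessing.Queue()
    status_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker,
                                     args=(job_queue, status_queue))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    return job_queue, status_queue, procs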
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--job-file', help="File of Balsam job IDs")
group.add_argument('--consume-all', action='store_true',
help="Continuously run all jobs from DB")
group.add_argument('--consume-wf',
help="Continuously run jobs of specified workflow")
    parser.add_argument('--num-workers', type=int, default=1,
help="Theta: defaults to # nodes. BGQ: the # of subblocks")
parser.add_argument('--ppn-serial', type=int, default=4,
help="For non-MPI jobs, how many to pack per worker")
    parser.add_argument('--time-limit-minutes', type=int,
                        help="Override auto-detected walltime limit "
                             "(runs forever if no limit is detected or specified)")
args = parser.parse_args()
main(args)
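For reference, example invocations under the flags defined above (all values illustrative):

# Example invocations (illustrative):
#   python launcher.py --consume-all --num-workers 8
#   python launcher.py --consume-wf my_workflow --time-limit-minutes 55
#   python launcher.py --job-file job_ids.txt --ppn-serial 4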
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
print(rank)
from collections import namedtuple
import os
import sys
from subprocess import Popen

from mpi4py import MPI

from runners import cd
class MPIEnsembleError(Exception): pass
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
Job = namedtuple('Job', ['id', 'workdir', 'cmd'])
def status_msg(id, state, msg=''):
print(f"{id} {state} {msg}", flush=True)
def read_jobs(fp):
for line in fp:
try:
id, workdir, *command = line.split()
        except ValueError:
            # skip blank or malformed lines
            continue
if id and command and os.path.isdir(workdir):
yield Job(id, workdir, command)
def run(job):
    basename = os.path.basename(job.workdir)
    outname = f"{basename}.out"
    proc = None
    # contextlib.nested was removed in Python 3; a single with-statement
    # handles both context managers
    with cd(job.workdir), open(outname, 'wb') as outf:
        try:
            status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
            proc = Popen(job.cmd, stdout=outf, stderr=outf)
            retcode = proc.wait()
        except Exception as e:
            status_msg(job.id, "RUN_ERROR", msg=str(e))
            raise MPIEnsembleError from e
        else:
            if retcode == 0:
                status_msg(job.id, "RUN_FINISHED")
            else:
                status_msg(job.id, "RUN_ERROR", msg=f"error code {retcode}")
        finally:
            if proc is not None:
                proc.terminate()
def main(jobs_path):
job_list = None
proc = None
if RANK == 0:
with open(jobs_path) as fp:
job_list = list(read_jobs(fp))
job_list = COMM.bcast(job_list, root=0)
for job in job_list[RANK::COMM.size]:
run(job)
if __name__ == "__main__":
path = sys.argv[1]
main(path)
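The slice job_list[RANK::COMM.size] deals jobs round-robin across ranks; a tiny standalone illustration (values hypothetical):

# Round-robin distribution sketch: 5 jobs across 2 ranks.
jobs = ['j0', 'j1', 'j2', 'j3', 'j4']
size = 2
for rank in range(size):
    print(rank, jobs[rank::size])
# rank 0 -> ['j0', 'j2', 'j4']; rank 1 -> ['j1', 'j3']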
'''BalsamJob pre and post execution Transitions'''
import logging

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist

from common import transfer, MessageInterface, run_subprocess
from common import db_tools
from balsam import BalsamStatusSender
from balsam.schedulers import exceptions
#from django.db import utils, connections, DEFAULT_DB_ALIAS

logger = logging.getLogger(__name__)
def main(job_queue, status_queue):
while True:
job, process_function = job_queue.get()
process_function(job)
def check_parents(job):
pass
# NOTE: job state constants (STAGED_IN, STAGE_IN_FAILED, STAGED_OUT,
# STAGE_OUT_FAILED) are referenced below but not imported in this commit;
# they are assumed to come from the Balsam job-state definitions
def stage_in(job):
    '''If the job has an input_url defined,
    its files are copied to the local working_directory'''
logger.debug('in stage_in')
message = 'job staged in'
if job.input_url != '':
try:
transfer.stage_in(job.input_url + '/', job.working_directory + '/')
job.state = STAGED_IN.name
except Exception as e:
message = 'Exception received during stage_in: ' + str(e)
logger.error(message)
job.state = STAGE_IN_FAILED.name
else:
# no input url specified so stage in is complete
job.state = STAGED_IN.name
job.save(
update_fields=['state'],
using=db_tools.get_db_connection_id(
job.pk))
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
# stage out files for a job
def stage_out(job):
    '''If the job has output_files defined and an output_url is set,
    they are copied from the local working_directory to the output_url'''
logger.debug('in stage_out')
message = None
if job.output_url != '':
try:
transfer.stage_out(
job.working_directory + '/',
job.output_url + '/')
job.state = STAGED_OUT.name
except Exception as e:
message = 'Exception received during stage_out: ' + str(e)
logger.error(message)
job.state = STAGE_OUT_FAILED.name
else:
            # no output url specified so stage out is complete
job.state = STAGED_OUT.name
job.save(
update_fields=['state'],
using=db_tools.get_db_connection_id(
job.pk))
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
# preprocess a job