Commit 6e22729b authored by Michael Salim

working on launcher

parent 185c199a
......@@ -6,3 +6,5 @@ class NoAvailableWorkers(BalsamRunnerException): pass
class BalsamTransitionError(Exception): pass
class TransitionNotFoundError(BalsamTransitionError, ValueError): pass
class MPIEnsembleError(Exception): pass
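With the corrected base class, TransitionNotFoundError can be caught either as a BalsamTransitionError or as a ValueError; a standalone sketch of the hierarchy:

class BalsamTransitionError(Exception): pass
class TransitionNotFoundError(BalsamTransitionError, ValueError): pass

try:
    raise TransitionNotFoundError("no transition defined for this state")
except BalsamTransitionError as e:  # an `except ValueError:` clause would also match
    print(f"caught: {e}")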
from collections import defaultdict
import balsam.models
from balsam.models import BalsamJob
......
......@@ -2,22 +2,19 @@
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler'''
import argparse
from collections import defaultdict
import os
import multiprocessing
import queue
from sys import exit
import signal
import time
import django
from django.conf import settings
from django.db import transaction
from balsam import scheduler
from balsam.launcher import jobreader
from balsam.launcher import transitions
from balsam.launcher import worker
from balsam.launcher import runners
from balsam.launcher.exceptions import *
START_TIME = time.time() + 10.0
......@@ -35,6 +32,7 @@ def delay(period=settings.BALSAM_SERVICE_PERIOD):
nexttime = now + tosleep + period
yield
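The hunk above shows only the tail of the delay generator; a minimal standalone sketch, assuming it sleeps out whatever remains of each service period (the final two lines match the hunk; the real default is settings.BALSAM_SERVICE_PERIOD):

import time

def delay(period=1.0):
    '''Yield at most once per period, sleeping out the remainder of each cycle.'''
    nexttime = time.time() + period
    while True:
        now = time.time()
        tosleep = nexttime - now
        if tosleep <= 0:
            nexttime = now + period
        else:
            time.sleep(tosleep)
            nexttime = now + tosleep + period
        yield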
class HostEnvironment:
'''Set user- and environment-specific settings for this run'''
RECOGNIZED_HOSTS = {
......@@ -109,6 +107,7 @@ def get_runnable_jobs(jobs, running_pks, host_env):
return runnable_jobs
def create_new_runners(jobs, runner_group, worker_group, host_env):
created_one = False
running_pks = runner_group.running_job_pks
runnable_jobs = get_runnable_jobs(jobs, running_pks, host_env)
while runnable_jobs:
......@@ -117,8 +116,11 @@ def create_new_runners(jobs, runner_group, worker_group, host_env):
except (ExceededMaxRunners, NoAvailableWorkers) as e:
break
else:
created_one = True
running_pks = runner_group.running_job_pks
runnable_jobs = get_runnable_jobs(jobs, running_pks, host_env)
return created_one
def main(args, transition_pool, runner_group, job_source):
host_env = HostEnvironment(args)
......@@ -144,8 +146,8 @@ def main(args, transition_pool, runner_group, job_source):
wait = False
any_finished = runner_group.update_and_remove_finished()
create_new_runners(job_source.jobs, runner_group, worker_group, host_env)
if any_finished: wait = False
created = create_new_runners(job_source.jobs, runner_group, worker_group, host_env)
if any_finished or created: wait = False
if wait: next(delay_timer)
def on_exit(runner_group, transition_pool, job_source):
......@@ -181,6 +183,9 @@ def get_args():
forever if no limit is detected or specified)")
return parser.parse_args()
def detect_dead_runners(job_source):
for job in job_source.by_states['RUNNING']:
job.update_state('RESTART_READY', 'Detected dead runner')
if __name__ == "__main__":
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
......@@ -188,9 +193,12 @@ if __name__ == "__main__":
args = get_args()
job_source = jobreader.JobReader.from_config(args)
job_source.refresh_from_db()
runner_group = runners.RunnerGroup()
transition_pool = transitions.TransitionProcessPool()
detect_dead_runners(job_source)
handl = lambda a,b: on_exit(runner_group, transition_pool, job_source)
signal.signal(signal.SIGINT, handl)
signal.signal(signal.SIGTERM, handl)
......
......@@ -6,8 +6,7 @@ from subprocess import Popen, STDOUT
from mpi4py import MPI
from balsam.launcher.runners import cd
class MPIEnsembleError(Exception): pass
from balsam.launcher.exceptions import *
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
......
'''A Runner is constructed with a list of jobs and a list of idle workers. It
creates and monitors the execution subprocess, updating job states in the DB as
necessary. RunnerGroup contains the list of Runner objects, logic for creating
the next Runner (i.e. assigning jobs to nodes), and the public interface'''
necessary. RunnerGroup has a collection of Runner objects, logic for creating
the next Runner (i.e. assigning jobs to nodes), and the public interface to
monitor runners'''
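For context, the launcher's use of this public interface, condensed from the launcher.py hunks earlier in this diff (wait and delay_timer belong to the surrounding main loop):

# One pass of the launcher service loop
any_finished = runner_group.update_and_remove_finished()   # reap finished runners
created = create_new_runners(job_source.jobs, runner_group, worker_group, host_env)
if any_finished or created:
    wait = False        # something changed: poll again immediately
if wait:
    next(delay_timer)   # otherwise throttle to the service period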
import functools
from math import ceil
......@@ -15,6 +16,7 @@ from threading import Thread
from queue import Queue, Empty
from django.conf import settings
from django.db import transaction
import balsam.models
from balsam.launcher import mpi_commands
......@@ -197,15 +199,18 @@ class RunnerGroup:
rpw = workers[0].ranks_per_worker
assert all(w.ranks_per_worker == rpw for w in idle_workers)
serial_jobs = [j for j in runnable_jobs if j.num_nodes == 1 and
j.processes_per_node == 1]
serial_jobs = [j for j in runnable_jobs
if j.num_nodes == 1 and j.processes_per_node == 1]
nserial = len(serial_jobs)
mpi_jobs = [j for j in runnable_jobs if 1 < j.num_nodes <= nidle or
(1==j.num_nodes<=nidle and j.processes_per_node > 1)]
(1==j.num_nodes<=nidle and j.processes_per_node > 1)]
largest_mpi_job = (max(mpi_jobs, key=lambda job: job.num_nodes)
if mpi_jobs else None)
# Try to fill all available nodes with the serial ensemble runner.
# If there are not enough serial jobs, run the larger of: the
# largest MPI job that fits, or the remaining serial jobs (sketch below)
if nserial >= nidle*rpw:
jobs = serial_jobs[:nidle*rpw]
assigned_workers = idle_workers
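A standalone sketch of the packing rule described in the comment above, with hypothetical numbers (tie-breaking details beyond the comment are assumptions):

from math import ceil

def choose_fill(nidle, rpw, nserial, largest_mpi_nodes):
    '''Decide what the next Runner should run, per the policy above.'''
    if nserial >= nidle * rpw:
        return ('serial', nidle)            # serial jobs fill every idle worker
    serial_workers = ceil(nserial / rpw)    # workers the leftover serial jobs cover
    if largest_mpi_nodes and largest_mpi_nodes >= serial_workers:
        return ('mpi', largest_mpi_nodes)   # the MPI job is the larger fill
    return ('serial', serial_workers)

# 4 idle workers x 16 ranks/worker: 50 serial jobs span ceil(50/16) = 4 workers,
# so they beat a 3-node MPI job:
assert choose_fill(4, 16, 50, 3) == ('serial', 4)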
......@@ -243,5 +248,4 @@ class RunnerGroup:
@property
def running_job_pks(self):
active_runners = [r for r in self.runners if not r.finished()]
return [j.pk for runner in active_runners for j in runner.jobs]
return [j.pk for runner in self.runners for j in runner.jobs]
'''BalsamJob pre and post execution'''
from collections import namedtuple
import glob
import multiprocessing
import queue
import logging
from django.core.exceptions import ObjectDoesNotExist
......@@ -8,22 +11,26 @@ from django import db
from common import transfer
from balsam.launcher.exceptions import *
from balsam.models import BalsamJob
logger = logging.getLogger(__name__)
PREPROCESS_TIMEOUT_SECONDS = 300
StatusMsg = namedtuple('Status', ['pk', 'state', 'msg'])
JobMsg = namedtuple('JobMsg', ['pk', 'transition_function'])
JobMsg = namedtuple('JobMsg', ['job', 'transition_function'])
def main(job_queue, status_queue):
db.connection.close()
while True:
job, process_function = job_queue.get()
job_msg = job_queue.get()
job, transition_function = job_msg
if job == 'end': return
try:
process_function(job)
transition_function(job)
except BalsamTransitionError as e:
job.update_state('FAILED', str(e))
s = StatusMsg(job.pk, 'FAILED', str(e))
status_queue.put(s)
else:
......@@ -55,11 +62,10 @@ class TransitionProcessPool:
def add_job(self, job):
if job in self: raise BalsamTransitionError("already in transition")
if job.state not in TRANSITIONS: raise TransitionNotFoundError
pk = job.pk
transition_function = TRANSITIONS[job.state]
m = JobMsg(pk, transition_function)
m = JobMsg(job, transition_function)
self.job_queue.put(m)
self.transitions_pk_list.append(pk)
self.transitions_pk_list.append(job.pk)
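A hypothetical driver for this pool, using only names visible in this diff (that get_statuses yields StatusMsg tuples is an assumption; its body is truncated below):

pool = TransitionProcessPool()
pool.add_job(job)                         # enqueues JobMsg(job, TRANSITIONS[job.state])
for status in pool.get_statuses():        # drain StatusMsg(pk, state, msg) results
    logger.info(f"{status.pk} -> {status.state}: {status.msg}")
pool.job_queue.put(JobMsg('end', None))   # sentinel: main() returns on job == 'end'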
def get_statuses(self):
while not self.status_queue.empty():
......@@ -90,167 +96,218 @@ def check_parents(job):
ready = all(p.state == 'JOB_FINISHED' for p in parents)
if ready:
job.update_state('READY', 'dependencies satisfied')
elif job.state == 'CREATED':
job.update_state('NOT_READY', 'awaiting dependencies')
elif job.state != 'AWAITING_PARENTS':
job.update_state('AWAITING_PARENTS', f'{len(parents)} pending jobs')
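The new branch logic, restated as a standalone function over plain state strings (the CREATED to NOT_READY branch above is the removed behavior):

def next_state(parent_states, job_state):
    '''Mirror of check_parents: READY once every parent is JOB_FINISHED.'''
    if all(s == 'JOB_FINISHED' for s in parent_states):
        return 'READY'
    if job_state != 'AWAITING_PARENTS':
        return 'AWAITING_PARENTS'
    return job_state  # already waiting: no update

assert next_state(['JOB_FINISHED', 'JOB_FINISHED'], 'CREATED') == 'READY'
assert next_state(['RUNNING', 'JOB_FINISHED'], 'CREATED') == 'AWAITING_PARENTS'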
def stage_in(job):
# Create workdirs for jobs: use job.create_working_path
logger.debug('in stage_in')
job.update_state('STAGING_IN')
if not os.path.exists(job.working_directory):
job.create_working_path()
work_dir = job.working_directory
if job.input_url != '':
# stage in all remote urls
# TODO: stage_in remote transfer should allow a list of files and folders,
# rather than copying just one entire folder
url_in = job.stage_in_url
if url_in:
try:
transfer.stage_in(job.input_url + '/', job.working_directory + '/')
job.state = STAGED_IN.name
transfer.stage_in(f"{url_in}/", f"{work_dir}/")
except Exception as e:
message = 'Exception received during stage_in: ' + str(e)
logger.error(message)
job.state = STAGE_IN_FAILED.name
else:
# no input url specified so stage in is complete
job.state = STAGED_IN.name
raise BalsamTransitionError(message) from e
# create unique symlinks to "input_files" patterns from parents
# TODO: handle data flow from remote sites transparently
matches = []
parents = job.get_parents()
input_patterns = job.input_files.split()
for parent in parents:
parent_dir = parent.working_directory
for pattern in input_patterns:
path = os.path.join(parent_dir, pattern)
matches.extend((parent.pk, match) for match in glob.glob(path))
for parent_pk, inp_file in matches:
basename = os.path.basename(inp_file)
newpath = os.path.join(work_dir, basename)
if os.path.exists(newpath): newpath += f"_{parent_pk}"
# pointing to src, named dst
os.symlink(src=inp_file, dst=newpath)
job.update_state('STAGED_IN')
job.update_state('STAGE_IN_DONE')
def stage_out(job):
''' if the job has files defined via the output_files and an output_url is defined,
they are copied from the local working_directory to the output_url '''
'''copy from the local working_directory to the output_url '''
logger.debug('in stage_out')
message = None
if job.output_url != '':
try:
transfer.stage_out(
job.working_directory + '/',
job.output_url + '/')
job.state = STAGED_OUT.name
except Exception as e:
message = 'Exception received during stage_out: ' + str(e)
logger.error(message)
job.state = STAGE_OUT_FAILED.name
else:
# no output url specififed so stage out is complete
job.state = STAGED_OUT.name
job.save(
update_fields=['state'],
using=db_tools.get_db_connection_id(
job.pk))
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
stage_out_patterns = job.stage_out_files.split()
work_dir = job.working_directory
matches = []
for pattern in stage_out_patterns:
path = os.path.join(work_dir, pattern)
matches.extend(glob.glob(path))
# preprocess a job
if matches:
with tempfile.TemporaryDirectory() as stagingdir:
try:
for f in matches:
base = os.path.basename(f)
dst = os.path.join(stagingdir, base)
os.link(src=f, dst=dst)
transfer.stage_out(f"{stagingdir}/", f"{job.stage_out_url}/")
except Exception as e:
message = 'Exception received during stage_out: ' + str(e)
logger.error(message)
raise BalsamTransitionError(message) from e
job.update_state('JOB_FINISHED') # this completes the transitions
def preprocess(job):
''' Each job defines a task to perform, so tasks need preprocessing to prepare
for the job to be submitted to the batch queue. '''
logger.debug('in preprocess ')
message = 'Job prepocess complete.'
# get the task that is running
try:
app = ApplicationDefinition.objects.get(name=job.application)
if app.preprocess:
if os.path.exists(app.preprocess):
stdout = run_subprocess.run_subprocess(app.preprocess)
# write stdout to log file
f = open(os.path.join(job.working_directory, app.name +
'.preprocess.log.pid' + str(os.getpid())), 'wb')
f.write(stdout)
f.close()
job.state = PREPROCESSED.name
else:
message = ('Preprocess, "' + app.preprocess + '", of application, "' + str(job.application)
+ '", does not exist on filesystem.')
logger.error(message)
job.state = PREPROCESS_FAILED.name
else:
logger.debug('No preprocess specified for this job; skipping')
job.state = PREPROCESSED.name
except run_subprocess.SubprocessNonzeroReturnCode as e:
message = ('Preprocess, "' + app.preprocess + '", of application, "' + str(job.application)
+ '", exited with non-zero return code: ' + str(returncode))
logger.error(message)
job.state = PREPROCESS_FAILED.name
except run_subprocess.SubprocessFailed as e:
message = ('Received exception while running preprocess, "' + app.preprocess
+ '", of application, "' + str(job.application) + '", exception: ' + str(e))
# Get preprocessor exe
preproc_app = job.preprocess
if not preproc_app:
try:
app = job.get_application()
except ObjectDoesNotExist as e:
message = f"application {job.application} does not exist"
logger.error(message)
raise BalsamTransitionError(message)
preproc_app = app.default_preprocess
if not preproc_app:
job.update_state('PREPROCESSED', 'No preprocess: skipped')
return
if not os.path.exists(preproc_app):
#TODO: look for preproc in the EXE directories
message = f"Preprocessor {preproc_app} does not exist on filesystem"
logger.error(message)
job.state = PREPROCESS_FAILED.name
except ObjectDoesNotExist as e:
message = 'application,' + str(job.application) + ', does not exist.'
raise BalsamTransitionError(message)
# Create preprocess-specific environment
envs = job.get_envs()
# Run preprocessor with special environment in job working directory
out = os.path.join(job.working_directory, f"preprocess.log.pid{os.getpid()}")
with open(out, 'w') as fp:
fp.write(f"# Balsam Preprocessor: {preproc_app}\n")
try:
proc = subprocess.Popen(preproc_app, stdout=fp,
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=PREPROCESS_TIMEOUT_SECONDS)
except Exception as e:
message = f"Preprocess failed: {e}"
logger.error(message)
if 'proc' in locals(): proc.kill()  # Popen itself may have failed before proc was bound
raise BalsamTransitionError(message) from e
if retcode != 0:
message = f"Preprocess Popen nonzero returncode: {retcode}"
logger.error(message)
job.state = PREPROCESS_FAILED.name
except Exception as e:
message = 'Received exception while in preprocess, "' + \
app.preprocess + '", for application ' + str(job.application)
logger.exception(message)
job.state = PREPROCESS_FAILED.name
job.save(
update_fields=['state'],
using=db_tools.get_db_connection_id(
job.pk))
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
# perform any post job processing needed
def postprocess(job):
''' some jobs need to have some postprocessing performed,
this function does this.'''
raise BalsamTransitionError(message)
job.update_state('PREPROCESSED', f"{os.path.basename(preproc_app)}")
def postprocess(job, *, error_handling=False, timeout_handling=False):
logger.debug('in postprocess ')
message = 'Job postprocess complete'
try:
app = ApplicationDefinition.objects.get(name=job.application)
if app.postprocess:
if os.path.exists(app.postprocess):
stdout = run_subprocess.run_subprocess(app.postprocess)
# write stdout to log file
f = open(os.path.join(job.working_directory, app.name +
'.postprocess.log.pid' + str(os.getpid())), 'wb')
f.write(stdout)
f.close()
job.state = POSTPROCESSED.name
else:
message = ('Postprocess, "' + app.postprocess + '", of application, "' + str(job.application)
+ '", does not exist on filesystem.')
logger.error(message)
job.state = POSTPROCESS_FAILED.name
if error_handling and timeout_handling:
raise ValueError("Both error-handling and timeout-handling is invalid")
if error_handling: logger.debug('Handling RUN_ERROR')
if timeout_handling: logger.debug('Handling RUN_TIMEOUT')
# Get postprocessor exe
postproc_app = job.postprocess
if not postproc_app:
try:
app = job.get_application()
except ObjectDoesNotExist as e:
message = f"application {job.application} does not exist"
logger.error(message)
raise BalsamTransitionError(message)
postproc_app = app.default_postprocess
# If no postprocessor, move on (unless in error_handling mode)
if not postproc_app:
if error_handling:
message = "Trying to handle error, but no postprocessor found"
logger.warning(message)
raise BalsamTransitionError(message)
elif timeout_handling:
logger.warning('Unhandled job timeout: marking RESTART_READY')
job.update_state('RESTART_READY', 'marking for re-run')
return
else:
logger.debug('No postprocess specified for this job; skipping')
job.state = POSTPROCESSED.name
except run_subprocess.SubprocessNonzeroReturnCode as e:
message = ('Postprocess, "' + app.postprocess + '", of application, "' + str(job.application)
+ '", exited with non-zero return code: ' + str(returncode))
logger.error(message)
job.state = POSTPROCESS_FAILED.name
except run_subprocess.SubprocessFailed as e:
message = ('Received exception while running postprocess, "' + app.preprocess
+ '", of application, "' + str(job.application) + '", exception: ' + str(e))
logger.error(message)
job.state = POSTPROCESS_FAILED.name
except ObjectDoesNotExist as e:
message = 'application,' + str(job.application) + ', does not exist.'
job.update_state('POSTPROCESSED', 'No postprocess: skipped')
return
if not os.path.exists(postproc_app):
#TODO: look for postproc in the EXE directories
message = f"Postprocessor {postproc_app} does not exist on filesystem"
logger.error(message)
job.state = POSTPROCESS_FAILED.name
except Exception as e:
message = 'Received exception while in postprocess, "' + \
app.postprocess + '", for application ' + str(job.application)
raise BalsamTransitionError(message)
# Create postprocess-specific environment
envs = job.get_envs(timeout=timeout_handling, error=error_handling)
# Run postprocessor with special environment in job working directory
out = os.path.join(job.working_directory, f"postprocess.log.pid{os.getpid()}")
with open(out, 'w') as fp:
fp.write(f"# Balsam Postprocessor: {postproc_app}\n")
if timeout_handling: fp.write("# Invoked to handle RUN_TIMEOUT\n")
if error_handling: fp.write("# Invoked to handle RUN_ERROR\n")
try:
proc = subprocess.Popen(postproc_app, stdout=fp,
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
except Exception as e:
message = f"Postprocess failed: {e}"
logger.error(message)
if 'proc' in locals(): proc.kill()  # Popen itself may have failed before proc was bound
raise BalsamTransitionError(message) from e
if retcode != 0:
message = f"Postprocess Popen nonzero returncode: {retcode}"
logger.error(message)
job.state = POSTPROCESS_FAILED.name
raise BalsamTransitionError(message)
# If postprocessor handled error or timeout, it should have changed job's
# state. If it failed to do this, mark FAILED. Otherwise, POSTPROCESSED.
job.refresh_from_db()
if error_handling and job.state == 'RUN_ERROR':
message = f"Error handling failed to change job state: marking FAILED"
logger.warning(message)
raise BalsamTransitionError(message)
if timeout_handling and job.state == 'RUN_TIMEOUT':
message = f"Timeout handling failed to change job state: marking FAILED"
logger.warning(message)
raise BalsamTransitionError(message)
if not (error_handling or timeout_handling):
job.update_state('POSTPROCESSED', f"{os.path.basename(postproc_app)}")
def handle_timeout(job):
if job.post_timeout_handler:
postprocess(job, timeout_handling=True)
elif job.auto_timeout_retry:
job.update_state('RESTART_READY', 'timed out: auto retry')
else:
raise BalsamTransitionError("No timeout handling: marking FAILED")
def handle_run_error(job):
if job.post_error_handler:
postprocess(job, error_handling=True)
else:
raise BalsamTransitionError("No error handler: run failed")
job.save(
update_fields=['state'],
using=db_tools.get_db_connection_id(
job.pk))
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
TRANSITIONS = {
'CREATED': check_parents,
......@@ -259,7 +316,7 @@ TRANSITIONS = {
'READY': stage_in,
'STAGED_IN': preprocess,
'RUN_DONE': postprocess,
'RUN_TIMEOUT': postprocess,
'RUN_ERROR': postprocess,
'RUN_TIMEOUT': handle_timeout,
'RUN_ERROR': handle_run_error,
'POSTPROCESSED': stage_out,
}
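Dispatch through this table is what the worker loop in main() does; a condensed sketch (job here stands in for a fetched BalsamJob):

transition_function = TRANSITIONS[job.state]   # e.g. 'RUN_TIMEOUT' -> handle_timeout
try:
    transition_function(job)
except BalsamTransitionError as e:
    job.update_state('FAILED', str(e))         # mirrors the except branch in main()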
......@@ -137,15 +137,15 @@ class BalsamJob(models.Model):
input_files = models.TextField(
'Input File Patterns',
help_text="A string of filename patterns that will be searched in the parents'"\
help_text="Space-delimited filename patterns that will be searched in the parents'"\
"working directories. Every matching file will be made available in this"\
"job's working directory (symlinks for local Balsam jobs, file transfer for"\
"remote Balsam jobs). Default: all files from parent jobs are made available.",
default='*')
stage_in_urls = models.TextField(
stage_in_url = models.TextField(
'External stage in files or folders', help_text="A list of URLs for external data to be staged in prior to job processing. Job dataflow from parents to children is NOT handled here; see `input_files` field instead.",
default='')
stage_out_files = models.TextField(
'External stage out files or folders',
help_text="A string of filename patterns. Matches will be transferred to the stage_out_url. Default: no files are staged out",
default='')
......@@ -154,7 +154,7 @@ class BalsamJob(models.Model):
help_text='The URLs to which designated stage out files are sent.',
default='')
requested_wall_time_minutes = models.IntegerField(
wall_time_minutes = models.IntegerField(
'Job Wall Time in Minutes',
help_text='The number of minutes the job is expected to take',
default=1)
......@@ -215,6 +215,21 @@ class BalsamJob(models.Model):
help_text='A script that is run in a job working directory after the job has completed.'
' If blank, will default to the default_postprocess script defined for the application.',
default='')
post_error_handler = models.BooleanField(
'Let postprocessor try to handle RUN_ERROR',
help_text='If true, the postprocessor will be invoked for RUN_ERROR jobs'
' and it is up to the script to handle the error and update the job state.',
default=False)
post_timeout_handler = models.BooleanField(
'Let postprocessor try to handle RUN_TIMEOUT',
help_text='If true, the postprocessor will be invoked for RUN_TIMEOUT jobs'
' and it is up to the script to handle the timeout and update the job state.',
default=False)
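A usage sketch of the renamed and added fields (assumes a configured Django environment; all values are illustrative):

from balsam.models import BalsamJob

job = BalsamJob(
    input_files='*.dat restart.xml',  # space-delimited patterns pulled from parents
    stage_in_url='',                  # renamed from stage_in_urls
    stage_out_files='*.log *.out',    # matches are transferred to stage_out_url
    wall_time_minutes=30,             # renamed from requested_wall_time_minutes
    post_error_handler=True,          # postprocessor will be invoked on RUN_ERROR
    post_timeout_handler=False,       # RUN_TIMEOUT falls back to auto retry or FAILED
)
job.save()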