Commit 70332624 authored by Michael Salim

Merge remote-tracking branch 'origin/develop'

parents c3e0845e badd48f6
'''
The Balsam Launcher is a pilot application that runs either directly on compute
nodes or very close to them (perhaps on a service node with ``mpirun`` access).
The Launcher is responsible for:
* **pulling jobs** from the Balsam database
* **stage-in and stage-out** transitions that include working
directory creation, remote and local file transfer, and enabling the
requested flow of files from parent to child jobs transparently
* running custom **pre- and post-processing** scripts for each job
* invoking **job execution** on the appropriate resources
* **monitoring** job execution and providing resilient mechanisms to **handle
expected and unexpected** runtime errors or job timeouts
In normal Balsam usage, the Launcher is not invoked directly. Instead, multiple
Launcher instances with specific workloads are automatically scheduled for execution by the
Metascheduler. However, the Launcher can also simply be invoked from the
command line. For example, to consume all jobs from the database, use:
$ balsam launcher --consume-all
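Jobs can also be restricted to a single workflow, or to an explicit list of
job IDs read from a file (both options are defined by the launcher's argument
parser):
$ balsam launcher --wf-name my_workflow
$ balsam launcher --job-file job_ids.txt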
'''
''' API for BalsamJob database (DAG) Manipulations
==================================================
The ``launcher.dag`` module provides a number of convenient environment
variables and functions that allow you to quickly write pre- and post-processing
scripts that interact with the BalsamJob database.
When pre- or post-processing steps for a job occur, the Launcher runs your
scripts in the job's working directory with some job-specific environment
variables. If you choose to write these scripts in Python with ``launcher.dag``,
then these variables are automatically loaded for you on module import.
Useful module-level attributes
-------------------------------
``current_job``
The BalsamJob object which is currently being processed
``parents``
Parent BalsamJob objects on which current_job depends
``children``
Children BalsamJob objects that depend on current_job
``JOB_ID``
Unique identifier and primary key of the BalsamJob
``TIMEOUT``
Boolean flag indicating whether the current job has timed out while running
in the Launcher. If True, Balsam has invoked your script as a
timeout-handler, and the script should take some clean-up or rescue action
such as spawning a new job.
``ERROR``
Boolean flag indicating whether the current job's application returned a
nonzero exit code to the Launcher. If True, Balsam has invoked your
script as an error-handler, and the script should take some clean-up or
rescue action.
Usage example
--------------
A typical user's post-processing script might import and use the ``dag`` API as
follows::
import balsam.launcher.dag as dag
output = open('expected_output').read() # in job workdir
if 'CONVERGED' not in output:
# Kill subtree of this job
for child in dag.children:
dag.kill(child, recursive=True)
# Create a child clone job with increased walltime and new input
with open("input_rescue.dat", 'w') as fp:
fp.write("# a new input file here")
dag.spawn_child(clone=True,
walltime_minutes=dag.current_job.walltime_minutes + 10,
input_files = 'input_rescue.dat')
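A timeout- or error-handler can branch on the module-level ``TIMEOUT`` and
``ERROR`` flags described above. The exact rescue strategy is up to the user;
one illustrative sketch::

    import balsam.launcher.dag as dag

    if dag.TIMEOUT:
        # Ran out of walltime: resubmit an identical clone of this job
        dag.spawn_child(clone=True)
    elif dag.ERROR:
        # Application failed: prevent dependents from running on bad output
        for child in dag.children:
            dag.kill(child, recursive=True)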
'''
import django
import json
import os
import uuid
__all__ = ['JOB_ID', 'TIMEOUT', 'ERROR',
'current_job', 'parents', 'children',
'add_job', 'add_dependency', 'spawn_child',
'kill']
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
django.setup()
from balsam.service.models import BalsamJob, history_line
from django.conf import settings
current_job = None
parents = None
children = None
_envs = {k:v for k,v in os.environ.items() if k.find('BALSAM')>=0}
JOB_ID = _envs.get('BALSAM_JOB_ID', '')
TIMEOUT = _envs.get('BALSAM_JOB_TIMEOUT', False) == "TRUE"
ERROR = _envs.get('BALSAM_JOB_ERROR', False) == "TRUE"
if JOB_ID:
JOB_ID = uuid.UUID(JOB_ID)
try:
current_job = BalsamJob.objects.get(pk=JOB_ID)
except BalsamJob.DoesNotExist:
raise RuntimeError(f"The environment specified current job: "
f"BALSAM_JOB_ID {JOB_ID}\n but this does not "
f"exist in DB! Was it deleted accidentally?")
else:
parents = current_job.get_parents()
children = current_job.get_children()
def add_job(**kwargs):
'''Add a new job to the BalsamJob DB
Creates a new job and saves it to the database in CREATED state.
The job is initialized with all blank/default values for its fields; these
must be configured by the user or provided via ``kwargs``
Args:
- ``kwargs`` (*dict*): contains BalsamJob fields (keys) and their values to
be set on BalsamJob instantiation.
Returns:
- ``job`` (*BalsamJob*): the newly-created BalsamJob instance
Raises:
- ``ValueError``: if an invalid field name is provided to *kwargs*
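Usage example (the field values are illustrative; the field names are taken
from BalsamJob fields used elsewhere in this module)::

    job = dag.add_job(workflow='my_workflow',
                      wall_time_minutes=30,
                      num_ranks=1)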
'''
job = BalsamJob()
for k,v in kwargs.items():
try:
getattr(job, k)
except AttributeError:
raise ValueError(f"Invalid field {k}")
else:
setattr(job, k, v)
if 'allowed_work_sites' not in kwargs:
job.allowed_work_sites = settings.BALSAM_SITE
job.save()
return job
def detect_circular(job, path=[]):
'''Detect a circular dependency in DAG
Args:
- ``job`` (*BalsamJob*): node at which to start traversing the DAG
Returns:
- ``detected`` (*bool*): True if a circular dependency was detected
'''
if job.pk in path: return True
path = path[:] + [job.pk]
for parent in job.get_parents():
if detect_circular(parent, path): return True
return False
def add_dependency(parent,child):
'''Create a dependency between two existing jobs
Args:
- ``parent`` (*BalsamJob*): The job which must reach state JOB_FINISHED
before ``child`` begins processing
- ``child`` (*BalsamJob*): The job that is dependent on ``parent`` for
control- and/or data-flow.
Raises:
- ``RuntimeError``: if the attempted edge would create a circular
dependency in the BalsamJob DAG.
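Either BalsamJob instances or their primary keys (as strings or UUIDs) may be
passed::

    dag.add_dependency(parent=job_a, child=job_b)
    dag.add_dependency(str(job_a.pk), str(job_b.pk))  # equivalent, by primary key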
'''
if isinstance(parent, str): parent = uuid.UUID(parent)
if isinstance(child, str): child = uuid.UUID(child)
if not isinstance(parent, BalsamJob):
parent = BalsamJob.objects.get(pk=parent)
if not isinstance(child, BalsamJob):
child = BalsamJob.objects.get(pk=child)
existing_parents = child.get_parents_by_id()
new_parents = existing_parents.copy()
parent_pk_str = str(parent.pk)
if parent_pk_str in existing_parents:
raise RuntimeError("Dependency already exists; cannot double-create")
else:
new_parents.append(parent_pk_str)
child.set_parents(new_parents)
if detect_circular(child):
child.set_parents(existing_parents)
raise RuntimeError("Detected circular dependency; not creating link")
def spawn_child(clone=False, **kwargs):
'''Add a new job that is dependent on the current job
This function creates a new child job that will not start until the current
job is finished processing. The job is added to the BalsamJob database in
CREATED state.
Args:
- ``clone`` (*bool*): If *True*, all fields of the current BalsamJob are
copied into the child job (except for primary key and working
directory). Specific fields may then be overwritten via *kwargs*.
Defaults to *False*.
- ``kwargs`` (*dict*) : Contains BalsamJob field names as keys and their
desired values.
Returns:
- ``child`` (*BalsamJob*): the newly created BalsamJob instance
Raises:
- ``RuntimeError``: If no BalsamJob detected on module-load
- ``ValueError``: if an invalid field name is passed into *kwargs*
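For example, in a post-processing script::

    # exact copy, which runs after the current job finishes
    child = dag.spawn_child(clone=True)
    # clone, but override a field (num_ranks shown purely as an illustration)
    bigger = dag.spawn_child(clone=True, num_ranks=2)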
'''
if not isinstance(current_job, BalsamJob):
raise RuntimeError("No current BalsamJob detected in environment")
if 'workflow' not in kwargs:
kwargs['workflow'] = current_job.workflow
if 'allowed_work_sites' not in kwargs:
kwargs['allowed_work_sites'] = settings.BALSAM_SITE
child = BalsamJob()
new_pk = child.pk
exclude_fields = '_state version state_history job_id working_directory'.split()
fields = [f for f in current_job.__dict__ if f not in exclude_fields]
if clone:
for f in fields:
child.__dict__[f] = current_job.__dict__[f]
assert child.pk == new_pk
for k,v in kwargs.items():
if k in fields:
child.__dict__[k] = v
else:
raise ValueError(f"Invalid field {k}")
child.working_directory = '' # This is essential
newparents = json.loads(current_job.parents)
newparents.append(str(current_job.job_id))
child.parents = json.dumps(newparents)
child.state = "CREATED"
child.state_history = history_line("CREATED", f"spawned by {current_job.cute_id}")
child.save()
return child
def kill(job, recursive=True):
'''Kill a job or its entire subtree in the DAG
Mark a job (and optionally all jobs that depend on it) with the state
USER_KILLED, which prevents any further processing.
Args:
- ``job`` (*BalsamJob*): the job (or subtree root) to kill
- ``recursive`` (*bool*): if *True*, then traverse the DAG recursively
to kill all jobs in the subtree. Defaults to *True*
'''
job.update_state('USER_KILLED')
if recursive:
for child in job.get_children():
kill(child, recursive=True)
class BalsamLauncherError(Exception): pass
class BalsamRunnerError(Exception): pass
class ExceededMaxRunners(BalsamRunnerError): pass
class NoAvailableWorkers(BalsamRunnerError): pass
class BalsamTransitionError(Exception): pass
class TransitionNotFoundError(BalsamTransitionError, ValueError): pass
class MPIEnsembleError(Exception): pass
'''JobReaders are responsible for pulling relevant jobs from the Balsam database.
The base class provides a constructor that uses the command line arguments to
initialize the appropriate JobReader. It also contains a generic method for
filtering the BalsamJob database query (e.g. ignore jobs that are
finished, ignore jobs with Applications that cannot run locally)
'''
from collections import defaultdict
from django.conf import settings
from balsam.service import models
BalsamJob = models.BalsamJob
import logging
import uuid
logger = logging.getLogger(__name__)
class JobReader():
'''Define JobReader interface and provide generic constructor, filters'''
@staticmethod
def from_config(config):
'''Constructor: build a FileJobReader or WFJobReader from argparse
arguments
If a job file is given, a FileJobReader will be constructed to read only
BalsamJob primary keys from that file. Otherwise, a WFJobReader is
created.
Args:
- ``config``: command-line arguments *namespace* object returned by
argparse.ArgumentParser
Returns:
- ``JobReader`` instance
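Typical Launcher-side usage (``args`` is the namespace returned by the
launcher's ``get_args``)::

    job_source = JobReader.from_config(args)
    job_source.refresh_from_db()
    runnable = job_source.by_states['PREPROCESSED']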
'''
if config.job_file: return FileJobReader(config.job_file)
else: return WFJobReader(config.wf_name)
@property
def by_states(self):
'''A dictionary of jobs keyed by their state'''
result = defaultdict(list)
for job in self.jobs:
result[job.state].append(job)
return result
def refresh_from_db(self):
'''Reload and re-filter jobs from the database'''
jobs = self._get_jobs()
jobs = self._filter(jobs)
self.jobs = jobs
def _get_jobs(self): raise NotImplementedError
def _filter(self, job_queryset):
'''Filter out jobs that are done or cannot run locally'''
jobs = job_queryset.exclude(state__in=models.END_STATES)
jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
return jobs
class FileJobReader(JobReader):
'''Read a file of whitespace-delimited BalsamJob primary keys. Primarily
intended for use by the Metascheduler to execute specific workloads.'''
def __init__(self, job_file):
self.jobs = []
self.job_file = job_file
self.pk_list = None
logger.info(f"Taking jobs from file {self.job_file}")
def _get_jobs(self):
if self.pk_list is None:
pk_strings = open(self.job_file).read().split()
self.pk_list = [uuid.UUID(pk) for pk in pk_strings]
jobs = BalsamJob.objects.filter(job_id__in=self.pk_list)
return jobs
class WFJobReader(JobReader):
'''Consume all jobs from DB, optionally matching a Workflow name.
Will not consume jobs scheduled by the Metascheduler.'''
def __init__(self, wf_name):
self.jobs = []
self.wf_name = wf_name
if wf_name:
logger.info(f"Consuming jobs from workflow {wf_name}")
else:
logger.info("Consuming all jobs from local DB")
def _get_jobs(self):
objects = BalsamJob.objects
wf = self.wf_name
jobs = objects.filter(workflow=wf) if wf else objects.all()
jobs = jobs.filter(scheduler_id__exact='')
return jobs
'''Main Launcher entry point
The ``main()`` function contains the Launcher service loop, in which:
1. Transitions are checked for completion and job states are updated
2. Dependencies of waiting jobs are checked
3. New transitions are added to the transitions queue
4. The RunnerGroup is checked for jobs that have stopped execution
5. A new Runner is created according to logic in create_next_runner
The ``on_exit()`` handler is invoked either when the time limit is exceeded or
if the program receives a SIGTERM or SIGINT. This takes the necessary cleanup
actions and is guaranteed to execute only once through the HANDLING_EXIT global
flag.
'''
import argparse
from math import floor
import os
import sys
import signal
import time
import django
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
django.setup()
from django.conf import settings
import logging
logger = logging.getLogger('balsam.launcher')
logger.info("Loading Balsam Launcher")
from balsam.service.schedulers import Scheduler
from balsam.service.models import END_STATES
scheduler = Scheduler.scheduler_main
from balsam.launcher import jobreader
from balsam.launcher import transitions
from balsam.launcher import worker
from balsam.launcher import runners
from balsam.launcher.exceptions import *
ALMOST_RUNNABLE_STATES = ['READY','STAGED_IN']
RUNNABLE_STATES = ['PREPROCESSED', 'RESTART_READY']
WAITING_STATES = ['CREATED', 'LAUNCHER_QUEUED', 'AWAITING_PARENTS']
HANDLING_EXIT = False
def delay_generator(period=settings.BALSAM_SERVICE_PERIOD):
'''Generator: Block for ``period`` seconds since the last call to __next__()'''
nexttime = time.time() + period
while True:
now = time.time()
tosleep = nexttime - now
if tosleep <= 0:
nexttime = now + period
else:
time.sleep(tosleep)
nexttime = now + tosleep + period
yield
def elapsed_time_minutes():
'''Generator: yield elapsed time in minutes since first call to __next__'''
start = time.time()
while True:
yield (time.time() - start) / 60.0
def remaining_time_minutes(time_limit_minutes=0.0):
'''Generator: yield remaining time for Launcher execution
If ``time_limit_minutes`` is given, an internal timer counts down the
remaining time. Otherwise, the scheduler is queried for the remaining time.
Args:
- ``time_limit_minutes`` (*float*): runtime limit
Yields:
- ``remaining`` (*float*): remaining time
Raises:
- ``StopIteration``: when there is no time left
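The Launcher's main service loop simply iterates over this generator and
exits when it is exhausted::

    for remaining in remaining_time_minutes(args.time_limit_minutes):
        ...  # one pass of the service loop per yielded value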
'''
elapsed_timer = elapsed_time_minutes()
while True:
if time_limit_minutes > 0.0:
remaining = time_limit_minutes - next(elapsed_timer)
else:
remaining = scheduler.remaining_time_seconds() / 60.0
if remaining > 0: yield remaining
else: break
def check_parents(job, lock):
'''Check job's dependencies, update to READY if satisfied'''
job.refresh_from_db()
parents = job.get_parents()
ready = all(p.state == 'JOB_FINISHED' for p in parents)
if ready or not job.wait_for_parents:
lock.acquire()
job.update_state('READY', 'dependencies satisfied')
lock.release()
logger.info(f'{job.cute_id} ready')
elif job.state != 'AWAITING_PARENTS':
lock.acquire()
job.update_state('AWAITING_PARENTS', f'{len(parents)} parents')
lock.release()
logger.info(f'{job.cute_id} waiting for parents')
def log_time(minutes_left):
'''Pretty log of remaining time'''
if minutes_left > 1e12:
return
whole_minutes = floor(minutes_left)
whole_seconds = round((minutes_left - whole_minutes)*60)
time_str = f"{whole_minutes:02d} min : {whole_seconds:02d} sec remaining"
logger.info(time_str)
def create_runner(jobs, runner_group, worker_group, remaining_minutes, last_runner_created):
'''Decide whether or not to create another runner. Considers how many jobs
can run, how many can *almost* run, how long since the last Runner was
created, and how many jobs are serial as opposed to MPI.
'''
runnable_jobs = [
job for job in jobs
if job.pk not in runner_group.running_job_pks and
job.state in RUNNABLE_STATES and
job.wall_time_minutes <= remaining_minutes
]
logger.debug(f"Have {len(runnable_jobs)} runnable jobs")
# If nothing is getting pre-processed, don't bother waiting
almost_runnable = any(j.state in ALMOST_RUNNABLE_STATES for j in jobs)
# If it has been runner_create_period seconds, don't wait any more
runner_create_period = settings.BALSAM_RUNNER_CREATION_PERIOD_SEC
now = time.time()
runner_ready = bool(now - last_runner_created > runner_create_period)
# If there are enough serial jobs, don't wait to run
num_serial = len([j for j in runnable_jobs if j.num_ranks == 1])
worker = worker_group[0]
max_serial_per_ensemble = 2 * worker.num_nodes * worker.max_ranks_per_node
ensemble_ready = (num_serial >= max_serial_per_ensemble) or (num_serial == 0)
if runnable_jobs:
if runner_ready or not almost_runnable or ensemble_ready:
try:
runner_group.create_next_runner(runnable_jobs, worker_group)
except ExceededMaxRunners:
logger.info("Exceeded max concurrent runners; waiting")
except NoAvailableWorkers:
logger.info("Not enough idle workers to start any new runs")
else:
last_runner_created = now
return last_runner_created
def main(args, transition_pool, runner_group, job_source):
'''Main Launcher service loop'''
delay_sleeper = delay_generator()
last_runner_created = time.time()
remaining_timer = remaining_time_minutes(args.time_limit_minutes)
for remaining_minutes in remaining_timer:
logger.info("\n******************\n"
"BEGIN SERVICE LOOP\n"
"******************")
log_time(remaining_minutes)
delay = True
# Update after any finished transitions
for stat in transition_pool.get_statuses(): delay = False
job_source.refresh_from_db()
# Update jobs awaiting dependencies
waiting_jobs = (j for j in job_source.jobs if j.state in WAITING_STATES)
for job in waiting_jobs:
check_parents(job, transition_pool.lock)
# Enqueue new transitions
transitionable_jobs = [
job for job in job_source.jobs
if job not in transition_pool
and job.state in transitions.TRANSITIONS
]
for job in transitionable_jobs:
transition_pool.add_job(job)
delay = False
fxn = transitions.TRANSITIONS[job.state]
logger.info(f"Queued transition: {job.cute_id} will undergo {fxn}")
# Update jobs that are running/finished
any_finished = runner_group.update_and_remove_finished()
if any_finished: delay = False
job_source.refresh_from_db()
# Decide whether or not to start a new runner
last_runner_created = create_runner(job_source.jobs, runner_group,
worker_group, remaining_minutes,
last_runner_created)
if delay: next(delay_sleeper)
if all(j.state in END_STATES for j in job_source.jobs):
logger.info("No jobs to process. Exiting main loop now.")
break
def on_exit(runner_group, transition_pool, job_source):
'''Exit cleanup'''
global HANDLING_EXIT
if HANDLING_EXIT: return
HANDLING_EXIT = True
logger.debug("Entering on_exit cleanup function")
logger.debug("on_exit: update/remove/timeout jobs from runner group")
runner_group.update_and_remove_finished(timeout=True)
logger.debug("on_exit: send end message to transition threads")
transition_pool.end_and_wait()
logger.debug("on_exit: Launcher exit graceful\n\n")
sys.exit(0)
def get_args(inputcmd=None):
'''Parse command line arguments'''
parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--job-file', help="File of Balsam job IDs")
group.add_argument('--consume-all', action='store_true',
help="Continuously run all jobs from DB")
group.add_argument('--wf-name',
help="Continuously run jobs of specified workflow")
parser.add_argument('--num-workers', type=int, default=0,