Commit 6e7fb763 authored by Michael Salim's avatar Michael Salim
Browse files

added docstrings and sphinx documentation. added MPICH DefaultMPICommand class

parent 84e05b7e
......@@ -16,4 +16,5 @@ balsam/service/migrations/*.py
The Balsam Launcher is a pilot application that runs either directly on compute
nodes or very close to them (perhaps on a service node with ``mpirun`` access).
The Launcher is responsible for:
* **pulling jobs** from the Balsam database
* **stage-in and stage-out** transitions that include working
directory creation, remote and local file transfer, and the enabling the
requested flow of files from parent to child jobs transparently
* running custom **pre- and post-processing** scripts for each job
* invoking **job execution** on the appropriate resources
* **monitoring** job execution and providing resilient mechanisms to **handle
expected and unexpected** runtime errors or job timeouts
In normal Balsam usage, the Launcher is not invoked directly. Instead, multiple
Launcher instances with specific workloads are automatically scheduled for execution by the
Metascheduler. However, the Launcher can also simply be invoked from the
command line. For example, to consume all jobs from the database, use:
>>> $ balsam launcher --consume-all
'''Python API for Balsam DAG Manipulations
Example usage:
>>> import launcher.dag as dag
>>> output = open('expected_output').read()
>>> if 'CONVERGED' not in output:
>>> for child in dag.children:
>>> dag.kill(child, recursive=True)
>>> with open("data/input.dat", 'w') as fp:
>>> fp.write("# a new input file here")
>>> dag.spawn_child(clone=dag.current_job,
>>> walltime_minutes=dag.current_job.walltime_minutes + 10,
>>> input_files = 'input.dat')
''' API for BalsamJob database (DAG) Manipulations
The ``launcher.dag`` module provides a number of convenient environment
variables and functions that allow you to quickly write pre- and post-processing
scripts that interact with the BalsamJob database.
When pre- or post-processing steps for a job occur, the Launcher runs your
scripts in the job's working directory with some job-specific environment
variables. If you choose to write these scripts in Python with ``launcher.dag``,
then these variables are automatically loaded for you on module import.
Useful module-level attributes
The BalsamJob object which is currently being processed
Parent BalsamJob objects on which current_job depends
Children BalsamJob objects that depend on current_job
Unique identifier and primary key of the BalsamJob
Boolean flag indicating whether the current job has timed-out while running
in the Launcher. If True, Balsam has invoked the your script as a
timeout-handler, and the script should take some clean-up or rescue acction
such as spawning a new job.
Boolean flag indicating whether the current job's application returned a
nonzero exit code to the Launcher. If True, Balsam has invoked your
script as an error-handler, and the script should take some clean-up or
rescue action.
Usage example
A typical user's post-processing script might import and use the ``dag`` API as
import balsam.launcher.dag as dag
output = open('expected_output').read() # in job workdir
if 'CONVERGED' not in output:
# Kill subtree of this job
for child in dag.children:
dag.kill(child, recursive=True)
# Create a child clone job with increased walltime and new input
with open("input_rescue.dat", 'w') as fp:
fp.write("# a new input file here")
walltime_minutes=dag.current_job.walltime_minutes + 10,
input_files = 'input_rescue.dat')
import django
......@@ -56,7 +98,22 @@ if JOB_ID:
children = current_job.get_children()
def add_job(**kwargs):
'''Add a new job to BalsamJob DB'''
'''Add a new job to the BalsamJob DB
Creates a new job and saves it to the database in CREATED state.
The job is initialized with all blank/default values for its fields; these
must be configured by the user or provided via ``kwargs``
- ``kwargs`` (*dict*): contains BalsamJob fields (keys) and their values to
be set on BalsamJob instantiation.
- ``job`` (*BalsamJob*): the newly-created BalsamJob instance
- ``ValueError``: if an invalid field name is provided to *kwargs*
job = _BalsamJob()
for k,v in kwargs.items():
......@@ -71,6 +128,14 @@ def add_job(**kwargs):
return job
def detect_circular(job, path=[]):
'''Detect a circular dependency in DAG
- ``job`` (*BalsamJob*): node at which to start traversing the DAG
- ``detected`` (*bool*): True if a circular dependency was detected
if in path: return True
path = path[:] + []
for parent in job.get_parents():
......@@ -78,7 +143,18 @@ def detect_circular(job, path=[]):
return False
def add_dependency(parent,child):
'''Create a dependency between two existing jobs'''
'''Create a dependency between two existing jobs
- ``parent`` (*BalsamJob*): The job which must reach state JOB_FINISHED
before ``child`` begins processing
- ``child`` (*BalsamJob*): The job that is dependent on ``parent`` for
control- and/or data-flow.
- ``RuntimeError``: if the attempted edge would create a circular
dependency in the BalsamJob DAG.
if isinstance(parent, str): parent = uuid.UUID(parent)
if isinstance(child, str): child = uuid.UUID(child)
......@@ -100,9 +176,33 @@ def add_dependency(parent,child):
raise RuntimeError("Detected circular dependency; not creating link")
def spawn_child(clone=False, **kwargs):
'''Add new job that is dependent on the current job'''
'''Add a new job that is dependent on the current job
This function creates a new child job that will not start until the current
job is finished processing. The job is added to the BalsamJob database in
CREATED state.
- ``clone`` (*bool*): If *True*, all fields of the current BalsamJob are
copied into the child job (except for primary key and working
directory). Specific fields may then be overwritten via *kwargs*.
Defaults to *False*.
- ``kwargs`` (*dict*) : Contains BalsamJob field names as keys and their
desired values.
- ``child`` (BalsamJob): returns the newly created BalsamJob
- ``RuntimeError``: If no BalsamJob detected on module-load
- ``ValueError``: if an invalid field name is passed into *kwargs*
if not isinstance(current_job, _BalsamJob):
raise RuntimeError("No current BalsamJob detected in environment")
if 'workflow' not in kwargs:
kwargs['workflow'] = current_job.workflow
if clone:
child = _BalsamJob.objects.get( = None
......@@ -124,7 +224,16 @@ def spawn_child(clone=False, **kwargs):
return child
def kill(job, recursive=False):
'''Kill a job or its entire subtree in the DAG'''
'''Kill a job or its entire subtree in the DAG
Mark a job (and optionally all jobs that depend on it) by the state
USER_KILLED, which will prevent any further processing.
- ``job`` (*BalsamJob*): the job (or subtree root) to kill
- ``recursive`` (*bool*): if *True*, then traverse the DAG recursively
to kill all jobs in the subtree. Defaults to *False*
if recursive:
for child in job.get_children():
'''JobReaders are responsible for pulling relevant jobs from the Balsam database.
The base class provides a constructor that uses the command line arguments to
initialize the appropriate JobReader. It also contains a generic method for
filtering the Balsam Job database query (e.g. ignore jobs that are
finished, ignore jobs with Applications that cannot run locally)
from collections import defaultdict
from django.conf import settings
from balsam.service import models
......@@ -8,23 +14,35 @@ import uuid
logger = logging.getLogger(__name__)
class JobReader():
'''Interface with BalsamJob DB & pull relevant jobs'''
'''Define JobReader interface and provide generic constructor, filters'''
def from_config(config):
'''Constructor: build a FileJobReader or WFJobReader from argparse
If a job file is given, a FileJobReader will be constructed to read only
BalsamJob primary keys from that file. Otherwise, a WFJobReader is
- ``config``: command-line arguments *namespace* object returned by
- ``JobReader`` instance
if config.job_file: return FileJobReader(config.job_file)
else: return WFJobReader(config.wf_name)
def by_states(self):
'''dict of jobs keyed by state'''
'''A dictionary of jobs keyed by their state'''
result = defaultdict(list)
for job in
return result
def refresh_from_db(self):
'''caller invokes this to read from DB'''
'''Reload and re-filter jobs from the database'''
jobs = self._get_jobs()
jobs = self._filter(jobs) = jobs
......@@ -32,13 +50,15 @@ class JobReader():
def _get_jobs(self): raise NotImplementedError
def _filter(self, job_queryset):
'''Filter out jobs that are done or cannot run locally'''
jobs = job_queryset.exclude(state__in=models.END_STATES)
jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
return jobs
class FileJobReader(JobReader):
'''Limit to job PKs specified in a file. Used by metascheduler.'''
'''Read a file of whitespace delimited BalsamJob primary keys. Primarily
intended for use by the Metascheduler to execute specific workloads.'''
def __init__(self, job_file): = []
self.job_file = job_file
'''The Launcher is either invoked by the user, who bypasses the Balsam
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler'''
'''Main Launcher entry point
The ``main()`` function contains the Launcher service loop, in which:
1. Transitions are checked for completion and jobs states are updated
2. Dependencies of waiting jobs are checked
3. New transitions are added to the transitions queue
4. The RunnerGroup is checked for jobs that have stopped execution
5. A new Runner is created according to logic in create_next_runner
The ``on_exit()`` handler is invoked either when the time limit is exceeded or
if the program receives a SIGTERM or SIGINT. This takes the necessary cleanup
actions and is guaranteed to execute only once through the HANDLING_EXIT global
import argparse
from math import floor
import os
def delay_generator(period=settings.BALSAM_SERVICE_PERIOD):
'''Generator: Block for ``period`` seconds since the last call to __next__()'''
nexttime = time.time() + period
while True:
now = time.time()
......@@ -44,11 +56,24 @@ def delay_generator(period=settings.BALSAM_SERVICE_PERIOD):
def elapsed_time_minutes():
'''Generator: yield elapsed time in minutes since first call to __next__'''
start = time.time()
while True:
yield (time.time() - start) / 60.0
def remaining_time_minutes(time_limit_minutes=0.0):
'''Generator: yield remaining time for Launcher execution
If time_limit_minutes is given, use internal timer to count down remaining
time. Otherwise, query scheduler for remaining time.
- ``time_limit_minutes`` (*float*): runtime limit
- ``remaining`` (*float*): remaining time
- ``StopIteration``: when there is no time left
elapsed_timer = elapsed_time_minutes()
while True:
if time_limit_minutes > 0.0:
......@@ -59,6 +84,7 @@ def remaining_time_minutes(time_limit_minutes=0.0):
else: break
def check_parents(job, lock):
'''Check job's dependencies, update to READY if satisfied'''
parents = job.get_parents()
ready = all(p.state == 'JOB_FINISHED' for p in parents)
......@@ -74,6 +100,7 @@ def check_parents(job, lock):'{job.cute_id} waiting for parents')
def log_time(minutes_left):
'''Pretty log of remaining time'''
if minutes_left > 1e12:
whole_minutes = floor(minutes_left)
......@@ -82,6 +109,10 @@ def log_time(minutes_left):
def create_runner(jobs, runner_group, worker_group, remaining_minutes, last_runner_created):
'''Decide whether or not to create another runner. Considers how many jobs
can run, how many can *almost* run, how long since the last Runner was
created, and how many jobs are serial as opposed to MPI.
runnable_jobs = [
job for job in jobs
if not in runner_group.running_job_pks and
......@@ -117,7 +148,7 @@ def create_runner(jobs, runner_group, worker_group, remaining_minutes, last_runn
return last_runner_created
def main(args, transition_pool, runner_group, job_source):
'''Main Launcher service loop'''
delay_sleeper = delay_generator()
last_runner_created = time.time()
remaining_timer = remaining_time_minutes(args.time_limit_minutes)
......@@ -165,6 +196,7 @@ def main(args, transition_pool, runner_group, job_source):
if delay: next(delay_sleeper)
def on_exit(runner_group, transition_pool, job_source):
'''Exit cleanup'''
if HANDLING_EXIT: return
......@@ -180,6 +212,7 @@ def on_exit(runner_group, transition_pool, job_source):
def get_args(inputcmd=None):
'''Parse command line arguments'''
parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--job-file', help="File of Balsam job IDs")
......@@ -201,6 +234,8 @@ def get_args(inputcmd=None):
return parser.parse_args()
def detect_dead_runners(job_source):
'''Jobs found in the RUNNING state before the Launcher starts may have
crashed; pick them up and restart'''
for job in job_source.by_states['RUNNING']:'Picked up dead running job {job.cute_id}: marking RESTART_READY')
job.update_state('RESTART_READY', 'Detected dead runner')
'''The MPICommand subclasses express a template for system-specfic MPI calls. If
the Launcher detects a specific host type, the appropriate MPICommand class is
automatically used. Otherwise, the DEFAULTMPICommand class is assigned at
module-load time, testing for MPICH or OpenMPI'''
import subprocess
import logging
logger = logging.getLogger(__name__)
class MPICommand(object):
'''Base Class for creating ``mpirun`` command lines.
System-specific commands are generated by subclasses that specify the
command and argument names. MPICommand instances are callable; the relevant
parameters are passed in as arguments and an MPI command is generated from a
template and returned'''
def __init__(self):
self.mpi = ''
self.nproc = ''
......@@ -109,6 +120,20 @@ class CRAYMPICommand(MPICommand):
return ""
return f"-L {','.join(str( for worker in workers)}"
class MPICHCommand(MPICommand):
def __init__(self):
# 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1
self.mpi = 'mpirun'
self.nproc = '-n'
self.ppn = '--ppn'
self.env = '--env'
self.cpu_binding = None
self.threads_per_rank = None
self.threads_per_core = None
def worker_str(self, workers):
return ""
class COOLEYMPICommand(MPICommand):
def __init__(self):
# 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1
......@@ -130,7 +155,7 @@ try_mpich = subprocess.Popen(['mpirun', '-npernode'], stdout=subprocess.PIPE,
stdout, _ = try_mpich.communicate()
if 'unrecognized argument npernode' in stdout.decode():
logger.debug("Assuming MPICH")
logger.debug("Assuming OpenMPI")
'''mpi4py wrapper that allows an ensemble of serial applications to run in
parallel across ranks on the computing resource'''
from collections import namedtuple
import os
import sys
'''A Runner is constructed with a list of jobs and a list of idle workers. It
creates and monitors the execution subprocess, updating job states in the DB as
necessary. RunnerGroup has a collection of Runner objects, logic for creating
the next Runner (i.e. assigning jobs to nodes), and the public interface to
monitor runners'''
# TODO: "balsam qsub" is misleading because you can't qsub a script that calls
# mpirun. Implement a --mode script option that uses a "ScriptRunner". The
# ScriptRunner should parse the script to make sure it is not using more nodes
# than requested by job; and perhaps modify each mpirun with the correct
# workers_string
The actual execution of BalsamJob applications occurs in **Runners** which
delegate a list of **BalsamJobs** to one or more **Workers**, which are an
abstraction for the local computing resources. Each **Runner** subclass hides
system-dependent MPI details and executes jobs in a particular mode, such as:
* ``MPIRunner``: a single multi-node job
* ``MPIEnsembleRunner``: several serial jobs packaged into one ensemble
.. note::
The current command line tool ``balsam qsub`` is misleading because users
can't qsub a script that calls mpirun. We must implement a ``--mode
script`` option that results in jobs which are handled by a
``ScriptRunner``. The ScriptRunner should parse the script to make sure it
is not using more nodes than requested by the job; and perhaps identify and
translate each ``mpirun`` in the script to the correct, local
system-specific command.
import functools
from math import ceil
......@@ -57,8 +64,28 @@ class MonitorStream(Thread):
class Runner:
'''Spawns ONE subprocess to run specified job(s) and monitor their execution'''
'''Runner Base Class: constructor and methods for starting application
Each Runner instance manages one Python ``subprocess`` and is destroyed when
the subprocess is no longer tracked. The Runner may manage and monitor the
execution of one or many BalsamJobs.
def __init__(self, job_list, worker_list):
'''Instantiate a new Runner instance.
This method should be extended by concrete Runner subclasses.
A Runner is constructed with a list of BalsamJobs and a list of idle workers.
After the ``__init__`` method is finished, the instance ``popen_args``
attribute should contain all the arguments to ``subprocess.Popen`` that
are necessary to start the Runner.
- ``job_list`` (*list*): A list of *BalsamJobs* to be executed
- ``worker_list``: A list of *Workers* that will run the BalsamJob
host_type = worker_list[0].host_type
assert all(w.host_type == host_type for w in worker_list)
self.worker_list = worker_list
......@@ -72,21 +99,39 @@ class Runner:
self.popen_args = {}
def start(self):
'''Start the Runner subprocess.
If the Popen stdout argument is PIPE, then a separate MonitorStream
thread is started, which is used to check the output of the Runner
subprocess in a non-blocking fashion.
self.process = Popen(**self.popen_args)
if self.popen_args['stdout'] == PIPE:
self.monitor = MonitorStream(self.process.stdout)
def update_jobs(self, timeout=False):
'''Monitor the execution subprocess and update job states in the DB
-``timeout`` (*bool*): If *True*, then jobs that are currently in
state RUNNING under this Runner will be timed-out and marked in
state RUN_TIMEOUT. Defaults to *False*.
-``finished`` (*bool*): If *True*, then the managing ``RunnerGroup``
will assume this Runner is finished and remove it
raise NotImplementedError
def finished(self):
'''Return *True* if the Runner subprocess has finished'''
return self.process.poll() is not None
class MPIRunner(Runner):
'''One subprocess, one job'''
'''Manage one mpirun subprocess for one multi-node job'''
def __init__(self, job_list, worker_list):
'''Create the command line for mpirun'''
super().__init__(job_list, worker_list)
if len( != 1:
raise BalsamRunnerError('MPIRunner must take exactly 1 job')
......@@ -118,6 +163,7 @@ class MPIRunner(Runner):"MPIRunner: writing output to {outname}")
def update_jobs(self, timeout=False):
'''Update the job state and return finished flag'''
job =[0]
#job.refresh_from_db() # TODO: handle RecordModified
retcode = self.process.poll()
......@@ -148,9 +194,15 @@ class MPIRunner(Runner):
class MPIEnsembleRunner(Runner):
'''One subprocess: an ensemble of serial jobs run in an mpi4py wrapper'''
'''Manage an ensemble of serial (non-MPI) jobs in one MPI subprocess
Invokes the script, where the jobs are run in parallel
across workers
def __init__(self, job_list, worker_list):
'''Create an mpi_ensemble file with jobs passed to the master-worker
super().__init__(job_list, worker_list)
root_dir = Path([0].working_directory).parent
......@@ -185,7 +237,7 @@ class MPIEnsembleRunner(Runner):
self.ensemble_filename = ensemble_filename
def update_jobs(self, timeout=False):
'''Relies on stdout of'''
'''Update serial job states, according to the stdout of'''
logger.debug("Checking mpi_ensemble stdout for status updates...")
for line in self.monitor.available_lines():
......@@ -214,6 +266,9 @@ class MPIEnsembleRunner(Runner):
return finished
class RunnerGroup:
'''Iterable collection of Runner objects with logic for creating
the next Runner (i.e. assigning jobs to nodes), and the public interface to
monitor runners'''
def __init__(self, lock):
......@@ -224,10 +279,24 @@ class RunnerGroup:
return iter(self.runners)
def create_next_runner(self, runnable_jobs, workers):
'''Implements one particular strategy for choosing the next job, assuming
all jobs are either single-process or MPI-parallel. Will return the serial
ensemble job or single MPI job that occupies the largest possible number of
idle nodes'''
'''Create the next Runner object, add it to the RunnerGroup collection,
and start the Runner subprocess.
This method implements one particular strategy for choosing the next
job, assuming all jobs are either single-process or MPI-parallel. Will
return the serial ensemble job or single MPI job that occupies the
largest possible number of idle nodes.
- ``runnable_jobs``: list of ``BalsamJob`` objects that are ready to run
- ``workers``: iterable container of all ``Worker`` instances; idle
workers may be assigned to a Runner
- ExceededMaxRunners: if the number of current Runners (and thereby
background mpirun processes) goes over the user-configured threshold
- NoAvailableWorkers: if no workers are idle
if len(self.runners) == self.MAX_CONCURRENT_RUNNERS:
raise ExceededMaxRunners(
......@@ -298,6 +367,19 @@ class RunnerGroup:
logger.debug(f"Using workers: {[ for w in assigned_workers]}")