Commit 44967bc1 authored by Michael Salim's avatar Michael Salim
Browse files

DAG functional tests

parent 47e44147
......@@ -75,6 +75,22 @@ def create_new_runners(jobs, runner_group, worker_group):
runnable_jobs = get_runnable_jobs(jobs, running_pks)
return created_one
def check_parents(job, lock):
    """Promote *job* to READY once every parent job has finished.

    If any parent is unfinished, park the job in AWAITING_PARENTS (the
    state is set only once, to avoid redundant DB writes on every pass).

    Args:
        job: a BalsamJob whose dependencies should be checked.
        lock: shared lock serializing DB state updates across processes.
    """
    parents = job.get_parents()
    ready = all(p.state == 'JOB_FINISHED' for p in parents)
    if ready:
        # try/finally guarantees the shared lock is released even if
        # update_state raises (e.g. on a transient DB error); otherwise
        # every other worker would deadlock on the held lock.
        lock.acquire()
        try:
            job.update_state('READY', 'dependencies satisfied')
        finally:
            lock.release()
        logger.info(f'{job.cute_id} ready')
    elif job.state != 'AWAITING_PARENTS':
        lock.acquire()
        try:
            job.update_state('AWAITING_PARENTS', f'{len(parents)} parents')
        finally:
            lock.release()
        logger.info(f'{job.cute_id} waiting for parents')
def main(args, transition_pool, runner_group, job_source):
delay_timer = delay()
elapsed_min = elapsed_time_minutes()
......@@ -88,11 +104,12 @@ def main(args, transition_pool, runner_group, job_source):
"BEGIN SERVICE LOOP\n"
"******************")
wait = True
for stat in transition_pool.get_statuses():
wait = False
for stat in transition_pool.get_statuses(): wait = False
job_source.refresh_from_db()
waiting_jobs = (j for j in job_source.jobs if
j.state in 'CREATED AWAITING_PARENTS LAUNCHER_QUEUED')
for job in waiting_jobs: check_parents(job, transition_pool.lock)
transitionable_jobs = [
job for job in job_source.jobs
......@@ -115,8 +132,8 @@ def on_exit(runner_group, transition_pool, job_source):
transition_pool.flush_job_queue()
runner_group.update_and_remove_finished()
logger.debug("Timing out runner processes")
for runner in runner_group:
logger.debug("Timing out runner processes")
runner.timeout()
transition_pool.end_and_wait()
......
......@@ -3,6 +3,8 @@ import os
import sys
import logging
import django
import signal
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
logger = logging.getLogger('balsamlauncher.mpi_ensemble')
......@@ -17,6 +19,16 @@ from balsamlauncher.exceptions import *
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
def on_exit():
    """Gracefully shut down this MPI rank when an interrupt signal arrives."""
    logger.debug("mpi_ensemble received interrupt: quitting now")
    # Finalize MPI before exiting so the rank leaves the communicator cleanly
    MPI.Finalize()
    sys.exit(0)

# Route the common termination signals through the graceful-exit path
handler = lambda a,b: on_exit()
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGHUP, handler)
Job = namedtuple('Job', ['id', 'workdir', 'cmd'])
......
......@@ -6,6 +6,7 @@ import queue
import os
from io import StringIO
from traceback import print_exc
import signal
import sys
import subprocess
import tempfile
......@@ -32,9 +33,11 @@ logger = logging.getLogger('balsamlauncher.transitions')
# Balsam, because it's built in and requires zero user configuration
if sys.platform.startswith('darwin'):
LockClass = multiprocessing.Lock
logger.debug('Using real multiprocessing.Lock')
elif sys.platform.startswith('win32'):
LockClass = multiprocessing.Lock
else:
logger.debug('Using dummy lock')
class DummyLock:
    """No-op stand-in for multiprocessing.Lock on platforms that use it.

    Mirrors the Lock interface (``acquire(block, timeout)`` returning True,
    ``release()``, and context-manager protocol) so it can be dropped in
    anywhere a real lock is expected.
    """
    def acquire(self, block=True, timeout=None):
        # A real Lock.acquire returns True on success; mimic that so
        # callers that check the return value behave the same.
        return True

    def release(self):
        pass

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc_info):
        self.release()
        return False
......@@ -47,14 +50,24 @@ SITE = settings.BALSAM_SITE
StatusMsg = namedtuple('StatusMsg', ['pk', 'state', 'msg'])
JobMsg = namedtuple('JobMsg', ['job', 'transition_function'])
def on_exit(lock):
    """Signal handler body for transition worker processes: log and return.

    NOTE(review): this is registered for both SIGINT and SIGTERM, but the
    log message mentions only SIGTERM — confirm intent.
    """
    logger.debug("Transition thread caught SIGTERM")
    # Releasing the held lock on exit was considered but left disabled:
    #try: lock.release()
    #except ValueError: pass
def main(job_queue, status_queue, lock):
handler = lambda a,b: on_exit(lock)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
while True:
logger.debug("Transition process waiting for job")
job_msg = job_queue.get()
job, transition_function = job_msg
if job == 'end': return
if job == 'end':
logger.debug("Received end..quitting transition loop")
return
if job.work_site != SITE:
job.work_site = SITE
lock.acquire()
......@@ -144,23 +157,6 @@ class TransitionProcessPool:
logger.info("All Transition processes joined: done.")
self.transitions_pk_list = []
def check_parents(job, lock):
    """Advance *job* to READY when all its parents report JOB_FINISHED.

    Otherwise move it (once) into AWAITING_PARENTS so it is not
    re-transitioned on every service-loop pass.

    Args:
        job: a BalsamJob to check.
        lock: shared lock serializing DB state updates across processes.
    """
    logger.debug(f'{job.cute_id} in check_parents')
    parents = job.get_parents()
    ready = all(p.state == 'JOB_FINISHED' for p in parents)
    if ready:
        # try/finally so the shared lock cannot leak if update_state
        # raises — a leaked lock would stall every transition worker.
        lock.acquire()
        try:
            job.update_state('READY', 'dependencies satisfied')
        finally:
            lock.release()
        logger.info(f'{job.cute_id} ready')
    elif job.state != 'AWAITING_PARENTS':
        lock.acquire()
        try:
            job.update_state('AWAITING_PARENTS', f'{len(parents)} pending jobs')
        finally:
            lock.release()
        logger.info(f'{job.cute_id} waiting for parents')
def stage_in(job, lock):
# Create workdirs for jobs: use job.create_working_path
logger.debug(f'{job.cute_id} in stage_in')
......@@ -194,19 +190,22 @@ def stage_in(job, lock):
parent_dir = parent.working_directory
for pattern in input_patterns:
path = os.path.join(parent_dir, pattern)
matches.extend((parent.pk, glob.glob(path)))
matches.extend((parent.pk,match)
for match in glob.glob(path))
for parent_pk, inp_file in matches:
basename = os.path.basename(inp_file)
newpath = os.path.join(work_dir, basename)
if os.path.exists(newpath): newpath += f"_{str(parent_pk)[:8]}"
new_path = os.path.join(work_dir, basename)
if os.path.exists(new_path): new_path += f"_{str(parent_pk)[:8]}"
# pointing to src, named dst
logger.info(f"{job.cute_id} {newpath} --> {inp_file}")
logger.info(f"{job.cute_id} {new_path} --> {inp_file}")
try:
os.symlink(src=inp_file, dst=newpath)
os.symlink(src=inp_file, dst=new_path)
except Exception as e:
raise BalsamTransitionError(
f"Exception received during symlink: {e}") from e
lock.acquire()
job.update_state('STAGED_IN')
lock.release()
......@@ -273,7 +272,7 @@ def preprocess(job, lock):
lock.release()
logger.info(f"{job.cute_id} no preprocess: skipped")
return
if not all(os.path.exists(p) for p in preproc_app.split()):
if not os.path.exists(preproc_app.split()[0]):
#TODO: look for preproc in the EXE directories
message = f"Preprocessor {preproc_app} does not exist on filesystem"
raise BalsamTransitionError(message)
......@@ -288,10 +287,12 @@ def preprocess(job, lock):
try:
args = preproc_app.split()
logger.info(f"{job.cute_id} preprocess Popen {args}")
lock.acquire()
proc = subprocess.Popen(args, stdout=fp,
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=PREPROCESS_TIMEOUT_SECONDS)
lock.release()
except Exception as e:
message = f"Preprocess failed: {e}"
proc.kill()
......@@ -347,7 +348,7 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
logger.info(f'{job.cute_id} no postprocess: skipped')
return
if not all(os.path.exists(p) for p in postproc_app.split()):
if not os.path.exists(postproc_app.split()[0]):
#TODO: look for postproc in the EXE directories
message = f"Postprocessor {postproc_app} does not exist on filesystem"
raise BalsamTransitionError(message)
......@@ -365,10 +366,12 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
try:
args = postproc_app.split()
logger.info(f"{job.cute_id} postprocess Popen {args}")
lock.acquire()
proc = subprocess.Popen(args, stdout=fp,
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
lock.release()
except Exception as e:
message = f"Postprocess failed: {e}"
proc.kill()
......@@ -421,9 +424,6 @@ def handle_run_error(job, lock):
TRANSITIONS = {
'CREATED': check_parents,
'LAUNCHER_QUEUED': check_parents,
'AWAITING_PARENTS': check_parents,
'READY': stage_in,
'STAGED_IN': preprocess,
'RUN_DONE': postprocess,
......
......@@ -47,7 +47,7 @@ def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE,
ranks_per_node=1, args='', workflow='', envs={}, state='CREATED',
url_in='', input_files='', url_out='', stage_out_files='',
post_error_handler=False, post_timeout_handler=False,
auto_timeout_retry=True):
auto_timeout_retry=True, preproc='', postproc=''):
if app and direct_command:
raise ValueError("Cannot have both application and direct command")
......@@ -75,6 +75,9 @@ def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE,
job.post_error_handler = post_error_handler
job.post_timeout_handler = post_timeout_handler
job.auto_timeout_retry = auto_timeout_retry
job.preprocess = preproc
job.postprocess = postproc
job.save()
job.create_working_path()
......
......@@ -15,26 +15,38 @@ from tests.BalsamTestCase import create_job, create_app
BALSAM_TEST_DIR = os.environ['BALSAM_TEST_DIRECTORY']
def run_launcher_until(function):
launcher_proc = subprocess.Popen(['balsam', 'launcher', '--consume'],
def kill_stragglers():
    """Force-kill any leftover launcher processes from a previous test.

    Greps `ps aux` for python launcher processes running with --consume
    and sends each one SIGKILL. Best-effort: processes that have already
    exited are ignored.
    """
    grep = subprocess.Popen('ps aux | grep launcher | grep consume',
                            stdout=subprocess.PIPE, shell=True)
    stdout, _ = grep.communicate()
    stdout = stdout.decode('utf-8')
    # ps aux format: column 1 is the PID
    pids = [int(line.split()[1]) for line in stdout.split('\n') if 'python' in line]
    for pid in pids:
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # Process already gone (or not ours to kill): nothing to do.
            # A bare `except` here would also swallow KeyboardInterrupt.
            pass
def run_launcher_until(function, period=1.0, timeout=20.0):
    """Run the balsam launcher until `function()` returns True or timeout.

    Args:
        function: zero-arg callable polled every `period` seconds.
        period: polling interval in seconds.
        timeout: maximum seconds to wait for `function` to become True.

    Returns:
        True if `function` returned True before the timeout, else False.
    """
    launcher_proc = subprocess.Popen(['balsam', 'launcher', '--consume',
                                      '--max-ranks-per-node', '8'],
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.STDOUT,
                                     preexec_fn=os.setsid)
    success = poll_until_returns_true(function, period=period, timeout=timeout)
    # WEIRDEST BUG IN TESTING IF YOU OMIT THE FOLLOWING STATEMENT!
    # launcher_proc.terminate() doesn't work; the process keeps on running and
    # then you have two launchers from different test cases processing the same
    # job...Very hard to catch bug.
    os.killpg(os.getpgid(launcher_proc.pid), signal.SIGTERM) # Send the signal to all the process groups
    try:
        launcher_proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # SIGTERM was ignored; escalate so the cleanup below still runs
        # instead of letting TimeoutExpired abort the whole test case.
        os.killpg(os.getpgid(launcher_proc.pid), signal.SIGKILL)
        launcher_proc.wait(timeout=5)
    kill_stragglers()
    return success
def run_launcher_until_state(job, state):
def run_launcher_until_state(job, state, period=1.0, timeout=20.0):
    """Run the launcher until `job` reaches `state` (or timeout expires).

    Returns True if the target state was reached in time, else False.
    """
    def reached_target():
        # Refresh from the DB each poll: the launcher is a separate process
        job.refresh_from_db()
        return job.state == state
    return run_launcher_until(reached_target, period=period, timeout=timeout)
class TestSingleJobTransitions(BalsamTestCase):
......@@ -297,7 +309,7 @@ class TestSingleJobTransitions(BalsamTestCase):
args='side0.dat --sleep 5',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}',
post_timeout_handler=True)
auto_timeout_retry=False)
# Job reaches the RUNNING state and then times out
success = run_launcher_until_state(job, 'RUNNING')
......@@ -316,3 +328,149 @@ class TestSingleJobTransitions(BalsamTestCase):
# But without timeout handling, it fails
success = run_launcher_until_state(job, 'FAILED')
self.assertTrue(success)
class TestDAG(BalsamTestCase):
    # Functional tests of parent/child job DAGs: data flow from parents to
    # children, plus error- and timeout-handling across a whole DAG.

    def setUp(self):
        """Register the three test applications (make_sides, square, reduce),
        each invoked via the current interpreter with pre/post scripts."""
        aliases = "make_sides square reduce".split()
        self.apps = {}
        for name in aliases:
            interpreter = sys.executable
            # find_spec(...).origin gives the absolute path of each script
            exe_path = interpreter + " " + find_spec(f'tests.ft_apps.{name}').origin
            pre_path = interpreter + " " + find_spec(f'tests.ft_apps.{name}_pre').origin
            post_path = interpreter + " " + find_spec(f'tests.ft_apps.{name}_post').origin
            app = create_app(name=name, executable=exe_path, preproc=pre_path,
                             postproc=post_path)
            self.apps[name] = app

    def test_dag_error_timeout(self):
        '''test error/timeout handling mechanisms'''
        from itertools import product

        # Build every (parent, childA, childB) combination of
        # normal / timeout / fail behavior: 3**3 = 27 triplets
        states = 'normal timeout fail'.split()
        triplets = product(states, repeat=3)

        NUM_SIDES, NUM_RANKS = 2, random.randint(1,2)
        pre = self.apps['make_sides'].default_preprocess + f' {NUM_SIDES} {NUM_RANKS}'

        # Template jobs for each behavior; cloned per-triplet below.
        # --sleep forces a timeout; --retcode 1 forces a failure.
        parent_types = {
            'normal': create_job(name='make_sides', app='make_sides',
                                 preproc=pre, args='',
                                 post_error_handler=True,
                                 post_timeout_handler=True),
            'timeout': create_job(name='make_sides', app='make_sides',
                                  preproc=pre, args='--sleep 2',
                                  post_error_handler=True,
                                  post_timeout_handler=True),
            'fail': create_job(name='make_sides', app='make_sides',
                               preproc=pre, args='--retcode 1',
                               post_error_handler=True,
                               post_timeout_handler=True),
        }
        child_types = {
            'normal': create_job(name='square', app='square', args='',
                                 post_error_handler=True,
                                 post_timeout_handler=True),
            'timeout': create_job(name='square', app='square', args='--sleep 2',
                                  post_error_handler=True,
                                  post_timeout_handler=True),
            'fail': create_job(name='square', app='square', args='--retcode 1',
                               post_error_handler=True,
                               post_timeout_handler=True),
        }
        job_triplets = {}
        for triplet in triplets:
            parent, childA, childB = triplet

            # Clone the templates: clearing pk and saving makes Django
            # insert a fresh row for each DAG instance
            jobP = BalsamJob.objects.get(pk=parent_types[parent].pk)
            jobA = BalsamJob.objects.get(pk=child_types[childA].pk)
            jobB = BalsamJob.objects.get(pk=child_types[childB].pk)
            jobP.pk, jobA.pk, jobB.pk = None,None,None
            jobP.save()

            # Each child consumes one of the parent's side files
            jobA.application_args += " side0.dat"
            jobA.input_files += "side0.dat"
            jobA.save()
            jobA.set_parents([jobP])

            jobB.application_args += " side1.dat"
            jobB.input_files += "side1.dat"
            jobB.save()
            jobB.set_parents([jobP])

            job_triplets[triplet] = (jobP, jobA, jobB)

        # Remove the templates; only the 27 * 3 = 81 cloned jobs remain
        for j in parent_types.values(): j.delete()
        for j in child_types.values(): j.delete()
        del parent_types, child_types
        self.assertEqual(BalsamJob.objects.all().count(), 81)

        # Run the entire DAG until finished
        # Two fixed 20-second launcher passes let timeouts/retries play out,
        # then poll until every job reaches JOB_FINISHED
        now = time.time()
        run_launcher_until(lambda: time.time() - now >= 20)
        now = time.time()
        run_launcher_until(lambda: time.time() - now >= 20)

        def check():
            for job in BalsamJob.objects.all():
                job.refresh_from_db()
            return all(j.state == 'JOB_FINISHED' for j in BalsamJob.objects.all())

        success = run_launcher_until(check, timeout=180.0)
        self.assertTrue(success)

    def test_static(self):
        '''test normal processing of a pre-defined DAG'''
        NUM_SIDES, NUM_RANKS = 3, 2
        pre = self.apps['make_sides'].default_preprocess + f' {NUM_SIDES} {NUM_RANKS}'
        parent = create_job(name='make_sides', app='make_sides',
                            preproc=pre)

        # Each side length is mapped to a square area in a set of mapping jobs.
        # These 3 "square_jobs" all have the same parent make_sides, but each
        # takes a different input file
        square_jobs = {
            i : create_job(
                name=f'square{i}', app='square',
                args=f'side{i}.dat', input_files=f'side{i}.dat'
            )
            for i in range(NUM_SIDES)
        }
        for job in square_jobs.values():
            job.set_parents([parent])

        # The final reduce job depends on all the square jobs: all square.dat
        # files will be staged in and final results staged out to a remote
        # directory
        remote_dir = tempfile.TemporaryDirectory(prefix="remote")
        reduce_job = create_job(name='reduce', app='reduce',
                                input_files="square*.dat*",
                                url_out=f'local:{remote_dir.name}',
                                stage_out_files='summary*.dat reduce.out'
                                )
        reduce_job.set_parents(square_jobs.values())

        # Run the entire DAG until finished
        success = run_launcher_until_state(reduce_job, 'JOB_FINISHED',
                                           timeout=180.0)
        self.assertTrue(success)
        for job in (parent, *square_jobs.values(), reduce_job):
            job.refresh_from_db()
            self.assertEqual(job.state, 'JOB_FINISHED')

        # Double-check the calculation result; thereby testing flow of data
        workdir = parent.working_directory
        files = (os.path.join(workdir, f"side{i}.dat") for i in range(NUM_SIDES))
        sides = [float(open(f).read()) for f in files]
        self.assertTrue(all(0.5 <= s <= 5.0 for s in sides))
        expected_result = sum(s**2 for s in sides)

        resultpath = os.path.join(remote_dir.name, 'reduce.out')
        result = open(resultpath).read()
        self.assertIn('Total area:', result)
        result = float(result.split()[-1])
        self.assertAlmostEqual(result, expected_result)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment