Commit 3a181b8f authored by Michael Salim's avatar Michael Salim
Browse files

DAG functional tests; bugfixes in launcher and runners. mpi_ensemble reads...

DAG functional tests; bugfixes in launcher and runners. mpi_ensemble reads envs directly from database
parent 44967bc1
......@@ -77,6 +77,7 @@ def create_new_runners(jobs, runner_group, worker_group):
def check_parents(job, lock):
job.refresh_from_db()
parents = job.get_parents()
ready = all(p.state == 'JOB_FINISHED' for p in parents)
if ready:
......@@ -108,7 +109,8 @@ def main(args, transition_pool, runner_group, job_source):
job_source.refresh_from_db()
waiting_jobs = (j for j in job_source.jobs if
j.state in 'CREATED AWAITING_PARENTS LAUNCHER_QUEUED')
j.state in
'CREATED AWAITING_PARENTS LAUNCHER_QUEUED'.split())
for job in waiting_jobs: check_parents(job, transition_pool.lock)
transitionable_jobs = [
......@@ -133,8 +135,10 @@ def on_exit(runner_group, transition_pool, job_source):
runner_group.update_and_remove_finished()
logger.debug("Timing out runner processes")
transition_pool.lock.acquire()
for runner in runner_group:
runner.timeout()
transition_pool.lock.release()
transition_pool.end_and_wait()
logger.debug("Launcher exit graceful\n\n")
......
......@@ -15,6 +15,7 @@ from mpi4py import MPI
from balsamlauncher.util import cd, get_tail
from balsamlauncher.exceptions import *
from balsam.models import BalsamJob
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
......@@ -57,7 +58,9 @@ def run(job):
with cd(job.workdir) as _, open(outname, 'wb') as outf:
try:
status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
proc = Popen(job.cmd, stdout=outf, stderr=STDOUT, cwd=job.workdir)
env = BalsamJob.objects.get(pk=job.id).get_envs() # TODO: Should we include this?
proc = Popen(job.cmd, stdout=outf, stderr=STDOUT,
cwd=job.workdir,env=env)
retcode = proc.wait()
except Exception as e:
logger.exception(f"mpi_ensemble rank {RANK} job {job.id}: exception during Popen")
......
......@@ -172,7 +172,7 @@ class MPIEnsembleRunner(Runner):
)
rpn = worker_list[0].max_ranks_per_node
nranks = sum(w.num_nodes*rpn for w in worker_list)
envs = self.jobs[0].get_envs() # TODO: different envs for each job
envs = self.jobs[0].get_envs() # TODO: is pulling envs in runner inefficient?
app_cmd = f"{sys.executable} {MPI_ENSEMBLE_EXE} {ensemble_filename}"
mpi_str = self.mpi_cmd(worker_list, app_cmd=app_cmd, envs=envs,
......@@ -188,7 +188,6 @@ class MPIEnsembleRunner(Runner):
msg = "mpi_ensemble.py had nonzero return code:\n"
msg += "".join(self.monitor.available_lines())
logger.exception(msg)
raise RuntimeError(msg)
logger.debug("Checking mpi_ensemble stdout for status updates...")
for line in self.monitor.available_lines():
......
......@@ -235,6 +235,8 @@ def mkchild(args):
print(f"Created link {dag.current_job.cute_id} --> {child_job.cute_id}")
def launcher(args):
import signal
daemon = args.daemon
from importlib.util import find_spec
fname = find_spec("balsamlauncher.launcher").origin
......@@ -242,12 +244,19 @@ def launcher(args):
command = [sys.executable] + [fname] + original_args
print("Starting Balsam launcher")
p = subprocess.Popen(command)
handler = lambda a,b: p.terminate()
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGHUP, handler)
if args.daemon:
sys.exit(0)
try:
p.wait()
except KeyboardInterrupt:
print("Killing Balsam launcher")
finally:
p.terminate()
......
......@@ -47,7 +47,7 @@ def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE,
ranks_per_node=1, args='', workflow='', envs={}, state='CREATED',
url_in='', input_files='', url_out='', stage_out_files='',
post_error_handler=False, post_timeout_handler=False,
auto_timeout_retry=True, preproc='', postproc=''):
auto_timeout_retry=True, preproc='', postproc='', wtime=1):
if app and direct_command:
raise ValueError("Cannot have both application and direct command")
......@@ -78,6 +78,7 @@ def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE,
job.preprocess = preproc
job.postprocess = postproc
job.wall_time_minutes = wtime
job.save()
job.create_working_path()
......
......@@ -7,24 +7,28 @@ current_job = dag.current_job
print("Hello from make_sides_post")
if dag.ERROR:
print("make_sides_post recognized error flag")
if dag.ERROR or dag.TIMEOUT:
if dag.ERROR: print("make_sides_post recognized error flag")
else: print("make_sides_post recognized timeout flag")
num_sides = int(os.environ['BALSAM_FT_NUM_SIDES'])
num_files = len(glob.glob("side*.dat"))
assert num_files == num_sides
print("it's okay, the job was actually done")
current_job.update_state("JOB_FINISHED", "handled error; it was okay")
exit(0)
elif dag.TIMEOUT:
print("make_sides_post recognized timeout flag")
num_files = len(glob.glob("side*.dat"))
assert num_files == 0
print("Creating rescue job")
dag.spawn_child(clone=True,
application_args="--sleep 0 --retcode 0")
current_job.update_state("JOB_FINISHED", "spawned rescue job")
exit(0)
elif '--dynamic-spawn' not in sys.argv:
if num_files == num_sides:
print("it's okay, the job was actually done")
current_job.update_state("JOB_FINISHED", "handled error; it was okay")
exit(0)
elif num_files == 0:
print("Creating rescue job")
children = current_job.get_children()
rescue = dag.spawn_child(clone=True, application_args="--sleep 0 --retcode 0")
rescue.set_parents([])
current_job.update_state("JOB_FINISHED", f"spawned rescue job {rescue.cute_id}")
for child in children:
child.set_parents([rescue])
exit(0)
if '--dynamic-spawn' not in sys.argv:
sys.exit(0)
reduce_job = current_job.get_child_by_name('sum_squares')
......
......@@ -42,6 +42,17 @@ def run_launcher_until(function, period=1.0, timeout=20.0):
kill_stragglers()
return success
def run_launcher_seconds(seconds):
    '''Run the Balsam launcher for a fixed wall-clock duration.

    The launcher is told to time itself out via --time-limit-minutes;
    communicate() then waits a small grace period beyond that. The
    launcher runs in its own session (os.setsid) so that it and any MPI
    children it spawns form one process group.

    Args:
        seconds: wall-clock run time handed to the launcher (converted
            to minutes for its --time-limit-minutes flag).

    Raises:
        Nothing on launcher overrun: if the launcher fails to exit within
        the grace period, its whole process group is SIGKILLed so no
        straggler children are leaked.
    '''
    import signal  # local: only needed on the timeout path

    minutes = seconds / 60.0
    launcher_script = find_spec("balsamlauncher.launcher").origin
    # Build argv as a list (no shell, no fragile string splitting)
    cmd = [sys.executable, launcher_script,
           "--consume", "--max-ranks", "8",
           "--time-limit-minutes", str(minutes)]
    launcher_proc = subprocess.Popen(cmd,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.STDOUT,
                                     preexec_fn=os.setsid)
    try:
        # Grace period: launcher should stop itself at its time limit
        launcher_proc.communicate(timeout=seconds + 5)
    except subprocess.TimeoutExpired:
        # Launcher did not exit on its own; kill the detached process
        # group so neither it nor its MPI children are leaked.
        os.killpg(os.getpgid(launcher_proc.pid), signal.SIGKILL)
        launcher_proc.communicate()
def run_launcher_until_state(job, state, period=1.0, timeout=20.0):
def check():
job.refresh_from_db()
......@@ -343,7 +354,7 @@ class TestDAG(BalsamTestCase):
self.apps[name] = app
def test_dag_error_timeout(self):
'''test error/timeout handling mechanisms'''
'''test error/timeout handling mechanisms (takes a couple min)'''
from itertools import product
states = 'normal timeout fail'.split()
......@@ -356,27 +367,33 @@ class TestDAG(BalsamTestCase):
'normal': create_job(name='make_sides', app='make_sides',
preproc=pre, args='',
post_error_handler=True,
post_timeout_handler=True),
post_timeout_handler=True,
wtime=0),
'timeout': create_job(name='make_sides', app='make_sides',
preproc=pre, args='--sleep 2',
post_error_handler=True,
post_timeout_handler=True),
post_timeout_handler=True,
wtime=0),
'fail': create_job(name='make_sides', app='make_sides',
preproc=pre, args='--retcode 1',
post_error_handler=True,
post_timeout_handler=True),
post_timeout_handler=True,
wtime=0),
}
child_types = {
'normal': create_job(name='square', app='square', args='',
post_error_handler=True,
post_timeout_handler=True),
post_timeout_handler=True,
wtime=0),
'timeout': create_job(name='square', app='square', args='--sleep 2',
post_error_handler=True,
post_timeout_handler=True),
post_timeout_handler=True,
wtime=0),
'fail': create_job(name='square', app='square', args='--retcode 1',
post_error_handler=True,
post_timeout_handler=True),
post_timeout_handler=True,
wtime=0),
}
......@@ -388,6 +405,10 @@ class TestDAG(BalsamTestCase):
jobA = BalsamJob.objects.get(pk=child_types[childA].pk)
jobB = BalsamJob.objects.get(pk=child_types[childB].pk)
jobP.pk, jobA.pk, jobB.pk = None,None,None
NUM_SIDES, NUM_RANKS = 2, random.randint(1,2)
pre = self.apps['make_sides'].default_preprocess + f' {NUM_SIDES} {NUM_RANKS}'
jobP.preprocess = pre
jobP.save()
jobA.application_args += " side0.dat"
......@@ -407,18 +428,19 @@ class TestDAG(BalsamTestCase):
del parent_types, child_types
self.assertEqual(BalsamJob.objects.all().count(), 81)
# Run the entire DAG until finished
now = time.time()
run_launcher_until(lambda: time.time() - now >= 20)
now = time.time()
run_launcher_until(lambda: time.time() - now >= 20)
# Run the entire DAG until finished, with two interruptions
run_launcher_seconds(25.0)
run_launcher_seconds(25.0)
def check():
for job in BalsamJob.objects.all():
job.refresh_from_db()
return all(j.state == 'JOB_FINISHED' for j in BalsamJob.objects.all())
success = run_launcher_until(check, timeout=180.0)
success = run_launcher_until(check, timeout=90.0)
for job in BalsamJob.objects.all():
job.refresh_from_db()
print(job.cute_id, job.get_recent_state_str())
self.assertTrue(success)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment