Commit 47e44147 authored by Michael Salim

Add one-job functional tests (FTs) and some logging to the runner

parent 81cc7212
......@@ -116,13 +116,9 @@ def on_exit(runner_group, transition_pool, job_source):
runner_group.update_and_remove_finished()
for runner in runner_group:
logger.debug("Timing out runner processes")
runner.timeout()
job_source.refresh_from_db()
timedout_jobs = job_source.by_states['RUN_TIMEOUT']
for job in timedout_jobs:
transition_pool.add_job(job)
transition_pool.end_and_wait()
logger.debug("Launcher exit graceful\n\n")
exit(0)
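A minimal sketch of how an exit handler like on_exit is typically wired up; this commit does not show the registration itself, so the helper below is illustrative only and just reuses the handler's parameter names.
# Sketch only, not part of this commit: register on_exit for SIGINT/SIGTERM so
# a terminated launcher still times out its runners before exiting.
import signal

def install_exit_handler(on_exit, runner_group, transition_pool, job_source):
    def handler(signum, frame):
        on_exit(runner_group, transition_pool, job_source)
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)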
......@@ -141,7 +137,7 @@ def get_args(inputcmd=None):
parser.add_argument('--nodes-per-worker', type=int, default=1)
parser.add_argument('--max-ranks-per-node', type=int, default=1,
help="For non-MPI jobs, how many to pack per worker")
parser.add_argument('--time-limit-minutes', type=int, default=0,
parser.add_argument('--time-limit-minutes', type=float, default=0,
help="Provide a walltime limit if not already imposed")
parser.add_argument('--daemon', action='store_true')
if inputcmd:
......
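The int-to-float change above lets sub-minute walltimes through; a self-contained sketch of the same argparse behavior (a standalone parser, not the launcher's actual one):
# Sketch only: with type=float, fractional minutes such as 0.5 now parse;
# with type=int the same invocation would be rejected by argparse.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--time-limit-minutes', type=float, default=0)
args = parser.parse_args(['--time-limit-minutes', '0.5'])
print(args.time_limit_minutes)   # 0.5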
......@@ -24,7 +24,7 @@ from queue import Queue, Empty
from django.conf import settings
from django.db import transaction
import balsam.models
from balsam.models import InvalidStateError
from balsamlauncher import mpi_commands
from balsamlauncher.exceptions import *
from balsamlauncher.util import cd, get_tail
......@@ -86,7 +86,9 @@ class Runner:
def timeout(self):
self.process.terminate()
for job in self.jobs:
if job.state == 'RUNNING': job.update_state('RUN_TIMEOUT')
if job.state == 'RUNNING':
job.update_state('RUN_TIMEOUT')
logger.info(f"Runner job {job.cute_id}: RUN_TIMEOUT")
class MPIRunner(Runner):
'''One subprocess, one job'''
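The timeout() loop above only transitions jobs that are still RUNNING; a tiny standalone illustration of that behavior with a stand-in job class (FakeJob is hypothetical, not a Balsam model):
# Sketch only: mirrors the RUNNING -> RUN_TIMEOUT loop in Runner.timeout().
class FakeJob:
    def __init__(self, pk, state):
        self.cute_id = f'[{pk}]'
        self.state = state
    def update_state(self, new_state, msg=''):
        self.state = new_state

jobs = [FakeJob(1, 'RUNNING'), FakeJob(2, 'RUN_DONE')]
for job in jobs:
    if job.state == 'RUNNING':
        job.update_state('RUN_TIMEOUT')
print([j.state for j in jobs])   # ['RUN_TIMEOUT', 'RUN_DONE']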
......@@ -190,13 +192,13 @@ class MPIEnsembleRunner(Runner):
logger.debug("Checking mpi_ensemble stdout for status updates...")
for line in self.monitor.available_lines():
pk, state, *msg = line.split()
msg = ' '.join(msg)
if pk in self.jobs_by_pk and state in balsam.models.STATES:
try:
pk, state, *msg = line.split()
msg = ' '.join(msg)
job = self.jobs_by_pk[pk]
job.update_state(state, msg) # TODO: handle RecordModified exception
logger.info(f"MPIEnsemble {job.cute_id} updated to {state}: {msg}")
else:
except (ValueError, KeyError, InvalidStateError) as e:
logger.error(f"Invalid statusMsg from mpi_ensemble: {line.strip()}")
class RunnerGroup:
......
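The try/except added above assumes each mpi_ensemble status line looks like "<pk> <STATE> <free-form message>"; that format is inferred from the parser, not documented here. A standalone sketch of the same parsing and its failure mode:
# Sketch only: "<pk> <STATE> <message...>" is inferred from the parser above;
# malformed lines raise ValueError and are logged rather than applied.
sample_lines = [
    "42 RUNNING launched on rank 0",
    "42 RUN_DONE returned 0",
    "not-a-status-line",
]
for line in sample_lines:
    try:
        pk, state, *msg = line.split()
        msg = ' '.join(msg)
        print(f"update job {pk} -> {state}: {msg}")
    except ValueError:
        print(f"Invalid statusMsg: {line.strip()}")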
......@@ -288,7 +288,7 @@ def make_parser():
parser_launcher.add_argument('--nodes-per-worker', type=int, default=1,
help="For non-MPI jobs, how many to pack per worker")
parser_launcher.add_argument('--max-ranks-per-node', type=int, default=1)
parser_launcher.add_argument('--time-limit-minutes', type=int,
parser_launcher.add_argument('--time-limit-minutes', type=float,
help="Override auto-detected walltime limit (runs"
" forever if no limit is detected or specified)")
parser_launcher.add_argument('--daemon', action='store_true')
......
......@@ -4,8 +4,8 @@ import balsamlauncher.dag as dag
print("hello from square_post")
print(f"jobid: {dag.current_job.pk}")
if dag.ERROR:
print("recgonized error")
print("recognized error")
dag.current_job.update_state("JOB_FINISHED", "handled error in square_post")
if dag.TIMEOUT:
print("recgonized timeout")
print("recognized timeout")
dag.current_job.update_state("JOB_FINISHED", "handled timeout in square_post")
......@@ -30,6 +30,12 @@ def run_launcher_until(function):
launcher_proc.wait(timeout=10)
return success
def run_launcher_until_state(job, state):
def check():
job.refresh_from_db()
return job.state == state
success = run_launcher_until(check)
return success
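The new helper just wraps run_launcher_until with a refresh-and-compare closure, as the call sites below show. A self-contained sketch of that pattern with a stand-in job object (DummyJob is illustrative only):
# Sketch only: the refresh-then-compare check that run_launcher_until_state
# builds; DummyJob stands in for a BalsamJob here.
class DummyJob:
    state = 'RUNNING'
    def refresh_from_db(self):
        self.state = 'JOB_FINISHED'   # pretend the launcher finished the job

def reaches_state(job, state):
    job.refresh_from_db()
    return job.state == state

print(reaches_state(DummyJob(), 'JOB_FINISHED'))   # True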
class TestSingleJobTransitions(BalsamTestCase):
def setUp(self):
......@@ -44,7 +50,7 @@ class TestSingleJobTransitions(BalsamTestCase):
postproc=post_path)
self.apps[name] = app
def test_one_job_normal(self):
def test_normal(self):
'''normal processing of a single job'''
# A mock "remote" data source has a file side0.dat
......@@ -66,10 +72,7 @@ class TestSingleJobTransitions(BalsamTestCase):
# Run the launcher and make sure that the job gets carried all the way
# through to completion
def check():
job.refresh_from_db()
return job.state == 'JOB_FINISHED'
success = run_launcher_until(check)
success = run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
work_dir = job.working_directory
......@@ -130,7 +133,7 @@ class TestSingleJobTransitions(BalsamTestCase):
self.assertEquals(result_remote, 81.0)
self.assertIn("Hello from square", open(remote_stdout).read())
def test_one_job_error_unhandled(self):
def test_error_unhandled(self):
'''test unhandled return code from app'''
remote_dir = tempfile.TemporaryDirectory(prefix="remote")
......@@ -147,10 +150,7 @@ class TestSingleJobTransitions(BalsamTestCase):
self.assertEqual(BalsamJob.objects.all().count(), 1)
# The job is marked FAILED due to unhandled nonzero return code
def check():
job.refresh_from_db()
return job.state == 'FAILED'
success = run_launcher_until(check)
success = run_launcher_until_state(job, 'FAILED')
self.assertTrue(success)
# (But actually the application ran and printed its result correctly)
......@@ -166,3 +166,153 @@ class TestSingleJobTransitions(BalsamTestCase):
jobid_line = [l for l in preproc_out_contents.split('\n') if 'jobid' in l][0]
self.assertIn(str(job.pk), jobid_line)
def test_error_handled(self):
'''test postprocessor-handled nonzero return code'''
remote_dir = tempfile.TemporaryDirectory(prefix="remote")
remote_path = os.path.join(remote_dir.name, 'side0.dat')
with open(remote_path, 'w') as fp:
fp.write('9\n')
# Same as previous test, but square.py returns nonzero
job = create_job(name='square_testjob2', app='square',
args='side0.dat --retcode 1',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}',
post_error_handler=True)
self.assertEqual(job.application_args, 'side0.dat --retcode 1')
self.assertEqual(BalsamJob.objects.all().count(), 1)
# The job finished successfully despite a nonzero return code
success = run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
# Make sure that at some point it was marked RUN_ERROR
self.assertIn('RUN_ERROR', job.state_history)
# It was saved by the postprocessor:
self.assertIn('handled error in square_post', job.state_history)
# We can also check the postprocessor stdout:
work_dir = job.working_directory
post_out = os.path.join(work_dir, 'postprocess.log')
post_contents = open(post_out).read()
self.assertIn("recognized error", post_contents)
self.assertIn("Invoked to handle RUN_ERROR", post_contents)
# job id sanity check
jobid_line = [l for l in post_contents.split('\n') if 'jobid' in l][0]
self.assertIn(str(job.pk), jobid_line)
def test_timeout_auto_retry(self):
'''test auto retry mechanism for timed out jobs'''
remote_dir = tempfile.TemporaryDirectory(prefix="remote")
remote_path = os.path.join(remote_dir.name, 'side0.dat')
with open(remote_path, 'w') as fp:
fp.write('9\n')
# Same setup as before, but square.py sleeps for 5 sec so the job is still RUNNING at shutdown
job = create_job(name='square_testjob2', app='square',
args='side0.dat --sleep 5',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}')
# Job reaches the RUNNING state and then times out
success = run_launcher_until_state(job, 'RUNNING')
self.assertTrue(success)
# On termination, actively running job is marked RUN_TIMEOUT
def check():
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check, timeout=6)
self.assertTrue(success)
self.assertEquals(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
success = run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
self.assertIn('RESTART_READY', job.state_history)
# The postprocessor was not invoked to handle the timeout
work_dir = job.working_directory
post_out = os.path.join(work_dir, 'postprocess.log')
post_contents = open(post_out).read()
self.assertNotIn('handling RUN_TIMEOUT', post_contents)
def test_timeout_post_handler(self):
'''test postprocess handling option for timed-out jobs'''
remote_dir = tempfile.TemporaryDirectory(prefix="remote")
remote_path = os.path.join(remote_dir.name, 'side0.dat')
with open(remote_path, 'w') as fp:
fp.write('9\n')
# Same as the previous test, except the job opts into postprocessor timeout handling
job = create_job(name='square_testjob2', app='square',
args='side0.dat --sleep 5',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}',
post_timeout_handler=True)
# Job reaches the RUNNING state and then times out
success = run_launcher_until_state(job, 'RUNNING')
self.assertTrue(success)
# On termination, actively running job is marked RUN_TIMEOUT
def check():
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check, timeout=6)
self.assertTrue(success)
self.assertEquals(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
success = run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
self.assertNotIn('RESTART_READY', job.state_history)
self.assertIn('handled timeout in square_post', job.state_history)
# The postprocessor handled the timeout; the job was not restarted
work_dir = job.working_directory
post_out = os.path.join(work_dir, 'postprocess.log')
post_contents = open(post_out).read()
self.assertIn('Invoked to handle RUN_TIMEOUT', post_contents)
self.assertIn('recognized timeout', post_contents)
def test_timeout_unhandled(self):
'''with timeout handling disabled, jobs are marked FAILED'''
remote_dir = tempfile.TemporaryDirectory(prefix="remote")
remote_path = os.path.join(remote_dir.name, 'side0.dat')
with open(remote_path, 'w') as fp:
fp.write('9\n')
# Same as the previous test, but with timeout handling disabled
job = create_job(name='square_testjob2', app='square',
args='side0.dat --sleep 5',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}',
post_timeout_handler=False,
auto_timeout_retry=False)  # assumed flag name: disable auto retry so the timeout is truly unhandled
# Job reaches the RUNNING state and then times out
success = run_launcher_until_state(job, 'RUNNING')
self.assertTrue(success)
# On termination, actively running job is marked RUN_TIMEOUT
def check():
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check, timeout=6)
self.assertTrue(success)
self.assertEquals(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
# But without timeout handling, it fails
success = run_launcher_until_state(job, 'FAILED')
self.assertTrue(success)
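Taken together, the three timeout tests pin down a precedence among the handling options; a compact sketch of that decision order (a plain function, not code from this commit; the flag names mirror the create_job kwargs used above, with auto_timeout_retry an assumption, and the auto-retry default inferred from test_timeout_auto_retry):
# Sketch only: the outcome of a RUN_TIMEOUT under the three configurations
# exercised above.
def timeout_outcome(post_timeout_handler=False, auto_timeout_retry=True):
    if post_timeout_handler:
        return 'JOB_FINISHED via postprocessor'    # test_timeout_post_handler
    if auto_timeout_retry:
        return 'RESTART_READY, then JOB_FINISHED'  # test_timeout_auto_retry
    return 'FAILED'                                # test_timeout_unhandled

print(timeout_outcome(post_timeout_handler=True))
print(timeout_outcome())
print(timeout_outcome(auto_timeout_retry=False))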