Commit 50f4727f authored by Michael Salim's avatar Michael Salim

FT and bugfixes

parent fcd3ad7a
......@@ -254,7 +254,7 @@ class BalsamJob(models.Model):
return f'''
Balsam Job
----------
ID: {self.job_id}
ID: {self.pk}
name: {self.name}
workflow: {self.workflow}
latest state: {self.get_recent_state_str()}
......@@ -406,7 +406,7 @@ auto timeout retry: {self.auto_timeout_retry}
def get_line_string(self):
recent_state = self.get_recent_state_str()
app = self.application if self.application else self.direct_command
return f' {str(self.job_id):36} | {self.name:26} | {self.workflow:26} | {app:26} | {recent_state}'
return f' {str(self.pk):36} | {self.name:26} | {self.workflow:26} | {app:26} | {recent_state}'
def runtime_str(self):
if self.runtime_seconds == 0: return ''
......@@ -423,12 +423,13 @@ auto timeout retry: {self.auto_timeout_retry}
top = settings.BALSAM_WORK_DIRECTORY
if self.workflow:
top = os.path.join(top, self.workflow)
name = self.name.replace(' ', '_')
name = self.name.strip().replace(' ', '_')
name += '_' + str(self.pk)[:8]
path = os.path.join(top, name)
if os.path.exists(path):
jid = str(self.job_id)
path += "_" + jid[0]
jid = str(self.pk)[8:]
path += jid[0]
i = 1
while os.path.exists(path):
path += jid[i]
......
......@@ -63,14 +63,15 @@ def read_jobs(fp):
def run(job):
basename = os.path.basename(job.workdir)
job_from_db = BalsamJob.objects.get(pk=job.id)
basename = job_from_db.name
outname = f"{basename}.out"
logger.debug(f"mpi_ensemble rank {RANK}: starting job {job.id}")
with cd(job.workdir) as _, open(outname, 'wb') as outf:
try:
status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
env = BalsamJob.objects.get(pk=job.id).get_envs() # TODO: Should we include this?
env = job_from_db.get_envs() # TODO: Should we include this?
proc = Popen(job.cmd, stdout=outf, stderr=STDOUT,
cwd=job.workdir,env=env)
......
......@@ -106,7 +106,7 @@ class MPIRunner(Runner):
num_ranks=nranks, ranks_per_node=rpn,
threads_per_rank=tpr, threads_per_core=tpc)
basename = os.path.basename(job.working_directory)
basename = job.name
outname = os.path.join(job.working_directory, f"{basename}.out")
self.outfile = open(outname, 'w+b')
self.popen_args['args'] = shlex.split(mpi_str)
......
......@@ -37,6 +37,7 @@ def cmdline(cmd,envs=None,shell=True):
def poll_until_returns_true(function, *, args=(), period=1.0, timeout=12.0):
start = time.time()
result = False
while time.time() - start < timeout:
result = function(*args)
if result: break
......
......@@ -256,12 +256,12 @@ class TestSingleJobTransitions(BalsamTestCase):
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check,timeout=6)
success = poll_until_returns_true(check,timeout=12)
self.assertTrue(success)
self.assertEquals(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
success = run_launcher_until_state(job, 'JOB_FINISHED', timeout=60)
success = run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
self.assertIn('RESTART_READY', job.state_history)
......@@ -293,7 +293,7 @@ class TestSingleJobTransitions(BalsamTestCase):
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check,timeout=10)
success = poll_until_returns_true(check,timeout=12)
self.assertEquals(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
......@@ -331,7 +331,7 @@ class TestSingleJobTransitions(BalsamTestCase):
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check,timeout=10)
success = poll_until_returns_true(check,timeout=12)
self.assertEqual(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
......@@ -409,6 +409,9 @@ class TestDAG(BalsamTestCase):
jobA = BalsamJob.objects.get(pk=child_types[childA].pk)
jobB = BalsamJob.objects.get(pk=child_types[childB].pk)
jobP.pk, jobA.pk, jobB.pk = None,None,None
for job in (jobP,jobA,jobB):
job.working_directory = ''
job.save()
# Parent has two children (sides); either 1 rank (serial) or 2 ranks (mpi)
NUM_SIDES, NUM_RANKS = 2, random.randint(1,2)
......@@ -435,18 +438,23 @@ class TestDAG(BalsamTestCase):
self.assertEqual(BalsamJob.objects.all().count(), 81)
# Run the entire DAG until finished, with two interruptions
for job in BalsamJob.objects.all():
self.assertEqual(job.working_directory, '')
run_launcher_seconds(25.0)
run_launcher_seconds(25.0)
def check():
for job in BalsamJob.objects.all():
job.refresh_from_db()
return all(j.state == 'JOB_FINISHED' for j in BalsamJob.objects.all())
# Just check that all jobs reach JOB_FINISHED state
success = run_launcher_until(check, timeout=360.0)
self.assertTrue(success)
# No race conditions in working directory naming: each job must have a
# unique working directory
workdirs = [job.working_directory for job in BalsamJob.objects.all()]
self.assertEqual(len(workdirs), len(set(workdirs)))
def test_static(self):
'''test normal processing of a pre-defined DAG'''
......@@ -526,11 +534,23 @@ class TestDAG(BalsamTestCase):
chB.set_parents([parent])
# Run until A finishes, but B will still be hanging
# If a child times out and we re-run the launcher, everything is handled
success = run_launcher_until_state(chA, 'JOB_FINISHED')
def check():
chA.refresh_from_db()
chB.refresh_from_db()
return chA.state=='JOB_FINISHED' and chB.state=='RUNNING'
success = run_launcher_until(check)
self.assertTrue(success)
chB.refresh_from_db()
# Give the launcher time to clean up and mark B as timed out
def check():
chB.refresh_from_db()
return chB.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check, timeout=12)
self.assertEqual(chB.state, 'RUN_TIMEOUT')
# Since B has a timeout handler, when we re-run the launcher,
# It is handled gracefully
success = run_launcher_until_state(chB, 'JOB_FINISHED')
parent.refresh_from_db()
......@@ -563,16 +583,15 @@ class TestDAG(BalsamTestCase):
# child B will give a RUN_ERROR, but it will be handled
def check():
for j in (chA, chB, parent):
j.refresh_from_db()
return all(j.state=='JOB_FINISHED' for j in (parent,chA,chB))
return all(j.state=='JOB_FINISHED' for j in BalsamJob.objects.all())
print("running launcher until finished")
success = run_launcher_until(check)
self.assertTrue(success)
parent.refresh_from_db()
chA.refresh_from_db()
chB.refresh_from_db()
self.assertEqual(parent.state, 'JOB_FINISHED')
self.assertEqual(chA.state, 'JOB_FINISHED')
self.assertEqual(chB.state, 'JOB_FINISHED')
......@@ -605,6 +624,12 @@ class TestDAG(BalsamTestCase):
self.assertTrue(success)
# Parent timed out
def check():
parent.refresh_from_db()
return parent.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check,timeout=12)
self.assertTrue(success)
parent.refresh_from_db()
chA.refresh_from_db()
chB.refresh_from_db()
......@@ -613,9 +638,15 @@ class TestDAG(BalsamTestCase):
self.assertEqual(chB.state, 'AWAITING_PARENTS')
# On re-run, everything finishes okay
success = run_launcher_until_state(chB, 'JOB_FINISHED')
def check():
chA.refresh_from_db()
chB.refresh_from_db()
return chA.state=='JOB_FINISHED' and chB.state=='JOB_FINISHED'
success = run_launcher_until(check)
parent.refresh_from_db()
chA.refresh_from_db()
chB.refresh_from_db()
self.assertEqual(parent.state, 'JOB_FINISHED')
self.assertEqual(chA.state, 'JOB_FINISHED')
......@@ -715,7 +746,7 @@ class TestDAG(BalsamTestCase):
# Run the entire DAG until finished
success = run_launcher_until_state(reduce_job, 'JOB_FINISHED',
timeout=180.0)
timeout=200.0)
self.assertTrue(success)
for job in BalsamJob.objects.all():
self.assertEqual(job.state, 'JOB_FINISHED')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment