Commit 0d9ae5c9 authored by Michael Salim's avatar Michael Salim
Browse files

When a job is USER_KILLED, the Runners respond accordingly: terminate

the job.

BalsamJob Model:
If a RecordModifiedError arises while killing, the job
state is always overwritten as USER_KILLED

If RecordModifiedError arises when trying to update the job to
some other state, it is ignored and the job is left in USER_KILLED
state.
parent f14b1ed3
......@@ -65,6 +65,11 @@ def read_jobs(fp):
def run(job):
job_from_db = BalsamJob.objects.get(pk=job.id)
if job_from_db.state == 'USER_KILLED':
status_msg(job.id, "USER_KILLED", msg="mpi_ensemble skipping this job")
return
basename = job_from_db.name
outname = f"{basename}.out"
logger.debug(f"mpi_ensemble rank {RANK}: starting job {job.id}")
......
......@@ -165,7 +165,7 @@ class MPIRunner(Runner):
def update_jobs(self, timeout=False):
'''Update the job state and return finished flag'''
job = self.jobs[0]
#job.refresh_from_db() # TODO: handle RecordModified
retcode = self.process.poll()
if retcode == None:
logger.debug(f"MPIRunner {job.cute_id} still running")
......@@ -188,7 +188,11 @@ class MPIRunner(Runner):
msg = f"MPIRunner {job.cute_id} RUN_TIMEOUT"
logger.info(msg)
if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified
if job.state != curstate:
job.update_state(curstate, msg)
else:
job.refresh_from_db()
finished = timeout or (retcode is not None)
return finished
......@@ -262,6 +266,9 @@ class MPIEnsembleRunner(Runner):
msg = f"mpi_ensemble.py had nonzero return code: {retcode}\n"
msg += "".join(self.monitor.available_lines())
logger.exception(msg)
for job in self.jobs:
if job.state != 'RUN_DONE':
job.update_state('FAILED', 'MPIEnsemble error')
finished = timeout or (retcode is not None)
return finished
......@@ -391,6 +398,15 @@ class RunnerGroup:
logger.debug(f"updating runner {i}")
finished = runner.update_jobs(timeout)
if finished: finished_runners.append(runner)
killed_runners = (r for r in self.runners if r not in finished_runners
and all(j.state=='USER_KILLED' for j in r.jobs))
for runner in killed_runners:
runner.process.terminate()
try: runner.process.wait(timeout=10)
except: runner.process.kill()
finished_runners.append(runner)
self.lock.release()
if timeout:
......
......@@ -10,6 +10,7 @@ from django.core.exceptions import ValidationError,ObjectDoesNotExist
from django.conf import settings
from django.db import models
from concurrency.fields import IntegerVersionField
from concurrency.exceptions import RecordModifiedError
from balsam.common import Serializer
......@@ -389,7 +390,18 @@ auto timeout retry: {self.auto_timeout_retry}
self.state_history += history_line(new_state, message)
self.state = new_state
self.save(update_fields=['state', 'state_history'],using=using)
try:
self.save(update_fields=['state', 'state_history'],using=using)
except RecordModifiedError:
self.refresh_from_db()
if self.state == 'USER_KILLED' and new_state != 'USER_KILLED':
return
elif new_state == 'USER_KILLED':
self.state_history += history_line(new_state, message)
self.state = new_state
self.save(update_fields=['state', 'state_history'],using=using)
else:
raise
def get_recent_state_str(self):
return self.state_history.split("\n")[-1].strip()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment