Commit ebd0bc30 authored by Michael Salim

All balsamlauncher tests passing on Cooley: run from an interactive session with 2 compute nodes

parent 313f2b38
......@@ -26,6 +26,7 @@ from balsamlauncher import runners
from balsamlauncher.exceptions import *
RUNNABLE_STATES = ['PREPROCESSED', 'RESTART_READY']
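# Guard flag: the on_exit cleanup below must run at most once, even if several signals arrive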
HANDLING_EXIT = False
def delay(period=settings.BALSAM_SERVICE_PERIOD):
nexttime = time.time() + period
......@@ -95,8 +96,14 @@ def check_parents(job, lock):
def main(args, transition_pool, runner_group, job_source):
delay_timer = delay()
elapsed_min = elapsed_time_minutes()
logger.debug(f"time limit provided {args.time_limit_minutes}")
last_created = 0.0
if args.time_limit_minutes > 0:
timeout = lambda : next(elapsed_min) >= args.time_limit_minutes
def timeout():
elapsed = next(elapsed_min)
logger.debug(f"{elapsed} minutes elapsed out of {args.time_limit_minutes}")
return elapsed >= args.time_limit_minutes
else:
timeout = lambda : scheduler.remaining_time_seconds() <= 0.0
......@@ -109,8 +116,7 @@ def main(args, transition_pool, runner_group, job_source):
job_source.refresh_from_db()
waiting_jobs = (j for j in job_source.jobs if
j.state in
'CREATED AWAITING_PARENTS LAUNCHER_QUEUED'.split())
j.state in 'CREATED AWAITING_PARENTS LAUNCHER_QUEUED'.split())
for job in waiting_jobs: check_parents(job, transition_pool.lock)
transitionable_jobs = [
......@@ -126,22 +132,28 @@ def main(args, transition_pool, runner_group, job_source):
any_finished = runner_group.update_and_remove_finished()
job_source.refresh_from_db()
created = create_new_runners(job_source.jobs, runner_group, worker_group)
if time.time() - last_created > 5:
created = create_new_runners(job_source.jobs, runner_group, worker_group)
if created:
last_created = time.time()
if any_finished or created: wait = False
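# Sleep for the service delay only when no runners finished and none were created this pass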
if wait: next(delay_timer)
def on_exit(runner_group, transition_pool, job_source):
global HANDLING_EXIT
if HANDLING_EXIT: return
HANDLING_EXIT = True
logger.debug("Entering on_exit cleanup function")
logger.debug("on_exit: flush job queue")
transition_pool.flush_job_queue()
runner_group.update_and_remove_finished()
logger.debug("Timing out runner processes")
transition_pool.lock.acquire()
for runner in runner_group:
runner.timeout()
transition_pool.lock.release()
logger.debug("on_exit: update/remove/timeout jobs from runner group")
runner_group.update_and_remove_finished(timeout=True)
logger.debug("on_exit: send end message to transition threads")
transition_pool.end_and_wait()
logger.debug("Launcher exit graceful\n\n")
logger.debug("on_exit: Launcher exit graceful\n\n")
exit(0)
......
......@@ -9,6 +9,7 @@ os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
logger = logging.getLogger('balsamlauncher.mpi_ensemble')
from subprocess import Popen, STDOUT
from mpi4py import MPI
......@@ -19,16 +20,25 @@ from balsam.models import BalsamJob
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
def on_exit():
logger.debug("mpi_ensemble received interrupt: quitting now")
HANDLE_EXIT = False
def on_exit(job):
global HANDLE_EXIT
if HANDLE_EXIT: return
HANDLE_EXIT = True
logger.debug(f"mpi_ensemble.py rank {RANK} received interrupt: quitting now")
if job is not None:
job.terminate()
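# give the running job 10 seconds to exit after SIGTERM, then force-kill it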
try: job.wait(timeout=10)
except: job.kill()
print("TIMEOUT")
MPI.Finalize()
sys.exit(0)
handler = lambda a,b: on_exit()
handler = lambda a,b: on_exit(None)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGHUP, handler)
Job = namedtuple('Job', ['id', 'workdir', 'cmd'])
......@@ -52,15 +62,22 @@ def read_jobs(fp):
def run(job):
basename = os.path.basename(job.workdir)
outname = f"{basename}.out"
logger.debug(f"mpi_ensemble rank {RANK}: starting job {job.id}")
with cd(job.workdir) as _, open(outname, 'wb') as outf:
try:
status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
env = BalsamJob.objects.get(pk=job.id).get_envs() # TODO: Should we include this?
proc = Popen(job.cmd, stdout=outf, stderr=STDOUT,
cwd=job.workdir,env=env)
handler = lambda a,b: on_exit(proc)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
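# re-bind the handlers so an interrupt now terminates the subprocess this rank is running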
retcode = proc.wait()
except Exception as e:
logger.exception(f"mpi_ensemble rank {RANK} job {job.id}: exception during Popen")
......
......@@ -77,19 +77,12 @@ class Runner:
self.monitor = MonitorStream(self.process.stdout)
self.monitor.start()
def update_jobs(self):
def update_jobs(self, timeout=False):
raise NotImplementedError
def finished(self):
return self.process.poll() is not None
def timeout(self):
self.process.terminate()
for job in self.jobs:
if job.state == 'RUNNING':
job.update_state('RUN_TIMEOUT')
logger.info(f"Runner job {job.cute_id}: RUN_TIMEOUT")
class MPIRunner(Runner):
'''One subprocess, one job'''
def __init__(self, job_list, worker_list):
......@@ -124,7 +117,7 @@ class MPIRunner(Runner):
logger.info(f"MPIRunner {job.cute_id} Popen:\n{self.popen_args['args']}")
logger.info(f"MPIRunner: writing output to {outname}")
def update_jobs(self):
def update_jobs(self, timeout=False):
job = self.jobs[0]
#job.refresh_from_db() # TODO: handle RecordModified
retcode = self.process.poll()
......@@ -143,8 +136,15 @@ class MPIRunner(Runner):
tail = get_tail(self.outfile.name)
msg = f"MPIRunner {job.cute_id} RETURN CODE {retcode}:\n{tail}"
logger.info(msg)
if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified
if curstate in ['RUNNING', 'RUN_ERROR'] and timeout:
curstate = 'RUN_TIMEOUT'
msg = f"MPIRunner {job.cute_id} RUN_TIMEOUT"
logger.info(msg)
if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified
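# the runner counts as finished once it has been timed out or its subprocess has exited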
finished = timeout or (retcode is not None)
return finished
class MPIEnsembleRunner(Runner):
......@@ -170,6 +170,8 @@ class MPIEnsembleRunner(Runner):
logger.info('MPIEnsemble handling jobs: '
f' {", ".join(j.cute_id for j in self.jobs)} '
)
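# Assumption: 0o644 makes the ensemble job file readable by ranks started on other compute nodes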
os.chmod(ensemble_filename, 0o644)
rpn = worker_list[0].max_ranks_per_node
nranks = sum(w.num_nodes*rpn for w in worker_list)
envs = self.jobs[0].get_envs() # TODO: is pulling envs in runner inefficient?
......@@ -180,15 +182,11 @@ class MPIEnsembleRunner(Runner):
self.popen_args['args'] = shlex.split(mpi_str)
logger.info(f"MPIEnsemble Popen:\n {self.popen_args['args']}")
self.ensemble_filename = ensemble_filename
def update_jobs(self):
def update_jobs(self, timeout=False):
'''Relies on stdout of mpi_ensemble.py'''
retcode = self.process.poll()
if retcode not in [None, 0]:
msg = "mpi_ensemble.py had nonzero return code:\n"
msg += "".join(self.monitor.available_lines())
logger.exception(msg)
logger.debug("Checking mpi_ensemble stdout for status updates...")
for line in self.monitor.available_lines():
try:
......@@ -200,6 +198,21 @@ class MPIEnsembleRunner(Runner):
except (ValueError, KeyError, InvalidStateError) as e:
logger.error(f"Invalid statusMsg from mpi_ensemble: {line.strip()}")
retcode = None
if timeout:
for job in self.jobs:
if job.state == 'RUNNING':
logger.debug(f"MPIEnsemble job {job.cute_id} RUN_TIMEOUT")
job.update_state('RUN_TIMEOUT', 'timed out during MPIEnsemble')
else:
retcode = self.process.poll()
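# a nonzero return code means mpi_ensemble.py itself failed; surface whatever it printed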
if retcode not in [None, 0]:
msg = f"mpi_ensemble.py had nonzero return code: {retcode}\n"
msg += "".join(self.monitor.available_lines())
logger.exception(msg)
finished = timeout or (retcode is not None)
return finished
class RunnerGroup:
MAX_CONCURRENT_RUNNERS = settings.BALSAM_MAX_CONCURRENT_RUNNERS
......@@ -283,31 +296,37 @@ class RunnerGroup:
self.runners.append(runner)
for worker in assigned_workers: worker.idle = False
def update_and_remove_finished(self):
def update_and_remove_finished(self, timeout=False):
# TODO: Benchmark performance overhead; does grouping into one
# transaction save significantly?
logger.debug(f"Checking status of {len(self.runners)} active runners")
any_finished = False
finished_runners = []
self.lock.acquire()
for runner in self.runners: runner.update_jobs()
for i, runner in enumerate(self.runners):
logger.debug(f"updating runner {i}")
finished = runner.update_jobs(timeout)
if finished: finished_runners.append(runner)
self.lock.release()
finished_runners = (r for r in self.runners if r.finished())
if timeout:
for runner in self.runners:
runner.process.terminate()
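# allow 10 seconds for a graceful shutdown before force-killing the runner process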
try: runner.process.wait(timeout=10)
except: runner.process.kill()
for runner in finished_runners:
if any(j.state not in ['RUN_DONE','RUN_ERROR','RUN_TIMEOUT'] for j in runner.jobs):
self.lock.acquire()
runner.update_jobs()
self.lock.release()
if any(j.state not in ['RUN_DONE','RUN_ERROR','RUN_TIMEOUT'] for j in runner.jobs):
msg = (f"Job {job.cute_id} runner process done, but failed to update job state.")
if any(j.state == 'RUNNING' for j in runner.jobs):
msg = (f"Runner process done, but failed to update job state.")
logger.exception(msg)
raise RuntimeError(msg)
else:
any_finished = True
self.runners.remove(runner)
for worker in runner.worker_list:
worker.idle = True
any_finished = finished_runners != []
return any_finished
@property
......
......@@ -7,6 +7,7 @@ import os
from io import StringIO
from traceback import print_exc
import signal
import shutil
import sys
import subprocess
import tempfile
......@@ -33,15 +34,15 @@ logger = logging.getLogger('balsamlauncher.transitions')
# Balsam, because it's built in and requires zero user configuration
if sys.platform.startswith('darwin'):
LockClass = multiprocessing.Lock
logger.debug('Using real multiprocessing.Lock')
elif sys.platform.startswith('win32'):
LockClass = multiprocessing.Lock
else:
logger.debug('Using dummy lock')
class DummyLock:
def acquire(self): pass
def release(self): pass
LockClass = DummyLock
LockClass = multiprocessing.Lock # TODO: replace with better solution!
logger.debug(f'Using lock: {LockClass}')
PREPROCESS_TIMEOUT_SECONDS = 300
POSTPROCESS_TIMEOUT_SECONDS = 300
......@@ -50,13 +51,11 @@ SITE = settings.BALSAM_SITE
StatusMsg = namedtuple('StatusMsg', ['pk', 'state', 'msg'])
JobMsg = namedtuple('JobMsg', ['job', 'transition_function'])
def on_exit(lock):
logger.debug("Transition thread caught SIGTERM")
#try: lock.release()
#except ValueError: pass
def on_exit():
logger.debug("TransitionProc caught SIGTERM: do nothing and wait for end")
def main(job_queue, status_queue, lock):
handler = lambda a,b: on_exit(lock)
handler = lambda a,b: on_exit()
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
......@@ -65,7 +64,7 @@ def main(job_queue, status_queue, lock):
job_msg = job_queue.get()
job, transition_function = job_msg
if job == 'end':
if job == 'end':
logger.debug("Received end..quitting transition loop")
return
if job.work_site != SITE:
......@@ -239,7 +238,7 @@ def stage_out(job, lock):
for f in matches:
base = os.path.basename(f)
dst = os.path.join(stagingdir, base)
os.link(src=f, dst=dst)
shutil.copyfile(src=f, dst=dst)
logger.info(f"staging {f} out for transfer")
logger.info(f"transferring to {url_out}")
transfer.stage_out(f"{stagingdir}/", f"{url_out}/")
......
......@@ -235,8 +235,6 @@ def mkchild(args):
print(f"Created link {dag.current_job.cute_id} --> {child_job.cute_id}")
def launcher(args):
import signal
daemon = args.daemon
from importlib.util import find_spec
fname = find_spec("balsamlauncher.launcher").origin
......@@ -245,19 +243,10 @@ def launcher(args):
print("Starting Balsam launcher")
p = subprocess.Popen(command)
handler = lambda a,b: p.terminate()
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGHUP, handler)
if args.daemon:
sys.exit(0)
try:
else:
p.wait()
except KeyboardInterrupt:
print("Killing Balsam launcher")
finally:
p.terminate()
def service(args):
......
......@@ -5,6 +5,13 @@ import tempfile
import unittest
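# Recursively make the test data tree world-readable: 0o755 for directories, 0o644 for files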
def set_permissions(top):
os.chmod(top, 0o755)
for root,subdirs,files in os.walk(top):
for dir in (os.path.join(root, s) for s in subdirs):
os.chmod(dir, 0o755)
for file in (os.path.join(root, f) for f in files):
os.chmod(file, 0o644)
if __name__ == "__main__":
tempdir = tempfile.TemporaryDirectory(dir=os.getcwd(), prefix="testdata_")
......@@ -14,6 +21,8 @@ if __name__ == "__main__":
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
set_permissions(tempdir.name)
loader = unittest.defaultTestLoader
if len(sys.argv) > 1:
names = sys.argv[1:]
......
......@@ -15,18 +15,7 @@ from tests.BalsamTestCase import create_job, create_app
BALSAM_TEST_DIR = os.environ['BALSAM_TEST_DIRECTORY']
def kill_stragglers():
grep = subprocess.Popen('ps aux | grep launcher | grep consume',
stdout=subprocess.PIPE, shell=True)
stdout,stderr=grep.communicate()
stdout = stdout.decode('utf-8')
pids = [int(line.split()[1]) for line in stdout.split('\n') if 'python' in line]
for pid in pids:
try: os.kill(pid, signal.SIGKILL)
except: pass
def run_launcher_until(function, period=1.0, timeout=20.0):
def run_launcher_until(function, period=1.0, timeout=60.0):
launcher_proc = subprocess.Popen(['balsam', 'launcher', '--consume',
'--max-ranks-per-node', '8'],
stdout=subprocess.PIPE,
......@@ -39,8 +28,8 @@ def run_launcher_until(function, period=1.0, timeout=20.0):
# then you have two launchers from different test cases processing the same
# job... a very hard bug to catch.
os.killpg(os.getpgid(launcher_proc.pid), signal.SIGTERM) # Send the signal to all the process groups
launcher_proc.wait(timeout=5)
kill_stragglers()
time.sleep(10)
os.killpg(os.getpgid(launcher_proc.pid), signal.SIGKILL) # Send the signal to all the process groups
return success
def run_launcher_seconds(seconds):
......@@ -51,10 +40,12 @@ def run_launcher_seconds(seconds):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid)
launcher_proc.communicate(timeout=seconds+5)
launcher_proc.communicate(timeout=seconds+30)
try: os.killpg(os.getpgid(launcher_proc.pid), signal.SIGKILL) # Send the signal to all the process groups
except: pass
def run_launcher_until_state(job, state, period=1.0, timeout=20.0):
def run_launcher_until_state(job, state, period=1.0, timeout=60.0):
def check():
job.refresh_from_db()
return job.state == state
......@@ -219,9 +210,9 @@ class TestSingleJobTransitions(BalsamTestCase):
with open(remote_path, 'w') as fp:
fp.write('9\n')
# Same as previous test, but square.py hangs for 300 sec
# Same as previous test, but square.py hangs for 10 sec
job = create_job(name='square_testjob2', app='square',
args='side0.dat --sleep 5',
args='side0.dat --sleep 10',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}')
......@@ -239,7 +230,7 @@ class TestSingleJobTransitions(BalsamTestCase):
self.assertEquals(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
success = run_launcher_until_state(job, 'JOB_FINISHED')
success = run_launcher_until_state(job, 'JOB_FINISHED', timeout=60)
self.assertTrue(success)
self.assertIn('RESTART_READY', job.state_history)
......@@ -424,7 +415,7 @@ class TestDAG(BalsamTestCase):
return all(j.state == 'JOB_FINISHED' for j in BalsamJob.objects.all())
# Just check that all jobs reach JOB_FINISHED state
success = run_launcher_until(check, timeout=90.0)
success = run_launcher_until(check, timeout=360.0)
self.assertTrue(success)
def test_static(self):
......@@ -505,8 +496,9 @@ class TestDAG(BalsamTestCase):
chA.set_parents([parent])
chB.set_parents([parent])
# Run until A finishes, but B will still be hanging
# If a child times out and we re-run the launcher, everything is handled
success = run_launcher_until_state(chB, 'RUNNING')
success = run_launcher_until_state(chA, 'JOB_FINISHED')
self.assertTrue(success)
chB.refresh_from_db()
self.assertEqual(chB.state, 'RUN_TIMEOUT')
......@@ -541,7 +533,12 @@ class TestDAG(BalsamTestCase):
chB.set_parents([parent])
# child B will give a RUN_ERROR, but it will be handled
success = run_launcher_until_state(chB, 'JOB_FINISHED')
def check():
for j in (chA, chB, parent):
j.refresh_from_db()
return all(j.state=='JOB_FINISHED' for j in (parent,chA,chB))
success = run_launcher_until(check)
self.assertTrue(success)
parent.refresh_from_db()
......
......@@ -82,7 +82,7 @@ class TestMPIRunner(BalsamTestCase):
# Now wait for the job to finish
# On successful run, it should be RUN_DONE
poll_until_returns_true(runner.finished, period=0.5)
poll_until_returns_true(runner.finished, period=0.5, timeout=40)
self.assertTrue(runner.finished())
runner.update_jobs()
self.assertEquals(job.state, 'RUN_DONE')
......@@ -129,7 +129,9 @@ class TestMPIRunner(BalsamTestCase):
# Timeout the runner
# Now the job is marked as RUN_TIMEOUT
runner.timeout()
runner_group = runners.RunnerGroup(Lock())
runner_group.runners.append(runner)
runner_group.update_and_remove_finished(timeout=True)
self.assertEquals(job.state, 'RUN_TIMEOUT')
# A moment later, the runner process is indeed terminated
......@@ -167,9 +169,9 @@ class TestMPIEnsemble(BalsamTestCase):
}
args = {'normal' : '',
'fail' : '--retcode 1',
'timeout' : '--sleep 100'
'timeout' : '--sleep 25'
}
for jobtype in jobs:
for jobtype in 'qsub normal fail'.split():
for i in range(num_jobs_per_type):
if jobtype == 'qsub':
cmd = f'echo hello world {i}'
......@@ -184,6 +186,15 @@ class TestMPIEnsemble(BalsamTestCase):
shuffled_jobs = [j for joblist in jobs.values() for j in joblist]
random.shuffle(shuffled_jobs)
# We want to put timeout jobs at the end of this list
app, appargs = self.app.name, f"{i} {args['timeout']}"
for i in range(num_jobs_per_type):
job = create_job(name=f"timeout{i}", app=app,
direct_command='', args=appargs)
jobs['timeout'].append(job)
shuffled_jobs.append(job)
all_workers = list(self.worker_group)
runner = runners.MPIEnsembleRunner(shuffled_jobs, all_workers)
......@@ -202,14 +213,17 @@ class TestMPIEnsemble(BalsamTestCase):
error_done = all(j.state=='RUN_ERROR' for j in jobs['fail'])
return normal_done and qsub_done and error_done
finished = poll_until_returns_true(check_done, period=1, timeout=12)
finished = poll_until_returns_true(check_done, period=1, timeout=20)
self.assertTrue(finished)
# And the long-running jobs in the ensemble are still going:
self.assertTrue(all(j.state=='RUNNING' for j in jobs['timeout']))
# So we kill the runner. The timed-out jobs are marked accordingly
runner.timeout()
runner_group = runners.RunnerGroup(Lock())
runner_group.runners.append(runner)
runner_group.update_and_remove_finished(timeout=True)
self.assertTrue(all(j.state=='RUN_TIMEOUT' for j in jobs['timeout']))
# Double-check that the rest of the jobs are unaffected
......@@ -217,6 +231,12 @@ class TestMPIEnsemble(BalsamTestCase):
self.assertTrue(all(j.state=='RUN_DONE' for j in jobs['qsub']))
self.assertTrue(all(j.state=='RUN_ERROR' for j in jobs['fail']))
# Kill the sleeping jobs in case they do not terminate
killcmd = "ps aux | grep mock_serial | grep -v grep | grep -v vim | awk '{print $2}' | xargs kill -9"
os.system(killcmd)
killcmd = "ps aux | grep mpi_ensemble.py | grep -v grep | grep -v vim | awk '{print $2}' | xargs kill -9"
os.system(killcmd)
class TestRunnerGroup(BalsamTestCase):
def setUp(self):
......@@ -314,7 +334,7 @@ class TestRunnerGroup(BalsamTestCase):
runner_group.update_and_remove_finished()
return all(r.finished() for r in runner_group)
poll_until_returns_true(check_done)
poll_until_returns_true(check_done, timeout=40)
# Now there should be no runners, PKs, or busy workers left
self.assertListEqual(list(runner_group), [])
......
......@@ -32,7 +32,18 @@ class WorkerGroupUnitTests(BalsamTestCase):
if self.scheduler.host_type != 'CRAY':
self.skipTest('scheduler did not recognize Cray environment')
group = worker.WorkerGroup(config, host_type='CRAY',
workers_str=self.scheduler.workers_str)
workers_str=self.scheduler.workers_str,
workers_file=self.scheduler.workers_file)
if self.scheduler.workers_str:
num_worker_env = self.scheduler.SCHEDULER_VARIABLES['num_workers']
self.assertEqual(len(group.workers), int(os.environ[num_worker_env]))
def test_cooley(self):
'''Construct WorkerGroup from reading Cooley environment'''
config = get_args('--consume-all'.split())
if self.scheduler.host_type != 'COOLEY':
self.skipTest('scheduler did not recognize Cooley environment')
group = worker.WorkerGroup(config, host_type='COOLEY',
workers_str=self.scheduler.workers_str,
workers_file=self.scheduler.workers_file)
self.assertGreaterEqual(len(group.workers), 1)
......@@ -27,7 +27,7 @@ if USING_DB_LOGIN:
default_db = {}
default_db['ENGINE'] = 'django.db.backends.sqlite3'
default_db['NAME'] = os.path.join(INSTALL_PATH,'db.sqlite3')
default_db['OPTIONS'] = {'timeout' : 500000.0}
default_db['OPTIONS'] = {'timeout' : 5000.0}
if USING_DB_LOGIN:
default_db['USER'] = DBUSER
default_db['PASSWORD'] = DBPASS
......