Commit 1a6a0526 authored by Michael Salim

added more runner tests and improved logging in MPIEnsemble

parent 2459d2ec
@@ -12,6 +12,7 @@ from subprocess import Popen, STDOUT
 from mpi4py import MPI
 from balsamlauncher.cd import cd
 from balsamlauncher.exceptions import *
+from balsamlauncher.runners import get_tail

 COMM = MPI.COMM_WORLD
 RANK = COMM.Get_rank()
@@ -47,11 +48,19 @@ def run(job):
         proc = Popen(job.cmd, stdout=outf, stderr=STDOUT, cwd=job.workdir)
         retcode = proc.wait()
     except Exception as e:
+        logger.exception(f"mpi_ensemble rank {RANK} job {job.id}: exception during Popen")
         status_msg(job.id, "FAILED", msg=str(e))
         raise MPIEnsembleError from e
     else:
-        if retcode == 0: status_msg(job.id, "RUN_DONE")
-        else: status_msg(job.id, "RUN_ERROR", msg=f"process return code {retcode}")
+        if retcode == 0:
+            logger.debug(f"mpi_ensemble rank {RANK}: job returned 0")
+            status_msg(job.id, "RUN_DONE")
+        else:
+            outf.flush()
+            tail = get_tail(outf.name).replace('\n', '\\n')
+            msg = f"NONZERO RETURN {retcode}: {tail}"
+            status_msg(job.id, "RUN_ERROR", msg=msg)
+            logger.debug(f"mpi_ensemble rank {RANK} job {job.id} {msg}")
     finally:
         proc.kill()
@@ -32,7 +32,7 @@ MPI_ENSEMBLE_EXE = find_spec("balsamlauncher.mpi_ensemble").origin
 def get_tail(fname, nlines=5, indent=' '):
     proc = Popen(f'tail -n {nlines} {fname}'.split(), stdout=PIPE,
                  stderr=STDOUT)
-    tail = str(proc.communicate()[0])
+    tail = proc.communicate()[0].decode()
     lines = tail.split('\n')
     for i, line in enumerate(lines[:]):
         lines[i] = indent + line
@@ -190,15 +190,14 @@ class MPIEnsembleRunner(Runner):
         logger.debug("Checking mpi_ensemble stdout for status updates...")
         for line in self.monitor.available_lines():
-            logger.debug(f"mpi_ensemble stdout: {line.strip()}")
             pk, state, *msg = line.split()
             msg = ' '.join(msg)
             if pk in self.jobs_by_pk and state in balsam.models.STATES:
                 job = self.jobs_by_pk[pk]
                 job.update_state(state, msg) # TODO: handle RecordModified exception
-                logger.debug(f"MPIEnsemble job {job.cute_id} updated to {state}")
+                logger.debug(f"Job {job.cute_id} updated to {state}: {msg}")
             else:
-                logger.error(f"Invalid status update: {line.strip()}")
+                logger.error(f"Invalid statusMsg from mpi_ensemble: {line.strip()}")

 class RunnerGroup:
@@ -251,7 +250,7 @@ class RunnerGroup:
         # If there are not enough serial jobs; run the larger of:
         # largest MPI job that fits, or the remaining serial jobs
         if nserial >= nidle_ranks:
-            jobs = serial_jobs[:nidle_ranks]
+            jobs = serial_jobs[:nidle_ranks] # TODO: try putting ALL serial jobs into one MPIEnsemble
             assigned_workers = idle_workers
             runner_class = MPIEnsembleRunner
             logger.info(f"Running {len(jobs)} serial jobs on {nidle_workers} workers "
import time
from sys import exit
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('number', type=int)
    parser.add_argument('--sleep', type=int, default=0)
    parser.add_argument('--retcode', type=int, default=0)
    args = parser.parse_args()

    print(args.number**2)
    if args.sleep:
        time.sleep(args.sleep)
    exit(args.retcode)
 from collections import namedtuple
 import os
+import random
 import sys
 import time
 from importlib.util import find_spec
@@ -10,11 +11,18 @@ from django.conf import settings
 from balsam.schedulers import Scheduler
 from balsam.models import BalsamJob, ApplicationDefinition
-from balsamlauncher import jobreader
 from balsamlauncher import worker
 from balsamlauncher import runners
 from balsamlauncher.launcher import get_args, create_new_runners

+def poll_until_returns_true(function, *, args=(), period=1.0, timeout=12.0):
+    start = time.time()
+    while time.time() - start < timeout:
+        result = function(*args)
+        if result: break
+        else: time.sleep(period)
+    return result

 class TestMPIRunner(BalsamTestCase):
     '''start, update_jobs, finished, error/timeout handling'''
     def setUp(self):
@@ -28,7 +36,6 @@ class TestMPIRunner(BalsamTestCase):
         self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
                                                workers_str=scheduler.workers_str,
                                                workers_file=scheduler.workers_file)
-        self.job_source = jobreader.JobReader.from_config(config)
         app_path = f"{sys.executable} {find_spec('tests.mock_mpi_app').origin}"
         self.app = ApplicationDefinition()
@@ -36,38 +43,39 @@ class TestMPIRunner(BalsamTestCase):
         self.app.description = "print and sleep"
         self.app.executable = app_path
         self.app.save()

-    def assert_output_file_contains_n_ranks(self, fp, n):
-        '''specific check of mock_mpi_app.py output'''
-        found = []
-        for line in fp:
-            found.append(int(line.split()[1]))
-        self.assertSetEqual(set(range(n)), set(found))
-
-    def testMPIRunner_passes(self):
         # Test various worker configurations:
-        work_configs = []
+        self.work_configs = []
         WorkerConfig = namedtuple('WorkerConfig', ['workers', 'num_nodes',
                                                    'ranks_per_node'])
         # 2 ranks on one node
         node0 = self.worker_group[0]
         cfg = WorkerConfig([node0], 1, 2)
-        work_configs.append(cfg)
+        self.work_configs.append(cfg)
         # max ranks on one node
         cfg = WorkerConfig([node0], 1, node0.max_ranks_per_node)
-        work_configs.append(cfg)
+        self.work_configs.append(cfg)
         # max ranks on all nodes
         cfg = WorkerConfig(list(self.worker_group), len(self.worker_group),
                            node0.max_ranks_per_node)
-        work_configs.append(cfg)
+        self.work_configs.append(cfg)

-        for i, (workerslist, num_nodes, rpn) in enumerate(work_configs):
+    def assert_output_file_contains_n_ranks(self, fp, n):
+        '''specific check of mock_mpi_app.py output'''
+        found = []
+        for line in fp:
+            found.append(int(line.split()[1]))
+        self.assertSetEqual(set(range(n)), set(found))
+
+    def test_normal(self):
+        '''MPI application runs, returns 0, marked RUN_DONE'''
+        for i, (workerslist, num_nodes, rpn) in enumerate(self.work_configs):
             job = BalsamJob()
             job.name = f"test{i}"
-            job.application = "mock_mpi"
+            job.application = self.app.name
             job.allowed_work_sites = settings.BALSAM_SITE
             job.num_nodes = num_nodes
             job.ranks_per_node = rpn
@@ -76,75 +84,166 @@ class TestMPIRunner(BalsamTestCase):
             job.create_working_path()
             runner = runners.MPIRunner([job], workerslist)

+            # Start the job and update state right away
+            # If it didn't finish too fast, it should now be RUNNING
             runner.start()
             runner.update_jobs()
-            while not runner.finished():
+            if not runner.finished():
                 self.assertEquals(job.state, 'RUNNING')
-                runner.update_jobs()
-                time.sleep(0.5)
+
+            # Now wait for the job to finish
+            # On sucessful run, it should be RUN_DONE
+            poll_until_returns_true(runner.finished, period=0.5)
+            self.assertTrue(runner.finished())
             runner.update_jobs()
             self.assertEquals(job.state, 'RUN_DONE')
+
+            # Check that the correct output is really there:
             outpath = runner.outfile.name
             with open(outpath) as fp:
                 self.assert_output_file_contains_n_ranks(fp, num_nodes*rpn)

-    def testMPIRunner_fails(self):
-        # ensure correct when job returns nonzero
-        work_configs = []
-        WorkerConfig = namedtuple('WorkerConfig', ['workers', 'num_nodes',
-                                                   'ranks_per_node'])
-        # 2 ranks on one node
-        node0 = self.worker_group[0]
-        cfg = WorkerConfig([node0], 1, 2)
-        work_configs.append(cfg)
-        # max ranks on one node
-        cfg = WorkerConfig([node0], 1, node0.max_ranks_per_node)
-        work_configs.append(cfg)
-        # max ranks on all nodes
-        cfg = WorkerConfig(list(self.worker_group), len(self.worker_group),
-                           node0.max_ranks_per_node)
-        work_configs.append(cfg)
-
-        for i, (workerslist, num_nodes, rpn) in enumerate(work_configs):
+    def test_return_nonzero(self):
+        '''MPI application runs, return 255, marked RUN_ERROR'''
+        for i, (workerslist, num_nodes, rpn) in enumerate(self.work_configs):
             job = BalsamJob()
             job.name = f"test{i}"
-            job.application = "mock_mpi"
+            job.application = self.app.name
             job.allowed_work_sites = settings.BALSAM_SITE
             job.num_nodes = num_nodes
             job.ranks_per_node = rpn
-            job.application_args = '--retcode 255'
+            job.application_args = '--retcode 255' # FAIL
             job.save()
             self.assertEquals(job.state, 'CREATED')
             job.create_working_path()
-            workers = self.worker_group[0]
             runner = runners.MPIRunner([job], workerslist)
             runner.start()
-            runner.update_jobs()
-            while not runner.finished():
-                self.assertEquals(job.state, 'RUNNING')
-                runner.update_jobs()
-                time.sleep(0.5)
+
+            poll_until_returns_true(runner.finished, period=0.5)
             runner.update_jobs()
             self.assertEquals(job.state, 'RUN_ERROR')

-    def testMPIRunner_timeouts(self):
-        # ensure correct when longr-running job times out
-        pass
+    def test_timeouts(self):
+        '''MPI application runs for too long, call timeout, marked RUN_TIMEOUT'''
+        for i, (workerslist, num_nodes, rpn) in enumerate(self.work_configs):
+            job = BalsamJob()
+            job.name = f"test{i}"
+            job.application = self.app.name
+            job.allowed_work_sites = settings.BALSAM_SITE
+            job.num_nodes = num_nodes
+            job.ranks_per_node = rpn
+            job.application_args = '--sleep 10' # runs for too long
+            job.save()
+            self.assertEquals(job.state, 'CREATED')
+            job.create_working_path()
+            runner = runners.MPIRunner([job], workerslist)
+
+            # job starts running; sleeps for 10 seconds
+            runner.start()
+            runner.update_jobs()
+            self.assertEquals(job.state, 'RUNNING')
+
+            # we wait just 2 seconds and the job is still going
+            time.sleep(2)
+            self.assertEquals(job.state, 'RUNNING')
+
+            # Timeout the runner
+            # Now the job is marked as RUN_TIMEOUT
+            runner.timeout()
+            self.assertEquals(job.state, 'RUN_TIMEOUT')
+
+            # A moment later, the runner process is indeed terminated
+            term = poll_until_returns_true(runner.finished, period=0.1,
+                                           timeout=6.0)
+            self.assertTrue(term)
-class TestMPIEnsemble:
+class TestMPIEnsemble(BalsamTestCase):
     def setUp(self):
-        pass
+        scheduler = Scheduler.scheduler_main
+        self.host_type = scheduler.host_type
+        if self.host_type == 'DEFAULT':
+            config = get_args('--consume-all --num-workers 1 --max-ranks-per-node 4'.split())
+        else:
+            config = get_args('--consume-all')
+
+        self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
+                                               workers_str=scheduler.workers_str,
+                                               workers_file=scheduler.workers_file)
+
+        app_path = f"{sys.executable} {find_spec('tests.mock_serial_app').origin}"
+        self.app = ApplicationDefinition()
+        self.app.name = "mock_serial"
+        self.app.description = "square a number"
+        self.app.executable = app_path
+        self.app.save()

     def testMPIEnsembleRunner(self):
         '''Several non-MPI jobs packaged into one mpi4py wrapper'''
-        # Some jobs will pass; some will fail; some will timeout
-        pass
+        num_ranks = sum(w.num_nodes*w.max_ranks_per_node for w in
+                        self.worker_group)
+        num_jobs_per_type = num_ranks // 3
+
+        jobs = {'qsub'   : [], # these have no AppDef, will run ok
+                'normal' : [], # these will succeed as well
+                'fail'   : [], # these should be RUN_ERROR
+                'timeout': []  # these should be RUN_TIMEOUT
+               }
+        args = {'normal'  : '',
+                'fail'    : '--retcode 1',
+                'timeout' : '--sleep 100'
+               }
+
+        for jobtype in jobs:
+            for i in range(num_jobs_per_type):
+                job = BalsamJob()
+                job.allowed_work_sites = settings.BALSAM_SITE
+                job.name = f"{jobtype}{i}"
+                if jobtype == 'qsub':
+                    job.direct_command = f'echo hello world {i}'
+                else:
+                    job.application = self.app.name
+                    job.application_args = f"{i} {args[jobtype]}"
+                job.save()
+                job.create_working_path()
+                jobs[jobtype].append(job)
+
+        shuffled_jobs = [j for joblist in jobs.values() for j in joblist]
+        random.shuffle(shuffled_jobs)
+
+        all_workers = list(self.worker_group)
+        runner = runners.MPIEnsembleRunner(shuffled_jobs, all_workers)
+        for job in shuffled_jobs:
+            self.assertEqual(job.state, 'CREATED')
+
+        # start the ensemble
+        runner.start()
+
+        # All of the qsub, normal, and fail jobs should be done quickly
+        # Let's give it up to 12 seconds, checking once a second
+        def check_done():
+            runner.update_jobs()
+            normal_done = all(j.state=='RUN_DONE' for j in jobs['normal'])
+            qsub_done = all(j.state=='RUN_DONE' for j in jobs['qsub'])
+            error_done = all(j.state=='RUN_ERROR' for j in jobs['fail'])
+            return normal_done and qsub_done and error_done
+
+        finished = poll_until_returns_true(check_done, period=1, timeout=12)
+        self.assertTrue(finished)
+
+        # And the long-running jobs in the ensemble are still going:
+        self.assertTrue(all(j.state=='RUNNING' for j in jobs['timeout']))
+
+        # So we kill the runner. The timed-out jobs are marked accordingly
+        runner.timeout()
+        self.assertTrue(all(j.state=='RUN_TIMEOUT' for j in jobs['timeout']))
+
+        # Double-check that the rest of the jobs are unaffected
+        self.assertTrue(all(j.state=='RUN_DONE' for j in jobs['normal']))
+        self.assertTrue(all(j.state=='RUN_DONE' for j in jobs['qsub']))
+        self.assertTrue(all(j.state=='RUN_ERROR' for j in jobs['fail']))

 class TestRunnerGroup: