Commit 2459d2ec authored by Michael Salim's avatar Michael Salim
Browse files

runner tests & mpi_command PPN bugfix

parent 2d354b85
......@@ -32,7 +32,7 @@ class DEFAULTMPICommand(object):
envs = self.env_str(envs)
thread_str = self.threads(threads_per_rank, threads_per_core)
result = (f"{self.mpi} {self.nproc} {num_ranks} {self.ppn} "
f"{num_ranks} {envs} {workers} {thread_str} {app_cmd}")
f"{ranks_per_node} {envs} {workers} {thread_str} {app_cmd}")
return result
......
......@@ -21,7 +21,7 @@ class TestMPIRunner(BalsamTestCase):
scheduler = Scheduler.scheduler_main
self.host_type = scheduler.host_type
if self.host_type == 'DEFAULT':
config = get_args('--consume-all --num-workers 2 --max-ranks-per-node 4'.split())
config = get_args('--consume-all --num-workers 1 --max-ranks-per-node 4'.split())
else:
config = get_args('--consume-all')
......@@ -46,7 +46,7 @@ class TestMPIRunner(BalsamTestCase):
self.assertSetEqual(set(range(n)), set(found))
def testMPIRunner_passes(self):
# Test:
# Test various worker configurations:
work_configs = []
WorkerConfig = namedtuple('WorkerConfig', ['workers', 'num_nodes',
'ranks_per_node'])
......@@ -64,19 +64,18 @@ class TestMPIRunner(BalsamTestCase):
node0.max_ranks_per_node)
work_configs.append(cfg)
for i, (workers, num_nodes, rpn) in enumerate(work_configs):
for i, (workerslist, num_nodes, rpn) in enumerate(work_configs):
job = BalsamJob()
job.name = f"test{i}"
job.application = "mock_mpi"
job.allowed_work_sites = settings.BALSAM_SITE
job.num_nodes = 1
job.ranks_per_node = 2
job.num_nodes = num_nodes
job.ranks_per_node = rpn
job.save()
self.assertEquals(job.state, 'CREATED')
job.create_working_path()
workers = self.worker_group[0]
runner = runners.MPIRunner([job], [workers])
runner = runners.MPIRunner([job], workerslist)
runner.start()
runner.update_jobs()
while not runner.finished():
......@@ -86,15 +85,52 @@ class TestMPIRunner(BalsamTestCase):
runner.update_jobs()
self.assertEquals(job.state, 'RUN_DONE')
outpath = os.path.join(job.working_directory, f"test{i}.out")
self.assertEqual(outpath, runner.outfile.name)
outpath = runner.outfile.name
with open(outpath) as fp:
self.assert_output_file_contains_n_ranks(fp, 2)
self.assert_output_file_contains_n_ranks(fp, num_nodes*rpn)
def testMPIRunner_fails(self):
# ensure correct when job returns nonzero
pass
work_configs = []
WorkerConfig = namedtuple('WorkerConfig', ['workers', 'num_nodes',
'ranks_per_node'])
# 2 ranks on one node
node0 = self.worker_group[0]
cfg = WorkerConfig([node0], 1, 2)
work_configs.append(cfg)
# max ranks on one node
cfg = WorkerConfig([node0], 1, node0.max_ranks_per_node)
work_configs.append(cfg)
# max ranks on all nodes
cfg = WorkerConfig(list(self.worker_group), len(self.worker_group),
node0.max_ranks_per_node)
work_configs.append(cfg)
for i, (workerslist, num_nodes, rpn) in enumerate(work_configs):
job = BalsamJob()
job.name = f"test{i}"
job.application = "mock_mpi"
job.allowed_work_sites = settings.BALSAM_SITE
job.num_nodes = num_nodes
job.ranks_per_node = rpn
job.application_args = '--retcode 255'
job.save()
self.assertEquals(job.state, 'CREATED')
job.create_working_path()
workers = self.worker_group[0]
runner = runners.MPIRunner([job], workerslist)
runner.start()
runner.update_jobs()
while not runner.finished():
self.assertEquals(job.state, 'RUNNING')
runner.update_jobs()
time.sleep(0.5)
runner.update_jobs()
self.assertEquals(job.state, 'RUN_ERROR')
def testMPIRunner_timeouts(self):
# ensure correct when longr-running job times out
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment