Commit 492d5eab authored by Michael Salim

Now jobs correctly set OMP_NUM_THREADS when threads_per_rank is greater than 1.

Added functional test cases that run several hybrid MPI/OpenMP jobs
back-to-back from the launcher, exercising various combinations of
ranks-per-node, threads-per-rank, and threads-per-core (the -d and -j
options of aprun).

These tests currently run only on Theta; they can be extended by
recompiling c_apps/omp.c on other platforms and pointing the test case
at the appropriate binary.
parent 2935d53a
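A minimal sketch, assuming Theta-style aprun semantics, of how the three job fields exercised by these tests could map onto aprun's -n/-N/-d/-j flags and the OMP_NUM_THREADS setting this commit introduces. The helper below is hypothetical and not part of Balsam:

# Hypothetical helper, for illustration only: maps the job fields used in the
# tests below onto aprun's -n/-N/-d/-j flags and OMP_NUM_THREADS.
def sketch_aprun(num_nodes, ranks_per_node, threads_per_rank, threads_per_core, exe):
    total_ranks = num_nodes * ranks_per_node
    env = {}
    if threads_per_rank > 1:                      # same condition as in the hunk below
        env['OMP_NUM_THREADS'] = str(threads_per_rank)
    cmd = (f"aprun -n {total_ranks} -N {ranks_per_node} "
           f"-d {threads_per_rank} -j {threads_per_core} {exe}")
    return env, cmd

# e.g. job2 in the new test: 1 node, 2 ranks/node, 64 threads/rank, 2 threads/core
# -> OMP_NUM_THREADS=64, "aprun -n 2 -N 2 -d 64 -j 2 omp.theta.x"
print(sketch_aprun(1, 2, 64, 2, 'omp.theta.x'))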
@@ -379,6 +379,10 @@ auto timeout retry: {self.auto_timeout_retry}
            BALSAM_PARENT_IDS=str(self.parents),
            BALSAM_CHILD_IDS=children,
        )
+       if self.threads_per_rank > 1:
+           balsam_envs['OMP_NUM_THREADS'] = self.threads_per_rank
        if timeout: balsam_envs['BALSAM_JOB_TIMEOUT']="TRUE"
        if error: balsam_envs['BALSAM_JOB_ERROR']="TRUE"
        envs.update(balsam_envs)
......
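A minimal sketch of the consumer side, assuming these variables are exported into the launched job's environment (the variable names come from the hunk above; everything else is illustrative):

import os

# What a Balsam-launched application would read from its environment.
omp_threads = int(os.environ.get('OMP_NUM_THREADS', '1'))
timed_out = os.environ.get('BALSAM_JOB_TIMEOUT') == "TRUE"
had_error = os.environ.get('BALSAM_JOB_ERROR') == "TRUE"
print(omp_threads, timed_out, had_error)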
@@ -45,8 +45,8 @@ def poll_until_returns_true(function, *, args=(), period=1.0, timeout=12.0):
    return result

def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE, num_nodes=1,
-              ranks_per_node=1, args='', workflow='', envs={}, state='CREATED',
-              url_in='', input_files='', url_out='', stage_out_files='',
+              ranks_per_node=1, threads_per_rank=1, threads_per_core=1, args='', workflow='',
+              envs={}, state='CREATED', url_in='', input_files='', url_out='', stage_out_files='',
               post_error_handler=False, post_timeout_handler=False,
               auto_timeout_retry=True, preproc='', postproc='', wtime=1):
@@ -62,6 +62,8 @@ def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE,
    job.num_nodes = num_nodes
    job.ranks_per_node = ranks_per_node
+   job.threads_per_rank = threads_per_rank
+   job.threads_per_core = threads_per_core
    job.application_args = args
    job.workflow = workflow
......
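For reference, a call using the extended create_job signature might look like the following (argument values mirror job0 in the tests added below):

# Example call with the new keyword arguments (values taken from job0 below).
job = create_job(name='job0', app='omp', num_nodes=2, ranks_per_node=32,
                 threads_per_rank=2, threads_per_core=1)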
/* c_apps/omp.c -- dummy hybrid MPI/OpenMP program used by the functional tests.
 * Every OpenMP thread of every MPI rank prints one line: "<hostname> <rank> <thread>".
 * Compiled ahead of time for Theta as omp.theta.x (see commit message). */
#include <stdio.h>
#include <omp.h>
#include <mpi.h>

int main()
{
    MPI_Init(NULL, NULL);

    int thread = 0, nthread = 1, rank = 0, nrank = 1;
    MPI_Comm_size(MPI_COMM_WORLD, &nrank);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    char proc_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(proc_name, &name_len);

#pragma omp parallel default(shared) private(thread, nthread)
    {
#if defined (_OPENMP)
        nthread = omp_get_num_threads();
        thread = omp_get_thread_num();
#endif
        printf("%s %d %d\n", proc_name, rank, thread);
    }

    MPI_Finalize();
    return 0;
}
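To make the parsing in the new test concrete, the program's output for a small run would look roughly like this (the hostname is invented; one line per rank/thread pair):

# Illustrative output of omp.c for 2 ranks x 2 threads on one node;
# the hostname "nid00000" is made up. Format: "<hostname> <rank> <thread>".
sample_output = """\
nid00000 0 0
nid00000 0 1
nid00000 1 0
nid00000 1 1
"""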
from collections import defaultdict
import glob
import os
import random
import getpass
@@ -711,7 +713,7 @@ class TestDAG(BalsamTestCase):
            return all(j.state == 'JOB_FINISHED' for j in jobs)

        # Everything finished successfully
-       success = run_launcher_until(check)
+       success = run_launcher_until(check, timeout=120)
        self.assertTrue(success)

        parent.refresh_from_db()
@@ -791,3 +793,73 @@ class TestDAG(BalsamTestCase):
        # Make sure that the correct number of dependencies were created for the
        # reduce job: one for each dynamically-spawned job (plus the original)
        self.assertEqual(reduce_job.get_parents().count(), NUM_SIDES+1)

class TestThreadPlacement(BalsamTestCase):

    def setUp(self):
        self.app_path = os.path.dirname(find_spec("tests.c_apps").origin)
        self.app = create_app(name='omp')

        # Three hybrid jobs covering different combinations of ranks-per-node,
        # threads-per-rank, and threads-per-core (aprun -N/-d/-j)
        self.job0 = create_job(name='job0', app='omp', num_nodes=2, ranks_per_node=32, threads_per_rank=2)
        self.job1 = create_job(name='job1', app='omp', num_nodes=2, ranks_per_node=64, threads_per_rank=1)
        self.job2 = create_job(name='job2', app='omp', num_nodes=1, ranks_per_node=2, threads_per_rank=64, threads_per_core=2)

    def check_omp_exe_output(self, job):
        # Each line of the job's output has the form "<hostname> <rank> <thread>"
        fname = job.name + '.out'
        out = job.read_file_in_workdir(fname)

        proc_counts = defaultdict(int)
        ranks = []
        threads = []
        for line in out.split('\n'):
            dat = line.split()
            if len(dat) == 3:
                name, rank, thread = dat
                proc_counts[name] += 1
                ranks.append(int(rank))
                threads.append(int(thread))

        # Output should come from exactly num_nodes distinct hosts
        proc_names = proc_counts.keys()
        self.assertEqual(len(proc_names), job.num_nodes)
        if job.num_nodes == 2:
            # Ranks should be split evenly across the two nodes
            proc1, proc2 = proc_names
            self.assertEqual(proc_counts[proc1], proc_counts[proc2])

        # Each rank prints one line per OpenMP thread
        expected_ranks = list(range(job.num_ranks)) * job.threads_per_rank
        self.assertListEqual(sorted(ranks), sorted(expected_ranks))

        expected_threads = list(range(job.threads_per_rank)) * job.num_ranks
        self.assertListEqual(sorted(threads), sorted(expected_threads))

    def test_Theta(self):
        '''MPI/OMP C binary for Theta: check thread/rank placement'''
        from balsam.service.schedulers import Scheduler
        scheduler = Scheduler.scheduler_main
        if scheduler.host_type != 'CRAY':
            self.skipTest('did not recognize Cray environment')
        if scheduler.num_workers < 2:
            self.skipTest('need at least two nodes reserved to run this test')

        binary = glob.glob(os.path.join(self.app_path, 'omp.theta.x'))
        self.app.executable = binary[0]
        self.app.save()

        def check():
            jobs = BalsamJob.objects.all()
            return all(j.state == 'JOB_FINISHED' for j in jobs)

        run_launcher_until(check)
        self.job0.refresh_from_db()
        self.job1.refresh_from_db()
        self.job2.refresh_from_db()
        self.assertEqual(self.job0.state, 'JOB_FINISHED')
        self.assertEqual(self.job1.state, 'JOB_FINISHED')
        self.assertEqual(self.job2.state, 'JOB_FINISHED')

        # Check output of dummy MPI/OpenMP C program
        self.check_omp_exe_output(self.job0)
        self.check_omp_exe_output(self.job1)
        self.check_omp_exe_output(self.job2)
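As a quick cross-check of the counting logic above (a standalone sketch, not part of the test suite): each OpenMP thread prints exactly one line, so every job in setUp should produce num_nodes * ranks_per_node * threads_per_rank lines of output.

# Standalone sketch: expected output line counts for the three jobs in setUp().
jobs = {'job0': (2, 32, 2), 'job1': (2, 64, 1), 'job2': (1, 2, 64)}
for name, (nodes, rpn, tpr) in jobs.items():
    print(name, nodes * rpn * tpr, 'lines')   # 128 lines for each job here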