Commit cc97e189 authored by Michael Salim's avatar Michael Salim
Browse files

bug fixes, reorganized launcher create_runner

parent e9047ffc
...@@ -81,10 +81,44 @@ def log_time(minutes_left): ...@@ -81,10 +81,44 @@ def log_time(minutes_left):
time_str = f"{whole_minutes:02d} min : {whole_seconds:02d} sec remaining" time_str = f"{whole_minutes:02d} min : {whole_seconds:02d} sec remaining"
logger.info(time_str) logger.info(time_str)
def create_runner(jobs, runner_group, worker_group, remaining_minutes, last_runner_created):
    '''Launch a new runner for the currently-eligible jobs, if warranted.

    Filters *jobs* down to those that can run right now (not already running,
    in a runnable state, and short enough to fit in *remaining_minutes*), then
    decides whether to start a runner immediately or keep waiting for more
    jobs to become runnable.  Returns the (possibly updated) timestamp of the
    most recent runner creation.
    '''
    runnable_jobs = [
        job for job in jobs
        if job.pk not in runner_group.running_job_pks
        and job.state in RUNNABLE_STATES
        and job.wall_time_minutes <= remaining_minutes
    ]
    logger.debug(f"Have {len(runnable_jobs)} runnable jobs")

    # Waiting only makes sense while some jobs are still being pre-processed
    almost_runnable = any(job.state in ALMOST_RUNNABLE_STATES for job in jobs)

    # Once runner_create_period seconds have elapsed, stop waiting regardless
    now = time.time()
    elapsed = now - last_runner_created
    runner_ready = bool(elapsed > settings.BALSAM_RUNNER_CREATION_PERIOD_SEC)

    # With enough serial (single-rank) jobs to fill an ensemble — or none at
    # all — there is no benefit in waiting for more
    serial_count = sum(1 for job in runnable_jobs if job.num_ranks == 1)
    first_worker = worker_group[0]
    ensemble_capacity = 2 * first_worker.num_nodes * first_worker.max_ranks_per_node
    ensemble_ready = (serial_count == 0) or (serial_count >= ensemble_capacity)

    if runnable_jobs and (runner_ready or not almost_runnable or ensemble_ready):
        try:
            runner_group.create_next_runner(runnable_jobs, worker_group)
        except ExceededMaxRunners:
            logger.info("Exceeded max concurrent runners; waiting")
        except NoAvailableWorkers:
            logger.info("Not enough idle workers to start any new runs")
        else:
            # Only a successful creation resets the waiting clock
            last_runner_created = now
    return last_runner_created
def main(args, transition_pool, runner_group, job_source): def main(args, transition_pool, runner_group, job_source):
delay_sleeper = delay_generator() delay_sleeper = delay_generator()
runner_create_period = settings.BALSAM_RUNNER_CREATION_PERIOD_SEC
last_runner_created = time.time() last_runner_created = time.time()
remaining_timer = remaining_time_minutes(args.time_limit_minutes) remaining_timer = remaining_time_minutes(args.time_limit_minutes)
...@@ -124,32 +158,9 @@ def main(args, transition_pool, runner_group, job_source): ...@@ -124,32 +158,9 @@ def main(args, transition_pool, runner_group, job_source):
job_source.refresh_from_db() job_source.refresh_from_db()
# Decide whether or not to start a new runner # Decide whether or not to start a new runner
runnable_jobs = [ last_runner_created = create_runner(job_source.jobs, runner_group,
job for job in job_source.jobs worker_group, remaining_minutes,
if job.pk not in runner_group.running_job_pks and last_runner_created)
job.state in RUNNABLE_STATES and
job.wall_time_minutes <= remaining_minutes
]
logger.debug(f"Have {len(runnable_jobs)} runnable jobs")
almost_runnable = any(j.state in ALMOST_RUNNABLE_STATES for j in job_source.jobs)
now = time.time()
runner_ready = bool(now - last_runner_created > runner_create_period)
num_serial = len([j for j in runnable_jobs if j.num_ranks == 1])
worker = worker_group[0]
max_serial_per_ensemble = 2 * worker.num_nodes * worker.max_ranks_per_node
ensemble_ready = (num_serial >= max_serial_per_ensemble) or (num_serial == 0)
if runnable_jobs:
if runner_ready or not almost_runnable or ensemble_ready:
try:
runner_group.create_next_runner(runnable_jobs, worker_group)
except ExceededMaxRunners:
logger.info("Exceeded max concurrent runners; waiting")
except NoAvailableWorkers:
logger.info("Not enough idle workers to start any new runs")
else:
last_runner_created = now
if delay: next(delay_sleeper) if delay: next(delay_sleeper)
......
class DEFAULTMPICommand(object): import subprocess
import logging
logger = logging.getLogger(__name__)
class MPICommand(object):
    '''Base builder for MPI launch command-line strings.

    Subclasses set the flag-name attributes assigned in ``__init__`` (and may
    override ``worker_str``) to match a particular launcher flavor
    (mpirun / aprun / runjob); calling the instance assembles the final
    command string.
    '''
    def __init__(self):
        # Flag names left blank/None here; subclasses fill them in
        self.mpi = ''
        self.nproc = ''
        self.ppn = ''
        self.env = ''
        self.cpu_binding = None
        self.threads_per_rank = None
        self.threads_per_core = None

    def worker_str(self, workers):
        '''Worker/host selection flags; the base launcher emits none.'''
        return ""

    def env_str(self, envs):
        '''Render environment-variable flags from the *envs* mapping.'''
        pieces = [f'{self.env} {var}="{val}"' for var, val in envs.items()]
        return " ".join(pieces)

    def threads(self, thread_per_rank, thread_per_core):
        '''Render CPU-binding and threading flags, omitting unset ones.'''
        parts = []
        if self.cpu_binding:
            parts.append(f"{self.cpu_binding} ")
        if self.threads_per_rank:
            parts.append(f"{self.threads_per_rank} {thread_per_rank} ")
        if self.threads_per_core:
            parts.append(f"{self.threads_per_core} {thread_per_core} ")
        return "".join(parts)

    def __call__(self, workers, *, app_cmd, num_ranks, ranks_per_node, envs,
                 threads_per_rank=1, threads_per_core=1):
        '''Build the mpirun/aprun/runjob command line string'''
        worker_args = self.worker_str(workers)
        env_args = self.env_str(envs)
        thread_args = self.threads(threads_per_rank, threads_per_core)
        return (f"{self.mpi} {self.nproc} {num_ranks} {self.ppn} "
                f"{ranks_per_node} {env_args} {worker_args} {thread_args} {app_cmd}")
class OPENMPICommand(MPICommand):
'''Single node OpenMPI: ppn == num_ranks''' '''Single node OpenMPI: ppn == num_ranks'''
def __init__(self): def __init__(self):
self.mpi = 'mpirun' self.mpi = 'mpirun'
...@@ -36,7 +76,7 @@ class DEFAULTMPICommand(object): ...@@ -36,7 +76,7 @@ class DEFAULTMPICommand(object):
return result return result
class BGQMPICommand(DEFAULTMPICommand): class BGQMPICommand(MPICommand):
def __init__(self): def __init__(self):
self.mpi = 'runjob' self.mpi = 'runjob'
self.nproc = '--np' self.nproc = '--np'
...@@ -53,7 +93,7 @@ class BGQMPICommand(DEFAULTMPICommand): ...@@ -53,7 +93,7 @@ class BGQMPICommand(DEFAULTMPICommand):
shape, block, corner = worker.shape, worker.block, worker.corner shape, block, corner = worker.shape, worker.block, worker.corner
return f"--shape {shape} --block {block} --corner {corner} " return f"--shape {shape} --block {block} --corner {corner} "
class CRAYMPICommand(DEFAULTMPICommand): class CRAYMPICommand(MPICommand):
def __init__(self): def __init__(self):
# 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1 # 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1
self.mpi = 'aprun' self.mpi = 'aprun'
...@@ -69,7 +109,7 @@ class CRAYMPICommand(DEFAULTMPICommand): ...@@ -69,7 +109,7 @@ class CRAYMPICommand(DEFAULTMPICommand):
return "" return ""
return f"-L {','.join(str(worker.id) for worker in workers)}" return f"-L {','.join(str(worker.id) for worker in workers)}"
class COOLEYMPICommand(DEFAULTMPICommand): class COOLEYMPICommand(MPICommand):
def __init__(self): def __init__(self):
# 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1 # 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1
self.mpi = 'mpirun' self.mpi = 'mpirun'
...@@ -84,3 +124,13 @@ class COOLEYMPICommand(DEFAULTMPICommand): ...@@ -84,3 +124,13 @@ class COOLEYMPICommand(DEFAULTMPICommand):
if not workers: if not workers:
return "" return ""
return f"--hosts {','.join(str(worker.id) for worker in workers)} " return f"--hosts {','.join(str(worker.id) for worker in workers)} "
# Import-time probe: run `mpirun -npernode` once to decide which MPI flavor
# is installed, and bind DEFAULTMPICommand accordingly.  The code treats the
# "unrecognized argument npernode" error text as indicating an MPICH-style
# mpirun; any other output is assumed to be OpenMPI.
# NOTE(review): if `mpirun` is not on PATH this raises FileNotFoundError at
# import time — confirm that is acceptable for every deployment target.
try_mpich = subprocess.Popen(['mpirun', '-npernode'], stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
stdout, _ = try_mpich.communicate()
if 'unrecognized argument npernode' in stdout.decode():
    logger.debug("Assuming MPICH")
    # COOLEYMPICommand carries the MPICH-style flags
    DEFAULTMPICommand = COOLEYMPICommand
else:
    logger.debug("Assuming OpenMPI")
    DEFAULTMPICommand = OPENMPICommand
...@@ -12,12 +12,12 @@ from .BalsamTestCase import create_job, create_app ...@@ -12,12 +12,12 @@ from .BalsamTestCase import create_job, create_app
from django.conf import settings from django.conf import settings
from balsam.schedulers import Scheduler from balsam.service.schedulers import Scheduler
from balsam.models import BalsamJob from balsam.service.models import BalsamJob
from balsam.launcher import worker from balsam.launcher import worker
from balsam.launcher import runners from balsam.launcher import runners
from balsam.launcher.launcher import get_args, create_new_runners from balsam.launcher.launcher import get_args, create_runner
class TestMPIRunner(BalsamTestCase): class TestMPIRunner(BalsamTestCase):
...@@ -260,8 +260,8 @@ class TestRunnerGroup(BalsamTestCase): ...@@ -260,8 +260,8 @@ class TestRunnerGroup(BalsamTestCase):
" number", executable=app_path) " number", executable=app_path)
def test_create_runners(self): def test_create_runner(self):
'''sanity check launcher.create_new_runners() '''sanity check launcher.create_runner()
Don't test implementation details here; just ensuring consistency''' Don't test implementation details here; just ensuring consistency'''
num_workers = len(self.worker_group) num_workers = len(self.worker_group)
num_nodes = sum(w.num_nodes for w in self.worker_group) num_nodes = sum(w.num_nodes for w in self.worker_group)
...@@ -294,11 +294,12 @@ class TestRunnerGroup(BalsamTestCase): ...@@ -294,11 +294,12 @@ class TestRunnerGroup(BalsamTestCase):
running_pks = runner_group.running_job_pks running_pks = runner_group.running_job_pks
self.assertListEqual(running_pks, []) self.assertListEqual(running_pks, [])
# Invoke create_new_runners once # Invoke create_runner once
# Some set of jobs will start running under control of the RunnerGroup # Some set of jobs will start running under control of the RunnerGroup
# Nondeterministic, due to random() used above, but we just want to # Nondeterministic, due to random() used above, but we just want to
# check for consistency # check for consistency
create_new_runners(all_jobs, runner_group, self.worker_group) create_runner(all_jobs, runner_group, self.worker_group,
remaining_minutes=100, last_runner_created=0)
# Get the list of running PKs from the RunnerGroup # Get the list of running PKs from the RunnerGroup
# At least some jobs are running now # At least some jobs are running now
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment