Commit 2d354b85 authored by Michael Salim

MPIRunner unit tests and bugfixes

parent 13d7915a
......@@ -306,8 +306,8 @@ auto timeout retry: {self.auto_timeout_retry}
@property
def app_cmd(self):
if self.application:
app = ApplicationDefinition.objects.get(name=job.application)
return f"{app.executable} {app.application_args}"
app = ApplicationDefinition.objects.get(name=self.application)
return f"{app.executable} {self.application_args}"
else:
return self.direct_command
......@@ -345,7 +345,9 @@ auto timeout retry: {self.auto_timeout_retry}
return {variable:value for (variable,value) in entries}
def get_envs(self, *, timeout=False, error=False):
envs = os.environ.copy()
keywords = 'PATH LIBRARY BALSAM DJANGO PYTHON'.split()
envs = {var:value for var,value in os.environ.items()
if any(keyword in var for keyword in keywords)}
try:
app = self.get_application()
except NoApplication:
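The rewritten get_envs no longer copies the whole environment; it keeps only variables whose names contain one of the listed keywords. A minimal standalone sketch of the same filter (illustrative, not the Balsam method itself):

import os

keywords = 'PATH LIBRARY BALSAM DJANGO PYTHON'.split()
envs = {var: value for var, value in os.environ.items()
        if any(keyword in var for keyword in keywords)}
# keeps e.g. PATH, LD_LIBRARY_PATH, PYTHONPATH, BALSAM_JOB_ID; drops unrelated variables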
......@@ -402,10 +404,15 @@ auto timeout retry: {self.auto_timeout_retry}
        top = os.path.join(top, self.workflow)
        name = self.name.replace(' ', '_')
        path = os.path.join(top, name)

        if os.path.exists(path): path += "_"
        for char in str(self.job_id):
            path += char
            if not os.path.exists(path): break

        if os.path.exists(path):
            jid = str(self.job_id)
            path += "_" + jid[0]
            i = 1
            while os.path.exists(path):
                path += jid[i]
                i += 1

        os.makedirs(path)
        self.working_directory = path
        self.save(update_fields=['working_directory'])
......
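A standalone sketch of the new directory-uniquifying logic above, assuming a string job id; the length guard on i is an extra safeguard added for illustration and is not part of the patch:

import os

def unique_working_path(path, job_id):
    '''Append '_' plus successive job_id characters until the path is unused.'''
    if os.path.exists(path):
        jid = str(job_id)
        path += "_" + jid[0]
        i = 1
        while os.path.exists(path) and i < len(jid):  # length guard added here, not in the patch
            path += jid[i]
            i += 1
    return path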
......@@ -83,8 +83,9 @@ class Scheduler:
self.remaining_seconds -= elapsed_time
self.last_check_seconds = now
return self.remaining_seconds
else:
sched_id = self.current_scheduler_id
sched_id = self.current_scheduler_id
if sched_id is None:
return float("inf")
try:
......
......@@ -39,8 +39,8 @@ children = None
_envs = {k:v for k,v in os.environ.items() if k.find('BALSAM')>=0}
JOB_ID = _envs.get('BALSAM_JOB_ID', '')
TIMEOUT = bool(_envs.get('BALSAM_JOB_TIMEOUT', False))
ERROR = bool(_envs.get('BALSAM_JOB_ERROR', False))
TIMEOUT = _envs.get('BALSAM_JOB_TIMEOUT', False) == "TRUE"
ERROR = _envs.get('BALSAM_JOB_ERROR', False) == "TRUE"
if JOB_ID:
JOB_ID = uuid.UUID(JOB_ID)
......
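The bool() calls were the bug here: bool() of any non-empty string is True, so an exported BALSAM_JOB_TIMEOUT of "FALSE" would still flag a timeout. Comparing against the literal "TRUE" behaves as intended:

bool("FALSE")        # True  -- any non-empty string is truthy
"FALSE" == "TRUE"    # False
"TRUE" == "TRUE"     # True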
......@@ -78,7 +78,9 @@ def main(args, transition_pool, runner_group, job_source):
timeout = lambda : scheduler.remaining_time_seconds() <= 0.0
while not timeout():
logger.debug("\n************\nSERVICE LOOP\n************")
logger.debug("\n******************\n"
"BEGIN SERVICE LOOP\n"
"******************")
wait = True
for stat in transition_pool.get_statuses():
......
......@@ -29,6 +29,15 @@ logger = logging.getLogger(__name__)
from importlib.util import find_spec
MPI_ENSEMBLE_EXE = find_spec("balsamlauncher.mpi_ensemble").origin
def get_tail(fname, nlines=5, indent=' '):
    proc = Popen(f'tail -n {nlines} {fname}'.split(), stdout=PIPE,
                 stderr=STDOUT)
    tail = str(proc.communicate()[0])
    lines = tail.split('\n')
    for i, line in enumerate(lines[:]):
        lines[i] = indent + line
    return '\n'.join(lines)
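get_tail shells out to tail and indents each returned line; update_jobs below uses it to attach the last lines of a failed job's output file to the RUN_ERROR message, roughly like this (file name and return code hypothetical):

tail = get_tail('test0.out', nlines=5)
msg = f"RETURN CODE 1:\n{tail}"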
class MonitorStream(Thread):
'''Thread: non-blocking read of a process's stdout'''
......@@ -98,6 +107,9 @@ class MPIRunner(Runner):
tpr = job.threads_per_rank
tpc = job.threads_per_core
# Note that environment variables are passed through the MPI run command
# line, rather than Popen directly, due to ALCF restrictions:
# https://www.alcf.anl.gov/user-guides/running-jobs-xc40#environment-variables
mpi_str = self.mpi_cmd(worker_list, app_cmd=app_cmd, envs=envs,
num_ranks=nranks, ranks_per_node=rpn,
threads_per_rank=tpr, threads_per_core=tpc)
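Per the note above, on the Cray XC40 environment variables are forwarded on the run command line rather than through Popen's env argument. A hedged sketch of what folding envs into such a command could look like (an illustration only, not the actual mpi_cmd implementation; aprun's -e NAME=VALUE flag exports a variable to the launched ranks):

def aprun_style_cmd(app_cmd, envs, num_ranks, ranks_per_node):
    # fold each environment variable into the aprun invocation
    env_args = ' '.join(f"-e {var}={val}" for var, val in envs.items())
    return f"aprun -n {num_ranks} -N {ranks_per_node} {env_args} {app_cmd}"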
......@@ -111,6 +123,7 @@ class MPIRunner(Runner):
self.popen_args['stderr'] = STDOUT
self.popen_args['bufsize'] = 1
logger.info(f"MPI Runner Popen args: {self.popen_args['args']}")
logger.info(f"MPI Runner: writing output to {outname}")
def update_jobs(self):
job = self.jobs[0]
......@@ -125,12 +138,16 @@ class MPIRunner(Runner):
            curstate = 'RUN_DONE'
            msg = ''
        else:
            logger.debug(f"MPI Job {job.cute_id} return code!=0: error")
            curstate = 'RUN_ERROR'
            msg = str(retcode)

            self.process.communicate()
            self.outfile.close()
            tail = get_tail(self.outfile.name)
            msg = f"RETURN CODE {retcode}:\n{tail}"
            logger.debug(msg)

        if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified
class MPIEnsembleRunner(Runner):
'''One subprocess: an ensemble of serial jobs run in an mpi4py wrapper'''
def __init__(self, job_list, worker_list):
......@@ -272,14 +289,18 @@ class RunnerGroup:
for runner in self.runners: runner.update_jobs()
self.lock.release()
for runner in self.runners[:]:
if runner.finished():
for job in runner.jobs:
if job.state not in 'RUN_DONE RUN_ERROR RUN_TIMEOUT'.split():
msg = (f"Job {job.cute_id} runner process done, but "
"failed to update job state.")
logger.exception(msg)
raise RuntimeError(msg)
finished_runners = (r for r in self.runners if r.finished())
for runner in finished_runners:
if any(j.state not in ['RUN_DONE','RUN_ERROR','RUN_TIMEOUT'] for j in runner.jobs):
self.lock.acquire()
runner.update_jobs()
self.lock.release()
if any(j.state not in ['RUN_DONE','RUN_ERROR','RUN_TIMEOUT'] for j in runner.jobs):
msg = (f"Job {job.cute_id} runner process done, but failed to update job state.")
logger.exception(msg)
raise RuntimeError(msg)
else:
any_finished = True
self.runners.remove(runner)
for worker in runner.worker_list:
......
......@@ -40,6 +40,9 @@ class WorkerGroup:
    def __iter__(self):
        return iter(self.workers)

    def __len__(self):
        return len(self.workers)

    def __getitem__(self, i):
        return self.workers[i]
......
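The added __len__ and __getitem__ let callers index and size a WorkerGroup directly, which the new tests below rely on (names here are illustrative):

node0 = worker_group[0]            # __getitem__
num_nodes = len(worker_group)      # __len__
all_workers = list(worker_group)   # __iter__ (already present)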
......@@ -4,14 +4,16 @@ import django
import tempfile
import unittest

tempdir = tempfile.TemporaryDirectory()
os.environ['BALSAM_TEST_DIRECTORY'] = tempdir.name
os.environ['BALSAM_TEST']='1'
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()

if __name__ == "__main__":
    tempdir = tempfile.TemporaryDirectory(dir=os.getcwd(), prefix="testdata_")
    os.environ['BALSAM_TEST_DIRECTORY'] = tempdir.name
    os.environ['BALSAM_TEST']='1'
    os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
    django.setup()

    loader = unittest.defaultTestLoader
    if len(sys.argv) > 1:
        names = sys.argv[1:]
......
from mpi4py import MPI
import argparse
import time
from sys import exit

rank = MPI.COMM_WORLD.Get_rank()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--sleep', type=int, default=0)
    parser.add_argument('--retcode', type=int, default=0)
    args = parser.parse_args()

    print("Rank", rank, "on", MPI.Get_processor_name())
    if args.sleep:
        time.sleep(args.sleep)
    exit(args.retcode)
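The mock app prints its rank and host name, optionally sleeps, and exits with the requested code, giving the tests a controllable MPI payload. A hypothetical manual check outside the test harness:

# mpirun -n 2 python tests/mock_mpi_app.py --sleep 1 --retcode 0
# each rank prints "Rank <r> on <hostname>"; the process exit status is --retcode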
from collections import namedtuple
import os
import sys
import time
from importlib.util import find_spec
from tests.BalsamTestCase import BalsamTestCase, cmdline
from django.conf import settings
from balsam.schedulers import Scheduler
from balsam.models import BalsamJob, ApplicationDefinition
......@@ -8,30 +15,82 @@ from balsamlauncher import worker
from balsamlauncher import runners
from balsamlauncher.launcher import get_args, create_new_runners
class TestRunners(BalsamTestCase):
    '''Integration test for WorkerGroup, JobReader, and Runners/RunnerGroup'''
class TestMPIRunner(BalsamTestCase):
    '''start, update_jobs, finished, error/timeout handling'''

    def setUp(self):
        self.scheduler = Scheduler.scheduler_main
        self.host_type = self.scheduler.host_type
        scheduler = Scheduler.scheduler_main
        self.host_type = scheduler.host_type

        if self.host_type == 'DEFAULT':
            config = get_args('--consume-all --num-workers 4 --max-ranks-per-node 4'.split())
            config = get_args('--consume-all --num-workers 2 --max-ranks-per-node 4'.split())
        else:
            config = get_args('--consume-all')

        self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
                                               workers_str=scheduler.workers_str)
                                               workers_str=scheduler.workers_str,
                                               workers_file=scheduler.workers_file)
        self.job_source = jobreader.JobReader.from_config(config)

    def testMPIEnsembleRunner(self):
        '''Several non-MPI jobs packaged into one mpi4py wrapper'''
        # Some jobs will pass; some will fail; some will timeout
        pass

        app_path = f"{sys.executable} {find_spec('tests.mock_mpi_app').origin}"
        self.app = ApplicationDefinition()
        self.app.name = "mock_mpi"
        self.app.description = "print and sleep"
        self.app.executable = app_path
        self.app.save()

    def assert_output_file_contains_n_ranks(self, fp, n):
        '''specific check of mock_mpi_app.py output'''
        found = []
        for line in fp:
            found.append(int(line.split()[1]))
        self.assertSetEqual(set(range(n)), set(found))

    def testMPIRunner_passes(self):
        # varying ranks, rpn, tpr, tpc, envs
        # varying application args
        # check for successful job run, update, and output
        pass
        # Test:
        work_configs = []
        WorkerConfig = namedtuple('WorkerConfig', ['workers', 'num_nodes',
                                                   'ranks_per_node'])

        # 2 ranks on one node
        node0 = self.worker_group[0]
        cfg = WorkerConfig([node0], 1, 2)
        work_configs.append(cfg)

        # max ranks on one node
        cfg = WorkerConfig([node0], 1, node0.max_ranks_per_node)
        work_configs.append(cfg)

        # max ranks on all nodes
        cfg = WorkerConfig(list(self.worker_group), len(self.worker_group),
                           node0.max_ranks_per_node)
        work_configs.append(cfg)

        for i, (workers, num_nodes, rpn) in enumerate(work_configs):
            job = BalsamJob()
            job.name = f"test{i}"
            job.application = "mock_mpi"
            job.allowed_work_sites = settings.BALSAM_SITE
            job.num_nodes = 1
            job.ranks_per_node = 2
            job.save()
            self.assertEquals(job.state, 'CREATED')

            job.create_working_path()
            workers = self.worker_group[0]
            runner = runners.MPIRunner([job], [workers])

            runner.start()
            runner.update_jobs()
            while not runner.finished():
                self.assertEquals(job.state, 'RUNNING')
                runner.update_jobs()
                time.sleep(0.5)

            runner.update_jobs()
            self.assertEquals(job.state, 'RUN_DONE')

            outpath = os.path.join(job.working_directory, f"test{i}.out")
            self.assertEqual(outpath, runner.outfile.name)
            with open(outpath) as fp:
                self.assert_output_file_contains_n_ranks(fp, 2)
def testMPIRunner_fails(self):
# ensure correct when job returns nonzero
......@@ -41,6 +100,21 @@ class TestRunners(BalsamTestCase):
# ensure correct when long-running job times out
pass
class TestMPIEnsemble:
    def setUp(self):
        pass

    def testMPIEnsembleRunner(self):
        '''Several non-MPI jobs packaged into one mpi4py wrapper'''
        # Some jobs will pass; some will fail; some will timeout
        pass

class TestRunnerGroup:
    def setUp(self):
        pass

    def test_create_runners(self):
        # Create sets of jobs intended to exercise each code path
        # in a single call to launcher.create_new_runners()
......
......@@ -5,6 +5,7 @@ logger = logging.getLogger('console')
try:
    INSTALL_PATH = os.environ['ARGOBALSAM_INSTALL_PATH']
    LOGGING_DIRECTORY = os.path.join(INSTALL_PATH, 'log') # where to store log files
except KeyError as e:
    logger.error('Environment not setup: ' + str(e))
    raise
......@@ -147,7 +148,6 @@ SENDER_CONFIG = {
#------------------------------
# logging settings
#------------------------------
LOGGING_DIRECTORY = os.path.join(INSTALL_PATH, 'log') # where to store log files
LOG_HANDLER_LEVEL = 'DEBUG'
LOG_BACKUP_COUNT = 5 # number of files worth of history
LOG_FILE_SIZE_LIMIT = 100 * 1024 * 1024 # file size at which to move to a new log file
......@@ -231,6 +231,7 @@ elif 'launcher' in ' '.join(sys.argv):
else:
logger = logging.getLogger('console')
def log_uncaught_exceptions(exctype, value, tb, logger=logger):
    logger.error(f"Uncaught Exception {exctype}: {value}", exc_info=(exctype,value,tb))
logger = logging.getLogger('console')
......
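log_uncaught_exceptions matches the sys.excepthook signature; wiring it up (not shown in this hunk) would look like:

import sys
sys.excepthook = log_uncaught_exceptions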