Commit 7807b9ef authored by Michael Salim

* --num-workers launcher argument now caps the total number of
  workers (nodes) available to Balsam; useful for running benchmarks

* timing capabilities built into the MPIEnsemble runner and MPIRunner:
  successful job runs now have their elapsed runtime in seconds
  recorded in the runtime_seconds field

* refactored utilities for automated testing and benchmarking
parent 1530e587
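The timing works by wrapping each command in the POSIX "time -p" shell
construct, which appends three lines (real, user, sys) to the job's merged
stdout/stderr stream; the parse_real_time helper introduced below scrapes the
"real" line from the output tail. A minimal sketch of the parsing step (the
sample values are illustrative):

    from balsam.launcher.util import parse_real_time

    tail = "hello\nreal 3.14\nuser 2.71\nsys 0.40\n"   # tail of a job's output
    assert parse_real_time(tail) == 3.14               # wall-clock seconds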
......@@ -76,7 +76,7 @@ for d in [
# LOGGING SETUP
# ----------------
HANDLER_FILE = os.path.join(LOGGING_DIRECTORY, LOG_FILENAME)
BALSAM_DB_CONFIG_LOG = os.path.join(LOGGING_DIRECTORY, "balsamdb-config.log")
BALSAM_DB_CONFIG_LOG = os.path.join(LOGGING_DIRECTORY, "db.log")
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
......
......@@ -227,9 +227,10 @@ def get_args(inputcmd=None):
help="Continuously run all jobs from DB")
group.add_argument('--wf-name',
help="Continuously run jobs of specified workflow")
parser.add_argument('--num-workers', type=int, default=1,
parser.add_argument('--num-workers', type=int, default=0,
help="Theta: defaults to # nodes. BGQ: the # of subblocks")
parser.add_argument('--nodes-per-worker', type=int, default=1)
parser.add_argument('--nodes-per-worker', help="(BG/Q only) # nodes per subblock",
type=int, default=1)
parser.add_argument('--max-ranks-per-node', type=int, default=1,
help="For non-MPI jobs, how many to pack per worker")
parser.add_argument('--time-limit-minutes', type=float, default=0,
......
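With these flags a benchmark run can pin the launcher to a fixed number of
workers. A hypothetical sketch (assuming get_args' inputcmd parameter accepts
a list of tokens; the workflow name and counts are placeholders):

    args = get_args("--wf-name my-bench --num-workers 4 "
                    "--max-ranks-per-node 8".split())
    assert args.num_workers == 4   # WorkerGroup will keep at most 4 workers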
......@@ -15,7 +15,7 @@ from subprocess import Popen, STDOUT, TimeoutExpired
from mpi4py import MPI
from balsam.launcher.util import cd, get_tail
from balsam.launcher.util import cd, get_tail, parse_real_time
from balsam.launcher.exceptions import *
from balsam.service.models import BalsamJob
......@@ -89,8 +89,9 @@ def run(job):
try:
status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
cmd = f"time -p ( {job.cmd} )"
env = job_from_db.get_envs() # TODO: Should we include this?
proc = Popen(job.cmd, stdout=outf, stderr=STDOUT,
proc = Popen(cmd, stdout=outf, stderr=STDOUT, shell=True,
             cwd=job.workdir, env=env)
handler = lambda a,b: on_exit(proc)
......@@ -106,7 +107,9 @@ def run(job):
else:
if retcode == 0:
logger.debug(f"mpi_ensemble rank {RANK}: job returned 0")
status_msg(job.id, "RUN_DONE")
elapsed = parse_real_time(get_tail(outname, indent=''))
msg = f"elapsed seconds {elapsed}" if elapsed else ""
status_msg(job.id, "RUN_DONE", msg=msg)
elif retcode == "USER_KILLED":
status_msg(job.id, "USER_KILLED", msg="mpi_ensemble aborting job due to user request")
else:
......
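The elapsed time travels from the ensemble rank back to the launcher as plain
text inside the RUN_DONE status message; the MPIEnsembleRunner (further below)
recovers it by splitting the message. A minimal sketch of that round trip
(status-message plumbing elided):

    elapsed = 12.07                       # parsed from the 'time -p' output tail
    msg = f"elapsed seconds {elapsed}"    # reported alongside RUN_DONE

    # ...later, on the launcher side:
    if "elapsed seconds" in msg:
        runtime_seconds = float(msg.split()[-1])   # -> 12.07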
......@@ -34,7 +34,7 @@ from django.db import transaction
from balsam.service.models import InvalidStateError
from balsam.launcher import mpi_commands
from balsam.launcher.exceptions import *
from balsam.launcher.util import cd, get_tail
from balsam.launcher.util import cd, get_tail, parse_real_time
import logging
logger = logging.getLogger(__name__)
......@@ -151,6 +151,7 @@ class MPIRunner(Runner):
num_ranks=nranks, ranks_per_node=rpn,
threads_per_rank=tpr, threads_per_core=tpc)
mpi_str = f'time -p ( {mpi_str} )'
basename = job.name
outname = os.path.join(job.working_directory, f"{basename}.out")
self.outfile = open(outname, 'w+b')
......@@ -175,6 +176,7 @@ class MPIRunner(Runner):
logger.info(f"MPIRunner {job.cute_id} return code 0: done")
curstate = 'RUN_DONE'
msg = ''
self.outfile.close()
else:
curstate = 'RUN_ERROR'
self.process.communicate()
......@@ -189,6 +191,11 @@ class MPIRunner(Runner):
logger.info(msg)
if job.state != curstate:
if curstate == 'RUN_DONE':
elapsed = parse_real_time(get_tail(self.outfile.name, indent=''))
if elapsed:
job.runtime_seconds = float(elapsed)
job.save(update_fields=['runtime_seconds'])
job.update_state(curstate, msg)
else:
job.refresh_from_db()
......@@ -254,6 +261,10 @@ class MPIEnsembleRunner(Runner):
except (ValueError, KeyError, InvalidStateError) as e:
if 'resources: utime' not in line:
logger.error(f"Unexpected statusMsg from mpi_ensemble: {line.strip()}")
else:
if "elapsed seconds" in msg:
job.runtime_seconds = float(msg.split()[-1])
job.save(update_fields=['runtime_seconds'])
retcode = None
if timeout:
......
from subprocess import Popen, PIPE, STDOUT
import os

def time_cmd(cmd, stdout=PIPE, stderr=STDOUT, envs=None):
    '''Run a command line under "time -p"; return (output, elapsed seconds)'''
    if type(cmd) == list:
        cmd = ' '.join(cmd)
    cmd = f'time -p ( {cmd} )'
    p = Popen(cmd, shell=True, stdout=stdout,
              stderr=stderr, env=envs)
    stdout = p.communicate()[0].decode('utf-8')
    real_seconds = parse_real_time(stdout)
    return stdout, real_seconds

def parse_real_time(stdout):
    '''Parse linux "time -p" command'''
    if type(stdout) == bytes:
        stdout = stdout.decode()
    lines = stdout.split('\n')
    real_lines = [l for l in lines[-5:] if l.startswith('real')]
    if not real_lines:
        return None
    elif len(real_lines) > 1:
        real_line = real_lines[-1]
    else:
        real_line = real_lines[0]
    time_str = real_line.split()[1]
    return float(time_str)

def get_tail(fname, nlines=5, indent='  '):
    '''grab last nlines of fname and format nicely'''
......
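A quick usage sketch of the helpers above (the command and timing are
illustrative; assumes this module is importable as balsam.launcher.util, per
the imports in mpi_ensemble and runners, and that /bin/sh supports the "time"
keyword):

    from balsam.launcher.util import time_cmd

    out, elapsed = time_cmd('sleep 1')
    # 'out' ends with the three 'time -p' lines; 'elapsed' is ~1.0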
......@@ -44,6 +44,10 @@ class WorkerGroup:
self.workers = []
self.setup = getattr(self, f"setup_{self.host_type}")
self.setup(config)
if config.num_workers >= 1:
self.workers = self.workers[:config.num_workers]
logger.info(f"Built {len(self.workers)} {self.host_type} workers")
for worker in self.workers:
logger.debug(
......@@ -108,7 +112,10 @@ class WorkerGroup:
def setup_DEFAULT(self, config):
# Use command line config: num_workers, nodes_per_worker,
# max_ranks_per_node
for i in range(config.num_workers):
num_workers = config.num_workers
if not num_workers or num_workers < 1:
num_workers = 1
for i in range(num_workers):
w = Worker(i, host_type='DEFAULT',
num_nodes=config.nodes_per_worker,
max_ranks_per_node=config.max_ranks_per_node)
......
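The cap is a simple truncation: whatever workers the host-specific setup
discovered, only the first num_workers are kept. A toy illustration (Worker
objects simplified to strings):

    workers = ['w0', 'w1', 'w2', 'w3']   # as built by setup_THETA, setup_BGQ, ...
    num_workers = 2                      # from --num-workers
    if num_workers >= 1:
        workers = workers[:num_workers]  # -> ['w0', 'w1']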
......@@ -182,7 +182,7 @@ def make_parser():
# QSUB
# ----
parser_qsub = subparsers.add_parser('qsub', help="add a one-line job to the database, bypassing Application table")
parser_qsub = subparsers.add_parser('qsub', help="add a one-line bash command or script job")
parser_qsub.set_defaults(func=qsub)
parser_qsub.add_argument('command', nargs='+')
parser_qsub.add_argument('-n', '--nodes', type=int, default=1, help="Number of compute nodes on which to run job")
......
......@@ -257,7 +257,6 @@ class BalsamJob(models.Model):
logger.info(f"direct save of {self.cute_id}")
self._save_direct(force_insert, force_update, using, update_fields)
else:
logger.info(f"sending request for save of {self.cute_id}")
settings.SAVE_CLIENT.save(self, force_insert, force_update, using, update_fields)
self.refresh_from_db()
......
import os
import unittest
import subprocess
import time
from django.core.management import call_command
from django import db
......@@ -35,20 +34,7 @@ class BalsamTestCase(unittest.TestCase):
raise RuntimeError("Test DB not configured")
call_command('flush',interactive=False,verbosity=0)
def cmdline(cmd,envs=None,shell=True):
'''Return string output from a command line'''
p = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,env=envs)
return p.communicate()[0].decode('utf-8')
def poll_until_returns_true(function, *, args=(), period=1.0, timeout=12.0):
start = time.time()
result = False
while time.time() - start < timeout:
result = function(*args)
if result: break
else: time.sleep(period)
return result
def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE, num_nodes=1,
ranks_per_node=1, threads_per_rank=1, threads_per_core=1, args='', workflow='',
......
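The cmdline and poll_until_returns_true helpers removed here reappear in
tests/util.py, and the call sites below unpack two values from util.cmdline,
so the relocated helper evidently returns the elapsed "time -p" seconds
alongside the captured output. A hypothetical sketch of the relocated helper
(tests/util.py itself is not shown in this diff):

    # tests/util.py (assumed)
    from balsam.launcher.util import time_cmd

    def cmdline(cmd, envs=None, shell=True):
        '''Run cmd under "time -p"; return (output, elapsed_seconds)'''
        return time_cmd(cmd, envs=envs)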
import os
import sys
from socket import gethostname
import time
import subprocess
from importlib.util import find_spec
from balsam.service.models import BalsamJob
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import poll_until_returns_true
from tests.BalsamTestCase import BalsamTestCase
from tests.BalsamTestCase import create_job, create_app
def get_real_time(stdout):
'''Parse linux "time" command'''
if type(stdout) == bytes:
stdout = stdout.decode()
lines = stdout.split('\n')
nlines = len(lines)
nlines = max(5, nlines)
real_line = None
for line in lines[-nlines:]:
if line.startswith('real') and len(line.split())==2:
real_line = line
break
if not real_line: return None
time_str = real_line.split()[1]
minutes, seconds = time_str.split('m')
minutes = float(minutes)
seconds = float(seconds[:-1])
return 60*minutes + seconds
from tests import util
class TestInsertion(BalsamTestCase):
def setUp(self):
from itertools import takewhile, product
hello = find_spec("tests.benchmarks.concurrent_insert.hello").origin
create_app(name="hello", executable=hello)
data_dir = find_spec("tests.benchmarks.data").origin
self.data_dir = os.path.dirname(data_dir)
self.launcherInfo = util.launcher_info()
max_workers = self.launcherInfo.num_workers
worker_counts = takewhile(lambda x: x<=max_workers, (2**i for i in range(20)))
ranks_per_node = [4, 8, 16, 32]
self.experiments = list(product(worker_counts, ranks_per_node))
def test_concurrent_mpi_insert(self):
'''Timing: many MPI ranks simultaneously call dag.add_job'''
resultpath = os.path.join(self.data_dir, 'concurrent_insert.dat')
num_nodes = int(os.environ.get('COBALT_JOBSIZE', 0))
if num_nodes < 1:
self.skipTest("Need a COBALT allocation")
resultpath = os.path.join(util.DATA_DIR, 'concurrent_insert.dat')
cobalt_envs = {k:v for k,v in os.environ.items() if 'COBALT' in k}
with open(resultpath, 'a') as fp:
fp.write(f'# BENCHMARK: test_concurrent_mpi_insert ({__file__})\n')
fp.write(f'# Host: {gethostname()}\n')
for k, v in cobalt_envs.items():
fp.write(f'# {k}: {v}\n')
fp.write("# Each rank simultaneously calls dag.add_job (num_ranks simultaneous insertions)\n")
title = 'test_concurrent_mpi_insert'
comment = 'Each rank simultaneously calls dag.add_job (num_ranks simultaneous insertions)'
resultTable = util.FormatTable(
'num_nodes ranks_per_node num_ranks total_time_sec'.split()
)
ranks_per_node = [1, 2, 4, 8, 16, 32]
python = sys.executable
insert_app = find_spec("tests.benchmarks.concurrent_insert.mpi_insert").origin
with open(resultpath, 'a') as fp:
header = f'# {"# ranks".rjust(14):14} {"time / seconds".rjust(16):16} {"py_time / seconds".rjust(18):18}'
header += '\n# ' + '-'*(len(header)-2) + '\n'
fp.write(header)
for rpn in ranks_per_node:
for job in BalsamJob.objects.all(): job.delete()
for (num_nodes, rpn) in self.experiments:
BalsamJob.objects.all().delete()
total_ranks = num_nodes * rpn
start = time.time()
cmdline = f"time aprun -n {total_ranks} -N {rpn} {python} {insert_app}"
proc = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout, stderr = proc.communicate()
elapsed_py = time.time() - start
elapsed_sh = get_real_time(stdout)
app_cmd = f"{python} {insert_app}"
mpi_str = self.launcherInfo.mpi_cmd(
self.launcherInfo.workerGroup.workers,
app_cmd=app_cmd,
envs={},
num_ranks=total_ranks,
ranks_per_node=rpn,
threads_per_rank=1,
threads_per_core=1
)
cmdline = f"time -p ( {mpi_str} )"
stdout, elapsed_time = util.cmdline(cmdline)
out_lines = stdout.decode().split('\n')
success = list(l for l in out_lines if 'added job: success' in l)
success = list(l for l in stdout.split('\n') if 'added job: success' in l)
self.assertEqual(len(success), total_ranks)
self.assertEqual(BalsamJob.objects.count(), total_ranks)
with open(resultpath, 'a') as fp:
fp.write(f'{total_ranks:16} {elapsed_sh:16.3f} {elapsed_py:18.3f}\n')
with open(resultpath, 'a') as fp: fp.write(f'\n')
resultTable.add_row(num_nodes=num_nodes, ranks_per_node=rpn,
num_ranks=total_ranks,
total_time_sec=elapsed_time
)
with open(resultpath, 'w') as fp:
fp.write(resultTable.generate(title, comment))
import itertools
import os
import sys
from socket import gethostname
import time
import subprocess
from importlib.util import find_spec
from balsam.service.models import BalsamJob
from tests.BalsamTestCase import BalsamTestCase
from tests.BalsamTestCase import create_job, create_app
from tests import util
class TestNoOp(BalsamTestCase):
    def setUp(self):
        self.max_nodes = int(os.environ.get('COBALT_JOBSIZE', 1))
        num_nodes = [2**n for n in range(1,13) if 2**n <= self.max_nodes]
        rpn = [16, 32]
        jpn = [16, 64, 128, 256, 512, 1024]
        self.experiments = itertools.product(num_nodes, rpn, jpn)

    def serial_expt(self, num_nodes, rpn, jpn):
        BalsamJob.objects.all().delete()
        num_jobs = num_nodes * jpn
        for i in range(num_jobs):
            job = create_job(name=f'task{i}', direct_command='echo Hello',
                             args=str(i), workflow='bench-no-op')

    def test_no_op(self):
        for (num_nodes, rpn, jpn) in self.experiments:
            self.serial_expt(num_nodes, rpn, jpn)
# BENCHMARK: test_concurrent_mpi_insert
# Host: alcfwl130.alcf.anl.gov
# Each rank simultaneously calls dag.add_job (num_ranks simultaneous insertions)
# num_nodes ranks_per_node num_ranks total_time_sec
# --------------------------------------------------------------
1 4 4 1.700
1 8 8 2.000
1 16 16 2.800
1 32 32 4.470
import sys
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import BalsamTestCase
from tests.util import cmdline
from balsam.service.models import BalsamJob
class BalsamDAGTests(BalsamTestCase):
......@@ -13,7 +14,7 @@ class BalsamDAGTests(BalsamTestCase):
def mock_postprocessor_run(self, job, keyword):
'''Run the mock postprocessor as if it were happening in a Balsam Transition'''
envs = job.get_envs()
stdout = cmdline(' '.join([sys.executable, self.user_script, keyword]),
stdout, time = cmdline(' '.join([sys.executable, self.user_script, keyword]),
envs=envs)
return stdout
......
......@@ -4,90 +4,14 @@ import os
import random
import getpass
import sys
import signal
import subprocess
import time
import tempfile
from importlib.util import find_spec
from balsam.service.models import BalsamJob
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import poll_until_returns_true
from tests.BalsamTestCase import BalsamTestCase
from tests import util
from tests.BalsamTestCase import create_job, create_app
def ls_procs(keywords):
if type(keywords) == str: keywords = [keywords]
username = getpass.getuser()
searchcmd = 'ps aux | grep '
searchcmd += ' | grep '.join(f'"{k}"' for k in keywords)
grep = subprocess.Popen(searchcmd, shell=True, stdout=subprocess.PIPE)
stdout,stderr = grep.communicate()
stdout = stdout.decode('utf-8')
processes = [line for line in stdout.split('\n') if 'python' in line and line.split()[0]==username]
return processes
def sig_processes(process_lines, signal):
for line in process_lines:
proc = int(line.split()[1])
try:
os.kill(proc, signal)
except ProcessLookupError:
print(f"WARNING: could not find:\n{line}\ntried to send {signal}")
def stop_launcher_processes():
processes = ls_procs('launcher.py --consume')
sig_processes(processes, signal.SIGTERM)
def check_processes_done():
procs = ls_procs('launcher.py --consume')
return len(procs) == 0
poll_until_returns_true(check_processes_done, period=2, timeout=12)
processes = ls_procs('launcher.py --consume')
if processes:
print("Warning: these did not properly shutdown on SIGTERM:")
print("\n".join(processes))
print("Sending SIGKILL")
sig_processes(processes, signal.SIGKILL)
time.sleep(3)
def run_launcher_until(function, args=(), period=1.0, timeout=60.0):
launcher_proc = subprocess.Popen(['balsam', 'launcher', '--consume',
'--max-ranks-per-node', '8'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
success = poll_until_returns_true(function, args=args, period=period, timeout=timeout)
stop_launcher_processes()
return success
def run_launcher_seconds(seconds):
minutes = seconds / 60.0
launcher_path = sys.executable + " " + find_spec("balsam.launcher.launcher").origin
launcher_path += " --consume --max-ranks 8 --time-limit-minutes " + str(minutes)
launcher_proc = subprocess.Popen(launcher_path.split(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
try: launcher_proc.communicate(timeout=seconds+30)
finally: stop_launcher_processes()
def run_launcher_until_state(job, state, period=1.0, timeout=60.0):
def check():
job.refresh_from_db()
return job.state == state
success = run_launcher_until(check, period=period, timeout=timeout)
return success
class TestSingleJobTransitions(BalsamTestCase):
def setUp(self):
aliases = "make_sides square reduce".split()
......@@ -123,7 +47,7 @@ class TestSingleJobTransitions(BalsamTestCase):
# Run the launcher and make sure that the job gets carried all the way
# through to completion
success = run_launcher_until_state(job, 'JOB_FINISHED')
success = util.run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
# job staged in this remote side0.dat file; it contains "9"
......@@ -189,7 +113,7 @@ class TestSingleJobTransitions(BalsamTestCase):
self.assertEqual(BalsamJob.objects.all().count(), 1)
# The job is marked FAILED due to unhandled nonzero return code
success = run_launcher_until_state(job, 'FAILED')
success = util.run_launcher_until_state(job, 'FAILED')
self.assertTrue(success)
# (But actually the application ran and printed its result correctly)
......@@ -219,7 +143,7 @@ class TestSingleJobTransitions(BalsamTestCase):
self.assertEqual(BalsamJob.objects.all().count(), 1)
# The job finished successfully despite a nonzero return code
success = run_launcher_until_state(job, 'JOB_FINISHED')
success = util.run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
# Make sure at some point, it was marked with RUN_ERROR
......@@ -252,7 +176,7 @@ class TestSingleJobTransitions(BalsamTestCase):
url_out=f'local:{remote_dir.name}')
# Job reaches the RUNNING state and then times out
success = run_launcher_until_state(job, 'RUNNING')
success = util.run_launcher_until_state(job, 'RUNNING')
self.assertTrue(success)
# On termination, actively running job is marked RUN_TIMEOUT
......@@ -260,12 +184,12 @@ class TestSingleJobTransitions(BalsamTestCase):
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check,timeout=12)
success = util.poll_until_returns_true(check,timeout=12)
self.assertTrue(success)
self.assertEqual(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
success = run_launcher_until_state(job, 'JOB_FINISHED')
success = util.run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
self.assertIn('RESTART_READY', job.state_history)
......@@ -289,7 +213,7 @@ class TestSingleJobTransitions(BalsamTestCase):
post_timeout_handler=True)
# Job reaches the RUNNING state and then times out
success = run_launcher_until_state(job, 'RUNNING')
success = util.run_launcher_until_state(job, 'RUNNING')
self.assertTrue(success)
# On termination, actively running job is marked RUN_TIMEOUT
......@@ -297,11 +221,11 @@ class TestSingleJobTransitions(BalsamTestCase):
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check,timeout=12)
success = util.poll_until_returns_true(check,timeout=12)
self.assertEqual(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
success = run_launcher_until_state(job, 'JOB_FINISHED')
success = util.run_launcher_until_state(job, 'JOB_FINISHED')
self.assertTrue(success)
self.assertNotIn('RESTART_READY', job.state_history)
self.assertIn('handled timeout in square_post', job.state_history)
......@@ -327,7 +251,7 @@ class TestSingleJobTransitions(BalsamTestCase):
auto_timeout_retry=False)
# Job reaches the RUNNING state and then times out
success = run_launcher_until_state(job, 'RUNNING')
success = util.run_launcher_until_state(job, 'RUNNING')
self.assertTrue(success)
# On termination, actively running job is marked RUN_TIMEOUT
......@@ -335,12 +259,12 @@ class TestSingleJobTransitions(BalsamTestCase):
job.refresh_from_db()
return job.state == 'RUN_TIMEOUT'
success = poll_until_returns_true(check,timeout=12)
success = util.poll_until_returns_true(check,timeout=12)
self.assertEqual(job.state, 'RUN_TIMEOUT')
# If we run the launcher again, it will pick up the timed out job
# But without timeout handling, it fails
success = run_launcher_until_state(job, 'FAILED')
success = util.run_launcher_until_state(job, 'FAILED')
self.assertTrue(success)
class TestDAG(BalsamTestCase):
......@@ -450,8 +374,8 @@ class TestDAG(BalsamTestCase):