Commit ae0e5310 authored by Michael Salim

fixed bugs related to testing; runner timing

parent e0b89170
@@ -231,7 +231,7 @@ def get_args(inputcmd=None):
                         help="Theta: defaults to # nodes. BGQ: the # of subblocks")
     parser.add_argument('--nodes-per-worker', help="(BG/Q only) # nodes per sublock",
                         type=int, default=1)
-    parser.add_argument('--max-ranks-per-node', type=int, default=1,
+    parser.add_argument('--max-ranks-per-node', type=int, default=4,
                         help="For non-MPI jobs, how many to pack per worker")
     parser.add_argument('--time-limit-minutes', type=float, default=0,
                         help="Provide a walltime limit if not already imposed")
@@ -52,6 +52,7 @@ def read_jobs(fp):
     for line in fp:
         try:
             id, workdir, *command = line.split()
+            command = ' '.join(command)
             logger.debug(f"Read Job {id} CMD: {command} DIR: {workdir}")
         except:
             logger.debug("Invalid jobline")
@@ -92,7 +93,7 @@ def run(job):
     cmd = f"time -p ( {job.cmd} )"
     env = job_from_db.get_envs() # TODO: Should we include this?
     proc = Popen(cmd, stdout=outf, stderr=STDOUT,
-                 cwd=job.workdir,env=env)
+                 cwd=job.workdir,env=env, shell=True)
     handler = lambda a,b: on_exit(proc)
     signal.signal(signal.SIGINT, handler)
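Context for the `shell=True` additions in this commit: the command is wrapped in `time -p ( ... )`, which is shell syntax (the `time` keyword plus subshell grouping), so the string has to be interpreted by a shell rather than exec'd directly. A minimal sketch, assuming a shell that understands the `time -p ( ... )` form (e.g. bash); the file name is illustrative:

```python
from subprocess import Popen, STDOUT

cmd = "time -p ( sleep 0.2 )"   # shell keyword + subshell grouping

with open("job.out", "wb") as outf:
    # Without shell=True, Popen would try to exec a program literally named
    # "time -p ( sleep 0.2 )" and raise FileNotFoundError.
    proc = Popen(cmd, stdout=outf, stderr=STDOUT, shell=True)
    proc.wait()

# time -p reports to stderr; merging it via stderr=STDOUT puts the
# "real/user/sys" lines at the end of job.out, where they can be parsed later.
```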
@@ -108,7 +109,7 @@ def run(job):
     if retcode == 0:
         logger.debug(f"mpi_ensemble rank {RANK}: job returned 0")
         elapsed = parse_real_time(get_tail(outname, indent=''))
-        msg = f"elapsed seconds {elapsed}" if elapsed else ""
+        msg = f"elapsed seconds {elapsed}" if elapsed is not None else ""
         status_msg(job.id, "RUN_DONE", msg=msg)
     elif retcode == "USER_KILLED":
         status_msg(job.id, "USER_KILLED", msg="mpi_ensemble aborting job due to user request")
@@ -155,10 +155,11 @@ class MPIRunner(Runner):
         basename = job.name
         outname = os.path.join(job.working_directory, f"{basename}.out")
         self.outfile = open(outname, 'w+b')
-        self.popen_args['args'] = shlex.split(mpi_str)
+        self.popen_args['args'] = mpi_str
         self.popen_args['cwd'] = job.working_directory
         self.popen_args['stdout'] = self.outfile
         self.popen_args['stderr'] = STDOUT
+        self.popen_args['shell'] = True
         self.popen_args['bufsize'] = 1
         logger.info(f"MPIRunner {job.cute_id} Popen:\n{self.popen_args['args']}")
         logger.info(f"MPIRunner: writing output to {outname}")
@@ -270,8 +271,8 @@ class MPIEnsembleRunner(Runner):
         if timeout:
             for job in self.jobs:
                 if job.state == 'RUNNING':
-                    logger.debug(f"MPIEnsemble job {job.cute_id} RUN_TIMEOUT")
                     job.update_state('RUN_TIMEOUT', 'timed out during MPIEnsemble')
+                    logger.debug(f"MPIEnsemble job {job.cute_id} RUN_TIMEOUT")
         else:
             retcode = self.process.poll()
             if retcode not in [None, 0]:
@@ -447,7 +447,7 @@ def make_dummies(args):
         job.stage_in_url = ''
         job.stage_out_url = ''
         job.stage_out_files = ''
-        job.direct_command = 'echo hello'
+        job.direct_command = 'sleep 0.1 && echo hello'
         job.save()
     print(f"Added {args.num} dummy jobs to the DB")
@@ -458,11 +458,11 @@ auto timeout retry: {self.auto_timeout_retry}
         return f' {str(self.pk):36} | {self.name:26} | {self.workflow:26} | {app:26} | {recent_state}'
     def runtime_str(self):
         if self.runtime_seconds == 0: return ''
         minutes, seconds = divmod(self.runtime_seconds, 60)
         hours, minutes = divmod(minutes, 60)
-        if hours: return f"{hours:02d} hr : {minutes:02d} min : {seconds:02d} sec"
-        else: return f"{minutes:02d} min : {seconds:02d} sec"
+        hours, minutes = round(hours), round(minutes)
+        if hours: return f"{hours:02d} hr : {minutes:02d} min : {seconds:05.2f} sec"
+        else: return f"{minutes:02d} min : {seconds:05.2f} sec"
     @staticmethod
     def get_header():
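Why the `runtime_str` change was needed: `runtime_seconds` is a float, so both `divmod` results are floats and the old `{...:02d}` specifiers raise `ValueError`. Rounding hours and minutes to ints and formatting seconds with `05.2f` keeps the layout while accepting fractional runtimes. A worked example:

```python
runtime_seconds = 3725.4
minutes, seconds = divmod(runtime_seconds, 60)   # (62.0, 5.4) -- floats, not ints
hours, minutes = divmod(minutes, 60)             # (1.0, 2.0)

# f"{hours:02d}" on a float raises ValueError, hence the explicit round():
hours, minutes = round(hours), round(minutes)
print(f"{hours:02d} hr : {minutes:02d} min : {seconds:05.2f} sec")
# -> 01 hr : 02 min : 05.40 sec
```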
@@ -2,7 +2,6 @@ from collections import defaultdict
 import glob
 import os
 import random
-import getpass
 import sys
 import tempfile
 from importlib.util import find_spec
@@ -820,7 +819,6 @@ class TestConcurrentDB(BalsamTestCase):
         created_jobs = BalsamJob.objects.filter(name__icontains='hello')
         self.assertEqual(created_jobs.count(), num_ranks)
         print(f"Successfully created {num_ranks} jobs by concurrent insertion")
 class TestUserKill(BalsamTestCase):
from collections import namedtuple
import getpass
import os
import random
from multiprocessing import Lock
@@ -77,7 +76,7 @@ class TestMPIRunner(BalsamTestCase):
         self.assertTrue(runner.finished())
         runner.update_jobs()
         self.assertEquals(job.state, 'RUN_DONE')
-        stop_processes('mock_mpi')
+        util.stop_processes('mock_mpi')
         # Check that the correct output is really there:
         outpath = runner.outfile.name
@@ -100,7 +99,7 @@ class TestMPIRunner(BalsamTestCase):
         util.poll_until_returns_true(runner.finished, period=0.5)
         runner.update_jobs()
         self.assertEquals(job.state, 'RUN_ERROR')
-        stop_processes('mock_mpi')
+        util.stop_processes('mock_mpi')
     def test_timeouts(self):
         '''MPI application runs for too long, call timeout, marked RUN_TIMEOUT'''
@@ -132,11 +131,11 @@ class TestMPIRunner(BalsamTestCase):
         term = util.poll_until_returns_true(runner.finished, period=0.1,
                                             timeout=6.0)
         self.assertTrue(term)
-        stop_processes('mock_mpi')
+        util.stop_processes('mock_mpi')
 class TestMPIEnsemble(BalsamTestCase):
     def setUp(self):
-        launchInfo = util.launcher_info()
+        launchInfo = util.launcher_info(max_ranks=8)
         self.worker_group = launchInfo.workerGroup
         app_path = f"{sys.executable} {find_spec('tests.mock_serial_app').origin}"
@@ -219,12 +218,12 @@ class TestMPIEnsemble(BalsamTestCase):
         self.assertTrue(all(j.state=='RUN_ERROR' for j in jobs['fail']))
         # Kill the sleeping jobs in case they do not terminate
-        stop_processes('mpi_ensemble mock_serial')
+        util.stop_processes('mpi_ensemble mock_serial')
 class TestRunnerGroup(BalsamTestCase):
     def setUp(self):
-        launchInfo = util.launcher_info()
+        launchInfo = util.launcher_info(max_ranks=4)
         self.worker_group = launchInfo.workerGroup
         app_path = f"{sys.executable} {find_spec('tests.mock_mpi_app').origin}"
@@ -314,9 +313,9 @@ class TestRunnerGroup(BalsamTestCase):
         success = util.poll_until_returns_true(check_done, timeout=40)
         self.assertTrue(success)
-        stop_processes('mpi_ensemble')
-        stop_processes('mock_serial')
-        stop_processes('mock_mpi')
+        util.stop_processes('mpi_ensemble')
+        util.stop_processes('mock_serial')
+        util.stop_processes('mock_mpi')
         # Now there should be no runners, PKs, or busy workers left
         self.assertListEqual(list(runner_group), [])
@@ -23,7 +23,7 @@ class WorkerGroupUnitTests(BalsamTestCase):
         group = worker.WorkerGroup(config, host_type='DEFAULT', workers_str=None)
         self.assertEqual(len(group.workers), 1)
         self.assertEqual(group.workers[0].num_nodes, 1)
-        self.assertEqual(group.workers[0].max_ranks_per_node, 1)
+        self.assertEqual(group.workers[0].max_ranks_per_node, 4)
         config = get_args('--consume-all --num-workers 3 --max-ranks-per-node 4'.split())
         group = worker.WorkerGroup(config, host_type='DEFAULT', workers_str=None)
@@ -67,4 +67,3 @@ class WorkerGroupUnitTests(BalsamTestCase):
         stdout, _ = util.cmdline(mpi_str)
         self.assertIn('Rank 0', stdout)
         self.assertIn('Rank 1', stdout)
-        self.assertEqual(mpi.returncode, 0)
import getpass
import os
import subprocess
import signal
@@ -14,12 +15,12 @@ def launcher_info(num_workers=None, max_ranks=None):
     from balsam.launcher.launcher import get_args
     from balsam.launcher import mpi_commands
-    args = '--consume-all'
+    args = '--consume-all '
     if num_workers and num_workers > 0:
-        args += f'--num-workers {num_workers}'
+        args += f'--num-workers {num_workers} '
     if max_ranks and max_ranks > 0:
-        args += f'--max-ranks-per-node {max_ranks}'
+        args += f'--max-ranks-per-node {max_ranks} '
     config = get_args(args.split())
     scheduler = Scheduler.scheduler_main
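The trailing spaces added here are what keep `args.split()` working when options are concatenated; without a separator the flags run together into one token that argparse rejects. For example:

```python
num_workers = 2

args = '--consume-all'
args += f'--num-workers {num_workers}'
print(args.split())   # ['--consume-all--num-workers', '2']  -> unrecognized argument

args = '--consume-all '
args += f'--num-workers {num_workers} '
print(args.split())   # ['--consume-all', '--num-workers', '2']  -> parses cleanly
```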
@@ -90,7 +91,7 @@ def ls_procs(keywords):
     searchcmd = 'ps aux | grep '
     searchcmd += ' | grep '.join(f'"{k}"' for k in keywords)
-    grep_out, _ = cmdline(searchcmd)
+    stdout, _ = cmdline(searchcmd)
     processes = [line for line in stdout.split('\n') if 'python' in line and line.split()[0]==username]
     return processes
@@ -118,10 +119,10 @@ def stop_processes(name):
         time.sleep(3)
 def stop_launcher_processes():
-    stop_processes('launcher.py --consume')
+    stop_processes('launcher.py')
-def run_launcher_until(function, args=(), period=1.0, timeout=60.0, maxrpn=16):
-    cmd = 'balsam launcher --consume --max-ranks-per-node {maxrpn}'
+def run_launcher_until(function, args=(), period=1.0, timeout=60.0, maxrpn=8):
+    cmd = f'balsam launcher --consume --max-ranks-per-node {maxrpn}'
     launcher_proc = subprocess.Popen(cmd.split(),
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.STDOUT,
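Two fixes in this last hunk are easy to miss: the default `maxrpn` drops from 16 to 8, and the command template gains an `f` prefix. Without it, the braces are never interpolated and the launcher would be invoked with a literal `{maxrpn}` argument:

```python
maxrpn = 8

cmd = 'balsam launcher --consume --max-ranks-per-node {maxrpn}'
print(cmd)    # ...--max-ranks-per-node {maxrpn}   (literal braces -- broken CLI call)

cmd = f'balsam launcher --consume --max-ranks-per-node {maxrpn}'
print(cmd)    # ...--max-ranks-per-node 8
```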