Commit d090ce11 authored by Michael Salim's avatar Michael Salim
Browse files

updated test runner for Theta

parent 21e3f6f2
...@@ -654,7 +654,7 @@ class TestDAG(BalsamTestCase): ...@@ -654,7 +654,7 @@ class TestDAG(BalsamTestCase):
chA.refresh_from_db() chA.refresh_from_db()
chB.refresh_from_db() chB.refresh_from_db()
return chA.state=='JOB_FINISHED' and chB.state=='JOB_FINISHED' return chA.state=='JOB_FINISHED' and chB.state=='JOB_FINISHED'
success = run_launcher_until(check) success = run_launcher_until(check, timeout=120)
parent.refresh_from_db() parent.refresh_from_db()
chA.refresh_from_db() chA.refresh_from_db()
......
from collections import namedtuple from collections import namedtuple
import getpass
import os import os
import random import random
import signal
from multiprocessing import Lock from multiprocessing import Lock
import sys import sys
import subprocess
import time import time
from uuid import UUID from uuid import UUID
from importlib.util import find_spec from importlib.util import find_spec
...@@ -19,6 +22,42 @@ from balsam.launcher import worker ...@@ -19,6 +22,42 @@ from balsam.launcher import worker
from balsam.launcher import runners from balsam.launcher import runners
from balsam.launcher.launcher import get_args, create_runner from balsam.launcher.launcher import get_args, create_runner
def ls_procs(keywords):
if type(keywords) == str: keywords = [keywords]
username = getpass.getuser()
searchcmd = 'ps aux | grep '
searchcmd += ' | grep '.join(f'"{k}"' for k in keywords)
grep = subprocess.Popen(searchcmd, shell=True, stdout=subprocess.PIPE)
stdout,stderr = grep.communicate()
stdout = stdout.decode('utf-8')
processes = [line for line in stdout.split('\n') if 'python' in line and line.split()[0]==username]
return processes
def sig_processes(process_lines, signal):
for line in process_lines:
proc = int(line.split()[1])
try:
os.kill(proc, signal)
except ProcessLookupError:
pass
def stop_processes(name):
processes = ls_procs(name)
sig_processes(processes, signal.SIGTERM)
def check_processes_done():
procs = ls_procs(name)
return len(procs) == 0
poll_until_returns_true(check_processes_done, period=2, timeout=12)
processes = ls_procs(name)
if processes:
sig_processes(processes, signal.SIGKILL)
class TestMPIRunner(BalsamTestCase): class TestMPIRunner(BalsamTestCase):
'''start, update_jobs, finished, error/timeout handling''' '''start, update_jobs, finished, error/timeout handling'''
...@@ -87,6 +126,7 @@ class TestMPIRunner(BalsamTestCase): ...@@ -87,6 +126,7 @@ class TestMPIRunner(BalsamTestCase):
self.assertTrue(runner.finished()) self.assertTrue(runner.finished())
runner.update_jobs() runner.update_jobs()
self.assertEquals(job.state, 'RUN_DONE') self.assertEquals(job.state, 'RUN_DONE')
stop_processes('mock_mpi')
# Check that the correct output is really there: # Check that the correct output is really there:
outpath = runner.outfile.name outpath = runner.outfile.name
...@@ -109,6 +149,7 @@ class TestMPIRunner(BalsamTestCase): ...@@ -109,6 +149,7 @@ class TestMPIRunner(BalsamTestCase):
poll_until_returns_true(runner.finished, period=0.5) poll_until_returns_true(runner.finished, period=0.5)
runner.update_jobs() runner.update_jobs()
self.assertEquals(job.state, 'RUN_ERROR') self.assertEquals(job.state, 'RUN_ERROR')
stop_processes('mock_mpi')
def test_timeouts(self): def test_timeouts(self):
'''MPI application runs for too long, call timeout, marked RUN_TIMEOUT''' '''MPI application runs for too long, call timeout, marked RUN_TIMEOUT'''
...@@ -140,6 +181,7 @@ class TestMPIRunner(BalsamTestCase): ...@@ -140,6 +181,7 @@ class TestMPIRunner(BalsamTestCase):
term = poll_until_returns_true(runner.finished, period=0.1, term = poll_until_returns_true(runner.finished, period=0.1,
timeout=6.0) timeout=6.0)
self.assertTrue(term) self.assertTrue(term)
stop_processes('mock_mpi')
class TestMPIEnsemble(BalsamTestCase): class TestMPIEnsemble(BalsamTestCase):
def setUp(self): def setUp(self):
...@@ -234,10 +276,8 @@ class TestMPIEnsemble(BalsamTestCase): ...@@ -234,10 +276,8 @@ class TestMPIEnsemble(BalsamTestCase):
self.assertTrue(all(j.state=='RUN_ERROR' for j in jobs['fail'])) self.assertTrue(all(j.state=='RUN_ERROR' for j in jobs['fail']))
# Kill the sleeping jobs in case they do not terminate # Kill the sleeping jobs in case they do not terminate
killcmd = "ps aux | grep mock_serial | grep -v grep | grep -v vim | awk '{print $2}' | xargs kill -9" stop_processes('mpi_ensemble')
os.system(killcmd) stop_processes('mock_serial')
killcmd = "ps aux | grep mpi_ensemble.py | grep -v grep | grep -v vim | awk '{print $2}' | xargs kill -9"
os.system(killcmd)
class TestRunnerGroup(BalsamTestCase): class TestRunnerGroup(BalsamTestCase):
...@@ -337,7 +377,12 @@ class TestRunnerGroup(BalsamTestCase): ...@@ -337,7 +377,12 @@ class TestRunnerGroup(BalsamTestCase):
runner_group.update_and_remove_finished() runner_group.update_and_remove_finished()
return all(r.finished() for r in runner_group) return all(r.finished() for r in runner_group)
poll_until_returns_true(check_done, timeout=40) success = poll_until_returns_true(check_done, timeout=40)
self.assertTrue(success)
stop_processes('mpi_ensemble')
stop_processes('mock_serial')
stop_processes('mock_mpi')
# Now there should be no runners, PKs, or busy workers left # Now there should be no runners, PKs, or busy workers left
self.assertListEqual(list(runner_group), []) self.assertListEqual(list(runner_group), [])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment