Commit b8fea20a authored by Michael Salim's avatar Michael Salim
Browse files

unit tests

parent 1de34e89
import os
import unittest
import subprocess
from django.core.management import call_command
from django import db
......@@ -8,6 +9,7 @@ from django.conf import settings
class BalsamTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
assert db.connection.settings_dict['NAME'].endswith('test_db.sqlite3')
call_command('makemigrations',interactive=False,verbosity=0)
call_command('migrate',interactive=False,verbosity=0)
assert os.path.exists(settings.DATABASES['default']['NAME'])
......@@ -23,3 +25,9 @@ class BalsamTestCase(unittest.TestCase):
if not db.connection.settings_dict['NAME'].endswith('test_db.sqlite3'):
raise RuntimeError("Test DB not configured")
call_command('flush',interactive=False,verbosity=0)
def cmdline(cmd, envs=None, shell=True):
    '''Return the combined stdout+stderr text of a command line.

    Args:
        cmd: command to run (a shell string when ``shell=True``).
        envs: optional environment mapping for the child process;
            ``None`` inherits the parent's environment.
        shell: passed through to subprocess.

    Returns:
        The child's stdout (with stderr merged in), decoded as UTF-8.
    '''
    # subprocess.run waits for completion and captures output in one call,
    # replacing the manual Popen/communicate pair.
    proc = subprocess.run(cmd, shell=shell, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT, env=envs)
    return proc.stdout.decode('utf-8')
'''Mock user postprocess script; testing out the Python API provided in
balsamlauncher.dag'''
import sys
import balsamlauncher.dag as dag
def mock_spawn():
    '''Postprocess hook: clone the current job into a new CREATED child.'''
    # The return value was previously bound to an unused local ``child``;
    # only the side effect on the Balsam DB matters here.
    dag.spawn_child(clone=True, name='spawned_child', state='CREATED')
def mock_addjobs():
    '''Postprocess hook: add three jobs, with added3 depending on added2.'''
    dag.add_job(name="added1")  # independent job (unused local removed)
    job2 = dag.add_job(name="added2")
    job3 = dag.add_job(name="added3")
    dag.add_dependency(parent=job2, child=job3)
def mock_kill():
    '''Postprocess hook: kill the running job together with its subtree.'''
    dag.kill(dag.current_job, recursive=True)
if __name__ == "__main__":
    # Dispatch on the first CLI argument: "spawn" -> mock_spawn, etc.
    action = sys.argv[1]
    handler = getattr(sys.modules[__name__], f'mock_{action}')
    handler()
import sys
from tests.BalsamTestCase import BalsamTestCase, cmdline
from balsam.models import BalsamJob
class BalsamDAGTests(BalsamTestCase):
    '''End-to-end tests of the balsamlauncher.dag user API.

    Each test runs the mock user postprocess script in a subprocess with
    job-specific environment variables (mirroring how Balsam transitions
    invoke user hooks), then inspects the resulting DB state.
    All ``assertEquals`` calls were replaced with ``assertEqual``: the
    former is a deprecated alias in unittest.
    '''

    def setUp(self):
        '''Locate the mock user postprocess script shipped with the tests.'''
        from importlib.util import find_spec
        self.user_script = find_spec("tests.mock_postprocess").origin

    def mock_postprocessor_run(self, job, keyword):
        '''Run the mock postprocessor as if it were happening in a Balsam Transition.

        Returns the subprocess's captured stdout.
        '''
        envs = job.get_envs()
        stdout = cmdline(' '.join([sys.executable, self.user_script, keyword]),
                         envs=envs)
        return stdout

    def test_dynamically_spawn_child(self):
        '''Can dynamically spawn a child job'''
        # One job in the Balsam DB is ready for postprocessing
        job = BalsamJob()
        job.update_state('RUN_DONE')
        self.assertEqual(BalsamJob.objects.all().count(), 1)
        # The user wrote a postprocess script using the balsamlauncher.dag API.
        # Balsam transitions.py invokes the script with job-specific envs;
        # the script, in turn, dynamically spawns a child process:
        self.mock_postprocessor_run(job, "spawn")
        # Now there are two jobs, and the second is a child of the first
        jobs = BalsamJob.objects.all()
        self.assertEqual(jobs.count(), 2)
        parent = BalsamJob.objects.get(pk=job.pk)
        child = jobs.exclude(pk=parent.pk).first()
        self.assertIn(child, parent.get_children())

    def test_dynamically_create_job(self):
        '''Can dynamically add some jobs and dependencies'''
        # One job in the Balsam DB is ready for postprocessing
        job = BalsamJob()
        job.name = "original"
        job.update_state('RUN_DONE')
        self.assertEqual(BalsamJob.objects.all().count(), 1)
        # user postprocess script: use dag API to create 3 jobs
        # (see function mock_addjobs in mock_postprocess.py)
        self.mock_postprocessor_run(job, "addjobs")
        # Now there are 4 jobs
        self.assertEqual(BalsamJob.objects.all().count(), 4)
        newjobs = BalsamJob.objects.filter(name__contains="added")
        self.assertEqual(newjobs.count(), 3)
        # Job 3 depends on the completion of Job 2; jobs 1 and 2 are roots
        newjob1 = BalsamJob.objects.get(name="added1")
        newjob2 = BalsamJob.objects.get(name="added2")
        newjob3 = BalsamJob.objects.get(name="added3")
        self.assertFalse(newjob1.get_parents().exists())
        self.assertFalse(newjob2.get_parents().exists())
        parents_of_3 = newjob3.get_parents()
        self.assertEqual(parents_of_3.count(), 1)
        self.assertEqual([newjob2.pk], [p.pk for p in parents_of_3])

    def test_kill_subtree(self):
        '''Can kill a subtree'''
        # Five jobs in DB forming two subtrees:
        #   A --> B, C (children)
        #   D --> E
        jobs = {}
        for name in 'ABCDE':
            j = BalsamJob()
            j.name = name
            j.save()
            jobs[name] = j
        jobs['B'].set_parents([jobs['A']])
        jobs['C'].set_parents([jobs['A']])
        jobs['E'].set_parents([jobs['D']])
        self.assertEqual(BalsamJob.objects.all().count(), 5)
        # user postprocess script: use dag API to kill the "A" subtree
        out = self.mock_postprocessor_run(jobs['A'], "kill")
        print(out)
        # There are still 5 jobs, but A,B,C are killed; D,E unaffected
        self.assertEqual(BalsamJob.objects.all().count(), 5)
        for name in 'ABC':
            self.assertEqual(BalsamJob.objects.get(name=name).state, "USER_KILLED")
        for name in 'DE':
            self.assertEqual(BalsamJob.objects.get(name=name).state, "CREATED")
from tests.BalsamTestCase import BalsamTestCase
import subprocess
import random
import tempfile
def cmdline(cmd, envs=None, shell=True):
    '''Return string output from a command line.

    Generalized (backward-compatibly) to accept an optional environment
    mapping and a shell flag, matching the signature of the ``cmdline``
    helper in tests.BalsamTestCase so the two are interchangeable.

    Args:
        cmd: command to run (a shell string when ``shell=True``).
        envs: optional environment for the child; ``None`` inherits.
        shell: passed through to subprocess.Popen.
    '''
    p = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, env=envs)
    # communicate() returns (stdout, stderr); stderr is merged into stdout
    return p.communicate()[0].decode('utf-8')
from tests.BalsamTestCase import BalsamTestCase, cmdline
from balsamlauncher import jobreader
from balsamlauncher.launcher import get_args
from balsam.models import BalsamJob
from django.conf import settings
BALSAM_SITE = settings.BALSAM_SITE
class BalsamLauncherTests(BalsamTestCase):
class JobReaderTests(BalsamTestCase):
    # NOTE(review): this region looks like a mangled diff view -- the
    # job-creation loop below appears after test_whatever yet reads
    # self.NUM_JOBS from setUp; it most likely belongs at the end of
    # setUp in the actual repository.  Verify before relying on this
    # structure.
    def setUp(self):
        '''several jobs, each belongs to one of 3 WFs'''
        self.NUM_JOBS = 128
        self.workflows = ['one', 'two', 'three']
    def test_whatever(self):
        # With no jobs in the DB, 'balsam ls' reports an empty query
        stdout = cmdline('balsam ls')
        self.assertIn('No jobs found matching query', stdout)
        # Create NUM_JOBS jobs spread randomly over the three workflows;
        # each job allows several work sites including this BALSAM_SITE.
        for i in range(self.NUM_JOBS):
            job = BalsamJob()
            job.name = f"job{i}"
            job.allowed_work_sites = f"siteA siteB {BALSAM_SITE} siteD"
            job.workflow = random.choice(self.workflows)
            job.save()
def test_consume_all_reader(self):
'''consume-all job reader should retreive all'''
self.assertEqual(self.NUM_JOBS, BalsamJob.objects.count())
config = get_args('--consume-all'.split())
source = jobreader.JobReader.from_config(config)
self.assertIsInstance(source, jobreader.WFJobReader)
self.assertFalse(source.wf_name)
source.refresh_from_db()
self.assertEqual(len(source.jobs), self.NUM_JOBS)
def test_consume_by_workflow(self):
'''wf-name job reader should retreive only that WF'''
config = get_args('--wf-name two'.split())
source = jobreader.JobReader.from_config(config)
self.assertIsInstance(source, jobreader.WFJobReader)
self.assertTrue(source.wf_name)
source.refresh_from_db()
jobs_in_two = BalsamJob.objects.filter(workflow="two")
self.assertEqual(len(source.jobs), jobs_in_two.count())
source_set = set(job.pk for job in source.jobs)
jobs_in_two_set = set(job.pk for job in jobs_in_two)
self.assertEqual(source_set, jobs_in_two_set)
def test_consume_from_file(self):
'''job-file reader should retreive only PKs in file'''
pks_in_file = []
jobs = BalsamJob.objects.all()
with tempfile.NamedTemporaryFile(mode='w', delete=False) as jobsfile:
fname = jobsfile.name
for i in range(12):
job = random.choice(jobs)
jobsfile.write(f"{job.pk}\n")
pks_in_file.append(job.pk)
config = get_args(f'--job-file {fname}'.split())
source = jobreader.JobReader.from_config(config)
self.assertIsInstance(source, jobreader.FileJobReader)
source.refresh_from_db()
self.assertEqual(len(source.jobs), len(set(pks_in_file)))
source_set = set(job.pk for job in source.jobs)
self.assertEqual(source_set, set(pks_in_file))
from tests.BalsamTestCase import BalsamTestCase, cmdline
from balsam.schedulers import Scheduler
from balsam.models import BalsamJob, ApplicationDefinition
from balsamlauncher import jobreader
from balsamlauncher import worker
from balsamlauncher import runners
from balsamlauncher.launcher import get_args, create_new_runners
class TestRunners(BalsamTestCase):
    '''Integration test for WorkerGroup, JobReader, and Runners/RunnerGroup'''

    def setUp(self):
        '''Build a WorkerGroup and JobReader from the detected host type.'''
        self.scheduler = Scheduler.scheduler_main
        self.host_type = self.scheduler.host_type
        if self.host_type == 'DEFAULT':
            config = get_args('--consume-all --num-workers 4 --max-ranks-per-node 4'.split())
        else:
            # BUG FIX: get_args expects an argv list (all other call sites
            # pass .split()); a bare string would be iterated char-by-char.
            config = get_args('--consume-all'.split())
        # BUG FIX: 'scheduler' was an unbound name here (NameError at
        # runtime); use the instance attribute set above.
        self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
                                               workers_str=self.scheduler.workers_str)
        self.job_source = jobreader.JobReader.from_config(config)

    def testMPIEnsembleRunner(self):
        '''Several non-MPI jobs packaged into one mpi4py wrapper'''
        # TODO: some jobs will pass; some will fail; some will timeout
        pass

    def testMPIRunner_passes(self):
        # TODO: varying ranks, rpn, tpr, tpc, envs
        # varying application args
        # check for successful job run, update, and output
        pass

    def testMPIRunner_fails(self):
        # TODO: ensure correct when job returns nonzero
        pass

    def testMPIRunner_timeouts(self):
        # TODO: ensure correct when long-running job times out
        pass

    def test_create_runners(self):
        # TODO: create sets of jobs intended to exercise each code path
        # in a single call to launcher.create_new_runners()
        pass
from tests.BalsamTestCase import BalsamTestCase
from balsam.models import BalsamJob
from django import db
class DumbTestCase(BalsamTestCase):
from balsamlauncher import worker
from balsamlauncher.launcher import get_args
from balsam.schedulers import Scheduler
class WorkerGroupUnitTests(BalsamTestCase):
    '''Unit tests for worker.WorkerGroup construction.'''

    def setUp(self):
        '''Seed one job and capture the active scheduler interface.'''
        BalsamJob.objects.create(name="hello_testing!")
        assert db.connection.settings_dict['NAME'].endswith('test_db.sqlite3')
        self.scheduler = Scheduler.scheduler_main

    def test_default(self):
        '''Create default worker groups with various command line arguments'''
        cases = (
            ('--consume-all --num-workers 1', 1, 1),
            ('--consume-all --num-workers 3 --max-ranks-per-node 4', 3, 4),
        )
        for argstr, expect_count, expect_rpn in cases:
            config = get_args(argstr.split())
            group = worker.WorkerGroup(config, host_type='DEFAULT', workers_str=None)
            self.assertEqual(len(group.workers), expect_count)
            self.assertEqual(group.workers[0].num_nodes, 1)
            self.assertEqual(group.workers[0].max_ranks_per_node, expect_rpn)

    def test_can_read(self):
        '''The job seeded in setUp is visible through the ORM.'''
        found = BalsamJob.objects.get(name__icontains="testing!")
        self.assertEqual(found.name, "hello_testing!")

    def test_cray(self):
        '''Construct WorkerGroup from reading Cray environment'''
        config = get_args('--consume-all'.split())
        if self.scheduler.host_type != 'CRAY':
            self.skipTest('scheduler did not recognize Cray environment')
        group = worker.WorkerGroup(config, host_type='CRAY',
                                   workers_str=self.scheduler.workers_str)
        if self.scheduler.workers_str:
            env_var = self.scheduler.SCHEDULER_VARIABLES['num_workers']
            # NOTE(review): 'os' does not appear in this fragment's visible
            # imports -- confirm the file imports os at the top.
            self.assertEqual(len(group.workers), int(os.environ[env_var]))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment