Commit 4f2ae168 authored by Michael Salim's avatar Michael Salim
Browse files

factor out create_job in tests

parent cb782234
......@@ -4,6 +4,12 @@ necessary. RunnerGroup has a collection of Runner objects, logic for creating
the next Runner (i.e. assigning jobs to nodes), and the public interface to
monitor runners'''
# TODO: "balsam qsub" is misleading because you can't qsub a script that calls
# mpirun. Implement a --mode script option that uses a "ScriptRunner". The
# ScriptRunner should parse the script to make sure it is not using more nodes
# than requested by job; and perhaps modify each mpirun with the correct
# workers_string
import functools
from math import ceil
import os
......
......@@ -7,6 +7,8 @@ from django.core.management import call_command
from django import db
from django.conf import settings
from balsam.models import BalsamJob, ApplicationDefinition
class BalsamTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
......@@ -40,3 +42,40 @@ def poll_until_returns_true(function, *, args=(), period=1.0, timeout=12.0):
if result: break
else: time.sleep(period)
return result
def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE, num_nodes=1,
ranks_per_node=1, args='', workflow='', envs={}, state='CREATED'):
if app and direct_command:
raise ValueError("Cannot have both application and direct command")
job = BalsamJob()
job.name = name
job.application = app
job.direct_command = direct_command
job.allowed_work_sites = site
job.num_nodes = num_nodes
job.ranks_per_node = ranks_per_node
job.application_args = args
job.workflow = workflow
job.environ_vars = ':'.join(f'{k}={v}' for k,v in envs.items())
job.state = state
job.save()
job.create_working_path()
return job
def create_app(*, name='', description='', executable='', preproc='',
postproc='', envs={}):
app = ApplicationDefinition()
app.name = name
app.description = description
app.executable = executable
app.default_preprocess = preproc
app.default_postprocess = postproc
app.environ_vars = ':'.join(f'{k}={v}' for k,v in envs.items())
app.save()
return app
......@@ -8,15 +8,33 @@ from uuid import UUID
from importlib.util import find_spec
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import poll_until_returns_true
from tests.BalsamTestCase import create_job, create_app
from django.conf import settings
from balsam.schedulers import Scheduler
from balsam.models import BalsamJob, ApplicationDefinition
from balsam.models import BalsamJob
from balsamlauncher import worker
from balsamlauncher import runners
from balsamlauncher.launcher import get_args, create_new_runners
class TestSingleJobTransitions(BalsamTestCase):
pass
def setUp(self):
scheduler = Scheduler.scheduler_main
self.host_type = scheduler.host_type
if self.host_type == 'DEFAULT':
config = get_args('--consume-all --num-workers 1 --max-ranks-per-node 4'.split())
else:
config = get_args('--consume-all'.split())
self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
workers_str=scheduler.workers_str,
workers_file=scheduler.workers_file)
app_path = f"{sys.executable} {find_spec('tests.mock_serial_app').origin}"
self.app = create_app(name="mock_serial", executable=app_path,
preproc='', postproc='', envs={})
def test_one_job_normal(self):
job = create_job(name='test', app=self.app.name)
import random
import tempfile
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import BalsamTestCase, cmdline, create_job
from balsamlauncher import jobreader
from balsamlauncher.launcher import get_args
from balsam.models import BalsamJob
......@@ -14,12 +14,10 @@ class JobReaderTests(BalsamTestCase):
self.NUM_JOBS = 128
self.workflows = ['one', 'two', 'three']
sites = f"siteA siteB {BALSAM_SITE} siteD"
for i in range(self.NUM_JOBS):
job = BalsamJob()
job.name = f"job{i}"
job.allowed_work_sites = f"siteA siteB {BALSAM_SITE} siteD"
job.workflow = random.choice(self.workflows)
job.save()
wf = random.choice(self.workflows)
create_job(name=f"job{i}", site=sites, workflow=wf)
def test_consume_all_reader(self):
'''consume-all job reader should retreive all'''
......
......@@ -8,11 +8,12 @@ from uuid import UUID
from importlib.util import find_spec
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import poll_until_returns_true
from tests.BalsamTestCase import create_job, create_app
from django.conf import settings
from balsam.schedulers import Scheduler
from balsam.models import BalsamJob, ApplicationDefinition
from balsam.models import BalsamJob
from balsamlauncher import worker
from balsamlauncher import runners
......@@ -27,18 +28,15 @@ class TestMPIRunner(BalsamTestCase):
if self.host_type == 'DEFAULT':
config = get_args('--consume-all --num-workers 1 --max-ranks-per-node 4'.split())
else:
config = get_args('--consume-all')
config = get_args('--consume-all'.split())
self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
workers_str=scheduler.workers_str,
workers_file=scheduler.workers_file)
app_path = f"{sys.executable} {find_spec('tests.mock_mpi_app').origin}"
self.app = ApplicationDefinition()
self.app.name = "mock_mpi"
self.app.description = "print and sleep"
self.app.executable = app_path
self.app.save()
self.app = create_app(name="mock_mpi",description="print and sleep",
executable=app_path)
# Test various worker configurations:
self.work_configs = []
......@@ -69,15 +67,9 @@ class TestMPIRunner(BalsamTestCase):
def test_normal(self):
'''MPI application runs, returns 0, marked RUN_DONE'''
for i, (workerslist, num_nodes, rpn) in enumerate(self.work_configs):
job = BalsamJob()
job.name = f"test{i}"
job.application = self.app.name
job.allowed_work_sites = settings.BALSAM_SITE
job.num_nodes = num_nodes
job.ranks_per_node = rpn
job.save()
job = create_job(name=f"test{i}", app=self.app.name,
num_nodes=num_nodes, ranks_per_node=rpn)
self.assertEquals(job.state, 'CREATED')
job.create_working_path()
runner = runners.MPIRunner([job], workerslist)
......@@ -104,17 +96,11 @@ class TestMPIRunner(BalsamTestCase):
def test_return_nonzero(self):
'''MPI application runs, return 255, marked RUN_ERROR'''
for i, (workerslist, num_nodes, rpn) in enumerate(self.work_configs):
job = BalsamJob()
job.name = f"test{i}"
job.application = self.app.name
job.allowed_work_sites = settings.BALSAM_SITE
job.num_nodes = num_nodes
job.ranks_per_node = rpn
job.application_args = '--retcode 255' # FAIL
job.save()
job = create_job(name=f"test{i}", app=self.app.name,
num_nodes=num_nodes, ranks_per_node=rpn,
args='--retcode 255')
self.assertEquals(job.state, 'CREATED')
job.create_working_path()
runner = runners.MPIRunner([job], workerslist)
runner.start()
......@@ -125,17 +111,11 @@ class TestMPIRunner(BalsamTestCase):
def test_timeouts(self):
'''MPI application runs for too long, call timeout, marked RUN_TIMEOUT'''
for i, (workerslist, num_nodes, rpn) in enumerate(self.work_configs):
job = BalsamJob()
job.name = f"test{i}"
job.application = self.app.name
job.allowed_work_sites = settings.BALSAM_SITE
job.num_nodes = num_nodes
job.ranks_per_node = rpn
job.application_args = '--sleep 10' # runs for too long
job.save()
job = create_job(name=f"test{i}", app=self.app.name,
num_nodes=num_nodes, ranks_per_node=rpn,
args='--sleep 10')
self.assertEquals(job.state, 'CREATED')
job.create_working_path()
runner = runners.MPIRunner([job], workerslist)
# job starts running; sleeps for 10 seconds
......@@ -164,18 +144,15 @@ class TestMPIEnsemble(BalsamTestCase):
if self.host_type == 'DEFAULT':
config = get_args('--consume-all --num-workers 1 --max-ranks-per-node 4'.split())
else:
config = get_args('--consume-all')
config = get_args('--consume-all'.split())
self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
workers_str=scheduler.workers_str,
workers_file=scheduler.workers_file)
app_path = f"{sys.executable} {find_spec('tests.mock_serial_app').origin}"
self.app = ApplicationDefinition()
self.app.name = "mock_serial"
self.app.description = "square a number"
self.app.executable = app_path
self.app.save()
self.app = create_app(name="mock_serial", description="square a number",
executable=app_path)
def test_MPIEnsembleRunner(self):
'''Several non-MPI jobs packaged into one mpi4py wrapper'''
......@@ -194,16 +171,15 @@ class TestMPIEnsemble(BalsamTestCase):
}
for jobtype in jobs:
for i in range(num_jobs_per_type):
job = BalsamJob()
job.allowed_work_sites = settings.BALSAM_SITE
job.name = f"{jobtype}{i}"
if jobtype == 'qsub':
job.direct_command = f'echo hello world {i}'
cmd = f'echo hello world {i}'
app, appargs = '', ''
else:
job.application = self.app.name
job.application_args = f"{i} {args[jobtype]}"
job.save()
job.create_working_path()
cmd = ''
app, appargs = self.app.name, f"{i} {args[jobtype]}"
job = create_job(name=f"{jobtype}{i}", app=app,
direct_command=cmd, args=appargs)
jobs[jobtype].append(job)
shuffled_jobs = [j for joblist in jobs.values() for j in joblist]
......@@ -249,25 +225,19 @@ class TestRunnerGroup(BalsamTestCase):
if self.host_type == 'DEFAULT':
config = get_args('--consume-all --num-workers 1 --max-ranks-per-node 8'.split())
else:
config = get_args('--consume-all')
config = get_args('--consume-all'.split())
self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
workers_str=scheduler.workers_str,
workers_file=scheduler.workers_file)
app_path = f"{sys.executable} {find_spec('tests.mock_mpi_app').origin}"
self.mpiapp = ApplicationDefinition()
self.mpiapp.name = "mock_mpi"
self.mpiapp.description = "print and sleep"
self.mpiapp.executable = app_path
self.mpiapp.save()
self.mpiapp = create_app(name="mock_mpi", description="print and sleep",
executable=app_path)
app_path = f"{sys.executable} {find_spec('tests.mock_serial_app').origin}"
self.serialapp = ApplicationDefinition()
self.serialapp.name = "mock_serial"
self.serialapp.description = "square a number"
self.serialapp.executable = app_path
self.serialapp.save()
self.serialapp = create_app(name="mock_serial", description="square a"
" number", executable=app_path)
def test_create_runners(self):
......@@ -287,26 +257,16 @@ class TestRunnerGroup(BalsamTestCase):
# Create a big shuffled assortment of jobs
runner_group = runners.RunnerGroup(Lock())
for i in range(num_serialjobs):
job = BalsamJob()
job.allowed_work_sites = settings.BALSAM_SITE
job.name = f"serial{i}"
job.application = self.serialapp.name
job.application_args = str(i)
job.state = 'PREPROCESSED'
job.save()
job.create_working_path()
job = create_job(name=f"serial{i}", app=self.serialapp.name,
args=str(i), state='PREPROCESSED')
serialjobs.append(job)
for i in range(num_mpijobs):
job = BalsamJob()
job.allowed_work_sites = settings.BALSAM_SITE
job.name = f"mpi{i}"
job.application = self.mpiapp.name
job.num_nodes = random.randint(1,num_nodes)
job.ranks_per_node = random.randint(2, max_rpn)
job.state = 'PREPROCESSED'
job.save()
job.create_working_path()
job = create_job(name=f"mpi{i}", app=self.mpiapp.name,
num_nodes=random.randint(1, num_nodes),
ranks_per_node=random.randint(2, max_rpn),
state='PREPROCESSED')
mpijobs.append(job)
all_jobs = serialjobs + mpijobs
random.shuffle(all_jobs)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment