Commit 81cc7212 authored by Michael Salim

functional tests & transition debugging

parent 4f2ae168
@@ -93,8 +93,7 @@ def from_time_string(s):
return datetime.strptime(s, TIME_FMT)
def history_line(state='CREATED', message=''):
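# the first (CREATED) entry is not preceded by a newline; every later entry starts on a new line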
newline = '' if state=='CREATED' else '\n'
return newline + f"[{get_time_string()} {state}] ".rjust(46) + message
return f"\n[{get_time_string()} {state}] ".rjust(46) + message
class BalsamJob(models.Model):
@@ -322,6 +321,15 @@ auto timeout retry: {self.auto_timeout_retry}
children = self.get_children()
return [c.pk for c in children]
def get_child_by_name(self, name):
children = self.get_children().filter(name=name)
if children.count() == 0:
raise ValueError(f"No child named {name}")
elif children.count() > 1:
raise ValueError(f"More than one child named {name}")
else:
return children.first()
def set_parents(self, parents):
try:
parents_list = list(parents)
@@ -31,6 +31,7 @@ os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
from balsam.models import BalsamJob as _BalsamJob
from django.conf import settings
current_job = None
parents = None
@@ -48,13 +49,12 @@ if JOB_ID:
current_job = _BalsamJob.objects.get(pk=JOB_ID)
except:
raise RuntimeError(f"The environment specified current job: "
"BALSAM_JOB_ID {JOB_ID}\n but this does not "
"exist in DB! Was it deleted accidentally?")
f"BALSAM_JOB_ID {JOB_ID}\n but this does not "
f"exist in DB! Was it deleted accidentally?")
else:
parents = current_job.get_parents()
children = current_job.get_children()
def add_job(**kwargs):
'''Add a new job to BalsamJob DB'''
job = _BalsamJob()
@@ -65,6 +65,8 @@ def add_job(**kwargs):
raise ValueError(f"Invalid field {k}")
else:
setattr(job, k, v)
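# default the job to run at this Balsam site unless the caller set allowed_work_sites explicitly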
if 'allowed_work_sites' not in kwargs:
job.allowed_work_sites = settings.BALSAM_SITE
job.save()
return job
@@ -116,6 +118,7 @@ def spawn_child(clone=False, **kwargs):
child = add_job(**kwargs)
add_dependency(current_job, child)
child.update_state("CREATED", f"spawned by {current_job.cute_id}")
return child
def kill(job, recursive=False):
@@ -7,6 +7,8 @@ import os
from io import StringIO
from traceback import print_exc
import sys
import subprocess
import tempfile
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
@@ -39,6 +41,7 @@ else:
LockClass = DummyLock
PREPROCESS_TIMEOUT_SECONDS = 300
POSTPROCESS_TIMEOUT_SECONDS = 300
SITE = settings.BALSAM_SITE
StatusMsg = namedtuple('StatusMsg', ['pk', 'state', 'msg'])
@@ -61,6 +64,7 @@ def main(job_queue, status_queue, lock):
try:
transition_function(job, lock)
except BalsamTransitionError as e:
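# re-load the job from the DB first; another process may have modified it while the transition ran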
job.refresh_from_db()
lock.acquire()
job.update_state('FAILED', str(e))
lock.release()
@@ -237,6 +241,7 @@ def stage_out(job, lock):
base = os.path.basename(f)
dst = os.path.join(stagingdir, base)
os.link(src=f, dst=dst)
logger.info(f"staging {f} out for transfer")
logger.info(f"transferring to {url_out}")
transfer.stage_out(f"{stagingdir}/", f"{url_out}/")
except Exception as e:
@@ -268,7 +273,7 @@ def preprocess(job, lock):
lock.release()
logger.info(f"{job.cute_id} no preprocess: skipped")
return
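# the preprocessor may be a multi-token command such as "python script.py"; every token must exist on the filesystem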
if not os.path.exists(preproc_app):
if not all(os.path.exists(p) for p in preproc_app.split()):
#TODO: look for preproc in the EXE directories
message = f"Preprocessor {preproc_app} does not exist on filesystem"
raise BalsamTransitionError(message)
@@ -278,7 +283,7 @@ def preprocess(job, lock):
# Run preprocesser with special environment in job working directory
out = os.path.join(job.working_directory, f"preprocess.log")
with open(out, 'wb') as fp:
with open(out, 'w') as fp:
fp.write(f"# Balsam Preprocessor: {preproc_app}")
try:
args = preproc_app.split()
@@ -291,6 +296,8 @@ def preprocess(job, lock):
message = f"Preprocess failed: {e}"
proc.kill()
raise BalsamTransitionError(message) from e
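# the preprocessor may have updated the job in the DB (e.g. its application args), so re-load it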
job.refresh_from_db()
if retcode != 0:
tail = get_tail(out)
message = f"{job.cute_id} preprocess returned {retcode}:\n{tail}"
@@ -340,7 +347,7 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
logger.info(f'{job.cute_id} no postprocess: skipped')
return
if not os.path.exists(postproc_app):
if not all(os.path.exists(p) for p in postproc_app.split()):
#TODO: look for postproc in the EXE directories
message = f"Postprocessor {postproc_app} does not exist on filesystem"
raise BalsamTransitionError(message)
@@ -349,8 +356,8 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
envs = job.get_envs(timeout=timeout_handling, error=error_handling)
# Run postprocesser with special environment in job working directory
out = os.path.join(job.working_directory, f"postprocess.log.pid{os.getpid()}")
with open(out, 'wb') as fp:
out = os.path.join(job.working_directory, f"postprocess.log")
with open(out, 'w') as fp:
fp.write(f"# Balsam Postprocessor: {postproc_app}\n")
if timeout_handling: fp.write("# Invoked to handle RUN_TIMEOUT\n")
if error_handling: fp.write("# Invoked to handle RUN_ERROR\n")
@@ -358,7 +365,7 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
try:
args = postproc_app.split()
logger.info(f"{job.cute_id} postprocess Popen {args}")
proc = subprocess.Popen(postproc_app, stdout=fp,
proc = subprocess.Popen(args, stdout=fp,
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
@@ -366,14 +373,15 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
message = f"Postprocess failed: {e}"
proc.kill()
raise BalsamTransitionError(message) from e
if retcode != 0:
tail = get_tail(out)
message = f"{job.cute_id} postprocess returned {retcode}:\n{tail}"
raise BalsamTransitionError(message)
job.refresh_from_db()
# If postprocessor handled error or timeout, it should have changed job's
# state. If it failed to do this, mark FAILED. Otherwise, POSTPROCESSED.
job.refresh_from_db()
if error_handling and job.state == 'RUN_ERROR':
message = f"{job.cute_id} Error handling didn't fix job state: marking FAILED"
raise BalsamTransitionError(message)
@@ -21,12 +21,11 @@ def cmd_confirmation(message=''):
def newapp(args):
if AppDef.objects.filter(name=args.name).exists():
raise RuntimeError(f"An application named {args.name} exists")
if not os.path.exists(args.executable):
raise RuntimeError(f"Executable {args.executable} not found")
if args.preprocess and not os.path.exists(args.preprocess):
raise RuntimeError(f"Script {args.preprocess} not found")
if args.postprocess and not os.path.exists(args.postprocess):
raise RuntimeError(f"Script {args.postprocess} not found")
for arg in (args.executable, args.preprocess, args.postprocess):
if arg and not all(os.path.exists(p) for p in arg.split()):
raise RuntimeError(f"{arg} not found")
app = AppDef()
app.name = args.name
@@ -129,8 +128,11 @@ def modify(args):
target_type = type(getattr(item, args.attr))
new_value = target_type(args.value)
setattr(item, args.attr, new_value)
item.save()
if args.attr == 'state':
item.update_state(new_value, 'User mutated state from command line')
else:
setattr(item, args.attr, new_value)
item.save()
print(f'{args.obj_type[:-1]} {args.attr} changed to: {new_value}')
@@ -44,7 +44,10 @@ def poll_until_returns_true(function, *, args=(), period=1.0, timeout=12.0):
return result
def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE, num_nodes=1,
ranks_per_node=1, args='', workflow='', envs={}, state='CREATED'):
ranks_per_node=1, args='', workflow='', envs={}, state='CREATED',
url_in='', input_files='', url_out='', stage_out_files='',
post_error_handler=False, post_timeout_handler=False,
auto_timeout_retry=True):
if app and direct_command:
raise ValueError("Cannot have both application and direct command")
@@ -63,6 +66,16 @@ def create_job(*, name='', app='', direct_command='', site=settings.BALSAM_SITE,
job.workflow = workflow
job.environ_vars = ':'.join(f'{k}={v}' for k,v in envs.items())
job.state = state
job.stage_in_url = url_in
job.input_files = input_files
job.stage_out_url = url_out
job.stage_out_files = stage_out_files
job.post_error_handler = post_error_handler
job.post_timeout_handler = post_timeout_handler
job.auto_timeout_retry = auto_timeout_retry
job.save()
job.create_working_path()
return job
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import os
import random
import argparse
import time
import sys
parser = argparse.ArgumentParser()
parser.add_argument('--sleep', type=int, default=0)
parser.add_argument('--retcode', type=int, default=0)
args = parser.parse_args()
time.sleep(args.sleep)
num_sides = int(os.environ['BALSAM_FT_NUM_SIDES'])
num_ranks = int(os.environ['BALSAM_FT_NUM_RANKS'])
if num_ranks > 1:
from mpi4py import MPI
COMM = MPI.COMM_WORLD
rank = COMM.Get_rank()
print(f"Rank {rank}")
else:
print("Rank 0")
rank = 0
if rank == 0:
for i in range(num_sides):
side_length = random.uniform(0.5,5)
with open(f"side{i}.dat", 'w') as fp:
fp.write(str(side_length) + "\n")
sys.exit(args.retcode)
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import balsamlauncher.dag as dag
import glob
import sys
import os
current_job = dag.current_job
print("Hello from make_sides_post")
if dag.ERROR:
print("make_sides_post recognized error flag")
num_sides = int(os.environ['BALSAM_FT_NUM_SIDES'])
num_files = len(glob.glob("side*.dat"))
assert num_files == num_sides
print("it's okay, the job was actually done")
current_job.update_state("JOB_FINISHED", "handled error; it was okay")
exit(0)
elif dag.TIMEOUT:
print("make_sides_post recognized timeout flag")
num_files = len(glob.glob("side*.dat"))
assert num_files == 0
print("Creating rescue job")
dag.spawn_child(clone=True,
application_args="--sleep 0 --retcode 0")
current_job.update_state("JOB_FINISHED", "spawned rescue job")
exit(0)
elif '--dynamic-spawn' not in sys.argv:
sys.exit(0)
reduce_job = current_job.get_child_by_name('sum_squares')
for i, sidefile in enumerate(glob.glob("side*.dat")):
square_job = dag.spawn_child(name=f"square{i}", application="square",
application_args=sidefile, input_files=sidefile)
dag.add_dependency(parent=square_job, child=reduce_job)
print(f"spawned square{i} job")
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import balsamlauncher.dag as dag
import sys
num_sides = sys.argv[1]
num_ranks = sys.argv[2]
current_job = dag.current_job
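# pass the side and rank counts to make_sides.py via colon-separated KEY=VAL entries in environ_vars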
current_job.environ_vars=f"BALSAM_FT_NUM_SIDES={num_sides}:BALSAM_FT_NUM_RANKS={num_ranks}"
current_job.ranks_per_node = num_ranks
current_job.save(update_fields=['environ_vars', 'ranks_per_node'])
print("Hello from make_sides_pre")
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import sys
total = 0.0
for fname in sys.argv[1:]:
area = float(open(fname).read())
total += area
print("Total area:", total)
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import balsamlauncher.dag as dag
print("Hello from reduce_post")
for i in range(5):
with open(f"summary{i}.dat", 'w') as fp:
fp.write("test\n")
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import glob
import balsamlauncher.dag as dag
square_files = glob.glob("square*.dat*")
job = dag.current_job
job.application_args=" ".join(square_files)
print(f"Have {len(square_files)} squares to sum")
job.save(update_fields=['application_args'])
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import argparse
import time
import sys
print("Hello from square")
parser = argparse.ArgumentParser()
parser.add_argument('infile')
parser.add_argument('--sleep', type=int, default=0)
parser.add_argument('--retcode', type=int, default=0)
args = parser.parse_args()
side_length = float(open(args.infile).read())
with open('square.dat', 'w') as fp:
square = side_length**2
fp.write(str(square) + "\n")
time.sleep(args.sleep)
sys.exit(args.retcode)
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import balsamlauncher.dag as dag
print("hello from square_post")
print(f"jobid: {dag.current_job.pk}")
if dag.ERROR:
print("recgonized error")
dag.current_job.update_state("JOB_FINISHED", "handled error in square_post")
if dag.TIMEOUT:
print("recgonized timeout")
dag.current_job.update_state("JOB_FINISHED", "handled timeout in square_post")
#!/Users/misha/anaconda3/envs/testmpi/bin/python
import balsamlauncher.dag as dag
import glob
print("hello from square_pre")
print(f"jobid: {dag.current_job.pk}")
if not dag.current_job.application_args:
print("no input file set for this job. searching workdir...")
infile = glob.glob("side*.dat*")[0]
dag.current_job.application_args += infile
dag.current_job.save()
print("set square.py input to", infile)
from collections import namedtuple
import os
import random
from multiprocessing import Lock
import sys
import signal
import time
from uuid import UUID
import subprocess
import tempfile
from importlib.util import find_spec
from balsam.models import BalsamJob
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import poll_until_returns_true
from tests.BalsamTestCase import create_job, create_app
from django.conf import settings
from balsam.schedulers import Scheduler
BALSAM_TEST_DIR = os.environ['BALSAM_TEST_DIRECTORY']
def run_launcher_until(function):
launcher_proc = subprocess.Popen(['balsam', 'launcher', '--consume'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid)
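# preexec_fn=os.setsid starts the launcher in its own session and process group, so the killpg call below can signal it and any children it spawned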
success = poll_until_returns_true(function, timeout=20)
# NOTE: launcher_proc.terminate() is not enough here. The launcher keeps
# running, and a second launcher from another test case can end up
# processing the same jobs (a very hard bug to catch), so kill the whole
# process group instead.
os.killpg(os.getpgid(launcher_proc.pid), signal.SIGTERM)
launcher_proc.wait(timeout=10)
return success
from balsamlauncher import worker
from balsamlauncher import runners
from balsamlauncher.launcher import get_args, create_new_runners
class TestSingleJobTransitions(BalsamTestCase):
def setUp(self):
scheduler = Scheduler.scheduler_main
self.host_type = scheduler.host_type
if self.host_type == 'DEFAULT':
config = get_args('--consume-all --num-workers 1 --max-ranks-per-node 4'.split())
else:
config = get_args('--consume-all'.split())
aliases = "make_sides square reduce".split()
self.apps = {}
for name in aliases:
interpreter = sys.executable
exe_path = interpreter + " " + find_spec(f'tests.ft_apps.{name}').origin
pre_path = interpreter + " " + find_spec(f'tests.ft_apps.{name}_pre').origin
post_path = interpreter + " " + find_spec(f'tests.ft_apps.{name}_post').origin
app = create_app(name=name, executable=exe_path, preproc=pre_path,
postproc=post_path)
self.apps[name] = app
self.worker_group = worker.WorkerGroup(config, host_type=self.host_type,
workers_str=scheduler.workers_str,
workers_file=scheduler.workers_file)
def test_one_job_normal(self):
'''normal processing of a single job'''
# A mock "remote" data source has a file side0.dat
# This file contains the side length of a square: 9
remote_dir = tempfile.TemporaryDirectory(prefix="remote")
remote_path = os.path.join(remote_dir.name, 'side0.dat')
with open(remote_path, 'w') as fp:
fp.write('9\n')
job = create_job(name='square_testjob', app='square',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}',
args='')
# Sanity check test case isolation
self.assertEquals(job.state, 'CREATED')
self.assertEqual(job.application_args, '')
self.assertEqual(BalsamJob.objects.all().count(), 1)
# Run the launcher and make sure that the job gets carried all the way
# through to completion
def check():
job.refresh_from_db()
return job.state == 'JOB_FINISHED'
success = run_launcher_until(check)
self.assertTrue(success)
work_dir = job.working_directory
# job staged in this remote side0.dat file; it's really here now
staged_in_file = os.path.join(work_dir, 'side0.dat')
self.assertTrue(os.path.exists(staged_in_file))
# And it contains "9"
staged_in_file_contents = open(staged_in_file).read()
self.assertIn('9\n', staged_in_file_contents)
# Preprocess script actually ran:
preproc_out = os.path.join(work_dir, 'preprocess.log')
self.assertTrue(os.path.exists(preproc_out))
preproc_out_contents = open(preproc_out).read()
# Preprocess inherited the correct job from the environment:
jobid_line = [l for l in preproc_out_contents.split('\n') if 'jobid' in l][0]
self.assertIn(str(job.pk), jobid_line)
# Preprocess recognized the side0.dat file
# And it altered the job application_args accordingly:
self.assertIn('set square.py input to side0.dat', preproc_out_contents)
self.assertIn('side0.dat', job.application_args)
# application stdout was written to the job's .out file
app_stdout = os.path.join(work_dir, 'square_testjob.out')
self.assertTrue(os.path.exists(app_stdout))
self.assertIn("Hello from square", open(app_stdout).read())
# the square.py app wrote its result to square.dat
app_outfile = os.path.join(work_dir, 'square.dat')
self.assertTrue(os.path.exists(app_outfile))
# The result of squaring 9 is 81
result = float(open(app_outfile).read())
self.assertEqual(result, 81.0)
# the job finished normally, so square_post.py just said hello
post_outfile = os.path.join(work_dir, 'postprocess.log')
self.assertTrue(os.path.exists(post_outfile))
post_contents = open(post_outfile).read()
jobid_line = [l for l in post_contents.split('\n') if 'jobid' in l][0]
self.assertIn(str(job.pk), jobid_line)
self.assertIn('hello from square_post', post_contents)
# After stage out, the remote directory contains two new files
# That matched the pattern square* ....
# square.dat and square_testjob.out
remote_square = os.path.join(remote_dir.name, 'square.dat')
remote_stdout = os.path.join(remote_dir.name, 'square_testjob.out')
self.assertTrue(os.path.exists(remote_square))
self.assertTrue(os.path.exists(remote_stdout))
result_remote = float(open(remote_square).read())
self.assertEquals(result_remote, 81.0)
self.assertIn("Hello from square", open(remote_stdout).read())
def test_one_job_error_unhandled(self):
'''test unhandled return code from app'''
remote_dir = tempfile.TemporaryDirectory(prefix="remote")
remote_path = os.path.join(remote_dir.name, 'side0.dat')
with open(remote_path, 'w') as fp:
fp.write('9\n')
# Same as previous test, but square.py returns nonzero
job = create_job(name='square_testjob2', app='square',
args='side0.dat --retcode 1',
url_in=f'local:{remote_dir.name}', stage_out_files='square*',
url_out=f'local:{remote_dir.name}')
self.assertEqual(job.application_args, 'side0.dat --retcode 1')
self.assertEqual(BalsamJob.objects.all().count(), 1)
# The job is marked FAILED due to unhandled nonzero return code
def check():
job.refresh_from_db()
return job.state == 'FAILED'
success = run_launcher_until(check)
self.assertTrue(success)
# (But actually the application ran and printed its result correctly)
work_dir = job.working_directory
out_path = os.path.join(work_dir, 'square.dat')
result = float(open(out_path).read())
self.assertEqual(result, 81.0)
preproc_out = os.path.join(work_dir, 'preprocess.log')
self.assertTrue(os.path.exists(preproc_out))
preproc_out_contents = open(preproc_out).read()
jobid_line = [l for l in preproc_out_contents.split('\n') if 'jobid' in l][0]
self.assertIn(str(job.pk), jobid_line)