Commit cb782234 authored by Michael Salim

cleaned up logging and a little refactoring

parent 5fefa6e5
@@ -301,7 +301,10 @@ auto timeout retry: {self.auto_timeout_retry}
@property
def cute_id(self):
return f"[{ str(self.pk)[:8] }]"
if self.name:
return f"[{self.name} | { str(self.pk)[:8] }]"
else:
return f"[{ str(self.pk)[:8] }]"
@property
def app_cmd(self):
@@ -486,4 +489,4 @@ Envs: {self.environ_vars}
@property
def cute_id(self):
return f"[{ str(self.pk)[:8] }]"
return f"[{self.name} | { str(self.pk)[:8] }]"
@@ -77,13 +77,14 @@ def create_new_runners(jobs, runner_group, worker_group):
def main(args, transition_pool, runner_group, job_source):
delay_timer = delay()
elapsed_min = elapsed_time_minutes()
if args.time_limit_minutes > 0:
timeout = lambda : elapsed_time_minutes() >= args.time_limit_minutes
timeout = lambda : next(elapsed_min) >= args.time_limit_minutes
else:
timeout = lambda : scheduler.remaining_time_seconds() <= 0.0
while not timeout():
logger.debug("\n******************\n"
logger.info("\n******************\n"
"BEGIN SERVICE LOOP\n"
"******************")
wait = True
......
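The timeout check above switches from re-calling elapsed_time_minutes() on every iteration to creating the generator once and advancing it with next(). A self-contained sketch of that pattern, assuming elapsed_time_minutes yields minutes since its creation (the real helper lives elsewhere in balsamlauncher):

import time

def elapsed_time_minutes():
    # generator: each next() yields the minutes elapsed since creation
    start = time.time()
    while True:
        yield (time.time() - start) / 60.0

elapsed_min = elapsed_time_minutes()
timeout = lambda: next(elapsed_min) >= 1e-5  # tiny limit so the demo exits
while not timeout():
    time.sleep(0.01)  # service loop body
print("service loop timed out")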
@@ -10,9 +10,9 @@ logger = logging.getLogger('balsamlauncher.mpi_ensemble')
from subprocess import Popen, STDOUT
from mpi4py import MPI
from balsamlauncher.cd import cd
from balsamlauncher.util import cd, get_tail
from balsamlauncher.exceptions import *
from balsamlauncher.runners import get_tail
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
......
@@ -21,7 +21,7 @@ from django.db import transaction
import balsam.models
from balsamlauncher import mpi_commands
from balsamlauncher.exceptions import *
from balsamlauncher import cd
from balsamlauncher.util import cd, get_tail
import logging
logger = logging.getLogger(__name__)
@@ -29,15 +29,6 @@ logger = logging.getLogger(__name__)
from importlib.util import find_spec
MPI_ENSEMBLE_EXE = find_spec("balsamlauncher.mpi_ensemble").origin
def get_tail(fname, nlines=5, indent=' '):
proc = Popen(f'tail -n {nlines} {fname}'.split(),stdout=PIPE,
stderr=STDOUT)
tail = proc.communicate()[0].decode()
lines = tail.split('\n')
for i, line in enumerate(lines[:]):
lines[i] = indent + line
return '\n'.join(lines)
class MonitorStream(Thread):
'''Thread: non-blocking read of a process's stdout'''
@@ -122,19 +113,19 @@ class MPIRunner(Runner):
self.popen_args['stdout'] = self.outfile
self.popen_args['stderr'] = STDOUT
self.popen_args['bufsize'] = 1
logger.info(f"MPI Runner Popen args: {self.popen_args['args']}")
logger.info(f"MPI Runner: writing output to {outname}")
logger.info(f"MPIRunner {job.cute_id} Popen:\n{self.popen_args['args']}")
logger.info(f"MPIRunner: writing output to {outname}")
def update_jobs(self):
job = self.jobs[0]
#job.refresh_from_db() # TODO: handle RecordModified
retcode = self.process.poll()
if retcode == None:
logger.debug(f"MPI Job {job.cute_id} still running")
logger.debug(f"MPIRunner {job.cute_id} still running")
curstate = 'RUNNING'
msg = ''
elif retcode == 0:
logger.debug(f"MPI Job {job.cute_id} return code 0: done")
logger.info(f"MPIRunner {job.cute_id} return code 0: done")
curstate = 'RUN_DONE'
msg = ''
else:
@@ -142,8 +133,8 @@ class MPIRunner(Runner):
self.process.communicate()
self.outfile.close()
tail = get_tail(self.outfile.name)
msg = f"RETURN CODE {retcode}:\n{tail}"
logger.debug(msg)
msg = f"MPIRunner {job.cute_id} RETURN CODE {retcode}:\n{tail}"
logger.info(msg)
if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified
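update_jobs above maps Popen.poll() results onto Balsam job states. A condensed sketch of that mapping, using the state names that appear in this diff:

def poll_to_state(retcode):
    # None -> still running; 0 -> clean exit; anything else -> failure
    if retcode is None:
        return 'RUNNING'
    if retcode == 0:
        return 'RUN_DONE'
    return 'RUN_ERROR'

assert poll_to_state(None) == 'RUNNING'
assert poll_to_state(0) == 'RUN_DONE'
assert poll_to_state(137) == 'RUN_ERROR'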
@@ -168,6 +159,9 @@ class MPIEnsembleRunner(Runner):
cmd = job.app_cmd
fp.write(f"{job.pk} {job.working_directory} {cmd}\n")
logger.info('MPIEnsemble handling jobs: '
f' {", ".join(j.cute_id for j in self.jobs)} '
)
rpn = worker_list[0].max_ranks_per_node
nranks = sum(w.num_nodes*rpn for w in worker_list)
envs = self.jobs[0].get_envs() # TODO: different envs for each job
@@ -177,7 +171,7 @@ class MPIEnsembleRunner(Runner):
num_ranks=nranks, ranks_per_node=rpn)
self.popen_args['args'] = shlex.split(mpi_str)
logger.info(f"MPI Ensemble Popen args: {self.popen_args['args']}")
logger.info(f"MPIEnsemble Popen:\n {self.popen_args['args']}")
def update_jobs(self):
'''Relies on stdout of mpi_ensemble.py'''
@@ -195,7 +189,7 @@ class MPIEnsembleRunner(Runner):
if pk in self.jobs_by_pk and state in balsam.models.STATES:
job = self.jobs_by_pk[pk]
job.update_state(state, msg) # TODO: handle RecordModified exception
logger.debug(f"Job {job.cute_id} updated to {state}: {msg}")
logger.info(f"MPIEnsemble {job.cute_id} updated to {state}: {msg}")
else:
logger.error(f"Invalid statusMsg from mpi_ensemble: {line.strip()}")
@@ -216,13 +210,15 @@ class RunnerGroup:
idle nodes'''
if len(self.runners) == self.MAX_CONCURRENT_RUNNERS:
logger.info("Cannot create another runner: at max")
raise ExceededMaxRunners(
f"Cannot have more than {self.MAX_CONCURRENT_RUNNERS} simultaneous runners"
)
idle_workers = [w for w in workers if w.idle]
nidle_workers = len(idle_workers)
if nidle_workers == 0:
raise NoAvailableWorkers
nodes_per_worker = workers[0].num_nodes
rpn = workers[0].max_ranks_per_node
assert all(w.num_nodes == nodes_per_worker for w in idle_workers)
@@ -241,7 +237,7 @@ class RunnerGroup:
largest_mpi_job = (max(mpi_jobs, key=lambda job: job.num_nodes)
if mpi_jobs else None)
if largest_mpi_job:
logger.debug(f"{len(mpi_jobs)} MPI jobs can run; largest takes "
logger.debug(f"{len(mpi_jobs)} MPI jobs can run; largest requires "
f"{largest_mpi_job.num_nodes} nodes")
else:
logger.debug("No MPI jobs can run")
@@ -270,8 +266,10 @@ class RunnerGroup:
f"totalling {nworkers*nodes_per_worker} nodes "
f"with {rpn} ranks per worker")
if not jobs: raise NoAvailableWorkers
logger.info(msg)
if not jobs:
raise NoAvailableWorkers
else:
logger.info(msg)
runner = runner_class(jobs, assigned_workers)
runner.start()
......
@@ -15,10 +15,29 @@ from django import db
from common import transfer
from balsamlauncher.exceptions import *
from balsam.models import BalsamJob, NoApplication
from balsamlauncher.util import get_tail
import logging
logger = logging.getLogger('balsamlauncher.transitions')
# SQLite exclusive lock is broken on Windows & OSX; even with two writers, two
# records, and a long timeout, a "database locked" exception is thrown
# immediately. Writers are supposed to queue up, but this correct behavior is
# seen only on Linux. If we are running OSX or Windows, we have to implement
# our own global DB write lock (multiprocessing.Lock object). If concurrent
# DB writes become a bottleneck, we have to go to a DB that supports better
# concurrency -- but SQLite makes it significantly easier for users to deploy
# Balsam, because it's built in and requires zero user configuration
if sys.platform.startswith('darwin'):
LockClass = multiprocessing.Lock
elif sys.platform.startswith('win32'):
LockClass = multiprocessing.Lock
else:
class DummyLock:
def acquire(self): pass
def release(self): pass
LockClass = DummyLock
PREPROCESS_TIMEOUT_SECONDS = 300
SITE = settings.BALSAM_SITE
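Call sites then use the selected LockClass uniformly, so only this block needs to know about the platform. A self-contained sketch (the try/finally is a hardening beyond the bare acquire/release pairs used in the diff):

import sys
import multiprocessing

if sys.platform.startswith('darwin') or sys.platform.startswith('win32'):
    LockClass = multiprocessing.Lock  # serialize SQLite writers ourselves
else:
    class DummyLock:
        # SQLite's own write lock queues writers correctly on Linux
        def acquire(self): pass
        def release(self): pass
    LockClass = DummyLock

lock = LockClass()
lock.acquire()
try:
    print("serialized DB write happens here")  # e.g. job.save(...)
finally:
    lock.release()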
@@ -38,7 +57,7 @@ def main(job_queue, status_queue, lock):
lock.acquire()
job.save(update_fields=['work_site'])
lock.release()
logger.debug(f"Received transition job {job.cute_id}: {transition_function}")
logger.debug(f"Received job {job.cute_id}: {transition_function}")
try:
transition_function(job, lock)
except BalsamTransitionError as e:
@@ -49,12 +68,12 @@ def main(job_queue, status_queue, lock):
status_queue.put(s)
buf = StringIO()
print_exc(file=buf)
logger.exception("Caught BalsamTransitionError:\n%s\n", buf.getvalue())
logger.exception(f"Marking job {job.cute_id} as FAILED")
logger.exception(f"{job.cute_id} BalsamTransitionError:\n%s\n", buf.getvalue())
logger.exception(f"Marking {job.cute_id} as FAILED")
except:
buf = StringIO()
print_exc(file=buf)
logger.exception("Uncaught exception in transition:\n%s", buf.getvalue())
logger.critical(f"{job.cute_id} Uncaught exception:\n%s", buf.getvalue())
raise
else:
s = StatusMsg(job.pk, str(job.state), 'success')
@@ -69,7 +88,7 @@ class TransitionProcessPool:
self.job_queue = multiprocessing.Queue()
self.status_queue = multiprocessing.Queue()
self.lock = multiprocessing.Lock()
self.lock = LockClass()
self.transitions_pk_list = []
self.procs = [
@@ -113,7 +132,7 @@ class TransitionProcessPool:
def end_and_wait(self):
m = JobMsg('end', None)
logger.info("Sending end message and waiting on transition processes")
logger.debug("Sending end message and waiting on transition processes")
for proc in self.procs:
self.job_queue.put(m)
for proc in self.procs:
@@ -123,50 +142,50 @@ class TransitionProcessPool:
def check_parents(job, lock):
logger.debug('checking parents of {job.cute_id}')
logger.debug(f'{job.cute_id} in check_parents')
parents = job.get_parents()
ready = all(p.state == 'JOB_FINISHED' for p in parents)
if ready:
lock.acquire()
job.update_state('READY', 'dependencies satisfied')
lock.release()
logger.debug('job ready')
logger.info(f'{job.cute_id} ready')
elif job.state != 'AWAITING_PARENTS':
lock.acquire()
job.update_state('AWAITING_PARENTS', f'{len(parents)} pending jobs')
lock.release()
logger.debug('awaiting parents')
logger.info(f'{job.cute_id} waiting for parents')
def stage_in(job, lock):
# Create workdirs for jobs: use job.create_working_path
logger.debug(f'in stage_in of {job.cute_id}')
logger.debug(f'{job.cute_id} in stage_in')
if not os.path.exists(job.working_directory):
lock.acquire()
job.create_working_path()
lock.release()
work_dir = job.working_directory
logger.debug(f"set {job.cute_id} working directory {work_dir}")
logger.info(f"{job.cute_id} working directory {work_dir}")
# stage in all remote urls
# TODO: stage_in remote transfer should allow a list of files and folders,
# rather than copying just one entire folder
url_in = job.stage_in_url
if url_in:
logger.debug(f"staging into workdir from {url_in}")
logger.info(f"{job.cute_id} transfer in from {url_in}")
try:
transfer.stage_in(f"{url_in}/", f"{work_dir}/")
except Exception as e:
message = 'Exception received during stage_in: ' + str(e)
logger.error(message)
raise BalsamTransitionError from e
raise BalsamTransitionError(message) from e
# create unique symlinks to "input_files" patterns from parents
# TODO: handle data flow from remote sites transparently
matches = []
parents = job.get_parents()
input_patterns = job.input_files.split()
logger.debug(f"{job.cute_id} searching parent workdirs for {input_patterns}")
for parent in parents:
parent_dir = parent.working_directory
for pattern in input_patterns:
@@ -176,19 +195,34 @@ def stage_in(job, lock):
for parent_pk, inp_file in matches:
basename = os.path.basename(inp_file)
newpath = os.path.join(work_dir, basename)
if os.path.exists(newpath): newpath += f"_{parent_pk}"
if os.path.exists(newpath): newpath += f"_{str(parent_pk)[:8]}"
# pointing to src, named dst
os.symlink(src=inp_file, dst=newpath)
logger.info(f"{job.cute_id} {newpath} --> {inp_file}")
try:
os.symlink(src=inp_file, dst=newpath)
except Exception as e:
raise BalsamTransitionError(
f"Exception received during symlink: {e}") from e
lock.acquire()
job.update_state('STAGED_IN')
lock.release()
logger.info(f"{job.cute_id} stage_in done")
def stage_out(job, lock):
'''copy from the local working_directory to the output_url '''
logger.debug(f'in stage_out of {job.cute_id}')
logger.debug(f'{job.cute_id} in stage_out')
url_out = job.stage_out_url
if not url_out:
lock.acquire()
job.update_state('JOB_FINISHED')
lock.release()
logger.info(f'{job.cute_id} no stage_out_url: done')
return
stage_out_patterns = job.stage_out_files.split()
logger.debug(f"{job.cute_id} stage out files match: {stage_out_patterns}")
work_dir = job.working_directory
matches = []
for pattern in stage_out_patterns:
@@ -196,24 +230,26 @@ def stage_out(job, lock):
matches.extend(glob.glob(path))
if matches:
logger.info(f"{job.cute_id} stage out files: {matches}")
with tempfile.TemporaryDirectory() as stagingdir:
try:
for f in matches:
base = os.path.basename(f)
dst = os.path.join(stagingdir, base)
os.link(src=f, dst=dst)
transfer.stage_out(f"{stagingdir}/", f"{job.stage_out_url}/")
logger.info(f"transferring to {url_out}")
transfer.stage_out(f"{stagingdir}/", f"{url_out}/")
except Exception as e:
message = 'Exception received during stage_out: ' + str(e)
logger.error(message)
raise BalsamTransitionError from e
message = f'Exception received during stage_out: {e}'
raise BalsamTransitionError(message) from e
lock.acquire()
job.update_state('JOB_FINISHED')
lock.release()
logger.info(f'{job.cute_id} stage_out done')
def preprocess(job, lock):
logger.debug(f'in preprocess of {job.cute_id}')
logger.debug(f'{job.cute_id} in preprocess')
# Get preprocessor exe
preproc_app = job.preprocess
@@ -223,7 +259,6 @@ def preprocess(job, lock):
preproc_app = app.default_preprocess
except ObjectDoesNotExist as e:
message = f"application {job.application} does not exist"
logger.error(message)
raise BalsamTransitionError(message)
except NoApplication:
preproc_app = None
@@ -231,45 +266,47 @@ def preprocess(job, lock):
lock.acquire()
job.update_state('PREPROCESSED', 'No preprocess: skipped')
lock.release()
logger.info(f"{job.cute_id} no preprocess: skipped")
return
if not os.path.exists(preproc_app):
#TODO: look for preproc in the EXE directories
message = f"Preprocessor {preproc_app} does not exist on filesystem"
logger.error(message)
raise BalsamTransitionError
raise BalsamTransitionError(message)
# Create preprocess-specific environment
envs = job.get_envs()
# Run preprocessor with special environment in job working directory
out = os.path.join(job.working_directory, f"preprocess.log.pid{os.getpid()}")
out = os.path.join(job.working_directory, f"preprocess.log")
with open(out, 'wb') as fp:
fp.write(f"# Balsam Preprocessor: {preproc_app}")
try:
proc = subprocess.Popen(preproc_app, stdout=fp,
args = preproc_app.split()
logger.info(f"{job.cute_id} preprocess Popen {args}")
proc = subprocess.Popen(args, stdout=fp,
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=PREPROCESS_TIMEOUT_SECONDS)
except Exception as e:
message = f"Preprocess failed: {e}"
logger.error(message)
proc.kill()
raise BalsamTransitionError from e
raise BalsamTransitionError(message) from e
if retcode != 0:
message = f"Preprocess Popen nonzero returncode: {retcode}"
logger.error(message)
tail = get_tail(out)
message = f"{job.cute_id} preprocess returned {retcode}:\n{tail}"
raise BalsamTransitionError(message)
lock.acquire()
job.update_state('PREPROCESSED', f"{os.path.basename(preproc_app)}")
lock.release()
logger.info(f"{job.cute_id} preprocess done")
def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
logger.debug(f'in postprocess of {job.cute_id}')
logger.debug(f'{job.cute_id} in postprocess')
if error_handling and timeout_handling:
raise ValueError("Both error-handling and timeout-handling is invalid")
if error_handling: logger.debug('Handling RUN_ERROR')
if timeout_handling: logger.debug('Handling RUN_TIMEOUT')
if error_handling: logger.info(f'{job.cute_id} handling RUN_ERROR')
if timeout_handling: logger.info(f'{job.cute_id} handling RUN_TIMEOUT')
# Get postprocessor exe
postproc_app = job.postprocess
@@ -287,26 +324,26 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
# If no postprocessor, move on (unless in error_handling mode)
if not postproc_app:
if error_handling:
message = "Trying to handle error, but no postprocessor found"
logger.warning(message)
message = f"{job.cute_id} handle error: no postprocessor found!"
raise BalsamTransitionError(message)
elif timeout_handling:
logger.warning('Unhandled job timeout: marking RESTART_READY')
lock.acquire()
job.update_state('RESTART_READY', 'marking for re-run')
lock.release()
logger.warning(f'{job.cute_id} unhandled job timeout: marked RESTART_READY')
return
else:
lock.acquire()
job.update_state('POSTPROCESSED', 'No postprocess: skipped')
job.update_state('POSTPROCESSED',
f'{job.cute_id} no postprocess: skipped')
lock.release()
logger.info(f'{job.cute_id} no postprocess: skipped')
return
if not os.path.exists(postproc_app):
#TODO: look for postproc in the EXE directories
message = f"Postprocessor {postproc_app} does not exist on filesystem"
logger.error(message)
raise BalsamTransitionError
raise BalsamTransitionError(message)
# Create postprocess-specific environment
envs = job.get_envs(timeout=timeout_handling, error=error_handling)
@@ -319,57 +356,57 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
if error_handling: fp.write("# Invoked to handle RUN_ERROR\n")
try:
args = postproc_app.split()
logger.info(f"{job.cute_id} postprocess Popen {args}")
proc = subprocess.Popen(args, stdout=fp,
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
except Exception as e:
message = f"Postprocess failed: {e}"
logger.error(message)
proc.kill()
raise BalsamTransitionError from e
raise BalsamTransitionError(message) from e
if retcode != 0:
message = f"Postprocess Popen nonzero returncode: {retcode}"
logger.error(message)
tail = get_tail(out)
message = f"{job.cute_id} postprocess returned {retcode}:\n{tail}"
raise BalsamTransitionError(message)
# If postprocessor handled error or timeout, it should have changed job's
# state. If it failed to do this, mark FAILED. Otherwise, POSTPROCESSED.
job.refresh_from_db()
if error_handling and job.state == 'RUN_ERROR':
message = f"Error handling failed to change job state: marking FAILED"
logger.warning(message)
message = f"{job.cute_id} Error handling didn't fix job state: marking FAILED"
raise BalsamTransitionError(message)
if timeout_handling and job.state == 'RUN_TIMEOUT':
message = f"Timeout handling failed to change job state: marking FAILED"
logger.warning(message)
message = f"{job.cute_id} Timeout handling didn't change job state: marking FAILED"
raise BalsamTransitionError(message)
if not (error_handling or timeout_handling):
lock.acquire()
job.update_state('POSTPROCESSED', f"{os.path.basename(postproc_app)}")
lock.release()
logger.info(f"{job.cute_id} postprocess done")
def handle_timeout(job, lock):
logger.debug(f'in handle_timeout of {job.cute_id}')
logger.debug(f'{job.cute_id} in handle_timeout')
if job.post_timeout_handler:
logger.debug('invoking postprocess with timeout_handling flag')
logger.debug(f'{job.cute_id} invoking postprocess with timeout_handling flag')
postprocess(job, lock, timeout_handling=True)
elif job.auto_timeout_retry:
logger.debug('marking RESTART_READY')
logger.info(f'{job.cute_id} marking RESTART_READY')
lock.acquire()
job.update_state('RESTART_READY', 'timedout: auto retry')
lock.release()
else:
raise BalsamTransitionError("No timeout handling: marking FAILED")
raise BalsamTransitionError(f"{job.cute_id} no timeout handling: marking FAILED")
def handle_run_error(job, lock):
logger.debug(f'in handle_run_error of {job.cute_id}')
logger.debug(f'{job.cute_id} in handle_run_error')
if job.post_error_handler:
logger.debug('invoking postprocess with error_handling flag')
logger.debug(f'{job.cute_id} invoking postprocess with error_handling flag')
postprocess(job, lock, error_handling=True)
else:
raise BalsamTransitionError("No error handler: run failed")
......
from subprocess import Popen, PIPE, STDOUT
import os
def get_tail(fname, nlines=5, indent=' '):
'''grab last nlines of fname and format nicely'''
# Easier to ask OS than implement a proper tail
proc = Popen(f'tail -n {nlines} {fname}'.split(),stdout=PIPE,
stderr=STDOUT)
tail = proc.communicate()[0].decode()
lines = tail.split('\n')
for i, line in enumerate(lines[:]):
lines[i] = indent + line
return '\n'.join(lines)
class cd:
'''Context manager for changing cwd'''
def __init__(self, new_path):
......
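The new balsamlauncher/util.py centralizes get_tail and the cd context manager, which this commit re-imports everywhere. An illustrative use, assuming a POSIX tail on PATH and the package on PYTHONPATH:

from balsamlauncher.util import cd, get_tail

with cd('/tmp'):                       # temporarily switch cwd
    with open('demo.log', 'w') as f:
        f.write('\n'.join(f'line {i}' for i in range(20)))
    print(get_tail('demo.log', nlines=3))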
import os
import unittest
import subprocess
import time
from django.core.management import call_command
from django import db
@@ -31,3 +32,11 @@ def cmdline(cmd,envs=None,shell=True):
p = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,env=envs)
return p.communicate()[0].decode('utf-8')
def poll_until_returns_true(function, *, args=(), period=1.0, timeout=12.0):
start = time.time()
while time.time() - start < timeout:
result = function(*args)
if result: break
else: time.sleep(period)
return result
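poll_until_returns_true gives the tests a bounded busy-wait: call function(*args) every period seconds until it returns truthy or timeout elapses. For example:

import time

deadline = time.time() + 0.3
done = poll_until_returns_true(lambda: time.time() > deadline,
                               period=0.1, timeout=2.0)
print(done)  # True shortly after the deadline passes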
from collections import namedtuple
import os
import random
from multiprocessing import Lock
import sys
import time
from uuid import UUID
from importlib.util import find_spec
from tests.BalsamTestCase import BalsamTestCase, cmdline
from tests.BalsamTestCase import poll_until_returns_true
from django.conf import settings
from balsam.schedulers import Scheduler