transitions.py 16.6 KB
Newer Older
Michael Salim's avatar
Michael Salim committed
1 2
'''BalsamJob pre and post execution'''
from collections import namedtuple
Michael Salim's avatar
Michael Salim committed
3 4
import glob
import multiprocessing
5
import multiprocessing.managers
Michael Salim's avatar
Michael Salim committed
6
import queue
7 8 9
import os
from io import StringIO
from traceback import print_exc
Michael Salim's avatar
Michael Salim committed
10
import signal
11
import shutil
12
import sys
13 14
import subprocess
import tempfile
15 16

from django.core.exceptions import ObjectDoesNotExist
Michael Salim's avatar
Michael Salim committed
17
from django.conf import settings
Michael Salim's avatar
Michael Salim committed
18
from django import db
19

20 21 22 23
from balsam.common import transfer 
from balsam.launcher.exceptions import *
from balsam.service.models import BalsamJob, NoApplication
from balsam.launcher.util import get_tail
24 25

import logging
26
logger = logging.getLogger('balsam.launcher.transitions')
Michael Salim's avatar
Michael Salim committed
27

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
# SQLite exclusive lock is broken on Windows & OSX; even with two writers, two
# records, and a long timeout, a "database locked" exception is thrown
# immediately.  Writers are supposed to queue up, but this correct behavior is
# seen only on Linux.  If we are running OSX or Windows, we have to implement
# our own global DB write lock (multiprocessing.Lock object). If concurrent
# DB writes become a bottleneck, we have to go to a DB that supports better
# concurrency -- but SQLite makes it significantly easier for users to deploy
# Balsam, because it's built in and requires zero user configuration
# Choose the lock type used to serialize DB writes across transition
# processes.  macOS and Windows get a real multiprocessing lock; every other
# platform gets a no-op stand-in exposing the same acquire/release methods.
if sys.platform.startswith(('darwin', 'win32')):
    LockClass = multiprocessing.Lock
else:
    class DummyLock:
        """No-op lock: acquire() and release() do nothing."""

        def acquire(self):
            pass

        def release(self):
            pass

    LockClass = DummyLock

# TODO: replace with better solution!
# Until then, the platform-based choice above is overridden and the real
# multiprocessing lock is used everywhere.
LockClass = multiprocessing.Lock
logger.debug(f'Using lock: {LockClass}')
47

Michael Salim's avatar
Michael Salim committed
48
PREPROCESS_TIMEOUT_SECONDS = 300
49
POSTPROCESS_TIMEOUT_SECONDS = 300
Michael Salim's avatar
Michael Salim committed
50
SITE = settings.BALSAM_SITE
Michael Salim's avatar
Michael Salim committed
51

52
StatusMsg = namedtuple('StatusMsg', ['pk', 'state', 'msg'])
Michael Salim's avatar
Michael Salim committed
53

54 55
def on_exit():
    """Log that a termination signal arrived; deliberately take no other action.

    Installed (through a small wrapper) as the SIGINT/SIGTERM handler so the
    process keeps running until it receives the explicit 'end' message.
    """
    note = "TransitionProc caught SIGTERM: do nothing and wait for end"
    logger.debug(note)
Michael Salim's avatar
Michael Salim committed
56

57
def main(job_queue, status_queue, lock):
    """Worker loop run by each transition process.

    Repeatedly pulls ``(priority, jobid)`` messages from ``job_queue``, looks
    up the corresponding BalsamJob, and runs the transition function that
    ``TRANSITIONS`` maps to the job's current state.  The outcome of every
    transition is reported on ``status_queue`` as a ``StatusMsg``.

    Args:
        job_queue: queue yielding ``(priority, jobid)`` tuples; a jobid of
            ``'end'`` tells the worker to return.
        status_queue: queue on which ``StatusMsg(pk, state, msg)`` tuples
            are placed after each transition.
        lock: object with ``acquire()``/``release()`` held around every
            database write made here.

    Raises:
        Any exception other than BalsamTransitionError raised by a
        transition function is logged at CRITICAL level and re-raised.
    """
    # SIGINT/SIGTERM only get logged; shutdown is driven by the 'end' message.
    handler = lambda a,b: on_exit()
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

    while True:
        logger.debug("Transition process waiting for job")
        job_msg = job_queue.get()
        priority, jobid = job_msg

        if jobid == 'end':
            logger.debug("Received end..quitting transition loop")
            return

        job = BalsamJob.objects.get(pk=jobid)
        transition_function = TRANSITIONS[job.state]

        if job.work_site != SITE:
            job.work_site = SITE
            # try/finally: a failed save must not leave the shared lock held,
            # otherwise every other transition process blocks forever.
            lock.acquire()
            try:
                job.save(update_fields=['work_site'])
            finally:
                lock.release()
        logger.debug(f"Received job {job.cute_id}: {transition_function} (priority {priority})")
        try:
            transition_function(job, lock)
        except BalsamTransitionError as e:
            job.refresh_from_db()
            lock.acquire()
            try:
                job.update_state('FAILED', str(e))
            finally:
                lock.release()

            s = StatusMsg(job.pk, 'FAILED', str(e))
            status_queue.put(s)
            buf = StringIO()
            print_exc(file=buf)
            logger.exception(f"{job.cute_id} BalsamTransitionError:\n%s\n", buf.getvalue())
            logger.exception(f"Marking {job.cute_id} as FAILED")
        except:
            # Catch-all is intentional: log the traceback, then re-raise so
            # the failure is not swallowed.
            buf = StringIO()
            print_exc(file=buf)
            logger.critical(f"{job.cute_id} Uncaught exception:\n%s", buf.getvalue())
            raise
        else:
            s = StatusMsg(job.pk, str(job.state), 'success')
            status_queue.put(s)

Michael Salim's avatar
Michael Salim committed
103

Michael Salim's avatar
Michael Salim committed
104
class TransitionProcessPool:
    """Fixed-size pool of processes that run BalsamJob state transitions.

    Jobs are submitted with ``add_job`` and handed to the workers through a
    manager-hosted priority queue; results come back through a
    multiprocessing queue and are drained with ``get_statuses``.
    ``job in pool`` reports whether a job has been submitted and its status
    has not been collected yet.
    """

    # Number of worker processes, taken from the Django settings.
    NUM_PROC = settings.BALSAM_MAX_CONCURRENT_TRANSITIONS

    def __init__(self):
        """Start the queue manager and launch ``NUM_PROC`` daemon workers."""
        # SIGINT/SIGTERM are only logged; shutdown goes through end_and_wait.
        def _log_only(signum, frame):
            on_exit()

        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, _log_only)

        # Use a priority queue to ensure pre-run transitions occur fast
        class PQueueManager(multiprocessing.managers.SyncManager):
            pass

        PQueueManager.register("PriorityQueue", queue.PriorityQueue)
        manager = PQueueManager()
        self.pqueue_manager = manager
        manager.start()
        self.job_queue = manager.PriorityQueue()

        self.status_queue = multiprocessing.Queue()
        self.lock = LockClass()
        # pks of jobs submitted whose status has not been collected yet
        self.transitions_pk_list = []

        worker_args = (self.job_queue, self.status_queue, self.lock)
        self.procs = []
        for _ in range(self.NUM_PROC):
            self.procs.append(
                multiprocessing.Process(target=main, args=worker_args)
            )
        logger.debug(f"Starting {len(self.procs)} transition processes")
        # Close the DB connections before the worker processes are started.
        db.connections.close_all()
        for worker in self.procs:
            worker.daemon = True
            worker.start()

    def __contains__(self, job):
        """Return True if ``job.pk`` is currently tracked as in transition."""
        return job.pk in self.transitions_pk_list

    def add_job(self, job):
        """Queue ``job`` for the transition matching its current state.

        Raises:
            BalsamTransitionError: the job is already tracked by this pool.
            TransitionNotFoundError: ``job.state`` has no entry in TRANSITIONS.
        """
        if job in self:
            raise BalsamTransitionError("already in transition")
        if job.state not in TRANSITIONS:
            raise TransitionNotFoundError

        self.job_queue.put((PRIORITIES[job.state], job.pk))
        self.transitions_pk_list.append(job.pk)

    def get_statuses(self):
        """Yield every StatusMsg currently waiting on the status queue.

        Each yielded message's pk is removed from the tracked list first.
        """
        pending = self.status_queue
        while not pending.empty():
            try:
                status = pending.get_nowait()
            except queue.Empty:
                # Queue drained between the empty() check and the get.
                return
            self.transitions_pk_list.remove(status.pk)
            yield status

    def end_and_wait(self):
        """Send one 'end' message per worker, join them, stop the manager."""
        end_msg = (PRIORITIES['end'], 'end')
        logger.debug("Sending end message and waiting on transition processes")
        for _ in self.procs:
            self.job_queue.put(end_msg)
        for worker in self.procs:
            worker.join()
        self.pqueue_manager.shutdown()
        logger.info("All Transition processes joined: done.")
        self.transitions_pk_list = []

169
def stage_in(job, lock):
    """Prepare a job's working directory and mark the job STAGED_IN.

    Steps:
      1. Create the working directory with ``job.create_working_path()`` if
         it does not exist yet.
      2. If ``job.stage_in_url`` is set, transfer its contents into the
         working directory.
      3. Symlink into the working directory every file in a parent job's
         working directory that matches one of the whitespace-separated
         patterns in ``job.input_files``.
      4. Update the job state to ``STAGED_IN``.

    Args:
        job: the BalsamJob to stage in.
        lock: object with ``acquire()``/``release()`` held around the
            working-path creation and the state update.

    Raises:
        BalsamTransitionError: the transfer or a symlink creation failed.
    """
    # Create workdirs for jobs: use job.create_working_path
    logger.debug(f'{job.cute_id} in stage_in')

    if not os.path.exists(job.working_directory):
        # try/finally so a failure cannot leave the shared lock held.
        lock.acquire()
        try:
            job.create_working_path()
        finally:
            lock.release()
    work_dir = job.working_directory
    logger.info(f"{job.cute_id} working directory {work_dir}")

    # stage in all remote urls
    # TODO: stage_in remote transfer should allow a list of files and folders,
    # rather than copying just one entire folder
    url_in = job.stage_in_url
    if url_in:
        logger.info(f"{job.cute_id} transfer in from {url_in}")
        try:
            transfer.stage_in(f"{url_in}/",  f"{work_dir}/")
        except Exception as e:
            message = 'Exception received during stage_in: ' + str(e)
            raise BalsamTransitionError(message) from e

    # create unique symlinks to "input_files" patterns from parents
    # TODO: handle data flow from remote sites transparently
    matches = []
    parents = job.get_parents()
    input_patterns = job.input_files.split()
    logger.debug(f"{job.cute_id} searching parent workdirs for {input_patterns}")
    for parent in parents:
        parent_dir = parent.working_directory
        for pattern in input_patterns:
            path = os.path.join(parent_dir, pattern)
            matches.extend((parent.pk, match)
                           for match in glob.glob(path))

    for parent_pk, inp_file in matches:
        basename = os.path.basename(inp_file)
        new_path = os.path.join(work_dir, basename)

        # Name clash: disambiguate with the first 8 characters of the
        # parent's pk.
        if os.path.exists(new_path): new_path += f"_{str(parent_pk)[:8]}"
        # pointing to src, named dst
        logger.info(f"{job.cute_id}   {new_path}  -->  {inp_file}")
        try:
            os.symlink(src=inp_file, dst=new_path)
        except Exception as e:
            raise BalsamTransitionError(
                f"Exception received during symlink: {e}") from e

    lock.acquire()
    try:
        job.update_state('STAGED_IN')
    finally:
        lock.release()
    logger.info(f"{job.cute_id} stage_in done")
222 223


224
def stage_out(job, lock):
    """Copy matching output files to ``job.stage_out_url``; mark JOB_FINISHED.

    If the job has no ``stage_out_url`` the state is updated immediately.
    Otherwise every file in the working directory matching one of the
    whitespace-separated patterns in ``job.stage_out_files`` is copied into
    a temporary staging directory, which is then transferred to the URL.

    Args:
        job: the BalsamJob to stage out.
        lock: object with ``acquire()``/``release()`` held around each
            state update.

    Raises:
        BalsamTransitionError: copying to the staging directory or the
            transfer itself failed.
    """
    logger.debug(f'{job.cute_id} in stage_out')

    url_out = job.stage_out_url
    if not url_out:
        # try/finally so a failed update cannot leave the shared lock held.
        lock.acquire()
        try:
            job.update_state('JOB_FINISHED')
        finally:
            lock.release()
        logger.info(f'{job.cute_id} no stage_out_url: done')
        return

    stage_out_patterns = job.stage_out_files.split()
    logger.debug(f"{job.cute_id} stage out files match: {stage_out_patterns}")
    work_dir = job.working_directory
    matches = []
    for pattern in stage_out_patterns:
        path = os.path.join(work_dir, pattern)
        matches.extend(glob.glob(path))

    if matches:
        logger.info(f"{job.cute_id} stage out files: {matches}")
        with tempfile.TemporaryDirectory() as stagingdir:
            try:
                for f in matches:
                    base = os.path.basename(f)
                    dst = os.path.join(stagingdir, base)
                    shutil.copyfile(src=f, dst=dst)
                    logger.info(f"staging {f} out for transfer")
                logger.info(f"transferring to {url_out}")
                transfer.stage_out(f"{stagingdir}/", f"{url_out}/")
            except Exception as e:
                message = f'Exception received during stage_out: {e}'
                raise BalsamTransitionError(message) from e
    lock.acquire()
    try:
        job.update_state('JOB_FINISHED')
    finally:
        lock.release()
    logger.info(f'{job.cute_id} stage_out done')
262 263


264
def preprocess(job, lock):
    """Run the job's preprocess script, if any, and mark the job PREPROCESSED.

    The script is ``job.preprocess`` or, if that is empty, the
    ``default_preprocess`` of the job's application.  With no script the job
    is marked PREPROCESSED right away.  Otherwise the script is run in the
    job's working directory with the environment from ``job.get_envs()``;
    stdout and stderr go to ``preprocess.log`` in that directory.

    Args:
        job: the BalsamJob to preprocess.
        lock: object with ``acquire()``/``release()``; held while the script
            runs and around each state update.

    Raises:
        BalsamTransitionError: the application record does not exist, the
            script is not on the filesystem, launching or waiting on it
            failed (including exceeding PREPROCESS_TIMEOUT_SECONDS), or it
            returned a nonzero exit code.
    """
    logger.debug(f'{job.cute_id} in preprocess')

    # Get preprocesser exe
    preproc_app = job.preprocess
    if not preproc_app:
        try:
            app = job.get_application()
            preproc_app = app.default_preprocess
        except ObjectDoesNotExist as e:
            message = f"application {job.application} does not exist"
            raise BalsamTransitionError(message) from e
        except NoApplication:
            preproc_app = None
    if not preproc_app:
        lock.acquire()
        try:
            job.update_state('PREPROCESSED', 'No preprocess: skipped')
        finally:
            lock.release()
        logger.info(f"{job.cute_id} no preprocess: skipped")
        return
    if not os.path.exists(preproc_app.split()[0]):
        #TODO: look for preproc in the EXE directories
        message = f"Preprocessor {preproc_app} does not exist on filesystem"
        raise BalsamTransitionError(message)

    # Create preprocess-specific environment
    envs = job.get_envs()

    # Run preprocesser with special environment in job working directory
    out = os.path.join(job.working_directory, "preprocess.log")
    with open(out, 'w') as fp:
        # Trailing newline keeps the script's output off the header line.
        fp.write(f"# Balsam Preprocessor: {preproc_app}\n")
        fp.flush()

        args = preproc_app.split()
        logger.info(f"{job.cute_id} preprocess Popen {args}")
        proc = None
        # The lock is held for the whole run of the script -- presumably
        # because the script may write to the database itself (TODO confirm).
        # It MUST be released on every path: the caller re-acquires it to
        # mark the job FAILED, and a leaked lock would deadlock there.
        lock.acquire()
        try:
            proc = subprocess.Popen(args, stdout=fp,
                                    stderr=subprocess.STDOUT, env=envs,
                                    cwd=job.working_directory)
            retcode = proc.wait(timeout=PREPROCESS_TIMEOUT_SECONDS)
            proc.communicate()
        except Exception as e:
            message = f"Preprocess failed: {e}"
            # proc is still None when Popen itself failed to launch.
            if proc is not None:
                proc.kill()
                proc.wait()  # reap the killed child
            raise BalsamTransitionError(message) from e
        finally:
            lock.release()

    job.refresh_from_db()
    if retcode != 0:
        tail = get_tail(out)
        message = f"{job.cute_id} preprocess returned {retcode}:\n{tail}"
        raise BalsamTransitionError(message)

    lock.acquire()
    try:
        job.update_state('PREPROCESSED', f"{os.path.basename(preproc_app)}")
    finally:
        lock.release()
    logger.info(f"{job.cute_id} preprocess done")
Michael Salim's avatar
Michael Salim committed
322

323
def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
    """Run the job's postprocess script, optionally as an error/timeout handler.

    The script is ``job.postprocess`` or, if that is empty, the
    ``default_postprocess`` of the job's application.  It is run in the
    job's working directory with the environment from
    ``job.get_envs(timeout=..., error=...)``; stdout and stderr go to
    ``postprocess.log`` in that directory.

    With no script available:
      * error_handling: raises BalsamTransitionError;
      * timeout_handling: the job is marked RESTART_READY;
      * otherwise: the job is marked POSTPROCESSED.

    After a successful run in normal mode the job is marked POSTPROCESSED.
    In a handling mode the script is expected to have changed the job state
    itself; if the state is still RUN_ERROR / RUN_TIMEOUT, this raises.

    Args:
        job: the BalsamJob to postprocess.
        lock: object with ``acquire()``/``release()``; held while the script
            runs and around each state update.
        error_handling: invoke the script to handle a RUN_ERROR.
        timeout_handling: invoke the script to handle a RUN_TIMEOUT.

    Raises:
        ValueError: both handling flags are set.
        BalsamTransitionError: the application record does not exist, no
            script is available in error_handling mode, the script is not on
            the filesystem, launching or waiting on it failed (including
            exceeding POSTPROCESS_TIMEOUT_SECONDS), it returned a nonzero
            exit code, or a handler left the job state unchanged.
    """
    logger.debug(f'{job.cute_id} in postprocess')
    if error_handling and timeout_handling:
        raise ValueError("Both error-handling and timeout-handling is invalid")
    if error_handling: logger.info(f'{job.cute_id} handling RUN_ERROR')
    if timeout_handling: logger.info(f'{job.cute_id} handling RUN_TIMEOUT')

    # Get postprocesser exe
    postproc_app = job.postprocess
    if not postproc_app:
        try:
            app = job.get_application()
            postproc_app = app.default_postprocess
        except ObjectDoesNotExist as e:
            message = f"application {job.application} does not exist"
            logger.error(message)
            raise BalsamTransitionError(message) from e
        except NoApplication:
            postproc_app = None

    # If no postprocesssor; move on (unless in error_handling mode)
    if not postproc_app:
        if error_handling:
            message = f"{job.cute_id} handle error: no postprocessor found!"
            raise BalsamTransitionError(message)
        elif timeout_handling:
            lock.acquire()
            try:
                job.update_state('RESTART_READY', 'marking for re-run')
            finally:
                lock.release()
            logger.warning(f'{job.cute_id} unhandled job timeout: marked RESTART_READY')
            return
        else:
            lock.acquire()
            try:
                job.update_state('POSTPROCESSED',
                                 f'{job.cute_id} no postprocess: skipped')
            finally:
                lock.release()
            logger.info(f'{job.cute_id} no postprocess: skipped')
            return

    if not os.path.exists(postproc_app.split()[0]):
        #TODO: look for postproc in the EXE directories
        message = f"Postprocessor {postproc_app} does not exist on filesystem"
        raise BalsamTransitionError(message)

    # Create postprocess-specific environment
    envs = job.get_envs(timeout=timeout_handling, error=error_handling)

    # Run postprocesser with special environment in job working directory
    out = os.path.join(job.working_directory, "postprocess.log")
    with open(out, 'w') as fp:
        fp.write(f"# Balsam Postprocessor: {postproc_app}\n")
        if timeout_handling: fp.write("# Invoked to handle RUN_TIMEOUT\n")
        if error_handling: fp.write("# Invoked to handle RUN_ERROR\n")
        fp.flush()

        args = postproc_app.split()
        logger.info(f"{job.cute_id} postprocess Popen {args}")
        proc = None
        # The lock is held for the whole run of the script -- presumably
        # because the script may write to the database itself (TODO confirm).
        # It MUST be released on every path: the caller re-acquires it to
        # mark the job FAILED, and a leaked lock would deadlock there.
        lock.acquire()
        try:
            proc = subprocess.Popen(args, stdout=fp,
                                    stderr=subprocess.STDOUT, env=envs,
                                    cwd=job.working_directory)
            retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
            proc.communicate()
        except Exception as e:
            message = f"Postprocess failed: {e}"
            # proc is still None when Popen itself failed to launch.
            if proc is not None:
                proc.kill()
                proc.wait()  # reap the killed child
            raise BalsamTransitionError(message) from e
        finally:
            lock.release()

    if retcode != 0:
        tail = get_tail(out)
        message = f"{job.cute_id} postprocess returned {retcode}:\n{tail}"
        raise BalsamTransitionError(message)

    job.refresh_from_db()
    # If postprocessor handled error or timeout, it should have changed job's
    # state. If it failed to do this, mark FAILED.  Otherwise, POSTPROCESSED.
    if error_handling and job.state == 'RUN_ERROR':
        message = f"{job.cute_id} Error handling didn't fix job state: marking FAILED"
        raise BalsamTransitionError(message)

    if timeout_handling and job.state == 'RUN_TIMEOUT':
        message = f"{job.cute_id} Timeout handling didn't change job state: marking FAILED"
        raise BalsamTransitionError(message)

    if not (error_handling or timeout_handling):
        lock.acquire()
        try:
            job.update_state('POSTPROCESSED', f"{os.path.basename(postproc_app)}")
        finally:
            lock.release()
    logger.info(f"{job.cute_id} postprocess done")
Michael Salim's avatar
Michael Salim committed
414 415


416
def handle_timeout(job, lock):
    """Transition for a job in the RUN_TIMEOUT state.

    If ``job.post_timeout_handler`` is set, the postprocess script is run in
    timeout-handling mode.  Otherwise, if ``job.auto_timeout_retry`` is set,
    the job is marked RESTART_READY.  With neither set this raises.

    Args:
        job: the timed-out BalsamJob.
        lock: object with ``acquire()``/``release()`` held around the state
            update; also passed on to ``postprocess``.

    Raises:
        BalsamTransitionError: no timeout handling is configured for the job
            (or ``postprocess`` raised it).
    """
    logger.debug(f'{job.cute_id} in handle_timeout')
    if job.post_timeout_handler:
        logger.debug(f'{job.cute_id} invoking postprocess with timeout_handling flag')
        postprocess(job, lock, timeout_handling=True)
    elif job.auto_timeout_retry:
        logger.info(f'{job.cute_id} marking RESTART_READY')
        # try/finally so a failed update cannot leave the shared lock held.
        lock.acquire()
        try:
            job.update_state('RESTART_READY', 'timedout: auto retry')
        finally:
            lock.release()
    else:
        raise BalsamTransitionError(f"{job.cute_id} no timeout handling: marking FAILED")
Michael Salim's avatar
Michael Salim committed
428 429


430
def handle_run_error(job, lock):
    """Transition for a job in the RUN_ERROR state.

    Runs the postprocess script in error-handling mode when
    ``job.post_error_handler`` is set; otherwise raises
    BalsamTransitionError.
    """
    logger.debug(f'{job.cute_id} in handle_run_error')
    # Guard clause: without a handler there is nothing to try.
    if not job.post_error_handler:
        raise BalsamTransitionError("No error handler: run failed")

    logger.debug(f'{job.cute_id} invoking postprocess with error_handling flag')
    postprocess(job, lock, error_handling=True)
437 438


Michael Salim's avatar
Michael Salim committed
439 440 441 442
# Job state -> function that carries out the transition away from that state.
# Every function is called as ``func(job, lock)``.
TRANSITIONS = {
    'READY': stage_in,
    'STAGED_IN': preprocess,
    'RUN_DONE': postprocess,
    'RUN_TIMEOUT': handle_timeout,
    'RUN_ERROR': handle_run_error,
    'POSTPROCESSED': stage_out,
}

# Queue priority attached to each job message (plus the special 'end'
# message).  The values go into a queue.PriorityQueue, which hands out the
# lowest value first.
PRIORITIES = {
    'end': -1,
    'READY': 0,
    'STAGED_IN': 0,
    'RUN_TIMEOUT': 1,
    'RUN_ERROR': 1,
    'RUN_DONE': 2,
    'POSTPROCESSED': 2,
}