transitions.py 11.3 KB
Newer Older
Michael Salim's avatar
Michael Salim committed
1 2
'''BalsamJob pre and post execution'''
from collections import namedtuple
Michael Salim's avatar
Michael Salim committed
3 4 5
import glob
import multiprocessing
import os
import queue
import subprocess
import tempfile
6 7 8
import logging

from django.core.exceptions import ObjectDoesNotExist
Michael Salim's avatar
Michael Salim committed
9
from django.conf import settings
Michael Salim's avatar
Michael Salim committed
10
from django import db
11

Michael Salim's avatar
Michael Salim committed
12
from common import transfer
Michael Salim's avatar
Michael Salim committed
13
from balsam.launcher.exceptions import *
Michael Salim's avatar
Michael Salim committed
14
from balsam.models import BalsamJob
Michael Salim's avatar
Michael Salim committed
15
logger = logging.getLogger(__name__)

# Seconds a preprocess script may run before preprocess() treats it as failed
PREPROCESS_TIMEOUT_SECONDS = 300

# Messages exchanged with the worker processes over multiprocessing queues.
# The namedtuple typename must equal the module-level variable name: pickle
# (used by multiprocessing.Queue) locates the class by that name, so a
# mismatch makes every put() of the tuple fail to serialize.
StatusMsg = namedtuple('StatusMsg', ['pk', 'state', 'msg'])
JobMsg = namedtuple('JobMsg', ['job', 'transition_function'])
Michael Salim's avatar
Michael Salim committed
21

Michael Salim's avatar
Michael Salim committed
22

Michael Salim's avatar
Michael Salim committed
23
def main(job_queue, status_queue):
    '''Worker loop: apply transition functions to jobs taken from job_queue.

    Each item on job_queue is a (job, transition_function) pair. After the
    transition runs, a StatusMsg(pk, state, msg) is put on status_queue:
    the job's current state with msg 'success', or state 'FAILED' with the
    error text when the transition raised. The loop returns when it
    receives a message whose job is the string 'end'.
    '''
    # NOTE(review): presumably closes the connection inherited from the
    # parent process so this worker opens its own -- confirm.
    db.connection.close()
    while True:
        job, transition_function = job_queue.get()
        if job == 'end':
            return

        try:
            transition_function(job)
        except BalsamTransitionError as e:
            failure = str(e)
        except Exception as e:
            # An unexpected error must not kill the worker: without a status
            # message the pool would keep this job "in transition" forever.
            logger.exception('Unexpected error in transition of job %s',
                             job.pk)
            failure = f"{type(e).__name__}: {e}"
        else:
            status_queue.put(StatusMsg(job.pk, job.state, 'success'))
            continue

        job.update_state('FAILED', failure)
        status_queue.put(StatusMsg(job.pk, 'FAILED', failure))

Michael Salim's avatar
Michael Salim committed
40

Michael Salim's avatar
Michael Salim committed
41
class TransitionProcessPool:
    '''Pool of worker processes that carry out BalsamJob transitions.

    Jobs are handed to the workers through job_queue and results come back
    as StatusMsg tuples on status_queue. transitions_pk_list holds the pk of
    every job that has been queued and not yet reported back, which is what
    ``job in pool`` tests.
    '''

    # Number of worker processes started by __init__
    NUM_PROC = settings.BALSAM_MAX_CONCURRENT_TRANSITIONS

    def __init__(self):
        '''Create the queues and start NUM_PROC workers running main().'''
        self.job_queue = multiprocessing.Queue()
        self.status_queue = multiprocessing.Queue()
        self.transitions_pk_list = []

        # NUM_PROC is a class attribute: it is not visible as a bare name
        # inside a method, so it has to be looked up through self.
        self.procs = [
            multiprocessing.Process(target=main,
                                    args=(self.job_queue, self.status_queue))
            for _ in range(self.NUM_PROC)
        ]
        # NOTE(review): presumably closes open DB connections so they are
        # not shared with the child processes -- confirm.
        db.connections.close_all()
        for proc in self.procs:
            proc.start()

    def __contains__(self, job):
        '''Return True if the job is queued or currently in transition.'''
        return job.pk in self.transitions_pk_list

    def add_job(self, job):
        '''Queue a job for the transition that matches its current state.

        Raises:
            BalsamTransitionError: the job is already in this pool.
            TransitionNotFoundError: no transition exists for job.state.
        '''
        if job in self:
            raise BalsamTransitionError("already in transition")
        if job.state not in TRANSITIONS:
            raise TransitionNotFoundError
        transition_function = TRANSITIONS[job.state]
        self.job_queue.put(JobMsg(job, transition_function))
        self.transitions_pk_list.append(job.pk)

    def get_statuses(self):
        '''Yield every StatusMsg currently available on status_queue.

        Each reported job is removed from transitions_pk_list before its
        status is yielded.
        '''
        while not self.status_queue.empty():
            try:
                stat = self.status_queue.get_nowait()
            except queue.Empty:
                break
            self.transitions_pk_list.remove(stat.pk)
            yield stat

    def flush_job_queue(self):
        '''Discard every job message still waiting on job_queue.'''
        while not self.job_queue.empty():
            try:
                self.job_queue.get_nowait()
            except queue.Empty:
                break

    def end_and_wait(self):
        '''Tell every worker to exit, then block until all have finished.'''
        end_msg = JobMsg('end', None)
        # One sentinel per worker: each worker consumes exactly one and returns
        for _ in self.procs:
            self.job_queue.put(end_msg)
        # multiprocessing.Process is waited on with join(); it has no wait()
        for proc in self.procs:
            proc.join()
        self.transitions_pk_list = []

93 94

def check_parents(job):
    '''Mark the job READY once every parent job is JOB_FINISHED.

    If at least one parent is unfinished, the job is moved to
    AWAITING_PARENTS (unless it is already in that state) with a message
    giving the number of parents that are still pending.
    '''
    parents = job.get_parents()
    pending = [p for p in parents if p.state != 'JOB_FINISHED']
    if not pending:
        job.update_state('READY', 'dependencies satisfied')
    elif job.state != 'AWAITING_PARENTS':
        # Count only the unfinished parents, not every parent
        job.update_state('AWAITING_PARENTS', f'{len(pending)} pending jobs')
101 102 103


def stage_in(job):
    '''Prepare the job's working directory and bring in its input data.

    Creates the working directory if it does not exist, transfers
    job.stage_in_url into it when that url is set, symlinks every parent
    file matching one of the job.input_files patterns into it, and finally
    marks the job STAGED_IN.

    Raises:
        BalsamTransitionError: the transfer or a symlink could not be made.
    '''
    # Create workdirs for jobs: use job.create_working_path
    logger.debug('in stage_in')

    if not os.path.exists(job.working_directory):
        job.create_working_path()
    work_dir = job.working_directory

    # stage in all remote urls
    # TODO: stage_in remote transfer should allow a list of files and folders,
    # rather than copying just one entire folder
    url_in = job.stage_in_url
    if url_in:
        try:
            transfer.stage_in(f"{url_in}/", f"{work_dir}/")
        except Exception as e:
            message = 'Exception received during stage_in: ' + str(e)
            logger.error(message)
            # Carry the message: the worker reports str(exception)
            raise BalsamTransitionError(message) from e

    # create unique symlinks to "input_files" patterns from parents
    # TODO: handle data flow from remote sites transparently
    matches = []
    input_patterns = job.input_files.split()
    for parent in job.get_parents():
        parent_dir = parent.working_directory
        for pattern in input_patterns:
            path = os.path.join(parent_dir, pattern)
            # One (parent pk, file) pair per matched file
            matches.extend((parent.pk, f) for f in glob.glob(path))

    for parent_pk, inp_file in matches:
        basename = os.path.basename(inp_file)
        newpath = os.path.join(work_dir, basename)
        # lexists also sees dangling symlinks, which would still block symlink()
        if os.path.lexists(newpath):
            newpath += f"_{parent_pk}"
        try:
            # pointing to src, named dst
            os.symlink(inp_file, newpath)
        except OSError as e:
            message = f"Failed to symlink {inp_file} to {newpath}: {e}"
            logger.error(message)
            raise BalsamTransitionError(message) from e
    job.update_state('STAGED_IN')
141 142 143


def stage_out(job):
    '''Copy files from the local working_directory to the stage_out_url.

    Files in the working directory matching any job.stage_out_files pattern
    are hard-linked into a temporary staging directory, which is then
    transferred to job.stage_out_url. The job is marked JOB_FINISHED
    afterwards, including when there was nothing to transfer.

    Raises:
        BalsamTransitionError: linking or transferring the files failed.
    '''
    logger.debug('in stage_out')

    stage_out_patterns = job.stage_out_files.split()
    work_dir = job.working_directory
    matches = []
    for pattern in stage_out_patterns:
        path = os.path.join(work_dir, pattern)
        matches.extend(glob.glob(path))
    # A file matched by two patterns must be linked only once, otherwise the
    # second os.link() fails because the destination already exists.
    matches = list(dict.fromkeys(matches))

    url_out = job.stage_out_url
    if matches and not url_out:
        # Without a destination the transfer target would be just "/"
        logger.warning('stage_out files matched but no stage_out_url is set: '
                       'skipping transfer')
    elif matches:
        # NOTE(review): os.link needs the staging dir and the working dir on
        # the same filesystem; a failure is reported as a transition error.
        with tempfile.TemporaryDirectory() as stagingdir:
            try:
                for f in matches:
                    dst = os.path.join(stagingdir, os.path.basename(f))
                    os.link(f, dst)
                transfer.stage_out(f"{stagingdir}/", f"{url_out}/")
            except Exception as e:
                message = 'Exception received during stage_out: ' + str(e)
                logger.error(message)
                # Carry the message: the worker reports str(exception)
                raise BalsamTransitionError(message) from e
    job.update_state('JOB_FINISHED') # this completes the transitions
167 168 169 170


def preprocess(job):
    '''Run the job's preprocess script and mark the job PREPROCESSED.

    The script is job.preprocess, falling back to the application's
    default_preprocess. When neither is set the step is skipped. The script
    runs in the job's working directory with the environment from
    job.get_envs(); its output goes to a preprocess.log.pid<PID> file there.

    Raises:
        BalsamTransitionError: the application or script does not exist, the
            script could not be run or timed out, or it exited nonzero.
    '''
    logger.debug('in preprocess ')

    # Get preprocesser exe
    preproc_app = job.preprocess
    if not preproc_app:
        try:
            app = job.get_application()
        except ObjectDoesNotExist as e:
            message = f"application {job.application} does not exist"
            logger.error(message)
            raise BalsamTransitionError(message) from e
        preproc_app = app.default_preprocess
    if not preproc_app:
        job.update_state('PREPROCESSED', 'No preprocess: skipped')
        return
    if not os.path.exists(preproc_app):
        #TODO: look for preproc in the EXE directories
        message = f"Preprocessor {preproc_app} does not exist on filesystem"
        logger.error(message)
        raise BalsamTransitionError(message)

    # Create preprocess-specific environment
    envs = job.get_envs()

    # Run preprocesser with special environment in job working directory
    out = os.path.join(job.working_directory, f"preprocess.log.pid{os.getpid()}")
    proc = None
    with open(out, 'wb') as fp:
        # The file is binary, so the header must be bytes; flush it so it
        # lands ahead of anything the child writes to the same descriptor.
        fp.write(f"# Balsam Preprocessor: {preproc_app}\n".encode())
        fp.flush()
        try:
            proc = subprocess.Popen(preproc_app, stdout=fp,
                                    stderr=subprocess.STDOUT, env=envs,
                                    cwd=job.working_directory)
            retcode = proc.wait(timeout=PREPROCESS_TIMEOUT_SECONDS)
        except Exception as e:
            message = f"Preprocess failed: {e}"
            logger.error(message)
            # proc is still None when Popen itself raised
            if proc is not None:
                proc.kill()
                proc.wait()
            raise BalsamTransitionError(message) from e
    if retcode != 0:
        message = f"Preprocess Popen nonzero returncode: {retcode}"
        logger.error(message)
        raise BalsamTransitionError(message)

    job.update_state('PREPROCESSED', f"{os.path.basename(preproc_app)}")


# Seconds a postprocess script may run before postprocess() treats it as
# failed (mirrors PREPROCESS_TIMEOUT_SECONDS).
POSTPROCESS_TIMEOUT_SECONDS = 300


def postprocess(job, *, error_handling=False, timeout_handling=False):
    '''Run the job's postprocess script, optionally as an error/timeout handler.

    The script is job.postprocess, falling back to the application's
    default_postprocess. It runs in the job's working directory with the
    environment from job.get_envs(timeout=..., error=...); its output goes
    to a postprocess.log.pid<PID> file there.

    With no script: error handling fails, timeout handling marks the job
    RESTART_READY, and a normal run marks it POSTPROCESSED. After a normal
    run the job is marked POSTPROCESSED. In a handling mode the script is
    expected to have changed the job's state itself; if the state is still
    RUN_ERROR / RUN_TIMEOUT the transition fails.

    Args:
        job: the job to postprocess.
        error_handling: the script is invoked to handle RUN_ERROR.
        timeout_handling: the script is invoked to handle RUN_TIMEOUT.

    Raises:
        ValueError: both error_handling and timeout_handling are set.
        BalsamTransitionError: the application or script does not exist, the
            script could not be run, timed out or exited nonzero, or a
            handler left the job state unchanged.
    '''
    logger.debug('in postprocess ')
    if error_handling and timeout_handling:
        raise ValueError("Both error-handling and timeout-handling is invalid")
    if error_handling: logger.debug('Handling RUN_ERROR')
    if timeout_handling: logger.debug('Handling RUN_TIMEOUT')

    # Get postprocesser exe
    postproc_app = job.postprocess
    if not postproc_app:
        try:
            app = job.get_application()
        except ObjectDoesNotExist as e:
            message = f"application {job.application} does not exist"
            logger.error(message)
            raise BalsamTransitionError(message) from e
        postproc_app = app.default_postprocess

    # If no postprocesssor; move on (unless in error_handling mode)
    if not postproc_app:
        if error_handling:
            message = "Trying to handle error, but no postprocessor found"
            logger.warning(message)
            raise BalsamTransitionError(message)
        elif timeout_handling:
            logger.warning('Unhandled job timeout: marking RESTART_READY')
            job.update_state('RESTART_READY', 'marking for re-run')
            return
        else:
            job.update_state('POSTPROCESSED', 'No postprocess: skipped')
            return

    if not os.path.exists(postproc_app):
        #TODO: look for postproc in the EXE directories
        message = f"Postprocessor {postproc_app} does not exist on filesystem"
        logger.error(message)
        raise BalsamTransitionError(message)

    # Create postprocess-specific environment
    envs = job.get_envs(timeout=timeout_handling, error=error_handling)

    # Run postprocesser with special environment in job working directory
    out = os.path.join(job.working_directory, f"postprocess.log.pid{os.getpid()}")
    proc = None
    with open(out, 'wb') as fp:
        # The file is binary, so the header lines must be bytes; flush them
        # so they land ahead of anything the child writes.
        fp.write(f"# Balsam Postprocessor: {postproc_app}\n".encode())
        if timeout_handling: fp.write(b"# Invoked to handle RUN_TIMEOUT\n")
        if error_handling: fp.write(b"# Invoked to handle RUN_ERROR\n")
        fp.flush()

        try:
            proc = subprocess.Popen(postproc_app, stdout=fp,
                                    stderr=subprocess.STDOUT, env=envs,
                                    cwd=job.working_directory)
            retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
        except Exception as e:
            message = f"Postprocess failed: {e}"
            logger.error(message)
            # proc is still None when Popen itself raised
            if proc is not None:
                proc.kill()
                proc.wait()
            raise BalsamTransitionError(message) from e
    if retcode != 0:
        message = f"Postprocess Popen nonzero returncode: {retcode}"
        logger.error(message)
        raise BalsamTransitionError(message)

    # If postprocessor handled error or timeout, it should have changed job's
    # state. If it failed to do this, mark FAILED.  Otherwise, POSTPROCESSED.
    job.refresh_from_db()
    if error_handling and job.state == 'RUN_ERROR':
        message = "Error handling failed to change job state: marking FAILED"
        logger.warning(message)
        raise BalsamTransitionError(message)

    if timeout_handling and job.state == 'RUN_TIMEOUT':
        message = "Timeout handling failed to change job state: marking FAILED"
        logger.warning(message)
        raise BalsamTransitionError(message)

    if not (error_handling or timeout_handling):
        job.update_state('POSTPROCESSED', f"{os.path.basename(postproc_app)}")


def handle_timeout(job):
    '''Decide what happens to a job whose run timed out.

    A job with post_timeout_handler set is passed to postprocess() in
    timeout-handling mode. Otherwise a job with auto_timeout_retry set is
    marked RESTART_READY, and any other job raises BalsamTransitionError.
    '''
    if job.post_timeout_handler:
        postprocess(job, timeout_handling=True)
        return
    if not job.auto_timeout_retry:
        raise BalsamTransitionError("No timeout handling: marking FAILED")
    job.update_state('RESTART_READY', 'timedout: auto retry')


def handle_run_error(job):
    '''Pass a failed run to the job's error handler, if it has one.

    A job with post_error_handler set is passed to postprocess() in
    error-handling mode; otherwise BalsamTransitionError is raised.
    '''
    if not job.post_error_handler:
        raise BalsamTransitionError("No error handler: run failed")
    postprocess(job, error_handling=True)
310 311


Michael Salim's avatar
Michael Salim committed
312 313 314 315 316 317 318
# Job state -> function that carries out the transition from that state.
# States that are not listed here have no transition.
TRANSITIONS = dict(
    CREATED=check_parents,
    LAUNCHER_QUEUED=check_parents,
    AWAITING_PARENTS=check_parents,
    READY=stage_in,
    STAGED_IN=preprocess,
    RUN_DONE=postprocess,
    RUN_TIMEOUT=handle_timeout,
    RUN_ERROR=handle_run_error,
    POSTPROCESSED=stage_out,
)