transitions.py 11.4 KB
Newer Older
Michael Salim's avatar
Michael Salim committed
1 2
'''BalsamJob pre and post execution'''
from collections import namedtuple
import glob
import logging
import multiprocessing
import os
import queue
import subprocess
import tempfile

from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from django import db

from common import transfer
from balsam.launcher.exceptions import *
from balsam.models import BalsamJob
Michael Salim's avatar
Michael Salim committed
15
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
Michael Salim's avatar
Michael Salim committed
16

Michael Salim's avatar
Michael Salim committed
17
# Timeout passed to proc.wait() when preprocess() runs the preprocess script.
PREPROCESS_TIMEOUT_SECONDS = 300
Michael Salim's avatar
Michael Salim committed
18
# Site identifier from the Django settings; main() stamps it onto
# job.work_site whenever a job's work_site differs.
SITE = settings.BALSAM_SITE
Michael Salim's avatar
Michael Salim committed
19 20

# Result of one transition, sent from a worker process back to the pool:
# the job's primary key, its resulting state and a short message.
# The typename must match the module-level variable name: pickle locates the
# class by that name, and these tuples travel through multiprocessing queues.
StatusMsg = namedtuple('StatusMsg', ['pk', 'state', 'msg'])

# Work item sent to a worker process: the job and the function to apply to it.
JobMsg = namedtuple('JobMsg', ['job', 'transition_function'])
Michael Salim's avatar
Michael Salim committed
22

Michael Salim's avatar
Michael Salim committed
23

Michael Salim's avatar
Michael Salim committed
24
def main(job_queue, status_queue):
    '''Worker loop: apply transition functions to jobs taken from job_queue.

    Each item read from job_queue is unpacked as (job, transition_function).
    The loop returns when the job slot equals the string 'end'.  For every
    other item one StatusMsg is put on status_queue: ('FAILED', <error text>)
    if the transition raised BalsamTransitionError, otherwise the job's
    current state with the message 'success'.
    '''
    db.connection.close()
    while True:
        job, transition_function = job_queue.get()
        if job == 'end':
            return

        # Record this site on the job if it is not already set to it
        if job.work_site != SITE:
            job.work_site = SITE
            job.save(update_fields=['work_site'])

        try:
            transition_function(job)
        except BalsamTransitionError as err:
            job.update_state('FAILED', str(err))
            status = StatusMsg(job.pk, 'FAILED', str(err))
        else:
            status = StatusMsg(job.pk, job.state, 'success')
        status_queue.put(status)

Michael Salim's avatar
Michael Salim committed
43

Michael Salim's avatar
Michael Salim committed
44
class TransitionProcessPool:
    '''Fixed-size group of worker processes that run job transitions.

    Jobs are handed to the workers through job_queue; each worker runs
    main() and reports a StatusMsg per job on status_queue.  The primary
    keys of jobs that were queued but not yet reported back are tracked in
    transitions_pk_list.
    '''

    # Number of worker processes to start, read from the Django settings
    NUM_PROC = settings.BALSAM_MAX_CONCURRENT_TRANSITIONS

    def __init__(self):
        '''Create the queues and start NUM_PROC worker processes.'''
        self.job_queue = multiprocessing.Queue()
        self.status_queue = multiprocessing.Queue()
        self.transitions_pk_list = []

        # NUM_PROC is a class attribute, so it must be reached through self
        self.procs = [
            multiprocessing.Process(target=main,
                                    args=(self.job_queue, self.status_queue))
            for _ in range(self.NUM_PROC)
        ]
        # Close all DB connections before the workers are started
        # NOTE(review): presumably so children do not share the parent's
        # open connections -- confirm
        db.connections.close_all()
        for proc in self.procs:
            proc.start()

    def __contains__(self, job):
        '''Return True if the job's pk is currently tracked as in transition.'''
        return job.pk in self.transitions_pk_list

    def add_job(self, job):
        '''Queue a job for the transition that matches its current state.

        Raises:
            BalsamTransitionError: the job is already in transition.
            TransitionNotFoundError: no transition exists for job.state.
        '''
        if job in self:
            raise BalsamTransitionError("already in transition")
        if job.state not in TRANSITIONS:
            raise TransitionNotFoundError
        transition_function = TRANSITIONS[job.state]
        self.job_queue.put(JobMsg(job, transition_function))
        self.transitions_pk_list.append(job.pk)

    def get_statuses(self):
        '''Yield every StatusMsg currently available on status_queue.

        Each reported pk is removed from transitions_pk_list before its
        status is yielded.
        '''
        while not self.status_queue.empty():
            try:
                stat = self.status_queue.get_nowait()
            except queue.Empty:
                # empty() is only a hint; the queue may drain in between
                break
            self.transitions_pk_list.remove(stat.pk)
            yield stat

    def flush_job_queue(self):
        '''Discard every message still waiting on job_queue.'''
        while not self.job_queue.empty():
            try:
                self.job_queue.get_nowait()
            except queue.Empty:
                break

    def end_and_wait(self):
        '''Send one 'end' message per worker and wait for all to exit.'''
        end_msg = JobMsg('end', None)
        for _ in self.procs:
            self.job_queue.put(end_msg)
        # multiprocessing.Process is waited on with join(); it has no wait()
        for proc in self.procs:
            proc.join()
        self.transitions_pk_list = []

96 97

def check_parents(job):
    '''Move the job to READY or AWAITING_PARENTS based on its parents.

    The job becomes READY when every parent is in state 'JOB_FINISHED'
    (including when there are no parents).  Otherwise it is marked
    AWAITING_PARENTS unless it is already in that state; the message
    reports len(parents), i.e. the total number of parents.
    '''
    parents = job.get_parents()
    unfinished = any(parent.state != 'JOB_FINISHED' for parent in parents)
    if not unfinished:
        job.update_state('READY', 'dependencies satisfied')
        return
    if job.state == 'AWAITING_PARENTS':
        # Already waiting: nothing to update
        return
    job.update_state('AWAITING_PARENTS', f'{len(parents)} pending jobs')
104 105 106


def stage_in(job):
    '''Prepare the job's working directory and bring in its input files.

    Creates the working directory if it does not exist, transfers the
    contents of job.stage_in_url (when set) into it, then symlinks every
    parent file matching one of the job.input_files patterns into the
    working directory.  Finally marks the job STAGED_IN.

    Raises:
        BalsamTransitionError: the transfer or a symlink creation failed.
    '''
    logger.debug('in stage_in')

    # Create workdirs for jobs: use job.create_working_path
    if not os.path.exists(job.working_directory):
        job.create_working_path()
    work_dir = job.working_directory

    # stage in all remote urls
    # TODO: stage_in remote transfer should allow a list of files and folders,
    # rather than copying just one entire folder
    url_in = job.stage_in_url
    if url_in:
        try:
            transfer.stage_in(f"{url_in}/", f"{work_dir}/")
        except Exception as e:
            message = 'Exception received during stage_in: ' + str(e)
            logger.error(message)
            raise BalsamTransitionError(message) from e

    # create unique symlinks to "input_files" patterns from parents
    # TODO: handle data flow from remote sites transparently
    matches = []
    parents = job.get_parents()
    input_patterns = job.input_files.split()
    for parent in parents:
        parent_dir = parent.working_directory
        for pattern in input_patterns:
            path = os.path.join(parent_dir, pattern)
            # One (parent pk, file path) pair per matched file
            matches.extend((parent.pk, found) for found in glob.glob(path))

    for parent_pk, inp_file in matches:
        basename = os.path.basename(inp_file)
        newpath = os.path.join(work_dir, basename)
        # Disambiguate name clashes by appending the parent's pk
        if os.path.exists(newpath):
            newpath += f"_{parent_pk}"
        try:
            # link named newpath, pointing to inp_file
            os.symlink(inp_file, newpath)
        except OSError as e:
            message = f"Failed to symlink {inp_file} to {newpath}: {e}"
            logger.error(message)
            raise BalsamTransitionError(message) from e
    job.update_state('STAGED_IN')
144 145 146


def stage_out(job):
    '''Copy matching files from the working directory to job.stage_out_url.

    Files in job.working_directory matching any of the job.stage_out_files
    patterns are hard-linked into a temporary staging directory, which is
    then transferred to job.stage_out_url.  The job is marked JOB_FINISHED
    afterwards, whether or not any file matched.

    Raises:
        BalsamTransitionError: linking or transferring the files failed.
    '''
    logger.debug('in stage_out')

    stage_out_patterns = job.stage_out_files.split()
    work_dir = job.working_directory
    matches = []
    for pattern in stage_out_patterns:
        matches.extend(glob.glob(os.path.join(work_dir, pattern)))

    if matches:
        # NOTE(review): unlike stage_in, there is no check that
        # job.stage_out_url is set before transferring -- confirm intent
        with tempfile.TemporaryDirectory() as stagingdir:
            try:
                for f in matches:
                    dst = os.path.join(stagingdir, os.path.basename(f))
                    os.link(f, dst)
                transfer.stage_out(f"{stagingdir}/", f"{job.stage_out_url}/")
            except Exception as e:
                message = 'Exception received during stage_out: ' + str(e)
                logger.error(message)
                # Carry the message so the FAILED status is not empty
                raise BalsamTransitionError(message) from e
    job.update_state('JOB_FINISHED')  # this completes the transitions
170 171 172 173


def preprocess(job):
    '''Run the job's preprocess script and mark the job PREPROCESSED.

    The script is job.preprocess or, if that is empty, the application's
    default_preprocess.  With no script at all the job is marked
    PREPROCESSED immediately.  Otherwise the script runs in the job's
    working directory with the environment from job.get_envs(); its output
    goes to a preprocess.log.pid<PID> file in that directory.

    Raises:
        BalsamTransitionError: the application or the script does not
            exist, the script could not be run or timed out, or it
            returned a nonzero exit code.
    '''
    logger.debug('in preprocess ')

    # Get preprocesser exe
    preproc_app = job.preprocess
    if not preproc_app:
        try:
            app = job.get_application()
        except ObjectDoesNotExist as e:
            message = f"application {job.application} does not exist"
            logger.error(message)
            raise BalsamTransitionError(message) from e
        preproc_app = app.default_preprocess
    if not preproc_app:
        job.update_state('PREPROCESSED', 'No preprocess: skipped')
        return
    if not os.path.exists(preproc_app):
        #TODO: look for preproc in the EXE directories
        message = f"Preprocessor {preproc_app} does not exist on filesystem"
        logger.error(message)
        raise BalsamTransitionError(message)

    # Create preprocess-specific environment
    envs = job.get_envs()

    # Run preprocesser with special environment in job working directory
    out = os.path.join(job.working_directory, f"preprocess.log.pid{os.getpid()}")
    with open(out, 'wb') as fp:
        # The file is binary, so the header has to be encoded.  Flush it so
        # it reaches the file before the child writes to the same descriptor.
        fp.write(f"# Balsam Preprocessor: {preproc_app}\n".encode())
        fp.flush()
        proc = None
        try:
            proc = subprocess.Popen(preproc_app, stdout=fp,
                                    stderr=subprocess.STDOUT, env=envs,
                                    cwd=job.working_directory)
            retcode = proc.wait(timeout=PREPROCESS_TIMEOUT_SECONDS)
        except Exception as e:
            message = f"Preprocess failed: {e}"
            logger.error(message)
            # proc stays None when Popen itself failed to start the script
            if proc is not None:
                proc.kill()
                proc.wait()
            raise BalsamTransitionError(message) from e
    if retcode != 0:
        message = f"Preprocess Popen nonzero returncode: {retcode}"
        logger.error(message)
        raise BalsamTransitionError(message)

    job.update_state('PREPROCESSED', f"{os.path.basename(preproc_app)}")


# Timeout passed to proc.wait() when postprocess() runs the postprocess
# script; same value as PREPROCESS_TIMEOUT_SECONDS.
POSTPROCESS_TIMEOUT_SECONDS = 300


def postprocess(job, *, error_handling=False, timeout_handling=False):
    '''Run the job's postprocess script, optionally as an error/timeout handler.

    The script is job.postprocess or, if that is empty, the application's
    default_postprocess.  It runs in the job's working directory with the
    environment from job.get_envs(timeout=..., error=...); its output goes
    to a postprocess.log.pid<PID> file in that directory.

    Without a script: in error-handling mode the transition fails, in
    timeout-handling mode the job is marked RESTART_READY, otherwise it is
    marked POSTPROCESSED.  After a handler run, the job is reloaded from
    the database and the transition fails if the state is still RUN_ERROR
    or RUN_TIMEOUT respectively.  In normal mode the job is marked
    POSTPROCESSED.

    Args:
        job: the job to postprocess.
        error_handling: True when invoked to handle a RUN_ERROR.
        timeout_handling: True when invoked to handle a RUN_TIMEOUT.

    Raises:
        ValueError: both error_handling and timeout_handling are set.
        BalsamTransitionError: the application or script does not exist,
            the script could not be run, timed out or returned nonzero, or
            a handler run left the job state unchanged.
    '''
    logger.debug('in postprocess ')
    if error_handling and timeout_handling:
        raise ValueError("Both error-handling and timeout-handling is invalid")
    if error_handling: logger.debug('Handling RUN_ERROR')
    if timeout_handling: logger.debug('Handling RUN_TIMEOUT')

    # Get postprocesser exe
    postproc_app = job.postprocess
    if not postproc_app:
        try:
            app = job.get_application()
        except ObjectDoesNotExist as e:
            message = f"application {job.application} does not exist"
            logger.error(message)
            raise BalsamTransitionError(message) from e
        postproc_app = app.default_postprocess

    # If no postprocesssor; move on (unless in error_handling mode)
    if not postproc_app:
        if error_handling:
            message = "Trying to handle error, but no postprocessor found"
            logger.warning(message)
            raise BalsamTransitionError(message)
        elif timeout_handling:
            logger.warning('Unhandled job timeout: marking RESTART_READY')
            job.update_state('RESTART_READY', 'marking for re-run')
            return
        else:
            job.update_state('POSTPROCESSED', 'No postprocess: skipped')
            return

    if not os.path.exists(postproc_app):
        #TODO: look for postproc in the EXE directories
        message = f"Postprocessor {postproc_app} does not exist on filesystem"
        logger.error(message)
        raise BalsamTransitionError(message)

    # Create postprocess-specific environment
    envs = job.get_envs(timeout=timeout_handling, error=error_handling)

    # Run postprocesser with special environment in job working directory
    out = os.path.join(job.working_directory, f"postprocess.log.pid{os.getpid()}")
    with open(out, 'wb') as fp:
        header = f"# Balsam Postprocessor: {postproc_app}\n"
        if timeout_handling: header += "# Invoked to handle RUN_TIMEOUT\n"
        if error_handling: header += "# Invoked to handle RUN_ERROR\n"
        # The file is binary, so the header has to be encoded.  Flush it so
        # it reaches the file before the child writes to the same descriptor.
        fp.write(header.encode())
        fp.flush()

        proc = None
        try:
            proc = subprocess.Popen(postproc_app, stdout=fp,
                                    stderr=subprocess.STDOUT, env=envs,
                                    cwd=job.working_directory)
            retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
        except Exception as e:
            message = f"Postprocess failed: {e}"
            logger.error(message)
            # proc stays None when Popen itself failed to start the script
            if proc is not None:
                proc.kill()
                proc.wait()
            raise BalsamTransitionError(message) from e
    if retcode != 0:
        message = f"Postprocess Popen nonzero returncode: {retcode}"
        logger.error(message)
        raise BalsamTransitionError(message)

    # If postprocessor handled error or timeout, it should have changed job's
    # state. If it failed to do this, mark FAILED.  Otherwise, POSTPROCESSED.
    job.refresh_from_db()
    if error_handling and job.state == 'RUN_ERROR':
        message = "Error handling failed to change job state: marking FAILED"
        logger.warning(message)
        raise BalsamTransitionError(message)

    if timeout_handling and job.state == 'RUN_TIMEOUT':
        message = "Timeout handling failed to change job state: marking FAILED"
        logger.warning(message)
        raise BalsamTransitionError(message)

    if not (error_handling or timeout_handling):
        job.update_state('POSTPROCESSED', f"{os.path.basename(postproc_app)}")


def handle_timeout(job):
    '''Handle a timed-out job.

    If job.post_timeout_handler is set, postprocess() is run in
    timeout-handling mode.  Otherwise, if job.auto_timeout_retry is set, the
    job is marked RESTART_READY.  With neither, BalsamTransitionError is
    raised.
    '''
    if job.post_timeout_handler:
        postprocess(job, timeout_handling=True)
        return
    if not job.auto_timeout_retry:
        raise BalsamTransitionError("No timeout handling: marking FAILED")
    job.update_state('RESTART_READY', 'timedout: auto retry')


def handle_run_error(job):
    '''Handle a job whose run ended in error.

    If job.post_error_handler is set, postprocess() is run in error-handling
    mode; otherwise BalsamTransitionError is raised.
    '''
    if not job.post_error_handler:
        raise BalsamTransitionError("No error handler: run failed")
    postprocess(job, error_handling=True)
313 314


Michael Salim's avatar
Michael Salim committed
315 316 317 318 319 320 321
# Maps a job state to the function that TransitionProcessPool.add_job
# queues for a job in that state.
TRANSITIONS = {
    # dependency check
    'CREATED': check_parents,
    'LAUNCHER_QUEUED': check_parents,
    'AWAITING_PARENTS': check_parents,
    # steps before the run
    'READY': stage_in,
    'STAGED_IN': preprocess,
    # steps after the run, including timeout and error handling
    'RUN_DONE': postprocess,
    'RUN_TIMEOUT': handle_timeout,
    'RUN_ERROR': handle_run_error,
    'POSTPROCESSED': stage_out,
}