mpi_ensemble.py 3.37 KB
Newer Older
1 2 3
from collections import namedtuple
import os
import sys
4 5
import logging
import django
Michael Salim's avatar
Michael Salim committed
6 7
import signal

8
# Point Django at the Balsam settings module and initialise it here, ahead of
# the balsam imports further down the file (presumably required so that the
# BalsamJob model can be imported -- confirm against balsam.service.models).
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
django.setup()

# Module-level logger shared by every function in this file.
logger = logging.getLogger('balsam.launcher.mpi_ensemble')
11

Michael Salim's avatar
Michael Salim committed
12
from subprocess import Popen, STDOUT
13 14

from mpi4py import MPI
15

16 17 18
from balsam.launcher.util import cd, get_tail
from balsam.launcher.exceptions import *
from balsam.service.models import BalsamJob
19 20 21

# World communicator; main() broadcasts the job list over it.
COMM = MPI.COMM_WORLD

# Rank of this process within COMM; rank 0 reads the jobs file in main().
RANK = COMM.Get_rank()

# Becomes True the first time on_exit() runs so that repeated signals do not
# re-enter the shutdown sequence.
HANDLE_EXIT = False

def on_exit(job):
    """Shut this rank down after an interrupt, stopping *job* first.

    Only the first call does anything; later calls return immediately
    because HANDLE_EXIT has already been set.

    Args:
        job: the running child process (run() passes its ``Popen`` object),
            or None when no child has been started. It must provide
            ``terminate()``, ``wait(timeout=...)`` and ``kill()``.

    Raises:
        SystemExit: always, with exit status 0, on the first call.
    """
    global HANDLE_EXIT
    if HANDLE_EXIT:
        return
    HANDLE_EXIT = True

    logger.debug(f"mpi_ensemble.py rank {RANK} received interrupt: quitting now")
    if job is not None:
        job.terminate()
        try:
            # Give the child ten seconds to exit on its own.
            job.wait(timeout=10)
        except Exception:
            # Best effort: whatever went wrong while waiting (normally the
            # timeout expiring), fall back to a hard kill. Catching Exception
            # rather than everything leaves SystemExit/KeyboardInterrupt alone.
            job.kill()
    # Flush explicitly, as status_msg does, so the marker is written out
    # before MPI teardown starts.
    print("TIMEOUT", flush=True)
    MPI.Finalize()
    sys.exit(0)

38
# Default interrupt handling: no child process exists yet, so on_exit is
# called with no job. run() re-registers handlers bound to its own process.
handler = lambda a,b: on_exit(None)
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, handler)

42 43 44 45 46 47 48 49 50 51 52
# One runnable entry from the jobs file: job id, working directory, and the
# command as a list of arguments (see read_jobs).
Job = namedtuple('Job', 'id workdir cmd')


def status_msg(id, state, msg=''):
    """Write one ``<id> <state> <msg>`` status line to stdout and flush it.

    Args:
        id: job identifier placed first on the line.
        state: state name placed second (e.g. "RUNNING", "RUN_DONE").
        msg: optional free-form text; when omitted the line still ends with
            the separating space.
    """
    line = f"{id} {state} {msg}"
    print(line, flush=True)


def read_jobs(fp):
    """Yield a Job for every usable line of *fp*.

    Each line is split on whitespace into ``<id> <workdir> <command...>``.
    A line is skipped (with a debug log entry) when it has fewer than two
    fields, when it carries no command, or when its workdir is not an
    existing directory.

    Args:
        fp: iterable of lines, e.g. an open jobs file.

    Yields:
        Job: ``Job(id, workdir, cmd)`` where ``cmd`` is the list of the
        remaining whitespace-separated tokens.
    """
    for line in fp:
        try:
            job_id, workdir, *command = line.split()
        except ValueError:
            # Unpacking fails only when the line has fewer than two fields,
            # e.g. a blank line.
            logger.debug("Invalid jobline")
            continue
        logger.debug(f"Read Job {job_id}  CMD: {command}  DIR: {workdir}")

        if not (job_id and command):
            # An id and workdir were given but nothing to execute.
            logger.debug("Invalid jobline")
            continue
        if not os.path.isdir(workdir):
            logger.debug("Invalid workdir")
            continue
        yield Job(job_id, workdir, command)
61 62 63


def run(job):
    """Run one job as a child process and report its outcome via status_msg.

    The job's database record supplies the output file name
    (``<name>.out``, created inside ``job.workdir``) and the environment for
    the child. A "RUNNING" status is emitted before launch, followed by
    "RUN_DONE" on a zero return code, "RUN_ERROR" (with the tail of the
    output file) on a nonzero one, or "FAILED" if launching or waiting
    raised.

    Args:
        job: a ``Job`` tuple; ``id``, ``workdir`` and ``cmd`` are used.

    Raises:
        MPIEnsembleError: chained from any Exception raised while preparing,
            launching or waiting on the child process.
    """
    job_from_db = BalsamJob.objects.get(pk=job.id)
    basename = job_from_db.name
    outname = f"{basename}.out"
    logger.debug(f"mpi_ensemble rank {RANK}: starting job {job.id}")
    with cd(job.workdir) as _, open(outname, 'wb') as outf:
        # Bound up front so the finally clause can tell whether a child was
        # ever created; otherwise a failure in get_envs() or Popen() would
        # surface as UnboundLocalError and hide the real error.
        proc = None
        try:
            status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")

            env = job_from_db.get_envs() # TODO: Should we include this?
            proc = Popen(job.cmd, stdout=outf, stderr=STDOUT,
                         cwd=job.workdir,env=env)

            # Re-register the interrupt handlers so that a signal received
            # while this child runs also terminates the child.
            handler = lambda a,b: on_exit(proc)
            signal.signal(signal.SIGINT, handler)
            signal.signal(signal.SIGTERM, handler)

            retcode = proc.wait()
        except Exception as e:
            logger.exception(f"mpi_ensemble rank {RANK} job {job.id}: exception during Popen")
            status_msg(job.id, "FAILED", msg=str(e))
            raise MPIEnsembleError from e
        else:
            if retcode == 0:
                logger.debug(f"mpi_ensemble rank {RANK}: job returned 0")
                status_msg(job.id, "RUN_DONE")
            else:
                outf.flush()
                # Keep the status message on a single line by escaping the
                # newlines in the output tail.
                tail = get_tail(outf.name).replace('\n', '\\n')
                msg = f"NONZERO RETURN {retcode}: {tail}"
                status_msg(job.id, "RUN_ERROR", msg=msg)
                logger.debug(f"mpi_ensemble rank {RANK} job {job.id} {msg}")
        finally:
            # Make sure no child outlives this call, whichever path was taken.
            if proc is not None:
                proc.kill()
98 99 100 101 102 103


def main(jobs_path):
    """Distribute the jobs listed in *jobs_path* across all ranks and run them.

    Rank 0 parses the file with read_jobs; the resulting list is broadcast
    to every rank, and each rank then runs every COMM.size-th job starting
    at its own rank index.

    Args:
        jobs_path: path of the jobs file; only opened on rank 0.
    """
    is_master = RANK == 0

    parsed = None
    if is_master:
        logger.debug(f"Master rank of mpi_ensemble.py: reading jobs from {jobs_path}")
        with open(jobs_path) as jobs_file:
            parsed = list(read_jobs(jobs_file))

    # Every rank takes part in the broadcast; non-root ranks pass None and
    # use the list returned by bcast.
    job_list = COMM.bcast(parsed, root=0)
    if is_master:
        logger.debug(f"Broadcasted job list. Total {len(job_list)} jobs to run")

    my_share = job_list[RANK::COMM.size]
    for assigned in my_share:
        run(assigned)
112 113 114 115

if __name__ == "__main__":
    # The jobs file path is the first (required) command-line argument.
    main(sys.argv[1])