launcher.py 8.05 KB
Newer Older
1 2 3
'''The Launcher is invoked either directly by the user, who bypasses the Balsam
scheduling service and submits straight to a local job queue, or by the Balsam
service metascheduler.'''
Michael Salim's avatar
Michael Salim committed
4
import argparse
5
from math import floor
6
import os
Michael Salim's avatar
Michael Salim committed
7 8
from sys import exit
import signal
9 10
import time

11
import django
12
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
13
django.setup()
14 15
from django.conf import settings

16
import logging
17
logger = logging.getLogger('balsam.launcher')
18 19
logger.info("Loading Balsam Launcher")

20
from balsam.service.schedulers import Scheduler
21 22
scheduler = Scheduler.scheduler_main

23 24 25 26 27
from balsam.launcher import jobreader
from balsam.launcher import transitions
from balsam.launcher import worker
from balsam.launcher import runners
from balsam.launcher.exceptions import *
28

29
# Job states one step short of runnable. While any job is in one of these,
# main() may hold off on starting a new runner so the job can join it.
ALMOST_RUNNABLE_STATES = [
    'READY',
    'STAGED_IN',
]

# Job states a runner may launch (subject to fitting in the remaining time).
RUNNABLE_STATES = [
    'PREPROCESSED',
    'RESTART_READY',
]

# Job states whose parent dependencies are re-checked on every service-loop pass.
WAITING_STATES = [
    'CREATED',
    'LAUNCHER_QUEUED',
    'AWAITING_PARENTS',
]

# Set by on_exit() on its first call so that repeated calls (signal handler
# plus normal shutdown) return immediately instead of cleaning up twice.
HANDLING_EXIT = False


def delay_generator(period=settings.BALSAM_SERVICE_PERIOD):
    '''Pace a loop at a fixed rate of one step per ``period`` seconds.

    Each ``next()`` blocks until the current deadline and then moves the
    deadline forward by one period, so the schedule does not drift. If the
    caller has already overrun the deadline, ``next()`` returns without
    sleeping and the schedule restarts one period from now.

    Args:
        period: seconds between steps (compared against ``time.time()``).

    Yields:
        None, once per step.
    '''
    deadline = time.time() + period
    while True:
        current = time.time()
        wait = deadline - current
        if wait > 0:
            time.sleep(wait)
            deadline = current + wait + period
        else:
            # Behind schedule: skip the sleep and re-anchor on "now".
            deadline = current + period
        yield


def elapsed_time_minutes():
    '''Yield the minutes elapsed since this generator was first advanced.

    The reference time is taken on the first ``next()`` call, not when the
    generator object is created.
    '''
    t0 = time.time()
    while True:
        elapsed_seconds = time.time() - t0
        yield elapsed_seconds / 60.0

def remaining_time_minutes(time_limit_minutes=0.0):
    '''Yield the minutes left in the launcher's time budget.

    If ``time_limit_minutes`` is positive, the budget is that many minutes,
    counted from the first ``next()`` call. Otherwise each step asks
    ``scheduler.remaining_time_seconds()``. The generator stops as soon as
    the remaining time is not positive.

    Args:
        time_limit_minutes: explicit wall-time budget in minutes; 0 (the
            default) or less means "ask the scheduler".

    Yields:
        float minutes remaining, always > 0.
    '''
    fixed_budget = time_limit_minutes > 0.0
    if fixed_budget:
        t0 = time.time()
    while True:
        if fixed_budget:
            left = time_limit_minutes - (time.time() - t0) / 60.0
        else:
            left = scheduler.remaining_time_seconds() / 60.0
        if left > 0:
            yield left
        else:
            return

def check_parents(job, lock):
    '''Re-evaluate a waiting job's dependencies and update its state.

    The job is refreshed from the database first.

    - If every parent is in JOB_FINISHED (trivially true with no parents),
      the job moves to READY.
    - Otherwise, if the job is not already AWAITING_PARENTS, it moves there.

    Args:
        job: job object providing ``refresh_from_db()``, ``get_parents()``,
            ``update_state(state, message)``, ``state`` and ``cute_id``.
        lock: object with ``acquire()``/``release()`` that is held around each
            state update (main() passes ``transition_pool.lock``).

    Raises:
        Whatever ``job.update_state`` raises. The lock is released first.
    '''
    job.refresh_from_db()
    parents = job.get_parents()
    ready = all(p.state == 'JOB_FINISHED' for p in parents)
    if ready:
        lock.acquire()
        try:
            job.update_state('READY', 'dependencies satisfied')
        finally:
            # Release even if the update fails; otherwise the lock stays held
            # and every later acquire() on it blocks.
            lock.release()
        logger.info(f'{job.cute_id} ready')
    elif job.state != 'AWAITING_PARENTS':
        lock.acquire()
        try:
            job.update_state('AWAITING_PARENTS', f'{len(parents)} parents')
        finally:
            lock.release()
        logger.info(f'{job.cute_id} waiting for parents')

def log_time(minutes_left):
    if minutes_left > 1e12:
        return
    whole_minutes = floor(minutes_left)
    whole_seconds = round((minutes_left - whole_minutes)*60)
    time_str = f"{whole_minutes:02d} min : {whole_seconds:02d} sec remaining"
    logger.info(time_str)
Michael Salim's avatar
Michael Salim committed
83

Michael Salim's avatar
Michael Salim committed
84
def main(args, transition_pool, runner_group, job_source, worker_group=None):
    '''Run the launcher service loop until the time budget runs out.

    Each pass of the loop:
      1. drains finished transitions from ``transition_pool``;
      2. re-checks jobs in WAITING_STATES and promotes those whose parents
         have all finished (see ``check_parents``);
      3. queues a transition for every job whose state has an entry in
         ``transitions.TRANSITIONS`` and is not already in the pool;
      4. updates/removes finished runs in ``runner_group``;
      5. may start a new runner for the runnable jobs that fit in the
         remaining time.

    The loop sleeps (via ``delay_generator``) only when a pass did nothing.

    Args:
        args: parsed command-line namespace. ``args.time_limit_minutes`` sets
            the time budget (0 means ask the scheduler).
        transition_pool: pool running state transitions. Must support
            ``get_statuses()``, ``add_job()``, ``in`` and expose ``lock``.
        runner_group: group of active runners.
        job_source: job reader whose ``jobs`` are refreshed each pass.
        worker_group: indexable group of workers passed to
            ``runner_group.create_next_runner``. Defaults to the module-level
            ``worker_group`` created by the script entry point, which this
            function previously relied on implicitly.

    Raises:
        RuntimeError: if no ``worker_group`` is passed and none exists at
            module level.
    '''
    if worker_group is None:
        worker_group = globals().get('worker_group')
    if worker_group is None:
        raise RuntimeError(
            "main() needs a worker_group: pass one explicitly or create the "
            "module-level worker_group before calling main()")

    delay_sleeper = delay_generator()
    runner_create_period = settings.BALSAM_RUNNER_CREATION_PERIOD_SEC
    last_runner_created = time.time()
    remaining_timer = remaining_time_minutes(args.time_limit_minutes)

    for remaining_minutes in remaining_timer:

        logger.info("\n******************\n"
                    "BEGIN SERVICE LOOP\n"
                    "******************")
        log_time(remaining_minutes)
        delay = True

        # Update after any finished transitions. Iterate over every status
        # rather than short-circuiting, in case get_statuses() does work per
        # item.
        for stat in transition_pool.get_statuses():
            delay = False
        job_source.refresh_from_db()

        # Update jobs awaiting dependencies
        waiting_jobs = (j for j in job_source.jobs if j.state in WAITING_STATES)
        for job in waiting_jobs:
            check_parents(job, transition_pool.lock)

        # Enqueue new transitions
        transitionable_jobs = [
            job for job in job_source.jobs
            if job not in transition_pool
            and job.state in transitions.TRANSITIONS
        ]

        for job in transitionable_jobs:
            transition_pool.add_job(job)
            delay = False
            fxn = transitions.TRANSITIONS[job.state]
            logger.info(f"Queued transition: {job.cute_id} will undergo {fxn}")

        # Update jobs that are running/finished
        any_finished = runner_group.update_and_remove_finished()
        if any_finished:
            delay = False
        job_source.refresh_from_db()

        # Decide whether or not to start a new runner
        runnable_jobs = [
            job for job in job_source.jobs
            if job.pk not in runner_group.running_job_pks and
            job.state in RUNNABLE_STATES and
            job.wall_time_minutes <= remaining_minutes
        ]

        logger.debug(f"Have {len(runnable_jobs)} runnable jobs")
        almost_runnable = any(j.state in ALMOST_RUNNABLE_STATES for j in job_source.jobs)
        now = time.time()
        runner_ready = now - last_runner_created > runner_create_period
        num_serial = sum(1 for j in runnable_jobs if j.num_ranks == 1)
        # Named first_worker (not `worker`) so the imported
        # balsam.launcher.worker module is not shadowed inside this function.
        first_worker = worker_group[0]
        max_serial_per_ensemble = (2 * first_worker.num_nodes
                                   * first_worker.max_ranks_per_node)
        # Ready when there are no serial jobs to batch, or enough to fill
        # an ensemble.
        ensemble_ready = (num_serial >= max_serial_per_ensemble) or (num_serial == 0)

        # Start a runner when the creation period has elapsed, when nothing
        # is about to become runnable (waiting gains nothing), or when the
        # ensemble is already full enough.
        if runnable_jobs:
            if runner_ready or not almost_runnable or ensemble_ready:
                try:
                    runner_group.create_next_runner(runnable_jobs, worker_group)
                except ExceededMaxRunners:
                    logger.info("Exceeded max concurrent runners; waiting")
                except NoAvailableWorkers:
                    logger.info("Not enough idle workers to start any new runs")
                else:
                    last_runner_created = now

        if delay:
            next(delay_sleeper)


def on_exit(runner_group, transition_pool, job_source):
    '''Clean up the runner group and transition pool, then exit with status 0.

    Only the first call does any work: it sets HANDLING_EXIT, so later calls
    (e.g. a signal handler plus the normal end-of-script call) return None
    immediately.

    Args:
        runner_group: its ``update_and_remove_finished(timeout=True)`` is
            called.
        transition_pool: its ``end_and_wait()`` is always called, even if the
            runner cleanup raises.
        job_source: unused; kept for signature compatibility with callers.

    Raises:
        SystemExit: with status 0 once cleanup completes.
        Any exception from runner cleanup propagates after the transition
            pool has been ended.
    '''
    global HANDLING_EXIT
    if HANDLING_EXIT:
        return
    HANDLING_EXIT = True

    logger.debug("Entering on_exit cleanup function")
    try:
        logger.debug("on_exit: update/remove/timeout jobs from runner group")
        runner_group.update_and_remove_finished(timeout=True)
    finally:
        # Always tell the transition workers to end, even if runner cleanup
        # failed, so they are not left behind.
        logger.debug("on_exit: send end message to transition threads")
        transition_pool.end_and_wait()
    logger.debug("on_exit: Launcher exit graceful\n\n")
    exit(0)


def get_args(inputcmd=None):
    '''Parse the launcher's command-line options.

    Exactly one of --job-file, --consume-all or --wf-name is required.

    Args:
        inputcmd: list of argument strings to parse. When None (the default),
            ``sys.argv[1:]`` is parsed. An explicit empty list is parsed as
            "no arguments" rather than silently falling back to sys.argv.

    Returns:
        argparse.Namespace with job_file, consume_all, wf_name, num_workers,
        nodes_per_worker, max_ranks_per_node, time_limit_minutes and daemon.

    Raises:
        SystemExit: on invalid or missing arguments (argparse behavior).
    '''
    parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--job-file', help="File of Balsam job IDs")
    group.add_argument('--consume-all', action='store_true',
                       help="Continuously run all jobs from DB")
    group.add_argument('--wf-name',
                       help="Continuously run jobs of specified workflow")
    parser.add_argument('--num-workers', type=int, default=1,
                        help="Theta: defaults to # nodes. BGQ: the # of subblocks")
    parser.add_argument('--nodes-per-worker', type=int, default=1)
    parser.add_argument('--max-ranks-per-node', type=int, default=1,
                        help="For non-MPI jobs, how many to pack per worker")
    # Float default so the attribute has the same type whether or not the
    # flag is given (argparse does not apply `type` to non-string defaults).
    parser.add_argument('--time-limit-minutes', type=float, default=0.0,
                        help="Provide a walltime limit if not already imposed")
    parser.add_argument('--daemon', action='store_true')
    # parse_args(None) already reads sys.argv[1:], so pass inputcmd through
    # unchanged instead of testing its truthiness.
    return parser.parse_args(inputcmd)


def detect_dead_runners(job_source):
    '''Move every job the source reports as RUNNING to RESTART_READY.

    The script entry point calls this once at start-up, before main() creates
    any runner. A RUNNING job at that point is therefore presumed to be left
    over from a launcher that died, and is marked for restart.

    Args:
        job_source: job reader exposing a ``by_states`` mapping keyed by state.
    '''
    stale_jobs = job_source.by_states['RUNNING']
    for stale in stale_jobs:
        logger.info(f'Picked up dead running job {stale.cute_id}: marking RESTART_READY')
        stale.update_state('RESTART_READY', 'Detected dead runner')


if __name__ == "__main__":
    args = get_args()

    job_source = jobreader.JobReader.from_config(args)
    job_source.refresh_from_db()
    transition_pool = transitions.TransitionProcessPool()
    runner_group = runners.RunnerGroup(transition_pool.lock)
    # main() is not passed worker_group below; it uses this module-level
    # name, so keep it defined at module scope.
    worker_group = worker.WorkerGroup(args, host_type=scheduler.host_type,
                                      workers_str=scheduler.workers_str,
                                      workers_file=scheduler.workers_file)

    # Recover jobs stranded in RUNNING by a previous launcher before starting.
    detect_dead_runners(job_source)

    def _handle_exit_signal(signum, frame):
        '''Signal handler: run the one-shot cleanup in on_exit().'''
        on_exit(runner_group, transition_pool, job_source)

    for _sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(_sig, _handle_exit_signal)

    try:
        main(args, transition_pool, runner_group, job_source)
    except Exception:
        # Clean up even when the service loop crashes, then re-raise so the
        # traceback is shown and the process exits non-zero. on_exit() itself
        # always ends with exit(0), which would otherwise mask the failure.
        logger.exception("Launcher main loop raised; running cleanup")
        try:
            on_exit(runner_group, transition_pool, job_source)
        except SystemExit:
            pass
        raise
    on_exit(runner_group, transition_pool, job_source)