Commit 185c199a authored by Michael Salim

launcher

parent 3cedc3d2
 import balsam.models
 from balsam.models import BalsamJob

-class JobReader(dict):
+class JobReader():
     '''Interface with BalsamJob DB & pull relevant jobs'''

     @staticmethod
     def from_config(config):
@@ -13,36 +13,28 @@ class JobReader(dict):
     def by_states(self):
         '''dict of jobs keyed by state'''
         result = defaultdict(list)
-        for job in self.values():
+        for job in self.jobs:
             result[job.state].append(job)
         return result

-    @property
-    def jobs(self): return self.values()
-
-    @property
-    def pks(self): return self.keys()

     def refresh_from_db(self):
         '''caller invokes this to read from DB'''
         jobs = self._get_jobs()
         jobs = self._filter(jobs)
-        job_dict = {job.pk : job for job in jobs}
-        self.update(job_dict)
+        self.jobs = jobs

     def _get_jobs(self): raise NotImplementedError

     def _filter(self, job_queryset):
         jobs = job_queryset.exclude(state__in=balsam.models.END_STATES)
         jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
-        jobs = jobs.exclude(job_id__in=self.keys())
-        return [j for j in jobs if j.idle()]
+        return jobs

 class FileJobReader(JobReader):
-    '''Limit to job PKs specified in a file'''
+    '''Limit to job PKs specified in a file. Used by metascheduler.'''
     def __init__(self, job_file):
         super().__init__()
+        self.jobs = []
         self.job_file = job_file
         self.pk_list = None
@@ -55,13 +47,15 @@ class FileJobReader(JobReader):

 class WFJobReader(JobReader):
-    '''Consume all jobs from DB, optionally matching a Workflow name'''
+    '''Consume all jobs from DB, optionally matching a Workflow name.
+    Will not consume jobs scheduled by metascheduler'''
     def __init__(self, wf_name):
         super().__init__()
+        self.jobs = []
         self.wf_name = wf_name

     def _get_jobs(self):
         objects = BalsamJob.objects
         wf = self.wf_name
         jobs = objects.filter(workflow=wf) if wf else objects.all()
+        jobs = jobs.filter(scheduler_id__exact='')
         return jobs
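For orientation, the reworked reader API can be exercised roughly as follows. This is a minimal sketch, not committed code: it assumes Django is already configured for this project, that `by_states` is exposed as a property (only its `def` line is visible above), and the workflow name is illustrative.

    # Hypothetical usage of the reworked JobReader shown in this diff;
    # readers now hold a plain list in .jobs instead of acting as a dict.
    from balsam.launcher import jobreader

    job_source = jobreader.WFJobReader(wf_name='my_workflow')
    job_source.refresh_from_db()     # re-query the BalsamJob table into .jobs
    for state, jobs in job_source.by_states.items():
        print(state, [job.pk for job in jobs])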
@@ -6,8 +6,11 @@ from collections import defaultdict
 import os
 import multiprocessing
 import queue
+from sys import exit
 import signal
 import time

+import django
+from django.conf import settings
 from django.db import transaction
@@ -18,14 +21,9 @@ from balsam.launcher import worker
 from balsam.launcher.exceptions import *

 START_TIME = time.time() + 10.0
 RUNNABLE_STATES = ['PREPROCESSED', 'RESTART_READY']
-SIGTIMEOUT = 'TIMEOUT'
-SIGNALS = {
-    signal.SIGINT: 'SIG_INT',
-    signal.SIGTERM: 'SIG_TERM',
-}

-def delay(period=10.0):
+def delay(period=settings.BALSAM_SERVICE_PERIOD):
     nexttime = time.time() + period
     while True:
         now = time.time()
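Only the first lines of `delay` survive the fold above. A generator-based rate limiter of this shape might look like the following; everything past `now = time.time()` is an assumption for illustration, not the committed body.

    import time

    def delay(period=10.0):
        '''Generator: each next() returns once `period` seconds have passed
        since the previous tick, sleeping only the remaining time.'''
        nexttime = time.time() + period
        while True:
            now = time.time()
            tosleep = nexttime - now
            if tosleep > 0:
                time.sleep(tosleep)
            nexttime += period
            yield

    timer = delay(2.0)
    next(timer)   # returns ~2 s after the generator was created
    next(timer)   # ~2 s after the previous tick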
@@ -53,9 +51,6 @@ class HostEnvironment:
         self.partition = None
         self.walltime_seconds = None

-        self.job_file = args.job_file
-        self.wf_name = args.consume_wf
-        self.consume_all = args.consume_all
         self.num_workers = args.num_workers
         self.ranks_per_worker_serial = args.serial_jobs_per_worker
         self.walltime_minutes = args.time_limit_minutes
@@ -107,7 +102,7 @@ class HostEnvironment:
         return False

 def get_runnable_jobs(jobs, running_pks, host_env):
-    runnable_jobs = [job for job in jobsource.jobs
+    runnable_jobs = [job for job in jobs
                      if job.pk not in running_pks and
                      job.state in RUNNABLE_STATES and
                      host_env.sufficient_time(job)]
@@ -125,13 +120,9 @@ def create_new_runners(jobs, runner_group, worker_group, host_env):
     running_pks = runner_group.running_job_pks
     runnable_jobs = get_runnable_jobs(jobs, running_pks, host_env)

-def main(args):
+def main(args, transition_pool, runner_group, job_source):
     host_env = HostEnvironment(args)
     worker_group = worker.WorkerGroup(host_env)
-    jobsource = jobreader.JobReader.from_config(args)
-    transition_pool = transitions.TransitionProcessPool()
-    runner_group = runners.RunnerGroup()
     delay_timer = delay()

     # Main Launcher Service Loop
@@ -141,39 +132,46 @@ def main(args):
             logger.debug(f'Transition: {stat.pk} {stat.state}: {stat.msg}')
             wait = False

-        jobsource.refresh_from_db()
+        job_source.refresh_from_db()

         transitionable_jobs = [
-            job for job in jobsource.jobs
+            job for job in job_source.jobs
             if job not in transitions_pool
             and job.state in transitions_pool.TRANSITIONS
         ]

         for job in transitionable_jobs:
-            transitions_pool.add_job(job)
+            transition_pool.add_job(job)
             wait = False

-        runner_group.update_and_remove_finished()
-        any_finished = create_new_runners(
-            jobsource.jobs, runner_group, worker_group, host_env
-        )
+        any_finished = runner_group.update_and_remove_finished()
+        create_new_runners(job_source.jobs, runner_group, worker_group, host_env)
         if any_finished: wait = False
         if wait: next(delay_timer)

-    transitions_pool.stop_processes()

 def on_exit(runner_group, transition_pool, job_source):
     transition_pool.flush_job_queue()
     runner_group.update_and_remove_finished()
     for runner in runner_group:
-        runner.timeout(SIGTIMEOUT, None)
+        runner.timeout()

     job_source.refresh_from_db()
     timedout_jobs = job_source.by_states['RUN_TIMEOUT']
     for job in timedout_jobs:
         transition_pool.add_job(job)

-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.")
+    transition_pool.end_and_wait()
+    exit(0)

+def get_args():
+    parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.")
     group = parser.add_mutually_exclusive_group(required=True)
     group.add_argument('--job-file', help="File of Balsam job IDs")
     group.add_argument('--consume-all', action='store_true',
                        help="Continuously run all jobs from DB")
     group.add_argument('--wf-name',
                        help="Continuously run jobs of specified workflow")
     parser.add_argument('--num-workers', type=int, default=1,
                         help="Theta: defaults to # nodes. BGQ: the # of subblocks")
     parser.add_argument('--serial-jobs-per-worker', type=int, default=4,
@@ -181,7 +179,22 @@ if __name__ == "__main__":
     parser.add_argument('--time-limit-minutes', type=int,
                         help="Override auto-detected walltime limit (runs forever if no limit is detected or specified)")

-    args = parser.parse_args()
-    # TODO: intercept KeyboardInterrupt and all INT,TERM signals
-    # Cleanup actions; mark jobs as idle
-    main(args)
+    return parser.parse_args()

+if __name__ == "__main__":
+    os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
+    django.setup()
+
+    args = get_args()
+    job_source = jobreader.JobReader.from_config(args)
+    runner_group = runners.RunnerGroup()
+    transition_pool = transitions.TransitionProcessPool()
+
+    handl = lambda a,b: on_exit(runner_group, transition_pool, job_source)
+    signal.signal(signal.SIGINT, handl)
+    signal.signal(signal.SIGTERM, handl)
+    signal.signal(signal.SIGHUP, handl)
+
+    main(args, transition_pool, runner_group, job_source)
+    on_exit(runner_group, transition_pool, job_source)
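The shutdown wiring above replaces the per-Runner signal handlers (removed from runners.py below) with one closure over the long-lived objects. The same pattern in a self-contained form; this is a sketch (POSIX-only because of SIGHUP; all names here are illustrative, not from the diff):

    import signal
    import sys
    import time

    def on_exit(in_flight):
        # Graceful cleanup: flush queues, mark jobs, then exit.
        print(f"cleaning up {len(in_flight)} jobs")
        sys.exit(0)

    in_flight = ['job-1', 'job-2']
    handl = lambda signum, frame: on_exit(in_flight)
    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(sig, handl)

    while True:
        time.sleep(1)   # service loop: Ctrl-C or kill now routes through on_exit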
@@ -38,7 +38,7 @@ def run(job):
         proc = Popen(job.cmd, stdout=outf, stderr=STDOUT)
         retcode = proc.wait()
     except Exception as e:
-        status_msg(job.id, "RUN_ERROR", msg=str(e))
+        status_msg(job.id, "FAILED", msg=str(e))
         raise MPIEnsembleError from e
     else:
         if retcode == 0: status_msg(job.id, "RUN_FINISHED")
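The `run` hunk shows the ensemble's per-job pattern: funnel the rank's output to a file, wait, then report a state keyed on the return code (launch failures now map to FAILED rather than RUN_ERROR). Roughly, as a standalone sketch where `status_msg` is a stand-in callable, not the module's own helper:

    from subprocess import Popen, STDOUT

    def run_and_report(cmd, logfile, status_msg):
        # Launch the job, merge stdout+stderr into its log, report by retcode.
        with open(logfile, 'wb') as outf:
            try:
                proc = Popen(cmd, stdout=outf, stderr=STDOUT)
                retcode = proc.wait()
            except Exception as e:
                status_msg("FAILED", msg=str(e))   # could not even launch
                raise
        if retcode == 0:
            status_msg("RUN_FINISHED")
        else:
            status_msg("RUN_ERROR", msg=str(retcode))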
......
@@ -7,7 +7,6 @@ import functools
 from math import ceil
 import os
 from pathlib import Path
-import signal
 import shlex
 import sys
 from subprocess import Popen, PIPE, STDOUT
@@ -18,7 +17,6 @@ from queue import Queue, Empty
 from django.conf import settings

 import balsam.models
-from balsam.launcher.launcher import SIGNALS
 from balsam.launcher import mpi_commands
 from balsam.launcher import mpi_ensemble
 from balsam.launcher.exceptions import *
@@ -72,8 +70,6 @@ class Runner:
         self.popen_args = {}

     def start(self):
-        for signum in SIGNALS:
-            signal.signal(sigum, self.timeout)
         self.process = Popen(**self.popen_args)
         if self.popen_args['stdout'] == PIPE:
             self.monitor = MonitorStream(self.process.stdout)
@@ -93,14 +89,11 @@ class Runner:
         else:
             return job.direct_command

-    def timeout(self, signum, stack):
-        sig_msg = SIGNALS.get(signum, signum)
-        message = f"{self.__class__.__name__} got signal {sig_msg}"
-        self.update_jobs()
+    def timeout(self):
         self.process.terminate()
-        for job in self.jobs:
-            if job.state == 'RUNNING':
-                job.update_state('RUN_TIMEOUT', message)
+        with transaction.atomic():
+            for job in self.jobs:
+                if job.state == 'RUNNING': job.update_state('RUN_TIMEOUT')
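`timeout` now batches its state flips inside one transaction instead of saving job-by-job. The idea in isolation, as a sketch assuming a Django model whose `update_state` saves immediately:

    from django.db import transaction

    def mark_timed_out(jobs):
        # One atomic batch: either every RUNNING job flips to RUN_TIMEOUT,
        # or a crash mid-loop rolls back the whole batch.
        with transaction.atomic():
            for job in jobs:
                if job.state == 'RUNNING':
                    job.update_state('RUN_TIMEOUT')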
 class MPIRunner(Runner):
     '''One subprocess, one job'''
@@ -138,9 +131,7 @@ class MPIRunner(Runner):
         else:
             curstate = 'RUN_ERROR'
             msg = str(retcode)
-        if job.state != curstate:
-            job.update_state(curstate, msg) # TODO: handle RecordModified
-        job.service_ping()
+        if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified

 class MPIEnsembleRunner(Runner):
@@ -180,14 +171,15 @@ class MPIEnsembleRunner(Runner):
                 job.update_state(state, msg) # TODO: handle RecordModified exception
             else:
                 raise BalsamRunnerException(f"Invalid status update: {status}")
-        for job in self.jobs:
-            job.service_ping()

 class RunnerGroup:
     MAX_CONCURRENT_RUNNERS = settings.BALSAM_MAX_CONCURRENT_RUNNERS

     def __init__(self):
         self.runners = []

+    def __iter__(self):
+        return iter(self.runners)

     def create_next_runner(runnable_jobs, workers):
         '''Implements one particular strategy for choosing the next job, assuming
@@ -230,6 +222,7 @@ class RunnerGroup:
         if not jobs: raise NoAvailableWorkers

         runner = runner_class(jobs, assigned_workers)
         runner.start()
+        self.runners.append(runner)
         for worker in assigned_workers: worker.idle = False
@@ -238,13 +231,14 @@ class RunnerGroup:
     # transaction save significantly?
         any_finished = False
         with transaction.atomic():
-            for runner in self.runners[:]:
-                runner.update_jobs()
-                if runner.finished():
-                    any_finished = True
-                    self.runners.remove(runner)
-                    for worker in runner.worker_list:
-                        worker.idle = True
+            for runner in self.runners: runner.update_jobs()
+        for runner in self.runners[:]:
+            if runner.finished():
+                any_finished = True
+                self.runners.remove(runner)
+                for worker in runner.worker_list:
+                    worker.idle = True
         return any_finished

     @property
......
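A detail worth noting in `update_and_remove_finished` above: the rewrite updates every runner inside the transaction first, then reaps finished ones by iterating over a copy (`self.runners[:]`), because removing from a list while iterating it directly skips elements. The pitfall in miniature:

    runners = ['finished', 'finished', 'active']
    for r in runners:          # BUG: removal shifts indices; second 'finished' is skipped
        if r == 'finished':
            runners.remove(r)

    runners = ['finished', 'finished', 'active']
    for r in runners[:]:       # iterate a shallow copy; removal is safe
        if r == 'finished':
            runners.remove(r)
    assert runners == ['active']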
@@ -4,6 +4,7 @@ import logging
 from django.core.exceptions import ObjectDoesNotExist
 from django.conf import settings
+from django import db

 from common import transfer
 from balsam.launcher.exceptions import *
@@ -13,11 +14,13 @@ logger = logging.getLogger(__name__)
 StatusMsg = namedtuple('Status', ['pk', 'state', 'msg'])
 JobMsg = namedtuple('JobMsg', ['pk', 'transition_function'])

 def main(job_queue, status_queue):
+    db.connection.close()
     while True:
         job, process_function = job_queue.get()
-        if job == 'end':
-            return
+        if job == 'end': return
         try:
             process_function(job)
         except BalsamTransitionError as e:
@@ -27,8 +30,11 @@ def main(job_queue, status_queue):
             s = StatusMsg(job.pk, job.state, 'success')
             status_queue.put(s)

 class TransitionProcessPool:
     NUM_PROC = settings.BALSAM_MAX_CONCURRENT_TRANSITIONS

     def __init__(self):
         self.job_queue = multiprocessing.Queue()
@@ -40,6 +46,7 @@ class TransitionProcessPool:
                                args=(self.job_queue, self.status_queue))
             for i in range(NUM_PROC)
         ]
+        db.connections.close_all()
         for proc in self.procs: proc.start()

     def __contains__(self, job):
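Both new `db` calls enforce the same rule: never let a forked worker share the parent's open database connection. The pool closes all parent connections just before `start()`, and each worker drops anything it did inherit on entry, so every process lazily opens its own connection on first ORM use. A minimal sketch of the pattern (assumes a configured Django project; the worker body is illustrative):

    import multiprocessing
    from django import db

    def worker_main(job_queue):
        db.connection.close()          # drop any inherited handle in the child
        while True:
            job = job_queue.get()
            if job == 'end':
                return
            # ... first ORM call here opens this process's own connection ...

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        procs = [multiprocessing.Process(target=worker_main, args=(q,))
                 for _ in range(4)]
        db.connections.close_all()     # parent drops its sockets before forking
        for p in procs:
            p.start()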
@@ -63,15 +70,18 @@ class TransitionProcessPool:
             except queue.Empty:
                 break

-    def stop_processes(self):
+    def flush_job_queue(self):
         while not self.job_queue.empty():
             try:
                 self.job_queue.get_nowait()
             except queue.Empty:
                 break

+    def end_and_wait(self):
         m = JobMsg('end', None)
         for proc in self.procs:
             self.job_queue.put(m)
         for proc in self.procs: proc.join()
         self.transitions_pk_list = []
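`end_and_wait` is a conventional poison-pill shutdown: enqueue one sentinel per worker, then join them all. Stripped of the Balsam specifics:

    import multiprocessing

    def worker(q):
        while True:
            item = q.get()
            if item == 'end':      # poison pill: each worker consumes exactly one
                return
            print('processed', item)

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        procs = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(2)]
        for p in procs: p.start()
        for item in ('job-a', 'job-b'):
            q.put(item)
        for _ in procs: q.put('end')   # one sentinel per process
        for p in procs: p.join()       # then wait for clean exits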
......
@@ -12,7 +12,6 @@ from django.db import models
 from concurrency.fields import IntegerVersionField

 from common import Serializer
-from balsam import scheduler, BalsamJobMessage

 logger = logging.getLogger(__name__)
@@ -184,10 +183,10 @@ class BalsamJob(models.Model):
         help_text="Colon-separated list of envs like VAR1=value1:VAR2=value2",
         default='')

-    ping_info = models.TextField(
-        help_text='Information on the service (such as scheduler ID, queue) that most recently touched this job',
-        default='{}')
+    scheduler_id = models.TextField(
+        'Scheduler ID',
+        help_text='Scheduler ID (if job assigned by metascheduler)',
+        default='')

     application = models.TextField(
         'Application to Run',
@@ -253,48 +252,12 @@ stage_out_urls: {self.stage_out_urls}
 wall_time_minutes: {self.wall_time_minutes}
 num_nodes: {self.num_nodes}
 processes_per_node: {self.processes_per_node}
-ping_info: {self.ping_info}
+scheduler_id: {self.scheduler_id}
 runtime_seconds: {self.runtime_seconds}
 application: {self.application}
 '''
         return s.strip() + '\n'
-    def idle(self):
-        '''job.ping_info has a 'ping' time key: 1) if key missing, job has not
-        been touched yet 2) if None, then service has signalled job is now free.
-        If the job is LAUNCHER_QUEUED and appears in local scheduler, it's busy.
-        Otherwise, the job is idle if it has not been pinged in the last 5
-        minutes (signalling that a service processing the job crashed)'''
-        info = self.get_ping_info()
-        if 'ping' not in info: return True
-        if info['ping'] is None: return True # signals idle
-        sched_id = info['scheduler_id']
-        if self.state == 'LAUNCHER_QUEUED' and sched_id:
-            try: queue_stat = scheduler.get_job_status(sched_id)
-            except scheduler.NoQStatInformation: return True # not in queue
-            else: return False # in queue; not idle
-        last_ping = (info['ping'] - datetime.now()).total_seconds()
-        if last_ping > 300.0: return True # detect hard failure; dead launcher
-        else: return False
-
-    def get_ping_info(self):
-        info = json.loads(self.ping_info)
-        if info['ping'] is not None:
-            info['ping'] = from_time_string(info['ping'])
-        return info
-
-    def service_ping(self, *, scheduler_id=None, set_idle=False):
-        if set_idle: time = None
-        else: time = get_time_string()
-        pid = os.getpid()
-        hostname = gethostname()
-        info = dict(ping=time, scheduler_id=scheduler_id, pid=pid,
-                    hostname=hostname)
-        self.ping_info = json.dumps(info)
-        self.save(update_fields=['ping_info'])
     def get_parents_by_id(self):
         return json.loads(self.parents)
......
@@ -2,5 +2,6 @@
 #source /gpfs/mira-home/msalim/argobalsam/env/bin/activate
 export ARGOBALSAM_INSTALL_PATH=$(pwd)
 export PYTHONPATH=$ARGOBALSAM_INSTALL_PATH:$PYTHONPATH
 export ARGOBALSAM_DATA_PATH=$ARGOBALSAM_INSTALL_PATH/data
+export ARGOBALSAM_EXE_PATH=$ARGOBALSAM_INSTALL_PATH/exe