Commit e40262db authored by Michael Salim

working on launcher

parent b67da820
......@@ -2,21 +2,70 @@
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler'''
import argparse
from collections import namedtuple
import os
import multiprocessing
import queue
import time
from django.conf import settings
import balsam.models
from balsam.models import BalsamJob, ApplicationDefinition
from balsam.models import BalsamJob
from balsam import scheduler
START_TIME = time.time() + 10.0
class BalsamLauncherException(Exception): pass
Worker = namedtuple('Worker', ['id', 'shape', 'block', 'corner',
'ranks_per_worker'])
class Worker:
def __init__(self, id, *, shape=None, block=None, corner=None,
ranks_per_worker=None):
self.id = id
self.shape = shape
self.block = block
self.corner = corner
self.ranks_per_worker = ranks_per_worker
self.idle = True
class WorkerGroup:
def __init__(self, config):
self.host_type = config.host_type
self.partition = config.partition
self.workers = []
self.setup = getattr(self, f"setup_{self.host_type}")
if self.host_type == 'DEFAULT':
self.num_workers = config.num_workers
else:
self.num_workers = None
self.setup()
def setup_CRAY(self):
node_ids = []
ranges = self.partition.split(',')
for node_range in ranges:
lo, *hi = node_range.split('-')
lo = int(lo)
if hi:
hi = int(hi[0])
node_ids.extend(list(range(lo, hi+1)))
else:
node_ids.append(lo)
for id in node_ids:
self.workers.append(Worker(id))
def setup_BGQ(self):
# Boot blocks
# Get (block, corner, shape) args for each sub-block
pass
def setup_DEFAULT(self):
for i in range(self.num_workers):
self.workers.append(Worker(i))
def get_idle_workers(self):
return [w for w in self.workers if w.idle]
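# Illustrative sketch (not called anywhere; config values are hypothetical):
# on a CRAY host, a partition string like "8-10,15" is parsed by setup_CRAY
# into one idle Worker per node id.
def _demo_cray_worker_group():
    demo_config = argparse.Namespace(host_type='CRAY', partition='8-10,15',
                                     num_workers=None)
    group = WorkerGroup(demo_config)
    assert [w.id for w in group.workers] == [8, 9, 10, 15]
    assert len(group.get_idle_workers()) == 4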
SIGTIMEOUT = 'TIMEOUT!'
SIGNALS = {
......@@ -29,7 +78,7 @@ class JobRetriever:
def __init__(self, config):
self._job_pk_list = None
self.job_file = config.job_file
self._job_file = config.job_file
self.wf_name = config.wf_name
self.host_type = config.host_type
......@@ -37,14 +86,15 @@ class JobRetriever:
if self._job_file:
jobs = self._jobs_from_file()
else:
wf_name = options.consume_wf
jobs = self._jobs_from_wf(wf=wf_name)
jobs = self._jobs_from_wf(wf=self.wf_name)
return self._filter(jobs)
def _filter(self, jobs):
jobs = jobs.exclude(state__in=balsam.models.END_STATES)
jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
return jobs
# Exclude jobs that are already in LauncherConfig pulled_jobs
# Otherwise, you'll be calling job.idle() and qstating too much
return [j for j in jobs if j.idle()]
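# Hedged sketch of the bookkeeping suggested in the comments above: remember the
# pks this launcher has already pulled so repeated get_jobs() calls do not call
# job.idle() (and hence qstat) for jobs already in hand. The self.pulled_pks set
# is an assumption; __init__ does not create it in this commit.
def _filter_new(self, jobs):
    jobs = jobs.exclude(state__in=balsam.models.END_STATES)
    jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
    jobs = jobs.exclude(pk__in=self.pulled_pks)
    new_jobs = [j for j in jobs if j.idle()]
    self.pulled_pks.update(j.pk for j in new_jobs)
    return new_jobs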
def _jobs_from_file(self):
if self._job_pk_list is None:
......@@ -84,6 +134,7 @@ class LauncherConfig:
self.hostname = None
self.host_type = None
self.scheduler_id = None
self.pid = None
self.num_nodes = None
self.partition = None
self.walltime_seconds = None
......@@ -92,7 +143,7 @@ class LauncherConfig:
self.wf_name = args.consume_wf
self.consume_all = args.consume_all
self.num_workers = args.num_workers
self.ranks_per_worker_serial = args.ppn_serial
self.ranks_per_worker_serial = args.serial_jobs_per_worker
self.walltime_minutes = args.time_limit_minutes
self.set_hostname_and_type()
......@@ -104,17 +155,17 @@ class LauncherConfig:
def set_hostname_and_type(self):
from socket import gethostname
self.pid = os.getpid()
hostname = gethostname().lower()
self.hostname = hostname
for host_type, known_names in RECOGNIZED_HOSTS.items():
if any(hostname.find(name) >= 0 for name in known_names):
self.host_type = host_type
return
self.host_type = None # default
self.host_type = 'DEFAULT'
def query_scheduler(self):
if not scheduler.scheduler_class:
return
if not scheduler.scheduler_class: return
env = scheduler.get_environ()
self.scheduler_id = env.id
self.num_nodes = env.num_nodes
......@@ -136,35 +187,78 @@ class LauncherConfig:
def sufficient_time(self, job):
return 60*job.wall_time_minutes < self.remaining_time_seconds()
def check_timeout(self):
def check_timeout(self, active_runners):
if self.remaining_time_seconds() < 1.0:
for runner in self.active_runners:
for runner in active_runners:
runner.timeout(SIGTIMEOUT, None)
return True
return False
class TransitionProcessPool:
TRANSITIONS = {
'CREATED': transitions.check_parents,
'LAUNCHER_QUEUED': transitions.check_parents,
'AWAITING_PARENTS': transitions.check_parents,
'READY': transitions.stage_in,
'STAGED_IN': transitions.preprocess,
'RUN_DONE': transitions.postprocess,
'RUN_TIMEOUT': transitions.postprocess,
'RUN_ERROR': transitions.postprocess,
'POSTPROCESSED': transitions.stage_out
}
def __init__(self, num_transitions=None):
if not num_transitions:
num_transitions = settings.BALSAM_MAX_CONCURRENT_TRANSITIONS
self.job_queue = multiprocessing.Queue()
self.status_queue = multiprocessing.Queue()
self.procs = [
multiprocessing.Process( target=transitions.main,
args=(self.job_queue, self.status_queue))
for i in range(num_transitions)
]
for proc in self.procs:
proc.start()
def add_job(self, pk, transition_function):
m = transitions.JobMsg(pk, transition_function)
self.job_queue.put(m)
def get_statuses(self):
while not self.status_queue.empty():
try:
yield self.status_queue.get_nowait()
except queue.Empty:
break
def stop_processes(self):
while not self.job_queue.empty():
try:
self.job_queue.get_nowait()
except queue.Empty:
break
m = transitions.JobMsg('end', None)
for proc in self.procs:
self.job_queue.put(m)
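# Hedged usage sketch of the pool (illustrative only; not wired into main() yet).
# Each child process runs transitions.main, pulling JobMsg(pk, func) items off
# job_queue and pushing StatusMsg(pk, state, msg) items onto status_queue until
# stop_processes() sends the ('end', None) sentinel.
def _demo_transitions_pool(job):
    pool = TransitionProcessPool(num_transitions=2)
    func = TransitionProcessPool.TRANSITIONS.get(job.state)
    if func:
        pool.add_job(job.pk, func)
    for status in pool.get_statuses():  # non-blocking; may be empty at first
        print(status.pk, status.state, status.msg)
    pool.stop_processes()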
def main(args):
launcher_config = LauncherConfig(args)
job_retriever = JobRetriever(launcher_config)
runners.setup(launcher_config)
workers = WorkerGroup(launcher_config)
transitions_pool = TransitionProcessPool()
# Initialize compute environment (any launcher actions that take place
# before jobs start): on BGQ, this means booting blocks
# Create workdirs for jobs: organized by workflow
# Job dirs named according to job.name (not pk)...use just enough of pk to
# resolve conflicts
active_runners = []  # runners will be tracked here as they are created
while not launcher_config.check_timeout(active_runners):
# keep a list of jobs I'm handling
# get_jobs() should only fetch new ones
# ping jobs I'm handling using job.service_ping
jobs = job_retriever.get_jobs()
transitions_pool.stop_processes()
# Query Balsam DB
# Run 10 multiprocessing "transitions": communicate via 2 queues
# Add jobs to transitions queue
# Maintain up to 50 active runners (1 runner tracks 1 subprocess-aprun)
# Handle timeout: invoke TIMEOUT in active runners; let transitions finish
# Stop adding new transitions to queue,
# Add transitions to error_handle all the RUN_TIMEOUT jobs
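# Hedged sketch of the loop the comments above describe (a skeleton, not the
# final design): feed processable jobs to the transition pool, report statuses,
# pack runnable jobs onto idle workers, and stop on timeout. MAX_RUNNERS and the
# runner-creation step are assumptions not implemented in this commit.
def _main_loop_sketch(launcher_config, job_retriever, workers, transitions_pool):
    MAX_RUNNERS = 50
    active_runners = []
    while not launcher_config.check_timeout(active_runners):
        for job in job_retriever.get_jobs():
            func = TransitionProcessPool.TRANSITIONS.get(job.state)
            if func:
                transitions_pool.add_job(job.pk, func)
        for status in transitions_pool.get_statuses():
            print(f"{status.pk}: {status.state} ({status.msg})")
        # Launch runnable jobs on idle workers, keeping len(active_runners)
        # below MAX_RUNNERS; e.g. (not implemented here):
        #   idle = workers.get_idle_workers()
        #   runner = runners.MPIEnsembleRunner(serial_jobs, idle, launcher_config.host_type)
        #   active_runners.append(runner)
        time.sleep(1)
    transitions_pool.stop_processes()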
if __name__ == "__main__":
......@@ -177,9 +271,9 @@ if __name__ == "__main__":
group.add_argument('--consume-wf',
help="Continuously run jobs of specified workflow")
parser.add_argument('--num_workers', type=int, default=1,
parser.add_argument('--num-workers', type=int, default=1,
help="Theta: defaults to # nodes. BGQ: the # of subblocks")
parser.add_argument('--ppn-serial', type=int, default=4,
parser.add_argument('--serial-jobs-per-worker', type=int, default=4,
help="For non-MPI jobs, how many to pack per worker")
parser.add_argument('--time-limit-minutes', type=int,
help="Override auto-detected walltime limit (runs
......
class DefaultMPICommand(object):
class DEFAULTMPICommand(object):
def __init__(self):
self.mpi = 'mpirun'
self.nproc = '-n'
......@@ -36,7 +36,7 @@ class DefaultMPICommand(object):
return result
class BGQMPICommand(DefaultMPICommand):
class BGQMPICommand(DEFAULTMPICommand):
def __init__(self):
self.mpi = 'runjob'
self.nproc = '--np'
......@@ -62,7 +62,7 @@ class BGQMPICommand(DefaultMPICommand):
shape, block, corner = worker.shape, worker.block, worker.corner
return f"--shape {shape} --block {block} --corner {corner} "
class CRAYMPICommand(DefaultMPICommand):
class CRAYMPICommand(DEFAULTMPICommand):
def __init__(self):
# 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1
self.mpi = 'aprun'
......
......@@ -5,7 +5,7 @@ import sys
from subprocess import Popen, STDOUT
from mpi4py import MPI
from runners import cd
from balsam.launcher.runners import cd
class MPIEnsembleError(Exception): pass
......@@ -41,26 +41,21 @@ def run(job):
status_msg(job.id, "RUN_ERROR", msg=str(e))
raise MPIEnsembleError from e
else:
if retcode == 0:
status_msg(job.id, "RUN_FINISHED")
else:
status_msg(job.id, "RUN_ERROR", msg=f"error code {retcode}")
if retcode == 0: status_msg(job.id, "RUN_FINISHED")
else: status_msg(job.id, "RUN_ERROR", msg=f"process return code {retcode}")
finally:
proc.terminate()
def main(jobs_path):
job_list = None
proc = None
if RANK == 0:
with open(jobs_path) as fp:
with open(jobs_path) as fp:
job_list = list(read_jobs(fp))
job_list = COMM.bcast(job_list, root=0)
for job in job_list[RANK::COMM.size]:
run(job)
for job in job_list[RANK::COMM.size]: run(job)
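# Worked example of the round-robin split above (illustrative values): with 8 jobs
# and COMM.size == 4 ranks, job_list[RANK::COMM.size] assigns
#   rank 0 -> jobs [0, 4], rank 1 -> [1, 5], rank 2 -> [2, 6], rank 3 -> [3, 7]
def _demo_round_robin(num_jobs=8, num_ranks=4):
    jobs = list(range(num_jobs))
    return {rank: jobs[rank::num_ranks] for rank in range(num_ranks)}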
if __name__ == "__main__":
path = sys.argv[1]
......
from collections import namedtuple
import functools
import os
from pathlib import Path
......@@ -11,23 +10,12 @@ from threading import Thread
from queue import Queue, Empty
import balsam.models
from balsam.launchers.launcher import SIGNALS, BGQ_HOSTS, CRAY_HOSTS
from balsam.launchers import mpi_commands
from balsam.launcher.launcher import SIGNALS
from balsam.launcher import mpi_commands
from balsam.launcher import mpi_ensemble
class BalsamRunnerException(Exception): pass
MPIcommand = None
def setup(launcher_config):
global MPIcommand
if launcher_config.host_type == 'BGQ':
MPIcommand = mpi_commands.BGQMPICommand()
elif launcher_config.host_type == 'CRAY':
MPIcommand = mpi_commands.CRAYMPICommand()
else:
MPIcommand = mpi_commands.DefaultMPICommand()
class cd:
'''Context manager for changing cwd'''
def __init__(self, new_path):
......@@ -42,28 +30,30 @@ class cd:
class MonitorStream(Thread):
'''Thread for non-blocking read of Runner's subprocess stdout'''
'''Thread: non-blocking read of a process's stdout'''
def __init__(self, runner_output):
super().__init__()
self.stream = runner_output
self.queue = Queue()
self.daemon = True
def run(self):
# Call readline until empty string is returned
for line in iter(self.stream.readline, b''):
self.queue.put(line.decode('utf-8'))
self.stream.close()
def available_lines(self):
while True:
try:
yield self.queue.get_nowait()
except Empty:
return
try: yield self.queue.get_nowait()
except Empty: return
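# Hedged usage sketch of MonitorStream (standalone; the command is arbitrary):
# wrap a subprocess's stdout so callers can poll its output without blocking.
def _demo_monitor_stream():
    import subprocess, time
    proc = subprocess.Popen(['echo', 'hello'], stdout=subprocess.PIPE)
    monitor = MonitorStream(proc.stdout)
    monitor.start()
    proc.wait()
    time.sleep(0.1)  # give the reader thread a moment to drain stdout
    for line in monitor.available_lines():
        print(line, end='')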
class Runner:
'''Spawns ONE subprocess to run specified job(s) and monitor their execution'''
def __init__(self, job_list, worker_list):
def __init__(self, job_list, worker_list, host_type):
mpi_cmd_class = getattr(mpi_commands, f"{host_type}MPICommand")
self.mpi_cmd = mpi_cmd_class()
self.jobs = job_list
self.jobs_by_pk = {job.pk : job for job in self.jobs}
self.process = None
......@@ -80,7 +70,7 @@ class Runner:
self.monitor.start()
def update_jobs(self):
pass
raise NotImplementedError
@staticmethod
def get_app_cmd(job):
......@@ -101,7 +91,7 @@ class Runner:
class MPIRunner(Runner):
'''One subprocess, one job'''
def __init__(self, job_list, worker_list):
def __init__(self, job_list, worker_list, host_type):
super().__init__(job_list, worker_list, host_type)
if len(self.jobs) != 1:
......@@ -110,12 +100,12 @@ class MPIRunner(Runner):
job = self.jobs[0]
app_cmd = self.get_app_cmd(job)
mpi = MPICommand(job, worker_list)
mpi_str = self.mpi_cmd(job, worker_list)
basename = os.path.basename(job.working_directory)
outname = os.path.join(job.working_directory, f"{basename}.out")
self.outfile = open(outname, 'w+b')
command = f"{mpi} {app_cmd}"
command = f"{mpi_str} {app_cmd}"
self.popen_args['args'] = shlex.split(command)
self.popen_args['cwd'] = job.working_directory
self.popen_args['stdout'] = self.outfile
......@@ -140,9 +130,8 @@ class MPIRunner(Runner):
class MPIEnsembleRunner(Runner):
'''One subprocess: an ensemble of serial jobs run in an mpi4py wrapper'''
def __init__(self, job_list, worker_list):
def __init__(self, job_list, worker_list, host_type):
from balsam.launchers import mpi_ensemble
mpi_ensemble_exe = os.path.abspath(mpi_ensemble.__file__)
super().__init__(job_list, worker_list, host_type)
......@@ -160,10 +149,10 @@ class MPIEnsembleRunner(Runner):
cmd = self.get_app_cmd(job)
fp.write(f"{job.pk} {job.working_directory} {cmd}\n")
nproc = len(worker_list) * worker_list[0].ranks_per_worker
mpi = MPICommand(self.jobs[0], worker_list, nproc=nproc)
nproc = sum(w.ranks_per_worker for w in worker_list)
mpi_str = self.mpi_cmd(self.jobs[0], worker_list, nproc=nproc)
command = f"{mpi} {mpi_ensemble_exe} {self.ensemble_filename}"
command = f"{mpi_str} {mpi_ensemble_exe} {self.ensemble_filename}"
self.popen_args['args'] = shlex.split(command)
def update_jobs(self):
......
'''BalsamJob pre and post execution Transitions'''
'''BalsamJob pre and post execution'''
from collections import namedtuple
import logging
from common import transfer, MessageInterface, run_subprocess
from common import db_tools
logger = logging.getLogger(__name__)
from balsam import BalsamStatusSender
#from django.db import utils, connections, DEFAULT_DB_ALIAS
from django.core.exceptions import ObjectDoesNotExist
from balsam.schedulers import exceptions
def main(job_queue, status_queue):
while True:
job, process_function = job_queue.get()
process_function(job)
from common import transfer
logger = logging.getLogger(__name__)
class ProcessingError(Exception): pass
def check_parents(job):
pass
parents = job.get_parents()
ready = all(p.state == 'JOB_FINISHED' for p in parents)
if ready:
job.update_state('READY', 'dependencies satisfied')
elif job.state == 'CREATED':
job.update_state('NOT_READY', 'awaiting dependencies')
def stage_in(job):
''' if the job has an input_url defined,
the files are copied to the local working_directory '''
# Create workdirs for jobs: use job.create_working_path
logger.debug('in stage_in')
message = 'job staged in'
job.update_state('STAGING_IN')
if not os.path.exists(job.working_directory):
job.create_working_path()
if job.input_url != '':
try:
transfer.stage_in(job.input_url + '/', job.working_directory + '/')
......@@ -36,16 +38,7 @@ def stage_in(job):
# no input url specified so stage in is complete
job.state = STAGED_IN.name
job.save(
update_fields=['state'],
using=db_tools.get_db_connection_id(
job.pk))
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
# stage out files for a job
job.update_state('STAGED_IN')  # matches the STAGED_IN key in the launcher's TRANSITIONS table
def stage_out(job):
''' if the job has files defined via the output_files and an output_url is defined,
......@@ -130,53 +123,6 @@ def preprocess(job):
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
# submit the job to the local scheduler
def submit(job):
''' this function submits the job to the local batch system '''
logger.debug('in submit')
message = ''
try:
# some schedulers have limits on the number of jobs that can
# be queued, so check to see if we are at that number
# If so, don't submit the job
jobs_queued = BalsamJob.objects.filter(state=QUEUED.name)
if len(jobs_queued) <= settings.BALSAM_MAX_QUEUED:
cmd = job.get_application_command()
scheduler.submit(job, cmd)
job.state = SUBMITTED.name
message = 'Job entered SUBMITTED state'
else:
message = 'Job submission delayed due to local queue limits'
except exceptions.SubmitNonZeroReturnCode as e:
message = 'scheduler returned non-zero value during submit command: ' + \
str(e)
logger.error(message)
job.state = SUBMIT_FAILED.name
except exceptions.SubmitSubprocessFailed as e:
message = 'subprocess in scheduler submit failed: ' + str(e)
logger.error(message)
job.state = SUBMIT_FAILED.name
except exceptions.JobSubmissionDisabled as e:
message = 'scheduler job submission is currently disabled: ' + str(e)
logger.error(message)
job.state = SUBMIT_DISABLED.name
except Exception as e:
message = 'received exception while calling scheduler submit for job ' + \
str(job.job_id) + ', exception: ' + str(e)
logger.exception(message)
job.state = SUBMIT_FAILED.name
job.save(update_fields=['state', 'scheduler_id'],
using=db_tools.get_db_connection_id(job.pk))
logger.debug('sending status message')
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
logger.debug('submit done')
# perform any post job processing needed
def postprocess(job):
''' some jobs need to have some postprocessing performed,
......@@ -231,14 +177,19 @@ def postprocess(job):
status_sender.send_status(job, message)
def finish_job(job):
''' simply change state to Finished and send status to user '''
job.state = JOB_FINISHED.name
job.save(
update_fields=['state'],
using=db_tools.get_db_connection_id(
job.pk))
message = "Success!"
status_sender = BalsamStatusSender.BalsamStatusSender(
settings.SENDER_CONFIG)
status_sender.send_status(job, message)
StatusMsg = namedtuple('Status', ['pk', 'state', 'msg'])
JobMsg = namedtuple('JobMsg', ['pk', 'transition_function'])
def main(job_queue, status_queue):
    while True:
        pk, process_function = job_queue.get()
        if pk == 'end':
            return
        # Messages carry the job pk (see JobMsg above); look up the job itself.
        # Assumes BalsamJob is imported at module scope.
        job = BalsamJob.objects.get(pk=pk)
        try:
            process_function(job)
        except ProcessingError as e:
            s = StatusMsg(pk, 'FAILED', str(e))
            status_queue.put(s)
        else:
            s = StatusMsg(pk, job.state, 'success')
            status_queue.put(s)
......@@ -2,13 +2,12 @@ import os
import json
import logging
import sys
import datetime
from datetime import datetime
import uuid
from django.core.exceptions import ValidationError
from django.conf import settings
from django.db import models
from django.utils import timezone
from concurrency.fields import IntegerVersionField
from common import Serializer
......@@ -19,61 +18,56 @@ logger = logging.getLogger(__name__)
class InvalidStateError(ValidationError): pass
class InvalidParentsError(ValidationError): pass
TIME_FMT = '%m-%d-%Y %H:%M:%S'
STATES = '''
CREATED
NOT_READY
LAUNCHER_QUEUED
AWAITING_PARENTS
READY
STAGING_IN
STAGE_IN_DONE
PREPROCESSING
PREPROCESS_DONE
STAGED_IN
PREPROCESSED
RUN_READY
RUNNING
RUN_DONE
POSTPROCESSING
POSTPROCESS_DONE
STAGING_OUT
POSTPROCESSED
JOB_FINISHED
RUN_TIMEOUT
RUN_ERROR
RUN_ERROR_HANDLED
RESTART_READY
FAILED
DETECTED_DEAD_LAUNCHER
USER_KILLED
PARENT_KILLED'''.split()
ACTIVE_STATES = '''
STAGING_IN
PREPROCESSING
RUNNING
POSTPROCESSING
STAGING_OUT'''.split()
'''.split()
PROCESSABLE_STATES = '''
CREATED
NOT_READY
LAUNCHER_QUEUED
AWAITING_PARENTS
READY
STAGE_IN_DONE
PREPROCESS_DONE
STAGED_IN
RUN_DONE
POSTPROCESS_DONE
POSTPROCESSED
RUN_TIMEOUT
RUN_ERROR
RUN_ERROR_HANDLED
DETECTED_DEAD_LAUNCHER'''.split()
'''.split()
RUNNABLE_STATES = '''
RUN_READY
RESTART_READY'''.split()
PREPROCESSED
RESTART_READY
'''.split()
END_STATES = '''
JOB_FINISHED
FAILED
USER_KILLED
PARENT_KILLED'''.split()
def assert_disjoint():
......@@ -92,7 +86,10 @@ def validate_state(value):
raise InvalidStateError(f"{value} is not a valid state in balsam.models")
def get_time_string():
return timezone.now().strftime('%m-%d-%Y %H:%M:%S')
return datetime.now().strftime(TIME_FMT)
def from_time_string(s):
return datetime.strptime(s, TIME_FMT)
def history_line(state='CREATED', message=''):
newline = '' if state=='CREATED' else '\n'
......@@ -191,11 +188,10 @@ class BalsamJob(models.Model):
help_text="Colon-separated list of envs like VAR1=value1:VAR2=value2",
default='')
launcher_info = models.TextField(
ping_info = models.TextField(
'Scheduler ID',
help_text='Information on the launcher (such as scheduler ID, queue) that most recently touched this job',
default='')
launcher_ping_time = models.DateTimeField(default=timezone.now)
help_text='Information on the service (such as scheduler ID, queue) that most recently touched this job',
default='{}')
application = models.TextField(
'Application to Run',
......@@ -262,21 +258,47 @@ stage_out_urls: {self.stage_out_urls}
wall_time_minutes: {self.wall_time_minutes}
num_nodes: {self.num_nodes}
processes_per_node: {self.processes_per_node}
launcher_info: {self.launcher_info}
launcher_ping_time: {self.launcher_ping_str()}
ping_info: {self.ping_info}
runtime_seconds: {self.runtime_seconds}
application: {self.application}
'''
return s.strip() + '\n'
def launcher_ping_str(self):
if self.launcher_ping_time is None:
return ''
return self.launcher_ping_time.strftime('%m-%d-%Y %H:%M:%S')
def launcher_ping(self):
self.launcher_ping_time = timezone.now()
self.save(update_fields=['launcher_ping_time'])