Commit f3103f61 authored by Michael Salim's avatar Michael Salim

Restructured packages: balsam launcher separate from service

parent 2a3e4f59
** INSTALLATION
# install virtualenv
https://pypi.python.org/pypi/virtualenv
# set up an installation directory
export BASEDIR=/path/to/installation
# set up virtual env
mkdir $BASEDIR
cd $BASEDIR
virtualenv balsam_env
. balsam_env/bin/activate
# install prerequisites
pip install django
pip install south
pip install pika
# install balsam_core
# (extract the tar file in some directory other than $BASEDIR)
tar xzf balsam_core.tgz
cd balsam_core
python setup.py install --prefix=$BASEDIR/balsam_env
cd $BASEDIR
django-admin.py startproject balsam_deploy
#cp settings.py $BASEDIR/balsam_deploy/balsam_deploy
# edit $BASEDIR/balsam_deploy/balsam_deploy/settings.py
# - set the database filename
# - add balsam_core and south to INSTALLED_APPS
# - add 'from argo_settings import *' at end
# edit argo_settings.py and change the following (an illustrative fragment follows this list):
- in the DATABASES section, set the NAME field to the full path to an sqlite3 file, preferably in $BASEDIR/balsam_deploy
- set BALSAM_DEPLOYMENT_DIRECTORY to $BASEDIR/balsam_deploy
- set BALSAM_WORK_DIRECTORY. A work directory will be created here for each job, and the job will be run in this directory.
- set BALSAM_SCHEDULER_SUBMIT_EXE, BALSAM_SCHEDULER_STATUS_EXE appropriately
- set BALSAM_GLOBUS_URL_COPY_EXE, BALSAM_GRID_PROXY_INIT_EXE appropriately
- set BALSAM_ALLOWED_EXECUTABLE_DIRECTORY. Only executables from this directory will be executed.
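# for reference, a hypothetical argo_settings.py fragment after the edits above
# (all paths and executable locations below are illustrative, not shipped defaults):
#
#     DATABASES['default']['NAME'] = '/path/to/installation/balsam_deploy/balsam.sqlite3'
#     BALSAM_DEPLOYMENT_DIRECTORY = '/path/to/installation/balsam_deploy'
#     BALSAM_WORK_DIRECTORY = '/path/to/installation/balsam_work'
#     BALSAM_SCHEDULER_SUBMIT_EXE = '/usr/bin/qsub'
#     BALSAM_SCHEDULER_STATUS_EXE = '/usr/bin/qstat'
#     BALSAM_GLOBUS_URL_COPY_EXE = '/usr/bin/globus-url-copy'
#     BALSAM_GRID_PROXY_INIT_EXE = '/usr/bin/grid-proxy-init'
#     BALSAM_ALLOWED_EXECUTABLE_DIRECTORY = '/path/to/installation/allowed_exes'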
cd balsam_deploy
python manage.py syncdb --noinput
** START BALSAM SERVICE
The service fetches jobs from the message queues and adds them to the database, and periodically updates each job's status in the database. It runs on a period defined by BALSAM_FETCH_DELAY in settings.py.
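# Conceptually, both the service (BALSAM_FETCH_DELAY) and the daemon below
# (BALSAM_EXECUTION_DELAY) run a simple periodic loop; a minimal sketch of the
# pattern (do_work is a hypothetical stand-in for the real management command
# logic, not an actual Balsam function):
#
#     import time
#     from django.conf import settings
#     while True:
#         do_work()   # fetch new jobs / update job statuses in the database
#         time.sleep(settings.BALSAM_FETCH_DELAY)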
. balsam_env/bin/activate
cd $BASEDIR/balsam_deploy
python manage.py balsam_service
** START BALSAM DAEMON
The daemon queries the local database for jobs to be run and manages them over their lifetime. It operates on a period defined by BALSAM_EXECUTION_DELAY in settings.py.
. balsam_env/bin/activate
cd $BASEDIR/balsam_deploy
python manage.py balsam_daemon
** ADD TEST JOB TO MESSAGE QUEUE
. balsam_env/bin/activate
cd $BASEDIR/rabbitmq
./newjob testjob
** INTEGRATION POINTS
- in settings.py, BALSAM_SCHEDULER_SUBMIT_EXE, BALSAM_SCHEDULER_STATUS_EXE identify the qsub and qstat executables, respectively
- in settings.py, BALSAM_GLOBUS_URL_COPY_EXE, BALSAM_GRID_PROXY_INIT_EXE identify the globus-url-copy and grid-proxy-init executables, respectively
- balsam_env/lib/python2.6/site-packages/balsam_core/scheduler.py is where qsub and qstat are called
- balsam_env/lib/python2.6/site-packages/balsam_core/management/commands/balsam_service.py is where qstat output is parsed for updating job status
- the queue for job submission is hard-coded in scheduler.py
......@@ -294,6 +294,10 @@ auto timeout retry: {self.auto_timeout_retry}
parent_ids = self.get_parents_by_id()
return BalsamJob.objects.filter(job_id__in=parent_ids)
@property
def cute_id(self):
return f"[{ str(self.pk)[:8] }]"
def get_children(self):
return BalsamJob.objects.filter(parents__icontains=str(self.pk))
......@@ -357,7 +361,7 @@ auto timeout retry: {self.auto_timeout_retry}
self.save(update_fields=['state', 'state_history'])
def get_recent_state_str(self):
return self.state_history.split("\n")[-1].strip()[1:-1]
return self.state_history.split("\n")[-1].strip()
def get_line_string(self):
recent_state = self.get_recent_state_str()
......@@ -454,3 +458,7 @@ Envs: {self.environ_vars}
'preprocess', 'postprocess',
'description')
return output
@property
def cute_id(self):
return f"[{ str(self.pk)[:8] }]"
from django.conf import settings
from importlib import import_module
from balsam.schedulers.exceptions import *
scheduler_class = settings.BALSAM_SCHEDULER_CLASS
def _dummy(): pass
if scheduler_class:
_temp = import_module('balsam.schedulers.'+scheduler_class)
submit = _temp.submit
get_job_status = _temp.get_job_status
get_environ = _temp.get_environ
else:
submit = _dummy
get_job_status = _dummy
get_environ = _dummy
......@@ -2,107 +2,78 @@ import subprocess
import sys
import shlex
import os
import logging
import time
from collections import namedtuple
from django.conf import settings
from balsam.schedulers.exceptions import *
from balsam.schedulers import Scheduler
from common import run_subprocess
import logging
logger = logging.getLogger(__name__)
def get_environ():
SchedulerEnv = namedtuple('SchedulerEnv', ['id', 'num_nodes', 'partition'])
def new_scheduler():
return CobaltScheduler()
class CobaltScheduler(Scheduler.Scheduler):
SCHEDULER_VARIABLES = {
'id' : 'COBALT_JOBID',
'num_workers' : 'COBALT_PARTSIZE',
'workers_str' : 'COBALT_PARTNAME',
}
JOBSTATUS_VARIABLES = {
'id' : 'JobID',
'time_remaining' : 'TimeRemaining',
'state' : 'State',
}
def _make_submit_cmd(self, job, cmd):
exe = settings.BALSAM_SCHEDULER_SUBMIT_EXE # qsub
return (f"{exe} -A {job.project} -q {job.queue} -n {job.num_nodes} "
f"-t {job.wall_time_minutes} --cwd {job.working_directory} {cmd}")
def _post_submit(self, job, cmd, submit_output):
'''Return a SubmissionRecord: contains scheduler ID'''
        try: scheduler_id = int(submit_output)
        except ValueError: scheduler_id = int(submit_output.split()[-1])
logger.debug(f'job {job.cute_id} submitted as Cobalt job {scheduler_id}')
return Scheduler.SubmissionRecord(scheduler_id=scheduler_id)
def get_status(self, scheduler_id, jobstatus_vars=None):
        if jobstatus_vars is None:
            jobstatus_vars = list(self.JOBSTATUS_VARIABLES.values())
        else:
            jobstatus_vars = [self.JOBSTATUS_VARIABLES[a] for a in jobstatus_vars]
        info = qstat(scheduler_id, jobstatus_vars)
        # iterate over a copy of the keys: new *_sec entries are added in the loop
        for attr in list(info):
            if 'time' in attr:
                parsed = time.strptime(info[attr], '%H:%M:%S')
                info[attr+'_sec'] = parsed.tm_hour*3600 + parsed.tm_min*60 + parsed.tm_sec
        return info
def qstat(scheduler_id, attrs):
exe = settings.BALSAM_SCHEDULER_STATUS_EXE
qstat_cmd = f"{exe} {scheduler_id}"
os.environ['QSTAT_HEADER'] = ':'.join(attrs)
p = subprocess.Popen(shlex.split(qstat_cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout, stderr = p.communicate()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
    logger.debug('qstat output: \n' + stdout)
if p.returncode != 0:
logger.error('return code for qstat is non-zero. stdout = \n' +
stdout + '\n stderr = \n' + stderr)
try:
cobalt_id = os.environ['COBALT_JOBID']
num_nodes = int(os.environ['COBALT_PARTSIZE'])
partition = os.environ['COBALT_PARTNAME']
except KeyError:
        raise SchedulerException("Can't read COBALT_JOBID. Are you really on a MOM node?")
return SchedulerEnv(cobalt_id, num_nodes, partition)
def submit(job, cmd):
    '''Submit a job to the queue; raise a pre-defined scheduler exception if something fails'''
logger.info("Submitting Cobalt job: %d", job.id)
logger.debug("Submitting command: " + cmd)
command = '%s -A %s -q %s -n %d -t %d --cwd %s %s' % (settings.BALSAM_SCHEDULER_SUBMIT_EXE,
job.project,
job.queue,
job.num_nodes,
job.wall_time_minutes,
job.working_directory,
cmd)
logger.debug('CobaltScheduler command = %s', command)
if settings.BALSAM_SUBMIT_JOBS:
try:
output = run_subprocess.run_subprocess(command)
output = output.strip()
try:
scheduler_id = int(output)
except ValueError:
scheduler_id = int(output.split()[-1])
logger.debug('CobaltScheduler job (pk=' + str(job.pk) +
') submitted to scheduler as job ' + str(output))
job.scheduler_id = scheduler_id
except run_subprocess.SubprocessNonzeroReturnCode as e:
raise exceptions.SubmitNonZeroReturnCode(
'CobaltScheduler submit command returned non-zero value. command = "' +
command +
'", exception: ' +
str(e))
except run_subprocess.SubprocessFailed as e:
raise exceptions.SubmitSubprocessFailed(
            'CobaltScheduler subprocess to run submit command failed with exception: ' + str(e))
    try:
        qstat_fields = stdout.split('\n')[2].split()
        qstat_info = {attr: qstat_fields[i] for (i, attr) in enumerate(attrs)}
    except (IndexError, KeyError):
        raise NoQStatInformation
    return qstat_info
else:
raise exceptions.JobSubmissionDisabled(
'CobaltScheduler Job submission disabled')
logger.debug('CobaltScheduler Job submission complete')
def get_job_status(scheduler_id):
qstat = QStat(scheduler_id)
if not qstat.qstat_info:
raise NoQStatInformation(f"There was no qstat output for scheduler ID {scheduler_id}")
info = qstat.qstat_info
    # iterate over a copy of the keys: new *_sec entries are added in the loop,
    # and the time module must not be shadowed by a local variable
    for attr in list(info):
        if 'time' in attr:
            parsed = time.strptime(info[attr], '%H:%M:%S')
            info[attr+'_sec'] = parsed.tm_hour*3600 + parsed.tm_min*60 + parsed.tm_sec
return info
class QStat:
QSTAT_ATTRS = "JobID Nodes WallTime State".split()
QSTAT_EXE = settings.BALSAM_SCHEDULER_STATUS_EXE
def __init__(self, scheduler_id):
qstat_cmd = f"{QSTAT_EXE} {scheduler_id}"
try:
            os.environ['QSTAT_HEADER'] = ':'.join(self.QSTAT_ATTRS)
p = subprocess.Popen( shlex.split(qstat_cmd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except BaseException:
logger.exception('received exception while trying to run qstat: ' + str(sys.exc_info()[1]))
raise
stdout, stderr = p.communicate()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
        logger.debug('qstat output: \n' + stdout)
        if p.returncode != 0:
            logger.error('return code for qstat is non-zero. stdout = \n' +
                         stdout + '\n stderr = \n' + stderr)
        try:
            qstat_fields = stdout.split('\n')[2].split()
            self.qstat_info = {attr.lower(): qstat_fields[i] for (i, attr) in
                               enumerate(self.QSTAT_ATTRS)}
        except (IndexError, KeyError):
            self.qstat_info = {}
from django.conf import settings
from importlib import import_module
from balsam.schedulers.exceptions import *
from socket import gethostname
import os
import subprocess
import shlex
import time
import logging
logger = logging.getLogger(__name__)
class SubmissionRecord:
'''Extend me: contains any data returned after queue submission'''
def __init__(self, *, scheduler_id=None):
self.scheduler_id = scheduler_id
class Scheduler:
RECOGNIZED_HOSTS = {
'BGQ' : 'vesta cetus mira'.split(),
'CRAY' : 'theta'.split(),
}
SCHEDULER_VARIABLES = {} # mappings defined in subclass
JOBSTATUS_VARIABLES = {}
def __init__(self):
self.hostname = None
self.host_type = None
self.pid = None
self.num_workers = None
self.workers_str = None
self.current_scheduler_id = None
self.remaining_seconds = None
self.last_check_seconds = None
self.pid = os.getpid()
self.hostname = gethostname()
for host_type, known_names in self.RECOGNIZED_HOSTS.items():
if any(self.hostname.find(name) >= 0 for name in known_names):
self.host_type = host_type
if self.host_type is None:
self.host_type = 'DEFAULT'
logger.debug(f"Recognized host_type: {self.host_type}")
logger.debug(f"Using scheduler class {self.__class__}")
try:
self.get_env()
except SchedulerException:
logger.debug(f"Did not detect a scheduler ID")
return
else:
logger.debug(f"Detected scheduler ID {self.current_scheduler_id}")
def get_env(self):
environment = {}
for generic_name, specific_var in self.SCHEDULER_VARIABLES.items():
if specific_var not in os.environ:
raise SchedulerException(f"{specific_var} not in environment")
else:
environment[generic_name] = os.environ[specific_var]
if 'id' in environment:
self.current_scheduler_id = environment['id']
if 'num_workers' in environment:
self.num_workers = environment['num_workers']
if 'workers_str' in environment:
self.workers_str = environment['workers_str']
return environment
def remaining_time_seconds(self, sched_id=None):
'''Remaining time from a qstat or internal timer'''
if sched_id:
info = self.get_status(sched_id)
return info['time_remaining_sec']
elif self.remaining_seconds:
now = time.time()
elapsed_time = now - self.last_check_seconds
self.remaining_seconds -= elapsed_time
self.last_check_seconds = now
return self.remaining_seconds
sched_id = self.current_scheduler_id
if sched_id is None:
return float("inf")
info = self.get_status(sched_id)
self.remaining_seconds = info['time_remaining_sec']
self.last_check_seconds = time.time()
return self.remaining_seconds
def submit(self, job, cmd):
logger.info(f"Submit {job.cute_id} {cmd} ({self.__class__})")
        self._pre_submit(job, cmd)
submit_cmd = self._make_submit_cmd(job, cmd)
        p = subprocess.Popen(shlex.split(submit_cmd), stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        stdout, _ = p.communicate()
        if p.returncode != 0: raise SubmitNonZeroReturnCode
        submission_record = self._post_submit(job, cmd, stdout)
        return submission_record
def get_status(self, scheduler_id, jobstatus_vars=['state','time_remaining']):
raise NotImplementedError
def _make_submit_cmd(self, job, cmd):
'''must return the string which is passed to Popen'''
raise NotImplementedError
def _pre_submit(self, job, cmd):
'''optional pre-submit'''
pass
def _post_submit(self, job, cmd, submit_output):
'''Returns SubmissionRecord: contains scheduler ID'''
pass
scheduler_class = settings.BALSAM_SCHEDULER_CLASS
if scheduler_class:
_temp = import_module('balsam.schedulers.'+scheduler_class)
scheduler_main = _temp.new_scheduler()
else:
scheduler_main = Scheduler()
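# Hypothetical usage sketch: with BALSAM_SCHEDULER_CLASS = 'CobaltScheduler'
# and the Cobalt environment variables set (illustrative values), constructing
# scheduler_main picks up the job context automatically:
#
#     os.environ['COBALT_JOBID'] = '123456'
#     os.environ['COBALT_PARTSIZE'] = '64'
#     os.environ['COBALT_PARTNAME'] = 'MIR-04000'
#     sched = new_scheduler()          # from balsam.schedulers.CobaltScheduler
#     sched.current_scheduler_id       # '123456'
#     sched.num_workers                # '64'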
......@@ -69,11 +69,10 @@ def add_job(**kwargs):
return job
def detect_circular(job, path=[]):
if job.pk in path: return True
path = path[:] + [job.pk]
for parent in job.get_parents():
if parent.pk in path:
return True
else:
return detect_circular(parent, path+[parent.pk])
if detect_circular(parent, path): return True
return False
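# Worked example of the traversal above (hypothetical pks): with parent links
# A <- B <- C and A also listing C as a parent, detect_circular(A) recurses
# A -> C -> B -> A; on the second visit to A, A.pk is already in path and True
# propagates back up the recursion. An acyclic graph exhausts every parent
# loop and returns False.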
def add_dependency(parent,child):
......
from collections import defaultdict
from django.conf import settings
import balsam.models
from balsam.models import BalsamJob
import logging
logger = logging.getLogger(__name__)
class JobReader():
'''Interface with BalsamJob DB & pull relevant jobs'''
@staticmethod
......@@ -38,6 +42,7 @@ class FileJobReader(JobReader):
self.jobs = []
self.job_file = job_file
self.pk_list = None
logger.info(f"Taking jobs from file {self.job_file}")
def _get_jobs(self):
if self.pk_list is None:
......@@ -53,6 +58,10 @@ class WFJobReader(JobReader):
def __init__(self, wf_name):
self.jobs = []
self.wf_name = wf_name
if wf_name:
logger.info(f"Consuming jobs from workflow {wf_name}")
else:
logger.info("Consuming all jobs from local DB")
def _get_jobs(self):
objects = BalsamJob.objects
......
'''The Launcher is either invoked by the user, who bypasses the Balsam
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler'''
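# Example invocations (the entry-point path is hypothetical; the flags are the
# ones defined in get_args() below):
#
#     python launcher.py --serial-jobs-per-worker 4 --time-limit-minutes 60
#     python launcher.py --daemon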
import os
import django
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
import argparse
import os
from sys import exit
import signal
import time
import django
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
from django.conf import settings
from balsam import scheduler
from balsam.launcher import jobreader
from balsam.launcher import transitions
from balsam.launcher import worker
from balsam.launcher import runners
from balsam.launcher.exceptions import *
import logging
logger = logging.getLogger('balsamlauncher')
logger.info("Loading Balsam Launcher")
from balsam.schedulers import Scheduler
scheduler = Scheduler.scheduler_main
from balsamlauncher import jobreader
from balsamlauncher import transitions
from balsamlauncher import worker
from balsamlauncher import runners
from balsamlauncher.exceptions import *
START_TIME = time.time() + 10.0
RUNNABLE_STATES = ['PREPROCESSED', 'RESTART_READY']
def delay(period=settings.BALSAM_SERVICE_PERIOD):
......@@ -35,84 +39,20 @@ def delay(period=settings.BALSAM_SERVICE_PERIOD):
nexttime = now + tosleep + period
yield
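# The hunk above elides the top of delay(); a minimal sketch of a
# drift-correcting generator consistent with the visible tail (an assumed
# reconstruction, not the verbatim original):
#
#     def delay(period=settings.BALSAM_SERVICE_PERIOD):
#         nexttime = time.time() + period
#         while True:
#             now = time.time()
#             tosleep = nexttime - now
#             if tosleep <= 0:
#                 nexttime = now + period
#             else:
#                 time.sleep(tosleep)
#                 nexttime = now + tosleep + period
#             yield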
def sufficient_time(job):
return 60*job.wall_time_minutes < scheduler.remaining_time_seconds()
class HostEnvironment:
'''Set user- and environment-specific settings for this run'''
RECOGNIZED_HOSTS = {
'BGQ' : 'vesta cetus mira'.split(),
'CRAY' : 'theta'.split(),
}
def __init__(self, args):
self.hostname = None
self.host_type = None
self.scheduler_id = None
self.pid = None
self.num_nodes = None
self.partition = None
self.walltime_seconds = None
self.num_workers = args.num_workers
self.ranks_per_worker_serial = args.serial_jobs_per_worker
self.walltime_minutes = args.time_limit_minutes
self.set_hostname_and_type()
self.query_scheduler()
if self.walltime_minutes is not None:
self.walltime_seconds = self.walltime_minutes * 60
def set_hostname_and_type(self):
from socket import gethostname
self.pid = os.getpid()
hostname = gethostname().lower()
self.hostname = hostname
        for host_type, known_names in self.RECOGNIZED_HOSTS.items():
if any(hostname.find(name) >= 0 for name in known_names):
self.host_type = host_type
return
self.host_type = 'DEFAULT'
def query_scheduler(self):
if not scheduler.scheduler_class: return
env = scheduler.get_environ()
self.scheduler_id = env.id
self.num_nodes = env.num_nodes
self.partition = env.partition
info = scheduler.get_job_status(self.scheduler_id)
self.walltime_seconds = info['walltime_sec']
def elapsed_time_seconds(self):
return time.time() - START_TIME
def remaining_time_seconds(self):
if self.walltime_seconds:
            elapsed = self.elapsed_time_seconds()
return self.walltime_seconds - elapsed
else:
return float("inf")
def sufficient_time(self, job):
return 60*job.wall_time_minutes < self.remaining_time_seconds()
def check_timeout(self):
if self.remaining_time_seconds() < 1.0:
return True
return False
def get_runnable_jobs(jobs, running_pks, host_env):
def get_runnable_jobs(jobs, running_pks):
runnable_jobs = [job for job in jobs
if job.pk not in running_pks and
job.state in RUNNABLE_STATES and
host_env.sufficient_time(job)]
sufficient_time(job)]
return runnable_jobs
def create_new_runners(jobs, runner_group, worker_group, host_env):
def create_new_runners(jobs, runner_group, worker_group):
created_one = False
running_pks = runner_group.running_job_pks
runnable_jobs = get_runnable_jobs(jobs, running_pks, host_env)
runnable_jobs = get_runnable_jobs(jobs, running_pks)
while runnable_jobs:
try:
runner_group.create_next_runner(runnable_jobs, worker_group)
......@@ -121,35 +61,34 @@ def create_new_runners(jobs, runner_group, worker_group, host_env):
else:
created_one = True
running_pks = runner_group.running_job_pks
runnable_jobs = get_runnable_jobs(jobs, running_pks, host_env)
runnable_jobs = get_runnable_jobs(jobs, running_pks)
return created_one
def main(args, transition_pool, runner_group, job_source):
host_env = HostEnvironment(args)
worker_group = worker.WorkerGroup(host_env)
delay_timer = delay()
# Main Launcher Service Loop
while not host_env.check_timeout():
    while scheduler.remaining_time_seconds() > 0.0:
logger.debug("Begin launcher service loop")
wait = True
for stat in transitions_pool.get_statuses():
logger.debug(f'Transition: {stat.pk} {stat.state}: {stat.msg}')
for stat in transition_pool.get_statuses():
logger.info(f'[{str(stat.pk)[:8]}] transitioned to {stat.state}: {stat.msg}')
wait = False
job_source.refresh_from_db()
transitionable_jobs = [
job for job in job_source.jobs
if job not in transitions_pool
and job.state in transitions_pool.TRANSITIONS
if job not in transition_pool
and job.state in transitions.TRANSITIONS
]
for job in transitionable_jobs:
transition_pool.add_job(job)
wait = False
logger.info(f"Queued trans: {job.cute_id} in state {job.state}")
any_finished = runner_group.update_and_remove_finished()
created = create_new_runners(job_source.jobs, runner_group, worker_group, host_env)
created = create_new_runners(job_source.jobs, runner_group, worker_group)
if any_finished or created: wait = False
if wait: next(delay_timer)
......@@ -166,6 +105,7 @@ def on_exit(runner_group, transition_pool, job_source):
transition_pool.add_job(job)
transition_pool.end_and_wait()
logger.debug("Launcher exit graceful\n\n")
exit(0)
......@@ -182,12 +122,13 @@ def get_args():
parser.add_argument('--serial-jobs-per-worker', type=int, default=4,
help="For non-MPI jobs, how many to pack per worker")
parser.add_argument('--time-limit-minutes', type=int,
help="Override auto-detected walltime limit (runs"
" forever if no limit is detected or specified)")
help="Provide a walltime limit if not already imposed")
parser.add_argument('--daemon', action='store_true')
return parser.parse_args()
def detect_dead_runners(job_source):
for job in job_source.by_states['RUNNING']:
logger.info(f'Picked up running job {job.cute_id}: marking RESTART_READY')
job.update_state('RESTART_READY', 'Detected dead runner')
if __name__ == "__main__":
......@@ -197,6 +138,8 @@ if __name__ == "__main__":
job_source.refresh_from_db()
runner_group = runners.RunnerGroup()
transition_pool = transitions.TransitionProcessPool()
worker_group = worker.WorkerGroup(args, host_type=scheduler.host_type,
workers_str=scheduler.workers_str)
detect_dead_runners(job_source)
......
......@@ -4,8 +4,8 @@ import sys
from subprocess import Popen, STDOUT
from mpi4py import MPI
from balsam.launcher.cd import cd
from balsam.launcher.exceptions import *
import balsamlauncher.cd
from balsamlauncher.exceptions import *
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
......
......@@ -19,10 +19,13 @@ from django.conf import settings
from django.db import transaction
import balsam.models
from balsam.launcher import mpi_commands
from balsam.launcher import mpi_ensemble
from balsam.launcher.exceptions import *
from balsam.launcher.cd import cd
from balsamlauncher import mpi_commands
from balsamlauncher import mpi_ensemble
from balsamlauncher.exceptions import *
from balsamlauncher import cd
import logging