GitLab maintenance scheduled form Friday, 2021-06-18 5:00pm to Satursday, 2021-06-19 10:00pm CT - Services will be unavailable during this time.

Commit ab05fbd8 authored by Michael Salim's avatar Michael Salim

major reorganization; added setuptools and sphinx docs folder

parent 75d8ac3b
......@@ -8,6 +8,8 @@ balsamjobs
include docs
# HPC Edge Service and Workflow Management System
**Authors:** J. Taylor Childers (Argonne National Laboratory), Tom Uram (Argonne National Laboratory), Doug Benjamin (Duke University), Misha Salim (Argonne National Laboratory)
An HPC Edge Service to manage remote job submission. The goal of this service is to provide a secure interface for submitting jobs to large computing resources.
# Prerequisites
The Argo and Balsam services require Python 3.6, mpi4py, Django, and django-concurrency.
import common.Serializer as Serializer
import balsam.common.Serializer as Serializer
class ArgoJobStatus:
def __init__(self):
......@@ -13,4 +13,4 @@ class ArgoJobStatus:
def get_from_message(message):
tmp = ArgoJobStatus()
tmp.__dict__ = Serializer.deserialize(message)
return tmp
\ No newline at end of file
return tmp
......@@ -6,9 +6,9 @@ from django.db.utils import load_backend
from django.conf import settings
from common import MessageReceiver
from argo import QueueMessage
from argo.models import ArgoJob,ArgoSubJob,BALSAM_JOB_TO_SUBJOB_STATE_MAP
from balsam import BalsamJobStatus,models
from balsam.argo import QueueMessage
from balsam.argo.models import ArgoJob,ArgoSubJob,BALSAM_JOB_TO_SUBJOB_STATE_MAP
from balsam.service import BalsamJobStatus,models
class JobStatusReceiver(MessageReceiver.MessageReceiver):
''' subscribes to the balsam job status queue and updates a job state '''
......@@ -5,9 +5,9 @@ from django.db import connections,DEFAULT_DB_ALIAS
from django.db.utils import load_backend
from django.conf import settings
from argo import models,QueueMessage
from common import db_tools
from common import MessageReceiver,Serializer
from balsam.argo import models,QueueMessage
from balsam.common import db_tools
from balsam.common import MessageReceiver,Serializer
def CreateWorkingPath(job_id):
path = os.path.join(settings.ARGO_WORK_DIRECTORY,str(job_id))
......@@ -4,4 +4,4 @@ from django.apps import AppConfig
class ArgoCoreConfig(AppConfig):
name = 'argo'
name = 'balsam.argo'
......@@ -18,7 +18,7 @@ import warnings
from django import forms
from django.forms.widgets import CheckboxInput
from argo import models
from balsam.argo import models
import logging
logger = logging.getLogger(__name__)
......@@ -9,11 +9,11 @@ from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from django.core.validators import validate_comma_separated_integer_list
from argo import QueueMessage,ArgoJobStatus
from common import log_uncaught_exceptions,MessageInterface
from common import Serializer,transfer,Mail,db_tools
from balsam.models import BalsamJob
from balsam.models import STATES_BY_NAME as BALSAM_STATES_BY_NAME
from balsam.argo import QueueMessage,ArgoJobStatus
from balsam.common import log_uncaught_exceptions,MessageInterface
from balsam.common import Serializer,transfer,Mail,db_tools
from balsam.service.models import BalsamJob
from balsam.service.models import STATES_BY_NAME as BALSAM_STATES_BY_NAME
# assign this function to the system exception hook
sys.excepthook = log_uncaught_exceptions.log_uncaught_exceptions
......@@ -185,7 +185,7 @@ def send_status_message(job,message=None):
# ------------ Job States ----------------------------
from common.JobState import JobState
from balsam.common.JobState import JobState
# Job States
......@@ -2,7 +2,7 @@ from django.conf.urls import url
import logging
logger = logging.getLogger(__name__)
from argo import views
from balsam.argo import views
local_urlpatterns = [
url(r'^$', views.index, name='index'),
import os,logging,sys
from django.shortcuts import render,get_object_or_404
from argo.html_forms import JobDisplayForm
from balsam.argo.html_forms import JobDisplayForm
logger = logging.getLogger(__name__)
from argo import models
from balsam.argo import models
# Create your views here.
from common.file_tools import delete_old_files_directories
from balsam.common.file_tools import delete_old_files_directories
import time
class DirCleaner:
from common import PikaMessageInterface, NoMessageInterface
from balsam.common import PikaMessageInterface, NoMessageInterface
from django.conf import settings
import logging,sys,multiprocessing,time,os
logger = logging.getLogger(__name__)
......@@ -3,7 +3,7 @@ import time
import threading
logger = logging.getLogger(__name__)
from common import MessageInterface
from balsam.common import MessageInterface
class NoMessageInterface(MessageInterface.MessageInterface):
......@@ -5,7 +5,7 @@ import logging
logger = logging.getLogger(__name__)
from common import MessageInterface
from balsam.common import MessageInterface
class PikaMessageInterface(MessageInterface.MessageInterface):
......@@ -2,8 +2,8 @@ import multiprocessing,logging
logger = logging.getLogger(__name__)
from django.db import utils,connections,DEFAULT_DB_ALIAS
from balsam import QueueMessage
from common import db_tools
from balsam.service import QueueMessage
from balsam.common import db_tools
class TransitionJob(multiprocessing.Process):
''' spawns subprocess which finds the DB entry for the given id
......@@ -12,10 +12,7 @@
import os,logging
logger = logging.getLogger(__name__)'here')
from user_settings import *
from balsam.user_settings import *
......@@ -38,7 +35,7 @@ ALLOWED_HOSTS = []
# Application definition
......@@ -58,7 +55,7 @@ MIDDLEWARE_CLASSES = [
ROOT_URLCONF = 'argobalsam.urls'
ROOT_URLCONF = 'balsam.django_config.urls'
......@@ -76,7 +73,7 @@ TEMPLATES = [
WSGI_APPLICATION = 'argobalsam.wsgi.application'
WSGI_APPLICATION = 'balsam.django_config.wsgi.application'
......@@ -17,6 +17,5 @@ from django.conf.urls import url,include
from django.contrib import admin
urlpatterns = [
......@@ -11,6 +11,6 @@ import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "argobalsam.settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "balsam.django_config.settings")
application = get_wsgi_application()
'''Python API for Balsam DAG Manipulations
Example usage:
>>> import balsamlauncher.dag as dag
>>> import launcher.dag as dag
>>> output = open('expected_output').read()
......@@ -18,8 +18,8 @@ Example usage:
import django as django
import os as os
import django
import os
import uuid
__all__ = ['JOB_ID', 'TIMEOUT', 'ERROR',
......@@ -27,10 +27,10 @@ __all__ = ['JOB_ID', 'TIMEOUT', 'ERROR',
'add_job', 'add_dependency', 'spawn_child',
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
from balsam.models import BalsamJob as _BalsamJob
from balsam.service.models import BalsamJob as _BalsamJob
from django.conf import settings
current_job = None
from collections import defaultdict
from django.conf import settings
import balsam.models
from balsam.models import BalsamJob
from balsam.service import models
BalsamJob = models.BalsamJob
import logging
import uuid
......@@ -32,7 +32,7 @@ class JobReader():
def _get_jobs(self): raise NotImplementedError
def _filter(self, job_queryset):
jobs = job_queryset.exclude(state__in=balsam.models.END_STATES)
jobs = job_queryset.exclude(state__in=models.END_STATES)
jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
return jobs
......@@ -2,33 +2,36 @@
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler'''
import argparse
from math import floor
import os
from sys import exit
import signal
import time
import django
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
from django.conf import settings
import logging
logger = logging.getLogger('balsamlauncher')
logger = logging.getLogger('balsam.launcher')"Loading Balsam Launcher")
from balsam.schedulers import Scheduler
from balsam.service.schedulers import Scheduler
scheduler = Scheduler.scheduler_main
from balsamlauncher import jobreader
from balsamlauncher import transitions
from balsamlauncher import worker
from balsamlauncher import runners
from balsamlauncher.exceptions import *
from balsam.launcher import jobreader
from balsam.launcher import transitions
from balsam.launcher import worker
from balsam.launcher import runners
from balsam.launcher.exceptions import *
def delay(period=settings.BALSAM_SERVICE_PERIOD):
def delay_generator(period=settings.BALSAM_SERVICE_PERIOD):
nexttime = time.time() + period
while True:
now = time.time()
......@@ -45,37 +48,15 @@ def elapsed_time_minutes():
while True:
yield (time.time() - start) / 60.0
def sufficient_time(job):
return 60*job.wall_time_minutes < scheduler.remaining_time_seconds()
def get_runnable_jobs(jobs, running_pks):
runnable_jobs = [job for job in jobs
if not in running_pks and
job.state in RUNNABLE_STATES and
return runnable_jobs
def create_new_runners(jobs, runner_group, worker_group):
created_one = False
running_pks = runner_group.running_job_pks
runnable_jobs = get_runnable_jobs(jobs, running_pks)
while runnable_jobs:
logger.debug(f"Have {len(runnable_jobs)} new runnable jobs (out of "
runner_group.create_next_runner(runnable_jobs, worker_group)
except ExceededMaxRunners:"Exceeded max concurrent runners; waiting")
except NoAvailableWorkers:"Not enough idle workers to start any new runs")
def remaining_time_minutes(time_limit_minutes=0.0):
elapsed_timer = elapsed_time_minutes()
while True:
if time_limit_minutes > 0.0:
remaining = time_limit_minutes - next(elapsed_timer)
created_one = True
running_pks = runner_group.running_job_pks
runnable_jobs = get_runnable_jobs(jobs, running_pks)
return created_one
remaining = scheduler.remaining_time_seconds() / 60.0
if remaining > 0: yield remaining
else: break
def check_parents(job, lock):
......@@ -92,53 +73,85 @@ def check_parents(job, lock):
lock.release()'{job.cute_id} waiting for parents')
def log_time(minutes_left):
if minutes_left > 1e12:
whole_minutes = floor(minutes_left)
whole_seconds = round((minutes_left - whole_minutes)*60)
time_str = f"{whole_minutes:02d} min : {whole_seconds:02d} sec remaining"
def main(args, transition_pool, runner_group, job_source):
delay_timer = delay()
elapsed_min = elapsed_time_minutes()
logger.debug(f"time limit provided {args.time_limit_minutes}")
last_created = 0.0
if args.time_limit_minutes > 0:
def timeout():
elapsed = next(elapsed_min)
logger.debug(f"{elapsed} minutes elapsed out of {args.time_limit_minutes}")
return elapsed >= args.time_limit_minutes
timeout = lambda : scheduler.remaining_time_seconds() <= 0.0
while not timeout():
delay_sleeper = delay_generator()
runner_create_period = settings.BALSAM_RUNNER_CREATION_PERIOD_SEC
last_runner_created = time.time()
remaining_timer = remaining_time_minutes(args.time_limit_minutes)
for remaining_minutes in remaining_timer:"\n******************\n"
wait = True
for stat in transition_pool.get_statuses(): wait = False
delay = True
# Update after any finished transitions
for stat in transition_pool.get_statuses(): delay = False
waiting_jobs = (j for j in if
for job in waiting_jobs: check_parents(job, transition_pool.lock)
# Update jobs awaiting dependencies
waiting_jobs = (j for j in if j.state in WAITING_STATES)
for job in waiting_jobs:
check_parents(job, transition_pool.lock)
# Enqueue new transitions
transitionable_jobs = [
job for job in
if job not in transition_pool
and job.state in transitions.TRANSITIONS
for job in transitionable_jobs:
wait = False
delay = False
fxn = transitions.TRANSITIONS[job.state]"Queued transition: {job.cute_id} will undergo {fxn}")
# Update jobs that are running/finished
any_finished = runner_group.update_and_remove_finished()
if any_finished: wait = False
if any_finished: delay = False
if time.time() - last_created > 5:
created = create_new_runners(, runner_group, worker_group)
if created:
last_created = time.time()
wait = False
if wait: next(delay_timer)
# Decide whether or not to start a new runner
runnable_jobs = [
job for job in
if not in runner_group.running_job_pks and
job.state in RUNNABLE_STATES and
job.wall_time_minutes <= remaining_minutes
logger.debug(f"Have {len(runnable_jobs)} runnable jobs")
almost_runnable = any(j.state in ALMOST_RUNNABLE_STATES for j in
now = time.time()
runner_ready = bool(now - last_runner_created > runner_create_period)
num_serial = len([j for j in runnable_jobs if j.num_ranks == 1])
worker = worker_group[0]
max_serial_per_ensemble = 2 * worker.num_nodes * worker.max_ranks_per_node
ensemble_ready = (num_serial >= max_serial_per_ensemble) or (num_serial == 0)
if runnable_jobs:
if runner_ready or not almost_runnable or ensemble_ready:
runner_group.create_next_runner(runnable_jobs, worker_group)
except ExceededMaxRunners:"Exceeded max concurrent runners; waiting")
except NoAvailableWorkers:"Not enough idle workers to start any new runs")
last_runner_created = now
if delay: next(delay_sleeper)
def on_exit(runner_group, transition_pool, job_source):
......@@ -146,9 +159,6 @@ def on_exit(runner_group, transition_pool, job_source):
logger.debug("Entering on_exit cleanup function")
logger.debug("on_exit: flush job queue")
logger.debug("on_exit: update/remove/timeout jobs from runner group")
......@@ -181,7 +191,7 @@ def get_args(inputcmd=None):
def detect_dead_runners(job_source):
for job in job_source.by_states['RUNNING']:'Picked up running job {job.cute_id}: marking RESTART_READY')'Picked up dead running job {job.cute_id}: marking RESTART_READY')
job.update_state('RESTART_READY', 'Detected dead runner')
if __name__ == "__main__":
......@@ -5,18 +5,17 @@ import logging
import django
import signal
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
logger = logging.getLogger('balsamlauncher.mpi_ensemble')
logger = logging.getLogger('balsam.launcher.mpi_ensemble')
from subprocess import Popen, STDOUT
from mpi4py import MPI
from balsamlauncher.util import cd, get_tail
from balsamlauncher.exceptions import *
from balsam.models import BalsamJob
from balsam.launcher.util import cd, get_tail
from balsam.launcher.exceptions import *
from balsam.service.models import BalsamJob
RANK = COMM.Get_rank()
......@@ -24,16 +24,16 @@ from queue import Queue, Empty
from django.conf import settings
from django.db import transaction
from balsam.models import InvalidStateError
from balsamlauncher import mpi_commands
from balsamlauncher.exceptions import *
from balsamlauncher.util import cd, get_tail
from balsam.service.models import InvalidStateError
from balsam.launcher import mpi_commands
from balsam.launcher.exceptions import *
from balsam.launcher.util import cd, get_tail
import logging
logger = logging.getLogger(__name__)
from importlib.util import find_spec
MPI_ENSEMBLE_EXE = find_spec("balsamlauncher.mpi_ensemble").origin
MPI_ENSEMBLE_EXE = find_spec("balsam.launcher.mpi_ensemble").origin
class MonitorStream(Thread):
......@@ -266,7 +266,7 @@ class RunnerGroup:
# If there are not enough serial jobs; run the larger of:
# largest MPI job that fits, or the remaining serial jobs
if nserial >= nidle_ranks:
jobs = serial_jobs[:nidle_ranks] # TODO: try putting ALL serial jobs into one MPIEnsemble
jobs = serial_jobs[:2*nidle_ranks] # TODO:Expt w/ > 2 jobs per worker
assigned_workers = idle_workers
runner_class = MPIEnsembleRunner
msg = (f"Running {len(jobs)} serial jobs on {nidle_workers} workers "
......@@ -295,6 +295,7 @@ class RunnerGroup:
for worker in assigned_workers: worker.idle = False
logger.debug(f"Using workers: {[ for w in assigned_workers]}")
def update_and_remove_finished(self, timeout=False):
# TODO: Benchmark performance overhead; does grouping into one
......@@ -323,6 +324,7 @@ class RunnerGroup:
raise RuntimeError(msg)
logger.debug(f"Freeing workers: {[ for w in runner.worker_list]}")
for worker in runner.worker_list:
worker.idle = True
......@@ -17,13 +17,13 @@ from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings