Commit 65fe3f11 authored by Michael Salim

launcher, added CLI and python API for pre/post

parent 6e22729b
#!/bin/bash
PYTHON="$(which python)"
$PYTHON "$ARGOBALSAM_INSTALL_PATH/balsam/bin/cli.py" "$@"
# These must come before any other imports
import django
import os
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
# --------------
import argparse
import sys
from cli_commands import newapp,newjob,newdep,ls,modify,rm,qsub
from cli_commands import kill,mkchild,launcher,service
from django.conf import settings
def make_parser():
    parser = argparse.ArgumentParser(prog='balsam', description="Balsam command line interface")
    subparsers = parser.add_subparsers(title="Subcommands")

    # Add app
    parser_app = subparsers.add_parser('app',
                                       help="add a new application definition",
                                       description="add a new application definition",
                                       )
    parser_app.set_defaults(func=newapp)
    parser_app.add_argument('-n', '--name', dest='name',
                            help='application name', required=True)
    parser_app.add_argument('-d', '--description', dest='description',
                            help='application description', required=True)
    parser_app.add_argument('-e', '--executable', dest='executable',
                            help='application executable with full path', required=True)
    parser_app.add_argument('-r', '--preprocess', dest='preprocess',
                            help='preprocessing script with full path', default='')
    parser_app.add_argument('-o', '--postprocess', dest='postprocess',
                            help='postprocessing script with full path', default='')

    # Add job
    parser_job = subparsers.add_parser('job',
                                       help="add a new Balsam job",
                                       description="add a new Balsam job",
                                       )
    parser_job.set_defaults(func=newjob)
    parser_job.add_argument('-e', '--name', dest='name', type=str,
                            help='job name', required=True)
    parser_job.add_argument('-d', '--description', dest='description', type=str,
                            help='job description', required=False, default='')
    parser_job.add_argument('-t', '--wall-minutes', dest='wall_time_minutes', type=int,
                            help='estimated job walltime in minutes', required=True)
    parser_job.add_argument('-n', '--num-nodes', dest='num_nodes', type=int,
                            help='number of nodes to use', required=True)
    parser_job.add_argument('-p', '--processes-per-node', dest='processes_per_node', type=int,
                            help='number of processes to run on each node', required=True)
    parser_job.add_argument('-m', '--threads-per-rank', dest='threads_per_rank', type=int,
                            help='number of threads per rank', default=1)
    parser_job.add_argument('--threads-per-core', dest='threads_per_core', type=int,
                            help='number of threads per core', default=1)
    parser_job.add_argument('-a', '--application', dest='application', type=str,
                            help='Name of the application to use; must exist in ApplicationDefinition DB',
                            required=True)
    parser_job.add_argument('-i', '--input-url', dest='input_url', type=str,
                            help='Input URL from which input files are copied.', required=False, default='')
    parser_job.add_argument('-o', '--output-url', dest='output_url', type=str,
                            help='Output URL to which output files are copied.', required=False, default='')
    parser_job.add_argument('-y', dest='yes',
                            help='Skip prompt confirming job details.', required=False, action='store_true')

    # Add dep
    parser_dep = subparsers.add_parser('dep',
                                       help="add a dependency between two existing jobs",
                                       description="add a dependency between two existing jobs"
                                       )
    parser_dep.set_defaults(func=newdep)
    parser_dep.add_argument('parent', help="Parent must be finished before child")
    parser_dep.add_argument('child', help="The dependent job")

    # ls
    parser_ls = subparsers.add_parser('ls', help="list jobs, applications, or jobs-by-workflow")
    parser_ls.set_defaults(func=ls)
    parser_ls.add_argument('object', choices=['jobs', 'apps', 'wf'],
                           help="list all jobs, all apps, or jobs by workflow")

    # modify
    parser_modify = subparsers.add_parser('modify', help="alter job or application")
    parser_modify.set_defaults(func=modify)

    # rm
    parser_rm = subparsers.add_parser('rm', help="remove jobs or applications from the database")
    parser_rm.set_defaults(func=rm)

    # qsub
    parser_qsub = subparsers.add_parser('qsub', help="add a one-line job to the database, bypassing Application table")
    parser_qsub.set_defaults(func=qsub)

    # kill
    parser_kill = subparsers.add_parser('killjob', help="Kill a job without removing it from the DB")
    parser_kill.set_defaults(func=kill)

    # makechild
    parser_mkchild = subparsers.add_parser('mkchild', help="Create a child job of a specified job")
    parser_mkchild.set_defaults(func=mkchild)

    # launcher
    parser_launcher = subparsers.add_parser('launcher', help="Start an instance of the balsam launcher")
    parser_launcher.set_defaults(func=launcher)

    # service
    parser_service = subparsers.add_parser('service',
                                           help="Start an instance of the balsam metascheduler service")
    parser_service.set_defaults(func=service)

    return parser
if __name__ == "__main__":
    parser = make_parser()
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(0)
    args = parser.parse_args()
    args.func(args)
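A minimal sketch exercising the parser above in-process. The argument values are hypothetical, 'my_app' would have to exist in the ApplicationDefinition DB, and newjob is still a stub in cli_commands.py:

parser = make_parser()
args = parser.parse_args(['job',
                          '--name', 'test_job',
                          '--wall-minutes', '30',
                          '--num-nodes', '1',
                          '--processes-per-node', '2',
                          '--application', 'my_app',
                          '-y'])
args.func(args)   # dispatches to cli_commands.newjob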
# These statements must come before any other imports
#import django
import os
#os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
#django.setup()
# --------------------
from django.conf import settings
import balsam.models
BalsamJob = balsam.models.BalsamJob
def newapp(args):
    pass

def newjob(args):
    pass

def newdep(args):
    pass

def ls(args):
    pass

def modify(args):
    pass

def rm(args):
    pass

def qsub(args):
    pass

def kill(args):
    pass

def mkchild(args):
    pass

def launcher(args):
    pass

def service(args):
    pass
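A hedged sketch of how the newjob stub above might be filled in; the BalsamJob field names are assumed to mirror the CLI argument dests and are not confirmed by this diff:

def newjob_sketch(args):
    # Hypothetical implementation: copy the parsed CLI arguments onto a new
    # BalsamJob record (field names are assumptions), confirm, then save.
    job = BalsamJob()
    for field in ('name', 'description', 'wall_time_minutes', 'num_nodes',
                  'processes_per_node', 'threads_per_rank', 'threads_per_core',
                  'application'):
        setattr(job, field, getattr(args, field))
    if not args.yes:
        print(job)
        if input("Add this job to the DB? [y/n] ").strip().lower() != 'y':
            print("Job not added")
            return
    job.save()
    return job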
'''Python API for Balsam DAG Manipulations

Example usage:
    >>> import balsam.dag as dag
    >>>
    >>> output = open('expected_output').read()
    >>>
    >>> if 'CONVERGED' not in output:
    ...     for child in dag.children:
    ...         dag.kill(child, recursive=True)
    >>>
    >>> with open("data/input.dat", 'w') as fp:
    ...     fp.write("# a new input file here")
    >>>
    >>> dag.spawn_child(clone=dag.current_job,
    ...                 walltime_minutes=dag.current_job.walltime_minutes + 10,
    ...                 input_files='input.dat')
'''
import django as _django
import os as _os
import uuid

__all__ = ['JOB_ID', 'TIMEOUT', 'ERROR',
           'current_job', 'parents', 'children',
           'add_job', 'add_dependency', 'spawn_child',
           'kill']

_os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
_django.setup()

from django.conf import settings
from balsam.models import BalsamJob as _BalsamJob

_envs = {k: v for k, v in _os.environ.items() if 'BALSAM' in k}
current_job = None
parents = None
children = None

JOB_ID = _envs.get('BALSAM_JOB_ID', '')
TIMEOUT = _envs.get('BALSAM_JOB_TIMEOUT', '').upper() == 'TRUE'
ERROR = _envs.get('BALSAM_JOB_ERROR', '').upper() == 'TRUE'

if JOB_ID:
    JOB_ID = uuid.UUID(JOB_ID)
    current_job = _BalsamJob.objects.get(pk=JOB_ID)
    parents = current_job.get_parents()
    children = current_job.get_children()
def add_job(**kwargs):
    '''Add a new job to the BalsamJob DB'''
    job = _BalsamJob()
    for k, v in kwargs.items():
        try:
            getattr(job, k)
        except AttributeError:
            raise
        else:
            setattr(job, k, v)
    job.save()
    return job
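
# A hedged usage sketch (the field names are assumed to mirror the CLI
# argument dests in cli.py and are not confirmed by this diff):
#     job = add_job(name='sim_01', application='my_app', num_nodes=1,
#                   processes_per_node=4, wall_time_minutes=30)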
def add_dependency(parent, child):
    '''Create a dependency between two existing jobs'''
    if isinstance(parent, str): parent = uuid.UUID(parent)
    if isinstance(child, str): child = uuid.UUID(child)
    if not isinstance(parent, _BalsamJob):
        parent = _BalsamJob.objects.get(pk=parent)
    if not isinstance(child, _BalsamJob):
        child = _BalsamJob.objects.get(pk=child)
    new_parents = child.get_parents_by_id()
    new_parents.append(str(parent.pk))
    child.set_parents(new_parents)
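
# Hedged usage note: add_dependency accepts either BalsamJob instances or
# their UUID primary keys as strings, e.g.
#     add_dependency(parent=current_job, child=new_job)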
def spawn_child(**kwargs):
    '''Add new job that is dependent on the current job'''
    child = add_job(**kwargs)
    add_dependency(current_job, child)
    return child

def kill(job, recursive=False):
    '''Kill a job or its entire subtree in the DAG'''
    job.update_state('USER_KILLED')
    if recursive:
        for child in job.get_children():
            kill(child, recursive=True)
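A hedged sketch of a post-processing script built on this API, reacting to the TIMEOUT and ERROR flags that the launcher exports. The job field names (name, application, num_nodes, processes_per_node, wall_time_minutes) are assumed to mirror the CLI argument dests and are not confirmed by this diff:

import balsam.dag as dag

if dag.TIMEOUT:
    # Hypothetical retry: resubmit the essentials of the current job as a
    # child with a doubled walltime.
    dag.spawn_child(name=dag.current_job.name + '_retry',
                    application=dag.current_job.application,
                    num_nodes=dag.current_job.num_nodes,
                    processes_per_node=dag.current_job.processes_per_node,
                    wall_time_minutes=2 * dag.current_job.wall_time_minutes)
elif dag.ERROR:
    # Give up on the dependent work: kill the whole subtree.
    for child in dag.children:
        dag.kill(child, recursive=True)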
@@ -145,7 +145,7 @@ class BalsamJob(models.Model):
stage_in_url = models.TextField(
'External stage in files or folders', help_text="A list of URLs for external data to be staged in prior to job processing. Job dataflow from parents to children is NOT handled here; see `input_files` field instead.",
default='')
stage_out_files = models.TextField
stage_out_files = models.TextField(
'External stage out files or folders',
help_text="A string of filename patterns. Matches will be transferred to the stage_out_url. Default: no files are staged out",
default='')
@@ -329,9 +329,9 @@ application: {self.application}
BALSAM_JOB_ID=str(self.pk),
BALSAM_PARENT_IDS=str(self.parents),
BALSAM_CHILD_IDS=children,
BALSAM_JOB_TIMEOUT=str(timeout),
BALSAM_JOB_ERROR=str(error)
)
if timeout: balsam_envs['BALSAM_JOB_TIMEOUT']="TRUE"
if error: balsam_envs['BALSAM_JOB_ERROR']="TRUE"
envs.update(balsam_envs)
return envs
#!/usr/bin/env bash
#source /gpfs/mira-home/msalim/argobalsam/env/bin/activate
source env/bin/activate
export ARGOBALSAM_INSTALL_PATH=$(pwd)
export PYTHONPATH=$ARGOBALSAM_INSTALL_PATH:$PYTHONPATH
export PATH=$ARGOBALSAM_INSTALL_PATH/balsam/bin:$PATH
export ARGOBALSAM_DATA_PATH=$ARGOBALSAM_INSTALL_PATH/data
export ARGOBALSAM_EXE_PATH=$ARGOBALSAM_INSTALL_PATH/exe
@@ -157,7 +157,7 @@ if 'argo_service' in sys.argv:
LOG_HANDLER_FILENAME = ARGO_SERVICE_LOG_FILENAME
elif 'balsam_service' in sys.argv:
LOG_HANDLER_FILENAME = BALSAM_SERVICE_LOG_FILENAME
print('logging to ' + str(LOG_HANDLER_FILENAME))
#print('logging to ' + str(LOG_HANDLER_FILENAME))
LOGGING = {
'version': 1,