Commit 2a3e4f59 authored by Michael Salim

Finished basic CLI tools

parent d88268d2
......@@ -24,7 +24,8 @@ def make_parser():
subparsers = parser.add_subparsers(title="Subcommands")
# Add app
# ADD APP
# --------
parser_app = subparsers.add_parser('app',
help="add a new application definition",
description="add a new application definition",
......@@ -42,8 +43,10 @@ def make_parser():
help="Environment variables specific "
"to this app; specify multiple envs like "
"'--env VAR1=VAL1 --env VAR2=VAL2'. ")
# -------------------------------------------------------------------
# Add job
# ADD JOB
# -------
BALSAM_SITE = settings.BALSAM_SITE
parser_job = subparsers.add_parser('job',
......@@ -118,8 +121,11 @@ def make_parser():
parser_job.add_argument('--yes', help='Skip prompt confirming job details.',
required=False,action='store_true')
#--------------------------------------------------------------------
# Add dep
# ADD DEP
# -------
parser_dep = subparsers.add_parser('dep',
help="add a dependency between two existing jobs",
description="add a dependency between two existing jobs"
......@@ -127,8 +133,11 @@ def make_parser():
parser_dep.set_defaults(func=newdep)
parser_dep.add_argument('parent', help="Parent must be finished before child")
parser_dep.add_argument('child', help="The dependent job")
#-------------------------------------------------------------------------
# ls
# LS
# ----
parser_ls = subparsers.add_parser('ls', help="list jobs, applications, or jobs-by-workflow")
parser_ls.set_defaults(func=ls)
parser_ls.add_argument('objects', choices=['jobs', 'apps', 'wf'], default='jobs',
......@@ -139,22 +148,37 @@ def make_parser():
parser_ls.add_argument('--wf', help="Filter jobs matching a workflow")
parser_ls.add_argument('--verbose', action='store_true')
parser_ls.add_argument('--tree', action='store_true', help="show DAG in tree format")
# -----------------------------------------------------------
# modify
# MODIFY
# ------
parser_modify = subparsers.add_parser('modify', help="alter job or application")
parser_modify.set_defaults(func=modify)
parser_modify.add_argument('obj_type', choices=['jobs', 'apps'])
parser_modify.add_argument('id', help="substring of job or app ID to match")
parser_modify.add_argument('--attr', help="attribute of job or app to modify",
required=True)
parser_modify.add_argument('--value', help="modify attr to this value",
required=True)
# -----------------------------------------------------------------------
# rm
# RM
# --
parser_rm = subparsers.add_parser('rm', help="remove jobs or applications from the database")
parser_rm.set_defaults(func=rm)
parser_rm.add_argument('objects', choices=['jobs', 'apps'], default='jobs',
nargs='?', help="permanently delete jobs or apps from DB")
parser_rm.add_argument('--name', help="match any substring of job name")
parser_rm.add_argument('--id', help="match any substring of job id")
parser_rm.add_argument('--recursive', action='store_true', help="delete all jobs in subtree")
parser_rm.add_argument('--force', action='store_true', help="show DAG in tree format")
# qsub
parser_rm.add_argument('objects', choices=['jobs', 'apps'], help="permanently delete jobs or apps from DB")
parser_rm.add_argument('--force', action='store_true', help="force delete")
group = parser_rm.add_mutually_exclusive_group(required=True)
group.add_argument('--name', help="match any substring of job name")
group.add_argument('--id', help="match any substring of job id")
group.add_argument('--all', action='store_true', help="delete all objects in the DB")
# --------------------------------------------------------------------------------------------------
# QSUB
# ----
parser_qsub = subparsers.add_parser('qsub', help="add a one-line job to the database, bypassing Application table")
parser_qsub.set_defaults(func=qsub)
parser_qsub.add_argument('command', nargs='+')
......@@ -165,14 +189,20 @@ def make_parser():
parser_qsub.add_argument('-d', '--threads-per-rank',type=int, default=1)
parser_qsub.add_argument('-j', '--threads-per-core',type=int, default=1)
parser_qsub.add_argument('--env', action='append', required=False, default=[])
# --------------------------------------------------------------------------------------------------
# kill
# KILL
# ----
parser_kill = subparsers.add_parser('killjob', help="Kill a job without removing it from the DB")
parser_kill.set_defaults(func=kill)
parser_kill.add_argument('--id', required=True)
parser_kill.add_argument('--recursive', action='store_true')
# -----------------------------------------------------
# makechild
# MAKECHILD
# ---------
parser_mkchild = subparsers.add_parser('mkchild', help="Create a child job of a specified job")
parser_mkchild.set_defaults(func=mkchild)
parser_mkchild.add_argument('--name', required=True)
......@@ -241,15 +271,37 @@ def make_parser():
parser_mkchild.add_argument('--yes', help='Skip prompt confirming job details.',
required=False,action='store_true')
# -----------------------------------------------------
# launcher
# LAUNCHER
# --------
parser_launcher = subparsers.add_parser('launcher', help="Start an instance of the balsam launcher")
group = parser_launcher.add_mutually_exclusive_group(required=True)
group.add_argument('--job-file', help="File of Balsam job IDs")
group.add_argument('--consume-all', action='store_true',
help="Continuously run all jobs from DB")
group.add_argument('--wf-name',
help="Continuously run jobs of specified workflow")
parser_launcher.add_argument('--num-workers', type=int, default=1,
help="Theta: defaults to # nodes. BGQ: the # of subblocks")
parser_launcher.add_argument('--serial-jobs-per-worker', type=int, default=4,
help="For non-MPI jobs, how many to pack per worker")
parser_launcher.add_argument('--time-limit-minutes', type=int,
help="Override auto-detected walltime limit (runs"
" forever if no limit is detected or specified)")
parser_launcher.set_defaults(func=launcher)
# -----------------
# service
# SERVICE
# -------
parser_service = subparsers.add_parser('service',
help="Start an instance of the balsam metascheduler service")
parser_service.set_defaults(func=service)
# -------------------------
return parser
......
......@@ -3,6 +3,8 @@ from django.conf import settings
import balsam.models
from balsam import dag
import ls_commands as lscmd
import subprocess
import sys
Job = balsam.models.BalsamJob
AppDef = balsam.models.ApplicationDefinition
......@@ -115,10 +117,65 @@ def ls(args):
lscmd.ls_wf(name, verbose, tree, wf)
def _coerce_like(current, raw):
    '''Convert the command-line string `raw` to the type of `current`.'''
    if current is None:
        # There is no existing value to infer a type from, and
        # NoneType(raw) would raise; keep the raw string.
        return raw
    if isinstance(current, bool):
        # bool("False") is True, so the text has to be parsed explicitly.
        lowered = raw.strip().lower()
        if lowered in ('true', 't', 'yes', 'y', '1'):
            return True
        if lowered in ('false', 'f', 'no', 'n', '0'):
            return False
        raise ValueError(f"cannot interpret {raw!r} as a boolean")
    return type(current)(raw)


def modify(args):
    '''Set one attribute on the single job or app whose ID contains args.id.

    Reads args.obj_type ('jobs' or 'apps'), args.id (substring of the
    primary key), args.attr (attribute name) and args.value (new value as
    a string). The new value is converted to the type of the attribute's
    current value before it is saved.

    Raises RuntimeError if the object type is unknown, or if zero or more
    than one object matches the ID substring.
    '''
    if args.obj_type == 'jobs':
        cls = Job
    elif args.obj_type == 'apps':
        cls = AppDef
    else:
        raise RuntimeError(f"unknown object type {args.obj_type}")

    matches = cls.objects.filter(pk__contains=args.id)
    num_matches = matches.count()  # query the count once
    if num_matches == 0:
        raise RuntimeError(f"no matching {args.obj_type}")
    if num_matches > 1:
        raise RuntimeError(f"more than one matching {args.obj_type}")

    item = matches.first()
    new_value = _coerce_like(getattr(item, args.attr), args.value)
    setattr(item, args.attr, new_value)
    item.save()
    # obj_type is plural ('jobs'/'apps'); drop the trailing 's' for display.
    print(f'{args.obj_type[:-1]} {args.attr} changed to: {new_value}')
def rm(args):
    '''Permanently delete jobs or applications selected on the command line.

    Reads args.objects (object type name starting with 'job' or 'app'),
    and one selector out of args.all (everything), args.name
    (case-insensitive substring of the name; may match many) or args.id
    (case-insensitive substring of the primary key; must match exactly
    one). Unless args.force is set, the user is asked to confirm first.

    Raises RuntimeError if the object type is unknown, if no selector is
    given, or if the ID substring matches zero or several objects.
    '''
    objects_name = args.objects

    # Are we removing jobs or apps?
    if objects_name.startswith('job'):
        cls = Job
    elif objects_name.startswith('app'):
        cls = AppDef
    else:
        raise RuntimeError(f"Cannot remove unknown object type {objects_name}")
    objects = cls.objects.all()

    # Filter: all objects, by name-match (multiple), or by ID (unique)?
    if args.all:
        deletion_objs = objects
        message = f"ALL {objects_name}"
    elif args.name:
        deletion_objs = objects.filter(name__icontains=args.name)
        num_matches = deletion_objs.count()
        if num_matches == 0:
            print(f"No {objects_name} matching query")
            return
        message = f"{num_matches} {objects_name} matching name {args.name}"
    elif args.id:
        deletion_objs = objects.filter(pk__icontains=args.id)
        num_matches = deletion_objs.count()
        if num_matches > 1:
            raise RuntimeError(f"Multiple {objects_name} match ID")
        if num_matches == 0:
            raise RuntimeError(f"No {objects_name} match ID")
        message = f"{objects_name[:-1]} with ID matching {args.id}"
    else:
        raise RuntimeError("Specify one of --name, --id, or --all")

    # User confirmation
    if not args.force and not cmd_confirmation(f"PERMANENTLY remove {message}?"):
        print("Delete aborted")
        return

    # Actually delete things here
    for obj in deletion_objs:
        # Django clears the instance's pk on delete, so read it first.
        short_id = str(obj.pk)[:8]
        obj.delete()
        print(f"Deleted {objects_name[:-1]} {short_id}")
def qsub(args):
job = Job()
......@@ -176,7 +233,13 @@ def mkchild(args):
f"[{str(child_job.job_id)[:8]}]")
def launcher(args):
    '''Start the Balsam launcher script in a child process, then exit.

    The parsed `args` are not used directly; everything after the
    subcommand on the original command line (sys.argv[2:]) is passed
    through to the launcher script unchanged.
    '''
    from balsam.launcher import launcher as launcher_script

    # Run the launcher module's file with the current interpreter.
    command = [sys.executable, launcher_script.__file__, *sys.argv[2:]]
    print("Starting Balsam launcher")
    subprocess.Popen(command)
    sys.exit(0)
def service(args):
    '''Placeholder for the metascheduler service; only prints a notice.'''
    notice = "dummy -- invoking balsam metascheduler service"
    print(notice)
......@@ -44,9 +44,15 @@ ERROR = bool(_envs.get('BALSAM_JOB_ERROR', False))
# When the environment names a current job, load it along with its
# parents and children so they are available as module-level names.
if JOB_ID:
    JOB_ID = uuid.UUID(JOB_ID)
    try:
        current_job = _BalsamJob.objects.get(pk=JOB_ID)
    except _BalsamJob.DoesNotExist as exc:
        # Every literal needs the f prefix, otherwise {JOB_ID} is printed
        # verbatim instead of being interpolated.
        raise RuntimeError(
            f"The environment specified current job: "
            f"BALSAM_JOB_ID {JOB_ID}\n but this does not "
            f"exist in DB! Was it deleted accidentally?"
        ) from exc
    else:
        parents = current_job.get_parents()
        children = current_job.get_children()
def add_job(**kwargs):
......
class cd:
    '''Context manager for changing cwd

    On entry the current working directory is saved and the process
    changes into `new_path` (with a leading "~" expanded). On exit the
    saved directory is restored, whether or not the body raised.
    '''
    def __init__(self, new_path):
        self.new_path = os.path.expanduser(new_path)

    def __enter__(self):
        self.saved_path = os.getcwd()
        os.chdir(self.new_path)

    def __exit__(self, exc_type, exc_value, traceback):
        # The with statement always passes three exception arguments;
        # without them, leaving the block raises TypeError.
        # Returning None lets any exception from the body propagate.
        os.chdir(self.saved_path)
class BalsamLauncherError(Exception):
    '''Launcher-level error.'''


class BalsamRunnerError(Exception):
    '''Base class for the runner errors defined below.'''


class ExceededMaxConcurrentRunners(BalsamRunnerError):
    '''Runner error; by its name, too many runners are active at once.'''


class NoAvailableWorkers(BalsamRunnerError):
    '''Runner error; by its name, no worker is free to take a job.'''


class BalsamTransitionError(Exception):
    '''Base class for transition errors.'''


class TransitionNotFoundError(BalsamTransitionError, ValueError):
    '''Transition error that can also be caught as a ValueError.'''


class MPIEnsembleError(Exception):
    '''MPI ensemble error.'''
'''The Launcher is either invoked by the user, who bypasses the Balsam
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler'''
import argparse
import os
import django
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
import argparse
from sys import exit
import signal
import time
import django
from django.conf import settings
from balsam import scheduler
......@@ -179,8 +182,8 @@ def get_args():
parser.add_argument('--serial-jobs-per-worker', type=int, default=4,
help="For non-MPI jobs, how many to pack per worker")
parser.add_argument('--time-limit-minutes', type=int,
help="Override auto-detected walltime limit (runs
forever if no limit is detected or specified)")
help="Override auto-detected walltime limit (runs"
" forever if no limit is detected or specified)")
return parser.parse_args()
def detect_dead_runners(job_source):
......@@ -188,8 +191,6 @@ def detect_dead_runners(job_source):
job.update_state('RESTART_READY', 'Detected dead runner')
if __name__ == "__main__":
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
args = get_args()
job_source = jobreader.JobReader.from_config(args)
......
from collections import namedtuple
from contextlib import nested
import os
import sys
from subprocess import Popen, STDOUT
from mpi4py import MPI
from balsam.launcher.runners import cd
from balsam.launcher.cd import cd
from balsam.launcher.exceptions import *
COMM = MPI.COMM_WORLD
......@@ -31,7 +30,7 @@ def read_jobs(fp):
def run(job):
basename = os.path.basename(job.workdir)
outname = f"{basename}.out"
with nested(cd(job.workdir), open(outname, 'wb')) as (_,outf):
with cd(job.workdir) as _, open(outname, 'wb') as outf:
try:
status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
proc = Popen(job.cmd, stdout=outf, stderr=STDOUT)
......
......@@ -22,18 +22,7 @@ import balsam.models
from balsam.launcher import mpi_commands
from balsam.launcher import mpi_ensemble
from balsam.launcher.exceptions import *
class cd:
    '''Context manager for changing cwd

    On entry the current working directory is saved and the process
    changes into `new_path` (with a leading "~" expanded). On exit the
    saved directory is restored, whether or not the body raised.
    '''
    def __init__(self, new_path):
        self.new_path = os.path.expanduser(new_path)

    def __enter__(self):
        self.saved_path = os.getcwd()
        os.chdir(self.new_path)

    def __exit__(self, exc_type, exc_value, traceback):
        # The with statement always passes three exception arguments;
        # without them, leaving the block raises TypeError.
        # Returning None lets any exception from the body propagate.
        os.chdir(self.saved_path)
from balsam.launcher.cd import cd
class MonitorStream(Thread):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment