Commit d88268d2 authored by Michael Salim

command line interface

parent 65fe3f11
This diff is collapsed.
# These statements must come before any other imports
#import django
import os
#os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
#django.setup()
# --------------------
from django.conf import settings
import balsam.models
from balsam import dag
import ls_commands as lscmd
# Short module-level aliases for the Django models used by the commands below.
BalsamJob = balsam.models.BalsamJob
# Job is a second name bound to the same BalsamJob model class.
Job = balsam.models.BalsamJob
AppDef = balsam.models.ApplicationDefinition
def cmd_confirmation(message=''):
    """Ask a yes/no question on stdin until a valid answer is given.

    Args:
        message: Text shown in front of the ``[y/n]: `` prompt.

    Returns:
        True when the answer is ``y`` (case-insensitive); False when it is
        ``n`` or when stdin is exhausted before a valid answer arrives.
    """
    confirm = ''
    while confirm.lower() not in ('y', 'n'):
        try:
            confirm = input(f"{message} [y/n]: ")
        except EOFError:
            # stdin is closed: no answer can ever arrive, so retrying would
            # spin forever. Treat it as a refusal. KeyboardInterrupt is
            # deliberately not caught so Ctrl-C still aborts the command.
            return False
    return confirm.lower() == 'y'
def newapp(args):
    """Register a new application definition in the local database.

    Reads ``name``, ``description``, ``executable``, ``preprocess``,
    ``postprocess`` and ``env`` from ``args``.

    Raises:
        RuntimeError: if an application with the same name already exists,
            or if the executable or a given pre/post-process script path
            does not exist on disk.
    """
    if AppDef.objects.filter(name=args.name).exists():
        raise RuntimeError(f"An application named {args.name} exists")
    if not os.path.exists(args.executable):
        raise RuntimeError(f"Executable {args.executable} not found")

    # The scripts are optional; only paths that were actually given are checked.
    for script in (args.preprocess, args.postprocess):
        if script and not os.path.exists(script):
            raise RuntimeError(f"Script {script} not found")

    app = AppDef()
    values = {
        'name': args.name,
        'description': ' '.join(args.description),
        'executable': args.executable,
        'default_preprocess': args.preprocess,
        'default_postprocess': args.postprocess,
        'environ_vars': ":".join(args.env),
    }
    for attr, value in values.items():
        setattr(app, attr, value)
    app.save()

    print(app)
    print("Added app to database")
def newjob(args):
    """Create a BalsamJob from parsed command-line arguments and save it.

    The job is printed first; unless ``args.yes`` is set the user must
    confirm before it is written to the database.

    Returns:
        The saved job, or None if the user declined the confirmation.

    Raises:
        RuntimeError: if ``args.application`` is not a registered app.
    """
    if not AppDef.objects.filter(name=args.application).exists():
        raise RuntimeError(f"App {args.application} not registered in local DB")

    job = Job()
    job.name = args.name
    job.description = ' '.join(args.description)
    job.workflow = args.workflow
    job.allowed_work_sites = ' '.join(args.allowed_site)

    # Resource request
    job.wall_time_minutes = args.wall_minutes
    job.num_nodes = args.num_nodes
    job.processes_per_node = args.processes_per_node
    job.threads_per_rank = args.threads_per_rank
    job.threads_per_core = args.threads_per_core

    # Application and its pre/post-processing behaviour
    job.application = args.application
    job.application_args = ' '.join(args.args)
    job.preprocess = args.preprocessor
    job.postprocess = args.postprocessor
    job.post_error_handler = args.post_handle_error
    job.post_timeout_handler = args.post_handle_timeout
    job.auto_timeout_retry = not args.disable_auto_timeout_retry

    # Data staging and environment
    job.input_files = ' '.join(args.input_files)
    job.stage_in_url = args.url_in
    job.stage_out_url = args.url_out
    job.stage_out_files = ' '.join(args.stage_out_files)
    job.environ_vars = ":".join(args.env)

    print(job)
    if not args.yes:
        if not cmd_confirmation('Confirm adding job to DB'):
            print("Add job aborted")
            return None

    job.save()
    # Report success before returning; previously this line sat after the
    # return statement and could never run.
    print("Added job to database")
    return job
def match_uniq_job(s):
    """Find the single job identified by the string ``s``.

    Jobs are first matched by a case-insensitive substring of ``job_id``.
    If none match, they are matched by a substring of ``name``; when that is
    ambiguous the search is narrowed to an exact name match.

    Returns:
        A queryset containing exactly one job.

    Raises:
        ValueError: if the ID or name is ambiguous, or nothing matches.
    """
    by_id = Job.objects.filter(job_id__icontains=s)
    if by_id.count() > 1:
        raise ValueError(f"More than one ID matched {s}")
    if by_id.count() == 1:
        return by_id

    by_name = Job.objects.filter(name__contains=s)
    if by_name.count() > 1:
        # Several names contain s: retry requiring the whole name to equal s.
        by_name = Job.objects.filter(name=s)
    if by_name.count() > 1:
        raise ValueError(f"More than one Job name matches {s}")
    if by_name.count() == 1:
        return by_name

    raise ValueError(f"No job in local DB matched {s}")
def newdep(args):
    """Create a parent --> child dependency between two existing jobs.

    ``args.parent`` and ``args.child`` are resolved with ``match_uniq_job``,
    which raises ValueError when either does not identify exactly one job.
    """
    # match_uniq_job returns a one-element queryset. Unwrap it so that
    # dag.add_dependency receives job instances rather than QuerySets.
    parent = match_uniq_job(args.parent).first()
    child = match_uniq_job(args.child).first()
    dag.add_dependency(parent, child)
    print(f"Created link [{str(parent.job_id)[:8]}] --> "
          f"[{str(child.job_id)[:8]}]")
def ls(args):
    """Dispatch the ``ls`` command to the job, app or workflow lister.

    ``args.objects`` selects the listing by prefix: ``job...``, ``app...``,
    or ``work...``/``wf...``. Any other value lists nothing.
    """
    kind = args.objects
    name_filter = args.name
    show_history = args.history
    verbose = args.verbose
    id_filter = args.id
    as_tree = args.tree
    workflow = args.wf

    if kind.startswith('job'):
        lscmd.ls_jobs(name_filter, show_history, id_filter, verbose,
                      as_tree, workflow)
    elif kind.startswith('app'):
        lscmd.ls_apps(name_filter, id_filter, verbose)
    elif kind.startswith(('work', 'wf')):
        lscmd.ls_wf(name_filter, verbose, as_tree, workflow)
def modify(args):
    """Placeholder for the ``modify`` command; currently does nothing."""
    return None
......@@ -28,13 +121,59 @@ def rm(args):
pass
def qsub(args):
    """Add a job that runs a direct command instead of a registered app.

    The job is placed in the ``qsub`` workflow, restricted to the local
    ``settings.BALSAM_SITE``, printed, and saved without confirmation.
    """
    job = Job()
    values = {
        'name': args.name,
        'description': 'Added by balsam qsub',
        'workflow': 'qsub',
        'allowed_work_sites': settings.BALSAM_SITE,
        # Resource request taken from the command line
        'wall_time_minutes': args.wall_minutes,
        'num_nodes': args.nodes,
        'processes_per_node': args.ppn,
        'threads_per_rank': args.threads_per_rank,
        'threads_per_core': args.threads_per_core,
        'environ_vars': ":".join(args.env),
        # No application, processing scripts or staging for a direct command
        'application': '',
        'application_args': '',
        'preprocess': '',
        'postprocess': '',
        'post_error_handler': False,
        'post_timeout_handler': False,
        'auto_timeout_retry': False,
        'input_files': '',
        'stage_in_url': '',
        'stage_out_url': '',
        'stage_out_files': '',
        'direct_command': ' '.join(args.command),
    }
    for attr, value in values.items():
        setattr(job, attr, value)

    print(job)
    job.save()
    print("Added to database")
def kill(args):
    """Kill the job whose ID starts with ``args.id``, after confirmation.

    ``args.recursive`` is forwarded to ``dag.kill``.

    Raises:
        RuntimeError: if more than one job matches the ID prefix.
    """
    job_id = args.id
    matches = Job.objects.filter(job_id__startswith=job_id)
    num_matches = matches.count()
    if num_matches > 1:
        raise RuntimeError(f"More than one job matches {job_id}")
    if num_matches == 0:
        print(f"No jobs match the given ID {job_id}")
        # Nothing to kill; continuing would dereference None below.
        return

    job = matches.first()
    if cmd_confirmation(f'Really kill job {job.name} [{str(job.pk)}] ??'):
        dag.kill(job, recursive=args.recursive)
        print("Job killed")
def mkchild(args):
    """Create a new job that depends on the currently running job.

    The new job is built by ``newjob(args)`` and linked as a child of
    ``dag.current_job``. Nothing is linked if job creation was aborted.

    Raises:
        RuntimeError: if ``dag.current_job`` is not set.
    """
    if not dag.current_job:
        raise RuntimeError("mkchild requires that BALSAM_JOB_ID is in the environment")

    child_job = newjob(args)
    if child_job is None:
        # newjob returns None when the user declines the confirmation
        # prompt; there is no job to link in that case.
        return

    dag.add_dependency(dag.current_job, child_job)
    print(f"Created link [{str(dag.current_job.job_id)[:8]}] --> "
          f"[{str(child_job.job_id)[:8]}]")
def launcher(args):
    """Placeholder for the ``launcher`` command; currently does nothing."""
    return None
......
import balsam.models
# Short aliases for the Django models queried by the listing helpers below.
Job = balsam.models.BalsamJob
AppDef = balsam.models.ApplicationDefinition
def print_history(jobs):
    """Print the state history of each job under a name/ID heading."""
    rule = '------------------------------------------------'
    for entry in jobs:
        heading = f'Job {entry.name} [{entry.job_id}]'
        # The history is followed by one blank line to separate jobs.
        print(heading, rule, f'{entry.state_history}\n', sep='\n')
def print_jobs(jobs, verbose):
    """Print jobs either in full (verbose) or as a one-line-per-job table."""
    if verbose:
        for entry in jobs:
            print(entry)
        return

    # Table mode: a header, a rule of the same width, then one row per job.
    header = Job.get_header()
    print(header)
    print('-' * len(header))
    for entry in jobs:
        print(entry.get_line_string())
def print_subtree(job, indent=1):
    """Print ``job`` and, recursively, its children as an indented tree.

    Each level is prefixed with ``indent`` bar characters and padded by five
    spaces per level; the label is the job name (width 10) followed by the
    first eight characters of its ID.
    """
    label = f"{job.name:10} [{str(job.job_id)[:8]}]"
    print('|' * indent, ' ' * (5 * indent), label)
    for child in job.get_children():
        print_subtree(child, indent + 1)
def print_jobs_tree(jobs):
    """Print a dependency tree for every root job in ``jobs``.

    A root is a job whose ``parents`` field is the string ``'[]'``.
    """
    roots = list(filter(lambda candidate: candidate.parents == '[]', jobs))
    for root in roots:
        print_subtree(root)
def ls_jobs(namestr, show_history, jobid, verbose, tree, wf):
    """List jobs, optionally filtered by name, job ID and workflow.

    Each non-empty filter is applied as a case-insensitive substring match.
    Output is the state history if ``show_history`` is set, otherwise a
    dependency tree if ``tree`` is set, otherwise a table or (with
    ``verbose``) the full job descriptions.
    """
    filters = (
        ('name__icontains', namestr),
        ('job_id__icontains', jobid),
        ('workflow__icontains', wf),
    )
    results = Job.objects.all()
    for lookup, value in filters:
        if value:
            results = results.filter(**{lookup: value})

    if not results:
        print("No jobs found matching query")
        return

    if show_history:
        print_history(results)
    elif tree:
        print_jobs_tree(results)
    else:
        print_jobs(results, verbose)
def ls_apps(namestr, appid, verbose):
    """List application definitions, filtered by name or by ID.

    ``namestr`` takes precedence over ``appid``; with neither, all apps are
    listed. Output is a table, or the full descriptions with ``verbose``.
    """
    if namestr:
        results = AppDef.objects.filter(name__icontains=namestr)
    elif appid:
        # NOTE(review): this filters application definitions on a `job_id`
        # lookup; confirm ApplicationDefinition really has that field.
        results = AppDef.objects.filter(job_id__icontains=appid)
    else:
        results = AppDef.objects.all()

    if not results:
        print("No apps found matching query")
        return

    if verbose:
        for entry in results:
            print(entry)
        return

    header = AppDef.get_header()
    print(header)
    print('-' * len(header))
    for entry in results:
        print(entry.get_line_string())
def ls_wf(name, verbose, tree, wf):
    """List workflow names, optionally with the jobs in each workflow.

    ``wf`` overrides ``name`` when both are given. Selecting one workflow
    forces its jobs to be shown. Jobs are printed as a tree when ``tree`` is
    set, otherwise as a table when ``verbose`` is set.
    """
    rows = Job.objects.order_by().values('workflow').distinct()
    workflows = [row['workflow'] for row in rows]

    selected = wf or name  # wf argument overrides name
    if selected:
        if selected not in workflows:
            print(f"No workflow matching {selected}")
            return
        workflows = [selected]
        verbose = True

    print("Workflows")
    print("---------")
    for workflow in workflows:
        print(workflow)
        if not (tree or verbose):
            continue
        print('-' * len(workflow))
        jobs_by_wf = Job.objects.filter(workflow=workflow)
        if tree:
            print_jobs_tree(jobs_by_wf)
        else:
            print_jobs(jobs_by_wf, False)
        print()
......@@ -18,8 +18,8 @@ Example usage:
>>>
'''
import django as _django
import os as _os
import django as django
import os as os
import uuid
__all__ = ['JOB_ID', 'TIMEOUT', 'ERROR',
......@@ -27,22 +27,17 @@ __all__ = ['JOB_ID', 'TIMEOUT', 'ERROR',
'add_job', 'add_dependency', 'spawn_child',
'kill']
# Django must be configured once, before the settings and model imports below.
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
from django.conf import settings
from balsam.models import BalsamJob as _BalsamJob

# Defaults used when no Balsam job is identified in the environment.
current_job = None
parents = None
children = None

# Snapshot of every environment variable whose name contains 'BALSAM'.
_envs = {k: v for k, v in os.environ.items() if 'BALSAM' in k}

JOB_ID = _envs.get('BALSAM_JOB_ID', '')
# NOTE(review): bool() of any non-empty string is True, so values such as
# '0' or 'False' also enable these flags - confirm that is intended.
TIMEOUT = bool(_envs.get('BALSAM_JOB_TIMEOUT', False))
ERROR = bool(_envs.get('BALSAM_JOB_ERROR', False))
......@@ -51,7 +46,7 @@ if JOB_ID:
JOB_ID = uuid.UUID(JOB_ID)
current_job = _BalsamJob.objects.get(pk=JOB_ID)
parents = current_job.get_parents()
children = curren_job.get_children()
children = current_job.get_children()
def add_job(**kwargs):
......@@ -61,12 +56,20 @@ def add_job(**kwargs):
try:
getattr(job, k)
except AttributeError:
raise
raise ValueError(f"Invalid field {k}")
else:
setattr(job, k, v)
job.save()
return job
def detect_circular(job, path=None):
    '''Return True if following parent links from ``job`` revisits a job.

    Args:
        job: Object exposing ``get_parents()``, whose items have a ``pk``.
        path: Primary keys already visited on the current ancestry branch;
            callers normally omit it.

    Returns:
        True if any ancestry branch contains the same primary key twice,
        False otherwise.
    '''
    path = [] if path is None else path
    for parent in job.get_parents():
        if parent.pk in path:
            return True
        # Keep searching the remaining parents when this branch is clean;
        # returning the first branch's result would skip every other parent.
        if detect_circular(parent, path + [parent.pk]):
            return True
    return False
def add_dependency(parent,child):
'''Create a dependency between two existing jobs'''
if isinstance(parent, str): parent = uuid.UUID(parent)
......@@ -77,12 +80,22 @@ def add_dependency(parent,child):
if not isinstance(child, _BalsamJob):
child = _BalsamJob.objects.get(pk=child)
new_parents = child.get_parents_by_id()
new_parents.append(str(parent.pk))
child.parents.set_parents(new_parents)
existing_parents = child.get_parents_by_id()
new_parents = existing_parents.copy()
parent_pk_str = str(parent.pk)
if parent_pk_str in existing_parents:
raise RuntimeError("Dependency already exists; cannot double-create")
else:
new_parents.append(parent_pk_str)
child.set_parents(new_parents)
if detect_circular(child):
child.set_parents(existing_parents)
raise RuntimeError("Detected circular dependency; not creating link")
def spawn_child(**kwargs):
    '''Create a job from ``kwargs`` and make it a child of the current job.

    Returns the new job. Raises RuntimeError when ``current_job`` is not a
    BalsamJob instance.
    '''
    if isinstance(current_job, _BalsamJob):
        new_child = add_job(**kwargs)
        add_dependency(current_job, new_child)
        return new_child
    raise RuntimeError("No current BalsamJob detected in environment")
......
......@@ -15,6 +15,7 @@ from balsam.models import BalsamJob
logger = logging.getLogger(__name__)
# presumably the time limit for a preprocess script - its use is not visible here
PREPROCESS_TIMEOUT_SECONDS = 300
# Name of the local Balsam site, read once from the Django settings.
SITE = settings.BALSAM_SITE
# Lightweight record types for messages; main() receives job messages from
# job_queue and unpacks them into (job, transition_function).
StatusMsg = namedtuple('Status', ['pk', 'state', 'msg'])
JobMsg = namedtuple('JobMsg', ['job', 'transition_function'])
......@@ -26,7 +27,9 @@ def main(job_queue, status_queue):
job_msg = job_queue.get()
job, transition_function = job_msg
if job == 'end': return
if job.work_site != SITE:
job.work_site = SITE
job.save(update_fields=['work_site'])
try:
transition_function(job)
except BalsamTransitionError as e:
......
......@@ -149,7 +149,7 @@ class BalsamJob(models.Model):
'External stage out files or folders',
help_text="A string of filename patterns. Matches will be transferred to the stage_out_url. Default: no files are staged out",
default='')
stage_out_urls = models.TextField(
stage_out_url = models.TextField(
'Stage Out URL',
help_text='The URLs to which designated stage out files are sent.',
default='')
......@@ -251,27 +251,40 @@ class BalsamJob(models.Model):
models.Model.save(self, force_insert, force_update, using, update_fields)
def __str__(self):
s = f'''
BalsamJob: {self.job_id}
state: {self.state}
work_site: {self.work_site}
return f'''
Balsam Job
----------
ID: {self.job_id}
name: {self.name}
workflow: {self.workflow}
name: {self.name}
description: {self.description[:50]}
latest state: {self.get_recent_state_str()}
description: {self.description[:80]}
work site: {self.work_site}
allowed work sites: {self.allowed_work_sites}
working_directory: {self.working_directory}
parents: {self.parents}
input_files: {self.input_files}
stage_in_url: {self.stage_in_url}
stage_in_url: {self.stage_in_url}
stage_out_url: {self.stage_out_url}
stage_out_files: {self.stage_out_files}
stage_out_urls: {self.stage_out_urls}
wall_time_minutes: {self.wall_time_minutes}
actual_runtime: {self.runtime_str()}
num_nodes: {self.num_nodes}
threads per rank: {self.threads_per_rank}
threads per core: {self.threads_per_core}
processes_per_node: {self.processes_per_node}
scheduler_id: {self.scheduler_id}
runtime_seconds: {self.runtime_seconds}
application: {self.application}
'''
return s.strip() + '\n'
application: {self.application if self.application else
self.direct_command}
args: {self.application_args}
envs: {self.environ_vars}
created with qsub: {bool(self.direct_command)}
preprocess override: {self.preprocess}
postprocess override: {self.postprocess}
post handles error: {self.post_error_handler}
post handles timeout: {self.post_timeout_handler}
auto timeout retry: {self.auto_timeout_retry}
'''.strip() + '\n'
def get_parents_by_id(self):
......@@ -343,13 +356,24 @@ application: {self.application}
self.state = new_state
self.save(update_fields=['state', 'state_history'])
def get_recent_state_str(self):
    """Return the last line of ``state_history`` without its delimiters.

    The line is stripped of surrounding whitespace and then its first and
    last characters are dropped (presumably enclosing brackets - confirm
    against the code that writes ``state_history``).
    """
    last_line = self.state_history.rsplit("\n", 1)[-1]
    trimmed = last_line.strip()
    return trimmed[1:-1]
def get_line_string(self):
    """Return this job as one table row: ID, name, workflow, app, latest state.

    The application column shows ``direct_command`` when the job has no
    application. The stale pre-commit row (which returned first and made
    this code unreachable) has been dropped.
    """
    recent_state = self.get_recent_state_str()
    app = self.application if self.application else self.direct_command
    return f' {str(self.job_id):36} | {self.name:26} | {self.workflow:26} | {app:26} | {recent_state}'
def runtime_str(self):
    """Return ``runtime_seconds`` formatted as hours, minutes and seconds.

    Returns an empty string when the runtime is missing or truncates to
    zero whole seconds. The hour field is shown only when it is non-zero.
    """
    # Truncate to whole seconds first: the 'd' format code used below
    # rejects floats, and divmod on a float returns floats.
    total_seconds = int(self.runtime_seconds or 0)
    if total_seconds == 0:
        return ''
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours:02d} hr : {minutes:02d} min : {seconds:02d} sec"
    return f"{minutes:02d} min : {seconds:02d} sec"
@staticmethod
def get_header():
return f' {"job_id":36} | {"workflow":26} | {"name":26} | {"application":26} | {"work_site":20} | {"recent state":100}'
return f' {"job_id":36} | {"name":26} | {"workflow":26} | {"application":26} | {"latest update"}'
def create_working_path(self):
top = settings.BALSAM_WORK_DIRECTORY
......@@ -403,25 +427,30 @@ class ApplicationDefinition(models.Model):
default='')
def __str__(self):
    """Return a multi-line, human-readable description of this application.

    Only the current ``default_preprocess``/``default_postprocess``
    attributes are used; the stale body that returned first and read
    ``config_script``/``preprocess``/``postprocess`` has been dropped.
    """
    return f'''
Application:
------------
PK: {self.pk}
Name: {self.name}
Description: {self.description}
Executable: {self.executable}
Preprocess: {self.default_preprocess}
Postprocess: {self.default_postprocess}
Envs: {self.environ_vars}
'''.strip() + '\n'
def get_line_string(self):
    """Return this application as one table row.

    Columns are name, executable, default preprocess, default postprocess
    and description. The first four are right-aligned to width 20.
    """
    # Renamed from `format` so the builtin is not shadowed. The stale
    # seven-column pre-commit layout has been dropped.
    row_format = ' %20s | %20s | %20s | %20s | %s '
    return row_format % (self.name, self.executable,
                         self.default_preprocess,
                         self.default_postprocess,
                         self.description)
@staticmethod
def get_header():
format = ' %7s | %20s | %20s | %20s | %20s | %20s | %s '
output = format % ('pk', 'name', 'executable', 'config_script',
format = ' %20s | %20s | %20s | %20s | %s '
output = format % ('name', 'executable',
'preprocess', 'postprocess',
'description')
return output
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment