Commit afafa5b5 authored by Michael Salim's avatar Michael Salim
Browse files

changes to launcher supporting tests

parent b8fea20a
...@@ -345,8 +345,7 @@ auto timeout retry: {self.auto_timeout_retry} ...@@ -345,8 +345,7 @@ auto timeout retry: {self.auto_timeout_retry}
return {variable:value for (variable,value) in entries} return {variable:value for (variable,value) in entries}
def get_envs(self, *, timeout=False, error=False): def get_envs(self, *, timeout=False, error=False):
#envs = os.environ.copy() envs = os.environ.copy()
envs = {}
try: try:
app = self.get_application() app = self.get_application()
except NoApplication: except NoApplication:
......
'''Python API for Balsam DAG Manipulations '''Python API for Balsam DAG Manipulations
Example usage: Example usage:
>>> import balsam.dag as dag >>> import balsamlauncher.dag as dag
>>> >>>
>>> output = open('expected_output').read() >>> output = open('expected_output').read()
>>> >>>
...@@ -97,11 +97,24 @@ def add_dependency(parent,child): ...@@ -97,11 +97,24 @@ def add_dependency(parent,child):
child.set_parents(existing_parents) child.set_parents(existing_parents)
raise RuntimeError("Detected circular dependency; not creating link") raise RuntimeError("Detected circular dependency; not creating link")
def spawn_child(**kwargs): def spawn_child(clone=False, **kwargs):
'''Add new job that is dependent on the current job''' '''Add new job that is dependent on the current job'''
if not isinstance(current_job, _BalsamJob): if not isinstance(current_job, _BalsamJob):
raise RuntimeError("No current BalsamJob detected in environment") raise RuntimeError("No current BalsamJob detected in environment")
child = add_job(**kwargs) if clone:
child = _BalsamJob.objects.get(pk=current_job.pk)
child.pk = None
for k,v in kwargs.items():
try:
getattr(child, k)
except AttributeError:
raise ValueError(f"Invalid field {k}")
else:
setattr(child, k, v)
child.save()
else:
child = add_job(**kwargs)
add_dependency(current_job, child) add_dependency(current_job, child)
return child return child
......
...@@ -4,6 +4,7 @@ import balsam.models ...@@ -4,6 +4,7 @@ import balsam.models
from balsam.models import BalsamJob from balsam.models import BalsamJob
import logging import logging
import uuid
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class JobReader(): class JobReader():
......
...@@ -111,7 +111,7 @@ def on_exit(runner_group, transition_pool, job_source): ...@@ -111,7 +111,7 @@ def on_exit(runner_group, transition_pool, job_source):
exit(0) exit(0)
def get_args(): def get_args(inputcmd=None):
parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.") parser = argparse.ArgumentParser(description="Start Balsam Job Launcher.")
group = parser.add_mutually_exclusive_group(required=True) group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--job-file', help="File of Balsam job IDs") group.add_argument('--job-file', help="File of Balsam job IDs")
...@@ -127,7 +127,10 @@ def get_args(): ...@@ -127,7 +127,10 @@ def get_args():
parser.add_argument('--time-limit-minutes', type=int, parser.add_argument('--time-limit-minutes', type=int,
help="Provide a walltime limit if not already imposed") help="Provide a walltime limit if not already imposed")
parser.add_argument('--daemon', action='store_true') parser.add_argument('--daemon', action='store_true')
return parser.parse_args() if inputcmd:
return parser.parse_args(inputcmd)
else:
return parser.parse_args()
def detect_dead_runners(job_source): def detect_dead_runners(job_source):
for job in job_source.by_states['RUNNING']: for job in job_source.by_states['RUNNING']:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment