Commit 313f2b38 authored by Michael Salim

functional test and bug-fix in dag.py

parent 3a181b8f
......@@ -394,6 +394,15 @@ auto timeout retry: {self.auto_timeout_retry}
def get_recent_state_str(self):
    """Return the final line of ``self.state_history``, whitespace-stripped.

    The history is split on newline characters; only the last resulting
    piece is returned, with leading and trailing whitespace removed.
    """
    history_lines = self.state_history.split("\n")
    most_recent = history_lines[-1]
    return most_recent.strip()
def read_file_in_workdir(self, fname):
    """Return the full text contents of ``fname`` from the working directory.

    Args:
        fname: File name (or relative path) joined onto
            ``self.working_directory``.

    Returns:
        The file contents as a string.

    Raises:
        ValueError: If the joined path does not exist.
    """
    path = os.path.join(self.working_directory, fname)
    if not os.path.exists(path):
        # Both literals need the f-prefix; previously the second half was a
        # plain string, so the message showed "{self.cute_id}" verbatim.
        raise ValueError(f"{fname} not found in working directory of"
                         f" {self.cute_id}")
    # Context manager guarantees the handle is closed once the read finishes.
    with open(path) as fp:
        return fp.read()
def get_line_string(self):
recent_state = self.get_recent_state_str()
app = self.application if self.application else self.direct_command
......
......@@ -113,11 +113,13 @@ def spawn_child(clone=False, **kwargs):
raise ValueError(f"Invalid field {k}")
else:
setattr(child, k, v)
child.working_directory = '' # This is essential; awful BUG if not here
child.save()
else:
child = add_job(**kwargs)
add_dependency(current_job, child)
child.state_history = ''
child.update_state("CREATED", f"spawned by {current_job.cute_id}")
return child
......
......@@ -284,6 +284,7 @@ def preprocess(job, lock):
out = os.path.join(job.working_directory, f"preprocess.log")
with open(out, 'w') as fp:
fp.write(f"# Balsam Preprocessor: {preproc_app}")
fp.flush()
try:
args = preproc_app.split()
logger.info(f"{job.cute_id} preprocess Popen {args}")
......@@ -292,6 +293,7 @@ def preprocess(job, lock):
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=PREPROCESS_TIMEOUT_SECONDS)
proc.communicate()
lock.release()
except Exception as e:
message = f"Preprocess failed: {e}"
......@@ -362,7 +364,8 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
fp.write(f"# Balsam Postprocessor: {postproc_app}\n")
if timeout_handling: fp.write("# Invoked to handle RUN_TIMEOUT\n")
if error_handling: fp.write("# Invoked to handle RUN_ERROR\n")
fp.flush()
try:
args = postproc_app.split()
logger.info(f"{job.cute_id} postprocess Popen {args}")
......@@ -371,6 +374,7 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
stderr=subprocess.STDOUT, env=envs,
cwd=job.working_directory)
retcode = proc.wait(timeout=POSTPROCESS_TIMEOUT_SECONDS)
proc.communicate()
lock.release()
except Exception as e:
message = f"Postprocess failed: {e}"
......@@ -397,7 +401,7 @@ def postprocess(job, lock, *, error_handling=False, timeout_handling=False):
lock.acquire()
job.update_state('POSTPROCESSED', f"{os.path.basename(postproc_app)}")
lock.release()
logger.info(f"{job.cute_id} postprocess done")
logger.info(f"{job.cute_id} postprocess done")
def handle_timeout(job, lock):
......
......@@ -8,8 +8,10 @@ current_job = dag.current_job
print("Hello from make_sides_post")
if dag.ERROR or dag.TIMEOUT:
if dag.ERROR: print("make_sides_post recognized error flag")
else: print("make_sides_post recognized timeout flag")
if dag.ERROR:
print("make_sides_post recognized error flag")
else:
print("make_sides_post recognized timeout flag")
num_sides = int(os.environ['BALSAM_FT_NUM_SIDES'])
num_files = len(glob.glob("side*.dat"))
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment