Commit 95f28ca5 authored by Michael Salim
Browse files

fixed environment variables Popen bug; cleaned up logging

parent d40a69d5
......@@ -53,7 +53,7 @@ def create_new_runners(jobs, runner_group, worker_group):
created_one = False
running_pks = runner_group.running_job_pks
runnable_jobs = get_runnable_jobs(jobs, running_pks)
logger.debug(f"Have {len(runnable_jobs)} runnable jobs")
logger.debug(f"Have {len(runnable_jobs)} new runnable jobs")
while runnable_jobs:
try:
runner_group.create_next_runner(runnable_jobs, worker_group)
......
......@@ -13,7 +13,7 @@ class DEFAULTMPICommand(object):
return ""
def env_str(self, envs):
envstrs = (f"{self.env} {var}={val}" for var,val in envs.items())
envstrs = (f'{self.env} {var}="{val}"' for var,val in envs.items())
return " ".join(envstrs)
def threads(self, thread_per_rank, thread_per_core):
......
......@@ -40,7 +40,7 @@ def read_jobs(fp):
def run(job):
basename = os.path.basename(job.workdir)
outname = f"{basename}.out"
logger.debug(f"Running job {job.id}")
logger.debug(f"mpi_ensemble rank {RANK}: starting job {job.id}")
with cd(job.workdir) as _, open(outname, 'wb') as outf:
try:
status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
......@@ -60,14 +60,15 @@ def main(jobs_path):
job_list = None
if RANK == 0:
logger.debug(f"Master rank of mpi_ensemble.py: reading jobs from {jobs_path}")
with open(jobs_path) as fp:
job_list = list(read_jobs(fp))
job_list = COMM.bcast(job_list, root=0)
logger.debug(f"Broadcasted job list. Total {len(job_list)} jobs to run")
if RANK == 0:
logger.debug(f"Broadcasted job list. Total {len(job_list)} jobs to run")
for job in job_list[RANK::COMM.size]: run(job)
if __name__ == "__main__":
path = sys.argv[1]
logger.debug(f"Starting mpi_ensemble.py. Reading jobs from {path}")
main(path)
......@@ -110,21 +110,22 @@ class MPIRunner(Runner):
self.popen_args['stdout'] = self.outfile
self.popen_args['stderr'] = STDOUT
self.popen_args['bufsize'] = 1
logger.info(f"MPI Runner Popen args: {self.popen_args['args']}")
def update_jobs(self):
job = self.jobs[0]
#job.refresh_from_db() # TODO: handle RecordModified
retcode = self.process.poll()
if retcode == None:
logger.debug(f"Job {job.cute_id} still running")
logger.debug(f"MPI Job {job.cute_id} still running")
curstate = 'RUNNING'
msg = ''
elif retcode == 0:
logger.debug(f"Job {job.cute_id} return code 0: done")
logger.debug(f"MPI Job {job.cute_id} return code 0: done")
curstate = 'RUN_DONE'
msg = ''
else:
logger.debug(f"Job {job.cute_id} return code!=0: error")
logger.debug(f"MPI Job {job.cute_id} return code!=0: error")
curstate = 'RUN_ERROR'
msg = str(retcode)
if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified
......@@ -159,7 +160,7 @@ class MPIEnsembleRunner(Runner):
num_ranks=nranks, ranks_per_node=rpn)
self.popen_args['args'] = shlex.split(mpi_str)
logger.debug(f"MPI Ensemble Popen args: {self.popen_args['args']}")
logger.info(f"MPI Ensemble Popen args: {self.popen_args['args']}")
def update_jobs(self):
'''Relies on stdout of mpi_ensemble.py'''
......@@ -172,7 +173,7 @@ class MPIEnsembleRunner(Runner):
logger.debug("Checking mpi_ensemble stdout for status updates...")
for line in self.monitor.available_lines():
logger.debug(f"Monitor stdout line: {line.strip()}")
logger.debug(f"mpi_ensemble stdout: {line.strip()}")
pk, state, *msg = line.split()
msg = ' '.join(msg)
if pk in self.jobs_by_pk and state in balsam.models.STATES:
......@@ -210,8 +211,8 @@ class RunnerGroup:
rpn = workers[0].max_ranks_per_node
assert all(w.num_nodes == nodes_per_worker for w in idle_workers)
assert all(w.max_ranks_per_node == rpn for w in idle_workers)
logger.info(f"Creating next runner: {nidle_workers} idle workers with "
f"{nodes_per_worker} nodes per worker; {len(runnable_jobs)} runnable jobs")
logger.debug(f"Available workers: {nidle_workers} idle with "
f"{nodes_per_worker} nodes per worker")
nidle_nodes = nidle_workers * nodes_per_worker
nidle_ranks = nidle_nodes * rpn
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment