Commit d40a69d5 authored by Michael Salim's avatar Michael Salim

Cobalt and cray workers

parent afafa5b5
......@@ -2,7 +2,7 @@ import subprocess
import sys
import shlex
import os
import time
from datetime import datetime
from collections import namedtuple
from django.conf import settings
......@@ -27,6 +27,7 @@ class CobaltScheduler(Scheduler.Scheduler):
'time_remaining' : 'TimeRemaining',
'state' : 'State',
}
GENERIC_NAME_MAP = {v:k for k,v in JOBSTATUS_VARIABLES.items()}
def _make_submit_cmd(self, job, cmd):
exe = settings.BALSAM_SCHEDULER_SUBMIT_EXE # qsub
......@@ -42,15 +43,20 @@ class CobaltScheduler(Scheduler.Scheduler):
def get_status(self, scheduler_id, jobstatus_vars=None):
    """Query Cobalt qstat for one job and return a dict of generic status fields.

    Args:
        scheduler_id: the Cobalt job id to query.
        jobstatus_vars: optional iterable of generic field names (keys of
            JOBSTATUS_VARIABLES); defaults to all known fields.

    Returns:
        dict mapping generic field names to qstat values; every H:M:S time
        field additionally gets a "<name>_sec" integer-seconds entry.

    Raises:
        KeyError: if a requested name is not in JOBSTATUS_VARIABLES.
        NoQStatInformation: propagated from qstat when the job is not found.
    """
    if jobstatus_vars is None:
        jobstatus_vars = list(self.JOBSTATUS_VARIABLES.values())
    else:
        # Translate generic names to the Cobalt attribute names qstat expects
        jobstatus_vars = [self.JOBSTATUS_VARIABLES[a] for a in jobstatus_vars]
    logger.debug(f"Cobalt ID {scheduler_id} get_status:")
    info = qstat(scheduler_id, jobstatus_vars)
    # Map Cobalt attribute names back to the scheduler-agnostic names
    info = {self.GENERIC_NAME_MAP[k]: v for k, v in info.items()}
    # Add integer-seconds companions for each H:M:S time field.
    # Loop variable renamed from "time" to avoid shadowing the time module.
    time_attrs_seconds = {}
    for key, value in info.items():
        if 'time' in key:
            t = datetime.strptime(value, '%H:%M:%S')
            time_attrs_seconds[key + "_sec"] = t.hour*3600 + t.minute*60 + t.second
    info.update(time_attrs_seconds)
    logger.debug(str(info))
    return info
def qstat(scheduler_id, attrs):
......@@ -62,14 +68,13 @@ def qstat(scheduler_id, attrs):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout, stderr = p.communicate()
stdout, _ = p.communicate()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
logger.debug('qstat ouput: \n' + stdout)
if p.returncode != 0:
logger.error('return code for qstat is non-zero. stdout = \n' +
stdout + '\n stderr = \n' + stderr)
logger.exception('return code for qstat is non-zero:\n'+stdout)
raise NoQStatInformation("qstat nonzero return code: this might signal job is done")
try:
logger.debug('parsing qstat ouput: \n' + stdout)
qstat_fields = stdout.split('\n')[2].split()
qstat_info = {attr : qstat_fields[i] for (i,attr) in
enumerate(attrs)}
......
......@@ -67,4 +67,4 @@ class CRAYMPICommand(DEFAULTMPICommand):
def worker_str(self, workers):
    """Return the aprun node-list flag ("-L id1,id2,...") for *workers*.

    Returns "" when *workers* is empty or None, so callers can splice the
    result directly into an mpirun/aprun command line.
    """
    if not workers:
        return ""
    # worker.id may be an int; str() is required or join() raises TypeError
    return f"-L {','.join(str(worker.id) for worker in workers)}"
......@@ -20,12 +20,14 @@ from django.db import transaction
import balsam.models
from balsamlauncher import mpi_commands
from balsamlauncher import mpi_ensemble
from balsamlauncher.exceptions import *
from balsamlauncher import cd
import logging
logger = logging.getLogger(__name__)
from importlib.util import find_spec
MPI_ENSEMBLE_EXE = find_spec("balsamlauncher.mpi_ensemble").origin
class MonitorStream(Thread):
......@@ -132,8 +134,6 @@ class MPIEnsembleRunner(Runner):
'''One subprocess: an ensemble of serial jobs run in an mpi4py wrapper'''
def __init__(self, job_list, worker_list):
mpi_ensemble_exe = os.path.abspath(mpi_ensemble.__file__)
super().__init__(job_list, worker_list)
root_dir = Path(self.jobs[0].working_directory).parent
......@@ -153,7 +153,7 @@ class MPIEnsembleRunner(Runner):
rpn = worker_list[0].max_ranks_per_node
nranks = sum(w.num_nodes*rpn for w in worker_list)
envs = self.jobs[0].get_envs() # TODO: different envs for each job
app_cmd = f"{sys.executable} {mpi_ensemble_exe} {ensemble_filename}"
app_cmd = f"{sys.executable} {MPI_ENSEMBLE_EXE} {ensemble_filename}"
mpi_str = self.mpi_cmd(worker_list, app_cmd=app_cmd, envs=envs,
num_ranks=nranks, ranks_per_node=rpn)
......
......@@ -28,7 +28,11 @@ class WorkerGroup:
self.workers = []
self.setup = getattr(self, f"setup_{self.host_type}")
self.setup(config)
logger.debug(f"Built {len(self.workers)} {self.host_type} workers")
logger.info(f"Built {len(self.workers)} {self.host_type} workers")
for worker in self.workers:
line = (f"ID {worker.id} NODES {worker.num_nodes} MAX-RANKS-PER-NODE"
f" {worker.max_ranks_per_node}")
logger.debug(line)
def __iter__(self):
    """Yield each worker in this group, in registration order."""
    yield from self.workers
......@@ -39,6 +43,9 @@ class WorkerGroup:
def setup_CRAY(self, config):
# workers_str is string like: 1001-1005,1030,1034-1200
node_ids = []
if not self.workers_str:
raise ValueError("Cray WorkerGroup needs workers_str to setup")
ranges = self.workers_str.split(',')
for node_range in ranges:
lo, *hi = node_range.split('-')
......
......@@ -128,7 +128,7 @@ class SCPHandler:
def stage_in(self, source_url, destination_directory):
    """Recursively scp *source_url* (host:path form) into *destination_directory*.

    Raises:
        Exception: if scp exits with a nonzero status.
    """
    # BUG FIX: the original format string 'scp -p -r %s:%s %s' had three
    # placeholders but only two arguments, raising TypeError before scp ever
    # ran; source_url already carries the host:path form, so two suffice.
    # (The unused urlparse of source_url is dropped as well.)
    command = 'scp -p -r %s %s' % (source_url, destination_directory)
    logger.debug('transfer.stage_in: command=' + command)
    # NOTE(review): os.system on an interpolated string is shell-injection
    # prone; subprocess.run with an argument list would be safer -- confirm
    # the URLs come only from trusted configuration.
    ret = os.system(command)
    if ret:
        raise Exception("Error in stage_in: %d" % ret)
......@@ -136,7 +136,7 @@ class SCPHandler:
def stage_out(self, source_directory, destination_url):
    """Recursively scp *source_directory* to *destination_url* (host:path form).

    Raises:
        Exception: if scp exits with a nonzero status.
    """
    # NOTE(review): the original comment claimed source and destination each
    # need a trailing '/', but nothing here enforces it -- confirm callers do.
    command = 'scp -p -r %s %s' % (source_directory, destination_url)
    logger.debug('transfer.stage_out: command=' + command)
    # NOTE(review): os.system on an interpolated string is shell-injection
    # prone; subprocess.run with an argument list would be safer.
    ret = os.system(command)
    if ret:
        raise Exception("Error in stage_out: %d" % ret)
......
......@@ -93,7 +93,6 @@ class BalsamDAGTests(BalsamTestCase):
# user postprocess script: use dag API to kill the "A" subtree
out = self.mock_postprocessor_run(A, "kill")
print(out)
# There are still 5 jobs
# But now A,B,C are killed; D,E unaffected
......
import os
from tests.BalsamTestCase import BalsamTestCase
from balsamlauncher import worker
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment