Commit c3cbf3c1 authored by Michael Salim

Cooley envs and MPI runner

parent 95f28ca5
......@@ -19,8 +19,9 @@ def new_scheduler():
class CobaltScheduler(Scheduler.Scheduler):
SCHEDULER_VARIABLES = {
'id' : 'COBALT_JOBID',
'num_workers' : 'COBALT_PARTSIZE',
'workers_str' : 'COBALT_PARTNAME',
'num_workers' : 'COBALT_PARTSIZE',
'workers_str' : 'COBALT_PARTNAME',
'workers_file' : 'COBALT_NODEFILE',
}
JOBSTATUS_VARIABLES = {
'id' : 'JobID',
......
......@@ -15,8 +15,9 @@ class SubmissionRecord:
class Scheduler:
RECOGNIZED_HOSTS = {
'BGQ' : 'vesta cetus mira'.split(),
'CRAY' : 'theta'.split(),
'BGQ' : 'vesta cetus mira'.split(),
'CRAY' : 'theta'.split(),
'COOLEY' : 'cooley cc'.split()
}
SCHEDULER_VARIABLES = {} # mappings defined in subclass
JOBSTATUS_VARIABLES = {}
......@@ -27,6 +28,7 @@ class Scheduler:
self.pid = None
self.num_workers = None
self.workers_str = None
self.workers_file = None
self.current_scheduler_id = None
self.remaining_seconds = None
self.last_check_seconds = None
......@@ -51,18 +53,23 @@ class Scheduler:
logger.debug(f"Detected scheduler ID {self.current_scheduler_id}")
def get_env(self):
    '''Check for environment variables (e.g. COBALT_JOBID) indicating
    currently inside a scheduled job.

    Every generic name in ``SCHEDULER_VARIABLES`` is looked up in the
    process environment; variables that are not set map to ``None``.
    The values found are copied onto ``current_scheduler_id``,
    ``num_workers`` (converted to ``int``), ``workers_str`` and
    ``workers_file``.

    Returns:
        dict mapping each generic name to the raw environment string,
        or ``None`` when the variable is unset.

    Raises:
        SchedulerException: if no scheduler job ID is present.
        ValueError: if the ``num_workers`` variable is set but is not an
            integer string.
    '''
    # The key must be the variable name itself; wrapping it in a list
    # makes os.environ.get raise TypeError.
    environment = {
        generic_name: os.environ.get(specific_var)
        for generic_name, specific_var in self.SCHEDULER_VARIABLES.items()
    }

    # Use .get() so a subclass that does not map one of these generic
    # names is treated as "unset" instead of failing with KeyError.
    sched_id = environment.get('id')
    num_workers = environment.get('num_workers')
    workers_str = environment.get('workers_str')
    workers_file = environment.get('workers_file')

    if sched_id:
        self.current_scheduler_id = sched_id
    if num_workers:
        self.num_workers = int(num_workers)
    if workers_str:
        self.workers_str = workers_str
    if workers_file:
        self.workers_file = workers_file

    if not sched_id:
        raise SchedulerException("No ID in environment")
    return environment
def remaining_time_seconds(self, sched_id=None):
......@@ -83,6 +90,7 @@ class Scheduler:
info = self.get_status(sched_id)
self.remaining_seconds = info['time_remaining_sec']
self.last_check_seconds = time.time()
logger.debug(f"{self.remaining_seconds} seconds remaining")
return self.remaining_seconds
def submit(self, job, cmd):
......
......@@ -145,7 +145,8 @@ if __name__ == "__main__":
transition_pool = transitions.TransitionProcessPool()
runner_group = runners.RunnerGroup(transition_pool.lock)
worker_group = worker.WorkerGroup(args, host_type=scheduler.host_type,
workers_str=scheduler.workers_str)
workers_str=scheduler.workers_str,
workers_file=scheduler.workers_file)
detect_dead_runners(job_source)
......
......@@ -68,3 +68,19 @@ class CRAYMPICommand(DEFAULTMPICommand):
if not workers:
return ""
return f"-L {','.join(str(worker.id) for worker in workers)}"
class COOLEYMPICommand(DEFAULTMPICommand):
    """MPI launch-command settings for the COOLEY host type.

    Uses ``mpirun`` with ``-n`` for the process count, ``--ppn`` for
    processes per node and ``-e`` for environment variables. CPU binding
    and the threads-per-rank / threads-per-core options are left unset.
    """

    def __init__(self):
        # NOTE(review): the previous comment here described 64 single-core
        # jobs on a KNL node ("-n64 -N64 -d1 -j1"); it looks copied from
        # another launcher's settings -- confirm and drop if so.
        self.mpi = 'mpirun'
        self.nproc = '-n'
        self.ppn = '--ppn'
        self.env = '-e'
        # Options with no flag configured for this launcher.
        self.cpu_binding = None
        self.threads_per_rank = None
        self.threads_per_core = None

    def worker_str(self, workers):
        """Return the ``--hosts`` argument naming the given workers.

        The worker ids are joined with commas and the result ends with a
        trailing space. An empty or missing ``workers`` yields ``""``.
        """
        if workers:
            host_list = ','.join(str(member.id) for member in workers)
            return f"--hosts {host_list} "
        return ""
......@@ -22,17 +22,20 @@ class Worker:
self.idle = True
class WorkerGroup:
def __init__(self, config, *, host_type=None, workers_str=None,
             workers_file=None):
    """Build the group's workers using the host-specific setup method.

    Args:
        config: passed through unchanged to the ``setup_<host_type>``
            method.
        host_type: selects the ``setup_<host_type>`` method to run.
        workers_str: scheduler-provided worker description string, stored
            on the instance for the setup method to use.
        workers_file: path to a scheduler-provided node file, stored on
            the instance for the setup method to use.

    Raises:
        AttributeError: if no ``setup_<host_type>`` method exists.
    """
    self.host_type = host_type
    self.workers_str = workers_str
    self.workers_file = workers_file
    self.workers = []

    # Dispatch by name to the builder for this host type, e.g.
    # setup_COOLEY or setup_DEFAULT.
    self.setup = getattr(self, f"setup_{self.host_type}")
    self.setup(config)

    logger.info(f"Built {len(self.workers)} {self.host_type} workers")
    for worker in self.workers:
        logger.debug(
            f"ID {worker.id} NODES {worker.num_nodes} MAX-RANKS-PER-NODE"
            f" {worker.max_ranks_per_node}"
        )
def __iter__(self):
return iter(self.workers)
......@@ -65,6 +68,20 @@ class WorkerGroup:
# For each worker, set num_nodes and max_ranks_per_node attributes
pass
def setup_COOLEY(self, config):
    """Create one single-node worker per host listed in ``workers_file``.

    The node file may be comma-delimited or whitespace-delimited (for
    example one host per line). ``workers_str`` is overwritten with the
    space-joined host names, and a ``Worker`` with ``num_nodes=1`` and
    ``max_ranks_per_node=16`` is appended to ``self.workers`` for each
    host.

    Args:
        config: unused here; accepted so every ``setup_*`` method has the
            same signature.

    Raises:
        ValueError: if ``workers_file`` was not provided.
    """
    if not self.workers_file:
        raise ValueError("Cooley WorkerGroup needs workers_file to setup")

    # Close the node file deterministically instead of leaking the handle.
    with open(self.workers_file) as node_file:
        data = node_file.read()

    # Strip each entry so a comma-delimited file does not leave a trailing
    # newline on the last host name, and drop empty entries.
    raw_ids = data.split(',') if ',' in data else data.split()
    node_ids = [name.strip() for name in raw_ids if name.strip()]

    self.workers_str = " ".join(node_ids)
    self.workers.extend(
        Worker(node_id, host_type='COOLEY', num_nodes=1,
               max_ranks_per_node=16)
        for node_id in node_ids
    )
def setup_DEFAULT(self, config):
# Use command line config: num_workers, nodes_per_worker,
# max_ranks_per_node
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment