Commit b67da820 authored by Michael Salim's avatar Michael Salim
Browse files

Workon runners

parent feef6a1a
......@@ -2,6 +2,7 @@
scheduling service and submits directly to a local job queue, or by the
Balsam service metascheduler'''
import argparse
from collections import namedtuple
import os
import time
......@@ -14,6 +15,9 @@ START_TIME = time.time() + 10.0
class BalsamLauncherException(Exception): pass
Worker = namedtuple('Worker', ['id', 'shape', 'block', 'corner',
'ranks_per_worker'])
SIGTIMEOUT = 'TIMEOUT!'
SIGNALS = {
signal.SIGINT: 'SIG_INT',
......
......@@ -2,7 +2,7 @@ from collections import namedtuple
from contextlib import nested
import os
import sys
from subprocess import Popen
from subprocess import Popen, STDOUT
from mpi4py import MPI
from runners import cd
......@@ -35,7 +35,7 @@ def run(job):
with nested(cd(job.workdir), open(outname, 'wb')) as (_,outf):
try:
status_msg(job.id, "RUNNING", msg="executing from mpi_ensemble")
proc = Popen(job.cmd, stdout=outf, stderr=outf)
proc = Popen(job.cmd, stdout=outf, stderr=STDOUT)
retcode = proc.wait()
except Exception as e:
status_msg(job.id, "RUN_ERROR", msg=str(e))
......
from collections import namedtuple
import functools
import os
from pathlib import Path
import signal
import shlex
import sys
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, STDOUT
from tempfile import NamedTemporaryFile
from threading import Thread
from queue import Queue, Empty
import balsam.models
from launcher import SIGNALS, BGQ_HOSTS, CRAY_HOSTS
from balsam.launchers.launcher import SIGNALS, BGQ_HOSTS, CRAY_HOSTS
from balsam.launchers import mpi_commands
# Raised for misconfigured Runner setups (e.g. wrong job count for a runner type).
class BalsamRunnerException(Exception): pass
# One parsed status line from a runner subprocess: (job id, new state, message).
Status = namedtuple('Status', ['id', 'state', 'msg'])
# Module-level MPI command flavor; bound to a host-specific instance by setup().
MPIcommand = None
def setup(launcher_config):
    '''Bind the module-level MPIcommand to the MPI flavor for this host.

    Reads launcher_config.host_type and picks the matching command class;
    any unrecognized host type falls back to the default mpirun-style command.
    '''
    global MPIcommand
    command_classes = {
        'BGQ': mpi_commands.BGQMPICommand,
        'CRAY': mpi_commands.CRAYMPICommand,
    }
    chosen = command_classes.get(launcher_config.host_type,
                                 mpi_commands.DefaultMPICommand)
    MPIcommand = chosen()
class cd:
'''Context manager for changing cwd'''
......@@ -26,77 +41,56 @@ class cd:
os.chdir(self.saved_path)
class RunnerConfig(object):
    '''Spellings of the MPI launch executable and its flags per host type.

    Known host types are None (generic mpirun), 'BGQ' (runjob) and 'CRAY'
    (aprun).  For any other host type no attributes are set, matching the
    original fall-through behavior.
    '''

    # Tuple layout: (mpi, nproc, ppn, env, cpu_binding,
    #                threads_per_rank, threads_per_core)
    _SETTINGS = {
        None:   ('mpirun', '-n', '-ppn', '-env', None, None, None),
        'BGQ':  ('runjob', '--np', '-p', '--envs', None, None, None),
        # 64 independent jobs, 1 per core of a KNL node: -n64 -N64 -d1 -j1
        'CRAY': ('aprun', '-n', '-N', '--env', '-cc depth', '-d', '-j'),
    }

    def __init__(self, host_type):
        if host_type in self._SETTINGS:
            (self.mpi, self.nproc, self.ppn, self.env, self.cpu_binding,
             self.threads_per_rank, self.threads_per_core) = self._SETTINGS[host_type]

    def command(self, workers):
        # NOTE(review): set_workers_string is not defined in this class in the
        # visible source -- presumably removed in this commit; verify callers.
        self.set_workers_string(workers)
        return f"{self.mpi}"
class MonitorStream(Thread):
    '''Thread for non-blocking read of Runner's subprocess stdout.

    Lines read from the (binary) stream are decoded and buffered in a
    thread-safe Queue; consumers drain them with available_lines().
    NOTE: this span contained diff residue (a duplicate old `Monitor` class
    header and the removed status-parsing lines); only the new class remains.
    '''

    def __init__(self, runner_output):
        super().__init__()
        self.stream = runner_output   # binary stream: subprocess stdout pipe
        self.queue = Queue()
        self.daemon = True            # don't block interpreter exit

    def run(self):
        # readline returns b'' at EOF, which terminates the iteration
        for line in iter(self.stream.readline, b''):
            self.queue.put(line.decode('utf-8'))
        self.stream.close()

    def available_lines(self):
        # Drain whatever is queued right now, without blocking
        while True:
            try:
                yield self.queue.get_nowait()
            except Empty:
                return
class Runner:
'''Spawns ONE subprocess to run specified job(s) and monitor their execution'''
def __init__(self, job_list, worker_list):
    '''Hold the jobs this runner owns and prepare empty Popen arguments.

    Removed diff residue: the old (job_list, host_type) signature, the
    `self.host_type = host_type` line (a NameError under the new signature),
    and a stale popen_args dict that was immediately overwritten.
    '''
    self.jobs = job_list
    # Index for applying status updates reported back by the subprocess
    self.jobs_by_pk = {job.pk: job for job in self.jobs}
    self.process = None
    self.monitor = None
    self.cmd_generator = None
    self.outfile = None
    # Subclasses fill in args / cwd / stdout / stderr before start()
    self.popen_args = {}
def start(self):
    '''Install timeout signal handlers, launch the subprocess, and begin
    monitoring its stdout when it is piped back to us.'''
    for signum in SIGNALS:
        # fixed: was signal.signal(sigum, ...) -- 'sigum' raised NameError
        signal.signal(signum, self.timeout)
    self.process = Popen(**self.popen_args)
    # A MonitorStream thread is only needed for PIPE output; runners that
    # redirect to a file (MPIRunner) have nothing to read here.
    # (Removed residue: unconditional `Monitor(...)` lines from the old code.)
    if self.popen_args['stdout'] == PIPE:
        self.monitor = MonitorStream(self.process.stdout)
        self.monitor.start()
def update_jobs(self):
    '''Sync job states with the subprocess; no-op in the base class.
    Subclasses poll their process/monitor and push state changes.'''
    pass
def timeout(self, signum, stack):
@staticmethod
def get_app_cmd(job):
    '''Build the command line for one job.

    Jobs submitted via `balsam qsub` carry their full command directly;
    otherwise the command comes from the named application's definition.
    '''
    if not job.application:
        # Ad-hoc job: the user supplied the complete command line
        return job.direct_command
    app = ApplicationDefinition.objects.get(name=job.application)
    return f"{app.executable} {app.application_args}"
def timeout(self, signum, stack)
sig_msg = SIGNALS.get(signum, signum)
message = f"{self.__class__.__name__} got signal {sig_msg}"
self.update_jobs()
......@@ -106,24 +100,77 @@ class Runner:
job.update_state('RUN_TIMEOUT', message)
class MPIRunner(Runner):
    '''One subprocess, one job: a single MPI invocation.

    Removed diff residue: the old (cmd_generator, job) constructor lines.
    '''

    def __init__(self, job_list, worker_list):
        super().__init__(job_list, worker_list)
        if len(self.jobs) != 1:
            raise BalsamRunnerException('MPIRunner must take exactly 1 job')

        job = self.jobs[0]
        app_cmd = self.get_app_cmd(job)
        # fixed: was 'MPICommand' (undefined name); the module global is
        # 'MPIcommand', bound by setup() -- TODO confirm the call signature
        mpi = MPIcommand(job, worker_list)

        # Job output goes to <workdir>/<workdir-basename>.out, stderr merged in
        basename = os.path.basename(job.working_directory)
        outname = os.path.join(job.working_directory, f"{basename}.out")
        self.outfile = open(outname, 'w+b')

        command = f"{mpi} {app_cmd}"
        self.popen_args['args'] = shlex.split(command)
        self.popen_args['cwd'] = job.working_directory
        self.popen_args['stdout'] = self.outfile
        self.popen_args['stderr'] = STDOUT
        self.popen_args['bufsize'] = 1

    def update_jobs(self):
        '''Poll the subprocess once and sync the single job's state.'''
        job = self.jobs[0]
        retcode = self.process.poll()
        if retcode is None:          # fixed: was '== None'
            curstate = 'RUNNING'
            msg = ''
        elif retcode == 0:
            curstate = 'RUN_FINISHED'
            msg = ''
        else:
            curstate = 'RUN_ERROR'
            msg = str(retcode)
        if job.state != curstate:
            job.update_state(curstate, msg) # TODO: handle RecordModified
class MPIEnsembleRunner(Runner):
    '''One subprocess: an ensemble of serial jobs run in an mpi4py wrapper.

    Writes a "<pk> <workdir> <cmd>" line per job to a temp file, then launches
    mpi_ensemble.py over it; job states stream back on the wrapper's stdout.
    Removed diff residue: an old def line missing its colon and the old
    queue-draining update_jobs body.
    '''

    def __init__(self, job_list, worker_list):
        from balsam.launchers import mpi_ensemble
        mpi_ensemble_exe = os.path.abspath(mpi_ensemble.__file__)

        super().__init__(job_list, worker_list)
        root_dir = Path(self.jobs[0].working_directory).parent

        self.popen_args['bufsize'] = 1
        self.popen_args['stdout'] = PIPE
        self.popen_args['stderr'] = STDOUT
        self.popen_args['cwd'] = root_dir

        with NamedTemporaryFile(prefix='mpi-ensemble', dir=root_dir,
                                delete=False, mode='w') as fp:
            self.ensemble_filename = fp.name
            # fixed: was 'self.job_list' (AttributeError) -- the base
            # __init__ stores the jobs as self.jobs
            for job in self.jobs:
                cmd = self.get_app_cmd(job)
                fp.write(f"{job.pk} {job.working_directory} {cmd}\n")

        nproc = len(worker_list) * worker_list[0].ranks_per_worker
        # fixed: was 'MPICommand' (undefined name); the module global is
        # 'MPIcommand', bound by setup() -- TODO confirm the call signature
        mpi = MPIcommand(self.jobs[0], worker_list, nproc=nproc)
        command = f"{mpi} {mpi_ensemble_exe} {self.ensemble_filename}"
        self.popen_args['args'] = shlex.split(command)

    def update_jobs(self):
        '''Drain status lines from the wrapper's stdout and sync job states.'''
        for line in self.monitor.available_lines():
            pk, state, *msg = line.split()
            msg = ' '.join(msg)
            # NOTE(review): pk is parsed as a str; confirm it matches the
            # type of the job.pk keys in jobs_by_pk
            if pk in self.jobs_by_pk and state in balsam.models.STATES:
                # fixed: was self.jobs_by_pk[id] -- the builtin 'id',
                # not the parsed pk
                self.jobs_by_pk[pk].update_state(state, msg) # TODO: handle RecordModified
            else:
                # fixed: 'status' was undefined here; report the bad line
                raise BalsamRunnerException(f"Invalid status update: {line.strip()}")
......@@ -186,6 +186,10 @@ class BalsamJob(models.Model):
'Number of hyperthreads per physical core (if applicable)',
help_text='Number of hyperthreads per physical core.',
default=1)
environ_vars = models.TextField(
'Environment variables specific to this job',
help_text="Colon-separated list of envs like VAR1=value1:VAR2=value2",
default='')
launcher_info = models.TextField(
'Scheduler ID',
......@@ -197,17 +201,28 @@ class BalsamJob(models.Model):
'Application to Run',
help_text='The application to run; located in Applications database',
default='')
application_args = models.TextField(
'Command-line args to the application exe',
help_text='Command line arguments used by the Balsam job runner',
default='')
direct_command = models.TextField(
'Command line to execute (specified with balsam qsub <args> <command>)',
help_text="Instead of creating BalsamJobs that point to a pre-defined "
"application, users can directly add jobs consisting of a single command "
"line with `balsam qsub`. This direct command is then invoked by the "
"Balsam job launcher.",
default='')
preprocess = models.TextField(
'Preprocessing Script',
help_text='A script that is run in a job working directory prior to submitting the job to the queue.'
' If blank, will default to the default_preprocess script defined for the
application.',
' If blank, will default to the default_preprocess script defined for the application.',
default='')
postprocess = models.TextField(
'Postprocessing Script',
help_text='A script that is run in a job working directory after the job has completed.'
' If blank, will default to the default_postprocess script defined for the
application.',
' If blank, will default to the default_postprocess script defined for the application.',
default='')
state = models.TextField(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment