'''A Runner is constructed with a list of jobs and a list of idle workers. It
creates and monitors the execution subprocess, updating job states in the DB as
necessary. RunnerGroup has a collection of Runner objects, logic for creating
the next Runner (i.e. assigning jobs to nodes), and the public interface to
monitor runners'''
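
# Rough usage sketch (assumed from this module's public interface; the
# launcher service is expected to drive a RunnerGroup roughly like this):
#     group = RunnerGroup(lock)
#     group.create_next_runner(runnable_jobs, workers)
#     while group.running_job_pks:
#         finished = group.update_and_remove_finished()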

import functools
from math import ceil
import os
from pathlib import Path
import shlex
import sys
from subprocess import Popen, PIPE, STDOUT
from tempfile import NamedTemporaryFile
from threading import Thread
from queue import Queue, Empty

from django.conf import settings
from django.db import transaction

import balsam.models
from balsamlauncher import mpi_commands
from balsamlauncher.exceptions import *
from balsamlauncher import cd

import logging
logger = logging.getLogger(__name__)
from importlib.util import find_spec
MPI_ENSEMBLE_EXE = find_spec("balsamlauncher.mpi_ensemble").origin
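# Filesystem path of the mpi_ensemble module; MPIEnsembleRunner launches it
# below as "{sys.executable} {MPI_ENSEMBLE_EXE} <ensemble-file>"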

def get_tail(fname, nlines=5, indent='    '):
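    '''Return the last nlines of fname, with each line prefixed by indent'''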
    proc = Popen(f'tail -n {nlines} {fname}'.split(), stdout=PIPE,
                 stderr=STDOUT)
    tail = proc.communicate()[0].decode()
    lines = [indent + line for line in tail.split('\n')]
    return '\n'.join(lines)


class MonitorStream(Thread):
    '''Thread: non-blocking read of a process's stdout'''
    def __init__(self, runner_output):
        super().__init__()
        self.stream = runner_output
        self.queue = Queue()
        self.daemon = True

    def run(self):
        # Call readline until empty string is returned
        for line in iter(self.stream.readline, b''):
            self.queue.put(line.decode('utf-8'))
        self.stream.close()

    def available_lines(self):
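        '''Yield any stdout lines queued by the reader thread, without blocking'''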
        while True:
            try: yield self.queue.get_nowait()
            except Empty: return


class Runner:
    '''Spawns ONE subprocess to run specified job(s) and monitor their execution'''
    def __init__(self, job_list, worker_list):
        host_type = worker_list[0].host_type
        assert all(w.host_type == host_type for w in worker_list)
        self.worker_list = worker_list
        mpi_cmd_class = getattr(mpi_commands, f"{host_type}MPICommand")
        self.mpi_cmd = mpi_cmd_class()
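        # mpi_commands is assumed to define one command-builder class per host
        # type, following the "{host_type}MPICommand" naming convention above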
        self.jobs = job_list
        self.jobs_by_pk = {str(job.pk) : job for job in self.jobs}
        self.process = None
        self.monitor = None
        self.outfile = None
        self.popen_args = {}

    def start(self):
        self.process = Popen(**self.popen_args)
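        # Only the ensemble runner pipes stdout (MPIRunner redirects it to the
        # job's output file), so the monitor thread is created conditionally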
        if self.popen_args['stdout'] == PIPE:
            self.monitor = MonitorStream(self.process.stdout)
            self.monitor.start()

    def update_jobs(self):
        raise NotImplementedError

    def finished(self):
        return self.process.poll() is not None

    def timeout(self):
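        '''Terminate the subprocess and mark any still-RUNNING jobs RUN_TIMEOUT'''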
        self.process.terminate()
        for job in self.jobs:
            if job.state == 'RUNNING': job.update_state('RUN_TIMEOUT')

class MPIRunner(Runner):
    '''One subprocess, one job'''
    def __init__(self, job_list, worker_list):
        super().__init__(job_list, worker_list)
        if len(self.jobs) != 1:
            raise BalsamRunnerError('MPIRunner must take exactly 1 job')

        job = self.jobs[0]
        envs = job.get_envs() # dict
        app_cmd = job.app_cmd
        nranks = job.num_ranks
        rpn = job.ranks_per_node
        tpr = job.threads_per_rank
        tpc = job.threads_per_core

        # Note that environment variables are passed through the MPI run command
        # line, rather than Popen directly, due to ALCF restrictions: 
        # https://www.alcf.anl.gov/user-guides/running-jobs-xc40#environment-variables
        mpi_str = self.mpi_cmd(worker_list, app_cmd=app_cmd, envs=envs,
                               num_ranks=nranks, ranks_per_node=rpn,
                               threads_per_rank=tpr, threads_per_core=tpc)
        
        basename = os.path.basename(job.working_directory)
        outname = os.path.join(job.working_directory, f"{basename}.out")
        self.outfile = open(outname, 'w+b')
        self.popen_args['args'] = shlex.split(mpi_str)
        self.popen_args['cwd'] = job.working_directory
        self.popen_args['stdout'] = self.outfile
        self.popen_args['stderr'] = STDOUT
        self.popen_args['bufsize'] = 1
        logger.info(f"MPI Runner Popen args: {self.popen_args['args']}")
        logger.info(f"MPI Runner: writing output to {outname}")

    def update_jobs(self):
        job = self.jobs[0]
        #job.refresh_from_db() # TODO: handle RecordModified
        retcode = self.process.poll() # None while the process is still running
        if retcode is None:
            logger.debug(f"MPI Job {job.cute_id} still running")
            curstate = 'RUNNING'
            msg = ''
        elif retcode == 0:
            logger.debug(f"MPI Job {job.cute_id} return code 0: done")
            curstate = 'RUN_DONE'
            msg = ''
        else:
            curstate = 'RUN_ERROR'
            self.process.communicate()
            self.outfile.close()
            tail = get_tail(self.outfile.name)
            msg = f"RETURN CODE {retcode}:\n{tail}"
            logger.debug(msg)
        if job.state != curstate: job.update_state(curstate, msg) # TODO: handle RecordModified


class MPIEnsembleRunner(Runner):
    '''One subprocess: an ensemble of serial jobs run in an mpi4py wrapper'''
    def __init__(self, job_list, worker_list):
        super().__init__(job_list, worker_list)
        root_dir = Path(self.jobs[0].working_directory).parent
        
        self.popen_args['bufsize'] = 1
        self.popen_args['stdout'] = PIPE
        self.popen_args['stderr'] = STDOUT
        self.popen_args['cwd'] = root_dir

        # mpi_ensemble.py reads jobs from this temp file
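        # One "<pk> <working_directory> <app_cmd>" line per job (the format
        # mpi_ensemble.py is assumed to parse)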
        with NamedTemporaryFile(prefix='mpi-ensemble', dir=root_dir, 
                                delete=False, mode='w') as fp:
            ensemble_filename = os.path.abspath(fp.name)
            for job in self.jobs:
                cmd = job.app_cmd
                fp.write(f"{job.pk} {job.working_directory} {cmd}\n")

        rpn = worker_list[0].max_ranks_per_node
        nranks = sum(w.num_nodes*rpn for w in worker_list)
        envs = self.jobs[0].get_envs() # TODO: different envs for each job
        app_cmd = f"{sys.executable} {MPI_ENSEMBLE_EXE} {ensemble_filename}"

        mpi_str = self.mpi_cmd(worker_list, app_cmd=app_cmd, envs=envs,
                               num_ranks=nranks, ranks_per_node=rpn)

        self.popen_args['args'] = shlex.split(mpi_str)
        logger.info(f"MPI Ensemble Popen args: {self.popen_args['args']}")

    def update_jobs(self):
        '''Relies on stdout of mpi_ensemble.py'''
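        # Each status line is expected as "<pk> <STATE> <message words...>"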
        retcode = self.process.poll()
        if retcode not in [None, 0]:
            msg = "mpi_ensemble.py had nonzero return code:\n"
            msg += "".join(self.monitor.available_lines())
            logger.error(msg)
            raise RuntimeError(msg)

        logger.debug("Checking mpi_ensemble stdout for status updates...")
        for line in self.monitor.available_lines():
            pk, state, *msg = line.split()
            msg = ' '.join(msg)
            if pk in self.jobs_by_pk and state in balsam.models.STATES:
                job = self.jobs_by_pk[pk]
                job.update_state(state, msg) # TODO: handle RecordModified exception
                logger.debug(f"Job {job.cute_id} updated to {state}: {msg}")
            else:
                logger.error(f"Invalid statusMsg from mpi_ensemble: {line.strip()}")

class RunnerGroup:
    
    MAX_CONCURRENT_RUNNERS = settings.BALSAM_MAX_CONCURRENT_RUNNERS
    def __init__(self, lock):
        self.runners = []
        self.lock = lock

    def __iter__(self):
        return iter(self.runners)
    
    def create_next_runner(self, runnable_jobs, workers):
        '''Implements one particular strategy for choosing the next job, assuming
        all jobs are either single-process or MPI-parallel. Will return the serial
        ensemble job or single MPI job that occupies the largest possible number of
        idle nodes'''

        if len(self.runners) == self.MAX_CONCURRENT_RUNNERS:
            logger.info("Cannot create another runner: at max")
            raise ExceededMaxRunners(
                f"Cannot have more than {self.MAX_CONCURRENT_RUNNERS} simultaneous runners"
            )

        idle_workers = [w for w in workers if w.idle]
        nidle_workers = len(idle_workers)
        nodes_per_worker = workers[0].num_nodes
        rpn = workers[0].max_ranks_per_node
        assert all(w.num_nodes == nodes_per_worker for w in idle_workers)
        assert all(w.max_ranks_per_node == rpn for w in idle_workers)
        logger.debug(f"Available workers: {nidle_workers} idle with "
            f"{nodes_per_worker} nodes per worker")
        nidle_nodes = nidle_workers * nodes_per_worker
        nidle_ranks = nidle_nodes * rpn

        serial_jobs = [j for j in runnable_jobs if j.num_ranks == 1]
        nserial = len(serial_jobs)
        logger.debug(f"{nserial} single-process jobs can run")

        # A job counts as MPI if it needs multiple nodes that are all idle, or
        # a single idle node with more than one rank
        mpi_jobs = [j for j in runnable_jobs if 1 < j.num_nodes <= nidle_nodes or
                    (1 == j.num_nodes <= nidle_nodes and j.ranks_per_node > 1)]
        largest_mpi_job = (max(mpi_jobs, key=lambda job: job.num_nodes) 
                           if mpi_jobs else None)
        if largest_mpi_job:
            logger.debug(f"{len(mpi_jobs)} MPI jobs can run; largest takes "
            f"{largest_mpi_job.num_nodes} nodes")
        else:
            logger.debug("No MPI jobs can run")
        
        # Try to fill all available nodes with a serial ensemble runner.
        # If there are not enough serial jobs, run the larger of: the largest
        # MPI job that fits, or the remaining serial jobs
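        # Worked example (hypothetical numbers): 4 idle single-node workers at
        # rpn=16 give nidle_ranks=64. With 100 runnable serial jobs, the first
        # branch packs 64 of them into one ensemble. With only 10 serial jobs
        # and a 3-node MPI job that fits, 3 > 10//16 == 0, so the MPI job runs.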
        if nserial >= nidle_ranks:
            jobs = serial_jobs[:nidle_ranks] # TODO: try putting ALL serial jobs into one MPIEnsemble
            assigned_workers = idle_workers
            runner_class = MPIEnsembleRunner
            logger.info(f"Running {len(jobs)} serial jobs on {nidle_workers} workers "
            f"with {nodes_per_worker} nodes-per-worker and {rpn} ranks per node")
        # Run the MPI job if it occupies more nodes than the serial jobs would fill
        elif largest_mpi_job and largest_mpi_job.num_nodes > nserial // rpn:
            jobs = [largest_mpi_job]
            num_workers = ceil(largest_mpi_job.num_nodes / nodes_per_worker)
            assigned_workers = idle_workers[:num_workers]
            runner_class = MPIRunner
            logger.info(f"Running {largest_mpi_job.num_nodes}-node MPI job")
        else:
            jobs = serial_jobs
            nworkers = ceil(nserial/rpn/nodes_per_worker)
            assigned_workers = idle_workers[:nworkers]
            runner_class = MPIEnsembleRunner
            logger.info(f"Running {len(jobs)} serial jobs on {nworkers} workers "
                        f"totalling {nworkers*nodes_per_worker} nodes "
                        f"with {rpn} ranks per worker")
        
        if not jobs:
            logger.info("Not enough idle workers to handle the runnable jobs")
            raise NoAvailableWorkers

        runner = runner_class(jobs, assigned_workers)
        runner.start()
        self.runners.append(runner)
        for worker in assigned_workers: worker.idle = False

    def update_and_remove_finished(self):
        # TODO: Benchmark performance overhead; does grouping into one
        # transaction save significantly?
        logger.debug(f"Checking status of {len(self.runners)} active runners")
        any_finished = False
        with self.lock:
            for runner in self.runners: runner.update_jobs()

        # Materialize the list first: finished runners are removed from
        # self.runners inside the loop below
        finished_runners = [r for r in self.runners if r.finished()]

        for runner in finished_runners:
            if any(j.state not in ['RUN_DONE','RUN_ERROR','RUN_TIMEOUT'] for j in runner.jobs):
                with self.lock:
                    runner.update_jobs()
            if any(j.state not in ['RUN_DONE','RUN_ERROR','RUN_TIMEOUT'] for j in runner.jobs):
                msg = "Runner process done, but failed to update the state of its jobs"
                logger.error(msg)
                raise RuntimeError(msg)
            else:
                any_finished = True
                self.runners.remove(runner)
                for worker in runner.worker_list:
                    worker.idle = True
        return any_finished

    @property
    def running_job_pks(self):
        return [j.pk for runner in self.runners for j in runner.jobs]