models.py 18.4 KB
Newer Older
1
2
3
4
import os
import json
import logging
import sys
Michael Salim's avatar
Michael Salim committed
5
from datetime import datetime
Michael Salim's avatar
Michael Salim committed
6
from socket import gethostname
7
8
import uuid

9
from django.core.exceptions import ValidationError,ObjectDoesNotExist
jtchilders's avatar
jtchilders committed
10
11
from django.conf import settings
from django.db import models
12
from django.db.utils import OperationalError
13
from concurrency.fields import IntegerVersionField
14
from concurrency.exceptions import RecordModifiedError
jtchilders's avatar
jtchilders committed
15

16
from balsam.common import Serializer
jtchilders's avatar
jtchilders committed
17

18
logger = logging.getLogger(__name__)
jtchilders's avatar
jtchilders committed
19

20
21
class InvalidStateError(ValidationError):
    '''Raised when a state name is not one of the entries in STATES.'''


class InvalidParentsError(ValidationError):
    '''Raised by BalsamJob.set_parents when the given parents are unusable.'''


class NoApplication(Exception):
    '''Raised by BalsamJob.get_application when no application is set.'''
23

Michael Salim's avatar
Michael Salim committed
24
25
# Timestamp layout shared by get_time_string() and from_time_string().
TIME_FMT = '%m-%d-%Y %H:%M:%S'

# Every state name a BalsamJob may hold.  The four groups below must form an
# exact, non-overlapping partition of this list; assert_disjoint() checks
# that at import time.
STATES = '''
CREATED
LAUNCHER_QUEUED
AWAITING_PARENTS
READY

STAGED_IN
PREPROCESSED

RUNNING
RUN_DONE

POSTPROCESSED
JOB_FINISHED

RUN_TIMEOUT
RUN_ERROR
RESTART_READY

FAILED
USER_KILLED
PARENT_KILLED'''.split()

ACTIVE_STATES = '''
RUNNING
'''.split()

PROCESSABLE_STATES = '''
CREATED
LAUNCHER_QUEUED
AWAITING_PARENTS
READY
STAGED_IN
RUN_DONE
POSTPROCESSED
RUN_TIMEOUT
RUN_ERROR
'''.split()

RUNNABLE_STATES = '''
PREPROCESSED
RESTART_READY
'''.split()

END_STATES = '''
JOB_FINISHED
FAILED
USER_KILLED
PARENT_KILLED'''.split()

def assert_disjoint():
    '''Check that the four state groups exactly partition STATES.

    Asserts that the groups together contain every entry of STATES once and
    only once, and that no two groups share a member.
    '''
    from itertools import combinations

    groups = [ACTIVE_STATES, PROCESSABLE_STATES, RUNNABLE_STATES, END_STATES]
    flattened = []
    for group in groups:
        flattened.extend(group)
    unique = set(flattened)

    assert len(flattened) == len(unique) == len(STATES)
    assert unique == set(STATES)
    for first, second in combinations(groups, 2):
        assert not set(first) & set(second)
assert_disjoint()

def validate_state(value):
    '''Field validator: raise InvalidStateError unless value is in STATES.'''
    if value in STATES:
        return
    raise InvalidStateError(f"{value} is not a valid state in balsam.models")

def get_time_string():
    '''Return the current local time rendered with TIME_FMT.'''
    now = datetime.now()
    return now.strftime(TIME_FMT)

def from_time_string(s):
    '''Parse a TIME_FMT-formatted string back into a datetime.'''
    parsed = datetime.strptime(s, TIME_FMT)
    return parsed
96
97

def history_line(state='CREATED', message=''):
    '''Build one state-history entry: a timestamped state tag plus message.

    The entry starts with a newline; the "[<time> <state>] " tag is
    right-justified to 46 characters before the message is appended.
    '''
    tag = f"\n[{get_time_string()} {state}] "
    padded_tag = tag.rjust(46)
    return padded_tag + message
jtchilders's avatar
jtchilders committed
99
100


101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class BalsamJob(models.Model):
    ''' A DB representation of a Balsam Job '''

    # Optimistic lock: save() always writes this field, and update_state()
    # handles the RecordModifiedError raised on a conflicting save.
    version = IntegerVersionField()

    # --- identity and placement -------------------------------------------
    job_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
    )
    allowed_work_sites = models.TextField(
        verbose_name='Allowed Work Sites',
        help_text='Name of the Balsam instance(s) where this job can run.',
        default='',
    )
    work_site = models.TextField(
        verbose_name='Actual work site',
        help_text='Name of the Balsam instance that handled this job.',
        default='',
    )

    # --- user-facing labels -----------------------------------------------
    workflow = models.TextField(
        verbose_name='Workflow Name',
        help_text='Name of the workflow to which this job belongs',
        default='',
    )
    name = models.TextField(
        verbose_name='Job Name',
        help_text='A name for the job given by the user.',
        default='',
    )
    description = models.TextField(
        verbose_name='Job Description',
        help_text='A description of the job.',
        default='',
    )

    # --- working directory and dependencies -------------------------------
    working_directory = models.TextField(
        verbose_name='Local Job Directory',
        help_text='Local working directory where job files are stored.',
        default='',
    )
    # Stored as a JSON-encoded list of parent primary keys (see set_parents).
    parents = models.TextField(
        verbose_name='IDs of the parent jobs which must complete prior to the start of this job.',
        default='[]',
    )

    # --- data staging -----------------------------------------------------
    # NOTE(review): the help_text pieces below are joined without separating
    # spaces ("parents'working", "thisjob's", "forremote").  Left as-is here
    # because altering help_text changes the model state seen by migrations.
    input_files = models.TextField(
        verbose_name='Input File Patterns',
        help_text=(
            "Space-delimited filename patterns that will be searched in the parents'"
            "working directories. Every matching file will be made available in this"
            "job's working directory (symlinks for local Balsam jobs, file transfer for"
            "remote Balsam jobs). Default: all files from parent jobs are made available."
        ),
        default='*',
    )
    stage_in_url = models.TextField(
        verbose_name='External stage in files or folders',
        help_text=(
            "A list of URLs for external data to be staged in prior to job processing. "
            "Job dataflow from parents to children is NOT handled here; "
            "see `input_files` field instead."
        ),
        default='',
    )
    stage_out_files = models.TextField(
        verbose_name='External stage out files or folders',
        help_text=(
            "A string of filename patterns. Matches will be transferred "
            "to the stage_out_url. Default: no files are staged out"
        ),
        default='',
    )
    stage_out_url = models.TextField(
        verbose_name='Stage Out URL',
        help_text='The URLs to which designated stage out files are sent.',
        default='',
    )

    # --- resources --------------------------------------------------------
    wall_time_minutes = models.IntegerField(
        verbose_name='Job Wall Time in Minutes',
        help_text='The number of minutes the job is expected to take',
        default=1,
    )
    runtime_seconds = models.FloatField(
        verbose_name='Measured Job Execution Time in seconds',
        help_text='The actual elapsed runtime of the job, measured by launcher.',
        default=0.0,
    )
    num_nodes = models.IntegerField(
        verbose_name='Number of Compute Nodes',
        help_text='The number of compute nodes requested for this job.',
        default=1,
    )
    ranks_per_node = models.IntegerField(
        verbose_name='Number of ranks per node',
        help_text='The number of MPI ranks per node to schedule for this job.',
        default=1,
    )
    threads_per_rank = models.IntegerField(
        verbose_name='Number of threads per MPI rank',
        help_text='The number of OpenMP threads per MPI rank (if applicable)',
        default=1,
    )
    threads_per_core = models.IntegerField(
        verbose_name='Number of hyperthreads per physical core (if applicable)',
        help_text='Number of hyperthreads per physical core.',
        default=1,
    )
    environ_vars = models.TextField(
        verbose_name='Environment variables specific to this job',
        help_text="Colon-separated list of envs like VAR1=value1:VAR2=value2",
        default='',
    )

    scheduler_id = models.TextField(
        verbose_name='Scheduler ID',
        help_text='Scheduler ID (if job assigned by metascheduler)',
        default='',
    )

    # --- what to run ------------------------------------------------------
    application = models.TextField(
        verbose_name='Application to Run',
        help_text='The application to run; located in Applications database',
        default='',
    )
    application_args = models.TextField(
        verbose_name='Command-line args to the application exe',
        help_text='Command line arguments used by the Balsam job runner',
        default='',
    )

    direct_command = models.TextField(
        verbose_name='Command line to execute (specified with balsam qsub <args> <command>)',
        help_text=(
            "Instead of creating BalsamJobs that point to a pre-defined "
            "application, users can directly add jobs consisting of a single command "
            "line with `balsam qsub`.  This direct command is then invoked by the  "
            "Balsam job launcher."
        ),
        default='',
    )

    # --- pre/post-processing ----------------------------------------------
    preprocess = models.TextField(
        verbose_name='Preprocessing Script',
        help_text=(
            'A script that is run in a job working directory prior to submitting the job to the queue.'
            ' If blank, will default to the default_preprocess script defined for the application.'
        ),
        default='',
    )
    postprocess = models.TextField(
        verbose_name='Postprocessing Script',
        help_text=(
            'A script that is run in a job working directory after the job has completed.'
            ' If blank, will default to the default_postprocess script defined for the application.'
        ),
        default='',
    )
    post_error_handler = models.BooleanField(
        verbose_name='Let postprocesser try to handle RUN_ERROR',
        help_text=(
            'If true, the postprocessor will be invoked for RUN_ERROR jobs'
            ' and it is up to the script to handle error and update job state.'
        ),
        default=False,
    )
    post_timeout_handler = models.BooleanField(
        verbose_name='Let postprocesser try to handle RUN_TIMEOUT',
        help_text=(
            'If true, the postprocessor will be invoked for RUN_TIMEOUT jobs'
            ' and it is up to the script to handle timeout and update job state.'
        ),
        default=False,
    )
    auto_timeout_retry = models.BooleanField(
        verbose_name='Automatically restart jobs that have timed out',
        help_text=(
            "If True and post_timeout_handler is False, then jobs will "
            "simply be marked RESTART_READY upon timing out."
        ),
        default=True,
    )

    # --- state tracking ---------------------------------------------------
    state = models.TextField(
        verbose_name='Job State',
        help_text='The current state of the job.',
        default='CREATED',
        validators=[validate_state],
    )
    # The default is the callable history_line, so each new job starts with a
    # freshly timestamped CREATED entry.
    state_history = models.TextField(
        verbose_name='Job State History',
        help_text="Chronological record of the job's states",
        default=history_line,
    )
    
    def save(self, force_insert=False, force_update=False, using=None, 
             update_fields=None):
        '''Override default Django save to ensure version always updated.

        When ``update_fields`` is given, ``'version'`` is added to it so a
        partial update still writes the optimistic-lock field.  The caller's
        sequence is copied, never mutated, so tuples and shared lists are
        safe to pass.  A row that is being added is always written in full.

        Saving is retried for as long as the database reports that it is
        locked; any other OperationalError is re-raised.
        '''
        if self._state.adding:
            # A new row must be inserted with every field.
            update_fields = None
        elif update_fields is not None:
            update_fields = list(update_fields)
            if 'version' not in update_fields:
                update_fields.append('version')

        # Work around sqlite3 DB locked error
        while True:
            try:
                super().save(force_insert=force_insert,
                             force_update=force_update,
                             using=using,
                             update_fields=update_fields)
            except OperationalError as exc:
                # Only lock contention is worth retrying; spinning on any
                # other operational failure would never terminate.
                if 'locked' not in str(exc).lower():
                    raise
            else:
                break
259
260

    def __str__(self):
Michael Salim's avatar
Michael Salim committed
261
262
263
        return f'''
Balsam Job
----------
Michael Salim's avatar
Michael Salim committed
264
ID:                     {self.pk}
Michael Salim's avatar
Michael Salim committed
265
name:                   {self.name} 
266
workflow:               {self.workflow}
Michael Salim's avatar
Michael Salim committed
267
268
269
270
latest state:           {self.get_recent_state_str()}
description:            {self.description[:80]}
work site:              {self.work_site} 
allowed work sites:     {self.allowed_work_sites}
271
272
273
working_directory:      {self.working_directory}
parents:                {self.parents}
input_files:            {self.input_files}
Michael Salim's avatar
Michael Salim committed
274
275
stage_in_url:           {self.stage_in_url}
stage_out_url:          {self.stage_out_url}
276
277
stage_out_files:        {self.stage_out_files}
wall_time_minutes:      {self.wall_time_minutes}
Michael Salim's avatar
Michael Salim committed
278
actual_runtime:         {self.runtime_str()}
279
num_nodes:              {self.num_nodes}
Michael Salim's avatar
Michael Salim committed
280
281
threads per rank:       {self.threads_per_rank}
threads per core:       {self.threads_per_core}
282
ranks_per_node:         {self.ranks_per_node}
Michael Salim's avatar
Michael Salim committed
283
scheduler_id:           {self.scheduler_id}
Michael Salim's avatar
Michael Salim committed
284
285
286
287
288
289
290
291
292
293
294
application:            {self.application if self.application else 
                            self.direct_command}
args:                   {self.application_args}
envs:                   {self.environ_vars}
created with qsub:      {bool(self.direct_command)}
preprocess override:    {self.preprocess}
postprocess override:   {self.postprocess}
post handles error:     {self.post_error_handler}
post handles timeout:   {self.post_timeout_handler}
auto timeout retry:     {self.auto_timeout_retry}
'''.strip() + '\n'
Michael Salim's avatar
Michael Salim committed
295
    
296
297
298
299
300
301
302
303

    def get_parents_by_id(self):
        return json.loads(self.parents)

    def get_parents(self):
        '''Return a queryset of the BalsamJobs listed in ``parents``.'''
        return BalsamJob.objects.filter(job_id__in=self.get_parents_by_id())

304
305
306
307
    @property
    def num_ranks(self):
        return self.num_nodes * self.ranks_per_node

308
309
    @property
    def cute_id(self):
310
311
312
313
        if self.name:
            return f"[{self.name} | { str(self.pk)[:8] }]"
        else:
            return f"[{ str(self.pk)[:8] }]"
314

315
316
317
    @property
    def app_cmd(self):
        '''Command line for this job, with ``~`` expanded in every word.

        If ``application`` is set, the command is that ApplicationDefinition's
        executable followed by ``application_args``; otherwise it is
        ``direct_command``.  The line is split on whitespace, each word is
        passed through os.path.expanduser, and the words are re-joined with
        single spaces.
        '''
        if not self.application:
            command = self.direct_command
        else:
            app_def = ApplicationDefinition.objects.get(name=self.application)
            command = f"{app_def.executable} {self.application_args}"
        expanded_words = [os.path.expanduser(word) for word in command.split()]
        return ' '.join(expanded_words)
323

Michael Salim's avatar
Michael Salim committed
324
325
326
327
328
329
330
    def get_children(self):
        '''Return a queryset of jobs whose ``parents`` text contains this pk.'''
        own_pk = str(self.pk)
        return BalsamJob.objects.filter(parents__icontains=own_pk)

    def get_children_by_id(self):
        '''Return the primary keys of this job's children as a list.'''
        return [child.pk for child in self.get_children()]

331
332
333
334
335
336
337
338
339
    def get_child_by_name(self, name):
        '''Return the single child job called ``name``.

        Raises:
            ValueError: if no child, or more than one child, has that name.
        '''
        matches = self.get_children().filter(name=name)
        # Count once: each count() call on the queryset is a separate query.
        num_matches = matches.count()
        if num_matches == 0:
            raise ValueError(f"No child named {name}")
        if num_matches > 1:
            raise ValueError(f"More than one child named {name}")
        return matches.first()

340
341
342
343
344
345
346
347
348
349
350
351
352
    def set_parents(self, parents):
        '''Record ``parents`` as this job's parent jobs and save the field.

        Args:
            parents: iterable of BalsamJob instances and/or primary keys.

        Raises:
            InvalidParentsError: if ``parents`` cannot be turned into a list,
                or if any entry does not match an existing BalsamJob.

        The parents are stored as a JSON list of stringified primary keys.
        '''
        try:
            parents_list = list(parents)
        except Exception as exc:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not converted into a validation error.
            raise InvalidParentsError("Cannot convert input to list") from exc
        for i, parent in enumerate(parents_list):
            pk = parent.pk if isinstance(parent, BalsamJob) else parent
            if not BalsamJob.objects.filter(pk=pk).exists():
                raise InvalidParentsError(f"Job PK {pk} is not in the BalsamJob DB")
            parents_list[i] = str(pk)
        self.parents = json.dumps(parents_list)
        self.save(update_fields=['parents'])

Michael Salim's avatar
Michael Salim committed
353
    def get_application(self):
        '''Return the ApplicationDefinition named by ``application``.

        Raises:
            NoApplication: if ``application`` is empty.
        '''
        if not self.application:
            raise NoApplication
        return ApplicationDefinition.objects.get(name=self.application)
Michael Salim's avatar
Michael Salim committed
358
359
360
361
362
363
364
365
366

    @staticmethod
    def parse_envstring(s):
        result = {}
        entries = s.split(':')
        entries = [e.split('=') for e in entries]
        return {variable:value for (variable,value) in entries}

    def get_envs(self, *, timeout=False, error=False):
        '''Build the environment-variable dict for this job.

        Layers, later ones overriding earlier ones:
          1. variables from os.environ whose name contains BALSAM, DJANGO
             or PYTHON;
          2. the application's ``environ_vars`` (if an application is set);
          3. this job's own ``environ_vars``;
          4. BALSAM_JOB_ID, BALSAM_PARENT_IDS, BALSAM_CHILD_IDS, plus
             OMP_NUM_THREADS when ``threads_per_rank`` exceeds 1.

        Args:
            timeout: if true, also set BALSAM_JOB_TIMEOUT="TRUE".
            error: if true, also set BALSAM_JOB_ERROR="TRUE".

        Returns:
            dict mapping variable names to string values.
        '''
        keywords = 'BALSAM DJANGO PYTHON'.split()
        envs = {var: value for var, value in os.environ.items()
                if any(keyword in var for keyword in keywords)}

        try:
            app = self.get_application()
        except NoApplication:
            app = None
        if app and app.environ_vars:
            envs.update(self.parse_envstring(app.environ_vars))
        if self.environ_vars:
            envs.update(self.parse_envstring(self.environ_vars))

        children = json.dumps([str(c) for c in self.get_children_by_id()])
        balsam_envs = dict(
            BALSAM_JOB_ID=str(self.pk),
            BALSAM_PARENT_IDS=str(self.parents),
            BALSAM_CHILD_IDS=children,
        )

        if self.threads_per_rank > 1:
            # Environment values must be strings; threads_per_rank is an int.
            balsam_envs['OMP_NUM_THREADS'] = str(self.threads_per_rank)

        if timeout:
            balsam_envs['BALSAM_JOB_TIMEOUT'] = "TRUE"
        if error:
            balsam_envs['BALSAM_JOB_ERROR'] = "TRUE"
        envs.update(balsam_envs)
        return envs

397
    def update_state(self, new_state, message='',using=None):
        '''Move the job to ``new_state`` and append a state-history entry.

        Raises:
            InvalidStateError: if ``new_state`` is not in STATES.
            RecordModifiedError: if the save conflicts with a concurrent
                change and neither side of the conflict is USER_KILLED.

        On a conflicting save the row is reloaded.  A USER_KILLED request is
        re-applied on top of the fresh row; any other request is silently
        dropped when the reloaded state is already USER_KILLED.
        '''
        if new_state not in STATES:
            raise InvalidStateError(f"{new_state} is not a job state in balsam.models")

        def stamp():
            # Apply the transition to the in-memory instance.
            self.state_history += history_line(new_state, message)
            self.state = new_state

        def persist():
            self.save(update_fields=['state', 'state_history'],using=using)

        stamp()
        try:
            persist()
        except RecordModifiedError:
            self.refresh_from_db()
            if new_state == 'USER_KILLED':
                stamp()
                persist()
            elif self.state == 'USER_KILLED':
                return
            else:
                raise
415

Michael Salim's avatar
Michael Salim committed
416
    def get_recent_state_str(self):
417
        return self.state_history.split("\n")[-1].strip()
Michael Salim's avatar
Michael Salim committed
418

419
420
421
422
423
424
425
426
427
    def read_file_in_workdir(self, fname):
        work_dir = self.working_directory
        path = os.path.join(work_dir, fname)
        if not os.path.exists(path):
            raise ValueError(f"{fname} not found in working directory of"
            " {self.cute_id}")
        else:
            return open(path).read()

428
    def get_line_string(self):
Michael Salim's avatar
Michael Salim committed
429
430
        recent_state = self.get_recent_state_str()
        app = self.application if self.application else self.direct_command
Michael Salim's avatar
Michael Salim committed
431
        return f' {str(self.pk):36} | {self.name:26} | {self.workflow:26} | {app:26} | {recent_state}'
Michael Salim's avatar
Michael Salim committed
432
433
434
435
436
437
438

    def runtime_str(self):
        if self.runtime_seconds == 0: return ''
        minutes, seconds = divmod(self.runtime_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        if hours: return f"{hours:02d} hr : {minutes:02d} min : {seconds:02d} sec"
        else: return f"{minutes:02d} min : {seconds:02d} sec"
439
440
441

    @staticmethod
    def get_header():
Michael Salim's avatar
Michael Salim committed
442
        return f' {"job_id":36} | {"name":26} | {"workflow":26} | {"application":26} | {"latest update"}'
443

Michael Salim's avatar
Michael Salim committed
444
445
446
447
    def create_working_path(self):
        '''Create this job's working directory, save it, and return its path.

        The directory sits under settings.BALSAM_WORK_DIRECTORY, inside a
        per-workflow subdirectory when ``workflow`` is set.  Its name is the
        job name (stripped, spaces replaced by underscores) plus "_" and the
        first 8 characters of the pk.  If that path already exists, further
        pk characters are appended one at a time until the path is unused.
        '''
        base_dir = settings.BALSAM_WORK_DIRECTORY
        if self.workflow:
            base_dir = os.path.join(base_dir, self.workflow)

        pk_text = str(self.pk)
        dir_name = self.name.strip().replace(' ', '_') + '_' + pk_text[:8]
        path = os.path.join(base_dir, dir_name)

        if os.path.exists(path):
            # Disambiguate with the rest of the pk, one character per try.
            remainder = pk_text[8:]
            path += remainder[0]
            used = 1
            while os.path.exists(path):
                path += remainder[used]
                used += 1

        os.makedirs(path)
        self.working_directory = path
        self.save(update_fields=['working_directory'])
        return path

    def serialize(self):
        '''Placeholder: not implemented; does nothing and returns None.'''
        return None

    @classmethod
    def deserialize(cls, serial_data):
        '''Placeholder: not implemented; ignores serial_data, returns None.'''
        pass
jtchilders's avatar
jtchilders committed
471
472


473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
class ApplicationDefinition(models.Model):
    ''' application definition, each DB entry is a task that can be run
        on the local resource. '''

    name = models.TextField(
        verbose_name='Application Name',
        help_text='The name of an application that can be run locally.',
        default='',
    )
    description = models.TextField(
        verbose_name='Application Description',
        help_text='A description of the application.',
        default='',
    )
    executable = models.TextField(
        verbose_name='Executable',
        help_text='The executable and path need to run this application on the local system.',
        default='',
    )
    default_preprocess = models.TextField(
        verbose_name='Preprocessing Script',
        help_text='A script that is run in a job working directory prior to submitting the job to the queue.',
        default='',
    )
    default_postprocess = models.TextField(
        verbose_name='Postprocessing Script',
        help_text='A script that is run in a job working directory after the job has completed.',
        default='',
    )
    environ_vars = models.TextField(
        verbose_name='Environment variables specific to this application',
        help_text="Colon-separated list of envs like VAR1=value2:VAR2=value2",
        default='',
    )

    def __str__(self):
        '''Return a multi-line "label: value" report of this application.

        Labels are left-aligned in a 16-character column.  The report ends
        with exactly one newline.
        '''
        rows = (
            ('PK:', f'{self.pk}'),
            ('Name:', f'{self.name}'),
            ('Description:', f'{self.description}'),
            ('Executable:', f'{self.executable}'),
            ('Preprocess:', f'{self.default_preprocess}'),
            ('Postprocess:', f'{self.default_postprocess}'),
            ('Envs:', f'{self.environ_vars}'),
        )
        report = ['', 'Application:', '------------']
        report.extend(f'{label:<16}{text}' for label, text in rows)
        report.append('')
        return '\n'.join(report).strip() + '\n'

    def get_line_string(self):
        '''Return a one-line table row; pairs with get_header().

        The first four columns are right-justified to 20 characters.
        '''
        row_template = ' %20s | %20s | %20s | %20s | %s '
        cells = (
            self.name,
            self.executable,
            self.default_preprocess,
            self.default_postprocess,
            self.description,
        )
        return row_template % cells

    @staticmethod
    def get_header():
        '''Return the column-title row matching get_line_string().'''
        row_template = ' %20s | %20s | %20s | %20s | %s '
        titles = ('name', 'executable', 'preprocess', 'postprocess',
                  'description')
        return row_template % titles

    @property
    def cute_id(self):
        '''Short bracketed label: the name plus the first 8 chars of the pk.'''
        short_pk = str(self.pk)[:8]
        return f"[{self.name} | { short_pk }]"