models.py 21 KB
Newer Older
1
2
3
import os
import json
import logging
4
import re
5
import sys
Michael Salim's avatar
Michael Salim committed
6
from datetime import datetime
7
8
import uuid

9
from django.core.exceptions import ValidationError,ObjectDoesNotExist
jtchilders's avatar
jtchilders committed
10
11
from django.conf import settings
from django.db import models
12
from concurrency.fields import IntegerVersionField
13
from concurrency.exceptions import RecordModifiedError
jtchilders's avatar
jtchilders committed
14

15
logger = logging.getLogger('balsam.service.models')
jtchilders's avatar
jtchilders committed
16

17
18
class InvalidStateError(ValidationError):
    '''Raised when a value is not one of the names listed in STATES.'''


class InvalidParentsError(ValidationError):
    '''Raised by BalsamJob.set_parents when the given parents are unusable.'''


class NoApplication(Exception):
    '''Raised by BalsamJob.get_application when no application is set.'''
20

Michael Salim's avatar
Michael Salim committed
21
22
# strftime/strptime format used for every timestamp in a job's state history.
TIME_FMT = '%m-%d-%Y %H:%M:%S'

# Every state a BalsamJob may be in.  Each list below is built by splitting a
# whitespace-separated literal, so blank lines are only visual grouping.
STATES = '''
CREATED
LAUNCHER_QUEUED
AWAITING_PARENTS
READY

STAGED_IN
PREPROCESSED

RUNNING
RUN_DONE

POSTPROCESSED
JOB_FINISHED

RUN_TIMEOUT
RUN_ERROR
RESTART_READY

FAILED
USER_KILLED
PARENT_KILLED'''.split()

# The four groups below partition STATES; assert_disjoint() checks at import
# time that they are pairwise disjoint and together cover STATES exactly.
ACTIVE_STATES = '''
RUNNING
'''.split()

PROCESSABLE_STATES = '''
CREATED
LAUNCHER_QUEUED
AWAITING_PARENTS
READY
STAGED_IN
RUN_DONE
POSTPROCESSED
RUN_TIMEOUT
RUN_ERROR
'''.split()

RUNNABLE_STATES = '''
PREPROCESSED
RESTART_READY
'''.split()

END_STATES = '''
JOB_FINISHED
FAILED
USER_KILLED
PARENT_KILLED'''.split()
72
73
74
75
76
77
78
79
80
81
82
83
        
# Matches one "[<date> <time> <STATE>]" stamp at the start of a line, as
# written by history_line().  Group 1 captures the date and time together
# (the text later parsed with TIME_FMT); group 2 captures the state name.
# re.MULTILINE lets "^" match at every line of a multi-line state_history.
STATE_TIME_PATTERN = re.compile(r'''
^                  # start of line
\[                 # opening square bracket
(\d+-\d+-\d\d\d\d  # date MM-DD-YYYY
\s+                # one or more space
\d+:\d+:\d+)       # time HH:MM:SS
\s+                # one or more space
(\w+)              # state
\s*                # 0 or more space
\]                 # closing square bracket
''', re.VERBOSE | re.MULTILINE)
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

def assert_disjoint():
    '''Check that the four state groups form an exact partition of STATES.

    Raises AssertionError if any state appears in more than one group, is
    missing from every group, or is not listed in STATES.
    '''
    from itertools import combinations

    groups = [ACTIVE_STATES, PROCESSABLE_STATES, RUNNABLE_STATES, END_STATES]
    flattened = []
    for group in groups:
        flattened.extend(group)
    unique = set(flattened)

    # No duplicates across groups, and the same number of names as STATES.
    assert len(flattened) == len(unique) == len(STATES)
    assert unique == set(STATES)
    for first, second in combinations(groups, 2):
        assert set(first).intersection(set(second)) == set()


# Run the check once, when the module is imported.
assert_disjoint()

def validate_state(value):
    '''Field validator: raise InvalidStateError unless value is in STATES.'''
    if value in STATES:
        return
    raise InvalidStateError(f"{value} is not a valid state in balsam.models")

def get_time_string():
    '''Return the current time (datetime.now()) formatted with TIME_FMT.'''
    now = datetime.now()
    return now.strftime(TIME_FMT)

def from_time_string(s):
    '''Parse a TIME_FMT-formatted string into a datetime (inverse of get_time_string).'''
    parsed = datetime.strptime(s, TIME_FMT)
    return parsed
105
106

def history_line(state='CREATED', message=''):
    '''Build one state-history entry: a timestamped state stamp plus message.

    The stamp itself begins with a newline, so the rjust(46) padding is
    inserted *before* that newline and the "[" always starts its own line.
    '''
    stamp = f"\n[{get_time_string()} {state}] "
    return stamp.rjust(46) + message
jtchilders's avatar
jtchilders committed
108
109


110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class BalsamJob(models.Model):
    ''' A DB representation of a Balsam Job '''

    # NOTE: the help_text of `input_files` was corrected (missing spaces
    # between concatenated literals).  help_text is part of Django's migration
    # state, so `makemigrations` will report a schema-neutral AlterField.

    version = IntegerVersionField() # optimistic lock

    # --- identity and placement -------------------------------------------
    job_id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False)
    allowed_work_sites = models.TextField(
        'Allowed Work Sites',
        help_text='Name of the Balsam instance(s) where this job can run.',
        default='')
    work_site = models.TextField(
        'Actual work site',
        help_text='Name of the Balsam instance that handled this job.',
        default='')

    workflow = models.TextField(
        'Workflow Name',
        help_text='Name of the workflow to which this job belongs',
        default='')
    name = models.TextField(
        'Job Name',
        help_text='A name for the job given by the user.',
        default='')
    description = models.TextField(
        'Job Description',
        help_text='A description of the job.',
        default='')

    working_directory = models.TextField(
        'Local Job Directory',
        help_text='Local working directory where job files are stored.',
        default='')
    # Stored as a JSON-encoded list of parent primary keys (see set_parents).
    parents = models.TextField(
        'IDs of the parent jobs which must complete prior to the start of this job.',
        default='[]')

    # --- data flow ----------------------------------------------------------
    input_files = models.TextField(
        'Input File Patterns',
        help_text="Space-delimited filename patterns that will be searched in the parents' "
        "working directories. Every matching file will be made available in this "
        "job's working directory (symlinks for local Balsam jobs, file transfer for "
        "remote Balsam jobs). Default: all files from parent jobs are made available.",
        default='*')
    stage_in_url = models.TextField(
        'External stage in files or folders', help_text="A list of URLs for external data to be staged in prior to job processing. Job dataflow from parents to children is NOT handled here; see `input_files` field instead.",
        default='')
    stage_out_files = models.TextField(
        'External stage out files or folders',
        help_text="A string of filename patterns. Matches will be transferred to the stage_out_url. Default: no files are staged out",
        default='')
    stage_out_url = models.TextField(
        'Stage Out URL',
        help_text='The URLs to which designated stage out files are sent.',
        default='')

    # --- resources ----------------------------------------------------------
    wall_time_minutes = models.IntegerField(
        'Job Wall Time in Minutes',
        help_text='The number of minutes the job is expected to take',
        default=1)
    runtime_seconds = models.FloatField(
        'Measured Job Execution Time in seconds',
        help_text='The actual elapsed runtime of the job, measured by launcher.',
        default=0.0)
    num_nodes = models.IntegerField(
        'Number of Compute Nodes',
        help_text='The number of compute nodes requested for this job.',
        default=1)
    ranks_per_node = models.IntegerField(
        'Number of ranks per node',
        help_text='The number of MPI ranks per node to schedule for this job.',
        default=1)
    threads_per_rank = models.IntegerField(
        'Number of threads per MPI rank',
        help_text='The number of OpenMP threads per MPI rank (if applicable)',
        default=1)
    threads_per_core = models.IntegerField(
        'Number of hyperthreads per physical core (if applicable)',
        help_text='Number of hyperthreads per physical core.',
        default=1)
    environ_vars = models.TextField(
        'Environment variables specific to this job',
        help_text="Colon-separated list of envs like VAR1=value1:VAR2=value2",
        default='')

    scheduler_id = models.TextField(
        'Scheduler ID',
        help_text='Scheduler ID (if job assigned by metascheduler)',
        default='')

    # --- what to run --------------------------------------------------------
    application = models.TextField(
        'Application to Run',
        help_text='The application to run; located in Applications database',
        default='')
    application_args = models.TextField(
        'Command-line args to the application exe',
        help_text='Command line arguments used by the Balsam job runner',
        default='')

    direct_command = models.TextField(
        'Command line to execute (specified with balsam qsub <args> <command>)',
        help_text="Instead of creating BalsamJobs that point to a pre-defined "
        "application, users can directly add jobs consisting of a single command "
        "line with `balsam qsub`.  This direct command is then invoked by the  "
        "Balsam job launcher.",
        default='')

    preprocess = models.TextField(
        'Preprocessing Script',
        help_text='A script that is run in a job working directory prior to submitting the job to the queue.'
        ' If blank, will default to the default_preprocess script defined for the application.',
        default='')
    postprocess = models.TextField(
        'Postprocessing Script',
        help_text='A script that is run in a job working directory after the job has completed.'
        ' If blank, will default to the default_postprocess script defined for the application.',
        default='')
    wait_for_parents = models.BooleanField(
        'If True, do not process this job until parents are FINISHED',
        default=True)

    # --- error / timeout policy ---------------------------------------------
    post_error_handler = models.BooleanField(
        'Let postprocesser try to handle RUN_ERROR',
        help_text='If true, the postprocessor will be invoked for RUN_ERROR jobs'
        ' and it is up to the script to handle error and update job state.',
        default=False)
    post_timeout_handler = models.BooleanField(
        'Let postprocesser try to handle RUN_TIMEOUT',
        help_text='If true, the postprocessor will be invoked for RUN_TIMEOUT jobs'
        ' and it is up to the script to handle timeout and update job state.',
        default=False)
    auto_timeout_retry = models.BooleanField(
        'Automatically restart jobs that have timed out',
        help_text="If True and post_timeout_handler is False, then jobs will "
        "simply be marked RESTART_READY upon timing out.",
        default=True)

    # --- state tracking -----------------------------------------------------
    state = models.TextField(
        'Job State',
        help_text='The current state of the job.',
        default='CREATED',
        validators=[validate_state])
    # history_line is passed as a callable, so each new job gets a freshly
    # timestamped "CREATED" entry rather than one fixed at import time.
    state_history = models.TextField(
        'Job State History',
        help_text="Chronological record of the job's states",
        default=history_line)
257
258

    def _save_direct(self, force_insert=False, force_update=False, using=None,
                     update_fields=None):
        '''Save straight to the DB, making sure `version` is always written.

        When `update_fields` is given, 'version' is added to it so the
        optimistic-lock field is included in the partial update.  A row that
        is being inserted for the first time ignores `update_fields`
        entirely and is saved in full.

        The caller's `update_fields` is never modified: it is copied first,
        which also allows tuples or other iterables and avoids listing
        'version' twice.
        '''
        if self._state.adding or update_fields is None:
            fields = None
        else:
            fields = list(update_fields)
            if 'version' not in fields:
                fields.append('version')
        super().save(force_insert=force_insert, force_update=force_update,
                     using=using, update_fields=fields)

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        '''Persist the job, either directly or through settings.SAVE_CLIENT.

        With no SAVE_CLIENT configured the job is written via _save_direct.
        Otherwise the save is delegated to the client and the instance is
        reloaded from the database afterwards.
        '''
        client = settings.SAVE_CLIENT
        if client is not None:
            client.save(self, force_insert, force_update, using, update_fields)
            self.refresh_from_db()
            return
        logger.info(f"direct save of {self.cute_id}")
        self._save_direct(force_insert, force_update, using, update_fields)
274
275
276
277
278

    @staticmethod
    def from_dict(d):
        '''Build an unsaved BalsamJob whose attributes are taken from dict `d`.

        `d['job_id']` must be either a UUID string or None; in the latter
        case the freshly generated id of the new instance is used.  Note
        that `d` itself is updated with the resolved UUID.  Every instance
        attribute except the bookkeeping names below must be present in `d`.
        '''
        job = BalsamJob()
        skipped = '_state force_insert force_update using update_fields'.split()
        field_names = [attr for attr in job.__dict__ if attr not in skipped]

        raw_id = d['job_id']
        if type(raw_id) is str:
            d['job_id'] = uuid.UUID(raw_id)
        else:
            assert raw_id is None
            d['job_id'] = job.job_id

        for attr in field_names:
            job.__dict__[attr] = d[attr]

        assert type(job.job_id) == uuid.UUID
        return job
293

294
295

    def __str__(self):
        '''Return a multi-line, human-readable summary of the job.

        Each row is a label left-justified to 24 columns followed by the
        value.  The "application" row shows `direct_command` when no
        application is set.
        '''
        rows = [
            ('ID:', self.pk),
            # name and work site have always been followed by one space
            ('name:', f'{self.name} '),
            ('workflow:', self.workflow),
            ('latest state:', self.get_recent_state_str()),
            ('description:', self.description[:80]),
            ('work site:', f'{self.work_site} '),
            ('allowed work sites:', self.allowed_work_sites),
            ('working_directory:', self.working_directory),
            ('parents:', self.parents),
            ('input_files:', self.input_files),
            ('stage_in_url:', self.stage_in_url),
            ('stage_out_url:', self.stage_out_url),
            ('stage_out_files:', self.stage_out_files),
            ('wall_time_minutes:', self.wall_time_minutes),
            ('actual_runtime:', self.runtime_str()),
            ('num_nodes:', self.num_nodes),
            ('threads per rank:', self.threads_per_rank),
            ('threads per core:', self.threads_per_core),
            ('ranks_per_node:', self.ranks_per_node),
            ('scheduler_id:', self.scheduler_id),
            ('application:', self.application if self.application
                             else self.direct_command),
            ('args:', self.application_args),
            ('envs:', self.environ_vars),
            ('created with qsub:', bool(self.direct_command)),
            ('preprocess override:', self.preprocess),
            ('postprocess override:', self.postprocess),
            ('post handles error:', self.post_error_handler),
            ('post handles timeout:', self.post_timeout_handler),
            ('auto timeout retry:', self.auto_timeout_retry),
        ]
        lines = ['Balsam Job', '----------']
        lines.extend(f'{label:<24}{value}' for label, value in rows)
        return '\n'.join(lines).strip() + '\n'
Michael Salim's avatar
Michael Salim committed
330
    
331
332
333
334
335
336
337
338

    def get_parents_by_id(self):
        '''Return the parent IDs decoded from the JSON text in `parents`.'''
        parent_ids = json.loads(self.parents)
        return parent_ids

    def get_parents(self):
        '''Return a queryset of the BalsamJobs listed in `parents`.'''
        return BalsamJob.objects.filter(job_id__in=self.get_parents_by_id())

339
340
341
342
    @property
    def num_ranks(self):
        '''Total rank count: num_nodes multiplied by ranks_per_node.'''
        total = self.num_nodes * self.ranks_per_node
        return total

343
344
    @property
    def cute_id(self):
        '''Short display label: "[name | pk8]", or "[pk8]" for unnamed jobs.

        pk8 is the first eight characters of the primary key's string form.
        '''
        short_pk = str(self.pk)[:8]
        if not self.name:
            return f"[{short_pk}]"
        return f"[{self.name} | {short_pk}]"
349
350
351
352
    
    @staticmethod
    def short_exe(exe):
        '''Reduce every whitespace-separated word of `exe` to its basename.'''
        words = exe.split()
        return " ".join(map(os.path.basename, words))
353

354
355
356
    @property
    def app_cmd(self):
        '''Command line for this job, with "~" expanded in every word.

        Uses the executable of the named ApplicationDefinition when
        `application` is set, otherwise `direct_command`; `application_args`
        is appended in both cases.
        '''
        if self.application:
            exe = ApplicationDefinition.objects.get(name=self.application).executable
        else:
            exe = self.direct_command
        words = f"{exe} {self.application_args}".split()
        return ' '.join(map(os.path.expanduser, words))
362

Michael Salim's avatar
Michael Salim committed
363
364
365
366
367
368
369
    def get_children(self):
        '''Return a queryset of jobs whose `parents` text contains this job's pk.'''
        own_id = str(self.pk)
        return BalsamJob.objects.filter(parents__icontains=own_id)

    def get_children_by_id(self):
        '''Return the primary keys of all child jobs as a list.'''
        return [child.pk for child in self.get_children()]

370
371
372
373
374
375
376
377
378
    def get_child_by_name(self, name):
        '''Return the single child job called `name`.

        Raises:
            ValueError: if no child, or more than one child, has that name.
        '''
        # Fetch at most two rows in one query: that is enough to tell
        # "none", "exactly one" and "several" apart, and the decision is
        # made on a single snapshot instead of separate count()/first() calls.
        matches = list(self.get_children().filter(name=name)[:2])
        if not matches:
            raise ValueError(f"No child named {name}")
        if len(matches) > 1:
            raise ValueError(f"More than one child named {name}")
        return matches[0]

379
380
381
382
383
384
385
386
    def set_parents(self, parents):
        '''Replace this job's parents and save the `parents` field.

        Args:
            parents: iterable of BalsamJob instances and/or primary keys.

        Raises:
            InvalidParentsError: if `parents` cannot be turned into a list,
                or if any entry does not match an existing BalsamJob.
        '''
        try:
            parents_list = list(parents)
        except Exception as exc:
            # `except Exception` (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate; the original cause is chained.
            raise InvalidParentsError("Cannot convert input to list") from exc

        parent_ids = []
        for parent in parents_list:
            pk = parent.pk if isinstance(parent, BalsamJob) else parent
            if not BalsamJob.objects.filter(pk=pk).exists():
                raise InvalidParentsError(f"Job PK {pk} is not in the BalsamJob DB")
            parent_ids.append(str(pk))

        self.parents = json.dumps(parent_ids)
        self.save(update_fields=['parents'])

Michael Salim's avatar
Michael Salim committed
392
    def get_application(self):
        '''Return the ApplicationDefinition named by `application`.

        Raises:
            NoApplication: if the job has no application name set.
        '''
        if not self.application:
            raise NoApplication
        return ApplicationDefinition.objects.get(name=self.application)
Michael Salim's avatar
Michael Salim committed
397
398
399
400
401
402
403
404
405

    @staticmethod
    def parse_envstring(s):
        '''Parse "VAR1=value1:VAR2=value2" into a {variable: value} dict.

        Only the first "=" of each entry separates name from value, so
        values may themselves contain "=".  Empty entries (for example from
        a trailing ":") are skipped.  Because ":" is the entry separator,
        values cannot contain ":".

        Raises:
            ValueError: if a non-empty entry has no "=".
        '''
        envs = {}
        for entry in s.split(':'):
            if not entry:
                continue
            variable, sep, value = entry.partition('=')
            if not sep:
                raise ValueError(f"Malformed environment entry {entry!r}; expected VAR=value")
            envs[variable] = value
        return envs

    def get_envs(self, *, timeout=False, error=False):
        '''Assemble the environment-variable dict for this job.

        Later sources override earlier ones:
          1. variables of the current process whose name contains BALSAM,
             DJANGO or PYTHON;
          2. the application's `environ_vars` (if the job has an application);
          3. the job's own `environ_vars`;
          4. BALSAM_JOB_ID / BALSAM_PARENT_IDS / BALSAM_CHILD_IDS, plus
             OMP_NUM_THREADS when threads_per_rank > 1.

        Args:
            timeout: if true, also set BALSAM_JOB_TIMEOUT="TRUE".
            error: if true, also set BALSAM_JOB_ERROR="TRUE".

        Returns:
            dict mapping variable names to string values.
        '''
        keywords = 'BALSAM DJANGO PYTHON'.split()
        envs = {var: value for var, value in os.environ.items()
                if any(keyword in var for keyword in keywords)}

        try:
            app = self.get_application()
        except NoApplication:
            app = None
        if app and app.environ_vars:
            envs.update(self.parse_envstring(app.environ_vars))
        if self.environ_vars:
            envs.update(self.parse_envstring(self.environ_vars))

        children = json.dumps([str(c) for c in self.get_children_by_id()])
        balsam_envs = dict(
            BALSAM_JOB_ID=str(self.pk),
            BALSAM_PARENT_IDS=str(self.parents),
            BALSAM_CHILD_IDS=children,
        )

        if self.threads_per_rank > 1:
            # Stored as text like every other entry: environment values must
            # be strings when handed to os.environ or subprocess.
            balsam_envs['OMP_NUM_THREADS'] = str(self.threads_per_rank)

        if timeout:
            balsam_envs['BALSAM_JOB_TIMEOUT'] = "TRUE"
        if error:
            balsam_envs['BALSAM_JOB_ERROR'] = "TRUE"

        envs.update(balsam_envs)
        return envs

436
    def update_state(self, new_state, message='',using=None):
        '''Move the job to `new_state` and append an entry to its history.

        A job that is already USER_KILLED is left untouched.  If the save
        hits a RecordModifiedError (someone else changed the row), the job
        is reloaded and:
          * a USER_KILLED request is re-applied and saved again;
          * any other request is dropped silently if the job has meanwhile
            become USER_KILLED;
          * otherwise the error is re-raised.

        Raises:
            InvalidStateError: if `new_state` is not in STATES.
        '''
        if new_state not in STATES:
            raise InvalidStateError(f"{new_state} is not a job state in balsam.models")

        def record_and_save():
            # Append the history entry, switch state, write both fields.
            self.state_history += history_line(new_state, message)
            self.state = new_state
            self.save(update_fields=['state', 'state_history'],using=using)

        # Only rows that already exist in the DB can be refreshed.
        if not self._state.adding:
            self.refresh_from_db()
        if self.state == 'USER_KILLED':
            return

        try:
            record_and_save()
        except RecordModifiedError:
            self.refresh_from_db()
            if new_state == 'USER_KILLED':
                record_and_save()
            elif self.state == 'USER_KILLED':
                return
            else:
                raise
459

Michael Salim's avatar
Michael Salim committed
460
    def get_recent_state_str(self):
        '''Return the last line of `state_history`, stripped of whitespace.'''
        last_line = self.state_history.split("\n")[-1]
        return last_line.strip()
Michael Salim's avatar
Michael Salim committed
462

463
464
465
466
    def read_file_in_workdir(self, fname):
        '''Return the text of `fname` inside the job's working directory.

        Raises:
            ValueError: if the file does not exist.
        '''
        path = os.path.join(self.working_directory, fname)
        try:
            # Context manager so the handle is closed promptly; opening
            # directly also avoids an exists()-then-open() race.
            with open(path) as fp:
                return fp.read()
        except FileNotFoundError:
            raise ValueError(f"{fname} not found in working directory of {self.cute_id}") from None

471
    def get_line_string(self):
        '''Return a one-line table row for this job (columns match get_header).'''
        exe = self.short_exe(self.application or self.direct_command)
        columns = (
            f'{str(self.pk):36}',
            f'{self.name:20}',
            f'{self.workflow:20}',
            f'{exe:36}',
            f'{self.get_recent_state_str()}',
        )
        return ' ' + ' | '.join(columns)
Michael Salim's avatar
Michael Salim committed
476
477
478
479

    def runtime_str(self):
        '''Format `runtime_seconds` as "HH hr : MM min : SS.ss sec".

        The hours part is omitted when it is zero.
        '''
        total_minutes, seconds = divmod(self.runtime_seconds, 60)
        hours, minutes = (round(part) for part in divmod(total_minutes, 60))
        tail = f"{minutes:02d} min : {seconds:05.2f} sec"
        if hours:
            return f"{hours:02d} hr : {tail}"
        return tail
483
484
485

    @staticmethod
    def get_header():
        '''Return the column-header row matching get_line_string's layout.'''
        columns = (
            f'{"job_id":36}',
            f'{"name":20}',
            f'{"workflow":20}',
            f'{"application":36}',
            'latest update',
        )
        return ' ' + ' | '.join(columns)

    def get_state_times(self):
        '''Map each state found in `state_history` to its timestamp.

        If a state occurs several times, the last occurrence wins.
        '''
        state_times = {}
        for timestr, state in STATE_TIME_PATTERN.findall(self.state_history):
            state_times[state] = datetime.strptime(timestr, TIME_FMT)
        return state_times
493

Michael Salim's avatar
Michael Salim committed
494
495
496
497
    def create_working_path(self):
        '''Create the job's working directory, record it, and return its path.

        The directory lives under settings.BALSAM_WORK_DIRECTORY (inside a
        sub-directory named after the workflow, if any) and is called
        "<name>_<first 8 characters of pk>", with spaces in the name turned
        into underscores.  While the path already exists, further characters
        of the pk string are appended one at a time.
        '''
        base = settings.BALSAM_WORK_DIRECTORY
        if self.workflow:
            base = os.path.join(base, self.workflow)

        pk_text = str(self.pk)
        dirname = self.name.strip().replace(' ', '_') + '_' + pk_text[:8]
        path = os.path.join(base, dirname)

        # Disambiguate with the rest of the pk, one character per attempt.
        remainder = pk_text[8:]
        used = 0
        while os.path.exists(path):
            path += remainder[used]
            used += 1

        os.makedirs(path)
        self.working_directory = path
        self.save(update_fields=['working_directory'])
        return path

515
    def to_dict(self):
        '''Return the instance's attributes as a dict, leaving out `_state`.'''
        return {
            field: value
            for field, value in self.__dict__.items()
            if field not in ['_state']
        }

    def serialize(self, **kwargs):
        '''Return the job as a JSON string, with `kwargs` merged in.

        A UUID `job_id` is written as its string form; otherwise `job_id`
        is asserted to be None.
        '''
        payload = {**self.to_dict(), **kwargs}
        if type(self.job_id) == uuid.UUID:
            payload['job_id'] = str(self.job_id)
        else:
            assert self.job_id == payload['job_id'] == None
        return json.dumps(payload)
Michael Salim's avatar
Michael Salim committed
530
531
532

    @classmethod
    def deserialize(cls, serial_data):
        '''Rebuild a BalsamJob from bytes, a JSON string, or an already-decoded dict.'''
        data = serial_data
        if type(data) is bytes:
            data = data.decode('utf-8')
        if type(data) is str:
            data = json.loads(data)
        return BalsamJob.from_dict(data)
jtchilders's avatar
jtchilders committed
539

540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
class ApplicationDefinition(models.Model):
    ''' application definition, each DB entry is a task that can be run
        on the local resource. '''
    name = models.TextField(
        'Application Name',
        help_text='The name of an application that can be run locally.',
        default='')
    description = models.TextField(
        'Application Description',
        help_text='A description of the application.',
        default='')
    executable = models.TextField(
        'Executable',
        help_text='The executable and path need to run this application on the local system.',
        default='')
    default_preprocess = models.TextField(
        'Preprocessing Script',
        help_text='A script that is run in a job working directory prior to submitting the job to the queue.',
        default='')
    default_postprocess = models.TextField(
        'Postprocessing Script',
        help_text='A script that is run in a job working directory after the job has completed.',
        default='')
    environ_vars = models.TextField(
        'Environment variables specific to this application',
        help_text="Colon-separated list of envs like VAR1=value2:VAR2=value2",
        default='')

    def __str__(self):
        '''Return a multi-line, human-readable summary of the application.'''
        return f'''
Application:
------------
PK:             {self.pk}
Name:           {self.name}
Description:    {self.description}
Executable:     {self.executable}
Preprocess:     {self.default_preprocess}
Postprocess:    {self.default_postprocess}
Envs:           {self.environ_vars}
'''.strip() + '\n'

    def get_line_string(self):
        '''Return a one-line table row (columns match get_header).

        Executable and scripts are shortened to basenames via short_exe.
        '''
        row_template = ' %20s | %30s | %20s | %20s | %s '
        cells = (
            self.name,
            self.short_exe(self.executable),
            self.short_exe(self.default_preprocess),
            self.short_exe(self.default_postprocess),
            self.description,
        )
        return row_template % cells

    @staticmethod
    def get_header():
        '''Return the column-header row matching get_line_string's layout.'''
        row_template = ' %20s | %30s | %20s | %20s | %s '
        titles = ('name', 'executable', 'preprocess', 'postprocess', 'description')
        return row_template % titles

    @property
    def cute_id(self):
        '''Short display label "[name | pk8]", pk8 being the first 8 chars of str(pk).'''
        short_pk = str(self.pk)[:8]
        return f"[{self.name} | {short_pk}]"

    @staticmethod
    def short_exe(exe):
        '''Reduce every whitespace-separated word of `exe` to its basename.'''
        words = exe.split()
        return " ".join(map(os.path.basename, words))