dag.py 3.83 KB
Newer Older
1
2
3
'''Python API for Balsam DAG Manipulations

Example usage:
>>>     import balsamlauncher.dag as dag
>>>
>>>     output = open('expected_output').read()
>>>
>>>     if 'CONVERGED' not in output:
>>>         for child in dag.children:
>>>             dag.kill(child, recursive=True)
>>>
>>>         with open("data/input.dat", 'w') as fp:
>>>             fp.write("# a new input file here")
>>>
>>>         dag.spawn_child(clone=dag.current_job,
>>>             walltime_minutes=dag.current_job.walltime_minutes + 10,
>>>             input_files='input.dat')
>>>
'''
    
Michael Salim's avatar
Michael Salim committed
21
22
import django as django
import os as os
23
24
25
26
27
28
29
import uuid 

# Names exported by a star-import of this module.
__all__ = [
    # values read from the environment
    'JOB_ID',
    'TIMEOUT',
    'ERROR',
    # the job this process runs as, and its neighbours in the DAG
    'current_job',
    'parents',
    'children',
    # DAG manipulation
    'add_job',
    'add_dependency',
    'spawn_child',
    'kill',
]

Michael Salim's avatar
Michael Salim committed
30
31
# Point Django at the project settings (overwriting any value already in the
# environment) and initialise it.  This has to run before the model import
# that follows, which is why that import is not at the top of the file.
os.environ['DJANGO_SETTINGS_MODULE'] = 'argobalsam.settings'
django.setup()
32
33
34
35
36
37
38

from balsam.models import BalsamJob as _BalsamJob

# Defaults used when the environment names no current job; they are replaced
# further down once BALSAM_JOB_ID has been resolved to a job in the DB.
current_job = None
parents = None
children = None

Michael Salim's avatar
Michael Salim committed
39
40
# Every environment variable whose name contains 'BALSAM'.
_envs = {k: v for k, v in os.environ.items() if 'BALSAM' in k}

# JOB_ID starts out as the raw text ('' when unset) and is converted to a
# uuid.UUID below when present.
JOB_ID = _envs.get('BALSAM_JOB_ID', '')
# NOTE(review): any non-empty value is truthy here, including "0" or "False";
# confirm that matches how these variables are set.
TIMEOUT = bool(_envs.get('BALSAM_JOB_TIMEOUT', False))
ERROR = bool(_envs.get('BALSAM_JOB_ERROR', False))

if JOB_ID:
    JOB_ID = uuid.UUID(JOB_ID)
    try:
        current_job = _BalsamJob.objects.get(pk=JOB_ID)
    except Exception as exc:
        # Each adjacent literal needs its own f prefix for its placeholders to
        # be interpolated; the original cause is chained for debugging.
        raise RuntimeError(f"The environment specified current job: "
                           f"BALSAM_JOB_ID {JOB_ID}\n but this does not "
                           "exist in DB! Was it deleted accidentally?"
                           ) from exc
    else:
        parents = current_job.get_parents()
        children = current_job.get_children()
56
57
58
59
60
61
62
63
64


def add_job(**kwargs):
    '''Add a new job to BalsamJob DB

    Every keyword must name an attribute that a fresh ``BalsamJob`` already
    has; its value is assigned to that attribute.  The job is saved only
    after all keywords have been accepted, so an invalid keyword leaves
    nothing behind in the DB.

    Returns:
        The saved job.

    Raises:
        ValueError: a keyword does not name an attribute of the job.
    '''
    job = _BalsamJob()
    for k, v in kwargs.items():
        # Reject unknown names instead of silently creating new attributes.
        if not hasattr(job, k):
            raise ValueError(f"Invalid field {k}")
        setattr(job, k, v)
    job.save()
    return job

Michael Salim's avatar
Michael Salim committed
71
def detect_circular(job, path=None):
    '''Report whether walking parent links from ``job`` loops back on itself

    Args:
        job: object exposing ``pk`` and ``get_parents()``.
        path: pks already visited on the way to ``job``; leave unset when
            starting a new search.

    Returns:
        True if some chain of parents reaches a pk that is already on the
        current path (including ``job`` itself), otherwise False.
    '''
    visited = [] if path is None else list(path)
    if job.pk in visited:
        return True
    # Each branch gets its own copy so sibling branches do not see each
    # other's pks; only a repeat along a single chain counts as a cycle.
    visited.append(job.pk)
    return any(detect_circular(parent, visited)
               for parent in job.get_parents())

78
79
80
81
82
83
84
85
86
87
def add_dependency(parent,child):
    '''Create a dependency between two existing jobs

    Args:
        parent: the job that must come first; a ``BalsamJob``, a primary key,
            or a string that ``uuid.UUID`` can parse.
        child: the job that depends on ``parent``; same accepted forms.

    Raises:
        RuntimeError: the child already lists this parent, or adding the link
            would make the child reachable from itself through parent links.
    '''
    if isinstance(parent, str):
        parent = uuid.UUID(parent)
    if isinstance(child, str):
        child = uuid.UUID(child)

    # Anything that is not already a job is treated as a primary key.
    if not isinstance(parent, _BalsamJob):
        parent = _BalsamJob.objects.get(pk=parent)
    if not isinstance(child, _BalsamJob):
        child = _BalsamJob.objects.get(pk=child)

    existing_parents = child.get_parents_by_id()
    parent_pk_str = str(parent.pk)
    if parent_pk_str in existing_parents:
        raise RuntimeError("Dependency already exists; cannot double-create")

    # Work on a copy so the original list is still available for rollback.
    new_parents = existing_parents.copy()
    new_parents.append(parent_pk_str)
    child.set_parents(new_parents)
    if detect_circular(child):
        # Undo the link before reporting the problem.
        child.set_parents(existing_parents)
        raise RuntimeError("Detected circular dependency; not creating link")
99

100
def spawn_child(clone=False, **kwargs):
    '''Add new job that is dependent on the current job

    Args:
        clone: when truthy, the child starts as a copy of the current job's
            DB record and ``kwargs`` override individual fields.  Only the
            truthiness is used; the copy is always of ``current_job``.
            When falsy, the child is built from ``kwargs`` alone.
        **kwargs: attribute names and values for the child.

    Returns:
        The saved child job, with the current job added as a parent.

    Raises:
        RuntimeError: there is no current job, or the dependency could not
            be created.
        ValueError: a keyword does not name an attribute of the job.
    '''
    if not isinstance(current_job, _BalsamJob):
        raise RuntimeError("No current BalsamJob detected in environment")

    if clone:
        # Load a second instance so current_job itself is left untouched,
        # then clear its pk so that saving it is expected to create a new
        # record rather than update the current one.
        child = _BalsamJob.objects.get(pk=current_job.pk)
        child.pk = None
        for k, v in kwargs.items():
            if not hasattr(child, k):
                raise ValueError(f"Invalid field {k}")
            setattr(child, k, v)
        child.save()
    else:
        child = add_job(**kwargs)

    add_dependency(current_job, child)
    return child

def kill(job, recursive=False):
    '''Kill a job or its entire subtree in the DAG

    Args:
        job: object exposing ``update_state()``; with ``recursive`` it must
            also expose ``pk`` and ``get_children()``.
        recursive: when true, every job reachable through child links is
            killed as well.

    Each job is moved to 'USER_KILLED' exactly once, even when it is
    reachable through several parents, and the walk terminates if the child
    links happen to form a loop.
    '''
    job.update_state('USER_KILLED')
    if not recursive:
        return

    seen = {job.pk}
    # Explicit stack, children pushed in reverse, so jobs are visited in the
    # same depth-first order a recursive walk would use.
    pending = list(job.get_children())[::-1]
    while pending:
        node = pending.pop()
        if node.pk in seen:
            continue
        seen.add(node.pk)
        node.update_state('USER_KILLED')
        pending.extend(list(node.get_children())[::-1])