# AlpsBridge.py
"""Bridge for communicating and fetching system state in ALPS."""

# This requires forking off an apbasil process, sending XML to its stdin and
# parsing XML from its stdout. The exchange itself is synchronous, although
# the system script forker is not; these calls are allowed to block for now.

import logging

9
10
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
11
12
13
from Cobalt.Proxy import ComponentProxy
from Cobalt.Data import IncrID
from Cobalt.Util import sleep
14
15
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.Util import compact_num_list
16
17

# Module logger. Naming it after the module (rather than using the root
# logger) lets records be filtered per component; they still propagate to
# any handlers configured on the root logger.
_logger = logging.getLogger(__name__)

# Load Cobalt's configuration before the get_config_option() lookups below.
init_cobalt_config()

# Name of the Cobalt component used to fork apbasil child processes.
FORKER = get_config_option('alps', 'forker', 'system_script_forker')
# Path of the apbasil executable that BASIL requests are sent to.
# NOTE(review): the fallback points at a developer's ALPS simulator script;
# real systems should set alps.basil_path explicitly in the config.
BASIL_PATH = get_config_option('alps', 'basil_path',
                               '/home/richp/alps_simulator/apbasil.sh')

# Id generator for forked children (not referenced by the visible code).
_RUNID_GEN = IncrID()

# Delay passed to sleep() between polls of the forker for child completion
# (presumably seconds -- confirm against Cobalt.Util.sleep).
CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout',
                                              1.0))
# Reservation depth used by reserve() when the caller supplies no 'depth'.
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 8))

class BridgeError(Exception):
    '''Exception class so that we may easily recognize bridge-specific errors.'''

def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
    '''Reserve a set of nodes in ALPS.

    Input:
        user - user name the reservation is made for (sent as 'user_name').
        jobid - batch job id (sent as 'batch_id').
        nodecount - reservation width used when attributes has no 'width'.
        attributes - optional dict of reservation attributes.  Recognized
            keys: width, depth, nppn, npps, nspn, reservation_mode, nppcu,
            p-state, p-govenor.  The historical misspellings 'nnpn' and
            'nnps' are still accepted as aliases for nppn and npps.
        node_id_list - optional list of node ids; compacted with
            compact_num_list before being sent.

    Returns:
        The parsed BASIL RESERVE response returned by _call_sys_forker.
    '''
    if attributes is None:
        attributes = {}

    params = {}
    params['user_name'] = user
    params['batch_id'] = jobid

    param_attrs = {}
    param_attrs['width'] = attributes.get('width', nodecount)
    param_attrs['depth'] = attributes.get('depth', DEFAULT_DEPTH)
    # This function used to read only 'nnpn'/'nnps' (typos for the real
    # attribute names), silently ignoring correctly spelled keys.  Prefer
    # the correct spelling but keep honoring the legacy keys.
    param_attrs['nppn'] = attributes.get('nppn', attributes.get('nnpn'))
    param_attrs['npps'] = attributes.get('npps', attributes.get('nnps'))
    param_attrs['nspn'] = attributes.get('nspn', None)
    param_attrs['reservation_mode'] = attributes.get('reservation_mode',
                                                     'EXCLUSIVE')
    param_attrs['nppcu'] = attributes.get('nppcu', None)
    param_attrs['p-state'] = attributes.get('p-state', None)
    # NOTE(review): BASIL documents this attribute as 'p-governor'; confirm
    # which spelling BasilRequest expects before renaming this key.
    param_attrs['p-govenor'] = attributes.get('p-govenor', None)

    # Only attributes that actually have a value are put in the request.
    for key, val in param_attrs.items():
        if val is not None:
            params[key] = val

    if node_id_list is not None:
        params['node_id_list'] = compact_num_list(node_id_list)

    # Build the request once; log it and the reply rather than printing to
    # stdout, which is not visible (or appropriate) inside a daemon.
    request = str(BasilRequest('RESERVE', params=params))
    _logger.debug("BASIL RESERVE request: %s", request)
    retval = _call_sys_forker(BASIL_PATH, request)
    _logger.debug("BASIL RESERVE response: %s", retval)
    return retval

def release(alps_res_id):
    '''Release a set of nodes in an ALPS reservation.

    May be used on a reservation with running jobs.  If that occurs, the
    reservation will be released when the jobs exit/are terminated.

    Input:
        alps_res_id - id of the ALPS reservation to release.

    Returns:
        The parsed BASIL RELEASE response produced by parse_response (not a
        plain boolean).

    Side Effects:
        ALPS reservation will be released.  New aprun attempts against the
        reservation will fail.

    Exceptions:
        Errors raised while contacting the forker component are logged and
        re-raised by _call_sys_forker.
    '''
    request = BasilRequest('RELEASE', params={'reservation_id': alps_res_id})
    return _call_sys_forker(BASIL_PATH, str(request))

def confirm(alps_res_id, pg_id):
    '''Confirm an ALPS reservation.

    Call this once the process group id of the user job that the nodes were
    reserved for is known.

    Input:
        alps_res_id - id of the reservation being confirmed.
        pg_id - process group id to bind the reservation to.

    Return:
        The parsed BASIL CONFIRM response produced by parse_response.

    Side effects:
        Sends a CONFIRM request to BASIL through the system script forker.

    Exceptions:
        Forker communication errors propagate from _call_sys_forker.
    '''
    # NOTE(review): release() keys the reservation as 'reservation_id' while
    # this request uses 'reservation'; confirm which key BASIL expects.
    confirm_params = dict(pagg_id=pg_id)
    confirm_params['reservation'] = alps_res_id
    confirm_request = BasilRequest('CONFIRM', params=confirm_params)
    return _call_sys_forker(BASIL_PATH, str(confirm_request))

def fetch_inventory(changecount=None, resinfo=False):
    '''Fetch the machine inventory from ALPS.

    Input:
        changecount - when not None, sent with the query so that only
            deltas past this changecount are requested.
        resinfo - when true, also request information on current
            reservations.

    Returns:
        Dictionary of machine information parsed from the XML response.
    '''
    query = dict()
    if changecount is not None:
        query.update(changecount=changecount)
    if resinfo:
        query.update(resinfo=True)
    inventory_request = BasilRequest('QUERY', 'INVENTORY', query)
    inventory = _call_sys_forker(BASIL_PATH, str(inventory_request))
    return inventory

def _call_sys_forker(basil_path, in_str):
    '''Run apbasil through the system script forker and parse its reply.

    Forks basil_path via the FORKER component, handing it in_str (the
    serialized BasilRequest XML) as the child's input.  Then polls the forker
    every CHILD_SLEEP_TIMEOUT until the child reports completion, cleans the
    child up, and parses its stdout.

    Input:
        basil_path - path of the apbasil executable to run.
        in_str - request text passed to the forked child.

    Returns:
        The result of parse_response() on the child's stdout.

    Exceptions:
        Any error raised while forking the child is logged and re-raised.
    '''
    try:
        # The forker returns the id it assigned to the new child; that id is
        # used below to poll for and clean up the child.  Honor the
        # basil_path argument instead of the module-level BASIL_PATH.
        runid = ComponentProxy(FORKER).fork([basil_path], 'apbridge', 'alps',
                                            None, None, None, in_str, True)
    except Exception:
        _logger.critical("error communicating with bridge", exc_info=True)
        raise

    resp = None
    complete = False
    # TODO: polling has no upper bound; if the forker can lose the child,
    # a timeout (raising BridgeError) may be needed here.
    while not complete:
        children = ComponentProxy(FORKER).get_children('apbridge', [runid])
        for child_info in children:
            if not child_info['complete']:
                continue
            if child_info['exit_status'] != 0:
                # Still parse stdout: it may carry BASIL's error response.
                _logger.error("BASIL returned a status of %s",
                              child_info['exit_status'])
            resp = child_info['stdout_string']
            ComponentProxy(FORKER).cleanup_children([runid])
            complete = True
        if not complete:
            sleep(CHILD_SLEEP_TIMEOUT)

    return parse_response(resp)

def print_node_names(spec):
    '''Debugging utility to print nodes returned by ALPS.

    Prints spec['reservations'], then the first entry of spec['nodes'] (as a
    sample of a node record) when there is one, then the 'name' of every
    node, one per line.

    Input:
        spec - inventory dictionary with 'reservations' and 'nodes' keys,
            e.g. as returned by fetch_inventory(resinfo=True).
    '''
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(spec['reservations'])
    nodes = spec['nodes']
    # Guard the sample print so an empty inventory does not raise IndexError.
    if nodes:
        print(nodes[0])
    for node in nodes:
        print(node['name'])

if __name__ == '__main__':
    # Manual smoke test: dump the current inventory, then make a reservation.
    # NOTE(review): this creates a real ALPS reservation (user 'richp', batch
    # id 42, 11 nodes) and never releases it.
    print_node_names(fetch_inventory(resinfo=True))
    print(reserve('richp', 42, 11))