# AlpsBridge.py
"""Bridge for communicating and fetching system state in ALPS."""

# This requires forking off an apbasil process, sending XML to its stdin and
# parsing XML from its stdout.  The bridge calls are synchronous; however, the
# system script forker isn't.  These calls will be able to block for now.

import logging
8
import xml.etree
9
10
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
11
12
13
from Cobalt.Proxy import ComponentProxy
from Cobalt.Data import IncrID
from Cobalt.Util import sleep
14
from Cobalt.Util import init_cobalt_config, get_config_option
15
from Cobalt.Util import compact_num_list, expand_num_list
16
17

# Module-wide logger.  NOTE: getLogger() with no name returns the root logger.
_logger = logging.getLogger()
# Must run before the get_config_option() lookups below (presumably loads the
# Cobalt configuration those lookups read -- confirm against Cobalt.Util).
init_cobalt_config()

# Name of the forker component used to launch the BASIL child process.
FORKER = get_config_option('alps', 'forker', 'system_script_forker')
# Executable that is forked for every BASIL request.
# NOTE(review): the fallback is a path under a developer's home directory;
# confirm that deployed configurations always set the 'alps'/'basil' option.
BASIL_PATH = get_config_option('alps', 'basil',
                               '/home/richp/alps-simulator/apbasil.sh')

# Run-id generator.  The only reference to it in _call_sys_forker is commented
# out, so it is currently unused by the active code in this module.
_RUNID_GEN = IncrID()
# Delay handed to sleep() between polls of the forker for a finished child.
CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout',
                                              1.0))
# Default 'nppn' value, and per-node multiplier for the default 'width', used
# by reserve() when the caller does not supply those attributes.
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 72))
28
29

class BridgeError(Exception):
    '''Exception class so that we may easily recognize bridge-specific errors.

    Raised by init_bridge() when stale forker children cannot be purged.
    '''

33
34
35
36
37
38
def init_bridge():
    '''Initialize the bridge.  This includes purging all old bridge messages
    from the system_script_forker.  On restart or reinitialization, these old
    children should be considered invalid and purged.

    Input:
        None

    Returns:
        None

    Side Effects:
        Every child the forker reports under the 'apbridge' tag is cleaned up.

    Exceptions:
        BridgeError - raised if querying or cleaning up the children fails for
        any reason.  The underlying error is logged with its traceback.

    '''
    forker = ComponentProxy(FORKER, defer=True)
    try:
        # None as the id list is passed through unchanged from the original
        # call; presumably it selects all 'apbridge' children.
        stale_children = forker.get_children('apbridge', None)
        forker.cleanup_children([int(child['id']) for child in stale_children])
    except Exception:
        _logger.error('Unable to clear children from prior runs.  Init failed.',
                      exc_info=True)
        raise BridgeError('Bridge initialization failed.')

49
def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
    '''Reserve a set of nodes in ALPS.

    Input:
        user - user name to place in the request ('user_name').
        jobid - batch job id to place in the request ('batch_id').
        nodecount - number of nodes; only used to compute the default width
            (nodecount * DEFAULT_DEPTH).
        attributes - optional dictionary of reservation parameters.  Recognized
            keys: width, depth, nppn, npps, nspn, reservation_mode, nppcu,
            p-state, p-govenor.  Keys whose value is None are not sent.
        node_id_list - optional iterable of node ids; each is converted to int
            and sent as 'node_list'.

    Returns:
        The value returned by _call_sys_forker for the RESERVE request.

    Side Effects:
        A RESERVE request is issued to BASIL.

    Exceptions:
        Whatever _call_sys_forker raises.

    '''
    if attributes is None:
        attributes = {}
    params = {'user_name': user, 'batch_id': jobid}

    # The original code looked up 'npps' under the misspelled key 'nnps', so a
    # caller-supplied 'npps' was silently dropped.  Prefer the correct key and
    # fall back to the old spelling for callers that relied on it.
    npps = attributes.get('npps', None)
    if npps is None:
        npps = attributes.get('nnps', None)

    param_attrs = {
        'width': attributes.get('width', nodecount * DEFAULT_DEPTH),
        'depth': attributes.get('depth', None),
        'nppn': attributes.get('nppn', DEFAULT_DEPTH),
        'npps': npps,
        'nspn': attributes.get('nspn', None),
        'reservation_mode': attributes.get('reservation_mode', 'EXCLUSIVE'),
        'nppcu': attributes.get('nppcu', None),
        'p-state': attributes.get('p-state', None),
        # NOTE(review): 'p-govenor' looks like a misspelling of 'p-governor'.
        # Left unchanged because the key is consumed by BasilRequest, which is
        # not visible here -- confirm before renaming.
        'p-govenor': attributes.get('p-govenor', None),
    }
    # Only forward parameters that actually have a value.
    for key, val in param_attrs.items():
        if val is not None:
            params[key] = val
    if node_id_list is not None:
        params['node_list'] = [int(i) for i in node_id_list]
    request = BasilRequest('RESERVE', params=params)
    return _call_sys_forker(BASIL_PATH, str(request))
77

78
def release(alps_res_id):
    '''Release a set of nodes in an ALPS reservation.  May be used on a
    reservation with running jobs.  If that occurs, the reservation will be
    released when the jobs exit/are terminated.

    Input:
        alps_res_id - id of the ALPS reservation to release.

    Returns:
        The value returned by _call_sys_forker for the RELEASE request.
        NOTE(review): earlier documentation described this as "True if the
        release was successful"; confirm against what parse_response yields.

    Side Effects:
        ALPS reservation will be released.  New aprun attempts against the
        reservation will fail.

    Exceptions:
        None raised here; errors from _call_sys_forker propagate.

    '''
    params = {'reservation_id': alps_res_id}
    request = BasilRequest('RELEASE', params=params)
    return _call_sys_forker(BASIL_PATH, str(request))

def confirm(alps_res_id, pg_id):
    '''Confirm an ALPS reservation.  Call this after we have the process group
    id of the user job that we are reserving nodes for.

    Input:
        alps_res_id - The id of the reservation that is being confirmed.
        pg_id - process group id to bind the reservation to (sent as
            'pagg_id').

    Returns:
        The value returned by _call_sys_forker for the CONFIRM request.
        NOTE(review): earlier documentation described this as True/False;
        confirm against what parse_response yields.

    Side Effects:
        A CONFIRM request is issued to BASIL.

    Exceptions:
        None raised here; errors from _call_sys_forker propagate.

    '''
    params = {'pagg_id': pg_id,
              'reservation_id': alps_res_id}
    request = BasilRequest('CONFIRM', params=params)
    return _call_sys_forker(BASIL_PATH, str(request))
124

# (repository-viewer residue: "Paul Rich committed")
125
126
127
128
129
130
131
def system():
    '''Issue the BASIL SYSTEM query and return the parsed reply.

    According to the original notes this query provides memory information.
    The request carries no parameters.
    '''
    query_xml = str(BasilRequest('QUERY', 'SYSTEM', {}))
    return _call_sys_forker(BASIL_PATH, query_xml)

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def fetch_inventory(changecount=None, resinfo=False):
    '''Fetch the inventory for the machine.

    Input:
        changecount - changecount to send if we only want deltas past a
            certain point.  Omitted from the request when None.
        resinfo - also fetch information on current reservations.

    Returns:
        Dictionary of machine information parsed from the XML response.

    Exceptions:
        None raised here; errors from _call_sys_forker propagate.

    '''
    params = {}
    if changecount is not None:
        params['changecount'] = changecount
    if resinfo:
        params['resinfo'] = True
    #TODO: add a flag for systems with version <=1.4 of ALPS
    req = BasilRequest('QUERY', 'INVENTORY', params)
    return _call_sys_forker(BASIL_PATH, str(req))

152
153
154
155
156
157
158
def fetch_reservations():
    '''Fetch reservation data.  This includes reservation metadata but not the
    reserved nodes.

    Sends an INVENTORY query with both 'resinfo' and 'nonodes' set.

    Returns:
        The value returned by _call_sys_forker for the query.

    '''
    params = {'resinfo': True, 'nonodes': True}
    req = BasilRequest('QUERY', 'INVENTORY', params)
    return _call_sys_forker(BASIL_PATH, str(req))

161
def reserved_nodes():
    '''Issue the BASIL RESERVEDNODES query with no parameters.

    Returns:
        The value returned by _call_sys_forker for the query.

    '''
    params = {}
    req = BasilRequest('QUERY', 'RESERVEDNODES', params)
    return _call_sys_forker(BASIL_PATH, str(req))

166
167
168
169
170
def fetch_aggretate_reservation_data():
    '''Correlate node and reservation data to get which nodes are in which
    reservation.

    Not implemented yet: this is a placeholder that does nothing.

    Returns:
        None

    '''
    return None
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189

def extract_system_node_data(node_data):
    '''Flatten grouped node records into one entry per node id.

    Input:
        node_data - dictionary with a 'nodes' entry: an iterable of
            dictionaries, each holding 'node_ids', 'state' and 'role' plus any
            other attributes of that group of nodes.  'node_ids' is expanded
            with expand_num_list.

    Returns:
        Dictionary keyed by str(node_id).  Each value is a dictionary with:
            node_id - the id as produced by expand_num_list
            state, role - copied from the group record
            attrs - the remaining entries of the group record (everything
                except state, node_ids and role).  All nodes expanded from the
                same group share one attrs dictionary.

    Side Effects:
        None.  The input records are no longer modified; previously the
        'state', 'node_ids' and 'role' keys were deleted from the caller's
        dictionaries.

    Exceptions:
        KeyError - if a record lacks 'state', 'role' or 'node_ids'.

    '''
    split_out = ('state', 'node_ids', 'role')
    ret_nodeinfo = {}
    for node_info in node_data['nodes']:
        state = node_info['state']
        role = node_info['role']
        # Build the leftover attributes as a new dict instead of deleting keys
        # from the caller's record.
        attrs = dict((key, val) for key, val in node_info.items()
                     if key not in split_out)
        #extract nodeids, construct from bulk data block...
        for node_id in expand_num_list(node_info['node_ids']):
            ret_nodeinfo[str(node_id)] = {
                'node_id': node_id,
                'state': state,
                'role': role,
                'attrs': attrs,
            }
    return ret_nodeinfo


190
191
192
193
194
195
196
def _call_sys_forker(basil_path, in_str):
    '''Run one BASIL request through the forker and return the parsed reply.

    Forks basil_path via the FORKER component with in_str as its input, polls
    until the child is reported complete, cleans up the child record and
    parses the child's stdout with parse_response.

    Input:
        basil_path - path of the executable to fork.
        in_str - request text handed to the child (the BASIL XML request).

    Returns:
        The value returned by parse_response for the child's stdout.

    Side Effects:
        Blocks, sleeping CHILD_SLEEP_TIMEOUT between polls, until the child
        completes.  There is currently no upper bound on the wait.
        A non-zero child exit status is logged but the output is still parsed.

    Exceptions:
        Any error from the fork call is logged and re-raised.
        xml.etree.ElementTree.ParseError is logged and re-raised if the
        response cannot be parsed.

    '''
    # The module imports only the xml.etree package, which does not load the
    # ElementTree submodule by itself; import the exception class explicitly so
    # the except clause below cannot fail on attribute lookup.
    from xml.etree.ElementTree import ParseError

    try:
        # No run id is supplied (None); the id handed back by fork() is what
        # identifies the child from here on.
        runid = ComponentProxy(FORKER).fork([basil_path], 'apbridge',
                                            'alps', None, None, None, in_str,
                                            True)
    except Exception:
        _logger.critical("error communicating with bridge", exc_info=True)
        raise

    resp = None
    complete = False
    while not complete:
        #Is a timeout needed here?
        children = ComponentProxy(FORKER).get_children('apbridge', [runid])
        for child in children:
            if child['complete']:
                if child['exit_status'] != 0:
                    _logger.error("BASIL returned a status of %s",
                                  child['exit_status'])
                resp = child['stdout_string']
                ComponentProxy(FORKER).cleanup_children([runid])
                complete = True
        if not complete:
            sleep(CHILD_SLEEP_TIMEOUT)

    try:
        return parse_response(resp)
    except ParseError:
        _logger.error('Error parsing response "%s"', resp)
        # Bare raise keeps the original traceback (raise exc resets it in
        # Python 2).
        raise
227
228

def print_node_names(spec):
    '''Debugging utility to print nodes returned by ALPS.

    Input:
        spec - dictionary with a 'reservations' entry and a 'nodes' sequence
            of dictionaries that each carry a 'name'.

    Prints the reservations, then the first node record (if there is one),
    then the name of every node.  Single-argument print(...) is used so the
    output is the same under Python 2 and Python 3.
    '''
    nodes = spec['nodes']
    print(spec['reservations'])
    # An empty inventory should not make a debugging helper blow up.
    if nodes:
        print(nodes[0])
    for node in nodes:
        print(node['name'])

if __name__ == '__main__':
    # Ad-hoc debugging entry point: prints the current reservation data.
    # Alternative probes, kept for convenience:
    # print_node_names(fetch_inventory(resinfo=True))
    # print(fetch_inventory(changecount=0))
    # print(extract_system_node_data(system()))
    # print(reserved_nodes())
    # print(fetch_inventory(resinfo=True))
    print(fetch_reservations())