AlpsBridge.py 15.1 KB
Newer Older
1
2
3
4
5
6
7
"""Bridge for communicating and fetching system state in ALPS."""

#this requires forking off a apbasil process, sending XML to stdin and
#parsing XML from stdout. Synchyronous, however, the system script
#forker isn't.  These will be able to block for now.

import logging
8
import xml.etree
9
import xmlrpclib
10
import json
11
12
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
13
14
15
from Cobalt.Proxy import ComponentProxy
from Cobalt.Data import IncrID
from Cobalt.Util import sleep
16
from Cobalt.Util import init_cobalt_config, get_config_option
17
from Cobalt.Util import compact_num_list, expand_num_list
18
19

_logger = logging.getLogger()

# Load Cobalt configuration before any of the get_config_option() lookups below.
init_cobalt_config()

# Name of the system_script_forker component that runs bridge commands.
FORKER = get_config_option('alps', 'forker', 'system_script_forker')

# Path handed to _call_sys_forker_basil as the 'apbasil' executable.
BASIL_PATH = get_config_option(
    'alps', 'basil', '/home/richp/alps-simulator/apbasil.sh')

# Make sure that you have key and cert set for CAPMC operation and paths are
# established in the exec environment.
CAPMC_PATH = get_config_option(
    'capmc', 'path', '/opt/cray/capmc/default/bin/capmc')

# Incrementing run-id generator.
_RUNID_GEN = IncrID()

# Delay passed to sleep() between polls of a forked child's status.
CHILD_SLEEP_TIMEOUT = float(
    get_config_option('alps', 'child_sleep_timeout', 1.0))

# Default per-node depth; also used as the default 'nppn' and to size the
# default reservation width (nodecount * DEFAULT_DEPTH).
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 72))
31
32

class BridgeError(Exception):
    """Raised for failures that originate in the ALPS bridge itself.

    A dedicated type lets callers recognize bridge-specific errors and tell
    them apart from other exceptions.
    """

36
37
38
39
40
41
def init_bridge():
    '''Initialize the bridge by purging stale bridge children.

    Any 'apbridge' children still registered with the system_script_forker
    from a prior run are considered invalid on restart or reinitialization,
    so they are cleaned up here.

    Raises:
        BridgeError: if the stale children could not be fetched or cleaned up.
    '''
    proxy = ComponentProxy(FORKER, defer=True)
    try:
        leftovers = proxy.get_children('apbridge', None)
        leftover_ids = [int(entry['id']) for entry in leftovers]
        proxy.cleanup_children(leftover_ids)
    except Exception:
        _logger.error('Unable to clear children from prior runs.  Init failed.',
                      exc_info=True)
        raise BridgeError('Bridge initialization failed.')

52
def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
    '''Reserve a set of nodes in ALPS with a BASIL RESERVE request.

    Args:
        user - user name the reservation is made for ('user_name').
        jobid - batch id to associate with the reservation ('batch_id').
        nodecount - number of nodes; used to size the default width
                    (nodecount * DEFAULT_DEPTH).
        attributes - optional dict of reservation attributes: width, depth,
                     nppn, npps, nspn, reservation_mode, nppcu, p-state,
                     p-govenor.  Any attribute that resolves to None is not
                     sent.  'nnps' is still accepted as an alias for 'npps'.
        node_id_list - optional iterable of node ids; each is converted to
                       int and sent as 'node_list'.

    Returns:
        The parsed BASIL response from _call_sys_forker_basil.
    '''
    if attributes is None:
        attributes = {}
    params = {'user_name': user, 'batch_id': jobid}
    param_attrs = {
        'width': attributes.get('width', nodecount * DEFAULT_DEPTH),
        'depth': attributes.get('depth', None),
        'nppn': attributes.get('nppn', DEFAULT_DEPTH),
        # 'npps' was previously only read from the misspelled key 'nnps', so a
        # correctly spelled 'npps' attribute was ignored.  Prefer 'npps' and
        # keep 'nnps' as a fallback for existing callers.
        'npps': attributes.get('npps', attributes.get('nnps', None)),
        'nspn': attributes.get('nspn', None),
        'reservation_mode': attributes.get('reservation_mode', 'EXCLUSIVE'),
        'nppcu': attributes.get('nppcu', None),
        'p-state': attributes.get('p-state', None),
        # NOTE(review): key spelling kept as-is since it is passed straight to
        # BasilRequest; confirm against cray_messaging before renaming.
        'p-govenor': attributes.get('p-govenor', None),
    }
    for key, val in param_attrs.items():
        if val is not None:
            params[key] = val
    if node_id_list is not None:
        params['node_list'] = [int(i) for i in node_id_list]
    return _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RESERVE',
        params=params)))
80

81
def release(alps_res_id):
    '''Release the nodes in an ALPS reservation.

    May be used on a reservation with running jobs.  If that occurs, the
    reservation will be released when the jobs exit/are terminated.

    Input:
        alps_res_id - id of the ALPS reservation to release.

    Returns:
        The parsed BASIL RELEASE response from _call_sys_forker_basil (not a
        bare boolean); inspect it to determine whether the release succeeded.

    Side Effects:
        ALPS reservation will be released.  New aprun attempts against the
        reservation will fail.

    Exceptions:
        Propagates any exception raised by _call_sys_forker_basil.
    '''
    params = {'reservation_id': alps_res_id}
    retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RELEASE',
            params=params)))
    return retval

def confirm(alps_res_id, pg_id):
    '''Confirm an ALPS reservation.

    Call this after we have the process group id of the user job that we are
    reserving nodes for.

    Input:
        alps_res_id - The id of the reservation that is being confirmed.
        pg_id - process group id to bind the reservation to (sent to BASIL
                as 'pagg_id').

    Return:
        The parsed BASIL CONFIRM response from _call_sys_forker_basil (not a
        bare boolean); inspect it to determine whether confirmation succeeded.

    Side effects:
        Asks ALPS to bind the reservation to pg_id.

    Exceptions:
        Propagates any exception raised by _call_sys_forker_basil.
    '''
    params = {'pagg_id': pg_id,
              'reservation_id': alps_res_id}
    retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('CONFIRM',
            params=params)))
    return retval
def system():
    '''Issue a BASIL QUERY of type SYSTEM and return the parsed response.

    The SYSTEM query provides memory information.
    '''
    request = BasilRequest('QUERY', 'SYSTEM', {})
    return _call_sys_forker_basil(BASIL_PATH, str(request))
134

135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def fetch_inventory(changecount=None, resinfo=False):
    '''Fetch the machine inventory with a BASIL QUERY of type INVENTORY.

    Args:
        changecount - if given, only request deltas past this changecount.
        resinfo - if true, also request information on current reservations.

    Returns:
        Dictionary of machine information parsed from the XML response.
    '''
    query_params = dict()
    if changecount is not None:
        query_params['changecount'] = changecount
    if resinfo:
        query_params['resinfo'] = True
    # TODO: add a flag for systems with version <=1.4 of ALPS
    inventory_request = BasilRequest('QUERY', 'INVENTORY', query_params)
    return _call_sys_forker_basil(BASIL_PATH, str(inventory_request))
154

155
156
157
158
159
160
161
def fetch_reservations():
    '''Fetch reservation data: reservation metadata without the reserved
    nodes (an INVENTORY query with 'resinfo' and 'nonodes' set).
    '''
    inventory_query = BasilRequest('QUERY', 'INVENTORY',
                                   {'resinfo': True, 'nonodes': True})
    return _call_sys_forker_basil(BASIL_PATH, str(inventory_query))
163

164
def reserved_nodes():
    '''Issue a BASIL QUERY of type RESERVEDNODES and return the parsed
    response.
    '''
    query = BasilRequest('QUERY', 'RESERVEDNODES', {})
    return _call_sys_forker_basil(BASIL_PATH, str(query))
168

169
170
171
172
173
def fetch_aggretate_reservation_data():
    '''Correlate node and reservation data to get which nodes are in which
    reservation.

    Not implemented yet: this is a stub that always returns None.
    '''
    return None
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191

def extract_system_node_data(node_data):
    '''Expand grouped node data into a per-node dictionary.

    Args:
        node_data - parsed response with a 'nodes' list.  Each entry must have
                    'node_ids' (anything expand_num_list accepts; presumably a
                    compact id-range string -- confirm), 'state' and 'role',
                    plus any other attributes of that group of nodes.

    Returns:
        Dict keyed by str(node_id).  Each value is a dict with 'node_id',
        'state', 'role' and 'attrs'.  'attrs' holds the group's remaining
        attributes (everything except 'state', 'node_ids' and 'role') and is
        shared by all nodes of the same group.

    The caller's node_data is not modified.
    '''
    per_node_keys = ('state', 'node_ids', 'role')
    ret_nodeinfo = {}
    for node_info in node_data['nodes']:
        # Build the shared attribute dict as a filtered copy rather than
        # deleting keys from node_info, which used to mutate the caller's data.
        attrs = dict((key, val) for key, val in node_info.items()
                     if key not in per_node_keys)
        for node_id in expand_num_list(node_info['node_ids']):
            ret_nodeinfo[str(node_id)] = {
                'node_id': node_id,
                'state': node_info['state'],
                'role': node_info['role'],
                'attrs': attrs,
            }
    return ret_nodeinfo

192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265

def fetch_ssd_static_data(nid_list=None, by_cname=False):
    '''Get static SSD information from CAPMC ('get_ssds').

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
                   string (default None).
        by_cname - if True, return the CAPMC response as-is, keyed by Cray
                   cname (default False).

    Returns:
        If by_cname is True, the parsed CAPMC response.  Otherwise a dict with
        the call status keys ('e', 'err_msg') copied over when present, plus
        'nids': a list of every other top-level record from the response.
        Each record gets a 'cname' key holding the key it was stored under.
        Consult CAPMC documentation for details.

    Notes:
        Consult CAPMC v1.2 or later documentation for details on the
        'get_ssds' call for more information.
    '''
    args = ['get_ssds']
    if nid_list is not None:
        args.extend(['-n', nid_list])
    ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
    if by_cname:
        return ret_info
    # Because everything else in this system works on nids, flatten the
    # cname-keyed records into a list.
    fixed_ret_info = {'nids': []}
    for key, val in ret_info.items():
        if key in ('e', 'err_msg'):
            fixed_ret_info[key] = val
        else:
            # NOTE: annotates the record dict from ret_info in place.
            val['cname'] = key
            fixed_ret_info['nids'].append(val)
    return fixed_ret_info

def fetch_ssd_enable(nid_list=None):
    '''Get SSD enable flags from CAPMC ('get_ssd_enable').

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
                   string (default None).

    Returns:
        A dictionary with call status, and list of nid dicts of the form
        {"ssd_enable": val, "nid": id}.

    Notes:
        Consult CAPMC v1.2 or later documentation for details on the
        'get_ssd_enable' call for more information.
    '''
    node_args = [] if nid_list is None else ['-n', nid_list]
    return _call_sys_forker_capmc(CAPMC_PATH, ['get_ssd_enable'] + node_args)

def fetch_ssd_diags(nid_list=None, raw=False):
    '''Get SSD diagnostic information from CAPMC ('get_ssd_diags').

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
                   string (default None).
        raw - If true, return the CAPMC response untouched instead of making
              records consistent with other CAPMC calls' output
              (default False).

    Returns:
        If raw is true, the parsed CAPMC response.  Otherwise a dict with
        'e', 'err_msg' (None if CAPMC omitted it) and 'ssd_diags' (a list,
        empty if CAPMC omitted it).  In each record, 'serial_num' is renamed
        to 'serial_number' and 'size' is converted from GB to bytes.

    Notes:
        Consult CAPMC v1.2 or later documentation for details on the
        'get_ssd_diags' call for more information.

        This call to CAPMC, unlike others, returns 'ssd_diags' as a list of
            dictionaries as a top-level object, not 'nids'.  Size is in GB
            (10^3 not 2^10) instead of bytes. 'serial_num' is equivalent to
            'serial_number' in CAPMC's get_ssds call.  Both keys are
            converted to match 'get_ssds' output.
    '''
    args = ['get_ssd_diags']
    if nid_list is not None:
        args.extend(['-n', nid_list])
    ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
    if raw:
        return ret_info
    # Not all consistency is foolish.
    fixed_ret_info = {
        'e': ret_info['e'],
        # 'err_msg' may be absent from a CAPMC response, so don't KeyError.
        'err_msg': ret_info.get('err_msg'),
        'ssd_diags': [],
    }
    for info in ret_info.get('ssd_diags', []):
        fixed_diag_info = {}
        for diag_key, diag_val in info.items():
            if diag_key == 'serial_num':
                fixed_diag_info['serial_number'] = diag_val
            elif diag_key == 'size':
                # It's storage so apparently we're using 10^3 instead of 2^10.
                # Going from GB back to bytes.
                fixed_diag_info[diag_key] = int(1000000000 * int(diag_val))
            else:
                fixed_diag_info[diag_key] = diag_val
        fixed_ret_info['ssd_diags'].append(fixed_diag_info)
    return fixed_ret_info
284

285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def _log_xmlrpc_error(runid, fault):
    '''Log an XML-RPC fault raised while talking to the forker.

    Args:
        runid: integer id of the current run in system_script_forker
        fault: xmlrpclib.Fault object raised by a component call

    Returns:
        None
    '''
    _logger.error('XMLRPC Fault recieved while fetching child %s status:', runid)
    for what, attr_name in (('code', 'faultCode'), ('string', 'faultString')):
        _logger.error('Child %s: Fault ' + what + ': %s', runid,
                      getattr(fault, attr_name))
    _logger.debug('Traceback information: for runid %s', runid, exc_info=True)

303
304
def _call_sys_forker(path, tag, label, args=None, in_str=None):
    '''Make a call through the system_script_forker to get output from a cray command.

    Args:
        path - path to the command
        tag - string tag for the call; used both when forking the child and
              when polling the forker for it.
        label - label for logging on call; passed through to the forker.
        args - arguments to command (default None)
        in_str - string to send to stdin of command (default None)

    Returns:
        stdout of the command as a string, or None if the forker reports the
        child as lost before any output was collected.  In that case the
        caller is expected to handle the error.

    Exceptions:
        Will raise a xmlrpclib.Fault if communication with the bridge
        and/or system component fails completely at startup.

    Notes:
        This is currently a blocking call until command completion.  There
        is no overall timeout.
    '''
    cmd = [path]
    if args is not None:
        cmd.extend(args)

    try:
        runid = ComponentProxy(FORKER).fork(cmd, tag, label, None, None, None,
                                            in_str, True)
    except Exception:
        _logger.critical("error communicating with bridge", exc_info=True)
        raise

    resp = None
    while True:
        # Is a timeout needed here?
        try:
            children = ComponentProxy(FORKER).get_children(tag, [runid])
        except xmlrpclib.Fault as fault:
            _log_xmlrpc_error(runid, fault)
            # No usable child data this round (previously 'children' could be
            # unbound or stale here); wait and poll again.
            sleep(CHILD_SLEEP_TIMEOUT)
            continue

        complete = False
        for child in children:
            if not child['complete']:
                continue
            if child['lost_child']:
                # This child object is invalid: keep the original response
                # if we already have one, otherwise return None and let the
                # caller handle the error.  A lost child never completes
                # normally, so stop polling instead of looping forever.
                complete = True
                continue
            if child['exit_status'] != 0:
                _logger.error("%s returned a status of %s, stderr: %s",
                        cmd, child['exit_status'], "\n".join(child['stderr']))
            resp = child['stdout_string']
            try:
                ComponentProxy(FORKER).cleanup_children([runid])
            except xmlrpclib.Fault as fault:
                # Cleanup failed; keep polling so it is retried.
                _log_xmlrpc_error(runid, fault)
            else:
                complete = True
        if complete:
            break
        sleep(CHILD_SLEEP_TIMEOUT)
    return resp

def _call_sys_forker_basil(basil_path, in_str):
    '''Make a call through to BASIL, wait until we get output and clean up
    child info.

    Args:
        basil_path: path to the BASIL executable. May be overridden for
                    test environments.
        in_str: A string of XML to send to 'apbasil'

    Returns:
        The XML response parsed into a Python dictionary.

    Exceptions:
        Will raise a xmlrpclib.Fault if communication with the bridge
        and/or system component fails completely at startup.  Re-raises
        xml.etree.ElementTree.ParseError (after logging the raw response)
        if the response cannot be parsed.

    Notes:
        This will block until 'apbasil' completion.  'apbasil' messages can
        be fairly large for things sent to stdout.
    '''
    # 'import xml.etree' alone does not load the ElementTree submodule, so
    # import it explicitly to guarantee ParseError is resolvable below.
    from xml.etree import ElementTree

    resp = _call_sys_forker(basil_path, 'apbridge', 'alps', in_str=in_str)
    try:
        return parse_response(resp)
    except ElementTree.ParseError:
        _logger.error('Error parsing response "%s"', resp)
        # Bare raise keeps the original traceback; 'raise exc' resets it
        # under Python 2.
        raise
396

397
398
399
def _call_sys_forker_capmc(capmc_path, args):
    '''Call a CAPMC command and return its parsed JSON response.

    Args:
        capmc_path - path to the capmc executable.
        args - list of arguments for capmc (subcommand plus options).

    Returns:
        The JSON response decoded by json.loads; it must carry an 'e' error
        code of 0.

    Exceptions:
        TypeError - the response was not a string (e.g. no output collected).
        ValueError - the response was not valid JSON, had no 'e' error code,
                     or reported a non-zero error code.
    '''
    resp = _call_sys_forker(capmc_path, 'apbridge', 'capmc_ssd', args=args)
    try:
        parsed_response = json.loads(resp)
    except TypeError:
        _logger.error("Bad type recieved for CAPMC response, expected %s got %s.", type(""), type(resp))
        raise
    except ValueError:
        _logger.error("Invalid JSON string returned: %s", resp)
        # Previously swallowed, returning {} that callers then misread as an
        # empty result (or crashed on).  Raise like the other invalid-response
        # cases below.
        raise
    err_code = parsed_response.get('e', None)
    err_msg = parsed_response.get('err_msg', None)
    if err_code is None:
        # Format the message explicitly; exception constructors do not apply
        # logging-style %-args.
        raise ValueError('Error code in CAPMC response not provided.  Invalid response recieved. %s' % (resp,))
    if int(err_code) != 0:
        raise ValueError('Error Code %s recieved.  Message: %s' % (err_code, err_msg))
    return parsed_response


418
def print_node_names(spec):
    '''Debugging utility to print nodes returned by ALPS.

    Prints spec['reservations'], then the first node record, then the
    'name' of every node in spec['nodes'].
    '''
    print(spec['reservations'])
    node_list = spec['nodes']
    print(node_list[0])
    for node_name in (entry['name'] for entry in node_list):
        print(node_name)

if __name__ == '__main__':
    # Ad-hoc manual check: dump the current reservation data.
    print(fetch_reservations())