Commit 7e22422b authored by Paul Rich's avatar Paul Rich
Browse files

Initial commit of messaging through ALPS Bridge for SSD data.

Slight bit of reorganization.  Also added some things to make the
presentation elsewhere in Cobalt kind of consistent with the rest of
Cray's stack.
parent c5a1f1fd
......@@ -7,6 +7,7 @@
import logging
import xml.etree
import xmlrpclib
import json
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
from Cobalt.Proxy import ComponentProxy
......@@ -21,7 +22,8 @@ init_cobalt_config()
FORKER = get_config_option('alps', 'forker', 'system_script_forker')
BASIL_PATH = get_config_option('alps', 'basil',
'/home/richp/alps-simulator/apbasil.sh')
# Make sure that you have key and cert set for CAPMC operaition and paths are established in the exec environment
CAPMC_PATH = get_config_option('capmc', 'path', '/opt/cray/capmc/default/bin/capmc')
_RUNID_GEN = IncrID()
CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout',
1.0))
......@@ -72,7 +74,7 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
params[key] = val
if node_id_list is not None:
params['node_list'] = [int(i) for i in node_id_list]
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RESERVE',
retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RESERVE',
params=params)))
return retval
......@@ -96,7 +98,7 @@ def release(alps_res_id):
'''
params = {'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RELEASE',
retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RELEASE',
params=params)))
return retval
......@@ -119,7 +121,7 @@ def confirm(alps_res_id, pg_id):
'''
params = {'pagg_id': pg_id,
'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM',
retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('CONFIRM',
params=params)))
return retval
......@@ -128,7 +130,7 @@ def system():
information'''
params = {}
req = BasilRequest('QUERY', 'SYSTEM', params)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_inventory(changecount=None, resinfo=False):
'''fetch the inventory for the machine
......@@ -148,7 +150,7 @@ def fetch_inventory(changecount=None, resinfo=False):
#TODO: add a flag for systems with version <=1.4 of ALPS
req = BasilRequest('QUERY', 'INVENTORY', params)
#print str(req)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_reservations():
'''fetch reservation data. This includes reservation metadata but not the
......@@ -157,12 +159,12 @@ def fetch_reservations():
'''
params = {'resinfo': True, 'nonodes' : True}
req = BasilRequest('QUERY', 'INVENTORY', params)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def reserved_nodes():
params = {}
req = BasilRequest('QUERY', 'RESERVEDNODES', params)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_aggretate_reservation_data():
'''correlate node and reservation data to get which nodes are in which
......@@ -187,6 +189,95 @@ def extract_system_node_data(node_data):
del node_info['role']
return ret_nodeinfo
def fetch_ssd_static_data(nid_list=None, by_cname=False):
'''Get static SSD information from CAPMC.
Args:
nid_list - optional list of nodes as a comma-delimited, hyphenated string (default None).
by_cname - if True, returns the nodes keyed by Cray cname (default False
Returns:
A dictionary with call status, Consult CAPMC documentation for details
Notes:
Consult CAPMC v1.2 or ls' call for more information.
'''
args = ['get_ssds']
if nid_list is not None:
args.extend(['-n', nid_list])
ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
if not by_cname:
# Because everything else in this system works on nids.
fixed_ret_info = {}
fixed_ret_info['nids'] = []
for key, val in ret_info.items():
if key not in ['e', 'err_msg']:
fixed_val = val
val['cname'] = key
fixed_ret_info['nids'].append(fixed_val)
else:
fixed_ret_info[key] = val
ret_info = fixed_ret_info
return ret_info
def fetch_ssd_enable(nid_list=None):
'''Get SSD enable flags from CAPMC.
Args:
nid_list - optional list of nodes as a comma-delimited, hyphenated string (default None).
Returns:
A dictionary with call status, and list of nid dicts of the form {"ssd_enable": val, "nid": id}
Notes:
Consult CAPMC v1.2 or later documentation for details on 'get_ssd_enable' call for more information.
'''
args = ['get_ssd_enable']
if nid_list is not None:
args.extend(['-n', nid_list])
return _call_sys_forker_capmc(CAPMC_PATH, args)
def fetch_ssd_diags(nid_list=None, raw=False):
'''Get static SSD information from CAPMC.
Args:
nid_list - optional list of nodes as a comma-delimited, hyphenated string (default None).
raw - If true, do not make records consistient with other CAPMC calls output. (default False).
Returns:
A dictionary with call status, Consult CAPMC documentation for details
Notes:
Consult CAPMC v1.2 or ls' call for more information.
This call to CAPMC, unlike others, returns 'ssd_diags' as a list of dictionaries as a top-level
object, not 'nids'. Size is in GB (10^3 not 2^10) instead of bytes. 'serial_num' is equivalent
to 'serial_number' in CAPMC's get_ssds call. Both keys are converted to match 'get_ssds' output.
'''
args = ['get_ssd_diags']
if nid_list is not None:
args.extend(['-n', nid_list])
ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
if not raw: # Not all consistency is foolish.
fixed_ret_info = {}
for key, val in ret_info.items():
if key not in ['e', 'err_msg']:
for diag_key, diag_val in ret_info[key]:
if diag_key not in ['serial_num', 'size']:
fixed_ret_info[key][diag_key] = diag_val
elif diag_key == 'serial_num':
fixed_ret_info[key]['serial_number'] = diag_val
elif diag_key == 'size':
# It's storage so apparently we're using 10^3 instead of 2^10
# Going from GB back to bytes
fixed_ret_info[key][diag_key] = int(1000000000 * int(diag_val))
else:
fixed_ret_info[key] = val
ret_info = fixed_ret_info
return ret_info
def _log_xmlrpc_error(runid, fault):
'''Log an xmlrpc error.
......@@ -205,34 +296,35 @@ def _log_xmlrpc_error(runid, fault):
_logger.debug('Traceback information: for runid %s', runid,
exc_info=True)
def _call_sys_forker(basil_path, in_str):
'''Make a call through to BASIL wait until we get output and clean up
child info.
def _call_sys_forker(path, tag, label, args=None, in_str=None):
'''Make a call through the system_script_forker to get output from a cray command.
Args:
basil_path: path to the BAISL executable. May be overriden for
test environments.
in_str: A string of XML to send to 'apbasil'
path - path to the command
tag - string tag for call
label - label for logging on call
args - arguments to command (default None)
in_str - string to send to stdin of command (default None)
Returns:
The XML response parsed into a Python dictionary.
stdout as a string
Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup.
Notes:
This will block until 'apbasil' completion. 'apbasil' messages can
be failrly large for things sent to stdout.
Notes:
This is currently a blocking call until command completion.
'''
runid = None #_RUNID_GEN.next()i
runid = None #_RUNID_GEN.next()
resp = None
cmd = [path]
if args is not None:
cmd.extend(args)
try:
child = ComponentProxy(FORKER).fork([basil_path], 'apbridge',
child = ComponentProxy(FORKER).fork(cmd, 'apbridge',
'alps', None, None, runid, in_str, True)
runid = child
except Exception:
......@@ -265,7 +357,31 @@ def _call_sys_forker(basil_path, in_str):
if complete:
break
sleep(CHILD_SLEEP_TIMEOUT)
return resp
def _call_sys_forker_basil(basil_path, in_str):
'''Make a call through to BASIL wait until we get output and clean up
child info.
Args:
basil_path: path to the BAISL executable. May be overriden for
test environments.
in_str: A string of XML to send to 'apbasil'
Returns:
The XML response parsed into a Python dictionary.
Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup.
Notes:
This will block until 'apbasil' completion. 'apbasil' messages can
be failrly large for things sent to stdout.
'''
resp = _call_sys_forker(basil_path, 'apbridge', 'alps', in_str=in_str)
parsed_resp = {}
try:
parsed_resp = parse_response(resp)
......@@ -274,6 +390,26 @@ def _call_sys_forker(basil_path, in_str):
raise exc
return parsed_resp
def _call_sys_forker_capmc(capmc_path, args):
'''Call a CAPMC command and recieve response'''
resp = _call_sys_forker(capmc_path, 'apbridge', 'capmc_ssd', args=args)
try:
parsed_response = json.loads(resp)
except TypeError:
_logger.error("Bad type recieved for CAPMC response, expected %s got %s.", type(""), type(resp))
raise
except ValueError:
_logger.error("Invalid JSON string returned: %s", resp)
else:
err_code = parsed_response.get('e', None)
err_msg = parsed_response.get('err_msg', None)
if err_code is None:
raise ValueError('Error code in CAPMC response not provided. Invalid response recieved. %s', resp)
if int(err_code) != 0:
raise ValueError('Error Code %s recieved. Message: %s', err_code, err_msg)
return parsed_response
def print_node_names(spec):
'''Debugging utility to print nodes returned by ALPS'''
print spec['reservations']
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment