Commit cd19412c authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch 'revert-e5cf5321' into 'develop'

Revert "Merge branch 'revert-5c1ac88b' into 'develop'"

See merge request !55
parents 738f6419 2a518b44
......@@ -554,7 +554,11 @@ seconds. The default is 300 seconds.
.B update_thread_timeout
The polling interval for state updates from ALPS in seconds. The default is
10 seconds.
.SS [capmc]
.TP
.B path
Path to CAPMC command front-end. If unset, the default is /opt/cray/capmc/default/bin/capmc
.TP
.SS [system]
.TP
.B backfill_epsillon
......
......@@ -7,6 +7,7 @@
import logging
import xml.etree
import xmlrpclib
import json
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
from Cobalt.Proxy import ComponentProxy
......@@ -21,7 +22,8 @@ init_cobalt_config()
FORKER = get_config_option('alps', 'forker', 'system_script_forker')
BASIL_PATH = get_config_option('alps', 'basil',
'/home/richp/alps-simulator/apbasil.sh')
# Make sure that you have key and cert set for CAPMC operation and paths are established in the exec environment
CAPMC_PATH = get_config_option('capmc', 'path', '/opt/cray/capmc/default/bin/capmc')
_RUNID_GEN = IncrID()
CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout',
1.0))
......@@ -72,7 +74,7 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
params[key] = val
if node_id_list is not None:
params['node_list'] = [int(i) for i in node_id_list]
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RESERVE',
retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RESERVE',
params=params)))
return retval
......@@ -96,7 +98,7 @@ def release(alps_res_id):
'''
params = {'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RELEASE',
retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RELEASE',
params=params)))
return retval
......@@ -119,7 +121,7 @@ def confirm(alps_res_id, pg_id):
'''
params = {'pagg_id': pg_id,
'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM',
retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('CONFIRM',
params=params)))
return retval
......@@ -128,7 +130,7 @@ def system():
information'''
params = {}
req = BasilRequest('QUERY', 'SYSTEM', params)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_inventory(changecount=None, resinfo=False):
'''fetch the inventory for the machine
......@@ -148,7 +150,7 @@ def fetch_inventory(changecount=None, resinfo=False):
#TODO: add a flag for systems with version <=1.4 of ALPS
req = BasilRequest('QUERY', 'INVENTORY', params)
#print str(req)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_reservations():
'''fetch reservation data. This includes reservation metadata but not the
......@@ -157,12 +159,12 @@ def fetch_reservations():
'''
params = {'resinfo': True, 'nonodes' : True}
req = BasilRequest('QUERY', 'INVENTORY', params)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def reserved_nodes():
params = {}
req = BasilRequest('QUERY', 'RESERVEDNODES', params)
return _call_sys_forker(BASIL_PATH, str(req))
return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_aggretate_reservation_data():
'''correlate node and reservation data to get which nodes are in which
......@@ -187,6 +189,99 @@ def extract_system_node_data(node_data):
del node_info['role']
return ret_nodeinfo
def fetch_ssd_static_data(nid_list=None, by_cname=False):
    '''Get static SSD information from CAPMC.

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
            string (default None).
        by_cname - if True, return the node records keyed by Cray cname,
            exactly as CAPMC provides them (default False).

    Returns:
        A dictionary with call status.  When by_cname is False, the
        per-node records are repackaged into a 'nids' list (each record
        gains a 'cname' key) to match the nid-oriented layout used by the
        rest of this module.

    Notes:
        Consult CAPMC v1.2 or later documentation on the 'get_ssds' call
        for more information.

    '''
    args = ['get_ssds']
    if nid_list is not None:
        args.extend(['-n', nid_list])
    ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
    if not by_cname:
        # Because everything else in this system works on nids, repackage
        # the cname-keyed node records into a single 'nids' list.
        fixed_ret_info = {'nids': []}
        for key, val in ret_info.items():
            if key in ['e', 'err_msg']:
                # Call-status fields pass through untouched.
                fixed_ret_info[key] = val
            else:
                # Keep the cname with the record so no information is lost.
                val['cname'] = key
                fixed_ret_info['nids'].append(val)
        ret_info = fixed_ret_info
    return ret_info
def fetch_ssd_enable(nid_list=None):
    '''Get SSD enable flags from CAPMC.

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated string (default None).

    Returns:
        A dictionary with call status, and list of nid dicts of the form {"ssd_enable": val, "nid": id}

    Notes:
        Consult CAPMC v1.2 or later documentation for details on 'get_ssd_enable' call for more information.

    '''
    cmd_args = ['get_ssd_enable']
    if nid_list is not None:
        cmd_args.append('-n')
        cmd_args.append(nid_list)
    return _call_sys_forker_capmc(CAPMC_PATH, cmd_args)
def fetch_ssd_diags(nid_list=None, raw=False):
    '''Get SSD diagnostic information from CAPMC.

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
            string (default None).
        raw - if True, do not make records consistent with other CAPMC
            calls' output (default False).

    Returns:
        A dictionary with call status.  Consult CAPMC documentation for
        details.

    Notes:
        Consult CAPMC v1.2 or later documentation on the 'get_ssd_diags'
        call for more information.

        This call to CAPMC, unlike others, returns 'ssd_diags' as a list of
        dictionaries as a top-level object, not 'nids'.  Size is in GB
        (10^3 not 2^10) instead of bytes.  'serial_num' is equivalent to
        'serial_number' in CAPMC's get_ssds call.  Both keys are converted
        to match 'get_ssds' output unless raw is set.

    '''
    args = ['get_ssd_diags']
    if nid_list is not None:
        args.extend(['-n', nid_list])
    ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
    if not raw: # Not all consistency is foolish.
        fixed_ret_info = {}
        fixed_ret_info['e'] = ret_info['e']
        fixed_ret_info['err_msg'] = ret_info['err_msg']
        fixed_ret_info['ssd_diags'] = []
        for info in ret_info['ssd_diags']:
            fixed_diag_info = {}
            for diag_key, diag_val in info.items():
                if diag_key == 'serial_num':
                    # Rename to agree with CAPMC's get_ssds output.
                    fixed_diag_info['serial_number'] = diag_val
                elif diag_key == 'size':
                    # It's storage so apparently we're using 10^3 instead of 2^10
                    # Going from GB back to bytes
                    fixed_diag_info[diag_key] = int(1000000000 * int(diag_val))
                else:
                    fixed_diag_info[diag_key] = diag_val
            fixed_ret_info['ssd_diags'].append(fixed_diag_info)
        ret_info = fixed_ret_info
    return ret_info
def _log_xmlrpc_error(runid, fault):
'''Log an xmlrpc error.
......@@ -205,34 +300,35 @@ def _log_xmlrpc_error(runid, fault):
_logger.debug('Traceback information: for runid %s', runid,
exc_info=True)
def _call_sys_forker(basil_path, in_str):
'''Make a call through to BASIL wait until we get output and clean up
child info.
def _call_sys_forker(path, tag, label, args=None, in_str=None):
'''Make a call through the system_script_forker to get output from a cray command.
Args:
basil_path: path to the BAISL executable. May be overriden for
test environments.
in_str: A string of XML to send to 'apbasil'
path - path to the command
tag - string tag for call
label - label for logging on call
args - arguments to command (default None)
in_str - string to send to stdin of command (default None)
Returns:
The XML response parsed into a Python dictionary.
stdout as a string
Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup.
Notes:
This will block until 'apbasil' completion. 'apbasil' messages can
be failrly large for things sent to stdout.
This is currently a blocking call until command completion.
'''
runid = None #_RUNID_GEN.next()i
runid = None #_RUNID_GEN.next()
resp = None
cmd = [path]
if args is not None:
cmd.extend(args)
try:
child = ComponentProxy(FORKER).fork([basil_path], 'apbridge',
child = ComponentProxy(FORKER).fork(cmd, 'apbridge',
'alps', None, None, runid, in_str, True)
runid = child
except Exception:
......@@ -253,8 +349,8 @@ def _call_sys_forker(basil_path, in_str):
# invalid. If we never got one, then let the
# caller handle the error.
if child['exit_status'] != 0:
_logger.error("BASIL returned a status of %s",
child['exit_status'])
_logger.error("%s returned a status of %s, stderr: %s",
cmd, child['exit_status'], "\n".join(child['stderr']))
resp = child['stdout_string']
try:
ComponentProxy(FORKER).cleanup_children([runid])
......@@ -265,7 +361,31 @@ def _call_sys_forker(basil_path, in_str):
if complete:
break
sleep(CHILD_SLEEP_TIMEOUT)
return resp
def _call_sys_forker_basil(basil_path, in_str):
'''Make a call through to BASIL wait until we get output and clean up
child info.
Args:
basil_path: path to the BAISL executable. May be overriden for
test environments.
in_str: A string of XML to send to 'apbasil'
Returns:
The XML response parsed into a Python dictionary.
Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup.
Notes:
This will block until 'apbasil' completion. 'apbasil' messages can
be failrly large for things sent to stdout.
'''
resp = _call_sys_forker(basil_path, 'apbridge', 'alps', in_str=in_str)
parsed_resp = {}
try:
parsed_resp = parse_response(resp)
......@@ -274,6 +394,27 @@ def _call_sys_forker(basil_path, in_str):
raise exc
return parsed_resp
def _call_sys_forker_capmc(capmc_path, args):
    '''Call a CAPMC command and receive its JSON response.

    Args:
        capmc_path - path to the CAPMC front-end executable.
        args - list of arguments to pass to the CAPMC command.

    Returns:
        The parsed JSON response as a dictionary.  If the response is not
        valid JSON the error is logged and an empty dictionary is returned.

    Raises:
        TypeError - the forker returned a non-string response.
        ValueError - the response carries no 'e' error code, or a nonzero
            one.

    Notes:
        This is a blocking call until command completion.

    '''
    resp = _call_sys_forker(capmc_path, 'apbridge', 'capmc_ssd', args=args)
    parsed_response = {}
    try:
        parsed_response = json.loads(resp)
    except TypeError:
        _logger.error("Bad type received for CAPMC response, expected %s got %s.", type(""), type(resp))
        raise
    except ValueError:
        # NOTE(review): invalid JSON is logged but deliberately swallowed;
        # an empty dict is returned to the caller in that case.
        _logger.error("Invalid JSON string returned: %s", resp)
    else:
        err_code = parsed_response.get('e', None)
        err_msg = parsed_response.get('err_msg', None)
        if err_code is None:
            # Use %-formatting: ValueError('msg %s', x) would not format.
            raise ValueError('Error code in CAPMC response not provided. Invalid response received. %s' % resp)
        if int(err_code) != 0:
            raise ValueError('Error Code %s received. Message: %s' % (err_code, err_msg))
    return parsed_response
def print_node_names(spec):
'''Debugging utility to print nodes returned by ALPS'''
print spec['reservations']
......
......@@ -202,6 +202,12 @@ class CraySystem(BaseSystem):
reservations = ALPSBridge.fetch_reservations()
_logger.info('ALPS RESERVATION DATA FETCHED')
# reserved_nodes = ALPSBridge.reserved_nodes()
ssd_enabled = ALPSBridge.fetch_ssd_enable()
_logger.info('CAPMC SSD ENABLED DATA FETCHED')
ssd_info = ALPSBridge.fetch_ssd_static_data()
_logger.info('CAPMC SSD DETAIL DATA FETCHED')
ssd_diags = ALPSBridge.fetch_ssd_diags()
_logger.info('CAPMC SSD DIAG DATA FETCHED')
except Exception:
#don't crash out here. That may trash a statefile.
_logger.error('Possible transient encountered during initialization. Retrying.',
......@@ -210,7 +216,7 @@ class CraySystem(BaseSystem):
else:
pending = False
self._assemble_nodes(inventory, system)
self._assemble_nodes(inventory, system, ssd_enabled, ssd_info, ssd_diags)
#Reversing the node name to id lookup is going to save a lot of cycles.
for node in self.nodes.values():
self.node_name_to_id[node.name] = node.node_id
......@@ -219,10 +225,23 @@ class CraySystem(BaseSystem):
# self._assemble_reservations(reservations, reserved_nodes)
return
def _assemble_nodes(self, inventory, system):
def _assemble_nodes(self, inventory, system, ssd_enabled, ssd_info, ssd_diags):
'''merge together the INVENTORY and SYSTEM query data to form as
complete a picture of a node as we can.
Args:
inventory - ALPS QUERY(INVENTORY) data
system - ALPS QUERY(SYSTEM) data
ssd_enable - CAPMC get_ssd_enable data
ssd_info - CAPMC get_ssds data
ssd_diags - CAPMC get_ssd_diags data
Returns:
None
Side Effects:
Populates the node dictionary
'''
nodes = {}
for nodespec in inventory['nodes']:
......@@ -236,8 +255,34 @@ class CraySystem(BaseSystem):
if nodes[node_id].role.upper() not in ['BATCH']:
nodes[node_id].status = 'down'
nodes[node_id].status = nodespec['state']
self._update_ssd_data(nodes, ssd_enabled, ssd_info, ssd_diags)
self.nodes = nodes
def _update_ssd_data(self, nodes, ssd_enabled=None, ssd_info=None, ssd_diags=None):
'''Update/add ssd data from CAPMC'''
if ssd_enabled is not None:
for ssd_data in ssd_enabled['nids']:
try:
nodes[str(ssd_data['nid'])].attributes['ssd_enabled'] = int(ssd_data['ssd_enable'])
except KeyError:
_logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
if ssd_info is not None:
for ssd_data in ssd_info['nids']:
try:
nodes[str(ssd_data['nid'])].attributes['ssd_info'] = ssd_data
except KeyError:
_logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
if ssd_diags is not None:
for diag_info in ssd_diags['ssd_diags']:
try:
node = nodes[str(diag_info['nid'])]
except KeyError:
_logger.warning('ssd diag data present for nid %s, but not reported in ALPS.', ssd_data['nid'])
else:
for field in ['life_remaining', 'ts', 'firmware', 'percent_used']:
node.attributes['ssd_info'][field] = diag_info[field]
def _assemble_reservations(self, reservations, reserved_nodes):
    '''Reconstruct ALPS reservation state from fetched data. Currently a no-op placeholder.'''
    # FIXME: we can recover reservations now. Implement this.
    pass
......@@ -341,6 +386,7 @@ class CraySystem(BaseSystem):
self.nodes[str(nid)] = new_node
self.logger.warning('Node %s added to tracking.', nid)
@exposed
def update_node_state(self):
'''update the state of cray nodes. Check reservation status and system
......@@ -371,6 +417,9 @@ class CraySystem(BaseSystem):
inven_nodes = ALPSBridge.extract_system_node_data(ALPSBridge.system())
reservations = ALPSBridge.fetch_reservations()
#reserved_nodes = ALPSBridge.reserved_nodes()
# Fetch SSD diagnostic data and enabled flags. I would hope these change in event of dead ssd
ssd_enabled = ALPSBridge.fetch_ssd_enable()
ssd_diags = ALPSBridge.fetch_ssd_diags()
except (ALPSBridge.ALPSError, ComponentLookupError):
_logger.warning('Error contacting ALPS for state update. Aborting this update',
exc_info=True)
......@@ -485,6 +534,8 @@ class CraySystem(BaseSystem):
self._reconstruct_node(inven_node, recon_inventory)
# _logger.error('UNS: ALPS reports node %s but not in our node list.',
# inven_node['node_id'])
# Update SSD data:
self._update_ssd_data(self.nodes, ssd_enabled=ssd_enabled, ssd_diags=ssd_diags)
#should down win over running in terms of display?
#keep node that are marked for cleanup still in cleanup
for node in cleanup_nodes:
......
......@@ -1376,8 +1376,8 @@ def print_node_details(args):
retval = str(value)
return retval
nodes = component_call(SYSMGR, False, 'get_nodes',
(True, expand_node_args(args)))
nodes = json.loads(component_call(SYSMGR, False, 'get_nodes',
(True, expand_node_args(args), None, True)))
res_queues = _setup_res_info()
for node in nodes.values():
header_list = []
......@@ -1393,6 +1393,10 @@ def print_node_details(args):
if res_queues.get(str(node['node_id']), False):
queues.extend(res_queues[str(node['node_id'])])
value_list.append(':'.join(queues))
elif key == 'attributes':
for attr_key, attr_val in value.items():
header_list.append(key +'.'+ attr_key )
value_list.append(gen_printable_value(attr_val))
else:
header_list.append(key)
value_list.append(gen_printable_value(value))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment