Commit 2a518b44 authored by Paul Rich's avatar Paul Rich
Browse files

Revert "Merge branch 'revert-5c1ac88b' into 'develop'"

This reverts merge request !54
parent 738f6419
...@@ -554,7 +554,11 @@ seconds. The default is 300 seconds. ...@@ -554,7 +554,11 @@ seconds. The default is 300 seconds.
.B update_thread_timeout .B update_thread_timeout
The polling interval for state updates from ALPS in seconds. The default is The polling interval for state updates from ALPS in seconds. The default is
10 seconds. 10 seconds.
.SS [capmc]
.TP
.B path
Path to CAPMC command front-end. If unset, the default is /opt/cray/capmc/default/bin/capmc
.TP
.SS [system] .SS [system]
.TP .TP
.B backfill_epsillon .B backfill_epsillon
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
import logging import logging
import xml.etree import xml.etree
import xmlrpclib import xmlrpclib
import json
from cray_messaging import InvalidBasilMethodError, BasilRequest from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError from cray_messaging import parse_response, ALPSError
from Cobalt.Proxy import ComponentProxy from Cobalt.Proxy import ComponentProxy
...@@ -21,7 +22,8 @@ init_cobalt_config() ...@@ -21,7 +22,8 @@ init_cobalt_config()
FORKER = get_config_option('alps', 'forker', 'system_script_forker') FORKER = get_config_option('alps', 'forker', 'system_script_forker')
BASIL_PATH = get_config_option('alps', 'basil', BASIL_PATH = get_config_option('alps', 'basil',
'/home/richp/alps-simulator/apbasil.sh') '/home/richp/alps-simulator/apbasil.sh')
# Make sure that you have key and cert set for CAPMC operation and paths are established in the exec environment
CAPMC_PATH = get_config_option('capmc', 'path', '/opt/cray/capmc/default/bin/capmc')
_RUNID_GEN = IncrID() _RUNID_GEN = IncrID()
CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout', CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout',
1.0)) 1.0))
...@@ -72,7 +74,7 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None): ...@@ -72,7 +74,7 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
params[key] = val params[key] = val
if node_id_list is not None: if node_id_list is not None:
params['node_list'] = [int(i) for i in node_id_list] params['node_list'] = [int(i) for i in node_id_list]
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RESERVE', retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RESERVE',
params=params))) params=params)))
return retval return retval
...@@ -96,7 +98,7 @@ def release(alps_res_id): ...@@ -96,7 +98,7 @@ def release(alps_res_id):
''' '''
params = {'reservation_id': alps_res_id} params = {'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RELEASE', retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('RELEASE',
params=params))) params=params)))
return retval return retval
...@@ -119,7 +121,7 @@ def confirm(alps_res_id, pg_id): ...@@ -119,7 +121,7 @@ def confirm(alps_res_id, pg_id):
''' '''
params = {'pagg_id': pg_id, params = {'pagg_id': pg_id,
'reservation_id': alps_res_id} 'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM', retval = _call_sys_forker_basil(BASIL_PATH, str(BasilRequest('CONFIRM',
params=params))) params=params)))
return retval return retval
...@@ -128,7 +130,7 @@ def system(): ...@@ -128,7 +130,7 @@ def system():
information''' information'''
params = {} params = {}
req = BasilRequest('QUERY', 'SYSTEM', params) req = BasilRequest('QUERY', 'SYSTEM', params)
return _call_sys_forker(BASIL_PATH, str(req)) return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_inventory(changecount=None, resinfo=False): def fetch_inventory(changecount=None, resinfo=False):
'''fetch the inventory for the machine '''fetch the inventory for the machine
...@@ -148,7 +150,7 @@ def fetch_inventory(changecount=None, resinfo=False): ...@@ -148,7 +150,7 @@ def fetch_inventory(changecount=None, resinfo=False):
#TODO: add a flag for systems with version <=1.4 of ALPS #TODO: add a flag for systems with version <=1.4 of ALPS
req = BasilRequest('QUERY', 'INVENTORY', params) req = BasilRequest('QUERY', 'INVENTORY', params)
#print str(req) #print str(req)
return _call_sys_forker(BASIL_PATH, str(req)) return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_reservations(): def fetch_reservations():
'''fetch reservation data. This includes reservation metadata but not the '''fetch reservation data. This includes reservation metadata but not the
...@@ -157,12 +159,12 @@ def fetch_reservations(): ...@@ -157,12 +159,12 @@ def fetch_reservations():
''' '''
params = {'resinfo': True, 'nonodes' : True} params = {'resinfo': True, 'nonodes' : True}
req = BasilRequest('QUERY', 'INVENTORY', params) req = BasilRequest('QUERY', 'INVENTORY', params)
return _call_sys_forker(BASIL_PATH, str(req)) return _call_sys_forker_basil(BASIL_PATH, str(req))
def reserved_nodes(): def reserved_nodes():
params = {} params = {}
req = BasilRequest('QUERY', 'RESERVEDNODES', params) req = BasilRequest('QUERY', 'RESERVEDNODES', params)
return _call_sys_forker(BASIL_PATH, str(req)) return _call_sys_forker_basil(BASIL_PATH, str(req))
def fetch_aggretate_reservation_data(): def fetch_aggretate_reservation_data():
'''correlate node and reservation data to get which nodes are in which '''correlate node and reservation data to get which nodes are in which
...@@ -187,6 +189,99 @@ def extract_system_node_data(node_data): ...@@ -187,6 +189,99 @@ def extract_system_node_data(node_data):
del node_info['role'] del node_info['role']
return ret_nodeinfo return ret_nodeinfo
def fetch_ssd_static_data(nid_list=None, by_cname=False):
    '''Get static SSD information from CAPMC.

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
            string (default None).
        by_cname - if True, return the raw CAPMC response keyed by Cray
            cname instead of re-keying records under 'nids' (default False).

    Returns:
        A dictionary with call status.  Consult CAPMC documentation for
        details.

    Notes:
        Consult CAPMC v1.2 or later documentation for details on the
        'get_ssds' call for more information.

    '''
    args = ['get_ssds']
    if nid_list is not None:
        args.extend(['-n', nid_list])
    ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
    if not by_cname:
        # Re-key node records into a 'nids' list because everything else in
        # this system works on nids.  Status keys ('e', 'err_msg') pass
        # through unchanged.
        fixed_ret_info = {}
        fixed_ret_info['nids'] = []
        for key, val in ret_info.items():
            if key not in ['e', 'err_msg']:
                fixed_val = val
                # Preserve the cname on the record itself; mutate via the
                # same name we append so the intent is unambiguous.
                fixed_val['cname'] = key
                fixed_ret_info['nids'].append(fixed_val)
            else:
                fixed_ret_info[key] = val
        ret_info = fixed_ret_info
    return ret_info
def fetch_ssd_enable(nid_list=None):
    '''Get SSD enable flags from CAPMC.

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
            string (default None).

    Returns:
        A dictionary with call status, and a list of nid dicts of the form
        {"ssd_enable": val, "nid": id}.

    Notes:
        Consult CAPMC v1.2 or later documentation for details on the
        'get_ssd_enable' call for more information.

    '''
    cmd_args = ['get_ssd_enable']
    if nid_list is not None:
        cmd_args += ['-n', nid_list]
    return _call_sys_forker_capmc(CAPMC_PATH, cmd_args)
def fetch_ssd_diags(nid_list=None, raw=False):
    '''Get SSD diagnostic information from CAPMC.

    Args:
        nid_list - optional list of nodes as a comma-delimited, hyphenated
            string (default None).
        raw - if True, do not make records consistent with other CAPMC
            calls' output (default False).

    Returns:
        A dictionary with call status.  Consult CAPMC documentation for
        details.

    Notes:
        Consult CAPMC v1.2 or later documentation for details on the
        'get_ssd_diags' call for more information.

        This call to CAPMC, unlike others, returns 'ssd_diags' as a list of
        dictionaries as a top-level object, not 'nids'.  Size is in GB
        (decimal, 10^3-based) instead of bytes.  'serial_num' is equivalent
        to 'serial_number' in CAPMC's get_ssds call.  Both keys are
        converted to match 'get_ssds' output.

    '''
    args = ['get_ssd_diags']
    if nid_list is not None:
        args.extend(['-n', nid_list])
    ret_info = _call_sys_forker_capmc(CAPMC_PATH, args)
    if not raw:  # Not all consistency is foolish.
        fixed_ret_info = {}
        fixed_ret_info['e'] = ret_info['e']
        fixed_ret_info['err_msg'] = ret_info['err_msg']
        fixed_ret_info['ssd_diags'] = []
        for info in ret_info['ssd_diags']:
            fixed_diag_info = {}
            for diag_key, diag_val in info.items():
                if diag_key == 'serial_num':
                    # Normalize to the key name used by 'get_ssds'.
                    fixed_diag_info['serial_number'] = diag_val
                elif diag_key == 'size':
                    # It's storage, so sizes use decimal (10^3) units.
                    # Convert from GB back to bytes.
                    fixed_diag_info[diag_key] = int(1000000000 * int(diag_val))
                else:
                    fixed_diag_info[diag_key] = diag_val
            fixed_ret_info['ssd_diags'].append(fixed_diag_info)
        ret_info = fixed_ret_info
    return ret_info
def _log_xmlrpc_error(runid, fault): def _log_xmlrpc_error(runid, fault):
'''Log an xmlrpc error. '''Log an xmlrpc error.
...@@ -205,34 +300,35 @@ def _log_xmlrpc_error(runid, fault): ...@@ -205,34 +300,35 @@ def _log_xmlrpc_error(runid, fault):
_logger.debug('Traceback information: for runid %s', runid, _logger.debug('Traceback information: for runid %s', runid,
exc_info=True) exc_info=True)
def _call_sys_forker(path, tag, label, args=None, in_str=None):
'''Make a call through the system_script_forker to get output from a cray command.
def _call_sys_forker(basil_path, in_str):
'''Make a call through to BASIL wait until we get output and clean up
child info.
Args: Args:
basil_path: path to the BAISL executable. May be overriden for path - path to the command
test environments. tag - string tag for call
in_str: A string of XML to send to 'apbasil' label - label for logging on call
args - arguments to command (default None)
in_str - string to send to stdin of command (default None)
Returns: Returns:
The XML response parsed into a Python dictionary. stdout as a string
Exceptions: Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup. and/or system component fails completely at startup.
Notes: Notes:
This will block until 'apbasil' completion. 'apbasil' messages can This is currently a blocking call until command completion.
be failrly large for things sent to stdout.
''' '''
runid = None #_RUNID_GEN.next()i runid = None #_RUNID_GEN.next()
resp = None resp = None
cmd = [path]
if args is not None:
cmd.extend(args)
try: try:
child = ComponentProxy(FORKER).fork([basil_path], 'apbridge', child = ComponentProxy(FORKER).fork(cmd, 'apbridge',
'alps', None, None, runid, in_str, True) 'alps', None, None, runid, in_str, True)
runid = child runid = child
except Exception: except Exception:
...@@ -253,8 +349,8 @@ def _call_sys_forker(basil_path, in_str): ...@@ -253,8 +349,8 @@ def _call_sys_forker(basil_path, in_str):
# invalid. If we never got one, then let the # invalid. If we never got one, then let the
# caller handle the error. # caller handle the error.
if child['exit_status'] != 0: if child['exit_status'] != 0:
_logger.error("BASIL returned a status of %s", _logger.error("%s returned a status of %s, stderr: %s",
child['exit_status']) cmd, child['exit_status'], "\n".join(child['stderr']))
resp = child['stdout_string'] resp = child['stdout_string']
try: try:
ComponentProxy(FORKER).cleanup_children([runid]) ComponentProxy(FORKER).cleanup_children([runid])
...@@ -265,7 +361,31 @@ def _call_sys_forker(basil_path, in_str): ...@@ -265,7 +361,31 @@ def _call_sys_forker(basil_path, in_str):
if complete: if complete:
break break
sleep(CHILD_SLEEP_TIMEOUT) sleep(CHILD_SLEEP_TIMEOUT)
return resp
def _call_sys_forker_basil(basil_path, in_str):
'''Make a call through to BASIL wait until we get output and clean up
child info.
Args:
basil_path: path to the BAISL executable. May be overriden for
test environments.
in_str: A string of XML to send to 'apbasil'
Returns:
The XML response parsed into a Python dictionary.
Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup.
Notes:
This will block until 'apbasil' completion. 'apbasil' messages can
be failrly large for things sent to stdout.
'''
resp = _call_sys_forker(basil_path, 'apbridge', 'alps', in_str=in_str)
parsed_resp = {} parsed_resp = {}
try: try:
parsed_resp = parse_response(resp) parsed_resp = parse_response(resp)
...@@ -274,6 +394,27 @@ def _call_sys_forker(basil_path, in_str): ...@@ -274,6 +394,27 @@ def _call_sys_forker(basil_path, in_str):
raise exc raise exc
return parsed_resp return parsed_resp
def _call_sys_forker_capmc(capmc_path, args):
    '''Call a CAPMC command and receive its parsed JSON response.

    Args:
        capmc_path - path to the capmc front-end executable
        args - list of arguments to pass to the capmc command

    Returns:
        The JSON response parsed into a Python dictionary.

    Raises:
        TypeError: the response was not a string.
        ValueError: the response was not valid JSON, had no error code, or
            reported a nonzero error code.

    '''
    resp = _call_sys_forker(capmc_path, 'apbridge', 'capmc_ssd', args=args)
    parsed_response = {}
    try:
        parsed_response = json.loads(resp)
    except TypeError:
        _logger.error("Bad type received for CAPMC response, expected %s got %s.",
                      type(""), type(resp))
        raise
    except ValueError:
        # Re-raise: returning an empty dict here would only defer the
        # failure to callers that expect the 'e'/'err_msg' keys.
        _logger.error("Invalid JSON string returned: %s", resp)
        raise
    else:
        err_code = parsed_response.get('e', None)
        err_msg = parsed_response.get('err_msg', None)
        if err_code is None:
            # Format the message explicitly; ValueError does not apply
            # %-style arguments itself.
            raise ValueError('Error code in CAPMC response not provided. '
                             'Invalid response received: %s' % resp)
        if int(err_code) != 0:
            raise ValueError('Error code %s received. Message: %s' %
                             (err_code, err_msg))
    return parsed_response
def print_node_names(spec): def print_node_names(spec):
'''Debugging utility to print nodes returned by ALPS''' '''Debugging utility to print nodes returned by ALPS'''
print spec['reservations'] print spec['reservations']
......
...@@ -202,6 +202,12 @@ class CraySystem(BaseSystem): ...@@ -202,6 +202,12 @@ class CraySystem(BaseSystem):
reservations = ALPSBridge.fetch_reservations() reservations = ALPSBridge.fetch_reservations()
_logger.info('ALPS RESERVATION DATA FETCHED') _logger.info('ALPS RESERVATION DATA FETCHED')
# reserved_nodes = ALPSBridge.reserved_nodes() # reserved_nodes = ALPSBridge.reserved_nodes()
ssd_enabled = ALPSBridge.fetch_ssd_enable()
_logger.info('CAPMC SSD ENABLED DATA FETCHED')
ssd_info = ALPSBridge.fetch_ssd_static_data()
_logger.info('CAPMC SSD DETAIL DATA FETCHED')
ssd_diags = ALPSBridge.fetch_ssd_diags()
_logger.info('CAPMC SSD DIAG DATA FETCHED')
except Exception: except Exception:
#don't crash out here. That may trash a statefile. #don't crash out here. That may trash a statefile.
_logger.error('Possible transient encountered during initialization. Retrying.', _logger.error('Possible transient encountered during initialization. Retrying.',
...@@ -210,7 +216,7 @@ class CraySystem(BaseSystem): ...@@ -210,7 +216,7 @@ class CraySystem(BaseSystem):
else: else:
pending = False pending = False
self._assemble_nodes(inventory, system) self._assemble_nodes(inventory, system, ssd_enabled, ssd_info, ssd_diags)
#Reversing the node name to id lookup is going to save a lot of cycles. #Reversing the node name to id lookup is going to save a lot of cycles.
for node in self.nodes.values(): for node in self.nodes.values():
self.node_name_to_id[node.name] = node.node_id self.node_name_to_id[node.name] = node.node_id
...@@ -219,10 +225,23 @@ class CraySystem(BaseSystem): ...@@ -219,10 +225,23 @@ class CraySystem(BaseSystem):
# self._assemble_reservations(reservations, reserved_nodes) # self._assemble_reservations(reservations, reserved_nodes)
return return
def _assemble_nodes(self, inventory, system): def _assemble_nodes(self, inventory, system, ssd_enabled, ssd_info, ssd_diags):
'''merge together the INVENTORY and SYSTEM query data to form as '''merge together the INVENTORY and SYSTEM query data to form as
complete a picture of a node as we can. complete a picture of a node as we can.
Args:
inventory - ALPS QUERY(INVENTORY) data
system - ALPS QUERY(SYSTEM) data
ssd_enable - CAPMC get_ssd_enable data
ssd_info - CAPMC get_ssds data
ssd_diags - CAPMC get_ssd_diags data
Returns:
None
Side Effects:
Populates the node dictionary
''' '''
nodes = {} nodes = {}
for nodespec in inventory['nodes']: for nodespec in inventory['nodes']:
...@@ -236,8 +255,34 @@ class CraySystem(BaseSystem): ...@@ -236,8 +255,34 @@ class CraySystem(BaseSystem):
if nodes[node_id].role.upper() not in ['BATCH']: if nodes[node_id].role.upper() not in ['BATCH']:
nodes[node_id].status = 'down' nodes[node_id].status = 'down'
nodes[node_id].status = nodespec['state'] nodes[node_id].status = nodespec['state']
self._update_ssd_data(nodes, ssd_enabled, ssd_info, ssd_diags)
self.nodes = nodes self.nodes = nodes
def _update_ssd_data(self, nodes, ssd_enabled=None, ssd_info=None, ssd_diags=None):
'''Update/add ssd data from CAPMC'''
if ssd_enabled is not None:
for ssd_data in ssd_enabled['nids']:
try:
nodes[str(ssd_data['nid'])].attributes['ssd_enabled'] = int(ssd_data['ssd_enable'])
except KeyError:
_logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
if ssd_info is not None:
for ssd_data in ssd_info['nids']:
try:
nodes[str(ssd_data['nid'])].attributes['ssd_info'] = ssd_data
except KeyError:
_logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
if ssd_diags is not None:
for diag_info in ssd_diags['ssd_diags']:
try:
node = nodes[str(diag_info['nid'])]
except KeyError:
_logger.warning('ssd diag data present for nid %s, but not reported in ALPS.', ssd_data['nid'])
else:
for field in ['life_remaining', 'ts', 'firmware', 'percent_used']:
node.attributes['ssd_info'][field] = diag_info[field]
def _assemble_reservations(self, reservations, reserved_nodes): def _assemble_reservations(self, reservations, reserved_nodes):
# FIXME: we can recover reservations now. Implement this. # FIXME: we can recover reservations now. Implement this.
pass pass
...@@ -341,6 +386,7 @@ class CraySystem(BaseSystem): ...@@ -341,6 +386,7 @@ class CraySystem(BaseSystem):
self.nodes[str(nid)] = new_node self.nodes[str(nid)] = new_node
self.logger.warning('Node %s added to tracking.', nid) self.logger.warning('Node %s added to tracking.', nid)
@exposed @exposed
def update_node_state(self): def update_node_state(self):
'''update the state of cray nodes. Check reservation status and system '''update the state of cray nodes. Check reservation status and system
...@@ -371,6 +417,9 @@ class CraySystem(BaseSystem): ...@@ -371,6 +417,9 @@ class CraySystem(BaseSystem):
inven_nodes = ALPSBridge.extract_system_node_data(ALPSBridge.system()) inven_nodes = ALPSBridge.extract_system_node_data(ALPSBridge.system())
reservations = ALPSBridge.fetch_reservations() reservations = ALPSBridge.fetch_reservations()
#reserved_nodes = ALPSBridge.reserved_nodes() #reserved_nodes = ALPSBridge.reserved_nodes()
# Fetch SSD diagnostic data and enabled flags. I would hope these change in event of dead ssd
ssd_enabled = ALPSBridge.fetch_ssd_enable()
ssd_diags = ALPSBridge.fetch_ssd_diags()
except (ALPSBridge.ALPSError, ComponentLookupError): except (ALPSBridge.ALPSError, ComponentLookupError):
_logger.warning('Error contacting ALPS for state update. Aborting this update', _logger.warning('Error contacting ALPS for state update. Aborting this update',
exc_info=True) exc_info=True)
...@@ -485,6 +534,8 @@ class CraySystem(BaseSystem): ...@@ -485,6 +534,8 @@ class CraySystem(BaseSystem):
self._reconstruct_node(inven_node, recon_inventory) self._reconstruct_node(inven_node, recon_inventory)
# _logger.error('UNS: ALPS reports node %s but not in our node list.', # _logger.error('UNS: ALPS reports node %s but not in our node list.',
# inven_node['node_id']) # inven_node['node_id'])
# Update SSD data:
self._update_ssd_data(self.nodes, ssd_enabled=ssd_enabled, ssd_diags=ssd_diags)
#should down win over running in terms of display? #should down win over running in terms of display?
#keep node that are marked for cleanup still in cleanup #keep node that are marked for cleanup still in cleanup
for node in cleanup_nodes: for node in cleanup_nodes:
......
...@@ -1376,8 +1376,8 @@ def print_node_details(args): ...@@ -1376,8 +1376,8 @@ def print_node_details(args):
retval = str(value) retval = str(value)
return retval return retval
nodes = component_call(SYSMGR, False, 'get_nodes', nodes = json.loads(component_call(SYSMGR, False, 'get_nodes',
(True, expand_node_args(args))) (True, expand_node_args(args), None, True)))
res_queues = _setup_res_info() res_queues = _setup_res_info()
for node in nodes.values(): for node in nodes.values():
header_list = [] header_list = []
...@@ -1393,6 +1393,10 @@ def print_node_details(args): ...@@ -1393,6 +1393,10 @@ def print_node_details(args):
if res_queues.get(str(node['node_id']), False): if res_queues.get(str(node['node_id']), False):
queues.extend(res_queues[str(node['node_id'])]) queues.extend(res_queues[str(node['node_id'])])
value_list.append(':'.join(queues)) value_list.append(':'.join(queues))
elif key == 'attributes':
for attr_key, attr_val in value.items():
header_list.append(key +'.'+ attr_key )
value_list.append(gen_printable_value(attr_val))
else: else:
header_list.append(key) header_list.append(key)
value_list.append(gen_printable_value(value)) value_list.append(gen_printable_value(value))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment