Commit 8dfa0897 authored by Paul Rich's avatar Paul Rich
Browse files

Support for node attributes from system query added.

We can now get and properly display node attributes via the system type
parent 2227dc94
......@@ -12,7 +12,7 @@ from Cobalt.Proxy import ComponentProxy
from Cobalt.Data import IncrID
from Cobalt.Util import sleep
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.Util import compact_num_list
from Cobalt.Util import compact_num_list, expand_num_list
_logger = logging.getLogger()
......@@ -149,18 +149,46 @@ def fetch_inventory(changecount=None, resinfo=False):
params['resinfo'] = True
#TODO: add a flag for systems with version <=1.4 of ALPS
req = BasilRequest('QUERY', 'INVENTORY', params)
#print str(req)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_system():
params = {}
req = BasilRequest('QUERY', 'SYSTEM', params)
def fetch_reservations():
'''fetch reservation data. This includes reservation metadata but not the
reserved nodes.
params = {'resinfo': True, 'nonodes' : True}
req = BasilRequest('QUERY', 'INVENTORY', params)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_reserved_nodes():
def reserved_nodes():
params = {}
req = BasilRequest('QUERY', 'RESERVEDNODES', params)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_aggretate_reservation_data():
'''correlate node and reservation data to get which nodes are in which
def extract_system_node_data(node_data):
ret_nodeinfo = {}
for node_info in node_data['nodes']:
#extract nodeids, construct from bulk data block...
for node_id in expand_num_list(node_info['node_ids']):
node = {}
node['node_id'] = node_id
node['state'] = node_info['state']
node['role'] = node_info['role']
node['attrs'] = node_info
ret_nodeinfo[str(node_id)] = node
del node_info['state']
del node_info['node_ids']
del node_info['role']
return ret_nodeinfo
def _call_sys_forker(basil_path, in_str):
'''take a parameter dictionary and make appropriate call through to BASIL
wait until we get output and clean up child info.'''
......@@ -207,7 +235,10 @@ def print_node_names(spec):
print node['name']
if __name__ == '__main__':
#print fetch_inventory(changecount=0)
print fetch_system()
# print fetch_inventory(changecount=0)
# print extract_system_node_data(system())
# print fetch_reserved_nodes()
# print fetch_inventory(resinfo=True)
print fetch_reservations()
......@@ -142,7 +142,14 @@ class CraySystem(BaseSystem):
pending = True
while pending:
inventory = ALPSBridge.fetch_inventory(resinfo=True)
# None of these queries has strictly degenerate data. Inventory
# is needed for raw reservation data. System gets memory and a
# much more compact representation of data. Reservednodes gives
# which notes are reserved.
inventory = ALPSBridge.fetch_inventory()
system = ALPSBridge.extract_system_node_data(ALPSBridge.system())
reservations = ALPSBridge.fetch_reservations()
# reserved_nodes = ALPSBridge.reserved_nodes()
except Exception:
#don't crash out here. That may trash a statefile.
_logger.error('Possible transient encountered during initialization. Retrying.',
......@@ -151,16 +158,34 @@ class CraySystem(BaseSystem):
pending = False
for nodespec in inventory['nodes']:
node = CrayNode(nodespec)
node.managed = True
retnodes[node.node_id] = node
self.nodes = retnodes
self._assemble_nodes(inventory, system)
#Reversing the node name to id lookup is going to save a lot of cycles.
for node in self.nodes.values():
self.node_name_to_id[] = node.node_id'NODE INFORMATION INITIALIZED')'ALPS REPORTS %s NODES', len(self.nodes))
# self._assemble_reservations(reservations, reserved_nodes)
def _assemble_nodes(self, inventory, system):
'''merge together the INVENTORY and SYSTEM query data to form as
complete a picture of a node as we can.
nodes = {}
for nodespec in inventory['nodes']:
node = CrayNode(nodespec)
node.managed = True
nodes[node.node_id] = node
for node_id, nodespec in system.iteritems():
nodes[node_id].status = nodespec['state']
#TODO: put in a disable for nodes in a non-batch role
self.nodes = nodes
def _assemble_reservations(self, reservations, reserved_nodes):
# FIXME: we can recover reservations now. Implement this.
def _gen_node_to_queue(self):
'''(Re)Generate a mapping for fast lookup of node-id's to queues.'''
......@@ -226,8 +251,13 @@ class CraySystem(BaseSystem):
now = time.time()
with self._node_lock:
#I have seen problems with the kitchen-sink query here, where
#the output gets truncated on it's way into Cobalt.
inventory = ALPSBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
#determine if summary may be used under normal operation
#updated for >= 1.6 interface
#inventory = ALPSBridge.fetch_system()
#reserved_nodes = ALPSBridge.fetch_reserved_nodes()
except (ALPSBridge.ALPSError, ComponentLookupError):
_logger.warning('Error contacting ALPS for state update. Aborting this update',
......@@ -264,7 +294,6 @@ class CraySystem(BaseSystem):
for alps_res in self.alps_reservations.values():
#find alps_id associated reservation
found = False
if int(alps_res.alps_res_id) not in current_alps_res_ids:
#for res_info in inven_reservations:
#if int(alps_res.alps_res_id) == int(res_info['reservation_id']):
......@@ -308,8 +337,6 @@ class CraySystem(BaseSystem):
node.release(user=None, jobid=None, force=True)
_logger.debug('node %s should be marked idle',
node.status = inven_node['state'].upper()
# Cannot add nodes on the fly. Or at lesat we shouldn't be
......@@ -465,8 +492,6 @@ class CraySystem(BaseSystem):
# reservation need to do extra work for a reservation
idle_nodecount = idle_nodes_by_queue[job['queue']]
_logger.debug('idle_nodes_by_queue: %s',
node_id_list = list(self.nodes_by_queue[job['queue']])
if 'location' in job['attrs'].keys():
job_set = set([int(nid) for nid in
......@@ -539,7 +564,7 @@ class CraySystem(BaseSystem):
on the set of nodes, and will mark nodes as allocated.
attrs = job['attrs']
_logger.debug('attrs: %s', job['attrs'])
res_info = ALPSBridge.reserve(job['user'], job['jobid'],
int(job['nodes']), job['attrs'], node_id_list)
......@@ -761,6 +786,12 @@ class CraySystem(BaseSystem):
#Right now this does nothing. Still figuring out what a valid
#specification looks like.
# FIXME: Pull this out of the system configuration from ALPS ultimately.
# For now, set this from config for the PE count per node
# nodes = int(spec['nodes'])
# proccount = spec.get('proccount', None)
# if proccount is None:
# nodes *
return spec
......@@ -34,7 +34,7 @@ class Resource(object):
def reset_info(self, node):
'''reset node information on restart from a stored node object'''
self.attributes = node.attributes
#self.attributes = node.attributes
self.reserved_by = node.reserved_by
self.reserved_jobid = node.reserved_jobid
self.reserved_until = node.reserved_until
......@@ -1297,6 +1297,16 @@ def print_node_list():
def print_node_details(args):
'''fetch and print a detailed view of node information'''
def gen_printable_value(value):
if isinstance(value, dict):
retval = ', '.join(['%s: %s'% (k, gen_printable_value(v)) for k, v in
elif isinstance(value, list):
retval = ', '.join([gen_printable_value(v) for v in value])
retval = str(value)
return retval
nodes = component_call(SYSMGR, False, 'get_nodes',
(True, expand_node_args(args)))
for node in nodes.values():
......@@ -1305,18 +1315,17 @@ def print_node_details(args):
for key, value in node.iteritems():
if isinstance(value, dict):
for k, v in value.iteritems():
value_list.append('%s: %s'% (k, v))
elif isinstance(value, list):
value_list.append('\n'.join([str(v) for v in value]))
elif key == 'node_id':
# if isinstance(value, dict):
# header_list.append(key)
# value_list.append(gen_printable_value(value))
# elif isinstance(value, list):
# header_list.append(key)
# value_list.append(gen_printable_value(value))
if key == 'node_id':
print_vertical([header_list, value_list])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment