Commit e73f7cc1 authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch 'Enh-14-use-system-reservednodes' into 'master'

Enh 14 use system reservednodes

Smaller system query added.  RESERVENODES support will be added in a later ticket.  Also has fixes from first encounters with Kachina.

See merge request !5
parents e044fed8 6bb85c2d
......@@ -10,7 +10,7 @@ import Cobalt.Components.pg_forker
PGChild = Cobalt.Components.pg_forker.PGChild
PGForker = Cobalt.Components.pg_forker.PGForker
import Cobalt.Util
from Cobalt.Util import init_cobalt_config, get_config_option
from cray_messaging import BasilRequest
from cray_messaging import parse_response, ALPSError
......@@ -22,7 +22,10 @@ convert_argv_to_quoted_command_string = Cobalt.Util.convert_argv_to_quoted_comma
_logger = logging.getLogger(__name__.split('.')[-1])
BASIL_PATH = '/home/richp/alps_simulator/' #fetch this from config
BASIL_PATH = get_config_option('alps', 'basil',
class ALPSScriptChild (PGChild):
......@@ -12,7 +12,7 @@ from Cobalt.Proxy import ComponentProxy
from Cobalt.Data import IncrID
from Cobalt.Util import sleep
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.Util import compact_num_list
from Cobalt.Util import compact_num_list, expand_num_list
_logger = logging.getLogger()
......@@ -24,7 +24,7 @@ BASIL_PATH = get_config_option('alps', 'basil',
CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout',
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 8))
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 72))
class BridgeError(Exception):
'''Exception class so that we may easily recognize bridge-specific errors.'''
......@@ -56,9 +56,9 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
params['user_name'] = user
params['batch_id'] = jobid
param_attrs['width'] = attributes.get('width', nodecount)
param_attrs['depth'] = attributes.get('depth', DEFAULT_DEPTH)
param_attrs['nppn'] = attributes.get('nnpn', None)
param_attrs['width'] = attributes.get('width', nodecount * DEFAULT_DEPTH)
param_attrs['depth'] = attributes.get('depth', None)
param_attrs['nppn'] = attributes.get('nppn', DEFAULT_DEPTH)
param_attrs['npps'] = attributes.get('nnps', None)
param_attrs['nspn'] = attributes.get('nspn', None)
param_attrs['reservation_mode'] = attributes.get('reservation_mode',
......@@ -71,8 +71,11 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
params[key] = val
if node_id_list is not None:
params['node_list'] = [int(i) for i in node_id_list]
_logger.debug('reserve request: %s', str(BasilRequest('RESERVE',
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RESERVE',
_logger.debug('reserve return %s', retval)
return retval
def release(alps_res_id):
......@@ -144,9 +147,49 @@ def fetch_inventory(changecount=None, resinfo=False):
params['changecount'] = changecount
if resinfo:
params['resinfo'] = True
#TODO: add a flag for systems with version <=1.4 of ALPS
req = BasilRequest('QUERY', 'INVENTORY', params)
#print str(req)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_reservations():
'''fetch reservation data. This includes reservation metadata but not the
reserved nodes.
params = {'resinfo': True, 'nonodes' : True}
req = BasilRequest('QUERY', 'INVENTORY', params)
return _call_sys_forker(BASIL_PATH, str(req))
def reserved_nodes():
params = {}
req = BasilRequest('QUERY', 'RESERVEDNODES', params)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_aggretate_reservation_data():
'''correlate node and reservation data to get which nodes are in which
def extract_system_node_data(node_data):
ret_nodeinfo = {}
for node_info in node_data['nodes']:
#extract nodeids, construct from bulk data block...
for node_id in expand_num_list(node_info['node_ids']):
node = {}
node['node_id'] = node_id
node['state'] = node_info['state']
node['role'] = node_info['role']
node['attrs'] = node_info
ret_nodeinfo[str(node_id)] = node
del node_info['state']
del node_info['node_ids']
del node_info['role']
return ret_nodeinfo
def _call_sys_forker(basil_path, in_str):
'''take a parameter dictionary and make appropriate call through to BASIL
wait until we get output and clean up child info.'''
......@@ -193,7 +236,10 @@ def print_node_names(spec):
print node['name']
if __name__ == '__main__':
#print fetch_inventory(changecount=0)
print reserve('richp', 42, 11)
# print fetch_inventory(changecount=0)
# print extract_system_node_data(system())
# print fetch_reserved_nodes()
# print fetch_inventory(resinfo=True)
print fetch_reservations()
......@@ -142,7 +142,14 @@ class CraySystem(BaseSystem):
pending = True
while pending:
inventory = ALPSBridge.fetch_inventory(resinfo=True)
# None of these queries has strictly degenerate data. Inventory
# is needed for raw reservation data. System gets memory and a
# much more compact representation of data. Reservednodes gives
# which notes are reserved.
inventory = ALPSBridge.fetch_inventory()
system = ALPSBridge.extract_system_node_data(ALPSBridge.system())
reservations = ALPSBridge.fetch_reservations()
# reserved_nodes = ALPSBridge.reserved_nodes()
except Exception:
#don't crash out here. That may trash a statefile.
_logger.error('Possible transient encountered during initialization. Retrying.',
......@@ -151,16 +158,34 @@ class CraySystem(BaseSystem):
pending = False
for nodespec in inventory['nodes']:
node = CrayNode(nodespec)
node.managed = True
retnodes[node.node_id] = node
self.nodes = retnodes
self._assemble_nodes(inventory, system)
#Reversing the node name to id lookup is going to save a lot of cycles.
for node in self.nodes.values():
self.node_name_to_id[] = node.node_id'NODE INFORMATION INITIALIZED')'ALPS REPORTS %s NODES', len(self.nodes))
# self._assemble_reservations(reservations, reserved_nodes)
def _assemble_nodes(self, inventory, system):
'''merge together the INVENTORY and SYSTEM query data to form as
complete a picture of a node as we can.
nodes = {}
for nodespec in inventory['nodes']:
node = CrayNode(nodespec)
node.managed = True
nodes[node.node_id] = node
for node_id, nodespec in system.iteritems():
nodes[node_id].status = nodespec['state']
#TODO: put in a disable for nodes in a non-batch role
self.nodes = nodes
def _assemble_reservations(self, reservations, reserved_nodes):
# FIXME: we can recover reservations now. Implement this.
def _gen_node_to_queue(self):
'''(Re)Generate a mapping for fast lookup of node-id's to queues.'''
......@@ -225,15 +250,24 @@ class CraySystem(BaseSystem):
#current alps reservations, the node is ready to schedule again.
now = time.time()
with self._node_lock:
fetch_time_start = time.time()
inventory = ALPSBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
#I have seen problems with the kitchen-sink query here, where
#the output gets truncated on it's way into Cobalt.
#inventory = ALPSBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
#determine if summary may be used under normal operation
#updated for >= 1.6 interface
inven_nodes = ALPSBridge.extract_system_node_data(ALPSBridge.system())
#inventory = ALPSBridge.system()
reservations = ALPSBridge.fetch_reservations()
#reserved_nodes = ALPSBridge.reserved_nodes()
except (ALPSBridge.ALPSError, ComponentLookupError):
_logger.warning('Error contacting ALPS for state update. Aborting this update',
inven_nodes = inventory['nodes']
inven_reservations = inventory['reservations']
inven_reservations = reservations.get('reservations', []) # no reservations will be blank
fetch_time_start = time.time()
_logger.debug("time in ALPS fetch: %s seconds", (time.time() - fetch_time_start))
start_time = time.time()
# if node.status not in ['cleanup', 'cleanup-pending']:
# node.status = 'idle'
......@@ -264,7 +298,6 @@ class CraySystem(BaseSystem):
for alps_res in self.alps_reservations.values():
#find alps_id associated reservation
found = False
if int(alps_res.alps_res_id) not in current_alps_res_ids:
#for res_info in inven_reservations:
#if int(alps_res.alps_res_id) == int(res_info['reservation_id']):
......@@ -293,7 +326,7 @@ class CraySystem(BaseSystem):
#find hardware status
for inven_node in inven_nodes:
for inven_node in inven_nodes.values():
if self.nodes.has_key(str(inven_node['node_id'])):
node = self.nodes[str(inven_node['node_id'])]
if node.reserved:
......@@ -308,8 +341,6 @@ class CraySystem(BaseSystem):
node.release(user=None, jobid=None, force=True)
_logger.debug('node %s should be marked idle',
node.status = inven_node['state'].upper()
# Cannot add nodes on the fly. Or at lesat we shouldn't be
......@@ -465,8 +496,6 @@ class CraySystem(BaseSystem):
# reservation need to do extra work for a reservation
idle_nodecount = idle_nodes_by_queue[job['queue']]
_logger.debug('idle_nodes_by_queue: %s',
node_id_list = list(self.nodes_by_queue[job['queue']])
if 'location' in job['attrs'].keys():
job_set = set([int(nid) for nid in
......@@ -539,7 +568,7 @@ class CraySystem(BaseSystem):
on the set of nodes, and will mark nodes as allocated.
attrs = job['attrs']
_logger.debug('attrs: %s', job['attrs'])
res_info = ALPSBridge.reserve(job['user'], job['jobid'],
int(job['nodes']), job['attrs'], node_id_list)
......@@ -761,6 +790,12 @@ class CraySystem(BaseSystem):
#Right now this does nothing. Still figuring out what a valid
#specification looks like.
# FIXME: Pull this out of the system configuration from ALPS ultimately.
# For now, set this from config for the PE count per node
# nodes = int(spec['nodes'])
# proccount = spec.get('proccount', None)
# if proccount is None:
# nodes *
return spec
......@@ -34,7 +34,7 @@ class Resource(object):
def reset_info(self, node):
'''reset node information on restart from a stored node object'''
self.attributes = node.attributes
#self.attributes = node.attributes
self.reserved_by = node.reserved_by
self.reserved_jobid = node.reserved_jobid
self.reserved_until = node.reserved_until
......@@ -1286,14 +1286,27 @@ def print_node_list():
for node in nodes.values():
entry = []
for key in header:
if key.lower() == 'node_id':
printTabular([header] + print_nodes)
printTabular([header] + sorted(print_nodes))
else:'System has no nodes defined')
def print_node_details(args):
'''fetch and print a detailed view of node information'''
def gen_printable_value(value):
if isinstance(value, dict):
retval = ', '.join(['%s: %s'% (k, gen_printable_value(v)) for k, v in
elif isinstance(value, list):
retval = ', '.join([gen_printable_value(v) for v in value])
retval = str(value)
return retval
nodes = component_call(SYSMGR, False, 'get_nodes',
(True, expand_node_args(args)))
for node in nodes.values():
......@@ -1302,18 +1315,17 @@ def print_node_details(args):
for key, value in node.iteritems():
if isinstance(value, dict):
for k, v in value.iteritems():
value_list.append('%s: %s'% (k, v))
elif isinstance(value, list):
value_list.append('\n'.join([str(v) for v in value]))
elif key == 'node_id':
# if isinstance(value, dict):
# header_list.append(key)
# value_list.append(gen_printable_value(value))
# elif isinstance(value, list):
# header_list.append(key)
# value_list.append(gen_printable_value(value))
if key == 'node_id':
print_vertical([header_list, value_list])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment