Commit 1e10e8ff authored by Paul Rich's avatar Paul Rich
Browse files

MCDRAM/NUMA allocator and defaults for qsub added.

Adding in a better validtor to prevent issues with users typing bad
NUMA/MCDRAM modes.  Also, adding a default setting if none provided.
parent 1659a71c
......@@ -275,7 +275,7 @@ def update_spec(parser, opts, spec, opt2spec):
This function will update the appropriate spec values with the opts values
# Get the key validated values into spec dictionary
for opt in ['mode', 'proccount', 'nodecount']:
for opt in ['mode', 'proccount', 'nodecount', 'attrs']:
spec[opt2spec[opt]] = opts[opt]
# Hack until the Cluster Systems get re-written.
......@@ -24,16 +24,13 @@ from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
_logger = logging.getLogger(__name__)
UPDATE_THREAD_TIMEOUT = int(get_config_option('alpssystem',
'update_thread_timeout', 10))
TEMP_RESERVATION_TIME = int(get_config_option('alpssystem',
'temp_reservation_time', 300))
UPDATE_THREAD_TIMEOUT = int(get_config_option('alpssystem', 'update_thread_timeout', 10))
TEMP_RESERVATION_TIME = int(get_config_option('alpssystem', 'temp_reservation_time', 300))
SAVE_ME_INTERVAL = float(get_config_option('alpsssytem', 'save_me_interval', 10.0))
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem',
'pending_startup_timeout', 1200)) #default 20 minutes to account for boot.
#default 20 minutes to account for boot.
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pending_startup_timeout', 1200))
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
DRAIN_MODE = get_config_option('system', 'drain_mode', 'first-fit')
#cleanup time in seconds
......@@ -46,7 +43,11 @@ if ELOGIN_HOSTS == ['']:
DRAIN_MODES = ['first-fit', 'backfill']
DEFAULT_MCDRAM_MODE = get_config_option('alpssystem', 'default_mcdram_mode', 'cache')
DEFAULT_NUMA_MODE = get_config_option('alpssystem', 'default_numa_mode', 'quad')
MCDRAM_TO_CACHEPCT = {'flat':'0', 'cache':'100', 'split':'25', 'equal':'50', '0':'0', '25':'25', '50':'50', '100':'100'}
VALID_MCDRAM_MODES = ['flat', 'cache', 'split', 'equal', '0', '25', '50', '100']
VALID_NUMA_MODES = ['a2a', 'hemi', 'quad', 'snc2', 'snc4']
def chain_loc_list(loc_list):
......@@ -796,6 +797,40 @@ class CraySystem(BaseSystem):
ret_nodes = node_id_list[:int(job['nodes'])]
return ret_nodes
def _select_first_nodes_prefer_memory_match(self, job, node_id_list):
'''Given a list of nids, select the first node count nodes fromt the
list. Prefer nodes that match the memory modes for a given job, then
go in nid order.
job - dictionary of job data from the scheduler
node_id_list - a list of possible candidate nodes
A list of nodes. [] if insufficient nodes for the allocation.
Note: hold the node lock while doing this. We really don't want a
update to happen while doing this.
if job.get('attrs', {}).get('mcdram', None) is None or job.get('attrs', {}).get('numa', None) is None:
# insufficient information to include a mode match
return self._select_first_nodes(job, node_id_list)
ret_nodes = []
with self._node_lock:
considered_nodes = [node for node in self.nodes.values() if node.node_id in node_id_list]
for node in considered_nodes:
if (node.attributes['hbm_cache_pct'] == MCDRAM_TO_CACHEPCT[job['attrs']['mcdram']] and
node.attributes['numa_cfg'] == job['attrs']['numa']):
if len(ret_nodes) < int(job['nodes']):
node_id_list.sort(key=lambda nid: int(nid))
for nid in node_id_list:
if self.nodes[nid] not in ret_nodes:
ret_nids = [node.node_id for node in ret_nodes]
return ret_nids[:int(job['nodes'])]
def _associate_and_run_immediate(self, job, resource_until_time, node_id_list):
'''Given a list of idle node ids, choose a set that can run a job
immediately, if a set exists in the node_id_list.
......@@ -816,7 +851,7 @@ class CraySystem(BaseSystem):
compact_locs = None
if int(job['nodes']) <= len(node_id_list):
#this job can be run immediately
to_alps_list = self._select_first_nodes(job, node_id_list)
to_alps_list = self._select_first_nodes_prefer_memory_match(job, node_id_list)
job_locs = self._ALPS_reserve_resources(job, resource_until_time,
if job_locs is not None and len(job_locs) == int(job['nodes']):
......@@ -1252,8 +1287,21 @@ class CraySystem(BaseSystem):
#Right now this does nothing. Still figuring out what a valid
#specification looks like.
#set default memory modes.
if spec.get('attrs', None) is None:
spec['attrs'] = {'mcdram': DEFAULT_MCDRAM_MODE, 'numa': DEFAULT_NUMA_MODE}
if spec['attrs'].get('mcdram', None) is None:
spec['attrs']['mcdram'] = DEFAULT_MCDRAM_MODE
if spec['attrs'].get('numa', None) is None:
spec['attrs']['numa'] = DEFAULT_NUMA_MODE
# It helps when the mode requested actually exists.
if spec['attrs']['mcdram'] not in VALID_MCDRAM_MODES:
raise JobValidationError('mcdram %s not valid must be one of: %s'% (spec['attrs']['mcdram'],
if spec['attrs']['numa'] not in VALID_NUMA_MODES:
raise JobValidationError('numa %s not valid must be one of: %s' % (spec['attrs']['numa'],
', '.join(VALID_NUMA_MODES)))
# mode on this system defaults to script.
mode = spec.get('mode', None)
if ((mode is None) or (mode == False)):
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment