Commit 52329a8d authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch '93-enh-ssd-request' into 'develop'

Resolve "Enh: Add abilitiy to request SSD nodes"

Closes #93

See merge request !66
parents dadabcb6 a59e7346
......@@ -538,6 +538,9 @@ on each node for XC40 systems. The default value is 72.
.SS [alpssystem]
.B min_ssd_size
The size of the smallest SSD available on the system in GB.
.B pgroup_startup_timeout
The time to allow for process group startup in seconds. The default is 120
......@@ -34,6 +34,8 @@ APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apk
DRAIN_MODE = get_config_option('system', 'drain_mode', 'first-fit')
#cleanup time in seconds
CLEANUP_DRAIN_WINDOW = get_config_option('system', 'cleanup_drain_window', 300)
#SSD information
DEFAULT_MIN_SSD_SIZE = int(get_config_option('alpssystem', 'min_ssd_size', -1))
#Epsilon for backfilling. This system does not do this on a per-node basis.
BACKFILL_EPSILON = int(get_config_option('system', 'backfill_epsilon', 120))
......@@ -757,12 +759,19 @@ class CraySystem(BaseSystem):
return equiv
def _setup_special_locaitons(job):
def _setup_special_locations(self, job):
forbidden = set([str(loc) for loc in chain_loc_list(job.get('forbidden', []))])
required = set([str(loc) for loc in chain_loc_list(job.get('required', []))])
requested_locations = set([str(n) for n in expand_num_list(job['attrs'].get('location', ''))])
return (forbidden, required, requested_locations)
# If ssds are required, add nodes without working SSDs to the forbidden list
ssd_unavail = set([])
if job.get('attrs', {}).get("ssds", "none").lower() == "required":
ssd_min_size = int(job.get('attrs', {}).get("ssd_size", DEFAULT_MIN_SSD_SIZE)) * 1000000000 #convert to bytes
ssd_unavail.update(set([str(node.node_id) for node in self.nodes.values()
if (node.attributes['ssd_enabled'] == 0 or
int(node.attributes.get('ssd_info', {'size': DEFAULT_MIN_SSD_SIZE})['size']) < ssd_min_size)
return (forbidden, required, requested_locations, ssd_unavail)
def _assemble_queue_data(self, job, idle_only=True, drain_time=None):
'''put together data for a queue, or queue-like reservation structure.
......@@ -782,7 +791,7 @@ class CraySystem(BaseSystem):
# have to intersect required nodes with the idle and available
# we also have to forbid a bunch of locations, in this case.
unavailable_nodes = []
forbidden, required, requested_locations = self._setup_special_locaitons(job)
forbidden, required, requested_locations, ssd_unavail = self._setup_special_locations(job)
requested_loc_in_forbidden = False
for loc in requested_locations:
if loc in forbidden:
......@@ -792,16 +801,16 @@ class CraySystem(BaseSystem):
if job['queue'] not in self.nodes_by_queue.keys():
# Either a new queue with no resources, or a possible
# reservation need to do extra work for a reservation
node_id_list = list(required - forbidden)
node_id_list = list(required - forbidden - ssd_unavail)
node_id_list = list(set(self.nodes_by_queue[job['queue']]) - forbidden)
node_id_list = list(set(self.nodes_by_queue[job['queue']]) - forbidden - ssd_unavail)
if requested_locations != set([]): # handle attrs location= requests
job_set = set([str(nid) for nid in requested_locations])
if job['queue'] not in self.nodes_by_queue.keys():
#we're in a reservation and need to further restrict nodes.
if job_set <= set(node_id_list):
# We are in a reservation there are no forbidden nodes.
node_id_list = list(requested_locations)
node_id_list = list(requested_locations - ssd_unavail)
# We can't run this job. Insufficent resources in this
# reservation to do so. Don't risk blocking anything.
......@@ -815,9 +824,7 @@ class CraySystem(BaseSystem):
# this job has requested locations that are a part of an
# active reservation. Remove locaitons and drop available
# nodecount appropriately.
node_id_list = list(set(node_id_list) - forbidden)
node_id_list = []
node_id_list = list(set(node_id_list) - forbidden - ssd_unavail)
if not requested_loc_in_forbidden:
raise ValueError("forbidden locations not in queue")
with self._node_lock:
......@@ -874,6 +881,7 @@ class CraySystem(BaseSystem):
update to happen while doing this.
ssd_required = (job.get('attrs', {}).get("ssds", "none").lower() == "required")
if job.get('attrs', {}).get('mcdram', None) is None or job.get('attrs', {}).get('numa', None) is None:
# insufficient information to include a mode match
return self._select_first_nodes(job, node_id_list)
......@@ -893,6 +901,7 @@ class CraySystem(BaseSystem):
return ret_nids[:int(job['nodes'])]
def _associate_and_run_immediate(self, job, resource_until_time, node_id_list):
'''Given a list of idle node ids, choose a set that can run a job
immediately, if a set exists in the node_id_list.
......@@ -1082,7 +1091,7 @@ class CraySystem(BaseSystem):
drain_list = []
candidate_list = []
cleanup_statuses = ['cleanup', 'cleanup-pending']
forbidden, required, requested_locations = self._setup_special_locaitons(job)
forbidden, required, requested_locations, ssd_unavail = self._setup_special_locations(job)
node_id_list = self._assemble_queue_data(job, idle_only=False)
except ValueError:
......@@ -1131,6 +1140,8 @@ class CraySystem(BaseSystem):
# location and reservation avoidance data:
if forbidden != set([]):
candidates = candidates.difference(forbidden)
if ssd_unavail != set([]):
candidates = candidates.difference(ssd_unavail)
if requested_locations != set([]):
candidates = candidates.intersection(requested_locations)
candidate_list = list(candidates)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment