assemble_queue_data now can now eliminate nodes marked for draining.

......@@ -679,7 +679,7 @@ class CraySystem(BaseSystem):
return retlist
def _assemble_queue_data(self, job, idle_only=True):
def _assemble_queue_data(self, job, idle_only=True, no_draining=False):
'''put together data for a queue, or queue-like reservation structure.
......@@ -746,6 +746,9 @@ class CraySystem(BaseSystem):
unavailable_nodes = [node_id for node_id in node_id_list
if self.nodes[str(node_id)].status in
if no_draining:
unavailable_nodes.extend([node_id for node_id in node_id_list
if self.nodes[str(node_id)].draining])
for node_id in set(unavailable_nodes):
return node_id_list
......@@ -841,6 +844,7 @@ class CraySystem(BaseSystem):
now = time.time()
resource_until_time = now + TEMP_RESERVATION_TIME
end_times.sort(key=lambda x: x[1]) #sort on end time, ascending.
with self._node_lock:
# only valid for this scheduler iteration.
......@@ -851,29 +855,35 @@ class CraySystem(BaseSystem):
for loc_time in end_times:
loc_spec = loc_time[0]
time = loc_time[1]
end_time = loc_time[1]
for loc in expand_num_list(loc_spec):
node_end_times[str(loc)] = time
node_end_times[str(loc)] = end_time
except KeyError:
_logger.error("Invalid value for end_times: %s", end_times)
return best_match
for node, end_time in node_end_times.iteritems():
#initilaize our end times.
for job in arg_list:
label = '%s/%s' % (job['jobid'], job['user'])
node_id_list = self._assemble_queue_data(job)
available_node_list = self._assemble_queue_data(job, idle_only=False)
except ValueError as exc:
_logger.warning('Job %s: requesting locations that are not in queue for that job.', job['jobid'])
if int(job['nodes']) > len(node_id_list):
if int(job['nodes']) > len(available_node_list):
# will happen with reserved jobs.
if len(node_id_list) == 0:
# There are definitely insufficient nodes to run this job
# trivial exclude. Don't break out of the whole thing, may
# have disjoint queues.
elif int(job['nodes']) <= len(node_id_list):
# enough nodes are in a working state to consider the job.
# enough nodes are idle that we can run this job
compact_locs = self._associate_and_run_immediate(job, resource_until_time,
# do we want to allow multiple placements in a single
......@@ -883,15 +893,17 @@ class CraySystem(BaseSystem):"%s: Job selected for running on nodes %s",
label, compact_locs)
break #for now only select one location
#TODO: draining goes here
elif DRAIN_MODE in ['backfill', 'drain-only']:
# drain sufficient nodes for this job to run
drain_node_ids = self._select_nodes_for_draining(job,
node_end_times)'%s: nodes %s selected for draining.', label,
for job in arg_list:
#TODO: backfill pass goes here
if DRAIN_MODE in ['backfill']:
for job in arg_list:
# Backfill is first fit
#TODO: backfill pass goes here
return best_match
def _ALPS_reserve_resources(self, job, new_time, node_id_list):
......@@ -947,15 +959,27 @@ class CraySystem(BaseSystem):
job - dictionary of job information to consider
node_end_times - a list of nodes and their endtimes should be sorted
in order of location preference
List of node ids that have been selected for draining for this job
List of node ids that have been selected for draining for this job,
as well as the expected drain time.
node_id_list = self._assemble_queue_data(job, idle_only=False)
except ValueError:
_logger.warning('Job %s: requesting locations that are not in queue for that job.', job['jobid'])
with self._node_lock:
if len(node_id_list) >= int(job['nodes']):
#order the node ids by id and drain-time.
key=lambda nid: self.nodes[str(nid)].drain_until)
return []
......@@ -267,7 +267,6 @@ class TestCraySystem(object):
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['1','2','4'], 'Wrong nodes in list %s' % nodelist
#need testcase with loc targeting down nodes.
def test_assemble_queue_data_attrs_location_blocked_nodes(self):
'''CraySystem._assemble_queue_data: return only idle locations'''
self.system.nodes['1'].status = 'busy'
......@@ -289,6 +288,16 @@ class TestCraySystem(object):
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == [], 'Wrong node in list %s' % nodelist
def test_assemble_queue_data_attrs_non_draining(self):
'''CraySystem._assemble_queue_data: return idle and non draining only'''
self.system.nodes['1'].status = 'busy'
self.system.nodes['2'].status = 'down'
self.system.nodes['3'].status = 'allocated'
self.system.nodes['4'].set_drain(100, 1)
nodelist = self.system._assemble_queue_data(self.base_job,
assert_match(sorted(nodelist), ['5'], "Bad Nodelist")
def test_find_queue_equivalence_classes_single(self):
'''CraySystem.find_queue_equivalence_classes: single queue'''
self.system.find_queue_equivalence_classes([], ['default'], [])
......@@ -358,7 +367,6 @@ class TestCraySystem(object):
assert node.draining, "drain should not be cleared for node %s" % node.node_id
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_first_fit(self, *args, **kwargs):
......@@ -372,3 +380,4 @@ class TestCraySystem(object):
assert self.system.nodes['1'].reserved_until == 800.0, (
'reserved until expected 800.0, got %s' % self.system.nodes['1'].reserved_until)
