Commit 6696bfa5 authored by Paul Rich's avatar Paul Rich
Browse files

More unit testing of draining

parent e510862c
......@@ -847,27 +847,22 @@ class CraySystem(BaseSystem):
'''
now = time.time()
resource_until_time = now + TEMP_RESERVATION_TIME
end_times.sort(key=lambda x: x[1]) #sort on end time, ascending.
with self._node_lock:
# only valid for this scheduler iteration.
self._clear_draining_for_queues(arg_list[0]['queue'])
#check if we can run immedaitely, if not drain. Keep going until all
#nodes are marked for draining or have a pending run.
best_match = {} #jobid: list(locations)
node_end_times = {}
try:
for loc_time in end_times:
loc_spec = loc_time[0]
end_time = loc_time[1]
for loc in expand_num_list(loc_spec):
node_end_times[str(loc)] = end_time
except KeyError:
_logger.error("Invalid value for end_times: %s", end_times)
return best_match
else:
for node, end_time in node_end_times.iteritems():
#initilaize our end times.
self.nodes[str(node)].set_drain(end_time)
self.nodes[str(loc)].set_drain(end_time)
except (KeyError, IndexError):
err = "Invalid value for end_times: %s" % end_times
_logger.error(err)
raise ValueError(err)
for job in arg_list:
label = '%s/%s' % (job['jobid'], job['user'])
try:
......@@ -889,8 +884,8 @@ class CraySystem(BaseSystem):
elif int(job['nodes']) <= len(node_id_list):
# enough nodes are in a working state to consider the job.
# enough nodes are idle that we can run this job
compact_locs = self._associate_and_run_immediate(job, resource_until_time,
node_id_list)
compact_locs = self._associate_and_run_immediate(job,
resource_until_time, node_id_list)
# do we want to allow multiple placements in a single
# pass? That would likely help startup times.
if compact_locs is not None:
......@@ -901,7 +896,7 @@ class CraySystem(BaseSystem):
elif DRAIN_MODE in ['backfill', 'drain-only']:
# drain sufficient nodes for this job to run
drain_node_ids = self._select_nodes_for_draining(job,
node_end_times)
end_times)
_logger.info('%s: nodes %s selected for draining.', label,
compact_num_list(drain_node_ids))
if DRAIN_MODE in ['backfill']:
......@@ -964,21 +959,25 @@ class CraySystem(BaseSystem):
Inputs:
job - dictionary of job information to consider
node_end_times - a list of nodes and their endtimes should be sorted
in order of location preference
end_times - a list of nodes and their endtimes should be sorted
in order of location preference
Side Effect:
end_times will be sorted in ascending end-time order
Return:
List of node ids that have been selected for draining for this job,
as well as the expected drain time.
'''
end_times.sort(key=lambda x: int(x[1]))
print end_times
drain_list = []
candidate_list = []
try:
node_id_list = self._assemble_queue_data(job, idle_only=False)
except ValueError:
_logger.warning('Job %s: requesting locations that are not in queue for that job.', job['jobid'])
_logger.warning('Job %s: requesting locations that are not in queue.', job['jobid'])
else:
with self._node_lock:
drain_time = None
......@@ -986,7 +985,8 @@ class CraySystem(BaseSystem):
if (self.nodes[str(nid)].status == 'idle' and
not self.nodes[str(nid)].draining)]
for loc_time in end_times:
running_nodes = [str(nid) for nid in expand_num_list(loc_time[0])]
running_nodes = [str(nid) for nid in expand_num_list(loc_time[0])
if job['queue'] in self.nodes[str(nid)].queues]
for nid in running_nodes:
self.nodes[str(nid)].set_drain(loc_time[1], job['jobid'])
candidate_list.extend(running_nodes)
......
......@@ -399,6 +399,98 @@ class TestCraySystem(object):
end_times)
assert_match(sorted(drain_nodes), ['1', '2', '3', '4'], "Bad Selection.")
def test_select_nodes_for_draining_prefer_running(self):
'''CraySystem._select_nodes_for_draining: prefer nodes from running job'''
end_times = [['4-5', 100]]
self.system.nodes['4'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.base_job['nodes'] = 4
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
assert_match(sorted(drain_nodes), ['1', '2', '4', '5'], "Bad Selection.")
def test_select_nodes_for_draining_only_running(self):
'''CraySystem._select_nodes_for_draining: fit entirely in running job if possible'''
end_times = [['2-5', 100]]
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.base_job['nodes'] = 2
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
assert_match(sorted(drain_nodes), ['2', '3'], "Bad Selection.")
def test_select_nodes_for_draining_correct_time(self):
'''CraySystem._select_nodes_for_draining: set correct drain times single job'''
end_times = [['5', 100]]
self.system.nodes['5'].status = 'busy'
self.base_job['nodes'] = 5
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
for i in range(1, 6):
assert_match(self.system.nodes[str(i)].draining, True,
"Draining not set")
assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
assert_match(self.system.nodes[str(i)].drain_until, 100, "Bad drain time")
def test_select_nodes_for_draining_multiple_running(self):
'''CraySystem._select_nodes_for_draining: choose from shortest job to drain'''
end_times = [['2-3', 100.0], ['4-5', 91.0]]
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'allocated'
self.system.nodes['5'].status = 'allocated'
self.base_job['nodes'] = 3
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
assert_match(sorted(drain_nodes), ['1' ,'4' , '5'], "Bad Selection")
for i in ['1', '4', '5']:
assert_match(self.system.nodes[str(i)].draining, True,
"Draining not set")
assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
assert_match(self.system.nodes[str(i)].drain_until, 91, "Bad drain time")
def test_select_nodes_for_draining_select_multiple_running(self):
'''CraySystem._select_nodes_for_draining: set time to longest if draining from multiple jobs'''
end_times = [['2-3', 100.0], ['4-5', 91.0]]
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'allocated'
self.system.nodes['5'].status = 'allocated'
self.base_job['nodes'] = 5
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
assert_match(sorted(drain_nodes), ['1', '2', '3', '4' , '5'], "Bad Selection")
for i in range(1,6):
assert_match(self.system.nodes[str(i)].draining, True,
"Draining not set")
assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
assert_match(self.system.nodes[str(i)].drain_until, 100, "Bad drain time")
def test_select_nodes_for_draining_select_queue(self):
'''CraySystem._select_nodes_for_draining: confine to proper queue'''
self.base_job['queue'] = 'bar'
end_times = [['5', 100.0], ['2', 50.0]]
self.system.nodes['1'].queues = ['default']
self.system.nodes['2'].queues = ['default']
self.system.nodes['3'].queues = ['bar']
self.system.nodes['4'].queues = ['default', 'bar']
self.system.nodes['5'].queues = ['default', 'bar']
self.system.nodes['2'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.system.find_queue_equivalence_classes([],['default', 'bar'],[])
self.system._gen_node_to_queue()
self.base_job['nodes'] = 3
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
assert_match(sorted(drain_nodes), ['3', '4', '5'], "Bad Selection")
for i in range(3,6):
assert_match(self.system.nodes[str(i)].draining, True,
"Draining not set")
assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
assert_match(self.system.nodes[str(i)].drain_until, 100, "Bad drain time")
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_first_fit(self, *args, **kwargs):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment