Commit 2cda012d authored by Paul Rich
Browse files

Refactor find_queue_equivalence_classes and drain clear code

After discussions, it became clear that the current
find_queue_equivalence_classes for this system complicates the codebase
for very little actual gain.  After this change, the system will have
only one equivalence class at all times, consisting of all active queues
assigned to nodes and all active reservations.

This simplification allows us to ensure that find_job_location only gets
called twice, once for reservations, which ignore drain times, and then
immediately after for the normal "production" queue jobs, which do set
drain times.  In both cases we can just clear drain times across the
machine.

In addition to testing (with more tests coming for the case that prompted
this examination in the first place), we know that this works, because
any system with a queue or set of overlapping queues covering all
resources on the machine already forms a single equivalence class under
the old code.
parent cca15e3f
......@@ -160,7 +160,6 @@ class CraySystem(BaseSystem):
self._gen_node_to_queue()
self.node_update_thread = thread.start_new_thread(self._run_update_state, tuple())
_logger.info('UPDATE THREAD STARTED')
self.current_equivalence_classes = []
self.killing_jobs = {}
#hold on to the initial spec in case nodes appear out of nowhere.
self.init_spec = None
......@@ -722,63 +721,9 @@ class CraySystem(BaseSystem):
queue_assignments: a mapping of queues to schedulable locations.
'''
equiv = []
node_active_queues = set([])
self.current_equivalence_classes = [] #reverse mapping of queues to nodes
for node in self.nodes.values():
if node.managed and node.schedulable:
#only condiser nodes that we are scheduling.
node_active_queues = set([])
for queue in node.queues:
if queue in active_queue_names:
node_active_queues.add(queue)
if node_active_queues == set([]):
#this node has nothing active. The next check can get
#expensive, so skip it.
continue
#determine the queues that overlap. Hardware has to be included so
#that reservations can be mapped into the equiv classes.
found_a_match = False
for e in equiv:
for queue in node_active_queues:
if queue in e['queues']:
e['data'].add(node.node_id)
e['queues'] = e['queues'] | set(node_active_queues)
found_a_match = True
break
if found_a_match:
break
if not found_a_match:
equiv.append({'queues': set(node_active_queues),
'data': set([node.node_id]),
'reservations': set()})
#second pass to merge queue lists based on hardware
real_equiv = []
for eq_class in equiv:
found_a_match = False
for e in real_equiv:
if e['queues'].intersection(eq_class['queues']):
e['queues'].update(eq_class['queues'])
e['data'].update(eq_class['data'])
found_a_match = True
break
if not found_a_match:
real_equiv.append(eq_class)
equiv = real_equiv
#add in reservations:
for eq_class in equiv:
for res_name in reservation_dict:
for node_hunk in reservation_dict[res_name].split(":"):
for node_id in expand_num_list(node_hunk):
if str(node_id) in eq_class['data']:
eq_class['reservations'].add(res_name)
break
#don't send what could be a large block list back in the returun
for key in eq_class:
eq_class[key] = list(eq_class[key])
del eq_class['data']
self.current_equivalence_classes.append(eq_class)
return equiv
return [{'reservations': reservation_dict.keys(),
'queues': [queue_name for queue_name in self.nodes_by_queue.keys()
if queue_name in active_queue_names]}]
def _setup_special_locations(self, job):
forbidden = set([str(loc) for loc in chain_loc_list(job.get('forbidden', []))])
......@@ -1083,17 +1028,10 @@ class CraySystem(BaseSystem):
Note: does not acquire block lock. Must be locked externally.
'''
now = int(time.time())
current_queues = []
for equiv_class in self.current_equivalence_classes:
if queue in equiv_class['queues']:
current_queues = equiv_class['queues']
if current_queues:
with self._node_lock:
for node in self.nodes.values():
for q in node.queues:
if q in current_queues:
node.clear_drain()
# now we can just clear all the nodes at once. Always have a single equivalence class
with self._node_lock:
for node in self.nodes.values():
node.clear_drain()
def _select_nodes_for_draining(self, job, end_times):
'''Select nodes to be drainined. Set backfill windows on draining
......
......@@ -394,58 +394,50 @@ class TestCraySystem(object):
def test_find_queue_equivalence_classes_single(self):
'''CraySystem.find_queue_equivalence_classes: single queue'''
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.current_equivalence_classes
assert len(self.system.current_equivalence_classes) == 1, 'Have %s equiv classes, should have 1.'
for equiv in self.system.current_equivalence_classes:
self.system._gen_node_to_queue()
equivs = self.system.find_queue_equivalence_classes({}, ['default'], [])
assert len(equivs) == 1, 'Have %s equiv classes, should have 1.'
for equiv in equivs:
assert equiv['queues'] == ['default'], 'mismatch in returned equiv class queues'
def test_find_queue_equivalence_classes_overlap(self):
'''CraySystem.find_queue_equivalence_classes: partial overlapping queues'''
self.system.nodes['1'].queues = ['foo']
self.system.nodes['2'].queues = ['foo', 'default']
self.system.find_queue_equivalence_classes([], ['default', 'foo'], [])
assert len(self.system.current_equivalence_classes) == 1, (
self.system._gen_node_to_queue()
equivs = self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
assert len(equivs) == 1, (
'Have %s equiv classes, should have 1.' %
len(self.system.current_equivalence_classes))
for equiv in self.system.current_equivalence_classes:
len(equivs))
for equiv in equivs:
assert sorted(equiv['queues']) == ['default', 'foo'], (
'mismatch in returned equiv class queues %s' %
equiv['queues'])
def test_find_queue_equivalence_classes_disjoint(self):
'''CraySystem.find_queue_equivalence_classes: disjoint queues'''
# we return one class now, no matter what.
self.system.nodes['1'].queues = ['foo']
self.system.nodes['2'].queues = ['foo']
val = self.system.find_queue_equivalence_classes([], ['default', 'foo'], [])
self.system.current_equivalence_classes
expect = [{'reservations': [], 'queues': ['foo']},
{'reservations': [], 'queues': ['default']}]
assert self.system.current_equivalence_classes == expect, (
'Expected %s, got %s' % (expect,
self.system.current_equivalence_classes))
assert val == self.system.current_equivalence_classes, (
"val/current_equivalence_class mismatch\nReturn: %s\nInternal: %s")
self.system._gen_node_to_queue()
equivs = self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
expect = [{'reservations': [], 'queues': ['default', 'foo']}]
assert equivs == expect, 'Expected %s, got %s' % (expect, equivs)
def test_find_queue_equivalence_classes_disjoint_reservation(self):
'''CraySystem.find_queue_equivalence_classes: bind reservation all eq classes'''
self.system.nodes['1'].queues = ['foo']
self.system.nodes['2'].queues = ['foo']
val = self.system.find_queue_equivalence_classes({'test':'1-2,4-5'}, ['default', 'foo'], [])
self.system.current_equivalence_classes
expect = [{'reservations': ['test'], 'queues': ['foo']},
{'reservations': ['test'], 'queues': ['default']}]
assert self.system.current_equivalence_classes == expect, (
'Expected %s, got %s' % (expect,
self.system.current_equivalence_classes))
assert val == self.system.current_equivalence_classes, (
"val/current_equivalence_class mismatch\nReturn: %s\nInternal: %s")
self.system._gen_node_to_queue()
equivs = self.system.find_queue_equivalence_classes({'test':'1-2,4-5'}, ['default', 'foo'], [])
expect = [{'reservations': ['test'], 'queues': ['default', 'foo']}]
assert equivs == expect, 'Expected %s, got %s' % (expect, equivs)
def test_clear_draining_for_queues_full_clear(self):
'''CraySystem._clear_draining_for_queues: clear queue's draining times'''
for node in self.system.nodes.values():
node.set_drain(100.0, 300)
self.system.find_queue_equivalence_classes([], ['default'], [])
node.set_drain(101.0, 300)
self.system.find_queue_equivalence_classes({}, ['default'], [])
self.system._clear_draining_for_queues('default')
for node in self.system.nodes.values():
assert not node.draining, "node %s marked as draining!" % node.node_id
......@@ -454,27 +446,14 @@ class TestCraySystem(object):
'''CraySystem._clear_draining_for_queues: clear whole equivalence class'''
self.system.nodes['1'].queues = ['foo']
self.system.nodes['2'].queues = ['foo', 'default']
self.system._gen_node_to_queue()
for node in self.system.nodes.values():
node.set_drain(100.0, 300)
self.system.find_queue_equivalence_classes([], ['default', 'foo'], [])
self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
self.system._clear_draining_for_queues('default')
for node in self.system.nodes.values():
assert not node.draining, "node %s marked as draining!" % node.node_id
def test_clear_drianing_for_queues_one_equiv(self):
'''CraySystem._clear_draining_for_queues: clear only one equivalence class'''
self.system.nodes['1'].queues = ['foo']
self.system.nodes['2'].queues = ['foo']
for node in self.system.nodes.values():
node.set_drain(100.0, 300)
self.system.find_queue_equivalence_classes([], ['default', 'foo'], [])
self.system._clear_draining_for_queues('default')
for node in self.system.nodes.values():
if node.node_id not in ['1', '2']:
assert not node.draining, "node %s marked as draining!" % node.node_id
else:
assert node.draining, "drain should not be cleared for node %s" % node.node_id
def test_select_nodes_for_draining_single_job(self):
'''CraySystem._select_nodes_for_draining: drain nodes from a single job'''
end_times = [[['1-3'], 100]]
......@@ -578,7 +557,7 @@ class TestCraySystem(object):
self.system.nodes['5'].queues = ['default', 'bar']
self.system.nodes['2'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.system.find_queue_equivalence_classes([],['default', 'bar'],[])
self.system.find_queue_equivalence_classes({},['default', 'bar'],[])
self.system._gen_node_to_queue()
self.base_job['nodes'] = 3
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
......@@ -703,7 +682,7 @@ class TestCraySystem(object):
jobs[1]['walltime'] = 400
self.system.reserve_resources_until('1', 100, 1)
self.system.nodes['1'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['1'], 600]], [])
assert_match(retval, {}, "no location should be assigned")
assert_match(self.system.pending_starts, {}, "no starts should be pending")
......@@ -726,7 +705,7 @@ class TestCraySystem(object):
jobs[1]['walltime'] = 400
self.system.reserve_resources_until('1', 600, 1)
self.system.nodes['1'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['1'], 600]], [])
assert_match(retval, {3: ['2']}, "bad location")
assert_match(self.system.pending_starts, {3: 800.0}, "no starts should be pending")
......@@ -752,7 +731,7 @@ class TestCraySystem(object):
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['3-5'], 600]], [])
assert_match(retval, {3: ['1-2']}, 'bad location')
assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
......@@ -779,7 +758,7 @@ class TestCraySystem(object):
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'down'
self.system.nodes['5'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['2,5'], 600]], [])
assert_match(retval, {3: ['1,4']}, 'bad location')
assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
......@@ -808,7 +787,7 @@ class TestCraySystem(object):
self.system.update_nodes({'queues': 'bar:default'}, ['4', '5'], None)
self.system.nodes['2'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default', 'foo', 'bar'], [])
self.system.find_queue_equivalence_classes({}, ['default', 'foo', 'bar'], [])
retval = self.system.find_job_location(jobs, [[['2,5'], 600]], [])
assert_match(retval, {}, 'bad location')
assert_match(self.system.pending_starts, {}, "bad pending start")
......@@ -839,7 +818,7 @@ class TestCraySystem(object):
self.system.update_nodes({'queues': 'bar:default'}, ['4', '5'], None)
self.system.nodes['2'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default', 'foo', 'bar'], [])
self.system.find_queue_equivalence_classes({}, ['default', 'foo', 'bar'], [])
retval = self.system.find_job_location(jobs, [[['2,5'], 1000]], [])
assert_match(retval, {3: ['4']}, 'bad location')
assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
......@@ -865,7 +844,7 @@ class TestCraySystem(object):
jobs[1]['walltime'] = 400
self.system.reserve_resources_until('1', 600, 1)
self.system.nodes['1'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['1'], 600]], [])
assert_match(retval, {}, 'bad location')
assert_match(self.system.pending_starts, {}, "bad pending start")
......@@ -898,7 +877,7 @@ class TestCraySystem(object):
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['2-3'], 550.0], [['1'],
600.0], [['4'], 700.0]], [])
assert_match(retval, {}, 'bad location')
......@@ -933,7 +912,7 @@ class TestCraySystem(object):
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['2-3'], 550.0], [['1,4'],
600.0]], [])
assert_match(retval, {4: ['5']}, 'bad location')
......@@ -968,7 +947,7 @@ class TestCraySystem(object):
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'busy'
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.find_queue_equivalence_classes({}, ['default'], [])
retval = self.system.find_job_location(jobs, [[['2-3'], 550.0], [['1,4'],
600.0]], [])
assert_match(retval, {4: ['5']}, 'bad location')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment