Commit 9fe3c506 authored by Paul Rich's avatar Paul Rich
Browse files

Intermediate checkin: fixes for equivalence classes.

Checking in fixes for find queue equivalence classes that impact
draining.  Drain-status-clear now working.  Stub for drain selection.
parent 0b8cccd8
......@@ -18,6 +18,8 @@ class CrayNode(ClusterNode):
'UNKNOWN':'down', 'UNAVAIL': 'down', 'SWDOWN': 'down',
'REBOOTQ':'down', 'ADMINDOWN':'down'}
DOWN_STATUSES = ['down', 'alps-interactive']
def __init__(self, spec):
super(CrayNode, self).__init__(spec)
self._status = self.CRAY_STATE_MAP[spec['state'].upper()]
......
......@@ -606,17 +606,17 @@ class CraySystem(BaseSystem):
'''
equiv = []
node_active_queues = []
node_active_queues = set([])
self.current_equivalence_classes = []
#reverse mapping of queues to nodes
for node in self.nodes.values():
if node.managed and node.schedulable:
#only condiser nodes that we are scheduling.
node_active_queues = []
node_active_queues = set([])
for queue in node.queues:
if queue in active_queue_names:
node_active_queues.append(queue)
if node_active_queues == []:
node_active_queues.add(queue)
if node_active_queues == set([]):
#this node has nothing active. The next check can get
#expensive, so skip it.
continue
......@@ -641,7 +641,7 @@ class CraySystem(BaseSystem):
for eq_class in equiv:
found_a_match = False
for e in real_equiv:
if e['data'].intersection(eq_class['data']):
if e['queues'].intersection(eq_class['queues']):
e['queues'].update(eq_class['queues'])
e['data'].update(eq_class['data'])
found_a_match = True
......@@ -657,7 +657,7 @@ class CraySystem(BaseSystem):
if str(node_id) in eq_class['data']:
eq_class['reservations'].add(res_name)
break
#don't send what could be a large block list back in the return
#don't send what could be a large block list back in the returun
for key in eq_class:
eq_class[key] = list(eq_class[key])
del eq_class['data']
......@@ -675,10 +675,16 @@ class CraySystem(BaseSystem):
retlist.extend(expand_num_list(locs))
return retlist
def _assemble_queue_data(self, job, idle_nodes_by_queue):
def _assemble_queue_data(self, job, idle_only=True):
'''put together data for a queue, or queue-like reservation structure.
return count of idle resources, and a list of valid nodes..
Input:
job - dictionary of job data.
idle_only - [default: True] if True, return only idle nodes.
Otherwise return nodes in any non-down status.
return count of idle resources, and a list of valid nodes to run on.
if idle_only is set to false, returns a set of candidate draining nodes.
'''
......@@ -686,7 +692,6 @@ class CraySystem(BaseSystem):
# not find the queue normally. In the event of a reservation we'll
# have to intersect required nodes with the idle and available
# we also have to forbid a bunch of locations, in this case.
idle_nodecount = 0
unavailable_nodes = []
forbidden = set([str(loc) for loc in self.chain_loc_list(job.get('forbidden', []))])
required = set([str(loc) for loc in self.chain_loc_list(job.get('required', []))])
......@@ -697,7 +702,7 @@ class CraySystem(BaseSystem):
#don't spam the logs.
requested_loc_in_forbidden = True
break
if not job['queue'] in self.nodes_by_queue.keys():
if job['queue'] not in self.nodes_by_queue.keys():
# Either a new queue with no resources, or a possible
# reservation need to do extra work for a reservation
node_id_list = list(required - forbidden)
......@@ -705,7 +710,7 @@ class CraySystem(BaseSystem):
node_id_list = list(set(self.nodes_by_queue[job['queue']]) - forbidden)
if requested_locations != set([]): # handle attrs location= requests
job_set = set([str(nid) for nid in requested_locations])
if not job['queue'] in self.nodes_by_queue.keys():
if job['queue'] not in self.nodes_by_queue.keys():
#we're in a reservation and need to further restrict nodes.
if job_set <= set(node_id_list):
# We are in a reservation there are no forbidden nodes.
......@@ -713,7 +718,6 @@ class CraySystem(BaseSystem):
else:
# We can't run this job. Insufficent resources in this
# reservation to do so. Don't risk blocking anything.
#idle_nodecount = 0
node_id_list = []
else:
#normal queues. Restrict to the non-reserved nodes.
......@@ -730,13 +734,69 @@ class CraySystem(BaseSystem):
if not requested_loc_in_forbidden:
raise ValueError("forbidden locations not in queue")
with self._node_lock:
for node_id in node_id_list: #strip non-idle nodes.
if not self.nodes[str(node_id)].status in ['idle']:
unavailable_nodes.append(node_id)
if idle_only:
unavailable_nodes = [node_id for node_id in node_id_list
if self.nodes[str(node_id)].status not in ['idle']]
#unavailable_nodes.append(node_id)
else:
unavailable_nodes = [node_id for node_id in node_id_list
if self.nodes[str(node_id)].status in
self.nodes[str(node_id)].DOWN_STATUSES]
for node_id in set(unavailable_nodes):
node_id_list.remove(node_id)
idle_nodecount = len(node_id_list)
return (idle_nodecount, node_id_list)
return node_id_list
def _select_first_nodes(self, job, node_id_list):
'''Given a list of nids, select the first node count nodes fromt the
list. This is the target for alternate allocator replacement.
Input:
job - dictionary of job data from the scheduler
node_id_list - a list of possible candidate nodes
Return:
A list of nodes. [] if insufficient nodes for the allocation.
Note: hold the node lock while doing this. We really don't want a
update to happen while doing this.
'''
ret_nodes = []
with self._node_lock:
if int(job['nodes']) <= len(node_id_list):
ret_nodes = sorted(node_id_list)[:int(job['nodes'])]
return ret_nodes
def _associate_and_run_immediate(self, job, resource_until_time, node_id_list):
'''Given a list of idle node ids, choose a set that can run a job
immediately, if a set exists in the node_id_list.
Inputs:
job - Dictionary of job data
node_id_list - a list of string node id values
Side Effects:
Will reserve resources in ALPS and will set resource reservations on
allocated nodes.
Return:
None if no match, otherwise the pairing of a jobid and set of nids
that have been allocated to a job.
'''
compact_locs = None
if int(job['nodes']) <= len(node_id_list):
#this job can be run immediately
to_alps_list = self._select_first_nodes(job, node_id_list)
job_locs = self._ALPS_reserve_resources(job, resource_until_time,
to_alps_list)
if job_locs is not None and len(job_locs) == int(job['nodes']):
compact_locs = compact_num_list(job_locs)
#temporary reservation until job actually starts
self.pending_starts[job['jobid']] = resource_until_time
self.reserve_resources_until(compact_locs, resource_until_time, job['jobid'])
return compact_locs
@locking
@exposed
......@@ -771,80 +831,64 @@ class CraySystem(BaseSystem):
this and will re-reserve as needed. The alps reservation id may change
in this case post job startup.
pt_blocking_locations may be used later to block out nodes that are
impacted by warmswap operations.
'''
#TODO: make not first-fit
now = time.time()
resource_until_time = now + TEMP_RESERVATION_TIME
startup_time = now + PENDING_STARTUP_TIMEOUT
with self._node_lock:
# general idle nodecount
idle_nodecount = len([node for node in self.nodes.values() if
node.managed and node.status is 'idle'])
# only valid for this scheduler iteration.
idle_nodes_by_queue = self._idle_nodes_by_queue()
self._clear_draining_for_queues(arg_list[0]['queue'])
#check if we can run immedaitely, if not drain. Keep going until all
#nodes are marked for draining or have a pending run.
best_match = {} #jobid: list(locations)
reservation_queue_info = {}
node_end_times = {}
try:
for loc_time in end_times:
loc_spec = loc_time[0]
time = loc_time[1]
for loc in expand_num_list(loc_spec):
node_end_times[str(loc)] = time
except KeyError:
_logger.error("Invalid value for end_times: %s", end_times)
return best_match
for job in arg_list:
label = '%s/%s' % (job['jobid'], job['user'])
try:
idle_nodecount, node_id_list = self._assemble_queue_data(job, idle_nodes_by_queue)
node_id_list = self._assemble_queue_data(job)
except ValueError as exc:
_logger.warning('Job %s: requesting locations that are not in queue for that job.', job['jobid'])
continue
if int(job['nodes']) > len(node_id_list):
# will happen with reserved jobs.
#_logger.warning('Job %s: requested nodecount of %s exceeds number of nodes in queue %s',
# job['jobid'], job['nodes'], len(node_id_list))
continue
if idle_nodecount == 0:
if len(node_id_list) == 0:
# There are definitely insufficient nodes to run this job
# trivial exclude. Don't break out of the whole thing, may
# have disjoint queues.
continue
elif idle_nodecount < 0:
_logger.error("FJL showing negative nodecount of %s",
idle_nodecount)
elif int(job['nodes']) <= idle_nodecount:
label = '%s/%s' % (job['jobid'], job['user'])
#this can be run immediately
job_locs = self._ALPS_reserve_resources(job,
resource_until_time, node_id_list)
if job_locs is not None and len(job_locs) == int(job['nodes']):
compact_locs = compact_num_list(job_locs)
#temporary reservation until job actually starts
self.pending_starts[job['jobid']] = startup_time
self.reserve_resources_until(compact_locs,
resource_until_time, job['jobid'])
#set resource reservation, adjust idle count
if job['queue'] in idle_nodes_by_queue.keys():
# will recalculate this for reservation-type queues.
# If a reservation, this may not even exist.
idle_nodes_by_queue[job['queue']] -= int(job['nodes'])
elif int(job['nodes']) <= len(node_id_list):
compact_locs = self._associate_and_run_immediate(job, resource_until_time,
node_id_list)
# do we want to allow multiple placements in a single
# pass? That would likely help startup times.
if compact_locs is not None:
best_match[job['jobid']] = [compact_locs]
_logger.info("%s: Job selected for running on nodes %s",
label, compact_locs)
# do we want to allow multiple placements in a single
# pass? That would likely help startup times.
break
break #for now only select one location
else:
#TODO: draining goes here
pass
#TODO: backfill pass goes here
return best_match
def _idle_nodes_by_queue(self):
'''get the count of currently idle nodes by queue.
'''
# TODO: make sure this plays nice with reservations.
retdict = {} #queue_name: nodecount
for queue, nodes in self.nodes_by_queue.items():
retdict[queue] = 0
for node_id in nodes:
if self.nodes[node_id].status in ['idle']:
retdict[queue] += 1
return retdict
drain_node_ids = self._select_nodes_for_draining(job,
node_end_times)
_logger.info('%s: nodes %s selected for draining.', label,
compact_num_list(drain_node_ids))
for job in arg_list:
#TODO: backfill pass goes here
pass
return best_match
def _ALPS_reserve_resources(self, job, new_time, node_id_list):
'''Call ALPS to reserve resrources. Use their allocator. We can change
......@@ -864,17 +908,12 @@ class CraySystem(BaseSystem):
res_info = ALPSBridge.reserve(job['user'], job['jobid'],
int(job['nodes']), job['attrs'], node_id_list)
except ALPSBridge.ALPSError as exc:
_logger.warning('unable to reserve resources from ALPS: %s',
exc.message)
_logger.warning('unable to reserve resources from ALPS: %s', exc.message)
return None
new_alps_res = None
if res_info is not None:
new_alps_res = ALPSReservation(job, res_info, self.nodes)
self.alps_reservations[job['jobid']] = new_alps_res
#place a resource_reservation
if new_alps_res is not None:
self.reserve_resources_until(compact_num_list(new_alps_res.node_ids), new_time,
job['jobid'])
return new_alps_res.node_ids
def _clear_draining_for_queues(self, queue):
......@@ -894,9 +933,27 @@ class CraySystem(BaseSystem):
current_queues = equiv_class['queues']
if current_queues:
for node in self.nodes.values():
for queue in node.queues:
if queue in current_queues:
node.clear_drain
for q in node.queues:
if q in current_queues:
node.clear_drain()
def _select_nodes_for_draining(self, job, end_times):
'''Select nodes to be drainined. Set backfill windows on draining
nodes.
Inputs:
job - dictionary of job information to consider
Return:
List of node ids that have been selected for draining for this job
'''
try:
node_id_list = self._assemble_queue_data(job, idle_only=False)
except ValueError:
_logger.warning('Job %s: requesting locations that are not in queue for that job.', job['jobid'])
return []
@exposed
def reserve_resources_until(self, location, new_time, jobid):
......
......@@ -95,17 +95,13 @@ class TestCraySystem(object):
def test_assemble_queue_data(self):
'''CraySystem._assemble_queue_data: base functionality'''
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 5, 'expected 5 got %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['1','2','3','4','5'], 'expected [1, 2, 3, 4, 5] got %s' % nodelist
def test_assemble_queue_data_bad_queue(self):
'''CraySystem._assemble_queue_data: return nothing if queue for job doesn't exist'''
self.base_job['queue'] = 'foo'
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 0, 'Nonzero nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == [], 'nonempty nodelist'
def test_assemble_queue_data_multiple_queue(self):
......@@ -113,9 +109,7 @@ class TestCraySystem(object):
self.system.nodes['1'].queues = ['foo']
self.system.nodes['4'].queues = ['bar']
self.system._gen_node_to_queue()
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 3, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['2','3','5'], 'Wrong nodelist'
def test_assemble_queue_data_multiple_queue_overlap(self):
......@@ -124,50 +118,50 @@ class TestCraySystem(object):
self.system.nodes['4'].queues = ['default','bar']
self.system.nodes['5'].queues = ['baz']
self.system._gen_node_to_queue()
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 4, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['1','2','3','4'], 'Wrong nodelist'
self.base_job['queue'] = 'foo'
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == ['1'], 'Wrong nodelist'
self.base_job['queue'] = 'bar'
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 2, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['1','4'], 'Wrong nodelist'
self.base_job['queue'] = 'baz'
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == ['5'], 'Wrong nodelist'
def test_assemble_queue_data_non_idle(self):
'''CraySystem._assemble_queue_data: return only non-idle nodes'''
def test_assemble_queue_data_idle(self):
'''CraySystem._assemble_queue_data: return only idle nodes'''
self.system.nodes['1'].status = 'busy'
self.system.nodes['4'].status = 'ADMINDOWN'
self.system._gen_node_to_queue()
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 3, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['2','3','5'], 'Wrong nodelist'
def test_assemble_queue_data_non_down(self):
'''CraySystem._assemble_queue_data: return nodes that are not down'''
self.system.nodes['1'].status = 'busy'
self.system.nodes['2'].status = 'cleanup-pending'
self.system.nodes['3'].status = 'allocated'
self.system.nodes['4'].status = 'ADMINDOWN'
nodelist = self.system._assemble_queue_data(self.base_job,
idle_only=False)
assert sorted(nodelist) == ['1','2','3','5'], 'Wrong nodes in list %s' % nodelist
self.system.nodes['1'].status = 'SUSPECT'
self.system.nodes['2'].status = 'alps-interactive'
nodelist = self.system._assemble_queue_data(self.base_job,
idle_only=False)
assert sorted(nodelist) == ['3','5'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_attrs_location(self):
'''CraySystem._assemble_queue_data: return only attr locaiton loc'''
self.base_job['attrs'] = {'location':'3'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == ['3'], 'Wrong node in list %s' % nodelist
def test_assemble_queue_data_attrs_location_repeats(self):
'''CraySystem._assemble_queue_data: eliminate repeat location entries'''
self.base_job['attrs'] = {'location':'1,1,2,3'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 3, 'Wrong nodecount got %s expected 3' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['1', '2', '3'], 'Wrong node in list %s' % nodelist
@raises(ValueError)
......@@ -175,61 +169,47 @@ class TestCraySystem(object):
'''CraySystem._assemble_queue_data: raise error for location completely outside of
queue'''
self.base_job['attrs'] = {'location':'6'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == ['3'], 'Wrong node in list %s' % nodelist
def test_assemble_queue_data_attrs_location_multi(self):
'''CraySystem._assemble_queue_data: return only attr locaiton complex loc string'''
self.base_job['attrs'] = {'location':'1-3,5'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 4, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['1','2','3','5'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_forbidden_loc(self):
'''CraySystem._assemble_queue_data: avoid reserved nodes'''
self.base_job['forbidden'] = ['1-3','5']
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['4'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_forbidden_loc_attrs_loc(self):
'''CraySystem._assemble_queue_data: avoid reserved nodes despite location being set'''
self.base_job['forbidden'] = ['1-3']
self.base_job['attrs'] = {'location':'1-4'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['4'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_forbidden_loc_attrs_loc_complete(self):
'''CraySystem._assemble_queue_data: avoid reserved nodes block location if superset'''
self.base_job['forbidden'] = ['1-3']
self.base_job['attrs'] = {'location':'1-3'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 0, 'Wrong nodecount %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == [], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_forbidden_loc_attrs_loc_permit(self):
'''CraySystem._assemble_queue_data: forbidden doesn't block everything'''
self.base_job['forbidden'] = ['1-3']
self.base_job['attrs'] = {'location':'4-5'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 2, 'Wrong nodecount %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['4','5'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_reserved_loc(self):
'''CraySystem._assemble_queue_data: return reservation nodes'''
self.base_job['required'] = ['2-4']
self.base_job['queue'] = 'reservation'
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 3, 'Wrong nodecount %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['2','3','4'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_reserved_loc_idle_only(self):
......@@ -240,19 +220,34 @@ class TestCraySystem(object):
self.system.nodes['4'].status = 'ADMINDOWN'
self.base_job['required'] = ['1-5']
self.base_job['queue'] = 'reservation'
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['5'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_reserved_loc_non_down(self):
'''CraySystem._assemble_queue_data: return reservation nodes that are not down'''
self.system.nodes['1'].status = 'busy'
self.system.nodes['2'].status = 'cleanup-pending'
self.system.nodes['3'].status = 'allocated'
self.system.nodes['4'].status = 'ADMINDOWN'
self.base_job['required'] = ['1-5']
self.base_job['queue'] = 'reservation'
nodelist = self.system._assemble_queue_data(self.base_job,
idle_only=False)
assert sorted(nodelist) == ['1','2','3','5'], 'Wrong nodes in list %s' % nodelist
self.system.nodes['1'].status = 'SUSPECT'
self.system.nodes['2'].status = 'alps-interactive'
self.base_job['required'] = ['1-5']
self.base_job['queue'] = 'reservation'
nodelist = self.system._assemble_queue_data(self.base_job,
idle_only=False)
assert sorted(nodelist) == ['3','5'], 'Wrong nodes in list %s' % nodelist
def test_assemble_queue_data_reserved_loc_location_set(self):
'''CraySystem._assemble_queue_data: return reservation nodes for job with location set'''
self.base_job['required'] = ['1-4']
self.base_job['attrs'] = {'location':'1,2,4'}
self.base_job['queue'] = 'reservation'
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 3, 'Wrong nodecount %s' % nodecount
nodelist = self.system._assemble_queue_data(self.base_job)
assert sorted(nodelist) == ['1','2','4'], 'Wrong nodes in list %s' % nodelist
#need testcase with loc targeting down nodes.
......@@ -263,9 +258,7 @@ class TestCraySystem(object):
self.system.nodes['3'].status = 'allocated'
self.system.nodes['4'].status = 'ADMINDOWN'
self.base_job['attrs'] = {'location':'1-5'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 1, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == ['5'], 'Wrong node in list %s' % nodelist
def test_assemble_queue_data_attrs_location_all_blocked_nodes(self):
......@@ -276,25 +269,100 @@ class TestCraySystem(object):
self.system.nodes['3'].status = 'allocated'
self.system.nodes['4'].status = 'ADMINDOWN'
self.base_job['attrs'] = {'location':'1-4'}
nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
self.system._idle_nodes_by_queue())
assert nodecount == 0, 'Wrong nodecount'
nodelist = self.system._assemble_queue_data(self.base_job)
assert nodelist == [], 'Wrong node in list %s' % nodelist
def fake_reserve(self,job, new_time, node_id_list):
if job['nodes'] < len(node_id_list):
return node_id_list[:int(job['nodes'])]
else:
return []
def test_find_queue_equivalence_classes_single(self):
'''CraySystem.find_queue_equivalence_classes: single queue'''
self.system.find_queue_equivalence_classes([], ['default'], [])
self.system.current_equivalence_classes
assert len(self.system.current_equivalence_classes) == 1, 'Have %s equiv classes, should have 1.'
for equiv in self.system.current_equivalence_classes:
assert equiv['queues'] == ['default'], 'mismatch in returned equiv class queues'
def test_find_queue_equivalence_classes_overlap(self):
'''CraySystem.find_queue_equivalence_classes: partial overlapping queues'''