Commit 717ec9fd authored by Paul Rich
Browse files

Merge branch '25-fix-reservation-location' into 'master'

Resolve "Reservation-location interaction"

Closes #25 
This resolves a number of issues in _assemble_queue_data involving node ID types and the methods used for reservation avoidance.
It also fixes issues that occur when reservations and --attrs location= are used together.

See merge request !13
parents b1d21737 bad92e87
......@@ -680,40 +680,54 @@ class CraySystem(BaseSystem):
# we also have to forbid a bunch of locations, in this case.
idle_nodecount = 0
unavailable_nodes = []
forbidden = set(self.chain_loc_list(job.get('forbidden', [])))
required = set(self.chain_loc_list(job.get('required', [])))
requested_locations = expand_num_list(job['attrs'].get('location', ''))
forbidden = set([str(loc) for loc in self.chain_loc_list(job.get('forbidden', []))])
required = set([str(loc) for loc in self.chain_loc_list(job.get('required', []))])
requested_locations = set([str(n) for n in expand_num_list(job['attrs'].get('location', ''))])
requested_loc_in_forbidden = False
for loc in requested_locations:
if loc in forbidden:
#don't spam the logs.
requested_loc_in_forbidden = True
break
if not job['queue'] in self.nodes_by_queue.keys():
# Either a new queue with no resources, or a possible
# reservation need to do extra work for a reservation
node_id_list = list(required - forbidden)
for node_id in node_id_list:
if self.nodes[str(node_id)].status in ['idle']:
idle_nodecount += 1
else:
unavailable_nodes.append(node_id)
for node_id in unavailable_nodes:
node_id_list.remove(node_id)
else:
idle_forbidden_count = len([nid for nid in forbidden
if self.nodes[str(nid)].status =='idle'])
idle_nodecount = idle_nodes_by_queue[job['queue']] - idle_forbidden_count
node_id_list = list(set(self.nodes_by_queue[job['queue']]) - forbidden)
if requested_locations != []:
job_set = set([int(nid) for nid in requested_locations])
if job_set <= set([int(node_id) for node_id in
self.nodes_by_queue[job['queue']]]):
node_id_list = requested_locations
if not set(node_id_list).isdisjoint(forbidden):
# this job has requested locations that are a part of an
# active reservation. Remove locaitons and drop available
# nodecount appropriately.
node_id_list = list(set(node_id_list) - forbidden)
idle_nodecount = len(node_id_list)
if requested_locations != set([]): # handle attrs location= requests
job_set = set([str(nid) for nid in requested_locations])
if not job['queue'] in self.nodes_by_queue.keys():
#we're in a reservation and need to further restrict nodes.
if job_set <= set(node_id_list):
# We are in a reservation there are no forbidden nodes.
node_id_list = list(requested_locations)
else:
# We can't run this job. Insufficent resources in this
# reservation to do so. Don't risk blocking anything.
#idle_nodecount = 0
node_id_list = []
else:
idle_nodecount = 0
node_id_list = []
raise ValueError
#normal queues. Restrict to the non-reserved nodes.
if job_set <= set([str(node_id) for node_id in
self.nodes_by_queue[job['queue']]]):
node_id_list = list(requested_locations)
if not set(node_id_list).isdisjoint(forbidden):
# this job has requested locations that are a part of an
# active reservation. Remove locaitons and drop available
# nodecount appropriately.
node_id_list = list(set(node_id_list) - forbidden)
else:
node_id_list = []
if not requested_loc_in_forbidden:
raise ValueError("forbidden locations not in queue")
with self._node_lock:
for node_id in node_id_list: #strip non-idle nodes.
if not self.nodes[str(node_id)].status in ['idle']:
unavailable_nodes.append(node_id)
for node_id in set(unavailable_nodes):
node_id_list.remove(node_id)
idle_nodecount = len(node_id_list)
return (idle_nodecount, node_id_list)
@locking
......
# Test Cray-specific utilities/calls.
from nose.tools import raises
from testsuite.TestCobalt.Utilities.assert_functions import assert_match, assert_not_match
from Cobalt.Components.system.CrayNode import CrayNode
import Cobalt.Exceptions
import time
from Cobalt.Components.system.CraySystem import CraySystem
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
import Cobalt.Components.system.AlpsBridge as AlpsBridge
from mock import MagicMock, Mock, patch
def is_match(a, b):
    '''Identity comparator: true only when a and b are the very same object.'''
    same_object = a is b
    return same_object
class TestCrayNode(object):
    '''Unit tests for CrayNode construction and status handling.'''

    def setup(self):
        '''Create a baseline node spec and a node built from it for each test.'''
        self.spec = {'name':'test', 'state': 'UP', 'node_id': 1, 'role':'batch',
                     'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                    }
        self.base_node = CrayNode(self.spec)

    def teardown(self):
        '''Drop the per-test fixtures.'''
        del self.spec
        del self.base_node

    def test_init(self):
        '''CrayNode init test'''
        spec = {'name':'test', 'state': 'UP', 'node_id': 1, 'role':'batch',
                'architecture': 'XT', 'SocketArray':['foo', 'bar'],
               }
        node = CrayNode(spec)
        assert_match(node.status, 'idle', 'bad status')
        assert_match(node.node_id, 1, 'bad nodeid')
        assert_match(node.role, 'BATCH', 'bad role')
        # is_match makes this an identity check rather than an equality check.
        assert_match(node.attributes['architecture'], 'XT',
                     'bad architecture', is_match)
        assert_match(node.segment_details, ['foo', 'bar'],
                     'bad segment')
        assert_match(node.ALPS_status, 'UNKNOWN',
                     'bad default ALPS status')
        assert 'alps-interactive' in node.RESOURCE_STATUSES, 'alps-interactive not in resource statuses'

    def test_init_alps_states(self):
        '''CrayNode: init alps states'''
        # Expected node status for every ALPS state a node may be created
        # with: only 'UP' yields an idle node, everything else is down.
        correct_alps_states = {'UP': 'idle', 'DOWN':'down', 'UNAVAILABLE':'down',
                               'ROUTING':'down', 'SUSPECT':'down', 'ADMIN':'down',
                               'UNKNOWN':'down', 'UNAVAIL': 'down', 'SWDOWN': 'down',
                               'REBOOTQ':'down', 'ADMINDOWN':'down'}
        for alps_state, expected_status in correct_alps_states.items():
            self.spec['state'] = alps_state
            node = CrayNode(self.spec)
            # Report the ALPS state under test as well as the status that was
            # actually produced, so a failure identifies the offending mapping.
            assert node.status == expected_status, \
                "ALPS state %s should map to %s, got %s" % (
                    alps_state, expected_status, node.status)

    def test_non_cray_statuses(self):
        '''CrayNode: can set cobalt-tracking statuses.'''
        test_statuses = ['busy', 'cleanup-pending', 'allocated',
                         'alps-interactive']
        for status in test_statuses:
            self.base_node.status = status
            assert_match(self.base_node.status, status, "failed validation")
class TestCraySystem(object):
    '''Unit tests for CraySystem._assemble_queue_data.

    Each test runs against a CraySystem holding five nodes ('1' through '5'),
    all created in the 'UP' state and assigned to the 'default' queue, and a
    base job targeting the 'default' queue.
    '''

    @patch.object(AlpsBridge, 'init_bridge')
    @patch.object(CraySystem, '_init_nodes_and_reservations', return_value=None)
    @patch.object(CraySystem, '_run_update_state', return_value=None)
    def setup(self, *args, **kwargs):
        '''Build the system under test with ALPS access patched out.

        The patch decorators pass their mocks in positionally; they are
        accepted via *args and otherwise unused.
        '''
        self.system = CraySystem()
        self.base_spec = {'name':'test', 'state': 'UP', 'node_id': '1', 'role':'batch',
                          'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                          'queues':['default'],
                         }
        for i in range(1,6):
            self.base_spec['name'] = "test%s" % i
            self.base_spec['node_id'] = str(i)
            # Copy the spec so every node gets its own dictionary.
            node_dict=dict(self.base_spec)
            self.system.nodes[str(i)] = CrayNode(node_dict)
            self.system.node_name_to_id[node_dict['name']] = node_dict['node_id']
        self.system._gen_node_to_queue()
        self.base_job = {'jobid':1, 'user':'crusher', 'attrs':{},
                         'queue':'default',
                        }

    def teardown(self):
        '''Drop the per-test fixtures.'''
        del self.system
        del self.base_job

    def _queue_data(self):
        '''Run _assemble_queue_data for the current base job.

        Returns the (nodecount, nodelist) tuple produced by the system using
        a freshly computed idle-nodes-by-queue mapping.
        '''
        return self.system._assemble_queue_data(self.base_job,
                                                self.system._idle_nodes_by_queue())

    def test_assemble_queue_data(self):
        '''CraySystem._assemble_queue_data: base functionality'''
        nodecount, nodelist = self._queue_data()
        assert nodecount == 5, 'expected 5 got %s' % nodecount
        assert sorted(nodelist) == ['1','2','3','4','5'], 'expected [1, 2, 3, 4, 5] got %s' % nodelist

    def test_assemble_queue_data_bad_queue(self):
        '''CraySystem._assemble_queue_data: return nothing if queue for job doesn't exist'''
        self.base_job['queue'] = 'foo'
        nodecount, nodelist = self._queue_data()
        assert nodecount == 0, 'Nonzero nodecount'
        assert nodelist == [], 'nonempty nodelist'

    def test_assemble_queue_data_multiple_queue(self):
        '''CraySystem._assemble_queue_data: return only proper queue nodes'''
        self.system.nodes['1'].queues = ['foo']
        self.system.nodes['4'].queues = ['bar']
        self.system._gen_node_to_queue()
        nodecount, nodelist = self._queue_data()
        assert nodecount == 3, 'Wrong nodecount'
        assert sorted(nodelist) == ['2','3','5'], 'Wrong nodelist'

    def test_assemble_queue_data_multiple_queue_overlap(self):
        '''CraySystem._assemble_queue_data: return only proper queue nodes in overlapping queues'''
        self.system.nodes['1'].queues = ['foo', 'default', 'bar']
        self.system.nodes['4'].queues = ['default','bar']
        self.system.nodes['5'].queues = ['baz']
        self.system._gen_node_to_queue()
        nodecount, nodelist = self._queue_data()
        assert nodecount == 4, 'Wrong nodecount'
        assert sorted(nodelist) == ['1','2','3','4'], 'Wrong nodelist'
        self.base_job['queue'] = 'foo'
        nodecount, nodelist = self._queue_data()
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['1'], 'Wrong nodelist'
        self.base_job['queue'] = 'bar'
        nodecount, nodelist = self._queue_data()
        assert nodecount == 2, 'Wrong nodecount'
        assert sorted(nodelist) == ['1','4'], 'Wrong nodelist'
        self.base_job['queue'] = 'baz'
        nodecount, nodelist = self._queue_data()
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['5'], 'Wrong nodelist'

    def test_assemble_queue_data_non_idle(self):
        '''CraySystem._assemble_queue_data: return only idle nodes'''
        self.system.nodes['1'].status = 'busy'
        self.system.nodes['4'].status = 'ADMINDOWN'
        self.system._gen_node_to_queue()
        nodecount, nodelist = self._queue_data()
        assert nodecount == 3, 'Wrong nodecount'
        assert sorted(nodelist) == ['2','3','5'], 'Wrong nodelist'

    def test_assemble_queue_data_attrs_location(self):
        '''CraySystem._assemble_queue_data: return only attr location loc'''
        self.base_job['attrs'] = {'location':'3'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['3'], 'Wrong node in list %s' % nodelist

    def test_assemble_queue_data_attrs_location_repeats(self):
        '''CraySystem._assemble_queue_data: eliminate repeat location entries'''
        self.base_job['attrs'] = {'location':'1,1,2,3'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 3, 'Wrong nodecount got %s expected 3' % nodecount
        assert sorted(nodelist) == ['1', '2', '3'], 'Wrong node in list %s' % nodelist

    @raises(ValueError)
    def test_assemble_queue_data_attrs_bad_location(self):
        '''CraySystem._assemble_queue_data: raise error for location completely outside of
        queue'''
        self.base_job['attrs'] = {'location':'6'}
        # The call itself must raise; @raises fails the test if it returns.
        self._queue_data()

    def test_assemble_queue_data_attrs_location_multi(self):
        '''CraySystem._assemble_queue_data: return only attr location complex loc string'''
        self.base_job['attrs'] = {'location':'1-3,5'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 4, 'Wrong nodecount'
        assert sorted(nodelist) == ['1','2','3','5'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_forbidden_loc(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes'''
        self.base_job['forbidden'] = ['1-3','5']
        nodecount, nodelist = self._queue_data()
        assert nodecount == 1, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['4'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_forbidden_loc_attrs_loc(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes despite location being set'''
        self.base_job['forbidden'] = ['1-3']
        self.base_job['attrs'] = {'location':'1-4'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 1, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['4'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_forbidden_loc_attrs_loc_complete(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes block location if superset'''
        self.base_job['forbidden'] = ['1-3']
        self.base_job['attrs'] = {'location':'1-3'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 0, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == [], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_forbidden_loc_attrs_loc_permit(self):
        '''CraySystem._assemble_queue_data: forbidden doesn't block everything'''
        self.base_job['forbidden'] = ['1-3']
        self.base_job['attrs'] = {'location':'4-5'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 2, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['4','5'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_reserved_loc(self):
        '''CraySystem._assemble_queue_data: return reservation nodes'''
        self.base_job['required'] = ['2-4']
        self.base_job['queue'] = 'reservation'
        nodecount, nodelist = self._queue_data()
        assert nodecount == 3, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['2','3','4'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_reserved_loc_idle_only(self):
        '''CraySystem._assemble_queue_data: return reservation nodes that are idle'''
        self.system.nodes['1'].status = 'busy'
        self.system.nodes['2'].status = 'cleanup-pending'
        self.system.nodes['3'].status = 'allocated'
        self.system.nodes['4'].status = 'ADMINDOWN'
        self.base_job['required'] = ['1-5']
        self.base_job['queue'] = 'reservation'
        nodecount, nodelist = self._queue_data()
        assert nodecount == 1, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['5'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_reserved_loc_location_set(self):
        '''CraySystem._assemble_queue_data: return reservation nodes for job with location set'''
        self.base_job['required'] = ['1-4']
        self.base_job['attrs'] = {'location':'1,2,4'}
        self.base_job['queue'] = 'reservation'
        nodecount, nodelist = self._queue_data()
        assert nodecount == 3, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['1','2','4'], 'Wrong nodes in list %s' % nodelist

    #need testcase with loc targeting down nodes.

    def test_assemble_queue_data_attrs_location_blocked_nodes(self):
        '''CraySystem._assemble_queue_data: return only idle locations'''
        self.system.nodes['1'].status = 'busy'
        self.system.nodes['2'].status = 'cleanup-pending'
        self.system.nodes['3'].status = 'allocated'
        self.system.nodes['4'].status = 'ADMINDOWN'
        self.base_job['attrs'] = {'location':'1-5'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['5'], 'Wrong node in list %s' % nodelist

    def test_assemble_queue_data_attrs_location_all_blocked_nodes(self):
        '''CraySystem._assemble_queue_data: return no locations if attrs location nodes are
        all non idle'''
        self.system.nodes['1'].status = 'busy'
        self.system.nodes['2'].status = 'cleanup-pending'
        self.system.nodes['3'].status = 'allocated'
        self.system.nodes['4'].status = 'ADMINDOWN'
        self.base_job['attrs'] = {'location':'1-4'}
        nodecount, nodelist = self._queue_data()
        assert nodecount == 0, 'Wrong nodecount'
        assert nodelist == [], 'Wrong node in list %s' % nodelist
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment