Commit 78aeb729 authored by Paul Rich

Still in testing: make cluster systems respect the location attribute on jobs.

parent 05271d31
......@@ -280,6 +280,11 @@ class ClusterBaseSystem (Component):
return ""
unfail_partitions = exposed(unfail_partitions)
def _expand_attrs_location(self, location):
if location is not None and location != '':
return location.split(':')
return []
def _find_job_location(self, job, now, drain_time=0, already_draining=set([])):
'''Get a list of nodes capable of running a job.
......@@ -304,19 +309,32 @@ class ClusterBaseSystem (Component):
'''
forbidden = set(job.get('forbidden', [])) #These are locations the scheduler has decided are inelligible for running.
required = set(job.get('required', [])) #Always consider these nodes for scheduling, due to things being in a reservation
requested_locations = set([]) #Restrict placement to these nodes. The user has requested this restriction
if job.get('attrs', None) is not None:
requested_locations = set([str(n) for n in self._expand_attrs_location(job['attrs'].get('location', ''))])
selected_locations = set()
new_drain_time = 0
ready_to_run = False
nodes = int(job['nodes'])
available_nodes = set()
# required nodes are used for reservations
# requested nodes are requested by the user via --attrs location
# therefore required in this case should become the intersection of required and requested_locations
# this will also play nice with the later overlap check.
if requested_locations:
required = required.intersection(requested_locations)
try:
available_nodes = self.queue_assignments[job['queue']].difference(forbidden).difference(already_draining)
if requested_locations:
available_nodes = available_nodes.intersection(requested_locations)
except KeyError:
#The key error is due to the queue being a reservation queue. Those have no resources assigned in the system
#component. This should be changed in a later version, but for now, we can run straight off the "required"
#nodes list
pass
finally:
# this is a reservation job, so it has its list of valid locations with it. Add the required reserved nodes.
# nothing is forbidden in reservations.
available_nodes.update(set(required))
#TODO: include bit to enable predicted walltimes to be used
......
......@@ -84,8 +84,9 @@ def get_basic_job_dict():
'nodes': 1,
'queue': 'default',
'walltime': 10,
'user': 'testuser'
}
'user': 'testuser',
'attrs':{}
}
class TestClusterSystem(object):
'''Test core cluster system functionality'''
......@@ -95,7 +96,7 @@ class TestClusterSystem(object):
self.full_node_set = set(['vs1.test', 'vs2.test', 'vs3.test', 'vs4.test'])
def setup(self):
    '''Ensure cluster system exists for all tests. Refresh the setup between tests'''
    # A fresh ClusterSystem is assigned on every call, replacing any earlier one.
    self.cluster_system = Cobalt.Components.cluster_system.ClusterSystem()
def teardown(self):
......@@ -260,6 +261,124 @@ class TestClusterSystem(object):
assert best_location == {}, "ERROR: Expected best location: %s\nGot:%s" % \
({}, best_location)
def test__find_job_location_location_specified(self):
    '''A one-node job that requests vs1.test via attrs location gets vs1.test.'''
    system = self.cluster_system
    request = get_basic_job_dict()
    request['nodes'] = 1
    request['attrs']['location'] = "vs1.test"
    timestamp = int(time.time())
    system.init_drain_times([])  # no drain entries: all resources are clear
    placement, drain_time, runnable = system._find_job_location(request, timestamp)
    expected = {'1': ['vs1.test']}
    assert runnable, "ERROR: Job not ready to run on empty system"
    assert drain_time == 0, "Job ready to run, drain time must be 0"
    assert placement != {}, "ERROR: Ready to run, but we have no best location."
    assert placement == expected, ("ERROR: Missing nodes from best location,"
                                   " got: %s" % placement)
def test__find_job_location_attr_wo_location(self):
    '''Attrs without a 'location' key must not restrict a four-node job.'''
    system = self.cluster_system
    request = get_basic_job_dict()
    request['nodes'] = 4
    request['attrs']['foo'] = "bar"
    timestamp = int(time.time())
    system.init_drain_times([])  # no drain entries: all resources are clear
    placement, drain_time, runnable = system._find_job_location(request, timestamp)
    # NOTE(review): the node order is compared exactly -- confirm that
    # _find_job_location guarantees this ordering.
    expected = {'1': ['vs2.test', 'vs4.test', 'vs1.test', 'vs3.test']}
    assert runnable, "ERROR: Job not ready to run on empty system"
    assert drain_time == 0, "Job ready to run, drain time must be 0"
    assert placement != {}, "ERROR: Ready to run, but we have no best location."
    assert placement == expected, ("ERROR: Missing nodes from best location,"
                                   " got: %s" % placement)
def test__find_job_location_no_attrs(self):
    '''A job dict with no 'attrs' key at all is still placed on four nodes.'''
    system = self.cluster_system
    request = get_basic_job_dict()
    request.pop('attrs')  # the key must be absent, not merely empty
    request['nodes'] = 4
    timestamp = int(time.time())
    system.init_drain_times([])  # no drain entries: all resources are clear
    placement, drain_time, runnable = system._find_job_location(request, timestamp)
    # NOTE(review): the node order is compared exactly -- confirm that
    # _find_job_location guarantees this ordering.
    expected = {'1': ['vs2.test', 'vs4.test', 'vs1.test', 'vs3.test']}
    assert runnable, "ERROR: Job not ready to run on empty system"
    assert drain_time == 0, "Job ready to run, drain time must be 0"
    assert placement != {}, "ERROR: Ready to run, but we have no best location."
    assert placement == expected, ("ERROR: Missing nodes from best location,"
                                   " got: %s" % placement)
def test__find_job_location_multi_location_specified(self):
    '''A two-node job requesting vs1.test:vs4.test is placed on those two nodes.'''
    system = self.cluster_system
    request = get_basic_job_dict()
    request['nodes'] = 2
    request['attrs']['location'] = "vs1.test:vs4.test"
    timestamp = int(time.time())
    system.init_drain_times([])  # no drain entries: all resources are clear
    placement, drain_time, runnable = system._find_job_location(request, timestamp)
    expected = {'1': ['vs4.test', 'vs1.test']}
    assert runnable, "ERROR: Job not ready to run on empty system"
    assert drain_time == 0, "Job ready to run, drain time must be 0"
    assert placement != {}, "ERROR: Ready to run, but we have no best location."
    assert placement == expected, ("ERROR: Missing nodes from best location,"
                                   " got: %s" % placement)
def test__find_job_location_loc_and_res(self):
    '''Requested locations combined with a 'required' node list.

    The job requests vs1.test:vs4.test and carries required nodes
    vs1/vs2/vs4; the expected placement is vs4.test and vs1.test.

    '''
    system = self.cluster_system
    request = get_basic_job_dict()
    request['nodes'] = 2
    request['required'] = ['vs1.test', 'vs2.test', 'vs4.test']
    request['attrs']['location'] = "vs1.test:vs4.test"
    timestamp = int(time.time())
    system.init_drain_times([])  # no drain entries: all resources are clear
    placement, drain_time, runnable = system._find_job_location(request, timestamp)
    expected = {'1': ['vs4.test', 'vs1.test']}
    assert runnable, "ERROR: Job not ready to run on empty system"
    assert drain_time == 0, "Job ready to run, drain time must be 0"
    assert placement != {}, "ERROR: Ready to run, but we have no best location."
    assert placement == expected, ("ERROR: Missing nodes from best location,"
                                   " got: %s" % placement)
def test__find_job_location_loc_no_forbidden(self):
    '''A job whose requested locations are all forbidden must not be placed.

    The job requests vs1.test and vs4.test through attrs location while its
    'forbidden' list contains both of them, so the test expects no
    ready-to-run flag, a drain time of 0 and an empty best location.

    '''
    now = int(time.time())
    job = get_basic_job_dict()
    job['attrs']['location'] = "vs1.test:vs4.test"
    job['forbidden'] = ['vs1.test', 'vs2.test', 'vs4.test']
    job['nodes'] = 2
    self.cluster_system.init_drain_times([]) #All resources are clear
    best_location, new_drain_time, ready_to_run = self.cluster_system._find_job_location(job, now)
    # The failure messages state the expectation that is actually checked;
    # the previous ones were copied from the "job runs" tests and said the
    # opposite of what each assertion verifies.
    assert not ready_to_run, "ERROR: No job should be ready to run"
    assert new_drain_time == 0, \
        "ERROR: Job can't run, drain time must be 0, got: %s" % (new_drain_time,)
    assert best_location == {}, \
        "ERROR: Expected no best location, got: %s" % (best_location,)
def test__find_job_location_loc_drain(self):
    '''A three-node request that includes a busy node is drained for, not run.

    vs1.test is marked running with a drain entry ending 600 seconds from
    now; the job requests vs1/vs2/vs3, so it is expected to be not ready,
    with the drain time equal to that end time and all three nodes selected.

    '''
    system = self.cluster_system
    timestamp = int(time.time())
    busy_until = timestamp + 600
    request = get_basic_job_dict()
    request['attrs']['location'] = 'vs1.test:vs2.test:vs3.test'
    request['nodes'] = 3
    system.running_nodes.update(['vs1.test'])
    system.init_drain_times([[['vs1.test'], busy_until]])
    placement, drain_time, runnable = system._find_job_location(request, timestamp)
    expected = {'1': ['vs2.test', 'vs1.test', 'vs3.test']}
    assert not runnable, "ERROR: marked ready to run when running impossible."
    assert drain_time == busy_until, \
        "ERROR: Expected new drain time of %d but got %d." % (busy_until, drain_time)
    assert placement == expected, \
        "ERROR: Unexpected best location selection. Generated %s." % placement
def test__find_job_location_loc_no_drain_if_down(self):
    '''No draining is set up when one of the requested nodes is down.

    The job requests vs1/vs2/vs3; vs2.test is marked down and vs1.test is
    running with a drain entry.  The expected result is not ready, a drain
    time of 0 and an empty best location.

    '''
    system = self.cluster_system
    timestamp = int(time.time())
    busy_until = timestamp + 600
    request = get_basic_job_dict()
    request['attrs']['location'] = 'vs1.test:vs2.test:vs3.test'
    request['nodes'] = 3
    system.nodes_down(['vs2.test'])
    system.running_nodes.update(['vs1.test'])
    system.init_drain_times([[['vs1.test'], busy_until]])
    placement, drain_time, runnable = system._find_job_location(request, timestamp)
    assert not runnable, "ERROR: marked ready to run when running impossible."
    assert drain_time == 0, "Job can't run, drain time must be 0"
    assert placement == {}, \
        "ERROR: Unexpected best location selection. Generated %s." % placement
#Test find_job_location. Take list of jobs and end times, and find a valid location.
def test_find_job_locaiton_single_valid_job(self):
jobs = [get_basic_job_dict()]
......@@ -267,7 +386,7 @@ class TestClusterSystem(object):
assert best_location == {'1':['vs2.test']}, "ERROR: Unexpected best_location.\nExpected %s\nGot %s" % \
({'1':['vs2.test']}, best_location)
def test_find_job_locaiton_no_valid_job(self):
def test_find_job_location_no_valid_job(self):
jobs = [get_basic_job_dict()]
jobs[0]['walltime'] = 300
self.cluster_system.running_nodes = self.full_node_set
......@@ -722,4 +841,27 @@ class TestReservationHandling(object):
assert best_location == {'2':['vs4.test']}, "ERROR: Unexpected best_location.\nExpected %s\nGot %s" % \
({'2':['vs4.test']}, best_location)
def test_reservations_run_spec_location_occupied(self):
    '''Jobs pinned to a node that is already running get no location.

    All three jobs carry the same 'required' node list and request vs3.test
    through attrs location.  vs3.test is among the running nodes, so
    find_job_location is expected to return an empty dict.

    '''
    reserved_nodes = ['vs2.test', 'vs3.test', 'vs4.test']
    # Per-job changes applied on top of the basic job dict; the first job
    # keeps the basic jobid and is the only one that overrides 'nodes'.
    per_job_changes = [
        {'walltime': 720, 'score': 50000, 'nodes': 2},
        {'jobid': 2, 'walltime': 10, 'score': 100},
        {'jobid': 3, 'walltime': 5, 'score': 10},
    ]
    jobs = []
    for changes in per_job_changes:
        spec = get_basic_job_dict()
        spec.update(changes)
        spec['required'] = list(reserved_nodes)  # every job gets its own list
        spec['attrs']['location'] = 'vs3.test'
        jobs.append(spec)
    self.cluster_system.running_nodes = set(['vs1.test', 'vs2.test', 'vs3.test'])
    best_location = self.cluster_system.find_job_location(jobs, [])
    assert best_location == {}, \
        "ERROR: Unexpected best_location.\nExpected %s\nGot %s" % ({}, best_location)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment