Commit 7856d38b authored by Paul Rich
Browse files

Draining and backfilling basics operational.

Draining and backfilling are passing basic tests.  Need to add more test
cases to the automated suite and test corner cases around
queues/reservations/locations list.

Also need to add backfill time display to nodelist/nodeadm -l.
parent 6696bfa5
......@@ -35,7 +35,11 @@ PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem',
# Location of the apkill executable; the third argument is the fallback used
# when the option is not configured.
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
PGROUP_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pgroup_startup_timeout', 120.0))
DRAIN_MODE = get_config_option('system', 'drain_mode', 'first-fit')
# Cleanup drain window in seconds.  Coerced to int like the other numeric
# options are coerced to float: a value supplied by the configuration file is
# presumably returned as a string (TODO confirm against get_config_option),
# and this constant is added to an integer epoch timestamp when a drain window
# is placed on nodes that are in cleanup.
CLEANUP_DRAIN_WINDOW = int(get_config_option('system', 'cleanup_drain_window', 300))
# Drain mode names; presumably the accepted values for DRAIN_MODE.
DRAIN_MODES = ['first-fit', 'drain-only', 'backfill']
# Sentinel jobid used so a drain window can be set on cleaning nodes that are
# not allocated to a real job.
CLEANING_ID = -1
class ALPSProcessGroup(ProcessGroup):
'''ALPS-specific ProcessGroup modifications.'''
......@@ -752,7 +756,7 @@ class CraySystem(BaseSystem):
self.nodes[str(node_id)].drain_until < int(drain_time))])
for node_id in set(unavailable_nodes):
node_id_list.remove(node_id)
return node_id_list
return sorted(node_id_list, key=lambda nid: int(nid))
def _select_first_nodes(self, job, node_id_list):
......@@ -773,7 +777,8 @@ class CraySystem(BaseSystem):
ret_nodes = []
with self._node_lock:
if int(job['nodes']) <= len(node_id_list):
ret_nodes = sorted(node_id_list)[:int(job['nodes'])]
node_id_list.sort(key=lambda nid: int(nid))
ret_nodes = node_id_list[:int(job['nodes'])]
return ret_nodes
def _associate_and_run_immediate(self, job, resource_until_time, node_id_list):
......@@ -855,19 +860,26 @@ class CraySystem(BaseSystem):
best_match = {} #jobid: list(locations)
try:
for loc_time in end_times:
loc_spec = loc_time[0]
loc_spec = ",".join(loc_time[0])
end_time = loc_time[1]
for loc in expand_num_list(loc_spec):
self.nodes[str(loc)].set_drain(end_time)
if self.nodes[str(loc)].reserved_jobid is not None:
# if the reserved_jobid is none, this job is already
# going for cleanup. Drain location selection
# handles that.
self.nodes[str(loc)].set_drain(end_time,
self.nodes[str(loc)].reserved_jobid)
except (KeyError, IndexError):
err = "Invalid value for end_times: %s" % end_times
_logger.error(err)
raise ValueError(err)
for job in arg_list:
label = '%s/%s' % (job['jobid'], job['user'])
# walltime is in minutes. We should really fix the storage of
# that --PMR
job_endtime = now + (int(job['walltime']) * 60)
try:
node_id_list = self._assemble_queue_data(job,
drain_time=(now + int(job['walltime'])))
node_id_list = self._assemble_queue_data(job, drain_time=job_endtime)
available_node_list = self._assemble_queue_data(job, idle_only=False)
except ValueError:
_logger.warning('Job %s: requesting locations that are not in requested queue.',
......@@ -875,6 +887,7 @@ class CraySystem(BaseSystem):
continue
if int(job['nodes']) > len(available_node_list):
# will happen with reserved jobs.
_logger.debug('Reserved skip?')
continue
if len(node_id_list) == 0:
# There are definitely insufficient nodes to run this job
......@@ -897,13 +910,9 @@ class CraySystem(BaseSystem):
# drain sufficient nodes for this job to run
drain_node_ids = self._select_nodes_for_draining(job,
end_times)
_logger.info('%s: nodes %s selected for draining.', label,
compact_num_list(drain_node_ids))
if DRAIN_MODE in ['backfill']:
for job in arg_list:
# Backfill is first fit
#TODO: backfill pass goes here
pass
if drain_node_ids != []:
_logger.info('%s: nodes %s selected for draining.', label,
compact_num_list(drain_node_ids))
return best_match
def _ALPS_reserve_resources(self, job, new_time, node_id_list):
......@@ -943,15 +952,17 @@ class CraySystem(BaseSystem):
Note: does not acquire block lock. Must be locked externally.
'''
now = int(time.time())
current_queues = []
for equiv_class in self.current_equivalence_classes:
if queue in equiv_class['queues']:
current_queues = equiv_class['queues']
if current_queues:
for node in self.nodes.values():
for q in node.queues:
if q in current_queues:
node.clear_drain()
with self._node_lock:
for node in self.nodes.values():
for q in node.queues:
if q in current_queues:
node.clear_drain()
def _select_nodes_for_draining(self, job, end_times):
'''Select nodes to be drained. Set backfill windows on draining
......@@ -970,10 +981,11 @@ class CraySystem(BaseSystem):
as well as the expected drain time.
'''
now = int(time.time())
end_times.sort(key=lambda x: int(x[1]))
print end_times
drain_list = []
candidate_list = []
cleanup_statuses = ['cleanup', 'cleanup-pending']
try:
node_id_list = self._assemble_queue_data(job, idle_only=False)
except ValueError:
......@@ -981,11 +993,26 @@ class CraySystem(BaseSystem):
else:
with self._node_lock:
drain_time = None
# Remove the following from the list:
# 1. Idle nodes that are already marked for draining.
# 2. Nodes that are in an in-use status (busy, allocated).
# 3. Nodes marked for cleanup that are not allocated to a real
# jobid. CLEANING_ID is a sentinel jobid value so we can set
# a drain window on cleaning nodes easily. Not sure if this
# is the right thing to do. --PMR
candidate_list = []
candidate_list = [nid for nid in node_id_list
if (self.nodes[str(nid)].status == 'idle' and
not self.nodes[str(nid)].draining)]
if ((self.nodes[str(nid)].status in ['idle'] and
not self.nodes[str(nid)].draining) or
(self.nodes[str(nid)].status in cleanup_statuses)# and
#self.nodes[str(nid)].drain_jobid == CLEANING_ID)
)]
for nid in candidate_list:
if self.nodes[str(nid)].status in cleanup_statuses:
drain_time = now + CLEANUP_DRAIN_WINDOW
for loc_time in end_times:
running_nodes = [str(nid) for nid in expand_num_list(loc_time[0])
running_nodes = [str(nid) for nid in
expand_num_list(",".join(loc_time[0]))
if job['queue'] in self.nodes[str(nid)].queues]
for nid in running_nodes:
self.nodes[str(nid)].set_drain(loc_time[1], job['jobid'])
......@@ -997,7 +1024,7 @@ class CraySystem(BaseSystem):
if drain_time is not None:
# order the node ids by id and drain-time. Longest drain
# first
candidate_list.sort()
candidate_list.sort(key=lambda nid: int(nid))
candidate_list.sort(reverse=True,
key=lambda nid: self.nodes[str(nid)].drain_until)
drain_list = candidate_list[:int(job['nodes'])]
......
......@@ -390,7 +390,7 @@ class TestCraySystem(object):
def test_select_nodes_for_draining_single_job(self):
'''CraySystem._select_nodes_for_draining: drain nodes from a single job'''
end_times = [['1-3', 100]]
end_times = [[['1-3'], 100]]
self.system.nodes['1'].status = 'busy'
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
......@@ -401,7 +401,7 @@ class TestCraySystem(object):
def test_select_nodes_for_draining_prefer_running(self):
'''CraySystem._select_nodes_for_draining: prefer nodes from running job'''
end_times = [['4-5', 100]]
end_times = [[['4-5'], 100]]
self.system.nodes['4'].status = 'busy'
self.system.nodes['5'].status = 'busy'
self.base_job['nodes'] = 4
......@@ -411,7 +411,7 @@ class TestCraySystem(object):
def test_select_nodes_for_draining_only_running(self):
'''CraySystem._select_nodes_for_draining: fit entirely in running job if possible'''
end_times = [['2-5', 100]]
end_times = [[['2-5'], 100]]
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'busy'
......@@ -423,7 +423,7 @@ class TestCraySystem(object):
def test_select_nodes_for_draining_correct_time(self):
'''CraySystem._select_nodes_for_draining: set correct drain times single job'''
end_times = [['5', 100]]
end_times = [[['5'], 100]]
self.system.nodes['5'].status = 'busy'
self.base_job['nodes'] = 5
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
......@@ -436,7 +436,7 @@ class TestCraySystem(object):
def test_select_nodes_for_draining_multiple_running(self):
'''CraySystem._select_nodes_for_draining: choose from shortest job to drain'''
end_times = [['2-3', 100.0], ['4-5', 91.0]]
end_times = [[['2-3'], 100.0], [['4-5'], 91.0]]
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'allocated'
......@@ -453,7 +453,7 @@ class TestCraySystem(object):
def test_select_nodes_for_draining_select_multiple_running(self):
'''CraySystem._select_nodes_for_draining: set time to longest if draining from multiple jobs'''
end_times = [['2-3', 100.0], ['4-5', 91.0]]
end_times = [[['2-3'], 100.0], [['4-5'], 91.0]]
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.system.nodes['4'].status = 'allocated'
......@@ -471,7 +471,7 @@ class TestCraySystem(object):
def test_select_nodes_for_draining_select_queue(self):
'''CraySystem._select_nodes_for_draining: confine to proper queue'''
self.base_job['queue'] = 'bar'
end_times = [['5', 100.0], ['2', 50.0]]
end_times = [[['5'], 100.0], [['2'], 50.0]]
self.system.nodes['1'].queues = ['default']
self.system.nodes['2'].queues = ['default']
self.system.nodes['3'].queues = ['bar']
......@@ -491,6 +491,23 @@ class TestCraySystem(object):
assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
assert_match(self.system.nodes[str(i)].drain_until, 100, "Bad drain time")
def test_select_nodes_for_draining_select_cleaning(self):
    '''CraySystem._select_nodes_for_draining: include cleaning nodes if marked'''
    end_times = []
    # Freeze the clock for the whole scenario.  The test and the method under
    # test each read time.time(); with a live clock a second rollover between
    # the two reads makes the drain_until assertion below fail spuriously.
    with patch.object(time, 'time', return_value=500.000):
        now = int(time.time())
        self.system.nodes['2'].status = 'cleanup'
        self.system.nodes['5'].status = 'cleanup-pending'
        # -1 is the cleaning sentinel jobid; 300 presumably mirrors the default
        # cleanup drain window -- confirm if that default ever changes.
        self.system.nodes['2'].set_drain(now + 300, -1)
        self.system.nodes['5'].set_drain(now + 300, -1)
        self.base_job['nodes'] = 5
        drain_nodes = self.system._select_nodes_for_draining(self.base_job,
                                                             end_times)
    assert_match(sorted(drain_nodes), ['1', '2', '3', '4', '5'], "Bad Selection")
    # Every node, cleaning or idle, must now be draining for the test job.
    for i in range(1, 6):
        assert_match(self.system.nodes[str(i)].draining, True, "Draining not set")
        assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
        assert_match(self.system.nodes[str(i)].drain_until, now + 300, "Bad drain time")
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_first_fit(self, *args, **kwargs):
......@@ -505,3 +522,18 @@ class TestCraySystem(object):
'reserved until expected 800.0, got %s' % self.system.nodes['1'].reserved_until)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_first_fit_prior_job(self, *args, **kwargs):
    '''CraySystem.find_job_location: Assign second job to nodes'''
    # Node 2 is already held by job 2, which is reported as ending an hour
    # after the (frozen) current time.
    occupied = self.system.nodes['2']
    occupied.status = 'allocated'
    occupied.reserved_jobid = 2
    end_times = [[['2'], int(time.time()) + 3600]]

    retval = self.system.find_job_location([self.base_job], end_times, [])

    # The new job must land on node 1 rather than the occupied node.
    assert retval == {1: ['1']}, 'bad loc: expected %s, got %s' % ({1: ['1']}, retval)
    # 800.0 is presumably the frozen time (500.0) plus a 300 second startup
    # window -- confirm against the system's timeout configuration.
    assert self.system.pending_starts[1] == 800.0, (
        'bad pending start: expected %s, got %s' %
        (800.0, self.system.pending_starts[1]))
    chosen = self.system.nodes['1']
    assert chosen.reserved_jobid == 1, 'Node not reserved'
    assert chosen.reserved_until == 800.0, (
        'reserved until expected 800.0, got %s' % self.system.nodes['1'].reserved_until)
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment