Commit e510862c authored by Paul Rich's avatar Paul Rich
Browse files

preliminary commit of drainer. Passing initial test.

parent 57711a07
......@@ -842,6 +842,8 @@ class CraySystem(BaseSystem):
pt_blocking_locations may be used later to block out nodes that are
impacted by warmswap operations.
This function doesn't hold the component lock.
'''
now = time.time()
resource_until_time = now + TEMP_RESERVATION_TIME
......@@ -869,10 +871,12 @@ class CraySystem(BaseSystem):
for job in arg_list:
label = '%s/%s' % (job['jobid'], job['user'])
try:
node_id_list = self._assemble_queue_data(job)
node_id_list = self._assemble_queue_data(job,
drain_time=(now + int(job['walltime'])))
available_node_list = self._assemble_queue_data(job, idle_only=False)
except ValueError as exc:
_logger.warning('Job %s: requesting locations that are not in queue for that job.', job['jobid'])
except ValueError:
_logger.warning('Job %s: requesting locations that are not in requested queue.',
job['jobid'])
continue
if int(job['nodes']) > len(available_node_list):
# will happen with reserved jobs.
......@@ -968,21 +972,38 @@ class CraySystem(BaseSystem):
as well as the expected drain time.
'''
print end_times
drain_list = []
candidate_list = []
try:
node_id_list = self._assemble_queue_data(job, idle_only=False)
except ValueError:
_logger.warning('Job %s: requesting locations that are not in queue for that job.', job['jobid'])
else:
with self._node_lock:
if len(node_id_list) >= int(job['nodes']):
#order the node ids by id and drain-time.
node_id_list.sort()
node_id_list.sort(reverse=True,
drain_time = None
candidate_list = [nid for nid in node_id_list
if (self.nodes[str(nid)].status == 'idle' and
not self.nodes[str(nid)].draining)]
for loc_time in end_times:
running_nodes = [str(nid) for nid in expand_num_list(loc_time[0])]
for nid in running_nodes:
self.nodes[str(nid)].set_drain(loc_time[1], job['jobid'])
candidate_list.extend(running_nodes)
if len(candidate_list) >= int(job['nodes']):
# Enough nodes have been found to drain for this job
drain_time = int(loc_time[1])
break
if drain_time is not None:
# order the node ids by id and drain-time. Longest drain
# first
candidate_list.sort()
candidate_list.sort(reverse=True,
key=lambda nid: self.nodes[str(nid)].drain_until)
return []
drain_list = candidate_list[:int(job['nodes'])]
for nid in drain_list:
self.nodes[str(nid)].set_drain(drain_time, job['jobid'])
return drain_list
@exposed
def reserve_resources_until(self, location, new_time, jobid):
......
......@@ -91,7 +91,7 @@ class TestCraySystem(object):
self.system._gen_node_to_queue()
self.base_job = {'jobid':1, 'user':'crusher', 'attrs':{},
'queue':'default', 'nodes': 1,
'queue':'default', 'nodes': 1, 'walltime': 60,
}
def teardown(self):
......@@ -388,6 +388,17 @@ class TestCraySystem(object):
else:
assert node.draining, "drain should not be cleared for node %s" % node.node_id
def test_select_nodes_for_draining_single_job(self):
'''CraySystem._select_nodes_for_draining: drain nodes from a single job'''
end_times = [['1-3', 100]]
self.system.nodes['1'].status = 'busy'
self.system.nodes['2'].status = 'busy'
self.system.nodes['3'].status = 'busy'
self.base_job['nodes'] = 4
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
assert_match(sorted(drain_nodes), ['1', '2', '3', '4'], "Bad Selection.")
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_first_fit(self, *args, **kwargs):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment