Commit 6f7033ed authored by Paul Rich
Browse files

Preliminary checkin of new drain/backfill behavior.

parent e692a86d
......@@ -1107,7 +1107,7 @@ class BGBaseSystem (Component):
requested_location = args['attrs']['location']
if required:
# whittle down the list of required blocks to the ones of the proper size
# this is a lot like the stuff in _build_locations_cache, but unfortunately,
# this is a lot like the stuff in _build_locations_cache, but unfortunately,
# reservation queues aren't assigned like real queues, so that code doesn't find
# these
for p_name in required:
......@@ -1187,33 +1187,34 @@ class BGBaseSystem (Component):
return {jobid: [best_block.name]}
def _find_drain_block(self, job):
def _find_drain_block(self, job, drain_blocks):
geometry = job.get('geometry', None)
# if the user requested a particular block, we only try to drain that one
if job['attrs'].has_key("location"):
#don't drain a dead block
if job['attrs']['location'] not in self.offline_blocks:
if (job['attrs']['location'] not in self.offline_blocks and
set(job['attrs']['location']).isdisjoint(drain_blocks)):
target_name = job['attrs']['location']
return self.cached_blocks.get(target_name, None)
else:
#return this drain location or not at all for this job
return None
drain_block = None
locations = self.possible_locations(job['nodes'], job['queue'])
locations -= drain_blocks # Only you can prevent drain spam.
for p in locations:
if geometry != None and geometry != p.node_geometry:
continue
if not drain_block:
if drain_block is not None:
drain_block = p
else:
if p.backfill_time < drain_block.backfill_time:
drain_block = p
if drain_block:
# don't try to drain for an entire weekend
if drain_block is not None:
# don't try to drain for an entire weekend
hours = (drain_block.backfill_time - time.time()) / 3600.0
if hours > MAX_DRAIN_HOURS:
drain_block = None
......@@ -1231,9 +1232,9 @@ class BGBaseSystem (Component):
break
if self._locations_cache.has_key(q_name):
return self._locations_cache[q_name].get(desired_size, [])
return set(self._locations_cache[q_name].get(desired_size, []))
else:
return []
return set([])
def _build_locations_cache(self):
'''Build three things: a pair of dictionaries keyed by queue names,
......@@ -1295,11 +1296,11 @@ class BGBaseSystem (Component):
now is the time to use as the starting point of this drain window
minimum_not_idle is a minimum delta in seconds to add to now for non-idle non-job blocks
job_end_times is a dict of block_location:job_end_time. All times for this function are in seconds from epoch.
'''
#Initialize block to immediately available or ready 5 min from now.
#the backfill time is in sec from Epoch.
#initialize everything that isn't idle as being ready 5 min from now.
print BACKFILL_MODE
for block in blocks.itervalues():
if block.name in job_end_times.keys():
block.backfill_time = max(now + minimum_not_idle,
......@@ -1309,60 +1310,60 @@ class BGBaseSystem (Component):
elif block.state == 'idle':
block.backfill_time = now
else:
# Not idle, but not job running according to the queue manager.
# Not idle, but not running a job according to the queue manager.
# Usually this is cleanup.
block.backfill_time = now + minimum_not_idle
block.draining = False
for block in blocks.itervalues():
if block.name in job_end_times.keys():
#job actively running
#iterate over current jobs. Blocks with running jobs are set to the job's end time (startime + walltime)
#Iterate over parents and set their time to the backfill window as well.
# only set the parent block's time if it is earlier than the block's time
if BACKFILL_MODE == 'PESSIMISTIC':
if job_end_times[block.name] > block.backfill_time:
block.backfill_time = job_end_times[block.name]
for parent_block in block._parents:
if parent_block.backfill_time < block.backfill_time:
parent_block.backfill_time = block.backfill_time
elif BACKFILL_MODE == 'OPTIMISTIC':
for parent_block in block._parents:
# push times to top-level. These will get pushed
# back down to children later. This keeps times
# reasonable for child-blocks ultimately, and
# should allow for looser backfilling
if (parent_block.backfill_time > block.backfill_time or
block.backfill_time == now):
parent_block.backfill_time = block.backfill_time
#Over all blocks, ignore if the time has not been changed, otherwise push
# the backfill time to children. Do so if the child is either immediately available
# or if the child has a longer backfill time than the block does.
# is this backwards?
for block in blocks.itervalues():
if block.backfill_time == now:
# Block not impacted by a running job
continue
if BACKFILL_MODE == 'PESSIMISTIC':
for child in block._children:
if child.backfill_time == now or child.backfill_time > block.backfill_time:
child.backfill_time = block.backfill_time
elif BACKFILL_MODE == 'OPTIMISTIC':
# blocks should be at least their children's time.
print "tagging children", block.name
for child in block._children:
print child.name, child.backfill_time, block.backfill_time
if child.backfill_time < block.backfill_time:
print "tagging from child", child.name, child.backfill_time
child.backfill_time = block.backfill_time
print "tagging from child", child.name, child.backfill_time
#Go back through, if we're actually running a job on a block, all of its children should have times set to the greater of their current time or the parent block's time
for name in job_end_times.iterkeys():
job_block = blocks[name]
for child in job_block._children:
if child.backfill_time < job_end_times[name]:
child.backfill_time = job_block.backfill_time
# for block in blocks.itervalues():
# if block.name in job_end_times.keys():
# #job actively running
# #iterate over current jobs. Blocks with running jobs are set to the job's end time (startime + walltime)
# #Iterate over parents and set their time to the backfill window as well.
# # only set the parent block's time if it is earlier than the block's time
# if BACKFILL_MODE == 'PESSIMISTIC':
# if job_end_times[block.name] > block.backfill_time:
# block.backfill_time = job_end_times[block.name]
# for parent_block in block._parents:
# if parent_block.backfill_time < block.backfill_time:
# parent_block.backfill_time = block.backfill_time
# elif BACKFILL_MODE == 'OPTIMISTIC':
# for parent_block in block._parents:
# # push times to top-level. These will get pushed
# # back down to children later. This keeps times
# # reasonable for child-blocks ultimately, and
# # should allow for looser backfilling
# if (parent_block.backfill_time > block.backfill_time or
# block.backfill_time == now):
# parent_block.backfill_time = block.backfill_time
#
# #Over all blocks, ignore if the time has not been changed, otherwise push
# # the backfill time to children. Do so if the child is either immediately available
# # or if the child has a longer backfill time than the block does.
# # is this backwards?
# for block in blocks.itervalues():
# if block.backfill_time == now:
# # Block not impacted by a running job
# continue
# if BACKFILL_MODE == 'PESSIMISTIC':
# for child in block._children:
# if child.backfill_time == now or child.backfill_time > block.backfill_time:
# child.backfill_time = block.backfill_time
# elif BACKFILL_MODE == 'OPTIMISTIC':
# # blocks should be at least their children's time.
# print "tagging children", block.name
# for child in block._children:
# print child.name, child.backfill_time, block.backfill_time
# if child.backfill_time < block.backfill_time:
# print "tagging from child", child.name, child.backfill_time
# child.backfill_time = block.backfill_time
# print "tagging from child", child.name, child.backfill_time
# #Go back through, if we're actually running a job on a block, all of its children should have times set to the greater of their current time or the parent block's time
# for name in job_end_times.iterkeys():
# job_block = blocks[name]
# for child in job_block._children:
# if child.backfill_time < job_end_times[name]:
# child.backfill_time = job_block.backfill_time
def find_job_location(self, arg_list, end_times, pt_blocking_locations=[]):
......@@ -1435,11 +1436,13 @@ class BGBaseSystem (Component):
jobs[job['jobid']] = job
block_name = self._find_job_location(job, drain_blocks)
if block_name:
# we can run immediately and need to tell the rest of Cobalt to
# start a job.
best_block_dict.update(block_name)
break
#Keep pending reservations from collapsing the backfill window
location = self._find_drain_block(job)
location = self._find_drain_block(job, drain_blocks)
forbidden_locs = []
if job.has_key('forbidden'):
forbidden_locs = job['forbidden']
......@@ -1465,6 +1468,12 @@ class BGBaseSystem (Component):
for p_name in location.children:
drain_blocks.add(self.cached_blocks[p_name])
self.cached_blocks[p_name].draining = True
if BACKFILL_MODE == 'OPTIMISTIC':
self.cached_blocks[p_name].backfill_time = max([parent.backfill_time for parent in
self.cached_blocks[p_name]._parents])
else:
self.cached_blocks[p_name].backfill_time = max([parent.backfill_time for parent in
self.cached_blocks[p_name]._parents if parent.backfill_time != now])
drain_blocks.add(location)
self.logger.debug("job %s is draining %s state %s backfill ends at %s" % (job['jobid'], location.name,
location.state, \
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment