Commit f06b3995 authored by Paul Rich's avatar Paul Rich
Browse files

Initial testing of new draining/backfilling appears to be working.

Also uncovered another bug while working on this: the old behavior masked it,
but there was a way to get a block to ignore the scheduled flag.
parent 6f7033ed
......@@ -49,7 +49,7 @@ MAX_DRAIN_HOURS = float(get_config_option('bgsystem', 'max_drain_hours', float(s
BACKFILL_MODE = get_config_option('bgsystem', 'backfill_mode', 'OPTIMISTIC').upper()
#you'd think that this would be in the control system database somewhere, but it's not.
#this generates the node locations for N00 in a midplane. So far as I know (and I can
#this generates the node locations for N00 in a midplane. So far as I know (and I can
#be proven wrong here) these are consistent between midplanes.
#Also, when moving between nodes, the B and E dimensions seem to like to reverse together.
......@@ -60,11 +60,11 @@ def generate_base_node_map():
ret_map = [[0,0,0,0,0]]
__transform_subcube(ret_map)
ret_map.append(__transform_dim(('b','d'), ret_map[len(ret_map)-1]))
ret_map.append(__transform_dim(('b','d'), ret_map[len(ret_map)-1]))
__transform_subcube(ret_map)
ret_map.append(__transform_dim(('a','b','c'), ret_map[len(ret_map)-1]))
__transform_subcube(ret_map)
ret_map.append(__transform_dim(('b','d'), ret_map[len(ret_map)-1]))
ret_map.append(__transform_dim(('b','d'), ret_map[len(ret_map)-1]))
__transform_subcube(ret_map)
return ret_map
......@@ -82,20 +82,20 @@ def __transform_subcube(coord_map):
return
def __transform_dim(dims, dim_tuple):
    """Return a copy of dim_tuple with the named dimensions toggled.

    dims -- container of dimension letters ('a' through 'e') to flip;
            membership is tested with ``in``.
    dim_tuple -- sequence of coordinate values laid out in a, b, c, d, e
                 order (indexes 0 through 4).

    Each selected coordinate is XORed with 1 exactly once.  The input
    sequence is left untouched; a new list is returned.
    """
    ret = list(dim_tuple)
    # The position of a letter in 'abcde' is the index of its coordinate.
    for index, dim_name in enumerate('abcde'):
        if dim_name in dims:
            ret[index] ^= 1
    return ret
#these allow us to translate from NXX positions in the midplane
......@@ -770,8 +770,6 @@ class BGBaseSystem (Component):
self.logger.info("%s called add_to_managed_blocks(%r)", user_name, specs)
self.logger.log(1, "managed_blocks: %s", managed_set)
specs = [{'name':spec.get("name")} for spec in specs]
self.logger.debug("%s", specs)
self.logger.debug("%s", block_dict.q_get([{'name':'Q0G-I0-384'},]))
self._blocks_lock.acquire()
try:
blocks = [block for block in block_dict.q_get(specs) if block.name not in managed_set]
......@@ -784,9 +782,6 @@ class BGBaseSystem (Component):
for block in blocks:
self.available_block_geometries.add(block.geometry_string)
self._blocks_lock.release()
self.logger.debug("%s", [block.name for block in blocks])
self.logger.debug("%s", [b.name for b in block_dict.values()])
self.logger.debug("%s", managed_set)
self.update_relatives()
return [block.name for block in blocks]
......@@ -938,9 +933,6 @@ class BGBaseSystem (Component):
# only a child if the node-level resources are a proper subset of its parent block.
b._parents.update([block for block in b._relatives if b.is_parent(block)])
b._children.update([block for block in b._relatives if b.is_child(block)])
#self.logger.debug('Block: %s:\nRelatives: %s', b.name, [block.name for block in b._relatives])
def validate_job(self, spec):
"""validate a job for submission
......@@ -1086,7 +1078,7 @@ class BGBaseSystem (Component):
queue = args['queue']
utility_score = args['utility_score']
walltime = args['walltime']
#walltime_p = args.get('walltime_p', walltime) #*AdjEst*
#walltime_p = args.get('walltime_p', walltime) #*AdjEst*
forbidden = set(args.get("forbidden", []))
pt_forbidden = set(args.get("pt_forbidden", []))
required = args.get("required", [])
......@@ -1127,6 +1119,8 @@ class BGBaseSystem (Component):
for p in available_blocks.copy():
if p.size != desired_size:
available_blocks.remove(p)
elif not backfilling and p.draining:
available_blocks.remove(p)
elif geometry != None and geometry != p.node_geometry:
available_blocks.remove(p)
elif p.name in self._not_functional_set:
......@@ -1139,6 +1133,8 @@ class BGBaseSystem (Component):
skip = False
if geometry != None and geometry != p.node_geometry:
skip = True
if not backfilling and p.draining:
skip = True
if not skip:
for bad_name in forbidden:
if p.name==bad_name or bad_name in p.relatives:
......@@ -1166,7 +1162,8 @@ class BGBaseSystem (Component):
# if 60 * runtime_estimate > (block.backfill_time - now): # *Adj_Est*
# continue
if block.backfill_time is None:
logger.debug("block: %s %s", block.name, block.backfill_time)
if 60*float(walltime) > (block.backfill_time - now):
continue
......@@ -1207,7 +1204,7 @@ class BGBaseSystem (Component):
for p in locations:
if geometry != None and geometry != p.node_geometry:
continue
if drain_block is not None:
if not drain_block:
drain_block = p
else:
if p.backfill_time < drain_block.backfill_time:
......@@ -1256,6 +1253,8 @@ class BGBaseSystem (Component):
not_functional_set = set()
for target_block in self.cached_blocks.itervalues():
usable = True
if not target_block.scheduled:
usable = False
if (target_block.name in self.offline_blocks or
NOT_OFFLINE_RE.search(target_block.state) is None):
usable = False
......@@ -1268,6 +1267,7 @@ class BGBaseSystem (Component):
not_functional_set.add(target_block.name)
break
for queue_name in target_block.queue.split(":"):
if not per_queue.has_key(queue_name):
per_queue[queue_name] = {}
......@@ -1305,8 +1305,6 @@ class BGBaseSystem (Component):
if block.name in job_end_times.keys():
block.backfill_time = max(now + minimum_not_idle,
job_end_times[block.name])
for child in block._children:
child.backfill_time = block.backfill_time
elif block.state == 'idle':
block.backfill_time = now
else:
......@@ -1315,57 +1313,6 @@ class BGBaseSystem (Component):
block.backfill_time = now + minimum_not_idle
block.draining = False
# for block in blocks.itervalues():
# if block.name in job_end_times.keys():
# #job actively running
# #iterate over current jobs. Blocks with running jobs are set to the job's end time (startime + walltime)
# #Iterate over parents and set their time to the backfill window as well.
# # only set the parent block's time if it is earlier than the block's time
# if BACKFILL_MODE == 'PESSIMISTIC':
# if job_end_times[block.name] > block.backfill_time:
# block.backfill_time = job_end_times[block.name]
# for parent_block in block._parents:
# if parent_block.backfill_time < block.backfill_time:
# parent_block.backfill_time = block.backfill_time
# elif BACKFILL_MODE == 'OPTIMISTIC':
# for parent_block in block._parents:
# # push times to top-level. These will get pushed
# # back down to children later. This keeps times
# # reasonable for child-blocks ultimately, and
# # should allow for looser backfilling
# if (parent_block.backfill_time > block.backfill_time or
# block.backfill_time == now):
# parent_block.backfill_time = block.backfill_time
#
# #Over all blocks, ignore if the time has not been changed, otherwise push
# # the backfill time to children. Do so if the child is either immediately available
# # or if the child has a longer backfill time that the block does.
# # is this backwards?
# for block in blocks.itervalues():
# if block.backfill_time == now:
# # Block not impacted by a running job
# continue
# if BACKFILL_MODE == 'PESSIMISTIC':
# for child in block._children:
# if child.backfill_time == now or child.backfill_time > block.backfill_time:
# child.backfill_time = block.backfill_time
# elif BACKFILL_MODE == 'OPTIMISTIC':
# # blocks should be at least their children's time.
# print "tagging children", block.name
# for child in block._children:
# print child.name, child.backfill_time, block.backfill_time
# if child.backfill_time < block.backfill_time:
# print "tagging from child", child.name, child.backfill_time
# child.backfill_time = block.backfill_time
# print "tagging from child", child.name, child.backfill_time
# #Go back through, if we're actually running a job on a block, all of its children should have times set to the greater of their current time or the parent block's time
# for name in job_end_times.iterkeys():
# job_block = blocks[name]
# for child in job_block._children:
# if child.backfill_time < job_end_times[name]:
# child.backfill_time = job_block.backfill_time
def find_job_location(self, arg_list, end_times, pt_blocking_locations=[]):
'''Given a list of job parameters, expected termination times, and
locations blocked by reservation passthrough, find the best available
......@@ -1463,21 +1410,34 @@ class BGBaseSystem (Component):
if location is not None:
if ((location.name not in forbidden_location_blocks)):
for p_name in location.parents:
drain_blocks.add(self.cached_blocks[p_name])
for p_name in location.children:
drain_blocks.add(self.cached_blocks[p_name])
self.cached_blocks[p_name].draining = True
if BACKFILL_MODE == 'OPTIMISTIC':
self.cached_blocks[p_name].backfill_time = max([parent.backfill_time for parent in
self.cached_blocks[p_name]._parents])
# We are now officially draining this block
blocking_time = None
# Get the time we're applying to all relatives of this block
for busy_block_name in [blk for blk in job_end_times.keys() if blk in location.relatives]:
busy_block = self.cached_blocks[busy_block_name]
if blocking_time is None:
blocking_time = busy_block.backfill_time
# minimum non-idle backfill time. If everything was
# idle we'd be running
elif BACKFILL_MODE == 'PESSIMISTIC':
blocking_time = min(blocking_time,
busy_block.backfill_time)
else:
self.cached_blocks[p_name].backfill_time = max([parent.backfill_time for parent in
self.cached_blocks[p_name]._parents if parent.backfill_time != now])
blocking_time = max(blocking_time,
busy_block.backfill_time)
if blocking_time is None:
blocking_time = now + 300
#propagate time to impacted blocks
for p_name in location.relatives:
rel_block = self.cached_blocks[p_name]
drain_blocks.add(rel_block)
rel_block.draining = True
self._mark_child_draining(p_name, blocking_time, now,
drain_blocks)
drain_blocks.add(location)
self.logger.debug("job %s is draining %s state %s backfill ends at %s" % (job['jobid'], location.name,
location.state, \
time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(location.backfill_time))))
self.logger.debug("job %s is draining %s state %s backfill ends at %s",
job['jobid'], location.name, location.state,
time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(location.backfill_time)))
location.draining = True
# the next time through, try to backfill, but only if we couldn't find anything to start
......@@ -1519,6 +1479,17 @@ class BGBaseSystem (Component):
return best_block_dict
find_job_location = locking(exposed(find_job_location))
def _mark_child_draining(self, block_name, backfill_time, now, drain_blocks):
    '''Fold a proposed backfill time into one cached block.

    block_name -- key into self.cached_blocks for the block to update.
    backfill_time -- candidate backfill time being propagated.
    now -- reference time; a block whose backfill_time equals this value
           simply takes the candidate.
    drain_blocks -- accepted for the caller's benefit but not used here.

    When the block already has a backfill time other than now, the later
    of the two times is kept if BACKFILL_MODE is 'PESSIMISTIC'; in any
    other mode the earlier of the two is kept.

    '''
    target = self.cached_blocks[block_name]
    current = target.backfill_time
    if current == now:
        updated = backfill_time
    else:
        choose = max if BACKFILL_MODE == 'PESSIMISTIC' else min
        updated = choose(current, backfill_time)
    target.backfill_time = updated
def _walltimecmp(self, dict1, dict2):
    '''Comparison function ordering job dicts by descending walltime.

    dict1, dict2 -- mappings with a 'walltime' entry that float() accepts.

    Returns -1 when dict1 has the larger walltime, 1 when it has the
    smaller one, and 0 when the two are equal.

    '''
    first = float(dict1['walltime'])
    second = float(dict2['walltime'])
    # Same result as -cmp(first, second), but spelled with rich comparisons
    # because the cmp() builtin only exists in Python 2.
    return (second > first) - (second < first)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment