Commit 1039e1c1 authored by Paul Rich's avatar Paul Rich
Browse files

Fix for re-reservation behavior

Rereservations were broken for long (>5 min) startups.  This should
allow the CAPMC scripts to do their thing.
parent c826bb6b
......@@ -26,7 +26,7 @@ _logger = logging.getLogger(__name__.split('.')[-1])
init_cobalt_config()
BASIL_PATH = get_config_option('alps', 'basil',
'/home/richp/alps-simulator/apbasil.sh')
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 72))
class ALPSScriptChild (PGChild):
def __init__(self, id = None, **kwargs):
......@@ -109,21 +109,30 @@ class ALPSScriptChild (PGChild):
'''
rc = False
success = self._send_confirm()
#success = False
#if a failure, re-reserve. Use child data to reassociate reservation
#with hardware in system componient.
if not success:
if not success :
_logger.warning('Re-reservation required for %s', self.pg.label)
#rereserve
params = {}
params['user_name'] = self.pg.user
params['batch_id'] = self.pg.jobid
params['width'] = self.pg.nodect
params['depth'] = 1 #FIXME fix this. Pass this in from qsub. FIXME
#params['node_id_list'] = self.pg.node_ids
params['width'] = int(self.pg.nodect) * int(DEFAULT_DEPTH)
params['nppn'] = int(DEFAULT_DEPTH) #FIXME fix this. Pass this in from qsub. FIXME
params['node_id_list'] = self.pg.location[0]
param_attrs['depth'] = None
param_attrs['npps'] = None
param_attrs['nspn'] = None
param_attrs['reservation_mode'] = 'EXCLUSIVE'
param_attrs['nppcu'] = None
param_attrs['p-state'] = None
param_attrs['p-govenor'] = None
reserve_request = BasilRequest('RESERVE', params=params)
basil = subprocess.Popen(BASIL_PATH, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = basil.communicate(str(reserve_request))
if basil.returncode != 0:
if basil.returncode == 0:
try:
response = parse_response(stdout)
#won't need the response itself beyond that we were successful.
......@@ -131,6 +140,8 @@ class ALPSScriptChild (PGChild):
_logger.warning('%s: unable to reserve nodes in ALPS: %s',
self.pg.label, self.pg.location)
else:
_logger.warning('%s: New reservation %s created.',
self.pg.label, response['reservation_id'])
self.pg.alps_res_id = response['reservation_id']
success = self._send_confirm()
if success:
......@@ -138,7 +149,8 @@ class ALPSScriptChild (PGChild):
else:
#Can't re-reserve. We're dead at this point. Exit child
#process now.
_logger.error('%s re-resevation failed.', self.pg.label)
_logger.error('%s re-resevation failed.\n%s\n%s', self.pg.label,
stdout, stderr)
else:
_logger.info('%s: ALPS reservation %s confirmed', self.pg.label,
self.pg.alps_res_id)
......@@ -156,8 +168,6 @@ class ALPSScriptChild (PGChild):
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#couldn't confirm, log message and let failure happen
stdout, stderr = basil.communicate(str(confirm_request))
print stdout
print stderr
if basil.returncode == 0:
#if we get a nonzero, that's a failure, fall through to return no
#success
......@@ -168,6 +178,8 @@ class ALPSScriptChild (PGChild):
_logger.warning('%s: unable to confirm ALPS reservation %s',
self.pg.label, self.pg.alps_res_id)
else:
_logger.info('%s: confirmed alps_reservation %s', self.pg.label,
self.pg.alps_res_id)
success = True
else:
_logger.error('%s: Child exited with stderr: %s', self.pg.label,
......
......@@ -29,6 +29,9 @@ UPDATE_THREAD_TIMEOUT = int(get_config_option('alpssystem',
TEMP_RESERVATION_TIME = int(get_config_option('alpssystem',
'temp_reservation_time', 300))
SAVE_ME_INTERVAL = float(get_config_option('alpsssytem', 'save_me_interval', 10.0))
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem',
'pending_startup_timeout', 1200)) #default 20 minutes to account for boot.
class ALPSProcessGroup(ProcessGroup):
'''ALPS-specific PocessGroup modifications.'''
......@@ -88,6 +91,7 @@ class CraySystem(BaseSystem):
self.process_manager = ProcessGroupManager().__setstate__(spec['process_manager'])
#self.process_manager.forkers.append('alps_script_forker')
self.process_manager.update_launchers()
self.pending_start_timeout = 1200 #20 minutes for long reboots.
_logger.info('PROCESS MANAGER INTIALIZED')
#resource management setup
self.nodes = {} #cray node_id: CrayNode
......@@ -100,6 +104,9 @@ class CraySystem(BaseSystem):
node_info = spec.get('node_info', {})
for nid, node in node_info.items():
self.nodes[nid].reset_info(node)
#storage for pending job starts. Allows us to handle slow starts vs
#user deletes
self.pending_starts = {} #jobid: time this should be cleared.
self.nodes_by_queue = {} #queue:[node_ids]
#populate initial state
#state update thread and lock
......@@ -259,6 +266,14 @@ class CraySystem(BaseSystem):
#nodes. If there is no resource reservation and the node is not in
#current alps reservations, the node is ready to schedule again.
now = time.time()
startup_time_to_clear = []
#clear pending starttimes.
for jobid, start_time in self.pending_starts.items():
if int(now) > int(start_time):
startup_time_to_clear.append(jobid)
for jobid in startup_time_to_clear:
del self.pending_starts[jobid]
with self._node_lock:
fetch_time_start = time.time()
try:
......@@ -278,6 +293,7 @@ class CraySystem(BaseSystem):
fetch_time_start = time.time()
#_logger.debug("time in ALPS fetch: %s seconds", (time.time() - fetch_time_start))
start_time = time.time()
self._detect_rereservation(inven_reservations)
# check our reservation objects. If a res object doesn't correspond
# to any backend reservations, this reservation object should be
# dropped
......@@ -294,6 +310,8 @@ class CraySystem(BaseSystem):
if node.status in ['cleanup', 'cleanup-pending']:
node.status = 'idle'
for alps_res in self.alps_reservations.values():
if alps_res.jobid in self.pending_starts.keys():
continue #Get to this only after timeout happens
#find alps_id associated reservation
if int(alps_res.alps_res_id) not in current_alps_res_ids:
for node_id in alps_res.node_ids:
......@@ -352,7 +370,73 @@ class CraySystem(BaseSystem):
#_logger.debug("time in UNS lock: %s seconds", (time.time() - start_time))
return
def _detect_rereservation(self, inven_reservations):
'''Detect and update the ALPS reservation associated with a running job.
We are only concerned with BATCH reservations. Others would be
associated with running jobs, and should not be touched.
'''
def _construct_alps_res():
with self._node_lock:
job_nodes = [node.node_id for node in self.nodes.values()
if node.reserved_jobid == int(alps_res['batch_id'])]
new_resspec = {'reserved_nodes': job_nodes,
'reservation_id': str(alps_res['reservation_id']),
'pagg_id': 0 #unknown. Not used here.
}
new_jobspec = {'jobid': int(alps_res['batch_id']),
'user' : alps_res['user_name']}
return ALPSReservation(new_jobspec, new_resspec, self.nodes)
replaced_reservation = None
for alps_res in inven_reservations:
try:
#This traversal is terrible. May want to hide this in the API
#somewhere
if alps_res['ApplicationArray'][0]['Application'][0]['CommandArray'][0]['Command'][0]['cmd'] != 'BASIL':
# Not a reservation we're in direct control of.
continue
except (KeyError, IndexError):
#not a batch reservation
continue
if str(alps_res['batch_id']) in self.alps_reservations.keys():
# This is a reservation we may already know about
if (int(alps_res['reservation_id']) ==
self.alps_reservations[str(alps_res['batch_id'])].alps_res_id):
# Already know about this one
continue
# we have a re-reservation. If this has a batch id, we need
# to add it to our list of tracked reservations, and inherit
# other reservation details. We can pull the reservation
# information out of reserve_resources_until.
# If this is a BATCH reservation and no hardware has that
# reservation id, then this reservation needs to be released
# Could happen if we have a job starting right at the RRU
# boundary.
new_alps_res = _construct_alps_res()
tracked_res = self.alps_reservations.get(new_alps_res.jobid, None)
if tracked_res is not None:
try:
tracked_res.release()
except ALPSBridge.ALPSError:
# backend reservation probably is gone, which is why
# we are here in the first place.
pass
self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
else:
#this is a basil reservation we don't know about already.
new_alps_res = _construct_alps_res()
if len(new_alps_res.node_ids) == 0:
# This reservation has no resources, so Cobalt's internal
# resource reservation tracking has no record. This needs to
# be removed.
new_alps_res.release()
else:
self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
return
@exposed
def find_queue_equivalence_classes(self, reservation_dict,
active_queue_names, passthrough_blocking_res_list=[]):
......@@ -540,6 +624,7 @@ class CraySystem(BaseSystem):
#TODO: make not first-fit
now = time.time()
resource_until_time = now + TEMP_RESERVATION_TIME
startup_time = now + PENDING_STARTUP_TIMEOUT
with self._node_lock:
# general idle nodecount
idle_nodecount = len([node for node in self.nodes.values() if
......@@ -576,6 +661,7 @@ class CraySystem(BaseSystem):
if job_locs is not None and len(job_locs) == int(job['nodes']):
compact_locs = compact_num_list(job_locs)
#temporary reservation until job actually starts
self.pending_starts[job['jobid']] = startup_time
self.reserve_resources_until(compact_locs,
resource_until_time, job['jobid'])
#set resource reservation, adjust idle count
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment