Commit 04c9ed87 authored by Paul Rich

WIP: testing out locking changes for job startup race condition avoidance.

parent 3fccd335
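For context, the change under test serializes job startup against the periodic status sweep with one shared lock. A minimal sketch of the pattern, assuming a re-entrant process_groups_lock as in the diff below; the class and method bodies are illustrative only:

import threading

class ProcessGroupTable(object):
    '''Illustrative stand-in for ProcessGroupManager's shared state.'''
    def __init__(self):
        # A single re-entrant lock guards every reader and writer of the
        # process-group table, so startup and status sweeps cannot interleave.
        self.process_groups_lock = threading.RLock()
        self.process_groups = {}

    def start_groups(self, pgids):
        with self.process_groups_lock:
            # Launch children; a concurrent status sweep blocks until done.
            return [pgid for pgid in pgids if pgid in self.process_groups]

    def update_groups(self):
        with self.process_groups_lock:
            # Sweep for exit statuses; startup cannot run mid-sweep.
            return list(self.process_groups.values())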
@@ -1244,57 +1244,62 @@ class CraySystem(BaseSystem):
Invokes a forker component to run a user script. In the event of a
fatal startup error, will release resource reservations.
Note:
Process Group startup and initialization holds the process group data lock.
'''
start_apg_timer = int(time.time())
for spec in specs:
spec['forker'] = None
alps_res = self.alps_reservations.get(str(spec['jobid']), None)
if alps_res is not None:
spec['alps_res_id'] = alps_res.alps_res_id
new_pgroups = self.process_manager.init_groups(specs)
for pgroup in new_pgroups:
_logger.info('%s: process group %s created to track job status',
pgroup.label, pgroup.id)
#check resource reservation, and attempt to start. If there's a
#failure here, set exit status in process group to a sentinel value.
try:
started = self.process_manager.start_groups([pgroup.id])
except ComponentLookupError:
_logger.error("%s: failed to contact the %s component",
pgroup.label, pgroup.forker)
#this should be reraised and the queue-manager left to handle it;
#that would allow re-requesting the run instead of killing the
#job --PMR
except xmlrpclib.Fault:
_logger.error("%s: a fault occurred while attempting to start "
"the process group using the %s component",
pgroup.label, pgroup.forker)
pgroup.exit_status = 255
self.reserve_resources_until(pgroup.location, None,
pgroup.jobid)
except Exception:
_logger.error("%s: an unexpected exception occurred while "
"attempting to start the process group using the %s "
"component; releasing resources", pgroup.label,
pgroup.forker, exc_info=True)
pgroup.exit_status = 255
self.reserve_resources_until(pgroup.location, None,
pgroup.jobid)
else:
if started is not None and started != []:
_logger.info('%s: Process Group %s started successfully.',
pgroup.label, pgroup.id)
else:
_logger.error('%s: Process Group startup failed. Aborting.',
pgroup.label)
end_apg_timer = int(time.time())
self.logger.debug("add_process_groups startup time: %s sec",
(end_apg_timer - start_apg_timer))
start_apg_timer = time.time()
with self.process_manager.process_groups_lock:
for spec in specs:
spec['forker'] = None
alps_res = self.alps_reservations.get(str(spec['jobid']), None)
if alps_res is not None:
spec['alps_res_id'] = alps_res.alps_res_id
new_pgroups = self.process_manager.init_groups(specs)
_logger.debug('init groups')
for pgroup in new_pgroups:
_logger.info('%s: process group %s created to track job status',
pgroup.label, pgroup.id)
#check resource reservation, and attempt to start. If there's a
#failure here, set exit status in process group to a sentinel value.
try:
_logger.debug('starting groups')
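#WIP instrumentation: this deliberate pause presumably widens the
#startup/update race window under test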
time.sleep(30)
started = self.process_manager.start_groups([pgroup.id])
except ComponentLookupError:
_logger.error("%s: failed to contact the %s component",
pgroup.label, pgroup.forker)
#this should be reraised and the queue-manager left to handle it;
#that would allow re-requesting the run instead of killing the
#job --PMR
except xmlrpclib.Fault:
_logger.error("%s: a fault occurred while attempting to start "
"the process group using the %s component",
pgroup.label, pgroup.forker)
pgroup.exit_status = 255
self.reserve_resources_until(pgroup.location, None,
pgroup.jobid)
except Exception:
_logger.error("%s: an unexpected exception occurred while "
"attempting to start the process group using the %s "
"component; releasing resources", pgroup.label,
pgroup.forker, exc_info=True)
pgroup.exit_status = 255
self.reserve_resources_until(pgroup.location, None,
pgroup.jobid)
else:
if started is not None and started != []:
_logger.info('%s: Process Group %s started successfully.',
pgroup.label, pgroup.id)
else:
_logger.error('%s: Process Group startup failed. Aborting.',
pgroup.label)
pgroup.exit_status = 255
self.reserve_resources_until(pgroup.location, None,
pgroup.jobid)
end_apg_timer = time.time()
self.logger.debug("add_process_groups startup time: %s sec", (end_apg_timer - start_apg_timer))
return new_pgroups
@exposed
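The failure paths above share one contract: a fatal startup error stamps the process group with the 255 sentinel and releases its resource reservation immediately, and the --PMR note suggests ComponentLookupError may eventually be re-raised so the queue manager can re-request the run instead of killing the job. A minimal sketch of that contract; the helper name is hypothetical, while exit_status, reserve_resources_until, and the sentinel come from the diff:

def fail_startup(system, pgroup):
    '''Hypothetical helper: mark a group as never started and free its nodes.'''
    pgroup.exit_status = 255  # sentinel: startup failed, no real exit status
    # A None end time releases the reservation right away.
    system.reserve_resources_until(pgroup.location, None, pgroup.jobid)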
@@ -90,16 +90,17 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
# modify the forker in specs to force the job to round-robin forkers
for spec in specs:
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) < 1:
raise RuntimeError("No forkers registered!")
else:
spec['forker'] = ordered_forkers[0] #the least-loaded forker's name
self.forker_taskcounts[spec['forker']] += 1
_logger.info("Job %s using forker %s", spec['jobid'], spec['forker'])
return self.process_groups.q_add(specs)
with self.process_groups_lock:
for spec in specs:
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) < 1:
raise RuntimeError("No forkers registered!")
else:
spec['forker'] = ordered_forkers[0] #the least-loaded forker's name
self.forker_taskcounts[spec['forker']] += 1
_logger.info("Job %s using forker %s", spec['jobid'], spec['forker'])
return self.process_groups.q_add(specs)
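Despite the round-robin wording, the selection above is least-loaded-first: forkers are sorted by running task count and the least busy one wins. A self-contained sketch of the same selection; the forker names and counts are made up:

forker_taskcounts = {'script_forker_0': 3,
                     'script_forker_1': 1,
                     'script_forker_2': 2}

# Sort (name, count) pairs by count; the least busy forker sorts first.
ordered_forkers = [name for name, count in
                   sorted(forker_taskcounts.items(), key=lambda kv: kv[1])]
if len(ordered_forkers) < 1:
    raise RuntimeError("No forkers registered!")
forker = ordered_forkers[0]      # 'script_forker_1'
forker_taskcounts[forker] += 1   # charge the new task to that forker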
def signal_groups(self, pgids, signame="SIGINT"):
'''Send signal with signame to a list of process groups.
@@ -133,16 +134,17 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Start process groups. Return the groups that started successfully.
'''
started = []
for pg_id in pgids:
try:
self.process_groups[pg_id].start()
except ProcessGroupStartupError:
_logger.error("%s: Unable to start process group.",
self.process_groups[pg_id].label)
else:
started.append(pg_id)
self.process_groups[pg_id].startup_timeout = 0
with self.process_groups_lock:
started = []
for pg_id in pgids:
try:
self.process_groups[pg_id].start()
except ProcessGroupStartupError:
_logger.error("%s: Unable to start process group.",
self.process_groups[pg_id].label)
else:
started.append(pg_id)
self.process_groups[pg_id].startup_timeout = 0
return started
#make automatic get final status of process group
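start_groups zeroes startup_timeout once a group is running, and update_groups (next hunk) skips any group whose deadline is still in the future, so a child that is mid-launch is never mistaken for an orphan. A small sketch of that guard, assuming startup_timeout is an absolute epoch deadline as the comparisons in the diff imply:

import time

def past_startup_grace(pg, now=None):
    '''True once a group's startup grace period has expired (sketch).
    A startup_timeout of 0 means startup already completed.'''
    if now is None:
        now = int(time.time())
    return now >= pg.startup_timeout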
@@ -157,76 +159,80 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
completed = {}
orphaned = []
completed_pgs = []
now = int(time.time())
for forker in self.forkers:
completed[forker] = []
try:
child_data = ComponentProxy(forker).get_children("process group", None)
except ComponentLookupError:
_logger.error("failed to contact the %s component to obtain a list of children", forker)
except Exception:
_logger.error("unexpected exception while getting a list of children from the %s component",
forker, exc_info=True)
else:
for child in child_data:
children[(forker, child['id'])] = child
#clean up orphaned process groups
for pg in self.process_groups.values():
if now < pg.startup_timeout:
#wait for startup timeout. We don't want any hasty kills
continue
pg_id = pg.id
child_uid = (pg.forker, pg.head_pid)
if child_uid not in children:
if pg.mode == 'interactive':
#interactive job, there is no child job
if pg.interactive_complete:
completed_pgs.append(pg)
#not really orphaned, but this causes the proper cleanup
#to occur
orphaned.append(pg_id)
# Hold the lock for the whole update. Process group addition also results
# in a forker call, so we need to lock that too, giving us a consistent view.
with self.process_groups_lock:
_logger.debug('updating groups')
now = int(time.time())
for forker in self.forkers:
completed[forker] = []
try:
child_data = ComponentProxy(forker).get_children("process group", None)
except ComponentLookupError:
_logger.error("failed to contact the %s component to obtain a list of children", forker)
except Exception:
_logger.error("unexpected exception while getting a list of children from the %s component",
forker, exc_info=True)
else:
for child in child_data:
children[(forker, child['id'])] = child
#clean up orphaned process groups
for pg in self.process_groups.values():
if now < pg.startup_timeout:
#wait for startup timeout. We don't want any hasty kills
continue
orphaned.append(pg_id)
_logger.warning('%s: orphaned job exited with unknown status', pg.jobid)
pg.exit_status = 1234567
completed_pgs.append(pg)
else:
children[child_uid]['found'] = True
pg.update_data(children[child_uid])
if pg.exit_status is not None:
_logger.info('%s: job exited with status %s', pg.jobid,
pg.exit_status)
completed[pg.forker].append(children[child_uid]['id'])
pg_id = pg.id
child_uid = (pg.forker, pg.head_pid)
if child_uid not in children:
if pg.mode == 'interactive':
#interactive job, there is no child job
if pg.interactive_complete:
completed_pgs.append(pg)
#not really orphaned, but this causes the proper cleanup
#to occur
orphaned.append(pg_id)
continue
orphaned.append(pg_id)
_logger.warning('%s: orphaned job exited with unknown status', pg.jobid)
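#sentinel exit status: the child is gone and its real status is unknown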
pg.exit_status = 1234567
completed_pgs.append(pg)
#check for children without process groups and clean
for forker, child_id in children.keys():
if not children[(forker, child_id)].has_key('found'):
completed[forker].append(child_id)
#clear completed
for forker in completed:
if completed[forker]:
try:
ComponentProxy(forker).cleanup_children(completed[forker])
except ComponentLookupError:
_logger.error("failed to contact the %s component to cleanup children",
forker)
except Exception:
_logger.error("unexpected exception while requesting that the %s component perform cleanup",
forker, exc_info=True)
#Send any needed SIGKILLs for children that have been sent a SIGINT.
for pg in self.process_groups.values():
if (pg.sigkill_timeout is not None and
now >= pg.sigkill_timeout and
pg.exit_status is None):
pg.signal('SIGKILL')
# clear out the orphaned groups. There is no backend data for these
# groups; CQM shouldn't get anything back for them beyond the fact that
# tracking has been lost.
self.cleanup_groups(orphaned)
#return the exited process groups so we can invoke cleanup
else:
children[child_uid]['found'] = True
pg.update_data(children[child_uid])
if pg.exit_status is not None:
_logger.info('%s: job exited with status %s', pg.jobid,
pg.exit_status)
completed[pg.forker].append(children[child_uid]['id'])
completed_pgs.append(pg)
#check for children without process groups and clean
for forker, child_id in children.keys():
if not children[(forker, child_id)].has_key('found'):
completed[forker].append(child_id)
#clear completed
for forker in completed:
if completed[forker]:
try:
ComponentProxy(forker).cleanup_children(completed[forker])
except ComponentLookupError:
_logger.error("failed to contact the %s component to cleanup children",
forker)
except Exception:
_logger.error("unexpected exception while requesting that the %s component perform cleanup",
forker, exc_info=True)
#Send any needed SIGKILLs for children that have been sent a SIGINT.
for pg in self.process_groups.values():
if (pg.sigkill_timeout is not None and
now >= pg.sigkill_timeout and
pg.exit_status is None):
pg.signal('SIGKILL')
# clear out the orphaned groups. There is no backend data for these
# groups; CQM shouldn't get anything back for them beyond the fact that
# tracking has been lost.
self.cleanup_groups(orphaned)
#return the exited process groups so we can invoke cleanup
return completed_pgs
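The escalation loop near the end of update_groups gives a child a grace window after SIGINT and only then falls back to SIGKILL. The same policy restated as a standalone sketch; sigkill_timeout, exit_status, and signal follow the diff, the rest is illustrative:

import time

def escalate_signals(process_groups):
    '''Send SIGKILL to any group that ignored an earlier SIGINT (sketch).'''
    now = int(time.time())
    for pg in process_groups.values():
        if (pg.sigkill_timeout is not None
                and now >= pg.sigkill_timeout
                and pg.exit_status is None):
            # Grace period expired and the child is still alive: escalate.
            pg.signal('SIGKILL')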
@@ -234,13 +240,14 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Clean up process group data from completed and logged process groups.
'''
cleaned_groups = []
for pg_id in pgids:
pg = self.process_groups[pg_id]
cleaned_groups.append(pg)
self.forker_taskcounts[pg.forker] -= 1
del self.process_groups[pg_id]
_logger.info('%s Process Group deleted', pg.label)
with self.process_groups_lock:
cleaned_groups = []
for pg_id in pgids:
pg = self.process_groups[pg_id]
cleaned_groups.append(pg)
self.forker_taskcounts[pg.forker] -= 1
del self.process_groups[pg_id]
_logger.info('%s Process Group deleted', pg.label)
return cleaned_groups
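cleanup_groups is the bookkeeping counterpart of init_groups: under the same lock it returns each group's slot to its forker's task count and drops the group from tracking. A usage sketch, with manager standing in for a ProcessGroupManager instance and orphaned for the ids collected by update_groups:

cleaned = manager.cleanup_groups(orphaned)
for pg in cleaned:
    _logger.debug('%s retired; forker %s now tracks %d tasks',
                  pg.label, pg.forker, manager.forker_taskcounts[pg.forker])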
def select_ssh_host(self):