Commit 0fcbb56e authored by Paul Rich's avatar Paul Rich
Browse files

apkill support added

Support for apkill added to kill a user's ALPS instance in interactive jobs.
Kachina testing pending.
parent 6af7cebf
......@@ -794,7 +794,7 @@ class BaseForker (Component):
break
elif exc.errno in [errno.EBADF, errno.EINVAL, errno.EPIPE]:
_logger.error("%s: Error reading stdout from child pipe.",
child.label)
child.label, excinfo=True)
break
elif exc.errno in [errno.ENITR]:
#Try again
......
......@@ -32,7 +32,7 @@ TEMP_RESERVATION_TIME = int(get_config_option('alpssystem',
SAVE_ME_INTERVAL = float(get_config_option('alpsssytem', 'save_me_interval', 10.0))
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem',
'pending_startup_timeout', 1200)) #default 20 minutes to account for boot.
APKILL_CMD = get_config_option('alps', 'apkill_cmd', '/opt/cray/usr/bin/apkill')
class ALPSProcessGroup(ProcessGroup):
'''ALPS-specific PocessGroup modifications.'''
......@@ -44,6 +44,9 @@ class ALPSProcessGroup(ProcessGroup):
#inherit generic getstate and setstate methods from parent
class CraySystem(BaseSystem):
'''Cray/ALPS-specific system component. Behaviors should go here. Direct
ALPS interaction through BASIL/other APIs should go through the ALPSBridge
......@@ -122,6 +125,7 @@ class CraySystem(BaseSystem):
tuple())
_logger.info('UPDATE THREAD STARTED')
self.current_equivalence_classes = []
self.killing_jobs = {}
def __getstate__(self):
'''Save process, alps-reservation information, along with base
......@@ -283,6 +287,7 @@ class CraySystem(BaseSystem):
for jobid in startup_time_to_clear:
del self.pending_starts[jobid]
self.check_killing_aprun()
with self._node_lock:
fetch_time_start = time.time()
try:
......@@ -344,7 +349,10 @@ class CraySystem(BaseSystem):
if (alps_res.jobid not in released_res_jobids and
str(node.node_id) in alps_res.node_ids):
#send only one release per iteration
alps_res.release()
apids = alps_res.release()
if apids is not None:
for apid in apids:
self.signal_aprun(apid)
released_res_jobids.append(alps_res.jobid)
#find hardware status
......@@ -428,7 +436,7 @@ class CraySystem(BaseSystem):
tracked_res = self.alps_reservations.get(new_alps_res.jobid, None)
if tracked_res is not None:
try:
tracked_res.release()
apids = tracked_res.release()
except ALPSBridge.ALPSError:
# backend reservation probably is gone, which is why
# we are here in the first place.
......@@ -444,8 +452,68 @@ class CraySystem(BaseSystem):
new_alps_res.release()
else:
self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
return
def signal_aprun(self, aprun_id, signame='SIGINT'):
    '''Signal an aprun by aprun id (application_id).  Does not block.
    Use check_killing_aprun to determine completion/termination.  Does not
    depend on the host the aprun(s) was launched from.

    Input:
        aprun_id - integer application id number.
        signame - string name of signal to send (default: SIGINT)

    Returns:
        None.  On success, the forker child handle is recorded in
        self.killing_jobs keyed by aprun_id for later reaping by
        check_killing_aprun.

    Raises:
        ValueError - if signame is not a signal apkill accepts.

    Notes:
        Valid signals to apkill are:
        SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGABRT, SIGUSR1, SIGUSR2, SIGURG,
        and SIGWINCH (from apkill(1) man page.) Also allowing SIGKILL.

    '''
    #Expect changes with an API update
    #mark legal signals from docos
    if (signame not in ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM', 'SIGABRT',
        'SIGUSR1', 'SIGUSR2', 'SIGURG', 'SIGWINCH', 'SIGKILL']):
        # BUG FIX: ValueError does not lazy-format a second argument the
        # way logging calls do; interpolate the bad value explicitly so
        # the message actually names the offending signal.
        raise ValueError('%s is not a legal value for signame.' % signame)
    try:
        retval = Cobalt.Proxy.ComponentProxy('system_script_forker').fork(
                [APKILL_CMD, '-%s' % signame, '%d' % int(aprun_id)],
                'aprun_termination', '%s cleanup:'% aprun_id)
        _logger.info("killing backend job: %s", aprun_id)
    except xmlrpclib.Fault:
        _logger.warning("XMLRPC Error while killing backend job: %s, will retry.",
                aprun_id, exc_info=True)
    except Exception:
        # Narrowed from a bare except so SystemExit/KeyboardInterrupt still
        # propagate; any other failure is logged and retried on the next
        # update pass.
        _logger.critical("Unknown Error while killing backend job: %s, will retry.",
                aprun_id, exc_info=True)
    else:
        # Record the forker child id so check_killing_aprun can reap it.
        self.killing_jobs[aprun_id] = retval
        _logger.info("%s", self.killing_jobs)
    return
def check_killing_aprun(self):
    '''Check that apkill commands have completed and clean them from the
    system_script_forker.  Allows for non-blocking cleanup initiation.

    Completed apkill children are removed from self.killing_jobs and their
    forker records cleaned up so they do not accumulate on the forker.
    Connection failures are logged and retried on the next update pass.
    '''
    try:
        system_script_forker = Cobalt.Proxy.ComponentProxy('system_script_forker')
    except Exception:
        # Component may be temporarily down; retry on the next pass rather
        # than crashing the update thread.
        self.logger.critical("Cannot connect to system_script_forker.",
                exc_info=True)
        return
    # Invert the map so a forker child id leads back to its aprun id.
    # (.items() instead of .iteritems() keeps this 2/3-compatible.)
    rev_killing_jobs = dict([(v, k) for (k, v) in self.killing_jobs.items()])
    removed_jobs = []
    current_killing_jobs = system_script_forker.get_children(None,
            list(self.killing_jobs.values()))
    for job in current_killing_jobs:
        if job['complete']:
            del self.killing_jobs[rev_killing_jobs[int(job['id'])]]
            removed_jobs.append(job['id'])
    system_script_forker.cleanup_children(removed_jobs)
    return
@exposed
def find_queue_equivalence_classes(self, reservation_dict,
active_queue_names, passthrough_blocking_res_list=[]):
......@@ -1138,17 +1206,38 @@ class ALPSReservation(object):
A reservation may remain if there are still active claims. When all
claims are gone
Returns a list of apids and child_ids for the system script forker
for any apids that are still cleaning.
'''
if self.dying:
#release already issued. Ignore
return
apids = []
status = ALPSBridge.release(self.alps_res_id)
if int(status['claims']) != 0:
_logger.info('ALPS reservation: %s still has %s claims.',
self.alps_res_id, status['claims'])
# fetch reservation information so that we can send kills to
# interactive apruns.
resinfo = ALPSBridge.fetch_reservations()
apids = _find_non_batch_apids(resinfo['reservations'])
else:
_logger.info('ALPS reservation: %s has no claims left.',
self.alps_res_id)
self.dying = True
return apids
def _find_non_batch_apids(resinfo):
'''Extract apids from non-batch items.'''
apids = []
for alps_res in resinfo:
#wow, this is ugly.
for applications in alps_res['ApplicationArray']:
for application in applications.values():
for app_data in application:
for commands in app_data['CommandArray']:
for command in commands.values():
if command[0]['cmd'] != 'BASIL':
apids.append(app_data['application_id'])
return apids
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment