Commit 6af7cebf authored by Paul Rich's avatar Paul Rich
Browse files

Interactive cleanup now working.

Resources for interactive jobs are now appropriately released.  There is
still a known issue with currently running aprun instances.  That will
be addressed in a further patch.
parent 086f7b6b
......@@ -369,13 +369,13 @@ def parse_options(parser, spec, opts, opt2spec, def_spec):
opts['disable_preboot'] = not spec['script_preboot']
return opt_count
def fetch_pgid(user, jobid, loc):
def fetch_pgid(user, jobid, loc, pgid=None):
'''fetch and set pgid for user shell. Needed for cray systems'''
if client_utils.component_call(SYSMGR, False, 'get_implementation', ()) == 'alpssystem':
pgid = os.getpgid(0)
if client_utils.component_call(SYSMGR, False, 'get_implementation', ()) == 'alps_system':
#Cray is apparently using the session id for interactive jobs.
spec = [{'user': user, 'jobid': jobid, 'pgid': pgid, 'location':loc}]
if not client_utils.component_call(SYSMGR, False, 'confirm_alps_reservation', (spec)):
logger.error('Unable to confirm ALPS reservation. Exiting.')
client_utils.logger.error('Unable to confirm ALPS reservation. Exiting.')
sys.exit(1)
return
......@@ -387,8 +387,9 @@ def exec_user_shell(user, jobid, loc):
Wait until termination.
'''
pgid = os.getppid()
proc = subprocess.Popen([os.environ['SHELL']], shell=True,
preexec_fn=(lambda: fetch_pgid(user, jobid, loc)))
preexec_fn=(lambda: fetch_pgid(user, jobid, loc, pgid=pgid)))
os.waitpid(proc.pid, 0)
def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
......@@ -399,7 +400,6 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
# save whether we are running on a cluster system
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
exit_on_interrupt()
deljob = True if impl == "cluster_system" else False
def start_session(loc, resid, nodes, procs):
......@@ -419,7 +419,7 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
client_utils.logger.info("Opening interactive session to %s", loc)
if deljob:
os.system("/usr/bin/ssh -o \"SendEnv COBALT_NODEFILE COBALT_JOBID\" %s" % (loc))
if impl == 'alpssystem':
if impl == 'alps_system':
exec_user_shell(user, jobid, loc)
else:
os.system(os.environ['SHELL'])
......
......@@ -117,7 +117,7 @@ def confirm(alps_res_id, pg_id):
None Yet.
'''
params = {'pagg_id': pg_id,
'reservation': alps_res_id}
'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM',
params=params)))
return retval
......
......@@ -14,6 +14,7 @@ from Cobalt.Components.system.base_system import BaseSystem
from Cobalt.Components.system.CrayNode import CrayNode
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Exceptions import JobNotInteractive
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
......@@ -39,6 +40,9 @@ class ALPSProcessGroup(ProcessGroup):
def __init__(self, spec):
    """Process group for ALPS-launched jobs.

    Extends the generic ProcessGroup with the ALPS reservation id the
    group runs under and a flag used to tear down interactive sessions.

    Args:
        spec: construction dictionary; must contain 'alps_res_id' in
            addition to the keys the parent ProcessGroup expects.
    """
    super(ALPSProcessGroup, self).__init__(spec)
    # ALPS reservation identifier for this process group, taken
    # verbatim from the construction spec.
    self.alps_res_id = spec['alps_res_id']
    # Set True when an interactive session is reported finished
    # (see interactive_job_complete); cleanup code treats a flagged
    # interactive group as completed even though no forked child exists.
    self.interactive_complete = False
#inherit generic getstate and setstate methods from parent
class CraySystem(BaseSystem):
'''Cray/ALPS-specific system component. Behaviors should go here. Direct
......@@ -86,9 +90,11 @@ class CraySystem(BaseSystem):
_logger.info('BRIDGE INITIALIZED')
#process manager setup
if spec is None:
self.process_manager = ProcessGroupManager()
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
else:
self.process_manager = ProcessGroupManager().__setstate__(spec['process_manager'])
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
self.logger.info('pg type %s',
self.process_manager.process_groups.item_cls)
#self.process_manager.forkers.append('alps_script_forker')
self.process_manager.update_launchers()
self.pending_start_timeout = 1200 #20 minutes for long reboots.
......@@ -155,8 +161,11 @@ class CraySystem(BaseSystem):
# much more compact representation of data. Reservednodes gives
# which nodes are reserved.
inventory = ALPSBridge.fetch_inventory()
_logger.info('INVENTORY FETCHED')
system = ALPSBridge.extract_system_node_data(ALPSBridge.system())
_logger.info('SYSTEM DATA FETCHED')
reservations = ALPSBridge.fetch_reservations()
_logger.info('ALPS RESERVATION DATA FETCHED')
# reserved_nodes = ALPSBridge.reserved_nodes()
except Exception:
#don't crash out here. That may trash a statefile.
......@@ -1004,26 +1013,39 @@ class CraySystem(BaseSystem):
'''
try:
pg = self.process_manager.process_groups[int(specs['jobid'])]
pg_id = int(specs['pg_id'])
pg = None
for pgroup in self.process_manager.process_groups.values():
if pgroup.jobid == int(specs['jobid']):
pg = pgroup
#pg = self.process_manager.process_groups[int(specs['pg_id'])]
pg_id = int(specs['pgid'])
except KeyError:
return False
raise
if pg is None:
raise ValueError('invalid jobid specified')
# Try to find the alps_res_id for this job. if we don't have it, then we
# need to reacquire the resource reservation. The job locations will be
# need to reacquire the resource reservation. The job locations will be
# critical for making this work.
with self._node_lock:
# do not want to hit this during an update.
alps_res = self.alps_reservations.get(pg.jobid, None)
alps_res = self.alps_reservations.get(str(pg.jobid), None)
# find nodes for jobid. If we don't have sufficient nodes, job
# should die
job_nodes = [node for node in self.nodes if node.reserved_jobid == pg.jobid]
if len(job_nodes) == 0:
job_nodes = [node for node in self.nodes.values()
if node.reserved_jobid == pg.jobid]
nodecount = len(job_nodes)
if nodecount == 0:
_logger.warning('%s: No nodes reserved for job.', pg.label)
return False
new_time = job_nodes[0].reserved_until
node_list = compact_num_list([node.node_id for node in job_nodes])
if alps_res is None:
self._ALPS_reserve_resources(pg.jobid, new_time, node_list)
job_info = {'user': specs['user'],
'jobid':specs['jobid'],
'nodes': nodecount,
'attrs': {},
}
self._ALPS_reserve_resources(job_info, new_time, node_list)
alps_res = self.alps_reservations.get(pg.jobid, None)
if alps_res is None:
_logger.warning('%s: Unable to re-reserve ALPS resources.',
......@@ -1032,9 +1054,28 @@ class CraySystem(BaseSystem):
# try to confirm, if we fail at confirmation, try to reserve same
# resource set again
ALPSBridge.confirm(alps_res.alps_res_id, pg_id)
_logger.debug('confirming with pagg_id %s', pg_id)
ALPSBridge.confirm(int(alps_res.alps_res_id), pg_id)
return True
@exposed
def interactive_job_complete (self, jobid):
    """Mark the interactive session for jobid as finished.

    Finds the process group whose jobid matches and sets its
    interactive_complete flag so that resource cleanup can proceed.
    Raises JobNotInteractive if the matching job is not running in
    interactive mode; logs a warning when no matching process group
    exists.
    """
    for pgroup in self.process_manager.process_groups.itervalues():
        if pgroup.jobid != jobid:
            continue
        # Found the job; it must be interactive to be completed here.
        if pgroup.mode != 'interactive':
            msg = "Job %s not an interactive" % str(jobid)
            self.logger.error(msg)
            raise JobNotInteractive(msg)
        pgroup.interactive_complete = True
        return
    # Fell through the loop without a match: no such job.
    self.logger.warning("%s: Interactive job not found", str(jobid))
    return
class ALPSReservation(object):
'''Container for ALPS Reservation information. Can be used to update
......
......@@ -35,6 +35,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
compatible with the ProcessGroupDict class.
'''
self.pgroup_type = pgroup_type
self._common_init_restart()
......@@ -45,8 +46,15 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
if state is None:
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
else:
self.process_groups = state['process_groups']
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
_logger.info("%s", state['process_groups'])
for pgroup in state['process_groups']:
pg = self.process_groups.item_cls().__setstate__(pgroup)
self.process_groups[pg.id] = pg
self.process_groups.q_add(state['process_groups'])
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
......@@ -56,7 +64,8 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
def __getstate__(self):
state = {}
state['process_groups'] = self.process_groups
state['process_groups'] = [pg.__getstate__ for pg in
self.process_groups.values()]
state['next_pg_id'] = self.process_groups.id_gen.idnum + 1
return state
......@@ -97,7 +106,10 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
signaled_pgs = []
for pgid in pgids:
if self.process_groups[pgid].signal(signame):
if self.process_groups[pgid].mode == 'interactive':
self.process_groups[pgid].interactive_complete = True
signaled_pgs.append(self.process_groups[pgid])
elif self.process_groups[pgid].signal(signame):
signaled_pgs.append(self.process_groups[pgid])
return signaled_pgs
......@@ -143,8 +155,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
for forker in self.forkers:
completed[forker] = []
try:
child_data = ComponentProxy(forker).get_children("process group",
None)
child_data = ComponentProxy(forker).get_children("process group", None)
except ComponentLookupError, e:
_logger.error("failed to contact the %s component to obtain a list of children", forker)
except:
......@@ -159,6 +170,14 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
pg_id = pg.id
child_uid = (pg.forker, pg.head_pid)
if child_uid not in children:
if pg.mode == 'interactive':
#interactive job, there is no child job
if pg.interactive_complete:
completed_pgs.append(pg)
#not really orphaned, but this causes the proper cleanup
#to occur
orphaned.append(pg_id)
continue
orphaned.append(pg_id)
_logger.warning('%s: orphaned job exited with unknown status', pg.jobid)
pg.exit_status = 1234567 #FIXME: what should this sentinel be?
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment