Commit 3fab168d authored by Paul Rich

Merge branch '21-interactive-mode'

parents 3045a0cf 6364161b
......@@ -68,6 +68,7 @@ from Cobalt.arg_parser import ArgParse
from Cobalt.Util import get_config_option, init_cobalt_config, sleep
from Cobalt.Proxy import ComponentProxy
import xmlrpclib
import subprocess
__revision__ = '$Revision: 559 $'
__version__ = '$Version$'
......@@ -83,7 +84,7 @@ def on_interrupt(sig, func=None):
"""
Interrupt handler to clean up the interactive job if the user interrupts.
Contains two static variables: count and exit.
'count' will keep track of how many interruptions happened and
'exit' flags whether we completely exit once the interrupt occurs.
"""
on_interrupt.count += 1
......@@ -368,6 +369,29 @@ def parse_options(parser, spec, opts, opt2spec, def_spec):
opts['disable_preboot'] = not spec['script_preboot']
return opt_count
def fetch_pgid(user, jobid, loc, pgid=None):
'''Fetch and set the pgid for the user's shell. Needed for Cray systems.'''
if client_utils.component_call(SYSMGR, False, 'get_implementation', ()) == 'alps_system':
#Cray is apparently using the session id for interactive jobs.
spec = [{'user': user, 'jobid': jobid, 'pgid': pgid, 'location':loc}]
if not client_utils.component_call(SYSMGR, False, 'confirm_alps_reservation', (spec)):
client_utils.logger.error('Unable to confirm ALPS reservation. Exiting.')
sys.exit(1)
return
def exec_user_shell(user, jobid, loc):
'''Execute a shell for the user for interactive jobs. Uses the user's
defined SHELL. Also sends the pgid for Cray systems so that aprun will
work from within the shell.
Waits until termination.
'''
pgid = os.getppid()
proc = subprocess.Popen([os.environ['SHELL']], shell=True,
preexec_fn=(lambda: fetch_pgid(user, jobid, loc, pgid=pgid)))
os.waitpid(proc.pid, 0)
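The preexec_fn hook is the load-bearing detail here: it runs in the forked child after fork() but before the user's shell is exec'd, so the ALPS reservation can be confirmed with the captured pgid before the user ever gets a prompt. A minimal, self-contained sketch of that ordering (confirm_stub is a hypothetical stand-in for fetch_pgid, not Cobalt code):

import os
import subprocess

def confirm_stub(pgid):
    # Stand-in for fetch_pgid(user, jobid, loc, pgid=pgid); runs in the
    # forked child, before the shell replaces the child process image.
    print 'confirming reservation for pgid %d' % pgid

pgid = os.getppid()
proc = subprocess.Popen([os.environ.get('SHELL', '/bin/sh'), '-c', 'true'],
                        preexec_fn=(lambda: confirm_stub(pgid)))
os.waitpid(proc.pid, 0)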
def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
"""
This will create the shell or ssh session for user
......@@ -376,7 +400,6 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
# save whether we are running on a cluster system
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
exit_on_interrupt()
deljob = (impl == "cluster_system")
def start_session(loc, resid, nodes, procs):
......@@ -396,11 +419,14 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
client_utils.logger.info("Opening interactive session to %s", loc)
if deljob:
os.system("/usr/bin/ssh -o \"SendEnv COBALT_NODEFILE COBALT_JOBID\" %s" % (loc))
if impl == 'alps_system':
exec_user_shell(user, jobid, loc)
else:
os.system(os.environ['SHELL'])
# Wait for job to start
query = [{'tag':'job', 'jobid':jobid, 'location':'*', 'state':"*",
'resid':"*"}]
client_utils.logger.info("Wait for job %s to start...", str(jobid))
while True:
......
......@@ -794,7 +794,7 @@ class BaseForker (Component):
break
elif exc.errno in [errno.EBADF, errno.EINVAL, errno.EPIPE]:
_logger.error("%s: Error reading stdout from child pipe.",
child.label, exc_info=True)
break
elif exc.errno in [errno.EINTR]:
#Try again
......
......@@ -117,7 +117,7 @@ def confirm(alps_res_id, pg_id):
None Yet.
'''
params = {'pagg_id': pg_id,
'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM',
params=params)))
return retval
......
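The rename above matters because BASIL CONFIRM requests are keyed by the reservation id together with the process aggregate id (pagg_id) of the claimant. A hedged sketch of the params dict this call builds, with invented values:

# Hypothetical values for illustration only.
params = {'pagg_id': 123456,        # session/process aggregate id of the claimant
          'reservation_id': 42}     # ALPS reservation being confirmed
# str(BasilRequest('CONFIRM', params=params)) is expected to serialize
# these into the request handed to the executable at BASIL_PATH.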
......@@ -14,6 +14,7 @@ from Cobalt.Components.system.base_system import BaseSystem
from Cobalt.Components.system.CrayNode import CrayNode
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Exceptions import JobNotInteractive
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
......@@ -31,7 +32,7 @@ TEMP_RESERVATION_TIME = int(get_config_option('alpssystem',
SAVE_ME_INTERVAL = float(get_config_option('alpssystem', 'save_me_interval', 10.0))
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem',
'pending_startup_timeout', 1200)) #default 20 minutes to account for boot.
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
class ALPSProcessGroup(ProcessGroup):
'''ALPS-specific ProcessGroup modifications.'''
......@@ -39,6 +40,12 @@ class ALPSProcessGroup(ProcessGroup):
def __init__(self, spec):
super(ALPSProcessGroup, self).__init__(spec)
self.alps_res_id = spec['alps_res_id']
self.interactive_complete = False
#inherit generic getstate and setstate methods from parent
class CraySystem(BaseSystem):
'''Cray/ALPS-specific system component. Behaviors should go here. Direct
......@@ -86,9 +93,11 @@ class CraySystem(BaseSystem):
_logger.info('BRIDGE INITIALIZED')
#process manager setup
if spec is None:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
else:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
self.logger.info('pg type %s',
self.process_manager.process_groups.item_cls)
#self.process_manager.forkers.append('alps_script_forker')
self.process_manager.update_launchers()
self.pending_start_timeout = 1200 #20 minutes for long reboots.
......@@ -116,6 +125,7 @@ class CraySystem(BaseSystem):
tuple())
_logger.info('UPDATE THREAD STARTED')
self.current_equivalence_classes = []
self.killing_jobs = {}
def __getstate__(self):
'''Save process, alps-reservation information, along with base
......@@ -155,8 +165,11 @@ class CraySystem(BaseSystem):
# much more compact representation of data. reserved_nodes gives
# which nodes are reserved.
inventory = ALPSBridge.fetch_inventory()
_logger.info('INVENTORY FETCHED')
system = ALPSBridge.extract_system_node_data(ALPSBridge.system())
_logger.info('SYSTEM DATA FETCHED')
reservations = ALPSBridge.fetch_reservations()
_logger.info('ALPS RESERVATION DATA FETCHED')
# reserved_nodes = ALPSBridge.reserved_nodes()
except Exception:
#don't crash out here. That may trash a statefile.
......@@ -274,6 +287,7 @@ class CraySystem(BaseSystem):
for jobid in startup_time_to_clear:
del self.pending_starts[jobid]
self.check_killing_aprun()
with self._node_lock:
fetch_time_start = time.time()
try:
......@@ -335,7 +349,10 @@ class CraySystem(BaseSystem):
if (alps_res.jobid not in released_res_jobids and
str(node.node_id) in alps_res.node_ids):
#send only one release per iteration
alps_res.release()
apids = alps_res.release()
if apids is not None:
for apid in apids:
self.signal_aprun(apid)
released_res_jobids.append(alps_res.jobid)
#find hardware status
......@@ -419,7 +436,7 @@ class CraySystem(BaseSystem):
tracked_res = self.alps_reservations.get(new_alps_res.jobid, None)
if tracked_res is not None:
try:
tracked_res.release()
apids = tracked_res.release()
except ALPSBridge.ALPSError:
# backend reservation probably is gone, which is why
# we are here in the first place.
......@@ -435,8 +452,67 @@ class CraySystem(BaseSystem):
new_alps_res.release()
else:
self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
return
def signal_aprun(self, aprun_id, signame='SIGINT'):
'''Signal an aprun by aprun id (application_id). Does not block.
Use check_killing_aprun to determine completion/termination. Does not
depend on the host the aprun(s) was launched from.
Input:
aprun_id - integer application id number.
signame - string name of signal to send (default: SIGINT)
Notes:
Valid signals to apkill are:
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGABRT, SIGUSR1, SIGUSR2, SIGURG,
and SIGWINCH (from the apkill(1) man page). SIGKILL is also allowed.
'''
#Expect changes with an API update
#mark legal signals from the apkill docs
if (signame not in ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM', 'SIGABRT',
'SIGUSR1', 'SIGUSR2', 'SIGURG','SIGWINCH', 'SIGKILL']):
raise ValueError('%s is not a legal value for signame.' % signame)
try:
retval = Cobalt.Proxy.ComponentProxy('system_script_forker').fork(
[APKILL_CMD, '-%s' % signame, '%d' % int(aprun_id)],
'aprun_termination', '%s cleanup:' % aprun_id)
_logger.info("killing backend ALPS application_id: %s", aprun_id)
except xmlrpclib.Fault:
_logger.warning("XMLRPC Error while killing backend job: %s, will retry.",
aprun_id, exc_info=True)
except:
_logger.critical("Unknown Error while killing backend job: %s, will retry.",
aprun_id, exc_info=True)
else:
self.killing_jobs[aprun_id] = retval
return
def check_killing_aprun(self):
'''Check that apkill commands have completed and clean them from the
system_script_forker. Allows for non-blocking cleanup initiation.
'''
try:
system_script_forker = Cobalt.Proxy.ComponentProxy('system_script_forker')
except:
self.logger.critical("Cannot connect to system_script_forker.",
exc_info=True)
return
rev_killing_jobs = dict([(v,k) for (k,v) in self.killing_jobs.iteritems()])
removed_jobs = []
current_killing_jobs = system_script_forker.get_children(None, self.killing_jobs.values())
for job in current_killing_jobs:
if job['complete']:
del self.killing_jobs[rev_killing_jobs[int(job['id'])]]
removed_jobs.append(job['id'])
system_script_forker.cleanup_children(removed_jobs)
return
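The bookkeeping in check_killing_aprun is a reverse-lookup pattern: killing_jobs maps apid to forker child id, the map is inverted, and any child the forker reports complete is reaped on both sides. A small self-contained sketch of the pattern with stub data (the dicts below stand in for the component API and are not real forker output):

killing_jobs = {10001: 7, 10002: 8}    # apid -> forker child id
forker_children = [{'id': 7, 'complete': True},
                   {'id': 8, 'complete': False}]

rev_killing_jobs = dict([(v, k) for (k, v) in killing_jobs.iteritems()])
removed_jobs = []
for job in forker_children:
    if job['complete']:
        del killing_jobs[rev_killing_jobs[int(job['id'])]]
        removed_jobs.append(job['id'])
print killing_jobs     # {10002: 8}
print removed_jobs     # [7]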
@exposed
def find_queue_equivalence_classes(self, reservation_dict,
active_queue_names, passthrough_blocking_res_list=[]):
......@@ -999,6 +1075,77 @@ class CraySystem(BaseSystem):
compact_num_list(mod_nodes), user)
return mod_nodes
@exposed
def confirm_alps_reservation(self, specs):
'''Confirm, or re-reserve if needed, the ALPS reservation for an
interactive job.
'''
try:
pg = None
for pgroup in self.process_manager.process_groups.values():
if pgroup.jobid == int(specs['jobid']):
pg = pgroup
#pg = self.process_manager.process_groups[int(specs['pg_id'])]
pg_id = int(specs['pgid'])
except KeyError:
raise
if pg is None:
raise ValueError('invalid jobid specified')
# Try to find the alps_res_id for this job. if we don't have it, then we
# need to reacquire the source reservation. The job locations will be
# critical for making this work.
with self._node_lock:
# do not want to hit this during an update.
alps_res = self.alps_reservations.get(str(pg.jobid), None)
# find nodes for jobid. If we don't have sufficient nodes, job
# should die
job_nodes = [node for node in self.nodes.values()
if node.reserved_jobid == pg.jobid]
nodecount = len(job_nodes)
if nodecount == 0:
_logger.warning('%s: No nodes reserved for job.', pg.label)
return False
new_time = job_nodes[0].reserved_until
node_list = compact_num_list([node.node_id for node in job_nodes])
if alps_res is None:
job_info = {'user': specs['user'],
'jobid':specs['jobid'],
'nodes': nodecount,
'attrs': {},
}
self._ALPS_reserve_resources(job_info, new_time, node_list)
alps_res = self.alps_reservations.get(str(pg.jobid), None)
if alps_res is None:
_logger.warning('%s: Unable to re-reserve ALPS resources.',
pg.label)
return False
# try to confirm, if we fail at confirmation, try to reserve same
# resource set again
_logger.debug('confirming with pagg_id %s', pg_id)
ALPSBridge.confirm(int(alps_res.alps_res_id), pg_id)
return True
@exposed
def interactive_job_complete (self, jobid):
"""Will terminate the specified interactive job
"""
job_not_found = True
for pg in self.process_manager.process_groups.itervalues():
if pg.jobid == jobid:
job_not_found = False
if pg.mode == 'interactive':
pg.interactive_complete = True
else:
msg = "Job %s not an interactive" % str(jobid)
self.logger.error(msg)
raise JobNotInteractive(msg)
break
if job_not_found:
self.logger.warning("%s: Interactive job not found", str(jobid))
return
class ALPSReservation(object):
'''Container for ALPS Reservation information. Can be used to update
reservations and also internally releases the reservation.
......@@ -1060,17 +1207,38 @@ class ALPSReservation(object):
A reservation may remain if there are still active claims. When all
claims are gone, the reservation is removed.
Returns a list of apids for any apruns that are still cleaning, so the
caller can signal them via the system script forker.
'''
if self.dying:
#release already issued. Ignore
return
apids = []
status = ALPSBridge.release(self.alps_res_id)
if int(status['claims']) != 0:
_logger.info('ALPS reservation: %s still has %s claims.',
self.alps_res_id, status['claims'])
# fetch reservation information so that we can send kills to
# interactive apruns.
resinfo = ALPSBridge.fetch_reservations()
apids = _find_non_batch_apids(resinfo['reservations'])
else:
_logger.info('ALPS reservation: %s has no claims left.',
self.alps_res_id)
self.dying = True
return apids
def _find_non_batch_apids(resinfo):
'''Extract apids from non-batch items.'''
apids = []
for alps_res in resinfo:
#wow, this is ugly.
for applications in alps_res['ApplicationArray']:
for application in applications.values():
for app_data in application:
for commands in app_data['CommandArray']:
for command in commands.values():
if command[0]['cmd'] != 'BASIL':
apids.append(app_data['application_id'])
return apids
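The nesting in _find_non_batch_apids mirrors the shape of the reservation inventory returned by the bridge. A runnable sketch with invented sample data (one non-BASIL aprun with apid 90210) showing what the loops actually walk; the key names follow the loop structure above, not a documented schema:

resinfo = {'reservations': [
    {'ApplicationArray': [
        {'Application': [
            {'application_id': 90210,
             'CommandArray': [
                 {'Command': [{'cmd': '/bin/hostname'}]}]}]}]}]}

apids = []
for alps_res in resinfo['reservations']:
    for applications in alps_res['ApplicationArray']:
        for application in applications.values():
            for app_data in application:
                for commands in app_data['CommandArray']:
                    for command in commands.values():
                        if command[0]['cmd'] != 'BASIL':
                            apids.append(app_data['application_id'])
print apids    # [90210]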
......@@ -35,6 +35,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
compatible with the ProcessGroupDict class.
'''
self.pgroup_type = pgroup_type
self._common_init_restart()
......@@ -45,8 +46,15 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
if state is None:
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
else:
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
_logger.info("%s", state['process_groups'])
for pgroup in state['process_groups']:
pg = self.process_groups.item_cls().__setstate__(pgroup)
self.process_groups[pg.id] = pg
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
......@@ -56,7 +64,8 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
def __getstate__(self):
state = {}
state['process_groups'] = [pg.__getstate__() for pg in
self.process_groups.values()]
state['next_pg_id'] = self.process_groups.id_gen.idnum + 1
return state
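Serializing the process groups as a list of plain state dicts, rather than storing the live ProcessGroupDict, keeps the statefile independent of class internals; it is also why __getstate__ must be called with parentheses rather than stored as a bound method, and why a __setstate__ that returns self makes the restore loop read naturally. A minimal round-trip sketch with a hypothetical stand-in class:

class StubProcessGroup(object):
    # Hypothetical stand-in; real process groups carry much more state.
    def __init__(self, pgid=None):
        self.id = pgid
    def __getstate__(self):
        return {'id': self.id}
    def __setstate__(self, state):
        self.__dict__.update(state)
        return self    # returning self enables the assignment idiom above

live = {1: StubProcessGroup(1), 2: StubProcessGroup(2)}
saved = {'process_groups': [pg.__getstate__() for pg in live.values()]}
restored = {}
for pg_state in saved['process_groups']:
    pg = StubProcessGroup().__setstate__(pg_state)
    restored[pg.id] = pg
print sorted(restored.keys())    # [1, 2]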
......@@ -97,7 +106,10 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
signaled_pgs = []
for pgid in pgids:
if self.process_groups[pgid].mode == 'interactive':
self.process_groups[pgid].interactive_complete = True
signaled_pgs.append(self.process_groups[pgid])
elif self.process_groups[pgid].signal(signame):
signaled_pgs.append(self.process_groups[pgid])
return signaled_pgs
......@@ -143,8 +155,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
for forker in self.forkers:
completed[forker] = []
try:
child_data = ComponentProxy(forker).get_children("process group",
None)
child_data = ComponentProxy(forker).get_children("process group", None)
except ComponentLookupError, e:
_logger.error("failed to contact the %s component to obtain a list of children", forker)
except:
......@@ -159,6 +170,14 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
pg_id = pg.id
child_uid = (pg.forker, pg.head_pid)
if child_uid not in children:
if pg.mode == 'interactive':
#interactive job, there is no child job
if pg.interactive_complete:
completed_pgs.append(pg)
#not really orphaned, but this causes the proper cleanup
#to occur
orphaned.append(pg_id)
continue
orphaned.append(pg_id)
_logger.warning('%s: orphaned job exited with unknown status', pg.jobid)
pg.exit_status = 1234567 #FIXME: what should this sentinel be?
......