Commit 086f7b6b authored by Paul Rich's avatar Paul Rich
Browse files

core implementation of interactive functionality

parent 8fdaae4a
......@@ -68,6 +68,7 @@ from Cobalt.arg_parser import ArgParse
from Cobalt.Util import get_config_option, init_cobalt_config, sleep
from Cobalt.Proxy import ComponentProxy
import xmlrpclib
import subprocess
__revision__ = '$Revision: 559 $'
__version__ = '$Version$'
......@@ -83,7 +84,7 @@ def on_interrupt(sig, func=None):
Interrupt Handler to cleanup the interactive job if the user interrupts
Will contain two static variables: count and exit.
'count' will keep track how many interruptions happened and
'count' will keep track how many interruptions happened and
'exit' flags whether we completely exit once the interrupt occurs.
on_interrupt.count += 1
......@@ -368,6 +369,28 @@ def parse_options(parser, spec, opts, opt2spec, def_spec):
opts['disable_preboot'] = not spec['script_preboot']
return opt_count
def fetch_pgid(user, jobid, loc):
'''fetch and set pgid for user shell. Needed for cray systems'''
if client_utils.component_call(SYSMGR, False, 'get_implementation', ()) == 'alpssystem':
pgid = os.getpgid(0)
spec = [{'user': user, 'jobid': jobid, 'pgid': pgid, 'location':loc}]
if not client_utils.component_call(SYSMGR, False, 'confirm_alps_reservation', (spec)):
logger.error('Unable to confirm ALPS reservation. Exiting.')
def exec_user_shell(user, jobid, loc):
'''Execute shell for user for interactive jobs. Uses the user's defined
SHELL. Will also send the pgid for cray systems so that aprun will work
from within the shell.
Wait until termination.
proc = subprocess.Popen([os.environ['SHELL']], shell=True,
preexec_fn=(lambda: fetch_pgid(user, jobid, loc)))
os.waitpid(, 0)
def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
This will create the shell or ssh session for user
......@@ -396,11 +419,14 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):"Opening interactive session to %s", loc)
if deljob:
os.system("/usr/bin/ssh -o \"SendEnv COBALT_NODEFILE COBALT_JOBID\" %s" % (loc))
if impl == 'alpssystem':
exec_user_shell(user, jobid, loc)
# Wait for job to start
query = [{'tag':'job', 'jobid':jobid, 'location':'*', 'state':"*", 'resid':"*"}]
query = [{'tag':'job', 'jobid':jobid, 'location':'*', 'state':"*",
'resid':"*"}]"Wait for job %s to start...", str(jobid))
while True:
......@@ -997,6 +997,45 @@ class CraySystem(BaseSystem):
compact_num_list(mod_nodes), user)
return mod_nodes
def confirm_alps_reservation(self, specs):
'''confirm or rereserve if needed the ALPS reservation for an
interactive job.
pg = self.process_manager.process_groups[int(specs['jobid'])]
pg_id = int(specs['pg_id'])
except KeyError:
return False
# Try to find the alps_res_id for this job. if we don't have it, then we
# need to reacquire the resource reservation. The job locations will be
# critical for making this work.
with self._node_lock:
# do not want to hit this during an update.
alps_res = self.alps_reservations.get(pg.jobid, None)
# find nodes for jobid. If we don't have sufficient nodes, job
# should die
job_nodes = [node for node in self.nodes if node.reserved_jobid == pg.jobid]
if len(job_nodes) == 0:
_logger.warning('%s: No nodes reserved for job.', pg.label)
return False
new_time = job_nodes[0].reserved_until
node_list = compact_num_list([node.node_id for node in job_nodes])
if alps_res is None:
self._ALPS_reserve_resources(pg.jobid, new_time, node_list)
alps_res = self.alps_reservations.get(pg.jobid, None)
if alps_res is None:
_logger.warning('%s: Unable to re-reserve ALPS resources.',
return False
# try to confirm, if we fail at confirmation, try to reserve same
# resource set again
ALPSBridge.confirm(alps_res.alps_res_id, pg_id)
return True
class ALPSReservation(object):
'''Container for ALPS Reservation information. Can be used to update
reservations and also internally relases reservation.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment