Commit 2c2d8d29 authored by Paul Rich

Initial commit of functional qsub -I eLogin handling. Testing pending.

parent 9dba3670
......@@ -420,6 +420,8 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
if deljob:
os.system("/usr/bin/ssh -o \"SendEnv COBALT_NODEFILE COBALT_JOBID\" %s" % (loc))
if impl == 'alps_system':
# We may need to use a remote host depending on whether or not we
# are on an eLogin.
exec_user_shell(user, jobid, loc)
else:
os.system(os.environ['SHELL'])
......@@ -454,6 +456,19 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
return deljob
def qsub_remote_host(host):
'''On ALPS eLogins, we need to remote interactive-mode qsubs to a host that
can actually run aprun. If we don't, authentication to the batch
reservation fails, because the session id is used to authenticate and the
eLogin's aprun is only an ssh wrapper.
'''
SSH_CMD = '/usr/bin/ssh'
# And yes, that behavior is inherently broken.
ssh_cmd = [SSH_CMD, '-t', host]
ssh_cmd.extend(sys.argv)
return subprocess.call(ssh_cmd)
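As a sketch of what this builds: assuming a hypothetical launch host 'batch01' and a hypothetical qsub -I command line, the constructed command is just ssh -t with the original argv appended:

>>> SSH_CMD = '/usr/bin/ssh'
>>> argv = ['/usr/bin/qsub', '-I', '-n', '1']  # hypothetical sys.argv
>>> [SSH_CMD, '-t', 'batch01'] + argv
['/usr/bin/ssh', '-t', 'batch01', '/usr/bin/qsub', '-I', '-n', '1']

On the remote side qsub runs on the ssh_host itself, so qsub_host matches ssh_host in main() and the remoting branch is not taken a second time.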
def run_job(parser, user, spec, opts):
"""
run the job
......@@ -461,6 +476,8 @@ def run_job(parser, user, spec, opts):
jobid = None
deljob = True
exc_occurred = False
interactive_remote_host = opts.get('ssh_host', None)
try:
not_exit_on_interrupt()
jobs = client_utils.component_call(QUEMGR, False, 'add_jobs',([spec],), False)
......@@ -492,6 +509,8 @@ def main():
# setup logging for client. The clients should call this before doing anything else.
client_utils.setup_logging(logging.INFO)
spec = {} # map of destination option strings and parsed values
opts = {} # old map
opt2spec = {}
......@@ -555,9 +574,22 @@ def main():
check_inputfile(parser, spec)
not_exit_on_interrupt()
opts['qsub_host'] = socket.gethostname()
client_utils.logger.debug('qsub_host: %s', opts['qsub_host'])
opts = client_utils.component_call(SYSMGR, False, 'validate_job',(opts,))
impl = client_utils.component_call(SYSMGR, False, 'get_implementation',())
exit_on_interrupt()
if impl in ['alps_system']:
# If we're interactive, remote and go.
if opts['mode'] == 'interactive':
if opts.get('ssh_host', None) is not None:
client_utils.logger.debug('qsub_host: %s ssh_host: %s', opts['qsub_host'], opts.get('ssh_host', None))
if opts['qsub_host'] != opts['ssh_host']:
# remote-execute qsub on the ssh_host
client_utils.logger.info('Connecting to %s for interactive qsub...', opts['ssh_host'])
sys.exit(qsub_remote_host(opts['ssh_host'])) # return status from qsub-ssh
filters = client_utils.get_filters()
client_utils.process_filters(filters, spec)
update_spec(parser, opts, spec, opt2spec)
......
......@@ -18,13 +18,9 @@ from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
from Cobalt.Components.system.ALPSProcessGroup import ALPSProcessGroup
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Exceptions import JobNotInteractive
<<<<<<< Updated upstream
from Cobalt.Components.system.ALPSProcessGroup import ALPSProcessGroup
from Cobalt.Exceptions import JobValidationError
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
=======
from Cobalt.Exceptions import JobValidationError
>>>>>>> Stashed changes
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
......@@ -47,8 +43,9 @@ CLEANUP_DRAIN_WINDOW = get_config_option('system', 'cleanup_drain_window', 300)
#Epsilon for backfilling. This system does not do this on a per-node basis.
BACKFILL_EPSILON = int(get_config_option('system', 'backfill_epsilon', 120))
ELOGIN_HOSTS = [host for host in ":".split(get_config_option('alpssystem', 'elogin_hosts', ''))]
ELOGIN_HOSTS = [host for host in get_config_option('alpssystem', 'elogin_hosts', '').split(':')]
if ELOGIN_HOSTS == ['']:
ELOGIN_HOSTS = []
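The paired lines above are the bug and its fix: the old expression called split on the separator character itself, using the config value as the delimiter. A quick interpreter check shows the difference:

>>> ':'.split('elogin1:elogin2')  # old form: splits the string ':'
[':']
>>> 'elogin1:elogin2'.split(':')  # new form: splits the config value
['elogin1', 'elogin2']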
DRAIN_MODES = ['first-fit', 'backfill']
CLEANING_ID = -1
......@@ -1245,7 +1242,6 @@ class CraySystem(BaseSystem):
spec['mode'] = 'script'
if spec['mode'] not in ['script', 'interactive']:
raise JobValidationError("Mode %s is not supported on Cray systems." % mode)
# FIXME: Pull this out of the system configuration from ALPS ultimately.
# For now, set this from config for the PE count per node
spec['nodecount'] = int(spec['nodecount'])
......@@ -1262,13 +1258,14 @@ class CraySystem(BaseSystem):
# Handle which script host should handle their job if they're on a
# login.
if spec.get('qsub_host', None) in ELOGIN_HOSTS:
spec['ssh_host'] = self._get_ssh_host()
try:
spec['ssh_host'] = self.process_manager.select_ssh_host()
except RuntimeError:
spec['ssh_host'] = None
if spec['ssh_host'] is None:
raise JobValidationError('No valid SSH host could be found for interactive job.')
return spec
def _get_ssh_host(self):
return None
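For reference, a minimal cobalt.conf fragment that would mark hosts as eLogins for this check (hostnames hypothetical):

[alpssystem]
elogin_hosts = elogin1:elogin2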
@exposed
def verify_locations(self, nodes):
'''verify that a list of nodes exist on this system. Return the list
......
......@@ -16,17 +16,10 @@ from Cobalt.Data import IncrID
_logger = logging.getLogger()
init_cobalt_config()
FORKER_RE = re.compile('forker')
class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Manager for process groups. These are tasks that Cobalt runs on behalf of
the user. Typically these are scripts submitted via qsub.'''
SIGKILL_TIMEOUT = int(get_config_option('system', 'sigkill_timeout', 300))
def __init__(self, pgroup_type=ProcessGroup):
'''Initialize process group manager.
......@@ -35,10 +28,10 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
compatible with the ProcessGroupDict class.
'''
self._init_config_vars()
self.pgroup_type = pgroup_type
self._common_init_restart()
def _common_init_restart(self, state=None):
'''common initialization code for both cold initialization and
reinitialization.
......@@ -50,15 +43,29 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
else:
self.process_groups = state.get('process_groups',
ProcessGroupDict())
for pg in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pg.id, pg.jobid)
for pgroup in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pgroup.id, pgroup.jobid)
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
self.forker_taskcounts = {} # dict of forkers and counts of pgs attached
self.forker_locations = {} # dict mapping forker name to its host
# remote_qsub_hosts (hosts from which qsub -I requires ssh-ing to a
# forker host) is populated from config in _init_config_vars
self.process_groups_lock = RLock()
self.update_launchers()
def _init_config_vars(self):
'''Initialize variables from Cobalt's configuration files.'''
init_cobalt_config()
self.forker_re = re.compile('forker')
self.sigkill_timeout = int(get_config_option('system', 'sigkill_timeout',
300))
self.remote_qsub_hosts = get_config_option('system',
'remote_qsub_hosts', '').split(":")
_logger.info('REMOTE QSUB HOSTS: %s',
", ".join(self.remote_qsub_hosts))
def __getstate__(self):
state = {}
state['process_groups'] = self.process_groups
......@@ -66,6 +73,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
return state
def __setstate__(self, state):
self._init_config_vars()
self._common_init_restart(state)
return self
......@@ -118,7 +126,8 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
now = int(time.time())
self.signal_groups(pgids)
for pg_id in pgids:
self.process_groups[pg_id].sigkill_timeout = int(now + self.SIGKILL_TIMEOUT)
self.process_groups[pg_id].sigkill_timeout = int(now +
self.sigkill_timeout)
def start_groups(self, pgids):
'''Start process groups. Return groups that succeeded startup.
......@@ -234,6 +243,30 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
_logger.info('%s Process Group deleted', pg.label)
return cleaned_groups
def select_ssh_host(self):
'''Select a host to ssh to for interactive jobs. Choose the most
lightly loaded host at this time.
Returns:
A string hostname for SSH use to set up an interactive shell.
If no location can be found, None is returned.
Exceptions:
RuntimeError if no forkers are currently set up.
'''
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) == 0:
raise RuntimeError("No forkers registered!")
else:
forker = ordered_forkers[0] # name of the least-loaded forker
try:
return self.forker_locations[forker]
except KeyError:
pass
return None
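The least-loaded selection hinges on sorting the taskcount dict by value; with hypothetical counts:

>>> taskcounts = {'forker_nid1': 3, 'forker_nid2': 1}
>>> [f[0] for f in sorted(taskcounts.items(), key=lambda x: x[1])]
['forker_nid2', 'forker_nid1']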
def update_launchers(self):
'''Update the list of task launchers. This right now works for
alps_forkers. Drop entries that slp doesn't know about and add in ones
......@@ -246,21 +279,27 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
resets the internal forker list to an updated list based on SLP registry
return is void
'''
# TODO: Move this to something Cray-specific later
'''
updated_forker_list = []
new_forker_locations = {}
try:
services = ComponentProxy('service-location').get_services([{'name':'*'}])
services = ComponentProxy('service-location').get_services([{'name': '*',
'location': '*'}])
except Exception:
_logger.critical('Unable to reach service-location', exc_info=True)
return
asf_re = re.compile('alps_script_forker')
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
for service in services:
if asf_re.match(service['name']):
loc = host_re.match(service['location'])
if loc:
new_forker_locations[service['name']] = loc.group('host')
updated_forker_list.append(service['name'])
if service['name'] not in self.forker_taskcounts.keys():
self.forker_taskcounts[service['name']] = 0
# Get currently running tasks from forkers. Different loop?
self.forkers = updated_forker_list
self.forker_locations = new_forker_locations
return
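The host extraction depends on host_re capturing everything up to the port; against a hypothetical SLP location entry:

>>> import re
>>> host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
>>> host_re.match('https://nid00030:49152').group('host')
'nid00030'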