Commit 9d016e1a authored by Paul Rich

Merge branch '37-ssh-interactive-mode' into 'develop'

Resolve "SSH to mom/script host for Cray systems"

Closes #37

See merge request !19
parents db7bb983 19a85b5d
......@@ -63,6 +63,13 @@ Path to where the statefiles are stored.
.SS "[system]"
Common system configuration settings. These apply to all types of systems.
.TP
.B elogin_hosts
A ':'-delimited list of hostnames of hosts from which users may submit
interactive jobs that must then be run from another node. This is usually due
to restrictions in the authentication and authorization mechanisms of the
mpirun equivalent on a given system. This is most commonly required for Cray
systems using eLogin nodes.
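For example, a site with two eLogin nodes might configure (hostnames
illustrative):
.sp
.nf
[system]
size: 10
elogin_hosts: elogin01:elogin02
.fi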
.TP
.B size
Maximum size of a given system in nodes.
.PP
......
......@@ -387,7 +387,7 @@ def exec_user_shell(user, jobid, loc):
Wait until termination.
'''
# Use the session id rather than the parent pid: ALPS authenticates the
# batch reservation against the session of the launching process.
pgid = os.getsid(0)
proc = subprocess.Popen([os.environ['SHELL']], shell=True,
preexec_fn=(lambda: fetch_pgid(user, jobid, loc, pgid=pgid)))
os.waitpid(proc.pid, 0)
......@@ -420,6 +420,8 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
if deljob:
os.system("/usr/bin/ssh -o \"SendEnv COBALT_NODEFILE COBALT_JOBID\" %s" % (loc))
if impl == 'alps_system':
# We may need to use a remote host depending on whether or not we
# are on an eLogin.
exec_user_shell(user, jobid, loc)
else:
os.system(os.environ['SHELL'])
......@@ -454,6 +456,19 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
return deljob
def qsub_remote_host(host):
'''On ALPS eLogins, we need to remote interactive-mode qsubs to a host that
can actually run aprun. If we don't do this, authentication to the batch
reservation fails, since the session id is used to authenticate and the
eLogin aprun is merely an ssh wrapper.
'''
SSH_CMD = '/usr/bin/ssh'
# And yes, that behavior is inherently broken.
ssh_cmd = [SSH_CMD, '-t', host]
ssh_cmd.extend(sys.argv)
return subprocess.call(ssh_cmd)
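As an illustration (host and options hypothetical), a qsub invoked on an
eLogin is re-executed remotely roughly as follows:

# Illustration only; host and options are hypothetical. On an eLogin,
#   qsub --mode interactive -n 1 -t 30
# re-executes itself via qsub_remote_host('batch01') as roughly
#   /usr/bin/ssh -t batch01 qsub --mode interactive -n 1 -t 30
# sys.argv is forwarded verbatim, and -t allocates the tty that the
# interactive shell started on the remote side requires.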
def run_job(parser, user, spec, opts):
"""
run the job
......@@ -461,6 +476,8 @@ def run_job(parser, user, spec, opts):
jobid = None
deljob = True
exc_occurred = False
interactive_remote_host = opts.get('ssh_host', None)
try:
not_exit_on_interrupt()
jobs = client_utils.component_call(QUEMGR, False, 'add_jobs',([spec],), False)
......@@ -492,6 +509,8 @@ def main():
# setup logging for client. The clients should call this before doing anything else.
client_utils.setup_logging(logging.INFO)
spec = {} # map of destination option strings and parsed values
opts = {} # old map
opt2spec = {}
......@@ -555,9 +574,22 @@ def main():
check_inputfile(parser, spec)
not_exit_on_interrupt()
opts['qsub_host'] = socket.gethostname()
client_utils.logger.debug('qsub_host: %s', opts['qsub_host'])
opts = client_utils.component_call(SYSMGR, False, 'validate_job',(opts,))
impl = client_utils.component_call(SYSMGR, False, 'get_implementation',())
exit_on_interrupt()
if impl in ['alps_system']:
# If we're interactive, remote and go.
if opts['mode'] == 'interactive':
if opts.get('ssh_host', None) is not None:
client_utils.logger.debug('qsub_host: %s, ssh_host: %s',
opts['qsub_host'], opts.get('ssh_host', None))
if opts['qsub_host'] != opts['ssh_host']:
#remote execute qsub on the ssh_host
client_utils.logger.info('Connecting to %s for interactive qsub...', opts['ssh_host'])
sys.exit(qsub_remote_host(opts['ssh_host'])) # return status from qsub-ssh
filters = client_utils.get_filters()
client_utils.process_filters(filters, spec)
update_spec(parser, opts, spec, opt2spec)
......
"""Process group for Cray systems. The earliest system this targets is the
XC-40 running ALPS. This adds server-side information for interactive job
launch that is unique to the ALPS environment and is necessary on systems using
eLogin nodes (formerly known as CDL nodes).
"""
import time
import logging
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
_logger = logging.getLogger(__name__)
init_cobalt_config()
PGROUP_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pgroup_startup_timeout', 120.0))
USER_SESSION_HOSTS = [host.strip() for host in
get_config_option('alpssystem', 'user_session_hosts', '').split(':')
if host.strip()]
class ALPSProcessGroup(ProcessGroup):
'''ALPS-specific ProcessGroup modifications.'''
def __init__(self, spec):
super(ALPSProcessGroup, self).__init__(spec)
self.alps_res_id = spec.get('alps_res_id', None)
self.interactive_complete = False
now = int(time.time())
self.startup_timeout = int(spec.get("pgroup_startup_timeout",
now + PGROUP_STARTUP_TIMEOUT))
def start(self):
'''Start the process group. The ALPS version also sets the host to use.
This host is in a list provided by the configuration file. If the host
has an alps_script_forker instance running on it, those currently
running jobs will be taken into account when selecting where to run.
The forker host with the fewest currently assigned locations is preferred.
Args:
None
Returns:
None
Raises:
ProcessGroupStartupError: The start for the process group has failed
and no child process id has been returned.
Side Effects:
Prompts the specified forker to start a job. In the event of an
interactive job, sets a fake head pid (1) and notes which host
should be used for the interactive job launch.
'''
if self.mode == 'interactive':
if USER_SESSION_HOSTS:
# Placeholder: session-host selection for interactive jobs is
# currently handled by the process group manager (select_ssh_host).
pass
return super(ALPSProcessGroup, self).start()
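The interactive branch above is still a stub. As a minimal sketch (names
hypothetical, not the committed implementation), the selection described in
the docstring could look like:

def _pick_session_host(session_hosts, taskcounts):
    '''Return the configured session host with the fewest running tasks.'''
    return min(session_hosts, key=lambda host: taskcounts.get(host, 0))

# _pick_session_host(['login1', 'login2'], {'login1': 4}) -> 'login2'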
......@@ -7,16 +7,17 @@ import time
import sys
import xmlrpclib
import json
import ConfigParser
import Cobalt.Util
import Cobalt.Components.system.AlpsBridge as ALPSBridge
from Cobalt.Components.base import Component, exposed, automatic, query, locking
from Cobalt.Components.system.base_system import BaseSystem
from Cobalt.Components.system.CrayNode import CrayNode
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
from Cobalt.Components.system.ALPSProcessGroup import ALPSProcessGroup
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Exceptions import JobNotInteractive
from Cobalt.Exceptions import JobValidationError
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
......@@ -34,17 +35,20 @@ SAVE_ME_INTERVAL = float(get_config_option('alpssystem', 'save_me_interval', 10.
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem',
'pending_startup_timeout', 1200)) #default 20 minutes to account for boot.
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
PGROUP_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pgroup_startup_timeout', 120.0))
DRAIN_MODE = get_config_option('system', 'drain_mode', 'first-fit')
#cleanup time in seconds
CLEANUP_DRAIN_WINDOW = get_config_option('system', 'cleanup_drain_window', 300)
#Epsilon for backfilling. This system does not do this on a per-node basis.
BACKFILL_EPSILON = int(get_config_option('system', 'backfill_epsilon', 120))
ELOGIN_HOSTS = [host for host in
get_config_option('system', 'elogin_hosts', '').split(':') if host]
DRAIN_MODES = ['first-fit', 'backfill']
CLEANING_ID = -1
def chain_loc_list(loc_list):
'''Take a list of compact Cray locations,
expand and concatenate them.
......@@ -55,18 +59,6 @@ def chain_loc_list(loc_list):
retlist.extend(expand_num_list(locs))
return retlist
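A quick example, assuming expand_num_list expands a compact Cray range
string such as '1-3' into [1, 2, 3]:

# chain_loc_list(['1-3', '7']) -> [1, 2, 3, 7]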
class ALPSProcessGroup(ProcessGroup):
'''ALPS-specific ProcessGroup modifications.'''
def __init__(self, spec):
super(ALPSProcessGroup, self).__init__(spec)
self.alps_res_id = spec.get('alps_res_id', None)
self.interactive_complete = False
now = time.time()
self.startup_timeout = int(spec.get("pgroup_startup_timeout",
now + PGROUP_STARTUP_TIMEOUT))
#inherit generic getstate and setstate methods from parent
class CraySystem(BaseSystem):
'''Cray/ALPS-specific system component. Behaviors should go here. Direct
......@@ -96,6 +88,11 @@ class CraySystem(BaseSystem):
component.
'''
try:
self.system_size = int(get_config_option('system', 'size'))
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
_logger.critical('ALPS SYSTEM: ABORT STARTUP: System size must be specified in the [system] section of the cobalt configuration file.')
sys.exit(1)
if DRAIN_MODE not in DRAIN_MODES:
#abort startup, we have a completely invalid config.
_logger.critical('ALPS SYSTEM: ABORT STARTUP: %s is not a valid drain mode. Must be one of %s.',
......@@ -1119,6 +1116,18 @@ class CraySystem(BaseSystem):
'''Add process groups and start their runs. Adjust the resource
reservation time to full run time at this point.
Args:
specs: A list of dictionaries that contain information on the Cobalt
job required to start the backend process.
Returns:
A created process group object. This is wrapped and sent via XMLRPC
to the caller.
Side Effects:
Invokes a forker component to run a user script. In the event of a
fatal startup error, will release resource reservations.
'''
start_apg_timer = int(time.time())
......@@ -1236,14 +1245,28 @@ class CraySystem(BaseSystem):
spec['mode'] = 'script'
if spec['mode'] not in ['script', 'interactive']:
raise JobValidationError("Mode %s is not supported on Cray systems." % mode)
# FIXME: Pull this out of the system configuration from ALPS ultimately.
# For now, set this from config for the PE count per node
spec['nodecount'] = int(spec['nodecount'])
if spec['nodecount'] > self.system_size:
raise JobValidationError('Job requested %s nodes. Maximum permitted size is %s' %
(spec['nodecount'], self.system_size))
spec['proccount'] = spec['nodecount'] #set multiplier for default depth
mode = spec.get('mode', 'script')
spec['mode'] = mode
if mode == 'interactive':
# Handle which script host should handle their job if they're on a
# login.
if spec.get('qsub_host', None) in ELOGIN_HOSTS:
try:
spec['ssh_host'] = self.process_manager.select_ssh_host()
except RuntimeError:
spec['ssh_host'] = None
if spec['ssh_host'] is None:
raise JobValidationError('No valid SSH host could be found for interactive job.')
return spec
@exposed
......@@ -1341,7 +1364,8 @@ class CraySystem(BaseSystem):
# try to confirm, if we fail at confirmation, try to reserve same
# resource set again
_logger.info('%s/%s: confirming with pagg_id %s', specs['jobid'],
specs['user'], pg_id)
ALPSBridge.confirm(int(alps_res.alps_res_id), pg_id)
return True
......
......@@ -16,17 +16,10 @@ from Cobalt.Data import IncrID
_logger = logging.getLogger()
init_cobalt_config()
FORKER_RE = re.compile('forker')
class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Manager for process groups. These are tasks that Cobalt run on behalf of
the user. Typically these are scripts submitted via qsub.'''
SIGKILL_TIMEOUT = int(get_config_option('system', 'sigkill_timeout', 300))
def __init__(self, pgroup_type=ProcessGroup):
'''Initialize process group manager.
......@@ -35,10 +28,10 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
compatible with the ProcessGroupDict class.
'''
self._init_config_vars()
self.pgroup_type = pgroup_type
self._common_init_restart()
def _common_init_restart(self, state=None):
'''Common initialization code for both cold initialization and
reinitialization.
......@@ -50,15 +43,29 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
else:
self.process_groups = state.get('process_groups',
ProcessGroupDict())
for pgroup in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pgroup.id, pgroup.jobid)
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
self.forker_taskcounts = {} # dict of forkers and counts of pgs attached
self.forker_locations = {} # dict of forkers a tuple (host, port)
self.process_groups_lock = RLock()
self.update_launchers()
def _init_config_vars(self):
'''Initialize variables from Cobalt's configuration files.'''
init_cobalt_config()
self.forker_re = re.compile('forker')
self.sigkill_timeout = int(get_config_option('system', 'sigkill_timeout',
300))
self.remote_qsub_hosts = [host for host in get_config_option('system',
'elogin_hosts', '').split(":") if host]
_logger.info('REMOTE QSUB HOSTS: %s',
", ".join(self.remote_qsub_hosts))
def __getstate__(self):
state = {}
state['process_groups'] = self.process_groups
......@@ -66,6 +73,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
return state
def __setstate__(self, state):
self._init_config_vars()
self._common_init_restart(state)
return self
......@@ -118,7 +126,8 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
now = int(time.time())
self.signal_groups(pgids)
for pg_id in pgids:
self.process_groups[pg_id].sigkill_timeout = int(now +
self.sigkill_timeout)
def start_groups(self, pgids):
'''Start process groups. Return groups that succeeded startup.
......@@ -234,6 +243,33 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
_logger.info('%s Process Group deleted', pg.label)
return cleaned_groups
def select_ssh_host(self):
'''Select a host to ssh to for interactive jobs. Choose the most
lightly loaded host at this time.
Returns:
A string hostname to ssh to when setting up an interactive shell.
If no location can be found, None is returned.
Exceptions:
RuntimeError if no forkers are currently set up.
'''
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x: x[1])]
if len(ordered_forkers) == 0:
raise RuntimeError("No forkers registered!")
# first entry carries the fewest attached process groups
forker = ordered_forkers[0]
try:
return self.forker_locations[forker]
except KeyError:
pass
return None
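A standalone illustration of the load-based choice above (forker names and
hosts hypothetical):

forker_taskcounts = {'alps_script_forker_a': 3, 'alps_script_forker_b': 1}
forker_locations = {'alps_script_forker_a': 'nid00001',
                    'alps_script_forker_b': 'nid00002'}
ordered = [f[0] for f in sorted(forker_taskcounts.items(), key=lambda x: x[1])]
assert ordered[0] == 'alps_script_forker_b'        # fewest attached tasks
assert forker_locations[ordered[0]] == 'nid00002'  # host handed back for ssh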
def update_launchers(self):
'''Update the list of task launchers. Right now this works for
alps_forkers. Drop entries that SLP doesn't know about and add in ones
......@@ -246,21 +282,27 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
Resets the internal forker list to an updated list based on the SLP
registry. The return is void.
'''
# TODO: Move this to something Cray-specific later.
updated_forker_list = []
new_forker_locations = {}
try:
services = ComponentProxy('service-location').get_services([{'name': '*',
'location': '*'}])
except Exception:
_logger.critical('Unable to reach service-location', exc_info=True)
return
# compile the matchers once rather than on every loop iteration
asf_re = re.compile('alps_script_forker')
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
for service in services:
if re.match(asf_re, service['name']):
# guard against a malformed location so a bad registration entry
# cannot raise AttributeError on a failed match
match = re.match(host_re, service['location'])
loc = match.group('host') if match else None
if loc:
new_forker_locations[service['name']] = loc
updated_forker_list.append(service['name'])
if service['name'] not in self.forker_taskcounts.keys():
self.forker_taskcounts[service['name']] = 0
# Get currently running tasks from forkers. Different loop?
self.forkers = updated_forker_list
self.forker_locations = new_forker_locations
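A quick check of the location parsing above, with a hypothetical SLP
registration entry:

import re

service = {'name': 'alps_script_forker_nid00042',   # hypothetical entry
           'location': 'https://nid00042:12345'}
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
match = re.match(host_re, service['location'])
print(match.group('host'))  # nid00042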
return
# Test Cray-specific utilities/calls.
SYSTEM_CONFIG_ENTRY = """
[system]
size: 10
elogin_hosts: foo:bar
"""
import Cobalt
import TestCobalt
config_file = Cobalt.CONFIG_FILES[0]
config_fp = open(config_file, "w")
config_fp.write(SYSTEM_CONFIG_ENTRY)
config_fp.close()
from nose.tools import raises
from testsuite.TestCobalt.Utilities.assert_functions import assert_match, assert_not_match
from Cobalt.Components.system.CrayNode import CrayNode
......@@ -10,7 +22,6 @@ import Cobalt.Components.system.AlpsBridge as AlpsBridge
from mock import MagicMock, Mock, patch
def is_match(a, b):
return a is b
......@@ -954,3 +965,22 @@ class TestCraySystem(object):
self.assert_draining(i, 550, 2)
for i in [1, 4, 5]:
self.assert_not_draining(i)
def test_validate_job_normal(self):
'''CraySystem.validate_job: valid job submission'''
expected = {'mode':'script', 'nodecount': 1, 'proccount': 1}
spec = {'mode':'script', 'nodecount': 1}
ret_spec = self.system.validate_job(spec)
assert expected == ret_spec, "Invalid spec returned"
@raises(Cobalt.Exceptions.JobValidationError)
def test_validate_job_reject_too_large(self):
'''CraySystem.validate_job: reject too big job'''
spec = {'mode':'script', 'nodecount': 9999}
ret_spec = self.system.validate_job(spec)
@raises(Cobalt.Exceptions.JobValidationError)
def test_validate_job_reject_no_host(self):
'''CraySystem.validate_job: reject missing ssh host'''
spec = {'mode':'interactive', 'nodecount': 1, 'qsub_host':'foo'}
ret_spec = self.system.validate_job(spec)