Commit 76c466ca authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch '101-fix-disabled-forkers' into 'develop'

Forkers that have unregistered will no longer be used to start jobs.

Closes #101

See merge request aig/cobalt!67
parents a9702838 55c4bec8
......@@ -122,9 +122,17 @@ class CraySystem(BaseSystem):
if spec is None:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
else:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
#self.process_manager.forkers.append('alps_script_forker')
spec_version = spec.get('alps_system_statefile_version', 1)
if spec_version <= 1:
# Compat for old version of process manager information that was stored as a dict
# rather than an actual object. Yes, this results in a double initialize. Ugly, but
# doesn't hurt anything, yet.
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
else:
self.process_manager = spec['process_manager']
self.process_manager.pgroup_type = ALPSProcessGroup
self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
self.process_manager.update_launchers()
self.pending_start_timeout = PENDING_STARTUP_TIMEOUT
_logger.info('PROCESS MANAGER INTIALIZED')
......@@ -164,14 +172,15 @@ class CraySystem(BaseSystem):
information'''
state = {}
state.update(super(CraySystem, self).__getstate__())
state['alps_system_statefile_version'] = 1
state['process_manager'] = self.process_manager.__getstate__()
state['alps_system_statefile_version'] = 2
state['process_manager'] = self.process_manager
state['alps_reservations'] = self.alps_reservations
state['node_info'] = self.nodes
return state
def __setstate__(self, state):
start_time = time.time()
_logger.info('INITIALIZING FROM ALPS SYSTEM STATE FILE VERSION %s', state.get('alps_system_statefile_version', None))
super(CraySystem, self).__setstate__(state)
_logger.info('BASE SYSTEM INITIALIZED')
self._common_init_restart(state)
......@@ -1277,7 +1286,11 @@ class CraySystem(BaseSystem):
alps_res = self.alps_reservations.get(str(spec['jobid']), None)
if alps_res is not None:
spec['alps_res_id'] = alps_res.alps_res_id
new_pgroups = self.process_manager.init_groups(specs)
try:
new_pgroups = self.process_manager.init_groups(specs)
except RuntimeError:
_logger.error('Job %s: Unable to initialize process group.', spec['jobid'])
raise
for pgroup in new_pgroups:
_logger.info('%s: process group %s created to track job status',
pgroup.label, pgroup.id)
......
......@@ -6,6 +6,7 @@ import logging
import time
import Queue
import re
import xmlrpclib
from threading import RLock
from Cobalt.Proxy import ComponentProxy
from Cobalt.DataTypes.ProcessGroup import ProcessGroup, ProcessGroupDict
......@@ -40,8 +41,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
else:
self.process_groups = state.get('process_groups',
ProcessGroupDict())
self.process_groups = state.get('process_groups', ProcessGroupDict())
for pgroup in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pgroup.id, pgroup.jobid)
self.process_groups.id_gen.set(int(state['next_pg_id']))
......@@ -49,6 +49,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.forkers = [] #list of forker identifiers to use with ComponentProxy
self.forker_taskcounts = {} # dict of forkers and counts of pgs attached
self.forker_locations = {} # dict of forkers a tuple (host, port)
self.forker_reachable = {} # Is the forker currently reachable?
self.remote_qsub_hosts = [] # list of hosts that qsub -I requires
# ssh-ing to a forker host
self.process_groups_lock = RLock()
......@@ -67,12 +68,14 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
def __getstate__(self):
state = {}
state['pgroup_type'] = self.pgroup_type
state['process_groups'] = self.process_groups
state['next_pg_id'] = self.process_groups.id_gen.idnum + 1
return state
def __setstate__(self, state):
    '''Restore manager state from a statefile dict.

    Re-reads configuration, restores the process-group class (falling back
    to the base ProcessGroup for old statefiles), and runs the shared
    init/restart path.  Returns ``self`` so callers can chain construction
    and restore in a single expression.
    '''
    self._init_config_vars()
    restored_type = state.get('pgroup_type', ProcessGroup)
    self.pgroup_type = restored_type
    self._common_init_restart(state)
    return self
......@@ -91,17 +94,45 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
# modify the forker in specs to force the job to round-robbin forkers
with self.process_groups_lock:
for spec in specs:
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) < 0:
raise RuntimeError("No forkers registered!")
else:
spec['forker'] = ordered_forkers[0] #this is now a tuple
self.forker_taskcounts[spec['forker']] += 1
_logger.info("Job %s using forker %s", spec['jobid'], spec['forker'])
try:
spec['forker'] = self._select_forker(spec['jobid'])
except RuntimeError:
_logger.error('Job %s: Unable to find valid forker to associate with pending process group. Failing startup.',
spec['jobid'])
raise
return self.process_groups.q_add(specs)
def signal_groups(self, pgids, signame="SIGINT"):
def _select_forker(self, jobid):
'''Select a forker from the list of registered forkers for job execution.
This favors the forker with the lowest current running jobcount.
Args:
jobid - jobid for ProcessGroup object that we are assigning a forker to.
Returns:
String name of forker to use. If none found, None returned
Exceptions:
Raises a RuntimeError if there are no registered forkers, or none otherwise available.
'''
selected_forker = None
ordered_forkers = [f[0] for f in sorted(self.forker_taskcounts.items(), key=lambda x: x[1])]
if len(ordered_forkers) < 0:
raise RuntimeError("Job %s: No forkers registered!", jobid)
else:
for forker in ordered_forkers:
if self.forker_reachable[forker]:
selected_forker = forker
self.forker_taskcounts[selected_forker] += 1
_logger.info("Job %s using forker %s", jobid, selected_forker)
break
if selected_forker is None:
# We didn't find a forker, raise a RuntimeError for this
raise RuntimeError("Job %s: No valid forkers found!" % jobid)
return selected_forker
def signal_groups(self, pgids, signame="SIGTERM"):
'''Send signal with signame to a list of process groups.
Returns:
......@@ -136,14 +167,25 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
with self.process_groups_lock:
started = []
for pg_id in pgids:
process_group = self.process_groups[pg_id]
try:
self.process_groups[pg_id].start()
except ProcessGroupStartupError:
_logger.error("%s: Unable to start process group.",
self.process_groups[pg_id].label)
process_group.start()
except ComponentLookupError:
# Retry this with a different forker, if we run out of forkers, then this startup fails.
self.forker_reachable[process_group.forker] = False
self.forker_taskcounts[process_group.forker] -= 1 #decrement since we failed to use this forker.
try:
process_group.forker = self._select_forker(process_group.jobid)
except RuntimeError as err:
#No forkers left!
_logger.critical('%s: Unable to assign forker to starting job. Failing startup: %s',
process_group.label, err.message)
raise ProcessGroupStartupError('No functional forkers.')
except (ProcessGroupStartupError, xmlrpclib.Fault, xmlrpclib.ProtocolError):
_logger.error("%s: Unable to start process group.", process_group.label)
else:
started.append(pg_id)
self.process_groups[pg_id].startup_timeout = 0
process_group.startup_timeout = 0
return started
#make automatic get final status of process group
......@@ -281,34 +323,49 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
alps_forkers. Drop entries that slp doesn't know about and add in ones
that it does.
Will want to run this in the normal update loop
Args: None
Returns: None
If we have no launchers, we should prevent jobs from starting.
Side Effects:
Updates current active forkers. If a new forker is found this is
added to the list we can select from. If a loss-of-contact is
detected, by a forker being unregistered with SLP, then the forker
data will be retained for possible reconnection, while it will at
the same time be marked as unavailable for selection.
resets the internal forker list to an updated list based on SLP registry
Notes: This runs as a part of the state update driver loop and is
invoked by a system component class.
return is void
'''
updated_forker_list = []
new_forker_locations = {}
found_services = []
asf_re = re.compile('alps_script_forker')
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
try:
services = ComponentProxy('service-location').get_services([{'name': '*',
'location': '*'}])
services = ComponentProxy('service-location').get_services([{'name': '*', 'location': '*'}])
except Exception:
# SLP is down! We can't contact anybody at all
for forker in self.forker_reachable.keys():
self.forker_reachable[forker] = False
_logger.critical('Unable to reach service-location', exc_info=True)
return
for service in services:
asf_re = re.compile('alps_script_forker')
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
if re.match(asf_re, service['name']):
found_services.append(service)
loc = re.match(host_re, service['location']).group('host')
if loc:
new_forker_locations[service['name']] = loc
updated_forker_list.append(service['name'])
if service['name'] not in self.forker_taskcounts.keys():
self.forker_taskcounts[service['name']] = 0
_logger.info('Forker %s found', service['name'])
# Get currently running tasks from forkers. Different loop?
self.forkers = updated_forker_list
self.forker_locations = new_forker_locations
with self.process_groups_lock:
self.forkers = updated_forker_list
self.forker_locations = new_forker_locations
for service_name in self.forker_taskcounts.keys():
self.forker_reachable[service_name] = service_name in [fs['name'] for fs in found_services]
return
"""Contains the ProcessGroup and ProcessGroupDict Data Types"""
__revision__ = "$Revision$"
import logging
import signal
import xmlrpclib
from Cobalt.Data import Data, DataDict, IncrID
from Cobalt.Exceptions import DataCreationError, ProcessGroupStartupError
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Proxy import ComponentProxy
__revision__ = "$Revision$"
_logger = logging.getLogger()
#Get a list of valid signal strings
SIGNALS = [ s for s in signal.__dict__.keys()
SIGNALS = [s for s in signal.__dict__.keys()
if (s.startswith("SIG") and not s.startswith("SIG_"))]
class ProcessGroup(Data):
......@@ -35,7 +33,7 @@ class ProcessGroup(Data):
location -- location in system where job will run
mode -- "script" or other
nodefile -- used to make a file listing locations that job can run
size --
size -- allocated resource size (usually nodecount)
state -- "running" or "terminated"
stderr -- file to use for stderr of script
stdin -- file to use for stdin of script
......@@ -136,13 +134,23 @@ class ProcessGroup(Data):
data = self.prefork()
self.head_pid = ComponentProxy(self.forker, retry=False).fork([self.executable] + self.args, self.tag,
"Job %s/%s/%s" %(self.jobid, self.user, self.id), self.env, data, self.runid)
except ComponentLookupError:
_logger.error('Unable to reach %s component.', self.forker)
raise
except xmlrpclib.Fault as fault:
_logger.error('XMLRPC fault from %s: code: %s string %s', self.forker, fault.faultCode, fault.faultString)
raise
except xmlrpclib.ProtocolError as err:
_logger.error('Protocol Error while contacting %s. code: %s msg: %s headers: %s', self.forker, err.errcode,
err.errmsg, err.headers)
raise
except:
err = "Job %s/%s/%s: problem forking; %s did not return a child id" % (self.jobid,
self.user, self.id, self.forker)
_logger.error(err)
raise ProcessGroupStartupError(err)
def signal(self, signame="SIGINT"):
def signal(self, signame="SIGTERM"):
'''Validate and send signal to ProcessGroup. Consult your system and
python documentation for valid signals to send.
......
......@@ -2,10 +2,15 @@
import time
import logging
import sys
from nose.tools import raises
from mock import Mock, MagicMock, patch
import Cobalt.Proxy
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
from testsuite.TestCobalt.Utilities.assert_functions import assert_match, assert_not_match
def is_match(a, b):
    """Return True when *a* and *b* are the very same object (identity)."""
    same_object = a is b
    return same_object
default_child_data = [{'id': 1}]
......@@ -24,7 +29,24 @@ class InspectMock(MagicMock):
return MagicMock(return_value=1)
return super(InspectMock, self).__getattr__(attr)
_loc_list = [{'name': 'system', 'location': 'https://localhost:52140'},
{'name': 'system_script_forker', 'location': 'https://localhost:49242'},
{'name': 'alps_script_forker_localhost_0', 'location': 'https://localhost:39303'},
{'name': 'alps_script_forker_localhost_1', 'location': 'https://localhost:39304'},
{'name': 'scheduler', 'location': 'https://localhost:41740'},
{'name': 'queue-manager', 'location': 'https://localhost:50308'}
]
class ServicesMock(MagicMock):
    '''Fake service-location component exposing multiple forkers.

    ``get_services`` always returns the module-level ``_loc_list``, so tests
    can rebind that global to simulate forkers registering and unregistering
    between ``update_launchers`` calls.  All other attribute access falls
    through to normal MagicMock behavior.

    Note: the original no-op ``__init__`` override (which only delegated to
    super) has been removed as dead weight; construction is unchanged.
    '''

    def __getattr__(self, attr):
        # Only get_services is specialized; read _loc_list at call time so
        # tests that swap the global see the updated registry.
        if attr == 'get_services':
            return MagicMock(return_value=_loc_list)
        return super(ServicesMock, self).__getattr__(attr)
class TestProcessManager(object):
'''tests for the base project manager'''
......@@ -37,21 +59,89 @@ class TestProcessManager(object):
}
self.process_manager = ProcessGroupManager()
self.process_manager.forkers = ['forker1']
self.process_manager.forker_taskcounts = {'forker1':0}
self.process_manager.forker_taskcounts = {'forker1': 0}
self.process_manager.forker_reachable = {'forker1': True}
def teardown(self):
    '''common teardown for process group tests'''
    # Drop per-test fixtures so no state leaks between tests.
    del self.process_manager
    del self.base_spec
def test_process_manager_init_groups_single(self):
def test_init_groups_single(self):
    '''ProcessGroupManager.init_groups: create a process group and add to process manager'''
    self.process_manager.init_groups([self.base_spec])
    created = self.process_manager.process_groups.get(1, None)
    assert created is not None, "process group not created"
    assert created.forker == 'forker1', "forker not set"
def test_init_groups_multiple(self):
    '''ProcessGroupManager.init_groups: select a forker for pgroup'''
    # Two equally-loaded, reachable forkers: the first one must be chosen
    # and its taskcount bumped, leaving the other untouched.
    self.process_manager.forkers = ['forker1', 'forker2']
    self.process_manager.forker_taskcounts = {'forker1': 0, 'forker2': 0}
    self.process_manager.forker_reachable = {'forker1': True, 'forker2': True}
    self.process_manager.init_groups([self.base_spec])
    pgroups = self.process_manager.process_groups
    counts = self.process_manager.forker_taskcounts
    assert pgroups.get(1, None) is not None, "process group not created"
    assert_match(pgroups[1].forker, 'forker1', "Incorrect forker set")
    assert_match(counts['forker1'], 1, "wrong taskcount set")
    assert_match(counts['forker2'], 0, "wrong forker taskcount modified")
def test_init_groups_choose_lowest(self):
    '''ProcessGroupManager.init_groups: choose lightest forker load'''
    # forker1 already carries two tasks, so the idle forker2 must win.
    self.process_manager.forkers = ['forker1', 'forker2']
    self.process_manager.forker_taskcounts = {'forker1': 2, 'forker2': 0}
    self.process_manager.forker_reachable = {'forker1': True, 'forker2': True}
    self.process_manager.init_groups([self.base_spec])
    pgroups = self.process_manager.process_groups
    counts = self.process_manager.forker_taskcounts
    assert pgroups.get(1, None) is not None, "process group not created"
    assert_match(pgroups[1].forker, 'forker2', "Incorrect forker set")
    assert_match(counts['forker2'], 1, "wrong taskcount set")
    assert_match(counts['forker1'], 2, "wrong forker taskcount modified")
def test_init_groups_round_robin(self):
    '''ProcessGroupManager.init_groups: spread across multiple forkers'''
    first_spec = dict(self.base_spec)
    second_spec = dict(self.base_spec)
    second_spec['jobid'] = 2
    second_spec['location'] = 'loc2'
    self.process_manager.forkers = ['forker1', 'forker2']
    self.process_manager.forker_taskcounts = {'forker1': 0, 'forker2': 0}
    self.process_manager.forker_reachable = {'forker1': True, 'forker2': True}
    pgroups = self.process_manager.process_groups
    counts = self.process_manager.forker_taskcounts
    # First job lands on forker1...
    self.process_manager.init_groups([first_spec])
    assert pgroups.get(1, None) is not None, "process group not created"
    assert_match(pgroups[1].forker, 'forker1', "Incorrect forker set")
    assert_match(counts['forker1'], 1, "wrong taskcount set")
    assert_match(counts['forker2'], 0, "wrong forker taskcount modified")
    # ...and the second must round-robin onto the now lighter forker2.
    self.process_manager.init_groups([second_spec])
    assert pgroups.get(2, None) is not None, "process group not created"
    assert_match(pgroups[2].forker, 'forker2', "Incorrect forker set")
    assert_match(counts['forker2'], 1, "wrong taskcount set")
    assert_match(counts['forker1'], 1, "wrong forker taskcount modified")
def test_init_groups_select_reachable(self):
    '''ProcessGroupManager.init_groups: select only reachable forker'''
    first_specs = [self.base_spec]
    second_specs = [dict(self.base_spec)]
    second_specs[0]['jobid'] = 2
    second_specs[0]['location'] = 'loc2'
    # forker2 is idle but unreachable: the loaded forker1 must be chosen
    # for both jobs anyway.
    self.process_manager.forkers = ['forker1', 'forker2']
    self.process_manager.forker_taskcounts = {'forker1': 2, 'forker2': 0}
    self.process_manager.forker_reachable = {'forker1': True, 'forker2': False}
    pgroups = self.process_manager.process_groups
    counts = self.process_manager.forker_taskcounts
    self.process_manager.init_groups(first_specs)
    assert pgroups.get(1, None) is not None, "process group not created"
    assert_match(pgroups[1].forker, 'forker1', "Incorrect forker set")
    assert_match(counts['forker2'], 0, "wrong taskcount set")
    assert_match(counts['forker1'], 3, "wrong forker taskcount modified")
    self.process_manager.init_groups(second_specs)
    assert pgroups.get(2, None) is not None, "process group not created"
    assert_match(pgroups[2].forker, 'forker1', "Incorrect forker set")
    assert_match(counts['forker2'], 0, "wrong taskcount set")
    assert_match(counts['forker1'], 4, "wrong forker taskcount modified")
@patch.object(Cobalt.Proxy.DeferredProxyMethod, '__call__', return_value=1)
def test_process_manager_start_groups_single(self, *args, **kwargs):
def test_start_groups_single(self, *args, **kwargs):
'''ProcessGroupManager.start_groups: start up a single process group'''
self.base_spec['startup_timeout'] = 120
self.process_manager.init_groups([self.base_spec])
......@@ -61,8 +151,41 @@ class TestProcessManager(object):
assert self.process_manager.process_groups[1].startup_timeout == 0, (
"startup_timeout not reset")
@patch.object(Cobalt.Proxy.DeferredProxyMethod, '__call__', return_value=1,
              side_effect=[Cobalt.Exceptions.ComponentLookupError('failed lookup'), 1])
def test_start_groups_one_bad_forker(self, *args, **kwargs):
    '''ProcessGroupManager.start_groups: switch forker for failure'''
    self.base_spec['startup_timeout'] = 120
    self.process_manager.forkers = ['forker1', 'forker2']
    self.process_manager.forker_taskcounts = {'forker1': 0, 'forker2': 1}
    self.process_manager.forker_reachable = {'forker1': True, 'forker2': True}
    self.process_manager.init_groups([self.base_spec])
    # The first start_groups call consumes the ComponentLookupError side
    # effect, marking forker1 unreachable and reassigning the group to
    # forker2; the second call actually starts it.
    started = self.process_manager.start_groups([1])
    started = self.process_manager.start_groups([1])
    pgroup = self.process_manager.process_groups[1]
    counts = self.process_manager.forker_taskcounts
    assert len(started) == 1, "started %s groups, should have started 1" % len(started)
    assert sorted(started) == [1], "wrong groups started."
    assert pgroup.startup_timeout == 0, (
        "startup_timeout not reset")
    assert_match(pgroup.forker, 'forker2', "Wrong forker selected")
    assert_match(counts['forker1'], 0, "Wrong count forker1")
    assert_match(counts['forker2'], 2, "Wrong count forker2")
@patch.object(Cobalt.Proxy.DeferredProxyMethod, '__call__', return_value=1,
              side_effect=Cobalt.Exceptions.ComponentLookupError('failed lookup'))
@raises(Cobalt.Exceptions.ProcessGroupStartupError)
def test_start_groups_no_forkers(self, *args, **kwargs):
    '''ProcessGroupManager.start_groups: ProcessGroupStartupError if no forkers reachable'''
    # Docstring fixed: the @raises decorator expects ProcessGroupStartupError,
    # not RuntimeError as previously claimed.
    self.base_spec['startup_timeout'] = 120
    self.process_manager.forkers = ['forker1', 'forker2']
    self.process_manager.forker_taskcounts = {'forker1': 0, 'forker2': 1}
    self.process_manager.forker_reachable = {'forker1': True, 'forker2': True}
    self.process_manager.init_groups([self.base_spec])
    # Every proxy call raises ComponentLookupError: the first attempt marks
    # forker1 unreachable, the second exhausts forker2, so the startup must
    # surface ProcessGroupStartupError (checked by @raises).  The return
    # values are irrelevant here, so the dead "started =" assignments were
    # dropped.
    self.process_manager.start_groups([1])
    self.process_manager.start_groups([1])
@patch('Cobalt.Proxy.DeferredProxy', side_effect=InspectMock)
def test_process_manager_update_groups_timeout(self, *args, **kwargs):
def test_update_groups_timeout(self, *args, **kwargs):
'''ProcessGroupManager.update_groups: startup timeout respected.'''
now = int(time.time())
pgroups = self.process_manager.process_groups
......@@ -75,9 +198,8 @@ class TestProcessManager(object):
assert pgroups[1].startup_timeout == now + 120, (
"bad startup timeout: %s" % pgroups[1].startup_timeout)
@patch('Cobalt.Proxy.DeferredProxy', side_effect=InspectMock)
def test_process_manager_update_groups_timeout_exceeded(self, *args, **kwargs):
def test_update_groups_over_timeout(self, *args, **kwargs):
'''ProcessGroupManager.update_groups: startup timeout exceeded.'''
now = int(time.time())
pgroups = self.process_manager.process_groups
......@@ -87,3 +209,75 @@ class TestProcessManager(object):
pgroups = self.process_manager.process_groups
assert len(pgroups) == 0, "%s groups, should have 0" % len(pgroups)
assert sorted(pgroups.keys()) == [], "groups should be empty"
class TestPMUpdateLaunchers(object):
    '''Tests for ProcessGroupManager.update_launchers forker tracking.

    These tests rebind the module-level _loc_list global (which ServicesMock
    serves from get_services) to simulate forkers registering, dropping out
    of, and re-registering with the service locator.
    '''

    def setup(self):
        '''common setup for process group tests'''
        # Remember the pristine registry so each test can restore it.
        global _loc_list
        self.init_loc_list = _loc_list

    def teardown(self):
        '''common teardown for process group tests'''
        global _loc_list
        _loc_list = self.init_loc_list

    @patch('Cobalt.Proxy.DeferredProxy', side_effect=ServicesMock)
    def test_update_launchers_register(self, *args, **kwargs):
        '''ProcessGroupManager.update_launchers: register new launcher'''
        pgm = ProcessGroupManager()  # implicit update_launchers call on init.
        expected_forkers = ['alps_script_forker_localhost_0', 'alps_script_forker_localhost_1']
        expected_locations = {'alps_script_forker_localhost_1': 'localhost',
                              'alps_script_forker_localhost_0': 'localhost'}
        expected_reachable = {'alps_script_forker_localhost_1': True,
                              'alps_script_forker_localhost_0': True}
        assert_match(pgm.forkers, expected_forkers, 'Forker list mismatch')
        assert_match(pgm.forker_locations, expected_locations, 'Incorrect forker locations')
        assert_match(pgm.forker_reachable, expected_reachable, 'Incorrect forker reachable')

    @patch('Cobalt.Proxy.DeferredProxy', side_effect=ServicesMock)
    def test_update_launchers_unregister(self, *args, **kwargs):
        '''ProcessGroupManager.update_launchers: detect down forker'''
        pgm = ProcessGroupManager()  # implicit update_launchers call on init.
        # Drop forker_0 from the registry: it must vanish from the active
        # forker/location lists but remain tracked as unreachable.
        global _loc_list
        _loc_list = [{'name': 'system', 'location': 'https://localhost:52140'},
                     {'name': 'system_script_forker', 'location': 'https://localhost:49242'},
                     {'name': 'alps_script_forker_localhost_1', 'location': 'https://localhost:39304'},
                     {'name': 'scheduler', 'location': 'https://localhost:41740'},
                     {'name': 'queue-manager', 'location': 'https://localhost:50308'}
                     ]
        pgm.update_launchers()
        assert_match(pgm.forkers, ['alps_script_forker_localhost_1'],
                     'Forker list mismatch')
        assert_match(pgm.forker_locations,
                     {'alps_script_forker_localhost_1': 'localhost'},
                     'Incorrect forker locations')
        assert_match(pgm.forker_reachable,
                     {'alps_script_forker_localhost_1': True,
                      'alps_script_forker_localhost_0': False},
                     'Incorrect forker reachable')

    @patch('Cobalt.Proxy.DeferredProxy', side_effect=ServicesMock)
    def test_update_launchers_reregister(self, *args, **kwargs):
        '''ProcessGroupManager.update_launchers: detect forker reregister'''
        pgm = ProcessGroupManager()  # implicit update_launchers call on init.
        # First drop forker_0, then restore the full registry and confirm
        # that a re-registered forker becomes reachable again.
        global _loc_list
        _loc_list = [{'name': 'system', 'location': 'https://localhost:52140'},
                     {'name': 'system_script_forker', 'location': 'https://localhost:49242'},
                     {'name': 'alps_script_forker_localhost_1', 'location': 'https://localhost:39304'},
                     {'name': 'scheduler', 'location': 'https://localhost:41740'},
                     {'name': 'queue-manager', 'location': 'https://localhost:50308'}
                     ]
        pgm.update_launchers()
        _loc_list = self.init_loc_list
        pgm.update_launchers()
        assert_match(pgm.forkers,
                     ['alps_script_forker_localhost_0', 'alps_script_forker_localhost_1'],
                     'Forker list mismatch')
        assert_match(pgm.forker_locations,
                     {'alps_script_forker_localhost_1': 'localhost',
                      'alps_script_forker_localhost_0': 'localhost'},
                     'Incorrect forker locations')
        assert_match(pgm.forker_reachable,
                     {'alps_script_forker_localhost_1': True,
                      'alps_script_forker_localhost_0': True},
                     'Incorrect forker reachable')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment