Commit c3cce338 authored by Paul Rich's avatar Paul Rich
Browse files

Forkers that have unregistered will no longer be used to start jobs.

parent 52329a8d
......@@ -122,9 +122,9 @@ class CraySystem(BaseSystem):
if spec is None:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
self.process_manager = spec['process_manager']
self.process_manager.pgroup_type = ALPSProcessGroup
self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
......@@ -165,7 +165,7 @@ class CraySystem(BaseSystem):
state = {}
state.update(super(CraySystem, self).__getstate__())
state['alps_system_statefile_version'] = 1
state['process_manager'] = self.process_manager.__getstate__()
state['process_manager'] = self.process_manager
state['alps_reservations'] = self.alps_reservations
state['node_info'] = self.nodes
return state
......@@ -1265,7 +1265,11 @@ class CraySystem(BaseSystem):
alps_res = self.alps_reservations.get(str(spec['jobid']), None)
if alps_res is not None:
spec['alps_res_id'] = alps_res.alps_res_id
new_pgroups = self.process_manager.init_groups(specs)
new_pgroups = self.process_manager.init_groups(specs)
except RuntimeError:
_logger.error('Job %s: Unable to initialize process group.', spec['jobid'])
for pgroup in new_pgroups:'%s: process group %s created to track job status',
......@@ -6,6 +6,7 @@ import logging
import time
import Queue
import re
import xmlrpclib
from threading import RLock
from Cobalt.Proxy import ComponentProxy
from Cobalt.DataTypes.ProcessGroup import ProcessGroup, ProcessGroupDict
......@@ -40,8 +41,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
self.process_groups = state.get('process_groups',
self.process_groups = state.get('process_groups', ProcessGroupDict())
for pgroup in self.process_groups.values():'recovering pgroup %s, jobid %s',, pgroup.jobid)
......@@ -49,6 +49,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.forkers = [] #list of forker identifiers to use with ComponentProxy
self.forker_taskcounts = {} # dict of forkers and counts of pgs attached
self.forker_locations = {} # dict of forkers a tuple (host, port)
self.forker_reachable = {} # Is the forker currently reachable?
self.remote_qsub_hosts = [] # list of hosts that qsub -I requires
# ssh-ing to a forker host
self.process_groups_lock = RLock()
......@@ -67,12 +68,14 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
def __getstate__(self):
state = {}
state['pgroup_type'] = self.pgroup_type
state['process_groups'] = self.process_groups
state['next_pg_id'] = self.process_groups.id_gen.idnum + 1
return state
def __setstate__(self, state):
self.pgroup_type = state.get('pgroup_type', ProcessGroup)
return self
......@@ -91,16 +94,44 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
# modify the forker in specs to force the job to round-robbin forkers
with self.process_groups_lock:
for spec in specs:
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) < 0:
raise RuntimeError("No forkers registered!")
spec['forker'] = ordered_forkers[0] #this is now a tuple
self.forker_taskcounts[spec['forker']] += 1"Job %s using forker %s", spec['jobid'], spec['forker'])
spec['forker'] = self._select_forker(spec['jobid'])
except RuntimeError:
_logger.error('Job %s: Unable to find valid forker to associate with pending process group. Failing startup.',
return self.process_groups.q_add(specs)
def _select_forker(self, jobid):
'''Select a forker from the list of registered forkers for job execution.
This favors the forker with the lowest current running jobcount.
jobid - jobid for ProcessGroup object that we are assigning a forker to.
String name of forker to use. If none found, None returned
Raises a RuntimeError if there are no registered forkers, or none otherwise available.
selected_forker = None
ordered_forkers = [f[0] for f in sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) < 0:
raise RuntimeError("Job %s: No forkers registered!", jobid)
for forker in ordered_forkers:
if self.forker_reachable[forker]:
selected_forker = ordered_forkers[0] #this is now a tuple
self.forker_taskcounts[selected_forker] += 1"Job %s using forker %s", jobid, selected_forker)
if selected_forker is None:
# We didn't find a forker, raise a RuntimeError for this
raise RuntimeError("Job %s: No valid forkers found!" % jobid)
return selected_forker
def signal_groups(self, pgids, signame="SIGINT"):
'''Send signal with signame to a list of process groups.
......@@ -136,14 +167,24 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
with self.process_groups_lock:
started = []
for pg_id in pgids:
process_group = self.process_groups[pg_id]
except ProcessGroupStartupError:
_logger.error("%s: Unable to start process group.",
except ComponentLookupError:
# Retry this with a different forker, if we run out of forkers, then this startup fails.
self.forker_reachable[process_group.forker] = False
process_group.forker = self._select_forker(process_group.jobid)
except RuntimeError as err:
#No forkers left!
_logger.critical('%s: Unable to assign forker to starting job. Failing startup: %s',
process_group.label, err.message)
raise ProcessGroupStartupError('No functional forkers.')
except (ProcessGroupStartupError, xmlrpclib.Fault, xmlrpclib.ProtocolError):
_logger.error("%s: Unable to start process group.", process_group.label)
self.process_groups[pg_id].startup_timeout = 0
process_group.startup_timeout = 0
return started
#make automatic get final status of process group
......@@ -281,33 +322,48 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
alps_forkers. Drop entries that slp doesn't know about and add in ones
that it does.
Will want to run this in the normal update loop
Args: None
Returns: None
If we have no launchers, we should prevent jobs from starting.
Side Effects:
Updates current active forkers. If a new forker is found this is
added to the list we can select from. If a loss-of-contact is
detected, by a forker being unregistered with SLP, then the forker
data will be retained for possible reconnection, while it will at
the same time be marked as unavailable for selection.
resets the internal forker list to an updated list based on SLP registry
Notes: This runs as a part of the state update driver loop and is
invoked by a system component class.
return is void
updated_forker_list = []
new_forker_locations = {}
found_services = []
services = ComponentProxy('service-location').get_services([{'name': '*',
'location': '*'}])
services = ComponentProxy('service-location').get_services([{'name': '*', 'location': '*'}])
except Exception:
# SLP is down! We can't contact anybody at all
for forker in self.forker_reachable.keys():
self.forker_reachable[forker] = False
_logger.critical('Unable to reach service-location', exc_info=True)
for service in services:
asf_re = re.compile('alps_script_forker')
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
if re.match(asf_re, service['name']):
loc = re.match(host_re, service['location']).group('host')
if loc:
new_forker_locations[service['name']] = loc
if service['name'] not in self.forker_taskcounts.keys():
self.forker_taskcounts[service['name']] = 0'Forker %s found', service['name'])
for service_name in updated_forker_list:
if service_name not in found_services:
self.forker_reachable[service_name] = service_name in [fs['name'] for fs in found_services]
# Get currently running tasks from forkers. Different loop?
self.forkers = updated_forker_list
self.forker_locations = new_forker_locations
"""Contains the ProcessGroup and ProcessGroupDict Data Types"""
__revision__ = "$Revision$"
import logging
import signal
import xmlrpclib
from Cobalt.Data import Data, DataDict, IncrID
from Cobalt.Exceptions import DataCreationError, ProcessGroupStartupError
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Proxy import ComponentProxy
__revision__ = "$Revision$"
_logger = logging.getLogger()
# Get a list of valid signal name strings (e.g. "SIGINT"), excluding
# handler constants such as SIG_IGN / SIG_DFL.
# (The diff mashup duplicated the list-comprehension head line; keep the
# single, consolidated definition.)
SIGNALS = [s for s in signal.__dict__.keys()
           if (s.startswith("SIG") and not s.startswith("SIG_"))]
class ProcessGroup(Data):
......@@ -35,7 +33,7 @@ class ProcessGroup(Data):
location -- location in system where job will run
mode -- "script" or other
nodefile -- used to make a file listing locations that job can run
size -- allocated resource size (usually nodecount)
state -- "running" or "terminated"
stderr -- file to use for stderr of script
stdin -- file to use for stdin of script
......@@ -136,6 +134,16 @@ class ProcessGroup(Data):
data = self.prefork()
self.head_pid = ComponentProxy(self.forker, retry=False).fork([self.executable] + self.args, self.tag,
"Job %s/%s/%s" %(self.jobid, self.user,, self.env, data, self.runid)
except ComponentLookupError:
_logger.error('Unable to reach %s component.', self.forker)
except xmlrpclib.Fault as fault:
_logger.error('XMLRPC fault from %s: code: %s string %s', self.forker, fault.faultCode, fault.faultString)
except xmlrpclib.ProtocolError as err:
_logger.error('Protocol Error while contacting %s. code: %s msg: %s headers: %s', self.forker, err.errcode,
err.errmsg, err.headers)
err = "Job %s/%s/%s: problem forking; %s did not return a child id" % (self.jobid,
self.user,, self.forker)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment