Commit ee252094 authored by Paul Rich's avatar Paul Rich
Browse files

Addressing comments on this code on review. Fix for statefile still pending.

parent 3d1bb7b9
......@@ -121,14 +121,12 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
if len(ordered_forkers) < 0:
raise RuntimeError("Job %s: No forkers registered!", jobid)
else:
count = 0
for forker in ordered_forkers:
if self.forker_reachable[forker]:
selected_forker = ordered_forkers[count]
selected_forker = forker
self.forker_taskcounts[selected_forker] += 1
_logger.info("Job %s using forker %s", jobid, selected_forker)
break
count += 1
if selected_forker is None:
# We didn't find a forker, raise a RuntimeError for this
raise RuntimeError("Job %s: No valid forkers found!" % jobid)
......@@ -344,6 +342,8 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
updated_forker_list = []
new_forker_locations = {}
found_services = []
asf_re = re.compile('alps_script_forker')
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
try:
services = ComponentProxy('service-location').get_services([{'name': '*', 'location': '*'}])
except Exception:
......@@ -353,8 +353,6 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
_logger.critical('Unable to reach service-location', exc_info=True)
return
for service in services:
asf_re = re.compile('alps_script_forker')
host_re = re.compile(r'https://(?P<host>.*):[0-9]*')
if re.match(asf_re, service['name']):
found_services.append(service)
loc = re.match(host_re, service['location']).group('host')
......@@ -365,8 +363,9 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.forker_taskcounts[service['name']] = 0
_logger.info('Forker %s found', service['name'])
# Get currently running tasks from forkers. Different loop?
self.forkers = updated_forker_list
self.forker_locations = new_forker_locations
for service_name in self.forker_taskcounts.keys():
self.forker_reachable[service_name] = service_name in [fs['name'] for fs in found_services]
with self.process_groups_lock:
self.forkers = updated_forker_list
self.forker_locations = new_forker_locations
for service_name in self.forker_taskcounts.keys():
self.forker_reachable[service_name] = service_name in [fs['name'] for fs in found_services]
return
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment