Commit 88e8325f authored by Paul Rich

Finish putting in process group locking. The timeout is not enough.

parent 3e23a769
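For orientation, a minimal sketch of the pattern this change completes: guard process-group creation and status updates with one shared lock instead of a fixed sleep. Only the names ProcessGroupManager, process_groups_lock, init_groups, and update_groups come from the diff below; the lock type and the method bodies are assumptions for illustration.

import threading

class ProcessGroupManager(object):
    """Sketch only; the real component tracks forker-backed process groups."""

    def __init__(self):
        # Assumed re-entrant so callers that already hold the lock can
        # invoke update_groups() without deadlocking.
        self.process_groups_lock = threading.RLock()
        self.process_groups = {}

    def init_groups(self, specs):
        # Creation also triggers a forker call in the real component, so it
        # runs under the same lock that update_groups() uses, giving both
        # sides a consistent view of the group table.
        with self.process_groups_lock:
            for spec in specs:
                self.process_groups[spec['id']] = dict(spec, exit_status=None)
            return list(self.process_groups.values())

    def update_groups(self):
        # Poll for finished groups under the lock; no sleep-based settling.
        with self.process_groups_lock:
            return [pg for pg in self.process_groups.values()
                    if pg['exit_status'] is not None]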
"""Resource management for Cray ALPS based systems"""
import logging
import threading
import thread
@@ -1256,15 +1255,12 @@ class CraySystem(BaseSystem):
            if alps_res is not None:
                spec['alps_res_id'] = alps_res.alps_res_id
        new_pgroups = self.process_manager.init_groups(specs)
-       _logger.debug('init groups')
        for pgroup in new_pgroups:
            _logger.info('%s: process group %s created to track job status',
                    pgroup.label, pgroup.id)
            #check resource reservation, and attempt to start. If there's a
            #failure here, set exit status in process group to a sentinel value.
            try:
-               _logger.debug('starting groups')
-               time.sleep(30)
                started = self.process_manager.start_groups([pgroup.id])
            except ComponentLookupError:
                _logger.error("%s: failed to contact the %s component",
@@ -1297,7 +1293,6 @@ class CraySystem(BaseSystem):
                pgroup.exit_status = 255
                self.reserve_resources_until(pgroup.location, None,
                        pgroup.jobid)
        end_apg_timer = time.time()
        self.logger.debug("add_process_groups startup time: %s sec", (end_apg_timer - start_apg_timer))
        return new_pgroups
@@ -1344,11 +1339,12 @@
        is called.
        '''
-       completed_pgs = self.process_manager.update_groups()
-       for pgroup in completed_pgs:
-           _logger.info('%s: process group reported as completed with status %s',
-                   pgroup.label, pgroup.exit_status)
-           self.reserve_resources_until(pgroup.location, None, pgroup.jobid)
+       with self.process_manager.process_groups_lock:
+           completed_pgs = self.process_manager.update_groups()
+           for pgroup in completed_pgs:
+               _logger.info('%s: process group reported as completed with status %s',
+                       pgroup.label, pgroup.exit_status)
+               self.reserve_resources_until(pgroup.location, None, pgroup.jobid)
        return

    @exposed
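Worth noting about the hunk above: taking process_manager.process_groups_lock here only works if the lock is re-entrant, because update_groups() acquires the same lock internally (second file below). A minimal caller-side sketch under that assumption; the standalone function and its parameter names are illustrative, not Cobalt's API:

def release_completed(system, process_manager):
    # Illustrative only. Wrapping the whole sequence in the manager's lock
    # keeps "fetch completed groups" and "release their resources" atomic
    # with respect to a concurrent add_process_groups. This deadlocks unless
    # process_groups_lock is re-entrant (e.g. threading.RLock), since
    # update_groups() takes the same lock again internally.
    with process_manager.process_groups_lock:
        for pgroup in process_manager.update_groups():
            system.reserve_resources_until(pgroup.location, None, pgroup.jobid)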
@@ -2,7 +2,6 @@
"""
import logging
import time
import Queue
@@ -162,10 +161,8 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
        # Hold for update. Process group addition also results in a forker call, so we need to lock that, too,
        # so we have a consistent view.
        with self.process_groups_lock:
-           _logger.debug('updating groups')
            now = int(time.time())
            for forker in self.forkers:
-               completed[forker] = []
                try:
                    child_data = ComponentProxy(forker).get_children("process group", None)
                except ComponentLookupError, e:
@@ -174,6 +171,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
                    _logger.error("unexpected exception while getting a list of children from the %s component",
                            forker, exc_info=True)
                else:
+                   completed[forker] = []
                    for child in child_data:
                        children[(forker, child['id'])] = child
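The last hunk moves completed[forker] = [] into the else branch, so a forker that cannot be contacted gets no entry at all for this pass instead of an empty list that downstream code could read as "this forker has no surviving children". A small sketch of that shape, with hypothetical helper and variable names:

def poll_forkers(forkers, get_children):
    """Sketch: collect per-forker child lists, skipping unreachable forkers."""
    completed = {}
    children = {}
    for forker in forkers:
        try:
            child_data = get_children(forker)
        except Exception:
            # Unreachable forker: leave it out of `completed` entirely so its
            # process groups are not mistaken for "finished with no children".
            continue
        completed[forker] = []
        for child in child_data:
            children[(forker, child['id'])] = child
    return completed, children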