Commit d9595cc8 authored by Paul Rich's avatar Paul Rich
Browse files

Fixed error in recovering pgroups.

System component restart on the fly should be safe again.  We recover
the process groups properly now.  Found this while testing other changes
in the fix for aggressive cleanup.
parent 2667e70c
......@@ -19,8 +19,6 @@ from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
_logger = logging.getLogger(__name__)
init_cobalt_config()
......@@ -44,9 +42,6 @@ class ALPSProcessGroup(ProcessGroup):
#inherit generic getstate and setstate methods from parent
class CraySystem(BaseSystem):
'''Cray/ALPS-specific system component. Behaviors should go here. Direct
ALPS interaction through BASIL/other APIs should go through the ALPSBridge
......@@ -80,6 +75,8 @@ class CraySystem(BaseSystem):
while bridge_pending:
# purge stale children from prior run. Also ensure the
# system_script_forker is currently up.
# These attempts may fail due to system_script_forker not being up.
# We don't want to trash the statefile in this case.
try:
ALPSBridge.init_bridge()
except ALPSBridge.BridgeError:
......@@ -96,11 +93,10 @@ class CraySystem(BaseSystem):
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
else:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
self.logger.info('pg type %s',
self.process_manager.process_groups.item_cls)
self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
#self.process_manager.forkers.append('alps_script_forker')
self.process_manager.update_launchers()
self.pending_start_timeout = 1200 #20 minutes for long reboots.
self.pending_start_timeout = PENDING_STARTUP_TIMEOUT
_logger.info('PROCESS MANAGER INTIALIZED')
#resource management setup
self.nodes = {} #cray node_id: CrayNode
......@@ -124,8 +120,7 @@ class CraySystem(BaseSystem):
#state update thread and lock
self._node_lock = threading.RLock()
self._gen_node_to_queue()
self.node_update_thread = thread.start_new_thread(self._run_update_state,
tuple())
self.node_update_thread = thread.start_new_thread(self._run_update_state, tuple())
_logger.info('UPDATE THREAD STARTED')
self.current_equivalence_classes = []
self.killing_jobs = {}
......@@ -388,7 +383,7 @@ class CraySystem(BaseSystem):
#resource reservation
cleanup_nodes = [node for node in self.nodes.values()
if node.status == 'cleanup-pending']
#If we have a block marked for cleanup, send a relesae message.
#If we have a block marked for cleanup, send a release message.
released_res_jobids = []
for node in cleanup_nodes:
for alps_res in self.alps_reservations.values():
......
......@@ -48,13 +48,10 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
else:
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
_logger.info("%s", state['process_groups'])
for pgroup in state['process_groups']:
pg = self.process_groups.item_cls().__setstate__(pgroup)
self.process_groups[pg.id] = pg
self.process_groups.q_add(state['process_groups'])
self.process_groups = state.get('process_groups',
ProcessGroupDict())
for pg in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pg.id, pg.jobid)
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
......@@ -64,8 +61,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
def __getstate__(self):
    '''Serialize process-manager state for the component statefile.

    Returns a dict with:
        process_groups -- the ProcessGroupDict itself; its entries are
            serialized by the state-dumping machinery downstream.
        next_pg_id -- the next process-group id to hand out, so id
            generation resumes correctly after a restart.

    Note: an earlier revision stored
    [pg.__getstate__ for pg in self.process_groups.values()] here, which
    (a) captured bound methods rather than calling them, and (b) was a
    dead store immediately overwritten by the assignment below.  That
    line is removed.
    '''
    state = {}
    state['process_groups'] = self.process_groups
    # +1 so the recovered generator never re-issues the last id in use.
    state['next_pg_id'] = self.process_groups.id_gen.idnum + 1
    return state
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment