Commit 079d16a5 authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch '26-fix-overly-aggressive-cleanup' into 'master'

Resolve "FIX: Overly Aggressive Cleanup"

Closes #26

See merge request !11
parents 2667e70c 1319037d
......@@ -19,8 +19,6 @@ from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
_logger = logging.getLogger(__name__)
init_cobalt_config()
......@@ -44,9 +42,6 @@ class ALPSProcessGroup(ProcessGroup):
#inherit generic getstate and setstate methods from parent
class CraySystem(BaseSystem):
'''Cray/ALPS-specific system component. Behaviors should go here. Direct
ALPS interaction through BASIL/other APIs should go through the ALPSBridge
......@@ -80,6 +75,8 @@ class CraySystem(BaseSystem):
while bridge_pending:
# purge stale children from prior run. Also ensure the
# system_script_forker is currently up.
# These attempts may fail due to system_script_forker not being up.
# We don't want to trash the statefile in this case.
try:
ALPSBridge.init_bridge()
except ALPSBridge.BridgeError:
......@@ -96,11 +93,10 @@ class CraySystem(BaseSystem):
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
else:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
self.logger.info('pg type %s',
self.process_manager.process_groups.item_cls)
self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
#self.process_manager.forkers.append('alps_script_forker')
self.process_manager.update_launchers()
self.pending_start_timeout = 1200 #20 minutes for long reboots.
self.pending_start_timeout = PENDING_STARTUP_TIMEOUT
_logger.info('PROCESS MANAGER INTIALIZED')
#resource management setup
self.nodes = {} #cray node_id: CrayNode
......@@ -124,8 +120,7 @@ class CraySystem(BaseSystem):
#state update thread and lock
self._node_lock = threading.RLock()
self._gen_node_to_queue()
self.node_update_thread = thread.start_new_thread(self._run_update_state,
tuple())
self.node_update_thread = thread.start_new_thread(self._run_update_state, tuple())
_logger.info('UPDATE THREAD STARTED')
self.current_equivalence_classes = []
self.killing_jobs = {}
......@@ -388,7 +383,7 @@ class CraySystem(BaseSystem):
#resource reservation
cleanup_nodes = [node for node in self.nodes.values()
if node.status == 'cleanup-pending']
#If we have a block marked for cleanup, send a relesae message.
#If we have a block marked for cleanup, send a release message.
released_res_jobids = []
for node in cleanup_nodes:
for alps_res in self.alps_reservations.values():
......@@ -1280,23 +1275,30 @@ class ALPSReservation(object):
# fetch reservation information so that we can send kills to
# interactive apruns.
resinfo = ALPSBridge.fetch_reservations()
apids = _find_non_batch_apids(resinfo['reservations'])
apids = _find_non_batch_apids(resinfo['reservations'], self.alps_res_id)
else:
_logger.info('ALPS reservation: %s has no claims left.',
self.alps_res_id)
self.dying = True
return apids
def _find_non_batch_apids(resinfo):
'''Extract apids from non-batch items.'''
def _find_non_batch_apids(resinfo, alps_res_id):
'''Extract apids from non-basil items.'''
apids = []
for alps_res in resinfo:
#wow, this is ugly.
for applications in alps_res['ApplicationArray']:
for application in applications.values():
for app_data in application:
for commands in app_data['CommandArray']:
for command in commands.values():
if command[0]['cmd'] != 'BASIL':
apids.append(app_data['application_id'])
if str(alps_res['reservation_id']) == str(alps_res_id):
#wow, this is ugly. Traversing the XML from BASIL
for applications in alps_res['ApplicationArray']:
for application in applications.values():
for app_data in application:
# applicaiton id is at the app_data level. Multiple
# commands don't normally happen. Maybe in a MPMD job?
# All commands will have the same applicaiton id.
for commands in app_data['CommandArray']:
for command in commands.values():
# BASIL is the indicaiton of a apbasil
# reservation. apruns with the application of
# BASIL would be an error.
if command[0]['cmd'] != 'BASIL':
apids.append(app_data['application_id'])
return apids
......@@ -48,13 +48,10 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
else:
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
_logger.info("%s", state['process_groups'])
for pgroup in state['process_groups']:
pg = self.process_groups.item_cls().__setstate__(pgroup)
self.process_groups[pg.id] = pg
self.process_groups.q_add(state['process_groups'])
self.process_groups = state.get('process_groups',
ProcessGroupDict())
for pg in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pg.id, pg.jobid)
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
......@@ -64,8 +61,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
def __getstate__(self):
state = {}
state['process_groups'] = [pg.__getstate__ for pg in
self.process_groups.values()]
state['process_groups'] = self.process_groups
state['next_pg_id'] = self.process_groups.id_gen.idnum + 1
return state
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment