Commit 40edc37d authored by Paul Rich
Browse files

Merge branch '41-terminal-job-statefile-destruction' into 'master'

Statefiles should survive having a stuck terminal job during restart

Closes #41

See merge request !23
parents cf2d2aea cb45a821
......@@ -2,7 +2,6 @@
# $Id$
'''Cobalt Queue Manager'''
__revision__ = '$Revision$'
#
# TODO:
......@@ -68,7 +67,6 @@ __revision__ = '$Revision$'
# to be non-zero and thus would be a valid exit status if the task was terminated.
#
DEFAULT_FORCE_KILL_DELAY = 5 # (in minutes)
import errno
import logging
......@@ -103,8 +101,10 @@ from Cobalt import accounting
from Cobalt.Statistics import Statistics
from Cobalt.Util import get_config_option, init_cobalt_config
__revision__ = '$Revision$'
init_cobalt_config()
DEFAULT_FORCE_KILL_DELAY = 5 # (in minutes)
CLOB_SIZE = 4096
logger = logging.getLogger(__name__.split('.')[-1])
......@@ -788,7 +788,6 @@ class Job (StateMachine):
self.runid = state.get("runid", None)
#for old statefiles, make sure to update the dependency state on restart:
self.__dep_hold = False
self.update_dep_state()
self.initializing = False
def __task_signal(self, retry = True):
......@@ -3584,6 +3583,18 @@ class QueueManager(Component):
Component.__setstate__(self, state)
self.Queues = state['Queues']
# Jobs are reloaded after queues, so update job dependencies now. If a
# job is already in a terminal state, don't try to transition it.
# FIXME: migrate change here.
for queue in self.Queues.values():
for job in queue.jobs:
try:
job.update_dep_state()
except StateMachineIllegalEventError:
if job.state == 'done':
logger.warning('Job %s/%s: Job in Terminal state found.', job.jobid, job.user)
else:
raise
use_db_jobid_generator = get_cqm_config("use_db_jobid_generator", "False").lower() in Cobalt.Util.config_true_values
self.id_gen = IncrID(use_database = use_db_jobid_generator)
self.id_gen.set(state['next_job_id'], override = True)
......@@ -3625,7 +3636,6 @@ class QueueManager(Component):
if state.has_key('overflow') and (dbwriter.max_queued != None):
dbwriter.overflow = state['overflow']
# Save hook: calls Component.save() on this instance.
def __save_me(self):
Component.save(self)
# Rebind the method through automatic(), passing the 'save_me_interval'
# config option converted to float (falls back to 10 when unset).
# NOTE(review): presumably this is the period, in seconds, between automatic
# invocations -- confirm against the definition of automatic().
__save_me = automatic(__save_me, float(get_cqm_config('save_me_interval', 10)))
......@@ -3807,7 +3817,7 @@ class QueueManager(Component):
failure_msg = 'No Max Walltime default or for queue "%s" defined. Please contact system administrator' % spec['queue']
logger.error(failure_msg)
raise QueueError, failure_msg
spec.update({'adminemail':self.Queues[spec['queue']].adminemail})
if walltime_prediction_enabled:
spec['walltime_p'] = self.get_walltime_p(spec) #*AdjEst*
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment